提问者:小点点

在设计异步Java API时,如何确保整个CompletableFuture链由内部线程池执行?


我正在编写一个库,该库提供了几个返回“可完成未来”的异步方法。该库具有一个内部线程池,用于执行异步方法的计算工作。

我想确保满足以下两个要求:

  1. 返回的“可完成未来”不是由内部线程完成的,因此我的库外部的“可完成未来”链永远不会被库的内部线程池执行
  2. 我的异步方法的所有计算都由内部线程池执行,而不是由用户线程(即方法调用方的线程)执行

所以说图书馆有以下阻止方法

Data request(Address address) {
  Message request = encode(address);
  Message response = sendAndReceive(request);
  Data responseData = decode(response);
  return responseData;
}

和相应的异步方法

  CompletableFuture<Data> requestAsync(Address address) {
    return CompletableFuture.supplyAsync(() -> encode(address), internalThreadPool)
        .thenCompose(request -> sendAndReceiveAsync(request))
        .thenApply(response -> decode(response));
  }

第一个要求是通过添加链接来满足的。当完成同步((v,t) -

但是,为了满足第二个要求,需要做些什么呢?


共1个答案

匿名用户

Sergey Kuksenko在这里讨论了第二个需求的解决方案,并已在Java11的HttpClient实现中实现。

不满足要求,因为不能保证decode(响应)由内部线程执行。如果encode和sendAndReceiveAsync快速完成,decode实际上可以由调用者的线程执行。

可以通过引入CompletableFuturestartCf来启动CF链来解决该问题。

因此,完整的解决方案可能如下所示

CompletableFuture<Data> requestAsyncFixedAll(Address address) {
  CompletableFuture<Void> startCf = new CompletableFuture<>();
  CompletableFuture<Data> dataCf =  startCf.thenApplyAsync(v -> encode(address), internalThreadPool)
    .thenCompose(request -> sendAndReceiveAsync(request))
    .thenApply(response -> decode(response)).whenCompleteAsync((v, t) -> {});
  startCf.complete(null);
  return dataCf;
}