提问者:小点点

SubscribeOn不会改变整个链的线程池


我想通过rest request和WebFlux触发更长时间的运行操作。调用结果应该只返回操作已开始的信息。我想在不同的调度程序上运行长时间运行的操作(例如Schedulers.单())。为了实现这一点,我使用了订阅:

Mono<RecalculationRequested> recalculateAll() {
  return provider.size()
      .doOnNext(size -> log.info("Size: {}", size))
      .doOnNext(size -> recalculate(size))
      .map(RecalculationRequested::new);
}

private void recalculate(int toRecalculateSize) {
  Mono.just(toRecalculateSize)
      .flatMapMany(this::toPages)
      .flatMap(page -> recalculate(page))
      .reduce(new RecalculationResult(), RecalculationResult::increment)
      .subscribeOn(Schedulers.single())
      .subscribe(result -> log.info("Result of recalculation - success:{}, failed: {}",
          result.getSuccess(), result.getFailed()));
}

private Mono<RecalculationResult> recalculate(RecalculationPage pageToRecalculate) {
  return provider.findElementsToRecalculate(pageToRecalculate.getPageNumber(), pageToRecalculate.getPageSize())
      .flatMap(this::recalculateSingle)
      .reduce(new RecalculationResult(), RecalculationResult::increment);
}

private Mono<RecalculationResult> recalculateSingle(ElementToRecalculate elementToRecalculate) {
  return recalculationTrigger.recalculate(elementToRecalculate)
      .doOnNext(result -> {
        log.info("Finished recalculation for element: {}", elementToRecalculate);
      })
      .doOnError(error -> {
        log.error("Error during recalculation for element: {}", elementToRecalculate, error);
      });
}

从上面我想调用:

private void recalculate(int toRecalculateSize)

在不同的线程中。但是,它不在单个线程池上运行-它使用不同的线程池。我希望订阅为整个链更改它。我应该更改什么以及为什么要在单个线程池中执行它?

只是提一下-方法:

provider.findElementsToRecalculate(...)

使用WebClient获取元素。


共2个答案

匿名用户

订阅器的一个警告是它按照它所说的做:它在提供的调度器上运行“订阅”行为。订阅在运行时从下到上流动(订阅器订阅其父发布器)。

通常您会在留档和演示文稿中看到订阅会影响整个链。这是因为大多数运算符/源本身不会更改线程,默认情况下会从订阅它们的线程开始发送onNext/onError信号。

但是一旦一个操作员在从上到下的数据通路中切换线程,的范围就会停止。典型的例子是当链中有一个发布时。

在这种情况下,数据源是react-nettynetty,它们在自己的线程上运行,因此就好像源处有一个PublishOn一样。

对于WebFlux,我赞成在运算符主链中使用发布,或者在内链中使用订阅,例如在平面地图中。

匿名用户

根据留档,所有以doOn为前缀的运算符有时被称为具有“副作用”。它们允许您在不修改它们的情况下窥视序列的事件。

如果您想在“Provider. size()”之后链接“重新计算”步骤,请使用平面图。

相关问题