我想通过rest request和WebFlux触发更长时间的运行操作。调用结果应该只返回操作已开始的信息。我想在不同的调度程序上运行长时间运行的操作(例如Schedulers.单())。为了实现这一点,我使用了订阅:
Mono<RecalculationRequested> recalculateAll() {
return provider.size()
.doOnNext(size -> log.info("Size: {}", size))
.doOnNext(size -> recalculate(size))
.map(RecalculationRequested::new);
}
private void recalculate(int toRecalculateSize) {
Mono.just(toRecalculateSize)
.flatMapMany(this::toPages)
.flatMap(page -> recalculate(page))
.reduce(new RecalculationResult(), RecalculationResult::increment)
.subscribeOn(Schedulers.single())
.subscribe(result -> log.info("Result of recalculation - success:{}, failed: {}",
result.getSuccess(), result.getFailed()));
}
private Mono<RecalculationResult> recalculate(RecalculationPage pageToRecalculate) {
return provider.findElementsToRecalculate(pageToRecalculate.getPageNumber(), pageToRecalculate.getPageSize())
.flatMap(this::recalculateSingle)
.reduce(new RecalculationResult(), RecalculationResult::increment);
}
private Mono<RecalculationResult> recalculateSingle(ElementToRecalculate elementToRecalculate) {
return recalculationTrigger.recalculate(elementToRecalculate)
.doOnNext(result -> {
log.info("Finished recalculation for element: {}", elementToRecalculate);
})
.doOnError(error -> {
log.error("Error during recalculation for element: {}", elementToRecalculate, error);
});
}
从上面我想调用:
private void recalculate(int toRecalculateSize)
在不同的线程中。但是,它不在单个线程池上运行-它使用不同的线程池。我希望订阅为整个链更改它。我应该更改什么以及为什么要在单个线程池中执行它?
只是提一下-方法:
provider.findElementsToRecalculate(...)
使用WebClient获取元素。
订阅器
的一个警告是它按照它所说的做:它在提供的调度器
上运行“订阅”行为。订阅在运行时从下到上流动(订阅器
订阅其父发布器
)。
通常您会在留档和演示文稿中看到订阅
会影响整个链。这是因为大多数运算符/源本身不会更改线程,默认情况下会从订阅它们的线程开始发送onNext
/onError
信号。
但是一旦一个操作员在从上到下的数据通路中切换线程,的范围就会停止。典型的例子是当链中有一个
发布
时。
在这种情况下,数据源是react-netty
和netty
,它们在自己的线程上运行,因此就好像源处有一个PublishOn
一样。
对于WebFlux,我赞成在运算符主链中使用发布
,或者在内链中使用订阅
,例如在平面地图
中。
根据留档,所有以doOn为前缀的运算符有时被称为具有“副作用”。它们允许您在不修改它们的情况下窥视序列的事件。
如果您想在“Provider. size()”之后链接“重新计算”步骤,请使用平面图。