我一直在与一些Reactor核心Java合作,因为我想弄清楚这是否可以解决我目前使用这个框架遇到的一个问题。
目前我有一个很长的、正在执行的工作,大约需要40-50分钟才能完成。该方法看起来或多或少像这样:
public void doLongTask(List<Something> list){
//instructions.
for(Something sm : list){
if(condition){
executeLongOperation();
}
//instructions
if(condition){
executeLongOperation();
}
}
}
在我的控制器中,我有这样的东西:
@GetMapping(path = "/integersReactor", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseBody
public Flux<Integer> getIntegersReactor(){
logger.debug("Request getIntegersReactor initialized.");
return simpleSearchService.getIntegersReactor();
}
在服务层,我有这样的东西:
@Override
public Flux<Integer> getIntegersReactor(){
return Flux.range(0, Integer.MAX_VALUE);
}
这只是我用作概念证明的占位符。我的真正意图是以某种方式返回我将定义自己的某个对象的Flux,该对象将具有一些字段,我将使用这些字段来告诉消费者作业的状态。
现在,事情变得有些复杂了,因为我想在执行执行时发送更新,而不是返回整数的通量,而是返回使用返回的执行对象的通量;
这可以与Flux一起完成吗?我如何利用Retor Corejava将执行时的所有返回值推送到响应流中,该响应流可以像getIntegersRetor()在我的示例中一样传递给控制器?
是的,这应该是可能的,但是由于执行长操作
是阻塞的,它需要在专用线程上偏移(这会减少您从自上而下的反应式实现中获得的好处)。
更改您的doLongTask
以返回Flux
public Flux<Foo> doLongTask(List<Something> list) {
return Flux.fromIterable(list)
//ensure `Something` are published on a dedicated thread on which
//we can block
.publishOn(Schedulers.elastic()) //maybe a dedicated Scheduler?
//for each `Something`, perform the work
.flatMap(sm -> {
//in case condition is false, we'll avoid long running task
Flux<Foo> work = Flux.empty();
//start declaring the work depending on conditions
if(condition) {
Mono<Foo> op = Mono.fromCallable(this::executeLongOperation);
work = conditional.concatWith(op);
}
//all other instructions should preferably be non-blocking
//but since we're on a dedicated thread at this point, it should be ok
if(condition) {
Mono<Foo> op = Mono.fromCallable(this::executeLongOperation);
work = conditional.concatWith(op);
}
//let the flatMap trigger the work
return work;
});
}