提问者:小点点

如何创建Flux,其中通量的每个元素都是轮询操作,并在成功或错误时终止通量?


类似于如何使用Spring reactive逐个处理每个产品并进行增量进度更新?

我想做的事是给

enum Status {
  PROCESSING,
  ERROR,
  COMPLETE
}
record MyResp {
  String requestId;
  Status status;
  double progress;
  URI result;
  String errorMessage;
};

Mono<MyResp> requestSomethingSlow(MyReqObjectNotRelevant request);
/**
 * Sets status to SUCCESS or return a Mono.error()
 */
Mono<MyResp> checkIfDone(String requestId);

我想要一个这样的方法:

Flux<MyResp> requestSomethingSlowFlux(MyReqObjectNotRelevant request, Duration delay) {

   return ...???
     ??? requestSomethingSlow(request)
     . ???
     .delayElements(delay)
     . ???
     . checkIfDone(...?)
     ...???

}

我认为它类似于Flux.生成,但我如何转换单声道

我还在查看Mono.扩展,如如何使用返回Mono的生成包装调用创建Flux中所示,但这不起作用,因为我没有有限的调用集。

到目前为止,我的实施尝试看起来像

Flux<MyResp> requestSomethingSlowFlux(MyReqObjectNotRelevant request, Duration delay) {

   return requestSomethingSlow(request)
     .flatMapMany(initialResponse -> {
       if (initialResponse.getStatus() == COMPLETE) {
         return Flux.just(initialResponse);
       } else if (initialResponse.getStatus() == ERROR) {
         return Flux.error(
           new IllegalStateException(
             initialResponse.getErrorMessage()));
       } else {
         ... still figuring this part out
       }
     }
     .delayElements(delay)
     
}

同样类似于你如何在项目Reactor中实现轮询逻辑?但是我希望它是一个显示进度的事件流,而不是全部或没有。


共1个答案

匿名用户

使用这个答案,但删除last()调用,这样我就可以获取所有事件。

Flux<MyResp> requestSomethingSlowFlux(MyReqObjectNotRelevant request, Duration delay) {

   return requestSomethingSlow(request)
     .flatMapMany(initialResponse -> {
       if (initialResponse.getStatus() == COMPLETE) {
         return Flux.just(initialResponse);
       } else if (initialResponse.getStatus() == ERROR) {
         return Flux.error(
           new IllegalStateException(
             initialResponse.getErrorMessage()));
       } else {
         return checkIfDone(initialResponse.getRequestId())
           .repeatWhen(repeat -> repeat.delayElements(delay))
           .doNext(response -> {
             if (response.getStatus() == ERROR) {
               throw new IllegalStateException(
                 initialResponse.getErrorMessage());
             }
           })
           .takeUntil(response -> response.getStatus() != PROCESSING)
       }
     });
     
}

不过它有一些缺陷

  1. 从初始响应开始,状态处理或多或少会重复
  2. 延迟是Mono的一部分,而不是Flux的一部分。因此,我不能让它返回通量并在通量上使用delayElements。我认为这是重复时
  3. 的选择