类似于如何使用Spring reactive逐个处理每个产品并进行增量进度更新?
我想做的事是给
enum Status {
PROCESSING,
ERROR,
COMPLETE
}
record MyResp {
String requestId;
Status status;
double progress;
URI result;
String errorMessage;
};
Mono<MyResp> requestSomethingSlow(MyReqObjectNotRelevant request);
/**
* Sets status to SUCCESS or return a Mono.error()
*/
Mono<MyResp> checkIfDone(String requestId);
我想要一个这样的方法:
Flux<MyResp> requestSomethingSlowFlux(MyReqObjectNotRelevant request, Duration delay) {
return ...???
??? requestSomethingSlow(request)
. ???
.delayElements(delay)
. ???
. checkIfDone(...?)
...???
}
我认为它类似于Flux.生成
,但我如何转换单声道
我还在查看Mono.扩展
,如如何使用返回Mono的生成包装调用创建Flux中所示,但这不起作用,因为我没有有限的调用集。
到目前为止,我的实施尝试看起来像
Flux<MyResp> requestSomethingSlowFlux(MyReqObjectNotRelevant request, Duration delay) {
return requestSomethingSlow(request)
.flatMapMany(initialResponse -> {
if (initialResponse.getStatus() == COMPLETE) {
return Flux.just(initialResponse);
} else if (initialResponse.getStatus() == ERROR) {
return Flux.error(
new IllegalStateException(
initialResponse.getErrorMessage()));
} else {
... still figuring this part out
}
}
.delayElements(delay)
}
同样类似于你如何在项目Reactor中实现轮询逻辑?但是我希望它是一个显示进度的事件流,而不是全部或没有。
使用这个答案,但删除last()
调用,这样我就可以获取所有事件。
Flux<MyResp> requestSomethingSlowFlux(MyReqObjectNotRelevant request, Duration delay) {
return requestSomethingSlow(request)
.flatMapMany(initialResponse -> {
if (initialResponse.getStatus() == COMPLETE) {
return Flux.just(initialResponse);
} else if (initialResponse.getStatus() == ERROR) {
return Flux.error(
new IllegalStateException(
initialResponse.getErrorMessage()));
} else {
return checkIfDone(initialResponse.getRequestId())
.repeatWhen(repeat -> repeat.delayElements(delay))
.doNext(response -> {
if (response.getStatus() == ERROR) {
throw new IllegalStateException(
initialResponse.getErrorMessage());
}
})
.takeUntil(response -> response.getStatus() != PROCESSING)
}
});
}
不过它有一些缺陷
重复时