我有同步代码,我想用反应器实现非阻塞。
我想并行调用不同的URI,这些调用可以返回响应、错误或什么都没有。
有3种情况:
我已经以同步方式做到了这一点:
AtomicReference<WebClientResponseException> responseException = new AtomicReference<>();
String responseBody = Flux.fromIterable(uriList)
.flatMap(url -> repo.sendRequest(uri))
// sendRequest returns a Mono that either emit a response, an error or nothing
.onErrorContinue(WebClientResponseException.class, (error, element) -> {
var webclientError = (WebClientResponseException) error;
responseException.set(webclientError);
})
.blockFirst();
return Pair.of(responseBody, responseException.get());
我想删除阻塞调用并返回Mono
据我所知,我对发生的错误保持一种状态,我不能对Reactor保持一种状态。
如何跟踪发生的错误,但不发出它们,因为我想看看其他请求是否稍后发出结果?
这个版本有效吗?
AtomicReference<WebClientResponseException> responseException = new AtomicReference<>();
return Flux.fromIterable(uriList)
.flatMap(url -> repo.sendRequest(uri))
// sendRequest returns a Mono that either emit a response, an error or nothing
.onErrorContinue(WebClientResponseException.class, (error, element) -> {
var webclientError = (WebClientResponseException) error;
responseException.set(webclientError);
})
.next()
.switchIfEmpty(Mono.defer(() -> Mono.error(responseException.get())));
原子参考会像闭包一样被关闭吗?
我认为MapDelayError
可能会实现你想要的,看这个例子:
int concurrency = 10;
int prefetch = 1;
Flux.just(
Mono.error(new IOException("error")).delaySubscription(Duration.ofSeconds(2)),
Mono.just("fast").delaySubscription(Duration.ofSeconds(4)),
Mono.just("slow").delaySubscription(Duration.ofSeconds(6)))
.flatMapDelayError(
request -> request,
concurrency,
prefetch)
.next()
.doOnNext(result -> System.out.println("Result: " + result))
在此示例中,error
首先完成,但-DelayError
运算符持有它,然后fast
完成并作为结果发出。最后慢速
永远不会完成,因为. next()
因为我们有结果而取消了剩余的请求。