提问者:小点点

如果通量为空,如何发出过滤掉的错误


我有同步代码,我想用反应器实现非阻塞。

我想并行调用不同的URI,这些调用可以返回响应、错误或什么都没有。

有3种情况:

  • 一个请求返回一个响应,我不等待其他请求完成就返回它。如果其他请求更早地返回了错误,我将删除错误
  • 至少有一个请求返回错误,没有其他请求返回响应,我返回错误
  • 所有请求都没有返回任何内容(没有响应,没有错误),我什么也没有返回

我已经以同步方式做到了这一点:

    AtomicReference<WebClientResponseException> responseException = new AtomicReference<>();
    
    String responseBody = Flux.fromIterable(uriList)
            .flatMap(url -> repo.sendRequest(uri)) 
            // sendRequest  returns a Mono that either emit a response, an error or nothing
            .onErrorContinue(WebClientResponseException.class, (error, element) -> {
                var webclientError = (WebClientResponseException) error;
                responseException.set(webclientError);
            })
            .blockFirst();
    
    return Pair.of(responseBody, responseException.get());

我想删除阻塞调用并返回Mono

据我所知,我对发生的错误保持一种状态,我不能对Reactor保持一种状态。

如何跟踪发生的错误,但不发出它们,因为我想看看其他请求是否稍后发出结果?

这个版本有效吗?

AtomicReference<WebClientResponseException> responseException = new AtomicReference<>();
    
    return Flux.fromIterable(uriList)
            .flatMap(url -> repo.sendRequest(uri)) 
            // sendRequest  returns a Mono that either emit a response, an error or nothing
            .onErrorContinue(WebClientResponseException.class, (error, element) -> {
                var webclientError = (WebClientResponseException) error;
                responseException.set(webclientError);
            })
            .next()
            .switchIfEmpty(Mono.defer(() -> Mono.error(responseException.get())));

原子参考会像闭包一样被关闭吗?


共1个答案

匿名用户

我认为MapDelayError可能会实现你想要的,看这个例子:

int concurrency = 10;
int prefetch = 1;

Flux.just(
        Mono.error(new IOException("error")).delaySubscription(Duration.ofSeconds(2)),
        Mono.just("fast").delaySubscription(Duration.ofSeconds(4)),
        Mono.just("slow").delaySubscription(Duration.ofSeconds(6)))
    .flatMapDelayError(
        request -> request,
        concurrency,
        prefetch)
    .next()
    .doOnNext(result -> System.out.println("Result: " + result))

在此示例中,error首先完成,但-DelayError运算符持有它,然后fast完成并作为结果发出。最后慢速永远不会完成,因为. next()因为我们有结果而取消了剩余的请求。