提问者:小点点

在通量中,如果内部通量最终为空,如何完成外部通量


在这些条件下,请考虑以下代码:

生成通量

getOneResponePage(int)最终将返回一个空的通量

package ch.cimnine.test;

import org.junit.Test;
import reactor.core.publisher.Flux;

public class PaginationTest {
    @Test
    public void main() {
        final Flux<Integer> finalFlux = getAllResponses();

        finalFlux.subscribe(resultItem -> {
            try {
                Thread.sleep(200); // Simulate heavy processing
            } catch (InterruptedException ignore) {
            }

            System.out.println(resultItem);
        });
    }

    private Flux<Integer> getAllResponses() {
        Flux<Flux<Integer>> myFlux = Flux.generate(
            () -> 0, // inital page
            (page, sink) -> {
                var innerFlux = getOneResponePage(page); // eventually returns a Flux.empty()

                // my way to check whether the `innerFlux` is now empty
                innerFlux.hasElements().subscribe(
                    hasElements -> {
                        if (hasElements) {
                            System.out.println("hasElements=true");
                            sink.next(innerFlux);
                            return;
                        }

                        System.out.println("hasElements=false");
                        sink.complete();
                    }
                );

                return page + 1;
            }
        );

        return Flux.concat(myFlux);
    }

    private Flux<Integer> getOneResponePage(int page) {
        System.out.println("Request for page " + page);
        
        // there's only content on the first 3 pages
        if (page < 3) {
            return Flux
                .just(1, 2, 3, 5, 7, 11, 13, 17, 23, 27, 31)
                .map(i -> (1000 * page) + i);
        }

        return Flux.empty();
    }
}

目标是有一个名为getAllResponses()的方法,该方法返回连续的流量

  1. 由于我是反应式编程的新手,我的想法对吗
  2. IntelliJ警告我,不建议在非阻塞上下文中调用“subscribe”。如何正确操作

在我的实际代码中,getOne响应页面(int)使用org.springframework.web.reactive.function.client.WebClient发送请求。它连接到返回结果的服务。该服务每次调用最多只返回1000个结果。必须发送偏移参数才能获得更多结果。

API有点奇怪,因为确定您是否拥有所有结果的唯一方法是使用不断增加的偏移量重复查询它,直到您得到一个空结果集。它会很乐意为仍在增加的偏移量返回更多空结果集(...直到达到偏移量的内部最大值并返回400错误请求。)

getOneResponePage(int)的实际实现几乎与此相同:

private Flux<ResponseItem> getOneResponePage(int page) {
    return webClientInstance
        .get()
        .uri(uriBuilder -> {
            uriBuilder.queryParam("offset", page * LIMIT);
            uriBuilder.queryParam("limit", LIMIT);
            // …
        })
        .retrieve()
        .bodyToFlux(ResponseItem.class);
}

共1个答案

匿名用户

没有从内部流停止外部流的直接方法。最接近的方法是在内部序列中使用SwitchIf空Flux.error(NoSuchElementException),然后在外部序列中使用onErrorResumeNext并返回空Flux,如果它找到NoSuchElementException

Flux.just(listOf(1, 2, 3), listOf(), listOf(4, 5, 6))
.flatMap(list ->
     Flux.fromIterable(list)
     .switchIfEmpty(Flux.error(new NoSuchElementException()))
)
.onErrorResumeNext(e -> 
      e instanceof NoSuchElementException ?
      Flux.empty() : Flux.error(e)
);