提问者:小点点

将 CompletableFuture<Stream<T>> 转换为发布服务器是否正确<T>?


允许对来自 CompletableFuture 的结果流进行多次迭代

>

  • 将生成的未来转换为CompletableFuture

    将生成的未来转换为Flux

    通量

    用例:

    我想得到一个带有英超球队名称的序列(例如 Stream

    CompletableFuture<Stream<String>> teams = asyncHttpClient
        .prepareGet("http://api.football-data.org/v1/soccerseasons/445/leagueTable")
        .execute()
        .toCompletableFuture()
        .thenApply(Response::getResponseBody)
        .thenApply(body -> gson.fromJson(body, League.class));
        .thenApply(l -> stream(l.standings).map(s -> s.teamName));
    

    要重用生成的流,我有两个选项:

    1. CompletableFuture<List<String>> res = teams.thenApply(st -> st.collect(toList()))
    
    2. Flux<String> res = Flux.fromStream(teams::join).cache()
    

    通量

    或者我应该使用 CompletableFuture

    更新了一些想法(2018-03-16):

    可享受未来

    • 〔PROS〕<code>列表

    通量

    • [优点] 声明简洁
    • [优点] 如果我们只想使用它一次,那么我们可以省略 .cache() 并将其转发到下一层,它可以利用响应式 API,例如 web flux 反应式控制器,例如 @GetMapping(产生 =MediaType.TEXT_EVENT_STREAM) 公共 Flux

  • 共2个答案

    匿名用户

        CompletableFuture<Stream<String>> teams = ...;
        Flux<String> teamsFlux = Mono.fromFuture(teams).flatMapMany(stream -> Flux.fromStream(stream));
    

    Fulc. fromStream(团队::加入)是一种代码气味,因为它阻塞了当前线程以从另一个线程上运行的CompletableFuture获取结果。

    匿名用户

    一旦您下载了排行榜,并且从该表中提取了球队名称,我不确定您是否需要一个准备好背压的流来迭代这些项目。将流转换为标准列表(或数组)应该足够好,并且可能具有更好的性能,不是吗?

    例如:

    String[] teamNames = teams.join().toArray(String[]::new);