提问者:小点点

Spring React多重订阅


我正试图将我的单声道拆分为其他分离的单声道,这些单声道将在不同的线程上处理相同的数据输入数据。

public Mono<String> process() { 
        Mono<String> someString = ... // fetching data from API 



        someString
                .publishOn(Schedulers.fromExecutorService(Executors.newFixedThreadPool(2)))
                .map(String::toLowerCase)
                .subscribe(this::saveLowercase);

        someString
                .publishOn(Schedulers.fromExecutorService(Executors.newFixedThreadPool(2)))
                .map(String::toUpperCase)
                .subscribe(this::saveUpperCase);

        return someString;
}

我在日志中看到,我提取了3次数据,因为每个订阅调用都从API中提取数据。我想用。cache()方法,但我想知道一些新的更好的方法,只调用一次API和处理乘以这个数据?如果Reactor做不到这一点,我可以把Reactor换成RxJava。


共1个答案

匿名用户

在您给出的示例中,您创建了一个冷单声道。因此,正如您所说,HTTP调用将针对每个订阅者进行。如果希望 HTTP 调用仅发生一次,请创建热可观察对象。您必须使用 .cache(),以便任何未来的订阅者都能获得响应。

这是执行此操作的正确方法。你为什么要寻找“更好的东西”?

官方文档中的示例:

DirectProcessor<String> hotSource = DirectProcessor.create();

Flux<String> hotFlux = hotSource.map(String::toUpperCase).cache();


hotFlux.subscribe(d -> System.out.println("Subscriber 1 to Hot Source: "+d));

hotSource.onNext("blue");
hotSource.onNext("green");

hotFlux.subscribe(d -> System.out.println("Subscriber 2 to Hot Source: "+d));

hotSource.onNext("orange");
hotSource.onNext("purple");
hotSource.onComplete(); 

两个订阅者现在都将获得所有颜色。