提问者:小点点

使用Project Retor管理对共享资源的访问


如何使用项目Reactor管理对共享资源的访问?

给定一个虚构的关键组件,它当时只能执行操作(文件存储、昂贵的远程服务等),如果有多个访问该组件的点(多个API方法、订阅者…),如何以响应式方式编排对该组件的访问?如果资源可以自由执行操作,它应该立即执行它,如果其他操作已经在进行中,则将我的操作添加到队列中,并在我的操作完成后完成我的Mono。

我的想法是将任务添加到一个接一个地执行任务的通量队列中,并返回一个Mono,一旦队列中的任务完成,该Mono将完成,而不会阻塞。

class CriticalResource {

    private final Sinks.Many<Mono<?>> taskExecutor = Sinks.many()
                                                          .unicast()
                                                          .onBackpressureBuffer();

    private final Disposable taskExecutorDisposable =  taskExecutor.asFlux()
                                                                      .concatMap(Function.identity()) //this executes actions in sequential order
                                                                      .subscribe();


    public Mono<Void> resourceOperation1() {
        doSomething();
        .as(this::sequential);
    }


    public Mono<Void> resourceOperation2() {
        doSomethingElse();
        .as(this::sequential);
    }


    public Mono<Void> resourceOperation3() {
        doSomething();
        .then(somethingElse())
        .as(this::sequential);
    }

    private <T> Mono<T> sequential(Mono<T> action) {
        return Mono.defer(() -> {
            Sinks.One<T> actionResult = Sinks.one(); //create a new mono which should complete when our action mono completes. 
//Since task executor subscribes to action mono, we are subscribing on action result mono
            while (taskExecutor.tryEmitNext(action.doOnError(t -> actionResult.emitError(t,
                                                                                         Sinks.EmitFailureHandler.FAIL_FAST))
                                                  .doOnSuccess(next -> actionResult.emitValue(next,
                                                                                              Sinks.EmitFailureHandler.FAIL_FAST)))
                    != Sinks.EmitResult.OK) {
            }
            return actionResult.asMono();
        });
    }
    
}

这不是一个完整的例子,因为需要更多的工作来正确传播反压力、传输上下文等…但是我想知道是否有更好的方法来实现这一点,由Project Retor支持?


共1个答案

匿名用户

本质上,这看起来像是Reactor池功能的简化版本。你考虑过使用它吗?例如。最大大小为1?

https://github.com/reactor/reactor-pool/

https://projectreactor.io/docs/pool/0.2.7/api/reactor/pool/Pool.html

池可能是矫枉过正的,因为它的开销是必须在多个竞争借款者之上处理多个资源,就像你的情况一样,但也许它可以为你更进一步提供一些灵感。