提问者:小点点

rxJava buffer()带有尊重背压的时间


不按时运行的buffer运算符版本根据JavaDoc遵守反压:

http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#buffer-int-

但是,任何涉及基于时间的缓冲区的buffer版本都不支持反压,比如这个

http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#buffer-long-java.util.concurrent.TimeUnit-int-

我理解这是因为一旦时间滴答作响,你就不能像间隔运算符一样停止它,因为同样的原因,它也不支持反压。

我想要的是一个基于大小和时间的缓冲运算符,并且通过将背压信号传播到上游和时间滴答发生器来完全支持背压,类似于这样:

someFlowable()
.buffer(
     Flowable.interval(1, SECONDS).onBackpressureDrop(),
     10
);

所以现在我可以降低背压信号的滴答声。

这是目前在rxJava2中可以实现的吗?项目Reactor怎么样?


共3个答案

匿名用户

我最近遇到了这个问题,这是我的实现。它可以这样使用:

    Flowable<List<T>> bufferedFlow = (some flowable of T)
                              .compose(new BufferTransformer(1, TimeUnit.MILLISECONDS, 8))

它按您指定的计数支持反压。

这里是实现:https://gist.github.com/driventokill/c49f86fb0cc182994ef423a70e793a2d

匿名用户

当使用DisposableSubscriber作为订阅者时,我从https://stackoverflow.com/a/55136139/6719538的解决方案中遇到了问题,据我所知,这个转换器不考虑来自下游订阅者的调用Suscription#request(它可能会溢出它们)。我创建了我的版本,该版本在生产中进行了测试-BufferTransformerHonorableToBackpressure.java。方-杨-非常尊重想法。

匿名用户

已经有一段时间了,但我又看了一遍,不知何故,我突然想到:

public static <T> FlowableTransformer<T, List<T>> buffer(
    int n, long period, TimeUnit unit)
{
    return o ->
        o.groupBy(__ -> 1)
         .concatMapMaybe(
             gf ->
                 gf.take(n)
                   .take(period, SECONDS)
                   .toList()
                   .filter(l -> !l.isEmpty())
         );
}

几乎和我描述的一样。如果我是正确的,它是完全背压的,如果没有收集到足够的项目,它将缓冲n个项目,或者在指定的时间后