不按时运行的buffer
运算符版本根据JavaDoc遵守反压:
http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#buffer-int-
但是,任何涉及基于时间的缓冲区的buffer
版本都不支持反压,比如这个
http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#buffer-long-java.util.concurrent.TimeUnit-int-
我理解这是因为一旦时间滴答作响,你就不能像间隔
运算符一样停止它,因为同样的原因,它也不支持反压。
我想要的是一个基于大小和时间的缓冲运算符,并且通过将背压信号传播到上游和时间滴答发生器来完全支持背压,类似于这样:
someFlowable()
.buffer(
Flowable.interval(1, SECONDS).onBackpressureDrop(),
10
);
所以现在我可以降低背压信号的滴答声。
这是目前在rxJava2中可以实现的吗?项目Reactor怎么样?
我最近遇到了这个问题,这是我的实现。它可以这样使用:
Flowable<List<T>> bufferedFlow = (some flowable of T)
.compose(new BufferTransformer(1, TimeUnit.MILLISECONDS, 8))
它按您指定的计数支持反压。
这里是实现:https://gist.github.com/driventokill/c49f86fb0cc182994ef423a70e793a2d
当使用DisposableSubscriber
作为订阅者时,我从https://stackoverflow.com/a/55136139/6719538的解决方案中遇到了问题,据我所知,这个转换器不考虑来自下游订阅者的调用Suscription#request
(它可能会溢出它们)。我创建了我的版本,该版本在生产中进行了测试-BufferTransformerHonorableToBackpressure.java。方-杨-非常尊重想法。
已经有一段时间了,但我又看了一遍,不知何故,我突然想到:
public static <T> FlowableTransformer<T, List<T>> buffer(
int n, long period, TimeUnit unit)
{
return o ->
o.groupBy(__ -> 1)
.concatMapMaybe(
gf ->
gf.take(n)
.take(period, SECONDS)
.toList()
.filter(l -> !l.isEmpty())
);
}
几乎和我描述的一样。如果我是正确的,它是完全背压的,如果没有收集到足够的项目,它将缓冲n个项目,或者在指定的时间后