在RxJava 2 Flowable中有不同的反压策略,其中最有趣的是:
在整个Rx链中受到尊重。
在静态编程语言中,有Flow,它声明它具有开箱即用的背压支持。我能够使用以下方法使Flow具有BUFFER和LATEST策略:
对于缓冲:
observeFlow()
.buffer(10)
.collect { ... }
与最新:
observeFlow()
.conflate()
.collect { ... }
这只是同一个缓冲区运算符的快捷方式。
但是我找不到任何可以和DROP一样工作的东西。简而言之,当以前的值还没有被处理时,DROP会丢弃流中的任何值。对于Flow,我甚至不确定这是可能的。
考虑到本案:
observeFlow()
.backpressureDrop() // non-existent operator, just for illustrative purposes
.map { ... }
.flatMapMerge { ... }
.collect { ... }
因此backpressureDrop应该尊重流中下面完成的任何工作,而该运算符对下面发生的事情一无所知(没有来自RxJava订阅服务器中类似底部的“请求”方法的显式回调)。因此,这似乎是不可能的。并且该运算符不应该在收集前一个项目之前传递任何事件。
有没有现成的操作符,我错过了,或者有没有一种直接的方法可以用现有的API实现这样的东西?
我们可以使用由会合通道支持的流来构建它。
当容量为0时-它创建RendezvousChannel。该通道根本没有任何缓冲区。只有当发送和接收调用及时相遇(会合)时,元素才会从发送方传输到接收方,因此send挂起,直到另一个协程调用接收和接收挂起,直到另一个协程调用send。
会合通道没有缓冲区。因此,该通道的消费者需要被挂起并等待下一个元素才能将元素发送到该通道。我们可以利用这种质量来丢弃在通道未挂起的情况下无法接受的值,使用Channel. offer
,这是一个正常的非挂起函数。
Channel. offer
如果可以在不违反容量限制的情况下立即将元素添加到此队列中并返回true。否则,它会立即返回false或在通道为ClosedFor发送时抛出异常(有关详细信息,请参阅关闭)。
因为channelFlow
是缓冲的,所以我们需要应用Flow
/**
* Consume this [Flow] using a channelFlow with no buffer. Elements emitted from [this] flow
* are offered to the underlying [channelFlow]. If the consumer is not currently suspended and
* waiting for the next element, the element is dropped.
*
* @return a flow that only emits elements when the downstream [Flow.collect] is waiting for the next element
*/
fun <T> Flow<T>.drop(): Flow<T> = channelFlow {
collect { offer(it) }
}.buffer(capacity = 0)
下面是一个慢消费者如何使用它来删除元素的示例。
fun main() = runBlocking {
flow {
(0..100).forEach {
emit(it)
delay(100)
}
}.drop().collect {
delay(1000)
println(it)
}
}
具有相应的输出:
0
11
21
31
41
51
61
71
81
91
有没有一种直接的方法来实现这样的东西
取决于你对直截了当的衡量。我是这样做的。
反压力转换为协程世界中的程序化暂停和恢复。对于onBackpressureDrop
,下游必须指示它已准备好处理一个项目并将其挂起,而上游不应等待下游准备好。
您必须以无界的方式使用上游,并将项目和终端事件移交给下游,等待这些信号。
package hu.akarnokd.kotlin.flow.impl
import hu.akarnokd.kotlin.flow.Resumable
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.AbstractFlow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.collect
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
@FlowPreview
internal class FlowOnBackpressureDrop<T>(private val source: Flow<T>)
: AbstractFlow<T>() {
@ExperimentalCoroutinesApi
@InternalCoroutinesApi
override suspend fun collectSafely(collector: FlowCollector<T>) {
coroutineScope {
val consumerReady = AtomicBoolean()
val producerReady = Resumable()
val value = AtomicReference<T>()
val done = AtomicBoolean()
val error = AtomicReference<Throwable>();
launch {
try {
source.collect {
if (consumerReady.get()) {
value.set(it);
consumerReady.set(false);
producerReady.resume();
}
}
done.set(true)
} catch (ex: Throwable) {
error.set(ex)
}
producerReady.resume()
}
while (true) {
consumerReady.set(true)
producerReady.await()
val d = done.get()
val ex = error.get()
val v = value.getAndSet(null)
if (ex != null) {
throw ex;
}
if (d) {
break;
}
collector.emit(v)
}
}
}
}
注:可恢复实施。
所以让我们来看看实现。
首先,需要5个变量来在上游的收集器和为下游工作的收集器之间传递信息:-消费者就绪
表示下游准备好下一个项目,-producerReady
表示生产者已经存储了下一个项目(或终端信号)并且下游可以恢复-value
上游项目准备好消费-done
上游已经结束-error
上游已经失败
接下来,我们必须为上游启动收集器,因为收集是挂起的,并且在完成之前根本不会让下游消费者循环运行。在这个收集器中,我们检查下游消费者是否准备好了(通过消费者就绪
),如果是,则存储当前项目,清除就绪标志并通过producerReady
通知其可用性。清除消费者就绪
将阻止存储后续的上游项目,直到下游本身表示新的就绪状态。
当上游结束或崩溃时,我们设置done
或error
变量并指示生产者已发言。
在启动{}
部分之后,我们现在将继续代表下游收集器使用共享变量。
每轮的第一件事是指示我们准备好下一个值,然后等待生产者端信号,它已将下一个事件放入共享变量中。
接下来,我们从这些变量中收集值。我们渴望完成或抛出错误,并且仅作为最后手段将上游项目重新发送给下游收集器。
但问题是默认情况下,channelFlowbuilder使用BUFFER
策略,不允许参数化容量。
有一种方法可以参数化ChannelFlowBuilder中的容量,但问题是API是内部的,ChannelFlowBuilder
是私有的。
但本质上,如果复制粘贴ChannelFlowBuilder实现并创建这样的类:
class BackPressureDropFlow<T>(private val source: Flow<T>) : AbstractFlow<T>() {
@InternalCoroutinesApi
override suspend fun collectSafely(collector: FlowCollector<T>) {
ChannelFlowBuilder<T>({ source.collect { offer(it) } }, capacity = 0)
.collect { collector.emit(it) }
}
}
(或直接应用类似的解决方案作为转换)。
然后它似乎工作。
这里的主键是使用容量=0
,它说下游将在接收到的每个项目上暂停(因为没有缓冲区容量)。