0 前言 本片文章,简单总结下 Flow 背压操作符的原理。
主要从下面几个方面来分析下背压的原理。
1、背压操作符的类型
2、背压操作符的原理
1 Flow 背压操作符 Flow 的背压操作符主要用于处理数据生产速度与消费速度不匹配的问题,核心策略包括缓冲、丢弃旧数据或取消旧处理。以下是主要操作符及其作用:
buffer
功能:缓存数据并控制流速,支持指定缓冲区大小和溢出策略。
溢出策略:
SUSPEND(默认):挂起发射器直到缓冲区有空闲。
DROP_OLDEST:丢弃最旧数据。
DROP_LATEST:丢弃最新数据。
示例:flow.buffer(10, BufferOverflow.SUSPEND)
1 2 3 4 5 6 7 flow<Int> { emit (2 ) }.buffer (5 , BufferOverflow.SUSPEND).filter { it > 5 }.collect { print (it) }
conflate
功能:==丢弃旧数据,仅保留最新值==,不设缓冲区。
适用场景:数据更新频繁但旧值无需保留(如UI刷新)。
示例:flow.conflate()
collectLatest
功能:==取消前一个未完成的处理,仅处理最新值==。
适用场景:需确保最新数据优先处理(如网络请求)。
1 2 3 4 flowOf(1 , 2 , 3 ).collectLatest { delay(1 ) println(it) }
2 buffer 操作符 我们来分析下 buffer 的核心原理:
capacity 表示容量,可以指定为具体的数字,也可以选择 Kotlin 协程中 Channel 工厂的伴生对象定义的一些参数:
BufferOverflow 背压策略;SUSPEND 发送方挂起、DROP_OLDEST 丢弃最旧的、DROP_LATEST 丢弃最新的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND): Flow<T> { require(capacity >= 0 || capacity == BUFFERED || capacity == CONFLATED) { "Buffer size should be non-negative, BUFFERED, or CONFLATED, but was $capacity" } require(capacity != CONFLATED || onBufferOverflow == BufferOverflow.SUSPEND) { "CONFLATED capacity cannot be used with non-default onBufferOverflow" } var capacity = capacity var onBufferOverflow = onBufferOverflow if (capacity == CONFLATED) { capacity = 0 onBufferOverflow = BufferOverflow.DROP_OLDEST } return when (this ) { is FusibleFlow -> fuse(capacity = capacity, onBufferOverflow = onBufferOverflow) else -> ChannelFlowOperatorImpl(this , capacity = capacity, onBufferOverflow = onBufferOverflow) } 菜
buffer 的核心原理基于 kotlin 的 channel,参数 capacity 可以通过设置如下的值来确定使用哪个 channel。
可以看到,最后创建了一个 ChannelFlowOperatorImpl,包裹上游的 Flow 对象。
Channel 类型 1 public const val UNLIMITED: Int = Int .MAX_VALUE
无限容量的 channel,使用 Int.MAX_VALUE 作为标识。
1 public const val RENDEZVOUS: Int = 0
约会类型的 channel,sender 和 receiver 必须同时存在。
1 public const val CONFLATED: Int = -1
1 public const val BUFFERED: Int = -2
基于队列的一个默认大小为 64 的缓存大小的 channel。
ChannelFlowOperatorImpl 我们知道 Flow 默认是冷流,也就是说,需要下游主动注入 FlowCollector 才能触发流的传递!
ChannelFlowOperatorImpl 本身也是一个 Flow,基于 Channel,关键在于 collect 方法,位于 ChannelFlowOperator 中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 internal class ChannelFlowOperatorImpl <T>( flow: Flow<T>, context: CoroutineContext = EmptyCoroutineContext, capacity: Int = Channel.OPTIONAL_CHANNEL, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND ) : ChannelFlowOperator<T, T>(flow, context, capacity, onBufferOverflow) { override fun create (context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow) : ChannelFlow<T> = ChannelFlowOperatorImpl(flow, context, capacity, onBufferOverflow) override fun dropChannelOperators () : Flow<T> = flow override suspend fun flowCollect (collector: FlowCollector<T>) = flow.collect(collector) } internal abstract class ChannelFlowOperator <S, T>( @JvmField protected val flow: Flow<S>, context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow ) : ChannelFlow<T>(context, capacity, onBufferOverflow) { ... ... ... override suspend fun collect (collector: FlowCollector<T>) { if (capacity == Channel.OPTIONAL_CHANNEL) { val collectContext = coroutineContext val newContext = collectContext.newCoroutineContext(context) if (newContext == collectContext) return flowCollect(collector) if (newContext[ContinuationInterceptor] == collectContext[ContinuationInterceptor]) return collectWithContextUndispatched(collector, newContext) } super .collect(collector) } } @InternalCoroutinesApi public abstract class ChannelFlow <T>( @JvmField public val context: CoroutineContext, @JvmField public val capacity: Int, @JvmField public val onBufferOverflow: BufferOverflow ) : FusibleFlow<T> { ... ... ... override suspend fun collect (collector: FlowCollector<T>) : Unit = coroutineScope { collector.emitAll(produceImpl(this )) } }
可以看到:collect 实际上是在父类 ChannelFlow 内部。
核心逻辑:基于 coroutineScope 作用域,启动了一个协程,然后执行 produceImpl 和 emitAll;
上游 produceImpl 协程创建 produceImpl 内部通过 produce 创建了一个协程序:
1 2 public open fun produceImpl (scope: CoroutineScope) : ReceiveChannel<T> = scope.produce(context, produceCapacity, onBufferOverflow, start = CoroutineStart.ATOMIC, block = collectToFun)
produce 会创建一个协程:内部会有一个 channel,协程体内部 send,然后返回一个 ReceiveChannel 对象,其他的协程在协程体内部 receive 即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 internal fun <E> CoroutineScope.produce( context: CoroutineContext = EmptyCoroutineContext, capacity: Int = 0 , onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND, start: CoroutineStart = CoroutineStart.DEFAULT, onCompletion: CompletionHandler? = null , @BuilderInference block: suspend ProducerScope<E>.() -> Unit ): ReceiveChannel<E> { val channel = Channel<E>(capacity, onBufferOverflow) val newContext = newCoroutineContext(context) val coroutine = ProducerCoroutine(newContext, channel) if (onCompletion != null ) coroutine.invokeOnCompletion(handler = onCompletion) coroutine.start(start, coroutine, block) return coroutine }
不多说。
协程体–collectToFun collectToFun 内部为协程体,核心逻辑是:执行 collectTo 操作:
1 2 3 4 5 6 7 8 9 10 11 internal val collectToFun: suspend (ProducerScope<T>) -> Unit get () = { collectTo(it) } protected override suspend fun collectTo (scope: ProducerScope<T>) = flowCollect(SendingCollector(scope)) override suspend fun flowCollect (collector: FlowCollector<T>) = flow.collect(collector)
可以看到这里创建了一个 SendingCollector 对象,然后向上游 Flow 对象传过去,这个在 Flow 的启动流程中有分析,不多说。
这样就能触发上游的 flow 发送数据!
那么 SendingCollector 的 emit 方法做了什么呢?
SendingCollector 1 2 3 4 5 6 @InternalCoroutinesApi public class SendingCollector <T>( private val channel: SendChannel<T> ) : FlowCollector<T> { override suspend fun emit (value: T) : Unit = channel.send(value) }
果然:是把上游的数据通过 channel.send 进行发送:
可以看到,buffer 让上游的逻辑运行在 produce 创建的协程中了.
下游 FlowCollector.emitAll 再来看看下游该如何接受数据,注意参数:ReceiveChannel,这是前面 produceImpl 创建的 Channel:
也就是说,核心逻辑是从 channel 里面读取数据:
对于 emitAll 则是运行在另外一个协程中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public suspend fun <T> FlowCollector<T>.emitAll(channel: ReceiveChannel<T>): Unit = emitAllImpl(channel, consume = true ) private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>, consume: Boolean) { ensureActive() var cause: Throwable? = null try { for (element in channel) { emit(element) } } catch (e: Throwable) { cause = e throw e } finally { if (consume) channel.cancelConsumed(cause) } }
最终会走到 channel 内部的逻辑中,for-in 循环遍历集合是通过迭代器(Iterator)来实现的。
1 2 3 4 5 6 val iterator = channel.iterator()while (iterator.hasNext()) { val element = iterator.next() }
我们假设,此时创建的 channel 是 BUFFERED 类型,对应的 BufferedChannel,那么其迭代器是 BufferedChannelIterator。
其他的 ChannelIterator 逻辑类似:
BufferedChannelIterator 我们来看看迭代的逻辑:
hasNext() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 override suspend fun hasNext () : Boolean = receiveImpl( waiter = null , onElementRetrieved = { element -> this .receiveResult = element true } onSuspend = { _, _, _ -> error("unreachable" ) }, onClosed = { onClosedHasNext() } onNoWaiterSuspend = { segm, i, r -> return hasNextOnNoWaiterSuspend(segm, i, r) } )
这最关键的就是:
onElementRetrieved :用于保存下一个要访问的元素。
onNoWaiterSuspend :在需要挂起的时候,会执行,然后进入挂起状态。
尾调用优化 : 在挂起情况下使用尾调用,避免栈帧增长;
当 channel 为空时,receiveImpl 会调用 onNoWaiterSuspend 回调,这里使用了 return 来从当前函数返回,进入真正的挂起函数。
receiveImpl - 接收总入口 receiveImpl 是一个内联函数,所以会直接展开,可以直接访问到外部的 Conitnuation 对象:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 private inline fun <R> receiveImpl( waiter: Any?, onElementRetrieved: (element: E) -> R, onSuspend: (segm: ChannelSegment<E>, i: Int, r: Long) -> R, onClosed: () -> R, onNoWaiterSuspend: ( segm: ChannelSegment<E>, i: Int, r: Long ) -> R = { _, _, _ -> error("unexpected" ) } ): R { var segment = receiveSegment.value while (true ) { if (isClosedForReceive) return onClosed() val r = this .receivers.getAndIncrement() val id = r / SEGMENT_SIZE val i = (r % SEGMENT_SIZE).toInt() if (segment.id != id) { segment = findSegmentReceive(id, segment) ?: continue } val updCellResult = updateCellReceive(segment, i, r, waiter) return when { updCellResult === SUSPEND -> { (waiter as? Waiter)?.prepareReceiverForSuspension(segment, i) onSuspend(segment, i, r) } updCellResult === FAILED -> { if (r < sendersCounter) segment.cleanPrev() continue } updCellResult === SUSPEND_NO_WAITER -> { onNoWaiterSuspend(segment, i, r) } else -> { segment.cleanPrev() @Suppress("UNCHECKED_CAST") onElementRetrieved(updCellResult as E) } } } } onNoWaiterSuspend = { segm, i, r -> return hasNextOnNoWaiterSuspend(segm, i, r) }
对于 bufferchannel 的逻辑,这篇文章先不做讨论,后续会单独写一篇文章来分析。
这里能看出:
如果有数据的话,那么会直接会 return 内部链表中的数据。
否则会返回 suspend,表示即将挂起状态。
注意:
这个 SUSPEND/SUSPEND_NO_WAITER 并不是真正的挂起,这是一个 channel 内部的状态。
原因:receiveImpl 不是 suspend 函数,挂起的条件首先必须是 suspend 函数!因为编译器只会识别 suspend 函数。
真正导致挂起的是在 hasNextOnNoWaiterSuspend 函数中。
hasNextOnNoWaiterSuspend – 真正的挂起 suspend 如果要挂起的话,根据逻辑 hasNextOnNoWaiterSuspend 会被调用,这里真正的挂起:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 private suspend fun hasNextOnNoWaiterSuspend ( segment: ChannelSegment<E>, index: Int, r: Long ) : Boolean = suspendCancellableCoroutineReusable { cont -> this .continuation = cont receiveImplOnNoWaiter( segment = segment, index = index, r = r, waiter = this , onElementRetrieved = { element -> this .receiveResult = element this .continuation = null cont.resume(true , onUndeliveredElement?.bindCancellationFun(element, cont.context)) }, onClosed = { onClosedHasNextNoWaiterSuspend() } ) }
suspendCancellableCoroutineReusable 创建 CancellableContinuationImpl,包裹外部协程体;
将 cont 保存到 continuation 字段;
注册迭代器自己 (this) 为等待者,让发送方知道有接收者在等待;
注册的核心逻辑是在:receiveImplOnNoWaiter 方法中。
当 channel 的缓冲区为空的时候,send 和 receive 的关系 简单的总结下 send 和 receive 的逻辑关系,当 channel 的缓冲区为空的时候:
接收者
调用 iterator.hasNext()
receiveImpl 发现缓冲区为空
调用 hasNextOnNoWaiterSuspend
保存外部协程体到 continuation 字段
receiveImplOnNoWaiter 注册等待者到 channel
协程挂起。【这里就是 collector.emitAll 的挂起逻辑】。
1 2 3 4 5 6 7 8 9 10 11 12 13 iterator.hasNext() 调用 → receiveImpl(waiter = null ) → updateCellReceive 中尝试获取数据,结果木有,需要挂起,调用 onNoWaiterSuspend → hasNextOnNoWaiterSuspend 创建协程体 → receiveImplOnNoWaiter(waiter = this ) → updateCellReceive → state === null && r >= senders → segment.casState(null → waiter) 成功 → expandBuffer() → 返回 SUSPEND → waiter.prepareReceiverForSuspension() → 从 receiveImplOnNoWaiter 返回 → 从 hasNextOnNoWaiterSuspend 返回,协程挂起
1 2 3 4 5 6 7 8 9 发送者调用 send 相关方法 → 发现单元格中有等待者 (waiter) → 调用 waiter.tryResumeHasNext(element) → 从 continuation 字段获取协程体 → 清理 continuation = null → 保存元素 receiveResult = element → cont.tryResume0(true , ...) 恢复协程体 → 协程从挂起点继续执行 → hasNext() 返回 true
receiveImplOnNoWaiter – 挂起入口–>无法获取元素就准备进入挂起状态 receiveImplOnNoWaiter 这个函数是用于在无法立即获取元素时,将等待者(waiter)注册到通道的特定位置,并在适当的时候恢复,参数:
segment: 当前操作的通道段。
index: 段内的索引,指定具体的单元格。
r: 单元格的全局索引。
waiter: 要注册的等待者,通常是迭代器本身,很简单,当有数据时,waiter 要被唤醒继续执行。
onElementRetrieved: 成功获取元素时的回调。
onClosed: 通道关闭时的回调。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 private inline fun receiveImplOnNoWaiter ( segment: ChannelSegment<E>, index: Int, r: Long, waiter: Waiter, onElementRetrieved: (element: E) -> Unit, onClosed: () -> Unit ) { val updCellResult = updateCellReceive(segment, index, r, waiter) when { updCellResult === SUSPEND -> { waiter.prepareReceiverForSuspension(segment, index) } updCellResult === FAILED -> { if (r < sendersCounter) segment.cleanPrev() receiveImpl( waiter = waiter, onElementRetrieved = onElementRetrieved, onSuspend = { _, _, _ -> }, onClosed = onClosed ) } else -> { segment.cleanPrev() @Suppress("UNCHECKED_CAST") onElementRetrieved(updCellResult as E) } } }
updateCellReceive 该方法是从 channel 中尝试获取元素,如果能获取到,那就返回;无法获取,就注册 Node 节点到 channel:
这个方法在获取数据和注册 waiter 的时候都会调用到。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 private fun updateCellReceive ( segment: ChannelSegment<E>, index: Int, r: Long, waiter: Any?, ) : Any? { val state = segment.getState(index) when { state === null -> { val senders = sendersAndCloseStatus.value.sendersCounter if (r >= senders) { if (waiter === null ) { return SUSPEND_NO_WAITER } if (segment.casState(index, state, waiter)) { expandBuffer() return SUSPEND } } } state === BUFFERED -> if (segment.casState(index, state, DONE_RCV)) { expandBuffer() return segment.retrieveElement(index) } } return updateCellReceiveSlow(segment, index, r, waiter) }
这里是有一些优化操作:
总结 核心点是:通过 produce 创建了一个协程,通过 channel 来实现数据交互。
下游从 channel 中读取数据;
上游在 produce 创建的协程体中,向 channel 发送数据;
这也就是 buffer 的高效地方:在不同的协程中实现 send 和 receive 方法。
channel 常见的 3 种恢复机制如下,emitAll 这个属于第一种,当发送方 send 的时候,就会触发 tryResumeHasNext:
发送者发送元素 : tryResumeHasNext(element)
通道关闭 : tryResumeHasNextOnClosedChannel()
协程取消 : invokeOnCancellation()
3 conflate 操作符 我们来看看 conflate 的源码:
1 public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED)
其实就是 buffer 换皮而已。
CONFLATED 决定了内部 channel 类型为 CONFLATED,这种类型的 channel 只能存一个值,新值会覆盖旧值。
4 collectLatest 操作符 collectLatest 的话,其可以做到:取消前一个未完成的处理,仅处理最新值
只处理流中的最新值,当新值到来时,如果前一个值的处理还在进行,会立即取消前一个处理并开始处理新值。
1 2 3 4 5 6 7 8 public suspend fun <T> Flow<T>.collectLatest(action: suspend (value: T) -> Unit) { mapLatest(action).buffer(0 ).collect() } @ExperimentalCoroutinesApi public fun <T, R> Flow<T>.mapLatest(@BuilderInference transform: suspend (value: T) -> R): Flow<R> = transformLatest { emit(transform(it)) }
可以看到
mapLatest(action) 作用:
将原始流 Flow<T> 转换为 Flow<Unit>
对每个值应用 action,但如果新值到达时前一个 action 还在执行,会取消前一个 action
返回一个新的流,其中每个元素代表一个 action 调用的开始
可以看到 transformLatest 新建了一个 ChannelFlowTransformLatest 对象,也是基于 ChannelFlow 对象的,和 buffer 很类似;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @ExperimentalCoroutinesApi public fun <T, R> Flow<T>.transformLatest(@BuilderInference transform: suspend FlowCollector<R>.(value: T) -> Unit): Flow<R> = ChannelFlowTransformLatest(transform, this ) internal class ChannelFlowTransformLatest <T, R>( private val transform: suspend FlowCollector<R>.(value: T) -> Unit, flow: Flow<T>, context: CoroutineContext = EmptyCoroutineContext, capacity: Int = Channel.BUFFERED, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND ) : ChannelFlowOperator<T, R>(flow, context, capacity, onBufferOverflow) { override fun create (context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow) : ChannelFlow<R> = ChannelFlowTransformLatest(transform, flow, context, capacity, onBufferOverflow)
ChannelFlowTransformLatest 是 ChannelFlowOperator 的子类。
关键在 flowCollect 方法中。
flowCollect– 核心 在 flowCollect 中进行 FlowCollector 的注入的时候,会在中间的 FlowCollector 的 emit 方法内直接 launch 一个新的协程。
并将这个协程保存到 previousFlow 里,那么如果是多个值的话,上一个 previousFlow 可以被取消;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 override suspend fun flowCollect (collector: FlowCollector<R>) { assert { collector is SendingCollector } coroutineScope { var previousFlow: Job? = null flow.collect { value -> previousFlow?.apply { cancel(ChildCancelledException()) join() } previousFlow = launch(start = CoroutineStart.UNDISPATCHED) { collector.transform(value) } } } }
5 总结 OK,关于 Flow 背压机制,到这里就分析完毕了,回见~