0 前言 本片文章,简单总结下 Flow 线程切换的原理。
前面分析了 buffer 背压的原理,这里趁热打铁,讲讲 flowon 切换线程的原理:
1 flowon 和 Rxjava 的对比 RxJava 有 2 个操作符:subscribeOn 和 observeOn
subscribeOn
作用 :指定 上游 的线程。它决定了 Observable.create { ... } 在哪个线程执行。
数量 :多次调用 subscribeOn 只有第一次生效 (最靠近源头的那个)。
observeOn
作用 :指定 下游 的线程。它会影响调用 observeOn 之后所有操作的执行线程。
数量 :可以多次调用 ,从而在流的不同阶段切换线程。
Flow 更加简单,只有一个 flowon,flowon 等于 subscribeOn + observeOn:
作用 :flowOn 会改变 上游 所有操作的 CoroutineContext,这一点等于 RxJava 中 subscribeOn 的效果;但是它又可以生效多次,所以又等于 RxJava 中 observeOn 的效果;
机制 :flowOn 会在流中创建一个缓冲区,上游在指定的调度器上发射数据,下游从缓冲区中消费。
数量 :可以多次调用 ,每次调用都会改变它之前(上游) 的上下文。
其实看到:创建一个缓冲区,大家能猜到,FlowON 实际上是基于 buffer 的。
下面我们分析下源码即可。
2 flowon 源码分析 下面是 flowOn 函数的源码:
1 2 3 4 5 6 7 8 public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> { checkFlowContext(context) return when { context == EmptyCoroutineContext -> this this is FusibleFlow -> fuse(context = context) else -> ChannelFlowOperatorImpl(this , context = context) } }
可以看到,flowOn 方法内部对一些条件做了不同的处理;
EmptyCoroutineContext,也就是不切换线程,直接返回上游的 Flow 对象;
如果是 FusibleFlow,融合类型的 Flow,那么就执行 FusibleFlow 的 fuse 方法,创建一个新的 Flow;
其他情况,创建一个 ChannelFlowOperatorImpl 对象。
先来简单讲一下,什么是融合 Flow:FusibleFlow
FusibleFlow 的目的是优化执行链。当 Flow 链路存在多个操作符的时候,协程库会将这些操作符融合到一起,减少不必要的上下文切换和资源调度,提升性能。
我们先来看我们最熟悉的 ChannelFlowOperatorImpl;
2.1 ChannelFlowOperatorImpl ChannelFlowOperatorImpl 在前面 buffer 有提到:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 internal class ChannelFlowOperatorImpl <T >( flow: Flow<T>, context: CoroutineContext = EmptyCoroutineContext, capacity: Int = Channel.OPTIONAL_CHANNEL, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND ) : ChannelFlowOperator<T, T>(flow, context, capacity, onBufferOverflow) { override fun create (context: CoroutineContext , capacity: Int , onBufferOverflow: BufferOverflow ) : ChannelFlow<T> = ChannelFlowOperatorImpl(flow, context, capacity, onBufferOverflow) override fun dropChannelOperators () : Flow<T> = flow override suspend fun flowCollect (collector: FlowCollector <T >) = flow.collect(collector) }
collect 核心是在他的 collect 方法里,前面 buffer 没有讲这个可选通道,这里用篇幅讲下我的理解:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 override suspend fun collect (collector: FlowCollector <T >) { if (capacity == Channel.OPTIONAL_CHANNEL) { val collectContext = coroutineContext val newContext = collectContext.newCoroutineContext(context) if (newContext == collectContext) return flowCollect(collector) if (newContext[ContinuationInterceptor] == collectContext[ContinuationInterceptor]) return collectWithContextUndispatched(collector, newContext) } super .collect(collector) }
这里的 coroutineContext 来自调用 collect suspend 函数时,所在的协程的上下文,由于 Flow 的 collect 链路是反向自下而上执行的,所以当前的 flowOn 的 coroutineContext 来自下游:
1 2 3 4 5 6 7 8 flow { emit(1 ) } .flowOn(Dispatchers.IO) .filter { it > 5 } .flowOn(Dispatchers.IO + exceptionHandler) .collect { println(it) }
快路径–上下文完全相同【路径压缩】 上面第一个 flowOn 是一个例子,下面又是一个简单的例子:
1 2 3 4 5 6 7 8 9 10 withContext(Dispatchers.IO) { flow { emit(1 ) } .flowOn(Dispatchers.IO) .collect { println(it) } }
比如:withContext 切了线程到 IO,而 flowOn 又指定了线程池为 IO,这种情况,就会进入快路径 1,
1 2 override suspend fun flowCollect (collector: FlowCollector <T >) = flow.collect(collector)
快路径–上下文只有分发器一样 下面是一个例子:上下文只有分发器一样,其他的不同(比如传入协程名称、指定异常处理器等等):
1 2 3 4 5 6 7 8 9 10 11 withContext(Dispatchers.IO) { flow { emit(1 ) } .flowOn(Dispatchers.IO + CoroutineExceptionHandler { ... } + ...) .collect { println(it) } }
内部的逻辑是通过 withContextUndispatched 挂起函数在当前 Dispatchers 上启动了一个新协程:
1 2 3 4 5 6 7 8 9 10 11 internal suspend fun <T, V> withContextUndispatched ( newContext: CoroutineContext , value: V , countOrElement: Any = threadContextElements(newContext) , block: suspend (V) -> T ): T = suspendCoroutineUninterceptedOrReturn { uCont -> withCoroutineContext(newContext, countOrElement) { block.startCoroutineUninterceptedOrReturn(value, StackFrameContinuation(uCont, newContext)) } }
这其实是协程对 flowOn 的优化,在一些特定的场景下,减少 channel 的创建,缩短链路,提升性能。
produceImpl 当上下文不一样的话,那么会进入基于 channel 的 Flow 对象。
继续往下看,我们又走到了 produceImpl 这里,也就是 produce 创建协程的地方:
1 2 3 4 5 6 7 public open fun produceImpl (scope: CoroutineScope ) : ReceiveChannel<T> = scope.produce(context, produceCapacity, onBufferOverflow, start = CoroutineStart.ATOMIC, block = collectToFun) override suspend fun collect (collector: FlowCollector <T >) : Unit = coroutineScope { collector.emitAll(produceImpl(this )) }
可以看到:
flowon 指定的分发器决定了 produce 启动的协程所在的线程。
到这里,我们就能初步得出 flowon 的原理:基于 buffer,通过使 produce 切换线程实现。
3 FusibleFlow - 融合 Flow 我们来聊聊 FusibleFlow 融合 Flow:
FusibleFlow 的目的是优化执行链。当 Flow 链路存在多个操作符的时候,协程库会将这些操作符融合到一起,减少不必要的上下文切换和资源调度,提升性能。
1 2 3 4 5 6 7 8 9 10 11 12 13 @InternalCoroutinesApi public interface FusibleFlow<T> : Flow<T> { public fun fuse ( context: CoroutineContext = EmptyCoroutineContext, capacity: Int = Channel.OPTIONAL_CHANNEL, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND ) : Flow<T>}
这个 fuse 方法主要在 Flow 操作符链式调用时被内部使用:
buffer().buffer()
flowOn().flowOn()
buffer().flowOn()
任何连续的 ChannelFlow 操作符
可以这样理解,如果多个操作符创建的 Flow 对象实现了 FusibleFlow 接口,那么就会触发 fuse 操作符,实现融合;
由于 Flow 对象的创建是自上而下的,所以这个 this 是上游的 Flow 对象,this is FusibleFlow 说明上游是一个 buffer() 或者 flowon() 等其他操作符;
1 2 3 4 5 6 7 8 9 public fun <T> Flow<T>.flowOn (context: CoroutineContext): Flow<T> { checkFlowContext (context) return when { context == EmptyCoroutineContext -> this this is FusibleFlow -> fuse (context = context) else -> ChannelFlowOperatorImpl (this , context = context) } }
fuse 融合的核心逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 internal class ChannelFlowOperatorImpl <T>( flow: Flow<T>, context: CoroutineContext = EmptyCoroutineContext, capacity: Int = Channel.OPTIONAL_CHANNEL, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND ) : ChannelFlowOperator<T, T>(flow, context, capacity, onBufferOverflow) { override fun create (context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow) : ChannelFlow<T> = ChannelFlowOperatorImpl(flow, context, capacity, onBufferOverflow) override fun dropChannelOperators () : Flow<T> = flow override suspend fun flowCollect (collector: FlowCollector<T>) = flow.collect(collector) } public override fun fuse (context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow) : Flow<T> { assert { capacity != Channel.CONFLATED } val newContext = context + this .context val newCapacity: Int val newOverflow: BufferOverflow if (onBufferOverflow != BufferOverflow.SUSPEND) { newCapacity = capacity newOverflow = onBufferOverflow } else { newCapacity = when { this .capacity == Channel.OPTIONAL_CHANNEL -> capacity capacity == Channel.OPTIONAL_CHANNEL -> this .capacity this .capacity == Channel.BUFFERED -> capacity capacity == Channel.BUFFERED -> this .capacity else -> { assert { this .capacity >= 0 } assert { capacity >= 0 } val sum = this .capacity + capacity if (sum >= 0 ) sum else Channel.UNLIMITED } } newOverflow = this .onBufferOverflow } if (newContext == this .context && newCapacity == this .capacity && newOverflow == this .onBufferOverflow) return this return create(newContext, newCapacity, newOverflow) }
融合该怎么从代码逻辑上理解呢?
举个例子:flow{}.buffer().flowOn(),第二个 buffer() 阶段创建的 ChannelFlow 如下:
1 Flow flow = flow{} ---> ChannelFlow(flow + channel) ---> ... ...
第三个 flowOn() 阶段发现需要融合:那么会进行如下的调整:
1 2 Flow flow = flow{} ---> ChannelFlow(flow + channel) ---> ChannelFlow2(flow + channel 更新溢出策略和容量) ---> ... ...
会直接根据基础的 flow 对象,新建一个新的 ChannelFlow,然后同步第二阶段的 channel 配置,更新生成第三阶段的 channel 的配置。
融合场景 简单看几个融合的例子:
多个 fusible 操作配置相同 配置一样,返回原来的 ChannelFlow 对象:
1 2 3 flow { emit(1 ) } .buffer(2 ) .buffer(2 )
多个 fusible 操作容量融合 容量不同,会新建一个新的 ChannelFlow 对象,容量为 15:
1 2 3 flow { emit(1 ) } .buffer(5 ) .buffer(10 )
4 总结 本篇文章简单分享了 fowon 的线程切换的原理;快路径、慢路径的区别,融合 Flow 的实现原理。