kotlin 协程原理分析 - Flow 线程切换的原理

0 前言

本片文章,简单总结下 Flow 线程切换的原理。

前面分析了 buffer 背压的原理,这里趁热打铁,讲讲 flowon 切换线程的原理:

1 flowon 和 Rxjava 的对比

RxJava 有 2 个操作符:subscribeOnobserveOn

  • subscribeOn
    • 作用:指定 上游 的线程。它决定了 Observable.create { ... } 在哪个线程执行。
    • 数量多次调用 subscribeOn 只有第一次生效(最靠近源头的那个)。
  • observeOn
    • 作用:指定 下游 的线程。它会影响调用 observeOn 之后所有操作的执行线程。
    • 数量:可以多次调用,从而在流的不同阶段切换线程。

Flow 更加简单,只有一个 flowon,flowon 等于 subscribeOn + observeOn:

  • 作用flowOn 会改变 上游 所有操作的 CoroutineContext,这一点等于 RxJava 中 subscribeOn 的效果;但是它又可以生效多次,所以又等于 RxJava 中 observeOn 的效果;
  • 机制flowOn 会在流中创建一个缓冲区,上游在指定的调度器上发射数据,下游从缓冲区中消费。
  • 数量:可以多次调用,每次调用都会改变它之前(上游) 的上下文。

其实看到:创建一个缓冲区,大家能猜到,FlowON 实际上是基于 buffer 的。

下面我们分析下源码即可。

2 flowon 源码分析

下面是 flowOn 函数的源码:

1
2
3
4
5
6
7
8
public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {
checkFlowContext(context)
return when {
context == EmptyCoroutineContext -> this
this is FusibleFlow -> fuse(context = context)
else -> ChannelFlowOperatorImpl(this, context = context)
}
}

可以看到,flowOn 方法内部对一些条件做了不同的处理;

  • EmptyCoroutineContext,也就是不切换线程,直接返回上游的 Flow 对象;
  • 如果是 FusibleFlow,融合类型的 Flow,那么就执行 FusibleFlow 的 fuse 方法,创建一个新的 Flow;
  • 其他情况,创建一个 ChannelFlowOperatorImpl 对象。

先来简单讲一下,什么是融合 Flow:FusibleFlow

FusibleFlow 的目的是优化执行链。当 Flow 链路存在多个操作符的时候,协程库会将这些操作符融合到一起,减少不必要的上下文切换和资源调度,提升性能。

我们先来看我们最熟悉的 ChannelFlowOperatorImpl;

2.1 ChannelFlowOperatorImpl

ChannelFlowOperatorImpl 在前面 buffer 有提到:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//【1】ChannelFlowOperatorImpl
internal class ChannelFlowOperatorImpl<T>(
flow: Flow<T>,
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = Channel.OPTIONAL_CHANNEL, //【2】注意看参数:OPTIONAL_CHANNEL,表示可选的;
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlowOperator<T, T>(flow, context, capacity, onBufferOverflow) {
override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> =
ChannelFlowOperatorImpl(flow, context, capacity, onBufferOverflow)

override fun dropChannelOperators(): Flow<T> = flow

override suspend fun flowCollect(collector: FlowCollector<T>) =
flow.collect(collector)
}

collect

核心是在他的 collect 方法里,前面 buffer 没有讲这个可选通道,这里用篇幅讲下我的理解:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 父类:ChannelFlowOperator
override suspend fun collect(collector: FlowCollector<T>) {
//【1】快路径:当通道创建是可选的时;
if (capacity == Channel.OPTIONAL_CHANNEL) {
//【*】注意 coroutineContext 来自调用 collect 函数【所在协程】的上下文;
val collectContext = coroutineContext
// 计算最终的上下文,这里:调度器和异常处理器总是被替换,其他上下文元素按特定规则合并,返回一个新的上下文;
val newContext = collectContext.newCoroutineContext(context)

// #1: 如果计算后的上下文和之前完全相同,那就直接传递下游的 FlowCollector,等价于没有 Flowon;
if (newContext == collectContext)
return flowCollect(collector)

// #2: 如果计算后的上下文和之前只有 Dispatchers 一样,其他的都不一样,那就使用无通道模式;
// 内部的逻辑是通过 withContextUndispatched 挂起函数在当前 Dispatchers 上启动了一个新协程;
if (newContext[ContinuationInterceptor] == collectContext[ContinuationInterceptor])
return collectWithContextUndispatched(collector, newContext)
}
//【2】慢路径:本次会创建基于 channel 的 Flow 对象;
super.collect(collector)
}

这里的 coroutineContext 来自调用 collect suspend 函数时,所在的协程的上下文,由于 Flow 的 collect 链路是反向自下而上执行的,所以当前的 flowOn 的 coroutineContext 来自下游:

1
2
3
4
5
6
7
8
// eg:我们在 viewmodelscope 里面执行如下代码:
flow { emit(1) }
// 第一个 flowOn 的 coroutineContext,来自下面第二个 flowOn 创建的协程 (produce)
.flowOn(Dispatchers.IO)
.filter { it > 5 }
// 第二个 flowOn 的 coroutineContext,来自终止操作符 collect 所在的协程(外部协程)
.flowOn(Dispatchers.IO + exceptionHandler)
.collect { println(it) } // <-- 这里启动收集,所在的是外部协程,分发器是 MAIN;
快路径–上下文完全相同【路径压缩】

上面第一个 flowOn 是一个例子,下面又是一个简单的例子:

1
2
3
4
5
6
7
8
9
10
// eg:我们在 viewmodelscope 里面执行如下代码:
withContext(Dispatchers.IO) { // 这里指定了外部协程是 IO 分发器;
flow {
emit(1)
}
.flowOn(Dispatchers.IO) // 这里是相同的 IO 分发器;
.collect {
println(it)
}
}

比如:withContext 切了线程到 IO,而 flowOn 又指定了线程池为 IO,这种情况,就会进入快路径 1,

  • 慢路径:创建中间层的 FlowCollector 对象,同时新建一个 channel 对象。

  • 快路径:直接传递下游的 FlowCollector 给上游 Flow,路径压缩。

1
2
override suspend fun flowCollect(collector: FlowCollector<T>) =
flow.collect(collector)
快路径–上下文只有分发器一样

下面是一个例子:上下文只有分发器一样,其他的不同(比如传入协程名称、指定异常处理器等等):

1
2
3
4
5
6
7
8
9
10
11
// eg:
withContext(Dispatchers.IO) { // 这里指定了外部协程是 IO 分发器;
flow {
emit(1)
}
// 也是在 IO,但是指定了异常处理器
.flowOn(Dispatchers.IO + CoroutineExceptionHandler { ... } + ...)
.collect {
println(it)
}
}

内部的逻辑是通过 withContextUndispatched 挂起函数在当前 Dispatchers 上启动了一个新协程:

1
2
3
4
5
6
7
8
9
10
11
internal suspend fun <T, V> withContextUndispatched(
newContext: CoroutineContext,
value: V,
countOrElement: Any = threadContextElements(newContext),
block: suspend (V) -> T
): T =
suspendCoroutineUninterceptedOrReturn { uCont ->
withCoroutineContext(newContext, countOrElement) { //【1】withCoroutineContext 启动了一个新协程;
block.startCoroutineUninterceptedOrReturn(value, StackFrameContinuation(uCont, newContext))
}
}

这其实是协程对 flowOn 的优化,在一些特定的场景下,减少 channel 的创建,缩短链路,提升性能。

produceImpl

当上下文不一样的话,那么会进入基于 channel 的 Flow 对象。

继续往下看,我们又走到了 produceImpl 这里,也就是 produce 创建协程的地方:

1
2
3
4
5
6
7
public open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
scope.produce(context, produceCapacity, onBufferOverflow, start = CoroutineStart.ATOMIC, block = collectToFun)

override suspend fun collect(collector: FlowCollector<T>): Unit =
coroutineScope {
collector.emitAll(produceImpl(this))
}

可以看到:

flowon 指定的分发器决定了 produce 启动的协程所在的线程。

到这里,我们就能初步得出 flowon 的原理:基于 buffer,通过使 produce 切换线程实现。

3 FusibleFlow - 融合 Flow

我们来聊聊 FusibleFlow 融合 Flow:

FusibleFlow 的目的是优化执行链。当 Flow 链路存在多个操作符的时候,协程库会将这些操作符融合到一起,减少不必要的上下文切换和资源调度,提升性能。

1
2
3
4
5
6
7
8
9
10
11
12
13
@InternalCoroutinesApi
public interface FusibleFlow<T> : Flow<T> {
/**
* This function is called by [flowOn] (with context) and [buffer] (with capacity) operators
* that are applied to this flow. Should not be used with [capacity] of [Channel.CONFLATED]
* (it shall be desugared to `capacity = 0, onBufferOverflow = DROP_OLDEST`).
*/
public fun fuse(
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = Channel.OPTIONAL_CHANNEL,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): Flow<T>
}

这个 fuse 方法主要在 Flow 操作符链式调用时被内部使用:

  • buffer().buffer()
  • flowOn().flowOn()
  • buffer().flowOn()
  • 任何连续的 ChannelFlow 操作符

可以这样理解,如果多个操作符创建的 Flow 对象实现了 FusibleFlow 接口,那么就会触发 fuse 操作符,实现融合;

由于 Flow 对象的创建是自上而下的,所以这个 this 是上游的 Flow 对象,this is FusibleFlow 说明上游是一个 buffer() 或者 flowon() 等其他操作符;

1
2
3
4
5
6
7
8
9
public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {
checkFlowContext(context)
return when {
context == EmptyCoroutineContext -> this
//【1】那么这里会执行融合操作,调用的上游 Flow 的方法:
this is FusibleFlow -> fuse(context = context) // context 参数传入
else -> ChannelFlowOperatorImpl(this, context = context)
}
}

fuse

融合的核心逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
// 上游 ChannelFlowOperatorImpl
internal class ChannelFlowOperatorImpl<T>(
flow: Flow<T>,
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = Channel.OPTIONAL_CHANNEL,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlowOperator<T, T>(flow, context, capacity, onBufferOverflow) {
// create:创建下游的 ChannelFlowOperatorImpl 新对象;
override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> =
//【1】注意这里的 flow,这个 flow 是一个非 FusibleFlow,比如 flow{}.buffer().flowOn()
// 那么这个 flow 是 flow{} 创建的 SafeFlow,而不是中间的 buffer() 的 channelFlow,因为要融合;
ChannelFlowOperatorImpl(flow, context, capacity, onBufferOverflow)

override fun dropChannelOperators(): Flow<T> = flow

override suspend fun flowCollect(collector: FlowCollector<T>) =
flow.collect(collector)
}


public override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): Flow<T> {
// 校验:capacity 不能是 CONFLATED。
assert { capacity != Channel.CONFLATED }

// 1、上下文合并:将传入的上下文与上游 Flow 的上下文合并;
val newContext = context + this.context

val newCapacity: Int
val newOverflow: BufferOverflow

// 2、针对于内部 channel 的溢出策略做调整;
// newCapacity 决定了 channelFlow 内部选择哪个 channel;
// newOverflow 决定了数据移除后的策略;
if (onBufferOverflow != BufferOverflow.SUSPEND) {
// 如果溢出策略不是 SUSPEND(比如是 DROP_OLDEST 等)
// 那么这个新的缓冲区永远不会挂起 => 完全覆盖之前的缓冲配置
newCapacity = capacity
newOverflow = onBufferOverflow

} else {
// 3、合并容量,但保持之前的溢出策略
newCapacity = when {
// 3.1 OPTIONAL_CHANNEL 会被其他类型覆盖;
// 如果其中一个容量是 OPTIONAL_CHANNEL,就使用另外一个;
this.capacity == Channel.OPTIONAL_CHANNEL -> capacity
capacity == Channel.OPTIONAL_CHANNEL -> this.capacity

// 3.2 BUFFERED 会被具体容量值替换(限制数据个数)
// 如果其中一个容量是 BUFFERED,就使用另外一个;
this.capacity == Channel.BUFFERED -> capacity
capacity == Channel.BUFFERED -> this.capacity

else -> {
assert { this.capacity >= 0 }
assert { capacity >= 0 }
// 3.3 合并容量,在溢出时限制为 UNLIMITED
val sum = this.capacity + capacity
// 如果和 >= 0 就使用和,否则使用无限容量(防止整数溢出)
if (sum >= 0) sum else Channel.UNLIMITED
}
}
// 保持之前的溢出策略
newOverflow = this.onBufferOverflow
}

// 3、如果合并后的配置与当前配置完全相同,就直接返回当前对象(优化)
if (newContext == this.context && newCapacity == this.capacity && newOverflow == this.onBufferOverflow)
return this

// 4、否则,使用合并后的配置创建新的 Flow,替换旧的 Flow 对象;
return create(newContext, newCapacity, newOverflow)
}

融合该怎么从代码逻辑上理解呢?

举个例子:flow{}.buffer().flowOn(),第二个 buffer() 阶段创建的 ChannelFlow 如下:

1
Flow flow = flow{} ---> ChannelFlow(flow + channel) ---> ... ...

第三个 flowOn() 阶段发现需要融合:那么会进行如下的调整:

1
2
Flow flow = flow{} ---> ChannelFlow(flow + channel) // 废弃
---> ChannelFlow2(flow + channel 更新溢出策略和容量) ---> ... ...

会直接根据基础的 flow 对象,新建一个新的 ChannelFlow,然后同步第二阶段的 channel 配置,更新生成第三阶段的 channel 的配置。

融合场景

简单看几个融合的例子:

多个 fusible 操作配置相同

配置一样,返回原来的 ChannelFlow 对象:

1
2
3
flow { emit(1) }
.buffer(2)
.buffer(2)

多个 fusible 操作容量融合

容量不同,会新建一个新的 ChannelFlow 对象,容量为 15:

1
2
3
flow { emit(1) }
.buffer(5)
.buffer(10)

4 总结

本篇文章简单分享了 fowon 的线程切换的原理;快路径、慢路径的区别,融合 Flow 的实现原理。

文章作者: Li Shuaiqi
文章链接: https://lishuaiqi.top/2025/10/16/kotlin/kotlin-cotoutine-coroutine_flow_flowon/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 Li Shuaiqi's Blog