kotlin 协程原理分析 - Flow 原理分析

0 前言

本片文章,简单总结下 Flow 启动流程和原理。

1 Flow 启动方式

Flow 作为协程体系下的流式加载库,提供了比 Rxjava 更加简洁的功能,同时其和谷歌全家桶完美结合,可以帮大家写出架构更加完美的程序。

Flow 常见的启动方式如下:

flowOf

该操纵符号将多个同类:

1
2
3
4
5
public fun <T> flowOf(vararg elements: T): Flow<T> = flow {
for (element in elements) {
emit(element)
}
}

flow{}

这个是 flow 标准启动操作:

1
2
3
flow<Int> { 
emit(1)
}

asFlow()

这是集合的 flow 转换操作:

1
2
3
4
5
6
val list = mutableListOf<Int>().asFlow()
public fun <T> Iterable<T>.asFlow(): Flow<T> = flow {
forEach { value ->
emit(value)
}
}

核心最后会到 flow{} 这个操作符这里。

2 Flow 启动流程

Flow 和 Rxjava 设计理念很一模一样,整个流程分为:自上而下、自下而上,自上而下。

1
2
3
4
5
flow<Int> {
emit(1)
}.map { value -> value + 5 }
.collect {
}

我们通过上面的例子来简单看看:

2.1 流对象链-自上而下

我们从 flow\ 开始讲起,这个过程收集器、分发器也会一并分析下;

flow{}

可以看到 flow 操作符,新建了一个 SafeFlow 对象,包裹了 block:

1
2
3
4
5
flow<Int> { 
emit(1)
}

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

这里的 block 是一个 suspend 类型的闭包,所以一定会编译成一个 SuspendLambda 对象,也就是协程体,但他并不是一个完整的协程,没有关联 Job。

我们可以在内部调用 emit 挂起函数,触发流数据传播。

SafeFlow

继承了 AbstractFlow,内部重写了 collectSafely 方法:

1
2
3
4
5
6
// Named anonymous object
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
override suspend fun collectSafely(collector: FlowCollector<T>) {
collector.block()
}
}

collect 方法:

这里创建了一个 SafeCollector 对象,包裹下游的 FlowCollector 对象;

1
2
3
4
5
6
7
8
9
10
11
// AbstractFlow
public final override suspend fun collect(collector: FlowCollector<T>) {
//【1】包裹下游的 FlowCollector 对象
val safeCollector = SafeCollector(collector, coroutineContext)
try {
//【2】执行处理;
collectSafely(safeCollector)
} finally {
safeCollector.releaseIntercepted()
}
}

可以看到 collect 接收一个下游传递上来的 FlowCollector 对象。

然后执行 safeCollector.block(),block 本身是 FlowCollector 的扩展函数,因此,可以在收集器内部执行 emit 方法了,也就是触发数据的分发。

safeCollector.emit 方法内部会继续执行:collector.emit。

这样就会实现数据的分发;

map{…}

继续看中间操作符 map,他负责将 T 类型的数据转成 R:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R> 
= transform { value ->
//【1】对上游的数据,做转换 transform(value),emit 发送给下游;
return@transform emit(transform(value))
}

@PublishedApi
internal inline fun <T, R> Flow<T>.unsafeTransform(
@BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> = unsafeFlow {
//【2】这里:新建中间 FlowCollector 对象,依赖注入给上游 Flow<T> 的 collect 方法;
// this 是上游的 Flow~~
collect { value ->
//【3】接收上游的数据,分发给下游;
return@collect transform(value)
}
}

@PublishedApi
internal inline fun <T> unsafeFlow(@BuilderInference crossinline block: suspend FlowCollector<T>.() -> Unit): Flow<T> {
//【4】这里新建了一个 Flow 对象,参数 collector 下游传递上来;
return object : Flow<T> {
override suspend fun collect(collector: FlowCollector<T>) {
collector.block()
}
}
}

可以看到,中间操作符 map 是新建了一个 Flow 对象,间接可以访问上游的 Flow 对象。

为了方便分析下面的链路,这里可以一起梳理下:

Flow 默认是属于冷流,所以需要下游的 FlowCollector 对象才能机会数据分发。

  • unsafeFlow 方法创建的 Flow 的 collect 传入一个 FlowCollector 对象1,触发了下游注入;
  • 基于下游 FlowCollector 执行 unsafeFlow 的闭包参数;
  • 上游的 Flow 对象的 collect 被调用,新建一个 FlowCollector 对象2,将对象2调用上游 Flow collect 方法向上游注入;
  • 对象2 的 emit 方法,执行 map 转换,调用下游的 emit 方法;

这里简单讲了下上下游的交互原理。

继续往下看:

collect

collect 就是我们的终止操作符,需要我们新建一个 FlowCollector 接收数据,然后向上游传递:

1
2
3
public interface Flow<out T> {
public suspend fun collect(collector: FlowCollector<T>)
}

到这里,流对象链就分析完了。

核心点

  • 下游的 Flow 间接访问上游的 Flow,如果有多个中间操作符,那就会有多个中间 Flow 对象。

2.2 收集器链-自上而下

回顾一下:

Flow 默认是属于冷流,所以需要下游的 FlowCollector 对象才能机会数据分发。

  • unsafeFlow 方法创建的 Flow 的 collect 传入一个 FlowCollector 对象1,触发了下游注入;
  • 基于下游 FlowCollector 执行 unsafeFlow 的闭包参数;
  • 上游的 Flow 对象的 collect 被调用,新建一个 FlowCollector 对象2,将对象2调用上游 Flow collect 方法向上游注入;
  • 对象2 的 emit 方法,执行 map 转换,调用下游的 emit 方法;

这个地方已经讲了收集器链该如何建立:

  • 当我们调用终止操作符的时候,终止 FlowCollector 对象会通过执行就近的 Flow collect 方法向上游传递。

  • 每经过一个中间 Flow 对象,都会新建一个中间 FlowCollector 对象,在将该中间 FlowCollector 对象向上游传递。

这个就是收集器链的建立过程。

核心点

  • 上游的 FlowCollector 包裹下游的 FlowCollector

deepseek_mermaid_20251117_76d00a

2.3 分发器链-自上而下

核心点

最后就是分发器链,当我们建立起 FlowCollector 链后,上游的 FlowCollector 就会调用下游 FlowCollector 的 emit 方法。

在调用的过程中执行中间操作符号的转换数据操作。

3 总结

简单总结:

  • 流对象链自上而下:建立 Flow 的包裹状态;
  • 收集器链自下而上:建立 FlowCollector 的包裹状态;
  • 分发器链自上而下:通过 FlowCollector emit 方法,执行数据的链式发送和数据转换;
文章作者: Li Shuaiqi
文章链接: https://lishuaiqi.top/2025/07/15/kotlin/kotlin-cotoutine-coroutine_flow/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 Li Shuaiqi's Blog