kotlin 协程原理分析 - Flow 背压机制分析

0 前言

本片文章,简单总结下 Flow 背压操作符的原理。

主要从下面几个方面来分析下背压的原理。

1、背压操作符的类型

2、背压操作符的原理

1 Flow 背压操作符

Flow 的背压操作符主要用于处理数据生产速度与消费速度不匹配的问题,核心策略包括缓冲、丢弃旧数据或取消旧处理。以下是主要操作符及其作用:

buffer

  • 功能:缓存数据并控制流速,支持指定缓冲区大小和溢出策略。
  • 溢出策略:
    • SUSPEND(默认):挂起发射器直到缓冲区有空闲。
    • DROP_OLDEST:丢弃最旧数据。
    • DROP_LATEST:丢弃最新数据。
  • 示例:flow.buffer(10, BufferOverflow.SUSPEND)
1
2
3
4
5
6
7
flow<Int> {
emit(2)
}.buffer(5, BufferOverflow.SUSPEND).filter {
it > 5
}.collect {
print(it)
}

conflate

  • 功能:==丢弃旧数据,仅保留最新值==,不设缓冲区。
  • 适用场景:数据更新频繁但旧值无需保留(如UI刷新)。
  • 示例:flow.conflate()

collectLatest

  • 功能:==取消前一个未完成的处理,仅处理最新值==。
  • 适用场景:需确保最新数据优先处理(如网络请求)。
1
2
3
4
flowOf(1, 2, 3).collectLatest {
delay(1)
println(it) // Expect only 3 to be printed
}

2 buffer 操作符

我们来分析下 buffer 的核心原理:

  • capacity 表示容量,可以指定为具体的数字,也可以选择 Kotlin 协程中 Channel 工厂的伴生对象定义的一些参数:
  • BufferOverflow 背压策略;SUSPEND 发送方挂起、DROP_OLDEST 丢弃最旧的、DROP_LATEST 丢弃最新的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND): Flow<T> {
require(capacity >= 0 || capacity == BUFFERED || capacity == CONFLATED) {
"Buffer size should be non-negative, BUFFERED, or CONFLATED, but was $capacity"
}
require(capacity != CONFLATED || onBufferOverflow == BufferOverflow.SUSPEND) {
"CONFLATED capacity cannot be used with non-default onBufferOverflow"
}
// desugar CONFLATED capacity to (0, DROP_OLDEST)
var capacity = capacity
var onBufferOverflow = onBufferOverflow
//【1】如果是冲突类型的 channel,那么容量为 0,策略为 DROP_OLDEST。
if (capacity == CONFLATED) {
capacity = 0
onBufferOverflow = BufferOverflow.DROP_OLDEST
}
//【2】校验扩展对象的类型,当前的是 SafeFlow 对象,所以会新建一个 ChannelFlowOperatorImpl;
return when (this) {
is FusibleFlow -> fuse(capacity = capacity, onBufferOverflow = onBufferOverflow)
//【2.1】走入这里;
else -> ChannelFlowOperatorImpl(this, capacity = capacity, onBufferOverflow = onBufferOverflow)
}

buffer 的核心原理基于 kotlin 的 channel,参数 capacity 可以通过设置如下的值来确定使用哪个 channel。

可以看到,最后创建了一个 ChannelFlowOperatorImpl,包裹上游的 Flow 对象。

Channel 类型

1
public const val UNLIMITED: Int = Int.MAX_VALUE
  • 无限容量的 channel,使用 Int.MAX_VALUE 作为标识。
1
public const val RENDEZVOUS: Int = 0
  • 约会类型的 channel,sender 和 receiver 必须同时存在。
1
public const val CONFLATED: Int = -1
  • 只能存一个元素的 channel。
1
public const val BUFFERED: Int = -2
  • 基于队列的一个默认大小为 64 的缓存大小的 channel。

ChannelFlowOperatorImpl

我们知道 Flow 默认是冷流,也就是说,需要下游主动注入 FlowCollector 才能触发流的传递!

ChannelFlowOperatorImpl 本身也是一个 Flow,基于 Channel,关键在于 collect 方法,位于 ChannelFlowOperator 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
//【1】ChannelFlowOperatorImpl;
internal class ChannelFlowOperatorImpl<T>(
flow: Flow<T>,
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = Channel.OPTIONAL_CHANNEL,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlowOperator<T, T>(flow, context, capacity, onBufferOverflow) {
override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> =
ChannelFlowOperatorImpl(flow, context, capacity, onBufferOverflow)

override fun dropChannelOperators(): Flow<T> = flow

override suspend fun flowCollect(collector: FlowCollector<T>) =
flow.collect(collector)
}

//【2】ChannelFlowOperator;
internal abstract class ChannelFlowOperator<S, T>(
@JvmField protected val flow: Flow<S>,
context: CoroutineContext,
capacity: Int,
onBufferOverflow: BufferOverflow
) : ChannelFlow<T>(context, capacity, onBufferOverflow) {
... ... ...

//【2.1】下游的 FlowCollector 对象。
override suspend fun collect(collector: FlowCollector<T>) {
//【1】快路径:当通道创建是可选的时;
if (capacity == Channel.OPTIONAL_CHANNEL) {
//【*】注意 coroutineContext 来自调用 collect 函数所在的协程的上下文:
val collectContext = coroutineContext
val newContext = collectContext.newCoroutineContext(context) // 计算最终的上下文,后续比较;

// #1: 如果计算后的上下文和之前完全相同,那就直接传递下游的 FlowCollector,等价于没有 Flowon;
if (newContext == collectContext)
return flowCollect(collector)

// #2: 如果计算后的上下文和之前只有 Dispatchers 一样,其他的都不一样,那就使用无通道模式;
// 内部的逻辑是通过 withContextUndispatched 挂起函数在当前 Dispatchers 上启动了一个新协程;
if (newContext[ContinuationInterceptor] == collectContext[ContinuationInterceptor])
return collectWithContextUndispatched(collector, newContext)
}
//【2】慢路径:本次会创建基于 channel 的 Flow 对象;
super.collect(collector)
}
}


//【3】ChannelFlow
@InternalCoroutinesApi
public abstract class ChannelFlow<T>(
// upstream context
@JvmField public val context: CoroutineContext,
// buffer capacity between upstream and downstream context
@JvmField public val capacity: Int,
// buffer overflow strategy
@JvmField public val onBufferOverflow: BufferOverflow
) : FusibleFlow<T> {

... ... ...
override suspend fun collect(collector: FlowCollector<T>): Unit =
//【3.1】基于 coroutineScope 作用域,启动了一个协程,然后执行 produceImpl 和 emitAll;
coroutineScope {
collector.emitAll(produceImpl(this))
}
}

可以看到:collect 实际上是在父类 ChannelFlow 内部。

核心逻辑:基于 coroutineScope 作用域,启动了一个协程,然后执行 produceImpl 和 emitAll;

上游 produceImpl 协程创建

produceImpl 内部通过 produce 创建了一个协程序:

1
2
public open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
scope.produce(context, produceCapacity, onBufferOverflow, start = CoroutineStart.ATOMIC, block = collectToFun)

produce 会创建一个协程:内部会有一个 channel,协程体内部 send,然后返回一个 ReceiveChannel 对象,其他的协程在协程体内部 receive 即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// Internal version of produce that is maximally flexible, but is not exposed through public API (too many params)
internal fun <E> CoroutineScope.produce(
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
start: CoroutineStart = CoroutineStart.DEFAULT,
onCompletion: CompletionHandler? = null,
@BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E> {
//【1】这个就是核心的 channel;
val channel = Channel<E>(capacity, onBufferOverflow)
val newContext = newCoroutineContext(context)
//【2】这个是 produce 协程对应的 job,内部通过接口委托的方式来操作 channel;
val coroutine = ProducerCoroutine(newContext, channel)
if (onCompletion != null) coroutine.invokeOnCompletion(handler = onCompletion)
//【3】启动协程体;
coroutine.start(start, coroutine, block)
return coroutine
}

不多说。

协程体–collectToFun

collectToFun 内部为协程体,核心逻辑是:执行 collectTo 操作:

1
2
3
4
5
6
7
8
9
10
11
// ChannelFlow
internal val collectToFun: suspend (ProducerScope<T>) -> Unit
get() = { collectTo(it) }

// ChannelFlowOperator
protected override suspend fun collectTo(scope: ProducerScope<T>) =
flowCollect(SendingCollector(scope))

// ChannelFlowOperatorImpl
override suspend fun flowCollect(collector: FlowCollector<T>) =
flow.collect(collector)

可以看到这里创建了一个 SendingCollector 对象,然后向上游 Flow 对象传过去,这个在 Flow 的启动流程中有分析,不多说。

这样就能触发上游的 flow 发送数据!

那么 SendingCollector 的 emit 方法做了什么呢?

SendingCollector
1
2
3
4
5
6
@InternalCoroutinesApi
public class SendingCollector<T>(
private val channel: SendChannel<T>
) : FlowCollector<T> {
override suspend fun emit(value: T): Unit = channel.send(value)
}

果然:是把上游的数据通过 channel.send 进行发送:

可以看到,buffer 让上游的逻辑运行在 produce 创建的协程中了.

下游 FlowCollector.emitAll

再来看看下游该如何接受数据,注意参数:ReceiveChannel,这是前面 produceImpl 创建的 Channel:

也就是说,核心逻辑是从 channel 里面读取数据:

对于 emitAll 则是运行在另外一个协程中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public suspend fun <T> FlowCollector<T>.emitAll(channel: ReceiveChannel<T>): Unit =
emitAllImpl(channel, consume = true) // 进入到这里。

private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>, consume: Boolean) {
ensureActive()
var cause: Throwable? = null
try {
//【1】从 channel 中读取数据,这里使用了一个语法糖:for-in
for (element in channel) {
//【2】这里是调用 FlowCollector 的 emit 方法,传递数据流;
emit(element)
}
} catch (e: Throwable) {
cause = e
throw e
} finally {
if (consume) channel.cancelConsumed(cause)
}
}

最终会走到 channel 内部的逻辑中,for-in 循环遍历集合是通过迭代器(Iterator)来实现的。

1
2
3
4
5
6
// for-in 类似下面的代码:
val iterator = channel.iterator()
while (iterator.hasNext()) { // 可能在这里挂起
val element = iterator.next()
// 处理元素
}

我们假设,此时创建的 channel 是 BUFFERED 类型,对应的 BufferedChannel,那么其迭代器是 BufferedChannelIterator。

其他的 ChannelIterator 逻辑类似:

BufferedChannelIterator

我们来看看迭代的逻辑:

hasNext()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// `hasNext()` 本质上是一种特殊的接收操作
override suspend fun hasNext(): Boolean =
receiveImpl( // <-- 这是一个内联函数
// 初始时不创建协程体(continuation),等到真正需要时才创建
// 如果需要,稍后会通过 [onNoWaiterSuspend] 创建协程体
waiter = null,

//【1】将接收到的元素存在 receiveResult 中
// 同时通知 BufferedChannel 扩展此接收操作的同步已完成
onElementRetrieved = { element ->
this.receiveResult = element // 保存元素供 next() 使用
true // hasNext() 返回 true
}

// 由于没有提供等待者(waiter),所以挂起是不可能的
// 这个回调在正常情况下不应该被执行到
onSuspend = { _, _, _ -> error("unreachable") },

// 当通道已关闭时调用
onClosed = { onClosedHasNext() } // 返回 false 或抛出异常

//【2】如果 hasNext() 决定要挂起,则调用该挂起函数;
onNoWaiterSuspend = { segm, i, r ->
// 真正的挂起点;
return hasNextOnNoWaiterSuspend(segm, i, r) // 尾调用优化
}
)

这最关键的就是:

  • onElementRetrieved:用于保存下一个要访问的元素。
  • onNoWaiterSuspend:在需要挂起的时候,会执行,然后进入挂起状态。
  • 尾调用优化: 在挂起情况下使用尾调用,避免栈帧增长;

当 channel 为空时,receiveImpl 会调用 onNoWaiterSuspend 回调,这里使用了 return 来从当前函数返回,进入真正的挂起函数。

receiveImpl - 接收总入口

receiveImpl 是一个内联函数,所以会直接展开,可以直接访问到外部的 Conitnuation 对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
private inline fun <R> receiveImpl(
waiter: Any?,
onElementRetrieved: (element: E) -> R,
onSuspend: (segm: ChannelSegment<E>, i: Int, r: Long) -> R,
onClosed: () -> R,
onNoWaiterSuspend: (
segm: ChannelSegment<E>,
i: Int,
r: Long
) -> R = { _, _, _ -> error("unexpected") }
): R {
// 在计数器递增之前读取段引用;
// 这对于稍后能够找到所需的段至关重要
var segment = receiveSegment.value

//【1】主循环:在需要重试操作时进行自旋操作;;
while (true) {
// 类似于 `send(e)` 操作,`receive()` 首先检查
// 通道是否已经关闭接收
if (isClosedForReceive) return onClosed()

val r = this.receivers.getAndIncrement()

//【2】计算所需的段 ID 和其中的单元格索引
// 如果段不合适,那就自旋再寻找;
val id = r / SEGMENT_SIZE
val i = (r % SEGMENT_SIZE).toInt()

if (segment.id != id) {
segment = findSegmentReceive(id, segment) ?:
continue
}

//【3】遍历查询需要的数据;
val updCellResult = updateCellReceive(segment, i, r, waiter)

//【4】根据结果处理不同情况
return when {
updCellResult === SUSPEND -> {
//【4.1】这里是有 waiter 的情况,并将指定的等待者存储在单元格中;
(waiter as? Waiter)?.prepareReceiverForSuspension(segment, i)
onSuspend(segment, i, r)
}
updCellResult === FAILED -> {
// 失败了;
if (r < sendersCounter) segment.cleanPrev()
continue // 继续循环,重试操作
}
updCellResult === SUSPEND_NO_WAITER -> { // 表示即将挂起;
//【4.2】--> 没有 waiter,所以进入这里,也就是表示要挂起了;
onNoWaiterSuspend(segment, i, r)
}
else -> {
//【4.3】element - 成功获取到元素
segment.cleanPrev()
@Suppress("UNCHECKED_CAST")
onElementRetrieved(updCellResult as E)
}
}
}
}

// 回顾:如果 hasNext() 决定要挂起,则调用该挂起函数
// 这里应用了尾调用优化(tail-call optimization)
onNoWaiterSuspend = { segm, i, r ->
return hasNextOnNoWaiterSuspend(segm, i, r)
}

对于 bufferchannel 的逻辑,这篇文章先不做讨论,后续会单独写一篇文章来分析。

这里能看出:

  • 如果有数据的话,那么会直接会 return 内部链表中的数据。
  • 否则会返回 suspend,表示即将挂起状态。

注意:

这个 SUSPEND/SUSPEND_NO_WAITER 并不是真正的挂起,这是一个 channel 内部的状态。

原因:receiveImpl 不是 suspend 函数,挂起的条件首先必须是 suspend 函数!因为编译器只会识别 suspend 函数。

真正导致挂起的是在 hasNextOnNoWaiterSuspend 函数中。

hasNextOnNoWaiterSuspend – 真正的挂起 suspend

如果要挂起的话,根据逻辑 hasNextOnNoWaiterSuspend 会被调用,这里真正的挂起:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private suspend fun hasNextOnNoWaiterSuspend(
segment: ChannelSegment<E>,
index: Int,
r: Long
): Boolean = suspendCancellableCoroutineReusable { cont ->
this.continuation = cont //【1】这个表示当前所在的协程体 (但其实是一个 wrapper 对象);

receiveImplOnNoWaiter( // 同样的是内联函数;
segment = segment, index = index, r = r,
waiter = this, //【2】迭代器自己,这次提供真实的等待者;

onElementRetrieved = { element -> //【3】获取到元素后的回调;
this.receiveResult = element
this.continuation = null

//【4】恢复外部的协程体;
cont.resume(true, onUndeliveredElement?.bindCancellationFun(element, cont.context))
},
onClosed = { onClosedHasNextNoWaiterSuspend() }
)
}
  • suspendCancellableCoroutineReusable 创建 CancellableContinuationImpl,包裹外部协程体;
  • cont 保存到 continuation 字段;
  • 注册迭代器自己 (this) 为等待者,让发送方知道有接收者在等待;

注册的核心逻辑是在:receiveImplOnNoWaiter 方法中。

当 channel 的缓冲区为空的时候,send 和 receive 的关系

简单的总结下 send 和 receive 的逻辑关系,当 channel 的缓冲区为空的时候:

  • 接收者

    • 调用 iterator.hasNext()

    • receiveImpl 发现缓冲区为空

    • 调用 hasNextOnNoWaiterSuspend

    • 保存外部协程体到 continuation 字段

    • receiveImplOnNoWaiter 注册等待者到 channel

    • 协程挂起。【这里就是 collector.emitAll 的挂起逻辑】。

1
2
3
4
5
6
7
8
9
10
11
12
13
iterator.hasNext() 调用
→ receiveImpl(waiter = null) // 初始调用
→ updateCellReceive 中尝试获取数据,结果木有,需要挂起,调用 onNoWaiterSuspend
→ hasNextOnNoWaiterSuspend 创建协程体
→ receiveImplOnNoWaiter(waiter = this) // 这次传入真实的等待者,后续作为唤醒;
→ updateCellReceive
→ state === null && r >= senders // 确认需要挂起
→ segment.casState(null → waiter) 成功
→ expandBuffer() // 扩展缓冲区逻辑
→ 返回 SUSPEND
→ waiter.prepareReceiverForSuspension() // 挂起准备
→ 从 receiveImplOnNoWaiter 返回
→ 从 hasNextOnNoWaiterSuspend 返回,协程挂起
  • 发送者

    • 发送者调用 tryResumeHasNext(element)

    • 获取到 continuation 对象

    • 清理 continuation 字段

    • 保存元素到 receiveResult

    • cont.resume(true) 恢复协程执行

    • 协程从挂起点继续执行,返回 true。【这里就是 collector.emitAll 的挂起逻辑】

1
2
3
4
5
6
7
8
9
发送者调用 send 相关方法
→ 发现单元格中有等待者 (waiter)
→ 调用 waiter.tryResumeHasNext(element)
→ 从 continuation 字段获取协程体
→ 清理 continuation = null
→ 保存元素 receiveResult = element
→ cont.tryResume0(true, ...) 恢复协程体
→ 协程从挂起点继续执行
→ hasNext() 返回 true
receiveImplOnNoWaiter – 挂起入口–>无法获取元素就准备进入挂起状态

receiveImplOnNoWaiter 这个函数是用于在无法立即获取元素时,将等待者(waiter)注册到通道的特定位置,并在适当的时候恢复,参数:

  • segment: 当前操作的通道段。
  • index: 段内的索引,指定具体的单元格。
  • r: 单元格的全局索引。
  • waiter: 要注册的等待者,通常是迭代器本身,很简单,当有数据时,waiter 要被唤醒继续执行。
  • onElementRetrieved: 成功获取元素时的回调。
  • onClosed: 通道关闭时的回调。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private inline fun receiveImplOnNoWaiter(
segment: ChannelSegment<E>,
index: Int,
r: Long,
waiter: Waiter,
onElementRetrieved: (element: E) -> Unit,
onClosed: () -> Unit
) {
//【1】关键步骤:尝试更新单元格状态;
val updCellResult = updateCellReceive(segment, index, r, waiter)

when {
updCellResult === SUSPEND -> {
//【2】情况 1:成功注册为等待者,准备挂起;
waiter.prepareReceiverForSuspension(segment, index)
}
updCellResult === FAILED -> {
//【3】情况 2:CAS 乐谷锁更新失败,需要重试;
if (r < sendersCounter) segment.cleanPrev()
receiveImpl( // 重新调用通用实现;
waiter = waiter,
onElementRetrieved = onElementRetrieved,
onSuspend = { _, _, _ -> }, // 空的挂起处理;
onClosed = onClosed
)
}
else -> {
//【4】情况 3:立即获得了元素,不需要挂起了;
segment.cleanPrev()
@Suppress("UNCHECKED_CAST")

onElementRetrieved(updCellResult as E)
}
}
}
updateCellReceive

该方法是从 channel 中尝试获取元素,如果能获取到,那就返回;无法获取,就注册 Node 节点到 channel:

这个方法在获取数据和注册 waiter 的时候都会调用到。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private fun updateCellReceive(
segment: ChannelSegment<E>,
index: Int,
r: Long,
waiter: Any?,
): Any? {
val state = segment.getState(index)

when {
//【1】情况 1:单元格为空,这是注册 waiter
state === null -> {
val senders = sendersAndCloseStatus.value.sendersCounter
if (r >= senders) { //【2】这里是确认需要挂起了。
//【3】这里是 hasNext 第一次会进入的地方,入口是 receiveImpl。
if (waiter === null) {
return SUSPEND_NO_WAITER
}
//【4】尝试安装等待者,这里就是要挂起,入口是 receiveImplOnNoWaiter;
if (segment.casState(index, state, waiter)) {
expandBuffer() // 扩展缓冲区;
return SUSPEND // 成功挂起;
}
}
}
// 情况 2:单元格中有缓冲的元素,立即返回元素;
state === BUFFERED -> if (segment.casState(index, state, DONE_RCV)) {
expandBuffer() // 扩展缓冲区;
return segment.retrieveElement(index) // 立即返回元素;
}
}
// 情况 3:快速路径失败,进入慢速路径,内部通过 CAS 插入和修改;
return updateCellReceiveSlow(segment, index, r, waiter)
}

这里是有一些优化操作:

  • CAS 乐观锁,提升性能;

  • 快速路径优化:大多数情况在快速路径中处理,这个后续单独开一片来分析。

总结

核心点是:通过 produce 创建了一个协程,通过 channel 来实现数据交互。

  • 下游从 channel 中读取数据;
  • 上游在 produce 创建的协程体中,向 channel 发送数据;

这也就是 buffer 的高效地方:在不同的协程中实现 send 和 receive 方法。

channel 常见的 3 种恢复机制如下,emitAll 这个属于第一种,当发送方 send 的时候,就会触发 tryResumeHasNext:

  1. 发送者发送元素tryResumeHasNext(element)
  2. 通道关闭tryResumeHasNextOnClosedChannel()
  3. 协程取消invokeOnCancellation()

3 conflate 操作符

我们来看看 conflate 的源码:

1
public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED)

其实就是 buffer 换皮而已。

CONFLATED 决定了内部 channel 类型为 CONFLATED,这种类型的 channel 只能存一个值,新值会覆盖旧值。

4 collectLatest 操作符

collectLatest 的话,其可以做到:取消前一个未完成的处理,仅处理最新值

只处理流中的最新值,当新值到来时,如果前一个值的处理还在进行,会立即取消前一个处理并开始处理新值。

1
2
3
4
5
6
7
8
public suspend fun <T> Flow<T>.collectLatest(action: suspend (value: T) -> Unit) {
//【1】buffer 的参数:RENDEZVOUS 表示约会类型的 channel,表示接受者和发送者必须都存在!
mapLatest(action).buffer(0).collect()
}

@ExperimentalCoroutinesApi
public fun <T, R> Flow<T>.mapLatest(@BuilderInference transform: suspend (value: T) -> R): Flow<R> =
transformLatest { emit(transform(it)) }

可以看到

  • 先通过 mapLatest 做一次数据转换,然后调用 buffer 继续处理;

  • buffer 参数为 0 表示 RENDEZVOUS,也就是约会类型的 channel,接受者和发送者必须都存在;

mapLatest(action)

作用:

  • 将原始流 Flow<T> 转换为 Flow<Unit>
  • 对每个值应用 action,但如果新值到达时前一个 action 还在执行,会取消前一个 action
  • 返回一个新的流,其中每个元素代表一个 action 调用的开始

ChannelFlowTransformLatest

可以看到 transformLatest 新建了一个 ChannelFlowTransformLatest 对象,也是基于 ChannelFlow 对象的,和 buffer 很类似;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@ExperimentalCoroutinesApi
public fun <T, R> Flow<T>.transformLatest(@BuilderInference
transform: suspend FlowCollector<R>.(value: T) -> Unit): Flow<R> =
ChannelFlowTransformLatest(transform, this)

// ChannelFlowTransformLatest;
internal class ChannelFlowTransformLatest<T, R>(
private val transform: suspend FlowCollector<R>.(value: T) -> Unit,
flow: Flow<T>,
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = Channel.BUFFERED,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlowOperator<T, R>(flow, context, capacity, onBufferOverflow) {
override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<R> =
ChannelFlowTransformLatest(transform, flow, context, capacity, onBufferOverflow)

ChannelFlowTransformLatest 是 ChannelFlowOperator 的子类。

关键在 flowCollect 方法中。

flowCollect– 核心

在 flowCollect 中进行 FlowCollector 的注入的时候,会在中间的 FlowCollector 的 emit 方法内直接 launch 一个新的协程。

并将这个协程保存到 previousFlow 里,那么如果是多个值的话,上一个 previousFlow 可以被取消;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
override suspend fun flowCollect(collector: FlowCollector<R>) {
assert { collector is SendingCollector } // So cancellation behaviour is not leaking into the downstream
coroutineScope {
var previousFlow: Job? = null
flow.collect { value ->
previousFlow?.apply {
cancel(ChildCancelledException())
join()
}
//【1】能够保存当前的协程 job;
previousFlow = launch(start = CoroutineStart.UNDISPATCHED) {
//【2】触发下游的 collector 的 emit 方法;
collector.transform(value)
}
}
}
}

5 总结

OK,关于 Flow 背压机制,到这里就分析完毕了,回见~

文章作者: Li Shuaiqi
文章链接: https://lishuaiqi.top/2025/09/26/kotlin/kotlin-cotoutine-coroutine_flow_backpress/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 Li Shuaiqi's Blog