// returns NO_VALUE if cannot take value without suspension
private fun tryTakeValue(slot: SharedFlowSlot): Any? {
    var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
    val value = synchronized(this) {
        //【1】Resolve the index this collector should read from, via its SharedFlowSlot.
        val index = tryPeekLocked(slot)
        if (index < 0) {
            // -1 means the collector must suspend.
            NO_VALUE
        } else {
            //【2】Read the value at the current index and advance the slot to the next index.
            val oldIndex = slot.index
            val newValue = getPeekedValueLockedAt(index)
            slot.index = index + 1 // points to the next index after peeked one
            //【3】Update the collector index and gather the waiting emitter coroutines that can now be resumed,
            //     which improves concurrency.
            resumes = updateCollectorIndexLocked(oldIndex)
            newValue
        }
    }
    //【4】Resume the waiting emitter coroutines outside of the lock.
    for (resume in resumes) resume?.resume(Unit)
    return value
}
// returns -1 if cannot peek value without suspension
private fun tryPeekLocked(slot: SharedFlowSlot): Long {
    // return buffered value if possible
    val index = slot.index
    //【1】If the current index is below the end of the buffered data, the buffer has data to read, so return the index.
    //     Otherwise there is either no readable data in the buffer, or no buffer at all.
    if (index < bufferEndIndex) return index
    //【2】Check whether a buffer exists (bufferCapacity is configured via the constructor parameters).
    //     If there is a buffer but no buffered data, return -1 to suspend: the collector never tries to rendezvous
    //     with emitters, i.e. it never receives values directly from them.
    if (bufferCapacity > 0) return -1L // if there's a buffer, never try to rendezvous with emitters
    //【3】No buffer is configured, so this is a synchronous shared flow.
    //     head is the start index of the buffered data; index > head means this collector is behind in order and
    //     cannot receive the value immediately. Return -1 to suspend, so that only the first collector ever
    //     rendezvouses with the emitter.
    // Synchronous shared flow (bufferCapacity == 0) tries to rendezvous
    if (index > head) return -1L // ... but only with the first emitter (never look forward)
    //【4】If there are no waiting emitters (the queue is empty), there is nothing to rendezvous with; return -1 to suspend.
    if (queueSize == 0) return -1L // nothing there to rendezvous with
    //【5】All conditions are met: rendezvous directly with the first waiting emitter.
    return index // rendezvous with the first emitter
}
Here rendezvous literally means "meeting up"; put simply:
the emitter hands the value directly to the collector without going through a buffer — this is the synchronous shared flow.
In a synchronous shared flow (no buffer):
When an emitter sends a value and no subscriber is ready to receive it, the emitter suspends.
When a subscriber tries to read and no value is available but an emitter is waiting, the two can "rendezvous" directly.
This rendezvous lets the value pass straight from the emitter to the subscriber, bypassing the buffer.
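To make the rendezvous concrete, here is a minimal sketch (assuming only the public MutableSharedFlow API from kotlinx.coroutines) with replay = 0 and extraBufferCapacity = 0, i.e. bufferCapacity == 0:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    // replay = 0, extraBufferCapacity = 0 -> synchronous shared flow (bufferCapacity == 0)
    val flow = MutableSharedFlow<Int>(replay = 0, extraBufferCapacity = 0)

    val collector = launch {
        // The collector reaches tryTakeValue/tryPeekLocked, finds nothing, and suspends.
        println("received: ${flow.first()}")
    }
    yield() // let the collector subscribe and suspend first

    // tryEmit fails (no buffer), so emit() parks an Emitter in the queue;
    // the waiting collector rendezvouses with it and takes the value directly.
    flow.emit(1)
    collector.join()
}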
getPeekedValueLockedAt
private fun getPeekedValueLockedAt(index: Long): Any? =
    //【1】Read the element at the given index from the circular buffer.
    when (val item = buffer!!.getBufferAt(index)) {
        //【2】If it is an Emitter wrapper, unwrap the value from it.
        is Emitter -> item.value
        //【3】Otherwise the slot holds the buffered value itself.
        else -> item
    }
// Emitter
private class Emitter(
    @JvmField val flow: SharedFlowImpl<*>,
    @JvmField var index: Long,
    @JvmField val value: Any?,
    @JvmField val cont: Continuation<Unit> // continuation of the suspended emitter coroutine
) : DisposableHandle {
    override fun dispose() = flow.cancelEmitter(this)
}
//【1】No replay cache is configured, so the value is simply dropped and the emit is reported as successful.
if (replay == 0) return true
//【2】Put the value into the buffer.
enqueueLocked(value)
bufferSize++
//【3】Trim the replayed data according to the backpressure (overflow) mechanism.
if (bufferSize > replay) dropOldestLocked()
//【4】There are no collectors, so none of the buffered data needs to be consumed; move minCollectorIndex to the end of the buffer.
minCollectorIndex = head + bufferSize
return true
}
The above is the analysis of the enqueue logic for the no-collector case.
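As a quick illustration of this no-collector path (a sketch using only the public MutableSharedFlow API), tryEmit always reports success here; whether the value survives depends solely on replay:

import kotlinx.coroutines.flow.MutableSharedFlow

fun main() {
    // No collectors are attached, so emissions take the path analyzed above.
    val noReplay = MutableSharedFlow<Int>(replay = 0)
    println(noReplay.tryEmit(1))    // true -- but the value is simply dropped
    println(noReplay.replayCache)   // []

    val withReplay = MutableSharedFlow<Int>(replay = 2)
    withReplay.tryEmit(1)
    withReplay.tryEmit(2)
    withReplay.tryEmit(3)           // bufferSize > replay -> dropOldestLocked() evicts 1
    println(withReplay.replayCache) // [2, 3]
}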
Comparison of the three backpressure (BufferOverflow) strategies:

| Strategy    | Behavior when the buffer is full and blocked by slow collectors  | Return value | Emitter state                           |
|-------------|-------------------------------------------------------------------|--------------|-----------------------------------------|
| SUSPEND     | The emitter must suspend and wait; nothing is emitted              | false        | Suspends until space becomes available  |
| DROP_LATEST | The newly emitted value is dropped                                 | true         | Continues; the new value is lost        |
| DROP_OLDEST | The new value is accepted and the oldest buffered value is dropped | true         | Continues; the oldest value is lost     |
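A small sketch contrasting the three strategies (the slow collector and the value counts are purely illustrative): a subscriber that never finishes processing keeps minCollectorIndex pinned, so a buffer of size replay = 2 fills up and the overflow strategy decides what happens next:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    for (overflow in listOf(BufferOverflow.SUSPEND, BufferOverflow.DROP_LATEST, BufferOverflow.DROP_OLDEST)) {
        val flow = MutableSharedFlow<Int>(replay = 2, onBufferOverflow = overflow)
        // UNDISPATCHED: the subscriber attaches immediately, then never finishes processing a value,
        // keeping minCollectorIndex pinned so the buffer (capacity == replay == 2) fills up.
        val slowCollector = launch(start = CoroutineStart.UNDISPATCHED) {
            flow.collect { awaitCancellation() }
        }
        val accepted = (1..4).map { flow.tryEmit(it) }
        println("$overflow: tryEmit -> $accepted, replayCache -> ${flow.replayCache}")
        slowCollector.cancel()
    }
    // Expected output (values illustrative):
    //   SUSPEND:     tryEmit -> [true, true, false, false], replayCache -> [1, 2]
    //   DROP_LATEST: tryEmit -> [true, true, true, true],   replayCache -> [1, 2]
    //   DROP_OLDEST: tryEmit -> [true, true, true, true],   replayCache -> [3, 4]
}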
Buffer structure analysis
The core data structures inside SharedFlow are as follows:
// Stored state
private var buffer: Array<Any?>? = null // allocated when needed, allocated size always power of two
private var replayIndex = 0L // minimal index from which new collector gets values
private var minCollectorIndex = 0L // minimal index of active collectors, equal to replayIndex if there are none
private var bufferSize = 0 // number of buffered values
private var queueSize = 0 // number of queued emitters
// Computed state
private val head: Long get() = minOf(minCollectorIndex, replayIndex)
private val replaySize: Int get() = (head + bufferSize - replayIndex).toInt()
private val totalSize: Int get() = bufferSize + queueSize
private val bufferEndIndex: Long get() = head + bufferSize
private val queueEndIndex: Long get() = head + bufferSize + queueSize
/*
               buffered values
          /-----------------------\
                       replayCache     queued emitters
                      /----------\/-----------------------\
      +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
      |   | 1 | 2 | 3 | 4 | 5 | 6 | E | E | E | E | E | E |   |   |   |
      +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
          ^           ^           ^                       ^
          |           |           |                       |
       1. head        |    3. bufferEndIndex       4. queueEndIndex
          |           |     == head + bufferSize    == head + totalSize
 index of the slowest |   index of the fastest
  possible collector  |    possible collector
          |           |
          |     2. replayIndex == new collector's index
          \-----------/
       range of possible 5. minCollectorIndex

       head == minOf(minCollectorIndex, replayIndex) // by definition
       totalSize == bufferSize + queueSize // by definition

    INVARIANTS:
       minCollectorIndex = activeSlots.minOf { it.index } ?: (head + bufferSize)
       replayIndex <= head + bufferSize
*/
Based on the diagram, let's briefly walk through the pieces.
The buffer's core storage
private var buffer: Array<Any?>? = null // allocated when needed, allocated size always power of two
private var minCollectorIndex = 0L // minimal index of active collectors, equal to replayIndex if there are none
/*
 INVARIANTS:
    minCollectorIndex = activeSlots.minOf { it.index } ?: (head + bufferSize)
*/
Data that can be dropped: entries with index < minCollectorIndex and index < replayIndex (no longer read by any collector and no longer replayed).
Data that must be retained:
entries with index ≥ minCollectorIndex (still needed by active collectors)
entries with index ≥ replayIndex (the replay cache)
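A purely illustrative worked example of these retention rules (all numbers are hypothetical):

// Suppose the slowest collector sits at index 4 and the replay cache starts at index 6:
//   minCollectorIndex = 4, replayIndex = 6
//   head = minOf(minCollectorIndex, replayIndex) = 4
// Entries with index < 4 are neither collected nor replayed, so they can be dropped;
// entries in [4, 6) are kept only for the slow collector;
// entries with index >= 6 are kept for collectors and for the replay cache.
fun headOf(minCollectorIndex: Long, replayIndex: Long): Long =
    minOf(minCollectorIndex, replayIndex)

fun main() {
    println(headOf(minCollectorIndex = 4, replayIndex = 6)) // 4 -> everything below index 4 is droppable
}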
Buffer ranges
Building on the indices above, the buffer breaks down into the following ranges:
replay cache
private val replaySize: Int get() = (head + bufferSize - replayIndex).toInt()
It sits near the tail of the buffer and provides new subscribers with historical ("sticky") data; its size is configured by the replay parameter.
override val replayCache: List<T>
    get() = synchronized(this) {
        val replaySize = this.replaySize
        if (replaySize == 0) return emptyList()
        val result = ArrayList<T>(replaySize)
        val buffer = buffer!! // must be allocated, because replaySize > 0
        @Suppress("UNCHECKED_CAST")
        // Here you can see the range: it starts at replayIndex and spans replaySize elements.
        for (i in 0 until replaySize) result += buffer.getBufferAt(replayIndex + i) as T
        result
    }
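From the public API side, a quick usage sketch (the values are illustrative): replayCache exposes exactly this range, and a new collector is served from replayIndex first:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val flow = MutableSharedFlow<String>(replay = 2)
    flow.tryEmit("a"); flow.tryEmit("b"); flow.tryEmit("c")
    println(flow.replayCache)      // [b, c] -- "a" fell out of the replay range
    println(flow.take(2).toList()) // [b, c] -- a new collector starts from replayIndex
}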
buffered values
private var bufferSize = 0 // number of buffered values
// StateFlowSlot
private class StateFlowSlot : AbstractSharedFlowSlot<StateFlowImpl<*>>() {
    /**
     * Each slot can have one of the following states:
     *
     * * `null` -- it is not used right now. Can [allocateLocked] to new collector.
     * * `NONE` -- used by a collector, but neither suspended nor has pending value.
     * * `PENDING` -- pending to process new value.
     * * `CancellableContinuationImpl<Unit>` -- suspended waiting for new value.
     *
     * It is important that default `null` value is used, because there can be a race between allocation
     * of a new slot and trying to do [makePending] on this slot.
     */
    private val _state = atomic<Any?>(null)

    override fun allocateLocked(flow: StateFlowImpl<*>): Boolean {
        // No need for atomic check & update here, since allocated happens under StateFlow lock
        if (_state.value != null) return false // not free
        _state.value = NONE // allocated
        return true
    }

    override fun freeLocked(flow: StateFlowImpl<*>): Array<Continuation<Unit>?> {
        _state.value = null // free now
        return EMPTY_RESUMES // nothing more to do
    }
    fun takePending(): Boolean = _state.getAndSet(NONE)!!.let { state ->
        assert { state !is CancellableContinuationImpl<*> } // can be NONE or PENDING
        //【1】PENDING means there is a new value ready to be processed.
        return state === PENDING
    }
@Suppress("UNCHECKED_CAST") suspend fun awaitPending(): Unit = suspendCancellableCoroutine sc@ { cont -> assert { _state.value !is CancellableContinuationImpl<*> } // can be NONE or PENDING //【1】尝试将状态从 NONE 改为挂起 if (_state.compareAndSet(NONE, cont)) return@sc// 挂起成功,等待被 resume,suspend 返回。
    fun makePending() {
        _state.loop { state ->
            when {
                state == null -> return // this slot is free - skip it
                state === PENDING -> return // already pending, nothing to do
                state === NONE -> {
                    //【1】Switch the slot to PENDING: a new value arrived while the collector was still busy with the previous one.
                    if (_state.compareAndSet(state, PENDING)) return
                }
                else -> { // must be a suspend continuation state
                    // we must still use CAS here since continuation may get cancelled and free the slot at any time
                    //【2】Resume the suspended collector coroutine and reset the state to NONE.
                    if (_state.compareAndSet(state, NONE)) {
                        (state as CancellableContinuationImpl<Unit>).resume(Unit)
                        return
                    }
                }
            }
        }
    }
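To make the slot's state machine easier to follow, here is a simplified, self-contained sketch of the same idea. It is not the real implementation: it uses a plain AtomicReference instead of the internal atomic/loop helpers, and a Runnable waiter stands in for the suspended continuation.

import java.util.concurrent.atomic.AtomicReference

// Slot states: null = free, NONE = idle, PENDING = new value waiting,
// or a Runnable waiter standing in for the suspended continuation.
private object NONE
private object PENDING

class SimpleSlot {
    private val state = AtomicReference<Any?>(NONE)

    // Collector side, fast path: atomically consume a pending notification if there is one.
    fun takePending(): Boolean = state.getAndSet(NONE) === PENDING

    // Collector side, slow path: install a waiter; returns false if it could not be installed
    // because a value became pending in the meantime (mirrors the CAS failure in awaitPending).
    fun awaitPending(waiter: Runnable): Boolean =
        state.compareAndSet(NONE, waiter) // the real code parks the coroutine here instead of a Runnable

    // Emitter side: mark the slot pending, or wake up the installed waiter.
    fun makePending() {
        while (true) {
            when (val s = state.get()) {
                null, PENDING -> return                      // free slot or already pending
                NONE -> if (state.compareAndSet(s, PENDING)) return
                else -> if (state.compareAndSet(s, NONE)) {  // a waiter is installed: wake it
                    (s as Runnable).run()
                    return
                }
            }
        }
    }
}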
// StateFlowImpl
override suspend fun collect(collector: FlowCollector<T>): Nothing {
    //【1】Same as SharedFlow, except that a StateFlowSlot is allocated.
    val slot = allocateSlot()
    try {
        if (collector is SubscribedFlowCollector) collector.onSubscription()
        val collectorJob = currentCoroutineContext()[Job]
        //【2】The previously emitted state.
        var oldState: Any? = null // previously emitted T!! | NULL (null -- nothing emitted yet)
        // The loop is arranged so that it starts delivering current value without waiting first
        //【3】Spin in a loop, repeatedly reading the latest state.
        while (true) {
            // Here the coroutine could have waited for a while to be dispatched,
            // so we use the most recent state here to ensure the best possible conflation of stale values
            val newState = _state.value
            // always check for cancellation
            collectorJob?.ensureActive()
            //【4】Emit only on the first pass or when the state actually changed (conflation by equality).
            if (oldState == null || oldState != newState) {
                collector.emit(NULL.unbox(newState))
                oldState = newState
            }
            //【5】Check whether the slot is already in the PENDING state; if not, call awaitPending, which may
            //     suspend the coroutine until a new value arrives.
            //     If the collector is cancelled, a CancellationException is thrown, the loop exits, and freeSlot runs.
            if (!slot.takePending()) { // try fast-path without suspending first
                slot.awaitPending() // only suspend for new values when needed
            }
        }
    } finally {
        freeSlot(slot)
    }
}
From this we can see:
If there is a new value, it is emitted first, and only then does the coroutine try to suspend again.
If the slot is already in the PENDING state, takePending returns immediately and the loop goes straight to the next value, as the sketch below shows.
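A sketch of the resulting conflation: when the state is updated faster than the collector can process it, intermediate values are skipped and only the most recent state is delivered (exactly which values are skipped depends on timing):

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val state = MutableStateFlow(0)

    val collector = launch {
        state.collect { value ->
            println("collected $value")
            delay(100) // a slow collector: while it is busy, newer values overwrite the pending one
        }
    }

    repeat(5) { i ->
        state.value = i + 1 // rapid updates; most of them are conflated away
        delay(10)
    }

    delay(300)
    collector.cancel()
    // Typical output: "collected 0" then "collected 5" -- the intermediate values 1..4 are conflated
}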
emit – the emitter
Very simple: at its core it is just a set of the state variable value:
override suspend fun emit(value: T) {
    this.value = value
}
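Worth noting: assigning a value equal to the current one does not notify collectors, since StateFlow conflates on equality. A small usage sketch (the values are illustrative):

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val state = MutableStateFlow(1)
    val job = launch { state.collect { println("collected $it") } } // prints "collected 1"
    yield()

    state.value = 1 // equal to the current value -> no new emission
    state.value = 2 // distinct value -> collectors are notified
    yield()

    job.cancel()
    // Output: collected 1, collected 2
}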