kotlin 协程原理分析 - 启动方式比较分析

0 前言

基于微模型,分析下协程常见的启动方式的差异和对比。

常见的启动方式有:launch、async、runblock 下面会对比分析下三者的区别。

细节就不讲了,都是微模型的那一套,区别在于启动方式、异常机制、分发器等的不同,这个后续在分析。

1 launch 启动

1
2
3
4
5
6
7
8
9
10
11
12
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}

以上是 launch 启动的核心逻辑。

默认模式下的 launch 启动,job 是 StandaloneCoroutine,这种情况是立刻会启动该协程。

launch 也是支持 isLazy 启动的,job 是 LazyStandaloneCoroutine,我们看看二者的区别

StandaloneCoroutine

标准 DEFAULT 启动,这里是在后续 CoroutineStart 的时候会直接启动协程:

1
2
3
4
5
6
7
8
9
10
11
private open class StandaloneCoroutine(
parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine<Unit>(parentContext, initParentJob = true, active = active) {

//【1】异常处理机制;
override fun handleJobException(exception: Throwable): Boolean {
handleCoroutineException(context, exception)
return true
}
}

LazyStandaloneCoroutine

他是 StandaloneCoroutine 的扩展,所以具有一样的异常处理机制。

延迟启动,可以看到这里他把 block 存起来了,在 onStart 方法的时候会创建协程体,然后执行:

1
2
3
4
5
6
7
8
9
10
11
12
private class LazyStandaloneCoroutine(
parentContext: CoroutineContext,
block: suspend CoroutineScope.() -> Unit
) : StandaloneCoroutine(parentContext, active = false) {
//【1】这里是 continuation 对象;
private val continuation = block.createCoroutineUnintercepted(this, this)

override fun onStart() {
//【2】通过 start 延迟启动协程,会走到这里;;
continuation.startCoroutineCancellable(this)
}
}

注意:onStart 方法:

onStart()

这里是通过 start 方法延迟启动的回调,下面会分析:

1
2
3
4
override fun onStart() {
//【1】通过 start 延迟启动协程,会走到这里;;
continuation.startCoroutineCancellable(this)
}

CoroutineStart

任何启动方式,都会最后都会到 CoroutineStart 这个地方:

1
2
3
4
5
6
7
8
9
// 执行 invoke 函数;;
@InternalCoroutinesApi
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
when (this) {
DEFAULT -> block.startCoroutineCancellable(receiver, completion) // 如果是 DEFAULT,立刻启动;
ATOMIC -> block.startCoroutine(receiver, completion)
UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
LAZY -> Unit // 如果是 LAZY,那么延迟启动;
}

DEFAULT,立刻启动,不多说了;

LAZY 模式通过 start 启动

懒加载启动方式,例子如下:

1
2
3
4
5
6
7
8
val conx = GlobalScope.launch(start = CoroutineStart.LAZY) {
println("GlobalScope 协程开始执行")
withContext(Dispatchers.IO) {
println("线程切换执行完毕!")
}
}
// 使用 start 方法启动
conx.start()

JobSupport.start

start 具体的实现是是在 AbstractCoroutine 的父接口 JobSupport 里:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public final override fun start(): Boolean {
loopOnState { state ->
when (startInternal(state)) { // qi
FALSE -> return false
TRUE -> return true
}
}
}

//【1】startInternal
private fun startInternal(state: Any?): Int {
when (state) {
is Empty -> { // EMPTY_X state -- no completion handlers
if (state.isActive) return FALSE // already active
if (!_state.compareAndSet(state, EMPTY_ACTIVE)) return RETRY
//【1】这里触发 onStart 启动;
onStart()
return TRUE
}
is InactiveNodeList -> { // LIST state -- inactive with a list of completion handlers
if (!_state.compareAndSet(state, state.list)) return RETRY
onStart()
return TRUE
}
else -> return FALSE // not a new state
}
}

可以看到这里调用了 onStart() 方法,

join 方法

和 launch 方法对应的有个 join 方法,用于等待协程执行完成,如下是例子:

1
2
3
4
5
6
7
8
9
10
GlobalScope.launch {
val conx = CoroutineScope(Dispatchers.IO).launch(start = CoroutineStart.DEFAULT) {
println("GlobalScope 协程开始执行")
withContext(Dispatchers.IO) {
println("线程切换执行完毕!")
}
}
// 挂起函数;
conx.join()
}

join 的原理是什么呢?

原理分析

接口在 JobSupport 中:

JobSupport

join –> joinSuspend

执行挂起

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
//【1】JobSupport
public final override suspend fun join() {
if (!joinInternal()) { // fast-path no wait
coroutineContext.ensureActive()
return // do not suspend
}
//【1】核心 joinSuspend;
return joinSuspend() // slow-path wait
}

// joinSuspend 是一个挂起函数;
// cont 是 CancellableContinuationImpl 对象,CancellableContinuationImpl 会包裹 DispatchedContinuation 对象;
private suspend fun joinSuspend() = suspendCancellableCoroutine<Unit> { cont ->
// invokeOnCompletion 注册一个完成回调处理对象;
// disposeOnCancellation 用于在协程取消时自动清理资源;
cont.disposeOnCancellation(invokeOnCompletion(handler = ResumeOnCompletion(cont).asHandler))
}

//【2】ResumeOnCompletion 执行结束通知对象;
private class ResumeOnCompletion(
private val continuation: Continuation<Unit>
) : JobNode() {
//【2.1】回复外面的协程;
override fun invoke(cause: Throwable?) = continuation.resume(Unit)
}
// 父类 JobNode,其中 asHandler 是 CompletionHandlerBase 扩展函数,返回的是 this,这个不多说,可以自己去跟跟代码;
internal abstract class JobNode : CompletionHandlerBase(), DisposableHandle, Incomplete {
... ... ...
}

cont 是 CancellableContinuationImpl 对象,分析 suspend 函数的时候有分析过。

ResumeOnCompletion 是一个执行结束通知对象,这里是用于在当前协程执行完成后通知外部协程,其中 asHandler 是扩展函数,返回的是 this,这个不多说,可以自己去跟跟代码。

makeNode

先看这个方法,创建节点,将 ResumeOnCompletion 对象封装 成一个 JobNode,同时关联 Job,然后会保存到当前协程内部:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private fun makeNode(handler: CompletionHandler, onCancelling: Boolean): JobNode {
val node = if (onCancelling) {
(handler as? JobCancellingNode) ?: InvokeOnCancelling(handler) //【1】包装为取消节点
} else {
(handler as? JobNode)?.also { assert { it !is JobCancellingNode } }
?: InvokeOnCompletion(handler) //【2】包装为完成节点
}
node.job = this
return node
}

// InvokeOnCompletion 节点;;
private class InvokeOnCompletion(
private val handler: CompletionHandler
) : JobNode() {
override fun invoke(cause: Throwable?) = handler.invoke(cause)
}
  • InvokeOnCancelling: 包装为取消节点;
  • InvokeOnCompletion: 包装为完成节点;

具体的细节,你们可以自己跟跟代码。

invokeOnCompletion 注册一个完成回调对象

这个代码看起来复杂的,其实也不简单,核心逻辑是:根据当前的 job 的 state 状态,注册 ResumeOnCompletion 对象;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public final override fun invokeOnCompletion(
onCancelling: Boolean, // false
invokeImmediately: Boolean, // true
handler: CompletionHandler
): DisposableHandle {
//【1】创建回调对应的 node 节点;
val node: JobNode = makeNode(handler, onCancelling)
loopOnState { state ->
when (state) {
is Empty -> { // EMPTY_X state -- no completion handlers
if (state.isActive) {
// try move to SINGLE state
if (_state.compareAndSet(state, node)) return node
} else
promoteEmptyToNodeList(state) // that way we can add listener for non-active coroutine
}
is Incomplete -> { //【2】当前 job 还没有执行完成;
val list = state.list
if (list == null) { // SINGLE/SINGLE+
promoteSingleToNodeList(state as JobNode)
} else {
var rootCause: Throwable? = null
var handle: DisposableHandle = NonDisposableHandle
// 这里和取消相关,暂时不关注;
if (onCancelling && state is Finishing) {
synchronized(state) {
rootCause = state.rootCause // != null if cancelling job
if (rootCause == null || handler.isHandlerOf<ChildHandleNode>() && !state.isCompleting) {
if (!addLastAtomic(state, list, node)) return@loopOnState // retry
if (rootCause == null) return node
handle = node
}
}
}
if (rootCause != null) {
if (invokeImmediately) handler.invokeIt(rootCause)
return handle
} else {
//【2】核心逻辑:将前面的 ResumeOnCompletion 对应的 Node 添加到 state.list 中记录下来;
if (addLastAtomic(state, list, node)) return node
}
}
}
else -> {
//【3】当前 job 已经执行完成,立刻执行 ResumeOnCompletion 的 invoke 函数。
// 通知外部协程执行结束,resumeWith;
if (invokeImmediately) handler.invokeIt((state as? CompletedExceptionally)?.cause)
return NonDisposableHandle
}
}
}
}

将前面的 ResumeOnCompletion 对应的 Node 添加到 state.list 中记录下来;

completeStateFinalization

当前协程执行完成后,会触发到这个函数,在这里会统一的通过 state.list 里面记录的 Node 来间接 resume 外部的协程;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// job 的 resumeWith
public final override fun resumeWith(result: Result<T>) {
val state = makeCompletingOnce(result.toState())
if (state === COMPLETING_WAITING_CHILDREN) return
afterResume(state)
}


// suppressed == true when any exceptions were suppressed while building the final completion cause
private fun completeStateFinalization(state: Incomplete, update: Any?) {
parentHandle?.let {
it.dispose()
parentHandle = NonDisposableHandle
}
val cause = (update as? CompletedExceptionally)?.cause
/*
* 2) Invoke completion handlers: .join(), callbacks etc.
* It's important to invoke them only AFTER exception handling and everything else, see #208
*/
if (state is JobNode) {
try {
state.invoke(cause)
} catch (ex: Throwable) {
handleOnCompletionException(CompletionHandlerException("Exception in completion handler $state for $this", ex))
}
} else {
//【1】核心逻辑:
state.list?.notifyCompletion(cause)
}
}

总结

  • 1、调用 join() 函数的协程体会被封装成一个 Node 节点,添加到被等待的协程的 job 的 state.list。
  • 2、被等待的协程执行完成后,会通知对应的 job,job 会在 completeStateFinalization 中遍历 state.list 唤醒外面的等待的协程。

2 async 启动

我们来看看 async 启动的核心逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
public fun <T> CoroutineScope.async(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T
): Deferred<T> {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyDeferredCoroutine(newContext, block) else // y
DeferredCoroutine<T>(newContext, active = true) // 默认启动
coroutine.start(start, coroutine, block)
return coroutine
}

默认情况下:async 也是通过 DEFAULT 方式启动,并且是立刻启动。

协程体的差异

1
block: suspend CoroutineScope.() -> T

这里能看出,async 的 block 是有返回值的。

对应的编译脚本生成的 SuspendLambda 的 invokeSuspend 函数的返回值会被强转为 T 类型!

同时,也会将 invokeSuspend 返回的结果,通过 completion.resumeWith 传递给 Job。

这个和 launch 启动的协程是不一样的,launch 协程返回值是 unit,和 java void 是一个意思。

LAZY 模式通过 start 启动

对于 lazy 方式,async 也是支持的,配合 start() 方法就行了。

这个不多讲了,和 launch 没啥太大区别。

DeferredCoroutine

如果是默认启动类型的话,那么走的是 DeferredCoroutine,LazyDeferredCoroutine 是 DeferredCoroutine 的子类。

1
2
3
4
5
6
7
8
9
10
11
12
@Suppress("UNCHECKED_CAST")
private open class DeferredCoroutine<T>(
parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine<T>(parentContext, true, active = active), Deferred<T>, SelectClause1<T> {
override fun getCompleted(): T = getCompletedInternal() as T
override suspend fun await(): T = awaitInternal() as T

override val onAwait: SelectClause1<T> get() = this // 和 select 有关系,这里先不关注~
override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (T) -> R) =
registerSelectClause1Internal(select, block)
}

可以看到,其支持 await() 方法,其他协程能够通过 await() 实现。

onAwait 和 registerSelectClause1 是配合 select 操作符实现对多个协程体进行监听的逻辑,这里我们不多关注先。

await()

我们看一下 await 是如何拿到 async 的执行结果的,逻辑是不是和 launch 一样呢?

1
2
3
4
5
6
private suspend fun awaitSuspend(): Any? = suspendCoroutineUninterceptedOrReturn { uCont -> 
val cont = AwaitContinuation(uCont.intercepted(), this)
cont.initCancellability()
cont.disposeOnCancellation(invokeOnCompletion(ResumeAwaitOnCompletion(cont).asHandler))
cont.getResult()
}

果然是一样的操作。

  • onAwait 本质上就是一个挂起函数,所以外部协程会挂起。
  • invokeOnCompletion 依然是创建了一个 Node 节点,加入到当前协程 job 的 state.list 中
  • 最后统一唤醒。

其实 await 就是特殊的 join,join 传递的结果是特殊的 Unit 而已。

3 runBlocking 启动

最后看看 runBlocking,已经 runBlocking 的阻塞线程原理:

1
2
3
suspend fun main() = runBlocking {
... ... ... ...
}

核心逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Throws(InterruptedException::class)
public actual fun <T> runBlocking(context: CoroutineContext, block: suspend CoroutineScope.() -> T): T {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
val currentThread = Thread.currentThread()

//【1】获取 Dispatchers 方法。
val contextInterceptor = context[ContinuationInterceptor]
val eventLoop: EventLoop?
val newContext: CoroutineContext

//【2】根据是否指定 Dispatchers 做处理。
// 如果没有指定 Dispatchers,那么采用的是 ThreadLocalEventLoop。
if (contextInterceptor == null) {
eventLoop = ThreadLocalEventLoop.eventLoop
newContext = GlobalScope.newCoroutineContext(context + eventLoop)
} else {
// 如果指定了 Dispatchers,看看是否是 EventLoop 是的话赋值 eventLoop
// 同时创建新上下文 CoroutineContext,这里会包含外部执行的
eventLoop = (contextInterceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() }
?: ThreadLocalEventLoop.currentOrNull()
newContext = GlobalScope.newCoroutineContext(context)
}
//【3】创建 BlockingCoroutine 这个 job,并启动;
val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop)
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)

//【4】调用 joinBlocking 等待结果;
return coroutine.joinBlocking()
}

这里涉及到了一些关键点:

  • ThreadLocalEventLoop.eventLoop 的原理:BlockingEventLoop;
  • joinBlocking 的原理;
  • BlockingCoroutine 的原理;

BlockingEventLoop

其实最终返回的是一个 BlockingEventLoop

1
internal actual fun createEventLoop(): EventLoop = BlockingEventLoop(Thread.currentThread())

BlockingEventLoop 父类里面有一个队列:

1
2
3
internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
// null | CLOSED_EMPTY | task | Queue<Runnable>
private val _queue = atomic<Any?>(null)

回忆下微模型,DispatchedContinuations 对象会被添加到 Dispatcher 中去执行。

其实在 BlockingEventLoop 的场景下,就是被添加到 BlockingEventLoop 的 quue 队列中了。

同时注意 BlockingEventLoop 的线程:currentThread,也就是说,是同步执行。

enqueue

dispatch 的过程就是 enqueue 的过程:

1
2
3
4
5
6
7
8
9
10
public final override fun dispatch(context: CoroutineContext, block: Runnable) = enqueue(block)

open fun enqueue(task: Runnable) {
if (enqueueImpl(task)) {
// todo: we should unpark only when this delayed task became first in the queue
unpark()
} else {
DefaultExecutor.enqueue(task)
}
}

joinBlocking

我们看看 joinBlocking 函数,可以看到 joinBlocking 会进入到自旋状态,然后不断的从前面的 queue 中获取任务执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@Suppress("UNCHECKED_CAST")
fun joinBlocking(): T {
registerTimeLoopThread()
try {
eventLoop?.incrementUseCount()
try {
while (true) {
@Suppress("DEPRECATION")
if (Thread.interrupted()) throw InterruptedException().also { cancelCoroutine(it) }

//【1】从队列中弹出 Task 对象;
// 如果没有 eventLoop,那么就无限 park!!
val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE // 计算 park 时间

if (isCompleted) break
parkNanos(this, parkNanos)
}
} finally { // paranoia
eventLoop?.decrementUseCount()
}
} finally { // paranoia
unregisterTimeLoopThread()
}
// now return result
val state = this.state.unboxState()
(state as? CompletedExceptionally)?.let { throw it.cause }
return state as T
}

override fun processNextEvent(): Long {
if (processUnconfinedEvent()) return 0
val delayed = _delayed.value
if (delayed != null && !delayed.isEmpty) {
val now = nanoTime()
while (true) {
delayed.removeFirstIf {
if (it.timeToExecute(now)) {
enqueueImpl(it)
} else
false
} ?: break
}
}
//【1】核心:dequeue task 并执行;
val task = dequeue()
if (task != null) {
platformAutoreleasePool { task.run() }
return 0
}
return nextTime
}

所以如果不指定线程,默认就在当前线程,那么是同步执行的!!!

当前线程在执行完 task 后,会再次进入自旋状态,然后阻塞等待!!!

指定 Dispathers

如果指定了 Dispathers 的话呢?那么就没 loop 内部的队列了,那么当前线程会在 eventloop 的 processNextEvent 的地方一直阻塞。

BlockingCoroutine

知道协程体执行完成后,通知 BlockingCoroutine,resumeWith 返回里面在 afterCompletion 后会,unpark blockedThread 线程:

1
2
3
4
5
override fun afterCompletion(state: Any?) {
// wake up blocked thread
if (Thread.currentThread() != blockedThread)
unpark(blockedThread)
}

这样,调用 joinBlocking 的线程就会被唤醒了!

4 总结

本篇文章,总结了下协程常见的启动函数的原理,嗯嗯。

不同的启动模式有各自的特色,在特定场景使用才是关键。

文章作者: Li Shuaiqi
文章链接: https://lishuaiqi.top/2025/05/25/kotlin/kotlin-cotoutine-launch-diff/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 Li Shuaiqi's Blog