kotlin 协程原理分析 - 启动微模型

(同步下学习记录)

0 前言

简单分析下,kotlin 协程的启动微模型,协程由于其结构化的原理,父子协程之间有依赖关系,而协程的执行也是有依赖关系,基于微模型。

只要搞清楚了这个微模型,那么协程的嵌套也就了如指掌了。

下面以 launch 为例子,我们分析下微模型。

1 启动一个简单协程

下面分析下 launch 的启动流程,核心的代码是到这个地方:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
//【1】默认的 job 情况;
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
//【2】进入 job 启动环节;
coroutine.start(start, coroutine, block)
return coroutine
}

#CoroutineStart.kt
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
when (this) {
//【1】默认模式启动;
DEFAULT -> block.startCoroutineCancellable(receiver, completion)
ATOMIC -> block.startCoroutine(receiver, completion)
UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
LAZY -> Unit // will start lazily
}

无论走哪个分支,都是调用 block 的函数,而 ==block 就是一个 suspend 修饰的函数==。

startCoroutineCancellable

以 DEFAULT 为例 startCoroutineCancellable 接下来会调用到 IntrinsicsJvm.kt 里的:

1
2
3
4
5
6
@InternalCoroutinesApi
public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) {
createCoroutineUnintercepted(completion)
.intercepted()
.resumeCancellableWith(Result.success(Unit))
}

到这里就看到了协程体的创建三部曲啦。

  • 1、createCoroutineUnintercepted 创建协程体,关联 job;

  • 2、intercepted 执行拦截器逻辑;

  • 3、resumeCancellableWith 启动协程

下面可以分别看看他们的流程。

2. createCoroutineUnintercepted

该函数带了俩参数,其中的 receiver 为接收者,而 completion 为协程结束后调用的回调:

为什么看不到源码呢?

  1. 编译器内置函数createCoroutineUnintercepted 是一个编译器内置函数(intrinsic),它的实际实现是在 Kotlin 编译器层面完成的,而不是在标准库中提供具体的 Kotlin 代码。
  2. 平台相关实现:不同的平台(JVM、Native、JS)有不同的实现方式。
1
2
3
4
5
public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(
completion: Continuation<T>
): Continuation<Unit> {
// 实际实现由编译器提供
}

当然这里我们关注 JVM 平台的逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
receiver: R,
completion: Continuation<T>
): Continuation<Unit> {
val probeCompletion = probeCoroutineCreated(completion)
return if (this is BaseContinuationImpl)
//【1】核心逻辑:BaseContinuationImpl
create(receiver, probeCompletion)
else {
createCoroutineFromSuspendFunction(probeCompletion) {
(this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
}
}
}

BaseContinuationImpl 是什么?

这里的 this 是我们的协程体对象,那么协程体对象是什么样子呢?一定是 实现了 BaseContinuationImpl 的一个对象。

编译器黑魔法

在这个例子中:

1
2
3
4
GlobalScope.launch {
delay(1000L)
println("GlobalScope 协程执行完毕!")
}

我们的协程体经过编译器处理后会生成一个匿名内部类,并创建其对应的实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
BuildersKt.launch$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)null, (CoroutineStart)null, 
//【1】就是这个地方
(Function2)(new Function2((Continuation)null) {
int label;

@Nullable
public final Object invokeSuspend(@NotNull Object $result) { //【2】invokeSuspend
Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch (this.label) {
case 0:
ResultKt.throwOnFailure($result);
this.label = 1;
if (DelayKt.delay(1000L, this) == var3) {
return var3;
}
break;
case 1:
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}

String var2 = "GlobalScope 协程执行完毕!";
System.out.println(var2);
return Unit.INSTANCE;
}

@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
Intrinsics.checkNotNullParameter(completion, "completion"); //【2】create anonymous
Function2 var3 = new <anonymous constructor>(completion);
return var3;
}

public final Object invoke(Object var1, Object var2) { //【3】invoke 函数特性 --> 【4】
return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
}
}), 3, (Object)null);

这个匿名内部类实现了 Function2 接口,实现了 invoke 方法。

SuspendLambda 父类

1
2
3
4
5
6
7
8
#ContinuationImpl.kt
internal abstract class SuspendLambda(
public override val arity: Int,
completion: Continuation<Any?>? //【1】关联 job
) : ContinuationImpl(completion), FunctionBase<Any?>, SuspendFunction {
constructor(arity: Int) : this(arity, null)
...
}

我们的匿名内部类实现了这个 SuspendLambda 类。

  • resumeWith 是 Continuation 的接口;
  • ContinuationImpl 实现了 BaseContinuationImpl,BaseContinuationImpl 实现了 Continuation;
  • invokeSuspend 和 create 是 BaseContinuationImpl 的接口;
1
2
3
4
5
6
7
8
#Continuation.kt
interface Continuation<in T> {
//【1】协程上下文
public val context: CoroutineContext

//【2】恢复协程
public fun resumeWith(result: Result<T>)
}

其实看逻辑已经能知道他的作用。

  • resumeWith 唤醒协程。
  • invokeSuspend 执行协程体。
  • create 创建协程体。

所以可以看到,createCoroutineUnintercepted 实际上就是调用了 匿名内部类对象的 create 方法,创建了协程体对象。

其实这个地方创建了 2 个,因为:new Function2 是中间对象,然后借助 create 创建我们真正的协程体对象。

3 intercepted

启动拦截,处理 dispatchers 逻辑:

1
2
3
4
5
6
7
8
9
@SinceKotlin("1.3")
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
(this as? ContinuationImpl)?.intercepted() ?: this


public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }

可以看到,这一步是从 context 中取 ContinuationInterceptor,其实就是我们的 Dispatchers。

Dispatchers.IO

interceptContinuation 的作用是创建一个 DispatcheredContinuation 对象,把 Dispatchers 和协程体包裹在里面;

1
2
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation) //【1】

DispatchedContinuation

DispatchedContinuation 本质上是一个 DispatchedTask,实现了 Runnable 接口,也就意味着,可以被丢到线程池中运行:

1
2
3
4
internal class DispatchedContinuation<in T>(
@JvmField val dispatcher: CoroutineDispatcher, // Dispatchers.IO
@JvmField val continuation: Continuation<T> // 我们的协程体;
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {

同时,他也实现了 Continuation 接口,所以他也有 resumeWith 方法;

4 resumeCancellableWith

启动协程体,到这里就要启动协程体了:

1
2
3
4
5
6
7
8
9
@InternalCoroutinesApi
public fun <T> Continuation<T>.resumeCancellableWith(
result: Result<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
//【1】此时进入这里
is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
else -> resumeWith(result)
}

执行了 DispatchedContinuation 的 resumeCancellableWith 方法!

DispatchedContinuation

resumeCancellableWith

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// We inline it to save an entry on the stack in cases where it shows (unconfined dispatcher)
// It is used only in Continuation<T>.resumeCancellableWith
@Suppress("NOTHING_TO_INLINE")
inline fun resumeCancellableWith(
result: Result<T>,
noinline onCancellation: ((cause: Throwable) -> Unit)?
) {
val state = result.toState(onCancellation)
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_CANCELLABLE
//【1】核心:把自己丢到 dispatchers.io 的线程池中执行;
dispatcher.dispatch(context, this)
} else {
executeUnconfined(state, MODE_CANCELLABLE) {
if (!resumeCancelled(state)) {
resumeUndispatchedWith(result)
}
}
}
}

核心:把自己丢到 dispatchers.io 的线程池中执行;

DispatchedContinuation 被丢进线程池啦;

DispatchedTask

run

此时 DispatchedContinuation 作为任务在线程池中执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public final override fun run() {
assert { resumeMode != MODE_UNINITIALIZED } // should have been set before dispatching
val taskContext = this.taskContext
var fatalException: Throwable? = null
try {
val delegate = delegate as DispatchedContinuation<T>
val continuation = delegate.continuation
withContinuationContext(continuation, delegate.countOrElement) {
val context = continuation.context
val state = takeState() // NOTE: Must take state in any case, even if cancelled
val exception = getExceptionalResult(state)
/*
* Check whether continuation was originally resumed with an exception.
* If so, it dominates cancellation, otherwise the original exception
* will be silently lost.
*/
val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null
if (job != null && !job.isActive) {
val cause = job.getCancellationException()
cancelCompletedResult(state, cause)
//【1】核心逻辑:执行协程体,在特定的线程池中;
continuation.resumeWithStackTrace(cause)
} else {
if (exception != null) {
continuation.resumeWithException(exception)
} else {
continuation.resume(getSuccessfulResult(state))
}
}
}
} catch (e: Throwable) {
// This instead of runCatching to have nicer stacktrace and debug experience
fatalException = e
} finally {
val result = runCatching { taskContext.afterTask() }
handleFatalException(fatalException, result.exceptionOrNull())
}
}

可以看到,这里已经开始触发,真正协程体的执行了!

5 真正协程体的执行

此时,真正协程体的就要执行了。

BaseContinuationImpl

resumeWith

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public abstract class BaseContinuationImpl implements Continuation<Object>, CoroutineStackFrame, Serializable {
private final Continuation<Object> completion;

... ... ...

//【1】用于挂起函数来唤醒协程体;
public final void resumeWith(Object paramObject) {
BaseContinuationImpl baseContinuationImpl = this;

while (true) {
DebugProbesKt.probeCoroutineResumed(baseContinuationImpl);
BaseContinuationImpl baseContinuationImpl1 = baseContinuationImpl;
Continuation<Object> continuation = baseContinuationImpl1.completion;

Intrinsics.checkNotNull(continuation);

try {
//【1】执行协程体的状态机逻辑;
paramObject = baseContinuationImpl1.invokeSuspend(paramObject);

//【2】如果返回的是挂起状态,那就跳出循环,直接返回;
if (paramObject == IntrinsicsKt.getCOROUTINE_SUSPENDED())
return;
} finally {
paramObject = null;
Result.Companion companion = Result.Companion;
}

baseContinuationImpl1.releaseIntercepted();
if (continuation instanceof BaseContinuationImpl)
continue;

//【2】执行成功,通知结果给 job;
continuation.resumeWith(paramObject);
return;
}
}

到这里,协程就执行完成啦。

6 微模型总结

总结下协程启动的微模型,协程的嵌套就是微模型嵌套。

  • 一个协程体,关联一个 job coroutine,协程之间的父子关系则是以 job coroutine 进行关联;
  • 协程体执行完成后会通过 job.resumeWith 通知执行结果;
  • 协程体会被 DispatchedContinuation 的 wrapper 对象包裹到内部,丢到 Dispatchers 线程池中执行;
  • DispatchedContinuation 是一个 Runnable,run 方法里会真正执行我们的协程体对象;

结束~

文章作者: Li Shuaiqi
文章链接: https://lishuaiqi.top/2025/04/12/kotlin/kotlin-cotoutine-instart2/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 Li Shuaiqi's Blog