kotlin 协程原理分析 - 挂起函数的原理

0 前言

suspend 函数,就是被 suspend 修饰的 function,本篇文章简单分析下 suspend 函数的工作原理和编译处理:

1 suspend 函数的例子

1
2
3
suspend fun helloWorld() {
println("hellow world")
}

suspend 关键字表示挂起,意味着该函数会被 kotlin 的编译器黑魔法处理。

2 suspend 关键字作用

场景限制

首先 suspend 会限制该函数的调用场景:只能在协程体或者另外一个 suspend 函数中执行,这是从编译器和语法角度的约束;

编译处理

suspend 函数会让 kotlin 编译器做针对性处理,内部会有一些黑魔法,用于实现挂起的功能,以及安全校验等功能;

挂起能力

suspend 函数可以实现协程挂起,减少线程上下文的切换;

3 suspend 函数原理分析

简单分析

还是上面的那个例子,经过编译器处理后,可以看到,编译器增加了一个参数:$completion:

1
2
3
4
5
6
@Nullable
public static final Object helloWorld(@NotNull Continuation $completion) {
String var1 = "hellow world";
System.out.println(var1);
return Unit.INSTANCE;
}

这个 $completion 参数表示,调用 helloWorld 函数的外部 Continuation:

  • 可能是一个协程体;
  • 也有可能是另一个 suspend 函数的 Continuation;

但是这样写,有个问题,我们没法直接拿到这个 $completion,需要有一个机制能把他暴漏出来,这就是 suspendCancellableCoroutine 函数的作用。

suspendCancelableCoroutine

下面我们来看看 suspendCancellableCoroutine 的原理:

1
2
3
4
5
6
7
8
9
10
11
//【1】进入到这里:suspendCancellableCoroutine
suspend fun requestForData() = suspendCancellableCoroutine<String> { continuation ->
getData(object: Callback) {
override fun onSuccess(str: String) {
continuation.resumeWith(Result.success("kapibala,makabaka."))
}
override fun onFailure(exception: Throwable) {
continuation.resumeWithException(exception)
}
}
}

suspendCancellableCoroutine

核心逻辑,进入 suspendCoroutineUninterceptedOrReturn

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public suspend inline fun <T> suspendCancellableCoroutine(
crossinline block: (CancellableContinuation<T>) -> Unit
): T =
//【1】执行校验,不重要;
//【2】这里的 uCont 对象就是前面的 $completion
suspendCoroutineUninterceptedOrReturn { uCont ->
//【2】创建了 CancellableContinuationImpl 对象,包裹外部的 $completion
val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE)
/*
* For non-atomic cancellation we setup parent-child relationship immediately
* in case when `block` blocks the current thread (e.g. Rx2 with trampoline scheduler), but
* properly supports cancellation.
*/
cancellable.initCancellability()
//【3】执行我们的逻辑,并传入 CancellableContinuationImpl;
block(cancellable)
//【4】立刻返回结果,如果是自线程的话,返回的是 suspend 状态;
cancellable.getResult()
}

suspendCoroutineUninterceptedOrReturn 方法不用看,用于执行校验。

核心在于上面的这段逻辑。

  • 创建了 CancellableContinuationImpl 对象,wrapper 外面的 $completion;

  • 这里包裹的是 $completion.intercepted(),如果你知道协程微模型的话,其实包裹关系如下:

    • CancellableContinuationImpl –> DispatcheredContinuation –> 协程体 Continuation
    • DispatcheredContinuation 的目的就是为了切线程,当 suspend 函数回复后能回到特定线程中去;
  • block(cancellable) 传入 CancellableContinuationImpl 对象,执行我们的逻辑;
  • cancellable.getResult() 返回当前的结果,如果切了线程,那会返回 suspend 状态
    • 否则通过 CancellableContinuationImpl 的 resumeWith 返回结构;

其实已经可以看出 suspendCancellableCoroutine 的原理了。

CancellableContinuationImpl

可以看到 CancellableContinuationImpl 也是一个 DispatchedTask 对象:

1
2
3
4
5
@PublishedApi
internal open class CancellableContinuationImpl<in T>(
final override val delegate: Continuation<T>,
resumeMode: Int
) : DispatchedTask<T>(resumeMode), CancellableContinuation<T>, CoroutineStackFrame {

getResult()

获取结果;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@PublishedApi
internal fun getResult(): Any? {
val isReusable = isReusable()
// trySuspend may fail either if 'block' has resumed/cancelled a continuation
// or we got async cancellation from parent.
//【1】看是否属于 suspend 状态
if (trySuspend()) {
if (parentHandle == null) {
installParentHandle()
}
if (isReusable) {
releaseClaimedReusableContinuation()
}
//【2】返回挂起状态;
return COROUTINE_SUSPENDED
}
if (isReusable) {
releaseClaimedReusableContinuation()
}
val state = this.state
if (state is CompletedExceptionally) throw recoverStackTrace(state.cause, this)
if (resumeMode.isCancellableMode) {
val job = context[Job]
if (job != null && !job.isActive) {
val cause = job.getCancellationException()
cancelCompletedResult(state, cause)
throw recoverStackTrace(cause, this)
}
}
//【4】立刻返回结果;
return getSuccessfulResult(state)
}

如果没有切线程,那么 resumeWith 会在当前线程执行,那么协程的状态就会被切为 RESUME,那么 trySuspend() 返回 false,那么 getSuccessfulResult 会立刻返回结果;

如果切了线程,那么 resumeWith 和 block(cancellable) 在不同的线程执行,默认协程是 DECIDE 状态,此时 trySuspend() 返回 true,那么就返回 SUSPEND 状态,协程挂起啦。

resumeWith

如果我们异步获取到结果,通过 CancellableContinuationImpl 的 resumeWith 返回:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// result.toState 就是执行 onSuccess 或者 onFail 函数,假设 onSuccess,返回就是 CancellableContinuationImpl 自己;
override fun resumeWith(result: Result<T>) =
resumeImpl(result.toState(this), resumeMode)


private fun resumeImpl(
proposedUpdate: Any?,
resumeMode: Int,
onCancellation: ((cause: Throwable) -> Unit)? = null
) {
//【2】处理自己的状态;
_state.loop { state ->
when (state) {
is NotCompleted -> {
//【1】更新状态;
val update = resumedState(state, proposedUpdate, resumeMode, onCancellation, idempotent = null)
if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure
detachChildIfNonResuable()
//【2】分发结果;
dispatchResume(resumeMode) // dispatch resume, but it might get cancelled in process
return // done
}
is CancelledContinuation -> {
/*
* If continuation was cancelled, then resume attempt must be ignored,
* because cancellation is asynchronous and may race with resume.
* Racy exceptions will be lost, too.
*/
if (state.makeResumed()) { // check if trying to resume one (otherwise error)
// call onCancellation
onCancellation?.let { callOnCancellation(it, state.cause) }
return // done
}
}
}
alreadyResumedError(proposedUpdate) // otherwise, an error (second resume attempt)
}
}

dispatchResume

dispatchResume 最终会调用到 dispatch 方法里面:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
internal fun <T> DispatchedTask<T>.dispatch(mode: Int) {
assert { mode != MODE_UNINITIALIZED } // invalid mode value for this method
val delegate = this.delegate
val undispatched = mode == MODE_UNDISPATCHED
if (!undispatched && delegate is DispatchedContinuation<*>
&& mode.isCancellableMode == resumeMode.isCancellableMode) {
// dispatch directly using this instance's Runnable implementation
val dispatcher = delegate.dispatcher
val context = delegate.context
//【1】这里就是在外部协程的 Dispatchers 线程里面分发自己;
if (dispatcher.isDispatchNeeded(context)) {
dispatcher.dispatch(context, this)
} else {
resumeUnconfined()
}
} else {
// delegate is coming from 3rd-party interceptor implementation (and does not support cancellation)
// or undispatched mode was requested
resume(delegate, undispatched)
}
}

run()

唤醒外部协程体的 DispatcherContinuation 对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public final override fun run() {
assert { resumeMode != MODE_UNINITIALIZED } // should have been set before dispatching
val taskContext = this.taskContext
var fatalException: Throwable? = null
try {
val delegate = delegate as DispatchedContinuation<T>
val continuation = delegate.continuation
withContinuationContext(continuation, delegate.countOrElement) {
val context = continuation.context
val state = takeState() // NOTE: Must take state in any case, even if cancelled
val exception = getExceptionalResult(state)
/*
* Check whether continuation was originally resumed with an exception.
* If so, it dominates cancellation, otherwise the original exception
* will be silently lost.
*/
// 这里就是唤醒的地方;
val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null
if (job != null && !job.isActive) {
val cause = job.getCancellationException()
cancelCompletedResult(state, cause)
continuation.resumeWithStackTrace(cause)
} else {
if (exception != null) {
continuation.resumeWithException(exception)
} else {
continuation.resume(getSuccessfulResult(state))
}
}
}
} catch (e: Throwable) {
// This instead of runCatching to have nicer stacktrace and debug experience
fatalException = e
} finally {
val result = runCatching { taskContext.afterTask() }
handleFatalException(fatalException, result.exceptionOrNull())
}
}s

4 总结

挂起函数的原理主要有 2 个:编译时和运行时。

  • 编译时通过黑魔法,修改函数的参数,同时传递外层的 continuation。
  • 运行时通过 Continuation 的嵌套包裹,实现唤醒和挂起。
文章作者: Li Shuaiqi
文章链接: https://lishuaiqi.top/2025/04/22/kotlin/kotlin-cotoutine-suspend_function/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 Li Shuaiqi's Blog