@PublishedApi internal open classCancellableContinuationImpl<in T>( final override val delegate: Continuation<T>, resumeMode: Int ) : DispatchedTask<T>(resumeMode), CancellableContinuation<T>, CoroutineStackFrame {
// NOTE(review): the original author's note on this call says that `result.toState` runs the
// onSuccess / onFailure branch of the result and that, in the success case, what comes back
// is this CancellableContinuationImpl itself. `toState` is not visible in this section of
// the file, so confirm what it actually returns before relying on that description.
/**
 * Resumes this continuation with [result].
 *
 * The result is converted with `toState(this)` and the converted value is passed to
 * [resumeImpl] together with this task's `resumeMode`.
 */
override fun resumeWith(result: Result<T>) = resumeImpl(result.toState(this), resumeMode)

/**
 * Tries to move this continuation out of its not-completed state and dispatch the resumption.
 *
 * @param proposedUpdate the value handed to `resumedState` to build the new state; it is also
 *   passed to `alreadyResumedError` when the resume attempt is rejected.
 * @param resumeMode the mode forwarded to `resumedState` and `dispatchResume`.
 * @param onCancellation optional callback; it is invoked with the cancellation cause when the
 *   continuation turns out to be already cancelled.
 */
private fun resumeImpl(
    proposedUpdate: Any?,
    resumeMode: Int,
    onCancellation: ((cause: Throwable) -> Unit)? = null
) {
    // Work on this continuation's own state.
    _state.loop { state ->
        when (state) {
            is NotCompleted -> {
                // Step 1: update the state.
                val update = resumedState(state, proposedUpdate, resumeMode, onCancellation, idempotent = null)
                if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure
                detachChildIfNonResuable()
                // Step 2: dispatch the result.
                dispatchResume(resumeMode) // dispatch resume, but it might get cancelled in process
                return // done
            }
            is CancelledContinuation -> {
                /*
                 * If continuation was cancelled, then resume attempt must be ignored,
                 * because cancellation is asynchronous and may race with resume.
                 * Racy exceptions will be lost, too.
                 */
                if (state.makeResumed()) { // check if trying to resume one (otherwise error)
                    // call onCancellation
                    onCancellation?.let { callOnCancellation(it, state.cause) }
                    return // done
                }
            }
        }
        alreadyResumedError(proposedUpdate) // otherwise, an error (second resume attempt)
    }
}
/**
 * Runs this dispatched task: takes the stored state and resumes the underlying continuation
 * with a value, with the stored exception, or with the job's cancellation exception.
 *
 * Anything thrown while doing so is captured and passed to `handleFatalException` together
 * with the outcome of `taskContext.afterTask()`, which is always invoked in the `finally` block.
 */
public final override fun run() {
    assert { resumeMode != MODE_UNINITIALIZED } // should have been set before dispatching
    val taskContext = this.taskContext
    var fatalException: Throwable? = null
    try {
        val delegate = delegate as DispatchedContinuation<T>
        val continuation = delegate.continuation
        withContinuationContext(continuation, delegate.countOrElement) {
            val context = continuation.context
            val state = takeState() // NOTE: Must take state in any case, even if cancelled
            val exception = getExceptionalResult(state)
            /*
             * Check whether continuation was originally resumed with an exception.
             * If so, it dominates cancellation, otherwise the original exception
             * will be silently lost.
             */
            // This is the place where the continuation actually gets resumed (woken up).
            val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null
            if (job != null && !job.isActive) {
                // The job is no longer active: resume with its cancellation exception instead.
                val cause = job.getCancellationException()
                cancelCompletedResult(state, cause)
                continuation.resumeWithStackTrace(cause)
            } else {
                if (exception != null) {
                    continuation.resumeWithException(exception)
                } else {
                    continuation.resume(getSuccessfulResult(state))
                }
            }
        }
    } catch (e: Throwable) {
        // This instead of runCatching to have nicer stacktrace and debug experience
        fatalException = e
    } finally {
        val result = runCatching { taskContext.afterTask() }
        handleFatalException(fatalException, result.exceptionOrNull())
    }
}