0 前言 基于微模型,分析下协程常见的启动方式的差异和对比。
常见的启动方式有:launch、async、runblock 下面会对比分析下三者的区别。
细节就不讲了,都是微模型的那一套,区别在于启动方式、异常机制、分发器等的不同,这个后续在分析。
1 launch 启动 1 2 3 4 5 6 7 8 9 10 11 12 public fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit ): Job { val newContext = newCoroutineContext(context) val coroutine = if (start.isLazy) LazyStandaloneCoroutine(newContext, block) else StandaloneCoroutine(newContext, active = true ) coroutine.start(start, coroutine, block) return coroutine }
以上是 launch 启动的核心逻辑。
默认模式下的 launch 启动,job 是 StandaloneCoroutine,这种情况是立刻会启动该协程。
launch 也是支持 isLazy 启动的,job 是 LazyStandaloneCoroutine,我们看看二者的区别
StandaloneCoroutine 标准 DEFAULT 启动,这里是在后续 CoroutineStart 的时候会直接启动协程:
1 2 3 4 5 6 7 8 9 10 11 private open class StandaloneCoroutine ( parentContext: CoroutineContext, active: Boolean ) : AbstractCoroutine<Unit>(parentContext, initParentJob = true , active = active) { override fun handleJobException (exception: Throwable) : Boolean { handleCoroutineException(context, exception) return true } }
LazyStandaloneCoroutine 他是 StandaloneCoroutine 的扩展,所以具有一样的异常处理机制。
延迟启动,可以看到这里他把 block 存起来了,在 onStart 方法的时候会创建协程体,然后执行:
1 2 3 4 5 6 7 8 9 10 11 12 private class LazyStandaloneCoroutine ( parentContext: CoroutineContext, block: suspend CoroutineScope.() -> Unit ) : StandaloneCoroutine(parentContext, active = false ) { private val continuation = block.createCoroutineUnintercepted(this , this ) override fun onStart () { continuation.startCoroutineCancellable(this ) } }
注意:onStart 方法:
onStart() 这里是通过 start 方法延迟启动的回调,下面会分析:
1 2 3 4 override fun onStart () { continuation.startCoroutineCancellable(this ) }
CoroutineStart 任何启动方式,都会最后都会到 CoroutineStart 这个地方:
1 2 3 4 5 6 7 8 9 @InternalCoroutinesApi public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit = when (this ) { DEFAULT -> block.startCoroutineCancellable(receiver, completion) ATOMIC -> block.startCoroutine(receiver, completion) UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion) LAZY -> Unit }
DEFAULT,立刻启动,不多说了;
LAZY 模式通过 start 启动 懒加载启动方式,例子如下:
1 2 3 4 5 6 7 8 val conx = GlobalScope.launch(start = CoroutineStart.LAZY) { println("GlobalScope 协程开始执行" ) withContext(Dispatchers.IO) { println("线程切换执行完毕!" ) } } conx.start()
JobSupport.start start 具体的实现是是在 AbstractCoroutine 的父接口 JobSupport 里:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public final override fun start () : Boolean { loopOnState { state -> when (startInternal(state)) { FALSE -> return false TRUE -> return true } } } private fun startInternal (state: Any?) : Int { when (state) { is Empty -> { if (state.isActive) return FALSE if (!_state.compareAndSet(state, EMPTY_ACTIVE)) return RETRY onStart() return TRUE } is InactiveNodeList -> { if (!_state.compareAndSet(state, state.list)) return RETRY onStart () return TRUE } else -> return FALSE } }
可以看到这里调用了 onStart() 方法,
join 方法 和 launch 方法对应的有个 join 方法,用于等待协程执行完成,如下是例子:
1 2 3 4 5 6 7 8 9 10 GlobalScope.launch { val conx = CoroutineScope(Dispatchers.IO).launch(start = CoroutineStart.DEFAULT) { println("GlobalScope 协程开始执行" ) withContext(Dispatchers.IO) { println("线程切换执行完毕!" ) } } conx.join() }
join 的原理是什么呢?
原理分析 接口在 JobSupport 中:
JobSupport join –> joinSuspend 执行挂起
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public final override suspend fun join () { if (!joinInternal()) { coroutineContext.ensureActive() return } return joinSuspend() } private suspend fun joinSuspend () = suspendCancellableCoroutine<Unit> { cont -> cont.disposeOnCancellation(invokeOnCompletion(handler = ResumeOnCompletion(cont).asHandler)) } private class ResumeOnCompletion ( private val continuation: Continuation<Unit> ) : JobNode() { override fun invoke (cause: Throwable?) = continuation.resume(Unit) } internal abstract class JobNode : CompletionHandlerBase(), DisposableHandle, Incomplete { ... ... ... }
cont 是 CancellableContinuationImpl 对象,分析 suspend 函数的时候有分析过。
ResumeOnCompletion 是一个执行结束通知对象,这里是用于在当前协程执行完成后通知外部协程,其中 asHandler 是扩展函数,返回的是 this,这个不多说,可以自己去跟跟代码。
makeNode 先看这个方法,创建节点,将 ResumeOnCompletion 对象封装 成一个 JobNode,同时关联 Job,然后会保存到当前协程内部:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 private fun makeNode (handler: CompletionHandler, onCancelling: Boolean) : JobNode { val node = if (onCancelling) { (handler as? JobCancellingNode) ?: InvokeOnCancelling(handler) } else { (handler as? JobNode)?.also { assert { it !is JobCancellingNode } } ?: InvokeOnCompletion(handler) } node.job = this return node } private class InvokeOnCompletion ( private val handler: CompletionHandler ) : JobNode() { override fun invoke (cause: Throwable?) = handler.invoke(cause) }
InvokeOnCancelling: 包装为取消节点;
InvokeOnCompletion: 包装为完成节点;
具体的细节,你们可以自己跟跟代码。
invokeOnCompletion 注册一个完成回调对象 这个代码看起来复杂的,其实也不简单,核心逻辑是:根据当前的 job 的 state 状态,注册 ResumeOnCompletion 对象;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 public final override fun invokeOnCompletion ( onCancelling: Boolean, // false invokeImmediately: Boolean, // true handler: CompletionHandler ) : DisposableHandle { val node: JobNode = makeNode(handler, onCancelling) loopOnState { state -> when (state) { is Empty -> { if (state.isActive) { if (_state.compareAndSet(state, node)) return node } else promoteEmptyToNodeList(state) } is Incomplete -> { val list = state.list if (list == null ) { promoteSingleToNodeList(state as JobNode) } else { var rootCause: Throwable? = null var handle: DisposableHandle = NonDisposableHandle if (onCancelling && state is Finishing) { synchronized (state) { rootCause = state.rootCause if (rootCause == null || handler.isHandlerOf<ChildHandleNode>() && !state.isCompleting) { if (!addLastAtomic(state, list, node)) return @loopOnState if (rootCause == null ) return node handle = node } } } if (rootCause != null ) { if (invokeImmediately) handler.invokeIt(rootCause) return handle } else { if (addLastAtomic(state, list, node)) return node } } } else -> { if (invokeImmediately) handler.invokeIt((state as? CompletedExceptionally)?.cause) return NonDisposableHandle } } } }
将前面的 ResumeOnCompletion 对应的 Node 添加到 state.list 中记录下来;
completeStateFinalization 当前协程执行完成后,会触发到这个函数,在这里会统一的通过 state.list 里面记录的 Node 来间接 resume 外部的协程;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public final override fun resumeWith (result: Result<T>) { val state = makeCompletingOnce(result.toState()) if (state === COMPLETING_WAITING_CHILDREN) return afterResume(state) } private fun completeStateFinalization (state: Incomplete, update: Any?) { parentHandle?.let { it.dispose() parentHandle = NonDisposableHandle } val cause = (update as? CompletedExceptionally)?.cause if (state is JobNode) { try { state.invoke(cause) } catch (ex: Throwable) { handleOnCompletionException(CompletionHandlerException("Exception in completion handler $state for $this" , ex)) } } else { state.list?.notifyCompletion(cause) } }
总结
1、调用 join() 函数的协程体会被封装成一个 Node 节点,添加到被等待的协程的 job 的 state.list。
2、被等待的协程执行完成后,会通知对应的 job,job 会在 completeStateFinalization 中遍历 state.list 唤醒外面的等待的协程。
2 async 启动 我们来看看 async 启动的核心逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 public fun <T> CoroutineScope.async( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> T ): Deferred<T> { val newContext = newCoroutineContext(context) val coroutine = if (start.isLazy) LazyDeferredCoroutine(newContext, block) else DeferredCoroutine<T>(newContext, active = true ) coroutine.start(start, coroutine, block) return coroutine }
默认情况下:async 也是通过 DEFAULT 方式启动,并且是立刻启动。
协程体的差异 1 block: suspend CoroutineScope.() -> T
这里能看出,async 的 block 是有返回值的。
对应的编译脚本生成的 SuspendLambda 的 invokeSuspend 函数的返回值会被强转为 T 类型!
同时,也会将 invokeSuspend 返回的结果,通过 completion.resumeWith 传递给 Job。
这个和 launch 启动的协程是不一样的,launch 协程返回值是 unit,和 java void 是一个意思。
LAZY 模式通过 start 启动 对于 lazy 方式,async 也是支持的,配合 start() 方法就行了。
这个不多讲了,和 launch 没啥太大区别。
DeferredCoroutine 如果是默认启动类型的话,那么走的是 DeferredCoroutine,LazyDeferredCoroutine 是 DeferredCoroutine 的子类。
1 2 3 4 5 6 7 8 9 10 11 12 @Suppress("UNCHECKED_CAST" ) private open class DeferredCoroutine <T >( parentContext: CoroutineContext, active: Boolean ) : AbstractCoroutine<T>(parentContext, true , active = active), Deferred<T>, SelectClause1<T> { override fun getCompleted () : T = getCompletedInternal() as T override suspend fun await () : T = awaitInternal() as T override val onAwait: SelectClause1<T> get () = this override fun <R> registerSelectClause1 (select: SelectInstance <R >, block: suspend (T ) -> R ) = registerSelectClause1Internal(select, block) }
可以看到,其支持 await() 方法,其他协程能够通过 await() 实现。
onAwait 和 registerSelectClause1 是配合 select 操作符实现对多个协程体进行监听的逻辑,这里我们不多关注先。
await() 我们看一下 await 是如何拿到 async 的执行结果的,逻辑是不是和 launch 一样呢?
1 2 3 4 5 6 private suspend fun awaitSuspend () : Any? = suspendCoroutineUninterceptedOrReturn { uCont -> val cont = AwaitContinuation(uCont.intercepted(), this ) cont.initCancellability() cont.disposeOnCancellation(invokeOnCompletion(ResumeAwaitOnCompletion(cont).asHandler)) cont.getResult() }
果然是一样的操作。
onAwait 本质上就是一个挂起函数,所以外部协程会挂起。
invokeOnCompletion 依然是创建了一个 Node 节点,加入到当前协程 job 的 state.list 中
最后统一唤醒。
其实 await 就是特殊的 join,join 传递的结果是特殊的 Unit 而已。
3 runBlocking 启动 最后看看 runBlocking,已经 runBlocking 的阻塞线程原理:
1 2 3 suspend fun main () = runBlocking { ... ... ... ... }
核心逻辑 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 @Throws(InterruptedException::class) public actual fun <T> runBlocking(context: CoroutineContext, block: suspend CoroutineScope.() -> T): T { contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) } val currentThread = Thread.currentThread() val contextInterceptor = context[ContinuationInterceptor] val eventLoop: EventLoop? val newContext: CoroutineContext if (contextInterceptor == null ) { eventLoop = ThreadLocalEventLoop.eventLoop newContext = GlobalScope.newCoroutineContext(context + eventLoop) } else { eventLoop = (contextInterceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() } ?: ThreadLocalEventLoop.currentOrNull() newContext = GlobalScope.newCoroutineContext(context) } val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop) coroutine.start(CoroutineStart.DEFAULT, coroutine, block) return coroutine.joinBlocking() }
这里涉及到了一些关键点:
ThreadLocalEventLoop.eventLoop 的原理:BlockingEventLoop;
joinBlocking 的原理;
BlockingCoroutine 的原理;
BlockingEventLoop 其实最终返回的是一个 BlockingEventLoop
1 internal actual fun createEventLoop () : EventLoop = BlockingEventLoop(Thread.currentThread())
BlockingEventLoop 父类里面有一个队列:
1 2 3 internal abstract class EventLoopImplBase : EventLoopImplPlatform(), Delay { private val _queue = atomic<Any?>(null )
回忆下微模型,DispatchedContinuations 对象会被添加到 Dispatcher 中去执行。
其实在 BlockingEventLoop 的场景下,就是被添加到 BlockingEventLoop 的 quue 队列中了。
同时注意 BlockingEventLoop 的线程:currentThread,也就是说,是同步执行。
enqueue dispatch 的过程就是 enqueue 的过程:
1 2 3 4 5 6 7 8 9 10 public final override fun dispatch (context: CoroutineContext, block: Runnable) = enqueue(block)open fun enqueue (task: Runnable) { if (enqueueImpl(task)) { unpark() } else { DefaultExecutor.enqueue(task) } }
joinBlocking 我们看看 joinBlocking 函数,可以看到 joinBlocking 会进入到自旋状态,然后不断的从前面的 queue 中获取任务执行:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 @Suppress("UNCHECKED_CAST") fun joinBlocking () : T { registerTimeLoopThread() try { eventLoop?.incrementUseCount() try { while (true ) { @Suppress("DEPRECATION") if (Thread.interrupted()) throw InterruptedException().also { cancelCoroutine(it) } val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE if (isCompleted) break parkNanos (this , parkNanos) } } finally { eventLoop?.decrementUseCount() } } finally { unregisterTimeLoopThread() } val state = this .state.unboxState() (state as? CompletedExceptionally)?.let { throw it.cause } return state as T } override fun processNextEvent () : Long { if (processUnconfinedEvent()) return 0 val delayed = _delayed.value if (delayed != null && !delayed.isEmpty) { val now = nanoTime() while (true ) { delayed.removeFirstIf { if (it.timeToExecute(now)) { enqueueImpl(it) } else false } ?: break } } val task = dequeue() if (task != null ) { platformAutoreleasePool { task.run() } return 0 } return nextTime }
所以如果不指定线程,默认就在当前线程,那么是同步执行的!!!
当前线程在执行完 task 后,会再次进入自旋状态,然后阻塞等待!!!
指定 Dispathers 如果指定了 Dispathers 的话呢?那么就没 loop 内部的队列了,那么当前线程会在 eventloop 的 processNextEvent 的地方一直阻塞。
BlockingCoroutine 知道协程体执行完成后,通知 BlockingCoroutine,resumeWith 返回里面在 afterCompletion 后会,unpark blockedThread 线程:
1 2 3 4 5 override fun afterCompletion (state: Any?) { if (Thread.currentThread() != blockedThread) unpark(blockedThread) }
这样,调用 joinBlocking 的线程就会被唤醒了!
4 总结 本篇文章,总结了下协程常见的启动函数的原理,嗯嗯。
不同的启动模式有各自的特色,在特定场景使用才是关键。