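0 Preface

This post takes a brief look at how coroutine dispatchers and their thread pool work, from several angles, including their structure and the mechanics underneath.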
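Note that the coroutine thread pool is not the same as Java's thread pools (such as ThreadPoolExecutor). kotlinx.coroutines implements its own custom thread-pool logic.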
1 Main dispatcher

```kotlin
@JvmStatic
public actual val Main: MainCoroutineDispatcher
    get() = MainDispatcherLoader.dispatcher
```
MainDispatcherLoader

The dispatcher comes from MainDispatcherLoader's internal `dispatcher` property:
```kotlin
private val FAST_SERVICE_LOADER_ENABLED = systemProp(FAST_SERVICE_LOADER_PROPERTY_NAME, true)

@JvmField
val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()

private fun loadMainDispatcher(): MainCoroutineDispatcher {
    return try {
        val factories = if (FAST_SERVICE_LOADER_ENABLED) {
            FastServiceLoader.loadMainDispatcherFactory()
        } else {
            // Standard SPI lookup; toList() instantiates every registered implementation
            ServiceLoader.load(
                MainDispatcherFactory::class.java,
                MainDispatcherFactory::class.java.classLoader
            ).iterator().asSequence().toList()
        }
        // Pick the factory with the highest loadPriority
        @Suppress("ConstantConditionIf")
        factories.maxByOrNull { it.loadPriority }?.tryCreateDispatcher(factories)
            ?: createMissingDispatcher()
    } catch (e: Throwable) {
        // Any failure yields a "missing" dispatcher that throws when it is used
        createMissingDispatcher(e)
    }
}
```
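SPI: ServiceLoader.load(MainDispatcherFactory::class.java) is the branch taken when the fast loader is disabled; FastServiceLoader, used by default, is an optimized version of the same lookup. ServiceLoader:
scans the registration files under META-INF/services/ on the classpath and instantiates every implementation class it finds (here, when the result is iterated by toList());
returns those implementations as a collection.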
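For example, kotlinx-coroutines-android registers its factory in META-INF/services/kotlinx.coroutines.internal.MainDispatcherFactory, a file containing a single line:

```
kotlinx.coroutines.android.AndroidDispatcherFactory
```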
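To make the selection rule in loadMainDispatcher concrete, here is a minimal sketch in plain Kotlin. DispatcherFactory, the three factory classes and selectMainDispatcher are hypothetical stand-ins (dispatchers are represented by String names so that only the standard library is needed). The only part taken from the code above is the rule itself: the highest loadPriority wins, and a missing factory or a failure falls back to a "missing" dispatcher.

```kotlin
// Hypothetical stand-in for MainDispatcherFactory; real factories return a
// MainCoroutineDispatcher, here a String name keeps the sketch self-contained.
interface DispatcherFactory {
    val loadPriority: Int
    fun create(): String
}

class AndroidLikeFactory : DispatcherFactory {
    override val loadPriority = Int.MAX_VALUE / 2
    override fun create() = "HandlerContext(main looper)"
}

class DesktopLikeFactory : DispatcherFactory {
    override val loadPriority = 0
    override fun create() = "DesktopDispatcher"
}

class BrokenFactory : DispatcherFactory {
    override val loadPriority = Int.MAX_VALUE
    override fun create(): String = throw IllegalStateException("main looper is not available")
}

// Same shape as loadMainDispatcher: the highest loadPriority wins; no factory,
// or a failure while creating, yields a placeholder "missing" dispatcher.
fun selectMainDispatcher(factories: List<DispatcherFactory>): String =
    try {
        factories.maxByOrNull { it.loadPriority }?.create() ?: "MissingMainDispatcher"
    } catch (e: Throwable) {
        "MissingMainDispatcher(${e.message})"
    }
```

In the real library the failure is caught per factory by tryCreateDispatcher and the placeholder only throws once it is actually used; the sketch collapses both into a returned name.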
AndroidDispatcherFactory

It wraps the main Looper in a HandlerContext and declares a high loadPriority:

```kotlin
internal class AndroidDispatcherFactory : MainDispatcherFactory {

    override fun createDispatcher(allFactories: List<MainDispatcherFactory>): MainCoroutineDispatcher {
        val mainLooper = Looper.getMainLooper() ?: throw IllegalStateException("The main looper is not available")
        // An asynchronous Handler bound to the main thread's Looper
        return HandlerContext(mainLooper.asHandler(async = true))
    }

    override fun hintOnError(): String = "For tests Dispatchers.setMain from kotlinx-coroutines-test module can be used"

    // High priority, so this factory wins the maxByOrNull selection
    override val loadPriority: Int
        get() = Int.MAX_VALUE / 2
}
```
HandlerContext.dispatch

As you can see, it simply posts the coroutine's block to the main thread:
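```kotlin
override fun dispatch(context: CoroutineContext, block: Runnable) {
    // Post to the main thread's message queue; if the post is rejected
    // (for example because the Looper is quitting), hand off to cancelOnRejection
    if (!handler.post(block)) {
        cancelOnRejection(context, block)
    }
}
```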
Pretty simple.
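The same post-or-reject pattern can be sketched without Android. FakeMainHandler and dispatchToMain are hypothetical: a single-thread executor plays the role of the main Looper, and post() returns false once the loop has quit, roughly like Handler.post does when its Looper is exiting.

```kotlin
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for android.os.Handler on the main Looper: post() queues the
// block on one "main" thread and returns false once the loop has quit.
class FakeMainHandler {
    private val loop: ExecutorService =
        Executors.newSingleThreadExecutor { r -> Thread(r, "fake-main") }
    @Volatile private var quit = false

    fun post(block: Runnable): Boolean {
        if (quit) return false
        loop.execute(block)
        return true
    }

    fun quitAndWait() {
        quit = true
        loop.shutdown()
        loop.awaitTermination(1, TimeUnit.SECONDS)
    }
}

// Shape of HandlerContext.dispatch: post to the "main" thread, fall back on rejection.
fun dispatchToMain(handler: FakeMainHandler, block: Runnable, onRejected: () -> Unit) {
    if (!handler.post(block)) onRejected()
}
```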
2 Default dispatcher

Dispatchers.Default and Dispatchers.IO share the same thread pool. Let's start with the Default dispatcher:
DefaultScheduler

```kotlin
@JvmStatic
public actual val Default: CoroutineDispatcher = DefaultScheduler
```
The core logic lives in its parent class, SchedulerCoroutineDispatcher:
```kotlin
internal object DefaultScheduler : SchedulerCoroutineDispatcher(
    CORE_POOL_SIZE, MAX_POOL_SIZE,
    IDLE_WORKER_KEEP_ALIVE_NS, DEFAULT_SCHEDULER_NAME
) {
    internal fun shutdown() {
        super.close()
    }

    override fun close() {
        throw UnsupportedOperationException("Dispatchers.Default cannot be closed")
    }

    override fun toString(): String = "Dispatchers.Default"
}
```
SchedulerCoroutineDispatcher

Here you can see that dispatch ultimately hands the work to the CoroutineScheduler object:
```kotlin
internal open class SchedulerCoroutineDispatcher(
    private val corePoolSize: Int = CORE_POOL_SIZE,
    private val maxPoolSize: Int = MAX_POOL_SIZE,
    private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
    private val schedulerName: String = "CoroutineScheduler",
) : ExecutorCoroutineDispatcher() {

    override val executor: Executor
        get() = coroutineScheduler

    private var coroutineScheduler = createScheduler()

    private fun createScheduler() =
        CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)

    // Every dispatched block goes straight to the shared CoroutineScheduler
    override fun dispatch(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block)

    // ...
}
```
Thread pool configuration

Here corePoolSize and maxPoolSize are specified:
```kotlin
@JvmField
internal val CORE_POOL_SIZE = systemProp(
    "kotlinx.coroutines.scheduler.core.pool.size",
    AVAILABLE_PROCESSORS.coerceAtLeast(2), // number of CPU cores, but at least 2
    minValue = CoroutineScheduler.MIN_SUPPORTED_POOL_SIZE
)

@JvmField
internal val MAX_POOL_SIZE = systemProp(
    "kotlinx.coroutines.scheduler.max.pool.size",
    CoroutineScheduler.MAX_SUPPORTED_POOL_SIZE,
    maxValue = CoroutineScheduler.MAX_SUPPORTED_POOL_SIZE
)

// In CoroutineScheduler's companion object:
internal const val MIN_SUPPORTED_POOL_SIZE = 1
internal const val MAX_SUPPORTED_POOL_SIZE = (1 shl BLOCKING_SHIFT) - 2 // 2,097,150
private const val BLOCKING_SHIFT = 21
```
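As a quick check of these numbers, the sketch below recomputes the defaults. intSystemProp is a hypothetical, simplified stand-in for the internal systemProp helper and does not reproduce its exact error handling; the property names and constants come from the excerpt.

```kotlin
// Values copied from the excerpt above (plain vals here, const in the library).
val BLOCKING_SHIFT = 21
val MIN_SUPPORTED_POOL_SIZE = 1
val MAX_SUPPORTED_POOL_SIZE = (1 shl BLOCKING_SHIFT) - 2 // 2_097_150

// Hypothetical, simplified stand-in for the library's internal systemProp():
// read an Int system property, fall back to a default, and enforce the range.
fun intSystemProp(name: String, default: Int, minValue: Int = 1, maxValue: Int = Int.MAX_VALUE): Int {
    val value = System.getProperty(name)?.toIntOrNull() ?: default
    require(value in minValue..maxValue) { "System property '$name' = $value is outside $minValue..$maxValue" }
    return value
}

val availableProcessors = Runtime.getRuntime().availableProcessors()

val corePoolSize = intSystemProp(
    "kotlinx.coroutines.scheduler.core.pool.size",
    availableProcessors.coerceAtLeast(2),
    minValue = MIN_SUPPORTED_POOL_SIZE
)

val maxPoolSize = intSystemProp(
    "kotlinx.coroutines.scheduler.max.pool.size",
    MAX_SUPPORTED_POOL_SIZE,
    maxValue = MAX_SUPPORTED_POOL_SIZE
)
```

With 8 cores and neither property set, this yields corePoolSize = 8 and maxPoolSize = 2,097,150.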
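What is the difference between the two? Because Default and IO share a single pool:
core: corePoolSize is for Default, since Default is meant for CPU-bound computation. At most corePoolSize threads may run CPU tasks at the same time, because each needs one of corePoolSize CPU permits.
max - core: the remaining capacity up to maxPoolSize is for IO, since IO is meant for blocking I/O reads and writes. maxPoolSize caps the total number of threads.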
Since the Default and IO dispatchers share the same logic, we will walk through it via the Default path.
CoroutineScheduler

```kotlin
internal class CoroutineScheduler(
    @JvmField val corePoolSize: Int,
    @JvmField val maxPoolSize: Int,
    @JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
    @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable {
    init {
        // ...
    }

    // Global queue for CPU (non-blocking) tasks
    @JvmField
    val globalCpuQueue = GlobalQueue()

    // Global queue for blocking tasks
    @JvmField
    val globalBlockingQueue = GlobalQueue()
}
```
As you can see, there are two global queues:
globalCpuQueue holds CPU-bound tasks;
globalBlockingQueue holds blocking tasks.
Which queue a task lands in depends on whether it was dispatched through Default or IO.
addToGlobalQueue

Here the task is added to the matching queue depending on whether it is blocking:
```kotlin
private fun addToGlobalQueue(task: Task): Boolean {
    return if (task.isBlocking) {
        globalBlockingQueue.addLast(task)
    } else {
        globalCpuQueue.addLast(task)
    }
}

// A task counts as blocking when its context's mode is TASK_PROBABLY_BLOCKING
internal inline val Task.isBlocking get() = taskContext.taskMode == TASK_PROBABLY_BLOCKING
```
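A minimal sketch of this routing with JDK queues. SketchTask and TwoQueues are hypothetical; the real GlobalQueue is a lock-free queue that can also be closed (addLast then returns false), which ConcurrentLinkedQueue does not model.

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical simplified task; the real Task carries a TaskContext whose taskMode
// is TASK_NON_BLOCKING (Default) or TASK_PROBABLY_BLOCKING (IO).
data class SketchTask(val name: String, val isBlocking: Boolean)

class TwoQueues {
    val globalCpuQueue = ConcurrentLinkedQueue<SketchTask>()
    val globalBlockingQueue = ConcurrentLinkedQueue<SketchTask>()

    // Same routing rule as addToGlobalQueue: blocking tasks get their own queue.
    fun addToGlobalQueue(task: SketchTask): Boolean =
        if (task.isBlocking) globalBlockingQueue.add(task) else globalCpuQueue.add(task)
}
```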
dispatch

Dispatching a coroutine for execution:
```kotlin
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
    trackTask()
    // 1. Wrap the block into a Task
    val task = createTask(block, taskContext)
    // 2. The current Worker, or null if this thread is not one of our workers
    val currentWorker = currentWorker()
    // 3. Try the worker's local queue first; whatever is handed back goes to a global queue
    val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
    if (notAdded != null) {
        if (!addToGlobalQueue(notAdded)) {
            // The scheduler has been shut down
            throw RejectedExecutionException("$schedulerName was terminated")
        }
    }
    val skipUnpark = tailDispatch && currentWorker != null
    // 4. Wake up or create a worker to run the task
    if (task.mode == TASK_NON_BLOCKING) {
        if (skipUnpark) return
        signalCpuWork()
    } else {
        signalBlockingWork(skipUnpark = skipUnpark)
    }
}
```
Step 1 creates the Task; it is actually a TaskImpl object.
Step 2 tries to obtain the current thread's Worker object:
```kotlin
private fun currentWorker(): Worker? = (Thread.currentThread() as? Worker)?.takeIf { it.scheduler == this }
```
This checks whether the current thread is a Worker thread belonging to this particular scheduler.
Worker

This is the Worker thread, a subclass of Thread. Note that it has a localQueue, its own local task queue, and a Worker prefers tasks from that local queue:
```kotlin
internal inner class Worker private constructor() : Thread() {
    init {
        isDaemon = true
    }

    // ...

    inline val scheduler get() = this@CoroutineScheduler

    @JvmField
    val localQueue: WorkQueue = WorkQueue()

    // ...
}
```
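The currentWorker() check relies on an inner Thread subclass that knows its owning scheduler. The miniature below (MiniScheduler and its Worker are hypothetical names) shows why the takeIf is needed: a thread can be a Worker of a different scheduler instance.

```kotlin
// Hypothetical miniature of the currentWorker() check: an inner Thread subclass
// that remembers which scheduler instance owns it.
class MiniScheduler {
    inner class Worker(body: Runnable) : Thread(body) {
        init { isDaemon = true }
        val scheduler: MiniScheduler get() = this@MiniScheduler
    }

    fun newWorker(body: Runnable) = Worker(body)

    // Same shape as the excerpt: is the calling thread one of *this* scheduler's workers?
    fun currentWorker(): Worker? =
        (Thread.currentThread() as? Worker)?.takeIf { it.scheduler === this }
}
```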
Moving on.

Step 3: if the current thread is a Worker, the task is added directly to that Worker's localQueue. Here is the logic in detail:
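submitToLocalQueue

Note that if the task is a CPU task but the current worker is in the BLOCKING state (it is running a blocking task), the task cannot be added locally and is handed back to the caller, which then puts it into a global queue.
This is because CPU tasks are meant to run only on threads holding one of the corePoolSize CPU permits, so their placement has to respect the limit tied to the number of cores.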
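```kotlin
private fun Worker?.submitToLocalQueue(task: Task, tailDispatch: Boolean): Task? {
    // Not a worker thread: hand the task back to the caller
    if (this == null) return task
    // The worker is terminating: hand it back as well
    if (state === WorkerState.TERMINATED) return task
    // CPU task, but this worker is busy with blocking work: don't queue it locally
    if (task.mode == TASK_NON_BLOCKING && state === WorkerState.BLOCKING) {
        return task
    }
    mayHaveLocalTasks = true
    // Returns null if added, or a task that could not be kept locally
    return localQueue.add(task, fair = tailDispatch)
}
```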
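A simplified, hypothetical mirror of the rules above, assuming a plain ArrayDeque as the local queue (the state names are copied from the library's WorkerState). It returns the task to the caller when it must go to a global queue, and null when it was accepted locally. Unlike the real WorkQueue.add, it never hands back a previously queued task.

```kotlin
// State names as in the library; LocalTask and LocalWorker are hypothetical.
enum class WorkerState { CPU_ACQUIRED, BLOCKING, PARKING, DORMANT, TERMINATED }

class LocalTask(val nonBlocking: Boolean)

class LocalWorker(var state: WorkerState) {
    val localQueue = ArrayDeque<LocalTask>()
    var mayHaveLocalTasks = false
}

fun LocalWorker?.submitToLocalQueue(task: LocalTask): LocalTask? {
    if (this == null) return task                                    // not a worker thread
    if (state == WorkerState.TERMINATED) return task                 // worker is shutting down
    if (task.nonBlocking && state == WorkerState.BLOCKING) return task // CPU task on a blocking worker
    mayHaveLocalTasks = true
    localQueue.addLast(task)
    return null                                                      // accepted locally
}
```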
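Moving on.

signalCpuWork / signalBlockingWork

Step 4: signalBlockingWork and signalCpuWork differ only slightly. Both first try to wake an idle (parked) worker with tryUnpark and, failing that, call tryCreateWorker to start a new one; signalBlockingWork additionally records the new blocking task in controlState first: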
```kotlin
private fun signalBlockingWork(skipUnpark: Boolean) {
    // Count the new blocking task in controlState
    val stateSnapshot = incrementBlockingTasks()
    if (skipUnpark) return
    // Prefer waking an idle worker; otherwise try to create a new one
    if (tryUnpark()) return
    if (tryCreateWorker(stateSnapshot)) return
    tryUnpark() // try again, in case a worker became idle in the meantime
}

fun signalCpuWork() {
    if (tryUnpark()) return
    if (tryCreateWorker()) return
    tryUnpark()
}
```
tryCreateWorker

Creates a Worker thread object.
Note the check on the number of CPU threads here: it must not exceed corePoolSize.
```kotlin
private fun tryCreateWorker(state: Long = controlState.value): Boolean {
    val created = createdWorkers(state)
    val blocking = blockingTasks(state)
    // Threads not accounted for by blocking tasks are treated as CPU workers
    val cpuWorkers = (created - blocking).coerceAtLeast(0)

    if (cpuWorkers < corePoolSize) {
        val newCpuWorkers = createNewWorker()
        // If this was the first CPU worker, create a second one so that
        // work stealing between them can start
        if (newCpuWorkers == 1 && corePoolSize > 1) createNewWorker()
        if (newCpuWorkers > 0) return true
    }
    return false
}
```
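The decision in tryCreateWorker can be written as a pure function, which makes it easy to try different snapshots. workersToCreate is hypothetical; it folds in the maxPoolSize check that, as far as I know, happens inside createNewWorker, and it ignores locking and termination.

```kotlin
// Hypothetical pure-function version of tryCreateWorker's decision:
// how many workers would be started for this (created, blocking) snapshot?
fun workersToCreate(created: Int, blockingTasks: Int, corePoolSize: Int, maxPoolSize: Int): Int {
    // Same condition as createNewWorker: below the CPU limit and below the total limit
    fun canCreate(c: Int) = (c - blockingTasks).coerceAtLeast(0) < corePoolSize && c < maxPoolSize

    if (!canCreate(created)) return 0
    val cpuWorkersAfterFirst = (created - blockingTasks).coerceAtLeast(0) + 1
    // The first CPU worker gets a companion so that stealing is possible
    return if (cpuWorkersAfterFirst == 1 && corePoolSize > 1 && canCreate(created + 1)) 2 else 1
}
```

For example, a fresh pool on a 4-core machine starts two workers for the first CPU task, while 6 threads serving 3 blocking tasks count as 3 CPU workers and get only one more.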
So how are these counts obtained? From controlState:
```kotlin
// Initially: corePoolSize CPU permits, 0 blocking tasks, 0 created workers
private val controlState = atomic(corePoolSize.toLong() shl CPU_PERMITS_SHIFT)
```
controlState is an atomic Long used to track the state of the pool's workers efficiently; different bit ranges of this Long hold different counters:
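```
| bits 42-62            | bits 21-41     | bits 0-20       |
|-----------------------|----------------|-----------------|
| available CPU permits | blocking tasks | created workers |
```
Each field is BLOCKING_SHIFT bits wide, and BLOCKING_SHIFT = 21 (see the constants above), so the three counters occupy 63 bits of the Long.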
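blockingTasks extracts the number of blocking tasks (queued or running) with a mask and a shift,
and createdWorkers extracts the total number of created threads the same way; their difference, clamped at 0, is taken as the number of CPU threads.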
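The packing can be reproduced with a few masks. The mask and accessor names below follow the library's CoroutineScheduler as far as I can tell, but treat them as a sketch; pack is a hypothetical helper added only to build test states.

```kotlin
// Field layout: 21 bits each, created workers in the lowest bits.
val BLOCKING_SHIFT = 21
val CREATED_MASK: Long = (1L shl BLOCKING_SHIFT) - 1
val BLOCKING_MASK: Long = CREATED_MASK shl BLOCKING_SHIFT
val CPU_PERMITS_SHIFT = BLOCKING_SHIFT * 2
val CPU_PERMITS_MASK: Long = CREATED_MASK shl CPU_PERMITS_SHIFT

fun createdWorkers(state: Long): Int = (state and CREATED_MASK).toInt()
fun blockingTasks(state: Long): Int = ((state and BLOCKING_MASK) shr BLOCKING_SHIFT).toInt()
fun availableCpuPermits(state: Long): Int = ((state and CPU_PERMITS_MASK) shr CPU_PERMITS_SHIFT).toInt()

// Hypothetical helper for the demo: pack three counters into one Long.
fun pack(cpuPermits: Int, blocking: Int, created: Int): Long =
    (cpuPermits.toLong() shl CPU_PERMITS_SHIFT) or
        (blocking.toLong() shl BLOCKING_SHIFT) or
        created.toLong()
```

Because all three counters live in one Long, a single compare-and-set can update them together, for example adding `1L shl BLOCKING_SHIFT` to count one more blocking task.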
Worker

Finally, let's look at how a Worker runs:
```kotlin
override fun run() = runWorker()

private fun runWorker() {
    var rescanned = false
    while (!isTerminated && state != WorkerState.TERMINATED) {
        val task = findTask(mayHaveLocalTasks)
        // Found a task: run it and look for the next one
        if (task != null) {
            rescanned = false
            minDelayUntilStealableTaskNs = 0L
            executeTask(task)
            continue
        } else {
            mayHaveLocalTasks = false
        }
        // Another worker holds a task that becomes stealable after a delay:
        // rescan once, then park for that delay instead of going fully idle
        if (minDelayUntilStealableTaskNs != 0L) {
            if (!rescanned) {
                rescanned = true
            } else {
                rescanned = false
                tryReleaseCpu(WorkerState.PARKING)
                interrupted()
                LockSupport.parkNanos(minDelayUntilStealableTaskNs)
                minDelayUntilStealableTaskNs = 0L
            }
            continue
        }
        // Nothing to do: park (an idle worker may terminate after idleWorkerKeepAliveNs)
        tryPark()
    }
    tryReleaseCpu(WorkerState.TERMINATED)
}
```
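findTask

Looking for a task. A worker that can acquire a CPU permit may take any task; otherwise it checks its local queue and then only blocking work: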
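```kotlin
fun findTask(scanLocalQueue: Boolean): Task? {
    // Holding a CPU permit: any task (CPU or blocking) may be taken
    if (tryAcquireCpuPermit()) return findAnyTask(scanLocalQueue)
    // No CPU permit: check the local queue, then the global blocking queue
    val task = if (scanLocalQueue) {
        localQueue.poll() ?: globalBlockingQueue.removeFirstOrNull()
    } else {
        globalBlockingQueue.removeFirstOrNull()
    }
    // Finally, try to steal a blocking task from another worker
    return task ?: trySteal(blockingOnly = true)
}
```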
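A simplified model of findTask, assuming an AtomicInteger-based permit counter (CpuPermits is hypothetical; the real scheduler keeps the permits inside controlState). It omits work stealing and the probabilistic ordering the real findAnyTask uses so that the global queues are not starved.

```kotlin
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical CPU-permit counter taken with a CAS loop, in the spirit of tryAcquireCpuPermit().
class CpuPermits(initial: Int) {
    private val available = AtomicInteger(initial)

    fun tryAcquire(): Boolean {
        while (true) {
            val n = available.get()
            if (n == 0) return false
            if (available.compareAndSet(n, n - 1)) return true
        }
    }

    fun release() {
        available.incrementAndGet()
    }
}

// Simplified findTask: with a permit, any queue may be used; without one, only the
// local queue and the global blocking queue (as in the excerpt's non-permit path).
fun findTask(
    hasCpuPermit: Boolean,
    localQueue: ArrayDeque<String>,
    globalCpuQueue: ArrayDeque<String>,
    globalBlockingQueue: ArrayDeque<String>,
): String? =
    if (hasCpuPermit) {
        localQueue.removeFirstOrNull()
            ?: globalCpuQueue.removeFirstOrNull()
            ?: globalBlockingQueue.removeFirstOrNull()
    } else {
        localQueue.removeFirstOrNull() ?: globalBlockingQueue.removeFirstOrNull()
    }
```

With a single permit, the first worker to ask takes the CPU task, and a second worker is limited to blocking work until the permit is released.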
That covers the main logic of the Default thread pool.
3 IO dispatcher

Dispatchers.IO reuses the same thread pool; it is implemented by DefaultIoScheduler:
```kotlin
@JvmStatic
public val IO: CoroutineDispatcher = DefaultIoScheduler
```
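DefaultIoScheduler

As you can see, it dispatches through a limitedParallelism view that caps the number of tasks running at the same time. The cap comes from the kotlinx.coroutines.io.parallelism system property and defaults to 64 or the number of CPU cores, whichever is larger.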
```kotlin
public const val IO_PARALLELISM_PROPERTY_NAME: String = "kotlinx.coroutines.io.parallelism"

internal object DefaultIoScheduler : ExecutorCoroutineDispatcher(), Executor {

    // A parallelism-limited view over the unlimited IO scheduler
    private val default = UnlimitedIoScheduler.limitedParallelism(
        systemProp(
            IO_PARALLELISM_PROPERTY_NAME,
            64.coerceAtLeast(AVAILABLE_PROCESSORS)
        )
    )

    override val executor: Executor
        get() = this

    override fun execute(command: java.lang.Runnable) = dispatch(EmptyCoroutineContext, command)

    @ExperimentalCoroutinesApi
    override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
        // Views created from Dispatchers.IO are not bounded by the default IO cap
        return UnlimitedIoScheduler.limitedParallelism(parallelism)
    }

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        default.dispatch(context, block)
    }

    // ...
}
```
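The default cap can be reproduced in one line. ioParallelism is a hypothetical helper; unlike the real systemProp, it silently ignores a non-numeric property value instead of rejecting it.

```kotlin
// Hypothetical helper: the kotlinx.coroutines.io.parallelism property if it is set,
// otherwise 64 or the number of cores, whichever is larger.
fun ioParallelism(
    cores: Int = Runtime.getRuntime().availableProcessors(),
    propertyValue: String? = System.getProperty("kotlinx.coroutines.io.parallelism"),
): Int = propertyValue?.toIntOrNull() ?: 64.coerceAtLeast(cores)
```

So an 8-core machine gets 64 concurrent IO tasks, while a 128-core machine gets 128.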
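LimitedDispatcher

LimitedDispatcher is a dispatcher that caps the parallelism of the dispatcher it wraps.
Here the wrapped dispatcher is UnlimitedIoScheduler, whose dispatch is handed over to DefaultScheduler. Note that it passes BlockingContext, which means the task is marked as a blocking task: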
```kotlin
private object UnlimitedIoScheduler : CoroutineDispatcher() {

    @InternalCoroutinesApi
    override fun dispatchYield(context: CoroutineContext, block: Runnable) {
        DefaultScheduler.dispatchWithContext(block, BlockingContext, true)
    }

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        DefaultScheduler.dispatchWithContext(block, BlockingContext, false)
    }
}

// In CoroutineDispatcher:
@ExperimentalCoroutinesApi
public open fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
    parallelism.checkParallelism()
    return LimitedDispatcher(this, parallelism)
}
```
Now let's look at LimitedDispatcher's internals.
Constructor

Note that the Delay capability is implemented through delegation with `by`:
```kotlin
internal class LimitedDispatcher(
    private val dispatcher: CoroutineDispatcher,
    private val parallelism: Int
) : CoroutineDispatcher(), Runnable, Delay by (dispatcher as? Delay ?: DefaultDelay) {

    // Number of worker loops currently scheduled on the underlying dispatcher
    @Volatile
    private var runningWorkers = 0

    // Pending tasks; several workers may consume from it concurrently
    private val queue = LockFreeTaskQueue<Runnable>(singleConsumer = false)

    // Guards updates to runningWorkers
    private val workerAllocationLock = SynchronizedObject()

    // ...
}
```
This is Kotlin interface (class) delegation with the `by` keyword, not a delegated property:
```kotlin
Delay by (dispatcher as? Delay ?: DefaultDelay)
```
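It means LimitedDispatcher delegates its implementation of the Delay interface to another object, chosen once when the LimitedDispatcher is constructed. Specifically:
If dispatcher (the wrapped dispatcher passed in) implements Delay, its Delay implementation is used directly.
If dispatcher does not implement Delay, DefaultDelay is used instead.
DefaultDelay is the default Delay implementation provided by kotlinx.coroutines. On the JVM it is typically backed by DefaultExecutor, an event loop running on a dedicated background thread, rather than by a ScheduledExecutorService.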
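The same delegation pattern in self-contained form. Delay, FallbackDelay, Dispatcher, DispatcherWithTimer and Limited are hypothetical types, not the kotlinx.coroutines ones; only the `by (x as? Delay ?: fallback)` construction is taken from LimitedDispatcher.

```kotlin
// Hypothetical types illustrating `Delay by (inner as? Delay ?: FallbackDelay)`.
interface Delay {
    fun describeDelay(ms: Long): String
}

object FallbackDelay : Delay {
    override fun describeDelay(ms: Long) = "fallback timer waits $ms ms"
}

open class Dispatcher(val name: String)

class DispatcherWithTimer(name: String) : Dispatcher(name), Delay {
    override fun describeDelay(ms: Long) = "$name's own timer waits $ms ms"
}

// Delay calls go to the wrapped dispatcher if it implements Delay, else to FallbackDelay.
// The delegate expression is evaluated once, when Limited is constructed.
class Limited(inner: Dispatcher) : Delay by (inner as? Delay ?: FallbackDelay)
```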
dispatch

Back to LimitedDispatcher's own dispatching logic:
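```kotlin
override fun dispatch(context: CoroutineContext, block: Runnable) {
    // LimitedDispatcher is itself a Runnable: dispatching `this` schedules one worker loop
    dispatchInternal(block) {
        dispatcher.dispatch(this, this)
    }
}

private inline fun dispatchInternal(block: Runnable, dispatch: () -> Unit) {
    // Enqueue; if the limit is already reached, a running worker will pick it up
    if (addAndTryDispatching(block)) return
    // Otherwise try to reserve a worker slot
    if (!tryAllocateWorker()) return
    dispatch()
}
```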
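addAndTryDispatching

The block is first added to the internal concurrent queue, and then runningWorkers is compared with parallelism. If the limit has already been reached, nothing new is dispatched; one of the running workers will pick the block up from the queue.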
```kotlin
private fun addAndTryDispatching(block: Runnable): Boolean {
    queue.addLast(block)
    return runningWorkers >= parallelism
}
```
tryAllocateWorker

Reserves a worker slot. This only increments a counter; the actual thread comes from the underlying thread pool:
```kotlin
private fun tryAllocateWorker(): Boolean {
    synchronized(workerAllocationLock) {
        if (runningWorkers >= parallelism) return false
        ++runningWorkers
        return true
    }
}
```
Finally, task execution:
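run

LimitedDispatcher itself is dispatched as a task to the underlying pool. Its run method loops, taking Runnables (typically DispatchedContinuation objects) from the queue and executing them. After every 16 tasks it re-dispatches itself if the underlying dispatcher needs a dispatch, so other work on that dispatcher gets a fair chance to run; when the queue is empty, it releases its slot: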
```kotlin
override fun run() {
    var fairnessCounter = 0
    while (true) {
        val task = queue.removeFirstOrNull()
        if (task != null) {
            try {
                task.run()
            } catch (e: Throwable) {
                handleCoroutineException(EmptyCoroutineContext, e)
            }
            // After 16 tasks, re-dispatch this worker so other work on the
            // underlying dispatcher gets a turn; the worker slot is kept
            if (++fairnessCounter >= 16 && dispatcher.isDispatchNeeded(this)) {
                dispatcher.dispatch(this, this)
                return
            }
            continue
        }

        synchronized(workerAllocationLock) {
            // The queue looks empty: release the slot...
            --runningWorkers
            if (queue.size == 0) return
            // ...unless a task arrived meanwhile; then take the slot back and continue
            ++runningWorkers
            fairnessCounter = 0
        }
    }
}
```
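Putting dispatch, tryAllocateWorker and run together, the following sketch applies the same idea to a plain java.util.concurrent.Executor. LimitedExecutor is hypothetical and simplified: it always yields after 16 tasks (an Executor has no isDispatchNeeded) and prints exceptions instead of routing them to a coroutine exception handler.

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.Executor

// Hypothetical sketch of the LimitedDispatcher idea on top of a plain Executor:
// at most `parallelism` worker loops run on the delegate at the same time.
class LimitedExecutor(
    private val delegate: Executor,
    private val parallelism: Int,
) : Executor {
    private val queue = ConcurrentLinkedQueue<Runnable>()
    private val lock = Any()
    @Volatile private var runningWorkers = 0

    override fun execute(command: Runnable) {
        queue.add(command)
        if (runningWorkers >= parallelism) return // a running loop will pick it up
        if (!tryAllocateWorker()) return
        delegate.execute(Worker())
    }

    private fun tryAllocateWorker(): Boolean {
        synchronized(lock) {
            if (runningWorkers >= parallelism) return false
            runningWorkers++
            return true
        }
    }

    private inner class Worker : Runnable {
        override fun run() {
            var fairnessCounter = 0
            while (true) {
                val task = queue.poll()
                if (task != null) {
                    try { task.run() } catch (e: Throwable) { e.printStackTrace() }
                    // Give the delegate thread back after 16 tasks, keeping our slot
                    if (++fairnessCounter >= 16) {
                        delegate.execute(this)
                        return
                    }
                    continue
                }
                synchronized(lock) {
                    runningWorkers--
                    if (queue.isEmpty()) return
                    runningWorkers++ // a task arrived meanwhile: keep going
                    fairnessCounter = 0
                }
            }
        }
    }
}
```

The limit is enforced by counting worker loops, not threads: the delegate pool may have many threads, but at most `parallelism` of them run tasks from this queue at any moment.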
That covers the IO dispatcher.
4 Unconfined dispatcher

Finally, the Unconfined dispatcher:
```kotlin
@JvmStatic
public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined
```
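Unconfined

The Unconfined dispatcher is not confined to any particular thread.
isDispatchNeeded returns false here, so the coroutine runs in the caller's thread, that is, the current thread, instead of being dispatched to another thread. After a suspension, it continues in whichever thread resumed it.
That is why dispatch never runs the block: Runnable itself; it is only meant to be called by yield():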
```kotlin
internal object Unconfined : CoroutineDispatcher() {

    @ExperimentalCoroutinesApi
    override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
        throw UnsupportedOperationException("limitedParallelism is not supported for Dispatchers.Unconfined")
    }

    // Never dispatch: resume in the current thread
    override fun isDispatchNeeded(context: CoroutineContext): Boolean = false

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        // Only yield() is expected to get here; it just records that the dispatcher was Unconfined
        val yieldContext = context[YieldContext]
        if (yieldContext != null) {
            yieldContext.dispatcherWasUnconfined = true
            return
        }
        throw UnsupportedOperationException("Dispatchers.Unconfined.dispatch function can only be used by the yield function. " +
            "If you wrap Unconfined dispatcher in your code, make sure you properly delegate " +
            "isDispatchNeeded and dispatch calls.")
    }

    override fun toString(): String = "Dispatchers.Unconfined"
}
```
Nothing more to add here.
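The effect of isDispatchNeeded returning false can be shown with a miniature dispatcher interface. MiniDispatcher, MiniUnconfined, ThreadDispatcher and resumeOn are hypothetical; the real resumption path in DispatchedContinuation also has extra handling for nested Unconfined resumptions, which is omitted here.

```kotlin
// Hypothetical miniature of the isDispatchNeeded contract.
interface MiniDispatcher {
    fun isDispatchNeeded(): Boolean
    fun dispatch(block: Runnable)
}

// Like Dispatchers.Unconfined: never asks to be dispatched.
object MiniUnconfined : MiniDispatcher {
    override fun isDispatchNeeded() = false
    override fun dispatch(block: Runnable) {
        throw UnsupportedOperationException("not expected to be called")
    }
}

// A dispatcher that always moves the block to a new named thread.
class ThreadDispatcher(private val threadName: String) : MiniDispatcher {
    override fun isDispatchNeeded() = true
    override fun dispatch(block: Runnable) {
        val t = Thread(block, threadName)
        t.start()
        t.join() // joined only to keep the sketch deterministic
    }
}

// Resumption helper: dispatch only when asked to, otherwise run right here in the caller's thread.
fun resumeOn(dispatcher: MiniDispatcher, block: Runnable) {
    if (dispatcher.isDispatchNeeded()) dispatcher.dispatch(block) else block.run()
}
```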