Kotlin Coroutine Internals - How the Coroutine Thread Pool Works

0 Preface

This post takes a quick look at how coroutine dispatchers and their thread pool work, from the perspective of both structure and underlying principles.

One thing worth pointing out up front: the coroutine thread pool is not the same as the Java thread pool; the coroutine library implements its own customized thread pool logic.

1 The Main Dispatcher

@JvmStatic
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher

MainDispatcherLoader

Dispatchers.Main is obtained from the dispatcher field inside MainDispatcherLoader:

private val FAST_SERVICE_LOADER_ENABLED = systemProp(FAST_SERVICE_LOADER_PROPERTY_NAME, true)

@JvmField
val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()

private fun loadMainDispatcher(): MainCoroutineDispatcher {
    return try {
        //【1】Fast path: load kotlinx.coroutines.android.AndroidDispatcherFactory directly via reflection
        val factories = if (FAST_SERVICE_LOADER_ENABLED) {
            FastServiceLoader.loadMainDispatcherFactory()
        } else {
            //【2】Fallback: load via the standard SPI mechanism
            ServiceLoader.load(
                MainDispatcherFactory::class.java,
                MainDispatcherFactory::class.java.classLoader
            ).iterator().asSequence().toList()
        }
        @Suppress("ConstantConditionIf")
        factories.maxByOrNull { it.loadPriority }?.tryCreateDispatcher(factories)
            ?: createMissingDispatcher()
    } catch (e: Throwable) {
        // Service loader can throw an exception as well
        createMissingDispatcher(e)
    }
}

The SPI path calls ServiceLoader.load(MainDispatcherFactory::class.java) to:

  • scan every registration file under META-INF/services/ on the classpath and automatically instantiate all the implementation classes it finds;
  • return the collection of those implementations.

On Android, the kotlinx-coroutines-android artifact registers its factory in that file, so the loader finds:

kotlinx.coroutines.android.AndroidDispatcherFactory
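
For reference, here is a minimal standalone sketch of the plain SPI mechanism itself. GreetingService and EnglishGreeting are hypothetical names invented for illustration; the coroutine library registers its MainDispatcherFactory implementations in exactly this way:

import java.util.ServiceLoader

// 1. Define a service interface and an implementation.
interface GreetingService { fun greet(): String }
class EnglishGreeting : GreetingService { override fun greet() = "hello" }

// 2. Register the implementation on the classpath in a file named
//    META-INF/services/<fully qualified interface name>,
//    whose content is a single line: the fully qualified implementation class name.

// 3. ServiceLoader scans those files and instantiates every registered entry.
fun loadGreetings(): List<GreetingService> =
    ServiceLoader.load(GreetingService::class.java)
        .iterator().asSequence().toList()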

AndroidDispatcherFactory

internal class AndroidDispatcherFactory : MainDispatcherFactory {

    //【1】As you can see, this is essentially a wrapper around the main thread's Looper
    override fun createDispatcher(allFactories: List<MainDispatcherFactory>): MainCoroutineDispatcher {
        val mainLooper = Looper.getMainLooper() ?: throw IllegalStateException("The main looper is not available")
        return HandlerContext(mainLooper.asHandler(async = true))
    }

    override fun hintOnError(): String = "For tests Dispatchers.setMain from kotlinx-coroutines-test module can be used"

    override val loadPriority: Int
        get() = Int.MAX_VALUE / 2
}

HandlerContext.dispatch

As you can see, dispatch simply posts the coroutine to the main thread for execution:

override fun dispatch(context: CoroutineContext, block: Runnable) {
    if (!handler.post(block)) {
        cancelOnRejection(context, block)
    }
}

Pretty simple.
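
As a quick usage sketch (assuming an Android app with kotlinx-coroutines-android on the classpath; loadAndShow and render are hypothetical names), every coroutine launched on Dispatchers.Main ends up going through the handler.post(...) call shown above:

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.MainScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext

// Hypothetical helper: compute something off the main thread, then hand the result to UI code.
fun loadAndShow(render: (String) -> Unit) {
    MainScope().launch {                              // MainScope() launches on Dispatchers.Main
        val data = withContext(Dispatchers.Default) { // heavy work runs on the Default pool
            "heavy result"
        }
        render(data)                                  // resumed on the main thread via handler.post(...)
    }
}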

2 The Default Dispatcher

Dispatchers.Default and Dispatchers.IO share the same underlying thread pool. Let's start with the Default dispatcher:

DefaultScheduler

@JvmStatic
public actual val Default: CoroutineDispatcher = DefaultScheduler

The core logic lives in its parent class, SchedulerCoroutineDispatcher:

// Instance of Dispatchers.Default
internal object DefaultScheduler : SchedulerCoroutineDispatcher(
    CORE_POOL_SIZE, MAX_POOL_SIZE,
    IDLE_WORKER_KEEP_ALIVE_NS, DEFAULT_SCHEDULER_NAME
) {
    // Shuts down the dispatcher, used only by Dispatchers.shutdown()
    internal fun shutdown() {
        super.close()
    }

    // Overridden in case anyone writes (Dispatchers.Default as ExecutorCoroutineDispatcher).close()
    override fun close() {
        throw UnsupportedOperationException("Dispatchers.Default cannot be closed")
    }

    override fun toString(): String = "Dispatchers.Default"
}

SchedulerCoroutineDispatcher

As you can see, dispatch ultimately hands the task to an internal CoroutineScheduler instance:

// Instantiated in tests so we can test it in isolation
internal open class SchedulerCoroutineDispatcher(
    private val corePoolSize: Int = CORE_POOL_SIZE,
    private val maxPoolSize: Int = MAX_POOL_SIZE,
    private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
    private val schedulerName: String = "CoroutineScheduler",
) : ExecutorCoroutineDispatcher() {

    override val executor: Executor
        get() = coroutineScheduler

    // This is variable for test purposes, so that we can reinitialize from clean state
    private var coroutineScheduler = createScheduler()

    private fun createScheduler() =
        CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)

    override fun dispatch(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block)

    ... ... ...
}

Thread pool configuration

Here you can see how corePoolSize and maxPoolSize are determined:

@JvmField
internal val CORE_POOL_SIZE = systemProp( //【1】core pool size: based on the number of CPU cores, at least 2
    "kotlinx.coroutines.scheduler.core.pool.size",
    AVAILABLE_PROCESSORS.coerceAtLeast(2),
    minValue = CoroutineScheduler.MIN_SUPPORTED_POOL_SIZE
)

@JvmField
internal val MAX_POOL_SIZE = systemProp( //【2】max pool size: effectively unbounded, (1 shl 21) - 2, i.e. ~2M threads
    "kotlinx.coroutines.scheduler.max.pool.size",
    CoroutineScheduler.MAX_SUPPORTED_POOL_SIZE,
    maxValue = CoroutineScheduler.MAX_SUPPORTED_POOL_SIZE
)

internal const val MIN_SUPPORTED_POOL_SIZE = 1
internal const val MAX_SUPPORTED_POOL_SIZE = (1 shl BLOCKING_SHIFT) - 2

private const val BLOCKING_SHIFT = 21 // 2M threads max

So what is the difference between the two? Since Default and IO share one thread pool:

  • the corePoolSize portion serves Default, because Default is meant for CPU-bound work;
  • the remaining maxPoolSize - corePoolSize headroom serves IO, because IO is meant for blocking I/O work (a quick sketch of the resulting numbers follows this list).
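
As a rough back-of-the-envelope sketch (not library code), this is what those defaults work out to on a given machine when the system properties are left untouched:

fun main() {
    val processors = Runtime.getRuntime().availableProcessors()

    val corePoolSize = maxOf(processors, 2) // cap on CPU workers, used by Dispatchers.Default
    val maxPoolSize = (1 shl 21) - 2        // overall cap, shared with blocking (IO) workers

    println("CPU workers capped at $corePoolSize, total workers capped at $maxPoolSize")
}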

Since the Default and IO dispatchers reuse the same machinery, let's walk through the Default path first.

CoroutineScheduler

internal class CoroutineScheduler(
    @JvmField val corePoolSize: Int,
    @JvmField val maxPoolSize: Int,
    @JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
    @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable {
    init {
        ... ... ...
    }

    @JvmField
    val globalCpuQueue = GlobalQueue()      // CPU queue
    @JvmField
    val globalBlockingQueue = GlobalQueue() // blocking queue

}

As you can see, there are two global queues here:

  • globalCpuQueue holds CPU-bound tasks;
  • globalBlockingQueue holds blocking tasks.

Which queue a task ends up in depends on whether it came from the Default or the IO dispatcher.

addToGlobalQueue

A Task is appended to one queue or the other depending on whether it is blocking:

private fun addToGlobalQueue(task: Task): Boolean {
    return if (task.isBlocking) {
        globalBlockingQueue.addLast(task)
    } else {
        globalCpuQueue.addLast(task)
    }
}

// The task's taskContext records whether it is blocking.
internal inline val Task.isBlocking get() = taskContext.taskMode == TASK_PROBABLY_BLOCKING

dispatch

Dispatching a coroutine for execution:

fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
    trackTask()
    //【1】Create a Task; by default it is NonBlocking, and the returned object is a TaskImpl
    val task = createTask(block, taskContext)

    //【2】Try to obtain the current thread's Worker; a Worker is the pool's representation of one of its threads.
    val currentWorker = currentWorker()
    //【3】Prefer the Worker's local queue
    val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
    if (notAdded != null) {
        if (!addToGlobalQueue(notAdded)) { // couldn't go local, so fall back to the global queue
            throw RejectedExecutionException("$schedulerName was terminated")
        }
    }
    val skipUnpark = tailDispatch && currentWorker != null

    //【4】Wake up or create a worker thread, depending on the task type
    if (task.mode == TASK_NON_BLOCKING) {
        if (skipUnpark) return // CPU task
        signalCpuWork()
    } else {
        signalBlockingWork(skipUnpark = skipUnpark) // IO (blocking) task
    }
}

1.【1】is where the Task is created; the result is actually a TaskImpl object.

2.【2】tries to obtain the current thread's Worker object:

private fun currentWorker(): Worker? = (Thread.currentThread() as? Worker)?.takeIf { it.scheduler == this }

This checks whether the current thread is one of this scheduler's Worker threads.

Worker

Worker is the pool's thread class, a subclass of Thread. Note that it holds a localQueue, a per-thread task queue; a Worker prefers to run tasks from its own local queue first:

internal inner class Worker private constructor() : Thread() {
    init {
        isDaemon = true
    }

    ... ... ...

    inline val scheduler get() = this@CoroutineScheduler

    @JvmField
    val localQueue: WorkQueue = WorkQueue() //【1】local task queue

    ... ... ...
}

Continuing:

3.【3】If the current thread is indeed a Worker, the task is added straight to that Worker's internal localQueue; the concrete logic is in submitToLocalQueue below:

submitToLocalQueue

Note that if the task is a CPU task while the Worker is currently in the BLOCKING state, it cannot go into the local queue and is returned as-is, because CPU tasks have to be checked against the number of CPU permits (bounded by the core count).

private fun Worker?.submitToLocalQueue(task: Task, tailDispatch: Boolean): Task? {
    if (this == null) return task
    if (state === WorkerState.TERMINATED) return task
    //【1】If the task is a CPU task but this Worker is handling blocking work,
    // it cannot be added locally; return the task
    if (task.mode == TASK_NON_BLOCKING && state === WorkerState.BLOCKING) {
        return task
    }
    mayHaveLocalTasks = true
    //【2】Add to the localQueue
    return localQueue.add(task, fair = tailDispatch)
}

Continuing:

4.【4】signalBlockingWork and signalCpuWork differ very little; both end up in the tryCreateWorker method:

signalCpuWork

The core of both is the work done in tryCreateWorker:

private fun signalBlockingWork(skipUnpark: Boolean) {
    val stateSnapshot = incrementBlockingTasks()
    if (skipUnpark) return
    if (tryUnpark()) return                    // try to wake up a parked Worker
    if (tryCreateWorker(stateSnapshot)) return // otherwise create a new Worker
    tryUnpark()
}

fun signalCpuWork() {
    if (tryUnpark()) return
    if (tryCreateWorker()) return // create a new Worker
    tryUnpark()
}

tryCreateWorker

This is where Worker thread objects are created. Note the check on the number of CPU workers: it must not exceed corePoolSize:

private fun tryCreateWorker(state: Long = controlState.value): Boolean {
    //【1】The number of Workers created so far
    val created = createdWorkers(state)
    //【2】The number of blocking tasks
    val blocking = blockingTasks(state)
    //【3】The current number of CPU workers = created - blocking
    val cpuWorkers = (created - blocking).coerceAtLeast(0)
    //【4】A new worker may only be created while the CPU worker count is below corePoolSize
    if (cpuWorkers < corePoolSize) {
        val newCpuWorkers = createNewWorker()
        if (newCpuWorkers == 1 && corePoolSize > 1) createNewWorker()
        if (newCpuWorkers > 0) return true
    }
    return false
}

So how are those counts computed?

private val controlState = atomic(corePoolSize.toLong() shl CPU_PERMITS_SHIFT)

controlState is an atomic Long used to track the state of the pool's worker threads efficiently; different bit fields of this single Long store different pieces of state:

| created workers | CPU-acquired workers | blocking workers |
|-----------------|----------------------|------------------|

Each field occupies BLOCKING_SHIFT bits (21 bits, as defined above).

blockingTasks extracts the number of blocking workers with bit operations, and createdWorkers extracts the total number of worker threads created so far; subtracting the two gives the number of CPU workers.
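
Here is a standalone illustration of that bit packing (hypothetical helper names reusing the 21-bit layout defined by BLOCKING_SHIFT above, not the library's private code); the extraction is just masking and shifting one atomic Long:

import java.util.concurrent.atomic.AtomicLong

private const val BLOCKING_SHIFT = 21
private const val CREATED_MASK: Long = (1L shl BLOCKING_SHIFT) - 1
private const val BLOCKING_MASK: Long = CREATED_MASK shl BLOCKING_SHIFT

// Low 21 bits: number of created workers.
fun createdWorkers(state: Long): Int = (state and CREATED_MASK).toInt()

// Next 21 bits: number of blocking tasks in flight.
fun blockingTasks(state: Long): Int = ((state and BLOCKING_MASK) shr BLOCKING_SHIFT).toInt()

fun main() {
    // Simulate 5 created workers and 2 blocking tasks packed into a single atomic Long.
    val controlState = AtomicLong(5L or (2L shl BLOCKING_SHIFT))
    val state = controlState.get()
    val cpuWorkers = createdWorkers(state) - blockingTasks(state)
    println("created=${createdWorkers(state)} blocking=${blockingTasks(state)} cpu=$cpuWorkers") // created=5 blocking=2 cpu=3
}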

Worker

Finally, let's look at how a Worker runs:

override fun run() = runWorker()

private fun runWorker() {
    var rescanned = false
    //【1】Keep spinning until terminated
    while (!isTerminated && state != WorkerState.TERMINATED) {
        //【2】Find a Task
        val task = findTask(mayHaveLocalTasks)
        if (task != null) {
            rescanned = false
            minDelayUntilStealableTaskNs = 0L
            executeTask(task) //【3】Execute the Task
            continue
        } else {
            mayHaveLocalTasks = false
        }

        if (minDelayUntilStealableTaskNs != 0L) {
            if (!rescanned) {
                rescanned = true
            } else {
                rescanned = false
                tryReleaseCpu(WorkerState.PARKING)
                interrupted()
                LockSupport.parkNanos(minDelayUntilStealableTaskNs)
                minDelayUntilStealableTaskNs = 0L
            }
            continue
        }

        tryPark()
    }
    tryReleaseCpu(WorkerState.TERMINATED)
}

findTask

Finding a task:

fun findTask(scanLocalQueue: Boolean): Task? {
    //【1】Try to acquire a CPU permit; if we get one, take a Task from any queue
    // (this is the non-blocking path)
    if (tryAcquireCpuPermit()) return findAnyTask(scanLocalQueue)
    //【2】Otherwise run blocking work: local queue first, then globalBlockingQueue
    val task = if (scanLocalQueue) {
        localQueue.poll() ?: globalBlockingQueue.removeFirstOrNull()
    } else {
        globalBlockingQueue.removeFirstOrNull()
    }
    return task ?: trySteal(blockingOnly = true)
}

That covers the main logic behind the Default thread pool.
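
As a hedged usage sketch (countPrimesBelow is a hypothetical CPU-bound helper), work launched on Dispatchers.Default flows through exactly the dispatch → queue → Worker path described above, bounded by roughly one CPU worker per core:

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking
import kotlin.math.sqrt

// Hypothetical CPU-bound helper.
fun countPrimesBelow(n: Int): Int =
    (2 until n).count { candidate -> (2..sqrt(candidate.toDouble()).toInt()).none { candidate % it == 0 } }

fun main() = runBlocking {
    // Each async block becomes a Task on the shared CoroutineScheduler.
    val results = (1..4).map { i ->
        async(Dispatchers.Default) { countPrimesBelow(i * 10_000) }
    }.awaitAll()
    println(results)
}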

3 The IO Dispatcher

Dispatchers.IO reuses the same thread pool; its implementation is DefaultIoScheduler:

@JvmStatic
public val IO: CoroutineDispatcher = DefaultIoScheduler

DefaultIoScheduler

As you can see, it internally builds a limitedParallelism view that caps the maximum parallelism; the default limit is 64 (or the number of available processors, whichever is larger).

public const val IO_PARALLELISM_PROPERTY_NAME: String = "kotlinx.coroutines.io.parallelism"


// Dispatchers.IO
internal object DefaultIoScheduler : ExecutorCoroutineDispatcher(), Executor {

    //【1】Create a LimitedDispatcher that caps the parallelism
    private val default = UnlimitedIoScheduler.limitedParallelism(
        systemProp(
            IO_PARALLELISM_PROPERTY_NAME,
            64.coerceAtLeast(AVAILABLE_PROCESSORS)
        )
    )

    override val executor: Executor
        get() = this

    override fun execute(command: java.lang.Runnable) = dispatch(EmptyCoroutineContext, command)

    @ExperimentalCoroutinesApi
    override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
        return UnlimitedIoScheduler.limitedParallelism(parallelism)
    }

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        //【2】Dispatch through the LimitedDispatcher; the block is a DispatchedContinuation
        default.dispatch(context, block)
    }

    ... ... ...
}

LimitedDispatcher

LimitedDispatcher is a dispatcher that limits the parallelism of an underlying coroutine dispatcher.

Notice that dispatch is ultimately handed back to DefaultScheduler, and that the context passed along is BlockingContext, i.e. the work is treated as a blocking task:

//【1】UnlimitedIoScheduler hands dispatch back to DefaultScheduler
private object UnlimitedIoScheduler : CoroutineDispatcher() {

    @InternalCoroutinesApi
    override fun dispatchYield(context: CoroutineContext, block: Runnable) {
        DefaultScheduler.dispatchWithContext(block, BlockingContext, true)
    }

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        DefaultScheduler.dispatchWithContext(block, BlockingContext, false)
    }
}

// LimitedDispatcher
@ExperimentalCoroutinesApi
public open fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
    parallelism.checkParallelism()
    //【2】Wrap this dispatcher in a LimitedDispatcher
    return LimitedDispatcher(this, parallelism)
}

Now let's look at the internals of LimitedDispatcher.

Constructor

Note that the Delay capability is implemented through delegation with the by keyword:

internal class LimitedDispatcher(
    private val dispatcher: CoroutineDispatcher,
    private val parallelism: Int
) : CoroutineDispatcher(), Runnable, Delay by (dispatcher as? Delay ?: DefaultDelay) {

    @Volatile
    private var runningWorkers = 0

    private val queue = LockFreeTaskQueue<Runnable>(singleConsumer = false) // lock-free queue built on CAS

    // A separate object that we can synchronize on for K/N
    private val workerAllocationLock = SynchronizedObject()

Here we see interface delegation implemented with the by keyword:

Delay by (dispatcher as? Delay ?: DefaultDelay)

It means that LimitedDispatcher delegates its implementation of the Delay interface to another object. Concretely:

  • if dispatcher (the wrapped dispatcher passed to the constructor) implements Delay, its Delay implementation is used directly;
  • if dispatcher does not implement Delay, DefaultDelay is used instead.

DefaultDelay is the default Delay implementation provided by the coroutine library; on the JVM it is typically backed by an internal event loop running on a dedicated scheduler thread, which handles timed resumptions.
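
Here is a tiny standalone sketch of this interface-delegation pattern; Sleeper and DefaultSleeper are hypothetical stand-ins for Delay and DefaultDelay:

// Hypothetical interface standing in for Delay.
interface Sleeper {
    fun sleep(ms: Long)
}

object DefaultSleeper : Sleeper {
    override fun sleep(ms: Long) = Thread.sleep(ms)
}

// If the wrapped object happens to implement Sleeper, reuse it; otherwise fall back to the default,
// mirroring `Delay by (dispatcher as? Delay ?: DefaultDelay)`.
class Wrapper(wrapped: Any) : Sleeper by (wrapped as? Sleeper ?: DefaultSleeper)

fun main() {
    Wrapper("not a sleeper").sleep(10) // delegates to DefaultSleeper
}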

dispatch

Back to the dispatcher itself: LimitedDispatcher's internal dispatch machinery looks like this:

override fun dispatch(context: CoroutineContext, block: Runnable) {
    dispatchInternal(block) {
        //【1】Dispatch through the wrapped dispatcher (ultimately DefaultScheduler), reusing the shared pool
        dispatcher.dispatch(this, this)
    }
}

//【2】Checks the conditions before actually dispatching
private inline fun dispatchInternal(block: Runnable, dispatch: () -> Unit) {
    if (addAndTryDispatching(block)) return
    if (!tryAllocateWorker()) return

    //【3】Perform the dispatch
    dispatch()
}

addAndTryDispatching

The task is first appended to the internal concurrent queue; then, if runningWorkers has already reached the parallelism limit, no further dispatch happens:

private fun addAndTryDispatching(block: Runnable): Boolean {
    queue.addLast(block)
    return runningWorkers >= parallelism
}

tryAllocateWorker

This "allocates" a worker, but it is only bookkeeping here; the actual thread allocation happens in the shared pool:

// worker allocation
private fun tryAllocateWorker(): Boolean {
    synchronized(workerAllocationLock) {
        if (runningWorkers >= parallelism) return false
        //【1】Increment the counter
        ++runningWorkers
        return true
    }
}

Finally, let's look at how the tasks get executed:

run

LimitedDispatcher itself is what gets submitted to the thread pool as the task; its run method keeps spinning, pulling DispatchedContinuation objects out of the queue and running them:

override fun run() {
    var fairnessCounter = 0
    while (true) {
        val task = queue.removeFirstOrNull()
        if (task != null) {
            try {
                //【1】Run the DispatchedContinuation
                task.run()
            } catch (e: Throwable) {
                handleCoroutineException(EmptyCoroutineContext, e)
            }
            // 16 is our out-of-thin-air constant to emulate fairness. Used in JS dispatchers as well
            if (++fairnessCounter >= 16 && dispatcher.isDispatchNeeded(this)) {
                // Do "yield" to let other views to execute their runnable as well
                // Note that we do not decrement 'runningWorkers' as we still committed to do our part of work
                dispatcher.dispatch(this, this)
                return
            }
            continue
        }
        //【2】Adjust the running-worker count
        synchronized(workerAllocationLock) {
            --runningWorkers
            if (queue.size == 0) return
            ++runningWorkers
            fairnessCounter = 0
        }
    }
}

That covers the logic of the IO dispatcher.
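
A brief usage sketch (the limit of 4 is just an example value): blocking work goes through Dispatchers.IO, and limitedParallelism hands back a narrower view that still borrows threads from the same shared scheduler:

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

@OptIn(ExperimentalCoroutinesApi::class)
fun main() = runBlocking {
    // A view over Dispatchers.IO that runs at most 4 tasks at once,
    // backed by the same shared CoroutineScheduler (no threads of its own).
    val dbDispatcher = Dispatchers.IO.limitedParallelism(4)

    repeat(8) { i ->
        launch(dbDispatcher) {
            Thread.sleep(100) // stand-in for a blocking I/O call
            println("task $i ran on ${Thread.currentThread().name}")
        }
    }
}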

4 The Unconfined Dispatcher

Finally, let's look at the Unconfined dispatcher:

@JvmStatic
public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined

Unconfined

The Unconfined dispatcher does not confine the coroutine to any specific thread.

isDispatchNeeded returns false here, so the coroutine is executed in the caller's thread, i.e. the current thread, rather than being dispatched to some other thread.

That is why dispatch never actually runs the block: Runnable passed to it:

internal object Unconfined : CoroutineDispatcher() {

    @ExperimentalCoroutinesApi
    override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
        throw UnsupportedOperationException("limitedParallelism is not supported for Dispatchers.Unconfined")
    }

    //【1】isDispatchNeeded returns false: the coroutine runs in the caller's thread rather than being dispatched to another one.
    override fun isDispatchNeeded(context: CoroutineContext): Boolean = false

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        val yieldContext = context[YieldContext]
        if (yieldContext != null) {
            yieldContext.dispatcherWasUnconfined = true
            return
        }
        throw UnsupportedOperationException("Dispatchers.Unconfined.dispatch function can only be used by the yield function. " +
            "If you wrap Unconfined dispatcher in your code, make sure you properly delegate " +
            "isDispatchNeeded and dispatch calls.")
    }

    override fun toString(): String = "Dispatchers.Unconfined"
}

That's about all there is to it.
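
A small sketch of the observable behavior (thread names vary by environment): before the first suspension the coroutine runs on the caller's thread, and after resuming it continues on whatever thread performed the resumption:

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    launch(Dispatchers.Unconfined) {
        // Runs immediately on the caller's thread; no dispatch happens.
        println("before delay: ${Thread.currentThread().name}")
        delay(100)
        // Resumes on the thread that completed the delay (the default executor thread).
        println("after delay:  ${Thread.currentThread().name}")
    }
}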

Author: Li Shuaiqi
Original link: https://lishuaiqi.top/2025/06/01/kotlin/kotlin-cotoutine-dispatcher_thread/
Copyright: Unless otherwise stated, all articles on this blog are licensed under CC BY-NC-SA 4.0. Please credit Li Shuaiqi's Blog when reposting.