Kotlin协程核心库分析-1 Dispatchers

谁践踏了优雅 2023-07-19 05:00 98阅读 0赞

从本篇文章开始讲解Kotlin标准库的源码分析

请添加如下依赖

  1. implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.4'

Dispatchers

基础概念:Dispatchers 是一个标准库中帮我们封装了切换线程的帮助类,可以简单理解为一个线程池。

  1. public actual object Dispatchers {
  2. @JvmStatic
  3. public actual val Default: CoroutineDispatcher = createDefaultDispatcher()
  4. @JvmStatic
  5. public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
  6. @JvmStatic
  7. public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined
  8. public val IO: CoroutineDispatcher = DefaultScheduler.IO
  9. }

上面定义了四种不同的线程池

  1. Default默认的线程池一般用于计算型任务。注意它和IO共享线程池,只不过限制了最大并发数不同
  2. Main 所谓的Ui线程,在Android中进行UI绘制的线程,或者SwinginvokeLater。此处根据平台实现,利用serviceLoader加载
  3. Unconfined 未定义的线程,使用这个启动的协程会立即在当前的线程执行,并且遇到第一个挂起点后根据其挂起点回调线程决定后续的代码在哪运行。(后文再讲)
  4. IO 一个用于经常IO操作的线程池,告并行量。与Default共享线程池

Unconfined 线程池说明

由于这个线程池比较特殊,所以举例说明下,后面直接撸源码。

直接看Demo和输出吧

  1. class MyContinuation() : Continuation<String> {
  2. override val context: CoroutineContext = Dispatchers.Unconfined
  3. override fun resumeWith(result: Result<String>) {
  4. log("MyContinuation resumeWith 结果 = ${
  5. result.getOrNull()}")
  6. }
  7. }
  8. suspend fun demo() = suspendCoroutine<String> {
  9. c ->
  10. thread(name = "自己创建的线程") {
  11. TimeUnit.SECONDS.sleep(1)
  12. log("demo 调用resume回调")
  13. c.resume("hello")
  14. }
  15. }
  16. fun main() {
  17. Thread.currentThread().name = "非Ui线程"
  18. Thread.currentThread().state
  19. val suspendLambda = suspend {
  20. log("demo 运行前")
  21. val resultOne = demo()
  22. log("demo 运行后")
  23. //拼接结果
  24. resultOne
  25. }
  26. SwingUtilities.invokeLater {
  27. }
  28. val myContinuation = MyContinuation()
  29. suspendLambda.startCoroutine(myContinuation)
  30. TimeUnit.HOURS.sleep(1111)
  31. }
  32. fun log(msg: String) {
  33. println("[${
  34. Thread.currentThread().name}] ${
  35. msg}")
  36. }

输出:

  1. [非Ui线程] demo 运行前
  2. [自己创建的线程] demo 调用resume回调
  3. [自己创建的线程] demo 运行后
  4. [自己创建的线程] MyContinuation resumeWith 结果 = hello

由于比较简单我这里直接总结下:
由于Dispatchers.Unconfined未定义线程池,所以执行的时候默认在启动线程。遇到第一个挂起点,之后由调用resume的线程决定恢复协程的线程。这块不理解可以看博主前几篇核心库文章。

CoroutineDispatcher 对象

之前的文章https://blog.csdn.net/qfanmingyiq/article/details/105181027 写到:如果要创建一个线程调度器,需要实现一个ContinuationInterceptor对象。而CoroutineDispatcherkotlin标准对ContinuationInterceptor一个封装。

来看下声明

  1. public abstract class CoroutineDispatcher :
  2. AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
  3. //是否切换线程,这个不重要
  4. public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
  5. //返回一个Continuation代理对象进行切换协程线程。
  6. public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
  7. DispatchedContinuation(this, continuation)
  8. }

我们这里只需要关注一个interceptContinuation函数返回了一个DispatchedContinuation对象,其中包含两个参数,第一个CoroutineDispatcher对象,第二个被代理的对象。

此类的声明(已删减部分代码):

  1. internal class DispatchedContinuation<in T>(
  2. @JvmField val dispatcher: CoroutineDispatcher,
  3. @JvmField val continuation: Continuation<T>//被代理的原始对象
  4. ) : DispatchedTask<T>(MODE_ATOMIC_DEFAULT), CoroutineStackFrame, Continuation<T> by continuation {
  5. override fun resumeWith(result: Result<T>) {
  6. val context = continuation.context
  7. val state = result.toState()
  8. if (dispatcher.isDispatchNeeded(context)) {
  9. _state = state
  10. resumeMode = MODE_ATOMIC_DEFAULT
  11. //看着,当挂起函数切换线程的时候又把回调给了dispatcher
  12. //dispatcher就是传入的CoroutineDispatcher。
  13. dispatcher.dispatch(context, this)
  14. } else {
  15. executeUnconfined(state, MODE_ATOMIC_DEFAULT) {
  16. withCoroutineContext(this.context, countOrElement) {
  17. continuation.resumeWith(result)
  18. }
  19. }
  20. }
  21. }
  22. }

我们只需要聚焦一行代码而已:

  1. dispatcher.dispatch(context, this)

dispatch(context, this)函数声明:

  1. class CoroutineDispatcher{
  2. fun dispatch(context: CoroutineContext, block: Runnable)
  3. }

我们注意看第二个参数Runable,我们DispatchedContinuation调用时传入的this参数,证明自己实现Runnable接口。

我们看下DispatchedContinuation接口实现链吧:
在这里插入图片描述
这里需要注意DispatchedContinuation继承了Task类,而Task又实现了Runable接口。

在继承链中DispatchedTask是实现了Runable具体方法

  1. internal abstract class DispatchedTask<in T>(
  2. @JvmField public var resumeMode: Int
  3. ) : SchedulerTask(){
  4. //调用dispatcher.dispatch(context, this) 最终会在在某个线程中回调下面的run方法
  5. public final override fun run() {
  6. val taskContext = this.taskContext
  7. var fatalException: Throwable? = null
  8. try {
  9. val delegate = delegate as DispatchedContinuation<T>
  10. val continuation = delegate.continuation
  11. val context = continuation.context
  12. val state = takeState() // NOTE: Must take state in any case, even if cancelled
  13. withCoroutineContext(context, delegate.countOrElement) {
  14. val exception = getExceptionalResult(state)
  15. val job = if (resumeMode.isCancellableMode) context[Job] else null
  16. if (exception == null && job != null && !job.isActive) {
  17. val cause = job.getCancellationException()
  18. cancelResult(state, cause)
  19. continuation.resumeWithStackTrace(cause)
  20. } else {
  21. if (exception != null) continuation.resumeWithException(exception)
  22. //看这 ,回调原始的Continuation对象,完成线程切换
  23. else continuation.resume(getSuccessfulResult(state))
  24. }
  25. }
  26. } catch (e: Throwable) {
  27. // This instead of runCatching to have nicer stacktrace and debug experience
  28. fatalException = e
  29. } finally {
  30. val result = runCatching {
  31. taskContext.afterTask() }
  32. handleFatalException(fatalException, result.exceptionOrNull())
  33. }
  34. }
  35. }

我们可以看到最终run方法会回调原始的Continuation完成一个协程的线程切换。
我们画一个时序图吧(内部Dispatch.IO 也可以是其他的线程池这里只是举例):
在这里插入图片描述

总的来说
Dispatchers内部的几个线程池(IODefault等)只是内部做了一个线程池然后切换回调。

IO 线程池源码说明

这里不贴使用Demo,直接撸源码。如果想测试的同学可以把《Unconfined 线程池说明》这一节的代码稍微改一下即可。

我们看下IO线程池的声明

  1. @JvmStatic
  2. public val IO: CoroutineDispatcher = DefaultScheduler.IO

跟进DefaultScheduler看看

  1. //Tip DefaultScheduler其实也用于 Dispatchers.Default
  2. internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
  3. //Dispatchers.IO实例对象。设置最大的并发数,然后创建一个CoroutineDispatcher返回
  4. val IO = blocking(systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)))
  5. }

tip: Dispatchers.Default 其实也是用DefaultScheduler对象,而Dispatchers.IO只是使用DefaultScheduler内部的一个属性对象(IO)而已,本质会共享DefaultScheduler父类创建的线程

继续

  1. public fun blocking(parallelism: Int = BLOCKING_DEFAULT_PARALLELISM): CoroutineDispatcher {
  2. //创建一个CoroutineDispatcher,请注意this对象是DefaultScheduler
  3. return LimitingDispatcher(this, parallelism, TASK_PROBABLY_BLOCKING)
  4. }

看下LimitingDispatcher声明:

  1. private class LimitingDispatcher(
  2. val dispatcher: ExperimentalCoroutineDispatcher,
  3. val parallelism: Int,
  4. override val taskMode: Int
  5. ) : ExecutorCoroutineDispatcher(), TaskContext, Executor {
  6. private val queue = ConcurrentLinkedQueue<Runnable>()
  7. private val inFlightTasks = atomic(0)
  8. override val executor: Executor
  9. get() = this
  10. override fun execute(command: Runnable) = dispatch(command, false)
  11. override fun close(): Unit = error("Close cannot be invoked on LimitingBlockingDispatcher")
  12. override fun dispatch(context: CoroutineContext, block: Runnable) = dispatch(block, false)
  13. //线程切换的回调
  14. private fun dispatch(block: Runnable, tailDispatch: Boolean) {
  15. var taskToSchedule = block
  16. while (true) {
  17. val inFlight = inFlightTasks.incrementAndGet()
  18. //判断当前运行的task数量,如果小于数量,直接放入线程池队列中
  19. if (inFlight <= parallelism) {
  20. dispatcher.dispatchWithContext(taskToSchedule, this, tailDispatch)
  21. return
  22. }
  23. // 放入阻塞队列中等候下次取出
  24. queue.add(taskToSchedule)
  25. if (inFlightTasks.decrementAndGet() >= parallelism) {
  26. return
  27. }
  28. taskToSchedule = queue.poll() ?: return
  29. }
  30. }
  31. }

我们只需要关注下面的函数

  1. dispatch(block: Runnable, tailDispatch: Boolean)

函数内部只是判断了最大并发数量,然后如果没有达到最大的并发数量回调构造参数的dispatcherdispatchWithContext函数进行内部的线程切换,这里dispatcher就是DefaultScheduler对象,现在大家理解为什么Dispatchs.IODispatchs.Default共享线程池了。

我们继续看下DefaultScheduler.dispatchWithContext函数对应实现(内部由父亲ExperimentalCoroutineDispatcher实现)。

  1. //ExperimentalCoroutineDispatcher.kt
  2. internal fun dispatchWithContext(block: Runnable, context: TaskContext, tailDispatch: Boolean) {
  3. try {
  4. coroutineScheduler.dispatch(block, context, tailDispatch)
  5. } catch (e: RejectedExecutionException) {
  6. // Context shouldn't be lost here to properly invoke before/after task
  7. DefaultExecutor.enqueue(coroutineScheduler.createTask(block, context))
  8. }
  9. }

可以看到任务有分发到了一个coroutineScheduler属性对象上

  1. @InternalCoroutinesApi
  2. open class ExperimentalCoroutineDispatcher(
  3. private val corePoolSize: Int,
  4. private val maxPoolSize: Int,
  5. private val idleWorkerKeepAliveNs: Long,
  6. private val schedulerName: String = "CoroutineScheduler"
  7. ) : ExecutorCoroutineDispatcher() {
  8. private var coroutineScheduler = createScheduler()
  9. private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
  10. }

可以看出Dispatchs.IODispatchs.Default核心线程切换逻辑位于CoroutineScheduler这个类上。
我们这里慢慢开始讲讲这个类

CoroutineScheduler 线程池管理对象

我们首先看两个属性globalCpuQueueglobalBlockingQueue,这两个对象是一个集合对象,简单来说就是一个队列,内部存储了线程池要执行的任务,每个任务被封装成Task,我们前面说过DispatchedContinuation继承Task

  1. internal class CoroutineScheduler(
  2. @JvmField val corePoolSize: Int,
  3. @JvmField val maxPoolSize: Int,
  4. @JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
  5. @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
  6. ) : Executor, Closeable {
  7. @JvmField
  8. val globalCpuQueue = GlobalQueue()
  9. @JvmField
  10. val globalBlockingQueue = GlobalQueue()
  11. private fun addToGlobalQueue(task: Task): Boolean {
  12. return if (task.isBlocking) {
  13. globalBlockingQueue.addLast(task)
  14. } else {
  15. globalCpuQueue.addLast(task)
  16. }
  17. }

简单看下这个GlobalQueue

  1. internal class GlobalQueue : LockFreeTaskQueue<Task>(singleConsumer = false)

LockFreeTaskQueue<T>:
一个线程安全的链表,用于存放任务队列,内部采用CAS和链表结构完成.CAS使用了kotlin的便捷库https://github.com/Kotlin/kotlinx.atomicfu

  1. val queue = LockFreeTaskQueue<String>(false)
  2. queue.addLast("a")
  3. queue.addLast("b")
  4. val one = queue.removeFirstOrNull()
  5. val two = queue.removeFirstOrNull()
  6. val three = queue.removeFirstOrNull()
  7. println("one ${
  8. one} two ${
  9. two} three ${
  10. three}")

输出:

  1. one a two b three null

CoroutineScheduler既然是一个线程池管理对象,必然内部会创建多个Thread然后休眠自旋保证存活然后唤醒调度任务(线程池基本实现,这块不了解的同学可以看看JDK的实现,kotlin协程实现也差不多)。

CoroutineScheduler创建和管理的Thread是一个Worker对象,简单看下核心的实现方法

  1. internal class CoroutineScheduler{
  2. //注意是一个internal修饰的内部类,主要作用是用来获取globalCpuQueue和globalBlockingQueue队列任务,然后执行
  3. internal inner class Worker private constructor() : Thread() {
  4. init {
  5. //必然是个守护进程
  6. isDaemon = true
  7. }
  8. //转到runWorker()
  9. override fun run() = runWorker()
  10. //内部要进行自旋转休眠,有任务的时候唤醒
  11. private fun runWorker() {
  12. var rescanned = false
  13. //一个while自旋转
  14. while (!isTerminated && state != WorkerState.TERMINATED) {
  15. //取出CoroutineScheduler的globalCpuQueue和globalBlockingQueue任务
  16. val task = findTask(mayHaveLocalTasks)
  17. if (task != null) {
  18. rescanned = false
  19. minDelayUntilStealableTaskNs = 0L
  20. //队列不为空那么取出task运行,task就是DispatchedContinuation对象
  21. //这里会回调DispatchedContinuation的run方法。这里进步跟进去了
  22. executeTask(task)
  23. continue
  24. } else {
  25. mayHaveLocalTasks = false
  26. }
  27. if (minDelayUntilStealableTaskNs != 0L) {
  28. if (!rescanned) {
  29. rescanned = true
  30. } else {
  31. rescanned = false
  32. tryReleaseCpu(WorkerState.PARKING)
  33. interrupted()
  34. LockSupport.parkNanos(minDelayUntilStealableTaskNs)
  35. minDelayUntilStealableTaskNs = 0L
  36. }
  37. continue
  38. }
  39. //休眠线城市内部采用LockSupport.park()休眠,在外部也会在适当时候
  40. //LockSupport.unPark()唤醒线程
  41. tryPark()
  42. }
  43. tryReleaseCpu(WorkerState.TERMINATED)
  44. }
  45. }
  46. }

CoroutineScheduler 调度流程分析

我们前面讲到 任务调度会最终调用到CoroutineScheduler.dispatch函数

  1. //ExperimentalCoroutineDispatcher.kt
  2. internal fun dispatchWithContext(block: Runnable, context: TaskContext, tailDispatch: Boolean) {
  3. try {
  4. coroutineScheduler.dispatch(block, context, tailDispatch)
  5. } catch (e: RejectedExecutionException) {
  6. DefaultExecutor.enqueue(coroutineScheduler.createTask(block, context))
  7. }
  8. }

跟进

  1. internal class CoroutineScheduler(
  2. @JvmField val corePoolSize: Int,
  3. @JvmField val maxPoolSize: Int,
  4. @JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
  5. @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
  6. ) : Executor, Closeable {
  7. fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
  8. trackTask()
  9. val task = createTask(block, taskContext)
  10. val currentWorker = currentWorker()
  11. val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
  12. if (notAdded != null) {
  13. //把Task放入CoroutineScheduler的队列中
  14. if (!addToGlobalQueue(notAdded)) {
  15. throw RejectedExecutionException("$schedulerName was terminated")
  16. }
  17. }
  18. val skipUnpark = tailDispatch && currentWorker != null
  19. if (task.mode == TASK_NON_BLOCKING) {
  20. if (skipUnpark) return
  21. signalCpuWork()
  22. } else {
  23. // 然后唤醒或者创建一个线程运行Task的run函数
  24. signalBlockingWork(skipUnpark = skipUnpark)
  25. }
  26. }
  27. }

上面的函数简单就是下面的代码

  1. internal class CoroutineScheduler(
  2. @JvmField val corePoolSize: Int,
  3. @JvmField val maxPoolSize: Int,
  4. @JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
  5. @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
  6. ) : Executor, Closeable {
  7. fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
  8. //添加一个任务到队列
  9. addToGlobalQueue(block)
  10. // 然后唤醒或者创建一个线程运行Task的run函数
  11. signalBlockingWork(skipUnpark = skipUnpark)
  12. }
  13. }

看下比较重要signalBlockingWork函数

  1. internal class CoroutineScheduler(
  2. @JvmField val corePoolSize: Int,
  3. @JvmField val maxPoolSize: Int,
  4. @JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
  5. @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
  6. ) : Executor, Closeable {
  7. private fun signalBlockingWork(skipUnpark: Boolean) {
  8. val stateSnapshot = incrementBlockingTasks()
  9. if (skipUnpark) return
  10. //由于此时CoroutineScheduler并没有一个Thread所以此处不会唤醒线程所以返回false
  11. if (tryUnpark()) return
  12. //创建一个thread去执行task
  13. if (tryCreateWorker(stateSnapshot)) return
  14. tryUnpark()
  15. }
  16. }

最后看看tryCreateWorker

  1. internal class CoroutineScheduler{
  2. private fun tryCreateWorker(state: Long = controlState.value): Boolean {
  3. val created = createdWorkers(state)
  4. val blocking = blockingTasks(state)
  5. val cpuWorkers = (created - blocking).coerceAtLeast(0)
  6. if (cpuWorkers < corePoolSize) {
  7. //创建一个线程
  8. val newCpuWorkers = createNewWorker()
  9. //在创建一个,用于任务窃取,可以类比forkjoin,反正可以简单理解就是一个优化机制,提高效率所以多创建一个线程
  10. if (newCpuWorkers == 1 && corePoolSize > 1) createNewWorker()
  11. if (newCpuWorkers > 0) return true
  12. }
  13. return false
  14. }
  15. /**
  16. *创建一个对象Worker对象然后启动这个线程
  17. **/
  18. private fun createNewWorker(): Int {
  19. synchronized(workers) {
  20. if (isTerminated) return -1
  21. val state = controlState.value
  22. val created = createdWorkers(state)
  23. val blocking = blockingTasks(state)
  24. val cpuWorkers = (created - blocking).coerceAtLeast(0)
  25. if (cpuWorkers >= corePoolSize) return 0
  26. if (created >= maxPoolSize) return 0
  27. val newIndex = createdWorkers + 1
  28. require(newIndex > 0 && workers[newIndex] == null)
  29. //创建一个Thread
  30. val worker = Worker(newIndex)
  31. workers[newIndex] = worker
  32. require(newIndex == incrementCreatedWorkers())
  33. //启动线程
  34. worker.start()
  35. return cpuWorkers + 1
  36. }
  37. }
  38. }

之后便是在指定的线程中回调Task(DispatchedContinuation)run函数。

我们总结下DispatchersIODefault就是把Task放入CoroutineScheduler的队列中,然后由指定的线程调度Taskrun函数。

请注意DispatchersIODefault共享线程池,只是运行并发数不同。

发表评论

表情:
评论列表 (有 0 条评论,98人围观)

还没有评论,来说两句吧...

相关阅读