Kotlin协程源码分析-8 拦截器

ゝ一纸荒年。 2023-07-19 04:49 130阅读 0赞

前言

协程拦截器ContinuationInterceptor作用:

  1. 线程切换
  2. hook结果(一般用不到)

我们开发Swing或者Android更新UI需要在特定的线程上进行操作,那么这里就涉及线程的切换问题,那么我们看看ContinuationInterceptor在本章中如何使用和源码分析。

简单案例

  1. class MyContinuation() : Continuation<String> {
  2. override val context: CoroutineContext = EmptyCoroutineContext
  3. override fun resumeWith(result: Result<String>) {
  4. log("MyContinuation resumeWith 结果 = ${
  5. result.getOrNull()}")
  6. }
  7. }
  8. suspend fun demo() = suspendCoroutine<String> {
  9. c ->
  10. thread(name = "demo1创建的线程") {
  11. log("demo 调用resume回调")
  12. c.resume("hello")
  13. }
  14. }
  15. suspend fun demo2() = suspendCoroutine<String> {
  16. c ->
  17. thread(name = "demo2创建的线程") {
  18. log("demo2 调用resume回调")
  19. c.resume("world")
  20. }
  21. }
  22. fun testInterceptor() {
  23. // 假设下面的lambda需要在UI线程运行
  24. val suspendLambda = suspend {
  25. log("demo 运行前")
  26. val resultOne = demo()
  27. log("demo 运行后")
  28. val resultTwo = demo2()
  29. log("demo2 运行后")
  30. //拼接结果
  31. resultOne + resultTwo
  32. }
  33. val myContinuation = MyContinuation()
  34. thread(name = "一个新的线程") {
  35. suspendLambda.startCoroutine(myContinuation)
  36. }
  37. }
  38. fun log(msg: String) {
  39. println("[${
  40. Thread.currentThread().name}] ${
  41. msg}")
  42. }

输出:

  1. [一个新的线程] demo 运行前
  2. [demo1创建的线程] demo 调用resume回调
  3. [demo1创建的线程] demo 运行后
  4. [demo2创建的线程] demo2 调用resume回调
  5. [demo2创建的线程] demo2 运行后
  6. [demo2创建的线程] MyContinuation resumeWith 结果 = helloworld

首先我们先我们先明白上面的输出打印的时所在的线程状态。
suspendLambda会在编译时创建一个状态机函数,而这个函数的调用依靠外部调用Continuation.resume函数进行运行,那么这个状态机运行的线程自然会恢复在Continuation.resume调用线程,如果你看完前几章节的内容理解这个并不难。

假设我们期望suspendLambda只运行在Ui线程该如何实现?这里直接使用ContinuationInterceptor即可。

我们这里创建如下对象:

  1. class MyCoroutineDispatch : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
  2. override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {
  3. log("interceptContinuation")
  4. return MyInterceptorContinuation<T>(continuation.context, continuation)
  5. }
  6. override fun releaseInterceptedContinuation(continuation: Continuation<*>) {
  7. super.releaseInterceptedContinuation(continuation)
  8. log("releaseInterceptedContinuation " + continuation::class.java.simpleName)
  9. }
  10. class MyInterceptorContinuation<T>(
  11. override val context: CoroutineContext,
  12. val continuation: Continuation<T>
  13. ) : Continuation<T> {
  14. override fun resumeWith(result: Result<T>) {
  15. //获取Android主线程的Looper,进而切换主线程
  16. Handler(Looper.getMainLooper()).post {
  17. log("MyInterceptorContinuation resume")
  18. continuation.resumeWith(result)
  19. }
  20. }
  21. }
  22. }

然后MyContinuation上下文改用MyCoroutineDispatch即可

  1. class MyContinuation() : Continuation<String> {
  2. //这里不在使用空上下文
  3. override val context: CoroutineContext = MyCoroutineDispatch()
  4. override fun resumeWith(result: Result<String>) {
  5. log("MyContinuation resumeWith 结果 = ${
  6. result.getOrNull()}")
  7. }
  8. }

最后看下完整的代码

  1. class MyCoroutineDispatch : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
  2. override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {
  3. log("interceptContinuation")
  4. return MyInterceptorContinuation<T>(continuation.context, continuation)
  5. }
  6. override fun releaseInterceptedContinuation(continuation: Continuation<*>) {
  7. super.releaseInterceptedContinuation(continuation)
  8. log("releaseInterceptedContinuation " + continuation::class.java.simpleName)
  9. }
  10. class MyInterceptorContinuation<T>(
  11. override val context: CoroutineContext,
  12. val continuation: Continuation<T>
  13. ) : Continuation<T> {
  14. override fun resumeWith(result: Result<T>) {
  15. //获取Android主线程的Looper,进而切换主线程
  16. Handler(Looper.getMainLooper()).post {
  17. log("MyInterceptorContinuation resume")
  18. continuation.resumeWith(result)
  19. }
  20. }
  21. }
  22. }
  23. class MyContinuation() : Continuation<String> {
  24. //这里不在使用空上下文
  25. override val context: CoroutineContext = MyCoroutineDispatch()
  26. override fun resumeWith(result: Result<String>) {
  27. log("MyContinuation resumeWith 结果 = ${
  28. result.getOrNull()}")
  29. }
  30. }
  31. suspend fun demo() = suspendCoroutine<String> {
  32. c ->
  33. thread(name = "demo1创建的线程") {
  34. log("demo 调用resume回调")
  35. c.resume("hello")
  36. }
  37. }
  38. suspend fun demo2() = suspendCoroutine<String> {
  39. c ->
  40. thread(name = "demo2创建的线程") {
  41. log("demo2 调用resume回调")
  42. c.resume("world")
  43. }
  44. }
  45. fun testInterceptor() {
  46. // 假设下面的lambda需要在UI线程运行
  47. val suspendLambda = suspend {
  48. log("demo 运行前")
  49. val resultOne = demo()
  50. log("demo 运行后")
  51. val resultTwo = demo2()
  52. log("demo2 运行后")
  53. //拼接结果
  54. resultOne + resultTwo
  55. }
  56. val myContinuation = MyContinuation()
  57. thread(name = "一个新的线程") {
  58. suspendLambda.startCoroutine(myContinuation)
  59. }
  60. }
  61. fun log(msg: String) {
  62. Log.e("TAG","[${
  63. Thread.currentThread().name}] ${
  64. msg}")
  65. }

对应输出:

  1. [一个新的线程] interceptContinuation
  2. [main] MyInterceptorContinuation resume
  3. [main] demo 运行前
  4. [demo1创建的线程] demo 调用resume回调
  5. [main] MyInterceptorContinuation resume
  6. [main] demo 运行后
  7. [demo2创建的线程] demo2 调用resume回调
  8. [main] MyInterceptorContinuation resume
  9. [main] demo2 运行后
  10. [main] releaseInterceptedContinuation MyInterceptorContinuation
  11. [main] MyContinuation resumeWith 结果 = helloworld

看到输出后总算看到我们期待的那样suspendLambda输出在main/ui线程

分析源码

MyCoroutineDispatch分析

我们来看MyCoroutineDispatch 类声明

  1. class MyCoroutineDispatch :
  2. AbstractCoroutineContextElement(ContinuationInterceptor),
  3. ContinuationInterceptor {
  4. }

我们继承AbstractCoroutineContextElement类,并实现了ContinuationInterceptor接口,我们分别看看各自的用处。
AbstractCoroutineContextElement 的声明:

  1. public abstract class AbstractCoroutineContextElement(public override val key: Key<*>) : Element

可以看到了有实现了Element接口,Element接口是什么?

  1. public interface Element : CoroutineContext {
  2. public val key: Key<*>
  3. public override operator fun <E : Element> get(key: Key<E>): E? =
  4. @Suppress("UNCHECKED_CAST")
  5. if (this.key == key) this as E else null
  6. public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
  7. operation(initial, this)
  8. public override fun minusKey(key: Key<*>): CoroutineContext =
  9. if (this.key == key) EmptyCoroutineContext else this
  10. }

原来这个Element是前面文章介绍上下文相关的东西,简单来说就是可以放入某个协程上下文中的链表存储的对象。而Element本身也是一个上下文对象。在上下文中可以用get函数或者[]操作符获取对应的存储对象。

所以这个MyCoroutineDispatch可以当做上下文使用,并且也可以放入其他上下文存储,自身的key是ContinuationInterceptor。所以他可以放入MyContinuation中做上下文对象。

再看看ContinuationInterceptor

  1. public interface ContinuationInterceptor : CoroutineContext.Element {
  2. companion object Key : CoroutineContext.Key<ContinuationInterceptor>
  3. public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
  4. public fun releaseInterceptedContinuation(continuation: Continuation<*>) {
  5. /* do nothing by default */
  6. }

ContinuationInterceptor是一个拦截规范,interceptContinuation传入一个原始continuation对象,然后返回一个代理的Continuation,然后在代理Continuation中进行现场切换。如果不返回代理continuation,直接返回原始continuation 即可。当状态机结束的时候releaseInterceptedContinuation会被调用,参数是interceptContinuation返回的对象。

获取拦截器流程

回过头再看看下面的代码

  1. fun testInterceptor() {
  2. // 假设下面的lambda需要在UI线程运行
  3. val suspendLambda = suspend {
  4. log("demo 运行前")
  5. val resultOne = demo()
  6. log("demo 运行后")
  7. val resultTwo = demo2()
  8. log("demo2 运行后")
  9. //拼接结果
  10. resultOne + resultTwo
  11. }
  12. val myContinuation = MyContinuation()
  13. thread(name = "一个新的线程") {
  14. suspendLambda.startCoroutine(myContinuation)
  15. }
  16. }

suspendLambda会被转化为SuspendLambdaSuspendLambda有继承ContinuationImpl

那么来看看这个ContinuationImpl

  1. internal abstract class ContinuationImpl(
  2. completion: Continuation<Any?>?,
  3. private val _context: CoroutineContext?
  4. ) : BaseContinuationImpl(completion) {
  5. constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)
  6. public override val context: CoroutineContext
  7. get() = _context!!
  8. @Transient
  9. private var intercepted: Continuation<Any?>? = null
  10. public fun intercepted(): Continuation<Any?> =
  11. intercepted
  12. ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
  13. .also {
  14. intercepted = it }
  15. protected override fun releaseIntercepted() {
  16. val intercepted = intercepted
  17. if (intercepted != null && intercepted !== this) {
  18. context[ContinuationInterceptor]!!.releaseInterceptedContinuation(intercepted)
  19. }
  20. this.intercepted = CompletedContinuation // just in case
  21. }
  22. }

我们注意下context获取方式

  1. internal abstract class ContinuationImpl(
  2. completion: Continuation<Any?>?,
  3. private val _context: CoroutineContext?
  4. ) : BaseContinuationImpl(completion) {
  5. //使用传入completion的上下文作为ContinuationImpl的上下文。
  6. //MyContinuation是completion,而MyContinuation的上下文MyCoroutineDispatch
  7. //MyCoroutineDispatch就是我们创建的拦截器
  8. constructor(completion: Continuation<Any?>?)
  9. : this(completion, completion?.context)//we
  10. public override val context: CoroutineContext
  11. get() = _context!!
  12. }

我们在来看看个函数intercepted

  1. //ContinuationImpl.kt
  2. @Transient
  3. private var intercepted: Continuation<Any?>? = null
  4. public fun intercepted(): Continuation<Any?> =
  5. //如果拦截器为空那么会做如下三步
  6. //1.上下文中获取可以为ContinuationInterceptor的拦截器
  7. //2.调用拦截器interceptContinuation函数获取一个代理Continuation对象。所以拦截器的interceptContinuation只会调用一次
  8. //3.保存拦截器返回的代理Continuation对象后面方便再次获取就不需要再次调用interceptContinuation
  9. intercepted
  10. ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
  11. .also {
  12. //保存获取的拦截器
  13. intercepted = it
  14. }

我们我们最后看看什么时候第一次调用intercepted的代码。

  1. public fun <T> (suspend () -> T).startCoroutine(
  2. completion: Continuation<T>
  3. ) {
  4. createCoroutineUnintercepted(completion).intercepted().resume(Unit)
  5. }

启动协程的时候回获取一次拦截器,然后用拦截器返回代理Continuationresume

再来看看我们的写的拦截器:

  1. class MyCoroutineDispatch : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
  2. //intercepted()第一次调用的会调用到这里
  3. override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {
  4. log("interceptContinuation")
  5. //返回一个代理Continuation对象
  6. return MyInterceptorContinuation<T>(continuation.context, continuation)
  7. }
  8. override fun releaseInterceptedContinuation(continuation: Continuation<*>) {
  9. super.releaseInterceptedContinuation(continuation)
  10. log("releaseInterceptedContinuation " + continuation::class.java.simpleName)
  11. }
  12. class MyInterceptorContinuation<T>(
  13. override val context: CoroutineContext,
  14. val continuation: Continuation<T>
  15. ) : Continuation<T> {
  16. override fun resumeWith(result: Result<T>) {
  17. //获取Android主线程的Looper,进而切换主线程
  18. Handler(Looper.getMainLooper()).post {
  19. log("MyInterceptorContinuation resume")
  20. //回调原始的Continuation对象
  21. continuation.resumeWith(result)
  22. }
  23. }
  24. }
  25. }

当调用启动协程的时候回调用拦截器的代理Continuation对象的resumeWith,然后在Ui线程回调原始Continuation对象。

我们再看看我的挂起函数demo又是怎么切换回ui线程

  1. suspend fun demo() = suspendCoroutine<String> {
  2. c ->
  3. thread(name = "demo1创建的线程") {
  4. log("demo 调用resume回调")
  5. c.resume("hello")
  6. }
  7. }

在正常不启用拦截器的情况会回调suspendLambdademo1创建的线程线程回调。但是我们发现启用拦截器后被在ui线程回调。而真正做切换的逻辑在suspendCoroutine这个lambda表达式上。

  1. public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T =
  2. suspendCoroutineUninterceptedOrReturn {
  3. c: Continuation<T> ->
  4. val safe = SafeContinuation(c.intercepted())//返回拦截器的代理的Continuation对象
  5. block(safe)
  6. safe.getOrThrow()
  7. }

这里我们便知道答案。demo()函数拿到的Continuation会经过一层拦截器代理对象,一切便自然解释的通了。

我们画个图来总结下。
在这里插入图片描述

总的来说:拦截器返回一个代理Continuation对象给挂起函数,当挂起函数恢复的时候,恢复代理Continuationresume函数,最后代理Continuation对象切换指定的线程在回调原始的Continuation对象

发表评论

表情:
评论列表 (有 0 条评论,130人围观)

还没有评论,来说两句吧...

相关阅读