Kotlin协程在Android中的挂起流程

3460次阅读  |  发布于3年以前

我们知道Android实现异步有很多方案,比如AsyncTask,Executor,RxJava等,它们在复杂业务场景下使用起来比较复杂,代码可读性差,容易出现问题。而Kotlin协程使用更简单,性能更好,比上述方案更有优势,其中很大部分原因是因为其实现了协程挂起。理解协程的挂起流程,对我们正确使用协程,灵活使用协程都非常有帮助。

协程简介

协程源自Simula和Modula-2语言,它是一种编程思想,并不局限于特定的语言,在1958年的时候,Melvin Edward Conway提出这个术语并用于构建汇编程序。

协程是一个轻量级的线程,能提高大型项目中的异步任务的开发效率。Kotlin协程在不同的平台实现的原理并不相同,要在Android中用好协程,就需要理解其在Android上的设计思路。

在Android中,通过Kotlin 1.3中引入协程库,开始支持协程。

Android官方文档的协程定义如下:

❝协程是一种并发设计模式,您能够在 Android 平台上应用它来简化异步执行的代码

举个例子,异步顺序执行的3个网络请求,返回结果后,在主线程更新UI 的代码对比:

fun test1() { //Kotlin普通版异步实现
  Thread() { //切换到子线程
    request1(param) { value1 -> //第一个异步请求返回结果
      request2(value1) { value2 -> //第二个异步请求返回结果
        request3(value2) { value3 -> //第三个异步请求返回结果
          Handler mainHandler = new Handler(Looper.getMainLooper());
          mainHandler.post(new Runnable() {//切换回主线程
            @Override
            public void run() { //更新UI
              updateUI(value3)
            }
          });   
        } 
      }
    }
  }
} 

fun test2() { //Kotlin协程版异步实现
   CoroutineScope(Dispatchers.IO).launch { //切换到子线程
     val value1 = async{request1(param)}.await() //第一个异步请求返回结果
     val value2 = async{request2(value1)}.await() //第二个异步请求返回结果
     val value3 = async{request3(value2)}.await() //第三个异步请求返回结果
     launch(Dispatchers.Main) { //切换回主线程
        updateUI(value3) //更新UI
     } 
   }
}     

test1 是普通版本的异步请求更新,test2是协程版本,实现的功能完全一样,但协程版本明显更简洁,可读性和可维护性大大提高了。可见协程是一种异步设计,协程的设计目标,就是简化异步执行的代码,消除传统的Callback回调,以类同步代码的形式,进行异步编程。

协程中的核心概念和API

通过上一节介绍,我们了解了协程是什么。这一节我们需要进一步来了解协程中的几个核心概念。它们分别是作用域,创建协程/子协程,派发器,和挂起函数。

作用域

协程的作用域指定了协程机制运转的代码区块范围。在协程的源代码中,通过CoroutineScope来指定协程的作用域。根据使用场景的不同,对CoroutineScope扩展出其它几个作用域接口。

作用域 意义 解释
CoroutineScope 创建协程的局部作用域 局部协程作用域,可以指定派发器Dispatcher,并且通过调用返回的Job对象的cancel()方法,可以取消该scope下所有正在进行的任务
GlobalScope 创建协程的全局作用域 该API启动的协程为顶层协程,是个单例协程作用域,没有父任务,且该scope没有Job对象,所以无法对整个scope执行cancel()操作,所以需要手动管理内部的每个协程
MainScope 创建协程的局部作用域,且指定Dispatcher.Main CoroutineScope和Dispatcher.Main组合的协程作用域,会指定派发到Dispatcher.Main中执行,可以通过调用返回的Job对象管理协程
runBlocking 创建协程的局部作用域,且阻塞协程所在线程等待结果 局部协程作用域,可以指定派发器Dispatcher,会阻塞调用者所在的线程,直到协程内部返回结果。且该scope没有返回Job对象用于协程管理

创建和启动协程/子协程

在一个协程内部可以创建新协程。创建的协程与外部的协程成为父子关系,他们之间可以联动进行cancel、异常处理等协同管理。创建协程的API是CoroutineScope的扩展函数,所以协程和子协程必须在协程作用域中被创建。协程创建后会马上启动,所以我们会经常把创建和启动协程当做一个意思。

创建协程 意义 解释
launch 创建一个Job协程,或子协程 创建一个新协程,可以通过返回的Job管理协程
async 创建一个Deferred协程,或子协程 和launch的区别是,Deferred可以用来等待该协程执行完毕的返回值

线程派发器

在协程作用域内做到异步执行任务,需要有派发器将异步任务派发到不同的线程执行。Kotlin库默认提供了4个默认的派发器,都放在Dispatchers类下:

派发器 意义 解释
Dispatchers.Main 任务派发到Android主线程中执行 通过MainLooper的handler来向主线程派发任务到主线程执行
Dispatchers.Default 任务派发到默认的工作线程池中执行 协程内部实现的Excutor线程池,核心线程数和最大线程数依赖CPU数量,适用于计算类耗时任务调度
Dispatchers.IO 任务派发到工作线程池中执行 IO和Default共享线程池,但运行并发数不同,支持最大并行任务数,适用IO任务调度
Dispatchers.Unconfined 任务会根据上下文环境,派发到指定线程or线程池执行 无指定派发线程,会根据运行时的上下文环境决定。

挂起函数

挂起函数,是指把协程代码挂起不继续执行的函数,也叫协程被函数挂起了。协程中调用挂起函数时,协程所在的线程不会挂起也不会阻塞,但是协程被挂起了。也就是说,协程内挂起函数之后的代码停止执行了,直到挂起函数完成后恢复协程,协程才继续执行后续的代码。所有挂起函数都会通过suspend修饰符修饰。

挂起函数 意义 解释
join 挂起当前协程,直到等待的子协程执行完毕 通过当前协程返回的Job接口的join方法,可以单纯的挂起当前协程,等待子协程完成后再恢复继续执行
await 挂起当前协程,直到等待的子协程返回结果 和join的区别是,它属于Job接口的子接口Deferred的方法,可以等待子协程完成后,带着返回值恢复当前协程
delay 挂起当前协程,直到指定时间后恢复当前协程 单纯挂起当前协程,指定时长后恢复协程执行
withContext() 挂起外部协程,直到自己内部协程全部返回后,才会恢复外部的协程。 没有创建新的协程,在指定协程上运行挂起代码块,并挂起该协程直至代码块运行完成并返回结果。类似async.await的效果

协程挂起流程详解

从上面的介绍和示例,我们了解了协程的核心概念和API,也看到协程实现了用同步代码的形式实现了异步编程。协程实现异步的核心原理就是通过挂起函数实现协程体的挂起,还不阻塞协程体所在的线程。

那我们进一步来看看,协程实现异步的具体流程。我们通过一个示例来认识协程的工作过程,和协程挂起的效果:

fun testInMain() {
    Log.d("["+Thread.currentThread().name+"]testInMain start")
    var job = CoroutineScope(Dispatchers.Main).launch { //启动协程job
        Log.d("[" + Thread.currentThread().name+"]job start")
        var job1 = async(Dispatchers.IO) { //启动协程job1
            Log.d("["+Thread.currentThread().name+"]job1 start")
            delay(3000) //挂起job1协程 3秒
            Log.d("["+Thread.currentThread().name+"]job1 end ")
            "job1-Return"
        } //job1协程 续体执行完毕

        var job2 = async(Dispatchers.Default) {
            Log.d("["+Thread.currentThread().name+"]job2 start" )
            delay(1000) //挂起job2协程 1秒
            Log.d("["+Thread.currentThread().name+"]job2 end")
            "job2-Return"
        } //job2协程 续体执行完毕

        Log.d("["+Thread.currentThread().name+"]before job1 return")
        Log.d("["+Thread.currentThread().name+"]job1 result = " + job1.await()) //挂起job协程,等待job1返回结果;如果已有结果,不挂起,直接返回

Log.d("["+Thread.currentThread().name+"]before job2 return")
        Log.d("["+Thread.currentThread().name+"]job2 result = " + job2.await()) //挂起job协程,等待job2返回结果;如果已有结果,不挂起,直接返回

        Log.d("["+Thread.currentThread().name+"]job end ")
    } //job协程 续体执行完毕

    Log.d("["+Thread.currentThread().name+"]testInMain end")
} //testInMain

示例代码的log输出如下,我们需要重点关注Log输出的次序,和时间间隔:

10:15:04.046 26079-26079/com.example.myapplication D/TC: [main]testInMain start
10:15:04.067 26079-26079/com.example.myapplication D/TC: [main]testInMain end
10:15:04.080 26079-26079/com.example.myapplication D/TC: 
[main]job start
10:15:04.083 26079-26079/com.example.myapplication D/TC:
[main]before job1 return
10:15:04.086 26079-26683/com.example.myapplication D/TC: [DefaultDispatcher-worker-1]job1 start 
10:15:04.087 26079-26684/com.example.myapplication D/TC: [DefaultDispatcher-worker-2]job2 start
10:15:05.090 26079-26683/com.example.myapplication D/TC: [DefaultDispatcher-worker-1]job2 end
10:15:05.095 26079-26079/com.example.myapplication D/TC: 
[main]button-2 onclick now
10:15:07.090 26079-26685/com.example.myapplication D/TC: [DefaultDispatcher-worker-3]job1 end 
10:15:07.091 26079-26079/com.example.myapplication D/TC: 
[main]job1 result = job1-Return
10:15:07.091 26079-26079/com.example.myapplication D/TC:
[main]before job2 return
10:15:07.091 26079-26079/com.example.myapplication D/TC: 
[main]job2 result = job2-Return
10:15:07.091 26079-26079/com.example.myapplication D/TC: 
[main]job end

通过输出,我们可以了解到示例代码的具体运行情况:

至此,testInMain中所有的相关协程运行完毕,比testInMain函数晚约3.5秒(04.067 - 07.091)后结束。

上述代码的运行过程,我们根据执行片段梳理,可以进一步简化为图,方便大家理解:

图片

从图中,我们可以清晰的得到几点结论:

  1. job协程内部,通过await 阻塞了后续代码的执行。job1和job2协程,通过delay阻塞了后续代码的执行。
  2. 协程job1,job2 启动后,保持并行执行。job2 并没有等待job1执行完才启动执行和恢复,而是在各自线程并行执行。
  3. job的后续代码被await 阻塞后,并没有阻塞main线程,main线程中其它模块的代码能同时被执行,并打印出"[main]button 2 onclick now"。
  4. job1 被delay阻塞后续代码执行时,并没有阻塞所在线程[DefaultDispatcher-worker-1],job2中的后续代码被恢复到此[DefaultDispatcher-worker-1]子线程中执行。
  5. job1 和 job2 协程在恢复执行时,并不能确保在原线程中执行后续代码。如log所示,job2在DefaultDispatcher-worker-2中启动和阻塞后,却在DefaultDispatcher-worker-1中恢复了后续的代码执行。

所以可以看出,协程的挂起,并不会阻塞协程所在的线程,而只是中断了协程后面的代码执行。然后等待挂起函数完成后,恢复协程的后续代码执行。这就是协程挂起最最基本的关键点。

从示例我们知道协程的挂起,是通过await挂起函数实现的,其函数的定义如下:

// Deferred.kt
public interface Deferred<out T> : Job {
  public suspend fun await(): T
} 

// Delay.kt
public suspend fun delay(timeMillis: Long) {...}

挂起函数比正常的函数多了一个suspend关键字修饰。这个关键字的意思就是“挂起”,所以suspend关键字修饰的函数也叫“挂起函数”。挂起函数有个调用约定:挂起函数必须在协程或者其他挂起函数中被调用。这保证了所有挂起函数都是在协程中被调用,是协程机制运转的基本要求。

挂起函数会把所在协程挂起来,中断协程后续的代码执行。协程的代码被中断执行,但它所在的线程并没有被阻塞,如上图实例中的job被await阻塞后,所在的main线程,仍然可以执行其它代码。换个方式说,就是协程被挂起时,协程与它运行所在的线程解绑了,线程会被线程池继续调度执行别的代码。

当挂起函数等待的结果返回后,会通知协程的恢复后续代码的执行。协程的恢复由挂起函数内部的续体机制来实现,后面会具体说明。

协程挂起的实现原理

了解了Kotlin协程挂起的整体运行流程,我们进一步来看看协程挂起的实现原理。要想了解Kotlin在Android中的实现原理,必须分析其在Android中的源码实现。不过,因为Kotlin协程的源码和运行时的代码有些差别,实际运行时的代码,很多是通过编译器自动处理生成的。所以,我们需要借助Kotlin Bytedcode Decomile工具反编译出java文件,来进行代码分析。

上节中的示例代码,经过反编译后的核心代码如下:

//TestCoroutin.decompiled.java
public final void testInMain() {
  Log.d("cjf---", var10001.append("testInMain start").toString());

  Job job = BuildersKt.launch$default( CoroutineScopeKt.CoroutineScope((CoroutineContext)Dispatchers.getMain()),  (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
    @Nullable
    public final Object invokeSuspend(@NotNull Object $result) {
      //单独拆分到下面,需要详细讲解
    }

    public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {/......./}

    public final Object invoke(Object var1, Object var2){/......./}

  }), 3, (Object)null);

  Log.d("cjf---", var10001.append("testInMain end ").toString());
}

//job协程的SuspendLambda续体,其invokeSuspend方法代码
public final Object invokeSuspend(@NotNull Object $result) {
  ... ...
  label17: {
    Object var8 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
    switch(this.label) {
    case 0:
      Log.d(var10001.append("job start ").toString());
      Deferred job1 = BuildersKt.async$default(/......./);
      job2 = BuildersKt.async$defaultdefault(/......./);
      Log.d(var10001.append("before job1 return").toString());
      var6 = var10001.append("job1 result =");
      this.L$0 = job2;
      this.L$1 = var5;
      this.L$2 = var6;
      this.label = 1;
      var10000 = job1.await(this);
      if (var10000 == var8) {
        return var8;
      }
      break;
    case 1:
      var6 = (StringBuilder)this.L$2;
      var5 = (String)this.L$1;
      job2 = (Deferred)this.L$0;
      ResultKt.throwOnFailure($result);
      var10000 = $result;
      break;
    case 2:
      var6 = (StringBuilder)this.L$1;
      var5 = (String)this.L$0;
      ResultKt.throwOnFailure($result);
      var10000 = $result;
      break label17;
    default:
      throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
    }

    var7 = var10000;
    Log.d(var5, var6.append((String)var7).toString());
    Log.d(var10001.append("before wait job2 return").toString());
    var6 = var10001.append("job2 result = ");
    this.L$0 = var5;
    this.L$1 = var6;
    this.L$2 = null;
    this.label = 2;
    var10000 = job2.await(this);
    if (var10000 == var8) {
      return var8;
    }
  } //end of label17

  Log.d(var5, var6.append((String)var7).toString());
  Log.d("cjf---", var10001.append("job end ").toString());
  return Unit.INSTANCE;
} //end of invokeSuspend

我们可以看到,反编译后的TestCoroutine类的testInMain和源代码中的区别,除了job协程块不同,其它变化不大,先打印“[main]testInMain start”,然后创建job协后,最后打印“[main]testInMain end”,完成整个方法的执行。

反编译后的主要区别在job协程,其Lambda代码块转换成了Function2 实现。

我们借助APK反编译工具,可以看到执行代码中,Function2 实际上被SuspendLambda 类继承实现。

图片

SuspendLambda实现类的关键逻辑在invokeSuspend方法中,而invokeSuspend方法中采用了CPS(Continuation-Passing-Style) 续体传递风格

续体传递风格会将job协程的Lambda代码块,通过label标签和switch分割成多个代码块。代码块分割的点,就是协程中调用suspend挂起函数的地方。

分支代码调用到await挂起函数时,如果返回了COROUTINE_SUSPENDED,就退出invokeSuspend,进入挂起状态。

我们用流程图来描述上面示例代码,转换后的续体传递风格代码,如下:

图片

我们可以看到,整个示例代码,被分割成了5个代码块。其中case1 代码块主要负责为label17 代码块进行参数转换;case2 代码块主要负责为最外层代码块进行参数转换;所以相当于2个await挂起函数,将lambda代码块分割成了3个实际执行的代码块。

而且job1.await和job2.await会根据挂起函数的返回值进行不同处理,如果返回挂起,则进行协程挂起,当前协程退出执行;如果返回其它值,则协程继续后续代码块的执行。

编译器在编译期间,除了会对所有suspend修饰的函数调用处进行续体传递风格变换,还会同时改变suspend函数的签名。比如await函数是个suspend函数,签名声明如下:

suspend fun <T> CompletableFuture<T>.await(): T

编译器进行变换后:

fun <T> CompletableFuture<T>.await(continuation: Continuation<T>): Any?

我们可以看到,改变后的函数多了一个类型为Continuation的参数。Continuation可以称之为协程续体,它提供了协程恢复的基本方法:resumeWith。Continuation续体声明很简单:

interface Continuation<in T> {
    val context: CoroutineContext
    fun resumeWith(result: Result<T>)
}

其具体实现在SuspendLambda的父类BaseContinuationImpl中:

//class BaseContinuationImpl 中 fun resumeWith 内部核心代码
while (true) {
  probeCoroutineResumed(current)
  with(current) {
    val completion = completion!! 
    val outcome: Result<Any?> = //协程返回了结果,说明协程执行完毕
        try {
            val outcome = invokeSuspend(param)//执行协程的续体代码块
            if (outcome === COROUTINE_SUSPENDED) return //挂起函数返回挂起标志,退出后续代码执行
            Result.success(outcome) //没有返回挂起标志,将返回值outcome封装为Result返给外层outcome
        } catch (exception: Throwable) {
            Result.failure(exception)//将异常Result返给外层outcome
        }
    releaseIntercepted() // 释放当前协程的拦截器
    if (completion is BaseContinuationImpl) {//如果上一层续体是一个单纯的续体,则将结果作为上一层续体的恢复参数,进行上一层续体的恢复
        current = completion
        param = outcome
    } else {//上一层续体是一个协程,则调用协程的恢复函数,进行上一层的协程恢复
        completion.resumeWith(outcome)
        return
    }
  }
}

如果invokeSuspend函数返回中断标志时,会直接从函数中返回,等待后续继续被恢复执行。

如果invokeSuspend函数返回的是结果,且上一层续体不是单纯的续体而是协程体,它会调用参数completion的resumeWith函数,恢复上一层协程的invokeSuspend代码的执行。

协程被resumeWith恢复后,会继续调用invokeSuspend函数,根据label值执行下一个case分支代码块。按照这个恢复流程,直到所有invokeSuspend代码执行完,返回非COROUTINE_SUSPENDED的结果,协程就执行结束。

我们继续看job续体在invokeSuspend中调用到job1.await函数时,await是怎么实现返回挂起标志,和后续恢复job协程的。核心代码可以在awaitSuspend中查看:

// JobSupport.kt中 awaitSuspend方法
private suspend fun awaitSuspend(): Any? = suspendCoroutineUninterceptedOrReturn { uCont ->
    val cont = AwaitContinuation(uCont.intercepted(), this)
    cont.disposeOnCancellation(invokeOnCompletion(                          
                   ResumeAwaitOnCompletion(this, cont).asHandler))
    cont.getResult()
}

// JobSupport.kt中 invokeOnCompletion方法
public final override fun invokeOnCompletion(...):DisposableHandle {
  var nodeCache: JobNode<*>? = null
  loopOnState { state ->
    when (state) {
      is Empty -> { // 没有completion handlers,直接创建Node放入state
         val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
         if (_state.compareAndSet(state, node)) return nod
      }
      is Incomplete -> {// 有completion handlers,加入到node list列表
          val list = state.list
          val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
          if (!addLastAtomic(state, list, node)) return@loopOnState /
      }
      else -> { // 已经完成,不需要加入结果监听Node
          if (invokeImmediately) handler.invokeIt((state as? CompletedExceptionally)?.cause) return NonDisposableHandle
      }
    }
  }
}

// AbstractCoroutine.kt 中 resumeWith方法
// 通知state node,进行恢复
public final override fun resumeWith(result: Result<T>) {
    // makeCompletingOnce 大致实现是修改协程状态,如果需要的话还会将结果返回给调用者协程,并恢复调用者协程
    makeCompletingOnce(result.toState(), defaultResumeMode)
}

可以看出,job1.await()首先会通过getResult()去获取job1的结果,如果有结果则直接返回结果,否则立即返回中断标志,这样就实现了await挂起点挂起job协程了。await()挂起函数恢复job协程的流程是,将job 协程封装为 ResumeAwaitOnCompletion,并将其再次封装成handler 节点,添加job1协程的 state.list。

等job1协程完成后,会通知 handler 节点调用job协程的 resumeWith(result) 方法,从而恢复 job协程await 挂起点之后的代码块的执行。

我们再次结合示例代码, 来梳理这个挂起和恢复流程:

图片

note:绿色底色,表示在主线程执行;红色字体,表示调用挂起函数;

可以看到整个过程:

通过续体传递风格的invokeSuspend代码,和续体之间形成的resumewith恢复链,协程得以实现挂起和恢复的核心流程。

总结

本文通过示例Log和反编译后的源码分析,讲解了Kotlin协程的挂起和恢复流程。

协程的挂起和恢复,是通过invokeSuspend和resumeWith这2个核心方法串起来的,其最重要的就是Continuation续体链的建立和恢复。

协程中的Coroutine和SuspendLambda,以及线程派发器DispatchedContinuation,都是继承自Continuation续体,都能通过resumeWith恢复上一层续体或协程。

协程的挂起流程就是通过Coroutine,SuspendLambda,和Dispatcher这三层进行Cotinuation封装,形成Cotinuation恢复链,配合CPS续传体变形实现了协程内有序代码的异步挂起和恢复。

通过了解协程的挂起原理,我们可灵活运用协程的特点,进行业务中的异步任务调度,输出清晰明了,安全可维护的代码。

如果我们新的业务在用协程时,想要复用原有的异步任务,就可以对异步任务进行suspend封装;对于原异步模块无复用需求的模块,我们可以进一步引入Jetpack对协程的支持,如生命周期感知型ViewModelScope,LifecycleScope,来控制业务中异步任务的生命周期。

❝许多 Jetpack 库都包含提供全面协程支持的扩展。某些库还提供自己的协程作用域,可供您用于结构化并发。ViewModel 包含一组可直接与协程配合使用的 KTX 扩展。这些扩展是lifecycle-viewmodel-ktx 库。

Copyright© 2013-2020

All Rights Reserved 京ICP备2023019179号-8