假如我们有这样一个需求:
fun childFun1(): Int {
return 10
}
fun childFun2(): Int {
return 20
}
fun childFun3(num1: Int, num2: Int): Int {
return num1 + num2
}
fun parentFun() : Int {
val num1 = childFun1()
val num2 = childFun2()
val sum = childFun3(num1, num2)
return sum
}
即: 从多个操作中(childFun1, childFun2) 获取值,然后再对这些值进行处理(childFun3),程序逻辑非常直观易懂。
例如,先上传用户选择的图片到服务端储存,获取服务端返回的图片对应的地址,然后这些地址设置到对应位置。
如果这些操作都是耗时操作,为了不阻塞线程,需要将这些耗时操作放到其他线程中,即
val executor: ExecutorService = Executors.newFixedThreadPool(2)
fun childFun1(callback: (Int) -> Unit): Unit {
executor.execute {
Thread.sleep(1000)
callback(10)
}
}
fun childFun2(callback: (Int) -> Unit): Unit {
executor.execute {
Thread.sleep(2000)
callback(20)
}
}
fun childFun3(num1: Int, num2: Int, callback: (Int) -> Unit): Unit {
executor.execute {
Thread.sleep(500)
callback(num1 + num2)
}
}
fun parentFun(callback: (Int) -> Unit) : Unit {
childFun1(fun(num1) {
childFun2(fun(num2) {
childFun3(num1, num2, callback)
})
})
}
因为是异步操作,结果值不能直接返回,只能通过 callback 方式异步回传,所以当异步操作很多的时候,整个回调链就很长了,让代码逻辑显得不清晰。
suspend fun childFun1(): Int {
delay(1000)
return 10
}
suspend fun childFun2(): Int {
delay(2000)
return 20
}
suspend fun childFun3(num1: Int, num2: Int): Int {
delay(500)
return num1 + num2
}
suspend fun parentFun() : Int {
val num1 = childFun1()
val num2 = childFun2()
val sum = childFun3(num1, num2)
return sum
}
我们可以看到和同步方式操作一模一样,只不过方法上多了 suspend 关键字而已。
注: 这里的 delay 方法,不会像 Thread.sleep 阻塞当前线程。
上面说过,suspend 不会阻塞当前线程,那么它怎么将异步操作的数据,同步传递回来呢?答案其实也是回调,只不过隐藏的很深,我们慢慢分析。
suspend fun childFun1(): Int {
Thread.sleep(1000)
return 10
}
suspend fun childFun2(): Int {
Thread.sleep(2000)
return 20
}
suspend fun childFun3(num1: Int, num2: Int): Int {
Thread.sleep(500)
return num1 + num2
}
suspend fun parentFun() : Int {
val num1 = childFun1()
val num2 = childFun2()
val sum = childFun3(num1, num2)
return sum
}
这里将 delay 换成 Thread.sleep ,先看 suspend 反编译的 java 代码
public final class CoroutineKt {
@Nullable
public static final Object childFun1(@NotNull Continuation $completion) {
Thread.sleep(1000L);
return Boxing.boxInt(10);
}
@Nullable
public static final Object childFun2(@NotNull Continuation $completion) {
Thread.sleep(2000L);
return Boxing.boxInt(20);
}
@Nullable
public static final Object childFun3(int num1, int num2, @NotNull Continuation $completion) {
Thread.sleep(500L);
return Boxing.boxInt(num1 + num2);
}
@Nullable
public static final Object parentFun(@NotNull Continuation $completion) {
Object $continuation;
label37: {
if ($completion instanceof <undefinedtype>) {
$continuation = (<undefinedtype>)$completion;
if ((((<undefinedtype>)$continuation).label & Integer.MIN_VALUE) != 0) {
((<undefinedtype>)$continuation).label -= Integer.MIN_VALUE;
break label37;
}
}
$continuation = new ContinuationImpl($completion) {
// $FF: synthetic field
Object result;
int label;
int I$0;
int I$1;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
this.result = $result;
this.label |= Integer.MIN_VALUE;
return CoroutineKt.parentFun(this);
}
};
}
Object var10000;
label31: {
int num1;
int num2;
Object var6;
label30: {
Object $result = ((<undefinedtype>)$continuation).result;
var6 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch(((<undefinedtype>)$continuation).label) {
case 0:
ResultKt.throwOnFailure($result);
((<undefinedtype>)$continuation).label = 1;
var10000 = childFun1((Continuation)$continuation);
if (var10000 == var6) {
return var6;
}
break;
case 1:
ResultKt.throwOnFailure($result);
var10000 = $result;
break;
case 2:
num1 = ((<undefinedtype>)$continuation).I$0;
ResultKt.throwOnFailure($result);
var10000 = $result;
break label30;
case 3:
num2 = ((<undefinedtype>)$continuation).I$1;
num1 = ((<undefinedtype>)$continuation).I$0;
ResultKt.throwOnFailure($result);
var10000 = $result;
break label31;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
num1 = ((Number)var10000).intValue();
((<undefinedtype>)$continuation).I$0 = num1;
((<undefinedtype>)$continuation).label = 2;
var10000 = childFun2((Continuation)$continuation);
if (var10000 == var6) {
return var6;
}
}
num2 = ((Number)var10000).intValue();
((<undefinedtype>)$continuation).I$0 = num1;
((<undefinedtype>)$continuation).I$1 = num2;
((<undefinedtype>)$continuation).label = 3;
var10000 = childFun3(num1, num2, (Continuation)$continuation);
if (var10000 == var6) {
return var6;
}
}
int sum = ((Number)var10000).intValue();
return Boxing.boxInt(sum);
}
}
我们注意到 suspend 修饰的方法,转成 java 方法时,会在方法最后面添加上 Continuation 类型的参数:
suspend fun childFun3(num1: Int, num2: Int): Int {
Thread.sleep(500)
return num1 + num2
}
// 变成了
@Nullable
public static final Object childFun3(int num1, int num2, @NotNull Continuation $completion) {
Thread.sleep(500L);
return Boxing.boxInt(num1 + num2);
}
这个 Continuation 实例非常重要,它是协程能够实现异步回调的关键对象。
/**
* Interface representing a continuation after a suspension point that returns a value of type `T`.
*/
@SinceKotlin("1.3")
public interface Continuation<in T> {
/**
* The context of the coroutine that corresponds to this continuation.
*/
public val context: CoroutineContext
/**
* Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
* return value of the last suspension point.
*/
public fun resumeWith(result: Result<T>)
}
注: 这里的唤醒,并不是说协程被线程阻塞了
观察 childFun1 、 childFun2 和 childFun3 方法,除了参数上多了一个 $completion 参数,并没有其他变化,那是因为这三个方法中,没有调用其他 suspend 方法,所以和普通函数没有多大区别。
suspend fun parentFun() : Int {
val num1 = childFun1()
val num2 = childFun2()
val sum = childFun3(num1, num2)
return sum
}
// 转变成了
@Nullable
public static final Object parentFun(@NotNull Continuation $completion) {
Object $continuation;
label37: {
if ($completion instanceof <undefinedtype>) {
$continuation = (<undefinedtype>)$completion;
if ((((<undefinedtype>)$continuation).label & Integer.MIN_VALUE) != 0) {
((<undefinedtype>)$continuation).label -= Integer.MIN_VALUE;
break label37;
}
}
$continuation = new ContinuationImpl($completion) {
// $FF: synthetic field
Object result;
int label;
int I$0;
int I$1;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
this.result = $result;
this.label |= Integer.MIN_VALUE;
return CoroutineKt.parentFun(this);
}
};
}
Object var10000;
label31: {
int num1;
int num2;
Object var6;
label30: {
Object $result = ((<undefinedtype>)$continuation).result;
var6 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch(((<undefinedtype>)$continuation).label) {
case 0:
ResultKt.throwOnFailure($result);
((<undefinedtype>)$continuation).label = 1;
var10000 = childFun1((Continuation)$continuation);
if (var10000 == var6) {
return var6;
}
break;
case 1:
ResultKt.throwOnFailure($result);
var10000 = $result;
break;
case 2:
num1 = ((<undefinedtype>)$continuation).I$0;
ResultKt.throwOnFailure($result);
var10000 = $result;
break label30;
case 3:
num2 = ((<undefinedtype>)$continuation).I$1;
num1 = ((<undefinedtype>)$continuation).I$0;
ResultKt.throwOnFailure($result);
var10000 = $result;
break label31;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
num1 = ((Number)var10000).intValue();
((<undefinedtype>)$continuation).I$0 = num1;
((<undefinedtype>)$continuation).label = 2;
var10000 = childFun2((Continuation)$continuation);
if (var10000 == var6) {
return var6;
}
}
num2 = ((Number)var10000).intValue();
((<undefinedtype>)$continuation).I$0 = num1;
((<undefinedtype>)$continuation).I$1 = num2;
((<undefinedtype>)$continuation).label = 3;
var10000 = childFun3(num1, num2, (Continuation)$continuation);
if (var10000 == var6) {
return var6;
}
}
int sum = ((Number)var10000).intValue();
return Boxing.boxInt(sum);
}
我们发现 parentFun 方法转成的 java 代码,比我们想象中的要多,这个就是协程实现的秘密。
方法流程分析:
注: 当调用 $continuation 对象的 resumeWith 方法会调用 invokeSuspend 方法,就会再次调用 parentFun 方法。
2 . 当 label 不同的时候,执行的逻辑不同: 1 . 当 label = 0 时,先将 label 设置成 1 ,并调用 childFun1 方法,参数就是当前协程体 continuation 的 resumeWith 方法回调来的,得到 childFun1 方法异步结果值。我们的例子中,不会走到这一步,因为我们直接返回了结果值。3 . 当 label = 2 时, 同上,是 childFun2 方法内部通过 $continuation 的 resumeWith 方法回调来的,得到 childFun2 方法异步结果值。4 . 当 label = 3 时, 同上原理。
即: label = 1 表示调用了 childFun1 方法;label = 2 表示调用了 childFun2 方法;label = 3 表示调用了 childFun3 方法。并等待 resumeWith 回调带来的结果值。
这种实现方式,我们称之为状态机。
通过上面的分析,我们了解到协程是如何实现挂起和恢复的。
不同于线程的阻塞和唤醒,协程的挂起是方法直接返回,不执行接下来的代码,它的恢复是通过被调用放来实现的。以上面的例子为例:
因此我们可以总结:
本质上协程也是通过回调实现异步操作的,只不过 kotlin 编译器将协程函数变成状态机。也明白了为什么 suspend 函数为什么只能在 suspend 函数内部调用,而不能在普通函数内部执行,因为没有隐藏的 $continuation 对象。
上面分析了 suspend 函数,但是现在这个函数,没办法执行,因为 suspend 函数都需要 Continuation 实例,那么第一个 Continuation 实例该如何创建呢?
kotlin 标准库中提供了两个函数来创建 Continuation 实例
@SinceKotlin("1.3")
@Suppress("UNCHECKED_CAST")
public fun <T> (suspend () -> T).createCoroutine(
completion: Continuation<T>
): Continuation<Unit> =
SafeContinuation(createCoroutineUnintercepted(completion).intercepted(), COROUTINE_SUSPENDED)
@SinceKotlin("1.3")
@Suppress("UNCHECKED_CAST")
public fun <R, T> (suspend R.() -> T).createCoroutine(
receiver: R,
completion: Continuation<T>
): Continuation<Unit> =
SafeContinuation(createCoroutineUnintercepted(receiver, completion).intercepted(), COROUTINE_SUSPENDED)
通过 suspend 函数去创建 Continuation 对象,返回 SafeContinuation 类的实例,这个类以后我们有机会分析。completion 当协程完成之后,会调用它的 resumeWith 方法。
例如:
fun main() {
val coroutine = (::parentFun).createCoroutine(object : Continuation<Int>{
override val context: CoroutineContext
get() = EmptyCoroutineContext
override fun resumeWith(result: Result<Int>) {
println("result:$result")
}
})
// 执行
coroutine.resume(Unit)
}
@SinceKotlin("1.3")
@Suppress("UNCHECKED_CAST")
public fun <T> (suspend () -> T).startCoroutine(
completion: Continuation<T>
) {
createCoroutineUnintercepted(completion).intercepted().resume(Unit)
}
@SinceKotlin("1.3")
@Suppress("UNCHECKED_CAST")
public fun <R, T> (suspend R.() -> T).startCoroutine(
receiver: R,
completion: Continuation<T>
) {
createCoroutineUnintercepted(receiver, completion).intercepted().resume(Unit)
}
这个方法不仅创建了 Continuation 对象,并且还直接执行了协程函数。例如:
fun main() {
(::parentFun).startCoroutine(object : Continuation<Int>{
override val context: CoroutineContext
get() = EmptyCoroutineContext
override fun resumeWith(result: Result<Int>) {
println("result:$result")
}
})
}
上面的例子中 childFun1 这些方法,我们并没有实现异步操作,这里有两个难点:
针对这种情况,kotlin 提供了 suspendCoroutine 方法来解决这个问题。
@SinceKotlin("1.3")
@InlineOnly
public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T =
suspendCoroutineUninterceptedOrReturn { c: Continuation<T> ->
val safe = SafeContinuation(c.intercepted())
block(safe)
safe.getOrThrow()
}
将 childFun1 方法进行改变
suspend fun childFun1(): Int {
Thread.sleep(1000)
return 10
}
// 转变成
val executor = Executors.newScheduledThreadPool(1)
suspend fun childFun1(): Int = suspendCoroutine { continuation ->
executor.schedule(fun() { continuation.resume(10) }, 1000, TimeUnit.MILLISECONDS)
}
这里我们并没有直接返回,而是通过 executor 线程池,延迟 1 秒钟之后再返回值,模拟异步操作。
转成的 java 代码
@Nullable
public static final Object childFun1(@NotNull Continuation $completion) {
boolean var1 = false;
boolean var3 = false;
SafeContinuation var4 = new SafeContinuation(IntrinsicsKt.intercepted($completion));
Continuation continuation = (Continuation)var4;
int var6 = false;
executor.schedule((Runnable)(new CoroutineKt$childFun1$2$1(continuation)), 1000L, TimeUnit.MILLISECONDS);
Object var10000 = var4.getOrThrow();
if (var10000 == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
DebugProbesKt.probeCoroutineSuspended($completion);
}
return var10000;
}
因为 suspendCoroutine 函数时内联函数,因此函数内容直接复制到 childFun1 函数中。
SafeContinuation 的 getOrThrow 方法
@PublishedApi
internal actual fun getOrThrow(): Any? {
var result = this.result // atomic read
if (result === UNDECIDED) {
if (RESULT.compareAndSet(this, UNDECIDED, COROUTINE_SUSPENDED)) return COROUTINE_SUSPENDED
result = this.result // reread volatile var
}
return when {
result === RESUMED -> COROUTINE_SUSPENDED // already called continuation, indicate COROUTINE_SUSPENDED upstream
result is Result.Failure -> throw result.exception
else -> result // either COROUTINE_SUSPENDED or data
}
}
可以看出在没有调用 resumeWith 方法时,就返回 COROUTINE_SUSPENDED
val executor = Executors.newScheduledThreadPool(2)
suspend fun childFun1(): Int = suspendCoroutine { continuation ->
executor.schedule(fun() { continuation.resume(10) }, 1000, TimeUnit.MILLISECONDS)
}
suspend fun childFun2(): Int = suspendCoroutine { continuation ->
executor.schedule(fun() { continuation.resume(20) }, 2000, TimeUnit.MILLISECONDS)
}
suspend fun childFun3(num1: Int, num2: Int): Int = suspendCoroutine { continuation ->
executor.schedule(fun() { continuation.resume(num1 + num2) }, 1000, TimeUnit.MILLISECONDS)
}
suspend fun parentFun() : Int {
val num1 = childFun1()
val num2 = childFun2()
val sum = childFun3(num1, num2)
return sum
}
fun main() {
(::parentFun).startCoroutine(object : Continuation<Int>{
override val context: CoroutineContext
get() = EmptyCoroutineContext
override fun resumeWith(result: Result<Int>) {
println("result:$result")
}
})
}
Copyright© 2013-2020
All Rights Reserved 京ICP备2023019179号-8