kt协程 | suspend非阻塞挂起魔术解密

一 前言


kotin协程,一种轻量级用户态线程,能通过suspend函数避免回调地狱以及快速实现线程的切换等,已经普及到大量实际项目中。这里将解析协程核心功能suspend的「非阻塞式挂起」实现原理,从而避免管中窥豹,使得在后续使用kotlin协程开发能更加得心应手。


二 callback代码同步化



kotlin官方宣传的非阻塞式挂起,即用同步代码写异步操作。其实可以理解,就是通过suspend将代码“用阻塞式的写法实现非阻塞式线程调度”,就是内部帮我们切线程。 先简述同步化写法,下面会逐步分析,kotlin协程是如何通过suspend来实现非阻塞式的代码同步化



2.1 callback代码同步化


假设一个简单的需求,先从网络获取最新的消息,再与本地数据库做增量操作,获取完整消息。 在原生中,回调代码如下:


fun fetch(){
fetchRemote { msg->
fetchLocal(msg) { result ->
//实际业务操作
println("result:$result")
}
}
}

fun fetchRemote(onNext:(Int)->Unit){
Thread.sleep(300)
val value = 1

onNext(value)
}
fun fetchLocal(id:Int,onNext:(Int)->Unit){
Thread.sleep(300)
val value = 2

onNext(id + value)

利用了kotlin协程,可以直接以同步方式:


suspend fun fetch():Int{//用正常同步写法,消除回调
val msg = fetchRemote()
val result = fetchLocal(msg)
println("result:$result")
return result
}

//简单起见,采用suspendCoroutine
suspend fun fetchRemote() = suspendCoroutine {
it.resume(1)
}

suspend fun fetchLocal(id:Int) = suspendCoroutine {
it.resume(id + 2)
}

ok,上面的 suspendFetch函数写法,就是传说中的 “同步代码实现异步操作” 了,简称「代码同步化」


三 suspend解密



备注:为方便理解,下面展示的是伪代码,与实际字节码翻译可能存在不同;



3.1 suspend函数解体


这里先讲解一个声明为suspend的函数,如suspend fun fetch():Int,会如何被kotlin编译器解体,再讲述执行过程。


先总结:kotlin编译器会使用状态机实现回调功能,每一个suspend函数都是状态机的一个状态,suspend就是声明一个状态而已,再往简单地说,编译器会注入代码,内部帮我们实现了回调!



  1. 编译器首先会在suspend函数,加入一个额外的参数 completion: Continuation,比如会将上述的suspend fun fetch():Int变成fun fetch(completion: Continuation):Any,这也额外解释了为何suspend只能被suspend函数或协程内调用。

注意,这里的返回值变成any,是因为除了我们定义的返回值以外,还可能返回挂起标识CoroutineSingletons.COROUTINE_SUSPENDED,也就是用于实现挂起的逻辑



  1. kotlin编译器用状态机机制判断当前会执行哪个代码块,每一个挂起函数都会被当成一个状态点,用label来表示,如fetchRemote是一个label,一个可能会存在挂起的状态,伪代码:

 
fun fetch(completion: Continuation):Any{
...
when(label){
0 -> {//label 为默认值0 ,即fetch函数被第一次调用运行,函数代码此时正常运行,还没接触到任何其他挂起函数
...
label = 1 //下面会运行一个挂起函数,所以状态label立即加1,也就是说label==1,表示代码运行到了第一个挂起函数,此处是fetchRemote()
val state = fetchRemote()
...
return COROUTINE_SUSPENDED
}
1 -> { //label 1 ,表示在遇到第一个挂起函数fetchRemote() 之后,调用resume等方式恢复了调度
...
label = 2 //下面会运行另一个挂起函数,所以状态label立即加1,也就是说label==2,表示代码运行到了第二个挂起函数,此处是fetchLocal()
val state = fetchLocal(id)
...
return COROUTINE_SUSPENDED
}
2 -> {//label 2 ,表示在遇到第二个挂起函数fetchLocal() 之后,调用resume等方式恢复了调度
...
println("result:$result")
return result
}
}
}

再次提下总结:每一个suspend函数都是状态机的一个状态,suspend就是声明一个状态,体现到代码层次就是一个label值来表示。



  1. 到这里,还需要在状态之间分发上一个状态机的执行结果「即,上一个suspend的返回值」。kotlin通过生成一个状态机管理类,存储label和结果值,解决这个问题:


这里的类命名只是为了方便理解



class FetchStateMachine(
completion: Continuation
) : ContinuationImpl(completion) {
var result: Result? = null
var label: Int = 0

override fun invokeSuspend(result: Any?) {
this.result = result
fetch(this)
}
}

先注意这里的invokeSuspend包裹了真实的要执行的协程体,并保存了传进来的执行结果result,负责存储每个suspend函数执行结果以共享。


4.一个小点,就是如何判断它是第一次执行这个suspend函数,也就是初始状态label==0。这里比较简单,直接通过判断completion是不是生成的状态机类就知道了,不是状态机类就代表第一次执行,包裹起来:


val continuation = completion as? FetchStateMachine ?: FetchStateMachine(completion)


  1. 再接上最开始提到的挂起逻辑。是否特别好奇过,究竟协程是如何知道该挂起,该怎么做了?答案很简单,当某个挂起函数,如fetchRemote(),没有调resume时,编译器会让它返回一个CoroutineSingletons.COROUTINE_SUSPENDED结果,这也是为什么返回值会变成Any,然后只要判断result == 挂起标志,代码直接return,就实现挂起了!!是不是很朴实??

val result = fetchRemote(continuation)
if (result == CoroutineSingletons.COROUTINE_SUSPENDED){
return result
}

到了这里,就可以看到编译器对fetch()解体的代码的模样了:


fun fetch(completion: Continuation): Any {
class FetchStateMachine(
completion: Continuation
) : ContinuationImpl(completion) {
var result: Result? = null //执行结果的共享
var label: Int = 0 //判断执行到哪个代码快,挂起函数

override fun invokeSuspend(result: Any?) {//触发状态机运行,调用resumeWith时会触发
this.result = result
suspendFetch(this)
}
}

//第一次执行,包裹成状态机类
val continuation = completion as? FetchStateMachine ?: FetchStateMachine(completion)

val result = continuation.result
val suspended = COROUTINE_SUSPENDED

when (continuation.label) {
0 -> {
//检查是否异常
result.throwOnFailure()
//立即修改label+1
continuation.label = 1
val var0 = fetchRemote(continuation)
if (var0 == suspended){ //表示suspendRemote挂起
return var0
}

//再次触发状态机跑下一个 label1,正常情况不会跑这里。只有当suspendRemote实现是普通函数 suspend fun suspendRemote() = 1,才会触发
fetch(continuation)
}
1 -> {
result.throwOnFailure()
continuation.label = 2
val var0 = fetchLocal(result.value,continuation)
if (var0 == suspended){//这里就相当于一次挂起了
return var0
}
fetch(continuation)
}
2 -> {
result.throwOnFailure()
return result.value
}
else -> throw IllegalStateException("call to 'resume' before 'invoke' with coroutine")
}
}

3.2 执行流程


ok,这里针对编译器解体的代码,讲一下状态机执行过程;



  1. launch协程后,会触发协程体执行,从而第一次调用到fetch()方法,开始执行状态机;
  2. 第一次进来,将completion包装成状态机类,此时label为0,执行到第一个挂起函数fetchRemote()
  3. fetchRemote() 是个普通函数,类似suspend fun a()=1这种只是简单声明suspend的函数,会直接返回函数结果值,递归调度fetch(continuation)

//Decompilerkotlin to java by cfr
public static final Object a(@NotNull Continuation $completion) {
return Boxing.boxInt((int)1);
}


  1. fetchRemote() 是实现了suspendCoroutine/suspendCoroutine的正经挂起函数时,函数会返回一个挂起标志CoroutineSingletons.COROUTINE_SUSPENDED,这也是会什么suspend函数返回值是Any类型,到这里会发生一次挂起;

image.png



  1. 对于fetchRemote,当调用resumeWith恢复调度时,会递归循环调用我们一开始生成的状态机包裹类的invokeSuspend方法,而invokeSuspend方法就是会再次触发自身函数,即fetch()

image.png



  1. 此时触发状态机接着跑此时的label为1,会跑到fetchLocal挂起方法。然后循环递归步骤3 4,直到结束。

这里的执行流程核心就是一个循环递归,从而帮我们内部实现回调。


作者:CYQ
链接:https://juejin.cn/post/7016636661400338463
来源:稀土掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

0 个评论

要回复文章请先登录注册