由于android限制了只能在UI线程更新视图,而在UI线程中做耗时任务又会导致ANR,因此在平时的开发中,需要将耗时的数据请求工作放到子线程中执行,而视图更新工作放到UI线程中,使用传统的handler或者asyncTask,需要将逻辑分到多个函数内

使用kotlin的协程机制,可以用同步的方式实现异步
kotlin的协程机制是基于状态机模型和C-P-S风格实现的。
一个协程通过resume启动,当协程内部调用supended函数时,协程会被暂停,通过调用 resume可以再次启动协程。每次暂停都会修改协程的状态,再次启动协程时,会从新的状态处开始执行。

现在通过kotlin的基础api实现一个简单的异步调用接口,最后的效果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
btn.setOnClickListener {
runOnUI {
//执行在主线程,可以做一些初始化操作
Log.e("log", Thread.currentThread().name)
var used = async { //从工作线程直接返回数据到主线程
//切换到工作线程执行,而且lambda可以直接访问外部变量,构成闭包
Log.e("log", Thread.currentThread().name)
var start = System.currentTimeMillis()
Thread.sleep(3000)
System.currentTimeMillis() - start
}
//继续执行在主线程
Log.e("log", Thread.currentThread().name)
Toast.makeText(this@MainActivity, "后台线程用时${used}ms", Toast.LENGTH_SHORT).show()
}
}

在后续的内容中,我将在实现的过程中逐步分析kotlin协程机制的基本原理

首先声明一个创建协程的函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//该函数接收一个 suspend类型的lambda
inline fun runOnUI(noinline block: suspend () -> Unit) {
var continuation = object : Continuation<Unit> {
//ThreadSwitcher是ContinuationInterceptor的子类,用于在协程resume时切换到主线程执行
override val context: CoroutineContext
get() = ThreadSwitcher()

override inline fun resume(value: Unit) = Unit

override inline fun resumeWithException(exception: Throwable) = Unit
}
//使用suspend类型的lambda创建一个协程并启动
block.createCoroutine(continuation).resume(Unit)
}

createCoroutine是官方提供的一个基础api,该函数如下:

1
2
3
public fun <T> (suspend () -> T).createCoroutine(
completion: Continuation<T>
): Continuation<Unit> = SafeContinuation(createCoroutineUnchecked(completion), COROUTINE_SUSPENDED)

可以看到调用了createCoroutineUnchecked创建一个Coroutine,继续查看该方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@SinceKotlin("1.1")
@kotlin.jvm.JvmVersion
public fun <T> (suspend () -> T).createCoroutineUnchecked(
completion: Continuation<T>
): Continuation<Unit> =
//这里的this是执行createCoroutine函数的block
if (this !is kotlin.coroutines.experimental.jvm.internal.CoroutineImpl)
buildContinuationByInvokeCall(completion) {
@Suppress("UNCHECKED_CAST")
(this as Function1<Continuation<T>, Any?>).invoke(completion)
}
else
//编译时,block会被编译成一个CoroutineImpl的子类,所以走这个分支
(this.create(completion) as kotlin.coroutines.experimental.jvm.internal.CoroutineImpl).facade

查看编译之后生成的block

1
2
3
4
5
6
//查看在Activity#onCreate调用runOnUI处传入的lambda的编译类
final class ymc/demo/com/asyncframe/MainActivity$onCreate$1$1
extends kotlin/coroutines/experimental/jvm/internal/CoroutineImpl
implements kotlin/jvm/functions/Function1 { //lambda编译类都实现FunctionN函数
...
}

可以看到传入runOnUIlambda确实被编译成了一个CoroutineImpl,这是因为编译器推断出了这个lambdasuspend类型的。

继续上面的分析,创建协程所涉及到的两个方法中都出现了 Continuation这个类,那么这个类是干嘛的呢?
首先,先看看completion,这个是我们调用createCoroutine手动传入的,当协程结束时,他的resume会被调用,当协程异常结束时,他的resumeWithException会被调用。
再看看createCoroutineUnchecked,这个函数也返回了一个Continuation,那么这个又是什么呢?

1
(this.create(completion) as kotlin.coroutines.experimental.jvm.internal.CoroutineImpl).facade

可以看到,返回的是CoroutineImplfacade,那这个又是什么呢?
我们进入CoroutineImpl,可以看到

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
abstract class CoroutineImpl(
arity: Int,
@JvmField
protected var completion: Continuation<Any?>?
) : Lambda(arity), Continuation<Any?> { //Coroutine本身是一个Continuation

override val context: CoroutineContext
get() = _context!!

private var _facade: Continuation<Any?>? = null

val facade: Continuation<Any?> get() {
if (_facade == null) _facade = interceptContinuationIfNeeded(_context!!, this)
return _facade!!
}
...
}

原来这是一个代理属性,接着查看interceptContinuationIfNeeded

1
2
3
4
internal fun <T> interceptContinuationIfNeeded(
context: CoroutineContext,
continuation: Continuation<T>
) = context[ContinuationInterceptor]?.interceptContinuation(continuation) ?: continuation

这个函数从Coroutine的上下文中查找ContinuationInterceptor,如果有就调用他的interceptContinuation对传入的continuation进行包装,否则直接返回传入的continuation

Continuation是一个可继续执行体的抽象,每个Coroutine都是一个可继续执行体,Continuation是一个协程对外的接口,启动/恢复协程的resume就是在该接口中定义的。
协程可以是链式连接的,一个协程可以有子协程,子协程持有父协程的引用,当子协程执行时,父协程暂停,子协程结束时,内部通过调用父协程的resume返回父协程。

还记得我们前面用到的ThreadSwitcher吗,他就是一个ContinuationInterceptor
我们来看看来看ThreadSwitcher的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
Interceptor用于用于拦截并包装Continuation,让我们有机会在协程resume前做一些额外的操作,比如线程切换
**/
class ThreadSwitcher : ContinuationInterceptor, AbstractCoroutineContextElement(ContinuationInterceptor.Key) {

override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
= object : Continuation<T> by continuation {

override fun resume(value: T) {
//如果在主线程,直接执行
if (Looper.getMainLooper() === Looper.myLooper()) {
continuation.resume(value)
} else {
//否则,使用handler机制post到主线程执行
postman.post {
resume(value)
}
}
}

override fun resumeWithException(exception: Throwable) {
if (Looper.getMainLooper() === Looper.myLooper()) {
continuation.resumeWithException(exception)
} else {
postman.post {
resumeWithException(exception)
}
}
}
}
}

从上面的分析中,我们可以想象,我们创建的协程会被ThreadSwitcher包装,

1
block.createCoroutine(continuation).resume(Unit)

createCoroutine返回的实际是ThreadSwitcher返回的Continuation,所以当我们执行resume启动协程时,会先切换到主线程执行。

紧接着,我们来实现async

1
2
3
4
5
6
7
suspend inline fun <T> async(crossinline block: () -> T): T
= suspendCoroutine {
//dispatcher是一个对线程池的封装,将任务分发到子线程中
dispatcher.dispatch {
it.resume(block())
}
}

使用suspend修饰的方法只可以在协程内部调用,而suspendCoroutine方法是kotlin提供的一个基础api,用于实现暂停协程。
我们接着来分析suspendCoroutine,查看他的实现:

1
2
3
4
5
6
public inline suspend fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T =
suspendCoroutineOrReturn { c: Continuation<T> ->
val safe = SafeContinuation(c)
block(safe)
safe.getResult()
}

可以看到这个方法接收的block是带Continuation参数的
真正实现功能的是suspendCoroutineOrReturn,当我们继续跟进时,发现:

1
2
public inline suspend fun <T> suspendCoroutineOrReturn(crossinline block: (Continuation<T>) -> Any?): T =
throw NotImplementedError("Implementation is intrinsic")

what!直接抛出异常了???
这是因为这是一个特殊的函数,需要编译器特殊处理,他需要将当前协程内的_facade属性,包装成SafeContinuation,再作为我们传入的block的参数,而且这个_facade是经过ContinuationInterceptor处理过的,也就是说当我们调用resume恢复线程时,会先切换到主线程。
为了验证上面的分析,我们查看async编译之后的字节码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//可以看到编译之后,我们的async多了一个Continuation类型的参数
private final static async(Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/experimental/Continuation;)Ljava/lang/Object;
L0
LINENUMBER 70 L0
NOP
L1
LINENUMBER 77 L1
ICONST_0
INVOKESTATIC kotlin/jvm/internal/InlineMarker.mark (I)V
ALOAD 1 //将第二个参数,也就是Continuation入栈
//调用CoroutineIntrinsics.normalizeContinuation
INVOKESTATIC kotlin/coroutines/experimental/jvm/internal/CoroutineIntrinsics.normalizeContinuation (Lkotlin/coroutines/experimental/Continuation;)Lkotlin/coroutines/experimental/Continuation;
//将返回值存到slot3
ASTORE 3
L2
LINENUMBER 78 L2
//new 一个SafeContinuation
NEW kotlin/coroutines/experimental/SafeContinuation
DUP
//将刚刚normalizeContinuation返回的continuation传入SafeContinuation的构造函数
ALOAD 3
INVOKESPECIAL kotlin/coroutines/experimental/SafeContinuation.<init> (Lkotlin/coroutines/experimental/Continuation;)V
ASTORE 4
L3
...

我们可以看到,编译之后的字节码已经没有了suspendCoroutinesuspendCoroutineOrReturn的身影,因为这两个函数都是inline函数。
我们接着来看CoroutineIntrinsics.normalizeContinuation的实现:

1
2
fun <T> normalizeContinuation(continuation: Continuation<T>): Continuation<T> =
(continuation as? CoroutineImpl)?.facade ?: continuation

还记得我们刚刚分析过facade这个属性吗?他是对_facade的代理,这个函数返回的是经过拦截器处理过的Continuation
根据刚刚的字节码,我们可以发现suspend类型的函数,都会隐式额外接受一个当前协程的引用,但是又不能在函数中直接访问。

最后,还有两个上文出现过的线程切换处理类,postmandispatcher,使用的是单例模式:

1
2
3
4
5
6
7
8
9
10
11
12
object postman : Handler(Looper.getMainLooper()) {
override fun handleMessage(msg: Message?) {
msg?.callback?.run()
}
}

object dispatcher {
val mCachedThreads = Executors.newCachedThreadPool()
inline fun dispatch(noinline block: () -> Unit) {
mCachedThreads.execute(block)
}
}

到此,我们实现了一个简易的异步调用库!