// <https://github.com/Kotlin/kotlinx.coroutines/blob/master/kotlinx-coroutines-core/jvm/test/guide/example-basic-01.kt>
fun main() = runBlocking { // this: CoroutineScope
launch { // launch a new coroutine and continue
delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
println("World!") // print after delay
}
println("Hello") // main coroutine continues while a previous one is delayed
}
Let's start from the Kotlin coroutines Hello World! shown above and see what it compiles down to.
final class Example_basic_01Kt$main$1 extends SuspendLambda implements Function2 {
int label;
// $FF: synthetic field
private Object L$0;
Example_basic_01Kt$main$1(Continuation $completion) {
super(2, $completion);
}
@Nullable
public final Object invokeSuspend(@NotNull Object var1) {
Object var5 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch(this.label) {
case 0:
ResultKt.throwOnFailure(var1);
CoroutineScope $this$runBlocking = (CoroutineScope)this.L$0;
BuildersKt.launch$default($this$runBlocking, (CoroutineContext)null, (CoroutineStart)null, (Function2)(new 1((Continuation)null)), 3, (Object)null); // "new 1(...)" is the decompiler's rendering of the nested class Example_basic_01Kt$main$1$1 below (the launch block)
String var3 = "Hello";
boolean var4 = false;
System.out.println(var3);
return Unit.INSTANCE;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
}
@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation $completion) {
Example_basic_01Kt$main$1 var3 = new Example_basic_01Kt$main$1($completion);
var3.L$0 = value;
return (Continuation)var3;
}
@Nullable
public final Object invoke(@NotNull CoroutineScope p1, @Nullable Continuation p2) {
return ((Example_basic_01Kt$main$1)this.create(p1, p2)).invokeSuspend(Unit.INSTANCE);
}
}
final class Example_basic_01Kt$main$1$1 extends SuspendLambda implements Function2 {
int label;
Example_basic_01Kt$main$1$1(Continuation $completion) {
super(2, $completion);
}
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch(this.label) {
case 0:
ResultKt.throwOnFailure($result);
Continuation var10001 = (Continuation)this;
this.label = 1;
if (DelayKt.delay(1000L, var10001) == var4) {
return var4;
}
break;
case 1:
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
String var2 = "World!";
boolean var3 = false;
System.out.println(var2);
return Unit.INSTANCE;
}
@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation $completion) {
return (Continuation)(new Example_basic_01Kt$main$1$1($completion));
}
@Nullable
public final Object invoke(@NotNull CoroutineScope p1, @Nullable Continuation p2) {
return ((Example_basic_01Kt$main$1$1)this.create(p1, p2)).invokeSuspend(Unit.INSTANCE);
}
}
The first thing to understand is that launch's parameter block: suspend CoroutineScope.() -> Unit is compiled into a class extending SuspendLambda and implementing Function2 (taking the CoroutineScope receiver and a Continuation), as the code above shows (decompiled with bytecode-viewer).
SuspendLambda's inheritance chain is SuspendLambda -> ContinuationImpl -> BaseContinuationImpl -> Continuation, which is why the block is usually referred to as a continuation in the library code.
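If you want to see this compilation strategy for yourself without a decompiler, a tiny reflection sketch (my own example, not library code) walks the class hierarchy of a suspend lambda and prints the chain listed above:
fun main() {
    val block: suspend () -> Unit = { }
    var cls: Class<*>? = block.javaClass
    while (cls != null) {
        // prints something like: <anonymous lambda class>, SuspendLambda, ContinuationImpl, BaseContinuationImpl, Object;
        // the Continuation interface shows up among BaseContinuationImpl's interfaces
        println("${cls.name} implements [${cls.interfaces.joinToString { it.simpleName }}]")
        cls = cls.superclass
    }
}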
CoroutineScope.launch // returns a Job (actually a StandaloneCoroutine/LazyStandaloneCoroutine)
AbstractCoroutine.start // starts the newly created StandaloneCoroutine/LazyStandaloneCoroutine
CoroutineStart.invoke(block, receiver, completion) // CoroutineStart.DEFAULT
startCoroutineCancellable(receiver, completion, onCancellation)
createCoroutineUnintercepted // defined in kotlin-stdlib, kotlin/coroutines/intrinsics/IntrinsicsJvm.kt
block.create(value = receiver, completion = coroutine) // as the decompiled create above shows, this creates a fresh instance of the block
ContinuationImpl.intercepted // wraps the continuation into a DispatchedContinuation (the dispatcher here is BlockingEventLoop)
BlockingEventLoop.interceptContinuation
Continuation.resumeCancellableWith // puts the block into the task queue
DispatchedContinuation.resumeCancellableWith
BlockingEventLoop.dispatch
BlockingEventLoop.enqueue
class EventLoopImplBase {
public fun enqueue(task: Runnable) {
if (enqueueImpl(task)) {
unpark()
} else {
DefaultExecutor.enqueue(task)
}
}
// _queue is an atomic Any? reference: while there is a single task it points at that task directly; once there are multiple tasks it points at a Queue<Runnable>
private fun enqueueImpl(task: Runnable): Boolean {
_queue.loop { queue ->
if (isCompleted) return false // fail fast if already completed, may still add, but queues will close
when (queue) {
null -> if (_queue.compareAndSet(null, task)) return true
is Queue<*> -> {
when ((queue as Queue<Runnable>).addLast(task)) {
Queue.ADD_SUCCESS -> return true
Queue.ADD_CLOSED -> return false
Queue.ADD_FROZEN -> _queue.compareAndSet(queue, queue.next())
}
}
else -> when {
queue === CLOSED_EMPTY -> return false
else -> {
// update to full-blown queue to add one more
val newQueue = Queue<Runnable>(Queue.INITIAL_CAPACITY, singleConsumer = true)
newQueue.addLast(queue as Runnable)
newQueue.addLast(task)
if (_queue.compareAndSet(queue, newQueue)) return true
}
}
}
}
}
}
// Infinite loop that reads this atomic variable and performs the specified action on its value.
public inline fun <T> AtomicRef<T>.loop(action: (T) -> Unit): Nothing {
while (true) {
action(value)
}
}
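To make the enqueueImpl pattern above concrete, here is a minimal sketch (plain java.util.concurrent, not kotlinx.atomicfu or kotlinx.coroutines code; enqueueSingleSlot is a made-up name) of the same CAS retry loop and the null -> single task -> queue upgrade:
import java.util.concurrent.atomic.AtomicReference

inline fun <T> AtomicReference<T>.loop(action: (T) -> Unit): Nothing {
    while (true) action(get())                                        // same contract as atomicfu's AtomicRef.loop
}

fun enqueueSingleSlot(slot: AtomicReference<Any?>, task: Runnable): Boolean {
    slot.loop { cur ->
        when (cur) {
            null -> if (slot.compareAndSet(null, task)) return true   // first task: store it directly
            is Runnable -> {
                // a second task arrives: upgrade the slot to a real queue holding both tasks
                val queue = ArrayDeque<Runnable>().apply { add(cur); add(task) }
                if (slot.compareAndSet(cur, queue)) return true
            }
            else -> return false   // already a queue; the real code does a lock-free addLast here instead
        }
    }
}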
The call chain above traces, step by step, what launch does: the block ultimately gets enqueued into the event loop's task queue.
public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
val currentThread = Thread.currentThread()
val contextInterceptor = context[ContinuationInterceptor]
val eventLoop: EventLoop?
val newContext: CoroutineContext
if (contextInterceptor == null) {
// create or use private event loop if no dispatcher is specified
eventLoop = ThreadLocalEventLoop.eventLoop
newContext = GlobalScope.newCoroutineContext(context + eventLoop)
} else {
// See if context's interceptor is an event loop that we shall use (to support TestContext)
// or take an existing thread-local event loop if present to avoid blocking it (but don't create one)
eventLoop = (contextInterceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() }
?: ThreadLocalEventLoop.currentOrNull()
newContext = GlobalScope.newCoroutineContext(context)
}
val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop)
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
return coroutine.joinBlocking()
}
private class BlockingCoroutine<T> {
fun joinBlocking(): T {
registerTimeLoopThread()
try {
eventLoop?.incrementUseCount()
try {
while (true) {
@Suppress("DEPRECATION")
if (Thread.interrupted()) throw InterruptedException().also { cancelCoroutine(it) }
val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE
// note: processing the next event may lose the unpark flag, so check if completed before parking
if (isCompleted) break
parkNanos(this, parkNanos)
}
} finally { // paranoia
eventLoop?.decrementUseCount()
}
} finally { // paranoia
unregisterTimeLoopThread()
}
// now return result
val state = this.state.unboxState()
(state as? CompletedExceptionally)?.let { throw it.cause }
return state as T
}
}
By now we know the block ends up in a task queue, but who actually executes the tasks in that queue? This is where runBlocking, shown above, comes in.
runBlocking keeps executing tasks from the task queue via BlockingCoroutine.joinBlocking (EventLoopImplBase.processNextEvent) until the coroutine completes, which is why it is a blocking method. The first task added to the queue is the runBlocking block itself, so the Hello World example prints Hello first and only then runs the second coroutine, which prints World!.
launch simply puts the block into the task queue to await execution, much like ExecutorService.submit or Handler.post. In other words, a coroutine is essentially a scheduled task, and the machinery underneath is a thread (or thread pool) plus a task queue.
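As a rough analogy only (my own sketch, not how kotlinx.coroutines is implemented), the Hello World example behaves much like handing a task to a queue and then draining that queue on the current thread:
import java.util.concurrent.ArrayBlockingQueue

fun main() {
    val queue = ArrayBlockingQueue<Runnable>(16)
    queue.put(Runnable {                                  // "launch": enqueue the task
        Thread.sleep(1000)                                // a real coroutine suspends here instead of blocking
        println("World!")
    })
    println("Hello")                                      // the "main coroutine" continues
    while (queue.isNotEmpty()) queue.take().run()         // "joinBlocking": drain the queue on this thread
}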
public interface CoroutineContext {
/**
* Returns the element with the given [key] from this context or `null`.
*/
public operator fun <E : Element> get(key: Key<E>): E?
}
public interface ContinuationInterceptor : CoroutineContext.Element {
/**
* The key that defines *the* context interceptor.
*/
companion object Key : CoroutineContext.Key<ContinuationInterceptor>
}
First we need to understand CoroutineContext. Much like Context on Android, it represents the context a set of coroutine APIs run against. It is essentially an immutable map of elements indexed by a Key: elements are read with context[Key] and combined (or overridden) with the + operator.
One especially important element is CoroutineContext[ContinuationInterceptor], which is responsible for dispatching/scheduling coroutines; implementations include BlockingEventLoop, Dispatchers.Default, Dispatchers.IO, and so on.
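A small runnable demo of this (my own example; the comments show the values I would expect based on the sources above, exact toString output may vary by version):
import kotlinx.coroutines.*
import kotlin.coroutines.ContinuationInterceptor

fun main() = runBlocking(CoroutineName("outer")) {
    println(coroutineContext[CoroutineName])              // CoroutineName(outer)
    println(coroutineContext[ContinuationInterceptor])    // the BlockingEventLoop installed by runBlocking
    println(coroutineContext[Job])                        // the BlockingCoroutine itself

    val ctx = coroutineContext + CoroutineName("inner") + Dispatchers.Default
    println(ctx[CoroutineName])                           // CoroutineName(inner): same key, right-hand side wins
    println(ctx[ContinuationInterceptor])                 // Dispatchers.Default
}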
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
val combined = coroutineContext + context
val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
debug + Dispatchers.Default else debug
}
When a coroutine is started with CoroutineScope.launch, the new coroutine inherits the scope context (elements passed in launch's context parameter override scope elements with the same key), and if the resulting context contains no interceptor, Dispatchers.Default is added as the dispatcher. In other words, the Dispatcher (CoroutineContext[ContinuationInterceptor]) is a mandatory coroutine component: it is the task scheduler.
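The following demo (my own; the comments show the output I would expect) makes those rules visible: the first coroutine inherits the scope's CoroutineName and gets Dispatchers.Default filled in, while the second one overrides both via launch's context parameter:
import kotlinx.coroutines.*
import kotlin.coroutines.ContinuationInterceptor

fun main() = runBlocking {
    val scope = CoroutineScope(CoroutineName("scope-name"))          // no dispatcher in the scope context
    scope.launch {
        println(coroutineContext[CoroutineName])                     // CoroutineName(scope-name), inherited from the scope
        println(coroutineContext[ContinuationInterceptor])           // Dispatchers.Default, added by newCoroutineContext
    }.join()
    scope.launch(CoroutineName("child") + Dispatchers.IO) {
        println(coroutineContext[CoroutineName])                     // CoroutineName(child): launch's context overrides the scope's
        println(coroutineContext[ContinuationInterceptor])           // Dispatchers.IO
    }.join()
}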
CoroutineScope.launch
AbstractCoroutine.start
CoroutineStart.invoke(block, receiver, completion)
startCoroutineCancellable(receiver, completion, onCancellation)
createCoroutineUnintercepted
block.create(value = receiver, completion = coroutine)
ContinuationImpl.intercepted
ContinuationInterceptor.interceptContinuation
Continuation.resumeCancellableWith
DispatchedContinuation.resumeCancellableWith
CoroutineDispatcher.dispatch
// kotlin-stdlib - kotlin/coroutines/intrinsics/IntrinsicsJvm.kt
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
receiver: R,
completion: Continuation<T>
): Continuation<Unit> {
val probeCompletion = probeCoroutineCreated(completion)
return if (this is BaseContinuationImpl) //block -> SuspendLambda -> ContinuationImpl -> BaseContinuationImpl
create(receiver, probeCompletion)
else {
createCoroutineFromSuspendFunction(probeCompletion) {
(this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
}
}
}
internal abstract class ContinuationImpl {
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
}
class CoroutineDispatcher {
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
}
Now let's review what launch does:
- createCoroutineUnintercepted: creates a new instance of the block (a Continuation) and hands it the completion, from which it gets its context
- ContinuationImpl.intercepted: pulls the Dispatcher out of the context, and the Dispatcher wraps the Continuation into a DispatchedContinuation (Dispatcher + Continuation), so we now have both the task and its scheduler
- Continuation.resumeCancellableWith: hands the Continuation over to the Dispatcher for scheduling

The previous chapter showed that a coroutine is essentially a task in a task queue and briefly mentioned the BlockingEventLoop dispatcher; this chapter looks at the various dispatchers.
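Before diving into the dispatchers, note that the create -> intercept -> resume sequence can also be reproduced with the public low-level API in kotlin.coroutines (a simplified sketch: createCoroutine performs the create and intercept steps, resume starts execution; it lacks the cancellation support that startCoroutineCancellable adds):
import kotlin.coroutines.*

fun main() {
    val block: suspend () -> String = { "World!" }
    val completion = object : Continuation<String> {
        override val context: CoroutineContext = EmptyCoroutineContext          // no interceptor: runs in place
        override fun resumeWith(result: Result<String>) = println("completed with $result")
    }
    // create the continuation for the block, intercept it (a no-op with an empty context), then resume it to start running
    block.createCoroutine(completion).resume(Unit)                               // prints: completed with Success(World!)
}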
public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
val currentThread = Thread.currentThread()
val contextInterceptor = context[ContinuationInterceptor]
val eventLoop: EventLoop?
val newContext: CoroutineContext
if (contextInterceptor == null) {
// create or use private event loop if no dispatcher is specified
eventLoop = ThreadLocalEventLoop.eventLoop
newContext = GlobalScope.newCoroutineContext(context + eventLoop)
} else {
// See if context's interceptor is an event loop that we shall use (to support TestContext)
// or take an existing thread-local event loop if present to avoid blocking it (but don't create one)
eventLoop = (contextInterceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() }
?: ThreadLocalEventLoop.currentOrNull()
newContext = GlobalScope.newCoroutineContext(context)
}
val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop)
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
return coroutine.joinBlocking()
}
internal object ThreadLocalEventLoop {
private val ref = CommonThreadLocal<EventLoop?>()
internal val eventLoop: EventLoop
get() = ref.get() ?: createEventLoop().also { ref.set(it) }
}
// <https://github.com/Kotlin/kotlinx.coroutines/blob/master/kotlinx-coroutines-core/jvm/src/EventLoop.kt>
internal class BlockingEventLoop(
override val thread: Thread
) : EventLoopImplBase()
internal actual fun createEventLoop(): EventLoop = BlockingEventLoop(Thread.currentThread())
class EventLoopImplBase {
private val _queue = atomic<Any?>(null)
public final override fun dispatch(context: CoroutineContext, block: Runnable) = enqueue(block)
public fun enqueue(task: Runnable) {
if (enqueueImpl(task)) {
// todo: we should unpark only when this delayed task became first in the queue
unpark()
} else {
DefaultExecutor.enqueue(task)
}
}
private fun enqueueImpl(task: Runnable): Boolean {
_queue.loop { queue ->
if (isCompleted) return false // fail fast if already completed, may still add, but queues will close
when (queue) {
null -> if (_queue.compareAndSet(null, task)) return true
is Queue<*> -> {
when ((queue as Queue<Runnable>).addLast(task)) {
Queue.ADD_SUCCESS -> return true
Queue.ADD_CLOSED -> return false
Queue.ADD_FROZEN -> _queue.compareAndSet(queue, queue.next())
}
}
else -> when {
queue === CLOSED_EMPTY -> return false
else -> {
// update to full-blown queue to add one more
val newQueue = Queue<Runnable>(Queue.INITIAL_CAPACITY, singleConsumer = true)
newQueue.addLast(queue as Runnable)
newQueue.addLast(task)
if (_queue.compareAndSet(queue, newQueue)) return true
}
}
}
}
}
}
runBlocking puts a BlockingEventLoop into the context, as the code above shows. A few things worth noting about BlockingEventLoop:
- it maintains a Queue-based task queue; as an optimization, _queue references the task directly while there is only one task, and only switches to a Queue once there are multiple tasks
- it is ThreadLocal: like Looper, each thread is bound to its own BlockingEventLoop
- dispatch simply enqueues the task and then unparks the corresponding thread
fun main() = runBlocking {
val scope = CoroutineScope(EmptyCoroutineContext)
val job = scope.launch {
println("Thread name: ${Thread.currentThread().name}")
}
job.join()
}
The previous chapter mentioned that when CoroutineScope.launch starts a coroutine and the context contains no dispatcher, the default dispatcher Dispatchers.Default is added automatically; the example above demonstrates this by printing the name of the worker thread it runs on (something like DefaultDispatcher-worker-1).
public actual object Dispatchers {
/**
* The default [CoroutineDispatcher] that is used by all standard builders like
* [launch][CoroutineScope.launch], [async][CoroutineScope.async], etc
* if no dispatcher nor any other [ContinuationInterceptor] is specified in their context.
*
* It is backed by a shared pool of threads on JVM. By default, the maximal level of parallelism used
* by this dispatcher is equal to the number of CPU cores, but is at least two.
* Level of parallelism X guarantees that no more than X tasks can be executed in this dispatcher in parallel.
*/
@JvmStatic
public actual val Default: CoroutineDispatcher = createDefaultDispatcher()
}
internal actual fun createDefaultDispatcher(): CoroutineDispatcher =
if (useCoroutinesScheduler) DefaultScheduler else CommonPool
internal object DefaultScheduler : ExperimentalCoroutineDispatcher()
public open class ExperimentalCoroutineDispatcher(
private val corePoolSize: Int,
private val maxPoolSize: Int,
private val idleWorkerKeepAliveNs: Long,
private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() {
public constructor(
corePoolSize: Int = CORE_POOL_SIZE,
maxPoolSize: Int = MAX_POOL_SIZE,
schedulerName: String = DEFAULT_SCHEDULER_NAME
) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS, schedulerName)
@Deprecated(message = "Binary compatibility for Ktor 1.0-beta", level = DeprecationLevel.HIDDEN)
public constructor(
corePoolSize: Int = CORE_POOL_SIZE,
maxPoolSize: Int = MAX_POOL_SIZE
) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS)
override val executor: Executor
get() = coroutineScheduler
// This is variable for test purposes, so that we can reinitialize from clean state
private var coroutineScheduler = createScheduler()
override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
try {
coroutineScheduler.dispatch(block)
} catch (e: RejectedExecutionException) {
// CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved
// for testing purposes, so we don't have to worry about cancelling the affected Job here.
DefaultExecutor.dispatch(context, block)
}
private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
}
internal class CoroutineScheduler(
@JvmField val corePoolSize: Int,
@JvmField val maxPoolSize: Int,
@JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
@JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable
The code above shows that Dispatchers.Default ultimately schedules its tasks through CoroutineScheduler. CoroutineScheduler is a fairly large piece of code that we won't dig into here, but its constructor parameters corePoolSize, maxPoolSize and idleWorkerKeepAliveNs are familiar enough concepts to make it clear that it is a thread pool.
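You can see the same "a dispatcher is just a thread pool" shape with the public asCoroutineDispatcher API, which wraps an ordinary JDK executor (a small sketch; "my-pool" is an arbitrary name):
import kotlinx.coroutines.*
import java.util.concurrent.Executors

fun main() = runBlocking {
    val pool = Executors.newFixedThreadPool(2) { r -> Thread(r, "my-pool") }
    val dispatcher = pool.asCoroutineDispatcher()            // dispatch() forwards the task to executor.execute()
    launch(dispatcher) {
        println("running on ${Thread.currentThread().name}") // my-pool
    }.join()
    dispatcher.close()                                       // shuts down the underlying executor
}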
public actual object Dispatchers {
/**
* The [CoroutineDispatcher] that is designed for offloading blocking IO tasks to a shared pool of threads.
*
* Additional threads in this pool are created and are shutdown on demand.
* The number of threads used by tasks in this dispatcher is limited by the value of
* `kotlinx.coroutines.io.parallelism` ([IO_PARALLELISM_PROPERTY_NAME]) system property.
* It defaults to the limit of 64 threads or the number of cores (whichever is larger).
*
* Moreover, the maximum configurable number of threads is capped by the
* `kotlinx.coroutines.scheduler.max.pool.size` system property.
* If you need a higher number of parallel threads,
* you should use a custom dispatcher backed by your own thread pool.
*
* ### Implementation note
*
* This dispatcher shares threads with the [Default][Dispatchers.Default] dispatcher, so using
* `withContext(Dispatchers.IO) { ... }` when already running on the [Default][Dispatchers.Default]
* dispatcher does not lead to an actual switching to another thread — typically execution
* continues in the same thread.
* As a result of thread sharing, more than 64 (default parallelism) threads can be created (but not used)
* during operations over IO dispatcher.
*/
public val IO: CoroutineDispatcher = DefaultScheduler.IO
}
internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
val IO: CoroutineDispatcher = LimitingDispatcher(
this,
systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)), // 64
"Dispatchers.IO",
TASK_PROBABLY_BLOCKING // 1
)
}
private class LimitingDispatcher(
private val dispatcher: ExperimentalCoroutineDispatcher,
private val parallelism: Int,
private val name: String?,
override val taskMode: Int
) : ExecutorCoroutineDispatcher(), TaskContext, Executor {
private val queue = ConcurrentLinkedQueue<Runnable>()
private val inFlightTasks = atomic(0)
override val executor: Executor
get() = this
override fun execute(command: Runnable) = dispatch(command, false)
override fun dispatch(context: CoroutineContext, block: Runnable) = dispatch(block, false)
private fun dispatch(block: Runnable, tailDispatch: Boolean) {
var taskToSchedule = block
while (true) {
// Commit in-flight tasks slot
val inFlight = inFlightTasks.incrementAndGet()
// Fast path, if parallelism limit is not reached, dispatch task and return
if (inFlight <= parallelism) {
dispatcher.dispatchWithContext(taskToSchedule, this, tailDispatch)
return
}
// Parallelism limit is reached, add task to the queue
queue.add(taskToSchedule)
/*
* We're not actually scheduled anything, so rollback committed in-flight task slot:
* If the amount of in-flight tasks is still above the limit, do nothing
* If the amount of in-flight tasks is lesser than parallelism, then
* it's a race with a thread which finished the task from the current context, we should resubmit the first task from the queue
* to avoid starvation.
*
* Race example #1 (TN is N-th thread, R is current in-flight tasks number), execution is sequential:
*
* T1: submit task, start execution, R == 1
* T2: commit slot for next task, R == 2
* T1: finish T1, R == 1
* T2: submit next task to local queue, decrement R, R == 0
* Without retries, task from T2 will be stuck in the local queue
*/
if (inFlightTasks.decrementAndGet() >= parallelism) {
return
}
taskToSchedule = queue.poll() ?: return
}
}
override fun dispatchYield(context: CoroutineContext, block: Runnable) {
dispatch(block, tailDispatch = true)
}
override fun toString(): String {
return name ?: "${super.toString()}[dispatcher = $dispatcher]"
}
/**
* Tries to dispatch tasks which were blocked due to reaching parallelism limit if there is any.
*
* Implementation note: blocking tasks are scheduled in a fair manner (to local queue tail) to avoid
* non-blocking continuations starvation.
* E.g. for
* ```
* foo()
* blocking()
* bar()
* ```
* it's more profitable to execute bar at the end of `blocking` rather than pending blocking task
*/
override fun afterTask() {
var next = queue.poll()
// If we have pending tasks in current blocking context, dispatch first
if (next != null) {
dispatcher.dispatchWithContext(next, this, true)
return
}
inFlightTasks.decrementAndGet()
/*
* Re-poll again and try to submit task if it's required otherwise tasks may be stuck in the local queue.
* Race example #2 (TN is N-th thread, R is current in-flight tasks number), execution is sequential:
* T1: submit task, start execution, R == 1
* T2: commit slot for next task, R == 2
* T1: finish T1, poll queue (it's still empty), R == 2
* T2: submit next task to the local queue, decrement R, R == 1
* T1: decrement R, finish. R == 0
*
* The task from T2 is stuck is the local queue
*/
next = queue.poll() ?: return
dispatch(next, true)
}
}
As its documentation comment says, Dispatchers.IO is a dispatcher for offloading blocking IO work onto a shared pool of threads; the pool itself is the CoroutineScheduler described above. The implementation is named LimitingDispatcher because it limits the number of concurrently running tasks to parallelism (64 by default, or the number of CPU cores if that is larger):
- inFlightTasks counts the in-flight tasks; while it is below the parallelism threshold, a task is dispatched to the thread pool directly
- once the limit is reached, tasks go into queue (a ConcurrentLinkedQueue); when a dispatched task finishes and frees a slot, afterTask polls the queue and submits a waiting task to the pool
- Dispatchers.Default and Dispatchers.IO do not implement the Delay interface themselves; as the decompiled getDelay shown earlier indicates, delaying is handled by DefaultDelay
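The thread sharing described in the IO documentation above is easy to observe (a small demo of my own; exact thread names depend on the kotlinx.coroutines version):
import kotlinx.coroutines.*

fun main() = runBlocking {
    launch(Dispatchers.Default) {
        println("Default: ${Thread.currentThread().name}")   // e.g. DefaultDispatcher-worker-1
        withContext(Dispatchers.IO) {
            // Default and IO share the CoroutineScheduler pool, so this typically stays on the same worker thread
            println("IO:      ${Thread.currentThread().name}")
        }
    }.join()
}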