// <https://github.com/Kotlin/kotlinx.coroutines/blob/master/kotlinx-coroutines-core/jvm/test/guide/example-basic-01.kt>
fun main() = runBlocking { // this: CoroutineScope
    launch { // launch a new coroutine and continue
        delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
        println("World!") // print after delay
    }
    println("Hello") // main coroutine continues while a previous one is delayed
}

launch - 启动协程

从 kotlin coroutines 的 Hello World! 看起

final class Example_basic_01Kt$main$1 extends SuspendLambda implements Function2 {
   int label;
   // $FF: synthetic field
   private Object L$0;

   Example_basic_01Kt$main$1(Continuation $completion) {
      super(2, $completion);
   }

   @Nullable
   public final Object invokeSuspend(@NotNull Object var1) {
      Object var5 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
      switch(this.label) {
      case 0:
         ResultKt.throwOnFailure(var1);
         CoroutineScope $this$runBlocking = (CoroutineScope)this.L$0;
         BuildersKt.launch$default($this$runBlocking, (CoroutineContext)null, (CoroutineStart)null, (Function2)(new 1((Continuation)null)), 3, (Object)null);
         String var3 = "Hello";
         boolean var4 = false;
         System.out.println(var3);
         return Unit.INSTANCE;
      default:
         throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
      }
   }

   @NotNull
   public final Continuation create(@Nullable Object value, @NotNull Continuation $completion) {
      Example_basic_01Kt$main$1 var3 = new Example_basic_01Kt$main$1($completion);
      var3.L$0 = value;
      return (Continuation)var3;
   }

   @Nullable
   public final Object invoke(@NotNull CoroutineScope p1, @Nullable Continuation p2) {
      return ((Example_basic_01Kt$main$1)this.create(p1, p2)).invokeSuspend(Unit.INSTANCE);
   }
}

final class Example_basic_01Kt$main$1$1 extends SuspendLambda implements Function2 {
   int label;

   Example_basic_01Kt$main$1$1(Continuation $completion) {
      super(2, $completion);
   }

   @Nullable
   public final Object invokeSuspend(@NotNull Object $result) {
      Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
      switch(this.label) {
      case 0:
         ResultKt.throwOnFailure($result);
         Continuation var10001 = (Continuation)this;
         this.label = 1;
         if (DelayKt.delay(1000L, var10001) == var4) {
            return var4;
         }
         break;
      case 1:
         ResultKt.throwOnFailure($result);
         break;
      default:
         throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
      }

      String var2 = "World!";
      boolean var3 = false;
      System.out.println(var2);
      return Unit.INSTANCE;
   }

   @NotNull
   public final Continuation create(@Nullable Object value, @NotNull Continuation $completion) {
      return (Continuation)(new Example_basic_01Kt$main$1$1($completion));
   }

   @Nullable
   public final Object invoke(@NotNull CoroutineScope p1, @Nullable Continuation p2) {
      return ((Example_basic_01Kt$main$1$1)this.create(p1, p2)).invokeSuspend(Unit.INSTANCE);
   }
}

需要先了解的是 launch 的参数 block: suspend CoroutineScope.() -> Unit 被编译为继承自 SuspendLamdaFunction2<CoroutineScope, Continuation>,如下面的代码所示(decompile by bytecode-viewer

SuspendLambda 的继承关系如下:SuspendLambda -> ContinuationImpl -> BaseContinuationImpl -> Continuation,所以 block 在库代码里一般称为 continuation

CoroutineScope.launch                              // return Job (实际上是 StandaloneCoroutine/LazyStandaloneCoroutine)
AbstractCoroutine.start                            // 创建 StandaloneCoroutine/LazyStandaloneCoroutine
CoroutineStart.invoke(block, receiver, completion) // CoroutineStart.DEFAULT
startCoroutineCancellable(receiver, completion, onCancellation)
    createCoroutineUnintercepted       // 此方法定义在 kotlin-stdlib 包的 /kotlin/coroutines/intrinsics/IntrinsicsJvm.kt 文件里
        block.create(value = null, completion = coroutine) // 如上面反编译出来的代码所示,重新创建一个 block 的实例
    ContinuationImpl.intercepted       // 包装为 DispatchedContinuation(dispatcher 是 BlockingEventLoop)
        BlockingEventLoop.interceptContinuation
    Continuation.resumeCancellableWith // 将 block 放入任务队列
        DispatchedContinuation.resumeCancellableWith
        BlockingEventLoop.dispatch
            BlockingEventLoop.enqueue

class EventLoopImplBase {
    public fun enqueue(task: Runnable) {
        if (enqueueImpl(task)) {
            unpark()
        } else {
            DefaultExecutor.enqueue(task)
        }
    }

   //_queue 是 Atomic<Any?>,当任务队列里只有一个任务时 _queue 持有此任务的引用,当任务队列里有多个任务时,_queue 是 Queue<Runnable>
    private fun enqueueImpl(task: Runnable): Boolean {
        _queue.loop { queue ->
            if (isCompleted) return false // fail fast if already completed, may still add, but queues will close
            when (queue) {
                null -> if (_queue.compareAndSet(null, task)) return true
                is Queue<*> -> {
                    when ((queue as Queue<Runnable>).addLast(task)) {
                        Queue.ADD_SUCCESS -> return true
                        Queue.ADD_CLOSED -> return false
                        Queue.ADD_FROZEN -> _queue.compareAndSet(queue, queue.next())
                    }
                }
                else -> when {
                    queue === CLOSED_EMPTY -> return false
                    else -> {
                        // update to full-blown queue to add one more
                        val newQueue = Queue<Runnable>(Queue.INITIAL_CAPACITY, singleConsumer = true)
                        newQueue.addLast(queue as Runnable)
                        newQueue.addLast(task)
                        if (_queue.compareAndSet(queue, newQueue)) return true
                    }
                }
            }
        }
    }
}

// Infinite loop that reads this atomic variable and performs the specified action on its value.
public inline fun <T> AtomicRef<T>.loop(action: (T) -> Unit): Nothing {
    while (true) {
        action(value)
    }
}

然后一路跟踪下去看看 launch 做了什么

public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
    contract {
        callsInPlace(block, InvocationKind.EXACTLY_ONCE)
    }
    val currentThread = Thread.currentThread()
    val contextInterceptor = context[ContinuationInterceptor]
    val eventLoop: EventLoop?
    val newContext: CoroutineContext
    if (contextInterceptor == null) {
        // create or use private event loop if no dispatcher is specified
        eventLoop = ThreadLocalEventLoop.eventLoop
        newContext = GlobalScope.newCoroutineContext(context + eventLoop)
    } else {
        // See if context's interceptor is an event loop that we shall use (to support TestContext)
        // or take an existing thread-local event loop if present to avoid blocking it (but don't create one)
        eventLoop = (contextInterceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() }
            ?: ThreadLocalEventLoop.currentOrNull()
        newContext = GlobalScope.newCoroutineContext(context)
    }
    val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop)
    coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
    return coroutine.joinBlocking()
}

private class BlockingCoroutine {
    fun joinBlocking(): T {
        registerTimeLoopThread()
        try {
            eventLoop?.incrementUseCount()
            try {
                while (true) {
                    @Suppress("DEPRECATION")
                    if (Thread.interrupted()) throw InterruptedException().also { cancelCoroutine(it) }
                    val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE
                    // note: process next even may loose unpark flag, so check if completed before parking
                    if (isCompleted) break
                    parkNanos(this, parkNanos)
                }
            } finally { // paranoia
                eventLoop?.decrementUseCount()
            }
        } finally { // paranoia
            unregisterTimeLoopThread()
        }
        // now return result
        val state = this.state.unboxState()
        (state as? CompletedExceptionally)?.let { throw it.cause }
        return state as T
    }
}

至此知道了 block 是被放到了任务队列里,那是谁在执行任务队列里的任务呢?这就不得不说起 runBlocking

runBlocking 通过 BlockingCoroutine.joinBlocking 不断地执行任务队列里的任务(EventLoopBase.processNextEvent)直到队列为空,因此它是 blocking method;第一个加入到任务队列的是 runBlocking block,所以 Hello World! 示例里会先输出 Hello 然后执行第二个 coroutine 输出 World!

总结

launch 把 block 放入任务队列等待执行,类似于 ExecutorService.submitHandler.post,同时说明 协程 本质上就是对任务的调度,底层是线程/线程池 + 任务队列

public interface CoroutineContext {
    /**
     * Returns the element with the given [key] from this context or `null`.
     */
    public operator fun <E : Element> get(key: Key<E>): E?
}

public interface ContinuationInterceptor : CoroutineContext.Element {
    /**
     * The key that defines *the* context interceptor.
     */
    companion object Key : CoroutineContext.Key<ContinuationInterceptor>
}

Dispatcher - 任务调度

首先要了解下 CoroutineContext 这个概念,跟 Android 上 Context 的意义是一样的,就是代表了一系列 coroutine API 的 上下文,本质上是一个 Map,通过 CoroutineContext[key] = value 存取

其中有一个非常重要的组件:CoroutineContext[ContinuationInterceptor],负责分发/调度 coroutine,有 BlockingEventLoop, Dispatchers.Default, Dispatchers.IO 等等

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
    val combined = coroutineContext + context
    val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
    return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
        debug + Dispatchers.Default else debug
}

通过 CoroutineScope.launch 启动 coroutine 时,新创建的 coroutine 会继承 scpoe context(launch 的参数 context 会覆盖相同 key 的 scope context element),而且当 context 不含 interceptor 时主动添加 Dispatchers.Default 作为 Dispatcher,也就是说 Dispatcher/CoroutineContext[ContinuationInterceptor] 作为任务调度器是一个 必不可少 的 coroutine 组件

CoroutineScope.launch
AbstractCoroutine.start
CoroutineStart.invoke(block, receiver, completion)
startCoroutineCancellable(receiver, completion, onCancellation)
    createCoroutineUnintercepted
        block.create(value = null, completion = coroutine)
    ContinuationImpl.intercepted
        ContinuationInterceptor.interceptContinuation
    Continuation.resumeCancellableWith
        DispatchedContinuation.resumeCancellableWith
        CoroutineDispatcher.dispatch

// kotlin-stdlib - kotlin/coroutines/intrinsics/IntrinsicsJvm.kt
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
    receiver: R,
    completion: Continuation<T>
): Continuation<Unit> {
    val probeCompletion = probeCoroutineCreated(completion)
    return if (this is BaseContinuationImpl) //block -> SuspendLambda -> ContinuationImpl -> BaseContinuationImpl
        create(receiver, probeCompletion)
    else {
        createCoroutineFromSuspendFunction(probeCompletion) {
            (this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
        }
    }
}

internal abstract class ContinuationImpl {
    public fun intercepted(): Continuation<Any?> =
        intercepted
            ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also { intercepted = it }
}

class CoroutineDispatcher {
   public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        DispatchedContinuation(this, continuation)
}

现在回顾下 launch

上一章节分析了 coroutine 本质上就是任务队列里的任务,并简单提了下 BlockingEventLoop 这个调度器,这一章节分析各种各样的调度器

public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
    contract {
        callsInPlace(block, InvocationKind.EXACTLY_ONCE)
    }
    val currentThread = Thread.currentThread()
    val contextInterceptor = context[ContinuationInterceptor]
    val eventLoop: EventLoop?
    val newContext: CoroutineContext
    if (contextInterceptor == null) {
        // create or use private event loop if no dispatcher is specified
        eventLoop = ThreadLocalEventLoop.eventLoop
        newContext = GlobalScope.newCoroutineContext(context + eventLoop)
    } else {
        // See if context's interceptor is an event loop that we shall use (to support TestContext)
        // or take an existing thread-local event loop if present to avoid blocking it (but don't create one)
        eventLoop = (contextInterceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() }
            ?: ThreadLocalEventLoop.currentOrNull()
        newContext = GlobalScope.newCoroutineContext(context)
    }
    val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop)
    coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
    return coroutine.joinBlocking()
}

internal object ThreadLocalEventLoop {
    private val ref = CommonThreadLocal<EventLoop?>()

    internal val eventLoop: EventLoop
        get() = ref.get() ?: createEventLoop().also { ref.set(it) }
}

// <https://github.com/Kotlin/kotlinx.coroutines/blob/master/kotlinx-coroutines-core/jvm/src/EventLoop.kt>
internal class BlockingEventLoop(
    override val thread: Thread
) : EventLoopImplBase()

internal actual fun createEventLoop(): EventLoop = BlockingEventLoop(Thread.currentThread())

class EventLoopImplBase {
    private val _queue = atomic<Any?>(null)

    public final override fun dispatch(context: CoroutineContext, block: Runnable) = enqueue(block)

    public fun enqueue(task: Runnable) {
        if (enqueueImpl(task)) {
            // todo: we should unpark only when this delayed task became first in the queue
            unpark()
        } else {
            DefaultExecutor.enqueue(task)
        }
    }

    private fun enqueueImpl(task: Runnable): Boolean {
        _queue.loop { queue ->
            if (isCompleted) return false // fail fast if already completed, may still add, but queues will close
            when (queue) {
                null -> if (_queue.compareAndSet(null, task)) return true
                is Queue<*> -> {
                    when ((queue as Queue<Runnable>).addLast(task)) {
                        Queue.ADD_SUCCESS -> return true
                        Queue.ADD_CLOSED -> return false
                        Queue.ADD_FROZEN -> _queue.compareAndSet(queue, queue.next())
                    }
                }
                else -> when {
                    queue === CLOSED_EMPTY -> return false
                    else -> {
                        // update to full-blown queue to add one more
                        val newQueue = Queue<Runnable>(Queue.INITIAL_CAPACITY, singleConsumer = true)
                        newQueue.addLast(queue as Runnable)
                        newQueue.addLast(task)
                        if (_queue.compareAndSet(queue, newQueue)) return true
                    }
                }
            }
        }
    }
}

BlockingEventLoop / runBlocking

runBlocking 会往 context 添加 BlockingEventLoop

fun main() = runBlocking {
    val scope = CoroutineScope(EmptyCoroutineContext)
    val job = scope.launch {
        println("Thread name: ${Thread.currentThread().name}")
    }
    job.join()
}

Dispatchers.Default

上一章节说过 Coroutine.launch 开启协程时,如果 context 不包含 dispatcher,会自动添加默认的 dispatcher: Dispatchers.Default,如下示例代码

public actual object Dispatchers {
    /**
     * The default [CoroutineDispatcher] that is used by all standard builders like
     * [launch][CoroutineScope.launch], [async][CoroutineScope.async], etc
     * if no dispatcher nor any other [ContinuationInterceptor] is specified in their context.
     *
     * It is backed by a shared pool of threads on JVM. By default, the maximal level of parallelism used
     * by this dispatcher is equal to the number of CPU cores, but is at least two.
     * Level of parallelism X guarantees that no more than X tasks can be executed in this dispatcher in parallel.
     */
    @JvmStatic
    public actual val Default: CoroutineDispatcher = createDefaultDispatcher()
}

internal actual fun createDefaultDispatcher(): CoroutineDispatcher =
    if (useCoroutinesScheduler) DefaultScheduler else CommonPool

internal object DefaultScheduler : ExperimentalCoroutineDispatcher()

public open class ExperimentalCoroutineDispatcher(
    private val corePoolSize: Int,
    private val maxPoolSize: Int,
    private val idleWorkerKeepAliveNs: Long,
    private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() {

    public constructor(
        corePoolSize: Int = CORE_POOL_SIZE,
        maxPoolSize: Int = MAX_POOL_SIZE,
        schedulerName: String = DEFAULT_SCHEDULER_NAME
    ) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS, schedulerName)

    @Deprecated(message = "Binary compatibility for Ktor 1.0-beta", level = DeprecationLevel.HIDDEN)
    public constructor(
        corePoolSize: Int = CORE_POOL_SIZE,
        maxPoolSize: Int = MAX_POOL_SIZE
    ) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS)

    override val executor: Executor
        get() = coroutineScheduler

    // This is variable for test purposes, so that we can reinitialize from clean state
    private var coroutineScheduler = createScheduler()

    override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
        try {
            coroutineScheduler.dispatch(block)
        } catch (e: RejectedExecutionException) {
            // CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved
            // for testing purposes, so we don't have to worry about cancelling the affected Job here.
            DefaultExecutor.dispatch(context, block)
        }

    private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
}

internal class CoroutineScheduler(
    @JvmField val corePoolSize: Int,
    @JvmField val maxPoolSize: Int,
    @JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
    @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable

从下面的代码可以看到 Dispatchers.Default 最终是通过 CoroutineScheduler 进行任务调度的,CoroutineScheduler 的代码量比较大这里就不深入分析了,但从它的构造函数参数 corePoolSizemaxPoolSizeidleWorkerKeepAliveNs 这些熟悉的概念可以看出它是个 线程池

public actual object Dispatchers {
    /**
     * The [CoroutineDispatcher] that is designed for offloading blocking IO tasks to a shared pool of threads.
     *
     * Additional threads in this pool are created and are shutdown on demand.
     * The number of threads used by tasks in this dispatcher is limited by the value of
     * `kotlinx.coroutines.io.parallelism`" ([IO_PARALLELISM_PROPERTY_NAME]) system property.
     * It defaults to the limit of 64 threads or the number of cores (whichever is larger).
     *
     * Moreover, the maximum configurable number of threads is capped by the
     * `kotlinx.coroutines.scheduler.max.pool.size` system property.
     * If you need a higher number of parallel threads,
     * you should use a custom dispatcher backed by your own thread pool.
     *
     * ### Implementation note
     *
     * This dispatcher shares threads with the [Default][Dispatchers.Default] dispatcher, so using
     * `withContext(Dispatchers.IO) { ... }` when already running on the [Default][Dispatchers.Default]
     * dispatcher does not lead to an actual switching to another thread &mdash; typically execution
     * continues in the same thread.
     * As a result of thread sharing, more than 64 (default parallelism) threads can be created (but not used)
     * during operations over IO dispatcher.
     */
    public val IO: CoroutineDispatcher = DefaultScheduler.IO
}

internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
    val IO: CoroutineDispatcher = LimitingDispatcher(
        this,
        systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)), // 64
        "Dispatchers.IO",
        TASK_PROBABLY_BLOCKING // 1
    )
}

private class LimitingDispatcher(
    private val dispatcher: ExperimentalCoroutineDispatcher,
    private val parallelism: Int,
    private val name: String?,
    override val taskMode: Int
) : ExecutorCoroutineDispatcher(), TaskContext, Executor {

    private val queue = ConcurrentLinkedQueue<Runnable>()
    private val inFlightTasks = atomic(0)

    override val executor: Executor
        get() = this

    override fun execute(command: Runnable) = dispatch(command, false)

    override fun dispatch(context: CoroutineContext, block: Runnable) = dispatch(block, false)

    private fun dispatch(block: Runnable, tailDispatch: Boolean) {
        var taskToSchedule = block
        while (true) {
            // Commit in-flight tasks slot
            val inFlight = inFlightTasks.incrementAndGet()

            // Fast path, if parallelism limit is not reached, dispatch task and return
            if (inFlight <= parallelism) {
                dispatcher.dispatchWithContext(taskToSchedule, this, tailDispatch)
                return
            }

            // Parallelism limit is reached, add task to the queue
            queue.add(taskToSchedule)

            /*
             * We're not actually scheduled anything, so rollback committed in-flight task slot:
             * If the amount of in-flight tasks is still above the limit, do nothing
             * If the amount of in-flight tasks is lesser than parallelism, then
             * it's a race with a thread which finished the task from the current context, we should resubmit the first task from the queue
             * to avoid starvation.
             *
             * Race example #1 (TN is N-th thread, R is current in-flight tasks number), execution is sequential:
             *
             * T1: submit task, start execution, R == 1
             * T2: commit slot for next task, R == 2
             * T1: finish T1, R == 1
             * T2: submit next task to local queue, decrement R, R == 0
             * Without retries, task from T2 will be stuck in the local queue
             */
            if (inFlightTasks.decrementAndGet() >= parallelism) {
                return
            }

            taskToSchedule = queue.poll() ?: return
        }
    }

    override fun dispatchYield(context: CoroutineContext, block: Runnable) {
        dispatch(block, tailDispatch = true)
    }

    override fun toString(): String {
        return name ?: "${super.toString()}[dispatcher = $dispatcher]"
    }

    /**
     * Tries to dispatch tasks which were blocked due to reaching parallelism limit if there is any.
     *
     * Implementation note: blocking tasks are scheduled in a fair manner (to local queue tail) to avoid
     * non-blocking continuations starvation.
     * E.g. for
     * ```
     * foo()
     * blocking()
     * bar()
     * ```
     * it's more profitable to execute bar at the end of `blocking` rather than pending blocking task
     */
    override fun afterTask() {
        var next = queue.poll()
        // If we have pending tasks in current blocking context, dispatch first
        if (next != null) {
            dispatcher.dispatchWithContext(next, this, true)
            return
        }
        inFlightTasks.decrementAndGet()

        /*
         * Re-poll again and try to submit task if it's required otherwise tasks may be stuck in the local queue.
         * Race example #2 (TN is N-th thread, R is current in-flight tasks number), execution is sequential:
         * T1: submit task, start execution, R == 1
         * T2: commit slot for next task, R == 2
         * T1: finish T1, poll queue (it's still empty), R == 2
         * T2: submit next task to the local queue, decrement R, R == 1
         * T1: decrement R, finish. R == 0
         *
         * The task from T2 is stuck is the local queue
         */
        next = queue.poll() ?: return
        dispatch(next, true)
    }
}

Dispatchers.IO

Dispatchers.IO 如注释所说的,是一个用以承载阻塞型 IO 操作的线程池,线程池是由上面说的 CoroutineScheduler 实现的,而它本身之所以叫做 LimitingDispatcher 是因为它限制了并发任务数为 64:

DefaultDelay

Dispatchers.DefaultDispatcher.IO 并没有实现 Delay 接口,如上面反编译后的 getDelay 所示,是由 DefaultDelay 来实现的