updated: 2023/07/04

  1. 冷流和热流
// 冷流:当出现订阅者/观察者时,才开始计算结果,并且每个订阅者收到的都是独立的、互不相关的结果
val colors = flow {
    emit("Red")
    delay(100)
    emit("Green")
    delay(100)
    emit("Blue")
}
launch(context = Dispatchers.IO) {
    colors.collect { }
}
launch(context = Dispatchers.IO) {
    colors.collect { }
}
// 热流:计算结果不依赖于是否拥有订阅者/观察者,即使没有订阅者也可以发射数据
val state = MutableStateFlow("Red")
state.emit("Green")
launch(context = Dispatchers.IO) {
    state.collect { }
}
state.emit("Blue")
  1. flow、SharedFlow、MutableSharedFlow、StateFlow、MutableStateFlow 之间的区别?

可以将它们归为两类:MutableStateFlow 和 MutableSharedFlow

StateFlow 实现类是 StateFlowImpl,可以理解为一个可以被观测/订阅的值;初始化时必须提供一个默认值,value 表示当前的值,setValue、emit、tryEmit 和 compareAndSet 作用都是一样的就是修改当前值;订阅时(collect)会立即收到当前值,后续的修改也会通知订阅者

StateFlow 适合作为 UI 的状态,只关心当前值不关心历史值(渲染内容只跟当前值有关),并且需要默认值(不能渲染空白)

SharedFlow 的核心是一个缓冲区(replay + extraBufferCapacity + emitor),它包含已计算出来的结果(replay + extraBufferCapacity),以及因为存放计算结果的缓冲区的容量不足导致“阻塞”的 emitor

SharedFlow 没有当前值(value)的概念,取而代之的是 replayCache,一个大小为 replay 的集合(对比 value 是单个值),新订阅者会立即收到包含历史值的 replayCache,StateFlow 相当于 replay = 1 的 SharedFlow

val state = MutableSharedFlow<String>(
    replay = 2,
    extraBufferCapacity = 1,
    onBufferOverflow = BufferOverflow.SUSPEND)
state.emit("Red")
state.emit("Green")
state.emit("Grey")
launch(context = Dispatchers.IO) {
    state.collect {
        Log.i("cyrus", "#1 $it")
    }
}
launch(context = Dispatchers.IO) {
    state.collect {
        Log.i("cyrus", "#2 $it")
    }
}

// #1 Green
// #1 Grey
// #2 Green
// #2 Grey

存放结果的缓冲区是有容量限制的:replay + extraBufferCapacity,当结果缓冲区满了,emitor 会“阻塞”(BufferOverflow.SUSPEND,按顺序 append 至缓冲区)直到缓冲区里最旧的元素被订阅者消费,空出一个新位置,从而恢复阻塞的 emitor 并用其发射的值替换 emitor,相当于将新值 append 至结果缓冲区

onBufferOverflow 是溢出策略,可选不阻塞而是替换缓冲区中最旧的(DROP_OLDEST)、或者最新的(DROP_LATEST)

val state = MutableSharedFlow<String>(
    replay = 0,
    extraBufferCapacity = 1,
    onBufferOverflow = BufferOverflow.SUSPEND)
val emit: suspend (String) -> Unit = {
    Log.i("cyrus", "#0 start $it")
    state.emit(it)
    Log.i("cyrus", "#0 end $it")
}
launch(context = Executors.newSingleThreadExecutor().asCoroutineDispatcher()) { // 注意得是单线程
    state.collect {
        Log.i("cyrus", "#1 $it")
        delay(1000)
    }
}
delay(1000)
emit("Red")
emit("Green")
emit("Grey")
emit("Blue")
emit("White")
emit("Pink")
emit("Yellow")

43.956 #0 start Red    // Red 被立即消费掉
43.957 #0 end Red
43.957 #0 start Green  // 此时消费者被 delay 了,所以 Green 放到缓冲区
43.958 #0 end Green
43.958 #0 start Grey   // 缓冲区里是 Green,Grey 没有位置,发射被阻塞
43.961 #1 Red          // Red 被立即消费
44.963 #1 Green        // 1s 后消费 Green
44.966 #0 end Grey     // 缓冲区的 Green 被消费,空出位置放 Grey
44.966 #0 start Blue   // 但 Blue 又因缓冲区满了被阻塞
45.965 #1 Grey         // Green 后 1s 消费缓冲区的 Grey
45.966 #0 end Blue     // 把 Blue 放入缓冲区
46.967 #1 Blue

SharedFlow 适合用作事件总线 EventBus,无需历史值,缓冲区保证事件按顺序被消费且无遗漏:replay = 0,extraBufferCapacity = Int.MAX_VALUE

流量、内存、CPU 等资源的采样器,只保留最近 N 个采样用以计算平均值:replay = N,extraBufferCapacity = 0,onBufferOverflow = BufferOverflow.DROP_OLDEST

replay = 1,extraBufferCapacity = 0,onBufferOverflow = BufferOverflow.SUSPEND 此时相当于 StateFlow