UP | HOME

异步流

Table of Contents

挂起函数可以异步的返回单个值,但是该如何异步返回多个计算好的值呢

  这正是 Kotlin 流(Flow)的用武之地

表示多个值

在 Kotlin 中可以使用集合来表示多个值

    比如说,可以拥有一个函数 foo(),它返回一个包含三个数字的 List, 然后使用 forEach 打印它们
fun foo(): List<Int> = listOf(1, 2, 3)

fun main() {
    foo().forEach { value -> println(value) } 
}

序列

如果使用一些消耗 CPU 资源的阻塞代码计算数字(每次计算需要 100 毫秒)那么可以使用 Sequence 来表示数字:

fun foo(): Sequence<Int> = sequence { // 序列构建器
                                      for (i in 1..3) {
                                          Thread.sleep(100) // 假装我们正在计算
                                          yield(i) // 产生下一个值
                                      }
}

fun main() {
    foo().forEach { value -> println(value) } 
}
     这段代码输出相同的数字,但在打印每个数字之前等待 100 毫秒

挂起

     然而,计算过程阻塞运行该代码的主线程

当这些值由异步代码计算时,可以使用 suspend 修饰符标记函数 foo, 这样它就可以在 不阻塞 的情况下执行其工作并将结果作为列表返回:

import kotlinx.coroutines.*                 

//sampleStart
suspend fun foo(): List<Int> {
    delay(1000) // 假装我们在这里做了一些异步的事情
    return listOf(1, 2, 3)
}

fun main() = runBlocking<Unit> {
    foo().forEach { value -> println(value) } 
}
//sampleEnd
     这段代码将会在等待一秒之后打印数字

     使用 List 结果类型,意味着只能一次返回所有值

为了表示异步计算的值流 stream ,可以使用 Flow 类型(正如同步计算值会使用 Sequence 类型):

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun foo3(): Flow<Int> = flow { // 流构建器
                               for (i in 1..3) {
                                   delay(100) // 假装我们在这里做了一些有用的事情
                                   emit(i) // 发送下一个值
                               }
}

fun main() = runBlocking {
    // 启动并发的协程以验证主线程并未阻塞
    launch {
        for (k in 1..3) {
            println("I'm not blocked $k")
            delay(100)
        }
    }
    // 收集这个流
    foo3().collect { value -> println(value) }
}
     这段代码在不阻塞主线程的情况下每等待 100 毫秒打印一个数字

     在主线程中运行一个单独的协程每 100 毫秒打印一次 “I'm not blocked” 已经经过了验证

输出如下:

I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3

注意使用 Flow 的代码与先前示例的下述区别:

  • 名为 flowFlow 类型构建器 函数
  • flow { … } 构建块 中的代码 可以挂起
  • 函数 foo () 不再 标有 suspend 修饰符
  • 流使用 emit 函数 发射
  • 流使用 collect 函数 收集
     可以在 foo 的 flow { ... } 函数体内使用 delay 代替 Thread.sleep 以观察主线程在本案例中被阻塞了

流是异步的

Flow 是一种类似于序列的异步流

flow 构建器中的代码直到流被收集的时候才运行

这在以下的示例中非常明显:

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

fun foo4(): Flow<Int> = flow {
    println("Flow started")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking {
    println("Calling foo...")
    val flow = foo4()
    println("Calling collect...")
    flow.collect { value -> println(value) }
    println("Calling collect again...")
    flow.collect { value -> println(value) }
}

输出如下:

Calling foo...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3
    返回一个流的 foo() 函数没有标记 suspend 修饰符的主要原因:通过它自己,foo() 会尽快返回且不会进行任何等待


    流在每次收集的时候启动:当再次调用 collect 时会看到“Flow started”

取消流

流采用与协程同样的协作取消。然而,流的基础设施未引入其他取消点。取消完全透明

    像往常一样,流的收集只有在当流在一个可取消的挂起函数(例如 delay)中挂起的时候取消,否则不能取消

下面的示例展示了当 withTimeoutOrNull 块中代码在运行的时候流是如何在超时的情况下取消并停止执行其代码的:

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

fun foo5(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking {
    withTimeoutOrNull(250) { // 在 250 毫秒后超时
                             foo5().collect { value -> println(value) }
    }
    println("Done")
}

在 foo() 函数中流仅发射两个数字,产生以下输出:

Emitting 1
1
Emitting 2
2
Done

流构建器

    先前示例中的 flow { ... } 构建器是最基础的一个

还有其它构建器使流的声明更简单:

  • flowOf 构建器定义了一个发射 固定值 集的流
  • 使用 .asFlow() 扩展函数,可以将各种集合与序列转换为流

因此,从流中打印从 1 到 3 的数字的示例可以写成:

import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.runBlocking

fun main() = runBlocking<Unit> {
    // 将一个整数区间转化为流
    (1..3).asFlow().collect { value -> println(value) }
}

过渡流操作符

    可以使用操作符转换流,就像使用集合与序列一样

过渡操作符 应用上游流 ,并 返回 下游流 ,基础的操作符拥有相似的名字,比如 map 与 filter

  • 这些操作符也是冷操作符,就像流一样

          这类操作符本身不是挂起函数。它运行的速度很快,返回新的转换流的定义
    
  • 流与序列的主要区别在于这些操作符中的代码 可以 调用 挂起 函数

举例来说,一个请求中的流可以使用 map 操作符映射出结果,即使执行一个长时间的请求操作也可以使用挂起函数来实现:

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking

suspend fun performRequest(request: Int): String {
    delay(1000) // 模仿长时间运行的异步工作
    return "response $request"
}

fun main() = runBlocking {
    (1..3).asFlow() // 一个请求流
        .map { request -> performRequest(request) }
        .collect { response -> println(response) }
}

它产生以下三行,每一行每秒出现一次:

response 1
response 2
response 3

转换操作符

在流转换操作符中,最通用的一种称为 transform 。它可以用来模仿简单的转换,例如 map 与 filter,以及实施更复杂的转换。 使用 transform 操作符,可以 发射 任意值任意次

     比如说,使用 transform 可以在执行长时间运行的异步请求之前发射一个字符串并跟踪这个响应
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    (1..3).asFlow() // 一个请求流
        .transform { request ->
                         emit("Making request $request")
                     emit(performRequest(request))
        }
        .collect { response -> println(response) }
}

这段代码的输出如下:

Making request 1
response 1
Making request 2
response 2
Making request 3
response 3

限长操作符

限长 过渡操作符(例如 take )在流 触及 相应 限制 的时候会将它的 执行 取消

     协程中的取消操作总是通过抛出异常来执行,这样所有的资源管理函数(如 try {...} finally {...} 块)会在取消的情况下正常运行
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.runBlocking

fun numbers(): Flow<Int> = flow {
    try {                          
                                   emit(1)
                                   emit(2) 
                                   println("This line will not execute")
                                   emit(3)    
    } finally {
        println("Finally in numbers")
    }
}

fun main() = runBlocking<Unit> {
    numbers() 
        .take(2) // 只获取前两个
        .collect { value -> println(value) }
}  
     这段代码的输出清楚地表明,numbers() 函数中对 flow {...} 函数体的执行在发射出第二个数字后停止
1
2
Finally in numbers

末端流操作符

末端操作符 是在流上用于 启动 流收集挂起 函数

collect 是最基础的末端操作符

还有另外一些更方便使用的末端操作符:

  • 转化为各种集合,例如 toListtoSet
  • 获取第一个 first 值与确保流发射单个 single 值的操作符
  • 使用 reducefold 将流规约到单个值
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.reduce
import kotlinx.coroutines.runBlocking

fun main() = runBlocking<Unit> {
    val sum = (1..5).asFlow()
        .map { it * it } // 数字 1 至 5 的平方
        .reduce { a, b -> a + b } // 求和(末端操作符)
    println(sum)
}
  打印单个数字: 55 

流是连续的

流的每次单独收集都是按 顺序 执行的,除非进行特殊操作的操作符使用多个流。该收集过程直接在 协程 中运行,该协程 调用 末端操作符

  • 默认情况下不启动新协程
  • 从上游到下游 每个 过渡操作符 都会 处理 每个 发射出的值 然后再交给末端操作符
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    (1..5).asFlow()
        .filter {
            println("Filter $it")
            it % 2 == 0
        }
        .map {
            println("Map $it")
            "string $it"
        }.collect {
            println("Collect $it")
        }
}

输出:

Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5
    过滤偶数并将其映射到字符串

流上下文

流的收集总是在 调用 协程上下文 中发生

    例如,如果有一个流 foo,然后以下代码在它的编写者指定的上下文中运行,而无论流 foo 的实现细节如何
withContext(context) {
    foo.collect { value ->
                      println(value) // 运行在指定上下文中
    }
}

流的该属性称为 上下文保存

    默认的,flow { ... } 构建器中的代码运行在相应流的收集器提供的上下文中

    举例来说,考虑打印线程的 foo 的实现, 它被调用并发射三个数字
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun foo(): Flow<Int> = flow {
    log("Started foo flow")
    for (i in 1..3) {
        emit(i)
    }
}

fun main() = runBlocking {
    foo().collect { value -> log("Collected $value") }
}

运行这段代码:

[main @coroutine#1] Started foo flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3
  由于 foo().collect 是在主线程调用的,则 foo 的流主体也是在主线程调用的

这是快速运行或异步代码的理想默认形式,它 不关心执行的上下文 并且 不会阻塞调用者

withContext 发出错误

     然而,长时间运行的消耗 CPU 的代码也许需要在 Dispatchers.Default 上下文中执行,并且更新 UI 的代码也许需要在 Dispatchers.Main 中执行

通常,withContext 用于在 Kotlin 协程中改变代码的上下文,但是 flow {…} 构建器中的代码必须遵循上下文保存属性,并且不允许从其他上下文中发射(emit)

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

fun foo7(): Flow<Int> = flow {
    // 在流构建器中更改消耗 CPU 代码的上下文的错误方式
    withContext(Dispatchers.Default) {
        for (i in 1..3) {
            Thread.sleep(100) // 假装我们以消耗 CPU 的方式进行计算
            emit(i) // 发射下一个值
        }
    }
}

fun main() = runBlocking {
    foo7().collect { value -> println(value) }
}    

这段代码产生如下的异常:

Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
Flow was collected in [BlockingCoroutine{Active}@2eb45460, BlockingEventLoop@4861502a],
but emission happened in [DispatchedCoroutine{Active}@31ddf337, Dispatchers.Default].
Please refer to 'flow' documentation or use 'flowOn' instead

flowOn 操作符

flowOn 函数用于 更改 流发射的上下文

     以下示例展示了更改流上下文的正确方法,该示例还通过打印相应线程的名字以展示它们的工作方式
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.runBlocking

fun foo8(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(100) // 假装我们以消耗 CPU 的方式进行计算
        log("Emitting $i")
        emit(i) // 发射下一个值
    }
}.flowOn(Dispatchers.Default) // 在流构建器中改变消耗 CPU 代码上下文的正确方式

fun main() = runBlocking {
    foo8().collect { value -> log("Collected $value") }
}

输出如下:

[DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
[main @coroutine#1] Collected 1
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
[main @coroutine#1] Collected 2
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
[main @coroutine#1] Collected 3
     收集发生在一个协程中(“coroutine#1”)而发射发生在运行于另一个线程中与收集协程并发运行的另一个协程(“coroutine#2”)中

当上游流必须改变其上下文中的 CoroutineDispatcher 的时候,flowOn 操作符 创建另一个协程

缓冲

从收集流所花费的时间来看,将流的不同部分运行在不同的协程中将会很有帮助,特别是当涉及到长时间运行的异步操作时

    例如 foo() 流的发射很慢,它每花费 100 毫秒才产生一个元素;而收集器也非常慢, 需要花费 300 毫秒来处理元素

    看看从该流收集三个数字要花费多长时间
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

fun foo9(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // 假装我们异步等待了 100 毫秒
        emit(i) // 发射下一个值
    }
}

fun main() = runBlocking {
    val time = measureTimeMillis {
        foo9().collect { value ->
                             delay(300) // 假装我们花费 300 毫秒来处理它
                         println(value)
        }
    }
    println("Collected in $time ms")
}

它会产生这样的结果,整个收集过程大约需要 1200 毫秒(3 个数字,每个花费 400 毫秒):

1
2
3
Collected in 1279 ms

可以在流上使用 buffer 操作符来 并发 运行 foo() 中发射元素的代码以及收集的代码, 而不是顺序运行它们:

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // 假装我们异步等待了 100 毫秒
        emit(i) // 发射下一个值
    }
}

fun main() = runBlocking<Unit> { 
                                 val time = measureTimeMillis {
                                     foo()
                                         .buffer() // 缓冲发射项,无需等待
                                         .collect { value -> 
                                                        delay(300) // 假装我们花费 300 毫秒来处理它
                                                    println(value) 
                                         } 
                                 }   
                                 println("Collected in $time ms")
}

它产生了相同的数字,只是更快了:

1
2
3
Collected in 1071 ms
    由于高效地创建了处理流水线, 仅仅需要等待第一个数字产生的 100 毫秒以及处理每个数字各需花费的 300 毫秒

    这种方式大约花费了 1000 毫秒来运行

合并

     当流代表部分操作结果或操作状态更新时,可能没有必要处理每个值,而是只处理最新的那个

在本示例中,当收集器处理它们太慢的时候, conflate 操作符可以用于跳过中间值

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.conflate
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

fun foo10(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // 假装我们异步等待了 100 毫秒
        emit(i) // 发射下一个值
    }
}

fun main() = runBlocking {
    val time = measureTimeMillis {
        foo10()
            .conflate() // 合并发射项,不对每个值进行处理
            .collect { value ->
                           delay(300) // 假装我们花费 300 毫秒来处理它
                       println(value)
            }
    }
    println("Collected in $time ms")
}

虽然第一个数字仍在处理中,但第二个和第三个数字已经产生,因此第二个是 conflated ,只有最新的(第三个)被交付给收集器:

1
3
Collected in 758 ms

处理最新值

     当发射器和收集器都很慢的时候,合并是加快处理速度的一种方式。它通过删除发射值来实现

另一种方式是 取消 缓慢的收集器,并在每次 发射 新值的时候 重新启动 它。有一组与 xxx 操作符执行相同基本逻辑的 xxxLatest 操作符,但是在新值产生的时候取消执行其块中的代码。在先前的示例中尝试更换 conflate 为 collectLatest

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // 假装我们异步等待了 100 毫秒
        emit(i) // 发射下一个值
    }
}

fun main() = runBlocking {
    val time = measureTimeMillis {
        foo()
            .collectLatest { value -> // 取消并重新发射最后一个值
                             println("Collecting $value")
                             delay(300) // 假装我们花费 300 毫秒来处理它
                             println("Done $value")
            }
    }
    println("Collected in $time ms")
}

由于 collectLatest 的函数体需要花费 300 毫秒,但是新值每 100 秒发射一次,看到该代码块对每个值运行,但是只收集最后一个值:

Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 741 ms

组合多个流

组合多个流有很多种方式

zip

   就像 Kotlin 标准库中的 Sequence.zip 扩展函数一样

流拥有一个 zip 操作符用于 组合 两个流中的相关值:

import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.zip
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val nums = (1..3).asFlow() // 数字 1..3
    val strs = flowOf("one", "two", "three") // 字符串
    nums.zip(strs) { a, b -> "$a -> $b" } // 组合单个字符串
        .collect { println(it) } // 收集并打印
}

示例打印如下:

1 -> one
2 -> two
3 -> three

Combine

当流表示一个变量或操作的最新值时,可能需要执行计算,这依赖于相应流的最新值,并且每当上游流产生值的时候都需要重新计算。这种相应的操作符家族称为 combine

     例如,先前示例中的数字如果每 300 毫秒更新一次,但字符串每 400 毫秒更新一次, 然后使用 zip 操作符合并它们,但仍会产生相同的结果, 尽管每 400 毫秒打印一次结果
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.zip
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val nums = (1..3).asFlow().onEach { delay(300) } // 发射数字 1..3,间隔 300 毫秒
    val strs = flowOf("one", "two", "three").onEach { delay(400) } // 每 400 毫秒发射一次字符串
    val startTime = System.currentTimeMillis() // 记录开始的时间
    nums.zip(strs) { a, b -> "$a -> $b" } // 使用“zip”组合单个字符串
        .collect { value -> // 收集并打印
                   println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
}

输出如下:

1 -> one at 493 ms from start
2 -> two at 893 ms from start
3 -> three at 1297 ms from start
     示例中使用 onEach 过渡操作符来延时每次元素发射并使该流更具说明性以及更简洁

然而,当在这里使用 combine 操作符来替换 zip:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> { 
                                 val nums = (1..3).asFlow().onEach { delay(300) } // 发射数字 1..3,间隔 300 毫秒
                                 val strs = flowOf("one", "two", "three").onEach { delay(400) } // 每 400 毫秒发射一次字符串
                                 val startTime = System.currentTimeMillis() // 记录开始的时间
                                 nums.combine(strs) { a, b -> "$a -> $b" } // 使用“combine”组合单个字符串
                                     .collect { value -> // 收集并打印
                                                println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
                                     } 
}

这次得到了完全不同的输出,其中,nums 或 strs 流中的每次发射都会打印一行:

1 -> one at 532 ms from start
2 -> one at 737 ms from start
2 -> two at 933 ms from start
3 -> two at 1038 ms from start
3 -> three at 1334 ms from start

流展平

    流是用来异步地接收一系列的值,所以它很容易遇到这样的场景:每个值都会触发一个请求去获取另外一系列的值

例如:下面的函数会先发射字符串 "First",再间隔500ms发射另一个字符串 "Second"

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First") 
    delay(500) // wait 500 ms
    emit("$i: Second")    
}

现在有一条由三个整形值组成的流,其中每一个值都去调用 requestFlow 函数:

(1..3).asFlow().map { requestFlow(it) }

最后就得到了一条由流组成的流 Flow<Flow<String>> ,如果要进一步处理,就需要把它 展平 到一条单独的流去

    集合和Sequence有对应的 flatten 和 flattenMap 操作符

    然而由于流的异步特性,展平的时候会有不同的模式,因此流也有一系列的操作符对应不同的模式

flatMapConcat

Concatenating 模式通过 flatMapConcatflattenConcat 操作符实现. 这是最直接的一种模式,内部流的每一个值都收集完毕才开始外部流下一个值的收集:

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking


fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

fun main() = runBlocking {
    val startTime = System.currentTimeMillis() // remember the start time
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
        .flatMapConcat { requestFlow(it) }
        .collect { value -> // collect and print
                   println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
}

flatMapMerge

这种模式是 并发收集 流输入然后把值 合并 到一条单独的流,这样可以让值尽可能快地发射出来。这种模式由 flatMapMerge and flattenMerge 操作符实现

     这两个操作符都接受一个可选的并发参数,用来限制在同一时间并发收集的流数量

     默认值 DEFAULT_CONCURRENCY 
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val startTime = System.currentTimeMillis() // remember the start time
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
        .flatMapMerge { requestFlow(it) }
        .collect { value -> // collect and print
                   println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
}

输出结果如下:

1: First at 136 ms from start
2: First at 231 ms from start
3: First at 333 ms from start
1: Second at 639 ms from start
2: Second at 732 ms from start
3: Second at 833 ms from start
     注意:flatMapMerge 顺序地调用  ({ requestFlow(it) } 代码段, 但是对于结果流的收集是并发地

     这等同于执行一个顺序地映射 { requestFlow(it) } 然后在对结果调用 flattenMerge 

flatMapLatest

类似于 collectLatest,另外一种展平模式是只收集最新的值:如果新的流开始发射,前面流的集合就会被抛弃。这种模式是由 flatMapLatest 操作符实现:

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val startTime = System.currentTimeMillis() // remember the start time
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
        .flatMapLatest { requestFlow(it) }
        .collect { value -> // collect and print
                   println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
}

下面的输出是一个很好的例子:

1: First at 320 ms from start
2: First at 430 ms from start
3: First at 533 ms from start
3: Second at 1044 ms from start
     注意:每当有一个新的值 在 ({ requestFlow(it) } 产生,会取消所有已经收集到的数字流

     哪怕这里 requestFlow 函数运行很快,非挂起,不允许取消,行为也是这样

流异常

流收集可能以异常来完成收集:

  • 发射值的时候抛出异常
  • 收集的时候抛出异常

    有不同的方式来处理这些异常

收集器 try 与 catch

收集操作符可以使用 try/catch 代码块来处理异常:

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

fun foo11(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i) // emit next value
    }
}

fun main() = runBlocking {
    try {
        foo11().collect { value ->
                              println(value)
                          check(value <= 1) { "Collected $value" }
        }
    } catch (e: Throwable) {
        println("Caught $e")
    }
}

在成功捕获了 collect 操作符内产生的异常后, 不再 有新的值被 发射 出来:

Emitting 1
1
Emitting 2
2
Caught java.lang.IllegalStateException: Collected 2

一切都已捕获

     实际上前面的例子会捕获任何来自于发射或收集时候产生的异常

现在试着让异常产生于发射的代码内:

fun foo(): Flow<String> = 
flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i) // emit next value
    }
}
.map { value ->
           check(value <= 1) { "Crashed on $value" }                 
       "string $value"
}

fun main() = runBlocking<Unit> {
    try {
        foo().collect { value -> println(value) }
    } catch (e: Throwable) {
        println("Caught $e")
    } 
}

类似地,异常同样被捕获,收集也被停止了

Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2

异常透明性

    美中不足的是,这里的代码无法隐藏处理异常的逻辑

    一般来说流应该对异常透明,因为这保证了在收集时候捕获的异常是收集时候的代码所产生的

发射器可以使用 catch 操作符来维持异常透明性,并 包装 异常处理逻辑 。一般是分析错误,并且根据不同的错误做出不同的反应:

  • 异常被再次扔出
  • 异常转换成另外一个值发射出去
  • 异常被忽略,记录,或被其他代码处理
fun foo15(): Flow<String> =
flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i) // emit next value
    }
}
.map { value ->
           check(value <= 1) { "Crashed on $value" }
       "string $value"
}

fun main() = runBlocking<Unit> {
    foo15()
        .catch { e -> emit("Caught $e") } // emit on exception
        .collect { value -> println(value) }
}

虽然这里的输出和前面一样,但是不再有 try/catch 显示处理了

Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2

异常逃逸

     然而上面的例子却无法处理收集器内 collect 代码块产生的异常
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

fun foo16(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking {
    foo16()
        .catch { e -> println("Caught $e") } // does not catch downstream exceptions
        .collect { value ->
                       check(value <= 1) { "Collected $value" }
                   println(value)
        }
}

输出如下:

Emitting 1
1
Emitting 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
at tutorial.coroutine.flow.FlowExceptionEscapeKt$main$1$2.emit(FlowExceptionEscape.kt:19)

声明式捕获

做为改进可以把 collect 操作符里的代码移动到一个 onEach 操作符内,并把 onEach 操作符放在 catch 操作符前面:

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

fun foo17(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking {
    foo17()
        .onEach { value ->
                      check(value <= 1) { "Collected $value" }
                  println(value)
        }
        .catch { e -> println("Caught $e") }
        .collect {}
}

现在可以看到一条类似以 "Caught…" 开头的消息被打印出来

Emitting 1
1
Emitting 2
Caught java.lang.IllegalStateException: Collected 2
     这里既使用了 catch 操作符的声明性,并且也处理了所有的异常

     但也要注意:这里的 collect 不能有任何额外的逻辑

流完成

当流收集完成以后(无论正常或有异常),可能会需要执行某些逻辑。这同样可以通过两种方式实现:命令式和声明式

命令式 finally 块

在 try/catch 块之外,收集器可以使用 finally 块来执行某些收集完成之后的逻辑:

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.runBlocking

fun foo18(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking {
    try {
        foo18().collect { value -> println(value) }
    } finally {
        println("Done")
    }
}    

打印了3个数字,最后会打印 "Done" 字符串

1
2
3
Done

声明式处理

流同样也有一个声明式的操作符 onCompletion 来调用流收集完成后的逻辑。相对于命令式的做法,最主要的优点在于 onCompletion 有一个可为空的 Throwable 参数来表示 收集 是否 正常 结束:

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onCompletionOn
import kotlinx.coroutines.runBlocking

fun foo19(): Flow<Int> = flow {
    emit(1)
    throw RuntimeException()
}

fun main() = runBlocking {
    foo19()
        .onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
        .catch { cause -> println("Caught exception") }
        .collect { value -> println(value) }
}

输出如下:

1
Flow completed exceptionally
Caught exception
onCompletion 操作符 与 catch 不同,它并不处理异常

异常是传递给下一个方法,然后在 catch 操作符里被处理

仅限上游异常

和 catch 操作符类似, onCompletion 操作符也仅能捕获到上游传过来的异常,而无法感知下游产生的异常:

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.runBlocking

fun foo20(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking {
    foo20()
        .onCompletion { cause -> println("Flow completed with $cause") }
        .collect { value ->
            check(value <= 1) { "Collected $value" }
            println(value)
        }
}

可以看到 onCompletion 里的 cause 为空,但整个收集依旧是以异常而终止:

1
Flow completed with java.lang.IllegalStateException: Collected 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
      现在已经看过分别用命令式和声明式来处理错误和完成收集,那么究竟应该使用哪种呢?

      做为一个库,两种方式都是有效的。如何选择应该由个人爱好和编程风格所决定

流启动

    流非常适合用来表示从某些源头异步发送的事件

    这种情况下,需要注册一个类似于 AddEventListener 的函数,这个函数代码里包含了如何处理接收到的事件

onEach 操作符可以起到注册的职责。然而 onEach 操作符只是一个中端操作符,仍然需要一个末端操作符来收集流,否则只调用 onEach 不起作用

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(1000) }

fun main() = runBlocking {
    events()
        .onEach { event -> println("Event: $event") }
        .collect{} // <--- Collecting the flow waits
    println("Done")
}

输出如下:

Event: 1
Event: 2
Event: 3
Done
    在 onEach 后面加上 collect 操作符,所有 collect {} 之后的代码(打印 "Done" 字符串)会在流被收集完毕之后才执行

如果使用 lauchIn 来替换 collect 操作符,可以让 流收集 运行在另外一个 单独协程 中,lauchIn 后面的代码会 立刻 执行:

import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    events()
        .onEach { event -> println("Event: $event") }
        .launchIn(this) // <--- Launching the flow in a separate coroutine
    println("Done")
}  

这时的输出如下:

Done
Event: 1
Event: 2
Event: 3

lauchIn 必须的参数是一个 CoroutineScope : 流收集跑在哪个协程上下文里

    实例中这个协程上下文参数是来自于 runBlocking 构造器

    因此当流在运行时,会继续执行main函数里其他代码,并等待所有子协程跑完

实际应用中如果这个上下文来自于一个有限生命周期的实例,当这个实例终止时,协程会被取消,同样收集流的协程也会被取消

    这时候,onEach { ... }.launchIn(scope) 的工作方式和 addEventListener 相似

    然而不需要显示地调用 removeEventListener ,因为协程取消隐式地包含了这一点

注意: launchIn 也会返回一个 Job。这个 Job 也可以被用来单独取消流收集

    这种情况适用于 不取消 CoroutineScope 参数对应的协程,只取消收集流对应的协程

流 与 响应式流

    对于熟悉响应式流(Reactive Streams)或诸如 RxJava 与 Project Reactor 这样的响应式框架的人来说, Flow 的设计也许看起来会非常熟悉

确实,其设计灵感来源于响应式流以及其各种实现。但是 Flow 的主要目标是拥有尽可能简单的设计, 对 Kotlin 以及挂起友好且遵从结构化并发

虽然有所不同,但从概念上讲,Flow 依然是响应式流,并且可以将它转换为响应式(规范及符合 TCK)的发布者(Publisher),反之亦然

这些开箱即用的转换器可以在 kotlinx.coroutines 提供的相关响应式模块:
1. kotlinx-coroutines-reactive 用于 Reactive Streams
2. kotlinx-coroutines-reactor 用于 Project Reactor
3. kotlinx-coroutines-rx2 用于 RxJava2

集成模块包含 Flow 与其他实现之间的转换,与 Reactor 的 Context 集成以及与一系列响应式实体配合使用的挂起友好的使用方式
Next:通道 Previous:调度器 Home:协程