UP | HOME

通道

Table of Contents

  延期的值提供了一种便捷的方法使单个值在多个协程之间进行相互传输

通道提供了一种在 传输 值的方法

基础

Channel 是一个和 BlockingQueue 非常相似的概念

一个不同是它代替了阻塞的 put 操作并提供了 挂起send ,还替代了阻塞的 take 操作并提供了 挂起receive

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val channel = Channel<Int>()

    launch {
        // 这里可能是消耗大量 CPU 运算的异步逻辑,我们将仅仅做 5 次整数的平方并发送
        for (x in 1..5)
            channel.send(x * x)
    }
    // 这里我们打印了 5 次被接收的整数:
    repeat(5) {
        println(channel.receive())
    }
    println("Done!")
}

输出如下:

1
4
9
16
25
Done!

关闭与迭代通道

和队列不同,一个通道可以通过被 关闭 来表明 没有更多元素 将会 进入 通道。 在 接收者 中可以 定期 的使用 for 循环来从通道中 接收 元素

    从概念上来说,一个 close 操作就像向通道发送了一个特殊的关闭指令,这个迭代停止就说明关闭指令已经被接收了

    所以这里保证所有先前发送出去的元素都在通道关闭前被接收到
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val channel = Channel<Int>()
    launch {
        for (x in 1..5)
            channel.send(x * x)
        channel.close() // 我们结束发送
    }
    // 这里我们使用 `for` 循环来打印所有被接收到的元素(直到通道被关闭)
    for (y in channel)
        println(y)
    println("Done!")
} 

构建通道生产者

    协程生成一系列元素的模式很常见。 这是 生产者——消费者 模式的一部分,并且经常能在并发的代码中看到它

    可以将生产者抽象成一个函数,并且使通道作为它的参数,但这与必须从函数中返回结果的常识相违悖

下面实例中有一个名为 produce 的便捷的协程构建器,可以很容易的在 生产者端 正确工作, 并且使用扩展函数 consumeEach消费者端 替代 for 循环:

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking

fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
    for (x in 1..5)
        send(x * x)
}

fun main() = runBlocking {
    val squares = produceSquares()
    squares.consumeEach {
        println(it)
    }
    println("Done!")
} 

管道

管道是一个 协程 中开始 生产 可能 无穷多元素 的模式:

fun CoroutineScope.produceNumbers() = produce {
    var x = 1
    while (true)
        send(x++) // 在流中开始从 1 生产无穷多个整数
}

并且另一个或多个协程开始 消费 这些流,做一些操作,并生产了一些额外的结果。 在下面的例子中,对这些数字仅仅做了平方操作:

fun CoroutineScope.square(numbers: ReceiveChannel): ReceiveChannel = produce {
    for (x in numbers)
        send(x * x)
}

更完整的一个例子:

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val numbers = produceNumbers() // 从 1 开始生成整数
    val squares = square(numbers) // 整数求平方
    repeat(5) {
        println(squares.receive()) // 输出前五个
    }
    println("Done!") // 至此已完成
    coroutineContext.cancelChildren() // 取消子协程
}

fun CoroutineScope.produceNumbers() = produce {
    var x = 1
    while (true)
        send(x++) // 从 1 开始的无限的整数流
}

fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    for (x in numbers)
        send(x * x)
}
    所有创建了协程的函数被定义在了 CoroutineScope 的扩展上, 所以可以依靠结构化并发来确保没有常驻在应用程序中的全局协程

素数管道

     现在展示一个极端的例子:在协程中使用一个管道来生成素数

首先开启一个数字的无限序列:

fun CoroutineScope.numbersFrom(start: Int) = produce {
    var x = start
    while (true)
        send(x++) // 开启了一个无限的整数流
} 

在下面的管道阶段中过滤了来源于流中的数字,删除了所有可以被给定素数整除的数字:

fun CoroutineScope.filter(numbers: ReceiveChannel, prime: Int) = produce {
    for (x in numbers)
        if (x % prime != 0)
        send(x)
} 

现在开启了一个从 2 开始的数字流管道,从当前的通道中取一个素数, 并为每一个发现的素数启动一个流水线阶段:

numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ……

下面的例子打印了前十个素数, 在主线程的上下文中运行整个管道。直到所有的协程在该主协程 runBlocking 的作用域中被启动完成。不必使用一个显式的列表来保存所有被已经启动的协程。 而是使用 cancelChildren 扩展函数在打印了前十个素数以后来取消所有的子协程:

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    var cur = numbersFrom(2)
    repeat(10) {
        val prime = cur.receive()
        println(prime)
        cur = filter(cur, prime)
    }
    coroutineContext.cancelChildren() // 取消所有的子协程来让主协程结束
}

fun CoroutineScope.numbersFrom(start: Int) = produce {
    var x = start
    while (true)
        send(x++) // 从 start 开始过滤整数流
}

fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce {
    for (x in numbers)
        if (x % prime != 0)
            send(x)
} 

可以在标准库中使用 iterator 协程构建器来构建一个相似的管道:

  • 使用 iterator 替换 produce
  • yield 替换 send
  • next 替换 receive
  • Iterator 替换 ReceiveChannel 来摆脱协程作用域,将不再需要 runBlocking
     如上所示,如果在 Dispatchers.Default 上下文中运行它,使用通道的管道的好处在于它可以充分利用多核心 CPU


     不过,这是一种非常不切实际的寻找素数的方法

     在实践中,管道调用了另外的一些挂起中的调用(就像异步调用远程服务)并且这些管道不能内置使用 sequence/iterator,因为它们不被允许随意的挂起,不像 produce 是完全异步的

fan out

多个协程也许会接收相同的管道,在它们之间进行分布式工作。启动一个定期产生整数的生产者协程 (每秒十个数字):

fun CoroutineScope.produceNumbers() = produce {
    var x = 1
    // 从 1 开始
    while (true) {
        send(x++) // 产生下一个数字
        delay(100) // 等待 0.1 秒
    }
} 

接下来可以创建几个处理器协程。在这个示例中,它们只是打印它们的 id 和接收到的数字:

fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel) = launch {
    for (msg in channel) {
        println("Processor #$id received $msg")
    }
}

现在启动五个处理器协程并让它们工作将近一秒。看看发生了什么:

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val producer = produceNumbers1()
    repeat (5) {
        launchProcessor(it, producer)
    }
    delay(950)
    producer.cancel() // 取消协程生产者从而将它们全部杀死
}

fun CoroutineScope.produceNumbers1() = produce {
    var x = 1 // start from 1
    while (true) {
        send(x++) // 产生下一个数字
        delay(100) // 等待 0.1 秒
    }
}

fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    for (msg in channel) {
        println("Processor #$id received $msg")
    }
}

该输出将类似于如下所示,尽管接收每个特定整数的处理器 id 可能会不同:

Processor #0 received 1
Processor #0 received 2
Processor #1 received 3
Processor #2 received 4
Processor #3 received 5
Processor #4 received 6
Processor #0 received 7
Processor #1 received 8
Processor #2 received 9

注意:取消生产者协程将关闭它的通道,从而最终终止处理器协程正在执行的此通道上的迭代

     注意如何使用 for 循环显式迭代通道以在 launchProcessor 代码中执行扇出,与 consumeEach 不同,这个 for 循环是安全完美地使用多个协程的

     如果其中一个处理器协程执行失败,其它的处理器协程仍然会继续处理通道,而通过 consumeEach 编写的处理器始终在正常或非正常完成时消耗(取消)底层通道 

fan in

多个协程可以发送到同一个通道

     比如说,创建一个字符串的通道,和一个在这个通道中以指定的延迟反复发送一个指定字符串的挂起函数
suspend fun sendString(channel: SendChannel, s: String, time: Long) {
    while (true) {
        delay(time)
        channel.send(s)
    }
}

现在,启动了几个发送字符串的协程,看看会发生什么:

import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val channel = Channel<String>()
    launch {
        sendString(channel, "foo", 200L)
    }

    launch {
        sendString(channel, "BAR!", 500L)
    }

    repeat(6) {
        // 接收前六个
        println(channel.receive())
    }
    coroutineContext.cancelChildren() // 取消所有子协程来让主协程结束
}

suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    while (true) {
        delay(time)
        channel.send(s)
    }
}

输出如下:

foo
foo
BAR!
foo
foo
BAR!
     在示例中,在主线程的上下文中作为主协程的子协程来启动它们

带缓冲的通道

到目前为止展示的通道都是没有缓冲区的。无缓冲的通道在发送者和接收者相遇时传输元素:

  • 如果发送先被调用,则它将被挂起直到接收被调用
  • 如果接收先被调用,它将被挂起直到发送被调用
aka rendezvous 这句话应该是个俚语,意思好像是“又是约会”的意思

Channel() 工厂函数与 produce 建造器通过一个可选的参数 capacity 来指定 缓冲区大小 。缓冲允许发送者在被挂起前发送多个元素

     就像 BlockingQueue 有指定的容量一样,当缓冲区被占满的时候将会引起阻塞

使用缓冲通道并给 capacity 参数传入 4:

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val channel = Channel<Int>(4) // 启动带缓冲的通道
    val sender = launch { // 启动发送者协程
                          repeat(10) {
                              println("Sending $it") // 在每一个元素发送前打印它们
                              channel.send(it) // 将在缓冲区被占满时挂起
                          }
    }
    // 没有接收到东西……只是等待……
    delay(1000)
    sender.cancel() // 取消发送者协程
}

它将打印“sending” 5 次:

Sending 0
Sending 1
Sending 2
Sending 3
Sending 4
    前四个元素被加入到了缓冲区并且发送者在试图发送第五个元素的时候被挂起

通道是公平的

发送和接收操作是 公平的 并且尊重调用它们的多个协程。它们遵守 先进先出 原则,可以看到第一个协程调用 receive 并得到了元素

    在下面的例子中两个协程“乒”和“乓”都从共享的“桌子”通道接收到这个“球”元素
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

data class Ball(var hits: Int)

fun main() = runBlocking {
    val table = Channel<Ball>() // 一个共享的 table(桌子)
    launch {
        player("ping", table)
    }

    launch {
        player("pong", table)
    }

    table.send(Ball(0)) // 乒乓球
    delay(1000) // 延迟 1 秒钟
    coroutineContext.cancelChildren() // 游戏结束,取消它们
}

suspend fun player(name: String, table: Channel<Ball>) {
    for (ball in table) {
        // 在循环中接收球
        ball.hits++
        println("$name $ball")
        delay(300) // 等待一段时间
        table.send(ball) // 将球发送回去
    }
}

“乒”协程首先被启动,所以它首先接收到了球。甚至虽然“乒” 协程在将球发送会桌子以后立即开始接收,但是球还是被“乓” 协程接收了,因为它一直在等待着接收球

ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)
    注意:有时候通道执行时由于执行者的性质而看起来不那么公平

计时器通道

计时器通道 是一种特别的会合通道,每次经过 特定的延迟 都会从该 通道 进行 消费产生 Unit

    虽然它看起来似乎没用,它被用来构建分段来创建复杂的基于时间的 produce 管道和进行窗口化操作以及其它时间相关的处理

    可以在 select 中使用计时器通道来进行“打勾”操作

使用工厂方法 ticker 来创建这些通道。 为了表明不需要其它元素,请使用 ReceiveChannel.cancel 方法:

import kotlinx.coroutines.channels.ticker
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

fun main() = runBlocking {
    val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) //创建计时器通道
    var nextElement = withTimeoutOrNull(1) {
        tickerChannel.receive()
    }
    println("Initial element is available immediately: $nextElement") // 初始尚未经过的延迟
    nextElement = withTimeoutOrNull(50) {
        tickerChannel.receive()
    } 
    // 所有随后到来的元素都经过了 100 毫秒的延迟
    println("Next element is not ready in 50 ms: $nextElement")
    nextElement = withTimeoutOrNull(60) {
        tickerChannel.receive()
    }
    println("Next element is ready in 100 ms: $nextElement") 
    // 模拟大量消费延迟
    println("Consumer pauses for 150ms")
    delay(150) 
    // 下一个元素立即可用
    nextElement = withTimeoutOrNull(1) {
        tickerChannel.receive()
    }
    println("Next element is available immediately after large consumer delay: $nextElement")
    // 请注意,`receive` 调用之间的暂停被考虑在内,下一个元素的到达速度更快
    nextElement = withTimeoutOrNull(60) {
        tickerChannel.receive()
    }
    println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")
    tickerChannel.cancel() // 表明不再需要更多的元素
}

它的打印如下:

Initial element is available immediately: kotlin.Unit
Next element is not ready in 50 ms: null
Next element is ready in 100 ms: kotlin.Unit
Consumer pauses for 150ms
Next element is available immediately after large consumer delay: kotlin.Unit
Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit
    请注意,ticker 知道可能的消费者暂停,并且默认情况下会调整下一个生成的元素如果发生暂停则延迟,试图保持固定的生成元素率

给可选的 mode 参数传入 TickerMode.FIXEDDELAY 可以保持固定元素之间的延迟

Next:异常处理 Previous:异步流 Home:协程