共享的可变状态
协程可用 多线程调度器 (比如默认的 Dispatchers.Default)并发执行。这样就可能出现所有常见的并发问题
主要的问题是同步访问共享的可变状态 协程领域对这个问题的一些解决方案类似于多线程领域中的解决方案,但另外解决方案则是独一无二的
问题
启动一百个协程,它们都做一千次相同的操作。同时会测量它们的完成时间以便进一步的比较:
suspend fun massiveRun(action: suspend () -> Unit) { val n = 100 // 启动的协程数量 val k = 1000 // 每个协程重复执行同一动作的次数 val time = measureTimeMillis { coroutineScope { // 协程的作用域 repeat(n) { launch { repeat(k) { action() } } } } } println("Completed ${n * k} actions in $time ms") }
先从一个非常简单的动作开始:使用多线程的 Dispatchers.Default 来递增一个共享的可变变量
import kotlinx.coroutines.* import kotlin.system.measureTimeMillis suspend fun massiveRun(action: suspend () -> Unit) { val n = 100 // 启动的协程数量 val k = 1000 // 每个协程重复执行同一动作的次数 val time = measureTimeMillis { coroutineScope { // 协程的作用域 repeat(n) { launch { repeat(k) { action() } } } } } println("Completed ${n * k} actions in $time ms") } var counter = 0 fun main() = runBlocking { withContext(Dispatchers.Default) { massiveRun { counter++ } } println("Counter = $counter") }
这段代码最后打印出什么结果? 它不太可能打印出“Counter = 100000”,因为一百个协程在多个线程中同时递增计数器但没有做并发处理
volatile 无济于事
有一种常见的误解:volatile 可以解决并发问题。尝试一下:
@Volatile var counter2 = 0 fun main() = runBlocking { withContext(Dispatchers.Default) { massiveRun { counter2++ } } println("Counter = $counter2") }
这段代码运行速度更慢了,但最后仍然没有得到“Counter = 100000”这个结果 因为 volatile 变量保证并发下的可见性,但在大量并发(示例中即“递增”操作)修改时并不提供原子性
线程安全的数据结构
一种对线程、协程都有效的常规解决方法,就是使用 线程安全 (也称为 同步的 、 可线性化、原子)的数据结构,它为需要在共享状态上执行的相应操作提供所有必需的同步处理。在简单的计数器场景中,可以使用具有 incrementAndGet 原子操作的 AtomicInteger 类:
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.runBlocking import kotlinx.coroutines.withContext import java.util.concurrent.atomic.AtomicInteger var counter3 = AtomicInteger() fun main() = runBlocking { withContext(Dispatchers.Default) { massiveRun { counter3.incrementAndGet() } } println("Counter = $counter3") }
这是针对此类特定问题的最快解决方案 它适用于普通计数器、集合、队列和其他标准数据结构以及它们的基本操作 然而,它并不容易被扩展来应对复杂状态、或一些没有现成的线程安全实现的复杂操作
细粒度限制线程
限制线程 是解决共享可变状态问题的一种方案:对特定共享状态的所有访问权都 限制 在 单个线程 中。这在协程中很容易实现,通过使用一个单线程上下文:
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.newSingleThreadContext import kotlinx.coroutines.runBlocking import kotlinx.coroutines.withContext val counterContext = newSingleThreadContext("CounterContext") var counter4 = 0 fun main() = runBlocking { withContext(Dispatchers.Default) { massiveRun { // 将每次自增限制在单线程上下文中 withContext(counterContext) { counter4++ } } } println ("Counter = $counter4") }
这段代码运行非常缓慢,因为它进行了 细粒度 的线程限制 每个增量操作都得使用 [withContext(counterContext)] 块从多线程 Dispatchers.Default 上下文切换到单线程上下文 它通常应用于 UI 程序中:所有 UI 状态都局限于单个事件分发线程或应用主线程中
以粗粒度限制线程
在实践中,线程限制是在 大段代码 中执行的
例如:状态更新类业务逻辑中大部分都是限于单线程中
下面的示例演示了这种情况, 在单线程上下文中运行每个协程:
import kotlinx.coroutines.newSingleThreadContext import kotlinx.coroutines.runBlocking import kotlinx.coroutines.withContext val counterContext = newSingleThreadContext("CounterContext") var counter4 = 0 fun main() = runBlocking { massiveRun { // 将每次自增限制在单线程上下文中 withContext(counterContext) { counter4++ } } println ("Counter = $counter4") }
这段代码运行更快而且打印出了正确的结果
互斥
互斥解决方案:使用 永远不会同时执行 的 关键代码块 来保护共享状态的所有修改
在阻塞的世界中,通常会为此目的使用 synchronized 或者 ReentrantLock
在协程中的替代品叫做 Mutex 。它具有 lock 和 unlock 方法, 可以隔离关键的部分:
- 关键的区别在于 Mutex.lock() 是一个 挂起 函数,它不会 阻塞 线程
- 另外 withLock 扩展函数,可以方便的替代常用的 mutex.lock(); try { …… } finally { mutex.unlock() } 模式
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.runBlocking import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.withContext val mutex = Mutex() var counter = 0 fun main() = runBlocking { withContext(Dispatchers.Default) { massiveRun { // 用锁保护每次自增 mutex.withLock { counter++ } } } println("Counter = $counter") }
此示例中锁是细粒度的,因此会付出一些代价 但是对于某些必须定期修改共享状态的场景,它是一个不错的选择,但是没有自然线程可以限制此状态
Actor
一个 actor 是由 协程 、被 限制 并 封装 到该协程中的 状态 以及一个与 其它协程通信 的 通道 组合而成的一个实体
一个简单的 actor 可以简单的写成一个函数, 但是一个拥有复杂状态的 actor 更适合由类来表示 一个 actor 协程构建器,它可以方便地将 actor 的邮箱通道组合到其作用域中 可以用来接收消息、组合发送 channel 与结果集对象,这样对 actor 的单个引用就可以作为其句柄持有
使用 actor 的第一步是 定义 一个 actor 要处理的 消息类
// 计数器 Actor 的各种类型 sealed class CounterMsg
Kotlin 的密封类很适合这种场景
- IncCounter 消息:用来递增计数器
- GetCounter 消息:用来获取值
- 需要发送回复,使用 CompletableDeferred 通信原语表示 未来可知 (可传达)的单个值
object IncCounter : CounterMsg() // 递增计数器的单向消息 class GetCounter(val response: CompletableDeferred) : CounterMsg() // 携带回复的请求
接下来定义一个函数,使用 actor 协程构建器 来启动一个 actor:
// 这个函数启动一个新的计数器 actor fun CoroutineScope.counterActor() = actor<CounterMsg> { var counter = 0 // actor 状态 for (msg in channel) { // 即将到来消息的迭代器 when (msg) { is IncCounter -> counter++ is GetCounter -> msg.response.complete(counter) } } }
main 函数代码很简单:
fun main() :Unit = runBlocking { val counter = counterActor() // 创建该 actor withContext(Dispatchers.Default) { massiveRun { counter.send(IncCounter) } } // 发送一条消息以用来从一个 actor 中获取计数值 val response = CompletableDeferred<Int>() counter.send(GetCounter(response)) println("Counter = ${response.await()}") counter.close() // 关闭该actor }
actor 本身执行时所处上下文(就正确性而言)无关紧要。一个 actor 是一个协程,而一个协程是按顺序执行的,因此将状态限制到特定协程可以解决共享可变状态的问题
实际上,actor 可以修改自己的私有状态, 但只能通过消息互相影响(避免任何锁定) actor 在高负载下比锁更有效,因为在这种情况下它总是有工作要做,而且根本不需要切换到不同的上下文
Next:select 表达式 | Previous:异常处理 | Home:协程 |