UP | HOME

并发

Table of Contents

安全且高效的处理并发编程是 Rust 的另一个主要目标:

  这两个概念随着计算机越来越多的利用多处理器的优势时显得愈发重要

  注意:出于简洁的考虑,将很多问题归类为 并发,而不是更准确的区分 并发和并行

  如果这是一本专注于并发和/或并行的书,肯定会更加精确的。对于本章,当谈到 并发 时,请自行脑内替换为 并发和 并行

通过利用 所有权类型检查 ,在 Rust 中很多并发错误都是 编译时 错误,而非运行时错误。相比花费大量时间尝试重现运行时并发 bug 出现的特定情况,Rust 会拒绝编译不正确的代码并提供解释问题的错误信息。因此,可以在开发时修复代码,而不是在部署到生产环境后修复代码

  很多语言所提供的处理并发问题的解决方法都非常有特色

  例如,Erlang 有着优雅的消息传递并发功能,但只有模糊不清的在线程间共享状态的方法

  对于高级语言来说,只实现可能解决方案的子集是一个合理的策略,因为高级语言所许诺的价值来源于牺牲一些控制来换取抽象

  然而对于底层语言则期望提供在任何给定的情况下有着最高的性能且对硬件有更少的抽象。因此,Rust 提供了多种工具,以符合实际情况和需求的方式来为问题建模

如下是接下来将要涉及到的内容:

使用线程同时运行代码

在大部分现代操作系统中,执行中程序的代码在一个 进程 中运行,_操作系统_ 则负责管理多个进程。在 程序内部 ,也可以拥有 多个 同时运行 的独立部分。这个运行这些独立部分的功能被称为 线程 。将程序中的计算拆分进多个线程可以 改善性能 ,因为程序可以同时进行多个任务,不过这也会 增加复杂性 。因为线程是同时运行的,所以无法预先保证不同线程中的代码的执行顺序。这会导致诸如此类的问题:

  • 竞争状态,多个线程以不一致的顺序访问数据或资源
  • 死锁,两个线程相互等待对方停止使用其所拥有的资源,这会阻止它们继续运行
  • 只会发生在特定情况且难以稳定重现和修复的 bug

Rust 尝试缓和使用线程的负面影响。不过在多线程上下文中编程仍需格外小心,同时其所要求的代码结构也不同于运行于单线程的程序

    编程语言有一些不同的方法来实现线程。很多操作系统提供了创建新线程的 API。这种由编程语言调用操作系统 API 创建线程的模型有时被称为 1:1,一个 OS 线程对应一个语言线程

    很多编程语言提供了自己特殊的线程实现。编程语言提供的线程被称为 绿色线程,使用绿色线程的语言会在不同数量的 OS 线程的上下文中执行它们。为此,绿色线程模式被称为 M:N 模型:M 个绿色线程对应 N 个 OS 线程,这里 M 和 N 不必相同

    每一个模型都有其优势和取舍,对于 Rust 来说最重要的取舍是运行时支持。在当前上下文中,运行时“代表二进制文件中包含的由语言自身提供的代码”

    这些代码根据语言的不同可大可小,不过任何非汇编语言都会有一定数量的运行时代码。为此,通常人们说一个语言 “没有运行时”,一般意味着 “小运行时”

    更小的运行时拥有更少的功能不过其优势在于更小的二进制输出,这使其易于在更多上下文中与其他语言相结合

    虽然很多语言觉得增加运行时来换取更多功能没有什么问题,但是 Rust 需要做到几乎没有运行时,同时为了保持高性能必需能够调用 C 语言,这点也是不能妥协的

    绿色线程的 M:N 模型需要更大的语言运行时来管理这些线程。因此,Rust 标准库只提供了 1:1 线程模型实现

    由于 Rust 是较为底层的语言,如果愿意牺牲性能来换取的抽象,以获得对线程运行更精细的控制及更低的上下文切换成本,可以使用实现了 M:N 线程模型的 crate

使用 spawn 创建新线程

为了创建一个新线程,需要调用 thread::spawn 函数并传递一个 闭包 ,并在其中包含希望在新线程运行的代码。下面的例子在主线程打印了一些文本而另一些文本则由新线程打印:

use std::thread;
use std::time::Duration;

fn main() {
    thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..5 {
        println!("hi number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }
}

这个程序的输出可能每次都略有不同,不过它大体上看起来像这样:

hi number 1 from the main thread!
hi number 1 from the spawned thread!
hi number 2 from the main thread!
hi number 2 from the spawned thread!
hi number 3 from the main thread!
hi number 3 from the spawned thread!
hi number 4 from the main thread!
hi number 4 from the spawned thread!
hi number 5 from the spawned thread!
     注意:这个函数编写的方式,当主线程结束时,新线程也会结束,而不管其是否执行完毕

thread::sleep 调用强制线程 停止 执行一小段时间,这会允许其他不同的线程运行

     这些线程可能会轮流运行,不过并不保证如此:这依赖操作系统如何调度线程

     在这里,主线程首先打印,即便新创建线程的打印语句位于程序的开头,甚至即便告诉新建的线程打印直到 i 等于 9 ,它在主线程结束之前也只打印到了 5

使用 join 等待线程结束

     由于主线程结束,上面示例代码大部分时候不光会提早结束新建线程,甚至不能实际保证新建线程会被执行。其原因在于无法保证线程运行的顺序!

可以通过将 thread::spawn返回值 储存变量 中来修复新建线程部分没有执行或者完全没有执行的问题。thread::spawn 的返回值类型是 JoinHandle 。JoinHandle 是一个 拥有所有权 的值,当对其调用 join 方法时,它会 等待线程结束 。下面的例子里展示了如何使用JoinHandle 并调用 join 来确保新建线程在 main 退出前结束运行:

use std::thread;
use std::time::Duration;

fn main() {
    let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..5 {
        println!("hi number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }

    handle.join().unwrap();
}

通过调用 handle 的 join 会阻塞当前线程直到 handle 所代表的线程结束。 阻塞 线程意味着 阻止 该线程执行工作或退出。因为将 join 调用放在了主线程的 for 循环之后,运行后应该会产生类似这样的输出:

hi number 1 from the main thread!
hi number 2 from the main thread!
hi number 1 from the spawned thread!
hi number 3 from the main thread!
hi number 2 from the spawned thread!
hi number 4 from the main thread!
hi number 3 from the spawned thread!
hi number 4 from the spawned thread!
hi number 5 from the spawned thread!
hi number 6 from the spawned thread!
hi number 7 from the spawned thread!
hi number 8 from the spawned thread!
hi number 9 from the spawned thread!
   这两个线程仍然会交替执行,不过主线程会由于 handle.join() 调用会等待直到新建线程执行完毕

如果将 handle.join() 移动到 main 中 for 循环之前会发生什么:

use std::thread;
use std::time::Duration;

fn main() {
    let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    handle.join().unwrap();

    for i in 1..5 {
        println!("hi number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }
}

主线程会等待直到新建线程执行完毕之后才开始执行 for 循环,所以输出将不会交替出现,如下所示:

hi number 1 from the spawned thread!
hi number 2 from the spawned thread!
hi number 3 from the spawned thread!
hi number 4 from the spawned thread!
hi number 5 from the spawned thread!
hi number 6 from the spawned thread!
hi number 7 from the spawned thread!
hi number 8 from the spawned thread!
hi number 9 from the spawned thread!
hi number 1 from the main thread!
hi number 2 from the main thread!
hi number 3 from the main thread!
hi number 4 from the main thread!
   将 join 放置于何处这样一个细节会影响线程是否同时运行

线程与 move 闭包

move 闭包 ,其经常与 thread::spawn 一起使用,因为它允许在一个线程中使用另一个线程的数据

     在第十三章中,曾经讲到可以在参数列表前使用 move 关键字强制闭包获取其使用的环境值的所有权

     这个技巧在创建新线程将值的所有权从一个线程移动到另一个线程时最为实用

下面展示了一个尝试在主线程中创建一个 vector 并用于新建线程的例子,如下所示:

use std::thread;

fn main() {
    let v = vec![1, 2, 3];

    let handle = thread::spawn(|| {
        println!("Here's a vector: {:?}", v);
    });

    handle.join().unwrap();
}

闭包使用了 v,所以闭包会捕获 v 并使其成为闭包环境的一部分。因为 thread::spawn 在一个新线程中运行这个闭包,所以可以在新线程中访问 v。然而当编译这个例子时,会得到如下错误:

error[E0373]: closure may outlive the current function, but it borrows `v`, which is owned by the current function
 --> src/main.rs:6:32
  |
6 |     let handle = thread::spawn(|| {
  |                                ^^ may outlive borrowed value `v`
7 |         println!("Here's a vector: {:?}", v);
  |                                           - `v` is borrowed here
  |
help: to force the closure to take ownership of `v` (and any other referenced
variables), use the `move` keyword
  |
6 |     let handle = thread::spawn(move || {
  |                                ^^^^^^^
Rust 会 推断 如何捕获 v,因为 println! 只需要 v 的引用,闭包尝试借用 v

然而这有一个问题:Rust 不知道这个新建线程会执行多久,所以无法知晓 v 的引用是否一直有效

下面就展示了一个 v 的引用很可能不再有效的场景:

use std::thread;

fn main() {
    let v = vec![1, 2, 3];

    let handle = thread::spawn(|| {
        println!("Here's a vector: {:?}", v);
    });

    drop(v); // oh no!

    handle.join().unwrap();
}
     假如这段代码能正常运行的话,则新建线程则可能会立刻被转移到后台并完全没有机会运行

     新建线程内部有一个 v 的引用,不过主线程立刻就使用 drop 丢弃了 v

     接着当新建线程开始执行,v 已不再有效,所以其引用也是无效的

为了修复前面的编译错误,可以听取错误信息的建议:

help: to force the closure to take ownership of `v` (and any other referenced
variables), use the `move` keyword
  |
6 |     let handle = thread::spawn(move || {
  |                                ^^^^^^^

通过在闭包之前增加 move 关键字, 强制 闭包 获取使用的值的所有权 ,而不是任由 Rust 推断它应该借用值。这可以按照预期编译并运行:

use std::thread;

fn main() {
    let v = vec![1, 2, 3];

    let handle = thread::spawn(move || {
        println!("Here's a vector: {:?}", v);
    });

    handle.join().unwrap();
}

如果为闭包增加 move,将会把 v 移动进闭包的环境中,如此将不能在主线程中对其调用 drop 了。会得到如下不同的编译错误:

error[E0382]: use of moved value: `v`
  --> src/main.rs:10:10
   |
6  |     let handle = thread::spawn(move || {
   |                                ------- value moved (into closure) here
...
10 |     drop(v); // oh no!
   |          ^ value used here after move
   |
   = note: move occurs because `v` has type `std::vec::Vec<i32>`, which does
   not implement the `Copy` trait
这里Rust 的所有权规则又一次帮助了我们!

最开始的错误是因为 Rust 是保守的并只会为线程借用 v,这意味着主线程理论上可能使新建线程的引用无效

然而通过告诉 Rust 将 v 的所有权移动到新建线程,向 Rust 保证主线程不会再使用 v。当在主线程中使用 v 时就会违反所有权规则

move 关键字覆盖了 Rust 默认保守的借用:其也不允许违反所有权规则

使用消息传递在线程间传送数据

    一个日益流行的确保安全并发的方式是消息传递,线程或 actor 通过发送包含数据的消息来相互沟通

    这个思想来源于“Go 编程语言文档”中的口号:“不要共享内存来通讯;而是要通讯来共享内存”

Rust 中一个实现消息传递并发的主要工具是 通道 ,Rust 标准库提供了其实现的编程概念。通道有两部分组:

  • 发送者:代码中的一部分调用发送者的方法以及希望发送的数据,
  • 接收者:代码中另一部分用来检查接受到的消息
  • 当发送者或接收者任一被丢弃时可以认为通道被 关闭
    这里将开发一个程序,它会在一个线程生成值向通道发送,而在另一个线程会接收值并打印出来。接下来会通过通道在线程间发送简单值来演示这个功能

    一旦熟悉了这项技术,就能使用通道来实现聊天系统,或利用很多线程进行分布式计算并将部分计算结果发送给一个线程进行聚合

首先,创建了一个通道但没有做任何事:

use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
}
  注意这还不能编译,因为 Rust 不知道想要在通道中发送什么类型

这里使用 mpsc::channel 函数 创建 一个新的通道

mpsc 是 多个生产者,单个消费者的缩写。简而言之,Rust 标准库实现通道的方式意味着一个通道可以有多个产生值的发送端,但只能有一个消费这些值的接收端

想象一下多条小河小溪最终汇聚成大河:所有通过这些小河发出的东西最后都会来到大河的下游

目前以单个生产者开始,但是当示例可以工作后会增加多个生产者

mpsc::channel 函数返回一个 元组 :第一个元素是 发送端 ,而第二个元素是 接收端

    由于历史原因,tx 和 rx 通常作为 发送者和 接收者的缩写,所以用来绑定这两端变量的名字

    这里使用了一个 let 语句和模式来解构了此元组;以后会讨论 let 语句中的模式和解构。这是一个方便提取 mpsc::channel 返回的元组中一部分的手段

将发送端移动到一个新建线程中并发送一个字符串,这样新建线程就可以和主线程通讯了:

use std::thread;
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });
}
    这里再次使用 thread::spawn 来创建一个新线程并使用 move 将 tx 移动到闭包中这样新建线程就拥有 tx 了

    因为新建线程需要拥有通道的发送端以便可以通过通道来发送消息

通道的发送端有一个 send 方法用来获取需要放入通道的值。send 方法返回一个 Result<T, E> 类型:如果接收端已经被丢弃了,将没有发送值的目标,所以发送操作会返回错误

    在这个例子中,出错的时候调用 unwrap 产生 panic

    不过对于一个真实程序,需要合理地处理它

现在主线程中从通道的接收端获取值:

use std::thread;
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });

    let received = rx.recv().unwrap();
    println!("Got: {}", received);
}

通道的接收端有两个有用的方法 recvtry_recv

  • recv: 阻塞 主线程执行直到 从通道中接收一个值
    • 一旦发送了一个值,recv 会在一个 Result<T, E> 中返回它
    • 当通道发送端关闭,recv 会返回一个错误表明不会再有新的值到来了
  • try_recv: 不会阻塞,相反它立刻返回一个 Result<T, E>
    • Ok 值包含可用的信息
    • Err 值代表此时没有任何消息
    如果线程在等待消息过程中还有其他工作时使用 try_recv 很有用:可以编写一个循环来频繁调用 try_recv,在有可用消息时进行处理,其余时候则处理一会其他工作直到再次检查

    出于简单的考虑,这个例子使用了 recv;主线程中除了等待消息之外没有任何其他工作,所以阻塞主线程是合适的

运行上面的示例后,将会看到主线程打印出这个值:

Got: hi

通道与所有权转移

     所有权规则在消息传递中扮演了重要角色,其有助于编写安全的并发代码。防止并发编程中的错误是在 Rust 程序中考虑所有权的一大优势

现在做一个试验来看看通道与所有权如何一同协作以避免产生问题:尝试在新建线程中的通道中发送完 val 值 之后 再使用它:

use std::thread;
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
        println!("val is {}", val);
    });

    let received = rx.recv().unwrap();
    println!("Got: {}", received);
}

这里尝试在通过 tx.send 发送 val 到通道中之后将其打印出来。允许这么做是一个坏主意:一旦将值发送到另一个线程后,那个线程可能会在再次使用它之前就将其修改或者丢弃。其他线程对值可能的修改会由于不一致或不存在的数据而导致错误或意外的结果。然而,尝试编译代码时,Rust 会给出一个错误:

error[E0382]: use of moved value: `val`
  --> src/main.rs:10:31
   |
9  |         tx.send(val).unwrap();
   |                 --- value moved here
10 |         println!("val is {}", val);
   |                               ^^^ value used here after move
   |
   = note: move occurs because `val` has type `std::string::String`, which does not implement the `Copy` trait

这里的并发错误会造成一个编译时错误。send 函数获取其参数的所有权并移动这个值归接收者所有。这可以防止在发送后再次意外地使用这个值;所有权系统检查一切是否合乎规则

发送多个值并观察接收者的等待

     上面示例的代码可以编译和运行,不过它并没有明确的告诉我们两个独立的线程通过通道相互通讯

下面示例有一些改进来证明是并发执行的:新建线程现在会发送多个消息并在每个消息之间暂停一秒钟

use std::thread;
use std::sync::mpsc;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {}", received);
    }
}
     在新建线程中有一个字符串 vector 希望发送到主线程。遍历他们,单独的发送每一个字符串并通过一个 Duration 值调用 thread::sleep 函数来暂停一秒

     在主线程中,不再显式调用 recv 函数:而是将 rx 当作一个迭代器。对于每一个接收到的值,将其打印出来。当通道被关闭时,迭代器也将结束

当运行示例代码时,将看到如下输出,每一行都会暂停一秒:

Got: hi
Got: from
Got: the
Got: thread
   因为主线程中的 for 循环里并没有任何暂停或等待的代码,所以可以说主线程是在等待从新建线程中接收值

通过克隆发送者来创建多个生产者

   之前提到了mpsc是 multiple producer, single consumer 的缩写

可以运用 mpsc 来来创建向 同一接收者 发送 值的 多个线程 。这可以通过 克隆 通道的 发送端 来做到:

// --snip--

let (tx, rx) = mpsc::channel();

let tx1 = mpsc::Sender::clone(&tx);
thread::spawn(move || {
    let vals = vec![
        String::from("hi"),
        String::from("from"),
        String::from("the"),
        String::from("thread"),
    ];

    for val in vals {
        tx1.send(val).unwrap();
        thread::sleep(Duration::from_secs(1));
    }
});

thread::spawn(move || {
    let vals = vec![
        String::from("more"),
        String::from("messages"),
        String::from("for"),
        String::from("you"),
    ];

    for val in vals {
        tx.send(val).unwrap();
        thread::sleep(Duration::from_secs(1));
    }
});

for received in rx {
    println!("Got: {}", received);
}
     这一次,在创建新线程之前,对通道的发送端调用了 clone 方法,这会给我们一个可以传递给第一个新建线程的发送端句柄

     我们会将原始的通道发送端传递给第二个新建线程,这样就会有两个线程,每个线程将向通道的接收端发送不同的消息

如果运行这些代码,可能会看到这样的输出:

Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you
     虽然可能会看到这些值以不同的顺序出现;这依赖于你的系统。这也就是并发既有趣又困难的原因

     如果通过 thread::sleep 做实验,在不同的线程中提供不同的值,就会发现他们的运行更加不确定,且每次都会产生不同的输出

共享状态并发

  在某种程度上,任何编程语言中的通道都类似于单所有权,因为一旦将一个值传送到通道中,将无法再使用这个值

共享内存类似于多所有权:多个线程可以同时访问相同的内存位置

智能指针如何使得多所有权成为可能,然而这会增加额外的复杂性,因为需要以某种方式管理这些不同的所有者

Rust 的类型系统和所有权规则极大的协助了正确地管理这些所有权

作为一个例子,来看看互斥器,一个更为常见的共享内存并发原语

互斥器

互斥器 是 mutual exclusion 的缩写,也就是说,任意时刻,其只 允许 一个线程 访问 某些数据 。为了访问互斥器中的数据,线程首先需要通过获取互斥器的 来表明其希望访问数据。锁是一个作为互斥器一部分的数据结构,它记录谁有数据的排他访问权。因此,描述互斥器为通过锁系统 保护 其数据。互斥器以难以使用著称,因为你不得不记住:

  • 在使用数据之前尝试获取锁
  • 处理完被互斥器所保护的数据之后,必须解锁数据,这样其他线程才能够获取锁
     作为一个现实中互斥器的例子,想象一下在某个会议的一次小组座谈会中,只有一个麦克风

     如果一位成员要发言,他必须请求或表示希望使用麦克风。一旦得到了麦克风,他可以畅所欲言,然后将麦克风交给下一位希望讲话的成员

     如果一位成员结束发言后忘记将麦克风交还,其他人将无法发言。如果对共享麦克风的管理出现了问题,座谈会将无法如期进行!

     正确的管理互斥器异常复杂,这也是许多人之所以热衷于通道的原因。然而,在 Rust 中,得益于类型系统和所有权,帮助不会在锁和解锁上出错

Mutex<T> 的API

作为展示如何使用互斥器的例子,从在单线程上下文使用互斥器开始:

use std::sync::Mutex;

fn main() {
    let m = Mutex::new(5);

    {
        let mut num = m.lock().unwrap();
        *num = 6;
    }

    println!("m = {:?}", m);
}
  • 像很多类型一样,使用关联函数 new创建 一个 Mutex<T>
  • 使用 lock 方法获取锁,以访问互斥器中的数据
    • 这个调用会 阻塞 当前线程,直到 拥有 锁为止
    • 如果另一个线程 拥有 锁,并且那个线程 panic 了,则 lock 调用会 失败 。在这种情况下,没人能够再获取锁,所以这里选择 unwrap 并在遇到这种情况时使线程 panic
  • 一旦获取了锁,就可以将 返回值 视为一个其内部数据的可变引用了
      类型系统确保了在使用 m 中的值之前获取锁:Mutex<i32> 并不是一个 i32,所以必须获取锁才能使用这个 i32 值

      我们是不会忘记这么做的,因为反之类型系统不允许访问内部的 i32 值

      同时lock 调用返回 一个叫做 MutexGuard 的智能指针

      这个智能指针实现了 Deref 来指向其内部数据;其也提供了一个 Drop 实现当 MutexGuard 离开作用域时自动释放锁

      这发生于内部作用域的结尾。为此,不会冒忘记释放锁并阻塞互斥器为其它线程所用的风险,因为锁的释放是自动发生的

丢弃了锁之后,可以打印出互斥器的值,并发现能够将其内部的 i32 改为 6

在线程间共享 Mutex<T>

现在尝试使用 Mutex<T> 在多个线程间共享值。为此将启动十个线程,并在各个线程中对同一个计数器值加一,这样计数器将从 0 变为 10:

use std::sync::Mutex;
use std::thread;

fn main() {
    let counter = Mutex::new(0);
    let mut handles = vec![];

    for _ in 0..10 {
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}
      这里创建了一个 counter 变量来存放内含 i32 的 Mutex<T>

      接下来遍历 range 创建了 10 个线程。使用了 thread::spawn 并对所有线程使用了相同的闭包:
      他们每一个都将调用 lock 方法来获取 Mutex<T> 上的锁,接着将互斥器中的值加一
      当一个线程结束执行,num 会离开闭包作用域并释放锁,这样另一个线程就可以获取它了

      在主线程中,收集了所有的 join 句柄,调用它们的 join 方法来确保所有线程都会结束。这时,主线程会获取锁并打印出程序的结果

但是这个例子却无法编译:

error[E0382]: use of moved value: `counter`
  --> src/main.rs:9:36
   |
9  |         let handle = thread::spawn(move || {
   |                                    ^^^^^^^ value moved into closure here,
in previous iteration of loop
10 |             let mut num = counter.lock().unwrap();
   |                           ------- use occurs due to use in closure
   |
   = note: move occurs because `counter` has type `std::sync::Mutex<i32>`, which does not implement the `Copy` trait
      错误信息表明 counter 值在上一次循环中被移动了,所以 Rust 告诉我们不能将 counter 锁的所有权移动到多个线程中

      我们可以通过前面讨论过的“多所有权”技术来修复这个编译错误

多线程和多所有权

      曾经通过使用智能指针 Rc<T> 来创建引用计数的值,以便拥有多所有者。在这也这么做看看会发生什么

将 Mutex<T> 封装进 Rc<T> 中并在将所有权移入线程之前克隆了 Rc<T>:

use std::rc::Rc;
use std::sync::Mutex;
use std::thread;

fn main() {
    let counter = Rc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Rc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

再一次编译并…出现了不同的错误!

error[E0277]: `std::rc::Rc<std::sync::Mutex<i32>>` cannot be sent between threads safely
  --> src/main.rs:11:22
   |
11 |         let handle = thread::spawn(move || {
   |                      ^^^^^^^^^^^^^ `std::rc::Rc<std::sync::Mutex<i32>>`
cannot be sent between threads safely
   |
   = help: within `[closure@src/main.rs:11:36: 14:10
counter:std::rc::Rc<std::sync::Mutex<i32>>]`, the trait `std::marker::Send`
is not implemented for `std::rc::Rc<std::sync::Mutex<i32>>`
   = note: required because it appears within the type
`[closure@src/main.rs:11:36: 14:10 counter:std::rc::Rc<std::sync::Mutex<i32>>]`
   = note: required by `std::thread::spawn`
      第一行错误表明 `std::rc::Rc<std::sync::Mutex<i32>>` cannot be sent between threads safely

      编译器也告诉了我们原因 the trait bound `Send` is not satisfied。下一部分会讲到 Send:这是确保所使用的类型可以用于并发环境的 trait 之一

不幸的是, Rc<T>不能安全的线程间共享

当 Rc<T> 管理引用计数时,它必须在每一个 clone 调用时增加计数,并在每一个克隆被丢弃时减少计数

Rc<T> 并没有使用任何并发原语,来确保改变计数的操作不会被其他线程打断

在计数出错时可能会导致诡异的 bug,比如可能会造成内存泄漏,或在使用结束之前就丢弃一个值

我们所需要的是一个完全类似 Rc<T>,又以一种线程安全的方式改变引用计数的类型

原子引用计数 Arc<T>

所幸 Arc<T> 正是 这么一个类似 Rc<T> 并可以安全的用于并发环境的类型

      字母 a 代表 原子性,所以这是一个原子引用计数类型。原子性是另一类这里还未涉及到的并发原语,其中的要点就是:原子性类型工作起来类似原始类型,不过可以安全的在线程间共享

      为什么不是所有的原始类型都是原子性的?为什么不是所有标准库中的类型都默认使用 Arc<T> 实现?

      原因在于线程安全带有性能惩罚,希望只在必要时才为此买单。如果只是在单线程中对值进行操作,原子性提供的保证并无必要,代码可以因此运行的更快。

回到之前的例子:Arc<T> 和 Rc<T> 有着相同的 API,所以修改程序中的 use 行和 new 调用:

use std::sync::{Mutex, Arc};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

终于打印出:

Result: 10
      这个例子中构建的结构可以用于比增加计数更为复杂的操作

      使用这个策略,可将计算分成独立的部分,分散到多个线程中,接着使用 Mutex<T> 使用各自的结算结果更新最终的结果

RefCell<T>/Rc<T> 与 Mutex<T>/Arc<T> 的相似性

     因为 counter 是不可变的,不过可以获取其内部值的可变引用;这意味着 Mutex<T> 提供了内部可变性

     就像 Cell 系列类型那样。正如前面使用 RefCell<T> 可以改变 Rc<T> 中的内容那样,同样的可以使用 Mutex<T> 来改变 Arc<T> 中的内容

     另一个值得注意的细节是 Rust 不能避免使用 Mutex<T> 的全部逻辑错误

     就像使用 Rc<T> 就有造成引用循环的风险,这时两个 Rc<T> 值相互引用,造成内存泄露。同理 Mutex<T> 也有造成”死锁“的风险。这发生于当一个操作需要锁住两个资源而两个线程各持一个锁,这会造成它们永远相互等待

使用 Sync 和 Send trait 的可扩展并发

Rust 的并发模型中一个有趣的方面是:语言本身对并发知之甚少,之前讨论的几乎所有内容,都属于标准库,而不是语言本身的内容

由于不需要语言提供并发相关的基础设施,并发方案不受标准库或语言所限:可以编写自己的或使用别人编写的并发功能

然而有两个并发概念是内嵌于语言中的: std::marker 中的 SyncSend trait

通过 Send 允许在线程间转移所有权

Send 标记 trait 表明 类型的所有权 可以在 线程间 传递

几乎所有的 Rust 类型都是Send 的,不过有一些例外,包括 Rc<T>:这是不能 Send 的。如果克隆了 Rc<T> 的值并尝试将克隆的所有权转移到另一个线程,这两个线程都可能同时更新引用计数,而Rc<T> 被实现为用于单线程场景,这时不需要为拥有线程安全的引用计数而付出性能代价

Rust 类型系统和 trait bound 确保永远也不会意外的将不安全的 Rc<T> 在线程间发送,当尝试这么做的时候,会得到错误 the trait Send is not implemented for Rc<Mutex<i32>>。而使用标记为 Send 的 Arc<T> 时,就没有问题了

任何完全由 Send 的类型组成的类型也会自动被标记为 Send。几乎所有基本类型都是 Send 的

Sync 允许多线程访问

Sync 标记 trait 表明一个实现了 Sync 的类型可以 安全的多个线程拥有 其值的引用

  • 对于任意类型 T,如果 &T(T 的引用)是 Send 的话 T 就是 Sync 的,这意味着其引用就可以安全的发送到另一个线程
  • 类似于 Send 的情况,基本类型是 Sync 的,完全由 Sync 的类型组成的类型也是 Sync 的
智能指针 Rc<T> 也不是 Sync 的,出于其不是 Send 相同的原因,RefCell<T>和 Cell<T> 系列类型不是 Sync 的,RefCell<T> 在运行时所进行的借用检查也不是线程安全的

Mutex<T> 是 Sync 的,正如 “在线程间共享 Mutex<T>” 部分所讲的它可以被用来在多线程中共享访问

手动实现 Send 和 Sync 是不安全的

通常并不需要手动实现 Send 和 Sync trait,因为由 Send 和 Sync 的类型组成的类型,自动就是 Send 和 Sync 的。因为他们是标记 trait,甚至都不需要实现任何方法。他们只是用来加强并发相关的不可变性的

     手动实现这些标记 trait 涉及到编写不安全的 Rust 代码,以后将会讲述具体的方法

     当前重要的是,在创建新的由不是 Send 和 Sync 的部分构成的并发类型时需要多加小心,以确保维持其安全保证

总结

  • Rust 提供了用于消息传递的通道,和像 Mutex<T> 和 Arc<T> 这样可以安全的用于并发上下文的智能指针
  • 类型系统和借用检查器会确保这些场景中的代码,不会出现数据竞争和无效的引用
    一旦代码可以编译了,就可以坚信这些代码可以正确的运行于多线程环境,而不会出现其他语言中经常出现的那些难以追踪的 bug

    并发编程不再是什么可怕的概念:无所畏惧地并发吧!

Next:面向对象

Previous:智能指针

Home: 目录