使用消息传递在线程之间传输数据

确保安全并发的一种越来越流行的方法是消息 passing,其中线程或 Actor 通过相互发送消息进行通信 包含数据。这是 Go 语言中的口号中的想法 文档: “不要通过共享内存来交流;相反,通过通信来共享内存。

为了实现消息发送并发,Rust 的标准库提供了一个 通道的实现。通道是一个通用的编程概念,由 哪些数据从一个线程发送到另一个线程。

你可以把编程中的通道想象成一个定向通道 水,例如溪流或河流。如果你放一个类似橡皮鸭的东西 进入河流,它将顺流而下到达水道的尽头。

通道有两半:发射器和接收器。发射器的一半是 将橡皮鸭放入河中的上游位置,以及 接收器的一半是橡皮鸭子最终到达下游的地方。您的一部分 code 使用您要发送的数据调用 transmitter 上的方法,以及 另一部分检查接收端是否有到达的消息。表示一个频道 如果发射器或接收器的一半被丢弃,则关闭

在这里,我们将创建一个程序,该程序具有一个线程来生成值和 将它们发送到一个通道,另一个线程将接收值和 打印出来。我们将使用通道在线程之间发送简单值 来说明该功能。熟悉该技术后,您可以 将 Channel 用于需要相互通信的任何线程,例如 作为聊天系统或许多线程执行部分计算的系统 并将各部分发送到一个聚合结果的线程。

首先,在示例 16-6 中,我们将创建一个 channel,但不对它做任何事情。 请注意,这还不会编译,因为 Rust 无法判断我们 想要通过通道发送。

文件名: src/main.rs

use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
}

示例 16-6:创建一个 channel 并分配两个 tx rx 的一半

我们使用该函数创建一个新通道; 代表多个生产者、单个消费者。简而言之,Rust 的标准库 implements channels 意味着一个 channel 可以有多个发送端,这些 生成值,但只有一个接收端使用这些值。想象 多条溪流汇成一条大河:一切顺流而下 的溪流最终会汇入一条河流。我们将从一个 producer 的 Producer 创建,但当我们获得此示例时,我们将添加多个 producer 加工。mpsc::channelmpsc

该函数返回一个元组,其第一个元素是 发送端 — 发送端 — 第二个元素是接收端 — 该 接收器。这些缩写 和 传统上用于许多领域 对于发射器接收器,因此我们这样命名我们的变量 以指示每个端点。我们使用的语句的模式是 解构 Tuples;我们将讨论语句中模式的使用 和解构。现在,请知道使用语句 这种方式是提取返回的 Tuples 片段的便捷方法 由。mpsc::channeltxrxletletletmpsc::channel

让我们将传输端移动到一个生成的线程中,并让它发送一个 string,以便生成的线程与主线程通信,如 示例 16-7.这就像在上游的河里放一只橡皮鸭子或 将聊天消息从一个线程发送到另一个线程。

文件名: src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });
}

示例 16-7:将 tx 移动到一个生成的线程并发送 “嗨”

同样,我们用于创建新线程,然后使用 移动到闭包中,以便生成的线程拥有 .生成的 thread 需要拥有发送器才能通过 渠道。发射器有一个方法,它接受我们想要的值 发送。该方法返回一个类型,因此如果接收方具有 已经被丢弃,并且没有地方可以发送值,则 send作 将返回错误。在这个例子中,我们调用 panic 以防万一 的 error。但在实际应用程序中,我们会正确处理它:return 到 第 9 章回顾正确处理错误的策略。thread::spawnmovetxtxsendsendResult<T, E>unwrap

在示例 16-8 中,我们将从主线程中的接收器获取值。这 就像从河尽头的水中捞出橡皮鸭,或者 接收聊天消息。

文件名: src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });

    let received = rx.recv().unwrap();
    println!("Got: {received}");
}

示例 16-8:在主线程中接收值 “hi” 并打印它

接收器有两种有用的方法: 和 。我们正在使用 , receive 的缩写,它将阻塞主线程的执行并等待 直到沿通道发送值。发送值后,将 以 .当发射器关闭时,将返回 一个 error 来表示不会有更多值。recvtry_recvrecvrecvResult<T, E>recv

该方法不会阻塞,而是会立即返回一个:一个值,如果消息可用,则包含一个值,如果这次没有任何消息。在以下情况下使用 is useful 这个线程在等待消息时还有其他工作要做:我们可以编写一个 循环,如果 available,否则会执行其他工作一段时间,直到选中 再。try_recvResult<T, E>OkErrtry_recvtry_recv

为了简单起见,我们在此示例中使用了 API;我们没有任何其他工作 对于主线程来说,除了等待消息之外,所以阻塞主线程 thread 是合适的。recv

当我们运行示例 16-8 中的代码时,我们会看到从 main 线:

Got: hi

完善!

渠道和所有权转移

所有权规则在消息发送中起着至关重要的作用,因为它们可以帮助您 编写安全的并发代码。防止并发编程中的错误是 在整个 Rust 程序中考虑所有权的优势。让我们开始吧 一个实验,用于展示频道和所有权如何协同工作来防止 问题:在完成 把它送到了 Channel 上。尝试编译示例 16-9 中的代码,看看为什么 不允许使用此代码:val

文件名: src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
        println!("val is {val}");
    });

    let received = rx.recv().unwrap();
    println!("Got: {received}");
}

示例 16-9:在我们发送 val 后尝试使用它 沿着英吉利海峡

在这里,我们尝试在通过 . 允许这将是一个坏主意:一旦该值已发送到另一个值 thread 的 Thread 中,该线程可以在我们尝试使用值 再。其他线程的修改可能会导致错误或 由于数据不一致或不存在而导致的意外结果。但是,Rust 提供 如果我们尝试编译示例 16-9 中的代码,则会出现错误:valtx.send

$ cargo run
   Compiling message-passing v0.1.0 (file:///projects/message-passing)
error[E0382]: borrow of moved value: `val`
  --> src/main.rs:10:26
   |
8  |         let val = String::from("hi");
   |             --- move occurs because `val` has type `String`, which does not implement the `Copy` trait
9  |         tx.send(val).unwrap();
   |                 --- value moved here
10 |         println!("val is {val}");
   |                          ^^^^^ value borrowed here after move
   |
   = note: this error originates in the macro `$crate::format_args_nl` which comes from the expansion of the macro `println` (in Nightly builds, run with -Z macro-backtrace for more info)

For more information about this error, try `rustc --explain E0382`.
error: could not compile `message-passing` (bin "message-passing") due to 1 previous error

我们的并发错误导致了编译时错误。函数 获取其参数的所有权,当值被移动时,接收器 拥有它的所有权。这可以防止我们再次意外地使用 value 发送后;所有权系统检查一切正常。send

发送多个值并看到接收方正在等待

示例 16-8 中的代码编译并运行,但它没有清楚地告诉我们 两个独立的线程通过频道相互通信。在列表中 16-10 我们做了一些修改,以证明示例 16-8 中的代码是 同时运行:生成的线程现在将发送多条消息,并且 在每条消息之间暂停一秒钟。

文件名: src/main.rs

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {received}");
    }
}

示例 16-10:发送多条消息并暂停 在每个

这一次,生成的线程有一个我们想要发送到的字符串向量 主线程。我们迭代它们,单独发送每个,然后暂停 通过调用值为 1 秒。thread::sleepDuration

在主线程中,我们不再显式调用该函数: 相反,我们将 iterator 视为一个迭代器。对于收到的每个值,我们是 打印它。当通道关闭时,迭代将结束。recvrx

运行示例 16-10 中的代码时,您应该会看到以下输出 每行之间有 1 秒的停顿:

Got: hi
Got: from
Got: the
Got: thread

因为我们没有任何代码会在 main thread 中,我们可以看出主线程正在等待接收来自 生成的线程。for

通过克隆发射机创建多个生产者

前面我们提到过,这是多个生产者的首字母缩写词, 单一消费者。让我们使用并扩展示例 16-10 中的代码 创建多个线程,这些线程都向同一个接收器发送值。我们能做到 所以通过克隆发射器,如示例 16-11 所示:mpscmpsc

文件名: src/main.rs

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    // --snip--

    let (tx, rx) = mpsc::channel();

    let tx1 = tx.clone();
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    thread::spawn(move || {
        let vals = vec![
            String::from("more"),
            String::from("messages"),
            String::from("for"),
            String::from("you"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {received}");
    }

    // --snip--
}

示例 16-11:从多个发送多条消息 生产者

这一次,在创建第一个生成的线程之前,我们调用 发射机。这将为我们提供一个新的发射器,我们可以传递给第一个 spawned thread 的 Thread 中。我们将原始 transmitter 传递给第二个生成的线程。 这为我们提供了两个线程,每个线程向一个接收器发送不同的消息。clone

当您运行代码时,您的输出应如下所示:

Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you

您可能会按其他顺序看到值,具体取决于您的系统。这是 是什么让并发既有趣又困难。如果你试验 ,在不同的线程中给它不同的值,每个线程都会运行 将更加不确定,并且每次都会创建不同的输出。thread::sleep

现在我们已经了解了渠道的工作原理,让我们看看 并发。

本文档由官方文档翻译而来,如有差异请以官方英文文档(https://doc.rust-lang.org/)为准