使用消息传递在线程之间传输数据
确保安全并发的一种越来越流行的方法是消息 passing,其中线程或 Actor 通过相互发送消息进行通信 包含数据。这是 Go 语言中的口号中的想法 文档: “不要通过共享内存来交流;相反,通过通信来共享内存。
为了实现消息发送并发,Rust 的标准库提供了一个 通道的实现。通道是一个通用的编程概念,由 哪些数据从一个线程发送到另一个线程。
你可以把编程中的通道想象成一个定向通道 水,例如溪流或河流。如果你放一个类似橡皮鸭的东西 进入河流,它将顺流而下到达水道的尽头。
通道有两半:发射器和接收器。发射器的一半是 将橡皮鸭放入河中的上游位置,以及 接收器的一半是橡皮鸭子最终到达下游的地方。您的一部分 code 使用您要发送的数据调用 transmitter 上的方法,以及 另一部分检查接收端是否有到达的消息。表示一个频道 如果发射器或接收器的一半被丢弃,则关闭。
在这里,我们将创建一个程序,该程序具有一个线程来生成值和 将它们发送到一个通道,另一个线程将接收值和 打印出来。我们将使用通道在线程之间发送简单值 来说明该功能。熟悉该技术后,您可以 将 Channel 用于需要相互通信的任何线程,例如 作为聊天系统或许多线程执行部分计算的系统 并将各部分发送到一个聚合结果的线程。
首先,在示例 16-6 中,我们将创建一个 channel,但不对它做任何事情。 请注意,这还不会编译,因为 Rust 无法判断我们 想要通过通道发送。
文件名: src/main.rs
use std::sync::mpsc;
fn main() {
let (tx, rx) = mpsc::channel();
}
我们使用该函数创建一个新通道; 代表多个生产者、单个消费者。简而言之,Rust 的标准库
implements channels 意味着一个 channel 可以有多个发送端,这些
生成值,但只有一个接收端使用这些值。想象
多条溪流汇成一条大河:一切顺流而下
的溪流最终会汇入一条河流。我们将从一个
producer 的 Producer 创建,但当我们获得此示例时,我们将添加多个 producer
加工。mpsc::channel
mpsc
该函数返回一个元组,其第一个元素是
发送端 — 发送端 — 第二个元素是接收端 — 该
接收器。这些缩写 和 传统上用于许多领域
对于发射器和接收器,因此我们这样命名我们的变量
以指示每个端点。我们使用的语句的模式是
解构 Tuples;我们将讨论语句中模式的使用
和解构。现在,请知道使用语句
这种方式是提取返回的 Tuples 片段的便捷方法
由。mpsc::channel
tx
rx
let
let
let
mpsc::channel
让我们将传输端移动到一个生成的线程中,并让它发送一个 string,以便生成的线程与主线程通信,如 示例 16-7.这就像在上游的河里放一只橡皮鸭子或 将聊天消息从一个线程发送到另一个线程。
文件名: src/main.rs
use std::sync::mpsc; use std::thread; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let val = String::from("hi"); tx.send(val).unwrap(); }); }
同样,我们用于创建新线程,然后使用 移动到闭包中,以便生成的线程拥有 .生成的
thread 需要拥有发送器才能通过
渠道。发射器有一个方法,它接受我们想要的值
发送。该方法返回一个类型,因此如果接收方具有
已经被丢弃,并且没有地方可以发送值,则 send作
将返回错误。在这个例子中,我们调用 panic 以防万一
的 error。但在实际应用程序中,我们会正确处理它:return 到
第 9 章回顾正确处理错误的策略。thread::spawn
move
tx
tx
send
send
Result<T, E>
unwrap
在示例 16-8 中,我们将从主线程中的接收器获取值。这 就像从河尽头的水中捞出橡皮鸭,或者 接收聊天消息。
文件名: src/main.rs
use std::sync::mpsc; use std::thread; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let val = String::from("hi"); tx.send(val).unwrap(); }); let received = rx.recv().unwrap(); println!("Got: {received}"); }
接收器有两种有用的方法: 和 。我们正在使用 ,
receive 的缩写,它将阻塞主线程的执行并等待
直到沿通道发送值。发送值后,将
以 .当发射器关闭时,将返回
一个 error 来表示不会有更多值。recv
try_recv
recv
recv
Result<T, E>
recv
该方法不会阻塞,而是会立即返回一个:一个值,如果消息可用,则包含一个值,如果这次没有任何消息。在以下情况下使用 is useful
这个线程在等待消息时还有其他工作要做:我们可以编写一个
循环,如果
available,否则会执行其他工作一段时间,直到选中
再。try_recv
Result<T, E>
Ok
Err
try_recv
try_recv
为了简单起见,我们在此示例中使用了 API;我们没有任何其他工作
对于主线程来说,除了等待消息之外,所以阻塞主线程
thread 是合适的。recv
当我们运行示例 16-8 中的代码时,我们会看到从 main 线:
Got: hi
完善!
渠道和所有权转移
所有权规则在消息发送中起着至关重要的作用,因为它们可以帮助您
编写安全的并发代码。防止并发编程中的错误是
在整个 Rust 程序中考虑所有权的优势。让我们开始吧
一个实验,用于展示频道和所有权如何协同工作来防止
问题:在完成
把它送到了 Channel 上。尝试编译示例 16-9 中的代码,看看为什么
不允许使用此代码:val
文件名: src/main.rs
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
println!("val is {val}");
});
let received = rx.recv().unwrap();
println!("Got: {received}");
}
在这里,我们尝试在通过 .
允许这将是一个坏主意:一旦该值已发送到另一个值
thread 的 Thread 中,该线程可以在我们尝试使用值
再。其他线程的修改可能会导致错误或
由于数据不一致或不存在而导致的意外结果。但是,Rust 提供
如果我们尝试编译示例 16-9 中的代码,则会出现错误:val
tx.send
$ cargo run
Compiling message-passing v0.1.0 (file:///projects/message-passing)
error[E0382]: borrow of moved value: `val`
--> src/main.rs:10:26
|
8 | let val = String::from("hi");
| --- move occurs because `val` has type `String`, which does not implement the `Copy` trait
9 | tx.send(val).unwrap();
| --- value moved here
10 | println!("val is {val}");
| ^^^^^ value borrowed here after move
|
= note: this error originates in the macro `$crate::format_args_nl` which comes from the expansion of the macro `println` (in Nightly builds, run with -Z macro-backtrace for more info)
For more information about this error, try `rustc --explain E0382`.
error: could not compile `message-passing` (bin "message-passing") due to 1 previous error
我们的并发错误导致了编译时错误。函数
获取其参数的所有权,当值被移动时,接收器
拥有它的所有权。这可以防止我们再次意外地使用 value
发送后;所有权系统检查一切正常。send
发送多个值并看到接收方正在等待
示例 16-8 中的代码编译并运行,但它没有清楚地告诉我们 两个独立的线程通过频道相互通信。在列表中 16-10 我们做了一些修改,以证明示例 16-8 中的代码是 同时运行:生成的线程现在将发送多条消息,并且 在每条消息之间暂停一秒钟。
文件名: src/main.rs
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("Got: {received}");
}
}
这一次,生成的线程有一个我们想要发送到的字符串向量
主线程。我们迭代它们,单独发送每个,然后暂停
通过调用值为
1 秒。thread::sleep
Duration
在主线程中,我们不再显式调用该函数:
相反,我们将 iterator 视为一个迭代器。对于收到的每个值,我们是
打印它。当通道关闭时,迭代将结束。recv
rx
运行示例 16-10 中的代码时,您应该会看到以下输出 每行之间有 1 秒的停顿:
Got: hi
Got: from
Got: the
Got: thread
因为我们没有任何代码会在
main thread 中,我们可以看出主线程正在等待接收来自
生成的线程。for
通过克隆发射机创建多个生产者
前面我们提到过,这是多个生产者的首字母缩写词,
单一消费者。让我们使用并扩展示例 16-10 中的代码
创建多个线程,这些线程都向同一个接收器发送值。我们能做到
所以通过克隆发射器,如示例 16-11 所示:mpsc
mpsc
文件名: src/main.rs
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
// --snip--
let (tx, rx) = mpsc::channel();
let tx1 = tx.clone();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx1.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
thread::spawn(move || {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("Got: {received}");
}
// --snip--
}
这一次,在创建第一个生成的线程之前,我们调用
发射机。这将为我们提供一个新的发射器,我们可以传递给第一个
spawned thread 的 Thread 中。我们将原始 transmitter 传递给第二个生成的线程。
这为我们提供了两个线程,每个线程向一个接收器发送不同的消息。clone
当您运行代码时,您的输出应如下所示:
Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you
您可能会按其他顺序看到值,具体取决于您的系统。这是
是什么让并发既有趣又困难。如果你试验 ,在不同的线程中给它不同的值,每个线程都会运行
将更加不确定,并且每次都会创建不同的输出。thread::sleep
现在我们已经了解了渠道的工作原理,让我们看看 并发。
本文档由官方文档翻译而来,如有差异请以官方英文文档(https://doc.rust-lang.org/)为准