将我们的单线程服务器转变为多线程服务器
现在,服务器将依次处理每个请求,这意味着它不会 处理第二个连接,直到第一个连接完成处理。如果 server 收到越来越多的请求,则此串行执行会更少,并且 不太理想。如果服务器收到请求,需要很长时间才能 进程,后续请求将不得不等待,直到长请求 已完成,即使新请求可以快速处理。我们需要修复 但首先,我们要看看实际问题。
在当前 Server 实现中模拟慢速请求
我们将了解处理缓慢的请求如何影响向 我们当前的 Server 实现。示例 20-10 实现了处理请求 更改为 /sleep,并模拟缓慢响应,这将导致服务器进入睡眠状态 5 秒后才做出回应。
文件名: src/main.rs
use std::{ fs, io::{prelude::*, BufReader}, net::{TcpListener, TcpStream}, thread, time::Duration, }; // --snip-- fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); handle_connection(stream); } } fn handle_connection(mut stream: TcpStream) { // --snip-- let buf_reader = BufReader::new(&stream); let request_line = buf_reader.lines().next().unwrap().unwrap(); let (status_line, filename) = match &request_line[..] { "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), "GET /sleep HTTP/1.1" => { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "hello.html") } _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), }; // --snip-- let contents = fs::read_to_string(filename).unwrap(); let length = contents.len(); let response = format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}"); stream.write_all(response.as_bytes()).unwrap(); }
我们从现在切换到现在有三个病例。我们需要
在 to 的切片上显式匹配 模式匹配
字符串文本值; 不执行自动引用,并且
dereferencing 就像 Equality 方法一样。if
match
request_line
match
第一个分支与示例 20-9 中的块相同。第二臂
匹配对 /sleep 的请求。收到该请求后,服务器将
休眠 5 秒,然后渲染成功的 HTML 页面。第三个分支是
与示例 20-9 中的块相同。if
else
你可以看到我们的服务器是多么原始:真正的库会处理 以不那么冗长的方式识别多个请求!
使用 启动服务器。然后打开两个浏览器窗口:一个用于 http://127.0.0.1:7878/,另一个用于 http://127.0.0.1:7878/sleep。如果
像以前一样,您输入 / URI 几次,您将看到它快速响应。
但是,如果你输入 /sleep 然后加载 /,你会看到 / 等到它睡了整整 5 秒才加载。cargo run
sleep
我们可以使用多种技术来避免请求备份 一个缓慢的请求;我们将实现的是线程池。
使用线程池提高吞吐量
线程池是一组正在等待并准备好的衍生线程 处理任务。当程序收到新任务时,它会分配一个 线程添加到任务池中,该线程将处理该任务。这 池中的剩余线程可用于处理出现的任何其他任务 in 中。当第一个线程完成时 处理其任务时,它被返回到空闲线程池,准备处理 新任务。线程池允许您并发处理连接, 增加服务器的吞吐量。
我们将池中的线程数限制为较小的数字以保护我们 拒绝服务 (DoS) 攻击;如果我们让我们的程序创建一个新线程 对于每个收到的请求,有人向我们的 服务器可能会耗尽我们服务器的所有资源并磨练 停止处理请求。
因此,我们将拥有固定数量的
在池中等待的线程。传入的请求将发送到池
加工。池将维护传入请求的队列。每个
池中的线程将从这个队列中弹出一个请求,处理该请求,
,然后向队列请求另一个请求。有了这个设计,我们可以处理
到 requests 并发,其中 是线程数。如果每个
线程正在响应长时间运行的请求,后续请求仍可以
备份到队列中,但我们增加了长时间运行的请求的数量
我们可以在达到那个点之前处理。N
N
这种技术只是提高 Web 吞吐量的众多方法之一 服务器。您可以探索的其他选项包括 fork/join 模型、单线程异步 I/O 模型或多线程异步 I/O 模型。如果 您对此主题感兴趣,您可以阅读有关其他解决方案和 尝试实施它们;对于像 Rust 这样的低级语言,所有这些 选项是可能的。
在我们开始实现线程池之前,我们先谈谈如何使用 pool 应该看起来像 .当您尝试设计代码时,编写客户端 Interface First 可以帮助指导您的设计。编写代码的 API,使其 以您想要的方式构建;然后实现该功能 而不是实现功能,然后 设计公共 API。
类似于我们在第 12 章项目中使用测试驱动开发的方式, 我们将在此处使用编译器驱动的开发。我们将编写调用 函数,然后我们将查看编译器中的错误以确定 我们接下来应该更改什么才能让代码工作。但是,在我们这样做之前, 我们将探索不打算用作起点的技术。
为每个请求生成一个线程
首先,让我们来探讨一下,如果代码确实为
每一个连接。如前所述,这不是我们的最终计划,因为
问题,但这是一个
首先获取正常工作的多线程服务器的起点。然后,我们将添加
thread pool 作为一种改进,对比这两种解决方案将是
容易。示例 20-11 显示了生成新线程所需的更改
来处理循环中的每个流。main
for
文件名: src/main.rs
use std::{ fs, io::{prelude::*, BufReader}, net::{TcpListener, TcpStream}, thread, time::Duration, }; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); thread::spawn(|| { handle_connection(stream); }); } } fn handle_connection(mut stream: TcpStream) { let buf_reader = BufReader::new(&stream); let request_line = buf_reader.lines().next().unwrap().unwrap(); let (status_line, filename) = match &request_line[..] { "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), "GET /sleep HTTP/1.1" => { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "hello.html") } _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), }; let contents = fs::read_to_string(filename).unwrap(); let length = contents.len(); let response = format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}"); stream.write_all(response.as_bytes()).unwrap(); }
正如您在第 16 章中学到的那样,将创建一个新线程,然后
在新线程中运行 Closure 中的代码。如果你运行这段代码并在浏览器中加载 /sleep,然后在另外两个浏览器选项卡中加载 /sleep,你确实会看到
对 / 的请求不必等待 /sleep 完成。但是,由于
我们提到,这最终会压垮系统,因为你会让
没有任何限制的新线程。thread::spawn
创建有限数量的线程
我们希望我们的线程池以类似、熟悉的方式工作,因此从
线程池不需要对使用
我们的 API。示例 20-12 显示了我们想要使用的结构体的假设接口,而不是 。ThreadPool
thread::spawn
文件名: src/main.rs
use std::{
fs,
io::{prelude::*, BufReader},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming() {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
我们用于创建一个具有可配置编号的新线程池
线程,在本例中为 4 个。然后,在循环中,有一个
类似于 it takes a closure the pool 应该
run 的我们需要实现,因此需要
closure 并将其提供给池中的线程运行。此代码尚未
compile,但我们会尝试让编译器指导我们如何修复它。ThreadPool::new
for
pool.execute
thread::spawn
pool.execute
使用 Compiler Driven Development 构建 ThreadPool
将示例 20-12 中的 src/main.rs 修改,然后让我们使用
编译器错误来驱动我们的开发。这是第一个
我们得到的错误:cargo check
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve: use of undeclared type `ThreadPool`
--> src/main.rs:11:16
|
11 | let pool = ThreadPool::new(4);
| ^^^^^^^^^^ use of undeclared type `ThreadPool`
For more information about this error, try `rustc --explain E0433`.
error: could not compile `hello` (bin "hello") due to 1 previous error
伟大!这个错误告诉我们需要一个类型或模块,所以我们将
立即构建一个。我们的实现将独立于
我们的 Web 服务器正在做的工作。所以,让我们将 crate 从
binary crate 复制到一个库 crate 来保存我们的实现。后
我们改成一个库 crate,我们也可以使用单独的线程池
库来执行我们想要使用线程池做的任何工作,而不仅仅是用于服务
Web 请求。ThreadPool
ThreadPool
hello
ThreadPool
创建一个包含以下内容的 src/lib.rs,这是最简单的
我们现在可以拥有的结构体的定义:ThreadPool
文件名: src/lib.rs
pub struct ThreadPool;
然后编辑 main.rs 文件以从库中引入范围
crate,将以下代码添加到 src/main.rs 的顶部:ThreadPool
文件名: src/main.rs
use hello::ThreadPool;
use std::{
fs,
io::{prelude::*, BufReader},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming() {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
这段代码仍然不起作用,但让我们再次检查它以获取下一个错误 我们需要解决:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for struct `ThreadPool` in the current scope
--> src/main.rs:12:28
|
12 | let pool = ThreadPool::new(4);
| ^^^ function or associated item not found in `ThreadPool`
For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error
此错误表示接下来我们需要创建一个名为 的关联函数。我们也知道 需要有一个参数
它可以作为参数接受,并且应该返回一个实例。
让我们实现最简单的函数,该函数将具有这些
特性:new
ThreadPool
new
4
ThreadPool
new
文件名: src/lib.rs
pub struct ThreadPool;
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
ThreadPool
}
}
我们选择作为参数的类型,因为我们知道
负数线程没有任何意义。我们也知道我们将使用它
4 作为线程集合中的元素数,这就是类型的用途,如第 3 章的“整数类型”部分所述。usize
size
usize
让我们再次检查代码:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `execute` found for struct `ThreadPool` in the current scope
--> src/main.rs:17:14
|
17 | pool.execute(|| {
| -----^^^^^^^ method not found in `ThreadPool`
For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error
现在出现错误是因为 上没有方法 。
回想一下“创建有限数量的
Threads“部分,我们
决定我们的线程池应该有一个类似于 的接口。在
此外,我们将实现该函数,以便它接受它的
given,并将其提供给池中的空闲线程运行。execute
ThreadPool
thread::spawn
execute
我们将定义方法 on 以将闭包作为
参数。回想一下第 13 章的 “Moving Captured Values out of the Closure and the Fn
traits” 部分,我们可以采用
闭包作为具有三个不同特征的参数:、 、 和 。我们需要决定在这里使用哪种 closure 类型。我们知道我们会
最终做一些类似于标准库实现的事情,因此我们可以查看 Signature of 在其参数上的边界。该文档向我们展示了以下内容:execute
ThreadPool
Fn
FnMut
FnOnce
thread::spawn
thread::spawn
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
type 参数是我们在这里关注的参数;类型
parameter 与返回值相关,我们不关心这一点。我们
可以看到 用作绑定在 上的 trait 。这可能是
我们想要的,因为我们最终会将我们得到的参数传递给 。我们可以进一步确信这就是我们的特质
want to use,因为用于运行请求的线程只会执行该
request 的 close 值,它与 in 匹配。F
T
spawn
FnOnce
F
execute
spawn
FnOnce
Once
FnOnce
type 参数还具有 trait bound 和 lifetime bound ,这在我们的情况下很有用:我们需要将
从一个线程到另一个线程的 closure,因为我们不知道多长时间
线程将执行。让我们创建一个方法,该方法将接受具有这些边界的 type 的泛型参数:F
Send
'static
Send
'static
execute
ThreadPool
F
文件名: src/lib.rs
pub struct ThreadPool;
impl ThreadPool {
// --snip--
pub fn new(size: usize) -> ThreadPool {
ThreadPool
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
我们仍然使用 after,因为它表示一个闭包
,它不带任何参数并返回 Unit 类型 。就像功能一样
定义,则可以从签名中省略返回类型,但即使我们
没有参数,我们仍然需要括号。()
FnOnce
FnOnce
()
同样,这是该方法最简单的实现:它确实
什么都没有,但我们只是尝试让我们的代码编译。让我们再检查一次:execute
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.24s
它编译!但请注意,如果您尝试在
browser,您将在浏览器中看到我们在
章节。我们的库实际上还没有调用传递给的闭包!cargo run
execute
注意:您可能听说过使用严格编译器的语言,例如 Haskell 和 Rust 的意思是“如果代码编译,它就可以工作”。但这种说法并非如此 普遍正确。我们的项目可以编译,但它绝对不做任何事情!如果我们 正在构建一个真实、完整的项目,这将是一个开始的好时机 编写单元测试来检查代码是否可编译并具有 要。
验证 new
中的线程数
我们没有对 和 的参数执行任何作。让我们
使用我们想要的行为实现这些函数的主体。首先,
让我们考虑一下。之前我们为参数选择了 unsigned 类型,因为线程数为负数的池没有意义。
但是,线程为零的池也没有意义,但 Zero 是完美的
有效。我们将添加代码来检查之前是否大于零
我们返回一个实例,如果程序收到
zero 使用宏,如示例 20-13 所示。new
execute
new
size
usize
size
ThreadPool
assert!
文件名: src/lib.rs
pub struct ThreadPool;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
ThreadPool
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
我们还为 with doc comments 添加了一些文档。
请注意,我们遵循了良好的文档实践,添加了一个部分
调用函数可能 panic 的情况,如
第 14 章.尝试运行并单击结构体
以查看生成的 docs 是什么样子的!ThreadPool
cargo doc --open
ThreadPool
new
我们可以像在 I/O 中所做的那样,而不是像在这里那样添加宏并返回 a
project 中的实例。但是我们决定,在本例中,尝试创建一个
没有任何线程的线程池应该是不可恢复的错误。如果你是
雄心勃勃,尝试编写一个名为
签名与函数进行比较:assert!
new
build
Result
Config::build
build
new
pub fn build(size: usize) -> Result<ThreadPool, PoolCreationError> {
创建空间以存储线程
现在我们有了一种方法来知道我们有有效数量的线程要存储
池中,我们可以创建这些线程并将它们存储在结构体
在返回结构体之前。但是我们如何 “存储” 线程呢?让我们再来看一个
查看签名:ThreadPool
thread::spawn
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
该函数返回一个 ,其中 是
closure 返回。让我们也尝试使用,看看会发生什么。在我们的
case 中,我们传递给线程池的闭包将处理 Connection
并且不返回任何内容,因此将是 Unit 类型 。spawn
JoinHandle<T>
T
JoinHandle
T
()
示例 20-14 中的代码可以编译,但还没有创建任何线程。
我们更改了 的定义以保存实例的向量,将 向量的容量初始化为 ,设置了一个循环,该循环将运行一些代码来创建线程,并且
返回一个包含它们的实例。ThreadPool
thread::JoinHandle<()>
size
for
ThreadPool
文件名: src/lib.rs
use std::thread;
pub struct ThreadPool {
threads: Vec<thread::JoinHandle<()>>,
}
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let mut threads = Vec::with_capacity(size);
for _ in 0..size {
// create some threads and store them in the vector
}
ThreadPool { threads }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
我们在 library crate 中引入了范围,因为我们是
using 作为向量中项目的类型 。std::thread
thread::JoinHandle
ThreadPool
一旦收到有效大小,我们就会创建一个新的 vector,该 vector 可以
hold 项。该函数执行的任务与
向量。因为我们知道我们需要在 vector 中存储元素,所以执行
这种预先分配比使用 ,
它会在插入元素时调整自身大小。ThreadPool
size
with_capacity
Vec::new
size
Vec::new
当您再次运行时,它应该会成功。cargo check
负责将代码从 ThreadPool
发送到线程的 Worker
结构体
我们在示例 20-14 的循环中留下了一条关于创建
线程。在这里,我们将看看我们实际上是如何创建线程的。标准
library 提供创建线程的方法,并希望获得一些代码,线程应该在
线程。但是,在我们的例子中,我们想要创建线程并拥有
它们等待我们稍后发送的代码。标准库的
threads 的实现不包括任何执行此作的方法;我们必须
手动实现它。for
thread::spawn
thread::spawn
我们将通过在 和 线程之间引入一个新的数据结构来实现这个行为,该数据结构将管理这个新行为。我们将调用
this data structure Worker,这是池化中的常用术语
实现。Worker 拾取需要运行的代码,并运行
code 在 Worker 的线程中。想想在
restaurant:工作人员等待客户收到订单,然后
他们负责接受这些订单并完成它们。ThreadPool
不是在线程池中存储实例向量,
我们将存储结构体的实例。每个 API 将存储一个实例。然后我们将在该 Will 上实现一个方法
获取要运行的代码的闭包,并将其发送到已在运行的线程
执行。我们还将为每个 worker 提供一个 API,以便我们可以区分
日志记录或调试时池中的不同工作程序。JoinHandle<()>
Worker
Worker
JoinHandle<()>
Worker
id
以下是我们创建 .我们将
以这种方式进行设置后,实现将闭包发送到线程的代码:ThreadPool
Worker
- 定义一个包含 an 和 a 的结构体。
Worker
id
JoinHandle<()>
- 更改以保存实例向量。
ThreadPool
Worker
- 定义一个函数,该函数接受一个数字并返回一个实例,该实例包含和以空
关闭。
Worker::new
id
Worker
id
- 在 中,使用循环计数器生成一个 ,创建
a new 替换为 ,并将 worker 存储在 vector 中。
ThreadPool::new
for
id
Worker
id
如果您准备好迎接挑战,请尝试在此之前自行实施这些更改 看看示例 20-15 中的代码。
准备?这是示例 20-15,其中一种方法可以进行上述修改。
文件名: src/lib.rs
use std::thread;
pub struct ThreadPool {
workers: Vec<Worker>,
}
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool { workers }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize) -> Worker {
let thread = thread::spawn(|| {});
Worker { id, thread }
}
}
我们已将字段的名称从 更改为 ,因为它现在保存的是实例而不是实例。我们使用循环中的 counter 作为 的参数,并将每个 new 存储在名为 .ThreadPool
threads
workers
Worker
JoinHandle<()>
for
Worker::new
Worker
workers
外部代码(如我们在 src/main.rs 中的服务器)不需要知道
有关在 中使用结构的实现细节 ,
因此,我们将 struct 及其函数设为 private。该函数使用 we give it 并存储一个实例,该实例是使用空闭包生成新线程创建的。Worker
ThreadPool
Worker
new
Worker::new
id
JoinHandle<()>
注意:如果作系统无法创建线程,因为没有
足够的系统资源,将 panic。这将导致我们的
整个服务器都会导致 panic,即使创建某些线程可能会
成功。为了简单起见,这种行为很好,但在 Production 中
thread pool 实现,您可能希望改用 std::thread::Builder
及其返回的 spawn
方法。thread::spawn
Result
这段代码将编译并存储我们
指定为 的参数。但我们仍然没有处理
我们得到的 .让我们看看接下来如何做到这一点。Worker
ThreadPool::new
execute
通过通道向线程发送请求
我们要解决的下一个问题是 given to do 的 closure
绝对没有。目前,我们得到了要在方法中执行的闭包。但是我们需要给 run 一个 closure ,当我们
在创建 .thread::spawn
execute
thread::spawn
Worker
ThreadPool
我们希望刚刚创建的结构体能够获取要从中运行的代码
一个队列,并将该代码发送到其线程以运行。Worker
ThreadPool
我们在第 16 章中学到的通道 — 一种简单的通信方式
两个线程 - 将非常适合此使用案例。我们将使用一个 channel 来运作
作为作业队列,并将作业从
实例,它将 Job 发送到其线程。这是计划:execute
ThreadPool
Worker
- 将创建一个频道并保留发件人。
ThreadPool
- 每个都将抓住接收器。
Worker
- 我们将创建一个新的结构体,它将保存我们想要发送的闭包
沿着海峡而下。
Job
- 该方法将通过
寄件人。
execute
- 在其线程中,will 遍历其接收器并执行
关闭它收到的任何 job。
Worker
让我们首先创建一个通道并持有 sender
实例中,如示例 20-16 所示。结构体
目前不包含任何内容,但将是我们将发送的物品类型
频道。ThreadPool::new
ThreadPool
Job
文件名: src/lib.rs
use std::{sync::mpsc, thread};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool { workers, sender }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize) -> Worker {
let thread = thread::spawn(|| {});
Worker { id, thread }
}
}
在 中,我们创建新通道,并让池保存
寄件人。这将成功编译。ThreadPool::new
让我们尝试将通道的接收器作为线程池传递给每个 worker
创建频道。我们知道我们想在线程中使用接收器,
worker spawn,因此我们将在 Closure 中引用该参数。这
示例 20-17 中的代码还不能完全编译。receiver
文件名: src/lib.rs
use std::{sync::mpsc, thread};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, receiver));
}
ThreadPool { workers, sender }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
// --snip--
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
let thread = thread::spawn(|| {
receiver;
});
Worker { id, thread }
}
}
我们做了一些小而直接的改变:我们将接收器传递给 ,然后在闭包中使用它。Worker::new
当我们尝试检查此代码时,我们收到以下错误:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
--> src/lib.rs:26:42
|
21 | let (sender, receiver) = mpsc::channel();
| -------- move occurs because `receiver` has type `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
...
25 | for id in 0..size {
| ----------------- inside of this loop
26 | workers.push(Worker::new(id, receiver));
| ^^^^^^^^ value moved here, in previous iteration of loop
|
note: consider changing this parameter type in method `new` to borrow instead if owning the value isn't necessary
--> src/lib.rs:47:33
|
47 | fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
| --- in this method ^^^^^^^^^^^^^^^^^^^ this parameter takes ownership of the value
help: consider moving the expression out of the loop so it is only moved once
|
25 ~ let mut value = Worker::new(id, receiver);
26 ~ for id in 0..size {
27 ~ workers.push(value);
|
For more information about this error, try `rustc --explain E0382`.
error: could not compile `hello` (lib) due to 1 previous error
代码正在尝试传递给多个实例。这
不起作用,因为您会记得第 16 章:通道实现
Rust 提供多个生产者、单个消费者。这意味着我们不能
只需克隆通道的消费端即可修复此代码。我们也不
希望向多个使用者多次发送消息;我们想要一个列表
的消息,以便每条消息都得到处理一次。receiver
Worker
此外,从通道队列中取出作业涉及更改 ,因此线程需要一种安全的方式来共享和修改 ;
否则,我们可能会得到 race conditions (如 Chapter 16 所述)。receiver
receiver
回想一下第 16 章:共享中讨论的线程安全智能指针
所有权,并允许线程更改值,则
需要使用 .该类型将允许多个 worker 拥有
接收器,并确保只有一个 worker 从
接收器。示例 20-18 显示了我们需要做的更改。Arc<Mutex<T>>
Arc
Mutex
文件名: src/lib.rs
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
// --snip--
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
// --snip--
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
// --snip--
let thread = thread::spawn(|| {
receiver;
});
Worker { id, thread }
}
}
在 中,我们将接收器放在 an 和 a 中。对于每个
new worker,我们克隆 the 来增加引用计数,以便 worker 可以
接管人的股份所有权。ThreadPool::new
Arc
Mutex
Arc
通过这些更改,代码将编译!我们快到了!
实现 execute
方法
最后,让我们在 上实现该方法。我们还将把 trait 对象的 struct 改为 type alias,该对象持有
接收的 Closure 的 Closure 中。如“创建类型同义词
with Type Aliases“部分,类型别名允许我们缩短
易于使用。请看示例 20-19。execute
ThreadPool
Job
execute
文件名: src/lib.rs
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
// --snip--
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
// --snip--
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(|| {
receiver;
});
Worker { id, thread }
}
}
使用我们得到的 closed 创建一个新实例后,我们
将该作业发送到通道的发送端。我们呼吁发送失败的情况。例如,如果我们
停止执行所有线程,这意味着接收端已停止
接收新消息。目前,我们无法阻止我们的线程
执行:只要池存在,我们的线程就会继续执行。这
我们使用的原因是我们知道失败情况不会发生,但是
编译器不知道这一点。Job
execute
unwrap
send
unwrap
但我们还没有完全完成!在 worker 中,我们传递给的 closure 仍然只引用 channel 的接收端。
相反,我们需要 Closure 永远循环,要求
channel 的 channel,并在获取 job 时运行 Job。让我们做出改变
如 示例 20-20 所示。thread::spawn
Worker::new
文件名: src/lib.rs
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
// --snip--
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
});
Worker { id, thread }
}
}
在这里,我们首先调用 the 来获取互斥锁,然后我们
call 来出现任何错误时出现 panic。如果互斥锁
处于中毒状态,如果其他线程在
持有锁而不是释放锁。在这种情况下,调用 让这个线程 panic 是正确的作。随意
将此更改为 an with an error message that is meaningful to
你。lock
receiver
unwrap
unwrap
unwrap
expect
如果我们在互斥锁上获得锁,则调用 Receive a from
渠道。final 也会跳过此处可能发生的任何错误
如果包含发送方的线程已关闭,则类似于接收方关闭时该方法的返回方式。recv
Job
unwrap
send
Err
对 blocks 的调用,因此如果还没有 job,则当前线程将
等待作业可用。这可确保一次只有一个线程尝试请求作业。recv
Mutex<T>
Worker
我们的线程池现在处于工作状态!试一试
请求:cargo run
$ cargo run
Compiling hello v0.1.0 (file:///projects/hello)
warning: field is never read: `workers`
--> src/lib.rs:7:5
|
7 | workers: Vec<Worker>,
| ^^^^^^^^^^^^^^^^^^^^
|
= note: `#[warn(dead_code)]` on by default
warning: field is never read: `id`
--> src/lib.rs:48:5
|
48 | id: usize,
| ^^^^^^^^^
warning: field is never read: `thread`
--> src/lib.rs:49:5
|
49 | thread: thread::JoinHandle<()>,
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
warning: `hello` (lib) generated 3 warnings
Finished dev [unoptimized + debuginfo] target(s) in 1.40s
Running `target/debug/hello`
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
成功!现在,我们有一个异步执行连接的线程池。 创建的线程永远不会超过四个,因此我们的系统不会获得 如果服务器收到大量请求,则为 overloaded。如果我们向 /sleep 发出请求,服务器将能够通过让另一个 thread 运行它们。
注意:如果您同时在多个浏览器窗口中打开 /sleep,则它们 可能会以 5 秒的间隔一次加载一个。某些 Web 浏览器执行 出于缓存原因,按顺序执行同一请求的多个实例。这 限制不是由我们的 Web 服务器引起的。
在第 18 章中了解了循环之后,您可能想知道
为什么我们没有编写示例 20-21 所示的 worker 线程代码。while let
文件名: src/lib.rs
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
// --snip--
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
while let Ok(job) = receiver.lock().unwrap().recv() {
println!("Worker {id} got a job; executing.");
job();
}
});
Worker { id, thread }
}
}
此代码可编译并运行,但不会产生所需的线程
行为:慢速请求仍会导致其他请求等待
处理。原因有点微妙:结构没有公共方法,因为锁的所有权基于
在该方法返回的 内。在编译时,借用检查器可以强制执行该规则
除非我们持有
锁。但是,此实现也可能导致锁被持有
如果我们不注意 .Mutex
unlock
MutexGuard<T>
LockResult<MutexGuard<T>>
lock
Mutex
MutexGuard<T>
示例 20-20 中 use 的代码是有效的,因为 with , any
表达式中使用的 Temporary 值位于 equals 的右侧
sign 在语句结束时立即删除。但是,(and 和 ) 在
关联的块。在示例 20-21 中,锁在 continue continue 中保持
的调用,意味着其他 worker 无法接收 job。let job = receiver.lock().unwrap().recv().unwrap();
let
let
while let
if let
match
job()
本文档由官方文档翻译而来,如有差异请以官方英文文档(https://doc.rust-lang.org/)为准