将我们的单线程服务器转变为多线程服务器

现在,服务器将依次处理每个请求,这意味着它不会 处理第二个连接,直到第一个连接完成处理。如果 server 收到越来越多的请求,则此串行执行会更少,并且 不太理想。如果服务器收到请求,需要很长时间才能 进程,后续请求将不得不等待,直到长请求 已完成,即使新请求可以快速处理。我们需要修复 但首先,我们要看看实际问题。

在当前 Server 实现中模拟慢速请求

我们将了解处理缓慢的请求如何影响向 我们当前的 Server 实现。示例 20-10 实现了处理请求 更改为 /sleep,并模拟缓慢响应,这将导致服务器进入睡眠状态 5 秒后才做出回应。

文件名: src/main.rs

use std::{
    fs,
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};
// --snip--

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        handle_connection(stream);
    }
}

fn handle_connection(mut stream: TcpStream) {
    // --snip--

    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    // --snip--

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}

示例 20-10:通过休眠 来模拟慢速请求 5 秒

我们从现在切换到现在有三个病例。我们需要 在 to 的切片上显式匹配 模式匹配 字符串文本值; 不执行自动引用,并且 dereferencing 就像 Equality 方法一样。ifmatchrequest_linematch

第一个分支与示例 20-9 中的块相同。第二臂 匹配对 /sleep 的请求。收到该请求后,服务器将 休眠 5 秒,然后渲染成功的 HTML 页面。第三个分支是 与示例 20-9 中的块相同。ifelse

你可以看到我们的服务器是多么原始:真正的库会处理 以不那么冗长的方式识别多个请求!

使用 启动服务器。然后打开两个浏览器窗口:一个用于 http://127.0.0.1:7878/,另一个用于 http://127.0.0.1:7878/sleep。如果 像以前一样,您输入 / URI 几次,您将看到它快速响应。 但是,如果你输入 /sleep 然后加载 /,你会看到 / 等到它睡了整整 5 秒才加载。cargo runsleep

我们可以使用多种技术来避免请求备份 一个缓慢的请求;我们将实现的是线程池。

使用线程池提高吞吐量

线程池是一组正在等待并准备好的衍生线程 处理任务。当程序收到新任务时,它会分配一个 线程添加到任务池中,该线程将处理该任务。这 池中的剩余线程可用于处理出现的任何其他任务 in 中。当第一个线程完成时 处理其任务时,它被返回到空闲线程池,准备处理 新任务。线程池允许您并发处理连接, 增加服务器的吞吐量。

我们将池中的线程数限制为较小的数字以保护我们 拒绝服务 (DoS) 攻击;如果我们让我们的程序创建一个新线程 对于每个收到的请求,有人向我们的 服务器可能会耗尽我们服务器的所有资源并磨练 停止处理请求。

因此,我们将拥有固定数量的 在池中等待的线程。传入的请求将发送到池 加工。池将维护传入请求的队列。每个 池中的线程将从这个队列中弹出一个请求,处理该请求, ,然后向队列请求另一个请求。有了这个设计,我们可以处理 到 requests 并发,其中 是线程数。如果每个 线程正在响应长时间运行的请求,后续请求仍可以 备份到队列中,但我们增加了长时间运行的请求的数量 我们可以在达到那个点之前处理。NN

这种技术只是提高 Web 吞吐量的众多方法之一 服务器。您可以探索的其他选项包括 fork/join 模型单线程异步 I/O 模型多线程异步 I/O 模型。如果 您对此主题感兴趣,您可以阅读有关其他解决方案和 尝试实施它们;对于像 Rust 这样的低级语言,所有这些 选项是可能的。

在我们开始实现线程池之前,我们先谈谈如何使用 pool 应该看起来像 .当您尝试设计代码时,编写客户端 Interface First 可以帮助指导您的设计。编写代码的 API,使其 以您想要的方式构建;然后实现该功能 而不是实现功能,然后 设计公共 API。

类似于我们在第 12 章项目中使用测试驱动开发的方式, 我们将在此处使用编译器驱动的开发。我们将编写调用 函数,然后我们将查看编译器中的错误以确定 我们接下来应该更改什么才能让代码工作。但是,在我们这样做之前, 我们将探索不打算用作起点的技术。

为每个请求生成一个线程

首先,让我们来探讨一下,如果代码确实为 每一个连接。如前所述,这不是我们的最终计划,因为 问题,但这是一个 首先获取正常工作的多线程服务器的起点。然后,我们将添加 thread pool 作为一种改进,对比这两种解决方案将是 容易。示例 20-11 显示了生成新线程所需的更改 来处理循环中的每个流。mainfor

文件名: src/main.rs

use std::{
    fs,
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        thread::spawn(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}

示例 20-11:为每个 流

正如您在第 16 章中学到的那样,将创建一个新线程,然后 在新线程中运行 Closure 中的代码。如果你运行这段代码并在浏览器中加载 /sleep,然后在另外两个浏览器选项卡中加载 /sleep,你确实会看到 对 / 的请求不必等待 /sleep 完成。但是,由于 我们提到,这最终会压垮系统,因为你会让 没有任何限制的新线程。thread::spawn

创建有限数量的线程

我们希望我们的线程池以类似、熟悉的方式工作,因此从 线程池不需要对使用 我们的 API。示例 20-12 显示了我们想要使用的结构体的假设接口,而不是 。ThreadPoolthread::spawn

文件名: src/main.rs

use std::{
    fs,
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}

示例 20-12:我们理想的 ThreadPool 接口

我们用于创建一个具有可配置编号的新线程池 线程,在本例中为 4 个。然后,在循环中,有一个 类似于 it takes a closure the pool 应该 run 的我们需要实现,因此需要 closure 并将其提供给池中的线程运行。此代码尚未 compile,但我们会尝试让编译器指导我们如何修复它。ThreadPool::newforpool.executethread::spawnpool.execute

使用 Compiler Driven Development 构建 ThreadPool

将示例 20-12 中的 src/main.rs 修改,然后让我们使用 编译器错误来驱动我们的开发。这是第一个 我们得到的错误:cargo check

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve: use of undeclared type `ThreadPool`
  --> src/main.rs:11:16
   |
11 |     let pool = ThreadPool::new(4);
   |                ^^^^^^^^^^ use of undeclared type `ThreadPool`

For more information about this error, try `rustc --explain E0433`.
error: could not compile `hello` (bin "hello") due to 1 previous error

伟大!这个错误告诉我们需要一个类型或模块,所以我们将 立即构建一个。我们的实现将独立于 我们的 Web 服务器正在做的工作。所以,让我们将 crate 从 binary crate 复制到一个库 crate 来保存我们的实现。后 我们改成一个库 crate,我们也可以使用单独的线程池 库来执行我们想要使用线程池做的任何工作,而不仅仅是用于服务 Web 请求。ThreadPoolThreadPoolhelloThreadPool

创建一个包含以下内容的 src/lib.rs,这是最简单的 我们现在可以拥有的结构体的定义:ThreadPool

文件名: src/lib.rs

pub struct ThreadPool;

然后编辑 main.rs 文件以从库中引入范围 crate,将以下代码添加到 src/main.rs 的顶部:ThreadPool

文件名: src/main.rs

use hello::ThreadPool;
use std::{
    fs,
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}

这段代码仍然不起作用,但让我们再次检查它以获取下一个错误 我们需要解决:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for struct `ThreadPool` in the current scope
  --> src/main.rs:12:28
   |
12 |     let pool = ThreadPool::new(4);
   |                            ^^^ function or associated item not found in `ThreadPool`

For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error

此错误表示接下来我们需要创建一个名为 的关联函数。我们也知道 需要有一个参数 它可以作为参数接受,并且应该返回一个实例。 让我们实现最简单的函数,该函数将具有这些 特性:newThreadPoolnew4ThreadPoolnew

文件名: src/lib.rs

pub struct ThreadPool;

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }
}

我们选择作为参数的类型,因为我们知道 负数线程没有任何意义。我们也知道我们将使用它 4 作为线程集合中的元素数,这就是类型的用途,如第 3 章的“整数类型”部分所述。usizesizeusize

让我们再次检查代码:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `execute` found for struct `ThreadPool` in the current scope
  --> src/main.rs:17:14
   |
17 |         pool.execute(|| {
   |         -----^^^^^^^ method not found in `ThreadPool`

For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error

现在出现错误是因为 上没有方法 。 回想一下“创建有限数量的 Threads“部分,我们 决定我们的线程池应该有一个类似于 的接口。在 此外,我们将实现该函数,以便它接受它的 given,并将其提供给池中的空闲线程运行。executeThreadPoolthread::spawnexecute

我们将定义方法 on 以将闭包作为 参数。回想一下第 13 章的 “Moving Captured Values out of the Closure and the Fn traits” 部分,我们可以采用 闭包作为具有三个不同特征的参数:、 、 和 。我们需要决定在这里使用哪种 closure 类型。我们知道我们会 最终做一些类似于标准库实现的事情,因此我们可以查看 Signature of 在其参数上的边界。该文档向我们展示了以下内容:executeThreadPoolFnFnMutFnOncethread::spawnthread::spawn

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,

type 参数是我们在这里关注的参数;类型 parameter 与返回值相关,我们不关心这一点。我们 可以看到 用作绑定在 上的 trait 。这可能是 我们想要的,因为我们最终会将我们得到的参数传递给 。我们可以进一步确信这就是我们的特质 want to use,因为用于运行请求的线程只会执行该 request 的 close 值,它与 in 匹配。FTspawnFnOnceFexecutespawnFnOnceOnceFnOnce

type 参数还具有 trait bound 和 lifetime bound ,这在我们的情况下很有用:我们需要将 从一个线程到另一个线程的 closure,因为我们不知道多长时间 线程将执行。让我们创建一个方法,该方法将接受具有这些边界的 type 的泛型参数:FSend'staticSend'staticexecuteThreadPoolF

文件名: src/lib.rs

pub struct ThreadPool;

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

我们仍然使用 after,因为它表示一个闭包 ,它不带任何参数并返回 Unit 类型 。就像功能一样 定义,则可以从签名中省略返回类型,但即使我们 没有参数,我们仍然需要括号。()FnOnceFnOnce()

同样,这是该方法最简单的实现:它确实 什么都没有,但我们只是尝试让我们的代码编译。让我们再检查一次:execute

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.24s

它编译!但请注意,如果您尝试在 browser,您将在浏览器中看到我们在 章节。我们的库实际上还没有调用传递给的闭包!cargo runexecute

注意:您可能听说过使用严格编译器的语言,例如 Haskell 和 Rust 的意思是“如果代码编译,它就可以工作”。但这种说法并非如此 普遍正确。我们的项目可以编译,但它绝对不做任何事情!如果我们 正在构建一个真实、完整的项目,这将是一个开始的好时机 编写单元测试来检查代码是否可编译具有 要。

验证 new 中的线程数

我们没有对 和 的参数执行任何作。让我们 使用我们想要的行为实现这些函数的主体。首先, 让我们考虑一下。之前我们为参数选择了 unsigned 类型,因为线程数为负数的池没有意义。 但是,线程为零的池也没有意义,但 Zero 是完美的 有效。我们将添加代码来检查之前是否大于零 我们返回一个实例,如果程序收到 zero 使用宏,如示例 20-13 所示。newexecutenewsizeusizesizeThreadPoolassert!

文件名: src/lib.rs

pub struct ThreadPool;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        ThreadPool
    }

    // --snip--
    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

示例 20-13:实现 ThreadPool::new 以在 size 为 0 时出现 panic

我们还为 with doc comments 添加了一些文档。 请注意,我们遵循了良好的文档实践,添加了一个部分 调用函数可能 panic 的情况,如 第 14 章.尝试运行并单击结构体 以查看生成的 docs 是什么样子的!ThreadPoolcargo doc --openThreadPoolnew

我们可以像在 I/O 中所做的那样,而不是像在这里那样添加宏并返回 a project 中的实例。但是我们决定,在本例中,尝试创建一个 没有任何线程的线程池应该是不可恢复的错误。如果你是 雄心勃勃,尝试编写一个名为 签名与函数进行比较:assert!newbuildResultConfig::buildbuildnew

pub fn build(size: usize) -> Result<ThreadPool, PoolCreationError> {

创建空间以存储线程

现在我们有了一种方法来知道我们有有效数量的线程要存储 池中,我们可以创建这些线程并将它们存储在结构体 在返回结构体之前。但是我们如何 “存储” 线程呢?让我们再来看一个 查看签名:ThreadPoolthread::spawn

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,

该函数返回一个 ,其中 是 closure 返回。让我们也尝试使用,看看会发生什么。在我们的 case 中,我们传递给线程池的闭包将处理 Connection 并且不返回任何内容,因此将是 Unit 类型 。spawnJoinHandle<T>TJoinHandleT()

示例 20-14 中的代码可以编译,但还没有创建任何线程。 我们更改了 的定义以保存实例的向量,将 向量的容量初始化为 ,设置了一个循环,该循环将运行一些代码来创建线程,并且 返回一个包含它们的实例。ThreadPoolthread::JoinHandle<()>sizeforThreadPool

文件名: src/lib.rs

use std::thread;

pub struct ThreadPool {
    threads: Vec<thread::JoinHandle<()>>,
}

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut threads = Vec::with_capacity(size);

        for _ in 0..size {
            // create some threads and store them in the vector
        }

        ThreadPool { threads }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

示例 20-14:为 ThreadPool 创建一个 vector 来保存 线程

我们在 library crate 中引入了范围,因为我们是 using 作为向量中项目的类型 。std::threadthread::JoinHandleThreadPool

一旦收到有效大小,我们就会创建一个新的 vector,该 vector 可以 hold 项。该函数执行的任务与 向量。因为我们知道我们需要在 vector 中存储元素,所以执行 这种预先分配比使用 , 它会在插入元素时调整自身大小。ThreadPoolsizewith_capacityVec::newsizeVec::new

当您再次运行时,它应该会成功。cargo check

负责将代码从 ThreadPool 发送到线程的 Worker 结构体

我们在示例 20-14 的循环中留下了一条关于创建 线程。在这里,我们将看看我们实际上是如何创建线程的。标准 library 提供创建线程的方法,并希望获得一些代码,线程应该在 线程。但是,在我们的例子中,我们想要创建线程并拥有 它们等待我们稍后发送的代码。标准库的 threads 的实现不包括任何执行此作的方法;我们必须 手动实现它。forthread::spawnthread::spawn

我们将通过在 和 线程之间引入一个新的数据结构来实现这个行为,该数据结构将管理这个新行为。我们将调用 this data structure Worker,这是池化中的常用术语 实现。Worker 拾取需要运行的代码,并运行 code 在 Worker 的线程中。想想在 restaurant:工作人员等待客户收到订单,然后 他们负责接受这些订单并完成它们。ThreadPool

不是在线程池中存储实例向量, 我们将存储结构体的实例。每个 API 将存储一个实例。然后我们将在该 Will 上实现一个方法 获取要运行的代码的闭包,并将其发送到已在运行的线程 执行。我们还将为每个 worker 提供一个 API,以便我们可以区分 日志记录或调试时池中的不同工作程序。JoinHandle<()>WorkerWorkerJoinHandle<()>Workerid

以下是我们创建 .我们将 以这种方式进行设置后,实现将闭包发送到线程的代码:ThreadPoolWorker

  1. 定义一个包含 an 和 a 的结构体。WorkeridJoinHandle<()>
  2. 更改以保存实例向量。ThreadPoolWorker
  3. 定义一个函数,该函数接受一个数字并返回一个实例,该实例包含和以空 关闭。Worker::newidWorkerid
  4. 在 中,使用循环计数器生成一个 ,创建 a new 替换为 ,并将 worker 存储在 vector 中。ThreadPool::newforidWorkerid

如果您准备好迎接挑战,请尝试在此之前自行实施这些更改 看看示例 20-15 中的代码。

准备?这是示例 20-15,其中一种方法可以进行上述修改。

文件名: src/lib.rs

use std::thread;

pub struct ThreadPool {
    workers: Vec<Worker>,
}

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool { workers }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});

        Worker { id, thread }
    }
}

示例 20-15:修改 ThreadPool 来保存 Worker 实例,而不是直接保存线程

我们已将字段的名称从 更改为 ,因为它现在保存的是实例而不是实例。我们使用循环中的 counter 作为 的参数,并将每个 new 存储在名为 .ThreadPoolthreadsworkersWorkerJoinHandle<()>forWorker::newWorkerworkers

外部代码(如我们在 src/main.rs 中的服务器)不需要知道 有关在 中使用结构的实现细节 , 因此,我们将 struct 及其函数设为 private。该函数使用 we give it 并存储一个实例,该实例是使用空闭包生成新线程创建的。WorkerThreadPoolWorkernewWorker::newidJoinHandle<()>

注意:如果作系统无法创建线程,因为没有 足够的系统资源,将 panic。这将导致我们的 整个服务器都会导致 panic,即使创建某些线程可能会 成功。为了简单起见,这种行为很好,但在 Production 中 thread pool 实现,您可能希望改用 std::thread::Builder 及其返回的 spawn 方法。thread::spawnResult

这段代码将编译并存储我们 指定为 的参数。但我们仍然没有处理 我们得到的 .让我们看看接下来如何做到这一点。WorkerThreadPool::newexecute

通过通道向线程发送请求

我们要解决的下一个问题是 given to do 的 closure 绝对没有。目前,我们得到了要在方法中执行的闭包。但是我们需要给 run 一个 closure ,当我们 在创建 .thread::spawnexecutethread::spawnWorkerThreadPool

我们希望刚刚创建的结构体能够获取要从中运行的代码 一个队列,并将该代码发送到其线程以运行。WorkerThreadPool

我们在第 16 章中学到的通道 — 一种简单的通信方式 两个线程 - 将非常适合此使用案例。我们将使用一个 channel 来运作 作为作业队列,并将作业从 实例,它将 Job 发送到其线程。这是计划:executeThreadPoolWorker

  1. 将创建一个频道并保留发件人。ThreadPool
  2. 每个都将抓住接收器。Worker
  3. 我们将创建一个新的结构体,它将保存我们想要发送的闭包 沿着海峡而下。Job
  4. 该方法将通过 寄件人。execute
  5. 在其线程中,will 遍历其接收器并执行 关闭它收到的任何 job。Worker

让我们首先创建一个通道并持有 sender 实例中,如示例 20-16 所示。结构体 目前不包含任何内容,但将是我们将发送的物品类型 频道。ThreadPool::newThreadPoolJob

文件名: src/lib.rs

use std::{sync::mpsc, thread};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool { workers, sender }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});

        Worker { id, thread }
    }
}

示例 20-16:修改 ThreadPool 以存储 传输 Job 实例的通道的 sender

在 中,我们创建新通道,并让池保存 寄件人。这将成功编译。ThreadPool::new

让我们尝试将通道的接收器作为线程池传递给每个 worker 创建频道。我们知道我们想在线程中使用接收器, worker spawn,因此我们将在 Closure 中引用该参数。这 示例 20-17 中的代码还不能完全编译。receiver

文件名: src/lib.rs

use std::{sync::mpsc, thread};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, receiver));
        }

        ThreadPool { workers, sender }
    }
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

// --snip--


struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}

示例 20-17:将接收器传递给 worker

我们做了一些小而直接的改变:我们将接收器传递给 ,然后在闭包中使用它。Worker::new

当我们尝试检查此代码时,我们收到以下错误:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
  --> src/lib.rs:26:42
   |
21 |         let (sender, receiver) = mpsc::channel();
   |                      -------- move occurs because `receiver` has type `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
...
25 |         for id in 0..size {
   |         ----------------- inside of this loop
26 |             workers.push(Worker::new(id, receiver));
   |                                          ^^^^^^^^ value moved here, in previous iteration of loop
   |
note: consider changing this parameter type in method `new` to borrow instead if owning the value isn't necessary
  --> src/lib.rs:47:33
   |
47 |     fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
   |        --- in this method       ^^^^^^^^^^^^^^^^^^^ this parameter takes ownership of the value
help: consider moving the expression out of the loop so it is only moved once
   |
25 ~         let mut value = Worker::new(id, receiver);
26 ~         for id in 0..size {
27 ~             workers.push(value);
   |

For more information about this error, try `rustc --explain E0382`.
error: could not compile `hello` (lib) due to 1 previous error

代码正在尝试传递给多个实例。这 不起作用,因为您会记得第 16 章:通道实现 Rust 提供多个生产者、单个消费者。这意味着我们不能 只需克隆通道的消费端即可修复此代码。我们也不 希望向多个使用者多次发送消息;我们想要一个列表 的消息,以便每条消息都得到处理一次。receiverWorker

此外,从通道队列中取出作业涉及更改 ,因此线程需要一种安全的方式来共享和修改 ; 否则,我们可能会得到 race conditions (如 Chapter 16 所述)。receiverreceiver

回想一下第 16 章:共享中讨论的线程安全智能指针 所有权,并允许线程更改值,则 需要使用 .该类型将允许多个 worker 拥有 接收器,并确保只有一个 worker 从 接收器。示例 20-18 显示了我们需要做的更改。Arc<Mutex<T>>ArcMutex

文件名: src/lib.rs

use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};
// --snip--

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

// --snip--

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        // --snip--
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}

示例 20-18:在 worker 之间共享接收器 使用 ArcMutex

在 中,我们将接收器放在 an 和 a 中。对于每个 new worker,我们克隆 the 来增加引用计数,以便 worker 可以 接管人的股份所有权。ThreadPool::newArcMutexArc

通过这些更改,代码将编译!我们快到了!

实现 execute 方法

最后,让我们在 上实现该方法。我们还将把 trait 对象的 struct 改为 type alias,该对象持有 接收的 Closure 的 Closure 中。如“创建类型同义词 with Type Aliases“部分,类型别名允许我们缩短 易于使用。请看示例 20-19。executeThreadPoolJobexecute

文件名: src/lib.rs

use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

// --snip--

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    // --snip--
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

// --snip--

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}

示例 20-19:为保存每个闭包的 Box 创建一个 Job 类型别名,然后将 job 发送到通道中

使用我们得到的 closed 创建一个新实例后,我们 将该作业发送到通道的发送端。我们呼吁发送失败的情况。例如,如果我们 停止执行所有线程,这意味着接收端已停止 接收新消息。目前,我们无法阻止我们的线程 执行:只要池存在,我们的线程就会继续执行。这 我们使用的原因是我们知道失败情况不会发生,但是 编译器不知道这一点。Jobexecuteunwrapsendunwrap

但我们还没有完全完成!在 worker 中,我们传递给的 closure 仍然只引用 channel 的接收端。 相反,我们需要 Closure 永远循环,要求 channel 的 channel,并在获取 job 时运行 Job。让我们做出改变 如 示例 20-20 所示。thread::spawnWorker::new

文件名: src/lib.rs

use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let job = receiver.lock().unwrap().recv().unwrap();

            println!("Worker {id} got a job; executing.");

            job();
        });

        Worker { id, thread }
    }
}

示例 20-20:接收和执行 worker 的线程

在这里,我们首先调用 the 来获取互斥锁,然后我们 call 来出现任何错误时出现 panic。如果互斥锁 处于中毒状态,如果其他线程在 持有锁而不是释放锁。在这种情况下,调用 让这个线程 panic 是正确的作。随意 将此更改为 an with an error message that is meaningful to 你。lockreceiverunwrapunwrapunwrapexpect

如果我们在互斥锁上获得锁,则调用 Receive a from 渠道。final 也会跳过此处可能发生的任何错误 如果包含发送方的线程已关闭,则类似于接收方关闭时该方法的返回方式。recvJobunwrapsendErr

对 blocks 的调用,因此如果还没有 job,则当前线程将 等待作业可用。这可确保一次只有一个线程尝试请求作业。recvMutex<T>Worker

我们的线程池现在处于工作状态!试一试 请求:cargo run

$ cargo run
   Compiling hello v0.1.0 (file:///projects/hello)
warning: field is never read: `workers`
 --> src/lib.rs:7:5
  |
7 |     workers: Vec<Worker>,
  |     ^^^^^^^^^^^^^^^^^^^^
  |
  = note: `#[warn(dead_code)]` on by default

warning: field is never read: `id`
  --> src/lib.rs:48:5
   |
48 |     id: usize,
   |     ^^^^^^^^^

warning: field is never read: `thread`
  --> src/lib.rs:49:5
   |
49 |     thread: thread::JoinHandle<()>,
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

warning: `hello` (lib) generated 3 warnings
    Finished dev [unoptimized + debuginfo] target(s) in 1.40s
     Running `target/debug/hello`
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.

成功!现在,我们有一个异步执行连接的线程池。 创建的线程永远不会超过四个,因此我们的系统不会获得 如果服务器收到大量请求,则为 overloaded。如果我们向 /sleep 发出请求,服务器将能够通过让另一个 thread 运行它们。

注意:如果您同时在多个浏览器窗口中打开 /sleep,则它们 可能会以 5 秒的间隔一次加载一个。某些 Web 浏览器执行 出于缓存原因,按顺序执行同一请求的多个实例。这 限制不是由我们的 Web 服务器引起的。

在第 18 章中了解了循环之后,您可能想知道 为什么我们没有编写示例 20-21 所示的 worker 线程代码。while let

文件名: src/lib.rs

use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}
// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            while let Ok(job) = receiver.lock().unwrap().recv() {
                println!("Worker {id} got a job; executing.");

                job();
            }
        });

        Worker { id, thread }
    }
}

示例 20-21:使用 while letWorker::new 的另一种实现

此代码可编译并运行,但不会产生所需的线程 行为:慢速请求仍会导致其他请求等待 处理。原因有点微妙:结构没有公共方法,因为锁的所有权基于 在该方法返回的 内。在编译时,借用检查器可以强制执行该规则 除非我们持有 锁。但是,此实现也可能导致锁被持有 如果我们不注意 .MutexunlockMutexGuard<T>LockResult<MutexGuard<T>>lockMutexMutexGuard<T>

示例 20-20 中 use 的代码是有效的,因为 with , any 表达式中使用的 Temporary 值位于 equals 的右侧 sign 在语句结束时立即删除。但是,(and 和 ) 在 关联的块。在示例 20-21 中,锁在 continue continue 中保持 的调用,意味着其他 worker 无法接收 job。let job = receiver.lock().unwrap().recv().unwrap();letletwhile letif letmatchjob()

本文档由官方文档翻译而来,如有差异请以官方英文文档(https://doc.rust-lang.org/)为准