在某些情况下,你需要并发地执行许多短期任务。创建和销毁执行这些任务线程的开销可能会抑制程序的性能。解决这个问题的一个办法是建立一个任务池,并在需要时将它们从这个任务池中取出。
任务池的另一个优点是,可用线程的数量可以根据可用的计算资源进行调整,即处理器内核的数量或内存量。
这些任务的约束之一是它们不是相互依赖的,也就是说,一个任务的结果不依赖于前一个任务的结果,或者下一个任务不应依赖于当前任务的结果。这使任务保持隔离,并且易于存储在池中。
典型的用例包括:
Web服务和api服务:请求通常非常小且生命周期很短,因此非常适合于线程池,实际上许多web服务器都实现了线程池。
批量处理图像、视频文件或音频文件:例如,调整图像大小也是非常适合线程池的小型且定义良好的任务。
数据处理管道:数据处理管道中的每个阶段都可以由线程池处理。如前所述,任务不应该相互依赖,以提高线程池的效率。
在这个例子中,我们将构建一个简单的线程池,但这可以很容易地扩展到一个真正的线程池。
在开始之前,需要添加一个库到Cargo.toml文件中:
[dependencies]
fstrings = "0.2.3"
我们将使用这个crate以类似python的方式格式化字符串。
接下来在src/main.rs文件中添加以下几行:
use std::sync::{Arc, Mutex};
use std::thread;
#[macro_use]
extern crate fstrings;
在main.rs中定义一个WebRequest结构体:
struct WebRequest {
work: Box<dyn FnOnce(&str) + Send + Sync>,
}
impl WebRequest {
fn new<F>(f: F) -> Self
where
F: FnOnce(&str) + Send + Sync + 'static,
{
WebRequest { work: Box::new(f) }
}
}
在这段代码中,WebRequest包含一个字段work,它是一个Box封装的闭包。为什么要使用Box?因为闭包的大小是动态的,换句话说,它的大小在编译时是未知的,所以我们需要将它存储在像Box这样的堆分配容器中。Send和Sync特性向编译器表明,这个特定的闭包可以安全地跨多个线程发送和访问。
构造函数接受闭包作为它的唯一参数。当然,它必须满足与结构体中字段相同的约束。静态生命周期是必需的,因为闭包可能比定义它的作用域活得更长。
在main.rs中定义一个ThreadPool结构体:
struct ThreadPool {
workers: Vec<thread::JoinHandle<()>>,
queue: Arc<Mutex<Vec<WebRequest>>>,
}
现在我们看一下实现。首先是构造函数:
impl ThreadPool {
fn new(num_threads: usize) -> ThreadPool {
let mut workers = Vec::with_capacity(num_threads);
let queue = Arc::new(Mutex::new(Vec::<WebRequest>::new()));
for i in 0..num_threads {
let number = f!("Request {i}");
let queue_clone = Arc::clone(&queue);
workers.push(thread::spawn(move || loop {
let task = queue_clone.lock().unwrap().pop();
if let Some(task) = task {
(task.work)(&number);
} else {
break;
}
}));
}
ThreadPool { workers, queue }
}
}
此方法使用指定的线程数初始化池,创建队列。之后,构造函数生成工作线程。这些线程进入一个循环,弹出队列的任务,并执行它们。如果队列恰好为空,则工作线程中断循环。
然后,我们看一下execute()方法:
impl ThreadPool {
......
fn execute<F>(&self, f: F)
where
F: FnOnce(&str) + Send + Sync + 'static,
{
let task = WebRequest::new(f);
self.queue.lock().unwrap().push(task);
}
}
这个方法只是用指定的闭包创建一个新的WebRequest,并将其push到任务队列中。
接下来,我们看一下join()方法:
impl ThreadPool {
......
fn join(self) {
for worker in self.workers {
worker.join().unwrap();
}
}
}
这是一个阻塞操作,等待线程完成。
使用如下代码测试线程池:
fn main() {
let pool = ThreadPool::new(6);
for i in 0..6 {
pool.execute(move |message| {
println!("Task: {} prints {}",i, message);
});
}
pool.join();
}
执行结果如下:
Task: 3 prints Request 3
Task: 1 prints Request 3
Task: 5 prints Request 1
Task: 2 prints Request 5
Task: 0 prints Request 3
Task: 4 prints Request 4
正如你所看到的,这种模式非常灵活,但是使用时,请考虑以下影响性能和资源因素:
总而言之,找到正确的线程数有时可能非常清楚,有时需要使用一些尝试和错误来找到最佳数量。更高级的做法是可以根据需求动态地增加和减少可用线程的数量。
Copyright© 2013-2020
All Rights Reserved 京ICP备2023019179号-8