Rust:深入了解线程池

427次阅读  |  发布于7月以前

在某些情况下,你需要并发地执行许多短期任务。创建和销毁执行这些任务线程的开销可能会抑制程序的性能。解决这个问题的一个办法是建立一个任务池,并在需要时将它们从这个任务池中取出。

任务池的另一个优点是,可用线程的数量可以根据可用的计算资源进行调整,即处理器内核的数量或内存量。

这些任务的约束之一是它们不是相互依赖的,也就是说,一个任务的结果不依赖于前一个任务的结果,或者下一个任务不应依赖于当前任务的结果。这使任务保持隔离,并且易于存储在池中。

典型的用例包括:

Web服务和api服务:请求通常非常小且生命周期很短,因此非常适合于线程池,实际上许多web服务器都实现了线程池。

批量处理图像、视频文件或音频文件:例如,调整图像大小也是非常适合线程池的小型且定义良好的任务。

数据处理管道:数据处理管道中的每个阶段都可以由线程池处理。如前所述,任务不应该相互依赖,以提高线程池的效率。

使用Rust实现线程池

在这个例子中,我们将构建一个简单的线程池,但这可以很容易地扩展到一个真正的线程池。

在开始之前,需要添加一个库到Cargo.toml文件中:

[dependencies]
fstrings = "0.2.3"

我们将使用这个crate以类似python的方式格式化字符串。

接下来在src/main.rs文件中添加以下几行:

use std::sync::{Arc, Mutex};
use std::thread;
#[macro_use]
extern crate fstrings;

定义任务

在main.rs中定义一个WebRequest结构体:

struct WebRequest {
    work: Box<dyn FnOnce(&str) + Send + Sync>,
}

impl WebRequest {
    fn new<F>(f: F) -> Self
    where
        F: FnOnce(&str) + Send + Sync + 'static,
    {
        WebRequest { work: Box::new(f) }
    }
}

在这段代码中,WebRequest包含一个字段work,它是一个Box封装的闭包。为什么要使用Box?因为闭包的大小是动态的,换句话说,它的大小在编译时是未知的,所以我们需要将它存储在像Box这样的堆分配容器中。Send和Sync特性向编译器表明,这个特定的闭包可以安全地跨多个线程发送和访问。

构造函数接受闭包作为它的唯一参数。当然,它必须满足与结构体中字段相同的约束。静态生命周期是必需的,因为闭包可能比定义它的作用域活得更长。

实现线程池

在main.rs中定义一个ThreadPool结构体:

struct ThreadPool {
    workers: Vec<thread::JoinHandle<()>>,
    queue: Arc<Mutex<Vec<WebRequest>>>,
}

现在我们看一下实现。首先是构造函数:

impl ThreadPool {
    fn new(num_threads: usize) -> ThreadPool {
        let mut workers = Vec::with_capacity(num_threads);
        let queue = Arc::new(Mutex::new(Vec::<WebRequest>::new()));

        for i in 0..num_threads {
            let number = f!("Request {i}");
            let queue_clone = Arc::clone(&queue);
            workers.push(thread::spawn(move || loop {
                let task = queue_clone.lock().unwrap().pop();
                if let Some(task) = task {
                    (task.work)(&number);
                } else {
                    break;
                }
            }));
        }
        ThreadPool { workers, queue }
    }
}

此方法使用指定的线程数初始化池,创建队列。之后,构造函数生成工作线程。这些线程进入一个循环,弹出队列的任务,并执行它们。如果队列恰好为空,则工作线程中断循环。

然后,我们看一下execute()方法:

impl ThreadPool {
    ......

    fn execute<F>(&self, f: F)
    where
        F: FnOnce(&str) + Send + Sync + 'static,
    {
        let task = WebRequest::new(f);
        self.queue.lock().unwrap().push(task);
    }
}

这个方法只是用指定的闭包创建一个新的WebRequest,并将其push到任务队列中。

接下来,我们看一下join()方法:

impl ThreadPool {
    ......

    fn join(self) {
        for worker in self.workers {
            worker.join().unwrap();
        }
    }
}

这是一个阻塞操作,等待线程完成。

测试

使用如下代码测试线程池:

fn main() {
    let pool = ThreadPool::new(6);
    for i in 0..6 {
        pool.execute(move |message| {
            println!("Task: {} prints  {}",i, message);
        });
    }
    pool.join();
}

执行结果如下:

Task: 3 prints  Request 3
Task: 1 prints  Request 3
Task: 5 prints  Request 1
Task: 2 prints  Request 5
Task: 0 prints  Request 3
Task: 4 prints  Request 4

总结

正如你所看到的,这种模式非常灵活,但是使用时,请考虑以下影响性能和资源因素:

总而言之,找到正确的线程数有时可能非常清楚,有时需要使用一些尝试和错误来找到最佳数量。更高级的做法是可以根据需求动态地增加和减少可用线程的数量。

Copyright© 2013-2020

All Rights Reserved 京ICP备2023019179号-8