当我们在使用tokio和MPSC(多生产者单消费者)通道时,通常以某种固定的方式连接派生线程。然而,在最近项目中,必须在各种配置中动态匹配异步生产者和消费者。
在这篇文章中,让我们来看看如何实现这种非常有用的动态匹配模式。
首先,我们创建一个关于餐厅的Rust项目:
cargo init restaurant
在Cargo.toml文件中加入依赖项:
[dependencies]
tokio = { version = "1.38.0", features = ["full"] }
然后,在src/main.rs文件中写入业务逻辑代码。
作为餐厅经理,可以分配不同的烹饪台来异步准备不同类型的食物,代码如下(现在不用担心未定义的值):
async fn cooking_stand(food: char) {
loop {
somewhere.send(food.clone()).await;
}
}
食物应该被送到等待上菜的餐桌上,代码如下:
async fn table (number: u8) {
loop {
let food = somehow.recv().await;
println!("Got {} at table {}", food, number);
}
}
现在可以组织我们的餐厅了:
#[tokio::main]
async fn main() {
// 烹饪台
tokio::spawn(cooking_stand('')); // 沙拉
tokio::spawn(cooking_stand('')); // 汉堡
// 客人餐桌
tokio::spawn(table(1));
tokio::spawn(table(2));
tokio::time::sleep(Duration::from_millis(1000)).await;
}
为简单起见,我们假设通过应用程序接受订单。例如,餐厅经理(主线程)知道餐桌1正在等待沙拉,餐桌3正在等待汉堡。但如何真正完成这些订单呢?
初级方法:
cooking_stand -> -> -> table 1
cooking_stand -> -> manager -> table 2
cooking_stand -> -> -> table 3
如果我们强迫经理做这项工作,他可以等待沙拉烹饪站准备沙拉,然后将其传递给餐桌1。然后等待汉堡烹饪台准备好汉堡,把它端到3号餐桌。
这显然是一个有缺陷的设计:
我们需要服务员,幸运的是,Tokio为这项工作提供了完美的工具——oneshot 通道。这些通道被设计和优化为一次传递单个值。
let (waiter_rx, waiter_tx) = oneshot::channel::<char>();
为了让服务员先把沙拉送到1号桌,需要修改我们的烹饪台:
use tokio::sync::oneshot;
async fn cooking_stand (
product: char,
mut waiters: tokio::sync::mpsc::Receiver<oneshot::Sender<char>>
) {
while let Some(waiter) = waiters.recv().await {
waiter.send(product.clone());
}
}
其中tokio::sync::mpsc::Receiver<oneshot::Sender
async fn table (
number: u8,
mut waiters: tokio::sync::mpsc::Receiver<oneshot::Receiver<char>>
) {
while let Some(waiter) = waiters.recv().await {
let food = waiter.await.unwrap();
println!("Got {} at table {}", food, number);
}
}
当服务员被分配到餐桌上时,顾客等待服务员送来烹饪台生产的食物。为了完成这个谜题,我们来修改main函数。经理可以雇佣服务员,而不是自己做繁重的工作,并将他们分配到匹配的烹饪台和桌子上,以完成食物订单。
#[tokio::main]
async fn main() {
// 经理分配服务员到烹饪台
let (stand_salad_tx, stand_salad_rx) = mpsc::channel::<oneshot::Sender<char>>(100);
let (stand_pizza_tx, stand_pizza_rx) = mpsc::channel::<oneshot::Sender<char>>(100);
let (stand_burger_tx, stand_burger_rx) = mpsc::channel::<oneshot::Sender<char>>(100);
// 搭建烹饪台
tokio::spawn(cooking_stand('', stand_salad_rx));
tokio::spawn(cooking_stand('', stand_pizza_rx));
tokio::spawn(cooking_stand('', stand_burger_rx));
// 经理分配服务员到餐桌
let mut tables: Vec<tokio::sync::mpsc::Sender<oneshot::Receiver<char>>> = Vec::new();
for number in 1..=4 {
let (table_tx, table_rx) = mpsc::channel::<oneshot::Receiver<char>>(100);
tables.push(table_tx);
tokio::spawn(table(number, table_rx));
}
// t;
}
让我们通过在main函数的末尾添加以下代码来检查这种方式是否有效:
// 创建服务员
let (waiter_tx, waiter_rx) = oneshot::channel::<char>();
// 分配到沙拉烹饪台
stand_salad_tx.send(waiter_tx).await.unwrap();
// 让他把食物送到1号桌
tables.first().unwrap().send(waiter_rx).await.unwrap();
运行结果:
Got at table 1
这种通过常规通道发送oneshot通道的模式可以用于实现各种流量控制。以给定的比率、节流等方式传递消息。
完整代码如下:
use std::time::Duration;
use tokio::sync::{mpsc, oneshot};
async fn cooking_stand(
product: char,
mut waiters: tokio::sync::mpsc::Receiver<oneshot::Sender<char>>,
) {
while let Some(waiter) = waiters.recv().await {
waiter.send(product).unwrap();
}
}
async fn table(number: u8, mut waiters: tokio::sync::mpsc::Receiver<oneshot::Receiver<char>>) {
while let Some(waiter) = waiters.recv().await {
let food = waiter.await.unwrap();
println!("Got {} at table {}", food, number);
}
}
#[tokio::main]
async fn main() {
// 经理分配服务员到烹饪台
let (stand_salad_tx, stand_salad_rx) = mpsc::channel::<oneshot::Sender<char>>(100);
let (stand_pizza_tx, stand_pizza_rx) = mpsc::channel::<oneshot::Sender<char>>(100);
let (stand_burger_tx, stand_burger_rx) = mpsc::channel::<oneshot::Sender<char>>(100);
// 搭建烹饪台
tokio::spawn(cooking_stand('', stand_salad_rx));
tokio::spawn(cooking_stand('', stand_pizza_rx));
tokio::spawn(cooking_stand('', stand_burger_rx));
// 经理分配服务员到餐桌
let mut tables: Vec<tokio::sync::mpsc::Sender<oneshot::Receiver<char>>> = Vec::new();
for number in 1..=4 {
let (table_tx, table_rx) = mpsc::channel::<oneshot::Receiver<char>>(100);
tables.push(table_tx);
tokio::spawn(table(number, table_rx));
}
// 创建服务员
let (waiter_tx, waiter_rx) = oneshot::channel::<char>();
// 分配到沙拉烹饪台
stand_salad_tx.send(waiter_tx).await.unwrap();
// 让他把食物送到1号桌
tables.first().unwrap().send(waiter_rx).await.unwrap();
// 创建服务员
let (waiter_tx, waiter_rx) = oneshot::channel::<char>();
// 分配到披萨烹饪台
stand_pizza_tx.send(waiter_tx).await.unwrap();
// 让他把食物送到2号桌
tables.get(1).unwrap().send(waiter_rx).await.unwrap();
// 创建服务员
let (waiter_tx, waiter_rx) = oneshot::channel::<char>();
// 分配到披萨烹饪台
stand_burger_tx.send(waiter_tx).await.unwrap();
// 让他把食物送到3号桌
tables.get(2).unwrap().send(waiter_rx).await.unwrap();
tokio::time::sleep(Duration::from_millis(1000)).await;
}
运行结果:
Got at table 2
Got at table 3
Got at table 1
Copyright© 2013-2020
All Rights Reserved 京ICP备2023019179号-8