如何动态匹配 tokio 派生线程?

249次阅读  |  发布于5月以前

当我们在使用tokio和MPSC(多生产者单消费者)通道时,通常以某种固定的方式连接派生线程。然而,在最近项目中,必须在各种配置中动态匹配异步生产者和消费者。

在这篇文章中,让我们来看看如何实现这种非常有用的动态匹配模式。

首先,我们创建一个关于餐厅的Rust项目:

cargo init restaurant

在Cargo.toml文件中加入依赖项:

[dependencies]
tokio = { version = "1.38.0", features = ["full"] }

然后,在src/main.rs文件中写入业务逻辑代码。

作为餐厅经理,可以分配不同的烹饪台来异步准备不同类型的食物,代码如下(现在不用担心未定义的值):

async fn cooking_stand(food: char) {
    loop {
        somewhere.send(food.clone()).await;
    }
}

食物应该被送到等待上菜的餐桌上,代码如下:

async fn table (number: u8) {
    loop {
        let food = somehow.recv().await;
        println!("Got {} at table {}", food, number);
    }
}

现在可以组织我们的餐厅了:

#[tokio::main]
async fn main() {
    // 烹饪台
    tokio::spawn(cooking_stand('')); // 沙拉
    tokio::spawn(cooking_stand('')); // 汉堡

    // 客人餐桌
    tokio::spawn(table(1));
    tokio::spawn(table(2));

    tokio::time::sleep(Duration::from_millis(1000)).await;
}

为简单起见,我们假设通过应用程序接受订单。例如,餐厅经理(主线程)知道餐桌1正在等待沙拉,餐桌3正在等待汉堡。但如何真正完成这些订单呢?

初级方法:

cooking_stand ->  ->         -> table 1
cooking_stand ->  -> manager -> table 2
cooking_stand ->  ->         -> table 3

如果我们强迫经理做这项工作,他可以等待沙拉烹饪站准备沙拉,然后将其传递给餐桌1。然后等待汉堡烹饪台准备好汉堡,把它端到3号餐桌。

这显然是一个有缺陷的设计:

我们需要服务员,幸运的是,Tokio为这项工作提供了完美的工具——oneshot 通道。这些通道被设计和优化为一次传递单个值。

let (waiter_rx, waiter_tx) = oneshot::channel::<char>();

为了让服务员先把沙拉送到1号桌,需要修改我们的烹饪台:

use tokio::sync::oneshot;

async fn cooking_stand (
    product: char,
    mut waiters: tokio::sync::mpsc::Receiver<oneshot::Sender<char>>
) {
    while let Some(waiter) = waiters.recv().await {
        waiter.send(product.clone());
    }
}

其中tokio::sync::mpsc::Receiver<oneshot::Sender>是一个等待队列。是的,你没看错,可以通过其他通道封装一个oneshot通道。当服务员到达烹饪台时,烹饪台就会把食物准备好,然后交给服务员送到餐桌上。让我们对餐桌做同样的事情,他们有特定的服务员接收部分,会给他们送食物:

async fn table (
    number: u8,
    mut waiters: tokio::sync::mpsc::Receiver<oneshot::Receiver<char>>
) {
    while let Some(waiter) = waiters.recv().await {
        let food = waiter.await.unwrap();
        println!("Got {} at table {}", food, number);
    }
}

当服务员被分配到餐桌上时,顾客等待服务员送来烹饪台生产的食物。为了完成这个谜题,我们来修改main函数。经理可以雇佣服务员,而不是自己做繁重的工作,并将他们分配到匹配的烹饪台和桌子上,以完成食物订单。

#[tokio::main]
async fn main() {
    // 经理分配服务员到烹饪台
    let (stand_salad_tx, stand_salad_rx) = mpsc::channel::<oneshot::Sender<char>>(100);
    let (stand_pizza_tx, stand_pizza_rx) = mpsc::channel::<oneshot::Sender<char>>(100);
    let (stand_burger_tx, stand_burger_rx) = mpsc::channel::<oneshot::Sender<char>>(100);

    // 搭建烹饪台
    tokio::spawn(cooking_stand('', stand_salad_rx));
    tokio::spawn(cooking_stand('', stand_pizza_rx));
    tokio::spawn(cooking_stand('', stand_burger_rx));

    // 经理分配服务员到餐桌
    let mut tables: Vec<tokio::sync::mpsc::Sender<oneshot::Receiver<char>>> = Vec::new();

    for number in 1..=4 {
        let (table_tx, table_rx) = mpsc::channel::<oneshot::Receiver<char>>(100);
        tables.push(table_tx);
        tokio::spawn(table(number, table_rx));
    }

   // t;
}

让我们通过在main函数的末尾添加以下代码来检查这种方式是否有效:

// 创建服务员
let (waiter_tx, waiter_rx) = oneshot::channel::<char>();
// 分配到沙拉烹饪台
stand_salad_tx.send(waiter_tx).await.unwrap();
// 让他把食物送到1号桌
tables.first().unwrap().send(waiter_rx).await.unwrap();

运行结果:

Got  at table 1

这种通过常规通道发送oneshot通道的模式可以用于实现各种流量控制。以给定的比率、节流等方式传递消息。

完整代码如下:

use std::time::Duration;

use tokio::sync::{mpsc, oneshot};

async fn cooking_stand(
    product: char,
    mut waiters: tokio::sync::mpsc::Receiver<oneshot::Sender<char>>,
) {
    while let Some(waiter) = waiters.recv().await {
        waiter.send(product).unwrap();
    }
}

async fn table(number: u8, mut waiters: tokio::sync::mpsc::Receiver<oneshot::Receiver<char>>) {
    while let Some(waiter) = waiters.recv().await {
        let food = waiter.await.unwrap();
        println!("Got {} at table {}", food, number);
    }
}

#[tokio::main]
async fn main() {
    // 经理分配服务员到烹饪台
    let (stand_salad_tx, stand_salad_rx) = mpsc::channel::<oneshot::Sender<char>>(100);
    let (stand_pizza_tx, stand_pizza_rx) = mpsc::channel::<oneshot::Sender<char>>(100);
    let (stand_burger_tx, stand_burger_rx) = mpsc::channel::<oneshot::Sender<char>>(100);

    // 搭建烹饪台
    tokio::spawn(cooking_stand('', stand_salad_rx));
    tokio::spawn(cooking_stand('', stand_pizza_rx));
    tokio::spawn(cooking_stand('', stand_burger_rx));

    // 经理分配服务员到餐桌
    let mut tables: Vec<tokio::sync::mpsc::Sender<oneshot::Receiver<char>>> = Vec::new();

    for number in 1..=4 {
        let (table_tx, table_rx) = mpsc::channel::<oneshot::Receiver<char>>(100);
        tables.push(table_tx);
        tokio::spawn(table(number, table_rx));
    }

    // 创建服务员
    let (waiter_tx, waiter_rx) = oneshot::channel::<char>();
    // 分配到沙拉烹饪台
    stand_salad_tx.send(waiter_tx).await.unwrap();
    // 让他把食物送到1号桌
    tables.first().unwrap().send(waiter_rx).await.unwrap();

    // 创建服务员
    let (waiter_tx, waiter_rx) = oneshot::channel::<char>();
    // 分配到披萨烹饪台
    stand_pizza_tx.send(waiter_tx).await.unwrap();
    // 让他把食物送到2号桌
    tables.get(1).unwrap().send(waiter_rx).await.unwrap();

    // 创建服务员
    let (waiter_tx, waiter_rx) = oneshot::channel::<char>();
    // 分配到披萨烹饪台
    stand_burger_tx.send(waiter_tx).await.unwrap();
    // 让他把食物送到3号桌
    tables.get(2).unwrap().send(waiter_rx).await.unwrap();

    tokio::time::sleep(Duration::from_millis(1000)).await;
}

运行结果:

Got  at table 2
Got  at table 3
Got  at table 1

Copyright© 2013-2020

All Rights Reserved 京ICP备2023019179号-8