Important Notice: this service will be discontinued by the end of 2024 because for multiple years now, Plume is no longer under active/continuous development. Sadly each time there was hope, active development came to a stop again. Please consider using our Writefreely instance instead.

Rustで「n個のキューを使ってHTTPリクエスト」

例えば1000ぐらいのページにアクセスしたいとして、順番にやってったら時間が掛かり過ぎるので並行にやりたい。

reqwesttokioでちゃちゃっと書けば全てを並行にするのは簡単なんだけど、リソースを使い過ぎたのか何なのか(ちゃんと調べてない)、タイムアウトするやつとか出てきてしまった。

そこで、同時にリクエストするのは50ぐらいにしておいて、空きができたら次のリクエスト……とすることにした。

サイズドキューを使ってマルチスレッドでやればいいわけだ。のだけどRustでのサイズドキューライブラリー探しに苦労したのと、都合によりシングルスレッドでやりたかったのでasync/awaitを使ってて、そのやり方を調べるのにも苦労した。

紆余曲折

そもそもRustの非同期も、ブログ記事とか見ることはあってもまともに書いたことなかったので改めて勉強することにした。

まずはTokio チュートリアル (日本語訳)を読む。訳者が書いているように、イキバタじゃなくてある程度基礎から学ぶことで見通し・自分のやるべきことが分かるようになった。最高。

この本でサンプルもあったmpsc(マルチプロデューサー・シングルコンシューマー)を試してみるけどうまくいかず、そもそも課題にフィットしてないな、と実感し始めたところで他のライブラリーを探し始めた。「自分のやりたいのはサイズドキューなんだ、だから少なくともコンシューマー側は複数無いといけないんだ」と自覚してから見たのは、やはり本で触れられているwatch。でもこれは違う。

あれこれ検索して見付けたのが非同期 Rust パターンという記事。長いのだけど前半は明らかに求める内容じゃないからすっ飛ばして後半を読むと、

非同期対応した固定長 mpmc キューには flume::bounded が便利です。 固定長なので、バッファしていっぱいになったら producer 側は .await で consumer 側の消費を待てて、 consumer 側も mc の特権である Stream + Clone から .await した順に処理対象が降ってるので。

とあって、これだ! となった。

が、既存のアプリケーションに見様見真似で試してみるも、どうもうまくいかない。そこでflumeのテストコードなども見ながら、ステップバイステップで動かしていくことにした。

プログラムが終了しない

サンプルっぽいコードを書いたらプログラムが終了しなかった。

use tokio::runtime;

#[tokio::main]
async fn main() {
    let (tx, rx) = flume::bounded(3);
    for i in 0..3 {
        tx.send_async(i).await.unwrap();
    }
    while let Ok(i) = rx.recv_async().await {
        eprintln!("{}", i);
    }
}

0
1
2

で、プログラムが終了しない。

一つのワーカーだけが使われる

そういやリクエストは50個のキューから投げたいんだったな、今だと一つだけだわ、と思ってコンシューマーを増やした。

それぞれがちゃんと使われるか見るために出力してみた(キューのことをここではworkerと書いた)。

use tokio::runtime;

#[tokio::main]
async fn main() {
    let capacity = 20;
    let (tx, rx) = flume::bounded(capacity);
    for i in 0..capacity {
        tx.send_async(i).await.unwrap();
    }
    for j in 0..3 {
        let rx = rx.clone();
        while let Ok(i) = rx.recv_async().await {
            eprintln!("worker: {}, data: {}", j, i);
        }
    }
}

worker: 0, data: 0
worker: 0, data: 1
worker: 0, data: 2
worker: 0, data: 3
worker: 0, data: 4
worker: 0, data: 5
worker: 0, data: 6
worker: 0, data: 7
worker: 0, data: 8
worker: 0, data: 9
worker: 0, data: 10
worker: 0, data: 11
worker: 0, data: 12
worker: 0, data: 13
worker: 0, data: 14
worker: 0, data: 15
worker: 0, data: 16
worker: 0, data: 17
worker: 0, data: 18
worker: 0, data: 19

やはり終了しない。そして一つのワーカーしか使われていない。

開始しない

プロデューサーとコンシューマーの順番がおかしいのかな?

use tokio::runtime;

#[tokio::main]
async fn main() {
    let capacity = 20;
    let (tx, rx) = flume::bounded(capacity);
    for j in 0..3 {
        let rx = rx.clone();
        while let Ok(i) = rx.recv_async().await {
            eprintln!("worker: {}, data: {}", j, i);
        }
    }
    for i in 0..capacity {
        tx.send_async(i).await.unwrap();
    }
}

今度は始まらない(出力がない)。そして終わらない。

動くようになった

use tokio::runtime;

#[tokio::main]
async fn main() {
    let capacity = 20;
    let (tx, rx) = flume::bounded(capacity);
    for j in 0..3 {
        let rx = rx.clone();
        tokio::spawn(async move {
            while let Ok(i) = rx.recv_async().await {
                eprintln!("worker: {}, data: {}", j, i);
            }
        });
    }
    for i in 0..capacity {
        tx.send_async(i).await.unwrap();
    }
}

worker: 1, data: 0
worker: 1, data: 3
worker: 1, data: 4
worker: 1, data: 5
worker: 1, data: 6
worker: 1, data: 7
worker: 1, data: 8
worker: 1, data: 9
worker: 1, data: 10
worker: 1, data: 11
worker: 1, data: 12
worker: 0, data: 2
worker: 0, data: 14
worker: 0, data: 15
worker: 0, data: 16
worker: 0, data: 17
worker: 0, data: 18
worker: 0, data: 19
worker: 1, data: 13
worker: 2, data: 1

spawn()する必要があった。

ランタイムの明示実行:即時終了してしまう

都合によりtokio::mainマクロではなくて自分でランタイムを生成・実行したかったので、tokioのドキュメントを見ながら書き替えた。

use tokio::runtime;

fn main() {
    let rt = runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    rt.block_on(async {
        let capacity = 20;
        let (tx, rx) = flume::bounded(capacity);
        for j in 0..3 {
            let rx = rx.clone();
            tokio::spawn(async move {
                while let Ok(i) = rx.recv_async().await {
                    eprintln!("worker: {}, data: {}", j, i);
                }
            });
        }
        for i in 0..capacity {
            tx.send_async(i).await.unwrap();
        }
    });
}

すると一切の出力がないまま、すぐに終了してしまった。

インスタンスメソッドで spawn

関係ないとは思うものの、 spawn()tokio クレートではなくて生成したランタイムの物を使ってみる。

use tokio::runtime;

fn main() {
    let rt = runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    rt.block_on(async {
        let capacity = 20;
        let (tx, rx) = flume::bounded(capacity);
        for j in 0..3 {
            let rx = rx.clone();
            rt.spawn(async move {
                while let Ok(i) = rx.recv_async().await {
                    eprintln!("worker: {}, data: {}", j, i);
                }
            });
        }
        for i in 0..capacity {
            tx.send_async(i).await.unwrap();
        }
    });
}

変わらない。

マルチスレッド

Builder の呼ぶメソッドを変えてマルチスレッドにしてみる。

use tokio::runtime;

fn main() {
    let rt = runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap();
    rt.block_on(async {
        let capacity = 20;
        let (tx, rx) = flume::bounded(capacity);
        for j in 0..3 {
            let rx = rx.clone();
            rt.spawn(async move {
                while let Ok(i) = rx.recv_async().await {
                    eprintln!("worker: {}, data: {}", j, i);
                }
            });
        }
        for i in 0..capacity {
            tx.send_async(i).await.unwrap();
        }
    });
}
worker: 1, data: 1
worker: 2, data: 2
worker: 2, data: 4
worker: 2, data: 5
worker: 2, data: 6
worker: 1, data: 3
worker: 1, data: 7
worker: 1, data: 9
worker: 1, data: 10
worker: 1, data: 11
worker: 1, data: 12
worker: 1, data: 13
worker: 1, data: 14
worker: 1, data: 15
worker: 2, data: 8
worker: 2, data: 16
worker: 2, data: 17
worker: 0, data: 0
worker: 1, data: 18
worker: 0, data: 19

動いた! シングルスレッドだとだめなのか……。

ジョインハンドル

そういや spawn() したらジョインしないといけないんだったな(むしろマルチスレッドはなぜ動いていたんだ。速過ぎたのか?)。

use futures::future::join_all;
use tokio::runtime;

fn main() {
    let rt = runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    rt.block_on(async {
        let capacity = 20;
        let (tx, rx) = flume::bounded(capacity);
        let workers = 3;
        let mut handles = Vec::with_capacity(workers);
        for j in 0..workers {
            let rx = rx.clone();
            let t = rt.spawn(async move {
                while let Ok(i) = rx.recv_async().await {
                    eprintln!("worker: {}, data: {}", j, i);
                }
            });
            handles.push(t);
        }
        for i in 0..capacity {
            tx.send_async(i).await.unwrap();
        }

        join_all(handles).await;
    });
}
worker: 0, data: 0
worker: 0, data: 1
worker: 0, data: 2
worker: 0, data: 3
worker: 0, data: 4
worker: 0, data: 5
worker: 0, data: 6
worker: 0, data: 7
worker: 0, data: 8
worker: 0, data: 9
worker: 0, data: 10
worker: 0, data: 11
worker: 0, data: 12
worker: 0, data: 13
worker: 0, data: 14
worker: 0, data: 15
worker: 0, data: 16
worker: 0, data: 17
worker: 0, data: 18
worker: 0, data: 19

ワーカーが変わらない。

そして終わらない。

プロデューサーをdrop

flumeのドキュメントに「 SenderReceiver が全部dropしたら終わる」「 ReceiverSenderdropしたら終わる」って書いてあったのを思い出してプロデューサーを drop してみる。

use futures::future::join_all;
use tokio::runtime;

fn main() {
    let rt = runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    rt.block_on(async {
        let capacity = 20;
        let (tx, rx) = flume::bounded(capacity);
        let workers = 3;
        let mut handles = Vec::with_capacity(workers);
        for j in 0..workers {
            let rx = rx.clone();
            let t = rt.spawn(async move {
                while let Ok(i) = rx.recv_async().await {
                    eprintln!("worker: {}, data: {}", j, i);
                }
            });
            handles.push(t);
        }
        for i in 0..capacity {
            tx.send_async(i).await.unwrap();
        }
        drop(tx);

        join_all(handles).await;
    });
}
worker: 0, data: 0
worker: 0, data: 1
worker: 0, data: 2
worker: 0, data: 3
worker: 0, data: 4
worker: 0, data: 5
worker: 0, data: 6
worker: 0, data: 7
worker: 0, data: 8
worker: 0, data: 9
worker: 0, data: 10
worker: 0, data: 11
worker: 0, data: 12
worker: 0, data: 13
worker: 0, data: 14
worker: 0, data: 15
worker: 0, data: 16
worker: 0, data: 17
worker: 0, data: 18
worker: 0, data: 19

惜しい。プログラムは終了したが、ワーカーが切り替わらない。

1秒スリープ

もしかして最初のワーカーが速過ぎて全部そいつが処理してしまっただけなのかも? と思ってスリープを入れてみる。

use futures::future::join_all;
use tokio::{runtime, time::{sleep, Duration}};

fn main() {
    let rt = runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    rt.block_on(async {
        let capacity = 20;
        let (tx, rx) = flume::bounded(capacity);
        let workers = 3;
        let mut handles = Vec::with_capacity(workers);
        for j in 0..workers {
            let rx = rx.clone();
            let t = rt.spawn(async move {
                while let Ok(i) = rx.recv_async().await {
                    sleep(Duration::from_secs(1)).await;
                    eprintln!("worker: {}, data: {}", j, i);
                }
            });
            handles.push(t);
        }
        for i in 0..capacity {
            tx.send_async(i).await.unwrap();
        }
        drop(tx);

        join_all(handles).await;
    });
}
worker: 0, data: 0
worker: 1, data: 1
worker: 2, data: 2
worker: 0, data: 3
worker: 1, data: 4
worker: 2, data: 5
worker: 0, data: 6
worker: 1, data: 7
worker: 2, data: 8
worker: 0, data: 9
worker: 1, data: 10
worker: 2, data: 11
worker: 0, data: 12
worker: 1, data: 13
worker: 2, data: 14
worker: 0, data: 15
worker: 1, data: 16
worker: 2, data: 17
worker: 0, data: 18
worker: 1, data: 19

これこれ!

これを元に実際のアプリケーションに適用してみたらうまくいったっぽい。少なくともローカルでは。このサイトに適用して少し様子を見てみよう。

これでいいと思っているわけでもないので、よりよいライブラリー、よりよい書き方があったら教えてください!