Rust: tokio による非同期プログラミング

Takami Torao Rust 1.59 tokio 1.17 #Rust #tokio

概要

tokio (トーキョー) は Rust 言語で非同期アプリケーションを作成するためのイベント駆動型フレームワークです。煩雑さを排除し統一された方法で使用できる非同期ランタイムおよびノンブロッキング I/O とネットワークのプラットフォームを提供しています。開発者は tokio を使用することで高度にスケーラブルな設計のネットワークアプリケーションを迅速に開発することができます。

tokio は epoll (Linux) や kqueue (FreeBSD)、IOCP (Windows) システムコールを使用する低レベルのクレート mio をベースにしています。また、並行して処理している未確定な結果の抽象化された構造である Future パターンに基づいています。

Table of Contents

  1. 概要
    1. 非同期処理
  2. tokio 非同期プログラミング
    1. tokio::main マクロ
    2. asyncawait 機能
      1. await による同期実行
  3. Service トレイトを使った非同期特性の導入

非同期処理

単純な逐次処理設計 (シングルスレッド設計) のアプリケーションで時間のかかるブロッキング処理を使用すると全体の動作も停止します。ちょっとしたコマンドラインツールであればそれでも問題ありませんが、サーバ環境や GUI 環境ではアプリケーション全体の応答が停止してしまうため大きな問題になります。

時間のかかる処理の応答を別のスレッドで待つようにして全体としてはブロックしないように設計する手法があります。しかし、スレッドは多くのメモリを消費し、スレッドの切り替え (コンテキストスイッチ) が CPU 使用率を圧迫して容易にリソーススタベーションを発生させます。結果としてブロッキングによるアプリケーション全体の停止は回避できるものの、一度に数百程度の "応答待ちスレッド" までしかスケールしない方法です。

この方法の明らかな問題は 1 つのブロッキング処理を実行するために 1 つのスレッドを割り当てる必要がある点です。一方で、非同期プログラミングは呼び出された側が処理の終了通知を行うモデルです。非同期を導入することによって、スレッドは時間のかかる処理の終了を待つ必要がなくなり、時間のかかる処理を行っている間に別の処理を行ったり、同時に複数の処理を実行して終わったものから順に続きを行うように設計することができます。結果としてアプリケーションは (以前はブロッキングで行っていた) 多くの処理を最小限のスレッドで切り盛りできるようになり、プログラムの同時実行性能とスケーラビリティが飛躍的に向上します。

ただし、非同期プログラミングは単純な逐次処理設計の同期プログラミングよりもはるかに困難です。非同期フレームワークを使用して定型化されたモデルで実装することが重要です。

tokio 非同期プログラミング

他の言語と比べて後発の Rust の非同期フレームワークはいくつかの変遷を経て tokio がデファクトスタンダードとなりつつあります。しかしまだ成熟した段階とは言えず、今後も半年や数年の規模でベストと言われる書き方が変わる可能性が高いように思います。この記事で対象とする tokio バージョンは以下の通り。

[dependencies]
tokio = { version = "1.17", features = ["full"] }

tokio::main マクロ

tokio の非同期処理は Future に基づいているため async ブロック内で使用する必要があります。このため同期処理から非同期処理を使用するには右下のような「非同期処理をブロッキングっで呼び出す」というボイラープレートを記述する必要があります。

tokio には従来の main() のような同期関数を透過的に async 化するための便利なマクロが用意されています。左下のように async fn main() 関数に tokio::main 属性をつけることで右下のように展開され、その処理の実体がランタイムの適切な Executor で非同期実行されるようになります。

#[tokio::main]
async fn main() {
  // tokio を使った非同期処理
  ...
}
fn main() {
  tokio::runtime::Builder::new_multi_thread()
    .enable_all()
    .build()
    .unwrap()
    .block_on(async {
      // tokio を使った非同期処理
      ...
    })
}

Rust のエントリポイントが同期処理の main() であることには変わりはありません。tokio::main マクロが行っている変換は同期処理から非同期処理にスイッチするエントリポイントと見ることもできます。また main() だけではなく、別の関数に付与してそこを非同期処理のエントリポイントにすることもできます。

fn main() {
    hello();
}

// async 宣言されているが、この関数自体は同期処理として呼び出すことができる
#[tokio::main]
async fn hello() {
  // 関数の中は非同期で実行され Future を経由して結果が返される
  ...
}

asyncawait 機能

async を付与された関数や処理ブロックを呼び出すと結果として Future を生成します。Future 自体は「後で実行可能な処理」つまりクロージャーや継続 (continuation) に似た概念を表しているに過ぎず、Future を生成しただけでは ─ つまり async 関数を呼び出しただけではその処理はまだ開始していません。

await による同期実行

Future で表現された async 処理を実行する手っ取り早い方法は後置キーワードの await を付けることです。これにより Future の処理を同期処理と同じように実行して結果を得ることができます。以下の例では 1) 変数 greeting を束縛した時点ではまだ中の処理は開始しておらず、2) await された時点で実行され、3) その終了を待って後続の処理が実行されていることが分かります。またスレッド ID に変化がないことからすべて同じスレッドで実行されていることが分かります。

use std::thread;
use std::time::Duration;

#[tokio::main]
async fn main() {
  let greeting = async {   // impl Fugure<Output = &str>
    print!("{}: Hello, ", tid());
    thread::sleep(Duration::from_millis(1000));
    println!("Susan. This is Mike.");
    "Mike"
  };
  thread::sleep(Duration::from_millis(500));
  println!("{}: (The telephone rings. Susan picks it up.)", tid());
  let speaking_to = greeting.await;
  println!("{}: Hi, {}. Good afternoon.", tid(), speaking_to);
}

fn tid() -> String {
  format!("{:?}", thread::current().id())
}
torao@lazurite:~$ cargo run
...
ThreadId(1): (The telephone rings. Susan picks it up.)
ThreadId(1): Hello, Susan. This is Mike.
ThreadId(1): Hi, Mike. Good afternoon.

言い換えると asyncFuture を使っていても await で実行しているのを見かけたらその部分は単なる同期プログラミングです。非同期処理を同期処理のように扱えるという意味では #[tokio::main] と似ているかもしれませんが、awaitasync 処理の中でしか使用することはできません。

Service トレイトを使った非同期特性の導入

Service トレイトRequest 型から Response 型への非同期変換を表します。アプリケーションは Service トレイトを使用することで独自の非同期特性を追加することができます。