非同期処理

Takami Torao Rust 1.48 #Rust #async
  • このエントリーをはてなブックマークに追加

概要

この記事では並行してタスクを実行するインスタンス (つまりスレッドやプロセス、物理ノードなどの単位) をプロセスと呼ぶ。

Table of Contents

  1. 概要
  2. 導入
  3. 同期処理の非同期化
    1. Worker プロセス
  4. スレッドに基づく実装

導入

2 つの独立したプロセス \(A\) と \(B\) が存在し、\(A\) が \(B\) に処理を依頼する状況を想定する。同期処理 (synchronous execution) とは \(B\) の処理が完了するまで \(A\) の処理は進行せず、\(B\) の処理完了通知を受信してから \(A\) の次の処理が進行する ─ つまり \(B\) の完了と \(A\) の再開が同期している動作形態である。同期処理はマルチプロセスでの動作であってもタスクの実行順序の観点では逐次処理と同等であることから逐次的な特性を持つ。

一方で、\(B\) の処理完了を待たずに\(A\) が次の処理を進行できる動作形態を非同期処理 (asynchronous execution) と呼ぶ。非同期処理は \(A\) と \(B\) の両方のタスクが同時に進行することから並行性を持つ。

非同期が意味する範囲では \(A\) は必ずしも \(B\) の処理完了を知る必要はない。しかし現実には \(A\) はどこか時点で \(B\) の処理結果が必要となることもある。このとき、\(B\) の通知を待つ同期ポイント (synchronous point) を設置して処理完了と結果を受信することができる。

Fig 1 はプロセス \(A\) がプロセス \(B\) に処理を要求したときの同期実行(左)と非同期実行(右)での動作の違いを表している。同期実行では \(B\) の処理完了を待ってから次の処理に進むが、非同期実行では \(B\) の処理完了を待たずに次の処理に進むことができる。ただしこのシーケンス図のようにプロセス \(B\) が \(A\) に完了通知を行う同期ポイントを設置し、\(A\) は必要に応じて通知を待機することで処理結果を得ることができる。

Fig 1. プロセス \(A\) がプロセス \(B\) に処理を要求したときの同期実行と非同期実行での処理シーケンス上の違い。

同期/非同期は並行性に関する概念だが、必ずしもマルチスレッド/マルチプロセスやマルチプロセッサ環境を前提としていないことに注意。

ノンブロッキング (non-blocking) とは、どのような状況でもプロセスがブロックされることなく制御がすぐに戻ることを保証するという意味である。一般に非同期処理の要求はノンブロッキングで行われることからしばしば双方の混乱も見られるが、ノンブロッキングは何らかの処理が並行して行われることを前提としていない点が異なる。

TCP ソケットからデータを読み出す例で説明すると、ノンブロッキングでは読み込み可能なデータが到着していなければ直ちにエラーで終了する (並行してデータの到着を待たない)。一方で非同期では制御がすぐに戻ることと並行してデータ到着の監視が始まり、データが読み出し可能になったことを通知として受け取ることができる (到着したデータは通知に含まれるか、ブロッキングなしに読み出せるだろう)。

同期処理の非同期化

一般的なプログラミング言語のエントリポイント (main()) は同期処理である。またマルチスレッドのような環境であっても局所的なコードは逐次処理である。

Worker プロセス

システムやアプリケーションに特定の処理を行うための独立したスレッド、プロセス、または物理ノードを導入することは同期処理を非同期化する一般的な手法である。この記事では処理の非同期化のために使用する演算リソースを Worker と呼ぶ。

マルチタスク環境で動作するアプリケーションにおいて時間のかかるタスク (処理) を実行するとき、そのタスクがアプリケーション自体の応答性を阻害しないために一般に Worker と呼ばれる独立したスレッドを導入する。この設計によってタスクの実行は非同期化され、アプリケーションはタスクの実行完了を待つことなくすぐに次のイベント (画面入力や RPC リクエストなど) に取り掛かることができる。

実行するタスクが発生するたびに Worker スレッドを起動することでも目的を達することができるが、そのような無作法な Worker 実装では効率も悪くすぐにスケールしなくなることが予想される。一般には:

  1. アプリケーションと Worker の間に Fig 2 のようなジョブキュー (job queue) またはタスクキューと呼ばれるキュー構造を設置する。
  2. Worker 数は固定個数か、負荷に応じて変動するように設計する。ただし動的な場合であっても上限を持つ。一般にスレッドやプロセスのように CPU bound な実行ではアプリケーションが利用可能な最大コア数が上限となる。

アプリケーションはジョブキューにタスク (ジョブ) を投入するだけですぐに次の処理を開始する。Worker はキューからタスクを取り出して実行することを繰り返す (キューが空の場合は新しいタスクが投入されるまで停止する)。

Fig 2. Worker プロセスにジョブ (タスク) の実行を移譲するアプリケーションのパターン。

アプリケーションが個々のタスクの実行結果を必要とする場合は少し工夫が必要である。同一プロセス内の処理であれば Future パターンを使用できるだろう。分散システムの場合はジョブキューと反対方向のメッセージング機構や共有 DB をポーリングする必要がある。

Worker は非同期処理のための基本的な設計で、適用はマルチスレッドプログラミング、メッセージパッシング、分散システム、Worker Thread パターン、Producer/Consumer パターンなど、非常に広範囲に及ぶ。またアクターモデルは Worker 設計を概念的に発展させたものである。

スレッドに基づく実装

まず、非同期ソケットや poll を使用しないでイベントループを行っているスレッドとの通信を実装してみよう。どのような言語でも Worker スレッドを用いたこのような非同期処理の設計はしばしば目にする。

イベントループを行っているスレッド内で処理を行い、その結果を Future として返す実装は Rust で以下のように書くことができる。

開く
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::task::{Context, Poll, Waker};
use std::thread::{sleep, spawn};
use std::time::{Duration, SystemTime};
use tokio::runtime::Runtime;

fn main() {
  let (sender, receiver) = channel::<Task<SystemTime>>();
  spawn(move || event_loop(receiver));
  let rt = Runtime::new().unwrap();
  loop {
    sleep(Duration::from_secs(1));
    rt.block_on(async {
      let now = euqueue(&sender, Task::new(move || SystemTime::now())).await;
      println!("{:?}", now);
    });
  }
}

fn euqueue<R>(sender: &Sender<Task<R>>, task: Task<R>) -> TaskFuture<R> {
  let future = TaskFuture {
    state: task.state.clone()
  };
  sender.send(task).unwrap();
  future
}

fn event_loop<R>(receiver: Receiver<Task<R>>) {
  loop {
    let mut exec = receiver.recv().unwrap();
    exec.execute();
  }
}

type Executable<R> = dyn (FnMut() -> R) + Send + 'static;

struct TaskState<R> {
  result: Option<R>,
  waker: Option<Waker>,
}

struct Task<R> {
  executable: Box<Executable<R>>,
  state: Arc<Mutex<TaskState<R>>>,
}

impl<R> Task<R> {
  pub fn new<E>(executable: E) -> Self where E: (FnMut() -> R) + Send + 'static {
    Self {
      executable: Box::new(executable),
      state: Arc::new(Mutex::new(TaskState {
        result: None,
        waker: None,
      })),
    }
  }
  pub fn execute(&mut self) {
    let result = (self.executable)();
    let mut state = self.state.lock().unwrap();
    state.result = Some(result);
    if let Some(waker) = state.waker.take() {
      waker.wake();
    }
  }
}

struct TaskFuture<R> {
  state: Arc<Mutex<TaskState<R>>>
}

impl<R> Future for TaskFuture<R> {
  type Output = R;
  fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
    let mut state = self.state.lock().unwrap();
    if let Some(result) = state.result.take() {
      Poll::Ready(result)
    } else {
      state.waker = Some(cx.waker().clone());
      Poll::Pending
    }
  }
}

すでに存在する特定のスレッドに処理を委譲しなければならないため async 関数を使うことができない点に注意。Rust のスレッド間通信は原則として channel を使用するのが望ましい。channel は片側のスレッドで send したデータをもう片方のスレッドから receive することができる (目的は Go の chan や Java の BlockingQueue と似ている)。

非同期で実行する処理 dyn FnMut() はコンパイル時にサイズが決定できないため Box 化する。また、受け渡しを行うために Send が必要である。

TaskState がスレッド間で共有される。

Sender<Task> イベントループに Task を送信する。
Receiver<Task> イベントループが Task を受け取る。
TaskState スレッド間で共有される状態。