Rust: How to implement an event loop

Takami Torao Rust 1.48 #Rust #async
  • このエントリーをはてなブックマークに追加

概要

  1. 概要
  2. イベントループの実装

イベントループの実装

まず、非同期ソケットや poll を使用しないでイベントループを行っているスレッドとの通信を実装してみよう。どのような言語でも Worker スレッドを用いたこのような非同期処理の設計はしばしば目にする。

イベントループを行っているスレッド内で処理を行い、その結果を Future として返す実装は Rust で以下のように書くことができる。

開く
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::task::{Context, Poll, Waker};
use std::thread::{sleep, spawn};
use std::time::{Duration, SystemTime};
use tokio::runtime::Runtime;

fn main() {
  let (sender, receiver) = channel::<Task<SystemTime>>();
  spawn(move || event_loop(receiver));
  let rt = Runtime::new().unwrap();
  loop {
    sleep(Duration::from_secs(1));
    rt.block_on(async {
      let now = euqueue(&sender, Task::new(move || SystemTime::now())).await;
      println!("{:?}", now);
    });
  }
}

fn euqueue<R>(sender: &Sender<Task<R>>, task: Task<R>) -> TaskFuture<R> {
  let future = TaskFuture {
    state: task.state.clone()
  };
  sender.send(task).unwrap();
  future
}

fn event_loop<R>(receiver: Receiver<Task<R>>) {
  loop {
    let mut exec = receiver.recv().unwrap();
    exec.execute();
  }
}

type Executable<R> = dyn (FnMut() -> R) + Send + 'static;

struct TaskState<R> {
  result: Option<R>,
  waker: Option<Waker>,
}

struct Task<R> {
  executable: Box<Executable<R>>,
  state: Arc<Mutex<TaskState<R>>>,
}

impl<R> Task<R> {
  pub fn new<E>(executable: E) -> Self where E: (FnMut() -> R) + Send + 'static {
    Self {
      executable: Box::new(executable),
      state: Arc::new(Mutex::new(TaskState {
        result: None,
        waker: None,
      })),
    }
  }
  pub fn execute(&mut self) {
    let result = (self.executable)();
    let mut state = self.state.lock().unwrap();
    state.result = Some(result);
    if let Some(waker) = state.waker.take() {
      waker.wake();
    }
  }
}

struct TaskFuture<R> {
  state: Arc<Mutex<TaskState<R>>>
}

impl<R> Future for TaskFuture<R> {
  type Output = R;
  fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
    let mut state = self.state.lock().unwrap();
    if let Some(result) = state.result.take() {
      Poll::Ready(result)
    } else {
      state.waker = Some(cx.waker().clone());
      Poll::Pending
    }
  }
}

すでに存在する特定のスレッドに処理を委譲しなければならないため async 関数を使うことができない点に注意。Rust のスレッド間通信は原則として channel を使用するのが望ましい。channel は片側のスレッドで send したデータをもう片方のスレッドから receive することができる (目的は Go の chan や Java の BlockingQueue と似ている)。

非同期で実行する処理 dyn FnMut() はコンパイル時にサイズが決定できないため Box 化する。また、受け渡しを行うために Send が必要である。

TaskState がスレッド間で共有される。

Sender<Task> イベントループに Task を送信する。
Receiver<Task> イベントループが Task を受け取る。
TaskState スレッド間で共有される状態。