Rust: スレッド間のデータ共有パターン

Takami Torao Rust 1.53 #Rust
  • このエントリーをはてなブックマークに追加

概要

Table of Contents

  1. 概要
  2. Cache Aside パターン
    1. 参照専用データの共有
    2. 更新チェック付きデータ共有
    3. 更新用データの共有

Cache Aside パターン

参照専用データの共有

immutable なデータを参照するだけの目的で複数のスレッドで共有することは Rust における最も簡単なデータ共有パターンと言える。典型的な例はデータを構築するために DB 読み出しや複雑な演算コストがかかるケースだろう。このような場合、共有しようとしている immutable なデータを非同期版参照カウント式のスマートポインタである Arc を介して共有するだけで良い。

以下のコードは重み付きランダムサンプリングを使った例である。各項目の降順ソート済み重み weights の参照を Arc::clone() で各スレッドに渡している。各スレッドは重みを読み込んでいるだけで更新は行っていない。

use ed25519_dalek::{Keypair, Signature, Signer};
use rand::{Rng, thread_rng};
use rand::rngs::OsRng;
use std::sync::Arc;
use std::thread::{JoinHandle, spawn};

#[test]
fn multi_threaded_signing() {

  // 鍵ペアの作成
  let mut csprng = OsRng {};
  let keypair = Keypair::generate(&mut csprng);

  // 10 個のメッセージを乱数を使って生成
  let mut rand = thread_rng();
  let messages = (0..10).map(|_| {
    let mut message = [0u8; 1024];
    rand.fill(&mut message);
    message.to_vec()
  }).collect::<Vec<Vec<u8>>>();

  // 複数のスレッドで鍵ペアを共有し各メッセージに対する署名を生成
  let keypair = Arc::new(keypair);
  let mut handles = Vec::<JoinHandle<Signature>>::with_capacity(messages.len());
  for i in 0..messages.len() {
    let keypair = keypair.clone();
    let message = messages[i].clone();
    handles.push(spawn(move || keypair.sign(&message)));
  }

  // スレッドを待機して署名の取り出し
  let mut signatures = Vec::<Signature>::with_capacity(messages.len());
  while !handles.is_empty() {
    signatures.push(handles.remove(0).join().unwrap());
  }

  // 署名の検証
  assert_eq!(messages.len(), signatures.len());
  for (message, signature) in messages.iter().zip(signatures.iter()) {
    assert!(keypair.verify(message, &signature).is_ok());
  }
}

mut な操作を行わなければ問題はない。

更新チェック付きデータ共有

データが更新されていたら再読み込み。ただし、データの更新チェックも負荷となるため更新チェックは 500ms 以上間隔を開けたい (言い換えると実際のデータ更新から 500ms 以内に検知/反映させたい)。差し替え可能な参照専用データは Arc を使って

use std::env::temp_dir;
use std::fs::{metadata, OpenOptions, read_to_string, remove_file, rename, write};
use std::io::{ErrorKind, Result};
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::thread::{sleep, spawn};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

const FILE_NAME: &str = "foo.txt";

fn path() -> PathBuf {
  temp_dir().join(FILE_NAME)
}

fn temp_path(prefix: &str, suffix: &str) -> PathBuf {
  let dir = temp_dir();
  for i in 0.. {
    let path = dir.join(format!("{}{}{}", prefix, i, suffix));
    match OpenOptions::new().create_new(true).read(true).write(true).open(path.clone()) {
      Ok(_) => return path,
      Err(err) if err.kind() == ErrorKind::AlreadyExists => (),
      Err(err) => panic!("ERROR: {}", err),
    }
  }
  unreachable!()
}

fn last_modified() -> Option<SystemTime> {
  match metadata(path()) {
    Ok(meta) if meta.is_file() => meta.modified().ok(),
    Ok(_) => None,
    Err(err) if err.kind() == ErrorKind::NotFound => None,
    Err(err) => panic!("ERROR: {}", err)
  }
}

struct Cache {
  last_verified: SystemTime,
  last_modified: Option<SystemTime>,
  data: String,
}

impl Cache {
  fn new() -> Cache {
    Cache { last_verified: UNIX_EPOCH, last_modified: None, data: String::new() }
  }

  fn load(&mut self, id: usize) -> String {
    if let Ok(elapsed) = self.last_verified.elapsed() {
      if elapsed.as_millis() > 100 {
        self.last_verified = SystemTime::now();
        let last_modified = last_modified();
        if last_modified != self.last_modified {
          self.last_modified = last_modified;
          self.data = read_to_string(path()).unwrap();
          println!("{}: --- load ---", id);
        }
      }
    }
    self.data.clone()
  }
}

fn save(data: &str) -> Result<()> {
  let temp = temp_path("foo", ".txt");
  write(temp.clone(), data)?;
  rename(temp, path())?;
  Ok(())
}

#[test]
fn cache_aside() {
  if last_modified().is_some() {
    remove_file(path()).unwrap();
  }

  let cache = Arc::new(Mutex::new(Cache::new()));
  for id in 0..10 {
    let cache = cache.clone();
    spawn(move || {
      loop {
        let data = cache.lock().unwrap().load(id);
        println!("{}: {}", id, data);
        sleep(Duration::from_millis(10));
      }
    });
  }

  for data in vec!["one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten"] {
    save(data).unwrap();
    sleep(Duration::from_millis(250));
  }
  remove_file(path()).unwrap();
}

更新用データの共有

file:/opt/site/docroot/mox/lang/rust/recipes/concurrent/data-sharing/index.xhtml: FileNotFoundException: /opt/site/docroot/mox/lang/rust/recipes/concurrent/data-sharing/samples/src/arc_mutex.rs (No such file or directory)