Rust: スレッド間のデータ共有パターン
概要
Table of Contents
Cache Aside パターン
参照専用データの共有
immutable なデータを参照するだけの目的で複数のスレッドで共有することは Rust における最も簡単なデータ共有パターンと言える。典型的な例はデータを構築するために DB 読み出しや複雑な演算コストがかかるケースだろう。このような場合、共有しようとしている immutable なデータを非同期版参照カウント式のスマートポインタである Arc
を介して共有するだけで良い。
以下のコードは重み付きランダムサンプリングを使った例である。各項目の降順ソート済み重み weights
の参照を Arc::clone()
で各スレッドに渡している。各スレッドは重みを読み込んでいるだけで更新は行っていない。
use ed25519_dalek::{Keypair, Signature, Signer};
use rand::{Rng, thread_rng};
use rand::rngs::OsRng;
use std::sync::Arc;
use std::thread::{JoinHandle, spawn};
#[test]
fn multi_threaded_signing() {
// 鍵ペアの作成
let mut csprng = OsRng {};
let keypair = Keypair::generate(&mut csprng);
// 10 個のメッセージを乱数を使って生成
let mut rand = thread_rng();
let messages = (0..10).map(|_| {
let mut message = [0u8; 1024];
rand.fill(&mut message);
message.to_vec()
}).collect::<Vec<Vec<u8>>>();
// 複数のスレッドで鍵ペアを共有し各メッセージに対する署名を生成
let keypair = Arc::new(keypair);
let mut handles = Vec::<JoinHandle<Signature>>::with_capacity(messages.len());
for i in 0..messages.len() {
let keypair = keypair.clone();
let message = messages[i].clone();
handles.push(spawn(move || keypair.sign(&message)));
}
// スレッドを待機して署名の取り出し
let mut signatures = Vec::<Signature>::with_capacity(messages.len());
while !handles.is_empty() {
signatures.push(handles.remove(0).join().unwrap());
}
// 署名の検証
assert_eq!(messages.len(), signatures.len());
for (message, signature) in messages.iter().zip(signatures.iter()) {
assert!(keypair.verify(message, &signature).is_ok());
}
}
mut
な操作を行わなければ問題はない。
更新チェック付きデータ共有
データが更新されていたら再読み込み。ただし、データの更新チェックも負荷となるため更新チェックは 500ms 以上間隔を開けたい (言い換えると実際のデータ更新から 500ms 以内に検知/反映させたい)。差し替え可能な参照専用データは Arc
を使って
use std::env::temp_dir;
use std::fs::{metadata, OpenOptions, read_to_string, remove_file, rename, write};
use std::io::{ErrorKind, Result};
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::thread::{sleep, spawn};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
const FILE_NAME: &str = "foo.txt";
fn path() -> PathBuf {
temp_dir().join(FILE_NAME)
}
fn temp_path(prefix: &str, suffix: &str) -> PathBuf {
let dir = temp_dir();
for i in 0.. {
let path = dir.join(format!("{}{}{}", prefix, i, suffix));
match OpenOptions::new().create_new(true).read(true).write(true).open(path.clone()) {
Ok(_) => return path,
Err(err) if err.kind() == ErrorKind::AlreadyExists => (),
Err(err) => panic!("ERROR: {}", err),
}
}
unreachable!()
}
fn last_modified() -> Option<SystemTime> {
match metadata(path()) {
Ok(meta) if meta.is_file() => meta.modified().ok(),
Ok(_) => None,
Err(err) if err.kind() == ErrorKind::NotFound => None,
Err(err) => panic!("ERROR: {}", err)
}
}
struct Cache {
last_verified: SystemTime,
last_modified: Option<SystemTime>,
data: String,
}
impl Cache {
fn new() -> Cache {
Cache { last_verified: UNIX_EPOCH, last_modified: None, data: String::new() }
}
fn load(&mut self, id: usize) -> String {
if let Ok(elapsed) = self.last_verified.elapsed() {
if elapsed.as_millis() > 100 {
self.last_verified = SystemTime::now();
let last_modified = last_modified();
if last_modified != self.last_modified {
self.last_modified = last_modified;
self.data = read_to_string(path()).unwrap();
println!("{}: --- load ---", id);
}
}
}
self.data.clone()
}
}
fn save(data: &str) -> Result<()> {
let temp = temp_path("foo", ".txt");
write(temp.clone(), data)?;
rename(temp, path())?;
Ok(())
}
#[test]
fn cache_aside() {
if last_modified().is_some() {
remove_file(path()).unwrap();
}
let cache = Arc::new(Mutex::new(Cache::new()));
for id in 0..10 {
let cache = cache.clone();
spawn(move || {
loop {
let data = cache.lock().unwrap().load(id);
println!("{}: {}", id, data);
sleep(Duration::from_millis(10));
}
});
}
for data in vec!["one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten"] {
save(data).unwrap();
sleep(Duration::from_millis(250));
}
remove_file(path()).unwrap();
}
更新用データの共有
file:/opt/site/docroot/mox/lang/rust/recipes/concurrent/data-sharing/index.xhtml: FileNotFoundException: /opt/site/docroot/mox/lang/rust/recipes/concurrent/data-sharing/samples/src/arc_mutex.rs (No such file or directory)