並行プログラミング
概要
現代のコンピュータには個人向けであってもマルチコアプロセッサとマルチタスク OS が搭載されている。また汎用プログラミング言語でも言語仕様または標準ライブラリで async/await
や Future
といった機能を積極的に取り入れる風潮が続いている。
並行プログラミングとは、そのような環境で効率的な処理を実装するために複数のタスクを同時に実行するプログラミング技法である。並行性によりプログラムは一度に多くのタスクを処理することができるが、並行プログラムを書くことはことさら簡単なことではない。スレッドやロックなどの構造を処理し、競合状態やデッドロックなどを回避することは非常に面倒な作業であり、並行プログラミングの作成が困難である理由でもある。
Table of Contents
導入
コンピュータに対する一連の命令はプログラム (アルゴリズム) として記述される。プロセス (process) は 1 つ以上のプロセッサ上でプログラムが実行されることによって生成される実行状態の動的なインスタンスである。並行プログラム (または並行アルゴリズム) は、共有メモリや TCP/IP などの通信媒体を介して協力する逐次的ステートマシンの集合の記述である。
並行性 (concurrency) とは 2 つ以上の処理 (計算) が同時に進行している状態でのシステム挙動の特性を表している。並行性を持つ実行形態のことを並行処理 (concurrent processing) と呼び、反対に、ある時点においてたかだか 1 つの処理しか実行状態とならない処理形態のことを逐次処理 (sequential processing) と呼ぶ。
\(A\) と \(B\) をそれぞれ独立した処理としたとき、逐次処理では処理 \(A\) が終了したあとでしか別の処理 \(B\) を開始できないが、並行処理では処理 \(A\) の終了を待つことなく処理 \(B\) を開始することができる。Fig 1 の例では、逐次処理はある時点で実行状態にあるタスク (厳密には定義しないがステップや命令のような処理単位) がたかだか 1 つしか存在しないのに対して、並行処理では同時に複数のタスクが実行状態になることができる。
並行性は必ずしも複数のプロセッサ (演算装置) の存在を前提としていないことに注意。シングルプロセッサ環境であっても複数のタスクを短時間で切り替えながら進行することで並行性を持つことができる (このような動作をプリエンプション (preemption) やコンテキストスイッチ (CS; context switch) と呼ぶ)。反対に、複数の物理プロセッサが利用できる環境であってもプログラムを 1 タスクごとしか実行しない/実行できないのであればその処理は逐次的な特性に従う。
並行性の文脈において複数の物理プロセッサが同時にタスクを実行する環境の特性を並列性 (parallelism) と呼び、並列性を持つ演算処理を並列演算 (parallel computing) と呼ぶ。並行と並列はしばしば混同されるが、並列処理はマルチプロセッサやベクトルプロセッサのような環境を前提とした課題によりフォーカスしている (並行性が対象とする課題には並列性も含まれている)。この記事では並行性に焦点を当て並列性については深く扱わない。
ある処理をネットワークを介して複数のコンピュータで実行する計算手法を分散処理 (distributed computing) と呼ぶ。分散処理は並行/並列処理の「物理プロセッサ」にネットワーク上のコンピュータを適用したもので、その特性は並行性や並列性の課題と広く共通している。
逐次処理は線形性 (linearization) が保たれることから、並行性に起因する様々な問題が排除されている。
非同期 (asynchronous) とは、ある状態から次の状態に遷移するために時間に関する前提がないことを意味する。つまり非同期処理は任意の速度で逐次処理を進行することができる。
並行化プリミティブ
プロセスとスレッド
スレッドプール
割り込みとシグナル
セマフォ
poll/select
software transactional memory, CAS
ビルディングブロック
一般的なソフトウェア開発向けに利用されているプログラミング言語の中で、言語仕様と標準ライブラリのレイヤー-でマルチスレッド機能が利用できるようになったのは私の知る限り Java (1996) が最初である。Win95 や Linux などのマルチタスク OS が普及する時勢に後押しされて (”エージェント指向” のような言葉が生まれるなど) この頃からマルチスレッドを前提としたプログラミングの理解が世間に浸透した。
当初こそ無造作にスレッド化するような設計が多く見られたが、x86 環境におけるコンテキストスイッチのコストの高さが障壁となり比較的早い段階で歩止まりに見舞われた。現在では限られた数のスレッド / プロセスリソースのそれぞれをなるべく wait させないで効率的に使用する設計が主流となりつつある。
Fork/Join パターン
Worker パターン
Future パターン
Promise
とも呼ばれる。
async/await
Latch パターン
Channel パターン
Select パターン
Select パターンは並行化プリミティブで紹介した select()
をより抽象化し汎用的にした設計と言える。複数の同期オブジェクトの少なくとも1つがシグナル状態となるまで待機し、シグナル状態になった同期オブジェクトの一つを選んで関連付けられた処理を実行する。オプションで、いずれの同期オブジェクトもシグナル状態になっていなければ直ちに選択される fallback の分岐を設置することもできる。
Fig 2 は 🍎りんごが落ちる、🐔鳥が罠にかかる、🐟魚が釣れる、という 3 つのイベントを待機して、最初に起きた 🍎 イベントに関連する処理を (イベントの結果として得たりんごを伴って) 実行する様子を表している。
この例が示すように、Select パターンは並行処理の観点では one-of-many の同期ポイントの構築であり、逐次処理の観点では分岐である。
Select パターンではしばしば複数の同期オブジェクトが同時にシグナル状態となったにも関わらず一つしか選択されないことが問題となる。
一つは fairness に関する問題である。Select がシグナル状態となった同期オブジェクトのどれを選択するかというポリシーは全体の動作に影響を与えることがある。例えば Fig 2 の例が上から順に優先して選択するポリシーだった場合、極高頻度でりんごが落下する環境では鳥や魚がかかっても実際にそれを得る機会はほとんどなく、想定よりも肉類の供給が滞ることでシステム全体の停止を引き起こすかもしれない。逆に、緊急性の高いイベントは他のイベントより優先して選択してもらいたいケースも考えられる。
もう一つの問題は選択されたなかった同期オブジェクトの状態である。同期化を中断された他の同期オブジェクトがどのような状態となっているかは Select または同期オブジェクトの設計に依存している。さらに、Select パターンはしばしば外側にループを伴って利用されることから、中断された同期オブジェクトが再び Select で使用されたときにどのような振る舞いとなるかに注意を払う必要がある。
例えば Fig 2 でりんごが落ちると同時に魚が釣れていた場合、誰も引き上げなかったその魚はどうなるのだろうか? 再び Select に到達したときに以前にかかっていた魚がかかったままである保証はあるのだろうか? 10 バイト中 3 バイトまで読み込んだ同期オブジェクトが再び Select で参照されたとき正しく 3 バイトを保持し 4 バイト目から続きを読み出す保証はあるだろうか?
Rust: tokio::select!
マクロ
Rust では tokio::select!
マクロが Select パターンを実装している (公式の Tutorial 参照)。この実装は poll の成功した Future
に分岐し処理を実行する。
以下の例は起動後にブラウザで http://localhost:8080 にアクセスするとメッセージを表示する。ただし CTRL+C を押すか何も起きず 10 秒経過するか、または外部からシャットダウンの通知を受けるか (つまり標準入力に exit
と入力されるか) のいずれかのイベントが発生すると終了する。
use std::time::Duration;
use tokio;
use tokio::io::{self, AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpListener;
use tokio::sync::oneshot;
#[tokio::main]
async fn main() {
let (mut tx, rx) = oneshot::channel();
let h = tokio::spawn(async move {
// EOF に達するか `exit` と入力されるまで標準入力を繰り返し読み込み
let mut lines = BufReader::new(io::stdin()).lines();
loop {
eprint!("INPUT> ");
tokio::select! {
result = lines.next_line() => {
match result {
Ok(Some(line)) if line != "exit" => (),
_ => break,
}
}
_ = tx.closed() => break,
}
}
let _ = tx.send(());
});
server_loop(rx).await;
h.await.unwrap();
eprintln!("\nterminated");
}
async fn server_loop(mut shutdown: oneshot::Receiver<()>) {
let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap();
'event: loop {
tokio::select! {
_ = tokio::signal::ctrl_c() => break 'event,
_ = tokio::time::sleep(Duration::from_secs(10)) => break 'event,
_ = &mut shutdown => break 'event,
result = listener.accept() => {
let (mut socket, _) = result.unwrap();
socket.write_all(b"HTTP/1.1 200 Ok\r\nContent-Type: text/html\r\n\r\nhello, world").await.unwrap();
}
}
}
}
tokio::select!
で選択されなかった Future
の中断状態は、Future
を実装している同期オブジェクトに強く依存していることに注意 (モノによってはそれ以降正しく継続することができない)。中断された Future
の挙動や fairness については API リファレンスを参照。
[dependencies]
tokio = { version = "1.17", features = ["full"] }
Go: select
構文
Go では言語標準の select
構文で Select パターンを使うことができる (構文仕様参照)。以下の例は 2 つの goroutine を使用し、1 秒おきに現在時刻を表示しながら 10 秒が経過したら終了する。
package main
import (
"fmt"
"time"
)
func main() {
// 1 秒ごとに現在時刻を送信するチャネル
clock := make(chan time.Time)
go func() {
for {
clock <- time.Now()
time.Sleep(1 * time.Second)
}
}()
// 10 秒後にシャットダウンを送信するチャネル
shutdown := make(chan string)
go func() {
time.Sleep(10 * time.Second)
shutdown <- "shutdown"
}()
alarm(clock, shutdown)
}
func alarm(clock chan time.Time, shutdown chan string) {
alarm:
for {
select {
case <-shutdown:
fmt.Printf("ding-dong!\n")
break alarm
case tm := <-clock:
fmt.Printf("%s\n", tm)
}
}
}
go ではチャネル chan
に対してのみ Select パターンを適用できることと、go のチャネルは単なるスレッドセーフなキューでしかないことから、選択されなかったチャネルのメッセージが欠落するような問題は発生しない。
Scala:zio
メモ。
golang や rust の非同期処理とメンタルモデルが近いのはFuture よりも cats-effect や zio の fiber ですね.
- 110416 (@by110416) March 22, 2022