並行プログラミング

Takami Torao
  • このエントリーをはてなブックマークに追加

概要

現代のコンピュータには個人向けであってもマルチコアプロセッサとマルチタスク OS が搭載されている。また汎用プログラミング言語でも言語仕様または標準ライブラリで async/awaitFuture といった機能を積極的に取り入れる風潮が続いている。

並行プログラミングとは、そのような環境で効率的な処理を実装するために複数のタスクを同時に実行するプログラミング技法である。並行性によりプログラムは一度に多くのタスクを処理することができるが、並行プログラムを書くことはことさら簡単なことではない。スレッドやロックなどの構造を処理し、競合状態やデッドロックなどを回避することは非常に面倒な作業であり、並行プログラミングの作成が困難である理由でもある。

Table of Contents

  1. 概要
  2. 導入
  3. 並行化プリミティブ
    1. プロセスとスレッド
    2. 割り込みとシグナル
    3. セマフォ
    4. poll/select
    5. software transactional memory, CAS
  4. ビルディングブロック
    1. Fork/Join パターン
    2. Worker パターン
    3. Future パターン
    4. async/await
    5. Latch パターン
    6. Channel パターン
    7. Select パターン
      1. Rust: tokio::select! マクロ
      2. Go: select 構文
      3. Scala:zio

導入

コンピュータに対する一連の命令はプログラム (アルゴリズム) として記述される。プロセス (process) は 1 つ以上のプロセッサ上でプログラムが実行されることによって生成される実行状態の動的なインスタンスである。並行プログラム (または並行アルゴリズム) は、共有メモリや TCP/IP などの通信媒体を介して協力する逐次的ステートマシンの集合の記述である。

並行性 (concurrency) とは 2 つ以上の処理 (計算) が同時に進行している状態でのシステム挙動の特性を表している。並行性を持つ実行形態のことを並行処理 (concurrent processing) と呼び、反対に、ある時点においてたかだか 1 つの処理しか実行状態とならない処理形態のことを逐次処理 (sequential processing) と呼ぶ。

\(A\) と \(B\) をそれぞれ独立した処理としたとき、逐次処理では処理 \(A\) が終了したあとでしか別の処理 \(B\) を開始できないが、並行処理では処理 \(A\) の終了を待つことなく処理 \(B\) を開始することができる。Fig 1 の例では、逐次処理はある時点で実行状態にあるタスク (厳密には定義しないがステップや命令のような処理単位) がたかだか 1 つしか存在しないのに対して、並行処理では同時に複数のタスクが実行状態になることができる。

Fig 1. 逐次処理と並行処理。

並行性は必ずしも複数のプロセッサ (演算装置) の存在を前提としていないことに注意。シングルプロセッサ環境であっても複数のタスクを短時間で切り替えながら進行することで並行性を持つことができる (このような動作をプリエンプション (preemption) やコンテキストスイッチ (CS; context switch) と呼ぶ)。反対に、複数の物理プロセッサが利用できる環境であってもプログラムを 1 タスクごとしか実行しない/実行できないのであればその処理は逐次的な特性に従う。

並行性の文脈において複数の物理プロセッサが同時にタスクを実行する環境の特性並列性 (parallelism) と呼び、並列性を持つ演算処理を並列演算 (parallel computing) と呼ぶ。並行と並列はしばしば混同されるが、並列処理はマルチプロセッサやベクトルプロセッサのような環境を前提とした課題によりフォーカスしている (並行性が対象とする課題には並列性も含まれている)。この記事では並行性に焦点を当て並列性については深く扱わない。

ある処理をネットワークを介して複数のコンピュータで実行する計算手法を分散処理 (distributed computing) と呼ぶ。分散処理は並行/並列処理の「物理プロセッサ」にネットワーク上のコンピュータを適用したもので、その特性は並行性や並列性の課題と広く共通している。

逐次処理は線形性 (linearization) が保たれることから、並行性に起因する様々な問題が排除されている。

非同期 (asynchronous) とは、ある状態から次の状態に遷移するために時間に関する前提がないことを意味する。つまり非同期処理は任意の速度で逐次処理を進行することができる。

並行化プリミティブ

プロセスとスレッド

スレッドプール

割り込みとシグナル

セマフォ

poll/select

software transactional memory, CAS

ビルディングブロック

一般的なソフトウェア開発向けに利用されているプログラミング言語の中で、言語仕様と標準ライブラリのレイヤー-でマルチスレッド機能が利用できるようになったのは私の知る限り Java (1996) が最初である。Win95 や Linux などのマルチタスク OS が普及する時勢に後押しされて (”エージェント指向” のような言葉が生まれるなど) この頃からマルチスレッドを前提としたプログラミングの理解が世間に浸透した。

当初こそ無造作にスレッド化するような設計が多く見られたが、x86 環境におけるコンテキストスイッチのコストの高さが障壁となり比較的早い段階で歩止まりに見舞われた。現在では限られた数のスレッド / プロセスリソースのそれぞれをなるべく wait させないで効率的に使用する設計が主流となりつつある。

Fork/Join パターン

Worker パターン

Future パターン

Promise とも呼ばれる。

async/await

Latch パターン

Channel パターン

Select パターン

Select パターンは並行化プリミティブで紹介した select() をより抽象化し汎用的にした設計と言える。複数の同期オブジェクトの少なくとも1つがシグナル状態となるまで待機し、シグナル状態になった同期オブジェクトの一つを選んで関連付けられた処理を実行する。オプションで、いずれの同期オブジェクトもシグナル状態になっていなければ直ちに選択される fallback の分岐を設置することもできる。

Fig 2 は 🍎りんごが落ちる、🐔鳥が罠にかかる、🐟魚が釣れる、という 3 つのイベントを待機して、最初に起きた 🍎 イベントに関連する処理を (イベントの結果として得たりんごを伴って) 実行する様子を表している。

この例が示すように、Select パターンは並行処理の観点では one-of-many の同期ポイントの構築であり、逐次処理の観点では分岐である。

Fig 2. Select パターンを猟や採集に例えると。

Select パターンではしばしば複数の同期オブジェクトが同時にシグナル状態となったにも関わらず一つしか選択されないことが問題となる。

一つは fairness に関する問題である。Select がシグナル状態となった同期オブジェクトのどれを選択するかというポリシーは全体の動作に影響を与えることがある。例えば Fig 2 の例が上から順に優先して選択するポリシーだった場合、極高頻度でりんごが落下する環境では鳥や魚がかかっても実際にそれを得る機会はほとんどなく、想定よりも肉類の供給が滞ることでシステム全体の停止を引き起こすかもしれない。逆に、緊急性の高いイベントは他のイベントより優先して選択してもらいたいケースも考えられる。

もう一つの問題は選択されたなかった同期オブジェクトの状態である。同期化を中断された他の同期オブジェクトがどのような状態となっているかは Select または同期オブジェクトの設計に依存している。さらに、Select パターンはしばしば外側にループを伴って利用されることから、中断された同期オブジェクトが再び Select で使用されたときにどのような振る舞いとなるかに注意を払う必要がある。

例えば Fig 2 でりんごが落ちると同時に魚が釣れていた場合、誰も引き上げなかったその魚はどうなるのだろうか? 再び Select に到達したときに以前にかかっていた魚がかかったままである保証はあるのだろうか? 10 バイト中 3 バイトまで読み込んだ同期オブジェクトが再び Select で参照されたとき正しく 3 バイトを保持し 4 バイト目から続きを読み出す保証はあるだろうか?

Rust: tokio::select! マクロ

Rust では tokio::select! マクロが Select パターンを実装している (公式の Tutorial 参照)。この実装は poll の成功した Future に分岐し処理を実行する。

以下の例は起動後にブラウザで http://localhost:8080 にアクセスするとメッセージを表示する。ただし CTRL+C を押すか何も起きず 10 秒経過するか、または外部からシャットダウンの通知を受けるか (つまり標準入力に exit と入力されるか) のいずれかのイベントが発生すると終了する。

use std::time::Duration;
use tokio;
use tokio::io::{self, AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpListener;
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
  let (mut tx, rx) = oneshot::channel();

  let h = tokio::spawn(async move {
    // EOF に達するか `exit` と入力されるまで標準入力を繰り返し読み込み
    let mut lines = BufReader::new(io::stdin()).lines();
    loop {
      eprint!("INPUT> ");
      tokio::select! {
        result = lines.next_line() => {
          match result {
            Ok(Some(line)) if line != "exit" => (),
            _ => break,
          }
        }
        _ = tx.closed() => break,
      }
    }
    let _ = tx.send(());
  });

  server_loop(rx).await;
  h.await.unwrap();
  eprintln!("\nterminated");
}

async fn server_loop(mut shutdown: oneshot::Receiver<()>) {
  let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap();
  'event: loop {
    tokio::select! {
      _ = tokio::signal::ctrl_c() => break 'event,
      _ = tokio::time::sleep(Duration::from_secs(10)) => break 'event,
      _ = &mut shutdown => break 'event,
      result = listener.accept() => {
        let (mut socket, _) = result.unwrap();
        socket.write_all(b"HTTP/1.1 200 Ok\r\nContent-Type: text/html\r\n\r\nhello, world").await.unwrap();
      }
    }
  }
}

tokio::select! で選択されなかった Future の中断状態は、Future を実装している同期オブジェクトに強く依存していることに注意 (モノによってはそれ以降正しく継続することができない)。中断された Future の挙動や fairness については API リファレンスを参照。

[dependencies]
tokio = { version = "1.17", features = ["full"] }

Go: select 構文

Go では言語標準の select 構文で Select パターンを使うことができる (構文仕様参照)。以下の例は 2 つの goroutine を使用し、1 秒おきに現在時刻を表示しながら 10 秒が経過したら終了する。

package main

import (
	"fmt"
	"time"
)

func main() {
	// 1 秒ごとに現在時刻を送信するチャネル
	clock := make(chan time.Time)
	go func() {
		for {
			clock <- time.Now()
			time.Sleep(1 * time.Second)
		}
	}()

	// 10 秒後にシャットダウンを送信するチャネル
	shutdown := make(chan string)
	go func() {
		time.Sleep(10 * time.Second)
		shutdown <- "shutdown"
	}()

	alarm(clock, shutdown)
}

func alarm(clock chan time.Time, shutdown chan string) {
alarm:
	for {
		select {
		case <-shutdown:
			fmt.Printf("ding-dong!\n")
			break alarm
		case tm := <-clock:
			fmt.Printf("%s\n", tm)
		}
	}
}

go ではチャネル chan に対してのみ Select パターンを適用できることと、go のチャネルは単なるスレッドセーフなキューでしかないことから、選択されなかったチャネルのメッセージが欠落するような問題は発生しない。

Scala:zio

メモ。