Future 機能
概要
CompletableFuture は Java での非同期プログラミングで使用するクラス。非同期プログラミングはアプリケーションのメインスレッドとは別のスレッドを使用して非ブロッキングにタスクを行うための手段である。一般にこのような並列化によってプログラムのパフォーマンスは大幅に向上する。Java ではメインスレッドとタスクとで CompletableFuture を共有することによって、メインスレッドはタスクの進行状況や完了または失敗を通知として受けることができる。
CompletableFuture
は Future
を拡張する形で Java 8 に導入された (Scala や JavaScript の Future/Promise を参考にしていることは明かだろう)。Java 5 で導入された Future は非同期で実行されるタスクの結果を受け渡すクラスとして導入されたが、常にブロッキングでしか結果を受け取れないことや合成ができないこと、また例外処理の機構が無いことなど、実際に非同期を意図した設計を行うには機能不足であった。
ワーカースレッドによるタスクの実行
単に時間のかかる処理を別スレッドで行いたい場合には CompletableFuture.runAsync() または supplyAsync() を使用することができる (両者の違いはタスク側から値として結果を返すかだけである)。supplyAsync() は渡されたクロージャーを別スレッドで実行して CompletableFuture 経由で結果の値をコールバックする。この方法は自前で Thread を生成するより簡便であるし、アプリケーションで用意した Executor (Thread Pool) を使用する方法への置き換えも容易である。
以下の例では supplyAsync() の呼び出しはすぐにリターンするが、同時にワーカースレッドが (総当たりで非常に遅い) 素数判定タスクを開始する。メインスレッドは素数判定タスクの結果が出るまで join() で待機している。
import java.util.concurrent.CompletableFuture;
public class FutureSampleSupplyAsync {
public static void main(String[] args) {
int number = Integer.parseInt(args[0]);
CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> {
boolean prime = isPrime(number);
System.out.println("END: task: " + Thread.currentThread().getName());
return prime;
});
boolean prime = future.join();
System.out.println("END: main: " + Thread.currentThread().getName() + ": " + prime);
}
private static boolean isPrime(int number) {
for (int i = 2; i < number; i++) {
if (number % i == 0) return false;
}
return true;
}
}
このプログラムを実行すると以下のような出力が行われる。メインスレッドはワーカースレッド側の素数判定タスクが終了した後に join() を抜けていることが分かるだろう。ここで ForkJoinPool
は Fork/Join を行うために Java が内部に持っているスレッドプールである。
$ java FutureSampleSupplyAsync 20181107
END: task: ForkJoinPool.commonPool-worker-1
END: main: main: false
この例での処理の並列化と合流を図で示すと以下のようになる。
runAsync() のスレッド分岐と合流この例では非同期処理を開始してすぐに join() しているため同期処理と変わらない動きとなっているが、典型的にメインスレッドは isPrime() の計算と並行して何らかの処理を行い、自身の処理が完了したところで join() するような使われ方をする。
タスク内で例外が発生した場合、メインスレッドは join() の呼び出しで例外が発生する。
この例で使用した ForkJoinPool は JavaVM 全体で共有しているスレッドプールである。無計画な消費でリソースロックが発生することを避けるため、プロダクション向けに実装する場合はリソース設計をきちんとおこなった Executor を使用すべきである。
マルチスレッド環境での CompletableFuture を使用した通信
supplyAsync() は便利な手段の一つであるが、一般的なマルチスレッドプログラミングの状況はより複雑である。スレッドの起動管理を自分で行う場合や、非同期 I/O などすでに存在する並列化されたフレームワークで使用するケースでは、スレッド間で共有する CompletableFuture を直接生成し、完了したタスクの結果を値または例外として通知する。以下は先述の例を Thread で実装したものである。
import java.util.concurrent.CompletableFuture;
public class FutureSampleThread {
public static void main(String[] args) {
int number = Integer.parseInt(args[0]);
Solver solver = new Solver(number);
solver.run();
boolean prime = solver.future.join();
System.out.println(prime);
}
private static boolean isPrime(int number) {
for (int i = 2; i < number; i++) {
if (number % i == 0) return false;
}
return true;
}
private static class Solver extends Thread {
public final CompletableFuture<Boolean> future = new CompletableFuture<>();
public final int number;
public Solver(int number) {
this.number = number;
}
public void run() {
future.complete(isPrime(number));
}
}
}
supplyAsync() や supplyRun() はこのようなスレッドの管理と結果の設定をタスクの完了時に行っている。
完了の通知と受信
CompletableFuture は成功または失敗のいずれかを明示的に指定することによって完了状態へ遷移する。
- タスクが成功した場合は
completed()で結果を通知する。単にタスクの終了のみを通知する目的で<Void>としている場合はnullを使用する。 - タスクが失敗した場合は
completedExceptionally()に例外を指定して状況を通知する。
タスクの結果を受け取るにはメインスレッドで終了を待機する方法とコールバックとして受ける方法がある。
join(),get()はCompletableFutureが完了するまで待機して結果を返す (つまり強制的に同期化する)。失敗で完了した場合はメソッド内からCompletionExceptionがスローされる。get()は checked な例外が宣言されているため try-catch で囲む必要がある (旧Futureとのインターフェース互換性のため用意されている)。- 結果が確定したタイミングでコールバックを受けたい場合は
thenAccept()やwhenComplete()を使用することができる。
変換と合成
CompletableFuture を合成するいくつかのパターンについて説明する。
ネストした CompletableFutureを平坦化する-
処理の共通化や他の非同期ライブラリを使用することによって
CompletableFutureが多重にネストすることがある。Scala でのflatten()に相当する機能は用得されていないためthenCompose()で平坦化する。CompletableFuture<CompletableFuture<String>> nested = CompletableFuture.completedFuture( CompletableFuture.completedFuture("hello, world") ); CompletableFuture<String> flatten = nested.thenCompose(Function.identity()); flatten.join(); // hello, worldjava.util.function.Function.identity()は恒等写像x -> xのを表している。
依存関係のある非同期処理を合成する-
ある Future A の結果を使用して別の Future B を生成する処理を行い、最終的に B と等価の Future を得たい場合
thenCompose()で合成する。// Future A: 非同期にデータを保存しそのバイト数を返すメソッド CompletableFuture<Long> saveAsync(Data data); // Future B: 保存したデータサイズを通知するメソッド CompletableFuture<Boolean> notifyAsync(long size); CompletableFuture<Boolean> sum = saveAsync(data).thenCompose(size -> notifyAsync(size) );単純に
thenApply()で合成するとネストしたCompletableFuture<CompletableFuture<Boolean>>が生成される。thenCompose()はラムダの返値の Future を flatMap するように作用する。しばしばラムダ内で前の結果に対して
if等の分岐を行う。分岐の 1 つ以上が Future を生成するのであれば、同期で決定可能な結果でも Future を返してthenCompose()で合成しなければならない。CompletableFuture<byte[]> data = existsAsync(id).thenCompose(exists -> { if(exists){ return readAsync(id); } else { return CompletableFuture.completedFuture(EMPTY_DATA); } });
独立した非同期処理を合成する-
CompletableFutureが複数存在する場合はallOf()を使用することで単一の Future に合成することができる。allOf()自体はCompletableFuture<Void>であるためthenApply()の中でそれぞれをjoin()したり (結果は確定しているのですぐリターンする):CompletableFuture<Integer> f1 = ...; CompletableFuture<Integer> f2 = ...; CompletableFuture<Integer> f3 = ...; CompletableFuture<Integer> sum = CompletableFuture.allOf(f1, f2, f3) .thenApply(() -> f1.join() + f2.join() + f3.join());あるいは
allOf()の結果をjoin()した後にそれぞれをjoin()して参照する。CompletableFuture.allOf(f1, f2, f3).join(); int sum = f1.join().intValue() + f2.join().intValue() + f3.join().intValue();これはリスト化された Future
List<CompletableFuture<T>>が発生した場合にも便利な方法である。
Future 化されたカーソルのイテレーション-
非同期に部分的な読み込みを進めるカーソルのような構造を Future 化したケースでは、合成によって単純な構造にすることはできないが、カーソルから読み込んだ分を逐次コールバックする方法に変換すると扱いが容易になる。例えば以下のような API を利用する場合:
public interface CUrsor<T> { public List<T> get(); public boolean isFinished(); } CompletableFuture<Cursor<T>> readFirst(); CompletableFuture<Cursor<T>> readNext(Cursor<T> cursor);部分的に読み出したデータを逐次コールバックし、全てのイテレーションが終了したら完了し読み出したデータ数を返す
CompletableFutureを返す関数を考えてみよう。long count = read(list -> list.forEach(System.out::println) ).join();readFirst()とreadNext()が返す Future が「部分的な読み込みの完了」であるのに対して、必要な Future は「全ての読み込みの完了」であることから、これらを合成するのではなく、独立したCompletableFutureに完了を設定している。class Iteratee extends BiConsumer<Cursor<T>, Throwable> { public final CompletableFuture<Long> future = new CompletableFuture<>(); private final AtomicLong count = new AtomicLong(0L); private final Consumer<List<T>> callback; public Iteratee(Consumer<List<T>> callback){ this.callback = callback; } @Override public void accept(Cursor<T> cursor, Throwable ex){ if(ex != null){ future.completeExceptionally(ex); } else try { List<T> list = cursor.get(); callback.accept(list); count.addAndGet(list.size()); if(! cursor.isFinished()){ readNext(cursor).whenCompleted(this); } else { future.complete(count.get()); } } catch(Throwable ex){ future.completeExceptionally(ex); } } } public <T> CompletableFuture<Long> read(Consumer<List<T>> callback){ Iteratee it = new Iteratee(); readFirst().whenCompleted(it); return it.future; }IterateeはwhenComplete()で再帰的に実行されているように見えるが、whenCompleted()に渡されたラムダはその時点では実行されないことから再帰呼び出しではないことに注意。
