Future 機能

Takami Torao Java 8
  • このエントリーをはてなブックマークに追加

概要

CompletableFuture は Java での非同期プログラミングで使用するクラス。非同期プログラミングはアプリケーションのメインスレッドとは別のスレッドを使用して非ブロッキングにタスクを行うための手段である。一般にこのような並列化によってプログラムのパフォーマンスは大幅に向上する。Java ではメインスレッドとタスクとで CompletableFuture を共有することによって、メインスレッドはタスクの進行状況や完了または失敗を通知として受けることができる。

CompletableFutureFutureを拡張する形で Java 8 に導入された (Scala や JavaScript の Future/Promise を参考にしていることは明かだろう)。Java 5 で導入された Future は非同期で実行されるタスクの結果を受け渡すクラスとして導入されたが、常にブロッキングでしか結果を受け取れないことや合成ができないこと、また例外処理の機構が無いことなど、実際に非同期を意図した設計を行うには機能不足であった。

ワーカースレッドによるタスクの実行

単に時間のかかる処理を別スレッドで行いたい場合には CompletableFuture.runAsync() または supplyAsync() を使用することができる (両者の違いはタスク側から値として結果を返すかだけである)。supplyAsync() は渡されたクロージャーを別スレッドで実行して CompletableFuture 経由で結果の値をコールバックする。この方法は自前で Thread を生成するより簡便であるし、アプリケーションで用意した Executor (Thread Pool) を使用する方法への置き換えも容易である。

以下の例では supplyAsync() の呼び出しはすぐにリターンするが、同時にワーカースレッドが (総当たりで非常に遅い) 素数判定タスクを開始する。メインスレッドは素数判定タスクの結果が出るまで join() で待機している。

import java.util.concurrent.CompletableFuture;

public class FutureSampleSupplyAsync {

  public static void main(String[] args) {
    int number = Integer.parseInt(args[0]);

    CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> {
      boolean prime = isPrime(number);
      System.out.println("END: task: " + Thread.currentThread().getName());
      return prime;
    });

    boolean prime = future.join();
    System.out.println("END: main: " + Thread.currentThread().getName() + ": " + prime);
  }

  private static boolean isPrime(int number) {
    for (int i = 2; i < number; i++) {
      if (number % i == 0) return false;
    }
    return true;
  }
}

このプログラムを実行すると以下のような出力が行われる。メインスレッドはワーカースレッド側の素数判定タスクが終了した後に join() を抜けていることが分かるだろう。ここで ForkJoinPoolは Fork/Join を行うために Java が内部に持っているスレッドプールである。

$ java FutureSampleSupplyAsync 20181107
END: task: ForkJoinPool.commonPool-worker-1
END: main: main: false

この例での処理の並列化と合流を図で示すと以下のようになる。

Fig 1. runAsync() のスレッド分岐と合流

この例では非同期処理を開始してすぐに join() しているため同期処理と変わらない動きとなっているが、典型的にメインスレッドは isPrime() の計算と並行して何らかの処理を行い、自身の処理が完了したところで join() するような使われ方をする。

タスク内で例外が発生した場合、メインスレッドは join() の呼び出しで例外が発生する。

この例で使用した ForkJoinPool は JavaVM 全体で共有しているスレッドプールである。無計画な消費でリソースロックが発生することを避けるため、プロダクション向けに実装する場合はリソース設計をきちんとおこなった Executor を使用すべきである。

マルチスレッド環境での CompletableFuture を使用した通信

supplyAsync() は便利な手段の一つであるが、一般的なマルチスレッドプログラミングの状況はより複雑である。スレッドの起動管理を自分で行う場合や、非同期 I/O などすでに存在する並列化されたフレームワークで使用するケースでは、スレッド間で共有する CompletableFuture を直接生成し、完了したタスクの結果を値または例外として通知する。以下は先述の例を Thread で実装したものである。

import java.util.concurrent.CompletableFuture;

public class FutureSampleThread {

  public static void main(String[] args) {
    int number = Integer.parseInt(args[0]);

    Solver solver = new Solver(number);
    solver.run();

    boolean prime = solver.future.join();
    System.out.println(prime);
  }

  private static boolean isPrime(int number) {
    for (int i = 2; i < number; i++) {
      if (number % i == 0) return false;
    }
    return true;
  }

  private static class Solver extends Thread {
    public final CompletableFuture<Boolean> future = new CompletableFuture<>();
    public final int number;
    public Solver(int number) {
      this.number = number;
    }
    public void run() {
      future.complete(isPrime(number));
    }
  }
}

supplyAsync()supplyRun() はこのようなスレッドの管理と結果の設定をタスクの完了時に行っている。

完了の通知と受信

CompletableFuture成功または失敗のいずれかを明示的に指定することによって完了状態へ遷移する。

  • タスクが成功した場合は completed() で結果を通知する。単にタスクの終了のみを通知する目的で <Void> としている場合は null を使用する。
  • タスクが失敗した場合は completedExceptionally() に例外を指定して状況を通知する。

タスクの結果を受け取るにはメインスレッドで終了を待機する方法とコールバックとして受ける方法がある。

  • join(), get()CompletableFuture完了するまで待機して結果を返す (つまり強制的に同期化する)。失敗で完了した場合はメソッド内から CompletionException がスローされる。get() は checked な例外が宣言されているため try-catch で囲む必要がある (旧 Future とのインターフェース互換性のため用意されている)。
  • 結果が確定したタイミングでコールバックを受けたい場合は thenAccept()whenComplete() を使用することができる。

変換と合成

CompletableFuture を合成するいくつかのパターンについて説明する。

ネストした CompletableFuture を平坦化する

処理の共通化や他の非同期ライブラリを使用することによって CompletableFuture が多重にネストすることがある。Scala での flatten() に相当する機能は用得されていないため thenCompose() で平坦化する。

CompletableFuture<CompletableFuture<String>> nested = CompletableFuture.completedFuture(
  CompletableFuture.completedFuture("hello, world")
);

CompletableFuture<String> flatten = nested.thenCompose(Function.identity());

flatten.join();   // hello, world

java.util.function.Function.identity() は恒等写像 x -> x のを表している。

依存関係のある非同期処理を合成する

ある Future A の結果を使用して別の Future B を生成する処理を行い、最終的に B と等価の Future を得たい場合 thenCompose() で合成する

// Future A: 非同期にデータを保存しそのバイト数を返すメソッド
CompletableFuture<Long> saveAsync(Data data);

// Future B: 保存したデータサイズを通知するメソッド
CompletableFuture<Boolean> notifyAsync(long size);

CompletableFuture<Boolean> sum = saveAsync(data).thenCompose(size ->
  notifyAsync(size)
);

単純に thenApply() で合成するとネストした CompletableFuture<CompletableFuture<Boolean>> が生成される。thenCompose() はラムダの返値の Future を flatMap するように作用する。

Fig 1. andThen() による合成。

しばしばラムダ内で前の結果に対して if 等の分岐を行う。分岐の 1 つ以上が Future を生成するのであれば、同期で決定可能な結果でも Future を返して thenCompose() で合成しなければならない。

CompletableFuture<byte[]> data = existsAsync(id).thenCompose(exists -> {
  if(exists){
    return readAsync(id);
  } else {
    return CompletableFuture.completedFuture(EMPTY_DATA);
  }
});
独立した非同期処理を合成する

CompletableFuture が複数存在する場合は allOf() を使用することで単一の Future に合成することができる。allOf() 自体は CompletableFuture<Void> であるため thenApply() の中でそれぞれを join() したり (結果は確定しているのですぐリターンする):

CompletableFuture<Integer> f1 = ...;
CompletableFuture<Integer> f2 = ...;
CompletableFuture<Integer> f3 = ...;

CompletableFuture<Integer> sum = CompletableFuture.allOf(f1, f2, f3)
  .thenApply(() -> f1.join() + f2.join() + f3.join());

あるいは allOf() の結果を join() した後にそれぞれを join() して参照する。

CompletableFuture.allOf(f1, f2, f3).join();
int sum = f1.join().intValue() + f2.join().intValue() + f3.join().intValue();

これはリスト化された Future List<CompletableFuture<T>> が発生した場合にも便利な方法である。

Future 化されたカーソルのイテレーション

非同期に部分的な読み込みを進めるカーソルのような構造を Future 化したケースでは、合成によって単純な構造にすることはできないが、カーソルから読み込んだ分を逐次コールバックする方法に変換すると扱いが容易になる。例えば以下のような API を利用する場合:

public interface CUrsor<T> {
  public List<T> get();
  public boolean isFinished();
}
CompletableFuture<Cursor<T>> readFirst();
CompletableFuture<Cursor<T>> readNext(Cursor<T> cursor);

部分的に読み出したデータを逐次コールバックし、全てのイテレーションが終了したら完了し読み出したデータ数を返す CompletableFuture を返す関数を考えてみよう。

long count = read(list ->
  list.forEach(System.out::println)
).join();

readFirst()readNext() が返す Future が「部分的な読み込みの完了」であるのに対して、必要な Future は「全ての読み込みの完了」であることから、これらを合成するのではなく、独立した CompletableFuture に完了を設定している。

class Iteratee extends BiConsumer<Cursor<T>, Throwable> {
  public final CompletableFuture<Long> future = new CompletableFuture<>();
  private final AtomicLong count = new AtomicLong(0L);
  private final Consumer<List<T>> callback;
  public Iteratee(Consumer<List<T>> callback){
    this.callback = callback;
  }
  @Override
  public void accept(Cursor<T> cursor, Throwable ex){
    if(ex != null){
      future.completeExceptionally(ex);
    } else try {
      List<T> list = cursor.get();
      callback.accept(list);
      count.addAndGet(list.size());
      if(! cursor.isFinished()){
        readNext(cursor).whenCompleted(this);
      } else {
        future.complete(count.get());
      }
    } catch(Throwable ex){
      future.completeExceptionally(ex);
    }
  }
}

public <T> CompletableFuture<Long> read(Consumer<List<T>> callback){
  Iteratee it = new Iteratee();
  readFirst().whenCompleted(it);
  return it.future;
}

IterateewhenComplete() で再帰的に実行されているように見えるが、whenCompleted() に渡されたラムダはその時点では実行されないことから再帰呼び出しではないことに注意。