CompletableFuture

페이스북이나 트위터등에서 데이터를 수집하여 구글의 번역기를 사용해서 자연어 처리를 하는 프로그램은 각기 다른 서비스에서 다양한 데이터를 요청해야 하고, 번역기를 처리하는 시간 등을 고려하면 해당 애플리케이션을 설계하는 과정에서 병렬성(parallelism)과 동시성(concurrency)을 잘 적용해야 합니다.

병렬성은 하나의 동작을 하위 동작으로 분할하고 각각의 하위 동작을 다른 코어, 다른 CPU, 다른 기기로 할당하여 처리합니다. 반면 하나의 CPU 사용을 가장 극대화할 수 있도록 느슨하게 연관된 여러 작업을 수행해야 하는 상황이라면 원격 서비스 결과를 기다리거나, 데이터베이스 결과를 기다리면서 스레드를 블록하기를 원치 않을 것 입니다.

지금은 Future 인터페이스를 사용해서 동시성을 적용하는 방법을 알아보도록 하겠습니다. Future는 저수준 스레드에 비해 직관적으로 이해하기 쉽다는 장점이 있으며, Future를 이용하려면 시간이 오래 걸리는 작업을 Callable 객체 내부로 감싼 다음에 ExecutorService에 제출하면 됩니다.

Future

JDK 5부터 미래의 어느 시점에 결과를 얻는 모델에 활용할 수 있도록 Future 인터페이스를 제공하고 있습니다. 비동기 계산을 모델링하는데 Future를 이용할 수 있으며, Future는 계산이 끝났을 때 결과에 접근할 수 있는 레퍼런스를 제공합니다. 시간이 걸릴 수 있는 작업을 Future 내부로 설정하면 호출자 스레드가 결과를 기다리는 동안 다른 유용한 작업을 수행할 수 있습니다.


ExecutorService executorService = Executors.newCachedThreadPool();
 
Future<Double> future = executorService.submit(new Callable<Double>() {
  public Double call() {
    return doSomeLongComputation();
  }  
});
 
doSomeLongComputation
 
try {
  Double result = future.get(1, TimeUnit.SECONDS);
} catch (Exception e) {
    // ...
}

Future 클래스는 비동기 계산이 끝났는지 확인할 수 있는 isDone(), 타임아웃 기간을 결정하고 결과를 출력하는 get() 등이 있습니다. 간단한 비동기 처리는 되지만 조금 아쉽습니다. 실무에서는 비동기 처리가 꼭 하나씩 생긴다고 볼 수 없으며, 각 비동기 처리에 대한 결과를 동기를 맞춰 또 다른 결과를 내야할 수 도 있습니다. 각 Future 클래스 간 여러 의존성에 대한 관리가 힘들 수 있습니다.

Future로 비동기 처리를 하기 위해선 1) 두 개의 비동기 계산 결과를 하나로 합칠 수 있어야 합니다. 두 가지 계산 결과는 서로 독립적일 수 있으며 또는 두 번째 결과가 첫 번째 결과에 의존하는 상황일 수 있습니다. 2) Futere 집합이 실행하는 모든 태스크의 완료를 기다립니다. 3) Future 집합에서 가장 빨리 완료되는 태스크를 기다렸다가 결과를 얻습니다. 4) 수동으로 Future를 완료시킵니다. 5) Future 완료 동작에 반응해야 합니다.

앞서 소개한 기능을 JDK 8에서는 CompleteableFuture 를 통해 제공하며, StreamOptional 같이 람다표현식과 파이프라인을 사용하여 비동기 작업을 조합할 수 있습니다. 일단 CompleteableFuture 의 간단한 예제는 아래와 같습니다. CompleteableFuture 는 기본적으로 supplyAsync(), runAsync() 등 팩토리 메소드를 제공하며, 쉽게 비동기 작업을 수행할 수 있습니다.

비동기 API 구현

사용자가 이 API(calculatePrice)를 호출하면 비동기 동작이 완료될 때까지 1초 동안 블록이 되는 코드입니다. 이러한 코드는 바람직하지 않습니다. 일단은 동기 API를 비동기로적으로 소비하는 방법을 설명하기 위한 가상의 예제코드 입니다.


private double calculatePrice(final String product) {
  delay();
  return random.nextDouble() * product.charAt(0) + product.charAt(1);
}

비동기 코드로 변환

JDK 5부터 비동기 계산의 결과를 표현할 수 있는 java.util.concurrent.Future 인터페이스를 제공합니다. 간단히 말해, Future는 결과값의 핸들일 뿐이며 계산이 완료되면 get()으로 결과를 얻을 수 있습니다.


public Future<Double> getPriceAsync(final String product) {
  // 계산 결과를 포함할 CompletableFuture 생성
  CompletableFuture<Double> futurePrice = new CompletableFuture<>();
  // 비동기 작업 수행
  new Thread(() -> {
    // double price = calculatePrice(product);
    // futurePrice.complete(price);
    try {
      double price = calculatePrice(product);
      futurePrice.complete(price);
    } catch (Exception ex) {
      futurePrice.completeExceptionally(ex);
    }
  }).start();
  // 계산결과와 상관없이 `Future` 반환
  return futurePrice;
}

// Main
try {
  // `Future`에서 가격 정보를 읽고 가격 정보가 없으면 블록
  double price = futurePrice.get();
  System.out.printf("Price is %.2f%n", price);
} catch (ExecutionException | InterruptedException e) {
  throw new RuntimeException(e);
}

Future 계산시 예외가 발생하면 futurePrice.completeExceptionally(ex);와 같은 ExecutionException 예외가 발생합니다. 그리고 CompletableFuture를 좀 더 쉽게 생성할 수 있는 팩토리 메서드인 supplyAsync()도 제공합니다.


public Future<Double> getPrice(String product) {
  return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}

비블록 코드

아래 코드에 문제점이 있을까요? 코드상 문제가 있을리는 없지만, 만약 아래 코드가 비동기로 동작한다면


public List<String> findPricesSequential(String product) {
  return shops.stream()
      .map(shop -> shop.getName() + " price is " + shop.getPrice(product))
      .collect(Collectors.toList());
}

CompletableFuture로 비동기 호출 구현

CompletableFuture를 사용해서 List<CompletableFuture<String>> 타입이 반환되고, map()을 통해서 List<String>으로 변환하고 해당 결과를 toList()를 사용해서 리스트로 반환합니다. CompletableFuture::joinget()과 동일한 의미를 갖는 것으로 join()은 아무 에외도 발생시키지 않습니다. 따라서 두 번째 map은 람다 표현식을 try/catch로 감쌀 필요가 없습니다.

public List<String> findPricesFuture(String product) {
  List<CompletableFuture<String>> priceFutures =
    shops.stream()
    .map(shop -> CompletableFuture.supplyAsync(() -> shop.getName() + " price is "
            + shop.getPrice(product), executor))
    .collect(Collectors.toList());

  List<String> prices = priceFutures.stream()
    .map(CompletableFuture::join)
    .collect(Collectors.toList());
  return prices;
}

커스텀 Executor

작업량을 고려한 풀에서 관리하는 스레드 수에 맞게 Executor를 만들 수 있으면 좋을 것입니다.


private final Executor executor = Executors.newFixedThreadPool(shops.size(), new ThreadFactory() {
  @Override
  public Thread newThread(Runnable r) {
      Thread t = new Thread(r);
      t.setDaemon(true);
      return t;
  }
});

파이프라인

thenApply(), thenCompose()등을 이용해서 Futrue를 조합해서 파이프라인을 구성할 수 있습니다.


public Stream<CompletableFuture<String>> findPricesStream(String product) {
  return shops.stream()
    .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor))
    .map(future -> future.thenApply(Quote::parse))
    .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)));
}

참고