병렬 데이터 처리와 성능

자바 개발자는 컬렉션 데이터 처리 속도를 높이려고 따로 고민할 필요가 없습니다. 현대의 컴퓨터는 멀티코어를 활용해서 파이프라인 연산을 실행할 수 있음이 가장 중요한 특징으로 JDK 7 이후 더 쉽게 병렬화를 수행하면서 에러를 최소화할 수 있도록 포크/조인(fork/join) 프레임워크 기능 제공합니다.

병렬 스트림

아주 간단한 예로 시작해보겠습니다. 1 ~ n까지 전체 합을 구하는 메서드는 아래와 같이 구현할 수 있습니다. 아래코드는 대부분의 개발자가 손쉽게 작성할 수 있는 가장 간단한 방법(정말?) 입니다.


public static long iterativeSum(long n) {
  long result = 0;
  for (long i = 0; i <= n; i++) {
      result += i;
  }
  return result;
}

하지만 JDK 8에서 도입된 스트림을 사용하면 for구문을 아래와 같이 무한 스트림으로 변경 가능 합니다. reduce()BinaryOperator를 전달하기만 하면 합계가 아니라 다른 연산도 가능합니다.


public static long sequentialSum(long n) {
  return Stream.iterate(1L, i -> i + 1)
    .limit(n)
    .reduce(Long::sum)
    .get();
}

그런데 n이 엄청나게 커진다면 어떻게 해야 할까요?

순차 스트림을 병렬 스트림으로 변환

n이 엄청나게 커진다면 해당 연산을 병렬로 처리하면 어떨까요? 자바의 컬렉션에 parallel을 적용하면 병렬 스트림이 생성됩니다. 개별 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크(chunk)로 분할한 스트림이 만들어지며 병렬 스트림을 이용하면 모든 멀티코어 프로세서가 각각의 청크를 처리하도록 할당 됩니다.


public static long parallelSum(long n) {
  return Stream.iterate(1L, i -> i + 1)
    .limit(n)
    .parallel() // 병렬로 가자!
    .reduce(Long::sum)
    .get();
}

순차 스트림에 parallel 메서드를 호출하면 기존의 리듀싱 연산이 병렬 처리되지만, 순차 스트림에 parallel을 호출해도 스트림 자체에는 아무 변화가 없습니다. 내부적으로는 parallel을 호출하면 이후 연산이 병렬로 수행해야 함을 의미하는 Boolean 플레그가 설정되며, 반대로 병렬 스트림을 sequential 스트림으로 변환 가능합니다.

스트림 성능 측정

병렬처리에 확신이 없다면 측정을 통해서 확인하며 진행; 컬렉션에선 순차 스트림을 병렬 스트림으로 쉽게 변환 가능하기 때문에 일단 병렬 스트림으로 바꾸면 될 것 같지만 그러한 병렬 처리가 꼭 효율적이라 말 할 수 없습니다. 따라서, 순차 스트림과 병렬 스트림 중 어떤 것이 좋을지 모르겠다면 적절한 벤치마크로 직접 성능 측정하는 것이 바람직합니다. 성능을 최적화할때 세 가지 황금 규칙은 아래와 같습니다.

  1. 측정, 2. 測定, 3. Measure

측정을 통해 스트림 성능을 판단해야 하며, 올바른 자료구조를 선택해야 병렬 실행도 최적의 성능을 발휘 가능 합니다. 병렬화는 공짜가 아닙니다. 병렬화를 이용하려면 스트림을 재귀적으로 '분할', 각 서브스트림을 서로 다른 스레드의 리듀싱 연산으로 '할당', 이 결과를 하나의 값으로 '합쳐야' 합니다.

앞에서 본 덧셈 예제의 경우 병렬 계산이 순차 계산보다 늦게 작동하는 경우가 존재합니다. 최악의 경우 병렬처리가 순차처리와 비슷할 것으로 예상되나, 병렬 계산이 더 느려지기도 하는데 이유가 뭘까요? 대부분의 경우 이전 연산의 결과에 따라 함수의 입력이 달라지지면 iterate 연산을 청크로 분할하기가 어렵기 때문입니다.

멀티코어 프로세스에서 합계 연산을 병렬로 실행하기 위해선 LongStream.rangeClosed라는 메서드를 사용하면 됩니다. 해당 연산은 iterate 연산에 비해서 몇가지 장점을 가지고 있습니다. 기본적으로 박싱과 언박싱 오버헤드가 없으며, 쉽게 청크로 분할할 수 있는 숫자 범위를 생성합니다.


public static long rangedSum(long n) {
  return LongStream.rangeClosed(1, n)
    .reduce(Long::sum)
    .getAsLong();
}

public static long parallelRangedSum(long n) {
  return LongStream.rangeClosed(1, n)
    .parallel()
    .reduce(Long::sum)
    .getAsLong();
}

  1. 박싱 주의; 오토/언박싱은 성능을 크게 저하시킬 수 있는 요소 중 하나입니다. JDK 8은 박싱 동작을 피할 수 있도록 기본형 특화 스트림 을 제공(IntStream, LongStream, DoubleStream)하기 때문에 병렬 처리를 위해선 기본형 특화 스트림을 사용하는 것이 좋습니다. 앞선 본 예제에서 확인 할 수 있듯이 오토/언박싱 등의 오버헤드를 수반하기 때문에 특화 스트림을 적절히 사용해야 합니다. 그리고 멀티코어 간의 데이터 이동은 우리 생각보다 많은 비용을 지불해야 하기 때문에 코어간에 데이터 전송 시간보다 훨씬 오래 걸리는 작업만 병렬로 다른 코어에서 수행하는 것이 바람직합니다.

public static long sideEffectSum(long n) {
  Accumulator accumulator = new Accumulator();
  LongStream.rangeClosed(1, n).forEach(accumulator::add);
  return accumulator.total;
}

public static long sideEffectParallelSum(long n) {
  Accumulator accumulator = new Accumulator();
  LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add);
  return accumulator.total;
}

public static class Accumulator {
  private long total = 0;
  public void add(long value) {
    total += value;
  }
}

  1. 아토믹(atomic) 연산; sideEffectSum 예제에서 확인할 수 있듯이 total += value;와 같은 연산은 아토믹 연산과 유사해 보이지만 아토믹 연산이 아니기 때문에 forEach 블럭에서 계속해서 상태를 변경하는 add 메서드 때문에 상태 공유에 따른 부작용이 발생합니다. 병렬 스트림을 잘못 사용하면서 발생하는 문제의 대부분은 공유된 상태를 바꾸는 알고리즘을 사용하기 때문이며 따라서 병렬 스트림과 병렬 계산에서는 공유된 변경 가능한 상태를 피해야 합니다.

  2. 순차 스트림보다 병렬 스트림에서 성능이 떨어지는 연산 존재; limit, findFirst처럼 요소의 순서에 의존하는 연산을 병렬 스트림에서 수행하려면 비싼 비용을 치뤄야 합니다. findAny는 요소의 순서와 상관없이 연산하므로 findFirst보다 성능이 좋으며, 정렬된 스트림에 unordered를 호출하면 비정렬된 스트림을 반환합니다. 따라서 스트림에서 수행하는 전체 파이프라인 연산 비용 고려해야 하며, 소량의 데이터에서는 병렬 스트림이 도움되지 않기 때문에 적절한 데이터 크기를 고려해야 합니다. 이런 문제의 핵심은 병렬화 과정에서 생기는 부가 비용을 상쇄할 수 있을 만큼의 이득을 얻지 못하기 때문입니다.

  3. 스트림을 구성하는 자료구조가 적절한지 확인; 스트림의 특성과 파이프라인의 중간 연산이 스트림의 특성을 어떻게 바꾸는지에 따라 분해 과정의 성능이 달라질 수 있습니다. 최종 연산의 병합 과정 비용도 확인해야 하며, 병합 과정의 비용이 비싸다면 병렬 스트림으로 얻은 성능의 이익이 서브스트림의 부분결과를 합치는 과정에서 상쇄될 수 있습니다. 분해와 관련된 다양한 스트림 중에서 병렬화에 적합한 자료구조의 적합도는 ArratList, IntStream.range가 가장 좋으며, HashSet, TreeSet은 사용할만하며, LinkedList, Stream.iterate를 병렬처리에 꺼려야 합니다.

포크/조인(Fork/Join) 프레임워크

병렬화할 수 있는 작업의 경우 해당 작업을 작게 분할한 다음에 서브태스크 각각의 결과를 합쳐서 전체 결과를 구할 수 있습니다. ExecutorService 인터페이스 구현체를 스레드 풀(ForkJoinPool)의 태스크로 등록하면 해당 작업을 손쉽게 사용할 수 있습니다.

RecursiveTask 활용

스레드 풀을 이용하려면 RecursiveTask<R>의 서브클래스를 생성해야 합니다. 제네릭 타입의 R은 병렬처리되는 태스크가 생성하는 결과 타입이며 결과가 없을 때는 RecursiveAction 형식입니다. RecursiveTask를 정의하기 위해선 추상 메서드인 compute 를 구현하면 됩니다. compute 메서드는 테스크를 서브태스크로 분할하는 로직과 더 이상 분할할 수 없을 때 개별 서브태스크의 결과를 생산할 알고리즘을 정의합니다.

일반적으로 어플리케이션에서는 둘 이상의 ForkJoinPool을 사용하지 않습니다. 즉, 소프트웨어의 필요한 것에서 언제든 가져다 쓸 수 있도록 ForkJoinPool을 한 번만 인스턴스화해서 정적 필드에 싱글턴으로 저장합니다.

ForkJoinSumCalculator 예제

ForkJoinSumCalculatorForkJoinPool로 전달(forkJoinSum)하면 풀의 스레드가 ForkJoinSumCalculatorcompute 메서드를 실행하면서 작업 수행하며, compute 메서드는 병렬로 실행할 수 있을만큼 태스크의 크기가 충분히 작아졌는지 확인, 아직 태스트의 크기가 크다고 판단되면숫자 배열의 반으로 분할해서 두 개의 새로운 ForkJoinSumCalculator로 할당됩니다. 그러면 다시 ForkJoinPool이 새로 생성된 ForkJoinSumCalculator를 실행합니다. 결국 이 과정이 재귀적으로 반복되면서 주어진 조건을 만족할 떄 까지 태스크 분할을 반복합니다.


public class ForkJoinSumCalculator extends RecursiveTask<Long> {

    public static final ForkJoinPool FORK_JOIN_POOL = new ForkJoinPool();

    public static final long THRESHOLD = 10_000;

    private final long[] numbers;
    private final int start;
    private final int end;

    public ForkJoinSumCalculator(long[] numbers) {
        this(numbers, 0, numbers.length);
    }

    private ForkJoinSumCalculator(long[] numbers, int start, int end) {
        this.numbers = numbers;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        int length = end - start;
        if (length <= THRESHOLD) {
            return computeSequentially();
        }
        ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length/2);
        leftTask.fork();
        ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length/2, end);
        Long rightResult = rightTask.compute();
        Long leftResult = leftTask.join();
        return leftResult + rightResult;
    }

    private long computeSequentially() {
        long sum = 0;
        for (int i = start; i < end; i++) {
            sum += numbers[i];
        }
        return sum;
    }

    public static long forkJoinSum(long n) {
        long[] numbers = LongStream.rangeClosed(1, n).toArray();
        ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
        return FORK_JOIN_POOL.invoke(task);
    }
}

포크/조인 프레임워크를 제대로 사용하는 방법

  1. join 메서드를 태스크에 호출하면 태스크가 생산하는 결과를 준비될 때까지 호출자를 블록합니다. 따라서, 두 서브태스크가 모두 시작된 다음에 join을 호출 해야 합니다. 만약 다른 태스크가 끝나길 기다리는 일이 발생하면 원래 순차 알고리즘보다 느리고 복잡한 프로그램이 됩니다.

  2. RecursiveTask 내에서는 ForkJoinPoolinvoke 메서드를 호출하지 않도록 합니다. computefork 메서드를 직접 호출 가능하고 순차 코드에서 병렬 계산을 시작할 때만 invoke를 사용하세요.

  3. 서브태스크에 fork 메서드를 호출해서 ForkJoinPool의 일정 조절이 가능합니다. 왼쪽 작업과 오른쪽 작업 모두 fork 메서드를 호출하는 것이 자연스러울 것 같지만 한쪽 작업에는 fork를 호출하는 것보다 compute를 호출하는 것이 효율적이며, 두 서브태스크의 한 태스크에는 같은 스레드를 재사용할 수 있으므로 풀에서 불필요한 태스크를 할당하는 오버헤드를 피할 수 있습니다.

  4. 병렬 계산은 디버깅하기 어렵습니다. 보통 IDE로 디버깅할 때 스택 프레이스로 문제가 일어난 과정을 쉽게 확인 가능하나, 포크/조인 프레임워크에서는 fork라 불리는 다른 스레드에서 compute를 호출하므로 스택 트레이스가 도움되지 않습니다.

  5. 멀티코어에 포크/조인 프레임워크를 사용하는 것이 순차 처리보다 무조건 빠를 거라는 생각은 금물입니다. 병렬 처리로 성능을 개선하려면 태스크를 여러 독립적인 서브태스크로 분할 가능해야 하며, 각 서브태스크의 실행시간은 새로운 태스크를 포킹하는 데 드는 시간보다 길어야 합니다.

작업 훔치기

앞선 예제에선 덧셈을 수행할 숫자가 만개 이하면 서브태스크 분할을 중단했습니다. 기준값을 비꿔가면서 측정하는 방법 외에는 좋은 기준을 찾을 뾰족한 방법이 없습니다.

우선 천만 개 항목을 포힘히는 배열을 시용하면 ForkJoinSumCalculator는 천 개 이싱의 서브 태스크를 포크할 것입니다. 대부분의 기기에는 코어가 네 개뿐이므로 천 개 이상의 서브태스크는 자원만 낭비하는 것 같아 보일 수 있습니다. 실제로 각각의 태스크가 CPU로 할당되는 상황이라면 어차피 천 개 이상의 서브태스크로 분할한다고 해서 성능이 좋아지지는 않을 것입니다.

코어 개수와 관계없이 적절한 크기로 분할된 많은 태스크를 포킹하는 것이 바람직합니다. 이론적으로는 코어 개수만큼 병렬화된 태스크로 작업부하를 분할하며 모든 CPU 코어에서 태스크를 실행할 것이고 크기가 같은 각각의 태스크는 같은 시간에 종료될 것이라 생각할 수 있습니다. 하지만 복잡한 시나리오가 사용되는 현실에서는 각각의 서브태스크의 작업완료 시간이 크게 달라질수 있습니다. 분할 기법이 효율적이지 않았기 때문일수도 있고, 예기치 않게 디스크 접근 속도가 저하되었거나 외부 서비스와 협력하는 과정에서 지연이 발생 할 수도 있기 때문입니다.

작업 훔치기(work stealing)라는 기법을 통해 위 문제를 해결 가능합니다. ForkJoinPool의 모든 스레드를 거의 공정하게 분할합니다. 각각의 스레드는 자신에세 할당된 태스크를 포함하는 이중 연결 리스트를 참조하면서 작업이 끝날 때 마다 큐의 헤드에서 다른 테스크를 가져와 작업을 처리합니다. 이때 한 스레드는 다른 스레드 보다 자신에게 할당된 태스크를 더 빨리 처리 가능합니다. 즉, 다른 스레드는 바쁘게 일하고 있는데 한 스레드는 할 일이 다 떨어진 상황입니다. 이때 할일이 없는 스레드는 유휴 상태로 바뀌는 것이 아니라 다른 스레드 큐의 꼬리에서 작업을 가져(훔쳐)옵니다. 모든 태스크가 작업을 끝낼 때 까지 모든 큐가 빌 때까지 이 과정을 반복하기 때문에 태스크의 크기를 작게 나누어야 작업자 스레드 간의 작업부하를 비슷한 수준으로 유지 가능합니다.

Spliterator

Spliterarot는 "분할할 수 있는 반복자" 라는 의미로 Iterator처럼 소스의 요소 탐색 기능을 제공하는 점은 같지만 Spliterator는 병렬 작업에 특화되어 있습니다. Spliterator 인터페이스는 여러 메서드를 정의합니다.

일반적으로 단어를 카운팅하는 코드는 아래와 같습니다.


public static final String SENTENCE =
  " Nel   mezzo del cammin  di nostra  vita " +
  " mi  ritrovai in una  selva oscura " +
  " che la  dritta via era   smarrita ";

public static int countWordsIteratively(String s) {
  int counter = 0;
  boolean lastSpace = true;
  for (char c : s.toCharArray()) {
    if (Character.isWhitespace(c)) {
        lastSpace = true;
    } else {
        if (lastSpace) counter++;
        lastSpace = Character.isWhitespace(c);
    }
  }
  return counter;
}

함수형으로 단어 개수 계산 메서드 재구현하는 방법은 아래와 같습니다. 그런데 아래 코드에서 볼 수 있듯이 안타깝게도 스트림은 int, long, double 기본형만 제공하기 때문에 Stream<Character>을 사용해야 합니다.


public static int countWords2(String s) {
  // Stream<Character> stream = IntStream.range(0, s.length())
  //                                   .mapToObj(SENTENCE::charAt).parallel();
  Spliterator<Character> spliterator = new WordCounterSpliterator(s);
  Stream<Character> stream = StreamSupport.stream(spliterator, true);

  return countWords(stream);
}

단어 개수를 계산하기 위해서 간단한 클래스를 작성하였습니다.

private static class WordCounter {
  private final int counter;
  private final boolean lastSpace;

  public WordCounter(int counter, boolean lastSpace) {
      this.counter = counter;
      this.lastSpace = lastSpace;
  }

  public WordCounter accumulate(Character c) {
      if (Character.isWhitespace(c)) {
          return lastSpace ? this : new WordCounter(counter, true);
      } else {
          return lastSpace ? new WordCounter(counter+1, false) : this;
      }
  }

  public WordCounter combine(WordCounter wordCounter) {
      return new WordCounter(counter + wordCounter.counter, wordCounter.lastSpace);
  }

  public int getCounter() {
      return counter;
  }
}

이것은 병렬로 처리하는 방법은 아래와 같습니다.

private static class WordCounterSpliterator implements Spliterator<Character> {

  private final String string;
  private int currentChar = 0;

  private WordCounterSpliterator(String string) {
      this.string = string;
  }

  @Override
  public boolean tryAdvance(Consumer<? super Character> action) {
      action.accept(string.charAt(currentChar++));
      return currentChar < string.length();
  }

  @Override
  public Spliterator<Character> trySplit() {
      int currentSize = string.length() - currentChar;
      if (currentSize < 10) {
          return null;
      }
      for (int splitPos = currentSize / 2 + currentChar; splitPos < string.length(); splitPos++) {
          if (Character.isWhitespace(string.charAt(splitPos))) {
              Spliterator<Character> spliterator = new WordCounterSpliterator(string.substring(currentChar, splitPos));
              currentChar = splitPos;
              return spliterator;
          }
      }
      return null;
  }

  @Override
  public long estimateSize() {
      return string.length() - currentChar;
  }

  @Override
  public int characteristics() {
      return ORDERED + SIZED + SUBSIZED + NONNULL + IMMUTABLE;
  }
}

분할 과정

스트림을 여러 스트림으로 분할하는 과정은 재귀적으로 일어납니다. trySplit의 결과가 null이 될때까지 재귀적으로 요소를 분합니다. 이 분할과정은 characteristics 메서드로 정의하는 Spliterator의 특성에 영향을 받습니다.

characteristics라는 추상 메서드 정의합니다. Characteristics 메서드는 Spliterator 자체의 특성 집합을 포함하는 int 반환합니다.

  • ORDERED; 리스트처럼 요소에 정해진 순서가 있으므로 Spliterator는 요소를 탐색하고 분할할 때 이 순서에 유의
  • DISTINCT; x, y 두 요소를 방문했을 때 x.equals(y)는 항상 false를 반환
  • SORTED; 탐색된 요소는 미리 정의된 정렬 순서를 따름
  • SIZED; 크기가 알려진 소스로 Spliterator를 생성했으므로 estimatedSize()는 정확한 값을 반환
  • NONNULL; 탐색하는 모든 요소는 null이 아님
  • IMMUTABLE; 이 Spliterator의 소스는 불변. 즉, 요소를 탐색하는 동안 요소를 추가하거나, 삭제하거나, 고칠 수 없음
  • CONCURRENT; 동기화 없이 Spliterator의 소스를 여러 스레드에서 동시게 고칠 수 있음
  • SUBSIZED; 이 Spliterator 그리고 분할된 모든 Spliterator는 SIZED 특성을 갖음

참고