본문 바로가기
Programming Language 이해하기/Java 이해하기

자바8 inAction - [7] 병렬데이터 처리와 성능

by simplify-len 2019. 8. 31.

병렬 데이터 처리와 성능

  • 병렬 스트림으로 데이터를 병렬 처리하기
  • 병렬 스트림의 성능분석
  • 포크/조인 프레임워크
  • Spliterator로 스트림 데이터 쪼개기

스트림을 이용하면 순차 스트림을 병렬 스트림으로 자연스럽게 바꿀 수 있다.

어떻게 이런 방법같은 일이 일어날 수 있는지, 더 나아가 자바7에 추가된 포크/조인 프레임워크와 내부적인 병렬 스트림 처리는 어떤 관계가 있는가?

우선 여러 청크를 병렬로 처리하기 전에 병렬 스트림이 요소를 여러 청크로 분활하는 방법과, 이 원리를 이해하지 못하면 의도치 않은 설명하기 어려운 결과가 발생할 수 있다. 따라서 커스텀 Spliterator를 직접 구현하면서 분할 과정을 우리가 원하는 방식으로 제어하는 방법도 설명한다.

병렬 스트림

parallelStream 을 호출하면 쉽게 생성 가능.

병렬 스트림이란 각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스트림.

따라서 병렬 스트림을 이용하면 모든 멀티코어 프로세서가 각각의 청크를 처리하도록 할당할 수 있다.

숫자 n을 인수로 받아서 1부터 n까지의 모든 숫자의 합계를 반환하는 메서드를 구현한다고 가정

public static long sequentialSum(long n){
  return Stream.iterate(1L, i -> i+ 1) // 무한 자연수 스트림 생성
                            .limit(n) // n개 이하로 제한
                            .reduce(0L Long::sum()); // 모든 숫자를 더하는 스트림 리듀싱 연산
}

n이 점점 커진다면 이 연산을 병렬로 처리하는 것이 좋을 것이다. 그럼 무엇부터 건드려야 할 까? 결과 변수는 어떻게 동기화해야 될까? 몇 개의 스레드를 사용해야 할까? 숫자는 어떻게 생성할까? 생성된 숫자는 누가 더할까?

병렬 스트림을 이용하면 걱정, 근심 없이 모든 문제를 쉽게 해결할 수 있다.

먼저 순차스트림을 병렬스트림으로 변경하는 방법은

public static long sequentialSum(long n){
  return Stream.iterate(1L, i -> i+ 1) // 무한 자연수 스트림 생성
                            .limit(n) // n개 이하로 제한
                            .parallel()  *****요 부분************ 
                            .reduce(0L Long::sum()); // 모든 숫자를 더하는 스트림 리듀싱 연산
}

내부적으로는 parallel을 호출하면 이후 연산이 병렬로 수행해야 함을 의미하는 불린 플러그가 설정.

public static long sequentialSum(long n){
  return Stream.iterate(1L, i -> i+ 1) // 무한 자연수 스트림 생성
                            .limit(n) // n개 이하로 제한
                            .parallel()
                            .sequential()
                            .parallel()
                            .sequential()
                        .parallel()
                            .reduce(0L Long::sum()); // 모든 숫자를 더하는 스트림 리듀싱 연산
}

이렇게 반복될 경우에는 최종 parallel이 실행되면서 병렬로 실행 된다.

스트림 성능 측정

자 그럼 병렬화를 이용하면 성능이 더 좋아질까?

성능을 최적화할 때는 세 가지 황금 규칙을 기억해야 한다.

첫째도 측정, 둘째도 측정, 셋째도 측정!

public static <T, R> long measurePerf(Function<T, R> f, T input) {
        long fastest = Long.MAX_VALUE;
        for (int i = 0; i < 10; i++) {
            long start = System.nanoTime();
            R result = f.apply(input);
            long duration = (System.nanoTime() - start) / 1_000_000;
            System.out.println("Result: " + result);
            if (duration < fastest) fastest = duration;
        }
        return fastest;
    }

일반적인 iterate sms 4msesc

public static long iterativeSum(long n) {
        long result = 0;
        for (long i = 0; i <= n; i++) {
            result += i;
        }
        return result;
    }

Stream을 활용한 일반 결과는 141 msecs

public static long sequentialSum(long n) {
        return Stream.iterate(1L, i -> i + 1)
          .limit(n)
          .reduce(Long::sum)
          .get();
    }

Stream을 활용한 병렬 결과는 125 msecs

public static long parallelSum(long n) {
        return Stream.iterate(1L, i -> i + 1).limit(n).parallel().reduce(Long::sum).get();
    }

왜 이런 결과가 나온걸까?

  • iterate가 박싱된 객체를 생성하므로 이를 다시 언방싱하는 과정이 필요했다.
  • iterates는 병렬로 실행될 수 있도록 독립적인 청크로 분할하기가 어렵다.

두 번쨰 문제는 예사롭게 넘길 수 없다. 우리에겐 병렬로 수행될 수 있는 스트림 모델이 필요하기 때문에.

또한 iterate는 본질적으로 순차적이다.

이처럼 병렬 프로그래밍은 까다로우며 때로는 이해하기 어려운 함정도 숨어 있다. 심지어 병렬프로그래밍은 오용(예를 들어 병렬과 거리가 먼 iterate를 사용)하면 오히러 전체 프로그램의 성능이 더 나빠질 수 있다. 따라서 마법 같은 parallel 메서드를 호출했을 때 내부적으로 어떤 일이 일어나는지 꼭 이해해야 한다.

조금더 나은 방법은 뭐가 있을까?

멀티 코어 프로세서를 활용해서 효과적으로 합계 연산을 병렬로 실행하려면 어떻게 해야 될까?

  • LongStream.rangeClosed 는 기본적으로 long을 직접 사용하므로 박싱과 언박싱 오버헤드가 사라진다.
  • LongStream.rangeClosed 는 쉽게 청크로 분할할 수 있는 숫자 범위를 생산한다. 예를 들어 1-20 범위의 숫자를 각각 1-5, 6-10, 11-15, 16-20 범위의 숫자로 분할할 수 있다.

즉, LongStream과 같은 스트림을 활용하면 더 최적화된 병렬화된 결과를 가져올 수 있다. 올바른 자료구조를 선택해야 병렬실행도 최적의 성능을 발휘할 수 있다는 사실을 확인할 수 있다.

하지만 병렬화가 완전 공짜는 아니라는 사실을 기억하자.

병렬화를 이용하려면 스트림을 재귀적으로 분할해야 하고, 각 서브스트림을 서로 다른 스레드의 리듀싱 연산으로 할당하고, 이들 결과를 하나의 값으로 합쳐야 한다. 멀티코어 간의 데이터 이동은 우리 생각보다 비싸다. 따라서 코어 간에 데이터 전송 시간보다 휠씬 오래 걸리는 작업만 병렬로 다른 코어에서 수행하는 것이 바람직하다. 또한 상황에 따라 쉽게 병렬화를 이용할 수 있거나 아니면 아예 병렬화를 이용할 수 없는 때도 있다. 그리고 스트림을 병렬화해서 코드 실행 속도를 빠르게 하고 싶으면 항상 병렬화를 올바르게 사용하고 있는지 확인해야 한다.

병렬 스트림 효과적으로 사용하기.

  • 확신이 서지 않는다면 직접 측정하라. 순차 스트림을 병렬 스트림으로 쉽게 바꿀 수 있다. 하지만 무조건 병렬 스트림으로 바꾸는 것이 능사는 아니다. 이미 살펴본 것처럼 언제나 병렬 스트림이 순차스트림보다 따른 것이 아니기 때문이다. 더욱이 병렬 스트림의 수행 과정이 투명하지 않을 때가 많다. 따라서 순차 스트림과 병렬 스트림 중 어떤 스트림이 좋은지 모르겠다면 적절한 벹치마크로 직접 성능을 측정하는 것이 바람직하다.
  • 박싱을 주의하라. 자동박싱과 언박싱은 성능을 크게 저하시킬 수 있는 요소다. 자바8은 박싱 독장을 피할 수 있도록 기본형 특화스트림(InsStream, LongStream, DoubleStream) 을 제공한다. 따라서 되록이면 기본형 특화 스트림을 사용하는 것이 좋다.
  • 순차 스트림보다 병렬 스트림에서 성능이 떨어지는 연산이 있다. 특히 limit나 findFirst처럼 요소의 순서에 의존하는 연산을 병렬 스트림에서 수행하려면 비싼 비용을 치러야 한다. 예를 들어 findAny는 요소의 순서와 상관없이 연산하므로 findFirst보다 성능이 좋다. 정렬된 스트림에 unorederd 를 호출하면 비정렬된 스트림을 얻을 수 있다. 스트림에 N개 요소가 있을 때 요소의 순서와 상관없다면 비정렬된 스트림에 limit를 호출하는 것이 더 효율적
  • 스트림에서 수행하는 전체 파이프라인 연산 비용을 고려하라. 처리해야 할 요소 수가 N이고 하나의 요소를 처리하는 데 드는 비용이 Q라 하면 전체 스트림 파이프라인 처리 비용은 N * Q로 예상할 수 있다. Q가 높아진다는 것은 병렬 스트림으로 성능을 개선할 수 있는 가능성이 있음을 의미
  • 소량의 데이터에서는 병렬 스트림이 도움 되지 않는다. 소량의 데이터를 처리하는 상황에서는 병렬화 과정에서 생기는 부과 비용을 상쇄할 수 있을 만큼의 이득을 얻지 못하기 때문
  • 스트림을 구성하는 자료구조가 적절한지 확인하라. 예를 들어 ArrayList를 LinkedList보다 효율적으로 분할, LinkedList를 분할하려면 모든 요소를 탐색해야 하지만 ArrayList는 요소를 탐색하지 않고도 리스트를 분할 할수 있기 때문. 또한 range 팩토리 메서드로 만든 기본형 스트림도 쉽게 분해.
  • 스트림의 특성과 파이프라인의 중간 연산이 스트림의 특성을 어떻게 바꾸는지에 따라 분해 과정의 성능이 달라질 수 있다. 예를 들어 SIZED 스트림은 정확히 같은 크기의 두 스트림으로 분할 할 수 있으므로 효과적으로 스트림을 병렬 처리할 수 있다. 반면 필터 연산이 있으면 스트림의 길이를 예측할 수 없으므로 효과적으로 스트림을 병렬 처리할 수 있을지 알 수 없게 된다.
  • 최종 연산의 병합 과정(Collector의 Combiner 메서드) 비용을 살펴보라. 병합 과정의 비용이 비싸다면 병렬 스트림으로 얻은 성능의 이익이 서브스트림의 부분결과를 합치는 과정에서 상쇄.

포크/조인 프레임워크

포크/조인 프레임워크는 병렬화할 수 있는 작업을 재귀적으로 작은 작업으로 분할한 다음에 서브테스크 각각의 결과를 합쳐서 전체 결과를 만들도록 설계. 포크/조인 프레임워크에서는 서브테스크를 스레드 풀(ForkJoinPool) 의 작업자 스레드에 분산할당하는 ExecutorService 인터페이스를 구현.

RecursiveTask 활용

포크/조인 프레임워크를 제대로 사용하는 방법

작업 훔치기

Spliterator

Spliterator는 '분할 할수 있는 반복자(spliable iterator)' 라는 의미. Iterator처럼 Spliterator는 소스의 요소 탐색 기능을 제공한다는 점은 같지만 Spliterator는 병렬 작업에 특화.

모든 컬렉션 프레임워크에 포함된 모든 자료구조에 사용할 수 있는 디폴트 Spliterator 구현을 제공

이번에는 원리를 이해해보자.

public interface Spliterator<T> {
  boolean tryAdvance(Consumer<? super T> action);
    Spliterator<T> trySplit();
  long estimateSize();
  int characteristics();
}

여기서 T는 Spliterator에서 탐색하는 요소의 형식을 가리킨다. tryAdvence 메서드는 Spliterator의 요소를 하나씩 순차적으로 소비하면서 탐색해야 할 요소가 남아있으면 참을 반환한다. ( 즉, 일반적인 Iterator 동작과 동일)

반면 trySplit 메서드는 Spliterator의 일부 요소(자신이 반환한 요소)를 분할해서 두 번째 Spliterator를 생성하는 메서드.

Spliterator에서는 estimateSize 메서드는 탐색해야 할 요소 수 정보를 제공할 수 있다. 특히, 탐색해야 할 요소 수가 정확하진 않더라도 제공된 값을 이용해서 더 쉽고 공평하게 Spliterator를 분할 할수 있다.

결과

  • 내부 반복을 이용하면 명시적으로 다른 스레드를 사용하지 않고도 스트림을 병렬로 처리할 수 있다.
  • 간단하게 스트림을 병렬로 처리할 수 있지만 항상 병렬 처리가 빠른 것은 아니다. 병렬 소프트웨어 동작 방법과 동성능은 직관적이지 않을 때가 많으므로 병렬 처리를 사용했을 때 성능을 직접 측정해봐야 한다.
  • 병렬 스트림으로 데이터 집합을 병렬 실행할 때 특히 처리해야 할 데이터가 아주 많거나 각 요소를 처리하는데 오랜 시간이 걸릴 때 성능을 높일 수 있다.
  • 가능하면 기본형 특화 스트림을 사용하는 등 올바른 자료구조 선택이 어떤 연산을 병렬로 처리하는 것보다 성능적으로 더 큰 영향을 미칠 수 있다.
  • 포크/조인 프레임워크에서는 병렬화할 수 있는 태스크를 작은 태스크로 분할한 다음에 분할된 태스크를 각각의 스레드로 실행하며 서브태스크 각각의 결과를 합쳐서 최종 결과를 생산한다.
  • Spliterator는 탐색하려는 테이터를 포함하는 스트림을 어떻게 병렬화할 것인지 정의한다.

댓글