본문 바로가기

개발 서적/모던 자바 인 액션

[모던 자바 인 액션] chapter 7. 병렬 데이터 처리와 성능 (1)

 


[목차]
chapter 1. 자바 8, 9, 10, 11 : 무슨 일이 일어나고 있는가?

chapter 2. 동작 파라미터화 코드 전달하기
chapter 3. 람다 표현식(1)
chapter 3. 람다 표현식(2)
chapter 4. 스트림 소개
chapter 5. 스트림 활용

chapter 6. 스트림으로 데이터 수집(1)
chapter 6. 스트림으로 데이터 수집(2)
chapter 7. 병렬 데이터 처리와 성능(1)


 

자바 7이 등장하기 전에는 데이터 컬렉션을 병렬로 처리하기 어려웠음. 다음과 같은 일련의 과정을 거쳐야 했음.

  • 데이터를 서브 파트로 분할
  • 분할된 서브파트를 각각의 스레드로 할당
  • race condition이 발생하지 않도록 동기화 작업 추가
  • 부분 결과를 합침

자바 7은 더 쉽게 병렬화를 수행하면서 에러를 최소화할 수 있도록 포크/조인 프레임워크(fork/join framework) 기능을 제공한다.

이 장에서는 스트림으로 데이터 컬렉션 관련 동작을 얼마나 쉽게 병렬로 실행할 수 있는지 설명한다.

병렬 스트림

  • 컬렉션에 parallelStream을 추출하면 병렬 스트림(parallel stream)이 생성된다.
  • 병렬 스트림이란 각각의 스레드에서 처리할 수 잇도록 스트림 요소를 여러 청크로 분할한 스트림이다.
  • 병렬 스트림을 이용하면 모든 멀티코어 프로세서가 각각의 청크를 처리하도록 할당할 수 있다.

순차 스트림을 병렬 스트림으로 변환하기

숫자 n을 인수로 받아서 1부터 n까지의 모든 숫자의 합계를 반환하는 메서드를 다음과 같이 구현할 수 있다.

public long sequentialSum(long n) {
        return Stream.iterate(1L, i -> i + 1)
                     .limit(n)
                     .reduce(0L, Long::sum);
    }

n이 커진다면 이 연산을 병렬로 처리하는 것이 좋을 것이다. 순차 스트림에 parallel 메서드를 호출하면 기조ㅓㄴ의 함수형 리듀싱 연산이 병렬로 처리된다.

public long parallelSum(long n) {
        return Stream.iterate(1L, i -> i + 1)
                     .limit(n)
                     **.parallel()**
                     .reduce(0L, Long::sum);
    }

→ 스트림이 여러 청크로 분할되어 병렬로 수행되고, 마지막으로 리듀싱 연산으로 생성된 부분 결과를 다시 리듀싱 연산으로 합쳐서 전체 스트림의 리듀싱 결과를 도출함

내부적으로 parallel을 호출하면 이후 연산이 병렬로 수행해야함을 의미하는 불리언 플래그가 설정된다. 반대로 sequential로 병렬을 순차 스트림으로 바꿀 수 있다.

stream().parallel()
				.filter(...)
				.sequential()
				.map(...)
				.parallel()
				.reduce();

→ 최종적으로 호출된 메서드가 전체 파이프라인에 영향을 미친다. 이 예제에서는 마지막으로 호출된 parallel에 따라 병렬로 실행된다.

병렬 스트림에서 사용하는 스레드 풀 설정

스트림의 parallel 메서드에서 병렬로 작업을 수행하는 스레드는 어디서, 몇 개나, 어떻게 생성될까?

→ 내부적으로 ForkJoinPool을 사용하며, 이는 기본적으로 프로세서 수(Runtime.getRuntime().availableProcessors())가 반환하는 값에 상응하는 스레드를 가짐

스트림 성능 측정

성능 측정을 위해 자바 마이크로 벤치마크(Java Microbenchmark Harness, JMH)라는 라이브러리를 이용한다. JMH를 이용하면 간단하고, 어노테이션 기반 방식을 지원하며, 안정적으로 자바 프로그램이나 JVM을 대상으로 하는 다른 언어용 벤치 마크를 구현할 수 있다.

package jmh;

import org.openjdk.jmh.annotations.*;

import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
@Fork(value = 2, jvmArgs = {"-Xms4G", "-Xmx4G"})
public class ParallelStreamBenchmark {
    private static final long N = 10_000_000L;

    @Benchmark
    public long sequentialSum() {
        return Stream.iterate(1L, i -> i + 1)
                     .limit(N)
                     .reduce(0L, Long::sum);
    }

    @Benchmark
    public long interactiveSum() {
        long result = 0;
        for(long i = 1L; i <= N; i++) {
            result += i;
        }
        return result;
    }

    @Benchmark
    public long parallelSum() {
        return Stream.iterate(1L, i -> i + 1)
                     .limit(N)
                     .parallel()
                     .reduce(0L, Long::sum);
    }

    @TearDown(Level.Invocation)
    public void tearDown() {
        System.gc();
    }
}

순차적인 스트림(sequentialSum), 전통적인 for루프 방식(interactiveSum), 병렬 스트림(parallelSum)의 성능 측정 결과 interactiveSum > sequentaialSum > parallelSum 순으로 빠른 처리 속도를 보임을 확인할 수 있었다.

  • 전통적인 for루프 방식이 빠른 이유? 순차적 스트림보다 병렬 스트림이 더 느린 이유?→ 반복 작업은 병렬로 수행할 수 있는 독립 단위로 나누기가 어렵다.
  • → 반복 결과로 박싱된 객체가 만들어지므로 숫자를 더하려면 언박싱을 해야 한다.

iterate 연산은 본질적으로 순차적이다. 이전 연산의 결과에 따라 다음 함수의 입력이 달라지기 때문에 iterate 연산을 청크로 분할하기 어렵다. → 스트림을 병렬로 처리되도록 지시했고 각각의 합계가 다른 스레드에서 수행되었지만 결국 순차처리 방식과 크게 다른점이 없으므로 스레드를 할당하는 오버헤드만 증가

더 특화된 메서드 사용

iterate 대신 LongStream과 같은 기본형 특화 스트림을 이용해서 박싱 비용을 줄여보자. LongStream이 iterate에 비해 같는 장점은 다음과 같다.

  • LongStream.rangeClosed는 기본형 long을 직접 사용하므로 박싱과 언박싱 오버헤드가 사라짐
  • LongStream.rangeClosed는 쉽게 청크로 분할할 수 있는 숫자 범위를 생산함. 예를들어 1-20범위의 숫자를 각각 1-5, 6-10, 11-15, 16-20으로 분할할 수 있음
		@Benchmark
    public long rangedSum() {
        return LongStream.rangeClosed(1, N)
                         .reduce(0L, Long::sum);
    }

→ 측정 결과 iterate로 생성한 기존의 순차 스트림에 비해 처리 속도가 더 빠르다. (상황에 따라서는 알고리즘보다 적절한 자료구조가 중요함을 보여주는 사례)

새로운 버전에 병렬 스트림을 적용하면 성능이 향상될까?

		@Benchmark
    public long rangedSum() {
        return LongStream.rangeClosed(1, N)
                         .reduce(0L, Long::sum);
    }

→ 올바른 자료구조는 병렬 실행의 최적의 성능을 발휘할 수 있도록 한다. 순차 실행보다 빠른 성능을 갖는 병렬 리듀싱이 되었다.

결론적으로 함수형 프로그래밍을 올바르게 사용하면 최신 멀티 코어 CPU가 제공하는 병렬 실행의 힘을 단순하게 직접적으로 얻을 수 있음!

하지만 병렬화가 완전 공짜라는 아니라는 사실을 기억하자!

  • 스트림을 재귀적으로 분할하고, 각 서브스트림을 서로 다른 스레드의 리듀싱 연산으로 할당하고, 결과를 하나의 값으로 합치는 일련의 작업이 수반됨
  • 멀티코어 간의 데이터 이동은 비싼 비용을 치른다. 코어간 데이터 전송 시간보다 훨씬 오래 걸리는 작업만 병렬로 수행하는 것이 바람직함
  • 상황에 따라 아예 병렬화를 사용할 수 없는 때도 있음

스트림을 병렬화해서 코드 실행 속도를 높이고자 한다면 병렬화를 올바르게 사용하고 있는지 짚어봐야한다.

병렬 스트림의 올바른 사용법

병렬 스트림을 잘못 사용하면서 발생하는 많은 문제는 공유된 상태를 바꾸는 알고리즘을 사용하기 때문.

// n까지의 자연수를 더하면서 공유된 누적자를 바꾸는 코드
public long sideEffectSum(long n) {
        Accumulator accumulator = new Accumulator();
        LongStream.rangeClosed(1, n).forEach(accumulator::add);
        return accumulator.total;
    }

public class Accumulator {
    public long total = 0;
    public void add(long value) {
        total += value;
    }
}

위 코드는 본질적으로 순차 실행할 수 있도록 구현되어 있으므로 병렬로 실행하면 참사가 일어난다.

  • total을 접근할 때마다 (다수의 스레드에서 동시에 데이터에 접근하는) 데이터 race condition 문제가 일어남
  • 동기화로 문제를 해결하려다보면 병렬화라는 특성이 사라짐

스트림을 병렬로 만들어서 어떤 일이 발생하는지 확인

public long sideEffectParallelSum(long n) {
        Accumulator accumulator = new Accumulator();
        LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add);
        return accumulator.total;
    }

System.out.println(
        measurePerf(ParallelStreams::sideEffectParallelSum, 10_000_000L) + " msecs");

올바른 결과값(50000005000000)이 나오지 않는다. 여러 스레드에서 동시에 누적자의 total += value를 실행하면서 문제가 발생한다.

→ 병렬 스트림과 병렬 계산에서는 공유된 가변 상태를 피해야함을 확인

병렬 스트림 효과적으로 사용하기

  • 확신이 서지 않으면 직접 측정하라.
  • 박싱을 주의하라.(기본형 특화 스트림을 활용)
  • 순차 스트림보다 병렬 스트림에서 성능이 떨어지는 연산이 있다.
  • 스트림에서 수행하는 전체 파이프라인 연산 비용을 고려하라
  • 소량의 데이터에서는 병렬 스트림이 도움되지 않는다.
  • 스트림을 구성하는 자료구조가 적절한지 확인하라.
  • 스트림의 특성과 파이프라인의 중간 연산이 스트림의 특성을 어떻게 바꾸는지에 따라 분해 과정의 성능이 달라질 수 있다.
  • 최종 연산의 병합 과정 비용을 살펴보라.
반응형