본문 바로가기
프로그래밍/JAVA 내용정리

[ModernJavaInAction] 7장 병렬스트림 정리 2편

by 노잼인간이라불립니다 2022. 10. 11.

이 글은 ModernJavaInAction이란 책을 읽고 요약 정리한 글 입니다.

 

모든 실습 코드는 아래 주소에 있습니다.

https://github.com/jojojojocho/mordernjavainaction

 

GitHub - jojojojocho/mordernjavainaction: 모던자바인액션 연습코드

모던자바인액션 연습코드. Contribute to jojojojocho/mordernjavainaction development by creating an account on GitHub.

github.com

 

1편에 이어서

자바 7부터 병렬화 처리를 쉽게 하도록 도와주는

포크 / 조인 프레임워크 대해 알아 보도록 하죠.

 

포크 / 조인 프레임 워크

포크 / 조인 프레임 워크는 병렬화 할 수 있는 메인 태스크를

재귀를 이용하여 작은 서브 태스크로 분할하여 계산 후,

서브태스크의 각각의 결과를 합쳐서 전체 결과로 도출할 수 있도록 설계 되어 있습니다. 

 

포크 / 조인 프레임 워크에서는 서브 태스크를

스레드풀의 스레드에 분산, 할당하는 ExecutorService 인터페이스를 구현합니다.

 

 

스레드 풀을 이용하려면 해당 클래스에 RecursiveTask<R>을 상속받아야 합니다.

R은 병렬화된 태스크의 결과타입, 또는 결과가 없을 경우 RecursiveAction 타입입니다.

 

RecursiveTask 안에는 추상 메서드 compute가 선언 되어 있기 때문에

상속 받은 클래스에서는 반드시 재 정의를 해주어야 합니다.

 

compute 메서드는 태스크 분할 기준에 따라 태스크를 서브태스크로 분할하는 로직과

더 이상 분할할 수 없을 경우의 서브태스크를 실행하여 결과값을 도출할 로직을 정의 해야 합니다.

 

예제코드로 살펴보겠습니다.

 

public class ForkJoinSumCalculator extends RecursiveTask<Long> {
    private final long[] numbers; // 이 계산기에서 더할 숫자배열
    private final int start; // 초기 위치
    private final int end; // 마지막 위치
    public static final long THRESHOLD = 10_000; // 서브태스크로 나눌지를 결정하는 기준.

    /**
     * 메인 태스크 생성시 사용할 생성자
     *
     * @param numbers
     */
    public ForkJoinSumCalculator(long[] numbers) {
        this(numbers,0, numbers.length);
    }

    /**
     * 서브태스크 생성시 사용할 생성자
     *
     * @param numbers
     * @param start
     * @param end
     */
    public ForkJoinSumCalculator(long[] numbers, int start, int end) {
        this.numbers = numbers;
        this.start = start;
        this.end = end;
    }


    /**
     * RecursiveTask의 추상메서드를 재정의한 메서드.
     *
     * @return
     */
    @Override
    protected Long compute() {
        int length = this.end - this.start; // 이 태스크의 배열의 길이

        if(length <= THRESHOLD) {
            return computSequentially(); // 서브태스크 분할기준과 같거나 낮을 경우 순차적으로 계산한다.
        }

        ForkJoinSumCalculator leftTask =
                new ForkJoinSumCalculator(numbers, start, start + length / 2);
        leftTask.fork(); // leftTask를 비동기로 실행

        ForkJoinSumCalculator rightTask =
                new ForkJoinSumCalculator(numbers, start + length / 2, end);
        Long rightResult = rightTask.compute(); // rightTask를 동기로 실행

        Long leftResult = leftTask.join(); // 비동기로 실행한 결과를 가져옴.
        return leftResult + rightResult;


    }

    private long computSequentially() {
        long sum = 0;
        for(int i= this.start; i < this.end; i++){
            sum += numbers[i];
        }
        return sum;
    }

    public static long forkJoinSum(long n){
        long[] numbers = LongStream.rangeClosed(1,n).toArray();
        ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
        return new ForkJoinPool().invoke(task);
    }
}

재 정의된 compute 메서드를 살펴보자면,

1. 처음의 if 문에서는 더 이상 서브태스크로 분할 할 수 없을 경우에 실행되는

계산메서드의 실행결과를 return하도록 되어 있고,

2. 그 아래에는 계속해서 서브태스크로 분할 하도록 하는 로직이 정의 되어 있습니다.

 

3. 마지막에는 분할한 모든 결과를 더해서 return하도록 하고 있습니다.

 

 

ForkJoinSumCalculator의 실행 flow를 살펴보자면,

1. 맨 아래에 있는 forkJoinSum에서 ForkJoinSumCalculator는 task로 정의 되고,

ForkJoinPool의 invoke 메서드의 인수로 전달됩니다.

 

2. ForkJoinPool에 의해 호출된 ForkJoinSumCalculator는 내부의 compute 메서드를 실행시킵니다.

 

3. compute 메서드는 병렬로 실행 할 수 있을 만큼의 태스크의 크기인지 확인합니다.

 

4. 태스크의 크기가 분할 가능하다면, 분할 하여 다시 ForkJoinSumCalculator에 할당합니다.

 

5. 그러면 ForkJoinPool이 새로운 작업자 스레드에 새로 생성한 ForkJoinSumCalculator를 할당하게 됩니다.

 

6. 이렇게 재귀 방식으로 진행되어 leftResult와 RightResult를 더하며 최종 결과를 도출하게 됩니다.

 

 

이러한 포크 / 조인 프레임 워크도 제대로 사용하지 않으면, 비효율 적일 수 있습니다.

그럼 포크 / 조인 프레임 워크를 좀 더 효율적으로 사용할 수 있는 방법을 알아 볼까요?

 

1. 포크 / 조인 프레임 워크는  join메서드를 호출 시키는 타이밍이 중요합니다!!

join 메서드를 호출하게 되면 서브태스크가 끝날 때 까지 호출자를 블록시키기 때문에

태스크가 끝나기 전에 무분별하게 호출하게 되면 각각의 서브태스크들의 결과 값이 나올 때 까지

호출자 들은 블록이 되기 때문에 오히려 순차 실행되는 스트림보다 느릴 수 있습니다.

그러므로 fork와 compute가 다 실행된다음 join 메서드를 사용 해야 합니다.

 

2. RecursiveTask 내에서는 ForkJoinPool의 invoke 메서드를 사용하면 안됩니다!!

ForkJoinPool의 invoke 메서드는 return 값이 task.join 이므로

위에 설명했던 이유와 같은 이유로 성능에 영향이 있을 수 있으므로 쓰면 안됩니다!!

 

3. 최적화를 위해서는 벤치마킹을 통해 성능 측정하는 것이 바람직합니다!!

어쨌든간 우리는 컴퓨터가 아니기 때문에 최적의 성능을 뽑아내기 위해서는

컴퓨터가 실행한 결과값을 가지고 비교하는 것이 제일 현명한 방법입니다.

벤치마크를 적극 활용합시다!!

 

4. 컴파일러는 순차처리에 최적화 되어 있을 수도 있다는 사실을 잊지 말것!

무조건 병렬이 빠를 것이라고 생각할 수도 있지만, 

사실 개발자가 고려해야할 사항도 많고, 자칫하면 복잡한 로직의 지옥에 빠질 수도 있습니다.

그렇기 때문에 오히려 컴파일러가 죽은코드, 처리하지 않아도 되는 코드를 실행하지 않고, 걸러주는

순차처리 방식을 채택하는 것도 하나의 방법이 될 수 있습니다.

 

 

이제 조금 더 심화내용을 알아 보도록 합시다.

작업훔치기

ForkJoinPool이 어떻게 작업을 균등하게 분배하는지 궁금하지 않으셨나요?

ForkJoinPool은 작업훔치기 기법을 통해 작업자 스레드에 태스크를 공정하게 분배합니다.

 

ForkJoinPool에서의 각각의 스레드는 자신에게 할당된 태스크를 포함하는 이중 연결리스트를 참조하면서 작업이 끝날 때마다 큐의 헤드에서 다른 태스크를 가져와서 작업을 처리합니다.

작업 훔치기를 통해 작업자 스레드에 작업을 분배 하는 그림

 

다음으로는 스트림을 자동으로 분할해주는 Spliterator를 살펴보도록 하겠습니다.

 

Spliterator 인터페이스

Spliterator는 Splitable과 iterator가 합쳐진 말로

직역하자면 분할 가능한 반복자라는 의미입니다.

 

iterator 처럼 요소를 탐색하지만, 병렬작업에 특화되어 있는게 특징입니다.

 

그럼 본격적으로 Spliterator 인터페이스를 살펴볼까요?

 

Spliterator 인터페이스는 내부에 선언된 메서드는 많지만 대표적으로 4가지만 살펴보도록 하겠습니다.

 

 /**
  * 나머지 요소가 존재하는 경우 해당 요소에 대해 지정된 작업을 수행하고,
  * {@code true} 반환; 그렇지 않으면 {@code false}를 반환합니다. 이 경우
  * Spliterator는 {@link #ORDERED}입니다.
  * 조우 순서의 다음 요소. 에 의해 던져진 예외
  * 작업은 호출자에게 전달됩니다.
  *
  * @param 액션 액션
  * @return {@code false} 나머지 요소가 존재하지 않는 경우
  * 이 메서드에 들어갈 때, 그렇지 않으면 {@code true}.
  * 지정된 작업이 null인 경우 @throws NullPointerException
  */
 
boolean tryAdvance(Consumer<? super T> action);

 

1. tryAdvance()는 나머지 요소가 존재하는 경우 

해당 요소에 대해 지정된 작업을 수행하고, {@code true} 반환; 

그렇지 않으면 {@code false}를 반환합니다.

 

 

/**
 * 이 분할자를 분할할 수 있는 경우 분할자를 반환합니다.
 * 이 메서드에서 반환되면 요소를 포함하지 않습니다.
 * 이 스플리터가 적용됩니다.
 *
 * 이 Spliterator가 {@link #ORDERED}인 경우 반환되는 Spliterator
 * 요소의 엄격한 접두사를 포함해야 합니다.
 *
 * 이 Spliterator가 무한한 수의 요소를 포함하지 않는 한,
 * {@code trySplit()}에 대한 반복적인 호출은 결국 {@code null}을 반환해야 합니다.
 * null이 아닌 반환 시:
 * 분할하기 전에 {@code EstimateSize()}에 대해 보고된 값,
 * 분할 후 {@code EstimateSize()}보다 크거나 같아야 합니다.
 * 이것과 반환된 Spliterator의 경우; 그리고
 * 이 Spliterator가 {@code SUBSIZED}이면 {@code EstimateSize()}
 * 분할 전 이 분할자의 경우 다음 합계와 같아야 합니다.
 * 이에 대한 {@code EstimateSize()} 및 반환된 Spliterator 이후
 * 분할.
 *
 *
 *이 메서드는 어떤 이유로든 {@code null}을 반환할 수 있습니다.
 * 비어 있음, 순회 후에 분할할 수 없음 포함
 * 시작, 데이터 구조 제약 및 효율성
 * 고려 사항.
 *
 * @apiNote
 * 효율적으로 이상적인 {@code trySplit} 메서드(
 * 순회)는 요소를 정확히 반으로 나눕니다.
 * 균형 잡힌 병렬 계산. 이 이상에서 많은 이탈
 * 매우 효과적입니다. 예를 들어 대략적으로만
 * 대략적으로 균형 잡힌 나무를 분할하거나
 * 어떤 리프 노드가 하나 또는 두 개의 요소를 포함할 수 있는지,
 * 이러한 노드를 추가로 분할하지 못했습니다. 그러나 큰
 * 균형의 편차 및/또는 지나치게 비효율적인 {@code
 * trySplit} 메커니즘은 일반적으로 불량한 병렬을 초래합니다.
 * 성능.
 *
 * @return a {@code Spliterator}
 * 요소 또는 이 분할자를 분할할 수 없는 경우 {@code null}
 */
Spliterator<T> trySplit();

 

2. trySplit() 메서드는 Spliterator의 일부요소를 분할해서 두 번째 Spliterator를 생성합니다.

 

 

 

/**
 * 예상되는 요소 수를 반환합니다.
 * {@link #forEachRemaining} 순회에서 발생하거나 {@link를 반환합니다.
 * Long#MAX_VALUE}는 무한하거나 알 수 없거나 계산하기에 너무 비싼 경우입니다.
 *
 * 이 Spliterator가 {@link #SIZED}이고 아직 부분적으로 적용되지 않은 경우
 * 트래버스 또는 스플릿, 또는 이 스플리터는 {@link #SUBSIZED}이고 다음을 가지고 있습니다.
 * 아직 부분적으로 순회되지 않았으므로 이 추정치는 정확해야 합니다.
 * 완전한 순회에서 만날 수 있는 요소의 수.
 * 그렇지 않으면 이 추정치가 임의로 부정확할 수 있지만 감소해야 합니다.
 * {@link #trySplit} 호출 전반에 걸쳐 지정된 대로.
 *
 * @apiNote
 * 정확하지 않은 추정치라도 종종 유용하고 비용이 적게 듭니다.
 * 예를 들어, 대략적으로 균형 잡힌 이진 트리의 하위 분할기
 * 요소의 수를 절반으로 추정하는 값을 반환할 수 있습니다.
 * 부모의 것; 루트 Spliterator가 유지 관리하지 않는 경우
 * 정확한 개수, 크기를 2의 거듭제곱으로 추정할 수 있음
 * 최대 깊이에 해당합니다.
 *
 * @return 예상 크기, 무한이면 {@code Long.MAX_VALUE},
 * 알 수 없거나 계산하기에 너무 비쌉니다.
 */
long estimateSize();

 

3. estimateSize() 메서드는 요소의 수를 반환합니다.

 

 

/**
     * 이 Spliterator의 특성 세트와 해당
     * 요소. 결과는 {@link의 OR 값으로 표시됩니다.
     * #ORDERED}, {@link #DISTINCT}, {@link #SORTED}, {@link #SIZED},
     * {@link #NONNULL}, {@link #IMMUTABLE}, {@link #CONCURRENT},
     * {@link #SUBSIZED}. {@code properties()}에 대한 반복적인 호출
     * {@code trySplit} 호출 전 또는 호출 사이에 주어진 분할자,
     * 항상 같은 결과를 반환해야 합니다.
     *
     * Spliterator가 일치하지 않는 집합을 보고하는 경우
     * 특성(단일 호출에서 반환된 특성
     * 또는 여러 호출에 걸쳐), 보장할 수 없음
     * 이 Spliterator를 사용하는 모든 계산에 대해.
     *
     * @apiNote 분할 전 주어진 분할기의 특성
     * 분할 후 특성과 다를 수 있습니다. 특정
     * 예는 특성 값 {@link #SIZED}, {@link #SUBSIZED} 참조
     * 및 {@link #CONCURRENT}.
     *
     * @return 특성 표현
     */
int characteristics();

 

 

4. characteristics() 메서드는 Spliterator의 특성 정보를 반환합니다.(특성정보를 포함하는 int)

 

 

그럼 이제 Spliterator 인터페이스의 구현체를 통해 살펴볼까요?

 

public class WordCounterSpliterator implements Spliterator<Character> {

    private final String string; // 문자열
    private int currentChar = 0; // 인덱스

    public WordCounterSpliterator(String string) {
        this.string = string;
    }

    /**
     * 문자열에서 현재 인덱스에 해당하는 문자를 Consumer에게 제공한다음 인덱스를 증가시킴.
     * Consumer : 스트림을 탐색하면서 적용해야 하는 함수 집합이 작업을 처리할 수 있도록 소비한 문자를 전달하는 자바 내부 클래스.
     *
     * @param action
     * @return
     */
    @Override
    public boolean tryAdvance(Consumer<? super Character> action) {
        // 현재 문자를 소비한다.
        action.accept(string.charAt(currentChar++));

        // 소비할 문자가 남아있으면 true 리턴
        return currentChar < string.length();
    }

    /**
     * 반복될 자료구조를 분할하는 로직
     *
     * @return
     */
    @Override
    public Spliterator<Character> trySplit() {
        int currentSize = string.length() - currentChar;

        // 문자열이 분할 기준값인 10 이하면 더 이상 분할되지 않음.
        if (currentSize < 10) {
            return null; // null 리턴시 분할이 중단 됨.
        }
        // 분할 로직
        for (int splitPos = currentSize / 2 + currentChar; splitPos < string.length(); splitPos++) {
            // 지금 문자가 공백이라면 새로운 spliterator를 만들어서 리턴한다.
            if (Character.isWhitespace(string.charAt(splitPos))) {
                Spliterator<Character> spliterator =
                        new WordCounterSpliterator(string.substring(currentChar, splitPos));

                currentChar = splitPos;
                return spliterator;
            }
        }
        return null;
    }

    /**
     * 탐색해야할 요소의 갯수
     *
     * @return
     */
    @Override
    public long estimateSize() {
        return string.length() - currentChar;
    }

    /**
     * ORDERED - 유의미한 순서가 있음.
     * DISTINCT - x,y 두 요소를 방문했을 경우 항상 false인 경우.(중복이 없다.)
     * SORTED - 탐색될 요소들은 미리 정렬되어 있는지 여부.
     * SIZED - 요소의 길이
     * NON-NULL - 탐색하는 모든요소는 NULL이 아님
     * IMMUTABLE - 이 Spliterator의 소스는 불변이다.
     * CONCURRENT - 동기화 없이 Spliterator의 소스를 여러스레드에서 동시에 고칠 수 있다.(thread safe하다)
     * SUBSIZED - 이 Spliterator와 분할되는 Spliterator들은 SIZE 특성을 가진다.
     *
     * @return
     */
    @Override
    public int characteristics() {
        return ORDERED + SIZED + SUBSIZED + NONNULL + IMMUTABLE;
    }
}

 

다음은 Spliterator의 구현체의 모습입니다.

문자열을 제공받아 단어의 갯수를 세는 기능을 하는데요. 

 

위의 인터페이스에서 선언되었던 것들이 모두 재정의 되어 선언되어 있는 것을 보실 수 있습니다.

 

 

지금까지 병렬스트림에 대해서 알아 보았습니다.

 

마지막으로 정리해보자면.

 

1. 스트림에 parallel()만 추가해주면, 순차스트림을 손 쉽게 병렬로 처리할 수 있다.

 

2. 그러나 항상 병렬처리가 빠른 것은 아니므로, 순차처리를 했을 경우와 병렬처리를 했을 경우를 잘 비교 하여

최적이라고 판단되는 것을 선택할 수 있어야 한다.

(벤치마킹으로 성능측정하는 것도 좋은 방법이다.)

 

3. 병렬스트림은 데이터가 커지면 커질수록,

각 요소를 처리하는 비용이 증가할 수록 성능을 높여줄 수 있는 옵션이기 때문에 고려해볼 만하다.

(반대로 데이터 양이 적을 때에는 오히려 성능이 나쁘다.)

 

4. 기본 특화 스트림과 올바른 자료구조를 이용하는 것이 병렬스트림을 사용하는 것보다 성능개선에 효과적일 수 있다.

 

5. 포크 / 조인 프레임워크에서는 태스크를 작은 서브태스크로 분할하여 각각의 작업자 스레드에 할당하며, 서브태스크를 join 하여 각각의 결과 값을 합쳐 최종결과를 도출한다.

 

6. Spliterator는 탐색하려는 데이터를 포함하는 스트림을 어떻게 병렬화 할 것인지에 대해

분해과정을 완전 제어 및 병렬화 방법을 재정의 할 수 있다.