Home 스트림 (Stream)
Post
Cancel

스트림 (Stream)

스트림이란 무엇일까?

자바 8 API에 새로 추가된 기능으로, 스트림이란 ‘데이터 처리 연산을 지원하도록 소스에서 추출된 연속된 요소’로 정의할 수 있다.

스트림의 정의에 사용된 단어의 의미를 하나하나 뜯어보자!

  • 데이터 처리 연산?

    함수형 프로그래밍 언어에서 일반적으로 지원하는 연산, 그리고 데이터베이스와 비슷한 연산을 지원한다. 예를 들어 filter, map, reduce, find, match, sort 등으로 데이터를 조작할 수 있다.

  • 소스?

    컬렉션, 배열, I/O 자원 등의 데이터 제공 소스로부터 데이터를 소비한다. 정렬된 컬렉션으로 스트림을 생성하면, 정렬이 그대로 유지된다. (리스트로 스트림을 만들면, 스트림의 요소는 리스트의 요소와 같은 순서를 유지함)

  • 연속된 요소?

    컬렉션과 마찬가지로, 스트림은 특정 요소 형식으로 이루어진 연속된 값 집합의 인터페이스를 제공한다.

    컬렉션은 자료구조이므로, 컬렉션에서는 요소 저장 및 접근 연산이 주를 이룬다. 하지만 스트림은 filter, sorted, map 처럼 표현 계산식이 주를 이룬다. (즉, 컬렉션의 주제는 데이터고, 스트림의 주제는 계산!)

그리고 스트림은 두 가지 중요한 특징을 갖는다.

  1. 파이프라이닝

    대부분의 스트림은 스트림 연산끼리 연결해 커다란 파이프라인을 구성할 수 있도록, 스트림 자신을 반환한다.

  2. 내부 반복

    반복자를 이용해 명시적으로 반복하는 컬렉션과는 달리, 스트림은 내부 반복을 지원한다. (내부 반복에 대한 내용은 아래에서 더 자세히 살펴보자 😉 )

컬렉션과 스트림 비교하기

컬렉션과 스트림은 모두 연속된(=순차적으로 값에 접근하는) 요소 형식의 값을 저장하는 자료구조의 인터페이스를 제공한다. 그렇다면 컬렉션과 스트림은 어떤 차이가 있을까?

컬렉션과 스트림의 차이점

컬렉션과 스트림의 가장 큰 차이는 데이터를 언제 계산하는가이다.

컬렉션의 모든 요소는 컬렉션에 추가하기 전에 계산되어야 한다. 쉽게 말하면 컬렉션에 요소를 추가하거나 삭제할 수 있는데, 이 연산을 수행할 때마다 컬렉션의 모든 요소를 메모리에 저장해야 하고, 컬렉션에 추가하려는 요소는 미리 계산되어야 한다는 뜻이다.

하지만 스트림은 요청할 때만 요소를 계산하는 고정된 자료구조이다. (스트림에 요소를 추가하거나 제거할 수 없다.)

컬렉션은 모든 값을 계산할 때까지 기다려야 하고, 스트림은 필요할 때만 값을 계산한다.



💡  컬렉션과 스트림 예시로 비교하기

컬렉션은 DVD에, 인터넷 스트리밍은 스트림에 비유할 수 있다.

DVD에 어떤 영화가 저장되어 있다고 해보자. DVD에 전체 자료구조가 저장되어 있고, DVD에 저장해 둔 영화를 재생해 시청한다.

그리고, 인터넷 스트리밍으로 같은 영화를 시청한다고 해보자. 스트리밍, 즉 스트림이 등장한 것이다. 스트리밍으로 영화를 재생할 때는 사용자가 시청하는 부분 몇 프레임을 미리 내려받고, 해당 프레임부터 재생할 수 있게 된다. 필요한 부분을 그때그때 내려받아 재생하는 것이다.


내부 반복과 외부 반복

컬렉션과 스트림의 또 다른 차이점은 데이터 반복 처리 방법이다.

컬렉션에서는 for-each 등을 사용해 사용자가 직접 요소를 반복해야 하는데, 이를 외부 반복이라고 한다. 반면 스트림에서는 반복을 알아서 처리하고, 결과 값을 어딘가에 저장해주는 내부 반복을 사용한다.

다음 코드를 살펴보자.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// (1) 내부적으로 숨겨진 반복자를 사용한 외부 반복
List<String> names = new ArrayList<>();
Iterator<String> iterator = menu.iterator();
while (iterator.hasNext()) {
	Dish dish = iterator.next();
	names.add(dish.getName());
}

// (2) for-each를 이용하는 외부 반복
List<String> names = new ArrayList<>();
for (Dish dish : menu) {
	names.add(dish.getName());
}

// (3) 내부 반복 (스트림 이용)
List<String> names = menu.stream()
                        .map(Dish::getName)
                        .collect(toList());

내부 반복을 이용하면 작업을 투명하게 병렬로 처리하거나, 더 최적화된 다양한 순서로 처리할 수 있다.

스트림의 내부 반복은 데이터 표현과 하드웨어를 활용한 병렬성 구현을 자동으로 선택하는 반면, 외부 반복에서는 병렬성을 스스로 관리해야 한다. 다시 말하면, 병렬성을 포기하던지, 아니면 synchronized를 이용해 복잡한 병렬 처리 코드를 직접 짜던지 선택해야 하는 힘든 길을 가야 한다는 것이다 🥹

즉, 내부 반복은 어떻게 요소를 반복시킬 것인지는 신경쓰지 않고, 요소 처리 코드에만 집중할 수 있다는 장점이 있다.

중간 연산과 최종 연산

스트림의 연산은 중간 연산과 최종 연산, 두 가지로 구분된다.

먼저 예제 코드를 살펴보자.

1
2
3
4
5
List<String> names = menu.stream()
                        .filter(dish -> dish.getCalories() > 300)
                        .map(Dish::getName)
                        .limit(3)
                        .collect(toList());

이 코드에서 연산을 두 그룹으로 구분할 수 있다.

  • filter, map, limit은 서로 연결되어 파이프라인을 형성한다.
  • collect로 파이프라인을 실행한 다음 닫는다.

이렇게, 연결할 수 있는 스트림 연산을 중간 연산, 스트림을 닫는 연산을 최종 연산이라고 한다.

중간 연산

filter나 sorted 같은 중간 연산은 스트림을 반환하기 떄문에, 여러 중간 연산을 연결해서 질의를 만들 수 있다.

중간 연산의 가장 중요한 특징은, 최종 연산을 실행하기 전까지는 아무 연산도 수행하지 않는다는 것(lazy, 게으르다) 이다. 중간 연산을 합친 다음에, 합쳐진 중간 연산을 최종 연산으로 한 번에 처리하기 때문이다.

⭐ 최종 연산이 시작되기 전까지 중간 연산은 지연(lazy) 된다. 즉, 최종 연산이 시작되어야 컬렉션의 요소가 처리되고 최종 연산까지 오게 된다!

최종 연산

최종 연산은 스트림 파이프라인에서 결과를 도출한다. 즉, 스트림을 결과로 반환하는 중간 연산과는 달리, 최종 연산에서는 List, Integer, void 등 스트림 이외의 결과가 반환된다.

스트림 활용하기

스트림 API가 지원하는 다양한 연산에 대해 살펴보자.

필터링

스트림의 요소를 선택하는 방법이다.

  • filter

    프레디케이트(boolean을 반환하는 함수) 를 인수로 받아, 해당 프레디케이트와 일치하는 모든 요소를 포함하는 스트림을 반환한다.

    1
    2
    3
    
      List<Dish> vegetarianMenu = menu.stream()
                                      .filter(Dish::isVegetarian)
                                      .collect(toList());
    
  • distinct

    고유 요소로 이루어진 스트림을 반환한다. 즉, 중복 요소를 필터링한다. ( 고유 여부는 해당 객체의 hashCode, equals로 결정된다. )

    1
    2
    3
    4
    
      List<Dish> vegetarianMenu = menu.stream()
                                      .filter(i -> i % 2 == 0)
                                      .distinct()
                                      .collect(toList());
    

슬라이싱

스트림의 요소를 선택하거나 스킵하는 방법이다.

  • takewhile & dropwhile

    프레디케이트를 이용한 슬라이싱 방법으로, 이미 정렬되어 있는 요소를 슬라이스할 때 사용한다. 자바 9부터 지원하는 메소드이다.

    • takewhile : 프레디케이트가 처음으로 거짓이 되는 지점까지 발견된 요소를 수집한다.
      ( 프레디케이트가 거짓이 되면, 그 지점에서 작업을 중단하고 해당 지점까지의 요소를 반환한다. )

      1
      2
      3
      4
      
        // 320 칼로리 이하인 메뉴 찾기
        List<Dish> filteredMenu = specialMenu.stream()
                                            .takewhile(dish -> dish.getCalories() < 320)
                                            .collect(toList());
      
    • dropwhile : 프레디케이트가 처음으로 거짓이 되는 지점까지 발견된 요소를 버린다.
      ( 프레디케이트가 거짓이 되면, 그 지점에서 작업을 중단하고 남은 모든 요소를 반환한다. )

      1
      2
      3
      4
      
        // 320 칼로리를 초과하는 메뉴 찾기
        List<Dish> filteredMenu = specialMenu.stream()
                                            .dropwhile(dish -> dish.getCalories() < 320)
                                            .collect(toList());
      
  • limit

    최대 n개 요소를 갖는 새로운 스트림을 반환한다. 정렬되지 않은 스트림(ex. Set) 에도 사용가능한데, 정렬되어있지 않았다면 limit의 결과도 정렬되지 않은 상태로 반환된다.

    1
    2
    3
    4
    5
    
      // 300 칼로리를 초과하는 메뉴 3개를 선택해 리스트로 만들기
      List<Dish> filteredMenu = specialMenu.stream()
                                          .filter(dish -> dish.getCalories() > 300)
                                          .limit(3)
                                          .collect(toList());
    
  • skip

    처음 n개 요소를 제외한 스트림을 반환한다. n개 이하의 요소를 포함하는 스트림에 skip(n)을 호출하면, 빈 스트림이 반환된다.

    1
    2
    3
    4
    5
    
      // 300 칼로리를 초과하는 메뉴 2개를 건너뛰고, 나머지 메뉴 반환
      List<Dish> filteredMenu = specialMenu.stream()
                                          .filter(dish -> dish.getCalories() > 300)
                                          .skip(2)
                                          .collect(toList());
    

매핑

특정 객체에서 특정 데이터를 선택하는 방법이다.

  • map

    함수를 인수로 받고, 각 요소에 해당 함수를 적용한 결과가 새로운 요소로 매핑된다.

    1
    2
    3
    4
    5
    
      // 각 요리명의 길이를 리스트로 만들어 반환
      List<Dish> dishNames = menu.stream()
                              .map(Dish::getName)
                              .map(String::length)
                              .collect(toList());
    
  • flatMap

    중복된 스트림을 1차원으로 평면화한다. 즉, 2차원 배열의 요소를 하나로 평면화 하는 것이라고 보면 된다.

    1
    2
    3
    4
    5
    6
    
      // 단어 목록에서 각각의 단어를 이루는 알파벳 중 고유한 알파벳 (중복 제거) 리스트 만들어 반환
      List<String> uniqueCharacters = words.stream()
                                          .map(word -> word.split(""))
                                          .flatMap(Arrays::stream)
                                          .distinct()
                                          .collect(toList());
    

검색과 매칭

특정 속성이 데이터 집합에 있는지 여부를 검색하는 데이터 처리 시 사용하는 방법이다.

  • anyMatch

    프레디케이트가 주어진 스트림에서 적어도 한 요소와 일치하는지 확인할 때 사용한다. ( boolean을 반환하기 때문에, 최종 연산에 해당한다. )

    1
    2
    3
    4
    
      // 채식 메뉴가 있는지 확인
      if (menu.stream().anyMatch(Dish::isVegetarian)) {
      	System.out.println("This menu is (somewhat) vegetarian friendly!");
      }
    
  • allMatch

    스트림의 모든 요소가 주어진 프레디케이트와 일치하는지 확인할 때 사용한다. ( boolean을 반환하기 때문에, 최종 연산에 해당한다. )

    1
    2
    3
    
      // 1000 칼로리 미만의 건강식 메뉴인지 확인
      boolean isHealthy = menu.stream()
                              .allMatch(dish -> dish.getCalories() < 1000);
    
  • noneMatch

    주어진 프레디케이트와 일치하는 요소가 없는지 확인할 때 사용한다.

    1
    2
    3
    
      // 1000 칼로리 미만의 건강식 메뉴인지 확인
      boolean isHealthy = menu.stream()
                              .noneMatch(dish -> dish.getCalories() >= 1000);
    
  • findAny

    현재 스트림에서 임의의 요소를 반환한다.

    1
    2
    3
    4
    
      // 채식 메뉴 중 임의의 메뉴 하나를 반환
      Optional<Dish> dish = menu.stream()
                              .filter(Dish::isVegetarian)
                              .findAny();
    
  • findFirst

    논리적인 아이템 순서가 정해져있는 스트림에서 첫 번째 요소를 찾을 때 사용한다.

    1
    2
    3
    4
    5
    6
    
      // 숫자 리스트에서 3으로 나누어 떨어지는 첫 번째 제곱값을 반환
      List<Integer> someNumbers = List.of(1,2,3,4,5);
      Optional<Integer> firstSquareDivisibleByThree = someNumbers.stream()
                                                              .map(n -> n*n)
                                                              .filter(n -> n%3 == 0)
                                                              .findFirst();
    

    💡  findFirst와 findAny 둘 다 필요한 이유?

    왜 findFirst와 findAny 메소드 모두 필요할까? 바로 병렬성 때문이다!

    병렬 실행에서는 첫 번째 요소를 찾기 어렵다. 그래서 요소의 반환 순서가 상관없다면, 병렬 스트림에서는 제약이 적은 findAny를 사용하기 때문이다.



💡  쇼트서킷 기법

표현식에서 하나라도 거짓이라는 결과가 나오면, 나머지 표현식의 결과와 상관 없이 전체 결과도 거짓이 된다. 이러한 상황을 쇼트서킷이라고 부른다.

원하는 요소를 찾았을 때, 더이상의 연산을 하지 않고 즉시 결과를 반환할 수 있는 경우이다.

allMatch, noneMatch, anyMatch, findFirst, findAny 연산은 모든 스트림의 요소를 처리하지 않고도 결과를 반환할 수 있다. 그래서 이 메소드들은 모두 쇼트서킷 연산이다.

limit 메소드 또한 스트림의 모든 요소를 처리할 필요 없이, 주어진 크기의 스트림을 생성하기 때문에 쇼트서킷 연산이다.


리듀싱

모든 스트림 요소를 처리해서 값으로 도출하는 연산이다.

  • reduce

    모든 스트림 요소를 반복하며 조합해 값을 도출할 때 사용한다. ( 앞의 연산 결과에 해당 요소를 대입해 연쇄적으로 계산한다! )

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    
      int sum = numbers.stream()
      							.reduce(0, (a,b) -> a+b);
        
      // 메소드 참조를 이용한 경우
      int sum = numbers.stream()
      							.reduce(0, Integer::sum);
        							
      // 초기값을 파라미터로 넣지 않는 경우
      Optional<Integer> sum = numbers.stream()
      							.reduce(Integer::sum);
    

    💡  reduce 메소드와 병렬화

    위 예시에서 reduce를 이용해 합계를 구하는 방법 대신, 일반적인 반복을 통해 합계를 구하는 방법을 이용하면 되지 않을까? 왜 reduce 메소드를 이용해야 할까? 🤔

    일반적인 반복을 통한 방법은 sum 변수를 공유해야 하기 때문에 쉽게 병렬화하기 어렵다.

    reduce를 이용하면 내부 반복이 추상화되면서, 내부 구현에서 병렬로 reduce를 실행할 수 있게 된다. 그래서 병렬로 실행되어야 하는 경우, reduce를 사용하는 것이 더 좋다 🙂


Atomic 객체 활용하기

java.concurrent.atomic 패키지는 동시성 프로그래밍에서 원자적으로(atomic) 변수의 값을 갱신하는 클래스를 제공한다. 쉽게 말하면, 서로 다른 스레드가 변수 값을 동시에 변경하려고 할 때 발생할 수 있는 race condition을 방지할 수 있다는 의미이다.

병렬 스트림에서 AtomicInteger를 이용하는 예시 코드를 살펴보자.

AtomicInteger에 초기 값을 선언하고, 병렬 스트림의 forEach에서 addAndGet 메소드를 통해 합계를 저장한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class StreamAtomicExample {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        // AtomicInteger 사용
        AtomicInteger sum = new AtomicInteger(0);

        // 병렬 스트림을 사용하여 원자적으로 합계를 계산
        numbers.parallelStream()
            .forEach(number -> sum.addAndGet(number));

        System.out.println("Sum using AtomicInteger and parallel stream: " + sum.get());
    }
}

스트림에서 Atomic 객체를 사용하면 다음과 같은 장점이 있다.

  1. 동기화 문제 해결

    병렬 스트림을 사용할 때 여러 스레드가 동시에 데이터를 수정하려고 할 때 발생하는 race condition 문제를 해결한다.

  2. 성능 최적화

    Atomic 객체는 내부적으로 CAS(Compare-And-Swap) 알고리즘을 사용해, 동기화 블록이나 락을 사용하지 않고 안전하게 값을 갱신할 수 있다. 이로 인해 성능 저하를 최소화할 수 있다.


    💡 CAS(Compare-And-Swap) 알고리즘

    변수의 값을 비교한 후, 예상하는 값이면 새로운 값으로 교체하는 원자적 연산을 수행한다. 기존 값과 예상 값이 일치할 때만 변수의 값을 업데이트하기 때문에, 데이터의 일관성을 보장할 수 있다.

    CAS 알고리즘은 락을 사용하지 않기 때문에, 데드락과 같은 문제를 방지하면서도 높은 성능을 유지할 수 있다.


참고

This post is licensed under CC BY 4.0 by the author.

동작 파라미터화와 람다식

함수형 프로그래밍 (Functional Programming)