Java/이론..

자바 스트림 API

후루룩짭짭 2014. 10. 8. 17:05

이번에는 Java SE 8에서 추가된 아주 좋은 SteamAPI에 대해서 알아 보도록 하겠습니다.


오역이 있을 수도 있으니 원본 문서도 한번 보는것을 추천 드립니다. 

원본 글  : http://www.oracle.com/technetwork/articles/java/ma14-java-se-8-streams-2177646.html


오타 및 오역 지적해주시면 감사하겠습니다. 

============================================================================================


복잡한 데이터 처리 질의를 표현하기 위해 Stream을 사용!.


대부분의 자바 애플리케이션들이 데이터를 만들고 처리하기 위해서 컬렉션을 사용한다. 이 컬렉션을 처리하는 로직 또한 같이 구현되어 코드에 포함되어 있고 이는 많은 프로그램의 기본(데이터를 그룹핑 하고 제어 하는 등)이 된다. 

예를 들어 은행에서 고객들의 입출력 내역을 가지고 있는 컬렉션을 생성하고 이중에서 고객이 사용한 내역들에 대해 추출하는 로직을 작성할 수 있을 것이다. 이런 은행 관련 중요한 기능들을 컬렉션을 통해 작정할 수 있지만 코드가 그다지 아름답지 않고 지저분하게 보여진다. 


컬렉션을 사용하는데는 두가지 단점이 있다.


1. 컬렉션에서 데이터를 처리하는 전형적인 처리 패턴은 SQL에서 데이터를 찾는것과 비슷하거나(처리 내역 중 가장 높은 값을 찾기) 데이터를 묶는것(구매 내역중에 식료품에서 쇼핑한 내역만 찾기)과 비슷하다. 대부분의 데이터베이스들은 이를 명확하게 지원하는데 만약 가장 높은 값을 가지는 것을 찾고 싶으면 "SELECT ID, MAX(VALUE) FROM TRANSACTIONS"를 사용 하면 된다.


위의 예에서 보는것 처럼 최대값을 어떻게 계산해야 하는지에 대해서 직접 구현할 필요가 없이(반복문을 사용해서 가장 높은 값을 찾는 것) 단지 원하는 값을 표현해 주기만 하면 된다. 컬렉션도 데이터베이스처럼 데이터의 집합인데 왜 비슷한 방법으로 처리할 생각을 못하고 이와 비슷한 구현을 하기 위해서 얼마나 많은 시간을 소모해서 복잡한 반복문을 작성하고 또 작성했는지 생각해보자.


2. 만약 큰 사이즈의 컬렉션은 어떻게 처리 해야 할까? 처리 속도를 향상하기 위해서는 병렬 처리를 하도록 코드를 작성해야 되는데 이는 기술적으로도 어렵고 많은 에러가 발생한다.


이러한 단점들은 Java SE 8이 출시되면서 해결할 수 있게 되었다. Stream 이라는 새로운 API가 공개되었고 이를 통해 쿼리를 작성하듯 데이터를 명시적인 방법으로 처리할 수 있게 되었다. 게다가 Stream은 멀티 스레드 관련 코드도 별도로 작성할 필요 없이 멀티코어를 지원할 수 있게 되었다. 좋아 보이지 않는가? 이 문서에서는 이런 내용들에 대하여 살펴볼 것이다.


Stream을 가지고 무엇을 할 수 있는지 알아보기 전에 Java SE 8의 Stream 사용법에 대해 간단히 살펴보자. 식료품점에서 쇼핑한 모든 거래내역중에 거래 ID를  가장 큰 비용이 들어간 순으로 정렬해서 추출하는 것에 대한 코드를 작성해보자.


Listing 1. Java SE 7 이하에서의 처리 방법

List groceryTransactions = new ArrayList();
for (Transaction t : groceryTransactions) {
    if (t.getType() == Transaction.GROCERY) {
        groceryTransactions.add(t);
    }
}
Collections.sort(groceryTransactions, new Comparator() {
    public int compare(Transaction t1, Transaction t2) {
        return t2.getValue().compareTo(t1.getValue());
    }
});

List transactionIds = new ArrayList();
for (Transaction t : groceryTransactions) {
    transactionIds.add(t.getId());
}


Listing 2. Java SE 8 에서의 처리 방법

List transactionsIds = 
    transactions.stream()
        .filter(t -> t.getType() == Transaction.GROCERY)
        .sorted(Comparator.comparing(Transaction::getValue).reversed())
        .map(Transaction::getId)
        .collect(Collectors.toList());


아래 그림 Fiqure 1은 Java SE 8에서 Stream이 동작하는 과정을 그린것이다.

먼저 모든 거래내역(List)로 부터 stream()메서드를 사용해서 stream을 가져오고 그 다음에 여러가지 기능(filtersortedmap,collect)를 체인 처럼 엮어서 데이터를 처리하는 쿼리처럼 보이게 만든다.


Figure 1

병렬 처리 코드 작성은 Java SE 8에서 매우 쉽게할 수 있다. Listing 3. 처럼 그냥 stream()을 parallelStream()으로 변경만 하면 된다. Stream API는 내부적으로 멀티코어로 동작하도록 처리한다.


Listing 3. 

List transactionsIds = 
    transactions.parallelStream()
        .filter(t -> t.getType() == Transaction.GROCERY)
        .sorted(Comparator.comparing(Transaction::getValue).reversed())
        .map(Transaction::getId)
        .collect(Collectors.toList());


처음보는 형태의 코드를 보고 걱정할 필요는 없다. 다음 섹션에서 이것들이 어떻게 동작하는지 살펴볼 것이다. 하지만 람다 표현식(t-> t.gerCategory() == Transaction.GROCERY)의 사용방법이나 메소드 참조(method references, Transaction::getId)에 대해서는 미리 한번 보는것이 좋다.


지금 부터 컬렉션에 저장된 데이터들을 SQL이 동작하는 것 처럼 스트림을 활용해서 처리 하는것에 대해서 살펴볼 것이다. 이 모든 동작들은 람다 표현식과 함께 간단한 파라미터로 처리 될 수 있다. 


Java SE 8의 Stream에 대한 이 문서를 다 읽어보면 Stream API를 사용하여 위 예제와 같이 멋진 방식으로 사용할 수 있을 것이다.


Stream 시작 하기

작은 부분부터 시작하도록 해보자. Stream의 정의는 무엇일까? 간단하게 정의해 보면 "집계 연산을 지원하는 요소의 순서(a sequence of elements from a source that supports aggregate operations.)"라고도 할수 있는데 이에 대해 더 알아 보도록 하자.

  • Sequence of element : Stream은 정의된 엘리먼트의 속성에 따라서 처리할 수 있는 인터페이스를 제공하지만 실제 엘리먼트들을 저장하지 않고 계산하는데만 쓰인다.
  • Source : 스트림은 컬렉션, 배열, I/O 리소스 등에서 제공받은 데이터를 가지고 작업을 처리 한다.
  • Aggreate operations : Stream은 SQL 같은 처리를 지원하고 함수형 프로그래밍 같은 처리 방법도 지원한다. (filtermap,reducefindmatch, sorted 등)
게다가 Stream은 기존의 컬렉션 처리 방법과 구분되게 하는 기본적인 두가지 특징이 있다.
  • Pipelining : 많은 Stream 기능들이 Stream 자기 자신을 리턴한다. 이 방식은 처리 작업이 체인처럼 연결되어 큰 파이프라인처럼 동작 하도록 한다. 이를 통해 laziness와 short-circuiting 과 같이 최적화 작업을 할 수 있다.
  • Internal iteration : 명시적으로 반복작업을 수행해야 되는 Collection과 비교 하면 Stream 작업은 내부에서 처리된다.
아래 그림(Figure 2) 은 Listing 2에 작성되어 있는 코드 상세한 그림으로 설명한 것이다.
Figure 2

처음 접한 Stream은 list에서 거래내역에 대한 목록을 stream() 메서드를 사용해서 Stream 으로 가져온 것이다. 그 다음은 몇가지 집계 관련된 작업을 진행하는데, filter(주어진 속성에 맞게 필터링), sorted(주어진 compator를 자기고 정렬), map(정보를 추출) 메서드를 사용해서 집계 관련된 처리를 수행한다. collect()를 제외한 모든 메서드들은 stream을 리턴하기 때문에 이들을 서로 연결해서 파이프라인형태로 처리할 수 있다.

이 작업들은 collect()가 호출되기전까지 실제로 실행되지 않는다. collect()는 파이프라인에 연결된 집계 작업을 시작하고 완료되면 결과를 리턴한다(Stream 형태가 아닌, 여기서는 list). collect()에 대해서는 나중에 좀 더 자세히 살펴볼테니 지금 당장 걱정할 필요는 없다. collect() 메서드를 보면 인자값을 가지고 있는것을 볼 수 있는데 이 파라미터를 가지고 리턴 값을 결정 한다. toLIst()는 Stream을 List로 변경하여 리턴해준다.

Stream에서 사용가능한 다른 메서드들에 대해 알아보기 전에 Stream과 Collection간의 개념적으로 다른점을 확인해 보도록 하자.

Stream VS Collections
예전부터 사용했던 Collection 과 이번에 새로 추가된 Stream은 엘리먼트들을 순차적으로 처리하는것과 관련된 인터페이스를 제공한다. 차이점을 간단히 말하면 Collection은 데이터와 관련된 것이고 Stream은 데이터를 계산하는것과 관련된 내용이다.

DVD에 저장된 영화를 예를 들어 생각해보자. 이것들은 데이터들로 구성되어 있기 때문에 Collection으로 볼 수 있다(바이트나 프레임으로 구성된 집합) 같은 영화를 인터넷 스트리밍을 통해서 본다고 가정해보자. 효율적으로 사용자들에게 비디오를 보여주기 위해 사용자가 보고 있는 부분들에 대해 미리 다운로드를 하는게 효율적이다. 이렇게 다운로드를 할 범위를 결정하는 것을 Stream으로 볼 수 있다.

간단하게 말하면 Collection과 Stream의 차이점은 무언가를 계산할때 라고 볼 수 있다 Collection은 메모리에 저장되는 데이터 구조고 Stream은 이를 계산해서 새로운 데이터를 만들어 낼 수 있다.

Collection 인터페이스를 사용하기 위해서는 사용자에 의해 정의된 반복문이 필요한데(for loop, foreach 등) 이를 외부 반복이라고 부른다.

이와 다르게 Stream은 내부 반복을 사용한다(내부적으로 반복작업을 수행하고 결과값을 어딘가에 저장해둔다) 사용자는 결과값을 가지고 무엇을 할 것인지만 정의하면 된다. 아래 코드는 두가지 차이점을 보여 준다.

Listing 4. 컬렉션에서 외부 반복작업

List<String> transactionIds = new ArrayList<>(); 
for(Transaction t: transactions){
    transactionIds.add(t.getId()); 
}


Listing 5. Stream의 내부 반복작업

List<Integer> transactionIds = 
    transactions.stream()
        .map(Transaction::getId)
        .collect(toList());


Listing 4는 명시적으로 반복문을 수행하고 거래 ID를 추출하여 리스트에 추가 하고 있다. 이와 다르게 Stream은 명시적인 반복문이 없다. Listing 5처럼 질의문의 만드는데 map은 거래 ID를 추출하고 collect는 Stream을 list로 변환해서 리턴해준다.

이제 Stream이 무엇이고 이를 가지고 할 수 있는게 무엇인지 감이 슬슬 올 것이다. Stream이 제공하는 다른 기능들에 대해서 살펴보고 실제 서비스에서 사용될만한 데이터 처리 쿼리를 작성해보자.

Stream Operations : Exploiting Streams to Process Data
Stream 인터페이스는 java.util.stream.Stream에 정의되어 있고 두가지 종류로 분류할 수 있는 많은 기능을 제공한다. Figure 1에서 봤던것 처럼 아래와 같은 기능들을 볼 수 있다.
  • filtersortedmap은 파이프라인처럼 서로 연결시킬 수 있다.
  • collect는 파이프라인을 종료 시키고 결과를 리턴한다.
Stream 처리는 중간 작업(intermediate operations)라고 불리는 것을 통해 서로 연결 될 수 있다. 이들의 리턴타입은 stream이기 때문에 서로 연결될 수 있다. 이 작업은 종료 작업(terminal operations)를 통해 작업을 종료할 수 있다. 처리된 결과를 ListInteger나 void 형태로 받을 수 있다.

이를 구분해서 처리하는 것이 왜 중요한지 의문이 생길수도 있다. 중간 작업들을 Stream 파이프라인에서 종료 작업이 호출되기 전에는 절대 수행되지 않는다. 이것을 Lazy라고 한다. 
중간 작업들은 보통 병합(merged)를 통해 데이터를 처리하고 종료 작업에 이를 전달해 준다.

Listing 6.
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
List<Integer> twoEvenSquares = 
    numbers.stream()
           .filter(n -> {
                    System.out.println("filtering " + n); 
                    return n % 2 == 0;
                  })
           .map(n -> {
                    System.out.println("mapping " + n);
                    return n * n;
                  })
           .limit(2)
           .collect(toList());

Listing 6의 코드는 주어진 숫자 목록에서 나머지가 0인 값들을 계산하는 코드이다.
아래 출력된 내용을 보면 예상과 다른 것을 볼 수 있다.

filtering 1
filtering 2
mapping 2
filtering 3
filtering 4
mapping 4

이와같은 내용으로 출력되는 원인은 limit(2)를 범위를 제한 했기 때문이다. 가끔 Stream에서 전체가 아닌 특정 부분에 대한 처리를 해야될 필요가 있다. 

지금까지 배운 Stream을 사용하는 일반적인 내용을 요약하면 3가지로 요약할 수 있다.
  • 질의를 할 데이터소스(Collection)같은게 필요하다
  • Stream 파이프라인을 형성하는 중간 작업
  • Stream 파이프라인을 실행하고 결과를 리턴하는 종료 작업
이제 Stream에서 사용할 수 있는 기능들을 살펴보자. 이에 대한 전체 목록은 java.util.stream 패키지를 참조 하라.

Filtering : Stream에는 필터 처리를 제공하는 여러가지 메서드들이 있다.
  • filter(Predicated) : 주어진 predicate(java.util.function.Predicate)와 일치하는 stream을 리턴한다.
  • distinct : 중복을 제거한 유니크 엘리먼트를 리턴한다.(stream에 포함된 엘리먼트들의 equals()구현에 따라 구분된다.
  • limit(n) : 주어진 사이즈(n)에 까지의 stream을 리턴한다.
  • skip(n) : 주어진 엘리먼트 길이 까지 제외한 stream을 리턴한다.
Finding and matching : 가장 일반적인 데이터 처리 패턴은 주어진 조건에 일치하는 것을 찾아 내는 것이다. anyMatch,allMatchnoneMatch와 같은 기능을 사용하여 필요한 것을 찾을 수 있다. 이들을 전부 predicate를 인수로 받아서 결과를 boolean형태로 리턴한다. 에를 들어 Stream에 포함된 엘리먼트들이 모두 100보다 큰지 확인하고 싶으면 아래 코드처럼 allMatch를 사용하면 된다.

Listing 7.
boolean expensive =
    transactions.stream()
        .allMatch(t -> t.getValue() > 100);

Stream은 임의의 요소를 찾는 findFirstfindAny 기능도 제공한다. 이 기능은 필터와 같이 Stream을 조작하는데 사용될 수 있다.FindFirstFindAny 값은 Optional 객체를 리턴한다.

Listing 8.
Optional<Transaction> = 
    transactions.stream()
        .filter(t -> t.getType() == Transaction.GROCERY)
        .findAny();

Optional<T> 클래스(java.util.Optional)는 값이 존재 하거나 존재하지 않을 경우에 사용할 수 있는 컨데이터다. Listing 8의 크도를 보면 Transaction 타입이 GROCERY가 아닌것이 있을 수도 있다. Optional 클래스는 엘리먼트들이 존재하는지 확인하는 여러 메서드를 가지고 있다. 예를 들어 만약 엘리먼트가 존재 한다면 ifPresent 메서드를 사용해서 특정 작업을 수행할 수 있다.

Listing 9.
transactions.stream()
    .filter(t -> t.getType() == Transaction.GROCERY)
    .findAny()
    .ifPresent(System.out::println);

Mapping : map 기능을 사용해서 인자(java.util.function.Function)로 전달받은 값들을 가지고 새로운 stream을 만들 수 있다. 

예를 들어 Stream에 포함된 엘리먼트들에 대한 특정 정보만 필요할 수도 있다. Listin 10 처럼 List에 저장된 단어의 길이들을 가지고 있는 새로운 List를 생성할 수 있다. 

Listing 10.
List<String> words = Arrays.asList("Oracle", "Java", "Magazine");
List<Integer> wordLengths = words.stream()
    .map(String::length)
    .collect(toList());

Recuding : 지금까지 본 stream의 종료작업은 boolean(allMatch 등), void(forEach), optional(findAny 등) 오브젝트들을 결과로 리턴했고 collect를 사용해서 List형태로도 리턴을 할 수 있었다.

이외에도 "가장 큰 ID를 가진 거래내역은 무엇인가", "모든 거래 내역의 합계" 처럼 Stream의 엘리먼트들에 복잡한 형태의 공식같은걸 적용하는 처리 쿼르를 만들수도 있다. 이 기능은 Stream의 reduce를 통해서 사용한데 이는 결과가 생성될때 까지 반복적으로 각 요소에 대한 (두 숫자를 추가 한다든지)동작을 허용한다.
이것을 함수형 프로그래밍에서는 고차 함수(fold operation)라고 부른다.

코드들을 보면 좀 더 쉽게 이해할 수 있을 것이다.
for 반복문을 사용해서 덧셈을 할때는 아래와 같은 코드로 작성된다.
int sum = 0;
for (int x : numbers) {
    sum += x; 
}

각각 엘리먼트드을 순회하면서 값들을 더하도록 되어 있다. 이 코드에는 두개의 파라미터가 있다고 볼 수 있는데 sum( 여기서는 0)은 값을 표현하는 것이고, 각 엘리먼트들을 더하귀 위한 것(여기서는 +)가 있다.

Stream에서 reduce 메서드를 사용한다면 엘리먼트들의 합은 Listing 11과 같은 코드로 표현된다.
reduce메서드는 두개의 인자값을 가진다. 
Listing 11.
int sum = numbers.stream().reduce(0, (a, b) -> a + b);
  • 초기 화 값, 0
  • BinaryOperator<T>, 두개의 엘리먼트들을 더해서 새로운 값을 생성
reduce는 반복 적용 패턴을 추상화 해준다. 모든 값을 곱한다던지 최대값을 구하는 것들은 아래처럼 표현될 수 있다.
Listing12.
int product = numbers.stream().reduce(1, (a, b) -> a * b);
int product = numbers.stream().reduce(1, Integer::max);

Numeric Streams 
reduce 메서드를 사용해서 Stream에 있는 숫자형 값들에 대한 합계를 구할 수 있다. 하지만 이를 계산하는데 오브젝트를 Integer 형태로 Boxing 처리를 해야 하기 때문에 어느 정도 Cost가 든다. 아래 코드 처럼 명시적으로 sum() 메서드를 호출하는 것이 좀 더 좋아 보인다.

Listing 13.
int statement = 
    transactions.stream()
        .map(Transaction::getValue)
        .sum(); // error since Stream has no sum method

위와 같은 기능을 제공하기 위해 Stream에서는 3가지 기본형태의 Stream 인터페이스를 제공한다. IntStreamDoubleStream,LongStream 이 Stream들은 intdoublelong 형태의 Stream을 처리 하도록 되어 있다.

가장 빈번히 사용될 메서드는 아마 mapToIntmapToDoublemapToLong일 것이다 이 3가지 메서드들을 이전에 보았던 map과 동일한 동작을 하지만 이것의 리턴값은 Stream<T> 형태의 값을 준다. 예를 들어 Listing 13의 코드는 Listing 14처럼 수정할 수 있다. 

Listing 14.
int statementSum = 
    transactions.stream()
        .mapToInt(Transaction::getValue)
        .sum(); // works!

마지막으로 숫자형 Stream의 다른 활용 방법은 숫자 범위를 지정할 수 있는데 있다. 예를 들어 1에서 100사이의 값을 생성하려고 할때 range와 rangeClosed 메서드를 IntStreamDoubleSteamLongStream에서 사용할 수 있다.

이 두 메서드는 첫번째 인자와 마지막 인자사이의 값을 가져온다. range는 마지막 값을 제외하고 계산하고 rangeClosed는 이를 포함해서 계산한다. Listing 15에서 rangeClosed를 사용해서 10과 30사의 홀수를 가져오는 것을 볼 수 있다.

Listing 15.
IntStream oddNumbers = 
    IntStream.rangeClosed(10, 30)
        .filter(n -> n % 2 == 1);

Building Streams
Stream을 생성하는 몇가지 방법들이 있는데 지금까지는 컬렉션을 통해서 Stream을 생성했었고, 숫자를 사용해서 만들기도 했다. Stream은 값 목록이나 배열, 파일과 같은곳에서도 생성할 수 있으며 무한 Stream을 생성하기 위해 함수를 사용할 수도 있다.

값 목록이나 배열에서 Stream을 생성하는 것은 어렵지 않다. 스태틱 메서드인 Stream.of를 사용하거나 Arrays.Stream을 사용하면 된다.

Listing 16.
Stream<Integer> numbersFromValues = Stream.of(1, 2, 3, 4);
int[] numbers = {1, 2, 3, 4};
IntStream numbersFromArray = Arrays.stream(numbers);

파일에서 Stream을 생성할 경우 Files.Lines를 사용하면 되고, 아래 코드는 파일에서 라인수를 카운팅한다.

Listing 17.
long numberOfLines = 
    Files.lines(Paths.get(“yourFile.txt”), Charset.defaultCharset())
    .count();

Infinite streams : 지금까지 Stream이 무엇이고 어떻게 사용하는지에 대해서 이래했을 것이다. 함수형 프로그래밍에서 Stream을 생성하는 방법은 Stream.iterate와 Stream.generate가 있다. 엘리먼트들이 필요한 시점에 계산되기 때문에 이 두 함수는 지속적으로 엘리먼트들을 가져 올 수 있다. 이것이 바로 무한 Stream이다(고정된 사이즈의 Stream이 아닌)

Listing 18은 iterate를 사용해서 모든 숫자에 10을 더하는 기능이다. iterate메서드는 초기값 0부터 시작해서 계속 새로운 값을 생성한다. 

Listing 18.
Stream<Integer> numbers = Stream.iterate(0, n -> n + 10);

limit()을 사용해서 무한 Stream에서 고정된 크기의 Stream을 생성할 수 있다. Listing 19는 Stream의 사이즈를 5로 고정한 것을 보여주는 코드이다.

Listing 19.
numbers.limit(5).forEach(System.out::println); // 0, 10, 20, 30, 40

결론.
Java SE 8에서는 빠르고 정교한 데이터 처리를 위해 Stream API를 공개했다. 이 문서에서는 Stream이 제공하는 많은 기능들(filter,mapreduceiterate)을 살펴 봤고 이를 활용해서 간결한 코드로 정교한 데이터 처리 작업을 할 수 있었다. 이 방법은 Java SE 8 이전에 Collection 데이터들을 처리하는 방법과 매우 다르지만 많은 장점이 있다.
  1. Steream API는 몇가지 기능들을 활요해서 laziness 및 short-circuiting을 통해 데이터 처리를 최적화 할 수 있다.
  2. Stream은 병렬 처리를 손쉽게 지원한다.
다음 문서에서는 Stream의 filterMap이나 collect 같은 기능에 대하여 좀 더 알아볼 것이다.