Flutter 개발 상자

[Flutter Stream②] Stream 기본 사용법 본문

Flutter

[Flutter Stream②] Stream 기본 사용법

망고상자 2023. 11. 12. 00:11
728x90

간단한 Stream 만들어보기

 

일단 RxDart는 다루지 않을 예정이다. ①에서 RxDart를 언급하긴했지만 기본적으로 Stream만으로도 Flutter로 앱을 만드는데 크게 불편함을 못느끼기 때문에 (Stream도 그렇게 자주 쓰이지가 않는다) 기본적인 Stream을 Deep하게 공부하고 넘어가는게 더 도움이 될것같다.

 

async* 와 yield

async*는 async와 마찬가지로 비동기 함수를 만들기 위한 키워드이다.

async가 단일값을 반환하는것과 달리 async*는 여러값 혹은 Stream을 반환할 수 있다.

yield는 return과 마찬가지로 값을 반환하지만 함수의 상태를 유지한다. 

 

여러 방법으로 Stream 만들어보기

Stream<int> countStream(int to) async* {
  for (int i = 1; i <= to; i++) {
    yield i;
  }
}

void main() async {
  // 함수로 스트링 생성
  countStream(10);
  // 10의 값을 가지는 스트림
  Stream.value(10);
  // 배열값을 가지는 스트림
  Stream.fromIterable([1, 2, 3, 4, 5]);
  // Future 배열을 가지는 스트림
  Stream.fromFutures([
    Future.delayed(const Duration(milliseconds: 500), () => print('첫번째 요소')),
    Future.delayed(const Duration(milliseconds: 500), () => print('두번째 요소')),
    Future.delayed(const Duration(milliseconds: 500), () => print('세번째 요소')),
  ]);
  // 1초마다 숫자 반환
  Stream.periodic(const Duration(seconds: 1), (computationCount) {
    print('count : $computationCount');
  },);
}

 

async*와 yield의 조합으로 Stream을 만드는게 꽤 중요할것같다.

아무래도 현업에서는 커스텀된 Stream을 쓸일이 많을것같으니...

 

1초마다 카운트를 방출하는 Stream의 값을 수신하기

// 입력받은 숫자에 도달할때까지 1초마다 숫자를 하나씩 방출한다.
Stream<int> countStream(int to) async* {
  for (int i = 1; i <= to; i++) {
    await Future.delayed(const Duration(seconds: 1));
    yield i;
  }
}

void main() async {
  final Stream<int> stream = countStream(10);

  // 실행결과
  // count : 1, 2, 3 ... 10, end
  await for (int count in stream) {
    print('count : $count');
  }
  print('end');

  // 실행결과
  // count : 1, 2, 3 ... 10, end
  await stream.forEach(print);

  // 실행결과
  // end, count : 1, 2, 3, 4....
  stream.listen((count) {
    print('count : $count');
  });
  print('end');
}

 

await for 라는 키워드가 눈에 띈다.

Stream을 Future의 배열이라고 생각했을때 forEach문에 await를 걸어 Future의 값을 벗겨주는 개념으로 이해하면 좋을것같다.

조금 더 간소화해서 stream.forEach 라는 문법으로도 사용 가능하다.

 

listen 키워드는 Future의 then과 비슷해보인다. 코드의 흐름을 Blocking 하지 않고 Stream을 방출한다. 다만 then은 await를 사용할 수 있지만 listen은 await가 먹히지 않는다.

 

에러처리

 

// 입력받은 숫자에 도달할때까지 1초마다 숫자를 하나씩 방출한다.
Stream<int> countStream(int to) async* {
  for (int i = 1; i <= to; i++) {
    if (i == 4) {
      throw Exception('Intentional exception');
    } else {
      yield i;
    }
  }
}

void main() async {
  final Stream<int> stream = countStream(10);

  // 출력결과
  // 1, 2, 3, error : Exception: Intentional exception
  try {
    await for (int count in stream) {
      print('count : $count');
    }
  } catch (error, stackTrace) {
    print('error : ${error.toString()}');
  }

  stream.listen(
    (count) {
      print('count : $count');
    },
    onError: (error, stackTrace) {
      print('error : ${error.toString()}');
    },
  );
}

 

try catch 블록의 위치에 주목해볼만하다.

await를 통해서 for문을 돌면서 값을 수신하고 있기 때문에 for문 밖에서 에러를 잡아주어야 한다.

listen은 onError 라는 파라미터를 통해서 에러를 핸들링할 수 있다.

 

Stream 관련 메서드

 

Stream을 처리하는 메서드들

Stream<int> countStream(int to) async* {
  for (int i = 1; i <= to; i++) {
    yield i;
  }
}

void main() async {
  final Stream<int> stream = countStream(10);
  StreamController<int> controller = StreamController();

  print(await stream.toSet()); // {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
  print(await stream.toList()); // [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
  print(await stream.first); // 1
  // firstWhere 와 유사하지만 일치하는 요소가 2개 이상 발생하면 오류가 발생한다.
  print(await stream.singleWhere((element) => element == 2)); // 1
  print(await stream.firstWhere((element) => element == 2)); // 2
  print(await stream.lastWhere((element) => element == 2)); // 2
  print(await stream.join(' ')); // 1 2 3 4 5 6 7 8 9 10
  await stream.forEach((element) { print('$element'); }); // 12345678910
  print(await stream.reduce((previous, element) => previous + element)); // 55
  print(await stream.fold<int>(10, (previous, element) => previous + element)); // 65
  print(await stream.elementAt(2)); // 3
  print(await stream.first); // 1
  print(await stream.last); // 10
  print(await stream.isEmpty); // true
  print(await stream.contains(5)); // true
  // 스트림이 단일 요소일경우 값 반환, 아닐경우 에러 반환
  print(await stream.single); // 에러
  
  // 스트림에서 제공하는 요소를 이용하여 수식을 만들고, 이를 모든 요소가 만족하는지 확인
  print(await stream.every((element) {
    return element >= 5; // false
  }));
  // 스트림에서 제공하는 요소를 이용하여 수식을 만들고, 이를 만족시키는 요소가 있는지 확인
  print(await stream.any((element) {
    return element == 5; // true
  },));
  
  /// stream 고유 API
  // controller.addStream(stream)와 작업이 비슷하다. pipe은 요소를 모두 방출한 후 controller.close()를 호출해준다.
  stream.pipe(controller);
  
  // 스트림의 모든 데이터를 삭제하고 입력한 값을 반환시킴.
  print(await stream.drain()); // null
  print(await stream.drain(100)); // 100
}

 

링크 : https://dart.dev/tutorials/language/streams#process-stream-methods

 

Asynchronous programming: Streams

Learn how to consume single-subscriber and broadcast streams.

dart.dev

튜토리얼 내용에 따르면 drain()과 pipe()를 제외하면 Iterable 함수와 동일한 API들이라고 한다.

그래서 그런지 익숙한 함수들, 굳이 실행해보지 않아도 결과를 알것같은 함수들이 꽤 보인다.

근데 single이나 any, every 같은 함수는 생소해서 이번에 처음 사용법을 알게 되었다.

 

pipe은 나중에 또 공부하게될 StreamController에 Stream을 추가할때 쓰는 메서드로 보이는데 Controller.close() 작업이 더 붙어있다. 어떤때 pipe을 써야하는지는 아직 감이 잘 안온다.

drain()은 스트림의 모든 데이터를 Discards 하고 입력한 값을 반환시킨다고는 하는데... 이것 역시 언제 써야하지? 라는 의문이 남는다.

 

Stream을 수정하는 메서드들

 

import 'dart:async';

Stream<int> countStream(int to) async* {
  for (int i = 1; i <= to; i++) {
    yield i;
  }
}

void main() async {
  Stream<int> stream = countStream(10);

  // 입력한 조건을 만족하는 스트림을 반환
  final newStream = stream.where((event) => event.isEven);
  // 조건을 충족시키지 못할때까지 요소를 집어넣은 스트림을 반환. skipWhile 과 반대되는 개념
  final newStream = stream.takeWhile((element) => element.isOdd);
  // 입력한 숫자만큼의 요소 길이를 가지는 스트림을 반환
  final newStream = stream.take(3);
  // 조건을 충족시키지 못할때까지 skip 하는 스트림을 반환
  final newStream = stream.skipWhile((element) => element < 5);
  // 입력된 숫자만큼 앞의 요소들을 제거한 스트림을 반환
  final newStream = stream.skip(3);
  // 스트림을 변형하여 새로운 스트림을 만든다.
  final newStream = stream.map<bool>((event) => event.isEven);
  // 마지막 요소에 11, 12가 추가된 스트림을 반환
  final newStream = stream.expand((element) => [element + 1, element + 2]);
  // 해당 제네릭 타입으로 캐스팅한 스트림을 반환
  final newStream = stream.cast<int>();

  await for(final count in newStream) {
    print('$count');
  }
}

 

역시 Iterable과 유사한 함수들이다.

원래 Stream에는 영향을 주지 않고 모두 새로운 Stream을 반환한다.

근데 보통 map이나 where 말고는 잘 사용을 안하기 때문에 생소한게 많았다.

expand는 캐스케이드 문법으로 대체될 것 같고, cast는 쓰기 까다로워보여서 map이 더 어울린다.

while류 문법 역시 어디서 써야할지... 굳이 몰라도 될것같다. 이번에 한번 맛본걸로 만족해야지

 

distinct, asyncMap, asyncExpand

void main() async {
  Stream<int> stream = Stream.fromIterable([2, 6, 6, 8, 12, 8, 8, 2]);

  // 스트림에서 연속된 데이터 이벤트가 두번 이상 나타나지 않도록 필터링 해준다.
  // 다만 이전값과 다음값만 비교하기 때문에 멀리 떨어져 있는 중복값은 필터링 되지 않는다.
  // 파라미터로 정렬 함수를 제공할 수 있다. 이는 List를 sort할때 함수를 제공하는것과 동일하다.
  // 아래의 출력은 2, 6, 8, 12, 8, 2
  final newStream = stream.distinct();
  // 기본적으로 map 과 비슷하지만 요소를 변형하는 과정에서 Future 를 반환할 수 있게 해준다.
  final newStream = stream.asyncMap<bool>((event) async {
    await Future.delayed(Duration(seconds: 1));
    return event.isEven;
  });
  // 스트림에서 반환되는 요소를 통해서 새로운 스트림을 만들고, 그 스트림들을 하나로 합쳐서 새로운 스트림을 만든다.
  final newStream = stream.asyncExpand((event) => Stream.value(event + 1));

  newStream.forEach(print);
}

 

이번에는 좀 더 응용버전이다. distinct는 중복된 요소가 2번 나오지 않도록 필터링해준다.

asyncMap은 꽤 유용할것같다. Future를 넣어서 변형시킬 수 있다.

asyncExpand는 어디에 써야할지 좀 난해해보인다.

 

handleError

// count가 2일때 에러가 발생하지만 스트림이 멈추지 않는다. handleError에서 오류를 처리하고 다음 요소로 넘어간다.
  Stream.periodic(const Duration(seconds: 1), (count) {
    if (count == 2) {
      throw Exception('Exceptional event');
    }
    return count;
  }).take(4).handleError((error) {
    print('Caught error: $error');
    // 에러를 다시 던지거나 처리
  }).forEach(print);

 

Stream에 에러 핸들링을 추가하고, 에러로 인해서 Stream이 끊기지 않게 해주는 개념

함수로 Stream을 만들고 try catch를 사용해도 비슷한 효과가 나올것같다.

 

timeout

// 일정 시간내에 이벤트가 발생하지 않을때 타임아웃 처리를 수행할 수 있게 해준다.
  // onTimeout이 정의되어 있지 않다면 Exception 이 발생한다.
  // 아래의 출력 결과는 timeout
  Stream.periodic(const Duration(seconds: 3), (count) {
    return count;
  }).take(4).timeout(
    Duration(seconds: 2),
    onTimeout: (sink) {
      print('timeout');
      sink.close();
    },
  ).forEach(print);

 

일정시간동안 이벤트가 발생하지 않을때 타임아웃처리

직접 사용해보면 어렵지 않은 개념

 

transform

final stream = Stream.fromIterable([1, 2, 3, 4, 5]);

  final transformedStream = stream.transform(StreamTransformer<int, String>.fromHandlers(
    handleData: (int data, EventSink<String> sink) {
      sink.add('data : $data');
    },
  ));

  transformedStream.listen((String data) {
    print(data);
  });

 

개념을 이해하기가 좀 어렵다.

기본적으로는 map과 비슷하게 Stream을 변형하여 새로운 Stream을 만든다.

근데 map과 무엇이 다른지는 잘 모르겠다. 이건 Stream을 더 공부하고 다시 봐야했다.

 

 

요약 제목

  • async* 는 여러값 반환 가능, yield는 반환후 함수상태 유지
  • stream 값 수신할때는 await for 키워드 혹은 .listen
  • stream은 기본적으로 Iterator 와 유사한 함수가 많다.
728x90