2020/6/10 Creating streams in Dart

dart:asyncライブラリは他の多くのDart APIにとって重要な二つの型、StreamとFutureクラスを含んでいます。Futureクラスは一つの計算の結果を表し、Streamクラスは複数の計算の結果を表します。streamをlistenすることで結果(データ、あるいはエラーの両方)が届いたことの通知を受けます。listen中にポーズ(一時停止)することもできますし、streamが完了する前にlistenを止めることもできます。

この記事ではstreamの使い方に関しての記事ではありません。streamの作り方に関しての記事です。streamを作るためにはいくつかの方法があります。

  • 既存のstreamを変換する。
  • async*関数(ジェネレータ関数)を用いて一からstreamを作る。
  • StreamControllerインスタンスを用いてstreamを作る。

この記事ではそれぞれのアプローチのサンプルコードを示します。


Transforming an existing stream

一般的なstreamの作り方は、すでにあるstreamのイベントを基にして新しい別のstreamを作る方法です。例えば、入力をUTF-8デコーディングして、既にある「bytesのstream」を「文字列のstream」に変換したいとします。最も一般的なアプローチは、オリジナルのstreamの各イベントの結果を待って新しいイベントを出力する新しいstreamを作ることです。

sample1

/// Splits a stream of consecutive strings into lines.
///
/// The input string is provided in smaller chunks through
/// the `source` stream.
Stream<String> lines(Stream<String> source) async* {
  // Stores any partial line from the previous chunk.
  var partial = '';
  // Wait until a new chunk is available, then process it.
  await for (var chunk in source) {
    var lines = chunk.split('\n');
    lines[0] = partial + lines[0]; // Prepend partial line.
    partial = lines.removeLast(); // Remove new partial line.
    for (var line in lines) {
      yield line; // Add lines to output stream.
    }
  }
  // Add final partial line to output stream, if any.
  if (partial.isNotEmpty) yield partial;
}

一般的なstreamの変換では、Streamクラスが提供している変換用のメソッド、例えば、map(),where(),expand(),take()などが使えます。

例えば既存のstreamとして、カウンターとして毎秒カウント数を発するcounterStreamがあるとします。例えばsample2-1のように実装できます。sample2-2のようにするとcounterStreamの結果を表示することができます。

sample2-1

var counterStream =
    Stream<int>.periodic(Duration(seconds: 1), (x) => x).take(15);

sample2-2

counterStream.forEach(print); // 1ずつ増えていく整数を15回表示します。

counterStreamのそれぞれのイベントの結果を変換するために、counterStreamをlistenする前にmap()メソッドのような変換用メソッドを実行します(sample2-3)。

sample2-3

void main(){
  
  var counterStream =
    Stream<int>.periodic(Duration(seconds: 1), (x) => x).take(6);
  
  var doubleCounterStream = counterStream.map((int x) => x * 2);
  doubleCounterStream.listen((n)=>print(n));
}
/*
0
2
4
6
8
10
*/

https://api.dart.dev/stable/2.8.4/dart-async/Stream/map.html

map()メソッドのほかに、

sample2-4(where)

sample2-5(take)

のような変換用メソッドを使うこともできます。

sample2-4(where)

void main(){
  
  var counterStream =
    Stream<int>.periodic(Duration(seconds: 1), (x) => x).take(10);
  
  var evenCounterStream = counterStream.where((int x)=>x.isEven);

  evenCounterStream.forEach(print);
}
/*
0
2
4
6
8
*/

whereメソッドは引数で渡された関数を、オリジナルのstreamの各イベントの結果に対して適用し、trueを返すイベントだけを発生させ、falseのイベントは無視します。sample2-4ではカウントが偶数の場合のみ採用され、奇数のイベントは無視されます。

https://api.dart.dev/stable/2.8.4/dart-async/Stream/where.html


sample2-5(take)

void main(){
  
  var counterStream =
    Stream<int>.periodic(Duration(seconds: 1), (x) => x).take(10);

  var fiveCounterStream = counterStream.take(5);

  fiveCounterStream.listen((r)=>print(r));
}
/*
0
1
2
3
4
*/

https://api.dart.dev/stable/2.8.4/dart-async/Stream/take.html


変換用メソッドを使うだけで目的が達成されることも多いのですが、しかし時にはもっと複雑な変換が必要な場合もあります。そのような場合には、Streamクラスのtransform()メソッドを用い、その引数にStreamTransformerインスタンスを指定する方法があります。

例えば、sample2-6では、transformerとしてdart:convertライブラリの

utf8.decorder

LineSplitter

を使って変換しています。

sample2-6

Stream<List<int>> content = File('someFile.txt').openRead();
List<String> lines =
    await content.transform(utf8.decoder).transform(LineSplitter()).toList();

content.transform(utf8.decorder).transform(LineSplitter())は各イベント結果として一行の文字列を返すstream。

toList()メソッドは↑のstreamをlistenして各イベントの結果を要素として持つlist(配列)をFuture型として返す。つまりFuture<String>型を返す。

つまりcontent.transform(utf8.decorder).transform(LineSplitter()).toList()

の実行が開始される時点でfutureが生成され、contentのlistenが終了してtoList()メソッドがlist(配列)を作り終えた時点で変数linesに「文字列を要素に持つlist」がセットされる。awaitで受けているので、上記処理が完了する、あるいはエラーが出るまで待つ。終わったら次の行に処理が進む。(エラーハンドリングのコードは書いていないが、基本的にはそういう流れ)


Creating a stream from scratch

新しいstreamを生成する方法の一つとして、ジェネレータ関数(async*関数)を使う方法があります。

ジェネレータ関数が呼び出された時点でstreamが生成されます。

streamがlistenされた時点でジェネレータ関数のボディが実行開始されます。

ジェネレータ関数がreturnされた時点でsteamが閉じます。

ジェネレータ関数がreturnされるまで、yieldあるいはyield*を用いてイベントを発生させることができます。

上記の説明を見てわかるように、普通の関数とは挙動が違います。streamを作るための特殊な関数として、普通の関数の挙動とは別のものとして捉える必要があります。頭を切り替えましょう。

sample3-1は一定の間隔で数字を発生させるstreamを作るサンプルです。

sample3-1

void main(){
  Stream<int> timedCounter(Duration interval, [int maxCount]) async* {
  int i = 0;
  while (true) {
    await Future.delayed(interval);
    yield i++;
    if (i == maxCount) break;
  }
}

  var tc=timedCounter(Duration(seconds:1),7);
  tc.listen((r)=>print(r));
}
/*
0
1
2
3
4
5
6
*/

ジェネレータ関数timedCounterはStreamインスタンスを返します。streamがlistenされると、timedCounterのボディが開始されます。指定された間隔(サンプルでは1秒)の後、次の数字が(streamのイベントの結果として)yield(発生)されます。引数maxCountが省略されると、「ループの終了条件が無い」ということになるので、streamはいつまでもカウントをし続けることになります。

(listen()メソッド実行で返されるStreamSubscriptionクラスのcancel()メソッドを実行することにより)listenerがキャンセル(cancel)されると、次にボディ内でyield文に到達した時、yieldがreturn文のように動作します。中のfinally文が実行されると、ジェネレータ関数は終了します。終了する前にジェネレータ関数が結果の値を発生させようとしても失敗し、returnします。

最終的にジェネレータ関数が終了すると、cancel()メソッドにより返されたfutureはcompleteします。もしジェネレータ関数がエラーと共に終了した場合、futureはそのエラーと共にcompleteします。そうでなければnullと共にcompleteします。

 

工事中🏗

何も無いところからジェネレータ関数を使ってstreamを生成することはあまりありません。複数のデータソースを簡単に扱いたいわけですが、async*関数(ジェネレータ関数)はシンプルすぎて機能が十分ではありません。その代わりにStreamControllerクラスがあります。


Using a StreamController

工事中🏗

StreamControllerは新しいstreamを生成し、あらゆるタイミング、あらゆる場所でそのstreamに対してイベントを加える方法を提供します。StreamControllerにより生成されたstreamはリスナーを制御しポーズ(一時停止)するために必要な全てのロジックを持っています。streamを生成し、そのコントローラーを保持することができます。

sample4-1は、sample3-1で出てきたtimedCounter()関数の実装について、StreamControllerを使った基本的なサンプルです。しかしsample4-1はStreamControllerの使い方に関して好ましくない方法を用いています。このコードでは返すべきstreamを生成し、そしてtimerを生成してタイマーのイベントとしてstreamのイベント生成(controller.add(counter))を設定しています。タイマーのイベントはfutureのイベントでもstreamのイベントでもありません。

sample4-1(好ましくないサンプル)

// NOTE: This implementation is FLAWED!
// It starts before it has subscribers, and it doesn't implement pause.
Stream<int> timedCounter(Duration interval, [int maxCount]) {
  var controller = StreamController<int>();
  int counter = 0;
  void tick(Timer timer) {
    counter++;
    controller.add(counter); // Ask stream to send counter values as event.
    if (maxCount != null && counter >= maxCount) {
      timer.cancel();
      controller.close(); // Ask stream to shut down and tell listeners.
    }
  }

  Timer.periodic(interval, tick); // BAD: Starts before it has subscribers.
  return controller.stream;
}

timedCounter()関数から返されたstreamはsample4-2のように使用します。

sample4-2

var counterStream = timedCounter(const Duration(seconds: 1), 15);
counterStream.listen(print); // Print an integer every second, 15 times.

sample4-1のtimedCounterの実装には以下のような問題点があります。

  • streamがlistener(subscriber)を持つ前から、streamがイベントを発生してしまう。
  • subscriberがポーズ(一時停止)を要求しても、streamはイベントを発生し続けてしまう。

次のセクションで、StreamControllerインスタンス生成時にonListenやonPauseなどのコールバックを設定することで、これらの問題点を解決します。


Waiting for a subscription

一つのルールとして、streamはsubscriberを持つまでイベントは発生させません。ジェネレータ関数(async*関数)ではこのルールは自動的に達成されます。しかしStreamControllerクラスを使う場合、いつイベントを発生させるかについて、私たちが全てを私たちの思い通りにできます。イベントを発生させてはいけないタイミング(subscriberがまだないタイミング)で発生させることもできてしまいます。

subscriberがないタイミングでstreamがイベントを発生させた場合、そのイベントの結果は、StreamControllerによりバッファ(とりあえず受け取った結果を保存しておく)されます。もしstreamがいつまでもsubscriberを持たない場合、このバッファはメモリリーク(不要なデータがいつまでもメモリに存在し続ける状態)になってしまいます。

streamを使用するコードをsample4-2に示しましたが、それをsample4-3のように変更してみます。(実際動くサンプルはsample4-4)

sampel4-3

void listenAfterDelay() async {
  var counterStream = timedCounter(const Duration(seconds: 1), 15);
  await Future.delayed(const Duration(seconds: 5));

  // After 5 seconds, add a listener.
  await for (int n in counterStream) {
    print(n); // Print an integer every second, 15 times.
  }
}

sample4-4

import 'dart:async';

void main(){
  listenAfterDelay();
}

void listenAfterDelay() async {
  var counterStream = timedCounter(const Duration(seconds: 1), 10);
  await Future.delayed(const Duration(seconds: 5));

  // After 5 seconds, add a listener.
  await for (int n in counterStream) {
    print(n);
  }
}

Stream<int> timedCounter(Duration interval, [int maxCount]) {
  var controller = StreamController<int>();
  int counter = 0;
  void tick(Timer timer) {
    counter++;
    controller.add(counter); // Ask stream to send counter values as event.
    if (maxCount != null && counter >= maxCount) {
      timer.cancel();
      controller.close(); // Ask stream to shut down and tell listeners.
    }
  }

  Timer.periodic(interval, tick); // BAD: Starts before it has subscribers.
  return controller.stream;
}
/*
1  //実行開始5秒後に1〜5までが一気に表示。
2  //6〜10は1秒おきにカウントされる。
3
4
5
6
7
8
9
10
*/

sample4-4を実行すると、最初の5秒間は何も表示されません。しかしその間もstreamはイベントを発生させ続けています。5秒経ってlistenerが加えられたら、それまでに発生したイベントの結果(1〜5)が全てそのタイミングに一気に表示されます。StreamControllerによりその間のイベントの結果がバッファされているからです。

sample4-4は、subscriptionの追加より前からstreamがイベントを発生させているの問題(ルール違反)です。この問題を解決するには、StreamControllerインスタンス生成時にonListen引数を指定します。onListenに指定されたコールバック関数は、streamが一番最初のsubscriberを追加された時に呼び出されます。onCancel引数にコールバックを指定した場合、streamcontrollerが最後のsubscriberを失った時に呼び出されます。後に示すようにTimer.periodic()の処理をonListenコールバック内に記述することで、「subscriber追加前はstreamはイベントを発生しない。subscriber追加からタイマー実行開始させ、streamがイベントを発生させる」という挙動を実現できます。


Honoring the pause state

もう一つのルールとして、listenerがポーズ(一時停止)を要求した時(つまりポーズ中)は、streamはイベント発生を止めるべきです。ジェネレータ関数(async*関数)では、subscriptionがポーズしている間は、yield文において自動的に一時停止されます。それに対してStreamControllerでは、ポーズ中に発生したイベントはバッファされます。イベントを提供する側のコードが(listenerのポーズの要求を)無視すると、際限なくバッファのサイズが大きくなっていきます。さらに、ポーズ解除後すぐにlistenerがlisteningを停止した場合、バッファを作るために行われた処理が無駄になります。

ポーズに対するサポートがないと何が起こるか、streamを使用するコードをsample5-1のように変更してみます。(動くコードはsample5-2)

sample5-1

void listenWithPause() {
  var counterStream = timedCounter(const Duration(seconds: 1), 15);
  StreamSubscription<int> subscription;

  subscription = counterStream.listen((int counter) {
    print(counter); // Print an integer every second.
    if (counter == 5) {
      // After 5 ticks, pause for five seconds, then resume.
      subscription.pause(Future.delayed(const Duration(seconds: 5)));
    }
  });
}

sample5-2

import 'dart:async';

void main(){
  listenWithPause();
}

void listenWithPause() {
  var counterStream = timedCounter(const Duration(seconds: 1), 15);
  StreamSubscription<int> subscription;

  subscription = counterStream.listen((int counter) {
    print(counter); // Print an integer every second.
    if (counter == 5) {
      // After 5 ticks, pause for five seconds, then resume.
      subscription.pause(Future.delayed(const Duration(seconds: 5)));
    }
  });
}

Stream<int> timedCounter(Duration interval, [int maxCount]) {
  var controller = StreamController<int>();
  int counter = 0;
  void tick(Timer timer) {
    counter++;
    controller.add(counter); // Ask stream to send counter values as event.
    if (maxCount != null && counter >= maxCount) {
      timer.cancel();
      controller.close(); // Ask stream to shut down and tell listeners.
    }
  }

  Timer.periodic(interval, tick); // BAD: Starts before it has subscribers.
  return controller.stream;
}
/*
1
2
3
4
5  //1〜5はカウントされる。
6
7
8
9
10  //6〜10は実行開始の10秒後に一気に表示される。
11
12
13
14
15  //11〜15はカウントされる。
*/

5秒間のポーズが終わると、そのポーズの間に発生されたイベントの結果を全て同じタイミングで受け取ります。sample5-2では、「subscriptionのポーズ中にタイマーを止める」コードがありませんので、streamはポーズを無視してイベントを発生させ続けます。そしてstreamは発生したイベントをバッファします。そしてポーズが終了した時にlistenerがポーズの間のイベントの結果を一気に受け取ります。


sample5-3では、StreamControllerインスタンス生成時にonListen,onPause,onResume,onCancelコールバックを設定して、ポーズを実装しています。

sample5-3

Stream<int> timedCounter(Duration interval, [int maxCount]) {
  StreamController<int> controller;
  Timer timer;
  int counter = 0;

  void tick(_) {
    counter++;
    controller.add(counter); // Ask stream to send counter values as event.
    if (counter == maxCount) {
      timer.cancel();
      controller.close(); // Ask stream to shut down and tell listeners.
    }
  }

  void startTimer() {
    timer = Timer.periodic(interval, tick);
  }

  void stopTimer() {
    if (timer != null) {
      timer.cancel();
      timer = null;
    }
  }

  controller = StreamController<int>(
      onListen: startTimer,
      onPause: stopTimer,
      onResume: startTimer,
      onCancel: stopTimer);

  return controller.stream;
}

sample5-3をsample5-1のlistenWithPause()関数と一緒に実行させると、ポーズ中はカウントが停止し、ポーズが終わるとカウントが再開する挙動を見ることができます。


sample5-4

//結局onlisten,onpauseなどがいつ呼び出されるのか、わかりやすいように
//listen,pauseなどのメソッド実行時に表示を出す。

import 'dart:async';

void main(){
  listenWithPause();
}

void listenWithPause() {
  var counterStream = timedCounter(const Duration(seconds: 1), 10);
  StreamSubscription<int> subscription;

  print("stream.listen実行。");
  subscription = counterStream.listen((int counter) {
    print(counter); // Print an integer every second.
    
    if (counter == 4) {
      // After 4 ticks, pause for four seconds, then resume.
      print("subscription.pause実行。");
      subscription.pause(Future.delayed(const Duration(seconds: 4),(){print("future完了によりポーズ終了。");}));
    }
    
  });
}

Stream<int> timedCounter(Duration interval, [int maxCount]) {
  StreamController<int> controller;
  Timer timer;
  int counter = 0;

  void tick(_) {
    /*
    if (counter == maxCount) {
      timer.cancel();
      print("controller.close実行。");
      controller.close(); // Ask stream to shut down and tell listeners.
    }
    */
    counter++;
    controller.add(counter); // Ask stream to send counter values as event.
    
    if (counter == maxCount) {
      timer.cancel();
      print("controller.close実行。");
      controller.close(); // Ask stream to shut down and tell listeners.
    }
    
  }

  void startTimer() {
    
      print("startTimer実行!");

    timer = Timer.periodic(interval, tick);
  }

  void stopTimer() {
    if(controller.isPaused){
      print("onPause実行!");
    }else if(controller.isClosed){
      print("onCancel実行");
    }
    if (timer != null) {
      timer.cancel();
      timer = null;
    }
  }

  controller = StreamController<int>(
      onListen: startTimer,
      onPause: stopTimer,
      onResume: startTimer,
      onCancel: stopTimer);

  return controller.stream;
}
/*
stream.listen実行。
startTimer実行!
1
2
3
4
subscription.pause実行。
onPause実行!
future完了によりポーズ終了。
startTimer実行!
5
6
7
8
9
controller.close実行。
10
onCancel実行
*/

実際に動くコードがsample5-4です。

 

 

 

参考

https://dart.dev/articles/libraries/creating-streams

コメントを残す

メールアドレスが公開されることはありません。