2020/11/24 Asynchronous programming: streamsの訳

ポイントは何ですか?

  • ストリームは、データの非同期シーケンスを提供します。

  • シーケンス:複数のデータの連続(のような意味)

  • データシーケンスには、ユーザーが生成したイベントとファイルから読み取られたデータが含まれます。

  • streamを進める方法として「await forを使う方法」と「StreamAPIのlisten()メソッドを使う方法」があります。

  • ストリームは、エラーに応答する方法を提供します。

  • ストリームには、シングルサブスクリプションとブロードキャストの2種類があります。

Dartの非同期プログラミングはFutureクラスとStreamクラスを使用します。

Futureは即時に完了しない(時間がかかる)計算(とその結果)を表現します。通常の関数が結果を返すのに対し、非同期関数はFutureを返します。

Futureは最終的には結果の値(あるいはエラー)を返します。futureは結果がいつ返ってきたかを教えてくれます。

streamは複数の複数の非同期イベントの連続を表現します。streamは非同期なイテラブルのようなものです。

要求したときに次のイベントを取得する代わりに、準備ができたときにイベントがあることをストリームが通知します。


Receiving stream events

streamを生成する方法はいくつもありますが、それについては他の記事で説明します。

streamを生成する方法に対して、streamを使用する(結果を受け取る)方法は一つです。

await for(asynchronous for loop)を使って、streamのイベントをイテレート(一つずつ順番に処理する)します。例えば、

Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  await for (var value in stream) {
    sum += value;
  }
  return sum;
}

このコードは、整数イベントのストリームの各イベント(整数の結果)を受け取り、それらを合計して、合計を返すだけです。ループ本体が終了すると、次のイベントが到着するか、ストリームが完了するまで、関数は一時停止されます。

sumStream関数はasyncキーワードが使われています。ボディ内で

await for

を使う場合、asyncキーワードが必要です。(asyncキーワードを使っている関数(async関数)のみawait forが使える。)

下記のサンプルはasync*関数(ジェネレータ関数)を使ってシンプルなstream<int>を生成して、上記のサンプルコードをテストしています。

// Copyright (c) 2015, the Dart project authors.
// Please see the AUTHORS file for details.
// All rights reserved. Use of this source code is governed
// by a BSD-style license that can be found in the LICENSE file.

import 'dart:async';

Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  await for (var value in stream) {
    sum += value;
  }
  return sum;
}

Stream<int> countStream(int to) async* {
  for (int i = 1; i <= to; i++) {
    yield i;
  }
}

main() async {
  var stream = countStream(10);
  var sum = await sumStream(stream);
  print(sum); // 55
}

 


Error events

ストリームは、イベントがなくなるとdoneイベントを発生し、イベントを受信するコードには、新しいイベントが到着したことが通知されるのと同じように通知されます。await forループを使用してイベントを読み取る場合、ストリームが完了するとループが停止します。

場合によっては、ストリームが完了する前にエラーが発生することもあるでしょう。

リモートサーバーからファイルをフェッチしているときにネットワークに障害が発生した、

イベントを作成するコードにバグがあった

などの可能性がありますが、エラーが発生したことを知る必要があります。

ストリームは、データイベントを配信するように、エラーイベントを配信することもできます。ほとんどのストリームは最初のエラーの後で停止しますが、複数のエラーを配信するストリームと、エラーイベントの後にさらに多くのデータを配信するストリームが存在する可能性があります。このドキュメントでは、最大で1つのエラーを配信するストリームについてのみ説明します。

await forを使ってstreamを進めている場合、ループ文でエラーがスローされます。これでループも終了します。

try-catchを使用してエラーをキャッチできます。次の例では、ループイテレータが4に等しい場合にエラーをスローします。

// Copyright (c) 2015, the Dart project authors.
// Please see the AUTHORS file for details.
// All rights reserved. Use of this source code is governed
// by a BSD-style license that can be found in the LICENSE file.

import 'dart:async';

Future<int> sumStream(Stream<int> stream) async {
  var sum = 0;
  try {
    await for (var value in stream) {
      sum += value;
    }
  } catch (e) {
    return -1;
  }
  return sum;
}

Stream<int> countStream(int to) async* {
  for (int i = 1; i <= to; i++) {
    if (i == 4) {
      throw new Exception('Intentional exception');
    } else {
      yield i;
    }
  }
}

main() async {
  var stream = countStream(10);
  var sum = await sumStream(stream);
  print(sum); // -1
}

 


Working with streams

Streamクラスには、Iterableのメソッドと同様に、ストリームに対して一般的な操作を実行できる多数のヘルパーメソッドが含まれています たとえばlastWhere()、Stream APIを使用して、ストリーム内の最後の正の整数を見つけることができ ます。

Future < int > lastPositive Stream < int > stream => 
    stream lastWhere ((x => x > = 0 );

Two kinds of streams

Single subscription streams

Single subscription streamは、最も一般的な種類のストリームであり、より大きな全体の一部である一連のイベントが含まれています。イベントは、正しい順序で、見逃すことなく配信される必要があります。これは、ファイルを読んだり、Webリクエストを受け取ったりしたときに取得する種類のストリームです。

このようなストリームは一度しか聞くことができません。後でもう一度聞くと、最初のイベントを見逃すことになる可能性があり、ストリームの残りの部分は意味がありません。リスニングを開始すると、データがフェッチされ、チャンクで提供されます。

Single subscription streamは

一つしかリスナーを持てない。

リスナーを持って初めてイベントを発する。

リスナーが無くなったら(アンサブスクライブ)イベントを発するのをやめる。

詳しくはこのページ(Stream<T>クラスのドキュメント)。


Broadcast streams

もう1つの種類のストリーム(Broadcast stream)は、一度に1つずつ処理できる個々のメッセージを対象としています。この種のストリームは、たとえばブラウザのマウスイベントに使用できます。

このようなストリームはいつでも聴き始めることができ、聴いている間に発生するイベントを取得できます。複数のリスナーが同時に聞くことができ、前のサブスクリプションをキャンセルした後、後でもう一度聞くことができます。


Methods that process a stream

Stream<T>クラスの下記のメソッドはstreamを進めて結果を返します。

二つの種類のstreamのどちらも結局リッスンしないと結果は得られないので、

下記のゲッター・メソッドを実行する=listenメソッドを実行する、そしてstreamを進める、ということになる。

Future<T> get first;
Future<bool> get isEmpty;
Future<T> get last;
Future<int> get length;
Future<T> get single;
Future<bool> any(bool Function(T element) test);
Future<bool> contains(Object needle);
Future<E> drain<E>([E futureValue]);
Future<T> elementAt(int index);
Future<bool> every(bool Function(T element) test);
Future<T> firstWhere(bool Function(T element) test, {T Function() orElse});
Future<S> fold<S>(S initialValue, S Function(S previous, T element) combine);
Future forEach(void Function(T element) action);
Future<String> join([String separator = ""]);
Future<T> lastWhere(bool Function(T element) test, {T Function() orElse});
Future pipe(StreamConsumer<T> streamConsumer);
Future<T> reduce(T Function(T previous, T element) combine);
Future<T> singleWhere(bool Function(T element) test, {T Function() orElse});
Future<List<T>> toList();
Future<Set<T>> toSet();

drain()メソッドとpipe()メソッドを除き、対応する関数がIterableクラスにあります。それぞれのメソッドはasync関数とawait forループを用いて簡潔に記述することができます。例えば、いくつかの実装は下記のように記述できます。

Future<bool> contains(Object needle) async {
  await for (var event in this) {
    if (event == needle) return true;
  }
  return false;
}

Future forEach(void Function(T element) action) async {
  await for (var event in this) {
    action(event);
  }
}

Future<List<T>> toList() async {
  final result = <T>[];
  await this.forEach(result.add);
  return result;
}

Future<String> join([String separator = ""]) async =>
    (await this.toList()).join(separator);

(実際の実装はもっと複雑ですが、主な原因は歴史的な理由です。)


Methods that modify a stream

Streamクラスの下記のメソッドはオリジナルのstreamを基にして新しいstreamを生成します。基のstreamがリッスンされるまで新しいstreamがリッスンされるのを待ちます。

Stream<R> cast<R>();
Stream<S> expand<S>(Iterable<S> Function(T element) convert);
Stream<S> map<S>(S Function(T event) convert);
Stream<T> skip(int count);
Stream<T> skipWhile(bool Function(T element) test);
Stream<T> take(int count);
Stream<T> takeWhile(bool Function(T element) test);
Stream<T> where(bool Function(T event) test);

上記のメソッドは、Iterable を別のIterableに変換するIterableの同様のメソッドに対応します。これらはすべてasyncawaitforループを備えた関数を使用して簡単に記述できます。

Stream<E> asyncExpand<E>(Stream<E> Function(T event) convert);
Stream<E> asyncMap<E>(FutureOr<E> Function(T event) convert);
Stream<T> distinct([bool Function(T previous, T next) equals]);

工事中🏗

 


Reading and decoding a file

ファイルの読み取りとデコード

次のコードは、ファイルを読み取り、ストリームに対して2つの変換を実行します。最初にUTF8からのデータを変換し、次にLineSplitterを介して実行します。 ハッシュタグで始まる行を除いて、すべての行が出力され#ます。

import 'dart:convert';
import 'dart:io';

Future<void> main(List<String> args) async {
  var file = File(args[0]);
  var lines = utf8.decoder
      .bind(file.openRead())
      .transform(LineSplitter());
  await for (var line in lines) {
    if (!line.startsWith('#')) print(line);
  }
}

The listen() method

このページで最後に説明するStreamクラスのメソッドはlisten()メソッドです。

他の全てのstreamのメソッドはlisten()メソッドで定義されています。

StreamSubscription<T> listen(void Function(T event) onData,
    {Function onError, void Function() onDone, bool cancelOnError});

新しいStream型を定義したいときは、Streamクラスを継承し、listenメソッドを実装すれば良いのです。Streamクラスの他の全てのメソッドはlisten()メソッドを呼び出します。

listen()メソッドを実行すると、streamのリッスンをスタートすることができます。

そうするまで、ストリームは、表示したいイベントを説明する不活性オブジェクトです。

listen()メソッドを呼び出すと、イベントを発するアクティブなストリームを表すStreamSubscriptionオブジェクトが返されます。

これは、Iterableクラスはただのオブジェクトのコレクションであり、iteratorが実際のイテレートをするものである、というのと似ています。

stream subscriptionを使えば、サブスクリプションをポーズ(一時停止)したり、ポーズから再開したり、キャンセルしたり、などの挙動を完璧にコントロールできます。

データイベントまたはエラーイベントごとに、およびストリームが閉じられたときに呼び出されるようにコールバックを設定できます。

 

 

参考

https://dart.dev/tutorials/language/streams

コメントを残す

メールアドレスが公開されることはありません。