Flutter stream周りをもう一度ちゃんと理解する

2秒ごとに無限に数字が出力し続ける

import 'dart:async';

void main() {
  // 最後にtake()を追加すると指定した回数で止まる
  var counterStream =
    Stream<int>.periodic(const Duration(seconds: 2), (x) => x);
  counterStream.forEach(print);
}

上記とほぼ同等のコードがawait forで書ける

import 'dart:async';

Future<void> main() async {
  var counterStream =
    Stream<int>.periodic(const Duration(seconds: 2), (x) => x);
  await for(int n in counterStream){
    print(n);
  }
}

https://dart.dev/guides/libraries/library-tour#stream

じゃあ2つのstreamがある場合は

import 'dart:async';

void main() {
  var counterStream =
    Stream<int>.periodic(const Duration(seconds: 2), (x) => x*2);
  counterStream.forEach(print);

    var counterStream2 =
    Stream<int>.periodic(const Duration(seconds: 2), (x) => x*2-1);
  counterStream2.forEach(print);
}

2つのストリームが非同期的に出力される

上記をawait forで書き換えても同じ結果にはならない。 counterStreamがendless streamsなのでいつまで経っても二つ目のストリームが実行されない

import 'dart:async';

Future<void> main() async {
  var counterStream =
    Stream<int>.periodic(const Duration(seconds: 2), (x) => x*2);
  await for(int n in counterStream) {
    print(n);
  } 

    var counterStream2 =
    Stream<int>.periodic(const Duration(seconds: 2), (x) => x*2-1);
  await for(int n in counterStream2) {
    print(n);
  } 
}

yieldを理解する

import 'dart:async';

void main() {
  func1().listen((e)=>print(e));
  // 2
  // 4
  // 6
}

Stream<int> func1() async* {
  // *をつけると別のgenerator functionに委譲できる
  // 再帰処理はこれで書くとパフォーマンスが良くなるらしい
  yield* func2();
}

Stream<int> func2() async* {
  yield 2;
  yield 4;
  yield 6;
}

https://dart.dev/guides/language/language-tour#generators

StreamSubscription

この辺からが本題

in_app_purchaseで登場するので使い方を押さえたい
https://pub.dev/packages/in_app_purchase#listening-to-purchase-updates

いろいろテストしてもonDoneの部分は通らないが、どういう場面実行されるのだろうか。

https://github.com/flutter/plugins/blob/c1c46531c77f7c7007cfe31071f34a4b1fbbdf51/packages/in_app_purchase/in_app_purchase/lib/in_app_purchase.dart#L53

This stream will never close as long as the app is active.

と書いてあるけども。onDoneのリファレンスにも

onDone method - StreamSubscription class - dart:async library - Dart API

The handleDone function is called when the stream closes.

と書いてある。

StreamSubscription自体のリファレンスは下記
StreamSubscription class - dart:async library - Dart API

リファレンスを読むとpause, resume, cancelなどができる。cancelしたものはresumeできないそうです。

firestoreでサンプル作ってみました。

import 'dart:async';

import 'package:firebase_core/firebase_core.dart';
import 'package:flutter/material.dart';
import 'package:cloud_firestore/cloud_firestore.dart';

Future<void> main() async {
  WidgetsFlutterBinding.ensureInitialized();
  await Firebase.initializeApp();
  runApp(MyApp());
}

class MyApp extends StatelessWidget {
  @override
  Widget build(BuildContext context) {
    return MaterialApp(
      title: 'Flutter Demo',
      theme: ThemeData(
        primarySwatch: Colors.blue,
      ),
      home: MyHomePage(title: 'Flutter Demo Home Page'),
    );
  }
}

class MyHomePage extends StatefulWidget {
  MyHomePage({Key? key, required this.title}) : super(key: key);
  final String title;

  @override
  _MyHomePageState createState() => _MyHomePageState();
}

class _MyHomePageState extends State<MyHomePage> {
  late StreamSubscription<DocumentSnapshot<Map<String, dynamic>>> _subscription;
  Map<String, dynamic> _public = {};

  @override
  void initState() {
    final firestore = FirebaseFirestore.instance;
    _subscription = firestore
        .collection('publics')
        .doc('763fznZQsyE0DmiUzeMZ')
        .snapshots()
        .listen((event) {
      print('on data.');
      setState(() {
        _public = event.data() ?? {};
      });
    }, onDone: () => print('on done.'));
    // firestoreだとdoneが実行されることはなさそう
    // onDoneが実行された後の再復活みたいなことはあるのだろうか?

    super.initState();
  }

  @override
  void dispose() {
    _subscription.cancel();
    super.dispose();
  }

  @override
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(
        title: Text(widget.title),
      ),
      body: Center(
        child: Column(
          mainAxisAlignment: MainAxisAlignment.center,
          children: <Widget>[
            Text(
              'description: ${_public['description']}',
              style: Theme.of(context).textTheme.headline4,
            ),
            ElevatedButton(
                onPressed: _subscription.cancel,
                child: Text('subscription cancel')),
            ElevatedButton(
                onPressed: _subscription.pause,
                child: Text('subscription pause')),
            ElevatedButton(
                onPressed: _subscription.resume,
                child: Text('subscription resume')),
          ],
        ),
      ),
    );
  }
}

cancelもしくはpauseするとconsole.firebaseから変更しても画面には反映されない。 pauseした場合はresumeすると最新の情報が同期される。