File tree Expand file tree Collapse file tree 4 files changed +47
-2
lines changed Expand file tree Collapse file tree 4 files changed +47
-2
lines changed Original file line number Diff line number Diff line change 1+ ## 2.13.1-wip
2+
3+ - Fix ` StreamGroup.broadcast().close() ` to properly complete when all streams in the group close without being explicitly removed.
4+
15## 2.13.0
26
37- Fix type check and cast in SubscriptionStream's cancelOnError wrapper
Original file line number Diff line number Diff line change @@ -289,7 +289,32 @@ class StreamGroup<T> implements Sink<Stream<T>> {
289289 if (_closed) return _controller.done;
290290
291291 _closed = true ;
292- if (_subscriptions.isEmpty) _controller.close ();
292+
293+ if (_subscriptions.isEmpty) {
294+ _onIdleController? .close ();
295+ _controller.close ();
296+ return _controller.done;
297+ }
298+
299+ if (_controller.stream.isBroadcast) {
300+ // For a broadcast group that's closed, we must listen to streams with
301+ // null subscriptions to detect when they complete. This ensures the
302+ // group itself can close once all its streams have closed.
303+ List <Stream <T >>? streamsToRemove;
304+
305+ _subscriptions.updateAll ((stream, subscription) {
306+ if (subscription != null ) return subscription;
307+
308+ try {
309+ return _listenToStream (stream);
310+ } on Object {
311+ (streamsToRemove ?? = []).add (stream);
312+ return null ;
313+ }
314+ });
315+
316+ streamsToRemove? .forEach (_subscriptions.remove);
317+ }
293318
294319 return _controller.done;
295320 }
Original file line number Diff line number Diff line change 11name : async
2- version : 2.13.0
2+ version : 2.13.1-wip
33description : Utility functions and classes related to the 'dart:async' library.
44repository : https://github.com/dart-lang/core/tree/main/pkgs/async
55issue_tracker : https://github.com/dart-lang/core/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Aasync
Original file line number Diff line number Diff line change @@ -491,6 +491,22 @@ void main() {
491491 controller.add ('first' );
492492 expect (streamGroup.close (), completes);
493493 });
494+
495+ test ('completes close() when streams close without being removed' ,
496+ () async {
497+ var controller = StreamController .broadcast ();
498+ var group = StreamGroup .broadcast ();
499+ group.add (controller.stream);
500+ var closeCompleted = false ;
501+ group.close ().then ((_) => closeCompleted = true );
502+
503+ await flushMicrotasks ();
504+ expect (closeCompleted, isFalse);
505+
506+ await controller.close ();
507+ await flushMicrotasks ();
508+ expect (closeCompleted, isTrue);
509+ });
494510 });
495511
496512 group ('regardless of type' , () {
You can’t perform that action at this time.
0 commit comments