From 4fe1ede0c1592d49e2e0fe3ce8b16b3ea9f7e1b4 Mon Sep 17 00:00:00 2001 From: David Morgan Date: Mon, 27 Oct 2025 13:21:11 +0100 Subject: [PATCH] Add test for unsubscribing and resubscribing. --- .../test/directory_watcher/file_tests.dart | 74 +++++++++++++++++++ 1 file changed, 74 insertions(+) diff --git a/pkgs/watcher/test/directory_watcher/file_tests.dart b/pkgs/watcher/test/directory_watcher/file_tests.dart index d56f921d8..00014937d 100644 --- a/pkgs/watcher/test/directory_watcher/file_tests.dart +++ b/pkgs/watcher/test/directory_watcher/file_tests.dart @@ -2,10 +2,12 @@ // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. +import 'dart:async'; import 'dart:io' as io; import 'dart:io'; import 'dart:isolate'; +import 'package:async/async.dart'; import 'package:test/test.dart'; import 'package:test_descriptor/test_descriptor.dart' as d; import 'package:watcher/src/utils.dart'; @@ -34,6 +36,78 @@ void _fileTests({required bool isNative}) { } }); + // ResubscribableWatcher wraps all the directory watchers to add handling of + // multiple subscribers. The underlying watcher is created when there is at + // least one subscriber and closed when there are zero subscribers. So, + // exercise that behavior in various ways. + test('ResubscribableWatcher handles multiple subscriptions ', () async { + final watcher = createWatcher(); + + // One subscription, one event, close the subscription. + final queue1 = StreamQueue(watcher.events); + final event1 = queue1.next; + await watcher.ready; + writeFile('a.txt'); + expect(await event1, isAddEvent('a.txt')); + await queue1.cancel(immediate: true); + + // Open before "ready", cancel before event. + final queue2a = StreamQueue(watcher.events); + // Open before "ready", cancel after one event. + final queue2b = StreamQueue(watcher.events); + // Open before "ready", cancel after two events. + final queue2c = StreamQueue(watcher.events); + + unawaited(queue2a.cancel(immediate: true)); + + await watcher.ready; + + // Open after "ready", cancel before event. + final queue2d = StreamQueue(watcher.events); + + // Open after "ready", cancel after one event. + final queue2e = StreamQueue(watcher.events); + + // Open after "ready", cancel after two events. + final queue2f = StreamQueue(watcher.events); + + unawaited(queue2d.cancel(immediate: true)); + + writeFile('b.txt'); + + expect(await queue2b.next, isAddEvent('b.txt')); + expect(await queue2c.next, isAddEvent('b.txt')); + expect(await queue2e.next, isAddEvent('b.txt')); + expect(await queue2f.next, isAddEvent('b.txt')); + await queue2b.cancel(immediate: true); + await queue2e.cancel(immediate: true); + + // Remaining subscriptions still get events. + writeFile('c.txt'); + expect(await queue2c.next, isAddEvent('c.txt')); + expect(await queue2f.next, isAddEvent('c.txt')); + + // New subscriber gets events without waiting for "ready" because the + // underlying "watch" is still running. + final queue2g = StreamQueue(watcher.events); + writeFile('d.txt'); + expect(await queue2g.next, isAddEvent('d.txt')); + + // Close all the subscriptions. + await queue2c.cancel(immediate: true); + await queue2f.cancel(immediate: true); + await queue2g.cancel(immediate: true); + + // Open another subscription. This time it does not get events from before + // "ready" because the underlying "watch" was closed. + final queue3 = StreamQueue(watcher.events); + writeFile('e.txt'); + await watcher.ready; + writeFile('f.txt'); + expect(await queue3.next, isAddEvent('f.txt')); + await queue3.cancel(immediate: true); + }); + test('does not notify for files that already exist when started', () async { // Make some pre-existing files. writeFile('a.txt');