From 6c87bec709ac2309a3aec45e1d694f463e2c5cdb Mon Sep 17 00:00:00 2001 From: David Morgan Date: Fri, 7 Nov 2025 11:54:55 +0100 Subject: [PATCH] Run Windows directory watcher in an isolate. --- pkgs/watcher/CHANGELOG.md | 4 + pkgs/watcher/lib/src/directory_watcher.dart | 15 +- .../src/directory_watcher/directory_list.dart | 17 +- .../lib/src/directory_watcher/windows.dart | 38 ++-- .../windows_isolate_directory_watcher.dart | 170 ++++++++++++++++++ .../windows_resubscribable_watcher.dart | 25 +++ pkgs/watcher/lib/src/resubscribable.dart | 3 +- .../test/directory_watcher/file_tests.dart | 78 ++++++++ .../windows_isolate_test.dart | 115 ++++++++++++ .../test/directory_watcher/windows_test.dart | 86 +-------- .../test/no_subscription/windows_test.dart | 2 +- pkgs/watcher/test/ready/windows_test.dart | 2 +- 12 files changed, 443 insertions(+), 112 deletions(-) create mode 100644 pkgs/watcher/lib/src/directory_watcher/windows_isolate_directory_watcher.dart create mode 100644 pkgs/watcher/lib/src/directory_watcher/windows_resubscribable_watcher.dart create mode 100644 pkgs/watcher/test/directory_watcher/windows_isolate_test.dart diff --git a/pkgs/watcher/CHANGELOG.md b/pkgs/watcher/CHANGELOG.md index 06d8bdfba..8f53ab91c 100644 --- a/pkgs/watcher/CHANGELOG.md +++ b/pkgs/watcher/CHANGELOG.md @@ -6,6 +6,10 @@ - `DirectoryWatcher` on Windows performance: reduce 100ms buffering of events before reporting to 5ms, the larger buffer isn't needed for correctness after the various fixes. +- `DirectoryWatcher` on Windows watches in a separate Isolate to make buffer + exhaustion, "Directory watcher closed unexpectedly", much less likely. The old + implementation which does not use a separate Isolate is available as + `DirectoryWatcher(path, runInIsolateOnWindows: false)`. - Bug fix: while listing directories skip symlinks that lead to a directory that has already been listed. This prevents a severe performance regression on MacOS and Linux when there are more than a few symlink loops. diff --git a/pkgs/watcher/lib/src/directory_watcher.dart b/pkgs/watcher/lib/src/directory_watcher.dart index 9cc27eeed..a600acb81 100644 --- a/pkgs/watcher/lib/src/directory_watcher.dart +++ b/pkgs/watcher/lib/src/directory_watcher.dart @@ -8,7 +8,7 @@ import '../watcher.dart'; import 'custom_watcher_factory.dart'; import 'directory_watcher/linux.dart'; import 'directory_watcher/mac_os.dart'; -import 'directory_watcher/windows.dart'; +import 'directory_watcher/windows_resubscribable_watcher.dart'; /// Watches the contents of a directory and emits [WatchEvent]s when something /// in the directory has changed. @@ -20,6 +20,8 @@ import 'directory_watcher/windows.dart'; /// the message "Directory watcher closed unexpectedly" on the event stream. The /// code using the watcher needs to do additional work to account for the /// dropped events, for example by recomputing interesting files from scratch. +/// By default, the watcher is started in a separate isolate to make this less +/// likely. Pass `runInIsolateOnWindows = false` to not launch an isolate. /// /// On Linux, the underlying SDK `Directory.watch` fails if the system limit on /// watchers has been reached. If this happens the SDK exception is thrown, it @@ -40,7 +42,11 @@ abstract class DirectoryWatcher implements Watcher { /// shorter will give more immediate feedback at the expense of doing more IO /// and higher CPU usage. Defaults to one second. Ignored for non-polling /// watchers. - factory DirectoryWatcher(String directory, {Duration? pollingDelay}) { + /// + /// On Windows, pass [runInIsolateOnWindows] `false` to not run the watcher + /// in a separate isolate to reduce buffer exhaustion failures. + factory DirectoryWatcher(String directory, + {Duration? pollingDelay, bool runInIsolateOnWindows = true}) { if (FileSystemEntity.isWatchSupported) { var customWatcher = createCustomDirectoryWatcher( directory, @@ -49,7 +55,10 @@ abstract class DirectoryWatcher implements Watcher { if (customWatcher != null) return customWatcher; if (Platform.isLinux) return LinuxDirectoryWatcher(directory); if (Platform.isMacOS) return MacOSDirectoryWatcher(directory); - if (Platform.isWindows) return WindowsDirectoryWatcher(directory); + if (Platform.isWindows) { + return WindowsDirectoryWatcher(directory, + runInIsolate: runInIsolateOnWindows); + } } return PollingDirectoryWatcher(directory, pollingDelay: pollingDelay); } diff --git a/pkgs/watcher/lib/src/directory_watcher/directory_list.dart b/pkgs/watcher/lib/src/directory_watcher/directory_list.dart index 12d4017a1..7d84201f5 100644 --- a/pkgs/watcher/lib/src/directory_watcher/directory_list.dart +++ b/pkgs/watcher/lib/src/directory_watcher/directory_list.dart @@ -142,12 +142,17 @@ class _ResolvedDirectory { } on FileSystemException catch (e, s) { // The first operation on a directory is to resolve symbolic links, which // fails with a general FileSystemException if the file is not found. - // Convert that into a PathNotFoundException as that makes more sense - // to the caller, who didn't ask for anything to do with symbolic links. - if (e.message.contains('Cannot resolve symbolic links') && - e.osError?.errorCode == 2) { - throw Error.throwWithStackTrace( - PathNotFoundException(directory.path, e.osError!), s); + // Convert that into a PathNotFoundException or PathAccessException + // as that makes more sense to the caller, who didn't ask for anything to + // do with symbolic links. + if (e.message.contains('Cannot resolve symbolic links')) { + if (e.osError?.errorCode == 2) { + throw Error.throwWithStackTrace( + PathNotFoundException(directory.path, e.osError!), s); + } else if (e.osError?.errorCode == 5) { + throw Error.throwWithStackTrace( + PathAccessException(directory.path, e.osError!), s); + } } rethrow; } diff --git a/pkgs/watcher/lib/src/directory_watcher/windows.dart b/pkgs/watcher/lib/src/directory_watcher/windows.dart index 67586d967..27ab8bacb 100644 --- a/pkgs/watcher/lib/src/directory_watcher/windows.dart +++ b/pkgs/watcher/lib/src/directory_watcher/windows.dart @@ -22,7 +22,8 @@ class WindowsDirectoryWatcher extends ResubscribableWatcher String get directory => path; WindowsDirectoryWatcher(String directory) - : super(directory, () => _WindowsDirectoryWatcher(directory)); + : super( + directory, () => WindowsManuallyClosedDirectoryWatcher(directory)); } /// Windows directory watcher. @@ -48,7 +49,7 @@ class WindowsDirectoryWatcher extends ResubscribableWatcher /// On my machine, the test failure rate due to the type drops from 150/1000 /// at 900us to 0/10000 at 1000us. So, 1000us = 1ms is sufficient. Use 5ms to /// give a margin for error for different machine performance and load. -class _WindowsDirectoryWatcher +class WindowsManuallyClosedDirectoryWatcher implements DirectoryWatcher, ManuallyClosedWatcher { @override String get directory => path; @@ -94,7 +95,7 @@ class _WindowsDirectoryWatcher final Set> _listSubscriptions = HashSet>(); - _WindowsDirectoryWatcher(this.path) : _files = PathSet(path) { + WindowsManuallyClosedDirectoryWatcher(this.path) : _files = PathSet(path) { // Before we're ready to emit events, wait for [_listDir] to complete. _listDir().then((_) { _startWatch(); @@ -181,7 +182,9 @@ class _WindowsDirectoryWatcher event.type == EventType.createDirectory, modified: event.type == EventType.modifyFile || event.type == EventType.modifyDirectory, - deleted: event.type == EventType.delete, + deleted: event.type == EventType.delete || + event.type == EventType.moveFile || + event.type == EventType.moveDirectory, movedOnto: false); final destination = event.destination; if (destination != null) { @@ -208,13 +211,15 @@ class _WindowsDirectoryWatcher void _poll(_PendingPoll poll) { final path = poll.path; final events = _eventsBasedOnFileSystem(path, + reportCreate: poll.created || poll.movedOnto, + reportDelete: poll.deleted, // A modification can be reported due to a modification event, a // create+delete together, or if the path is a move destination. // The important case where the file is present, an event arrives // for the file and a modification is _not_ reported is when the file // was already discovered by listing a new directory, then the "add" // event for it is processed afterwards. - reportNoChangeAsModification: + reportModification: poll.modified || (poll.created && poll.deleted) || poll.movedOnto); for (final event in events) { @@ -264,10 +269,13 @@ class _WindowsDirectoryWatcher /// This returns a list whose order should be reflected in the events emitted /// to the user, unlike the batched events from [Directory.watch]. /// - /// [reportNoChangeAsModification] determines whether to report a modification - /// if there was a file at [path] and there is still a file at [path]. + /// + /// [reportCreate], [reportModification] and [reportDelete] restrict the types + /// of events that can be emitted. List _eventsBasedOnFileSystem(String path, - {required bool reportNoChangeAsModification}) { + {required bool reportCreate, + required bool reportModification, + required bool reportDelete}) { var fileExisted = _files.contains(path); var dirExisted = _files.containsDir(path); @@ -285,26 +293,26 @@ class _WindowsDirectoryWatcher var events = []; if (fileExisted) { if (fileExists) { - if (reportNoChangeAsModification) events.add(Event.modifyFile(path)); + if (reportModification) events.add(Event.modifyFile(path)); } else { - events.add(Event.delete(path)); + if (reportDelete) events.add(Event.delete(path)); } } else if (dirExisted) { if (dirExists) { // If we got contradictory events for a directory that used to exist and // still exists, we need to rescan the whole thing in case it was // replaced with a different directory. - events.add(Event.delete(path)); - events.add(Event.createDirectory(path)); + if (reportDelete) events.add(Event.delete(path)); + if (reportCreate) events.add(Event.createDirectory(path)); } else { - events.add(Event.delete(path)); + if (reportDelete) events.add(Event.delete(path)); } } if (!fileExisted && fileExists) { - events.add(Event.createFile(path)); + if (reportCreate) events.add(Event.createFile(path)); } else if (!dirExisted && dirExists) { - events.add(Event.createDirectory(path)); + if (reportCreate) events.add(Event.createDirectory(path)); } return events; diff --git a/pkgs/watcher/lib/src/directory_watcher/windows_isolate_directory_watcher.dart b/pkgs/watcher/lib/src/directory_watcher/windows_isolate_directory_watcher.dart new file mode 100644 index 000000000..422c1457b --- /dev/null +++ b/pkgs/watcher/lib/src/directory_watcher/windows_isolate_directory_watcher.dart @@ -0,0 +1,170 @@ +// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; +import 'dart:isolate'; + +import '../resubscribable.dart'; +import '../watch_event.dart'; +import 'windows.dart'; + +/// Runs [WindowsManuallyClosedDirectoryWatcher] in an isolate to work around +/// a platform limitation. +/// +/// On Windows, Directory.watch fails if too many events arrive without being +/// processed by the Dart VM. See `directory_watcher/windows_test.dart` for code +/// that reliably triggers the failure by doing file writes in a synchronous +/// block that prevents the Dart VM from processing the events caused. +/// +/// Running the watcher in an isolate makes buffer exhaustion much less likely +/// as there is no unrelated work happening in the isolate that would block +/// processing of events. +class WindowsIsolateDirectoryWatcher implements ManuallyClosedWatcher { + @override + final String path; + final ReceivePort _receivePort = ReceivePort(); + final Completer _sendPortCompleter = Completer(); + + final StreamController _eventsController = + StreamController.broadcast(); + final Completer _readyCompleter = Completer(); + + WindowsIsolateDirectoryWatcher(this.path) { + _startIsolate(path, _receivePort.sendPort); + _receivePort.listen((event) => _receiveFromIsolate(event as Event)); + } + + void _receiveFromIsolate(Event event) { + switch (event.type) { + case EventType.sendPort: + _sendPortCompleter.complete(event.sendPort); + case EventType.ready: + _readyCompleter.complete(); + case EventType.watchEvent: + _eventsController.add(event.watchEvent!); + case EventType.close: + _eventsController.close(); + _receivePort.close(); + case EventType.error: + _eventsController.addError(event.error!, event.stackTrace); + } + } + + @override + void close() { + // The "close" event is the only event sent to the isolate, just send + // `null`. + _sendPortCompleter.future.then((sendPort) => sendPort.send(null)); + } + + @override + Stream get events => _eventsController.stream; + + @override + bool get isReady => _readyCompleter.isCompleted; + + @override + Future get ready => _readyCompleter.future; +} + +/// Starts watching [path] in an isolate. +/// +/// [sendPort] is the port from isolate to host, see `_WatcherIsolate` +/// constructor implementation for the events that will be sent. +void _startIsolate(String path, SendPort sendPort) async { + unawaited( + Isolate.run(() async => await _WatcherIsolate(path, sendPort).closed)); +} + +class _WatcherIsolate { + final String path; + final WindowsManuallyClosedDirectoryWatcher watcher; + final SendPort sendPort; + + // The isolate stays open until this future completes. + Future get closed => _closeCompleter.future; + final Completer _closeCompleter = Completer(); + + _WatcherIsolate(this.path, this.sendPort) + : watcher = WindowsManuallyClosedDirectoryWatcher(path) { + final receivePort = ReceivePort(); + + // Six types of event are sent to the host. + + // The `SendPort` for host to isolate communication on startup. + sendPort.send(Event.sendPort(receivePort.sendPort)); + + // `Event.ready` when the watcher is ready. + watcher.ready.then((_) { + sendPort.send(Event.ready()); + }); + + watcher.events.listen((event) { + // The watcher events. + sendPort.send(Event.watchEvent(event)); + }, onDone: () { + // `Event.close` if the watcher event stream closes. + sendPort.send(Event.close()); + }, onError: (Object e, StackTrace s) { + // `Event.error` on error. + sendPort.send(Event.error(e, s)); + }); + + receivePort.listen((event) { + // The only event sent from the host to the isolate is "close", no need + // to check the value. + watcher.close(); + _closeCompleter.complete(); + receivePort.close(); + }); + } +} + +/// Event sent from the isolate to the host. +class Event { + final EventType type; + final SendPort? sendPort; + final WatchEvent? watchEvent; + final Object? error; + final StackTrace? stackTrace; + + Event.sendPort(this.sendPort) + : type = EventType.sendPort, + watchEvent = null, + error = null, + stackTrace = null; + + Event.ready() + : type = EventType.ready, + sendPort = null, + watchEvent = null, + error = null, + stackTrace = null; + + Event.watchEvent(this.watchEvent) + : type = EventType.watchEvent, + sendPort = null, + error = null, + stackTrace = null; + + Event.close() + : type = EventType.close, + sendPort = null, + watchEvent = null, + error = null, + stackTrace = null; + + Event.error(this.error, this.stackTrace) + : type = EventType.error, + sendPort = null, + watchEvent = null; +} + +enum EventType { + sendPort, + ready, + watchEvent, + close, + error; +} diff --git a/pkgs/watcher/lib/src/directory_watcher/windows_resubscribable_watcher.dart b/pkgs/watcher/lib/src/directory_watcher/windows_resubscribable_watcher.dart new file mode 100644 index 000000000..bfbd4a55d --- /dev/null +++ b/pkgs/watcher/lib/src/directory_watcher/windows_resubscribable_watcher.dart @@ -0,0 +1,25 @@ +// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import '../directory_watcher.dart'; +import '../resubscribable.dart'; +import 'windows.dart'; +import 'windows_isolate_directory_watcher.dart'; + +class WindowsDirectoryWatcher extends ResubscribableWatcher + implements DirectoryWatcher { + @override + String get directory => path; + + /// Watches [directory]. + /// + /// If [runInIsolate], runs the watcher in an isolate to reduce the chance of + /// hitting the Windows-specific buffer exhaustion failure. + WindowsDirectoryWatcher(String directory, {bool runInIsolate = true}) + : super( + directory, + () => runInIsolate + ? WindowsIsolateDirectoryWatcher(directory) + : WindowsManuallyClosedDirectoryWatcher(directory)); +} diff --git a/pkgs/watcher/lib/src/resubscribable.dart b/pkgs/watcher/lib/src/resubscribable.dart index b99e9d7b4..a2c37f8c2 100644 --- a/pkgs/watcher/lib/src/resubscribable.dart +++ b/pkgs/watcher/lib/src/resubscribable.dart @@ -45,6 +45,7 @@ abstract class ResubscribableWatcher implements Watcher { _eventsController = StreamController.broadcast( onListen: () async { + final completer = _readyCompleter; watcher = _factory(); subscription = watcher.events.listen(_eventsController.add, onError: _eventsController.addError, @@ -54,7 +55,7 @@ abstract class ResubscribableWatcher implements Watcher { // the time [onListen] is called, as opposed to the value when // [watcher.ready] fires. A new completer may be created by that time. await watcher.ready; - _readyCompleter.complete(); + completer.complete(); }, onCancel: () { // Cancel the subscription before closing the watcher so that the diff --git a/pkgs/watcher/test/directory_watcher/file_tests.dart b/pkgs/watcher/test/directory_watcher/file_tests.dart index d56f921d8..9d7e9450c 100644 --- a/pkgs/watcher/test/directory_watcher/file_tests.dart +++ b/pkgs/watcher/test/directory_watcher/file_tests.dart @@ -2,10 +2,12 @@ // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. +import 'dart:async'; import 'dart:io' as io; import 'dart:io'; import 'dart:isolate'; +import 'package:async/async.dart'; import 'package:test/test.dart'; import 'package:test_descriptor/test_descriptor.dart' as d; import 'package:watcher/src/utils.dart'; @@ -34,6 +36,82 @@ void _fileTests({required bool isNative}) { } }); + // ResubscribableWatcher wraps all the directory watchers to add handling of + // multiple subscribers. The underlying watcher is created when there is at + // least one subscriber and closed when there are zero subscribers. So, + // exercise that behavior in various ways. + test('ResubscribableWatcher handles multiple subscriptions ', () async { + final watcher = createWatcher(); + + // One subscription, one event, close the subscription. + final queue1 = StreamQueue(watcher.events); + final event1 = queue1.next; + await watcher.ready; + writeFile('a.txt'); + expect(await event1, isAddEvent('a.txt')); + await queue1.cancel(immediate: true); + + // Open before "ready", cancel before event. + final queue2a = StreamQueue(watcher.events); + // Open before "ready", cancel after one event. + final queue2b = StreamQueue(watcher.events); + // Open before "ready", cancel after two events. + final queue2c = StreamQueue(watcher.events); + + final queue2aHasNext = queue2a.hasNext; + unawaited(queue2a.cancel(immediate: true)); + expect(await queue2aHasNext, false); + + await watcher.ready; + + // Open after "ready", cancel before event. + final queue2d = StreamQueue(watcher.events); + + // Open after "ready", cancel after one event. + final queue2e = StreamQueue(watcher.events); + + // Open after "ready", cancel after two events. + final queue2f = StreamQueue(watcher.events); + + final queue2dHasNext = queue2d.hasNext; + unawaited(queue2d.cancel(immediate: true)); + expect(await queue2dHasNext, false); + + writeFile('b.txt'); + + expect(await queue2b.next, isAddEvent('b.txt')); + expect(await queue2c.next, isAddEvent('b.txt')); + expect(await queue2e.next, isAddEvent('b.txt')); + expect(await queue2f.next, isAddEvent('b.txt')); + final queue2bHasNext = queue2b.hasNext; + await queue2b.cancel(immediate: true); + expect(await queue2bHasNext, false); + final queue2eHasNext = queue2e.hasNext; + await queue2e.cancel(immediate: true); + expect(await queue2eHasNext, false); + + // Remaining subscriptions still get events. + writeFile('c.txt'); + expect(await queue2c.next, isAddEvent('c.txt')); + expect(await queue2f.next, isAddEvent('c.txt')); + final queue2cHasNext = queue2c.hasNext; + await queue2c.cancel(immediate: true); + expect(await queue2cHasNext, false); + final queue2fHasNext = queue2f.hasNext; + await queue2f.cancel(immediate: true); + expect(await queue2fHasNext, false); + + // Repeat the first simple test: one subscription, one event, close the + // subscription. + final queue3 = StreamQueue(watcher.events); + await watcher.ready; + writeFile('d.txt'); + expect(await queue3.next, isAddEvent('d.txt')); + final queue3HasNext = queue3.hasNext; + await queue3.cancel(immediate: true); + expect(await queue3HasNext, false); + }); + test('does not notify for files that already exist when started', () async { // Make some pre-existing files. writeFile('a.txt'); diff --git a/pkgs/watcher/test/directory_watcher/windows_isolate_test.dart b/pkgs/watcher/test/directory_watcher/windows_isolate_test.dart new file mode 100644 index 000000000..11aee06c0 --- /dev/null +++ b/pkgs/watcher/test/directory_watcher/windows_isolate_test.dart @@ -0,0 +1,115 @@ +// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +@TestOn('windows') +@Timeout.factor(2) +library; + +import 'dart:async'; +import 'dart:io'; + +import 'package:test/test.dart'; +import 'package:watcher/watcher.dart'; + +void main() { + // The Windows native watcher has a buffer that gets exhausted if events are + // not handled quickly enough. Then, it throws an error and stops watching. + // The exhaustion is reliably triggered if enough events arrive during a sync + // block. The `package:watcher` implementation tries to catch this and recover + // by starting a new watcher. + for (final runInIsolate in [false, true]) { + late StreamSubscription subscription; + late Directory temp; + late int eventsSeen; + late int errorsSeen; + late int totalErrorsSeen; + + setUp(() async { + temp = Directory.systemTemp.createTempSync(); + final watcher = + DirectoryWatcher(temp.path, runInIsolateOnWindows: runInIsolate); + + eventsSeen = 0; + errorsSeen = 0; + totalErrorsSeen = 0; + subscription = watcher.events.listen( + (e) { + ++eventsSeen; + }, + onError: (_, __) { + ++errorsSeen; + }, + ); + await watcher.ready; + }); + + tearDown(() { + subscription.cancel(); + }); + + test( + runInIsolate + ? 'No buffer exhaustion if running in isolate' + : 'Recover from buffer exhaustion if not running in isolate', + () async { + // Use a long filename to fill the buffer. + final file = File('${temp.path}\\file'.padRight(255, 'a')); + + // Repeatedly trigger buffer exhaustion, to check that recovery is + // reliable. + for (var times = 0; times != 200; ++times) { + errorsSeen = 0; + eventsSeen = 0; + + // Syncronously trigger 100 events. Because this is a sync block, the VM + // won't handle the events, so this has a very high chance of triggering + // a buffer exhaustion. + // + // If a buffer exhaustion happens, `package:watcher` turns this into an + // error on the event stream, so `errorsSeen` will get incremented once. + // The number of changes 200 is chosen so this is very likely to happen. + // If there is _not_ an exhaustion, the 200 events will show on the + // stream as a single event because they are changes of the same file. + // So, `eventsSeen` will instead be incremented once. + for (var i = 0; i != 200; ++i) { + file.writeAsStringSync(''); + } + + // Events only happen when there is an async gap, wait for such a gap. + // The event usually arrives in under 10ms, try for 100ms. + var tries = 0; + while (errorsSeen == 0 && eventsSeen == 0 && tries < 10) { + await Future.delayed(const Duration(milliseconds: 10)); + ++tries; + } + + totalErrorsSeen += errorsSeen; + + // If everything is going well, there should have been either one event + // seen or one error seen. + if (errorsSeen == 0 && eventsSeen == 0) { + // It looks like the watcher is now broken: there were file changes + // but no event and no error. Do some non-sync writes to confirm + // whether the watcher really is now broken. + for (var i = 0; i != 5; ++i) { + await file.writeAsString(''); + } + await Future.delayed(const Duration(milliseconds: 10)); + fail( + 'On attempt ${times + 1}, watcher registered nothing. ' + 'On retry, it registered: $errorsSeen error(s), $eventsSeen ' + 'event(s).', + ); + } + } + + // Buffer exhaustion is likely without the isolate but not guaranteed. + if (runInIsolate) { + expect(totalErrorsSeen, 0); + } else { + expect(totalErrorsSeen, greaterThan(150)); + } + }); + } +} diff --git a/pkgs/watcher/test/directory_watcher/windows_test.dart b/pkgs/watcher/test/directory_watcher/windows_test.dart index 4bf0d4bde..fd9e3defc 100644 --- a/pkgs/watcher/test/directory_watcher/windows_test.dart +++ b/pkgs/watcher/test/directory_watcher/windows_test.dart @@ -11,7 +11,7 @@ import 'dart:io'; import 'package:path/path.dart' as p; import 'package:test/test.dart'; -import 'package:watcher/src/directory_watcher/windows.dart'; +import 'package:watcher/src/directory_watcher/windows_resubscribable_watcher.dart'; import 'package:watcher/watcher.dart'; import '../utils.dart'; @@ -116,88 +116,4 @@ void main() { expect(errorsSeen, 0); }); }); - - // The Windows native watcher has a buffer that gets exhausted if events are - // not handled quickly enough. Then, it throws an error and stops watching. - // The exhaustion is reliably triggered if enough events arrive during a sync - // block. The `package:watcher` implementation tries to catch this and recover - // by starting a new watcher. - group('Buffer exhaustion', () { - late StreamSubscription subscription; - late Directory temp; - late int eventsSeen; - late int errorsSeen; - - setUp(() async { - temp = Directory.systemTemp.createTempSync(); - final watcher = DirectoryWatcher(temp.path); - - eventsSeen = 0; - errorsSeen = 0; - subscription = watcher.events.listen( - (e) { - ++eventsSeen; - }, - onError: (_, __) { - ++errorsSeen; - }, - ); - await watcher.ready; - }); - - tearDown(() { - subscription.cancel(); - }); - - test('recovery', () async { - // Use a long filename to fill the buffer. - final file = File('${temp.path}\\file'.padRight(255, 'a')); - - // Repeatedly trigger buffer exhaustion, to check that recovery is - // reliable. - for (var times = 0; times != 200; ++times) { - errorsSeen = 0; - eventsSeen = 0; - - // Syncronously trigger 200 events. Because this is a sync block, the VM - // won't handle the events, so this has a very high chance of triggering - // a buffer exhaustion. - // - // If a buffer exhaustion happens, `package:watcher` turns this into an - // error on the event stream, so `errorsSeen` will get incremented once. - // The number of changes 200 is chosen so this is very likely to happen. - // If there is _not_ an exhaustion, the 200 events will show on the - // stream as a single event because they are changes of the same file. - // So, `eventsSeen` will instead be incremented once. - for (var i = 0; i != 200; ++i) { - file.writeAsStringSync(''); - } - - // Events only happen when there is an async gap, wait for such a gap. - // The event usually arrives in under 10ms, try for 100ms. - var tries = 0; - while (errorsSeen == 0 && eventsSeen == 0 && tries < 10) { - await Future.delayed(const Duration(milliseconds: 10)); - ++tries; - } - - // If everything is going well, there should have been either one event - // seen or one error seen. - if (errorsSeen == 0 && eventsSeen == 0) { - // It looks like the watcher is now broken: there were file changes - // but no event and no error. Do some non-sync writes to confirm - // whether the watcher really is now broken. - for (var i = 0; i != 5; ++i) { - await file.writeAsString(''); - } - await Future.delayed(const Duration(milliseconds: 10)); - fail( - 'On attempt ${times + 1}, watcher registered nothing. ' - 'On retry, it registered: $errorsSeen error(s), $eventsSeen ' - 'event(s).', - ); - } - } - }); - }); } diff --git a/pkgs/watcher/test/no_subscription/windows_test.dart b/pkgs/watcher/test/no_subscription/windows_test.dart index 9f9e5a9c3..985aa68d1 100644 --- a/pkgs/watcher/test/no_subscription/windows_test.dart +++ b/pkgs/watcher/test/no_subscription/windows_test.dart @@ -6,7 +6,7 @@ library; import 'package:test/test.dart'; -import 'package:watcher/src/directory_watcher/windows.dart'; +import 'package:watcher/src/directory_watcher/windows_resubscribable_watcher.dart'; import '../utils.dart'; import 'shared.dart'; diff --git a/pkgs/watcher/test/ready/windows_test.dart b/pkgs/watcher/test/ready/windows_test.dart index 9f9e5a9c3..985aa68d1 100644 --- a/pkgs/watcher/test/ready/windows_test.dart +++ b/pkgs/watcher/test/ready/windows_test.dart @@ -6,7 +6,7 @@ library; import 'package:test/test.dart'; -import 'package:watcher/src/directory_watcher/windows.dart'; +import 'package:watcher/src/directory_watcher/windows_resubscribable_watcher.dart'; import '../utils.dart'; import 'shared.dart';