diff --git a/pkgs/watcher/CHANGELOG.md b/pkgs/watcher/CHANGELOG.md index 340de8b08..839bce4cf 100644 --- a/pkgs/watcher/CHANGELOG.md +++ b/pkgs/watcher/CHANGELOG.md @@ -1,5 +1,9 @@ ## 1.1.5-wip +- `DirectoryWatcher` on Windows watches in a separate Isolate to make buffer + exhaustion, "Directory watcher closed unexpectedly", much less likely. The old + implementation which does not use a separate Isolate is available as + `DirectoryWatcher(path, runInIsolateOnWindows: false)`. - Polling watchers now check file sizes as well as "last modified" times, so they are less likely to miss changes on platforms with low resolution timestamps. diff --git a/pkgs/watcher/lib/src/directory_watcher.dart b/pkgs/watcher/lib/src/directory_watcher.dart index 8caf09f8a..0348b6b71 100644 --- a/pkgs/watcher/lib/src/directory_watcher.dart +++ b/pkgs/watcher/lib/src/directory_watcher.dart @@ -20,6 +20,8 @@ import 'directory_watcher/windows.dart'; /// the message "Directory watcher closed unexpectedly" on the event stream. The /// code using the watcher needs to do additional work to account for the /// dropped events, for example by recomputing interesting files from scratch. +/// By default, the watcher is started in a separate isolate to make this less +/// likely. Pass `runInIsolateOnWindows = false` to not launch an isolate. abstract class DirectoryWatcher implements Watcher { /// The directory whose contents are being monitored. @Deprecated('Expires in 1.0.0. Use DirectoryWatcher.path instead.') @@ -35,7 +37,11 @@ abstract class DirectoryWatcher implements Watcher { /// shorter will give more immediate feedback at the expense of doing more IO /// and higher CPU usage. Defaults to one second. Ignored for non-polling /// watchers. - factory DirectoryWatcher(String directory, {Duration? pollingDelay}) { + /// + /// On Windows, pass [runInIsolateOnWindows] `false` to not run the watcher + /// in a separate isolate to reduce buffer exhaustion failures. + factory DirectoryWatcher(String directory, + {Duration? pollingDelay, bool runInIsolateOnWindows = true}) { if (FileSystemEntity.isWatchSupported) { var customWatcher = createCustomDirectoryWatcher( directory, @@ -44,7 +50,10 @@ abstract class DirectoryWatcher implements Watcher { if (customWatcher != null) return customWatcher; if (Platform.isLinux) return LinuxDirectoryWatcher(directory); if (Platform.isMacOS) return MacOSDirectoryWatcher(directory); - if (Platform.isWindows) return WindowsDirectoryWatcher(directory); + if (Platform.isWindows) { + return WindowsDirectoryWatcher(directory, + runInIsolate: runInIsolateOnWindows); + } } return PollingDirectoryWatcher(directory, pollingDelay: pollingDelay); } diff --git a/pkgs/watcher/lib/src/directory_watcher/windows.dart b/pkgs/watcher/lib/src/directory_watcher/windows.dart index c8d1af012..e262a13f1 100644 --- a/pkgs/watcher/lib/src/directory_watcher/windows.dart +++ b/pkgs/watcher/lib/src/directory_watcher/windows.dart @@ -1,472 +1,24 @@ -// Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file +// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. -// TODO(rnystrom): Merge with mac_os version. - -import 'dart:async'; -import 'dart:collection'; -import 'dart:io'; - -import 'package:path/path.dart' as p; import '../directory_watcher.dart'; -import '../event.dart'; -import '../path_set.dart'; import '../resubscribable.dart'; -import '../utils.dart'; -import '../watch_event.dart'; +import 'windows_isolate_directory_watcher.dart'; +import 'windows_manually_closed_directory_watcher.dart'; class WindowsDirectoryWatcher extends ResubscribableWatcher implements DirectoryWatcher { @override String get directory => path; - WindowsDirectoryWatcher(String directory) - : super(directory, () => _WindowsDirectoryWatcher(directory)); -} - -class _EventBatcher { - static const Duration _batchDelay = Duration(milliseconds: 100); - final List events = []; - Timer? timer; - - void addEvent(Event event, void Function() callback) { - events.add(event); - timer?.cancel(); - timer = Timer(_batchDelay, callback); - } - - void cancelTimer() { - timer?.cancel(); - } -} - -class _WindowsDirectoryWatcher - implements DirectoryWatcher, ManuallyClosedWatcher { - @override - String get directory => path; - @override - final String path; - - @override - Stream get events => _eventsController.stream; - final _eventsController = StreamController.broadcast(); - - @override - bool get isReady => _readyCompleter.isCompleted; - - @override - Future get ready => _readyCompleter.future; - final _readyCompleter = Completer(); - - final Map _eventBatchers = - HashMap(); - - /// The set of files that are known to exist recursively within the watched - /// directory. - /// - /// The state of files on the filesystem is compared against this to determine - /// the real change that occurred. This is also used to emit REMOVE events - /// when subdirectories are moved out of the watched directory. - final PathSet _files; - - /// The subscription to the stream returned by [Directory.watch]. - StreamSubscription? _watchSubscription; - - /// The subscription to the stream returned by [Directory.watch] of the - /// parent directory to [directory]. This is needed to detect changes to - /// [directory], as they are not included on Windows. - StreamSubscription? _parentWatchSubscription; - - /// The subscription to the [Directory.list] call for the initial listing of - /// the directory to determine its initial state. - StreamSubscription? _initialListSubscription; - - /// The subscriptions to the [Directory.list] calls for listing the contents - /// of subdirectories that were moved into the watched directory. - final Set> _listSubscriptions = - HashSet>(); - - _WindowsDirectoryWatcher(this.path) : _files = PathSet(path) { - // Before we're ready to emit events, wait for [_listDir] to complete. - _listDir().then((_) { - _startWatch(); - _startParentWatcher(); - if (!isReady) { - _readyCompleter.complete(); - } - }); - } - - @override - void close() { - _watchSubscription?.cancel(); - _parentWatchSubscription?.cancel(); - _initialListSubscription?.cancel(); - for (var sub in _listSubscriptions) { - sub.cancel(); - } - _listSubscriptions.clear(); - for (var batcher in _eventBatchers.values) { - batcher.cancelTimer(); - } - _eventBatchers.clear(); - _watchSubscription = null; - _parentWatchSubscription = null; - _initialListSubscription = null; - _eventsController.close(); - } - - /// On Windows, if [directory] is deleted, we will not receive any event. + /// Watches [directory]. /// - /// Instead, we add a watcher on the parent folder (if any), that can notify - /// us about [path]. This also includes events such as moves. - void _startParentWatcher() { - var absoluteDir = p.absolute(path); - var parent = p.dirname(absoluteDir); - try { - // Check if [path] is already the root directory. - if (FileSystemEntity.identicalSync(parent, path)) return; - } on FileSystemException catch (_) { - // Either parent or path or both might be gone due to concurrently - // occurring changes. Just ignore and continue. If we fail to - // watch path we will report an error from _startWatch. - return; - } - var parentStream = Directory(parent).watch(recursive: false); - _parentWatchSubscription = parentStream.listen( - (event) { - // Only look at events for 'directory'. - if (p.basename(event.path) != p.basename(absoluteDir)) return; - // Test if the directory is removed. FileSystemEntity.typeSync will - // return NOT_FOUND if it's unable to decide upon the type, including - // access denied issues, which may happen when the directory is deleted. - // FileSystemMoveEvent and FileSystemDeleteEvent events will always mean - // the directory is now gone. - if (event is FileSystemMoveEvent || - event is FileSystemDeleteEvent || - (FileSystemEntity.typeSync(path) == - FileSystemEntityType.notFound)) { - for (var path in _files.paths) { - _emitEvent(ChangeType.REMOVE, path); - } - _files.clear(); - close(); - } - }, - onError: (error) { - // Ignore errors, simply close the stream. The user listens on - // [directory], and while it can fail to listen on the parent, we may - // still be able to listen on the path requested. - _parentWatchSubscription?.cancel(); - _parentWatchSubscription = null; - }, - ); - } - - void _onEvent(FileSystemEvent fileSystemEvent) { - assert(isReady); - final event = Event.checkAndConvert(fileSystemEvent); - if (event == null) return; - if (event.type == EventType.moveFile) { - _batchEvent(Event.delete(event.path)); - final destination = event.destination; - if (destination != null) { - _batchEvent(Event.createFile(destination)); - } - } else if (event.type == EventType.moveDirectory) { - _batchEvent(Event.delete(event.path)); - final destination = event.destination; - if (destination != null) { - _batchEvent(Event.createDirectory(destination)); - } - } else { - _batchEvent(event); - } - } - - void _batchEvent(Event event) { - final batcher = _eventBatchers.putIfAbsent(event.path, _EventBatcher.new); - batcher.addEvent(event, () { - _eventBatchers.remove(event.path); - _onBatch(batcher.events); - }); - } - - /// The callback that's run when [Directory.watch] emits a batch of events. - void _onBatch(List batch) { - _sortEvents(batch).forEach((path, eventSet) { - var canonicalEvent = _canonicalEvent(eventSet); - var events = canonicalEvent == null - ? _eventsBasedOnFileSystem(path) - : [canonicalEvent]; - - for (var event in events) { - switch (event.type) { - case EventType.createFile: - if (_files.contains(path)) continue; - _emitEvent(ChangeType.ADD, path); - _files.add(path); - - case EventType.createDirectory: - if (_files.containsDir(path)) continue; - - // "Path not found" can be caused by creating then quickly removing - // a directory: continue without reporting an error. Nested files - // that get removed during the `list` are already ignored by `list` - // itself, so there are no other types of "path not found" that - // might need different handling here. - var stream = Directory(path) - .list(recursive: true) - .ignoring(); - var subscription = stream.listen((entity) { - if (entity is Directory) return; - if (_files.contains(entity.path)) return; - - _emitEvent(ChangeType.ADD, entity.path); - _files.add(entity.path); - }, cancelOnError: true); - subscription.onDone(() { - _listSubscriptions.remove(subscription); - }); - subscription.onError((Object e, StackTrace stackTrace) { - _listSubscriptions.remove(subscription); - _emitError(e, stackTrace); - }); - _listSubscriptions.add(subscription); - - case EventType.modifyFile: - _emitEvent(ChangeType.MODIFY, path); - - case EventType.delete: - for (var removedPath in _files.remove(path)) { - _emitEvent(ChangeType.REMOVE, removedPath); - } - - // Move events are removed by `_onEvent` and never returned by - // `_eventsBasedOnFileSystem`. - case EventType.moveFile: - case EventType.moveDirectory: - throw StateError(event.type.name); - - // Dropped by [Event.checkAndConvert]. - case EventType.modifyDirectory: - assert(event.type.isIgnoredOnWindows); - } - } - }); - } - - /// Sort all the events in a batch into sets based on their path. - Map> _sortEvents(List batch) { - var eventsForPaths = >{}; - - // On Windows new links to directories are sometimes reported by - // Directory.watch as directories. On all other platforms it reports them - // consistently as files. See https://github.com/dart-lang/sdk/issues/61797. - // - // The wrong type is because Windows creates links to directories as actual - // directories, then converts them to links. Directory.watch sometimes - // checks the type too early and gets the wrong result. - // - // The batch delay is plenty for the link to be fully created, so verify the - // file system entity type for all createDirectory` events, converting to - // `createFile` when needed. - for (var i = 0; i != batch.length; ++i) { - final event = batch[i]; - if (event.type == EventType.createDirectory) { - if (FileSystemEntity.typeSync(event.path, followLinks: false) == - FileSystemEntityType.link) { - batch[i] = Event.createFile(event.path); - } - } - } - - // Events within directories that already have create events are not needed - // as the directory's full content will be listed. - var createdDirectories = unionAll(batch.map((event) { - return event.type == EventType.createDirectory - ? {event.path} - : const {}; - })); - - bool isInCreatedDirectory(String path) => - createdDirectories.any((dir) => path != dir && p.isWithin(dir, path)); - - void addEvent(String path, Event event) { - if (isInCreatedDirectory(path)) return; - eventsForPaths.putIfAbsent(path, () => {}).add(event); - } - - for (var event in batch) { - addEvent(event.path, event); - } - - return eventsForPaths; - } - - /// Returns the canonical event from a batch of events on the same path, or - /// `null` to indicate that the filesystem should be checked. - Event? _canonicalEvent(Set batch) { - // If the batch is empty, return `null`. - if (batch.isEmpty) return null; - - // Resolve the event type for the batch. - var types = batch.map((e) => e.type).toSet(); - EventType type; - if (types.length == 1) { - // There's only one event. - type = types.single; - } else if (types.length == 2 && - types.contains(EventType.modifyFile) && - types.contains(EventType.createFile)) { - // Combine events of type [EventType.modifyFile] and - // [EventType.createFile] to one event. - type = EventType.createFile; - } else { - // There are incompatible event types, check the filesystem. - return null; - } - - return batch.firstWhere((e) => e.type == type); - } - - /// Returns zero or more events that describe the change between the last - /// known state of [path] and its current state on the filesystem. /// - /// This returns a list whose order should be reflected in the events emitted - /// to the user, unlike the batched events from [Directory.watch]. The - /// returned list may be empty, indicating that no changes occurred to [path] - /// (probably indicating that it was created and then immediately deleted). - List _eventsBasedOnFileSystem(String path) { - var fileExisted = _files.contains(path); - var dirExisted = _files.containsDir(path); - - bool fileExists; - bool dirExists; - try { - fileExists = File(path).existsSync(); - dirExists = Directory(path).existsSync(); - } on FileSystemException { - return const []; - } - - var events = []; - if (fileExisted) { - if (fileExists) { - events.add(Event.modifyFile(path)); - } else { - events.add(Event.delete(path)); - } - } else if (dirExisted) { - if (dirExists) { - // If we got contradictory events for a directory that used to exist and - // still exists, we need to rescan the whole thing in case it was - // replaced with a different directory. - events.add(Event.delete(path)); - events.add(Event.createDirectory(path)); - } else { - events.add(Event.delete(path)); - } - } - - if (!fileExisted && fileExists) { - events.add(Event.createFile(path)); - } else if (!dirExisted && dirExists) { - events.add(Event.createDirectory(path)); - } - - return events; - } - - /// The callback that's run when the [Directory.watch] stream is closed. - /// Note that this is unlikely to happen on Windows, unless the system itself - /// closes the handle. - void _onDone() { - _watchSubscription = null; - - // Emit remove events for any remaining files. - for (var file in _files.paths) { - _emitEvent(ChangeType.REMOVE, file); - } - _files.clear(); - close(); - } - - /// Start or restart the underlying [Directory.watch] stream. - void _startWatch() { - // Note: in older SDKs "watcher closed" exceptions might not get sent over - // the stream returned by watch, and must be caught via a zone handler. - runZonedGuarded( - () { - var innerStream = Directory(path).watch(recursive: true); - _watchSubscription = innerStream.listen( - _onEvent, - onError: _restartWatchOnOverflowOr(_eventsController.addError), - onDone: _onDone, - ); - }, - _restartWatchOnOverflowOr((error, stackTrace) { - // ignore: only_throw_errors - throw error; - }), - ); - } - - void Function(Object, StackTrace) _restartWatchOnOverflowOr( - void Function(Object, StackTrace) otherwise) { - return (Object error, StackTrace stackTrace) async { - if (error is FileSystemException && - error.message.startsWith('Directory watcher closed unexpectedly')) { - // Wait to work around https://github.com/dart-lang/sdk/issues/61378. - // Give the VM time to reset state after the error. See the issue for - // more discussion of the workaround. - await _watchSubscription?.cancel(); - await Future.delayed(const Duration(milliseconds: 1)); - _eventsController.addError(error, stackTrace); - _startWatch(); - } else { - otherwise(error, stackTrace); - } - }; - } - - /// Starts or restarts listing the watched directory to get an initial picture - /// of its state. - Future _listDir() { - assert(!isReady); - _initialListSubscription?.cancel(); - - _files.clear(); - var completer = Completer(); - var stream = Directory(path).listRecursivelyIgnoringErrors(); - void handleEntity(FileSystemEntity entity) { - if (entity is! Directory) _files.add(entity.path); - } - - _initialListSubscription = stream.listen( - handleEntity, - onError: _emitError, - onDone: completer.complete, - cancelOnError: true, - ); - return completer.future; - } - - /// Emit an event with the given [type] and [path]. - void _emitEvent(ChangeType type, String path) { - if (!isReady) return; - - _eventsController.add(WatchEvent(type, path)); - } - - /// Emit an error, then close the watcher. - void _emitError(Object error, StackTrace stackTrace) { - // Guarantee that ready always completes. - if (!isReady) { - _readyCompleter.complete(); - } - _eventsController.addError(error, stackTrace); - close(); - } + WindowsDirectoryWatcher(String directory, {bool runInIsolate = true}) + : super( + directory, + () => runInIsolate + ? WindowsIsolateDirectoryWatcher(directory) + : WindowsManuallyClosedDirectoryWatcher(directory)); } diff --git a/pkgs/watcher/lib/src/directory_watcher/windows_isolate_directory_watcher.dart b/pkgs/watcher/lib/src/directory_watcher/windows_isolate_directory_watcher.dart new file mode 100644 index 000000000..a4e2b7ad0 --- /dev/null +++ b/pkgs/watcher/lib/src/directory_watcher/windows_isolate_directory_watcher.dart @@ -0,0 +1,170 @@ +// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; +import 'dart:isolate'; + +import '../resubscribable.dart'; +import '../watch_event.dart'; +import 'windows_manually_closed_directory_watcher.dart'; + +/// Runs [WindowsManuallyClosedDirectoryWatcher] in an isolate to work around +/// a platform limitation. +/// +/// On Windows, Directory.watch fails if too many events arrive without being +/// processed by the Dart VM. See `directory_watcher/windows_test.dart` for code +/// that reliably triggers the failure by doing file writes in a synchronous +/// block that prevents the Dart VM from processing the events caused. +/// +/// Running the watcher in an isolate makes buffer exhaustion much less likely +/// as there is no unrelated work happening in the isolate that would block +/// processing of events. +class WindowsIsolateDirectoryWatcher implements ManuallyClosedWatcher { + @override + final String path; + final ReceivePort _receivePort = ReceivePort(); + final Completer _sendPortCompleter = Completer(); + + final StreamController _eventsController = + StreamController.broadcast(); + final Completer _readyCompleter = Completer(); + + WindowsIsolateDirectoryWatcher(this.path) { + _startIsolate(path, _receivePort.sendPort); + _receivePort.listen((event) => _receiveFromIsolate(event as Event)); + } + + void _receiveFromIsolate(Event event) { + switch (event.type) { + case EventType.sendPort: + _sendPortCompleter.complete(event.sendPort); + case EventType.ready: + _readyCompleter.complete(); + case EventType.watchEvent: + _eventsController.add(event.watchEvent!); + case EventType.close: + _eventsController.close(); + _receivePort.close(); + case EventType.error: + _eventsController.addError(event.error!, event.stackTrace); + } + } + + @override + void close() { + // The "close" event is the only event sent to the isolate, just send + // `null`. + _sendPortCompleter.future.then((sendPort) => sendPort.send(null)); + } + + @override + Stream get events => _eventsController.stream; + + @override + bool get isReady => _readyCompleter.isCompleted; + + @override + Future get ready => _readyCompleter.future; +} + +/// Starts watching [path] in an isolate. +/// +/// [sendPort] is the port from isolate to host, see `_WatcherIsolate` +/// constructor implementation for the events that will be sent. +void _startIsolate(String path, SendPort sendPort) async { + unawaited( + Isolate.run(() async => await _WatcherIsolate(path, sendPort).closed)); +} + +class _WatcherIsolate { + final String path; + final WindowsManuallyClosedDirectoryWatcher watcher; + final SendPort sendPort; + + // The isolate stays open until this future completes. + Future get closed => _closeCompleter.future; + final Completer _closeCompleter = Completer(); + + _WatcherIsolate(this.path, this.sendPort) + : watcher = WindowsManuallyClosedDirectoryWatcher(path) { + final receivePort = ReceivePort(); + + // Six types of event are sent to the host. + + // The `SendPort` for host to isolate communication on startup. + sendPort.send(Event.sendPort(receivePort.sendPort)); + + // `Event.ready` when the watcher is ready. + watcher.ready.then((_) { + sendPort.send(Event.ready()); + }); + + watcher.events.listen((event) { + // The watcher events. + sendPort.send(Event.watchEvent(event)); + }, onDone: () { + // `Event.close` if the watcher event stream closes. + sendPort.send(Event.close()); + }, onError: (Object e, StackTrace s) { + // `Event.error` on error. + sendPort.send(Event.error(e, s)); + }); + + receivePort.listen((event) { + // The only event sent from the host to the isolate is "close", no need + // to check the value. + watcher.close(); + _closeCompleter.complete(); + receivePort.close(); + }); + } +} + +/// Event sent from the isolate to the host. +class Event { + final EventType type; + final SendPort? sendPort; + final WatchEvent? watchEvent; + final Object? error; + final StackTrace? stackTrace; + + Event.sendPort(this.sendPort) + : type = EventType.sendPort, + watchEvent = null, + error = null, + stackTrace = null; + + Event.ready() + : type = EventType.ready, + sendPort = null, + watchEvent = null, + error = null, + stackTrace = null; + + Event.watchEvent(this.watchEvent) + : type = EventType.watchEvent, + sendPort = null, + error = null, + stackTrace = null; + + Event.close() + : type = EventType.close, + sendPort = null, + watchEvent = null, + error = null, + stackTrace = null; + + Event.error(this.error, this.stackTrace) + : type = EventType.error, + sendPort = null, + watchEvent = null; +} + +enum EventType { + sendPort, + ready, + watchEvent, + close, + error; +} diff --git a/pkgs/watcher/lib/src/directory_watcher/windows_manually_closed_directory_watcher.dart b/pkgs/watcher/lib/src/directory_watcher/windows_manually_closed_directory_watcher.dart new file mode 100644 index 000000000..b36551feb --- /dev/null +++ b/pkgs/watcher/lib/src/directory_watcher/windows_manually_closed_directory_watcher.dart @@ -0,0 +1,458 @@ +// Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; +import 'dart:collection'; +import 'dart:io'; + +import 'package:path/path.dart' as p; + +import '../event.dart'; +import '../path_set.dart'; +import '../resubscribable.dart'; +import '../utils.dart'; +import '../watch_event.dart'; + +class _EventBatcher { + static const Duration _batchDelay = Duration(milliseconds: 100); + final List events = []; + Timer? timer; + + void addEvent(Event event, void Function() callback) { + events.add(event); + timer?.cancel(); + timer = Timer(_batchDelay, callback); + } + + void cancelTimer() { + timer?.cancel(); + } +} + +class WindowsManuallyClosedDirectoryWatcher implements ManuallyClosedWatcher { + @override + final String path; + + @override + Stream get events => _eventsController.stream; + final _eventsController = StreamController.broadcast(); + + @override + bool get isReady => _readyCompleter.isCompleted; + + @override + Future get ready => _readyCompleter.future; + final _readyCompleter = Completer(); + + final Map _eventBatchers = + HashMap(); + + /// The set of files that are known to exist recursively within the watched + /// directory. + /// + /// The state of files on the filesystem is compared against this to determine + /// the real change that occurred. This is also used to emit REMOVE events + /// when subdirectories are moved out of the watched directory. + final PathSet _files; + + /// The subscription to the stream returned by [Directory.watch]. + StreamSubscription? _watchSubscription; + + /// The subscription to the stream returned by [Directory.watch] of the + /// parent directory to [path]. This is needed to detect changes to + /// [path], as they are not included on Windows. + StreamSubscription? _parentWatchSubscription; + + /// The subscription to the [Directory.list] call for the initial listing of + /// the directory to determine its initial state. + StreamSubscription? _initialListSubscription; + + /// The subscriptions to the [Directory.list] calls for listing the contents + /// of subdirectories that were moved into the watched directory. + final Set> _listSubscriptions = + HashSet>(); + + WindowsManuallyClosedDirectoryWatcher(this.path) : _files = PathSet(path) { + // Before we're ready to emit events, wait for [_listDir] to complete. + _listDir().then((_) { + _startWatch(); + _startParentWatcher(); + if (!isReady) { + _readyCompleter.complete(); + } + }); + } + + @override + void close() { + _watchSubscription?.cancel(); + _parentWatchSubscription?.cancel(); + _initialListSubscription?.cancel(); + for (var sub in _listSubscriptions) { + sub.cancel(); + } + _listSubscriptions.clear(); + for (var batcher in _eventBatchers.values) { + batcher.cancelTimer(); + } + _eventBatchers.clear(); + _watchSubscription = null; + _parentWatchSubscription = null; + _initialListSubscription = null; + _eventsController.close(); + } + + /// On Windows, if [path] is deleted, we will not receive any event. + /// + /// Instead, we add a watcher on the parent folder (if any), that can notify + /// us about [path]. This also includes events such as moves. + void _startParentWatcher() { + var absoluteDir = p.absolute(path); + var parent = p.dirname(absoluteDir); + try { + // Check if [path] is already the root directory. + if (FileSystemEntity.identicalSync(parent, path)) return; + } on FileSystemException catch (_) { + // Either parent or path or both might be gone due to concurrently + // occurring changes. Just ignore and continue. If we fail to + // watch path we will report an error from _startWatch. + return; + } + var parentStream = Directory(parent).watch(recursive: false); + _parentWatchSubscription = parentStream.listen( + (event) { + // Only look at events for 'directory'. + if (p.basename(event.path) != p.basename(absoluteDir)) return; + // Test if the directory is removed. FileSystemEntity.typeSync will + // return NOT_FOUND if it's unable to decide upon the type, including + // access denied issues, which may happen when the directory is deleted. + // FileSystemMoveEvent and FileSystemDeleteEvent events will always mean + // the directory is now gone. + if (event is FileSystemMoveEvent || + event is FileSystemDeleteEvent || + (FileSystemEntity.typeSync(path) == + FileSystemEntityType.notFound)) { + for (var path in _files.paths) { + _emitEvent(ChangeType.REMOVE, path); + } + _files.clear(); + close(); + } + }, + onError: (error) { + // Ignore errors, simply close the stream. The user listens on + // [directory], and while it can fail to listen on the parent, we may + // still be able to listen on the path requested. + _parentWatchSubscription?.cancel(); + _parentWatchSubscription = null; + }, + ); + } + + void _onEvent(FileSystemEvent fileSystemEvent) { + assert(isReady); + final event = Event.checkAndConvert(fileSystemEvent); + if (event == null) return; + if (event.type == EventType.moveFile) { + _batchEvent(Event.delete(event.path)); + final destination = event.destination; + if (destination != null) { + _batchEvent(Event.createFile(destination)); + } + } else if (event.type == EventType.moveDirectory) { + _batchEvent(Event.delete(event.path)); + final destination = event.destination; + if (destination != null) { + _batchEvent(Event.createDirectory(destination)); + } + } else { + _batchEvent(event); + } + } + + void _batchEvent(Event event) { + final batcher = _eventBatchers.putIfAbsent(event.path, _EventBatcher.new); + batcher.addEvent(event, () { + _eventBatchers.remove(event.path); + _onBatch(batcher.events); + }); + } + + /// The callback that's run when [Directory.watch] emits a batch of events. + void _onBatch(List batch) { + _sortEvents(batch).forEach((path, eventSet) { + var canonicalEvent = _canonicalEvent(eventSet); + var events = canonicalEvent == null + ? _eventsBasedOnFileSystem(path) + : [canonicalEvent]; + + for (var event in events) { + switch (event.type) { + case EventType.createFile: + if (_files.contains(path)) continue; + _emitEvent(ChangeType.ADD, path); + _files.add(path); + + case EventType.createDirectory: + if (_files.containsDir(path)) continue; + + // "Path not found" can be caused by creating then quickly removing + // a directory: continue without reporting an error. Nested files + // that get removed during the `list` are already ignored by `list` + // itself, so there are no other types of "path not found" that + // might need different handling here. + var stream = Directory(path) + .list(recursive: true) + .ignoring(); + var subscription = stream.listen((entity) { + if (entity is Directory) return; + if (_files.contains(entity.path)) return; + + _emitEvent(ChangeType.ADD, entity.path); + _files.add(entity.path); + }, cancelOnError: true); + subscription.onDone(() { + _listSubscriptions.remove(subscription); + }); + subscription.onError((Object e, StackTrace stackTrace) { + _listSubscriptions.remove(subscription); + _emitError(e, stackTrace); + }); + _listSubscriptions.add(subscription); + + case EventType.modifyFile: + _emitEvent(ChangeType.MODIFY, path); + + case EventType.delete: + for (var removedPath in _files.remove(path)) { + _emitEvent(ChangeType.REMOVE, removedPath); + } + + // Move events are removed by `_onEvent` and never returned by + // `_eventsBasedOnFileSystem`. + case EventType.moveFile: + case EventType.moveDirectory: + throw StateError(event.type.name); + + // Dropped by [Event.checkAndConvert]. + case EventType.modifyDirectory: + assert(event.type.isIgnoredOnWindows); + } + } + }); + } + + /// Sort all the events in a batch into sets based on their path. + Map> _sortEvents(List batch) { + // On Windows new links to directories are sometimes reported by + // Directory.watch as directories. On all other platforms it reports them + // consistently as files. See https://github.com/dart-lang/sdk/issues/61797. + // + // The wrong type is because Windows creates links to directories as actual + // directories, then converts them to links. Directory.watch sometimes + // checks the type too early and gets the wrong result. + // + // The batch delay is plenty for the link to be fully created, so verify the + // file system entity type for all createDirectory` events, converting to + // `createFile` when needed. + for (var i = 0; i != batch.length; ++i) { + final event = batch[i]; + if (event.type == EventType.createDirectory) { + if (FileSystemEntity.typeSync(event.path, followLinks: false) == + FileSystemEntityType.link) { + batch[i] = Event.createFile(event.path); + } + } + } + + var eventsForPaths = >{}; + + // Events within directories that already have create events are not needed + // as the directory's full content will be listed. + var createdDirectories = unionAll(batch.map((event) { + return event.type == EventType.createDirectory + ? {event.path} + : const {}; + })); + + bool isInCreatedDirectory(String path) => + createdDirectories.any((dir) => path != dir && p.isWithin(dir, path)); + + void addEvent(String path, Event event) { + if (isInCreatedDirectory(path)) return; + eventsForPaths.putIfAbsent(path, () => {}).add(event); + } + + for (var event in batch) { + addEvent(event.path, event); + } + + return eventsForPaths; + } + + /// Returns the canonical event from a batch of events on the same path, or + /// `null` to indicate that the filesystem should be checked. + Event? _canonicalEvent(Set batch) { + // If the batch is empty, return `null`. + if (batch.isEmpty) return null; + + // Resolve the event type for the batch. + var types = batch.map((e) => e.type).toSet(); + EventType type; + if (types.length == 1) { + // There's only one event. + type = types.single; + } else if (types.length == 2 && + types.contains(EventType.modifyFile) && + types.contains(EventType.createFile)) { + // Combine events of type [EventType.modifyFile] and + // [EventType.createFile] to one event. + type = EventType.createFile; + } else { + // There are incompatible event types, check the filesystem. + return null; + } + + return batch.firstWhere((e) => e.type == type); + } + + /// Returns zero or more events that describe the change between the last + /// known state of [path] and its current state on the filesystem. + /// + /// This returns a list whose order should be reflected in the events emitted + /// to the user, unlike the batched events from [Directory.watch]. The + /// returned list may be empty, indicating that no changes occurred to [path] + /// (probably indicating that it was created and then immediately deleted). + List _eventsBasedOnFileSystem(String path) { + var fileExisted = _files.contains(path); + var dirExisted = _files.containsDir(path); + + bool fileExists; + bool dirExists; + try { + fileExists = File(path).existsSync(); + dirExists = Directory(path).existsSync(); + } on FileSystemException { + return const []; + } + + var events = []; + if (fileExisted) { + if (fileExists) { + events.add(Event.modifyFile(path)); + } else { + events.add(Event.delete(path)); + } + } else if (dirExisted) { + if (dirExists) { + // If we got contradictory events for a directory that used to exist and + // still exists, we need to rescan the whole thing in case it was + // replaced with a different directory. + events.add(Event.delete(path)); + events.add(Event.createDirectory(path)); + } else { + events.add(Event.delete(path)); + } + } + + if (!fileExisted && fileExists) { + events.add(Event.createFile(path)); + } else if (!dirExisted && dirExists) { + events.add(Event.createDirectory(path)); + } + + return events; + } + + /// The callback that's run when the [Directory.watch] stream is closed. + /// Note that this is unlikely to happen on Windows, unless the system itself + /// closes the handle. + void _onDone() { + _watchSubscription = null; + + // Emit remove events for any remaining files. + for (var file in _files.paths) { + _emitEvent(ChangeType.REMOVE, file); + } + _files.clear(); + close(); + } + + /// Start or restart the underlying [Directory.watch] stream. + void _startWatch() { + // Note: in older SDKs "watcher closed" exceptions might not get sent over + // the stream returned by watch, and must be caught via a zone handler. + runZonedGuarded( + () { + var innerStream = Directory(path).watch(recursive: true); + _watchSubscription = innerStream.listen( + _onEvent, + onError: _restartWatchOnOverflowOr(_eventsController.addError), + onDone: _onDone, + ); + }, + _restartWatchOnOverflowOr((error, stackTrace) { + // ignore: only_throw_errors + throw error; + }), + ); + } + + void Function(Object, StackTrace) _restartWatchOnOverflowOr( + void Function(Object, StackTrace) otherwise) { + return (Object error, StackTrace stackTrace) async { + if (error is FileSystemException && + error.message.startsWith('Directory watcher closed unexpectedly')) { + // Wait to work around https://github.com/dart-lang/sdk/issues/61378. + // Give the VM time to reset state after the error. See the issue for + // more discussion of the workaround. + await _watchSubscription?.cancel(); + await Future.delayed(const Duration(milliseconds: 1)); + _eventsController.addError(error, stackTrace); + _startWatch(); + } else { + otherwise(error, stackTrace); + } + }; + } + + /// Starts or restarts listing the watched directory to get an initial picture + /// of its state. + Future _listDir() { + assert(!isReady); + _initialListSubscription?.cancel(); + + _files.clear(); + var completer = Completer(); + var stream = Directory(path).listRecursivelyIgnoringErrors(); + void handleEntity(FileSystemEntity entity) { + if (entity is! Directory) _files.add(entity.path); + } + + _initialListSubscription = stream.listen( + handleEntity, + onError: _emitError, + onDone: completer.complete, + cancelOnError: true, + ); + return completer.future; + } + + /// Emit an event with the given [type] and [path]. + void _emitEvent(ChangeType type, String path) { + if (!isReady) return; + + _eventsController.add(WatchEvent(type, path)); + } + + /// Emit an error, then close the watcher. + void _emitError(Object error, StackTrace stackTrace) { + // Guarantee that ready always completes. + if (!isReady) { + _readyCompleter.complete(); + } + _eventsController.addError(error, stackTrace); + close(); + } +} diff --git a/pkgs/watcher/test/directory_watcher/file_tests.dart b/pkgs/watcher/test/directory_watcher/file_tests.dart index d56f921d8..998d3c451 100644 --- a/pkgs/watcher/test/directory_watcher/file_tests.dart +++ b/pkgs/watcher/test/directory_watcher/file_tests.dart @@ -2,10 +2,12 @@ // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. +import 'dart:async'; import 'dart:io' as io; import 'dart:io'; import 'dart:isolate'; +import 'package:async/async.dart'; import 'package:test/test.dart'; import 'package:test_descriptor/test_descriptor.dart' as d; import 'package:watcher/src/utils.dart'; @@ -34,6 +36,89 @@ void _fileTests({required bool isNative}) { } }); + // ResubscribableWatcher wraps all the directory watchers to add handling of + // multiple subscribers. The underlying watcher is created when there is at + // least one subscriber and closed when there are zero subscribers. So, + // exercise that behavior in various ways. + test('ResubscribableWatcher handles multiple subscriptions ', () async { + final watcher = createWatcher(); + + // One subscription then close it. + final queue1 = StreamQueue(watcher.events); + final event1 = queue1.next; + await watcher.ready; + writeFile('a.txt'); + expect(await event1, isAddEvent('a.txt')); + await queue1.cancel(immediate: true); + + // Open before "ready", cancel before event. + final queue2a = StreamQueue(watcher.events); + // Open before "ready", cancel after one event. + final queue2b = StreamQueue(watcher.events); + // Open before "ready", cancel after two events. + final queue2c = StreamQueue(watcher.events); + + final queue2aHasNext = queue2a.hasNext; + unawaited(queue2a.cancel(immediate: true)); + expect(await queue2aHasNext, false); + + await watcher.ready; + + // Open after "ready", cancel before event. + final queue2d = StreamQueue(watcher.events); + + // Open after "ready", cancel after one event. + final queue2e = StreamQueue(watcher.events); + + // Open after "ready", cancel after two events. + final queue2f = StreamQueue(watcher.events); + + unawaited(queue2d.cancel(immediate: true)); + + writeFile('b.txt'); + + expect(await queue2b.next, isAddEvent('b.txt')); + expect(await queue2c.next, isAddEvent('b.txt')); + expect(await queue2e.next, isAddEvent('b.txt')); + expect(await queue2f.next, isAddEvent('b.txt')); + final queue2bHasNext = queue2b.hasNext; + await queue2b.cancel(immediate: true); + expect(await queue2bHasNext, false); + final queue2eHasNext = queue2e.hasNext; + await queue2e.cancel(immediate: true); + expect(await queue2eHasNext, false); + + // Remaining subscriptions still get events. + writeFile('c.txt'); + expect(await queue2c.next, isAddEvent('c.txt')); + expect(await queue2f.next, isAddEvent('c.txt')); + final queue2cHasNext = queue2c.hasNext; + await queue2c.cancel(immediate: true); + expect(await queue2cHasNext, false); + final queue2fHasNext = queue2f.hasNext; + await queue2f.cancel(immediate: true); + expect(await queue2fHasNext, false); + + // Repeat the first simple test: one subscription then close it. + final queue3 = StreamQueue(watcher.events); + await watcher.ready; + writeFile('d.txt'); + expect(await queue3.next, isAddEvent('d.txt')); + final queue3HasNext = queue3.hasNext; + await queue3.cancel(immediate: true); + expect(await queue3HasNext, false); + }); + + test('unsubscribe then resubscribe', () async { + final watcher = createWatcher(); + + await watcher.events.listen((_) {}).cancel(); + + final event = watcher.events.first; + writeFile('a.txt'); + expect(await event, isAddEvent('a.txt')); + }); + test('does not notify for files that already exist when started', () async { // Make some pre-existing files. writeFile('a.txt'); diff --git a/pkgs/watcher/test/directory_watcher/windows_isolate_test.dart b/pkgs/watcher/test/directory_watcher/windows_isolate_test.dart new file mode 100644 index 000000000..9f160b354 --- /dev/null +++ b/pkgs/watcher/test/directory_watcher/windows_isolate_test.dart @@ -0,0 +1,116 @@ +// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +@TestOn('windows') +@Timeout.factor(2) +library; + +import 'dart:async'; +import 'dart:io'; + +import 'package:test/test.dart'; +import 'package:watcher/watcher.dart'; + +void main() { + // The Windows native watcher has a buffer that gets exhausted if events are + // not handled quickly enough. Then, it throws an error and stops watching. + // The exhaustion is reliably triggered if enough events arrive during a sync + // block. The `package:watcher` implementation tries to catch this and recover + // by starting a new watcher. + for (final runInIsolate in [false, true]) { + late StreamSubscription subscription; + late Directory temp; + late int eventsSeen; + late int errorsSeen; + late int totalErrorsSeen; + + setUp(() async { + temp = Directory.systemTemp.createTempSync(); + final watcher = + DirectoryWatcher(temp.path, runInIsolateOnWindows: runInIsolate); + + eventsSeen = 0; + errorsSeen = 0; + totalErrorsSeen = 0; + subscription = watcher.events.listen( + (e) { + ++eventsSeen; + }, + onError: (_, __) { + ++errorsSeen; + }, + ); + await watcher.ready; + }); + + tearDown(() { + subscription.cancel(); + }); + + test( + runInIsolate + ? 'No buffer exhaustion if running in isolate' + : 'Recover from buffer exhaustion if not running in isolate', + () async { + // Use a long filename to fill the buffer. + final file = File('${temp.path}\\file'.padRight(255, 'a')); + + // Repeatedly trigger buffer exhaustion, to check that recovery is + // reliable. + for (var times = 0; times != 200; ++times) { + errorsSeen = 0; + eventsSeen = 0; + + // Syncronously trigger 100 events. Because this is a sync block, the VM + // won't handle the events, so this has a very high chance of triggering + // a buffer exhaustion. + // + // If a buffer exhaustion happens, `package:watcher` turns this into an + // error on the event stream, so `errorsSeen` will get incremented once. + // The number of changes 200 is chosen so this is very likely to happen. + // If there is _not_ an exhaustion, the 200 events will show on the + // stream as a single event because they are changes of the same file. + // So, `eventsSeen` will instead be incremented once. + for (var i = 0; i != 200; ++i) { + file.writeAsStringSync(''); + } + + // Events only happen when there is an async gap, wait for such a gap. + // The event usually arrives in under 10ms, try for 100ms. + var tries = 0; + while (errorsSeen == 0 && eventsSeen == 0 && tries < 10) { + await Future.delayed(const Duration(milliseconds: 10)); + ++tries; + } + + totalErrorsSeen += errorsSeen; + + // If everything is going well, there should have been either one event + // seen or one error seen. + if (errorsSeen == 0 && eventsSeen == 0) { + // It looks like the watcher is now broken: there were file changes + // but no event and no error. Do some non-sync writes to confirm + // whether the watcher really is now broken. + for (var i = 0; i != 5; ++i) { + await file.writeAsString(''); + } + await Future.delayed(const Duration(milliseconds: 10)); + fail( + 'On attempt ${times + 1}, watcher registered nothing. ' + 'On retry, it registered: $errorsSeen error(s), $eventsSeen ' + 'event(s).', + ); + } + } + + // Buffer exhaustion is very likely without the isolate and very unlikely + // with it, but neither case is guaranteed. + if (runInIsolate) { + expect(totalErrorsSeen, lessThan(10)); + } else { + expect(totalErrorsSeen, greaterThan(190)); + } + }); + } +} diff --git a/pkgs/watcher/test/directory_watcher/windows_test.dart b/pkgs/watcher/test/directory_watcher/windows_test.dart index c4f6d44a0..eb5fd2d43 100644 --- a/pkgs/watcher/test/directory_watcher/windows_test.dart +++ b/pkgs/watcher/test/directory_watcher/windows_test.dart @@ -114,88 +114,4 @@ void main() { expect(errorsSeen, 0); }); }); - - // The Windows native watcher has a buffer that gets exhausted if events are - // not handled quickly enough. Then, it throws an error and stops watching. - // The exhaustion is reliably triggered if enough events arrive during a sync - // block. The `package:watcher` implementation tries to catch this and recover - // by starting a new watcher. - group('Buffer exhaustion', () { - late StreamSubscription subscription; - late Directory temp; - late int eventsSeen; - late int errorsSeen; - - setUp(() async { - temp = Directory.systemTemp.createTempSync(); - final watcher = DirectoryWatcher(temp.path); - - eventsSeen = 0; - errorsSeen = 0; - subscription = watcher.events.listen( - (e) { - ++eventsSeen; - }, - onError: (_, __) { - ++errorsSeen; - }, - ); - await watcher.ready; - }); - - tearDown(() { - subscription.cancel(); - }); - - test('recovery', () async { - // Use a long filename to fill the buffer. - final file = File('${temp.path}\\file'.padRight(255, 'a')); - - // Repeatedly trigger buffer exhaustion, to check that recovery is - // reliable. - for (var times = 0; times != 200; ++times) { - errorsSeen = 0; - eventsSeen = 0; - - // Syncronously trigger 200 events. Because this is a sync block, the VM - // won't handle the events, so this has a very high chance of triggering - // a buffer exhaustion. - // - // If a buffer exhaustion happens, `package:watcher` turns this into an - // error on the event stream, so `errorsSeen` will get incremented once. - // The number of changes 200 is chosen so this is very likely to happen. - // If there is _not_ an exhaustion, the 200 events will show on the - // stream as a single event because they are changes of the same file. - // So, `eventsSeen` will instead be incremented once. - for (var i = 0; i != 200; ++i) { - file.writeAsStringSync(''); - } - - // Events only happen when there is an async gap, wait for such a gap. - // The event usually arrives in under 10ms, try for 100ms. - var tries = 0; - while (errorsSeen == 0 && eventsSeen == 0 && tries < 10) { - await Future.delayed(const Duration(milliseconds: 10)); - ++tries; - } - - // If everything is going well, there should have been either one event - // seen or one error seen. - if (errorsSeen == 0 && eventsSeen == 0) { - // It looks like the watcher is now broken: there were file changes - // but no event and no error. Do some non-sync writes to confirm - // whether the watcher really is now broken. - for (var i = 0; i != 5; ++i) { - await file.writeAsString(''); - } - await Future.delayed(const Duration(milliseconds: 10)); - fail( - 'On attempt ${times + 1}, watcher registered nothing. ' - 'On retry, it registered: $errorsSeen error(s), $eventsSeen ' - 'event(s).', - ); - } - } - }); - }); }