Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkgs/watcher/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
- `DirectoryWatcher` on Windows performance: reduce 100ms buffering of events
before reporting to 5ms, the larger buffer isn't needed for correctness after
the various fixes.
- `DirectoryWatcher` on Windows watches in a separate Isolate to make buffer
exhaustion, "Directory watcher closed unexpectedly", much less likely. The old
implementation which does not use a separate Isolate is available as
`DirectoryWatcher(path, runInIsolateOnWindows: false)`.
- Bug fix: while listing directories skip symlinks that lead to a directory
that has already been listed. This prevents a severe performance regression on
MacOS and Linux when there are more than a few symlink loops.
Expand Down
15 changes: 12 additions & 3 deletions pkgs/watcher/lib/src/directory_watcher.dart
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import '../watcher.dart';
import 'custom_watcher_factory.dart';
import 'directory_watcher/linux.dart';
import 'directory_watcher/mac_os.dart';
import 'directory_watcher/windows.dart';
import 'directory_watcher/windows_resubscribable_watcher.dart';

/// Watches the contents of a directory and emits [WatchEvent]s when something
/// in the directory has changed.
Expand All @@ -20,6 +20,8 @@ import 'directory_watcher/windows.dart';
/// the message "Directory watcher closed unexpectedly" on the event stream. The
/// code using the watcher needs to do additional work to account for the
/// dropped events, for example by recomputing interesting files from scratch.
/// By default, the watcher is started in a separate isolate to make this less
/// likely. Pass `runInIsolateOnWindows = false` to not launch an isolate.
///
/// On Linux, the underlying SDK `Directory.watch` fails if the system limit on
/// watchers has been reached. If this happens the SDK exception is thrown, it
Expand All @@ -40,7 +42,11 @@ abstract class DirectoryWatcher implements Watcher {
/// shorter will give more immediate feedback at the expense of doing more IO
/// and higher CPU usage. Defaults to one second. Ignored for non-polling
/// watchers.
factory DirectoryWatcher(String directory, {Duration? pollingDelay}) {
///
/// On Windows, pass [runInIsolateOnWindows] `false` to not run the watcher
/// in a separate isolate to reduce buffer exhaustion failures.
factory DirectoryWatcher(String directory,
{Duration? pollingDelay, bool runInIsolateOnWindows = true}) {
if (FileSystemEntity.isWatchSupported) {
var customWatcher = createCustomDirectoryWatcher(
directory,
Expand All @@ -49,7 +55,10 @@ abstract class DirectoryWatcher implements Watcher {
if (customWatcher != null) return customWatcher;
if (Platform.isLinux) return LinuxDirectoryWatcher(directory);
if (Platform.isMacOS) return MacOSDirectoryWatcher(directory);
if (Platform.isWindows) return WindowsDirectoryWatcher(directory);
if (Platform.isWindows) {
return WindowsDirectoryWatcher(directory,
runInIsolate: runInIsolateOnWindows);
}
}
return PollingDirectoryWatcher(directory, pollingDelay: pollingDelay);
}
Expand Down
17 changes: 11 additions & 6 deletions pkgs/watcher/lib/src/directory_watcher/directory_list.dart
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,17 @@ class _ResolvedDirectory {
} on FileSystemException catch (e, s) {
// The first operation on a directory is to resolve symbolic links, which
// fails with a general FileSystemException if the file is not found.
// Convert that into a PathNotFoundException as that makes more sense
// to the caller, who didn't ask for anything to do with symbolic links.
if (e.message.contains('Cannot resolve symbolic links') &&
e.osError?.errorCode == 2) {
throw Error.throwWithStackTrace(
PathNotFoundException(directory.path, e.osError!), s);
// Convert that into a PathNotFoundException or PathAccessException
// as that makes more sense to the caller, who didn't ask for anything to
// do with symbolic links.
if (e.message.contains('Cannot resolve symbolic links')) {
if (e.osError?.errorCode == 2) {
throw Error.throwWithStackTrace(
PathNotFoundException(directory.path, e.osError!), s);
} else if (e.osError?.errorCode == 5) {
throw Error.throwWithStackTrace(
PathAccessException(directory.path, e.osError!), s);
}
}
rethrow;
}
Expand Down
38 changes: 23 additions & 15 deletions pkgs/watcher/lib/src/directory_watcher/windows.dart
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ class WindowsDirectoryWatcher extends ResubscribableWatcher
String get directory => path;

WindowsDirectoryWatcher(String directory)
: super(directory, () => _WindowsDirectoryWatcher(directory));
: super(
directory, () => WindowsManuallyClosedDirectoryWatcher(directory));
}

/// Windows directory watcher.
Expand All @@ -48,7 +49,7 @@ class WindowsDirectoryWatcher extends ResubscribableWatcher
/// On my machine, the test failure rate due to the type drops from 150/1000
/// at 900us to 0/10000 at 1000us. So, 1000us = 1ms is sufficient. Use 5ms to
/// give a margin for error for different machine performance and load.
class _WindowsDirectoryWatcher
class WindowsManuallyClosedDirectoryWatcher
implements DirectoryWatcher, ManuallyClosedWatcher {
@override
String get directory => path;
Expand Down Expand Up @@ -94,7 +95,7 @@ class _WindowsDirectoryWatcher
final Set<StreamSubscription<FileSystemEntity>> _listSubscriptions =
HashSet<StreamSubscription<FileSystemEntity>>();

_WindowsDirectoryWatcher(this.path) : _files = PathSet(path) {
WindowsManuallyClosedDirectoryWatcher(this.path) : _files = PathSet(path) {
// Before we're ready to emit events, wait for [_listDir] to complete.
_listDir().then((_) {
_startWatch();
Expand Down Expand Up @@ -181,7 +182,9 @@ class _WindowsDirectoryWatcher
event.type == EventType.createDirectory,
modified: event.type == EventType.modifyFile ||
event.type == EventType.modifyDirectory,
deleted: event.type == EventType.delete,
deleted: event.type == EventType.delete ||
event.type == EventType.moveFile ||
event.type == EventType.moveDirectory,
movedOnto: false);
final destination = event.destination;
if (destination != null) {
Expand All @@ -208,13 +211,15 @@ class _WindowsDirectoryWatcher
void _poll(_PendingPoll poll) {
final path = poll.path;
final events = _eventsBasedOnFileSystem(path,
reportCreate: poll.created || poll.movedOnto,
reportDelete: poll.deleted,
// A modification can be reported due to a modification event, a
// create+delete together, or if the path is a move destination.
// The important case where the file is present, an event arrives
// for the file and a modification is _not_ reported is when the file
// was already discovered by listing a new directory, then the "add"
// event for it is processed afterwards.
reportNoChangeAsModification:
reportModification:
poll.modified || (poll.created && poll.deleted) || poll.movedOnto);

for (final event in events) {
Expand Down Expand Up @@ -264,10 +269,13 @@ class _WindowsDirectoryWatcher
/// This returns a list whose order should be reflected in the events emitted
/// to the user, unlike the batched events from [Directory.watch].
///
/// [reportNoChangeAsModification] determines whether to report a modification
/// if there was a file at [path] and there is still a file at [path].
///
/// [reportCreate], [reportModification] and [reportDelete] restrict the types
/// of events that can be emitted.
List<Event> _eventsBasedOnFileSystem(String path,
{required bool reportNoChangeAsModification}) {
{required bool reportCreate,
required bool reportModification,
required bool reportDelete}) {
var fileExisted = _files.contains(path);
var dirExisted = _files.containsDir(path);

Expand All @@ -285,26 +293,26 @@ class _WindowsDirectoryWatcher
var events = <Event>[];
if (fileExisted) {
if (fileExists) {
if (reportNoChangeAsModification) events.add(Event.modifyFile(path));
if (reportModification) events.add(Event.modifyFile(path));
} else {
events.add(Event.delete(path));
if (reportDelete) events.add(Event.delete(path));
}
} else if (dirExisted) {
if (dirExists) {
// If we got contradictory events for a directory that used to exist and
// still exists, we need to rescan the whole thing in case it was
// replaced with a different directory.
events.add(Event.delete(path));
events.add(Event.createDirectory(path));
if (reportDelete) events.add(Event.delete(path));
if (reportCreate) events.add(Event.createDirectory(path));
} else {
events.add(Event.delete(path));
if (reportDelete) events.add(Event.delete(path));
}
}

if (!fileExisted && fileExists) {
events.add(Event.createFile(path));
if (reportCreate) events.add(Event.createFile(path));
} else if (!dirExisted && dirExists) {
events.add(Event.createDirectory(path));
if (reportCreate) events.add(Event.createDirectory(path));
}

return events;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';
import 'dart:isolate';

import '../resubscribable.dart';
import '../watch_event.dart';
import 'windows.dart';

/// Runs [WindowsManuallyClosedDirectoryWatcher] in an isolate to work around
/// a platform limitation.
///
/// On Windows, Directory.watch fails if too many events arrive without being
/// processed by the Dart VM. See `directory_watcher/windows_test.dart` for code
/// that reliably triggers the failure by doing file writes in a synchronous
/// block that prevents the Dart VM from processing the events caused.
///
/// Running the watcher in an isolate makes buffer exhaustion much less likely
/// as there is no unrelated work happening in the isolate that would block
/// processing of events.
class WindowsIsolateDirectoryWatcher implements ManuallyClosedWatcher {
@override
final String path;
final ReceivePort _receivePort = ReceivePort();
final Completer<SendPort> _sendPortCompleter = Completer();

final StreamController<WatchEvent> _eventsController =
StreamController.broadcast();
final Completer<void> _readyCompleter = Completer();

WindowsIsolateDirectoryWatcher(this.path) {
_startIsolate(path, _receivePort.sendPort);
_receivePort.listen((event) => _receiveFromIsolate(event as Event));
}

void _receiveFromIsolate(Event event) {
switch (event.type) {
case EventType.sendPort:
_sendPortCompleter.complete(event.sendPort);
case EventType.ready:
_readyCompleter.complete();
case EventType.watchEvent:
_eventsController.add(event.watchEvent!);
case EventType.close:
_eventsController.close();
_receivePort.close();
case EventType.error:
_eventsController.addError(event.error!, event.stackTrace);
}
}

@override
void close() {
// The "close" event is the only event sent to the isolate, just send
// `null`.
_sendPortCompleter.future.then((sendPort) => sendPort.send(null));
}

@override
Stream<WatchEvent> get events => _eventsController.stream;

@override
bool get isReady => _readyCompleter.isCompleted;

@override
Future<void> get ready => _readyCompleter.future;
}

/// Starts watching [path] in an isolate.
///
/// [sendPort] is the port from isolate to host, see `_WatcherIsolate`
/// constructor implementation for the events that will be sent.
void _startIsolate(String path, SendPort sendPort) async {
unawaited(
Isolate.run(() async => await _WatcherIsolate(path, sendPort).closed));
}

class _WatcherIsolate {
final String path;
final WindowsManuallyClosedDirectoryWatcher watcher;
final SendPort sendPort;

// The isolate stays open until this future completes.
Future<void> get closed => _closeCompleter.future;
final Completer<void> _closeCompleter = Completer();

_WatcherIsolate(this.path, this.sendPort)
: watcher = WindowsManuallyClosedDirectoryWatcher(path) {
final receivePort = ReceivePort();

// Six types of event are sent to the host.

// The `SendPort` for host to isolate communication on startup.
sendPort.send(Event.sendPort(receivePort.sendPort));

// `Event.ready` when the watcher is ready.
watcher.ready.then((_) {
sendPort.send(Event.ready());
});

watcher.events.listen((event) {
// The watcher events.
sendPort.send(Event.watchEvent(event));
}, onDone: () {
// `Event.close` if the watcher event stream closes.
sendPort.send(Event.close());
}, onError: (Object e, StackTrace s) {
// `Event.error` on error.
sendPort.send(Event.error(e, s));
});

receivePort.listen((event) {
// The only event sent from the host to the isolate is "close", no need
// to check the value.
watcher.close();
_closeCompleter.complete();
receivePort.close();
});
}
}

/// Event sent from the isolate to the host.
class Event {
final EventType type;
final SendPort? sendPort;
final WatchEvent? watchEvent;
final Object? error;
final StackTrace? stackTrace;

Event.sendPort(this.sendPort)
: type = EventType.sendPort,
watchEvent = null,
error = null,
stackTrace = null;

Event.ready()
: type = EventType.ready,
sendPort = null,
watchEvent = null,
error = null,
stackTrace = null;

Event.watchEvent(this.watchEvent)
: type = EventType.watchEvent,
sendPort = null,
error = null,
stackTrace = null;

Event.close()
: type = EventType.close,
sendPort = null,
watchEvent = null,
error = null,
stackTrace = null;

Event.error(this.error, this.stackTrace)
: type = EventType.error,
sendPort = null,
watchEvent = null;
}

enum EventType {
sendPort,
ready,
watchEvent,
close,
error;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import '../directory_watcher.dart';
import '../resubscribable.dart';
import 'windows.dart';
import 'windows_isolate_directory_watcher.dart';

class WindowsDirectoryWatcher extends ResubscribableWatcher
implements DirectoryWatcher {
@override
String get directory => path;

/// Watches [directory].
///
/// If [runInIsolate], runs the watcher in an isolate to reduce the chance of
/// hitting the Windows-specific buffer exhaustion failure.
WindowsDirectoryWatcher(String directory, {bool runInIsolate = true})
: super(
directory,
() => runInIsolate
? WindowsIsolateDirectoryWatcher(directory)
: WindowsManuallyClosedDirectoryWatcher(directory));
}
3 changes: 2 additions & 1 deletion pkgs/watcher/lib/src/resubscribable.dart
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ abstract class ResubscribableWatcher implements Watcher {

_eventsController = StreamController<WatchEvent>.broadcast(
onListen: () async {
final completer = _readyCompleter;
watcher = _factory();
subscription = watcher.events.listen(_eventsController.add,
onError: _eventsController.addError,
Expand All @@ -54,7 +55,7 @@ abstract class ResubscribableWatcher implements Watcher {
// the time [onListen] is called, as opposed to the value when
// [watcher.ready] fires. A new completer may be created by that time.
await watcher.ready;
_readyCompleter.complete();
completer.complete();
},
onCancel: () {
// Cancel the subscription before closing the watcher so that the
Expand Down
Loading