Skip to content

Commit 6e8c83b

Browse files
authored
Run Windows watcher in an isolate (#2239)
1 parent f0467d7 commit 6e8c83b

File tree

12 files changed

+443
-112
lines changed

12 files changed

+443
-112
lines changed

pkgs/watcher/CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@
66
- `DirectoryWatcher` on Windows performance: reduce 100ms buffering of events
77
before reporting to 5ms, the larger buffer isn't needed for correctness after
88
the various fixes.
9+
- `DirectoryWatcher` on Windows watches in a separate Isolate to make buffer
10+
exhaustion, "Directory watcher closed unexpectedly", much less likely. The old
11+
implementation which does not use a separate Isolate is available as
12+
`DirectoryWatcher(path, runInIsolateOnWindows: false)`.
913
- Bug fix: while listing directories skip symlinks that lead to a directory
1014
that has already been listed. This prevents a severe performance regression on
1115
MacOS and Linux when there are more than a few symlink loops.

pkgs/watcher/lib/src/directory_watcher.dart

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import '../watcher.dart';
88
import 'custom_watcher_factory.dart';
99
import 'directory_watcher/linux.dart';
1010
import 'directory_watcher/mac_os.dart';
11-
import 'directory_watcher/windows.dart';
11+
import 'directory_watcher/windows_resubscribable_watcher.dart';
1212

1313
/// Watches the contents of a directory and emits [WatchEvent]s when something
1414
/// in the directory has changed.
@@ -20,6 +20,8 @@ import 'directory_watcher/windows.dart';
2020
/// the message "Directory watcher closed unexpectedly" on the event stream. The
2121
/// code using the watcher needs to do additional work to account for the
2222
/// dropped events, for example by recomputing interesting files from scratch.
23+
/// By default, the watcher is started in a separate isolate to make this less
24+
/// likely. Pass `runInIsolateOnWindows = false` to not launch an isolate.
2325
///
2426
/// On Linux, the underlying SDK `Directory.watch` fails if the system limit on
2527
/// watchers has been reached. If this happens the SDK exception is thrown, it
@@ -40,7 +42,11 @@ abstract class DirectoryWatcher implements Watcher {
4042
/// shorter will give more immediate feedback at the expense of doing more IO
4143
/// and higher CPU usage. Defaults to one second. Ignored for non-polling
4244
/// watchers.
43-
factory DirectoryWatcher(String directory, {Duration? pollingDelay}) {
45+
///
46+
/// On Windows, pass [runInIsolateOnWindows] `false` to not run the watcher
47+
/// in a separate isolate to reduce buffer exhaustion failures.
48+
factory DirectoryWatcher(String directory,
49+
{Duration? pollingDelay, bool runInIsolateOnWindows = true}) {
4450
if (FileSystemEntity.isWatchSupported) {
4551
var customWatcher = createCustomDirectoryWatcher(
4652
directory,
@@ -49,7 +55,10 @@ abstract class DirectoryWatcher implements Watcher {
4955
if (customWatcher != null) return customWatcher;
5056
if (Platform.isLinux) return LinuxDirectoryWatcher(directory);
5157
if (Platform.isMacOS) return MacOSDirectoryWatcher(directory);
52-
if (Platform.isWindows) return WindowsDirectoryWatcher(directory);
58+
if (Platform.isWindows) {
59+
return WindowsDirectoryWatcher(directory,
60+
runInIsolate: runInIsolateOnWindows);
61+
}
5362
}
5463
return PollingDirectoryWatcher(directory, pollingDelay: pollingDelay);
5564
}

pkgs/watcher/lib/src/directory_watcher/directory_list.dart

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -142,12 +142,17 @@ class _ResolvedDirectory {
142142
} on FileSystemException catch (e, s) {
143143
// The first operation on a directory is to resolve symbolic links, which
144144
// fails with a general FileSystemException if the file is not found.
145-
// Convert that into a PathNotFoundException as that makes more sense
146-
// to the caller, who didn't ask for anything to do with symbolic links.
147-
if (e.message.contains('Cannot resolve symbolic links') &&
148-
e.osError?.errorCode == 2) {
149-
throw Error.throwWithStackTrace(
150-
PathNotFoundException(directory.path, e.osError!), s);
145+
// Convert that into a PathNotFoundException or PathAccessException
146+
// as that makes more sense to the caller, who didn't ask for anything to
147+
// do with symbolic links.
148+
if (e.message.contains('Cannot resolve symbolic links')) {
149+
if (e.osError?.errorCode == 2) {
150+
throw Error.throwWithStackTrace(
151+
PathNotFoundException(directory.path, e.osError!), s);
152+
} else if (e.osError?.errorCode == 5) {
153+
throw Error.throwWithStackTrace(
154+
PathAccessException(directory.path, e.osError!), s);
155+
}
151156
}
152157
rethrow;
153158
}

pkgs/watcher/lib/src/directory_watcher/windows.dart

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ class WindowsDirectoryWatcher extends ResubscribableWatcher
2222
String get directory => path;
2323

2424
WindowsDirectoryWatcher(String directory)
25-
: super(directory, () => _WindowsDirectoryWatcher(directory));
25+
: super(
26+
directory, () => WindowsManuallyClosedDirectoryWatcher(directory));
2627
}
2728

2829
/// Windows directory watcher.
@@ -48,7 +49,7 @@ class WindowsDirectoryWatcher extends ResubscribableWatcher
4849
/// On my machine, the test failure rate due to the type drops from 150/1000
4950
/// at 900us to 0/10000 at 1000us. So, 1000us = 1ms is sufficient. Use 5ms to
5051
/// give a margin for error for different machine performance and load.
51-
class _WindowsDirectoryWatcher
52+
class WindowsManuallyClosedDirectoryWatcher
5253
implements DirectoryWatcher, ManuallyClosedWatcher {
5354
@override
5455
String get directory => path;
@@ -94,7 +95,7 @@ class _WindowsDirectoryWatcher
9495
final Set<StreamSubscription<FileSystemEntity>> _listSubscriptions =
9596
HashSet<StreamSubscription<FileSystemEntity>>();
9697

97-
_WindowsDirectoryWatcher(this.path) : _files = PathSet(path) {
98+
WindowsManuallyClosedDirectoryWatcher(this.path) : _files = PathSet(path) {
9899
// Before we're ready to emit events, wait for [_listDir] to complete.
99100
_listDir().then((_) {
100101
_startWatch();
@@ -181,7 +182,9 @@ class _WindowsDirectoryWatcher
181182
event.type == EventType.createDirectory,
182183
modified: event.type == EventType.modifyFile ||
183184
event.type == EventType.modifyDirectory,
184-
deleted: event.type == EventType.delete,
185+
deleted: event.type == EventType.delete ||
186+
event.type == EventType.moveFile ||
187+
event.type == EventType.moveDirectory,
185188
movedOnto: false);
186189
final destination = event.destination;
187190
if (destination != null) {
@@ -208,13 +211,15 @@ class _WindowsDirectoryWatcher
208211
void _poll(_PendingPoll poll) {
209212
final path = poll.path;
210213
final events = _eventsBasedOnFileSystem(path,
214+
reportCreate: poll.created || poll.movedOnto,
215+
reportDelete: poll.deleted,
211216
// A modification can be reported due to a modification event, a
212217
// create+delete together, or if the path is a move destination.
213218
// The important case where the file is present, an event arrives
214219
// for the file and a modification is _not_ reported is when the file
215220
// was already discovered by listing a new directory, then the "add"
216221
// event for it is processed afterwards.
217-
reportNoChangeAsModification:
222+
reportModification:
218223
poll.modified || (poll.created && poll.deleted) || poll.movedOnto);
219224

220225
for (final event in events) {
@@ -264,10 +269,13 @@ class _WindowsDirectoryWatcher
264269
/// This returns a list whose order should be reflected in the events emitted
265270
/// to the user, unlike the batched events from [Directory.watch].
266271
///
267-
/// [reportNoChangeAsModification] determines whether to report a modification
268-
/// if there was a file at [path] and there is still a file at [path].
272+
///
273+
/// [reportCreate], [reportModification] and [reportDelete] restrict the types
274+
/// of events that can be emitted.
269275
List<Event> _eventsBasedOnFileSystem(String path,
270-
{required bool reportNoChangeAsModification}) {
276+
{required bool reportCreate,
277+
required bool reportModification,
278+
required bool reportDelete}) {
271279
var fileExisted = _files.contains(path);
272280
var dirExisted = _files.containsDir(path);
273281

@@ -285,26 +293,26 @@ class _WindowsDirectoryWatcher
285293
var events = <Event>[];
286294
if (fileExisted) {
287295
if (fileExists) {
288-
if (reportNoChangeAsModification) events.add(Event.modifyFile(path));
296+
if (reportModification) events.add(Event.modifyFile(path));
289297
} else {
290-
events.add(Event.delete(path));
298+
if (reportDelete) events.add(Event.delete(path));
291299
}
292300
} else if (dirExisted) {
293301
if (dirExists) {
294302
// If we got contradictory events for a directory that used to exist and
295303
// still exists, we need to rescan the whole thing in case it was
296304
// replaced with a different directory.
297-
events.add(Event.delete(path));
298-
events.add(Event.createDirectory(path));
305+
if (reportDelete) events.add(Event.delete(path));
306+
if (reportCreate) events.add(Event.createDirectory(path));
299307
} else {
300-
events.add(Event.delete(path));
308+
if (reportDelete) events.add(Event.delete(path));
301309
}
302310
}
303311

304312
if (!fileExisted && fileExists) {
305-
events.add(Event.createFile(path));
313+
if (reportCreate) events.add(Event.createFile(path));
306314
} else if (!dirExisted && dirExists) {
307-
events.add(Event.createDirectory(path));
315+
if (reportCreate) events.add(Event.createDirectory(path));
308316
}
309317

310318
return events;
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file
2+
// for details. All rights reserved. Use of this source code is governed by a
3+
// BSD-style license that can be found in the LICENSE file.
4+
5+
import 'dart:async';
6+
import 'dart:isolate';
7+
8+
import '../resubscribable.dart';
9+
import '../watch_event.dart';
10+
import 'windows.dart';
11+
12+
/// Runs [WindowsManuallyClosedDirectoryWatcher] in an isolate to work around
13+
/// a platform limitation.
14+
///
15+
/// On Windows, Directory.watch fails if too many events arrive without being
16+
/// processed by the Dart VM. See `directory_watcher/windows_test.dart` for code
17+
/// that reliably triggers the failure by doing file writes in a synchronous
18+
/// block that prevents the Dart VM from processing the events caused.
19+
///
20+
/// Running the watcher in an isolate makes buffer exhaustion much less likely
21+
/// as there is no unrelated work happening in the isolate that would block
22+
/// processing of events.
23+
class WindowsIsolateDirectoryWatcher implements ManuallyClosedWatcher {
24+
@override
25+
final String path;
26+
final ReceivePort _receivePort = ReceivePort();
27+
final Completer<SendPort> _sendPortCompleter = Completer();
28+
29+
final StreamController<WatchEvent> _eventsController =
30+
StreamController.broadcast();
31+
final Completer<void> _readyCompleter = Completer();
32+
33+
WindowsIsolateDirectoryWatcher(this.path) {
34+
_startIsolate(path, _receivePort.sendPort);
35+
_receivePort.listen((event) => _receiveFromIsolate(event as Event));
36+
}
37+
38+
void _receiveFromIsolate(Event event) {
39+
switch (event.type) {
40+
case EventType.sendPort:
41+
_sendPortCompleter.complete(event.sendPort);
42+
case EventType.ready:
43+
_readyCompleter.complete();
44+
case EventType.watchEvent:
45+
_eventsController.add(event.watchEvent!);
46+
case EventType.close:
47+
_eventsController.close();
48+
_receivePort.close();
49+
case EventType.error:
50+
_eventsController.addError(event.error!, event.stackTrace);
51+
}
52+
}
53+
54+
@override
55+
void close() {
56+
// The "close" event is the only event sent to the isolate, just send
57+
// `null`.
58+
_sendPortCompleter.future.then((sendPort) => sendPort.send(null));
59+
}
60+
61+
@override
62+
Stream<WatchEvent> get events => _eventsController.stream;
63+
64+
@override
65+
bool get isReady => _readyCompleter.isCompleted;
66+
67+
@override
68+
Future<void> get ready => _readyCompleter.future;
69+
}
70+
71+
/// Starts watching [path] in an isolate.
72+
///
73+
/// [sendPort] is the port from isolate to host, see `_WatcherIsolate`
74+
/// constructor implementation for the events that will be sent.
75+
void _startIsolate(String path, SendPort sendPort) async {
76+
unawaited(
77+
Isolate.run(() async => await _WatcherIsolate(path, sendPort).closed));
78+
}
79+
80+
class _WatcherIsolate {
81+
final String path;
82+
final WindowsManuallyClosedDirectoryWatcher watcher;
83+
final SendPort sendPort;
84+
85+
// The isolate stays open until this future completes.
86+
Future<void> get closed => _closeCompleter.future;
87+
final Completer<void> _closeCompleter = Completer();
88+
89+
_WatcherIsolate(this.path, this.sendPort)
90+
: watcher = WindowsManuallyClosedDirectoryWatcher(path) {
91+
final receivePort = ReceivePort();
92+
93+
// Six types of event are sent to the host.
94+
95+
// The `SendPort` for host to isolate communication on startup.
96+
sendPort.send(Event.sendPort(receivePort.sendPort));
97+
98+
// `Event.ready` when the watcher is ready.
99+
watcher.ready.then((_) {
100+
sendPort.send(Event.ready());
101+
});
102+
103+
watcher.events.listen((event) {
104+
// The watcher events.
105+
sendPort.send(Event.watchEvent(event));
106+
}, onDone: () {
107+
// `Event.close` if the watcher event stream closes.
108+
sendPort.send(Event.close());
109+
}, onError: (Object e, StackTrace s) {
110+
// `Event.error` on error.
111+
sendPort.send(Event.error(e, s));
112+
});
113+
114+
receivePort.listen((event) {
115+
// The only event sent from the host to the isolate is "close", no need
116+
// to check the value.
117+
watcher.close();
118+
_closeCompleter.complete();
119+
receivePort.close();
120+
});
121+
}
122+
}
123+
124+
/// Event sent from the isolate to the host.
125+
class Event {
126+
final EventType type;
127+
final SendPort? sendPort;
128+
final WatchEvent? watchEvent;
129+
final Object? error;
130+
final StackTrace? stackTrace;
131+
132+
Event.sendPort(this.sendPort)
133+
: type = EventType.sendPort,
134+
watchEvent = null,
135+
error = null,
136+
stackTrace = null;
137+
138+
Event.ready()
139+
: type = EventType.ready,
140+
sendPort = null,
141+
watchEvent = null,
142+
error = null,
143+
stackTrace = null;
144+
145+
Event.watchEvent(this.watchEvent)
146+
: type = EventType.watchEvent,
147+
sendPort = null,
148+
error = null,
149+
stackTrace = null;
150+
151+
Event.close()
152+
: type = EventType.close,
153+
sendPort = null,
154+
watchEvent = null,
155+
error = null,
156+
stackTrace = null;
157+
158+
Event.error(this.error, this.stackTrace)
159+
: type = EventType.error,
160+
sendPort = null,
161+
watchEvent = null;
162+
}
163+
164+
enum EventType {
165+
sendPort,
166+
ready,
167+
watchEvent,
168+
close,
169+
error;
170+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file
2+
// for details. All rights reserved. Use of this source code is governed by a
3+
// BSD-style license that can be found in the LICENSE file.
4+
5+
import '../directory_watcher.dart';
6+
import '../resubscribable.dart';
7+
import 'windows.dart';
8+
import 'windows_isolate_directory_watcher.dart';
9+
10+
class WindowsDirectoryWatcher extends ResubscribableWatcher
11+
implements DirectoryWatcher {
12+
@override
13+
String get directory => path;
14+
15+
/// Watches [directory].
16+
///
17+
/// If [runInIsolate], runs the watcher in an isolate to reduce the chance of
18+
/// hitting the Windows-specific buffer exhaustion failure.
19+
WindowsDirectoryWatcher(String directory, {bool runInIsolate = true})
20+
: super(
21+
directory,
22+
() => runInIsolate
23+
? WindowsIsolateDirectoryWatcher(directory)
24+
: WindowsManuallyClosedDirectoryWatcher(directory));
25+
}

pkgs/watcher/lib/src/resubscribable.dart

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ abstract class ResubscribableWatcher implements Watcher {
4545

4646
_eventsController = StreamController<WatchEvent>.broadcast(
4747
onListen: () async {
48+
final completer = _readyCompleter;
4849
watcher = _factory();
4950
subscription = watcher.events.listen(_eventsController.add,
5051
onError: _eventsController.addError,
@@ -54,7 +55,7 @@ abstract class ResubscribableWatcher implements Watcher {
5455
// the time [onListen] is called, as opposed to the value when
5556
// [watcher.ready] fires. A new completer may be created by that time.
5657
await watcher.ready;
57-
_readyCompleter.complete();
58+
completer.complete();
5859
},
5960
onCancel: () {
6061
// Cancel the subscription before closing the watcher so that the

0 commit comments

Comments
 (0)