Skip to content

Commit 9453cb9

Browse files
committed
select: Enable timeout on select when PROXY_TO_PTHREAD
This commit enables the select syscall to handle timeout with multiple event sources. PROXY_TO_PTHREAD is needed to prevent blocking the main worker. When a thread worker invokes the select syscall with non-zero timeout and no fd is ready, it blocks using Atmoics.wait until it receives a readiness notification. On the main worker, the underlying stream implementation can trigger readiness using Atomics.notify through a callback. A notification also issued automatically once the specified timeout expires. Communication between the thread worker and the main worker occurs via shared memory regions. To prevent a select invocation being unblocked by the callbacks of a previous invocation, all active callbacks are tracked in a list. See the comments in activeSelectCallbacks for details. Usage of the notification callback is optional. If the stream implementation doesn't support it, the "poll" method can still synchronously return the event status. Signed-off-by: Kohei Tokunaga <ktokunaga.mail@gmail.com>
1 parent 5898ffe commit 9453cb9

File tree

1 file changed

+104
-0
lines changed

1 file changed

+104
-0
lines changed

src/lib/libsyscall.js

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,43 @@ var SyscallsLibrary = {
167167
}
168168
};
169169
},
170+
171+
#if PROXY_TO_PTHREAD
172+
// Pointers to regions which will be shared to the main worker for
173+
// notification purpose.
174+
selectFDRegion: undefined,
175+
selectFlagsRegion: undefined,
176+
177+
// On the main worker, activeSelectCallbacks records the set of callbacks
178+
// that are allowed to update the shared regions and notify the thread
179+
// worker. Any callback not in this set (i.e. when !isActiveSelectCallback)
180+
// must not update the regions.
181+
//
182+
// Each select syscall invocation must call deactivateSelectCallbacks to
183+
// reset this set, ensuring that callbacks from previous invocations don't
184+
// affect the current one.
185+
//
186+
// If a callback executes after the thread worker has already returned (due
187+
// to a timeout, a readiness notification or other exceptional conditions)
188+
// but before the next deactivation, it may still update the shared regions.
189+
// However the thread worker will no longer read from those regions and will
190+
// ignore such updates.
191+
//
192+
// activeSelectCallbacks records multiple callback lists one per thread
193+
// worker so that each worker can manage its own set of active callbacks
194+
// independently.
195+
activeSelectCallbacks: {},
196+
activateSelectCallback(i, cb) {
197+
if (SYSCALLS.activeSelectCallbacks[i] == null) SYSCALLS.activeSelectCallbacks[i] = [];
198+
SYSCALLS.activeSelectCallbacks[i].push(cb);
199+
},
200+
deactivateSelectCallbacks(i) {
201+
SYSCALLS.activeSelectCallbacks[i] = [];
202+
},
203+
isActiveSelectCallback(i, cb) {
204+
return (SYSCALLS.activeSelectCallbacks[i] != null) && (SYSCALLS.activeSelectCallbacks[i].indexOf(cb) != -1);
205+
},
206+
#endif
170207
},
171208

172209
$syscallGetVarargI__internal: true,
@@ -607,7 +644,36 @@ var SyscallsLibrary = {
607644
FS.chdir(stream.path);
608645
return 0;
609646
},
647+
__syscall__newselect__deps: ['$newselectInner','malloc','free'],
648+
__syscall__newselect__proxy: 'none',
610649
__syscall__newselect: (nfds, readfds, writefds, exceptfds, timeout) => {
650+
#if PROXY_TO_PTHREAD
651+
if (SYSCALLS.selectFDRegion == undefined) {
652+
SYSCALLS.selectFDRegion = _malloc(4) >> 2;
653+
SYSCALLS.selectFlagsRegion = _malloc(4) >> 2;
654+
}
655+
var fdIdx = SYSCALLS.selectFDRegion;
656+
var flagsIdx = SYSCALLS.selectFlagsRegion;
657+
658+
var result = newselectInner(nfds, readfds, writefds, exceptfds, timeout, fdIdx, flagsIdx);
659+
if ((result != 0) || ((timeout) && (SYSCALLS.getTimeoutInMillis(timeout) == 0))) {
660+
return result;
661+
}
662+
663+
Atomics.wait(HEAP32 , fdIdx, -1);
664+
var fd = HEAP32[fdIdx];
665+
var flags = HEAP32[flagsIdx];
666+
if (fd < 0) return 0;
667+
var fdSet = SYSCALLS.parseSelectFDSet(readfds, writefds, exceptfds);
668+
fdSet.setFlags(fd, flags);
669+
fdSet.commit();
670+
return fdSet.getTotal();
671+
#else
672+
return newselectInner(nfds, readfds, writefds, exceptfds, timeout, -1, -1);
673+
#endif
674+
},
675+
$newselectInner__proxy: 'sync',
676+
$newselectInner: (nfds, readfds, writefds, exceptfds, timeout, fdIdx, flagsIdx) => {
611677
// readfds are supported,
612678
// writefds checks socket open status
613679
// exceptfds are supported, although on web, such exceptional conditions never arise in web sockets
@@ -633,6 +699,32 @@ var SyscallsLibrary = {
633699
timeoutInMillis = SYSCALLS.getTimeoutInMillis(timeout);
634700
}
635701

702+
#if PROXY_TO_PTHREAD
703+
SYSCALLS.deactivateSelectCallbacks(fdIdx); // deactivate all old callbacks
704+
var makeNotifyCallback = (fd) => null;
705+
if (timeoutInMillis != 0) {
706+
Atomics.store(HEAP32, fdIdx, -1); // Initialize the shared region
707+
Atomics.store(HEAP32, flagsIdx, -1); // Initialize the shared region
708+
makeNotifyCallback = (fd) => {
709+
var cb = (flags) => {
710+
if (!SYSCALLS.isActiveSelectCallback(fdIdx, cb)) {
711+
return; // This callback is no longer active.
712+
}
713+
SYSCALLS.deactivateSelectCallbacks(fdIdx); // Only the first event is notified.
714+
Atomics.store(HEAP32, flagsIdx, flags);
715+
Atomics.store(HEAP32, fdIdx, fd);
716+
Atomics.notify(HEAP32, fdIdx);
717+
}
718+
SYSCALLS.activateSelectCallback(fdIdx, cb);
719+
return cb;
720+
}
721+
if (timeoutInMillis > 0) {
722+
var cb = makeNotifyCallback(-2);
723+
setTimeout(() => cb(0), timeoutInMillis);
724+
}
725+
}
726+
#endif
727+
636728
for (var fd = 0; fd < nfds; fd++) {
637729
var mask = 1 << (fd % 32);
638730
if (!(check(fd, allLow, allHigh, mask))) {
@@ -644,14 +736,26 @@ var SyscallsLibrary = {
644736
var flags = SYSCALLS.DEFAULT_POLLMASK;
645737

646738
if (stream.stream_ops.poll) {
739+
#if PROXY_TO_PTHREAD
740+
flags = stream.stream_ops.poll(stream, timeoutInMillis, makeNotifyCallback(fd));
741+
#else
647742
flags = stream.stream_ops.poll(stream, ((timeoutInMillis < 0) || readfds) ? timeoutInMillis : 0);
743+
#endif
648744
}
649745

650746
fdSet.setFlags(fd, flags);
651747
}
652748

653749

750+
#if PROXY_TO_PTHREAD
751+
if ((fdSet.getTotal() > 0) || (timeoutInMillis == 0) ) {
752+
fdSet.commit(fd, flags);
753+
// No wait will happen in the caller. Deactivate all callbacks.
754+
SYSCALLS.deactivateSelectCallbacks(fdIdx);
755+
}
756+
#else
654757
fdSet.commit(fd, flags);
758+
#endif
655759

656760
return fdSet.getTotal();
657761
},

0 commit comments

Comments
 (0)