Skip to content

Commit 586aaa7

Browse files
committed
select: Enable timeout on select when PROXY_TO_PTHREAD
This commit enables the select syscall to handle timeout with multiple event sources. PROXY_TO_PTHREAD is needed to prevent blocking the main worker. When a thread worker invokes the select syscall with non-zero timeout and no fd is ready, it blocks using Atmoics.wait until it receives a readiness notification. On the main worker, the underlying stream implementation can trigger readiness using Atomics.notify through a callback. A notification also issued automatically once the specified timeout expires. Communication between the thread worker and the main worker occurs via a shared memory region. To prevent a select invocation being unblocked by the callbacks of a previous invocation, all active callbacks are tracked in a list. See the comments in activeSelectCallbacks for details. Usage of the notification callback is optional. If the stream implementation doesn't support it, the "poll" method can still synchronously return the event status. Signed-off-by: Kohei Tokunaga <ktokunaga.mail@gmail.com>
1 parent bc1913e commit 586aaa7

File tree

2 files changed

+122
-0
lines changed

2 files changed

+122
-0
lines changed

src/lib/libpthread.js

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,11 @@ var LibraryPThread = {
8888
'exit',
8989
#if PTHREADS_DEBUG || ASSERTIONS
9090
'$ptrToString',
91+
#endif
92+
#if PROXY_TO_PTHREAD
93+
'$addThreadToActiveSelectCallbacks',
94+
'$removeThreadFromActiveSelectCallbacks',
95+
'$activeSelectCallbacks',
9196
#endif
9297
],
9398
$PThread: {
@@ -577,6 +582,9 @@ var LibraryPThread = {
577582
#endif
578583
#if ASSERTIONS
579584
assert(worker);
585+
#endif
586+
#if PROXY_TO_PTHREAD
587+
removeThreadFromActiveSelectCallbacks(pthread_ptr);
580588
#endif
581589
PThread.returnWorkerToPool(worker);
582590
},
@@ -621,6 +629,49 @@ var LibraryPThread = {
621629
$registerTLSInit: (tlsInitFunc) => PThread.tlsInitFunctions.push(tlsInitFunc),
622630
#endif
623631

632+
#if PROXY_TO_PTHREAD
633+
// On the main worker, activeSelectCallbacks records the set of callbacks
634+
// that are allowed to update the shared region. Any callback not in this
635+
// set (i.e. when !isActiveSelectCallback) must not update the region.
636+
//
637+
// Each select syscall invocation must call deactivateSelectCallbacks to
638+
// reset this set, ensuring that callbacks from previous invocations don't
639+
// affect the current one.
640+
//
641+
// If a callback executes after the thread worker has already returned (due
642+
// to a timeout, a readiness notification or other exceptional conditions)
643+
// but before the next deactivation, it may still update the shared region.
644+
// However the thread worker will not read that value and just ignore it.
645+
//
646+
// activeSelectCallbacks records multiple callback lists one per thread
647+
// worker so that each worker can manage its own set of active callbacks
648+
// independently.
649+
$activeSelectCallbacks: {},
650+
$addThreadToActiveSelectCallbacks__deps: ['malloc'],
651+
$addThreadToActiveSelectCallbacks: (pthread_ptr) => {
652+
activeSelectCallbacks[pthread_ptr] = {
653+
buf: _malloc(8),
654+
callbacks: [],
655+
};
656+
},
657+
$removeThreadFromActiveSelectCallbacks: (pthread_ptr) => {
658+
delete activeSelectCallbacks[pthread_ptr];
659+
},
660+
$getActiveSelectCallbacks: (pthread_ptr) => {
661+
return activeSelectCallbacks[pthread_ptr];
662+
},
663+
$deactivateSelectCallbacks: (pthread_ptr) => {
664+
activeSelectCallbacks[pthread_ptr].callbacks = [];
665+
},
666+
$activateSelectCallback: (pthread_ptr, cb) => {
667+
activeSelectCallbacks[pthread_ptr].callbacks.push(cb);
668+
},
669+
$isActiveSelectCallback: (pthread_ptr, cb) => {
670+
return (activeSelectCallbacks[pthread_ptr] != null) &&
671+
(activeSelectCallbacks[pthread_ptr].callbacks.indexOf(cb) != -1);
672+
},
673+
#endif
674+
624675
$spawnThread: (threadParams) => {
625676
#if ASSERTIONS
626677
assert(!ENVIRONMENT_IS_PTHREAD, 'Internal Error! spawnThread() can only ever be called from main application thread!');
@@ -648,6 +699,9 @@ var LibraryPThread = {
648699
arg: threadParams.arg,
649700
pthread_ptr: threadParams.pthread_ptr,
650701
};
702+
#if PROXY_TO_PTHREAD
703+
addThreadToActiveSelectCallbacks(threadParams.pthread_ptr);
704+
#endif
651705
#if OFFSCREENCANVAS_SUPPORT
652706
// Note that we do not need to quote these names because they are only used
653707
// in this file, and not from the external worker.js.

src/lib/libsyscall.js

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -607,7 +607,35 @@ var SyscallsLibrary = {
607607
FS.chdir(stream.path);
608608
return 0;
609609
},
610+
__syscall__newselect__deps: ['$newselectInner','malloc','free'],
611+
__syscall__newselect__proxy: 'none',
610612
__syscall__newselect: (nfds, readfds, writefds, exceptfds, timeout) => {
613+
#if PROXY_TO_PTHREAD
614+
var waitPtr = _malloc(8);
615+
var result = newselectInner(nfds, readfds, writefds, exceptfds, timeout, waitPtr);
616+
if ((result != 0) || ((timeout) && (SYSCALLS.getTimeoutInMillis(timeout) == 0))) {
617+
_free(waitPtr);
618+
return result;
619+
}
620+
var fdRegion = {{{ makeGetValue('waitPtr', 0, '*') }}};
621+
Atomics.wait(HEAP32 , fdRegion >> 2, -1);
622+
var fd = Atomics.load(HEAP32 , fdRegion >> 2);
623+
var flags = Atomics.load(HEAP32 , fdRegion >> 2 + 1);
624+
_free(waitPtr);
625+
if (fd < 0) return 0;
626+
var fdSet = SYSCALLS.parseSelectFDSet(readfds, writefds, exceptfds);
627+
fdSet.setFlags(fd, flags);
628+
fdSet.commit();
629+
return fdSet.getTotal();
630+
#else
631+
return newselectInner(nfds, readfds, writefds, exceptfds, timeout, -1);
632+
#endif
633+
},
634+
#if PROXY_TO_PTHREAD
635+
$newselectInner__deps: ['$PThread', '$deactivateSelectCallbacks', '$getActiveSelectCallbacks', '$activateSelectCallback', '$isActiveSelectCallback'],
636+
#endif
637+
$newselectInner__proxy: 'sync',
638+
$newselectInner: (nfds, readfds, writefds, exceptfds, timeout, waitPtr) => {
611639
// readfds are supported,
612640
// writefds checks socket open status
613641
// exceptfds are supported, although on web, such exceptional conditions never arise in web sockets
@@ -633,6 +661,34 @@ var SyscallsLibrary = {
633661
timeoutInMillis = SYSCALLS.getTimeoutInMillis(timeout);
634662
}
635663

664+
#if PROXY_TO_PTHREAD
665+
const pthread_ptr = PThread.currentProxiedOperationCallerThread;
666+
deactivateSelectCallbacks(pthread_ptr); // deactivate all old callbacks
667+
var makeNotifyCallback = (fd) => null;
668+
if (timeoutInMillis != 0) {
669+
var info = getActiveSelectCallbacks(pthread_ptr);
670+
{{{ makeSetValue('waitPtr', 0, 'info.buf', '*') }}};
671+
Atomics.store(HEAP32, info.buf >> 2, -1); // Initialize the shared region
672+
makeNotifyCallback = (fd) => {
673+
var cb = (flags) => {
674+
if (!isActiveSelectCallback(pthread_ptr, cb)) {
675+
return; // This callback is no longer active.
676+
}
677+
deactivateSelectCallbacks(pthread_ptr); // Only the first event is notified.
678+
Atomics.store(HEAP32, info.buf >> 2 + 1, flags);
679+
Atomics.store(HEAP32, info.buf >> 2, fd);
680+
Atomics.notify(HEAP32, info.buf >> 2);
681+
}
682+
activateSelectCallback(pthread_ptr, cb);
683+
return cb;
684+
}
685+
if (timeoutInMillis > 0) {
686+
var cb = makeNotifyCallback(-2);
687+
setTimeout(() => cb(0), timeoutInMillis);
688+
}
689+
}
690+
#endif
691+
636692
for (var fd = 0; fd < nfds; fd++) {
637693
var mask = 1 << (fd % 32);
638694
if (!(check(fd, allLow, allHigh, mask))) {
@@ -644,14 +700,26 @@ var SyscallsLibrary = {
644700
var flags = SYSCALLS.DEFAULT_POLLMASK;
645701

646702
if (stream.stream_ops.poll) {
703+
#if PROXY_TO_PTHREAD
704+
flags = stream.stream_ops.poll(stream, timeoutInMillis, makeNotifyCallback(fd));
705+
#else
647706
flags = stream.stream_ops.poll(stream, ((timeoutInMillis < 0) || readfds) ? timeoutInMillis : 0);
707+
#endif
648708
}
649709

650710
fdSet.setFlags(fd, flags);
651711
}
652712

653713

714+
#if PROXY_TO_PTHREAD
715+
if ((fdSet.getTotal() > 0) || (timeoutInMillis == 0) ) {
716+
fdSet.commit(fd, flags);
717+
// No wait will happen in the caller. Deactivate all callbacks.
718+
deactivateSelectCallbacks(pthread_ptr);
719+
}
720+
#else
654721
fdSet.commit(fd, flags);
722+
#endif
655723

656724
return fdSet.getTotal();
657725
},

0 commit comments

Comments
 (0)