Skip to content

Commit 4cfc7a9

Browse files
committed
select: Enable timeout on select when PROXY_TO_PTHREAD
This commit enables the select syscall to handle timeout with multiple event sources. PROXY_TO_PTHREAD is needed to prevent blocking the main worker. When a thread worker invokes the select syscall with non-zero timeout and no fd is ready, it blocks using Atmoics.wait until it receives a readiness notification. On the main worker, the underlying stream implementation can trigger readiness using Atomics.notify through a callback. A notification also issued automatically once the specified timeout expires. Communication between the thread worker and the main worker occurs via a shared memory region. To prevent a select invocation being unblocked by the callbacks of a previous invocation, all active callbacks are tracked in a list. See the comments in activeSelectCallbacks for details. Usage of the notification callback is optional. If the stream implementation doesn't support it, the "poll" method can still synchronously return the event status. Signed-off-by: Kohei Tokunaga <ktokunaga.mail@gmail.com>
1 parent c3a81b8 commit 4cfc7a9

File tree

2 files changed

+122
-0
lines changed

2 files changed

+122
-0
lines changed

src/lib/libpthread.js

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,11 @@ var LibraryPThread = {
8888
'exit',
8989
#if PTHREADS_DEBUG || ASSERTIONS
9090
'$ptrToString',
91+
#endif
92+
#if PROXY_TO_PTHREAD
93+
'$addThreadToActiveSelectCallbacks',
94+
'$removeThreadFromActiveSelectCallbacks',
95+
'$activeSelectCallbacks',
9196
#endif
9297
],
9398
$PThread: {
@@ -587,6 +592,9 @@ var LibraryPThread = {
587592
#endif
588593
#if ASSERTIONS
589594
assert(worker);
595+
#endif
596+
#if PROXY_TO_PTHREAD
597+
removeThreadFromActiveSelectCallbacks(pthread_ptr);
590598
#endif
591599
PThread.returnWorkerToPool(worker);
592600
},
@@ -631,6 +639,49 @@ var LibraryPThread = {
631639
$registerTLSInit: (tlsInitFunc) => PThread.tlsInitFunctions.push(tlsInitFunc),
632640
#endif
633641

642+
#if PROXY_TO_PTHREAD
643+
// On the main worker, activeSelectCallbacks records the set of callbacks
644+
// that are allowed to update the shared region. Any callback not in this
645+
// set (i.e. when !isActiveSelectCallback) must not update the region.
646+
//
647+
// Each select syscall invocation must call deactivateSelectCallbacks to
648+
// reset this set, ensuring that callbacks from previous invocations don't
649+
// affect the current one.
650+
//
651+
// If a callback executes after the thread worker has already returned (due
652+
// to a timeout, a readiness notification or other exceptional conditions)
653+
// but before the next deactivation, it may still update the shared region.
654+
// However the thread worker will not read that value and just ignore it.
655+
//
656+
// activeSelectCallbacks records multiple callback lists one per thread
657+
// worker so that each worker can manage its own set of active callbacks
658+
// independently.
659+
$activeSelectCallbacks: {},
660+
$addThreadToActiveSelectCallbacks__deps: ['malloc'],
661+
$addThreadToActiveSelectCallbacks: (pthread_ptr) => {
662+
activeSelectCallbacks[pthread_ptr] = {
663+
buf: _malloc(8),
664+
callbacks: [],
665+
};
666+
},
667+
$removeThreadFromActiveSelectCallbacks: (pthread_ptr) => {
668+
delete activeSelectCallbacks[pthread_ptr];
669+
},
670+
$getActiveSelectCallbacks: (pthread_ptr) => {
671+
return activeSelectCallbacks[pthread_ptr];
672+
},
673+
$deactivateSelectCallbacks: (pthread_ptr) => {
674+
activeSelectCallbacks[pthread_ptr].callbacks = [];
675+
},
676+
$activateSelectCallback: (pthread_ptr, cb) => {
677+
activeSelectCallbacks[pthread_ptr].callbacks.push(cb);
678+
},
679+
$isActiveSelectCallback: (pthread_ptr, cb) => {
680+
return (activeSelectCallbacks[pthread_ptr] != null) &&
681+
(activeSelectCallbacks[pthread_ptr].callbacks.indexOf(cb) != -1);
682+
},
683+
#endif
684+
634685
$spawnThread: (threadParams) => {
635686
#if ASSERTIONS
636687
assert(!ENVIRONMENT_IS_PTHREAD, 'Internal Error! spawnThread() can only ever be called from main application thread!');
@@ -658,6 +709,9 @@ var LibraryPThread = {
658709
arg: threadParams.arg,
659710
pthread_ptr: threadParams.pthread_ptr,
660711
};
712+
#if PROXY_TO_PTHREAD
713+
addThreadToActiveSelectCallbacks(threadParams.pthread_ptr);
714+
#endif
661715
#if OFFSCREENCANVAS_SUPPORT
662716
// Note that we do not need to quote these names because they are only used
663717
// in this file, and not from the external worker.js.

src/lib/libsyscall.js

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -607,7 +607,35 @@ var SyscallsLibrary = {
607607
FS.chdir(stream.path);
608608
return 0;
609609
},
610+
__syscall__newselect__deps: ['$newselectInner','malloc','free'],
611+
__syscall__newselect__proxy: 'none',
610612
__syscall__newselect: (nfds, readfds, writefds, exceptfds, timeout) => {
613+
#if PROXY_TO_PTHREAD
614+
var waitPtr = _malloc(8);
615+
var result = newselectInner(nfds, readfds, writefds, exceptfds, timeout, waitPtr);
616+
if ((result != 0) || ((timeout) && (SYSCALLS.getTimeoutInMillis(timeout) == 0))) {
617+
_free(waitPtr);
618+
return result;
619+
}
620+
var fdRegion = {{{ makeGetValue('waitPtr', 0, '*') }}};
621+
Atomics.wait(HEAP32 , fdRegion >> 2, -1);
622+
var fd = Atomics.load(HEAP32 , fdRegion >> 2);
623+
var flags = Atomics.load(HEAP32 , fdRegion >> 2 + 1);
624+
_free(waitPtr);
625+
if (fd < 0) return 0;
626+
var fdSet = SYSCALLS.parseSelectFDSet(readfds, writefds, exceptfds);
627+
fdSet.setFlags(fd, flags);
628+
fdSet.commit();
629+
return fdSet.getTotal();
630+
#else
631+
return newselectInner(nfds, readfds, writefds, exceptfds, timeout, -1);
632+
#endif
633+
},
634+
#if PROXY_TO_PTHREAD
635+
$newselectInner__deps: ['$PThread', '$deactivateSelectCallbacks', '$getActiveSelectCallbacks', '$activateSelectCallback', '$isActiveSelectCallback'],
636+
#endif
637+
$newselectInner__proxy: 'sync',
638+
$newselectInner: (nfds, readfds, writefds, exceptfds, timeout, waitPtr) => {
611639
// readfds are supported,
612640
// writefds checks socket open status
613641
// exceptfds are supported, although on web, such exceptional conditions never arise in web sockets
@@ -633,6 +661,34 @@ var SyscallsLibrary = {
633661
timeoutInMillis = SYSCALLS.getTimeoutInMillis(timeout);
634662
}
635663

664+
#if PROXY_TO_PTHREAD
665+
const pthread_ptr = PThread.currentProxiedOperationCallerThread;
666+
deactivateSelectCallbacks(pthread_ptr); // deactivate all old callbacks
667+
var makeNotifyCallback = (fd) => null;
668+
if (timeoutInMillis != 0) {
669+
var info = getActiveSelectCallbacks(pthread_ptr);
670+
{{{ makeSetValue('waitPtr', 0, 'info.buf', '*') }}};
671+
Atomics.store(HEAP32, info.buf >> 2, -1); // Initialize the shared region
672+
makeNotifyCallback = (fd) => {
673+
var cb = (flags) => {
674+
if (!isActiveSelectCallback(pthread_ptr, cb)) {
675+
return; // This callback is no longer active.
676+
}
677+
deactivateSelectCallbacks(pthread_ptr); // Only the first event is notified.
678+
Atomics.store(HEAP32, info.buf >> 2 + 1, flags);
679+
Atomics.store(HEAP32, info.buf >> 2, fd);
680+
Atomics.notify(HEAP32, info.buf >> 2);
681+
}
682+
activateSelectCallback(pthread_ptr, cb);
683+
return cb;
684+
}
685+
if (timeoutInMillis > 0) {
686+
var cb = makeNotifyCallback(-2);
687+
setTimeout(() => cb(0), timeoutInMillis);
688+
}
689+
}
690+
#endif
691+
636692
for (var fd = 0; fd < nfds; fd++) {
637693
var mask = 1 << (fd % 32);
638694
if (!(check(fd, allLow, allHigh, mask))) {
@@ -644,14 +700,26 @@ var SyscallsLibrary = {
644700
var flags = SYSCALLS.DEFAULT_POLLMASK;
645701

646702
if (stream.stream_ops.poll) {
703+
#if PROXY_TO_PTHREAD
704+
flags = stream.stream_ops.poll(stream, timeoutInMillis, makeNotifyCallback(fd));
705+
#else
647706
flags = stream.stream_ops.poll(stream, ((timeoutInMillis < 0) || readfds) ? timeoutInMillis : 0);
707+
#endif
648708
}
649709

650710
fdSet.setFlags(fd, flags);
651711
}
652712

653713

714+
#if PROXY_TO_PTHREAD
715+
if ((fdSet.getTotal() > 0) || (timeoutInMillis == 0) ) {
716+
fdSet.commit(fd, flags);
717+
// No wait will happen in the caller. Deactivate all callbacks.
718+
deactivateSelectCallbacks(pthread_ptr);
719+
}
720+
#else
654721
fdSet.commit(fd, flags);
722+
#endif
655723

656724
return fdSet.getTotal();
657725
},

0 commit comments

Comments
 (0)