@@ -23,6 +23,7 @@ const {
2323const { Buffer } = require ( 'buffer' ) ;
2424const MessageCache = require ( './_consumer_cache' ) ;
2525const { hrtime } = require ( 'process' ) ;
26+ const { LinkedList } = require ( './_linked-list' ) ;
2627
2728const ConsumerState = Object . freeze ( {
2829 INIT : 0 ,
@@ -170,9 +171,9 @@ class Consumer {
170171 #fetchInProgress;
171172
172173 /**
173- * Promise that resolves when there is something we need to poll for (messages, rebalance, etc) .
174+ * List of DeferredPromises waiting on consumer queue to be non-empty .
174175 */
175- #queueNonEmpty = new DeferredPromise ( ) ;
176+ #queueWaiters = new LinkedList ( ) ;
176177
177178 /**
178179 * Whether any rebalance callback is in progress.
@@ -1270,8 +1271,9 @@ class Consumer {
12701271 }
12711272
12721273 #queueNonEmptyCb( ) {
1273- /* Unconditionally resolve the promise - not a problem if it's already resolved. */
1274- this . #queueNonEmpty. resolve ( ) ;
1274+ for ( const waiter of this . #queueWaiters) {
1275+ waiter . resolve ( ) ;
1276+ }
12751277 }
12761278
12771279 async #nextFetchRetry( ) {
@@ -1280,15 +1282,21 @@ class Consumer {
12801282 } else {
12811283 /* Backoff a little. If m is null, we might be without messages
12821284 * or in available partition starvation, and calling consumeSingleCached
1283- * in a tight loop will help no one. We still keep it to 1000ms because we
1284- * want to keep polling, though (ideally) we could increase it all the way
1285- * up to max.poll.interval.ms.
1285+ * in a tight loop will help no one.
12861286 * In case there is any message in the queue, we'll be woken up before the
1287- * timer expires. */
1288- await Timer . withTimeout ( 1000 , this . #queueNonEmpty) ;
1289- if ( this . #queueNonEmpty. resolved ) {
1290- this . #queueNonEmpty = new DeferredPromise ( ) ;
1291- }
1287+ * timer expires.
1288+ * We have a per-worker promise, otherwise we end up awakening
1289+ * other workers when they've already looped and just restarted awaiting.
1290+ * The `Promise` passed to `Timer.withTimeout` cannot be reused
1291+ * in next call to this method, to avoid memory leaks caused
1292+ * by `Promise.race`. */
1293+ const waiter = new DeferredPromise ( ) ;
1294+ const waiterNode = this . #queueWaiters. addLast ( waiter ) ;
1295+ await Timer . withTimeout ( 1000 , waiter ) ;
1296+
1297+ /* Resolves the "extra" promise that has been spawned when creating the timer. */
1298+ waiter . resolve ( ) ;
1299+ this . #queueWaiters. remove ( waiterNode ) ;
12921300 }
12931301 }
12941302
@@ -1374,10 +1382,7 @@ class Consumer {
13741382 let interval = Number ( cacheExpiration - now ) / 1e6 ;
13751383 if ( interval < 100 )
13761384 interval = 100 ;
1377- const promises = Promise . race ( [ this . #workerTerminationScheduled,
1378- this . #maxPollIntervalRestart] ) ;
1379- await Timer . withTimeout ( interval ,
1380- promises ) ;
1385+ await Timer . withTimeout ( interval , this . #maxPollIntervalRestart) ;
13811386 if ( this . #maxPollIntervalRestart. resolved )
13821387 this . #maxPollIntervalRestart = new DeferredPromise ( ) ;
13831388 }
0 commit comments