1+ import { type RedisOptions } from "@internal/redis" ;
12import {
23 context ,
34 propagation ,
@@ -8,21 +9,32 @@ import {
89 trace ,
910 Tracer ,
1011} from "@opentelemetry/api" ;
11- import { type RedisOptions } from "@internal/redis" ;
1212import {
1313 SEMATTRS_MESSAGE_ID ,
14- SEMATTRS_MESSAGING_SYSTEM ,
1514 SEMATTRS_MESSAGING_OPERATION ,
15+ SEMATTRS_MESSAGING_SYSTEM ,
1616} from "@opentelemetry/semantic-conventions" ;
17+ import { Logger } from "@trigger.dev/core/logger" ;
18+ import { tryCatch } from "@trigger.dev/core/utils" ;
1719import { flattenAttributes } from "@trigger.dev/core/v3" ;
20+ import { Worker , type WorkerConcurrencyOptions } from "@trigger.dev/redis-worker" ;
1821import Redis , { type Callback , type Result } from "ioredis" ;
22+ import { setInterval as setIntervalAsync } from "node:timers/promises" ;
23+ import z from "zod" ;
1924import { env } from "~/env.server" ;
2025import { AuthenticatedEnvironment } from "~/services/apiAuth.server" ;
2126import { logger } from "~/services/logger.server" ;
2227import { singleton } from "~/utils/singleton" ;
28+ import { legacyRunEngineWorker } from "../legacyRunEngineWorker.server" ;
2329import { concurrencyTracker } from "../services/taskRunConcurrencyTracker.server" ;
2430import { attributesFromAuthenticatedEnv , tracer } from "../tracer.server" ;
2531import { AsyncWorker } from "./asyncWorker.server" ;
32+ import {
33+ MARQS_DELAYED_REQUEUE_THRESHOLD_IN_MS ,
34+ MARQS_RESUME_PRIORITY_TIMESTAMP_OFFSET ,
35+ MARQS_RETRY_PRIORITY_TIMESTAMP_OFFSET ,
36+ MARQS_SCHEDULED_REQUEUE_AVAILABLE_AT_THRESHOLD_IN_MS ,
37+ } from "./constants.server" ;
2638import { FairDequeuingStrategy } from "./fairDequeuingStrategy.server" ;
2739import { MarQSShortKeyProducer } from "./marqsKeyProducer" ;
2840import {
@@ -36,18 +48,6 @@ import {
3648 VisibilityTimeoutStrategy ,
3749} from "./types" ;
3850import { V3LegacyRunEngineWorkerVisibilityTimeout } from "./v3VisibilityTimeout.server" ;
39- import { legacyRunEngineWorker } from "../legacyRunEngineWorker.server" ;
40- import {
41- MARQS_DELAYED_REQUEUE_THRESHOLD_IN_MS ,
42- MARQS_RESUME_PRIORITY_TIMESTAMP_OFFSET ,
43- MARQS_RETRY_PRIORITY_TIMESTAMP_OFFSET ,
44- MARQS_SCHEDULED_REQUEUE_AVAILABLE_AT_THRESHOLD_IN_MS ,
45- } from "./constants.server" ;
46- import { setInterval } from "node:timers/promises" ;
47- import { tryCatch } from "@trigger.dev/core/utils" ;
48- import { Worker , type WorkerConcurrencyOptions } from "@trigger.dev/redis-worker" ;
49- import z from "zod" ;
50- import { Logger } from "@trigger.dev/core/logger" ;
5151
5252const KEY_PREFIX = "marqs:" ;
5353
@@ -78,6 +78,8 @@ export type MarQSOptions = {
7878 subscriber ?: MessageQueueSubscriber ;
7979 sharedWorkerQueueConsumerIntervalMs ?: number ;
8080 sharedWorkerQueueMaxMessageCount ?: number ;
81+ sharedWorkerQueueCooloffPeriodMs ?: number ;
82+ sharedWorkerQueueCooloffCountThreshold ?: number ;
8183 eagerDequeuingEnabled ?: boolean ;
8284 workerOptions : {
8385 pollIntervalMs ?: number ;
@@ -107,6 +109,9 @@ export class MarQS {
107109 public keys : MarQSKeyProducer ;
108110 #rebalanceWorkers: Array < AsyncWorker > = [ ] ;
109111 private worker : Worker < typeof workerCatalog > ;
112+ private queueDequeueCooloffPeriod : Map < string , number > = new Map ( ) ;
113+ private queueDequeueCooloffCounts : Map < string , number > = new Map ( ) ;
114+ private clearCooloffPeriodInterval : NodeJS . Timeout ;
110115
111116 constructor ( private readonly options : MarQSOptions ) {
112117 this . redis = options . redis ;
@@ -116,6 +121,12 @@ export class MarQS {
116121 this . #startRebalanceWorkers( ) ;
117122 this . #registerCommands( ) ;
118123
124+ // This will prevent these cooloff maps from growing indefinitely
125+ this . clearCooloffPeriodInterval = setInterval ( ( ) => {
126+ this . queueDequeueCooloffCounts . clear ( ) ;
127+ this . queueDequeueCooloffPeriod . clear ( ) ;
128+ } , 60_000 * 10 ) ; // 10 minutes
129+
119130 this . worker = new Worker ( {
120131 name : "marqs-worker" ,
121132 redisOptions : options . workerOptions . redisOptions ,
@@ -135,6 +146,19 @@ export class MarQS {
135146 if ( options . workerOptions ?. enabled ) {
136147 this . worker . start ( ) ;
137148 }
149+
150+ this . #setupShutdownHandlers( ) ;
151+ }
152+
/**
 * Registers process-level listeners so that receiving SIGTERM or SIGINT
 * invokes `shutdown` with the name of the signal that was received.
 */
#setupShutdownHandlers() {
  const signals = ["SIGTERM", "SIGINT"] as const;

  for (const signal of signals) {
    process.on(signal, () => this.shutdown(signal));
  }
}
157+
/**
 * Releases the resources this instance owns when the process is asked to stop.
 *
 * Logs the shutdown, cancels the periodic timer that clears the cooloff maps,
 * and calls `stop()` on every rebalance worker.
 *
 * NOTE(review): `this.worker` is not stopped here — confirm whether that is
 * handled elsewhere.
 *
 * @param signal - The signal that triggered the shutdown (used for logging only).
 */
async shutdown(signal: NodeJS.Signals) {
  console.log("👇 Shutting down marqs", this.name, signal);

  // Cancel the interval that periodically clears the cooloff maps.
  clearInterval(this.clearCooloffPeriodInterval);

  for (const rebalanceWorker of this.#rebalanceWorkers) {
    rebalanceWorker.stop();
  }
}
139163
140164 get name ( ) {
@@ -737,7 +761,7 @@ export class MarQS {
737761 let processedCount = 0 ;
738762
739763 try {
740- for await ( const _ of setInterval (
764+ for await ( const _ of setIntervalAsync (
741765 this . options . sharedWorkerQueueConsumerIntervalMs ?? 500 ,
742766 null ,
743767 {
@@ -821,6 +845,7 @@ export class MarQS {
821845 let attemptedEnvs = 0 ;
822846 let attemptedQueues = 0 ;
823847 let messageCount = 0 ;
848+ let coolOffPeriodCount = 0 ;
824849
825850 // Try each queue in order, attempt to dequeue a message from each queue, keep going until we've tried all the queues
826851 for ( const env of envQueues ) {
@@ -829,6 +854,20 @@ export class MarQS {
829854 for ( const messageQueue of env . queues ) {
830855 attemptedQueues ++ ;
831856
857+ const cooloffPeriod = this . queueDequeueCooloffPeriod . get ( messageQueue ) ;
858+
859+ // If the queue is in a cooloff period, skip attempting to dequeue from it
860+ if ( cooloffPeriod ) {
861+ // If the cooloff period is still active, skip attempting to dequeue from it
862+ if ( cooloffPeriod > Date . now ( ) ) {
863+ coolOffPeriodCount ++ ;
864+ continue ;
865+ } else {
866+ // If the cooloff period is over, delete the cooloff period and attempt to dequeue from the queue
867+ this . queueDequeueCooloffPeriod . delete ( messageQueue ) ;
868+ }
869+ }
870+
832871 await this . #trace(
833872 "attemptDequeue" ,
834873 async ( attemptDequeueSpan ) => {
@@ -862,10 +901,32 @@ export class MarQS {
862901 ) ;
863902
864903 if ( ! messages || messages . length === 0 ) {
904+ const cooloffCount = this . queueDequeueCooloffCounts . get ( messageQueue ) ?? 0 ;
905+
906+ const cooloffCountThreshold = Math . max (
907+ 10 ,
908+ this . options . sharedWorkerQueueCooloffCountThreshold ?? 10
909+ ) ; // minimum of 10
910+
911+ if ( cooloffCount >= cooloffCountThreshold ) {
912+ // If no messages were dequeued, set a cooloff period for the queue
913+ // This is to prevent the queue from being dequeued too frequently
914+ // and to give other queues a chance to dequeue messages more frequently
915+ this . queueDequeueCooloffPeriod . set (
916+ messageQueue ,
917+ Date . now ( ) + ( this . options . sharedWorkerQueueCooloffPeriodMs ?? 10_000 ) // defaults to 10 seconds
918+ ) ;
919+ this . queueDequeueCooloffCounts . delete ( messageQueue ) ;
920+ } else {
921+ this . queueDequeueCooloffCounts . set ( messageQueue , cooloffCount + 1 ) ;
922+ }
923+
865924 attemptDequeueSpan . setAttribute ( "message_count" , 0 ) ;
866925 return null ; // Try next queue if no message was dequeued
867926 }
868927
928+ this . queueDequeueCooloffCounts . delete ( messageQueue ) ;
929+
869930 messageCount += messages . length ;
870931
871932 attemptDequeueSpan . setAttribute ( "message_count" , messages . length ) ;
@@ -916,6 +977,7 @@ export class MarQS {
916977 span . setAttribute ( "attempted_queues" , attemptedQueues ) ;
917978 span . setAttribute ( "attempted_envs" , attemptedEnvs ) ;
918979 span . setAttribute ( "message_count" , messageCount ) ;
980+ span . setAttribute ( "cooloff_period_count" , coolOffPeriodCount ) ;
919981
920982 return ;
921983 } ,
@@ -2614,6 +2676,8 @@ function getMarQSClient() {
26142676 sharedWorkerQueueConsumerIntervalMs : env . MARQS_SHARED_WORKER_QUEUE_CONSUMER_INTERVAL_MS ,
26152677 sharedWorkerQueueMaxMessageCount : env . MARQS_SHARED_WORKER_QUEUE_MAX_MESSAGE_COUNT ,
26162678 eagerDequeuingEnabled : env . MARQS_SHARED_WORKER_QUEUE_EAGER_DEQUEUE_ENABLED === "1" ,
2679+ sharedWorkerQueueCooloffCountThreshold : env . MARQS_SHARED_WORKER_QUEUE_COOLOFF_COUNT_THRESHOLD ,
2680+ sharedWorkerQueueCooloffPeriodMs : env . MARQS_SHARED_WORKER_QUEUE_COOLOFF_PERIOD_MS ,
26172681 workerOptions : {
26182682 enabled : env . MARQS_WORKER_ENABLED === "1" ,
26192683 pollIntervalMs : env . MARQS_WORKER_POLL_INTERVAL_MS ,
0 commit comments