@@ -43,6 +43,11 @@ export type FairDequeuingStrategyOptions = {
4343 biases ?: FairDequeuingStrategyBiases ;
4444 reuseSnapshotCount ?: number ;
4545 maximumEnvCount ?: number ;
46+ /**
47+ * Maximum number of queues to process per environment
48+ * If not provided, all queues in an environment will be processed
49+ */
50+ maximumQueuePerEnvCount ?: number ;
4651} ;
4752
4853type FairQueueConcurrency = {
@@ -216,8 +221,6 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
216221 return result ;
217222 }
218223
219- // Helper method to maintain DRY principle
220- // Update return type
221224 #orderQueuesByEnvs( envs : string [ ] , snapshot : FairQueueSnapshot ) : Array < EnvQueues > {
222225 const queuesByEnv = snapshot . queues . reduce ( ( acc , queue ) => {
223226 if ( ! acc [ queue . env ] ) {
@@ -231,11 +234,17 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
231234 if ( queuesByEnv [ envId ] ) {
232235 // Get ordered queues for this env
233236 const orderedQueues = this . #weightedRandomQueueOrder( queuesByEnv [ envId ] ) ;
237+
238+ // Apply queue limit if maximumQueuePerEnvCount is set
239+ const limitedQueues = this . options . maximumQueuePerEnvCount
240+ ? orderedQueues . slice ( 0 , this . options . maximumQueuePerEnvCount )
241+ : orderedQueues ;
242+
234243 // Only add the env if it has queues
235- if ( orderedQueues . length > 0 ) {
244+ if ( limitedQueues . length > 0 ) {
236245 acc . push ( {
237246 envId,
238- queues : orderedQueues . map ( ( queue ) => queue . id ) ,
247+ queues : limitedQueues . map ( ( queue ) => queue . id ) ,
239248 } ) ;
240249 }
241250 }
@@ -512,6 +521,10 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
512521
513522 span . setAttribute ( "queue_count" , result . length ) ;
514523
524+ if ( result . length === this . options . parentQueueLimit ) {
525+ span . setAttribute ( "parent_queue_limit_reached" , true ) ;
526+ }
527+
515528 return result ;
516529 } ) ;
517530 }
0 commit comments