@@ -589,48 +589,126 @@ export class MarQS {
589589 for ( const messageQueue of env . queues ) {
590590 attemptedQueues ++ ;
591591
592- try {
593- const messageData = await this . #callDequeueMessage( {
594- messageQueue,
595- parentQueue,
596- } ) ;
597-
598- if ( ! messageData ) {
599- continue ; // Try next queue if no message was dequeued
592+ const result = await this . #trace(
593+ "attemptDequeue" ,
594+ async ( innerSpan ) => {
595+ try {
596+ innerSpan . setAttributes ( {
597+ [ SemanticAttributes . QUEUE ] : messageQueue ,
598+ [ SemanticAttributes . PARENT_QUEUE ] : parentQueue ,
599+ } ) ;
600+
601+ const messageData = await this . #trace(
602+ "callDequeueMessage" ,
603+ async ( dequeueSpan ) => {
604+ dequeueSpan . setAttributes ( {
605+ [ SemanticAttributes . QUEUE ] : messageQueue ,
606+ [ SemanticAttributes . PARENT_QUEUE ] : parentQueue ,
607+ } ) ;
608+
609+ return await this . #callDequeueMessage( {
610+ messageQueue,
611+ parentQueue,
612+ } ) ;
613+ } ,
614+ {
615+ kind : SpanKind . CONSUMER ,
616+ attributes : {
617+ [ SEMATTRS_MESSAGING_OPERATION ] : "receive" ,
618+ [ SEMATTRS_MESSAGING_SYSTEM ] : "marqs" ,
619+ } ,
620+ }
621+ ) ;
622+
623+ if ( ! messageData ) {
624+ return null ; // Try next queue if no message was dequeued
625+ }
626+
627+ const message = await this . readMessage ( messageData . messageId ) ;
628+
629+ if ( message ) {
630+ const attributes = {
631+ [ SEMATTRS_MESSAGE_ID ] : message . messageId ,
632+ [ SemanticAttributes . QUEUE ] : message . queue ,
633+ [ SemanticAttributes . MESSAGE_ID ] : message . messageId ,
634+ [ SemanticAttributes . CONCURRENCY_KEY ] : message . concurrencyKey ,
635+ [ SemanticAttributes . PARENT_QUEUE ] : message . parentQueue ,
636+ attempted_queues : attemptedQueues , // How many queues we tried before success
637+ attempted_envs : attemptedEnvs , // How many environments we tried before success
638+ message_timestamp : message . timestamp ,
639+ message_age : this . #calculateMessageAge( message ) ,
640+ message_priority : message . priority ,
641+ message_enqueue_method : message . enqueueMethod ,
642+ message_available_at : message . availableAt ,
643+ ...flattenAttributes ( message . data , "message.data" ) ,
644+ } ;
645+
646+ span . setAttributes ( attributes ) ;
647+ innerSpan . setAttributes ( attributes ) ;
648+
649+ await this . #trace(
650+ "messageDequeued" ,
651+ async ( subscriberSpan ) => {
652+ subscriberSpan . setAttributes ( {
653+ [ SemanticAttributes . MESSAGE_ID ] : message . messageId ,
654+ [ SemanticAttributes . QUEUE ] : message . queue ,
655+ [ SemanticAttributes . PARENT_QUEUE ] : message . parentQueue ,
656+ } ) ;
657+
658+ return await this . options . subscriber ?. messageDequeued ( message ) ;
659+ } ,
660+ {
661+ kind : SpanKind . INTERNAL ,
662+ attributes : {
663+ [ SEMATTRS_MESSAGING_OPERATION ] : "receive" ,
664+ [ SEMATTRS_MESSAGING_SYSTEM ] : "marqs" ,
665+ } ,
666+ }
667+ ) ;
668+
669+ await this . #trace(
670+ "startHeartbeat" ,
671+ async ( heartbeatSpan ) => {
672+ heartbeatSpan . setAttributes ( {
673+ [ SemanticAttributes . MESSAGE_ID ] : messageData . messageId ,
674+ visibility_timeout_ms : this . visibilityTimeoutInMs ,
675+ } ) ;
676+
677+ return await this . options . visibilityTimeoutStrategy . startHeartbeat (
678+ messageData . messageId ,
679+ this . visibilityTimeoutInMs
680+ ) ;
681+ } ,
682+ {
683+ kind : SpanKind . INTERNAL ,
684+ attributes : {
685+ [ SEMATTRS_MESSAGING_OPERATION ] : "receive" ,
686+ [ SEMATTRS_MESSAGING_SYSTEM ] : "marqs" ,
687+ } ,
688+ }
689+ ) ;
690+
691+ return message ;
692+ }
693+ } catch ( error ) {
694+ // Log error but continue trying other queues
695+ logger . warn ( `[${ this . name } ] Failed to dequeue from queue ${ messageQueue } ` , {
696+ error,
697+ } ) ;
698+ return null ;
699+ }
700+ } ,
701+ {
702+ kind : SpanKind . CONSUMER ,
703+ attributes : {
704+ [ SEMATTRS_MESSAGING_OPERATION ] : "receive" ,
705+ [ SEMATTRS_MESSAGING_SYSTEM ] : "marqs" ,
706+ } ,
600707 }
708+ ) ;
601709
602- const message = await this . readMessage ( messageData . messageId ) ;
603-
604- if ( message ) {
605- span . setAttributes ( {
606- [ SEMATTRS_MESSAGE_ID ] : message . messageId ,
607- [ SemanticAttributes . QUEUE ] : message . queue ,
608- [ SemanticAttributes . MESSAGE_ID ] : message . messageId ,
609- [ SemanticAttributes . CONCURRENCY_KEY ] : message . concurrencyKey ,
610- [ SemanticAttributes . PARENT_QUEUE ] : message . parentQueue ,
611- attempted_queues : attemptedQueues , // How many queues we tried before success
612- attempted_envs : attemptedEnvs , // How many environments we tried before success
613- message_timestamp : message . timestamp ,
614- message_age : this . #calculateMessageAge( message ) ,
615- message_priority : message . priority ,
616- message_enqueue_method : message . enqueueMethod ,
617- message_available_at : message . availableAt ,
618- ...flattenAttributes ( message . data , "message.data" ) ,
619- } ) ;
620-
621- await this . options . subscriber ?. messageDequeued ( message ) ;
622-
623- await this . options . visibilityTimeoutStrategy . startHeartbeat (
624- messageData . messageId ,
625- this . visibilityTimeoutInMs
626- ) ;
627-
628- return message ;
629- }
630- } catch ( error ) {
631- // Log error but continue trying other queues
632- logger . warn ( `[${ this . name } ] Failed to dequeue from queue ${ messageQueue } ` , { error } ) ;
633- continue ;
710+ if ( result ) {
711+ return result ;
634712 }
635713 }
636714 }
0 commit comments