@@ -610,3 +610,120 @@ test('should handle request/response pattern with both path and query parameters
610610 t . assert . strictEqual ( response . statusCode , 200 )
611611 t . assert . strictEqual ( response . payload , '{"status": "updated"}' )
612612} )
613+
614+ test ( 'should ignore response message missing correlation ID' , async t => {
615+ const server = await startStackable ( t , '' , {
616+ topics : [
617+ {
618+ topic : 'plt-kafka-hooks-response' ,
619+ url : 'http://localhost:3043/response'
620+ }
621+ ] ,
622+ requestResponse : [
623+ {
624+ path : '/api/process' ,
625+ requestTopic : 'plt-kafka-hooks-request' ,
626+ responseTopic : 'plt-kafka-hooks-response' ,
627+ timeout : 1000 // Short timeout for this test
628+ }
629+ ]
630+ } )
631+
632+ // Start a legitimate request that will timeout
633+ const requestPromise = server . inject ( {
634+ method : 'POST' ,
635+ url : '/api/process' ,
636+ payload : 'test request' ,
637+ headers : {
638+ 'content-type' : 'text/plain'
639+ }
640+ } )
641+
642+ // Send a response message without correlation ID - this should be ignored
643+ await publishMessage ( server , 'plt-kafka-hooks-response' , 'response without correlation' , {
644+ 'content-type' : 'text/plain' ,
645+ 'x-status-code' : '200'
646+ // No correlationIdHeader
647+ } )
648+
649+ // The request should still timeout because the invalid response was ignored
650+ const response = await requestPromise
651+ t . assert . strictEqual ( response . statusCode , 504 ) // Gateway timeout
652+
653+ const json = response . json ( )
654+ t . assert . strictEqual ( json . code , 'HTTP_ERROR_GATEWAY_TIMEOUT' )
655+ } )
656+
657+ test ( 'should handle no pending request found for correlation ID' , async t => {
658+ const server = await startStackable ( t , '' , {
659+ topics : [
660+ {
661+ topic : 'plt-kafka-hooks-response' ,
662+ url : 'http://localhost:3043/response'
663+ }
664+ ] ,
665+ requestResponse : [
666+ {
667+ path : '/api/process' ,
668+ requestTopic : 'plt-kafka-hooks-request' ,
669+ responseTopic : 'plt-kafka-hooks-response' ,
670+ timeout : 5000
671+ }
672+ ]
673+ } )
674+
675+ // Random correlation ID that doesn't correspond to any pending request
676+ const nonExistentCorrelationId = randomUUID ( )
677+
678+ // Send a response message with a correlation ID that has no pending request
679+ await publishMessage ( server , 'plt-kafka-hooks-response' , 'orphaned response' , {
680+ [ correlationIdHeader ] : nonExistentCorrelationId ,
681+ 'content-type' : 'text/plain' ,
682+ 'x-status-code' : '200'
683+ } )
684+
685+ // Verify the system still works normally by doing a proper request/response cycle
686+ const requestConsumer = new Consumer ( {
687+ groupId : randomUUID ( ) ,
688+ bootstrapBrokers : 'localhost:9092' ,
689+ maxWaitTime : 500 ,
690+ deserializers : {
691+ value : stringDeserializer
692+ }
693+ } )
694+
695+ await requestConsumer . metadata ( { topics : [ 'plt-kafka-hooks-request' ] , autocreateTopics : true } )
696+ const requestStream = await requestConsumer . consume ( { topics : [ 'plt-kafka-hooks-request' ] } )
697+ t . after ( ( ) => requestConsumer . close ( true ) )
698+
699+ // Start a legitimate request
700+ const requestPromise = server . inject ( {
701+ method : 'POST' ,
702+ url : '/api/process' ,
703+ payload : 'legitimate request' ,
704+ headers : {
705+ 'content-type' : 'text/plain'
706+ }
707+ } )
708+
709+ // Wait for the request to be published
710+ const [ requestMessage ] = await once ( requestStream , 'data' )
711+
712+ const headers = { }
713+ for ( const [ key , value ] of requestMessage . headers ) {
714+ headers [ key . toString ( ) ] = value . toString ( )
715+ }
716+
717+ // Send proper response with the correct correlation ID
718+ const correctCorrelationId = headers [ correlationIdHeader ]
719+ await publishMessage ( server , 'plt-kafka-hooks-response' , 'legitimate response' , {
720+ [ correlationIdHeader ] : correctCorrelationId ,
721+ 'content-type' : 'text/plain' ,
722+ 'x-status-code' : '200'
723+ } )
724+
725+ // Verify the legitimate request still works properly
726+ const response = await requestPromise
727+ t . assert . strictEqual ( response . statusCode , 200 )
728+ t . assert . strictEqual ( response . payload , 'legitimate response' )
729+ } )
0 commit comments