1- import { sleep , stringDeserializer , jsonDeserializer , Consumer } from '@platformatic/kafka'
1+ import { stringDeserializer , jsonDeserializer , Consumer } from '@platformatic/kafka'
22import promClient from 'prom-client'
33import { buildServer } from '@platformatic/service'
44import { NOT_FOUND } from 'http-errors-enhanced'
@@ -104,6 +104,22 @@ async function publishMessage (server, topic, message, headers = {}) {
104104 return res
105105}
106106
107+ async function waitForMetricUpdate ( registry , metricName , labelMatcher ) {
108+ const { promise, resolve } = Promise . withResolvers ( )
109+
110+ const metric = registry . getSingleMetric ( metricName )
111+ if ( ! metric ) return false
112+
113+ const metricValue = await metric . get ( )
114+ const matchingMetric = metricValue . values . find ( v => labelMatcher ( v . labels ) )
115+
116+ if ( matchingMetric && matchingMetric . value >= 1 ) {
117+ resolve ( matchingMetric )
118+ }
119+
120+ return promise
121+ }
122+
107123test ( 'should produce messages to Kafka and then forward them to the target server' , async t => {
108124 // Start the monitor
109125 const { consumer, stream } = await createMonitor ( stringDeserializer )
@@ -249,15 +265,24 @@ test('should not publish a permanently failed message to the DLQ if asked to', a
249265 } )
250266 const value = randomUUID ( )
251267
268+ const { promise : processingPromise , resolve : resolveProcessing } = Promise . withResolvers ( )
252269 let dlqMessages = 0
270+
253271 stream . on ( 'data' , message => {
254272 if ( message . topic === defaultDlqTopic ) {
255273 dlqMessages ++
256274 }
275+ // Mark processing as complete when we receive the original message
276+ if ( message . topic === 'plt-kafka-hooks-fail' ) {
277+ resolveProcessing ( )
278+ }
257279 } )
258280
259281 await publishMessage ( server , 'plt-kafka-hooks-fail' , value )
260- await sleep ( 3000 )
282+
283+ // Wait for the message to be processed and retries to complete
284+ await processingPromise
285+
261286 deepStrictEqual ( dlqMessages , 0 )
262287} )
263288
@@ -730,6 +755,9 @@ test('should handle no pending request found for correlation ID', async t => {
730755} )
731756
732757test ( 'should increment DLQ metrics with network_error reason when network error occurs' , async t => {
758+ const { consumer, stream } = await createMonitor ( stringDeserializer )
759+ t . after ( ( ) => consumer . close ( true ) )
760+
733761 const registry = new promClient . Registry ( )
734762
735763 const originalPrometheus = globalThis . platformatic ?. prometheus
@@ -749,22 +777,25 @@ test('should increment DLQ metrics with network_error reason when network error
749777 ]
750778 } )
751779
752- // Get the DLQ counter metric before sending the message
753- const dlqMetric = registry . getSingleMetric ( 'kafka_hooks_dlq_messages_total' )
754- await publishMessage ( server , 'plt-kafka-hooks-network-error' , 'test message' )
755- await sleep ( 1500 )
780+ const { promise : processingPromise , resolve : resolveProcessing } = Promise . withResolvers ( )
756781
757- // Get the metric value after processing
758- const finalValue = await dlqMetric . get ( )
782+ stream . on ( 'data' , message => {
783+ if ( message . topic === 'plt-kafka-hooks-dlq' ) {
784+ resolveProcessing ( )
785+ }
786+ } )
787+ await publishMessage ( server , 'plt-kafka-hooks-network-error' , 'test message' )
788+ await processingPromise
759789
760- // Find the specific metric for our topic and reason
761- const networkErrorMetric = finalValue . values . find ( v =>
762- v . labels . topic === 'plt-kafka-hooks-network-error' &&
763- v . labels . reason === 'network_error'
790+ // Wait for the metric to be updated
791+ const matchingMetric = await waitForMetricUpdate (
792+ registry ,
793+ 'kafka_hooks_dlq_messages_total' ,
794+ ( labels ) => labels . topic === 'plt-kafka-hooks-network-error' && labels . reason === 'network_error'
764795 )
765796
766- t . assert . ok ( networkErrorMetric , 'DLQ metric with network_error reason should exist' )
767- t . assert . strictEqual ( networkErrorMetric . value , 1 , 'DLQ metric should be incremented by 1' )
797+ t . assert . ok ( matchingMetric , 'DLQ metric with network_error reason should exist' )
798+ t . assert . strictEqual ( matchingMetric . value , 1 , 'DLQ metric should be incremented by 1' )
768799
769800 // Restore original prometheus
770801 if ( originalPrometheus ) {
@@ -775,6 +806,8 @@ test('should increment DLQ metrics with network_error reason when network error
775806} )
776807
777808test ( 'should increment DLQ metrics with http status code reason when HTTP error occurs' , async t => {
809+ const { consumer, stream } = await createMonitor ( stringDeserializer )
810+ t . after ( ( ) => consumer . close ( true ) )
778811 const { url : targetUrl } = await startTargetServer ( t )
779812
780813 // Create a real Prometheus registry
@@ -801,27 +834,29 @@ test('should increment DLQ metrics with http status code reason when HTTP error
801834 ]
802835 } )
803836
804- await publishMessage ( server , 'plt-kafka-hooks-http-error' , 'test message' )
837+ const { promise : processingPromise , resolve : resolveProcessing } = Promise . withResolvers ( )
805838
806- // Wait for processing to complete
807- await sleep ( 1500 )
839+ stream . on ( 'data' , message => {
840+ if ( message . topic === 'plt-kafka-hooks-dlq' ) {
841+ resolveProcessing ( )
842+ }
843+ } )
808844
809- // Get the DLQ counter metric
810- const dlqMetric = registry . getSingleMetric ( 'kafka_hooks_dlq_messages_total' )
811- const metricValue = await dlqMetric . get ( )
845+ await publishMessage ( server , 'plt-kafka-hooks-http-error' , 'test message' )
846+ await processingPromise
812847
813- // Find the specific metric for our topic - we need to check what status code /fail returns
814- // Looking at existing tests, it might return 500 or another status code
815- const httpErrorMetric = metricValue . values . find ( v =>
816- v . labels . topic === 'plt-kafka-hooks-http-error' &&
817- v . labels . reason . startsWith ( 'http_' )
848+ // Wait for the metric to be updated
849+ const matchingMetric = await waitForMetricUpdate (
850+ registry ,
851+ 'kafka_hooks_dlq_messages_total' ,
852+ ( labels ) => labels . topic === 'plt-kafka-hooks-http-error' && labels . reason . startsWith ( 'http_' )
818853 )
819854
820- t . assert . ok ( httpErrorMetric , 'DLQ metric with http status code reason should exist' )
821- t . assert . strictEqual ( httpErrorMetric . value , 1 , 'DLQ metric should be incremented by 1' )
855+ t . assert . ok ( matchingMetric , 'DLQ metric with http status code reason should exist' )
856+ t . assert . strictEqual ( matchingMetric . value , 1 , 'DLQ metric should be incremented by 1' )
822857
823858 // Log the actual reason for debugging
824- t . diagnostic ( `DLQ reason: ${ httpErrorMetric . labels . reason } ` )
859+ t . diagnostic ( `DLQ reason: ${ matchingMetric . labels . reason } ` )
825860
826861 // Restore original prometheus
827862 if ( originalPrometheus ) {
0 commit comments