|
1 | 1 | import { sleep, stringDeserializer, jsonDeserializer, Consumer } from '@platformatic/kafka' |
| 2 | +import promClient from 'prom-client' |
2 | 3 | import { buildServer } from '@platformatic/service' |
3 | 4 | import { NOT_FOUND } from 'http-errors-enhanced' |
4 | 5 | import { deepStrictEqual } from 'node:assert' |
@@ -727,3 +728,105 @@ test('should handle no pending request found for correlation ID', async t => { |
727 | 728 | t.assert.strictEqual(response.statusCode, 200) |
728 | 729 | t.assert.strictEqual(response.payload, 'legitimate response') |
729 | 730 | }) |
| 731 | + |
| 732 | +test('should increment DLQ metrics with network_error reason when network error occurs', async t => { |
| 733 | + const registry = new promClient.Registry() |
| 734 | + |
| 735 | + const originalPrometheus = globalThis.platformatic?.prometheus |
| 736 | + globalThis.platformatic = { |
| 737 | + prometheus: { registry, client: promClient } |
| 738 | + } |
| 739 | + |
| 740 | + const server = await startStackable(t, '', { |
| 741 | + topics: [ |
| 742 | + { |
| 743 | + topic: 'plt-kafka-hooks-network-error', |
| 744 | + url: 'http://127.0.0.1:1/fail', // Invalid URL that will cause network error |
| 745 | + dlq: true, |
| 746 | + retries: 1, |
| 747 | + retryDelay: 100 |
| 748 | + } |
| 749 | + ] |
| 750 | + }) |
| 751 | + |
| 752 | + // Get the DLQ counter metric before sending the message |
| 753 | + const dlqMetric = registry.getSingleMetric('kafka_hooks_dlq_messages_total') |
| 754 | + await publishMessage(server, 'plt-kafka-hooks-network-error', 'test message') |
| 755 | + await sleep(1500) |
| 756 | + |
| 757 | + // Get the metric value after processing |
| 758 | + const finalValue = await dlqMetric.get() |
| 759 | + |
| 760 | + // Find the specific metric for our topic and reason |
| 761 | + const networkErrorMetric = finalValue.values.find(v => |
| 762 | + v.labels.topic === 'plt-kafka-hooks-network-error' && |
| 763 | + v.labels.reason === 'network_error' |
| 764 | + ) |
| 765 | + |
| 766 | + t.assert.ok(networkErrorMetric, 'DLQ metric with network_error reason should exist') |
| 767 | + t.assert.strictEqual(networkErrorMetric.value, 1, 'DLQ metric should be incremented by 1') |
| 768 | + |
| 769 | + // Restore original prometheus |
| 770 | + if (originalPrometheus) { |
| 771 | + globalThis.platformatic.prometheus = originalPrometheus |
| 772 | + } else { |
| 773 | + delete globalThis.platformatic |
| 774 | + } |
| 775 | +}) |
| 776 | + |
| 777 | +test('should increment DLQ metrics with http status code reason when HTTP error occurs', async t => { |
| 778 | + const { url: targetUrl } = await startTargetServer(t) |
| 779 | + |
| 780 | + // Create a real Prometheus registry |
| 781 | + const registry = new promClient.Registry() |
| 782 | + |
| 783 | + // Set up global prometheus |
| 784 | + const originalPrometheus = globalThis.platformatic?.prometheus |
| 785 | + globalThis.platformatic = { |
| 786 | + prometheus: { |
| 787 | + registry, |
| 788 | + client: promClient |
| 789 | + } |
| 790 | + } |
| 791 | + |
| 792 | + const server = await startStackable(t, '', { |
| 793 | + topics: [ |
| 794 | + { |
| 795 | + topic: 'plt-kafka-hooks-http-error', |
| 796 | + url: `${targetUrl}/fail`, // Use the existing /fail endpoint |
| 797 | + dlq: true, |
| 798 | + retries: 1, |
| 799 | + retryDelay: 100 |
| 800 | + } |
| 801 | + ] |
| 802 | + }) |
| 803 | + |
| 804 | + await publishMessage(server, 'plt-kafka-hooks-http-error', 'test message') |
| 805 | + |
| 806 | + // Wait for processing to complete |
| 807 | + await sleep(1500) |
| 808 | + |
| 809 | + // Get the DLQ counter metric |
| 810 | + const dlqMetric = registry.getSingleMetric('kafka_hooks_dlq_messages_total') |
| 811 | + const metricValue = await dlqMetric.get() |
| 812 | + |
| 813 | + // Find the specific metric for our topic - we need to check what status code /fail returns |
| 814 | + // Looking at existing tests, it might return 500 or another status code |
| 815 | + const httpErrorMetric = metricValue.values.find(v => |
| 816 | + v.labels.topic === 'plt-kafka-hooks-http-error' && |
| 817 | + v.labels.reason.startsWith('http_') |
| 818 | + ) |
| 819 | + |
| 820 | + t.assert.ok(httpErrorMetric, 'DLQ metric with http status code reason should exist') |
| 821 | + t.assert.strictEqual(httpErrorMetric.value, 1, 'DLQ metric should be incremented by 1') |
| 822 | + |
| 823 | + // Log the actual reason for debugging |
| 824 | + t.diagnostic(`DLQ reason: ${httpErrorMetric.labels.reason}`) |
| 825 | + |
| 826 | + // Restore original prometheus |
| 827 | + if (originalPrometheus) { |
| 828 | + globalThis.platformatic.prometheus = originalPrometheus |
| 829 | + } else { |
| 830 | + delete globalThis.platformatic |
| 831 | + } |
| 832 | +}) |
0 commit comments