Skip to content

Commit 565bfd0

Browse files
mcollinaclaude
andauthored
feat: implement request/response pattern over Kafka (#6)
* feat: implement request/response pattern over Kafka - Add HTTP request/response routing through Kafka topics - Implement correlation ID handling for request matching - Add configurable timeout support (default 30s) - Support custom HTTP status codes in responses - Preserve all HTTP headers in Kafka messages - Add comprehensive tests for request/response flows - Update schema to support requestResponse configuration 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com> * docs: add comprehensive request/response pattern documentation - Add detailed How It Works section explaining the flow - Include complete configuration examples with all options - Provide step-by-step usage examples with curl commands - Document response headers and error handling scenarios - Add use cases for microservice communication patterns - Update main feature list to highlight new capability 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com> * feat: add support for path parameters and query strings - Add new header constants for path params and query string data - Automatically extract path parameters from URL routes - Pass path parameters as JSON in x-plt-kafka-hooks-path-params header - Pass query string parameters as JSON in x-plt-kafka-hooks-query-string header - Support Fastify path parameter syntax (e.g., /users/:userId/orders/:orderId) - Add comprehensive tests for path params, query strings, and combined usage - Update documentation with examples and header reference tables - Maintain backward compatibility with existing functionality 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com> * test: add comprehensive integration tests for request/response pattern - Add complete end-to-end integration tests demonstrating real-world usage - Test path parameters and query string extraction and forwarding - Test error handling with custom HTTP status codes - Demonstrate microservice communication patterns via Kafka - Show how kafka-hooks enables decoupled, reliable service architecture - Include realistic scenarios with user profiles, orders, and error cases - Verify correlation ID handling and message flow integrity - Test both successful responses and error conditions The integration tests demonstrate: - HTTP → Kafka → Processing Service → Kafka → HTTP flow - Path parameters: /api/users/:userId/orders/:orderId - Query strings: ?include=items&format=detailed&currency=USD - JSON request/response payloads with complex nested data - Error responses with custom status codes (404, 500, etc.) - Complete microservice architecture patterns 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com> * fix: resolve linting issues in integration tests 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com> * fix: change timeout status code from 408 to 504 Use 504 Gateway Timeout instead of 408 Request Timeout for request/response pattern timeouts, as this is more appropriate for scenarios where the gateway times out waiting for an upstream service. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com> --------- Co-authored-by: Claude <noreply@anthropic.com>
1 parent db11768 commit 565bfd0

File tree

9 files changed

+1184
-10
lines changed

9 files changed

+1184
-10
lines changed

README.md

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,191 @@ Then you can:
88

99
- Export the messages published on one or more topics to a HTTP endpoint.
1010
- Publish messages to a topic from a HTTP endpoint with a POST to the `/topics/:topic` endpoint.
11+
- Build request/response patterns over Kafka topics with HTTP-style semantics.
1112

1213
## Features
1314

1415
- Consume messages from Kafka topics and forward to HTTP endpoints.
1516
- Send messages to Kafka topics via HTTP API.
17+
- **Request/Response pattern over Kafka topics.**
1618
- Direct binary message passing.
1719
- Configurable retries and concurrency.
1820
- Dead Letter Queue (DLQ) for failed messages.
1921

22+
## Request/Response Pattern
23+
24+
The kafka-hooks library supports HTTP request/response patterns routed through Kafka topics. This enables building responsive microservices that communicate asynchronously via Kafka while maintaining HTTP-style request/response semantics.
25+
26+
### How It Works
27+
28+
1. **HTTP Request**: Client makes a POST request to a configured endpoint
29+
2. **Kafka Request**: The request is published to a Kafka request topic with a unique correlation ID
30+
3. **Service Processing**: External service consumes from the request topic, processes the message
31+
4. **Kafka Response**: Service publishes response to a response topic with the same correlation ID
32+
5. **HTTP Response**: The original HTTP request completes with the response data
33+
34+
### Configuration
35+
36+
Add request/response mappings to your `platformatic.json`:
37+
38+
```json
39+
{
40+
"kafka": {
41+
"brokers": ["localhost:9092"],
42+
"topics": [
43+
{
44+
"topic": "response-topic",
45+
"url": "http://localhost:3043/webhook"
46+
}
47+
],
48+
"requestResponse": [
49+
{
50+
"path": "/api/process",
51+
"requestTopic": "request-topic",
52+
"responseTopic": "response-topic",
53+
"timeout": 5000
54+
}
55+
],
56+
"consumer": {
57+
"groupId": "my-group"
58+
}
59+
}
60+
}
61+
```
62+
63+
### Request/Response Options
64+
65+
| Option | Description | Default |
66+
| --------------- | ----------------------------------------------------- | ---------- |
67+
| `path` | HTTP endpoint path to expose (supports path parameters) | Required |
68+
| `requestTopic` | Kafka topic to publish requests to | Required |
69+
| `responseTopic` | Kafka topic to consume responses from | Required |
70+
| `timeout` | Request timeout in milliseconds | `30000` |
71+
72+
### Path Parameters and Query Strings
73+
74+
The request/response pattern supports both path parameters and query strings, which are automatically passed to Kafka consumers via headers.
75+
76+
**Path Parameters:**
77+
```json
78+
{
79+
"path": "/api/users/:userId/orders/:orderId"
80+
}
81+
```
82+
83+
**Request with path parameters:**
84+
```bash
85+
curl -X POST http://localhost:3042/api/users/123/orders/456 \
86+
-H "Content-Type: application/json" \
87+
-d '{"action": "cancel"}'
88+
```
89+
90+
**Kafka message headers include:**
91+
```json
92+
{
93+
"x-plt-kafka-hooks-path-params": "{\"userId\":\"123\",\"orderId\":\"456\"}"
94+
}
95+
```
96+
97+
**Query String Parameters:**
98+
```bash
99+
curl -X POST http://localhost:3042/api/search?q=coffee&limit=10&sort=price \
100+
-H "Content-Type: application/json" \
101+
-d '{"filters": {...}}'
102+
```
103+
104+
**Kafka message headers include:**
105+
```json
106+
{
107+
"x-plt-kafka-hooks-query-string": "{\"q\":\"coffee\",\"limit\":\"10\",\"sort\":\"price\"}"
108+
}
109+
```
110+
111+
### Usage Example
112+
113+
**Make a request:**
114+
```bash
115+
curl -X POST http://localhost:3042/api/process \
116+
-H "Content-Type: application/json" \
117+
-d '{"userId": 123, "action": "process"}'
118+
```
119+
120+
**Request message published to Kafka:**
121+
```json
122+
{
123+
"headers": {
124+
"content-type": "application/json",
125+
"x-plt-kafka-hooks-correlation-id": "550e8400-e29b-41d4-a716-446655440000"
126+
},
127+
"value": "{\"userId\": 123, \"action\": \"process\"}"
128+
}
129+
```
130+
131+
**Service processes and responds:**
132+
```bash
133+
# External service publishes response
134+
curl -X POST http://localhost:3042/topics/response-topic \
135+
-H "Content-Type: application/json" \
136+
-H "x-plt-kafka-hooks-correlation-id: 550e8400-e29b-41d4-a716-446655440000" \
137+
-H "x-status-code: 200" \
138+
-d '{"result": "success", "data": {...}}'
139+
```
140+
141+
**HTTP response returned to client:**
142+
```json
143+
{
144+
"result": "success",
145+
"data": {...}
146+
}
147+
```
148+
149+
### Request Headers
150+
151+
Request messages automatically include these headers when published to Kafka:
152+
153+
| Header | Description | When Added |
154+
| ------------------------------------ | ---------------------------------------------- | ---------- |
155+
| `x-plt-kafka-hooks-correlation-id` | Unique correlation ID for request matching | Always |
156+
| `x-plt-kafka-hooks-path-params` | JSON string of path parameters | When path parameters present |
157+
| `x-plt-kafka-hooks-query-string` | JSON string of query string parameters | When query parameters present |
158+
| `content-type` | Content type of the request | Always |
159+
160+
### Response Headers
161+
162+
Response messages support these special headers:
163+
164+
| Header | Description | Default |
165+
| ------------------------------------ | ---------------------------------------------- | ------- |
166+
| `x-plt-kafka-hooks-correlation-id` | Must match the original request correlation ID | Required|
167+
| `x-status-code` | HTTP status code for the response | `200` |
168+
| `content-type` | Content type of the response | Preserved |
169+
170+
### Error Handling
171+
172+
**Timeout Response:**
173+
If no response is received within the configured timeout:
174+
```json
175+
{
176+
"code": "HTTP_ERROR_GATEWAY_TIMEOUT",
177+
"error": "Gateway Timeout",
178+
"message": "Request timeout",
179+
"statusCode": 504
180+
}
181+
```
182+
183+
**Missing Correlation ID:**
184+
Responses without correlation IDs are logged as warnings and ignored.
185+
186+
**No Pending Request:**
187+
Responses for non-existent correlation IDs are logged as warnings and ignored.
188+
189+
### Use Cases
190+
191+
- **Microservice Communication**: Route requests through Kafka for reliable delivery
192+
- **Async Processing**: Handle long-running tasks with HTTP-like interface
193+
- **Event-Driven APIs**: Build responsive APIs on event-driven architecture
194+
- **Service Decoupling**: Maintain HTTP contracts while decoupling services via Kafka
195+
20196
## Standalone Install & Setup
21197

22198
You can generate a standalone application with:

config.d.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,14 @@ export interface PlatformaticKafkaHooksConfiguration {
6464
[k: string]: unknown;
6565
};
6666
};
67+
formatters?: {
68+
path: string;
69+
};
70+
timestamp?: "epochTime" | "unixTime" | "nullTime" | "isoTime";
71+
redact?: {
72+
paths: string[];
73+
censor?: string;
74+
};
6775
[k: string]: unknown;
6876
};
6977
loggerInstance?: {
@@ -330,6 +338,13 @@ export interface PlatformaticKafkaHooksConfiguration {
330338
};
331339
concurrency?: number;
332340
serialization?: string;
341+
requestResponse?: {
342+
path: string;
343+
requestTopic: string;
344+
responseTopic: string;
345+
timeout?: number;
346+
[k: string]: unknown;
347+
}[];
333348
[k: string]: unknown;
334349
};
335350
}

lib/definitions.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
export const keyHeader = 'x-plt-kafka-hooks-key'
22
export const attemptHeader = 'x-plt-kafka-hooks-attempt'
3+
export const correlationIdHeader = 'x-plt-kafka-hooks-correlation-id'
4+
export const pathParamsHeader = 'x-plt-kafka-hooks-path-params'
5+
export const queryStringHeader = 'x-plt-kafka-hooks-query-string'
36
export const minimumRetryDelay = 250
47

58
export const defaultDlqTopic = 'plt-kafka-hooks-dlq'
@@ -8,3 +11,4 @@ export const defaultRetries = 3
811
export const defaultMethod = 'POST'
912
export const defaultIncludeAttemptInRequests = true
1013
export const defaultConcurrency = 10
14+
export const defaultRequestResponseTimeout = 30000

0 commit comments

Comments
 (0)