Commit 2b6311f

Kafka batch processing (#333)
* make buckets parameter required for histogram metric
* Added batch processing support to Kafka consumer and removed messageType routing in handlers
* mention Kafka in README
* refactoring and improvements
* tests fix
* tests fix
* removed redundant event type from test permission consumers
* condition fix
* types improvements
* tests fix
* linting
* consumer closing improvement
* refactoring
* added DeserializedMessage type
* reduced coverage for kafka test branches
* updated kafka version
* removed default batch size and timeout values
* renamed folder
* renamed test file
* collect batches per topic and partition
* added README for kafka
* @platformatic/kafka version update
1 parent ce5a493 commit 2b6311f

27 files changed: +1559 −743 lines

README.md

Lines changed: 3 additions & 2 deletions
@@ -16,6 +16,7 @@ It consists of the following submodules:
 * `@message-queue-toolkit/amqp` - AMQP 0-9-1 (Advanced Message Queuing Protocol), used e. g. by RabbitMQ
 * `@message-queue-toolkit/sqs` - SQS (AWS Simple Queue Service)
 * `@message-queue-toolkit/sns` - SNS (AWS Simple Notification Service)
+* `@message-queue-toolkit/kafka` - Kafka (in development)
 
 ## Basic Usage
 
@@ -29,7 +30,7 @@ They implement the following public methods:
 * `options`, composed by
 * `messageSchemas` – the `zod` schemas for all supported messages;
 * `messageTimestampField` - which field in the message contains the message creation date (by default it is `timestamp`). This field needs to be a `Date` object or ISO-8601 date string, if your message doesn't contain it the library will add one automatically to avoid infinite loops on consumer;
-* `messageTypeField` - which field in the message describes the type of a message. This field needs to be defined as `z.literal` in the schema and is used for resolving the correct schema for validation
+* `messageTypeField` - which field in the message describes the type of a message. This field needs to be defined as `z.literal` in the schema and is used for resolving the correct schema for validation. **Note:** It is not supported by the Kafka publisher
 * `locatorConfig` - configuration for resolving existing queue and/or topic. Should not be specified together with the `creationConfig`.
 * `creationConfig` - configuration for queue and/or topic to create, if one does not exist. Should not be specified together with the `locatorConfig`;
 * `policyConfig` - SQS only - configuration for queue access policies (see [SQS Policy Configuration](#sqs-policy-configuration) for more information);
@@ -93,7 +94,7 @@ Multi-schema consumers support multiple message types via handler configs. They
 * `dependencies` – a set of dependencies depending on the protocol;
 * `options`, composed by
 * `handlers` – configuration for handling each of the supported message types. See "Multi-schema handler definition" for more details;
-* `messageTypeField` - which field in the message describes the type of a message. This field needs to be defined as `z.literal` in the schema and is used for routing the message to the correct handler;
+* `messageTypeField` - which field in the message describes the type of a message. This field needs to be defined as `z.literal` in the schema and is used for routing the message to the correct handler. **Note:** It is not supported by the Kafka consumer;
 * `messageTimestampField` - which field in the message contains the message creation date (by default it is `timestamp`). This field needs to be a `Date` object or an ISO-8601 date string;
 * `maxRetryDuration` - how long (in seconds) the message should be retried due to the `retryLater` result before marking it as consumed (and sending to DLQ, if one is configured). This is used to avoid infinite loops. Default is 4 days;
 * `queueName`; (for SNS publishers this is a misnomer which actually refers to a topic name)

packages/kafka/README.md

Lines changed: 37 additions & 4 deletions
@@ -1,7 +1,40 @@
-# @message-queue-toolkit/kafka
+# Kafka
 
-In development
+This library provides utilities for implementing Kafka consumers and publishers.
+While following the same patterns as other message broker implementations,
+Kafka's unique characteristics require some specific adaptations in the publisher and consumer definitions.
 
+> **_NOTE:_** Check [README.md](../../README.md) for transport-agnostic library documentation.
 
-## Requirements
-- Node.js >= 22.14.0
+## Publishers
+
+Use `AbstractKafkaPublisher` as a base class for publisher implementations.
+
+See the [test publisher](test/publisher/PermissionPublisher.ts) for an example implementation.
+
+## Consumers
+
+Use `AbstractKafkaConsumer` as a base class for consumer implementations.
+
+See the [test consumer](test/consumer/PermissionConsumer.ts) for an example implementation.
+
+## Batch Processing
+
+Kafka supports batch processing for improved throughput. To enable it, set `batchProcessingEnabled` to `true` and configure `batchProcessingOptions`.
+
+When batch processing is enabled, message handlers receive an array of messages instead of a single message.
+
+### Configuration Options
+
+- `batchSize` - Maximum number of messages per batch
+- `timeoutMilliseconds` - Maximum time to wait for a batch to fill before processing
+
+### How It Works
+
+Messages are buffered per topic-partition combination. Batches are processed when either:
+- The buffer reaches the configured `batchSize`
+- `timeoutMilliseconds` have elapsed since the first message was added to the buffer
+
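The buffering strategy above can be sketched in a self-contained form. This is not the library's internal code; the class and method names are assumptions made for illustration:

```typescript
// Illustrative sketch of per-topic-partition batching with a size and a
// timeout trigger. Names (TopicPartitionBatcher, onBatch) are hypothetical.
interface BatchProcessingOptions {
  batchSize: number;
  timeoutMilliseconds: number;
}

class TopicPartitionBatcher<T> {
  private buffers = new Map<string, T[]>();
  private timers = new Map<string, ReturnType<typeof setTimeout>>();

  constructor(
    private readonly options: BatchProcessingOptions,
    private readonly onBatch: (key: string, batch: T[]) => void,
  ) {}

  // Buffer a message under its topic-partition key; flush when full.
  add(topic: string, partition: number, message: T): void {
    const key = `${topic}:${partition}`;
    const buffer = this.buffers.get(key) ?? [];
    buffer.push(message);
    this.buffers.set(key, buffer);

    if (buffer.length === 1) {
      // The timeout starts when the first message of a batch arrives.
      this.timers.set(
        key,
        setTimeout(() => this.flush(key), this.options.timeoutMilliseconds),
      );
    }
    if (buffer.length >= this.options.batchSize) {
      this.flush(key);
    }
  }

  // Hand the batch to the handler; a real consumer would commit the offset
  // of the last message only after the handler succeeds.
  flush(key: string): void {
    const timer = this.timers.get(key);
    if (timer !== undefined) clearTimeout(timer);
    this.timers.delete(key);
    const batch = this.buffers.get(key);
    this.buffers.delete(key);
    if (batch !== undefined && batch.length > 0) this.onBatch(key, batch);
  }
}
```

Keeping one buffer per topic-partition preserves Kafka's per-partition ordering guarantee while still batching across time.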
+After successful batch processing, the offset of the last message in the batch is committed.
+
+See the [test batch consumer](test/consumer/PermissionBatchConsumer.ts) for an example implementation.
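The handler-signature change that batch mode implies (an array of messages instead of a single message) can be illustrated generically; the type names below are assumptions, not the toolkit's exported types:

```typescript
// Illustrative only: the shape difference between single-message and batch
// handlers. The toolkit's actual handler types may differ.
type SingleHandler<T> = (message: T) => Promise<void>;
type BatchHandler<T> = (messages: T[]) => Promise<void>;

// A single-message handler can be adapted to the batch signature by
// iterating, which preserves per-partition processing order.
function liftToBatch<T>(handler: SingleHandler<T>): BatchHandler<T> {
  return async (messages) => {
    for (const message of messages) {
      await handler(message);
    }
  };
}
```

A handler written natively against the batch signature can instead process the whole array at once (e.g. one bulk database write per batch), which is where the throughput gain comes from.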
