Skip to content

Commit 0d6fb72

Browse files
authored
Support set batchingMaxAllowedSizeInBytes on producer batch configuration (#436)
1 parent 5e76c68 commit 0d6fb72

File tree

3 files changed

+86
-0
lines changed

3 files changed

+86
-0
lines changed

index.d.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ export interface ProducerConfig {
6969
accessMode?: ProducerAccessMode;
7070
batchingType?: ProducerBatchType;
7171
messageRouter?: MessageRouter;
72+
batchingMaxAllowedSizeInBytes?: number;
7273
}
7374

7475
export class Producer {

src/ProducerConfig.cc

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ static const std::string CFG_COMPRESS_TYPE = "compressionType";
4040
static const std::string CFG_BATCH_ENABLED = "batchingEnabled";
4141
static const std::string CFG_BATCH_MAX_DELAY = "batchingMaxPublishDelayMs";
4242
static const std::string CFG_BATCH_MAX_MSG = "batchingMaxMessages";
43+
static const std::string CFG_BATCH_MAX_ALLOWED_SIZE_IN_BYTES = "batchingMaxAllowedSizeInBytes";
4344
static const std::string CFG_SCHEMA = "schema";
4445
static const std::string CFG_PROPS = "properties";
4546
static const std::string CFG_PUBLIC_KEY_PATH = "publicKeyPath";
@@ -201,6 +202,16 @@ ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") {
201202
}
202203
}
203204

205+
if (producerConfig.Has(CFG_BATCH_MAX_ALLOWED_SIZE_IN_BYTES) &&
206+
producerConfig.Get(CFG_BATCH_MAX_ALLOWED_SIZE_IN_BYTES).IsNumber()) {
207+
int64_t batchingMaxAllowedSizeInBytes =
208+
producerConfig.Get(CFG_BATCH_MAX_ALLOWED_SIZE_IN_BYTES).ToNumber().Int64Value();
209+
if (batchingMaxAllowedSizeInBytes > 0) {
210+
pulsar_producer_configuration_set_batching_max_allowed_size_in_bytes(
211+
this->cProducerConfig.get(), (unsigned long)batchingMaxAllowedSizeInBytes);
212+
}
213+
}
214+
204215
if (producerConfig.Has(CFG_SCHEMA) && producerConfig.Get(CFG_SCHEMA).IsObject()) {
205216
SchemaInfo* schemaInfo = new SchemaInfo(producerConfig.Get(CFG_SCHEMA).ToObject());
206217
schemaInfo->SetProducerSchema(this->cProducerConfig);

tests/producer.test.js

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,5 +239,79 @@ function getPartition(msgId) {
239239
expect(partitions.size).toBe(1);
240240
}, 30000);
241241
});
242+
describe('Batching', () => {
243+
function getBatchIndex(msgId) {
244+
const parts = msgId.toString().split(':');
245+
if (parts.length > 3) {
246+
return Number(parts[3]);
247+
}
248+
return -1;
249+
}
250+
251+
test('should batch messages based on max allowed size in bytes', async () => {
252+
const topicName = `persistent://public/default/test-batch-size-in-bytes-${Date.now()}`;
253+
const subName = 'subscription-name';
254+
const numOfMessages = 30;
255+
const prefix = '12345678'; // 8 bytes message prefix
256+
257+
let producer;
258+
let consumer;
259+
260+
try {
261+
// 1. Setup Producer with batching enabled and size limit
262+
producer = await client.createProducer({
263+
topic: topicName,
264+
compressionType: 'LZ4',
265+
batchingEnabled: true,
266+
batchingMaxMessages: 10000,
267+
batchingMaxAllowedSizeInBytes: 20,
268+
});
269+
270+
// 2. Setup Consumer
271+
consumer = await client.subscribe({
272+
topic: topicName,
273+
subscription: subName,
274+
});
275+
276+
// 3. Send messages asynchronously
277+
const sendPromises = [];
278+
for (let i = 0; i < numOfMessages; i += 1) {
279+
const messageContent = prefix + i;
280+
const msg = {
281+
data: Buffer.from(messageContent),
282+
properties: { msgIndex: String(i) },
283+
};
284+
sendPromises.push(producer.send(msg));
285+
}
286+
await producer.flush();
287+
await Promise.all(sendPromises);
288+
289+
// 4. Receive messages and run assertions
290+
let receivedCount = 0;
291+
for (let i = 0; i < numOfMessages; i += 1) {
292+
const receivedMsg = await consumer.receive(5000);
293+
const expectedMessageContent = prefix + i;
294+
295+
// Assert that batchIndex is 0 or 1, since batch size should be 2
296+
const batchIndex = getBatchIndex(receivedMsg.getMessageId());
297+
expect(batchIndex).toBeLessThan(2);
298+
299+
// Assert message properties and content
300+
expect(receivedMsg.getProperties().msgIndex).toBe(String(i));
301+
expect(receivedMsg.getData().toString()).toBe(expectedMessageContent);
302+
303+
await consumer.acknowledge(receivedMsg);
304+
receivedCount += 1;
305+
}
306+
307+
// 5. Final check on the number of consumed messages
308+
expect(receivedCount).toBe(numOfMessages);
309+
} finally {
310+
// 6. Cleanup
311+
if (producer) await producer.close();
312+
if (consumer) await consumer.close();
313+
}
314+
}, 30000);
315+
});
242316
});
243317
})();

0 commit comments

Comments
 (0)