Skip to content

Commit 5e76c68

Browse files
Support custom message router for partitioned topic producer (#435)
* Support custom message router for partitioned topic producer * Add test * Fix lint * simplify code * Add tests for exceptional cases * Fix router signature * Fix interface * Fix tests * Add documents * Test conflicts of messageRoutingMode and messageRouter
1 parent a2d75c2 commit 5e76c68

File tree

9 files changed

+175
-9
lines changed

9 files changed

+175
-9
lines changed

index.d.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ export interface ProducerConfig {
6868
schema?: SchemaInfo;
6969
accessMode?: ProducerAccessMode;
7070
batchingType?: ProducerBatchType;
71+
messageRouter?: MessageRouter;
7172
}
7273

7374
export class Producer {
@@ -176,6 +177,23 @@ export class MessageId {
176177
toString(): string;
177178
}
178179

180+
export interface TopicMetadata {
181+
numPartitions: number;
182+
}
183+
184+
/**
185+
* @callback MessageRouter
186+
* @description When producing messages to a partitioned topic, this router is used to select the
187+
* target partition for each message. The router only works when the `messageRoutingMode` is set to
188+
* `CustomPartition`. Please note that `getTopicName()` cannot be called on the `message`, otherwise
189+
* the behavior will be undefined because the topic is unknown before sending the message.
190+
* @param message The message to be routed.
191+
* @param topicMetadata Metadata for the partitioned topic the message is being routed to.
192+
* @returns {number} The index of the target partition (must be a number between 0 and
193+
* topicMetadata.numPartitions - 1).
194+
*/
195+
export type MessageRouter = (message: Message, topicMetadata: TopicMetadata) => number;
196+
179197
export interface SchemaInfo {
180198
schemaType: SchemaType;
181199
name?: string;

src/Producer.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ Napi::Value Producer::NewInstance(const Napi::CallbackInfo &info, std::shared_pt
7373
auto instanceContext = static_cast<ProducerNewInstanceContext *>(ctx);
7474
auto deferred = instanceContext->deferred;
7575
auto cClient = instanceContext->cClient;
76+
auto producerConfig = instanceContext->producerConfig;
7677
delete instanceContext;
7778

7879
if (result != pulsar_result_Ok) {
@@ -81,10 +82,11 @@ Napi::Value Producer::NewInstance(const Napi::CallbackInfo &info, std::shared_pt
8182

8283
std::shared_ptr<pulsar_producer_t> cProducer(rawProducer, pulsar_producer_free);
8384

84-
deferred->Resolve([cProducer](const Napi::Env env) {
85+
deferred->Resolve([cProducer, producerConfig](const Napi::Env env) {
8586
Napi::Object obj = Producer::constructor.New({});
8687
Producer *producer = Producer::Unwrap(obj);
8788
producer->SetCProducer(cProducer);
89+
producer->producerConfig = producerConfig;
8890
return obj;
8991
});
9092
},

src/Producer.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
#include <napi.h>
2424
#include <pulsar/c/client.h>
2525
#include <pulsar/c/producer.h>
26+
#include <memory>
27+
#include "ProducerConfig.h"
2628

2729
class Producer : public Napi::ObjectWrap<Producer> {
2830
public:
@@ -35,6 +37,9 @@ class Producer : public Napi::ObjectWrap<Producer> {
3537

3638
private:
3739
std::shared_ptr<pulsar_producer_t> cProducer;
40+
// Extend the lifetime of the producer config since it's env and router function could be used when sending
41+
// messages
42+
std::shared_ptr<ProducerConfig> producerConfig;
3843
Napi::Value Send(const Napi::CallbackInfo &info);
3944
Napi::Value Flush(const Napi::CallbackInfo &info);
4045
Napi::Value Close(const Napi::CallbackInfo &info);

src/ProducerConfig.cc

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,14 @@
1818
*/
1919
#include "SchemaInfo.h"
2020
#include "ProducerConfig.h"
21+
#include "Message.h"
22+
#include <cstdio>
2123
#include <map>
24+
#include "napi-inl.h"
25+
#include "napi.h"
2226
#include "pulsar/ProducerConfiguration.h"
27+
#include "pulsar/c/message.h"
28+
#include "pulsar/c/message_router.h"
2329

2430
static const std::string CFG_TOPIC = "topic";
2531
static const std::string CFG_PRODUCER_NAME = "producerName";
@@ -42,6 +48,7 @@ static const std::string CFG_CRYPTO_FAILURE_ACTION = "cryptoFailureAction";
4248
static const std::string CFG_CHUNK_ENABLED = "chunkingEnabled";
4349
static const std::string CFG_ACCESS_MODE = "accessMode";
4450
static const std::string CFG_BATCHING_TYPE = "batchingType";
51+
static const std::string CFG_MESSAGE_ROUTER = "messageRouter";
4552

4653
struct _pulsar_producer_configuration {
4754
pulsar::ProducerConfiguration conf;
@@ -82,6 +89,25 @@ static std::map<std::string, pulsar::ProducerConfiguration::BatchingType> PRODUC
8289
{"KeyBasedBatching", pulsar::ProducerConfiguration::KeyBasedBatching},
8390
};
8491

92+
static int choosePartition(pulsar_message_t* msg, pulsar_topic_metadata_t* metadata, void* ctx) {
93+
auto router = static_cast<Napi::FunctionReference*>(ctx);
94+
const auto& env = router->Env();
95+
auto jsMessage = Message::NewInstance(Napi::Object::New(env),
96+
std::shared_ptr<pulsar_message_t>(msg, [](pulsar_message_t*) {}));
97+
int numPartitions = pulsar_topic_metadata_get_num_partitions(metadata);
98+
99+
Napi::Object jsTopicMetadata = Napi::Object::New(env);
100+
jsTopicMetadata.Set("numPartitions", Napi::Number::New(env, numPartitions));
101+
102+
try {
103+
return router->Call({jsMessage, jsTopicMetadata}).ToNumber().Int32Value();
104+
} catch (const Napi::Error& e) {
105+
// TODO: how to handle the error properly? For now, return an invalid partition to fail the send
106+
fprintf(stderr, "Error when calling messageRouter: %s\n", e.what());
107+
return numPartitions;
108+
}
109+
}
110+
85111
ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") {
86112
this->cProducerConfig = std::shared_ptr<pulsar_producer_configuration_t>(
87113
pulsar_producer_configuration_create(), pulsar_producer_configuration_free);
@@ -131,8 +157,10 @@ ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") {
131157
pulsar_producer_configuration_set_block_if_queue_full(this->cProducerConfig.get(), blockIfQueueFull);
132158
}
133159

160+
bool useCustomPartition = false;
134161
if (producerConfig.Has(CFG_ROUTING_MODE) && producerConfig.Get(CFG_ROUTING_MODE).IsString()) {
135162
std::string messageRoutingMode = producerConfig.Get(CFG_ROUTING_MODE).ToString().Utf8Value();
163+
useCustomPartition = (messageRoutingMode == "CustomPartition");
136164
if (MESSAGE_ROUTING_MODE.count(messageRoutingMode))
137165
pulsar_producer_configuration_set_partitions_routing_mode(this->cProducerConfig.get(),
138166
MESSAGE_ROUTING_MODE.at(messageRoutingMode));
@@ -224,6 +252,15 @@ ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") {
224252
if (PRODUCER_BATCHING_TYPE.count(batchingType)) {
225253
this->cProducerConfig.get()->conf.setBatchingType(PRODUCER_BATCHING_TYPE.at(batchingType));
226254
}
255+
256+
if (useCustomPartition && producerConfig.Has(CFG_MESSAGE_ROUTER)) {
257+
auto value = producerConfig.Get(CFG_MESSAGE_ROUTER);
258+
if (value.IsFunction()) {
259+
messageRouter = Napi::Persistent(value.As<Napi::Function>());
260+
pulsar_producer_configuration_set_message_router(this->cProducerConfig.get(), choosePartition,
261+
&messageRouter);
262+
}
263+
}
227264
}
228265

229266
ProducerConfig::~ProducerConfig() {}

src/ProducerConfig.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@
2222

2323
#include <napi.h>
2424
#include <pulsar/c/producer_configuration.h>
25+
#include <memory>
26+
27+
struct MessageRouterContext {
28+
Napi::FunctionReference messageRouter;
29+
};
2530

2631
class ProducerConfig {
2732
public:
@@ -33,6 +38,8 @@ class ProducerConfig {
3338
private:
3439
std::shared_ptr<pulsar_producer_configuration_t> cProducerConfig;
3540
std::string topic;
41+
std::unique_ptr<MessageRouterContext> routerContext;
42+
Napi::FunctionReference messageRouter;
3643
};
3744

3845
#endif

tests/client.test.js

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
* under the License.
1818
*/
1919

20-
const httpRequest = require('./http_utils');
20+
const httpUtils = require('./http_utils');
2121
const Pulsar = require('../index');
2222

2323
const baseUrl = 'http://localhost:8080';
@@ -74,7 +74,7 @@ const baseUrl = 'http://localhost:8080';
7474
const nonPartitionedTopicName = 'test-non-partitioned-topic';
7575
const nonPartitionedTopic = `persistent://public/default/${nonPartitionedTopicName}`;
7676
const nonPartitionedTopicAdminURL = `${baseUrl}/admin/v2/persistent/public/default/${nonPartitionedTopicName}`;
77-
const createNonPartitionedTopicRes = await httpRequest(
77+
const createNonPartitionedTopicRes = await httpUtils.request(
7878
nonPartitionedTopicAdminURL, {
7979
headers: {
8080
'Content-Type': 'application/json',
@@ -91,7 +91,7 @@ const baseUrl = 'http://localhost:8080';
9191
const partitionedTopicName = 'test-partitioned-topic-1';
9292
const partitionedTopic = `persistent://public/default/${partitionedTopicName}`;
9393
const partitionedTopicAdminURL = `${baseUrl}/admin/v2/persistent/public/default/${partitionedTopicName}/partitions`;
94-
const createPartitionedTopicRes = await httpRequest(
94+
const createPartitionedTopicRes = await httpUtils.request(
9595
partitionedTopicAdminURL, {
9696
headers: {
9797
'Content-Type': 'text/plain',
@@ -110,9 +110,9 @@ const baseUrl = 'http://localhost:8080';
110110
'persistent://public/default/test-partitioned-topic-1-partition-3',
111111
]);
112112

113-
const deleteNonPartitionedTopicRes = await httpRequest(nonPartitionedTopicAdminURL, { method: 'DELETE' });
113+
const deleteNonPartitionedTopicRes = await httpUtils.request(nonPartitionedTopicAdminURL, { method: 'DELETE' });
114114
expect(deleteNonPartitionedTopicRes.statusCode).toBe(204);
115-
const deletePartitionedTopicRes = await httpRequest(partitionedTopicAdminURL, { method: 'DELETE' });
115+
const deletePartitionedTopicRes = await httpUtils.request(partitionedTopicAdminURL, { method: 'DELETE' });
116116
expect(deletePartitionedTopicRes.statusCode).toBe(204);
117117

118118
await client.close();

tests/http_utils.js

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,4 +42,18 @@ const request = (url, { headers, data = {}, method }) => new Promise((resolve, r
4242
req.end();
4343
});
4444

45-
module.exports = request;
45+
function createPartitionedTopic(topic, numPartitions) {
46+
const url = `http://localhost:8080/admin/v2/persistent/public/default/${topic}/partitions`;
47+
return request(url, {
48+
headers: {
49+
'Content-Type': 'application/json',
50+
},
51+
data: numPartitions,
52+
method: 'PUT',
53+
});
54+
}
55+
56+
module.exports = {
57+
createPartitionedTopic,
58+
request,
59+
};

tests/producer.test.js

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,12 @@
1818
*/
1919

2020
const Pulsar = require('../index');
21+
const httpUtils = require('./http_utils');
22+
23+
function getPartition(msgId) {
24+
// The message id string is in the format of "entryId,ledgerId,partition,batchIndex"
25+
return Number(msgId.toString().split(',')[2]);
26+
}
2127

2228
(() => {
2329
describe('Producer', () => {
@@ -156,5 +162,82 @@ const Pulsar = require('../index');
156162
await producer2.close();
157163
});
158164
});
165+
describe('Message Routing', () => {
166+
test('Custom Message Router', async () => {
167+
const topic = `test-custom-router-${Date.now()}`;
168+
const numPartitions = 3;
169+
const response = await httpUtils.createPartitionedTopic(topic, numPartitions);
170+
expect(response.statusCode).toBe(204);
171+
172+
const producer = await client.createProducer({
173+
topic,
174+
batchingMaxMessages: 2,
175+
messageRouter: (message, topicMetadata) => parseInt(message.getPartitionKey(), 10)
176+
% topicMetadata.numPartitions,
177+
messageRoutingMode: 'CustomPartition',
178+
});
179+
180+
const promises = [];
181+
const numMessages = 5;
182+
for (let i = 0; i < numMessages; i += 1) {
183+
const sendPromise = producer.send({
184+
partitionKey: `${i}`,
185+
data: Buffer.from(`msg-${i}`),
186+
});
187+
await sendPromise;
188+
promises.push(sendPromise);
189+
}
190+
try {
191+
const allMsgIds = await Promise.all(promises);
192+
console.log(`All messages have been sent. IDs: ${allMsgIds.join(', ')}`);
193+
for (let i = 0; i < allMsgIds.length; i += 1) {
194+
// The message id string is in the format of "entryId,ledgerId,partition,batchIndex"
195+
const partition = getPartition(allMsgIds[i]);
196+
expect(i % numPartitions).toBe(partition);
197+
}
198+
} catch (error) {
199+
console.error('One or more messages failed to send:', error);
200+
}
201+
}, 30000);
202+
test('Exception in router', async () => {
203+
const topic = `test-exception-in-router-${Date.now()}`;
204+
const numPartitions = 2;
205+
const response = await httpUtils.createPartitionedTopic(topic, numPartitions);
206+
expect(response.statusCode).toBe(204);
207+
const producer = await client.createProducer({
208+
topic,
209+
messageRouter: (message, topicMetadata) => {
210+
throw new Error('Custom error in message router');
211+
},
212+
messageRoutingMode: 'CustomPartition',
213+
});
214+
await expect(
215+
producer.send({ data: Buffer.from('test') }),
216+
).rejects.toThrow('Failed to send message: UnknownError');
217+
}, 30000);
218+
test('Not CustomPartition', async () => {
219+
const topic = `test-not-custom-part-${Date.now()}`;
220+
const numPartitions = 2;
221+
const response = await httpUtils.createPartitionedTopic(topic, numPartitions);
222+
expect(response.statusCode).toBe(204);
223+
224+
let index = 0;
225+
const producer = await client.createProducer({
226+
topic,
227+
messageRouter: (_, topicMetadata) => {
228+
const result = index % topicMetadata.numPartitions;
229+
index += 1;
230+
return result;
231+
},
232+
messageRoutingMode: 'UseSinglePartition',
233+
});
234+
const partitions = new Set();
235+
for (let i = 0; i < 10; i += 1) {
236+
const msgId = await producer.send({ data: Buffer.from('msg') });
237+
partitions.add(getPartition(msgId));
238+
}
239+
expect(partitions.size).toBe(1);
240+
}, 30000);
241+
});
159242
});
160243
})();

tests/reader.test.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
const lodash = require('lodash');
2121
const Pulsar = require('../index');
22-
const httpRequest = require('./http_utils');
22+
const httpUtils = require('./http_utils');
2323

2424
const baseUrl = 'http://localhost:8080';
2525

@@ -81,7 +81,7 @@ const baseUrl = 'http://localhost:8080';
8181
const partitionedTopicName = 'test-reader-partitioned-topic';
8282
const partitionedTopic = `persistent://public/default/${partitionedTopicName}`;
8383
const partitionedTopicAdminURL = `${baseUrl}/admin/v2/persistent/public/default/${partitionedTopicName}/partitions`;
84-
const createPartitionedTopicRes = await httpRequest(
84+
const createPartitionedTopicRes = await httpUtils.request(
8585
partitionedTopicAdminURL, {
8686
headers: {
8787
'Content-Type': 'text/plain',

0 commit comments

Comments
 (0)