Skip to content

Commit ead380c

Browse files
committed
feat: processing messages one by one
1 parent e4d5904 commit ead380c

File tree

4 files changed

+200
-98
lines changed

4 files changed

+200
-98
lines changed

src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts

Lines changed: 91 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import type {
2020
EventStream,
2121
MongoDBReadEventMetadata,
2222
} from '../mongoDBEventStore';
23+
import { CancellationPromise } from './CancellablePromise';
2324
import {
2425
changeStreamReactor,
2526
mongoDBProjector,
@@ -168,6 +169,7 @@ export const mongoDBMessagesConsumer = <
168169
>
169170
>;
170171
let isRunning = false;
172+
let runningPromise = new CancellationPromise<null>();
171173
const client =
172174
'client' in options && options.client
173175
? options.client
@@ -227,6 +229,8 @@ export const mongoDBMessagesConsumer = <
227229

228230
isRunning = true;
229231

232+
runningPromise = new CancellationPromise<null>();
233+
230234
const positions = await Promise.all(
231235
processors.map((o) => o.start({ client } as Partial<HandlerContext>)),
232236
);
@@ -235,58 +239,104 @@ export const mongoDBMessagesConsumer = <
235239

236240
stream = subscribe<Event, CheckpointType>(startFrom);
237241

238-
stream.on('change', async (change) => {
239-
const resumeToken = change._id;
240-
const typedChange = change as OplogChange;
241-
const streamChange =
242-
'updateDescription' in typedChange
243-
? {
244-
messages: Object.entries(
245-
typedChange.updateDescription.updatedFields,
246-
)
247-
.filter(([key]) => key.startsWith('messages.'))
248-
.map(([, value]) => value as ReadEvent),
249-
}
250-
: typedChange.fullDocument;
242+
void (async () => {
243+
while (!stream.closed && isRunning) {
244+
const hasNext = await Promise.race([
245+
stream.hasNext(),
246+
runningPromise,
247+
]);
251248

252-
if (!streamChange) {
253-
return;
254-
}
249+
if (hasNext === null) {
250+
break;
251+
}
252+
253+
if (!hasNext) {
254+
continue;
255+
}
256+
257+
const change = await stream.next();
258+
const resumeToken = change._id;
259+
const typedChange = change as OplogChange;
260+
const streamChange =
261+
'updateDescription' in typedChange
262+
? {
263+
messages: Object.entries(
264+
typedChange.updateDescription.updatedFields,
265+
)
266+
.filter(([key]) => key.startsWith('messages.'))
267+
.map(([, value]) => value as ReadEvent),
268+
}
269+
: typedChange.fullDocument;
255270

256-
const messages = streamChange.messages.map((message) => {
257-
return {
258-
kind: message.kind,
259-
type: message.type,
260-
data: message.data,
261-
metadata: {
262-
...message.metadata,
263-
streamPosition: resumeToken,
264-
},
265-
} as unknown as RecordedMessage<
266-
ConsumerMessageType,
267-
MessageMetadataType
268-
>;
269-
});
271+
if (!streamChange) {
272+
return;
273+
}
270274

271-
for (const processor of processors.filter(
272-
({ isActive }) => isActive,
273-
)) {
274-
await processor.handle(messages, {
275-
client,
276-
} as Partial<HandlerContext>);
275+
const messages = streamChange.messages.map((message) => {
276+
return {
277+
kind: message.kind,
278+
type: message.type,
279+
data: message.data,
280+
metadata: {
281+
...message.metadata,
282+
globalPosition: resumeToken,
283+
},
284+
} as unknown as RecordedMessage<
285+
ConsumerMessageType,
286+
MessageMetadataType
287+
>;
288+
});
289+
290+
for (const processor of processors.filter(
291+
({ isActive }) => isActive,
292+
)) {
293+
await processor.handle(messages, {
294+
client,
295+
} as Partial<HandlerContext>);
296+
}
277297
}
278-
});
298+
299+
console.log('END');
300+
})();
279301
})();
280302

281303
return start;
282304
},
283305
stop: async () => {
284-
await stream.close();
285-
isRunning = false;
306+
if (stream) {
307+
await stream.close();
308+
isRunning = false;
309+
runningPromise.resolve(null);
310+
}
286311
},
287312
close: async () => {
288-
await stream.close();
289-
isRunning = false;
313+
if (stream) {
314+
await stream.close();
315+
isRunning = false;
316+
runningPromise.resolve(null);
317+
}
290318
},
291319
};
292320
};
321+
322+
export const mongoDBChangeStreamMessagesConsumer = <
323+
ConsumerMessageType extends Message = AnyMessage,
324+
MessageMetadataType extends
325+
MongoDBRecordedMessageMetadata = MongoDBRecordedMessageMetadata,
326+
HandlerContext extends
327+
MongoDBConsumerHandlerContext = MongoDBConsumerHandlerContext,
328+
CheckpointType = GlobalPositionTypeOfRecordedMessageMetadata<MessageMetadataType>,
329+
>(
330+
options: MongoDBConsumerOptions<
331+
ConsumerMessageType,
332+
MessageMetadataType,
333+
HandlerContext,
334+
CheckpointType
335+
>,
336+
): MongoDBEventStoreConsumer<ConsumerMessageType> =>
337+
mongoDBMessagesConsumer<
338+
ConsumerMessageType,
339+
MessageMetadataType,
340+
HandlerContext,
341+
CheckpointType
342+
>(options);

src/packages/emmett-mongodb/src/eventStore/consumers/storeProcessorCheckpoint.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ export const storeProcessorCheckpoint = async <Position>(
7171
);
7272

7373
if (updateResult.matchedCount > 0 || updateResult.upsertedCount > 0) {
74-
return { success: true, newPosition };
74+
return { success: true, newPosition: newPosition! };
7575
}
7676

7777
return { success: false, reason: 'MISMATCH' };

src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/index.ts

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import {
2+
EmmettError,
23
IllegalStateError,
34
type AsyncRetryOptions,
45
type BatchRecordedMessageHandlerWithoutContext,
@@ -84,11 +85,9 @@ export const generateVersionPolicies = async (db: Db) => {
8485
const semver = parseSemVer(buildInfo.version);
8586
const major = semver.major || 0;
8687
const throwNotSupportedError = (): never => {
87-
throw new Error();
88-
// throw new NotSupportedMongoVersionError({
89-
// currentVersion: buildInfo.version,
90-
// supportedVersions: SupportedMajorMongoVersions,
91-
// });
88+
throw new EmmettError(
89+
`Not supported MongoDB version: ${buildInfo.version}.`,
90+
);
9291
};
9392

9493
const supportedVersionCheckPolicy = () => {

0 commit comments

Comments
 (0)