Skip to content

Commit 85c7d37

Browse files
committed
Removed redundant message types from MongoDB processors
1 parent 209b5f8 commit 85c7d37

File tree

2 files changed

+82
-107
lines changed

2 files changed

+82
-107
lines changed

src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBProcessor.ts

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -28,27 +28,6 @@ export type MongoDBProcessorHandlerContext = {
2828
client: MongoClient;
2929
};
3030

31-
export type CommonRecordedMessageMetadata<StreamPosition = MongoDBResumeToken> =
32-
Readonly<{
33-
messageId: string;
34-
streamPosition: StreamPosition;
35-
streamName: string;
36-
}>;
37-
38-
export type WithGlobalPosition<GlobalPosition> = Readonly<{
39-
globalPosition: GlobalPosition;
40-
}>;
41-
42-
export type RecordedMessageMetadata<
43-
GlobalPosition = undefined,
44-
StreamPosition = MongoDBResumeToken,
45-
> = CommonRecordedMessageMetadata<StreamPosition> &
46-
// eslint-disable-next-line @typescript-eslint/no-empty-object-type
47-
(GlobalPosition extends undefined ? {} : WithGlobalPosition<GlobalPosition>);
48-
49-
// eslint-disable-next-line @typescript-eslint/no-explicit-any
50-
export type AnyRecordedMessageMetadata = RecordedMessageMetadata<any, any>;
51-
5231
export type MongoDBProcessor<MessageType extends Message = AnyMessage> =
5332
MessageProcessor<
5433
MessageType,

src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/index.ts

Lines changed: 82 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -419,99 +419,95 @@ export const mongoDBSubscription = <
419419
) => {
420420
let retry = 0;
421421

422-
return asyncRetry(
423-
() =>
424-
new Promise<void>(async (resolve, reject) => {
425-
console.info(
426-
`Starting subscription. ${retry++} retries. From: ${JSONParser.stringify(from ?? '$all')}, Start from: ${JSONParser.stringify(
427-
options.startFrom,
428-
)}`,
429-
);
422+
return asyncRetry(async () => {
423+
const db = client.db(options.dbName);
424+
425+
const versionPolicies = await getDatabaseVersionPolicies(db);
426+
const policy = versionPolicies.changeStreamFullDocumentValuePolicy;
427+
428+
return new Promise<void>((resolve, reject) => {
429+
console.info(
430+
`Starting subscription. ${retry++} retries. From: ${JSONParser.stringify(from ?? '$all')}, Start from: ${JSONParser.stringify(
431+
options.startFrom,
432+
)}`,
433+
);
434+
435+
subscription = subscribe(
436+
policy,
437+
client.db(options.dbName),
438+
)<MessageType, ResumeToken>(options.startFrom);
439+
440+
processor = new SubscriptionSequentialHandler<
441+
MessageType,
442+
MessageMetadataType
443+
>({
444+
client,
445+
from,
446+
// batchSize,
447+
eachBatch,
448+
resilience,
449+
});
450+
451+
const handler = new (class extends Writable {
452+
async _write(
453+
result: MongoDBResumeToken | MessageHandlerResult,
454+
_encoding: string,
455+
done: () => void,
456+
) {
457+
if (!isRunning) return;
458+
459+
if (isMongoDBResumeToken(result)) {
460+
options.startFrom = {
461+
lastCheckpoint: result as ResumeToken,
462+
};
463+
done();
464+
return;
465+
}
430466

431-
const db = client.db(options.dbName);
432-
433-
const versionPolicies = await getDatabaseVersionPolicies(db);
434-
const policy = versionPolicies.changeStreamFullDocumentValuePolicy;
435-
436-
subscription = subscribe(
437-
policy,
438-
client.db(options.dbName),
439-
)<MessageType, ResumeToken>(options.startFrom);
440-
441-
processor = new SubscriptionSequentialHandler<
442-
MessageType,
443-
MessageMetadataType
444-
>({
445-
client,
446-
from,
447-
// batchSize,
448-
eachBatch,
449-
resilience,
450-
});
451-
452-
const handler = new (class extends Writable {
453-
async _write(
454-
result: MongoDBResumeToken | MessageHandlerResult,
455-
_encoding: string,
456-
done: () => void,
457-
) {
458-
if (!isRunning) return;
459-
460-
if (isMongoDBResumeToken(result)) {
461-
options.startFrom = {
462-
lastCheckpoint: result as ResumeToken,
463-
};
464-
done();
467+
if (result && result.type === 'STOP' && result.error) {
468+
console.error(
469+
`Subscription stopped with error code: ${result.error.errorCode}, message: ${
470+
result.error.message
471+
}.`,
472+
);
473+
}
474+
475+
await stopSubscription();
476+
done();
477+
}
478+
})({ objectMode: true });
479+
480+
pipeline(
481+
subscription,
482+
processor,
483+
handler,
484+
async (error: Error | null) => {
485+
console.info(`Stopping subscription.`);
486+
await stopSubscription(() => {
487+
if (!error) {
488+
console.info('Subscription ended successfully.');
489+
resolve();
465490
return;
466491
}
467492

468-
if (result && result.type === 'STOP' && result.error) {
469-
console.error(
470-
`Subscription stopped with error code: ${result.error.errorCode}, message: ${
471-
result.error.message
472-
}.`,
473-
);
493+
if (
494+
error.message === 'ChangeStream is closed' &&
495+
error.name === 'MongoAPIError'
496+
) {
497+
console.info('Subscription ended successfully.');
498+
resolve();
499+
return;
474500
}
475501

476-
await stopSubscription();
477-
done();
478-
}
479-
})({ objectMode: true });
480-
481-
pipeline(
482-
subscription,
483-
processor,
484-
handler,
485-
async (error: Error | null) => {
486-
console.info(`Stopping subscription.`);
487-
await stopSubscription(() => {
488-
if (!error) {
489-
console.info('Subscription ended successfully.');
490-
resolve();
491-
return;
492-
}
493-
494-
if (
495-
error.message === 'ChangeStream is closed' &&
496-
error.name === 'MongoAPIError'
497-
) {
498-
console.info('Subscription ended successfully.');
499-
resolve();
500-
return;
501-
}
502-
503-
console.error(
504-
`Received error: ${JSONParser.stringify(error)}.`,
505-
);
506-
reject(error);
507-
});
508-
},
509-
);
502+
console.error(`Received error: ${JSONParser.stringify(error)}.`);
503+
reject(error);
504+
});
505+
},
506+
);
510507

511-
console.log('OK');
512-
}),
513-
resubscribeOptions,
514-
);
508+
console.log('OK');
509+
});
510+
}, resubscribeOptions);
515511
};
516512

517513
return {

0 commit comments

Comments
 (0)