-
-
Notifications
You must be signed in to change notification settings - Fork 41
Added MongoDB Consumer based on MongoDB ChangeStream subscription #258
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
oskardudycz
merged 29 commits into
event-driven-io:main
from
arturwojnar:feat/subscriptions
Nov 9, 2025
Merged
Changes from 2 commits
Commits
Show all changes
29 commits
Select commit
Hold shift + click to select a range
8c6af5c
feat: mongodb processor and subscription
arturwojnar bb80bf3
feat: receiving updates on messages
arturwojnar bb1ebfe
refactor: removed the esdb copy-paste leftovers
arturwojnar ae79330
fix: revert the processors type back to generic MessageProcessor
arturwojnar 753103c
fix: removed onHandleStart
arturwojnar 592a866
refactor: to mongoDbEventsConsumer renamed to mongoDBMessagesConsumer
arturwojnar 30fa044
feat: databaseName as parameter for the readProcessorCheckpoint
arturwojnar 2de9d86
test: storeProcessorCheckpoint and readProcessorCheckpoint tests
arturwojnar fc53348
refactor: storeProcessorCheckpoint
arturwojnar 847580d
chore: eslint fix
arturwojnar fa8cb43
feat: handling an unknown Position
arturwojnar 4c03a71
feat: starting from the earliest position
arturwojnar e4d5904
fix: eslint all fixed
arturwojnar ead380c
feat: processing messages one by one
arturwojnar 13afb1e
fix: removed incorrect change
arturwojnar 88d8724
Merge remote-tracking branch 'upstream/main' into feat/subscriptions
arturwojnar f669ab4
test: fix
arturwojnar f81bf91
test: tests, eslint, ts fixed
arturwojnar 754afd3
Added default partition value instead of null in MongoDB checkpoints
oskardudycz f3d2317
Added closing of MongoClient in consumer if it was created internally…
oskardudycz b89fb91
Small adjustments to assertions in MongoDB tests
oskardudycz 209b5f8
Moved reading database policies to subscription
oskardudycz 278a7bf
Removed redundant message types from MongoDB processors
oskardudycz 507a345
Made MongoDB checkpointers to be aligned with others
oskardudycz 4bebdd1
Merged mongodb checkpoints into one file added more tests
oskardudycz b28cca3
Added check if mongodb subscription wasn't already stopped when starting
oskardudycz 06ced8d
Fixed mongodb unavailable error detection
oskardudycz 1a0a14c
Used URN like MongoDB checkpoint with message position
oskardudycz d5626b8
Renamed mongoDBEventStoreConsumer file
oskardudycz File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
63 changes: 63 additions & 0 deletions
63
src/packages/emmett-mongodb/src/eventStore/consumers/CancellablePromise.ts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,63 @@ | ||
| import assert from 'assert'; | ||
|
|
||
| export class CancellationPromise<T = unknown> extends Promise<T> { | ||
| private _resolve: (value: T | PromiseLike<T>) => void; | ||
| private _reject: (reason?: unknown) => void; | ||
| private _state: 'resolved' | 'rejected' | 'pending' = 'pending'; | ||
|
|
||
| constructor( | ||
| executor: ( | ||
| resolve: (value: T | PromiseLike<T>) => void, | ||
| reject: (reason?: unknown) => void, | ||
| ) => void = () => null, | ||
| ) { | ||
| let _resolve: ((value: T | PromiseLike<T>) => void) | undefined = undefined; | ||
| let _reject: ((reason?: unknown) => void) | undefined = undefined; | ||
|
|
||
| super((resolve, reject) => { | ||
| executor(resolve, reject); | ||
| _resolve = resolve; | ||
| _reject = reject; | ||
| }); | ||
|
|
||
| assert(_resolve); | ||
| assert(_reject); | ||
|
|
||
| this._resolve = _resolve; | ||
| this._reject = _reject; | ||
| } | ||
|
|
||
| reject(reason?: unknown): void { | ||
| this._state = 'rejected'; | ||
| this._reject(reason); | ||
| } | ||
|
|
||
| resolve(value?: T): void { | ||
| this._state = 'resolved'; | ||
| this._resolve(value as T); | ||
| } | ||
|
|
||
| get isResolved() { | ||
| return this._state === 'resolved'; | ||
| } | ||
|
|
||
| get isRejected() { | ||
| return this._state === 'rejected'; | ||
| } | ||
|
|
||
| get isPending() { | ||
| return this._state === 'pending'; | ||
| } | ||
|
|
||
| static resolved<R = unknown>(value?: R) { | ||
| const promise = new CancellationPromise<R>(); | ||
| promise.resolve(value as R); | ||
| return promise; | ||
| } | ||
|
|
||
| static rejected<R extends Error = Error>(value: R) { | ||
| const promise = new CancellationPromise<R>(); | ||
| promise.reject(value); | ||
| return promise; | ||
| } | ||
| } |
270 changes: 270 additions & 0 deletions
270
src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,270 @@ | ||
| import { | ||
| EmmettError, | ||
| type AnyEvent, | ||
| type AnyMessage, | ||
| type AsyncRetryOptions, | ||
| type CommonRecordedMessageMetadata, | ||
| type Event, | ||
| type GlobalPositionTypeOfRecordedMessageMetadata, | ||
| type Message, | ||
| type MessageConsumer, | ||
| type ReadEvent, | ||
| type RecordedMessage, | ||
| } from '@event-driven-io/emmett'; | ||
| import { ChangeStream, MongoClient, type MongoClientOptions } from 'mongodb'; | ||
| import { v4 as uuid } from 'uuid'; | ||
| import type { | ||
| MongoDBRecordedMessageMetadata, | ||
| ReadEventMetadataWithGlobalPosition, | ||
| } from '../event'; | ||
| import type { | ||
| EventStream, | ||
| MongoDBReadEventMetadata, | ||
| } from '../mongoDBEventStore'; | ||
| import { | ||
| changeStreamReactor, | ||
| mongoDBProjector, | ||
| type MongoDBProcessor, | ||
| type MongoDBProcessorOptions, | ||
| type MongoDBProjectorOptions, | ||
| } from './mongoDBProcessor'; | ||
| import { | ||
| subscribe as _subscribe, | ||
| zipMongoDBMessageBatchPullerStartFrom, | ||
| type ChangeStreamFullDocumentValuePolicy, | ||
| type MongoDBSubscriptionDocument, | ||
| } from './subscriptions'; | ||
|
|
||
| const noop = () => Promise.resolve(); | ||
|
|
||
| export type MessageConsumerOptions< | ||
| MessageType extends Message = AnyMessage, | ||
| MessageMetadataType extends | ||
| MongoDBRecordedMessageMetadata = MongoDBRecordedMessageMetadata, | ||
| CheckpointType = GlobalPositionTypeOfRecordedMessageMetadata<MessageMetadataType>, | ||
| > = { | ||
| consumerId?: string; | ||
|
|
||
| processors?: MongoDBProcessor<MessageType>[]; | ||
| }; | ||
|
|
||
| export type EventStoreDBEventStoreConsumerConfig< | ||
oskardudycz marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| // eslint-disable-next-line @typescript-eslint/no-explicit-any | ||
| ConsumerMessageType extends Message = any, | ||
| > = MessageConsumerOptions<ConsumerMessageType> & { | ||
| // from?: any; | ||
| pulling?: { | ||
| batchSize?: number; | ||
| }; | ||
| resilience?: { | ||
| resubscribeOptions?: AsyncRetryOptions; | ||
| }; | ||
| changeStreamFullDocumentPolicy: ChangeStreamFullDocumentValuePolicy; | ||
| }; | ||
|
|
||
| export type MongoDBConsumerOptions< | ||
| ConsumerEventType extends Message = Message, | ||
| > = EventStoreDBEventStoreConsumerConfig<ConsumerEventType> & | ||
| ( | ||
| | { | ||
| connectionString: string; | ||
| clientOptions?: MongoClientOptions; | ||
| client?: never; | ||
| onHandleStart?: ( | ||
oskardudycz marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| messages: RecordedMessage< | ||
| ConsumerEventType, | ||
| ReadEventMetadataWithGlobalPosition | ||
| >[], | ||
| ) => Promise<void>; | ||
| onHandleEnd?: ( | ||
| messages: RecordedMessage< | ||
| ConsumerEventType, | ||
| ReadEventMetadataWithGlobalPosition | ||
| >[], | ||
| ) => Promise<void>; | ||
| } | ||
| | { | ||
| client: MongoClient; | ||
| connectionString?: never; | ||
| clientOptions?: never; | ||
| onHandleStart?: ( | ||
| messages: RecordedMessage< | ||
| ConsumerEventType, | ||
| ReadEventMetadataWithGlobalPosition | ||
| >[], | ||
| ) => Promise<void>; | ||
| onHandleEnd?: ( | ||
| messages: RecordedMessage< | ||
| ConsumerEventType, | ||
| ReadEventMetadataWithGlobalPosition | ||
| >[], | ||
| ) => Promise<void>; | ||
| } | ||
| ); | ||
|
|
||
| export type EventStoreDBEventStoreConsumer< | ||
| // eslint-disable-next-line @typescript-eslint/no-explicit-any | ||
| ConsumerMessageType extends AnyMessage = any, | ||
| > = MessageConsumer<ConsumerMessageType> & | ||
| Readonly<{ | ||
| reactor: <MessageType extends AnyMessage = ConsumerMessageType>( | ||
| options: MongoDBProcessorOptions<MessageType>, | ||
| ) => MongoDBProcessor<MessageType>; | ||
| }> & | ||
| (AnyEvent extends ConsumerMessageType | ||
| ? Readonly<{ | ||
| projector: < | ||
| EventType extends AnyEvent = ConsumerMessageType & AnyEvent, | ||
| >( | ||
| options: MongoDBProjectorOptions<EventType>, | ||
| ) => MongoDBProcessor<EventType>; | ||
| }> | ||
| : object); | ||
|
|
||
| type MessageArrayElement = `messages.${string}`; | ||
| type UpdateDescription<T> = { | ||
| updateDescription: { | ||
| updatedFields: Record<MessageArrayElement, T> & { | ||
| 'metadata.streamPosition': number; | ||
| 'metadata.updatedAt': Date; | ||
| }; | ||
| }; | ||
| }; | ||
| type FullDocument< | ||
| EventType extends Event = Event, | ||
| EventMetaDataType extends MongoDBReadEventMetadata = MongoDBReadEventMetadata, | ||
| T extends EventStream = EventStream<EventType, EventMetaDataType>, | ||
| > = { | ||
| fullDocument: T; | ||
| }; | ||
| type OplogChange< | ||
| EventType extends Event = Event, | ||
| EventMetaDataType extends MongoDBReadEventMetadata = MongoDBReadEventMetadata, | ||
| T extends EventStream = EventStream<EventType, EventMetaDataType>, | ||
| > = | ||
| | FullDocument<EventType, EventMetaDataType, T> | ||
| | UpdateDescription<ReadEvent<EventType, EventMetaDataType>>; | ||
|
|
||
| export const mongoDBEventsConsumer = < | ||
oskardudycz marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| ConsumerMessageType extends Message = AnyMessage, | ||
| >( | ||
| options: MongoDBConsumerOptions<ConsumerMessageType>, | ||
| ): EventStoreDBEventStoreConsumer<ConsumerMessageType> => { | ||
| let start: Promise<void>; | ||
| let stream: ChangeStream< | ||
| EventStream<Event, CommonRecordedMessageMetadata>, | ||
| MongoDBSubscriptionDocument< | ||
| EventStream<Event, CommonRecordedMessageMetadata> | ||
| > | ||
| >; | ||
| let isRunning = false; | ||
| const client = | ||
| 'client' in options && options.client | ||
| ? options.client | ||
| : new MongoClient(options.connectionString, options.clientOptions); | ||
| const processors = options.processors ?? []; | ||
| const subscribe = _subscribe( | ||
| options.changeStreamFullDocumentPolicy, | ||
| client.db(), | ||
oskardudycz marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| ); | ||
| const onHandleStart = options.onHandleStart || noop; | ||
| const onHandleEnd = options.onHandleEnd || noop; | ||
|
|
||
| return { | ||
| consumerId: options.consumerId ?? uuid(), | ||
| get isRunning() { | ||
| return isRunning; | ||
| }, | ||
| processors, | ||
| reactor: <MessageType extends AnyMessage = ConsumerMessageType>( | ||
| options: MongoDBProcessorOptions<MessageType>, | ||
| ): MongoDBProcessor<MessageType> => { | ||
| const processor = changeStreamReactor(options); | ||
|
|
||
| processors.push(processor as unknown as MongoDBProcessor); | ||
|
|
||
| return processor; | ||
| }, | ||
| projector: <EventType extends AnyEvent = ConsumerMessageType & AnyEvent>( | ||
| options: MongoDBProjectorOptions<EventType>, | ||
| ): MongoDBProcessor<EventType> => { | ||
| const processor = mongoDBProjector(options); | ||
|
|
||
| processors.push(processor as unknown as MongoDBProcessor); | ||
|
|
||
| return processor; | ||
| }, | ||
| start: () => { | ||
| start = (async () => { | ||
| if (processors.length === 0) | ||
| return Promise.reject( | ||
| new EmmettError( | ||
| 'Cannot start consumer without at least a single processor', | ||
| ), | ||
| ); | ||
|
|
||
| isRunning = true; | ||
|
|
||
| const positions = await Promise.all( | ||
| processors.map((o) => o.start(options)), | ||
| ); | ||
| const startFrom = zipMongoDBMessageBatchPullerStartFrom(positions); | ||
|
|
||
| stream = subscribe( | ||
| typeof startFrom !== 'string' ? startFrom.lastCheckpoint : void 0, | ||
| ); | ||
| stream.on('change', async (change) => { | ||
| const resumeToken = change._id; | ||
| const typedChange = change as OplogChange; | ||
| const streamChange = | ||
| 'updateDescription' in typedChange | ||
| ? { | ||
| messages: Object.entries( | ||
| typedChange.updateDescription.updatedFields, | ||
| ) | ||
| .filter(([key]) => key.startsWith('messages.')) | ||
| .map(([, value]) => value as ReadEvent), | ||
| } | ||
| : typedChange.fullDocument; | ||
|
|
||
| if (!streamChange) { | ||
| return; | ||
| } | ||
|
|
||
| const messages = streamChange.messages.map((message) => { | ||
| return { | ||
| kind: message.kind, | ||
| type: message.type, | ||
| data: message.data, | ||
| metadata: { | ||
| ...message.metadata, | ||
| streamPosition: resumeToken, | ||
| }, | ||
| } as unknown as RecordedMessage< | ||
| ConsumerMessageType, | ||
| ReadEventMetadataWithGlobalPosition | ||
| >; | ||
| }); | ||
|
|
||
| await onHandleStart(messages); | ||
|
|
||
| for (const processor of processors.filter( | ||
| ({ isActive }) => isActive, | ||
| )) { | ||
| await processor.handle(messages, { client }); | ||
| } | ||
|
|
||
| await onHandleEnd(messages); | ||
| }); | ||
| })(); | ||
|
|
||
| return start; | ||
| }, | ||
| stop: async () => { | ||
| return Promise.resolve(); | ||
| }, | ||
| close: async () => { | ||
| await stream.close(); | ||
| }, | ||
| }; | ||
| }; | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.