Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
8c6af5c
feat: mongodb processor and subscription
arturwojnar Jul 9, 2025
bb80bf3
feat: receiving updates on messages
arturwojnar Jul 31, 2025
bb1ebfe
refactor: removed the esdb copy-paste leftovers
arturwojnar Aug 6, 2025
ae79330
fix: revert the processors type back to generic MessageProcessor
arturwojnar Aug 6, 2025
753103c
fix: removed onHandleStart
arturwojnar Aug 6, 2025
592a866
refactor: to mongoDbEventsConsumer renamed to mongoDBMessagesConsumer
arturwojnar Aug 6, 2025
30fa044
feat: databaseName as parameter for the readProcessorCheckpoint
arturwojnar Aug 6, 2025
2de9d86
test: storeProcessorCheckpoint and readProcessorCheckpoint tests
arturwojnar Aug 9, 2025
fc53348
refactor: storeProcessorCheckpoint
arturwojnar Aug 9, 2025
847580d
chore: eslint fix
arturwojnar Aug 9, 2025
fa8cb43
feat: handling an unknown Position
arturwojnar Aug 9, 2025
4c03a71
feat: starting from the earliest position
arturwojnar Aug 10, 2025
e4d5904
fix: eslint all fixed
arturwojnar Aug 11, 2025
ead380c
feat: processing messages one by one
arturwojnar Aug 15, 2025
13afb1e
fix: removed incorrect change
arturwojnar Aug 15, 2025
88d8724
Merge remote-tracking branch 'upstream/main' into feat/subscriptions
arturwojnar Aug 21, 2025
f669ab4
test: fix
arturwojnar Sep 11, 2025
f81bf91
test: tests, eslint, ts fixed
arturwojnar Sep 17, 2025
754afd3
Added default partition value instead of null in MongoDB checkpoints
oskardudycz Nov 8, 2025
f3d2317
Added closing of MongoClient in consumer if it was created internally…
oskardudycz Nov 8, 2025
b89fb91
Small adjustments to assertions in MongoDB tests
oskardudycz Nov 8, 2025
209b5f8
Moved reading database policies to subscription
oskardudycz Nov 8, 2025
278a7bf
Removed redundant message types from MongoDB processors
oskardudycz Nov 8, 2025
507a345
Made MongoDB checkpointers to be aligned with others
oskardudycz Nov 8, 2025
4bebdd1
Merged mongodb checkpoints into one file added more tests
oskardudycz Nov 8, 2025
b28cca3
Added check if mongodb subscription wasn't already stopped when starting
oskardudycz Nov 8, 2025
06ced8d
Fixed mongodb unavailable error detection
oskardudycz Nov 8, 2025
1a0a14c
Used URN like MongoDB checkpoint with message position
oskardudycz Nov 9, 2025
d5626b8
Renamed mongoDBEventStoreConsumer file
oskardudycz Nov 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import assert from 'assert';

export class CancellationPromise<T = unknown> extends Promise<T> {
private _resolve: (value: T | PromiseLike<T>) => void;
private _reject: (reason?: unknown) => void;
private _state: 'resolved' | 'rejected' | 'pending' = 'pending';

constructor(
executor: (
resolve: (value: T | PromiseLike<T>) => void,
reject: (reason?: unknown) => void,
) => void = () => null,
) {
let _resolve: ((value: T | PromiseLike<T>) => void) | undefined = undefined;
let _reject: ((reason?: unknown) => void) | undefined = undefined;

super((resolve, reject) => {
executor(resolve, reject);
_resolve = resolve;
_reject = reject;
});

assert(_resolve);
assert(_reject);

this._resolve = _resolve;
this._reject = _reject;
}

reject(reason?: unknown): void {
this._state = 'rejected';
this._reject(reason);
}

resolve(value?: T): void {
this._state = 'resolved';
this._resolve(value as T);
}

get isResolved() {
return this._state === 'resolved';
}

get isRejected() {
return this._state === 'rejected';
}

get isPending() {
return this._state === 'pending';
}

static resolved<R = unknown>(value?: R) {
const promise = new CancellationPromise<R>();
promise.resolve(value as R);
return promise;
}

static rejected<R extends Error = Error>(value: R) {
const promise = new CancellationPromise<R>();
promise.reject(value);
return promise;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,270 @@
import {
EmmettError,
type AnyEvent,
type AnyMessage,
type AsyncRetryOptions,
type CommonRecordedMessageMetadata,
type Event,
type GlobalPositionTypeOfRecordedMessageMetadata,
type Message,
type MessageConsumer,
type ReadEvent,
type RecordedMessage,
} from '@event-driven-io/emmett';
import { ChangeStream, MongoClient, type MongoClientOptions } from 'mongodb';
import { v4 as uuid } from 'uuid';
import type {
MongoDBRecordedMessageMetadata,
ReadEventMetadataWithGlobalPosition,
} from '../event';
import type {
EventStream,
MongoDBReadEventMetadata,
} from '../mongoDBEventStore';
import {
changeStreamReactor,
mongoDBProjector,
type MongoDBProcessor,
type MongoDBProcessorOptions,
type MongoDBProjectorOptions,
} from './mongoDBProcessor';
import {
subscribe as _subscribe,
zipMongoDBMessageBatchPullerStartFrom,
type ChangeStreamFullDocumentValuePolicy,
type MongoDBSubscriptionDocument,
} from './subscriptions';

const noop = () => Promise.resolve();

export type MessageConsumerOptions<
MessageType extends Message = AnyMessage,
MessageMetadataType extends
MongoDBRecordedMessageMetadata = MongoDBRecordedMessageMetadata,
CheckpointType = GlobalPositionTypeOfRecordedMessageMetadata<MessageMetadataType>,

Check failure on line 44 in src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts

View workflow job for this annotation

GitHub Actions / Build application code

'CheckpointType' is defined but never used. Allowed unused vars must match /^_/u
> = {
consumerId?: string;

processors?: MongoDBProcessor<MessageType>[];
};

export type EventStoreDBEventStoreConsumerConfig<
// eslint-disable-next-line @typescript-eslint/no-explicit-any
ConsumerMessageType extends Message = any,
> = MessageConsumerOptions<ConsumerMessageType> & {
// from?: any;
pulling?: {
batchSize?: number;
};
resilience?: {
resubscribeOptions?: AsyncRetryOptions;
};
changeStreamFullDocumentPolicy: ChangeStreamFullDocumentValuePolicy;
};

export type MongoDBConsumerOptions<
ConsumerEventType extends Message = Message,
> = EventStoreDBEventStoreConsumerConfig<ConsumerEventType> &
(
| {
connectionString: string;
clientOptions?: MongoClientOptions;
client?: never;
onHandleStart?: (
messages: RecordedMessage<
ConsumerEventType,
ReadEventMetadataWithGlobalPosition
>[],
) => Promise<void>;
onHandleEnd?: (
messages: RecordedMessage<
ConsumerEventType,
ReadEventMetadataWithGlobalPosition
>[],
) => Promise<void>;
}
| {
client: MongoClient;
connectionString?: never;
clientOptions?: never;
onHandleStart?: (
messages: RecordedMessage<
ConsumerEventType,
ReadEventMetadataWithGlobalPosition
>[],
) => Promise<void>;
onHandleEnd?: (
messages: RecordedMessage<
ConsumerEventType,
ReadEventMetadataWithGlobalPosition
>[],
) => Promise<void>;
}
);

export type EventStoreDBEventStoreConsumer<
// eslint-disable-next-line @typescript-eslint/no-explicit-any
ConsumerMessageType extends AnyMessage = any,
> = MessageConsumer<ConsumerMessageType> &
Readonly<{
reactor: <MessageType extends AnyMessage = ConsumerMessageType>(
options: MongoDBProcessorOptions<MessageType>,
) => MongoDBProcessor<MessageType>;
}> &
(AnyEvent extends ConsumerMessageType
? Readonly<{
projector: <
EventType extends AnyEvent = ConsumerMessageType & AnyEvent,
>(
options: MongoDBProjectorOptions<EventType>,
) => MongoDBProcessor<EventType>;
}>
: object);

type MessageArrayElement = `messages.${string}`;
type UpdateDescription<T> = {
updateDescription: {
updatedFields: Record<MessageArrayElement, T> & {
'metadata.streamPosition': number;
'metadata.updatedAt': Date;
};
};
};
type FullDocument<
EventType extends Event = Event,
EventMetaDataType extends MongoDBReadEventMetadata = MongoDBReadEventMetadata,
T extends EventStream = EventStream<EventType, EventMetaDataType>,
> = {
fullDocument: T;
};
type OplogChange<
EventType extends Event = Event,
EventMetaDataType extends MongoDBReadEventMetadata = MongoDBReadEventMetadata,
T extends EventStream = EventStream<EventType, EventMetaDataType>,
> =
| FullDocument<EventType, EventMetaDataType, T>
| UpdateDescription<ReadEvent<EventType, EventMetaDataType>>;

export const mongoDBEventsConsumer = <
ConsumerMessageType extends Message = AnyMessage,
>(
options: MongoDBConsumerOptions<ConsumerMessageType>,
): EventStoreDBEventStoreConsumer<ConsumerMessageType> => {
let start: Promise<void>;
let stream: ChangeStream<
EventStream<Event, CommonRecordedMessageMetadata>,
MongoDBSubscriptionDocument<
EventStream<Event, CommonRecordedMessageMetadata>
>
>;
let isRunning = false;
const client =
'client' in options && options.client
? options.client
: new MongoClient(options.connectionString, options.clientOptions);
const processors = options.processors ?? [];
const subscribe = _subscribe(
options.changeStreamFullDocumentPolicy,
client.db(),
);
const onHandleStart = options.onHandleStart || noop;
const onHandleEnd = options.onHandleEnd || noop;

return {
consumerId: options.consumerId ?? uuid(),
get isRunning() {
return isRunning;
},
processors,
reactor: <MessageType extends AnyMessage = ConsumerMessageType>(
options: MongoDBProcessorOptions<MessageType>,
): MongoDBProcessor<MessageType> => {
const processor = changeStreamReactor(options);

processors.push(processor as unknown as MongoDBProcessor);

return processor;
},
projector: <EventType extends AnyEvent = ConsumerMessageType & AnyEvent>(
options: MongoDBProjectorOptions<EventType>,
): MongoDBProcessor<EventType> => {
const processor = mongoDBProjector(options);

processors.push(processor as unknown as MongoDBProcessor);

return processor;
},
start: () => {
start = (async () => {
if (processors.length === 0)
return Promise.reject(
new EmmettError(
'Cannot start consumer without at least a single processor',
),
);

isRunning = true;

const positions = await Promise.all(
processors.map((o) => o.start(options)),
);
const startFrom = zipMongoDBMessageBatchPullerStartFrom(positions);

stream = subscribe(
typeof startFrom !== 'string' ? startFrom.lastCheckpoint : void 0,
);
stream.on('change', async (change) => {
const resumeToken = change._id;
const typedChange = change as OplogChange;
const streamChange =
'updateDescription' in typedChange
? {
messages: Object.entries(
typedChange.updateDescription.updatedFields,
)
.filter(([key]) => key.startsWith('messages.'))
.map(([, value]) => value as ReadEvent),
}
: typedChange.fullDocument;

if (!streamChange) {
return;
}

const messages = streamChange.messages.map((message) => {
return {
kind: message.kind,
type: message.type,
data: message.data,
metadata: {
...message.metadata,
streamPosition: resumeToken,
},
} as unknown as RecordedMessage<
ConsumerMessageType,
ReadEventMetadataWithGlobalPosition
>;
});

await onHandleStart(messages);

for (const processor of processors.filter(
({ isActive }) => isActive,
)) {
await processor.handle(messages, { client });
}

await onHandleEnd(messages);
});
})();

return start;
},
stop: async () => {
return Promise.resolve();
},
close: async () => {
await stream.close();
},
};
};
Loading
Loading