Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
8c6af5c
feat: mongodb processor and subscription
arturwojnar Jul 9, 2025
bb80bf3
feat: receiving updates on messages
arturwojnar Jul 31, 2025
bb1ebfe
refactor: removed the esdb copy-paste leftovers
arturwojnar Aug 6, 2025
ae79330
fix: revert the processors type back to generic MessageProcessor
arturwojnar Aug 6, 2025
753103c
fix: removed onHandleStart
arturwojnar Aug 6, 2025
592a866
refactor: to mongoDbEventsConsumer renamed to mongoDBMessagesConsumer
arturwojnar Aug 6, 2025
30fa044
feat: databaseName as parameter for the readProcessorCheckpoint
arturwojnar Aug 6, 2025
2de9d86
test: storeProcessorCheckpoint and readProcessorCheckpoint tests
arturwojnar Aug 9, 2025
fc53348
refactor: storeProcessorCheckpoint
arturwojnar Aug 9, 2025
847580d
chore: eslint fix
arturwojnar Aug 9, 2025
fa8cb43
feat: handling an unknown Position
arturwojnar Aug 9, 2025
4c03a71
feat: starting from the earliest position
arturwojnar Aug 10, 2025
e4d5904
fix: eslint all fixed
arturwojnar Aug 11, 2025
ead380c
feat: processing messages one by one
arturwojnar Aug 15, 2025
13afb1e
fix: removed incorrect change
arturwojnar Aug 15, 2025
88d8724
Merge remote-tracking branch 'upstream/main' into feat/subscriptions
arturwojnar Aug 21, 2025
f669ab4
test: fix
arturwojnar Sep 11, 2025
f81bf91
test: tests, eslint, ts fixed
arturwojnar Sep 17, 2025
754afd3
Added default partition value instead of null in MongoDB checkpoints
oskardudycz Nov 8, 2025
f3d2317
Added closing of MongoClient in consumer if it was created internally…
oskardudycz Nov 8, 2025
b89fb91
Small adjustments to assertions in MongoDB tests
oskardudycz Nov 8, 2025
209b5f8
Moved reading database policies to subscription
oskardudycz Nov 8, 2025
278a7bf
Removed redundant message types from MongoDB processors
oskardudycz Nov 8, 2025
507a345
Made MongoDB checkpointers to be aligned with others
oskardudycz Nov 8, 2025
4bebdd1
Merged mongodb checkpoints into one file added more tests
oskardudycz Nov 8, 2025
b28cca3
Added check if mongodb subscription wasn't already stopped when starting
oskardudycz Nov 8, 2025
06ced8d
Fixed mongodb unavailable error detection
oskardudycz Nov 8, 2025
1a0a14c
Used URN like MongoDB checkpoint with message position
oskardudycz Nov 9, 2025
d5626b8
Renamed mongoDBEventStoreConsumer file
oskardudycz Nov 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,6 @@ export const eventStoreDBEventStoreConsumer = <
return start;
},
stop,
close: async () => {
await stop();
},
close: stop,
};
};
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,10 @@ export const eventStoreDBSubscription = <
return asyncRetry(
() =>
new Promise<void>((resolve, reject) => {
if (!isRunning) {
resolve();
return;
}
console.info(
`Starting subscription. ${retry++} retries. From: ${JSONParser.stringify(from ?? '$all')}, Start from: ${JSONParser.stringify(
options.startFrom,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import assert from 'assert';

export class CancellationPromise<T = unknown> extends Promise<T> {
private _resolve: (value: T | PromiseLike<T>) => void;
private _reject: (reason?: unknown) => void;
private _state: 'resolved' | 'rejected' | 'pending' = 'pending';

constructor(
executor: (
resolve: (value: T | PromiseLike<T>) => void,
reject: (reason?: unknown) => void,
) => void = () => null,
) {
let _resolve: ((value: T | PromiseLike<T>) => void) | undefined = undefined;
let _reject: ((reason?: unknown) => void) | undefined = undefined;

super((resolve, reject) => {
executor(resolve, reject);
_resolve = resolve;
_reject = reject;
});

assert(_resolve);
assert(_reject);

this._resolve = _resolve;
this._reject = _reject;
}

reject(reason?: unknown): void {
this._state = 'rejected';
this._reject(reason);
}

resolve(value?: T): void {
this._state = 'resolved';
this._resolve(value as T);
}

get isResolved() {
return this._state === 'resolved';
}

get isRejected() {
return this._state === 'rejected';
}

get isPending() {
return this._state === 'pending';
}

static resolved<R = unknown>(value?: R) {
const promise = new CancellationPromise<R>();
promise.resolve(value as R);
return promise;
}

static rejected<R extends Error = Error>(value: R) {
const promise = new CancellationPromise<R>();
promise.reject(value);
return promise;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
import {
type Message,
type ReadProcessorCheckpointResult,
getCheckpoint,
} from '@event-driven-io/emmett';
import type { MongoClient } from 'mongodb';
import type { MongoDBCheckpointer } from './mongoDBProcessor';
import { compareTwoTokens } from './subscriptions';
import { DefaultProcessotCheckpointCollectionName, defaultTag } from './types';

export const mongoDBCheckpointer = <
MessageType extends Message = Message,
>(): MongoDBCheckpointer<MessageType> => ({
read: async (options, context) => {
const result = await readProcessorCheckpoint(context.client, options);

// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
return { lastCheckpoint: result?.lastCheckpoint };
},
store: async (options, context) => {
const newCheckpoint = getCheckpoint(options.message);

const result = await storeProcessorCheckpoint(context.client, {
lastStoredCheckpoint: options.lastCheckpoint,
newCheckpoint,
processorId: options.processorId,
partition: options.partition,
version: options.version || 0,
});

return result.success
? { success: true, newCheckpoint: result.newCheckpoint }
: result;
},
});

// eslint-disable-next-line @typescript-eslint/no-explicit-any
type ReadProcessorCheckpointMongoDBResult<Position = any> = {
lastProcessedCheckpoint: Position;
processorId: string;
partitionId: string;
version: number;
};

// eslint-disable-next-line @typescript-eslint/no-explicit-any
export const readProcessorCheckpoint = async <CheckpointType = any>(
client: MongoClient,
options: {
processorId: string;
partition?: string;
collectionName?: string;
databaseName?: string;
},
): Promise<ReadProcessorCheckpointResult<CheckpointType>> => {
const result = await client
.db(options.databaseName)
.collection<ReadProcessorCheckpointMongoDBResult<CheckpointType>>(
options.collectionName || DefaultProcessotCheckpointCollectionName,
)
.findOne({
processorId: options.processorId,
partitionId: options.partition || defaultTag,
});

return {
lastCheckpoint: result !== null ? result.lastProcessedCheckpoint : null,
};
};

type StoreLastProcessedProcessorPositionResult<Position = unknown> =
| {
success: true;
newCheckpoint: Position;
}
| { success: false; reason: 'IGNORED' | 'MISMATCH' };

export const storeProcessorCheckpoint = async <Position>(
client: MongoClient,
{
processorId,
version,
newCheckpoint,
lastStoredCheckpoint,
partition,
collectionName,
dbName,
}: {
processorId: string;
version: number;
newCheckpoint: Position | null;
lastStoredCheckpoint: Position | null;
partition?: string;
collectionName?: string;
dbName?: string;
},
): Promise<
StoreLastProcessedProcessorPositionResult<
null extends Position ? Position | null : Position
>
> => {
const checkpoints = client
.db(dbName)
.collection<ReadProcessorCheckpointMongoDBResult>(
collectionName || DefaultProcessotCheckpointCollectionName,
);

const filter = {
processorId: processorId,
partitionId: partition || defaultTag,
};

const current = await checkpoints.findOne(filter);

// MISMATCH: we have a checkpoint but lastProcessedCheckpoint doesn’t match
if (
current &&
compareTwoTokens(current.lastProcessedCheckpoint, lastStoredCheckpoint) !==
0
) {
return { success: false, reason: 'MISMATCH' };
}

// IGNORED: same or earlier position
if (current?.lastProcessedCheckpoint && newCheckpoint) {
if (
compareTwoTokens(current.lastProcessedCheckpoint, newCheckpoint) !== -1
) {
return { success: false, reason: 'IGNORED' };
}
}

const updateResult = await checkpoints.updateOne(
{ ...filter, lastProcessedCheckpoint: lastStoredCheckpoint },
{ $set: { lastProcessedCheckpoint: newCheckpoint, version } },
{ upsert: true },
);

if (updateResult.matchedCount > 0 || updateResult.upsertedCount > 0) {
return { success: true, newCheckpoint: newCheckpoint! };
}

return { success: false, reason: 'MISMATCH' };
};
Loading