diff --git a/src/packages/emmett-esdb/src/eventStore/consumers/eventStoreDBEventStoreConsumer.ts b/src/packages/emmett-esdb/src/eventStore/consumers/eventStoreDBEventStoreConsumer.ts index 7019768b..b959bb8e 100644 --- a/src/packages/emmett-esdb/src/eventStore/consumers/eventStoreDBEventStoreConsumer.ts +++ b/src/packages/emmett-esdb/src/eventStore/consumers/eventStoreDBEventStoreConsumer.ts @@ -217,8 +217,6 @@ export const eventStoreDBEventStoreConsumer = < return start; }, stop, - close: async () => { - await stop(); - }, + close: stop, }; }; diff --git a/src/packages/emmett-esdb/src/eventStore/consumers/subscriptions/index.ts b/src/packages/emmett-esdb/src/eventStore/consumers/subscriptions/index.ts index 709effbb..6390fa7c 100644 --- a/src/packages/emmett-esdb/src/eventStore/consumers/subscriptions/index.ts +++ b/src/packages/emmett-esdb/src/eventStore/consumers/subscriptions/index.ts @@ -208,6 +208,10 @@ export const eventStoreDBSubscription = < return asyncRetry( () => new Promise((resolve, reject) => { + if (!isRunning) { + resolve(); + return; + } console.info( `Starting subscription. ${retry++} retries. From: ${JSONParser.stringify(from ?? '$all')}, Start from: ${JSONParser.stringify( options.startFrom, diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/CancellablePromise.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/CancellablePromise.ts new file mode 100644 index 00000000..74204424 --- /dev/null +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/CancellablePromise.ts @@ -0,0 +1,63 @@ +import assert from 'assert'; + +export class CancellationPromise extends Promise { + private _resolve: (value: T | PromiseLike) => void; + private _reject: (reason?: unknown) => void; + private _state: 'resolved' | 'rejected' | 'pending' = 'pending'; + + constructor( + executor: ( + resolve: (value: T | PromiseLike) => void, + reject: (reason?: unknown) => void, + ) => void = () => null, + ) { + let _resolve: ((value: T | PromiseLike) => void) | undefined = undefined; + let _reject: ((reason?: unknown) => void) | undefined = undefined; + + super((resolve, reject) => { + executor(resolve, reject); + _resolve = resolve; + _reject = reject; + }); + + assert(_resolve); + assert(_reject); + + this._resolve = _resolve; + this._reject = _reject; + } + + reject(reason?: unknown): void { + this._state = 'rejected'; + this._reject(reason); + } + + resolve(value?: T): void { + this._state = 'resolved'; + this._resolve(value as T); + } + + get isResolved() { + return this._state === 'resolved'; + } + + get isRejected() { + return this._state === 'rejected'; + } + + get isPending() { + return this._state === 'pending'; + } + + static resolved(value?: R) { + const promise = new CancellationPromise(); + promise.resolve(value as R); + return promise; + } + + static rejected(value: R) { + const promise = new CancellationPromise(); + promise.reject(value); + return promise; + } +} diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBCheckpointer.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBCheckpointer.ts new file mode 100644 index 00000000..c5cbc5bd --- /dev/null +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBCheckpointer.ts @@ -0,0 +1,143 @@ +import { + type Message, + type ReadProcessorCheckpointResult, + getCheckpoint, +} from '@event-driven-io/emmett'; +import type { MongoClient } from 'mongodb'; +import type { MongoDBCheckpointer } from './mongoDBProcessor'; +import { compareTwoTokens } from './subscriptions'; +import { DefaultProcessotCheckpointCollectionName, defaultTag } from './types'; + +export const mongoDBCheckpointer = < + MessageType extends Message = Message, +>(): MongoDBCheckpointer => ({ + read: async (options, context) => { + const result = await readProcessorCheckpoint(context.client, options); + + // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment + return { lastCheckpoint: result?.lastCheckpoint }; + }, + store: async (options, context) => { + const newCheckpoint = getCheckpoint(options.message); + + const result = await storeProcessorCheckpoint(context.client, { + lastStoredCheckpoint: options.lastCheckpoint, + newCheckpoint, + processorId: options.processorId, + partition: options.partition, + version: options.version || 0, + }); + + return result.success + ? { success: true, newCheckpoint: result.newCheckpoint } + : result; + }, +}); + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +type ReadProcessorCheckpointMongoDBResult = { + lastProcessedCheckpoint: Position; + processorId: string; + partitionId: string; + version: number; +}; + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export const readProcessorCheckpoint = async ( + client: MongoClient, + options: { + processorId: string; + partition?: string; + collectionName?: string; + databaseName?: string; + }, +): Promise> => { + const result = await client + .db(options.databaseName) + .collection>( + options.collectionName || DefaultProcessotCheckpointCollectionName, + ) + .findOne({ + processorId: options.processorId, + partitionId: options.partition || defaultTag, + }); + + return { + lastCheckpoint: result !== null ? result.lastProcessedCheckpoint : null, + }; +}; + +type StoreLastProcessedProcessorPositionResult = + | { + success: true; + newCheckpoint: Position; + } + | { success: false; reason: 'IGNORED' | 'MISMATCH' }; + +export const storeProcessorCheckpoint = async ( + client: MongoClient, + { + processorId, + version, + newCheckpoint, + lastStoredCheckpoint, + partition, + collectionName, + dbName, + }: { + processorId: string; + version: number; + newCheckpoint: Position | null; + lastStoredCheckpoint: Position | null; + partition?: string; + collectionName?: string; + dbName?: string; + }, +): Promise< + StoreLastProcessedProcessorPositionResult< + null extends Position ? Position | null : Position + > +> => { + const checkpoints = client + .db(dbName) + .collection( + collectionName || DefaultProcessotCheckpointCollectionName, + ); + + const filter = { + processorId: processorId, + partitionId: partition || defaultTag, + }; + + const current = await checkpoints.findOne(filter); + + // MISMATCH: we have a checkpoint but lastProcessedCheckpoint doesn’t match + if ( + current && + compareTwoTokens(current.lastProcessedCheckpoint, lastStoredCheckpoint) !== + 0 + ) { + return { success: false, reason: 'MISMATCH' }; + } + + // IGNORED: same or earlier position + if (current?.lastProcessedCheckpoint && newCheckpoint) { + if ( + compareTwoTokens(current.lastProcessedCheckpoint, newCheckpoint) !== -1 + ) { + return { success: false, reason: 'IGNORED' }; + } + } + + const updateResult = await checkpoints.updateOne( + { ...filter, lastProcessedCheckpoint: lastStoredCheckpoint }, + { $set: { lastProcessedCheckpoint: newCheckpoint, version } }, + { upsert: true }, + ); + + if (updateResult.matchedCount > 0 || updateResult.upsertedCount > 0) { + return { success: true, newCheckpoint: newCheckpoint! }; + } + + return { success: false, reason: 'MISMATCH' }; +}; diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventStore.subscription.e2e.spec.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventStore.subscription.e2e.spec.ts new file mode 100644 index 00000000..3a85dd8f --- /dev/null +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventStore.subscription.e2e.spec.ts @@ -0,0 +1,237 @@ +import { + assertDefined, + assertEqual, + assertIsNotNull, + assertNotEqual, + assertOk, + assertTrue, + STREAM_DOES_NOT_EXIST, +} from '@event-driven-io/emmett'; +import { + MongoDBContainer, + type StartedMongoDBContainer, +} from '@testcontainers/mongodb'; +import { MongoClient, type Collection } from 'mongodb'; +import { after, before, describe, it } from 'node:test'; +import { v4 as uuid, v4 } from 'uuid'; +import { + getMongoDBEventStore, + toStreamCollectionName, + toStreamName, + type EventStream, + type MongoDBEventStore, +} from '..'; +import { + type PricedProductItem, + type ProductItemAdded, + type ShoppingCartEvent, +} from '../../testing'; +import { CancellationPromise } from './CancellablePromise'; +import { + mongoDBEventStoreConsumer, + type MongoDBEventStoreConsumer, +} from './mongoDBEventStoreConsumer'; +import type { MongoDBProcessor } from './mongoDBProcessor'; +import { compareTwoMongoDBCheckpoints } from './subscriptions'; +import type { MongoDBCheckpoint } from './subscriptions/mongoDBCheckpoint'; + +const withDeadline = { timeout: 30000 }; + +void describe('MongoDBEventStore subscription', () => { + let mongodb: StartedMongoDBContainer; + let eventStore: MongoDBEventStore; + let client: MongoClient; + let collection: Collection; + let consumer: MongoDBEventStoreConsumer; + let processor: MongoDBProcessor | undefined; + let lastResumeToken: MongoDBCheckpoint | null = null; + + const messageProcessingPromise1 = new CancellationPromise(); + const messageProcessingPromise2 = new CancellationPromise(); + const lastProductItemIdTest1 = '789'; + const lastProductItemIdTest2 = '999'; + const expectedProductItemIds = [ + '123', + '456', + lastProductItemIdTest1, + lastProductItemIdTest2, + ] as const; + + const shoppingCartId = uuid(); + const streamType = 'shopping_cart'; + const streamName = toStreamName(streamType, shoppingCartId); + const noop = () => {}; + const productItem = (productId: string) => + ({ + productId, + quantity: 10, + price: 3, + }) as PricedProductItem; + + before(async () => { + mongodb = await new MongoDBContainer('mongo:8.0.10').start(); + client = new MongoClient(mongodb.getConnectionString(), { + directConnection: true, + }); + + await client.connect(); + const db = client.db(); + collection = db.collection( + toStreamCollectionName('shopping_cart'), + ); + + eventStore = getMongoDBEventStore({ + client, + }); + + consumer = mongoDBEventStoreConsumer({ + client, + }); + }); + + after(async () => { + if (consumer) { + await consumer.close(); + } + await client.close(); + await mongodb.stop(); + }); + + void it( + 'should react to new events added by the appendToStream', + withDeadline, + async () => { + let receivedMessageCount: 0 | 1 | 2 = 0; + + processor = consumer.reactor({ + processorId: v4(), + stopAfter: (event) => { + if (event.data.productItem.productId === lastProductItemIdTest1) { + messageProcessingPromise1.resolve(); + consumer.stop().catch(noop); + } + if (event.data.productItem.productId === lastProductItemIdTest2) { + messageProcessingPromise2.resolve(); + consumer.stop().catch(noop); + } + + return ( + event.data.productItem.productId === lastProductItemIdTest1 || + event.data.productItem.productId === lastProductItemIdTest2 + ); + }, + eachMessage: (event) => { + lastResumeToken = event.metadata.globalPosition; + + assertTrue(receivedMessageCount <= 3); + assertEqual( + expectedProductItemIds[receivedMessageCount], + event.data.productItem.productId, + ); + + receivedMessageCount++; + }, + connectionOptions: { + client, + }, + }); + + await eventStore.appendToStream( + streamName, + [ + { + type: 'ProductItemAdded', + data: { productItem: productItem(expectedProductItemIds[0]) }, + }, + ], + { expectedStreamVersion: STREAM_DOES_NOT_EXIST }, + ); + await eventStore.appendToStream( + streamName, + [ + { + type: 'ProductItemAdded', + data: { productItem: productItem(expectedProductItemIds[1]) }, + }, + ], + { expectedStreamVersion: 1n }, + ); + await eventStore.appendToStream( + streamName, + [ + { + type: 'ProductItemAdded', + data: { productItem: productItem(expectedProductItemIds[2]) }, + }, + ], + { expectedStreamVersion: 2n }, + ); + + await consumer.start(); + + const stream = await collection.findOne( + { streamName }, + { useBigInt64: true }, + ); + + assertIsNotNull(stream); + assertEqual(3n, stream.metadata.streamPosition); + assertEqual(shoppingCartId, stream.metadata.streamId); + assertEqual(streamType, stream.metadata.streamType); + assertTrue(stream.metadata.createdAt instanceof Date); + assertTrue(stream.metadata.updatedAt instanceof Date); + }, + ); + + void it('should renew after the last event', withDeadline, async () => { + assertOk(processor); + + let stream = await collection.findOne( + { streamName }, + { useBigInt64: true }, + ); + assertIsNotNull(stream); + assertEqual(3n, stream.metadata.streamPosition); + + const position = await processor.start({ client }); + + assertOk(position); + assertNotEqual(typeof position, 'string'); + assertDefined(typeof position !== 'string'); + + // processor after restart is renewed after the 3rd position. + assertEqual( + 0, + compareTwoMongoDBCheckpoints(position.lastCheckpoint, lastResumeToken!), + ); + + const consumerPromise = consumer.start(); + + await new Promise((resolve) => setTimeout(resolve, 1000)); + + await eventStore.appendToStream( + streamName, + [ + { + type: 'ProductItemAdded', + data: { productItem: productItem(expectedProductItemIds[3]) }, + }, + ], + { expectedStreamVersion: 3n }, + ); + + await consumerPromise; + + stream = await collection.findOne({ streamName }, { useBigInt64: true }); + assertIsNotNull(stream); + assertEqual(4n, stream.metadata.streamPosition); + + // lastResumeToken has changed after the last message + assertEqual( + 1, + compareTwoMongoDBCheckpoints(lastResumeToken!, position.lastCheckpoint), + ); + + await consumer.stop(); + }); +}); diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventStoreConsumer.handling.int.spec.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventStoreConsumer.handling.int.spec.ts new file mode 100644 index 00000000..9737d161 --- /dev/null +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventStoreConsumer.handling.int.spec.ts @@ -0,0 +1,654 @@ +import { + assertThatArray, + delay, + inMemoryReactor, + type Closeable, + type Event, +} from '@event-driven-io/emmett'; +import { + MongoDBContainer, + StartedMongoDBContainer, +} from '@testcontainers/mongodb'; +import { after, before, describe, it } from 'node:test'; +import { v4 as uuid } from 'uuid'; +import { + getMongoDBEventStore, + type MongoDBEventStore, +} from '../mongoDBEventStore'; +import { mongoDBEventStoreConsumer } from './mongoDBEventStoreConsumer'; + +const withDeadline = { timeout: 1000000 }; + +void describe('EventStoreDB event store started consumer', () => { + let mongoDB: StartedMongoDBContainer; + let connectionString: string; + let eventStore: MongoDBEventStore & Closeable; + //const database = getInMemoryDatabase(); + + before(async () => { + mongoDB = await new MongoDBContainer('mongo:6.0.1').start(); + connectionString = mongoDB.getConnectionString(); + eventStore = getMongoDBEventStore({ + connectionString: mongoDB.getConnectionString(), + clientOptions: { directConnection: true }, + }); + }); + + after(async () => { + try { + await eventStore.close(); + await mongoDB.stop(); + } catch (error) { + console.log(error); + } + }); + + void describe('eachMessage', () => { + void it(`handles events SEQUENTIALLY`, { timeout: 15000 }, async () => { + // Given + const guestId = uuid(); + const otherGuestId = uuid(); + const streamName = `guestStay-${otherGuestId}`; + const otherStreamName = `guestStay-${guestId}`; + const events: NumberRecorded[] = [ + { type: 'NumberRecorded', data: { number: 1 } }, + { type: 'NumberRecorded', data: { number: 2 } }, + { type: 'NumberRecorded', data: { number: 3 } }, + { type: 'NumberRecorded', data: { number: 4 } }, + { type: 'NumberRecorded', data: { number: 5 } }, + ]; + const appendResult = await eventStore.appendToStream(streamName, events); + await eventStore.appendToStream(otherStreamName, events); + + const result: NumberRecorded[] = []; + + // When + const consumer = mongoDBEventStoreConsumer({ + connectionString, + processors: [ + inMemoryReactor({ + processorId: uuid(), + stopAfter: (event) => + event.metadata.streamName === streamName && + event.metadata.streamPosition === + appendResult.nextExpectedStreamVersion, + eachMessage: async (event) => { + await delay(Math.floor(Math.random() * 200)); + + result.push(event); + }, + }), + ], + clientOptions: { directConnection: true }, + }); + + try { + await consumer.start(); + + assertThatArray( + result.map((e) => e.data.number), + ).containsElementsMatching(events.map((e) => e.data.number)); + } finally { + await consumer.close(); + } + }); + + void it( + `stops processing on unhandled error in handler`, + { timeout: 1500000 }, + async () => { + // Given + const guestId = uuid(); + const otherGuestId = uuid(); + const streamName = `guestStay-${otherGuestId}`; + const otherStreamName = `guestStay-${guestId}`; + const events: NumberRecorded[] = [ + { type: 'NumberRecorded', data: { number: 1 } }, + { type: 'NumberRecorded', data: { number: 2 } }, + { type: 'NumberRecorded', data: { number: 3 } }, + { type: 'NumberRecorded', data: { number: 4 } }, + { type: 'NumberRecorded', data: { number: 5 } }, + { type: 'NumberRecorded', data: { number: 6 } }, + { type: 'NumberRecorded', data: { number: 7 } }, + { type: 'NumberRecorded', data: { number: 8 } }, + { type: 'NumberRecorded', data: { number: 9 } }, + { type: 'NumberRecorded', data: { number: 10 } }, + ]; + const appendResult = await eventStore.appendToStream( + streamName, + events, + ); + await eventStore.appendToStream(otherStreamName, events); + + const result: NumberRecorded[] = []; + + let shouldThrowRandomError = false; + + // When + const consumer = mongoDBEventStoreConsumer({ + connectionString, + processors: [ + inMemoryReactor({ + processorId: uuid(), + stopAfter: (event) => + event.metadata.streamName === streamName && + event.metadata.streamPosition === + appendResult.nextExpectedStreamVersion, + eachMessage: (event) => { + if (shouldThrowRandomError) { + return Promise.reject(new Error('Random error')); + } + + result.push(event); + + shouldThrowRandomError = !shouldThrowRandomError; + return Promise.resolve(); + }, + }), + ], + clientOptions: { directConnection: true }, + }); + + try { + await consumer.start(); + + assertThatArray(result.map((e) => e.data.number)).containsExactly(1); + } finally { + await consumer.close(); + } + }, + ); + + void it(`handles all events`, withDeadline, async () => { + // Given + const guestId = uuid(); + const otherGuestId = uuid(); + const streamName = `guestStay-${otherGuestId}`; + const otherStreamName = `guestStay-${guestId}`; + const events: GuestStayEvent[] = [ + { type: 'GuestCheckedIn', data: { guestId } }, + { type: 'GuestCheckedOut', data: { guestId } }, + ]; + await eventStore.appendToStream(streamName, events); + const appendResult = await eventStore.appendToStream( + otherStreamName, + events, + ); + + const result: GuestStayEvent[] = []; + + // When + const consumer = mongoDBEventStoreConsumer({ + connectionString, + processors: [ + inMemoryReactor({ + processorId: uuid(), + stopAfter: (event) => + event.metadata.streamName === otherStreamName && + event.metadata.streamPosition === + appendResult.nextExpectedStreamVersion, + eachMessage: (event) => { + if ( + event.metadata.streamName === streamName || + event.metadata.streamName === otherStreamName + ) + result.push(event); + }, + }), + ], + clientOptions: { directConnection: true }, + }); + + try { + await consumer.start(); + + assertThatArray(result).hasSize(events.length * 2); + + assertThatArray(result).containsElementsMatching([ + ...events, + ...events, + ]); + } finally { + await consumer.close(); + } + }); + + // void it( + // `handles ONLY events from stream AFTER provided global position`, + // withDeadline, + // async () => { + // // Given + // const guestId = uuid(); + // const otherGuestId = uuid(); + // const streamName = `guestStay-${guestId}`; + + // const initialEvents: GuestStayEvent[] = [ + // { type: 'GuestCheckedIn', data: { guestId } }, + // { type: 'GuestCheckedOut', data: { guestId } }, + // ]; + // const { nextExpectedStreamVersion: startPosition } = + // await eventStore.appendToStream(streamName, initialEvents); + + // const events: GuestStayEvent[] = [ + // { type: 'GuestCheckedIn', data: { guestId: otherGuestId } }, + // { type: 'GuestCheckedOut', data: { guestId: otherGuestId } }, + // ]; + + // const result: GuestStayEvent[] = []; + // let stopAfterPosition: bigint | undefined = undefined; + + // // When + // const consumer = mongoDBEventStoreConsumer({ + // connectionString, + // from: { stream: streamName }, + // }); + // consumer.reactor({ + // processorId: uuid(), + // startFrom: { lastCheckpoint: startPosition }, + // stopAfter: (event) => + // event.metadata.streamPosition === stopAfterPosition, + // eachMessage: (event) => { + // result.push(event); + // }, + // }); + + // try { + // const consumerPromise = consumer.start(); + + // const appendResult = await eventStore.appendToStream( + // streamName, + // events, + // ); + // stopAfterPosition = appendResult.nextExpectedStreamVersion; + + // await consumerPromise; + + // assertThatArray(result).containsOnlyElementsMatching(events); + // } finally { + // await consumer.close(); + // } + // }, + // ); + + // void it( + // `handles ONLY events from $all AFTER provided global position`, + // withDeadline, + // async () => { + // // Given + // const guestId = uuid(); + // const otherGuestId = uuid(); + // const streamName = `guestStay-${guestId}`; + + // const initialEvents: GuestStayEvent[] = [ + // { type: 'GuestCheckedIn', data: { guestId } }, + // { type: 'GuestCheckedOut', data: { guestId } }, + // ]; + // const { lastEventGlobalPosition: startPosition } = + // await eventStore.appendToStream(streamName, initialEvents); + + // const events: GuestStayEvent[] = [ + // { type: 'GuestCheckedIn', data: { guestId: otherGuestId } }, + // { type: 'GuestCheckedOut', data: { guestId: otherGuestId } }, + // ]; + + // const result: GuestStayEvent[] = []; + // let stopAfterPosition: bigint | undefined = undefined; + + // // When + // const consumer = mongoDBEventStoreConsumer({ + // connectionString, + // from: { stream: $all }, + // }); + // consumer.reactor({ + // processorId: uuid(), + // startFrom: { lastCheckpoint: startPosition }, + // stopAfter: (event) => + // event.metadata.globalPosition === stopAfterPosition, + // eachMessage: (event) => { + // result.push(event); + // }, + // }); + + // try { + // const consumerPromise = consumer.start(); + + // const appendResult = await eventStore.appendToStream( + // streamName, + // events, + // ); + // stopAfterPosition = appendResult.lastEventGlobalPosition; + + // await consumerPromise; + + // assertThatArray(result).containsOnlyElementsMatching(events); + // } finally { + // await consumer.close(); + // } + // }, + // ); + + // consumeFrom.forEach(([displayName, from]) => { + // void it( + // `handles all events from ${displayName} appended to event store BEFORE processor was started`, + // withDeadline, + // async () => { + // // Given + // const guestId = uuid(); + // const streamName = `guestStay-${guestId}`; + // const events: GuestStayEvent[] = [ + // { type: 'GuestCheckedIn', data: { guestId } }, + // { type: 'GuestCheckedOut', data: { guestId } }, + // ]; + // const appendResult = await eventStore.appendToStream( + // streamName, + // events, + // ); + + // const result: GuestStayEvent[] = []; + + // // When + // const consumer = mongoDBEventStoreConsumer({ + // connectionString, + // from: from(streamName), + // }); + // consumer.reactor({ + // processorId: uuid(), + // stopAfter: (event) => + // event.metadata.globalPosition === + // appendResult.lastEventGlobalPosition, + // eachMessage: (event) => { + // result.push(event); + // }, + // }); + + // try { + // await consumer.start(); + + // assertThatArray(result).containsElementsMatching(events); + // } finally { + // await consumer.close(); + // } + // }, + // ); + + // void it( + // `handles all events from ${displayName} appended to event store AFTER processor was started`, + // withDeadline, + // async () => { + // // Given + + // const result: GuestStayEvent[] = []; + // let stopAfterPosition: bigint | undefined = undefined; + + // const guestId = uuid(); + // const streamName = `guestStay-${guestId}`; + // const waitForStart = asyncAwaiter(); + + // // When + // const consumer = mongoDBEventStoreConsumer({ + // connectionString, + // from: from(streamName), + // }); + // consumer.reactor({ + // processorId: uuid(), + // stopAfter: (event) => + // event.metadata.globalPosition === stopAfterPosition, + // eachMessage: async (event) => { + // await waitForStart.wait; + // result.push(event); + // }, + // }); + + // const events: GuestStayEvent[] = [ + // { type: 'GuestCheckedIn', data: { guestId } }, + // { type: 'GuestCheckedOut', data: { guestId } }, + // ]; + + // try { + // const consumerPromise = consumer.start(); + + // const appendResult = await eventStore.appendToStream( + // streamName, + // events, + // ); + // stopAfterPosition = appendResult.lastEventGlobalPosition; + // waitForStart.resolve(); + + // await consumerPromise; + + // assertThatArray(result).containsElementsMatching(events); + // } finally { + // await consumer.close(); + // } + // }, + // ); + + // void it( + // `handles all events from ${displayName} when CURRENT position is NOT stored`, + // withDeadline, + // async () => { + // // Given + // const guestId = uuid(); + // const otherGuestId = uuid(); + // const streamName = `guestStay-${guestId}`; + + // const initialEvents: GuestStayEvent[] = [ + // { type: 'GuestCheckedIn', data: { guestId } }, + // { type: 'GuestCheckedOut', data: { guestId } }, + // ]; + + // await eventStore.appendToStream(streamName, initialEvents); + + // const events: GuestStayEvent[] = [ + // { type: 'GuestCheckedIn', data: { guestId: otherGuestId } }, + // { type: 'GuestCheckedOut', data: { guestId: otherGuestId } }, + // ]; + + // const result: GuestStayEvent[] = []; + // let stopAfterPosition: bigint | undefined = undefined; + // const waitForStart = asyncAwaiter(); + + // // When + // const consumer = mongoDBEventStoreConsumer({ + // connectionString, + // from: from(streamName), + // }); + // consumer.reactor({ + // processorId: uuid(), + // startFrom: 'CURRENT', + // stopAfter: (event) => + // event.metadata.globalPosition === stopAfterPosition, + // eachMessage: async (event) => { + // await waitForStart.wait; + // result.push(event); + // }, + // }); + + // try { + // const consumerPromise = consumer.start(); + + // const appendResult = await eventStore.appendToStream( + // streamName, + // events, + // ); + // stopAfterPosition = appendResult.lastEventGlobalPosition; + // waitForStart.resolve(); + + // await consumerPromise; + + // assertThatArray(result).containsElementsMatching([ + // ...initialEvents, + // ...events, + // ]); + // } finally { + // await consumer.close(); + // } + // }, + // ); + + // void it( + // `handles only new events when CURRENT position is stored for restarted consumer from ${displayName}`, + // withDeadline, + // async () => { + // // Given + // const guestId = uuid(); + // const otherGuestId = uuid(); + // const streamName = `guestStay-${guestId}`; + + // const initialEvents: GuestStayEvent[] = [ + // { type: 'GuestCheckedIn', data: { guestId } }, + // { type: 'GuestCheckedOut', data: { guestId } }, + // ]; + // const { lastEventGlobalPosition } = await eventStore.appendToStream( + // streamName, + // initialEvents, + // ); + + // const events: GuestStayEvent[] = [ + // { type: 'GuestCheckedIn', data: { guestId: otherGuestId } }, + // { type: 'GuestCheckedOut', data: { guestId: otherGuestId } }, + // ]; + + // let result: GuestStayEvent[] = []; + // let stopAfterPosition: bigint | undefined = lastEventGlobalPosition; + + // const waitForStart = asyncAwaiter(); + + // // When + // const consumer = mongoDBEventStoreConsumer({ + // connectionString, + // from: from(streamName), + // }); + // consumer.reactor({ + // processorId: uuid(), + // startFrom: 'CURRENT', + // stopAfter: (event) => + // event.metadata.globalPosition === stopAfterPosition, + // eachMessage: async (event) => { + // await waitForStart.wait; + // result.push(event); + // }, + // }); + + // let consumerPromise = consumer.start(); + // waitForStart.resolve(); + // await consumerPromise; + // await consumer.stop(); + + // waitForStart.reset(); + + // result = []; + + // stopAfterPosition = undefined; + + // try { + // consumerPromise = consumer.start(); + + // const appendResult = await eventStore.appendToStream( + // streamName, + // events, + // ); + // stopAfterPosition = appendResult.lastEventGlobalPosition; + // waitForStart.resolve(); + + // await consumerPromise; + + // assertThatArray(result).containsOnlyElementsMatching(events); + // } finally { + // await consumer.close(); + // } + // }, + // ); + + // void it( + // `handles only new events when CURRENT position is stored for a new consumer from ${displayName}`, + // withDeadline, + // async () => { + // // Given + // const guestId = uuid(); + // const otherGuestId = uuid(); + // const streamName = `guestStay-${guestId}`; + + // const initialEvents: GuestStayEvent[] = [ + // { type: 'GuestCheckedIn', data: { guestId } }, + // { type: 'GuestCheckedOut', data: { guestId } }, + // ]; + // const { lastEventGlobalPosition } = await eventStore.appendToStream( + // streamName, + // initialEvents, + // ); + + // const events: GuestStayEvent[] = [ + // { type: 'GuestCheckedIn', data: { guestId: otherGuestId } }, + // { type: 'GuestCheckedOut', data: { guestId: otherGuestId } }, + // ]; + + // let result: GuestStayEvent[] = []; + // let stopAfterPosition: bigint | undefined = lastEventGlobalPosition; + + // const waitForStart = asyncAwaiter(); + // const processorOptions: InMemoryReactorOptions = { + // processorId: uuid(), + // startFrom: 'CURRENT', + // stopAfter: (event) => + // event.metadata.globalPosition === stopAfterPosition, + // eachMessage: async (event) => { + // await waitForStart.wait; + // result.push(event); + // }, + // connectionOptions: { database }, + // }; + + // // When + // const consumer = mongoDBEventStoreConsumer({ + // connectionString, + // from: from(streamName), + // }); + // try { + // consumer.reactor(processorOptions); + + // waitForStart.resolve(); + // await consumer.start(); + // } finally { + // await consumer.close(); + // } + + // result = []; + + // waitForStart.reset(); + // stopAfterPosition = undefined; + + // const newConsumer = mongoDBEventStoreConsumer({ + // connectionString, + // from: from(streamName), + // }); + // newConsumer.reactor(processorOptions); + + // try { + // const consumerPromise = newConsumer.start(); + + // const appendResult = await eventStore.appendToStream( + // streamName, + // events, + // ); + // waitForStart.resolve(); + // stopAfterPosition = appendResult.lastEventGlobalPosition; + + // await consumerPromise; + + // assertThatArray(result).containsOnlyElementsMatching(events); + // } finally { + // await newConsumer.close(); + // } + // }, + // ); + // }); + }); +}); + +type GuestCheckedIn = Event<'GuestCheckedIn', { guestId: string }>; +type GuestCheckedOut = Event<'GuestCheckedOut', { guestId: string }>; + +type GuestStayEvent = GuestCheckedIn | GuestCheckedOut; + +type NumberRecorded = Event<'NumberRecorded', { number: number }>; diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventStoreConsumer.inMemory.projections.int.spec.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventStoreConsumer.inMemory.projections.int.spec.ts new file mode 100644 index 00000000..ac5050ca --- /dev/null +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventStoreConsumer.inMemory.projections.int.spec.ts @@ -0,0 +1,501 @@ +import { + assertMatches, + getInMemoryDatabase, + inMemoryProjector, + inMemorySingleStreamProjection, + type Closeable, + type InMemoryDocumentsCollection, + type ReadEvent, +} from '@event-driven-io/emmett'; +import { + MongoDBContainer, + type StartedMongoDBContainer, +} from '@testcontainers/mongodb'; +import { after, before, describe, it } from 'node:test'; +import { v4 as uuid } from 'uuid'; +import type { + ProductItemAdded, + ShoppingCartConfirmed, +} from '../../testing/shoppingCart.domain'; +import { + getMongoDBEventStore, + type MongoDBEventStore, +} from '../mongoDBEventStore'; +import { mongoDBEventStoreConsumer } from './mongoDBEventStoreConsumer'; + +const withDeadline = { timeout: 30000 }; + +void describe('mongoDB event store started consumer', () => { + let mongoDB: StartedMongoDBContainer; + let connectionString: string; + let eventStore: MongoDBEventStore & Closeable; + let summaries: InMemoryDocumentsCollection; + const productItem = { price: 10, productId: uuid(), quantity: 10 }; + const confirmedAt = new Date(); + const database = getInMemoryDatabase(); + + before(async () => { + mongoDB = await new MongoDBContainer('mongo:6.0.1').start(); + connectionString = mongoDB.getConnectionString(); + eventStore = getMongoDBEventStore({ + connectionString: mongoDB.getConnectionString(), + clientOptions: { directConnection: true }, + }); + summaries = database.collection(shoppingCartsSummaryCollectionName); + }); + + after(async () => { + try { + await eventStore.close(); + await mongoDB.stop(); + } catch (error) { + console.log(error); + } + }); + + void describe('eachMessage', () => { + void it( + 'handles all events appended to event store BEFORE projector was started', + withDeadline, + async () => { + // Given + const shoppingCartId = `shoppingCart:${uuid()}`; + const streamName = `shopping_cart-${shoppingCartId}`; + const events: ShoppingCartSummaryEvent[] = [ + { type: 'ProductItemAdded', data: { productItem } }, + { type: 'ShoppingCartConfirmed', data: { confirmedAt } }, + ]; + const appendResult = await eventStore.appendToStream( + streamName, + events, + ); + + const inMemoryProcessor = inMemoryProjector({ + processorId: uuid(), + projection: shoppingCartsSummaryProjection, + connectionOptions: { database }, + stopAfter: (event) => + event.metadata.streamName === streamName && + event.metadata.streamPosition === + appendResult.nextExpectedStreamVersion, + }); + + // When + const consumer = mongoDBEventStoreConsumer({ + connectionString, + clientOptions: { directConnection: true }, + processors: [inMemoryProcessor], + }); + + try { + await consumer.start(); + + const summary = await summaries.findOne((d) => d._id === streamName); + + assertMatches(summary, { + _id: streamName, + status: 'confirmed', + // TODO: ensure that _version and _id works like in Pongo + //_version: 2n, + productItemsCount: productItem.quantity, + }); + } finally { + await consumer.close(); + } + }, + ); + + void it( + 'handles all events appended to event store AFTER projector was started', + withDeadline, + async () => { + // Given + const shoppingCartId = `shoppingCart:${uuid()}`; + const streamName = `shopping_cart-${shoppingCartId}`; + let stopAfterPosition: bigint | undefined = undefined; + + const inMemoryProcessor = inMemoryProjector({ + processorId: uuid(), + projection: shoppingCartsSummaryProjection, + connectionOptions: { database }, + stopAfter: (event) => + event.metadata.streamName === streamName && + event.metadata.streamPosition === stopAfterPosition, + }); + const consumer = mongoDBEventStoreConsumer({ + connectionString, + clientOptions: { directConnection: true }, + processors: [inMemoryProcessor], + }); + + // When + const events: ShoppingCartSummaryEvent[] = [ + { + type: 'ProductItemAdded', + data: { + productItem, + }, + }, + { + type: 'ShoppingCartConfirmed', + data: { confirmedAt }, + }, + ]; + + try { + const consumerPromise = consumer.start(); + + const appendResult = await eventStore.appendToStream( + streamName, + events, + ); + stopAfterPosition = appendResult.nextExpectedStreamVersion; + + await consumerPromise; + + const summary = await summaries.findOne((d) => d._id === streamName); + + assertMatches(summary, { + _id: streamName, + status: 'confirmed', + //_version: 2n, + productItemsCount: productItem.quantity, + }); + } finally { + await consumer.close(); + } + }, + ); + + // void it( + // 'handles ONLY events AFTER provided global position', + // withDeadline, + // async () => { + // // Given + // const shoppingCartId = `shoppingCart:${uuid()}`; + // const streamName = `shopping_cart-${shoppingCartId}`; + + // const initialEvents: ShoppingCartSummaryEvent[] = [ + // { type: 'ProductItemAdded', data: { productItem } }, + // { type: 'ProductItemAdded', data: { productItem } }, + // ]; + // const { lastEventGlobalPosition: startPosition } = + // await eventStore.appendToStream(streamName, initialEvents); + + // const events: ShoppingCartSummaryEvent[] = [ + // { type: 'ProductItemAdded', data: { productItem } }, + // { + // type: 'ShoppingCartConfirmed', + // data: { confirmedAt }, + // }, + // ]; + + // let stopAfterPosition: bigint | undefined = undefined; + + // const inMemoryProcessor = inMemoryProjector({ + // processorId: uuid(), + // projection: shoppingCartsSummaryProjection, + // connectionOptions: { database }, + // startFrom: { lastCheckpoint: startPosition }, + // stopAfter: (event) => + // event.metadata.globalPosition === stopAfterPosition, + // }); + + // const consumer = mongoDBEventStoreConsumer({ + // connectionString, + // processors: [inMemoryProcessor], + // }); + + // // When + // try { + // const consumerPromise = consumer.start(); + + // const appendResult = await eventStore.appendToStream( + // streamName, + // events, + // ); + // stopAfterPosition = appendResult.lastEventGlobalPosition; + + // await consumerPromise; + + // const summary = await summaries.findOne((d) => d._id === streamName); + + // assertMatches(summary, { + // _id: streamName, + // status: 'confirmed', + // _version: 2n, + // productItemsCount: productItem.quantity, + // }); + // } finally { + // await consumer.close(); + // } + // }, + // ); + + void it( + 'handles all events when CURRENT position is NOT stored', + withDeadline, + async () => { + // Given + const shoppingCartId = `shoppingCart:${uuid()}`; + const streamName = `shopping_cart-${shoppingCartId}`; + + const initialEvents: ShoppingCartSummaryEvent[] = [ + { type: 'ProductItemAdded', data: { productItem } }, + { type: 'ProductItemAdded', data: { productItem } }, + ]; + + await eventStore.appendToStream(streamName, initialEvents); + + const events: ShoppingCartSummaryEvent[] = [ + { type: 'ProductItemAdded', data: { productItem } }, + { + type: 'ShoppingCartConfirmed', + data: { confirmedAt }, + }, + ]; + + let stopAfterPosition: bigint | undefined = undefined; + + const inMemoryProcessor = inMemoryProjector({ + processorId: uuid(), + projection: shoppingCartsSummaryProjection, + connectionOptions: { database }, + startFrom: 'CURRENT', + stopAfter: (event) => + event.metadata.streamName === streamName && + event.metadata.streamPosition === stopAfterPosition, + }); + + const consumer = mongoDBEventStoreConsumer({ + connectionString, + clientOptions: { directConnection: true }, + processors: [inMemoryProcessor], + }); + + // When + + try { + const consumerPromise = consumer.start(); + + const appendResult = await eventStore.appendToStream( + streamName, + events, + ); + stopAfterPosition = appendResult.nextExpectedStreamVersion; + + await consumerPromise; + + const summary = await summaries.findOne((d) => d._id === streamName); + + assertMatches(summary, { + _id: streamName, + status: 'confirmed', + // _version: 4n, + productItemsCount: productItem.quantity * 3, + }); + } finally { + await consumer.close(); + } + }, + ); + + // void it( + // 'handles only new events when CURRENT position is stored for restarted consumer', + // withDeadline, + // async () => { + // // Given + // const shoppingCartId = `shoppingCart:${uuid()}`; + // const streamName = `shopping_cart-${shoppingCartId}`; + + // const initialEvents: ShoppingCartSummaryEvent[] = [ + // { type: 'ProductItemAdded', data: { productItem } }, + // { type: 'ProductItemAdded', data: { productItem } }, + // ]; + // const { nextExpectedStreamVersion } = await eventStore.appendToStream( + // streamName, + // initialEvents, + // ); + + // const events: ShoppingCartSummaryEvent[] = [ + // { type: 'ProductItemAdded', data: { productItem } }, + // { + // type: 'ShoppingCartConfirmed', + // data: { confirmedAt }, + // }, + // ]; + + // let stopAfterPosition: bigint | undefined = nextExpectedStreamVersion; + + // const inMemoryProcessor = inMemoryProjector({ + // processorId: uuid(), + // projection: shoppingCartsSummaryProjection, + // connectionOptions: { database }, + // startFrom: 'CURRENT', + // stopAfter: (event) => + // event.metadata.streamName === streamName && + // event.metadata.streamPosition === stopAfterPosition, + // }); + + // const consumer = mongoDBEventStoreConsumer({ + // connectionString, + // clientOptions: { directConnection: true }, + // processors: [inMemoryProcessor], + // }); + + // // When + // await consumer.start(); + // await consumer.stop(); + + // stopAfterPosition = undefined; + + // try { + // const consumerPromise = consumer.start(); + + // const appendResult = await eventStore.appendToStream( + // streamName, + // events, + // ); + // stopAfterPosition = appendResult.nextExpectedStreamVersion; + + // await consumerPromise; + + // const summary = await summaries.findOne((d) => d._id === streamName); + + // assertMatches(summary, { + // _id: streamName, + // status: 'confirmed', + // //_version: 4n, + // productItemsCount: productItem.quantity * 3, + // }); + // } finally { + // await consumer.close(); + // } + // }, + // ); + + // void it( + // 'handles only new events when CURRENT position is stored for a new consumer', + // withDeadline, + // async () => { + // // Given + // const shoppingCartId = `shoppingCart:${uuid()}`; + // const streamName = `shopping_cart-${shoppingCartId}`; + + // const initialEvents: ShoppingCartSummaryEvent[] = [ + // { type: 'ProductItemAdded', data: { productItem } }, + // { type: 'ProductItemAdded', data: { productItem } }, + // ]; + // const { nextExpectedStreamVersion } = await eventStore.appendToStream( + // streamName, + // initialEvents, + // ); + + // const events: ShoppingCartSummaryEvent[] = [ + // { type: 'ProductItemAdded', data: { productItem } }, + // { + // type: 'ShoppingCartConfirmed', + // data: { confirmedAt }, + // }, + // ]; + + // let stopAfterPosition: bigint | undefined = nextExpectedStreamVersion; + + // const inMemoryProcessor = inMemoryProjector({ + // processorId: uuid(), + // projection: shoppingCartsSummaryProjection, + // connectionOptions: { database }, + // startFrom: 'CURRENT', + // stopAfter: (event) => + // event.metadata.streamName === streamName && + // event.metadata.streamPosition === stopAfterPosition, + // }); + + // const consumer = mongoDBEventStoreConsumer({ + // connectionString, + // clientOptions: { directConnection: true }, + // processors: [inMemoryProcessor], + // }); + + // // When + // try { + // await consumer.start(); + // } finally { + // await consumer.close(); + // } + + // stopAfterPosition = undefined; + + // const newConsumer = mongoDBEventStoreConsumer({ + // connectionString, + // clientOptions: { directConnection: true }, + // processors: [inMemoryProcessor], + // }); + + // try { + // const consumerPromise = newConsumer.start(); + + // const appendResult = await eventStore.appendToStream( + // streamName, + // events, + // ); + // stopAfterPosition = appendResult.nextExpectedStreamVersion; + + // await consumerPromise; + + // const summary = await summaries.findOne((d) => d._id === streamName); + + // assertMatches(summary, { + // _id: streamName, + // status: 'confirmed', + // //_version: 4n, + // productItemsCount: productItem.quantity * 3, + // }); + // } finally { + // await newConsumer.close(); + // } + // }, + // ); + }); +}); + +type ShoppingCartSummary = { + _id?: string; + productItemsCount: number; + status: string; +}; + +const shoppingCartsSummaryCollectionName = 'shoppingCartsSummary'; + +export type ShoppingCartSummaryEvent = ProductItemAdded | ShoppingCartConfirmed; + +const evolve = ( + document: ShoppingCartSummary, + { type, data }: ReadEvent, +): ShoppingCartSummary => { + switch (type) { + case 'ProductItemAdded': + return { + ...document, + productItemsCount: + document.productItemsCount + data.productItem.quantity, + }; + case 'ShoppingCartConfirmed': + return { + ...document, + status: 'confirmed', + }; + default: + return document; + } +}; + +const shoppingCartsSummaryProjection = inMemorySingleStreamProjection({ + collectionName: shoppingCartsSummaryCollectionName, + evolve, + canHandle: ['ProductItemAdded', 'ShoppingCartConfirmed'], + initialState: () => ({ + status: 'pending', + productItemsCount: 0, + }), +}); diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventStoreConsumer.int.spec.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventStoreConsumer.int.spec.ts new file mode 100644 index 00000000..8de5598c --- /dev/null +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventStoreConsumer.int.spec.ts @@ -0,0 +1,175 @@ +import { + assertFails, + assertFalse, + assertThrowsAsync, + assertTrue, + EmmettError, + type MessageProcessor, +} from '@event-driven-io/emmett'; +import { + MongoDBContainer, + type StartedMongoDBContainer, +} from '@testcontainers/mongodb'; +import { after, afterEach, before, beforeEach, describe, it } from 'node:test'; +import { v4 as uuid } from 'uuid'; +import { + mongoDBEventStoreConsumer, + type MongoDBEventStoreConsumer, +} from './mongoDBEventStoreConsumer'; +import { isDatabaseUnavailableError } from './subscriptions'; + +const withDeadline = { timeout: 30000 }; + +void describe('mongoDB event store consumer', () => { + let mongoDB: StartedMongoDBContainer; + let connectionString: string; + const dummyProcessor: MessageProcessor = { + type: 'reactor', + id: uuid(), + start: () => Promise.resolve('BEGINNING'), + close: () => Promise.resolve(), + handle: () => Promise.resolve(), + isActive: false, + }; + + before(async () => { + mongoDB = await new MongoDBContainer('mongo:6.0.1').start(); + connectionString = mongoDB.getConnectionString(); + }); + + after(async () => { + try { + await mongoDB.stop(); + } catch (error) { + console.log(error); + } + }); + + void it( + 'creates not-started consumer for the specified connection string', + withDeadline, + () => { + const consumer = mongoDBEventStoreConsumer({ + connectionString, + clientOptions: { directConnection: true }, + processors: [dummyProcessor], + }); + + assertFalse(consumer.isRunning); + }, + ); + + void it( + 'creates not-started consumer if connection string targets not existing mongoDB database', + withDeadline, + () => { + const connectionStringToNotExistingDB = 'mongodb://not-existing:32792'; + const consumer = mongoDBEventStoreConsumer({ + connectionString: connectionStringToNotExistingDB, + clientOptions: { directConnection: true }, + processors: [dummyProcessor], + }); + + assertFalse(consumer.isRunning); + }, + ); + + void describe('created consumer', () => { + let consumer: MongoDBEventStoreConsumer; + + beforeEach(() => { + consumer = mongoDBEventStoreConsumer({ + connectionString, + clientOptions: { directConnection: true }, + processors: [dummyProcessor], + }); + }); + afterEach(() => { + return consumer.close(); + }); + + void it('subscribes to existing event store', withDeadline, () => { + consumer.start().catch(() => assertFails()); + + assertTrue(consumer.isRunning); + }); + + void it( + 'fails to start if connection string targets not existing mongoDB database', + { timeout: 60000 }, + async () => { + const connectionStringToNotExistingDB = 'mongodb://not-existing:2113'; + const consumerToNotExistingServer = mongoDBEventStoreConsumer({ + connectionString: connectionStringToNotExistingDB, + clientOptions: { directConnection: true }, + processors: [dummyProcessor], + }); + await assertThrowsAsync( + () => consumerToNotExistingServer.start(), + isDatabaseUnavailableError, + ); + }, + ); + + void it( + 'fails to start if there are no processors', + withDeadline, + async () => { + const consumerToNotExistingServer = mongoDBEventStoreConsumer({ + connectionString, + clientOptions: { directConnection: true }, + processors: [], + }); + await assertThrowsAsync( + () => consumerToNotExistingServer.start(), + (error) => { + return ( + error.message === + 'Cannot start consumer without at least a single processor' + ); + }, + ); + }, + ); + + void it( + `stopping not started consumer doesn't fail`, + withDeadline, + async () => { + await consumer.stop(); + + assertFalse(consumer.isRunning); + }, + ); + + void it( + `stopping not started consumer is idempotent`, + withDeadline, + async () => { + await consumer.stop(); + await consumer.stop(); + + assertFalse(consumer.isRunning); + }, + ); + }); + + void describe('started consumer', withDeadline, () => { + let consumer: MongoDBEventStoreConsumer; + + beforeEach(() => { + consumer = mongoDBEventStoreConsumer({ + connectionString, + clientOptions: { directConnection: true }, + processors: [dummyProcessor], + }); + }); + afterEach(() => consumer.close()); + + void it('stops started consumer', withDeadline, async () => { + await consumer.stop(); + + assertFalse(consumer.isRunning); + }); + }); +}); diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventStoreConsumer.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventStoreConsumer.ts new file mode 100644 index 00000000..c92dd0dd --- /dev/null +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventStoreConsumer.ts @@ -0,0 +1,233 @@ +import { + EmmettError, + MessageProcessor, + type AnyEvent, + type AnyMessage, + type AnyRecordedMessageMetadata, + type AsyncRetryOptions, + type BatchRecordedMessageHandlerWithoutContext, + type DefaultRecord, + type Message, + type MessageConsumer, + type MessageConsumerOptions, + type RecordedMessageMetadataWithGlobalPosition, +} from '@event-driven-io/emmett'; +import { MongoClient, type MongoClientOptions } from 'mongodb'; +import { v4 as uuid } from 'uuid'; +import { CancellationPromise } from './CancellablePromise'; +import { + changeStreamReactor, + mongoDBProjector, + type MongoDBProcessor, + type MongoDBProcessorOptions, + type MongoDBProjectorOptions, +} from './mongoDBProcessor'; +import { + mongoDBSubscription, + zipMongoDBMessageBatchPullerStartFrom, + type MongoDBSubscription, +} from './subscriptions'; +import type { MongoDBCheckpoint } from './subscriptions/mongoDBCheckpoint'; + +export type MongoDBChangeStreamMessageMetadata = + RecordedMessageMetadataWithGlobalPosition; + +export type MongoDBEventStoreConsumerConfig< + // eslint-disable-next-line @typescript-eslint/no-explicit-any + ConsumerMessageType extends Message = any, +> = MessageConsumerOptions & { + resilience?: { + resubscribeOptions?: AsyncRetryOptions; + }; +}; + +export type MongoDBConsumerOptions< + ConsumerMessageType extends Message = Message, +> = MongoDBEventStoreConsumerConfig & + ( + | { + connectionString: string; + clientOptions?: MongoClientOptions; + client?: never; + } + | { + client: MongoClient; + connectionString?: never; + clientOptions?: never; + } + ); + +export type MongoDBEventStoreConsumer< + // eslint-disable-next-line @typescript-eslint/no-explicit-any + ConsumerMessageType extends AnyMessage = any, +> = MessageConsumer & + Readonly<{ + reactor: ( + options: MongoDBProcessorOptions, + ) => MongoDBProcessor; + }> & + (AnyEvent extends ConsumerMessageType + ? Readonly<{ + projector: < + EventType extends AnyEvent = ConsumerMessageType & AnyEvent, + >( + options: MongoDBProjectorOptions, + ) => MongoDBProcessor; + }> + : object); + +export type MongoDBConsumerHandlerContext = { + client?: MongoClient; +}; + +/** + * Creates a MongoDB event store consumer that processes messages from a MongoDB change stream. + * + * This consumer implementation requires change streams to be enabled on the MongoDB collection + * and cannot be used in single-instance environments. It allows for the registration of message + * processors and projectors to handle incoming messages. + * + * @template ConsumerMessageType - The type of messages consumed. + * @template MessageMetadataType - The type of metadata associated with the messages. + * @template HandlerContext - The context type for the message handlers. + * @template CheckpointType - The type used for resuming from checkpoints. + * + * @param options - The options for configuring the MongoDB consumer. + * @returns A MongoDBEventStoreConsumer instance that can start and stop processing messages. + */ +export const mongoDBEventStoreConsumer = < + ConsumerMessageType extends Message = AnyMessage, +>( + options: MongoDBConsumerOptions, +): MongoDBEventStoreConsumer => { + let start: Promise; + let stream: MongoDBSubscription | undefined; + let isRunning = false; + let runningPromise = new CancellationPromise(); + const client = + 'client' in options && options.client + ? options.client + : new MongoClient(options.connectionString, options.clientOptions); + const processors = options.processors ?? []; + + const eachBatch: BatchRecordedMessageHandlerWithoutContext< + ConsumerMessageType, + MongoDBChangeStreamMessageMetadata + > = async (messagesBatch) => { + const activeProcessors = processors.filter((s) => s.isActive); + + if (activeProcessors.length === 0) + return { + type: 'STOP', + reason: 'No active processors', + }; + + const result = await Promise.allSettled( + activeProcessors.map(async (s) => { + // TODO: Add here filtering to only pass messages that can be handled by + return await s.handle(messagesBatch, { client }); + }), + ); + + const error = result.find((r) => r.status === 'rejected')?.reason as + | Error + | undefined; + + return result.some( + (r) => r.status === 'fulfilled' && r.value?.type !== 'STOP', + ) + ? undefined + : { + type: 'STOP', + error: error ? EmmettError.mapFrom(error) : undefined, + }; + }; + + const stop = async () => { + if (stream?.isRunning !== true) return; + await stream.stop(); + isRunning = false; + runningPromise.resolve(null); + }; + + return { + consumerId: options.consumerId ?? uuid(), + get isRunning() { + return isRunning; + }, + processors, + reactor: ( + options: MongoDBProcessorOptions, + ): MongoDBProcessor => { + const processor = changeStreamReactor(options); + + processors.push( + // TODO: change that + processor as unknown as MessageProcessor< + ConsumerMessageType, + AnyRecordedMessageMetadata, + DefaultRecord + >, + ); + + return processor; + }, + projector: ( + options: MongoDBProjectorOptions, + ): MongoDBProcessor => { + const processor = mongoDBProjector(options); + + processors.push( + // TODO: change that + processor as unknown as MessageProcessor< + ConsumerMessageType, + AnyRecordedMessageMetadata, + DefaultRecord + >, + ); + + return processor; + }, + start: () => { + if (isRunning) return start; + + start = (async () => { + if (processors.length === 0) + return Promise.reject( + new EmmettError( + 'Cannot start consumer without at least a single processor', + ), + ); + + isRunning = true; + + runningPromise = new CancellationPromise(); + + const positions = await Promise.all( + processors.map((o) => o.start({ client })), + ); + const startFrom = zipMongoDBMessageBatchPullerStartFrom(positions); + + stream = mongoDBSubscription({ + client, + from: startFrom, + eachBatch, + }); + + await stream.start({ + startFrom, + }); + })(); + + return start; + }, + stop, + close: async () => { + try { + await stop(); + } finally { + if (!options.client) await client.close(); + } + }, + }; +}; diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBProcessor.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBProcessor.ts new file mode 100644 index 00000000..87fc0a4c --- /dev/null +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBProcessor.ts @@ -0,0 +1,151 @@ +import { + type AnyEvent, + type AnyMessage, + type Checkpointer, + type Event, + type Message, + type MessageHandlerResult, + type MessageProcessingScope, + MessageProcessor, + type ProjectorOptions, + type ReactorOptions, + projector, + reactor, +} from '@event-driven-io/emmett'; +import { MongoClient } from 'mongodb'; +import type { MongoDBEventStoreConnectionOptions } from '../mongoDBEventStore'; +import { mongoDBCheckpointer } from './mongoDBCheckpointer'; +import type { MongoDBChangeStreamMessageMetadata } from './mongoDBEventStoreConsumer'; + +type MongoDBConnectionOptions = { + connectionOptions: MongoDBEventStoreConnectionOptions; +}; + +export type MongoDBProcessorHandlerContext = { + client: MongoClient; +}; + +export type MongoDBProcessor = + MessageProcessor< + MessageType, + MongoDBChangeStreamMessageMetadata, + MongoDBProcessorHandlerContext + >; + +export type MongoDBProcessorOptions = + ReactorOptions< + MessageType, + MongoDBChangeStreamMessageMetadata, + MongoDBProcessorHandlerContext + > & { connectionOptions: MongoDBEventStoreConnectionOptions }; + +export type MongoDBCheckpointer = + Checkpointer< + MessageType, + MongoDBChangeStreamMessageMetadata, + MongoDBProcessorHandlerContext + >; + +export type MongoDBProjectorOptions = + ProjectorOptions< + EventType, + MongoDBChangeStreamMessageMetadata, + MongoDBProcessorHandlerContext + > & + MongoDBConnectionOptions; + +const mongoDBProcessingScope = (options: { + client: MongoClient; + processorId: string; +}): MessageProcessingScope => { + const processingScope: MessageProcessingScope< + MongoDBProcessorHandlerContext + > = async ( + handler: ( + context: MongoDBProcessorHandlerContext, + ) => Result | Promise, + partialContext: Partial, + ) => { + return handler({ + client: options.client, + ...partialContext, + }); + }; + + return processingScope; +}; + +export const mongoDBProjector = ( + options: MongoDBProjectorOptions, +): MongoDBProcessor => { + const { connectionOptions } = options; + const hooks = { + onStart: options.hooks?.onStart, + onClose: options.hooks?.onClose + ? async () => { + if (options.hooks?.onClose) await options.hooks?.onClose(); + } + : undefined, + }; + // TODO: This should be eventually moved to the mongoDBProcessingScope + // In the similar way as it's made in the postgresql processor + // So creating client only if it's needed and different than consumer is passing + // through handler context + const client = + 'client' in connectionOptions && connectionOptions.client + ? connectionOptions.client + : new MongoClient( + connectionOptions.connectionString, + connectionOptions.clientOptions, + ); + + return projector< + EventType, + MongoDBChangeStreamMessageMetadata, + MongoDBProcessorHandlerContext + >({ + ...options, + hooks, + processingScope: mongoDBProcessingScope({ + client, + processorId: + options.processorId ?? `projection:${options.projection.name}`, + }), + + checkpoints: mongoDBCheckpointer(), + }); +}; + +export const changeStreamReactor = < + MessageType extends AnyMessage = AnyMessage, +>( + options: MongoDBProcessorOptions, +): MongoDBProcessor => { + const connectionOptions = options.connectionOptions || {}; + const client = + 'client' in connectionOptions && connectionOptions.client + ? connectionOptions.client + : new MongoClient( + connectionOptions.connectionString, + connectionOptions.clientOptions, + ); + + const hooks = { + onStart: options.hooks?.onStart, + onClose: options.hooks?.onClose + ? async () => { + if (options.hooks?.onClose) await options.hooks?.onClose(); + } + : undefined, + }; + + return reactor({ + ...options, + hooks, + processingScope: mongoDBProcessingScope({ + client, + processorId: options.processorId, + }), + checkpoints: mongoDBCheckpointer(), + }); +}; diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDbCheckpointer.int.spec.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDbCheckpointer.int.spec.ts new file mode 100644 index 00000000..947ce97e --- /dev/null +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/mongoDbCheckpointer.int.spec.ts @@ -0,0 +1,144 @@ +import { assertDeepEqual } from '@event-driven-io/emmett'; +import { + MongoDBContainer, + type StartedMongoDBContainer, +} from '@testcontainers/mongodb'; +import { MongoClient } from 'mongodb'; +import { after, before, describe, it } from 'node:test'; +import { + readProcessorCheckpoint, + storeProcessorCheckpoint, +} from './mongoDBCheckpointer'; +import { + toMongoDBCheckpoint, + type MongoDBCheckpoint, +} from './subscriptions/mongoDBCheckpoint'; + +void describe('storeProcessorCheckpoint and readProcessorCheckpoint tests', () => { + let mongodb: StartedMongoDBContainer; + let client: MongoClient; + + const processorId = 'processorId-1'; + const resumeToken1: MongoDBCheckpoint = toMongoDBCheckpoint( + { + _data: + '82687E948D000000032B042C0100296E5A100461BBC0449CFA4531AE298EB6083F923A463C6F7065726174696F6E54797065003C696E736572740046646F63756D656E744B65790046645F69640064687E948DC5FE3CA1AF560962000004', + }, + undefined, + ); + const resumeToken2: MongoDBCheckpoint = toMongoDBCheckpoint( + { + _data: + '82687E949E000000012B042C0100296E5A100461BBC0449CFA4531AE298EB6083F923A463C6F7065726174696F6E54797065003C7570646174650046646F63756D656E744B65790046645F69640064687E948DC5FE3CA1AF560962000004', + }, + 1, + ); + const resumeToken3: MongoDBCheckpoint = toMongoDBCheckpoint( + { + _data: + '82687E94D4000000012B042C0100296E5A100461BBC0449CFA4531AE298EB6083F923A463C6F7065726174696F6E54797065003C7570646174650046646F63756D656E744B65790046645F69640064687E948DC5FE3CA1AF560962000004', + }, + 2, + ); + before(async () => { + mongodb = await new MongoDBContainer('mongo:6.0.1').start(); + client = new MongoClient(mongodb.getConnectionString(), { + directConnection: true, + }); + + await client.connect(); + }); + + after(async () => { + await client.close(); + await mongodb.stop(); + }); + + void it('should store successfully last proceeded MongoDB resume token for the first time', async () => { + const result = await storeProcessorCheckpoint(client, { + processorId, + lastStoredCheckpoint: null, + newCheckpoint: resumeToken1, + version: 1, + }); + + assertDeepEqual(result, { + success: true, + newCheckpoint: resumeToken1, + }); + }); + + void it('should store successfully a new checkpoint expecting the previous token', async () => { + const result = await storeProcessorCheckpoint(client, { + processorId, + lastStoredCheckpoint: resumeToken1, + newCheckpoint: resumeToken2, + version: 2, + }); + + assertDeepEqual(result, { + success: true, + newCheckpoint: resumeToken2, + }); + }); + + void it('it returns IGNORED when the newCheckpoint is the same or earlier than the lastProcessedPosition', async () => { + const result = await storeProcessorCheckpoint(client, { + processorId, + lastStoredCheckpoint: resumeToken2, + newCheckpoint: resumeToken1, + version: 3, + }); + + assertDeepEqual(result, { + success: false, + reason: 'IGNORED', + }); + }); + + void it('it returns MISMATCH when the lastProcessedPosition is not the one that is currently stored', async () => { + const result = await storeProcessorCheckpoint(client, { + processorId, + lastStoredCheckpoint: resumeToken1, + newCheckpoint: resumeToken3, + version: 3, + }); + + assertDeepEqual(result, { + success: false, + reason: 'MISMATCH', + }); + }); + + void it('it can save a checkpoint with a specific partition', async () => { + const result = await storeProcessorCheckpoint(client, { + processorId, + lastStoredCheckpoint: null, + newCheckpoint: resumeToken1, + partition: 'partition-2', + version: 1, + }); + + assertDeepEqual(result, { + success: true, + newCheckpoint: resumeToken1, + }); + }); + + void it('it can read a position of a processor with the default partition', async () => { + const result = await readProcessorCheckpoint(client, { + processorId, + }); + + assertDeepEqual(result, { lastCheckpoint: resumeToken2 }); + }); + + void it('it can read a position of a processor with a defined partition', async () => { + const result = await readProcessorCheckpoint(client, { + processorId, + partition: 'partition-2', + }); + + assertDeepEqual(result, { lastCheckpoint: resumeToken1 }); + }); +}); diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/index.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/index.ts new file mode 100644 index 00000000..2908fe1f --- /dev/null +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/index.ts @@ -0,0 +1,508 @@ +import { + asyncRetry, + EmmettError, + JSONParser, + type AnyMessage, + type AsyncRetryOptions, + type BatchRecordedMessageHandlerWithoutContext, + type CurrentMessageProcessorPosition, + type Event, + type Message, + type MessageHandlerResult, + type ReadEvent, + type RecordedMessage, + type RecordedMessageMetadata, +} from '@event-driven-io/emmett'; +import { + ChangeStream, + Timestamp, + type ChangeStreamInsertDocument, + type ChangeStreamReplaceDocument, + type ChangeStreamUpdateDocument, + type Db, + type Document, + type MongoClient, +} from 'mongodb'; +import { pipeline, Transform, Writable, type WritableOptions } from 'stream'; +import type { + EventStream, + MongoDBReadEventMetadata, +} from '../../mongoDBEventStore'; +import type { MongoDBChangeStreamMessageMetadata } from '../mongoDBEventStoreConsumer'; +import { + isMongoDBCheckpoint, + toMongoDBCheckpoint, + toMongoDBResumeToken, + type MongoDBCheckpoint, + type MongoDBResumeToken, +} from './mongoDBCheckpoint'; + +export type MongoDBSubscriptionOptions = + { + from?: CurrentMessageProcessorPosition; + client: MongoClient; + // batchSize: number; + eachBatch: BatchRecordedMessageHandlerWithoutContext< + MessageType, + MongoDBChangeStreamMessageMetadata + >; + resilience?: { + resubscribeOptions?: AsyncRetryOptions; + }; + }; +export type ChangeStreamFullDocumentValuePolicy = () => + | 'whenAvailable' + | 'updateLookup'; +export type MongoDBSubscriptionDocument = + | ChangeStreamInsertDocument + | ChangeStreamUpdateDocument + | ChangeStreamReplaceDocument; +// https://www.mongodb.com/docs/manual/reference/command/buildInfo/ +export type BuildInfo = { + version: string; + gitVersion: string; + sysInfo: string; + loaderFlags: string; + compilerFlags: string; + allocator: string; + versionArray: number[]; + openssl: Document; + javascriptEngine: string; + bits: number; + debug: boolean; + maxBsonObjectSize: number; + storageEngines: string[]; + ok: number; +}; +export type MongoDBSubscriptionStartFrom = + CurrentMessageProcessorPosition; + +export type MongoDBSubscriptionStartOptions = { + startFrom: MongoDBSubscriptionStartFrom; + dbName?: string; +}; + +export type MongoDBSubscription = { + isRunning: boolean; + start(options: MongoDBSubscriptionStartOptions): Promise; + stop(): Promise; +}; + +export type StreamSubscription< + EventType extends Message = AnyMessage, + MessageMetadataType extends + MongoDBChangeStreamMessageMetadata = MongoDBChangeStreamMessageMetadata, +> = ChangeStream< + EventStream, MessageMetadataType>, + MongoDBSubscriptionDocument< + EventStream, RecordedMessageMetadata> + > +>; +export type MessageArrayElement = `messages.${string}`; +export type UpdateDescription = { + _id: MongoDBResumeToken; + operationType: 'update'; + updateDescription: { + updatedFields: Record & { + 'metadata.streamPosition': number; + 'metadata.updatedAt': Date; + }; + }; +}; +export type FullDocument< + EventType extends Event = Event, + EventMetaDataType extends MongoDBReadEventMetadata = MongoDBReadEventMetadata, + T extends EventStream = EventStream, +> = { + _id: MongoDBResumeToken; + operationType: 'insert'; + fullDocument: T; +}; +export type OplogChange< + EventType extends Message = AnyMessage, + EventMetaDataType extends MongoDBReadEventMetadata = MongoDBReadEventMetadata, + T extends EventStream = EventStream< + Extract, + EventMetaDataType + >, +> = + | FullDocument, EventMetaDataType, T> + | UpdateDescription< + ReadEvent, EventMetaDataType> + >; + +type SubscriptionSequentialHandlerOptions< + MessageType extends AnyMessage = AnyMessage, +> = MongoDBSubscriptionOptions & WritableOptions; + +class SubscriptionSequentialHandler< + MessageType extends Message = AnyMessage, +> extends Transform { + private options: SubscriptionSequentialHandlerOptions; + public isRunning: boolean; + + constructor(options: SubscriptionSequentialHandlerOptions) { + super({ objectMode: true, ...options }); + this.options = options; + // this.from = options.from; + this.isRunning = true; + } + + async _transform( + change: OplogChange, + _encoding: BufferEncoding, + callback: (error?: Error | null) => void, + ): Promise { + try { + if (!this.isRunning || !change) { + callback(); + return; + } + + const changeStreamCheckpoint = change._id; + const streamChange = + change.operationType === 'insert' + ? change.fullDocument + : change.operationType === 'update' + ? { + messages: Object.entries(change.updateDescription.updatedFields) + .filter(([key]) => key.startsWith('messages.')) + .map(([, value]) => value as ReadEvent), + } + : void 0; + + if (!streamChange) { + return; + } + + let lastCheckpoint: MongoDBCheckpoint | undefined = undefined; + const messages = streamChange.messages.map((message, index) => { + lastCheckpoint = toMongoDBCheckpoint(changeStreamCheckpoint, index); + return { + kind: message.kind, + type: message.type, + data: message.data, + metadata: { + ...message.metadata, + checkpoint: lastCheckpoint, + globalPosition: lastCheckpoint, + }, + } as unknown as RecordedMessage< + MessageType, + MongoDBChangeStreamMessageMetadata + >; + }); + + const result = await this.options.eachBatch(messages); + + if (result && result.type === 'STOP') { + this.isRunning = false; + if (!result.error) this.push(lastCheckpoint); + this.push(result); + this.push(null); + callback(); + return; + } + + this.push(lastCheckpoint); + callback(); + } catch (error) { + callback(error as Error); + } + } +} + +const REGEXP = + /^(0|[1-9]\d*)\.(0|[1-9]\d*)\.(0|[1-9]\d*)(?:-((?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*)(?:\.(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*))*))?(?:\+([0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?$/; + +const databaseUnavailableErrorMessages = [ + 'getaddrinfo ENOTFOUND not-existing', + 'getaddrinfo EAI_AGAIN not-existing', + 'Topology is closed', +]; + +export const isDatabaseUnavailableError = (error: unknown) => { + return ( + error instanceof Error && + databaseUnavailableErrorMessages.indexOf(error.message) !== -1 + ); +}; + +export const MongoDBResubscribeDefaultOptions: AsyncRetryOptions = { + forever: true, + minTimeout: 100, + factor: 1.5, + shouldRetryError: (error) => !isDatabaseUnavailableError(error), +}; + +export const parseSemVer = (value: string = '') => { + const versions = REGEXP.exec(value); + + return { + major: Number(versions?.[1]) || void 0, + minor: Number(versions?.[2]) || void 0, + bugfix: Number(versions?.[3]) || void 0, + rc: versions?.[4] || void 0, + }; +}; + +export const getDatabaseVersionPolicies = async (db: Db) => { + const buildInfo = (await db.admin().buildInfo()) as BuildInfo; + const semver = parseSemVer(buildInfo.version); + const major = semver.major || 0; + const throwNotSupportedError = (): never => { + throw new EmmettError( + `Not supported MongoDB version: ${buildInfo.version}.`, + ); + }; + + const supportedVersionCheckPolicy = () => { + if (major < 5) { + throwNotSupportedError(); + } + }; + const changeStreamFullDocumentValuePolicy: ChangeStreamFullDocumentValuePolicy = + () => { + if (major >= 6) { + return 'whenAvailable'; + } else if (major === 5) { + return 'updateLookup'; + } else { + return throwNotSupportedError(); + } + }; + + return { + supportedVersionCheckPolicy, + changeStreamFullDocumentValuePolicy, + }; +}; + +// const DEFAULT_PARTITION_KEY_NAME = 'default'; +const createChangeStream = ( + getFullDocumentValue: ChangeStreamFullDocumentValuePolicy, + db: Db, + resumeToken?: CurrentMessageProcessorPosition, + // partitionKey: string = DEFAULT_PARTITION_KEY_NAME, +) => { + const $match = { + 'ns.coll': { $regex: /^emt:/, $ne: 'emt:processors' }, + $or: [ + { operationType: 'insert' }, + { + operationType: 'update', + }, + ], + // 'fullDocument.partitionKey': partitionKey, + }; + const pipeline = [ + { + $match, + }, + ]; + + return db.watch< + EventStream>, + MongoDBSubscriptionDocument< + EventStream> + > + >(pipeline, { + useBigInt64: true, + fullDocument: getFullDocumentValue(), + ...(resumeToken === undefined || resumeToken === 'BEGINNING' + ? { + /* + The MongoDB's API is designed around starting from now or resuming from a known position + (resumeAfter, startAfter, or startAtOperationTime). + By passing a date set a long time ago (year 2000), we force MongoDB to start + from the earliest possible position in the oplog. + If the retention is 48 hours, then it will be 24 hours back. + */ + startAtOperationTime: new Timestamp({ + t: 946684800, + i: 0, + }), + } + : resumeToken === 'END' + ? void 0 + : toMongoDBResumeToken(resumeToken.lastCheckpoint)), + }); +}; + +const subscribe = + (getFullDocumentValue: ChangeStreamFullDocumentValuePolicy, db: Db) => + ( + resumeToken?: MongoDBSubscriptionStartFrom, + ) => + createChangeStream(getFullDocumentValue, db, resumeToken); + +export const mongoDBSubscription = ({ + client, + from, + // batchSize, + eachBatch, + resilience, +}: MongoDBSubscriptionOptions): MongoDBSubscription => { + let isRunning = false; + + let start: Promise; + let processor: SubscriptionSequentialHandler; + + let subscription: StreamSubscription | undefined; + + const resubscribeOptions: AsyncRetryOptions = + resilience?.resubscribeOptions ?? { + ...MongoDBResubscribeDefaultOptions, + shouldRetryResult: () => isRunning, + shouldRetryError: (error) => + isRunning && MongoDBResubscribeDefaultOptions.shouldRetryError!(error), + }; + + const stopSubscription = async (callback?: () => void): Promise => { + isRunning = false; + if (processor) processor.isRunning = false; + + if (!subscription) return Promise.resolve(); + + if (subscription.closed) { + return new Promise((resolve, reject) => { + try { + callback?.(); + resolve(); + } catch (error) { + reject( + error instanceof Error + ? error + : typeof error === 'string' + ? new Error(error) + : new Error('Unknown error'), + ); + } + }); + } else { + try { + await subscription.close(); + } finally { + callback?.(); + } + } + }; + + const pipeMessages = (options: MongoDBSubscriptionStartOptions) => { + let retry = 0; + + return asyncRetry(async () => { + const db = client.db(options.dbName); + + const versionPolicies = await getDatabaseVersionPolicies(db); + const policy = versionPolicies.changeStreamFullDocumentValuePolicy; + + return new Promise((resolve, reject) => { + if (!isRunning) { + resolve(); + return; + } + + console.info( + `Starting subscription. ${retry++} retries. From: ${JSONParser.stringify(from ?? '$all')}, Start from: ${JSONParser.stringify( + options.startFrom, + )}`, + ); + + subscription = subscribe( + policy, + client.db(options.dbName), + )(options.startFrom); + + processor = new SubscriptionSequentialHandler({ + client, + from, + // batchSize, + eachBatch, + resilience, + }); + + const handler = new (class extends Writable { + async _write( + result: MongoDBCheckpoint | MessageHandlerResult, + _encoding: string, + done: () => void, + ) { + if (!isRunning) return; + + if (isMongoDBCheckpoint(result)) { + options.startFrom = { + lastCheckpoint: result, + }; + done(); + return; + } + + if (result && result.type === 'STOP' && result.error) { + console.error( + `Subscription stopped with error code: ${result.error.errorCode}, message: ${ + result.error.message + }.`, + ); + } + + await stopSubscription(); + done(); + } + })({ objectMode: true }); + + pipeline( + subscription, + processor, + handler, + async (error: Error | null) => { + console.info(`Stopping subscription.`); + await stopSubscription(() => { + if (!error) { + console.info('Subscription ended successfully.'); + resolve(); + return; + } + + if ( + error.message === 'ChangeStream is closed' && + error.name === 'MongoAPIError' + ) { + console.info('Subscription ended successfully.'); + resolve(); + return; + } + + console.error(`Received error: ${JSONParser.stringify(error)}.`); + reject(error); + }); + }, + ); + }); + }, resubscribeOptions); + }; + + return { + get isRunning() { + return isRunning; + }, + start: (options) => { + if (isRunning) return start; + + start = (async () => { + isRunning = true; + return pipeMessages(options); + })(); + + return start; + }, + stop: async () => { + if (!isRunning) return start ? await start : Promise.resolve(); + await stopSubscription(); + await start; + }, + }; +}; + +export * from './mongoDBCheckpoint'; +export { subscribe }; diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/mongoDBCheckpoint.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/mongoDBCheckpoint.ts new file mode 100644 index 00000000..1f737f74 --- /dev/null +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/mongoDBCheckpoint.ts @@ -0,0 +1,151 @@ +import { + IllegalStateError, + type CurrentMessageProcessorPosition, +} from '@event-driven-io/emmett'; + +export type MongoDBResumeToken = Readonly<{ _data: string }>; + +export type MongoDBCheckpoint = + `emt:chkpt:mongodb:${MongoDBResumeToken['_data']}:${bigint}`; + +export const isMongoDBCheckpoint = ( + value: unknown, +): value is MongoDBCheckpoint => + typeof value === 'string' && value.startsWith('emt:chkpt:mongodb:'); + +export const toMongoDBCheckpoint = ( + resumeToken: MongoDBResumeToken, + position: bigint | number | undefined, +): MongoDBCheckpoint => { + return `emt:chkpt:mongodb:${resumeToken._data}:${position ?? 0}` as MongoDBCheckpoint; +}; + +export const toMongoDBCheckpointValues = ( + checkpoint: MongoDBCheckpoint, +): { resumeToken: MongoDBResumeToken['_data']; position: bigint } => { + const parts = checkpoint.split(':'); + if ( + parts.length !== 5 || + parts[0] !== 'emt' || + parts[1] !== 'chkpt' || + parts[2] !== 'mongodb' + ) { + throw new IllegalStateError( + `Invalid MongoDB checkpoint format: ${checkpoint}`, + ); + } + + return { resumeToken: parts[3]!, position: BigInt(parts[4]!) }; +}; + +export const toMongoDBResumeToken = ( + checkpoint: MongoDBCheckpoint, +): MongoDBResumeToken => { + const { resumeToken } = toMongoDBCheckpointValues(checkpoint); + return { _data: resumeToken }; +}; + +export const isMongoDBResumeToken = ( + value: unknown, +): value is MongoDBResumeToken => { + return !!( + typeof value === 'object' && + value && + '_data' in value && + typeof value._data === 'string' + ); +}; + +/** + * Compares two MongoDB Resume Tokens. + * @param token1 Token 1. + * @param token2 Token 2. + * @returns 0 - if the tokens are the same, 1 - if the token1 is later, -1 - is the token1 is earlier. + */ +export const compareTwoMongoDBTokens = ( + token1: MongoDBResumeToken, + token2: MongoDBResumeToken, +) => compareTwoHexBuffers(token1._data, token2._data); + +/** + * Compares two MongoDB Resume Tokens. + * @param token1 Token 1. + * @param token2 Token 2. + * @returns 0 - if the tokens are the same, 1 - if the token1 is later, -1 - is the token1 is earlier. + */ +export const compareTwoHexBuffers = ( + token1: MongoDBResumeToken['_data'], + token2: MongoDBResumeToken['_data'], +) => { + const bufA = Buffer.from(token1, 'hex'); + const bufB = Buffer.from(token2, 'hex'); + + return Buffer.compare(bufA, bufB); +}; + +export const compareTwoMongoDBCheckpoints = ( + checkpoint1: MongoDBCheckpoint, + checkpoint2: MongoDBCheckpoint, +) => { + const { resumeToken: rt1, position: pos1 } = + toMongoDBCheckpointValues(checkpoint1); + const { resumeToken: rt2, position: pos2 } = + toMongoDBCheckpointValues(checkpoint2); + const tokenComparison = compareTwoHexBuffers(rt1, rt2); + + if (tokenComparison !== 0) { + return tokenComparison; + } + + return pos1 < pos2 ? -1 : pos1 > pos2 ? 1 : 0; +}; + +export const compareTwoTokens = (token1: unknown, token2: unknown) => { + if (token1 === null && token2) { + return -1; + } + + if (token1 && token2 === null) { + return 1; + } + + if (token1 === null && token2 === null) { + return 0; + } + + if (isMongoDBCheckpoint(token1) && isMongoDBCheckpoint(token2)) { + return compareTwoMongoDBCheckpoints(token1, token2); + } + + if (typeof token1 === 'string' && typeof token2 === 'string') { + return compareTwoHexBuffers(token1, token2); + } + + throw new IllegalStateError(`Type of tokens is not comparable`); +}; + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export const zipMongoDBMessageBatchPullerStartFrom = ( + options: (CurrentMessageProcessorPosition | undefined)[], +): CurrentMessageProcessorPosition => { + if ( + options.length === 0 || + options.some((o) => o === undefined || o === 'BEGINNING') + ) { + return 'BEGINNING'; + } + + if (options.every((o) => o === 'END')) { + return 'END'; + } + + const positionTokens = options.filter( + (o) => o !== undefined && o !== 'BEGINNING' && o !== 'END', + ); + + const sorted = positionTokens.sort((a, b) => { + return compareTwoTokens(a.lastCheckpoint, b.lastCheckpoint); + }); + + return sorted[0]!; +}; diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/zipMongoDBMessageBatchPullerStartFrom.spec.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/zipMongoDBMessageBatchPullerStartFrom.spec.ts new file mode 100644 index 00000000..3ff1bc35 --- /dev/null +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/zipMongoDBMessageBatchPullerStartFrom.spec.ts @@ -0,0 +1,32 @@ +import { assertEqual, assertNotEqual } from '@event-driven-io/emmett'; +import assert from 'assert'; +import { describe, it } from 'node:test'; +import { zipMongoDBMessageBatchPullerStartFrom } from './mongoDBCheckpoint'; + +void describe('zipMongoDBMessageBatchPullerStartFrom', () => { + void it('it can get the earliest MongoDB oplog token', () => { + // tokens are sorted in descending order, so the earliest message is at the end + const input = [ + { + lastCheckpoint: { + _data: `82687E94D4000000012B042C0100296E5A100461BBC0449CFA4531AE298EB6083F923A463C6F7065726174696F6E54797065003C7570646174650046646F63756D656E744B65790046645F69640064687E948DC5FE3CA1AF560962000004`, + }, + }, + { + lastCheckpoint: { + _data: `82687E949E000000012B042C0100296E5A100461BBC0449CFA4531AE298EB6083F923A463C6F7065726174696F6E54797065003C7570646174650046646F63756D656E744B65790046645F69640064687E948DC5FE3CA1AF560962000004`, + }, + }, + { + lastCheckpoint: { + _data: `82687E948D000000032B042C0100296E5A100461BBC0449CFA4531AE298EB6083F923A463C6F7065726174696F6E54797065003C696E736572740046646F63756D656E744B65790046645F69640064687E948DC5FE3CA1AF560962000004`, + }, + }, + ]; + const result = zipMongoDBMessageBatchPullerStartFrom(input); + + assertNotEqual('string', typeof result); + assert(typeof result !== 'string'); + assertEqual(input[2]?.lastCheckpoint._data, result.lastCheckpoint._data); + }); +}); diff --git a/src/packages/emmett-mongodb/src/eventStore/consumers/types.ts b/src/packages/emmett-mongodb/src/eventStore/consumers/types.ts new file mode 100644 index 00000000..2f7ce844 --- /dev/null +++ b/src/packages/emmett-mongodb/src/eventStore/consumers/types.ts @@ -0,0 +1,3 @@ +export const defaultTag = 'emt:default'; + +export const DefaultProcessotCheckpointCollectionName = 'emt:processors'; diff --git a/src/packages/emmett-mongodb/src/eventStore/example.ts b/src/packages/emmett-mongodb/src/eventStore/example.ts new file mode 100644 index 00000000..49f809ea --- /dev/null +++ b/src/packages/emmett-mongodb/src/eventStore/example.ts @@ -0,0 +1,48 @@ +import { type Event } from '@event-driven-io/emmett'; +import { MongoClient } from 'mongodb'; +import { getMongoDBEventStore } from '../eventStore'; + +export type PricedProductItem = { + productId: string; + quantity: number; + price: number; +}; +export type ProductItemAdded = Event< + 'ProductItemAdded', + { productItem: PricedProductItem } +>; +export type DiscountApplied = Event< + 'DiscountApplied', + { percent: number; couponId: string } +>; + +export type ShoppingCartEvent = ProductItemAdded | DiscountApplied; + +const connectionString = `mongodb://localhost:30003,localhost:30004/ylah-access?replicaSet=rsmongo&retryWrites=true&w=majority`; + +const main = async () => { + const mongo = new MongoClient(connectionString); + await mongo.connect(); + const es = getMongoDBEventStore({ + client: mongo, + }); + await es.appendToStream('test', [ + { + type: 'ProductItemAdded', + data: { + productItem: { + price: 100, + productId: '111-000', + quantity: 1, + }, + }, + }, + ]); + process.on('SIGTERM', async () => { + console.info(`Closing...`); + await mongo.close(); + }); +}; + +// eslint-disable-next-line @typescript-eslint/no-floating-promises +main(); diff --git a/src/packages/emmett-mongodb/src/testing/shoppingCart.domain.ts b/src/packages/emmett-mongodb/src/testing/shoppingCart.domain.ts index c7b7ec78..f1d9fd7a 100644 --- a/src/packages/emmett-mongodb/src/testing/shoppingCart.domain.ts +++ b/src/packages/emmett-mongodb/src/testing/shoppingCart.domain.ts @@ -31,7 +31,10 @@ export type DiscountApplied = Event< 'DiscountApplied', { percent: number; couponId: string } >; - +export type ShoppingCartConfirmed = Event< + 'ShoppingCartConfirmed', + { confirmedAt: Date } +>; export type DeletedShoppingCart = Event< 'DeletedShoppingCart', { deletedAt: Date; reason: string } @@ -40,6 +43,7 @@ export type DeletedShoppingCart = Event< export type ShoppingCartEvent = | ProductItemAdded | DiscountApplied + | ShoppingCartConfirmed | DeletedShoppingCart; export const evolve = ( @@ -60,6 +64,8 @@ export const evolve = ( ...state, totalAmount: state.totalAmount * (1 - data.percent / 100), }; + case 'ShoppingCartConfirmed': + return state; case 'DeletedShoppingCart': return null; } @@ -83,6 +89,8 @@ export const evolveWithMetadata = ( ...state, totalAmount: state.totalAmount * (1 - data.percent / 100), }; + case 'ShoppingCartConfirmed': + return state; case 'DeletedShoppingCart': return null; } diff --git a/src/packages/emmett-postgresql/src/streaming/subscriptions/caughtUpTransformStream.ts b/src/packages/emmett-postgresql/src/streaming/subscriptions/caughtUpTransformStream.ts index d2751472..1db7df9a 100644 --- a/src/packages/emmett-postgresql/src/streaming/subscriptions/caughtUpTransformStream.ts +++ b/src/packages/emmett-postgresql/src/streaming/subscriptions/caughtUpTransformStream.ts @@ -1,12 +1,12 @@ -import { - globalStreamCaughtUp, - type GlobalSubscriptionEvent, -} from '@event-driven-io/emmett'; import type { Event, ReadEvent, ReadEventMetadataWithGlobalPosition, -} from '@event-driven-io/emmett/src/typing'; +} from '@event-driven-io/emmett'; +import { + globalStreamCaughtUp, + type GlobalSubscriptionEvent, +} from '@event-driven-io/emmett'; import { TransformStream } from 'node:stream/web'; export const streamTrackingGlobalPosition = ( diff --git a/src/packages/emmett/src/consumers/consumers.ts b/src/packages/emmett/src/consumers/consumers.ts index a2db98be..e80e3e25 100644 --- a/src/packages/emmett/src/consumers/consumers.ts +++ b/src/packages/emmett/src/consumers/consumers.ts @@ -7,7 +7,7 @@ export type MessageConsumerOptions< > = { consumerId?: string; // eslint-disable-next-line @typescript-eslint/no-explicit-any - processors?: Array>; + processors?: Array>; }; export type MessageConsumer< diff --git a/src/packages/emmett/src/processors/processors.ts b/src/packages/emmett/src/processors/processors.ts index 59ec33fb..47e8d0b6 100644 --- a/src/packages/emmett/src/processors/processors.ts +++ b/src/packages/emmett/src/processors/processors.ts @@ -40,9 +40,7 @@ export const getCheckpoint = < message: RecordedMessage, ): CheckpointType | null => { // eslint-disable-next-line @typescript-eslint/no-unsafe-return - return 'checkpoint' in message.metadata && - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access - isBigint(message.metadata.checkpoint) + return 'checkpoint' in message.metadata ? // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access message.metadata.checkpoint : 'globalPosition' in message.metadata && diff --git a/src/packages/emmett/src/testing/assertions.ts b/src/packages/emmett/src/testing/assertions.ts index f697bd09..72419cf4 100644 --- a/src/packages/emmett/src/testing/assertions.ts +++ b/src/packages/emmett/src/testing/assertions.ts @@ -159,6 +159,13 @@ export const assertThat = (item: T) => { }; }; +export const assertDefined = ( + value: unknown, + message?: string | Error, +): asserts value => { + assertOk(value, message instanceof Error ? message.message : message); +}; + export function assertFalse( condition: boolean, message?: string, @@ -175,6 +182,7 @@ export function assertTrue( throw new AssertionError(message ?? `Condition is false`); } +// TODO: replace with assertDefined export function assertOk( obj: T | null | undefined, message?: string,