Skip to content

Commit 53e7b17

Browse files
committed
Made MongoDB checkpointers to be aligned with others
Passed explicit checkpoint and used string instead of the nested structure
1 parent 278a7bf commit 53e7b17

File tree

14 files changed

+169
-230
lines changed

14 files changed

+169
-230
lines changed

src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,10 @@ import {
99
type Message,
1010
type MessageConsumer,
1111
type RecordedMessage,
12+
type RecordedMessageMetadataWithGlobalPosition,
1213
} from '@event-driven-io/emmett';
1314
import { MongoClient, type MongoClientOptions } from 'mongodb';
1415
import { v4 as uuid } from 'uuid';
15-
import type { MongoDBRecordedMessageMetadata } from '../event';
16-
import type { MongoDBReadEventMetadata } from '../mongoDBEventStore';
1716
import { CancellationPromise } from './CancellablePromise';
1817
import {
1918
changeStreamReactor,
@@ -27,12 +26,15 @@ import {
2726
zipMongoDBMessageBatchPullerStartFrom,
2827
type MongoDBSubscription,
2928
} from './subscriptions';
30-
import type { MongoDBResumeToken } from './subscriptions/types';
29+
import type { MongoDBResumeToken } from './subscriptions/mongoDbResumeToken';
30+
31+
export type MongoDBChangeStreamMessageMetadata =
32+
RecordedMessageMetadataWithGlobalPosition<MongoDBResumeToken['_data']>;
3133

3234
export type MessageConsumerOptions<
3335
MessageType extends Message = AnyMessage,
3436
MessageMetadataType extends
35-
MongoDBReadEventMetadata = MongoDBRecordedMessageMetadata,
37+
MongoDBChangeStreamMessageMetadata = MongoDBChangeStreamMessageMetadata,
3638
HandlerContext extends DefaultRecord | undefined = undefined,
3739
CheckpointType = GlobalPositionTypeOfRecordedMessageMetadata<MessageMetadataType>,
3840
> = {
@@ -50,7 +52,7 @@ export type MongoDBEventStoreConsumerConfig<
5052
// eslint-disable-next-line @typescript-eslint/no-explicit-any
5153
ConsumerMessageType extends Message = any,
5254
MessageMetadataType extends
53-
MongoDBReadEventMetadata = MongoDBRecordedMessageMetadata,
55+
MongoDBChangeStreamMessageMetadata = MongoDBChangeStreamMessageMetadata,
5456
HandlerContext extends DefaultRecord | undefined = undefined,
5557
CheckpointType = GlobalPositionTypeOfRecordedMessageMetadata<MessageMetadataType>,
5658
> = MessageConsumerOptions<
@@ -67,7 +69,7 @@ export type MongoDBEventStoreConsumerConfig<
6769
export type MongoDBConsumerOptions<
6870
ConsumerEventType extends Message = Message,
6971
MessageMetadataType extends
70-
MongoDBReadEventMetadata = MongoDBRecordedMessageMetadata,
72+
MongoDBChangeStreamMessageMetadata = MongoDBChangeStreamMessageMetadata,
7173
HandlerContext extends DefaultRecord | undefined = undefined,
7274
CheckpointType = GlobalPositionTypeOfRecordedMessageMetadata<MessageMetadataType>,
7375
> = MongoDBEventStoreConsumerConfig<
@@ -130,7 +132,7 @@ export type MongoDBConsumerHandlerContext = {
130132
export const mongoDBEventStoreConsumer = <
131133
ConsumerMessageType extends Message = AnyMessage,
132134
MessageMetadataType extends
133-
MongoDBReadEventMetadata = MongoDBRecordedMessageMetadata,
135+
MongoDBChangeStreamMessageMetadata = MongoDBChangeStreamMessageMetadata,
134136
HandlerContext extends
135137
MongoDBConsumerHandlerContext = MongoDBConsumerHandlerContext,
136138
CheckpointType = MongoDBResumeToken,

src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBProcessor.ts

Lines changed: 29 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,15 @@ import {
99
MessageProcessor,
1010
type ProjectorOptions,
1111
type ReactorOptions,
12-
type RecordedMessage,
12+
getCheckpoint,
1313
projector,
1414
reactor,
1515
} from '@event-driven-io/emmett';
1616
import { MongoClient } from 'mongodb';
17-
import type { ReadEventMetadataWithGlobalPosition } from '../event';
1817
import type { MongoDBEventStoreConnectionOptions } from '../mongoDBEventStore';
18+
import type { MongoDBChangeStreamMessageMetadata } from './mongoDBEventsConsumer';
1919
import { readProcessorCheckpoint } from './readProcessorCheckpoint';
2020
import { storeProcessorCheckpoint } from './storeProcessorCheckpoint';
21-
import type { MongoDBResumeToken } from './subscriptions/types';
2221

2322
type MongoDBConnectionOptions = {
2423
connectionOptions: MongoDBEventStoreConnectionOptions;
@@ -31,98 +30,51 @@ export type MongoDBProcessorHandlerContext = {
3130
export type MongoDBProcessor<MessageType extends Message = AnyMessage> =
3231
MessageProcessor<
3332
MessageType,
34-
ReadEventMetadataWithGlobalPosition,
33+
MongoDBChangeStreamMessageMetadata,
3534
MongoDBProcessorHandlerContext
3635
>;
3736

3837
export type MongoDBProcessorOptions<MessageType extends Message = Message> =
3938
ReactorOptions<
4039
MessageType,
41-
ReadEventMetadataWithGlobalPosition,
40+
MongoDBChangeStreamMessageMetadata,
4241
MongoDBProcessorHandlerContext
4342
> & { connectionOptions: MongoDBEventStoreConnectionOptions };
4443

45-
export type MongoDBCheckpointer<
46-
MessageType extends AnyMessage = AnyMessage,
47-
// eslint-disable-next-line @typescript-eslint/no-explicit-any
48-
CheckpointType = any,
49-
> = Checkpointer<
50-
MessageType,
51-
ReadEventMetadataWithGlobalPosition<CheckpointType>,
52-
MongoDBProcessorHandlerContext,
53-
CheckpointType
54-
>;
44+
export type MongoDBCheckpointer<MessageType extends AnyMessage = AnyMessage> =
45+
Checkpointer<
46+
MessageType,
47+
MongoDBChangeStreamMessageMetadata,
48+
MongoDBProcessorHandlerContext
49+
>;
5550

5651
export type MongoDBProjectorOptions<EventType extends AnyEvent = AnyEvent> =
5752
ProjectorOptions<
5853
EventType,
59-
ReadEventMetadataWithGlobalPosition,
54+
MongoDBChangeStreamMessageMetadata,
6055
MongoDBProcessorHandlerContext
6156
> &
6257
MongoDBConnectionOptions;
6358

64-
// eslint-disable-next-line @typescript-eslint/no-explicit-any
65-
const isResumeToken = (value: any): value is MongoDBResumeToken =>
66-
typeof value === 'object' &&
67-
'_data' in value &&
68-
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
69-
typeof value._data === 'string';
70-
71-
export const getCheckpoint = <
72-
MessageType extends AnyMessage = AnyMessage,
73-
CheckpointType = MongoDBCheckpointer,
74-
MessageMetadataType extends
75-
ReadEventMetadataWithGlobalPosition<CheckpointType> = ReadEventMetadataWithGlobalPosition<CheckpointType>,
76-
>(
77-
message: RecordedMessage<MessageType, MessageMetadataType>,
78-
): CheckpointType | null => {
79-
// eslint-disable-next-line @typescript-eslint/no-unsafe-return
80-
return 'checkpoint' in message.metadata &&
81-
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
82-
isResumeToken(message.metadata.checkpoint)
83-
? // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
84-
message.metadata.checkpoint
85-
: 'globalPosition' in message.metadata &&
86-
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
87-
isResumeToken(message.metadata.globalPosition)
88-
? // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
89-
message.metadata.globalPosition
90-
: 'streamPosition' in message.metadata &&
91-
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
92-
isResumeToken(message.metadata.streamPosition)
93-
? // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
94-
message.metadata.streamPosition
95-
: null;
96-
};
97-
9859
export const mongoDBCheckpointer = <
9960
MessageType extends Message = Message,
100-
// eslint-disable-next-line @typescript-eslint/no-explicit-any
101-
CheckpointType = any,
102-
>(): MongoDBCheckpointer<MessageType, CheckpointType> => ({
61+
>(): MongoDBCheckpointer<MessageType> => ({
10362
read: async (options, context) => {
104-
const result = await readProcessorCheckpoint<CheckpointType>(
105-
context.client,
106-
options,
107-
);
63+
const result = await readProcessorCheckpoint(context.client, options);
10864

109-
return { lastCheckpoint: result?.lastProcessedPosition };
65+
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
66+
return { lastCheckpoint: result?.lastCheckpoint };
11067
},
11168
store: async (options, context) => {
112-
const newPosition = getCheckpoint<MessageType, CheckpointType>(
113-
options.message,
114-
);
115-
116-
const result = await storeProcessorCheckpoint<CheckpointType>(
117-
context.client,
118-
{
119-
lastProcessedPosition: options.lastCheckpoint,
120-
newPosition,
121-
processorId: options.processorId,
122-
partition: options.partition,
123-
version: options.version || 0,
124-
},
125-
);
69+
const newPosition = getCheckpoint(options.message);
70+
71+
const result = await storeProcessorCheckpoint(context.client, {
72+
lastProcessedPosition: options.lastCheckpoint,
73+
newPosition,
74+
processorId: options.processorId,
75+
partition: options.partition,
76+
version: options.version || 0,
77+
});
12678

12779
return result.success
12880
? { success: true, newCheckpoint: result.newPosition }
@@ -134,8 +86,6 @@ const mongoDBProcessingScope = (options: {
13486
client: MongoClient;
13587
processorId: string;
13688
}): MessageProcessingScope<MongoDBProcessorHandlerContext> => {
137-
// const processorConnectionString = options.connectionString;
138-
13989
const processingScope: MessageProcessingScope<
14090
MongoDBProcessorHandlerContext
14191
> = async <Result = MessageHandlerResult>(
@@ -144,15 +94,6 @@ const mongoDBProcessingScope = (options: {
14494
) => Result | Promise<Result>,
14595
partialContext: Partial<MongoDBProcessorHandlerContext>,
14696
) => {
147-
// const connection = partialContext?.connection;
148-
// const connectionString =
149-
// processorConnectionString ?? connection?.connectionString;
150-
151-
// if (!connectionString)
152-
// throw new EmmettError(
153-
// `MongoDB processor '${options.processorId}' is missing connection string. Ensure that you passed it through options`,
154-
// );
155-
15697
return handler({
15798
client: options.client,
15899
...partialContext,
@@ -174,6 +115,10 @@ export const mongoDBProjector = <EventType extends Event = Event>(
174115
}
175116
: undefined,
176117
};
118+
// TODO: This should be eventually moved to the mongoDBProcessingScope
119+
// In the similar way as it's made in the postgresql processor
120+
// So creating client only if it's needed and different than consumer is passing
121+
// through handler context
177122
const client =
178123
'client' in connectionOptions && connectionOptions.client
179124
? connectionOptions.client
@@ -184,7 +129,7 @@ export const mongoDBProjector = <EventType extends Event = Event>(
184129

185130
return projector<
186131
EventType,
187-
ReadEventMetadataWithGlobalPosition,
132+
MongoDBChangeStreamMessageMetadata,
188133
MongoDBProcessorHandlerContext
189134
>({
190135
...options,

src/packages/emmett-mongodb/src/eventStore/consumers/processorCheckpoint.e2e.spec.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import { MongoClient } from 'mongodb';
77
import { after, before, describe, it } from 'node:test';
88
import { readProcessorCheckpoint } from './readProcessorCheckpoint';
99
import { storeProcessorCheckpoint } from './storeProcessorCheckpoint';
10-
import type { MongoDBResumeToken } from './subscriptions/types';
10+
import type { MongoDBResumeToken } from './subscriptions/mongoDbResumeToken';
1111

1212
void describe('storeProcessorCheckpoint and readProcessorCheckpoint tests', () => {
1313
let mongodb: StartedMongoDBContainer;
@@ -117,7 +117,7 @@ void describe('storeProcessorCheckpoint and readProcessorCheckpoint tests', () =
117117
processorId,
118118
});
119119

120-
assertDeepEqual(result, { lastProcessedPosition: resumeToken2 });
120+
assertDeepEqual(result, { lastCheckpoint: resumeToken2 });
121121
});
122122

123123
void it('it can read a position of a processor with a defined partition', async () => {
@@ -126,6 +126,6 @@ void describe('storeProcessorCheckpoint and readProcessorCheckpoint tests', () =
126126
partition: 'partition-2',
127127
});
128128

129-
assertDeepEqual(result, { lastProcessedPosition: resumeToken1 });
129+
assertDeepEqual(result, { lastCheckpoint: resumeToken1 });
130130
});
131131
});
Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,6 @@
1+
import type { ReadProcessorCheckpointResult } from '@event-driven-io/emmett';
12
import type { MongoClient } from 'mongodb';
2-
import {
3-
DefaultProcessotCheckpointCollectionName,
4-
defaultTag,
5-
type ReadProcessorCheckpointSqlResult,
6-
} from './types';
7-
8-
// eslint-disable-next-line @typescript-eslint/no-explicit-any
9-
export type ReadProcessorCheckpointResult<CheckpointType = any> = {
10-
lastProcessedPosition: CheckpointType | null;
11-
};
3+
import { DefaultProcessotCheckpointCollectionName, defaultTag } from './types';
124

135
// eslint-disable-next-line @typescript-eslint/no-explicit-any
146
export const readProcessorCheckpoint = async <CheckpointType = any>(
@@ -22,7 +14,7 @@ export const readProcessorCheckpoint = async <CheckpointType = any>(
2214
): Promise<ReadProcessorCheckpointResult<CheckpointType>> => {
2315
const result = await client
2416
.db(options.databaseName)
25-
.collection<ReadProcessorCheckpointSqlResult<CheckpointType>>(
17+
.collection<ReadProcessorCheckpointResult<CheckpointType>>(
2618
options.collectionName || DefaultProcessotCheckpointCollectionName,
2719
)
2820
.findOne({
@@ -31,6 +23,6 @@ export const readProcessorCheckpoint = async <CheckpointType = any>(
3123
});
3224

3325
return {
34-
lastProcessedPosition: result !== null ? result.lastProcessedToken : null,
26+
lastCheckpoint: result !== null ? result.lastCheckpoint : null,
3527
};
3628
};

src/packages/emmett-mongodb/src/eventStore/consumers/storeProcessorCheckpoint.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import type { MongoClient } from 'mongodb';
22
import { compareTwoTokens } from './subscriptions';
33
import {
4-
type ReadProcessorCheckpointSqlResult,
4+
type ReadProcessorCheckpointResult,
55
DefaultProcessotCheckpointCollectionName,
66
defaultTag,
77
} from './types';
@@ -39,7 +39,7 @@ export const storeProcessorCheckpoint = async <Position>(
3939
> => {
4040
const checkpoints = client
4141
.db(dbName)
42-
.collection<ReadProcessorCheckpointSqlResult>(
42+
.collection<ReadProcessorCheckpointResult>(
4343
collectionName || DefaultProcessotCheckpointCollectionName,
4444
);
4545

@@ -53,21 +53,21 @@ export const storeProcessorCheckpoint = async <Position>(
5353
// MISMATCH: we have a checkpoint but lastProcessedPosition doesn’t match
5454
if (
5555
current &&
56-
compareTwoTokens(current.lastProcessedToken, lastProcessedPosition) !== 0
56+
compareTwoTokens(current.lastProcessedPosition, lastProcessedPosition) !== 0
5757
) {
5858
return { success: false, reason: 'MISMATCH' };
5959
}
6060

6161
// IGNORED: same or earlier position
62-
if (current?.lastProcessedToken && newPosition) {
63-
if (compareTwoTokens(current.lastProcessedToken, newPosition) !== -1) {
62+
if (current?.lastProcessedPosition && newPosition) {
63+
if (compareTwoTokens(current.lastProcessedPosition, newPosition) !== -1) {
6464
return { success: false, reason: 'IGNORED' };
6565
}
6666
}
6767

6868
const updateResult = await checkpoints.updateOne(
69-
{ ...filter, lastProcessedToken: lastProcessedPosition },
70-
{ $set: { lastProcessedToken: newPosition, version } },
69+
{ ...filter, lastProcessedPosition: lastProcessedPosition },
70+
{ $set: { lastProcessedPosition: newPosition, version } },
7171
{ upsert: true },
7272
);
7373

0 commit comments

Comments
 (0)