Skip to content

Commit 6f78760

Browse files
committed
test: fix
1 parent 811bc97 commit 6f78760

File tree

5 files changed

+434
-143
lines changed

5 files changed

+434
-143
lines changed

src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts

Lines changed: 41 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,16 @@ import {
44
type AnyEvent,
55
type AnyMessage,
66
type AsyncRetryOptions,
7-
type CommonRecordedMessageMetadata,
87
type DefaultRecord,
9-
type Event,
108
type GlobalPositionTypeOfRecordedMessageMetadata,
119
type Message,
1210
type MessageConsumer,
13-
type ReadEvent,
1411
type RecordedMessage,
1512
} from '@event-driven-io/emmett';
16-
import { ChangeStream, MongoClient, type MongoClientOptions } from 'mongodb';
13+
import { MongoClient, type MongoClientOptions } from 'mongodb';
1714
import { v4 as uuid } from 'uuid';
1815
import type { MongoDBRecordedMessageMetadata } from '../event';
19-
import type {
20-
EventStream,
21-
MongoDBReadEventMetadata,
22-
} from '../mongoDBEventStore';
16+
import type { MongoDBReadEventMetadata } from '../mongoDBEventStore';
2317
import { CancellationPromise } from './CancellablePromise';
2418
import {
2519
changeStreamReactor,
@@ -29,16 +23,18 @@ import {
2923
type MongoDBProjectorOptions,
3024
} from './mongoDBProcessor';
3125
import {
32-
subscribe as _subscribe,
26+
generateVersionPolicies,
27+
mongoDBSubscription,
3328
zipMongoDBMessageBatchPullerStartFrom,
3429
type ChangeStreamFullDocumentValuePolicy,
35-
type MongoDBSubscriptionDocument,
30+
type MongoDBSubscription,
3631
} from './subscriptions';
32+
import type { MongoDBResumeToken } from './subscriptions/types';
3733

3834
export type MessageConsumerOptions<
3935
MessageType extends Message = AnyMessage,
4036
MessageMetadataType extends
41-
MongoDBRecordedMessageMetadata = MongoDBRecordedMessageMetadata,
37+
MongoDBReadEventMetadata = MongoDBRecordedMessageMetadata,
4238
HandlerContext extends DefaultRecord | undefined = undefined,
4339
CheckpointType = GlobalPositionTypeOfRecordedMessageMetadata<MessageMetadataType>,
4440
> = {
@@ -56,7 +52,7 @@ export type MongoDBEventStoreConsumerConfig<
5652
// eslint-disable-next-line @typescript-eslint/no-explicit-any
5753
ConsumerMessageType extends Message = any,
5854
MessageMetadataType extends
59-
MongoDBRecordedMessageMetadata = MongoDBRecordedMessageMetadata,
55+
MongoDBReadEventMetadata = MongoDBRecordedMessageMetadata,
6056
HandlerContext extends DefaultRecord | undefined = undefined,
6157
CheckpointType = GlobalPositionTypeOfRecordedMessageMetadata<MessageMetadataType>,
6258
> = MessageConsumerOptions<
@@ -78,7 +74,7 @@ export type MongoDBEventStoreConsumerConfig<
7874
export type MongoDBConsumerOptions<
7975
ConsumerEventType extends Message = Message,
8076
MessageMetadataType extends
81-
MongoDBRecordedMessageMetadata = MongoDBRecordedMessageMetadata,
77+
MongoDBReadEventMetadata = MongoDBRecordedMessageMetadata,
8278
HandlerContext extends DefaultRecord | undefined = undefined,
8379
CheckpointType = GlobalPositionTypeOfRecordedMessageMetadata<MessageMetadataType>,
8480
> = MongoDBEventStoreConsumerConfig<
@@ -119,40 +115,17 @@ export type MongoDBEventStoreConsumer<
119115
}>
120116
: object);
121117

122-
type MessageArrayElement = `messages.${string}`;
123-
type UpdateDescription<T> = {
124-
updateDescription: {
125-
updatedFields: Record<MessageArrayElement, T> & {
126-
'metadata.streamPosition': number;
127-
'metadata.updatedAt': Date;
128-
};
129-
};
130-
};
131-
type FullDocument<
132-
EventType extends Event = Event,
133-
EventMetaDataType extends MongoDBReadEventMetadata = MongoDBReadEventMetadata,
134-
T extends EventStream = EventStream<EventType, EventMetaDataType>,
135-
> = {
136-
fullDocument: T;
137-
};
138-
type OplogChange<
139-
EventType extends Event = Event,
140-
EventMetaDataType extends MongoDBReadEventMetadata = MongoDBReadEventMetadata,
141-
T extends EventStream = EventStream<EventType, EventMetaDataType>,
142-
> =
143-
| FullDocument<EventType, EventMetaDataType, T>
144-
| UpdateDescription<ReadEvent<EventType, EventMetaDataType>>;
145-
146118
export type MongoDBConsumerHandlerContext = {
147119
client?: MongoClient;
148120
};
121+
149122
export const mongoDBMessagesConsumer = <
150123
ConsumerMessageType extends Message = AnyMessage,
151124
MessageMetadataType extends
152-
MongoDBRecordedMessageMetadata = MongoDBRecordedMessageMetadata,
125+
MongoDBReadEventMetadata = MongoDBRecordedMessageMetadata,
153126
HandlerContext extends
154127
MongoDBConsumerHandlerContext = MongoDBConsumerHandlerContext,
155-
CheckpointType = GlobalPositionTypeOfRecordedMessageMetadata<MessageMetadataType>,
128+
CheckpointType = MongoDBResumeToken,
156129
>(
157130
options: MongoDBConsumerOptions<
158131
ConsumerMessageType,
@@ -162,23 +135,14 @@ export const mongoDBMessagesConsumer = <
162135
>,
163136
): MongoDBEventStoreConsumer<ConsumerMessageType> => {
164137
let start: Promise<void>;
165-
let stream: ChangeStream<
166-
EventStream<Event, CommonRecordedMessageMetadata>,
167-
MongoDBSubscriptionDocument<
168-
EventStream<Event, CommonRecordedMessageMetadata>
169-
>
170-
>;
138+
let stream: MongoDBSubscription<CheckpointType>;
171139
let isRunning = false;
172140
let runningPromise = new CancellationPromise<null>();
173141
const client =
174142
'client' in options && options.client
175143
? options.client
176144
: new MongoClient(options.connectionString, options.clientOptions);
177145
const processors = options.processors ?? [];
178-
const subscribe = _subscribe(
179-
options.changeStreamFullDocumentPolicy,
180-
client.db(),
181-
);
182146

183147
return {
184148
consumerId: options.consumerId ?? uuid(),
@@ -237,81 +201,51 @@ export const mongoDBMessagesConsumer = <
237201
const startFrom =
238202
zipMongoDBMessageBatchPullerStartFrom<CheckpointType>(positions);
239203

240-
stream = subscribe<Event, CheckpointType>(startFrom);
241-
242-
void (async () => {
243-
while (!stream.closed && isRunning) {
244-
const hasNext = await Promise.race([
245-
stream.hasNext(),
246-
runningPromise,
247-
]);
248-
249-
if (hasNext === null) {
250-
break;
251-
}
252-
253-
if (!hasNext) {
254-
continue;
255-
}
256-
257-
const change = await stream.next();
258-
const resumeToken = change._id;
259-
const typedChange = change as OplogChange;
260-
const streamChange =
261-
'updateDescription' in typedChange
262-
? {
263-
messages: Object.entries(
264-
typedChange.updateDescription.updatedFields,
265-
)
266-
.filter(([key]) => key.startsWith('messages.'))
267-
.map(([, value]) => value as ReadEvent),
268-
}
269-
: typedChange.fullDocument;
270-
271-
if (!streamChange) {
272-
return;
273-
}
274-
275-
const messages = streamChange.messages.map((message) => {
276-
return {
277-
kind: message.kind,
278-
type: message.type,
279-
data: message.data,
280-
metadata: {
281-
...message.metadata,
282-
globalPosition: resumeToken,
283-
},
284-
} as unknown as RecordedMessage<
285-
ConsumerMessageType,
286-
MessageMetadataType
287-
>;
288-
});
289-
204+
stream = mongoDBSubscription<
205+
ConsumerMessageType,
206+
MessageMetadataType,
207+
CheckpointType
208+
>({
209+
client,
210+
from: startFrom,
211+
eachBatch: async (
212+
messages: RecordedMessage<
213+
ConsumerMessageType,
214+
MessageMetadataType
215+
>[],
216+
) => {
290217
for (const processor of processors.filter(
291218
({ isActive }) => isActive,
292219
)) {
293220
await processor.handle(messages, {
294221
client,
295222
} as Partial<HandlerContext>);
296223
}
297-
}
224+
},
225+
});
226+
227+
// TODO: Remember to fix.
228+
const policy = (await generateVersionPolicies(options.client?.db()!))
229+
.changeStreamFullDocumentValuePolicy;
298230

299-
console.log('END');
300-
})();
231+
await stream.start({
232+
getFullDocumentValue: policy,
233+
startFrom,
234+
});
301235
})();
302236

303237
return start;
304238
},
305239
stop: async () => {
306-
if (stream) {
307-
await stream.close();
240+
if (stream.isRunning) {
241+
await stream.stop();
308242
isRunning = false;
309243
runningPromise.resolve(null);
310244
}
311245
},
312246
close: async () => {
313-
if (stream) {
314-
await stream.close();
247+
if (stream.isRunning) {
248+
await stream.stop();
315249
isRunning = false;
316250
runningPromise.resolve(null);
317251
}
@@ -322,7 +256,7 @@ export const mongoDBMessagesConsumer = <
322256
export const mongoDBChangeStreamMessagesConsumer = <
323257
ConsumerMessageType extends Message = AnyMessage,
324258
MessageMetadataType extends
325-
MongoDBRecordedMessageMetadata = MongoDBRecordedMessageMetadata,
259+
MongoDBReadEventMetadata = MongoDBRecordedMessageMetadata,
326260
HandlerContext extends
327261
MongoDBConsumerHandlerContext = MongoDBConsumerHandlerContext,
328262
CheckpointType = GlobalPositionTypeOfRecordedMessageMetadata<MessageMetadataType>,

src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBProcessor.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,14 +84,15 @@ export type MongoDBProjectorOptions<EventType extends AnyEvent = AnyEvent> =
8484

8585
// eslint-disable-next-line @typescript-eslint/no-explicit-any
8686
const isResumeToken = (value: any): value is MongoDBResumeToken =>
87+
typeof value === 'object' &&
8788
'_data' in value &&
8889
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
8990
typeof value._data === 'string';
9091

9192
export const getCheckpoint = <
9293
MessageType extends AnyMessage = AnyMessage,
9394
// eslint-disable-next-line @typescript-eslint/no-explicit-any
94-
CheckpointType = any,
95+
CheckpointType = MongoDBCheckpointer,
9596
MessageMetadataType extends
9697
ReadEventMetadataWithGlobalPosition<CheckpointType> = ReadEventMetadataWithGlobalPosition<CheckpointType>,
9798
>(

0 commit comments

Comments
 (0)