@@ -4,22 +4,16 @@ import {
44 type AnyEvent ,
55 type AnyMessage ,
66 type AsyncRetryOptions ,
7- type CommonRecordedMessageMetadata ,
87 type DefaultRecord ,
9- type Event ,
108 type GlobalPositionTypeOfRecordedMessageMetadata ,
119 type Message ,
1210 type MessageConsumer ,
13- type ReadEvent ,
1411 type RecordedMessage ,
1512} from '@event-driven-io/emmett' ;
16- import { ChangeStream , MongoClient , type MongoClientOptions } from 'mongodb' ;
13+ import { MongoClient , type MongoClientOptions } from 'mongodb' ;
1714import { v4 as uuid } from 'uuid' ;
1815import type { MongoDBRecordedMessageMetadata } from '../event' ;
19- import type {
20- EventStream ,
21- MongoDBReadEventMetadata ,
22- } from '../mongoDBEventStore' ;
16+ import type { MongoDBReadEventMetadata } from '../mongoDBEventStore' ;
2317import { CancellationPromise } from './CancellablePromise' ;
2418import {
2519 changeStreamReactor ,
@@ -29,16 +23,18 @@ import {
2923 type MongoDBProjectorOptions ,
3024} from './mongoDBProcessor' ;
3125import {
32- subscribe as _subscribe ,
26+ generateVersionPolicies ,
27+ mongoDBSubscription ,
3328 zipMongoDBMessageBatchPullerStartFrom ,
3429 type ChangeStreamFullDocumentValuePolicy ,
35- type MongoDBSubscriptionDocument ,
30+ type MongoDBSubscription ,
3631} from './subscriptions' ;
32+ import type { MongoDBResumeToken } from './subscriptions/types' ;
3733
3834export type MessageConsumerOptions <
3935 MessageType extends Message = AnyMessage ,
4036 MessageMetadataType extends
41- MongoDBRecordedMessageMetadata = MongoDBRecordedMessageMetadata ,
37+ MongoDBReadEventMetadata = MongoDBRecordedMessageMetadata ,
4238 HandlerContext extends DefaultRecord | undefined = undefined ,
4339 CheckpointType = GlobalPositionTypeOfRecordedMessageMetadata < MessageMetadataType > ,
4440> = {
@@ -56,7 +52,7 @@ export type MongoDBEventStoreConsumerConfig<
5652 // eslint-disable-next-line @typescript-eslint/no-explicit-any
5753 ConsumerMessageType extends Message = any ,
5854 MessageMetadataType extends
59- MongoDBRecordedMessageMetadata = MongoDBRecordedMessageMetadata ,
55+ MongoDBReadEventMetadata = MongoDBRecordedMessageMetadata ,
6056 HandlerContext extends DefaultRecord | undefined = undefined ,
6157 CheckpointType = GlobalPositionTypeOfRecordedMessageMetadata < MessageMetadataType > ,
6258> = MessageConsumerOptions <
@@ -78,7 +74,7 @@ export type MongoDBEventStoreConsumerConfig<
7874export type MongoDBConsumerOptions <
7975 ConsumerEventType extends Message = Message ,
8076 MessageMetadataType extends
81- MongoDBRecordedMessageMetadata = MongoDBRecordedMessageMetadata ,
77+ MongoDBReadEventMetadata = MongoDBRecordedMessageMetadata ,
8278 HandlerContext extends DefaultRecord | undefined = undefined ,
8379 CheckpointType = GlobalPositionTypeOfRecordedMessageMetadata < MessageMetadataType > ,
8480> = MongoDBEventStoreConsumerConfig <
@@ -119,40 +115,17 @@ export type MongoDBEventStoreConsumer<
119115 } >
120116 : object ) ;
121117
122- type MessageArrayElement = `messages.${string } `;
123- type UpdateDescription < T > = {
124- updateDescription : {
125- updatedFields : Record < MessageArrayElement , T > & {
126- 'metadata.streamPosition' : number ;
127- 'metadata.updatedAt' : Date ;
128- } ;
129- } ;
130- } ;
131- type FullDocument <
132- EventType extends Event = Event ,
133- EventMetaDataType extends MongoDBReadEventMetadata = MongoDBReadEventMetadata ,
134- T extends EventStream = EventStream < EventType , EventMetaDataType > ,
135- > = {
136- fullDocument : T ;
137- } ;
138- type OplogChange <
139- EventType extends Event = Event ,
140- EventMetaDataType extends MongoDBReadEventMetadata = MongoDBReadEventMetadata ,
141- T extends EventStream = EventStream < EventType , EventMetaDataType > ,
142- > =
143- | FullDocument < EventType , EventMetaDataType , T >
144- | UpdateDescription < ReadEvent < EventType , EventMetaDataType > > ;
145-
146118export type MongoDBConsumerHandlerContext = {
147119 client ?: MongoClient ;
148120} ;
121+
149122export const mongoDBMessagesConsumer = <
150123 ConsumerMessageType extends Message = AnyMessage ,
151124 MessageMetadataType extends
152- MongoDBRecordedMessageMetadata = MongoDBRecordedMessageMetadata ,
125+ MongoDBReadEventMetadata = MongoDBRecordedMessageMetadata ,
153126 HandlerContext extends
154127 MongoDBConsumerHandlerContext = MongoDBConsumerHandlerContext ,
155- CheckpointType = GlobalPositionTypeOfRecordedMessageMetadata < MessageMetadataType > ,
128+ CheckpointType = MongoDBResumeToken ,
156129> (
157130 options : MongoDBConsumerOptions <
158131 ConsumerMessageType ,
@@ -162,23 +135,14 @@ export const mongoDBMessagesConsumer = <
162135 > ,
163136) : MongoDBEventStoreConsumer < ConsumerMessageType > => {
164137 let start : Promise < void > ;
165- let stream : ChangeStream <
166- EventStream < Event , CommonRecordedMessageMetadata > ,
167- MongoDBSubscriptionDocument <
168- EventStream < Event , CommonRecordedMessageMetadata >
169- >
170- > ;
138+ let stream : MongoDBSubscription < CheckpointType > ;
171139 let isRunning = false ;
172140 let runningPromise = new CancellationPromise < null > ( ) ;
173141 const client =
174142 'client' in options && options . client
175143 ? options . client
176144 : new MongoClient ( options . connectionString , options . clientOptions ) ;
177145 const processors = options . processors ?? [ ] ;
178- const subscribe = _subscribe (
179- options . changeStreamFullDocumentPolicy ,
180- client . db ( ) ,
181- ) ;
182146
183147 return {
184148 consumerId : options . consumerId ?? uuid ( ) ,
@@ -237,81 +201,51 @@ export const mongoDBMessagesConsumer = <
237201 const startFrom =
238202 zipMongoDBMessageBatchPullerStartFrom < CheckpointType > ( positions ) ;
239203
240- stream = subscribe < Event , CheckpointType > ( startFrom ) ;
241-
242- void ( async ( ) => {
243- while ( ! stream . closed && isRunning ) {
244- const hasNext = await Promise . race ( [
245- stream . hasNext ( ) ,
246- runningPromise ,
247- ] ) ;
248-
249- if ( hasNext === null ) {
250- break ;
251- }
252-
253- if ( ! hasNext ) {
254- continue ;
255- }
256-
257- const change = await stream . next ( ) ;
258- const resumeToken = change . _id ;
259- const typedChange = change as OplogChange ;
260- const streamChange =
261- 'updateDescription' in typedChange
262- ? {
263- messages : Object . entries (
264- typedChange . updateDescription . updatedFields ,
265- )
266- . filter ( ( [ key ] ) => key . startsWith ( 'messages.' ) )
267- . map ( ( [ , value ] ) => value as ReadEvent ) ,
268- }
269- : typedChange . fullDocument ;
270-
271- if ( ! streamChange ) {
272- return ;
273- }
274-
275- const messages = streamChange . messages . map ( ( message ) => {
276- return {
277- kind : message . kind ,
278- type : message . type ,
279- data : message . data ,
280- metadata : {
281- ...message . metadata ,
282- globalPosition : resumeToken ,
283- } ,
284- } as unknown as RecordedMessage <
285- ConsumerMessageType ,
286- MessageMetadataType
287- > ;
288- } ) ;
289-
204+ stream = mongoDBSubscription <
205+ ConsumerMessageType ,
206+ MessageMetadataType ,
207+ CheckpointType
208+ > ( {
209+ client,
210+ from : startFrom ,
211+ eachBatch : async (
212+ messages : RecordedMessage <
213+ ConsumerMessageType ,
214+ MessageMetadataType
215+ > [ ] ,
216+ ) => {
290217 for ( const processor of processors . filter (
291218 ( { isActive } ) => isActive ,
292219 ) ) {
293220 await processor . handle ( messages , {
294221 client,
295222 } as Partial < HandlerContext > ) ;
296223 }
297- }
224+ } ,
225+ } ) ;
226+
227+ // TODO: Remember to fix.
228+ const policy = ( await generateVersionPolicies ( options . client ?. db ( ) ! ) )
229+ . changeStreamFullDocumentValuePolicy ;
298230
299- console . log ( 'END' ) ;
300- } ) ( ) ;
231+ await stream . start ( {
232+ getFullDocumentValue : policy ,
233+ startFrom,
234+ } ) ;
301235 } ) ( ) ;
302236
303237 return start ;
304238 } ,
305239 stop : async ( ) => {
306- if ( stream ) {
307- await stream . close ( ) ;
240+ if ( stream . isRunning ) {
241+ await stream . stop ( ) ;
308242 isRunning = false ;
309243 runningPromise . resolve ( null ) ;
310244 }
311245 } ,
312246 close : async ( ) => {
313- if ( stream ) {
314- await stream . close ( ) ;
247+ if ( stream . isRunning ) {
248+ await stream . stop ( ) ;
315249 isRunning = false ;
316250 runningPromise . resolve ( null ) ;
317251 }
@@ -322,7 +256,7 @@ export const mongoDBMessagesConsumer = <
322256export const mongoDBChangeStreamMessagesConsumer = <
323257 ConsumerMessageType extends Message = AnyMessage ,
324258 MessageMetadataType extends
325- MongoDBRecordedMessageMetadata = MongoDBRecordedMessageMetadata ,
259+ MongoDBReadEventMetadata = MongoDBRecordedMessageMetadata ,
326260 HandlerContext extends
327261 MongoDBConsumerHandlerContext = MongoDBConsumerHandlerContext ,
328262 CheckpointType = GlobalPositionTypeOfRecordedMessageMetadata < MessageMetadataType > ,
0 commit comments