@@ -32,8 +32,10 @@ import {
3232 type MongoDBEventStoreConsumer ,
3333} from './mongoDBEventsConsumer' ;
3434import type { MongoDBProcessor } from './mongoDBProcessor' ;
35- import { compareTwoMongoDBTokensData } from './subscriptions' ;
36- import type { MongoDBResumeToken } from './subscriptions/mongoDbResumeToken' ;
35+ import { compareTwoMongoDBCheckpoints } from './subscriptions' ;
36+ import type { MongoDBCheckpoint } from './subscriptions/mongoDBCheckpoint' ;
37+
38+ const withDeadline = { timeout : 30000 } ;
3739
3840void describe ( 'MongoDBEventStore subscription' , ( ) => {
3941 let mongodb : StartedMongoDBContainer ;
@@ -42,7 +44,7 @@ void describe('MongoDBEventStore subscription', () => {
4244 let collection : Collection < EventStream > ;
4345 let consumer : MongoDBEventStoreConsumer < ShoppingCartEvent > ;
4446 let processor : MongoDBProcessor < ProductItemAdded > | undefined ;
45- let lastResumeToken : MongoDBResumeToken [ '_data' ] | null = null ;
47+ let lastResumeToken : MongoDBCheckpoint | null = null ;
4648
4749 const messageProcessingPromise1 = new CancellationPromise < void > ( ) ;
4850 const messageProcessingPromise2 = new CancellationPromise < void > ( ) ;
@@ -95,89 +97,93 @@ void describe('MongoDBEventStore subscription', () => {
9597 await mongodb . stop ( ) ;
9698 } ) ;
9799
98- void it ( 'should react to new events added by the appendToStream' , async ( ) => {
99- let receivedMessageCount : 0 | 1 | 2 = 0 ;
100-
101- processor = consumer . reactor < ProductItemAdded > ( {
102- processorId : v4 ( ) ,
103- stopAfter : ( event ) => {
104- if ( event . data . productItem . productId === lastProductItemIdTest1 ) {
105- messageProcessingPromise1 . resolve ( ) ;
106- consumer . stop ( ) . catch ( noop ) ;
107- }
108- if ( event . data . productItem . productId === lastProductItemIdTest2 ) {
109- messageProcessingPromise2 . resolve ( ) ;
110- consumer . stop ( ) . catch ( noop ) ;
111- }
112-
113- return (
114- event . data . productItem . productId === lastProductItemIdTest1 ||
115- event . data . productItem . productId === lastProductItemIdTest2
116- ) ;
117- } ,
118- eachMessage : ( event ) => {
119- lastResumeToken = event . metadata . globalPosition ;
120-
121- assertTrue ( receivedMessageCount <= 3 ) ;
122- assertEqual (
123- expectedProductItemIds [ receivedMessageCount ] ,
124- event . data . productItem . productId ,
125- ) ;
126-
127- receivedMessageCount ++ ;
128- } ,
129- connectionOptions : {
130- client,
131- } ,
132- } ) ;
133-
134- await eventStore . appendToStream < ShoppingCartEvent > (
135- streamName ,
136- [
137- {
138- type : 'ProductItemAdded' ,
139- data : { productItem : productItem ( expectedProductItemIds [ 0 ] ) } ,
140- } ,
141- ] ,
142- { expectedStreamVersion : STREAM_DOES_NOT_EXIST } ,
143- ) ;
144- await eventStore . appendToStream < ShoppingCartEvent > (
145- streamName ,
146- [
147- {
148- type : 'ProductItemAdded' ,
149- data : { productItem : productItem ( expectedProductItemIds [ 1 ] ) } ,
150- } ,
151- ] ,
152- { expectedStreamVersion : 1n } ,
153- ) ;
154- await eventStore . appendToStream < ShoppingCartEvent > (
155- streamName ,
156- [
157- {
158- type : 'ProductItemAdded' ,
159- data : { productItem : productItem ( expectedProductItemIds [ 2 ] ) } ,
100+ void it (
101+ 'should react to new events added by the appendToStream' ,
102+ withDeadline ,
103+ async ( ) => {
104+ let receivedMessageCount : 0 | 1 | 2 = 0 ;
105+
106+ processor = consumer . reactor < ProductItemAdded > ( {
107+ processorId : v4 ( ) ,
108+ stopAfter : ( event ) => {
109+ if ( event . data . productItem . productId === lastProductItemIdTest1 ) {
110+ messageProcessingPromise1 . resolve ( ) ;
111+ consumer . stop ( ) . catch ( noop ) ;
112+ }
113+ if ( event . data . productItem . productId === lastProductItemIdTest2 ) {
114+ messageProcessingPromise2 . resolve ( ) ;
115+ consumer . stop ( ) . catch ( noop ) ;
116+ }
117+
118+ return (
119+ event . data . productItem . productId === lastProductItemIdTest1 ||
120+ event . data . productItem . productId === lastProductItemIdTest2
121+ ) ;
160122 } ,
161- ] ,
162- { expectedStreamVersion : 2n } ,
163- ) ;
164-
165- await consumer . start ( ) ;
123+ eachMessage : ( event ) => {
124+ lastResumeToken = event . metadata . globalPosition ;
166125
167- const stream = await collection . findOne (
168- { streamName } ,
169- { useBigInt64 : true } ,
170- ) ;
126+ assertTrue ( receivedMessageCount <= 3 ) ;
127+ assertEqual (
128+ expectedProductItemIds [ receivedMessageCount ] ,
129+ event . data . productItem . productId ,
130+ ) ;
171131
172- assertIsNotNull ( stream ) ;
173- assertEqual ( 3n , stream . metadata . streamPosition ) ;
174- assertEqual ( shoppingCartId , stream . metadata . streamId ) ;
175- assertEqual ( streamType , stream . metadata . streamType ) ;
176- assertTrue ( stream . metadata . createdAt instanceof Date ) ;
177- assertTrue ( stream . metadata . updatedAt instanceof Date ) ;
178- } ) ;
179-
180- void it ( 'should renew after the last event' , async ( ) => {
132+ receivedMessageCount ++ ;
133+ } ,
134+ connectionOptions : {
135+ client,
136+ } ,
137+ } ) ;
138+
139+ await eventStore . appendToStream < ShoppingCartEvent > (
140+ streamName ,
141+ [
142+ {
143+ type : 'ProductItemAdded' ,
144+ data : { productItem : productItem ( expectedProductItemIds [ 0 ] ) } ,
145+ } ,
146+ ] ,
147+ { expectedStreamVersion : STREAM_DOES_NOT_EXIST } ,
148+ ) ;
149+ await eventStore . appendToStream < ShoppingCartEvent > (
150+ streamName ,
151+ [
152+ {
153+ type : 'ProductItemAdded' ,
154+ data : { productItem : productItem ( expectedProductItemIds [ 1 ] ) } ,
155+ } ,
156+ ] ,
157+ { expectedStreamVersion : 1n } ,
158+ ) ;
159+ await eventStore . appendToStream < ShoppingCartEvent > (
160+ streamName ,
161+ [
162+ {
163+ type : 'ProductItemAdded' ,
164+ data : { productItem : productItem ( expectedProductItemIds [ 2 ] ) } ,
165+ } ,
166+ ] ,
167+ { expectedStreamVersion : 2n } ,
168+ ) ;
169+
170+ await consumer . start ( ) ;
171+
172+ const stream = await collection . findOne (
173+ { streamName } ,
174+ { useBigInt64 : true } ,
175+ ) ;
176+
177+ assertIsNotNull ( stream ) ;
178+ assertEqual ( 3n , stream . metadata . streamPosition ) ;
179+ assertEqual ( shoppingCartId , stream . metadata . streamId ) ;
180+ assertEqual ( streamType , stream . metadata . streamType ) ;
181+ assertTrue ( stream . metadata . createdAt instanceof Date ) ;
182+ assertTrue ( stream . metadata . updatedAt instanceof Date ) ;
183+ } ,
184+ ) ;
185+
186+ void it ( 'should renew after the last event' , withDeadline , async ( ) => {
181187 assertOk ( processor ) ;
182188
183189 let stream = await collection . findOne (
@@ -196,7 +202,7 @@ void describe('MongoDBEventStore subscription', () => {
196202 // processor after restart is renewed after the 3rd position.
197203 assertEqual (
198204 0 ,
199- compareTwoMongoDBTokensData ( position . lastCheckpoint , lastResumeToken ! ) ,
205+ compareTwoMongoDBCheckpoints ( position . lastCheckpoint , lastResumeToken ! ) ,
200206 ) ;
201207
202208 const consumerPromise = consumer . start ( ) ;
@@ -223,7 +229,7 @@ void describe('MongoDBEventStore subscription', () => {
223229 // lastResumeToken has changed after the last message
224230 assertEqual (
225231 1 ,
226- compareTwoMongoDBTokensData ( lastResumeToken ! , position . lastCheckpoint ) ,
232+ compareTwoMongoDBCheckpoints ( lastResumeToken ! , position . lastCheckpoint ) ,
227233 ) ;
228234
229235 await consumer . stop ( ) ;
0 commit comments