Skip to content

Commit 6d3d461

Browse files
committed
Used URN like MongoDB checkpoint with message position
That fixes issue when someone appended more than one event and second being skipped as both were using the same checkpoint. This also aligns with more future checkpoint handling. Made also other improvements around resiliency as getting properly batch result to stop consumers upon condition.
1 parent 06ced8d commit 6d3d461

9 files changed

+1369
-1404
lines changed

src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventStore.subscription.e2e.spec.ts

Lines changed: 91 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,10 @@ import {
3232
type MongoDBEventStoreConsumer,
3333
} from './mongoDBEventsConsumer';
3434
import type { MongoDBProcessor } from './mongoDBProcessor';
35-
import { compareTwoMongoDBTokensData } from './subscriptions';
36-
import type { MongoDBResumeToken } from './subscriptions/mongoDbResumeToken';
35+
import { compareTwoMongoDBCheckpoints } from './subscriptions';
36+
import type { MongoDBCheckpoint } from './subscriptions/mongoDBCheckpoint';
37+
38+
const withDeadline = { timeout: 30000 };
3739

3840
void describe('MongoDBEventStore subscription', () => {
3941
let mongodb: StartedMongoDBContainer;
@@ -42,7 +44,7 @@ void describe('MongoDBEventStore subscription', () => {
4244
let collection: Collection<EventStream>;
4345
let consumer: MongoDBEventStoreConsumer<ShoppingCartEvent>;
4446
let processor: MongoDBProcessor<ProductItemAdded> | undefined;
45-
let lastResumeToken: MongoDBResumeToken['_data'] | null = null;
47+
let lastResumeToken: MongoDBCheckpoint | null = null;
4648

4749
const messageProcessingPromise1 = new CancellationPromise<void>();
4850
const messageProcessingPromise2 = new CancellationPromise<void>();
@@ -95,89 +97,93 @@ void describe('MongoDBEventStore subscription', () => {
9597
await mongodb.stop();
9698
});
9799

98-
void it('should react to new events added by the appendToStream', async () => {
99-
let receivedMessageCount: 0 | 1 | 2 = 0;
100-
101-
processor = consumer.reactor<ProductItemAdded>({
102-
processorId: v4(),
103-
stopAfter: (event) => {
104-
if (event.data.productItem.productId === lastProductItemIdTest1) {
105-
messageProcessingPromise1.resolve();
106-
consumer.stop().catch(noop);
107-
}
108-
if (event.data.productItem.productId === lastProductItemIdTest2) {
109-
messageProcessingPromise2.resolve();
110-
consumer.stop().catch(noop);
111-
}
112-
113-
return (
114-
event.data.productItem.productId === lastProductItemIdTest1 ||
115-
event.data.productItem.productId === lastProductItemIdTest2
116-
);
117-
},
118-
eachMessage: (event) => {
119-
lastResumeToken = event.metadata.globalPosition;
120-
121-
assertTrue(receivedMessageCount <= 3);
122-
assertEqual(
123-
expectedProductItemIds[receivedMessageCount],
124-
event.data.productItem.productId,
125-
);
126-
127-
receivedMessageCount++;
128-
},
129-
connectionOptions: {
130-
client,
131-
},
132-
});
133-
134-
await eventStore.appendToStream<ShoppingCartEvent>(
135-
streamName,
136-
[
137-
{
138-
type: 'ProductItemAdded',
139-
data: { productItem: productItem(expectedProductItemIds[0]) },
140-
},
141-
],
142-
{ expectedStreamVersion: STREAM_DOES_NOT_EXIST },
143-
);
144-
await eventStore.appendToStream<ShoppingCartEvent>(
145-
streamName,
146-
[
147-
{
148-
type: 'ProductItemAdded',
149-
data: { productItem: productItem(expectedProductItemIds[1]) },
150-
},
151-
],
152-
{ expectedStreamVersion: 1n },
153-
);
154-
await eventStore.appendToStream<ShoppingCartEvent>(
155-
streamName,
156-
[
157-
{
158-
type: 'ProductItemAdded',
159-
data: { productItem: productItem(expectedProductItemIds[2]) },
100+
void it(
101+
'should react to new events added by the appendToStream',
102+
withDeadline,
103+
async () => {
104+
let receivedMessageCount: 0 | 1 | 2 = 0;
105+
106+
processor = consumer.reactor<ProductItemAdded>({
107+
processorId: v4(),
108+
stopAfter: (event) => {
109+
if (event.data.productItem.productId === lastProductItemIdTest1) {
110+
messageProcessingPromise1.resolve();
111+
consumer.stop().catch(noop);
112+
}
113+
if (event.data.productItem.productId === lastProductItemIdTest2) {
114+
messageProcessingPromise2.resolve();
115+
consumer.stop().catch(noop);
116+
}
117+
118+
return (
119+
event.data.productItem.productId === lastProductItemIdTest1 ||
120+
event.data.productItem.productId === lastProductItemIdTest2
121+
);
160122
},
161-
],
162-
{ expectedStreamVersion: 2n },
163-
);
164-
165-
await consumer.start();
123+
eachMessage: (event) => {
124+
lastResumeToken = event.metadata.globalPosition;
166125

167-
const stream = await collection.findOne(
168-
{ streamName },
169-
{ useBigInt64: true },
170-
);
126+
assertTrue(receivedMessageCount <= 3);
127+
assertEqual(
128+
expectedProductItemIds[receivedMessageCount],
129+
event.data.productItem.productId,
130+
);
171131

172-
assertIsNotNull(stream);
173-
assertEqual(3n, stream.metadata.streamPosition);
174-
assertEqual(shoppingCartId, stream.metadata.streamId);
175-
assertEqual(streamType, stream.metadata.streamType);
176-
assertTrue(stream.metadata.createdAt instanceof Date);
177-
assertTrue(stream.metadata.updatedAt instanceof Date);
178-
});
179-
180-
void it('should renew after the last event', async () => {
132+
receivedMessageCount++;
133+
},
134+
connectionOptions: {
135+
client,
136+
},
137+
});
138+
139+
await eventStore.appendToStream<ShoppingCartEvent>(
140+
streamName,
141+
[
142+
{
143+
type: 'ProductItemAdded',
144+
data: { productItem: productItem(expectedProductItemIds[0]) },
145+
},
146+
],
147+
{ expectedStreamVersion: STREAM_DOES_NOT_EXIST },
148+
);
149+
await eventStore.appendToStream<ShoppingCartEvent>(
150+
streamName,
151+
[
152+
{
153+
type: 'ProductItemAdded',
154+
data: { productItem: productItem(expectedProductItemIds[1]) },
155+
},
156+
],
157+
{ expectedStreamVersion: 1n },
158+
);
159+
await eventStore.appendToStream<ShoppingCartEvent>(
160+
streamName,
161+
[
162+
{
163+
type: 'ProductItemAdded',
164+
data: { productItem: productItem(expectedProductItemIds[2]) },
165+
},
166+
],
167+
{ expectedStreamVersion: 2n },
168+
);
169+
170+
await consumer.start();
171+
172+
const stream = await collection.findOne(
173+
{ streamName },
174+
{ useBigInt64: true },
175+
);
176+
177+
assertIsNotNull(stream);
178+
assertEqual(3n, stream.metadata.streamPosition);
179+
assertEqual(shoppingCartId, stream.metadata.streamId);
180+
assertEqual(streamType, stream.metadata.streamType);
181+
assertTrue(stream.metadata.createdAt instanceof Date);
182+
assertTrue(stream.metadata.updatedAt instanceof Date);
183+
},
184+
);
185+
186+
void it('should renew after the last event', withDeadline, async () => {
181187
assertOk(processor);
182188

183189
let stream = await collection.findOne(
@@ -196,7 +202,7 @@ void describe('MongoDBEventStore subscription', () => {
196202
// processor after restart is renewed after the 3rd position.
197203
assertEqual(
198204
0,
199-
compareTwoMongoDBTokensData(position.lastCheckpoint, lastResumeToken!),
205+
compareTwoMongoDBCheckpoints(position.lastCheckpoint, lastResumeToken!),
200206
);
201207

202208
const consumerPromise = consumer.start();
@@ -223,7 +229,7 @@ void describe('MongoDBEventStore subscription', () => {
223229
// lastResumeToken has changed after the last message
224230
assertEqual(
225231
1,
226-
compareTwoMongoDBTokensData(lastResumeToken!, position.lastCheckpoint),
232+
compareTwoMongoDBCheckpoints(lastResumeToken!, position.lastCheckpoint),
227233
);
228234

229235
await consumer.stop();

0 commit comments

Comments
 (0)