Skip to content

Commit 4bebdd1

Browse files
committed
Merged mongodb checkpoints into one file added more tests
1 parent 507a345 commit 4bebdd1

13 files changed

+1598
-236
lines changed
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
import {
2+
type Message,
3+
type ReadProcessorCheckpointResult,
4+
getCheckpoint,
5+
} from '@event-driven-io/emmett';
6+
import type { MongoClient } from 'mongodb';
7+
import type { MongoDBCheckpointer } from './mongoDBProcessor';
8+
import { compareTwoTokens } from './subscriptions';
9+
import { DefaultProcessotCheckpointCollectionName, defaultTag } from './types';
10+
11+
export const mongoDBCheckpointer = <
12+
MessageType extends Message = Message,
13+
>(): MongoDBCheckpointer<MessageType> => ({
14+
read: async (options, context) => {
15+
const result = await readProcessorCheckpoint(context.client, options);
16+
17+
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
18+
return { lastCheckpoint: result?.lastCheckpoint };
19+
},
20+
store: async (options, context) => {
21+
const newCheckpoint = getCheckpoint(options.message);
22+
23+
const result = await storeProcessorCheckpoint(context.client, {
24+
lastStoredCheckpoint: options.lastCheckpoint,
25+
newCheckpoint,
26+
processorId: options.processorId,
27+
partition: options.partition,
28+
version: options.version || 0,
29+
});
30+
31+
return result.success
32+
? { success: true, newCheckpoint: result.newCheckpoint }
33+
: result;
34+
},
35+
});
36+
37+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
38+
type ReadProcessorCheckpointMongoDBResult<Position = any> = {
39+
lastProcessedCheckpoint: Position;
40+
processorId: string;
41+
partitionId: string;
42+
version: number;
43+
};
44+
45+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
46+
export const readProcessorCheckpoint = async <CheckpointType = any>(
47+
client: MongoClient,
48+
options: {
49+
processorId: string;
50+
partition?: string;
51+
collectionName?: string;
52+
databaseName?: string;
53+
},
54+
): Promise<ReadProcessorCheckpointResult<CheckpointType>> => {
55+
const result = await client
56+
.db(options.databaseName)
57+
.collection<ReadProcessorCheckpointMongoDBResult<CheckpointType>>(
58+
options.collectionName || DefaultProcessotCheckpointCollectionName,
59+
)
60+
.findOne({
61+
processorId: options.processorId,
62+
partitionId: options.partition || defaultTag,
63+
});
64+
65+
return {
66+
lastCheckpoint: result !== null ? result.lastProcessedCheckpoint : null,
67+
};
68+
};
69+
70+
type StoreLastProcessedProcessorPositionResult<Position = unknown> =
71+
| {
72+
success: true;
73+
newCheckpoint: Position;
74+
}
75+
| { success: false; reason: 'IGNORED' | 'MISMATCH' };
76+
77+
export const storeProcessorCheckpoint = async <Position>(
78+
client: MongoClient,
79+
{
80+
processorId,
81+
version,
82+
newCheckpoint,
83+
lastStoredCheckpoint,
84+
partition,
85+
collectionName,
86+
dbName,
87+
}: {
88+
processorId: string;
89+
version: number;
90+
newCheckpoint: Position | null;
91+
lastStoredCheckpoint: Position | null;
92+
partition?: string;
93+
collectionName?: string;
94+
dbName?: string;
95+
},
96+
): Promise<
97+
StoreLastProcessedProcessorPositionResult<
98+
null extends Position ? Position | null : Position
99+
>
100+
> => {
101+
const checkpoints = client
102+
.db(dbName)
103+
.collection<ReadProcessorCheckpointMongoDBResult>(
104+
collectionName || DefaultProcessotCheckpointCollectionName,
105+
);
106+
107+
const filter = {
108+
processorId: processorId,
109+
partitionId: partition || defaultTag,
110+
};
111+
112+
const current = await checkpoints.findOne(filter);
113+
114+
// MISMATCH: we have a checkpoint but lastProcessedCheckpoint doesn’t match
115+
if (
116+
current &&
117+
compareTwoTokens(current.lastProcessedCheckpoint, lastStoredCheckpoint) !==
118+
0
119+
) {
120+
return { success: false, reason: 'MISMATCH' };
121+
}
122+
123+
// IGNORED: same or earlier position
124+
if (current?.lastProcessedCheckpoint && newCheckpoint) {
125+
if (
126+
compareTwoTokens(current.lastProcessedCheckpoint, newCheckpoint) !== -1
127+
) {
128+
return { success: false, reason: 'IGNORED' };
129+
}
130+
}
131+
132+
const updateResult = await checkpoints.updateOne(
133+
{ ...filter, lastProcessedCheckpoint: lastStoredCheckpoint },
134+
{ $set: { lastProcessedCheckpoint: newCheckpoint, version } },
135+
{ upsert: true },
136+
);
137+
138+
if (updateResult.matchedCount > 0 || updateResult.upsertedCount > 0) {
139+
return { success: true, newCheckpoint: newCheckpoint! };
140+
}
141+
142+
return { success: false, reason: 'MISMATCH' };
143+
};

src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.subscription.e2e.spec.ts renamed to src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventStore.subscription.e2e.spec.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,20 @@ import {
2020
toStreamName,
2121
type EventStream,
2222
type MongoDBEventStore,
23-
} from '.';
23+
} from '..';
2424
import {
2525
type PricedProductItem,
2626
type ProductItemAdded,
2727
type ShoppingCartEvent,
28-
} from '../testing';
29-
import { CancellationPromise } from './consumers/CancellablePromise';
28+
} from '../../testing';
29+
import { CancellationPromise } from './CancellablePromise';
3030
import {
3131
mongoDBEventStoreConsumer,
3232
type MongoDBEventStoreConsumer,
33-
} from './consumers/mongoDBEventsConsumer';
34-
import type { MongoDBProcessor } from './consumers/mongoDBProcessor';
35-
import { compareTwoMongoDBTokensData } from './consumers/subscriptions';
36-
import type { MongoDBResumeToken } from './consumers/subscriptions/mongoDbResumeToken';
33+
} from './mongoDBEventsConsumer';
34+
import type { MongoDBProcessor } from './mongoDBProcessor';
35+
import { compareTwoMongoDBTokensData } from './subscriptions';
36+
import type { MongoDBResumeToken } from './subscriptions/mongoDbResumeToken';
3737

3838
void describe('MongoDBEventStore subscription', () => {
3939
let mongodb: StartedMongoDBContainer;

0 commit comments

Comments
 (0)