Skip to content

Commit dc892a6

Browse files
committed
Fixed ESDB reactors to handle events sequentially
1 parent d0ff0d7 commit dc892a6

File tree

10 files changed

+200
-65
lines changed

10 files changed

+200
-65
lines changed

src/packages/emmett-esdb/src/eventStore/consumers/eventStoreDBEventStoreConsumer.handling.int.spec.ts

Lines changed: 51 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import {
22
assertThatArray,
3+
delay,
34
getInMemoryDatabase,
45
type Event,
56
type InMemoryReactorOptions,
@@ -21,7 +22,7 @@ import {
2122
type EventStoreDBEventStoreConsumerType,
2223
} from './eventStoreDBEventStoreConsumer';
2324

24-
const withDeadline = { timeout: 5000 };
25+
const withDeadline = { timeout: 10000 };
2526

2627
void describe('EventStoreDB event store started consumer', () => {
2728
let eventStoreDB: StartedEventStoreDBContainer;
@@ -103,15 +104,7 @@ void describe('EventStoreDB event store started consumer', () => {
103104
const expectedEvents: RecordedMessage<GuestStayEvent>[] = [
104105
...events,
105106
...events,
106-
].map(
107-
(e, i) =>
108-
({
109-
...e,
110-
metadata: {
111-
checkpoint: BigInt(i),
112-
},
113-
}) as unknown as RecordedMessage<GuestStayEvent>,
114-
);
107+
] as unknown as RecordedMessage<GuestStayEvent>[];
115108

116109
assertThatArray(result).hasSize(expectedEvents.length);
117110
assertThatArray(result).containsElementsMatching(expectedEvents);
@@ -167,6 +160,52 @@ void describe('EventStoreDB event store started consumer', () => {
167160
},
168161
);
169162

163+
void it(`handles events SEQUENTIALLY`, { timeout: 15000 }, async () => {
164+
// Given
165+
const guestId = uuid();
166+
const otherGuestId = uuid();
167+
const streamName = `guestStay-${otherGuestId}`;
168+
const otherStreamName = `guestStay-${guestId}`;
169+
const events: NumberRecorded[] = [
170+
{ type: 'NumberRecorded', data: { number: 1 } },
171+
{ type: 'NumberRecorded', data: { number: 2 } },
172+
{ type: 'NumberRecorded', data: { number: 3 } },
173+
{ type: 'NumberRecorded', data: { number: 4 } },
174+
{ type: 'NumberRecorded', data: { number: 5 } },
175+
];
176+
const appendResult = await eventStore.appendToStream(streamName, events);
177+
await eventStore.appendToStream(otherStreamName, events);
178+
179+
const result: NumberRecorded[] = [];
180+
181+
// When
182+
const consumer = eventStoreDBEventStoreConsumer({
183+
connectionString,
184+
from: { stream: streamName },
185+
});
186+
consumer.reactor<NumberRecorded>({
187+
processorId: uuid(),
188+
stopAfter: (event) =>
189+
event.metadata.globalPosition ===
190+
appendResult.lastEventGlobalPosition,
191+
eachMessage: async (event) => {
192+
await delay(Math.floor(Math.random() * 150));
193+
194+
result.push(event);
195+
},
196+
});
197+
198+
try {
199+
await consumer.start();
200+
201+
assertThatArray(
202+
result.map((e) => e.data.number),
203+
).containsElementsMatching(events.map((e) => e.data.number));
204+
} finally {
205+
await consumer.close();
206+
}
207+
});
208+
170209
void it(
171210
`handles all events from $all streams for subscription to stream`,
172211
withDeadline,
@@ -641,3 +680,5 @@ type GuestCheckedIn = Event<'GuestCheckedIn', { guestId: string }>;
641680
type GuestCheckedOut = Event<'GuestCheckedOut', { guestId: string }>;
642681

643682
type GuestStayEvent = GuestCheckedIn | GuestCheckedOut;
683+
684+
type NumberRecorded = Event<'NumberRecorded', { number: number }>;

src/packages/emmett-esdb/src/eventStore/consumers/eventStoreDBEventStoreConsumer.inMemory.projections.int.spec.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import {
2222
} from '../eventstoreDBEventStore';
2323
import { eventStoreDBEventStoreConsumer } from './eventStoreDBEventStoreConsumer';
2424

25-
const withDeadline = { timeout: 5000 };
25+
const withDeadline = { timeout: 10000 };
2626

2727
void describe('EventStoreDB event store started consumer', () => {
2828
let eventStoreDB: StartedEventStoreDBContainer;

src/packages/emmett-esdb/src/eventStore/consumers/subscriptions/index.ts

Lines changed: 108 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,24 @@
11
import {
22
asyncRetry,
3+
getCheckpoint,
4+
isBigint,
5+
JSONParser,
36
type AnyMessage,
47
type AsyncRetryOptions,
58
type BatchRecordedMessageHandlerWithoutContext,
69
type EmmettError,
710
type Message,
11+
type MessageHandlerResult,
812
} from '@event-driven-io/emmett';
913
import {
1014
END,
1115
EventStoreDBClient,
1216
excludeSystemEvents,
1317
START,
18+
type ResolvedEvent,
1419
type StreamSubscription,
1520
} from '@eventstore/db-client';
16-
import { finished, Readable } from 'stream';
21+
import { pipeline, Transform, Writable, type WritableOptions } from 'stream';
1722
import {
1823
mapFromESDBEvent,
1924
type EventStoreDBReadEventMetadata,
@@ -86,13 +91,13 @@ const subscribe = (
8691
) =>
8792
from == undefined || from.stream == $all
8893
? client.subscribeToAll({
94+
...(from?.options ?? {}),
8995
fromPosition: toGlobalPosition(options.startFrom),
9096
filter: excludeSystemEvents(),
91-
...(from?.options ?? {}),
9297
})
9398
: client.subscribeToStream(from.stream, {
94-
fromRevision: toStreamPosition(options.startFrom),
9599
...(from.options ?? {}),
100+
fromRevision: toStreamPosition(options.startFrom),
96101
});
97102

98103
export const isDatabaseUnavailableError = (error: unknown) =>
@@ -109,12 +114,58 @@ export const EventStoreDBResubscribeDefaultOptions: AsyncRetryOptions = {
109114
shouldRetryError: (error) => !isDatabaseUnavailableError(error),
110115
};
111116

117+
type SubscriptionSequentialHandlerOptions<
118+
MessageType extends AnyMessage = AnyMessage,
119+
> = EventStoreDBSubscriptionOptions<MessageType> & WritableOptions;
120+
121+
class SubscriptionSequentialHandler<
122+
MessageType extends AnyMessage = AnyMessage,
123+
> extends Transform {
124+
private options: SubscriptionSequentialHandlerOptions<MessageType>;
125+
private from: EventStoreDBEventStoreConsumerType | undefined;
126+
127+
constructor(options: SubscriptionSequentialHandlerOptions<MessageType>) {
128+
super({ objectMode: true, ...options });
129+
this.options = options;
130+
this.from = options.from;
131+
}
132+
133+
async _transform(
134+
resolvedEvent: ResolvedEvent<MessageType>,
135+
_encoding: BufferEncoding,
136+
callback: (error?: Error | null) => void,
137+
): Promise<void> {
138+
try {
139+
if (!resolvedEvent.event) return;
140+
141+
const message = mapFromESDBEvent(resolvedEvent, this.from);
142+
const messageCheckpoint = getCheckpoint(message);
143+
144+
const result = await this.options.eachBatch([message]);
145+
146+
if (result && result.type === 'STOP') {
147+
if (!result.error) this.push(messageCheckpoint);
148+
149+
this.push(result);
150+
callback();
151+
return;
152+
}
153+
154+
this.push(messageCheckpoint);
155+
156+
callback();
157+
} catch (error) {
158+
callback(error as Error);
159+
}
160+
}
161+
}
162+
112163
export const eventStoreDBSubscription = <
113164
MessageType extends AnyMessage = AnyMessage,
114165
>({
115166
client,
116167
from,
117-
//batchSize,
168+
batchSize,
118169
eachBatch,
119170
resilience,
120171
}: EventStoreDBSubscriptionOptions<MessageType>): EventStoreDBSubscription => {
@@ -133,49 +184,65 @@ export const eventStoreDBSubscription = <
133184
EventStoreDBResubscribeDefaultOptions.shouldRetryError!(error),
134185
};
135186

136-
const pipeMessages = (options: EventStoreDBSubscriptionStartOptions) => {
137-
subscription = subscribe(client, from, options);
187+
const stopSubscription = (callback?: () => void): Promise<void> => {
188+
isRunning = false;
189+
return subscription
190+
.unsubscribe()
191+
.catch((err) => console.error('Error during unsubscribe.%s', err))
192+
.finally(callback ?? (() => {}));
193+
};
138194

195+
const pipeMessages = (options: EventStoreDBSubscriptionStartOptions) => {
196+
let retry = 0;
139197
return asyncRetry(
140198
() =>
141199
new Promise<void>((resolve, reject) => {
142-
finished(
143-
subscription.on('data', async (resolvedEvent) => {
144-
if (!resolvedEvent.event) return;
145-
146-
const message = mapFromESDBEvent(resolvedEvent, from);
147-
148-
const result = await eachBatch([message]);
200+
console.info(
201+
`Starting subscription. ${retry++} retries. From: ${JSONParser.stringify(from ?? '$all')}, Start from: ${JSONParser.stringify(
202+
options.startFrom,
203+
)}`,
204+
);
205+
subscription = subscribe(client, from, options);
149206

150-
if (result && result.type === 'STOP') {
151-
isRunning = false;
152-
await subscription.unsubscribe();
153-
}
207+
pipeline(
208+
subscription,
209+
new SubscriptionSequentialHandler({
210+
client,
211+
from,
212+
batchSize,
213+
eachBatch,
214+
resilience,
215+
}),
216+
new (class extends Writable {
217+
async _write(
218+
chunk: bigint | MessageHandlerResult,
219+
_encoding: string,
220+
done: () => void,
221+
) {
222+
if (isBigint(chunk)) {
223+
options.startFrom = {
224+
lastCheckpoint: chunk,
225+
};
226+
done();
227+
return;
228+
}
154229

155-
from = {
156-
stream: from?.stream ?? $all,
157-
options: {
158-
...(from?.options ?? {}),
159-
...(!from || from?.stream === $all
160-
? {
161-
fromPosition: resolvedEvent.event.position,
162-
}
163-
: {
164-
fromRevision:
165-
resolvedEvent.link?.revision ??
166-
resolvedEvent.event.revision,
167-
}),
168-
},
169-
};
170-
}) as unknown as Readable,
171-
(error) => {
172-
if (error) {
173-
console.error(`Received error: ${JSON.stringify(error)}.`);
174-
reject(error);
175-
return;
230+
await stopSubscription(done);
176231
}
232+
})({ objectMode: true }),
233+
async (error: Error | null) => {
177234
console.info(`Stopping subscription.`);
178-
resolve();
235+
await stopSubscription(() => {
236+
if (!error) {
237+
console.info('Subscription ended successfully.');
238+
resolve();
239+
return;
240+
}
241+
console.error(
242+
`Received error: ${JSONParser.stringify(error)}.`,
243+
);
244+
reject(error);
245+
});
179246
},
180247
);
181248
}),
@@ -199,9 +266,8 @@ export const eventStoreDBSubscription = <
199266
return start;
200267
},
201268
stop: async () => {
202-
if (!isRunning) return;
203-
isRunning = false;
204-
await subscription?.unsubscribe();
269+
if (!isRunning) return start ? await start : Promise.resolve();
270+
await stopSubscription();
205271
await start;
206272
},
207273
};

src/packages/emmett/src/database/inMemoryDatabase.ts

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -185,11 +185,9 @@ export const getInMemoryDatabase = (): InMemoryDatabase => {
185185

186186
const documentsInCollection = storage.get(collectionName)!;
187187

188-
const foundIndexes = documentsInCollection
189-
.filter((doc) => predicate(doc as T))
190-
.map((_, index) => index);
191-
192-
const firstIndex = foundIndexes[0];
188+
const firstIndex = documentsInCollection.findIndex((doc) =>
189+
predicate(doc as T),
190+
);
193191

194192
if (firstIndex === undefined || firstIndex === -1) {
195193
return Promise.resolve(
@@ -239,7 +237,7 @@ export const getInMemoryDatabase = (): InMemoryDatabase => {
239237
{
240238
successful: true,
241239
modifiedCount: 1,
242-
matchedCount: foundIndexes.length,
240+
matchedCount: firstIndex,
243241
nextExpectedVersion: newVersion,
244242
},
245243
{ operationName: 'replaceOne', collectionName, errors },

src/packages/emmett/src/eventStore/projections/inMemory/inMemoryProjectionSpec.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import {
99
type InMemoryDatabase,
1010
} from '../../../database';
1111
import { isErrorConstructor } from '../../../errors';
12+
import { JSONParser } from '../../../serialization';
1213
import {
1314
assertFails,
1415
AssertionError,
@@ -247,7 +248,7 @@ export function documentExists<T extends DocumentWithId>(
247248
const propKey = key as keyof typeof document;
248249
if (
249250
!(key in document) ||
250-
JSON.stringify(document[propKey]) !== JSON.stringify(value)
251+
JSONParser.stringify(document[propKey]) !== JSONParser.stringify(value)
251252
) {
252253
assertFails(`Property ${key} doesn't match the expected value`);
253254
return Promise.resolve(false);

src/packages/emmett/src/processors/inMemoryProcessors.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -94,19 +94,20 @@ export const inMemoryCheckpointer = <
9494
currentPosition &&
9595
(currentPosition === newCheckpoint ||
9696
currentPosition !== lastCheckpoint)
97-
)
98-
return Promise.resolve({
97+
) {
98+
return {
9999
success: false,
100100
reason: currentPosition === newCheckpoint ? 'IGNORED' : 'MISMATCH',
101-
});
101+
};
102+
}
102103

103104
await checkpoints.handle(processorId, (existing) => ({
104105
...(existing ?? {}),
105106
_id: processorId,
106107
lastCheckpoint: newCheckpoint,
107108
}));
108109

109-
return Promise.resolve({ success: true, newCheckpoint });
110+
return { success: true, newCheckpoint };
110111
},
111112
};
112113
};

0 commit comments

Comments
 (0)