Skip to content

Commit 10257b4

Browse files
committed
Fixed cleanup and stopping processing right after error was caught
1 parent 11cf2f1 commit 10257b4

File tree

4 files changed

+140
-30
lines changed

4 files changed

+140
-30
lines changed

src/packages/emmett-esdb/src/eventStore/consumers/eventStoreDBEventStoreConsumer.handling.int.spec.ts

Lines changed: 65 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ import {
33
asyncAwaiter,
44
delay,
55
getInMemoryDatabase,
6-
JSONParser,
76
type Event,
87
type InMemoryReactorOptions,
98
type RecordedMessage,
@@ -24,7 +23,7 @@ import {
2423
type EventStoreDBEventStoreConsumerType,
2524
} from './eventStoreDBEventStoreConsumer';
2625

27-
const withDeadline = { timeout: 10000 };
26+
const withDeadline = { timeout: 1000000 };
2827

2928
void describe('EventStoreDB event store started consumer', () => {
3029
let eventStoreDB: StartedEventStoreDBContainer;
@@ -191,7 +190,7 @@ void describe('EventStoreDB event store started consumer', () => {
191190
event.metadata.globalPosition ===
192191
appendResult.lastEventGlobalPosition,
193192
eachMessage: async (event) => {
194-
await delay(Math.floor(Math.random() * 150));
193+
await delay(Math.floor(Math.random() * 200));
195194

196195
result.push(event);
197196
},
@@ -208,6 +207,69 @@ void describe('EventStoreDB event store started consumer', () => {
208207
}
209208
});
210209

210+
void it(
211+
`stops processing on unhandled error in handler`,
212+
{ timeout: 1500000 },
213+
async () => {
214+
// Given
215+
const guestId = uuid();
216+
const otherGuestId = uuid();
217+
const streamName = `guestStay-${otherGuestId}`;
218+
const otherStreamName = `guestStay-${guestId}`;
219+
const events: NumberRecorded[] = [
220+
{ type: 'NumberRecorded', data: { number: 1 } },
221+
{ type: 'NumberRecorded', data: { number: 2 } },
222+
{ type: 'NumberRecorded', data: { number: 3 } },
223+
{ type: 'NumberRecorded', data: { number: 4 } },
224+
{ type: 'NumberRecorded', data: { number: 5 } },
225+
{ type: 'NumberRecorded', data: { number: 6 } },
226+
{ type: 'NumberRecorded', data: { number: 7 } },
227+
{ type: 'NumberRecorded', data: { number: 8 } },
228+
{ type: 'NumberRecorded', data: { number: 9 } },
229+
{ type: 'NumberRecorded', data: { number: 10 } },
230+
];
231+
const appendResult = await eventStore.appendToStream(
232+
streamName,
233+
events,
234+
);
235+
await eventStore.appendToStream(otherStreamName, events);
236+
237+
const result: NumberRecorded[] = [];
238+
239+
let shouldThrowRandomError = false;
240+
241+
// When
242+
const consumer = eventStoreDBEventStoreConsumer({
243+
connectionString,
244+
from: { stream: streamName },
245+
});
246+
consumer.reactor<NumberRecorded>({
247+
processorId: uuid(),
248+
stopAfter: (event) =>
249+
event.metadata.globalPosition ===
250+
appendResult.lastEventGlobalPosition,
251+
eachMessage: (event) => {
252+
if (shouldThrowRandomError) {
253+
return Promise.reject(new Error('Random error'));
254+
}
255+
256+
result.push(event);
257+
258+
shouldThrowRandomError = !shouldThrowRandomError;
259+
return Promise.resolve();
260+
},
261+
});
262+
263+
try {
264+
await consumer.start();
265+
266+
assertThatArray(result.map((e) => e.data.number)).containsExactly(1);
267+
} finally {
268+
await consumer.close();
269+
}
270+
},
271+
);
272+
211273
void it(
212274
`handles all events from $all streams for subscription to stream`,
213275
withDeadline,
@@ -578,7 +640,6 @@ void describe('EventStoreDB event store started consumer', () => {
578640
eachMessage: async (event) => {
579641
await waitForStart.wait;
580642
result.push(event);
581-
console.log(`Received event: ${JSONParser.stringify(event)}`);
582643
},
583644
});
584645

src/packages/emmett-esdb/src/eventStore/consumers/eventStoreDBEventStoreConsumer.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,12 +123,17 @@ export const eventStoreDBEventStoreConsumer = <
123123
}),
124124
);
125125

126+
const error = result.find((r) => r.status === 'rejected')?.reason as
127+
| Error
128+
| undefined;
129+
126130
return result.some(
127131
(r) => r.status === 'fulfilled' && r.value?.type !== 'STOP',
128132
)
129133
? undefined
130134
: {
131135
type: 'STOP',
136+
error: error ? EmmettError.mapFrom(error) : undefined,
132137
};
133138
};
134139

src/packages/emmett-esdb/src/eventStore/consumers/subscriptions/index.ts

Lines changed: 52 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -123,11 +123,13 @@ class SubscriptionSequentialHandler<
123123
> extends Transform {
124124
private options: SubscriptionSequentialHandlerOptions<MessageType>;
125125
private from: EventStoreDBEventStoreConsumerType | undefined;
126+
public isRunning: boolean;
126127

127128
constructor(options: SubscriptionSequentialHandlerOptions<MessageType>) {
128129
super({ objectMode: true, ...options });
129130
this.options = options;
130131
this.from = options.from;
132+
this.isRunning = true;
131133
}
132134

133135
async _transform(
@@ -136,23 +138,27 @@ class SubscriptionSequentialHandler<
136138
callback: (error?: Error | null) => void,
137139
): Promise<void> {
138140
try {
139-
if (!resolvedEvent.event) return;
141+
if (!this.isRunning || !resolvedEvent.event) {
142+
callback();
143+
return;
144+
}
140145

141146
const message = mapFromESDBEvent(resolvedEvent, this.from);
142147
const messageCheckpoint = getCheckpoint(message);
143148

144149
const result = await this.options.eachBatch([message]);
145150

146151
if (result && result.type === 'STOP') {
152+
this.isRunning = false;
147153
if (!result.error) this.push(messageCheckpoint);
148154

149155
this.push(result);
156+
this.push(null);
150157
callback();
151158
return;
152159
}
153160

154161
this.push(messageCheckpoint);
155-
156162
callback();
157163
} catch (error) {
158164
callback(error as Error);
@@ -172,6 +178,7 @@ export const eventStoreDBSubscription = <
172178
let isRunning = false;
173179

174180
let start: Promise<void>;
181+
let processor: SubscriptionSequentialHandler<MessageType>;
175182

176183
let subscription: StreamSubscription<MessageType>;
177184

@@ -186,8 +193,12 @@ export const eventStoreDBSubscription = <
186193

187194
const stopSubscription = (callback?: () => void): Promise<void> => {
188195
isRunning = false;
196+
if (processor) processor.isRunning = false;
189197
return subscription
190198
.unsubscribe()
199+
.then(() => {
200+
subscription.destroy();
201+
})
191202
.catch((err) => console.error('Error during unsubscribe.%s', err))
192203
.finally(callback ?? (() => {}));
193204
};
@@ -204,32 +215,47 @@ export const eventStoreDBSubscription = <
204215
);
205216
subscription = subscribe(client, from, options);
206217

207-
pipeline(
208-
subscription,
209-
new SubscriptionSequentialHandler({
210-
client,
211-
from,
212-
batchSize,
213-
eachBatch,
214-
resilience,
215-
}),
216-
new (class extends Writable {
217-
async _write(
218-
chunk: bigint | MessageHandlerResult,
219-
_encoding: string,
220-
done: () => void,
221-
) {
222-
if (isBigint(chunk)) {
223-
options.startFrom = {
224-
lastCheckpoint: chunk,
225-
};
226-
done();
227-
return;
228-
}
218+
processor = new SubscriptionSequentialHandler({
219+
client,
220+
from,
221+
batchSize,
222+
eachBatch,
223+
resilience,
224+
});
225+
226+
const handler = new (class extends Writable {
227+
async _write(
228+
result: bigint | MessageHandlerResult,
229+
_encoding: string,
230+
done: () => void,
231+
) {
232+
if (!isRunning) return;
233+
234+
if (isBigint(result)) {
235+
options.startFrom = {
236+
lastCheckpoint: result,
237+
};
238+
done();
239+
return;
240+
}
229241

230-
await stopSubscription(done);
242+
if (result && result.type === 'STOP' && result.error) {
243+
console.error(
244+
`Subscription stopped with error code: ${result.error.errorCode}, message: ${
245+
result.error.message
246+
}.`,
247+
);
231248
}
232-
})({ objectMode: true }),
249+
250+
await stopSubscription();
251+
done();
252+
}
253+
})({ objectMode: true });
254+
255+
pipeline(
256+
subscription,
257+
processor,
258+
handler,
233259
async (error: Error | null) => {
234260
console.info(`Stopping subscription.`);
235261
await stopSubscription(() => {

src/packages/emmett/src/errors/index.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,24 @@ export class EmmettError extends Error {
4343
// 👇️ because we are extending a built-in class
4444
Object.setPrototypeOf(this, EmmettError.prototype);
4545
}
46+
47+
public static mapFrom(
48+
error: Error | { message?: string; errorCode?: number },
49+
): EmmettError {
50+
if (error instanceof EmmettError) {
51+
return error;
52+
}
53+
54+
return new EmmettError({
55+
errorCode:
56+
'errorCode' in error &&
57+
error.errorCode !== undefined &&
58+
error.errorCode !== null
59+
? error.errorCode
60+
: 500,
61+
message: error.message ?? 'An unknown error occurred',
62+
});
63+
}
4664
}
4765

4866
export class ConcurrencyError extends EmmettError {

0 commit comments

Comments
 (0)