Skip to content

Commit 11cf2f1

Browse files
committed
Improved tests resiliency around restarts
1 parent 8a1b007 commit 11cf2f1

File tree

2 files changed

+50
-6
lines changed

2 files changed

+50
-6
lines changed

src/packages/emmett-esdb/src/eventStore/consumers/eventStoreDBEventStoreConsumer.handling.int.spec.ts

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
import {
22
assertThatArray,
3+
asyncAwaiter,
34
delay,
45
getInMemoryDatabase,
6+
JSONParser,
57
type Event,
68
type InMemoryReactorOptions,
79
type RecordedMessage,
@@ -431,6 +433,7 @@ void describe('EventStoreDB event store started consumer', () => {
431433

432434
const guestId = uuid();
433435
const streamName = `guestStay-${guestId}`;
436+
const waitForStart = asyncAwaiter();
434437

435438
// When
436439
const consumer = eventStoreDBEventStoreConsumer({
@@ -441,7 +444,8 @@ void describe('EventStoreDB event store started consumer', () => {
441444
processorId: uuid(),
442445
stopAfter: (event) =>
443446
event.metadata.globalPosition === stopAfterPosition,
444-
eachMessage: (event) => {
447+
eachMessage: async (event) => {
448+
await waitForStart.wait;
445449
result.push(event);
446450
},
447451
});
@@ -459,6 +463,7 @@ void describe('EventStoreDB event store started consumer', () => {
459463
events,
460464
);
461465
stopAfterPosition = appendResult.lastEventGlobalPosition;
466+
waitForStart.resolve();
462467

463468
await consumerPromise;
464469

@@ -492,6 +497,7 @@ void describe('EventStoreDB event store started consumer', () => {
492497

493498
const result: GuestStayEvent[] = [];
494499
let stopAfterPosition: bigint | undefined = undefined;
500+
const waitForStart = asyncAwaiter();
495501

496502
// When
497503
const consumer = eventStoreDBEventStoreConsumer({
@@ -503,7 +509,8 @@ void describe('EventStoreDB event store started consumer', () => {
503509
startFrom: 'CURRENT',
504510
stopAfter: (event) =>
505511
event.metadata.globalPosition === stopAfterPosition,
506-
eachMessage: (event) => {
512+
eachMessage: async (event) => {
513+
await waitForStart.wait;
507514
result.push(event);
508515
},
509516
});
@@ -516,6 +523,7 @@ void describe('EventStoreDB event store started consumer', () => {
516523
events,
517524
);
518525
stopAfterPosition = appendResult.lastEventGlobalPosition;
526+
waitForStart.resolve();
519527

520528
await consumerPromise;
521529

@@ -555,6 +563,8 @@ void describe('EventStoreDB event store started consumer', () => {
555563
let result: GuestStayEvent[] = [];
556564
let stopAfterPosition: bigint | undefined = lastEventGlobalPosition;
557565

566+
const waitForStart = asyncAwaiter();
567+
558568
// When
559569
const consumer = eventStoreDBEventStoreConsumer({
560570
connectionString,
@@ -565,26 +575,33 @@ void describe('EventStoreDB event store started consumer', () => {
565575
startFrom: 'CURRENT',
566576
stopAfter: (event) =>
567577
event.metadata.globalPosition === stopAfterPosition,
568-
eachMessage: (event) => {
578+
eachMessage: async (event) => {
579+
await waitForStart.wait;
569580
result.push(event);
581+
console.log(`Received event: ${JSONParser.stringify(event)}`);
570582
},
571583
});
572584

573-
await consumer.start();
585+
let consumerPromise = consumer.start();
586+
waitForStart.resolve();
587+
await consumerPromise;
574588
await consumer.stop();
575589

590+
waitForStart.reset();
591+
576592
result = [];
577593

578594
stopAfterPosition = undefined;
579595

580596
try {
581-
const consumerPromise = consumer.start();
597+
consumerPromise = consumer.start();
582598

583599
const appendResult = await eventStore.appendToStream(
584600
streamName,
585601
events,
586602
);
587603
stopAfterPosition = appendResult.lastEventGlobalPosition;
604+
waitForStart.resolve();
588605

589606
await consumerPromise;
590607

@@ -621,12 +638,14 @@ void describe('EventStoreDB event store started consumer', () => {
621638
let result: GuestStayEvent[] = [];
622639
let stopAfterPosition: bigint | undefined = lastEventGlobalPosition;
623640

641+
const waitForStart = asyncAwaiter();
624642
const processorOptions: InMemoryReactorOptions<GuestStayEvent> = {
625643
processorId: uuid(),
626644
startFrom: 'CURRENT',
627645
stopAfter: (event) =>
628646
event.metadata.globalPosition === stopAfterPosition,
629-
eachMessage: (event) => {
647+
eachMessage: async (event) => {
648+
await waitForStart.wait;
630649
result.push(event);
631650
},
632651
connectionOptions: { database },
@@ -640,13 +659,15 @@ void describe('EventStoreDB event store started consumer', () => {
640659
try {
641660
consumer.reactor<GuestStayEvent>(processorOptions);
642661

662+
waitForStart.resolve();
643663
await consumer.start();
644664
} finally {
645665
await consumer.close();
646666
}
647667

648668
result = [];
649669

670+
waitForStart.reset();
650671
stopAfterPosition = undefined;
651672

652673
const newConsumer = eventStoreDBEventStoreConsumer({
@@ -662,6 +683,7 @@ void describe('EventStoreDB event store started consumer', () => {
662683
streamName,
663684
events,
664685
);
686+
waitForStart.resolve();
665687
stopAfterPosition = appendResult.lastEventGlobalPosition;
666688

667689
await consumerPromise;
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,25 @@
11
export const delay = (ms: number): Promise<void> => {
22
return new Promise((resolve) => setTimeout(resolve, ms));
33
};
4+
5+
export type AsyncAwaiter<T = void> = {
6+
wait: Promise<T>;
7+
resolve: (value: T | PromiseLike<T>) => void;
8+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
9+
reject: (reason?: any) => void;
10+
reset: () => void;
11+
};
12+
13+
// TODO: Remove this after migrating to Node 22
14+
export const asyncAwaiter = <T = void>(): AsyncAwaiter<T> => {
15+
const result: AsyncAwaiter<T> = {} as AsyncAwaiter<T>;
16+
17+
(result.reset = () => {
18+
result.wait = new Promise<T>((res, rej) => {
19+
result.resolve = res;
20+
result.reject = rej;
21+
});
22+
})();
23+
24+
return result;
25+
};

0 commit comments

Comments
 (0)