Skip to content

Commit 57fcd84

Browse files
committed
test: tests, eslint, ts fixed
1 parent 6f78760 commit 57fcd84

File tree

4 files changed

+35
-46
lines changed

4 files changed

+35
-46
lines changed

src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,9 @@ export type MongoDBEventStoreConsumerConfig<
6262
CheckpointType
6363
> & {
6464
// from?: any;
65-
pulling?: {
66-
batchSize?: number;
67-
};
65+
// pulling?: {
66+
// batchSize?: number;
67+
// };
6868
resilience?: {
6969
resubscribeOptions?: AsyncRetryOptions;
7070
};
@@ -224,9 +224,15 @@ export const mongoDBMessagesConsumer = <
224224
},
225225
});
226226

227+
const db = options.client?.db?.();
228+
229+
if (!db) {
230+
throw new EmmettError('MongoDB client is not connected');
231+
}
232+
227233
// TODO: Remember to fix.
228-
const policy = (await generateVersionPolicies(options.client?.db()!))
229-
.changeStreamFullDocumentValuePolicy;
234+
const versionPolicies = await generateVersionPolicies(db);
235+
const policy = versionPolicies.changeStreamFullDocumentValuePolicy;
230236

231237
await stream.start({
232238
getFullDocumentValue: policy,

src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBProcessor.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,6 @@ const isResumeToken = (value: any): value is MongoDBResumeToken =>
9191

9292
export const getCheckpoint = <
9393
MessageType extends AnyMessage = AnyMessage,
94-
// eslint-disable-next-line @typescript-eslint/no-explicit-any
9594
CheckpointType = MongoDBCheckpointer,
9695
MessageMetadataType extends
9796
ReadEventMetadataWithGlobalPosition<CheckpointType> = ReadEventMetadataWithGlobalPosition<CheckpointType>,

src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/index.ts

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,13 @@ export type MongoDBSubscriptionOptions<
3636
MessageType extends Message = Message,
3737
MessageMetadataType extends
3838
MongoDBReadEventMetadata = MongoDBRecordedMessageMetadata,
39-
CheckpointType = MongoDBResumeToken,
39+
// CheckpointType = MongoDBResumeToken,
40+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
41+
CheckpointType = any,
4042
> = {
4143
from?: CurrentMessageProcessorPosition<CheckpointType>;
4244
client: MongoClient;
43-
batchSize: number;
45+
// batchSize: number;
4446
eachBatch: BatchRecordedMessageHandlerWithoutContext<
4547
MessageType,
4648
MessageMetadataType
@@ -346,7 +348,6 @@ const createChangeStream = <
346348

347349
const subscribe =
348350
(getFullDocumentValue: ChangeStreamFullDocumentValuePolicy, db: Db) =>
349-
// eslint-disable-next-line @typescript-eslint/no-explicit-any
350351
<EventType extends Message = AnyMessage, CheckpointType = MongoDBResumeToken>(
351352
resumeToken?: MongoDBSubscriptionStartFrom<CheckpointType>,
352353
) => {
@@ -361,7 +362,7 @@ export const mongoDBSubscription = <
361362
>({
362363
client,
363364
from,
364-
batchSize,
365+
// batchSize,
365366
eachBatch,
366367
resilience,
367368
}: MongoDBSubscriptionOptions<
@@ -396,14 +397,18 @@ export const mongoDBSubscription = <
396397
callback?.();
397398
resolve();
398399
} catch (error) {
399-
reject(error);
400+
reject(
401+
error instanceof Error
402+
? error
403+
: typeof error === 'string'
404+
? new Error(error)
405+
: new Error('Unknown error'),
406+
);
400407
}
401408
});
402409
} else {
403410
try {
404411
await subscription.close();
405-
} catch (error) {
406-
throw error;
407412
} finally {
408413
callback?.();
409414
}
@@ -434,7 +439,7 @@ export const mongoDBSubscription = <
434439
>({
435440
client,
436441
from,
437-
batchSize,
442+
// batchSize,
438443
eachBatch,
439444
resilience,
440445
});
@@ -449,7 +454,7 @@ export const mongoDBSubscription = <
449454

450455
if (isMongoDBResumeToken(result)) {
451456
options.startFrom = {
452-
lastCheckpoint: result,
457+
lastCheckpoint: result as ResumeToken,
453458
};
454459
done();
455460
return;
@@ -513,8 +518,7 @@ export const mongoDBSubscription = <
513518

514519
start = (async () => {
515520
isRunning = true;
516-
const a = pipeMessages(options);
517-
return a;
521+
return pipeMessages(options);
518522
})();
519523

520524
return start;

src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.subscription.e2e.spec.ts

Lines changed: 9 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -67,24 +67,6 @@ void describe('MongoDBEventStore subscription', () => {
6767
quantity: 10,
6868
price: 3,
6969
}) as PricedProductItem;
70-
const timeoutGuard = async (
71-
action: () => Promise<void>,
72-
timeoutAfterMs = 1000,
73-
) => {
74-
return new Promise<void>((resolve, reject) => {
75-
const timer = setTimeout(() => {
76-
reject(new Error('timeout'));
77-
clearTimeout(timer);
78-
}, timeoutAfterMs);
79-
80-
action()
81-
.catch(noop)
82-
.finally(() => {
83-
clearTimeout(timer);
84-
resolve();
85-
});
86-
});
87-
};
8870

8971
before(async () => {
9072
mongodb = await new MongoDBContainer('mongo:8.0.10').start();
@@ -126,11 +108,11 @@ void describe('MongoDBEventStore subscription', () => {
126108
stopAfter: (event) => {
127109
if (event.data.productItem.productId === lastProductItemIdTest1) {
128110
messageProcessingPromise1.resolve();
129-
consumer.stop();
111+
consumer.stop().catch(noop);
130112
}
131113
if (event.data.productItem.productId === lastProductItemIdTest2) {
132114
messageProcessingPromise2.resolve();
133-
consumer.stop();
115+
consumer.stop().catch(noop);
134116
}
135117

136118
return (
@@ -185,11 +167,7 @@ void describe('MongoDBEventStore subscription', () => {
185167
{ expectedStreamVersion: 2n },
186168
);
187169

188-
try {
189-
await consumer.start();
190-
} catch (err) {
191-
console.error(err);
192-
}
170+
await consumer.start();
193171

194172
const stream = await collection.findOne(
195173
{ streamName },
@@ -204,7 +182,7 @@ void describe('MongoDBEventStore subscription', () => {
204182
assertTrue(stream.metadata.updatedAt instanceof Date);
205183
});
206184

207-
void it.skip('should renew after the last event', async () => {
185+
void it('should renew after the last event', async () => {
208186
assertTrue(!!processor);
209187
assert(processor);
210188

@@ -215,8 +193,6 @@ void describe('MongoDBEventStore subscription', () => {
215193
assertIsNotNull(stream);
216194
assertEqual(3n, stream.metadata.streamPosition);
217195

218-
await consumer.start();
219-
220196
const position = await processor.start({ client });
221197

222198
assertTrue(!!position);
@@ -230,6 +206,10 @@ void describe('MongoDBEventStore subscription', () => {
230206
compareTwoMongoDBTokens(position.lastCheckpoint, lastResumeToken!),
231207
);
232208

209+
const consumerPromise = consumer.start();
210+
211+
await new Promise((resolve) => setTimeout(resolve, 1000));
212+
233213
await eventStore.appendToStream<ShoppingCartEvent>(
234214
streamName,
235215
[
@@ -241,7 +221,7 @@ void describe('MongoDBEventStore subscription', () => {
241221
{ expectedStreamVersion: 3n },
242222
);
243223

244-
await timeoutGuard(() => messageProcessingPromise2);
224+
await consumerPromise;
245225

246226
stream = await collection.findOne({ streamName }, { useBigInt64: true });
247227
assertIsNotNull(stream);

0 commit comments

Comments
 (0)