Skip to content

Commit 23d2bee

Browse files
jtomaszewskiclaude
andcommitted
feat(core): add graceful shutdown handling to RetryableInboxOutboxEventPoller
Implement OnModuleDestroy lifecycle hook to properly clean up resources during application shutdown: - Unsubscribe from RxJS interval to stop polling - Track in-flight event processing and wait for completion - Add isShuttingDown flag to prevent new processing during shutdown 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
1 parent a7a0386 commit 23d2bee

File tree

2 files changed

+214
-16
lines changed

2 files changed

+214
-16
lines changed

packages/core/src/poller/retryable-inbox-outbox-event.poller.ts

Lines changed: 48 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
import { Inject, Injectable, Logger, OnModuleInit } from '@nestjs/common';
2-
import { EMPTY, catchError, concatMap, from, interval, repeat } from 'rxjs';
1+
import { Inject, Injectable, Logger, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
2+
import { EMPTY, Subscription, catchError, concatMap, from, interval, repeat } from 'rxjs';
33
import { DATABASE_DRIVER_FACTORY_TOKEN, DatabaseDriverFactory } from '../driver/database-driver.factory';
44
import { TransactionalEventEmitter } from '../emitter/transactional-event-emitter';
55
import { InboxOutboxModuleOptions, MODULE_OPTIONS_TOKEN } from '../inbox-outbox.module-definition';
@@ -8,7 +8,11 @@ import { INBOX_OUTBOX_EVENT_PROCESSOR_TOKEN, InboxOutboxEventProcessorContract }
88
import { EventConfigurationResolver } from '../resolver/event-configuration.resolver';
99

1010
@Injectable()
11-
export class RetryableInboxOutboxEventPoller implements OnModuleInit {
11+
export class RetryableInboxOutboxEventPoller implements OnModuleInit, OnModuleDestroy {
12+
private subscription: Subscription | null = null;
13+
private inFlightProcessing: Set<Promise<unknown>> = new Set();
14+
private isShuttingDown = false;
15+
1216
constructor(
1317
@Inject(MODULE_OPTIONS_TOKEN) private options: InboxOutboxModuleOptions,
1418
@Inject(DATABASE_DRIVER_FACTORY_TOKEN) private databaseDriverFactory: DatabaseDriverFactory,
@@ -19,9 +23,12 @@ export class RetryableInboxOutboxEventPoller implements OnModuleInit {
1923
) {}
2024
async onModuleInit() {
2125
this.logger.log(`Inbox options: retryEveryMilliseconds: ${this.options.retryEveryMilliseconds}, maxInboxOutboxTransportEventPerRetry: ${this.options.maxInboxOutboxTransportEventPerRetry}, events: ${JSON.stringify(this.options.events)}, driver: ${this.options.driverFactory.constructor.name}`);
22-
interval(this.options.retryEveryMilliseconds)
26+
this.subscription = interval(this.options.retryEveryMilliseconds)
2327
.pipe(
2428
concatMap(() => {
29+
if (this.isShuttingDown) {
30+
return EMPTY;
31+
}
2532
return from(this.poolRetryableEvents());
2633
}),
2734
catchError((exception) => {
@@ -34,6 +41,24 @@ export class RetryableInboxOutboxEventPoller implements OnModuleInit {
3441
.subscribe();
3542
}
3643

44+
async onModuleDestroy() {
45+
this.isShuttingDown = true;
46+
this.logger.log('Shutting down RetryableInboxOutboxEventPoller...');
47+
48+
if (this.subscription) {
49+
this.subscription.unsubscribe();
50+
this.subscription = null;
51+
}
52+
53+
if (this.inFlightProcessing.size > 0) {
54+
this.logger.log(`Waiting for ${this.inFlightProcessing.size} in-flight event(s) to complete...`);
55+
await Promise.allSettled([...this.inFlightProcessing]);
56+
this.logger.log('All in-flight events completed.');
57+
}
58+
59+
this.logger.log('RetryableInboxOutboxEventPoller shutdown complete.');
60+
}
61+
3762
async poolRetryableEvents() {
3863
try {
3964
const maxInboxOutboxTransportEventPerRetry = this.options.maxInboxOutboxTransportEventPerRetry;
@@ -55,18 +80,25 @@ export class RetryableInboxOutboxEventPoller implements OnModuleInit {
5580
}
5681

5782
private async processAsynchronousRetryableEvents(inboxOutboxTransportEvents: InboxOutboxTransportEvent[]) {
58-
return Promise.allSettled(
59-
inboxOutboxTransportEvents.map((inboxOutboxTransportEvent) => {
60-
const notDeliveredToListeners = this.transactionalEventEmitter.getListeners(inboxOutboxTransportEvent.eventName).filter((listener) => {
61-
return !inboxOutboxTransportEvent.deliveredToListeners.includes(listener.getName());
62-
});
83+
const processingPromises = inboxOutboxTransportEvents.map((inboxOutboxTransportEvent) => {
84+
const notDeliveredToListeners = this.transactionalEventEmitter.getListeners(inboxOutboxTransportEvent.eventName).filter((listener) => {
85+
return !inboxOutboxTransportEvent.deliveredToListeners.includes(listener.getName());
86+
});
87+
88+
const processingPromise = this.inboxOutboxEventProcessor.process(
89+
this.options.events.find((event) => event.name === inboxOutboxTransportEvent.eventName),
90+
inboxOutboxTransportEvent,
91+
notDeliveredToListeners,
92+
);
93+
94+
this.inFlightProcessing.add(processingPromise);
95+
processingPromise.finally(() => {
96+
this.inFlightProcessing.delete(processingPromise);
97+
});
98+
99+
return processingPromise;
100+
});
63101

64-
return this.inboxOutboxEventProcessor.process(
65-
this.options.events.find((event) => event.name === inboxOutboxTransportEvent.eventName),
66-
inboxOutboxTransportEvent,
67-
notDeliveredToListeners,
68-
);
69-
}),
70-
);
102+
return Promise.allSettled(processingPromises);
71103
}
72104
}
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
import { Logger } from '@nestjs/common';
2+
import { DatabaseDriverFactory } from '../../driver/database-driver.factory';
3+
import { DatabaseDriver } from '../../driver/database.driver';
4+
import { TransactionalEventEmitter } from '../../emitter/transactional-event-emitter';
5+
import { InboxOutboxModuleOptions } from '../../inbox-outbox.module-definition';
6+
import { RetryableInboxOutboxEventPoller } from '../../poller/retryable-inbox-outbox-event.poller';
7+
import { InboxOutboxEventProcessorContract } from '../../processor/inbox-outbox-event-processor.contract';
8+
import { EventConfigurationResolver } from '../../resolver/event-configuration.resolver';
9+
import { createMockedDriverFactory } from './mock/driver-factory.mock';
10+
import { createMockedDriver } from './mock/driver.mock';
11+
import { createMockedInboxOutboxOptionsFactory } from './mock/inbox-outbox-options.mock';
12+
13+
describe('RetryableInboxOutboxEventPoller', () => {
14+
let mockedDriver: DatabaseDriver;
15+
let mockedDriverFactory: DatabaseDriverFactory;
16+
let inboxOutboxOptions: InboxOutboxModuleOptions;
17+
let mockLogger: Logger;
18+
let mockTransactionalEventEmitter: TransactionalEventEmitter;
19+
let mockEventConfigurationResolver: EventConfigurationResolver;
20+
let mockInboxOutboxEventProcessor: InboxOutboxEventProcessorContract;
21+
22+
beforeEach(() => {
23+
jest.useFakeTimers();
24+
mockedDriver = createMockedDriver();
25+
mockedDriverFactory = createMockedDriverFactory(mockedDriver);
26+
inboxOutboxOptions = createMockedInboxOutboxOptionsFactory(mockedDriverFactory, [
27+
{
28+
name: 'testEvent',
29+
listeners: {
30+
expiresAtTTL: 1000,
31+
readyToRetryAfterTTL: 1000,
32+
maxExecutionTimeTTL: 1000,
33+
},
34+
},
35+
]);
36+
mockLogger = {
37+
log: jest.fn(),
38+
error: jest.fn(),
39+
warn: jest.fn(),
40+
debug: jest.fn(),
41+
} as unknown as Logger;
42+
43+
mockTransactionalEventEmitter = {
44+
getListeners: jest.fn().mockReturnValue([]),
45+
} as unknown as TransactionalEventEmitter;
46+
47+
mockEventConfigurationResolver = {} as EventConfigurationResolver;
48+
49+
mockInboxOutboxEventProcessor = {
50+
process: jest.fn().mockResolvedValue(undefined),
51+
};
52+
});
53+
54+
afterEach(() => {
55+
jest.useRealTimers();
56+
});
57+
58+
function createPoller() {
59+
return new RetryableInboxOutboxEventPoller(
60+
inboxOutboxOptions,
61+
mockedDriverFactory,
62+
mockInboxOutboxEventProcessor,
63+
mockTransactionalEventEmitter,
64+
mockEventConfigurationResolver,
65+
mockLogger,
66+
);
67+
}
68+
69+
describe('onModuleDestroy', () => {
70+
it('should unsubscribe from interval on shutdown', async () => {
71+
const poller = createPoller();
72+
await poller.onModuleInit();
73+
74+
await poller.onModuleDestroy();
75+
76+
expect(mockLogger.log).toHaveBeenCalledWith('Shutting down RetryableInboxOutboxEventPoller...');
77+
expect(mockLogger.log).toHaveBeenCalledWith('RetryableInboxOutboxEventPoller shutdown complete.');
78+
});
79+
80+
it('should stop polling after shutdown is initiated', async () => {
81+
(mockedDriver.findAndExtendReadyToRetryEvents as jest.Mock).mockResolvedValue([]);
82+
const poller = createPoller();
83+
await poller.onModuleInit();
84+
85+
jest.advanceTimersByTime(inboxOutboxOptions.retryEveryMilliseconds);
86+
await Promise.resolve();
87+
88+
const callCountBeforeShutdown = (mockedDriver.findAndExtendReadyToRetryEvents as jest.Mock).mock.calls.length;
89+
90+
await poller.onModuleDestroy();
91+
92+
jest.advanceTimersByTime(inboxOutboxOptions.retryEveryMilliseconds * 5);
93+
await Promise.resolve();
94+
95+
const callCountAfterShutdown = (mockedDriver.findAndExtendReadyToRetryEvents as jest.Mock).mock.calls.length;
96+
expect(callCountAfterShutdown).toBe(callCountBeforeShutdown);
97+
});
98+
99+
it('should wait for in-flight processing to complete before shutdown', async () => {
100+
let resolveProcessing: () => void;
101+
const processingPromise = new Promise<void>((resolve) => {
102+
resolveProcessing = resolve;
103+
});
104+
105+
(mockInboxOutboxEventProcessor.process as jest.Mock).mockReturnValue(processingPromise);
106+
107+
const mockEvent = {
108+
id: 1,
109+
eventName: 'testEvent',
110+
eventPayload: {},
111+
deliveredToListeners: [],
112+
readyToRetryAfter: Date.now(),
113+
expireAt: Date.now() + 1000,
114+
insertedAt: Date.now(),
115+
};
116+
(mockedDriver.findAndExtendReadyToRetryEvents as jest.Mock).mockResolvedValue([mockEvent]);
117+
118+
const poller = createPoller();
119+
await poller.onModuleInit();
120+
121+
jest.advanceTimersByTime(inboxOutboxOptions.retryEveryMilliseconds);
122+
await Promise.resolve();
123+
await Promise.resolve();
124+
125+
const shutdownPromise = poller.onModuleDestroy();
126+
127+
expect(mockLogger.log).toHaveBeenCalledWith('Shutting down RetryableInboxOutboxEventPoller...');
128+
expect(mockLogger.log).toHaveBeenCalledWith(expect.stringContaining('Waiting for'));
129+
130+
let shutdownCompleted = false;
131+
shutdownPromise.then(() => {
132+
shutdownCompleted = true;
133+
});
134+
135+
await Promise.resolve();
136+
expect(shutdownCompleted).toBe(false);
137+
138+
resolveProcessing!();
139+
await shutdownPromise;
140+
141+
expect(mockLogger.log).toHaveBeenCalledWith('All in-flight events completed.');
142+
expect(mockLogger.log).toHaveBeenCalledWith('RetryableInboxOutboxEventPoller shutdown complete.');
143+
});
144+
145+
it('should handle shutdown when no in-flight processing exists', async () => {
146+
(mockedDriver.findAndExtendReadyToRetryEvents as jest.Mock).mockResolvedValue([]);
147+
const poller = createPoller();
148+
await poller.onModuleInit();
149+
150+
await poller.onModuleDestroy();
151+
152+
expect(mockLogger.log).toHaveBeenCalledWith('Shutting down RetryableInboxOutboxEventPoller...');
153+
expect(mockLogger.log).not.toHaveBeenCalledWith(expect.stringContaining('Waiting for'));
154+
expect(mockLogger.log).toHaveBeenCalledWith('RetryableInboxOutboxEventPoller shutdown complete.');
155+
});
156+
157+
it('should handle shutdown gracefully even if called before onModuleInit', async () => {
158+
const poller = createPoller();
159+
160+
await poller.onModuleDestroy();
161+
162+
expect(mockLogger.log).toHaveBeenCalledWith('Shutting down RetryableInboxOutboxEventPoller...');
163+
expect(mockLogger.log).toHaveBeenCalledWith('RetryableInboxOutboxEventPoller shutdown complete.');
164+
});
165+
});
166+
});

0 commit comments

Comments
 (0)