Skip to content

Commit 601c725

Browse files
jtomaszewskiclaude
andcommitted
feat(core): add immediateProcessing option to event configuration
Add optional `immediateProcessing` boolean to InboxOutboxModuleEventOptions (default: true for backward compatibility). When set to false, events are only saved to DB and processed later by the poller, enabling a safer "fire and forget" pattern for crash recovery scenarios. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
1 parent a7a0386 commit 601c725

File tree

4 files changed

+137
-0
lines changed

4 files changed

+137
-0
lines changed

packages/core/README.md

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,31 @@ await this.transactionalEventEmitter.emitAsync(
154154

155155
> **Note:** Use `emitAsync` if you need to wait for listeners to execute and complete before moving on. Use `emit` if you want to fire-and-forget the event delivery.
156156
157+
### Immediate vs Deferred Processing
158+
159+
By default, events are immediately processed after being saved to the database. You can disable this behavior per-event using the `immediateProcessing` option:
160+
161+
```typescript
162+
{
163+
name: UserApplicationAssignedEvent.name,
164+
listeners: {
165+
expiresAtTTL: 1000 * 60 * 60 * 24,
166+
maxExecutionTimeTTL: 1000 * 15,
167+
readyToRetryAfterTTL: 10000,
168+
},
169+
immediateProcessing: false, // Only save to DB, process later via poller
170+
}
171+
```
172+
173+
**Trade-offs:**
174+
175+
| Immediate Processing (`true`, default) | Deferred Processing (`false`) |
176+
|----------------------------------------|-------------------------------|
177+
| Lower latency for listeners | Higher latency (waits for poller) |
178+
| Best effort delivery on first attempt | All delivery via poller |
179+
| If app crashes during processing, event is still in DB for retry | Safer for crash recovery - no in-flight processing |
180+
| Suitable for most use cases | Suitable for "fire and forget" pattern |
181+
157182
### Event Contract:
158183
Ensure that your event classes implement the `InboxOutboxEvent` interface for consistency and clarity.
159184

@@ -165,6 +190,7 @@ Ensure that your event classes implement the `InboxOutboxEvent` interface for co
165190
- **readyToRetryAfterTTL**: This is how long it will wait before retrying the event listeners
166191
- **retryEveryMilliseconds**: This is how often it will check for events that need to be retried
167192
- **maxInboxOutboxTransportEventPerRetry**: This is how many events it will retry at a time
193+
- **immediateProcessing** (optional, default: `true`): Whether to immediately process the event after saving to DB. When `true`, events are saved and immediately delivered to listeners. When `false`, events are only saved to DB and processed later by the poller (fire-and-forget pattern)
168194

169195
#### Registration
170196
- Register the `InboxOutboxModule` within your application's bootstrap process, specifying global accessibility and event configurations.

packages/core/src/emitter/transactional-event-emitter.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,10 @@ export class TransactionalEventEmitter {
6161
persister.persist(inboxOutboxTransportEvent);
6262
await persister.flush();
6363

64+
if (eventOptions.immediateProcessing === false) {
65+
return;
66+
}
67+
6468
if (awaitProcessor) {
6569
await this.inboxOutboxEventProcessor.process(eventOptions, inboxOutboxTransportEvent, this.getListeners(event.name));
6670
return;

packages/core/src/inbox-outbox.module-definition.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,14 @@ export interface InboxOutboxModuleEventOptions {
88
readyToRetryAfterTTL: number;
99
maxExecutionTimeTTL: number;
1010
};
11+
/**
12+
* Whether to immediately process the event after saving to DB.
13+
* When true (default), events are saved and immediately delivered to listeners.
14+
* When false, events are only saved to DB and processed later by the poller.
15+
* Use false for "fire and forget" pattern that's safer for crash recovery.
16+
* @default true
17+
*/
18+
immediateProcessing?: boolean;
1119
}
1220

1321
export interface InboxOutboxModuleOptions {

packages/core/src/test/unit/transaction-event-emitter.spec.ts

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,4 +285,103 @@ describe('TransacationalEventEmitter', () => {
285285

286286
expect(transactionalEventEmitter.getEventNames()).toContain('eventName');
287287
})
288+
289+
it('Should not call process when immediateProcessing is false', async () => {
290+
inboxOutboxOptions.events = [
291+
{
292+
name: 'newEvent',
293+
listeners: {
294+
expiresAtTTL: 1000,
295+
readyToRetryAfterTTL: 1000,
296+
maxExecutionTimeTTL: 1000,
297+
},
298+
immediateProcessing: false,
299+
},
300+
];
301+
302+
const transactionalEventEmitter = new TransactionalEventEmitter(inboxOutboxOptions, mockedDriverFactory, mockedInboxOutboxEventProcessor, mockedEventConfigurationResolver);
303+
304+
const newEvent = {
305+
name: 'newEvent',
306+
};
307+
308+
await transactionalEventEmitter.emit(newEvent, []);
309+
310+
expect(mockedDriver.persist).toHaveBeenCalledTimes(1);
311+
expect(mockedDriver.flush).toHaveBeenCalled();
312+
expect(mockedInboxOutboxEventProcessor.process).not.toHaveBeenCalled();
313+
});
314+
315+
it('Should not call process when immediateProcessing is false with emitAsync', async () => {
316+
inboxOutboxOptions.events = [
317+
{
318+
name: 'newEvent',
319+
listeners: {
320+
expiresAtTTL: 1000,
321+
readyToRetryAfterTTL: 1000,
322+
maxExecutionTimeTTL: 1000,
323+
},
324+
immediateProcessing: false,
325+
},
326+
];
327+
328+
const transactionalEventEmitter = new TransactionalEventEmitter(inboxOutboxOptions, mockedDriverFactory, mockedInboxOutboxEventProcessor, mockedEventConfigurationResolver);
329+
330+
const newEvent = {
331+
name: 'newEvent',
332+
};
333+
334+
await transactionalEventEmitter.emitAsync(newEvent, []);
335+
336+
expect(mockedDriver.persist).toHaveBeenCalledTimes(1);
337+
expect(mockedDriver.flush).toHaveBeenCalled();
338+
expect(mockedInboxOutboxEventProcessor.process).not.toHaveBeenCalled();
339+
});
340+
341+
it('Should call process when immediateProcessing is true', async () => {
342+
inboxOutboxOptions.events = [
343+
{
344+
name: 'newEvent',
345+
listeners: {
346+
expiresAtTTL: 1000,
347+
readyToRetryAfterTTL: 1000,
348+
maxExecutionTimeTTL: 1000,
349+
},
350+
immediateProcessing: true,
351+
},
352+
];
353+
354+
const transactionalEventEmitter = new TransactionalEventEmitter(inboxOutboxOptions, mockedDriverFactory, mockedInboxOutboxEventProcessor, mockedEventConfigurationResolver);
355+
356+
const newEvent = {
357+
name: 'newEvent',
358+
};
359+
360+
await transactionalEventEmitter.emit(newEvent, []);
361+
362+
expect(mockedInboxOutboxEventProcessor.process).toHaveBeenCalledTimes(1);
363+
});
364+
365+
it('Should call process when immediateProcessing is undefined (default behavior)', async () => {
366+
inboxOutboxOptions.events = [
367+
{
368+
name: 'newEvent',
369+
listeners: {
370+
expiresAtTTL: 1000,
371+
readyToRetryAfterTTL: 1000,
372+
maxExecutionTimeTTL: 1000,
373+
},
374+
},
375+
];
376+
377+
const transactionalEventEmitter = new TransactionalEventEmitter(inboxOutboxOptions, mockedDriverFactory, mockedInboxOutboxEventProcessor, mockedEventConfigurationResolver);
378+
379+
const newEvent = {
380+
name: 'newEvent',
381+
};
382+
383+
await transactionalEventEmitter.emit(newEvent, []);
384+
385+
expect(mockedInboxOutboxEventProcessor.process).toHaveBeenCalledTimes(1);
386+
});
288387
});

0 commit comments

Comments
 (0)