@@ -11,38 +11,50 @@ import { uuidv7 } from 'uuidv7'
1111import type { OutboxAccumulator } from './accumulators'
1212import type { OutboxStorage } from './storage'
1313
14+ export type OutboxDependencies < SupportedEvents extends CommonEventDefinition [ ] > = {
15+ outboxStorage : OutboxStorage < SupportedEvents >
16+ outboxAccumulator : OutboxAccumulator < SupportedEvents >
17+ eventEmitter : DomainEventEmitter < SupportedEvents >
18+ }
19+
20+ export type OutboxConfiguration = {
21+ maxRetryCount : number
22+ emitBatchSize : number
23+ jobIntervalInMs : number
24+ }
25+
1426/**
1527 * Main logic for handling outbox entries.
1628 *
1729 * If entry is rejected, it is NOT going to be handled during the same execution. Next execution will pick it up.
1830 */
1931export class OutboxProcessor < SupportedEvents extends CommonEventDefinition [ ] > {
2032 constructor (
21- private readonly outboxStorage : OutboxStorage < SupportedEvents > ,
22- private readonly outboxAccumulator : OutboxAccumulator < SupportedEvents > ,
23- private readonly eventEmitter : DomainEventEmitter < SupportedEvents > ,
33+ private readonly outboxDependencies : OutboxDependencies < SupportedEvents > ,
2434 private readonly maxRetryCount : number ,
2535 private readonly emitBatchSize : number ,
2636 ) { }
2737
2838 public async processOutboxEntries ( context : JobExecutionContext ) {
29- const entries = await this . outboxStorage . getEntries ( this . maxRetryCount )
39+ const { outboxStorage, eventEmitter, outboxAccumulator } = this . outboxDependencies
40+
41+ const entries = await outboxStorage . getEntries ( this . maxRetryCount )
3042
3143 await PromisePool . for ( entries )
3244 . withConcurrency ( this . emitBatchSize )
3345 . process ( async ( entry ) => {
3446 try {
35- await this . eventEmitter . emit ( entry . event , entry . data , entry . precedingMessageMetadata )
36- await this . outboxAccumulator . add ( entry )
47+ await eventEmitter . emit ( entry . event , entry . data , entry . precedingMessageMetadata )
48+ await outboxAccumulator . add ( entry )
3749 } catch ( e ) {
3850 context . logger . error ( { error : e } , 'Failed to process outbox entry.' )
3951
40- await this . outboxAccumulator . addFailure ( entry )
52+ await outboxAccumulator . addFailure ( entry )
4153 }
4254 } )
4355
44- await this . outboxStorage . flush ( this . outboxAccumulator )
45- await this . outboxAccumulator . clear ( )
56+ await outboxStorage . flush ( outboxAccumulator )
57+ await outboxAccumulator . clear ( )
4658 }
4759}
4860
@@ -59,19 +71,15 @@ export class OutboxPeriodicJob<
5971 private readonly outboxProcessor : OutboxProcessor < SupportedEvents >
6072
6173 constructor (
62- outboxStorage : OutboxStorage < SupportedEvents > ,
63- outboxAccumulator : OutboxAccumulator < SupportedEvents > ,
64- eventEmitter : DomainEventEmitter < SupportedEvents > ,
74+ outboxDependencies : OutboxDependencies < SupportedEvents > ,
75+ outboxConfiguration : OutboxConfiguration ,
6576 dependencies : PeriodicJobDependencies ,
66- maxRetryCount : number ,
67- emitBatchSize : number ,
68- intervalInMs : number ,
6977 ) {
7078 super (
7179 {
7280 jobId : 'OutboxJob' ,
7381 schedule : {
74- intervalInMs : intervalInMs ,
82+ intervalInMs : outboxConfiguration . jobIntervalInMs ,
7583 } ,
7684 singleConsumerMode : {
7785 enabled : true ,
@@ -87,11 +95,9 @@ export class OutboxPeriodicJob<
8795 )
8896
8997 this . outboxProcessor = new OutboxProcessor < SupportedEvents > (
90- outboxStorage ,
91- outboxAccumulator ,
92- eventEmitter ,
93- maxRetryCount ,
94- emitBatchSize ,
98+ outboxDependencies ,
99+ outboxConfiguration . maxRetryCount ,
100+ outboxConfiguration . emitBatchSize ,
95101 )
96102 }
97103
0 commit comments