1- import { AbstractPeriodicJob , type JobExecutionContext } from '@lokalise/background-jobs-common'
21import type { PeriodicJobDependencies } from '@lokalise/background-jobs-common'
2+ import { AbstractPeriodicJob , type JobExecutionContext } from '@lokalise/background-jobs-common'
33import type {
44 CommonEventDefinition ,
55 CommonEventDefinitionPublisherSchemaType ,
@@ -9,6 +9,7 @@ import type {
99import { PromisePool } from '@supercharge/promise-pool'
1010import { uuidv7 } from 'uuidv7'
1111import type { OutboxAccumulator } from './accumulators'
12+ import type { OutboxEntry } from './objects'
1213import type { OutboxStorage } from './storage'
1314
1415export type OutboxDependencies < SupportedEvents extends CommonEventDefinition [ ] > = {
@@ -42,10 +43,8 @@ export class OutboxProcessor<SupportedEvents extends CommonEventDefinition[]> {
4243
4344 const entries = await outboxStorage . getEntries ( this . outboxProcessorConfiguration . maxRetryCount )
4445
45- const currentEntriesInAccumulator = new Set (
46- ( await outboxAccumulator . getEntries ( ) ) . map ( ( entry ) => entry . id ) ,
47- )
48- const filteredEntries = entries . filter ( ( entry ) => ! currentEntriesInAccumulator . has ( entry . id ) )
46+ const filteredEntries =
47+ entries . length === 0 ? entries : await this . getFilteredEntries ( entries , outboxAccumulator )
4948
5049 await PromisePool . for ( filteredEntries )
5150 . withConcurrency ( this . outboxProcessorConfiguration . emitBatchSize )
@@ -63,6 +62,16 @@ export class OutboxProcessor<SupportedEvents extends CommonEventDefinition[]> {
6362 await outboxStorage . flush ( outboxAccumulator )
6463 await outboxAccumulator . clear ( )
6564 }
65+
66+ private async getFilteredEntries (
67+ entries : OutboxEntry < SupportedEvents [ number ] > [ ] ,
68+ outboxAccumulator : OutboxAccumulator < SupportedEvents > ,
69+ ) {
70+ const currentEntriesInAccumulator = new Set (
71+ ( await outboxAccumulator . getEntries ( ) ) . map ( ( entry ) => entry . id ) ,
72+ )
73+ return entries . filter ( ( entry ) => ! currentEntriesInAccumulator . has ( entry . id ) )
74+ }
6675}
6776
6877/**
0 commit comments