Skip to content

Commit 437d892

Browse files
committed
Added task processing
1 parent 8c53ffe commit 437d892

File tree

10 files changed

+917
-34
lines changed

10 files changed

+917
-34
lines changed

src/packages/emmett-postgresql/src/eventStore/projections/postgresProjection.multi.int.spec.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import type { ReadEvent } from '@event-driven-io/emmett/src';
1+
import type { ReadEvent } from '@event-driven-io/emmett';
22
import {
33
PostgreSqlContainer,
44
StartedPostgreSqlContainer,

src/packages/emmett-testcontainers/src/eventStore/eventStoreDBContainer.ts

Lines changed: 39 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { InProcessLock } from '@event-driven-io/emmett';
12
import { EventStoreDBClient } from '@eventstore/db-client';
23
import {
34
AbstractStartedContainer,
@@ -9,8 +10,8 @@ import type { Environment } from 'testcontainers/build/types';
910

1011
export const EVENTSTOREDB_PORT = 2113;
1112
export const EVENTSTOREDB_IMAGE_NAME = 'eventstore/eventstore';
12-
export const EVENTSTOREDB_IMAGE_TAG = '23.10.1-bookworm-slim';
13-
export const EVENTSTOREDB_ARM64_IMAGE_TAG = '23.10.1-alpha-arm64v8';
13+
export const EVENTSTOREDB_IMAGE_TAG = '24.10.0-bookworm-slim';
14+
export const EVENTSTOREDB_ARM64_IMAGE_TAG = '24.10.0-alpha-arm64v8';
1415

1516
export const EVENTSTOREDB_DEFAULT_IMAGE = `${EVENTSTOREDB_IMAGE_NAME}:${process.arch !== 'arm64' ? EVENTSTOREDB_IMAGE_TAG : EVENTSTOREDB_ARM64_IMAGE_TAG}`;
1617

@@ -86,44 +87,52 @@ export class StartedEventStoreDBContainer extends AbstractStartedContainer {
8687
return EventStoreDBClient.connectionString(this.getConnectionString());
8788
}
8889
}
90+
8991
let container: EventStoreDBContainer | null = null;
9092
let startedContainer: StartedEventStoreDBContainer | null = null;
9193
let startedCount = 0;
94+
const lock = InProcessLock();
9295

93-
export const getSharedEventStoreDBTestContainer = async () => {
94-
if (startedContainer) return startedContainer;
96+
export const getSharedEventStoreDBTestContainer = () =>
97+
lock.withAcquire(
98+
async () => {
99+
if (startedContainer) return startedContainer;
95100

96-
if (!container)
97-
container = new EventStoreDBContainer(EVENTSTOREDB_DEFAULT_IMAGE, {
98-
withReuse: true,
99-
});
101+
if (!container)
102+
container = new EventStoreDBContainer(EVENTSTOREDB_DEFAULT_IMAGE);
100103

101-
startedContainer = await container.start();
102-
startedCount++;
104+
startedContainer = await container.start();
105+
startedCount++;
103106

104-
container.withLogConsumer((stream) =>
105-
stream
106-
.on('data', (line) => console.log(line))
107-
.on('err', (line) => console.error(line))
108-
.on('end', () => console.log('Stream closed')),
109-
);
107+
container.withLogConsumer((stream) =>
108+
stream
109+
.on('data', (line) => console.log(line))
110+
.on('err', (line) => console.error(line))
111+
.on('end', () => console.log('Stream closed')),
112+
);
110113

111-
return startedContainer;
112-
};
114+
return startedContainer;
115+
},
116+
{ lockId: 'SharedEventStoreDBTestContainer' },
117+
);
113118

114119
export const getSharedTestEventStoreDBClient = async () => {
115120
return (await getSharedEventStoreDBTestContainer()).getClient();
116121
};
117122

118-
export const releaseSharedEventStoreDBTestContainer = async () => {
119-
const containerToStop = startedContainer;
120-
if (containerToStop && --startedCount === 0) {
121-
try {
122-
startedContainer = null;
123-
container = null;
124-
await containerToStop.stop();
125-
} catch {
126-
/* do nothing */
127-
}
128-
}
129-
};
123+
export const releaseSharedEventStoreDBTestContainer = () =>
124+
lock.withAcquire(
125+
async () => {
126+
const containerToStop = startedContainer;
127+
if (containerToStop && --startedCount === 0) {
128+
try {
129+
startedContainer = null;
130+
container = null;
131+
await containerToStop.stop();
132+
} catch {
133+
/* do nothing */
134+
}
135+
}
136+
},
137+
{ lockId: 'SharedEventStoreDBTestContainer' },
138+
);

src/packages/emmett-tests/src/eventStore/esdb/eventstoreDBEventStore.e2e.spec.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
//import { streamTransformations, type Event } from '@event-driven-io/emmett';
22
import { getEventStoreDBEventStore } from '@event-driven-io/emmett-esdb';
33
import {
4-
EventStoreDBContainer,
4+
getSharedEventStoreDBTestContainer,
5+
releaseSharedEventStoreDBTestContainer,
56
StartedEventStoreDBContainer,
67
} from '@event-driven-io/emmett-testcontainers';
78
import { describe } from 'node:test';
@@ -15,12 +16,12 @@ void describe('EventStoreDBEventStore', async () => {
1516
let esdbContainer: StartedEventStoreDBContainer;
1617

1718
const eventStoreFactory: EventStoreFactory = async () => {
18-
esdbContainer = await new EventStoreDBContainer().start();
19+
esdbContainer = await getSharedEventStoreDBTestContainer();
1920
return getEventStoreDBEventStore(esdbContainer.getClient());
2021
};
2122

2223
const teardownHook = async () => {
23-
await esdbContainer.stop();
24+
await releaseSharedEventStoreDBTestContainer();
2425
};
2526

2627
await testAggregateStream(eventStoreFactory, {

src/packages/emmett/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ export * from './projections';
77
export * from './serialization';
88
export * from './streaming';
99
export * from './subscriptions';
10+
export * from './taskProcessing';
1011
export * from './testing';
1112
export * from './typing';
1213
export * from './utils';
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
export * from './taskProcessor';
Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
import { EmmettError } from '../errors';
2+
3+
export type TaskQueue = TaskQueueItem[];
4+
5+
export type TaskQueueItem = {
6+
task: () => Promise<void>;
7+
options?: EnqueueTaskOptions;
8+
};
9+
10+
export type TaskProcessorOptions = {
11+
maxActiveTasks: number;
12+
maxQueueSize: number;
13+
maxTaskIdleTime?: number;
14+
};
15+
16+
export type Task<T> = (context: TaskContext) => Promise<T>;
17+
18+
export type TaskContext = {
19+
ack: () => void;
20+
};
21+
22+
export type EnqueueTaskOptions = { taskGroupId?: string };
23+
24+
export class TaskProcessor {
25+
private queue: TaskQueue = [];
26+
private isProcessing = false;
27+
private activeTasks = 0;
28+
private activeGroups: Set<string> = new Set();
29+
30+
constructor(private options: TaskProcessorOptions) {}
31+
32+
enqueue<T>(task: Task<T>, options?: EnqueueTaskOptions): Promise<T> {
33+
if (this.queue.length >= this.options.maxQueueSize) {
34+
return Promise.reject(
35+
new EmmettError(
36+
'Too many pending connections. Please try again later.',
37+
),
38+
);
39+
}
40+
41+
return this.schedule(task, options);
42+
}
43+
44+
waitForEndOfProcessing(): Promise<void> {
45+
return this.schedule(({ ack }) => Promise.resolve(ack()));
46+
}
47+
48+
private schedule<T>(task: Task<T>, options?: EnqueueTaskOptions): Promise<T> {
49+
return promiseWithDeadline(
50+
(resolve, reject) => {
51+
const taskWithContext = () => {
52+
return new Promise<void>((resolveTask, failTask) => {
53+
const taskPromise = task({
54+
ack: resolveTask,
55+
});
56+
57+
taskPromise.then(resolve).catch((err) => {
58+
// eslint-disable-next-line @typescript-eslint/prefer-promise-reject-errors
59+
failTask(err);
60+
reject(err);
61+
});
62+
});
63+
};
64+
65+
this.queue.push({ task: taskWithContext, options });
66+
if (!this.isProcessing) {
67+
this.ensureProcessing();
68+
}
69+
},
70+
{ deadline: this.options.maxTaskIdleTime },
71+
);
72+
}
73+
74+
private ensureProcessing(): void {
75+
if (this.isProcessing) return;
76+
this.isProcessing = true;
77+
this.processQueue();
78+
}
79+
80+
private processQueue(): void {
81+
try {
82+
while (
83+
this.activeTasks < this.options.maxActiveTasks &&
84+
this.queue.length > 0
85+
) {
86+
const item = this.takeFirstAvailableItem();
87+
88+
if (item === null) return;
89+
90+
const groupId = item.options?.taskGroupId;
91+
92+
if (groupId) {
93+
// Mark the group as active
94+
this.activeGroups.add(groupId);
95+
}
96+
97+
this.activeTasks++;
98+
void this.executeItem(item);
99+
}
100+
} catch (error) {
101+
console.error(error);
102+
throw error;
103+
} finally {
104+
this.isProcessing = false;
105+
if (
106+
this.hasItemsToProcess() &&
107+
this.activeTasks < this.options.maxActiveTasks
108+
) {
109+
this.ensureProcessing();
110+
}
111+
}
112+
}
113+
114+
private async executeItem({ task, options }: TaskQueueItem): Promise<void> {
115+
try {
116+
await task();
117+
} finally {
118+
this.activeTasks--;
119+
120+
// Mark the group as inactive after task completion
121+
if (options && options.taskGroupId) {
122+
this.activeGroups.delete(options.taskGroupId);
123+
}
124+
125+
this.ensureProcessing();
126+
}
127+
}
128+
129+
private takeFirstAvailableItem = (): TaskQueueItem | null => {
130+
const taskIndex = this.queue.findIndex(
131+
(item) =>
132+
!item.options?.taskGroupId ||
133+
!this.activeGroups.has(item.options.taskGroupId),
134+
);
135+
136+
if (taskIndex === -1) {
137+
// All remaining tasks are blocked by active groups
138+
return null;
139+
}
140+
141+
// Remove the task from the queue
142+
const [item] = this.queue.splice(taskIndex, 1);
143+
144+
return item ?? null;
145+
};
146+
147+
private hasItemsToProcess = (): boolean =>
148+
this.queue.findIndex(
149+
(item) =>
150+
!item.options?.taskGroupId ||
151+
!this.activeGroups.has(item.options.taskGroupId),
152+
) !== -1;
153+
}
154+
155+
const DEFAULT_PROMISE_DEADLINE = 2147483647;
156+
157+
const promiseWithDeadline = <T>(
158+
executor: (
159+
resolve: (value: T | PromiseLike<T>) => void,
160+
reject: (reason?: unknown) => void,
161+
) => void,
162+
options: { deadline?: number },
163+
) => {
164+
return new Promise<T>((resolve, reject) => {
165+
let taskStarted = false;
166+
167+
const maxWaitingTime = options.deadline || DEFAULT_PROMISE_DEADLINE;
168+
169+
let timeoutId: NodeJS.Timeout | null = setTimeout(() => {
170+
if (!taskStarted) {
171+
reject(
172+
new Error('Task was not started within the maximum waiting time'),
173+
);
174+
}
175+
}, maxWaitingTime);
176+
177+
executor((value) => {
178+
taskStarted = true;
179+
if (timeoutId) {
180+
clearTimeout(timeoutId);
181+
}
182+
timeoutId = null;
183+
resolve(value);
184+
}, reject);
185+
});
186+
};

0 commit comments

Comments
 (0)