Skip to content

Commit e577358

Browse files
committed
Added hook to be able to detect if schema was created already when using event store session (e.g. in command handler)
1 parent 5e6bf09 commit e577358

File tree

3 files changed

+100
-14
lines changed

3 files changed

+100
-14
lines changed

src/packages/emmett-postgresql/src/eventStore/postgreSQLEventStore.e2e.spec.ts

Lines changed: 70 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import {
1111
PostgreSqlContainer,
1212
StartedPostgreSqlContainer,
1313
} from '@testcontainers/postgresql';
14-
import { after, before, describe, it } from 'node:test';
14+
import { after, afterEach, before, beforeEach, describe, it } from 'node:test';
1515
import { v4 as uuid } from 'uuid';
1616
import {
1717
getPostgreSQLEventStore,
@@ -26,38 +26,97 @@ void describe('EventStoreDBEventStore', () => {
2626
let connectionString: string;
2727
let pongo: PongoClient;
2828

29+
const productItem: PricedProductItem = {
30+
productId: '123',
31+
quantity: 10,
32+
price: 3,
33+
};
34+
let clientId: string;
35+
let shoppingCartId: string;
36+
let schemaHookCreationHookCalls = 0;
37+
2938
before(async () => {
3039
postgres = await new PostgreSqlContainer().start();
3140
connectionString = postgres.getConnectionUri();
41+
pongo = pongoClient(connectionString);
42+
});
43+
44+
beforeEach(() => {
3245
eventStore = getPostgreSQLEventStore(connectionString, {
3346
projections: projections.inline([
3447
shoppingCartShortInfoProjection,
3548
customProjection,
3649
]),
50+
hooks: {
51+
onAfterSchemaCreated: () => {
52+
schemaHookCreationHookCalls++;
53+
},
54+
},
3755
});
38-
pongo = pongoClient(connectionString);
39-
return eventStore;
56+
clientId = uuid();
57+
shoppingCartId = `shopping_cart-${clientId}`;
58+
schemaHookCreationHookCalls = 0;
4059
});
4160

42-
after(async () => {
61+
afterEach(async () => {
4362
try {
4463
await eventStore.close();
64+
} catch (error) {
65+
console.log(error);
66+
}
67+
});
68+
69+
after(async () => {
70+
try {
4571
await pongo.close();
4672
await postgres.stop();
4773
} catch (error) {
4874
console.log(error);
4975
}
5076
});
5177

78+
void it('should create schema only once ', async () => {
79+
await eventStore.appendToStream<ShoppingCartEvent>(shoppingCartId, [
80+
{
81+
type: 'ProductItemAdded',
82+
data: { productItem },
83+
metadata: { clientId },
84+
},
85+
]);
86+
await eventStore.appendToStream<ShoppingCartEvent>(shoppingCartId, [
87+
{
88+
type: 'ProductItemAdded',
89+
data: { productItem },
90+
metadata: { clientId },
91+
},
92+
]);
93+
94+
assertEqual(1, schemaHookCreationHookCalls);
95+
});
96+
97+
void it('should create schema only once with session', async () => {
98+
await eventStore.appendToStream<ShoppingCartEvent>(shoppingCartId, [
99+
{
100+
type: 'ProductItemAdded',
101+
data: { productItem },
102+
metadata: { clientId },
103+
},
104+
]);
105+
await eventStore.withSession(({ eventStore: session }) =>
106+
session.appendToStream<ShoppingCartEvent>(shoppingCartId, [
107+
{
108+
type: 'ProductItemAdded',
109+
data: { productItem },
110+
metadata: { clientId },
111+
},
112+
]),
113+
);
114+
115+
assertEqual(1, schemaHookCreationHookCalls);
116+
});
117+
52118
void it('should append events correctly using appendEvent function', async () => {
53-
const productItem: PricedProductItem = {
54-
productId: '123',
55-
quantity: 10,
56-
price: 3,
57-
};
58119
const discount = 10;
59-
const clientId = uuid();
60-
const shoppingCartId = `shopping_cart-${clientId}`;
61120
handledEventsInCustomProjection = [];
62121

63122
await eventStore.appendToStream<ShoppingCartEvent>(shoppingCartId, [

src/packages/emmett-postgresql/src/eventStore/postgreSQLEventStore.ts

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,12 @@ export type PostgresEventStoreOptions = {
143143
>[];
144144
schema?: { autoMigration?: MigrationStyle };
145145
connectionOptions?: PostgresEventStoreConnectionOptions;
146+
hooks?: {
147+
/**
148+
* This hook will be called **AFTER** event store schema was created
149+
*/
150+
onAfterSchemaCreated?: () => Promise<void> | void;
151+
};
146152
};
147153

148154
export const defaultPostgreSQLOptions: PostgresEventStoreOptions = {
@@ -161,7 +167,7 @@ export const getPostgreSQLEventStore = (
161167
...(options.connectionOptions ? options.connectionOptions : {}),
162168
};
163169
const pool = 'dumbo' in poolOptions ? poolOptions.dumbo : dumbo(poolOptions);
164-
let migrateSchema: Promise<void>;
170+
let migrateSchema: Promise<void> | undefined = undefined;
165171

166172
const autoGenerateSchema =
167173
options.schema?.autoMigration === undefined ||
@@ -171,7 +177,11 @@ export const getPostgreSQLEventStore = (
171177
if (!autoGenerateSchema) return Promise.resolve();
172178

173179
if (!migrateSchema) {
174-
migrateSchema = createEventStoreSchema(pool);
180+
migrateSchema = createEventStoreSchema(pool).then(async () => {
181+
if (options.hooks?.onAfterSchemaCreated) {
182+
await options.hooks.onAfterSchemaCreated();
183+
}
184+
});
175185
}
176186
return migrateSchema;
177187
};
@@ -301,11 +311,28 @@ export const getPostgreSQLEventStore = (
301311
callback: (session: EventStoreSession<PostgresEventStore>) => Promise<T>,
302312
): Promise<T> {
303313
return await pool.withConnection(async (connection) => {
314+
const autoMigration: MigrationStyle = migrateSchema
315+
? 'None'
316+
: (options.schema?.autoMigration ?? 'CreateOrUpdate');
317+
304318
const storeOptions: PostgresEventStoreOptions = {
305319
...options,
306320
connectionOptions: {
307321
connection,
308322
},
323+
schema: {
324+
autoMigration,
325+
},
326+
hooks: {
327+
...(options.hooks ?? {}),
328+
onAfterSchemaCreated: async () => {
329+
migrateSchema = Promise.resolve();
330+
331+
if (options.hooks?.onAfterSchemaCreated) {
332+
await options.hooks.onAfterSchemaCreated();
333+
}
334+
},
335+
},
309336
};
310337

311338
const eventStore = getPostgreSQLEventStore(

src/packages/emmett-postgresql/src/eventStore/schema/tables.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ export const streamsTableSQL = rawSql(
1818
PRIMARY KEY (stream_id, partition, is_archived)
1919
) PARTITION BY LIST (partition);
2020
21-
CREATE UNIQUE INDEX idx_streams_unique
21+
CREATE UNIQUE INDEX IF NOT EXISTS idx_streams_unique
2222
ON ${streamsTable.name}(stream_id, partition, is_archived)
2323
INCLUDE (stream_position);`,
2424
);

0 commit comments

Comments
 (0)