Skip to content

Commit 811bc97

Browse files
committed
Merge remote-tracking branch 'upstream/main' into feat/subscriptions
2 parents 13afb1e + e577358 commit 811bc97

File tree

5 files changed

+115
-20
lines changed

5 files changed

+115
-20
lines changed

src/packages/emmett-postgresql/src/eventStore/postgreSQLEventStore.e2e.spec.ts

Lines changed: 70 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import {
1111
PostgreSqlContainer,
1212
StartedPostgreSqlContainer,
1313
} from '@testcontainers/postgresql';
14-
import { after, before, describe, it } from 'node:test';
14+
import { after, afterEach, before, beforeEach, describe, it } from 'node:test';
1515
import { v4 as uuid } from 'uuid';
1616
import {
1717
getPostgreSQLEventStore,
@@ -26,38 +26,97 @@ void describe('EventStoreDBEventStore', () => {
2626
let connectionString: string;
2727
let pongo: PongoClient;
2828

29+
const productItem: PricedProductItem = {
30+
productId: '123',
31+
quantity: 10,
32+
price: 3,
33+
};
34+
let clientId: string;
35+
let shoppingCartId: string;
36+
let schemaHookCreationHookCalls = 0;
37+
2938
before(async () => {
3039
postgres = await new PostgreSqlContainer().start();
3140
connectionString = postgres.getConnectionUri();
41+
pongo = pongoClient(connectionString);
42+
});
43+
44+
beforeEach(() => {
3245
eventStore = getPostgreSQLEventStore(connectionString, {
3346
projections: projections.inline([
3447
shoppingCartShortInfoProjection,
3548
customProjection,
3649
]),
50+
hooks: {
51+
onAfterSchemaCreated: () => {
52+
schemaHookCreationHookCalls++;
53+
},
54+
},
3755
});
38-
pongo = pongoClient(connectionString);
39-
return eventStore;
56+
clientId = uuid();
57+
shoppingCartId = `shopping_cart-${clientId}`;
58+
schemaHookCreationHookCalls = 0;
4059
});
4160

42-
after(async () => {
61+
afterEach(async () => {
4362
try {
4463
await eventStore.close();
64+
} catch (error) {
65+
console.log(error);
66+
}
67+
});
68+
69+
after(async () => {
70+
try {
4571
await pongo.close();
4672
await postgres.stop();
4773
} catch (error) {
4874
console.log(error);
4975
}
5076
});
5177

78+
void it('should create schema only once ', async () => {
79+
await eventStore.appendToStream<ShoppingCartEvent>(shoppingCartId, [
80+
{
81+
type: 'ProductItemAdded',
82+
data: { productItem },
83+
metadata: { clientId },
84+
},
85+
]);
86+
await eventStore.appendToStream<ShoppingCartEvent>(shoppingCartId, [
87+
{
88+
type: 'ProductItemAdded',
89+
data: { productItem },
90+
metadata: { clientId },
91+
},
92+
]);
93+
94+
assertEqual(1, schemaHookCreationHookCalls);
95+
});
96+
97+
void it('should create schema only once with session', async () => {
98+
await eventStore.appendToStream<ShoppingCartEvent>(shoppingCartId, [
99+
{
100+
type: 'ProductItemAdded',
101+
data: { productItem },
102+
metadata: { clientId },
103+
},
104+
]);
105+
await eventStore.withSession(({ eventStore: session }) =>
106+
session.appendToStream<ShoppingCartEvent>(shoppingCartId, [
107+
{
108+
type: 'ProductItemAdded',
109+
data: { productItem },
110+
metadata: { clientId },
111+
},
112+
]),
113+
);
114+
115+
assertEqual(1, schemaHookCreationHookCalls);
116+
});
117+
52118
void it('should append events correctly using appendEvent function', async () => {
53-
const productItem: PricedProductItem = {
54-
productId: '123',
55-
quantity: 10,
56-
price: 3,
57-
};
58119
const discount = 10;
59-
const clientId = uuid();
60-
const shoppingCartId = `shopping_cart-${clientId}`;
61120
handledEventsInCustomProjection = [];
62121

63122
await eventStore.appendToStream<ShoppingCartEvent>(shoppingCartId, [

src/packages/emmett-postgresql/src/eventStore/postgreSQLEventStore.ts

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,12 @@ export type PostgresEventStoreOptions = {
143143
>[];
144144
schema?: { autoMigration?: MigrationStyle };
145145
connectionOptions?: PostgresEventStoreConnectionOptions;
146+
hooks?: {
147+
/**
148+
* This hook will be called **AFTER** event store schema was created
149+
*/
150+
onAfterSchemaCreated?: () => Promise<void> | void;
151+
};
146152
};
147153

148154
export const defaultPostgreSQLOptions: PostgresEventStoreOptions = {
@@ -161,7 +167,7 @@ export const getPostgreSQLEventStore = (
161167
...(options.connectionOptions ? options.connectionOptions : {}),
162168
};
163169
const pool = 'dumbo' in poolOptions ? poolOptions.dumbo : dumbo(poolOptions);
164-
let migrateSchema: Promise<void>;
170+
let migrateSchema: Promise<void> | undefined = undefined;
165171

166172
const autoGenerateSchema =
167173
options.schema?.autoMigration === undefined ||
@@ -171,7 +177,11 @@ export const getPostgreSQLEventStore = (
171177
if (!autoGenerateSchema) return Promise.resolve();
172178

173179
if (!migrateSchema) {
174-
migrateSchema = createEventStoreSchema(pool);
180+
migrateSchema = createEventStoreSchema(pool).then(async () => {
181+
if (options.hooks?.onAfterSchemaCreated) {
182+
await options.hooks.onAfterSchemaCreated();
183+
}
184+
});
175185
}
176186
return migrateSchema;
177187
};
@@ -301,11 +311,28 @@ export const getPostgreSQLEventStore = (
301311
callback: (session: EventStoreSession<PostgresEventStore>) => Promise<T>,
302312
): Promise<T> {
303313
return await pool.withConnection(async (connection) => {
314+
const autoMigration: MigrationStyle = migrateSchema
315+
? 'None'
316+
: (options.schema?.autoMigration ?? 'CreateOrUpdate');
317+
304318
const storeOptions: PostgresEventStoreOptions = {
305319
...options,
306320
connectionOptions: {
307321
connection,
308322
},
323+
schema: {
324+
autoMigration,
325+
},
326+
hooks: {
327+
...(options.hooks ?? {}),
328+
onAfterSchemaCreated: async () => {
329+
migrateSchema = Promise.resolve();
330+
331+
if (options.hooks?.onAfterSchemaCreated) {
332+
await options.hooks.onAfterSchemaCreated();
333+
}
334+
},
335+
},
309336
};
310337

311338
const eventStore = getPostgreSQLEventStore(

src/packages/emmett-postgresql/src/eventStore/schema/appendToStream.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,15 @@ export const appendToStreamSQL = rawSql(
4848
v_transaction_id := pg_current_xact_id();
4949
5050
IF v_expected_stream_position IS NULL THEN
51-
SELECT COALESCE(max(stream_position), 0) INTO v_expected_stream_position
52-
FROM ${streamsTable.name}
53-
WHERE stream_id = v_stream_id AND partition = v_partition;
51+
SELECT COALESCE(
52+
(SELECT stream_position
53+
FROM ${streamsTable.name}
54+
WHERE stream_id = v_stream_id
55+
AND partition = v_partition
56+
AND is_archived = FALSE
57+
LIMIT 1),
58+
0
59+
) INTO v_expected_stream_position;
5460
END IF;
5561
5662
v_next_stream_position := v_expected_stream_position + array_upper(v_messages_data, 1);

src/packages/emmett-postgresql/src/eventStore/schema/tables.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,12 @@ export const streamsTableSQL = rawSql(
1515
stream_type TEXT NOT NULL,
1616
stream_metadata JSONB NOT NULL,
1717
is_archived BOOLEAN NOT NULL DEFAULT FALSE,
18-
PRIMARY KEY (stream_id, stream_position, partition, is_archived),
19-
UNIQUE (stream_id, partition, is_archived)
20-
) PARTITION BY LIST (partition);`,
18+
PRIMARY KEY (stream_id, partition, is_archived)
19+
) PARTITION BY LIST (partition);
20+
21+
CREATE UNIQUE INDEX IF NOT EXISTS idx_streams_unique
22+
ON ${streamsTable.name}(stream_id, partition, is_archived)
23+
INCLUDE (stream_position);`,
2124
);
2225

2326
export const messagesTableSQL = rawSql(

src/packages/emmett-sqlite/src/eventStore/schema/tables.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ export const streamsTableSQL = sql(
1616
stream_type TEXT NOT NULL,
1717
stream_metadata JSONB NOT NULL,
1818
is_archived BOOLEAN NOT NULL DEFAULT FALSE,
19-
PRIMARY KEY (stream_id, stream_position, partition, is_archived),
19+
PRIMARY KEY (stream_id, partition, is_archived),
2020
UNIQUE (stream_id, partition, is_archived)
2121
);`,
2222
);

0 commit comments

Comments
 (0)