From 03538474b4c72047e318f81f237e440078d67a9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Mon, 9 Sep 2024 14:49:48 +0200 Subject: [PATCH 01/33] AP-5046 WIP prisma adapter. --- packages/outbox-prisma-adapter/README.md | 42 +++++++++++++ packages/outbox-prisma-adapter/index.ts | 1 + .../lib/outbox-prisma-adapter.ts | 21 +++++++ packages/outbox-prisma-adapter/package.json | 60 +++++++++++++++++++ packages/outbox-prisma-adapter/tsconfig.json | 27 +++++++++ .../tsconfig.release.json | 5 ++ .../outbox-prisma-adapter/vitest.config.mts | 23 +++++++ 7 files changed, 179 insertions(+) create mode 100644 packages/outbox-prisma-adapter/README.md create mode 100644 packages/outbox-prisma-adapter/index.ts create mode 100644 packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts create mode 100644 packages/outbox-prisma-adapter/package.json create mode 100644 packages/outbox-prisma-adapter/tsconfig.json create mode 100644 packages/outbox-prisma-adapter/tsconfig.release.json create mode 100644 packages/outbox-prisma-adapter/vitest.config.mts diff --git a/packages/outbox-prisma-adapter/README.md b/packages/outbox-prisma-adapter/README.md new file mode 100644 index 00000000..91e9c541 --- /dev/null +++ b/packages/outbox-prisma-adapter/README.md @@ -0,0 +1,42 @@ +# outbox-core + +Main package that contains the core functionality of the Outbox pattern to provide "at least once" delivery semantics for messages. + +## Installation + +```bash +npm i -S @message-queue-toolkit/outbox-core +``` + +## Usage + +To process outbox entries and emit them to the message queue, you need to create an instance of the `OutboxPeriodicJob` class: + +```typescript +import { OutboxPeriodicJob } from '@message-queue-toolkit/outbox-core'; + +const job = new OutboxPeriodicJob( + //Implementation of OutboxStorage interface, TODO: Point to other packages in message-queue-toolkit + outboxStorage, + //Default available accumulator for gathering outbox entries as the process job is progressing. + new InMemoryOutboxAccumulator(), + //DomainEventEmitter, it will be used to publish events, see @message-queue-toolkit/core + eventEmitter, + //See PeriodicJobDependencies from @lokalise/background-jobs-common + dependencies, + //Retry count, how many times outbox entries should be retried to be processed + 3, + //emitBatchSize - how many outbox entries should be emitted at once + 10, + //internalInMs - how often the job should be executed, e.g. below it runs every 1sec + 1000 +) +``` + +Job will take care of processing outbox entries emitted by: +```typescript +const emitter = new OutboxEventEmitter( + //Same instance of outbox storage that is used by OutboxPeriodicJob + outboxStorage +) +``` diff --git a/packages/outbox-prisma-adapter/index.ts b/packages/outbox-prisma-adapter/index.ts new file mode 100644 index 00000000..aeacc8f5 --- /dev/null +++ b/packages/outbox-prisma-adapter/index.ts @@ -0,0 +1 @@ +export * from './lib/outbox-prisma-adapter' diff --git a/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts b/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts new file mode 100644 index 00000000..a03a33df --- /dev/null +++ b/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts @@ -0,0 +1,21 @@ +import type { OutboxAccumulator, OutboxEntry } from '@message-queue-toolkit/outbox-core' +import type { OutboxStorage } from '@message-queue-toolkit/outbox-core/dist/lib/storage' +import type { CommonEventDefinition } from '@message-queue-toolkit/schemas' + +export class OutboxPrismaAdapter + implements OutboxStorage +{ + createEntry( + outboxEntry: OutboxEntry, + ): Promise> { + return Promise.resolve(undefined) + } + + flush(outboxAccumulator: OutboxAccumulator): Promise { + return Promise.resolve(undefined) + } + + getEntries(maxRetryCount: number): Promise[]> { + return Promise.resolve([]) + } +} diff --git a/packages/outbox-prisma-adapter/package.json b/packages/outbox-prisma-adapter/package.json new file mode 100644 index 00000000..3f0a2739 --- /dev/null +++ b/packages/outbox-prisma-adapter/package.json @@ -0,0 +1,60 @@ +{ + "name": "@message-queue-toolkit/outbox-prisma-adapter", + "version": "0.1.0", + "private": false, + "license": "MIT", + "description": "OutboxStorage implementation for @message-queue-toolkit/outbox-core package.", + "maintainers": [ + { + "name": "Igor Savin", + "email": "kibertoad@gmail.com" + } + ], + "main": "dist/index.js", + "types": "dist/index.d.ts", + "scripts": { + "build": "del-cli dist && tsc", + "build:release": "del-cli dist && del-cli coverage && npm run lint && tsc --project tsconfig.release.json", + "test": "vitest", + "test:coverage": "npm test -- --coverage", + "test:ci": "npm run docker:start:dev && npm run test:coverage && npm run docker:stop:dev", + "lint": "biome check . && tsc --project tsconfig.json --noEmit", + "lint:fix": "biome check --write .", + "docker:start:dev": "docker compose up -d", + "docker:stop:dev": "docker compose down", + "prepublishOnly": "npm run build:release" + }, + "dependencies": {}, + "peerDependencies": { + "@message-queue-toolkit/core": ">=14.0.0", + "@message-queue-toolkit/schemas": ">=4.0.0", + "@message-queue-toolkit/outbox-core": ">=0.1.0" + }, + "devDependencies": { + "@biomejs/biome": "1.8.3", + "@kibertoad/biome-config": "^1.2.1", + "@types/node": "^22.0.0", + "@vitest/coverage-v8": "^2.0.4", + "del-cli": "^5.1.0", + "typescript": "^5.5.3", + "vitest": "^2.0.4", + "zod": "^3.23.8" + }, + "homepage": "https://github.com/kibertoad/message-queue-toolkit", + "repository": { + "type": "git", + "url": "git://github.com/kibertoad/message-queue-toolkit.git" + }, + "keywords": [ + "message", + "queue", + "queues", + "abstract", + "common", + "utils", + "notification", + "outbox", + "pattern" + ], + "files": ["README.md", "LICENSE", "dist/*"] +} diff --git a/packages/outbox-prisma-adapter/tsconfig.json b/packages/outbox-prisma-adapter/tsconfig.json new file mode 100644 index 00000000..9cd7c80a --- /dev/null +++ b/packages/outbox-prisma-adapter/tsconfig.json @@ -0,0 +1,27 @@ +{ + "compilerOptions": { + "outDir": "dist", + "module": "commonjs", + "target": "ES2022", + "lib": ["ES2022", "dom"], + "sourceMap": true, + "declaration": true, + "declarationMap": false, + "types": ["node", "vitest/globals"], + "strict": true, + "moduleResolution": "node", + "noUnusedLocals": false, + "noUnusedParameters": false, + "noFallthroughCasesInSwitch": true, + "strictNullChecks": true, + "importHelpers": true, + "baseUrl": ".", + "skipLibCheck": true, + "allowSyntheticDefaultImports": true, + "esModuleInterop": true, + "forceConsistentCasingInFileNames": true, + "resolveJsonModule": true + }, + "include": ["lib/**/*.ts", "test/**/*.ts", "index.ts"], + "exclude": ["node_modules", "dist"] +} diff --git a/packages/outbox-prisma-adapter/tsconfig.release.json b/packages/outbox-prisma-adapter/tsconfig.release.json new file mode 100644 index 00000000..93ab99f8 --- /dev/null +++ b/packages/outbox-prisma-adapter/tsconfig.release.json @@ -0,0 +1,5 @@ +{ + "extends": "./tsconfig.json", + "include": ["lib/**/*.ts", "index.ts"], + "exclude": ["node_modules", "dist", "lib/**/*.spec.ts"] +} diff --git a/packages/outbox-prisma-adapter/vitest.config.mts b/packages/outbox-prisma-adapter/vitest.config.mts new file mode 100644 index 00000000..2bcce478 --- /dev/null +++ b/packages/outbox-prisma-adapter/vitest.config.mts @@ -0,0 +1,23 @@ +import { defineConfig } from 'vitest/config' + +export default defineConfig({ + test: { + globals: true, + watch: false, + environment: 'node', + reporters: ['default'], + coverage: { + provider: 'v8', + include: ['lib/**/*.ts'], + exclude: ['lib/**/*.spec.ts', 'lib/**/*.test.ts', 'test/**/*.*'], + reporter: ['text'], + all: true, + thresholds: { + lines: 100, + functions: 100, + branches: 91.66, + statements: 100, + }, + }, + }, +}) From 14fbfba60499fae028fc51dfa3153726bd0be2a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Mon, 9 Sep 2024 15:49:31 +0200 Subject: [PATCH 02/33] WIP prisma adapter. --- .../outbox-prisma-adapter/docker-compose.yml | 10 +++++++ .../lib/outbox-prisma-adapter.spec.ts | 26 +++++++++++++++++++ .../lib/outbox-prisma-adapter.ts | 12 ++++++++- packages/outbox-prisma-adapter/package.json | 11 +++++--- .../outbox-prisma-adapter/test/schema.prisma | 15 +++++++++++ 5 files changed, 70 insertions(+), 4 deletions(-) create mode 100644 packages/outbox-prisma-adapter/docker-compose.yml create mode 100644 packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.spec.ts create mode 100644 packages/outbox-prisma-adapter/test/schema.prisma diff --git a/packages/outbox-prisma-adapter/docker-compose.yml b/packages/outbox-prisma-adapter/docker-compose.yml new file mode 100644 index 00000000..068b7a74 --- /dev/null +++ b/packages/outbox-prisma-adapter/docker-compose.yml @@ -0,0 +1,10 @@ +services: + + postgres: + image: postgres:16.2 + environment: + POSTGRES_USER: prisma + POSTGRES_PASSWORD: prisma + POSTGRES_DB: prisma + ports: + - 5432:5432 diff --git a/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.spec.ts b/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.spec.ts new file mode 100644 index 00000000..01005898 --- /dev/null +++ b/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.spec.ts @@ -0,0 +1,26 @@ +import { PrismaClient } from '@prisma/client' +import { beforeAll, describe, it } from 'vitest' + +describe('outbox-prisma-adapter', () => { + let prisma: PrismaClient + + beforeAll(async () => { + prisma = new PrismaClient() + + await prisma.$queryRaw` + CREATE TABLE prisma.outbox_entry (id UUID PRIMARY KEY, created TIMESTAMP NOT NULL)` + }) + + it('created outbox entry', async () => { + const result = await prisma.$queryRaw`SELECT 1 as counter;` + + console.log(result) + + await prisma.outboxEntry.create({ + data: { + id: 'ce08b43b-6162-4913-86ea-fa9367875e3b', + created: new Date(), + }, + }) + }) +}) diff --git a/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts b/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts index a03a33df..f400d8b4 100644 --- a/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts +++ b/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts @@ -1,14 +1,24 @@ import type { OutboxAccumulator, OutboxEntry } from '@message-queue-toolkit/outbox-core' import type { OutboxStorage } from '@message-queue-toolkit/outbox-core/dist/lib/storage' import type { CommonEventDefinition } from '@message-queue-toolkit/schemas' +import type { PrismaClient } from '@prisma/client' export class OutboxPrismaAdapter implements OutboxStorage { + constructor( + private readonly prisma: PrismaClient, + private readonly modelName: string, + ) {} + createEntry( outboxEntry: OutboxEntry, ): Promise> { - return Promise.resolve(undefined) + const prismaModel: PrismaClient[typeof this.modelName] = this.prisma[this.modelName] + + return prismaModel.create({ + data: outboxEntry, + }) } flush(outboxAccumulator: OutboxAccumulator): Promise { diff --git a/packages/outbox-prisma-adapter/package.json b/packages/outbox-prisma-adapter/package.json index 3f0a2739..4eb8485d 100644 --- a/packages/outbox-prisma-adapter/package.json +++ b/packages/outbox-prisma-adapter/package.json @@ -24,11 +24,11 @@ "docker:stop:dev": "docker compose down", "prepublishOnly": "npm run build:release" }, - "dependencies": {}, "peerDependencies": { "@message-queue-toolkit/core": ">=14.0.0", + "@message-queue-toolkit/outbox-core": ">=0.1.0", "@message-queue-toolkit/schemas": ">=4.0.0", - "@message-queue-toolkit/outbox-core": ">=0.1.0" + "@prisma/client": "^5.19.1" }, "devDependencies": { "@biomejs/biome": "1.8.3", @@ -36,6 +36,7 @@ "@types/node": "^22.0.0", "@vitest/coverage-v8": "^2.0.4", "del-cli": "^5.1.0", + "prisma": "^5.19.1", "typescript": "^5.5.3", "vitest": "^2.0.4", "zod": "^3.23.8" @@ -56,5 +57,9 @@ "outbox", "pattern" ], - "files": ["README.md", "LICENSE", "dist/*"] + "files": [ + "README.md", + "LICENSE", + "dist/*" + ] } diff --git a/packages/outbox-prisma-adapter/test/schema.prisma b/packages/outbox-prisma-adapter/test/schema.prisma new file mode 100644 index 00000000..44134032 --- /dev/null +++ b/packages/outbox-prisma-adapter/test/schema.prisma @@ -0,0 +1,15 @@ +datasource db { + provider = "postgresql" + url = "postgresql://prisma:prisma@localhost:5432/prisma?schema=testdb" +} + +model OutboxEntry { + id String @id @default(uuid()) @db.Uuid + created DateTime @default(now()) + + @@map("outbox_entry") +} + +generator client { + provider = "prisma-client-js" +} From d55405ea7642b7c9c95059857c5c93185643bfb2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Wed, 11 Sep 2024 10:52:34 +0200 Subject: [PATCH 03/33] Working test. --- .../lib/outbox-prisma-adapter.spec.ts | 26 ------------ .../test/outbox-prisma-adapter.spec.ts | 42 +++++++++++++++++++ .../outbox-prisma-adapter/test/schema.prisma | 2 +- 3 files changed, 43 insertions(+), 27 deletions(-) delete mode 100644 packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.spec.ts create mode 100644 packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts diff --git a/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.spec.ts b/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.spec.ts deleted file mode 100644 index 01005898..00000000 --- a/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.spec.ts +++ /dev/null @@ -1,26 +0,0 @@ -import { PrismaClient } from '@prisma/client' -import { beforeAll, describe, it } from 'vitest' - -describe('outbox-prisma-adapter', () => { - let prisma: PrismaClient - - beforeAll(async () => { - prisma = new PrismaClient() - - await prisma.$queryRaw` - CREATE TABLE prisma.outbox_entry (id UUID PRIMARY KEY, created TIMESTAMP NOT NULL)` - }) - - it('created outbox entry', async () => { - const result = await prisma.$queryRaw`SELECT 1 as counter;` - - console.log(result) - - await prisma.outboxEntry.create({ - data: { - id: 'ce08b43b-6162-4913-86ea-fa9367875e3b', - created: new Date(), - }, - }) - }) -}) diff --git a/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts b/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts new file mode 100644 index 00000000..8e7a6a53 --- /dev/null +++ b/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts @@ -0,0 +1,42 @@ +import { PrismaClient } from '@prisma/client' +import { afterAll, beforeAll, describe, expect, it } from 'vitest' + +describe('outbox-prisma-adapter', () => { + let prisma: PrismaClient + + beforeAll(async () => { + prisma = new PrismaClient() + + await prisma.$queryRaw`create schema if not exists prisma;` + await prisma.$queryRaw` + CREATE TABLE prisma.outbox_entry ( + id UUID PRIMARY KEY, + created TIMESTAMP NOT NULL + ) + ` + }) + + afterAll(async () => { + await prisma.$queryRaw`DROP TABLE prisma.outbox_entry;` + await prisma.$queryRaw`DROP SCHEMA prisma;` + await prisma.$disconnect() + }) + + it('test db connection', async () => { + const creationDate = new Date() + await prisma.outboxEntry.create({ + data: { + id: 'ce08b43b-6162-4913-86ea-fa9367875e3b', + created: creationDate, + }, + }) + + const result = await prisma.outboxEntry.findMany() + expect(result).toEqual([ + { + id: 'ce08b43b-6162-4913-86ea-fa9367875e3b', + created: creationDate, + }, + ]) + }) +}) diff --git a/packages/outbox-prisma-adapter/test/schema.prisma b/packages/outbox-prisma-adapter/test/schema.prisma index 44134032..812d7f9c 100644 --- a/packages/outbox-prisma-adapter/test/schema.prisma +++ b/packages/outbox-prisma-adapter/test/schema.prisma @@ -1,6 +1,6 @@ datasource db { provider = "postgresql" - url = "postgresql://prisma:prisma@localhost:5432/prisma?schema=testdb" + url = "postgresql://prisma:prisma@localhost:5432/prisma?schema=prisma" } model OutboxEntry { From e93bdadeced6ec6fba318ce22e6f01d4f5a74157 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Wed, 11 Sep 2024 12:46:21 +0200 Subject: [PATCH 04/33] WIP --- .../lib/outbox-prisma-adapter.ts | 4 +- packages/outbox-prisma-adapter/package.json | 1 + .../test/outbox-prisma-adapter.spec.ts | 40 +++++++++++++++++++ 3 files changed, 43 insertions(+), 2 deletions(-) diff --git a/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts b/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts index f400d8b4..c0f272fa 100644 --- a/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts +++ b/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts @@ -1,6 +1,6 @@ import type { OutboxAccumulator, OutboxEntry } from '@message-queue-toolkit/outbox-core' import type { OutboxStorage } from '@message-queue-toolkit/outbox-core/dist/lib/storage' -import type { CommonEventDefinition } from '@message-queue-toolkit/schemas' +import { type CommonEventDefinition, getMessageType } from '@message-queue-toolkit/schemas' import type { PrismaClient } from '@prisma/client' export class OutboxPrismaAdapter @@ -17,7 +17,7 @@ export class OutboxPrismaAdapter + +type SupportedEvents = (typeof events)[keyof typeof events][] describe('outbox-prisma-adapter', () => { let prisma: PrismaClient + let outboxPrismaAdapter: OutboxPrismaAdapter beforeAll(async () => { prisma = new PrismaClient() + outboxPrismaAdapter = new OutboxPrismaAdapter(prisma, 'OutboxEntry') + await prisma.$queryRaw`create schema if not exists prisma;` await prisma.$queryRaw` CREATE TABLE prisma.outbox_entry ( @@ -39,4 +63,20 @@ describe('outbox-prisma-adapter', () => { }, ]) }) + + it('creates entry in DB via outbox storage implementation', async () => { + await outboxPrismaAdapter.createEntry({ + id: uuidv7(), + event: events.created, + status: 'CREATED', + data: { + id: uuidv7(), + payload: { + message: 'TEST EVENT', + }, + metadata: {}, + timestamp: new Date().toISOString(), + }, + } satisfies OutboxEntry) + }) }) From 963014a8e07aa83b505f9a05e4ccbda202f42625 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Tue, 26 Nov 2024 15:28:31 +0100 Subject: [PATCH 05/33] Working test for saving outbox entries. --- packages/outbox-prisma-adapter/README.md | 44 ++++-------------- .../lib/outbox-prisma-adapter.ts | 17 ++++++- .../test/outbox-prisma-adapter.spec.ts | 46 +++++++++++-------- .../outbox-prisma-adapter/test/schema.prisma | 8 +++- 4 files changed, 58 insertions(+), 57 deletions(-) diff --git a/packages/outbox-prisma-adapter/README.md b/packages/outbox-prisma-adapter/README.md index 91e9c541..e480b2a2 100644 --- a/packages/outbox-prisma-adapter/README.md +++ b/packages/outbox-prisma-adapter/README.md @@ -1,42 +1,18 @@ -# outbox-core +# outbox-prisma-adapter -Main package that contains the core functionality of the Outbox pattern to provide "at least once" delivery semantics for messages. +This package provides a Prisma adapter for the Outbox pattern. -## Installation +### Development -```bash -npm i -S @message-queue-toolkit/outbox-core -``` - -## Usage - -To process outbox entries and emit them to the message queue, you need to create an instance of the `OutboxPeriodicJob` class: +#### Tests -```typescript -import { OutboxPeriodicJob } from '@message-queue-toolkit/outbox-core'; +To run the tests, you need to have a PostgreSQL database running. You can use the following command to start a PostgreSQL database using Docker: -const job = new OutboxPeriodicJob( - //Implementation of OutboxStorage interface, TODO: Point to other packages in message-queue-toolkit - outboxStorage, - //Default available accumulator for gathering outbox entries as the process job is progressing. - new InMemoryOutboxAccumulator(), - //DomainEventEmitter, it will be used to publish events, see @message-queue-toolkit/core - eventEmitter, - //See PeriodicJobDependencies from @lokalise/background-jobs-common - dependencies, - //Retry count, how many times outbox entries should be retried to be processed - 3, - //emitBatchSize - how many outbox entries should be emitted at once - 10, - //internalInMs - how often the job should be executed, e.g. below it runs every 1sec - 1000 -) +```sh +docker-compose up -d ``` -Job will take care of processing outbox entries emitted by: -```typescript -const emitter = new OutboxEventEmitter( - //Same instance of outbox storage that is used by OutboxPeriodicJob - outboxStorage -) +Then update Prisma client: +```sh +npx prisma generate --schema=./test/schema.prisma ``` diff --git a/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts b/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts index c0f272fa..6b27b448 100644 --- a/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts +++ b/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts @@ -16,8 +16,15 @@ export class OutboxPrismaAdapter> { const prismaModel: PrismaClient[typeof this.modelName] = this.prisma[this.modelName] + const messageType = getMessageType(outboxEntry.event) return prismaModel.create({ - data: getMessageType(outboxEntry.event), + data: { + id: outboxEntry.id, + type: messageType, + created: outboxEntry.created, + data: outboxEntry.data, + status: outboxEntry.status, + }, }) } @@ -26,6 +33,12 @@ export class OutboxPrismaAdapter[]> { - return Promise.resolve([]) + return this.prisma[this.modelName].findMany({ + where: { + retryCount: { + lte: maxRetryCount, + }, + }, + }) } } diff --git a/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts b/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts index ff689091..6b74b13f 100644 --- a/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts +++ b/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts @@ -35,7 +35,11 @@ describe('outbox-prisma-adapter', () => { await prisma.$queryRaw` CREATE TABLE prisma.outbox_entry ( id UUID PRIMARY KEY, - created TIMESTAMP NOT NULL + type TEXT NOT NULL, + created TIMESTAMP NOT NULL, + retry_count INT NOT NULL DEFAULT 0, + data JSONB NOT NULL, + status TEXT NOT NULL ) ` }) @@ -46,24 +50,6 @@ describe('outbox-prisma-adapter', () => { await prisma.$disconnect() }) - it('test db connection', async () => { - const creationDate = new Date() - await prisma.outboxEntry.create({ - data: { - id: 'ce08b43b-6162-4913-86ea-fa9367875e3b', - created: creationDate, - }, - }) - - const result = await prisma.outboxEntry.findMany() - expect(result).toEqual([ - { - id: 'ce08b43b-6162-4913-86ea-fa9367875e3b', - created: creationDate, - }, - ]) - }) - it('creates entry in DB via outbox storage implementation', async () => { await outboxPrismaAdapter.createEntry({ id: uuidv7(), @@ -77,6 +63,28 @@ describe('outbox-prisma-adapter', () => { metadata: {}, timestamp: new Date().toISOString(), }, + retryCount: 0, + created: new Date(), } satisfies OutboxEntry) + + const entries = await outboxPrismaAdapter.getEntries(10) + + expect(entries).toEqual([ + { + id: expect.any(String), + type: 'entity.created', + created: expect.any(Date), + retryCount: 0, + data: { + id: expect.any(String), + payload: { + message: 'TEST EVENT', + }, + metadata: {}, + timestamp: expect.any(String), + }, + status: 'CREATED', + }, + ]) }) }) diff --git a/packages/outbox-prisma-adapter/test/schema.prisma b/packages/outbox-prisma-adapter/test/schema.prisma index 812d7f9c..a7196c29 100644 --- a/packages/outbox-prisma-adapter/test/schema.prisma +++ b/packages/outbox-prisma-adapter/test/schema.prisma @@ -4,8 +4,12 @@ datasource db { } model OutboxEntry { - id String @id @default(uuid()) @db.Uuid - created DateTime @default(now()) + id String @id @default(uuid()) @db.Uuid + created DateTime @default(now()) + type String + retryCount Int @default(0) @map("retry_count") + data Json + status String @@map("outbox_entry") } From 7444fab001375659ed7d304b8084e273ac90a04c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Tue, 26 Nov 2024 16:11:57 +0100 Subject: [PATCH 06/33] failing test for updating. --- .../lib/outbox-prisma-adapter.ts | 27 +++- .../test/outbox-prisma-adapter.spec.ts | 124 +++++++++++++++++- .../outbox-prisma-adapter/test/schema.prisma | 1 + 3 files changed, 149 insertions(+), 3 deletions(-) diff --git a/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts b/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts index 6b27b448..412620c8 100644 --- a/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts +++ b/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts @@ -22,14 +22,37 @@ export class OutboxPrismaAdapter): Promise { - return Promise.resolve(undefined) + async flush(outboxAccumulator: OutboxAccumulator): Promise { + const entries = await outboxAccumulator.getEntries() + + const prismaModel: PrismaClient[typeof this.modelName] = this.prisma[this.modelName] + + for (const entry of entries) { + await prismaModel.upsert({ + where: { + id: entry.id, + }, + update: { + status: 'SUCCESS', + updated: new Date(), + }, + create: { + id: entry.id, + type: getMessageType(entry.event), + created: entry.created, + updated: new Date(), + data: entry.data, + status: 'SUCCESS', + }, + }) + } } getEntries(maxRetryCount: number): Promise[]> { diff --git a/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts b/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts index 6b74b13f..c58cd8ed 100644 --- a/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts +++ b/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts @@ -1,4 +1,4 @@ -import type { OutboxEntry } from '@message-queue-toolkit/outbox-core' +import { InMemoryOutboxAccumulator, type OutboxEntry } from '@message-queue-toolkit/outbox-core' import { type CommonEventDefinition, enrichMessageSchemaWithBase, @@ -37,6 +37,7 @@ describe('outbox-prisma-adapter', () => { id UUID PRIMARY KEY, type TEXT NOT NULL, created TIMESTAMP NOT NULL, + updated TIMESTAMP, retry_count INT NOT NULL DEFAULT 0, data JSONB NOT NULL, status TEXT NOT NULL @@ -74,6 +75,7 @@ describe('outbox-prisma-adapter', () => { id: expect.any(String), type: 'entity.created', created: expect.any(Date), + updated: expect.any(Date), retryCount: 0, data: { id: expect.any(String), @@ -87,4 +89,124 @@ describe('outbox-prisma-adapter', () => { }, ]) }) + + it('should insert successful entries from accumulator', async () => { + const accumulator = new InMemoryOutboxAccumulator() + + const entry1 = { + id: uuidv7(), + event: events.created, + status: 'CREATED', + data: { + id: uuidv7(), + payload: { + message: 'TEST EVENT', + }, + metadata: {}, + timestamp: new Date().toISOString(), + }, + retryCount: 0, + created: new Date(), + } satisfies OutboxEntry + accumulator.add(entry1) + + const entry2 = { + id: uuidv7(), + event: events.created, + status: 'CREATED', + data: { + id: uuidv7(), + payload: { + message: 'TEST EVENT 2', + }, + metadata: {}, + timestamp: new Date().toISOString(), + }, + retryCount: 0, + created: new Date(), + } satisfies OutboxEntry + accumulator.add(entry2) + + await outboxPrismaAdapter.flush(accumulator) + + const entriesAfterFlush = await outboxPrismaAdapter.getEntries(10) + + expect(entriesAfterFlush).toMatchObject([ + { + id: entry1.id, + status: 'SUCCESS', + }, + { + id: entry2.id, + status: 'SUCCESS', + }, + ]) + }) + + it("should update successful entries' status to 'SUCCESS'", async () => { + const accumulator = new InMemoryOutboxAccumulator() + + const entry1 = { + id: uuidv7(), + event: events.created, + status: 'CREATED', + data: { + id: uuidv7(), + payload: { + message: 'TEST EVENT', + }, + metadata: {}, + timestamp: new Date().toISOString(), + }, + retryCount: 0, + created: new Date(), + } satisfies OutboxEntry + accumulator.add(entry1) + + const entry2 = { + id: uuidv7(), + event: events.created, + status: 'CREATED', + data: { + id: uuidv7(), + payload: { + message: 'TEST EVENT 2', + }, + metadata: {}, + timestamp: new Date().toISOString(), + }, + retryCount: 0, + created: new Date(), + } satisfies OutboxEntry + accumulator.add(entry2) + + await outboxPrismaAdapter.createEntry(entry1) + await outboxPrismaAdapter.createEntry(entry2) + + const beforeFlush = await outboxPrismaAdapter.getEntries(10) + expect(beforeFlush).toMatchObject([ + { + id: entry1.id, + status: 'CREATED', + }, + { + id: entry2.id, + status: 'CREATED', + }, + ]) + + outboxPrismaAdapter.flush(accumulator) + + const afterFlush = await outboxPrismaAdapter.getEntries(10) + expect(afterFlush).toMatchObject([ + { + id: entry1.id, + status: 'SUCCESS', + }, + { + id: entry2.id, + status: 'SUCCESS', + }, + ]) + }) }) diff --git a/packages/outbox-prisma-adapter/test/schema.prisma b/packages/outbox-prisma-adapter/test/schema.prisma index a7196c29..1b3c0ee8 100644 --- a/packages/outbox-prisma-adapter/test/schema.prisma +++ b/packages/outbox-prisma-adapter/test/schema.prisma @@ -6,6 +6,7 @@ datasource db { model OutboxEntry { id String @id @default(uuid()) @db.Uuid created DateTime @default(now()) + updated DateTime @default(now()) @updatedAt type String retryCount Int @default(0) @map("retry_count") data Json From 9c971fc409dfb11bb95587d69db74fa7be4b3f9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Wed, 27 Nov 2024 10:38:37 +0100 Subject: [PATCH 07/33] Bulk update + insert. --- .../lib/outbox-prisma-adapter.ts | 52 ++++-- .../test/outbox-prisma-adapter.spec.ts | 157 +++++++++--------- 2 files changed, 112 insertions(+), 97 deletions(-) diff --git a/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts b/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts index 412620c8..a6c24b2b 100644 --- a/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts +++ b/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts @@ -34,25 +34,43 @@ export class OutboxPrismaAdapter entry.id), }, - create: { - id: entry.id, - type: getMessageType(entry.event), - created: entry.created, - updated: new Date(), - data: entry.data, - status: 'SUCCESS', + }, + }) + + const toCreate = entries.filter( + (entry) => !existingEntries.some((existingEntry) => existingEntry.id === entry.id), + ) + const toUpdate = entries.filter((entry) => + existingEntries.some((existingEntry) => existingEntry.id === entry.id), + ) + + await prismaModel.createMany({ + data: toCreate.map((entry) => ({ + id: entry.id, + type: getMessageType(entry.event), + created: entry.created, + updated: new Date(), + data: entry.data, + status: 'SUCCESS', + })), + }) + + await prismaModel.updateMany({ + where: { + id: { + in: toUpdate.map((entry) => entry.id), }, - }) - } + }, + data: { + status: 'SUCCESS', + updated: new Date(), + }, + }) } getEntries(maxRetryCount: number): Promise[]> { diff --git a/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts b/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts index c58cd8ed..de149565 100644 --- a/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts +++ b/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts @@ -5,7 +5,7 @@ import { } from '@message-queue-toolkit/schemas' import { PrismaClient } from '@prisma/client' import { uuidv7 } from 'uuidv7' -import { afterAll, beforeAll, describe, expect, it } from 'vitest' +import { afterAll, beforeAll, beforeEach, describe, expect, it } from 'vitest' import { z } from 'zod' import { OutboxPrismaAdapter } from '../lib/outbox-prisma-adapter' @@ -26,8 +26,42 @@ describe('outbox-prisma-adapter', () => { let prisma: PrismaClient let outboxPrismaAdapter: OutboxPrismaAdapter + const ENTRY_1 = { + id: uuidv7(), + event: events.created, + status: 'CREATED', + data: { + id: uuidv7(), + payload: { + message: 'TEST EVENT', + }, + metadata: {}, + timestamp: new Date().toISOString(), + }, + retryCount: 0, + created: new Date(), + } satisfies OutboxEntry + + const ENTRY_2 = { + id: uuidv7(), + event: events.created, + status: 'CREATED', + data: { + id: uuidv7(), + payload: { + message: 'TEST EVENT 2', + }, + metadata: {}, + timestamp: new Date().toISOString(), + }, + retryCount: 0, + created: new Date(), + } satisfies OutboxEntry + beforeAll(async () => { - prisma = new PrismaClient() + prisma = new PrismaClient({ + log: ['query'], + }) outboxPrismaAdapter = new OutboxPrismaAdapter(prisma, 'OutboxEntry') @@ -45,6 +79,10 @@ describe('outbox-prisma-adapter', () => { ` }) + beforeEach(async () => { + await prisma.$queryRaw`DELETE FROM prisma.outbox_entry;` + }) + afterAll(async () => { await prisma.$queryRaw`DROP TABLE prisma.outbox_entry;` await prisma.$queryRaw`DROP SCHEMA prisma;` @@ -92,40 +130,8 @@ describe('outbox-prisma-adapter', () => { it('should insert successful entries from accumulator', async () => { const accumulator = new InMemoryOutboxAccumulator() - - const entry1 = { - id: uuidv7(), - event: events.created, - status: 'CREATED', - data: { - id: uuidv7(), - payload: { - message: 'TEST EVENT', - }, - metadata: {}, - timestamp: new Date().toISOString(), - }, - retryCount: 0, - created: new Date(), - } satisfies OutboxEntry - accumulator.add(entry1) - - const entry2 = { - id: uuidv7(), - event: events.created, - status: 'CREATED', - data: { - id: uuidv7(), - payload: { - message: 'TEST EVENT 2', - }, - metadata: {}, - timestamp: new Date().toISOString(), - }, - retryCount: 0, - created: new Date(), - } satisfies OutboxEntry - accumulator.add(entry2) + accumulator.add(ENTRY_1) + accumulator.add(ENTRY_2) await outboxPrismaAdapter.flush(accumulator) @@ -133,78 +139,69 @@ describe('outbox-prisma-adapter', () => { expect(entriesAfterFlush).toMatchObject([ { - id: entry1.id, + id: ENTRY_1.id, status: 'SUCCESS', }, { - id: entry2.id, + id: ENTRY_2.id, status: 'SUCCESS', }, ]) }) - it("should update successful entries' status to 'SUCCESS'", async () => { + it("should update existing entries' status to 'SUCCESS'", async () => { const accumulator = new InMemoryOutboxAccumulator() + accumulator.add(ENTRY_1) + accumulator.add(ENTRY_2) - const entry1 = { - id: uuidv7(), - event: events.created, - status: 'CREATED', - data: { - id: uuidv7(), - payload: { - message: 'TEST EVENT', - }, - metadata: {}, - timestamp: new Date().toISOString(), - }, - retryCount: 0, - created: new Date(), - } satisfies OutboxEntry - accumulator.add(entry1) - - const entry2 = { - id: uuidv7(), - event: events.created, - status: 'CREATED', - data: { - id: uuidv7(), - payload: { - message: 'TEST EVENT 2', - }, - metadata: {}, - timestamp: new Date().toISOString(), - }, - retryCount: 0, - created: new Date(), - } satisfies OutboxEntry - accumulator.add(entry2) - - await outboxPrismaAdapter.createEntry(entry1) - await outboxPrismaAdapter.createEntry(entry2) + await outboxPrismaAdapter.createEntry(ENTRY_1) + await outboxPrismaAdapter.createEntry(ENTRY_2) const beforeFlush = await outboxPrismaAdapter.getEntries(10) expect(beforeFlush).toMatchObject([ { - id: entry1.id, + id: ENTRY_1.id, status: 'CREATED', }, { - id: entry2.id, + id: ENTRY_2.id, status: 'CREATED', }, ]) - outboxPrismaAdapter.flush(accumulator) + await outboxPrismaAdapter.flush(accumulator) const afterFlush = await outboxPrismaAdapter.getEntries(10) expect(afterFlush).toMatchObject([ { - id: entry1.id, + id: ENTRY_1.id, + status: 'SUCCESS', + }, + { + id: ENTRY_2.id, + status: 'SUCCESS', + }, + ]) + }) + + it('should handle mix of entries, non existing and existing, and change their status to SUCCESS', async () => { + const accumulator = new InMemoryOutboxAccumulator() + accumulator.add(ENTRY_1) + accumulator.add(ENTRY_2) + + //Only one exists in DB. + await outboxPrismaAdapter.createEntry(ENTRY_2) + + await outboxPrismaAdapter.flush(accumulator) + + const afterFirstFlush = await outboxPrismaAdapter.getEntries(10) + expect(afterFirstFlush).toMatchObject([ + { + id: ENTRY_1.id, status: 'SUCCESS', }, { - id: entry2.id, + id: ENTRY_2.id, status: 'SUCCESS', }, ]) From 5ad155f2831c8b9359d6a6e05996e7afceeb17bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Wed, 27 Nov 2024 11:39:20 +0100 Subject: [PATCH 08/33] Failed entries handling. --- .../lib/outbox-prisma-adapter.ts | 102 ++++++++++++++---- .../test/outbox-prisma-adapter.spec.ts | 36 +++++++ 2 files changed, 116 insertions(+), 22 deletions(-) diff --git a/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts b/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts index a6c24b2b..d8dd9c2b 100644 --- a/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts +++ b/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts @@ -25,23 +25,33 @@ export class OutboxPrismaAdapter): Promise { const entries = await outboxAccumulator.getEntries() - + const failedEntries = await outboxAccumulator.getFailedEntries() const prismaModel: PrismaClient[typeof this.modelName] = this.prisma[this.modelName] const existingEntries = await prismaModel.findMany({ where: { id: { - in: entries.map((entry) => entry.id), + in: [...entries.map((entry) => entry.id), ...failedEntries.map((entry) => entry.id)], }, }, }) + await this.handleSuccesses(prismaModel, entries, existingEntries) + await this.handleFailures(prismaModel, failedEntries, existingEntries) + } + + private async handleSuccesses( + prismaModel: PrismaClient[typeof this.modelName], + entries: OutboxEntry[], + existingEntries: OutboxEntry[], + ) { const toCreate = entries.filter( (entry) => !existingEntries.some((existingEntry) => existingEntry.id === entry.id), ) @@ -49,28 +59,76 @@ export class OutboxPrismaAdapter existingEntry.id === entry.id), ) - await prismaModel.createMany({ - data: toCreate.map((entry) => ({ - id: entry.id, - type: getMessageType(entry.event), - created: entry.created, - updated: new Date(), - data: entry.data, - status: 'SUCCESS', - })), - }) + if (toCreate.length > 0) { + await prismaModel.createMany({ + data: toCreate.map((entry) => ({ + id: entry.id, + type: getMessageType(entry.event), + created: entry.created, + updated: new Date(), + data: entry.data, + status: 'SUCCESS', + })), + }) + } - await prismaModel.updateMany({ - where: { - id: { - in: toUpdate.map((entry) => entry.id), + if (toUpdate.length > 0) { + await prismaModel.updateMany({ + where: { + id: { + in: toUpdate.map((entry) => entry.id), + }, }, - }, - data: { - status: 'SUCCESS', - updated: new Date(), - }, - }) + data: { + status: 'SUCCESS', + updated: new Date(), + }, + }) + } + } + + private async handleFailures( + prismaModel: PrismaClient[typeof this.modelName], + entries: OutboxEntry[], + existingEntries: OutboxEntry[], + ) { + const toCreate = entries.filter( + (entry) => !existingEntries.some((existingEntry) => existingEntry.id === entry.id), + ) + const toUpdate = entries.filter((entry) => + existingEntries.some((existingEntry) => existingEntry.id === entry.id), + ) + + if (toCreate.length > 0) { + await prismaModel.createMany({ + data: toCreate.map((entry) => ({ + id: entry.id, + type: getMessageType(entry.event), + created: entry.created, + updated: new Date(), + data: entry.data, + status: 'FAILED', + retryCount: 1, + })), + }) + } + + if (toUpdate.length > 0) { + await prismaModel.updateMany({ + where: { + id: { + in: toUpdate.map((entry) => entry.id), + }, + }, + data: { + status: 'FAILED', + updated: new Date(), + retryCount: { + increment: 1, + }, + }, + }) + } } getEntries(maxRetryCount: number): Promise[]> { diff --git a/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts b/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts index de149565..c5f1037d 100644 --- a/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts +++ b/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts @@ -206,4 +206,40 @@ describe('outbox-prisma-adapter', () => { }, ]) }) + + it("should change failed entries' status to 'FAILED' and increment retry count", async () => { + const accumulator = new InMemoryOutboxAccumulator() + accumulator.addFailure(ENTRY_1) + + await outboxPrismaAdapter.flush(accumulator) + + const afterFirstFlush = await outboxPrismaAdapter.getEntries(10) + expect(afterFirstFlush).toMatchObject([ + { + id: ENTRY_1.id, + status: 'FAILED', + retryCount: 1, + }, + ]) + }) + + it('should change failed EXISTING entries status to FAILED and increment retry count', async () => { + const accumulator = new InMemoryOutboxAccumulator() + const failedEntry = { ...ENTRY_1, retryCount: 3, status: 'FAILED' } satisfies OutboxEntry< + SupportedEvents[number] + > + accumulator.addFailure(failedEntry) + + await outboxPrismaAdapter.createEntry(failedEntry) + await outboxPrismaAdapter.flush(accumulator) + + const afterFirstFlush = await outboxPrismaAdapter.getEntries(10) + expect(afterFirstFlush).toMatchObject([ + { + id: failedEntry.id, + status: 'FAILED', + retryCount: 4, + }, + ]) + }) }) From d9e3d410b384882196643115a850bc48cbc3c089 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Wed, 27 Nov 2024 12:02:10 +0100 Subject: [PATCH 09/33] fetching entries up to the retry count limit. --- .../test/outbox-prisma-adapter.spec.ts | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts b/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts index c5f1037d..5df57663 100644 --- a/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts +++ b/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts @@ -242,4 +242,15 @@ describe('outbox-prisma-adapter', () => { }, ]) }) + + it('should not fetch entries that exceed retry limit', async () => { + const failedEntry = { ...ENTRY_1, retryCount: 6, status: 'FAILED' } satisfies OutboxEntry< + SupportedEvents[number] + > + await outboxPrismaAdapter.createEntry(failedEntry) + + const entries = await outboxPrismaAdapter.getEntries(5) + + expect(entries).toEqual([]) + }) }) From c61533da2c4b4ec5687ac59e0a94f949fafd264d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Wed, 27 Nov 2024 12:42:18 +0100 Subject: [PATCH 10/33] Narrowed down types. --- .../lib/outbox-prisma-adapter.ts | 34 +++++++++++++------ .../test/outbox-prisma-adapter.spec.ts | 7 ++-- 2 files changed, 29 insertions(+), 12 deletions(-) diff --git a/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts b/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts index d8dd9c2b..d97d32f4 100644 --- a/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts +++ b/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts @@ -3,18 +3,27 @@ import type { OutboxStorage } from '@message-queue-toolkit/outbox-core/dist/lib/ import { type CommonEventDefinition, getMessageType } from '@message-queue-toolkit/schemas' import type { PrismaClient } from '@prisma/client' -export class OutboxPrismaAdapter - implements OutboxStorage +type ModelDelegate = { + create: (args: any) => Promise + findMany: (args: any) => Promise + createMany: (args: any) => Promise + updateMany: (args: any) => Promise +} + +export class OutboxPrismaAdapter< + SupportedEvents extends CommonEventDefinition[], + ModelName extends keyof PrismaClient & string, +> implements OutboxStorage { constructor( private readonly prisma: PrismaClient, - private readonly modelName: string, + private readonly modelName: ModelName, ) {} createEntry( outboxEntry: OutboxEntry, ): Promise> { - const prismaModel: PrismaClient[typeof this.modelName] = this.prisma[this.modelName] + const prismaModel = this.prisma[this.modelName] as unknown as ModelDelegate const messageType = getMessageType(outboxEntry.event) return prismaModel.create({ @@ -33,7 +42,7 @@ export class OutboxPrismaAdapter): Promise { const entries = await outboxAccumulator.getEntries() const failedEntries = await outboxAccumulator.getFailedEntries() - const prismaModel: PrismaClient[typeof this.modelName] = this.prisma[this.modelName] + const prismaModel = this.prisma[this.modelName] as unknown as ModelDelegate const existingEntries = await prismaModel.findMany({ where: { @@ -43,12 +52,15 @@ export class OutboxPrismaAdapter { + const prismaModel = prisma[this.modelName] as ModelDelegate + await this.handleSuccesses(prismaModel, entries, existingEntries) + await this.handleFailures(prismaModel, failedEntries, existingEntries) + }) } private async handleSuccesses( - prismaModel: PrismaClient[typeof this.modelName], + prismaModel: ModelDelegate, entries: OutboxEntry[], existingEntries: OutboxEntry[], ) { @@ -88,7 +100,7 @@ export class OutboxPrismaAdapter[], existingEntries: OutboxEntry[], ) { @@ -132,7 +144,9 @@ export class OutboxPrismaAdapter[]> { - return this.prisma[this.modelName].findMany({ + const prismaModel = this.prisma[this.modelName] as unknown as ModelDelegate + + return prismaModel.findMany({ where: { retryCount: { lte: maxRetryCount, diff --git a/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts b/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts index 5df57663..0e3499bb 100644 --- a/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts +++ b/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts @@ -24,7 +24,7 @@ type SupportedEvents = (typeof events)[keyof typeof events][] describe('outbox-prisma-adapter', () => { let prisma: PrismaClient - let outboxPrismaAdapter: OutboxPrismaAdapter + let outboxPrismaAdapter: OutboxPrismaAdapter const ENTRY_1 = { id: uuidv7(), @@ -63,7 +63,10 @@ describe('outbox-prisma-adapter', () => { log: ['query'], }) - outboxPrismaAdapter = new OutboxPrismaAdapter(prisma, 'OutboxEntry') + outboxPrismaAdapter = new OutboxPrismaAdapter( + prisma, + 'outboxEntry', + ) await prisma.$queryRaw`create schema if not exists prisma;` await prisma.$queryRaw` From 552e0b69603277e6bf2a08eee42de2d160f61e11 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Wed, 27 Nov 2024 12:48:03 +0100 Subject: [PATCH 11/33] lint fix --- packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts | 4 ++++ packages/outbox-prisma-adapter/package.json | 6 +----- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts b/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts index d97d32f4..6d16b22b 100644 --- a/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts +++ b/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts @@ -4,9 +4,13 @@ import { type CommonEventDefinition, getMessageType } from '@message-queue-toolk import type { PrismaClient } from '@prisma/client' type ModelDelegate = { + // biome-ignore lint/suspicious/noExplicitAny: create: (args: any) => Promise + // biome-ignore lint/suspicious/noExplicitAny: findMany: (args: any) => Promise + // biome-ignore lint/suspicious/noExplicitAny: createMany: (args: any) => Promise + // biome-ignore lint/suspicious/noExplicitAny: updateMany: (args: any) => Promise } diff --git a/packages/outbox-prisma-adapter/package.json b/packages/outbox-prisma-adapter/package.json index 30ad98da..9082a069 100644 --- a/packages/outbox-prisma-adapter/package.json +++ b/packages/outbox-prisma-adapter/package.json @@ -58,9 +58,5 @@ "outbox", "pattern" ], - "files": [ - "README.md", - "LICENSE", - "dist/*" - ] + "files": ["README.md", "LICENSE", "dist/*"] } From f14338459a2d8eaf95d53f28212ca602e82127e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Wed, 27 Nov 2024 13:10:16 +0100 Subject: [PATCH 12/33] Use generated db client from test dir. --- .gitignore | 3 +++ .../outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts | 2 +- packages/outbox-prisma-adapter/test/schema.prisma | 1 + 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 3e5e6564..53a8393c 100644 --- a/.gitignore +++ b/.gitignore @@ -130,3 +130,6 @@ dist .pnp.* /.idea /package-lock.json + +# prisma +db-client diff --git a/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts b/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts index 0e3499bb..514d2a32 100644 --- a/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts +++ b/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts @@ -3,11 +3,11 @@ import { type CommonEventDefinition, enrichMessageSchemaWithBase, } from '@message-queue-toolkit/schemas' -import { PrismaClient } from '@prisma/client' import { uuidv7 } from 'uuidv7' import { afterAll, beforeAll, beforeEach, describe, expect, it } from 'vitest' import { z } from 'zod' import { OutboxPrismaAdapter } from '../lib/outbox-prisma-adapter' +import { PrismaClient } from './db-client' const events = { created: { diff --git a/packages/outbox-prisma-adapter/test/schema.prisma b/packages/outbox-prisma-adapter/test/schema.prisma index 1b3c0ee8..bb4bf71b 100644 --- a/packages/outbox-prisma-adapter/test/schema.prisma +++ b/packages/outbox-prisma-adapter/test/schema.prisma @@ -17,4 +17,5 @@ model OutboxEntry { generator client { provider = "prisma-client-js" + output = "./db-client" } From 432875be4732e6261d3fb161f2689ac10ea62d48 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Wed, 27 Nov 2024 13:22:49 +0100 Subject: [PATCH 13/33] Build includes building test prisma client. --- packages/outbox-prisma-adapter/package.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/outbox-prisma-adapter/package.json b/packages/outbox-prisma-adapter/package.json index 9082a069..1ec8539a 100644 --- a/packages/outbox-prisma-adapter/package.json +++ b/packages/outbox-prisma-adapter/package.json @@ -13,7 +13,7 @@ "main": "dist/index.js", "types": "dist/index.d.ts", "scripts": { - "build": "del-cli dist && tsc", + "build": "del-cli dist && npm run db:update-client && tsc", "build:release": "del-cli dist && del-cli coverage && npm run lint && tsc --project tsconfig.release.json", "test": "vitest", "test:coverage": "npm test -- --coverage", @@ -22,6 +22,7 @@ "lint:fix": "biome check --write .", "docker:start:dev": "docker compose up -d", "docker:stop:dev": "docker compose down", + "db:update-client": "prisma generate --schema=./test/schema.prisma", "prepublishOnly": "npm run build:release" }, "peerDependencies": { From ccf29a69fdcf47d0a53720946f0f5e924c407786 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Wed, 27 Nov 2024 13:32:39 +0100 Subject: [PATCH 14/33] Fixed import. --- .../outbox-prisma-adapter/lib/outbox-prisma-adapter.ts | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts b/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts index 6d16b22b..85d4f9f5 100644 --- a/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts +++ b/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts @@ -1,5 +1,8 @@ -import type { OutboxAccumulator, OutboxEntry } from '@message-queue-toolkit/outbox-core' -import type { OutboxStorage } from '@message-queue-toolkit/outbox-core/dist/lib/storage' +import type { + OutboxAccumulator, + OutboxEntry, + OutboxStorage, +} from '@message-queue-toolkit/outbox-core' import { type CommonEventDefinition, getMessageType } from '@message-queue-toolkit/schemas' import type { PrismaClient } from '@prisma/client' From 3f39f92140fe4caeef11278cada9ad5d78afea59 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Wed, 27 Nov 2024 14:20:45 +0100 Subject: [PATCH 15/33] prisma main dependency. --- packages/outbox-prisma-adapter/package.json | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/packages/outbox-prisma-adapter/package.json b/packages/outbox-prisma-adapter/package.json index 1ec8539a..528b44aa 100644 --- a/packages/outbox-prisma-adapter/package.json +++ b/packages/outbox-prisma-adapter/package.json @@ -25,11 +25,13 @@ "db:update-client": "prisma generate --schema=./test/schema.prisma", "prepublishOnly": "npm run build:release" }, + "dependencies": { + "@prisma/client": "^5.19.1" + }, "peerDependencies": { "@message-queue-toolkit/core": ">=14.0.0", "@message-queue-toolkit/outbox-core": ">=0.1.0", - "@message-queue-toolkit/schemas": ">=4.0.0", - "@prisma/client": "^5.19.1" + "@message-queue-toolkit/schemas": ">=4.0.0" }, "devDependencies": { "@biomejs/biome": "1.8.3", From 0b7d7188bfe9d86205de9678773b854c5fbb9469 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Wed, 27 Nov 2024 14:28:33 +0100 Subject: [PATCH 16/33] prisma client dev dependency. --- packages/outbox-prisma-adapter/package.json | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/packages/outbox-prisma-adapter/package.json b/packages/outbox-prisma-adapter/package.json index 528b44aa..c0b909bb 100644 --- a/packages/outbox-prisma-adapter/package.json +++ b/packages/outbox-prisma-adapter/package.json @@ -25,13 +25,11 @@ "db:update-client": "prisma generate --schema=./test/schema.prisma", "prepublishOnly": "npm run build:release" }, - "dependencies": { - "@prisma/client": "^5.19.1" - }, "peerDependencies": { "@message-queue-toolkit/core": ">=14.0.0", "@message-queue-toolkit/outbox-core": ">=0.1.0", - "@message-queue-toolkit/schemas": ">=4.0.0" + "@message-queue-toolkit/schemas": ">=4.0.0", + "@prisma/client": "^5.19.1" }, "devDependencies": { "@biomejs/biome": "1.8.3", @@ -39,6 +37,7 @@ "@types/node": "^22.0.0", "@vitest/coverage-v8": "^2.0.4", "del-cli": "^5.1.0", + "@prisma/client": "^5.19.1", "prisma": "^5.19.1", "typescript": "^5.5.3", "uuidv7": "^1.0.2", From c2108e35c442ed371b6aaa4230d474405823c4f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Thu, 28 Nov 2024 10:24:41 +0100 Subject: [PATCH 17/33] Build before lint. --- .github/workflows/linting.yml | 4 ++++ packages/outbox-prisma-adapter/package.json | 10 +++++++--- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/.github/workflows/linting.yml b/.github/workflows/linting.yml index 2b4ea8b9..05715da1 100644 --- a/.github/workflows/linting.yml +++ b/.github/workflows/linting.yml @@ -30,5 +30,9 @@ jobs: run: | npm install --ignore-scripts + - name: Build + run: | + npm run build + - name: Run lint run: npm run lint diff --git a/packages/outbox-prisma-adapter/package.json b/packages/outbox-prisma-adapter/package.json index c0b909bb..4d70b274 100644 --- a/packages/outbox-prisma-adapter/package.json +++ b/packages/outbox-prisma-adapter/package.json @@ -34,11 +34,11 @@ "devDependencies": { "@biomejs/biome": "1.8.3", "@kibertoad/biome-config": "^1.2.1", + "@prisma/client": "^5.22.0", "@types/node": "^22.0.0", "@vitest/coverage-v8": "^2.0.4", "del-cli": "^5.1.0", - "@prisma/client": "^5.19.1", - "prisma": "^5.19.1", + "prisma": "^5.22.0", "typescript": "^5.5.3", "uuidv7": "^1.0.2", "vitest": "^2.0.4", @@ -60,5 +60,9 @@ "outbox", "pattern" ], - "files": ["README.md", "LICENSE", "dist/*"] + "files": [ + "README.md", + "LICENSE", + "dist/*" + ] } From a7a9cb0c4ba2fe10c4c14f165fcc3563d404472f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Thu, 28 Nov 2024 11:01:21 +0100 Subject: [PATCH 18/33] Ignore db client in biome. --- biome.json | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/biome.json b/biome.json index fa4d8f68..55921a49 100644 --- a/biome.json +++ b/biome.json @@ -1,12 +1,15 @@ { - "$schema": "./node_modules/@biomejs/biome/configuration_schema.json", - "extends": ["./node_modules/@kibertoad/biome-config/configs/biome-package.json"], - "linter": { - "rules": { - "performance": { - "noBarrelFile": "off", - "noReExportAll": "off" - } - } - } + "$schema": "./node_modules/@biomejs/biome/configuration_schema.json", + "extends": ["./node_modules/@kibertoad/biome-config/configs/biome-package.json"], + "linter": { + "rules": { + "performance": { + "noBarrelFile": "off", + "noReExportAll": "off" + } + } + }, + "files": { + "ignore": ["db-client"] + } } From 1b3c72c0af86cbd02299c21822dde97f3570cb25 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Thu, 28 Nov 2024 11:41:29 +0100 Subject: [PATCH 19/33] Peer prisma. --- packages/outbox-prisma-adapter/package.json | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/packages/outbox-prisma-adapter/package.json b/packages/outbox-prisma-adapter/package.json index 4d70b274..8c58fa9d 100644 --- a/packages/outbox-prisma-adapter/package.json +++ b/packages/outbox-prisma-adapter/package.json @@ -29,6 +29,7 @@ "@message-queue-toolkit/core": ">=14.0.0", "@message-queue-toolkit/outbox-core": ">=0.1.0", "@message-queue-toolkit/schemas": ">=4.0.0", + "prisma": "^5.22.0", "@prisma/client": "^5.19.1" }, "devDependencies": { @@ -60,9 +61,5 @@ "outbox", "pattern" ], - "files": [ - "README.md", - "LICENSE", - "dist/*" - ] + "files": ["README.md", "LICENSE", "dist/*"] } From 77b1d2654e1c132e53ef62318bb5e454d37f51a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Thu, 28 Nov 2024 11:45:31 +0100 Subject: [PATCH 20/33] inferred type --- packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts b/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts index 85d4f9f5..b25b9863 100644 --- a/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts +++ b/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts @@ -59,7 +59,7 @@ export class OutboxPrismaAdapter< }, }) - await this.prisma.$transaction(async (prisma) => { + await this.prisma.$transaction(async (prisma: { [x: string]: ModelDelegate }) => { const prismaModel = prisma[this.modelName] as ModelDelegate await this.handleSuccesses(prismaModel, entries, existingEntries) await this.handleFailures(prismaModel, failedEntries, existingEntries) From 856ee17c3656362ae88261cc5d7e4865dce4aebf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Thu, 28 Nov 2024 12:33:47 +0100 Subject: [PATCH 21/33] debugging ci --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index bc873220..2d629bef 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -32,7 +32,7 @@ jobs: - name: Build TS run: | - npm run build + ls node_modules && npm run build - name: Run Tests run: | From 45d7310e60b1428ca613aa07bcf44b113675ae70 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Thu, 28 Nov 2024 12:42:42 +0100 Subject: [PATCH 22/33] keep prisma outside test folder in root --- packages/outbox-prisma-adapter/package.json | 2 +- packages/outbox-prisma-adapter/{test => prisma}/schema.prisma | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) rename packages/outbox-prisma-adapter/{test => prisma}/schema.prisma (94%) diff --git a/packages/outbox-prisma-adapter/package.json b/packages/outbox-prisma-adapter/package.json index 8c58fa9d..03cea9f3 100644 --- a/packages/outbox-prisma-adapter/package.json +++ b/packages/outbox-prisma-adapter/package.json @@ -22,7 +22,7 @@ "lint:fix": "biome check --write .", "docker:start:dev": "docker compose up -d", "docker:stop:dev": "docker compose down", - "db:update-client": "prisma generate --schema=./test/schema.prisma", + "db:update-client": "prisma generate", "prepublishOnly": "npm run build:release" }, "peerDependencies": { diff --git a/packages/outbox-prisma-adapter/test/schema.prisma b/packages/outbox-prisma-adapter/prisma/schema.prisma similarity index 94% rename from packages/outbox-prisma-adapter/test/schema.prisma rename to packages/outbox-prisma-adapter/prisma/schema.prisma index bb4bf71b..1b3c0ee8 100644 --- a/packages/outbox-prisma-adapter/test/schema.prisma +++ b/packages/outbox-prisma-adapter/prisma/schema.prisma @@ -17,5 +17,4 @@ model OutboxEntry { generator client { provider = "prisma-client-js" - output = "./db-client" } From 00ca6d052130041e59eb9194a91b00be00cb7767 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Thu, 28 Nov 2024 12:47:07 +0100 Subject: [PATCH 23/33] Fixed import in spec. --- .github/workflows/ci.yml | 2 +- .../outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2d629bef..bc873220 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -32,7 +32,7 @@ jobs: - name: Build TS run: | - ls node_modules && npm run build + npm run build - name: Run Tests run: | diff --git a/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts b/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts index 514d2a32..0e3499bb 100644 --- a/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts +++ b/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts @@ -3,11 +3,11 @@ import { type CommonEventDefinition, enrichMessageSchemaWithBase, } from '@message-queue-toolkit/schemas' +import { PrismaClient } from '@prisma/client' import { uuidv7 } from 'uuidv7' import { afterAll, beforeAll, beforeEach, describe, expect, it } from 'vitest' import { z } from 'zod' import { OutboxPrismaAdapter } from '../lib/outbox-prisma-adapter' -import { PrismaClient } from './db-client' const events = { created: { From 33b954bb2171de5197b23456f9748991547924a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Thu, 28 Nov 2024 12:51:10 +0100 Subject: [PATCH 24/33] temp ts ignore. --- packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts b/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts index b25b9863..d8d7d516 100644 --- a/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts +++ b/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts @@ -32,6 +32,7 @@ export class OutboxPrismaAdapter< ): Promise> { const prismaModel = this.prisma[this.modelName] as unknown as ModelDelegate + // @ts-ignore const messageType = getMessageType(outboxEntry.event) return prismaModel.create({ data: { @@ -59,7 +60,7 @@ export class OutboxPrismaAdapter< }, }) - await this.prisma.$transaction(async (prisma: { [x: string]: ModelDelegate }) => { + await this.prisma.$transaction(async (prisma) => { const prismaModel = prisma[this.modelName] as ModelDelegate await this.handleSuccesses(prismaModel, entries, existingEntries) await this.handleFailures(prismaModel, failedEntries, existingEntries) @@ -82,6 +83,7 @@ export class OutboxPrismaAdapter< await prismaModel.createMany({ data: toCreate.map((entry) => ({ id: entry.id, + // @ts-ignore type: getMessageType(entry.event), created: entry.created, updated: new Date(), @@ -122,6 +124,7 @@ export class OutboxPrismaAdapter< await prismaModel.createMany({ data: toCreate.map((entry) => ({ id: entry.id, + // @ts-ignore type: getMessageType(entry.event), created: entry.created, updated: new Date(), From 829c9ac7b71fa0737654d5910107f64b0336e507 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Thu, 28 Nov 2024 13:07:19 +0100 Subject: [PATCH 25/33] wait for db. --- packages/outbox-prisma-adapter/package.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/outbox-prisma-adapter/package.json b/packages/outbox-prisma-adapter/package.json index 03cea9f3..6066cd27 100644 --- a/packages/outbox-prisma-adapter/package.json +++ b/packages/outbox-prisma-adapter/package.json @@ -17,11 +17,12 @@ "build:release": "del-cli dist && del-cli coverage && npm run lint && tsc --project tsconfig.release.json", "test": "vitest", "test:coverage": "npm test -- --coverage", - "test:ci": "npm run docker:start:dev && npm run test:coverage && npm run docker:stop:dev", + "test:ci": "npm run docker:start:dev && npm run db:wait && npm run test:coverage && npm run docker:stop:dev", "lint": "biome check . && tsc --project tsconfig.json --noEmit", "lint:fix": "biome check --write .", "docker:start:dev": "docker compose up -d", "docker:stop:dev": "docker compose down", + "db:wait": "while ! echo \"SELECT 1;\" | dotenv -c test -- prisma db execute --stdin; do sleep 1; done", "db:update-client": "prisma generate", "prepublishOnly": "npm run build:release" }, From 1f39dbf29bc21cfaac605f571b50d2154615596b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Thu, 28 Nov 2024 13:11:51 +0100 Subject: [PATCH 26/33] wait for db. --- packages/outbox-prisma-adapter/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/outbox-prisma-adapter/package.json b/packages/outbox-prisma-adapter/package.json index 6066cd27..07e0df12 100644 --- a/packages/outbox-prisma-adapter/package.json +++ b/packages/outbox-prisma-adapter/package.json @@ -16,7 +16,7 @@ "build": "del-cli dist && npm run db:update-client && tsc", "build:release": "del-cli dist && del-cli coverage && npm run lint && tsc --project tsconfig.release.json", "test": "vitest", - "test:coverage": "npm test -- --coverage", + "test:coverage": "npm run docker:start:dev && npm run db:wait && npm test -- --coverage && npm run docker:stop:dev", "test:ci": "npm run docker:start:dev && npm run db:wait && npm run test:coverage && npm run docker:stop:dev", "lint": "biome check . && tsc --project tsconfig.json --noEmit", "lint:fix": "biome check --write .", From 8b818c15f543d7c2ec66273032b8ab491c73b14f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Thu, 28 Nov 2024 13:15:35 +0100 Subject: [PATCH 27/33] wait for db. --- packages/outbox-prisma-adapter/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/outbox-prisma-adapter/package.json b/packages/outbox-prisma-adapter/package.json index 07e0df12..ea9db243 100644 --- a/packages/outbox-prisma-adapter/package.json +++ b/packages/outbox-prisma-adapter/package.json @@ -22,7 +22,7 @@ "lint:fix": "biome check --write .", "docker:start:dev": "docker compose up -d", "docker:stop:dev": "docker compose down", - "db:wait": "while ! echo \"SELECT 1;\" | dotenv -c test -- prisma db execute --stdin; do sleep 1; done", + "db:wait": "while ! echo \"SELECT 1;\" | prisma db execute --stdin; do sleep 1; done", "db:update-client": "prisma generate", "prepublishOnly": "npm run build:release" }, From ab58167fca37a0cfc887f3197ac1d240d540fca8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Thu, 28 Nov 2024 15:00:48 +0100 Subject: [PATCH 28/33] Redundant docker start. --- packages/outbox-prisma-adapter/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/outbox-prisma-adapter/package.json b/packages/outbox-prisma-adapter/package.json index ea9db243..f4878ee9 100644 --- a/packages/outbox-prisma-adapter/package.json +++ b/packages/outbox-prisma-adapter/package.json @@ -17,7 +17,7 @@ "build:release": "del-cli dist && del-cli coverage && npm run lint && tsc --project tsconfig.release.json", "test": "vitest", "test:coverage": "npm run docker:start:dev && npm run db:wait && npm test -- --coverage && npm run docker:stop:dev", - "test:ci": "npm run docker:start:dev && npm run db:wait && npm run test:coverage && npm run docker:stop:dev", + "test:ci": "npm run test:coverage", "lint": "biome check . && tsc --project tsconfig.json --noEmit", "lint:fix": "biome check --write .", "docker:start:dev": "docker compose up -d", From ed4ce0259972e63606b6ed9a86e0f12ea6ae11d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Thu, 23 Jan 2025 14:34:11 +0100 Subject: [PATCH 29/33] Simplified outbox entry. --- packages/outbox-core/lib/objects.ts | 3 +-- packages/outbox-core/lib/outbox.ts | 13 ++++++++----- packages/outbox-core/package.json | 2 +- packages/outbox-core/test/outbox.spec.ts | 10 +++++----- 4 files changed, 15 insertions(+), 13 deletions(-) diff --git a/packages/outbox-core/lib/objects.ts b/packages/outbox-core/lib/objects.ts index 2ea6762b..b8ea41c6 100644 --- a/packages/outbox-core/lib/objects.ts +++ b/packages/outbox-core/lib/objects.ts @@ -15,8 +15,7 @@ export type OutboxEntryStatus = 'CREATED' | 'ACKED' | 'SUCCESS' | 'FAILED' export type OutboxEntry = { id: string - event: SupportedEvent - data: Omit, 'type'> + event: CommonEventDefinitionPublisherSchemaType precedingMessageMetadata?: Partial status: OutboxEntryStatus created: Date diff --git a/packages/outbox-core/lib/outbox.ts b/packages/outbox-core/lib/outbox.ts index 4d7b7079..6f23253f 100644 --- a/packages/outbox-core/lib/outbox.ts +++ b/packages/outbox-core/lib/outbox.ts @@ -6,6 +6,7 @@ import type { ConsumerMessageMetadataType, DomainEventEmitter, } from '@message-queue-toolkit/core' +import { enrichMessageSchemaWithBase } from '@message-queue-toolkit/schemas' import { PromisePool } from '@supercharge/promise-pool' import { uuidv7 } from 'uuidv7' import type { OutboxAccumulator } from './accumulators' @@ -50,7 +51,11 @@ export class OutboxProcessor { .withConcurrency(this.outboxProcessorConfiguration.emitBatchSize) .process(async (entry) => { try { - await eventEmitter.emit(entry.event, entry.data, entry.precedingMessageMetadata) + const event = entry.event + + const schema = { ...enrichMessageSchemaWithBase(event.type, event.payload) } + + await eventEmitter.emit(schema, entry.event, entry.precedingMessageMetadata) await outboxAccumulator.add(entry) } catch (e) { context.logger.error({ error: e }, 'Failed to process outbox entry.') @@ -133,14 +138,12 @@ export class OutboxEventEmitter * @param precedingMessageMetadata */ public async emit( - supportedEvent: SupportedEvent, - data: Omit, 'type'>, + data: CommonEventDefinitionPublisherSchemaType, precedingMessageMetadata?: Partial, ) { await this.storage.createEntry({ id: uuidv7(), - event: supportedEvent, - data, + event: data, precedingMessageMetadata, status: 'CREATED', created: new Date(), diff --git a/packages/outbox-core/package.json b/packages/outbox-core/package.json index 84174025..2c8f93e8 100644 --- a/packages/outbox-core/package.json +++ b/packages/outbox-core/package.json @@ -1,6 +1,6 @@ { "name": "@message-queue-toolkit/outbox-core", - "version": "0.1.0", + "version": "0.2.0", "private": false, "license": "MIT", "description": "Outbox pattern implementation for message queue toolkit", diff --git a/packages/outbox-core/test/outbox.spec.ts b/packages/outbox-core/test/outbox.spec.ts index ec1950c5..b06861f8 100644 --- a/packages/outbox-core/test/outbox.spec.ts +++ b/packages/outbox-core/test/outbox.spec.ts @@ -91,7 +91,7 @@ describe('outbox', () => { }) it('saves outbox entry to storage', async () => { - await outboxEventEmitter.emit(TestEvents.created, createdEventPayload, { + await outboxEventEmitter.emit(createdEventPayload, { correlationId: randomUUID(), }) @@ -101,7 +101,7 @@ describe('outbox', () => { }) it('saves outbox entry and process it', async () => { - await outboxEventEmitter.emit(TestEvents.created, createdEventPayload, { + await outboxEventEmitter.emit(createdEventPayload, { correlationId: randomUUID(), }) @@ -140,7 +140,7 @@ describe('outbox', () => { }), ) - await outboxEventEmitter.emit(TestEvents.created, createdEventPayload, { + await outboxEventEmitter.emit(createdEventPayload, { correlationId: randomUUID(), }) @@ -184,7 +184,7 @@ describe('outbox', () => { }) //Persist the event - await outboxEventEmitter.emit(TestEvents.created, createdEventPayload, { + await outboxEventEmitter.emit(createdEventPayload, { correlationId: randomUUID(), }) @@ -229,7 +229,7 @@ describe('outbox', () => { it("doesn't emit event again if it's already present in accumulator", async () => { const mockedEventEmitter = vi.spyOn(eventEmitter, 'emit') - await outboxEventEmitter.emit(TestEvents.created, createdEventPayload, { + await outboxEventEmitter.emit(createdEventPayload, { correlationId: randomUUID(), }) From bc80d7ddd1b931f55ff5f2ffdb9f1e42d86b7802 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Thu, 23 Jan 2025 14:48:24 +0100 Subject: [PATCH 30/33] Fixed tests to new version of outbox entry. --- .../lib/outbox-prisma-adapter.ts | 16 +++++++--------- .../outbox-prisma-adapter/prisma/schema.prisma | 2 +- .../test/outbox-prisma-adapter.spec.ts | 17 +++++++++-------- 3 files changed, 17 insertions(+), 18 deletions(-) diff --git a/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts b/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts index d8d7d516..7194e3ff 100644 --- a/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts +++ b/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts @@ -3,7 +3,7 @@ import type { OutboxEntry, OutboxStorage, } from '@message-queue-toolkit/outbox-core' -import { type CommonEventDefinition, getMessageType } from '@message-queue-toolkit/schemas' +import type { CommonEventDefinition } from '@message-queue-toolkit/schemas' import type { PrismaClient } from '@prisma/client' type ModelDelegate = { @@ -33,14 +33,13 @@ export class OutboxPrismaAdapter< const prismaModel = this.prisma[this.modelName] as unknown as ModelDelegate // @ts-ignore - const messageType = getMessageType(outboxEntry.event) return prismaModel.create({ data: { id: outboxEntry.id, - type: messageType, + type: outboxEntry.event.type, created: outboxEntry.created, updated: outboxEntry.updated, - data: outboxEntry.data, + event: outboxEntry.event, status: outboxEntry.status, retryCount: outboxEntry.retryCount, }, @@ -84,10 +83,10 @@ export class OutboxPrismaAdapter< data: toCreate.map((entry) => ({ id: entry.id, // @ts-ignore - type: getMessageType(entry.event), + type: entry.event.type, created: entry.created, updated: new Date(), - data: entry.data, + event: entry.event, status: 'SUCCESS', })), }) @@ -124,11 +123,10 @@ export class OutboxPrismaAdapter< await prismaModel.createMany({ data: toCreate.map((entry) => ({ id: entry.id, - // @ts-ignore - type: getMessageType(entry.event), + type: entry.event.type, created: entry.created, updated: new Date(), - data: entry.data, + event: entry.event, status: 'FAILED', retryCount: 1, })), diff --git a/packages/outbox-prisma-adapter/prisma/schema.prisma b/packages/outbox-prisma-adapter/prisma/schema.prisma index 1b3c0ee8..7395092a 100644 --- a/packages/outbox-prisma-adapter/prisma/schema.prisma +++ b/packages/outbox-prisma-adapter/prisma/schema.prisma @@ -9,7 +9,7 @@ model OutboxEntry { updated DateTime @default(now()) @updatedAt type String retryCount Int @default(0) @map("retry_count") - data Json + event Json status String @@map("outbox_entry") diff --git a/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts b/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts index 0e3499bb..94ff5d8c 100644 --- a/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts +++ b/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts @@ -28,15 +28,15 @@ describe('outbox-prisma-adapter', () => { const ENTRY_1 = { id: uuidv7(), - event: events.created, status: 'CREATED', - data: { + event: { id: uuidv7(), payload: { message: 'TEST EVENT', }, metadata: {}, timestamp: new Date().toISOString(), + type: 'entity.created', }, retryCount: 0, created: new Date(), @@ -44,13 +44,13 @@ describe('outbox-prisma-adapter', () => { const ENTRY_2 = { id: uuidv7(), - event: events.created, status: 'CREATED', - data: { + event: { id: uuidv7(), payload: { message: 'TEST EVENT 2', }, + type: 'entity.created', metadata: {}, timestamp: new Date().toISOString(), }, @@ -76,7 +76,7 @@ describe('outbox-prisma-adapter', () => { created TIMESTAMP NOT NULL, updated TIMESTAMP, retry_count INT NOT NULL DEFAULT 0, - data JSONB NOT NULL, + event JSONB NOT NULL, status TEXT NOT NULL ) ` @@ -95,13 +95,13 @@ describe('outbox-prisma-adapter', () => { it('creates entry in DB via outbox storage implementation', async () => { await outboxPrismaAdapter.createEntry({ id: uuidv7(), - event: events.created, status: 'CREATED', - data: { + event: { id: uuidv7(), payload: { message: 'TEST EVENT', }, + type: 'entity.created', metadata: {}, timestamp: new Date().toISOString(), }, @@ -118,11 +118,12 @@ describe('outbox-prisma-adapter', () => { created: expect.any(Date), updated: expect.any(Date), retryCount: 0, - data: { + event: { id: expect.any(String), payload: { message: 'TEST EVENT', }, + type: 'entity.created', metadata: {}, timestamp: expect.any(String), }, From 55bdd41a7a04202dcf537bfd06c86378e7226c0e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Fri, 24 Jan 2025 10:50:35 +0100 Subject: [PATCH 31/33] Working on ModelDelegate --- .../lib/outbox-prisma-adapter.ts | 56 ++++++++++++++----- .../test/outbox-prisma-adapter.spec.ts | 3 + 2 files changed, 44 insertions(+), 15 deletions(-) diff --git a/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts b/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts index 7194e3ff..edf06d32 100644 --- a/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts +++ b/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts @@ -4,17 +4,36 @@ import type { OutboxStorage, } from '@message-queue-toolkit/outbox-core' import type { CommonEventDefinition } from '@message-queue-toolkit/schemas' -import type { PrismaClient } from '@prisma/client' +import type { Prisma, PrismaClient } from '@prisma/client' -type ModelDelegate = { - // biome-ignore lint/suspicious/noExplicitAny: - create: (args: any) => Promise - // biome-ignore lint/suspicious/noExplicitAny: - findMany: (args: any) => Promise +export type EnrichedOutboxEntry = OutboxEntry & { + type: string +} + +type ModelDelegate = { + create: (args: { data: EnrichedOutboxEntry }) => Promise> + findMany: (args: { + where: + | Partial> + | { + id?: Prisma.StringFilter + retryCount?: Prisma.IntFilter + } + }) => Promise[]> // biome-ignore lint/suspicious/noExplicitAny: createMany: (args: any) => Promise - // biome-ignore lint/suspicious/noExplicitAny: - updateMany: (args: any) => Promise + updateMany: (args: { + where: { + id: { + in: string[] + } + } + data: + | Partial> + | { + retryCount?: number | Prisma.IntFieldUpdateOperationsInput + } + }) => Promise } export class OutboxPrismaAdapter< @@ -30,9 +49,10 @@ export class OutboxPrismaAdapter< createEntry( outboxEntry: OutboxEntry, ): Promise> { - const prismaModel = this.prisma[this.modelName] as unknown as ModelDelegate + const prismaModel = this.prisma[this.modelName] as unknown as ModelDelegate< + SupportedEvents[number] + > - // @ts-ignore return prismaModel.create({ data: { id: outboxEntry.id, @@ -49,7 +69,9 @@ export class OutboxPrismaAdapter< async flush(outboxAccumulator: OutboxAccumulator): Promise { const entries = await outboxAccumulator.getEntries() const failedEntries = await outboxAccumulator.getFailedEntries() - const prismaModel = this.prisma[this.modelName] as unknown as ModelDelegate + const prismaModel = this.prisma[this.modelName] as unknown as ModelDelegate< + SupportedEvents[number] + > const existingEntries = await prismaModel.findMany({ where: { @@ -60,14 +82,16 @@ export class OutboxPrismaAdapter< }) await this.prisma.$transaction(async (prisma) => { - const prismaModel = prisma[this.modelName] as ModelDelegate + const prismaModel = prisma[this.modelName] as unknown as ModelDelegate< + SupportedEvents[number] + > await this.handleSuccesses(prismaModel, entries, existingEntries) await this.handleFailures(prismaModel, failedEntries, existingEntries) }) } private async handleSuccesses( - prismaModel: ModelDelegate, + prismaModel: ModelDelegate, entries: OutboxEntry[], existingEntries: OutboxEntry[], ) { @@ -108,7 +132,7 @@ export class OutboxPrismaAdapter< } private async handleFailures( - prismaModel: ModelDelegate, + prismaModel: ModelDelegate, entries: OutboxEntry[], existingEntries: OutboxEntry[], ) { @@ -152,7 +176,9 @@ export class OutboxPrismaAdapter< } getEntries(maxRetryCount: number): Promise[]> { - const prismaModel = this.prisma[this.modelName] as unknown as ModelDelegate + const prismaModel = this.prisma[this.modelName] as unknown as ModelDelegate< + SupportedEvents[number] + > return prismaModel.findMany({ where: { diff --git a/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts b/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts index 94ff5d8c..d84adc6e 100644 --- a/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts +++ b/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts @@ -38,6 +38,7 @@ describe('outbox-prisma-adapter', () => { timestamp: new Date().toISOString(), type: 'entity.created', }, + type: 'entity.created', retryCount: 0, created: new Date(), } satisfies OutboxEntry @@ -54,6 +55,7 @@ describe('outbox-prisma-adapter', () => { metadata: {}, timestamp: new Date().toISOString(), }, + type: 'entity.created', retryCount: 0, created: new Date(), } satisfies OutboxEntry @@ -105,6 +107,7 @@ describe('outbox-prisma-adapter', () => { metadata: {}, timestamp: new Date().toISOString(), }, + type: 'entity.created', retryCount: 0, created: new Date(), } satisfies OutboxEntry) From 01975c0a68e1963dbca7f7540cc187a0aaec0a8b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Fri, 24 Jan 2025 11:20:40 +0100 Subject: [PATCH 32/33] Fixed model delegate. --- packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts b/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts index edf06d32..7a39a551 100644 --- a/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts +++ b/packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts @@ -20,8 +20,7 @@ type ModelDelegate = { retryCount?: Prisma.IntFilter } }) => Promise[]> - // biome-ignore lint/suspicious/noExplicitAny: - createMany: (args: any) => Promise + createMany: (args: { data: EnrichedOutboxEntry[] }) => Promise updateMany: (args: { where: { id: { @@ -106,12 +105,12 @@ export class OutboxPrismaAdapter< await prismaModel.createMany({ data: toCreate.map((entry) => ({ id: entry.id, - // @ts-ignore type: entry.event.type, created: entry.created, updated: new Date(), event: entry.event, status: 'SUCCESS', + retryCount: entry.retryCount, })), }) } From 0fc3e7f1ae571f583732120a939f9d85c09c5330 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Wylega=C5=82a?= Date: Fri, 24 Jan 2025 11:25:59 +0100 Subject: [PATCH 33/33] Type fix. --- .../outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts | 3 --- 1 file changed, 3 deletions(-) diff --git a/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts b/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts index d84adc6e..94ff5d8c 100644 --- a/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts +++ b/packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts @@ -38,7 +38,6 @@ describe('outbox-prisma-adapter', () => { timestamp: new Date().toISOString(), type: 'entity.created', }, - type: 'entity.created', retryCount: 0, created: new Date(), } satisfies OutboxEntry @@ -55,7 +54,6 @@ describe('outbox-prisma-adapter', () => { metadata: {}, timestamp: new Date().toISOString(), }, - type: 'entity.created', retryCount: 0, created: new Date(), } satisfies OutboxEntry @@ -107,7 +105,6 @@ describe('outbox-prisma-adapter', () => { metadata: {}, timestamp: new Date().toISOString(), }, - type: 'entity.created', retryCount: 0, created: new Date(), } satisfies OutboxEntry)