-
Notifications
You must be signed in to change notification settings - Fork 4
feat(mikroorm-driver): add PostgreSQL LISTEN/NOTIFY for instant event processing #10
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
jtomaszewski
wants to merge
11
commits into
Nestixis:main
Choose a base branch
from
fullstackhouse:vk/e475-add-postgresql-l
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
feat(mikroorm-driver): add PostgreSQL LISTEN/NOTIFY for instant event processing #10
jtomaszewski
wants to merge
11
commits into
Nestixis:main
from
fullstackhouse:vk/e475-add-postgresql-l
+7,846
−1,642
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
* test(mikroorm-driver): add comprehensive test suite for MikroORM driver - Set up Vitest with SWC for decorator metadata support - Add test utilities for spinning up isolated NestJS apps with fresh PostgreSQL databases - Add unit tests for MikroOrmInboxOutboxTransportEvent entity - Add unit tests for MikroORMDatabaseDriver (persist, remove, flush, findAndExtendReadyToRetryEvents) - Add unit tests for MikroORMDatabaseDriverFactory (forked entity managers, isolation) - Add integration tests for TransactionalEventEmitter with real database - Fix integer overflow for timestamps by using bigint column type 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com> * ci: add GitHub workflow for MikroORM driver tests - Add test-mikroorm-driver.yml workflow with PostgreSQL service - Revert @mikro-orm versions to ^6.3.9 as requested 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com> * fix(ci): build core package before running MikroORM driver tests 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com> --------- Co-authored-by: Claude <noreply@anthropic.com>
Fix typo in field name throughout codebase and add database migrations to rename the column from delived_to_listeners to delivered_to_listeners. BREAKING CHANGE: Existing databases require running the new migration to rename the column. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-authored-by: Claude <noreply@anthropic.com>
* fix `npm run build` for all pkgs (vibe-kanban eeb84602) * Done. Updated the paths configuration in `test-core.yml` to match the mikroorm workflow pattern: - Added `branches: [main]` filter for both push and pull_request - Added `package.json` and `package-lock.json` (dependency changes may affect tests) - Added `.github/workflows/test-core.yml` (workflow changes should trigger a run) * Done. Removed `branches: [main]` from both workflows. They'll now trigger on any branch when the relevant paths change.
1. **test-core.yml**: Added a "Type check" step that runs `npm run build` before tests 2. **test-mikroorm-driver.yml**: Added a "Type check MikroORM driver" step that runs `npm run build` on the mikroorm-driver workspace before tests This ensures TypeScript type errors are caught early in CI before tests run, preventing the issue where tests could pass while type errors exist in the code.
* Done. Here's what I added: **Created `.github/workflows/publish-preview.yml`** - A new GitHub Action workflow that: - Triggers on pull requests and pushes to main - Builds all workspace packages - Publishes preview packages using pkg.pr.new with compact URLs **Updated `package.json` (root)** - Added a root-level `build` script for npm workspaces. **Updated `packages/core/package.json`** - Added the `repository` field (other packages already had it) to enable the `--compact` flag for shorter URLs. **Next step**: Install the [pkg.pr.new GitHub App](https://github.com/apps/pkg-pr-new) on the repository to activate the preview publishing. * fix(ci): remove --compact flag (requires npm publish with repository field)
…ntPoller (#4) * test workflows run twice in a PR? (vibe-kanban 7dc44370) see #8 * feat(core): add graceful shutdown handling to RetryableInboxOutboxEventPoller Implement OnModuleDestroy lifecycle hook to properly clean up resources during application shutdown: - Unsubscribe from RxJS interval to stop polling - Track in-flight event processing and wait for completion - Add isShuttingDown flag to prevent new processing during shutdown 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com> --------- Co-authored-by: Claude <noreply@anthropic.com>
* feat(core): add immediateProcessing option to event configuration Add optional `immediateProcessing` boolean to InboxOutboxModuleEventOptions (default: true for backward compatibility). When set to false, events are only saved to DB and processed later by the poller, enabling a safer "fire and forget" pattern for crash recovery scenarios. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com> * add integration test for immediateProcessing (vibe-kanban 0a39979e) verify that it is not processed immediately, but eventually once poller runs * fix: use TransactionalEventEmitterOperations enum in integration tests --------- Co-authored-by: Claude <noreply@anthropic.com>
I've implemented PostgreSQL LISTEN/NOTIFY support for faster event processing. Here's what was added:
1. **`packages/core/src/poller/event-notification-listener.interface.ts`** - New interface defining the contract for notification listeners:
- `connect()` / `disconnect()` lifecycle methods
- `notifications$` RxJS Observable for notification events
2. **`packages/core/src/poller/retryable-inbox-outbox-event.poller.ts`** - Modified to:
- Accept optional `EventNotificationListener` via `@Optional()` decorator
- Connect the listener on `onModuleInit()`
- Merge notification events with polling interval using RxJS `merge()`
- Properly disconnect on `onModuleDestroy()`
- Keep polling as fallback (both sources trigger `processBatch()`)
3. **`packages/core/src/index.ts`** - Export the new interface and token
1. **`packages/mikroorm-driver/src/migrations/MigrationInboxOutbox1733300000.ts`** - New migration that creates:
- `notify_inbox_outbox_event()` PostgreSQL function
- `inbox_outbox_event_notify` trigger on `inbox_outbox_transport_event` table
2. **`packages/mikroorm-driver/src/listener/postgresql-event-notification.listener.ts`** - PostgreSQL-specific implementation:
- Uses `pg` Client for dedicated LISTEN connection
- Handles reconnection on connection errors
- Configurable reconnect delay
3. **`packages/mikroorm-driver/src/migrations/migrations.ts`** - Updated to include new migration
4. **`packages/mikroorm-driver/src/index.ts`** - Export the new listener
- `postgresql-event-notification-listener.spec.ts` - Unit tests for the listener
- `listen-notify-integration.spec.ts` - Integration tests verifying instant notifications
To enable LISTEN/NOTIFY in an application:
```typescript
import { PostgreSQLEventNotificationListener } from '@nestixis/nestjs-inbox-outbox-mikroorm-driver';
import { EVENT_NOTIFICATION_LISTENER_TOKEN } from '@nestixis/nestjs-inbox-outbox';
// In your module providers:
{
provide: EVENT_NOTIFICATION_LISTENER_TOKEN,
useFactory: () => new PostgreSQLEventNotificationListener({
host: 'localhost',
port: 5432,
user: 'postgres',
password: 'password',
database: 'mydb',
}),
}
```
The implementation is backward compatible - if no notification listener is provided, the system falls back to polling-only behavior.
- Rename EventNotificationListener → EventListener - Rename EVENT_NOTIFICATION_LISTENER_TOKEN → EVENT_LISTENER_TOKEN - Rename notifications$ → events$ - Rename PostgreSQLEventNotificationListener → PostgreSQLEventListener - PostgreSQLEventListener now takes MikroORM instance and extracts connection config automatically - Updated all tests with new naming 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Summary
Adds PostgreSQL LISTEN/NOTIFY support for faster event processing as an alternative to polling. When a new event is inserted into the inbox/outbox table, a database trigger sends a notification that instantly triggers event processing, reducing latency from polling interval (default 30s) to near-instant.
Changes
Core package:
EventListenerinterface withconnect(),disconnect(), andevents$ObservableRetryableInboxOutboxEventPollerto optionally accept an event listener and merge its events with pollingMikroORM driver package:
PostgreSQLEventListenerimplementation that uses pg LISTEN/NOTIFYUsage
Run the migration to create the trigger:
Test plan
🤖 Generated with Claude Code