-
-
Notifications
You must be signed in to change notification settings - Fork 40
Added MongoDB Consumer based on MongoDB ChangeStream subscription #258
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added MongoDB Consumer based on MongoDB ChangeStream subscription #258
Conversation
src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts
Outdated
Show resolved
Hide resolved
src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts
Outdated
Show resolved
Hide resolved
src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts
Outdated
Show resolved
Hide resolved
src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts
Outdated
Show resolved
Hide resolved
src/packages/emmett-mongodb/src/eventStore/consumers/mongoDBEventsConsumer.ts
Outdated
Show resolved
Hide resolved
src/packages/emmett-mongodb/src/eventStore/consumers/readProcessorCheckpoint.ts
Outdated
Show resolved
Hide resolved
src/packages/emmett-mongodb/src/eventStore/consumers/subscriptions/index.ts
Outdated
Show resolved
Hide resolved
src/packages/emmett-mongodb/src/eventStore/mongoDBEventStore.subscription.e2e.spec.ts
Outdated
Show resolved
Hide resolved
src/packages/emmett-mongodb/src/eventStore/consumers/storeProcessorCheckpoint.ts
Outdated
Show resolved
Hide resolved
src/packages/emmett-mongodb/src/eventStore/consumers/storeProcessorCheckpoint.ts
Outdated
Show resolved
Hide resolved
src/packages/emmett-mongodb/src/eventStore/consumers/storeProcessorCheckpoint.ts
Outdated
Show resolved
Hide resolved
oskardudycz
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@arturwojnar thanks for doing it!
I did a first round of review, I added some questions/comments/suggestions but overall, it looks like a great first step, after clarifications, adding more tests I'll be happy to merge it!
It'd be great also if you could update descriptions with the highlights of this PR.
|
@arturwojnar could you ensure that linter, build and tests are passing? See: https://github.com/event-driven-io/emmett/actions/runs/16644021116/job/47194905100?pr=258. Thanks in advance! |
alex-laycalvert
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is a good start based off of the other consumers. Nothing that @oskardudycz hasn't already mentioned. I think some of the boillerplate can be removed since the addition of centralized consumer types and stuff.
Probably want to add some documentation that this consumer implementation requires change streams be enabled and can't be used in single-instance environments
57fcd84 to
f81bf91
Compare
85c7d37 to
278a7bf
Compare
Passed explicit checkpoint and used string instead of the nested structure
53e7b17 to
507a345
Compare
340f744 to
47d8944
Compare
e34c0fc to
6d3d461
Compare
That fixes issue when someone appended more than one event and second being skipped as both were using the same checkpoint. This also aligns with more future checkpoint handling. Made also other improvements around resiliency as getting properly batch result to stop consumers upon condition.
6d3d461 to
1a0a14c
Compare
oskardudycz
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@arturwojnar, I added some additional fixes around starting/stopping/restarting, in general resilience alignment with other consumers.
I'll also need to check why some compatibility tests are not passing (currently commented out).
I also changed nested Mongo Resume Token to URN-based string formatted text, and added message position to it (as there can be more than one event appended in one changeset). I will generalise that to assume that checkpoint can be number, bigint or string, but in a separate PR.
Nevertheless, great work, thank you for doing it, and I appreciate your patience waiting for my final review. 👏 🥳
This PR intends to provide a consumer relying on the MongoDB Change Streams.
Example of what caused me thinking about this feature 👇
☝️ This is clipping of a projection's evolve function. As you see, to achieve the reactiveness I did something very ugly 🤮 - I converted the evolve into an asynchronous function and on processing the
B2BInstitutionUsedLicencesIncreasedevent, I append to a stream another event -B2BLincenceObtained.The latter event is a consequence of the first, but it should not be implicitly and directly coupled to the first one. Especially, if that happens in the projection's internals.
So, the reactiveness is the clue of this PR.
MongoDB brings the Change Stream functionality. This is a simple pull mechanism on MongoDB's oplog.
By subscribing to the Change Stream and storing last processed message's position (here's called a token) we implement, in fact, a simple version of the outbox pattern.
Here's the fixed version: