Skip to content

Conversation

@arturwojnar
Copy link
Contributor

@arturwojnar arturwojnar commented Jul 31, 2025

This PR intends to provide a consumer relying on the MongoDB Change Streams.

Example of what caused me thinking about this feature 👇

switch (type) {
  case 'B2BInstitutionUsedLicencesIncreased':
    if (state._disabled) {
      throw new InstitutionInvalidState({
        details: { currentState: 'deleted' },
        message: `Cannot use a licence of a disabled institution.`,
      })
    }

    // Hack.
    const licenceId = generateLicenceId()
    await getEventStore().appendToStream<LicenceEvent>(
      toStreamName(LICENCES_STREAM_TYPE, event.patientId.toString()),
      [
        literalObject<B2BLincenceObtained>({
          type: 'B2BLincenceObtained',
          metadata: {
            patientId: event.patientId,
            institutionId: metadata.institutionId,
            licenceId,
          },
          data: { obtainedAt: event.timestamp, duration: state.licenceDuration as Duration },
        }),
      ],
    )

    return literalObject<B2BInstitution>({
      ...state,
      usedLicences: state.usedLicences + 1,
      availableLicences: state.availableLicences - 1,
    })
}

☝️ This is clipping of a projection's evolve function. As you see, to achieve the reactiveness I did something very ugly 🤮 - I converted the evolve into an asynchronous function and on processing the B2BInstitutionUsedLicencesIncreased event, I append to a stream another event - B2BLincenceObtained.

The latter event is a consequence of the first, but it should not be implicitly and directly coupled to the first one. Especially, if that happens in the projection's internals.

So, the reactiveness is the clue of this PR.

MongoDB brings the Change Stream functionality. This is a simple pull mechanism on MongoDB's oplog.

By subscribing to the Change Stream and storing last processed message's position (here's called a token) we implement, in fact, a simple version of the outbox pattern.

Here's the fixed version:

consumer.reactor<B2BInstitutionUsedLicencesIncreased>({
  processorId: v4(),
  eachMessage: async (event) => {
    await getEventStore().appendToStream<LicenceEvent>(
      toStreamName(LICENCES_STREAM_TYPE, event.patientId.toString()),
      [
        literalObject<B2BLincenceObtained>({
          type: 'B2BLincenceObtained',
          metadata: {
            patientId: event.patientId,
            institutionId: metadata.institutionId,
            licenceId,
          },
          data: { obtainedAt: event.timestamp, duration: state.licenceDuration },
        }),
      ],
    )
  },
  connectionOptions: {
    client,
  },
});

@arturwojnar arturwojnar marked this pull request as draft July 31, 2025 08:31
Copy link
Collaborator

@oskardudycz oskardudycz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@arturwojnar thanks for doing it!

I did a first round of review, I added some questions/comments/suggestions but overall, it looks like a great first step, after clarifications, adding more tests I'll be happy to merge it!

It'd be great also if you could update descriptions with the highlights of this PR.

@oskardudycz
Copy link
Collaborator

@arturwojnar could you ensure that linter, build and tests are passing? See: https://github.com/event-driven-io/emmett/actions/runs/16644021116/job/47194905100?pr=258. Thanks in advance!

Copy link
Collaborator

@alex-laycalvert alex-laycalvert left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a good start based off of the other consumers. Nothing that @oskardudycz hasn't already mentioned. I think some of the boillerplate can be removed since the addition of centralized consumer types and stuff.

Probably want to add some documentation that this consumer implementation requires change streams be enabled and can't be used in single-instance environments

@oskardudycz oskardudycz marked this pull request as ready for review November 8, 2025 11:31
Passed explicit checkpoint and used string instead of the nested
structure
@oskardudycz oskardudycz force-pushed the feat/subscriptions branch 2 times, most recently from 340f744 to 47d8944 Compare November 9, 2025 07:32
@oskardudycz oskardudycz force-pushed the feat/subscriptions branch 6 times, most recently from e34c0fc to 6d3d461 Compare November 9, 2025 20:18
That fixes issue when someone appended more than one event and second
being skipped as both were using the same checkpoint. This also aligns
with more future checkpoint handling.

Made also other improvements around resiliency as getting properly batch
result to stop consumers upon condition.
@oskardudycz oskardudycz added this to the 0.40.0 milestone Nov 9, 2025
Copy link
Collaborator

@oskardudycz oskardudycz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@arturwojnar, I added some additional fixes around starting/stopping/restarting, in general resilience alignment with other consumers.

I'll also need to check why some compatibility tests are not passing (currently commented out).

I also changed nested Mongo Resume Token to URN-based string formatted text, and added message position to it (as there can be more than one event appended in one changeset). I will generalise that to assume that checkpoint can be number, bigint or string, but in a separate PR.

Nevertheless, great work, thank you for doing it, and I appreciate your patience waiting for my final review. 👏 🥳

@oskardudycz oskardudycz merged commit 29c483d into event-driven-io:main Nov 9, 2025
1 check passed
@oskardudycz oskardudycz changed the title feat: MongoDB Consumer based on MongoDB ChangeStream subscription Added MongoDB Consumer based on MongoDB ChangeStream subscription Nov 10, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants