Skip to content

Conversation

@alexluong
Copy link
Collaborator

@alexluong alexluong commented Nov 12, 2025

Summary

Returns detailed ingestion status showing what happened with each event, including idempotency detection.

API Response Changes

Before

{ "id": "evt_123" }

After

{
  "id": "evt_123",
  "duplicate": false
}

Response Fields

  • id: The event ID (provided in request or server-generated)
  • duplicate: Whether this event was already processed (idempotency hit)

Detecting Idempotency

Fresh request: duplicate == false
Duplicate: duplicate == true
Still processing: HTTP 409 Conflict

Examples

// Normal publish
{ "id": "evt_123", "duplicate": false }

// Duplicate (idempotency hit)
{ "id": "evt_123", "duplicate": true }

// No matches
{ "id": "evt_123", "duplicate": false }

// With destination_id - successfully queued
{ "id": "evt_123", "duplicate": false }

// With destination_id - disabled/not_found/topic_mismatch (not queued)
{ "id": "evt_123", "duplicate": false }

Idempotency Strategy

Matching OUTSIDE idempotency → fresh status every request
Publishing INSIDE idempotency → single key per event

// Step 1: Match destinations (OUTSIDE idempotency - always fresh)
if event.destination_id:
    destinations = matchSpecificDestination(event.destination_id)
else:
    destinations = matchEventByTopic(event.topic)

result = {
    id: event.id,
    duplicate: false
}

if len(destinations) == 0:
    return result  // Early exit

// Step 2: Publish (INSIDE idempotency - single execution per event)
executed = false
idempotence.Exec(key, func() {
    executed = true
    publishToDestinations(destinations)
})

// Step 3: Set duplicate flag based on execution
if !executed:
    result.duplicate = true  // Idempotency hit

return result

Original PR Description (for reference)

Summary

Returns detailed ingestion status showing what happened with each event, including idempotency detection and destination-specific feedback.

API Response Changes

Before

{ "id": "evt_123" }

After

{
  "id": "evt_123",
  "matched_count": 3,
  "queued_count": 3
}

Response Fields

  • matched_count: Destinations that matched the event
  • queued_count: Deliveries actually queued this request
  • destination_status: Status string (only when destination_id specified AND nothing queued)

Detecting Idempotency

Fresh request: matched_count == queued_count (both non-zero)
Duplicate: matched_count > 0 but queued_count == 0
Still processing: HTTP 409 Conflict

Examples

// Normal publish
{ "matched_count": 3, "queued_count": 3 }

// Duplicate (idempotency hit)
{ "matched_count": 3, "queued_count": 0 }

// No matches
{ "matched_count": 0, "queued_count": 0 }

// With destination_id - successfully queued
{ "matched_count": 1, "queued_count": 1 }

// With destination_id - disabled (not queued)
{ "matched_count": 0, "queued_count": 0, "destination_status": "disabled" }

// With destination_id - not found (not queued)
{ "matched_count": 0, "queued_count": 0, "destination_status": "not_found" }

// With destination_id - topic mismatch (not queued)
{ "matched_count": 0, "queued_count": 0, "destination_status": "topic_mismatch" }

Idempotency Strategy

Matching OUTSIDE idempotency → fresh status every request
Publishing INSIDE idempotency → single key per event

// Step 1: Match destinations (OUTSIDE idempotency - always fresh)
if event.destination_id:
    destinations, status = matchSpecificDestination(event.destination_id)
else:
    destinations = matchEventByTopic(event.topic)

result = {
    matched_count: len(destinations),
    queued_count: 0,
    destination_status: status  // only set if destination_id provided AND nothing matched
}

if len(destinations) == 0:
    return result  // Early exit

// Step 2: Publish (INSIDE idempotency - single execution per event)
executed = false
idempotence.Exec(key, func() {
    executed = true
    publishToDestinations(destinations)
})

// Step 3: Set queued count based on execution
if executed:
    result.queued_count = len(destinations)  // Fresh publish
else:
    result.queued_count = 0                  // Idempotency hit

return result

@vercel
Copy link

vercel bot commented Nov 12, 2025

The latest updates on your projects. Learn more about Vercel for GitHub.

Project Deployment Preview Comments Updated (UTC)
outpost-docs Ready Ready Preview Comment Nov 17, 2025 6:00am
outpost-website Ready Ready Preview Comment Nov 17, 2025 6:00am

@alexbouchardd
Copy link
Contributor

I'm not a fan of the semantics of those responses. First I think the key is not explicit/descriptive enough like match_count. Second there's no reason for the queued_count really it can either be the same as match_count or 0 which implies the actual value isn't the right way to represent an idempotency hit. Instead just be explict?

@alexluong
Copy link
Collaborator Author

hmm I see your point. How about queued_count and if the event is already processed then we can have "duplicate" or something like that?

@fmvilas
Copy link
Contributor

fmvilas commented Nov 12, 2025

Why not just something like this?

{
  "id": "evt_123",
  "duplicate": true // Or false if it's not duplicate.
}

@alexbouchardd
Copy link
Contributor

Why not just something like this?

{
  "id": "evt_123",
  "duplicate": true // Or false if it's not duplicate.
}

🤯

Base automatically changed from retry-monitor-loop to v0.9.0 November 17, 2025 04:38
@alexluong
Copy link
Collaborator Author

@claude please review

@claude
Copy link

claude bot commented Nov 17, 2025

Claude finished @alexluong's task —— View job


Code Review

Overall Assessment

This PR successfully implements detailed ingestion status feedback with a clean architecture. The implementation is well-tested and correctly handles the idempotency strategy described in the PR. However, I recommend considering the feedback from @fmvilas and @alexbouchardd about using a simpler duplicate: boolean field instead of the current matched_count/queued_count approach.

Key Strengths:

  • Comprehensive test coverage with clear scenarios
  • Clean separation of matching (outside idempotency) and publishing (inside idempotency)
  • Well-documented edge cases for destination_id scenarios

Areas for Improvement:

  • Response semantics could be simplified per team discussion
  • Minor code quality improvements needed

1. API Response Design - Consider Simplification

The current implementation uses matched_count and queued_count, but the team discussion suggests this may be overly complex. @fmvilas proposed:

{
  "id": "evt_123",
  "duplicate": true
}

Current implementation analysis:

  • matched_count and queued_count are always either equal or one is zero
  • The actual counts don't provide much value beyond detecting duplicates
  • Clients need to do math (matched_count > 0 && queued_count == 0) to detect duplicates

Recommendation:
Consider refactoring to use a simple duplicate: boolean field as suggested. This would:

  • Simplify client logic (no math required)
  • Make the API more intuitive
  • Reduce payload size
  • Still support all use cases described in the PR

If keeping the current approach, I'd recommend renaming matched_count to something more explicit like destinations_matched or match_count (as @alexbouchardd suggested).

Files affected:

  • internal/publishmq/eventhandler.go:27-31
  • internal/publishmq/eventhandler_test.go (all test assertions)
  • internal/apirouter/publish_handlers.go:65
2. Code Quality - matchSpecificDestination Function

Location: internal/publishmq/eventhandler.go:172-200

The matchSpecificDestination function has some redundancy:

destination, err := h.entityStore.RetrieveDestination(ctx, event.TenantID, event.DestinationID)
if err != nil {
    h.logger.Ctx(ctx).Warn("failed to retrieve destination",
        zap.Error(err),
        zap.String("destination_id", event.DestinationID))
    status := DestinationStatusNotFound
    return []models.DestinationSummary{}, &status, nil
}

if destination == nil {
    status := DestinationStatusNotFound
    return []models.DestinationSummary{}, &status, nil
}

Issue: Both error and nil cases return the same status. These could be combined:

destination, err := h.entityStore.RetrieveDestination(ctx, event.TenantID, event.DestinationID)
if err != nil || destination == nil {
    if err != nil {
        h.logger.Ctx(ctx).Warn("failed to retrieve destination",
            zap.Error(err),
            zap.String("destination_id", event.DestinationID))
    }
    status := DestinationStatusNotFound
    return []models.DestinationSummary{}, &status, nil
}
3. Topic Validation Logic - Potential Edge Case

Location: internal/publishmq/eventhandler.go:193

if event.Topic != "" && event.Topic != "*" && destination.Topics[0] != "*" && !slices.Contains(destination.Topics, event.Topic) {

Issue: This line accesses destination.Topics[0] without checking if the slice is empty.

Risk: If a destination somehow has an empty Topics slice, this will panic with an index out of bounds error.

Recommendation: Add a length check:

if event.Topic != "" && event.Topic != "*" && 
    len(destination.Topics) > 0 && destination.Topics[0] != "*" && 
    !slices.Contains(destination.Topics, event.Topic) {

Alternatively, since slices.Contains already handles empty slices gracefully, you could check for wildcard using the slice method:

if event.Topic != "" && event.Topic != "*" && 
    !slices.Contains(destination.Topics, "*") && 
    !slices.Contains(destination.Topics, event.Topic) {
4. Test Coverage - Good but Missing One Scenario

The test coverage in internal/publishmq/eventhandler_test.go is excellent. However, there's one edge case that could be tested:

Missing test: "duplicate event with destination_id specified"

This would verify that when:

  • A specific destination_id is provided
  • The event is published twice
  • The second request correctly shows duplicate: true (or queued_count: 0 in current implementation)

This is an important scenario since the idempotency logic is shared between topic-based and destination-specific paths.

5. Documentation - DestinationStatus Usage

Location: internal/publishmq/eventhandler.go:30

The DestinationStatus field in the response has complex conditional logic:

  • Only set when destination_id is specified AND nothing was queued
  • Nil when successfully queued or when using topic-based matching

Recommendation: Add a comment to the struct field explaining when it's populated:

type HandleResult struct {
    EventID           string                  `json:"id"`
    Duplicate         bool                    `json:"duplicate"`
    // DestinationStatus is only populated when destination_id is specified 
    // AND the delivery was not queued (e.g., disabled, not_found, topic_mismatch)
    DestinationStatus *DestinationMatchStatus `json:"destination_status,omitempty"`
}
6. Positive Observations

Excellent idempotency architecture:

  • Clean separation between matching (outside) and publishing (inside) idempotency - eventhandler.go:95-146
  • Single idempotency key per event prevents race conditions
  • The executed flag pattern is elegant and easy to understand

Comprehensive test scenarios:

  • Tests cover all happy paths and edge cases
  • Good use of descriptive test names
  • Test refactoring in retry_test.go improves readability and reduces duplication

Proper error handling:

  • Specific destination match statuses provide clear feedback
  • System errors are properly distinguished from publish errors
  • Logging is appropriate and informative

Summary

The implementation is solid and well-tested. My main recommendation is to consider the team's feedback about using a simpler duplicate: boolean response instead of the current counts-based approach. This would make the API more intuitive and easier to use.

If you decide to keep the current approach, please address:

  • The potential index out of bounds issue with destination.Topics[0] access
  • Consider the code quality improvements suggested above

@alexluong alexluong merged commit d002b8a into v0.9.0 Nov 17, 2025
1 of 4 checks passed
@alexluong alexluong deleted the ingestion branch November 17, 2025 05:59
alexluong added a commit that referenced this pull request Nov 17, 2025
* test: event with destination_id should not match disabled destination

* fix: check disabled

* feat: enhane publish handler with more result info

* refactor: match event logic with destination_id field

* test: remove parallel from flaky tests

* test: extend timeout to avoid flaky test

* refactor: api response

* test: use fast timeout to avoid flaky tests

* test: retry poll timeout

* refactor: simplify API response to use duplicate boolean

* refactor: topics.matchtopic util

* test: duplicate publish

* chore: simplify publish response

* docs: openapi.yaml & api.mdx
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants