diff --git a/src/pulsarutil/start_pos.go b/src/pulsarutil/start_pos.go new file mode 100644 index 0000000..044a11a --- /dev/null +++ b/src/pulsarutil/start_pos.go @@ -0,0 +1,39 @@ +package pulsarutil + +import ( + "fmt" + "strconv" + "strings" + "time" + + "github.com/apache/pulsar-client-go/pulsar" +) + +func GetStartOption(startFrom string) (pulsar.SubscriptionInitialPosition, *pulsar.MessageID, *time.Time, error) { + switch { + case startFrom == "earliest": + return pulsar.SubscriptionPositionEarliest, nil, nil, nil + case strings.HasPrefix(startFrom, "messageId:"): + parts := strings.Split(strings.TrimPrefix(startFrom, "messageId:"), ":") + if len(parts) != 2 { + return 0, nil, nil, fmt.Errorf("invalid messageId format, expected ledgerId:entryId") + } + ledgerId, err1 := strconv.ParseInt(parts[0], 10, 64) + entryId, err2 := strconv.ParseInt(parts[1], 10, 64) + if err1 != nil || err2 != nil { + return 0, nil, nil, fmt.Errorf("invalid messageId numbers") + } + msgID := pulsar.NewMessageID(ledgerId, entryId, -1) + return 0, &msgID, nil, nil + case strings.HasPrefix(startFrom, "timestamp:"): + millisStr := strings.TrimPrefix(startFrom, "timestamp:") + millis, err := strconv.ParseInt(millisStr, 10, 64) + if err != nil { + return 0, nil, nil, fmt.Errorf("invalid timestamp") + } + t := time.UnixMilli(millis) + return 0, nil, &t, nil + default: + return 0, nil, nil, fmt.Errorf("unsupported startFrom value") + } +} diff --git a/src/route/handlers.go b/src/route/handlers.go index f624684..2f78d9e 100644 --- a/src/route/handlers.go +++ b/src/route/handlers.go @@ -18,6 +18,7 @@ import ( "github.com/kafkaesque-io/pulsar-beam/src/model" "github.com/kafkaesque-io/pulsar-beam/src/pulsardriver" "github.com/kafkaesque-io/pulsar-beam/src/util" + "github.com/kafkaesque-io/pulsar-beam/src/pulsarutil" log "github.com/sirupsen/logrus" ) @@ -196,6 +197,9 @@ func SSEHandler(w http.ResponseWriter, r *http.Request) { return } + // Parse the startFrom query parameter + startFrom := r.URL.Query().Get("startFrom") + // Make sure that the writer supports flushing. flusher, ok := w.(http.Flusher) if !ok { @@ -219,6 +223,22 @@ func SSEHandler(w http.ResponseWriter, r *http.Request) { defer consumer.Unsubscribe() } + // Apply seek after subscribing based on the startFrom query parameter + if startFrom != "" { + pos, msgID, ts, err := pulsarutil.GetStartOption(startFrom) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + if msgID != nil { + consumer.Seek(*msgID) + } else if ts != nil { + consumer.SeekByTime(*ts) + } else { + consumer.SeekByTime(time.Now()) + } + } + consumChan := consumer.Chan() for { select {