Skip to content

Commit c444c47

Browse files
committed
qeueue decode + test
1 parent b7a69fd commit c444c47

File tree

3 files changed

+58
-19
lines changed

3 files changed

+58
-19
lines changed

dbos/queue.go

Lines changed: 5 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
11
package dbos
22

33
import (
4-
"bytes"
54
"context"
6-
"encoding/base64"
7-
"encoding/gob"
85
"log/slog"
96
"math"
107
"math/rand"
@@ -227,23 +224,13 @@ func (qr *queueRunner) run(ctx *dbosContext) {
227224
continue
228225
}
229226

230-
// Deserialize input
231-
var input any
232-
if len(workflow.input) > 0 {
233-
inputBytes, err := base64.StdEncoding.DecodeString(workflow.input)
234-
if err != nil {
235-
qr.logger.Error("failed to decode input for workflow", "workflow_id", workflow.id, "error", err)
236-
continue
237-
}
238-
buf := bytes.NewBuffer(inputBytes)
239-
dec := gob.NewDecoder(buf)
240-
if err := dec.Decode(&input); err != nil {
241-
qr.logger.Error("failed to decode input for workflow", "workflow_id", workflow.id, "error", err)
242-
continue
243-
}
227+
input, err := ctx.serializer.Decode(&workflow.input)
228+
if err != nil {
229+
qr.logger.Error("Failed to decode workflow input", "workflow_id", workflow.id, "error", err)
230+
continue
244231
}
245232

246-
_, err := registeredWorkflow.wrappedFunction(ctx, input, WithWorkflowID(workflow.id))
233+
_, err = registeredWorkflow.wrappedFunction(ctx, input, WithWorkflowID(workflow.id))
247234
if err != nil {
248235
qr.logger.Error("Error running queued workflow", "error", err)
249236
}

dbos/serialization_test.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -534,6 +534,9 @@ func TestSerializer(t *testing.T) {
534534
t.Run(serializerName, func(t *testing.T) {
535535
executor := setupDBOS(t, true, true, serializerFactory())
536536

537+
// Create a test queue for queued workflow tests
538+
testQueue := NewWorkflowQueue(executor, "serializer-test-queue")
539+
537540
// Register workflows
538541
RegisterWorkflow(executor, serializerWorkflow)
539542
RegisterWorkflow(executor, serializerNilValueWorkflow)
@@ -747,6 +750,56 @@ func TestSerializer(t *testing.T) {
747750
testWorkflowRecovery(t, executor, serializerAnyRecoveryWorkflow, serializerAnyRecoveryStartEvent, serializerAnyRecoveryEvent, any(input), "serializer-any-recovery-wf")
748751
})
749752

753+
// Test queued workflow with TestWorkflowData type
754+
t.Run("QueuedWorkflow", func(t *testing.T) {
755+
input := TestWorkflowData{
756+
ID: "queued-test-id",
757+
Message: "queued test message",
758+
Value: 456,
759+
Active: false,
760+
Data: TestData{Message: "queued nested", Value: 789, Active: true},
761+
Metadata: map[string]string{"type": "queued"},
762+
}
763+
764+
// Start workflow with queue option
765+
handle, err := RunWorkflow(executor, serializerWorkflow, input, WithWorkflowID("serializer-queued-wf"), WithQueue(testQueue.Name))
766+
require.NoError(t, err, "failed to start queued workflow")
767+
768+
// Get result from the handle
769+
result, err := handle.GetResult()
770+
require.NoError(t, err, "queued workflow should complete successfully")
771+
assert.Equal(t, input, result, "queued workflow result should match input")
772+
})
773+
774+
// Test queued workflow with any type
775+
t.Run("QueuedWorkflowAny", func(t *testing.T) {
776+
if serializerName == "Gob" {
777+
t.Skip("Skipping test for Gob serializer due to Gob limitations with interface types")
778+
}
779+
780+
input := TestWorkflowData{
781+
ID: "queued-any-test-id",
782+
Message: "queued any test message",
783+
Value: 321,
784+
Active: true,
785+
Data: TestData{Message: "queued any nested", Value: 654, Active: false},
786+
Metadata: map[string]string{"type": "queued-any"},
787+
}
788+
789+
// Start workflow with queue option
790+
handle, err := RunWorkflow(executor, serializerAnyValueWorkflow, any(input), WithWorkflowID("serializer-queued-any-wf"), WithQueue(testQueue.Name))
791+
require.NoError(t, err, "failed to start queued workflow")
792+
793+
// Get result from the handle
794+
result, err := handle.GetResult()
795+
require.NoError(t, err, "queued workflow should complete successfully")
796+
797+
// Convert the result from any type
798+
typedResult, err := convertJSONToType[TestWorkflowData](result)
799+
require.NoError(t, err, "Failed to convert result")
800+
assert.Equal(t, input, typedResult, "queued workflow result should match input")
801+
})
802+
750803
})
751804
}
752805
}

dbos/workflow.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -990,7 +990,6 @@ func (c *dbosContext) RunWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opt
990990

991991
// Handle DBOS ID conflict errors by waiting workflow result
992992
needsConversion := false
993-
fmt.Println(err)
994993
if errors.Is(err, &DBOSError{Code: ConflictingIDError}) {
995994
fmt.Println("Detected workflow ID conflict error, awaiting existing workflow result")
996995
c.logger.Warn("Workflow ID conflict detected. Waiting for existing workflow to complete", "workflow_id", workflowID)

0 commit comments

Comments
 (0)