Skip to content

Commit d53546f

Browse files
af-mdmaxdml
authored andcommitted
refactor: reorganize test code for step execution and enhance determinism checks in Go workflows
1 parent 573a17b commit d53546f

File tree

2 files changed

+105
-18
lines changed

2 files changed

+105
-18
lines changed

dbos/workflow.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1285,7 +1285,6 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption)
12851285
// result := resultChan.result
12861286
func Go[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (chan StepOutcome[R], error) {
12871287
if ctx == nil {
1288-
// is this the correct return here?
12891288
return *new(chan StepOutcome[R]), newStepExecutionError("", "", "ctx cannot be nil")
12901289
}
12911290

dbos/workflows_test.go

Lines changed: 105 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -48,26 +48,11 @@ func simpleStepError(_ context.Context) (string, error) {
4848
return "", fmt.Errorf("step failure")
4949
}
5050

51-
type stepWithSleepOutput struct {
52-
StepID int
53-
Result string
54-
Error error
55-
}
56-
5751
func stepWithSleep(_ context.Context, duration time.Duration) (string, error) {
5852
time.Sleep(duration)
5953
return fmt.Sprintf("from step that slept for %s", duration), nil
6054
}
6155

62-
func stepWithSleepCustomOutput(_ context.Context, duration time.Duration, stepID int) (stepWithSleepOutput, error) {
63-
time.Sleep(duration)
64-
return stepWithSleepOutput{
65-
StepID: stepID,
66-
Result: fmt.Sprintf("from step that slept for %s", duration),
67-
Error: nil,
68-
}, nil
69-
}
70-
7156
func simpleWorkflowWithStepError(dbosCtx DBOSContext, input string) (string, error) {
7257
return RunAsStep(dbosCtx, func(ctx context.Context) (string, error) {
7358
return simpleStepError(ctx)
@@ -881,7 +866,37 @@ func TestSteps(t *testing.T) {
881866
})
882867
}
883868

869+
type stepWithSleepOutput struct {
870+
StepID int
871+
Result string
872+
Error error
873+
}
874+
875+
var (
876+
stepDeterminismStartEvent *Event
877+
stepDeterminismEvent *Event
878+
)
879+
880+
func stepWithSleepCustomOutput(_ context.Context, duration time.Duration, stepID int) (stepWithSleepOutput, error) {
881+
time.Sleep(duration)
882+
return stepWithSleepOutput{
883+
StepID: stepID,
884+
Result: fmt.Sprintf("from step that slept for %s", duration),
885+
Error: nil,
886+
}, nil
887+
}
888+
889+
// blocks indefinitely
890+
func stepThatBlocks(_ context.Context) (string, error) {
891+
stepDeterminismStartEvent.Set()
892+
fmt.Println("stepThatBlocks: started to block")
893+
stepDeterminismEvent.Wait()
894+
fmt.Println("stepThatBlocks: unblocked")
895+
return "from step that blocked", nil
896+
}
897+
884898
func TestGoRunningStepsInsideGoRoutines(t *testing.T) {
899+
885900
dbosCtx := setupDBOS(t, true, true)
886901

887902
// Register custom types for Gob encoding
@@ -922,15 +937,15 @@ func TestGoRunningStepsInsideGoRoutines(t *testing.T) {
922937
require.Equal(t, "step error", err.Error())
923938
})
924939

925-
t.Run("Go must execute 100 steps simultaneously", func(t *testing.T) {
940+
t.Run("Go must execute 100 steps simultaneously then return the stepIDs in the correct sequence", func(t *testing.T) {
926941
// run 100 steps simultaneously
927942
const numSteps = 100
928943
results := make(chan string, numSteps)
929944
errors := make(chan error, numSteps)
930945
var resultChans []<-chan StepOutcome[stepWithSleepOutput]
931946

932947
goWorkflow := func(dbosCtx DBOSContext, input string) (string, error) {
933-
for i := 0; i < numSteps; i++ {
948+
for i := range numSteps {
934949
resultChan, err := Go(dbosCtx, func(ctx context.Context) (stepWithSleepOutput, error) {
935950
return stepWithSleepCustomOutput(ctx, 20*time.Millisecond, i)
936951
})
@@ -961,7 +976,80 @@ func TestGoRunningStepsInsideGoRoutines(t *testing.T) {
961976
require.NoError(t, err, "failed to get result from go workflow")
962977
assert.Equal(t, numSteps, len(results), "expected %d results, got %d", numSteps, len(results))
963978
assert.Equal(t, 0, len(errors), "expected no errors, got %d", len(errors))
979+
})
980+
981+
t.Run("Go executes the same workflow twice, whilst blocking the first workflow, to test for deterministic execution when using Go routines", func(t *testing.T) {
982+
983+
stepDeterminismStartEvent = NewEvent()
984+
stepDeterminismEvent = NewEvent()
964985

986+
goWorkflow := func(dbosCtx DBOSContext, input string) (string, error) {
987+
_, err := Go(dbosCtx, func(ctx context.Context) (string, error) {
988+
return stepWithSleep(ctx, 1*time.Second)
989+
})
990+
991+
if err != nil {
992+
return "", err
993+
}
994+
995+
_, err = Go(dbosCtx, func(ctx context.Context) (string, error) {
996+
return stepWithSleep(ctx, 1*time.Second)
997+
})
998+
999+
if err != nil {
1000+
return "", err
1001+
}
1002+
1003+
_, err = Go(dbosCtx, func(ctx context.Context) (string, error) {
1004+
return stepWithSleep(ctx, 1*time.Second)
1005+
})
1006+
1007+
if err != nil {
1008+
return "", err
1009+
}
1010+
1011+
_, err = Go(dbosCtx, func(ctx context.Context) (string, error) {
1012+
return stepThatBlocks(ctx)
1013+
})
1014+
1015+
if err != nil {
1016+
return "", err
1017+
}
1018+
1019+
return "WORKFLOW EXECUTED DETERMINISTICALLY", nil
1020+
}
1021+
1022+
// Run the first workflow
1023+
RegisterWorkflow(dbosCtx, goWorkflow)
1024+
handle, err := RunWorkflow(dbosCtx, goWorkflow, "test-input")
1025+
require.NoError(t, err, "failed to run go workflow")
1026+
1027+
// Wait for the first workflow to reach the blocking step
1028+
stepDeterminismStartEvent.Wait()
1029+
stepDeterminismStartEvent.Clear()
1030+
1031+
// Run the second workflow
1032+
handle2, err := RunWorkflow(dbosCtx, goWorkflow, "test-input", WithWorkflowID(handle.GetWorkflowID()))
1033+
1034+
// If it throws an error, it's because of steps not being deterministically executed when using Go routines in the first workflow
1035+
require.NoError(t, err, "failed to run go workflow")
1036+
1037+
// Complete the blocked workflow
1038+
stepDeterminismEvent.Set()
1039+
1040+
_, err = handle2.GetResult()
1041+
require.NoError(t, err, "failed to get result from go workflow")
1042+
1043+
// Verify workflow status is SUCCESS
1044+
status, err := handle.GetStatus()
1045+
require.NoError(t, err, "failed to get workflow status")
1046+
require.Equal(t, WorkflowStatusSuccess, status.Status, "expected workflow status to be WorkflowStatusSuccess")
1047+
1048+
1049+
// Verify workflow result is "WORKFLOW EXECUTED DETERMINISTICALLY"
1050+
result, err := handle.GetResult()
1051+
require.NoError(t, err, "failed to get result from go workflow")
1052+
require.Equal(t, "WORKFLOW EXECUTED DETERMINISTICALLY", result, "expected result to be 'WORKFLOW EXECUTED DETERMINISTICALLY'")
9651053
})
9661054
}
9671055

0 commit comments

Comments
 (0)