From f716ae9e6fdeb6b1ac73da5ca43bdc144b97d363 Mon Sep 17 00:00:00 2001 From: af-md Date: Wed, 3 Sep 2025 18:42:57 +0100 Subject: [PATCH 01/18] feat: add dbos.Go to run steps inside Go routine --- dbos/dbos.go | 1 + dbos/workflow.go | 96 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 97 insertions(+) diff --git a/dbos/dbos.go b/dbos/dbos.go index f9aa829..f226f06 100644 --- a/dbos/dbos.go +++ b/dbos/dbos.go @@ -110,6 +110,7 @@ type DBOSContext interface { // Workflow operations RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) (any, error) // Execute a function as a durable step within a workflow + Go(_ DBOSContext, fn StepFunc, StepID int, opts ...StepOption) (any, error) // Execute a function as a durable step within a Go routine RunWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) // Start a new workflow execution Send(_ DBOSContext, destinationID string, message any, topic string) error // Send a message to another workflow Recv(_ DBOSContext, topic string, timeout time.Duration) (any, error) // Receive a message sent to this workflow diff --git a/dbos/workflow.go b/dbos/workflow.go index 5117933..7be1042 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -8,6 +8,7 @@ import ( "math" "reflect" "runtime" + "sync" "time" "github.com/google/uuid" @@ -994,6 +995,7 @@ type stepOptions struct { baseInterval time.Duration // Initial delay between retries (default: 100ms) maxInterval time.Duration // Maximum delay between retries (default: 5s) stepName string // Custom name for the step (defaults to function name) + stepID int } // setDefaults applies default values to stepOptions @@ -1055,6 +1057,12 @@ func WithMaxInterval(interval time.Duration) StepOption { } } +func (c *dbosContext) WithNextStepID(stepID int) StepOption { + return func(opts *stepOptions) { + opts.stepID = stepID + } +} + // RunAsStep executes a function as a durable step within a workflow. // Steps provide at-least-once execution guarantees and automatic retry capabilities. // If a step has already been executed (e.g., during workflow recovery), its recorded @@ -1163,6 +1171,11 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) isWithinStep: true, } + // this logic needs to be looked at + if stepOpts.stepID >= 0 { + stepState.stepID = stepOpts.stepID + } + // Uncancellable context for DBOS operations uncancellableCtx := WithoutCancel(c) @@ -1246,6 +1259,89 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) return stepOutput, stepError } +// how can I setup the sdk in a way to test it locally with other files? + +// Package level +// Run step function using Go routines +// Add odocs +func Go[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (R, error) { + if ctx == nil { + return *new(R), newStepExecutionError("", "", "ctx cannot be nil") + } + + if fn == nil { + return *new(R), newStepExecutionError("", "", "step function cannot be nil") + } + + // Append WithStepName option to ensure the step name is set. This will not erase a user-provided step name + stepName := runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name() + opts = append(opts, WithStepName(stepName)) + + // create a determistic step ID + wfState, ok := ctx.Value(workflowStateKey).(*workflowState) + if !ok || wfState == nil { + return *new(R), newStepExecutionError("", stepName, "workflow state not found in context: are you running this step within a workflow?") + } + + // Get stepID if it has been pre generated + + typeErasedFn := StepFunc(func(ctx context.Context) (any, error) { return fn(ctx) }) + + // run step inside a Go routine by passing stepID + result, err := ctx.Go(ctx, typeErasedFn, 1, opts...) + + // Step function could return a nil result + if result == nil { + return *new(R), err + } + // Otherwise type-check and cast the result + typedResult, ok := result.(R) + if !ok { + return *new(R), fmt.Errorf("unexpected result type: expected %T, got %T", *new(R), result) + } + return typedResult, err +} + +type stepResultChan struct { + result any + err error +} + +// Private interface +// Add docs +func (c *dbosContext) Go(ctx DBOSContext, fn StepFunc, stepID int, opts ...StepOption) (any, error) { + var wg sync.WaitGroup + stepResult := make(chan stepResultChan, 1) + + wg.Add(1) + + go func() { + defer wg.Done() + res, err := c.RunAsStep(ctx, fn, c.WithNextStepID(stepID)) + if err != nil { + stepRes := stepResultChan{ + result: nil, + err: err, + } + stepResult <- stepRes + return + } + + stepRes := stepResultChan{ + result: res, + err: nil, + } + stepResult <- stepRes + }() + + wg.Wait() + close(stepResult) + + res := <-stepResult + + return res.result, res.err +} + /****************************************/ /******* WORKFLOW COMMUNICATIONS ********/ /****************************************/ From a89b214cafb78595e0ff6a55f2b74266c1dd5b76 Mon Sep 17 00:00:00 2001 From: af-md Date: Thu, 4 Sep 2025 16:29:01 +0100 Subject: [PATCH 02/18] run step and get result using channel --- dbos/workflow.go | 31 ++++++------------------------- 1 file changed, 6 insertions(+), 25 deletions(-) diff --git a/dbos/workflow.go b/dbos/workflow.go index 7be1042..414e612 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -8,7 +8,6 @@ import ( "math" "reflect" "runtime" - "sync" "time" "github.com/google/uuid" @@ -1310,36 +1309,18 @@ type stepResultChan struct { // Private interface // Add docs func (c *dbosContext) Go(ctx DBOSContext, fn StepFunc, stepID int, opts ...StepOption) (any, error) { - var wg sync.WaitGroup - stepResult := make(chan stepResultChan, 1) - - wg.Add(1) - + result := make(chan stepResultChan, 1) go func() { - defer wg.Done() res, err := c.RunAsStep(ctx, fn, c.WithNextStepID(stepID)) - if err != nil { - stepRes := stepResultChan{ - result: nil, - err: err, - } - stepResult <- stepRes - return - } - - stepRes := stepResultChan{ + result <- stepResultChan{ result: res, - err: nil, + err: err, } - stepResult <- stepRes }() - wg.Wait() - close(stepResult) - - res := <-stepResult - - return res.result, res.err + resultChan := <-result + close(result) + return resultChan.result, resultChan.err } /****************************************/ From 20a2beb37e11fed3881e1014538620ca438cab53 Mon Sep 17 00:00:00 2001 From: af-md Date: Thu, 4 Sep 2025 16:32:00 +0100 Subject: [PATCH 03/18] add comments --- dbos/workflow.go | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/dbos/workflow.go b/dbos/workflow.go index 414e612..c56d6fa 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -1258,11 +1258,7 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) return stepOutput, stepError } -// how can I setup the sdk in a way to test it locally with other files? - -// Package level -// Run step function using Go routines -// Add odocs +// TODO: Add docs --- will add once I get the implementation right func Go[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (R, error) { if ctx == nil { return *new(R), newStepExecutionError("", "", "ctx cannot be nil") @@ -1301,13 +1297,13 @@ func Go[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (R, error) { return typedResult, err } +// TODO: move type above -- keeping it here for in case I need to modify quickly type stepResultChan struct { result any err error } -// Private interface -// Add docs +// TODO: Add docs --- will add once I get the implementation right func (c *dbosContext) Go(ctx DBOSContext, fn StepFunc, stepID int, opts ...StepOption) (any, error) { result := make(chan stepResultChan, 1) go func() { From ba8d2531b0e2ce1381ea410cc66c89c11bca76b4 Mon Sep 17 00:00:00 2001 From: af-md Date: Thu, 4 Sep 2025 16:38:05 +0100 Subject: [PATCH 04/18] append stepID to options --- dbos/workflow.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/dbos/workflow.go b/dbos/workflow.go index c56d6fa..b5d18bb 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -1056,7 +1056,8 @@ func WithMaxInterval(interval time.Duration) StepOption { } } -func (c *dbosContext) WithNextStepID(stepID int) StepOption { + +func WithNextStepID(stepID int) StepOption { return func(opts *stepOptions) { opts.stepID = stepID } @@ -1277,8 +1278,10 @@ func Go[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (R, error) { if !ok || wfState == nil { return *new(R), newStepExecutionError("", stepName, "workflow state not found in context: are you running this step within a workflow?") } + stepID := wfState.NextStepID() + opts = append(opts, WithNextStepID(stepID)) - // Get stepID if it has been pre generated + // Type-erase the function typeErasedFn := StepFunc(func(ctx context.Context) (any, error) { return fn(ctx) }) @@ -1297,7 +1300,7 @@ func Go[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (R, error) { return typedResult, err } -// TODO: move type above -- keeping it here for in case I need to modify quickly +// TODO: move type above -- keeping it here for in case I need to modify it quickly type stepResultChan struct { result any err error @@ -1307,7 +1310,7 @@ type stepResultChan struct { func (c *dbosContext) Go(ctx DBOSContext, fn StepFunc, stepID int, opts ...StepOption) (any, error) { result := make(chan stepResultChan, 1) go func() { - res, err := c.RunAsStep(ctx, fn, c.WithNextStepID(stepID)) + res, err := c.RunAsStep(ctx, fn, opts...) result <- stepResultChan{ result: res, err: err, From 6d3a84ad02f1cc59a7e6055286f4f45eb830a23e Mon Sep 17 00:00:00 2001 From: af-md Date: Thu, 4 Sep 2025 20:30:39 +0100 Subject: [PATCH 05/18] assign stepID --- dbos/workflow.go | 53 +++++++++++++++++++++++++----------------------- 1 file changed, 28 insertions(+), 25 deletions(-) diff --git a/dbos/workflow.go b/dbos/workflow.go index b5d18bb..cd33ba2 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -987,18 +987,18 @@ type StepFunc func(ctx context.Context) (any, error) // Step represents a type-safe step function with a specific output type R. type Step[R any] func(ctx context.Context) (R, error) -// stepOptions holds the configuration for step execution using functional options pattern. -type stepOptions struct { - maxRetries int // Maximum number of retry attempts (0 = no retries) - backoffFactor float64 // Exponential backoff multiplier between retries (default: 2.0) - baseInterval time.Duration // Initial delay between retries (default: 100ms) - maxInterval time.Duration // Maximum delay between retries (default: 5s) - stepName string // Custom name for the step (defaults to function name) - stepID int +// StepOptions holds the configuration for step execution using functional options pattern. +type StepOptions struct { + maxRetries int // Maximum number of retry attempts (0 = no retries) + backoffFactor float64 // Exponential backoff multiplier between retries (default: 2.0) + baseInterval time.Duration // Initial delay between retries (default: 100ms) + maxInterval time.Duration // Maximum delay between retries (default: 5s) + stepName string // Custom name for the step (defaults to function name) + preGeneratedStepID *int // Pre generated stepID in case we want to run the function in a Go routine } // setDefaults applies default values to stepOptions -func (opts *stepOptions) setDefaults() { +func (opts *StepOptions) setDefaults() { if opts.backoffFactor == 0 { opts.backoffFactor = _DEFAULT_STEP_BACKOFF_FACTOR } @@ -1011,12 +1011,12 @@ func (opts *stepOptions) setDefaults() { } // StepOption is a functional option for configuring step execution parameters. -type StepOption func(*stepOptions) +type StepOption func(*StepOptions) // WithStepName sets a custom name for the step. If the step name has already been set // by a previous call to WithStepName, this option will be ignored func WithStepName(name string) StepOption { - return func(opts *stepOptions) { + return func(opts *StepOptions) { if opts.stepName == "" { opts.stepName = name } @@ -1026,7 +1026,7 @@ func WithStepName(name string) StepOption { // WithStepMaxRetries sets the maximum number of retry attempts for the step. // A value of 0 means no retries (default behavior). func WithStepMaxRetries(maxRetries int) StepOption { - return func(opts *stepOptions) { + return func(opts *StepOptions) { opts.maxRetries = maxRetries } } @@ -1035,7 +1035,7 @@ func WithStepMaxRetries(maxRetries int) StepOption { // The delay between retries is calculated as: BaseInterval * (BackoffFactor^(retry-1)) // Default value is 2.0. func WithBackoffFactor(factor float64) StepOption { - return func(opts *stepOptions) { + return func(opts *StepOptions) { opts.backoffFactor = factor } } @@ -1043,7 +1043,7 @@ func WithBackoffFactor(factor float64) StepOption { // WithBaseInterval sets the initial delay between retries. // Default value is 100ms. func WithBaseInterval(interval time.Duration) StepOption { - return func(opts *stepOptions) { + return func(opts *StepOptions) { opts.baseInterval = interval } } @@ -1051,15 +1051,15 @@ func WithBaseInterval(interval time.Duration) StepOption { // WithMaxInterval sets the maximum delay between retries. // Default value is 5s. func WithMaxInterval(interval time.Duration) StepOption { - return func(opts *stepOptions) { + return func(opts *StepOptions) { opts.maxInterval = interval } } func WithNextStepID(stepID int) StepOption { - return func(opts *stepOptions) { - opts.stepID = stepID + return func(opts *StepOptions) { + opts.preGeneratedStepID = &stepID } } @@ -1142,7 +1142,7 @@ func RunAsStep[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (R, error func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) (any, error) { // Process functional options - stepOpts := &stepOptions{} + stepOpts := &StepOptions{} for _, opt := range opts { opt(stepOpts) } @@ -1164,18 +1164,21 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) return fn(c) } + // Get stepID if it has been pre generated + var stepID int + if stepOpts.preGeneratedStepID != nil { + stepID = *stepOpts.preGeneratedStepID + } else { + stepID = wfState.nextStepID() // crucially, this increments the step ID on the *workflow* state + } + // Setup step state stepState := workflowState{ workflowID: wfState.workflowID, - stepID: wfState.nextStepID(), // crucially, this increments the step ID on the *workflow* state + stepID: stepID, isWithinStep: true, } - // this logic needs to be looked at - if stepOpts.stepID >= 0 { - stepState.stepID = stepOpts.stepID - } - // Uncancellable context for DBOS operations uncancellableCtx := WithoutCancel(c) @@ -1278,7 +1281,7 @@ func Go[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (R, error) { if !ok || wfState == nil { return *new(R), newStepExecutionError("", stepName, "workflow state not found in context: are you running this step within a workflow?") } - stepID := wfState.NextStepID() + stepID := wfState.nextStepID() opts = append(opts, WithNextStepID(stepID)) // Type-erase the function From 723a6db5d02686e2839f94c2befd5227c4ea6261 Mon Sep 17 00:00:00 2001 From: af-md Date: Thu, 4 Sep 2025 20:37:37 +0100 Subject: [PATCH 06/18] remove extra stepID argument --- dbos/dbos.go | 2 +- dbos/workflow.go | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/dbos/dbos.go b/dbos/dbos.go index f226f06..4706320 100644 --- a/dbos/dbos.go +++ b/dbos/dbos.go @@ -110,7 +110,7 @@ type DBOSContext interface { // Workflow operations RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) (any, error) // Execute a function as a durable step within a workflow - Go(_ DBOSContext, fn StepFunc, StepID int, opts ...StepOption) (any, error) // Execute a function as a durable step within a Go routine + Go(_ DBOSContext, fn StepFunc, opts ...StepOption) (any, error) // Execute a function as a durable step within a Go routine RunWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) // Start a new workflow execution Send(_ DBOSContext, destinationID string, message any, topic string) error // Send a message to another workflow Recv(_ DBOSContext, topic string, timeout time.Duration) (any, error) // Receive a message sent to this workflow diff --git a/dbos/workflow.go b/dbos/workflow.go index cd33ba2..168c9d1 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -1285,11 +1285,10 @@ func Go[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (R, error) { opts = append(opts, WithNextStepID(stepID)) // Type-erase the function - typeErasedFn := StepFunc(func(ctx context.Context) (any, error) { return fn(ctx) }) // run step inside a Go routine by passing stepID - result, err := ctx.Go(ctx, typeErasedFn, 1, opts...) + result, err := ctx.Go(ctx, typeErasedFn, opts...) // Step function could return a nil result if result == nil { @@ -1310,7 +1309,7 @@ type stepResultChan struct { } // TODO: Add docs --- will add once I get the implementation right -func (c *dbosContext) Go(ctx DBOSContext, fn StepFunc, stepID int, opts ...StepOption) (any, error) { +func (c *dbosContext) Go(ctx DBOSContext, fn StepFunc, opts ...StepOption) (any, error) { result := make(chan stepResultChan, 1) go func() { res, err := c.RunAsStep(ctx, fn, opts...) From ac7a1b146fe9c82d31890452b1ac79d09011e912 Mon Sep 17 00:00:00 2001 From: af-md Date: Sat, 6 Sep 2025 12:35:19 +0100 Subject: [PATCH 07/18] refactor: change Go function to return a channel of stepOutcome and simplify result handling --- dbos/workflow.go | 58 +++++++++++------------------------------------- 1 file changed, 13 insertions(+), 45 deletions(-) diff --git a/dbos/workflow.go b/dbos/workflow.go index 168c9d1..cab0401 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -1063,6 +1063,12 @@ func WithNextStepID(stepID int) StepOption { } } +// StepOutcome holds the result and error from a step execution +type stepOutcome[R any] struct { + result R + err error +} + // RunAsStep executes a function as a durable step within a workflow. // Steps provide at-least-once execution guarantees and automatic retry capabilities. // If a step has already been executed (e.g., during workflow recovery), its recorded @@ -1263,65 +1269,27 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) } // TODO: Add docs --- will add once I get the implementation right -func Go[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (R, error) { - if ctx == nil { - return *new(R), newStepExecutionError("", "", "ctx cannot be nil") - } - - if fn == nil { - return *new(R), newStepExecutionError("", "", "step function cannot be nil") - } - - // Append WithStepName option to ensure the step name is set. This will not erase a user-provided step name - stepName := runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name() - opts = append(opts, WithStepName(stepName)) - +func Go[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (chan stepOutcome[R], error) { // create a determistic step ID + // can we refactor this too? wfState, ok := ctx.Value(workflowStateKey).(*workflowState) if !ok || wfState == nil { - return *new(R), newStepExecutionError("", stepName, "workflow state not found in context: are you running this step within a workflow?") + return nil, newStepExecutionError("", "", "workflow state not found in context: are you running this step within a workflow?") } stepID := wfState.nextStepID() opts = append(opts, WithNextStepID(stepID)) - // Type-erase the function - typeErasedFn := StepFunc(func(ctx context.Context) (any, error) { return fn(ctx) }) - // run step inside a Go routine by passing stepID - result, err := ctx.Go(ctx, typeErasedFn, opts...) - - // Step function could return a nil result - if result == nil { - return *new(R), err - } - // Otherwise type-check and cast the result - typedResult, ok := result.(R) - if !ok { - return *new(R), fmt.Errorf("unexpected result type: expected %T, got %T", *new(R), result) - } - return typedResult, err -} - -// TODO: move type above -- keeping it here for in case I need to modify it quickly -type stepResultChan struct { - result any - err error -} - -// TODO: Add docs --- will add once I get the implementation right -func (c *dbosContext) Go(ctx DBOSContext, fn StepFunc, opts ...StepOption) (any, error) { - result := make(chan stepResultChan, 1) + result := make(chan stepOutcome[R], 1) go func() { - res, err := c.RunAsStep(ctx, fn, opts...) - result <- stepResultChan{ + res, err := RunAsStep(ctx, fn, opts...) + result <- stepOutcome[R]{ result: res, err: err, } }() - resultChan := <-result - close(result) - return resultChan.result, resultChan.err + return result, nil } /****************************************/ From 154ea2801d2fe0ce8170a5912b619353c06fe1c2 Mon Sep 17 00:00:00 2001 From: af-md Date: Sat, 6 Sep 2025 14:08:17 +0100 Subject: [PATCH 08/18] refactor: remove Go function from DBOSContext interface --- dbos/dbos.go | 1 - 1 file changed, 1 deletion(-) diff --git a/dbos/dbos.go b/dbos/dbos.go index 4706320..f9aa829 100644 --- a/dbos/dbos.go +++ b/dbos/dbos.go @@ -110,7 +110,6 @@ type DBOSContext interface { // Workflow operations RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) (any, error) // Execute a function as a durable step within a workflow - Go(_ DBOSContext, fn StepFunc, opts ...StepOption) (any, error) // Execute a function as a durable step within a Go routine RunWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) // Start a new workflow execution Send(_ DBOSContext, destinationID string, message any, topic string) error // Send a message to another workflow Recv(_ DBOSContext, topic string, timeout time.Duration) (any, error) // Receive a message sent to this workflow From 09a949fed5b251d9e2d642c2a4e3fc9e6986240e Mon Sep 17 00:00:00 2001 From: af-md Date: Sat, 6 Sep 2025 18:56:15 +0100 Subject: [PATCH 09/18] test: add tests for Go function execution within workflows and introduce stepWithSleep function --- dbos/workflows_test.go | 84 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 84 insertions(+) diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index f5672ec..2854f0a 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -47,6 +47,11 @@ func simpleStepError(_ context.Context) (string, error) { return "", fmt.Errorf("step failure") } +func stepWithSleep(_ context.Context, duration time.Duration) (string, error) { + time.Sleep(duration) + return fmt.Sprintf("from step that slept for %s", duration), nil +} + func simpleWorkflowWithStepError(dbosCtx DBOSContext, input string) (string, error) { return RunAsStep(dbosCtx, func(ctx context.Context) (string, error) { return simpleStepError(ctx) @@ -860,6 +865,85 @@ func TestSteps(t *testing.T) { }) } +func TestGoRunningStepsInsideGoRoutines(t *testing.T) { + dbosCtx := setupDBOS(t, true, true) + t.Run("Go must run steps inside a workflow", func(t *testing.T) { + _, err := Go(dbosCtx, func(ctx context.Context) (string, error) { + return stepWithSleep(ctx, 1*time.Second) + }) + require.Error(t, err, "expected error when running step outside of workflow context, but got none") + + dbosErr, ok := err.(*DBOSError) + require.True(t, ok, "expected error to be of type *DBOSError, got %T", err) + require.Equal(t, StepExecutionError, dbosErr.Code) + expectedMessagePart := "workflow state not found in context: are you running this step within a workflow?" + require.Contains(t, err.Error(), expectedMessagePart, "expected error message to contain %q, but got %q", expectedMessagePart, err.Error()) + }) + + t.Run("Go must return step error correctly", func(t *testing.T) { + goWorkflow := func(dbosCtx DBOSContext, input string) (string, error) { + result, _ := Go(dbosCtx, func(ctx context.Context) (string, error) { + return "", fmt.Errorf("step error") + }) + + resultChan := <-result + if resultChan.err != nil { + return "", resultChan.err + } + return resultChan.result, nil + } + + RegisterWorkflow(dbosCtx, goWorkflow) + + handle, err := RunWorkflow(dbosCtx, goWorkflow, "test-input") + require.NoError(t, err, "failed to run go workflow") + _, err = handle.GetResult() + require.Error(t, err, "expected error when running step, but got none") + require.Equal(t, "step error", err.Error()) + }) + + t.Run("Go must execute 100 steps simultaneously", func(t *testing.T) { + // run 100 steps simultaneously + const numSteps = 100 + results := make(chan string, numSteps) + errors := make(chan error, numSteps) + var resultChans []<-chan stepOutcome[string] + + goWorkflow := func(dbosCtx DBOSContext, input string) (string, error) { + for range numSteps { + resultChan, err := Go(dbosCtx, func(ctx context.Context) (string, error) { + return stepWithSleep(ctx, 20*time.Millisecond) + }) + + if err != nil { + return "", err + } + resultChans = append(resultChans, resultChan) + } + + for _, resultChan := range resultChans { + result1 := <-resultChan + if result1.err != nil { + errors <- result1.err + } + results <- result1.result + } + return "", nil + } + close(results) + close(errors) + + RegisterWorkflow(dbosCtx, goWorkflow) + handle, err := RunWorkflow(dbosCtx, goWorkflow, "test-input") + require.NoError(t, err, "failed to run go workflow") + _, err = handle.GetResult() + require.NoError(t, err, "failed to get result from go workflow") + + assert.Equal(t, numSteps, len(results), "expected %d results, got %d", numSteps, len(results)) + assert.Equal(t, 0, len(errors), "expected no errors, got %d", len(errors)) + }) +} + func TestChildWorkflow(t *testing.T) { dbosCtx := setupDBOS(t, true, true) From 3b211846b7ce2a98d43661bc7adee82c21bc68ba Mon Sep 17 00:00:00 2001 From: af-md Date: Sun, 7 Sep 2025 09:12:45 +0100 Subject: [PATCH 10/18] fix: ensure results and errors channels are closed after workflow execution in tests --- dbos/workflows_test.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index 2854f0a..22fd008 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -930,15 +930,16 @@ func TestGoRunningStepsInsideGoRoutines(t *testing.T) { } return "", nil } - close(results) - close(errors) RegisterWorkflow(dbosCtx, goWorkflow) handle, err := RunWorkflow(dbosCtx, goWorkflow, "test-input") require.NoError(t, err, "failed to run go workflow") _, err = handle.GetResult() - require.NoError(t, err, "failed to get result from go workflow") + + close(results) + close(errors) + require.NoError(t, err, "failed to get result from go workflow") assert.Equal(t, numSteps, len(results), "expected %d results, got %d", numSteps, len(results)) assert.Equal(t, 0, len(errors), "expected no errors, got %d", len(errors)) }) From 81e86bef04369b6f2301962961711d242e4b6a23 Mon Sep 17 00:00:00 2001 From: af-md Date: Sun, 7 Sep 2025 09:30:03 +0100 Subject: [PATCH 11/18] fix: include step name in error message when workflow state is not found in context --- dbos/workflow.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbos/workflow.go b/dbos/workflow.go index cab0401..dd0829c 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -1271,10 +1271,10 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) // TODO: Add docs --- will add once I get the implementation right func Go[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (chan stepOutcome[R], error) { // create a determistic step ID - // can we refactor this too? + stepName := runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name() wfState, ok := ctx.Value(workflowStateKey).(*workflowState) if !ok || wfState == nil { - return nil, newStepExecutionError("", "", "workflow state not found in context: are you running this step within a workflow?") + return nil, newStepExecutionError("", stepName, "workflow state not found in context: are you running this step within a workflow?") } stepID := wfState.nextStepID() opts = append(opts, WithNextStepID(stepID)) From df31e883d40ac28ef7cc1ab11911acabd3d58bd7 Mon Sep 17 00:00:00 2001 From: af-md Date: Mon, 8 Sep 2025 08:35:01 +0100 Subject: [PATCH 12/18] docs: enhance documentation for Go function, detailing its usage and step ID generation --- dbos/workflow.go | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/dbos/workflow.go b/dbos/workflow.go index dd0829c..5930f2c 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -1268,7 +1268,21 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) return stepOutput, stepError } -// TODO: Add docs --- will add once I get the implementation right +// Go runs a step inside a Go routine and returns a channel to receive the result. +// Go generates a deterministic step ID for the step before running the step in a routine, since routines are not deterministic. +// The step ID is used to track the steps within the same workflow and use the step ID to perform recovery. +// The folliwing examples shows how to use Go: +// +// resultChan, err := dbos.Go(ctx, func(ctx context.Context) (string, error) { +// return "Hello, World!", nil +// }) +// +// resultChan := <-resultChan // wait for the channel to receive +// if resultChan.err != nil { +// // Handle error +// } +// result := resultChan.result +// func Go[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (chan stepOutcome[R], error) { // create a determistic step ID stepName := runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name() @@ -1279,7 +1293,7 @@ func Go[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (chan stepOutcom stepID := wfState.nextStepID() opts = append(opts, WithNextStepID(stepID)) - // run step inside a Go routine by passing stepID + // run step inside a Go routine by passing a stepID result := make(chan stepOutcome[R], 1) go func() { res, err := RunAsStep(ctx, fn, opts...) From 7d0a4a14631e86bb1a1d27a0a2d7e26c9bc8d134 Mon Sep 17 00:00:00 2001 From: af-md Date: Wed, 10 Sep 2025 08:58:47 +0100 Subject: [PATCH 13/18] test: add validation for deterministic step IDs in Go workflow execution --- dbos/workflows_test.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index 22fd008..1ca65fc 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -935,13 +935,21 @@ func TestGoRunningStepsInsideGoRoutines(t *testing.T) { handle, err := RunWorkflow(dbosCtx, goWorkflow, "test-input") require.NoError(t, err, "failed to run go workflow") _, err = handle.GetResult() - + close(results) close(errors) require.NoError(t, err, "failed to get result from go workflow") assert.Equal(t, numSteps, len(results), "expected %d results, got %d", numSteps, len(results)) assert.Equal(t, 0, len(errors), "expected no errors, got %d", len(errors)) + + // Test step IDs are deterministic and in the order of execution + steps, err := GetWorkflowSteps(dbosCtx, handle.GetWorkflowID()) + require.NoError(t, err, "failed to get workflow steps") + require.Len(t, steps, numSteps, "expected %d steps, got %d", numSteps, len(steps)) + for i := 0; i < numSteps; i++ { + assert.Equal(t, i, steps[i].StepID, "expected step ID to be %d, got %d", i, steps[i].StepID) + } }) } From c6f4f1d8d1b524f6a241a70c21ef4836b4db3b13 Mon Sep 17 00:00:00 2001 From: af-md Date: Thu, 16 Oct 2025 16:28:07 +0100 Subject: [PATCH 14/18] feat: add Go function to DBOSContext for executing steps in Go routines with result channel --- dbos/dbos.go | 1 + dbos/workflow.go | 74 ++++++++++++++++++++++++++++++++---------- dbos/workflows_test.go | 2 +- 3 files changed, 58 insertions(+), 19 deletions(-) diff --git a/dbos/dbos.go b/dbos/dbos.go index f9aa829..657dd81 100644 --- a/dbos/dbos.go +++ b/dbos/dbos.go @@ -111,6 +111,7 @@ type DBOSContext interface { // Workflow operations RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) (any, error) // Execute a function as a durable step within a workflow RunWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) // Start a new workflow execution + Go(_ DBOSContext, fn StepFunc, opts ...StepOption) (chan StepOutcome[any], error) // Starts a step inside a Go routine and returns a channel to receive the result Send(_ DBOSContext, destinationID string, message any, topic string) error // Send a message to another workflow Recv(_ DBOSContext, topic string, timeout time.Duration) (any, error) // Receive a message sent to this workflow SetEvent(_ DBOSContext, key string, message any) error // Set a key-value event for this workflow diff --git a/dbos/workflow.go b/dbos/workflow.go index 5930f2c..79845dd 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -1064,7 +1064,8 @@ func WithNextStepID(stepID int) StepOption { } // StepOutcome holds the result and error from a step execution -type stepOutcome[R any] struct { +// This struct is returned as part of a channel from the Go function when running the step inside a Go routine +type StepOutcome[R any] struct { result R err error } @@ -1271,19 +1272,56 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) // Go runs a step inside a Go routine and returns a channel to receive the result. // Go generates a deterministic step ID for the step before running the step in a routine, since routines are not deterministic. // The step ID is used to track the steps within the same workflow and use the step ID to perform recovery. -// The folliwing examples shows how to use Go: -// -// resultChan, err := dbos.Go(ctx, func(ctx context.Context) (string, error) { -// return "Hello, World!", nil -// }) -// -// resultChan := <-resultChan // wait for the channel to receive -// if resultChan.err != nil { -// // Handle error -// } -// result := resultChan.result -// -func Go[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (chan stepOutcome[R], error) { +// The folliwing examples shows how to use Go: +// +// resultChan, err := dbos.Go(ctx, func(ctx context.Context) (string, error) { +// return "Hello, World!", nil +// }) +// +// resultChan := <-resultChan // wait for the channel to receive +// if resultChan.err != nil { +// // Handle error +// } +// result := resultChan.result +func Go[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (chan StepOutcome[R], error) { + if ctx == nil { + // is this the correct return here? + return *new(chan StepOutcome[R]), newStepExecutionError("", "", "ctx cannot be nil") + } + + if fn == nil { + return *new(chan StepOutcome[R]), newStepExecutionError("", "", "step function cannot be nil") + } + + // Append WithStepName option to ensure the step name is set. This will not erase a user-provided step name + stepName := runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name() + opts = append(opts, WithStepName(stepName)) + + // Type-erase the function + typeErasedFn := StepFunc(func(ctx context.Context) (any, error) { return fn(ctx) }) + + result, err := ctx.Go(ctx, typeErasedFn, opts...) + // Step function could return a nil result + if result == nil { + return *new(chan StepOutcome[R]), err + } + + // Otherwise type-check and cast the result + outcome := <-result + typedResult, ok := outcome.result.(R) + if !ok { + return *new(chan StepOutcome[R]), fmt.Errorf("unexpected result type: expected %T, got %T", *new(R), result) + } + outcomeChan := make(chan StepOutcome[R], 1) + defer close(outcomeChan) + outcomeChan <- StepOutcome[R]{ + result: typedResult, + err: outcome.err, + } + return outcomeChan, nil +} + +func (c *dbosContext) Go(ctx DBOSContext, fn StepFunc, opts ...StepOption) (chan StepOutcome[any], error) { // create a determistic step ID stepName := runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name() wfState, ok := ctx.Value(workflowStateKey).(*workflowState) @@ -1293,11 +1331,11 @@ func Go[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (chan stepOutcom stepID := wfState.nextStepID() opts = append(opts, WithNextStepID(stepID)) - // run step inside a Go routine by passing a stepID - result := make(chan stepOutcome[R], 1) + // run step inside a Go routine by passing a stepID + result := make(chan StepOutcome[any], 1) go func() { - res, err := RunAsStep(ctx, fn, opts...) - result <- stepOutcome[R]{ + res, err := ctx.RunAsStep(ctx, fn, opts...) + result <- StepOutcome[any]{ result: res, err: err, } diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index 1ca65fc..8502fa4 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -907,7 +907,7 @@ func TestGoRunningStepsInsideGoRoutines(t *testing.T) { const numSteps = 100 results := make(chan string, numSteps) errors := make(chan error, numSteps) - var resultChans []<-chan stepOutcome[string] + var resultChans []<-chan StepOutcome[string] goWorkflow := func(dbosCtx DBOSContext, input string) (string, error) { for range numSteps { From 346bdc0114c43271fd2d2878580734256d4c8c52 Mon Sep 17 00:00:00 2001 From: af-md Date: Mon, 20 Oct 2025 16:24:18 +0100 Subject: [PATCH 15/18] refactor: improve error handling in Go function and update tests for custom output types --- dbos/workflow.go | 21 +++++++++++++++++---- dbos/workflows_test.go | 43 +++++++++++++++++++++++++++--------------- 2 files changed, 45 insertions(+), 19 deletions(-) diff --git a/dbos/workflow.go b/dbos/workflow.go index 79845dd..50e6344 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -1306,18 +1306,29 @@ func Go[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (chan StepOutcom return *new(chan StepOutcome[R]), err } - // Otherwise type-check and cast the result + outcomeChan := make(chan StepOutcome[R], 1) + defer close(outcomeChan) + outcome := <-result + + if outcome.err != nil { + outcomeChan <- StepOutcome[R]{ + result: *new(R), + err: outcome.err, + } + return outcomeChan, nil + } + + // Otherwise type-check and cast the result typedResult, ok := outcome.result.(R) if !ok { return *new(chan StepOutcome[R]), fmt.Errorf("unexpected result type: expected %T, got %T", *new(R), result) } - outcomeChan := make(chan StepOutcome[R], 1) - defer close(outcomeChan) outcomeChan <- StepOutcome[R]{ result: typedResult, - err: outcome.err, + err: nil, } + return outcomeChan, nil } @@ -1341,6 +1352,8 @@ func (c *dbosContext) Go(ctx DBOSContext, fn StepFunc, opts ...StepOption) (chan } }() + // TODO: do I need to close the channel here? + return result, nil } diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index 8502fa4..0c66665 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -2,6 +2,7 @@ package dbos import ( "context" + "encoding/gob" "errors" "fmt" "reflect" @@ -47,11 +48,26 @@ func simpleStepError(_ context.Context) (string, error) { return "", fmt.Errorf("step failure") } +type stepWithSleepOutput struct { + StepID int + Result string + Error error +} + func stepWithSleep(_ context.Context, duration time.Duration) (string, error) { time.Sleep(duration) return fmt.Sprintf("from step that slept for %s", duration), nil } +func stepWithSleepCustomOutput(_ context.Context, duration time.Duration, stepID int) (stepWithSleepOutput, error) { + time.Sleep(duration) + return stepWithSleepOutput{ + StepID: stepID, + Result: fmt.Sprintf("from step that slept for %s", duration), + Error: nil, + }, nil +} + func simpleWorkflowWithStepError(dbosCtx DBOSContext, input string) (string, error) { return RunAsStep(dbosCtx, func(ctx context.Context) (string, error) { return simpleStepError(ctx) @@ -867,6 +883,10 @@ func TestSteps(t *testing.T) { func TestGoRunningStepsInsideGoRoutines(t *testing.T) { dbosCtx := setupDBOS(t, true, true) + + // Register custom types for Gob encoding + var stepOutput stepWithSleepOutput + gob.Register(stepOutput) t.Run("Go must run steps inside a workflow", func(t *testing.T) { _, err := Go(dbosCtx, func(ctx context.Context) (string, error) { return stepWithSleep(ctx, 1*time.Second) @@ -907,12 +927,12 @@ func TestGoRunningStepsInsideGoRoutines(t *testing.T) { const numSteps = 100 results := make(chan string, numSteps) errors := make(chan error, numSteps) - var resultChans []<-chan StepOutcome[string] + var resultChans []<-chan StepOutcome[stepWithSleepOutput] goWorkflow := func(dbosCtx DBOSContext, input string) (string, error) { - for range numSteps { - resultChan, err := Go(dbosCtx, func(ctx context.Context) (string, error) { - return stepWithSleep(ctx, 20*time.Millisecond) + for i := 0; i < numSteps; i++ { + resultChan, err := Go(dbosCtx, func(ctx context.Context) (stepWithSleepOutput, error) { + return stepWithSleepCustomOutput(ctx, 20*time.Millisecond, i) }) if err != nil { @@ -921,12 +941,13 @@ func TestGoRunningStepsInsideGoRoutines(t *testing.T) { resultChans = append(resultChans, resultChan) } - for _, resultChan := range resultChans { + for i, resultChan := range resultChans { result1 := <-resultChan if result1.err != nil { - errors <- result1.err + errors <- result1.result.Error } - results <- result1.result + assert.Equal(t, i, result1.result.StepID, "expected step ID to be %d, got %d", i, result1.result.StepID) + results <- result1.result.Result } return "", nil } @@ -938,18 +959,10 @@ func TestGoRunningStepsInsideGoRoutines(t *testing.T) { close(results) close(errors) - require.NoError(t, err, "failed to get result from go workflow") assert.Equal(t, numSteps, len(results), "expected %d results, got %d", numSteps, len(results)) assert.Equal(t, 0, len(errors), "expected no errors, got %d", len(errors)) - // Test step IDs are deterministic and in the order of execution - steps, err := GetWorkflowSteps(dbosCtx, handle.GetWorkflowID()) - require.NoError(t, err, "failed to get workflow steps") - require.Len(t, steps, numSteps, "expected %d steps, got %d", numSteps, len(steps)) - for i := 0; i < numSteps; i++ { - assert.Equal(t, i, steps[i].StepID, "expected step ID to be %d, got %d", i, steps[i].StepID) - } }) } From 573a17b4078a13b06f2145270b1b32f04197fedb Mon Sep 17 00:00:00 2001 From: af-md Date: Mon, 20 Oct 2025 16:35:44 +0100 Subject: [PATCH 16/18] cleanup: remove commented TODO in Go function and tidy up test code by removing unnecessary line --- dbos/workflow.go | 2 -- dbos/workflows_test.go | 1 - 2 files changed, 3 deletions(-) diff --git a/dbos/workflow.go b/dbos/workflow.go index 50e6344..74020c8 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -1352,8 +1352,6 @@ func (c *dbosContext) Go(ctx DBOSContext, fn StepFunc, opts ...StepOption) (chan } }() - // TODO: do I need to close the channel here? - return result, nil } diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index 0c66665..f984700 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -956,7 +956,6 @@ func TestGoRunningStepsInsideGoRoutines(t *testing.T) { handle, err := RunWorkflow(dbosCtx, goWorkflow, "test-input") require.NoError(t, err, "failed to run go workflow") _, err = handle.GetResult() - close(results) close(errors) require.NoError(t, err, "failed to get result from go workflow") From d53546f47e2218dbd732a589e500259d93bf5928 Mon Sep 17 00:00:00 2001 From: af-md Date: Thu, 23 Oct 2025 12:08:27 +0100 Subject: [PATCH 17/18] refactor: reorganize test code for step execution and enhance determinism checks in Go workflows --- dbos/workflow.go | 1 - dbos/workflows_test.go | 122 +++++++++++++++++++++++++++++++++++------ 2 files changed, 105 insertions(+), 18 deletions(-) diff --git a/dbos/workflow.go b/dbos/workflow.go index 74020c8..6a1f8a3 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -1285,7 +1285,6 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) // result := resultChan.result func Go[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (chan StepOutcome[R], error) { if ctx == nil { - // is this the correct return here? return *new(chan StepOutcome[R]), newStepExecutionError("", "", "ctx cannot be nil") } diff --git a/dbos/workflows_test.go b/dbos/workflows_test.go index f984700..6c6d6da 100644 --- a/dbos/workflows_test.go +++ b/dbos/workflows_test.go @@ -48,26 +48,11 @@ func simpleStepError(_ context.Context) (string, error) { return "", fmt.Errorf("step failure") } -type stepWithSleepOutput struct { - StepID int - Result string - Error error -} - func stepWithSleep(_ context.Context, duration time.Duration) (string, error) { time.Sleep(duration) return fmt.Sprintf("from step that slept for %s", duration), nil } -func stepWithSleepCustomOutput(_ context.Context, duration time.Duration, stepID int) (stepWithSleepOutput, error) { - time.Sleep(duration) - return stepWithSleepOutput{ - StepID: stepID, - Result: fmt.Sprintf("from step that slept for %s", duration), - Error: nil, - }, nil -} - func simpleWorkflowWithStepError(dbosCtx DBOSContext, input string) (string, error) { return RunAsStep(dbosCtx, func(ctx context.Context) (string, error) { return simpleStepError(ctx) @@ -881,7 +866,37 @@ func TestSteps(t *testing.T) { }) } +type stepWithSleepOutput struct { + StepID int + Result string + Error error +} + +var ( + stepDeterminismStartEvent *Event + stepDeterminismEvent *Event +) + +func stepWithSleepCustomOutput(_ context.Context, duration time.Duration, stepID int) (stepWithSleepOutput, error) { + time.Sleep(duration) + return stepWithSleepOutput{ + StepID: stepID, + Result: fmt.Sprintf("from step that slept for %s", duration), + Error: nil, + }, nil +} + +// blocks indefinitely +func stepThatBlocks(_ context.Context) (string, error) { + stepDeterminismStartEvent.Set() + fmt.Println("stepThatBlocks: started to block") + stepDeterminismEvent.Wait() + fmt.Println("stepThatBlocks: unblocked") + return "from step that blocked", nil +} + func TestGoRunningStepsInsideGoRoutines(t *testing.T) { + dbosCtx := setupDBOS(t, true, true) // Register custom types for Gob encoding @@ -922,7 +937,7 @@ func TestGoRunningStepsInsideGoRoutines(t *testing.T) { require.Equal(t, "step error", err.Error()) }) - t.Run("Go must execute 100 steps simultaneously", func(t *testing.T) { + t.Run("Go must execute 100 steps simultaneously then return the stepIDs in the correct sequence", func(t *testing.T) { // run 100 steps simultaneously const numSteps = 100 results := make(chan string, numSteps) @@ -930,7 +945,7 @@ func TestGoRunningStepsInsideGoRoutines(t *testing.T) { var resultChans []<-chan StepOutcome[stepWithSleepOutput] goWorkflow := func(dbosCtx DBOSContext, input string) (string, error) { - for i := 0; i < numSteps; i++ { + for i := range numSteps { resultChan, err := Go(dbosCtx, func(ctx context.Context) (stepWithSleepOutput, error) { return stepWithSleepCustomOutput(ctx, 20*time.Millisecond, i) }) @@ -961,7 +976,80 @@ func TestGoRunningStepsInsideGoRoutines(t *testing.T) { require.NoError(t, err, "failed to get result from go workflow") assert.Equal(t, numSteps, len(results), "expected %d results, got %d", numSteps, len(results)) assert.Equal(t, 0, len(errors), "expected no errors, got %d", len(errors)) + }) + + t.Run("Go executes the same workflow twice, whilst blocking the first workflow, to test for deterministic execution when using Go routines", func(t *testing.T) { + + stepDeterminismStartEvent = NewEvent() + stepDeterminismEvent = NewEvent() + goWorkflow := func(dbosCtx DBOSContext, input string) (string, error) { + _, err := Go(dbosCtx, func(ctx context.Context) (string, error) { + return stepWithSleep(ctx, 1*time.Second) + }) + + if err != nil { + return "", err + } + + _, err = Go(dbosCtx, func(ctx context.Context) (string, error) { + return stepWithSleep(ctx, 1*time.Second) + }) + + if err != nil { + return "", err + } + + _, err = Go(dbosCtx, func(ctx context.Context) (string, error) { + return stepWithSleep(ctx, 1*time.Second) + }) + + if err != nil { + return "", err + } + + _, err = Go(dbosCtx, func(ctx context.Context) (string, error) { + return stepThatBlocks(ctx) + }) + + if err != nil { + return "", err + } + + return "WORKFLOW EXECUTED DETERMINISTICALLY", nil + } + + // Run the first workflow + RegisterWorkflow(dbosCtx, goWorkflow) + handle, err := RunWorkflow(dbosCtx, goWorkflow, "test-input") + require.NoError(t, err, "failed to run go workflow") + + // Wait for the first workflow to reach the blocking step + stepDeterminismStartEvent.Wait() + stepDeterminismStartEvent.Clear() + + // Run the second workflow + handle2, err := RunWorkflow(dbosCtx, goWorkflow, "test-input", WithWorkflowID(handle.GetWorkflowID())) + + // If it throws an error, it's because of steps not being deterministically executed when using Go routines in the first workflow + require.NoError(t, err, "failed to run go workflow") + + // Complete the blocked workflow + stepDeterminismEvent.Set() + + _, err = handle2.GetResult() + require.NoError(t, err, "failed to get result from go workflow") + + // Verify workflow status is SUCCESS + status, err := handle.GetStatus() + require.NoError(t, err, "failed to get workflow status") + require.Equal(t, WorkflowStatusSuccess, status.Status, "expected workflow status to be WorkflowStatusSuccess") + + + // Verify workflow result is "WORKFLOW EXECUTED DETERMINISTICALLY" + result, err := handle.GetResult() + require.NoError(t, err, "failed to get result from go workflow") + require.Equal(t, "WORKFLOW EXECUTED DETERMINISTICALLY", result, "expected result to be 'WORKFLOW EXECUTED DETERMINISTICALLY'") }) } From 27e3e9b36af8517572425a09375249ac5d810b7d Mon Sep 17 00:00:00 2001 From: af-md Date: Thu, 23 Oct 2025 12:39:14 +0100 Subject: [PATCH 18/18] refactor: rename StepOptions to stepOptions for consistency and update related function signatures --- dbos/workflow.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/dbos/workflow.go b/dbos/workflow.go index 6a1f8a3..e21b9f4 100644 --- a/dbos/workflow.go +++ b/dbos/workflow.go @@ -987,8 +987,8 @@ type StepFunc func(ctx context.Context) (any, error) // Step represents a type-safe step function with a specific output type R. type Step[R any] func(ctx context.Context) (R, error) -// StepOptions holds the configuration for step execution using functional options pattern. -type StepOptions struct { +// stepOptions holds the configuration for step execution using functional options pattern. +type stepOptions struct { maxRetries int // Maximum number of retry attempts (0 = no retries) backoffFactor float64 // Exponential backoff multiplier between retries (default: 2.0) baseInterval time.Duration // Initial delay between retries (default: 100ms) @@ -998,7 +998,7 @@ type StepOptions struct { } // setDefaults applies default values to stepOptions -func (opts *StepOptions) setDefaults() { +func (opts *stepOptions) setDefaults() { if opts.backoffFactor == 0 { opts.backoffFactor = _DEFAULT_STEP_BACKOFF_FACTOR } @@ -1011,12 +1011,12 @@ func (opts *StepOptions) setDefaults() { } // StepOption is a functional option for configuring step execution parameters. -type StepOption func(*StepOptions) +type StepOption func(*stepOptions) // WithStepName sets a custom name for the step. If the step name has already been set // by a previous call to WithStepName, this option will be ignored func WithStepName(name string) StepOption { - return func(opts *StepOptions) { + return func(opts *stepOptions) { if opts.stepName == "" { opts.stepName = name } @@ -1026,7 +1026,7 @@ func WithStepName(name string) StepOption { // WithStepMaxRetries sets the maximum number of retry attempts for the step. // A value of 0 means no retries (default behavior). func WithStepMaxRetries(maxRetries int) StepOption { - return func(opts *StepOptions) { + return func(opts *stepOptions) { opts.maxRetries = maxRetries } } @@ -1035,7 +1035,7 @@ func WithStepMaxRetries(maxRetries int) StepOption { // The delay between retries is calculated as: BaseInterval * (BackoffFactor^(retry-1)) // Default value is 2.0. func WithBackoffFactor(factor float64) StepOption { - return func(opts *StepOptions) { + return func(opts *stepOptions) { opts.backoffFactor = factor } } @@ -1043,7 +1043,7 @@ func WithBackoffFactor(factor float64) StepOption { // WithBaseInterval sets the initial delay between retries. // Default value is 100ms. func WithBaseInterval(interval time.Duration) StepOption { - return func(opts *StepOptions) { + return func(opts *stepOptions) { opts.baseInterval = interval } } @@ -1051,14 +1051,14 @@ func WithBaseInterval(interval time.Duration) StepOption { // WithMaxInterval sets the maximum delay between retries. // Default value is 5s. func WithMaxInterval(interval time.Duration) StepOption { - return func(opts *StepOptions) { + return func(opts *stepOptions) { opts.maxInterval = interval } } func WithNextStepID(stepID int) StepOption { - return func(opts *StepOptions) { + return func(opts *stepOptions) { opts.preGeneratedStepID = &stepID } } @@ -1149,7 +1149,7 @@ func RunAsStep[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (R, error func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) (any, error) { // Process functional options - stepOpts := &StepOptions{} + stepOpts := &stepOptions{} for _, opt := range opts { opt(stepOpts) }