-
Notifications
You must be signed in to change notification settings - Fork 40
run a step inside a go routine #109
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
f716ae9
a89b214
20a2beb
ba8d253
6d3a84a
723a6db
ac7a1b1
154ea28
09a949f
3b21184
81e86be
df31e88
7d0a4a1
c6f4f1d
346bdc0
573a17b
d53546f
27e3e9b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -989,11 +989,12 @@ | |
|
|
||
// stepOptions holds the configuration for step execution using the functional options pattern.
// Each field is populated by a StepOption; unset fields keep their zero value until defaults are applied.
type stepOptions struct {
	maxRetries         int           // Maximum number of retry attempts (0 = no retries)
	backoffFactor      float64       // Exponential backoff multiplier between retries (default: 2.0)
	baseInterval       time.Duration // Initial delay between retries (default: 100ms)
	maxInterval        time.Duration // Maximum delay between retries (default: 5s)
	stepName           string        // Custom name for the step (defaults to function name)
	preGeneratedStepID *int          // Step ID generated ahead of time so the step can run in a goroutine; nil when none was generated
}
|
|
||
| // setDefaults applies default values to stepOptions | ||
|
|
@@ -1055,6 +1056,20 @@ | |
| } | ||
| } | ||
|
|
||
|
|
||
| func WithNextStepID(stepID int) StepOption { | ||
| return func(opts *stepOptions) { | ||
| opts.preGeneratedStepID = &stepID | ||
| } | ||
| } | ||
|
|
||
// StepOutcome holds the result and error from a step execution.
// It is the element type of the channel returned when a step is run inside a goroutine.
type StepOutcome[R any] struct {
	result R
	err    error
}

// Result returns the value produced by the step. It is the zero value of R when the step failed.
func (o StepOutcome[R]) Result() R {
	return o.result
}

// Err returns the error produced by the step, or nil when the step succeeded.
func (o StepOutcome[R]) Err() error {
	return o.err
}
|
|
||
| // RunAsStep executes a function as a durable step within a workflow. | ||
| // Steps provide at-least-once execution guarantees and automatic retry capabilities. | ||
| // If a step has already been executed (e.g., during workflow recovery), its recorded | ||
|
|
@@ -1156,10 +1171,18 @@ | |
| return fn(c) | ||
| } | ||
|
|
||
| // Get stepID if it has been pre generated | ||
| var stepID int | ||
| if stepOpts.preGeneratedStepID != nil { | ||
| stepID = *stepOpts.preGeneratedStepID | ||
| } else { | ||
| stepID = wfState.nextStepID() // crucially, this increments the step ID on the *workflow* state | ||
| } | ||
|
|
||
| // Setup step state | ||
| stepState := workflowState{ | ||
| workflowID: wfState.workflowID, | ||
| stepID: wfState.nextStepID(), // crucially, this increments the step ID on the *workflow* state | ||
| stepID: stepID, | ||
| isWithinStep: true, | ||
| } | ||
|
|
||
|
|
@@ -1246,6 +1269,91 @@ | |
| return stepOutput, stepError | ||
| } | ||
|
|
||
| // Go runs a step inside a Go routine and returns a channel to receive the result. | ||
| // Go generates a deterministic step ID for the step before running the step in a routine, since routines are not deterministic. | ||
| // The step ID is used to track the steps within the same workflow and use the step ID to perform recovery. | ||
| // The folliwing examples shows how to use Go: | ||
| // | ||
| // resultChan, err := dbos.Go(ctx, func(ctx context.Context) (string, error) { | ||
| // return "Hello, World!", nil | ||
| // }) | ||
| // | ||
| // resultChan := <-resultChan // wait for the channel to receive | ||
| // if resultChan.err != nil { | ||
| // // Handle error | ||
| // } | ||
| // result := resultChan.result | ||
| func Go[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (chan StepOutcome[R], error) { | ||
| if ctx == nil { | ||
| return *new(chan StepOutcome[R]), newStepExecutionError("", "", "ctx cannot be nil") | ||
| } | ||
|
|
||
| if fn == nil { | ||
| return *new(chan StepOutcome[R]), newStepExecutionError("", "", "step function cannot be nil") | ||
| } | ||
|
|
||
| // Append WithStepName option to ensure the step name is set. This will not erase a user-provided step name | ||
| stepName := runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name() | ||
| opts = append(opts, WithStepName(stepName)) | ||
|
|
||
| // Type-erase the function | ||
| typeErasedFn := StepFunc(func(ctx context.Context) (any, error) { return fn(ctx) }) | ||
|
|
||
| result, err := ctx.Go(ctx, typeErasedFn, opts...) | ||
| // Step function could return a nil result | ||
| if result == nil { | ||
| return *new(chan StepOutcome[R]), err | ||
| } | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we need to close the |
||
|
|
||
| outcomeChan := make(chan StepOutcome[R], 1) | ||
| defer close(outcomeChan) | ||
|
|
||
| outcome := <-result | ||
|
|
||
| if outcome.err != nil { | ||
| outcomeChan <- StepOutcome[R]{ | ||
| result: *new(R), | ||
| err: outcome.err, | ||
| } | ||
| return outcomeChan, nil | ||
| } | ||
|
|
||
| // Otherwise type-check and cast the result | ||
| typedResult, ok := outcome.result.(R) | ||
| if !ok { | ||
| return *new(chan StepOutcome[R]), fmt.Errorf("unexpected result type: expected %T, got %T", *new(R), result) | ||
| } | ||
| outcomeChan <- StepOutcome[R]{ | ||
| result: typedResult, | ||
| err: nil, | ||
| } | ||
|
Comment on lines
+1321
to
+1329
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We're going to have to modify this after #175 is merged. Specifically, it is possible that the step output was returned from the database, encoded, in which case we'll need to decode it. We can mimick the code in
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. also in case of error just return outcomeChan, not a nil channel (which reading from blocks forever). That's a bit of a footgun for the user. |
||
|
|
||
| return outcomeChan, nil | ||
| } | ||
|
|
||
| func (c *dbosContext) Go(ctx DBOSContext, fn StepFunc, opts ...StepOption) (chan StepOutcome[any], error) { | ||
| // create a determistic step ID | ||
| stepName := runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name() | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is not the step name we want to display: it is the name of the typed-erase step. We can retrieve the user-provided function's name by inspecting the options // Process functional options
stepOpts := &stepOptions{}
for _, opt := range opts {
opt(stepOpts)
}
name := stepOpts.stepName |
||
| wfState, ok := ctx.Value(workflowStateKey).(*workflowState) | ||
| if !ok || wfState == nil { | ||
| return nil, newStepExecutionError("", stepName, "workflow state not found in context: are you running this step within a workflow?") | ||
|
Check failure on line 1339 in dbos/workflow.go
|
||
| } | ||
| stepID := wfState.nextStepID() | ||
| opts = append(opts, WithNextStepID(stepID)) | ||
|
|
||
| // run step inside a Go routine by passing a stepID | ||
| result := make(chan StepOutcome[any], 1) | ||
| go func() { | ||
| res, err := ctx.RunAsStep(ctx, fn, opts...) | ||
| result <- StepOutcome[any]{ | ||
| result: res, | ||
| err: err, | ||
| } | ||
| }() | ||
|
|
||
| return result, nil | ||
| } | ||
|
|
||
| /****************************************/ | ||
| /******* WORKFLOW COMMUNICATIONS ********/ | ||
| /****************************************/ | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,6 +2,7 @@ package dbos | |
|
|
||
| import ( | ||
| "context" | ||
| "encoding/gob" | ||
| "errors" | ||
| "fmt" | ||
| "reflect" | ||
|
|
@@ -47,6 +48,11 @@ func simpleStepError(_ context.Context) (string, error) { | |
| return "", fmt.Errorf("step failure") | ||
| } | ||
|
|
||
// stepWithSleep waits for duration and then returns a message that includes the duration.
// If ctx is cancelled before the wait is over, it returns early with ctx.Err().
func stepWithSleep(ctx context.Context, duration time.Duration) (string, error) {
	timer := time.NewTimer(duration)
	defer timer.Stop()

	select {
	case <-ctx.Done():
		return "", ctx.Err()
	case <-timer.C:
		return fmt.Sprintf("from step that slept for %s", duration), nil
	}
}
|
|
||
| func simpleWorkflowWithStepError(dbosCtx DBOSContext, input string) (string, error) { | ||
| return RunAsStep(dbosCtx, func(ctx context.Context) (string, error) { | ||
| return simpleStepError(ctx) | ||
|
|
@@ -860,6 +866,193 @@ func TestSteps(t *testing.T) { | |
| }) | ||
| } | ||
|
|
||
// stepWithSleepOutput is the custom result type returned by stepWithSleepCustomOutput.
type stepWithSleepOutput struct {
	StepID int    // Value the caller passed in as stepID
	Result string // Human-readable message produced by the step
	Error  error  // Error carried inside the output value itself
}
|
|
||
| var ( | ||
| stepDeterminismStartEvent *Event | ||
| stepDeterminismEvent *Event | ||
| ) | ||
|
|
||
| func stepWithSleepCustomOutput(_ context.Context, duration time.Duration, stepID int) (stepWithSleepOutput, error) { | ||
| time.Sleep(duration) | ||
| return stepWithSleepOutput{ | ||
| StepID: stepID, | ||
| Result: fmt.Sprintf("from step that slept for %s", duration), | ||
| Error: nil, | ||
| }, nil | ||
| } | ||
|
|
||
| // blocks indefinitely | ||
| func stepThatBlocks(_ context.Context) (string, error) { | ||
| stepDeterminismStartEvent.Set() | ||
| fmt.Println("stepThatBlocks: started to block") | ||
| stepDeterminismEvent.Wait() | ||
| fmt.Println("stepThatBlocks: unblocked") | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. remove print statements pls |
||
| return "from step that blocked", nil | ||
| } | ||
|
|
||
| func TestGoRunningStepsInsideGoRoutines(t *testing.T) { | ||
|
|
||
| dbosCtx := setupDBOS(t, true, true) | ||
|
|
||
| // Register custom types for Gob encoding | ||
| var stepOutput stepWithSleepOutput | ||
| gob.Register(stepOutput) | ||
| t.Run("Go must run steps inside a workflow", func(t *testing.T) { | ||
| _, err := Go(dbosCtx, func(ctx context.Context) (string, error) { | ||
| return stepWithSleep(ctx, 1*time.Second) | ||
| }) | ||
| require.Error(t, err, "expected error when running step outside of workflow context, but got none") | ||
|
|
||
| dbosErr, ok := err.(*DBOSError) | ||
| require.True(t, ok, "expected error to be of type *DBOSError, got %T", err) | ||
| require.Equal(t, StepExecutionError, dbosErr.Code) | ||
| expectedMessagePart := "workflow state not found in context: are you running this step within a workflow?" | ||
| require.Contains(t, err.Error(), expectedMessagePart, "expected error message to contain %q, but got %q", expectedMessagePart, err.Error()) | ||
| }) | ||
|
|
||
| t.Run("Go must return step error correctly", func(t *testing.T) { | ||
| goWorkflow := func(dbosCtx DBOSContext, input string) (string, error) { | ||
| result, _ := Go(dbosCtx, func(ctx context.Context) (string, error) { | ||
| return "", fmt.Errorf("step error") | ||
| }) | ||
|
|
||
| resultChan := <-result | ||
| if resultChan.err != nil { | ||
| return "", resultChan.err | ||
| } | ||
| return resultChan.result, nil | ||
| } | ||
|
|
||
| RegisterWorkflow(dbosCtx, goWorkflow) | ||
|
|
||
| handle, err := RunWorkflow(dbosCtx, goWorkflow, "test-input") | ||
| require.NoError(t, err, "failed to run go workflow") | ||
| _, err = handle.GetResult() | ||
| require.Error(t, err, "expected error when running step, but got none") | ||
| require.Equal(t, "step error", err.Error()) | ||
| }) | ||
|
|
||
| t.Run("Go must execute 100 steps simultaneously then return the stepIDs in the correct sequence", func(t *testing.T) { | ||
| // run 100 steps simultaneously | ||
| const numSteps = 100 | ||
| results := make(chan string, numSteps) | ||
| errors := make(chan error, numSteps) | ||
| var resultChans []<-chan StepOutcome[stepWithSleepOutput] | ||
|
|
||
| goWorkflow := func(dbosCtx DBOSContext, input string) (string, error) { | ||
| for i := range numSteps { | ||
| resultChan, err := Go(dbosCtx, func(ctx context.Context) (stepWithSleepOutput, error) { | ||
| return stepWithSleepCustomOutput(ctx, 20*time.Millisecond, i) | ||
| }) | ||
|
|
||
| if err != nil { | ||
| return "", err | ||
| } | ||
| resultChans = append(resultChans, resultChan) | ||
| } | ||
|
|
||
| for i, resultChan := range resultChans { | ||
| result1 := <-resultChan | ||
| if result1.err != nil { | ||
| errors <- result1.result.Error | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why are we piping |
||
| } | ||
| assert.Equal(t, i, result1.result.StepID, "expected step ID to be %d, got %d", i, result1.result.StepID) | ||
| results <- result1.result.Result | ||
| } | ||
| return "", nil | ||
| } | ||
|
|
||
| RegisterWorkflow(dbosCtx, goWorkflow) | ||
| handle, err := RunWorkflow(dbosCtx, goWorkflow, "test-input") | ||
| require.NoError(t, err, "failed to run go workflow") | ||
| _, err = handle.GetResult() | ||
| close(results) | ||
| close(errors) | ||
|
Comment on lines
+974
to
+975
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: defer the closing after creation |
||
| require.NoError(t, err, "failed to get result from go workflow") | ||
| assert.Equal(t, numSteps, len(results), "expected %d results, got %d", numSteps, len(results)) | ||
| assert.Equal(t, 0, len(errors), "expected no errors, got %d", len(errors)) | ||
| }) | ||
|
|
||
| t.Run("Go executes the same workflow twice, whilst blocking the first workflow, to test for deterministic execution when using Go routines", func(t *testing.T) { | ||
|
|
||
| stepDeterminismStartEvent = NewEvent() | ||
| stepDeterminismEvent = NewEvent() | ||
|
|
||
| goWorkflow := func(dbosCtx DBOSContext, input string) (string, error) { | ||
| _, err := Go(dbosCtx, func(ctx context.Context) (string, error) { | ||
| return stepWithSleep(ctx, 1*time.Second) | ||
| }) | ||
|
|
||
| if err != nil { | ||
| return "", err | ||
| } | ||
|
|
||
| _, err = Go(dbosCtx, func(ctx context.Context) (string, error) { | ||
| return stepWithSleep(ctx, 1*time.Second) | ||
| }) | ||
|
|
||
| if err != nil { | ||
| return "", err | ||
| } | ||
|
|
||
| _, err = Go(dbosCtx, func(ctx context.Context) (string, error) { | ||
| return stepWithSleep(ctx, 1*time.Second) | ||
| }) | ||
|
|
||
| if err != nil { | ||
| return "", err | ||
| } | ||
|
|
||
| _, err = Go(dbosCtx, func(ctx context.Context) (string, error) { | ||
| return stepThatBlocks(ctx) | ||
| }) | ||
|
|
||
| if err != nil { | ||
| return "", err | ||
| } | ||
|
|
||
| return "WORKFLOW EXECUTED DETERMINISTICALLY", nil | ||
| } | ||
|
|
||
| // Run the first workflow | ||
| RegisterWorkflow(dbosCtx, goWorkflow) | ||
| handle, err := RunWorkflow(dbosCtx, goWorkflow, "test-input") | ||
| require.NoError(t, err, "failed to run go workflow") | ||
|
|
||
| // Wait for the first workflow to reach the blocking step | ||
| stepDeterminismStartEvent.Wait() | ||
| stepDeterminismStartEvent.Clear() | ||
|
|
||
| // Run the second workflow | ||
| handle2, err := RunWorkflow(dbosCtx, goWorkflow, "test-input", WithWorkflowID(handle.GetWorkflowID())) | ||
|
|
||
| // If it throws an error, it's because of steps not being deterministically executed when using Go routines in the first workflow | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The comment is not exactly accurate: determinism errors would come from |
||
| require.NoError(t, err, "failed to run go workflow") | ||
|
|
||
| // Complete the blocked workflow | ||
| stepDeterminismEvent.Set() | ||
|
|
||
| _, err = handle2.GetResult() | ||
| require.NoError(t, err, "failed to get result from go workflow") | ||
|
|
||
| // Verify workflow status is SUCCESS | ||
| status, err := handle.GetStatus() | ||
| require.NoError(t, err, "failed to get workflow status") | ||
| require.Equal(t, WorkflowStatusSuccess, status.Status, "expected workflow status to be WorkflowStatusSuccess") | ||
|
|
||
|
|
||
| // Verify workflow result is "WORKFLOW EXECUTED DETERMINISTICALLY" | ||
| result, err := handle.GetResult() | ||
| require.NoError(t, err, "failed to get result from go workflow") | ||
| require.Equal(t, "WORKFLOW EXECUTED DETERMINISTICALLY", result, "expected result to be 'WORKFLOW EXECUTED DETERMINISTICALLY'") | ||
| }) | ||
| } | ||
|
|
||
| func TestChildWorkflow(t *testing.T) { | ||
| dbosCtx := setupDBOS(t, true, true) | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.