Skip to content

Commit 74deb3a

Browse files
committed
feat: add Go function to DBOSContext for executing steps in Go routines with result channel
1 parent 134949b commit 74deb3a

File tree

3 files changed

+58
-19
lines changed

3 files changed

+58
-19
lines changed

dbos/dbos.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ type DBOSContext interface {
111111
// Workflow operations
112112
RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) (any, error) // Execute a function as a durable step within a workflow
113113
RunWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) // Start a new workflow execution
114+
Go(_ DBOSContext, fn StepFunc, opts ...StepOption) (chan StepOutcome[any], error) // Starts a step inside a Go routine and returns a channel to receive the result
114115
Send(_ DBOSContext, destinationID string, message any, topic string) error // Send a message to another workflow
115116
Recv(_ DBOSContext, topic string, timeout time.Duration) (any, error) // Receive a message sent to this workflow
116117
SetEvent(_ DBOSContext, key string, message any) error // Set a key-value event for this workflow

dbos/workflow.go

Lines changed: 56 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1015,7 +1015,8 @@ func WithNextStepID(stepID int) StepOption {
10151015
}
10161016

10171017
// StepOutcome holds the result and error from a step execution
1018-
type stepOutcome[R any] struct {
1018+
// This struct is returned as part of a channel from the Go function when running the step inside a Go routine
1019+
type StepOutcome[R any] struct {
10191020
result R
10201021
err error
10211022
}
@@ -1222,19 +1223,56 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption)
12221223
// Go runs a step inside a Go routine and returns a channel to receive the result.
12231224
// Go generates a deterministic step ID for the step before running the step in a routine, since routines are not deterministic.
12241225
// The step ID is used to track the steps within the same workflow and use the step ID to perform recovery.
1225-
// The folliwing examples shows how to use Go:
1226-
//
1227-
// resultChan, err := dbos.Go(ctx, func(ctx context.Context) (string, error) {
1228-
// return "Hello, World!", nil
1229-
// })
1230-
//
1231-
// resultChan := <-resultChan // wait for the channel to receive
1232-
// if resultChan.err != nil {
1233-
// // Handle error
1234-
// }
1235-
// result := resultChan.result
1236-
//
1237-
func Go[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (chan stepOutcome[R], error) {
1226+
// The folliwing examples shows how to use Go:
1227+
//
1228+
// resultChan, err := dbos.Go(ctx, func(ctx context.Context) (string, error) {
1229+
// return "Hello, World!", nil
1230+
// })
1231+
//
1232+
// resultChan := <-resultChan // wait for the channel to receive
1233+
// if resultChan.err != nil {
1234+
// // Handle error
1235+
// }
1236+
// result := resultChan.result
1237+
func Go[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (chan StepOutcome[R], error) {
1238+
if ctx == nil {
1239+
// is this the correct return here?
1240+
return *new(chan StepOutcome[R]), newStepExecutionError("", "", "ctx cannot be nil")
1241+
}
1242+
1243+
if fn == nil {
1244+
return *new(chan StepOutcome[R]), newStepExecutionError("", "", "step function cannot be nil")
1245+
}
1246+
1247+
// Append WithStepName option to ensure the step name is set. This will not erase a user-provided step name
1248+
stepName := runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
1249+
opts = append(opts, WithStepName(stepName))
1250+
1251+
// Type-erase the function
1252+
typeErasedFn := StepFunc(func(ctx context.Context) (any, error) { return fn(ctx) })
1253+
1254+
result, err := ctx.Go(ctx, typeErasedFn, opts...)
1255+
// Step function could return a nil result
1256+
if result == nil {
1257+
return *new(chan StepOutcome[R]), err
1258+
}
1259+
1260+
// Otherwise type-check and cast the result
1261+
outcome := <-result
1262+
typedResult, ok := outcome.result.(R)
1263+
if !ok {
1264+
return *new(chan StepOutcome[R]), fmt.Errorf("unexpected result type: expected %T, got %T", *new(R), result)
1265+
}
1266+
outcomeChan := make(chan StepOutcome[R], 1)
1267+
defer close(outcomeChan)
1268+
outcomeChan <- StepOutcome[R]{
1269+
result: typedResult,
1270+
err: outcome.err,
1271+
}
1272+
return outcomeChan, nil
1273+
}
1274+
1275+
func (c *dbosContext) Go(ctx DBOSContext, fn StepFunc, opts ...StepOption) (chan StepOutcome[any], error) {
12381276
// create a determistic step ID
12391277
stepName := runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
12401278
wfState, ok := ctx.Value(workflowStateKey).(*workflowState)
@@ -1244,11 +1282,11 @@ func Go[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (chan stepOutcom
12441282
stepID := wfState.nextStepID()
12451283
opts = append(opts, WithNextStepID(stepID))
12461284

1247-
// run step inside a Go routine by passing a stepID
1248-
result := make(chan stepOutcome[R], 1)
1285+
// run step inside a Go routine by passing a stepID
1286+
result := make(chan StepOutcome[any], 1)
12491287
go func() {
1250-
res, err := RunAsStep(ctx, fn, opts...)
1251-
result <- stepOutcome[R]{
1288+
res, err := ctx.RunAsStep(ctx, fn, opts...)
1289+
result <- StepOutcome[any]{
12521290
result: res,
12531291
err: err,
12541292
}

dbos/workflows_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -902,7 +902,7 @@ func TestGoRunningStepsInsideGoRoutines(t *testing.T) {
902902
const numSteps = 100
903903
results := make(chan string, numSteps)
904904
errors := make(chan error, numSteps)
905-
var resultChans []<-chan stepOutcome[string]
905+
var resultChans []<-chan StepOutcome[string]
906906

907907
goWorkflow := func(dbosCtx DBOSContext, input string) (string, error) {
908908
for range numSteps {

0 commit comments

Comments
 (0)