Skip to content

Commit c6f4f1d

Browse files
af-mdmaxdml
authored andcommitted
feat: add Go function to DBOSContext for executing steps in Go routines with result channel
1 parent 7d0a4a1 commit c6f4f1d

File tree

3 files changed

+58
-19
lines changed

3 files changed

+58
-19
lines changed

dbos/dbos.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ type DBOSContext interface {
111111
// Workflow operations
112112
RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) (any, error) // Execute a function as a durable step within a workflow
113113
RunWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opts ...WorkflowOption) (WorkflowHandle[any], error) // Start a new workflow execution
114+
Go(_ DBOSContext, fn StepFunc, opts ...StepOption) (chan StepOutcome[any], error) // Starts a step inside a Go routine and returns a channel to receive the result
114115
Send(_ DBOSContext, destinationID string, message any, topic string) error // Send a message to another workflow
115116
Recv(_ DBOSContext, topic string, timeout time.Duration) (any, error) // Receive a message sent to this workflow
116117
SetEvent(_ DBOSContext, key string, message any) error // Set a key-value event for this workflow

dbos/workflow.go

Lines changed: 56 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1064,7 +1064,8 @@ func WithNextStepID(stepID int) StepOption {
10641064
}
10651065

10661066
// StepOutcome holds the result and error from a step execution
1067-
type stepOutcome[R any] struct {
1067+
// This struct is returned as part of a channel from the Go function when running the step inside a Go routine
1068+
type StepOutcome[R any] struct {
10681069
result R
10691070
err error
10701071
}
@@ -1271,19 +1272,56 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption)
12711272
// Go runs a step inside a Go routine and returns a channel to receive the result.
12721273
// Go generates a deterministic step ID for the step before running the step in a routine, since routines are not deterministic.
12731274
// The step ID is used to track the steps within the same workflow and use the step ID to perform recovery.
1274-
// The folliwing examples shows how to use Go:
1275-
//
1276-
// resultChan, err := dbos.Go(ctx, func(ctx context.Context) (string, error) {
1277-
// return "Hello, World!", nil
1278-
// })
1279-
//
1280-
// resultChan := <-resultChan // wait for the channel to receive
1281-
// if resultChan.err != nil {
1282-
// // Handle error
1283-
// }
1284-
// result := resultChan.result
1285-
//
1286-
func Go[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (chan stepOutcome[R], error) {
1275+
// The folliwing examples shows how to use Go:
1276+
//
1277+
// resultChan, err := dbos.Go(ctx, func(ctx context.Context) (string, error) {
1278+
// return "Hello, World!", nil
1279+
// })
1280+
//
1281+
// resultChan := <-resultChan // wait for the channel to receive
1282+
// if resultChan.err != nil {
1283+
// // Handle error
1284+
// }
1285+
// result := resultChan.result
1286+
func Go[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (chan StepOutcome[R], error) {
1287+
if ctx == nil {
1288+
// is this the correct return here?
1289+
return *new(chan StepOutcome[R]), newStepExecutionError("", "", "ctx cannot be nil")
1290+
}
1291+
1292+
if fn == nil {
1293+
return *new(chan StepOutcome[R]), newStepExecutionError("", "", "step function cannot be nil")
1294+
}
1295+
1296+
// Append WithStepName option to ensure the step name is set. This will not erase a user-provided step name
1297+
stepName := runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
1298+
opts = append(opts, WithStepName(stepName))
1299+
1300+
// Type-erase the function
1301+
typeErasedFn := StepFunc(func(ctx context.Context) (any, error) { return fn(ctx) })
1302+
1303+
result, err := ctx.Go(ctx, typeErasedFn, opts...)
1304+
// Step function could return a nil result
1305+
if result == nil {
1306+
return *new(chan StepOutcome[R]), err
1307+
}
1308+
1309+
// Otherwise type-check and cast the result
1310+
outcome := <-result
1311+
typedResult, ok := outcome.result.(R)
1312+
if !ok {
1313+
return *new(chan StepOutcome[R]), fmt.Errorf("unexpected result type: expected %T, got %T", *new(R), result)
1314+
}
1315+
outcomeChan := make(chan StepOutcome[R], 1)
1316+
defer close(outcomeChan)
1317+
outcomeChan <- StepOutcome[R]{
1318+
result: typedResult,
1319+
err: outcome.err,
1320+
}
1321+
return outcomeChan, nil
1322+
}
1323+
1324+
func (c *dbosContext) Go(ctx DBOSContext, fn StepFunc, opts ...StepOption) (chan StepOutcome[any], error) {
12871325
// create a determistic step ID
12881326
stepName := runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
12891327
wfState, ok := ctx.Value(workflowStateKey).(*workflowState)
@@ -1293,11 +1331,11 @@ func Go[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (chan stepOutcom
12931331
stepID := wfState.nextStepID()
12941332
opts = append(opts, WithNextStepID(stepID))
12951333

1296-
// run step inside a Go routine by passing a stepID
1297-
result := make(chan stepOutcome[R], 1)
1334+
// run step inside a Go routine by passing a stepID
1335+
result := make(chan StepOutcome[any], 1)
12981336
go func() {
1299-
res, err := RunAsStep(ctx, fn, opts...)
1300-
result <- stepOutcome[R]{
1337+
res, err := ctx.RunAsStep(ctx, fn, opts...)
1338+
result <- StepOutcome[any]{
13011339
result: res,
13021340
err: err,
13031341
}

dbos/workflows_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -907,7 +907,7 @@ func TestGoRunningStepsInsideGoRoutines(t *testing.T) {
907907
const numSteps = 100
908908
results := make(chan string, numSteps)
909909
errors := make(chan error, numSteps)
910-
var resultChans []<-chan stepOutcome[string]
910+
var resultChans []<-chan StepOutcome[string]
911911

912912
goWorkflow := func(dbosCtx DBOSContext, input string) (string, error) {
913913
for range numSteps {

0 commit comments

Comments
 (0)