Skip to content

Commit ac7a1b1

Browse files
af-mdmaxdml
authored andcommitted
refactor: change Go function to return a channel of stepOutcome and simplify result handling
1 parent 723a6db commit ac7a1b1

File tree

1 file changed

+13
-45
lines changed

1 file changed

+13
-45
lines changed

dbos/workflow.go

Lines changed: 13 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1063,6 +1063,12 @@ func WithNextStepID(stepID int) StepOption {
10631063
}
10641064
}
10651065

1066+
// StepOutcome holds the result and error from a step execution
1067+
type stepOutcome[R any] struct {
1068+
result R
1069+
err error
1070+
}
1071+
10661072
// RunAsStep executes a function as a durable step within a workflow.
10671073
// Steps provide at-least-once execution guarantees and automatic retry capabilities.
10681074
// If a step has already been executed (e.g., during workflow recovery), its recorded
@@ -1263,65 +1269,27 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption)
12631269
}
12641270

12651271
// TODO: Add docs --- will add once I get the implementation right
1266-
func Go[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (R, error) {
1267-
if ctx == nil {
1268-
return *new(R), newStepExecutionError("", "", "ctx cannot be nil")
1269-
}
1270-
1271-
if fn == nil {
1272-
return *new(R), newStepExecutionError("", "", "step function cannot be nil")
1273-
}
1274-
1275-
// Append WithStepName option to ensure the step name is set. This will not erase a user-provided step name
1276-
stepName := runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
1277-
opts = append(opts, WithStepName(stepName))
1278-
1272+
func Go[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (chan stepOutcome[R], error) {
12791273
// create a determistic step ID
1274+
// can we refactor this too?
12801275
wfState, ok := ctx.Value(workflowStateKey).(*workflowState)
12811276
if !ok || wfState == nil {
1282-
return *new(R), newStepExecutionError("", stepName, "workflow state not found in context: are you running this step within a workflow?")
1277+
return nil, newStepExecutionError("", "", "workflow state not found in context: are you running this step within a workflow?")
12831278
}
12841279
stepID := wfState.nextStepID()
12851280
opts = append(opts, WithNextStepID(stepID))
12861281

1287-
// Type-erase the function
1288-
typeErasedFn := StepFunc(func(ctx context.Context) (any, error) { return fn(ctx) })
1289-
12901282
// run step inside a Go routine by passing stepID
1291-
result, err := ctx.Go(ctx, typeErasedFn, opts...)
1292-
1293-
// Step function could return a nil result
1294-
if result == nil {
1295-
return *new(R), err
1296-
}
1297-
// Otherwise type-check and cast the result
1298-
typedResult, ok := result.(R)
1299-
if !ok {
1300-
return *new(R), fmt.Errorf("unexpected result type: expected %T, got %T", *new(R), result)
1301-
}
1302-
return typedResult, err
1303-
}
1304-
1305-
// TODO: move type above -- keeping it here for in case I need to modify it quickly
1306-
type stepResultChan struct {
1307-
result any
1308-
err error
1309-
}
1310-
1311-
// TODO: Add docs --- will add once I get the implementation right
1312-
func (c *dbosContext) Go(ctx DBOSContext, fn StepFunc, opts ...StepOption) (any, error) {
1313-
result := make(chan stepResultChan, 1)
1283+
result := make(chan stepOutcome[R], 1)
13141284
go func() {
1315-
res, err := c.RunAsStep(ctx, fn, opts...)
1316-
result <- stepResultChan{
1285+
res, err := RunAsStep(ctx, fn, opts...)
1286+
result <- stepOutcome[R]{
13171287
result: res,
13181288
err: err,
13191289
}
13201290
}()
13211291

1322-
resultChan := <-result
1323-
close(result)
1324-
return resultChan.result, resultChan.err
1292+
return result, nil
13251293
}
13261294

13271295
/****************************************/

0 commit comments

Comments
 (0)