Skip to content

Commit 6d3a84a

Browse files
af-mdmaxdml
authored andcommitted
assign stepID
1 parent ba8d253 commit 6d3a84a

File tree

1 file changed

+28
-25
lines changed

1 file changed

+28
-25
lines changed

dbos/workflow.go

Lines changed: 28 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -987,18 +987,18 @@ type StepFunc func(ctx context.Context) (any, error)
987987
// Step represents a type-safe step function with a specific output type R.
988988
type Step[R any] func(ctx context.Context) (R, error)
989989

990-
// stepOptions holds the configuration for step execution using functional options pattern.
991-
type stepOptions struct {
992-
maxRetries int // Maximum number of retry attempts (0 = no retries)
993-
backoffFactor float64 // Exponential backoff multiplier between retries (default: 2.0)
994-
baseInterval time.Duration // Initial delay between retries (default: 100ms)
995-
maxInterval time.Duration // Maximum delay between retries (default: 5s)
996-
stepName string // Custom name for the step (defaults to function name)
997-
stepID int
990+
// StepOptions holds the configuration for step execution using functional options pattern.
991+
type StepOptions struct {
992+
maxRetries int // Maximum number of retry attempts (0 = no retries)
993+
backoffFactor float64 // Exponential backoff multiplier between retries (default: 2.0)
994+
baseInterval time.Duration // Initial delay between retries (default: 100ms)
995+
maxInterval time.Duration // Maximum delay between retries (default: 5s)
996+
stepName string // Custom name for the step (defaults to function name)
997+
preGeneratedStepID *int // Pre generated stepID in case we want to run the function in a Go routine
998998
}
999999

10001000
// setDefaults applies default values to stepOptions
1001-
func (opts *stepOptions) setDefaults() {
1001+
func (opts *StepOptions) setDefaults() {
10021002
if opts.backoffFactor == 0 {
10031003
opts.backoffFactor = _DEFAULT_STEP_BACKOFF_FACTOR
10041004
}
@@ -1011,12 +1011,12 @@ func (opts *stepOptions) setDefaults() {
10111011
}
10121012

10131013
// StepOption is a functional option for configuring step execution parameters.
1014-
type StepOption func(*stepOptions)
1014+
type StepOption func(*StepOptions)
10151015

10161016
// WithStepName sets a custom name for the step. If the step name has already been set
10171017
// by a previous call to WithStepName, this option will be ignored
10181018
func WithStepName(name string) StepOption {
1019-
return func(opts *stepOptions) {
1019+
return func(opts *StepOptions) {
10201020
if opts.stepName == "" {
10211021
opts.stepName = name
10221022
}
@@ -1026,7 +1026,7 @@ func WithStepName(name string) StepOption {
10261026
// WithStepMaxRetries sets the maximum number of retry attempts for the step.
10271027
// A value of 0 means no retries (default behavior).
10281028
func WithStepMaxRetries(maxRetries int) StepOption {
1029-
return func(opts *stepOptions) {
1029+
return func(opts *StepOptions) {
10301030
opts.maxRetries = maxRetries
10311031
}
10321032
}
@@ -1035,31 +1035,31 @@ func WithStepMaxRetries(maxRetries int) StepOption {
10351035
// The delay between retries is calculated as: BaseInterval * (BackoffFactor^(retry-1))
10361036
// Default value is 2.0.
10371037
func WithBackoffFactor(factor float64) StepOption {
1038-
return func(opts *stepOptions) {
1038+
return func(opts *StepOptions) {
10391039
opts.backoffFactor = factor
10401040
}
10411041
}
10421042

10431043
// WithBaseInterval sets the initial delay between retries.
10441044
// Default value is 100ms.
10451045
func WithBaseInterval(interval time.Duration) StepOption {
1046-
return func(opts *stepOptions) {
1046+
return func(opts *StepOptions) {
10471047
opts.baseInterval = interval
10481048
}
10491049
}
10501050

10511051
// WithMaxInterval sets the maximum delay between retries.
10521052
// Default value is 5s.
10531053
func WithMaxInterval(interval time.Duration) StepOption {
1054-
return func(opts *stepOptions) {
1054+
return func(opts *StepOptions) {
10551055
opts.maxInterval = interval
10561056
}
10571057
}
10581058

10591059

10601060
func WithNextStepID(stepID int) StepOption {
1061-
return func(opts *stepOptions) {
1062-
opts.stepID = stepID
1061+
return func(opts *StepOptions) {
1062+
opts.preGeneratedStepID = &stepID
10631063
}
10641064
}
10651065

@@ -1142,7 +1142,7 @@ func RunAsStep[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (R, error
11421142

11431143
func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption) (any, error) {
11441144
// Process functional options
1145-
stepOpts := &stepOptions{}
1145+
stepOpts := &StepOptions{}
11461146
for _, opt := range opts {
11471147
opt(stepOpts)
11481148
}
@@ -1164,18 +1164,21 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption)
11641164
return fn(c)
11651165
}
11661166

1167+
// Get stepID if it has been pre generated
1168+
var stepID int
1169+
if stepOpts.preGeneratedStepID != nil {
1170+
stepID = *stepOpts.preGeneratedStepID
1171+
} else {
1172+
stepID = wfState.nextStepID() // crucially, this increments the step ID on the *workflow* state
1173+
}
1174+
11671175
// Setup step state
11681176
stepState := workflowState{
11691177
workflowID: wfState.workflowID,
1170-
stepID: wfState.nextStepID(), // crucially, this increments the step ID on the *workflow* state
1178+
stepID: stepID,
11711179
isWithinStep: true,
11721180
}
11731181

1174-
// this logic needs to be looked at
1175-
if stepOpts.stepID >= 0 {
1176-
stepState.stepID = stepOpts.stepID
1177-
}
1178-
11791182
// Uncancellable context for DBOS operations
11801183
uncancellableCtx := WithoutCancel(c)
11811184

@@ -1278,7 +1281,7 @@ func Go[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (R, error) {
12781281
if !ok || wfState == nil {
12791282
return *new(R), newStepExecutionError("", stepName, "workflow state not found in context: are you running this step within a workflow?")
12801283
}
1281-
stepID := wfState.NextStepID()
1284+
stepID := wfState.nextStepID()
12821285
opts = append(opts, WithNextStepID(stepID))
12831286

12841287
// Type-erase the function

0 commit comments

Comments
 (0)