Skip to content

Commit 31315f4

Browse files
authored
Fix record operation result conflict handling (#178)
Do not ignore conflicts. When conflicts are detected (i.e., another process is attempting to record the outcome of the step after the first recording happened), the logic will detect it and return an conflict error. This helps the end user detect when unexpected concurrent executions take place. This PR also fixes the logic we use to detect concurrent executions when executing a workflow function. Specifically: - `newStepExecutionError` now wraps the underlying error (it was losing it). - Implement a custom `errors.Is` function to compare `DBOSError` instances based on their underlying code. - Replace the `errors.As` check by an `error.Is`. `As` looks up for the first instance of a matching error, whereas `Is` iterates through the error tree until its condition is satisfied. Note that the conflict detection is already exercised in the `TestWorkflowRecovery`, which recovers a workflow blocked in a step. Both original or recovery execution race to record the outcome of the step, and the loser falls back to polling the database for the workflow result.
1 parent c1f41a6 commit 31315f4

File tree

4 files changed

+47
-24
lines changed

4 files changed

+47
-24
lines changed

dbos/errors.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,16 @@ func (e *DBOSError) Unwrap() error {
5555
return e.wrappedErr
5656
}
5757

58+
// Implements https://pkg.go.dev/errors#Is
59+
func (e *DBOSError) Is(target error) bool {
60+
t, ok := target.(*DBOSError)
61+
if !ok {
62+
return false
63+
}
64+
// Match if codes are equal (and target code is set)
65+
return t.Code != 0 && e.Code == t.Code
66+
}
67+
5868
func newConflictingWorkflowError(workflowID, message string) *DBOSError {
5969
msg := fmt.Sprintf("Conflicting workflow invocation with the same ID (%s)", workflowID)
6070
if message != "" {
@@ -147,12 +157,13 @@ func newWorkflowExecutionError(workflowID string, err error) *DBOSError {
147157
}
148158
}
149159

150-
func newStepExecutionError(workflowID, stepName, message string) *DBOSError {
160+
func newStepExecutionError(workflowID, stepName string, err error) *DBOSError {
151161
return &DBOSError{
152-
Message: fmt.Sprintf("Step %s in workflow %s execution error: %s", stepName, workflowID, message),
162+
Message: fmt.Sprintf("Step %s in workflow %s execution error: %v", stepName, workflowID, err),
153163
Code: StepExecutionError,
154164
WorkflowID: workflowID,
155165
StepName: stepName,
166+
wrappedErr: err,
156167
}
157168
}
158169

dbos/system_database.go

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1211,8 +1211,7 @@ type recordOperationResultDBInput struct {
12111211
func (s *sysDB) recordOperationResult(ctx context.Context, input recordOperationResultDBInput) error {
12121212
query := fmt.Sprintf(`INSERT INTO %s.operation_outputs
12131213
(workflow_uuid, function_id, output, error, function_name)
1214-
VALUES ($1, $2, $3, $4, $5)
1215-
ON CONFLICT DO NOTHING`, pgx.Identifier{s.schema}.Sanitize())
1214+
VALUES ($1, $2, $3, $4, $5)`, pgx.Identifier{s.schema}.Sanitize())
12161215

12171216
var errorString *string
12181217
if input.err != nil {
@@ -1532,11 +1531,11 @@ func (s *sysDB) sleep(ctx context.Context, input sleepInput) (time.Duration, err
15321531
// Get workflow state from context
15331532
wfState, ok := ctx.Value(workflowStateKey).(*workflowState)
15341533
if !ok || wfState == nil {
1535-
return 0, newStepExecutionError("", functionName, "workflow state not found in context: are you running this step within a workflow?")
1534+
return 0, newStepExecutionError("", functionName, fmt.Errorf("workflow state not found in context: are you running this step within a workflow?"))
15361535
}
15371536

15381537
if wfState.isWithinStep {
1539-
return 0, newStepExecutionError(wfState.workflowID, functionName, "cannot call Sleep within a step")
1538+
return 0, newStepExecutionError(wfState.workflowID, functionName, fmt.Errorf("cannot call Sleep within a step"))
15401539
}
15411540

15421541
// Determine step ID
@@ -1749,7 +1748,7 @@ func (s *sysDB) send(ctx context.Context, input WorkflowSendInput) error {
17491748
if ok && wfState != nil {
17501749
isInWorkflow = true
17511750
if wfState.isWithinStep {
1752-
return newStepExecutionError(wfState.workflowID, functionName, "cannot call Send within a step")
1751+
return newStepExecutionError(wfState.workflowID, functionName, fmt.Errorf("cannot call Send within a step"))
17531752
}
17541753
stepID = wfState.nextStepID()
17551754
}
@@ -1832,11 +1831,11 @@ func (s *sysDB) recv(ctx context.Context, input recvInput) (any, error) {
18321831
// Get workflow state from context
18331832
wfState, ok := ctx.Value(workflowStateKey).(*workflowState)
18341833
if !ok || wfState == nil {
1835-
return nil, newStepExecutionError("", functionName, "workflow state not found in context: are you running this step within a workflow?")
1834+
return nil, newStepExecutionError("", functionName, fmt.Errorf("workflow state not found in context: are you running this step within a workflow?"))
18361835
}
18371836

18381837
if wfState.isWithinStep {
1839-
return nil, newStepExecutionError(wfState.workflowID, functionName, "cannot call Recv within a step")
1838+
return nil, newStepExecutionError(wfState.workflowID, functionName, fmt.Errorf("cannot call Recv within a step"))
18401839
}
18411840

18421841
stepID := wfState.nextStepID()
@@ -1988,11 +1987,11 @@ func (s *sysDB) setEvent(ctx context.Context, input WorkflowSetEventInput) error
19881987
// Get workflow state from context
19891988
wfState, ok := ctx.Value(workflowStateKey).(*workflowState)
19901989
if !ok || wfState == nil {
1991-
return newStepExecutionError("", functionName, "workflow state not found in context: are you running this step within a workflow?")
1990+
return newStepExecutionError("", functionName, fmt.Errorf("workflow state not found in context: are you running this step within a workflow?"))
19921991
}
19931992

19941993
if wfState.isWithinStep {
1995-
return newStepExecutionError(wfState.workflowID, functionName, "cannot call SetEvent within a step")
1994+
return newStepExecutionError(wfState.workflowID, functionName, fmt.Errorf("cannot call SetEvent within a step"))
19961995
}
19971996

19981997
stepID := wfState.nextStepID()
@@ -2071,7 +2070,7 @@ func (s *sysDB) getEvent(ctx context.Context, input getEventInput) (any, error)
20712070
if ok && wfState != nil {
20722071
isInWorkflow = true
20732072
if wfState.isWithinStep {
2074-
return nil, newStepExecutionError(wfState.workflowID, functionName, "cannot call GetEvent within a step")
2073+
return nil, newStepExecutionError(wfState.workflowID, functionName, fmt.Errorf("cannot call GetEvent within a step"))
20752074
}
20762075
stepID = wfState.nextStepID()
20772076
sleepStepID = wfState.nextStepID() // We will use a sleep step to implement the timeout

dbos/workflow.go

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -742,7 +742,7 @@ func (c *dbosContext) RunWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opt
742742

743743
// Prevent spawning child workflows from within a step
744744
if isChildWorkflow && parentWorkflowState.isWithinStep {
745-
return nil, newStepExecutionError(parentWorkflowState.workflowID, params.workflowName, "cannot spawn child workflow from within a step")
745+
return nil, newStepExecutionError(parentWorkflowState.workflowID, params.workflowName, fmt.Errorf("cannot spawn child workflow from within a step"))
746746
}
747747

748748
if isChildWorkflow {
@@ -935,8 +935,7 @@ func (c *dbosContext) RunWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opt
935935
result, err = fn(workflowCtx, input)
936936

937937
// Handle DBOS ID conflict errors by waiting workflow result
938-
var dbosErr *DBOSError
939-
if errors.As(err, &dbosErr) && dbosErr.Code == ConflictingIDError {
938+
if errors.Is(err, &DBOSError{Code: ConflictingIDError}) {
940939
c.logger.Warn("Workflow ID conflict detected. Waiting for existing workflow to complete", "workflow_id", workflowID)
941940
result, err = retryWithResult(c, func() (any, error) {
942941
return c.systemDB.awaitWorkflowResult(uncancellableCtx, workflowID)
@@ -1098,11 +1097,11 @@ func WithMaxInterval(interval time.Duration) StepOption {
10981097
// Under the hood, DBOS uses the provided context to manage durable execution.
10991098
func RunAsStep[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (R, error) {
11001099
if ctx == nil {
1101-
return *new(R), newStepExecutionError("", "", "ctx cannot be nil")
1100+
return *new(R), newStepExecutionError("", "", fmt.Errorf("ctx cannot be nil"))
11021101
}
11031102

11041103
if fn == nil {
1105-
return *new(R), newStepExecutionError("", "", "step function cannot be nil")
1104+
return *new(R), newStepExecutionError("", "", fmt.Errorf("step function cannot be nil"))
11061105
}
11071106

11081107
// Register the output type for gob encoding
@@ -1144,12 +1143,12 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption)
11441143
// Get workflow state from context
11451144
wfState, ok := c.Value(workflowStateKey).(*workflowState)
11461145
if !ok || wfState == nil {
1147-
return nil, newStepExecutionError("", stepOpts.stepName, "workflow state not found in context: are you running this step within a workflow?")
1146+
return nil, newStepExecutionError("", stepOpts.stepName, fmt.Errorf("workflow state not found in context: are you running this step within a workflow?"))
11481147
}
11491148

11501149
// This should not happen when called from the package-level RunAsStep
11511150
if fn == nil {
1152-
return nil, newStepExecutionError(wfState.workflowID, stepOpts.stepName, "step function cannot be nil")
1151+
return nil, newStepExecutionError(wfState.workflowID, stepOpts.stepName, fmt.Errorf("step function cannot be nil"))
11531152
}
11541153

11551154
// If within a step, just run the function directly
@@ -1176,7 +1175,7 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption)
11761175
})
11771176
}, withRetrierLogger(c.logger))
11781177
if err != nil {
1179-
return nil, newStepExecutionError(stepState.workflowID, stepOpts.stepName, fmt.Sprintf("checking operation execution: %v", err))
1178+
return nil, newStepExecutionError(stepState.workflowID, stepOpts.stepName, fmt.Errorf("checking operation execution: %w", err))
11801179
}
11811180
if recordedOutput != nil {
11821181
return recordedOutput.output, recordedOutput.err
@@ -1205,7 +1204,7 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption)
12051204
// Wait before retry
12061205
select {
12071206
case <-c.Done():
1208-
return nil, newStepExecutionError(stepState.workflowID, stepOpts.stepName, fmt.Sprintf("context cancelled during retry: %v", c.Err()))
1207+
return nil, newStepExecutionError(stepState.workflowID, stepOpts.stepName, fmt.Errorf("context cancelled during retry: %w", c.Err()))
12091208
case <-time.After(delay):
12101209
// Continue to retry
12111210
}
@@ -1241,7 +1240,7 @@ func (c *dbosContext) RunAsStep(_ DBOSContext, fn StepFunc, opts ...StepOption)
12411240
return c.systemDB.recordOperationResult(uncancellableCtx, dbInput)
12421241
}, withRetrierLogger(c.logger))
12431242
if recErr != nil {
1244-
return nil, newStepExecutionError(stepState.workflowID, stepOpts.stepName, fmt.Sprintf("recording step outcome: %v", recErr))
1243+
return nil, newStepExecutionError(stepState.workflowID, stepOpts.stepName, recErr)
12451244
}
12461245

12471246
return stepOutput, stepError

dbos/workflows_test.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1285,6 +1285,7 @@ func TestWorkflowRecovery(t *testing.T) {
12851285
recoveryCounters []int64
12861286
recoveryEvents []*Event
12871287
blockingEvents []*Event
1288+
secondStepErrors []error
12881289
)
12891290

12901291
recoveryWorkflow := func(dbosCtx DBOSContext, index int) (int64, error) {
@@ -1306,6 +1307,7 @@ func TestWorkflowRecovery(t *testing.T) {
13061307
return fmt.Sprintf("completed-%d", index), nil
13071308
}, WithStepName(fmt.Sprintf("BlockingStep-%d", index)))
13081309
if err != nil {
1310+
secondStepErrors = append(secondStepErrors, err)
13091311
return 0, err
13101312
}
13111313

@@ -1324,6 +1326,7 @@ func TestWorkflowRecovery(t *testing.T) {
13241326
recoveryCounters = make([]int64, numWorkflows)
13251327
recoveryEvents = make([]*Event, numWorkflows)
13261328
blockingEvents = make([]*Event, numWorkflows)
1329+
secondStepErrors = make([]error, 0)
13271330

13281331
// Create events for each workflow
13291332
for i := range numWorkflows {
@@ -1425,6 +1428,16 @@ func TestWorkflowRecovery(t *testing.T) {
14251428
assert.Nil(t, steps[0].Error, "workflow %d first step should not have error", i)
14261429
assert.Nil(t, steps[1].Error, "workflow %d second step should not have error", i)
14271430
}
1431+
1432+
// At least 5 of the 2nd steps should have errored due to execution race
1433+
// Check they are DBOSErrors with StepExecutionError wrapping a ConflictingIDError
1434+
require.GreaterOrEqual(t, len(secondStepErrors), 5, "expected at least 5 errors from second steps due to recovery race, got %d", len(secondStepErrors))
1435+
for _, err := range secondStepErrors {
1436+
dbosErr, ok := err.(*DBOSError)
1437+
require.True(t, ok, "expected error to be of type *DBOSError, got %T", err)
1438+
require.Equal(t, StepExecutionError, dbosErr.Code, "expected error code to be StepExecutionError, got %v", dbosErr.Code)
1439+
require.True(t, errors.Is(dbosErr.Unwrap(), &DBOSError{Code: ConflictingIDError}), "expected underlying error to be ConflictingIDError, got %T", dbosErr.Unwrap())
1440+
}
14281441
})
14291442
}
14301443

@@ -4149,9 +4162,10 @@ func TestGarbageCollect(t *testing.T) {
41494162
found = true
41504163
require.Equal(t, WorkflowStatusPending, wf.Status, "blocked workflow should still be pending")
41514164
}
4152-
if wf.Status == WorkflowStatusPending {
4165+
switch wf.Status {
4166+
case WorkflowStatusPending:
41534167
pendingCount++
4154-
} else if wf.Status == WorkflowStatusSuccess {
4168+
case WorkflowStatusSuccess:
41554169
completedCount++
41564170
}
41574171
}

0 commit comments

Comments
 (0)