Skip to content

Commit e79823b

Browse files
authored
Handle DBOS operations as workflow steps (#120)
Call DBOS operations inside `RunAsStep` if within a workflow. One thing this PR had to fix is the forked workflow ID generation, which was at a higher level and not idempotent. The systemDB forkWorkflow now returns the forked workflow ID.
1 parent 7741ebe commit e79823b

File tree

6 files changed

+372
-85
lines changed

6 files changed

+372
-85
lines changed

dbos/dbos.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,10 @@ func NewDBOSContext(ctx context.Context, inputConfig Config) (DBOSContext, error
318318
// Register types we serialize with gob
319319
var t time.Time
320320
gob.Register(t)
321+
var ws []WorkflowStatus
322+
gob.Register(ws)
323+
var si []StepInfo
324+
gob.Register(si)
321325

322326
// Initialize global variables from processed config (already handles env vars and defaults)
323327
initExecutor.applicationVersion = config.ApplicationVersion

dbos/queues_test.go

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -926,23 +926,17 @@ func TestQueueTimeouts(t *testing.T) {
926926
}
927927
RegisterWorkflow(dbosCtx, queuedWaitForCancelWorkflow)
928928

929-
enqueuedWorkflowEnqueuesATimeoutWorkflow := func(ctx DBOSContext, _ string) (string, error) {
929+
enqueuedWorkflowEnqueuesATimeoutWorkflow := func(ctx DBOSContext, childWorkflowID string) (string, error) {
930930
// This workflow will enqueue a workflow that waits indefinitely until it is cancelled
931-
handle, err := RunWorkflow(ctx, queuedWaitForCancelWorkflow, "enqueued-wait-for-cancel", WithQueue(timeoutQueue.Name))
931+
handle, err := RunWorkflow(ctx, queuedWaitForCancelWorkflow, "enqueued-wait-for-cancel", WithQueue(timeoutQueue.Name), WithWorkflowID(childWorkflowID))
932932
require.NoError(t, err, "failed to start enqueued wait for cancel workflow")
933933
// Workflow should get AwaitedWorkflowCancelled DBOSError
934934
_, err = handle.GetResult()
935935
require.Error(t, err, "expected error when waiting for enqueued workflow to complete, but got none")
936936
var dbosErr *DBOSError
937937
require.ErrorAs(t, err, &dbosErr, "expected error to be of type *DBOSError, got %T", err)
938938
assert.Equal(t, AwaitedWorkflowCancelled, dbosErr.Code, "expected error code to be AwaitedWorkflowCancelled")
939-
940-
// enqueud workflow should have been cancelled
941-
status, err := handle.GetStatus()
942-
require.NoError(t, err, "failed to get status of enqueued workflow")
943-
assert.Equal(t, WorkflowStatusCancelled, status.Status, "expected enqueued workflow status to be WorkflowStatusCancelled")
944-
945-
return "should-never-see-this", nil
939+
return "", nil
946940
}
947941
RegisterWorkflow(dbosCtx, enqueuedWorkflowEnqueuesATimeoutWorkflow)
948942

@@ -956,19 +950,20 @@ func TestQueueTimeouts(t *testing.T) {
956950
}
957951

958952
enqueuedWorkflowEnqueuesADetachedWorkflow := func(ctx DBOSContext, timeout time.Duration) (string, error) {
953+
myId, err := GetWorkflowID(ctx)
954+
if err != nil {
955+
return "", fmt.Errorf("failed to get workflow ID: %v", err)
956+
}
957+
childID := fmt.Sprintf("%s-child", myId)
959958
// This workflow will enqueue a workflow that is not cancelable
960959
childCtx := WithoutCancel(ctx)
961-
handle, err := RunWorkflow(childCtx, detachedWorkflow, timeout*2, WithQueue(timeoutQueue.Name))
960+
handle, err := RunWorkflow(childCtx, detachedWorkflow, timeout*2, WithQueue(timeoutQueue.Name), WithWorkflowID(childID))
962961
require.NoError(t, err, "failed to start enqueued detached workflow")
963962

964963
// Wait for the enqueued workflow to complete
965964
result, err := handle.GetResult()
966965
require.NoError(t, err, "failed to get result from enqueued detached workflow")
967966
assert.Equal(t, "detached-workflow-completed", result, "expected result to be 'detached-workflow-completed'")
968-
// Check the workflow status: should be success
969-
status, err := handle.GetStatus()
970-
require.NoError(t, err, "failed to get enqueued detached workflow status")
971-
assert.Equal(t, WorkflowStatusSuccess, status.Status, "expected enqueued detached workflow status to be WorkflowStatusSuccess")
972967
return result, nil
973968
}
974969

@@ -1022,7 +1017,8 @@ func TestQueueTimeouts(t *testing.T) {
10221017
cancelCtx, cancelFunc := WithTimeout(dbosCtx, 1*time.Millisecond)
10231018
defer cancelFunc() // Ensure we clean up the context
10241019

1025-
handle, err := RunWorkflow(cancelCtx, enqueuedWorkflowEnqueuesATimeoutWorkflow, "enqueue-timeout-workflow", WithQueue(timeoutQueue.Name))
1020+
childWorkflowID := uuid.NewString()
1021+
handle, err := RunWorkflow(cancelCtx, enqueuedWorkflowEnqueuesATimeoutWorkflow, childWorkflowID, WithQueue(timeoutQueue.Name))
10261022
require.NoError(t, err, "failed to start enqueued workflow")
10271023

10281024
// Wait for the workflow to complete and get the result
@@ -1042,6 +1038,18 @@ func TestQueueTimeouts(t *testing.T) {
10421038
require.NoError(t, err, "failed to get workflow status")
10431039
assert.Equal(t, WorkflowStatusCancelled, status.Status, "expected workflow status to be WorkflowStatusCancelled")
10441040

1041+
// Wait for the child workflow status to become cancelled
1042+
require.Eventually(t, func() bool {
1043+
childHandle, err := RetrieveWorkflow[string](dbosCtx, childWorkflowID)
1044+
require.NoError(t, err, "failed to retrieve child workflow")
1045+
1046+
status, err := childHandle.GetStatus()
1047+
if err != nil {
1048+
return false
1049+
}
1050+
return status.Status == WorkflowStatusCancelled
1051+
}, 5*time.Second, 100*time.Millisecond, "expected enqueued workflow status to be WorkflowStatusCancelled")
1052+
10451053
require.True(t, queueEntriesAreCleanedUp(dbosCtx), "expected queue entries to be cleaned up after workflow cancellation, but they are not")
10461054
})
10471055

@@ -1071,6 +1079,19 @@ func TestQueueTimeouts(t *testing.T) {
10711079
require.NoError(t, err, "failed to get enqueued detached workflow status")
10721080
assert.Equal(t, WorkflowStatusCancelled, status.Status, "expected enqueued detached workflow status to be WorkflowStatusCancelled")
10731081

1082+
// Check the child's status: should be success because it is detached
1083+
require.Eventually(t, func() bool {
1084+
childID := fmt.Sprintf("%s-child", handle.GetWorkflowID())
1085+
childHandle, err := RetrieveWorkflow[string](dbosCtx, childID)
1086+
require.NoError(t, err, "failed to retrieve detached workflow")
1087+
1088+
status, err := childHandle.GetStatus()
1089+
if err != nil {
1090+
return false
1091+
}
1092+
return status.Status == WorkflowStatusSuccess
1093+
}, 5*time.Second, 100*time.Millisecond, "expected detached workflow status to be WorkflowStatusSuccess")
1094+
10741095
require.True(t, queueEntriesAreCleanedUp(dbosCtx), "expected queue entries to be cleaned up after workflow cancellation, but they are not")
10751096
})
10761097

dbos/serialization_test.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -96,9 +96,12 @@ func TestWorkflowEncoding(t *testing.T) {
9696
assert.Equal(t, "workflow error: step error", err.Error())
9797

9898
// Test results from ListWorkflows
99-
workflows, err := ListWorkflows(executor, WithWorkflowIDs(
100-
[]string{directHandle.GetWorkflowID()},
101-
))
99+
workflows, err := ListWorkflows(
100+
executor,
101+
WithWorkflowIDs([]string{directHandle.GetWorkflowID()}),
102+
WithLoadInput(true),
103+
WithLoadOutput(true),
104+
)
102105
require.NoError(t, err)
103106
require.Len(t, workflows, 1)
104107
workflow := workflows[0]
@@ -155,9 +158,12 @@ func TestWorkflowEncoding(t *testing.T) {
155158
assert.Equal(t, "processed by encodingStepStruct", retrievedResult.B)
156159

157160
// Test results from ListWorkflows
158-
workflows, err := ListWorkflows(executor, WithWorkflowIDs(
159-
[]string{directHandle.GetWorkflowID()},
160-
))
161+
workflows, err := ListWorkflows(executor,
162+
WithWorkflowIDs([]string{directHandle.GetWorkflowID()}),
163+
WithLoadInput(true),
164+
WithLoadOutput(true),
165+
)
166+
require.Len(t, workflows, 1)
161167
require.NoError(t, err)
162168
workflow := workflows[0]
163169
require.NotNil(t, workflow.Input)

dbos/system_database.go

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"sync"
1313
"time"
1414

15+
"github.com/google/uuid"
1516
"github.com/jackc/pgx/v5"
1617
"github.com/jackc/pgx/v5/pgconn"
1718
"github.com/jackc/pgx/v5/pgxpool"
@@ -35,7 +36,7 @@ type systemDatabase interface {
3536
cancelWorkflow(ctx context.Context, workflowID string) error
3637
cancelAllBefore(ctx context.Context, cutoffTime time.Time) error
3738
resumeWorkflow(ctx context.Context, workflowID string) error
38-
forkWorkflow(ctx context.Context, input forkWorkflowDBInput) error
39+
forkWorkflow(ctx context.Context, input forkWorkflowDBInput) (string, error)
3940

4041
// Child workflows
4142
recordChildWorkflow(ctx context.Context, input recordChildWorkflowDBInput) error
@@ -657,7 +658,7 @@ func (s *sysDB) listWorkflows(ctx context.Context, input listWorkflowsDBInput) (
657658
var executorID *string
658659

659660
// Build scan arguments dynamically based on loaded columns
660-
scanArgs := []interface{}{
661+
scanArgs := []any{
661662
&wf.ID, &wf.Status, &wf.Name, &wf.AuthenticatedUser, &wf.AssumedRole,
662663
&wf.AuthenticatedRoles, &executorID, &createdAtMs,
663664
&updatedAtMs, &applicationVersion, &wf.ApplicationID,
@@ -849,7 +850,6 @@ func (s *sysDB) cancelAllBefore(ctx context.Context, cutoffTime time.Time) error
849850
// If desired we could funnel the errors back the caller (conductor, admin server)
850851
}
851852
}
852-
853853
return nil
854854
}
855855

@@ -977,15 +977,21 @@ type forkWorkflowDBInput struct {
977977
applicationVersion string
978978
}
979979

980-
func (s *sysDB) forkWorkflow(ctx context.Context, input forkWorkflowDBInput) error {
980+
func (s *sysDB) forkWorkflow(ctx context.Context, input forkWorkflowDBInput) (string, error) {
981+
// Generate new workflow ID if not provided
982+
forkedWorkflowID := input.forkedWorkflowID
983+
if forkedWorkflowID == "" {
984+
forkedWorkflowID = uuid.New().String()
985+
}
986+
981987
// Validate startStep
982988
if input.startStep < 0 {
983-
return fmt.Errorf("startStep must be >= 0, got %d", input.startStep)
989+
return "", fmt.Errorf("startStep must be >= 0, got %d", input.startStep)
984990
}
985991

986992
tx, err := s.pool.Begin(ctx)
987993
if err != nil {
988-
return fmt.Errorf("failed to begin transaction: %w", err)
994+
return "", fmt.Errorf("failed to begin transaction: %w", err)
989995
}
990996
defer tx.Rollback(ctx)
991997

@@ -997,10 +1003,10 @@ func (s *sysDB) forkWorkflow(ctx context.Context, input forkWorkflowDBInput) err
9971003
}
9981004
wfs, err := s.listWorkflows(ctx, listInput)
9991005
if err != nil {
1000-
return fmt.Errorf("failed to list workflows: %w", err)
1006+
return "", fmt.Errorf("failed to list workflows: %w", err)
10011007
}
10021008
if len(wfs) == 0 {
1003-
return newNonExistentWorkflowError(input.originalWorkflowID)
1009+
return "", newNonExistentWorkflowError(input.originalWorkflowID)
10041010
}
10051011

10061012
originalWorkflow := wfs[0]
@@ -1030,11 +1036,11 @@ func (s *sysDB) forkWorkflow(ctx context.Context, input forkWorkflowDBInput) err
10301036

10311037
inputString, err := serialize(originalWorkflow.Input)
10321038
if err != nil {
1033-
return fmt.Errorf("failed to serialize input: %w", err)
1039+
return "", fmt.Errorf("failed to serialize input: %w", err)
10341040
}
10351041

10361042
_, err = tx.Exec(ctx, insertQuery,
1037-
input.forkedWorkflowID,
1043+
forkedWorkflowID,
10381044
WorkflowStatusEnqueued,
10391045
originalWorkflow.Name,
10401046
originalWorkflow.AuthenticatedUser,
@@ -1049,7 +1055,7 @@ func (s *sysDB) forkWorkflow(ctx context.Context, input forkWorkflowDBInput) err
10491055
0)
10501056

10511057
if err != nil {
1052-
return fmt.Errorf("failed to insert forked workflow status: %w", err)
1058+
return "", fmt.Errorf("failed to insert forked workflow status: %w", err)
10531059
}
10541060

10551061
// If startStep > 0, copy the original workflow's outputs into the forked workflow
@@ -1060,17 +1066,17 @@ func (s *sysDB) forkWorkflow(ctx context.Context, input forkWorkflowDBInput) err
10601066
FROM dbos.operation_outputs
10611067
WHERE workflow_uuid = $2 AND function_id < $3`
10621068

1063-
_, err = tx.Exec(ctx, copyOutputsQuery, input.forkedWorkflowID, input.originalWorkflowID, input.startStep)
1069+
_, err = tx.Exec(ctx, copyOutputsQuery, forkedWorkflowID, input.originalWorkflowID, input.startStep)
10641070
if err != nil {
1065-
return fmt.Errorf("failed to copy operation outputs: %w", err)
1071+
return "", fmt.Errorf("failed to copy operation outputs: %w", err)
10661072
}
10671073
}
10681074

10691075
if err := tx.Commit(ctx); err != nil {
1070-
return fmt.Errorf("failed to commit transaction: %w", err)
1076+
return "", fmt.Errorf("failed to commit transaction: %w", err)
10711077
}
10721078

1073-
return nil
1079+
return forkedWorkflowID, nil
10741080
}
10751081

10761082
func (s *sysDB) awaitWorkflowResult(ctx context.Context, workflowID string) (any, error) {

0 commit comments

Comments
 (0)