Skip to content

Commit 846b6bd

Browse files
authored
Add client GetWorkflowSteps (#167)
- Does not load step outputs - This requires launching the context in a few tests that check step outputs - fix: propagate `launched` to children `DBOSContext`.
1 parent 3a00f5a commit 846b6bd

File tree

7 files changed

+320
-214
lines changed

7 files changed

+320
-214
lines changed

dbos/client.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ type Client interface {
3131
CancelWorkflow(workflowID string) error
3232
ResumeWorkflow(workflowID string) (WorkflowHandle[any], error)
3333
ForkWorkflow(input ForkWorkflowInput) (WorkflowHandle[any], error)
34+
GetWorkflowSteps(workflowID string) ([]StepInfo, error)
3435
Shutdown(timeout time.Duration) // Simply close the system DB connection pool
3536
}
3637

@@ -295,6 +296,11 @@ func (c *client) ForkWorkflow(input ForkWorkflowInput) (WorkflowHandle[any], err
295296
return c.dbosCtx.ForkWorkflow(c.dbosCtx, input)
296297
}
297298

299+
// GetWorkflowSteps retrieves the execution steps of a workflow.
300+
func (c *client) GetWorkflowSteps(workflowID string) ([]StepInfo, error) {
301+
return c.dbosCtx.GetWorkflowSteps(c.dbosCtx, workflowID)
302+
}
303+
298304
// Shutdown gracefully shuts down the client and closes the system database connection.
299305
func (c *client) Shutdown(timeout time.Duration) {
300306
// Get the concrete dbosContext to access internal fields

dbos/client_test.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -928,3 +928,69 @@ func TestListWorkflows(t *testing.T) {
928928
// Verify all queue entries are cleaned up
929929
require.True(t, queueEntriesAreCleanedUp(serverCtx), "expected queue entries to be cleaned up after list workflows tests")
930930
}
931+
932+
func TestGetWorkflowSteps(t *testing.T) {
933+
// Setup server context
934+
serverCtx := setupDBOS(t, true, true)
935+
936+
// Create queue for communication
937+
queue := NewWorkflowQueue(serverCtx, "get-workflow-steps-queue")
938+
939+
// Workflow with one step
940+
stepFunction := func(ctx context.Context) (string, error) {
941+
return "abc", nil
942+
}
943+
944+
testWorkflow := func(ctx DBOSContext, input string) (string, error) {
945+
result, err := RunAsStep(ctx, stepFunction, WithStepName("TestStep"))
946+
if err != nil {
947+
return "", err
948+
}
949+
return result, nil
950+
}
951+
RegisterWorkflow(serverCtx, testWorkflow, WithWorkflowName("TestWorkflow"))
952+
953+
// Launch server
954+
err := Launch(serverCtx)
955+
require.NoError(t, err)
956+
957+
// Setup client
958+
databaseURL := getDatabaseURL()
959+
config := ClientConfig{
960+
DatabaseURL: databaseURL,
961+
}
962+
client, err := NewClient(context.Background(), config)
963+
require.NoError(t, err)
964+
t.Cleanup(func() {
965+
if client != nil {
966+
client.Shutdown(30 * time.Second)
967+
}
968+
})
969+
970+
// Enqueue and run the workflow
971+
workflowID := "test-get-workflow-steps"
972+
handle, err := Enqueue[string, string](client, queue.Name, "TestWorkflow", "test-input", WithEnqueueWorkflowID(workflowID))
973+
require.NoError(t, err)
974+
975+
// Wait for workflow to complete
976+
result, err := handle.GetResult()
977+
require.NoError(t, err)
978+
assert.Equal(t, "abc", result)
979+
980+
// Test GetWorkflowSteps with loadOutput = true
981+
stepsWithOutput, err := client.GetWorkflowSteps(workflowID)
982+
require.NoError(t, err)
983+
require.Len(t, stepsWithOutput, 1, "expected exactly 1 step")
984+
985+
step := stepsWithOutput[0]
986+
assert.Equal(t, 0, step.StepID, "expected step ID to be 0")
987+
assert.Equal(t, "TestStep", step.StepName, "expected step name to be set")
988+
assert.Nil(t, step.Error, "expected no error in step")
989+
assert.Equal(t, "", step.ChildWorkflowID, "expected no child workflow ID")
990+
991+
// Verify the output wasn't loaded
992+
require.Nil(t, step.Output, "expected output not to be loaded")
993+
994+
// Verify all queue entries are cleaned up
995+
require.True(t, queueEntriesAreCleanedUp(serverCtx), "expected queue entries to be cleaned up after get workflow steps test")
996+
}

dbos/dbos.go

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,8 @@ func WithValue(ctx DBOSContext, key, val any) DBOSContext {
194194
}
195195
// Will do nothing if the concrete type is not dbosContext
196196
if dbosCtx, ok := ctx.(*dbosContext); ok {
197-
return &dbosContext{
197+
launched := dbosCtx.launched.Load()
198+
childCtx := &dbosContext{
198199
ctx: context.WithValue(dbosCtx.ctx, key, val), // Spawn a new child context with the value set
199200
logger: dbosCtx.logger,
200201
systemDB: dbosCtx.systemDB,
@@ -205,6 +206,8 @@ func WithValue(ctx DBOSContext, key, val any) DBOSContext {
205206
executorID: dbosCtx.executorID,
206207
applicationID: dbosCtx.applicationID,
207208
}
209+
childCtx.launched.Store(launched)
210+
return childCtx
208211
}
209212
return nil
210213
}
@@ -217,7 +220,10 @@ func WithoutCancel(ctx DBOSContext) DBOSContext {
217220
return nil
218221
}
219222
if dbosCtx, ok := ctx.(*dbosContext); ok {
220-
return &dbosContext{
223+
launched := dbosCtx.launched.Load()
224+
// Create a new context that is not canceled when the parent is canceled
225+
// but retains all other values
226+
childCtx := &dbosContext{
221227
ctx: context.WithoutCancel(dbosCtx.ctx),
222228
logger: dbosCtx.logger,
223229
systemDB: dbosCtx.systemDB,
@@ -228,6 +234,8 @@ func WithoutCancel(ctx DBOSContext) DBOSContext {
228234
executorID: dbosCtx.executorID,
229235
applicationID: dbosCtx.applicationID,
230236
}
237+
childCtx.launched.Store(launched)
238+
return childCtx
231239
}
232240
return nil
233241
}
@@ -240,8 +248,9 @@ func WithTimeout(ctx DBOSContext, timeout time.Duration) (DBOSContext, context.C
240248
return nil, func() {}
241249
}
242250
if dbosCtx, ok := ctx.(*dbosContext); ok {
251+
launched := dbosCtx.launched.Load()
243252
newCtx, cancelFunc := context.WithTimeoutCause(dbosCtx.ctx, timeout, errors.New("DBOS context timeout"))
244-
return &dbosContext{
253+
childCtx := &dbosContext{
245254
ctx: newCtx,
246255
logger: dbosCtx.logger,
247256
systemDB: dbosCtx.systemDB,
@@ -251,7 +260,9 @@ func WithTimeout(ctx DBOSContext, timeout time.Duration) (DBOSContext, context.C
251260
applicationVersion: dbosCtx.applicationVersion,
252261
executorID: dbosCtx.executorID,
253262
applicationID: dbosCtx.applicationID,
254-
}, cancelFunc
263+
}
264+
childCtx.launched.Store(launched)
265+
return childCtx, cancelFunc
255266
}
256267
return nil, func() {}
257268
}

dbos/serialization_test.go

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,6 @@ import (
1111
"github.com/stretchr/testify/require"
1212
)
1313

14-
/** Test serialization and deserialization
15-
[x] Built in types
16-
[x] User defined types (structs)
17-
[x] Workflow inputs/outputs
18-
[x] Step inputs/outputs
19-
[x] Direct handlers, polling handler, list workflows results, get step infos
20-
[x] Set/get event with user defined types
21-
*/
22-
2314
// Builtin types
2415
func encodingStepBuiltinTypes(_ context.Context, input int) (int, error) {
2516
return input, errors.New("step error")
@@ -76,6 +67,9 @@ func TestWorkflowEncoding(t *testing.T) {
7667
RegisterWorkflow(executor, encodingWorkflowBuiltinTypes)
7768
RegisterWorkflow(executor, encodingWorkflowStruct)
7869

70+
err := Launch(executor)
71+
require.NoError(t, err)
72+
7973
t.Run("BuiltinTypes", func(t *testing.T) {
8074
// Test a workflow that uses a built-in type (string)
8175
directHandle, err := RunWorkflow(executor, encodingWorkflowBuiltinTypes, "test")

dbos/system_database.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ type systemDatabase interface {
5151
// Steps
5252
recordOperationResult(ctx context.Context, input recordOperationResultDBInput) error
5353
checkOperationExecution(ctx context.Context, input checkOperationExecutionDBInput) (*recordedResult, error)
54-
getWorkflowSteps(ctx context.Context, workflowID string) ([]StepInfo, error)
54+
getWorkflowSteps(ctx context.Context, input getWorkflowStepsInput) ([]StepInfo, error)
5555

5656
// Communication (special steps)
5757
send(ctx context.Context, input WorkflowSendInput) error
@@ -1457,13 +1457,18 @@ type StepInfo struct {
14571457
ChildWorkflowID string // The ID of a child workflow spawned by this step (if applicable)
14581458
}
14591459

1460-
func (s *sysDB) getWorkflowSteps(ctx context.Context, workflowID string) ([]StepInfo, error) {
1460+
type getWorkflowStepsInput struct {
1461+
workflowID string
1462+
loadOutput bool
1463+
}
1464+
1465+
func (s *sysDB) getWorkflowSteps(ctx context.Context, input getWorkflowStepsInput) ([]StepInfo, error) {
14611466
query := fmt.Sprintf(`SELECT function_id, function_name, output, error, child_workflow_id
14621467
FROM %s.operation_outputs
14631468
WHERE workflow_uuid = $1
14641469
ORDER BY function_id ASC`, pgx.Identifier{s.schema}.Sanitize())
14651470

1466-
rows, err := s.pool.Query(ctx, query, workflowID)
1471+
rows, err := s.pool.Query(ctx, query, input.workflowID)
14671472
if err != nil {
14681473
return nil, fmt.Errorf("failed to query workflow steps: %w", err)
14691474
}
@@ -1481,8 +1486,8 @@ func (s *sysDB) getWorkflowSteps(ctx context.Context, workflowID string) ([]Step
14811486
return nil, fmt.Errorf("failed to scan step row: %w", err)
14821487
}
14831488

1484-
// Deserialize output if present
1485-
if outputString != nil {
1489+
// Deserialize output if present and loadOutput is true
1490+
if input.loadOutput && outputString != nil {
14861491
output, err := deserialize(outputString)
14871492
if err != nil {
14881493
return nil, fmt.Errorf("failed to deserialize output: %w", err)

dbos/workflow.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1956,14 +1956,25 @@ func ListWorkflows(ctx DBOSContext, opts ...ListWorkflowsOption) ([]WorkflowStat
19561956
}
19571957

19581958
func (c *dbosContext) GetWorkflowSteps(_ DBOSContext, workflowID string) ([]StepInfo, error) {
1959+
var loadOutput bool
1960+
if c.launched.Load() {
1961+
loadOutput = true
1962+
} else {
1963+
loadOutput = false
1964+
}
1965+
getWorkflowStepsInput := getWorkflowStepsInput{
1966+
workflowID: workflowID,
1967+
loadOutput: loadOutput,
1968+
}
1969+
19591970
workflowState, ok := c.Value(workflowStateKey).(*workflowState)
19601971
isWithinWorkflow := ok && workflowState != nil
19611972
if isWithinWorkflow {
19621973
return RunAsStep(c, func(ctx context.Context) ([]StepInfo, error) {
1963-
return c.systemDB.getWorkflowSteps(ctx, workflowID)
1974+
return c.systemDB.getWorkflowSteps(ctx, getWorkflowStepsInput)
19641975
}, WithStepName("DBOS.getWorkflowSteps"))
19651976
} else {
1966-
return c.systemDB.getWorkflowSteps(c, workflowID)
1977+
return c.systemDB.getWorkflowSteps(c, getWorkflowStepsInput)
19671978
}
19681979
}
19691980

0 commit comments

Comments
 (0)