Skip to content

Commit 969d588

Browse files
PhakornKiongmaxdml
andauthored
helper to list workflow registration (#157)
Adds exported methods to list registered workflows and scheduled workflows with their registration parameters. Resolves #150 --------- Co-authored-by: Max dml <maxdml@dbos.dev>
1 parent e1922e1 commit 969d588

File tree

5 files changed

+178
-20
lines changed

5 files changed

+178
-20
lines changed

dbos/dbos.go

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -121,12 +121,13 @@ type DBOSContext interface {
121121
GetStepID() (int, error) // Get the current step ID (only available within workflows)
122122

123123
// Workflow management
124-
RetrieveWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Get a handle to an existing workflow
125-
CancelWorkflow(_ DBOSContext, workflowID string) error // Cancel a workflow by setting its status to CANCELLED
126-
ResumeWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Resume a cancelled workflow
127-
ForkWorkflow(_ DBOSContext, input ForkWorkflowInput) (WorkflowHandle[any], error) // Fork a workflow from a specific step
128-
ListWorkflows(_ DBOSContext, opts ...ListWorkflowsOption) ([]WorkflowStatus, error) // List workflows based on filtering criteria
129-
GetWorkflowSteps(_ DBOSContext, workflowID string) ([]StepInfo, error) // Get the execution steps of a workflow
124+
RetrieveWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Get a handle to an existing workflow
125+
CancelWorkflow(_ DBOSContext, workflowID string) error // Cancel a workflow by setting its status to CANCELLED
126+
ResumeWorkflow(_ DBOSContext, workflowID string) (WorkflowHandle[any], error) // Resume a cancelled workflow
127+
ForkWorkflow(_ DBOSContext, input ForkWorkflowInput) (WorkflowHandle[any], error) // Fork a workflow from a specific step
128+
ListWorkflows(_ DBOSContext, opts ...ListWorkflowsOption) ([]WorkflowStatus, error) // List workflows based on filtering criteria
129+
GetWorkflowSteps(_ DBOSContext, workflowID string) ([]StepInfo, error) // Get the execution steps of a workflow
130+
ListRegisteredWorkflows(_ DBOSContext, opts ...ListRegisteredWorkflowsOption) ([]WorkflowRegistryEntry, error) // List registered workflows with filtering options
130131

131132
// Accessors
132133
GetApplicationVersion() string // Get the application version for this context
@@ -159,7 +160,7 @@ type dbosContext struct {
159160
workflowsWg *sync.WaitGroup
160161

161162
// Workflow registry - read-mostly sync.Map since registration happens only before launch
162-
workflowRegistry *sync.Map // map[string]workflowRegistryEntry
163+
workflowRegistry *sync.Map // map[string]WorkflowRegistryEntry
163164
workflowCustomNametoFQN *sync.Map // Maps fully qualified workflow names to custom names. Usefor when client enqueues a workflow by name because registry is indexed by FQN.
164165

165166
// Workflow scheduler
@@ -275,6 +276,34 @@ func (c *dbosContext) GetApplicationID() string {
275276
return c.applicationID
276277
}
277278

279+
// ListRegisteredWorkflows returns information about registered workflows with their registration parameters.
280+
// Supports filtering using functional options.
281+
func (c *dbosContext) ListRegisteredWorkflows(_ DBOSContext, opts ...ListRegisteredWorkflowsOption) ([]WorkflowRegistryEntry, error) {
282+
// Initialize parameters with defaults
283+
params := &listRegisteredWorkflowsOptions{}
284+
285+
// Apply all provided options
286+
for _, opt := range opts {
287+
opt(params)
288+
}
289+
290+
// Get all registered workflows and apply filters
291+
var filteredWorkflows []WorkflowRegistryEntry
292+
c.workflowRegistry.Range(func(key, value interface{}) bool {
293+
workflow := value.(WorkflowRegistryEntry)
294+
295+
// Filter by scheduled only
296+
if params.scheduledOnly && workflow.CronSchedule == "" {
297+
return true
298+
}
299+
300+
filteredWorkflows = append(filteredWorkflows, workflow)
301+
return true
302+
})
303+
304+
return filteredWorkflows, nil
305+
}
306+
278307
// NewDBOSContext creates a new DBOS context with the provided configuration.
279308
// The context must be launched with Launch() for workflow execution and should be shut down with Shutdown().
280309
// This function initializes the DBOS system database, sets up the queue sub-system, and prepares the workflow registry.

dbos/queue.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ func (qr *queueRunner) run(ctx *dbosContext) {
221221
qr.logger.Error("workflow function not found in registry", "workflow_name", workflow.name)
222222
continue
223223
}
224-
registeredWorkflow, ok := registeredWorkflowAny.(workflowRegistryEntry)
224+
registeredWorkflow, ok := registeredWorkflowAny.(WorkflowRegistryEntry)
225225
if !ok {
226226
qr.logger.Error("invalid workflow registry entry type", "workflow_name", workflow.name)
227227
continue

dbos/recovery.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ func recoverPendingWorkflows(ctx *dbosContext, executorIDs []string) ([]Workflow
4848
ctx.logger.Error("Workflow function not found in registry", "workflow_id", workflow.ID, "name", workflow.Name)
4949
continue
5050
}
51-
registeredWorkflow, ok := registeredWorkflowAny.(workflowRegistryEntry)
51+
registeredWorkflow, ok := registeredWorkflowAny.(WorkflowRegistryEntry)
5252
if !ok {
5353
ctx.logger.Error("invalid workflow registry entry type", "workflow_id", workflow.ID, "name", workflow.Name)
5454
continue

dbos/workflow.go

Lines changed: 69 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -269,10 +269,12 @@ func (h *workflowHandleProxy[R]) GetWorkflowID() string {
269269
/**********************************/
270270
type wrappedWorkflowFunc func(ctx DBOSContext, input any, opts ...WorkflowOption) (WorkflowHandle[any], error)
271271

272-
type workflowRegistryEntry struct {
272+
type WorkflowRegistryEntry struct {
273273
wrappedFunction wrappedWorkflowFunc
274-
maxRetries int
275-
name string
274+
MaxRetries int
275+
Name string
276+
FQN string // Fully qualified name of the workflow function
277+
CronSchedule string // Empty string for non-scheduled workflows
276278
}
277279

278280
func registerWorkflow(ctx DBOSContext, workflowFQN string, fn wrappedWorkflowFunc, maxRetries int, customName string) {
@@ -287,10 +289,12 @@ func registerWorkflow(ctx DBOSContext, workflowFQN string, fn wrappedWorkflowFun
287289
}
288290

289291
// Check if workflow already exists and store atomically using LoadOrStore
290-
entry := workflowRegistryEntry{
292+
entry := WorkflowRegistryEntry{
291293
wrappedFunction: fn,
292-
maxRetries: maxRetries,
293-
name: customName,
294+
FQN: workflowFQN,
295+
MaxRetries: maxRetries,
296+
Name: customName,
297+
CronSchedule: "",
294298
}
295299

296300
if _, exists := c.workflowRegistry.LoadOrStore(workflowFQN, entry); exists {
@@ -321,6 +325,15 @@ func registerScheduledWorkflow(ctx DBOSContext, workflowName string, fn Workflow
321325
panic("Cannot register scheduled workflow after DBOS has launched")
322326
}
323327

328+
// Update the existing workflow entry with the cron schedule
329+
registryEntryAny, exists := c.workflowRegistry.Load(workflowName)
330+
if !exists {
331+
panic(fmt.Sprintf("workflow %s must be registered before scheduling", workflowName))
332+
}
333+
registryEntry := registryEntryAny.(WorkflowRegistryEntry)
334+
registryEntry.CronSchedule = cronSchedule
335+
c.workflowRegistry.Store(workflowName, registryEntry)
336+
324337
var entryID cron.EntryID
325338
entryID, err := c.getWorkflowScheduler().AddFunc(cronSchedule, func() {
326339
// Execute the workflow on the cron schedule once DBOS is launched
@@ -658,15 +671,15 @@ func (c *dbosContext) RunWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opt
658671
if !exists {
659672
return nil, newNonExistentWorkflowError(params.workflowName)
660673
}
661-
registeredWorkflow, ok := registeredWorkflowAny.(workflowRegistryEntry)
674+
registeredWorkflow, ok := registeredWorkflowAny.(WorkflowRegistryEntry)
662675
if !ok {
663676
return nil, fmt.Errorf("invalid workflow registry entry type for workflow %s", params.workflowName)
664677
}
665-
if registeredWorkflow.maxRetries > 0 {
666-
params.maxRetries = registeredWorkflow.maxRetries
678+
if registeredWorkflow.MaxRetries > 0 {
679+
params.maxRetries = registeredWorkflow.MaxRetries
667680
}
668-
if len(registeredWorkflow.name) > 0 {
669-
params.workflowName = registeredWorkflow.name
681+
if len(registeredWorkflow.Name) > 0 {
682+
params.workflowName = registeredWorkflow.Name
670683
}
671684

672685
// Check if we are within a workflow (and thus a child workflow)
@@ -1951,3 +1964,48 @@ func GetWorkflowSteps(ctx DBOSContext, workflowID string) ([]StepInfo, error) {
19511964
}
19521965
return ctx.GetWorkflowSteps(ctx, workflowID)
19531966
}
1967+
1968+
// listRegisteredWorkflowsOptions holds configuration parameters for listing registered workflows
1969+
type listRegisteredWorkflowsOptions struct {
1970+
scheduledOnly bool
1971+
}
1972+
1973+
// ListRegisteredWorkflowsOption is a functional option for configuring registered workflow listing parameters.
1974+
type ListRegisteredWorkflowsOption func(*listRegisteredWorkflowsOptions)
1975+
1976+
// WithScheduledOnly filters to only return scheduled workflows (those with a cron schedule).
1977+
func WithScheduledOnly() ListRegisteredWorkflowsOption {
1978+
return func(p *listRegisteredWorkflowsOptions) {
1979+
p.scheduledOnly = true
1980+
}
1981+
}
1982+
1983+
// ListRegisteredWorkflows returns information about workflows registered with DBOS.
1984+
// Each WorkflowRegistryEntry contains:
1985+
// - MaxRetries: Maximum number of retry attempts for workflow recovery
1986+
// - Name: Custom name if provided during registration, otherwise empty
1987+
// - FQN: Fully qualified name of the workflow function (always present)
1988+
// - CronSchedule: Empty string for non-scheduled workflows
1989+
//
1990+
// The function supports filtering using functional options:
1991+
// - WithScheduledOnly(): Return only scheduled workflows
1992+
//
1993+
// Example:
1994+
//
1995+
// // List all registered workflows
1996+
// workflows, err := dbos.ListRegisteredWorkflows(ctx)
1997+
// if err != nil {
1998+
// log.Fatal(err)
1999+
// }
2000+
//
2001+
// // List only scheduled workflows
2002+
// scheduled, err := dbos.ListRegisteredWorkflows(ctx, dbos.WithScheduledOnly())
2003+
// if err != nil {
2004+
// log.Fatal(err)
2005+
// }
2006+
func ListRegisteredWorkflows(ctx DBOSContext, opts ...ListRegisteredWorkflowsOption) ([]WorkflowRegistryEntry, error) {
2007+
if ctx == nil {
2008+
return nil, errors.New("ctx cannot be nil")
2009+
}
2010+
return ctx.ListRegisteredWorkflows(ctx, opts...)
2011+
}

dbos/workflows_test.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@ func simpleWorkflowWithStepError(dbosCtx DBOSContext, input string) (string, err
4747
})
4848
}
4949

50+
func simpleWorkflowWithSchedule(dbosCtx DBOSContext, scheduledTime time.Time) (time.Time, error) {
51+
return scheduledTime, nil
52+
}
53+
5054
// idempotencyWorkflow increments a global counter and returns the input
5155
func incrementCounter(_ context.Context, value int64) (int64, error) {
5256
idempotencyCounter += value
@@ -4238,6 +4242,73 @@ func TestSpecialSteps(t *testing.T) {
42384242
require.Equal(t, "success", result, "workflow should return success")
42394243
})
42404244
}
4245+
4246+
func TestRegisteredWorkflowListing(t *testing.T) {
4247+
dbosCtx := setupDBOS(t, true, true)
4248+
4249+
// Register some regular workflows
4250+
RegisterWorkflow(dbosCtx, simpleWorkflow)
4251+
RegisterWorkflow(dbosCtx, simpleWorkflowError, WithMaxRetries(5))
4252+
RegisterWorkflow(dbosCtx, simpleWorkflowWithStep, WithWorkflowName("CustomStepWorkflow"))
4253+
RegisterWorkflow(dbosCtx, simpleWorkflowWithSchedule, WithWorkflowName("ScheduledWorkflow"), WithSchedule("0 0 * * * *"))
4254+
4255+
err := Launch(dbosCtx)
4256+
require.NoError(t, err, "failed to launch DBOS")
4257+
4258+
t.Run("ListRegisteredWorkflows", func(t *testing.T) {
4259+
workflows, err := ListRegisteredWorkflows(dbosCtx)
4260+
require.NoError(t, err, "ListRegisteredWorkflows should not return an error")
4261+
4262+
// Should have 4 workflows (3 regular + 1 scheduled)
4263+
require.GreaterOrEqual(t, len(workflows), 4, "Should have 4 registered workflows")
4264+
4265+
// Create a map for easier lookup
4266+
workflowMap := make(map[string]WorkflowRegistryEntry)
4267+
for _, wf := range workflows {
4268+
workflowMap[wf.FQN] = wf
4269+
}
4270+
4271+
// Check that simpleWorkflow is registered
4272+
simpleWorkflowFQN := runtime.FuncForPC(reflect.ValueOf(simpleWorkflow).Pointer()).Name()
4273+
simpleWf, exists := workflowMap[simpleWorkflowFQN]
4274+
require.True(t, exists, "simpleWorkflow should be registered")
4275+
require.Equal(t, _DEFAULT_MAX_RECOVERY_ATTEMPTS, simpleWf.MaxRetries, "simpleWorkflow should have default max retries")
4276+
require.Empty(t, simpleWf.CronSchedule, "simpleWorkflow should not have cron schedule")
4277+
4278+
// Check that simpleWorkflowError is registered with custom max retries
4279+
simpleWorkflowErrorFQN := runtime.FuncForPC(reflect.ValueOf(simpleWorkflowError).Pointer()).Name()
4280+
errorWf, exists := workflowMap[simpleWorkflowErrorFQN]
4281+
require.True(t, exists, "simpleWorkflowError should be registered")
4282+
require.Equal(t, 5, errorWf.MaxRetries, "simpleWorkflowError should have custom max retries")
4283+
require.Empty(t, errorWf.CronSchedule, "simpleWorkflowError should not have cron schedule")
4284+
4285+
// Check that custom named workflow is registered
4286+
customStepWorkflowFQN := runtime.FuncForPC(reflect.ValueOf(simpleWorkflowWithStep).Pointer()).Name()
4287+
customWf, exists := workflowMap[customStepWorkflowFQN]
4288+
require.True(t, exists, "CustomStepWorkflow should be found")
4289+
require.Equal(t, "CustomStepWorkflow", customWf.Name, "CustomStepWorkflow should have the correct name")
4290+
require.Empty(t, customWf.CronSchedule, "CustomStepWorkflow should not have cron schedule")
4291+
4292+
// Check that scheduled workflow is registered
4293+
scheduledWorkflowFQN := runtime.FuncForPC(reflect.ValueOf(simpleWorkflowWithSchedule).Pointer()).Name()
4294+
scheduledWf, exists := workflowMap[scheduledWorkflowFQN]
4295+
require.True(t, exists, "ScheduledWorkflow should be found")
4296+
require.Equal(t, "ScheduledWorkflow", scheduledWf.Name, "ScheduledWorkflow should have the correct name")
4297+
require.Equal(t, "0 0 * * * *", scheduledWf.CronSchedule, "ScheduledWorkflow should have the correct cron schedule")
4298+
})
4299+
4300+
t.Run("ListRegisteredWorkflowsWithScheduledOnly", func(t *testing.T) {
4301+
scheduledWorkflows, err := ListRegisteredWorkflows(dbosCtx, WithScheduledOnly())
4302+
require.NoError(t, err, "ListRegisteredWorkflows with WithScheduledOnly should not return an error")
4303+
require.Equal(t, 1, len(scheduledWorkflows), "Should have exactly 1 scheduled workflow")
4304+
4305+
entry := scheduledWorkflows[0]
4306+
scheduledWorkflowFQN := runtime.FuncForPC(reflect.ValueOf(simpleWorkflowWithSchedule).Pointer()).Name()
4307+
require.Equal(t, scheduledWorkflowFQN, entry.FQN, "ScheduledWorkflow should have the correct FQN")
4308+
require.Equal(t, "0 0 * * * *", entry.CronSchedule, "ScheduledWorkflow should have the correct cron schedule")
4309+
})
4310+
}
4311+
42414312
func TestWorkflowIdentity(t *testing.T) {
42424313
dbosCtx := setupDBOS(t, true, true)
42434314
RegisterWorkflow(dbosCtx, simpleWorkflow)

0 commit comments

Comments
 (0)