Skip to content

Commit c953ff2

Browse files
authored
Fix: Workflow Identity (#144)
Fixes #92
1 parent e489a42 commit c953ff2

File tree

3 files changed

+89
-10
lines changed

3 files changed

+89
-10
lines changed

dbos/system_database.go

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package dbos
33
import (
44
"context"
55
_ "embed"
6+
"encoding/json"
67
"errors"
78
"fmt"
89
"io"
@@ -485,14 +486,22 @@ func (s *sysDB) insertWorkflowStatus(ctx context.Context, input insertWorkflowSt
485486
var result insertWorkflowResult
486487
var timeoutMSResult *int64
487488
var workflowDeadlineEpochMS *int64
489+
490+
// Marshal authenticated roles (slice of strings) to JSON for TEXT column
491+
authenticatedRoles, err := json.Marshal(input.status.AuthenticatedRoles)
492+
493+
if err != nil {
494+
return nil, fmt.Errorf("failed to marshal the authenticated roles: %w", err)
495+
}
496+
488497
err = input.tx.QueryRow(ctx, query,
489498
input.status.ID,
490499
input.status.Status,
491500
input.status.Name,
492501
input.status.QueueName,
493502
input.status.AuthenticatedUser,
494503
input.status.AssumedRole,
495-
input.status.AuthenticatedRoles,
504+
authenticatedRoles,
496505
input.status.ExecutorID,
497506
applicationVersion,
498507
input.status.ApplicationID,
@@ -705,11 +714,12 @@ func (s *sysDB) listWorkflows(ctx context.Context, input listWorkflowsDBInput) (
705714
var deduplicationID *string
706715
var applicationVersion *string
707716
var executorID *string
717+
var authenticatedRoles *string
708718

709719
// Build scan arguments dynamically based on loaded columns
710720
scanArgs := []any{
711721
&wf.ID, &wf.Status, &wf.Name, &wf.AuthenticatedUser, &wf.AssumedRole,
712-
&wf.AuthenticatedRoles, &executorID, &createdAtMs,
722+
&authenticatedRoles, &executorID, &createdAtMs,
713723
&updatedAtMs, &applicationVersion, &wf.ApplicationID,
714724
&wf.Attempts, &queueName, &timeoutMs,
715725
&deadlineMs, &startedAtMs, &deduplicationID, &wf.Priority,
@@ -727,6 +737,12 @@ func (s *sysDB) listWorkflows(ctx context.Context, input listWorkflowsDBInput) (
727737
return nil, fmt.Errorf("failed to scan workflow row: %w", err)
728738
}
729739

740+
if authenticatedRoles != nil && *authenticatedRoles != "" {
741+
if err := json.Unmarshal([]byte(*authenticatedRoles), &wf.AuthenticatedRoles); err != nil {
742+
return nil, fmt.Errorf("failed to unmarshal authenticated_roles: %w", err)
743+
}
744+
}
745+
730746
if queueName != nil && len(*queueName) > 0 {
731747
wf.QueueName = *queueName
732748
}
@@ -1088,13 +1104,20 @@ func (s *sysDB) forkWorkflow(ctx context.Context, input forkWorkflowDBInput) (st
10881104
return "", fmt.Errorf("failed to serialize input: %w", err)
10891105
}
10901106

1107+
// Marshal authenticated roles (slice of strings) to JSON for TEXT column
1108+
authenticatedRoles, err := json.Marshal(originalWorkflow.AuthenticatedRoles)
1109+
1110+
if err != nil {
1111+
return "", fmt.Errorf("failed to marshal the authenticated roles: %w", err)
1112+
}
1113+
10911114
_, err = tx.Exec(ctx, insertQuery,
10921115
forkedWorkflowID,
10931116
WorkflowStatusEnqueued,
10941117
originalWorkflow.Name,
10951118
originalWorkflow.AuthenticatedUser,
10961119
originalWorkflow.AssumedRole,
1097-
originalWorkflow.AuthenticatedRoles,
1120+
authenticatedRoles,
10981121
&appVersion,
10991122
originalWorkflow.ApplicationID,
11001123
_DBOS_INTERNAL_QUEUE_NAME,

dbos/workflow.go

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -488,13 +488,16 @@ type Workflow[P any, R any] func(ctx DBOSContext, input P) (R, error)
488488
type WorkflowFunc func(ctx DBOSContext, input any) (any, error)
489489

490490
type workflowOptions struct {
491-
workflowName string
492-
workflowID string
493-
queueName string
494-
applicationVersion string
495-
maxRetries int
496-
deduplicationID string
497-
priority uint
491+
workflowName string
492+
workflowID string
493+
queueName string
494+
applicationVersion string
495+
maxRetries int
496+
deduplicationID string
497+
priority uint
498+
authenticated_user string
499+
assumed_role string
500+
authenticated_roles []string
498501
}
499502

500503
// WorkflowOption is a functional option for configuring workflow execution parameters.
@@ -544,6 +547,27 @@ func withWorkflowName(name string) WorkflowOption {
544547
}
545548
}
546549

550+
// Sets the authenticated user for the workflow
551+
func WithAuthenticatedUser(user string) WorkflowOption {
552+
return func(p *workflowOptions) {
553+
p.authenticated_user = user
554+
}
555+
}
556+
557+
// Sets the assumed role for the workflow
558+
func WithAssumedRole(role string) WorkflowOption {
559+
return func(p *workflowOptions) {
560+
p.assumed_role = role
561+
}
562+
}
563+
564+
// Sets the authenticated role for the workflow
565+
func WithAuthenticatedRoles(roles []string) WorkflowOption {
566+
return func(p *workflowOptions) {
567+
p.authenticated_roles = roles
568+
}
569+
}
570+
547571
// RunWorkflow executes a workflow function with type safety and durability guarantees.
548572
// The workflow can be executed immediately or enqueued for later execution based on options.
549573
// Returns a typed handle that can be used to wait for completion and retrieve results.
@@ -730,6 +754,9 @@ func (c *dbosContext) RunWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opt
730754
QueueName: params.queueName,
731755
DeduplicationID: params.deduplicationID,
732756
Priority: int(params.priority),
757+
AuthenticatedUser: params.authenticated_user,
758+
AssumedRole: params.assumed_role,
759+
AuthenticatedRoles: params.authenticated_roles,
733760
}
734761

735762
var earlyReturnPollingHandle *workflowPollingHandle[any]

dbos/workflows_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4131,3 +4131,32 @@ func TestSpecialSteps(t *testing.T) {
41314131
require.Equal(t, "success", result, "workflow should return success")
41324132
})
41334133
}
4134+
func TestWorkflowIdentity(t *testing.T) {
4135+
dbosCtx := setupDBOS(t, true, true)
4136+
RegisterWorkflow(dbosCtx, simpleWorkflow)
4137+
handle, err := RunWorkflow(
4138+
dbosCtx,
4139+
simpleWorkflow,
4140+
"test",
4141+
WithWorkflowID("my-workflow-id"),
4142+
WithAuthenticatedUser("user123"),
4143+
WithAssumedRole("admin"),
4144+
WithAuthenticatedRoles([]string{"reader", "writer"}))
4145+
require.NoError(t, err, "failed to start workflow")
4146+
4147+
// Retrieve the workflow's status.
4148+
status, err := handle.GetStatus()
4149+
require.NoError(t, err)
4150+
4151+
t.Run("CheckAuthenticatedUser", func(t *testing.T) {
4152+
assert.Equal(t, "user123", status.AuthenticatedUser)
4153+
})
4154+
4155+
t.Run("CheckAssumedRole", func(t *testing.T) {
4156+
assert.Equal(t, "admin", status.AssumedRole)
4157+
})
4158+
4159+
t.Run("CheckAuthenticatedRoles", func(t *testing.T) {
4160+
assert.Equal(t, []string{"reader", "writer"}, status.AuthenticatedRoles)
4161+
})
4162+
}

0 commit comments

Comments
 (0)