Skip to content

Commit 58df600

Browse files
committed
custom serializer
1 parent c1f41a6 commit 58df600

File tree

9 files changed

+419
-331
lines changed

9 files changed

+419
-331
lines changed

dbos/client_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414

1515
func TestEnqueue(t *testing.T) {
1616
// Setup server context - this will process tasks
17-
serverCtx := setupDBOS(t, true, true)
17+
serverCtx := setupDBOS(t, true, true, nil)
1818

1919
// Create queue for communication between client and server
2020
queue := NewWorkflowQueue(serverCtx, "client-enqueue-queue")
@@ -265,7 +265,7 @@ func TestCancelResume(t *testing.T) {
265265
var stepsCompleted int
266266

267267
// Setup server context - this will process tasks
268-
serverCtx := setupDBOS(t, true, true)
268+
serverCtx := setupDBOS(t, true, true, nil)
269269

270270
// Create queue for communication between client and server
271271
queue := NewWorkflowQueue(serverCtx, "cancel-resume-queue")
@@ -507,7 +507,7 @@ func TestForkWorkflow(t *testing.T) {
507507
)
508508

509509
// Setup server context - this will process tasks
510-
serverCtx := setupDBOS(t, true, true)
510+
serverCtx := setupDBOS(t, true, true, nil)
511511

512512
// Create queue for communication between client and server
513513
queue := NewWorkflowQueue(serverCtx, "fork-workflow-queue")
@@ -931,7 +931,7 @@ func TestListWorkflows(t *testing.T) {
931931

932932
func TestGetWorkflowSteps(t *testing.T) {
933933
// Setup server context
934-
serverCtx := setupDBOS(t, true, true)
934+
serverCtx := setupDBOS(t, true, true, nil)
935935

936936
// Create queue for communication
937937
queue := NewWorkflowQueue(serverCtx, "get-workflow-steps-queue")

dbos/dbos.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ type Config struct {
4040
ConductorAPIKey string // DBOS conductor API key (optional)
4141
ApplicationVersion string // Application version (optional, overridden by DBOS__APPVERSION env var)
4242
ExecutorID string // Executor ID (optional, overridden by DBOS__VMID env var)
43+
Serializer Serializer // Custom serializer for encoding/decoding workflow data (defaults to JSON)
4344
}
4445

4546
func processConfig(inputConfig *Config) (*Config, error) {
@@ -66,6 +67,7 @@ func processConfig(inputConfig *Config) (*Config, error) {
6667
ApplicationVersion: inputConfig.ApplicationVersion,
6768
ExecutorID: inputConfig.ExecutorID,
6869
SystemDBPool: inputConfig.SystemDBPool,
70+
Serializer: inputConfig.Serializer,
6971
}
7072

7173
// Load defaults
@@ -75,6 +77,9 @@ func processConfig(inputConfig *Config) (*Config, error) {
7577
if dbosConfig.DatabaseSchema == "" {
7678
dbosConfig.DatabaseSchema = _DEFAULT_SYSTEM_DB_SCHEMA
7779
}
80+
if dbosConfig.Serializer == nil {
81+
dbosConfig.Serializer = NewJSONSerializer()
82+
}
7883

7984
// Override with environment variables if set
8085
if envAppVersion := os.Getenv("DBOS__APPVERSION"); envAppVersion != "" {
@@ -165,6 +170,9 @@ type dbosContext struct {
165170
// Workflow scheduler
166171
workflowScheduler *cron.Cron
167172

173+
// Serializer for encoding/decoding workflow data
174+
serializer Serializer
175+
168176
// logger
169177
logger *slog.Logger
170178
}
@@ -365,6 +373,7 @@ func NewDBOSContext(ctx context.Context, inputConfig Config) (DBOSContext, error
365373
// Initialize global variables from processed config (already handles env vars and defaults)
366374
initExecutor.applicationVersion = config.ApplicationVersion
367375
initExecutor.executorID = config.ExecutorID
376+
initExecutor.serializer = config.Serializer
368377

369378
initExecutor.applicationID = os.Getenv("DBOS__APPID")
370379

@@ -373,6 +382,7 @@ func NewDBOSContext(ctx context.Context, inputConfig Config) (DBOSContext, error
373382
databaseSchema: config.DatabaseSchema,
374383
customPool: config.SystemDBPool,
375384
logger: initExecutor.logger,
385+
serializer: config.Serializer,
376386
}
377387

378388
// Create the system database

dbos/queues_test.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ func queueStep(_ context.Context, input string) (string, error) {
3232
}
3333

3434
func TestWorkflowQueues(t *testing.T) {
35-
dbosCtx := setupDBOS(t, true, true)
35+
dbosCtx := setupDBOS(t, true, true, nil)
3636

3737
queue := NewWorkflowQueue(dbosCtx, "test-queue")
3838
dlqEnqueueQueue := NewWorkflowQueue(dbosCtx, "test-successive-enqueue-queue")
@@ -465,7 +465,7 @@ func TestWorkflowQueues(t *testing.T) {
465465
}
466466

467467
func TestQueueRecovery(t *testing.T) {
468-
dbosCtx := setupDBOS(t, true, true)
468+
dbosCtx := setupDBOS(t, true, true, nil)
469469

470470
recoveryQueue := NewWorkflowQueue(dbosCtx, "recovery-queue")
471471
var recoveryStepCounter int64
@@ -568,7 +568,7 @@ func TestQueueRecovery(t *testing.T) {
568568

569569
// Note: we could update this test to have the same logic than TestWorkerConcurrency
570570
func TestGlobalConcurrency(t *testing.T) {
571-
dbosCtx := setupDBOS(t, true, true)
571+
dbosCtx := setupDBOS(t, true, true, nil)
572572

573573
globalConcurrencyQueue := NewWorkflowQueue(dbosCtx, "test-global-concurrency-queue", WithGlobalConcurrency(1))
574574
workflowEvent1 := NewEvent()
@@ -627,9 +627,9 @@ func TestGlobalConcurrency(t *testing.T) {
627627
func TestWorkerConcurrency(t *testing.T) {
628628
// Create two contexts that will represent 2 DBOS executors
629629
os.Setenv("DBOS__VMID", "worker1")
630-
dbosCtx1 := setupDBOS(t, true, true)
630+
dbosCtx1 := setupDBOS(t, true, true, nil)
631631
os.Setenv("DBOS__VMID", "worker2")
632-
dbosCtx2 := setupDBOS(t, false, false) // Don't check for leaks because t.Cancel is called in LIFO order. Also don't reset the DB here.
632+
dbosCtx2 := setupDBOS(t, false, false, nil) // Don't check for leaks because t.Cancel is called in LIFO order. Also don't reset the DB here.
633633
os.Unsetenv("DBOS__VMID")
634634

635635
assert.Equal(t, "worker1", dbosCtx1.GetExecutorID(), "expected first executor ID to be 'worker1'")
@@ -742,7 +742,7 @@ func TestWorkerConcurrency(t *testing.T) {
742742
}
743743

744744
func TestWorkerConcurrencyXRecovery(t *testing.T) {
745-
dbosCtx := setupDBOS(t, true, true)
745+
dbosCtx := setupDBOS(t, true, true, nil)
746746

747747
workerConcurrencyRecoveryQueue := NewWorkflowQueue(dbosCtx, "test-worker-concurrency-recovery-queue", WithWorkerConcurrency(1))
748748
workerConcurrencyRecoveryStartEvent1 := NewEvent()
@@ -831,7 +831,7 @@ func rateLimiterTestWorkflow(ctx DBOSContext, _ string) (time.Time, error) {
831831
}
832832

833833
func TestQueueRateLimiter(t *testing.T) {
834-
dbosCtx := setupDBOS(t, true, true)
834+
dbosCtx := setupDBOS(t, true, true, nil)
835835

836836
rateLimiterQueue := NewWorkflowQueue(dbosCtx, "test-rate-limiter-queue", WithRateLimiter(&RateLimiter{Limit: 5, Period: time.Duration(1800 * time.Millisecond)}))
837837

@@ -914,7 +914,7 @@ func TestQueueRateLimiter(t *testing.T) {
914914
}
915915

916916
func TestQueueTimeouts(t *testing.T) {
917-
dbosCtx := setupDBOS(t, true, true)
917+
dbosCtx := setupDBOS(t, true, true, nil)
918918

919919
timeoutQueue := NewWorkflowQueue(dbosCtx, "timeout-queue")
920920

@@ -1137,7 +1137,7 @@ func TestQueueTimeouts(t *testing.T) {
11371137
}
11381138

11391139
func TestPriorityQueue(t *testing.T) {
1140-
dbosCtx := setupDBOS(t, true, true)
1140+
dbosCtx := setupDBOS(t, true, true, nil)
11411141

11421142
// Create priority-enabled queue with max concurrency of 1
11431143
priorityQueue := NewWorkflowQueue(dbosCtx, "test_queue_priority", WithGlobalConcurrency(1), WithPriorityEnabled())
@@ -1232,7 +1232,7 @@ func TestPriorityQueue(t *testing.T) {
12321232
}
12331233

12341234
func TestListQueuedWorkflows(t *testing.T) {
1235-
dbosCtx := setupDBOS(t, true, true)
1235+
dbosCtx := setupDBOS(t, true, true, nil)
12361236

12371237
// Simple test workflow that completes immediately
12381238
testWorkflow := func(ctx DBOSContext, input string) (string, error) {

dbos/serialization.go

Lines changed: 94 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,37 @@ import (
44
"bytes"
55
"encoding/base64"
66
"encoding/gob"
7+
"encoding/json"
78
"fmt"
89
"log/slog"
10+
"reflect"
911
"strings"
1012
)
1113

12-
func serialize(data any) (string, error) {
14+
// Serializer defines the interface for pluggable serializers.
15+
// Encode and Decode are called during database storage and retrieval respectively.
16+
type Serializer interface {
17+
// Encode serializes data to a string for database storage
18+
Encode(data any) (string, error)
19+
// Decode deserializes data from a string
20+
Decode(data *string) (any, error)
21+
}
22+
23+
// GobSerializer implements Serializer using encoding/gob
24+
type GobSerializer struct{}
25+
26+
func NewGobSerializer() *GobSerializer {
27+
return &GobSerializer{}
28+
}
29+
30+
// Encode serializes data using gob and encodes it to a base64 string
31+
// Performs "lazy" registration of the type with gob
32+
func (g *GobSerializer) Encode(data any) (string, error) {
1333
var inputBytes []byte
14-
if data != nil {
34+
if !isNilValue(data) {
35+
// Register the type with gob for proper serialization
36+
safeGobRegister(data, nil)
37+
1538
var buf bytes.Buffer
1639
enc := gob.NewEncoder(&buf)
1740
if err := enc.Encode(&data); err != nil {
@@ -22,7 +45,8 @@ func serialize(data any) (string, error) {
2245
return base64.StdEncoding.EncodeToString(inputBytes), nil
2346
}
2447

25-
func deserialize(data *string) (any, error) {
48+
// Decode deserializes data from a base64 string using gob
49+
func (g *GobSerializer) Decode(data *string) (any, error) {
2650
if data == nil || *data == "" {
2751
return nil, nil
2852
}
@@ -42,6 +66,73 @@ func deserialize(data *string) (any, error) {
4266
return result, nil
4367
}
4468

69+
// JSONSerializer implements Serializer using encoding/json
70+
type JSONSerializer struct{}
71+
72+
func NewJSONSerializer() *JSONSerializer {
73+
return &JSONSerializer{}
74+
}
75+
76+
func (j *JSONSerializer) Encode(data any) (string, error) {
77+
var inputBytes []byte
78+
if !isNilValue(data) {
79+
jsonBytes, err := json.Marshal(data)
80+
if err != nil {
81+
return "", fmt.Errorf("failed to marshal data to JSON: %w", err)
82+
}
83+
inputBytes = jsonBytes
84+
}
85+
return base64.StdEncoding.EncodeToString(inputBytes), nil
86+
}
87+
88+
func (j *JSONSerializer) Decode(data *string) (any, error) {
89+
if data == nil || *data == "" {
90+
return nil, nil
91+
}
92+
93+
dataBytes, err := base64.StdEncoding.DecodeString(*data)
94+
if err != nil {
95+
return nil, fmt.Errorf("failed to decode data: %w", err)
96+
}
97+
98+
var result any
99+
if err := json.Unmarshal(dataBytes, &result); err != nil {
100+
return nil, fmt.Errorf("failed to unmarshal JSON data: %w", err)
101+
}
102+
103+
return result, nil
104+
}
105+
106+
// serialize serializes data using the serializer from the DBOSContext
107+
// this is only use in workflow handles
108+
func serialize(ctx DBOSContext, data any) (string, error) {
109+
dbosCtx, ok := ctx.(*dbosContext)
110+
if !ok {
111+
return "", fmt.Errorf("invalid DBOSContext: expected *dbosContext")
112+
}
113+
if dbosCtx.serializer == nil {
114+
return "", fmt.Errorf("no serializer configured in DBOSContext")
115+
}
116+
return dbosCtx.serializer.Encode(data)
117+
}
118+
119+
// Handle cases where the provided data interface wraps a nil value (e.g., var p *int; data := any(p). data != nil but the underlying value is nil)
120+
func isNilValue(data any) bool {
121+
if data == nil {
122+
return true
123+
}
124+
v := reflect.ValueOf(data)
125+
// Check if the value is invalid (zero Value from reflect)
126+
if !v.IsValid() {
127+
return true
128+
}
129+
switch v.Kind() {
130+
case reflect.Pointer, reflect.Slice, reflect.Map, reflect.Interface:
131+
return v.IsNil()
132+
}
133+
return false
134+
}
135+
45136
// safeGobRegister attempts to register a type with gob, recovering only from
46137
// panics caused by duplicate type/name registrations (e.g., registering both T and *T).
47138
// These specific conflicts don't affect encoding/decoding correctness, so they're safe to ignore.

0 commit comments

Comments
 (0)