Skip to content

Commit 3a00f5a

Browse files
authored
handle duplicate gob registration (#166)
fix #164
1 parent 969d588 commit 3a00f5a

File tree

5 files changed

+247
-16
lines changed

5 files changed

+247
-16
lines changed

dbos/client.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package dbos
22

33
import (
44
"context"
5-
"encoding/gob"
65
"errors"
76
"fmt"
87
"log/slog"
@@ -241,10 +240,16 @@ func Enqueue[P any, R any](c Client, queueName, workflowName string, input P, op
241240
}
242241

243242
// Register the input and output types for gob encoding
243+
var logger *slog.Logger
244+
if cl, ok := c.(*client); ok {
245+
if ctx, ok := cl.dbosCtx.(*dbosContext); ok {
246+
logger = ctx.logger
247+
}
248+
}
244249
var typedInput P
245-
gob.Register(typedInput)
250+
safeGobRegister(typedInput, logger)
246251
var typedOutput R
247-
gob.Register(typedOutput)
252+
safeGobRegister(typedOutput, logger)
248253

249254
// Call the interface method with the same signature
250255
handle, err := c.Enqueue(queueName, workflowName, input, opts...)

dbos/dbos.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package dbos
33
import (
44
"context"
55
"crypto/sha256"
6-
"encoding/gob"
76
"encoding/hex"
87
"errors"
98
"fmt"
@@ -346,11 +345,11 @@ func NewDBOSContext(ctx context.Context, inputConfig Config) (DBOSContext, error
346345

347346
// Register types we serialize with gob
348347
var t time.Time
349-
gob.Register(t)
348+
safeGobRegister(t, initExecutor.logger)
350349
var ws []WorkflowStatus
351-
gob.Register(ws)
350+
safeGobRegister(ws, initExecutor.logger)
352351
var si []StepInfo
353-
gob.Register(si)
352+
safeGobRegister(si, initExecutor.logger)
354353

355354
// Initialize global variables from processed config (already handles env vars and defaults)
356355
initExecutor.applicationVersion = config.ApplicationVersion

dbos/serialization.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import (
55
"encoding/base64"
66
"encoding/gob"
77
"fmt"
8+
"log/slog"
9+
"strings"
810
)
911

1012
func serialize(data any) (string, error) {
@@ -39,3 +41,28 @@ func deserialize(data *string) (any, error) {
3941

4042
return result, nil
4143
}
44+
45+
// safeGobRegister registers value with encoding/gob and tolerates exactly one
// class of failure: the string panics gob raises when a registration collides
// with an earlier one (for example, registering T after *T, or the same name
// for a different type). Such a collision is reported through logger at debug
// level when logger is non-nil, and is otherwise dropped silently.
//
// Every other panic, including non-string panics such as the runtime error
// produced by registering a nil interface value, is re-raised unchanged.
func safeGobRegister(value any, logger *slog.Logger) {
	defer func() {
		recovered := recover()
		if recovered == nil {
			return
		}

		// gob reports registration conflicts by panicking with a formatted
		// string; anything that is not a string cannot be one of them.
		// See https://cs.opensource.google/go/go/+/refs/tags/go1.25.1:src/encoding/gob/type.go;l=832
		msg, isString := recovered.(string)
		duplicate := isString &&
			(strings.Contains(msg, "gob: registering duplicate types for") ||
				strings.Contains(msg, "gob: registering duplicate names for"))
		if !duplicate {
			// Not a registration conflict: propagate the original panic value.
			panic(recovered)
		}

		if logger == nil {
			return
		}
		logger.Debug("gob registration conflict", "type", fmt.Sprintf("%T", value), "error", recovered)
	}()

	gob.Register(value)
}

dbos/workflow.go

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@ package dbos
22

33
import (
44
"context"
5-
"encoding/gob"
65
"errors"
76
"fmt"
7+
"log/slog"
88
"math"
99
"reflect"
1010
"runtime"
@@ -450,10 +450,14 @@ func RegisterWorkflow[P any, R any](ctx DBOSContext, fn Workflow[P, R], opts ...
450450
fqn := runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
451451

452452
// Register the input/output types for gob encoding
453+
var logger *slog.Logger
454+
if c, ok := ctx.(*dbosContext); ok {
455+
logger = c.logger
456+
}
453457
var p P
454458
var r R
455-
gob.Register(p)
456-
gob.Register(r)
459+
safeGobRegister(p, logger)
460+
safeGobRegister(r, logger)
457461

458462
// Register a type-erased version of the durable workflow for recovery
459463
typedErasedWorkflow := WorkflowFunc(func(ctx DBOSContext, input any) (any, error) {
@@ -1054,8 +1058,12 @@ func RunAsStep[R any](ctx DBOSContext, fn Step[R], opts ...StepOption) (R, error
10541058
}
10551059

10561060
// Register the output type for gob encoding
1061+
var logger *slog.Logger
1062+
if c, ok := ctx.(*dbosContext); ok {
1063+
logger = c.logger
1064+
}
10571065
var r R
1058-
gob.Register(r)
1066+
safeGobRegister(r, logger)
10591067

10601068
// Append WithStepName option to ensure the step name is set. This will not erase a user-provided step name
10611069
stepName := runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
@@ -1218,8 +1226,12 @@ func Send[P any](ctx DBOSContext, destinationID string, message P, topic string)
12181226
if ctx == nil {
12191227
return errors.New("ctx cannot be nil")
12201228
}
1229+
var logger *slog.Logger
1230+
if c, ok := ctx.(*dbosContext); ok {
1231+
logger = c.logger
1232+
}
12211233
var typedMessage P
1222-
gob.Register(typedMessage)
1234+
safeGobRegister(typedMessage, logger)
12231235
return ctx.Send(ctx, destinationID, message, topic)
12241236
}
12251237

@@ -1295,8 +1307,12 @@ func SetEvent[P any](ctx DBOSContext, key string, message P) error {
12951307
if ctx == nil {
12961308
return errors.New("ctx cannot be nil")
12971309
}
1310+
var logger *slog.Logger
1311+
if c, ok := ctx.(*dbosContext); ok {
1312+
logger = c.logger
1313+
}
12981314
var typedMessage P
1299-
gob.Register(typedMessage)
1315+
safeGobRegister(typedMessage, logger)
13001316
return ctx.SetEvent(ctx, key, message)
13011317
}
13021318

@@ -1489,8 +1505,12 @@ func RetrieveWorkflow[R any](ctx DBOSContext, workflowID string) (WorkflowHandle
14891505
}
14901506

14911507
// Register the output for gob encoding
1508+
var logger *slog.Logger
1509+
if c, ok := ctx.(*dbosContext); ok {
1510+
logger = c.logger
1511+
}
14921512
var r R
1493-
gob.Register(r)
1513+
safeGobRegister(r, logger)
14941514

14951515
// Call the interface method
14961516
handle, err := ctx.RetrieveWorkflow(ctx, workflowID)
@@ -1583,8 +1603,12 @@ func ResumeWorkflow[R any](ctx DBOSContext, workflowID string) (WorkflowHandle[R
15831603
}
15841604

15851605
// Register the output for gob encoding
1606+
var logger *slog.Logger
1607+
if c, ok := ctx.(*dbosContext); ok {
1608+
logger = c.logger
1609+
}
15861610
var r R
1587-
gob.Register(r)
1611+
safeGobRegister(r, logger)
15881612

15891613
_, err := ctx.ResumeWorkflow(ctx, workflowID)
15901614
if err != nil {
@@ -1675,8 +1699,12 @@ func ForkWorkflow[R any](ctx DBOSContext, input ForkWorkflowInput) (WorkflowHand
16751699
}
16761700

16771701
// Register the output for gob encoding
1702+
var logger *slog.Logger
1703+
if c, ok := ctx.(*dbosContext); ok {
1704+
logger = c.logger
1705+
}
16781706
var r R
1679-
gob.Register(r)
1707+
safeGobRegister(r, logger)
16801708

16811709
handle, err := ctx.ForkWorkflow(ctx, input)
16821710
if err != nil {

dbos/workflows_test.go

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"reflect"
88
"runtime"
9+
"strings"
910
"sync"
1011
"sync/atomic"
1112
"testing"
@@ -357,6 +358,177 @@ func TestWorkflowsRegistration(t *testing.T) {
357358
}()
358359
RegisterWorkflow(freshCtx, simpleWorkflow)
359360
})
361+
362+
t.Run("SafeGobRegister", func(t *testing.T) {
363+
// Create a fresh DBOS context for this test
364+
freshCtx := setupDBOS(t, false, true) // Don't reset DB but do check for leaks
365+
366+
// Test 1: Basic type vs pointer conflicts
367+
type TestType struct {
368+
Value string
369+
}
370+
371+
// Register workflows that use the same type to trigger potential gob conflicts
372+
// The safeGobRegister calls within RegisterWorkflow should handle the conflicts
373+
workflow1 := func(ctx DBOSContext, input TestType) (TestType, error) {
374+
return input, nil
375+
}
376+
workflow2 := func(ctx DBOSContext, input *TestType) (*TestType, error) {
377+
return input, nil
378+
}
379+
380+
// Both registrations should succeed despite using conflicting types (T and *T)
381+
RegisterWorkflow(freshCtx, workflow1)
382+
RegisterWorkflow(freshCtx, workflow2)
383+
384+
// Test 2: Multiple workflows with the same types (duplicate registrations)
385+
workflow3 := func(ctx DBOSContext, input TestType) (TestType, error) {
386+
return TestType{Value: input.Value + "-modified"}, nil
387+
}
388+
workflow4 := func(ctx DBOSContext, input TestType) (TestType, error) {
389+
return TestType{Value: input.Value + "-another"}, nil
390+
}
391+
392+
// These should succeed even though TestType is already registered
393+
RegisterWorkflow(freshCtx, workflow3)
394+
RegisterWorkflow(freshCtx, workflow4)
395+
396+
// Test 3: Nested structs
397+
type InnerType struct {
398+
ID int
399+
}
400+
type OuterType struct {
401+
Inner InnerType
402+
Name string
403+
}
404+
405+
workflow5 := func(ctx DBOSContext, input OuterType) (OuterType, error) {
406+
return input, nil
407+
}
408+
workflow6 := func(ctx DBOSContext, input *OuterType) (*OuterType, error) {
409+
return input, nil
410+
}
411+
412+
RegisterWorkflow(freshCtx, workflow5)
413+
RegisterWorkflow(freshCtx, workflow6)
414+
415+
// Test 4: Slice and map types
416+
workflow7 := func(ctx DBOSContext, input []TestType) ([]TestType, error) {
417+
return input, nil
418+
}
419+
workflow8 := func(ctx DBOSContext, input []*TestType) ([]*TestType, error) {
420+
return input, nil
421+
}
422+
workflow9 := func(ctx DBOSContext, input map[string]TestType) (map[string]TestType, error) {
423+
return input, nil
424+
}
425+
workflow10 := func(ctx DBOSContext, input map[string]*TestType) (map[string]*TestType, error) {
426+
return input, nil
427+
}
428+
429+
RegisterWorkflow(freshCtx, workflow7)
430+
RegisterWorkflow(freshCtx, workflow8)
431+
RegisterWorkflow(freshCtx, workflow9)
432+
RegisterWorkflow(freshCtx, workflow10)
433+
434+
// Launch and verify the system still works
435+
err := Launch(freshCtx)
436+
require.NoError(t, err, "failed to launch DBOS after gob conflict handling")
437+
defer Shutdown(freshCtx, 10*time.Second)
438+
439+
// Test all registered workflows to ensure they work correctly
440+
441+
// Run workflow1 with value type
442+
testValue := TestType{Value: "test"}
443+
handle1, err := RunWorkflow(freshCtx, workflow1, testValue)
444+
require.NoError(t, err, "failed to run workflow1")
445+
result1, err := handle1.GetResult()
446+
require.NoError(t, err, "failed to get result from workflow1")
447+
assert.Equal(t, testValue, result1, "unexpected result from workflow1")
448+
449+
// Run workflow2 with pointer type
450+
testPointer := &TestType{Value: "pointer"}
451+
handle2, err := RunWorkflow(freshCtx, workflow2, testPointer)
452+
require.NoError(t, err, "failed to run workflow2")
453+
result2, err := handle2.GetResult()
454+
require.NoError(t, err, "failed to get result from workflow2")
455+
assert.Equal(t, testPointer, result2, "unexpected result from workflow2")
456+
457+
// Run workflow3 with modified output
458+
handle3, err := RunWorkflow(freshCtx, workflow3, testValue)
459+
require.NoError(t, err, "failed to run workflow3")
460+
result3, err := handle3.GetResult()
461+
require.NoError(t, err, "failed to get result from workflow3")
462+
assert.Equal(t, TestType{Value: "test-modified"}, result3, "unexpected result from workflow3")
463+
464+
// Run workflow5 with nested struct
465+
testOuter := OuterType{Inner: InnerType{ID: 42}, Name: "test"}
466+
handle5, err := RunWorkflow(freshCtx, workflow5, testOuter)
467+
require.NoError(t, err, "failed to run workflow5")
468+
result5, err := handle5.GetResult()
469+
require.NoError(t, err, "failed to get result from workflow5")
470+
assert.Equal(t, testOuter, result5, "unexpected result from workflow5")
471+
472+
// Run workflow6 with nested struct pointer
473+
testOuterPtr := &OuterType{Inner: InnerType{ID: 43}, Name: "test-ptr"}
474+
handle6, err := RunWorkflow(freshCtx, workflow6, testOuterPtr)
475+
require.NoError(t, err, "failed to run workflow6")
476+
result6, err := handle6.GetResult()
477+
require.NoError(t, err, "failed to get result from workflow6")
478+
assert.Equal(t, testOuterPtr, result6, "unexpected result from workflow6")
479+
480+
// Run workflow7 with slice type
481+
testSlice := []TestType{{Value: "a"}, {Value: "b"}}
482+
handle7, err := RunWorkflow(freshCtx, workflow7, testSlice)
483+
require.NoError(t, err, "failed to run workflow7")
484+
result7, err := handle7.GetResult()
485+
require.NoError(t, err, "failed to get result from workflow7")
486+
assert.Equal(t, testSlice, result7, "unexpected result from workflow7")
487+
488+
// Run workflow8 with pointer slice type
489+
testPtrSlice := []*TestType{{Value: "a"}, {Value: "b"}}
490+
handle8, err := RunWorkflow(freshCtx, workflow8, testPtrSlice)
491+
require.NoError(t, err, "failed to run workflow8")
492+
result8, err := handle8.GetResult()
493+
require.NoError(t, err, "failed to get result from workflow8")
494+
assert.Equal(t, testPtrSlice, result8, "unexpected result from workflow8")
495+
496+
// Run workflow9 with map type
497+
testMap := map[string]TestType{"key1": {Value: "value1"}}
498+
handle9, err := RunWorkflow(freshCtx, workflow9, testMap)
499+
require.NoError(t, err, "failed to run workflow9")
500+
result9, err := handle9.GetResult()
501+
require.NoError(t, err, "failed to get result from workflow9")
502+
assert.Equal(t, testMap, result9, "unexpected result from workflow9")
503+
504+
// Run workflow10 with pointer map type
505+
testPtrMap := map[string]*TestType{"key1": {Value: "value1"}}
506+
handle10, err := RunWorkflow(freshCtx, workflow10, testPtrMap)
507+
require.NoError(t, err, "failed to run workflow10")
508+
result10, err := handle10.GetResult()
509+
require.NoError(t, err, "failed to get result from workflow10")
510+
assert.Equal(t, testPtrMap, result10, "unexpected result from workflow10")
511+
512+
t.Run("validPanic", func(t *testing.T) {
513+
// Verify that non-duplicate registration panics are still propagated
514+
workflow11 := func(ctx DBOSContext, input any) (any, error) {
515+
return input, nil
516+
}
517+
518+
// This should panic during registration because interface{} creates a nil value
519+
// which gob.Register cannot handle
520+
defer func() {
521+
r := recover()
522+
require.NotNil(t, r, "expected panic from interface{} registration but got none")
523+
// Verify it's not a duplicate registration error (which would be caught)
524+
if errStr, ok := r.(string); ok {
525+
assert.False(t, strings.Contains(errStr, "gob: registering duplicate"),
526+
"panic should not be a duplicate registration error, got: %v", r)
527+
}
528+
}()
529+
RegisterWorkflow(freshCtx, workflow11) // This should panic
530+
})
531+
})
360532
}
361533

362534
func stepWithinAStep(ctx context.Context) (string, error) {

0 commit comments

Comments
 (0)