Skip to content

Commit f032c7e

Browse files
committed
wip
1 parent 58df600 commit f032c7e

File tree

8 files changed

+816
-198
lines changed

8 files changed

+816
-198
lines changed

dbos/client.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -242,15 +242,19 @@ func Enqueue[P any, R any](c Client, queueName, workflowName string, input P, op
242242

243243
// Register the input and outputs for gob encoding
244244
var logger *slog.Logger
245+
var serializer Serializer
245246
if cl, ok := c.(*client); ok {
246247
if ctx, ok := cl.dbosCtx.(*dbosContext); ok {
247248
logger = ctx.logger
249+
serializer = ctx.serializer
248250
}
249251
}
250252
var typedInput P
251-
safeGobRegister(typedInput, logger)
252253
var typedOutput R
253-
safeGobRegister(typedOutput, logger)
254+
if isGobSerializer(serializer) {
255+
safeGobRegister(typedInput, logger)
256+
safeGobRegister(typedOutput, logger)
257+
}
254258

255259
// Call the interface method with the same signature
256260
handle, err := c.Enqueue(queueName, workflowName, input, opts...)

dbos/dbos.go

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func processConfig(inputConfig *Config) (*Config, error) {
7878
dbosConfig.DatabaseSchema = _DEFAULT_SYSTEM_DB_SCHEMA
7979
}
8080
if dbosConfig.Serializer == nil {
81-
dbosConfig.Serializer = NewJSONSerializer()
81+
dbosConfig.Serializer = NewGobSerializer()
8282
}
8383

8484
// Override with environment variables if set
@@ -213,6 +213,7 @@ func WithValue(ctx DBOSContext, key, val any) DBOSContext {
213213
applicationVersion: dbosCtx.applicationVersion,
214214
executorID: dbosCtx.executorID,
215215
applicationID: dbosCtx.applicationID,
216+
serializer: dbosCtx.serializer,
216217
}
217218
childCtx.launched.Store(launched)
218219
return childCtx
@@ -241,6 +242,7 @@ func WithoutCancel(ctx DBOSContext) DBOSContext {
241242
applicationVersion: dbosCtx.applicationVersion,
242243
executorID: dbosCtx.executorID,
243244
applicationID: dbosCtx.applicationID,
245+
serializer: dbosCtx.serializer,
244246
}
245247
childCtx.launched.Store(launched)
246248
return childCtx
@@ -268,6 +270,7 @@ func WithTimeout(ctx DBOSContext, timeout time.Duration) (DBOSContext, context.C
268270
applicationVersion: dbosCtx.applicationVersion,
269271
executorID: dbosCtx.executorID,
270272
applicationID: dbosCtx.applicationID,
273+
serializer: dbosCtx.serializer,
271274
}
272275
childCtx.launched.Store(launched)
273276
return childCtx, cancelFunc
@@ -362,19 +365,21 @@ func NewDBOSContext(ctx context.Context, inputConfig Config) (DBOSContext, error
362365
initExecutor.logger = config.Logger
363366
initExecutor.logger.Info("Initializing DBOS context", "app_name", config.AppName, "dbos_version", getDBOSVersion())
364367

365-
// Register types we serialize with gob
366-
var t time.Time
367-
safeGobRegister(t, initExecutor.logger)
368-
var ws []WorkflowStatus
369-
safeGobRegister(ws, initExecutor.logger)
370-
var si []StepInfo
371-
safeGobRegister(si, initExecutor.logger)
372-
373368
// Initialize global variables from processed config (already handles env vars and defaults)
374369
initExecutor.applicationVersion = config.ApplicationVersion
375370
initExecutor.executorID = config.ExecutorID
376371
initExecutor.serializer = config.Serializer
377372

373+
// Register types we serialize with gob (only if using GobSerializer)
374+
if isGobSerializer(initExecutor.serializer) {
375+
var t time.Time
376+
safeGobRegister(t, initExecutor.logger)
377+
var ws []WorkflowStatus
378+
safeGobRegister(ws, initExecutor.logger)
379+
var si []StepInfo
380+
safeGobRegister(si, initExecutor.logger)
381+
}
382+
378383
initExecutor.applicationID = os.Getenv("DBOS__APPID")
379384

380385
newSystemDatabaseInputs := newSystemDatabaseInput{

dbos/errors.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,15 @@ func (e *DBOSError) Unwrap() error {
5555
return e.wrappedErr
5656
}
5757

58+
func (e *DBOSError) Is(target error) bool {
59+
t, ok := target.(*DBOSError)
60+
if !ok {
61+
return false
62+
}
63+
// Match if codes are equal (and target code is set)
64+
return t.Code != 0 && e.Code == t.Code
65+
}
66+
5867
func newConflictingWorkflowError(workflowID, message string) *DBOSError {
5968
msg := fmt.Sprintf("Conflicting workflow invocation with the same ID (%s)", workflowID)
6069
if message != "" {

dbos/recovery.go

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,5 @@
11
package dbos
22

3-
import (
4-
"strings"
5-
)
6-
73
func recoverPendingWorkflows(ctx *dbosContext, executorIDs []string) ([]WorkflowHandle[any], error) {
84
workflowHandles := make([]WorkflowHandle[any], 0)
95
// List pending workflows for the executors
@@ -18,13 +14,6 @@ func recoverPendingWorkflows(ctx *dbosContext, executorIDs []string) ([]Workflow
1814
}
1915

2016
for _, workflow := range pendingWorkflows {
21-
if inputStr, ok := workflow.Input.(string); ok {
22-
if strings.Contains(inputStr, "Failed to decode") {
23-
ctx.logger.Warn("Skipping workflow recovery due to input decoding failure", "workflow_id", workflow.ID, "name", workflow.Name)
24-
continue
25-
}
26-
}
27-
2817
if workflow.QueueName != "" {
2918
cleared, err := ctx.systemDB.clearQueueAssignment(ctx, workflow.ID)
3019
if err != nil {

dbos/serialization.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ func (g *GobSerializer) Encode(data any) (string, error) {
3333
var inputBytes []byte
3434
if !isNilValue(data) {
3535
// Register the type with gob for proper serialization
36+
// This is useful when dealing with interface types that didn't have a concrete value at workflow registration time
3637
safeGobRegister(data, nil)
3738

3839
var buf bytes.Buffer
@@ -116,6 +117,42 @@ func serialize(ctx DBOSContext, data any) (string, error) {
116117
return dbosCtx.serializer.Encode(data)
117118
}
118119

120+
// isJSONSerializer checks if the given serializer is a JSONSerializer
121+
func isJSONSerializer(s Serializer) bool {
122+
_, ok := s.(*JSONSerializer)
123+
return ok
124+
}
125+
126+
// isGobSerializer checks if the given serializer is a GobSerializer
127+
func isGobSerializer(s Serializer) bool {
128+
_, ok := s.(*GobSerializer)
129+
return ok
130+
}
131+
132+
// convertJSONToType converts a JSON-decoded value (map[string]interface{}) to type T
133+
// via marshal/unmarshal round-trip.
134+
//
135+
// This is needed because JSON deserialization loses type information when decoding
136+
// into `any` - it converts structs to map[string]interface{}, numbers to float64, etc.
137+
// By re-marshaling and unmarshaling into a typed target, we (mostly) restore the original structure.
138+
func convertJSONToType[T any](value any) (T, error) {
139+
if value == nil {
140+
return *new(T), nil
141+
}
142+
143+
jsonBytes, err := json.Marshal(value)
144+
if err != nil {
145+
return *new(T), fmt.Errorf("marshaling for type conversion: %w", err)
146+
}
147+
148+
var typedResult T
149+
if err := json.Unmarshal(jsonBytes, &typedResult); err != nil {
150+
return *new(T), fmt.Errorf("unmarshaling for type conversion: %w", err)
151+
}
152+
153+
return typedResult, nil
154+
}
155+
119156
// Handle cases where the provided data interface wraps a nil value (e.g., var p *int; data := any(p). data != nil but the underlying value is nil)
120157
func isNilValue(data any) bool {
121158
if data == nil {

0 commit comments

Comments
 (0)