Skip to content

Commit f893361

Browse files
authored
feat: support propagation of OpenTelemetry context from clients (#568)
1 parent e43c79b commit f893361

File tree

11 files changed

+358
-9
lines changed

11 files changed

+358
-9
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
steampipe_postgres_fdw.h
88
# generated C imports
99
0_prebuild.go
10+
prebuild.go
1011

1112
# Binaries for programs and plugins
1213
*.exe
@@ -27,4 +28,4 @@ build-*/
2728
# intermediate files from clang
2829
*.bc
2930
# work directory created by the standalone fdw building
30-
/work
31+
/work

fdw.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"net"
2020
"net/http"
2121
"os"
22+
"strings"
2223
"time"
2324
"unsafe"
2425

@@ -172,6 +173,30 @@ func goFdwGetRelSize(state *C.FdwPlanState, root *C.PlannerInfo, rows *C.double,
172173

173174
tableOpts := GetFTableOptions(types.Oid(state.foreigntableid))
174175

176+
// Extract trace context if available
177+
var traceContext string
178+
if state.trace_context_string != nil {
179+
traceContext = C.GoString(state.trace_context_string)
180+
log.Printf("[TRACE] Extracted trace context from session: %s", traceContext)
181+
182+
if len(traceContext) > 0 {
183+
log.Printf("[DEBUG] Trace context length: %d characters", len(traceContext))
184+
if strings.Contains(traceContext, "traceparent=") {
185+
log.Printf("[DEBUG] Trace context contains traceparent field")
186+
} else {
187+
log.Printf("[WARN] Trace context missing traceparent field - may be malformed")
188+
}
189+
}
190+
} else {
191+
log.Printf("[DEBUG] No trace context found in session variables")
192+
}
193+
194+
// Add trace context to options for hub layer
195+
if traceContext != "" {
196+
tableOpts["trace_context"] = traceContext
197+
log.Printf("[DEBUG] Added trace context to table options")
198+
}
199+
175200
// build columns
176201
var columns []string
177202
if state.target_list != nil {
@@ -296,6 +321,21 @@ func goFdwBeginForeignScan(node *C.ForeignScanState, eflags C.int) {
296321
plan := (*C.ForeignScan)(unsafe.Pointer(node.ss.ps.plan))
297322
var execState *C.FdwExecState = C.initializeExecState(unsafe.Pointer(plan.fdw_private))
298323

324+
// Extract trace context from session variables for scan operation
325+
var traceContext string
326+
if traceContextPtr := C.getTraceContextFromSession(); traceContextPtr != nil {
327+
traceContext = C.GoString(traceContextPtr)
328+
log.Printf("[TRACE] Extracted trace context from session for scan: %s", traceContext)
329+
} else {
330+
log.Printf("[DEBUG] No trace context found in session variables for scan")
331+
}
332+
333+
// Add trace context to options for hub layer
334+
if traceContext != "" {
335+
opts["trace_context"] = traceContext
336+
log.Printf("[DEBUG] Added trace context to scan options")
337+
}
338+
299339
log.Printf("[INFO] goFdwBeginForeignScan, canPushdownAllSortFields %v", execState.canPushdownAllSortFields)
300340
var columns []string
301341
if execState.target_list != nil {

fdw/common.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ typedef struct FdwPlanState
6767
int width;
6868
// the number of rows to return (limit+offset). -1 means no limit
6969
int limit;
70+
// OpenTelemetry trace context extracted from session variables
71+
char *trace_context_string;
7072

7173
} FdwPlanState;
7274

@@ -133,4 +135,4 @@ List *deserializeDeparsedSortGroup(List *items);
133135
OpExpr *canonicalOpExpr(OpExpr *opExpr, Relids base_relids);
134136
ScalarArrayOpExpr *canonicalScalarArrayOpExpr(ScalarArrayOpExpr *opExpr, Relids base_relids);
135137
char *getOperatorString(Oid opoid);
136-
#endif // FDW_COMMON_H
138+
#endif // FDW_COMMON_H

fdw/fdw.c

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
#include "fdw_handlers.h"
55
#include "nodes/plannodes.h"
66
#include "access/xact.h"
7+
#include "utils/guc.h"
8+
#include "utils/builtins.h"
79

810
extern PGDLLEXPORT void _PG_init(void);
911

@@ -92,6 +94,40 @@ static bool fdwIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel, Ran
9294
return getenv("STEAMPIPE_FDW_PARALLEL_SAFE") != NULL;
9395
}
9496

97+
/*
98+
* Extract OpenTelemetry trace context from PostgreSQL session variables
99+
* Returns a formatted string containing traceparent and tracestate, or NULL if not set
100+
*/
101+
static char *extractTraceContextFromSession(void)
102+
{
103+
const char *traceparent = GetConfigOption("steampipe.traceparent", true, false);
104+
const char *tracestate = GetConfigOption("steampipe.tracestate", true, false);
105+
char *result = NULL;
106+
107+
// Format the result string for Go layer consumption
108+
if (traceparent != NULL) {
109+
if (tracestate != NULL) {
110+
result = psprintf("traceparent=%s;tracestate=%s", traceparent, tracestate);
111+
} else {
112+
result = psprintf("traceparent=%s", traceparent);
113+
}
114+
115+
elog(DEBUG1, "extracted trace context: %s", result);
116+
} else {
117+
elog(DEBUG2, "no trace context found in session variables");
118+
}
119+
120+
return result;
121+
}
122+
123+
/*
124+
* Public wrapper for extractTraceContextFromSession - callable from Go
125+
*/
126+
char *getTraceContextFromSession(void)
127+
{
128+
return extractTraceContextFromSession();
129+
}
130+
95131
static void fdwGetForeignRelSize(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid)
96132
{
97133
FdwPlanState *planstate;
@@ -111,6 +147,16 @@ static void fdwGetForeignRelSize(PlannerInfo *root, RelOptInfo *baserel, Oid for
111147
// Save plan state information
112148
baserel->fdw_private = planstate;
113149
planstate->foreigntableid = foreigntableid;
150+
151+
// Extract trace context from session variables
152+
char *traceContext = extractTraceContextFromSession();
153+
if (traceContext != NULL) {
154+
planstate->trace_context_string = pstrdup(traceContext);
155+
pfree(traceContext);
156+
elog(DEBUG1, "stored trace context in plan state");
157+
} else {
158+
planstate->trace_context_string = NULL;
159+
}
114160

115161
// Initialize the conversion info array
116162
{

fdw/fdw_helpers.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,4 +105,7 @@ static inline char *nameStr(Name n) { return NameStr(*n); }
105105

106106

107107
// logging
108-
char *tagTypeToString(NodeTag type);
108+
char *tagTypeToString(NodeTag type);
109+
110+
// trace context
111+
char *getTraceContextFromSession(void);

hub/hub_base.go

Lines changed: 87 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ import (
1818
"go.opentelemetry.io/otel"
1919
"go.opentelemetry.io/otel/attribute"
2020
"go.opentelemetry.io/otel/metric"
21+
"go.opentelemetry.io/otel/propagation"
22+
"go.opentelemetry.io/otel/trace"
2123
)
2224

2325
type hubBase struct {
@@ -372,8 +374,31 @@ func (h *hubBase) executeCommandScan(connectionName, table string, queryTimestam
372374
}
373375
}
374376

375-
func (h *hubBase) traceContextForScan(table string, columns []string, limit int64, qualMap map[string]*proto.Quals, connectionName string) *telemetry.TraceCtx {
376-
ctx, span := telemetry.StartSpan(context.Background(), FdwName, "RemoteHub.Scan (%s)", table)
377+
func (h *hubBase) traceContextForScan(table string, columns []string, limit int64, qualMap map[string]*proto.Quals, connectionName string, opts types.Options) *telemetry.TraceCtx {
378+
var baseCtx context.Context = context.Background()
379+
380+
// Check if we have trace context from session variables
381+
if traceContextStr, exists := opts["trace_context"]; exists && traceContextStr != "" {
382+
log.Printf("[DEBUG] traceContextForScan received trace context: %s", traceContextStr)
383+
if parentCtx := h.parseTraceContext(traceContextStr); parentCtx != nil {
384+
baseCtx = parentCtx
385+
log.Printf("[TRACE] Using parent trace context for scan of table: %s", table)
386+
387+
// Verify the parent context has the expected trace ID
388+
parentSpanCtx := trace.SpanContextFromContext(parentCtx)
389+
if parentSpanCtx.IsValid() {
390+
log.Printf("[DEBUG] Parent context TraceID: %s, SpanID: %s",
391+
parentSpanCtx.TraceID().String(), parentSpanCtx.SpanID().String())
392+
}
393+
} else {
394+
log.Printf("[WARN] Failed to parse trace context for table: %s", table)
395+
}
396+
} else {
397+
log.Printf("[DEBUG] No trace context found in options for table: %s", table)
398+
}
399+
400+
// Create span with potentially propagated context
401+
ctx, span := telemetry.StartSpan(baseCtx, FdwName, "RemoteHub.Scan (%s)", table)
377402
span.SetAttributes(
378403
attribute.StringSlice("columns", columns),
379404
attribute.String("table", table),
@@ -383,9 +408,69 @@ func (h *hubBase) traceContextForScan(table string, columns []string, limit int6
383408
if limit != -1 {
384409
span.SetAttributes(attribute.Int64("limit", limit))
385410
}
411+
412+
spanCtx := span.SpanContext()
413+
if spanCtx.IsValid() {
414+
log.Printf("[DEBUG] Created span for table %s - TraceID: %s, SpanID: %s",
415+
table, spanCtx.TraceID().String(), spanCtx.SpanID().String())
416+
}
417+
386418
return &telemetry.TraceCtx{Ctx: ctx, Span: span}
387419
}
388420

421+
// parseTraceContext parses trace context string from session variables
422+
// Format: "traceparent=00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01;tracestate=rojo=00f067aa0ba902b7"
423+
func (h *hubBase) parseTraceContext(traceContextString string) context.Context {
424+
log.Printf("[DEBUG] parseTraceContext called with: %s", traceContextString)
425+
426+
if traceContextString == "" {
427+
log.Printf("[DEBUG] Empty trace context string")
428+
return nil
429+
}
430+
431+
carrier := propagation.MapCarrier{}
432+
433+
// Parse the trace context string format: "traceparent=..;tracestate=.."
434+
parts := strings.Split(traceContextString, ";")
435+
log.Printf("[DEBUG] Split trace context into %d parts: %v", len(parts), parts)
436+
437+
for _, part := range parts {
438+
if kv := strings.SplitN(part, "=", 2); len(kv) == 2 {
439+
key := strings.TrimSpace(kv[0])
440+
value := strings.TrimSpace(kv[1])
441+
carrier[key] = value
442+
log.Printf("[DEBUG] Added to carrier: %s = %s", key, value)
443+
} else {
444+
log.Printf("[DEBUG] Skipping invalid part: %s", part)
445+
}
446+
}
447+
448+
log.Printf("[DEBUG] Final carrier contents: %v", carrier)
449+
450+
if len(carrier) == 0 {
451+
log.Printf("[WARN] No valid trace context found in: %s", traceContextString)
452+
return nil
453+
}
454+
455+
// Use OpenTelemetry propagator to extract context
456+
propagator := propagation.NewCompositeTextMapPropagator(
457+
propagation.TraceContext{},
458+
propagation.Baggage{},
459+
)
460+
extractedCtx := propagator.Extract(context.Background(), carrier)
461+
462+
// Verify we actually got a valid span context
463+
spanCtx := trace.SpanContextFromContext(extractedCtx)
464+
if spanCtx.IsValid() {
465+
log.Printf("[TRACE] Successfully extracted trace context - TraceID: %s, SpanID: %s",
466+
spanCtx.TraceID().String(), spanCtx.SpanID().String())
467+
return extractedCtx
468+
}
469+
470+
log.Printf("[WARN] Extracted trace context is not valid - carrier was: %v", carrier)
471+
return nil
472+
}
473+
389474
// determine whether to include the limit, based on the quals
390475
// we ONLY pushdown the limit if all quals have corresponding key columns,
391476
// and if the qual operator is supported by the key column

hub/hub_local.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ func (l *HubLocal) GetIterator(columns []string, quals *proto.Quals, unhandledRe
128128
}
129129

130130
// create a span for this scan
131-
scanTraceCtx := l.traceContextForScan(table, columns, limit, qualMap, connectionName)
131+
scanTraceCtx := l.traceContextForScan(table, columns, limit, qualMap, connectionName, opts)
132132
iterator, err := l.startScanForConnection(connectionName, table, qualMap, unhandledRestrictions, columns, limit, sortOrder, queryTimestamp, scanTraceCtx)
133133

134134
if err != nil {

hub/hub_remote.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ func (h *RemoteHub) GetIterator(columns []string, quals *proto.Quals, unhandledR
102102
}
103103

104104
// create a span for this scan
105-
scanTraceCtx := h.traceContextForScan(table, columns, limit, qualMap, connectionName)
105+
scanTraceCtx := h.traceContextForScan(table, columns, limit, qualMap, connectionName, opts)
106106
iterator, err := h.startScanForConnection(connectionName, table, qualMap, unhandledRestrictions, columns, limit, sortOrder, queryTimestamp, scanTraceCtx)
107107

108108
if err != nil {

hub/scan_iterator_base.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/turbot/steampipe-plugin-sdk/v5/telemetry"
1717
"github.com/turbot/steampipe-postgres-fdw/v2/types"
1818
"github.com/turbot/steampipe/v2/pkg/query/queryresult"
19+
"go.opentelemetry.io/otel/trace"
1920
"google.golang.org/protobuf/reflect/protoreflect"
2021
)
2122

@@ -199,13 +200,25 @@ func (i *scanIteratorBase) GetScanMetadata() []queryresult.ScanMetadataRow {
199200
}
200201

201202
func (i *scanIteratorBase) newExecuteRequest() *proto.ExecuteRequest {
203+
traceCarrier := grpc.CreateCarrierFromContext(i.traceCtx.Ctx)
204+
log.Printf("[DEBUG] newExecuteRequest creating trace carrier for table %s: %v", i.table, traceCarrier)
205+
206+
// Validate span context from the trace context
207+
spanCtx := trace.SpanContextFromContext(i.traceCtx.Ctx)
208+
if spanCtx.IsValid() {
209+
log.Printf("[DEBUG] newExecuteRequest has valid span context - TraceID: %s, SpanID: %s",
210+
spanCtx.TraceID().String(), spanCtx.SpanID().String())
211+
} else {
212+
log.Printf("[WARN] newExecuteRequest has invalid span context for table %s", i.table)
213+
}
214+
202215
req := &proto.ExecuteRequest{
203216
Table: i.table,
204217
QueryContext: i.queryContext,
205218
CallId: i.callId,
206219
// pass connection name - used for aggregators
207220
Connection: i.connectionName,
208-
TraceContext: grpc.CreateCarrierFromContext(i.traceCtx.Ctx),
221+
TraceContext: traceCarrier,
209222
ExecuteConnectionData: make(map[string]*proto.ExecuteConnectionData),
210223
}
211224

0 commit comments

Comments
 (0)