From cb8472b2126113f5d01b722543360caa03f2eff6 Mon Sep 17 00:00:00 2001 From: quinna-h Date: Wed, 5 Nov 2025 16:40:06 -0500 Subject: [PATCH 01/12] add span pointer attributes --- ddtrace/_trace/trace_handlers.py | 198 ++++++++++++++++-- tests/contrib/fastapi/test_fastapi.py | 12 +- ...pi.test_websocket_context_propagation.json | 122 ++++++----- 3 files changed, 255 insertions(+), 77 deletions(-) diff --git a/ddtrace/_trace/trace_handlers.py b/ddtrace/_trace/trace_handlers.py index 45dba9128cf..e362939345b 100644 --- a/ddtrace/_trace/trace_handlers.py +++ b/ddtrace/_trace/trace_handlers.py @@ -16,6 +16,7 @@ from ddtrace import config from ddtrace._trace._inferred_proxy import create_inferred_proxy_span_if_headers_exist from ddtrace._trace._span_pointer import _SpanPointerDescription +from ddtrace._trace._span_pointer import _SpanPointerDirection from ddtrace._trace.span import Span from ddtrace._trace.utils import extract_DD_context_from_messages from ddtrace.constants import _SPAN_MEASURED_KEY @@ -992,6 +993,70 @@ def _set_client_ip_tags(scope: Mapping[str, Any], span: Span): log.debug("Could not validate client IP address for websocket send message: %s", str(e)) +def _init_websocket_message_counters(scope: Mapping[str, Any]) -> None: + if "datadog" not in scope: + scope["datadog"] = {} # type: ignore[index] + if "websocket_receive_counter" not in scope["datadog"]: + scope["datadog"]["websocket_receive_counter"] = 0 # type: ignore[index] + if "websocket_send_counter" not in scope["datadog"]: + scope["datadog"]["websocket_send_counter"] = 0 # type: ignore[index] + + +def _increment_websocket_counter(scope: Mapping[str, Any], counter_type: str) -> int: + """ + Increment and return websocket message counter (either websocket_receive_counter or websocket_send_counter) + """ + scope["datadog"][counter_type] += 1 # type: ignore[index] + return scope["datadog"][counter_type] # type: ignore[index,return-value] + + +def _build_websocket_span_pointer_hash( + handshake_trace_id: int, + handshake_span_id: int, + counter: int, + is_server: bool, + is_incoming: bool, +) -> str: + """ + Build websocket span pointer hash. + + Format: <128 bit hex trace id><64 bit hex span id><32 bit hex counter> + Prefix: 'S' for server outgoing or client incoming, 'C' for server incoming or client outgoing + + :param handshake_trace_id: Trace ID from the handshake span + :param handshake_span_id: Span ID from the handshake span + :param counter: Message counter value + :param is_server: True if running on server side, False if client + :param is_incoming: True if incoming message, False if outgoing + :return: Span pointer hash string + """ + if (is_server and not is_incoming) or (not is_server and is_incoming): + prefix = "S" + else: + prefix = "C" + + trace_id_hex = f"{handshake_trace_id:032x}" + span_id_hex = f"{handshake_span_id:016x}" + counter_hex = f"{counter:08x}" + + return f"{prefix}{trace_id_hex}{span_id_hex}{counter_hex}" + + +def _has_distributed_tracing_context(span: Span) -> bool: + """ + Check if the handshake span has extracted distributed tracing context. + + A websocket server must not set the span pointer if the handshake has not extracted a context + + A span has distributed tracing context if it has a parent context that was + extracted from headers (_is_remote=True). + """ + if not span or not span._parent_context: + return False + # Check if the context was extracted from remote headers + return span._parent_context._is_remote + + def _on_asgi_websocket_receive_message(ctx, scope, message): """ Handle websocket receive message events. @@ -1011,16 +1076,38 @@ def _on_asgi_websocket_receive_message(ctx, scope, message): span.set_metric(websocket.MESSAGE_FRAMES, 1) if hasattr(ctx, "parent") and ctx.parent.span: + handshake_span = ctx.parent.span + + # Add span pointer attributes if distributed tracing is enabled and context was extracted + if integration_config.distributed_tracing and _has_distributed_tracing_context(handshake_span): + counter = _increment_websocket_counter(scope, "websocket_receive_counter") + + ptr_hash = _build_websocket_span_pointer_hash( + handshake_trace_id=handshake_span.trace_id, + handshake_span_id=handshake_span.span_id, + counter=counter, + is_server=True, + is_incoming=True, + ) + + link_attributes = { + "link.name": "span-pointer-up", + "dd.kind": "span-pointer", + "ptr.kind": "websocket", + "ptr.dir": _SpanPointerDirection.UPSTREAM, + "ptr.hash": ptr_hash, + } + span.set_link( - trace_id=ctx.parent.span.trace_id, - span_id=ctx.parent.span.span_id, - attributes={SPAN_LINK_KIND: SpanLinkKind.EXECUTED}, + trace_id=handshake_span.trace_id, + span_id=handshake_span.span_id, + attributes=link_attributes, ) if getattr(integration_config, "asgi_websocket_messages_inherit_sampling", True): - _inherit_sampling_tags(span, ctx.parent.span._local_root) + _inherit_sampling_tags(span, handshake_span._local_root) - _copy_trace_level_tags(span, ctx.parent.span) + _copy_trace_level_tags(span, handshake_span) def _on_asgi_websocket_send_message(ctx, scope, message): @@ -1041,10 +1128,32 @@ def _on_asgi_websocket_send_message(ctx, scope, message): span.set_metric(websocket.MESSAGE_FRAMES, 1) if hasattr(ctx, "parent") and ctx.parent.span: + handshake_span = ctx.parent.span + + # Add span pointer attributes if distributed tracing is enabled and context was extracted + if integration_config.distributed_tracing and _has_distributed_tracing_context(handshake_span): + counter = _increment_websocket_counter(scope, "websocket_send_counter") + + ptr_hash = _build_websocket_span_pointer_hash( + handshake_trace_id=handshake_span.trace_id, + handshake_span_id=handshake_span.span_id, + counter=counter, + is_server=True, + is_incoming=False, + ) + + link_attributes = { + "link.name": "span-pointer-down", + "dd.kind": "span-pointer", + "ptr.kind": "websocket", + "ptr.dir": _SpanPointerDirection.DOWNSTREAM, + "ptr.hash": ptr_hash, + } + span.set_link( - trace_id=ctx.parent.span.trace_id, - span_id=ctx.parent.span.span_id, - attributes={SPAN_LINK_KIND: SpanLinkKind.RESUMING}, + trace_id=handshake_span.trace_id, + span_id=handshake_span.span_id, + attributes=link_attributes, ) @@ -1068,13 +1177,38 @@ def _on_asgi_websocket_close_message(ctx, scope, message): _set_websocket_close_tags(span, message) if hasattr(ctx, "parent") and ctx.parent.span: + handshake_span = ctx.parent.span + + # Build span link attributes + link_attributes = {SPAN_LINK_KIND: SpanLinkKind.RESUMING} + + # Add span pointer attributes if distributed tracing is enabled and context was extracted + if integration_config.distributed_tracing and _has_distributed_tracing_context(handshake_span): + counter = _increment_websocket_counter(scope, "websocket_send_counter") + + ptr_hash = _build_websocket_span_pointer_hash( + handshake_trace_id=handshake_span.trace_id, + handshake_span_id=handshake_span.span_id, + counter=counter, + is_server=True, + is_incoming=False, + ) + + link_attributes = { + "link.name": "span-pointer-down", + "dd.kind": "span-pointer", + "ptr.kind": "websocket", + "ptr.dir": _SpanPointerDirection.DOWNSTREAM, + "ptr.hash": ptr_hash, + } + span.set_link( - trace_id=ctx.parent.span.trace_id, - span_id=ctx.parent.span.span_id, - attributes={SPAN_LINK_KIND: SpanLinkKind.RESUMING}, + trace_id=handshake_span.trace_id, + span_id=handshake_span.span_id, + attributes=link_attributes, ) - _copy_trace_level_tags(span, ctx.parent.span) + _copy_trace_level_tags(span, handshake_span) def _on_asgi_websocket_disconnect_message(ctx, scope, message): @@ -1093,16 +1227,38 @@ def _on_asgi_websocket_disconnect_message(ctx, scope, message): _set_websocket_close_tags(span, message) if hasattr(ctx, "parent") and ctx.parent.span: + handshake_span = ctx.parent.span + + # Add span pointer attributes if distributed tracing is enabled and context was extracted + if integration_config.distributed_tracing and _has_distributed_tracing_context(handshake_span): + counter = _increment_websocket_counter(scope, "websocket_receive_counter") + + ptr_hash = _build_websocket_span_pointer_hash( + handshake_trace_id=handshake_span.trace_id, + handshake_span_id=handshake_span.span_id, + counter=counter, + is_server=True, + is_incoming=True, + ) + + link_attributes = { + "link.name": "span-pointer-up", + "dd.kind": "span-pointer", + "ptr.kind": "websocket", + "ptr.dir": _SpanPointerDirection.UPSTREAM, + "ptr.hash": ptr_hash, + } + span.set_link( - trace_id=ctx.parent_span.trace_id, - span_id=ctx.parent_span.span_id, - attributes={SPAN_LINK_KIND: SpanLinkKind.EXECUTED}, + trace_id=handshake_span.trace_id, + span_id=handshake_span.span_id, + attributes=link_attributes, ) if getattr(integration_config, "asgi_websocket_messages_inherit_sampling", True): - _inherit_sampling_tags(span, ctx.parent.span._local_root) + _inherit_sampling_tags(span, handshake_span._local_root) - _copy_trace_level_tags(span, ctx.parent.span) + _copy_trace_level_tags(span, handshake_span) def _on_asgi_request(ctx: core.ExecutionContext) -> None: @@ -1115,14 +1271,16 @@ def _on_asgi_request(ctx: core.ExecutionContext) -> None: span = _start_span(ctx) ctx.set_item("req_span", span) - if scope["type"] == "websocket": - span._set_tag_str("http.upgraded", "websocket") - if "datadog" not in scope: scope["datadog"] = {"request_spans": [span]} else: scope["datadog"]["request_spans"].append(span) + if scope["type"] == "websocket": + span._set_tag_str("http.upgraded", "websocket") + # Initialize websocket message counters for span pointer tracking + _init_websocket_message_counters(scope) + def listen(): core.on("wsgi.request.prepare", _on_request_prepare) diff --git a/tests/contrib/fastapi/test_fastapi.py b/tests/contrib/fastapi/test_fastapi.py index 15c29246314..43065f6012d 100644 --- a/tests/contrib/fastapi/test_fastapi.py +++ b/tests/contrib/fastapi/test_fastapi.py @@ -624,11 +624,11 @@ def _run_websocket_send_only_test(): fastapi_unpatch() -@pytest.mark.subprocess( - env=dict( - DD_TRACE_WEBSOCKET_MESSAGES_ENABLED="true", - ) -) +# @pytest.mark.subprocess( +# env=dict( +# DD_TRACE_WEBSOCKET_MESSAGES_ENABLED="true", +# ) +# ) @snapshot(ignores=["meta._dd.span_links", "metrics.websocket.message.length"]) def test_traced_websocket(test_spans, snapshot_app): from tests.contrib.fastapi.test_fastapi import _run_websocket_test @@ -740,7 +740,7 @@ def _run_websocket_context_propagation_test(): fastapi_unpatch() -@pytest.mark.subprocess(env=dict(DD_TRACE_WEBSOCKET_MESSAGES_ENABLED="true")) +# @pytest.mark.subprocess(env=dict(DD_TRACE_WEBSOCKET_MESSAGES_ENABLED="true")) @snapshot(ignores=["meta._dd.span_links", "metrics.websocket.message.length"]) def test_websocket_context_propagation(snapshot_app): """Test trace context propagation.""" diff --git a/tests/snapshots/tests.contrib.fastapi.test_fastapi.test_websocket_context_propagation.json b/tests/snapshots/tests.contrib.fastapi.test_fastapi.test_websocket_context_propagation.json index ed2565cfa38..5a6fca2600c 100644 --- a/tests/snapshots/tests.contrib.fastapi.test_fastapi.test_websocket_context_propagation.json +++ b/tests/snapshots/tests.contrib.fastapi.test_fastapi.test_websocket_context_propagation.json @@ -11,18 +11,18 @@ "meta": { "_dd.base_service": "tests.contrib.fastapi", "_dd.p.dm": "-0", - "_dd.p.tid": "68a755dc00000000", + "_dd.p.tid": "690baf1300000000", "language": "python", - "runtime-id": "a5cd0b3a0a68429286fa9b33d92eec5b" + "runtime-id": "b7b5d96e21fd459b984afa1d3e4696b4" }, "metrics": { "_dd.top_level": 1, "_dd.tracer_kr": 1.0, "_sampling_priority_v1": 1, - "process_id": 49501 + "process_id": 29721 }, - "duration": 31000, - "start": 1755796956135042000 + "duration": 34000, + "start": 1762373395708160000 }], [ { @@ -46,17 +46,17 @@ "http.url": "ws://testserver/ws", "http.useragent": "testclient", "language": "python", - "runtime-id": "a5cd0b3a0a68429286fa9b33d92eec5b", + "runtime-id": "b7b5d96e21fd459b984afa1d3e4696b4", "span.kind": "server" }, "metrics": { "_dd.top_level": 1, "_dd.tracer_kr": 1.0, "_sampling_priority_v1": 1, - "process_id": 49501 + "process_id": 29721 }, - "duration": 619000, - "start": 1755796956134720000 + "duration": 928000, + "start": 1762373395707641000 }, { "name": "websocket.send", @@ -71,7 +71,7 @@ "_dd.base_service": "tests.contrib.fastapi", "_dd.origin": "rum", "_dd.p.dm": "-0", - "_dd.span_links": "[{\"trace_id\": \"000000000000000000000000075bcd15\", \"span_id\": \"a205584a3b2a0fae\", \"attributes\": {\"dd.kind\": \"resuming\"}}]", + "_dd.span_links": "[{\"trace_id\": \"000000000000000000000000075bcd15\", \"span_id\": \"772e70ed55e9996b\", \"attributes\": {\"dd.kind\": \"span-pointer\", \"link.name\": \"span-pointer-down\", \"ptr.kind\": \"websocket\", \"ptr.dir\": \"d\", \"ptr.hash\": \"S000000000000000000000000075bcd15772e70ed55e9996b00000001\"}}]", "baggage.account.id": "456", "baggage.session.id": "789", "baggage.user.id": "123", @@ -88,8 +88,8 @@ "websocket.message.frames": 1, "websocket.message.length": 27 }, - "duration": 106000, - "start": 1755796956135439000 + "duration": 180000, + "start": 1762373395708691000 }], [ { @@ -107,14 +107,14 @@ "_dd.dm.service": "fastapi", "_dd.origin": "rum", "_dd.p.dm": "-0", - "_dd.p.tid": "68a755dc00000000", - "_dd.span_links": "[{\"trace_id\": \"000000000000000000000000075bcd15\", \"span_id\": \"a205584a3b2a0fae\", \"attributes\": {\"dd.kind\": \"executed_by\"}}]", + "_dd.p.tid": "690baf1300000000", + "_dd.span_links": "[{\"trace_id\": \"000000000000000000000000075bcd15\", \"span_id\": \"772e70ed55e9996b\", \"attributes\": {\"dd.kind\": \"span-pointer\", \"link.name\": \"span-pointer-up\", \"ptr.kind\": \"websocket\", \"ptr.dir\": \"u\", \"ptr.hash\": \"C000000000000000000000000075bcd15772e70ed55e9996b00000001\"}}]", "baggage.account.id": "456", "baggage.session.id": "789", "baggage.user.id": "123", "component": "fastapi", "language": "python", - "runtime-id": "a5cd0b3a0a68429286fa9b33d92eec5b", + "runtime-id": "b7b5d96e21fd459b984afa1d3e4696b4", "span.kind": "consumer", "websocket.duration.style": "blocking", "websocket.message.type": "text" @@ -124,12 +124,12 @@ "_dd.top_level": 1, "_dd.tracer_kr": 1.0, "_sampling_priority_v1": 1, - "process_id": 49501, + "process_id": 29721, "websocket.message.frames": 1, "websocket.message.length": 9 }, - "duration": 231000, - "start": 1755796956135665000 + "duration": 150000, + "start": 1762373395709016000 }, { "name": "websocket.send", @@ -142,19 +142,23 @@ "error": 0, "meta": { "_dd.base_service": "tests.contrib.fastapi", - "_dd.span_links": "[{\"trace_id\": \"000000000000000000000000075bcd15\", \"span_id\": \"a205584a3b2a0fae\", \"attributes\": {\"dd.kind\": \"resuming\"}}]", + "_dd.p.tid": "690baf1300000000", + "_dd.span_links": "[{\"trace_id\": \"000000000000000000000000075bcd15\", \"span_id\": \"772e70ed55e9996b\", \"attributes\": {\"dd.kind\": \"span-pointer\", \"link.name\": \"span-pointer-down\", \"ptr.kind\": \"websocket\", \"ptr.dir\": \"d\", \"ptr.hash\": \"S000000000000000000000000075bcd15772e70ed55e9996b00000002\"}}]", "component": "fastapi", + "language": "python", "network.client.ip": "testclient", "out.host": "testclient", "span.kind": "producer", "websocket.message.type": "text" }, "metrics": { + "_dd.tracer_kr": 1.0, + "_sampling_priority_v1": 1, "websocket.message.frames": 1, "websocket.message.length": 6 }, - "duration": 38000, - "start": 1755796956135789000 + "duration": 62000, + "start": 1762373395709259000 }], [ { @@ -172,14 +176,14 @@ "_dd.dm.service": "fastapi", "_dd.origin": "rum", "_dd.p.dm": "-0", - "_dd.p.tid": "68a755dc00000000", - "_dd.span_links": "[{\"trace_id\": \"000000000000000000000000075bcd15\", \"span_id\": \"a205584a3b2a0fae\", \"attributes\": {\"dd.kind\": \"executed_by\"}}]", + "_dd.p.tid": "690baf1300000000", + "_dd.span_links": "[{\"trace_id\": \"000000000000000000000000075bcd15\", \"span_id\": \"772e70ed55e9996b\", \"attributes\": {\"dd.kind\": \"span-pointer\", \"link.name\": \"span-pointer-up\", \"ptr.kind\": \"websocket\", \"ptr.dir\": \"u\", \"ptr.hash\": \"C000000000000000000000000075bcd15772e70ed55e9996b00000002\"}}]", "baggage.account.id": "456", "baggage.session.id": "789", "baggage.user.id": "123", "component": "fastapi", "language": "python", - "runtime-id": "a5cd0b3a0a68429286fa9b33d92eec5b", + "runtime-id": "b7b5d96e21fd459b984afa1d3e4696b4", "span.kind": "consumer", "websocket.duration.style": "blocking", "websocket.message.type": "text" @@ -189,12 +193,12 @@ "_dd.top_level": 1, "_dd.tracer_kr": 1.0, "_sampling_priority_v1": 1, - "process_id": 49501, + "process_id": 29721, "websocket.message.frames": 1, "websocket.message.length": 9 }, - "duration": 270000, - "start": 1755796956135879000 + "duration": 130000, + "start": 1762373395709414000 }, { "name": "websocket.send", @@ -207,19 +211,23 @@ "error": 0, "meta": { "_dd.base_service": "tests.contrib.fastapi", - "_dd.span_links": "[{\"trace_id\": \"000000000000000000000000075bcd15\", \"span_id\": \"a205584a3b2a0fae\", \"attributes\": {\"dd.kind\": \"resuming\"}}]", + "_dd.p.tid": "690baf1300000000", + "_dd.span_links": "[{\"trace_id\": \"000000000000000000000000075bcd15\", \"span_id\": \"772e70ed55e9996b\", \"attributes\": {\"dd.kind\": \"span-pointer\", \"link.name\": \"span-pointer-down\", \"ptr.kind\": \"websocket\", \"ptr.dir\": \"d\", \"ptr.hash\": \"S000000000000000000000000075bcd15772e70ed55e9996b00000003\"}}]", "component": "fastapi", + "language": "python", "network.client.ip": "testclient", "out.host": "testclient", "span.kind": "producer", "websocket.message.type": "text" }, "metrics": { + "_dd.tracer_kr": 1.0, + "_sampling_priority_v1": 1, "websocket.message.frames": 1, "websocket.message.length": 6 }, - "duration": 35000, - "start": 1755796956136049000 + "duration": 62000, + "start": 1762373395709647000 }], [ { @@ -237,14 +245,14 @@ "_dd.dm.service": "fastapi", "_dd.origin": "rum", "_dd.p.dm": "-0", - "_dd.p.tid": "68a755dc00000000", - "_dd.span_links": "[{\"trace_id\": \"000000000000000000000000075bcd15\", \"span_id\": \"a205584a3b2a0fae\", \"attributes\": {\"dd.kind\": \"executed_by\"}}]", + "_dd.p.tid": "690baf1300000000", + "_dd.span_links": "[{\"trace_id\": \"000000000000000000000000075bcd15\", \"span_id\": \"772e70ed55e9996b\", \"attributes\": {\"dd.kind\": \"span-pointer\", \"link.name\": \"span-pointer-up\", \"ptr.kind\": \"websocket\", \"ptr.dir\": \"u\", \"ptr.hash\": \"C000000000000000000000000075bcd15772e70ed55e9996b00000003\"}}]", "baggage.account.id": "456", "baggage.session.id": "789", "baggage.user.id": "123", "component": "fastapi", "language": "python", - "runtime-id": "a5cd0b3a0a68429286fa9b33d92eec5b", + "runtime-id": "b7b5d96e21fd459b984afa1d3e4696b4", "span.kind": "consumer", "websocket.duration.style": "blocking", "websocket.message.type": "text" @@ -254,12 +262,12 @@ "_dd.top_level": 1, "_dd.tracer_kr": 1.0, "_sampling_priority_v1": 1, - "process_id": 49501, + "process_id": 29721, "websocket.message.frames": 1, "websocket.message.length": 9 }, - "duration": 258000, - "start": 1755796956136132000 + "duration": 102000, + "start": 1762373395709803000 }, { "name": "websocket.send", @@ -272,19 +280,23 @@ "error": 0, "meta": { "_dd.base_service": "tests.contrib.fastapi", - "_dd.span_links": "[{\"trace_id\": \"000000000000000000000000075bcd15\", \"span_id\": \"a205584a3b2a0fae\", \"attributes\": {\"dd.kind\": \"resuming\"}}]", + "_dd.p.tid": "690baf1300000000", + "_dd.span_links": "[{\"trace_id\": \"000000000000000000000000075bcd15\", \"span_id\": \"772e70ed55e9996b\", \"attributes\": {\"dd.kind\": \"span-pointer\", \"link.name\": \"span-pointer-down\", \"ptr.kind\": \"websocket\", \"ptr.dir\": \"d\", \"ptr.hash\": \"S000000000000000000000000075bcd15772e70ed55e9996b00000004\"}}]", "component": "fastapi", + "language": "python", "network.client.ip": "testclient", "out.host": "testclient", "span.kind": "producer", "websocket.message.type": "text" }, "metrics": { + "_dd.tracer_kr": 1.0, + "_sampling_priority_v1": 1, "websocket.message.frames": 1, "websocket.message.length": 6 }, - "duration": 34000, - "start": 1755796956136292000 + "duration": 49000, + "start": 1762373395709981000 }], [ { @@ -302,14 +314,14 @@ "_dd.dm.service": "fastapi", "_dd.origin": "rum", "_dd.p.dm": "-0", - "_dd.p.tid": "68a755dc00000000", - "_dd.span_links": "[{\"trace_id\": \"000000000000000000000000075bcd15\", \"span_id\": \"a205584a3b2a0fae\", \"attributes\": {\"dd.kind\": \"executed_by\"}}]", + "_dd.p.tid": "690baf1300000000", + "_dd.span_links": "[{\"trace_id\": \"000000000000000000000000075bcd15\", \"span_id\": \"772e70ed55e9996b\", \"attributes\": {\"dd.kind\": \"span-pointer\", \"link.name\": \"span-pointer-up\", \"ptr.kind\": \"websocket\", \"ptr.dir\": \"u\", \"ptr.hash\": \"C000000000000000000000000075bcd15772e70ed55e9996b00000004\"}}]", "baggage.account.id": "456", "baggage.session.id": "789", "baggage.user.id": "123", "component": "fastapi", "language": "python", - "runtime-id": "a5cd0b3a0a68429286fa9b33d92eec5b", + "runtime-id": "b7b5d96e21fd459b984afa1d3e4696b4", "span.kind": "consumer", "websocket.duration.style": "blocking", "websocket.message.type": "text" @@ -319,12 +331,12 @@ "_dd.top_level": 1, "_dd.tracer_kr": 1.0, "_sampling_priority_v1": 1, - "process_id": 49501, + "process_id": 29721, "websocket.message.frames": 1, "websocket.message.length": 7 }, - "duration": 303000, - "start": 1755796956136373000 + "duration": 111000, + "start": 1762373395710111000 }, { "name": "websocket.send", @@ -337,19 +349,23 @@ "error": 0, "meta": { "_dd.base_service": "tests.contrib.fastapi", - "_dd.span_links": "[{\"trace_id\": \"000000000000000000000000075bcd15\", \"span_id\": \"a205584a3b2a0fae\", \"attributes\": {\"dd.kind\": \"resuming\"}}]", + "_dd.p.tid": "690baf1300000000", + "_dd.span_links": "[{\"trace_id\": \"000000000000000000000000075bcd15\", \"span_id\": \"772e70ed55e9996b\", \"attributes\": {\"dd.kind\": \"span-pointer\", \"link.name\": \"span-pointer-down\", \"ptr.kind\": \"websocket\", \"ptr.dir\": \"d\", \"ptr.hash\": \"S000000000000000000000000075bcd15772e70ed55e9996b00000005\"}}]", "component": "fastapi", + "language": "python", "network.client.ip": "testclient", "out.host": "testclient", "span.kind": "producer", "websocket.message.type": "text" }, "metrics": { + "_dd.tracer_kr": 1.0, + "_sampling_priority_v1": 1, "websocket.message.frames": 1, "websocket.message.length": 3 }, - "duration": 34000, - "start": 1755796956136528000 + "duration": 51000, + "start": 1762373395710310000 }, { "name": "websocket.close", @@ -364,18 +380,22 @@ "_dd.base_service": "tests.contrib.fastapi", "_dd.origin": "rum", "_dd.p.dm": "-0", - "_dd.span_links": "[{\"trace_id\": \"000000000000000000000000075bcd15\", \"span_id\": \"a205584a3b2a0fae\", \"attributes\": {\"dd.kind\": \"resuming\"}}]", + "_dd.p.tid": "690baf1300000000", + "_dd.span_links": "[{\"trace_id\": \"000000000000000000000000075bcd15\", \"span_id\": \"772e70ed55e9996b\", \"attributes\": {\"dd.kind\": \"span-pointer\", \"link.name\": \"span-pointer-down\", \"ptr.kind\": \"websocket\", \"ptr.dir\": \"d\", \"ptr.hash\": \"S000000000000000000000000075bcd15772e70ed55e9996b00000006\"}}]", "baggage.account.id": "456", "baggage.session.id": "789", "baggage.user.id": "123", "component": "fastapi", + "language": "python", "network.client.ip": "testclient", "out.host": "testclient", "span.kind": "producer" }, "metrics": { + "_dd.tracer_kr": 1.0, + "_sampling_priority_v1": 1, "websocket.close.code": 1000 }, - "duration": 48000, - "start": 1755796956136614000 + "duration": 74000, + "start": 1762373395710462000 }]] From 47a79ed7d697b6e41a37c5bc63f85069c8eee164 Mon Sep 17 00:00:00 2001 From: quinna-h Date: Wed, 5 Nov 2025 17:03:18 -0500 Subject: [PATCH 02/12] fix typing --- ddtrace/_trace/trace_handlers.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/ddtrace/_trace/trace_handlers.py b/ddtrace/_trace/trace_handlers.py index e362939345b..2acb98ebf2d 100644 --- a/ddtrace/_trace/trace_handlers.py +++ b/ddtrace/_trace/trace_handlers.py @@ -993,21 +993,21 @@ def _set_client_ip_tags(scope: Mapping[str, Any], span: Span): log.debug("Could not validate client IP address for websocket send message: %s", str(e)) -def _init_websocket_message_counters(scope: Mapping[str, Any]) -> None: +def _init_websocket_message_counters(scope: Dict[str, Any]) -> None: if "datadog" not in scope: - scope["datadog"] = {} # type: ignore[index] + scope["datadog"] = {} if "websocket_receive_counter" not in scope["datadog"]: - scope["datadog"]["websocket_receive_counter"] = 0 # type: ignore[index] + scope["datadog"]["websocket_receive_counter"] = 0 if "websocket_send_counter" not in scope["datadog"]: - scope["datadog"]["websocket_send_counter"] = 0 # type: ignore[index] + scope["datadog"]["websocket_send_counter"] = 0 -def _increment_websocket_counter(scope: Mapping[str, Any], counter_type: str) -> int: +def _increment_websocket_counter(scope: Dict[str, Any], counter_type: str) -> int: """ Increment and return websocket message counter (either websocket_receive_counter or websocket_send_counter) """ - scope["datadog"][counter_type] += 1 # type: ignore[index] - return scope["datadog"][counter_type] # type: ignore[index,return-value] + scope["datadog"][counter_type] += 1 + return scope["datadog"][counter_type] def _build_websocket_span_pointer_hash( From 2d81a35c2086fb8708acf1e00c4147bf6678072d Mon Sep 17 00:00:00 2001 From: quinna-h Date: Fri, 7 Nov 2025 16:54:11 -0500 Subject: [PATCH 03/12] update fastapi test --- ddtrace/_trace/trace_handlers.py | 71 ++++++++++++++++----------- tests/contrib/fastapi/test_fastapi.py | 12 ++--- 2 files changed, 47 insertions(+), 36 deletions(-) diff --git a/ddtrace/_trace/trace_handlers.py b/ddtrace/_trace/trace_handlers.py index 2acb98ebf2d..77aeb5c4649 100644 --- a/ddtrace/_trace/trace_handlers.py +++ b/ddtrace/_trace/trace_handlers.py @@ -1051,7 +1051,9 @@ def _has_distributed_tracing_context(span: Span) -> bool: A span has distributed tracing context if it has a parent context that was extracted from headers (_is_remote=True). """ + # breakpoint() if not span or not span._parent_context: + # breakpoint() return False # Check if the context was extracted from remote headers return span._parent_context._is_remote @@ -1077,7 +1079,7 @@ def _on_asgi_websocket_receive_message(ctx, scope, message): if hasattr(ctx, "parent") and ctx.parent.span: handshake_span = ctx.parent.span - + link_attributes = {SPAN_LINK_KIND: SpanLinkKind.EXECUTED} # Add span pointer attributes if distributed tracing is enabled and context was extracted if integration_config.distributed_tracing and _has_distributed_tracing_context(handshake_span): counter = _increment_websocket_counter(scope, "websocket_receive_counter") @@ -1090,13 +1092,15 @@ def _on_asgi_websocket_receive_message(ctx, scope, message): is_incoming=True, ) - link_attributes = { - "link.name": "span-pointer-up", - "dd.kind": "span-pointer", - "ptr.kind": "websocket", - "ptr.dir": _SpanPointerDirection.UPSTREAM, - "ptr.hash": ptr_hash, - } + link_attributes.update( + { + "link.name": "span-pointer-up", + "dd.kind": "span-pointer", + "ptr.kind": "websocket", + "ptr.dir": _SpanPointerDirection.UPSTREAM, + "ptr.hash": ptr_hash, + } + ) span.set_link( trace_id=handshake_span.trace_id, @@ -1129,6 +1133,7 @@ def _on_asgi_websocket_send_message(ctx, scope, message): if hasattr(ctx, "parent") and ctx.parent.span: handshake_span = ctx.parent.span + link_attributes = {SPAN_LINK_KIND: SpanLinkKind.RESUMING} # Add span pointer attributes if distributed tracing is enabled and context was extracted if integration_config.distributed_tracing and _has_distributed_tracing_context(handshake_span): @@ -1142,13 +1147,15 @@ def _on_asgi_websocket_send_message(ctx, scope, message): is_incoming=False, ) - link_attributes = { - "link.name": "span-pointer-down", - "dd.kind": "span-pointer", - "ptr.kind": "websocket", - "ptr.dir": _SpanPointerDirection.DOWNSTREAM, - "ptr.hash": ptr_hash, - } + link_attributes.update( + { + "link.name": "span-pointer-down", + "dd.kind": "span-pointer", + "ptr.kind": "websocket", + "ptr.dir": _SpanPointerDirection.DOWNSTREAM, + "ptr.hash": ptr_hash, + } + ) span.set_link( trace_id=handshake_span.trace_id, @@ -1179,7 +1186,6 @@ def _on_asgi_websocket_close_message(ctx, scope, message): if hasattr(ctx, "parent") and ctx.parent.span: handshake_span = ctx.parent.span - # Build span link attributes link_attributes = {SPAN_LINK_KIND: SpanLinkKind.RESUMING} # Add span pointer attributes if distributed tracing is enabled and context was extracted @@ -1194,13 +1200,15 @@ def _on_asgi_websocket_close_message(ctx, scope, message): is_incoming=False, ) - link_attributes = { - "link.name": "span-pointer-down", - "dd.kind": "span-pointer", - "ptr.kind": "websocket", - "ptr.dir": _SpanPointerDirection.DOWNSTREAM, - "ptr.hash": ptr_hash, - } + link_attributes.update( + { + "link.name": "span-pointer-down", + "dd.kind": "span-pointer", + "ptr.kind": "websocket", + "ptr.dir": _SpanPointerDirection.DOWNSTREAM, + "ptr.hash": ptr_hash, + } + ) span.set_link( trace_id=handshake_span.trace_id, @@ -1228,6 +1236,7 @@ def _on_asgi_websocket_disconnect_message(ctx, scope, message): if hasattr(ctx, "parent") and ctx.parent.span: handshake_span = ctx.parent.span + link_attributes = {SPAN_LINK_KIND: SpanLinkKind.EXECUTED} # Add span pointer attributes if distributed tracing is enabled and context was extracted if integration_config.distributed_tracing and _has_distributed_tracing_context(handshake_span): @@ -1241,13 +1250,15 @@ def _on_asgi_websocket_disconnect_message(ctx, scope, message): is_incoming=True, ) - link_attributes = { - "link.name": "span-pointer-up", - "dd.kind": "span-pointer", - "ptr.kind": "websocket", - "ptr.dir": _SpanPointerDirection.UPSTREAM, - "ptr.hash": ptr_hash, - } + link_attributes.update( + { + "link.name": "span-pointer-up", + "dd.kind": "span-pointer", + "ptr.kind": "websocket", + "ptr.dir": _SpanPointerDirection.UPSTREAM, + "ptr.hash": ptr_hash, + } + ) span.set_link( trace_id=handshake_span.trace_id, diff --git a/tests/contrib/fastapi/test_fastapi.py b/tests/contrib/fastapi/test_fastapi.py index 43065f6012d..15c29246314 100644 --- a/tests/contrib/fastapi/test_fastapi.py +++ b/tests/contrib/fastapi/test_fastapi.py @@ -624,11 +624,11 @@ def _run_websocket_send_only_test(): fastapi_unpatch() -# @pytest.mark.subprocess( -# env=dict( -# DD_TRACE_WEBSOCKET_MESSAGES_ENABLED="true", -# ) -# ) +@pytest.mark.subprocess( + env=dict( + DD_TRACE_WEBSOCKET_MESSAGES_ENABLED="true", + ) +) @snapshot(ignores=["meta._dd.span_links", "metrics.websocket.message.length"]) def test_traced_websocket(test_spans, snapshot_app): from tests.contrib.fastapi.test_fastapi import _run_websocket_test @@ -740,7 +740,7 @@ def _run_websocket_context_propagation_test(): fastapi_unpatch() -# @pytest.mark.subprocess(env=dict(DD_TRACE_WEBSOCKET_MESSAGES_ENABLED="true")) +@pytest.mark.subprocess(env=dict(DD_TRACE_WEBSOCKET_MESSAGES_ENABLED="true")) @snapshot(ignores=["meta._dd.span_links", "metrics.websocket.message.length"]) def test_websocket_context_propagation(snapshot_app): """Test trace context propagation.""" From a399cddacfd8ab07d27e928df706abaefe3a28e7 Mon Sep 17 00:00:00 2001 From: quinna-h Date: Fri, 7 Nov 2025 16:56:45 -0500 Subject: [PATCH 04/12] remove breakpoints --- ddtrace/_trace/trace_handlers.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/ddtrace/_trace/trace_handlers.py b/ddtrace/_trace/trace_handlers.py index 77aeb5c4649..34bc0e24d5a 100644 --- a/ddtrace/_trace/trace_handlers.py +++ b/ddtrace/_trace/trace_handlers.py @@ -1051,9 +1051,7 @@ def _has_distributed_tracing_context(span: Span) -> bool: A span has distributed tracing context if it has a parent context that was extracted from headers (_is_remote=True). """ - # breakpoint() if not span or not span._parent_context: - # breakpoint() return False # Check if the context was extracted from remote headers return span._parent_context._is_remote From 3e73f97691288a353092d9f0d0de59e0a7750734 Mon Sep 17 00:00:00 2001 From: quinna-h Date: Mon, 10 Nov 2025 16:39:41 -0500 Subject: [PATCH 05/12] refactor to use constants + release note --- ddtrace/_trace/trace_handlers.py | 30 +++++++++++-------- ddtrace/internal/constants.py | 2 ++ ...socket-span-pointers-25e07939aa75527a.yaml | 4 +++ 3 files changed, 23 insertions(+), 13 deletions(-) create mode 100644 releasenotes/notes/websocket-span-pointers-25e07939aa75527a.yaml diff --git a/ddtrace/_trace/trace_handlers.py b/ddtrace/_trace/trace_handlers.py index 34bc0e24d5a..c51713f9379 100644 --- a/ddtrace/_trace/trace_handlers.py +++ b/ddtrace/_trace/trace_handlers.py @@ -15,6 +15,7 @@ import ddtrace from ddtrace import config from ddtrace._trace._inferred_proxy import create_inferred_proxy_span_if_headers_exist +from ddtrace._trace._span_link import SpanLinkKind as _SpanLinkKind from ddtrace._trace._span_pointer import _SpanPointerDescription from ddtrace._trace._span_pointer import _SpanPointerDirection from ddtrace._trace.span import Span @@ -32,6 +33,7 @@ from ddtrace.contrib.internal.trace_utils import _set_url_tag from ddtrace.ext import SpanKind from ddtrace.ext import SpanLinkKind +from ddtrace.ext import SpanTypes from ddtrace.ext import db from ddtrace.ext import http from ddtrace.ext import net @@ -50,6 +52,8 @@ from ddtrace.internal.constants import MESSAGING_OPERATION from ddtrace.internal.constants import MESSAGING_SYSTEM from ddtrace.internal.constants import SPAN_LINK_KIND +from ddtrace.internal.constants import SPAN_POINTER_DOWN_DIRECTION +from ddtrace.internal.constants import SPAN_POINTER_UP_DIRECTION from ddtrace.internal.logger import get_logger from ddtrace.internal.sampling import _inherit_sampling_tags from ddtrace.internal.schema.span_attribute_schema import SpanDirection @@ -1092,9 +1096,9 @@ def _on_asgi_websocket_receive_message(ctx, scope, message): link_attributes.update( { - "link.name": "span-pointer-up", - "dd.kind": "span-pointer", - "ptr.kind": "websocket", + "link.name": SPAN_POINTER_UP_DIRECTION, + "dd.kind": _SpanLinkKind.SPAN_POINTER.value, + "ptr.kind": SpanTypes.WEBSOCKET, "ptr.dir": _SpanPointerDirection.UPSTREAM, "ptr.hash": ptr_hash, } @@ -1147,9 +1151,9 @@ def _on_asgi_websocket_send_message(ctx, scope, message): link_attributes.update( { - "link.name": "span-pointer-down", - "dd.kind": "span-pointer", - "ptr.kind": "websocket", + "link.name": SPAN_POINTER_DOWN_DIRECTION, + "dd.kind": _SpanLinkKind.SPAN_POINTER.value, + "ptr.kind": SpanTypes.WEBSOCKET, "ptr.dir": _SpanPointerDirection.DOWNSTREAM, "ptr.hash": ptr_hash, } @@ -1200,9 +1204,9 @@ def _on_asgi_websocket_close_message(ctx, scope, message): link_attributes.update( { - "link.name": "span-pointer-down", - "dd.kind": "span-pointer", - "ptr.kind": "websocket", + "link.name": SPAN_POINTER_DOWN_DIRECTION, + "dd.kind": _SpanLinkKind.SPAN_POINTER.value, + "ptr.kind": SpanTypes.WEBSOCKET, "ptr.dir": _SpanPointerDirection.DOWNSTREAM, "ptr.hash": ptr_hash, } @@ -1250,9 +1254,9 @@ def _on_asgi_websocket_disconnect_message(ctx, scope, message): link_attributes.update( { - "link.name": "span-pointer-up", - "dd.kind": "span-pointer", - "ptr.kind": "websocket", + "link.name": SPAN_POINTER_UP_DIRECTION, + "dd.kind": _SpanLinkKind.SPAN_POINTER.value, + "ptr.kind": SpanTypes.WEBSOCKET, "ptr.dir": _SpanPointerDirection.UPSTREAM, "ptr.hash": ptr_hash, } @@ -1286,7 +1290,7 @@ def _on_asgi_request(ctx: core.ExecutionContext) -> None: scope["datadog"]["request_spans"].append(span) if scope["type"] == "websocket": - span._set_tag_str("http.upgraded", "websocket") + span._set_tag_str("http.upgraded", SpanTypes.WEBSOCKET) # Initialize websocket message counters for span pointer tracking _init_websocket_message_counters(scope) diff --git a/ddtrace/internal/constants.py b/ddtrace/internal/constants.py index fc572d0ffaa..c7f3722b87c 100644 --- a/ddtrace/internal/constants.py +++ b/ddtrace/internal/constants.py @@ -47,6 +47,8 @@ SAMPLING_DECISION_MAKER_RESOURCE = "_dd.dm.resource" SPAN_LINK_KIND = "dd.kind" SPAN_LINKS_KEY = "_dd.span_links" +SPAN_POINTER_DOWN_DIRECTION = "span-pointer-down" +SPAN_POINTER_UP_DIRECTION = "span-pointer-up" SPAN_EVENTS_KEY = "events" SPAN_API_DATADOG = "datadog" SPAN_API_OTEL = "otel" diff --git a/releasenotes/notes/websocket-span-pointers-25e07939aa75527a.yaml b/releasenotes/notes/websocket-span-pointers-25e07939aa75527a.yaml new file mode 100644 index 00000000000..554170503d8 --- /dev/null +++ b/releasenotes/notes/websocket-span-pointers-25e07939aa75527a.yaml @@ -0,0 +1,4 @@ +--- +features: + - | + Adds span pointers for WebSocket message tracing to enable distributed context propagation across client-server boundaries. From 142785ffc300b6c052e5dbef456f19668c781004 Mon Sep 17 00:00:00 2001 From: quinna-h Date: Mon, 10 Nov 2025 16:42:15 -0500 Subject: [PATCH 06/12] wip --- ddtrace/_trace/trace_handlers.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/ddtrace/_trace/trace_handlers.py b/ddtrace/_trace/trace_handlers.py index c51713f9379..b361e57453f 100644 --- a/ddtrace/_trace/trace_handlers.py +++ b/ddtrace/_trace/trace_handlers.py @@ -1026,13 +1026,6 @@ def _build_websocket_span_pointer_hash( Format: <128 bit hex trace id><64 bit hex span id><32 bit hex counter> Prefix: 'S' for server outgoing or client incoming, 'C' for server incoming or client outgoing - - :param handshake_trace_id: Trace ID from the handshake span - :param handshake_span_id: Span ID from the handshake span - :param counter: Message counter value - :param is_server: True if running on server side, False if client - :param is_incoming: True if incoming message, False if outgoing - :return: Span pointer hash string """ if (is_server and not is_incoming) or (not is_server and is_incoming): prefix = "S" From 4383b5e03b0f1b2780263cbb3f15590e9de9e318 Mon Sep 17 00:00:00 2001 From: quinna-h Date: Tue, 11 Nov 2025 11:10:02 -0500 Subject: [PATCH 07/12] cleanup comments --- ddtrace/_trace/trace_handlers.py | 20 +------------------- 1 file changed, 1 insertion(+), 19 deletions(-) diff --git a/ddtrace/_trace/trace_handlers.py b/ddtrace/_trace/trace_handlers.py index b361e57453f..c3fdb789838 100644 --- a/ddtrace/_trace/trace_handlers.py +++ b/ddtrace/_trace/trace_handlers.py @@ -1046,20 +1046,16 @@ def _has_distributed_tracing_context(span: Span) -> bool: A websocket server must not set the span pointer if the handshake has not extracted a context A span has distributed tracing context if it has a parent context that was - extracted from headers (_is_remote=True). + extracted from headers. """ if not span or not span._parent_context: return False - # Check if the context was extracted from remote headers return span._parent_context._is_remote def _on_asgi_websocket_receive_message(ctx, scope, message): """ Handle websocket receive message events. - - This handler is called when a websocket receive message event is dispatched. - It sets up the span with appropriate tags, metrics, and links. """ span = ctx.span integration_config = ctx.get_item("integration_config") @@ -1075,7 +1071,6 @@ def _on_asgi_websocket_receive_message(ctx, scope, message): if hasattr(ctx, "parent") and ctx.parent.span: handshake_span = ctx.parent.span link_attributes = {SPAN_LINK_KIND: SpanLinkKind.EXECUTED} - # Add span pointer attributes if distributed tracing is enabled and context was extracted if integration_config.distributed_tracing and _has_distributed_tracing_context(handshake_span): counter = _increment_websocket_counter(scope, "websocket_receive_counter") @@ -1112,9 +1107,6 @@ def _on_asgi_websocket_receive_message(ctx, scope, message): def _on_asgi_websocket_send_message(ctx, scope, message): """ Handle websocket send message events. - - This handler is called when a websocket send message event is dispatched. - It sets up the span with appropriate tags, metrics, and links. """ span = ctx.span integration_config = ctx.get_item("integration_config") @@ -1130,7 +1122,6 @@ def _on_asgi_websocket_send_message(ctx, scope, message): handshake_span = ctx.parent.span link_attributes = {SPAN_LINK_KIND: SpanLinkKind.RESUMING} - # Add span pointer attributes if distributed tracing is enabled and context was extracted if integration_config.distributed_tracing and _has_distributed_tracing_context(handshake_span): counter = _increment_websocket_counter(scope, "websocket_send_counter") @@ -1162,9 +1153,6 @@ def _on_asgi_websocket_send_message(ctx, scope, message): def _on_asgi_websocket_close_message(ctx, scope, message): """ Handle websocket close message events. - - This handler is called when a websocket close message event is dispatched. - It sets up the span with appropriate tags, metrics, and links. """ span = ctx.span integration_config = ctx.get_item("integration_config") @@ -1183,7 +1171,6 @@ def _on_asgi_websocket_close_message(ctx, scope, message): link_attributes = {SPAN_LINK_KIND: SpanLinkKind.RESUMING} - # Add span pointer attributes if distributed tracing is enabled and context was extracted if integration_config.distributed_tracing and _has_distributed_tracing_context(handshake_span): counter = _increment_websocket_counter(scope, "websocket_send_counter") @@ -1217,9 +1204,6 @@ def _on_asgi_websocket_close_message(ctx, scope, message): def _on_asgi_websocket_disconnect_message(ctx, scope, message): """ Handle websocket disconnect message events. - - This handler is called when a websocket disconnect message event is dispatched. - It sets up the span with appropriate tags, metrics, and links. """ span = ctx.span integration_config = ctx.get_item("integration_config") @@ -1233,7 +1217,6 @@ def _on_asgi_websocket_disconnect_message(ctx, scope, message): handshake_span = ctx.parent.span link_attributes = {SPAN_LINK_KIND: SpanLinkKind.EXECUTED} - # Add span pointer attributes if distributed tracing is enabled and context was extracted if integration_config.distributed_tracing and _has_distributed_tracing_context(handshake_span): counter = _increment_websocket_counter(scope, "websocket_receive_counter") @@ -1284,7 +1267,6 @@ def _on_asgi_request(ctx: core.ExecutionContext) -> None: if scope["type"] == "websocket": span._set_tag_str("http.upgraded", SpanTypes.WEBSOCKET) - # Initialize websocket message counters for span pointer tracking _init_websocket_message_counters(scope) From 6e2d23797d28d888b0a5bb71bc07ecf5dcc8cb51 Mon Sep 17 00:00:00 2001 From: quinna-h Date: Tue, 11 Nov 2025 11:16:36 -0500 Subject: [PATCH 08/12] wip --- ddtrace/_trace/trace_handlers.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ddtrace/_trace/trace_handlers.py b/ddtrace/_trace/trace_handlers.py index c3fdb789838..0f1674ade3e 100644 --- a/ddtrace/_trace/trace_handlers.py +++ b/ddtrace/_trace/trace_handlers.py @@ -1260,15 +1260,15 @@ def _on_asgi_request(ctx: core.ExecutionContext) -> None: span = _start_span(ctx) ctx.set_item("req_span", span) + if scope["type"] == "websocket": + span._set_tag_str("http.upgraded", SpanTypes.WEBSOCKET) + _init_websocket_message_counters(scope) + if "datadog" not in scope: scope["datadog"] = {"request_spans": [span]} else: scope["datadog"]["request_spans"].append(span) - if scope["type"] == "websocket": - span._set_tag_str("http.upgraded", SpanTypes.WEBSOCKET) - _init_websocket_message_counters(scope) - def listen(): core.on("wsgi.request.prepare", _on_request_prepare) From 8ce652085f4fb234158c1e2e204a476ae421f7c7 Mon Sep 17 00:00:00 2001 From: quinna-h Date: Tue, 11 Nov 2025 13:37:51 -0500 Subject: [PATCH 09/12] refactor --- ddtrace/_trace/trace_handlers.py | 146 +++++++++++++------------------ 1 file changed, 61 insertions(+), 85 deletions(-) diff --git a/ddtrace/_trace/trace_handlers.py b/ddtrace/_trace/trace_handlers.py index 0f1674ade3e..c51998d0e4f 100644 --- a/ddtrace/_trace/trace_handlers.py +++ b/ddtrace/_trace/trace_handlers.py @@ -1053,6 +1053,50 @@ def _has_distributed_tracing_context(span: Span) -> bool: return span._parent_context._is_remote +def _add_websocket_span_pointer_attributes( + link_attributes: Dict[str, Any], + integration_config: Any, + handshake_span: Span, + scope: Dict[str, Any], + is_incoming: bool, +) -> None: + """ + Add span pointer attributes to link_attributes for websocket message correlation. + """ + + if not integration_config.distributed_tracing or not _has_distributed_tracing_context(handshake_span): + return + + # Increment counter based on message direction + counter_type = "websocket_receive_counter" if is_incoming else "websocket_send_counter" + counter = _increment_websocket_counter(scope, counter_type) + + ptr_hash = _build_websocket_span_pointer_hash( + handshake_trace_id=handshake_span.trace_id, + handshake_span_id=handshake_span.span_id, + counter=counter, + is_server=True, + is_incoming=is_incoming, + ) + + if is_incoming: + link_name = SPAN_POINTER_UP_DIRECTION + ptr_direction = _SpanPointerDirection.UPSTREAM + else: + link_name = SPAN_POINTER_DOWN_DIRECTION + ptr_direction = _SpanPointerDirection.DOWNSTREAM + + link_attributes.update( + { + "link.name": link_name, + "dd.kind": _SpanLinkKind.SPAN_POINTER.value, + "ptr.kind": SpanTypes.WEBSOCKET, + "ptr.dir": ptr_direction, + "ptr.hash": ptr_hash, + } + ) + + def _on_asgi_websocket_receive_message(ctx, scope, message): """ Handle websocket receive message events. @@ -1071,26 +1115,10 @@ def _on_asgi_websocket_receive_message(ctx, scope, message): if hasattr(ctx, "parent") and ctx.parent.span: handshake_span = ctx.parent.span link_attributes = {SPAN_LINK_KIND: SpanLinkKind.EXECUTED} - if integration_config.distributed_tracing and _has_distributed_tracing_context(handshake_span): - counter = _increment_websocket_counter(scope, "websocket_receive_counter") - - ptr_hash = _build_websocket_span_pointer_hash( - handshake_trace_id=handshake_span.trace_id, - handshake_span_id=handshake_span.span_id, - counter=counter, - is_server=True, - is_incoming=True, - ) - - link_attributes.update( - { - "link.name": SPAN_POINTER_UP_DIRECTION, - "dd.kind": _SpanLinkKind.SPAN_POINTER.value, - "ptr.kind": SpanTypes.WEBSOCKET, - "ptr.dir": _SpanPointerDirection.UPSTREAM, - "ptr.hash": ptr_hash, - } - ) + + _add_websocket_span_pointer_attributes( + link_attributes, integration_config, handshake_span, scope, is_incoming=True + ) span.set_link( trace_id=handshake_span.trace_id, @@ -1122,26 +1150,9 @@ def _on_asgi_websocket_send_message(ctx, scope, message): handshake_span = ctx.parent.span link_attributes = {SPAN_LINK_KIND: SpanLinkKind.RESUMING} - if integration_config.distributed_tracing and _has_distributed_tracing_context(handshake_span): - counter = _increment_websocket_counter(scope, "websocket_send_counter") - - ptr_hash = _build_websocket_span_pointer_hash( - handshake_trace_id=handshake_span.trace_id, - handshake_span_id=handshake_span.span_id, - counter=counter, - is_server=True, - is_incoming=False, - ) - - link_attributes.update( - { - "link.name": SPAN_POINTER_DOWN_DIRECTION, - "dd.kind": _SpanLinkKind.SPAN_POINTER.value, - "ptr.kind": SpanTypes.WEBSOCKET, - "ptr.dir": _SpanPointerDirection.DOWNSTREAM, - "ptr.hash": ptr_hash, - } - ) + _add_websocket_span_pointer_attributes( + link_attributes, integration_config, handshake_span, scope, is_incoming=False + ) span.set_link( trace_id=handshake_span.trace_id, @@ -1168,29 +1179,11 @@ def _on_asgi_websocket_close_message(ctx, scope, message): if hasattr(ctx, "parent") and ctx.parent.span: handshake_span = ctx.parent.span - link_attributes = {SPAN_LINK_KIND: SpanLinkKind.RESUMING} - if integration_config.distributed_tracing and _has_distributed_tracing_context(handshake_span): - counter = _increment_websocket_counter(scope, "websocket_send_counter") - - ptr_hash = _build_websocket_span_pointer_hash( - handshake_trace_id=handshake_span.trace_id, - handshake_span_id=handshake_span.span_id, - counter=counter, - is_server=True, - is_incoming=False, - ) - - link_attributes.update( - { - "link.name": SPAN_POINTER_DOWN_DIRECTION, - "dd.kind": _SpanLinkKind.SPAN_POINTER.value, - "ptr.kind": SpanTypes.WEBSOCKET, - "ptr.dir": _SpanPointerDirection.DOWNSTREAM, - "ptr.hash": ptr_hash, - } - ) + _add_websocket_span_pointer_attributes( + link_attributes, integration_config, handshake_span, scope, is_incoming=False + ) span.set_link( trace_id=handshake_span.trace_id, @@ -1217,26 +1210,9 @@ def _on_asgi_websocket_disconnect_message(ctx, scope, message): handshake_span = ctx.parent.span link_attributes = {SPAN_LINK_KIND: SpanLinkKind.EXECUTED} - if integration_config.distributed_tracing and _has_distributed_tracing_context(handshake_span): - counter = _increment_websocket_counter(scope, "websocket_receive_counter") - - ptr_hash = _build_websocket_span_pointer_hash( - handshake_trace_id=handshake_span.trace_id, - handshake_span_id=handshake_span.span_id, - counter=counter, - is_server=True, - is_incoming=True, - ) - - link_attributes.update( - { - "link.name": SPAN_POINTER_UP_DIRECTION, - "dd.kind": _SpanLinkKind.SPAN_POINTER.value, - "ptr.kind": SpanTypes.WEBSOCKET, - "ptr.dir": _SpanPointerDirection.UPSTREAM, - "ptr.hash": ptr_hash, - } - ) + _add_websocket_span_pointer_attributes( + link_attributes, integration_config, handshake_span, scope, is_incoming=True + ) span.set_link( trace_id=handshake_span.trace_id, @@ -1260,15 +1236,15 @@ def _on_asgi_request(ctx: core.ExecutionContext) -> None: span = _start_span(ctx) ctx.set_item("req_span", span) - if scope["type"] == "websocket": - span._set_tag_str("http.upgraded", SpanTypes.WEBSOCKET) - _init_websocket_message_counters(scope) - if "datadog" not in scope: scope["datadog"] = {"request_spans": [span]} else: scope["datadog"]["request_spans"].append(span) + if scope["type"] == "websocket": + span._set_tag_str("http.upgraded", SpanTypes.WEBSOCKET) + _init_websocket_message_counters(scope) + def listen(): core.on("wsgi.request.prepare", _on_request_prepare) From b07a27fe0eb6d68164168afd6debc0de9a99a025 Mon Sep 17 00:00:00 2001 From: quinna-h Date: Thu, 13 Nov 2025 14:35:36 -0500 Subject: [PATCH 10/12] refactor to address comments --- ddtrace/_trace/_span_pointer.py | 5 +++ ddtrace/_trace/trace_handlers.py | 45 +++++++------------ ddtrace/internal/constants.py | 3 +- ...socket-span-pointers-25e07939aa75527a.yaml | 2 +- 4 files changed, 23 insertions(+), 32 deletions(-) diff --git a/ddtrace/_trace/_span_pointer.py b/ddtrace/_trace/_span_pointer.py index cad57bfcda5..b0f4a7866af 100644 --- a/ddtrace/_trace/_span_pointer.py +++ b/ddtrace/_trace/_span_pointer.py @@ -24,6 +24,11 @@ class _SpanPointerDirection(Enum): DOWNSTREAM = "d" +class _SpanPointerDirectionName(Enum): + UPSTREAM = "span-pointer-up" + DOWNSTREAM = "span-pointer-down" + + class _SpanPointerDescription(NamedTuple): # Not to be confused with _SpanPointer. This class describes the parameters # required to attach a span pointer to a Span. It lets us decouple code diff --git a/ddtrace/_trace/trace_handlers.py b/ddtrace/_trace/trace_handlers.py index c51998d0e4f..ab3341c322b 100644 --- a/ddtrace/_trace/trace_handlers.py +++ b/ddtrace/_trace/trace_handlers.py @@ -18,6 +18,7 @@ from ddtrace._trace._span_link import SpanLinkKind as _SpanLinkKind from ddtrace._trace._span_pointer import _SpanPointerDescription from ddtrace._trace._span_pointer import _SpanPointerDirection +from ddtrace._trace._span_pointer import _SpanPointerDirectionName from ddtrace._trace.span import Span from ddtrace._trace.utils import extract_DD_context_from_messages from ddtrace.constants import _SPAN_MEASURED_KEY @@ -46,14 +47,13 @@ from ddtrace.internal.constants import FLASK_ENDPOINT from ddtrace.internal.constants import FLASK_URL_RULE from ddtrace.internal.constants import FLASK_VIEW_ARGS +from ddtrace.internal.constants import HTTP_REQUEST_UPGRADED from ddtrace.internal.constants import MESSAGING_BATCH_COUNT from ddtrace.internal.constants import MESSAGING_DESTINATION_NAME from ddtrace.internal.constants import MESSAGING_MESSAGE_ID from ddtrace.internal.constants import MESSAGING_OPERATION from ddtrace.internal.constants import MESSAGING_SYSTEM from ddtrace.internal.constants import SPAN_LINK_KIND -from ddtrace.internal.constants import SPAN_POINTER_DOWN_DIRECTION -from ddtrace.internal.constants import SPAN_POINTER_UP_DIRECTION from ddtrace.internal.logger import get_logger from ddtrace.internal.sampling import _inherit_sampling_tags from ddtrace.internal.schema.span_attribute_schema import SpanDirection @@ -62,6 +62,9 @@ log = get_logger(__name__) +_WEBSOCKET_LINK_ATTRS_EXECUTED = {SPAN_LINK_KIND: SpanLinkKind.EXECUTED} +_WEBSOCKET_LINK_ATTRS_RESUMING = {SPAN_LINK_KIND: SpanLinkKind.RESUMING} + class _TracedIterable(wrapt.ObjectProxy): def __init__(self, wrapped, span, parent_span, wrapped_is_iterator=False): @@ -1080,10 +1083,10 @@ def _add_websocket_span_pointer_attributes( ) if is_incoming: - link_name = SPAN_POINTER_UP_DIRECTION + link_name = _SpanPointerDirectionName.UPSTREAM ptr_direction = _SpanPointerDirection.UPSTREAM else: - link_name = SPAN_POINTER_DOWN_DIRECTION + link_name = _SpanPointerDirectionName.DOWNSTREAM ptr_direction = _SpanPointerDirection.DOWNSTREAM link_attributes.update( @@ -1114,17 +1117,13 @@ def _on_asgi_websocket_receive_message(ctx, scope, message): if hasattr(ctx, "parent") and ctx.parent.span: handshake_span = ctx.parent.span - link_attributes = {SPAN_LINK_KIND: SpanLinkKind.EXECUTED} + link_attributes = _WEBSOCKET_LINK_ATTRS_EXECUTED.copy() _add_websocket_span_pointer_attributes( link_attributes, integration_config, handshake_span, scope, is_incoming=True ) - span.set_link( - trace_id=handshake_span.trace_id, - span_id=handshake_span.span_id, - attributes=link_attributes, - ) + span.link_span(handshake_span.context, link_attributes) if getattr(integration_config, "asgi_websocket_messages_inherit_sampling", True): _inherit_sampling_tags(span, handshake_span._local_root) @@ -1148,17 +1147,13 @@ def _on_asgi_websocket_send_message(ctx, scope, message): if hasattr(ctx, "parent") and ctx.parent.span: handshake_span = ctx.parent.span - link_attributes = {SPAN_LINK_KIND: SpanLinkKind.RESUMING} + link_attributes = _WEBSOCKET_LINK_ATTRS_RESUMING.copy() _add_websocket_span_pointer_attributes( link_attributes, integration_config, handshake_span, scope, is_incoming=False ) - span.set_link( - trace_id=handshake_span.trace_id, - span_id=handshake_span.span_id, - attributes=link_attributes, - ) + span.link_span(handshake_span.context, link_attributes) def _on_asgi_websocket_close_message(ctx, scope, message): @@ -1179,17 +1174,13 @@ def _on_asgi_websocket_close_message(ctx, scope, message): if hasattr(ctx, "parent") and ctx.parent.span: handshake_span = ctx.parent.span - link_attributes = {SPAN_LINK_KIND: SpanLinkKind.RESUMING} + link_attributes = _WEBSOCKET_LINK_ATTRS_RESUMING.copy() _add_websocket_span_pointer_attributes( link_attributes, integration_config, handshake_span, scope, is_incoming=False ) - span.set_link( - trace_id=handshake_span.trace_id, - span_id=handshake_span.span_id, - attributes=link_attributes, - ) + span.link_span(handshake_span.context, link_attributes) _copy_trace_level_tags(span, handshake_span) @@ -1208,17 +1199,13 @@ def _on_asgi_websocket_disconnect_message(ctx, scope, message): if hasattr(ctx, "parent") and ctx.parent.span: handshake_span = ctx.parent.span - link_attributes = {SPAN_LINK_KIND: SpanLinkKind.EXECUTED} + link_attributes = _WEBSOCKET_LINK_ATTRS_EXECUTED.copy() _add_websocket_span_pointer_attributes( link_attributes, integration_config, handshake_span, scope, is_incoming=True ) - span.set_link( - trace_id=handshake_span.trace_id, - span_id=handshake_span.span_id, - attributes=link_attributes, - ) + span.link_span(handshake_span.context, link_attributes) if getattr(integration_config, "asgi_websocket_messages_inherit_sampling", True): _inherit_sampling_tags(span, handshake_span._local_root) @@ -1242,7 +1229,7 @@ def _on_asgi_request(ctx: core.ExecutionContext) -> None: scope["datadog"]["request_spans"].append(span) if scope["type"] == "websocket": - span._set_tag_str("http.upgraded", SpanTypes.WEBSOCKET) + span._set_tag_str(HTTP_REQUEST_UPGRADED, SpanTypes.WEBSOCKET) _init_websocket_message_counters(scope) diff --git a/ddtrace/internal/constants.py b/ddtrace/internal/constants.py index 34d989422ef..4df0ac4c3b6 100644 --- a/ddtrace/internal/constants.py +++ b/ddtrace/internal/constants.py @@ -47,8 +47,6 @@ SAMPLING_DECISION_MAKER_RESOURCE = "_dd.dm.resource" SPAN_LINK_KIND = "dd.kind" SPAN_LINKS_KEY = "_dd.span_links" -SPAN_POINTER_DOWN_DIRECTION = "span-pointer-down" -SPAN_POINTER_UP_DIRECTION = "span-pointer-up" SPAN_EVENTS_KEY = "events" SPAN_API_DATADOG = "datadog" SPAN_API_OTEL = "otel" @@ -69,6 +67,7 @@ HTTP_REQUEST_HEADER = "http.request.header" HTTP_REQUEST_PARAMETER = "http.request.parameter" HTTP_REQUEST_BODY = "http.request.body" +HTTP_REQUEST_UPGRADED = "http.upgraded" HTTP_REQUEST_PATH_PARAMETER = "http.request.path.parameter" REQUEST_PATH_PARAMS = "http.request.path_params" STATUS_403_TYPE_AUTO = {"status_code": 403, "type": "auto"} diff --git a/releasenotes/notes/websocket-span-pointers-25e07939aa75527a.yaml b/releasenotes/notes/websocket-span-pointers-25e07939aa75527a.yaml index 554170503d8..6a238d33689 100644 --- a/releasenotes/notes/websocket-span-pointers-25e07939aa75527a.yaml +++ b/releasenotes/notes/websocket-span-pointers-25e07939aa75527a.yaml @@ -1,4 +1,4 @@ --- features: - | - Adds span pointers for WebSocket message tracing to enable distributed context propagation across client-server boundaries. + feat(asgi): Enable context propagation between websocket messages by implementing span pointers between incoming and outgoing messages. From 3cdf96b75b38bbbe9c9a8cb106d739d2a81847d4 Mon Sep 17 00:00:00 2001 From: quinna-h Date: Thu, 13 Nov 2025 15:10:31 -0500 Subject: [PATCH 11/12] minor edit release note --- .../notes/websocket-span-pointers-25e07939aa75527a.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/releasenotes/notes/websocket-span-pointers-25e07939aa75527a.yaml b/releasenotes/notes/websocket-span-pointers-25e07939aa75527a.yaml index 6a238d33689..cd2e411f631 100644 --- a/releasenotes/notes/websocket-span-pointers-25e07939aa75527a.yaml +++ b/releasenotes/notes/websocket-span-pointers-25e07939aa75527a.yaml @@ -1,4 +1,4 @@ --- features: - | - feat(asgi): Enable context propagation between websocket messages by implementing span pointers between incoming and outgoing messages. + feat(asgi): Enable context propagation between websocket message spans by implementing span pointers between incoming and outgoing messages. From af2c76a78a90d847f687504910cb4a635f8b0da4 Mon Sep 17 00:00:00 2001 From: Quinna Halim Date: Fri, 14 Nov 2025 11:30:22 -0500 Subject: [PATCH 12/12] Update releasenotes/notes/websocket-span-pointers-25e07939aa75527a.yaml Co-authored-by: Brett Langdon --- .../notes/websocket-span-pointers-25e07939aa75527a.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/releasenotes/notes/websocket-span-pointers-25e07939aa75527a.yaml b/releasenotes/notes/websocket-span-pointers-25e07939aa75527a.yaml index cd2e411f631..75d9fb68c55 100644 --- a/releasenotes/notes/websocket-span-pointers-25e07939aa75527a.yaml +++ b/releasenotes/notes/websocket-span-pointers-25e07939aa75527a.yaml @@ -1,4 +1,4 @@ --- features: - | - feat(asgi): Enable context propagation between websocket message spans by implementing span pointers between incoming and outgoing messages. + feat(asgi): Enable context propagation between websocket message spans.