Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions newrelic/api/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,11 @@ def normalize_name(self, name, rule_type="url"):
return self._agent.normalize_name(self._name, name, rule_type)
return name, False

def compute_sampled(self):
def compute_sampled(self, full_granularity, section, *args, **kwargs):
if not self.active or not self.settings.distributed_tracing.enabled:
return False

return self._agent.compute_sampled(self._name)
return self._agent.compute_sampled(self._name, full_granularity, section, *args, **kwargs)


def application_instance(name=None, activate=True):
Expand Down
146 changes: 114 additions & 32 deletions newrelic/api/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,8 @@ def __init__(self, application, enabled=None, source=None):
self.tracestate = ""
self._priority = None
self._sampled = None
self._traceparent_sampled = None
# Remote parent sampled is set from the W3C parent header or the Newrelic header if no W3C parent header is present.
self._remote_parent_sampled = None

self._distributed_trace_state = 0

Expand Down Expand Up @@ -569,7 +570,7 @@ def __exit__(self, exc, value, tb):
if self._settings.distributed_tracing.enabled:
# Sampled and priority need to be computed at the end of the
# transaction when distributed tracing or span events are enabled.
self._compute_sampled_and_priority()
self._make_sampling_decision()

self._cached_path._name = self.path
agent_attributes = self.agent_attributes
Expand Down Expand Up @@ -636,6 +637,7 @@ def __exit__(self, exc, value, tb):
trace_id=self.trace_id,
loop_time=self._loop_time,
root=root_node,
partial_granularity_sampled=hasattr(self, "partial_granularity_sampled"),
)

# Clear settings as we are all done and don't need it
Expand Down Expand Up @@ -1004,35 +1006,117 @@ def _update_agent_attributes(self):
def user_attributes(self):
return create_attributes(self._custom_params, DST_ALL, self.attribute_filter)

def sampling_algo_compute_sampled_and_priority(self):
if self._priority is None:
def sampling_algo_compute_sampled_and_priority(self, priority, sampled, sampler_kwargs):
# self._priority and self._sampled are set when parsing the W3C tracestate
# or newrelic DT headers and may be overridden in _make_sampling_decision
# based on the configuration. The only time they are set in here is when the
# sampling decision must be made by the adaptive sampling algorithm.
if priority is None:
# Truncate priority field to 6 digits past the decimal.
self._priority = float(f"{random.random():.6f}") # noqa: S311
if self._sampled is None:
self._sampled = self._application.compute_sampled()
if self._sampled:
self._priority += 1

def _compute_sampled_and_priority(self):
if self._traceparent_sampled is None:
priority = float(f"{random.random():.6f}") # noqa: S311
if sampled is None:
_logger.debug("No trusted account id found. Sampling decision will be made by adaptive sampling algorithm.")
sampled = self._application.compute_sampled(**sampler_kwargs)
if sampled:
priority += 1
return priority, sampled

def _compute_sampled_and_priority(
self, priority, sampled, full_granularity, remote_parent_sampled_setting, remote_parent_not_sampled_setting
):
if self._remote_parent_sampled is None:
section = 0
config = "default" # Use sampling algo.
elif self._traceparent_sampled:
setting_path = "distributed_tracing.sampler.remote_parent_sampled"
config = self.settings.distributed_tracing.sampler.remote_parent_sampled
else: # self._traceparent_sampled is False.
setting_path = "distributed_tracing.sampler.remote_parent_not_sampled"
config = self.settings.distributed_tracing.sampler.remote_parent_not_sampled

_logger.debug("Sampling decision made based on no remote parent sampling decision present.")
elif self._remote_parent_sampled:
section = 1
setting_path = f"distributed_tracing.sampler.{'full_granularity' if full_granularity else 'partial_granularity'}.remote_parent_sampled"
config = remote_parent_sampled_setting
_logger.debug(
"Sampling decision made based on remote_parent_sampled=%s and %s=%s.",
self._remote_parent_sampled,
setting_path,
config,
)
else: # self._remote_parent_sampled is False.
section = 2
setting_path = f"distributed_tracing.sampler.{'full_granularity' if full_granularity else 'partial_granularity'}.remote_parent_not_sampled"
config = remote_parent_not_sampled_setting
_logger.debug(
"Sampling decision made based on remote_parent_sampled=%s and %s=%s.",
self._remote_parent_sampled,
setting_path,
config,
)
if config == "always_on":
self._sampled = True
self._priority = 2.0
sampled = True
priority = 2.0
elif config == "always_off":
self._sampled = False
self._priority = 0
sampled = False
priority = 0
else:
if config != "default":
if config not in ("default", "adaptive"):
_logger.warning("%s=%s is not a recognized value. Using 'default' instead.", setting_path, config)
self.sampling_algo_compute_sampled_and_priority()

_logger.debug(
"Let adaptive sampler algorithm decide based on sampled=%s and priority=%s.", sampled, priority
)
priority, sampled = self.sampling_algo_compute_sampled_and_priority(
priority, sampled, {"full_granularity": full_granularity, "section": section}
)
return priority, sampled

def _make_sampling_decision(self):
# The sampling decision is computed each time a DT header is generated for exit spans as it is needed
# to send the DT headers. Don't recompute the sampling decision multiple times as it is expensive.
if hasattr(self, "_sampling_decision_made"):
return
priority = self._priority
sampled = self._sampled
# Compute sampling decision for full granularity.
if self.settings.distributed_tracing.sampler.full_granularity.enabled:
_logger.debug(
"Full granularity tracing is enabled. Asking if full granularity wants to sample. priority=%s, sampled=%s",
priority,
sampled,
)
computed_priority, computed_sampled = self._compute_sampled_and_priority(
priority,
sampled,
full_granularity=True,
remote_parent_sampled_setting=self.settings.distributed_tracing.sampler.full_granularity._remote_parent_sampled,
remote_parent_not_sampled_setting=self.settings.distributed_tracing.sampler.full_granularity._remote_parent_not_sampled,
)
_logger.debug("Full granularity sampling decision was %s with priority=%s.", sampled, priority)
if computed_sampled or not self.settings.distributed_tracing.sampler.partial_granularity.enabled:
self._priority = computed_priority
self._sampled = computed_sampled
self._sampling_decision_made = True
return

# If full granularity is not going to sample, let partial granularity decide.
if self.settings.distributed_tracing.sampler.partial_granularity.enabled:
_logger.debug("Partial granularity tracing is enabled. Asking if partial granularity wants to sample.")
self._priority, self._sampled = self._compute_sampled_and_priority(
priority,
sampled,
full_granularity=False,
remote_parent_sampled_setting=self.settings.distributed_tracing.sampler.partial_granularity._remote_parent_sampled,
remote_parent_not_sampled_setting=self.settings.distributed_tracing.sampler.partial_granularity._remote_parent_not_sampled,
)
_logger.debug(
"Partial granularity sampling decision was %s with priority=%s.", self._sampled, self._priority
)
self._sampling_decision_made = True
if self._sampled:
self.partial_granularity_sampled = True
return

# This is only reachable if both full and partial granularity tracing are off.
# Set priority=0 and do not sample. This enables DT headers to still be sent
# even if the trace is never sampled.
self._priority = 0
self._sampled = False

def _freeze_path(self):
if self._frozen_path is None:
Expand Down Expand Up @@ -1101,7 +1185,7 @@ def _create_distributed_trace_data(self):
if not (account_id and application_id and trusted_account_key and settings.distributed_tracing.enabled):
return

self._compute_sampled_and_priority()
self._make_sampling_decision()
data = {
"ty": "App",
"ac": account_id,
Expand Down Expand Up @@ -1204,7 +1288,7 @@ def _accept_distributed_trace_payload(self, payload, transport_type="HTTP"):
if not any(k in data for k in ("id", "tx")):
self._record_supportability("Supportability/DistributedTrace/AcceptPayload/ParseException")
return False

self._remote_parent_sampled = data.get("sa")
settings = self._settings
account_id = data.get("ac")
trusted_account_key = settings.trusted_account_key or (
Expand Down Expand Up @@ -1254,10 +1338,8 @@ def _accept_distributed_trace_data(self, data, transport_type):

self._trace_id = data.get("tr")

priority = data.get("pr")
if priority is not None:
self._priority = priority
self._sampled = data.get("sa")
self._priority = data.get("pr")
self._sampled = data.get("sa")

if "ti" in data:
transport_start = data["ti"] / 1000.0
Expand Down Expand Up @@ -1297,6 +1379,7 @@ def accept_distributed_trace_headers(self, headers, transport_type="HTTP"):
try:
traceparent = ensure_str(traceparent).strip()
data = W3CTraceParent.decode(traceparent)
self._remote_parent_sampled = data.pop("sa", None)
except:
data = None

Expand Down Expand Up @@ -1332,7 +1415,6 @@ def accept_distributed_trace_headers(self, headers, transport_type="HTTP"):
else:
self._record_supportability("Supportability/TraceContext/TraceState/NoNrEntry")

self._traceparent_sampled = data.get("sa")
self._accept_distributed_trace_data(data, transport_type)
self._record_supportability("Supportability/TraceContext/Accept/Success")
return True
Expand Down
Loading
Loading