diff --git a/src/sentry/seer/explorer/index_data.py b/src/sentry/seer/explorer/index_data.py
index fb75e3aa628586..b8a989867679e2 100644
--- a/src/sentry/seer/explorer/index_data.py
+++ b/src/sentry/seer/explorer/index_data.py
@@ -233,7 +233,7 @@ def _fetch_and_process_profile(
     concurrently from multiple threads.
 
     Args:
-        profile_info: Dictionary containing profile metadata (profile_id, span_id, etc.)
+        profile_info: Dictionary containing profile metadata (profile_id, transaction_name, is_continuous, start_ts, end_ts)
         organization_id: Organization ID
         project_id: Project ID
         trace_id: Trace ID for logging
@@ -242,7 +242,6 @@ def _fetch_and_process_profile(
         ProfileData if successful, None otherwise
     """
     profile_id = profile_info["profile_id"]
-    span_id = profile_info["span_id"]
    transaction_name = profile_info["transaction_name"]
     is_continuous = profile_info["is_continuous"]
     start_ts = profile_info["start_ts"]
@@ -275,7 +274,6 @@ def _fetch_and_process_profile(
         if execution_tree:
             return ProfileData(
                 profile_id=profile_id,
-                span_id=span_id,
                 transaction_name=transaction_name,
                 execution_tree=execution_tree,
                 project_id=project_id,
@@ -297,7 +295,7 @@ def _fetch_and_process_profile(
 
 def get_profiles_for_trace(trace_id: str, project_id: int) -> TraceProfiles | None:
     """
-    Get profiles for a given trace, with one profile per unique span/transaction.
+    Get profiles for a given trace, supporting both transaction and continuous profiles.
 
     Args:
         trace_id: The trace ID to find profiles for
@@ -328,129 +326,77 @@ def get_profiles_for_trace(trace_id: str, project_id: int) -> TraceProfiles | No
         auto_fields=True,
     )
 
-    # Step 1: Find spans in the trace that have profile data - using same constraint as flamegraph
-    profiling_constraint = "(has:profile.id) or (has:profiler.id has:thread.id)"
+    # Use an aggregation query to get unique profile IDs and their time ranges
+    # Query for both transaction profiles (profile.id) and continuous profiles (profiler.id)
     profiles_result = Spans.run_table_query(
         params=snuba_params,
-        query_string=f"trace:{trace_id} project.id:{project_id} {profiling_constraint}",
+        query_string=f"trace:{trace_id} project.id:{project_id} (has:profile.id OR has:profiler.id)",
         selected_columns=[
-            "span_id",
             "profile.id",
             "profiler.id",
-            "thread.id",
-            "transaction",
-            "span.op",
-            "is_transaction",
-            "precise.start_ts",
-            "precise.finish_ts",
+            "min(precise.start_ts)",
+            "max(precise.finish_ts)",
         ],
-        orderby=["precise.start_ts"],
+        orderby=[],
         offset=0,
-        limit=50,
+        limit=5,
         referrer=Referrer.SEER_RPC,
         config=config,
         sampling_mode="NORMAL",
     )
 
-    # Step 2: Collect all profiles and merge those with same profile_id and is_continuous
-    all_profiles = []
+    profile_data = []
     for row in profiles_result.get("data", []):
-        span_id = row.get("span_id")
         profile_id = row.get("profile.id")  # Transaction profiles
         profiler_id = row.get("profiler.id")  # Continuous profiles
-        transaction_name = row.get("transaction")
-        start_ts = row.get("precise.start_ts")
-        end_ts = row.get("precise.finish_ts")
-
-        logger.info(
-            "Iterating over span to get profiles",
-            extra={
-                "span_id": span_id,
-                "profile_id": profile_id,
-                "profiler_id": profiler_id,
-                "transaction_name": transaction_name,
-            },
-        )
-
-        if not span_id:
-            logger.info(
-                "Span doesn't have an id, skipping",
-                extra={"span_id": span_id},
-            )
-            continue
+        start_ts = row.get("min(precise.start_ts)")
+        end_ts = row.get("max(precise.finish_ts)")
 
-        # Use profile.id first (transaction profiles), fallback to profiler.id (continuous profiles)
-        actual_profile_id = profile_id or profiler_id
+        actual_profile_id = profiler_id or profile_id
         if not actual_profile_id:
-            logger.info(
-                "Span doesn't have a profile or profiler id, skipping",
-                extra={"span_id": span_id},
-            )
             continue
 
-        # Determine if this is a continuous profile (profiler.id without profile.id)
-        is_continuous = profile_id is None and profiler_id is not None
+        is_continuous = profiler_id is not None
 
-        all_profiles.append(
+        profile_data.append(
             {
-                "span_id": span_id,
                 "profile_id": actual_profile_id,
-                "transaction_name": transaction_name,
                 "is_continuous": is_continuous,
                 "start_ts": start_ts,
                 "end_ts": end_ts,
             }
         )
 
-    # Merge profiles with same profile_id and is_continuous
-    # Use the earliest start_ts and latest end_ts for merged profiles
-    profile_groups = {}
-    for profile in all_profiles:
-        key = (profile["profile_id"], profile["is_continuous"])
-
-        if key not in profile_groups:
-            profile_groups[key] = {
-                "span_id": profile["span_id"],  # Keep the first span_id
-                "profile_id": profile["profile_id"],
-                "transaction_name": profile["transaction_name"],
-                "is_continuous": profile["is_continuous"],
-                "start_ts": profile["start_ts"],
-                "end_ts": profile["end_ts"],
-            }
-        else:
-            # Merge time ranges - use earliest start and latest end
-            existing = profile_groups[key]
-            if profile["start_ts"] and (
-                existing["start_ts"] is None or profile["start_ts"] < existing["start_ts"]
-            ):
-                existing["start_ts"] = profile["start_ts"]
-            if profile["end_ts"] and (
-                existing["end_ts"] is None or profile["end_ts"] > existing["end_ts"]
-            ):
-                existing["end_ts"] = profile["end_ts"]
-
-    unique_profiles = list(profile_groups.values())
-
-    logger.info(
-        "Merged profiles",
-        extra={
-            "original_count": len(all_profiles),
-            "merged_count": len(unique_profiles),
-        },
-    )
-
-    if not unique_profiles:
+    if not profile_data:
         logger.info(
             "No profiles found for trace",
             extra={"trace_id": trace_id, "project_id": project_id},
         )
         return None
 
-    # Step 3: Fetch and process profiles in parallel
+    logger.info(
+        "Found unique profiles for trace",
+        extra={
+            "trace_id": trace_id,
+            "profile_count": len(profile_data),
+        },
+    )
+
+    # Fetch and process profiles in parallel
     processed_profiles = []
+    profiles_to_fetch = [
+        {
+            "profile_id": p["profile_id"],
+            "transaction_name": None,
+            "is_continuous": p["is_continuous"],
+            "start_ts": p["start_ts"],
+            "end_ts": p["end_ts"],
+        }
+        for p in profile_data
+    ]
 
-    with ThreadPoolExecutor(max_workers=min(len(unique_profiles), 10)) as executor:
+    with ThreadPoolExecutor(max_workers=min(len(profiles_to_fetch), 5)) as executor:
         future_to_profile = {
             executor.submit(
                 _fetch_and_process_profile,
@@ -459,7 +405,7 @@ def get_profiles_for_trace(trace_id: str, project_id: int) -> TraceProfiles | No
                 project_id,
                 trace_id,
             ): profile_info
-            for profile_info in unique_profiles
+            for profile_info in profiles_to_fetch
         }
 
         for future in as_completed(future_to_profile):
diff --git a/src/sentry/seer/sentry_data_models.py b/src/sentry/seer/sentry_data_models.py
index 9c64392cdb70c2..c8fa55080aea36 100644
--- a/src/sentry/seer/sentry_data_models.py
+++ b/src/sentry/seer/sentry_data_models.py
@@ -58,7 +58,6 @@ class ExecutionTreeNode(BaseModel):
 
 class ProfileData(BaseModel):
     profile_id: str
-    span_id: str
     transaction_name: str | None
     execution_tree: list[ExecutionTreeNode]
     project_id: int
diff --git a/tests/sentry/seer/explorer/test_index_data.py b/tests/sentry/seer/explorer/test_index_data.py
index a022bc779715b8..4363d82f267189 100644
--- a/tests/sentry/seer/explorer/test_index_data.py
+++ b/tests/sentry/seer/explorer/test_index_data.py
@@ -3,6 +3,7 @@
 from unittest import mock
 
 import orjson
+import pytest
 
 from sentry.search.snuba.backend import EventsDatasetSnubaSearchBackend
 from sentry.seer.explorer.index_data import (
@@ -372,8 +373,8 @@ def mock_service_response(method, path, *args, **kwargs):
             json_data=mock.ANY,
         )
 
-    def test_get_profiles_for_trace_merges_duplicate_profiles(self) -> None:
-        """Test that profiles with same profile_id and is_continuous are merged, regardless of transaction name."""
+    def test_get_profiles_for_trace_aggregates_duplicate_profiles(self) -> None:
+        """Test that the aggregation query returns one row per unique profile_id."""
         trace_id = "b" * 32  # Valid 32-char hex trace ID
 
         profile_id = uuid.uuid4().hex  # Same profile ID for multiple spans
@@ -381,6 +382,7 @@ def test_get_profiles_for_trace_merges_duplicate_profiles(self) -> None:
         transaction_name2 = "api/different/transaction"
 
         # Create multiple spans with the same profile_id but different transactions
+        # The aggregation query should group these and return just one row
         span1 = self.create_span(
             {
                 "trace_id": trace_id,
@@ -414,7 +416,7 @@ def test_get_profiles_for_trace_merges_duplicate_profiles(self) -> None:
         )
         span3.update({"profile_id": profile_id})
 
-        # Create a span with different profile_id (should not be merged)
+        # Create a span with different profile_id (should be separate row in aggregation)
         different_profile_id = uuid.uuid4().hex
         span4 = self.create_span(
             {
@@ -440,9 +442,9 @@ def mock_service_response(method, path, *args, **kwargs):
                     "profile": {
                         "frames": [
                             {
-                                "function": "merged_function",
+                                "function": "aggregated_function",
                                 "module": "app",
-                                "filename": "merged.py",
+                                "filename": "aggregated.py",
                                 "lineno": 10,
                                 "in_app": True,
                             }
@@ -471,30 +473,25 @@ def mock_service_response(method, path, *args, **kwargs):
         assert result.trace_id == trace_id
         assert result.project_id == self.project.id
 
-        # Should have 2 profiles: 1 merged for the shared profile_id and 1 for the different profile_id
+        # Aggregation query should return 2 rows: one per unique profile_id
         assert len(result.profiles) == 2
 
-        # Find the profiles by profile_id
-        merged_profiles = [p for p in result.profiles if p.profile_id == profile_id]
-        different_profiles = [
-            p for p in result.profiles if p.profile_id == different_profile_id
-        ]
-
-        assert (
-            len(merged_profiles) == 1
-        ), "Should merge 3 spans with same profile_id into 1, regardless of transaction name"
-        assert len(different_profiles) == 1, "Should keep different profile_id separate"
+        # Verify each unique profile_id appears exactly once
+        profile_ids_found = [p.profile_id for p in result.profiles]
+        assert profile_id in profile_ids_found
+        assert different_profile_id in profile_ids_found
+        assert profile_ids_found.count(profile_id) == 1
+        assert profile_ids_found.count(different_profile_id) == 1
 
         # Verify that the profile service was called only twice (once per unique profile_id)
         assert mock_service.call_count == 2
 
-        # Check that both unique profile_ids were processed
-        profile_ids_found = [p.profile_id for p in result.profiles]
-        assert profile_id in profile_ids_found
-        assert different_profile_id in profile_ids_found
+        # Verify all profiles are transaction profiles (not continuous)
+        for profile in result.profiles:
+            assert profile.is_continuous is False
 
-    def test_get_profiles_for_trace_merges_continuous_profiles(self) -> None:
-        """Test that continuous profiles with same profiler_id and is_continuous are merged, regardless of transaction name."""
+    def test_get_profiles_for_trace_aggregates_continuous_profiles(self) -> None:
+        """Test that the aggregation query returns one row per unique profiler_id for continuous profiles."""
         trace_id = "c" * 32  # Valid 32-char hex trace ID
 
         profiler_id = uuid.uuid4().hex  # Same profiler ID for multiple spans
@@ -502,10 +499,11 @@ def test_get_profiles_for_trace_merges_continuous_profiles(self) -> None:
         transaction_name1 = "api/continuous/test"
         transaction_name2 = "api/different/continuous"
 
-        # Create multiple spans with the same profiler_id but different transactions (continuous profiles)
+        # Create multiple spans with the same profiler_id but different transactions
+        # The aggregation query should group these and return just one row
         spans = []
         for i in range(3):
-            # Alternate between transaction names to test merging across transactions
+            # Alternate between transaction names to test aggregation across transactions
             transaction_name = transaction_name1 if i % 2 == 0 else transaction_name2
             span = self.create_span(
                 {
@@ -532,7 +530,7 @@ def test_get_profiles_for_trace_merges_continuous_profiles(self) -> None:
             )
             spans.append(span)
 
-        # Create a continuous profile span with different profiler_id (should not be merged)
+        # Create a continuous profile span with different profiler_id (should be separate row)
         different_profiler_id = uuid.uuid4().hex
         span_different = self.create_span(
             {
@@ -572,7 +570,7 @@ def mock_service_response(method, path, *args, **kwargs):
                     "profile": {
                         "frames": [
                             {
-                                "function": "continuous_merged_function",
+                                "function": "continuous_aggregated_function",
                                 "module": "profiler",
                                 "filename": "continuous.py",
                                 "lineno": 15,
@@ -604,19 +602,15 @@ def mock_service_response(method, path, *args, **kwargs):
         assert result.trace_id == trace_id
         assert result.project_id == self.project.id
 
-        # Should have 2 profiles: 1 merged for the shared profiler_id and 1 for the different profiler_id
+        # Aggregation query should return 2 rows: one per unique profiler_id
         assert len(result.profiles) == 2
 
-        # Find the profiles by profiler_id
-        merged_profiles = [p for p in result.profiles if p.profile_id == profiler_id]
-        different_profiles = [
-            p for p in result.profiles if p.profile_id == different_profiler_id
-        ]
-
-        assert (
-            len(merged_profiles) == 1
-        ), "Should merge 3 continuous spans with same profiler_id into 1, regardless of transaction name"
-        assert len(different_profiles) == 1, "Should keep different profiler_id separate"
+        # Verify each unique profiler_id appears exactly once
+        profile_ids_found = [p.profile_id for p in result.profiles]
+        assert profiler_id in profile_ids_found
+        assert different_profiler_id in profile_ids_found
+        assert profile_ids_found.count(profiler_id) == 1
+        assert profile_ids_found.count(different_profiler_id) == 1
 
         # Verify that the profile service was called only twice (once per unique profiler_id)
         # Both should use the /chunks endpoint for continuous profiles
@@ -627,10 +621,124 @@ def mock_service_response(method, path, *args, **kwargs):
             assert call[1]["method"] == "POST"
             assert "/chunks" in call[1]["path"]
 
-        # Check that both unique profiler_ids were processed
-        profile_ids_found = [p.profile_id for p in result.profiles]
-        assert profiler_id in profile_ids_found
-        assert different_profiler_id in profile_ids_found
+        # Verify all profiles are continuous profiles
+        for profile in result.profiles:
+            assert profile.is_continuous is True
+
+    def test_get_profiles_for_trace_uses_aggregated_timestamps(self) -> None:
+        """Test that the aggregation query correctly computes min/max timestamps for each profile."""
+        trace_id = "d" * 32  # Valid 32-char hex trace ID
+
+        profile_id = uuid.uuid4().hex
+
+        # Create spans with the same profile_id at different times
+        # The aggregation should use min(start) and max(end) timestamps
+        span1_start = self.ten_mins_ago
+        span1_duration_ms = 50.0  # span1 ends at ten_mins_ago + 50ms
+
+        span2_start = self.ten_mins_ago + timedelta(milliseconds=100)
+        span2_duration_ms = 100.0  # span2 ends at ten_mins_ago + 200ms (latest end)
+
+        span3_start = self.ten_mins_ago + timedelta(milliseconds=25)
+        span3_duration_ms = 50.0  # span3 ends at ten_mins_ago + 75ms (middle)
+
+        span1 = self.create_span(
+            {
+                "trace_id": trace_id,
+                "description": "First span (earliest start)",
+                "sentry_tags": {"transaction": "api/test", "op": "http.server"},
+                "is_segment": True,
+            },
+            start_ts=span1_start,
+            duration=int(span1_duration_ms),
+        )
+        span1.update({"profile_id": profile_id})
+
+        span2 = self.create_span(
+            {
+                "trace_id": trace_id,
+                "description": "Second span (latest end)",
+                "sentry_tags": {"transaction": "api/test", "op": "db.query"},
+                "is_segment": False,
+            },
+            start_ts=span2_start,
+            duration=int(span2_duration_ms),
+        )
+        span2.update({"profile_id": profile_id})
+
+        span3 = self.create_span(
+            {
+                "trace_id": trace_id,
+                "description": "Third span (middle)",
+                "sentry_tags": {"transaction": "api/test", "op": "cache.get"},
+                "is_segment": False,
+            },
+            start_ts=span3_start,
+            duration=int(span3_duration_ms),
+        )
+        span3.update({"profile_id": profile_id})
+
+        self.store_spans([span1, span2, span3], is_eap=True)
+
+        captured_timestamps = {}
+
+        with mock.patch("sentry.seer.explorer.index_data.fetch_profile_data") as mock_fetch:
+            # Mock to capture the timestamps passed to fetch_profile_data
+            def capture_and_return(
+                profile_id, organization_id, project_id, start_ts, end_ts, is_continuous
+            ):
+                captured_timestamps["start_ts"] = start_ts
+                captured_timestamps["end_ts"] = end_ts
+                captured_timestamps["profile_id"] = profile_id
+                captured_timestamps["is_continuous"] = is_continuous
+
+                return {
+                    "profile": {
+                        "frames": [
+                            {
+                                "function": "test_function",
+                                "module": "app",
+                                "filename": "test.py",
+                                "lineno": 10,
+                                "in_app": True,
+                            }
+                        ],
+                        "stacks": [[0]],
+                        "samples": [
+                            {
+                                "elapsed_since_start_ns": 1000000,
+                                "thread_id": "1",
+                                "stack_id": 0,
+                            }
+                        ],
+                        "thread_metadata": {"1": {"name": "MainThread"}},
+                    }
+                }
+
+            mock_fetch.side_effect = capture_and_return
+
+            # Call the function
+            result = get_profiles_for_trace(trace_id, self.project.id)
+
+            # Verify result
+            assert result is not None
+            assert len(result.profiles) == 1
+
+            # Verify fetch_profile_data was called with aggregated timestamps
+            assert mock_fetch.call_count == 1
+
+            # Calculate expected end times based on start + duration
+            _ = span1_start + timedelta(milliseconds=span1_duration_ms)
+            span2_end = span2_start + timedelta(milliseconds=span2_duration_ms)
+            _ = span3_start + timedelta(milliseconds=span3_duration_ms)
+
+            # The aggregation should use:
+            # - min(start_ts) = span1_start (earliest start)
+            # - max(finish_ts) = span2_end (latest end: ten_mins_ago + 200ms)
+            assert captured_timestamps["start_ts"] == pytest.approx(span1_start.timestamp())
+            assert captured_timestamps["end_ts"] == pytest.approx(span2_end.timestamp())
+            assert captured_timestamps["profile_id"] == profile_id
+            assert captured_timestamps["is_continuous"] is False
 
 
 class TestGetIssuesForTransaction(APITransactionTestCase, SpanTestCase, SharedSnubaMixin):
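
Note on the query change: the rewritten table query pushes the old in-application merge loop down into the database by selecting profile.id/profiler.id alongside min(precise.start_ts)/max(precise.finish_ts), so each unique profile comes back as a single pre-aggregated row. For reference, the following is a minimal Python sketch of the equivalent grouping; it is illustrative only and not part of the patch, `aggregate_profiles` is a hypothetical helper, and it assumes every row carries non-None timestamps (the deleted loop additionally guarded against None):

    def aggregate_profiles(rows: list[dict]) -> list[dict]:
        """Collapse rows to one per (profile_id, is_continuous), widening the time range."""
        groups: dict[tuple[str, bool], dict] = {}
        for row in rows:
            key = (row["profile_id"], row["is_continuous"])
            if key not in groups:
                groups[key] = dict(row)  # first sighting: copy the row
            else:
                # Keep the earliest start and the latest end, mirroring min()/max()
                groups[key]["start_ts"] = min(groups[key]["start_ts"], row["start_ts"])
                groups[key]["end_ts"] = max(groups[key]["end_ts"], row["end_ts"])
        return list(groups.values())

    # Two spans reporting the same transaction profile collapse to one entry
    rows = [
        {"profile_id": "abc", "is_continuous": False, "start_ts": 1.00, "end_ts": 1.05},
        {"profile_id": "abc", "is_continuous": False, "start_ts": 1.10, "end_ts": 1.20},
    ]
    assert aggregate_profiles(rows) == [
        {"profile_id": "abc", "is_continuous": False, "start_ts": 1.00, "end_ts": 1.20}
    ]

This is the behavior test_get_profiles_for_trace_uses_aggregated_timestamps exercises end to end: three spans sharing one profile_id yield a single fetch whose start_ts is the minimum span start and whose end_ts is the maximum span end.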