-
-
Notifications
You must be signed in to change notification settings - Fork 4.5k
ref(explorer): improve profile indexing query #103014
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -233,7 +233,7 @@ def _fetch_and_process_profile( | |
| concurrently from multiple threads. | ||
|
|
||
| Args: | ||
| profile_info: Dictionary containing profile metadata (profile_id, span_id, etc.) | ||
| profile_info: Dictionary containing profile metadata (profile_id, is_continuous, start_ts, end_ts) | ||
| organization_id: Organization ID | ||
| project_id: Project ID | ||
| trace_id: Trace ID for logging | ||
|
|
@@ -242,7 +242,6 @@ def _fetch_and_process_profile( | |
| ProfileData if successful, None otherwise | ||
| """ | ||
| profile_id = profile_info["profile_id"] | ||
| span_id = profile_info["span_id"] | ||
| transaction_name = profile_info["transaction_name"] | ||
| is_continuous = profile_info["is_continuous"] | ||
| start_ts = profile_info["start_ts"] | ||
|
|
@@ -275,7 +274,6 @@ def _fetch_and_process_profile( | |
| if execution_tree: | ||
| return ProfileData( | ||
| profile_id=profile_id, | ||
| span_id=span_id, | ||
| transaction_name=transaction_name, | ||
| execution_tree=execution_tree, | ||
| project_id=project_id, | ||
|
|
@@ -297,7 +295,7 @@ def _fetch_and_process_profile( | |
|
|
||
| def get_profiles_for_trace(trace_id: str, project_id: int) -> TraceProfiles | None: | ||
| """ | ||
| Get profiles for a given trace, with one profile per unique span/transaction. | ||
| Get profiles for a given trace, supporting both transaction and continuous profiles. | ||
|
|
||
| Args: | ||
| trace_id: The trace ID to find profiles for | ||
|
|
@@ -328,129 +326,77 @@ def get_profiles_for_trace(trace_id: str, project_id: int) -> TraceProfiles | No | |
| auto_fields=True, | ||
| ) | ||
|
|
||
| # Step 1: Find spans in the trace that have profile data - using same constraint as flamegraph | ||
| profiling_constraint = "(has:profile.id) or (has:profiler.id has:thread.id)" | ||
| # Use aggregation query to get unique profile IDs and trace time range | ||
| # Query for both transaction profiles (profile.id) and continuous profiles (profiler.id) | ||
| profiles_result = Spans.run_table_query( | ||
| params=snuba_params, | ||
| query_string=f"trace:{trace_id} project.id:{project_id} {profiling_constraint}", | ||
| query_string=f"trace:{trace_id} project.id:{project_id} (has:profile.id OR has:profiler.id)", | ||
| selected_columns=[ | ||
| "span_id", | ||
| "profile.id", | ||
| "profiler.id", | ||
| "thread.id", | ||
| "transaction", | ||
| "span.op", | ||
| "is_transaction", | ||
| "precise.start_ts", | ||
| "precise.finish_ts", | ||
| "min(precise.start_ts)", | ||
| "max(precise.finish_ts)", | ||
| ], | ||
| orderby=["precise.start_ts"], | ||
| orderby=[], | ||
| offset=0, | ||
| limit=50, | ||
| limit=5, | ||
| referrer=Referrer.SEER_RPC, | ||
| config=config, | ||
| sampling_mode="NORMAL", | ||
| ) | ||
|
|
||
| # Step 2: Collect all profiles and merge those with same profile_id and is_continuous | ||
| all_profiles = [] | ||
| profile_data = [] | ||
|
|
||
| for row in profiles_result.get("data", []): | ||
| span_id = row.get("span_id") | ||
| profile_id = row.get("profile.id") # Transaction profiles | ||
| profiler_id = row.get("profiler.id") # Continuous profiles | ||
| transaction_name = row.get("transaction") | ||
| start_ts = row.get("precise.start_ts") | ||
| end_ts = row.get("precise.finish_ts") | ||
|
|
||
| logger.info( | ||
| "Iterating over span to get profiles", | ||
| extra={ | ||
| "span_id": span_id, | ||
| "profile_id": profile_id, | ||
| "profiler_id": profiler_id, | ||
| "transaction_name": transaction_name, | ||
| }, | ||
| ) | ||
|
|
||
| if not span_id: | ||
| logger.info( | ||
| "Span doesn't have an id, skipping", | ||
| extra={"span_id": span_id}, | ||
| ) | ||
| continue | ||
| start_ts = row.get("min(precise.start_ts)") | ||
| end_ts = row.get("max(precise.finish_ts)") | ||
|
|
||
| # Use profile.id first (transaction profiles), fallback to profiler.id (continuous profiles) | ||
| actual_profile_id = profile_id or profiler_id | ||
| actual_profile_id = profiler_id or profile_id | ||
| if not actual_profile_id: | ||
| logger.info( | ||
| "Span doesn't have a profile or profiler id, skipping", | ||
| extra={"span_id": span_id}, | ||
| ) | ||
| continue | ||
|
|
||
| # Determine if this is a continuous profile (profiler.id without profile.id) | ||
| is_continuous = profile_id is None and profiler_id is not None | ||
| is_continuous = profiler_id is not None | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bug: Correct Profile ID PrioritizationWhen a span has both |
||
|
|
||
| all_profiles.append( | ||
| profile_data.append( | ||
| { | ||
| "span_id": span_id, | ||
| "profile_id": actual_profile_id, | ||
| "transaction_name": transaction_name, | ||
| "is_continuous": is_continuous, | ||
| "start_ts": start_ts, | ||
| "end_ts": end_ts, | ||
| } | ||
| ) | ||
|
|
||
| # Merge profiles with same profile_id and is_continuous | ||
| # Use the earliest start_ts and latest end_ts for merged profiles | ||
| profile_groups = {} | ||
| for profile in all_profiles: | ||
| key = (profile["profile_id"], profile["is_continuous"]) | ||
|
|
||
| if key not in profile_groups: | ||
| profile_groups[key] = { | ||
| "span_id": profile["span_id"], # Keep the first span_id | ||
| "profile_id": profile["profile_id"], | ||
| "transaction_name": profile["transaction_name"], | ||
| "is_continuous": profile["is_continuous"], | ||
| "start_ts": profile["start_ts"], | ||
| "end_ts": profile["end_ts"], | ||
| } | ||
| else: | ||
| # Merge time ranges - use earliest start and latest end | ||
| existing = profile_groups[key] | ||
| if profile["start_ts"] and ( | ||
| existing["start_ts"] is None or profile["start_ts"] < existing["start_ts"] | ||
| ): | ||
| existing["start_ts"] = profile["start_ts"] | ||
| if profile["end_ts"] and ( | ||
| existing["end_ts"] is None or profile["end_ts"] > existing["end_ts"] | ||
| ): | ||
| existing["end_ts"] = profile["end_ts"] | ||
|
|
||
| unique_profiles = list(profile_groups.values()) | ||
|
|
||
| logger.info( | ||
| "Merged profiles", | ||
| extra={ | ||
| "original_count": len(all_profiles), | ||
| "merged_count": len(unique_profiles), | ||
| }, | ||
| ) | ||
|
|
||
| if not unique_profiles: | ||
| if not profile_data: | ||
| logger.info( | ||
| "No profiles found for trace", | ||
| extra={"trace_id": trace_id, "project_id": project_id}, | ||
| ) | ||
| return None | ||
|
|
||
| # Step 3: Fetch and process profiles in parallel | ||
| logger.info( | ||
| "Found unique profiles for trace", | ||
| extra={ | ||
| "trace_id": trace_id, | ||
| "profile_count": len(profile_data), | ||
| }, | ||
| ) | ||
|
|
||
| # Fetch and process profiles in parallel | ||
| processed_profiles = [] | ||
| profiles_to_fetch = [ | ||
| { | ||
| "profile_id": p["profile_id"], | ||
| "transaction_name": None, | ||
| "is_continuous": p["is_continuous"], | ||
| "start_ts": p["start_ts"], | ||
| "end_ts": p["end_ts"], | ||
| } | ||
| for p in profile_data | ||
| ] | ||
|
|
||
| with ThreadPoolExecutor(max_workers=min(len(unique_profiles), 10)) as executor: | ||
| with ThreadPoolExecutor(max_workers=min(len(profiles_to_fetch), 5)) as executor: | ||
| future_to_profile = { | ||
| executor.submit( | ||
| _fetch_and_process_profile, | ||
|
|
@@ -459,7 +405,7 @@ def get_profiles_for_trace(trace_id: str, project_id: int) -> TraceProfiles | No | |
| project_id, | ||
| trace_id, | ||
| ): profile_info | ||
| for profile_info in unique_profiles | ||
| for profile_info in profiles_to_fetch | ||
| } | ||
|
|
||
| for future in as_completed(future_to_profile): | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bug: Query uses
limit=5, preventing retrieval of all unique profiles for a trace, contradicting refactor's goal.Severity: CRITICAL | Confidence: 1.00
🔍 Detailed Analysis
The Snuba query constructed to retrieve unique profiles and their time ranges incorrectly applies a
limit=5. This hardcoded limit prevents the function from returning all unique profiles for a given trace if more than five distinct profiles are present, directly undermining the refactor's stated goal of obtaining all unique profiles.💡 Suggested Fix
Remove or substantially increase the
limitparameter in the Snuba query to ensure all unique profiles are fetched, aligning with the function's intended purpose.🤖 Prompt for AI Agent
Did we get this right? 👍 / 👎 to inform future reviews.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not a bug