From 12606663cb462e6ee65959698480a1ee3b94716a Mon Sep 17 00:00:00 2001 From: Jean Schmidt Date: Thu, 6 Nov 2025 16:52:09 -0800 Subject: [PATCH 1/6] [Autorevert] Jobs with test failures are back to be considered failures Signed-off-by: Jean Schmidt --- .../pytorch-auto-revert/SIGNAL_EXTRACTION.md | 22 ---- .../pytorch_auto_revert/signal_extraction.py | 12 +- .../tests/test_signal_extraction.py | 117 +----------------- 3 files changed, 6 insertions(+), 145 deletions(-) diff --git a/aws/lambda/pytorch-auto-revert/SIGNAL_EXTRACTION.md b/aws/lambda/pytorch-auto-revert/SIGNAL_EXTRACTION.md index 6ef7314397..2447b37bdc 100644 --- a/aws/lambda/pytorch-auto-revert/SIGNAL_EXTRACTION.md +++ b/aws/lambda/pytorch-auto-revert/SIGNAL_EXTRACTION.md @@ -82,28 +82,6 @@ Notes - Each commit holds a list of `SignalEvent`s (time‑ordered by `started_at`). Ordering: dicts in Python 3.7+ preserve insertion order. Phase A inserts commit keys in push‑timestamp DESC order, so iterating the mapping yields newest→older commits without extra sorting. -### Test‑track semantics -- Source of truth for SUCCESS/FAILURE is `tests.all_test_runs` per test id. -- When a test row exists for an attempt: - - Emit at most one FAILURE if any failed runs exist; at most one SUCCESS if any successful runs exist. -- When no test rows exist for an attempt and any grouped job for that attempt is pending → emit PENDING. -- Otherwise (no test rows and not pending) → no event for that attempt. - -### Job‑track semantics (non‑test) -- Build per normalized job base across commits; aggregate shards by `(wf_run_id, run_attempt)`. -- Event mapping per attempt uses aggregated job meta with test‑failure filtering: - - FAILURE only when the attempt had non‑test failures (e.g. infra‑related). - - PENDING when the attempt is still running. - - SUCCESS otherwise, including when failures are exclusively test‑caused (these are handled by test‑track). -- Cancelled attempts are treated as missing (no event). -- Emit a job‑track Signal only when at least one attempt/commit shows a non‑test (infra) failure within the window. - -Event naming (for debuggability): -- Consistent key=value format: `wf= kind= id= run= attempt=` -- Examples: - - Test event: `wf=trunk kind=test id=inductor/test_foo.py::test_bar run=1744 attempt=1` - - Job event: `wf=trunk kind=job id=linux-jammy-cuda12.8-py3.10-gcc11 / test run=1744 attempt=2` - ### Test‑track mapping - Build a per‑commit map `test_id -> list[SignalEvent]` by combining all relevant jobs and shards: - For each (wf_run_id, run_attempt, job_base_name) group in the commit, consult `tests.all_test_runs` rows (if any) for each candidate `test_id`: diff --git a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal_extraction.py b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal_extraction.py index 17052d4356..5c32c461d4 100644 --- a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal_extraction.py +++ b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal_extraction.py @@ -481,7 +481,7 @@ def _build_non_test_signals( for wf_name, base_name in wf_base_keys: commit_objs: List[SignalCommit] = [] # Track failure types across all attempts/commits for this base - has_relevant_failures = False # at least one non-test failure observed + has_failures = False # at least one failure observed for sha, _ in commits: attempt_keys: List[ @@ -497,14 +497,12 @@ def _build_non_test_signals( # Map aggregation verdict to outer SignalStatus if meta.status is None: continue - if meta.status == AggStatus.FAILURE and meta.has_non_test_failures: - # mark presence of non-test failures (relevant for job track) - has_relevant_failures = True + if meta.status == AggStatus.FAILURE: ev_status = SignalStatus.FAILURE + has_failures = True elif meta.status == AggStatus.PENDING: ev_status = SignalStatus.PENDING else: - # Note: when all failures are caused by tests, we do NOT emit job-level failures ev_status = SignalStatus.SUCCESS # Extract wf_run_id/run_attempt from the attempt key @@ -535,8 +533,8 @@ def _build_non_test_signals( ) ) - # Emit job signal when failures were present and failures were NOT exclusively test-caused - if has_relevant_failures: + # Emit job signal when failures were present + if has_failures: signals.append( Signal( key=base_name, diff --git a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/tests/test_signal_extraction.py b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/tests/test_signal_extraction.py index 4ba9e608bc..9479f76f4c 100644 --- a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/tests/test_signal_extraction.py +++ b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/tests/test_signal_extraction.py @@ -187,7 +187,7 @@ def test_attempt_boundary_two_events_time_ordered(self): self.assertIn("attempt=1", events[0].name) self.assertIn("attempt=2", events[1].name) - def test_keep_going_failure_test_track_failure_and_no_job_signal(self): + def test_keep_going_failure_test_track_failure_and_job_signal(self): # in_progress + KG-adjusted failure for a test-classified job jobs = [ J( @@ -217,8 +217,6 @@ def test_keep_going_failure_test_track_failure_and_no_job_signal(self): test_sig = self._find_test_signal(signals, "trunk", "f.py::test_a") self.assertIsNotNone(test_sig) self.assertEqual(test_sig.commits[0].events[0].status, SignalStatus.FAILURE) - # Non-test signal for this base should be omitted due to test-only failure policy - self.assertIsNone(self._find_job_signal(signals, "trunk", jobs[0].base_name)) def test_cancelled_attempt_yields_no_event(self): # Include a separate failing commit so the job signal is emitted @@ -250,119 +248,6 @@ def test_cancelled_attempt_yields_no_event(self): x1 = next(c for c in sig.commits if c.head_sha == "X1") self.assertEqual(x1.events, []) - def test_non_test_inclusion_gate(self): - # (a) only test failures -> no job signal - jobs_a = [ - J( - sha="A2", - run=600, - job=40, - attempt=1, - started_at=ts(self.t0, 10), - conclusion="failure", - rule="pytest failure", - ), - J( - sha="A1", - run=610, - job=41, - attempt=1, - started_at=ts(self.t0, 5), - conclusion="failure", - rule="pytest failure", - ), - ] - tests_a = [ - T( - job=40, - run=600, - attempt=1, - file="f.py", - name="test_x", - failure_runs=1, - success_runs=0, - ), - T( - job=41, - run=610, - attempt=1, - file="f.py", - name="test_x", - failure_runs=1, - success_runs=0, - ), - ] - signals_a = self._extract(jobs_a, tests_a) - self.assertIsNone( - self._find_job_signal(signals_a, "trunk", jobs_a[0].base_name) - ) - - # (b) includes a non-test failure -> job signal emitted - jobs_b = [ - J( - sha="B2", - run=700, - job=50, - attempt=1, - started_at=ts(self.t0, 10), - conclusion="failure", - rule="infra-flake", # non-test classification - ), - J( - sha="B1", - run=710, - job=51, - attempt=1, - started_at=ts(self.t0, 5), - conclusion="success", - rule="", - ), - ] - signals_b = self._extract(jobs_b, tests=[]) - self.assertIsNotNone( - self._find_job_signal(signals_b, "trunk", jobs_b[0].base_name) - ) - - def test_job_track_treats_test_failures_as_success(self): - # When a base has a non-test (infra) failure somewhere (so a job signal is emitted), - # attempts that fail due to tests should NOT appear as FAILURES in the job track. - # They should be treated as SUCCESS at the job-track level, leaving the failure to test-track. - jobs = [ - # Newer commit: infra-caused failure (non-test classification) - J( - sha="Z2", - run=9100, - job=801, - attempt=1, - started_at=ts(self.t0, 20), - conclusion="failure", - rule="infra", # non-test - ), - # Older commit: failure caused by tests (test classification) - J( - sha="Z1", - run=9000, - job=800, - attempt=1, - started_at=ts(self.t0, 10), - conclusion="failure", - rule="pytest failure", # test-caused - ), - ] - - signals = self._extract(jobs, tests=[]) - base = jobs[0].base_name - sig = self._find_job_signal(signals, "trunk", base) - self.assertIsNotNone(sig) - # Expect commits newest->older - self.assertEqual([c.head_sha for c in sig.commits], ["Z2", "Z1"]) - # Newer infra failure remains FAILURE - self.assertEqual(len(sig.commits[0].events), 1) - self.assertEqual(sig.commits[0].events[0].status, SignalStatus.FAILURE) - # Older test-caused failure is mapped to SUCCESS in job track - self.assertEqual(len(sig.commits[1].events), 1) - self.assertEqual(sig.commits[1].events[0].status, SignalStatus.SUCCESS) - def test_commits_without_jobs_are_included(self): # Verify that commits with no jobs at all are still included in signals # Simulate case where C2 has a failure, C3 has no jobs (e.g., periodic workflow), From eba7a5afab60668fbe2bb20b19c1c0be9d45dc70 Mon Sep 17 00:00:00 2001 From: Jean Schmidt Date: Fri, 7 Nov 2025 14:28:06 -0800 Subject: [PATCH 2/6] [Autorevert] Introduction separate signals by rule on jobs track Signed-off-by: Jean Schmidt --- .../pytorch_auto_revert/job_agg_index.py | 10 +- .../pytorch_auto_revert/signal_extraction.py | 185 +++++++++++++----- 2 files changed, 138 insertions(+), 57 deletions(-) diff --git a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/job_agg_index.py b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/job_agg_index.py index b17aee0b12..7004277403 100644 --- a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/job_agg_index.py +++ b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/job_agg_index.py @@ -50,13 +50,14 @@ class JobMeta: - job_id: Optional job_id from the failing job, or from the first job if none failed. """ - started_at: datetime = datetime.min - is_pending: bool = False - is_cancelled: bool = False - has_failures: bool = False all_completed_success: bool = False + rules: List[str] = [] + has_failures: bool = False has_non_test_failures: bool = False + is_cancelled: bool = False + is_pending: bool = False job_id: Optional[int] = None + started_at: datetime = datetime.min @property def status(self) -> Optional[SignalStatus]: @@ -183,6 +184,7 @@ def stats(self, key: KeyT) -> JobMeta: has_non_test_failures=( any((r.is_failure and not r.is_test_failure) for r in jrows) ), + rules=[r.rule for r in jrows if r.rule], job_id=job_id, ) self._meta_cache[key] = meta diff --git a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal_extraction.py b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal_extraction.py index 5c32c461d4..f441b91492 100644 --- a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal_extraction.py +++ b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal_extraction.py @@ -7,9 +7,10 @@ Transforms raw workflow/job/test data into Signal objects used by signal.py. """ +from collections import defaultdict from dataclasses import dataclass from datetime import datetime, timedelta, timezone -from typing import Dict, Iterable, List, Optional, Set, Tuple +from typing import DefaultDict, Dict, Iterable, List, Optional, Set, Tuple from .job_agg_index import JobAggIndex, JobMeta, SignalStatus as AggStatus from .signal import Signal, SignalCommit, SignalEvent, SignalSource, SignalStatus @@ -88,7 +89,7 @@ def extract(self) -> List[Signal]: ) test_signals = self._build_test_signals(jobs, test_rows, commits) - job_signals = self._build_non_test_signals(jobs, commits) + job_signals = self._build_job_signals(jobs, commits) # Deduplicate events within commits across all signals as a final step # GitHub-specific behavior like "rerun failed" can reuse job instances for reruns. # When that happens, the jobs have identical timestamps by DIFFERENT job ids. @@ -442,7 +443,7 @@ def _build_test_signals( return signals - def _build_non_test_signals( + def _build_job_signals( self, jobs: List[JobRow], commits: List[Tuple[Sha, datetime]] ) -> List[Signal]: """Build Signals keyed by normalized job base name per workflow. @@ -453,7 +454,6 @@ def _build_non_test_signals( jobs: List of job rows from the datasource commits: Ordered list of (sha, timestamp) tuples (newest → older) """ - commit_timestamps = dict(commits) index = JobAggIndex.from_rows( @@ -479,41 +479,115 @@ def _build_non_test_signals( signals: List[Signal] = [] for wf_name, base_name in wf_base_keys: - commit_objs: List[SignalCommit] = [] - # Track failure types across all attempts/commits for this base - has_failures = False # at least one failure observed + signals += self._build_job_signals_for_wf( + commit_timestamps, wf_name, base_name, index, groups_index + ) - for sha, _ in commits: - attempt_keys: List[ - Tuple[Sha, WorkflowName, JobBaseName, WfRunId, RunAttempt] - ] = groups_index.get((sha, wf_name, base_name), []) - events: List[SignalEvent] = [] + return signals - for akey in attempt_keys: - meta = index.stats(akey) - if meta.is_cancelled: - # canceled attempts are treated as missing - continue - # Map aggregation verdict to outer SignalStatus - if meta.status is None: - continue - if meta.status == AggStatus.FAILURE: - ev_status = SignalStatus.FAILURE - has_failures = True - elif meta.status == AggStatus.PENDING: - ev_status = SignalStatus.PENDING - else: - ev_status = SignalStatus.SUCCESS - - # Extract wf_run_id/run_attempt from the attempt key - _, _, _, wf_run_id, run_attempt = akey - - events.append( + def _build_job_signals_for_wf( + self, + commit_timestamps: Dict[Sha, datetime], + wf_name: WorkflowName, + base_name: JobBaseName, + index: JobAggIndex[Tuple[Sha, WorkflowName, JobBaseName, WfRunId, RunAttempt]], + groups_index: DefaultDict[ + Tuple[Sha, WorkflowName, JobBaseName], List[Tuple[WfRunId, RunAttempt]] + ], + ) -> List[Signal]: + # It is simpler to extract rules per signal and then build the signals, + # as it will change lots of names classes, etc + # so doing in a single iteration would be a messy code + found_rules: Set[str] = set() + for sha, _ in commit_timestamps.items(): + attempt_keys: List[ + Tuple[Sha, WorkflowName, JobBaseName, WfRunId, RunAttempt] + ] = groups_index.get((sha, wf_name, base_name), []) + + for akey in attempt_keys: + meta = index.stats(akey) + if meta.is_cancelled: + # canceled attempts are treated as missing + continue + if meta.status == AggStatus.FAILURE: + found_rules.update(r or "UNDEFINED" for r in meta.rules) + + # we only build job signals when there are failures + if not found_rules: + return [] + + signals: Dict[str, Signal] = {} + for found_rule in found_rules: + rule_base_name = f"{base_name}::{found_rule}" + signals[found_rule] = Signal( + key=rule_base_name, + workflow_name=wf_name, + commits=[], + job_base_name=rule_base_name, + source=SignalSource.JOB, + ) + + for sha, _ in commit_timestamps.items(): + rule_events = self._build_job_rule_events_for_sha( + sha, wf_name, base_name, index, groups_index, found_rules + ) + + for found_rule in found_rules: + signals[found_rule].commits.append( + SignalCommit( + head_sha=sha, + timestamp=commit_timestamps[sha], + events=rule_events.get(found_rule, []), + ) + ) + + return list(signals.values()) + + def _build_job_rule_events_for_sha( + self, + sha: Sha, + wf_name: WorkflowName, + base_name: JobBaseName, + index: JobAggIndex[Tuple[Sha, WorkflowName, JobBaseName, WfRunId, RunAttempt]], + groups_index: DefaultDict[ + Tuple[Sha, WorkflowName, JobBaseName], List[Tuple[WfRunId, RunAttempt]] + ], + found_rules: Set[str], + ) -> DefaultDict[str, List[SignalEvent]]: + attempt_keys: List[ + Tuple[Sha, WorkflowName, JobBaseName, WfRunId, RunAttempt] + ] = groups_index.get((sha, wf_name, base_name), []) + + rule_events: DefaultDict[str, List[SignalEvent]] = defaultdict(list) + + for akey in attempt_keys: + meta = index.stats(akey) + if meta.is_cancelled: + # canceled attempts are treated as missing + continue + # Map aggregation verdict to outer SignalStatus + if meta.status is None: + continue + if meta.status == AggStatus.FAILURE: + ev_status = SignalStatus.FAILURE + elif meta.status == AggStatus.PENDING: + ev_status = SignalStatus.PENDING + else: + ev_status = SignalStatus.SUCCESS + + # Extract wf_run_id/run_attempt from the attempt key + _, _, _, wf_run_id, run_attempt = akey + + if ev_status != SignalStatus.FAILURE: + # if the signal is not a failure it is relevant + # for all failures signals columns + for rule in found_rules: + rule_events[rule].append( SignalEvent( name=self._fmt_event_name( workflow=wf_name, kind="job", - identifier=base_name, + identifier=f"{base_name}::{rule}", wf_run_id=wf_run_id, run_attempt=run_attempt, ), @@ -525,24 +599,29 @@ def _build_non_test_signals( job_id=meta.job_id, ) ) - - # important to always include the commit, even if no events - commit_objs.append( - SignalCommit( - head_sha=sha, timestamp=commit_timestamps[sha], events=events - ) - ) - - # Emit job signal when failures were present - if has_failures: - signals.append( - Signal( - key=base_name, - workflow_name=wf_name, - commits=commit_objs, - job_base_name=str(base_name), - source=SignalSource.JOB, + else: + # signals that contain failures rules then are + # relevant to only those affected rules. + # EX: + # A signal initially failing with some timeout than fails + # with some infra error, means that the timeout signal + # status is not able to be obtained at this stage. + for rule_unfiltered in meta.rules: + rule = rule_unfiltered or "UNDEFINED" + rule_events[rule].append( + SignalEvent( + name=self._fmt_event_name( + workflow=wf_name, + kind="job", + identifier=f"{base_name}::{rule}", + wf_run_id=wf_run_id, + run_attempt=run_attempt, + ), + status=ev_status, + started_at=meta.started_at, + ended_at=None, + wf_run_id=int(wf_run_id), + run_attempt=int(run_attempt), + job_id=meta.job_id, + ) ) - ) - - return signals From bc46d9659074994a9cb059a0f6b8be3106e89289 Mon Sep 17 00:00:00 2001 From: Jean Schmidt Date: Fri, 7 Nov 2025 16:12:47 -0800 Subject: [PATCH 3/6] 20251107161247 --- .../pytorch_auto_revert/job_agg_index.py | 4 +-- .../pytorch_auto_revert/signal.py | 9 +++++ .../pytorch_auto_revert/signal_extraction.py | 23 ++++++++++--- .../signal_extraction_types.py | 1 + .../tests/test_signal_extraction.py | 34 ++++++++++++++----- 5 files changed, 55 insertions(+), 16 deletions(-) diff --git a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/job_agg_index.py b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/job_agg_index.py index 7004277403..fa05614aca 100644 --- a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/job_agg_index.py +++ b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/job_agg_index.py @@ -1,7 +1,7 @@ from __future__ import annotations from collections import defaultdict -from dataclasses import dataclass +from dataclasses import dataclass, field from datetime import datetime from enum import Enum from typing import ( @@ -51,7 +51,7 @@ class JobMeta: """ all_completed_success: bool = False - rules: List[str] = [] + rules: List[str] = field(default_factory=lambda: []) has_failures: bool = False has_non_test_failures: bool = False is_cancelled: bool = False diff --git a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal.py b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal.py index cd67868680..48569de7a9 100644 --- a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal.py +++ b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal.py @@ -4,6 +4,7 @@ from typing import Callable, List, Optional, Set, Tuple, Union from .bisection_planner import GapBisectionPlanner +from .signal_extraction_types import JobBaseName, JobBaseNameRule class SignalStatus(Enum): @@ -290,6 +291,14 @@ class Signal: - job_base_name: optional job base name for job-level signals (recorded when signal is created) """ + @classmethod + def derive_base_name_with_rule( + cls, base_name: JobBaseName, rule: str | None + ) -> JobBaseNameRule: + if not rule: + return JobBaseNameRule(f"{base_name}::UNDEFINED") + return JobBaseNameRule(f"{base_name}::{rule}") + def __init__( self, key: str, diff --git a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal_extraction.py b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal_extraction.py index f441b91492..cd333d8541 100644 --- a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal_extraction.py +++ b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal_extraction.py @@ -510,7 +510,10 @@ def _build_job_signals_for_wf( # canceled attempts are treated as missing continue if meta.status == AggStatus.FAILURE: - found_rules.update(r or "UNDEFINED" for r in meta.rules) + if meta.rules: + found_rules.update(r or "UNDEFINED" for r in meta.rules) + else: + found_rules.add("UNDEFINED") # we only build job signals when there are failures if not found_rules: @@ -518,7 +521,9 @@ def _build_job_signals_for_wf( signals: Dict[str, Signal] = {} for found_rule in found_rules: - rule_base_name = f"{base_name}::{found_rule}" + rule_base_name = Signal.derive_base_name_with_rule( + base_name=str(base_name), rule=found_rule + ) signals[found_rule] = Signal( key=rule_base_name, workflow_name=wf_name, @@ -582,12 +587,15 @@ def _build_job_rule_events_for_sha( # if the signal is not a failure it is relevant # for all failures signals columns for rule in found_rules: + rule_base_name = Signal.derive_base_name_with_rule( + base_name=str(base_name), rule=rule + ) rule_events[rule].append( SignalEvent( name=self._fmt_event_name( workflow=wf_name, kind="job", - identifier=f"{base_name}::{rule}", + identifier=rule_base_name, wf_run_id=wf_run_id, run_attempt=run_attempt, ), @@ -606,14 +614,17 @@ def _build_job_rule_events_for_sha( # A signal initially failing with some timeout than fails # with some infra error, means that the timeout signal # status is not able to be obtained at this stage. - for rule_unfiltered in meta.rules: + for rule_unfiltered in meta.rules or [None]: rule = rule_unfiltered or "UNDEFINED" + rule_base_name = Signal.derive_base_name_with_rule( + base_name=str(base_name), rule=rule + ) rule_events[rule].append( SignalEvent( name=self._fmt_event_name( workflow=wf_name, kind="job", - identifier=f"{base_name}::{rule}", + identifier=rule_base_name, wf_run_id=wf_run_id, run_attempt=run_attempt, ), @@ -625,3 +636,5 @@ def _build_job_rule_events_for_sha( job_id=meta.job_id, ) ) + + return rule_events diff --git a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal_extraction_types.py b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal_extraction_types.py index 5d8340f8f4..9f5de233c9 100644 --- a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal_extraction_types.py +++ b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/signal_extraction_types.py @@ -23,6 +23,7 @@ WorkflowName = NewType("WorkflowName", str) JobName = NewType("JobName", str) JobBaseName = NewType("JobBaseName", str) +JobBaseNameRule = NewType("JobBaseNameRule", str) TestId = NewType("TestId", str) diff --git a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/tests/test_signal_extraction.py b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/tests/test_signal_extraction.py index 9479f76f4c..791d652e42 100644 --- a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/tests/test_signal_extraction.py +++ b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/tests/test_signal_extraction.py @@ -2,7 +2,7 @@ from datetime import datetime, timedelta from typing import Iterable, List -from pytorch_auto_revert.signal import SignalStatus +from pytorch_auto_revert.signal import Signal, SignalStatus from pytorch_auto_revert.signal_extraction import SignalExtractor from pytorch_auto_revert.signal_extraction_datasource import SignalExtractionDatasource from pytorch_auto_revert.signal_extraction_types import ( @@ -148,8 +148,12 @@ def test_commit_order_is_stable(self): J(sha="C1", run=100, job=2, attempt=1, started_at=ts(self.t0, 5)), ] signals = self._extract(jobs, tests=[]) - base = jobs[0].base_name - sig = self._find_job_signal(signals, "trunk", base) + base = jobs[0] + sig = self._find_job_signal( + signals, + "trunk", + Signal.derive_base_name_with_rule(base_name=base.base_name, rule=base.rule), + ) self.assertIsNotNone(sig) self.assertEqual([c.head_sha for c in sig.commits], ["C2", "C1"]) @@ -174,8 +178,12 @@ def test_attempt_boundary_two_events_time_ordered(self): ), ] signals = self._extract(jobs, tests=[]) - base = jobs[0].base_name - sig = self._find_job_signal(signals, "trunk", base) + base = jobs[0] + sig = self._find_job_signal( + signals, + "trunk", + Signal.derive_base_name_with_rule(base_name=base.base_name, rule=""), + ) self.assertIsNotNone(sig) self.assertEqual(len(sig.commits), 1) events = sig.commits[0].events @@ -241,8 +249,12 @@ def test_cancelled_attempt_yields_no_event(self): ), ] signals = self._extract(jobs, tests=[]) - base = jobs[0].base_name - sig = self._find_job_signal(signals, "trunk", base) + base = jobs[0] + sig = self._find_job_signal( + signals, + "trunk", + Signal.derive_base_name_with_rule(base_name=base.base_name, rule=base.rule), + ) self.assertIsNotNone(sig) # find X1 commit in the signal and ensure it has no events x1 = next(c for c in sig.commits if c.head_sha == "X1") @@ -291,8 +303,12 @@ def fetch_commits_in_time_range( se._datasource = FakeDatasourceWithExtraCommit(jobs, []) signals = se.extract() - base = jobs[0].base_name - sig = self._find_job_signal(signals, "trunk", base) + base = jobs[0] + sig = self._find_job_signal( + signals, + "trunk", + Signal.derive_base_name_with_rule(base_name=base.base_name, rule=base.rule), + ) self.assertIsNotNone(sig) # Should have 3 commits: C2 (with events), C3 (no events), C1 (with events) self.assertEqual(len(sig.commits), 3) From c3395a2fc947d62ae29673d5a948e8fa737e2d85 Mon Sep 17 00:00:00 2001 From: Jean Schmidt Date: Fri, 7 Nov 2025 16:20:17 -0800 Subject: [PATCH 4/6] 20251107162017 --- .../tests/test_signal_extraction.py | 71 +++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/tests/test_signal_extraction.py b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/tests/test_signal_extraction.py index 791d652e42..dbde0a9f09 100644 --- a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/tests/test_signal_extraction.py +++ b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/tests/test_signal_extraction.py @@ -260,6 +260,77 @@ def test_cancelled_attempt_yields_no_event(self): x1 = next(c for c in sig.commits if c.head_sha == "X1") self.assertEqual(x1.events, []) + def test_changing_rules_across_attempts(self): + # One commit with 3 attempts: + # - attempt 1: failure with rule "infra" + # - attempt 2: failure with rule "pytest failure" + # - attempt 3: success with rule "" + jobs = [ + J( + sha="R1", + run=600, + job=40, + attempt=1, + started_at=ts(self.t0, 1), + conclusion="failure", + rule="infra", + ), + J( + sha="R1", + run=600, + job=41, + attempt=2, + started_at=ts(self.t0, 2), + conclusion="failure", + rule="pytest failure", + ), + J( + sha="R1", + run=600, + job=42, + attempt=3, + started_at=ts(self.t0, 3), + conclusion="success", + rule="", + ), + ] + signals = self._extract(jobs, tests=[]) + # for 'infra' rule + base = jobs[0] + sig = self._find_job_signal( + signals, + "trunk", + Signal.derive_base_name_with_rule(base_name=base.base_name, rule=base.rule), + ) + self.assertIsNotNone(sig) + self.assertEqual(len(sig.commits), 1) + events = sig.commits[0].events + self.assertEqual(len(events), 2) + # the first infra failure + self.assertEqual(events[0].status, SignalStatus.FAILURE) + self.assertIn(base.rule, events[0].name) + # the final success + self.assertEqual(events[1].status, SignalStatus.SUCCESS) + self.assertIn(base.rule, events[1].name) + + # for 'pytest failure' rule + base2 = jobs[1] + sig2 = self._find_job_signal( + signals, + "trunk", + Signal.derive_base_name_with_rule(base_name=base2.base_name, rule=base2.rule), + ) + self.assertIsNotNone(sig2) + self.assertEqual(len(sig2.commits), 1) + events2 = sig2.commits[0].events + self.assertEqual(len(events2), 2) + # the pytest failure + self.assertEqual(events2[0].status, SignalStatus.FAILURE) + self.assertIn(base2.rule, events2[0].name) + # the final success + self.assertEqual(events2[1].status, SignalStatus.SUCCESS) + self.assertIn(base2.rule, events2[1].name) + def test_commits_without_jobs_are_included(self): # Verify that commits with no jobs at all are still included in signals # Simulate case where C2 has a failure, C3 has no jobs (e.g., periodic workflow), From dbb04c4f8eda65def26b7ab422a911cc9b5aab41 Mon Sep 17 00:00:00 2001 From: Jean Schmidt Date: Fri, 7 Nov 2025 16:21:37 -0800 Subject: [PATCH 5/6] 20251107162137 --- .../pytorch_auto_revert/tests/test_signal_extraction.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/tests/test_signal_extraction.py b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/tests/test_signal_extraction.py index dbde0a9f09..eec545a248 100644 --- a/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/tests/test_signal_extraction.py +++ b/aws/lambda/pytorch-auto-revert/pytorch_auto_revert/tests/test_signal_extraction.py @@ -318,7 +318,9 @@ def test_changing_rules_across_attempts(self): sig2 = self._find_job_signal( signals, "trunk", - Signal.derive_base_name_with_rule(base_name=base2.base_name, rule=base2.rule), + Signal.derive_base_name_with_rule( + base_name=base2.base_name, rule=base2.rule + ), ) self.assertIsNotNone(sig2) self.assertEqual(len(sig2.commits), 1) From d1fb6b65cc06fd30584d67ba352d6a26625547d2 Mon Sep 17 00:00:00 2001 From: Jean Schmidt Date: Fri, 7 Nov 2025 16:52:21 -0800 Subject: [PATCH 6/6] 20251107165221 --- .../pytorch-auto-revert/SIGNAL_EXTRACTION.md | 24 ++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/aws/lambda/pytorch-auto-revert/SIGNAL_EXTRACTION.md b/aws/lambda/pytorch-auto-revert/SIGNAL_EXTRACTION.md index 2447b37bdc..091b742907 100644 --- a/aws/lambda/pytorch-auto-revert/SIGNAL_EXTRACTION.md +++ b/aws/lambda/pytorch-auto-revert/SIGNAL_EXTRACTION.md @@ -82,6 +82,28 @@ Notes - Each commit holds a list of `SignalEvent`s (time‑ordered by `started_at`). Ordering: dicts in Python 3.7+ preserve insertion order. Phase A inserts commit keys in push‑timestamp DESC order, so iterating the mapping yields newest→older commits without extra sorting. +### Test‑track semantics +- Source of truth for SUCCESS/FAILURE is `tests.all_test_runs` per test id. +- When a test row exists for an attempt: + - Emit at most one FAILURE if any failed runs exist; at most one SUCCESS if any successful runs exist. +- When no test rows exist for an attempt and any grouped job for that attempt is pending → emit PENDING. +- Otherwise (no test rows and not pending) → no event for that attempt. + +### Job‑track semantics (non‑test) +- Build per normalized job base across commits; aggregate shards by `(wf_run_id, run_attempt)`. +- Event mapping per attempt uses aggregated job meta with test‑failure filtering: + - FAILURE only when the attempt had non‑test failures (e.g. infra‑related). + - PENDING when the attempt is still running. + - SUCCESS otherwise, including when failures are exclusively test‑caused (these are handled by test‑track). +- Cancelled attempts are treated as missing (no event). +- Emit a job‑track Signal only when at least one attempt/commit shows a non‑test (infra) failure within the window. + +Event naming (for debuggability): +- Consistent key=value format: `wf= kind= id= run= attempt=` +- Examples: + - Test event: `wf=trunk kind=test id=inductor/test_foo.py::test_bar run=1744 attempt=1` + - Job event: `wf=trunk kind=job id=linux-jammy-cuda12.8-py3.10-gcc11 / test run=1744 attempt=2` + ### Test‑track mapping - Build a per‑commit map `test_id -> list[SignalEvent]` by combining all relevant jobs and shards: - For each (wf_run_id, run_attempt, job_base_name) group in the commit, consult `tests.all_test_runs` rows (if any) for each candidate `test_id`: @@ -93,7 +115,7 @@ Notes - Within the same run, separate events for retries via `run_attempt` (name hints like "Attempt #2" are not relied upon). ### Non‑test mapping -- Similar to test‑track but grouping is coarser (by normalized job base name): +- Similar to test‑track but grouping is coarser (by normalized job base name plus classification rule): - For each (run_id, run_attempt, job_base_name) group in the commit - Within each group compute event status: - FAILURE if any row concluded failure.