Skip to content

Commit f22b8fc

Browse files
committed
Distinguish transient and permanent errors that happen during a job benchmark
1 parent 216c1ed commit f22b8fc

File tree

2 files changed

+204
-92
lines changed

2 files changed

+204
-92
lines changed

collector/src/bin/collector.rs

Lines changed: 148 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,8 @@ use collector::runtime::{
5454
};
5555
use collector::runtime::{profile_runtime, RuntimeCompilationOpts};
5656
use collector::toolchain::{
57-
create_toolchain_from_published_version, get_local_toolchain, Sysroot, Toolchain,
58-
ToolchainConfig,
57+
create_toolchain_from_published_version, get_local_toolchain, Sysroot, SysrootDownloadError,
58+
Toolchain, ToolchainConfig,
5959
};
6060
use collector::utils::cachegrind::cachegrind_diff;
6161
use collector::utils::{is_installed, wait_for_future};
@@ -1078,6 +1078,7 @@ fn main_result() -> anyhow::Result<i32> {
10781078
&host_target_tuple,
10791079
&backends,
10801080
))
1081+
.map_err(SysrootDownloadError::as_anyhow_error)
10811082
.with_context(|| format!("failed to install sysroot for {commit:?}"))?;
10821083

10831084
let mut benchmarks = get_compile_benchmarks(
@@ -1269,12 +1270,14 @@ fn main_result() -> anyhow::Result<i32> {
12691270
let commit = get_commit_or_fake_it(last_sha).expect("success");
12701271

12711272
let rt = build_async_runtime();
1272-
let mut sysroot = rt.block_on(Sysroot::install(
1273-
Path::new(TOOLCHAIN_CACHE_DIRECTORY),
1274-
commit.sha,
1275-
&host_target_tuple,
1276-
&codegen_backends.0,
1277-
))?;
1273+
let mut sysroot = rt
1274+
.block_on(Sysroot::install(
1275+
Path::new(TOOLCHAIN_CACHE_DIRECTORY),
1276+
commit.sha,
1277+
&host_target_tuple,
1278+
&codegen_backends.0,
1279+
))
1280+
.map_err(SysrootDownloadError::as_anyhow_error)?;
12781281
sysroot.preserve(); // don't delete it
12791282

12801283
// Print the directory containing the toolchain.
@@ -1358,6 +1361,7 @@ Make sure to modify `{dir}/perf-config.json` if the category/artifact don't matc
13581361

13591362
// Obtain the configuration and validate that it matches the
13601363
// collector's host target
1364+
// TODO: update heartbeat
13611365
let collector_config = rt
13621366
.block_on(conn.get_collector_config(&collector_name))?
13631367
.ok_or_else(|| {
@@ -1395,8 +1399,10 @@ async fn run_job_queue_benchmarks(
13951399
all_compile_benchmarks: Vec<Benchmark>,
13961400
host_target_tuple: &str,
13971401
) -> anyhow::Result<()> {
1402+
let conn = conn.as_mut();
13981403
conn.update_collector_heartbeat(collector.name()).await?;
13991404

1405+
// TODO: check collector SHA vs site SHA
14001406
// TODO: reconnect to the DB if there was an error with the previous job
14011407
while let Some((benchmark_job, artifact_id)) = conn
14021408
.dequeue_benchmark_job(
@@ -1407,77 +1413,57 @@ async fn run_job_queue_benchmarks(
14071413
.await?
14081414
{
14091415
log::info!("Dequeued job {benchmark_job:?}, artifact_id {artifact_id:?}");
1410-
1411-
// Fail the job if it has been dequeued too many times
1412-
if benchmark_job.deque_count() > MAX_JOB_FAILS {
1413-
// TODO: store some error into the DB
1414-
conn.as_mut()
1415-
.mark_benchmark_job_as_completed(
1416-
benchmark_job.id(),
1417-
BenchmarkJobConclusion::Failure,
1418-
)
1419-
.await?;
1420-
continue;
1421-
}
1422-
1423-
log::info!("Downloading sysroot");
1424-
// TODO: if there is an error with downloading the toolchain, we need to mark the job as
1425-
// failed
1426-
let toolchain = match &artifact_id {
1427-
ArtifactId::Commit(commit) => {
1428-
let mut sysroot = Sysroot::install(
1429-
Path::new(TOOLCHAIN_CACHE_DIRECTORY),
1430-
commit.sha.clone(),
1431-
benchmark_job.target().as_str(),
1432-
&[benchmark_job.backend().into()],
1433-
)
1434-
.await
1435-
.with_context(|| format!("failed to install sysroot for {commit:?}"))?;
1436-
// Avoid redownloading the same sysroot multiple times for different jobs, even
1437-
// across collector restarts.
1438-
1439-
// TODO: Periodically clear the cache directory to avoid running out of disk space.
1440-
sysroot.preserve();
1441-
Toolchain::from_sysroot(&sysroot, commit.sha.clone())
1442-
}
1443-
ArtifactId::Tag(tag) => {
1444-
create_toolchain_from_published_version(&tag, &host_target_tuple)?
1445-
}
1446-
};
1447-
log::info!("Sysroot download finished");
1448-
1449-
let (compile_config, runtime_config) = create_benchmark_configs(
1450-
conn.as_mut(),
1451-
&toolchain,
1452-
&artifact_id,
1416+
let result = run_benchmark_job(
1417+
conn,
14531418
&benchmark_job,
1419+
artifact_id.clone(),
14541420
&all_compile_benchmarks,
1421+
host_target_tuple,
14551422
)
1456-
.await?;
1457-
1458-
let shared = SharedBenchmarkConfig {
1459-
artifact_id,
1460-
toolchain,
1461-
record_duration: false,
1462-
};
1463-
1464-
// TODO: distinguish transient and permanent errors
1465-
let job_result =
1466-
run_benchmarks(conn.as_mut(), shared, compile_config, runtime_config).await;
1467-
match job_result {
1423+
.await;
1424+
match result {
14681425
Ok(_) => {
14691426
log::info!("Job finished successfully");
1470-
conn.as_mut()
1471-
.mark_benchmark_job_as_completed(
1472-
benchmark_job.id(),
1473-
BenchmarkJobConclusion::Success,
1474-
)
1475-
.await?;
1427+
conn.mark_benchmark_job_as_completed(
1428+
benchmark_job.id(),
1429+
BenchmarkJobConclusion::Success,
1430+
)
1431+
.await?;
14761432
}
14771433
Err(error) => {
1478-
// TODO: record the error *somewhere*
1479-
log::error!("Job finished with error: {error:?}. Retrying after 30s...");
1480-
tokio::time::sleep(Duration::from_secs(30)).await;
1434+
match error {
1435+
BenchmarkJobError::Permanent(error) => {
1436+
log::error!("Job finished with permanent error: {error:?}");
1437+
1438+
// Store the error to the database
1439+
let artifact_row_id = conn.artifact_id(&artifact_id).await;
1440+
// Use a `job:<id>` placeholder to say that the error is associated with a job,
1441+
// not with a benchmark.
1442+
conn.record_error(
1443+
artifact_row_id,
1444+
&format!("job:{}", benchmark_job.id()),
1445+
&format!("Error while benchmarking job {benchmark_job:?}: {error:?}"),
1446+
)
1447+
.await;
1448+
1449+
// Something bad that probably cannot be retried has happened.
1450+
// Immediately mark the job as failed and continue with other jobs
1451+
log::info!("Marking the job as failed");
1452+
conn.mark_benchmark_job_as_completed(
1453+
benchmark_job.id(),
1454+
BenchmarkJobConclusion::Failure,
1455+
)
1456+
.await?;
1457+
}
1458+
BenchmarkJobError::Transient(error) => {
1459+
log::error!("Job finished with transient error: {error:?}");
1460+
1461+
// There was some transient (i.e. I/O, network or database) error.
1462+
// Let's retry the job later, with some sleep
1463+
log::info!("Retrying after 30s...");
1464+
tokio::time::sleep(Duration::from_secs(30)).await;
1465+
}
1466+
}
14811467
}
14821468
}
14831469

@@ -1487,6 +1473,96 @@ async fn run_job_queue_benchmarks(
14871473
Ok(())
14881474
}
14891475

1476+
/// Error that happened during benchmarking of a job.
1477+
enum BenchmarkJobError {
1478+
/// The error is non-recoverable.
1479+
/// For example, a rustc toolchain does not exist on CI
1480+
Permanent(anyhow::Error),
1481+
Transient(anyhow::Error),
1482+
}
1483+
1484+
impl From<anyhow::Error> for BenchmarkJobError {
1485+
fn from(error: Error) -> Self {
1486+
Self::Transient(error)
1487+
}
1488+
}
1489+
1490+
async fn run_benchmark_job(
1491+
conn: &mut dyn Connection,
1492+
job: &BenchmarkJob,
1493+
artifact_id: ArtifactId,
1494+
all_compile_benchmarks: &[Benchmark],
1495+
host_target_tuple: &str,
1496+
) -> Result<(), BenchmarkJobError> {
1497+
// Fail the job if it has been dequeued too many times
1498+
if job.deque_count() > MAX_JOB_FAILS {
1499+
return Err(BenchmarkJobError::Permanent(anyhow::anyhow!(
1500+
"Job failed after being dequeued for {MAX_JOB_FAILS} times"
1501+
)));
1502+
}
1503+
1504+
log::info!("Downloading sysroot");
1505+
let toolchain = match &artifact_id {
1506+
ArtifactId::Commit(commit) => {
1507+
let mut sysroot = match Sysroot::install(
1508+
Path::new(TOOLCHAIN_CACHE_DIRECTORY),
1509+
commit.sha.clone(),
1510+
job.target().as_str(),
1511+
&[job.backend().into()],
1512+
)
1513+
.await
1514+
{
1515+
Ok(sysroot) => sysroot,
1516+
Err(SysrootDownloadError::SysrootShaNotFound) => {
1517+
return Err(BenchmarkJobError::Permanent(anyhow::anyhow!(
1518+
"Artifacts for SHA {} and target {} were not found on CI servers",
1519+
commit.sha,
1520+
job.target().as_str()
1521+
)))
1522+
}
1523+
Err(SysrootDownloadError::IO(error)) => return Err(error.into()),
1524+
};
1525+
// Avoid redownloading the same sysroot multiple times for different jobs, even
1526+
// across collector restarts.
1527+
1528+
// TODO: Periodically clear the cache directory to avoid running out of disk space.
1529+
sysroot.preserve();
1530+
Toolchain::from_sysroot(&sysroot, commit.sha.clone())
1531+
}
1532+
ArtifactId::Tag(tag) => create_toolchain_from_published_version(&tag, &host_target_tuple)?,
1533+
};
1534+
log::info!("Sysroot download finished");
1535+
1536+
let (compile_config, runtime_config) = create_benchmark_configs(
1537+
conn,
1538+
&toolchain,
1539+
&artifact_id,
1540+
&job,
1541+
&all_compile_benchmarks,
1542+
)
1543+
.await
1544+
.map_err(|error| {
1545+
BenchmarkJobError::Permanent(anyhow::anyhow!(
1546+
"Cannot prepare benchmark configs: {error:?}"
1547+
))
1548+
})?;
1549+
1550+
let shared = SharedBenchmarkConfig {
1551+
artifact_id,
1552+
toolchain,
1553+
record_duration: false,
1554+
};
1555+
1556+
// A failure here means that it was not possible to compile something, that likely won't resolve
1557+
// itself automatically.
1558+
run_benchmarks(conn, shared, compile_config, runtime_config)
1559+
.await
1560+
.map_err(|error| {
1561+
BenchmarkJobError::Permanent(anyhow::anyhow!("Cannot run benchmarks: {error:?}"))
1562+
})?;
1563+
Ok(())
1564+
}
1565+
14901566
async fn create_benchmark_configs(
14911567
conn: &mut dyn Connection,
14921568
toolchain: &Toolchain,

0 commit comments

Comments
 (0)