Skip to content

Commit d30a7f4

Browse files
authored
Merge pull request #2293 from Kobzol/job-queue-loop
Update job queue loop to avoid early exit
2 parents 202bcaa + ae60958 commit d30a7f4

File tree

2 files changed

+133
-76
lines changed

2 files changed

+133
-76
lines changed

collector/collect-job-queue.sh

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
#!/bin/bash

# Runs the rustc-perf job queue collector in an endless loop, rebuilding and
# restarting it after every exit so it picks up repository updates.
# This script expects DATABASE and COLLECTOR_NAME to be defined in the environment.

# -u: treat unset variables (e.g. a missing DATABASE) as an error.
# -o pipefail: a pipeline fails if any stage fails.
# Deliberately NOT -e: a failing collector run must not kill the loop;
# its exit status is inspected below instead.
set -u -o pipefail

echo "Running job queue collector"

export RUST_LOG=collector=trace,collector::sysroot=debug
export PATH="/home/collector/.cargo/bin:$PATH"

while : ; do
    # Update and rebuild the collector. The hard reset discards any local
    # changes so the checkout exactly matches the upstream branch.
    git pull
    git reset --hard @{upstream}

    # Make sure we have a recent toolchain, so that we can successfully build
    # the collector.
    rustup update stable
    cargo +stable build --release -p collector

    CURRENT_SHA=$(git rev-parse HEAD)

    # BUGFIX: a line-continuation backslash was missing after --git_sha,
    # which made --collector_name execute as a separate command and left
    # the collector without its name.
    target/release/collector benchmark_job_queue \
        --db "${DATABASE}" \
        --check_git_sha \
        --git_sha "${CURRENT_SHA}" \
        --collector_name "${COLLECTOR_NAME}"

    STATUS=$?
    echo "finished run at $(date) with exit code ${STATUS}"

    # Wait a bit if the command has failed, to avoid a tight crash loop.
    if [ "${STATUS}" -ne 0 ]; then
        sleep 60
    fi
done

collector/src/bin/collector.rs

Lines changed: 96 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -1503,6 +1503,9 @@ fn get_host_tuple_from_rustc(rustc: &str) -> anyhow::Result<String> {
15031503
/// Maximum number of failures before a job will be marked as failed.
15041504
const MAX_JOB_FAILS: u32 = 3;
15051505

1506+
/// How long should the collector sleep for if it does not find any job in the job queue.
1507+
const JOB_WAIT_SLEEP_TIME: Duration = Duration::from_secs(30);
1508+
15061509
async fn run_job_queue_benchmarks(
15071510
pool: Pool,
15081511
mut conn: Box<dyn Connection>,
@@ -1514,96 +1517,113 @@ async fn run_job_queue_benchmarks(
15141517

15151518
let mut last_request_tag = None;
15161519

1517-
while let Some((benchmark_job, artifact_id)) = conn
1518-
.dequeue_benchmark_job(
1519-
collector.name(),
1520-
collector.target(),
1521-
collector.benchmark_set(),
1522-
)
1523-
.await?
1524-
{
1525-
// Are we benchmarking a different benchmark request than in the previous iteration of the
1526-
// loop?
1527-
let is_new_request = last_request_tag.is_some()
1528-
&& last_request_tag.as_deref() != Some(benchmark_job.request_tag());
1529-
if is_new_request {
1530-
let _ = tidy_toolchain_cache_dir();
1531-
}
1532-
1533-
// Here we check if we should update our commit SHA, if rustc-perf has been updated.
1534-
// We only check for updates when we switch *benchmark requests*, not *benchmark jobs*,
1535-
// to avoid changing code in the middle of benchmarking the same request.
1536-
// Note that if an update happens, the job that we have just dequeued will have its deque
1537-
// counter increased. But since updates are relatively rare, that shouldn't be a big deal,
1538-
// it will be dequeued again when the collector starts again.
1539-
if check_git_sha && is_new_request && needs_git_update(collector) {
1520+
// Outer loop - wait for some jobs to appear
1521+
loop {
1522+
if check_git_sha && needs_git_update(collector) {
15401523
log::warn!("Exiting collector to update itself from git.");
1541-
return Ok(());
1524+
break;
15421525
}
15431526

1544-
last_request_tag = Some(benchmark_job.request_tag().to_string());
1527+
while let Some((benchmark_job, artifact_id)) = conn
1528+
.dequeue_benchmark_job(
1529+
collector.name(),
1530+
collector.target(),
1531+
collector.benchmark_set(),
1532+
)
1533+
.await?
1534+
{
1535+
// Are we benchmarking a different benchmark request than in the previous iteration of the
1536+
// loop?
1537+
let is_new_request = last_request_tag.is_none()
1538+
|| (last_request_tag.as_deref() != Some(benchmark_job.request_tag()));
1539+
if is_new_request {
1540+
log::info!("Starting new request {}", benchmark_job.request_tag());
1541+
let _ = tidy_toolchain_cache_dir();
1542+
}
15451543

1546-
log::info!("Dequeued job {benchmark_job:?}, artifact_id {artifact_id:?}");
1547-
let result = run_benchmark_job(
1548-
conn.as_mut(),
1549-
&benchmark_job,
1550-
artifact_id.clone(),
1551-
&all_compile_benchmarks,
1552-
)
1553-
.await;
1554-
match result {
1555-
Ok(_) => {
1556-
log::info!("Job finished sucessfully");
1557-
conn.mark_benchmark_job_as_completed(
1558-
benchmark_job.id(),
1559-
BenchmarkJobConclusion::Success,
1560-
)
1561-
.await?;
1544+
// Here we check if we should update our commit SHA, if rustc-perf has been updated.
1545+
// We only check for updates when we switch *benchmark requests*, not *benchmark jobs*,
1546+
// to avoid changing code in the middle of benchmarking the same request.
1547+
// Note that if an update happens, the job that we have just dequeued will have its deque
1548+
// counter increased. But since updates are relatively rare, that shouldn't be a big deal,
1549+
// it will be dequeued again when the collector starts again.
1550+
if check_git_sha && is_new_request && needs_git_update(collector) {
1551+
log::warn!("Exiting collector to update itself from git.");
1552+
return Ok(());
15621553
}
1563-
Err(error) => {
1564-
match error {
1565-
BenchmarkJobError::Permanent(error) => {
1566-
log::error!("Job finished with permanent error: {error:?}");
1567-
1568-
// Store the error to the database
1569-
let artifact_row_id = conn.artifact_id(&artifact_id).await;
1570-
// Use a <job> placeholder to say that the error is associated with a job,
1571-
// not with a benchmark.
1572-
conn.record_error(
1573-
artifact_row_id,
1574-
"Job failure",
1575-
&format!("Error while benchmarking job {benchmark_job:?}: {error:?}"),
1576-
Some(benchmark_job.id()),
1577-
)
1578-
.await;
1579-
1580-
// Something bad that probably cannot be retried has happened.
1581-
// Immediately mark the job as failed and continue with other jobs
1582-
log::info!("Marking the job as failed");
1583-
conn.mark_benchmark_job_as_completed(
1584-
benchmark_job.id(),
1585-
BenchmarkJobConclusion::Failure,
1586-
)
1587-
.await?;
1588-
}
1589-
BenchmarkJobError::Transient(error) => {
1590-
log::error!("Job finished with transient error: {error:?}");
15911554

1592-
// There was some transient (i.e. I/O, network or database) error.
1593-
// Let's retry the job later, with some sleep
1594-
log::info!("Retrying after 30s...");
1595-
tokio::time::sleep(Duration::from_secs(30)).await;
1555+
last_request_tag = Some(benchmark_job.request_tag().to_string());
15961556

1597-
// Maybe there was a DB issue. Try to reconnect to the database.
1598-
conn = pool.connection().await;
1557+
log::info!("Dequeued job {benchmark_job:?}, artifact_id {artifact_id:?}");
1558+
let result = run_benchmark_job(
1559+
conn.as_mut(),
1560+
&benchmark_job,
1561+
artifact_id.clone(),
1562+
&all_compile_benchmarks,
1563+
)
1564+
.await;
1565+
match result {
1566+
Ok(_) => {
1567+
log::info!("Job finished sucessfully");
1568+
conn.mark_benchmark_job_as_completed(
1569+
benchmark_job.id(),
1570+
BenchmarkJobConclusion::Success,
1571+
)
1572+
.await?;
1573+
}
1574+
Err(error) => {
1575+
match error {
1576+
BenchmarkJobError::Permanent(error) => {
1577+
log::error!("Job finished with permanent error: {error:?}");
1578+
1579+
// Store the error to the database
1580+
let artifact_row_id = conn.artifact_id(&artifact_id).await;
1581+
// Use a <job> placeholder to say that the error is associated with a job,
1582+
// not with a benchmark.
1583+
conn.record_error(
1584+
artifact_row_id,
1585+
"Job failure",
1586+
&format!(
1587+
"Error while benchmarking job {benchmark_job:?}: {error:?}"
1588+
),
1589+
Some(benchmark_job.id()),
1590+
)
1591+
.await;
1592+
1593+
// Something bad that probably cannot be retried has happened.
1594+
// Immediately mark the job as failed and continue with other jobs
1595+
log::info!("Marking the job as failed");
1596+
conn.mark_benchmark_job_as_completed(
1597+
benchmark_job.id(),
1598+
BenchmarkJobConclusion::Failure,
1599+
)
1600+
.await?;
1601+
}
1602+
BenchmarkJobError::Transient(error) => {
1603+
log::error!("Job finished with transient error: {error:?}");
1604+
1605+
// There was some transient (i.e. I/O, network or database) error.
1606+
// Let's retry the job later, with some sleep
1607+
log::info!("Retrying after 30s...");
1608+
tokio::time::sleep(Duration::from_secs(30)).await;
1609+
1610+
// Maybe there was a DB issue. Try to reconnect to the database.
1611+
conn = pool.connection().await;
1612+
}
15991613
}
16001614
}
16011615
}
1616+
1617+
conn.update_collector_heartbeat(collector.name()).await?;
16021618
}
16031619

1620+
log::info!(
1621+
"No job found, sleeping for {}s",
1622+
JOB_WAIT_SLEEP_TIME.as_secs()
1623+
);
1624+
tokio::time::sleep(JOB_WAIT_SLEEP_TIME).await;
16041625
conn.update_collector_heartbeat(collector.name()).await?;
16051626
}
1606-
log::info!("No job found, exiting");
16071627
Ok(())
16081628
}
16091629

0 commit comments

Comments
 (0)