Skip to content

Commit 216c1ed

Browse files
committed
Add a function to update collector heartbeat
1 parent aa31f21 commit 216c1ed

File tree

4 files changed

+25
-0
lines changed

4 files changed

+25
-0
lines changed

collector/src/bin/collector.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1395,6 +1395,8 @@ async fn run_job_queue_benchmarks(
13951395
all_compile_benchmarks: Vec<Benchmark>,
13961396
host_target_tuple: &str,
13971397
) -> anyhow::Result<()> {
1398+
conn.update_collector_heartbeat(collector.name()).await?;
1399+
13981400
// TODO: reconnect to the DB if there was an error with the previous job
13991401
while let Some((benchmark_job, artifact_id)) = conn
14001402
.dequeue_benchmark_job(
@@ -1478,6 +1480,8 @@ async fn run_job_queue_benchmarks(
14781480
tokio::time::sleep(Duration::from_secs(30)).await;
14791481
}
14801482
}
1483+
1484+
conn.update_collector_heartbeat(collector.name()).await?;
14811485
}
14821486
log::info!("No job found, exiting");
14831487
Ok(())

database/src/pool.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,9 @@ pub trait Connection: Send + Sync {
276276

277277
/// Get all of the configuration for all of the collectors
278278
async fn get_collector_configs(&self) -> anyhow::Result<Vec<CollectorConfig>>;
279+
280+
/// Updates the last known heartbeat of a collector to the current time.
281+
async fn update_collector_heartbeat(&self, collector_name: &str) -> anyhow::Result<()>;
279282
}
280283

281284
#[async_trait::async_trait]

database/src/pool/postgres.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2212,6 +2212,20 @@ where
22122212

22132213
Ok(configs)
22142214
}
2215+
2216+
async fn update_collector_heartbeat(&self, collector_name: &str) -> anyhow::Result<()> {
2217+
self.conn()
2218+
.query(
2219+
r#"
2220+
UPDATE collector_config
2221+
SET last_heartbeat_at = NOW()
2222+
WHERE name = $1
2223+
"#,
2224+
&[&collector_name],
2225+
)
2226+
.await?;
2227+
Ok(())
2228+
}
22152229
}
22162230

22172231
fn row_to_benchmark_request(row: &Row) -> BenchmarkRequest {

database/src/pool/sqlite.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1377,6 +1377,10 @@ impl Connection for SqliteConnection {
13771377
async fn get_collector_configs(&self) -> anyhow::Result<Vec<CollectorConfig>> {
13781378
no_queue_implementation_abort!()
13791379
}
1380+
1381+
async fn update_collector_heartbeat(&self, _collector_name: &str) -> anyhow::Result<()> {
1382+
no_queue_implementation_abort!()
1383+
}
13801384
}
13811385

13821386
fn parse_artifact_id(ty: &str, sha: &str, date: Option<i64>) -> ArtifactId {

0 commit comments

Comments
 (0)