@@ -13,19 +13,26 @@ use parking_lot::RwLock;
1313use std:: { str:: FromStr , sync:: Arc } ;
1414use tokio:: time:: { self , Duration , MissedTickBehavior } ;
1515
16- pub fn run_new_queue ( ) -> bool {
17- std:: env:: var ( "RUN_CRON " )
16+ pub fn is_job_queue_enabled ( ) -> bool {
17+ std:: env:: var ( "USE_JOB_QUEUE " )
1818 . ok ( )
1919 . and_then ( |x| x. parse ( ) . ok ( ) )
20- . unwrap_or ( false )
20+ . unwrap_or ( true )
21+ }
22+
23+ /// rust-lang/rust PR that will be used for testing the job queue.
24+ const TEST_PR_FOR_JOB_QUEUE : u32 = 147039 ;
25+
26+ pub fn should_use_job_queue ( pr : u32 ) -> bool {
27+ is_job_queue_enabled ( ) && pr == TEST_PR_FOR_JOB_QUEUE
2128}
2229
2330/// Store the latest master commits or do nothing if all of them are
2431/// already in the database.
2532/// Returns `true` if at least one benchmark request was inserted.
2633async fn create_benchmark_request_master_commits (
2734 ctxt : & SiteCtxt ,
28- conn : & dyn database:: pool:: Connection ,
35+ _conn : & dyn database:: pool:: Connection ,
2936 index : & BenchmarkRequestIndex ,
3037) -> anyhow:: Result < bool > {
3138 let now = Utc :: now ( ) ;
@@ -40,23 +47,26 @@ async fn create_benchmark_request_master_commits(
4047 // TODO; delete at some point in the future
4148 let cutoff: chrono:: DateTime < Utc > = chrono:: DateTime :: from_str ( "2025-08-27T00:00:00.000Z" ) ?;
4249
43- let mut inserted = false ;
50+ let inserted = false ;
4451 for master_commit in master_commits {
4552 // We don't want to add masses of obsolete data
4653 if master_commit. time >= cutoff && !index. contains_tag ( & master_commit. sha ) {
47- let pr = master_commit. pr . unwrap_or ( 0 ) ;
48- let benchmark = BenchmarkRequest :: create_master (
49- & master_commit. sha ,
50- & master_commit. parent_sha ,
51- pr,
52- master_commit. time ,
53- ) ;
54- log:: info!( "Inserting master benchmark request {benchmark:?}" ) ;
55- if let Err ( error) = conn. insert_benchmark_request ( & benchmark) . await {
56- log:: error!( "Failed to insert master benchmark request: {error:?}" ) ;
57- } else {
58- inserted = true ;
59- }
54+ // let pr = master_commit.pr.unwrap_or(0);
55+ // let benchmark = BenchmarkRequest::create_master(
56+ // &master_commit.sha,
57+ // &master_commit.parent_sha,
58+ // pr,
59+ // master_commit.time,
60+ // );
61+ // log::info!("Inserting master benchmark request {benchmark:?}");
62+
63+ // Do not create benchmark requests on production, to allow running in parallel with
64+ // the old system.
65+ // if let Err(error) = conn.insert_benchmark_request(&benchmark).await {
66+ // log::error!("Failed to insert master benchmark request: {error:?}");
67+ // } else {
68+ // inserted = true;
69+ // }
6070 }
6171 }
6272 Ok ( inserted)
@@ -66,7 +76,7 @@ async fn create_benchmark_request_master_commits(
6676/// already in the database
6777/// Returns `true` if at least one benchmark request was inserted.
6878async fn create_benchmark_request_releases (
69- conn : & dyn database:: pool:: Connection ,
79+ _conn : & dyn database:: pool:: Connection ,
7080 index : & BenchmarkRequestIndex ,
7181) -> anyhow:: Result < bool > {
7282 let releases: String = reqwest:: get ( "https://static.rust-lang.org/manifests.txt" )
@@ -82,16 +92,19 @@ async fn create_benchmark_request_releases(
8292 . filter_map ( parse_release_string)
8393 . take ( 20 ) ;
8494
85- let mut inserted = false ;
95+ let inserted = false ;
8696 for ( name, commit_date) in releases {
8797 if commit_date >= cutoff && !index. contains_tag ( & name) {
88- let release_request = BenchmarkRequest :: create_release ( & name, commit_date) ;
89- log:: info!( "Inserting release benchmark request {release_request:?}" ) ;
90- if let Err ( error) = conn. insert_benchmark_request ( & release_request) . await {
91- log:: error!( "Failed to insert release benchmark request: {error}" ) ;
92- } else {
93- inserted = true ;
94- }
98+ // let release_request = BenchmarkRequest::create_release(&name, commit_date);
99+ // log::info!("Inserting release benchmark request {release_request:?}");
100+
101+ // Do not create benchmark requests on production, to allow running in parallel with
102+ // the old system.
103+ // if let Err(error) = conn.insert_benchmark_request(&release_request).await {
104+ // log::error!("Failed to insert release benchmark request: {error}");
105+ // } else {
106+ // inserted = true;
107+ // }
95108 }
96109 }
97110 Ok ( inserted)
@@ -204,20 +217,20 @@ pub async fn build_queue(
204217/// This is performed atomically, in a transaction.
205218pub async fn enqueue_benchmark_request (
206219 conn : & mut dyn database:: pool:: Connection ,
207- benchmark_request : & BenchmarkRequest ,
220+ request : & BenchmarkRequest ,
208221) -> anyhow:: Result < ( ) > {
209222 let mut tx = conn. transaction ( ) . await ;
210223
211- let Some ( request_tag) = benchmark_request . tag ( ) else {
212- panic ! ( "Benchmark request {benchmark_request :?} has no tag" ) ;
224+ let Some ( request_tag) = request . tag ( ) else {
225+ panic ! ( "Benchmark request {request :?} has no tag" ) ;
213226 } ;
214227
215- log:: info!( "Enqueuing jobs for request {benchmark_request :?}" ) ;
228+ log:: info!( "Enqueuing jobs for request {request :?}" ) ;
216229
217- let backends = benchmark_request . backends ( ) ?;
218- let profiles = benchmark_request . profiles ( ) ?;
230+ let backends = request . backends ( ) ?;
231+ let profiles = request . profiles ( ) ?;
219232 // Prevent the error from spamming the logs
220- let mut has_emitted_parent_sha_error = false ;
233+ // let mut has_emitted_parent_sha_error = false;
221234
222235 // Target x benchmark_set x backend x profile -> BenchmarkJob
223236 for target in Target :: all ( ) {
@@ -238,32 +251,36 @@ pub async fn enqueue_benchmark_request(
238251 // If the parent job has been deleted from the database
239252 // but was already benchmarked then the collector will ignore
240253 // it as it will see it already has results.
241- if let Some ( parent_sha) = benchmark_request. parent_sha ( ) {
242- let ( is_foreign_key_violation, result) = tx
243- . conn ( )
244- . enqueue_parent_benchmark_job (
245- parent_sha,
246- target,
247- backend,
248- profile,
249- benchmark_set as u32 ,
250- )
251- . await ;
252-
253- // At some point in time the parent_sha may not refer
254- // to a `benchmark_request` and we want to be able to
255- // see that error.
256- if let Err ( e) = result {
257- if is_foreign_key_violation && !has_emitted_parent_sha_error {
258- log:: error!( "Failed to create job for parent sha {e:?}" ) ;
259- has_emitted_parent_sha_error = true ;
260- } else if has_emitted_parent_sha_error && is_foreign_key_violation {
261- continue ;
262- } else {
263- return Err ( e) ;
264- }
265- }
266- }
254+
255+ // Do not enqueue parent jobs to allow parallel execution with the old system
256+ // If the parent artifact wouldn't be benchmarked yet, we would benchmark the
257+ // parent with the new system.
258+ // if let Some(parent_sha) = request.parent_sha() {
259+ // let (is_foreign_key_violation, result) = tx
260+ // .conn()
261+ // .enqueue_parent_benchmark_job(
262+ // parent_sha,
263+ // target,
264+ // backend,
265+ // profile,
266+ // benchmark_set as u32,
267+ // )
268+ // .await;
269+ //
270+ // // At some point in time the parent_sha may not refer
271+ // // to a `benchmark_request` and we want to be able to
272+ // // see that error.
273+ // if let Err(e) = result {
274+ // if is_foreign_key_violation && !has_emitted_parent_sha_error {
275+ // log::error!("Failed to create job for parent sha {e:?}");
276+ // has_emitted_parent_sha_error = true;
277+ // } else if has_emitted_parent_sha_error && is_foreign_key_violation {
278+ // continue;
279+ // } else {
280+ // return Err(e);
281+ // }
282+ // }
283+ // }
267284 }
268285 }
269286 }
@@ -287,12 +304,15 @@ async fn process_benchmark_requests(
287304) -> anyhow:: Result < Vec < BenchmarkRequest > > {
288305 let queue = build_queue ( conn) . await ?;
289306
307+ log:: debug!( "Current queue: {queue:?}" ) ;
308+
290309 let mut completed = vec ! [ ] ;
291310 for request in queue {
292311 match request. status ( ) {
293312 BenchmarkRequestStatus :: InProgress => {
294313 let tag = request. tag ( ) . expect ( "In progress request without a tag" ) ;
295314 if conn. maybe_mark_benchmark_request_as_completed ( tag) . await ? {
315+ log:: info!( "Request {tag} marked as completed" ) ;
296316 completed. push ( request) ;
297317 continue ;
298318 }
@@ -311,8 +331,9 @@ async fn process_benchmark_requests(
311331 Ok ( completed)
312332}
313333
314- /// For queueing jobs, add the jobs you want to queue to this function
315- async fn cron_enqueue_jobs ( ctxt : & SiteCtxt ) -> anyhow:: Result < ( ) > {
334+ /// Creates new benchmark requests, enqueues jobs for ready benchmark requests and
335+ /// finishes completed benchmark requests.
336+ async fn perform_queue_tick ( ctxt : & SiteCtxt ) -> anyhow:: Result < ( ) > {
316337 let mut conn = ctxt. conn ( ) . await ;
317338
318339 let index = ctxt. known_benchmark_requests . load ( ) ;
@@ -387,7 +408,10 @@ async fn cron_enqueue_jobs(ctxt: &SiteCtxt) -> anyhow::Result<()> {
387408}
388409
389410/// Entry point for the cron job that manages the benchmark request and job queue.
390- pub async fn cron_main ( site_ctxt : Arc < RwLock < Option < Arc < SiteCtxt > > > > , run_interval : Duration ) {
411+ pub async fn create_queue_process (
412+ site_ctxt : Arc < RwLock < Option < Arc < SiteCtxt > > > > ,
413+ run_interval : Duration ,
414+ ) {
391415 let mut interval = time:: interval ( run_interval) ;
392416 interval. set_missed_tick_behavior ( MissedTickBehavior :: Delay ) ;
393417
@@ -398,7 +422,7 @@ pub async fn cron_main(site_ctxt: Arc<RwLock<Option<Arc<SiteCtxt>>>>, run_interv
398422 let guard = ctxt. read ( ) ;
399423 guard. as_ref ( ) . cloned ( )
400424 } {
401- match cron_enqueue_jobs ( & ctxt_clone) . await {
425+ match perform_queue_tick ( & ctxt_clone) . await {
402426 Ok ( _) => log:: info!( "Cron job finished" ) ,
403427 Err ( e) => log:: error!( "Cron job failed to execute: {e:?}" ) ,
404428 }
0 commit comments