@@ -2,15 +2,14 @@ use crate::db::{delete_crate, Pool};
22use crate :: docbuilder:: PackageKind ;
33use crate :: error:: Result ;
44use crate :: storage:: Storage ;
5- use crate :: utils:: { get_crate_priority, report_error} ;
5+ use crate :: utils:: { get_config , get_crate_priority, report_error, set_config , ConfigName } ;
66use crate :: { Config , Index , Metrics , RustwideBuilder } ;
77use anyhow:: Context ;
88
99use crates_index_diff:: Change ;
1010use log:: { debug, info} ;
1111
12- use std:: fs;
13- use std:: path:: PathBuf ;
12+ use git2:: Oid ;
1413use std:: sync:: Arc ;
1514
1615#[ derive( Debug , Clone , Eq , PartialEq , serde:: Serialize ) ]
@@ -48,6 +47,31 @@ impl BuildQueue {
4847 }
4948 }
5049
50+ pub fn last_seen_reference ( & self ) -> Result < Option < Oid > > {
51+ let mut conn = self . db . get ( ) ?;
52+ if let Some ( value) = get_config ( & mut conn, ConfigName :: LastSeenIndexReference ) ?. as_str ( ) {
53+ match Oid :: from_str ( value) {
54+ Ok ( oid) => return Ok ( Some ( oid) ) ,
55+ Err ( err) => {
56+ log:: error!( "queue locked because of invalid last_seen_index_reference \" {}\" in database: {}" , value, err) ;
57+ self . lock ( ) ?;
58+ return Ok ( None ) ;
59+ }
60+ }
61+ }
62+ Ok ( None )
63+ }
64+
65+ fn set_last_seen_reference ( & self , oid : Oid ) -> Result < ( ) > {
66+ let mut conn = self . db . get ( ) ?;
67+ set_config (
68+ & mut conn,
69+ ConfigName :: LastSeenIndexReference ,
70+ oid. to_string ( ) ,
71+ ) ?;
72+ Ok ( ( ) )
73+ }
74+
5175 pub fn add_crate (
5276 & self ,
5377 name : & str ,
@@ -118,14 +142,36 @@ impl BuildQueue {
118142 f : impl FnOnce ( & QueuedCrate ) -> Result < ( ) > ,
119143 ) -> Result < ( ) > {
120144 let mut conn = self . db . get ( ) ?;
121-
122- let queued = self . queued_crates ( ) ?;
123- let to_process = match queued. get ( 0 ) {
145+ let mut transaction = conn. transaction ( ) ?;
146+
147+ // fetch the next available crate from the queue table.
148+ // We are using `SELECT FOR UPDATE` inside a transaction so
149+ // the QueuedCrate is locked until we are finished with it.
150+ // `SKIP LOCKED` here will enable another build-server to just
151+ // skip over taken (=locked) rows and start building the first
152+ // available one.
153+ let to_process = match transaction
154+ . query_opt (
155+ "SELECT id, name, version, priority, registry
156+ FROM queue
157+ WHERE attempt < $1
158+ ORDER BY priority ASC, attempt ASC, id ASC
159+ LIMIT 1
160+ FOR UPDATE SKIP LOCKED" ,
161+ & [ & self . max_attempts ] ,
162+ ) ?
163+ . map ( |row| QueuedCrate {
164+ id : row. get ( "id" ) ,
165+ name : row. get ( "name" ) ,
166+ version : row. get ( "version" ) ,
167+ priority : row. get ( "priority" ) ,
168+ registry : row. get ( "registry" ) ,
169+ } ) {
124170 Some ( krate) => krate,
125171 None => return Ok ( ( ) ) ,
126172 } ;
127173
128- let res = f ( to_process) . with_context ( || {
174+ let res = f ( & to_process) . with_context ( || {
129175 format ! (
130176 "Failed to build package {}-{} from queue" ,
131177 to_process. name, to_process. version
@@ -134,15 +180,16 @@ impl BuildQueue {
134180 self . metrics . total_builds . inc ( ) ;
135181 match res {
136182 Ok ( ( ) ) => {
137- conn . execute ( "DELETE FROM queue WHERE id = $1;" , & [ & to_process. id ] ) ?;
183+ transaction . execute ( "DELETE FROM queue WHERE id = $1;" , & [ & to_process. id ] ) ?;
138184 }
139185 Err ( e) => {
140186 // Increase attempt count
141- let rows = conn. query (
142- "UPDATE queue SET attempt = attempt + 1 WHERE id = $1 RETURNING attempt;" ,
143- & [ & to_process. id ] ,
144- ) ?;
145- let attempt: i32 = rows[ 0 ] . get ( 0 ) ;
187+ let attempt: i32 = transaction
188+ . query_one (
189+ "UPDATE queue SET attempt = attempt + 1 WHERE id = $1 RETURNING attempt;" ,
190+ & [ & to_process. id ] ,
191+ ) ?
192+ . get ( 0 ) ;
146193
147194 if attempt >= self . max_attempts {
148195 self . metrics . failed_builds . inc ( ) ;
@@ -152,39 +199,33 @@ impl BuildQueue {
152199 }
153200 }
154201
202+ transaction. commit ( ) ?;
203+
155204 Ok ( ( ) )
156205 }
157206}
158207
159208/// Locking functions.
160209impl BuildQueue {
161- pub ( crate ) fn lock_path ( & self ) -> PathBuf {
162- self . config . prefix . join ( "docsrs.lock" )
163- }
210+ /// Checks for the lock and returns whether it currently exists.
211+ pub fn is_locked ( & self ) -> Result < bool > {
212+ let mut conn = self . db . get ( ) ? ;
164213
165- /// Checks for the lock file and returns whether it currently exists.
166- pub fn is_locked ( & self ) -> bool {
167- self . lock_path ( ) . exists ( )
214+ Ok ( get_config ( & mut conn , ConfigName :: QueueLocked ) ?
215+ . as_bool ( )
216+ . unwrap_or ( false ) )
168217 }
169218
170- /// Creates a lock file . Daemon will check this lock file and stop operating if it exists.
219+ /// lock the queue . Daemon will check this lock and stop operating if it exists.
171220 pub fn lock ( & self ) -> Result < ( ) > {
172- let path = self . lock_path ( ) ;
173- if !path. exists ( ) {
174- fs:: OpenOptions :: new ( ) . write ( true ) . create ( true ) . open ( path) ?;
175- }
176-
177- Ok ( ( ) )
221+ let mut conn = self . db . get ( ) ?;
222+ set_config ( & mut conn, ConfigName :: QueueLocked , true )
178223 }
179224
180- /// Removes lock file .
225+ /// unlock the queue .
181226 pub fn unlock ( & self ) -> Result < ( ) > {
182- let path = self . lock_path ( ) ;
183- if path. exists ( ) {
184- fs:: remove_file ( path) ?;
185- }
186-
187- Ok ( ( ) )
227+ let mut conn = self . db . get ( ) ?;
228+ set_config ( & mut conn, ConfigName :: QueueLocked , false )
188229 }
189230}
190231
@@ -266,8 +307,15 @@ impl BuildQueue {
266307 }
267308 }
268309
310+ // store the last seen reference as git reference in
311+ // the local crates.io index repo.
269312 diff. set_last_seen_reference ( oid) ?;
270313
314+ // additionally set the reference in the database
315+ // so this survives recreating the registry watcher
316+ // server.
317+ self . set_last_seen_reference ( oid) ?;
318+
271319 Ok ( crates_added)
272320 }
273321
@@ -559,4 +607,57 @@ mod tests {
559607 Ok ( ( ) )
560608 } ) ;
561609 }
610+
611+ #[ test]
612+ fn test_last_seen_reference_in_db ( ) {
613+ crate :: test:: wrapper ( |env| {
614+ let queue = env. build_queue ( ) ;
615+ queue. unlock ( ) ?;
616+ assert ! ( !queue. is_locked( ) ?) ;
617+ // initial db ref is empty
618+ assert_eq ! ( queue. last_seen_reference( ) ?, None ) ;
619+ assert ! ( !queue. is_locked( ) ?) ;
620+
621+ let oid = git2:: Oid :: from_str ( "ffffffff" ) ?;
622+ queue. set_last_seen_reference ( oid) ?;
623+
624+ assert_eq ! ( queue. last_seen_reference( ) ?, Some ( oid) ) ;
625+ assert ! ( !queue. is_locked( ) ?) ;
626+
627+ Ok ( ( ) )
628+ } ) ;
629+ }
630+
631+ #[ test]
632+ fn test_broken_db_reference_locks_queue ( ) {
633+ crate :: test:: wrapper ( |env| {
634+ let mut conn = env. db ( ) . conn ( ) ;
635+ set_config ( & mut conn, ConfigName :: LastSeenIndexReference , "invalid" ) ?;
636+
637+ let queue = env. build_queue ( ) ;
638+ queue. unlock ( ) ?;
639+ assert ! ( !queue. is_locked( ) ?) ;
640+ assert_eq ! ( queue. last_seen_reference( ) ?, None ) ;
641+ assert ! ( queue. is_locked( ) ?) ;
642+
643+ Ok ( ( ) )
644+ } ) ;
645+ }
646+
647+ #[ test]
648+ fn test_queue_lock ( ) {
649+ crate :: test:: wrapper ( |env| {
650+ let queue = env. build_queue ( ) ;
651+ // unlocked without config
652+ assert ! ( !queue. is_locked( ) ?) ;
653+
654+ queue. lock ( ) ?;
655+ assert ! ( queue. is_locked( ) ?) ;
656+
657+ queue. unlock ( ) ?;
658+ assert ! ( !queue. is_locked( ) ?) ;
659+
660+ Ok ( ( ) )
661+ } ) ;
662+ }
562663}
0 commit comments