@@ -102,46 +102,16 @@ impl std::fmt::Debug for JoshProxyService {
102102 }
103103}
104104
105- #[ tracing:: instrument]
106- async fn fetch_upstream (
105+ fn fetch_needed (
107106 service : Arc < JoshProxyService > ,
108- upstream_repo : String ,
109- remote_auth : & RemoteAuth ,
110- remote_url : String ,
107+ remote_url : & String ,
108+ upstream_repo : & String ,
109+ force : bool ,
111110 head_ref : Option < & str > ,
112111 head_ref_resolved : Option < & str > ,
113- force : bool ,
114- ) -> Result < ( ) , FetchError > {
115- let key = remote_url. clone ( ) ;
116-
117- let refs_to_fetch = match head_ref {
118- Some ( head_ref) if head_ref != "HEAD" && !head_ref. starts_with ( "refs/heads/" ) => {
119- vec ! [
120- "HEAD*" ,
121- "refs/josh/*" ,
122- "refs/heads/*" ,
123- "refs/tags/*" ,
124- head_ref,
125- ]
126- }
127- _ => {
128- vec ! [ "HEAD*" , "refs/josh/*" , "refs/heads/*" , "refs/tags/*" ]
129- }
130- } ;
131-
132- let refs_to_fetch: Vec < _ > = refs_to_fetch. iter ( ) . map ( |x| x. to_string ( ) ) . collect ( ) ;
133-
134- let us = upstream_repo. clone ( ) ;
135- let semaphore = service
136- . fetch_permits
137- . lock ( ) ?
138- . entry ( us. clone ( ) )
139- . or_insert ( Arc :: new ( tokio:: sync:: Semaphore :: new ( 1 ) ) )
140- . clone ( ) ;
141- let permit = semaphore. acquire ( ) . await ;
142-
112+ ) -> Result < bool , FetchError > {
143113 let fetch_timer_ok = {
144- if let Some ( last) = service. fetch_timers . read ( ) ?. get ( & key ) {
114+ if let Some ( last) = service. fetch_timers . read ( ) ?. get ( remote_url ) {
145115 let since = std:: time:: Instant :: now ( ) . duration_since ( * last) ;
146116 let max = std:: time:: Duration :: from_secs ( ARGS . cache_duration ) ;
147117
@@ -171,24 +141,90 @@ async fn fetch_upstream(
171141 } ;
172142
173143 match ( force, fetch_timer_ok, head_ref, head_ref_resolved) {
174- ( false , true , None , _) => return Ok ( ( ) ) ,
144+ ( false , true , None , _) => return Ok ( false ) ,
175145 ( false , true , Some ( head_ref) , _) => {
176146 if ( resolve_cache_ref ( head_ref) . map_err ( FetchError :: from_josh_error) ?) . is_some ( ) {
177147 trace ! ( "cache ref resolved" ) ;
178- return Ok ( ( ) ) ;
148+ return Ok ( false ) ;
179149 }
180150 }
181151 ( false , false , Some ( head_ref) , Some ( head_ref_resolved) ) => {
182152 if let Some ( oid) = resolve_cache_ref ( head_ref) . map_err ( FetchError :: from_josh_error) ? {
183153 if oid. to_string ( ) == head_ref_resolved {
184154 trace ! ( "cache ref resolved and matches" ) ;
185- return Ok ( ( ) ) ;
155+ return Ok ( false ) ;
186156 }
187157 }
188158 }
189159 _ => ( ) ,
190160 } ;
191161
162+ return Ok ( true ) ;
163+ }
164+
165+ #[ tracing:: instrument]
166+ async fn fetch_upstream (
167+ service : Arc < JoshProxyService > ,
168+ upstream_repo : String ,
169+ remote_auth : & RemoteAuth ,
170+ remote_url : String ,
171+ head_ref : Option < & str > ,
172+ head_ref_resolved : Option < & str > ,
173+ force : bool ,
174+ ) -> Result < ( ) , FetchError > {
175+ let refs_to_fetch = match head_ref {
176+ Some ( head_ref) if head_ref != "HEAD" && !head_ref. starts_with ( "refs/heads/" ) => {
177+ vec ! [
178+ "HEAD*" ,
179+ "refs/josh/*" ,
180+ "refs/heads/*" ,
181+ "refs/tags/*" ,
182+ head_ref,
183+ ]
184+ }
185+ _ => {
186+ vec ! [ "HEAD*" , "refs/josh/*" , "refs/heads/*" , "refs/tags/*" ]
187+ }
188+ } ;
189+
190+ let refs_to_fetch: Vec < _ > = refs_to_fetch. iter ( ) . map ( |x| x. to_string ( ) ) . collect ( ) ;
191+
192+ // Check if we really need to fetch before locking the semaphore. This avoids
193+ // A "no fetch" case waiting for some already running fetch just to do nothing.
194+ if !fetch_needed (
195+ service. clone ( ) ,
196+ & remote_url,
197+ & upstream_repo,
198+ force,
199+ head_ref,
200+ head_ref_resolved,
201+ ) ? {
202+ return Ok ( ( ) ) ;
203+ }
204+
205+ let us = upstream_repo. clone ( ) ;
206+ let semaphore = service
207+ . fetch_permits
208+ . lock ( ) ?
209+ . entry ( us. clone ( ) )
210+ . or_insert ( Arc :: new ( tokio:: sync:: Semaphore :: new ( 1 ) ) )
211+ . clone ( ) ;
212+ let permit = semaphore. acquire ( ) . await ;
213+
214+ // Check the fetch condition once again after locking the semaphore, as an unknown
215+ // amount of time might have passed and the outcome of this check might have changed
216+ // while waiting.
217+ if !fetch_needed (
218+ service. clone ( ) ,
219+ & remote_url,
220+ & upstream_repo,
221+ force,
222+ head_ref,
223+ head_ref_resolved,
224+ ) ? {
225+ return Ok ( ( ) ) ;
226+ }
227+
192228 let fetch_timers = service. fetch_timers . clone ( ) ;
193229 let heads_map = service. heads_map . clone ( ) ;
194230 let br_path = service. repo_path . join ( "mirror" ) ;
@@ -220,7 +256,9 @@ async fn fetch_upstream(
220256 std:: mem:: drop ( permit) ;
221257
222258 if fetch_result. is_ok ( ) {
223- fetch_timers. write ( ) ?. insert ( key, std:: time:: Instant :: now ( ) ) ;
259+ fetch_timers
260+ . write ( ) ?
261+ . insert ( remote_url. clone ( ) , std:: time:: Instant :: now ( ) ) ;
224262 }
225263
226264 match ( fetch_result, remote_auth) {
0 commit comments