@@ -6,10 +6,10 @@ mod tests;
66
77use std:: path:: Path ;
88
9- use anyhow:: { Context , Result , anyhow, bail} ;
9+ use anyhow:: { Context , Error , Result , anyhow, bail} ;
1010use futures_util:: stream:: StreamExt ;
1111use std:: sync:: Arc ;
12- use tokio:: sync:: Semaphore ;
12+ use tokio:: sync:: { Semaphore , mpsc } ;
1313use tracing:: info;
1414use url:: Url ;
1515
@@ -154,8 +154,7 @@ impl Manifestation {
154154 let altered = tmp_cx. dist_server != DEFAULT_DIST_SERVER ;
155155
156156 // Download component packages and validate hashes
157- let mut things_to_install = Vec :: new ( ) ;
158- let mut things_downloaded = Vec :: new ( ) ;
157+ let mut things_downloaded: Vec < String > = Vec :: new ( ) ;
159158 let components = update. components_urls_and_hashes ( new_manifest) ?;
160159 let components_len = components. len ( ) ;
161160
@@ -173,49 +172,7 @@ impl Manifestation {
173172 . and_then ( |s| s. parse ( ) . ok ( ) )
174173 . unwrap_or ( DEFAULT_MAX_RETRIES ) ;
175174
176- info ! ( "downloading component(s)" ) ;
177- for bin in & components {
178- ( download_cfg. notify_handler ) ( Notification :: DownloadingComponent (
179- & bin. component . short_name ( new_manifest) ,
180- & self . target_triple ,
181- bin. component . target . as_ref ( ) ,
182- & bin. binary . url ,
183- ) ) ;
184- }
185-
186- let semaphore = Arc :: new ( Semaphore :: new ( concurrent_downloads) ) ;
187- let component_stream = tokio_stream:: iter ( components. into_iter ( ) ) . map ( |bin| {
188- let sem = semaphore. clone ( ) ;
189- async move {
190- let _permit = sem. acquire ( ) . await . unwrap ( ) ;
191- let url = if altered {
192- utils:: parse_url (
193- & bin. binary
194- . url
195- . replace ( DEFAULT_DIST_SERVER , tmp_cx. dist_server . as_str ( ) ) ,
196- ) ?
197- } else {
198- utils:: parse_url ( & bin. binary . url ) ?
199- } ;
200-
201- bin. download ( & url, download_cfg, max_retries, new_manifest)
202- . await
203- . map ( |downloaded| ( bin, downloaded) )
204- }
205- } ) ;
206- if components_len > 0 {
207- let results = component_stream
208- . buffered ( components_len)
209- . collect :: < Vec < _ > > ( )
210- . await ;
211- for result in results {
212- let ( bin, downloaded_file) = result?;
213- things_downloaded. push ( bin. binary . hash . clone ( ) ) ;
214- things_to_install. push ( ( bin, downloaded_file) ) ;
215- }
216- }
217-
218- // Begin transaction
175+ // Begin transaction before the downloads, as installations are interleaved with those
219176 let mut tx = Transaction :: new (
220177 prefix. clone ( ) ,
221178 tmp_cx,
@@ -227,6 +184,16 @@ impl Manifestation {
227184 // to uninstall it first.
228185 tx = self . maybe_handle_v2_upgrade ( & config, tx, download_cfg. process ) ?;
229186
187+ info ! ( "downloading component(s)" ) ;
188+ for bin in & components {
189+ ( download_cfg. notify_handler ) ( Notification :: DownloadingComponent (
190+ & bin. component . short_name ( new_manifest) ,
191+ & self . target_triple ,
192+ bin. component . target . as_ref ( ) ,
193+ & bin. binary . url ,
194+ ) ) ;
195+ }
196+
230197 // Uninstall components
231198 for component in & update. components_to_uninstall {
232199 let notification = if implicit_modify {
@@ -249,16 +216,77 @@ impl Manifestation {
249216 ) ?;
250217 }
251218
252- // Install components
253- for ( component_bin, installer_file) in things_to_install {
254- tx = self . install_component (
255- component_bin,
256- installer_file,
257- tmp_cx,
258- download_cfg,
259- new_manifest,
260- tx,
261- ) ?;
219+ if components_len > 0 {
220+ // Create a channel to communicate whenever a download is done and the component can be installed
221+ // The `mpsc` channel was used as we need to send many messages from one producer (download's thread) to one consumer (install's thread)
222+ // This is recommended in the official docs: https://docs.rs/tokio/latest/tokio/sync/index.html#mpsc-channel
223+ let total_components = components. len ( ) ;
224+ let ( download_tx, mut download_rx) =
225+ mpsc:: channel :: < Result < ( ComponentBinary < ' _ > , File ) > > ( total_components) ;
226+
227+ let semaphore = Arc :: new ( Semaphore :: new ( concurrent_downloads) ) ;
228+ let component_stream = tokio_stream:: iter ( components. into_iter ( ) ) . map ( |bin| {
229+ let sem = semaphore. clone ( ) ;
230+ let download_tx = download_tx. clone ( ) ;
231+ async move {
232+ let _permit = sem. acquire ( ) . await . unwrap ( ) ;
233+ let url = if altered {
234+ utils:: parse_url (
235+ & bin. binary
236+ . url
237+ . replace ( DEFAULT_DIST_SERVER , tmp_cx. dist_server . as_str ( ) ) ,
238+ ) ?
239+ } else {
240+ utils:: parse_url ( & bin. binary . url ) ?
241+ } ;
242+
243+ let installer_file = bin
244+ . download ( & url, download_cfg, max_retries, new_manifest)
245+ . await ?;
246+ let hash = bin. binary . hash . clone ( ) ;
247+ let _ = download_tx. send ( Ok ( ( bin, installer_file) ) ) . await ;
248+ Ok ( hash)
249+ }
250+ } ) ;
251+
252+ let mut stream = component_stream. buffered ( components_len) ;
253+ let download_handle = async {
254+ let mut hashes = Vec :: new ( ) ;
255+ while let Some ( result) = stream. next ( ) . await {
256+ match result {
257+ Ok ( hash) => {
258+ hashes. push ( hash) ;
259+ }
260+ Err ( e) => {
261+ let _ = download_tx. send ( Err ( e) ) . await ;
262+ }
263+ }
264+ }
265+ hashes
266+ } ;
267+ let install_handle = async {
268+ let mut current_tx = tx;
269+ let mut counter = 0 ;
270+ while counter < total_components
271+ && let Some ( message) = download_rx. recv ( ) . await
272+ {
273+ let ( component_bin, installer_file) = message?;
274+ current_tx = self . install_component (
275+ component_bin,
276+ installer_file,
277+ tmp_cx,
278+ download_cfg,
279+ new_manifest,
280+ current_tx,
281+ ) ?;
282+ counter += 1 ;
283+ }
284+ Ok :: < _ , Error > ( current_tx)
285+ } ;
286+
287+ let ( download_results, install_result) = tokio:: join!( download_handle, install_handle) ;
288+ things_downloaded = download_results;
289+ tx = install_result?;
262290 }
263291
264292 // Install new distribution manifest
0 commit comments