@@ -335,12 +335,19 @@ pub struct Config {
335335}
336336
337337/// Handle for the download services.
338- #[ derive( Clone , Debug ) ]
338+ #[ derive( Debug , Clone ) ]
339339pub struct Downloader {
340+ inner : Arc < Inner > ,
341+ }
342+
343+ #[ derive( Debug ) ]
344+ struct Inner {
340345 /// Next id to use for a download intent.
341- next_id : Arc < AtomicU64 > ,
346+ next_id : AtomicU64 ,
342347 /// Channel to communicate with the service.
343348 msg_tx : mpsc:: Sender < Message > ,
349+ /// Configuration for the downloader.
350+ config : Arc < Config > ,
344351}
345352
346353impl Downloader {
@@ -349,53 +356,46 @@ impl Downloader {
349356 where
350357 S : Store ,
351358 {
352- Self :: with_config ( store, endpoint, rt, Default :: default ( ) , Default :: default ( ) )
359+ Self :: with_config ( store, endpoint, rt, Default :: default ( ) )
353360 }
354361
355362 /// Create a new Downloader with custom [`ConcurrencyLimits`] and [`RetryConfig`].
356- pub fn with_config < S > (
357- store : S ,
358- endpoint : Endpoint ,
359- rt : LocalPoolHandle ,
360- concurrency_limits : ConcurrencyLimits ,
361- retry_config : RetryConfig ,
362- ) -> Self
363+ pub fn with_config < S > ( store : S , endpoint : Endpoint , rt : LocalPoolHandle , config : Config ) -> Self
363364 where
364365 S : Store ,
365366 {
367+ let config = Arc :: new ( config) ;
366368 let me = endpoint. node_id ( ) . fmt_short ( ) ;
367369 let ( msg_tx, msg_rx) = mpsc:: channel ( SERVICE_CHANNEL_CAPACITY ) ;
368370 let dialer = Dialer :: new ( endpoint) ;
369-
371+ let config2 = config . clone ( ) ;
370372 let create_future = move || {
371373 let getter = get:: IoGetter {
372374 store : store. clone ( ) ,
373375 } ;
374-
375- let service = Service :: new ( getter, dialer, concurrency_limits, retry_config, msg_rx) ;
376+ let service = Service :: new ( getter, dialer, config2, msg_rx) ;
376377
377378 service. run ( ) . instrument ( error_span ! ( "downloader" , %me) )
378379 } ;
379380 rt. spawn_detached ( create_future) ;
380381 Self {
381- next_id : Arc :: new ( AtomicU64 :: new ( 0 ) ) ,
382- msg_tx,
382+ inner : Arc :: new ( Inner {
383+ next_id : AtomicU64 :: new ( 0 ) ,
384+ msg_tx,
385+ config,
386+ } ) ,
383387 }
384388 }
385389
386390 /// Get the current configuration.
387- pub async fn get_config ( & self ) -> anyhow:: Result < Config > {
388- let ( tx, rx) = oneshot:: channel ( ) ;
389- let msg = Message :: GetConfig { tx } ;
390- self . msg_tx . send ( msg) . await ?;
391- let config = rx. await ?;
392- Ok ( config)
391+ pub fn config ( & self ) -> & Config {
392+ & self . inner . config
393393 }
394394
395395 /// Queue a download.
396396 pub async fn queue ( & self , request : DownloadRequest ) -> DownloadHandle {
397397 let kind = request. kind ;
398- let intent_id = IntentId ( self . next_id . fetch_add ( 1 , Ordering :: SeqCst ) ) ;
398+ let intent_id = IntentId ( self . inner . next_id . fetch_add ( 1 , Ordering :: SeqCst ) ) ;
399399 let ( sender, receiver) = oneshot:: channel ( ) ;
400400 let handle = DownloadHandle {
401401 id : intent_id,
@@ -409,7 +409,7 @@ impl Downloader {
409409 } ;
410410 // if this fails polling the handle will fail as well since the sender side of the oneshot
411411 // will be dropped
412- if let Err ( send_err) = self . msg_tx . send ( msg) . await {
412+ if let Err ( send_err) = self . inner . msg_tx . send ( msg) . await {
413413 let msg = send_err. 0 ;
414414 debug ! ( ?msg, "download not sent" ) ;
415415 }
@@ -425,7 +425,7 @@ impl Downloader {
425425 receiver : _,
426426 } = handle;
427427 let msg = Message :: CancelIntent { id, kind } ;
428- if let Err ( send_err) = self . msg_tx . send ( msg) . await {
428+ if let Err ( send_err) = self . inner . msg_tx . send ( msg) . await {
429429 let msg = send_err. 0 ;
430430 debug ! ( ?msg, "cancel not sent" ) ;
431431 }
@@ -437,7 +437,7 @@ impl Downloader {
437437 /// downloads. Use [`Self::queue`] to queue a download.
438438 pub async fn nodes_have ( & mut self , hash : Hash , nodes : Vec < NodeId > ) {
439439 let msg = Message :: NodesHave { hash, nodes } ;
440- if let Err ( send_err) = self . msg_tx . send ( msg) . await {
440+ if let Err ( send_err) = self . inner . msg_tx . send ( msg) . await {
441441 let msg = send_err. 0 ;
442442 debug ! ( ?msg, "nodes have not been sent" )
443443 }
@@ -459,11 +459,6 @@ enum Message {
459459 /// Cancel an intent. The associated request will be cancelled when the last intent is
460460 /// cancelled.
461461 CancelIntent { id : IntentId , kind : DownloadKind } ,
462- /// Get the config
463- GetConfig {
464- #[ debug( skip) ]
465- tx : oneshot:: Sender < Config > ,
466- } ,
467462}
468463
469464#[ derive( derive_more:: Debug ) ]
@@ -590,19 +585,13 @@ struct Service<G: Getter, D: DialerT> {
590585 progress_tracker : ProgressTracker ,
591586}
592587impl < G : Getter < Connection = D :: Connection > , D : DialerT > Service < G , D > {
593- fn new (
594- getter : G ,
595- dialer : D ,
596- concurrency_limits : ConcurrencyLimits ,
597- retry_config : RetryConfig ,
598- msg_rx : mpsc:: Receiver < Message > ,
599- ) -> Self {
588+ fn new ( getter : G , dialer : D , config : Arc < Config > , msg_rx : mpsc:: Receiver < Message > ) -> Self {
600589 Service {
601590 getter,
602591 dialer,
603592 msg_rx,
604- concurrency_limits,
605- retry_config,
593+ concurrency_limits : config . concurrency ,
594+ retry_config : config . retry ,
606595 connected_nodes : Default :: default ( ) ,
607596 retry_node_state : Default :: default ( ) ,
608597 providers : Default :: default ( ) ,
@@ -691,13 +680,6 @@ impl<G: Getter<Connection = D::Connection>, D: DialerT> Service<G, D> {
691680 self . queue . unpark_hash ( hash) ;
692681 }
693682 }
694- Message :: GetConfig { tx } => {
695- let config = Config {
696- concurrency : self . concurrency_limits ,
697- retry : self . retry_config ,
698- } ;
699- tx. send ( config) . ok ( ) ;
700- }
701683 }
702684 }
703685
0 commit comments