 //! will take the raw data received here and extract meaningful results from it.
 
 use std::cmp;
-use std::collections::HashMap;
+use std::collections::{HashMap, BTreeSet};
 use std::marker::PhantomData;
 use std::sync::Arc;
 
 use ethcore::executed::{Executed, ExecutionError};
 
-use futures::{Poll, Future};
-use futures::sync::oneshot::{self, Receiver, Canceled};
+use futures::{Poll, Future, Async};
+use futures::sync::oneshot::{self, Receiver};
 use network::PeerId;
 use parking_lot::{RwLock, Mutex};
 use rand;
+use std::time::{Duration, SystemTime};
 
 use net::{
     self, Handler, PeerStatus, Status, Capabilities,
@@ -49,7 +50,45 @@ pub mod request;
 /// The result of execution
 pub type ExecutionResult = Result<Executed, ExecutionError>;
 
+/// The default number of retries for OnDemand queries sent to other nodes.
+pub const DEFAULT_RETRY_COUNT: usize = 10;
+
+/// The default time limit for an inactive OnDemand query (no new peers to connect to); a zero duration means unlimited.
+pub const DEFAULT_QUERY_TIME_LIMIT: Duration = Duration::from_millis(10000);
+
+const NULL_DURATION: Duration = Duration::from_secs(0);
+
+/// OnDemand related errors
+pub mod error {
+    use futures::sync::oneshot::Canceled;
+
+    error_chain! {
+
+        foreign_links {
+            ChannelCanceled(Canceled) #[doc = "Canceled oneshot channel"];
+        }
+
+        errors {
+            #[doc = "Max number of on-demand query attempts reached without result."]
+            MaxAttemptReach(query_index: usize) {
+                description("On-demand query limit reached")
+                display("On-demand query limit reached on query #{}", query_index)
+            }
+
+            #[doc = "No reply from the current peer set; a timeout occurred while waiting for new peers for an additional query attempt."]
+            TimeoutOnNewPeers(query_index: usize, remaining_attempts: usize) {
+                description("Timeout for On-demand query")
+                display("Timeout for On-demand query; {} query attempts remain for query #{}", remaining_attempts, query_index)
+            }
+
+        }
+
+    }
+
+}
+
 // relevant peer info.
+#[derive(Debug, Clone, PartialEq, Eq)]
 struct Peer {
     status: Status,
     capabilities: Capabilities,
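
For orientation, a consumer of the new `error` module might discriminate the failure modes roughly as in the sketch below. The helper `report_on_demand_error` is hypothetical and not part of this change; it only relies on the `error_chain!`-generated `ErrorKind` variants declared above and the `log` macros already used in this crate.

// Hypothetical helper (sketch only): log the failure modes added by the `error` module.
fn report_on_demand_error(err: &error::Error) {
    match *err.kind() {
        error::ErrorKind::MaxAttemptReach(idx) =>
            debug!(target: "on_demand", "query #{} exhausted its attempts", idx),
        error::ErrorKind::TimeoutOnNewPeers(idx, remaining) =>
            debug!(target: "on_demand", "query #{} timed out with {} attempts remaining", idx, remaining),
        error::ErrorKind::ChannelCanceled(_) =>
            debug!(target: "on_demand", "response channel dropped before completion"),
        _ => debug!(target: "on_demand", "other on-demand error: {}", err),
    }
}
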
@@ -74,13 +113,21 @@ impl Peer {
     }
 }
 
+
+/// Either an array of responses or a single error.
+type PendingResponse = self::error::Result<Vec<Response>>;
+
 // Attempted request info and sender to put received value.
 struct Pending {
     requests: basic_request::Batch<CheckedRequest>,
     net_requests: basic_request::Batch<NetworkRequest>,
     required_capabilities: Capabilities,
     responses: Vec<Response>,
-    sender: oneshot::Sender<Vec<Response>>,
+    sender: oneshot::Sender<PendingResponse>,
+    base_query_index: usize,
+    remaining_query_count: usize,
+    query_id_history: BTreeSet<PeerId>,
+    inactive_time_limit: Option<SystemTime>,
 }
 
 impl Pending {
@@ -142,7 +189,9 @@ impl Pending {
     // if the requests are complete, send the result and consume self.
     fn try_complete(self) -> Option<Self> {
         if self.requests.is_complete() {
-            let _ = self.sender.send(self.responses);
+            if self.sender.send(Ok(self.responses)).is_err() {
+                debug!(target: "on_demand", "Dropped oneshot channel receiver on complete request at query #{}", self.query_id_history.len());
+            }
             None
         } else {
             Some(self)
@@ -177,6 +226,25 @@ impl Pending {
         self.net_requests = builder.build();
         self.required_capabilities = capabilities;
     }
+
+    // report that no response was received; sends an error to the requester.
+    // self is consumed on purpose.
+    fn no_response(self) {
+        trace!(target: "on_demand", "Dropping a pending query (no reply) at query #{}", self.query_id_history.len());
+        let err = self::error::ErrorKind::MaxAttemptReach(self.requests.num_answered());
+        if self.sender.send(Err(err.into())).is_err() {
+            debug!(target: "on_demand", "Dropped oneshot channel receiver on no response");
+        }
+    }
+
+    // report a timeout while waiting for new peers during query attempts; sends an error to the requester.
+    fn time_out(self) {
+        trace!(target: "on_demand", "Dropping a pending query (no new peer time out) at query #{}", self.query_id_history.len());
+        let err = self::error::ErrorKind::TimeoutOnNewPeers(self.requests.num_answered(), self.query_id_history.len());
+        if self.sender.send(Err(err.into())).is_err() {
+            debug!(target: "on_demand", "Dropped oneshot channel receiver on time out");
+        }
+    }
 }
 
 // helper to guess capabilities required for a given batch of network requests.
@@ -230,16 +298,21 @@ fn guess_capabilities(requests: &[CheckedRequest]) -> Capabilities {
 /// A future extracting the concrete output type of the generic adapter
 /// from a vector of responses.
 pub struct OnResponses<T: request::RequestAdapter> {
-    receiver: Receiver<Vec<Response>>,
+    receiver: Receiver<PendingResponse>,
     _marker: PhantomData<T>,
 }
 
 impl<T: request::RequestAdapter> Future for OnResponses<T> {
     type Item = T::Out;
-    type Error = Canceled;
+    type Error = self::error::Error;
 
     fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
-        self.receiver.poll().map(|async| async.map(T::extract_from))
+        match self.receiver.poll() {
+            Ok(Async::Ready(Ok(v))) => Ok(Async::Ready(T::extract_from(v))),
+            Ok(Async::Ready(Err(e))) => Err(e),
+            Ok(Async::NotReady) => Ok(Async::NotReady),
+            Err(e) => Err(e.into()),
+        }
     }
 }
 
@@ -253,9 +326,12 @@ pub struct OnDemand {
     in_transit: RwLock<HashMap<ReqId, Pending>>,
     cache: Arc<Mutex<Cache>>,
     no_immediate_dispatch: bool,
+    base_retry_count: usize,
+    query_inactive_time_limit: Option<Duration>,
 }
 
 impl OnDemand {
+
     /// Create a new `OnDemand` service with the given cache.
     pub fn new(cache: Arc<Mutex<Cache>>) -> Self {
         OnDemand {
@@ -264,6 +340,8 @@ impl OnDemand {
             in_transit: RwLock::new(HashMap::new()),
             cache,
             no_immediate_dispatch: false,
+            base_retry_count: DEFAULT_RETRY_COUNT,
+            query_inactive_time_limit: Some(DEFAULT_QUERY_TIME_LIMIT),
         }
     }
 
@@ -282,11 +360,11 @@ impl OnDemand {
     /// Fails if back-references are not coherent.
     /// The returned vector of responses will correspond to the requests exactly.
     pub fn request_raw(&self, ctx: &BasicContext, requests: Vec<Request>)
-        -> Result<Receiver<Vec<Response>>, basic_request::NoSuchOutput>
+        -> Result<Receiver<PendingResponse>, basic_request::NoSuchOutput>
     {
         let (sender, receiver) = oneshot::channel();
         if requests.is_empty() {
-            assert!(sender.send(Vec::new()).is_ok(), "receiver still in scope; qed");
+            assert!(sender.send(Ok(Vec::new())).is_ok(), "receiver still in scope; qed");
             return Ok(receiver);
         }
 
@@ -325,6 +403,10 @@ impl OnDemand {
             required_capabilities: capabilities,
             responses,
             sender,
+            base_query_index: 0,
+            remaining_query_count: 0,
+            query_id_history: BTreeSet::new(),
+            inactive_time_limit: None,
         });
 
         Ok(receiver)
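
To illustrate the changed return type of `request_raw`, a blocking caller now unwraps two layers: the oneshot channel error (converted through the `ChannelCanceled` foreign link) and the inner `error::Result` payload. A minimal sketch under that assumption; `fetch_blocking` is a hypothetical helper, not an API of this module.

// Sketch only: drain the receiver returned by `request_raw` synchronously.
fn fetch_blocking(on_demand: &OnDemand, ctx: &BasicContext, requests: Vec<Request>)
    -> error::Result<Vec<Response>>
{
    let receiver = on_demand.request_raw(ctx, requests)
        .expect("back-references checked by the caller; qed");
    // Waiting on the oneshot yields `Result<PendingResponse, Canceled>`;
    // `Canceled` converts into `error::Error` via the foreign link above.
    let pending_response = receiver.wait().map_err(error::Error::from)?;
    // The inner result carries either the responses or a retry/timeout error.
    pending_response
}
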
@@ -363,30 +445,68 @@ impl OnDemand {
         let peers = self.peers.read();
         *pending = ::std::mem::replace(&mut *pending, Vec::new()).into_iter()
             .filter(|pending| !pending.sender.is_canceled())
-            .filter_map(|pending| {
+            .filter_map(|mut pending| {
                 // the peer we dispatch to is chosen randomly
                 let num_peers = peers.len();
-                let rng = rand::random::<usize>() % cmp::max(num_peers, 1);
-                for (peer_id, peer) in peers.iter().chain(peers.iter()).skip(rng).take(num_peers) {
+                let history_len = pending.query_id_history.len();
+                let offset = if history_len == 0 {
+                    pending.remaining_query_count = self.base_retry_count;
+                    let rand = rand::random::<usize>();
+                    pending.base_query_index = rand;
+                    rand
+                } else {
+                    pending.base_query_index + history_len
+                } % cmp::max(num_peers, 1);
+                let init_remaining_query_count = pending.remaining_query_count; // to detect a pass where no request could be sent (e.g. after a large drop in the number of peers)
+                for (peer_id, peer) in peers.iter().chain(peers.iter())
+                    .skip(offset).take(num_peers) {
                     // TODO: see which requests can be answered by the cache?
-
-                    if !peer.can_fulfill(&pending.required_capabilities) {
-                        continue
+                    if pending.remaining_query_count == 0 {
+                        break
                     }
 
-                    match ctx.request_from(*peer_id, pending.net_requests.clone()) {
-                        Ok(req_id) => {
-                            trace!(target: "on_demand", "Dispatched request {} to peer {}", req_id, peer_id);
-                            self.in_transit.write().insert(req_id, pending);
-                            return None
+                    if pending.query_id_history.insert(peer_id.clone()) {
+
+                        if !peer.can_fulfill(&pending.required_capabilities) {
+                            trace!(target: "on_demand", "Peer {} without required capabilities, skipping, {} remaining attempts", peer_id, pending.remaining_query_count);
+                            continue
+                        }
+
+                        pending.remaining_query_count -= 1;
+                        pending.inactive_time_limit = None;
+
+                        match ctx.request_from(*peer_id, pending.net_requests.clone()) {
+                            Ok(req_id) => {
+                                trace!(target: "on_demand", "Dispatched request {} to peer {}, {} remaining attempts", req_id, peer_id, pending.remaining_query_count);
+                                self.in_transit.write().insert(req_id, pending);
+                                return None
+                            }
+                            Err(net::Error::NoCredits) | Err(net::Error::NotServer) => {}
+                            Err(e) => debug!(target: "on_demand", "Error dispatching request to peer: {}", e),
                         }
-                        Err(net::Error::NoCredits) | Err(net::Error::NotServer) => {}
-                        Err(e) => debug!(target: "on_demand", "Error dispatching request to peer: {}", e),
                     }
                 }
 
-                // TODO: maximum number of failures _when we have peers_.
-                Some(pending)
+                if pending.remaining_query_count == 0 {
+                    pending.no_response();
+                    None
+                } else if init_remaining_query_count == pending.remaining_query_count {
+                    if let Some(query_inactive_time_limit) = self.query_inactive_time_limit {
+                        let now = SystemTime::now();
+                        if let Some(inactive_time_limit) = pending.inactive_time_limit {
+                            if now > inactive_time_limit {
+                                pending.time_out();
+                                return None
+                            }
+                        } else {
+                            debug!(target: "on_demand", "No more peers to query, waiting for {} seconds until dropping query", query_inactive_time_limit.as_secs());
+                            pending.inactive_time_limit = Some(now + query_inactive_time_limit);
+                        }
+                    }
+                    Some(pending)
+                } else {
+                    Some(pending)
+                }
             })
             .collect(); // `pending` now contains all requests we couldn't dispatch.
 
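
The retry walk above can be summarized by its offset arithmetic: the first dispatch of a query draws a random `base_query_index`, and each later pass starts `query_id_history.len()` positions further around the peer ring, so every retry begins at the next untried peer. An illustrative sketch follows; the free function `dispatch_offset` is hypothetical and simply mirrors the closure's computation.

// Illustration only: starting position of each dispatch round.
// With 4 peers and base_query_index = 7:
//   round 0 (history empty):    7 % 4 = 3
//   round 1 (1 peer queried):  (7 + 1) % 4 = 0
//   round 2 (2 peers queried): (7 + 2) % 4 = 1
fn dispatch_offset(base_query_index: usize, history_len: usize, num_peers: usize) -> usize {
    (base_query_index + history_len) % ::std::cmp::max(num_peers, 1)
}
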
@@ -406,6 +526,21 @@ impl OnDemand {
             self.attempt_dispatch(ctx);
         }
     }
+
+    /// Set the retry count for a query.
+    pub fn default_retry_number(&mut self, nb_retry: usize) {
+        self.base_retry_count = nb_retry;
+    }
+
+    /// Set the inactivity time limit for a query (a zero duration disables the limit).
+    pub fn query_inactive_time_limit(&mut self, inactive_time_limit: Duration) {
+        self.query_inactive_time_limit = if inactive_time_limit == NULL_DURATION {
+            None
+        } else {
+            Some(inactive_time_limit)
+        };
+    }
+
 }
 
 impl Handler for OnDemand {
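
For completeness, configuring the two new knobs might look like the sketch below, assuming mutable access to the service before it is shared; the `configure_on_demand` helper and its argument values are illustrative, not part of this change.

// Sketch only: tune retry count and inactivity timeout before sharing the service.
fn configure_on_demand(cache: Arc<Mutex<Cache>>) -> OnDemand {
    let mut on_demand = OnDemand::new(cache);
    on_demand.default_retry_number(4);                            // allow at most 4 query attempts per request
    on_demand.query_inactive_time_limit(Duration::from_secs(30)); // give up after 30s with no new peers to try
    // Passing a zero duration (NULL_DURATION) disables the inactivity timeout entirely.
    on_demand
}
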
@@ -458,6 +593,16 @@ impl Handler for OnDemand {
             None => return,
         };
 
+        if responses.is_empty() {
+            if pending.remaining_query_count == 0 {
+                pending.no_response();
+                return;
+            }
+        } else {
+            // do not keep the query counter for other elements of this batch
+            pending.query_id_history.clear();
+        }
+
         // for each incoming response
         // 1. ensure verification data filled.
         // 2. pending.requests.supply_response