@@ -4,7 +4,7 @@ use bytes::Bytes;
44use futures_lite:: StreamExt ;
55use genawaiter:: rc:: Co ;
66use iroh_blobs:: store:: Store as PayloadStore ;
7- use tracing:: { debug, trace } ;
7+ use tracing:: debug;
88
99use crate :: {
1010 proto:: {
@@ -53,7 +53,7 @@ pub enum Output {
5353pub struct Reconciler < S : Storage > {
5454 shared : Shared < S > ,
5555 recv : Cancelable < MessageReceiver < ReconciliationMessage > > ,
56- targets : TargetMap < S > ,
56+ targets : TargetMap ,
5757 entry_state : EntryState ,
5858}
5959
@@ -219,30 +219,30 @@ impl<S: Storage> Reconciler<S> {
219219}
220220
221221#[ derive( Debug ) ]
222- struct TargetMap < S : Storage > {
223- map : HashMap < TargetId , Target < S > > ,
222+ struct TargetMap {
223+ map : HashMap < TargetId , Target > ,
224224 inbox : CancelableReceiver < Input > ,
225225}
226226
227- impl < S : Storage > TargetMap < S > {
227+ impl TargetMap {
228228 pub fn new ( inbox : CancelableReceiver < Input > ) -> Self {
229229 Self {
230230 map : Default :: default ( ) ,
231231 inbox,
232232 }
233233 }
234- pub async fn get_eventually (
234+ pub async fn get_eventually < S : Storage > (
235235 & mut self ,
236236 shared : & Shared < S > ,
237237 requested_id : & TargetId ,
238- ) -> Result < & mut Target < S > , Error > {
238+ ) -> Result < & mut Target , Error > {
239239 if !self . map . contains_key ( requested_id) {
240240 self . wait_for_target ( shared, requested_id) . await ?;
241241 }
242242 return Ok ( self . map . get_mut ( requested_id) . unwrap ( ) ) ;
243243 }
244244
245- async fn wait_for_target (
245+ async fn wait_for_target < S : Storage > (
246246 & mut self ,
247247 shared : & Shared < S > ,
248248 requested_id : & TargetId ,
@@ -261,13 +261,12 @@ impl<S: Storage> TargetMap<S> {
261261 Ok ( ( ) )
262262 }
263263
264- async fn init_target (
264+ async fn init_target < S : Storage > (
265265 & mut self ,
266266 shared : & Shared < S > ,
267267 intersection : AoiIntersection ,
268268 ) -> Result < TargetId , Error > {
269- let snapshot = shared. store . entries ( ) . snapshot ( ) ?;
270- let target = Target :: init ( snapshot, shared, intersection) . await ?;
269+ let target = Target :: init ( shared, intersection) . await ?;
271270 let id = target. id ( ) ;
272271 debug ! (
273272 our_handle = id. 0 . value( ) ,
@@ -375,29 +374,23 @@ struct Shared<S: Storage> {
375374}
376375
377376#[ derive( Debug ) ]
378- struct Target < S : Storage > {
379- snapshot : <S :: Entries as EntryStorage >:: Snapshot ,
380-
377+ struct Target {
381378 intersection : AoiIntersection ,
382-
383379 our_uncovered_ranges : HashSet < u64 > ,
384380 started : bool ,
385-
386381 our_range_counter : u64 ,
387382 their_range_counter : u64 ,
388383}
389384
390- impl < S : Storage > Target < S > {
385+ impl Target {
391386 fn id ( & self ) -> TargetId {
392387 self . intersection . id ( )
393388 }
394- async fn init (
395- snapshot : <S :: Entries as EntryStorage >:: Snapshot ,
389+ async fn init < S : Storage > (
396390 shared : & Shared < S > ,
397391 intersection : AoiIntersection ,
398392 ) -> Result < Self , Error > {
399393 let mut this = Target {
400- snapshot,
401394 intersection,
402395 our_uncovered_ranges : Default :: default ( ) ,
403396 started : false ,
@@ -414,9 +407,12 @@ impl<S: Storage> Target<S> {
414407 self . intersection . namespace
415408 }
416409
417- async fn initiate ( & mut self , shared : & Shared < S > ) -> Result < ( ) , Error > {
410+ async fn initiate < S : Storage > ( & mut self , shared : & Shared < S > ) -> Result < ( ) , Error > {
418411 let range = self . intersection . area ( ) . to_range ( ) ;
419- let fingerprint = self . snapshot . fingerprint ( self . namespace ( ) , & range) ?;
412+ let fingerprint = shared
413+ . store
414+ . entries ( )
415+ . fingerprint ( self . namespace ( ) , & range) ?;
420416 self . send_fingerprint ( shared, range, fingerprint, None )
421417 . await ?;
422418 Ok ( ( ) )
@@ -426,7 +422,7 @@ impl<S: Storage> Target<S> {
426422 self . started && self . our_uncovered_ranges . is_empty ( )
427423 }
428424
429- async fn received_send_fingerprint (
425+ async fn received_send_fingerprint < S : Storage > (
430426 & mut self ,
431427 shared : & Shared < S > ,
432428 message : ReconciliationSendFingerprint ,
@@ -437,22 +433,15 @@ impl<S: Storage> Target<S> {
437433 }
438434 let range_count = self . next_range_count_theirs ( ) ;
439435
440- let our_fingerprint = self
441- . snapshot
436+ let our_fingerprint = shared
437+ . store
438+ . entries ( )
442439 . fingerprint ( self . namespace ( ) , & message. range ) ?;
443440
444441 // case 1: fingerprint match.
445442 if our_fingerprint == message. fingerprint {
446- let reply = ReconciliationAnnounceEntries {
447- range : message. range . clone ( ) ,
448- is_empty : true ,
449- want_response : false ,
450- will_sort : false ,
451- sender_handle : message. receiver_handle ,
452- receiver_handle : message. sender_handle ,
453- covers : Some ( range_count) ,
454- } ;
455- shared. send . send ( reply) . await ?;
443+ self . announce_and_send_entries ( shared, & message. range , false , Some ( range_count) , true )
444+ . await ?;
456445 }
457446 // case 2: fingerprint is empty
458447 else if message. fingerprint . is_empty ( ) {
@@ -462,10 +451,10 @@ impl<S: Storage> Target<S> {
462451 // case 3: fingerprint doesn't match and is non-empty
463452 else {
464453 // reply by splitting the range into parts unless it is very short
465- // TODO: Expose
454+ // TODO: Expose these options to a higher level.
466455 let split_opts = SplitOpts :: default ( ) ;
467- let snapshot = self . snapshot . clone ( ) ;
468- let mut iter = snapshot
456+ let store = shared . store . entries ( ) . clone ( ) ;
457+ let mut iter = store
469458 . split_range ( self . namespace ( ) , & message. range , & split_opts) ?
470459 . peekable ( ) ;
471460 while let Some ( res) = iter. next ( ) {
@@ -474,7 +463,7 @@ impl<S: Storage> Target<S> {
474463 let covers = is_last. then_some ( range_count) ;
475464 match action {
476465 SplitAction :: SendEntries ( count) => {
477- self . announce_and_send_entries ( shared, & subrange, true , covers, count > 0 )
466+ self . announce_and_send_entries ( shared, & subrange, true , covers, count == 0 )
478467 . await ?;
479468 }
480469 SplitAction :: SendFingerprint ( fingerprint) => {
@@ -488,12 +477,11 @@ impl<S: Storage> Target<S> {
488477 Ok ( ( ) )
489478 }
490479
491- async fn received_announce_entries (
480+ async fn received_announce_entries < S : Storage > (
492481 & mut self ,
493482 shared : & Shared < S > ,
494483 message : ReconciliationAnnounceEntries ,
495484 ) -> Result < ( ) , Error > {
496- trace ! ( ?message, "received_announce_entries start" ) ;
497485 self . started = true ;
498486 if let Some ( range_count) = message. covers {
499487 self . mark_our_range_covered ( range_count) ?;
@@ -504,11 +492,10 @@ impl<S: Storage> Target<S> {
504492 self . announce_and_send_entries ( shared, & message. range , false , Some ( range_count) , false )
505493 . await ?;
506494 }
507- trace ! ( "received_announce_entries done" ) ;
508495 Ok ( ( ) )
509496 }
510497
511- async fn send_fingerprint (
498+ async fn send_fingerprint < S : Storage > (
512499 & mut self ,
513500 shared : & Shared < S > ,
514501 range : Range3d ,
@@ -527,7 +514,9 @@ impl<S: Storage> Target<S> {
527514 Ok ( ( ) )
528515 }
529516
530- async fn announce_and_send_entries (
517+ /// Send a [`ReconciliationAnnounceEntries`] message for a range, and all entries in the range unless
518+ /// `is_empty` is set to true.
519+ async fn announce_and_send_entries < S : Storage > (
531520 & mut self ,
532521 shared : & Shared < S > ,
533522 range : & Range3d ,
@@ -539,17 +528,25 @@ impl<S: Storage> Target<S> {
539528 self . mark_our_next_range_pending ( ) ;
540529 }
541530
542- let ( iter, is_empty) = if is_empty {
543- ( None , true )
531+ // If we know for sure that our range is empty, we can skip creating the entry iterator alltogether.
532+ let mut iter = if is_empty {
533+ None
544534 } else {
545- let mut iter = self
546- . snapshot
547- . get_authorised_entries ( self . namespace ( ) , range)
548- . peekable ( ) ;
549- let is_empty = iter. peek ( ) . is_none ( ) ;
550- ( Some ( iter) , is_empty)
535+ Some (
536+ shared
537+ . store
538+ . entries ( )
539+ . get_authorised_entries ( self . namespace ( ) , range)
540+ . peekable ( ) ,
541+ )
551542 } ;
543+ // Find out if we will send any entries at all.
544+ let is_empty = iter
545+ . as_mut ( )
546+ . map ( |iter| iter. peek ( ) . is_none ( ) )
547+ . unwrap_or ( true ) ;
552548
549+ // Send the announce message
553550 let msg = ReconciliationAnnounceEntries {
554551 range : range. clone ( ) . into ( ) ,
555552 is_empty,
@@ -561,10 +558,12 @@ impl<S: Storage> Target<S> {
561558 } ;
562559 shared. send . send ( msg) . await ?;
563560
561+ // If our range is empty, we're done!
564562 let Some ( mut iter) = iter else {
565563 return Ok ( ( ) ) ;
566564 } ;
567565
566+ // Otherwise send all the entries in our iterator, and payloads if applicable.
568567 while let Some ( authorised_entry) = iter. next ( ) {
569568 let authorised_entry = authorised_entry?;
570569 let ( entry, token) = authorised_entry. into_parts ( ) ;
0 commit comments