1- use std:: {
2- collections:: { HashMap , HashSet } ,
3- num:: NonZeroU64 ,
4- } ;
1+ use std:: collections:: { HashMap , HashSet } ;
52
63use bytes:: Bytes ;
74use futures_lite:: StreamExt ;
@@ -136,7 +133,7 @@ impl<S: Storage> Reconciler<S> {
136133 ReconciliationMessage :: AnnounceEntries ( message) => {
137134 let target_id = message. handles ( ) ;
138135 self . entry_state
139- . received_announce_entries ( target_id, message. count ) ?;
136+ . received_announce_entries ( target_id, message. is_empty ) ?;
140137 let target = self
141138 . targets
142139 . get_eventually ( & self . shared , & target_id)
@@ -173,9 +170,11 @@ impl<S: Storage> Reconciler<S> {
173170 . received_send_payload ( self . shared . store . payloads ( ) , message. bytes )
174171 . await ?;
175172 }
176- ReconciliationMessage :: TerminatePayload ( _message) => {
177- if let Some ( completed_target) =
178- self . entry_state . received_terminate_payload ( ) . await ?
173+ ReconciliationMessage :: TerminatePayload ( message) => {
174+ if let Some ( completed_target) = self
175+ . entry_state
176+ . received_terminate_payload ( message. is_final )
177+ . await ?
179178 {
180179 let target = self
181180 . targets
@@ -288,14 +287,17 @@ impl EntryState {
288287 self . 0 . is_none ( )
289288 }
290289
291- pub fn received_announce_entries ( & mut self , target : TargetId , count : u64 ) -> Result < ( ) , Error > {
290+ pub fn received_announce_entries (
291+ & mut self ,
292+ target : TargetId ,
293+ is_empty : bool ,
294+ ) -> Result < ( ) , Error > {
292295 if self . 0 . is_some ( ) {
293296 return Err ( Error :: InvalidMessageInCurrentState ) ;
294297 }
295- if let Some ( count ) = NonZeroU64 :: new ( count ) {
298+ if !is_empty {
296299 self . 0 = Some ( EntryStateInner {
297300 target,
298- remaining_entries : Some ( count) ,
299301 current_payload : CurrentPayload :: default ( ) ,
300302 } ) ;
301303 }
@@ -310,10 +312,6 @@ impl EntryState {
310312 ) -> Result < ( ) , Error > {
311313 let state = self . get_mut ( ) ?;
312314 state. current_payload . ensure_none ( ) ?;
313- state. remaining_entries = match state. remaining_entries . take ( ) {
314- None => return Err ( Error :: InvalidMessageInCurrentState ) ,
315- Some ( c) => NonZeroU64 :: new ( c. get ( ) . saturating_sub ( 1 ) ) ,
316- } ;
317315 state. current_payload . set (
318316 payload_digest,
319317 total_payload_length,
@@ -335,10 +333,13 @@ impl EntryState {
335333 Ok ( ( ) )
336334 }
337335
338- pub async fn received_terminate_payload ( & mut self ) -> Result < Option < TargetId > , Error > {
336+ pub async fn received_terminate_payload (
337+ & mut self ,
338+ is_final : bool ,
339+ ) -> Result < Option < TargetId > , Error > {
339340 let state = self . get_mut ( ) ?;
340341 state. current_payload . finalize ( ) . await ?;
341- if state . remaining_entries . is_none ( ) {
342+ if is_final {
342343 let target_id = state. target ;
343344 self . 0 = None ;
344345 Ok ( Some ( target_id) )
@@ -358,7 +359,6 @@ impl EntryState {
358359#[ derive( Debug ) ]
359360struct EntryStateInner {
360361 target : TargetId ,
361- remaining_entries : Option < NonZeroU64 > ,
362362 current_payload : CurrentPayload ,
363363}
364364
@@ -445,7 +445,7 @@ impl<S: Storage> Target<S> {
445445 if our_fingerprint == message. fingerprint {
446446 let reply = ReconciliationAnnounceEntries {
447447 range : message. range . clone ( ) ,
448- count : 0 ,
448+ is_empty : true ,
449449 want_response : false ,
450450 will_sort : false ,
451451 sender_handle : message. receiver_handle ,
@@ -456,7 +456,7 @@ impl<S: Storage> Target<S> {
456456 }
457457 // case 2: fingerprint is empty
458458 else if message. fingerprint . is_empty ( ) {
459- self . announce_and_send_entries ( shared, & message. range , true , Some ( range_count) , None )
459+ self . announce_and_send_entries ( shared, & message. range , true , Some ( range_count) , false )
460460 . await ?;
461461 }
462462 // case 3: fingerprint doesn't match and is non-empty
@@ -474,14 +474,8 @@ impl<S: Storage> Target<S> {
474474 let covers = is_last. then_some ( range_count) ;
475475 match action {
476476 SplitAction :: SendEntries ( count) => {
477- self . announce_and_send_entries (
478- shared,
479- & subrange,
480- true ,
481- covers,
482- Some ( count) ,
483- )
484- . await ?;
477+ self . announce_and_send_entries ( shared, & subrange, true , covers, count > 0 )
478+ . await ?;
485479 }
486480 SplitAction :: SendFingerprint ( fingerprint) => {
487481 self . send_fingerprint ( shared, subrange, fingerprint, covers)
@@ -507,7 +501,7 @@ impl<S: Storage> Target<S> {
507501
508502 if message. want_response {
509503 let range_count = self . next_range_count_theirs ( ) ;
510- self . announce_and_send_entries ( shared, & message. range , false , Some ( range_count) , None )
504+ self . announce_and_send_entries ( shared, & message. range , false , Some ( range_count) , false )
511505 . await ?;
512506 }
513507 trace ! ( "received_announce_entries done" ) ;
@@ -539,30 +533,39 @@ impl<S: Storage> Target<S> {
539533 range : & Range3d ,
540534 want_response : bool ,
541535 covers : Option < u64 > ,
542- our_entry_count : Option < u64 > ,
536+ is_empty : bool ,
543537 ) -> Result < ( ) , Error > {
544- let our_entry_count = match our_entry_count {
545- Some ( count) => count,
546- None => self . snapshot . count ( self . namespace ( ) , range) ?,
538+ if want_response {
539+ self . mark_our_next_range_pending ( ) ;
540+ }
541+
542+ let ( iter, is_empty) = if is_empty {
543+ ( None , true )
544+ } else {
545+ let mut iter = self
546+ . snapshot
547+ . get_authorised_entries ( self . namespace ( ) , range)
548+ . peekable ( ) ;
549+ let is_empty = iter. peek ( ) . is_none ( ) ;
550+ ( Some ( iter) , is_empty)
547551 } ;
552+
548553 let msg = ReconciliationAnnounceEntries {
549554 range : range. clone ( ) . into ( ) ,
550- count : our_entry_count ,
555+ is_empty ,
551556 want_response,
552557 will_sort : false , // todo: sorted?
553558 sender_handle : self . intersection . our_handle ,
554559 receiver_handle : self . intersection . their_handle ,
555560 covers,
556561 } ;
557- if want_response {
558- self . mark_our_next_range_pending ( ) ;
559- }
560562 shared. send . send ( msg) . await ?;
561563
562- for authorised_entry in self
563- . snapshot
564- . get_authorised_entries ( self . namespace ( ) , range)
565- {
564+ let Some ( mut iter) = iter else {
565+ return Ok ( ( ) ) ;
566+ } ;
567+
568+ while let Some ( authorised_entry) = iter. next ( ) {
566569 let authorised_entry = authorised_entry?;
567570 let ( entry, token) = authorised_entry. into_parts ( ) ;
568571
@@ -590,7 +593,11 @@ impl<S: Storage> Target<S> {
590593 } )
591594 . await ?;
592595 }
593- shared. send . send ( ReconciliationTerminatePayload ) . await ?;
596+ let is_final = iter. peek ( ) . is_none ( ) ;
597+ shared
598+ . send
599+ . send ( ReconciliationTerminatePayload { is_final } )
600+ . await ?;
594601 }
595602 Ok ( ( ) )
596603 }
0 commit comments