Skip to content

Commit 14047d3

Browse files
committed
fix: do not use store snapshots anymore
1 parent 3b9fd43 commit 14047d3

File tree

1 file changed

+51
-52
lines changed

1 file changed

+51
-52
lines changed

iroh-willow/src/session/reconciler.rs

Lines changed: 51 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use bytes::Bytes;
44
use futures_lite::StreamExt;
55
use genawaiter::rc::Co;
66
use iroh_blobs::store::Store as PayloadStore;
7-
use tracing::{debug, trace};
7+
use tracing::debug;
88

99
use crate::{
1010
proto::{
@@ -53,7 +53,7 @@ pub enum Output {
5353
pub struct Reconciler<S: Storage> {
5454
shared: Shared<S>,
5555
recv: Cancelable<MessageReceiver<ReconciliationMessage>>,
56-
targets: TargetMap<S>,
56+
targets: TargetMap,
5757
entry_state: EntryState,
5858
}
5959

@@ -219,30 +219,30 @@ impl<S: Storage> Reconciler<S> {
219219
}
220220

221221
#[derive(Debug)]
222-
struct TargetMap<S: Storage> {
223-
map: HashMap<TargetId, Target<S>>,
222+
struct TargetMap {
223+
map: HashMap<TargetId, Target>,
224224
inbox: CancelableReceiver<Input>,
225225
}
226226

227-
impl<S: Storage> TargetMap<S> {
227+
impl TargetMap {
228228
pub fn new(inbox: CancelableReceiver<Input>) -> Self {
229229
Self {
230230
map: Default::default(),
231231
inbox,
232232
}
233233
}
234-
pub async fn get_eventually(
234+
pub async fn get_eventually<S: Storage>(
235235
&mut self,
236236
shared: &Shared<S>,
237237
requested_id: &TargetId,
238-
) -> Result<&mut Target<S>, Error> {
238+
) -> Result<&mut Target, Error> {
239239
if !self.map.contains_key(requested_id) {
240240
self.wait_for_target(shared, requested_id).await?;
241241
}
242242
return Ok(self.map.get_mut(requested_id).unwrap());
243243
}
244244

245-
async fn wait_for_target(
245+
async fn wait_for_target<S: Storage>(
246246
&mut self,
247247
shared: &Shared<S>,
248248
requested_id: &TargetId,
@@ -261,13 +261,12 @@ impl<S: Storage> TargetMap<S> {
261261
Ok(())
262262
}
263263

264-
async fn init_target(
264+
async fn init_target<S: Storage>(
265265
&mut self,
266266
shared: &Shared<S>,
267267
intersection: AoiIntersection,
268268
) -> Result<TargetId, Error> {
269-
let snapshot = shared.store.entries().snapshot()?;
270-
let target = Target::init(snapshot, shared, intersection).await?;
269+
let target = Target::init(shared, intersection).await?;
271270
let id = target.id();
272271
debug!(
273272
our_handle = id.0.value(),
@@ -375,29 +374,23 @@ struct Shared<S: Storage> {
375374
}
376375

377376
#[derive(Debug)]
378-
struct Target<S: Storage> {
379-
snapshot: <S::Entries as EntryStorage>::Snapshot,
380-
377+
struct Target {
381378
intersection: AoiIntersection,
382-
383379
our_uncovered_ranges: HashSet<u64>,
384380
started: bool,
385-
386381
our_range_counter: u64,
387382
their_range_counter: u64,
388383
}
389384

390-
impl<S: Storage> Target<S> {
385+
impl Target {
391386
fn id(&self) -> TargetId {
392387
self.intersection.id()
393388
}
394-
async fn init(
395-
snapshot: <S::Entries as EntryStorage>::Snapshot,
389+
async fn init<S: Storage>(
396390
shared: &Shared<S>,
397391
intersection: AoiIntersection,
398392
) -> Result<Self, Error> {
399393
let mut this = Target {
400-
snapshot,
401394
intersection,
402395
our_uncovered_ranges: Default::default(),
403396
started: false,
@@ -414,9 +407,12 @@ impl<S: Storage> Target<S> {
414407
self.intersection.namespace
415408
}
416409

417-
async fn initiate(&mut self, shared: &Shared<S>) -> Result<(), Error> {
410+
async fn initiate<S: Storage>(&mut self, shared: &Shared<S>) -> Result<(), Error> {
418411
let range = self.intersection.area().to_range();
419-
let fingerprint = self.snapshot.fingerprint(self.namespace(), &range)?;
412+
let fingerprint = shared
413+
.store
414+
.entries()
415+
.fingerprint(self.namespace(), &range)?;
420416
self.send_fingerprint(shared, range, fingerprint, None)
421417
.await?;
422418
Ok(())
@@ -426,7 +422,7 @@ impl<S: Storage> Target<S> {
426422
self.started && self.our_uncovered_ranges.is_empty()
427423
}
428424

429-
async fn received_send_fingerprint(
425+
async fn received_send_fingerprint<S: Storage>(
430426
&mut self,
431427
shared: &Shared<S>,
432428
message: ReconciliationSendFingerprint,
@@ -437,22 +433,15 @@ impl<S: Storage> Target<S> {
437433
}
438434
let range_count = self.next_range_count_theirs();
439435

440-
let our_fingerprint = self
441-
.snapshot
436+
let our_fingerprint = shared
437+
.store
438+
.entries()
442439
.fingerprint(self.namespace(), &message.range)?;
443440

444441
// case 1: fingerprint match.
445442
if our_fingerprint == message.fingerprint {
446-
let reply = ReconciliationAnnounceEntries {
447-
range: message.range.clone(),
448-
is_empty: true,
449-
want_response: false,
450-
will_sort: false,
451-
sender_handle: message.receiver_handle,
452-
receiver_handle: message.sender_handle,
453-
covers: Some(range_count),
454-
};
455-
shared.send.send(reply).await?;
443+
self.announce_and_send_entries(shared, &message.range, false, Some(range_count), true)
444+
.await?;
456445
}
457446
// case 2: fingerprint is empty
458447
else if message.fingerprint.is_empty() {
@@ -462,10 +451,10 @@ impl<S: Storage> Target<S> {
462451
// case 3: fingerprint doesn't match and is non-empty
463452
else {
464453
// reply by splitting the range into parts unless it is very short
465-
// TODO: Expose
454+
// TODO: Expose these options to a higher level.
466455
let split_opts = SplitOpts::default();
467-
let snapshot = self.snapshot.clone();
468-
let mut iter = snapshot
456+
let store = shared.store.entries().clone();
457+
let mut iter = store
469458
.split_range(self.namespace(), &message.range, &split_opts)?
470459
.peekable();
471460
while let Some(res) = iter.next() {
@@ -474,7 +463,7 @@ impl<S: Storage> Target<S> {
474463
let covers = is_last.then_some(range_count);
475464
match action {
476465
SplitAction::SendEntries(count) => {
477-
self.announce_and_send_entries(shared, &subrange, true, covers, count > 0)
466+
self.announce_and_send_entries(shared, &subrange, true, covers, count == 0)
478467
.await?;
479468
}
480469
SplitAction::SendFingerprint(fingerprint) => {
@@ -488,12 +477,11 @@ impl<S: Storage> Target<S> {
488477
Ok(())
489478
}
490479

491-
async fn received_announce_entries(
480+
async fn received_announce_entries<S: Storage>(
492481
&mut self,
493482
shared: &Shared<S>,
494483
message: ReconciliationAnnounceEntries,
495484
) -> Result<(), Error> {
496-
trace!(?message, "received_announce_entries start");
497485
self.started = true;
498486
if let Some(range_count) = message.covers {
499487
self.mark_our_range_covered(range_count)?;
@@ -504,11 +492,10 @@ impl<S: Storage> Target<S> {
504492
self.announce_and_send_entries(shared, &message.range, false, Some(range_count), false)
505493
.await?;
506494
}
507-
trace!("received_announce_entries done");
508495
Ok(())
509496
}
510497

511-
async fn send_fingerprint(
498+
async fn send_fingerprint<S: Storage>(
512499
&mut self,
513500
shared: &Shared<S>,
514501
range: Range3d,
@@ -527,7 +514,9 @@ impl<S: Storage> Target<S> {
527514
Ok(())
528515
}
529516

530-
async fn announce_and_send_entries(
517+
/// Send a [`ReconciliationAnnounceEntries`] message for a range, and all entries in the range unless
518+
/// `is_empty` is set to true.
519+
async fn announce_and_send_entries<S: Storage>(
531520
&mut self,
532521
shared: &Shared<S>,
533522
range: &Range3d,
@@ -539,17 +528,25 @@ impl<S: Storage> Target<S> {
539528
self.mark_our_next_range_pending();
540529
}
541530

542-
let (iter, is_empty) = if is_empty {
543-
(None, true)
531+
// If we know for sure that our range is empty, we can skip creating the entry iterator alltogether.
532+
let mut iter = if is_empty {
533+
None
544534
} else {
545-
let mut iter = self
546-
.snapshot
547-
.get_authorised_entries(self.namespace(), range)
548-
.peekable();
549-
let is_empty = iter.peek().is_none();
550-
(Some(iter), is_empty)
535+
Some(
536+
shared
537+
.store
538+
.entries()
539+
.get_authorised_entries(self.namespace(), range)
540+
.peekable(),
541+
)
551542
};
543+
// Find out if we will send any entries at all.
544+
let is_empty = iter
545+
.as_mut()
546+
.map(|iter| iter.peek().is_none())
547+
.unwrap_or(true);
552548

549+
// Send the announce message
553550
let msg = ReconciliationAnnounceEntries {
554551
range: range.clone().into(),
555552
is_empty,
@@ -561,10 +558,12 @@ impl<S: Storage> Target<S> {
561558
};
562559
shared.send.send(msg).await?;
563560

561+
// If our range is empty, we're done!
564562
let Some(mut iter) = iter else {
565563
return Ok(());
566564
};
567565

566+
// Otherwise send all the entries in our iterator, and payloads if applicable.
568567
while let Some(authorised_entry) = iter.next() {
569568
let authorised_entry = authorised_entry?;
570569
let (entry, token) = authorised_entry.into_parts();

0 commit comments

Comments
 (0)