Skip to content

Commit 370f237

Browse files
goffrieConvex, Inc.
authored andcommitted
Simplify representation of TimestampRange & make RepeatablePersistence::load_documents pre-filter the range (#40853)
Makes TimestampRange a simple inclusive range internally and make operations on it infallible. Then add `TimestampRange::intersect` so that we can get rid of post-filtering on `RepeatablePersistence::load_documents` & similar methods, by intersecting the provided TimestampRange with the upper bound. GitOrigin-RevId: a3259d4556f23602e981a8f7cc38a29969e409bd
1 parent 0fbed0b commit 370f237

File tree

12 files changed

+112
-82
lines changed

12 files changed

+112
-82
lines changed

crates/common/src/persistence.rs

Lines changed: 69 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -272,75 +272,91 @@ pub trait Persistence: Sync + Send + 'static {
272272

273273
#[derive(Debug, Clone, Copy)]
274274
pub struct TimestampRange {
275-
start_bound: Bound<Timestamp>,
276-
end_bound: Bound<Timestamp>,
275+
start_inclusive: Timestamp,
276+
end_inclusive: Timestamp,
277277
}
278278

279279
impl TimestampRange {
280-
pub fn new<T: RangeBounds<Timestamp>>(range: T) -> anyhow::Result<Self> {
281-
// Bounds check.
282-
Self::min_inclusive(&range.start_bound().cloned())?;
283-
Self::max_exclusive(&range.end_bound().cloned())?;
284-
Ok(Self {
285-
start_bound: range.start_bound().cloned(),
286-
end_bound: range.end_bound().cloned(),
287-
})
288-
}
289-
290-
pub fn snapshot(ts: Timestamp) -> Self {
280+
#[inline]
281+
pub fn new<T: RangeBounds<Timestamp>>(range: T) -> Self {
282+
let start_inclusive = match range.start_bound() {
283+
Bound::Included(t) => *t,
284+
Bound::Excluded(t) => {
285+
if let Some(succ) = t.succ_opt() {
286+
succ
287+
} else {
288+
return Self::empty();
289+
}
290+
},
291+
Bound::Unbounded => Timestamp::MIN,
292+
};
293+
let end_inclusive = match range.end_bound() {
294+
Bound::Included(t) => *t,
295+
Bound::Excluded(t) => {
296+
if let Some(pred) = t.pred_opt() {
297+
pred
298+
} else {
299+
return Self::empty();
300+
}
301+
},
302+
Bound::Unbounded => Timestamp::MAX,
303+
};
291304
Self {
292-
start_bound: Bound::Unbounded,
293-
end_bound: Bound::Included(ts),
305+
start_inclusive,
306+
end_inclusive,
294307
}
295308
}
296309

297-
pub fn all() -> Self {
310+
#[inline]
311+
pub fn empty() -> Self {
298312
Self {
299-
start_bound: Bound::Unbounded,
300-
end_bound: Bound::Unbounded,
313+
start_inclusive: Timestamp::MAX,
314+
end_inclusive: Timestamp::MIN,
301315
}
302316
}
303317

304-
pub fn at(ts: Timestamp) -> Self {
305-
Self {
306-
start_bound: Bound::Included(ts),
307-
end_bound: Bound::Included(ts),
308-
}
318+
#[inline]
319+
pub fn snapshot(ts: Timestamp) -> Self {
320+
Self::new(..=ts)
309321
}
310322

311-
pub fn greater_than(t: Timestamp) -> Self {
312-
Self {
313-
start_bound: Bound::Excluded(t),
314-
end_bound: Bound::Unbounded,
315-
}
323+
#[inline]
324+
pub fn all() -> Self {
325+
Self::new(..)
316326
}
317327

318-
fn min_inclusive(start_bound: &Bound<Timestamp>) -> anyhow::Result<Timestamp> {
319-
Ok(match start_bound {
320-
Bound::Included(t) => *t,
321-
Bound::Excluded(t) => t.succ()?,
322-
Bound::Unbounded => Timestamp::MIN,
323-
})
328+
#[inline]
329+
pub fn at(ts: Timestamp) -> Self {
330+
Self::new(ts..=ts)
324331
}
325332

326-
pub fn min_timestamp_inclusive(&self) -> Timestamp {
327-
Self::min_inclusive(&self.start_bound).unwrap()
333+
#[inline]
334+
pub fn greater_than(t: Timestamp) -> Self {
335+
Self::new((Bound::Excluded(t), Bound::Unbounded))
328336
}
329337

330-
fn max_exclusive(end_bound: &Bound<Timestamp>) -> anyhow::Result<Timestamp> {
331-
Ok(match end_bound {
332-
Bound::Included(t) => t.succ()?,
333-
Bound::Excluded(t) => *t,
334-
Bound::Unbounded => Timestamp::MAX,
335-
})
338+
#[inline]
339+
pub fn min_timestamp_inclusive(&self) -> Timestamp {
340+
self.start_inclusive
336341
}
337342

343+
#[inline]
338344
pub fn max_timestamp_exclusive(&self) -> Timestamp {
339-
Self::max_exclusive(&self.end_bound).unwrap()
345+
// assumes that Timestamp::MAX never actually exists
346+
self.end_inclusive.succ_opt().unwrap_or(Timestamp::MAX)
340347
}
341348

349+
#[inline]
342350
pub fn contains(&self, ts: Timestamp) -> bool {
343-
self.min_timestamp_inclusive() <= ts && ts < self.max_timestamp_exclusive()
351+
self.start_inclusive <= ts && ts <= self.end_inclusive
352+
}
353+
354+
#[inline]
355+
pub fn intersect(&self, other: Self) -> Self {
356+
Self {
357+
start_inclusive: self.start_inclusive.max(other.start_inclusive),
358+
end_inclusive: self.end_inclusive.min(other.end_inclusive),
359+
}
344360
}
345361
}
346362

@@ -582,13 +598,12 @@ impl RepeatablePersistence {
582598
/// Same as [`Persistence::load_documents`] but only including documents in
583599
/// the snapshot range.
584600
pub fn load_documents(&self, range: TimestampRange, order: Order) -> DocumentStream<'_> {
585-
let stream = self.reader.load_documents(
586-
range,
601+
self.reader.load_documents(
602+
range.intersect(TimestampRange::snapshot(*self.upper_bound)),
587603
order,
588604
*DEFAULT_DOCUMENTS_PAGE_SIZE,
589605
self.retention_validator.clone(),
590-
);
591-
Box::pin(stream.try_filter(|entry| future::ready(entry.ts <= *self.upper_bound)))
606+
)
592607
}
593608

594609
/// Same as [`Persistence::load_documents_from_table`] but only including
@@ -599,14 +614,13 @@ impl RepeatablePersistence {
599614
range: TimestampRange,
600615
order: Order,
601616
) -> DocumentStream<'_> {
602-
let stream = self.reader.load_documents_from_table(
617+
self.reader.load_documents_from_table(
603618
tablet_id,
604-
range,
619+
range.intersect(TimestampRange::snapshot(*self.upper_bound)),
605620
order,
606621
*DEFAULT_DOCUMENTS_PAGE_SIZE,
607622
self.retention_validator.clone(),
608-
);
609-
Box::pin(stream.try_filter(|entry| future::ready(entry.ts <= *self.upper_bound)))
623+
)
610624
}
611625

612626
/// Same as `load_documents` but doesn't use the `RetentionValidator` from
@@ -618,13 +632,12 @@ impl RepeatablePersistence {
618632
order: Order,
619633
retention_validator: Arc<dyn RetentionValidator>,
620634
) -> DocumentStream<'_> {
621-
let stream = self.reader.load_documents(
622-
range,
635+
self.reader.load_documents(
636+
range.intersect(TimestampRange::snapshot(*self.upper_bound)),
623637
order,
624638
*DEFAULT_DOCUMENTS_PAGE_SIZE,
625639
retention_validator,
626-
);
627-
Box::pin(stream.try_filter(|entry| future::ready(entry.ts <= *self.upper_bound)))
640+
)
628641
}
629642

630643
pub async fn previous_revisions(

crates/common/src/testing/persistence_test_suite.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,7 @@ pub async fn write_and_load_from_table<P: Persistence>(p: Arc<P>) -> anyhow::Res
346346
test_load_documents_from_table(
347347
&p,
348348
doc1.id().tablet_id,
349-
TimestampRange::new(Timestamp::must(1)..)?,
349+
TimestampRange::new(Timestamp::must(1)..),
350350
Order::Asc,
351351
vec![DocumentLogEntry {
352352
ts: Timestamp::must(1),
@@ -360,7 +360,7 @@ pub async fn write_and_load_from_table<P: Persistence>(p: Arc<P>) -> anyhow::Res
360360
test_load_documents_from_table(
361361
&p,
362362
doc2.id().tablet_id,
363-
TimestampRange::new(Timestamp::must(1)..)?,
363+
TimestampRange::new(Timestamp::must(1)..),
364364
Order::Asc,
365365
vec![DocumentLogEntry {
366366
ts: Timestamp::must(1),
@@ -374,7 +374,7 @@ pub async fn write_and_load_from_table<P: Persistence>(p: Arc<P>) -> anyhow::Res
374374
test_load_documents_from_table(
375375
&p,
376376
doc1.id().tablet_id,
377-
TimestampRange::new(..Timestamp::must(1))?,
377+
TimestampRange::new(..Timestamp::must(1)),
378378
Order::Asc,
379379
vec![DocumentLogEntry {
380380
ts: Timestamp::must(0),
@@ -388,7 +388,7 @@ pub async fn write_and_load_from_table<P: Persistence>(p: Arc<P>) -> anyhow::Res
388388
test_load_documents_from_table(
389389
&p,
390390
doc2.id().tablet_id,
391-
TimestampRange::new(..Timestamp::must(1))?,
391+
TimestampRange::new(..Timestamp::must(1)),
392392
Order::Asc,
393393
vec![DocumentLogEntry {
394394
ts: Timestamp::must(0),
@@ -457,7 +457,7 @@ pub async fn write_and_load<P: Persistence>(p: Arc<P>) -> anyhow::Result<()> {
457457
test_load_documents(
458458
&p,
459459
&id_generator,
460-
TimestampRange::new(Timestamp::must(1)..)?,
460+
TimestampRange::new(Timestamp::must(1)..),
461461
Order::Asc,
462462
vec![DocumentLogEntry {
463463
ts: Timestamp::must(1),
@@ -485,7 +485,7 @@ pub async fn write_and_load<P: Persistence>(p: Arc<P>) -> anyhow::Result<()> {
485485
test_load_documents(
486486
&p,
487487
&id_generator,
488-
TimestampRange::new(..Timestamp::must(2))?,
488+
TimestampRange::new(..Timestamp::must(2)),
489489
Order::Desc,
490490
vec![
491491
DocumentLogEntry {

crates/convex/sync_types/src/timestamp.rs

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,10 @@ use std::{
77
},
88
};
99

10-
use anyhow::Context;
10+
use anyhow::{
11+
anyhow,
12+
Context,
13+
};
1114
use derive_more::FromStr;
1215
use serde::Serialize;
1316
use serde_json::json;
@@ -25,18 +28,32 @@ impl Timestamp {
2528
pub const MAX: Self = Self(i64::MAX as u64);
2629
pub const MIN: Self = Self(0);
2730

28-
pub fn succ(&self) -> anyhow::Result<Self> {
31+
#[inline]
32+
pub fn succ_opt(&self) -> Option<Self> {
2933
if *self >= Self::MAX {
30-
anyhow::bail!("timestamp {self} already at max");
34+
None
35+
} else {
36+
Some(Self(self.0 + 1))
3137
}
32-
Ok(Self(self.0 + 1))
3338
}
3439

35-
pub fn pred(&self) -> anyhow::Result<Self> {
40+
pub fn succ(&self) -> anyhow::Result<Self> {
41+
self.succ_opt()
42+
.ok_or_else(|| anyhow!("timestamp {self} already at max"))
43+
}
44+
45+
#[inline]
46+
pub fn pred_opt(&self) -> Option<Self> {
3647
if *self <= Self::MIN {
37-
anyhow::bail!("timestamp {self} already at min");
48+
None
49+
} else {
50+
Some(Self(self.0 - 1))
3851
}
39-
Ok(Self(self.0 - 1))
52+
}
53+
54+
pub fn pred(&self) -> anyhow::Result<Self> {
55+
self.pred_opt()
56+
.ok_or_else(|| anyhow!("timestamp {self} already at min"))
4057
}
4158

4259
pub fn add(&self, duration: Duration) -> anyhow::Result<Self> {

crates/database/src/committer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -421,7 +421,7 @@ impl<RT: Runtime> Committer<RT> {
421421
!vector_index_manager.is_bootstrapping(),
422422
"Trying to update vector index while it's still bootstrapping"
423423
);
424-
let range = TimestampRange::new((Bound::Excluded(bootstrap_ts), Bound::Unbounded))?;
424+
let range = TimestampRange::new((Bound::Excluded(bootstrap_ts), Bound::Unbounded));
425425

426426
let revision_stream =
427427
stream_revision_pairs_for_indexes(tables_with_indexes, &persistence, range);

crates/database/src/database.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1821,7 +1821,7 @@ impl<RT: Runtime> Database<RT> {
18211821
self.retention_validator(),
18221822
);
18231823
let range = match cursor {
1824-
Some(ts) => TimestampRange::new((Bound::Excluded(ts), Bound::Unbounded))?,
1824+
Some(ts) => TimestampRange::new((Bound::Excluded(ts), Bound::Unbounded)),
18251825
None => TimestampRange::all(),
18261826
};
18271827
let mut document_stream = repeatable_persistence.load_documents(range, Order::Asc);

crates/database/src/database_index_workers/index_writer.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -405,7 +405,7 @@ impl<RT: Runtime> IndexWriter<RT> {
405405
let producer = async {
406406
let revision_stream = self.stream_revision_pairs(
407407
&repeatable_persistence,
408-
TimestampRange::new(start_ts..=*end_ts)?,
408+
TimestampRange::new(start_ts..=*end_ts),
409409
Order::Asc,
410410
index_selector,
411411
);
@@ -466,7 +466,7 @@ impl<RT: Runtime> IndexWriter<RT> {
466466
let producer = async {
467467
let revision_stream = self.stream_revision_pairs(
468468
&repeatable_persistence,
469-
TimestampRange::new(end_ts..*start_ts)?,
469+
TimestampRange::new(end_ts..*start_ts),
470470
Order::Desc,
471471
index_selector,
472472
);

crates/database/src/retention.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -575,7 +575,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
575575
);
576576
let reader_ = &reader;
577577
let mut index_entry_chunks = reader
578-
.load_documents(TimestampRange::new(*cursor..*min_snapshot_ts)?, Order::Asc)
578+
.load_documents(TimestampRange::new(*cursor..*min_snapshot_ts), Order::Asc)
579579
.try_chunks2(*RETENTION_READ_CHUNK)
580580
.map(move |chunk| async move {
581581
let chunk = chunk?;
@@ -815,7 +815,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
815815
let reader_ = &reader;
816816
let mut document_chunks = reader
817817
.load_documents_with_retention_validator(
818-
TimestampRange::new(*cursor..*min_document_snapshot_ts)?,
818+
TimestampRange::new(*cursor..*min_document_snapshot_ts),
819819
Order::Asc,
820820
Arc::new(NoopRetentionValidator),
821821
)
@@ -1503,7 +1503,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
15031503
let reader = persistence.reader();
15041504
let mut document_stream = reader.load_documents_from_table(
15051505
index_table_id,
1506-
TimestampRange::new(**cursor..*latest_ts)?,
1506+
TimestampRange::new(**cursor..*latest_ts),
15071507
Order::Asc,
15081508
*DEFAULT_DOCUMENTS_PAGE_SIZE,
15091509
retention_validator,

crates/database/src/search_index_bootstrap.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ impl IndexesToBootstrap {
256256
let range = TimestampRange::new((
257257
Bound::Excluded(self.oldest_index_ts),
258258
Bound::Included(*upper_bound),
259-
))?;
259+
));
260260
let tables_with_indexes = self.tables_with_indexes();
261261
let revision_stream =
262262
stream_revision_pairs_for_indexes(&tables_with_indexes, persistence, range);

crates/database/src/search_index_workers/search_flusher.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -604,7 +604,7 @@ impl<RT: Runtime, T: SearchIndex + 'static> SearchFlusher<RT, T> {
604604
TimestampRange::new((
605605
Bound::Excluded(*last_ts),
606606
Bound::Included(*snapshot_ts),
607-
))?,
607+
)),
608608
T::partial_document_order(),
609609
&row_rate_limiter,
610610
),

crates/database/src/search_index_workers/writer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -635,7 +635,7 @@ impl<RT: Runtime, T: SearchIndex> Inner<RT, T> {
635635
T::download_previous_segments(storage.clone(), segments_to_update).await?;
636636
let documents = database.load_documents_in_table(
637637
*index_name.table(),
638-
TimestampRange::new((Bound::Excluded(start_ts), Bound::Included(*current_ts)))?,
638+
TimestampRange::new((Bound::Excluded(start_ts), Bound::Included(*current_ts))),
639639
Order::Asc,
640640
&row_rate_limiter,
641641
);

0 commit comments

Comments
 (0)