Skip to content

Commit 66f935d

Browse files
goffrieConvex, Inc.
authored andcommitted
Lift stream_revision_pairs into PersistenceReader (#40957)
GitOrigin-RevId: ea665cfe28ce651ca4d64a32a7d2945affc9f768
1 parent 64285f0 commit 66f935d

File tree

5 files changed

+121
-47
lines changed

5 files changed

+121
-47
lines changed

crates/common/src/persistence.rs

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -420,6 +420,36 @@ pub trait PersistenceReader: Send + Sync + 'static {
420420
.boxed()
421421
}
422422

423+
/// Loads revision pairs from the document log in the given timestamp range.
424+
///
425+
/// If a tablet id is provided, the results are filtered to a single table.
426+
fn load_revision_pairs(
427+
&self,
428+
tablet_id: Option<TabletId>,
429+
range: TimestampRange,
430+
order: Order,
431+
page_size: u32,
432+
retention_validator: Arc<dyn RetentionValidator>,
433+
) -> DocumentRevisionStream<'_> {
434+
let stream = if let Some(tablet_id) = tablet_id {
435+
self.load_documents_from_table(
436+
tablet_id,
437+
range,
438+
order,
439+
page_size,
440+
retention_validator.clone(),
441+
)
442+
} else {
443+
self.load_documents(range, order, page_size, retention_validator.clone())
444+
};
445+
crate::persistence_helpers::persistence_reader_stream_revision_pairs(
446+
stream,
447+
self,
448+
retention_validator,
449+
)
450+
.boxed()
451+
}
452+
423453
/// Look up the previous revision of `(id, ts)`, returning a map where for
424454
/// each `(id, ts)` we have...
425455
///
@@ -595,8 +625,8 @@ impl RepeatablePersistence {
595625
self.upper_bound
596626
}
597627

598-
/// Same as [`Persistence::load_documents`] but only including documents in
599-
/// the snapshot range.
628+
/// Same as [`PersistenceReader::load_documents`] but only including
629+
/// documents in the snapshot range.
600630
pub fn load_documents(&self, range: TimestampRange, order: Order) -> DocumentStream<'_> {
601631
self.reader.load_documents(
602632
range.intersect(TimestampRange::snapshot(*self.upper_bound)),
@@ -606,8 +636,8 @@ impl RepeatablePersistence {
606636
)
607637
}
608638

609-
/// Same as [`Persistence::load_documents_from_table`] but only including
610-
/// documents in the snapshot range.
639+
/// Same as [`PersistenceReader::load_documents_from_table`] but only
640+
/// including documents in the snapshot range.
611641
pub fn load_documents_from_table(
612642
&self,
613643
tablet_id: TabletId,
@@ -623,6 +653,23 @@ impl RepeatablePersistence {
623653
)
624654
}
625655

656+
/// Same as [`PersistenceReader::load_revision_pairs`] but only including
657+
/// revisions in the snapshot range.
658+
pub fn load_revision_pairs(
659+
&self,
660+
tablet_id: Option<TabletId>,
661+
range: TimestampRange,
662+
order: Order,
663+
) -> DocumentRevisionStream<'_> {
664+
self.reader.load_revision_pairs(
665+
tablet_id,
666+
range.intersect(TimestampRange::snapshot(*self.upper_bound)),
667+
order,
668+
*DEFAULT_DOCUMENTS_PAGE_SIZE,
669+
self.retention_validator.clone(),
670+
)
671+
}
672+
626673
pub async fn previous_revisions(
627674
&self,
628675
ids: BTreeSet<(InternalDocumentId, Timestamp)>,

crates/common/src/persistence_helpers.rs

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::sync::Arc;
2+
13
use anyhow::Context as _;
24
use futures::{
35
Stream,
@@ -12,19 +14,23 @@ use crate::{
1214
persistence::{
1315
DocumentLogEntry,
1416
DocumentPrevTsQuery,
17+
PersistenceReader,
1518
RepeatablePersistence,
19+
RetentionValidator,
1620
},
1721
try_chunks::TryChunksExt,
1822
types::Timestamp,
1923
};
2024

2125
#[derive(Debug)]
26+
#[cfg_attr(any(test, feature = "testing"), derive(PartialEq))]
2227
pub struct DocumentRevision {
2328
pub ts: Timestamp,
2429
pub document: Option<ResolvedDocument>,
2530
}
2631

2732
#[derive(Debug)]
33+
#[cfg_attr(any(test, feature = "testing"), derive(PartialEq))]
2834
pub struct RevisionPair {
2935
pub id: InternalDocumentId,
3036
pub rev: DocumentRevision,
@@ -47,6 +53,57 @@ impl RevisionPair {
4753

4854
type RevisionStreamEntry = anyhow::Result<DocumentLogEntry>;
4955

56+
/// Exposed as PersistenceReader::load_revision_pairs
57+
#[allow(clippy::needless_lifetimes)]
58+
#[try_stream(ok = RevisionPair, error = anyhow::Error)]
59+
pub(crate) async fn persistence_reader_stream_revision_pairs<'a, P: PersistenceReader + ?Sized>(
60+
documents: impl Stream<Item = RevisionStreamEntry> + 'a,
61+
reader: &'a P,
62+
retention_validator: Arc<dyn RetentionValidator>,
63+
) {
64+
let documents = documents.try_chunks2(*DOCUMENTS_IN_MEMORY);
65+
futures::pin_mut!(documents);
66+
67+
while let Some(read_chunk) = documents.try_next().await? {
68+
let queries = read_chunk
69+
.iter()
70+
.filter_map(|entry| {
71+
entry.prev_ts.map(|prev_ts| DocumentPrevTsQuery {
72+
id: entry.id,
73+
ts: entry.ts,
74+
prev_ts,
75+
})
76+
})
77+
.collect();
78+
let mut prev_revs = reader
79+
.previous_revisions_of_documents(queries, retention_validator.clone())
80+
.await?;
81+
for DocumentLogEntry {
82+
ts,
83+
prev_ts,
84+
id,
85+
value: document,
86+
..
87+
} in read_chunk
88+
{
89+
let rev = DocumentRevision { ts, document };
90+
let prev_rev = prev_ts
91+
.map(|prev_ts| {
92+
let entry = prev_revs
93+
.remove(&DocumentPrevTsQuery { id, ts, prev_ts })
94+
.with_context(|| format!("prev_ts is missing for {id}@{ts}: {prev_ts}"))?;
95+
anyhow::Ok(DocumentRevision {
96+
ts: entry.ts,
97+
document: entry.value,
98+
})
99+
})
100+
.transpose()?;
101+
yield RevisionPair { id, rev, prev_rev };
102+
}
103+
}
104+
}
105+
106+
// TODO: remove this and make users go through PersistenceReader
50107
#[allow(clippy::needless_lifetimes)]
51108
#[try_stream(ok = RevisionPair, error = anyhow::Error)]
52109
pub async fn stream_revision_pairs<'a>(

crates/database/src/database_index_workers/index_writer.rs

Lines changed: 4 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,6 @@ use common::{
3131
RetentionValidator,
3232
TimestampRange,
3333
},
34-
persistence_helpers::{
35-
stream_revision_pairs,
36-
RevisionPair,
37-
},
3834
query::Order,
3935
runtime::{
4036
new_rate_limiter,
@@ -57,7 +53,6 @@ use futures::{
5753
self,
5854
FusedStream,
5955
},
60-
Stream,
6156
StreamExt,
6257
TryStreamExt,
6358
};
@@ -403,11 +398,10 @@ impl<RT: Runtime> IndexWriter<RT> {
403398
);
404399
let (tx, rx) = mpsc::channel(32);
405400
let producer = async {
406-
let revision_stream = self.stream_revision_pairs(
407-
&repeatable_persistence,
401+
let revision_stream = repeatable_persistence.load_revision_pairs(
402+
index_selector.tablet_id(),
408403
TimestampRange::new(start_ts..=*end_ts),
409404
Order::Asc,
410-
index_selector,
411405
);
412406
futures::pin_mut!(revision_stream);
413407
while let Some(revision_pair) = revision_stream.try_next().await? {
@@ -464,11 +458,10 @@ impl<RT: Runtime> IndexWriter<RT> {
464458
self.retention_validator.clone(),
465459
);
466460
let producer = async {
467-
let revision_stream = self.stream_revision_pairs(
468-
&repeatable_persistence,
461+
let revision_stream = repeatable_persistence.load_revision_pairs(
462+
index_selector.tablet_id(),
469463
TimestampRange::new(end_ts..*start_ts),
470464
Order::Desc,
471-
index_selector,
472465
);
473466
futures::pin_mut!(revision_stream);
474467
while let Some(revision_pair) = revision_stream.try_next().await? {
@@ -528,21 +521,6 @@ impl<RT: Runtime> IndexWriter<RT> {
528521
start_ts.prior_ts(backfilled_ts)
529522
}
530523

531-
fn stream_revision_pairs<'a>(
532-
&'a self,
533-
reader: &'a RepeatablePersistence,
534-
range: TimestampRange,
535-
order: Order,
536-
index_selector: &'a IndexSelector,
537-
) -> impl Stream<Item = anyhow::Result<RevisionPair>> + 'a {
538-
let document_stream = if let Some(tablet_id) = index_selector.tablet_id() {
539-
reader.load_documents_from_table(tablet_id, range, order)
540-
} else {
541-
reader.load_documents(range, order)
542-
};
543-
stream_revision_pairs(document_stream, reader)
544-
}
545-
546524
async fn write_index_entries(
547525
&self,
548526
updates: impl FusedStream<Item = (Timestamp, DatabaseIndexUpdate)>,

crates/database/src/persistence_helpers.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,7 @@ use common::{
33
RepeatablePersistence,
44
TimestampRange,
55
},
6-
persistence_helpers::{
7-
stream_revision_pairs,
8-
RevisionPair,
9-
},
6+
persistence_helpers::RevisionPair,
107
query::Order,
118
types::Timestamp,
129
};
@@ -48,8 +45,7 @@ pub async fn stream_transactions<'a>(
4845
range: TimestampRange,
4946
order: Order,
5047
) {
51-
let document_stream = reader.load_documents(range, order);
52-
let revision_stream = stream_revision_pairs(document_stream, reader);
48+
let revision_stream = reader.load_revision_pairs(None /* tablet_id */, range, order);
5349
futures::pin_mut!(revision_stream);
5450

5551
if let Some(first_pair) = revision_stream.try_next().await? {

crates/database/src/search_index_bootstrap.rs

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,7 @@ use common::{
2828
RepeatablePersistence,
2929
TimestampRange,
3030
},
31-
persistence_helpers::{
32-
stream_revision_pairs,
33-
RevisionPair,
34-
},
31+
persistence_helpers::RevisionPair,
3532
query::Order,
3633
runtime::{
3734
try_join_buffer_unordered,
@@ -441,16 +438,15 @@ pub fn stream_revision_pairs_for_indexes<'a>(
441438
persistence: &'a RepeatablePersistence,
442439
range: TimestampRange,
443440
) -> impl Stream<Item = anyhow::Result<RevisionPair>> + 'a {
444-
let document_stream = persistence
445-
.load_documents(range, Order::Asc)
446-
.try_filter(|entry| {
447-
let is_in_indexed_table = tables_with_indexes.contains(&entry.id.table());
441+
persistence
442+
.load_revision_pairs(None /* tablet_id */, range, Order::Asc)
443+
.try_filter(|revision| {
444+
let is_in_indexed_table = tables_with_indexes.contains(&revision.id.table());
448445
if !is_in_indexed_table {
449446
log_document_skipped();
450447
}
451-
future::ready(tables_with_indexes.contains(&entry.id.table()))
452-
});
453-
stream_revision_pairs(document_stream, persistence)
448+
future::ready(is_in_indexed_table)
449+
})
454450
}
455451

456452
impl<RT: Runtime> SearchIndexBootstrapWorker<RT> {

0 commit comments

Comments
 (0)