Skip to content

Commit 5439acf

Browse files
authored
Merge pull request #194 from fjall-rs/revamp-flush-api
Revamped flush API
2 parents faaef8e + c0b9060 commit 5439acf

File tree

15 files changed

+345
-297
lines changed

15 files changed

+345
-297
lines changed

src/abstract.rs

Lines changed: 88 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,22 @@
33
// (found in the LICENSE-* files in the repository)
44

55
use crate::{
6-
blob_tree::FragmentationMap, compaction::CompactionStrategy, config::TreeType,
7-
iter_guard::IterGuardImpl, table::Table, tree::inner::MemtableId, version::Version,
8-
vlog::BlobFile, AnyTree, BlobTree, Config, Guard, InternalValue, KvPair, Memtable, SeqNo,
6+
blob_tree::FragmentationMap,
7+
compaction::CompactionStrategy,
8+
config::TreeType,
9+
iter_guard::IterGuardImpl,
10+
table::Table,
11+
tree::inner::MemtableId,
12+
version::{SuperVersions, Version},
13+
vlog::BlobFile,
14+
AnyTree, BlobTree, Config, Guard, InternalValue, KvPair, Memtable, SeqNo,
915
SequenceNumberCounter, TableId, Tree, TreeId, UserKey, UserValue,
1016
};
1117
use enum_dispatch::enum_dispatch;
12-
use std::{ops::RangeBounds, sync::Arc};
18+
use std::{
19+
ops::RangeBounds,
20+
sync::{Arc, MutexGuard, RwLockWriteGuard},
21+
};
1322

1423
pub type RangeItem = crate::Result<KvPair>;
1524

@@ -28,18 +37,76 @@ pub trait AbstractTree {
2837
#[doc(hidden)]
2938
fn current_version(&self) -> Version;
3039

31-
/// Synchronously flushes the active memtable to a table.
40+
#[doc(hidden)]
41+
fn get_version_history_lock(&self) -> RwLockWriteGuard<'_, SuperVersions>;
42+
43+
/// Seals the active memtable and flushes to table(s).
3244
///
33-
/// The function may not return a result, if, during concurrent workloads, the memtable
34-
/// ends up being empty before the flush is set up.
45+
/// If there are already other sealed memtables lined up, those will be flushed as well.
3546
///
36-
/// The result will contain the [`Table`].
47+
/// Only used in tests.
48+
#[doc(hidden)]
49+
fn flush_active_memtable(&self, eviction_seqno: SeqNo) -> crate::Result<()> {
50+
let lock = self.get_flush_lock();
51+
self.rotate_memtable();
52+
self.flush(&lock, eviction_seqno)?;
53+
Ok(())
54+
}
55+
56+
/// Synchronously flushes pending sealed memtables to tables.
57+
///
58+
/// Returns the combined size of all sealed memtables that were flushed.
59+
///
60+
/// Returns `Ok(None)` if there were no sealed memtables to flush.
3761
///
3862
/// # Errors
3963
///
4064
/// Will return `Err` if an IO error occurs.
41-
#[doc(hidden)]
42-
fn flush_active_memtable(&self, seqno_threshold: SeqNo) -> crate::Result<Option<Table>>;
65+
fn flush(
66+
&self,
67+
_lock: &MutexGuard<'_, ()>,
68+
seqno_threshold: SeqNo,
69+
) -> crate::Result<Option<u64>> {
70+
use crate::{compaction::stream::CompactionStream, merge::Merger};
71+
72+
let version_history = self.get_version_history_lock();
73+
let latest = version_history.latest_version();
74+
75+
if latest.sealed_memtables.len() == 0 {
76+
return Ok(None);
77+
}
78+
79+
let sealed_ids = latest
80+
.sealed_memtables
81+
.iter()
82+
.map(|mt| mt.0)
83+
.collect::<Vec<_>>();
84+
85+
let flushed_size = latest.sealed_memtables.iter().map(|(_, x)| x.size()).sum();
86+
87+
let merger = Merger::new(
88+
latest
89+
.sealed_memtables
90+
.iter()
91+
.map(|(_, mt)| mt.iter().map(Ok))
92+
.collect::<Vec<_>>(),
93+
);
94+
let stream = CompactionStream::new(merger, seqno_threshold);
95+
96+
drop(version_history);
97+
98+
if let Some((tables, blob_files)) = self.flush_to_tables(stream)? {
99+
self.register_tables(
100+
&tables,
101+
blob_files.as_deref(),
102+
None,
103+
&sealed_ids,
104+
seqno_threshold,
105+
)?;
106+
}
107+
108+
Ok(Some(flushed_size))
109+
}
43110

44111
/// Returns an iterator that scans through the entire tree.
45112
///
@@ -145,6 +212,9 @@ pub trait AbstractTree {
145212
#[cfg(feature = "metrics")]
146213
fn metrics(&self) -> &Arc<crate::Metrics>;
147214

215+
/// Acquires the flush lock, which is required to call [`AbstractTree::flush`].
216+
fn get_flush_lock(&self) -> MutexGuard<'_, ()>;
217+
148218
/// Synchronously flushes a stream of items to one or more tables.
149219
///
150220
/// This method will not make the table immediately available,
@@ -154,12 +224,10 @@ pub trait AbstractTree {
154224
///
155225
/// Will return `Err` if an IO error occurs.
156226
#[warn(clippy::type_complexity)]
157-
fn flush_memtable(
227+
fn flush_to_tables(
158228
&self,
159-
table_id: TableId, // TODO: remove?
160-
memtable: &Arc<Memtable>,
161-
seqno_threshold: SeqNo,
162-
) -> crate::Result<Option<(Table, Option<BlobFile>)>>;
229+
stream: impl Iterator<Item = crate::Result<InternalValue>>,
230+
) -> crate::Result<Option<(Vec<Table>, Option<Vec<BlobFile>>)>>;
163231

164232
/// Atomically registers flushed tables into the tree, removing their associated sealed memtables.
165233
///
@@ -171,6 +239,8 @@ pub trait AbstractTree {
171239
tables: &[Table],
172240
blob_files: Option<&[BlobFile]>,
173241
frag_map: Option<FragmentationMap>,
242+
sealed_memtables_to_delete: &[MemtableId],
243+
gc_watermark: SeqNo,
174244
) -> crate::Result<()>;
175245

176246
/// Clears the active memtable atomically.
@@ -222,8 +292,8 @@ pub trait AbstractTree {
222292
/// Returns the tree type.
223293
fn tree_type(&self) -> TreeType;
224294

225-
/// Seals the active memtable, and returns a reference to it.
226-
fn rotate_memtable(&self) -> Option<(MemtableId, Arc<Memtable>)>;
295+
/// Seals the active memtable.
296+
fn rotate_memtable(&self) -> Option<Arc<Memtable>>;
227297

228298
/// Returns the number of tables currently in the tree.
229299
fn table_count(&self) -> usize;
@@ -239,9 +309,7 @@ pub trait AbstractTree {
239309
fn l0_run_count(&self) -> usize;
240310

241311
/// Returns the number of blob files currently in the tree.
242-
fn blob_file_count(&self) -> usize {
243-
0
244-
}
312+
fn blob_file_count(&self) -> usize;
245313

246314
/// Approximates the number of items in the tree.
247315
fn approximate_len(&self) -> usize;

0 commit comments

Comments
 (0)