Skip to content

Commit babdc93

Browse files
committed
Move the handling of the long running list blobs cmd into the metadata db
This makes the metadata db more self-contained - the level above does not need to know about snapshots.
1 parent ee3e710 commit babdc93

File tree

3 files changed

+20
-29
lines changed

3 files changed

+20
-29
lines changed

src/store/fs.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,6 @@ use entry_state::{DataLocation, OutboardLocation};
9494
use gc::run_gc;
9595
use import::{ImportEntry, ImportSource};
9696
use irpc::channel::mpsc;
97-
use meta::list_blobs;
9897
use n0_future::{future::yield_now, io};
9998
use nested_enum_utils::enum_conversions;
10099
use range_collections::range_set::RangeSetRange;
@@ -507,9 +506,7 @@ impl Actor {
507506
}
508507
Command::ListBlobs(cmd) => {
509508
trace!("{cmd:?}");
510-
if let Ok(snapshot) = self.db().snapshot(cmd.span.clone()).await {
511-
self.spawn(list_blobs(snapshot, cmd));
512-
}
509+
self.db().send(cmd.into()).await.ok();
513510
}
514511
Command::Batch(cmd) => {
515512
trace!("{cmd:?}");

src/store/fs/meta.rs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use n0_snafu::SpanTrace;
1515
use nested_enum_utils::common_fields;
1616
use redb::{Database, DatabaseError, ReadableTable};
1717
use snafu::{Backtrace, ResultExt, Snafu};
18-
use tokio::pin;
18+
use tokio::{pin, task::JoinSet};
1919

2020
use crate::{
2121
api::{
@@ -96,15 +96,6 @@ impl Db {
9696
Self { sender }
9797
}
9898

99-
pub async fn snapshot(&self, span: tracing::Span) -> io::Result<ReadOnlyTables> {
100-
let (tx, rx) = tokio::sync::oneshot::channel();
101-
self.sender
102-
.send(Snapshot { tx, span }.into())
103-
.await
104-
.map_err(|_| io::Error::other("send snapshot"))?;
105-
rx.await.map_err(|_| io::Error::other("receive snapshot"))
106-
}
107-
10899
pub async fn update_await(&self, hash: Hash, state: EntryState<Bytes>) -> io::Result<()> {
109100
let (tx, rx) = oneshot::channel();
110101
self.sender
@@ -463,6 +454,7 @@ pub struct Actor {
463454
ds: DeleteHandle,
464455
options: BatchOptions,
465456
protected: HashSet<Hash>,
457+
tasks: JoinSet<()>,
466458
}
467459

468460
impl Actor {
@@ -492,6 +484,7 @@ impl Actor {
492484
ds,
493485
options,
494486
protected: Default::default(),
487+
tasks: JoinSet::new(),
495488
})
496489
}
497490

@@ -707,6 +700,7 @@ impl Actor {
707700

708701
async fn handle_toplevel(
709702
db: &mut Database,
703+
tasks: &mut JoinSet<()>,
710704
cmd: TopLevelCommand,
711705
op: TxnNum,
712706
) -> ActorResult<Option<ShutdownMsg>> {
@@ -726,11 +720,11 @@ impl Actor {
726720
// nothing to do here, since the database will be dropped
727721
Some(cmd)
728722
}
729-
TopLevelCommand::Snapshot(cmd) => {
723+
TopLevelCommand::ListBlobs(cmd) => {
730724
trace!("{cmd:?}");
731725
let txn = db.begin_read().context(TransactionSnafu)?;
732726
let snapshot = ReadOnlyTables::new(&txn).context(TableSnafu)?;
733-
cmd.tx.send(snapshot).ok();
727+
tasks.spawn(list_blobs(snapshot, cmd));
734728
None
735729
}
736730
})
@@ -741,14 +735,20 @@ impl Actor {
741735
let options = &self.options;
742736
let mut op = 0u64;
743737
let shutdown = loop {
738+
let cmd = tokio::select! {
739+
cmd = self.cmds.recv() => cmd,
740+
_ = self.tasks.join_next(), if !self.tasks.is_empty() => continue,
741+
};
744742
op += 1;
745-
let Some(cmd) = self.cmds.recv().await else {
743+
let Some(cmd) = cmd else {
746744
break None;
747745
};
748746
match cmd {
749747
Command::TopLevel(cmd) => {
750748
let op = TxnNum::TopLevel(op);
751-
if let Some(shutdown) = Self::handle_toplevel(&mut db, cmd, op).await? {
749+
if let Some(shutdown) =
750+
Self::handle_toplevel(&mut db, &mut self.tasks, cmd, op).await?
751+
{
752752
break Some(shutdown);
753753
}
754754
}

src/store/fs/meta/proto.rs

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@ use bytes::Bytes;
55
use nested_enum_utils::enum_conversions;
66
use tracing::Span;
77

8-
use super::{ActorResult, ReadOnlyTables};
8+
use super::ActorResult;
99
use crate::{
1010
api::proto::{
11-
BlobStatusMsg, ClearProtectedMsg, DeleteBlobsMsg, ProcessExitRequest, ShutdownMsg,
12-
SyncDbMsg,
11+
BlobStatusMsg, ClearProtectedMsg, DeleteBlobsMsg, ListBlobsMsg, ProcessExitRequest,
12+
ShutdownMsg, SyncDbMsg,
1313
},
1414
store::{fs::entry_state::EntryState, util::DD},
1515
util::channel::oneshot,
@@ -49,12 +49,6 @@ pub struct Dump {
4949
pub span: Span,
5050
}
5151

52-
#[derive(Debug)]
53-
pub struct Snapshot {
54-
pub(crate) tx: tokio::sync::oneshot::Sender<ReadOnlyTables>,
55-
pub span: Span,
56-
}
57-
5852
pub struct Update {
5953
pub hash: Hash,
6054
pub state: EntryState<Bytes>,
@@ -167,7 +161,7 @@ impl ReadWriteCommand {
167161
pub enum TopLevelCommand {
168162
SyncDb(SyncDbMsg),
169163
Shutdown(ShutdownMsg),
170-
Snapshot(Snapshot),
164+
ListBlobs(ListBlobsMsg),
171165
}
172166

173167
impl TopLevelCommand {
@@ -181,7 +175,7 @@ impl TopLevelCommand {
181175
match self {
182176
Self::SyncDb(x) => x.parent_span_opt(),
183177
Self::Shutdown(x) => x.parent_span_opt(),
184-
Self::Snapshot(x) => Some(&x.span),
178+
Self::ListBlobs(x) => Some(&x.span),
185179
}
186180
}
187181
}

0 commit comments

Comments
 (0)