Skip to content

Commit a859674

Browse files
committed
Add helpers for set and update
1 parent 6fc7630 commit a859674

File tree

2 files changed

+41
-36
lines changed

2 files changed

+41
-36
lines changed

src/store/fs/bao_file.rs

Lines changed: 4 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,17 @@ use bytes::{Bytes, BytesMut};
2121
use derive_more::Debug;
2222
use irpc::channel::mpsc;
2323
use tokio::sync::watch;
24-
use tracing::{debug, error, info, trace, Span};
24+
use tracing::{debug, error, info, trace};
2525

2626
use super::{
2727
entry_state::{DataLocation, EntryState, OutboardLocation},
28-
meta::Update,
2928
options::{Options, PathOptions},
3029
BaoFilePart,
3130
};
3231
use crate::{
3332
api::blobs::Bitfield,
3433
store::{
35-
fs::{
36-
meta::{raw_outboard_size, Set},
37-
TaskContext,
38-
},
34+
fs::{meta::raw_outboard_size, TaskContext},
3935
util::{
4036
read_checksummed_and_truncate, write_checksummed, FixedSize, MemOrFile,
4137
PartialMemStorage, SizeInfo, SparseMemFile, DD,
@@ -644,21 +640,7 @@ impl BaoFileHandle {
644640
let size = storage.bitfield.size;
645641
let (storage, entry_state) = storage.into_complete(size, &options)?;
646642
debug!("File was reconstructed as complete");
647-
let (tx, rx) = crate::util::channel::oneshot::channel();
648-
ctx.db
649-
.sender
650-
.send(
651-
Set {
652-
hash,
653-
state: entry_state,
654-
tx,
655-
span: Span::current(),
656-
}
657-
.into(),
658-
)
659-
.await
660-
.map_err(|_| io::Error::other("send update"))?;
661-
rx.await.map_err(|_| io::Error::other("receive update"))??;
643+
ctx.db.set(hash, entry_state).await?;
662644
storage.into()
663645
} else {
664646
storage.into()
@@ -796,19 +778,7 @@ impl BaoFileHandle {
796778
true
797779
});
798780
if let Some(update) = res? {
799-
ctx.db
800-
.sender
801-
.send(
802-
Update {
803-
hash: self.hash,
804-
state: update,
805-
tx: None,
806-
span: Span::current(),
807-
}
808-
.into(),
809-
)
810-
.await
811-
.map_err(|_| io::Error::other("send update"))?;
781+
ctx.db.update(self.hash, update).await?;
812782
}
813783
Ok(())
814784
}

src/store/fs/meta.rs

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ mod proto;
3434
pub use proto::*;
3535
pub(crate) mod tables;
3636
use tables::{ReadOnlyTables, ReadableTables, Tables};
37-
use tracing::{debug, error, info_span, trace};
37+
use tracing::{debug, error, info_span, trace, Span};
3838

3939
use super::{
4040
delete_set::DeleteHandle,
@@ -88,14 +88,49 @@ pub type ActorResult<T> = Result<T, ActorError>;
8888

8989
#[derive(Debug, Clone)]
9090
pub struct Db {
91-
pub sender: tokio::sync::mpsc::Sender<Command>,
91+
sender: tokio::sync::mpsc::Sender<Command>,
9292
}
9393

9494
impl Db {
9595
pub fn new(sender: tokio::sync::mpsc::Sender<Command>) -> Self {
9696
Self { sender }
9797
}
9898

99+
/// Update the entry state for a hash, without awaiting completion.
100+
pub async fn update(&self, hash: Hash, state: EntryState<Bytes>) -> io::Result<()> {
101+
self.sender
102+
.send(
103+
Update {
104+
hash,
105+
state,
106+
tx: None,
107+
span: Span::current(),
108+
}
109+
.into(),
110+
)
111+
.await
112+
.map_err(|_| io::Error::other("send update"))
113+
}
114+
115+
/// Set the entry state and await completion.
116+
pub async fn set(&self, hash: Hash, entry_state: EntryState<Bytes>) -> io::Result<()> {
117+
let (tx, rx) = oneshot::channel();
118+
self.sender
119+
.send(
120+
Set {
121+
hash,
122+
state: entry_state,
123+
tx,
124+
span: Span::current(),
125+
}
126+
.into(),
127+
)
128+
.await
129+
.map_err(|_| io::Error::other("send update"))?;
130+
rx.await.map_err(|_| io::Error::other("receive update"))??;
131+
Ok(())
132+
}
133+
99134
/// Get the entry state for a hash, if any.
100135
pub async fn get(&self, hash: Hash) -> anyhow::Result<Option<EntryState<Bytes>>> {
101136
let (tx, rx) = oneshot::channel();

0 commit comments

Comments
 (0)