Skip to content

Commit 66f8540

Browse files
committed
Move message sending into meta
1 parent baf4d54 commit 66f8540

File tree

2 files changed

+29
-48
lines changed

2 files changed

+29
-48
lines changed

src/store/fs.rs

Lines changed: 6 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -246,20 +246,8 @@ impl HashContext {
246246
}
247247

248248
/// Update the entry state in the database, and wait for completion.
249-
pub async fn update(&self, hash: Hash, state: EntryState<Bytes>) -> io::Result<()> {
250-
let (tx, rx) = oneshot::channel();
251-
self.db()
252-
.send(
253-
meta::Update {
254-
hash,
255-
state,
256-
tx: Some(tx),
257-
span: tracing::Span::current(),
258-
}
259-
.into(),
260-
)
261-
.await?;
262-
rx.await.map_err(|_e| io::Error::other(""))??;
249+
pub async fn update_await(&self, hash: Hash, state: EntryState<Bytes>) -> io::Result<()> {
250+
self.db().update_await(hash, state).await?;
263251
Ok(())
264252
}
265253

@@ -269,40 +257,13 @@ impl HashContext {
269257
data_location: DataLocation::Inline(Bytes::new()),
270258
outboard_location: OutboardLocation::NotNeeded,
271259
}));
272-
}
273-
let (tx, rx) = oneshot::channel();
274-
self.db()
275-
.send(
276-
meta::Get {
277-
hash,
278-
tx,
279-
span: tracing::Span::current(),
280-
}
281-
.into(),
282-
)
283-
.await
284-
.ok();
285-
let res = rx.await.map_err(io::Error::other)?;
286-
Ok(res.state?)
260+
};
261+
self.db().get(hash).await
287262
}
288263

289264
/// Update the entry state in the database, and wait for completion.
290265
pub async fn set(&self, hash: Hash, state: EntryState<Bytes>) -> io::Result<()> {
291-
let (tx, rx) = oneshot::channel();
292-
self.db()
293-
.send(
294-
meta::Set {
295-
hash,
296-
state,
297-
tx,
298-
span: tracing::Span::current(),
299-
}
300-
.into(),
301-
)
302-
.await
303-
.map_err(io::Error::other)?;
304-
rx.await.map_err(|_e| io::Error::other(""))??;
305-
Ok(())
266+
self.db().set(hash, state).await
306267
}
307268

308269
pub async fn get_maybe_create(&self, hash: Hash, create: bool) -> api::Result<BaoFileHandle> {
@@ -892,7 +853,7 @@ async fn finish_import_impl(import_data: ImportEntry, ctx: HashContext) -> io::R
892853
data_location,
893854
outboard_location,
894855
};
895-
ctx.update(hash, state).await?;
856+
ctx.update_await(hash, state).await?;
896857
Ok(())
897858
}
898859

src/store/fs/meta.rs

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,25 @@ impl Db {
9696
Self { sender }
9797
}
9898

99+
pub async fn update_await(&self, hash: Hash, state: EntryState<Bytes>) -> io::Result<()> {
100+
let (tx, rx) = oneshot::channel();
101+
self.sender
102+
.send(
103+
Update {
104+
hash,
105+
state,
106+
tx: Some(tx),
107+
span: tracing::Span::current(),
108+
}
109+
.into(),
110+
)
111+
.await
112+
.map_err(|_| io::Error::other("send update"))?;
113+
rx.await
114+
.map_err(|_e| io::Error::other("receive update"))??;
115+
Ok(())
116+
}
117+
99118
/// Update the entry state for a hash, without awaiting completion.
100119
pub async fn update(&self, hash: Hash, state: EntryState<Bytes>) -> io::Result<()> {
101120
self.sender
@@ -132,7 +151,7 @@ impl Db {
132151
}
133152

134153
/// Get the entry state for a hash, if any.
135-
pub async fn get(&self, hash: Hash) -> anyhow::Result<Option<EntryState<Bytes>>> {
154+
pub async fn get(&self, hash: Hash) -> io::Result<Option<EntryState<Bytes>>> {
136155
let (tx, rx) = oneshot::channel();
137156
self.sender
138157
.send(
@@ -143,8 +162,9 @@ impl Db {
143162
}
144163
.into(),
145164
)
146-
.await?;
147-
let res = rx.await?;
165+
.await
166+
.map_err(|_| io::Error::other("send get"))?;
167+
let res = rx.await.map_err(|_| io::Error::other("receive get"))?;
148168
Ok(res.state?)
149169
}
150170

0 commit comments

Comments
 (0)