Skip to content

Commit 7c05401

Browse files
committed
Change list blobs rpc to have an explicit end case
1 parent babdc93 commit 7c05401

File tree

5 files changed

+53
-18
lines changed

5 files changed

+53
-18
lines changed

src/api/blobs.rs

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ use super::{
5656
ApiClient, RequestResult, Tags,
5757
};
5858
use crate::{
59-
api::proto::{BatchRequest, ImportByteStreamUpdate},
59+
api::proto::{BatchRequest, ImportByteStreamUpdate, ListBlobsItem},
6060
provider::StreamContext,
6161
store::IROH_BLOCK_SIZE,
6262
util::temp_tag::TempTag,
@@ -835,32 +835,53 @@ impl ImportBaoHandle {
835835

836836
/// A progress handle for a blobs list operation.
837837
pub struct BlobsListProgress {
838-
inner: future::Boxed<irpc::Result<mpsc::Receiver<super::Result<Hash>>>>,
838+
inner: future::Boxed<irpc::Result<mpsc::Receiver<ListBlobsItem>>>,
839839
}
840840

841841
impl BlobsListProgress {
842842
fn new(
843-
fut: impl Future<Output = irpc::Result<mpsc::Receiver<super::Result<Hash>>>> + Send + 'static,
843+
fut: impl Future<Output = irpc::Result<mpsc::Receiver<ListBlobsItem>>> + Send + 'static,
844844
) -> Self {
845845
Self {
846846
inner: Box::pin(fut),
847847
}
848848
}
849849

850850
pub async fn hashes(self) -> RequestResult<Vec<Hash>> {
851-
let mut rx: mpsc::Receiver<Result<Hash, super::Error>> = self.inner.await?;
851+
let mut rx = self.inner.await?;
852852
let mut hashes = Vec::new();
853-
while let Some(item) = rx.recv().await? {
854-
hashes.push(item?);
853+
loop {
854+
match rx.recv().await? {
855+
Some(ListBlobsItem::Item(hash)) => hashes.push(hash),
856+
Some(ListBlobsItem::Error(cause)) => return Err(cause.into()),
857+
Some(ListBlobsItem::Done) => break,
858+
None => return Err(super::Error::other("unexpected end of stream").into()),
859+
}
855860
}
856861
Ok(hashes)
857862
}
858863

859864
pub async fn stream(self) -> irpc::Result<impl Stream<Item = super::Result<Hash>>> {
860865
let mut rx = self.inner.await?;
861866
Ok(Gen::new(|co| async move {
862-
while let Ok(Some(item)) = rx.recv().await {
863-
co.yield_(item).await;
867+
loop {
868+
match rx.recv().await {
869+
Ok(Some(ListBlobsItem::Item(hash))) => co.yield_(Ok(hash)).await,
870+
Ok(Some(ListBlobsItem::Error(cause))) => {
871+
co.yield_(Err(cause)).await;
872+
break;
873+
}
874+
Ok(Some(ListBlobsItem::Done)) => break,
875+
Ok(None) => {
876+
co.yield_(Err(super::Error::other("unexpected end of stream").into()))
877+
.await;
878+
break;
879+
}
880+
Err(cause) => {
881+
co.yield_(Err(cause.into())).await;
882+
break;
883+
}
884+
}
864885
}
865886
}))
866887
}

src/api/proto.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ impl HashSpecific for CreateTagMsg {
8989
#[rpc_requests(message = Command, alias = "Msg")]
9090
#[derive(Debug, Serialize, Deserialize)]
9191
pub enum Request {
92-
#[rpc(tx = mpsc::Sender<super::Result<Hash>>)]
92+
#[rpc(tx = mpsc::Sender<ListBlobsItem>)]
9393
ListBlobs(ListRequest),
9494
#[rpc(tx = oneshot::Sender<Scope>, rx = mpsc::Receiver<BatchResponse>)]
9595
Batch(BatchRequest),
@@ -351,6 +351,13 @@ pub struct TagInfo {
351351
pub hash: Hash,
352352
}
353353

354+
#[derive(Debug, Serialize, Deserialize)]
355+
pub enum ListBlobsItem {
356+
Item(Hash),
357+
Done,
358+
Error(super::Error),
359+
}
360+
354361
impl From<TagInfo> for HashAndFormat {
355362
fn from(tag_info: TagInfo) -> Self {
356363
HashAndFormat {

src/store/fs/meta.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ use crate::{
2323
blobs::BlobStatus,
2424
proto::{
2525
BlobDeleteRequest, BlobStatusMsg, BlobStatusRequest, ClearProtectedMsg,
26-
CreateTagRequest, DeleteBlobsMsg, DeleteTagsRequest, ListBlobsMsg, ListRequest,
27-
ListTagsRequest, RenameTagRequest, SetTagRequest, ShutdownMsg, SyncDbMsg,
26+
CreateTagRequest, DeleteBlobsMsg, DeleteTagsRequest, ListBlobsItem, ListBlobsMsg,
27+
ListRequest, ListTagsRequest, RenameTagRequest, SetTagRequest, ShutdownMsg, SyncDbMsg,
2828
},
2929
tags::TagInfo,
3030
},
@@ -887,20 +887,21 @@ pub async fn list_blobs(snapshot: ReadOnlyTables, cmd: ListBlobsMsg) {
887887
Ok(()) => {}
888888
Err(e) => {
889889
error!("error listing blobs: {}", e);
890-
tx.send(Err(e)).await.ok();
890+
tx.send(ListBlobsItem::Error(e)).await.ok();
891891
}
892892
}
893893
}
894894

895895
async fn list_blobs_impl(
896896
snapshot: ReadOnlyTables,
897897
_cmd: ListRequest,
898-
tx: &mut mpsc::Sender<api::Result<Hash>>,
898+
tx: &mut mpsc::Sender<ListBlobsItem>,
899899
) -> api::Result<()> {
900900
for item in snapshot.blobs.iter().map_err(api::Error::other)? {
901901
let (k, _) = item.map_err(api::Error::other)?;
902902
let k = k.value();
903-
tx.send(Ok(k)).await.ok();
903+
tx.send(ListBlobsItem::Item(k)).await.ok();
904904
}
905+
tx.send(ListBlobsItem::Done).await.ok();
905906
Ok(())
906907
}

src/store/mem.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -330,10 +330,15 @@ impl Actor {
330330
let blobs = self.state.data.keys().cloned().collect::<Vec<Hash>>();
331331
self.spawn(async move {
332332
for blob in blobs {
333-
if tx.send(Ok(blob)).await.is_err() {
333+
if tx
334+
.send(api::proto::ListBlobsItem::Item(blob))
335+
.await
336+
.is_err()
337+
{
334338
break;
335339
}
336340
}
341+
tx.send(api::proto::ListBlobsItem::Done).await.ok();
337342
});
338343
}
339344
Command::BlobStatus(cmd) => {

src/store/readonly_mem.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ use crate::{
3636
proto::{
3737
self, BlobStatus, Command, ExportBaoMsg, ExportBaoRequest, ExportPathMsg,
3838
ExportPathRequest, ExportRangesItem, ExportRangesMsg, ExportRangesRequest,
39-
ImportBaoMsg, ImportByteStreamMsg, ImportBytesMsg, ImportPathMsg, ObserveMsg,
40-
ObserveRequest, WaitIdleMsg,
39+
ImportBaoMsg, ImportByteStreamMsg, ImportBytesMsg, ImportPathMsg, ListBlobsItem,
40+
ObserveMsg, ObserveRequest, WaitIdleMsg,
4141
},
4242
ApiClient, TempTag,
4343
},
@@ -178,8 +178,9 @@ impl Actor {
178178
let hashes: Vec<Hash> = self.data.keys().cloned().collect();
179179
self.tasks.spawn(async move {
180180
for hash in hashes {
181-
cmd.tx.send(Ok(hash)).await.ok();
181+
cmd.tx.send(ListBlobsItem::Item(hash)).await.ok();
182182
}
183+
cmd.tx.send(ListBlobsItem::Done).await.ok();
183184
});
184185
}
185186
Command::BlobStatus(cmd) => {

0 commit comments

Comments
 (0)