1515//! the much simpler memory store.
1616use std:: {
1717 fmt:: { self , Debug } ,
18+ future:: { Future , IntoFuture } ,
1819 io,
1920 num:: NonZeroU64 ,
2021 ops:: { Bound , RangeBounds } ,
@@ -28,17 +29,25 @@ use bao_tree::{
2829 ChunkRanges ,
2930} ;
3031use bytes:: Bytes ;
32+ use genawaiter:: sync:: Gen ;
3133use irpc:: {
3234 channel:: { mpsc, oneshot} ,
3335 rpc_requests,
3436} ;
35- use n0_future:: Stream ;
37+ use n0_future:: { future , Stream } ;
3638use range_collections:: RangeSet2 ;
3739use serde:: { Deserialize , Serialize } ;
3840pub ( crate ) mod bitfield;
3941pub use bitfield:: Bitfield ;
4042
41- use crate :: { store:: util:: Tag , util:: temp_tag:: TempTag , BlobFormat , Hash , HashAndFormat } ;
43+ use crate :: {
44+ store:: util:: Tag ,
45+ util:: {
46+ irpc:: { IrpcReceiverFutExt , IrpcStreamItem } ,
47+ temp_tag:: TempTag ,
48+ } ,
49+ BlobFormat , Hash , HashAndFormat ,
50+ } ;
4251
4352pub ( crate ) trait HashSpecific {
4453 fn hash ( & self ) -> Hash ;
@@ -113,7 +122,7 @@ pub enum Request {
113122 ImportPath ( ImportPathRequest ) ,
114123 #[ rpc( tx = mpsc:: Sender <ExportProgressItem >) ]
115124 ExportPath ( ExportPathRequest ) ,
116- #[ rpc( tx = oneshot :: Sender <Vec < super :: Result < TagInfo >> >) ]
125+ #[ rpc( tx = mpsc :: Sender <ListTagsItem >) ]
117126 ListTags ( ListTagsRequest ) ,
118127 #[ rpc( tx = oneshot:: Sender <super :: Result <( ) >>) ]
119128 SetTag ( SetTagRequest ) ,
@@ -354,8 +363,110 @@ pub struct TagInfo {
354363#[ derive( Debug , Serialize , Deserialize ) ]
355364pub enum ListBlobsItem {
356365 Item ( Hash ) ,
366+ Error ( super :: Error ) ,
357367 Done ,
368+ }
369+
370+ #[ derive( Debug , Serialize , Deserialize ) ]
371+ pub enum ListTagsItem {
372+ Item ( TagInfo ) ,
358373 Error ( super :: Error ) ,
374+ Done ,
375+ }
376+
377+ impl From < std:: result:: Result < TagInfo , super :: Error > > for ListTagsItem {
378+ fn from ( item : std:: result:: Result < TagInfo , super :: Error > ) -> Self {
379+ match item {
380+ Ok ( item) => ListTagsItem :: Item ( item) ,
381+ Err ( err) => ListTagsItem :: Error ( err) ,
382+ }
383+ }
384+ }
385+
386+ impl IrpcStreamItem for ListTagsItem {
387+ type Error = super :: Error ;
388+ type Item = TagInfo ;
389+
390+ fn into_result_opt ( self ) -> Option < Result < TagInfo , super :: Error > > {
391+ match self {
392+ ListTagsItem :: Item ( item) => Some ( Ok ( item) ) ,
393+ ListTagsItem :: Done => None ,
394+ ListTagsItem :: Error ( err) => Some ( Err ( err) ) ,
395+ }
396+ }
397+
398+ fn from_result ( item : std:: result:: Result < TagInfo , super :: Error > ) -> Self {
399+ match item {
400+ Ok ( i) => Self :: Item ( i) ,
401+ Err ( e) => Self :: Error ( e. into ( ) ) ,
402+ }
403+ }
404+
405+ fn done ( ) -> Self {
406+ Self :: Done
407+ }
408+ }
409+
410+ pub struct ListTagsProgress {
411+ inner : future:: Boxed < irpc:: Result < mpsc:: Receiver < ListTagsItem > > > ,
412+ }
413+
414+ impl IntoFuture for ListTagsProgress {
415+ fn into_future ( self ) -> Self :: IntoFuture {
416+ Box :: pin ( self . inner . try_collect ( ) )
417+ }
418+
419+ type IntoFuture = future:: Boxed < Self :: Output > ;
420+
421+ type Output = super :: Result < Vec < TagInfo > > ;
422+ }
423+
424+ impl ListTagsProgress {
425+ pub ( super ) fn new (
426+ fut : impl Future < Output = irpc:: Result < mpsc:: Receiver < ListTagsItem > > > + Send + ' static ,
427+ ) -> Self {
428+ Self {
429+ inner : Box :: pin ( fut) ,
430+ }
431+ }
432+
433+ pub fn stream ( self ) -> impl Stream < Item = super :: Result < TagInfo > > {
434+ Gen :: new ( |co| async move {
435+ let mut rx = match self . inner . await {
436+ Ok ( rx) => rx,
437+ Err ( err) => {
438+ co. yield_ ( Err ( super :: Error :: from ( err) ) ) . await ;
439+ return ;
440+ }
441+ } ;
442+ loop {
443+ match rx. recv ( ) . await {
444+ Ok ( Some ( ListTagsItem :: Item ( item) ) ) => {
445+ co. yield_ ( Ok ( item) ) . await ;
446+ }
447+ Ok ( Some ( ListTagsItem :: Done ) ) => {
448+ break ;
449+ }
450+ Ok ( Some ( ListTagsItem :: Error ( err) ) ) => {
451+ co. yield_ ( Err ( err. into ( ) ) ) . await ;
452+ break ;
453+ }
454+ Ok ( None ) => {
455+ co. yield_ ( Err ( super :: Error :: Io ( io:: Error :: new (
456+ io:: ErrorKind :: UnexpectedEof ,
457+ "stream ended" ,
458+ ) ) ) )
459+ . await ;
460+ break ;
461+ }
462+ Err ( cause) => {
463+ co. yield_ ( Err ( super :: Error :: from ( cause) ) ) . await ;
464+ break ;
465+ }
466+ }
467+ }
468+ } )
469+ }
359470}
360471
361472impl From < TagInfo > for HashAndFormat {
0 commit comments