@@ -12,20 +12,24 @@ use anyhow::{anyhow, Context, Result};
1212use bytes:: Bytes ;
1313use futures_util:: FutureExt ;
1414use iroh_blobs:: Hash ;
15+ use irpc:: channel:: mpsc;
1516use serde:: { Deserialize , Serialize } ;
1617use tokio:: { sync:: oneshot, task:: JoinSet } ;
1718use tracing:: { debug, error, error_span, trace, warn} ;
1819
1920use crate :: {
2021 metrics:: Metrics ,
2122 ranger:: Message ,
23+ rpc2:: {
24+ protocol:: { AuthorListResponse , ListResponse } ,
25+ RpcError , RpcResult ,
26+ } ,
2227 store:: {
2328 fs:: { ContentHashesIterator , StoreInstance } ,
2429 DownloadPolicy , ImportNamespaceOutcome , Query , Store ,
2530 } ,
26- Author , AuthorHeads , AuthorId , Capability , CapabilityKind , ContentStatus ,
27- ContentStatusCallback , Event , NamespaceId , NamespaceSecret , PeerIdBytes , Replica , ReplicaInfo ,
28- SignedEntry , SyncOutcome ,
31+ Author , AuthorHeads , AuthorId , Capability , ContentStatus , ContentStatusCallback , Event ,
32+ NamespaceId , NamespaceSecret , PeerIdBytes , Replica , ReplicaInfo , SignedEntry , SyncOutcome ,
2933} ;
3034
3135const ACTION_CAP : usize = 1024 ;
@@ -60,12 +64,12 @@ enum Action {
6064 #[ display( "ListAuthors" ) ]
6165 ListAuthors {
6266 #[ debug( "reply" ) ]
63- reply : async_channel :: Sender < Result < AuthorId > > ,
67+ reply : mpsc :: Sender < RpcResult < AuthorListResponse > > ,
6468 } ,
6569 #[ display( "ListReplicas" ) ]
6670 ListReplicas {
6771 #[ debug( "reply" ) ]
68- reply : async_channel :: Sender < Result < ( NamespaceId , CapabilityKind ) > > ,
72+ reply : mpsc :: Sender < RpcResult < ListResponse > > ,
6973 } ,
7074 #[ display( "ContentHashes" ) ]
7175 ContentHashes {
@@ -165,7 +169,7 @@ enum ReplicaAction {
165169 } ,
166170 GetMany {
167171 query : Query ,
168- reply : async_channel :: Sender < Result < SignedEntry > > ,
172+ reply : mpsc :: Sender < RpcResult < SignedEntry > > ,
169173 } ,
170174 DropReplica {
171175 reply : oneshot:: Sender < Result < ( ) > > ,
@@ -290,6 +294,7 @@ impl SyncHandle {
290294 }
291295
292296 pub async fn open ( & self , namespace : NamespaceId , opts : OpenOpts ) -> Result < ( ) > {
297+ tracing:: debug!( "SyncHandle::open called" ) ;
293298 let ( reply, rx) = oneshot:: channel ( ) ;
294299 let action = ReplicaAction :: Open { reply, opts } ;
295300 self . send_replica ( namespace, action) . await ?;
@@ -443,7 +448,7 @@ impl SyncHandle {
443448 & self ,
444449 namespace : NamespaceId ,
445450 query : Query ,
446- reply : async_channel :: Sender < Result < SignedEntry > > ,
451+ reply : mpsc :: Sender < RpcResult < SignedEntry > > ,
447452 ) -> Result < ( ) > {
448453 let action = ReplicaAction :: GetMany { query, reply } ;
449454 self . send_replica ( namespace, action) . await ?;
@@ -497,14 +502,14 @@ impl SyncHandle {
497502 Ok ( store)
498503 }
499504
500- pub async fn list_authors ( & self , reply : async_channel:: Sender < Result < AuthorId > > ) -> Result < ( ) > {
505+ pub async fn list_authors (
506+ & self ,
507+ reply : mpsc:: Sender < RpcResult < AuthorListResponse > > ,
508+ ) -> Result < ( ) > {
501509 self . send ( Action :: ListAuthors { reply } ) . await
502510 }
503511
504- pub async fn list_replicas (
505- & self ,
506- reply : async_channel:: Sender < Result < ( NamespaceId , CapabilityKind ) > > ,
507- ) -> Result < ( ) > {
512+ pub async fn list_replicas ( & self , reply : mpsc:: Sender < RpcResult < ListResponse > > ) -> Result < ( ) > {
508513 self . send ( Action :: ListReplicas { reply } ) . await
509514 }
510515
@@ -696,15 +701,18 @@ impl Actor {
696701 let iter = self
697702 . store
698703 . list_authors ( )
699- . map ( |a| a. map ( |a| a. map ( |a| a. id ( ) ) ) ) ;
704+ . map ( |a| a. map ( |a| a. map ( |a| AuthorListResponse { author_id : a. id ( ) } ) ) ) ;
700705 self . tasks
701- . spawn_local ( iter_to_channel_async ( reply, iter) . map ( |_| ( ) ) ) ;
706+ . spawn_local ( iter_to_irpc ( reply, iter) . map ( |_| ( ) ) ) ;
702707 Ok ( ( ) )
703708 }
704709 Action :: ListReplicas { reply } => {
705710 let iter = self . store . list_namespaces ( ) ;
711+ let iter = iter. map ( |inner| {
712+ inner. map ( |res| res. map ( |( id, capability) | ListResponse { id, capability } ) )
713+ } ) ;
706714 self . tasks
707- . spawn_local ( iter_to_channel_async ( reply, iter) . map ( |_| ( ) ) ) ;
715+ . spawn_local ( iter_to_irpc ( reply, iter) . map ( |_| ( ) ) ) ;
708716 Ok ( ( ) )
709717 }
710718 Action :: ContentHashes { reply } => {
@@ -838,7 +846,7 @@ impl Actor {
838846 . ensure_open ( & namespace)
839847 . and_then ( |_| self . store . get_many ( namespace, query) ) ;
840848 self . tasks
841- . spawn_local ( iter_to_channel_async ( reply, iter) . map ( |_| ( ) ) ) ;
849+ . spawn_local ( iter_to_irpc ( reply, iter) . map ( |_| ( ) ) ) ;
842850 Ok ( ( ) )
843851 }
844852 ReplicaAction :: DropReplica { reply } => send_reply_with ( reply, self , |this| {
@@ -984,6 +992,7 @@ impl OpenReplicas {
984992 }
985993 hash_map:: Entry :: Occupied ( mut e) => {
986994 let state = e. get_mut ( ) ;
995+ tracing:: debug!( "STATE {state:?}" ) ;
987996 state. handles = state. handles . wrapping_sub ( 1 ) ;
988997 if state. handles == 0 {
989998 let _ = e. remove_entry ( ) ;
@@ -1001,14 +1010,18 @@ impl OpenReplicas {
10011010 }
10021011}
10031012
1004- async fn iter_to_channel_async < T : Send + ' static > (
1005- channel : async_channel :: Sender < Result < T > > ,
1013+ async fn iter_to_irpc < T : irpc :: RpcMessage > (
1014+ channel : mpsc :: Sender < RpcResult < T > > ,
10061015 iter : Result < impl Iterator < Item = Result < T > > > ,
10071016) -> Result < ( ) , SendReplyError > {
10081017 match iter {
1009- Err ( err) => channel. send ( Err ( err) ) . await . map_err ( send_reply_error) ?,
1018+ Err ( err) => channel
1019+ . send ( Err ( RpcError :: new ( & * err) ) )
1020+ . await
1021+ . map_err ( send_reply_error) ?,
10101022 Ok ( iter) => {
10111023 for item in iter {
1024+ let item = item. map_err ( |err| RpcError :: new ( & * err) ) ;
10121025 channel. send ( item) . await . map_err ( send_reply_error) ?;
10131026 }
10141027 }
0 commit comments