@@ -7,8 +7,9 @@ use std::{
77
88use anyhow:: anyhow;
99use client:: {
10- blobs:: { BlobInfo , BlobStatus , IncompleteBlobInfo , WrapOption } ,
10+ blobs:: { self , BlobInfo , BlobStatus , IncompleteBlobInfo , WrapOption } ,
1111 tags:: TagInfo ,
12+ MemConnector ,
1213} ;
1314use futures_buffered:: BufferedStreamExt ;
1415use futures_lite:: StreamExt ;
@@ -32,7 +33,11 @@ use proto::{
3233 } ,
3334 Request , RpcError , RpcResult , RpcService ,
3435} ;
35- use quic_rpc:: server:: { ChannelTypes , RpcChannel , RpcServerError } ;
36+ use quic_rpc:: {
37+ server:: { ChannelTypes , RpcChannel , RpcServerError } ,
38+ RpcClient , RpcServer ,
39+ } ;
40+ use tokio_util:: task:: AbortOnDropHandle ;
3641
3742use crate :: {
3843 export:: ExportProgress ,
@@ -56,6 +61,16 @@ const RPC_BLOB_GET_CHUNK_SIZE: usize = 1024 * 64;
5661const RPC_BLOB_GET_CHANNEL_CAP : usize = 2 ;
5762
5863impl < D : crate :: store:: Store > Blobs < D > {
64+ /// Get a client for the blobs protocol
65+ pub fn client ( self : Arc < Self > ) -> blobs:: MemClient {
66+ let client = self
67+ . rpc_handler
68+ . get_or_init ( || RpcHandler :: new ( & self ) )
69+ . client
70+ . clone ( ) ;
71+ blobs:: Client :: new ( client)
72+ }
73+
5974 /// Handle an RPC request
6075 pub async fn handle_rpc_request < C > (
6176 self : Arc < Self > ,
@@ -871,3 +886,23 @@ impl<D: crate::store::Store> Blobs<D> {
871886 Ok ( CreateCollectionResponse { hash, tag } )
872887 }
873888}
889+
890+ #[ derive( Debug ) ]
891+ pub ( crate ) struct RpcHandler {
892+ /// Client to hand out
893+ client : RpcClient < RpcService , MemConnector > ,
894+ /// Handler task
895+ _handler : AbortOnDropHandle < ( ) > ,
896+ }
897+
898+ impl RpcHandler {
899+ fn new < D : crate :: store:: Store > ( blobs : & Arc < Blobs < D > > ) -> Self {
900+ let blobs = blobs. clone ( ) ;
901+ let ( listener, connector) = quic_rpc:: transport:: flume:: channel ( 1 ) ;
902+ let listener = RpcServer :: new ( listener) ;
903+ let client = RpcClient :: new ( connector) ;
904+ let _handler = listener
905+ . spawn_accept_loop ( move |req, chan| blobs. clone ( ) . handle_rpc_request ( req, chan) ) ;
906+ Self { client, _handler }
907+ }
908+ }
0 commit comments