|
2 | 2 |
|
3 | 3 | use std::{ |
4 | 4 | io, |
| 5 | + ops::Deref, |
5 | 6 | sync::{Arc, Mutex}, |
6 | 7 | }; |
7 | 8 |
|
8 | 9 | use anyhow::anyhow; |
9 | 10 | use client::{ |
10 | | - blobs::{self, BlobInfo, BlobStatus, IncompleteBlobInfo, WrapOption}, |
| 11 | + blobs::{BlobInfo, BlobStatus, IncompleteBlobInfo, MemClient, WrapOption}, |
11 | 12 | tags::TagInfo, |
12 | 13 | MemConnector, |
13 | 14 | }; |
@@ -62,13 +63,8 @@ const RPC_BLOB_GET_CHANNEL_CAP: usize = 2; |
62 | 63 |
|
63 | 64 | impl<D: crate::store::Store> Blobs<D> { |
64 | 65 | /// Get a client for the blobs protocol |
65 | | - pub fn client(&self) -> blobs::MemClient { |
66 | | - let client = self |
67 | | - .rpc_handler |
68 | | - .get_or_init(|| RpcHandler::new(self)) |
69 | | - .client |
70 | | - .clone(); |
71 | | - blobs::Client::new(client) |
| 66 | + pub fn client(&self) -> RpcHandler { |
| 67 | + RpcHandler::new(self) |
72 | 68 | } |
73 | 69 |
|
74 | 70 | /// Handle an RPC request |
@@ -874,20 +870,34 @@ impl<D: crate::store::Store> Blobs<D> { |
874 | 870 | } |
875 | 871 | } |
876 | 872 |
|
| 873 | +/// A rpc handler for the blobs rpc protocol |
| 874 | +/// |
| 875 | +/// This struct contains both a task that handles rpc requests and a client |
| 876 | +/// that can be used to send rpc requests. Dropping it will stop the handler task, |
| 877 | +/// so you need to put it somewhere where it will be kept alive. |
877 | 878 | #[derive(Debug)] |
878 | | -pub(crate) struct RpcHandler { |
| 879 | +pub struct RpcHandler { |
879 | 880 | /// Client to hand out |
880 | | - client: RpcClient<RpcService, MemConnector>, |
| 881 | + client: MemClient, |
881 | 882 | /// Handler task |
882 | 883 | _handler: AbortOnDropHandle<()>, |
883 | 884 | } |
884 | 885 |
|
| 886 | +impl Deref for RpcHandler { |
| 887 | + type Target = MemClient; |
| 888 | + |
| 889 | + fn deref(&self) -> &Self::Target { |
| 890 | + &self.client |
| 891 | + } |
| 892 | +} |
| 893 | + |
885 | 894 | impl RpcHandler { |
886 | 895 | fn new<D: crate::store::Store>(blobs: &Blobs<D>) -> Self { |
887 | 896 | let blobs = blobs.clone(); |
888 | 897 | let (listener, connector) = quic_rpc::transport::flume::channel(1); |
889 | 898 | let listener = RpcServer::new(listener); |
890 | 899 | let client = RpcClient::new(connector); |
| 900 | + let client = MemClient::new(client); |
891 | 901 | let _handler = listener |
892 | 902 | .spawn_accept_loop(move |req, chan| blobs.clone().handle_rpc_request(req, chan)); |
893 | 903 | Self { client, _handler } |
|
0 commit comments