11use super :: misc:: { self , marshal, unmarshal} ;
22use bytes:: { Bytes , BytesMut } ;
33use rsocket_rust:: error:: RSocketError ;
4- use rsocket_rust:: extension:: {
5- CompositeMetadata , CompositeMetadataEntry , MimeType , RoutingMetadata ,
6- } ;
4+ use rsocket_rust:: extension:: { CompositeMetadata , MimeType , RoutingMetadata } ;
75use rsocket_rust:: prelude:: * ;
86use rsocket_rust:: utils:: Writeable ;
97use rsocket_rust_transport_tcp:: TcpClientTransport ;
@@ -16,11 +14,13 @@ use std::result::Result;
1614use std:: sync:: Arc ;
1715use url:: Url ;
1816
19- type FnMetadata = Box < dyn FnMut ( ) -> Result < ( MimeType , Vec < u8 > ) , Box < dyn Error > > > ;
20- type FnData = Box < dyn FnMut ( & MimeType ) -> Result < Vec < u8 > , Box < dyn Error > > > ;
21- type PreflightResult = Result < ( Payload , MimeType , Arc < Box < dyn RSocket > > ) , Box < dyn Error > > ;
22- type UnpackerResult = Result < ( MimeType , Payload ) , RSocketError > ;
23- type UnpackersResult = Result < ( MimeType , Flux < Result < Payload , RSocketError > > ) , Box < dyn Error > > ;
17+ type FnMetadata = Box < dyn FnMut ( ) -> Result < ( MimeType , Vec < u8 > ) , Box < dyn Error + Sync + Send > > > ;
18+ type FnData = Box < dyn FnMut ( & MimeType ) -> Result < Vec < u8 > , Box < dyn Error + Sync + Send > > > ;
19+ type PreflightResult =
20+ Result < ( Payload , MimeType , Arc < Box < dyn RSocket > > ) , Box < dyn Error + Sync + Send > > ;
21+ type UnpackerResult = Result < ( MimeType , Payload ) , Box < dyn Error + Sync + Send > > ;
22+ type UnpackersResult =
23+ Result < ( MimeType , Flux < Result < Payload , RSocketError > > ) , Box < dyn Error + Sync + Send > > ;
2424
2525enum TransportKind {
2626 TCP ( String , u16 ) ,
@@ -41,8 +41,8 @@ pub struct RequestSpec {
4141pub struct RequesterBuilder {
4242 data_mime_type : Option < MimeType > ,
4343 route : Option < String > ,
44- metadata : Vec < CompositeMetadataEntry > ,
45- data : Option < Vec < u8 > > ,
44+ metadata : LinkedList < FnMetadata > ,
45+ data : Option < FnData > ,
4646 tp : Option < TransportKind > ,
4747}
4848
@@ -83,40 +83,26 @@ impl RequesterBuilder {
8383 self
8484 }
8585
86- pub fn setup_data < D > ( mut self , data : & D ) -> Self
86+ pub fn setup_data < D > ( mut self , data : D ) -> Self
8787 where
88- D : Sized + Serialize ,
88+ D : Sized + Serialize + ' static ,
8989 {
90- // TODO: lazy set
91- let result = match & self . data_mime_type {
92- Some ( m) => do_marshal ( m, data) ,
93- None => do_marshal ( & MimeType :: APPLICATION_JSON , data) ,
94- } ;
95- match result {
96- Ok ( raw) => {
97- self . data = Some ( raw) ;
98- }
99- Err ( e) => {
100- error ! ( "marshal failed: {:?}" , e) ;
101- }
102- }
90+ self . data = Some ( Box :: new ( move |mime_type : & MimeType | {
91+ do_marshal ( mime_type, & data)
92+ } ) ) ;
10393 self
10494 }
10595
106- pub fn setup_metadata < M , T > ( mut self , metadata : & M , mime_type : T ) -> Self
96+ pub fn setup_metadata < M , T > ( mut self , metadata : M , mime_type : T ) -> Self
10797 where
108- M : Sized + Serialize ,
98+ M : Sized + Serialize + ' static ,
10999 T : Into < MimeType > ,
110100 {
111- // TODO: lazy set
112101 let mime_type = mime_type. into ( ) ;
113- match do_marshal ( & mime_type, metadata) {
114- Ok ( raw) => {
115- let entry = CompositeMetadataEntry :: new ( mime_type, Bytes :: from ( raw) ) ;
116- self . metadata . push ( entry) ;
117- }
118- Err ( e) => error ! ( "marshal failed: {:?}" , e) ,
119- }
102+ self . metadata . push_back ( Box :: new ( move || {
103+ let raw = do_marshal ( & mime_type, & metadata) ?;
104+ Ok ( ( mime_type. clone ( ) , raw) )
105+ } ) ) ;
120106 self
121107 }
122108
@@ -148,8 +134,10 @@ impl RequesterBuilder {
148134 composite_builder. push ( MimeType :: MESSAGE_X_RSOCKET_ROUTING_V0 , routing. bytes ( ) ) ;
149135 added += 1 ;
150136 }
151- for it in self . metadata . into_iter ( ) {
152- composite_builder = composite_builder. push_entry ( it) ;
137+
138+ for mut gen in self . metadata . into_iter ( ) {
139+ let ( mime_type, raw) = gen ( ) ?;
140+ composite_builder = composite_builder. push ( mime_type, raw) ;
153141 added += 1 ;
154142 }
155143
@@ -159,8 +147,8 @@ impl RequesterBuilder {
159147 payload_builder = payload_builder. set_metadata ( composite_builder. build ( ) ) ;
160148 }
161149
162- if let Some ( raw ) = self . data {
163- payload_builder = payload_builder. set_data ( raw ) ;
150+ if let Some ( mut gen ) = self . data {
151+ payload_builder = payload_builder. set_data ( gen ( & data_mime_type ) ? ) ;
164152 }
165153
166154 let setup = payload_builder. build ( ) ;
@@ -291,16 +279,12 @@ impl RequestSpec {
291279 Ok ( v) => Unpacker {
292280 inner : Ok ( ( mime_type, v) ) ,
293281 } ,
294- Err ( e) => Unpacker { inner : Err ( e) } ,
295- }
296- }
297- Err ( e) => {
298- // TODO: better error
299- let msg = format ! ( "{}" , e) ;
300- Unpacker {
301- inner : Err ( RSocketError :: from ( msg) ) ,
282+ Err ( e) => Unpacker {
283+ inner : Err ( e. into ( ) ) ,
284+ } ,
302285 }
303286 }
287+ Err ( e) => Unpacker { inner : Err ( e) } ,
304288 }
305289 }
306290
@@ -337,7 +321,7 @@ impl RequestSpec {
337321}
338322
339323impl Unpackers {
340- pub async fn block < T > ( self ) -> Result < Vec < T > , Box < dyn Error > >
324+ pub async fn block < T > ( self ) -> Result < Vec < T > , Box < dyn Error + Sync + Send > >
341325 where
342326 T : Sized + DeserializeOwned ,
343327 {
@@ -353,12 +337,13 @@ impl Unpackers {
353337 }
354338 }
355339 }
356- Err ( e) => return Err ( format ! ( "{}" , e ) . into ( ) ) ,
340+ Err ( e) => return Err ( e . into ( ) ) ,
357341 }
358342 }
359343 Ok ( res)
360344 }
361- pub async fn foreach < T > ( self , callback : impl Fn ( T ) ) -> Result < ( ) , Box < dyn Error > >
345+
346+ pub async fn foreach < T > ( self , callback : impl Fn ( T ) ) -> Result < ( ) , Box < dyn Error + Send + Sync > >
362347 where
363348 T : Sized + DeserializeOwned ,
364349 {
@@ -381,39 +366,34 @@ impl Unpackers {
381366}
382367
383368impl Unpacker {
384- pub fn block < T > ( self ) -> Result < Option < T > , Box < dyn Error > >
369+ pub fn block < T > ( self ) -> Result < Option < T > , Box < dyn Error + Send + Sync > >
385370 where
386371 T : Sized + DeserializeOwned ,
387372 {
388- match self . inner {
389- Ok ( ( mime_type, inner) ) => match inner. data ( ) {
390- // TODO: support more mime types.
391- Some ( raw) => do_unmarshal ( & mime_type, raw) ,
392- None => Ok ( None ) ,
393- } ,
394- Err ( e) => Err ( format ! ( "{}" , e) . into ( ) ) ,
373+ let ( mime_type, inner) = self . inner ?;
374+ match inner. data ( ) {
375+ Some ( raw) => do_unmarshal ( & mime_type, raw) ,
376+ None => Ok ( None ) ,
395377 }
396378 }
397379}
398380
399- fn do_unmarshal < T > ( mime_type : & MimeType , raw : & Bytes ) -> Result < Option < T > , Box < dyn Error > >
381+ fn do_unmarshal < T > (
382+ mime_type : & MimeType ,
383+ raw : & Bytes ,
384+ ) -> Result < Option < T > , Box < dyn Error + Send + Sync > >
400385where
401386 T : Sized + DeserializeOwned ,
402387{
388+ // TODO: support more mime types
403389 match * mime_type {
404- MimeType :: APPLICATION_JSON => {
405- let t = unmarshal ( misc:: json ( ) , & raw . as_ref ( ) ) ?;
406- Ok ( Some ( t) )
407- }
408- MimeType :: APPLICATION_CBOR => {
409- let t = unmarshal ( misc:: cbor ( ) , & raw . as_ref ( ) ) ?;
410- Ok ( Some ( t) )
411- }
390+ MimeType :: APPLICATION_JSON => Ok ( Some ( unmarshal ( misc:: json ( ) , & raw . as_ref ( ) ) ?) ) ,
391+ MimeType :: APPLICATION_CBOR => Ok ( Some ( unmarshal ( misc:: cbor ( ) , & raw . as_ref ( ) ) ?) ) ,
412392 _ => Err ( "unsupported mime type!" . into ( ) ) ,
413393 }
414394}
415395
416- fn do_marshal < T > ( mime_type : & MimeType , data : & T ) -> Result < Vec < u8 > , Box < dyn Error > >
396+ fn do_marshal < T > ( mime_type : & MimeType , data : & T ) -> Result < Vec < u8 > , Box < dyn Error + Send + Sync > >
417397where
418398 T : Sized + Serialize ,
419399{
0 commit comments