33// SPDX-License-Identifier: Apache-2.0
44//
55
6+ use crate :: r#async:: utils;
67use nix:: unistd;
7- use protobuf:: { CodedInputStream , Message } ;
88use std:: collections:: HashMap ;
99use std:: os:: unix:: io:: RawFd ;
10+ use std:: result:: Result as StdResult ;
1011use std:: sync:: Arc ;
1112
1213use crate :: asynchronous:: stream:: { receive, respond, respond_with_status} ;
@@ -15,7 +16,7 @@ use crate::common::{self, Domain, MESSAGE_TYPE_REQUEST};
1516use crate :: context;
1617use crate :: error:: { get_status, Error , Result } ;
1718use crate :: r#async:: { MethodHandler , TtrpcContext } ;
18- use crate :: ttrpc:: { Code , Request } ;
19+ use crate :: ttrpc:: { Code , Status } ;
1920use crate :: MessageHeader ;
2021use futures:: stream:: Stream ;
2122use futures:: StreamExt as _;
@@ -303,63 +304,53 @@ async fn spawn_connection_handler<S>(
303304 } ) ;
304305}
305306
307+ async fn do_handle_request (
308+ fd : RawFd ,
309+ methods : Arc < HashMap < String , Box < dyn MethodHandler + Send + Sync > > > ,
310+ header : MessageHeader ,
311+ body : & [ u8 ] ,
312+ ) -> StdResult < ( u32 , Vec < u8 > ) , Status > {
313+ let req = utils:: body_to_request ( body) ?;
314+ let path = utils:: get_path ( & req. service , & req. method ) ;
315+ let method = methods
316+ . get ( & path)
317+ . ok_or_else ( || get_status ( Code :: INVALID_ARGUMENT , format ! ( "{} does not exist" , & path) ) ) ?;
318+
319+ let ctx = TtrpcContext {
320+ fd,
321+ mh : header,
322+ metadata : context:: from_pb ( & req. metadata ) ,
323+ } ;
324+
325+ method. handler ( ctx, req) . await . map_err ( |e| {
326+ error ! ( "method handle {} got error {:?}" , path, & e) ;
327+ get_status ( Code :: UNKNOWN , e)
328+ } )
329+ }
330+
306331async fn handle_request (
307332 tx : Sender < Vec < u8 > > ,
308333 fd : RawFd ,
309334 methods : Arc < HashMap < String , Box < dyn MethodHandler + Send + Sync > > > ,
310335 message : ( MessageHeader , Vec < u8 > ) ,
311336) {
312337 let ( header, body) = message;
313- if header. type_ != MESSAGE_TYPE_REQUEST {
314- return ;
315- }
316-
317- let mut req = Request :: new ( ) ;
318- let merge_result;
319- {
320- let mut s = CodedInputStream :: from_bytes ( & body) ;
321- merge_result = req. merge_from ( & mut s) ;
322- }
323-
324- if merge_result. is_err ( ) {
325- let status = get_status ( Code :: INVALID_ARGUMENT , "" . to_string ( ) ) ;
326-
327- if let Err ( x) = respond_with_status ( tx. clone ( ) , header. stream_id , status) . await {
328- error ! ( "respond get error {:?}" , x) ;
329- }
338+ let stream_id = header. stream_id ;
330339
340+ if header. type_ != MESSAGE_TYPE_REQUEST {
331341 return ;
332342 }
333- trace ! ( "Got Message request {:?}" , req) ;
334343
335- let stream_id = header. stream_id ;
336- let path = format ! ( "/{}/{}" , req. service, req. method) ;
337- if let Some ( x) = methods. get ( & path) {
338- let method = x;
339- let ctx = TtrpcContext {
340- fd,
341- mh : header,
342- metadata : context:: from_pb ( & req. metadata ) ,
343- } ;
344-
345- match method. handler ( ctx, req) . await {
346- Ok ( ( stream_id, body) ) => {
347- if let Err ( x) = respond ( tx. clone ( ) , stream_id, body) . await {
348- error ! ( "respond get error {:?}" , x) ;
349- }
350- }
351- Err ( e) => {
352- error ! ( "method handle {} get error {:?}" , path, e) ;
353- let status = get_status ( Code :: UNKNOWN , e) ;
354- if let Err ( e) = respond_with_status ( tx, stream_id, status) . await {
355- error ! ( "respond get error {:?}" , e) ;
356- }
344+ match do_handle_request ( fd, methods, header, & body) . await {
345+ Ok ( ( stream_id, resp_body) ) => {
346+ if let Err ( x) = respond ( tx. clone ( ) , stream_id, resp_body) . await {
347+ error ! ( "respond got error {:?}" , x) ;
357348 }
358349 }
359- } else {
360- let status = get_status ( Code :: INVALID_ARGUMENT , format ! ( "{} does not exist" , path ) ) ;
361- if let Err ( e ) = respond_with_status ( tx , header . stream_id , status ) . await {
362- error ! ( "respond get error {:?}" , e ) ;
350+ Err ( status ) => {
351+ if let Err ( x ) = respond_with_status ( tx . clone ( ) , stream_id , status ) . await {
352+ error ! ( "respond got error {:?}" , x ) ;
353+ }
363354 }
364355 }
365356}
0 commit comments