22
33use std:: { error:: Error , fmt, string:: FromUtf8Error , sync:: Arc } ;
44
5+ use http_body_util:: BodyExt as _;
56use hyper:: {
7+ body,
68 header:: { self , HeaderValue } ,
7- Body , Method , Request , Response , StatusCode ,
9+ Method , Request , Response , StatusCode ,
810} ;
911use juniper:: {
1012 http:: { GraphQLBatchRequest , GraphQLRequest as JuniperGraphQLRequest , GraphQLRequest } ,
@@ -16,7 +18,7 @@ use url::form_urlencoded;
1618pub async fn graphql_sync < CtxT , QueryT , MutationT , SubscriptionT , S > (
1719 root_node : Arc < RootNode < ' static , QueryT , MutationT , SubscriptionT , S > > ,
1820 context : Arc < CtxT > ,
19- req : Request < Body > ,
21+ req : Request < body :: Incoming > ,
2022) -> Response < String >
2123where
2224 QueryT : GraphQLType < S , Context = CtxT > ,
3739pub async fn graphql < CtxT , QueryT , MutationT , SubscriptionT , S > (
3840 root_node : Arc < RootNode < ' static , QueryT , MutationT , SubscriptionT , S > > ,
3941 context : Arc < CtxT > ,
40- req : Request < Body > ,
42+ req : Request < body :: Incoming > ,
4143) -> Response < String >
4244where
4345 QueryT : GraphQLTypeAsync < S , Context = CtxT > ,
5658}
5759
5860async fn parse_req < S : ScalarValue > (
59- req : Request < Body > ,
61+ req : Request < body :: Incoming > ,
6062) -> Result < GraphQLBatchRequest < S > , Response < String > > {
6163 match * req. method ( ) {
6264 Method :: GET => parse_get_req ( req) ,
@@ -77,7 +79,7 @@ async fn parse_req<S: ScalarValue>(
7779}
7880
7981fn parse_get_req < S : ScalarValue > (
80- req : Request < Body > ,
82+ req : Request < body :: Incoming > ,
8183) -> Result < GraphQLBatchRequest < S > , GraphQLRequestError > {
8284 req. uri ( )
8385 . query ( )
@@ -90,27 +92,29 @@ fn parse_get_req<S: ScalarValue>(
9092}
9193
9294async fn parse_post_json_req < S : ScalarValue > (
93- body : Body ,
95+ body : body :: Incoming ,
9496) -> Result < GraphQLBatchRequest < S > , GraphQLRequestError > {
95- let chunk = hyper:: body:: to_bytes ( body)
97+ let chunk = body
98+ . collect ( )
9699 . await
97100 . map_err ( GraphQLRequestError :: BodyHyper ) ?;
98101
99- let input = String :: from_utf8 ( chunk. iter ( ) . cloned ( ) . collect ( ) )
102+ let input = String :: from_utf8 ( chunk. to_bytes ( ) . iter ( ) . cloned ( ) . collect ( ) )
100103 . map_err ( GraphQLRequestError :: BodyUtf8 ) ?;
101104
102105 serde_json:: from_str :: < GraphQLBatchRequest < S > > ( & input)
103106 . map_err ( GraphQLRequestError :: BodyJSONError )
104107}
105108
106109async fn parse_post_graphql_req < S : ScalarValue > (
107- body : Body ,
110+ body : body :: Incoming ,
108111) -> Result < GraphQLBatchRequest < S > , GraphQLRequestError > {
109- let chunk = hyper:: body:: to_bytes ( body)
112+ let chunk = body
113+ . collect ( )
110114 . await
111115 . map_err ( GraphQLRequestError :: BodyHyper ) ?;
112116
113- let query = String :: from_utf8 ( chunk. iter ( ) . cloned ( ) . collect ( ) )
117+ let query = String :: from_utf8 ( chunk. to_bytes ( ) . iter ( ) . cloned ( ) . collect ( ) )
114118 . map_err ( GraphQLRequestError :: BodyUtf8 ) ?;
115119
116120 Ok ( GraphQLBatchRequest :: Single ( GraphQLRequest :: new (
@@ -306,18 +310,19 @@ impl Error for GraphQLRequestError {
306310
307311#[ cfg( test) ]
308312mod tests {
309- use hyper:: {
310- server:: Server ,
311- service:: { make_service_fn, service_fn} ,
312- Method , Response , StatusCode ,
313+ use std:: {
314+ convert:: Infallible , error:: Error , net:: SocketAddr , panic, sync:: Arc , time:: Duration ,
313315 } ;
316+
317+ use hyper:: { server:: conn:: http1, service:: service_fn, Method , Response , StatusCode } ;
318+ use hyper_util:: rt:: TokioIo ;
314319 use juniper:: {
315320 http:: tests as http_tests,
316321 tests:: fixtures:: starwars:: schema:: { Database , Query } ,
317322 EmptyMutation , EmptySubscription , RootNode ,
318323 } ;
319- use reqwest:: { self , blocking:: Response as ReqwestResponse } ;
320- use std :: { convert :: Infallible , net:: SocketAddr , sync :: Arc , thread , time:: Duration } ;
324+ use reqwest:: blocking:: Response as ReqwestResponse ;
325+ use tokio :: { net:: TcpListener , task , time:: sleep } ;
321326
322327 struct TestHyperIntegration {
323328 port : u16 ,
@@ -373,7 +378,7 @@ mod tests {
373378
374379 async fn run_hyper_integration ( is_sync : bool ) {
375380 let port = if is_sync { 3002 } else { 3001 } ;
376- let addr: SocketAddr = ( [ 127 , 0 , 0 , 1 ] , port) . into ( ) ;
381+ let addr = SocketAddr :: from ( ( [ 127 , 0 , 0 , 1 ] , port) ) ;
377382
378383 let db = Arc :: new ( Database :: new ( ) ) ;
379384 let root_node = Arc :: new ( RootNode :: new (
@@ -382,59 +387,74 @@ mod tests {
382387 EmptySubscription :: < Database > :: new ( ) ,
383388 ) ) ;
384389
385- let new_service = make_service_fn ( move |_| {
386- let root_node = root_node. clone ( ) ;
387- let ctx = db. clone ( ) ;
390+ let server: task:: JoinHandle < Result < ( ) , Box < dyn Error + Send + Sync > > > =
391+ task:: spawn ( async move {
392+ let listener = TcpListener :: bind ( addr) . await ?;
393+
394+ loop {
395+ let ( stream, _) = listener. accept ( ) . await ?;
396+ let io = TokioIo :: new ( stream) ;
388397
389- async move {
390- Ok :: < _ , hyper:: Error > ( service_fn ( move |req| {
391398 let root_node = root_node. clone ( ) ;
392- let ctx = ctx. clone ( ) ;
393- let matches = {
394- let path = req. uri ( ) . path ( ) ;
395- match req. method ( ) {
396- & Method :: POST | & Method :: GET => {
397- path == "/graphql" || path == "/graphql/"
398- }
399- _ => false ,
399+ let db = db. clone ( ) ;
400+
401+ _ = task:: spawn ( async move {
402+ let root_node = root_node. clone ( ) ;
403+ let db = db. clone ( ) ;
404+
405+ if let Err ( e) = http1:: Builder :: new ( )
406+ . serve_connection (
407+ io,
408+ service_fn ( move |req| {
409+ let root_node = root_node. clone ( ) ;
410+ let db = db. clone ( ) ;
411+ let matches = {
412+ let path = req. uri ( ) . path ( ) ;
413+ match req. method ( ) {
414+ & Method :: POST | & Method :: GET => {
415+ path == "/graphql" || path == "/graphql/"
416+ }
417+ _ => false ,
418+ }
419+ } ;
420+ async move {
421+ Ok :: < _ , Infallible > ( if matches {
422+ if is_sync {
423+ super :: graphql_sync ( root_node, db, req) . await
424+ } else {
425+ super :: graphql ( root_node, db, req) . await
426+ }
427+ } else {
428+ let mut resp = Response :: new ( String :: new ( ) ) ;
429+ * resp. status_mut ( ) = StatusCode :: NOT_FOUND ;
430+ resp
431+ } )
432+ }
433+ } ) ,
434+ )
435+ . await
436+ {
437+ eprintln ! ( "server error: {e}" ) ;
400438 }
401- } ;
402- async move {
403- Ok :: < _ , Infallible > ( if matches {
404- if is_sync {
405- super :: graphql_sync ( root_node, ctx, req) . await
406- } else {
407- super :: graphql ( root_node, ctx, req) . await
408- }
409- } else {
410- let mut resp = Response :: new ( String :: new ( ) ) ;
411- * resp. status_mut ( ) = StatusCode :: NOT_FOUND ;
412- resp
413- } )
414- }
415- } ) )
416- }
417- } ) ;
418-
419- let ( shutdown_fut, shutdown) = futures:: future:: abortable ( async {
420- tokio:: time:: sleep ( Duration :: from_secs ( 60 ) ) . await ;
421- } ) ;
422-
423- let server = Server :: bind ( & addr)
424- . serve ( new_service)
425- . with_graceful_shutdown ( async {
426- shutdown_fut. await . unwrap_err ( ) ;
439+ } ) ;
440+ }
427441 } ) ;
428442
429- tokio:: task:: spawn_blocking ( move || {
430- thread:: sleep ( Duration :: from_millis ( 10 ) ) ; // wait 10ms for server to bind
443+ sleep ( Duration :: from_secs ( 10 ) ) . await ; // wait 10ms for `server` to bind
444+
445+ match task:: spawn_blocking ( move || {
431446 let integration = TestHyperIntegration { port } ;
432447 http_tests:: run_http_test_suite ( & integration) ;
433- shutdown. abort ( ) ;
434- } ) ;
448+ } )
449+ . await
450+ {
451+ Err ( f) if f. is_panic ( ) => panic:: resume_unwind ( f. into_panic ( ) ) ,
452+ Ok ( ( ) ) | Err ( _) => { }
453+ }
435454
436- if let Err ( e) = server. await {
437- eprintln ! ( "server error: {e}" ) ;
455+ server. abort ( ) ;
456+ if let Ok ( Err ( e) ) = server. await {
457+ panic ! ( "server failed: {e}" ) ;
438458 }
439459 }
440460
0 commit comments