@@ -2,13 +2,12 @@ use std::fmt;
22use std:: future:: Future ;
33use std:: ops:: Range ;
44use std:: pin:: Pin ;
5- use std:: str:: FromStr ;
65use std:: task:: { Context , Poll } ;
76
87use async_std:: io:: { self , Read } ;
9- use async_std:: sync:: { channel , Arc , Receiver , Sender } ;
8+ use async_std:: sync:: Arc ;
109use byte_pool:: { Block , BytePool } ;
11- use http_types:: headers :: { HeaderName , HeaderValue } ;
10+ use http_types:: trailers :: { Trailers , TrailersSender } ;
1211
1312const INITIAL_CAPACITY : usize = 1024 * 4 ;
1413const MAX_CAPACITY : usize = 512 * 1024 * 1024 ; // 512 MiB
@@ -33,29 +32,20 @@ pub(crate) struct ChunkedDecoder<R: Read> {
3332 /// Current state.
3433 state : State ,
3534 /// Trailer channel sender.
36- trailer_sender : Sender < Vec < ( HeaderName , HeaderValue ) > > ,
37- /// Trailer channel receiver.
38- trailer_receiver : Receiver < Vec < ( HeaderName , HeaderValue ) > > ,
35+ trailer_sender : Option < TrailersSender > ,
3936}
4037
4138impl < R : Read > ChunkedDecoder < R > {
42- pub ( crate ) fn new ( inner : R ) -> Self {
43- let ( sender, receiver) = channel ( 1 ) ;
44-
39+ pub ( crate ) fn new ( inner : R , trailer_sender : TrailersSender ) -> Self {
4540 ChunkedDecoder {
4641 inner,
4742 buffer : POOL . alloc ( INITIAL_CAPACITY ) ,
4843 current : Range { start : 0 , end : 0 } ,
4944 initial_decode : false , // buffer is empty initially, nothing to decode}
5045 state : State :: Init ,
51- trailer_sender : sender,
52- trailer_receiver : receiver,
46+ trailer_sender : Some ( trailer_sender) ,
5347 }
5448 }
55-
56- pub ( crate ) fn trailer ( & self ) -> Receiver < Vec < ( HeaderName , HeaderValue ) > > {
57- self . trailer_receiver . clone ( )
58- }
5949}
6050
6151impl < R : Read + Unpin > ChunkedDecoder < R > {
@@ -94,7 +84,7 @@ impl<R: Read + Unpin> ChunkedDecoder<R> {
9484
9585 return Ok ( DecodeResult :: Some {
9686 read,
97- new_state,
87+ new_state : Some ( new_state ) ,
9888 new_pos,
9989 buffer,
10090 pending : false ,
@@ -115,7 +105,7 @@ impl<R: Read + Unpin> ChunkedDecoder<R> {
115105
116106 Ok ( DecodeResult :: Some {
117107 read,
118- new_state,
108+ new_state : Some ( new_state ) ,
119109 new_pos,
120110 buffer,
121111 pending : false ,
@@ -124,7 +114,7 @@ impl<R: Read + Unpin> ChunkedDecoder<R> {
124114 Poll :: Pending => {
125115 return Ok ( DecodeResult :: Some {
126116 read : 0 ,
127- new_state : State :: Chunk ( new_current, len) ,
117+ new_state : Some ( State :: Chunk ( new_current, len) ) ,
128118 new_pos,
129119 buffer,
130120 pending : true ,
@@ -155,14 +145,27 @@ impl<R: Read + Unpin> ChunkedDecoder<R> {
155145 decode_trailer ( buffer, pos)
156146 }
157147 State :: TrailerDone ( ref mut headers) => {
158- let headers = std:: mem:: replace ( headers, Vec :: new ( ) ) ;
159- let mut fut = Box :: pin ( self . trailer_sender . send ( headers) ) ;
160- match Pin :: new ( & mut fut) . poll ( cx) {
148+ let headers = std:: mem:: replace ( headers, Trailers :: new ( ) ) ;
149+ let sender = self . trailer_sender . take ( ) ;
150+ let sender =
151+ sender. expect ( "invalid chunked state, tried sending multiple trailers" ) ;
152+
153+ let fut = Box :: pin ( sender. send ( Ok ( headers) ) ) ;
154+ Ok ( DecodeResult :: Some {
155+ read : 0 ,
156+ new_state : Some ( State :: TrailerSending ( fut) ) ,
157+ new_pos : pos. clone ( ) ,
158+ buffer,
159+ pending : false ,
160+ } )
161+ }
162+ State :: TrailerSending ( ref mut fut) => {
163+ match Pin :: new ( fut) . poll ( cx) {
161164 Poll :: Ready ( _) => { }
162165 Poll :: Pending => {
163166 return Ok ( DecodeResult :: Some {
164167 read : 0 ,
165- new_state : self . state . clone ( ) ,
168+ new_state : None ,
166169 new_pos : pos. clone ( ) ,
167170 buffer,
168171 pending : true ,
@@ -172,15 +175,15 @@ impl<R: Read + Unpin> ChunkedDecoder<R> {
172175
173176 Ok ( DecodeResult :: Some {
174177 read : 0 ,
175- new_state : State :: Done ,
178+ new_state : Some ( State :: Done ) ,
176179 new_pos : pos. clone ( ) ,
177180 buffer,
178181 pending : false ,
179182 } )
180183 }
181184 State :: Done => Ok ( DecodeResult :: Some {
182185 read : 0 ,
183- new_state : State :: Done ,
186+ new_state : Some ( State :: Done ) ,
184187 new_pos : pos. clone ( ) ,
185188 buffer,
186189 pending : false ,
@@ -217,15 +220,23 @@ impl<R: Read + Unpin> Read for ChunkedDecoder<R> {
217220 pending,
218221 } => {
219222 this. current = new_pos. clone ( ) ;
220- this. state = new_state;
223+ if let Some ( state) = new_state {
224+ this. state = state;
225+ }
221226
222227 if pending {
223228 // initial_decode is still true
224229 this. buffer = buffer;
225230 return Poll :: Pending ;
226231 }
227232
228- if State :: Done == this. state || read > 0 {
233+ if let State :: Done = this. state {
234+ // initial_decode is still true
235+ this. buffer = buffer;
236+ return Poll :: Ready ( Ok ( read) ) ;
237+ }
238+
239+ if read > 0 {
229240 // initial_decode is still true
230241 this. buffer = buffer;
231242 return Poll :: Ready ( Ok ( read) ) ;
@@ -281,11 +292,18 @@ impl<R: Read + Unpin> Read for ChunkedDecoder<R> {
281292 // current buffer might now contain more data inside, so we need to attempt
282293 // to decode it next time
283294 this. initial_decode = true ;
284- this. state = new_state;
295+ if let Some ( state) = new_state {
296+ this. state = state;
297+ }
285298 this. current = new_pos. clone ( ) ;
286299 n = new_pos;
287300
288- if State :: Done == this. state || read > 0 {
301+ if let State :: Done = this. state {
302+ this. buffer = new_buffer;
303+ return Poll :: Ready ( Ok ( read) ) ;
304+ }
305+
306+ if read > 0 {
289307 this. buffer = new_buffer;
290308 return Poll :: Ready ( Ok ( read) ) ;
291309 }
@@ -329,7 +347,7 @@ enum DecodeResult {
329347 /// The new range of valid data in `buffer`.
330348 new_pos : Range < usize > ,
331349 /// The new state.
332- new_state : State ,
350+ new_state : Option < State > ,
333351 /// Should poll return `Pending`.
334352 pending : bool ,
335353 } ,
@@ -338,7 +356,6 @@ enum DecodeResult {
338356}
339357
340358/// Decoder state.
341- #[ derive( Debug , PartialEq , Clone ) ]
342359enum State {
343360 /// Initial state.
344361 Init ,
@@ -349,10 +366,25 @@ enum State {
349366 /// Decoding trailers.
350367 Trailer ,
351368 /// Trailers were decoded, are now set to the decoded trailers.
352- TrailerDone ( Vec < ( HeaderName , HeaderValue ) > ) ,
369+ TrailerDone ( Trailers ) ,
370+ TrailerSending ( Pin < Box < dyn Future < Output = ( ) > + ' static + Send + Sync > > ) ,
353371 /// All is said and done.
354372 Done ,
355373}
374+ impl fmt:: Debug for State {
375+ fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
376+ use State :: * ;
377+ match self {
378+ Init => write ! ( f, "State::Init" ) ,
379+ Chunk ( a, b) => write ! ( f, "State::Chunk({}, {})" , a, b) ,
380+ ChunkEnd => write ! ( f, "State::ChunkEnd" ) ,
381+ Trailer => write ! ( f, "State::Trailer" ) ,
382+ TrailerDone ( trailers) => write ! ( f, "State::TrailerDone({:?})" , & trailers) ,
383+ TrailerSending ( _) => write ! ( f, "State::TrailerSending" ) ,
384+ Done => write ! ( f, "State::Done" ) ,
385+ }
386+ }
387+ }
356388
357389impl fmt:: Debug for DecodeResult {
358390 fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
@@ -395,7 +427,7 @@ fn decode_init(buffer: Block<'static>, pos: &Range<usize>) -> io::Result<DecodeR
395427 read : 0 ,
396428 buffer,
397429 new_pos,
398- new_state,
430+ new_state : Some ( new_state ) ,
399431 pending : false ,
400432 } )
401433 }
@@ -418,7 +450,7 @@ fn decode_chunk_end(buffer: Block<'static>, pos: &Range<usize>) -> io::Result<De
418450 start : pos. start + 2 ,
419451 end : pos. end ,
420452 } ,
421- new_state : State :: Init ,
453+ new_state : Some ( State :: Init ) ,
422454 pending : false ,
423455 } ) ;
424456 }
@@ -434,21 +466,16 @@ fn decode_trailer(buffer: Block<'static>, pos: &Range<usize>) -> io::Result<Deco
434466
435467 match httparse:: parse_headers ( & buffer[ pos. start ..pos. end ] , & mut headers) {
436468 Ok ( Status :: Complete ( ( used, headers) ) ) => {
437- let headers = headers
438- . iter ( )
439- . map ( |header| {
440- // TODO: error propagation
441- let name = HeaderName :: from_str ( header. name ) . unwrap ( ) ;
442- let value =
443- HeaderValue :: from_str ( & std:: string:: String :: from_utf8_lossy ( header. value ) )
444- . unwrap ( ) ;
445- ( name, value)
446- } )
447- . collect ( ) ;
469+ let mut trailers = Trailers :: new ( ) ;
470+ for header in headers {
471+ let value = std:: string:: String :: from_utf8_lossy ( header. value ) . to_string ( ) ;
472+ trailers. insert ( header. name , value) . unwrap ( ) ;
473+ }
474+
448475 Ok ( DecodeResult :: Some {
449476 read : 0 ,
450477 buffer,
451- new_state : State :: TrailerDone ( headers ) ,
478+ new_state : Some ( State :: TrailerDone ( trailers ) ) ,
452479 new_pos : Range {
453480 start : pos. start + used,
454481 end : pos. end ,
@@ -481,7 +508,10 @@ mod tests {
481508 \r \n "
482509 . as_bytes ( ) ,
483510 ) ;
484- let mut decoder = ChunkedDecoder :: new ( input) ;
511+
512+ let ( s, _r) = async_std:: sync:: channel ( 1 ) ;
513+ let sender = TrailersSender :: new ( s) ;
514+ let mut decoder = ChunkedDecoder :: new ( input, sender) ;
485515
486516 let mut output = String :: new ( ) ;
487517 decoder. read_to_string ( & mut output) . await . unwrap ( ) ;
@@ -509,19 +539,21 @@ mod tests {
509539 \r \n "
510540 . as_bytes ( ) ,
511541 ) ;
512- let mut decoder = ChunkedDecoder :: new ( input) ;
542+ let ( s, r) = async_std:: sync:: channel ( 1 ) ;
543+ let sender = TrailersSender :: new ( s) ;
544+ let mut decoder = ChunkedDecoder :: new ( input, sender) ;
513545
514546 let mut output = String :: new ( ) ;
515547 decoder. read_to_string ( & mut output) . await . unwrap ( ) ;
516548 assert_eq ! ( output, "MozillaDeveloperNetwork" ) ;
517549
518- let trailer = decoder . trailer ( ) . recv ( ) . await ;
550+ let trailer = r . recv ( ) . await . unwrap ( ) . unwrap ( ) ;
519551 assert_eq ! (
520- trailer,
521- Some ( vec![ (
522- "Expires" . parse( ) . unwrap( ) ,
523- "Wed, 21 Oct 2015 07:28:00 GMT" . parse( ) . unwrap( ) ,
524- ) ] )
552+ trailer. iter ( ) . collect :: < Vec <_>> ( ) ,
553+ vec![ (
554+ & "Expires" . parse( ) . unwrap( ) ,
555+ & vec! [ "Wed, 21 Oct 2015 07:28:00 GMT" . parse( ) . unwrap( ) ] ,
556+ ) ]
525557 ) ;
526558 } ) ;
527559 }
0 commit comments