1+ use std:: marker:: PhantomData ;
12use std:: task:: { Context , Poll } ;
23
34use bytes:: { Buf , Bytes } ;
@@ -12,26 +13,21 @@ use crate::{
1213 frame:: { self , Frame , PayloadLen } ,
1314 stream:: StreamId ,
1415 } ,
15- quic:: { RecvStream , SendStream } ,
16+ quic:: { BidiStream , RecvStream , SendStream } ,
1617 stream:: WriteBuf ,
1718} ;
1819
19- pub struct FrameStream < S >
20- where
21- S : RecvStream ,
22- {
20+ pub struct FrameStream < S , B > {
2321 stream : S ,
2422 bufs : BufList < Bytes > ,
2523 decoder : FrameDecoder ,
2624 remaining_data : usize ,
2725 /// Set to true when `stream` reaches the end.
2826 is_eos : bool ,
27+ _phantom_buffer : PhantomData < B > ,
2928}
3029
31- impl < S > FrameStream < S >
32- where
33- S : RecvStream ,
34- {
30+ impl < S , B > FrameStream < S , B > {
3531 pub fn new ( stream : S ) -> Self {
3632 Self :: with_bufs ( stream, BufList :: new ( ) )
3733 }
4339 decoder : FrameDecoder :: default ( ) ,
4440 remaining_data : 0 ,
4541 is_eos : false ,
42+ _phantom_buffer : PhantomData ,
4643 }
4744 }
45+ }
4846
47+ impl < S , B > FrameStream < S , B >
48+ where
49+ S : RecvStream ,
50+ {
4951 pub fn poll_next (
5052 & mut self ,
5153 cx : & mut Context < ' _ > ,
@@ -136,9 +138,9 @@ where
136138 }
137139}
138140
139- impl < T , B > SendStream < B > for FrameStream < T >
141+ impl < T , B > SendStream < B > for FrameStream < T , B >
140142where
141- T : SendStream < B > + RecvStream ,
143+ T : SendStream < B > ,
142144 B : Buf ,
143145{
144146 type Error = <T as SendStream < B > >:: Error ;
@@ -164,6 +166,34 @@ where
164166 }
165167}
166168
169+ impl < S , B > FrameStream < S , B >
170+ where
171+ S : BidiStream < B > ,
172+ B : Buf ,
173+ {
174+ pub ( crate ) fn split ( self ) -> ( FrameStream < S :: SendStream , B > , FrameStream < S :: RecvStream , B > ) {
175+ let ( send, recv) = self . stream . split ( ) ;
176+ (
177+ FrameStream {
178+ stream : send,
179+ bufs : BufList :: new ( ) ,
180+ decoder : FrameDecoder :: default ( ) ,
181+ remaining_data : 0 ,
182+ is_eos : false ,
183+ _phantom_buffer : PhantomData ,
184+ } ,
185+ FrameStream {
186+ stream : recv,
187+ bufs : self . bufs ,
188+ decoder : self . decoder ,
189+ remaining_data : self . remaining_data ,
190+ is_eos : self . is_eos ,
191+ _phantom_buffer : PhantomData ,
192+ } ,
193+ )
194+ }
195+ }
196+
167197#[ derive( Default ) ]
168198pub struct FrameDecoder {
169199 expected : Option < usize > ,
@@ -338,7 +368,7 @@ mod tests {
338368 Frame :: headers ( & b"trailer" [ ..] ) . encode_with_payload ( & mut buf) ;
339369 recv. chunk ( buf. freeze ( ) ) ;
340370
341- let mut stream = FrameStream :: new ( recv) ;
371+ let mut stream: FrameStream < _ , ( ) > = FrameStream :: new ( recv) ;
342372
343373 assert_poll_matches ! (
344374 |mut cx| stream. poll_next( & mut cx) ,
@@ -366,7 +396,7 @@ mod tests {
366396 Frame :: headers ( & b"header" [ ..] ) . encode_with_payload ( & mut buf) ;
367397 let mut buf = buf. freeze ( ) ;
368398 recv. chunk ( buf. split_to ( buf. len ( ) - 1 ) ) ;
369- let mut stream = FrameStream :: new ( recv) ;
399+ let mut stream: FrameStream < _ , ( ) > = FrameStream :: new ( recv) ;
370400
371401 assert_poll_matches ! (
372402 |mut cx| stream. poll_next( & mut cx) ,
@@ -385,7 +415,7 @@ mod tests {
385415 FrameType :: DATA . encode ( & mut buf) ;
386416 VarInt :: from ( 4u32 ) . encode ( & mut buf) ;
387417 recv. chunk ( buf. freeze ( ) ) ;
388- let mut stream = FrameStream :: new ( recv) ;
418+ let mut stream: FrameStream < _ , ( ) > = FrameStream :: new ( recv) ;
389419
390420 assert_poll_matches ! (
391421 |mut cx| stream. poll_next( & mut cx) ,
@@ -407,7 +437,7 @@ mod tests {
407437 let mut buf = buf. freeze ( ) ;
408438 recv. chunk ( buf. split_to ( buf. len ( ) - 2 ) ) ;
409439 recv. chunk ( buf) ;
410- let mut stream = FrameStream :: new ( recv) ;
440+ let mut stream: FrameStream < _ , ( ) > = FrameStream :: new ( recv) ;
411441
412442 // We get the total size of data about to be received
413443 assert_poll_matches ! (
@@ -436,7 +466,7 @@ mod tests {
436466 VarInt :: from ( 4u32 ) . encode ( & mut buf) ;
437467 buf. put_slice ( & b"b" [ ..] ) ;
438468 recv. chunk ( buf. freeze ( ) ) ;
439- let mut stream = FrameStream :: new ( recv) ;
469+ let mut stream: FrameStream < _ , ( ) > = FrameStream :: new ( recv) ;
440470
441471 assert_poll_matches ! (
442472 |mut cx| stream. poll_next( & mut cx) ,
@@ -468,7 +498,7 @@ mod tests {
468498 Frame :: Data ( Bytes :: from ( "body" ) ) . encode_with_payload ( & mut buf) ;
469499
470500 recv. chunk ( buf. freeze ( ) ) ;
471- let mut stream = FrameStream :: new ( recv) ;
501+ let mut stream: FrameStream < _ , ( ) > = FrameStream :: new ( recv) ;
472502
473503 assert_poll_matches ! (
474504 |mut cx| stream. poll_next( & mut cx) ,
@@ -490,7 +520,7 @@ mod tests {
490520 buf. put_slice ( & b"bo" [ ..] ) ;
491521 recv. chunk ( buf. clone ( ) . freeze ( ) ) ;
492522
493- let mut stream = FrameStream :: new ( recv) ;
523+ let mut stream: FrameStream < _ , ( ) > = FrameStream :: new ( recv) ;
494524
495525 assert_poll_matches ! (
496526 |mut cx| stream. poll_next( & mut cx) ,
0 commit comments