@@ -3,24 +3,28 @@ use std::sync::atomic::{AtomicBool, Ordering};
33use std:: sync:: Arc ;
44use tokio:: sync:: Notify ;
55
6- #[ derive( Clone ) ]
6+ /// Signaler returned as part of `NotifyOnEos::new` that can be polled to receive information,
7+ /// when the buffer gets advanced to the end.
8+ // Cannot be Clone due to usage of `Notify::notify_one` in `NotifyOnEos::advance`,
9+ // revisit once `Notify::notify_all` stabilizes.
710pub struct EosSignaler {
811 notifier : Arc < Notify > ,
912}
1013
1114impl EosSignaler {
12- fn notify_eos ( & self ) {
13- self . notifier . notify_waiters ( ) ;
14- }
15-
1615 pub async fn wait_till_eos ( self ) {
1716 self . notifier . notified ( ) . await ;
1817 }
1918}
2019
21- pub struct AlertOnEos < B > {
20+ /// Wrapper for `bytes::Buf` that returns a `EosSignaler` that can be polled to receive information,
21+ /// when the buffer gets advanced to the end.
22+ ///
23+ /// NOTE: For the notification to work, caller must ensure that `Buf::advance` gets called
24+ /// enough times to advance to the end of the buffer (so that `Buf::has_remaining` afterwards returns `0`).
25+ pub struct NotifyOnEos < B > {
2226 inner : B ,
23- signaler : EosSignaler ,
27+ notifier : Arc < Notify > ,
2428 // It'd be better if we consumed the signaler, making it inaccessible after notification.
2529 // Unfortunately, that would require something like AtomicOption.
2630 // arc_swap::ArcSwapOption was tried, but it can only return an Arc, and the pointed-to value cannot be consumed.
@@ -29,21 +33,20 @@ pub struct AlertOnEos<B> {
2933 has_already_signaled : AtomicBool ,
3034}
3135
32- impl < B > AlertOnEos < B > {
36+ impl < B > NotifyOnEos < B > {
3337 pub fn new ( inner : B ) -> ( Self , EosSignaler ) {
34- let signal = EosSignaler {
35- notifier : Arc :: new ( Notify :: new ( ) ) ,
36- } ;
38+ let notifier = Arc :: new ( Notify :: new ( ) ) ;
3739 let this = Self {
3840 inner,
39- signaler : signal . clone ( ) ,
41+ notifier : notifier . clone ( ) ,
4042 has_already_signaled : AtomicBool :: new ( false ) ,
4143 } ;
44+ let signal = EosSignaler { notifier } ;
4245 ( this, signal)
4346 }
4447}
4548
46- impl < B : Buf > Buf for AlertOnEos < B > {
49+ impl < B : Buf > Buf for NotifyOnEos < B > {
4750 fn remaining ( & self ) -> usize {
4851 self . inner . remaining ( )
4952 }
@@ -55,22 +58,36 @@ impl<B: Buf> Buf for AlertOnEos<B> {
5558 fn advance ( & mut self , cnt : usize ) {
5659 self . inner . advance ( cnt) ;
5760 if !self . inner . has_remaining ( ) && !self . has_already_signaled . swap ( true , Ordering :: AcqRel ) {
58- self . signaler . notify_eos ( ) ;
61+ // tokio::sync::Notify has private method `notify_all` that, once stabilized,
62+ // would allow us to make `EosSignaler` Cloneable with better ergonomics
63+ // to await EOS from multiple places.
64+ self . notifier . notify_one ( ) ;
5965 }
6066 }
6167}
6268
6369#[ cfg( test) ]
6470mod tests {
65- use crate :: common:: buf:: AlertOnEos ;
66- use hyper:: body:: Bytes ;
71+ use crate :: common:: buf:: NotifyOnEos ;
72+ use hyper:: body:: { Buf , Bytes } ;
6773 use std:: time:: Duration ;
6874
6975 #[ tokio:: test]
70- async fn test_get_notified ( ) {
71- let buf = Bytes :: from_static ( b"" ) ;
72- let ( _buf, signaler) = AlertOnEos :: new ( buf) ;
73- let result = tokio:: time:: timeout ( Duration :: from_secs ( 1 ) , signaler. wait_till_eos ( ) ) . await ;
74- assert_eq ! ( result, Ok ( ( ) ) ) ;
76+ async fn test_get_notified_immediately ( ) {
77+ let buf = Bytes :: from_static ( b"abc" ) ;
78+ let ( mut buf, signaler) = NotifyOnEos :: new ( buf) ;
79+ buf. advance ( 3 ) ;
80+ signaler. wait_till_eos ( ) . await ;
81+ }
82+
83+ #[ tokio:: test]
84+ async fn test_get_notified_after_1ms ( ) {
85+ let buf = Bytes :: from_static ( b"abc" ) ;
86+ let ( mut buf, signaler) = NotifyOnEos :: new ( buf) ;
87+ tokio:: spawn ( async move {
88+ tokio:: time:: sleep ( Duration :: from_millis ( 1 ) ) . await ;
89+ buf. advance ( 3 ) ;
90+ } ) ;
91+ signaler. wait_till_eos ( ) . await ;
7592 }
7693}
0 commit comments