1+ use hyper:: body:: Buf ;
2+ use std:: sync:: atomic:: { AtomicBool , Ordering } ;
3+ use std:: sync:: Arc ;
4+ use tokio:: sync:: Notify ;
5+
6+ #[ derive( Clone ) ]
7+ pub struct EosSignaler {
8+ notifier : Arc < Notify > ,
9+ }
10+
11+ impl EosSignaler {
12+ fn notify_eos ( & self ) {
13+ self . notifier . notify_waiters ( ) ;
14+ }
15+
16+ pub async fn wait_till_eos ( self ) {
17+ self . notifier . notified ( ) . await ;
18+ }
19+ }
20+
21+ pub struct AlertOnEos < B > {
22+ inner : B ,
23+ signaler : EosSignaler ,
24+ // It'd be better if we consumed the signaler, making it inaccessible after notification.
25+ // Unfortunately, that would require something like AtomicOption.
26+ // arc_swap::ArcSwapOption was tried, but it can only return an Arc, and the pointed-to value cannot be consumed.
27+ // One could write an AtomicOption type (like this https://docs.rs/atomic-option/0.1.2/atomic_option/),
28+ // but it requires both unsafe and heap allocation, which is not worth it.
29+ has_already_signaled : AtomicBool ,
30+ }
31+
32+ impl < B > AlertOnEos < B > {
33+ pub fn new ( inner : B ) -> ( Self , EosSignaler ) {
34+ let signal = EosSignaler {
35+ notifier : Arc :: new ( Notify :: new ( ) ) ,
36+ } ;
37+ let this = Self {
38+ inner,
39+ signaler : signal. clone ( ) ,
40+ has_already_signaled : AtomicBool :: new ( false ) ,
41+ } ;
42+ ( this, signal)
43+ }
44+ }
45+
46+ impl < B : Buf > Buf for AlertOnEos < B > {
47+ fn remaining ( & self ) -> usize {
48+ self . inner . remaining ( )
49+ }
50+
51+ fn chunk ( & self ) -> & [ u8 ] {
52+ self . inner . chunk ( )
53+ }
54+
55+ fn advance ( & mut self , cnt : usize ) {
56+ self . inner . advance ( cnt) ;
57+ if !self . inner . has_remaining ( ) && !self . has_already_signaled . swap ( true , Ordering :: AcqRel ) {
58+ self . signaler . notify_eos ( ) ;
59+ }
60+ }
61+ }
62+
63+ #[ cfg( test) ]
64+ mod tests {
65+
66+ }
0 commit comments