11use futures03:: TryStreamExt ;
2+ use graph:: parking_lot:: Mutex ;
23use graph:: tokio_stream:: wrappers:: ReceiverStream ;
4+ use std:: collections:: BTreeSet ;
35use std:: sync:: { atomic:: Ordering , Arc , RwLock } ;
46use std:: { collections:: HashMap , sync:: atomic:: AtomicUsize } ;
57use tokio:: sync:: mpsc:: { channel, Sender } ;
@@ -84,38 +86,46 @@ impl StoreEventListener {
8486 }
8587}
8688
87- #[ async_trait]
88- trait EventSink : Send + Sync {
89- async fn send ( & self , event : Arc < StoreEvent > ) -> Result < ( ) , Error > ;
90- fn is_closed ( & self ) -> bool ;
89+ struct Watcher < T > {
90+ sender : Arc < watch:: Sender < T > > ,
91+ receiver : watch:: Receiver < T > ,
9192}
9293
93- #[ async_trait]
94- impl EventSink for Sender < Arc < StoreEvent > > {
95- async fn send ( & self , event : Arc < StoreEvent > ) -> Result < ( ) , Error > {
96- Ok ( self . send ( event) . await ?)
94+ impl < T : Clone + Debug + Send + Sync + ' static > Watcher < T > {
95+ fn new ( init : T ) -> Self {
96+ let ( sender, receiver) = watch:: channel ( init) ;
97+ Watcher {
98+ sender : Arc :: new ( sender) ,
99+ receiver,
100+ }
97101 }
98102
99- fn is_closed ( & self ) -> bool {
100- self . is_closed ( )
103+ fn send ( & self , v : T ) {
104+ // Unwrap: `self` holds a receiver.
105+ self . sender . send ( v) . unwrap ( )
101106 }
102- }
103107
104- # [ async_trait ]
105- impl EventSink for watch :: Sender < ( ) > {
106- async fn send ( & self , _event : Arc < StoreEvent > ) -> Result < ( ) , Error > {
107- Ok ( self . send ( ( ) ) ? )
108+ fn stream ( & self ) -> Box < dyn futures03 :: Stream < Item = T > + Unpin + Send + Sync > {
109+ Box :: new ( tokio_stream :: wrappers :: WatchStream :: new (
110+ self . receiver . clone ( ) ,
111+ ) )
108112 }
109113
110- fn is_closed ( & self ) -> bool {
111- self . is_closed ( )
114+ /// Outstanding receivers returned from `Self::stream`.
115+ fn receiver_count ( & self ) -> usize {
116+ // Do not count the internal receiver.
117+ self . sender . receiver_count ( ) - 1
112118 }
113119}
114120
115121/// Manage subscriptions to the `StoreEvent` stream. Keep a list of
116122/// currently active subscribers and forward new events to each of them
117123pub struct SubscriptionManager {
118- subscriptions : Arc < RwLock < HashMap < String , ( Arc < Vec < SubscriptionFilter > > , Arc < dyn EventSink > ) > > > ,
124+ // These are more efficient since only one entry is stored per filter.
125+ subscriptions_no_payload : Arc < Mutex < HashMap < BTreeSet < SubscriptionFilter > , Watcher < ( ) > > > > ,
126+
127+ subscriptions :
128+ Arc < RwLock < HashMap < String , ( Arc < BTreeSet < SubscriptionFilter > > , Sender < Arc < StoreEvent > > ) > > > ,
119129
120130 /// Keep the notification listener alive
121131 listener : StoreEventListener ,
@@ -126,6 +136,7 @@ impl SubscriptionManager {
126136 let ( listener, store_events) = StoreEventListener :: new ( logger, postgres_url, registry) ;
127137
128138 let mut manager = SubscriptionManager {
139+ subscriptions_no_payload : Arc :: new ( Mutex :: new ( HashMap :: new ( ) ) ) ,
129140 subscriptions : Arc :: new ( RwLock :: new ( HashMap :: new ( ) ) ) ,
130141 listener,
131142 } ;
@@ -146,61 +157,101 @@ impl SubscriptionManager {
146157 & self ,
147158 store_events : Box < dyn Stream < Item = StoreEvent , Error = ( ) > + Send > ,
148159 ) {
149- let subscriptions = self . subscriptions . clone ( ) ;
160+ let subscriptions = self . subscriptions . cheap_clone ( ) ;
161+ let subscriptions_no_payload = self . subscriptions_no_payload . cheap_clone ( ) ;
150162 let mut store_events = store_events. compat ( ) ;
151163
152164 // This channel is constantly receiving things and there are locks involved,
153165 // so it's best to use a blocking task.
154166 graph:: spawn_blocking ( async move {
155167 while let Some ( Ok ( event) ) = store_events. next ( ) . await {
156- let senders = subscriptions. read ( ) . unwrap ( ) . clone ( ) ;
157168 let event = Arc :: new ( event) ;
158169
159- // Write change to all matching subscription streams; remove subscriptions
160- // whose receiving end has been dropped
161- for ( id, ( _, sender) ) in senders
162- . iter ( )
163- . filter ( |( _, ( filter, _) ) | event. matches ( filter) )
170+ // Send to `subscriptions`.
164171 {
165- if sender. send ( event. cheap_clone ( ) ) . await . is_err ( ) {
166- // Receiver was dropped
167- subscriptions. write ( ) . unwrap ( ) . remove ( id) ;
172+ let senders = subscriptions. read ( ) . unwrap ( ) . clone ( ) ;
173+
174+ // Write change to all matching subscription streams; remove subscriptions
175+ // whose receiving end has been dropped
176+ for ( id, ( _, sender) ) in senders
177+ . iter ( )
178+ . filter ( |( _, ( filter, _) ) | event. matches ( filter) )
179+ {
180+ if sender. send ( event. cheap_clone ( ) ) . await . is_err ( ) {
181+ // Receiver was dropped
182+ subscriptions. write ( ) . unwrap ( ) . remove ( id) ;
183+ }
184+ }
185+ }
186+
187+ // Send to `subscriptions_no_payload`.
188+ {
189+ let watchers = subscriptions_no_payload. lock ( ) ;
190+
191+ // Write change to all matching subscription streams
192+ for ( _, watcher) in watchers. iter ( ) . filter ( |( filter, _) | event. matches ( filter) )
193+ {
194+ watcher. send ( ( ) ) ;
168195 }
169196 }
170197 }
171198 } ) ;
172199 }
173200
174201 fn periodically_clean_up_stale_subscriptions ( & self ) {
175- let subscriptions = self . subscriptions . clone ( ) ;
202+ let subscriptions = self . subscriptions . cheap_clone ( ) ;
203+ let subscriptions_no_payload = self . subscriptions_no_payload . cheap_clone ( ) ;
176204
177205 // Clean up stale subscriptions every 5s
178206 graph:: spawn ( async move {
179207 let mut interval = tokio:: time:: interval ( Duration :: from_secs ( 5 ) ) ;
180208 loop {
181209 interval. tick ( ) . await ;
182- let mut subscriptions = subscriptions. write ( ) . unwrap ( ) ;
183-
184- // Obtain IDs of subscriptions whose receiving end has gone
185- let stale_ids = subscriptions
186- . iter_mut ( )
187- . filter_map ( |( id, ( _, sender) ) | match sender. is_closed ( ) {
188- true => Some ( id. clone ( ) ) ,
189- false => None ,
190- } )
191- . collect :: < Vec < _ > > ( ) ;
192-
193- // Remove all stale subscriptions
194- for id in stale_ids {
195- subscriptions. remove ( & id) ;
210+
211+ // Cleanup `subscriptions`.
212+ {
213+ let mut subscriptions = subscriptions. write ( ) . unwrap ( ) ;
214+
215+ // Obtain IDs of subscriptions whose receiving end has gone
216+ let stale_ids = subscriptions
217+ . iter_mut ( )
218+ . filter_map ( |( id, ( _, sender) ) | match sender. is_closed ( ) {
219+ true => Some ( id. clone ( ) ) ,
220+ false => None ,
221+ } )
222+ . collect :: < Vec < _ > > ( ) ;
223+
224+ // Remove all stale subscriptions
225+ for id in stale_ids {
226+ subscriptions. remove ( & id) ;
227+ }
228+ }
229+
230+ // Cleanup `subscriptions_no_payload`.
231+ {
232+ let mut subscriptions = subscriptions_no_payload. lock ( ) ;
233+
234+ // Obtain IDs of subscriptions whose receiving end has gone
235+ let stale_ids = subscriptions
236+ . iter_mut ( )
237+ . filter_map ( |( id, watcher) | match watcher. receiver_count ( ) == 0 {
238+ true => Some ( id. clone ( ) ) ,
239+ false => None ,
240+ } )
241+ . collect :: < Vec < _ > > ( ) ;
242+
243+ // Remove all stale subscriptions
244+ for id in stale_ids {
245+ subscriptions. remove ( & id) ;
246+ }
196247 }
197248 }
198249 } ) ;
199250 }
200251}
201252
202253impl SubscriptionManagerTrait for SubscriptionManager {
203- fn subscribe ( & self , entities : Vec < SubscriptionFilter > ) -> StoreEventStreamBox {
254+ fn subscribe ( & self , entities : BTreeSet < SubscriptionFilter > ) -> StoreEventStreamBox {
204255 let id = Uuid :: new_v4 ( ) . to_string ( ) ;
205256
206257 // Prepare the new subscription by creating a channel and a subscription object
@@ -210,23 +261,18 @@ impl SubscriptionManagerTrait for SubscriptionManager {
210261 self . subscriptions
211262 . write ( )
212263 . unwrap ( )
213- . insert ( id, ( Arc :: new ( entities. clone ( ) ) , Arc :: new ( sender) ) ) ;
264+ . insert ( id, ( Arc :: new ( entities. clone ( ) ) , sender) ) ;
214265
215266 // Return the subscription ID and entity change stream
216267 StoreEventStream :: new ( Box :: new ( ReceiverStream :: new ( receiver) . map ( Ok ) . compat ( ) ) )
217268 . filter_by_entities ( entities)
218269 }
219270
220- fn subscribe_no_payload ( & self , entities : Vec < SubscriptionFilter > ) -> UnitStream {
221- let id = Uuid :: new_v4 ( ) . to_string ( ) ;
222-
223- let ( sender, receiver) = watch:: channel ( ( ) ) ;
224-
225- self . subscriptions
226- . write ( )
227- . unwrap ( )
228- . insert ( id, ( Arc :: new ( entities. clone ( ) ) , Arc :: new ( sender) ) ) ;
229-
230- Box :: new ( tokio_stream:: wrappers:: WatchStream :: new ( receiver) )
271+ fn subscribe_no_payload ( & self , entities : BTreeSet < SubscriptionFilter > ) -> UnitStream {
272+ self . subscriptions_no_payload
273+ . lock ( )
274+ . entry ( entities)
275+ . or_insert_with ( || Watcher :: new ( ( ) ) )
276+ . stream ( )
231277 }
232278}
0 commit comments