@@ -83,11 +83,27 @@ impl StoreEventListener {
8383 }
8484}
8585
86+ #[ async_trait]
87+ trait EventSink : Send + Sync {
88+ async fn send ( & self , event : Arc < StoreEvent > ) -> Result < ( ) , Error > ;
89+ fn is_closed ( & self ) -> bool ;
90+ }
91+
92+ #[ async_trait]
93+ impl EventSink for Sender < Arc < StoreEvent > > {
94+ async fn send ( & self , event : Arc < StoreEvent > ) -> Result < ( ) , Error > {
95+ Ok ( self . send ( event) . await ?)
96+ }
97+
98+ fn is_closed ( & self ) -> bool {
99+ self . is_closed ( )
100+ }
101+ }
102+
86103/// Manage subscriptions to the `StoreEvent` stream. Keep a list of
87104/// currently active subscribers and forward new events to each of them
88105pub struct SubscriptionManager {
89- subscriptions :
90- Arc < RwLock < HashMap < String , ( Arc < Vec < SubscriptionFilter > > , Sender < Arc < StoreEvent > > ) > > > ,
106+ subscriptions : Arc < RwLock < HashMap < String , ( Arc < Vec < SubscriptionFilter > > , Arc < dyn EventSink > ) > > > ,
91107
92108 /// Keep the notification listener alive
93109 listener : StoreEventListener ,
@@ -182,7 +198,7 @@ impl SubscriptionManagerTrait for SubscriptionManager {
182198 self . subscriptions
183199 . write ( )
184200 . unwrap ( )
185- . insert ( id, ( Arc :: new ( entities. clone ( ) ) , sender) ) ;
201+ . insert ( id, ( Arc :: new ( entities. clone ( ) ) , Arc :: new ( sender) ) ) ;
186202
187203 // Return the subscription ID and entity change stream
188204 StoreEventStream :: new ( Box :: new ( ReceiverStream :: new ( receiver) . map ( Ok ) . compat ( ) ) )
0 commit comments