@@ -35,76 +35,60 @@ use crate::metrics::network_metrics;
3535use crate :: metrics:: server_metrics;
3636
3737pub type WatcherId = i64 ;
38- pub type WatcherStreamSender = Sender < Result < WatchResponse , Status > > ;
3938
40- type CreateWatcherEvent = ( WatchRequest , WatcherStreamSender ) ;
39+ /// A sender for dispatcher to send event to interested watchers.
40+ pub type WatcherSender = Sender < Result < WatchResponse , Status > > ;
4141
42+ /// A sender for event source, such as raft state machine, to send event to [`EventDispatcher`].
4243#[ derive( Clone , Debug ) ]
43- pub struct WatcherStateMachineSubscriber {
44- event_tx : mpsc:: UnboundedSender < WatcherEvent > ,
45- }
44+ pub ( crate ) struct DispatcherSender ( pub ( crate ) mpsc:: UnboundedSender < WatchEvent > ) ;
4645
4746#[ derive( Clone ) ]
48- pub enum WatcherEvent {
49- CreateWatcherEvent ( CreateWatcherEvent ) ,
50- StateMachineKvDataEvent ( Change < Vec < u8 > , String > ) ,
51- }
52-
53- #[ derive( Debug ) ]
54- pub struct WatcherManager {
55- event_tx : mpsc:: UnboundedSender < WatcherEvent > ,
56-
57- pub subscriber : WatcherStateMachineSubscriber ,
47+ pub enum WatchEvent {
48+ AddWatcher ( ( WatchRequest , WatcherSender ) ) ,
49+ KVChange ( Change < Vec < u8 > , String > ) ,
5850}
5951
60- struct WatcherManagerCore {
61- event_rx : mpsc:: UnboundedReceiver < WatcherEvent > ,
52+ /// Receives events from event sources, dispatches them to interested watchers.
53+ pub ( crate ) struct EventDispatcher {
54+ event_rx : mpsc:: UnboundedReceiver < WatchEvent > ,
6255
6356 /// map range to WatcherId
6457 watcher_range_map : RangeMap < String , WatcherId , WatcherStream > ,
6558
6659 current_watcher_id : WatcherId ,
6760}
6861
69- impl WatcherManager {
70- pub fn create ( ) -> Self {
62+ impl EventDispatcher {
63+ /// Spawn a dispatcher loop task.
64+ pub ( crate ) fn spawn ( ) -> mpsc:: UnboundedSender < WatchEvent > {
7165 let ( event_tx, event_rx) = mpsc:: unbounded_channel ( ) ;
7266
73- let core = WatcherManagerCore {
67+ let dispatcher = EventDispatcher {
7468 event_rx,
7569 watcher_range_map : RangeMap :: new ( ) ,
7670 current_watcher_id : 1 ,
7771 } ;
7872
79- let _h = tokio:: spawn ( core . watcher_manager_main ( ) ) ;
73+ let _h = tokio:: spawn ( dispatcher . main ( ) ) ;
8074
81- WatcherManager {
82- event_tx : event_tx. clone ( ) ,
83- subscriber : WatcherStateMachineSubscriber { event_tx } ,
84- }
85- }
86-
87- pub fn create_watcher_stream ( & self , request : WatchRequest , tx : WatcherStreamSender ) {
88- let create: CreateWatcherEvent = ( request, tx) ;
89- let _ = self . event_tx . send ( WatcherEvent :: CreateWatcherEvent ( create) ) ;
75+ event_tx
9076 }
91- }
9277
93- impl WatcherManagerCore {
9478 #[ tracing:: instrument( level = "trace" , skip( self ) ) ]
95- async fn watcher_manager_main ( mut self ) {
79+ async fn main ( mut self ) {
9680 loop {
9781 if let Some ( event) = self . event_rx . recv ( ) . await {
9882 match event {
99- WatcherEvent :: CreateWatcherEvent ( ( req, tx) ) => {
100- self . create_watcher_stream ( req, tx) . await ;
83+ WatchEvent :: AddWatcher ( ( req, tx) ) => {
84+ self . add_watcher ( req, tx) . await ;
10185 }
102- WatcherEvent :: StateMachineKvDataEvent ( kv_change) => {
103- self . notify_event ( kv_change) . await ;
86+ WatchEvent :: KVChange ( kv_change) => {
87+ self . dispatch_event ( kv_change) . await ;
10488 }
10589 }
10690 } else {
107- info ! ( "watcher manager has been shutdown " ) ;
91+ info ! ( "all event senders are closed. quit. " ) ;
10892 break ;
10993 }
11094 }
@@ -117,7 +101,8 @@ impl WatcherManagerCore {
117101 server_metrics:: incr_watchers ( -1 ) ;
118102 }
119103
120- async fn notify_event ( & mut self , change : Change < Vec < u8 > , String > ) {
104+ /// Dispatch a kv change event to interested watchers.
105+ async fn dispatch_event ( & mut self , change : Change < Vec < u8 > , String > ) {
121106 let k = change. ident . as_ref ( ) . unwrap ( ) ;
122107 let set = self . watcher_range_map . get_by_point ( k) ;
123108 if set. is_empty ( ) {
@@ -165,23 +150,24 @@ impl WatcherManagerCore {
165150 } ;
166151 }
167152
153+ // TODO: when a watcher stream is dropped, send a event to remove the watcher explicitly
168154 for range_key in remove_range_keys {
169155 self . close_stream ( range_key) ;
170156 }
171157 }
172158
173159 #[ tracing:: instrument( level = "debug" , skip( self ) ) ]
174- pub async fn create_watcher_stream ( & mut self , create : WatchRequest , tx : WatcherStreamSender ) {
160+ pub async fn add_watcher ( & mut self , create : WatchRequest , tx : WatcherSender ) {
175161 info ! ( "create_watcher_stream: {:?}" , create) ;
176162
177- let range = match WatcherManagerCore :: get_range_key ( create. key . clone ( ) , & create. key_end ) {
163+ let range = match EventDispatcher :: get_range_key ( create. key . clone ( ) , & create. key_end ) {
178164 Ok ( range) => range,
179165 Err ( _) => return ,
180166 } ;
181167
182168 self . current_watcher_id += 1 ;
183169 let watcher_id = self . current_watcher_id ;
184- let filter = create. filter_type ( ) ;
170+ let filter: FilterType = create. filter_type ( ) ;
185171
186172 let watcher_stream = WatcherStream :: new (
187173 watcher_id,
@@ -210,10 +196,8 @@ impl WatcherManagerCore {
210196 }
211197}
212198
213- impl StateMachineSubscriber for WatcherStateMachineSubscriber {
199+ impl StateMachineSubscriber for DispatcherSender {
214200 fn kv_changed ( & self , change : Change < Vec < u8 > , String > ) {
215- let _ = self
216- . event_tx
217- . send ( WatcherEvent :: StateMachineKvDataEvent ( change) ) ;
201+ let _ = self . 0 . send ( WatchEvent :: KVChange ( change) ) ;
218202 }
219203}
0 commit comments