@@ -14,21 +14,23 @@ use std::ops::DerefMut;
1414use std:: sync:: Arc ;
1515use std:: sync:: Mutex ;
1616
17+ use dashmap:: DashMap ;
1718use tokio:: sync:: mpsc;
1819use tokio:: sync:: mpsc:: error:: SendError ;
20+ use uuid:: Uuid ;
1921
20- use crate :: dashmap :: DashMap ;
22+ use crate :: ActorId ;
2123
2224/// A client's re-ordering buffer state.
2325struct BufferState < T > {
2426 /// the last sequence number sent to receiver for this client. seq starts
2527 /// with 1 and 0 mean no message has been sent.
26- last_seq : usize ,
28+ last_seq : u64 ,
2729 /// Buffer out-of-order messages in order to ensures messages are delivered
2830 /// strictly in per-client sequence order.
2931 ///
3032 /// Map's key is seq_no, value is msg.
31- buffer : HashMap < usize , T > ,
33+ buffer : HashMap < u64 , T > ,
3234}
3335
3436impl < T > Default for BufferState < T > {
@@ -43,9 +45,8 @@ impl<T> Default for BufferState<T> {
4345/// A sender that ensures messages are delivered in per-client sequence order.
4446pub ( crate ) struct OrderedSender < T > {
4547 tx : mpsc:: UnboundedSender < T > ,
46- // map's key is name client which sens messages through this channel. Map's
47- // value is the buffer state of that client.
48- states : Arc < DashMap < String , Arc < Mutex < BufferState < T > > > > > ,
48+ /// Map's key is session ID, and value is the buffer state of that session.
49+ states : Arc < DashMap < Uuid , Arc < Mutex < BufferState < T > > > > > ,
4950 pub ( crate ) enable_buffering : bool ,
5051 /// The identify of this object, which is used to distiguish it in debugging.
5152 log_id : String ,
@@ -98,8 +99,8 @@ impl<T> OrderedSender<T> {
9899 /// * calls from different clients will be executed concurrently.
99100 pub ( crate ) fn send (
100101 & self ,
101- client : String ,
102- seq_no : usize ,
102+ session_id : Uuid ,
103+ seq_no : u64 ,
103104 msg : T ,
104105 ) -> Result < ( ) , OrderedSenderError < T > > {
105106 use std:: cmp:: Ordering ;
@@ -109,25 +110,17 @@ impl<T> OrderedSender<T> {
109110 return Err ( OrderedSenderError :: InvalidZeroSeq ( msg) ) ;
110111 }
111112
112- // Make sure only this client's state is locked, not all states.
113- let state = match self . states . get ( & client) {
114- Some ( state) => state. value ( ) . clone ( ) ,
115- None => self
116- . states
117- . entry ( client. clone ( ) )
118- . or_default ( )
119- . value ( )
120- . clone ( ) ,
121- } ;
113+ // Make sure only this session's state is locked, not all states.
114+ let state = self . states . entry ( session_id) . or_default ( ) . value ( ) . clone ( ) ;
122115 let mut state_guard = state. lock ( ) . unwrap ( ) ;
123116 let BufferState { last_seq, buffer } = state_guard. deref_mut ( ) ;
124117
125118 match seq_no. cmp ( & ( * last_seq + 1 ) ) {
126119 Ordering :: Less => {
127120 tracing:: warn!(
128- "{} duplicate message from {} with seq no: {}" ,
121+ "{} duplicate message from session {} with seq no: {}" ,
129122 self . log_id,
130- client ,
123+ session_id ,
131124 seq_no,
132125 ) ;
133126 }
@@ -176,9 +169,49 @@ impl<T> OrderedSender<T> {
176169 }
177170}
178171
172+ /// Used by sender to track the message sequence numbers it sends to each actor.
173+ /// Each [Sequencer] object has a session id, sequencer numbers are scoped by
174+ /// the (session_id, destination_actor) pair.
175+ #[ derive( Clone , Debug ) ]
176+ pub struct Sequencer {
177+ session_id : Uuid ,
178+ // map's key is the destination actor's name, value is the last seq number
179+ // sent to that actor.
180+ last_seqs : Arc < Mutex < HashMap < ActorId , u64 > > > ,
181+ }
182+
183+ impl Sequencer {
184+ pub ( crate ) fn new ( session_id : Uuid ) -> Self {
185+ Self {
186+ session_id,
187+ last_seqs : Arc :: new ( Mutex :: new ( HashMap :: new ( ) ) ) ,
188+ }
189+ }
190+
191+ /// Assign the next seq for the given actor ID, mutate the sequencer with
192+ /// the new seq, and return the new seq.
193+ pub fn assign_seq ( & self , actor_id : & ActorId ) -> u64 {
194+ let mut guard = self . last_seqs . lock ( ) . unwrap ( ) ;
195+ let mut_ref = match guard. get_mut ( actor_id) {
196+ Some ( m) => m,
197+ None => guard. entry ( actor_id. clone ( ) ) . or_default ( ) ,
198+ } ;
199+ * mut_ref += 1 ;
200+ * mut_ref
201+ }
202+
203+ /// Id of the session this sequencer belongs to.
204+ pub fn session_id ( & self ) -> Uuid {
205+ self . session_id
206+ }
207+ }
208+
179209#[ cfg( test) ]
180210mod tests {
211+ use std:: sync:: Arc ;
212+
181213 use super :: * ;
214+ use crate :: id;
182215
183216 fn drain_try_recv < T : std:: fmt:: Debug + Clone > ( rx : & mut mpsc:: UnboundedReceiver < T > ) -> Vec < T > {
184217 let mut out = Vec :: new ( ) ;
@@ -190,26 +223,28 @@ mod tests {
190223
191224 #[ test]
192225 fn test_ordered_channel_single_client_send_in_order ( ) {
193- let ( tx, mut rx) = ordered_channel :: < usize > ( "test" . to_string ( ) , true ) ;
226+ let session_id_a = Uuid :: now_v7 ( ) ;
227+ let ( tx, mut rx) = ordered_channel :: < u64 > ( "test" . to_string ( ) , true ) ;
194228 for s in 1 ..=10 {
195- tx. send ( "A" . into ( ) , s, s) . unwrap ( ) ;
229+ tx. send ( session_id_a , s, s) . unwrap ( ) ;
196230 let got = drain_try_recv ( & mut rx) ;
197231 assert_eq ! ( got, vec![ s] ) ;
198232 }
199233 }
200234
201235 #[ test]
202236 fn test_ordered_channel_single_client_send_out_of_order ( ) {
203- let ( tx, mut rx) = ordered_channel :: < usize > ( "test" . to_string ( ) , true ) ;
237+ let session_id_a = Uuid :: now_v7 ( ) ;
238+ let ( tx, mut rx) = ordered_channel :: < u64 > ( "test" . to_string ( ) , true ) ;
204239
205240 // Send 2 to 4 in descending order: all should buffer until 1 arrives.
206241 for s in ( 2 ..=4 ) . rev ( ) {
207- tx. send ( "A" . into ( ) , s, s) . unwrap ( ) ;
242+ tx. send ( session_id_a , s, s) . unwrap ( ) ;
208243 }
209244
210245 // Send 7 to 9 in descending order: all should buffer until 1 - 6 arrives.
211246 for s in ( 7 ..=9 ) . rev ( ) {
212- tx. send ( "A" . into ( ) , s, s) . unwrap ( ) ;
247+ tx. send ( session_id_a , s, s) . unwrap ( ) ;
213248 }
214249
215250 assert ! (
@@ -218,127 +253,175 @@ mod tests {
218253 ) ;
219254
220255 // Now send 1: should deliver 1 then flush 2 - 4.
221- tx. send ( "A" . into ( ) , 1 , 1 ) . unwrap ( ) ;
256+ tx. send ( session_id_a , 1 , 1 ) . unwrap ( ) ;
222257 assert_eq ! ( drain_try_recv( & mut rx) , vec![ 1 , 2 , 3 , 4 ] ) ;
223258
224259 // Now send 5: should deliver immediately but not flush 7 - 9.
225- tx. send ( "A" . into ( ) , 5 , 5 ) . unwrap ( ) ;
260+ tx. send ( session_id_a , 5 , 5 ) . unwrap ( ) ;
226261 assert_eq ! ( drain_try_recv( & mut rx) , vec![ 5 ] ) ;
227262
228263 // Now send 6: should deliver 6 then flush 7 - 9.
229- tx. send ( "A" . into ( ) , 6 , 6 ) . unwrap ( ) ;
264+ tx. send ( session_id_a , 6 , 6 ) . unwrap ( ) ;
230265 assert_eq ! ( drain_try_recv( & mut rx) , vec![ 6 , 7 , 8 , 9 ] ) ;
231266
232267 // Send 10: should deliver immediately.
233- tx. send ( "A" . into ( ) , 10 , 10 ) . unwrap ( ) ;
268+ tx. send ( session_id_a , 10 , 10 ) . unwrap ( ) ;
234269 let got = drain_try_recv ( & mut rx) ;
235270 assert_eq ! ( got, vec![ 10 ] ) ;
236271 }
237272
238273 #[ test]
239274 fn test_ordered_channel_multi_clients ( ) {
240- let ( tx, mut rx) = ordered_channel :: < ( String , usize ) > ( "test" . to_string ( ) , true ) ;
275+ let session_id_a = Uuid :: now_v7 ( ) ;
276+ let session_id_b = Uuid :: now_v7 ( ) ;
277+ let ( tx, mut rx) = ordered_channel :: < ( Uuid , u64 ) > ( "test" . to_string ( ) , true ) ;
241278
242279 // A1 -> deliver
243- tx. send ( "A" . into ( ) , 1 , ( "A" . into ( ) , 1 ) ) . unwrap ( ) ;
244- assert_eq ! ( drain_try_recv( & mut rx) , vec![ ( "A" . into ( ) , 1 ) ] ) ;
280+ tx. send ( session_id_a , 1 , ( session_id_a , 1 ) ) . unwrap ( ) ;
281+ assert_eq ! ( drain_try_recv( & mut rx) , vec![ ( session_id_a , 1 ) ] ) ;
245282 // B1 -> deliver
246- tx. send ( "B" . into ( ) , 1 , ( "B" . into ( ) , 1 ) ) . unwrap ( ) ;
247- assert_eq ! ( drain_try_recv( & mut rx) , vec![ ( "B" . into ( ) , 1 ) ] ) ;
283+ tx. send ( session_id_b , 1 , ( session_id_b , 1 ) ) . unwrap ( ) ;
284+ assert_eq ! ( drain_try_recv( & mut rx) , vec![ ( session_id_b , 1 ) ] ) ;
248285 for s in ( 3 ..=5 ) . rev ( ) {
249286 // A3-5 -> buffer (waiting for A2)
250- tx. send ( "A" . into ( ) , s, ( "A" . into ( ) , s) ) . unwrap ( ) ;
287+ tx. send ( session_id_a , s, ( session_id_a , s) ) . unwrap ( ) ;
251288 // B3-5 -> buffer (waiting for B2)
252- tx. send ( "B" . into ( ) , s, ( "B" . into ( ) , s) ) . unwrap ( ) ;
289+ tx. send ( session_id_b , s, ( session_id_b , s) ) . unwrap ( ) ;
253290 }
254291 for s in ( 7 ..=9 ) . rev ( ) {
255292 // A7-9 -> buffer (waiting for A1-6)
256- tx. send ( "A" . into ( ) , s, ( "A" . into ( ) , s) ) . unwrap ( ) ;
293+ tx. send ( session_id_a , s, ( session_id_a , s) ) . unwrap ( ) ;
257294 // B7-9 -> buffer (waiting for B1-6)
258- tx. send ( "B" . into ( ) , s, ( "B" . into ( ) , s) ) . unwrap ( ) ;
295+ tx. send ( session_id_b , s, ( session_id_b , s) ) . unwrap ( ) ;
259296 }
260297 assert ! (
261298 drain_try_recv( & mut rx) . is_empty( ) ,
262299 "nothing should be delivered yet"
263300 ) ;
264301
265302 // A2 -> deliver A2 then flush A3
266- tx. send ( "A" . into ( ) , 2 , ( "A" . into ( ) , 2 ) ) . unwrap ( ) ;
303+ tx. send ( session_id_a , 2 , ( session_id_a , 2 ) ) . unwrap ( ) ;
267304 assert_eq ! (
268305 drain_try_recv( & mut rx) ,
269306 vec![
270- ( "A" . into ( ) , 2 ) ,
271- ( "A" . into ( ) , 3 ) ,
272- ( "A" . into ( ) , 4 ) ,
273- ( "A" . into ( ) , 5 ) ,
307+ ( session_id_a , 2 ) ,
308+ ( session_id_a , 3 ) ,
309+ ( session_id_a , 4 ) ,
310+ ( session_id_a , 5 ) ,
274311 ]
275312 ) ;
276313 // B2 -> deliver B2 then flush B3
277- tx. send ( "B" . into ( ) , 2 , ( "B" . into ( ) , 2 ) ) . unwrap ( ) ;
314+ tx. send ( session_id_b , 2 , ( session_id_b , 2 ) ) . unwrap ( ) ;
278315 assert_eq ! (
279316 drain_try_recv( & mut rx) ,
280317 vec![
281- ( "B" . into ( ) , 2 ) ,
282- ( "B" . into ( ) , 3 ) ,
283- ( "B" . into ( ) , 4 ) ,
284- ( "B" . into ( ) , 5 ) ,
318+ ( session_id_b , 2 ) ,
319+ ( session_id_b , 3 ) ,
320+ ( session_id_b , 4 ) ,
321+ ( session_id_b , 5 ) ,
285322 ]
286323 ) ;
287324
288325 // A6 -> should deliver immediately and flush A7-9
289- tx. send ( "A" . into ( ) , 6 , ( "A" . into ( ) , 6 ) ) . unwrap ( ) ;
326+ tx. send ( session_id_a , 6 , ( session_id_a , 6 ) ) . unwrap ( ) ;
290327 assert_eq ! (
291328 drain_try_recv( & mut rx) ,
292329 vec![
293- ( "A" . into ( ) , 6 ) ,
294- ( "A" . into ( ) , 7 ) ,
295- ( "A" . into ( ) , 8 ) ,
296- ( "A" . into ( ) , 9 )
330+ ( session_id_a , 6 ) ,
331+ ( session_id_a , 7 ) ,
332+ ( session_id_a , 8 ) ,
333+ ( session_id_a , 9 )
297334 ]
298335 ) ;
299336 // B6 -> should deliver immediately and flush B7-9
300- tx. send ( "B" . into ( ) , 6 , ( "B" . into ( ) , 6 ) ) . unwrap ( ) ;
337+ tx. send ( session_id_b , 6 , ( session_id_b , 6 ) ) . unwrap ( ) ;
301338 assert_eq ! (
302339 drain_try_recv( & mut rx) ,
303340 vec![
304- ( "B" . into ( ) , 6 ) ,
305- ( "B" . into ( ) , 7 ) ,
306- ( "B" . into ( ) , 8 ) ,
307- ( "B" . into ( ) , 9 )
341+ ( session_id_b , 6 ) ,
342+ ( session_id_b , 7 ) ,
343+ ( session_id_b , 8 ) ,
344+ ( session_id_b , 9 )
308345 ]
309346 ) ;
310347 }
311348
312349 #[ test]
313350 fn test_ordered_channel_duplicates ( ) {
314- fn verify_empty_buffers < T > ( states : & DashMap < String , Arc < Mutex < BufferState < T > > > > ) {
351+ let session_id_a = Uuid :: now_v7 ( ) ;
352+ fn verify_empty_buffers < T > ( states : & DashMap < Uuid , Arc < Mutex < BufferState < T > > > > ) {
315353 for entry in states. iter ( ) {
316354 assert ! ( entry. value( ) . lock( ) . unwrap( ) . buffer. is_empty( ) ) ;
317355 }
318356 }
319357
320- let ( tx, mut rx) = ordered_channel :: < ( String , usize ) > ( "test" . to_string ( ) , true ) ;
358+ let ( tx, mut rx) = ordered_channel :: < ( Uuid , u64 ) > ( "test" . to_string ( ) , true ) ;
321359 // A1 -> deliver
322- tx. send ( "A" . into ( ) , 1 , ( "A" . into ( ) , 1 ) ) . unwrap ( ) ;
323- assert_eq ! ( drain_try_recv( & mut rx) , vec![ ( "A" . into ( ) , 1 ) ] ) ;
360+ tx. send ( session_id_a , 1 , ( session_id_a , 1 ) ) . unwrap ( ) ;
361+ assert_eq ! ( drain_try_recv( & mut rx) , vec![ ( session_id_a , 1 ) ] ) ;
324362 verify_empty_buffers ( & tx. states ) ;
325363 // duplicate A1 -> drop even if the message is different.
326- tx. send ( "A" . into ( ) , 1 , ( "A" . into ( ) , 1_000 ) ) . unwrap ( ) ;
364+ tx. send ( session_id_a , 1 , ( session_id_a , 1_000 ) ) . unwrap ( ) ;
327365 assert ! (
328366 drain_try_recv( & mut rx) . is_empty( ) ,
329367 "nothing should be delivered yet"
330368 ) ;
331369 verify_empty_buffers ( & tx. states ) ;
332370 // A2 -> deliver
333- tx. send ( "A" . into ( ) , 2 , ( "A" . into ( ) , 2 ) ) . unwrap ( ) ;
334- assert_eq ! ( drain_try_recv( & mut rx) , vec![ ( "A" . into ( ) , 2 ) ] ) ;
371+ tx. send ( session_id_a , 2 , ( session_id_a , 2 ) ) . unwrap ( ) ;
372+ assert_eq ! ( drain_try_recv( & mut rx) , vec![ ( session_id_a , 2 ) ] ) ;
335373 verify_empty_buffers ( & tx. states ) ;
336374 // late A1 duplicate -> drop
337- tx. send ( "A" . into ( ) , 1 , ( "A" . into ( ) , 1_001 ) ) . unwrap ( ) ;
375+ tx. send ( session_id_a , 1 , ( session_id_a , 1_001 ) ) . unwrap ( ) ;
338376 assert ! (
339377 drain_try_recv( & mut rx) . is_empty( ) ,
340378 "nothing should be delivered yet"
341379 ) ;
342380 verify_empty_buffers ( & tx. states ) ;
343381 }
382+
383+ #[ test]
384+ fn test_sequencer_clone ( ) {
385+ let sequencer = Sequencer {
386+ session_id : Uuid :: now_v7 ( ) ,
387+ last_seqs : Arc :: new ( Mutex :: new ( HashMap :: new ( ) ) ) ,
388+ } ;
389+
390+ let actor_id = id ! ( test[ 0 ] . test) ;
391+
392+ // Modify original sequencer
393+ sequencer. assign_seq ( & actor_id) ;
394+ sequencer. assign_seq ( & actor_id) ;
395+
396+ // Clone should share the same state
397+ let cloned_sequencer = sequencer. clone ( ) ;
398+ assert_eq ! ( sequencer. session_id( ) , cloned_sequencer. session_id( ) , ) ;
399+ assert_eq ! ( cloned_sequencer. assign_seq( & actor_id) , 3 ) ;
400+ }
401+
402+ #[ test]
403+ fn test_sequencer_assign_seq ( ) {
404+ let sequencer = Sequencer {
405+ session_id : Uuid :: now_v7 ( ) ,
406+ last_seqs : Arc :: new ( Mutex :: new ( HashMap :: new ( ) ) ) ,
407+ } ;
408+
409+ let actor_id_0 = id ! ( worker[ 0 ] . worker) ;
410+ let actor_id_1 = id ! ( worker[ 1 ] . worker) ;
411+
412+ // Both actors should start with next_seq = 1
413+ assert_eq ! ( sequencer. assign_seq( & actor_id_0) , 1 ) ;
414+ assert_eq ! ( sequencer. assign_seq( & actor_id_1) , 1 ) ;
415+
416+ // Increment actor_0 twice
417+ sequencer. assign_seq ( & actor_id_0) ;
418+ sequencer. assign_seq ( & actor_id_0) ;
419+
420+ // Increment actor_1 once
421+ sequencer. assign_seq ( & actor_id_1) ;
422+
423+ // Check independent sequences
424+ assert_eq ! ( sequencer. assign_seq( & actor_id_0) , 4 ) ;
425+ assert_eq ! ( sequencer. assign_seq( & actor_id_1) , 3 ) ;
426+ }
344427}
0 commit comments