@@ -28,7 +28,7 @@ use std::fmt;
2828use std:: sync:: { Mutex , PoisonError } ;
2929
3030use futures:: future;
31- use rand:: Rng ;
31+ use rand:: { self , rngs :: SmallRng , Error as RandError , Rng , SeedableRng } ;
3232
3333use crate :: options:: { ConflictRangeType , MutationType , TransactionOption } ;
3434use crate :: tuple:: { PackError , Subspace } ;
@@ -41,6 +41,7 @@ pub enum HcaError {
4141 PackError ( PackError ) ,
4242 InvalidDirectoryLayerMetadata ,
4343 PoisonError ,
44+ RandError ( RandError ) ,
4445}
4546
4647impl fmt:: Debug for HcaError {
@@ -52,6 +53,7 @@ impl fmt::Debug for HcaError {
5253 write ! ( f, "invalid directory layer metadata" )
5354 }
5455 HcaError :: PoisonError => write ! ( f, "mutex poisoned" ) ,
56+ HcaError :: RandError ( err) => err. fmt ( f) ,
5557 }
5658 }
5759}
@@ -71,6 +73,11 @@ impl<T> From<PoisonError<T>> for HcaError {
7173 Self :: PoisonError
7274 }
7375}
76+ impl From < RandError > for HcaError {
77+ fn from ( err : RandError ) -> Self {
78+ Self :: RandError ( err)
79+ }
80+ }
7481
7582impl TransactError for HcaError {
7683 fn try_into_fdb_error ( self ) -> Result < FdbError , Self > {
@@ -114,7 +121,7 @@ impl HighContentionAllocator {
114121 reverse : true ,
115122 ..RangeOption :: default ( )
116123 } ;
117- let mut rng = rand:: thread_rng ( ) ;
124+ let mut rng = SmallRng :: from_rng ( & mut rand:: thread_rng ( ) ) ? ;
118125
119126 loop {
120127 let kvs = trx. get_range ( & counters_range, 1 , true ) . await ?;
@@ -130,17 +137,18 @@ impl HighContentionAllocator {
130137 let window = loop {
131138 let counters_start = self . counters . subspace ( & start) ;
132139
133- let mutex_guard = self . allocation_mutex . lock ( ) ?;
134- if window_advanced {
135- trx. clear_range ( self . counters . bytes ( ) , counters_start. bytes ( ) ) ;
136- trx. set_option ( TransactionOption :: NextWriteNoWriteConflictRange ) ?;
137- trx. clear_range ( self . recent . bytes ( ) , self . recent . subspace ( & start) . bytes ( ) ) ;
138- }
139-
140- // Increment the allocation count for the current window
141- trx. atomic_op ( counters_start. bytes ( ) , ONE_BYTES , MutationType :: Add ) ;
142- let count_future = trx. get ( counters_start. bytes ( ) , true ) ;
143- drop ( mutex_guard) ;
140+ let count_future = {
141+ let _mutex_guard = self . allocation_mutex . lock ( ) ?;
142+ if window_advanced {
143+ trx. clear_range ( self . counters . bytes ( ) , counters_start. bytes ( ) ) ;
144+ trx. set_option ( TransactionOption :: NextWriteNoWriteConflictRange ) ?;
145+ trx. clear_range ( self . recent . bytes ( ) , self . recent . subspace ( & start) . bytes ( ) ) ;
146+ } ;
147+
148+ // Increment the allocation count for the current window
149+ trx. atomic_op ( counters_start. bytes ( ) , ONE_BYTES , MutationType :: Add ) ;
150+ trx. get ( counters_start. bytes ( ) , true )
151+ } ;
144152
145153 let count_value = count_future. await ?;
146154 let count = if let Some ( count_value) = count_value {
@@ -171,12 +179,14 @@ impl HighContentionAllocator {
171179 let candidate: i64 = rng. gen_range ( start, start + window) ;
172180 let recent_candidate = self . recent . subspace ( & candidate) ;
173181
174- let mutex_guard = self . allocation_mutex . lock ( ) ?;
175- let latest_counter = trx. get_range ( & counters_range, 1 , true ) ;
176- let candidate_value = trx. get ( recent_candidate. bytes ( ) , false ) ;
177- trx. set_option ( TransactionOption :: NextWriteNoWriteConflictRange ) ?;
178- trx. set ( recent_candidate. bytes ( ) , & [ ] ) ;
179- drop ( mutex_guard) ;
182+ let ( latest_counter, candidate_value) = {
183+ let _mutex_guard = self . allocation_mutex . lock ( ) ?;
184+ let latest_counter = trx. get_range ( & counters_range, 1 , true ) ;
185+ let candidate_value = trx. get ( recent_candidate. bytes ( ) , false ) ;
186+ trx. set_option ( TransactionOption :: NextWriteNoWriteConflictRange ) ?;
187+ trx. set ( recent_candidate. bytes ( ) , & [ ] ) ;
188+ ( latest_counter, candidate_value)
189+ } ;
180190
181191 let ( latest_counter, candidate_value) =
182192 future:: try_join ( latest_counter, candidate_value) . await ?;
0 commit comments