@@ -1102,3 +1102,129 @@ def test_receive_migrated_after_moving(self, pool_class):
11021102 finally :
11031103 if hasattr (test_redis_client .connection_pool , "disconnect" ):
11041104 test_redis_client .connection_pool .disconnect ()
1105+
1106+ @pytest .mark .parametrize ("pool_class" , [ConnectionPool , BlockingConnectionPool ])
1107+ def test_overlapping_moving_events (self , pool_class ):
1108+ """
1109+ Test handling of overlapping/duplicate MOVING events (e.g., two MOVING events before the first expires).
1110+ Ensures that the second MOVING event updates the pool and connections as expected, and that expiry/cleanup works.
1111+ """
1112+ test_redis_client = self ._get_client (
1113+ pool_class , max_connections = 5 , setup_pool_handler = True
1114+ )
1115+ try :
1116+ # Create and release some connections
1117+ for _ in range (3 ):
1118+ conn = test_redis_client .connection_pool .get_connection ()
1119+ test_redis_client .connection_pool .release (conn )
1120+
1121+ # Take 2 connections to be in use
1122+ in_use_connections = []
1123+ for _ in range (2 ):
1124+ conn = test_redis_client .connection_pool .get_connection ()
1125+ in_use_connections .append (conn )
1126+
1127+ # Trigger first MOVING event
1128+ key_moving1 = "key_receive_moving_0"
1129+ value_moving1 = "value3_0"
1130+ result1 = test_redis_client .set (key_moving1 , value_moving1 )
1131+ assert result1 is True
1132+ self ._validate_conn_kwargs (
1133+ test_redis_client .connection_pool ,
1134+ MockSocket .DEFAULT_ADDRESS .split (":" )[0 ],
1135+ int (MockSocket .DEFAULT_ADDRESS .split (":" )[1 ]),
1136+ MockSocket .AFTER_MOVING_ADDRESS .split (":" )[0 ],
1137+ self .config .relax_timeout ,
1138+ )
1139+ # Validate all connections reflect the first MOVING event
1140+ self ._validate_in_use_connections_state (in_use_connections )
1141+ self ._validate_free_connections_state (
1142+ test_redis_client .connection_pool ,
1143+ MockSocket .AFTER_MOVING_ADDRESS .split (":" )[0 ],
1144+ self .config .relax_timeout ,
1145+ should_be_connected_count = 1 ,
1146+ connected_to_tmp_addres = True ,
1147+ )
1148+
1149+ # Before the first MOVING expires, trigger a second MOVING event (simulate new address)
1150+ # Patch MockSocket to use a new address for the second event
1151+ new_address = "5.6.7.8:6380"
1152+ orig_after_moving = MockSocket .AFTER_MOVING_ADDRESS
1153+ MockSocket .AFTER_MOVING_ADDRESS = new_address
1154+ try :
1155+ key_moving2 = "key_receive_moving_1"
1156+ value_moving2 = "value3_1"
1157+ result2 = test_redis_client .set (key_moving2 , value_moving2 )
1158+ assert result2 is True
1159+ self ._validate_conn_kwargs (
1160+ test_redis_client .connection_pool ,
1161+ MockSocket .DEFAULT_ADDRESS .split (":" )[0 ],
1162+ int (MockSocket .DEFAULT_ADDRESS .split (":" )[1 ]),
1163+ new_address .split (":" )[0 ],
1164+ self .config .relax_timeout ,
1165+ )
1166+ # Validate all connections reflect the second MOVING event
1167+ self ._validate_in_use_connections_state (in_use_connections )
1168+ self ._validate_free_connections_state (
1169+ test_redis_client .connection_pool ,
1170+ new_address .split (":" )[0 ],
1171+ self .config .relax_timeout ,
1172+ should_be_connected_count = 1 ,
1173+ connected_to_tmp_addres = True ,
1174+ )
1175+ finally :
1176+ MockSocket .AFTER_MOVING_ADDRESS = orig_after_moving
1177+
1178+ # Wait for both MOVING timeouts to expire
1179+ sleep (MockSocket .MOVING_TIMEOUT + 0.5 )
1180+ self ._validate_conn_kwargs (
1181+ test_redis_client .connection_pool ,
1182+ MockSocket .DEFAULT_ADDRESS .split (":" )[0 ],
1183+ int (MockSocket .DEFAULT_ADDRESS .split (":" )[1 ]),
1184+ None ,
1185+ - 1 ,
1186+ )
1187+ finally :
1188+ if hasattr (test_redis_client .connection_pool , "disconnect" ):
1189+ test_redis_client .connection_pool .disconnect ()
1190+
1191+ @pytest .mark .parametrize ("pool_class" , [ConnectionPool , BlockingConnectionPool ])
1192+ def test_thread_safety_concurrent_event_handling (self , pool_class ):
1193+ """
1194+ Test thread-safety under concurrent maintenance event handling.
1195+ Simulates multiple threads triggering MOVING events and performing operations concurrently.
1196+ """
1197+ import threading
1198+
1199+ test_redis_client = self ._get_client (
1200+ pool_class , max_connections = 5 , setup_pool_handler = True
1201+ )
1202+ results = []
1203+ errors = []
1204+
1205+ def worker (idx ):
1206+ try :
1207+ key = f"key_receive_moving_{ idx } "
1208+ value = f"value3_{ idx } "
1209+ result = test_redis_client .set (key , value )
1210+ results .append (result )
1211+ except Exception as e :
1212+ errors .append (e )
1213+
1214+ threads = [threading .Thread (target = worker , args = (i ,)) for i in range (5 )]
1215+ for t in threads :
1216+ t .start ()
1217+ for t in threads :
1218+ t .join ()
1219+ assert all (results ), f"Not all threads succeeded: { results } "
1220+ assert not errors , f"Errors occurred in threads: { errors } "
1221+ # After all threads, MOVING event should have been handled safely
1222+ self ._validate_conn_kwargs (
1223+ test_redis_client .connection_pool ,
1224+ MockSocket .DEFAULT_ADDRESS .split (":" )[0 ],
1225+ int (MockSocket .DEFAULT_ADDRESS .split (":" )[1 ]),
1226+ MockSocket .AFTER_MOVING_ADDRESS .split (":" )[0 ],
1227+ self .config .relax_timeout ,
1228+ )
1229+ if hasattr (test_redis_client .connection_pool , "disconnect" ):
1230+ test_redis_client .connection_pool .disconnect ()
0 commit comments