@@ -123,13 +123,30 @@ private static RemovalListener<Object, Object> buildCacheRemoveCallback(
123123 @ NonNull Optional <DefaultDriverContext > context ) {
124124 return (evt ) -> {
125125 try {
126- LOG .error ("Cache removal callback triggered, cause: {}" , evt .getCause ());
126+ LOG .error (
127+ "Cache removal callback triggered, cause: {}, key: {}" , evt .getCause (), evt .getKey ());
127128 CompletableFuture <PreparedStatement > future =
128129 (CompletableFuture <PreparedStatement >) evt .getValue ();
129- ByteBuffer queryId = Uninterruptibles .getUninterruptibly (future ).getId ();
130- LOG .error ("Firing PreparedStatementRemovalEvent for queryId: {}" , queryId );
131- context .ifPresent (
132- ctx -> ctx .getEventBus ().fire (new PreparedStatementRemovalEvent (queryId )));
130+
131+ // Add more detailed logging about the future state
132+ LOG .error (
133+ "Future state - done: {}, cancelled: {}, completedExceptionally: {}" ,
134+ future .isDone (),
135+ future .isCancelled (),
136+ future .isCompletedExceptionally ());
137+
138+ if (future .isDone () && !future .isCompletedExceptionally () && !future .isCancelled ()) {
139+ ByteBuffer queryId = Uninterruptibles .getUninterruptibly (future ).getId ();
140+ LOG .error ("Firing PreparedStatementRemovalEvent for queryId: {}" , queryId );
141+ context .ifPresent (
142+ ctx -> {
143+ LOG .error ("About to fire PreparedStatementRemovalEvent on event bus" );
144+ ctx .getEventBus ().fire (new PreparedStatementRemovalEvent (queryId ));
145+ LOG .error ("PreparedStatementRemovalEvent fired successfully" );
146+ });
147+ } else {
148+ LOG .error ("Skipping removal event - future not in valid state for extraction" );
149+ }
133150 } catch (Exception e ) {
134151 LOG .error ("Unable to register removal handler" , e );
135152 }
@@ -194,6 +211,68 @@ public static SessionBuilder builder() {
194211 return new TestSessionBuilder ();
195212 }
196213
214+ private void debugCacheInvalidation (CqlSession session , TypeChangeEvent event ) {
215+ try {
216+ DefaultDriverContext ctx = (DefaultDriverContext ) session .getContext ();
217+ // Get the processor to check cache state
218+ RequestProcessorRegistry registry = ctx .getRequestProcessorRegistry ();
219+
220+ LOG .error (
221+ "Debug: TypeChangeEvent received for type: {} (changeType: {})" ,
222+ event .oldType .getName (),
223+ event .changeType );
224+ LOG .error ("Debug: Current cache size: {}" , getPreparedCacheSize (session ));
225+
226+ // Force cache cleanup to trigger any pending removals
227+ if (registry != null ) {
228+ LOG .error ("Debug: Forcing cache cleanup..." );
229+ // We can't directly access the cache from here, but we can log that we're trying
230+ }
231+ } catch (Exception e ) {
232+ LOG .error ("Debug: Error during cache invalidation debugging" , e );
233+ }
234+ }
235+
236+ private boolean waitForCacheRemovalWithCleanup (
237+ CountDownLatch latch , CqlSession session , long timeout , TimeUnit unit ) {
238+ long timeoutNanos = unit .toNanos (timeout );
239+ long startTime = System .nanoTime ();
240+ long remainingNanos = timeoutNanos ;
241+
242+ while (remainingNanos > 0 && latch .getCount () > 0 ) {
243+ // Wait for a short period
244+ long waitTime = Math .min (remainingNanos , TimeUnit .SECONDS .toNanos (5 ));
245+ boolean success =
246+ Uninterruptibles .awaitUninterruptibly (latch , waitTime , TimeUnit .NANOSECONDS );
247+
248+ if (success ) {
249+ LOG .error ("Cache removal latch triggered successfully" );
250+ return true ;
251+ }
252+
253+ // If we haven't succeeded yet, try to force cache cleanup
254+ LOG .error (
255+ "Cache removal latch not triggered yet, forcing cleanup. Current cache size: {}" ,
256+ getPreparedCacheSize (session ));
257+
258+ try {
259+ // Force garbage collection to help with weak references
260+ System .gc ();
261+ Thread .sleep (100 );
262+ } catch (InterruptedException e ) {
263+ Thread .currentThread ().interrupt ();
264+ break ;
265+ }
266+
267+ remainingNanos = timeoutNanos - (System .nanoTime () - startTime );
268+ }
269+
270+ LOG .error (
271+ "Cache removal latch failed to trigger within timeout. Final cache size: {}" ,
272+ getPreparedCacheSize (session ));
273+ return false ;
274+ }
275+
197276 private void invalidationResultSetTest (
198277 Consumer <CqlSession > setupTestSchema , Set <String > expectedChangedTypes ) {
199278 invalidationTestInner (
@@ -223,7 +302,15 @@ private void invalidationTestInner(
223302
224303 try (CqlSession session = sessionWithCacheSizeMetric ()) {
225304
226- assertThat (getPreparedCacheSize (session )).isEqualTo (0 );
305+ // Ensure we start with a clean cache
306+ long initialCacheSize = getPreparedCacheSize (session );
307+ LOG .error ("Starting test with cache size: {}" , initialCacheSize );
308+ assertThat (initialCacheSize ).isEqualTo (0 );
309+
310+ // Force garbage collection to ensure clean state
311+ System .gc ();
312+ Uninterruptibles .sleepUninterruptibly (100 , TimeUnit .MILLISECONDS );
313+
227314 setupTestSchema .accept (session );
228315
229316 PreparedStatement stmt1 = session .prepare (preparedStmtQueryType1 );
@@ -255,6 +342,10 @@ private void invalidationTestInner(
255342 "Received TypeChangeEvent for type: {} (changeType: {})" ,
256343 e .oldType .getName (),
257344 e .changeType );
345+
346+ // Add detailed debugging for cache invalidation
347+ debugCacheInvalidation (session , e );
348+
258349 if (Boolean .TRUE .equals (
259350 changedTypes .putIfAbsent (e .oldType .getName ().toString (), true ))) {
260351 // store an error if we see duplicate change event
@@ -284,18 +375,25 @@ private void invalidationTestInner(
284375 LOG .error ("Expected to invalidate statement 2 (queryId: {}) due to type change" , queryId2 );
285376 session .execute ("ALTER TYPE test_type_caching_2 add i blob" );
286377
287- // Give a small delay to allow the schema change to propagate before checking agreement
288- Uninterruptibles .sleepUninterruptibly (100 , TimeUnit .MILLISECONDS );
378+ // Give a longer delay to allow the schema change to propagate before checking agreement
379+ LOG .error ("Waiting for schema change to propagate..." );
380+ Uninterruptibles .sleepUninterruptibly (500 , TimeUnit .MILLISECONDS );
289381 session .checkSchemaAgreement ();
290382
383+ // Additional delay to allow event processing to complete
384+ LOG .error ("Waiting for event processing to complete..." );
385+ Uninterruptibles .sleepUninterruptibly (1000 , TimeUnit .MILLISECONDS );
386+
291387 // wait for latches and fail if they don't reach zero before timeout
292388 // Use longer timeout for cache removal as it depends on complex event chain
293- boolean cacheRemovalSuccess =
294- Uninterruptibles .awaitUninterruptibly (
295- preparedStmtCacheRemoveLatch , 180 , TimeUnit .SECONDS );
296389 boolean typeChangeSuccess =
297390 Uninterruptibles .awaitUninterruptibly (typeChangeEventLatch , 60 , TimeUnit .SECONDS );
298391
392+ // For cache removal, use a more robust waiting mechanism with periodic cleanup
393+ boolean cacheRemovalSuccess =
394+ waitForCacheRemovalWithCleanup (
395+ preparedStmtCacheRemoveLatch , session , 180 , TimeUnit .SECONDS );
396+
299397 // Provide detailed diagnostics if either latch fails
300398 if (!cacheRemovalSuccess || !typeChangeSuccess ) {
301399 String diagnostics =
0 commit comments