4040import com .datastax .oss .driver .internal .core .session .BuiltInRequestProcessors ;
4141import com .datastax .oss .driver .internal .core .session .RequestProcessor ;
4242import com .datastax .oss .driver .internal .core .session .RequestProcessorRegistry ;
43+ import com .datastax .oss .driver .shaded .guava .common .cache .CacheBuilder ;
4344import com .datastax .oss .driver .shaded .guava .common .cache .RemovalListener ;
4445import com .datastax .oss .driver .shaded .guava .common .util .concurrent .Uninterruptibles ;
4546import com .google .common .collect .ImmutableList ;
7273@ Category (IsolatedTests .class )
7374public class PreparedStatementCachingIT {
7475
75- private static final Logger LOG = LoggerFactory .getLogger (PreparedStatementCachingIT .class );
76-
7776 private CustomCcmRule ccmRule = CustomCcmRule .builder ().build ();
7877
7978 private SessionRule <CqlSession > sessionRule =
@@ -123,30 +122,11 @@ private static RemovalListener<Object, Object> buildCacheRemoveCallback(
123122 @ NonNull Optional <DefaultDriverContext > context ) {
124123 return (evt ) -> {
125124 try {
126- LOG .error (
127- "Cache removal callback triggered, cause: {}, key: {}" , evt .getCause (), evt .getKey ());
128125 CompletableFuture <PreparedStatement > future =
129126 (CompletableFuture <PreparedStatement >) evt .getValue ();
130-
131- // Add more detailed logging about the future state
132- LOG .error (
133- "Future state - done: {}, cancelled: {}, completedExceptionally: {}" ,
134- future .isDone (),
135- future .isCancelled (),
136- future .isCompletedExceptionally ());
137-
138- if (future .isDone () && !future .isCompletedExceptionally () && !future .isCancelled ()) {
139- ByteBuffer queryId = Uninterruptibles .getUninterruptibly (future ).getId ();
140- LOG .error ("Firing PreparedStatementRemovalEvent for queryId: {}" , queryId );
141- context .ifPresent (
142- ctx -> {
143- LOG .error ("About to fire PreparedStatementRemovalEvent on event bus" );
144- ctx .getEventBus ().fire (new PreparedStatementRemovalEvent (queryId ));
145- LOG .error ("PreparedStatementRemovalEvent fired successfully" );
146- });
147- } else {
148- LOG .error ("Skipping removal event - future not in valid state for extraction" );
149- }
127+ ByteBuffer queryId = Uninterruptibles .getUninterruptibly (future ).getId ();
128+ context .ifPresent (
129+ ctx -> ctx .getEventBus ().fire (new PreparedStatementRemovalEvent (queryId )));
150130 } catch (Exception e ) {
151131 LOG .error ("Unable to register removal handler" , e );
152132 }
@@ -156,7 +136,8 @@ private static RemovalListener<Object, Object> buildCacheRemoveCallback(
156136 public TestCqlPrepareAsyncProcessor (@ NonNull Optional <DefaultDriverContext > context ) {
157137 // Default CqlPrepareAsyncProcessor uses weak values here as well. We avoid doing so
158138 // to prevent cache entries from unexpectedly disappearing mid-test.
159- super (context , builder -> builder .removalListener (buildCacheRemoveCallback (context )));
139+ // TODO: it was still weak value cuz it's only a decorator.
140+ super (context , CacheBuilder .newBuilder ().removalListener (buildCacheRemoveCallback (context )));
160141 }
161142 }
162143
@@ -211,83 +192,6 @@ public static SessionBuilder builder() {
211192 return new TestSessionBuilder ();
212193 }
213194
214- private void debugCacheInvalidation (CqlSession session , TypeChangeEvent event ) {
215- try {
216- DefaultDriverContext ctx = (DefaultDriverContext ) session .getContext ();
217- // Get the processor to check cache state
218- RequestProcessorRegistry registry = ctx .getRequestProcessorRegistry ();
219-
220- LOG .error (
221- "Debug: TypeChangeEvent received for type: {} (changeType: {})" ,
222- event .oldType .getName (),
223- event .changeType );
224- LOG .error ("Debug: Current cache size: {}" , getPreparedCacheSize (session ));
225-
226- // Force cache cleanup to trigger any pending removals
227- if (registry != null ) {
228- LOG .error ("Debug: Forcing cache cleanup..." );
229- // We can't directly access the cache from here, but we can log that we're trying
230- }
231- } catch (Exception e ) {
232- LOG .error ("Debug: Error during cache invalidation debugging" , e );
233- }
234- }
235-
236- private boolean waitForCacheRemovalWithCleanup (
237- CountDownLatch latch , CqlSession session , long timeout , TimeUnit unit ) {
238- long timeoutNanos = unit .toNanos (timeout );
239- long startTime = System .nanoTime ();
240- long remainingNanos = timeoutNanos ;
241-
242- LOG .error ("Starting cache removal wait, initial latch count: {}" , latch .getCount ());
243-
244- // First check if the latch is already at 0
245- if (latch .getCount () == 0 ) {
246- LOG .error ("Cache removal latch already at 0, returning success immediately" );
247- return true ;
248- }
249-
250- while (remainingNanos > 0 && latch .getCount () > 0 ) {
251- // Wait for a short period
252- long waitTime = Math .min (remainingNanos , TimeUnit .SECONDS .toNanos (5 ));
253- LOG .error (
254- "Waiting for cache removal latch, current count: {}, wait time: {}s" ,
255- latch .getCount (),
256- TimeUnit .NANOSECONDS .toSeconds (waitTime ));
257-
258- boolean success =
259- Uninterruptibles .awaitUninterruptibly (latch , waitTime , TimeUnit .NANOSECONDS );
260-
261- if (success ) {
262- LOG .error ("Cache removal latch triggered successfully, final count: {}" , latch .getCount ());
263- return true ;
264- }
265-
266- // If we haven't succeeded yet, try to force cache cleanup
267- LOG .error (
268- "Cache removal latch not triggered yet (count: {}), forcing cleanup. Current cache size: {}" ,
269- latch .getCount (),
270- getPreparedCacheSize (session ));
271-
272- try {
273- // Force garbage collection to help with weak references
274- System .gc ();
275- Thread .sleep (100 );
276- } catch (InterruptedException e ) {
277- Thread .currentThread ().interrupt ();
278- break ;
279- }
280-
281- remainingNanos = timeoutNanos - (System .nanoTime () - startTime );
282- }
283-
284- LOG .error (
285- "Cache removal latch failed to trigger within timeout. Final latch count: {}, cache size: {}" ,
286- latch .getCount (),
287- getPreparedCacheSize (session ));
288- return latch .getCount () == 0 ; // Return true if latch reached 0 even if await timed out
289- }
290-
291195 private void invalidationResultSetTest (
292196 Consumer <CqlSession > setupTestSchema , Set <String > expectedChangedTypes ) {
293197 invalidationTestInner (
@@ -317,18 +221,13 @@ private void invalidationTestInner(
317221
318222 try (CqlSession session = sessionWithCacheSizeMetric ()) {
319223
320- // Ensure we start with a clean cache
321- long initialCacheSize = getPreparedCacheSize (session );
322- LOG .error ("Starting test with cache size: {}" , initialCacheSize );
323- assertThat (initialCacheSize ).isEqualTo (0 );
324-
325- // Force garbage collection to ensure clean state
326- System .gc ();
327- Uninterruptibles .sleepUninterruptibly (100 , TimeUnit .MILLISECONDS );
328-
224+ assertThat (getPreparedCacheSize (session )).isEqualTo (0 );
329225 setupTestSchema .accept (session );
330226
331- // Set up event handlers BEFORE creating prepared statements to avoid race conditions
227+ session .prepare (preparedStmtQueryType1 );
228+ ByteBuffer queryId2 = session .prepare (preparedStmtQueryType2 ).getId ();
229+ assertThat (getPreparedCacheSize (session )).isEqualTo (2 );
230+
332231 CountDownLatch preparedStmtCacheRemoveLatch = new CountDownLatch (1 );
333232 CountDownLatch typeChangeEventLatch = new CountDownLatch (expectedChangedTypes .size ());
334233
@@ -340,21 +239,11 @@ private void invalidationTestInner(
340239 new AtomicReference <>(Optional .empty ());
341240 AtomicReference <Optional <String >> removedQueryEventError =
342241 new AtomicReference <>(Optional .empty ());
343-
344- LOG .error ("Registering event handlers before creating prepared statements" );
345242 ctx .getEventBus ()
346243 .register (
347244 TypeChangeEvent .class ,
348245 (e ) -> {
349246 // expect one event per type changed and for every parent type that nests it
350- LOG .error (
351- "Received TypeChangeEvent for type: {} (changeType: {})" ,
352- e .oldType .getName (),
353- e .changeType );
354-
355- // Add detailed debugging for cache invalidation
356- debugCacheInvalidation (session , e );
357-
358247 if (Boolean .TRUE .equals (
359248 changedTypes .putIfAbsent (e .oldType .getName ().toString (), true ))) {
360249 // store an error if we see duplicate change event
@@ -367,85 +256,27 @@ private void invalidationTestInner(
367256 .register (
368257 PreparedStatementRemovalEvent .class ,
369258 (e ) -> {
370- LOG .error ("Received PreparedStatementRemovalEvent for queryId: {}" , e .queryId );
371259 if (!removedQueryIds .compareAndSet (Optional .empty (), Optional .of (e .queryId ))) {
372260 // store an error if we see multiple cache invalidation events
373261 // any non-empty error will fail the test so it's OK to do this multiple times
374262 removedQueryEventError .set (
375263 Optional .of ("Unable to set reference for PS removal event" ));
376- LOG .warn (
377- "Multiple PreparedStatementRemovalEvents received, ignoring subsequent ones" );
378264 }
379- LOG .error (
380- "About to countdown preparedStmtCacheRemoveLatch, current count: {}" ,
381- preparedStmtCacheRemoveLatch .getCount ());
382265 preparedStmtCacheRemoveLatch .countDown ();
383- LOG .error (
384- "Countdown completed, new count: {}" , preparedStmtCacheRemoveLatch .getCount ());
385266 });
386267
387- // Now create the prepared statements
388- PreparedStatement stmt1 = session .prepare (preparedStmtQueryType1 );
389- PreparedStatement stmt2 = session .prepare (preparedStmtQueryType2 );
390- ByteBuffer queryId2 = stmt2 .getId ();
391- assertThat (getPreparedCacheSize (session )).isEqualTo (2 );
392-
393- LOG .error ("Prepared statements in cache:" );
394- LOG .error (" Statement 1: {} (queryId: {})" , preparedStmtQueryType1 , stmt1 .getId ());
395- LOG .error (" Statement 2: {} (queryId: {})" , preparedStmtQueryType2 , stmt2 .getId ());
396-
397268 // alter test_type_caching_2 to trigger cache invalidation and above events
398- LOG .error ("Executing ALTER TYPE test_type_caching_2 add i blob" );
399- LOG .error ("Expected to invalidate statement 2 (queryId: {}) due to type change" , queryId2 );
400269 session .execute ("ALTER TYPE test_type_caching_2 add i blob" );
401270
402- // Give a longer delay to allow the schema change to propagate before checking agreement
403- LOG .error ("Waiting for schema change to propagate..." );
404- Uninterruptibles .sleepUninterruptibly (500 , TimeUnit .MILLISECONDS );
405271 session .checkSchemaAgreement ();
406272
407273 // wait for latches and fail if they don't reach zero before timeout
408- // Use longer timeout for cache removal as it depends on complex event chain
409- boolean typeChangeSuccess =
410- Uninterruptibles .awaitUninterruptibly (typeChangeEventLatch , 60 , TimeUnit .SECONDS );
411-
412- // For cache removal, use a more robust waiting mechanism with periodic cleanup
413- boolean cacheRemovalSuccess =
414- waitForCacheRemovalWithCleanup (
415- preparedStmtCacheRemoveLatch , session , 180 , TimeUnit .SECONDS );
416-
417- // Provide detailed diagnostics if either latch fails
418- if (!cacheRemovalSuccess || !typeChangeSuccess ) {
419- String diagnostics =
420- String .format (
421- "Test failure diagnostics:\n "
422- + " - Cache removal latch success: %s (count: %d)\n "
423- + " - Type change latch success: %s (count: %d)\n "
424- + " - Current cache size: %d\n "
425- + " - Expected changed types: %s\n "
426- + " - Actual changed types detected: %s\n "
427- + " - Expected removed query ID: %s\n "
428- + " - Actual removed query IDs: %s\n "
429- + " - Type change errors: %s\n "
430- + " - Removal event errors: %s" ,
431- cacheRemovalSuccess ,
432- preparedStmtCacheRemoveLatch .getCount (),
433- typeChangeSuccess ,
434- typeChangeEventLatch .getCount (),
435- getPreparedCacheSize (session ),
436- expectedChangedTypes ,
437- changedTypes .keySet (),
438- queryId2 ,
439- removedQueryIds .get (),
440- typeChangeEventError .get (),
441- removedQueryEventError .get ());
442- LOG .error ("Prepared statement cache invalidation test failed: {}" , diagnostics );
443- }
444-
445- assertThat (cacheRemovalSuccess )
274+ assertThat (
275+ Uninterruptibles .awaitUninterruptibly (
276+ preparedStmtCacheRemoveLatch , 120 , TimeUnit .SECONDS ))
446277 .withFailMessage ("preparedStmtCacheRemoveLatch did not trigger before timeout" )
447278 .isTrue ();
448- assertThat (typeChangeSuccess )
279+ assertThat (Uninterruptibles . awaitUninterruptibly ( typeChangeEventLatch , 20 , TimeUnit . SECONDS ) )
449280 .withFailMessage ("typeChangeEventLatch did not trigger before timeout" )
450281 .isTrue ();
451282
0 commit comments