1313
1414
1515class TestCacheInvalidation (tb .ConnectedTestCase ):
16+
17+ def _get_cached_statements (self , connection = None ):
18+ if connection is None :
19+ connection = self .con
20+ return list (connection ._stmt_cache .iter_statements ())
21+
22+ def _check_statements_are_not_closed (self , statements ):
23+ self .assertGreater (len (statements ), 0 )
24+ self .assertTrue (all (not s .closed for s in statements ))
25+
26+ def _check_statements_are_closed (self , statements ):
27+ self .assertGreater (len (statements ), 0 )
28+ self .assertTrue (all (s .closed for s in statements ))
29+
1630 async def test_prepare_cache_invalidation_silent (self ):
1731 await self .con .execute ('CREATE TABLE tab1(a int, b int)' )
1832
@@ -21,11 +35,16 @@ async def test_prepare_cache_invalidation_silent(self):
2135 result = await self .con .fetchrow ('SELECT * FROM tab1' )
2236 self .assertEqual (result , (1 , 2 ))
2337
38+ statements = self ._get_cached_statements ()
39+ self ._check_statements_are_not_closed (statements )
40+
2441 await self .con .execute (
2542 'ALTER TABLE tab1 ALTER COLUMN b SET DATA TYPE text' )
2643
2744 result = await self .con .fetchrow ('SELECT * FROM tab1' )
2845 self .assertEqual (result , (1 , '2' ))
46+
47+ self ._check_statements_are_closed (statements )
2948 finally :
3049 await self .con .execute ('DROP TABLE tab1' )
3150
@@ -37,6 +56,9 @@ async def test_prepare_cache_invalidation_in_transaction(self):
3756 result = await self .con .fetchrow ('SELECT * FROM tab1' )
3857 self .assertEqual (result , (1 , 2 ))
3958
59+ statements = self ._get_cached_statements ()
60+ self ._check_statements_are_not_closed (statements )
61+
4062 await self .con .execute (
4163 'ALTER TABLE tab1 ALTER COLUMN b SET DATA TYPE text' )
4264
@@ -45,6 +67,8 @@ async def test_prepare_cache_invalidation_in_transaction(self):
4567 async with self .con .transaction ():
4668 result = await self .con .fetchrow ('SELECT * FROM tab1' )
4769
70+ self ._check_statements_are_closed (statements )
71+
4872 # This is now OK,
4973 result = await self .con .fetchrow ('SELECT * FROM tab1' )
5074 self .assertEqual (result , (1 , '2' ))
@@ -69,6 +93,12 @@ async def test_prepare_cache_invalidation_in_pool(self):
6993 result = await con2 .fetchrow ('SELECT * FROM tab1' )
7094 self .assertEqual (result , (1 , 2 ))
7195
96+ statements1 = self ._get_cached_statements (con1 )
97+ self ._check_statements_are_not_closed (statements1 )
98+
99+ statements2 = self ._get_cached_statements (con2 )
100+ self ._check_statements_are_not_closed (statements2 )
101+
72102 await self .con .execute (
73103 'ALTER TABLE tab1 ALTER COLUMN b SET DATA TYPE text' )
74104
@@ -77,6 +107,9 @@ async def test_prepare_cache_invalidation_in_pool(self):
77107 result = await con1 .fetchrow ('SELECT * FROM tab1' )
78108 self .assertEqual (result , (1 , '2' ))
79109
110+ self ._check_statements_are_closed (statements1 )
111+ self ._check_statements_are_closed (statements2 )
112+
80113 async with con2 .transaction ():
81114 # This should work, as con1 should have invalidated
82115 # the plan cache.
@@ -98,11 +131,17 @@ async def test_type_cache_invalidation_in_transaction(self):
98131 result = await self .con .fetchrow ('SELECT * FROM tab1' )
99132 self .assertEqual (result , (1 , (2 , 3 )))
100133
134+ statements = self ._get_cached_statements ()
135+ self ._check_statements_are_not_closed (statements )
136+
101137 async with self .con .transaction ():
102138 await self .con .execute ('ALTER TYPE typ1 ADD ATTRIBUTE c text' )
103139 with self .assertRaisesRegex (
104140 asyncpg .OutdatedSchemaCacheError , ERRNUM ):
105141 await self .con .fetchrow ('SELECT * FROM tab1' )
142+
143+ self ._check_statements_are_closed (statements )
144+
106145 # The second request must be correct (cache was dropped):
107146 result = await self .con .fetchrow ('SELECT * FROM tab1' )
108147 self .assertEqual (result , (1 , (2 , 3 , None )))
@@ -123,13 +162,19 @@ async def test_type_cache_invalidation_in_cancelled_transaction(self):
123162 result = await self .con .fetchrow ('SELECT * FROM tab1' )
124163 self .assertEqual (result , (1 , (2 , 3 )))
125164
165+ statements = self ._get_cached_statements ()
166+ self ._check_statements_are_not_closed (statements )
167+
126168 try :
127169 async with self .con .transaction ():
128170 await self .con .execute (
129171 'ALTER TYPE typ1 ADD ATTRIBUTE c text' )
130172 with self .assertRaisesRegex (
131173 asyncpg .OutdatedSchemaCacheError , ERRNUM ):
132174 await self .con .fetchrow ('SELECT * FROM tab1' )
175+
176+ self ._check_statements_are_closed (statements )
177+
133178 # The second request must be correct (cache was dropped):
134179 result = await self .con .fetchrow ('SELECT * FROM tab1' )
135180 self .assertEqual (result , (1 , (2 , 3 , None )))
@@ -158,13 +203,19 @@ async def test_prepared_type_cache_invalidation(self):
158203 result = await prep .fetchrow ()
159204 self .assertEqual (result , (1 , (2 , 3 )))
160205
206+ statements = self ._get_cached_statements ()
207+ self ._check_statements_are_not_closed (statements )
208+
161209 try :
162210 async with self .con .transaction ():
163211 await self .con .execute (
164212 'ALTER TYPE typ1 ADD ATTRIBUTE c text' )
165213 with self .assertRaisesRegex (
166214 asyncpg .OutdatedSchemaCacheError , ERRNUM ):
167215 await prep .fetchrow ()
216+
217+ self ._check_statements_are_closed (statements )
218+
168219 # PS has its local cache for types codecs, even after the
169220 # cache cleanup it is not possible to use it.
170221 # That's why it is marked as closed.
@@ -206,11 +257,16 @@ async def test_type_cache_invalidation_on_drop_type_attr(self):
206257 result = await self .con .fetchrow ('SELECT * FROM tab1' )
207258 self .assertEqual (result , (1 , (2 , 3 , 'x' )))
208259
260+ statements = self ._get_cached_statements ()
261+ self ._check_statements_are_not_closed (statements )
262+
209263 await self .con .execute ('ALTER TYPE typ1 DROP ATTRIBUTE x' )
210264 with self .assertRaisesRegex (
211265 asyncpg .OutdatedSchemaCacheError , ERRNUM ):
212266 await self .con .fetchrow ('SELECT * FROM tab1' )
213267
268+ self ._check_statements_are_closed (statements )
269+
214270 # This is now OK, the cache is filled after being dropped.
215271 result = await self .con .fetchrow ('SELECT * FROM tab1' )
216272 self .assertEqual (result , (1 , (3 , 'x' )))
@@ -228,6 +284,9 @@ async def test_type_cache_invalidation_on_change_attr(self):
228284 result = await self .con .fetchrow ('SELECT * FROM tab1' )
229285 self .assertEqual (result , (1 , (2 , 3 )))
230286
287+ statements = self ._get_cached_statements ()
288+ self ._check_statements_are_not_closed (statements )
289+
231290 # It is slightly artificial, but can take place in transactional
232291 # schema changing. Nevertheless, if the code checks and raises it
233292 # the most probable reason is a difference with the cache type.
@@ -237,6 +296,8 @@ async def test_type_cache_invalidation_on_change_attr(self):
237296 asyncpg .OutdatedSchemaCacheError , ERRTYP ):
238297 await self .con .fetchrow ('SELECT * FROM tab1' )
239298
299+ self ._check_statements_are_closed (statements )
300+
240301 # This is now OK, the cache is filled after being dropped.
241302 result = await self .con .fetchrow ('SELECT * FROM tab1' )
242303 self .assertEqual (result , (1 , (2 , None )))
@@ -265,9 +326,15 @@ async def test_type_cache_invalidation_in_pool(self):
265326 result = await con1 .fetchrow ('SELECT * FROM tab1' )
266327 self .assertEqual (result , (1 , (2 , 3 )))
267328
329+ statements1 = self ._get_cached_statements (con1 )
330+ self ._check_statements_are_not_closed (statements1 )
331+
268332 result = await con2 .fetchrow ('SELECT * FROM tab1' )
269333 self .assertEqual (result , (1 , (2 , 3 )))
270334
335+ statements2 = self ._get_cached_statements (con2 )
336+ self ._check_statements_are_not_closed (statements2 )
337+
271338 # Create the same schema in the "testdb", fetch data which caches
272339 # type info.
273340 con_chk = await pool_chk .acquire ()
@@ -277,6 +344,9 @@ async def test_type_cache_invalidation_in_pool(self):
277344 result = await con_chk .fetchrow ('SELECT * FROM tab1' )
278345 self .assertEqual (result , (1 , (2 , 3 )))
279346
347+ statements_chk = self ._get_cached_statements (con_chk )
348+ self ._check_statements_are_not_closed (statements_chk )
349+
280350 # Change schema in the databases.
281351 await self .con .execute ('ALTER TYPE typ1 ADD ATTRIBUTE c text' )
282352 await con_chk .execute ('ALTER TYPE typ1 ADD ATTRIBUTE c text' )
@@ -287,6 +357,9 @@ async def test_type_cache_invalidation_in_pool(self):
287357 asyncpg .OutdatedSchemaCacheError , ERRNUM ):
288358 await con1 .fetchrow ('SELECT * FROM tab1' )
289359
360+ self ._check_statements_are_closed (statements1 )
361+ self ._check_statements_are_closed (statements2 )
362+
290363 async with con2 .transaction ():
291364 # This should work, as con1 should have invalidated all caches.
292365 result = await con2 .fetchrow ('SELECT * FROM tab1' )
@@ -298,10 +371,14 @@ async def test_type_cache_invalidation_in_pool(self):
298371
299372 # Check the invalidation is database-specific, i.e. cache entries
300373 # for pool_chk/con_chk was not dropped via pool/con1.
374+
375+ self ._check_statements_are_not_closed (statements_chk )
376+
301377 with self .assertRaisesRegex (
302378 asyncpg .OutdatedSchemaCacheError , ERRNUM ):
303379 await con_chk .fetchrow ('SELECT * FROM tab1' )
304380
381+ self ._check_statements_are_closed (statements_chk )
305382 finally :
306383 await self .con .execute ('DROP TABLE tab1' )
307384 await self .con .execute ('DROP TYPE typ1' )
0 commit comments