|
1 | 1 | import io |
2 | 2 | import time |
3 | 3 | import unittest |
| 4 | +from unittest.mock import patch |
| 5 | + |
| 6 | +from pymysql.protocol import MysqlPacket |
4 | 7 |
|
5 | | -from pymysqlreplication.json_binary import JsonDiff, JsonDiffOperation |
6 | | -from pymysqlreplication.tests import base |
7 | 8 | from pymysqlreplication import BinLogStreamReader |
8 | | -from pymysqlreplication.gtid import GtidSet, Gtid |
9 | | -from pymysqlreplication.event import * |
10 | 9 | from pymysqlreplication.constants.BINLOG import * |
11 | 10 | from pymysqlreplication.constants.NONE_SOURCE import * |
12 | | -from pymysqlreplication.row_event import * |
| 11 | +from pymysqlreplication.event import * |
| 12 | +from pymysqlreplication.gtid import Gtid, GtidSet |
| 13 | +from pymysqlreplication.json_binary import JsonDiff, JsonDiffOperation |
13 | 14 | from pymysqlreplication.packet import BinLogPacketWrapper |
14 | | -from pymysql.protocol import MysqlPacket |
15 | | -from unittest.mock import patch |
16 | | - |
| 15 | +from pymysqlreplication.row_event import * |
| 16 | +from pymysqlreplication.tests import base |
17 | 17 |
|
18 | 18 | __all__ = [ |
19 | 19 | "TestBasicBinLogStreamReader", |
@@ -271,6 +271,87 @@ def test_write_row_event(self): |
271 | 271 | self.assertEqual(event.rows[0]["values"]["data"], "Hello World") |
272 | 272 | self.assertEqual(event.columns[1].name, "data") |
273 | 273 |
|
| 274 | + def test_fetch_column_names_from_schema(self): |
| 275 | + # This test is for scenarios where column names are NOT in the binlog |
| 276 | + # (MySQL 5.7 or older, or MySQL 8.0+ with binlog_row_metadata=MINIMAL) |
| 277 | + |
| 278 | + # Check if binlog_row_metadata exists (MySQL 8.0+) |
| 279 | + try: |
| 280 | + cursor = self.execute("SHOW GLOBAL VARIABLES LIKE 'binlog_row_metadata'") |
| 281 | + result = cursor.fetchone() |
| 282 | + if result: |
| 283 | + global_binlog_row_metadata = result[1] |
| 284 | + if global_binlog_row_metadata == 'FULL': |
| 285 | + self.skipTest("binlog_row_metadata is FULL globally, use_column_name_cache is not needed") |
| 286 | + # If result is None, binlog_row_metadata doesn't exist (MySQL 5.7 or older), so proceed |
| 287 | + except pymysql.err.OperationalError as e: |
| 288 | + if e.args[0] == 1193: # ER_UNKNOWN_SYSTEM_VARIABLE |
| 289 | + # Variable doesn't exist, likely MySQL 5.7 or older, so proceed |
| 290 | + pass |
| 291 | + else: |
| 292 | + raise |
| 293 | + |
| 294 | + query = "CREATE TABLE test_column_cache (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" |
| 295 | + self.execute(query) |
| 296 | + self.execute("INSERT INTO test_column_cache (data) VALUES('Hello')") |
| 297 | + self.execute("COMMIT") |
| 298 | + |
| 299 | + # Test with use_column_name_cache = True |
| 300 | + self.stream.close() |
| 301 | + self.stream = BinLogStreamReader( |
| 302 | + self.database, |
| 303 | + server_id=1024, |
| 304 | + use_column_name_cache=True, |
| 305 | + only_events=[WriteRowsEvent], |
| 306 | + ) |
| 307 | + |
| 308 | + event = self.stream.fetchone() |
| 309 | + self.assertIsInstance(event, WriteRowsEvent) |
| 310 | + self.assertEqual(event.table, "test_column_cache") |
| 311 | + self.assertIn("id", event.rows[0]["values"]) |
| 312 | + self.assertIn("data", event.rows[0]["values"]) |
| 313 | + self.assertEqual(event.rows[0]["values"]["id"], 1) |
| 314 | + self.assertEqual(event.rows[0]["values"]["data"], "Hello") |
| 315 | + |
| 316 | + # Test with use_column_name_cache = False |
| 317 | + self.stream.close() |
| 318 | + |
| 319 | + # Clear cache before next run |
| 320 | + from pymysqlreplication import row_event |
| 321 | + row_event._COLUMN_NAME_CACHE.clear() |
| 322 | + |
| 323 | + self.stream = BinLogStreamReader( |
| 324 | + self.database, |
| 325 | + server_id=1025, # different server_id to avoid caching issues |
| 326 | + use_column_name_cache=False, |
| 327 | + only_events=[WriteRowsEvent], |
| 328 | + ) |
| 329 | + |
| 330 | + # Reset and replay events |
| 331 | + self.resetBinLog() |
| 332 | + self.execute("INSERT INTO test_column_cache (data) VALUES('World')") |
| 333 | + self.execute("COMMIT") |
| 334 | + |
| 335 | + # Skip RotateEvent and FormatDescriptionEvent |
| 336 | + self.stream.fetchone() |
| 337 | + self.stream.fetchone() |
| 338 | + # Skip QueryEvent for BEGIN |
| 339 | + if not self.isMariaDB(): |
| 340 | + self.stream.fetchone() |
| 341 | + # Skip TableMapEvent |
| 342 | + self.stream.fetchone() |
| 343 | + |
| 344 | + event = self.stream.fetchone() |
| 345 | + self.assertIsInstance(event, WriteRowsEvent) |
| 346 | + self.assertEqual(event.table, "test_column_cache") |
| 347 | + # With cache disabled, we should not have column names |
| 348 | + self.assertNotIn("id", event.rows[0]["values"]) |
| 349 | + self.assertNotIn("data", event.rows[0]["values"]) |
| 350 | + |
| 351 | + # cleanup |
| 352 | + row_event._COLUMN_NAME_CACHE.clear() |
| 353 | + |
| 354 | + |
274 | 355 | def test_delete_row_event(self): |
275 | 356 | query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" |
276 | 357 | self.execute(query) |
|
0 commit comments