|
1 | 1 | import io |
2 | 2 | import time |
3 | 3 | import unittest |
| 4 | +from random import randbytes, shuffle |
4 | 5 |
|
5 | 6 | from pymysqlreplication.json_binary import JsonDiff, JsonDiffOperation |
6 | 7 | from pymysqlreplication.tests import base |
@@ -420,6 +421,44 @@ def test_minimal_image_update_row_event(self): |
420 | 421 | self.assertEqual(event.rows[0]["after_values"]["id"], None) |
421 | 422 | self.assertEqual(event.rows[0]["after_values"]["data"], "World") |
422 | 423 |
|
| 424 | + def test_charset_parsing(self): |
| 425 | + char_columns = { |
| 426 | + f"c_utf8_{i}": ("VARCHAR(255) NOT NULL", "Hello") |
| 427 | + for i in range(3) |
| 428 | + } |
| 429 | + char_columns["c_binary"] = ("LONGBLOB NOT NULL", randbytes(1024)) |
| 430 | + int_columns = {f"i_{i}": ("INTEGER NOT NULL", i) for i in range(3)} |
| 431 | + columns = list({**char_columns, **int_columns}.items()) |
| 432 | + shuffle(columns) |
| 433 | + column_types = ", ".join(f"{name} {type}" for name, (type, _) in columns) |
| 434 | + column_names = ", ".join(name for name, _ in columns) |
| 435 | + column_value_placeholders = ", ".join("%s" for _ in range(len(columns))) |
| 436 | + column_values = [value for _, (_, value) in columns] |
| 437 | + query = f"CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, {column_types}, PRIMARY KEY (id)) DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;" |
| 438 | + self.execute(query) |
| 439 | + query = f"INSERT INTO test ({column_names}) VALUES({column_value_placeholders})" |
| 440 | + self.execute_with_args(query, column_values) |
| 441 | + self.execute("COMMIT") |
| 442 | + |
| 443 | + self.assertIsInstance(self.stream.fetchone(), RotateEvent) |
| 444 | + self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent) |
| 445 | + # QueryEvent for the Create Table |
| 446 | + self.assertIsInstance(self.stream.fetchone(), QueryEvent) |
| 447 | + # QueryEvent for the BEGIN |
| 448 | + self.assertIsInstance(self.stream.fetchone(), QueryEvent) |
| 449 | + |
| 450 | + event = self.stream.fetchone() |
| 451 | + self.assertIsInstance(event, TableMapEvent) |
| 452 | + if event.table_map[event.table_id].column_name_flag: |
| 453 | + columns = {c.name: c for c in event.columns} |
| 454 | + for name, column in columns.items(): |
| 455 | + if name.startswith("c_utf8_"): |
| 456 | + assert column.character_set_name == "utf8" |
| 457 | + assert column.collation_name == "utf8mb3_unicode_ci" |
| 458 | + assert columns["c_binary"].character_set_name == "binary" |
| 459 | + assert columns["c_binary"].collation_name == "binary" |
| 460 | + |
| 461 | + |
423 | 462 | def test_log_pos(self): |
424 | 463 | query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" |
425 | 464 | self.execute(query) |
|
0 commit comments