diff --git a/mysql_ch_replicator/binlog_replicator.py b/mysql_ch_replicator/binlog_replicator.py
index 184737c..4671e2f 100644
--- a/mysql_ch_replicator/binlog_replicator.py
+++ b/mysql_ch_replicator/binlog_replicator.py
@@ -6,10 +6,12 @@
 import json
 import random
 import re
+import pymysql
 
 from enum import Enum
 from logging import getLogger
 from dataclasses import dataclass
+from collections import OrderedDict
 
 from pymysql.err import OperationalError
@@ -252,15 +254,15 @@ def __init__(self, replicator_settings: BinlogReplicatorSettings):
         self.records_per_file = replicator_settings.records_per_file
         self.db_file_writers: dict = {}  # db_name => FileWriter
 
-    def store_event(self, log_event: LogEvent):
+    def store_event(self, log_event: LogEvent, allow_create_new_file: bool = True):
         logger.debug(f'store event {log_event.transaction_id}')
-        file_writer = self.get_or_create_file_writer(log_event.db_name)
+        file_writer = self.get_or_create_file_writer(log_event.db_name, allow_create_new_file)
         file_writer.write_event(log_event)
 
-    def get_or_create_file_writer(self, db_name: str) -> FileWriter:
+    def get_or_create_file_writer(self, db_name: str, allow_create_new_file: bool) -> FileWriter:
         file_writer = self.db_file_writers.get(db_name)
         if file_writer is not None:
-            if file_writer.num_records >= self.records_per_file:
+            if file_writer.num_records >= self.records_per_file and allow_create_new_file:
                 file_writer.close()
                 del self.db_file_writers[db_name]
                 file_writer = None
@@ -359,7 +361,10 @@ def __init__(self, settings: Settings):
 
         log_file, log_pos = None, None
         if self.state.prev_last_seen_transaction:
-            log_file, log_pos = self.state.prev_last_seen_transaction
+            log_file, log_pos = self.state.prev_last_seen_transaction[:2]
+
+        is_mariadb = self._detect_mariadb(mysql_settings)
+        logger.info(f'detected database type: {"MariaDB" if is_mariadb else "MySQL"}')
 
         self.stream = BinLogStreamReader(
             connection_settings=mysql_settings,
@@ -369,10 +374,26 @@ def __init__(self, settings: Settings):
             log_pos=log_pos,
             log_file=log_file,
             mysql_timezone=settings.mysql_timezone,
+            is_mariadb=is_mariadb,
         )
         self.last_state_update = 0
         self.last_binlog_clear_time = 0
 
+    def _detect_mariadb(self, mysql_settings):
+        """Detect if connected to MariaDB or MySQL by checking version string"""
+
+        try:
+            conn = pymysql.connect(**mysql_settings)
+            cursor = conn.cursor()
+            cursor.execute("SELECT VERSION()")
+            version = cursor.fetchone()[0]
+            cursor.close()
+            conn.close()
+            return 'MariaDB' in version
+        except Exception as e:
+            logger.warning(f'failed to detect database type: {e}, assuming MySQL')
+            return False
+
     def clear_old_binlog_if_required(self):
         curr_time = time.time()
         if curr_time - self.last_binlog_clear_time < BinlogReplicator.BINLOG_CLEAN_INTERVAL:
@@ -426,6 +447,8 @@ def _try_parse_db_name_from_query(cls, query: str) -> str:
 
     def run(self):
         last_transaction_id = None
+        position_seq_cache = OrderedDict()
+        max_cache_size = 500
 
         killer = GracefulKiller()
@@ -445,7 +468,23 @@ def run(self):
             for event in self.stream:
                 last_read_count += 1
                 total_processed_events += 1
-                transaction_id = (self.stream.log_file, self.stream.log_pos)
+                base_transaction_id = (self.stream.log_file, self.stream.log_pos)
+
+                if self.stream.is_mariadb:
+                    if base_transaction_id in position_seq_cache:
+                        transaction_seq = position_seq_cache[base_transaction_id] + 1
+                    else:
+                        transaction_seq = 0
+
+                    position_seq_cache[base_transaction_id] = transaction_seq
+
+                    if len(position_seq_cache) > max_cache_size:
+                        position_seq_cache.popitem(last=False)
+
+                    transaction_id = (base_transaction_id[0], base_transaction_id[1], transaction_seq)
+                else:
+                    transaction_id = base_transaction_id
+
                 last_transaction_id = transaction_id
                 self.update_state_if_required(transaction_id)
@@ -524,7 +563,8 @@ def run(self):
                     f'records: {log_event.records}',
                 )
 
-                self.data_writer.store_event(log_event)
+                allow_create_new_file = not self.stream.is_mariadb or transaction_seq == 0
+                self.data_writer.store_event(log_event, allow_create_new_file)
 
                 if last_read_count > 1000:
                     break
diff --git a/mysql_ch_replicator/db_replicator_realtime.py b/mysql_ch_replicator/db_replicator_realtime.py
index 383b110..823d781 100644
--- a/mysql_ch_replicator/db_replicator_realtime.py
+++ b/mysql_ch_replicator/db_replicator_realtime.py
@@ -204,6 +204,13 @@ def handle_create_table_query(self, query, db_name):
             return
         target_table_name = self.replicator.get_target_table_name(mysql_structure.table_name)
         ch_structure.table_name = target_table_name
+        check_table_exists = f"EXISTS TABLE `{self.replicator.clickhouse_api.database}`.`{target_table_name}`"
+        table_exists = self.replicator.clickhouse_api.client.command(check_table_exists)
+        if table_exists:
+            logger.info(f'table {target_table_name} already exists in {self.replicator.clickhouse_api.database}, skipping CREATE TABLE')
+            if mysql_structure.table_name not in self.replicator.state.tables_structure:
+                self.replicator.state.tables_structure[mysql_structure.table_name] = (mysql_structure, ch_structure)
+            return
         self.replicator.state.tables_structure[mysql_structure.table_name] = (mysql_structure, ch_structure)
         indexes = self.replicator.config.get_indexes(self.replicator.database, mysql_structure.table_name)
         partition_bys = self.replicator.config.get_partition_bys(self.replicator.database, mysql_structure.table_name)
diff --git a/tests/test_e2e_integration.py b/tests/test_e2e_integration.py
index 8e70eed..b482f5d 100644
--- a/tests/test_e2e_integration.py
+++ b/tests/test_e2e_integration.py
@@ -1,9 +1,11 @@
 from common import *
 import pytest
 import decimal
+import os
 
 from mysql_ch_replicator import config
 from mysql_ch_replicator import mysql_api
 from mysql_ch_replicator import clickhouse_api
+from mysql_ch_replicator.binlog_replicator import FileReader, EventType
 
 @pytest.mark.parametrize('config_file', [
@@ -45,6 +47,9 @@ def test_e2e_regular(config_file):
 
     binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=config_file)
     binlog_replicator_runner.run()
+
+    time.sleep(10)
+
     db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME, cfg_file=config_file)
     db_replicator_runner.run()
@@ -148,6 +153,27 @@ def test_e2e_regular(config_file):
     mysql.execute(f'DROP TABLE `{TEST_TABLE_NAME_3}`')
     assert_wait(lambda: TEST_TABLE_NAME_3 not in ch.get_tables())
 
+    binlog_dir = os.path.join(cfg.binlog_replicator.data_dir, TEST_DB_NAME)
+    binlog_files = [f for f in os.listdir(binlog_dir) if f.endswith('.bin')]
+    assert len(binlog_files) > 0, 'no binlog files found'
+
+    expected_tuple_len = 2 if config_file == CONFIG_FILE else 3
+    binlog_file_path = os.path.join(binlog_dir, binlog_files[0])
+    file_reader = FileReader(binlog_file_path)
+
+    event_found = False
+    while True:
+        event = file_reader.read_next_event()
+        if event is None:
+            break
+        if event.event_type == EventType.ADD_EVENT.value:
+            assert len(event.transaction_id) == expected_tuple_len, \
+                f'expected transaction_id tuple length {expected_tuple_len}, got {len(event.transaction_id)}: {event.transaction_id}'
+            event_found = True
+
+    assert event_found, 'no ADD_EVENT found in binlog file'
+    file_reader.close()
+
     db_replicator_runner.stop()
diff --git a/tests/test_issue_209.py b/tests/test_issue_209.py
new file mode 100644
index 0000000..f8e0602
--- /dev/null
+++ b/tests/test_issue_209.py
@@ -0,0 +1,191 @@
+import time
+
+from mysql_ch_replicator import config
+from mysql_ch_replicator import mysql_api
+from mysql_ch_replicator import clickhouse_api
+
+from common import *
+
+
+def test_issue_209_bulk_insert():
+    """
+    Test for issue #209: Insertion cuts off after certain amount of records
+    https://github.com/bakwc/mysql_ch_replicator/issues/209
+
+    This test verifies that all records are properly replicated when inserting
+    1000 records at once.
+    """
+    config_file = CONFIG_FILE_MARIADB
+
+    cfg = config.Settings()
+    cfg.load(config_file)
+
+    mysql = mysql_api.MySQLApi(
+        database=None,
+        mysql_settings=cfg.mysql,
+    )
+
+    ch = clickhouse_api.ClickhouseApi(
+        database=TEST_DB_NAME,
+        clickhouse_settings=cfg.clickhouse,
+    )
+
+    prepare_env(cfg, mysql, ch)
+
+    mysql.execute(f"""
+CREATE TABLE `sites` (
+  `id` int(11) NOT NULL AUTO_INCREMENT,
+  `user_id` int(11) DEFAULT NULL,
+  `email` varchar(100) NOT NULL,
+  `project_id` int(11) DEFAULT NULL,
+  `kw` varchar(191) NOT NULL,
+  `timestamp` timestamp NOT NULL DEFAULT current_timestamp(),
+  `active` tinyint(1) NOT NULL DEFAULT 1,
+  `deleted_at` timestamp NULL DEFAULT NULL,
+  PRIMARY KEY (`id`),
+  KEY `active_idx` (`active`),
+  KEY `sites_project_id_foreign` (`project_id`),
+  KEY `sites_user_id_index` (`user_id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3 COLLATE=utf8mb3_unicode_ci
+""")
+
+    run_all_runner = RunAllRunner(cfg_file=config_file)
+    run_all_runner.run()
+
+    assert_wait(lambda: TEST_DB_NAME in ch.get_databases())
+    ch.execute_command(f'USE `{TEST_DB_NAME}`')
+    assert_wait(lambda: 'sites' in ch.get_tables())
+
+    mysql.execute("""
+INSERT INTO sites (user_id, email, project_id, kw, timestamp, active, deleted_at)
+WITH RECURSIVE seq (n) AS (
+    SELECT 1
+    UNION ALL
+    SELECT n + 1 FROM seq WHERE n < 1000
+)
+SELECT
+    12345 AS user_id,
+    'test@example.com' AS email,
+    12345 AS project_id,
+    concat('Keyword ', n) AS kw,
+    now() AS timestamp,
+    1 AS active,
+    NULL AS deleted_at
+FROM seq;
+""", commit=True)
+
+    mysql.execute("SELECT COUNT(*) FROM sites")
+    mysql_count = mysql.cursor.fetchall()[0][0]
+    assert mysql_count == 1000, f"Expected 1000 records in MySQL, got {mysql_count}"
+
+    assert_wait(lambda: len(ch.select('sites')) == 1000, max_wait_time=10.0)
+
+    ch_count = len(ch.select('sites'))
+    assert ch_count == 1000, f"Expected 1000 records in ClickHouse, got {ch_count}"
+
+    run_all_runner.stop()
+
+    assert_wait(lambda: 'stopping db_replicator' in read_logs(TEST_DB_NAME))
+    assert 'Traceback' not in read_logs(TEST_DB_NAME)
+
+
+def test_issue_209_bulk_update():
+    """
+    Test for issue #209: Bulk UPDATE operations not replicating properly
+
+    This test reproduces the specific issue reported where UPDATE operations
+    on MariaDB split into multiple UpdateRowsEvent objects with same position,
+    causing events to be processed repeatedly.
+    """
+    config_file = CONFIG_FILE_MARIADB
+
+    cfg = config.Settings()
+    cfg.load(config_file)
+
+    mysql = mysql_api.MySQLApi(
+        database=None,
+        mysql_settings=cfg.mysql,
+    )
+
+    ch = clickhouse_api.ClickhouseApi(
+        database=TEST_DB_NAME,
+        clickhouse_settings=cfg.clickhouse,
+    )
+
+    prepare_env(cfg, mysql, ch)
+
+    mysql.execute("RESET MASTER", commit=True)
+    time.sleep(1)
+
+    mysql.execute(f"""
+CREATE TABLE `sites` (
+  `id` int(11) NOT NULL AUTO_INCREMENT,
+  `user_id` int(11) DEFAULT NULL,
+  `email` varchar(100) NOT NULL,
+  `project_id` int(11) DEFAULT NULL,
+  `kw` varchar(191) NOT NULL,
+  `timestamp` timestamp NOT NULL DEFAULT current_timestamp(),
+  `active` tinyint(1) NOT NULL DEFAULT 1,
+  `deleted_at` timestamp NULL DEFAULT NULL,
+  PRIMARY KEY (`id`),
+  KEY `active_idx` (`active`),
+  KEY `sites_project_id_foreign` (`project_id`),
+  KEY `sites_user_id_index` (`user_id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3 COLLATE=utf8mb3_unicode_ci
+""")
+
+    run_all_runner = RunAllRunner(cfg_file=config_file)
+    run_all_runner.run()
+
+    assert_wait(lambda: TEST_DB_NAME in ch.get_databases())
+    ch.execute_command(f'USE `{TEST_DB_NAME}`')
+    assert_wait(lambda: 'sites' in ch.get_tables())
+
+    for batch in range(5):
+        offset = batch * 1000
+        mysql.execute(f"""
+INSERT INTO sites (user_id, email, project_id, kw, timestamp, active, deleted_at)
+WITH RECURSIVE seq (n) AS (
+    SELECT 1
+    UNION ALL
+    SELECT n + 1 FROM seq WHERE n < 1000
+)
+SELECT
+    12345 AS user_id,
+    'test@example.com' AS email,
+    12345 AS project_id,
+    concat('Keyword ', n + {offset}) AS kw,
+    now() AS timestamp,
+    0 AS active,
+    NULL AS deleted_at
+FROM seq;
+""", commit=True)
+
+    mysql.execute("SELECT COUNT(*) FROM sites WHERE active = 0")
+    mysql_count = mysql.cursor.fetchall()[0][0]
+    assert mysql_count == 5000, f"Expected 5000 inactive records in MySQL, got {mysql_count}"
+
+    assert_wait(lambda: len(ch.select('sites')) == 5000, max_wait_time=15.0)
+
+    mysql.execute("UPDATE sites SET active = 1 WHERE active = 0", commit=True)
+
+    mysql.execute("SELECT COUNT(*) FROM sites WHERE active = 1")
+    mysql_count_active = mysql.cursor.fetchall()[0][0]
+    assert mysql_count_active == 5000, f"Expected 5000 active records in MySQL, got {mysql_count_active}"
+
+    def check_all_active():
+        result = ch.select('sites')
+        active_count = sum(1 for row in result if row['active'] == 1)
+        return active_count == 5000
+
+    assert_wait(check_all_active, max_wait_time=30.0)
+
+    result = ch.select('sites')
+    active_count = sum(1 for row in result if row['active'] == 1)
+    assert active_count == 5000, f"Expected 5000 active records in ClickHouse, got {active_count}"
+
+    run_all_runner.stop()
+
+    assert_wait(lambda: 'stopping db_replicator' in read_logs(TEST_DB_NAME))
+    assert 'Traceback' not in read_logs(TEST_DB_NAME)
+
diff --git a/tests/tests_config.yaml b/tests/tests_config.yaml
index c22016c..0b1f4c5 100644
--- a/tests/tests_config.yaml
+++ b/tests/tests_config.yaml
@@ -17,7 +17,7 @@ binlog_replicator:
   binlog_retention_period: 43200 # 12 hours in seconds
 
 databases: '*test*'
-log_level: 'debug'
+log_level: 'info'
 
 optimize_interval: 3
 check_db_updated_interval: 3
diff --git a/tests/tests_config_mariadb.yaml b/tests/tests_config_mariadb.yaml
index 5fefdcc..fa48a8c 100644
--- a/tests/tests_config_mariadb.yaml
+++ b/tests/tests_config_mariadb.yaml
@@ -16,7 +16,7 @@ binlog_replicator:
   records_per_file: 100000
 
 databases: '*test*'
-log_level: 'debug'
+log_level: 'info'
 
 optimize_interval: 3
 check_db_updated_interval: 3