54 changes: 47 additions & 7 deletions mysql_ch_replicator/binlog_replicator.py
@@ -6,10 +6,12 @@
import json
import random
import re
import pymysql

from enum import Enum
from logging import getLogger
from dataclasses import dataclass
from collections import OrderedDict

from pymysql.err import OperationalError

@@ -252,15 +254,15 @@ def __init__(self, replicator_settings: BinlogReplicatorSettings):
self.records_per_file = replicator_settings.records_per_file
self.db_file_writers: dict = {} # db_name => FileWriter

def store_event(self, log_event: LogEvent):
def store_event(self, log_event: LogEvent, allow_create_new_file: bool = True):
logger.debug(f'store event {log_event.transaction_id}')
file_writer = self.get_or_create_file_writer(log_event.db_name)
file_writer = self.get_or_create_file_writer(log_event.db_name, allow_create_new_file)
file_writer.write_event(log_event)

def get_or_create_file_writer(self, db_name: str) -> FileWriter:
def get_or_create_file_writer(self, db_name: str, allow_create_new_file: bool) -> FileWriter:
file_writer = self.db_file_writers.get(db_name)
if file_writer is not None:
if file_writer.num_records >= self.records_per_file:
if file_writer.num_records >= self.records_per_file and allow_create_new_file:
file_writer.close()
del self.db_file_writers[db_name]
file_writer = None
@@ -359,7 +361,10 @@ def __init__(self, settings: Settings):

log_file, log_pos = None, None
if self.state.prev_last_seen_transaction:
log_file, log_pos = self.state.prev_last_seen_transaction
log_file, log_pos = self.state.prev_last_seen_transaction[:2]

is_mariadb = self._detect_mariadb(mysql_settings)
logger.info(f'detected database type: {"MariaDB" if is_mariadb else "MySQL"}')

self.stream = BinLogStreamReader(
connection_settings=mysql_settings,
@@ -369,10 +374,26 @@
log_pos=log_pos,
log_file=log_file,
mysql_timezone=settings.mysql_timezone,
is_mariadb=is_mariadb,
)
self.last_state_update = 0
self.last_binlog_clear_time = 0

def _detect_mariadb(self, mysql_settings):
"""Detect if connected to MariaDB or MySQL by checking version string"""

try:
conn = pymysql.connect(**mysql_settings)
cursor = conn.cursor()
cursor.execute("SELECT VERSION()")
version = cursor.fetchone()[0]
cursor.close()
conn.close()
return 'MariaDB' in version
except Exception as e:
logger.warning(f'failed to detect database type: {e}, assuming MySQL')
return False

def clear_old_binlog_if_required(self):
curr_time = time.time()
if curr_time - self.last_binlog_clear_time < BinlogReplicator.BINLOG_CLEAN_INTERVAL:
@@ -426,6 +447,8 @@ def _try_parse_db_name_from_query(cls, query: str) -> str:

def run(self):
last_transaction_id = None
position_seq_cache = OrderedDict()
max_cache_size = 500

killer = GracefulKiller()

@@ -445,7 +468,23 @@
for event in self.stream:
last_read_count += 1
total_processed_events += 1
transaction_id = (self.stream.log_file, self.stream.log_pos)
base_transaction_id = (self.stream.log_file, self.stream.log_pos)

if self.stream.is_mariadb:
if base_transaction_id in position_seq_cache:
transaction_seq = position_seq_cache[base_transaction_id] + 1
else:
transaction_seq = 0

position_seq_cache[base_transaction_id] = transaction_seq

if len(position_seq_cache) > max_cache_size:
position_seq_cache.popitem(last=False)

transaction_id = (base_transaction_id[0], base_transaction_id[1], transaction_seq)
else:
transaction_id = base_transaction_id

last_transaction_id = transaction_id

self.update_state_if_required(transaction_id)
@@ -524,7 +563,8 @@ def run(self):
f'records: {log_event.records}',
)

self.data_writer.store_event(log_event)
allow_create_new_file = not self.stream.is_mariadb or transaction_seq == 0
self.data_writer.store_event(log_event, allow_create_new_file)

if last_read_count > 1000:
break
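Note (not part of the diff above): the heart of this change is the per-position sequence counter in `run()`. MariaDB can emit several row events that report the identical `(log_file, log_pos)` pair, so the replicator appends an incrementing sequence number to keep transaction IDs unique while bounding memory with a small LRU-style cache. A minimal standalone sketch of the same scheme, using the illustrative helper name `make_transaction_id`:

```python
from collections import OrderedDict

# Illustrative sketch only (not part of this PR): assign a sequence number to
# repeated binlog positions so MariaDB events sharing a position stay distinct.
_position_seq_cache: OrderedDict = OrderedDict()
_MAX_CACHE_SIZE = 500  # mirrors max_cache_size in run()

def make_transaction_id(log_file: str, log_pos: int, is_mariadb: bool) -> tuple:
    if not is_mariadb:
        return (log_file, log_pos)  # non-MariaDB keeps the original two-element ID
    base = (log_file, log_pos)
    seq = _position_seq_cache.get(base, -1) + 1
    _position_seq_cache[base] = seq
    if len(_position_seq_cache) > _MAX_CACHE_SIZE:
        _position_seq_cache.popitem(last=False)  # evict the oldest position
    return (log_file, log_pos, seq)

# Two MariaDB events at the same position get distinct transaction IDs:
assert make_transaction_id('mysql-bin.000001', 4, True) == ('mysql-bin.000001', 4, 0)
assert make_transaction_id('mysql-bin.000001', 4, True) == ('mysql-bin.000001', 4, 1)
```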
7 changes: 7 additions & 0 deletions mysql_ch_replicator/db_replicator_realtime.py
@@ -204,6 +204,13 @@ def handle_create_table_query(self, query, db_name):
return
target_table_name = self.replicator.get_target_table_name(mysql_structure.table_name)
ch_structure.table_name = target_table_name
check_table_exists = f"EXISTS TABLE `{self.replicator.clickhouse_api.database}`.`{target_table_name}`"
table_exists = self.replicator.clickhouse_api.client.command(check_table_exists)
if table_exists:
logger.info(f'table {target_table_name} already exists in {self.replicator.clickhouse_api.database}, skipping CREATE TABLE')
if mysql_structure.table_name not in self.replicator.state.tables_structure:
self.replicator.state.tables_structure[mysql_structure.table_name] = (mysql_structure, ch_structure)
return
self.replicator.state.tables_structure[mysql_structure.table_name] = (mysql_structure, ch_structure)
indexes = self.replicator.config.get_indexes(self.replicator.database, mysql_structure.table_name)
partition_bys = self.replicator.config.get_partition_bys(self.replicator.database, mysql_structure.table_name)
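Note (not part of the diff above): the duplicate-table guard relies on ClickHouse's `EXISTS TABLE` statement, which returns 1 when the table is already present and 0 otherwise. A minimal sketch of the same check done directly with clickhouse-connect (which the `client.command` call above suggests is the underlying client); host and names below are placeholders:

```python
import clickhouse_connect

# Illustrative sketch only: reproduce the existence check outside the replicator.
client = clickhouse_connect.get_client(host='localhost')  # placeholder connection

database = 'replication_target'   # placeholder database name
target_table_name = 'sites'       # placeholder table name

# EXISTS TABLE returns 1 if the table exists, 0 otherwise.
table_exists = client.command(f"EXISTS TABLE `{database}`.`{target_table_name}`")
if table_exists:
    print(f'{target_table_name} already exists, CREATE TABLE would be skipped')
else:
    print(f'{target_table_name} not found, CREATE TABLE would run')
```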
26 changes: 26 additions & 0 deletions tests/test_e2e_integration.py
@@ -1,9 +1,11 @@
from common import *
import pytest
import decimal
import os
from mysql_ch_replicator import config
from mysql_ch_replicator import mysql_api
from mysql_ch_replicator import clickhouse_api
from mysql_ch_replicator.binlog_replicator import FileReader, EventType


@pytest.mark.parametrize('config_file', [
@@ -45,6 +47,9 @@ def test_e2e_regular(config_file):

binlog_replicator_runner = BinlogReplicatorRunner(cfg_file=config_file)
binlog_replicator_runner.run()

time.sleep(10)

db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME, cfg_file=config_file)
db_replicator_runner.run()

@@ -148,6 +153,27 @@ def test_e2e_regular(config_file):
mysql.execute(f'DROP TABLE `{TEST_TABLE_NAME_3}`')
assert_wait(lambda: TEST_TABLE_NAME_3 not in ch.get_tables())

binlog_dir = os.path.join(cfg.binlog_replicator.data_dir, TEST_DB_NAME)
binlog_files = [f for f in os.listdir(binlog_dir) if f.endswith('.bin')]
assert len(binlog_files) > 0, 'no binlog files found'

expected_tuple_len = 2 if config_file == CONFIG_FILE else 3
binlog_file_path = os.path.join(binlog_dir, binlog_files[0])
file_reader = FileReader(binlog_file_path)

event_found = False
while True:
event = file_reader.read_next_event()
if event is None:
break
if event.event_type == EventType.ADD_EVENT.value:
assert len(event.transaction_id) == expected_tuple_len, \
f'expected transaction_id tuple length {expected_tuple_len}, got {len(event.transaction_id)}: {event.transaction_id}'
event_found = True

assert event_found, 'no ADD_EVENT found in binlog file'
file_reader.close()

db_replicator_runner.stop()


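Note (not part of the diff above): the new assertion distinguishes the two transaction-ID shapes produced by the binlog replicator — two elements for the regular MySQL config, three for the MariaDB config with the trailing sequence number. Purely illustrative values:

```python
# Illustrative values only (file names and positions are made up).
mysql_transaction_id = ('mysql-bin.000003', 1547)        # MySQL: (log_file, log_pos)
mariadb_transaction_id = ('mysql-bin.000003', 1547, 0)   # MariaDB: (log_file, log_pos, seq)

assert len(mysql_transaction_id) == 2
assert len(mariadb_transaction_id) == 3
```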
191 changes: 191 additions & 0 deletions tests/test_issue_209.py
@@ -0,0 +1,191 @@
import time

from mysql_ch_replicator import config
from mysql_ch_replicator import mysql_api
from mysql_ch_replicator import clickhouse_api

from common import *


def test_issue_209_bulk_insert():
"""
Test for issue #209: Insertion cuts off after certain amount of records
https://github.com/bakwc/mysql_ch_replicator/issues/209

This test verifies that all records are properly replicated when inserting
1000 records at once.
"""
config_file = CONFIG_FILE_MARIADB

cfg = config.Settings()
cfg.load(config_file)

mysql = mysql_api.MySQLApi(
database=None,
mysql_settings=cfg.mysql,
)

ch = clickhouse_api.ClickhouseApi(
database=TEST_DB_NAME,
clickhouse_settings=cfg.clickhouse,
)

prepare_env(cfg, mysql, ch)

mysql.execute(f"""
CREATE TABLE `sites` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`user_id` int(11) DEFAULT NULL,
`email` varchar(100) NOT NULL,
`project_id` int(11) DEFAULT NULL,
`kw` varchar(191) NOT NULL,
`timestamp` timestamp NOT NULL DEFAULT current_timestamp(),
`active` tinyint(1) NOT NULL DEFAULT 1,
`deleted_at` timestamp NULL DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `active_idx` (`active`),
KEY `sites_project_id_foreign` (`project_id`),
KEY `sites_user_id_index` (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3 COLLATE=utf8mb3_unicode_ci
""")

run_all_runner = RunAllRunner(cfg_file=config_file)
run_all_runner.run()

assert_wait(lambda: TEST_DB_NAME in ch.get_databases())
ch.execute_command(f'USE `{TEST_DB_NAME}`')
assert_wait(lambda: 'sites' in ch.get_tables())

mysql.execute("""
INSERT INTO sites (user_id, email, project_id, kw, timestamp, active, deleted_at)
WITH RECURSIVE seq (n) AS (
SELECT 1
UNION ALL
SELECT n + 1 FROM seq WHERE n < 1000
)
SELECT
12345 AS user_id,
'test@example.com' AS email,
12345 AS project_id,
concat('Keyword ', n) AS kw,
now() AS timestamp,
1 AS active,
NULL AS deleted_at
FROM seq;
""", commit=True)

mysql.execute("SELECT COUNT(*) FROM sites")
mysql_count = mysql.cursor.fetchall()[0][0]
assert mysql_count == 1000, f"Expected 1000 records in MySQL, got {mysql_count}"

assert_wait(lambda: len(ch.select('sites')) == 1000, max_wait_time=10.0)

ch_count = len(ch.select('sites'))
assert ch_count == 1000, f"Expected 1000 records in ClickHouse, got {ch_count}"

run_all_runner.stop()

assert_wait(lambda: 'stopping db_replicator' in read_logs(TEST_DB_NAME))
assert 'Traceback' not in read_logs(TEST_DB_NAME)


def test_issue_209_bulk_update():
"""
Test for issue #209: Bulk UPDATE operations not replicating properly

    This test reproduces the reported issue where a bulk UPDATE on MariaDB is
    split into multiple UpdateRowsEvent objects that share the same binlog
    position, causing the same events to be processed repeatedly.
"""
config_file = CONFIG_FILE_MARIADB

cfg = config.Settings()
cfg.load(config_file)

mysql = mysql_api.MySQLApi(
database=None,
mysql_settings=cfg.mysql,
)

ch = clickhouse_api.ClickhouseApi(
database=TEST_DB_NAME,
clickhouse_settings=cfg.clickhouse,
)

prepare_env(cfg, mysql, ch)

mysql.execute("RESET MASTER", commit=True)
time.sleep(1)

mysql.execute(f"""
CREATE TABLE `sites` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`user_id` int(11) DEFAULT NULL,
`email` varchar(100) NOT NULL,
`project_id` int(11) DEFAULT NULL,
`kw` varchar(191) NOT NULL,
`timestamp` timestamp NOT NULL DEFAULT current_timestamp(),
`active` tinyint(1) NOT NULL DEFAULT 1,
`deleted_at` timestamp NULL DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `active_idx` (`active`),
KEY `sites_project_id_foreign` (`project_id`),
KEY `sites_user_id_index` (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3 COLLATE=utf8mb3_unicode_ci
""")

run_all_runner = RunAllRunner(cfg_file=config_file)
run_all_runner.run()

assert_wait(lambda: TEST_DB_NAME in ch.get_databases())
ch.execute_command(f'USE `{TEST_DB_NAME}`')
assert_wait(lambda: 'sites' in ch.get_tables())

for batch in range(5):
offset = batch * 1000
mysql.execute(f"""
INSERT INTO sites (user_id, email, project_id, kw, timestamp, active, deleted_at)
WITH RECURSIVE seq (n) AS (
SELECT 1
UNION ALL
SELECT n + 1 FROM seq WHERE n < 1000
)
SELECT
12345 AS user_id,
'test@example.com' AS email,
12345 AS project_id,
concat('Keyword ', n + {offset}) AS kw,
now() AS timestamp,
0 AS active,
NULL AS deleted_at
FROM seq;
""", commit=True)

mysql.execute("SELECT COUNT(*) FROM sites WHERE active = 0")
mysql_count = mysql.cursor.fetchall()[0][0]
assert mysql_count == 5000, f"Expected 5000 inactive records in MySQL, got {mysql_count}"

assert_wait(lambda: len(ch.select('sites')) == 5000, max_wait_time=15.0)

mysql.execute("UPDATE sites SET active = 1 WHERE active = 0", commit=True)

mysql.execute("SELECT COUNT(*) FROM sites WHERE active = 1")
mysql_count_active = mysql.cursor.fetchall()[0][0]
assert mysql_count_active == 5000, f"Expected 5000 active records in MySQL, got {mysql_count_active}"

def check_all_active():
result = ch.select('sites')
active_count = sum(1 for row in result if row['active'] == 1)
return active_count == 5000

assert_wait(check_all_active, max_wait_time=30.0)

result = ch.select('sites')
active_count = sum(1 for row in result if row['active'] == 1)
assert active_count == 5000, f"Expected 5000 active records in ClickHouse, got {active_count}"

run_all_runner.stop()

assert_wait(lambda: 'stopping db_replicator' in read_logs(TEST_DB_NAME))
assert 'Traceback' not in read_logs(TEST_DB_NAME)
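Note (not part of the diff above): the scenario in this test is also why `store_event()` gained the `allow_create_new_file` flag. The split MariaDB events all share one binlog position, and the writer now only rotates to a new binlog file on the first event of such a group (`transaction_seq == 0`), plausibly so that a same-position group is never split across files. A hedged sketch of that gating rule, with an illustrative helper name:

```python
# Illustrative sketch only: the rotation rule applied in binlog_replicator.run().
def may_rotate_file(is_mariadb: bool, transaction_seq: int) -> bool:
    """Return True when starting a new binlog file is allowed.

    Non-MariaDB: always allowed (the PR leaves rotation unrestricted there).
    MariaDB: only on the first event of a same-position group, so events that
    share one (log_file, log_pos) are written to the same file.
    """
    return (not is_mariadb) or transaction_seq == 0

assert may_rotate_file(False, 7) is True   # MySQL event
assert may_rotate_file(True, 0) is True    # first MariaDB event at a position
assert may_rotate_file(True, 3) is False   # later event sharing the same position
```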

2 changes: 1 addition & 1 deletion tests/tests_config.yaml
@@ -17,7 +17,7 @@ binlog_replicator:
binlog_retention_period: 43200 # 12 hours in seconds

databases: '*test*'
log_level: 'debug'
log_level: 'info'
optimize_interval: 3
check_db_updated_interval: 3

2 changes: 1 addition & 1 deletion tests/tests_config_mariadb.yaml
@@ -16,7 +16,7 @@ binlog_replicator:
records_per_file: 100000

databases: '*test*'
log_level: 'debug'
log_level: 'info'
optimize_interval: 3
check_db_updated_interval: 3
