diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index 0ba4ca5c..91947f25 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -188,6 +188,7 @@ def __init__( ignore_decode_errors=False, verify_checksum=False, enable_logging=True, + use_column_name_cache=False, ): """ Attributes: @@ -230,6 +231,8 @@ def __init__( verify_checksum: If true, verify events read from the binary log by examining checksums. enable_logging: When set to True, logs various details helpful for debugging and monitoring When set to False, logging is disabled to enhance performance. + use_column_name_cache: If true, enables caching of column names from INFORMATION_SCHEMA + for MySQL 5.7 compatibility when binlog metadata is missing. Default is False. """ self.__connection_settings = connection_settings @@ -254,6 +257,8 @@ def __init__( self.__ignore_decode_errors = ignore_decode_errors self.__verify_checksum = verify_checksum self.__optional_meta_data = False + self.__enable_logging = enable_logging + self.__use_column_name_cache = use_column_name_cache # We can't filter on packet level TABLE_MAP and rotate event because # we need them for handling other operations @@ -630,6 +635,8 @@ def fetchone(self): self.__ignore_decode_errors, self.__verify_checksum, self.__optional_meta_data, + self.__enable_logging, + self.__use_column_name_cache, ) if binlog_event.event_type == ROTATE_EVENT: diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index 868866de..374704b6 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -28,6 +28,8 @@ def __init__( ignore_decode_errors=False, verify_checksum=False, optional_meta_data=False, + enable_logging=False, + use_column_name_cache=False, ): self.packet = from_packet self.table_map = table_map diff --git a/pymysqlreplication/packet.py b/pymysqlreplication/packet.py index b70628fa..aa97087b 100644 --- a/pymysqlreplication/packet.py +++ b/pymysqlreplication/packet.py @@ -74,6 +74,8 @@ def __init__( ignore_decode_errors, verify_checksum, optional_meta_data, + enable_logging, + use_column_name_cache=False, ): # -1 because we ignore the ok byte self.read_bytes = 0 @@ -82,6 +84,8 @@ def __init__( self.packet = from_packet self.charset = ctl_connection.charset + self.enable_logging = enable_logging + self.use_column_name_cache = use_column_name_cache # OK value # timestamp @@ -127,6 +131,8 @@ def __init__( ignore_decode_errors=ignore_decode_errors, verify_checksum=verify_checksum, optional_meta_data=optional_meta_data, + enable_logging=enable_logging, + use_column_name_cache=use_column_name_cache, ) if not self.event._processed: self.event = None diff --git a/pymysqlreplication/row_event.py b/pymysqlreplication/row_event.py index 11429f74..3044ea53 100644 --- a/pymysqlreplication/row_event.py +++ b/pymysqlreplication/row_event.py @@ -1,6 +1,7 @@ import struct import decimal import datetime +import logging from pymysql.charset import charset_by_name from enum import Enum @@ -15,6 +16,10 @@ from .bitmap import BitCount, BitGet + +# MySQL 5.7 compatibility: Cache for INFORMATION_SCHEMA column names +_COLUMN_NAME_CACHE = {} + class RowsEvent(BinLogEvent): def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) @@ -746,6 +751,8 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) self.__ignored_schemas = kwargs["ignored_schemas"] self.__freeze_schema = kwargs["freeze_schema"] self.__optional_meta_data = kwargs["optional_meta_data"] + self.__enable_logging = kwargs.get("enable_logging", False) + self.__use_column_name_cache = kwargs.get("use_column_name_cache", False) # Post-Header self.table_id = self._read_table_id() @@ -909,12 +916,70 @@ def _get_optional_meta_data(self): return optional_metadata + + def _fetch_column_names_from_schema(self): + """ + Fetch column names from INFORMATION_SCHEMA for MySQL 5.7 compatibility. + + Only executes if use_column_name_cache=True is enabled. + Uses module-level cache to avoid repeated queries. + + Returns: + list: Column names in ORDINAL_POSITION order, or empty list + """ + # Only fetch if explicitly enabled (opt-in feature) + if not self.__use_column_name_cache: + return [] + + cache_key = f"{self.schema}.{self.table}" + + # Check cache first + if cache_key in _COLUMN_NAME_CACHE: + return _COLUMN_NAME_CACHE[cache_key] + + try: + query = """ + SELECT COLUMN_NAME + FROM INFORMATION_SCHEMA.COLUMNS + WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s + ORDER BY ORDINAL_POSITION + """ + cursor = self._ctl_connection.cursor() + cursor.execute(query, (self.schema, self.table)) + rows = cursor.fetchall() + # Handle both tuple and dict cursor results + if rows and isinstance(rows[0], dict): + column_names = [row['COLUMN_NAME'] for row in rows] + else: + column_names = [row[0] for row in rows] + cursor.close() + + # Cache result + _COLUMN_NAME_CACHE[cache_key] = column_names + + if self.__enable_logging and column_names: + logging.info(f"Cached column names for {cache_key}: {len(column_names)} columns") + + return column_names + except Exception as e: + if self.__enable_logging: + logging.warning(f"Failed to fetch column names for {cache_key}: {type(e).__name__}: {e}") + # Cache empty result to avoid retry spam + _COLUMN_NAME_CACHE[cache_key] = [] + return [] + def _sync_column_info(self): if not self.__optional_meta_data: - # If optional_meta_data is False Do not sync Event Time Column Schemas + column_names = self._fetch_column_names_from_schema() + if column_names and len(column_names) == self.column_count: + for column_idx in range(self.column_count): + self.columns[column_idx].name = column_names[column_idx] return if len(self.optional_metadata.column_name_list) == 0: - # May Be Now BINLOG_ROW_METADATA = FULL But Before Action BINLOG_ROW_METADATA Mode = MINIMAL + column_names = self._fetch_column_names_from_schema() + if column_names and len(column_names) == self.column_count: + for column_idx in range(self.column_count): + self.columns[column_idx].name = column_names[column_idx] return charset_pos = 0 enum_or_set_pos = 0