|
1 | 1 | import json |
| 2 | +import os |
| 3 | +import shutil |
2 | 4 | import time |
3 | 5 | from collections import defaultdict |
4 | 6 | from logging import getLogger |
5 | 7 |
|
| 8 | +import pymysql.err |
| 9 | + |
6 | 10 | from .binlog_replicator import EventType, LogEvent |
7 | 11 | from .common import Status |
8 | 12 | from .converter import strip_sql_comments |
@@ -70,7 +74,44 @@ def run_realtime_replication(self): |
70 | 74 | ) |
71 | 75 | break |
72 | 76 |
|
73 | | - event = self.replicator.data_reader.read_next_event() |
| 77 | + try: |
| 78 | + event = self.replicator.data_reader.read_next_event() |
| 79 | + except pymysql.err.OperationalError as e: |
| 80 | + # Check if this is the binlog index file corruption error (Error 1236) |
| 81 | + if e.args[0] == 1236: |
| 82 | + logger.error( |
| 83 | + "[binlogrepl] operational error (1236, 'Could not find first log file name in binary log index file')" |
| 84 | + ) |
| 85 | + logger.error(f"[binlogrepl] Full error: {e}") |
| 86 | + logger.info("[binlogrepl] Attempting automatic recovery...") |
| 87 | + |
| 88 | + # Get binlog directory path for this database |
| 89 | + binlog_dir = os.path.join( |
| 90 | + self.replicator.config.binlog_replicator.data_dir, |
| 91 | + self.replicator.database |
| 92 | + ) |
| 93 | + |
| 94 | + # Delete the corrupted binlog directory |
| 95 | + if os.path.exists(binlog_dir): |
| 96 | + logger.warning(f"[binlogrepl] Deleting corrupted binlog directory: {binlog_dir}") |
| 97 | + try: |
| 98 | + shutil.rmtree(binlog_dir) |
| 99 | + logger.info(f"[binlogrepl] Successfully deleted binlog directory: {binlog_dir}") |
| 100 | + except Exception as delete_error: |
| 101 | + logger.error(f"[binlogrepl] Failed to delete binlog directory: {delete_error}", exc_info=True) |
| 102 | + raise RuntimeError("Failed to delete corrupted binlog directory") from delete_error |
| 103 | + else: |
| 104 | + logger.warning(f"[binlogrepl] Binlog directory does not exist: {binlog_dir}") |
| 105 | + |
| 106 | + # Exit process cleanly to trigger automatic restart by runner |
| 107 | + logger.info("[binlogrepl] Exiting process for automatic restart by runner") |
| 108 | + logger.info("[binlogrepl] The runner will automatically restart this process") |
| 109 | + raise RuntimeError("Binlog corruption detected (Error 1236) - restarting for recovery") from e |
| 110 | + else: |
| 111 | + # Re-raise other OperationalErrors |
| 112 | + logger.error(f"[binlogrepl] Unhandled OperationalError: {e}", exc_info=True) |
| 113 | + raise |
| 114 | + |
74 | 115 | if event is None: |
75 | 116 | time.sleep(self.READ_LOG_INTERVAL) |
76 | 117 | self.upload_records_if_required(table_name=None) |
|
0 commit comments