Skip to content

Commit 890aa15

Browse files
committed
Add log forwarding functionality to ProcessRunner for real-time logging
1 parent 9e4f577 commit 890aa15

File tree

1 file changed

+97
-0
lines changed

1 file changed

+97
-0
lines changed

mysql_ch_replicator/utils.py

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import subprocess
55
import sys
66
import tempfile
7+
import threading
78
import time
89
from logging import getLogger
910
from pathlib import Path
@@ -38,6 +39,68 @@ def __init__(self, cmd):
3839
self.cmd = cmd
3940
self.process = None
4041
self.log_file = None
42+
self.log_forwarding_thread = None
43+
self.should_stop_forwarding = False
44+
45+
def _forward_logs(self):
46+
"""Forward subprocess logs to the main process logger in real-time."""
47+
if not self.log_file or not hasattr(self.log_file, 'name'):
48+
return
49+
50+
log_path = self.log_file.name
51+
last_position = 0
52+
53+
# Extract process name from command for logging prefix
54+
cmd_parts = self.cmd.split()
55+
process_name = "subprocess"
56+
if len(cmd_parts) > 0:
57+
if "binlog_replicator" in self.cmd:
58+
process_name = "binlogrepl"
59+
elif "db_replicator" in self.cmd and "--db" in cmd_parts:
60+
try:
61+
db_index = cmd_parts.index("--db") + 1
62+
if db_index < len(cmd_parts):
63+
db_name = cmd_parts[db_index]
64+
process_name = f"dbrepl {db_name}"
65+
except (ValueError, IndexError):
66+
process_name = "dbrepl"
67+
elif "db_optimizer" in self.cmd:
68+
process_name = "dbopt"
69+
70+
while not self.should_stop_forwarding:
71+
try:
72+
if os.path.exists(log_path):
73+
with open(log_path, 'r') as f:
74+
f.seek(last_position)
75+
new_content = f.read()
76+
if new_content:
77+
# Forward each line to main logger with subprocess prefix
78+
lines = new_content.strip().split('\n')
79+
for line in lines:
80+
if line.strip():
81+
# Remove timestamp and level from subprocess log to avoid duplication
82+
# Format: [tag timestamp level] message -> message
83+
clean_line = line
84+
if '] ' in line:
85+
bracket_end = line.find('] ')
86+
if bracket_end != -1:
87+
clean_line = line[bracket_end + 2:]
88+
89+
# Only forward important log messages to avoid spam
90+
# Forward stats, errors, warnings, and key info messages
91+
if any(keyword in clean_line.lower() for keyword in
92+
['stats:', 'ch_stats:', 'error', 'warning', 'failed', 'last transaction',
93+
'processed events', 'connection', 'replication', 'events_count',
94+
'insert_events_count', 'erase_events_count']):
95+
logger.info(f"[{process_name}] {clean_line}")
96+
97+
last_position = f.tell()
98+
99+
time.sleep(2) # Check for new logs every 2 seconds to reduce overhead
100+
101+
except Exception as e:
102+
logger.debug(f"Error forwarding logs for {process_name}: {e}")
103+
time.sleep(2)
41104

42105
def run(self):
43106
"""
@@ -147,6 +210,16 @@ def run(self):
147210
)
148211
self.log_file.flush()
149212
logger.debug(f"Started process {self.process.pid}: {self.cmd}")
213+
214+
# Start log forwarding thread
215+
self.should_stop_forwarding = False
216+
self.log_forwarding_thread = threading.Thread(
217+
target=self._forward_logs,
218+
daemon=True,
219+
name=f"LogForwarder-{self.process.pid}"
220+
)
221+
self.log_forwarding_thread.start()
222+
150223
except Exception as e:
151224
if self.log_file:
152225
self.log_file.close()
@@ -186,6 +259,14 @@ def restart_dead_process_if_required(self):
186259
# Process is running fine.
187260
return
188261

262+
# Stop log forwarding thread for dead process
263+
self.should_stop_forwarding = True
264+
if self.log_forwarding_thread and self.log_forwarding_thread.is_alive():
265+
try:
266+
self.log_forwarding_thread.join(timeout=2.0)
267+
except Exception as e:
268+
logger.debug(f"Error joining log forwarding thread during restart: {e}")
269+
189270
# Read log file for debugging instead of using communicate() to avoid deadlock
190271
log_content = ""
191272
if self.log_file:
@@ -210,6 +291,14 @@ def restart_dead_process_if_required(self):
210291
self.run()
211292

212293
def stop(self):
294+
# Stop log forwarding thread first
295+
self.should_stop_forwarding = True
296+
if self.log_forwarding_thread and self.log_forwarding_thread.is_alive():
297+
try:
298+
self.log_forwarding_thread.join(timeout=2.0)
299+
except Exception as e:
300+
logger.debug(f"Error joining log forwarding thread: {e}")
301+
213302
if self.process is not None:
214303
try:
215304
# Send SIGINT first for graceful shutdown
@@ -244,6 +333,14 @@ def wait_complete(self):
244333
self.process.wait()
245334
self.process = None
246335

336+
# Stop log forwarding thread
337+
self.should_stop_forwarding = True
338+
if self.log_forwarding_thread and self.log_forwarding_thread.is_alive():
339+
try:
340+
self.log_forwarding_thread.join(timeout=2.0)
341+
except Exception as e:
342+
logger.debug(f"Error joining log forwarding thread: {e}")
343+
247344
# Clean up log file
248345
if self.log_file:
249346
try:

0 commit comments

Comments
 (0)