Skip to content

Commit e8c93b8

Browse files
committed
Refactor runner.py and utils.py for improved readability and consistency
- Standardized string formatting across command initialization in `runner.py` for better consistency. - Enhanced the structure of the `DbReplicatorRunner` class by using multi-line arguments for improved readability. - Updated logging messages in both `runner.py` and `utils.py` to use consistent string formatting. - Improved import organization in `runner.py` and `utils.py` for better clarity and maintainability. - Added helper methods in `base_replication_test.py` to streamline replication setup and target database creation in tests.
1 parent 74017be commit e8c93b8

File tree

10 files changed

+388
-520
lines changed

10 files changed

+388
-520
lines changed

mysql_ch_replicator/runner.py

Lines changed: 77 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,67 +1,71 @@
11
import os
2-
import time
32
import sys
43
import threading
5-
from uvicorn import Config, Server
6-
from fastapi import APIRouter, FastAPI
7-
4+
import time
85
from logging import getLogger
96

10-
from .config import Settings
11-
from .mysql_api import MySQLApi
12-
from .utils import ProcessRunner, GracefulKiller
7+
from fastapi import APIRouter, FastAPI
8+
from uvicorn import Config, Server
139

1410
from . import db_replicator
15-
11+
from .config import Settings
12+
from .mysql_api import MySQLApi
13+
from .utils import GracefulKiller, ProcessRunner
1614

1715
logger = getLogger(__name__)
1816

1917

20-
2118
class BinlogReplicatorRunner(ProcessRunner):
2219
def __init__(self, config_file):
23-
super().__init__(f'{sys.argv[0]} --config {config_file} binlog_replicator')
20+
super().__init__(f"{sys.argv[0]} --config {config_file} binlog_replicator")
2421

2522

2623
class DbReplicatorRunner(ProcessRunner):
27-
def __init__(self, db_name, config_file, worker_id=None, total_workers=None, initial_only=False):
28-
cmd = f'{sys.argv[0]} --config {config_file} --db {db_name} db_replicator'
29-
24+
def __init__(
25+
self,
26+
db_name,
27+
config_file,
28+
worker_id=None,
29+
total_workers=None,
30+
initial_only=False,
31+
):
32+
cmd = f"{sys.argv[0]} --config {config_file} --db {db_name} db_replicator"
33+
3034
if worker_id is not None:
31-
cmd += f' --worker_id={worker_id}'
32-
35+
cmd += f" --worker_id={worker_id}"
36+
3337
if total_workers is not None:
34-
cmd += f' --total_workers={total_workers}'
35-
38+
cmd += f" --total_workers={total_workers}"
39+
3640
if initial_only:
37-
cmd += ' --initial_only=True'
38-
41+
cmd += " --initial_only=True"
42+
3943
super().__init__(cmd)
4044

4145

4246
class DbOptimizerRunner(ProcessRunner):
4347
def __init__(self, config_file):
44-
super().__init__(f'{sys.argv[0]} --config {config_file} db_optimizer')
48+
super().__init__(f"{sys.argv[0]} --config {config_file} db_optimizer")
4549

4650

4751
class RunAllRunner(ProcessRunner):
4852
def __init__(self, db_name, config_file):
49-
super().__init__(f'{sys.argv[0]} --config {config_file} run_all --db {db_name}')
53+
super().__init__(f"{sys.argv[0]} --config {config_file} run_all --db {db_name}")
5054

5155

5256
app = FastAPI()
5357

5458

55-
5659
class Runner:
57-
5860
DB_REPLICATOR_RUN_DELAY = 5
5961

60-
def __init__(self, config: Settings, wait_initial_replication: bool, databases: str):
62+
def __init__(
63+
self, config: Settings, wait_initial_replication: bool, databases: str
64+
):
6165
self.config = config
6266
self.databases = databases or config.databases
6367
self.wait_initial_replication = wait_initial_replication
64-
self.runners: dict[str: DbReplicatorRunner] = {}
68+
self.runners: dict[str, DbReplicatorRunner] = {}
6569
self.binlog_runner = None
6670
self.db_optimizer = None
6771
self.http_server = None
@@ -71,13 +75,15 @@ def __init__(self, config: Settings, wait_initial_replication: bool, databases:
7175

7276
def run_server(self):
7377
if not self.config.http_host or not self.config.http_port:
74-
logger.info('http server disabled')
78+
logger.info("http server disabled")
7579
return
76-
logger.info('starting http server')
80+
logger.info("starting http server")
7781

7882
config = Config(app=app, host=self.config.http_host, port=self.config.http_port)
7983
self.router = APIRouter()
80-
self.router.add_api_route("/restart_replication", self.restart_replication, methods=["GET"])
84+
self.router.add_api_route(
85+
"/restart_replication", self.restart_replication, methods=["GET"]
86+
)
8187
app.include_router(self.router)
8288

8389
self.http_server = Server(config)
@@ -87,15 +93,15 @@ def restart_replication(self):
8793
self.replication_restarted = False
8894
self.need_restart_replication = True
8995
while not self.replication_restarted:
90-
logger.info('waiting replication restarted..')
96+
logger.info("waiting replication restarted..")
9197
time.sleep(1)
9298
return {"restarted": True}
9399

94100
def is_initial_replication_finished(self, db_name):
95101
state_path = os.path.join(
96102
self.config.binlog_replicator.data_dir,
97103
db_name,
98-
'state.pckl',
104+
"state.pckl",
99105
)
100106
state = db_replicator.State(state_path)
101107
return state.status == db_replicator.Status.RUNNING_REALTIME_REPLICATION
@@ -111,43 +117,48 @@ def restart_dead_processes(self):
111117
def restart_replication_if_required(self):
112118
if not self.need_restart_replication:
113119
return
114-
logger.info('restarting replication')
120+
logger.info("restarting replication")
115121
for db_name, runner in self.runners.items():
116-
logger.info(f'stopping runner {db_name}')
122+
logger.info(f"stopping runner {db_name}")
117123
runner.stop()
118-
path = os.path.join(self.config.binlog_replicator.data_dir, db_name, 'state.pckl')
124+
path = os.path.join(
125+
self.config.binlog_replicator.data_dir, db_name, "state.pckl"
126+
)
119127
if os.path.exists(path):
120-
logger.debug(f'removing {path}')
128+
logger.debug(f"removing {path}")
121129
os.remove(path)
122130

123-
logger.info('starting replication')
131+
logger.info("starting replication")
124132
self.restart_dead_processes()
125133
self.need_restart_replication = False
126134
self.replication_restarted = True
127135

128136
def check_databases_updated(self, mysql_api: MySQLApi):
129-
logger.debug('check if databases were created / removed in mysql')
137+
logger.debug("check if databases were created / removed in mysql")
130138
databases = mysql_api.get_databases()
131-
logger.info(f'mysql databases: {databases}')
139+
logger.info(f"mysql databases: {databases}")
132140
databases = [db for db in databases if self.config.is_database_matches(db)]
133-
logger.info(f'mysql databases filtered: {databases}')
141+
logger.info(f"mysql databases filtered: {databases}")
134142
for db in databases:
135143
if db in self.runners:
136144
continue
137-
logger.info(f'running replication for {db} (database created in mysql)')
138-
runner = self.runners[db] = DbReplicatorRunner(db_name=db, config_file=self.config.settings_file)
145+
logger.info(f"running replication for {db} (database created in mysql)")
146+
runner = self.runners[db] = DbReplicatorRunner(
147+
db_name=db, config_file=self.config.settings_file
148+
)
139149
runner.run()
140150

141151
for db in self.runners.keys():
142152
if db in databases:
143153
continue
144-
logger.info(f'stop replication for {db} (database removed from mysql)')
154+
logger.info(f"stop replication for {db} (database removed from mysql)")
145155
self.runners[db].stop()
146156
self.runners.pop(db)
147157

148158
def run(self):
149159
mysql_api = MySQLApi(
150-
database=None, mysql_settings=self.config.mysql,
160+
database=None,
161+
mysql_settings=self.config.mysql,
151162
)
152163
databases = mysql_api.get_databases()
153164
databases = [db for db in databases if self.config.is_database_matches(db)]
@@ -173,8 +184,10 @@ def run(self):
173184
break
174185
if not self.is_initial_replication_finished(db_name=db):
175186
continue
176-
logger.info(f'running replication for {db} (initial replication finished)')
177-
runner = self.runners[db] = DbReplicatorRunner(db_name=db, config_file=self.config.settings_file)
187+
logger.info(f"running replication for {db} (initial replication finished)")
188+
runner = self.runners[db] = DbReplicatorRunner(
189+
db_name=db, config_file=self.config.settings_file
190+
)
178191
runner.run()
179192

180193
# Second - run replication for other DBs one by one and wait until initial replication finished
@@ -184,44 +197,54 @@ def run(self):
184197
if killer.kill_now:
185198
break
186199

187-
logger.info(f'running replication for {db} (initial replication not finished - waiting)')
188-
runner = self.runners[db] = DbReplicatorRunner(db_name=db, config_file=self.config.settings_file)
200+
logger.info(
201+
f"running replication for {db} (initial replication not finished - waiting)"
202+
)
203+
runner = self.runners[db] = DbReplicatorRunner(
204+
db_name=db, config_file=self.config.settings_file
205+
)
189206
runner.run()
190207
if not self.wait_initial_replication:
191208
continue
192209

193-
while not self.is_initial_replication_finished(db_name=db) and not killer.kill_now:
210+
while (
211+
not self.is_initial_replication_finished(db_name=db)
212+
and not killer.kill_now
213+
):
194214
time.sleep(1)
195215
self.restart_dead_processes()
196216

197-
logger.info('all replicators launched')
217+
logger.info("all replicators launched")
198218

199219
last_check_db_updated = time.time()
200220
while not killer.kill_now:
201221
time.sleep(1)
202222
self.restart_replication_if_required()
203223
self.restart_dead_processes()
204-
if time.time() - last_check_db_updated > self.config.check_db_updated_interval:
224+
if (
225+
time.time() - last_check_db_updated
226+
> self.config.check_db_updated_interval
227+
):
205228
self.check_databases_updated(mysql_api=mysql_api)
206229
last_check_db_updated = time.time()
207230

208-
logger.info('stopping runner')
231+
logger.info("stopping runner")
209232

210233
if self.binlog_runner is not None:
211-
logger.info('stopping binlog replication')
234+
logger.info("stopping binlog replication")
212235
self.binlog_runner.stop()
213236

214237
if self.db_optimizer is not None:
215-
logger.info('stopping db_optimizer')
238+
logger.info("stopping db_optimizer")
216239
self.db_optimizer.stop()
217240

218241
for db_name, db_replication_runner in self.runners.items():
219-
logger.info(f'stopping replication for {db_name}')
242+
logger.info(f"stopping replication for {db_name}")
220243
db_replication_runner.stop()
221244

222245
if self.http_server:
223246
self.http_server.should_exit = True
224247

225248
server_thread.join()
226249

227-
logger.info('stopped')
250+
logger.info("stopped")

0 commit comments

Comments
 (0)