Skip to content

Commit 2a060e0

Browse files
abhinav04sharmainikep
authored andcommitted
Adding ability to apply binlogs thru mysqlbinlog in multi-threaded mode
Summary: Added a new flag in `mysqlbinlog` `--mta-workers=x` that tells the server to spawn `x` dependency applier workers to apply transactions. When `--mta-workers` is specified all events are printed in their base64 representation so we can create log events out of them. Differential Revision: D49466823 --------------------------------------------------------------------------- Cast enum into target type in a ternary operator (facebook#1411) Summary: This fixes a GCC build error: sql/rpl_replica.cc: In function ‘int slave_start_single_worker(Relay_log_info*, ulong)’: sql/rpl_replica.cc:7207:28: error: enumerated and non-enumerated type in conditional expression [-Werror=extra] 7207 | rli->is_fake() ? INFO_REPOSITORY_DUMMY : opt_rli_repository_id, i, | ~~~~~~~~~~~~~~~^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Pull Request resolved: facebook#1411 Differential Revision: D52207263 fbshipit-source-id: 47d7f5c
1 parent 071cda5 commit 2a060e0

21 files changed

+398
-52
lines changed

client/mysqlbinlog.cc

Lines changed: 65 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -743,6 +743,7 @@ static char *database = nullptr;
743743
static char *output_file = nullptr;
744744
static char *rewrite = nullptr;
745745
bool force_opt = false, short_form = false, idempotent_mode = false;
746+
int mta_workers = 0;
746747
static bool debug_info_flag, debug_check_flag;
747748
static bool force_if_open_opt = true, raw_mode = false;
748749
static bool to_last_remote_log = false, stop_never = false;
@@ -1279,6 +1280,26 @@ static bool shall_skip_gtids(const Log_event *ev, Gtid *cached_gtid) {
12791280
return filtered;
12801281
}
12811282

1283+
static void print_event(Log_event *ev, FILE *result_file,
1284+
PRINT_EVENT_INFO *info) {
1285+
if (info->base64_output_mode != BASE64_OUTPUT_FULL) {
1286+
return ev->print(result_file, info);
1287+
}
1288+
1289+
auto cache = &info->head_cache;
1290+
1291+
if (!info->inside_group) my_b_printf(cache, "BINLOG '\n");
1292+
if (ev->starts_group() || is_gtid_event(ev)) info->inside_group = true;
1293+
if (ev->ends_group()) info->inside_group = false;
1294+
ev->print_base64(cache, info, info->inside_group);
1295+
1296+
if (ev->get_type_code() == binary_log::FORMAT_DESCRIPTION_EVENT) {
1297+
info->printed_fd_event = true;
1298+
my_b_printf(cache, "/*!50616 SET @@SESSION.GTID_NEXT='AUTOMATIC'*/%s\n",
1299+
info->delimiter);
1300+
}
1301+
}
1302+
12821303
/**
12831304
Helper function that prints the cached begin query event to the output
12841305
@@ -1288,7 +1309,7 @@ static bool shall_skip_gtids(const Log_event *ev, Gtid *cached_gtid) {
12881309
@retval False ERROR
12891310
*/
12901311
static bool print_cached_begin_query(PRINT_EVENT_INFO *print_event_info) {
1291-
begin_query_ev_cache->print(result_file, print_event_info);
1312+
print_event(begin_query_ev_cache, result_file, print_event_info);
12921313
auto head = &print_event_info->head_cache;
12931314
if (head->error == -1) {
12941315
return false;
@@ -1520,7 +1541,7 @@ void handle_last_rows_query_event(bool print,
15201541
my_off_t temp_log_pos = last_rows_query_event.event_pos;
15211542
auto old_hexdump_from = print_event_info->hexdump_from;
15221543
print_event_info->hexdump_from = (opt_hexdump ? temp_log_pos : 0);
1523-
last_rows_query_event.event->print(result_file, print_event_info);
1544+
print_event(last_rows_query_event.event, result_file, print_event_info);
15241545
print_event_info->hexdump_from = old_hexdump_from;
15251546
}
15261547
last_rows_query_event.event->register_temp_buf(old_temp_buf);
@@ -1637,7 +1658,7 @@ static Exit_status process_event(PRINT_EVENT_INFO *print_event_info,
16371658

16381659
switch (ev_type) {
16391660
case binary_log::TRANSACTION_PAYLOAD_EVENT:
1640-
ev->print(result_file, print_event_info);
1661+
print_event(ev, result_file, print_event_info);
16411662
if (head->error == -1) goto err;
16421663
break;
16431664
case binary_log::QUERY_EVENT: {
@@ -1660,7 +1681,7 @@ static Exit_status process_event(PRINT_EVENT_INFO *print_event_info,
16601681
my_off_t temp_log_pos = pop_event_array.event_pos;
16611682
print_event_info->hexdump_from = (opt_hexdump ? temp_log_pos : 0);
16621683
if (!parent_query_skips)
1663-
temp_event->print(result_file, print_event_info);
1684+
print_event(temp_event, result_file, print_event_info);
16641685
delete temp_event;
16651686
}
16661687

@@ -1761,7 +1782,7 @@ static Exit_status process_event(PRINT_EVENT_INFO *print_event_info,
17611782
if (!in_transaction) seen_gtid = false;
17621783
}
17631784

1764-
ev->print(result_file, print_event_info);
1785+
print_event(ev, result_file, print_event_info);
17651786
if (head->error == -1) goto err;
17661787
break;
17671788
}
@@ -1795,7 +1816,7 @@ static Exit_status process_event(PRINT_EVENT_INFO *print_event_info,
17951816
the subsequent call load_processor.process fails, because the
17961817
output of Append_block_log_event::print is only a comment.
17971818
*/
1798-
ev->print(result_file, print_event_info);
1819+
print_event(ev, result_file, print_event_info);
17991820

18001821
if (opt_print_gtids && encounter_gtid(cached_gtid)) goto err;
18011822

@@ -1833,7 +1854,7 @@ static Exit_status process_event(PRINT_EVENT_INFO *print_event_info,
18331854

18341855
print_event_info->common_header_len =
18351856
dynamic_cast<Format_description_event *>(ev)->common_header_len;
1836-
ev->print(result_file, print_event_info);
1857+
print_event(ev, result_file, print_event_info);
18371858

18381859
if (head->error == -1) goto err;
18391860
if (!force_if_open_opt &&
@@ -1848,7 +1869,7 @@ static Exit_status process_event(PRINT_EVENT_INFO *print_event_info,
18481869
break;
18491870
}
18501871
case binary_log::BEGIN_LOAD_QUERY_EVENT:
1851-
ev->print(result_file, print_event_info);
1872+
print_event(ev, result_file, print_event_info);
18521873
if (head->error == -1) goto err;
18531874
if ((retval = load_processor.process(
18541875
(Begin_load_query_log_event *)ev)) != OK_CONTINUE)
@@ -1864,6 +1885,10 @@ static Exit_status process_event(PRINT_EVENT_INFO *print_event_info,
18641885
if (shall_skip_database(exlq->db))
18651886
print_event_info->skipped_event_in_transaction = true;
18661887
else {
1888+
if (print_event_info->base64_output_mode == BASE64_OUTPUT_FULL) {
1889+
error("Cannot handle Execute_load_query");
1890+
goto err;
1891+
}
18671892
if (fname) {
18681893
convert_path_to_forward_slashes(fname);
18691894
exlq->print(result_file, print_event_info, fname);
@@ -2038,7 +2063,7 @@ static Exit_status process_event(PRINT_EVENT_INFO *print_event_info,
20382063
goto err;
20392064
}
20402065

2041-
ev->print(result_file, print_event_info);
2066+
print_event(ev, result_file, print_event_info);
20422067

20432068
print_event_info->have_unflushed_events = true;
20442069

@@ -2069,7 +2094,7 @@ static Exit_status process_event(PRINT_EVENT_INFO *print_event_info,
20692094
print_event_info->delimiter);
20702095
print_event_info->skipped_event_in_transaction = false;
20712096

2072-
ev->print(result_file, print_event_info);
2097+
print_event(ev, result_file, print_event_info);
20732098
if (head->error == -1) goto err;
20742099
break;
20752100
}
@@ -2091,12 +2116,12 @@ static Exit_status process_event(PRINT_EVENT_INFO *print_event_info,
20912116
begin_query_ev_cache = nullptr;
20922117
if (skip) break;
20932118
}
2094-
ev->print(result_file, print_event_info);
2119+
print_event(ev, result_file, print_event_info);
20952120
if (head->error == -1) goto err;
20962121
break;
20972122
}
20982123
case binary_log::METADATA_EVENT: {
2099-
ev->print(result_file, print_event_info);
2124+
print_event(ev, result_file, print_event_info);
21002125
if (head->error == -1) goto err;
21012126

21022127
/* Copy and flush head cache and body cache */
@@ -2118,7 +2143,7 @@ static Exit_status process_event(PRINT_EVENT_INFO *print_event_info,
21182143
"--include-gtids, respectively, instead.");
21192144
[[fallthrough]];
21202145
default:
2121-
ev->print(result_file, print_event_info);
2146+
print_event(ev, result_file, print_event_info);
21222147
if (head->error == -1) goto err;
21232148
}
21242149
/* Flush head cache to result_file for every event */
@@ -2246,6 +2271,11 @@ static struct my_option my_long_options[] = {
22462271
"applying Row Events",
22472272
&idempotent_mode, &idempotent_mode, nullptr, GET_BOOL, NO_ARG, 0, 0, 0,
22482273
nullptr, 0, nullptr},
2274+
{"mta-workers", 'w',
2275+
"Number of multi-threaded workers to spawn on the "
2276+
"server to apply binlogs",
2277+
&mta_workers, &mta_workers, nullptr, GET_INT, REQUIRED_ARG, 0, 0, 0,
2278+
nullptr, 0, nullptr},
22492279
{"local-load", 'l',
22502280
"Prepare local temporary files for LOAD DATA INFILE in the specified "
22512281
"directory.",
@@ -3808,6 +3838,11 @@ static int args_post_process(void) {
38083838
global_sid_lock->unlock();
38093839
}
38103840

3841+
if (mta_workers && opt_skip_gtids == 0) {
3842+
error("--mta-workers requires --skip-gtids option");
3843+
return ERROR_STOP;
3844+
}
3845+
38113846
return OK_CONTINUE;
38123847
}
38133848

@@ -4182,6 +4217,17 @@ int main(int argc, char **argv) {
41824217
fprintf(result_file, "/*!80019 SET @@SESSION.REQUIRE_ROW_FORMAT=1*/;\n\n");
41834218
}
41844219

4220+
auto orig_base64_output_mode = opt_base64_output_mode;
4221+
auto orig_short_form = short_form;
4222+
if (mta_workers) {
4223+
// we need to work in full base64 and short form mode for MTA
4224+
opt_base64_output_mode = BASE64_OUTPUT_FULL;
4225+
short_form = true;
4226+
fprintf(result_file,
4227+
"/*!50700 SET @@SESSION.MTA_BINLOG_STATEMENT_WORKERS=%d*/;\n",
4228+
mta_workers);
4229+
}
4230+
41854231
if (opt_start_gtid_str != nullptr || opt_find_gtid_str != nullptr) {
41864232
if (opt_start_gtid_str != nullptr && opt_remote_proto == BINLOG_DUMP_GTID) {
41874233
char *args = const_cast<char *>("");
@@ -4210,7 +4256,12 @@ int main(int argc, char **argv) {
42104256

42114257
if (!raw_mode && opt_find_gtid_str == nullptr) {
42124258
fprintf(result_file, "# End of log file\n");
4213-
4259+
if (mta_workers) {
4260+
opt_base64_output_mode = orig_base64_output_mode;
4261+
short_form = orig_short_form;
4262+
fprintf(result_file,
4263+
"/*!50700 SET @@SESSION.MTA_BINLOG_STATEMENT_WORKERS=0*/;\n");
4264+
}
42144265
fprintf(result_file,
42154266
"/*!50003 SET COMPLETION_TYPE=@OLD_COMPLETION_TYPE*/;\n");
42164267
if (disable_log_bin)

mysql-test/r/mysqld--help-notwin.result

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1090,6 +1090,9 @@ The following options may be given as the first argument:
10901090
minimum_hlc_ns is successfully updated is guranteed to be
10911091
greater than this value. The maximum allowed drift
10921092
(forward) is controlled by maximum_hlc_drift_ns
1093+
--mta-binlog-statement-workers[=#]
1094+
Internal variable to specify the Number of workers to
1095+
spawn to apply binlogs thru mysqlbinlog piping
10931096
--mts-dependency-cond-wait-timeout[=#]
10941097
Timeout for all conditional waits in dependency repl in
10951098
milliseconds
@@ -3442,6 +3445,7 @@ memlock FALSE
34423445
min-examined-row-limit 0
34433446
min-examined-row-limit-sql-stats 0
34443447
minimum-hlc-ns 0
3448+
mta-binlog-statement-workers 0
34453449
mts-dependency-cond-wait-timeout 5000
34463450
mts-dependency-max-keys 100000
34473451
mts-dependency-order-commits DB

mysql-test/suite/binlog_nogtid/r/binlog_persist_only_variables.result

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ SET PERSIST_ONLY max_binlog_cache_size = @@GLOBAL.max_binlog_cache_size;
159159
SET PERSIST_ONLY max_binlog_size = @@GLOBAL.max_binlog_size;
160160
SET PERSIST_ONLY max_binlog_stmt_cache_size = @@GLOBAL.max_binlog_stmt_cache_size;
161161
SET PERSIST_ONLY max_relay_log_size = @@GLOBAL.max_relay_log_size;
162+
SET PERSIST_ONLY mta_binlog_statement_workers = @@GLOBAL.mta_binlog_statement_workers;
162163
SET PERSIST_ONLY mts_dependency_replication = @@GLOBAL.mts_dependency_replication;
163164
SET PERSIST_ONLY prev_gtid_and_opid = @@GLOBAL.prev_gtid_and_opid;
164165
ERROR HY000: Variable 'prev_gtid_and_opid' is a non persistent read only variable
@@ -403,6 +404,7 @@ RESET PERSIST max_binlog_cache_size;
403404
RESET PERSIST max_binlog_size;
404405
RESET PERSIST max_binlog_stmt_cache_size;
405406
RESET PERSIST max_relay_log_size;
407+
RESET PERSIST mta_binlog_statement_workers;
406408
RESET PERSIST mts_dependency_replication;
407409
RESET PERSIST prev_gtid_and_opid;
408410
ERROR HY000: Variable prev_gtid_and_opid does not exist in persisted config file

mysql-test/suite/binlog_nogtid/r/binlog_persist_variables.result

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ SET PERSIST max_binlog_cache_size = @@GLOBAL.max_binlog_cache_size;
134134
SET PERSIST max_binlog_size = @@GLOBAL.max_binlog_size;
135135
SET PERSIST max_binlog_stmt_cache_size = @@GLOBAL.max_binlog_stmt_cache_size;
136136
SET PERSIST max_relay_log_size = @@GLOBAL.max_relay_log_size;
137+
SET PERSIST mta_binlog_statement_workers = @@GLOBAL.mta_binlog_statement_workers;
137138
SET PERSIST mts_dependency_replication = @@GLOBAL.mts_dependency_replication;
138139
SET PERSIST prev_gtid_and_opid = @@GLOBAL.prev_gtid_and_opid;
139140
ERROR HY000: Variable 'prev_gtid_and_opid' is a read only variable
@@ -405,6 +406,7 @@ RESET PERSIST IF EXISTS max_binlog_cache_size;
405406
RESET PERSIST IF EXISTS max_binlog_size;
406407
RESET PERSIST IF EXISTS max_binlog_stmt_cache_size;
407408
RESET PERSIST IF EXISTS max_relay_log_size;
409+
RESET PERSIST IF EXISTS mta_binlog_statement_workers;
408410
RESET PERSIST IF EXISTS mts_dependency_replication;
409411
RESET PERSIST IF EXISTS prev_gtid_and_opid;
410412
Warnings:
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
include/master-slave.inc
2+
Warnings:
3+
Note #### Sending passwords in plain text without SSL/TLS is extremely insecure.
4+
Note #### Storing MySQL user name or password information in the connection metadata repository is not secure and is therefore not recommended. Please consider using the USER and PASSWORD connection options for START REPLICA; see the 'START REPLICA Syntax' in the MySQL Manual for more information.
5+
[connection master]
6+
call mtr.add_suppression("HA_ERR_FOUND_DUPP_KEY");
7+
create table t1 (a int primary key) engine = innodb;
8+
flush binary logs;
9+
purge binary logs to 'master-bin.000001';
10+
insert into t1 values(1);
11+
insert into t1 values(2);
12+
insert into t1 values(3);
13+
insert into t1 values(4);
14+
include/sync_slave_sql_with_master.inc
15+
flush binary logs;
16+
delete from t1;
17+
"Case 1: No errors"
18+
include/sync_slave_sql_with_master.inc
19+
select * from t1;
20+
a
21+
1
22+
2
23+
3
24+
4
25+
select * from t1;
26+
a
27+
1
28+
2
29+
3
30+
4
31+
"Case 2: Duplicate key error on worker"
32+
delete from t1;
33+
insert into t1 values(3);
34+
include/sync_slave_sql_with_master.inc
35+
drop table t1;
36+
include/sync_slave_sql_with_master.inc
37+
include/rpl_end.inc
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
--enable-binlog-hlc
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
source include/master-slave.inc;
2+
source include/have_binlog_format_row.inc;
3+
4+
call mtr.add_suppression("HA_ERR_FOUND_DUPP_KEY");
5+
6+
connection master;
7+
let $MYSQLD_DATADIR = `select @@global.datadir`;
8+
9+
let $binlog_file = query_get_value(SHOW MASTER STATUS, File, 1);
10+
create table t1 (a int primary key) engine = innodb;
11+
12+
flush binary logs;
13+
eval purge binary logs to '$binlog_file';
14+
15+
insert into t1 values(1);
16+
insert into t1 values(2);
17+
insert into t1 values(3);
18+
insert into t1 values(4);
19+
source include/sync_slave_sql_with_master.inc;
20+
21+
connection master;
22+
let $binlog_file = query_get_value(SHOW MASTER STATUS, File, 1);
23+
24+
flush binary logs;
25+
delete from t1;
26+
27+
echo "Case 1: No errors";
28+
exec $MYSQL_BINLOG --skip-gtids --mta-workers=2 $MYSQLD_DATADIR/$binlog_file | $MYSQL --host=127.0.0.1 -P $MASTER_MYPORT -uroot;
29+
source include/sync_slave_sql_with_master.inc;
30+
31+
connection master;
32+
select * from t1;
33+
34+
connection slave;
35+
select * from t1;
36+
37+
echo "Case 2: Duplicate key error on worker";
38+
connection master;
39+
delete from t1;
40+
insert into t1 values(3); # this will cause dup key error
41+
42+
exec $MYSQL_BINLOG --skip-gtids --mta-workers=2 $MYSQLD_DATADIR/$binlog_file | $MYSQL --host=127.0.0.1 -P $MASTER_MYPORT -uroot || true;
43+
source include/sync_slave_sql_with_master.inc;
44+
45+
connection master;
46+
drop table t1;
47+
source include/sync_slave_sql_with_master.inc;
48+
49+
source include/rpl_end.inc;
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
SET @start_value = @@global.mta_binlog_statement_workers;
2+
SELECT @start_value;
3+
@start_value
4+
0
5+
SET @@GLOBAL.mta_binlog_statement_workers = 4;
6+
SET @@SESSION.mta_binlog_statement_workers = 4;
7+
SET @@SESSION.mta_binlog_statement_workers = 0;
8+
Warnings:
9+
Warning 1231 mta_binlog_statement_workers can only be set from mysqlbinlog
10+
SET @@GLOBAL.mta_binlog_statement_workers = @start_value;
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
source include/load_sysvars.inc;
2+
3+
SET @start_value = @@global.mta_binlog_statement_workers;
4+
SELECT @start_value;
5+
6+
SET @@GLOBAL.mta_binlog_statement_workers = 4;
7+
8+
SET @@SESSION.mta_binlog_statement_workers = 4;
9+
10+
SET @@SESSION.mta_binlog_statement_workers = 0;
11+
12+
SET @@GLOBAL.mta_binlog_statement_workers = @start_value;

sql/log_event.cc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2563,7 +2563,10 @@ void Log_event::print_base64(IO_CACHE *file, PRINT_EVENT_INFO *print_event_info,
25632563
}
25642564

25652565
if (print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS) {
2566-
if (my_b_tell(file) == 0) {
2566+
// In BASE64_OUTPUT_FULL mode mysqlbinlog.cc will determine when to print
2567+
// "BINLOG". See @print_event()
2568+
if (print_event_info->base64_output_mode != BASE64_OUTPUT_FULL &&
2569+
my_b_tell(file) == 0) {
25672570
my_b_printf(file, "\nBINLOG '\n");
25682571
print_event_info->inside_binlog = true;
25692572
}

0 commit comments

Comments
 (0)