diff options
Diffstat (limited to 'client/mysqlbinlog.cc')
-rw-r--r-- | client/mysqlbinlog.cc | 331 |
1 files changed, 306 insertions, 25 deletions
diff --git a/client/mysqlbinlog.cc b/client/mysqlbinlog.cc index d65828ea71c..dcb362e6b3a 100644 --- a/client/mysqlbinlog.cc +++ b/client/mysqlbinlog.cc @@ -143,10 +143,18 @@ static char *charset= 0; static uint verbose= 0; -static ulonglong start_position, stop_position; +static char *start_pos_str, *stop_pos_str; +static ulonglong start_position= BIN_LOG_HEADER_SIZE, + stop_position= (longlong)(~(my_off_t)0) ; #define start_position_mot ((my_off_t)start_position) #define stop_position_mot ((my_off_t)stop_position) +static Delegating_gtid_event_filter *gtid_event_filter= NULL; +static rpl_gtid *start_gtids, *stop_gtids; +static my_bool is_event_gtid_active= FALSE; +static uint32 n_start_gtid_ranges= 0; +static uint32 n_stop_gtid_ranges= 0; + static char *start_datetime_str, *stop_datetime_str; static my_time_t start_datetime= 0, stop_datetime= MY_TIME_T_MAX; static ulonglong rec_count= 0; @@ -981,6 +989,40 @@ static bool print_row_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev, return result; } +static inline my_bool is_gtid_filtering_enabled() +{ + return gtid_event_filter != NULL; +} + +/* + Where the binlog is processed sequentially, the variable is_event_gtid_active + keeps track of the state of active event groups. When a new Gtid_log_event is + read, if it should be output, this function will return true. Otherwise, this + function will return false. +*/ +static inline my_bool is_event_group_active() +{ + return is_event_gtid_active; +} + +/* + When a Gtid_log_event marks a GTID that should be output, this function is + invoked to change the program state to start writing binlog events until + the event group has ended. +*/ +static inline void activate_current_event_group() +{ + is_event_gtid_active= TRUE; +} + +/* + When an active event group has written its last event, this function is + invoked to change the program state to stop writing events. +*/ +static inline void deactivate_current_event_group() +{ + is_event_gtid_active= FALSE; +} /** Print the given event, and either delete it or delegate the deletion @@ -1019,11 +1061,34 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev, #endif /* + If the binlog output should be filtered using GTIDs, test the new event + group to see if its events should be written or ignored. + */ + if (ev_type == GTID_EVENT && is_gtid_filtering_enabled()) + { + Gtid_log_event *gle= (Gtid_log_event*) ev; + rpl_gtid gtid; + gtid.domain_id= gle->domain_id; + gtid.server_id= gle->server_id; + gtid.seq_no= gle->seq_no; + if (!gtid_event_filter->exclude(>id)) + { + activate_current_event_group(); + } + else + { + deactivate_current_event_group(); + } + + } + + /* Format events are not concerned by --offset and such, we always need to read them to be able to process the wanted events. */ if (((rec_count >= offset) && - (ev->when >= start_datetime)) || + (ev->when >= start_datetime) && + (!is_gtid_filtering_enabled() || is_event_group_active())) || (ev_type == FORMAT_DESCRIPTION_EVENT)) { if (ev_type != FORMAT_DESCRIPTION_EVENT) @@ -1500,6 +1565,13 @@ end: } } + /* Xid_log_events or Query_log_events mark the end of a GTID event group. */ + if ((ev_type == XID_EVENT || ev_type == QUERY_EVENT) && + is_event_group_active()) + { + deactivate_current_event_group(); + } + if (destroy_evt) /* destroy it later if not set (ignored table map) */ delete ev; } @@ -1658,15 +1730,13 @@ static struct my_option my_options[] = &start_datetime_str, &start_datetime_str, 0, GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, {"start-position", 'j', - "Start reading the binlog at position N. Applies to the first binlog " - "passed on the command line.", - &start_position, &start_position, 0, GET_ULL, - REQUIRED_ARG, BIN_LOG_HEADER_SIZE, BIN_LOG_HEADER_SIZE, - /* - COM_BINLOG_DUMP accepts only 4 bytes for the position - so remote log reading has lower limit. - */ - (ulonglong)(0xffffffffffffffffULL), 0, 0, 0}, + "Start reading the binlog at this position. Type can either be a positive " + "integer or a GTID. When using a positive integer, the value only " + "applies to the first binlog passed on the command line. In GTID mode, " + "multiple GTIDs can be passed as a comma separated list, where each must " + "have a unique domain id.", + &start_pos_str, &start_pos_str, 0, GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0, + 0, 0, 0}, {"stop-datetime", OPT_STOP_DATETIME, "Stop reading the binlog at first event having a datetime equal or " "posterior to the argument; the argument must be a date and time " @@ -1684,11 +1754,13 @@ static struct my_option my_options[] = &opt_stop_never_slave_server_id, &opt_stop_never_slave_server_id, 0, GET_ULONG, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, {"stop-position", OPT_STOP_POSITION, - "Stop reading the binlog at position N. Applies to the last binlog " - "passed on the command line.", - &stop_position, &stop_position, 0, GET_ULL, - REQUIRED_ARG, (longlong)(~(my_off_t)0), BIN_LOG_HEADER_SIZE, - (ulonglong)(~(my_off_t)0), 0, 0, 0}, + "Stop reading the binlog at this position. Type can either be a positive " + "integer or a GTID. When using a positive integer, the value only " + "applies to the last binlog passed on the command line. In GTID mode, " + "multiple GTIDs can be passed as a comma separated list, where each must " + "have a unique domain id.", + &stop_pos_str, &stop_pos_str, 0, GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0, 0, + 0, 0}, {"table", 'T', "List entries for just this table (local log only).", &table, &table, 0, GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, @@ -1824,8 +1896,14 @@ static void cleanup() my_free(const_cast<char*>(dirname_for_local_load)); my_free(start_datetime_str); my_free(stop_datetime_str); + my_free(start_pos_str); + my_free(stop_pos_str); + my_free(start_gtids); + my_free(stop_gtids); free_root(&glob_root, MYF(0)); + delete gtid_event_filter; + delete binlog_filter; delete glob_description_event; if (mysql) @@ -2075,6 +2153,123 @@ get_one_option(const struct my_option *opt, const char *argument, const char *fi print_version(); opt_version= 1; break; + case OPT_STOP_POSITION: + { + stop_gtids= gtid_parse_string_to_list(stop_pos_str, strlen(stop_pos_str), + &n_stop_gtid_ranges); + if (stop_gtids == NULL) + { + int err= 0; + char *end_ptr= NULL; + /* + No GTIDs specified in OPT_STOP_POSITION specification. Treat the value + as a singular index. + */ + stop_position= my_strtoll10(stop_pos_str, &end_ptr, &err); + + if (err || *end_ptr) + { + // Can't parse the position from the user + sql_print_error("Stop position argument value is invalid. Should be " + "either a position or GTID."); + return 1; + } + } + else if (n_stop_gtid_ranges > 0) + { + uint32 gtid_idx; + Domain_gtid_event_filter *domain_filter; + + if (gtid_event_filter == NULL) + { + domain_filter= new Domain_gtid_event_filter(); + gtid_event_filter= domain_filter; + } + else + { + domain_filter= (Domain_gtid_event_filter *) gtid_event_filter; + } + + for (gtid_idx = 0; gtid_idx < n_stop_gtid_ranges; gtid_idx++) + { + rpl_gtid *stop_gtid= &stop_gtids[gtid_idx]; + if (!domain_filter->add_stop_gtid(stop_gtid)) + { + sql_print_error("Cannot add stop position; domain id %u " + "already has a stop position", + stop_gtid->domain_id); + return 1; + } + } + } + else + { + // Can't parse the position from the user + sql_print_error("Stop position argument value is invalid. Should be " + "either a position or GTID."); + return 1; + } + break; + } + case 'j': + { + start_gtids= gtid_parse_string_to_list( + start_pos_str, strlen(start_pos_str), &n_start_gtid_ranges); + + if (start_gtids == NULL) + { + int err= 0; + char *end_ptr= NULL; + /* + No GTIDs specified in OPT_START_POSITION specification. Treat the value + as a singular index. + */ + start_position= my_strtoll10(start_pos_str, &end_ptr, &err); + + if (err || *end_ptr) + { + // Can't parse the position from the user + sql_print_error("Start position argument value is invalid. Should be " + "either a position or GTID."); + return 1; + } + } + else if (n_start_gtid_ranges > 0) + { + uint32 gtid_idx; + Domain_gtid_event_filter *domain_filter; + + if (gtid_event_filter == NULL) + { + domain_filter= new Domain_gtid_event_filter(); + gtid_event_filter= domain_filter; + } + else + { + domain_filter= (Domain_gtid_event_filter *) gtid_event_filter; + } + + for (gtid_idx = 0; gtid_idx < n_start_gtid_ranges; gtid_idx++) + { + rpl_gtid *start_gtid= &start_gtids[gtid_idx]; + if (!domain_filter->add_start_gtid(start_gtid)) + { + sql_print_error("Cannot add start position; domain id %u " + "already has a start position", + start_gtid->domain_id); + return 1; + } + } + } + else + { + // Can't parse the position from the user + sql_print_error("Start position argument value is invalid. Should be " + "either a position or GTID."); + return 1; + } + break; + } case '?': usage(); opt_version= 1; @@ -2237,11 +2432,12 @@ static Exit_status dump_log_entries(const char* logname) @retval ERROR_STOP An error occurred - the program should terminate. @retval OK_CONTINUE No error, the program should continue. */ -static Exit_status check_master_version() +static Exit_status check_master_version(uint *major, uint *minor, uint *patch) { MYSQL_RES* res = 0; MYSQL_ROW row; uint version; + size_t version_iter; if (mysql_query(mysql, "SELECT VERSION()") || !(res = mysql_store_result(mysql))) @@ -2257,12 +2453,52 @@ static Exit_status check_master_version() goto err; } - if (!(version = atoi(row[0]))) + if (!(*major = atoi(row[0]))) { error("Could not find server version: " "Master reported NULL for the version."); goto err; } + + /* Try to save the minor version, if supplied with a storage location */ + if (minor != NULL) + { + for (version_iter= 0; version_iter < strlen((const char *) row); + version_iter++) + { + if (row[0][version_iter] == '.') + { + version_iter= version_iter + 1; + break; + } + } + if (!(*minor= atoi(&row[0][version_iter]))) + { + error("Could not find server minor version: " + "Master reported NULL for the minor version."); + } + } + + /* Try to save the patch version, if supplied with a storage location */ + if (patch != NULL) + { + for (; version_iter < strlen((const char *) row); version_iter++) + { + if (row[0][version_iter] == '.') + { + version_iter= version_iter + 1; + break; + } + } + if (!(*patch= atoi(&row[0][version_iter]))) + { + error("Could not find server patch version: " + "Master reported NULL for the patch version."); + } + } + + version= *major; + /* Make a notice to the server that this client is checksum-aware. It does not need the first fake Rotate @@ -2606,21 +2842,56 @@ static Exit_status dump_remote_log_entries(PRINT_EVENT_INFO *print_event_info, DBUG_RETURN(retval); net= &mysql->net; - if ((retval= check_master_version()) != OK_CONTINUE) + uint major_version, minor_version; + if ((retval= check_master_version(&major_version, &minor_version, NULL)) != + OK_CONTINUE) DBUG_RETURN(retval); /* COM_BINLOG_DUMP accepts only 4 bytes for the position, so we are forced to cast to uint32. */ - DBUG_ASSERT(start_position <= UINT_MAX32); - int4store(buf, (uint32)start_position); + size_t buf_idx= 0; + if (is_gtid_filtering_enabled()) + { + if (major_version < 10 || (major_version == 10 && minor_version < 7)) + { + error("Master does not support GTID filtering. This feature was added " + "in MariaDB version 10.7"); + DBUG_RETURN(ERROR_STOP); + } + + size_t i; + for (i = 0; i < n_start_gtid_ranges; i++) + { + if (i > 0) + { + buf[buf_idx]= ','; + buf_idx += 1; + } + + + int4store(buf + buf_idx, start_gtids[i].domain_id); + buf[buf_idx+4]= '-'; + int4store(buf + buf_idx + 5, start_gtids[i].server_id); + buf[buf_idx+9]= '-'; + int8store(buf + buf_idx + 10, start_gtids[i].seq_no); + buf_idx += 18; + } + } + else + { + DBUG_ASSERT(start_position <= UINT_MAX32); + int4store(buf, (uint32) start_position); + buf_idx= BIN_LOG_HEADER_SIZE; + } if (!opt_skip_annotate_row_events) binlog_flags|= BINLOG_SEND_ANNOTATE_ROWS_EVENT; if (!opt_stop_never) binlog_flags|= BINLOG_DUMP_NON_BLOCK; - int2store(buf + BIN_LOG_HEADER_SIZE, binlog_flags); + int2store(buf + buf_idx, binlog_flags); + buf_idx += 2; size_t tlen = strlen(logname); if (tlen > sizeof(buf) - 10) @@ -2637,9 +2908,10 @@ static Exit_status dump_remote_log_entries(PRINT_EVENT_INFO *print_event_info, } else slave_id= 0; - int4store(buf + 6, slave_id); - memcpy(buf + 10, logname, logname_len); - if (simple_command(mysql, COM_BINLOG_DUMP, buf, logname_len + 10, 1)) + int4store(buf + buf_idx, slave_id); + buf_idx += 4; + memcpy(buf + buf_idx, logname, logname_len); + if (simple_command(mysql, COM_BINLOG_DUMP, buf, logname_len + buf_idx, 1)) { error("Got fatal error sending the log dump command."); DBUG_RETURN(ERROR_STOP); @@ -3210,6 +3482,14 @@ int main(int argc, char** argv) "/*!40101 SET COLLATION_CONNECTION=@OLD_COLLATION_CONNECTION */;\n"); fprintf(result_file, "/*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=0*/;\n"); + + if (is_gtid_filtering_enabled()) + { + fprintf(result_file, + "/*!100001 SET @@SESSION.SERVER_ID=@@GLOBAL.SERVER_ID */;\n" + "/*!100001 SET @@SESSION.GTID_DOMAIN_ID=@@GLOBAL.GTID_DOMAIN_ID " + "*/;\n"); + } } if (tmpdir.list) @@ -3271,3 +3551,4 @@ struct encryption_service_st encryption_handler= #include "sql_list.cc" #include "rpl_filter.cc" #include "compat56.cc" +#include "rpl_gtid.cc"
\ No newline at end of file |