diff options
Diffstat (limited to 'client/mysqlbinlog.cc')
-rw-r--r-- | client/mysqlbinlog.cc | 381 |
1 files changed, 365 insertions, 16 deletions
diff --git a/client/mysqlbinlog.cc b/client/mysqlbinlog.cc index 3d9b9712ec8..3ef174fefcb 100644 --- a/client/mysqlbinlog.cc +++ b/client/mysqlbinlog.cc @@ -128,6 +128,7 @@ static my_bool print_row_count_used= 0, print_row_event_positions_used= 0; static my_bool debug_info_flag, debug_check_flag; static my_bool force_if_open_opt= 1; static my_bool opt_raw_mode= 0, opt_stop_never= 0; +my_bool opt_gtid_strict_mode= true; static ulong opt_stop_never_slave_server_id= 0; static my_bool opt_verify_binlog_checksum= 1; static ulonglong offset = 0; @@ -143,10 +144,15 @@ static char *charset= 0; static uint verbose= 0; -static ulonglong start_position, stop_position; +static char *start_pos_str, *stop_pos_str; +static ulonglong start_position= BIN_LOG_HEADER_SIZE, + stop_position= (longlong)(~(my_off_t)0) ; #define start_position_mot ((my_off_t)start_position) #define stop_position_mot ((my_off_t)stop_position) +static Binlog_gtid_state_validator *gtid_state_validator= NULL; +static Domain_gtid_event_filter *domain_gtid_filter= NULL; + static char *start_datetime_str, *stop_datetime_str; static my_time_t start_datetime= 0, stop_datetime= MY_TIME_T_MAX; static ulonglong rec_count= 0; @@ -981,6 +987,10 @@ static bool print_row_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev, return result; } +static inline my_bool is_gtid_filtering_enabled() +{ + return domain_gtid_filter != NULL; +} /** Print the given event, and either delete it or delegate the deletion @@ -1008,10 +1018,23 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev, char ll_buff[21]; Log_event_type ev_type= ev->get_type_code(); my_bool destroy_evt= TRUE; + my_bool gtid_err= FALSE; DBUG_ENTER("process_event"); Exit_status retval= OK_CONTINUE; IO_CACHE *const head= &print_event_info->head_cache; + /* + We use Gtid_list_log_event information to determine if there is missing + data between where a user expects events to start/stop (i.e. the GTIDs + provided by --start-position and --stop-position), and the true start of + the specified binary logs. The first GLLE provides the initial state of the + binary logs. + + If --start-position is provided as a file offset, we want to skip initial + GTID state verification + */ + static my_bool was_first_glle_processed= start_position > BIN_LOG_HEADER_SIZE; + /* Bypass flashback settings to event */ ev->is_flashback= opt_flashback; #ifdef WHEN_FLASHBACK_REVIEW_READY @@ -1019,6 +1042,120 @@ Exit_status process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev, #endif /* + Run time estimation of the output window configuration. + + Do not validate GLLE information is start position is provided as a file + offset. + */ + if (ev_type == GTID_LIST_EVENT && ev->when) + { + Gtid_list_log_event *glev= (Gtid_list_log_event *)ev; + + /* + If this is the first Gtid_list_log_event, initialize the state of the + GTID stream auditor to be consistent with the binary logs provided + */ + if (gtid_state_validator && !was_first_glle_processed && glev->count) + { + if (gtid_state_validator->initialize_gtid_state(stderr, glev->list, + glev->count)) + goto err; + + if (domain_gtid_filter && !domain_gtid_filter->get_num_start_gtids()) + { + /* + We need to validate the GTID list from --stop-position because we + couldn't prove it intrinsically (i.e. using stop > start) + */ + rpl_gtid *stop_gtids= domain_gtid_filter->get_stop_gtids(); + size_t n_stop_gtids= domain_gtid_filter->get_num_stop_gtids(); + if (gtid_state_validator->verify_stop_state(stderr, stop_gtids, + n_stop_gtids)) + { + my_free(stop_gtids); + goto err; + } + my_free(stop_gtids); + } + } + + /* + Verify that we are able to process events from this binlog. For example, + if our current GTID state is behind the state of the GLLE in the new log, + a user may have accidentally left out a log file to process. + */ + if (gtid_state_validator && verbose >= 3) + for (size_t k= 0; k < glev->count; k++) + gtid_state_validator->verify_gtid_state(stderr, &(glev->list[k])); + + was_first_glle_processed= TRUE; + } + + if (ev_type == GTID_EVENT) + { + rpl_gtid ev_gtid; + Gtid_log_event *gle= (Gtid_log_event*) ev; + ev_gtid= {gle->domain_id, gle->server_id, gle->seq_no}; + + /* + If the binlog output should be filtered using GTIDs, test the new event + group to see if its events should be ignored. + */ + if (domain_gtid_filter) + { + if (domain_gtid_filter->has_finished()) + { + retval= OK_STOP; + goto end; + } + + if (!domain_gtid_filter->exclude(&ev_gtid)) + print_event_info->activate_current_event_group(); + else + print_event_info->deactivate_current_event_group(); + } + + /* + Where we always ensure the initial binlog state is valid, we only + continually monitor the GTID stream for validity if we are in GTID + strict mode (for errors) or if three levels of verbosity is provided + (for warnings). + + If we don't care about ensuring GTID validity, just delete the auditor + object to disable it for future checks. + */ + if (gtid_state_validator) + { + if (!(opt_gtid_strict_mode || verbose >= 3)) + { + delete gtid_state_validator; + + /* + Explicitly reset to NULL to simplify checks on if auditing is enabled + i.e. if it is defined, assume we want to use it + */ + gtid_state_validator= NULL; + } + else + { + gtid_err= gtid_state_validator->record(&ev_gtid); + if (gtid_err && opt_gtid_strict_mode) + { + gtid_state_validator->report(stderr, opt_gtid_strict_mode); + goto err; + } + } + } + } + + /* + If the GTID is ignored, it shouldn't count towards offset (rec_count should + not be incremented) + */ + if (!print_event_info->is_event_group_active()) + goto end_skip_count; + + /* Format events are not concerned by --offset and such, we always need to read them to be able to process the wanted events. */ @@ -1458,6 +1595,7 @@ err: retval= ERROR_STOP; end: rec_count++; +end_skip_count: DBUG_PRINT("info", ("end event processing")); /* @@ -1658,15 +1796,22 @@ static struct my_option my_options[] = &start_datetime_str, &start_datetime_str, 0, GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, {"start-position", 'j', - "Start reading the binlog at position N. Applies to the first binlog " - "passed on the command line.", - &start_position, &start_position, 0, GET_ULL, - REQUIRED_ARG, BIN_LOG_HEADER_SIZE, BIN_LOG_HEADER_SIZE, - /* - COM_BINLOG_DUMP accepts only 4 bytes for the position - so remote log reading has lower limit. - */ - (ulonglong)(0xffffffffffffffffULL), 0, 0, 0}, + "Start reading the binlog at this position. Type can either be a positive " + "integer or a GTID list. When using a positive integer, the value only " + "applies to the first binlog passed on the command line. In GTID mode, " + "multiple GTIDs can be passed as a comma separated list, where each must " + "have a unique domain id. The list represents the gtid binlog state that " + "the client (another \"replica\" server) is aware of. Therefore, each GTID " + "is exclusive; only events after a given sequence number will be printed to " + "allow users to receive events after their current state.", + &start_pos_str, &start_pos_str, 0, GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0, + 0, 0, 0}, + {"gtid-strict-mode", 0, "Process binlog according to gtid-strict-mode " + "specification. The start, stop positions are verified to satisfy " + "start < stop comparison condition. Sequence numbers of any gtid domain " + "must comprise monotically growing sequence", + &opt_gtid_strict_mode, &opt_gtid_strict_mode, 0, + GET_BOOL, NO_ARG, 1, 0, 0, 0, 0, 0}, {"stop-datetime", OPT_STOP_DATETIME, "Stop reading the binlog at first event having a datetime equal or " "posterior to the argument; the argument must be a date and time " @@ -1684,11 +1829,14 @@ static struct my_option my_options[] = &opt_stop_never_slave_server_id, &opt_stop_never_slave_server_id, 0, GET_ULONG, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, {"stop-position", OPT_STOP_POSITION, - "Stop reading the binlog at position N. Applies to the last binlog " - "passed on the command line.", - &stop_position, &stop_position, 0, GET_ULL, - REQUIRED_ARG, (longlong)(~(my_off_t)0), BIN_LOG_HEADER_SIZE, - (ulonglong)(~(my_off_t)0), 0, 0, 0}, + "Stop reading the binlog at this position. Type can either be a positive " + "integer or a GTID list. When using a positive integer, the value only " + "applies to the last binlog passed on the command line. In GTID mode, " + "multiple GTIDs can be passed as a comma separated list, where each must " + "have a unique domain id. Each GTID is inclusive; only events up to the " + "given sequence numbers are printed.", + &stop_pos_str, &stop_pos_str, 0, GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0, 0, + 0, 0}, {"table", 'T', "List entries for just this table (local log only).", &table, &table, 0, GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, @@ -1702,7 +1850,9 @@ that may lead to an endless loop.", &user, &user, 0, GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, {"verbose", 'v', "Reconstruct SQL statements out of row events. " - "-v -v adds comments on column data types.", + "-v -v adds comments on column data types. " + "-v -v -v adds diagnostic warnings about event " + "integrity before program exit.", 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}, {"version", 'V', "Print version and exit.", 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}, @@ -1824,8 +1974,14 @@ static void cleanup() my_free(const_cast<char*>(dirname_for_local_load)); my_free(start_datetime_str); my_free(stop_datetime_str); + my_free(start_pos_str); + my_free(stop_pos_str); free_root(&glob_root, MYF(0)); + delete domain_gtid_filter; + if (gtid_state_validator) + delete gtid_state_validator; + delete binlog_filter; delete glob_description_event; if (mysql) @@ -2075,6 +2231,110 @@ get_one_option(const struct my_option *opt, const char *argument, const char *fi print_version(); opt_version= 1; break; + case OPT_STOP_POSITION: + { + /* Stop position was already specified, so reset it and use the new list */ + if (domain_gtid_filter && domain_gtid_filter->get_num_stop_gtids() > 0) + domain_gtid_filter->clear_stop_gtids(); + + uint32 n_stop_gtid_ranges= 0; + rpl_gtid *stop_gtids= gtid_parse_string_to_list( + stop_pos_str, strlen(stop_pos_str), &n_stop_gtid_ranges); + if (stop_gtids == NULL) + { + int err= 0; + char *end_ptr= NULL; + /* + No GTIDs specified in OPT_STOP_POSITION specification. Treat the value + as a singular index. + */ + stop_position= my_strtoll10(stop_pos_str, &end_ptr, &err); + + if (err || *end_ptr) + { + // Can't parse the position from the user + sql_print_error("Stop position argument value is invalid. Should be " + "either a positive integer or GTID."); + return 1; + } + } + else if (n_stop_gtid_ranges > 0) + { + uint32 gtid_idx; + + if (domain_gtid_filter == NULL) + domain_gtid_filter= new Domain_gtid_event_filter(); + + for (gtid_idx = 0; gtid_idx < n_stop_gtid_ranges; gtid_idx++) + { + rpl_gtid *stop_gtid= &stop_gtids[gtid_idx]; + if (domain_gtid_filter->add_stop_gtid(stop_gtid)) + { + my_free(stop_gtids); + return 1; + } + } + my_free(stop_gtids); + } + else + { + DBUG_ASSERT(0); + } + break; + } + case 'j': + { + /* Start position was already specified, so reset it and use the new list */ + if (domain_gtid_filter && domain_gtid_filter->get_num_start_gtids() > 0) + domain_gtid_filter->clear_start_gtids(); + + uint32 n_start_gtid_ranges= 0; + rpl_gtid *start_gtids= gtid_parse_string_to_list( + start_pos_str, strlen(start_pos_str), &n_start_gtid_ranges); + + if (start_gtids == NULL) + { + int err= 0; + char *end_ptr= NULL; + /* + No GTIDs specified in OPT_START_POSITION specification. Treat the value + as a singular index. + */ + start_position= my_strtoll10(start_pos_str, &end_ptr, &err); + + if (err || *end_ptr) + { + // Can't parse the position from the user + sql_print_error("Start position argument value is invalid. Should be " + "either a positive integer or GTID."); + return 1; + } + } + else if (n_start_gtid_ranges > 0) + { + uint32 gtid_idx; + + if (domain_gtid_filter == NULL) + domain_gtid_filter= new Domain_gtid_event_filter(); + + for (gtid_idx = 0; gtid_idx < n_start_gtid_ranges; gtid_idx++) + { + rpl_gtid *start_gtid= &start_gtids[gtid_idx]; + if (start_gtid->seq_no && + domain_gtid_filter->add_start_gtid(start_gtid)) + { + my_free(start_gtids); + return 1; + } + } + my_free(start_gtids); + } + else + { + DBUG_ASSERT(0); + } + break; + } case '?': usage(); opt_version= 1; @@ -2107,6 +2367,37 @@ static int parse_args(int *argc, char*** argv) start_position); start_position= UINT_MAX32; } + + /* + Always initialize the stream auditor initially because it is used to check + the initial state of the binary log is correct. If we don't want it later + (i.e. --skip-gtid-strict-mode or -vvv is not given), it is deleted when we + are certain the initial gtid state is set. + */ + gtid_state_validator= new Binlog_gtid_state_validator(); + + if (domain_gtid_filter) + { + if (opt_gtid_strict_mode && domain_gtid_filter->validate_window_filters()) + { + /* + In strict mode, if any --start/stop-position GTID ranges are invalid, + quit in error. Note that any specific error messages will have + already been written. + */ + die(); + } + + /* + GTIDs before a start position shouldn't be validated, so we initialize + the stream auditor to only monitor GTIDs after these positions. + */ + size_t n_start_gtids= domain_gtid_filter->get_num_start_gtids(); + rpl_gtid *start_gtids= domain_gtid_filter->get_start_gtids(); + gtid_state_validator->initialize_start_gtids(start_gtids, n_start_gtids); + my_free(start_gtids); + } + return 0; } @@ -2187,6 +2478,9 @@ static Exit_status dump_log_entries(const char* logname) if (!print_event_info.init_ok()) return ERROR_STOP; + + if (domain_gtid_filter) + print_event_info.enable_event_group_filtering(); /* Set safe delimiter, to dump things like CREATE PROCEDURE safely @@ -2288,6 +2582,40 @@ static Exit_status check_master_version() goto err; } + if (domain_gtid_filter && domain_gtid_filter->get_num_start_gtids() > 0) + { + char str_buf[256]; + String query_str(str_buf, sizeof(str_buf), system_charset_info); + query_str.length(0); + query_str.append(STRING_WITH_LEN("SET @slave_connect_state='"), + system_charset_info); + + size_t n_start_gtids= domain_gtid_filter->get_num_start_gtids(); + rpl_gtid *start_gtids= domain_gtid_filter->get_start_gtids(); + + for (size_t gtid_idx = 0; gtid_idx < n_start_gtids; gtid_idx++) + { + char buf[256]; + rpl_gtid *start_gtid= &start_gtids[gtid_idx]; + + sprintf(buf, "%u-%u-%llu", + start_gtid->domain_id, start_gtid->server_id, + start_gtid->seq_no); + query_str.append(buf, strlen(buf)); + if (gtid_idx < n_start_gtids - 1) + query_str.append(','); + } + my_free(start_gtids); + + query_str.append(STRING_WITH_LEN("'"), system_charset_info); + if (unlikely(mysql_real_query(mysql, query_str.ptr(), query_str.length()))) + { + error("Setting @slave_connect_state failed with error: %s", + mysql_error(mysql)); + goto err; + } + } + delete glob_description_event; glob_description_event= NULL; @@ -3210,12 +3538,32 @@ int main(int argc, char** argv) "/*!40101 SET COLLATION_CONNECTION=@OLD_COLLATION_CONNECTION */;\n"); fprintf(result_file, "/*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=0*/;\n"); + + if (is_gtid_filtering_enabled()) + { + fprintf(result_file, + "/*!100001 SET @@SESSION.SERVER_ID=@@GLOBAL.SERVER_ID */;\n" + "/*!100001 SET @@SESSION.GTID_DOMAIN_ID=@@GLOBAL.GTID_DOMAIN_ID " + "*/;\n"); + } } if (tmpdir.list) free_tmpdir(&tmpdir); if (result_file && result_file != stdout) my_fclose(result_file, MYF(0)); + + /* + Ensure the GTID state is correct. If not, end in error. + + Note that in gtid strict mode, we will not report here if any invalid GTIDs + are processed because it immediately errors (i.e. retval will be + ERROR_STOP) + */ + if (retval != ERROR_STOP && gtid_state_validator && + gtid_state_validator->report(stderr, opt_gtid_strict_mode)) + retval= ERROR_STOP; + cleanup(); /* We cannot free DBUG, it is used in global destructors after exit(). */ my_end(my_end_arg | MY_DONT_FREE_DBUG); @@ -3271,3 +3619,4 @@ struct encryption_service_st encryption_handler= #include "sql_list.cc" #include "rpl_filter.cc" #include "compat56.cc" +#include "rpl_gtid.cc" |