diff options
author | Jonas Oreland <jonaso@google.com> | 2014-12-19 12:36:23 +0100 |
---|---|---|
committer | Kristian Nielsen <knielsen@knielsen-hq.org> | 2014-12-23 14:16:13 +0100 |
commit | 4d8b346e079a27960dbe49e4d0ec4364bed8d30e (patch) | |
tree | 249baee07701bc22748d1feefbebab88815cc60f /sql/sql_repl.cc | |
parent | ea01fff5053acc6526bc995c2a3c85474c844508 (diff) | |
download | mariadb-git-4d8b346e079a27960dbe49e4d0ec4364bed8d30e.tar.gz |
MDEV-7257: Dump Thread Enhancements
Make the binlog dump threads send binlog events to the slave without
having to take LOCK_log. Instead, a new LOCK_binlog_end_pos is used
only to coordinate tracking of the current end-of-log.
This is a prerequisite for MDEV-162, "Enhanced semisync
replication". It should also help reduce the contention on LOCK_log on
a busy master.
Also does some much-needed refactoring/cleanup of the related code in
the binlog dump thread.
Diffstat (limited to 'sql/sql_repl.cc')
-rw-r--r-- | sql/sql_repl.cc | 1567 |
1 files changed, 857 insertions, 710 deletions
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index edc24e45bdb..6bc8d7e6c45 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -125,8 +125,9 @@ struct binlog_send_info { THD *thd; NET *net; String *packet; - char *log_file_name; + char *const log_file_name; // ptr/alias to linfo.log_file_name slave_connection_state *until_gtid_state; + slave_connection_state until_gtid_state_obj; Format_description_log_event *fdev; int mariadb_slave_capability; enum_gtid_skip_type gtid_skip_group; @@ -138,16 +139,57 @@ struct binlog_send_info { bool slave_gtid_ignore_duplicates; bool using_gtid_state; - binlog_send_info(THD *thd_arg, String *packet_arg, ushort flags_arg, char *lfn) + int error; + const char *errmsg; + char error_text[MAX_SLAVE_ERRMSG]; + rpl_gtid error_gtid; + + ulonglong heartbeat_period; + + /** start file/pos as requested by slave, for error message */ + char start_log_file_name[FN_REFLEN]; + my_off_t start_pos; + + /** last pos for error message */ + my_off_t last_pos; + +#ifndef DBUG_OFF + int left_events; + uint dbug_reconnect_counter; + ulong hb_info_counter; +#endif + + bool clear_initial_log_pos; + bool should_stop; + + binlog_send_info(THD *thd_arg, String *packet_arg, ushort flags_arg, + char *lfn) : thd(thd_arg), net(&thd_arg->net), packet(packet_arg), log_file_name(lfn), until_gtid_state(NULL), fdev(NULL), gtid_skip_group(GTID_SKIP_NOT), gtid_until_group(GTID_UNTIL_NOT_DONE), flags(flags_arg), current_checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF), slave_gtid_strict_mode(false), send_fake_gtid_list(false), - slave_gtid_ignore_duplicates(false) - { } + slave_gtid_ignore_duplicates(false), + error(0), + errmsg("Unknown error"), + heartbeat_period(0), +#ifndef DBUG_OFF + left_events(max_binlog_dump_events), + dbug_reconnect_counter(0), + hb_info_counter(0), +#endif + clear_initial_log_pos(false), + should_stop(false) + { + error_text[0] = 0; + bzero(&error_gtid, sizeof(error_gtid)); + } }; +// prototype +static int reset_transmit_packet(struct 
binlog_send_info *info, ushort flags, + ulong *ev_offset, const char **errmsg); + /* fake_rotate_event() builds a fake (=which does not exist physically in any binlog) Rotate event, which contains the name of the binlog we are going to @@ -170,6 +212,7 @@ static int fake_rotate_event(binlog_send_info *info, ulonglong position, const char** errmsg, uint8 checksum_alg_arg) { DBUG_ENTER("fake_rotate_event"); + ulong ev_offset; char buf[ROTATE_HEADER_LEN+100]; my_bool do_checksum; int err; @@ -178,10 +221,18 @@ static int fake_rotate_event(binlog_send_info *info, ulonglong position, String *packet= info->packet; ha_checksum crc; + /* reset transmit packet for the fake rotate event below */ + if (reset_transmit_packet(info, info->flags, &ev_offset, &info->errmsg)) + DBUG_RETURN(1); + if ((err= fake_event_header(packet, ROTATE_EVENT, - ident_len + ROTATE_HEADER_LEN, &do_checksum, &crc, + ident_len + ROTATE_HEADER_LEN, &do_checksum, + &crc, errmsg, checksum_alg_arg, 0))) + { + info->error= ER_UNKNOWN_ERROR; DBUG_RETURN(err); + } int8store(buf+R_POS_OFFSET,position); packet->append(buf, ROTATE_HEADER_LEN); @@ -195,8 +246,10 @@ static int fake_rotate_event(binlog_send_info *info, ulonglong position, if ((err= fake_event_footer(packet, do_checksum, crc, errmsg)) || (err= fake_event_write(info->net, packet, errmsg))) + { + info->error= ER_UNKNOWN_ERROR; DBUG_RETURN(err); - + } DBUG_RETURN(0); } @@ -215,13 +268,17 @@ static int fake_gtid_list_event(binlog_send_info *info, str.length(0); if (glev->to_packet(&str)) { + info->error= ER_UNKNOWN_ERROR; *errmsg= "Failed due to out-of-memory writing Gtid_list event"; return -1; } if ((err= fake_event_header(packet, GTID_LIST_EVENT, str.length(), &do_checksum, &crc, errmsg, info->current_checksum_alg, current_pos))) + { + info->error= ER_UNKNOWN_ERROR; return err; + } packet->append(str); if (do_checksum) @@ -231,7 +288,10 @@ static int fake_gtid_list_event(binlog_send_info *info, if ((err= fake_event_footer(packet, do_checksum, crc, 
errmsg)) || (err= fake_event_write(info->net, packet, errmsg))) + { + info->error= ER_UNKNOWN_ERROR; return err; + } return 0; } @@ -243,20 +303,20 @@ static int fake_gtid_list_event(binlog_send_info *info, This function allocates header bytes for event transmission, and should be called before store the event data to the packet buffer. */ -static int reset_transmit_packet(THD *thd, ushort flags, +static int reset_transmit_packet(binlog_send_info *info, ushort flags, ulong *ev_offset, const char **errmsg) { int ret= 0; - String *packet= &thd->packet; + String *packet= &info->thd->packet; /* reserve and set default header */ packet->length(0); packet->set("\0", 1, &my_charset_bin); - if (RUN_HOOK(binlog_transmit, reserve_header, (thd, flags, packet))) + if (RUN_HOOK(binlog_transmit, reserve_header, (info->thd, flags, packet))) { + info->error= ER_UNKNOWN_ERROR; *errmsg= "Failed to run hook 'reserve_header'"; - my_errno= ER_UNKNOWN_ERROR; ret= 1; } *ev_offset= packet->length(); @@ -556,36 +616,38 @@ bool purge_master_logs_before_date(THD* thd, time_t purge_time) mysql_bin_log.purge_logs_before_date(purge_time)); } -int test_for_non_eof_log_read_errors(int error, const char **errmsg) +void set_read_error(binlog_send_info *info, int error) { if (error == LOG_READ_EOF) - return 0; - my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; + { + return; + } + info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; switch (error) { case LOG_READ_BOGUS: - *errmsg = "bogus data in log event"; + info->errmsg= "bogus data in log event"; break; case LOG_READ_TOO_LARGE: - *errmsg = "log event entry exceeded max_allowed_packet; \ -Increase max_allowed_packet on master"; + info->errmsg= "log event entry exceeded max_allowed_packet; " + "Increase max_allowed_packet on master"; break; case LOG_READ_IO: - *errmsg = "I/O error reading log event"; + info->errmsg= "I/O error reading log event"; break; case LOG_READ_MEM: - *errmsg = "memory allocation failed reading log event"; + info->errmsg= 
"memory allocation failed reading log event"; break; case LOG_READ_TRUNC: - *errmsg = "binlog truncated in the middle of event; consider out of disk space on master"; + info->errmsg= "binlog truncated in the middle of event; " + "consider out of disk space on master"; break; case LOG_READ_CHECKSUM_FAILURE: - *errmsg = "event read from binlog did not pass crc check"; + info->errmsg= "event read from binlog did not pass crc check"; break; default: - *errmsg = "unknown error reading log event on the master"; + info->errmsg= "unknown error reading log event on the master"; break; } - return error; } @@ -710,11 +772,17 @@ get_slave_until_gtid(THD *thd, String *out_str) The error to send is serious and should force terminating the dump thread. */ -static int send_heartbeat_event(NET* net, String* packet, +static int send_heartbeat_event(binlog_send_info *info, + NET* net, String* packet, const struct event_coordinates *coord, uint8 checksum_alg_arg) { DBUG_ENTER("send_heartbeat_event"); + + ulong ev_offset; + if (reset_transmit_packet(info, info->flags, &ev_offset, &info->errmsg)) + DBUG_RETURN(1); + char header[LOG_EVENT_HEADER_LEN]; my_bool do_checksum= checksum_alg_arg != BINLOG_CHECKSUM_ALG_OFF && checksum_alg_arg != BINLOG_CHECKSUM_ALG_UNDEF; @@ -753,8 +821,10 @@ static int send_heartbeat_event(NET* net, String* packet, if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) || net_flush(net)) { + info->error= ER_UNKNOWN_ERROR; DBUG_RETURN(-1); } + DBUG_RETURN(0); } @@ -1552,7 +1622,7 @@ is_until_reached(binlog_send_info *info, ulong *ev_offset, Send a last fake Gtid_list_log_event with a flag set to mark that we stop due to UNTIL condition. 
*/ - if (reset_transmit_packet(info->thd, info->flags, ev_offset, errmsg)) + if (reset_transmit_packet(info, info->flags, ev_offset, errmsg)) return true; Gtid_list_log_event glev(&info->until_binlog_state, Gtid_list_log_event::FLAG_UNTIL_REACHED); @@ -1593,14 +1663,14 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type, current_checksum_alg, >id_list, &list_len, info->fdev)) { - my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; + info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; return "Failed to read Gtid_list_log_event: corrupt binlog"; } err= info->until_binlog_state.load(gtid_list, list_len); my_free(gtid_list); if (err) { - my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; + info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; return "Failed in internal GTID book-keeping: Out of memory"; } } @@ -1622,7 +1692,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type, &event_gtid.domain_id, &event_gtid.server_id, &event_gtid.seq_no, &flags2, info->fdev)) { - my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; + info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; return "Failed to read Gtid_log_event: corrupt binlog"; } @@ -1634,14 +1704,14 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type, { DBUG_SET("-d,gtid_force_reconnect_at_10_1_100"); DBUG_SET_INITIAL("-d,gtid_force_reconnect_at_10_1_100"); - my_errno= ER_UNKNOWN_ERROR; + info->error= ER_UNKNOWN_ERROR; return "DBUG-injected forced reconnect"; } }); if (info->until_binlog_state.update_nolock(&event_gtid, false)) { - my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; + info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; return "Failed in internal GTID book-keeping: Out of memory"; } @@ -1663,7 +1733,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type, const char *errormsg; *error_gtid= *gtid; give_error_start_pos_missing_in_binlog(&err, &errormsg, error_gtid); - my_errno= err; + info->error= err; return errormsg; } gtid_entry->flags&= 
~(uint32)slave_connection_state::START_ON_EMPTY_DOMAIN; @@ -1687,7 +1757,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type, exist, even though both the prior and subsequent seq_no exists for same domain_id and server_id. */ - my_errno= ER_GTID_START_FROM_BINLOG_HOLE; + info->error= ER_GTID_START_FROM_BINLOG_HOLE; *error_gtid= *gtid; return "The binlog on the master is missing the GTID requested " "by the slave (even though both a prior and a subsequent " @@ -1812,7 +1882,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type, */ if (Query_log_event::dummy_event(packet, ev_offset, current_checksum_alg)) { - my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; + info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; return "Failed to replace row annotate event with dummy: too small event."; } } @@ -1834,7 +1904,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type, current_checksum_alg); if (err) { - my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; + info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; return "Failed to replace GTID event with backwards-compatible event: " "currupt event."; } @@ -1865,7 +1935,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type, */ if (Query_log_event::dummy_event(packet, ev_offset, current_checksum_alg)) { - my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; + info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; return "Failed to replace binlog checkpoint or gtid list event with " "dummy: too small event."; } @@ -1893,13 +1963,13 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type, if (RUN_HOOK(binlog_transmit, before_send_event, (info->thd, info->flags, packet, info->log_file_name, pos))) { - my_errno= ER_UNKNOWN_ERROR; + info->error= ER_UNKNOWN_ERROR; return "run 'before_send_event' hook failed"; } if (my_net_write(info->net, (uchar*) packet->ptr(), len)) { - my_errno= ER_UNKNOWN_ERROR; + info->error= ER_UNKNOWN_ERROR; return "Failed on 
my_net_write()"; } @@ -1908,7 +1978,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type, { if (send_file(info->thd)) { - my_errno= ER_UNKNOWN_ERROR; + info->error= ER_UNKNOWN_ERROR; return "failed in send_file()"; } } @@ -1916,83 +1986,91 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type, if (RUN_HOOK(binlog_transmit, after_send_event, (info->thd, info->flags, packet))) { - my_errno= ER_UNKNOWN_ERROR; + info->error= ER_UNKNOWN_ERROR; return "Failed to run hook 'after_send_event'"; } return NULL; /* Success */ } - -void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, - ushort flags) +static int check_start_offset(binlog_send_info *info, + const char *log_file_name, + my_off_t pos) { - LOG_INFO linfo; - char *log_file_name = linfo.log_file_name; - char search_file_name[FN_REFLEN], *name; + IO_CACHE log; + File file= -1; - ulong ev_offset; + /** check that requested position is inside of file */ + if ((file=open_binlog(&log, log_file_name, &info->errmsg)) < 0) + { + info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; + return 1; + } - IO_CACHE log; - File file = -1; - String* const packet= &thd->packet; + if (pos < BIN_LOG_HEADER_SIZE || pos > my_b_filelength(&log)) + { + const char* msg= "Client requested master to start replication from " + "impossible position"; + + info->errmsg= NULL; // don't do further modifications of error_text + snprintf(info->error_text, sizeof(info->error_text), + "%s; the first event '%s' at %lld, " + "the last event read from '%s' at %d, " + "the last byte read from '%s' at %d.", + msg, + my_basename(info->start_log_file_name), pos, + my_basename(info->start_log_file_name), BIN_LOG_HEADER_SIZE, + my_basename(info->start_log_file_name), BIN_LOG_HEADER_SIZE); + info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; + goto err; + } + +err: + end_io_cache(&log); + mysql_file_close(file, MYF(MY_WME)); + return info->error; +} + +static int init_binlog_sender(binlog_send_info *info, + LOG_INFO 
*linfo, + const char *log_ident, + my_off_t *pos) +{ + THD *thd= info->thd; int error; - const char *errmsg = "Unknown error", *tmp_msg; - char error_text[MAX_SLAVE_ERRMSG]; // to be send to slave via my_message() - mysql_mutex_t *log_lock; - mysql_cond_t *log_cond; char str_buf[128]; String connect_gtid_state(str_buf, sizeof(str_buf), system_charset_info); char str_buf2[128]; String slave_until_gtid_str(str_buf2, sizeof(str_buf2), system_charset_info); - slave_connection_state until_gtid_state_obj; - rpl_gtid error_gtid; - binlog_send_info info(thd, packet, flags, log_file_name); + connect_gtid_state.length(0); - int old_max_allowed_packet= thd->variables.max_allowed_packet; + /** save start file/pos that was requested by slave */ + strmake(info->start_log_file_name, log_ident, + sizeof(info->start_log_file_name)); + info->start_pos= *pos; -#ifndef DBUG_OFF - int left_events = max_binlog_dump_events; - uint dbug_reconnect_counter= 0; -#endif - DBUG_ENTER("mysql_binlog_send"); - DBUG_PRINT("enter",("log_ident: '%s' pos: %ld", log_ident, (long) pos)); + /** init last pos */ + info->last_pos= *pos; - bzero((char*) &log,sizeof(log)); - bzero(&error_gtid, sizeof(error_gtid)); - /* - heartbeat_period from @master_heartbeat_period user variable - */ - ulonglong heartbeat_period= get_heartbeat_period(thd); - struct timespec heartbeat_buf; - struct timespec *heartbeat_ts= NULL; - const LOG_POS_COORD start_coord= { log_ident, pos }, - *p_start_coord= &start_coord; - LOG_POS_COORD coord_buf= { log_file_name, BIN_LOG_HEADER_SIZE }, - *p_coord= &coord_buf; - if (heartbeat_period != 0) - { - heartbeat_ts= &heartbeat_buf; - set_timespec_nsec(*heartbeat_ts, 0); - } - info.mariadb_slave_capability= get_mariadb_slave_capability(thd); + info->current_checksum_alg= get_binlog_checksum_value_at_connect(thd); + info->mariadb_slave_capability= get_mariadb_slave_capability(thd); + info->using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state); + 
DBUG_EXECUTE_IF("simulate_non_gtid_aware_master", + info->using_gtid_state= false;); - connect_gtid_state.length(0); - info.using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state); - DBUG_EXECUTE_IF("simulate_non_gtid_aware_master", info.using_gtid_state= false;); - if (info.using_gtid_state) + if (info->using_gtid_state) { - info.slave_gtid_strict_mode= get_slave_gtid_strict_mode(thd); - info.slave_gtid_ignore_duplicates= get_slave_gtid_ignore_duplicates(thd); - if(get_slave_until_gtid(thd, &slave_until_gtid_str)) - info.until_gtid_state= &until_gtid_state_obj; + info->slave_gtid_strict_mode= get_slave_gtid_strict_mode(thd); + info->slave_gtid_ignore_duplicates= get_slave_gtid_ignore_duplicates(thd); + if (get_slave_until_gtid(thd, &slave_until_gtid_str)) + info->until_gtid_state= &info->until_gtid_state_obj; } DBUG_EXECUTE_IF("binlog_force_reconnect_after_22_events", { DBUG_SET("-d,binlog_force_reconnect_after_22_events"); DBUG_SET_INITIAL("-d,binlog_force_reconnect_after_22_events"); - dbug_reconnect_counter= 22; + info->dbug_reconnect_counter= 22; }); /* @@ -2008,774 +2086,843 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, }); if (global_system_variables.log_warnings > 1) - sql_print_information("Start binlog_dump to slave_server(%lu), pos(%s, %lu)", - thd->variables.server_id, log_ident, (ulong)pos); - if (RUN_HOOK(binlog_transmit, transmit_start, (thd, flags, log_ident, pos))) - { - errmsg= "Failed to run hook 'transmit_start'"; - my_errno= ER_UNKNOWN_ERROR; - goto err; - } + sql_print_information( + "Start binlog_dump to slave_server(%lu), pos(%s, %lu)", + thd->variables.server_id, log_ident, (ulong)*pos); #ifndef DBUG_OFF if (opt_sporadic_binlog_dump_fail && (binlog_dump_count++ % 2)) { - errmsg = "Master failed COM_BINLOG_DUMP to test if slave can recover"; - my_errno= ER_UNKNOWN_ERROR; - goto err; + info->errmsg= "Master failed COM_BINLOG_DUMP to test if slave can recover"; + info->error= ER_UNKNOWN_ERROR; + return 1; } 
#endif - if (!(info.fdev= new Format_description_log_event(3))) - { - errmsg= "Out of memory initializing format_description event"; - my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; - goto err; - } - if (!mysql_bin_log.is_open()) { - errmsg = "Binary log is not open"; - my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; - goto err; + info->errmsg= "Binary log is not open"; + info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; + return 1; } if (!server_id_supplied) { - errmsg = "Misconfigured master - server id was not set"; - my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; - goto err; + info->errmsg= "Misconfigured master - server id was not set"; + info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; + return 1; } - name=search_file_name; - if (info.using_gtid_state) + char search_file_name[FN_REFLEN]; + const char *name=search_file_name; + if (info->using_gtid_state) { - if (info.gtid_state.load(connect_gtid_state.c_ptr_quick(), + if (info->gtid_state.load(connect_gtid_state.c_ptr_quick(), connect_gtid_state.length())) { - errmsg= "Out of memory or malformed slave request when obtaining start " - "position from GTID state"; - my_errno= ER_UNKNOWN_ERROR; - goto err; + info->errmsg= "Out of memory or malformed slave request when obtaining " + "start position from GTID state"; + info->error= ER_UNKNOWN_ERROR; + return 1; } - if (info.until_gtid_state && - info.until_gtid_state->load(slave_until_gtid_str.c_ptr_quick(), + if (info->until_gtid_state && + info->until_gtid_state->load(slave_until_gtid_str.c_ptr_quick(), slave_until_gtid_str.length())) { - errmsg= "Out of memory or malformed slave request when obtaining UNTIL " - "position sent from slave"; - my_errno= ER_UNKNOWN_ERROR; - goto err; + info->errmsg= "Out of memory or malformed slave request when " + "obtaining UNTIL position sent from slave"; + info->error= ER_UNKNOWN_ERROR; + return 1; } - if ((error= check_slave_start_position(&info, &errmsg, &error_gtid))) + if ((error= check_slave_start_position(info, 
&info->errmsg, + &info->error_gtid))) { - my_errno= error; - goto err; + info->error= error; + return 1; } - if ((errmsg= gtid_find_binlog_file(&info.gtid_state, search_file_name, - info.until_gtid_state))) + if ((info->errmsg= gtid_find_binlog_file(&info->gtid_state, + search_file_name, + info->until_gtid_state))) { - my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; - goto err; + info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; + return 1; } - pos= 4; + + /* start from beginning of binlog file */ + *pos = 4; } else { if (log_ident[0]) mysql_bin_log.make_log_name(search_file_name, log_ident); else - name=0; // Find first log + name=0; // Find first log } + linfo->index_file_offset= 0; - linfo.index_file_offset = 0; - - if (mysql_bin_log.find_log_pos(&linfo, name, 1)) + if (mysql_bin_log.find_log_pos(linfo, name, 1)) { - errmsg = "Could not find first log file name in binary log index file"; - my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; - goto err; + info->errmsg= "Could not find first log file name in binary " + "log index file"; + info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; + return 1; } + // set current pos too + linfo->pos= *pos; + + // note: publish that we use file, before we open it mysql_mutex_lock(&LOCK_thread_count); - thd->current_linfo = &linfo; + thd->current_linfo= linfo; mysql_mutex_unlock(&LOCK_thread_count); - if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0) + if (check_start_offset(info, linfo->log_file_name, *pos)) + return 1; + + if (*pos > BIN_LOG_HEADER_SIZE) { - my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; - goto err; + /* + mark that first format descriptor with "log_pos=0", so the slave + should not increment master's binlog position + (rli->group_master_log_pos) + */ + info->clear_initial_log_pos= true; } - if (pos < BIN_LOG_HEADER_SIZE || pos > my_b_filelength(&log)) + + return 0; +} + +/** + * send format descriptor event for one binlog file + */ +static int send_format_descriptor_event(binlog_send_info *info, + 
IO_CACHE *log, + LOG_INFO *linfo, + my_off_t start_pos) +{ + int error; + ulong ev_offset; + THD *thd= info->thd; + String *packet= info->packet; + Log_event_type event_type; + + /** + * 1) reset fdev before each log-file + * 2) read first event, should be the format descriptor + * 3) read second event, *might* be start encryption event + * if it's isn't, seek back to undo this read + */ + if (info->fdev != NULL) + delete info->fdev; + + if (!(info->fdev= new Format_description_log_event(3))) { - errmsg= "Client requested master to start replication from \ -impossible position"; - my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; - goto err; + info->errmsg= "Out of memory initializing format_description event"; + info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; + return 1; } - /* reset transmit packet for the fake rotate event below */ - if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg)) - goto err; - - /* - Tell the client about the log name with a fake Rotate event; - this is needed even if we also send a Format_description_log_event - just after, because that event does not contain the binlog's name. - Note that as this Rotate event is sent before - Format_description_log_event, the slave cannot have any info to - understand this event's format, so the header len of - Rotate_log_event is FROZEN (so in 5.0 it will have a header shorter - than other events except FORMAT_DESCRIPTION_EVENT). - Before 4.0.14 we called fake_rotate_event below only if (pos == - BIN_LOG_HEADER_SIZE), because if this is false then the slave - already knows the binlog's name. - Since, we always call fake_rotate_event; if the slave already knew - the log's name (ex: CHANGE MASTER TO MASTER_LOG_FILE=...) this is - useless but does not harm much. 
It is nice for 3.23 (>=.58) slaves - which test Rotate events to see if the master is 4.0 (then they - choose to stop because they can't replicate 4.0); by always calling - fake_rotate_event we are sure that 3.23.58 and newer will detect the - problem as soon as replication starts (BUG#198). - Always calling fake_rotate_event makes sending of normal - (=from-binlog) Rotate events a priori unneeded, but it is not so - simple: the 2 Rotate events are not equivalent, the normal one is - before the Stop event, the fake one is after. If we don't send the - normal one, then the Stop event will be interpreted (by existing 4.0 - slaves) as "the master stopped", which is wrong. So for safety, - given that we want minimum modification of 4.0, we send the normal - and fake Rotates. - */ - if (fake_rotate_event(&info, pos, &errmsg, - get_binlog_checksum_value_at_connect(thd))) + do { + /* reset transmit packet for the event read from binary log file */ + if (reset_transmit_packet(info, info->flags, &ev_offset, &info->errmsg)) + break; + /* - This error code is not perfect, as fake_rotate_event() does not - read anything from the binlog; if it fails it's because of an - error in my_net_write(), fortunately it will say so in errmsg. + Try to find a Format_description_log_event at the beginning of + the binlog */ - my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; - goto err; + info->last_pos= my_b_tell(log); + error= Log_event::read_log_event(log, packet, /* LOCK_log */ NULL, + info->current_checksum_alg); + linfo->pos= my_b_tell(log); + + if (error) + { + set_read_error(info, error); + break; + } + + event_type= (Log_event_type)((uchar)(*packet)[LOG_EVENT_OFFSET+ev_offset]); + + /* + The packet has offsets equal to the normal offsets in a + binlog event + ev_offset (the first ev_offset characters are + the header (default \0)). 
+ */ + DBUG_PRINT("info", + ("Looked for a Format_description_log_event, " + "found event type %d", (int)event_type)); + + if (event_type != FORMAT_DESCRIPTION_EVENT) + { + info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; + info->errmsg= "Failed to find format descriptor event in start of binlog"; + sql_print_warning("Failed to find format descriptor event in " + "start of binlog: %s", + info->log_file_name); + break; + } + + info->current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset, + packet->length() - ev_offset); + + DBUG_ASSERT(info->current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF || + info->current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF || + info->current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32); + + if (!is_slave_checksum_aware(thd) && + info->current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF && + info->current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) + { + info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; + info->errmsg= "Slave can not handle replication events with the " + "checksum that master is configured to log"; + sql_print_warning("Master is configured to log replication events " + "with checksum, but will not send such events to " + "slaves that cannot process them"); + break; + } + + Format_description_log_event *tmp; + if (!(tmp= new Format_description_log_event(packet->ptr()+ev_offset, + packet->length()-ev_offset, + info->fdev))) + { + info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; + info->errmsg= "Corrupt Format_description event found " + "or out-of-memory"; + break; + } + delete info->fdev; + info->fdev= tmp; + + (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F; + + if (info->clear_initial_log_pos) + { + info->clear_initial_log_pos= false; + /* + mark that this event with "log_pos=0", so the slave + should not increment master's binlog position + (rli->group_master_log_pos) + */ + int4store((char*) packet->ptr()+LOG_POS_OFFSET+ev_offset, (ulong) 0); + + /* + if reconnect master sends FD event with 
`created' as 0 + to avoid destroying temp tables. + */ + int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+ + ST_CREATED_OFFSET+ev_offset, (ulong) 0); + + /* fix the checksum due to latest changes in header */ + if (info->current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF && + info->current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) + fix_checksum(packet, ev_offset); + } + + /* send it */ + if (my_net_write(info->net, (uchar*) packet->ptr(), packet->length())) + { + info->errmsg= "Failed on my_net_write()"; + info->error= ER_UNKNOWN_ERROR; + break; + } + + /** all done */ + return 0; + + } while (false); + + return 1; +} + +static bool should_stop(binlog_send_info *info) +{ + return + info->net->error || + info->net->vio == NULL || + info->thd->killed || + info->error != 0 || + info->should_stop; +} + +/** + * wait for new events to enter binlog + * this function will send heartbeats while waiting if so configured + */ +static int wait_new_events(binlog_send_info *info, /* in */ + LOG_INFO* linfo, /* in */ + char binlog_end_pos_filename[], /* out */ + my_off_t *end_pos_ptr) /* out */ +{ + int ret= 1; + PSI_stage_info old_stage; + + mysql_bin_log.lock_binlog_end_pos(); + info->thd->ENTER_COND(mysql_bin_log.get_log_cond(), + mysql_bin_log.get_binlog_end_pos_lock(), + &stage_master_has_sent_all_binlog_to_slave, + &old_stage); + + while (!should_stop(info)) + { + *end_pos_ptr= mysql_bin_log.get_binlog_end_pos(binlog_end_pos_filename); + if (strcmp(linfo->log_file_name, binlog_end_pos_filename) != 0) + { + /* there has been a log file switch, we don't need to wait */ + ret= 0; + break; + } + + if (linfo->pos < *end_pos_ptr) + { + /* there is data to read, we don't need to wait */ + ret= 0; + break; + } + + if (info->heartbeat_period) + { + struct timespec ts; + set_timespec_nsec(ts, info->heartbeat_period); + ret= mysql_bin_log.wait_for_update_binlog_end_pos(info->thd, &ts); + if (ret == ETIMEDOUT || ret == ETIME) + { + struct event_coordinates coord = { 
linfo->log_file_name, linfo->pos }; +#ifndef DBUG_OFF + const ulong hb_info_counter_limit = 3; + if (info->hb_info_counter < hb_info_counter_limit) + { + sql_print_information("master sends heartbeat message %s:%llu", + linfo->log_file_name, linfo->pos); + info->hb_info_counter++; + if (info->hb_info_counter == hb_info_counter_limit) + sql_print_information("the rest of heartbeat info skipped ..."); + } +#endif + mysql_bin_log.unlock_binlog_end_pos(); + ret= send_heartbeat_event(info, + info->net, info->packet, &coord, + info->current_checksum_alg); + mysql_bin_log.lock_binlog_end_pos(); + + if (ret) + { + ret= 1; // error + break; + } + /** + * re-read heartbeat period after each sent + */ + info->heartbeat_period= get_heartbeat_period(info->thd); + } + else if (ret != 0) + { + ret= 1; // error + break; + } + } + else + { + ret= mysql_bin_log.wait_for_update_binlog_end_pos(info->thd, NULL); + if (ret != 0 && ret != ETIMEDOUT && ret != ETIME) + { + ret= 1; // error + break; + } + } } - /* - Adding MAX_LOG_EVENT_HEADER_LEN, since a binlog event can become - this larger than the corresponding packet (query) sent - from client to master. - */ - thd->variables.max_allowed_packet= MAX_MAX_ALLOWED_PACKET; + /* it releases the lock set in ENTER_COND */ + info->thd->EXIT_COND(&old_stage); + return ret; +} - /* - We can set log_lock now, it does not move (it's a member of - mysql_bin_log, and it's already inited, and it will be destroyed - only at shutdown). 
- */ - p_coord->pos= pos; // the first hb matches the slave's last seen value - log_lock= mysql_bin_log.get_log_lock(); - log_cond= mysql_bin_log.get_log_cond(); - if (pos > BIN_LOG_HEADER_SIZE) +/** + * get end pos of current log file, this function + * will wait if there is nothing available + */ +static my_off_t get_binlog_end_pos(binlog_send_info *info, + IO_CACHE* log, + LOG_INFO* linfo) +{ + my_off_t log_pos= my_b_tell(log); + + /** + * get current binlog end pos + */ + mysql_bin_log.lock_binlog_end_pos(); + char binlog_end_pos_filename[FN_REFLEN]; + my_off_t end_pos= mysql_bin_log.get_binlog_end_pos(binlog_end_pos_filename); + mysql_bin_log.unlock_binlog_end_pos(); + + do { - /* reset transmit packet for the event read from binary log - file */ - if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg)) - goto err; + if (strcmp(binlog_end_pos_filename, linfo->log_file_name) != 0) + { + /** + * this file is not active, since it's not written to again, + * it safe to check file length and use that as end_pos + */ + end_pos= my_b_filelength(log); - /* - Try to find a Format_description_log_event at the beginning of - the binlog - */ - if (!(error = Log_event::read_log_event(&log, packet, log_lock, 0))) - { - /* - The packet has offsets equal to the normal offsets in a - binlog event + ev_offset (the first ev_offset characters are - the header (default \0)). 
+ if (log_pos == end_pos) + return 0; // already at end of file inactive file + else + return end_pos; // return size of inactive file + } + else + { + /** + * this is the active file */ - DBUG_PRINT("info", - ("Looked for a Format_description_log_event, found event type %d", - (*packet)[EVENT_TYPE_OFFSET+ev_offset])); - if ((*packet)[EVENT_TYPE_OFFSET+ev_offset] == FORMAT_DESCRIPTION_EVENT) - { - Format_description_log_event *tmp; - - info.current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset, - packet->length() - ev_offset); - DBUG_ASSERT(info.current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF || - info.current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF || - info.current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32); - if (!is_slave_checksum_aware(thd) && - info.current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF && - info.current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) - { - my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; - errmsg= "Slave can not handle replication events with the checksum " - "that master is configured to log"; - sql_print_warning("Master is configured to log replication events " - "with checksum, but will not send such events to " - "slaves that cannot process them"); - goto err; - } - - if (!(tmp= new Format_description_log_event(packet->ptr()+ev_offset, - packet->length()-ev_offset, - info.fdev))) - { - my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; - errmsg= "Corrupt Format_description event found or out-of-memory"; - goto err; - } - delete info.fdev; - info.fdev= tmp; - - (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F; - /* - mark that this event with "log_pos=0", so the slave - should not increment master's binlog position - (rli->group_master_log_pos) - */ - int4store((char*) packet->ptr()+LOG_POS_OFFSET+ev_offset, 0); - /* - if reconnect master sends FD event with `created' as 0 - to avoid destroying temp tables. 
- */ - int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+ - ST_CREATED_OFFSET+ev_offset, (ulong) 0); - - /* fix the checksum due to latest changes in header */ - if (info.current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF && - info.current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) - fix_checksum(packet, ev_offset); - - /* send it */ - if (my_net_write(info.net, (uchar*) packet->ptr(), packet->length())) - { - errmsg = "Failed on my_net_write()"; - my_errno= ER_UNKNOWN_ERROR; - goto err; - } - - /* - No need to save this event. We are only doing simple reads - (no real parsing of the events) so we don't need it. And so - we don't need the artificial Format_description_log_event of - 3.23&4.x. + + if (log_pos < end_pos) + { + /** + * there is data available to read */ - } - } - else - { - if (test_for_non_eof_log_read_errors(error, &errmsg)) - goto err; - /* - It's EOF, nothing to do, go on reading next events, the - Format_description_log_event will be found naturally if it is written. + return end_pos; + } + + /** + * check if we should wait for more data */ - } - } /* end of if (pos > BIN_LOG_HEADER_SIZE); */ - else - { - /* The Format_description_log_event event will be found naturally. */ - } + if ((info->flags & BINLOG_DUMP_NON_BLOCK) || + (info->thd->variables.server_id == 0)) + { + info->should_stop= true; + return 0; + } - /* - Handle the case of START SLAVE UNTIL with an UNTIL condition already - fulfilled at the start position. + /** + * flush data before waiting + */ + if (net_flush(info->net)) + { + info->errmsg= "failed on net_flush()"; + info->error= ER_UNKNOWN_ERROR; + return 1; + } - We will send one event, the format_description, and then stop. 
- */ - if (info.until_gtid_state && info.until_gtid_state->count() == 0) - info.gtid_until_group= GTID_UNTIL_STOP_AFTER_STANDALONE; + if (wait_new_events(info, linfo, binlog_end_pos_filename, &end_pos)) + return 1; + } + } while (!should_stop(info)); - /* seek to the requested position, to start the requested dump */ - my_b_seek(&log, pos); // Seek will done on next read + return 0; +} + +/** + * This function sends events from one binlog file + * but only up until end_pos + * + * return 0 - OK + * else NOK + */ +static int send_events(binlog_send_info *info, + IO_CACHE* log, + LOG_INFO* linfo, + my_off_t end_pos) +{ + int error; + ulong ev_offset; - while (!info.net->error && info.net->vio != 0 && !thd->killed) + String *packet= info->packet; + linfo->pos= my_b_tell(log); + info->last_pos= my_b_tell(log); + + while (linfo->pos < end_pos) { - Log_event_type event_type= UNKNOWN_EVENT; - killed_state killed; + if (should_stop(info)) + return 0; /* reset the transmit packet for the event read from binary log file */ - if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg)) - goto err; + if (reset_transmit_packet(info, info->flags, &ev_offset, &info->errmsg)) + return 1; - bool is_active_binlog= false; - while (!(killed= thd->killed) && - !(error = Log_event::read_log_event(&log, packet, log_lock, - info.current_checksum_alg, - log_file_name, - &is_active_binlog))) + info->last_pos= linfo->pos; + error = Log_event::read_log_event(log, packet, /* LOCK_log */ NULL, + info->current_checksum_alg, + NULL, NULL); + linfo->pos= my_b_tell(log); + + if (error) { + goto read_err; + } + + Log_event_type event_type= + (Log_event_type)((uchar)(*packet)[LOG_EVENT_OFFSET+ev_offset]); + #ifndef DBUG_OFF - if (max_binlog_dump_events && !left_events--) + if (info->dbug_reconnect_counter > 0) + { + --info->dbug_reconnect_counter; + if (info->dbug_reconnect_counter == 0) { - net_flush(info.net); - errmsg = "Debugging binlog dump abort"; - my_errno= ER_UNKNOWN_ERROR; - goto err; + 
info->errmsg= "DBUG-injected forced reconnect"; + info->error= ER_UNKNOWN_ERROR; + return 1; } + } #endif - /* - log's filename does not change while it's active - */ - p_coord->pos= uint4korr(packet->ptr() + ev_offset + LOG_POS_OFFSET); - event_type= - (Log_event_type)((uchar)(*packet)[LOG_EVENT_OFFSET+ev_offset]); #ifdef ENABLED_DEBUG_SYNC - DBUG_EXECUTE_IF("dump_thread_wait_before_send_xid", + DBUG_EXECUTE_IF("dump_thread_wait_before_send_xid", + { + if (event_type == XID_EVENT) { - if (event_type == XID_EVENT) - { - net_flush(info.net); - const char act[]= + net_flush(info->net); + const char act[]= "now " "wait_for signal.continue"; - DBUG_ASSERT(debug_sync_service); - DBUG_ASSERT(!debug_sync_set_action(thd, - STRING_WITH_LEN(act))); - const char act2[]= + DBUG_ASSERT(debug_sync_service); + DBUG_ASSERT(!debug_sync_set_action( + info->thd, + STRING_WITH_LEN(act))); + + const char act2[]= "now " "signal signal.continued"; - DBUG_ASSERT(!debug_sync_set_action(current_thd, - STRING_WITH_LEN(act2))); - } - }); + DBUG_ASSERT(!debug_sync_set_action( + info->thd, + STRING_WITH_LEN(act2))); + } + }); #endif - if (event_type == FORMAT_DESCRIPTION_EVENT) - { - Format_description_log_event *tmp; - - info.current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset, - packet->length() - ev_offset); - DBUG_ASSERT(info.current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF || - info.current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF || - info.current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32); - if (!is_slave_checksum_aware(thd) && - info.current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF && - info.current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) - { - my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; - errmsg= "Slave can not handle replication events with the checksum " - "that master is configured to log"; - sql_print_warning("Master is configured to log replication events " - "with checksum, but will not send such events to " - "slaves that cannot process them"); - goto err; - } 
- - if (!(tmp= new Format_description_log_event(packet->ptr()+ev_offset, - packet->length()-ev_offset, - info.fdev))) - { - my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; - errmsg= "Corrupt Format_description event found or out-of-memory"; - goto err; - } - delete info.fdev; - info.fdev= tmp; - (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F; - } + if ((info->errmsg= send_event_to_slave(info, event_type, log, + ev_offset, &info->error_gtid))) + return 1; -#ifndef DBUG_OFF - if (dbug_reconnect_counter > 0) - { - --dbug_reconnect_counter; - if (dbug_reconnect_counter == 0) - { - errmsg= "DBUG-injected forced reconnect"; - my_errno= ER_UNKNOWN_ERROR; - goto err; - } - } -#endif + if (unlikely(info->send_fake_gtid_list) && + info->gtid_skip_group == GTID_SKIP_NOT) + { + Gtid_list_log_event glev(&info->until_binlog_state, 0); - if ((tmp_msg= send_event_to_slave(&info, event_type, &log, - ev_offset, &error_gtid))) + if (reset_transmit_packet(info, info->flags, &ev_offset, &info->errmsg) || + fake_gtid_list_event(info, &glev, &info->errmsg, my_b_tell(log))) { - errmsg= tmp_msg; - goto err; + info->error= ER_UNKNOWN_ERROR; + return 1; } - if (unlikely(info.send_fake_gtid_list) && - info.gtid_skip_group == GTID_SKIP_NOT) - { - Gtid_list_log_event glev(&info.until_binlog_state, 0); + info->send_fake_gtid_list= false; + } - if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg) || - fake_gtid_list_event(&info, &glev, &errmsg, my_b_tell(&log))) - { - my_errno= ER_UNKNOWN_ERROR; - goto err; - } - info.send_fake_gtid_list= false; - } - if (info.until_gtid_state && - is_until_reached(&info, &ev_offset, event_type, &errmsg, - my_b_tell(&log))) + if (info->until_gtid_state && + is_until_reached(info, &ev_offset, event_type, &info->errmsg, + my_b_tell(log))) + { + if (info->errmsg) { - if (errmsg) - { - my_errno= ER_UNKNOWN_ERROR; - goto err; - } - goto end; + info->error= ER_UNKNOWN_ERROR; + return 1; } - - DBUG_EXECUTE_IF("dump_thread_wait_before_send_xid", - 
{ - if (event_type == XID_EVENT) - { - net_flush(info.net); - } - }); - - /* Abort server before it sends the XID_EVENT */ - DBUG_EXECUTE_IF("crash_before_send_xid", - { - if (event_type == XID_EVENT) - { - my_sleep(2000000); - DBUG_SUICIDE(); - } - }); - - /* reset transmit packet for next loop */ - if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg)) - goto err; + info->should_stop= true; + return 0; } - if (killed) - goto end; - DBUG_EXECUTE_IF("wait_after_binlog_EOF", + /* Abort server before it sends the XID_EVENT */ + DBUG_EXECUTE_IF("crash_before_send_xid", { - const char act[]= "now wait_for signal.rotate_finished"; - DBUG_ASSERT(!debug_sync_set_action(current_thd, - STRING_WITH_LEN(act))); - };); + if (event_type == XID_EVENT) + { + my_sleep(2000000); + DBUG_SUICIDE(); + } + }); + } - /* - TODO: now that we are logging the offset, check to make sure - the recorded offset and the actual match. - Guilhem 2003-06: this is not true if this master is a slave - <4.0.15 running with --log-slave-updates, because then log_pos may - be the offset in the-master-of-this-master's binlog. - */ - if (test_for_non_eof_log_read_errors(error, &errmsg)) - goto err; + return 0; - /* - We should only move to the next binlog when the last read event - came from a already deactivated binlog. 
+read_err: + set_read_error(info, error); + + return 1; +} + +/** + * This function sends one binlog file to slave + * + * return 0 - OK + * 1 - NOK + */ +static int send_one_binlog_file(binlog_send_info *info, + IO_CACHE* log, + LOG_INFO* linfo, + my_off_t start_pos) +{ + assert_LOCK_log_owner(false); // we don't have LOCK_log + + /* seek to the requested position, to start the requested dump */ + if (start_pos != BIN_LOG_HEADER_SIZE) + { + my_b_seek(log, start_pos); + linfo->pos= start_pos; + } + + while (!should_stop(info)) + { + /** + * get end pos of current log file, this function + * will wait if there is nothing available */ - if (!(flags & BINLOG_DUMP_NON_BLOCK) && is_active_binlog) + my_off_t end_pos= get_binlog_end_pos(info, log, linfo); + if (end_pos <= 1) { - /* - Block until there is more data in the log - */ - if (net_flush(info.net)) - { - errmsg = "failed on net_flush()"; - my_errno= ER_UNKNOWN_ERROR; - goto err; - } + /** end of file or error */ + return end_pos; + } - /* - We may have missed the update broadcast from the log - that has just happened, let's try to catch it if it did. - If we did not miss anything, we just wait for other threads - to signal us. - */ - { - log.error=0; - bool read_packet = 0; + /** + * send events from current position up to end_pos + */ + if (send_events(info, log, linfo, end_pos)) + return 1; + } -#ifndef DBUG_OFF - if (max_binlog_dump_events && !left_events--) - { - errmsg = "Debugging binlog dump abort"; - my_errno= ER_UNKNOWN_ERROR; - goto err; - } -#endif + return 1; +} - /* reset the transmit packet for the event read from binary log - file */ - if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg)) - goto err; - - /* - No one will update the log while we are reading - now, but we'll be quick and just read one record - - TODO: - Add an counter that is incremented for each time we update the - binary log. We can avoid the following read if the counter - has not been updated since last read. 
- */ - - mysql_mutex_lock(log_lock); - switch (error= Log_event::read_log_event(&log, packet, (mysql_mutex_t*) 0, - info.current_checksum_alg)) { - case 0: - /* we read successfully, so we'll need to send it to the slave */ - mysql_mutex_unlock(log_lock); - read_packet = 1; - p_coord->pos= uint4korr(packet->ptr() + ev_offset + LOG_POS_OFFSET); - event_type= - (Log_event_type)((uchar)(*packet)[LOG_EVENT_OFFSET+ev_offset]); - break; - - case LOG_READ_EOF: - { - int ret; - ulong signal_cnt; - DBUG_PRINT("wait",("waiting for data in binary log")); - /* For mysqlbinlog (mysqlbinlog.server_id==0). */ - if (thd->variables.server_id==0) - { - mysql_mutex_unlock(log_lock); - goto end; - } +void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, + ushort flags) +{ + LOG_INFO linfo; -#ifndef DBUG_OFF - ulong hb_info_counter= 0; -#endif - PSI_stage_info old_stage; - signal_cnt= mysql_bin_log.signal_cnt; - do - { - if (heartbeat_period != 0) - { - DBUG_ASSERT(heartbeat_ts); - set_timespec_nsec(*heartbeat_ts, heartbeat_period); - } - thd->ENTER_COND(log_cond, log_lock, - &stage_master_has_sent_all_binlog_to_slave, - &old_stage); - if (thd->killed) - break; - ret= mysql_bin_log.wait_for_update_bin_log(thd, heartbeat_ts); - DBUG_ASSERT(ret == 0 || (heartbeat_period != 0)); - if (ret == ETIMEDOUT || ret == ETIME) - { -#ifndef DBUG_OFF - if (hb_info_counter < 3) - { - sql_print_information("master sends heartbeat message"); - hb_info_counter++; - if (hb_info_counter == 3) - sql_print_information("the rest of heartbeat info skipped ..."); - } -#endif - /* reset transmit packet for the heartbeat event */ - if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg)) - { - thd->EXIT_COND(&old_stage); - goto err; - } - if (send_heartbeat_event(info.net, packet, p_coord, - info.current_checksum_alg)) - { - errmsg = "Failed on my_net_write()"; - my_errno= ER_UNKNOWN_ERROR; - thd->EXIT_COND(&old_stage); - goto err; - } - } - else - { - DBUG_PRINT("wait",("binary log received update 
or a broadcast signal caught")); - } - } while (signal_cnt == mysql_bin_log.signal_cnt); - thd->EXIT_COND(&old_stage); - } - break; - - default: - mysql_mutex_unlock(log_lock); - test_for_non_eof_log_read_errors(error, &errmsg); - goto err; - } - - if (read_packet) - { - if ((tmp_msg= send_event_to_slave(&info, event_type, &log, - ev_offset, &error_gtid))) - { - errmsg= tmp_msg; - goto err; - } - if (unlikely(info.send_fake_gtid_list) - && info.gtid_skip_group == GTID_SKIP_NOT) - { - Gtid_list_log_event glev(&info.until_binlog_state, 0); + IO_CACHE log; + File file = -1; + String* const packet= &thd->packet; - if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg) || - fake_gtid_list_event(&info, &glev, &errmsg, my_b_tell(&log))) - { - my_errno= ER_UNKNOWN_ERROR; - goto err; - } - info.send_fake_gtid_list= false; - } - if (info.until_gtid_state && - is_until_reached(&info, &ev_offset, event_type, &errmsg, - my_b_tell(&log))) - { - if (errmsg) - { - my_errno= ER_UNKNOWN_ERROR; - goto err; - } - goto end; - } - } + binlog_send_info infoobj(thd, packet, flags, linfo.log_file_name); + binlog_send_info *info= &infoobj; - log.error=0; - } - } - else - { - bool loop_breaker = 0; - /* need this to break out of the for loop from switch */ + int old_max_allowed_packet= thd->variables.max_allowed_packet; + thd->variables.max_allowed_packet= MAX_MAX_ALLOWED_PACKET; - THD_STAGE_INFO(thd, stage_finished_reading_one_binlog_switching_to_next_binlog); - switch (mysql_bin_log.find_next_log(&linfo, 1)) { - case 0: - break; - case LOG_INFO_EOF: - if (mysql_bin_log.is_active(log_file_name)) - { - loop_breaker = (flags & BINLOG_DUMP_NON_BLOCK); - break; - } - default: - errmsg = "could not find next log"; - my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; - goto err; - } + DBUG_ENTER("mysql_binlog_send"); + DBUG_PRINT("enter",("log_ident: '%s' pos: %ld", log_ident, (long) pos)); - if (loop_breaker) - break; + bzero((char*) &log,sizeof(log)); - end_io_cache(&log); - 
mysql_file_close(file, MYF(MY_WME)); + if (init_binlog_sender(info, &linfo, log_ident, &pos)) + goto err; - /* reset transmit packet for the possible fake rotate event */ - if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg)) - goto err; - + /* + run hook first when all check has been made that slave seems to + be requesting a reasonable position. i.e when transmit actually starts + */ + if (RUN_HOOK(binlog_transmit, transmit_start, (thd, flags, log_ident, pos))) + { + info->errmsg= "Failed to run hook 'transmit_start'"; + info->error= ER_UNKNOWN_ERROR; + goto err; + } + + /* + heartbeat_period from @master_heartbeat_period user variable + NOTE: this is initialized after transmit_start-hook so that + the hook can affect value of heartbeat period + */ + info->heartbeat_period= get_heartbeat_period(thd); + + while (!should_stop(info)) + { + /* + Tell the client about the log name with a fake Rotate event; + this is needed even if we also send a Format_description_log_event + just after, because that event does not contain the binlog's name. + Note that as this Rotate event is sent before + Format_description_log_event, the slave cannot have any info to + understand this event's format, so the header len of + Rotate_log_event is FROZEN (so in 5.0 it will have a header shorter + than other events except FORMAT_DESCRIPTION_EVENT). + Before 4.0.14 we called fake_rotate_event below only if (pos == + BIN_LOG_HEADER_SIZE), because if this is false then the slave + already knows the binlog's name. + Since, we always call fake_rotate_event; if the slave already knew + the log's name (ex: CHANGE MASTER TO MASTER_LOG_FILE=...) this is + useless but does not harm much. It is nice for 3.23 (>=.58) slaves + which test Rotate events to see if the master is 4.0 (then they + choose to stop because they can't replicate 4.0); by always calling + fake_rotate_event we are sure that 3.23.58 and newer will detect the + problem as soon as replication starts (BUG#198). 
+ Always calling fake_rotate_event makes sending of normal + (=from-binlog) Rotate events a priori unneeded, but it is not so + simple: the 2 Rotate events are not equivalent, the normal one is + before the Stop event, the fake one is after. If we don't send the + normal one, then the Stop event will be interpreted (by existing 4.0 + slaves) as "the master stopped", which is wrong. So for safety, + given that we want minimum modification of 4.0, we send the normal + and fake Rotates. + */ + if (fake_rotate_event(info, pos, &info->errmsg, info->current_checksum_alg)) + { /* - Call fake_rotate_event() in case the previous log (the one which - we have just finished reading) did not contain a Rotate event - (for example (I don't know any other example) the previous log - was the last one before the master was shutdown & restarted). - This way we tell the slave about the new log's name and - position. If the binlog is 5.0, the next event we are going to - read and send is Format_description_log_event. + This error code is not perfect, as fake_rotate_event() does not + read anything from the binlog; if it fails it's because of an + error in my_net_write(), fortunately it will say so in errmsg. 
*/ - if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0 || - fake_rotate_event(&info, BIN_LOG_HEADER_SIZE, &errmsg, - info.current_checksum_alg)) - { - my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; - goto err; - } + info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; + goto err; + } - p_coord->file_name= log_file_name; // reset to the next + if ((file=open_binlog(&log, linfo.log_file_name, &info->errmsg)) < 0) + { + info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; + goto err; } - } -end: - end_io_cache(&log); - mysql_file_close(file, MYF(MY_WME)); + if (send_format_descriptor_event(info, &log, &linfo, pos)) + { + info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; + goto err; + } - RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags)); - my_eof(thd); + /* + We want to corrupt the first event that will be sent to the slave. + But we do not want the corruption to happen early, eg. when client does + BINLOG_GTID_POS(). So test case sets a DBUG trigger which causes us to + set the real DBUG injection here. + */ + DBUG_EXECUTE_IF("corrupt_read_log_event_to_slave_set", + { + DBUG_SET("-d,corrupt_read_log_event_to_slave_set"); + DBUG_SET("+d,corrupt_read_log_event2"); + }); + + /* + Handle the case of START SLAVE UNTIL with an UNTIL condition already + fulfilled at the start position. + + We will send one event, the format_description, and then stop. 
+ */ + if (info->until_gtid_state && info->until_gtid_state->count() == 0) + info->gtid_until_group= GTID_UNTIL_STOP_AFTER_STANDALONE; + + THD_STAGE_INFO(thd, stage_sending_binlog_event_to_slave); + if (send_one_binlog_file(info, &log, &linfo, pos)) + break; + + if (should_stop(info)) + break; + + DBUG_EXECUTE_IF("wait_after_binlog_EOF", + { + const char act[]= "now wait_for signal.rotate_finished"; + DBUG_ASSERT(!debug_sync_set_action(current_thd, + STRING_WITH_LEN(act))); + };); + + THD_STAGE_INFO(thd, + stage_finished_reading_one_binlog_switching_to_next_binlog); + if (mysql_bin_log.find_next_log(&linfo, 1)) + { + info->errmsg= "could not find next log"; + info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; + break; + } + + /** start from start of next file */ + pos= BIN_LOG_HEADER_SIZE; + + /** close current cache/file */ + end_io_cache(&log); + mysql_file_close(file, MYF(MY_WME)); + file= -1; + } + +err: THD_STAGE_INFO(thd, stage_waiting_to_finalize_termination); + RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags)); + + const bool binlog_open = my_b_inited(&log); + if (file >= 0) + { + end_io_cache(&log); + mysql_file_close(file, MYF(MY_WME)); + } + mysql_mutex_lock(&LOCK_thread_count); thd->current_linfo = 0; mysql_mutex_unlock(&LOCK_thread_count); thd->variables.max_allowed_packet= old_max_allowed_packet; - delete info.fdev; - DBUG_VOID_RETURN; + delete info->fdev; -err: - THD_STAGE_INFO(thd, stage_waiting_to_finalize_termination); - if (my_errno == ER_MASTER_FATAL_ERROR_READING_BINLOG && my_b_inited(&log)) + if (info->error == ER_MASTER_FATAL_ERROR_READING_BINLOG && binlog_open) { - /* - detailing the fatal error message with coordinates + /* + detailing the fatal error message with coordinates of the last position read. 
*/ - my_snprintf(error_text, sizeof(error_text), + my_snprintf(info->error_text, sizeof(info->error_text), "%s; the first event '%s' at %lld, " "the last event read from '%s' at %lld, " "the last byte read from '%s' at %lld.", - errmsg, - my_basename(p_start_coord->file_name), p_start_coord->pos, - my_basename(p_coord->file_name), p_coord->pos, - my_basename(log_file_name), my_b_tell(&log)); + info->errmsg, + my_basename(info->start_log_file_name), info->start_pos, + my_basename(info->log_file_name), info->last_pos, + my_basename(info->log_file_name), linfo.pos); } - else if (my_errno == ER_GTID_POSITION_NOT_FOUND_IN_BINLOG) + else if (info->error == ER_GTID_POSITION_NOT_FOUND_IN_BINLOG) { - my_snprintf(error_text, sizeof(error_text), + my_snprintf(info->error_text, sizeof(info->error_text), "Error: connecting slave requested to start from GTID " "%u-%u-%llu, which is not in the master's binlog", - error_gtid.domain_id, error_gtid.server_id, error_gtid.seq_no); + info->error_gtid.domain_id, + info->error_gtid.server_id, + info->error_gtid.seq_no); /* Use this error code so slave will know not to try reconnect. */ - my_errno = ER_MASTER_FATAL_ERROR_READING_BINLOG; + info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; } - else if (my_errno == ER_GTID_POSITION_NOT_FOUND_IN_BINLOG2) + else if (info->error == ER_GTID_POSITION_NOT_FOUND_IN_BINLOG2) { - my_snprintf(error_text, sizeof(error_text), + my_snprintf(info->error_text, sizeof(info->error_text), "Error: connecting slave requested to start from GTID " "%u-%u-%llu, which is not in the master's binlog. Since the " "master's binlog contains GTIDs with higher sequence numbers, " "it probably means that the slave has diverged due to " "executing extra errorneous transactions", - error_gtid.domain_id, error_gtid.server_id, error_gtid.seq_no); + info->error_gtid.domain_id, + info->error_gtid.server_id, + info->error_gtid.seq_no); /* Use this error code so slave will know not to try reconnect. 
*/ - my_errno = ER_MASTER_FATAL_ERROR_READING_BINLOG; + info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; } - else if (my_errno == ER_GTID_START_FROM_BINLOG_HOLE) + else if (info->error == ER_GTID_START_FROM_BINLOG_HOLE) { - my_snprintf(error_text, sizeof(error_text), + my_snprintf(info->error_text, sizeof(info->error_text), "The binlog on the master is missing the GTID %u-%u-%llu " "requested by the slave (even though both a prior and a " "subsequent sequence number does exist), and GTID strict mode " "is enabled", - error_gtid.domain_id, error_gtid.server_id, error_gtid.seq_no); + info->error_gtid.domain_id, + info->error_gtid.server_id, + info->error_gtid.seq_no); /* Use this error code so slave will know not to try reconnect. */ - my_errno = ER_MASTER_FATAL_ERROR_READING_BINLOG; + info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; } - else if (my_errno == ER_CANNOT_LOAD_SLAVE_GTID_STATE) + else if (info->error == ER_CANNOT_LOAD_SLAVE_GTID_STATE) { - my_snprintf(error_text, sizeof(error_text), + my_snprintf(info->error_text, sizeof(info->error_text), "Failed to load replication slave GTID state from table %s.%s", "mysql", rpl_gtid_slave_state_table_name.str); - my_errno = ER_MASTER_FATAL_ERROR_READING_BINLOG; + info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG; + } + else if (info->error != 0 && info->errmsg != NULL) + strcpy(info->error_text, info->errmsg); + + if (info->error == 0) + { + my_eof(thd); } else - strcpy(error_text, errmsg); - end_io_cache(&log); - RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags)); - /* - Exclude iteration through thread list - this is needed for purge_logs() - it will iterate through - thread list and update thd->current_linfo->index_file_offset - this mutex will make sure that it never tried to update our linfo - after we return from this stack frame - */ - mysql_mutex_lock(&LOCK_thread_count); - thd->current_linfo = 0; - mysql_mutex_unlock(&LOCK_thread_count); - if (file >= 0) - mysql_file_close(file, MYF(MY_WME)); - 
thd->variables.max_allowed_packet= old_max_allowed_packet; - delete info.fdev; + { + my_message(info->error, info->error_text, MYF(0)); + } - my_message(my_errno, error_text, MYF(0)); DBUG_VOID_RETURN; } |