summaryrefslogtreecommitdiff
path: root/sql/sql_repl.cc
diff options
context:
space:
mode:
authorJonas Oreland <jonaso@google.com>2014-12-19 12:36:23 +0100
committerKristian Nielsen <knielsen@knielsen-hq.org>2014-12-23 14:16:13 +0100
commit4d8b346e079a27960dbe49e4d0ec4364bed8d30e (patch)
tree249baee07701bc22748d1feefbebab88815cc60f /sql/sql_repl.cc
parentea01fff5053acc6526bc995c2a3c85474c844508 (diff)
downloadmariadb-git-4d8b346e079a27960dbe49e4d0ec4364bed8d30e.tar.gz
MDEV-7257: Dump Thread Enhancements
Make the binlog dump threads not need to take LOCK_log while sending binlog events to slave. Instead, a new LOCK_binlog_end_pos is used just to coordinate tracking the current end-of-log. This is a pre-requisite for MDEV-162, "Enhanced semisync replication". It should also help reduce the contention on LOCK_log on a busy master. Also does some much-needed refactoring/cleanup of the related code in the binlog dump thread.
Diffstat (limited to 'sql/sql_repl.cc')
-rw-r--r--sql/sql_repl.cc1567
1 files changed, 857 insertions, 710 deletions
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index edc24e45bdb..6bc8d7e6c45 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -125,8 +125,9 @@ struct binlog_send_info {
THD *thd;
NET *net;
String *packet;
- char *log_file_name;
+ char *const log_file_name; // ptr/alias to linfo.log_file_name
slave_connection_state *until_gtid_state;
+ slave_connection_state until_gtid_state_obj;
Format_description_log_event *fdev;
int mariadb_slave_capability;
enum_gtid_skip_type gtid_skip_group;
@@ -138,16 +139,57 @@ struct binlog_send_info {
bool slave_gtid_ignore_duplicates;
bool using_gtid_state;
- binlog_send_info(THD *thd_arg, String *packet_arg, ushort flags_arg, char *lfn)
+ int error;
+ const char *errmsg;
+ char error_text[MAX_SLAVE_ERRMSG];
+ rpl_gtid error_gtid;
+
+ ulonglong heartbeat_period;
+
+ /** start file/pos as requested by slave, for error message */
+ char start_log_file_name[FN_REFLEN];
+ my_off_t start_pos;
+
+ /** last pos for error message */
+ my_off_t last_pos;
+
+#ifndef DBUG_OFF
+ int left_events;
+ uint dbug_reconnect_counter;
+ ulong hb_info_counter;
+#endif
+
+ bool clear_initial_log_pos;
+ bool should_stop;
+
+ binlog_send_info(THD *thd_arg, String *packet_arg, ushort flags_arg,
+ char *lfn)
: thd(thd_arg), net(&thd_arg->net), packet(packet_arg),
log_file_name(lfn), until_gtid_state(NULL), fdev(NULL),
gtid_skip_group(GTID_SKIP_NOT), gtid_until_group(GTID_UNTIL_NOT_DONE),
flags(flags_arg), current_checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF),
slave_gtid_strict_mode(false), send_fake_gtid_list(false),
- slave_gtid_ignore_duplicates(false)
- { }
+ slave_gtid_ignore_duplicates(false),
+ error(0),
+ errmsg("Unknown error"),
+ heartbeat_period(0),
+#ifndef DBUG_OFF
+ left_events(max_binlog_dump_events),
+ dbug_reconnect_counter(0),
+ hb_info_counter(0),
+#endif
+ clear_initial_log_pos(false),
+ should_stop(false)
+ {
+ error_text[0] = 0;
+ bzero(&error_gtid, sizeof(error_gtid));
+ }
};
+// prototype
+static int reset_transmit_packet(struct binlog_send_info *info, ushort flags,
+ ulong *ev_offset, const char **errmsg);
+
/*
fake_rotate_event() builds a fake (=which does not exist physically in any
binlog) Rotate event, which contains the name of the binlog we are going to
@@ -170,6 +212,7 @@ static int fake_rotate_event(binlog_send_info *info, ulonglong position,
const char** errmsg, uint8 checksum_alg_arg)
{
DBUG_ENTER("fake_rotate_event");
+ ulong ev_offset;
char buf[ROTATE_HEADER_LEN+100];
my_bool do_checksum;
int err;
@@ -178,10 +221,18 @@ static int fake_rotate_event(binlog_send_info *info, ulonglong position,
String *packet= info->packet;
ha_checksum crc;
+ /* reset transmit packet for the fake rotate event below */
+ if (reset_transmit_packet(info, info->flags, &ev_offset, &info->errmsg))
+ DBUG_RETURN(1);
+
if ((err= fake_event_header(packet, ROTATE_EVENT,
- ident_len + ROTATE_HEADER_LEN, &do_checksum, &crc,
+ ident_len + ROTATE_HEADER_LEN, &do_checksum,
+ &crc,
errmsg, checksum_alg_arg, 0)))
+ {
+ info->error= ER_UNKNOWN_ERROR;
DBUG_RETURN(err);
+ }
int8store(buf+R_POS_OFFSET,position);
packet->append(buf, ROTATE_HEADER_LEN);
@@ -195,8 +246,10 @@ static int fake_rotate_event(binlog_send_info *info, ulonglong position,
if ((err= fake_event_footer(packet, do_checksum, crc, errmsg)) ||
(err= fake_event_write(info->net, packet, errmsg)))
+ {
+ info->error= ER_UNKNOWN_ERROR;
DBUG_RETURN(err);
-
+ }
DBUG_RETURN(0);
}
@@ -215,13 +268,17 @@ static int fake_gtid_list_event(binlog_send_info *info,
str.length(0);
if (glev->to_packet(&str))
{
+ info->error= ER_UNKNOWN_ERROR;
*errmsg= "Failed due to out-of-memory writing Gtid_list event";
return -1;
}
if ((err= fake_event_header(packet, GTID_LIST_EVENT,
str.length(), &do_checksum, &crc,
errmsg, info->current_checksum_alg, current_pos)))
+ {
+ info->error= ER_UNKNOWN_ERROR;
return err;
+ }
packet->append(str);
if (do_checksum)
@@ -231,7 +288,10 @@ static int fake_gtid_list_event(binlog_send_info *info,
if ((err= fake_event_footer(packet, do_checksum, crc, errmsg)) ||
(err= fake_event_write(info->net, packet, errmsg)))
+ {
+ info->error= ER_UNKNOWN_ERROR;
return err;
+ }
return 0;
}
@@ -243,20 +303,20 @@ static int fake_gtid_list_event(binlog_send_info *info,
This function allocates header bytes for event transmission, and
should be called before store the event data to the packet buffer.
*/
-static int reset_transmit_packet(THD *thd, ushort flags,
+static int reset_transmit_packet(binlog_send_info *info, ushort flags,
ulong *ev_offset, const char **errmsg)
{
int ret= 0;
- String *packet= &thd->packet;
+ String *packet= &info->thd->packet;
/* reserve and set default header */
packet->length(0);
packet->set("\0", 1, &my_charset_bin);
- if (RUN_HOOK(binlog_transmit, reserve_header, (thd, flags, packet)))
+ if (RUN_HOOK(binlog_transmit, reserve_header, (info->thd, flags, packet)))
{
+ info->error= ER_UNKNOWN_ERROR;
*errmsg= "Failed to run hook 'reserve_header'";
- my_errno= ER_UNKNOWN_ERROR;
ret= 1;
}
*ev_offset= packet->length();
@@ -556,36 +616,38 @@ bool purge_master_logs_before_date(THD* thd, time_t purge_time)
mysql_bin_log.purge_logs_before_date(purge_time));
}
-int test_for_non_eof_log_read_errors(int error, const char **errmsg)
+void set_read_error(binlog_send_info *info, int error)
{
if (error == LOG_READ_EOF)
- return 0;
- my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ {
+ return;
+ }
+ info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
switch (error) {
case LOG_READ_BOGUS:
- *errmsg = "bogus data in log event";
+ info->errmsg= "bogus data in log event";
break;
case LOG_READ_TOO_LARGE:
- *errmsg = "log event entry exceeded max_allowed_packet; \
-Increase max_allowed_packet on master";
+ info->errmsg= "log event entry exceeded max_allowed_packet; "
+ "Increase max_allowed_packet on master";
break;
case LOG_READ_IO:
- *errmsg = "I/O error reading log event";
+ info->errmsg= "I/O error reading log event";
break;
case LOG_READ_MEM:
- *errmsg = "memory allocation failed reading log event";
+ info->errmsg= "memory allocation failed reading log event";
break;
case LOG_READ_TRUNC:
- *errmsg = "binlog truncated in the middle of event; consider out of disk space on master";
+ info->errmsg= "binlog truncated in the middle of event; "
+ "consider out of disk space on master";
break;
case LOG_READ_CHECKSUM_FAILURE:
- *errmsg = "event read from binlog did not pass crc check";
+ info->errmsg= "event read from binlog did not pass crc check";
break;
default:
- *errmsg = "unknown error reading log event on the master";
+ info->errmsg= "unknown error reading log event on the master";
break;
}
- return error;
}
@@ -710,11 +772,17 @@ get_slave_until_gtid(THD *thd, String *out_str)
The error to send is serious and should force terminating
the dump thread.
*/
-static int send_heartbeat_event(NET* net, String* packet,
+static int send_heartbeat_event(binlog_send_info *info,
+ NET* net, String* packet,
const struct event_coordinates *coord,
uint8 checksum_alg_arg)
{
DBUG_ENTER("send_heartbeat_event");
+
+ ulong ev_offset;
+ if (reset_transmit_packet(info, info->flags, &ev_offset, &info->errmsg))
+ DBUG_RETURN(1);
+
char header[LOG_EVENT_HEADER_LEN];
my_bool do_checksum= checksum_alg_arg != BINLOG_CHECKSUM_ALG_OFF &&
checksum_alg_arg != BINLOG_CHECKSUM_ALG_UNDEF;
@@ -753,8 +821,10 @@ static int send_heartbeat_event(NET* net, String* packet,
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) ||
net_flush(net))
{
+ info->error= ER_UNKNOWN_ERROR;
DBUG_RETURN(-1);
}
+
DBUG_RETURN(0);
}
@@ -1552,7 +1622,7 @@ is_until_reached(binlog_send_info *info, ulong *ev_offset,
Send a last fake Gtid_list_log_event with a flag set to mark that we
stop due to UNTIL condition.
*/
- if (reset_transmit_packet(info->thd, info->flags, ev_offset, errmsg))
+ if (reset_transmit_packet(info, info->flags, ev_offset, errmsg))
return true;
Gtid_list_log_event glev(&info->until_binlog_state,
Gtid_list_log_event::FLAG_UNTIL_REACHED);
@@ -1593,14 +1663,14 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
current_checksum_alg,
&gtid_list, &list_len, info->fdev))
{
- my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
return "Failed to read Gtid_list_log_event: corrupt binlog";
}
err= info->until_binlog_state.load(gtid_list, list_len);
my_free(gtid_list);
if (err)
{
- my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
return "Failed in internal GTID book-keeping: Out of memory";
}
}
@@ -1622,7 +1692,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
&event_gtid.domain_id, &event_gtid.server_id,
&event_gtid.seq_no, &flags2, info->fdev))
{
- my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
return "Failed to read Gtid_log_event: corrupt binlog";
}
@@ -1634,14 +1704,14 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
{
DBUG_SET("-d,gtid_force_reconnect_at_10_1_100");
DBUG_SET_INITIAL("-d,gtid_force_reconnect_at_10_1_100");
- my_errno= ER_UNKNOWN_ERROR;
+ info->error= ER_UNKNOWN_ERROR;
return "DBUG-injected forced reconnect";
}
});
if (info->until_binlog_state.update_nolock(&event_gtid, false))
{
- my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
return "Failed in internal GTID book-keeping: Out of memory";
}
@@ -1663,7 +1733,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
const char *errormsg;
*error_gtid= *gtid;
give_error_start_pos_missing_in_binlog(&err, &errormsg, error_gtid);
- my_errno= err;
+ info->error= err;
return errormsg;
}
gtid_entry->flags&= ~(uint32)slave_connection_state::START_ON_EMPTY_DOMAIN;
@@ -1687,7 +1757,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
exist, even though both the prior and subsequent seq_no exists
for same domain_id and server_id.
*/
- my_errno= ER_GTID_START_FROM_BINLOG_HOLE;
+ info->error= ER_GTID_START_FROM_BINLOG_HOLE;
*error_gtid= *gtid;
return "The binlog on the master is missing the GTID requested "
"by the slave (even though both a prior and a subsequent "
@@ -1812,7 +1882,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
*/
if (Query_log_event::dummy_event(packet, ev_offset, current_checksum_alg))
{
- my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
return "Failed to replace row annotate event with dummy: too small event.";
}
}
@@ -1834,7 +1904,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
current_checksum_alg);
if (err)
{
- my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
return "Failed to replace GTID event with backwards-compatible event: "
"currupt event.";
}
@@ -1865,7 +1935,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
*/
if (Query_log_event::dummy_event(packet, ev_offset, current_checksum_alg))
{
- my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
return "Failed to replace binlog checkpoint or gtid list event with "
"dummy: too small event.";
}
@@ -1893,13 +1963,13 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
if (RUN_HOOK(binlog_transmit, before_send_event,
(info->thd, info->flags, packet, info->log_file_name, pos)))
{
- my_errno= ER_UNKNOWN_ERROR;
+ info->error= ER_UNKNOWN_ERROR;
return "run 'before_send_event' hook failed";
}
if (my_net_write(info->net, (uchar*) packet->ptr(), len))
{
- my_errno= ER_UNKNOWN_ERROR;
+ info->error= ER_UNKNOWN_ERROR;
return "Failed on my_net_write()";
}
@@ -1908,7 +1978,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
{
if (send_file(info->thd))
{
- my_errno= ER_UNKNOWN_ERROR;
+ info->error= ER_UNKNOWN_ERROR;
return "failed in send_file()";
}
}
@@ -1916,83 +1986,91 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
if (RUN_HOOK(binlog_transmit, after_send_event,
(info->thd, info->flags, packet)))
{
- my_errno= ER_UNKNOWN_ERROR;
+ info->error= ER_UNKNOWN_ERROR;
return "Failed to run hook 'after_send_event'";
}
return NULL; /* Success */
}
-
-void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
- ushort flags)
+static int check_start_offset(binlog_send_info *info,
+ const char *log_file_name,
+ my_off_t pos)
{
- LOG_INFO linfo;
- char *log_file_name = linfo.log_file_name;
- char search_file_name[FN_REFLEN], *name;
+ IO_CACHE log;
+ File file= -1;
- ulong ev_offset;
+ /** check that requested position is inside of file */
+ if ((file=open_binlog(&log, log_file_name, &info->errmsg)) < 0)
+ {
+ info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ return 1;
+ }
- IO_CACHE log;
- File file = -1;
- String* const packet= &thd->packet;
+ if (pos < BIN_LOG_HEADER_SIZE || pos > my_b_filelength(&log))
+ {
+ const char* msg= "Client requested master to start replication from "
+ "impossible position";
+
+ info->errmsg= NULL; // don't do further modifications of error_text
+ snprintf(info->error_text, sizeof(info->error_text),
+ "%s; the first event '%s' at %lld, "
+ "the last event read from '%s' at %d, "
+ "the last byte read from '%s' at %d.",
+ msg,
+ my_basename(info->start_log_file_name), pos,
+ my_basename(info->start_log_file_name), BIN_LOG_HEADER_SIZE,
+ my_basename(info->start_log_file_name), BIN_LOG_HEADER_SIZE);
+ info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ goto err;
+ }
+
+err:
+ end_io_cache(&log);
+ mysql_file_close(file, MYF(MY_WME));
+ return info->error;
+}
+
+static int init_binlog_sender(binlog_send_info *info,
+ LOG_INFO *linfo,
+ const char *log_ident,
+ my_off_t *pos)
+{
+ THD *thd= info->thd;
int error;
- const char *errmsg = "Unknown error", *tmp_msg;
- char error_text[MAX_SLAVE_ERRMSG]; // to be send to slave via my_message()
- mysql_mutex_t *log_lock;
- mysql_cond_t *log_cond;
char str_buf[128];
String connect_gtid_state(str_buf, sizeof(str_buf), system_charset_info);
char str_buf2[128];
String slave_until_gtid_str(str_buf2, sizeof(str_buf2), system_charset_info);
- slave_connection_state until_gtid_state_obj;
- rpl_gtid error_gtid;
- binlog_send_info info(thd, packet, flags, log_file_name);
+ connect_gtid_state.length(0);
- int old_max_allowed_packet= thd->variables.max_allowed_packet;
+ /** save start file/pos that was requested by slave */
+ strmake(info->start_log_file_name, log_ident,
+ sizeof(info->start_log_file_name));
+ info->start_pos= *pos;
-#ifndef DBUG_OFF
- int left_events = max_binlog_dump_events;
- uint dbug_reconnect_counter= 0;
-#endif
- DBUG_ENTER("mysql_binlog_send");
- DBUG_PRINT("enter",("log_ident: '%s' pos: %ld", log_ident, (long) pos));
+ /** init last pos */
+ info->last_pos= *pos;
- bzero((char*) &log,sizeof(log));
- bzero(&error_gtid, sizeof(error_gtid));
- /*
- heartbeat_period from @master_heartbeat_period user variable
- */
- ulonglong heartbeat_period= get_heartbeat_period(thd);
- struct timespec heartbeat_buf;
- struct timespec *heartbeat_ts= NULL;
- const LOG_POS_COORD start_coord= { log_ident, pos },
- *p_start_coord= &start_coord;
- LOG_POS_COORD coord_buf= { log_file_name, BIN_LOG_HEADER_SIZE },
- *p_coord= &coord_buf;
- if (heartbeat_period != 0)
- {
- heartbeat_ts= &heartbeat_buf;
- set_timespec_nsec(*heartbeat_ts, 0);
- }
- info.mariadb_slave_capability= get_mariadb_slave_capability(thd);
+ info->current_checksum_alg= get_binlog_checksum_value_at_connect(thd);
+ info->mariadb_slave_capability= get_mariadb_slave_capability(thd);
+ info->using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state);
+ DBUG_EXECUTE_IF("simulate_non_gtid_aware_master",
+ info->using_gtid_state= false;);
- connect_gtid_state.length(0);
- info.using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state);
- DBUG_EXECUTE_IF("simulate_non_gtid_aware_master", info.using_gtid_state= false;);
- if (info.using_gtid_state)
+ if (info->using_gtid_state)
{
- info.slave_gtid_strict_mode= get_slave_gtid_strict_mode(thd);
- info.slave_gtid_ignore_duplicates= get_slave_gtid_ignore_duplicates(thd);
- if(get_slave_until_gtid(thd, &slave_until_gtid_str))
- info.until_gtid_state= &until_gtid_state_obj;
+ info->slave_gtid_strict_mode= get_slave_gtid_strict_mode(thd);
+ info->slave_gtid_ignore_duplicates= get_slave_gtid_ignore_duplicates(thd);
+ if (get_slave_until_gtid(thd, &slave_until_gtid_str))
+ info->until_gtid_state= &info->until_gtid_state_obj;
}
DBUG_EXECUTE_IF("binlog_force_reconnect_after_22_events",
{
DBUG_SET("-d,binlog_force_reconnect_after_22_events");
DBUG_SET_INITIAL("-d,binlog_force_reconnect_after_22_events");
- dbug_reconnect_counter= 22;
+ info->dbug_reconnect_counter= 22;
});
/*
@@ -2008,774 +2086,843 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
});
if (global_system_variables.log_warnings > 1)
- sql_print_information("Start binlog_dump to slave_server(%lu), pos(%s, %lu)",
- thd->variables.server_id, log_ident, (ulong)pos);
- if (RUN_HOOK(binlog_transmit, transmit_start, (thd, flags, log_ident, pos)))
- {
- errmsg= "Failed to run hook 'transmit_start'";
- my_errno= ER_UNKNOWN_ERROR;
- goto err;
- }
+ sql_print_information(
+ "Start binlog_dump to slave_server(%lu), pos(%s, %lu)",
+ thd->variables.server_id, log_ident, (ulong)*pos);
#ifndef DBUG_OFF
if (opt_sporadic_binlog_dump_fail && (binlog_dump_count++ % 2))
{
- errmsg = "Master failed COM_BINLOG_DUMP to test if slave can recover";
- my_errno= ER_UNKNOWN_ERROR;
- goto err;
+ info->errmsg= "Master failed COM_BINLOG_DUMP to test if slave can recover";
+ info->error= ER_UNKNOWN_ERROR;
+ return 1;
}
#endif
- if (!(info.fdev= new Format_description_log_event(3)))
- {
- errmsg= "Out of memory initializing format_description event";
- my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
- goto err;
- }
-
if (!mysql_bin_log.is_open())
{
- errmsg = "Binary log is not open";
- my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
- goto err;
+ info->errmsg= "Binary log is not open";
+ info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ return 1;
}
if (!server_id_supplied)
{
- errmsg = "Misconfigured master - server id was not set";
- my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
- goto err;
+ info->errmsg= "Misconfigured master - server id was not set";
+ info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ return 1;
}
- name=search_file_name;
- if (info.using_gtid_state)
+ char search_file_name[FN_REFLEN];
+ const char *name=search_file_name;
+ if (info->using_gtid_state)
{
- if (info.gtid_state.load(connect_gtid_state.c_ptr_quick(),
+ if (info->gtid_state.load(connect_gtid_state.c_ptr_quick(),
connect_gtid_state.length()))
{
- errmsg= "Out of memory or malformed slave request when obtaining start "
- "position from GTID state";
- my_errno= ER_UNKNOWN_ERROR;
- goto err;
+ info->errmsg= "Out of memory or malformed slave request when obtaining "
+ "start position from GTID state";
+ info->error= ER_UNKNOWN_ERROR;
+ return 1;
}
- if (info.until_gtid_state &&
- info.until_gtid_state->load(slave_until_gtid_str.c_ptr_quick(),
+ if (info->until_gtid_state &&
+ info->until_gtid_state->load(slave_until_gtid_str.c_ptr_quick(),
slave_until_gtid_str.length()))
{
- errmsg= "Out of memory or malformed slave request when obtaining UNTIL "
- "position sent from slave";
- my_errno= ER_UNKNOWN_ERROR;
- goto err;
+ info->errmsg= "Out of memory or malformed slave request when "
+ "obtaining UNTIL position sent from slave";
+ info->error= ER_UNKNOWN_ERROR;
+ return 1;
}
- if ((error= check_slave_start_position(&info, &errmsg, &error_gtid)))
+ if ((error= check_slave_start_position(info, &info->errmsg,
+ &info->error_gtid)))
{
- my_errno= error;
- goto err;
+ info->error= error;
+ return 1;
}
- if ((errmsg= gtid_find_binlog_file(&info.gtid_state, search_file_name,
- info.until_gtid_state)))
+ if ((info->errmsg= gtid_find_binlog_file(&info->gtid_state,
+ search_file_name,
+ info->until_gtid_state)))
{
- my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
- goto err;
+ info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ return 1;
}
- pos= 4;
+
+ /* start from beginning of binlog file */
+ *pos = 4;
}
else
{
if (log_ident[0])
mysql_bin_log.make_log_name(search_file_name, log_ident);
else
- name=0; // Find first log
+ name=0; // Find first log
}
+ linfo->index_file_offset= 0;
- linfo.index_file_offset = 0;
-
- if (mysql_bin_log.find_log_pos(&linfo, name, 1))
+ if (mysql_bin_log.find_log_pos(linfo, name, 1))
{
- errmsg = "Could not find first log file name in binary log index file";
- my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
- goto err;
+ info->errmsg= "Could not find first log file name in binary "
+ "log index file";
+ info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ return 1;
}
+ // set current pos too
+ linfo->pos= *pos;
+
+ // note: publish that we use file, before we open it
mysql_mutex_lock(&LOCK_thread_count);
- thd->current_linfo = &linfo;
+ thd->current_linfo= linfo;
mysql_mutex_unlock(&LOCK_thread_count);
- if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0)
+ if (check_start_offset(info, linfo->log_file_name, *pos))
+ return 1;
+
+ if (*pos > BIN_LOG_HEADER_SIZE)
{
- my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
- goto err;
+ /*
+ mark that first format descriptor with "log_pos=0", so the slave
+ should not increment master's binlog position
+ (rli->group_master_log_pos)
+ */
+ info->clear_initial_log_pos= true;
}
- if (pos < BIN_LOG_HEADER_SIZE || pos > my_b_filelength(&log))
+
+ return 0;
+}
+
+/**
+ * send format descriptor event for one binlog file
+ */
+static int send_format_descriptor_event(binlog_send_info *info,
+ IO_CACHE *log,
+ LOG_INFO *linfo,
+ my_off_t start_pos)
+{
+ int error;
+ ulong ev_offset;
+ THD *thd= info->thd;
+ String *packet= info->packet;
+ Log_event_type event_type;
+
+ /**
+ * 1) reset fdev before each log-file
+ * 2) read first event, should be the format descriptor
+ * 3) read second event, *might* be start encryption event
+ * if it's isn't, seek back to undo this read
+ */
+ if (info->fdev != NULL)
+ delete info->fdev;
+
+ if (!(info->fdev= new Format_description_log_event(3)))
{
- errmsg= "Client requested master to start replication from \
-impossible position";
- my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
- goto err;
+ info->errmsg= "Out of memory initializing format_description event";
+ info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ return 1;
}
- /* reset transmit packet for the fake rotate event below */
- if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
- goto err;
-
- /*
- Tell the client about the log name with a fake Rotate event;
- this is needed even if we also send a Format_description_log_event
- just after, because that event does not contain the binlog's name.
- Note that as this Rotate event is sent before
- Format_description_log_event, the slave cannot have any info to
- understand this event's format, so the header len of
- Rotate_log_event is FROZEN (so in 5.0 it will have a header shorter
- than other events except FORMAT_DESCRIPTION_EVENT).
- Before 4.0.14 we called fake_rotate_event below only if (pos ==
- BIN_LOG_HEADER_SIZE), because if this is false then the slave
- already knows the binlog's name.
- Since, we always call fake_rotate_event; if the slave already knew
- the log's name (ex: CHANGE MASTER TO MASTER_LOG_FILE=...) this is
- useless but does not harm much. It is nice for 3.23 (>=.58) slaves
- which test Rotate events to see if the master is 4.0 (then they
- choose to stop because they can't replicate 4.0); by always calling
- fake_rotate_event we are sure that 3.23.58 and newer will detect the
- problem as soon as replication starts (BUG#198).
- Always calling fake_rotate_event makes sending of normal
- (=from-binlog) Rotate events a priori unneeded, but it is not so
- simple: the 2 Rotate events are not equivalent, the normal one is
- before the Stop event, the fake one is after. If we don't send the
- normal one, then the Stop event will be interpreted (by existing 4.0
- slaves) as "the master stopped", which is wrong. So for safety,
- given that we want minimum modification of 4.0, we send the normal
- and fake Rotates.
- */
- if (fake_rotate_event(&info, pos, &errmsg,
- get_binlog_checksum_value_at_connect(thd)))
+ do
{
+ /* reset transmit packet for the event read from binary log file */
+ if (reset_transmit_packet(info, info->flags, &ev_offset, &info->errmsg))
+ break;
+
/*
- This error code is not perfect, as fake_rotate_event() does not
- read anything from the binlog; if it fails it's because of an
- error in my_net_write(), fortunately it will say so in errmsg.
+ Try to find a Format_description_log_event at the beginning of
+ the binlog
*/
- my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
- goto err;
+ info->last_pos= my_b_tell(log);
+ error= Log_event::read_log_event(log, packet, /* LOCK_log */ NULL,
+ info->current_checksum_alg);
+ linfo->pos= my_b_tell(log);
+
+ if (error)
+ {
+ set_read_error(info, error);
+ break;
+ }
+
+ event_type= (Log_event_type)((uchar)(*packet)[LOG_EVENT_OFFSET+ev_offset]);
+
+ /*
+ The packet has offsets equal to the normal offsets in a
+ binlog event + ev_offset (the first ev_offset characters are
+ the header (default \0)).
+ */
+ DBUG_PRINT("info",
+ ("Looked for a Format_description_log_event, "
+ "found event type %d", (int)event_type));
+
+ if (event_type != FORMAT_DESCRIPTION_EVENT)
+ {
+ info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ info->errmsg= "Failed to find format descriptor event in start of binlog";
+ sql_print_warning("Failed to find format descriptor event in "
+ "start of binlog: %s",
+ info->log_file_name);
+ break;
+ }
+
+ info->current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset,
+ packet->length() - ev_offset);
+
+ DBUG_ASSERT(info->current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
+ info->current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
+ info->current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32);
+
+ if (!is_slave_checksum_aware(thd) &&
+ info->current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
+ info->current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
+ {
+ info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ info->errmsg= "Slave can not handle replication events with the "
+ "checksum that master is configured to log";
+ sql_print_warning("Master is configured to log replication events "
+ "with checksum, but will not send such events to "
+ "slaves that cannot process them");
+ break;
+ }
+
+ Format_description_log_event *tmp;
+ if (!(tmp= new Format_description_log_event(packet->ptr()+ev_offset,
+ packet->length()-ev_offset,
+ info->fdev)))
+ {
+ info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ info->errmsg= "Corrupt Format_description event found "
+ "or out-of-memory";
+ break;
+ }
+ delete info->fdev;
+ info->fdev= tmp;
+
+ (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
+
+ if (info->clear_initial_log_pos)
+ {
+ info->clear_initial_log_pos= false;
+ /*
+ mark that this event with "log_pos=0", so the slave
+ should not increment master's binlog position
+ (rli->group_master_log_pos)
+ */
+ int4store((char*) packet->ptr()+LOG_POS_OFFSET+ev_offset, (ulong) 0);
+
+ /*
+ if reconnect master sends FD event with `created' as 0
+ to avoid destroying temp tables.
+ */
+ int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
+ ST_CREATED_OFFSET+ev_offset, (ulong) 0);
+
+ /* fix the checksum due to latest changes in header */
+ if (info->current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
+ info->current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
+ fix_checksum(packet, ev_offset);
+ }
+
+ /* send it */
+ if (my_net_write(info->net, (uchar*) packet->ptr(), packet->length()))
+ {
+ info->errmsg= "Failed on my_net_write()";
+ info->error= ER_UNKNOWN_ERROR;
+ break;
+ }
+
+ /** all done */
+ return 0;
+
+ } while (false);
+
+ return 1;
+}
+
+static bool should_stop(binlog_send_info *info)
+{
+ return
+ info->net->error ||
+ info->net->vio == NULL ||
+ info->thd->killed ||
+ info->error != 0 ||
+ info->should_stop;
+}
+
+/**
+ * wait for new events to enter binlog
+ * this function will send heartbeats while waiting if so configured
+ */
+static int wait_new_events(binlog_send_info *info, /* in */
+ LOG_INFO* linfo, /* in */
+ char binlog_end_pos_filename[], /* out */
+ my_off_t *end_pos_ptr) /* out */
+{
+ int ret= 1;
+ PSI_stage_info old_stage;
+
+ mysql_bin_log.lock_binlog_end_pos();
+ info->thd->ENTER_COND(mysql_bin_log.get_log_cond(),
+ mysql_bin_log.get_binlog_end_pos_lock(),
+ &stage_master_has_sent_all_binlog_to_slave,
+ &old_stage);
+
+ while (!should_stop(info))
+ {
+ *end_pos_ptr= mysql_bin_log.get_binlog_end_pos(binlog_end_pos_filename);
+ if (strcmp(linfo->log_file_name, binlog_end_pos_filename) != 0)
+ {
+ /* there has been a log file switch, we don't need to wait */
+ ret= 0;
+ break;
+ }
+
+ if (linfo->pos < *end_pos_ptr)
+ {
+ /* there is data to read, we don't need to wait */
+ ret= 0;
+ break;
+ }
+
+ if (info->heartbeat_period)
+ {
+ struct timespec ts;
+ set_timespec_nsec(ts, info->heartbeat_period);
+ ret= mysql_bin_log.wait_for_update_binlog_end_pos(info->thd, &ts);
+ if (ret == ETIMEDOUT || ret == ETIME)
+ {
+ struct event_coordinates coord = { linfo->log_file_name, linfo->pos };
+#ifndef DBUG_OFF
+ const ulong hb_info_counter_limit = 3;
+ if (info->hb_info_counter < hb_info_counter_limit)
+ {
+ sql_print_information("master sends heartbeat message %s:%llu",
+ linfo->log_file_name, linfo->pos);
+ info->hb_info_counter++;
+ if (info->hb_info_counter == hb_info_counter_limit)
+ sql_print_information("the rest of heartbeat info skipped ...");
+ }
+#endif
+ mysql_bin_log.unlock_binlog_end_pos();
+ ret= send_heartbeat_event(info,
+ info->net, info->packet, &coord,
+ info->current_checksum_alg);
+ mysql_bin_log.lock_binlog_end_pos();
+
+ if (ret)
+ {
+ ret= 1; // error
+ break;
+ }
+ /**
+ * re-read heartbeat period after each sent
+ */
+ info->heartbeat_period= get_heartbeat_period(info->thd);
+ }
+ else if (ret != 0)
+ {
+ ret= 1; // error
+ break;
+ }
+ }
+ else
+ {
+ ret= mysql_bin_log.wait_for_update_binlog_end_pos(info->thd, NULL);
+ if (ret != 0 && ret != ETIMEDOUT && ret != ETIME)
+ {
+ ret= 1; // error
+ break;
+ }
+ }
}
- /*
- Adding MAX_LOG_EVENT_HEADER_LEN, since a binlog event can become
- this larger than the corresponding packet (query) sent
- from client to master.
- */
- thd->variables.max_allowed_packet= MAX_MAX_ALLOWED_PACKET;
+ /* it releases the lock set in ENTER_COND */
+ info->thd->EXIT_COND(&old_stage);
+ return ret;
+}
- /*
- We can set log_lock now, it does not move (it's a member of
- mysql_bin_log, and it's already inited, and it will be destroyed
- only at shutdown).
- */
- p_coord->pos= pos; // the first hb matches the slave's last seen value
- log_lock= mysql_bin_log.get_log_lock();
- log_cond= mysql_bin_log.get_log_cond();
- if (pos > BIN_LOG_HEADER_SIZE)
+/**
+ * get end pos of current log file, this function
+ * will wait if there is nothing available
+ */
+static my_off_t get_binlog_end_pos(binlog_send_info *info,
+ IO_CACHE* log,
+ LOG_INFO* linfo)
+{
+ my_off_t log_pos= my_b_tell(log);
+
+ /**
+ * get current binlog end pos
+ */
+ mysql_bin_log.lock_binlog_end_pos();
+ char binlog_end_pos_filename[FN_REFLEN];
+ my_off_t end_pos= mysql_bin_log.get_binlog_end_pos(binlog_end_pos_filename);
+ mysql_bin_log.unlock_binlog_end_pos();
+
+ do
{
- /* reset transmit packet for the event read from binary log
- file */
- if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
- goto err;
+ if (strcmp(binlog_end_pos_filename, linfo->log_file_name) != 0)
+ {
+ /**
+ * this file is not active, since it's not written to again,
+ * it safe to check file length and use that as end_pos
+ */
+ end_pos= my_b_filelength(log);
- /*
- Try to find a Format_description_log_event at the beginning of
- the binlog
- */
- if (!(error = Log_event::read_log_event(&log, packet, log_lock, 0)))
- {
- /*
- The packet has offsets equal to the normal offsets in a
- binlog event + ev_offset (the first ev_offset characters are
- the header (default \0)).
+ if (log_pos == end_pos)
+ return 0; // already at end of file inactive file
+ else
+ return end_pos; // return size of inactive file
+ }
+ else
+ {
+ /**
+ * this is the active file
*/
- DBUG_PRINT("info",
- ("Looked for a Format_description_log_event, found event type %d",
- (*packet)[EVENT_TYPE_OFFSET+ev_offset]));
- if ((*packet)[EVENT_TYPE_OFFSET+ev_offset] == FORMAT_DESCRIPTION_EVENT)
- {
- Format_description_log_event *tmp;
-
- info.current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset,
- packet->length() - ev_offset);
- DBUG_ASSERT(info.current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
- info.current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
- info.current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32);
- if (!is_slave_checksum_aware(thd) &&
- info.current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
- info.current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
- {
- my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
- errmsg= "Slave can not handle replication events with the checksum "
- "that master is configured to log";
- sql_print_warning("Master is configured to log replication events "
- "with checksum, but will not send such events to "
- "slaves that cannot process them");
- goto err;
- }
-
- if (!(tmp= new Format_description_log_event(packet->ptr()+ev_offset,
- packet->length()-ev_offset,
- info.fdev)))
- {
- my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
- errmsg= "Corrupt Format_description event found or out-of-memory";
- goto err;
- }
- delete info.fdev;
- info.fdev= tmp;
-
- (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
- /*
- mark that this event with "log_pos=0", so the slave
- should not increment master's binlog position
- (rli->group_master_log_pos)
- */
- int4store((char*) packet->ptr()+LOG_POS_OFFSET+ev_offset, 0);
- /*
- if reconnect master sends FD event with `created' as 0
- to avoid destroying temp tables.
- */
- int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
- ST_CREATED_OFFSET+ev_offset, (ulong) 0);
-
- /* fix the checksum due to latest changes in header */
- if (info.current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
- info.current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
- fix_checksum(packet, ev_offset);
-
- /* send it */
- if (my_net_write(info.net, (uchar*) packet->ptr(), packet->length()))
- {
- errmsg = "Failed on my_net_write()";
- my_errno= ER_UNKNOWN_ERROR;
- goto err;
- }
-
- /*
- No need to save this event. We are only doing simple reads
- (no real parsing of the events) so we don't need it. And so
- we don't need the artificial Format_description_log_event of
- 3.23&4.x.
+
+ if (log_pos < end_pos)
+ {
+ /**
+ * there is data available to read
*/
- }
- }
- else
- {
- if (test_for_non_eof_log_read_errors(error, &errmsg))
- goto err;
- /*
- It's EOF, nothing to do, go on reading next events, the
- Format_description_log_event will be found naturally if it is written.
+ return end_pos;
+ }
+
+ /**
+ * check if we should wait for more data
*/
- }
- } /* end of if (pos > BIN_LOG_HEADER_SIZE); */
- else
- {
- /* The Format_description_log_event event will be found naturally. */
- }
+ if ((info->flags & BINLOG_DUMP_NON_BLOCK) ||
+ (info->thd->variables.server_id == 0))
+ {
+ info->should_stop= true;
+ return 0;
+ }
- /*
- Handle the case of START SLAVE UNTIL with an UNTIL condition already
- fulfilled at the start position.
+ /**
+ * flush data before waiting
+ */
+ if (net_flush(info->net))
+ {
+ info->errmsg= "failed on net_flush()";
+ info->error= ER_UNKNOWN_ERROR;
+ return 1;
+ }
- We will send one event, the format_description, and then stop.
- */
- if (info.until_gtid_state && info.until_gtid_state->count() == 0)
- info.gtid_until_group= GTID_UNTIL_STOP_AFTER_STANDALONE;
+ if (wait_new_events(info, linfo, binlog_end_pos_filename, &end_pos))
+ return 1;
+ }
+ } while (!should_stop(info));
- /* seek to the requested position, to start the requested dump */
- my_b_seek(&log, pos); // Seek will done on next read
+ return 0;
+}
+
+/**
+ * This function sends events from one binlog file
+ * but only up until end_pos
+ *
+ * return 0 - OK
+ * else NOK
+ */
+static int send_events(binlog_send_info *info,
+ IO_CACHE* log,
+ LOG_INFO* linfo,
+ my_off_t end_pos)
+{
+ int error;
+ ulong ev_offset;
- while (!info.net->error && info.net->vio != 0 && !thd->killed)
+ String *packet= info->packet;
+ linfo->pos= my_b_tell(log);
+ info->last_pos= my_b_tell(log);
+
+ while (linfo->pos < end_pos)
{
- Log_event_type event_type= UNKNOWN_EVENT;
- killed_state killed;
+ if (should_stop(info))
+ return 0;
/* reset the transmit packet for the event read from binary log
file */
- if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
- goto err;
+ if (reset_transmit_packet(info, info->flags, &ev_offset, &info->errmsg))
+ return 1;
- bool is_active_binlog= false;
- while (!(killed= thd->killed) &&
- !(error = Log_event::read_log_event(&log, packet, log_lock,
- info.current_checksum_alg,
- log_file_name,
- &is_active_binlog)))
+ info->last_pos= linfo->pos;
+ error = Log_event::read_log_event(log, packet, /* LOCK_log */ NULL,
+ info->current_checksum_alg,
+ NULL, NULL);
+ linfo->pos= my_b_tell(log);
+
+ if (error)
{
+ goto read_err;
+ }
+
+ Log_event_type event_type=
+ (Log_event_type)((uchar)(*packet)[LOG_EVENT_OFFSET+ev_offset]);
+
#ifndef DBUG_OFF
- if (max_binlog_dump_events && !left_events--)
+ if (info->dbug_reconnect_counter > 0)
+ {
+ --info->dbug_reconnect_counter;
+ if (info->dbug_reconnect_counter == 0)
{
- net_flush(info.net);
- errmsg = "Debugging binlog dump abort";
- my_errno= ER_UNKNOWN_ERROR;
- goto err;
+ info->errmsg= "DBUG-injected forced reconnect";
+ info->error= ER_UNKNOWN_ERROR;
+ return 1;
}
+ }
#endif
- /*
- log's filename does not change while it's active
- */
- p_coord->pos= uint4korr(packet->ptr() + ev_offset + LOG_POS_OFFSET);
- event_type=
- (Log_event_type)((uchar)(*packet)[LOG_EVENT_OFFSET+ev_offset]);
#ifdef ENABLED_DEBUG_SYNC
- DBUG_EXECUTE_IF("dump_thread_wait_before_send_xid",
+ DBUG_EXECUTE_IF("dump_thread_wait_before_send_xid",
+ {
+ if (event_type == XID_EVENT)
{
- if (event_type == XID_EVENT)
- {
- net_flush(info.net);
- const char act[]=
+ net_flush(info->net);
+ const char act[]=
"now "
"wait_for signal.continue";
- DBUG_ASSERT(debug_sync_service);
- DBUG_ASSERT(!debug_sync_set_action(thd,
- STRING_WITH_LEN(act)));
- const char act2[]=
+ DBUG_ASSERT(debug_sync_service);
+ DBUG_ASSERT(!debug_sync_set_action(
+ info->thd,
+ STRING_WITH_LEN(act)));
+
+ const char act2[]=
"now "
"signal signal.continued";
- DBUG_ASSERT(!debug_sync_set_action(current_thd,
- STRING_WITH_LEN(act2)));
- }
- });
+ DBUG_ASSERT(!debug_sync_set_action(
+ info->thd,
+ STRING_WITH_LEN(act2)));
+ }
+ });
#endif
- if (event_type == FORMAT_DESCRIPTION_EVENT)
- {
- Format_description_log_event *tmp;
-
- info.current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset,
- packet->length() - ev_offset);
- DBUG_ASSERT(info.current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
- info.current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
- info.current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32);
- if (!is_slave_checksum_aware(thd) &&
- info.current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
- info.current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
- {
- my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
- errmsg= "Slave can not handle replication events with the checksum "
- "that master is configured to log";
- sql_print_warning("Master is configured to log replication events "
- "with checksum, but will not send such events to "
- "slaves that cannot process them");
- goto err;
- }
-
- if (!(tmp= new Format_description_log_event(packet->ptr()+ev_offset,
- packet->length()-ev_offset,
- info.fdev)))
- {
- my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
- errmsg= "Corrupt Format_description event found or out-of-memory";
- goto err;
- }
- delete info.fdev;
- info.fdev= tmp;
- (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
- }
+ if ((info->errmsg= send_event_to_slave(info, event_type, log,
+ ev_offset, &info->error_gtid)))
+ return 1;
-#ifndef DBUG_OFF
- if (dbug_reconnect_counter > 0)
- {
- --dbug_reconnect_counter;
- if (dbug_reconnect_counter == 0)
- {
- errmsg= "DBUG-injected forced reconnect";
- my_errno= ER_UNKNOWN_ERROR;
- goto err;
- }
- }
-#endif
+ if (unlikely(info->send_fake_gtid_list) &&
+ info->gtid_skip_group == GTID_SKIP_NOT)
+ {
+ Gtid_list_log_event glev(&info->until_binlog_state, 0);
- if ((tmp_msg= send_event_to_slave(&info, event_type, &log,
- ev_offset, &error_gtid)))
+ if (reset_transmit_packet(info, info->flags, &ev_offset, &info->errmsg) ||
+ fake_gtid_list_event(info, &glev, &info->errmsg, my_b_tell(log)))
{
- errmsg= tmp_msg;
- goto err;
+ info->error= ER_UNKNOWN_ERROR;
+ return 1;
}
- if (unlikely(info.send_fake_gtid_list) &&
- info.gtid_skip_group == GTID_SKIP_NOT)
- {
- Gtid_list_log_event glev(&info.until_binlog_state, 0);
+ info->send_fake_gtid_list= false;
+ }
- if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg) ||
- fake_gtid_list_event(&info, &glev, &errmsg, my_b_tell(&log)))
- {
- my_errno= ER_UNKNOWN_ERROR;
- goto err;
- }
- info.send_fake_gtid_list= false;
- }
- if (info.until_gtid_state &&
- is_until_reached(&info, &ev_offset, event_type, &errmsg,
- my_b_tell(&log)))
+ if (info->until_gtid_state &&
+ is_until_reached(info, &ev_offset, event_type, &info->errmsg,
+ my_b_tell(log)))
+ {
+ if (info->errmsg)
{
- if (errmsg)
- {
- my_errno= ER_UNKNOWN_ERROR;
- goto err;
- }
- goto end;
+ info->error= ER_UNKNOWN_ERROR;
+ return 1;
}
-
- DBUG_EXECUTE_IF("dump_thread_wait_before_send_xid",
- {
- if (event_type == XID_EVENT)
- {
- net_flush(info.net);
- }
- });
-
- /* Abort server before it sends the XID_EVENT */
- DBUG_EXECUTE_IF("crash_before_send_xid",
- {
- if (event_type == XID_EVENT)
- {
- my_sleep(2000000);
- DBUG_SUICIDE();
- }
- });
-
- /* reset transmit packet for next loop */
- if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
- goto err;
+ info->should_stop= true;
+ return 0;
}
- if (killed)
- goto end;
- DBUG_EXECUTE_IF("wait_after_binlog_EOF",
+ /* Abort server before it sends the XID_EVENT */
+ DBUG_EXECUTE_IF("crash_before_send_xid",
{
- const char act[]= "now wait_for signal.rotate_finished";
- DBUG_ASSERT(!debug_sync_set_action(current_thd,
- STRING_WITH_LEN(act)));
- };);
+ if (event_type == XID_EVENT)
+ {
+ my_sleep(2000000);
+ DBUG_SUICIDE();
+ }
+ });
+ }
- /*
- TODO: now that we are logging the offset, check to make sure
- the recorded offset and the actual match.
- Guilhem 2003-06: this is not true if this master is a slave
- <4.0.15 running with --log-slave-updates, because then log_pos may
- be the offset in the-master-of-this-master's binlog.
- */
- if (test_for_non_eof_log_read_errors(error, &errmsg))
- goto err;
+ return 0;
- /*
- We should only move to the next binlog when the last read event
- came from a already deactivated binlog.
+read_err:
+ set_read_error(info, error);
+
+ return 1;
+}
+
+/**
+ * This function sends one binlog file to slave
+ *
+ * return 0 - OK
+ * 1 - NOK
+ */
+static int send_one_binlog_file(binlog_send_info *info,
+ IO_CACHE* log,
+ LOG_INFO* linfo,
+ my_off_t start_pos)
+{
+ assert_LOCK_log_owner(false); // we don't have LOCK_log
+
+ /* seek to the requested position, to start the requested dump */
+ if (start_pos != BIN_LOG_HEADER_SIZE)
+ {
+ my_b_seek(log, start_pos);
+ linfo->pos= start_pos;
+ }
+
+ while (!should_stop(info))
+ {
+ /**
+ * get end pos of current log file, this function
+ * will wait if there is nothing available
*/
- if (!(flags & BINLOG_DUMP_NON_BLOCK) && is_active_binlog)
+ my_off_t end_pos= get_binlog_end_pos(info, log, linfo);
+ if (end_pos <= 1)
{
- /*
- Block until there is more data in the log
- */
- if (net_flush(info.net))
- {
- errmsg = "failed on net_flush()";
- my_errno= ER_UNKNOWN_ERROR;
- goto err;
- }
+ /** end of file or error */
+ return end_pos;
+ }
- /*
- We may have missed the update broadcast from the log
- that has just happened, let's try to catch it if it did.
- If we did not miss anything, we just wait for other threads
- to signal us.
- */
- {
- log.error=0;
- bool read_packet = 0;
+ /**
+ * send events from current position up to end_pos
+ */
+ if (send_events(info, log, linfo, end_pos))
+ return 1;
+ }
-#ifndef DBUG_OFF
- if (max_binlog_dump_events && !left_events--)
- {
- errmsg = "Debugging binlog dump abort";
- my_errno= ER_UNKNOWN_ERROR;
- goto err;
- }
-#endif
+ return 1;
+}
- /* reset the transmit packet for the event read from binary log
- file */
- if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
- goto err;
-
- /*
- No one will update the log while we are reading
- now, but we'll be quick and just read one record
-
- TODO:
- Add an counter that is incremented for each time we update the
- binary log. We can avoid the following read if the counter
- has not been updated since last read.
- */
-
- mysql_mutex_lock(log_lock);
- switch (error= Log_event::read_log_event(&log, packet, (mysql_mutex_t*) 0,
- info.current_checksum_alg)) {
- case 0:
- /* we read successfully, so we'll need to send it to the slave */
- mysql_mutex_unlock(log_lock);
- read_packet = 1;
- p_coord->pos= uint4korr(packet->ptr() + ev_offset + LOG_POS_OFFSET);
- event_type=
- (Log_event_type)((uchar)(*packet)[LOG_EVENT_OFFSET+ev_offset]);
- break;
-
- case LOG_READ_EOF:
- {
- int ret;
- ulong signal_cnt;
- DBUG_PRINT("wait",("waiting for data in binary log"));
- /* For mysqlbinlog (mysqlbinlog.server_id==0). */
- if (thd->variables.server_id==0)
- {
- mysql_mutex_unlock(log_lock);
- goto end;
- }
+void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
+ ushort flags)
+{
+ LOG_INFO linfo;
-#ifndef DBUG_OFF
- ulong hb_info_counter= 0;
-#endif
- PSI_stage_info old_stage;
- signal_cnt= mysql_bin_log.signal_cnt;
- do
- {
- if (heartbeat_period != 0)
- {
- DBUG_ASSERT(heartbeat_ts);
- set_timespec_nsec(*heartbeat_ts, heartbeat_period);
- }
- thd->ENTER_COND(log_cond, log_lock,
- &stage_master_has_sent_all_binlog_to_slave,
- &old_stage);
- if (thd->killed)
- break;
- ret= mysql_bin_log.wait_for_update_bin_log(thd, heartbeat_ts);
- DBUG_ASSERT(ret == 0 || (heartbeat_period != 0));
- if (ret == ETIMEDOUT || ret == ETIME)
- {
-#ifndef DBUG_OFF
- if (hb_info_counter < 3)
- {
- sql_print_information("master sends heartbeat message");
- hb_info_counter++;
- if (hb_info_counter == 3)
- sql_print_information("the rest of heartbeat info skipped ...");
- }
-#endif
- /* reset transmit packet for the heartbeat event */
- if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
- {
- thd->EXIT_COND(&old_stage);
- goto err;
- }
- if (send_heartbeat_event(info.net, packet, p_coord,
- info.current_checksum_alg))
- {
- errmsg = "Failed on my_net_write()";
- my_errno= ER_UNKNOWN_ERROR;
- thd->EXIT_COND(&old_stage);
- goto err;
- }
- }
- else
- {
- DBUG_PRINT("wait",("binary log received update or a broadcast signal caught"));
- }
- } while (signal_cnt == mysql_bin_log.signal_cnt);
- thd->EXIT_COND(&old_stage);
- }
- break;
-
- default:
- mysql_mutex_unlock(log_lock);
- test_for_non_eof_log_read_errors(error, &errmsg);
- goto err;
- }
-
- if (read_packet)
- {
- if ((tmp_msg= send_event_to_slave(&info, event_type, &log,
- ev_offset, &error_gtid)))
- {
- errmsg= tmp_msg;
- goto err;
- }
- if (unlikely(info.send_fake_gtid_list)
- && info.gtid_skip_group == GTID_SKIP_NOT)
- {
- Gtid_list_log_event glev(&info.until_binlog_state, 0);
+ IO_CACHE log;
+ File file = -1;
+ String* const packet= &thd->packet;
- if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg) ||
- fake_gtid_list_event(&info, &glev, &errmsg, my_b_tell(&log)))
- {
- my_errno= ER_UNKNOWN_ERROR;
- goto err;
- }
- info.send_fake_gtid_list= false;
- }
- if (info.until_gtid_state &&
- is_until_reached(&info, &ev_offset, event_type, &errmsg,
- my_b_tell(&log)))
- {
- if (errmsg)
- {
- my_errno= ER_UNKNOWN_ERROR;
- goto err;
- }
- goto end;
- }
- }
+ binlog_send_info infoobj(thd, packet, flags, linfo.log_file_name);
+ binlog_send_info *info= &infoobj;
- log.error=0;
- }
- }
- else
- {
- bool loop_breaker = 0;
- /* need this to break out of the for loop from switch */
+ int old_max_allowed_packet= thd->variables.max_allowed_packet;
+ thd->variables.max_allowed_packet= MAX_MAX_ALLOWED_PACKET;
- THD_STAGE_INFO(thd, stage_finished_reading_one_binlog_switching_to_next_binlog);
- switch (mysql_bin_log.find_next_log(&linfo, 1)) {
- case 0:
- break;
- case LOG_INFO_EOF:
- if (mysql_bin_log.is_active(log_file_name))
- {
- loop_breaker = (flags & BINLOG_DUMP_NON_BLOCK);
- break;
- }
- default:
- errmsg = "could not find next log";
- my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
- goto err;
- }
+ DBUG_ENTER("mysql_binlog_send");
+ DBUG_PRINT("enter",("log_ident: '%s' pos: %ld", log_ident, (long) pos));
- if (loop_breaker)
- break;
+ bzero((char*) &log,sizeof(log));
- end_io_cache(&log);
- mysql_file_close(file, MYF(MY_WME));
+ if (init_binlog_sender(info, &linfo, log_ident, &pos))
+ goto err;
- /* reset transmit packet for the possible fake rotate event */
- if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
- goto err;
-
+ /*
+ run hook first when all check has been made that slave seems to
+ be requesting a reasonable position. i.e when transmit actually starts
+ */
+ if (RUN_HOOK(binlog_transmit, transmit_start, (thd, flags, log_ident, pos)))
+ {
+ info->errmsg= "Failed to run hook 'transmit_start'";
+ info->error= ER_UNKNOWN_ERROR;
+ goto err;
+ }
+
+ /*
+ heartbeat_period from @master_heartbeat_period user variable
+ NOTE: this is initialized after transmit_start-hook so that
+ the hook can affect value of heartbeat period
+ */
+ info->heartbeat_period= get_heartbeat_period(thd);
+
+ while (!should_stop(info))
+ {
+ /*
+ Tell the client about the log name with a fake Rotate event;
+ this is needed even if we also send a Format_description_log_event
+ just after, because that event does not contain the binlog's name.
+ Note that as this Rotate event is sent before
+ Format_description_log_event, the slave cannot have any info to
+ understand this event's format, so the header len of
+ Rotate_log_event is FROZEN (so in 5.0 it will have a header shorter
+ than other events except FORMAT_DESCRIPTION_EVENT).
+ Before 4.0.14 we called fake_rotate_event below only if (pos ==
+ BIN_LOG_HEADER_SIZE), because if this is false then the slave
+ already knows the binlog's name.
+ Since, we always call fake_rotate_event; if the slave already knew
+ the log's name (ex: CHANGE MASTER TO MASTER_LOG_FILE=...) this is
+ useless but does not harm much. It is nice for 3.23 (>=.58) slaves
+ which test Rotate events to see if the master is 4.0 (then they
+ choose to stop because they can't replicate 4.0); by always calling
+ fake_rotate_event we are sure that 3.23.58 and newer will detect the
+ problem as soon as replication starts (BUG#198).
+ Always calling fake_rotate_event makes sending of normal
+ (=from-binlog) Rotate events a priori unneeded, but it is not so
+ simple: the 2 Rotate events are not equivalent, the normal one is
+ before the Stop event, the fake one is after. If we don't send the
+ normal one, then the Stop event will be interpreted (by existing 4.0
+ slaves) as "the master stopped", which is wrong. So for safety,
+ given that we want minimum modification of 4.0, we send the normal
+ and fake Rotates.
+ */
+ if (fake_rotate_event(info, pos, &info->errmsg, info->current_checksum_alg))
+ {
/*
- Call fake_rotate_event() in case the previous log (the one which
- we have just finished reading) did not contain a Rotate event
- (for example (I don't know any other example) the previous log
- was the last one before the master was shutdown & restarted).
- This way we tell the slave about the new log's name and
- position. If the binlog is 5.0, the next event we are going to
- read and send is Format_description_log_event.
+ This error code is not perfect, as fake_rotate_event() does not
+ read anything from the binlog; if it fails it's because of an
+ error in my_net_write(), fortunately it will say so in errmsg.
*/
- if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0 ||
- fake_rotate_event(&info, BIN_LOG_HEADER_SIZE, &errmsg,
- info.current_checksum_alg))
- {
- my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
- goto err;
- }
+ info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ goto err;
+ }
- p_coord->file_name= log_file_name; // reset to the next
+ if ((file=open_binlog(&log, linfo.log_file_name, &info->errmsg)) < 0)
+ {
+ info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ goto err;
}
- }
-end:
- end_io_cache(&log);
- mysql_file_close(file, MYF(MY_WME));
+ if (send_format_descriptor_event(info, &log, &linfo, pos))
+ {
+ info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ goto err;
+ }
- RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags));
- my_eof(thd);
+ /*
+ We want to corrupt the first event that will be sent to the slave.
+ But we do not want the corruption to happen early, eg. when client does
+ BINLOG_GTID_POS(). So test case sets a DBUG trigger which causes us to
+ set the real DBUG injection here.
+ */
+ DBUG_EXECUTE_IF("corrupt_read_log_event_to_slave_set",
+ {
+ DBUG_SET("-d,corrupt_read_log_event_to_slave_set");
+ DBUG_SET("+d,corrupt_read_log_event2");
+ });
+
+ /*
+ Handle the case of START SLAVE UNTIL with an UNTIL condition already
+ fulfilled at the start position.
+
+ We will send one event, the format_description, and then stop.
+ */
+ if (info->until_gtid_state && info->until_gtid_state->count() == 0)
+ info->gtid_until_group= GTID_UNTIL_STOP_AFTER_STANDALONE;
+
+ THD_STAGE_INFO(thd, stage_sending_binlog_event_to_slave);
+ if (send_one_binlog_file(info, &log, &linfo, pos))
+ break;
+
+ if (should_stop(info))
+ break;
+
+ DBUG_EXECUTE_IF("wait_after_binlog_EOF",
+ {
+ const char act[]= "now wait_for signal.rotate_finished";
+ DBUG_ASSERT(!debug_sync_set_action(current_thd,
+ STRING_WITH_LEN(act)));
+ };);
+
+ THD_STAGE_INFO(thd,
+ stage_finished_reading_one_binlog_switching_to_next_binlog);
+ if (mysql_bin_log.find_next_log(&linfo, 1))
+ {
+ info->errmsg= "could not find next log";
+ info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ break;
+ }
+
+ /** start from start of next file */
+ pos= BIN_LOG_HEADER_SIZE;
+
+ /** close current cache/file */
+ end_io_cache(&log);
+ mysql_file_close(file, MYF(MY_WME));
+ file= -1;
+ }
+
+err:
THD_STAGE_INFO(thd, stage_waiting_to_finalize_termination);
+ RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags));
+
+ const bool binlog_open = my_b_inited(&log);
+ if (file >= 0)
+ {
+ end_io_cache(&log);
+ mysql_file_close(file, MYF(MY_WME));
+ }
+
mysql_mutex_lock(&LOCK_thread_count);
thd->current_linfo = 0;
mysql_mutex_unlock(&LOCK_thread_count);
thd->variables.max_allowed_packet= old_max_allowed_packet;
- delete info.fdev;
- DBUG_VOID_RETURN;
+ delete info->fdev;
-err:
- THD_STAGE_INFO(thd, stage_waiting_to_finalize_termination);
- if (my_errno == ER_MASTER_FATAL_ERROR_READING_BINLOG && my_b_inited(&log))
+ if (info->error == ER_MASTER_FATAL_ERROR_READING_BINLOG && binlog_open)
{
- /*
- detailing the fatal error message with coordinates
+ /*
+ detailing the fatal error message with coordinates
of the last position read.
*/
- my_snprintf(error_text, sizeof(error_text),
+ my_snprintf(info->error_text, sizeof(info->error_text),
"%s; the first event '%s' at %lld, "
"the last event read from '%s' at %lld, "
"the last byte read from '%s' at %lld.",
- errmsg,
- my_basename(p_start_coord->file_name), p_start_coord->pos,
- my_basename(p_coord->file_name), p_coord->pos,
- my_basename(log_file_name), my_b_tell(&log));
+ info->errmsg,
+ my_basename(info->start_log_file_name), info->start_pos,
+ my_basename(info->log_file_name), info->last_pos,
+ my_basename(info->log_file_name), linfo.pos);
}
- else if (my_errno == ER_GTID_POSITION_NOT_FOUND_IN_BINLOG)
+ else if (info->error == ER_GTID_POSITION_NOT_FOUND_IN_BINLOG)
{
- my_snprintf(error_text, sizeof(error_text),
+ my_snprintf(info->error_text, sizeof(info->error_text),
"Error: connecting slave requested to start from GTID "
"%u-%u-%llu, which is not in the master's binlog",
- error_gtid.domain_id, error_gtid.server_id, error_gtid.seq_no);
+ info->error_gtid.domain_id,
+ info->error_gtid.server_id,
+ info->error_gtid.seq_no);
/* Use this error code so slave will know not to try reconnect. */
- my_errno = ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
}
- else if (my_errno == ER_GTID_POSITION_NOT_FOUND_IN_BINLOG2)
+ else if (info->error == ER_GTID_POSITION_NOT_FOUND_IN_BINLOG2)
{
- my_snprintf(error_text, sizeof(error_text),
+ my_snprintf(info->error_text, sizeof(info->error_text),
"Error: connecting slave requested to start from GTID "
"%u-%u-%llu, which is not in the master's binlog. Since the "
"master's binlog contains GTIDs with higher sequence numbers, "
"it probably means that the slave has diverged due to "
"executing extra errorneous transactions",
- error_gtid.domain_id, error_gtid.server_id, error_gtid.seq_no);
+ info->error_gtid.domain_id,
+ info->error_gtid.server_id,
+ info->error_gtid.seq_no);
/* Use this error code so slave will know not to try reconnect. */
- my_errno = ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
}
- else if (my_errno == ER_GTID_START_FROM_BINLOG_HOLE)
+ else if (info->error == ER_GTID_START_FROM_BINLOG_HOLE)
{
- my_snprintf(error_text, sizeof(error_text),
+ my_snprintf(info->error_text, sizeof(info->error_text),
"The binlog on the master is missing the GTID %u-%u-%llu "
"requested by the slave (even though both a prior and a "
"subsequent sequence number does exist), and GTID strict mode "
"is enabled",
- error_gtid.domain_id, error_gtid.server_id, error_gtid.seq_no);
+ info->error_gtid.domain_id,
+ info->error_gtid.server_id,
+ info->error_gtid.seq_no);
/* Use this error code so slave will know not to try reconnect. */
- my_errno = ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
}
- else if (my_errno == ER_CANNOT_LOAD_SLAVE_GTID_STATE)
+ else if (info->error == ER_CANNOT_LOAD_SLAVE_GTID_STATE)
{
- my_snprintf(error_text, sizeof(error_text),
+ my_snprintf(info->error_text, sizeof(info->error_text),
"Failed to load replication slave GTID state from table %s.%s",
"mysql", rpl_gtid_slave_state_table_name.str);
- my_errno = ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ }
+ else if (info->error != 0 && info->errmsg != NULL)
+ strcpy(info->error_text, info->errmsg);
+
+ if (info->error == 0)
+ {
+ my_eof(thd);
}
else
- strcpy(error_text, errmsg);
- end_io_cache(&log);
- RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags));
- /*
- Exclude iteration through thread list
- this is needed for purge_logs() - it will iterate through
- thread list and update thd->current_linfo->index_file_offset
- this mutex will make sure that it never tried to update our linfo
- after we return from this stack frame
- */
- mysql_mutex_lock(&LOCK_thread_count);
- thd->current_linfo = 0;
- mysql_mutex_unlock(&LOCK_thread_count);
- if (file >= 0)
- mysql_file_close(file, MYF(MY_WME));
- thd->variables.max_allowed_packet= old_max_allowed_packet;
- delete info.fdev;
+ {
+ my_message(info->error, info->error_text, MYF(0));
+ }
- my_message(my_errno, error_text, MYF(0));
DBUG_VOID_RETURN;
}