summaryrefslogtreecommitdiff
path: root/sql/sql_repl.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/sql_repl.cc')
-rw-r--r--sql/sql_repl.cc363
1 files changed, 197 insertions, 166 deletions
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 9e99b7292d0..eaed0660e46 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -115,6 +115,39 @@ fake_event_write(NET *net, String *packet, const char **errmsg)
/*
+ Helper structure, used to pass miscellaneous info from mysql_binlog_send()
+ into the helper functions that it calls.
+*/
+struct binlog_send_info {
+ rpl_binlog_state until_binlog_state;
+ slave_connection_state gtid_state;
+ THD *thd;
+ NET *net;
+ String *packet;
+ char *log_file_name;
+ slave_connection_state *until_gtid_state;
+ Format_description_log_event *fdev;
+ int mariadb_slave_capability;
+ enum_gtid_skip_type gtid_skip_group;
+ enum_gtid_until_state gtid_until_group;
+ ushort flags;
+ uint8 current_checksum_alg;
+ bool slave_gtid_strict_mode;
+ bool send_fake_gtid_list;
+ bool slave_gtid_ignore_duplicates;
+ bool using_gtid_state;
+
+ binlog_send_info(THD *thd_arg, String *packet_arg, ushort flags_arg, char *lfn)
+ : thd(thd_arg), net(&thd_arg->net), packet(packet_arg),
+ log_file_name(lfn), until_gtid_state(NULL), fdev(NULL),
+ gtid_skip_group(GTID_SKIP_NOT), gtid_until_group(GTID_UNTIL_NOT_DONE),
+ flags(flags_arg), current_checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF),
+ slave_gtid_strict_mode(false), send_fake_gtid_list(false),
+ slave_gtid_ignore_duplicates(false)
+ { }
+};
+
+/*
fake_rotate_event() builds a fake (=which does not exist physically in any
binlog) Rotate event, which contains the name of the binlog we are going to
send to the slave (because the slave may not know it if it just asked for
@@ -132,16 +165,16 @@ fake_event_write(NET *net, String *packet, const char **errmsg)
part.
*/
-static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
- ulonglong position, const char** errmsg,
- uint8 checksum_alg_arg)
+static int fake_rotate_event(binlog_send_info *info, ulonglong position,
+ const char** errmsg, uint8 checksum_alg_arg)
{
DBUG_ENTER("fake_rotate_event");
char buf[ROTATE_HEADER_LEN+100];
my_bool do_checksum;
int err;
- char* p = log_file_name+dirname_length(log_file_name);
+ char* p = info->log_file_name+dirname_length(info->log_file_name);
uint ident_len = (uint) strlen(p);
+ String *packet= info->packet;
ha_checksum crc;
if ((err= fake_event_header(packet, ROTATE_EVENT,
@@ -160,22 +193,23 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
}
if ((err= fake_event_footer(packet, do_checksum, crc, errmsg)) ||
- (err= fake_event_write(net, packet, errmsg)))
+ (err= fake_event_write(info->net, packet, errmsg)))
DBUG_RETURN(err);
DBUG_RETURN(0);
}
-static int fake_gtid_list_event(NET* net, String* packet,
+static int fake_gtid_list_event(binlog_send_info *info,
Gtid_list_log_event *glev, const char** errmsg,
- uint8 checksum_alg_arg, uint32 current_pos)
+ uint32 current_pos)
{
my_bool do_checksum;
int err;
ha_checksum crc;
char buf[128];
String str(buf, sizeof(buf), system_charset_info);
+ String* packet= info->packet;
str.length(0);
if (glev->to_packet(&str))
@@ -185,7 +219,7 @@ static int fake_gtid_list_event(NET* net, String* packet,
}
if ((err= fake_event_header(packet, GTID_LIST_EVENT,
str.length(), &do_checksum, &crc,
- errmsg, checksum_alg_arg, current_pos)))
+ errmsg, info->current_checksum_alg, current_pos)))
return err;
packet->append(str);
@@ -195,7 +229,7 @@ static int fake_gtid_list_event(NET* net, String* packet,
}
if ((err= fake_event_footer(packet, do_checksum, crc, errmsg)) ||
- (err= fake_event_write(net, packet, errmsg)))
+ (err= fake_event_write(info->net, packet, errmsg)))
return err;
return 0;
@@ -627,6 +661,19 @@ get_slave_gtid_strict_mode(THD *thd)
}
+static bool
+get_slave_gtid_ignore_duplicates(THD *thd)
+{
+ bool null_value;
+
+ const LEX_STRING name= { C_STRING_WITH_LEN("slave_gtid_ignore_duplicates") };
+ user_var_entry *entry=
+ (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str,
+ name.length);
+ return entry && entry->val_int(&null_value) && !null_value;
+}
+
+
/*
Get the value of the @slave_until_gtid user variable into the supplied
String (this is the GTID position specified for START SLAVE UNTIL
@@ -914,16 +961,16 @@ give_error_start_pos_missing_in_binlog(int *err, const char **errormsg,
*/
static int
-check_slave_start_position(THD *thd, slave_connection_state *st,
- const char **errormsg, rpl_gtid *error_gtid,
- slave_connection_state *until_gtid_state)
+check_slave_start_position(binlog_send_info *info, const char **errormsg,
+ rpl_gtid *error_gtid)
{
uint32 i;
int err;
slave_connection_state::entry **delete_list= NULL;
uint32 delete_idx= 0;
+ slave_connection_state *st= &info->gtid_state;
- if (rpl_load_gtid_slave_state(thd))
+ if (rpl_load_gtid_slave_state(info->thd))
{
*errormsg= "Failed to load replication slave GTID state";
err= ER_CANNOT_LOAD_SLAVE_GTID_STATE;
@@ -963,6 +1010,7 @@ check_slave_start_position(THD *thd, slave_connection_state *st,
if (!start_at_own_slave_pos)
{
rpl_gtid domain_gtid;
+ slave_connection_state *until_gtid_state= info->until_gtid_state;
rpl_gtid *until_gtid;
if (!mysql_bin_log.lookup_domain_in_binlog_state(slave_gtid->domain_id,
@@ -981,6 +1029,17 @@ check_slave_start_position(THD *thd, slave_connection_state *st,
continue;
}
+ if (info->slave_gtid_ignore_duplicates &&
+ domain_gtid.seq_no < slave_gtid->seq_no)
+ {
+ /*
+ When --gtid-ignore-duplicates, it is ok for the slave to request
+ something that we do not have (yet) - they might already have gotten
+ it through another path in a multi-path replication hierarchy.
+ */
+ continue;
+ }
+
if (until_gtid_state &&
( !(until_gtid= until_gtid_state->find(slave_gtid->domain_id)) ||
(mysql_bin_log.find_in_binlog_state(until_gtid->domain_id,
@@ -1462,13 +1521,11 @@ gtid_state_from_binlog_pos(const char *in_name, uint32 pos, String *out_str)
static bool
-is_until_reached(THD *thd, NET *net, String *packet, ulong *ev_offset,
- enum_gtid_until_state gtid_until_group,
- Log_event_type event_type, uint8 current_checksum_alg,
- ushort flags, const char **errmsg,
- rpl_binlog_state *until_binlog_state, uint32 current_pos)
+is_until_reached(binlog_send_info *info, ulong *ev_offset,
+ Log_event_type event_type, const char **errmsg,
+ uint32 current_pos)
{
- switch (gtid_until_group)
+ switch (info->gtid_until_group)
{
case GTID_UNTIL_NOT_DONE:
return false;
@@ -1479,9 +1536,10 @@ is_until_reached(THD *thd, NET *net, String *packet, ulong *ev_offset,
case GTID_UNTIL_STOP_AFTER_TRANSACTION:
if (event_type != XID_EVENT &&
(event_type != QUERY_EVENT ||
- !Query_log_event::peek_is_commit_rollback(packet->ptr()+*ev_offset,
- packet->length()-*ev_offset,
- current_checksum_alg)))
+ !Query_log_event::peek_is_commit_rollback
+ (info->packet->ptr()+*ev_offset,
+ info->packet->length()-*ev_offset,
+ info->current_checksum_alg)))
return false;
break;
}
@@ -1493,12 +1551,11 @@ is_until_reached(THD *thd, NET *net, String *packet, ulong *ev_offset,
Send a last fake Gtid_list_log_event with a flag set to mark that we
stop due to UNTIL condition.
*/
- if (reset_transmit_packet(thd, flags, ev_offset, errmsg))
+ if (reset_transmit_packet(info->thd, info->flags, ev_offset, errmsg))
return true;
- Gtid_list_log_event glev(until_binlog_state,
+ Gtid_list_log_event glev(&info->until_binlog_state,
Gtid_list_log_event::FLAG_UNTIL_REACHED);
- if (fake_gtid_list_event(net, packet, &glev, errmsg, current_checksum_alg,
- current_pos))
+ if (fake_gtid_list_event(info, &glev, errmsg, current_pos))
return true;
*errmsg= NULL;
return true;
@@ -1512,23 +1569,19 @@ is_until_reached(THD *thd, NET *net, String *packet, ulong *ev_offset,
Returns NULL on success, error message string on error.
*/
static const char *
-send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
- Log_event_type event_type, char *log_file_name,
- IO_CACHE *log, int mariadb_slave_capability,
- ulong ev_offset, uint8 current_checksum_alg,
- bool using_gtid_state, slave_connection_state *gtid_state,
- enum_gtid_skip_type *gtid_skip_group,
- slave_connection_state *until_gtid_state,
- enum_gtid_until_state *gtid_until_group,
- rpl_binlog_state *until_binlog_state,
- bool slave_gtid_strict_mode, rpl_gtid *error_gtid,
- bool *send_fake_gtid_list,
- Format_description_log_event *fdev)
+send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
+ IO_CACHE *log, ulong ev_offset, rpl_gtid *error_gtid)
{
my_off_t pos;
+ String* const packet= info->packet;
size_t len= packet->length();
+ int mariadb_slave_capability= info->mariadb_slave_capability;
+ uint8 current_checksum_alg= info->current_checksum_alg;
+ slave_connection_state *gtid_state= &info->gtid_state;
+ slave_connection_state *until_gtid_state= info->until_gtid_state;
- if (event_type == GTID_LIST_EVENT && using_gtid_state && until_gtid_state)
+ if (event_type == GTID_LIST_EVENT &&
+ info->using_gtid_state && until_gtid_state)
{
rpl_gtid *gtid_list;
uint32 list_len;
@@ -1537,12 +1590,12 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
if (ev_offset > len ||
Gtid_list_log_event::peek(packet->ptr()+ev_offset, len - ev_offset,
current_checksum_alg,
- &gtid_list, &list_len, fdev))
+ &gtid_list, &list_len, info->fdev))
{
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
return "Failed to read Gtid_list_log_event: corrupt binlog";
}
- err= until_binlog_state->load(gtid_list, list_len);
+ err= info->until_binlog_state.load(gtid_list, list_len);
my_free(gtid_list);
if (err)
{
@@ -1552,7 +1605,7 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
}
/* Skip GTID event groups until we reach slave position within a domain_id. */
- if (event_type == GTID_EVENT && using_gtid_state)
+ if (event_type == GTID_EVENT && info->using_gtid_state)
{
uchar flags2;
slave_connection_state::entry *gtid_entry;
@@ -1566,7 +1619,7 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
Gtid_log_event::peek(packet->ptr()+ev_offset, len - ev_offset,
current_checksum_alg,
&event_gtid.domain_id, &event_gtid.server_id,
- &event_gtid.seq_no, &flags2, fdev))
+ &event_gtid.seq_no, &flags2, info->fdev))
{
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
return "Failed to read Gtid_log_event: corrupt binlog";
@@ -1575,7 +1628,7 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
DBUG_EXECUTE_IF("gtid_force_reconnect_at_10_1_100",
{
rpl_gtid *dbug_gtid;
- if ((dbug_gtid= until_binlog_state->find_nolock(10,1)) &&
+ if ((dbug_gtid= info->until_binlog_state.find_nolock(10,1)) &&
dbug_gtid->seq_no == 100)
{
DBUG_SET("-d,gtid_force_reconnect_at_10_1_100");
@@ -1585,7 +1638,7 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
}
});
- if (until_binlog_state->update_nolock(&event_gtid, false))
+ if (info->until_binlog_state.update_nolock(&event_gtid, false))
{
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
return "Failed in internal GTID book-keeping: Out of memory";
@@ -1618,12 +1671,13 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
/* Skip this event group if we have not yet reached slave start pos. */
if (event_gtid.server_id != gtid->server_id ||
event_gtid.seq_no <= gtid->seq_no)
- *gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ?
+ info->gtid_skip_group= (flags2 & Gtid_log_event::FL_STANDALONE ?
GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
if (event_gtid.server_id == gtid->server_id &&
event_gtid.seq_no >= gtid->seq_no)
{
- if (slave_gtid_strict_mode && event_gtid.seq_no > gtid->seq_no &&
+ if (info->slave_gtid_strict_mode &&
+ event_gtid.seq_no > gtid->seq_no &&
!(gtid_entry->flags & slave_connection_state::START_OWN_SLAVE_POS))
{
/*
@@ -1645,7 +1699,7 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
so MASTER_POS_WAIT() and MASTER_GTID_WAIT() can work.
The fake event will be sent at the end of this event group.
*/
- *send_fake_gtid_list= true;
+ info->send_fake_gtid_list= true;
/*
Delete this entry if we have reached slave start position (so we
@@ -1666,7 +1720,7 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
This domain already reached the START SLAVE UNTIL stop condition,
so skip this event group.
*/
- *gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ?
+ info->gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ?
GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
}
else if (event_gtid.server_id == gtid->server_id &&
@@ -1681,9 +1735,9 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
uint64 until_seq_no= gtid->seq_no;
until_gtid_state->remove(gtid);
if (until_gtid_state->count() == 0)
- *gtid_until_group= (flags2 & Gtid_log_event::FL_STANDALONE ?
- GTID_UNTIL_STOP_AFTER_STANDALONE :
- GTID_UNTIL_STOP_AFTER_TRANSACTION);
+ info->gtid_until_group= (flags2 & Gtid_log_event::FL_STANDALONE ?
+ GTID_UNTIL_STOP_AFTER_STANDALONE :
+ GTID_UNTIL_STOP_AFTER_TRANSACTION);
if (event_gtid.seq_no > until_seq_no)
{
/*
@@ -1693,7 +1747,7 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
should be in, we can just stop now. And we also need to skip this
event group (as it is beyond the UNTIL condition).
*/
- *gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ?
+ info->gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ?
GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
}
}
@@ -1707,11 +1761,11 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
Note that slave that understands GTID can also tolerate holes, so there is
no need to supply dummy event.
*/
- switch (*gtid_skip_group)
+ switch (info->gtid_skip_group)
{
case GTID_SKIP_STANDALONE:
if (!Log_event::is_part_of_group(event_type))
- *gtid_skip_group= GTID_SKIP_NOT;
+ info->gtid_skip_group= GTID_SKIP_NOT;
return NULL;
case GTID_SKIP_TRANSACTION:
if (event_type == XID_EVENT ||
@@ -1719,14 +1773,15 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
Query_log_event::peek_is_commit_rollback(packet->ptr() + ev_offset,
len - ev_offset,
current_checksum_alg)))
- *gtid_skip_group= GTID_SKIP_NOT;
+ info->gtid_skip_group= GTID_SKIP_NOT;
return NULL;
case GTID_SKIP_NOT:
break;
}
/* Do not send annotate_rows events unless slave requested it. */
- if (event_type == ANNOTATE_ROWS_EVENT && !(flags & BINLOG_SEND_ANNOTATE_ROWS_EVENT))
+ if (event_type == ANNOTATE_ROWS_EVENT &&
+ !(info->flags & BINLOG_SEND_ANNOTATE_ROWS_EVENT))
{
if (mariadb_slave_capability >= MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES)
{
@@ -1820,7 +1875,7 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
Skip events with the @@skip_replication flag set, if slave requested
skipping of such events.
*/
- if (thd->variables.option_bits & OPTION_SKIP_REPLICATION)
+ if (info->thd->variables.option_bits & OPTION_SKIP_REPLICATION)
{
/*
The first byte of the packet is a '\0' to distinguish it from an error
@@ -1831,17 +1886,17 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
return NULL;
}
- THD_STAGE_INFO(thd, stage_sending_binlog_event_to_slave);
+ THD_STAGE_INFO(info->thd, stage_sending_binlog_event_to_slave);
pos= my_b_tell(log);
if (RUN_HOOK(binlog_transmit, before_send_event,
- (thd, flags, packet, log_file_name, pos)))
+ (info->thd, info->flags, packet, info->log_file_name, pos)))
{
my_errno= ER_UNKNOWN_ERROR;
return "run 'before_send_event' hook failed";
}
- if (my_net_write(net, (uchar*) packet->ptr(), len))
+ if (my_net_write(info->net, (uchar*) packet->ptr(), len))
{
my_errno= ER_UNKNOWN_ERROR;
return "Failed on my_net_write()";
@@ -1850,14 +1905,15 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
DBUG_PRINT("info", ("log event code %d", (*packet)[LOG_EVENT_OFFSET+1] ));
if (event_type == LOAD_EVENT)
{
- if (send_file(thd))
+ if (send_file(info->thd))
{
my_errno= ER_UNKNOWN_ERROR;
return "failed in send_file()";
}
}
- if (RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet)))
+ if (RUN_HOOK(binlog_transmit, after_send_event,
+ (info->thd, info->flags, packet)))
{
my_errno= ER_UNKNOWN_ERROR;
return "Failed to run hook 'after_send_event'";
@@ -1878,31 +1934,21 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
IO_CACHE log;
File file = -1;
- String* const packet = &thd->packet;
+ String* const packet= &thd->packet;
int error;
const char *errmsg = "Unknown error", *tmp_msg;
char error_text[MAX_SLAVE_ERRMSG]; // to be send to slave via my_message()
- NET* net = &thd->net;
mysql_mutex_t *log_lock;
mysql_cond_t *log_cond;
- int mariadb_slave_capability;
char str_buf[128];
String connect_gtid_state(str_buf, sizeof(str_buf), system_charset_info);
- bool using_gtid_state;
char str_buf2[128];
String slave_until_gtid_str(str_buf2, sizeof(str_buf2), system_charset_info);
- slave_connection_state gtid_state, until_gtid_state_obj;
- slave_connection_state *until_gtid_state= NULL;
+ slave_connection_state until_gtid_state_obj;
rpl_gtid error_gtid;
- enum_gtid_skip_type gtid_skip_group= GTID_SKIP_NOT;
- enum_gtid_until_state gtid_until_group= GTID_UNTIL_NOT_DONE;
- rpl_binlog_state until_binlog_state;
- bool slave_gtid_strict_mode= false;
- bool send_fake_gtid_list= false;
+ binlog_send_info info(thd, packet, flags, log_file_name);
- uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
int old_max_allowed_packet= thd->variables.max_allowed_packet;
- Format_description_log_event *fdev= NULL;
#ifndef DBUG_OFF
int left_events = max_binlog_dump_events;
@@ -1928,16 +1974,17 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
heartbeat_ts= &heartbeat_buf;
set_timespec_nsec(*heartbeat_ts, 0);
}
- mariadb_slave_capability= get_mariadb_slave_capability(thd);
+ info.mariadb_slave_capability= get_mariadb_slave_capability(thd);
connect_gtid_state.length(0);
- using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state);
- DBUG_EXECUTE_IF("simulate_non_gtid_aware_master", using_gtid_state= false;);
- if (using_gtid_state)
+ info.using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state);
+ DBUG_EXECUTE_IF("simulate_non_gtid_aware_master", info.using_gtid_state= false;);
+ if (info.using_gtid_state)
{
- slave_gtid_strict_mode= get_slave_gtid_strict_mode(thd);
+ info.slave_gtid_strict_mode= get_slave_gtid_strict_mode(thd);
+ info.slave_gtid_ignore_duplicates= get_slave_gtid_ignore_duplicates(thd);
if(get_slave_until_gtid(thd, &slave_until_gtid_str))
- until_gtid_state= &until_gtid_state_obj;
+ info.until_gtid_state= &until_gtid_state_obj;
}
DBUG_EXECUTE_IF("binlog_force_reconnect_after_22_events",
@@ -1978,7 +2025,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
}
#endif
- if (!(fdev= new Format_description_log_event(3)))
+ if (!(info.fdev= new Format_description_log_event(3)))
{
errmsg= "Out of memory initializing format_description event";
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
@@ -1999,33 +2046,32 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
}
name=search_file_name;
- if (using_gtid_state)
+ if (info.using_gtid_state)
{
- if (gtid_state.load(connect_gtid_state.c_ptr_quick(),
- connect_gtid_state.length()))
+ if (info.gtid_state.load(connect_gtid_state.c_ptr_quick(),
+ connect_gtid_state.length()))
{
errmsg= "Out of memory or malformed slave request when obtaining start "
"position from GTID state";
my_errno= ER_UNKNOWN_ERROR;
goto err;
}
- if (until_gtid_state &&
- until_gtid_state->load(slave_until_gtid_str.c_ptr_quick(),
- slave_until_gtid_str.length()))
+ if (info.until_gtid_state &&
+ info.until_gtid_state->load(slave_until_gtid_str.c_ptr_quick(),
+ slave_until_gtid_str.length()))
{
errmsg= "Out of memory or malformed slave request when obtaining UNTIL "
"position sent from slave";
my_errno= ER_UNKNOWN_ERROR;
goto err;
}
- if ((error= check_slave_start_position(thd, &gtid_state, &errmsg,
- &error_gtid, until_gtid_state)))
+ if ((error= check_slave_start_position(&info, &errmsg, &error_gtid)))
{
my_errno= error;
goto err;
}
- if ((errmsg= gtid_find_binlog_file(&gtid_state, search_file_name,
- until_gtid_state)))
+ if ((errmsg= gtid_find_binlog_file(&info.gtid_state, search_file_name,
+ info.until_gtid_state)))
{
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
goto err;
@@ -2098,7 +2144,7 @@ impossible position";
given that we want minimum modification of 4.0, we send the normal
and fake Rotates.
*/
- if (fake_rotate_event(net, packet, log_file_name, pos, &errmsg,
+ if (fake_rotate_event(&info, pos, &errmsg,
get_binlog_checksum_value_at_connect(thd)))
{
/*
@@ -2150,14 +2196,14 @@ impossible position";
{
Format_description_log_event *tmp;
- current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset,
- packet->length() - ev_offset);
- DBUG_ASSERT(current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
- current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
- current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32);
+ info.current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset,
+ packet->length() - ev_offset);
+ DBUG_ASSERT(info.current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
+ info.current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
+ info.current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32);
if (!is_slave_checksum_aware(thd) &&
- current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
- current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
+ info.current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
+ info.current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
{
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
errmsg= "Slave can not handle replication events with the checksum "
@@ -2170,14 +2216,14 @@ impossible position";
if (!(tmp= new Format_description_log_event(packet->ptr()+ev_offset,
packet->length()-ev_offset,
- fdev)))
+ info.fdev)))
{
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
errmsg= "Corrupt Format_description event found or out-of-memory";
goto err;
}
- delete fdev;
- fdev= tmp;
+ delete info.fdev;
+ info.fdev= tmp;
(*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
/*
@@ -2194,12 +2240,12 @@ impossible position";
ST_CREATED_OFFSET+ev_offset, (ulong) 0);
/* fix the checksum due to latest changes in header */
- if (current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
- current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
+ if (info.current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
+ info.current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
fix_checksum(packet, ev_offset);
/* send it */
- if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
+ if (my_net_write(info.net, (uchar*) packet->ptr(), packet->length()))
{
errmsg = "Failed on my_net_write()";
my_errno= ER_UNKNOWN_ERROR;
@@ -2235,13 +2281,13 @@ impossible position";
We will send one event, the format_description, and then stop.
*/
- if (until_gtid_state && until_gtid_state->count() == 0)
- gtid_until_group= GTID_UNTIL_STOP_AFTER_STANDALONE;
+ if (info.until_gtid_state && info.until_gtid_state->count() == 0)
+ info.gtid_until_group= GTID_UNTIL_STOP_AFTER_STANDALONE;
/* seek to the requested position, to start the requested dump */
my_b_seek(&log, pos); // Seek will done on next read
- while (!net->error && net->vio != 0 && !thd->killed)
+ while (!info.net->error && info.net->vio != 0 && !thd->killed)
{
Log_event_type event_type= UNKNOWN_EVENT;
killed_state killed;
@@ -2254,14 +2300,14 @@ impossible position";
bool is_active_binlog= false;
while (!(killed= thd->killed) &&
!(error = Log_event::read_log_event(&log, packet, log_lock,
- current_checksum_alg,
+ info.current_checksum_alg,
log_file_name,
&is_active_binlog)))
{
#ifndef DBUG_OFF
if (max_binlog_dump_events && !left_events--)
{
- net_flush(net);
+ net_flush(info.net);
errmsg = "Debugging binlog dump abort";
my_errno= ER_UNKNOWN_ERROR;
goto err;
@@ -2279,7 +2325,7 @@ impossible position";
{
if (event_type == XID_EVENT)
{
- net_flush(net);
+ net_flush(info.net);
const char act[]=
"now "
"wait_for signal.continue";
@@ -2298,14 +2344,14 @@ impossible position";
{
Format_description_log_event *tmp;
- current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset,
+ info.current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset,
packet->length() - ev_offset);
- DBUG_ASSERT(current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
- current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
- current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32);
+ DBUG_ASSERT(info.current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
+ info.current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
+ info.current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32);
if (!is_slave_checksum_aware(thd) &&
- current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
- current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
+ info.current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
+ info.current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
{
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
errmsg= "Slave can not handle replication events with the checksum "
@@ -2318,14 +2364,14 @@ impossible position";
if (!(tmp= new Format_description_log_event(packet->ptr()+ev_offset,
packet->length()-ev_offset,
- fdev)))
+ info.fdev)))
{
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
errmsg= "Corrupt Format_description event found or out-of-memory";
goto err;
}
- delete fdev;
- fdev= tmp;
+ delete info.fdev;
+ info.fdev= tmp;
(*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
}
@@ -2343,36 +2389,28 @@ impossible position";
}
#endif
- if ((tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type,
- log_file_name, &log,
- mariadb_slave_capability, ev_offset,
- current_checksum_alg, using_gtid_state,
- &gtid_state, &gtid_skip_group,
- until_gtid_state, &gtid_until_group,
- &until_binlog_state,
- slave_gtid_strict_mode, &error_gtid,
- &send_fake_gtid_list, fdev)))
+ if ((tmp_msg= send_event_to_slave(&info, event_type, &log,
+ ev_offset, &error_gtid)))
{
errmsg= tmp_msg;
goto err;
}
- if (unlikely(send_fake_gtid_list) && gtid_skip_group == GTID_SKIP_NOT)
+ if (unlikely(info.send_fake_gtid_list) &&
+ info.gtid_skip_group == GTID_SKIP_NOT)
{
- Gtid_list_log_event glev(&until_binlog_state, 0);
+ Gtid_list_log_event glev(&info.until_binlog_state, 0);
if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg) ||
- fake_gtid_list_event(net, packet, &glev, &errmsg,
- current_checksum_alg, my_b_tell(&log)))
+ fake_gtid_list_event(&info, &glev, &errmsg, my_b_tell(&log)))
{
my_errno= ER_UNKNOWN_ERROR;
goto err;
}
- send_fake_gtid_list= false;
+ info.send_fake_gtid_list= false;
}
- if (until_gtid_state &&
- is_until_reached(thd, net, packet, &ev_offset, gtid_until_group,
- event_type, current_checksum_alg, flags, &errmsg,
- &until_binlog_state, my_b_tell(&log)))
+ if (info.until_gtid_state &&
+ is_until_reached(&info, &ev_offset, event_type, &errmsg,
+ my_b_tell(&log)))
{
if (errmsg)
{
@@ -2386,7 +2424,7 @@ impossible position";
{
if (event_type == XID_EVENT)
{
- net_flush(net);
+ net_flush(info.net);
}
});
@@ -2423,7 +2461,7 @@ impossible position";
/*
Block until there is more data in the log
*/
- if (net_flush(net))
+ if (net_flush(info.net))
{
errmsg = "failed on net_flush()";
my_errno= ER_UNKNOWN_ERROR;
@@ -2466,7 +2504,7 @@ impossible position";
mysql_mutex_lock(log_lock);
switch (error= Log_event::read_log_event(&log, packet, (mysql_mutex_t*) 0,
- current_checksum_alg)) {
+ info.current_checksum_alg)) {
case 0:
/* we read successfully, so we'll need to send it to the slave */
mysql_mutex_unlock(log_lock);
@@ -2524,7 +2562,8 @@ impossible position";
thd->EXIT_COND(&old_stage);
goto err;
}
- if (send_heartbeat_event(net, packet, p_coord, current_checksum_alg))
+ if (send_heartbeat_event(info.net, packet, p_coord,
+ info.current_checksum_alg))
{
errmsg = "Failed on my_net_write()";
my_errno= ER_UNKNOWN_ERROR;
@@ -2549,36 +2588,28 @@ impossible position";
if (read_packet)
{
- if ((tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type,
- log_file_name, &log,
- mariadb_slave_capability, ev_offset,
- current_checksum_alg,
- using_gtid_state, &gtid_state,
- &gtid_skip_group, until_gtid_state,
- &gtid_until_group, &until_binlog_state,
- slave_gtid_strict_mode, &error_gtid,
- &send_fake_gtid_list, fdev)))
+ if ((tmp_msg= send_event_to_slave(&info, event_type, &log,
+ ev_offset, &error_gtid)))
{
errmsg= tmp_msg;
goto err;
}
- if (unlikely(send_fake_gtid_list) && gtid_skip_group == GTID_SKIP_NOT)
+ if (unlikely(info.send_fake_gtid_list)
+ && info.gtid_skip_group == GTID_SKIP_NOT)
{
- Gtid_list_log_event glev(&until_binlog_state, 0);
+ Gtid_list_log_event glev(&info.until_binlog_state, 0);
if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg) ||
- fake_gtid_list_event(net, packet, &glev, &errmsg,
- current_checksum_alg, my_b_tell(&log)))
+ fake_gtid_list_event(&info, &glev, &errmsg, my_b_tell(&log)))
{
my_errno= ER_UNKNOWN_ERROR;
goto err;
}
- send_fake_gtid_list= false;
+ info.send_fake_gtid_list= false;
}
- if (until_gtid_state &&
- is_until_reached(thd, net, packet, &ev_offset, gtid_until_group,
- event_type, current_checksum_alg, flags, &errmsg,
- &until_binlog_state, my_b_tell(&log)))
+ if (info.until_gtid_state &&
+ is_until_reached(&info, &ev_offset, event_type, &errmsg,
+ my_b_tell(&log)))
{
if (errmsg)
{
@@ -2633,8 +2664,8 @@ impossible position";
read and send is Format_description_log_event.
*/
if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0 ||
- fake_rotate_event(net, packet, log_file_name, BIN_LOG_HEADER_SIZE,
- &errmsg, current_checksum_alg))
+ fake_rotate_event(&info, BIN_LOG_HEADER_SIZE, &errmsg,
+ info.current_checksum_alg))
{
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
goto err;
@@ -2655,7 +2686,7 @@ end:
thd->current_linfo = 0;
mysql_mutex_unlock(&LOCK_thread_count);
thd->variables.max_allowed_packet= old_max_allowed_packet;
- delete fdev;
+ delete info.fdev;
DBUG_VOID_RETURN;
err:
@@ -2731,7 +2762,7 @@ err:
if (file >= 0)
mysql_file_close(file, MYF(MY_WME));
thd->variables.max_allowed_packet= old_max_allowed_packet;
- delete fdev;
+ delete info.fdev;
my_message(my_errno, error_text, MYF(0));
DBUG_VOID_RETURN;