summaryrefslogtreecommitdiff
path: root/sql/sql_repl.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/sql_repl.cc')
-rw-r--r--sql/sql_repl.cc2396
1 files changed, 2178 insertions, 218 deletions
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 91acdc27bfb..8f335fd3dc2 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -14,12 +14,15 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335 USA */
+#include <my_global.h>
#include "sql_priv.h"
#include "unireg.h"
+#include "sql_base.h"
#include "sql_parse.h" // check_access
#ifdef HAVE_REPLICATION
#include "rpl_mi.h"
+#include "rpl_rli.h"
#include "sql_repl.h"
#include "sql_acl.h" // SUPER_ACL
#include "log_event.h"
@@ -28,22 +31,122 @@
#include "rpl_handler.h"
#include "debug_sync.h"
+
+enum enum_gtid_until_state {
+ GTID_UNTIL_NOT_DONE,
+ GTID_UNTIL_STOP_AFTER_STANDALONE,
+ GTID_UNTIL_STOP_AFTER_TRANSACTION
+};
+
+
int max_binlog_dump_events = 0; // unlimited
my_bool opt_sporadic_binlog_dump_fail = 0;
#ifndef DBUG_OFF
static int binlog_dump_count = 0;
#endif
-/**
- a copy of active_mi->rli->slave_skip_counter, for showing in SHOW VARIABLES,
- INFORMATION_SCHEMA.GLOBAL_VARIABLES and @@sql_slave_skip_counter without
- taking all the mutexes needed to access active_mi->rli->slave_skip_counter
- properly.
-*/
-uint sql_slave_skip_counter;
-
extern TYPELIB binlog_checksum_typelib;
+
+static int
+fake_event_header(String* packet, Log_event_type event_type, ulong extra_len,
+ my_bool *do_checksum, ha_checksum *crc, const char** errmsg,
+ uint8 checksum_alg_arg, uint32 end_pos)
+{
+ char header[LOG_EVENT_HEADER_LEN];
+ ulong event_len;
+
+ *do_checksum= checksum_alg_arg != BINLOG_CHECKSUM_ALG_OFF &&
+ checksum_alg_arg != BINLOG_CHECKSUM_ALG_UNDEF;
+
+ /*
+ 'when' (the timestamp) is set to 0 so that slave could distinguish between
+ real and fake Rotate events (if necessary)
+ */
+ memset(header, 0, 4);
+ header[EVENT_TYPE_OFFSET] = (uchar)event_type;
+ event_len= LOG_EVENT_HEADER_LEN + extra_len +
+ (*do_checksum ? BINLOG_CHECKSUM_LEN : 0);
+ int4store(header + SERVER_ID_OFFSET, global_system_variables.server_id);
+ int4store(header + EVENT_LEN_OFFSET, event_len);
+ int2store(header + FLAGS_OFFSET, LOG_EVENT_ARTIFICIAL_F);
+ // TODO: check what problems this may cause and fix them
+ int4store(header + LOG_POS_OFFSET, end_pos);
+ if (packet->append(header, sizeof(header)))
+ {
+ *errmsg= "Failed due to out-of-memory writing event";
+ return -1;
+ }
+ if (*do_checksum)
+ {
+ *crc= my_checksum(0, (uchar*)header, sizeof(header));
+ }
+ return 0;
+}
+
+
+static int
+fake_event_footer(String *packet, my_bool do_checksum, ha_checksum crc, const char **errmsg)
+{
+ if (do_checksum)
+ {
+ char b[BINLOG_CHECKSUM_LEN];
+ int4store(b, crc);
+ if (packet->append(b, sizeof(b)))
+ {
+ *errmsg= "Failed due to out-of-memory writing event checksum";
+ return -1;
+ }
+ }
+ return 0;
+}
+
+
+static int
+fake_event_write(NET *net, String *packet, const char **errmsg)
+{
+ if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
+ {
+ *errmsg = "failed on my_net_write()";
+ return -1;
+ }
+ return 0;
+}
+
+
+/*
+ Helper structure, used to pass miscellaneous info from mysql_binlog_send()
+ into the helper functions that it calls.
+*/
+struct binlog_send_info {
+ rpl_binlog_state until_binlog_state;
+ slave_connection_state gtid_state;
+ THD *thd;
+ NET *net;
+ String *packet;
+ char *log_file_name;
+ slave_connection_state *until_gtid_state;
+ Format_description_log_event *fdev;
+ int mariadb_slave_capability;
+ enum_gtid_skip_type gtid_skip_group;
+ enum_gtid_until_state gtid_until_group;
+ ushort flags;
+ uint8 current_checksum_alg;
+ bool slave_gtid_strict_mode;
+ bool send_fake_gtid_list;
+ bool slave_gtid_ignore_duplicates;
+ bool using_gtid_state;
+
+ binlog_send_info(THD *thd_arg, String *packet_arg, ushort flags_arg, char *lfn)
+ : thd(thd_arg), net(&thd_arg->net), packet(packet_arg),
+ log_file_name(lfn), until_gtid_state(NULL), fdev(NULL),
+ gtid_skip_group(GTID_SKIP_NOT), gtid_until_group(GTID_UNTIL_NOT_DONE),
+ flags(flags_arg), current_checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF),
+ slave_gtid_strict_mode(false), send_fake_gtid_list(false),
+ slave_gtid_ignore_duplicates(false)
+ { }
+};
+
/*
fake_rotate_event() builds a fake (=which does not exist physically in any
binlog) Rotate event, which contains the name of the binlog we are going to
@@ -62,64 +165,77 @@ extern TYPELIB binlog_checksum_typelib;
part.
*/
-static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
- ulonglong position, const char** errmsg,
- uint8 checksum_alg_arg)
+static int fake_rotate_event(binlog_send_info *info, ulonglong position,
+ const char** errmsg, uint8 checksum_alg_arg)
{
DBUG_ENTER("fake_rotate_event");
- char header[LOG_EVENT_HEADER_LEN], buf[ROTATE_HEADER_LEN+100];
-
- /*
- this Rotate is to be sent with checksum if and only if
- slave's get_master_version_and_clock time handshake value
- of master's @@global.binlog_checksum was TRUE
- */
-
- my_bool do_checksum= checksum_alg_arg != BINLOG_CHECKSUM_ALG_OFF &&
- checksum_alg_arg != BINLOG_CHECKSUM_ALG_UNDEF;
-
- /*
- 'when' (the timestamp) is set to 0 so that slave could distinguish between
- real and fake Rotate events (if necessary)
- */
- memset(header, 0, 4);
- header[EVENT_TYPE_OFFSET] = ROTATE_EVENT;
-
- char* p = log_file_name+dirname_length(log_file_name);
+ char buf[ROTATE_HEADER_LEN+100];
+ my_bool do_checksum;
+ int err;
+ char* p = info->log_file_name+dirname_length(info->log_file_name);
uint ident_len = (uint) strlen(p);
- ulong event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN +
- (do_checksum ? BINLOG_CHECKSUM_LEN : 0);
- int4store(header + SERVER_ID_OFFSET, server_id);
- int4store(header + EVENT_LEN_OFFSET, event_len);
- int2store(header + FLAGS_OFFSET, LOG_EVENT_ARTIFICIAL_F);
+ String *packet= info->packet;
+ ha_checksum crc;
- // TODO: check what problems this may cause and fix them
- int4store(header + LOG_POS_OFFSET, 0);
+ if ((err= fake_event_header(packet, ROTATE_EVENT,
+ ident_len + ROTATE_HEADER_LEN, &do_checksum, &crc,
+ errmsg, checksum_alg_arg, 0)))
+ DBUG_RETURN(err);
- packet->append(header, sizeof(header));
int8store(buf+R_POS_OFFSET,position);
packet->append(buf, ROTATE_HEADER_LEN);
packet->append(p, ident_len);
if (do_checksum)
{
- char b[BINLOG_CHECKSUM_LEN];
- ha_checksum crc= my_checksum(0L, NULL, 0);
- crc= my_checksum(crc, (uchar*)header, sizeof(header));
crc= my_checksum(crc, (uchar*)buf, ROTATE_HEADER_LEN);
crc= my_checksum(crc, (uchar*)p, ident_len);
- int4store(b, crc);
- packet->append(b, sizeof(b));
}
- if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
+ if ((err= fake_event_footer(packet, do_checksum, crc, errmsg)) ||
+ (err= fake_event_write(info->net, packet, errmsg)))
+ DBUG_RETURN(err);
+
+ DBUG_RETURN(0);
+}
+
+
+static int fake_gtid_list_event(binlog_send_info *info,
+ Gtid_list_log_event *glev, const char** errmsg,
+ uint32 current_pos)
+{
+ my_bool do_checksum;
+ int err;
+ ha_checksum crc;
+ char buf[128];
+ String str(buf, sizeof(buf), system_charset_info);
+ String* packet= info->packet;
+
+ str.length(0);
+ if (glev->to_packet(&str))
{
- *errmsg = "failed on my_net_write()";
- DBUG_RETURN(-1);
+ *errmsg= "Failed due to out-of-memory writing Gtid_list event";
+ return -1;
}
- DBUG_RETURN(0);
+ if ((err= fake_event_header(packet, GTID_LIST_EVENT,
+ str.length(), &do_checksum, &crc,
+ errmsg, info->current_checksum_alg, current_pos)))
+ return err;
+
+ packet->append(str);
+ if (do_checksum)
+ {
+ crc= my_checksum(crc, (uchar*)str.ptr(), str.length());
+ }
+
+ if ((err= fake_event_footer(packet, do_checksum, crc, errmsg)) ||
+ (err= fake_event_write(info->net, packet, errmsg)))
+ return err;
+
+ return 0;
}
+
/*
Reset thread transmit packet buffer for event sending
@@ -492,6 +608,94 @@ static ulonglong get_heartbeat_period(THD * thd)
}
/*
+ Lookup the capabilities of the slave, which it announces by setting a value
+ MARIA_SLAVE_CAPABILITY_XXX in @mariadb_slave_capability.
+
+ Older MariaDB slaves, and other MySQL slaves, do not set
+ @mariadb_slave_capability, corresponding to a capability of
+ MARIA_SLAVE_CAPABILITY_UNKNOWN (0).
+*/
+static int
+get_mariadb_slave_capability(THD *thd)
+{
+ bool null_value;
+ const LEX_STRING name= { C_STRING_WITH_LEN("mariadb_slave_capability") };
+ const user_var_entry *entry=
+ (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str,
+ name.length);
+ return entry ?
+ (int)(entry->val_int(&null_value)) : MARIA_SLAVE_CAPABILITY_UNKNOWN;
+}
+
+
+/*
+ Get the value of the @slave_connect_state user variable into the supplied
+ String (this is the GTID connect state requested by the connecting slave).
+
+ Returns false if error (ie. slave did not set the variable and does not
+ want to use GTID to set start position), true if success.
+*/
+static bool
+get_slave_connect_state(THD *thd, String *out_str)
+{
+ bool null_value;
+
+ const LEX_STRING name= { C_STRING_WITH_LEN("slave_connect_state") };
+ user_var_entry *entry=
+ (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str,
+ name.length);
+ return entry && entry->val_str(&null_value, out_str, 0) && !null_value;
+}
+
+
+static bool
+get_slave_gtid_strict_mode(THD *thd)
+{
+ bool null_value;
+
+ const LEX_STRING name= { C_STRING_WITH_LEN("slave_gtid_strict_mode") };
+ user_var_entry *entry=
+ (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str,
+ name.length);
+ return entry && entry->val_int(&null_value) && !null_value;
+}
+
+
+static bool
+get_slave_gtid_ignore_duplicates(THD *thd)
+{
+ bool null_value;
+
+ const LEX_STRING name= { C_STRING_WITH_LEN("slave_gtid_ignore_duplicates") };
+ user_var_entry *entry=
+ (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str,
+ name.length);
+ return entry && entry->val_int(&null_value) && !null_value;
+}
+
+
+/*
+ Get the value of the @slave_until_gtid user variable into the supplied
+ String (this is the GTID position specified for START SLAVE UNTIL
+ master_gtid_pos='xxx').
+
+ Returns false if error (ie. slave did not set the variable and is not doing
+ START SLAVE UNTIL mater_gtid_pos='xxx'), true if success.
+*/
+static bool
+get_slave_until_gtid(THD *thd, String *out_str)
+{
+ bool null_value;
+
+ const LEX_STRING name= { C_STRING_WITH_LEN("slave_until_gtid") };
+ user_var_entry *entry=
+ (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str,
+ name.length);
+ return entry && entry->val_str(&null_value, out_str, 0) && !null_value;
+}
+
+
+/*
Function prepares and sends repliation heartbeat event.
@param net net object of THD
@@ -526,7 +730,7 @@ static int send_heartbeat_event(NET* net, String* packet,
uint ident_len = strlen(p);
ulong event_len = ident_len + LOG_EVENT_HEADER_LEN +
(do_checksum ? BINLOG_CHECKSUM_LEN : 0);
- int4store(header + SERVER_ID_OFFSET, server_id);
+ int4store(header + SERVER_ID_OFFSET, global_system_variables.server_id);
int4store(header + EVENT_LEN_OFFSET, event_len);
int2store(header + FLAGS_OFFSET, 0);
@@ -538,8 +742,8 @@ static int send_heartbeat_event(NET* net, String* packet,
if (do_checksum)
{
char b[BINLOG_CHECKSUM_LEN];
- ha_checksum crc= my_checksum(0L, NULL, 0);
- crc= my_checksum(crc, (uchar*) header, sizeof(header));
+ ha_checksum crc;
+ crc= my_checksum(0, (uchar*) header, sizeof(header));
crc= my_checksum(crc, (uchar*) p, ident_len);
int4store(b, crc);
packet->append(b, sizeof(b));
@@ -554,6 +758,810 @@ static int send_heartbeat_event(NET* net, String* packet,
}
+struct binlog_file_entry
+{
+ binlog_file_entry *next;
+ char *name;
+};
+
+static binlog_file_entry *
+get_binlog_list(MEM_ROOT *memroot)
+{
+ IO_CACHE *index_file;
+ char fname[FN_REFLEN];
+ size_t length;
+ binlog_file_entry *current_list= NULL, *e;
+ DBUG_ENTER("get_binlog_list");
+
+ if (!mysql_bin_log.is_open())
+ {
+ my_error(ER_NO_BINARY_LOGGING, MYF(0));
+ DBUG_RETURN(NULL);
+ }
+
+ mysql_bin_log.lock_index();
+ index_file=mysql_bin_log.get_index_file();
+ reinit_io_cache(index_file, READ_CACHE, (my_off_t) 0, 0, 0);
+
+ /* The file ends with EOF or empty line */
+ while ((length=my_b_gets(index_file, fname, sizeof(fname))) > 1)
+ {
+ --length; /* Remove the newline */
+ if (!(e= (binlog_file_entry *)alloc_root(memroot, sizeof(*e))) ||
+ !(e->name= strmake_root(memroot, fname, length)))
+ {
+ mysql_bin_log.unlock_index();
+ my_error(ER_OUTOFMEMORY, MYF(0), length + 1 + sizeof(*e));
+ DBUG_RETURN(NULL);
+ }
+ e->next= current_list;
+ current_list= e;
+ }
+ mysql_bin_log.unlock_index();
+
+ DBUG_RETURN(current_list);
+}
+
+/*
+ Find the Gtid_list_log_event at the start of a binlog.
+
+ NULL for ok, non-NULL error message for error.
+
+ If ok, then the event is returned in *out_gtid_list. This can be NULL if we
+ get back to binlogs written by old server version without GTID support. If
+ so, it means we have reached the point to start from, as no GTID events can
+ exist in earlier binlogs.
+*/
+static const char *
+get_gtid_list_event(IO_CACHE *cache, Gtid_list_log_event **out_gtid_list)
+{
+ Format_description_log_event init_fdle(BINLOG_VERSION);
+ Format_description_log_event *fdle;
+ Log_event *ev;
+ const char *errormsg = NULL;
+
+ *out_gtid_list= NULL;
+
+ if (!(ev= Log_event::read_log_event(cache, 0, &init_fdle,
+ opt_master_verify_checksum)) ||
+ ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
+ {
+ if (ev)
+ delete ev;
+ return "Could not read format description log event while looking for "
+ "GTID position in binlog";
+ }
+
+ fdle= static_cast<Format_description_log_event *>(ev);
+
+ for (;;)
+ {
+ Log_event_type typ;
+
+ ev= Log_event::read_log_event(cache, 0, fdle, opt_master_verify_checksum);
+ if (!ev)
+ {
+ errormsg= "Could not read GTID list event while looking for GTID "
+ "position in binlog";
+ break;
+ }
+ typ= ev->get_type_code();
+ if (typ == GTID_LIST_EVENT)
+ break; /* Done, found it */
+ delete ev;
+ if (typ == ROTATE_EVENT || typ == STOP_EVENT ||
+ typ == FORMAT_DESCRIPTION_EVENT)
+ continue; /* Continue looking */
+
+ /* We did not find any Gtid_list_log_event, must be old binlog. */
+ ev= NULL;
+ break;
+ }
+
+ delete fdle;
+ *out_gtid_list= static_cast<Gtid_list_log_event *>(ev);
+ return errormsg;
+}
+
+
+/*
+ Check if every GTID requested by the slave is contained in this (or a later)
+ binlog file. Return true if so, false if not.
+
+ We do the check with a single scan of the list of GTIDs, avoiding the need
+ to build an in-memory hash or stuff like that.
+
+ We need to check that slave did not request GTID D-S-N1, when the
+ Gtid_list_log_event for this binlog file has D-S-N2 with N2 >= N1.
+ (Because this means that requested GTID is in an earlier binlog).
+ However, if the Gtid_list_log_event indicates that D-S-N1 is the very last
+ GTID for domain D in prior binlog files, then it is ok to start from the
+ very start of this binlog file. This special case is important, as it
+ allows to purge old logs even if some domain is unused for long.
+
+ In addition, we need to check that we do not have a GTID D-S-N3 in the
+ Gtid_list_log_event where D is not present in the requested slave state at
+ all. Since if D is not in requested slave state, it means that slave needs
+ to start at the very first GTID in domain D.
+*/
+static bool
+contains_all_slave_gtid(slave_connection_state *st, Gtid_list_log_event *glev)
+{
+ uint32 i;
+
+ for (i= 0; i < glev->count; ++i)
+ {
+ uint32 gl_domain_id= glev->list[i].domain_id;
+ const rpl_gtid *gtid= st->find(gl_domain_id);
+ if (!gtid)
+ {
+ /*
+ The slave needs to start from the very beginning of this domain, which
+ is in an earlier binlog file. So we need to search back further.
+ */
+ return false;
+ }
+ if (gtid->server_id == glev->list[i].server_id &&
+ gtid->seq_no <= glev->list[i].seq_no)
+ {
+ /*
+ The slave needs to start after gtid, but it is contained in an earlier
+ binlog file. So we need to search back further, unless it was the very
+ last gtid logged for the domain in earlier binlog files.
+ */
+ if (gtid->seq_no < glev->list[i].seq_no)
+ return false;
+
+ /*
+ The slave requested D-S-N1, which happens to be the last GTID logged
+ in prior binlog files with same domain id D and server id S.
+
+ The Gtid_list is kept sorted on domain_id, with the last GTID in each
+ domain_id group being the last one logged. So if this is the last GTID
+ within the domain_id group, then it is ok to start from the very
+ beginning of this group, per the special case explained in comment at
+ the start of this function. If not, then we need to search back further.
+ */
+ if (i+1 < glev->count && gl_domain_id == glev->list[i+1].domain_id)
+ return false;
+ }
+ }
+
+ return true;
+}
+
+
+static void
+give_error_start_pos_missing_in_binlog(int *err, const char **errormsg,
+ rpl_gtid *error_gtid)
+{
+ rpl_gtid binlog_gtid;
+
+ if (mysql_bin_log.lookup_domain_in_binlog_state(error_gtid->domain_id,
+ &binlog_gtid) &&
+ binlog_gtid.seq_no >= error_gtid->seq_no)
+ {
+ *errormsg= "Requested slave GTID state not found in binlog. The slave has "
+ "probably diverged due to executing erroneous transactions";
+ *err= ER_GTID_POSITION_NOT_FOUND_IN_BINLOG2;
+ }
+ else
+ {
+ *errormsg= "Requested slave GTID state not found in binlog";
+ *err= ER_GTID_POSITION_NOT_FOUND_IN_BINLOG;
+ }
+}
+
+
+/*
+ Check the start GTID state requested by the slave against our binlog state.
+
+ Give an error if the slave requests something that we do not have in our
+ binlog.
+*/
+
+static int
+check_slave_start_position(binlog_send_info *info, const char **errormsg,
+ rpl_gtid *error_gtid)
+{
+ uint32 i;
+ int err;
+ slave_connection_state::entry **delete_list= NULL;
+ uint32 delete_idx= 0;
+ slave_connection_state *st= &info->gtid_state;
+
+ if (rpl_load_gtid_slave_state(info->thd))
+ {
+ *errormsg= "Failed to load replication slave GTID state";
+ err= ER_CANNOT_LOAD_SLAVE_GTID_STATE;
+ goto end;
+ }
+
+ for (i= 0; i < st->hash.records; ++i)
+ {
+ slave_connection_state::entry *slave_gtid_entry=
+ (slave_connection_state::entry *)my_hash_element(&st->hash, i);
+ rpl_gtid *slave_gtid= &slave_gtid_entry->gtid;
+ rpl_gtid master_gtid;
+ rpl_gtid master_replication_gtid;
+ rpl_gtid start_gtid;
+ bool start_at_own_slave_pos=
+ rpl_global_gtid_slave_state->domain_to_gtid(slave_gtid->domain_id,
+ &master_replication_gtid) &&
+ slave_gtid->server_id == master_replication_gtid.server_id &&
+ slave_gtid->seq_no == master_replication_gtid.seq_no;
+
+ if (mysql_bin_log.find_in_binlog_state(slave_gtid->domain_id,
+ slave_gtid->server_id,
+ &master_gtid) &&
+ master_gtid.seq_no >= slave_gtid->seq_no)
+ {
+ /*
+ If connecting slave requests to start at the GTID we last applied when
+ we were ourselves a slave, then this GTID may not exist in our binlog
+ (in case of --log-slave-updates=0). So set the flag to disable the
+ error about missing GTID in the binlog in this case.
+ */
+ if (start_at_own_slave_pos)
+ slave_gtid_entry->flags|= slave_connection_state::START_OWN_SLAVE_POS;
+ continue;
+ }
+
+ if (!start_at_own_slave_pos)
+ {
+ rpl_gtid domain_gtid;
+ slave_connection_state *until_gtid_state= info->until_gtid_state;
+ rpl_gtid *until_gtid;
+
+ if (!mysql_bin_log.lookup_domain_in_binlog_state(slave_gtid->domain_id,
+ &domain_gtid))
+ {
+ /*
+ We do not have anything in this domain, neither in the binlog nor
+ in the slave state. So we are probably one master in a multi-master
+ setup, and this domain is served by a different master.
+
+ But set a flag so that if we then ever _do_ happen to encounter
+ anything in this domain, then we will re-check that the requested
+ slave position exists, and give the error at that time if not.
+ */
+ slave_gtid_entry->flags|= slave_connection_state::START_ON_EMPTY_DOMAIN;
+ continue;
+ }
+
+ if (info->slave_gtid_ignore_duplicates &&
+ domain_gtid.seq_no < slave_gtid->seq_no)
+ {
+ /*
+ When --gtid-ignore-duplicates, it is ok for the slave to request
+ something that we do not have (yet) - they might already have gotten
+ it through another path in a multi-path replication hierarchy.
+ */
+ continue;
+ }
+
+ if (until_gtid_state &&
+ ( !(until_gtid= until_gtid_state->find(slave_gtid->domain_id)) ||
+ (mysql_bin_log.find_in_binlog_state(until_gtid->domain_id,
+ until_gtid->server_id,
+ &master_gtid) &&
+ master_gtid.seq_no >= until_gtid->seq_no)))
+ {
+ /*
+ The slave requested to start from a position that is not (yet) in
+ our binlog, but it also specified an UNTIL condition that _is_ in
+ our binlog (or a missing UNTIL, which means stop at the very
+ beginning). So the stop position is before the start position, and
+ we just delete the entry from the UNTIL hash to mark that this
+ domain has already reached the UNTIL condition.
+ */
+ if(until_gtid)
+ until_gtid_state->remove(until_gtid);
+ continue;
+ }
+
+ *error_gtid= *slave_gtid;
+ give_error_start_pos_missing_in_binlog(&err, errormsg, error_gtid);
+ goto end;
+ }
+
+ /*
+ Ok, so connecting slave asked to start at a GTID that we do not have in
+ our binlog, but it was in fact the last GTID we applied earlier, when we
+ were acting as a replication slave.
+
+ So this means that we were running as a replication slave without
+ --log-slave-updates, but now we switched to be a master. It is worth it
+ to handle this special case, as it allows users to run a simple
+ master -> slave without --log-slave-updates, and then exchange slave and
+ master, as long as they make sure the slave is caught up before switching.
+ */
+
+ /*
+ First check if we logged something ourselves as a master after being a
+ slave. This will be seen as a GTID with our own server_id and bigger
+ seq_no than what is in the slave state.
+
+ If we did not log anything ourselves, then start the connecting slave
+ replicating from the current binlog end position, which in this case
+ corresponds to our replication slave state and hence what the connecting
+ slave is requesting.
+ */
+ if (mysql_bin_log.find_in_binlog_state(slave_gtid->domain_id,
+ global_system_variables.server_id,
+ &start_gtid) &&
+ start_gtid.seq_no > slave_gtid->seq_no)
+ {
+ /*
+ Start replication within this domain at the first GTID that we logged
+ ourselves after becoming a master.
+
+ Remember that this starting point is in fact a "fake" GTID which may
+ not exists in the binlog, so that we do not complain about it in
+ --gtid-strict-mode.
+ */
+ slave_gtid->server_id= global_system_variables.server_id;
+ slave_gtid_entry->flags|= slave_connection_state::START_OWN_SLAVE_POS;
+ }
+ else if (mysql_bin_log.lookup_domain_in_binlog_state(slave_gtid->domain_id,
+ &start_gtid))
+ {
+ slave_gtid->server_id= start_gtid.server_id;
+ slave_gtid->seq_no= start_gtid.seq_no;
+ }
+ else
+ {
+ /*
+ We do not have _anything_ in our own binlog for this domain. Just
+ delete the entry in the slave connection state, then it will pick up
+ anything new that arrives.
+
+ We just queue up the deletion and do it later, after the loop, so that
+ we do not mess up the iteration over the hash.
+ */
+ if (!delete_list)
+ {
+ if (!(delete_list= (slave_connection_state::entry **)
+ my_malloc(sizeof(*delete_list) * st->hash.records, MYF(MY_WME))))
+ {
+ *errormsg= "Out of memory while checking slave start position";
+ err= ER_OUT_OF_RESOURCES;
+ goto end;
+ }
+ }
+ delete_list[delete_idx++]= slave_gtid_entry;
+ }
+ }
+
+ /* Do any delayed deletes from the hash. */
+ if (delete_list)
+ {
+ for (i= 0; i < delete_idx; ++i)
+ st->remove(&(delete_list[i]->gtid));
+ }
+ err= 0;
+
+end:
+ if (delete_list)
+ my_free(delete_list);
+ return err;
+}
+
+/*
+ Find the name of the binlog file to start reading for a slave that connects
+ using GTID state.
+
+ Returns the file name in out_name, which must be of size at least FN_REFLEN.
+
+ Returns NULL on ok, error message on error.
+
+ In case of non-error return, the returned binlog file is guaranteed to
+ contain the first event to be transmitted to the slave for every domain
+ present in our binlogs. It is still necessary to skip all GTIDs up to
+ and including the GTID requested by slave within each domain.
+
+ However, as a special case, if the event to be sent to the slave is the very
+ first event (within that domain) in the returned binlog, then nothing should
+ be skipped, so that domain is deleted from the passed in slave connection
+ state.
+
+ This is necessary in case the slave requests a GTID within a replication
+ domain that has long been inactive. The binlog file containing that GTID may
+ have been long since purged. However, as long as no GTIDs after that have
+ been purged, we have the GTID requested by slave in the Gtid_list_log_event
+ of the latest binlog. So we can start from there, as long as we delete the
+ corresponding entry in the slave state so we do not wrongly skip any events
+ that might turn up if that domain becomes active again, vainly looking for
+ the requested GTID that was already purged.
+*/
+static const char *
+gtid_find_binlog_file(slave_connection_state *state, char *out_name,
+ slave_connection_state *until_gtid_state)
+{
+ MEM_ROOT memroot;
+ binlog_file_entry *list;
+ Gtid_list_log_event *glev= NULL;
+ const char *errormsg= NULL;
+ char buf[FN_REFLEN];
+
+ init_alloc_root(&memroot, 10*(FN_REFLEN+sizeof(binlog_file_entry)), 0,
+ MYF(MY_THREAD_SPECIFIC));
+ if (!(list= get_binlog_list(&memroot)))
+ {
+ errormsg= "Out of memory while looking for GTID position in binlog";
+ goto end;
+ }
+
+ while (list)
+ {
+ File file;
+ IO_CACHE cache;
+
+ if (!list->next)
+ {
+ /*
+ It should be safe to read the currently used binlog, as we will only
+ read the header part that is already written.
+
+ But if that does not work on windows, then we will need to cache the
+ event somewhere in memory I suppose - that could work too.
+ */
+ }
+ /*
+ Read the Gtid_list_log_event at the start of the binlog file to
+ get the binlog state.
+ */
+ if (normalize_binlog_name(buf, list->name, false))
+ {
+ errormsg= "Failed to determine binlog file name while looking for "
+ "GTID position in binlog";
+ goto end;
+ }
+ bzero((char*) &cache, sizeof(cache));
+ if ((file= open_binlog(&cache, buf, &errormsg)) == (File)-1)
+ goto end;
+ errormsg= get_gtid_list_event(&cache, &glev);
+ end_io_cache(&cache);
+ mysql_file_close(file, MYF(MY_WME));
+ if (errormsg)
+ goto end;
+
+ if (!glev || contains_all_slave_gtid(state, glev))
+ {
+ strmake(out_name, buf, FN_REFLEN);
+
+ if (glev)
+ {
+ uint32 i;
+
+ /*
+ As a special case, we allow to start from binlog file N if the
+ requested GTID is the last event (in the corresponding domain) in
+ binlog file (N-1), but then we need to remove that GTID from the slave
+ state, rather than skipping events waiting for it to turn up.
+
+ If slave is doing START SLAVE UNTIL, check for any UNTIL conditions
+ that are already included in a previous binlog file. Delete any such
+ from the UNTIL hash, to mark that such domains have already reached
+ their UNTIL condition.
+ */
+ for (i= 0; i < glev->count; ++i)
+ {
+ const rpl_gtid *gtid= state->find(glev->list[i].domain_id);
+ if (!gtid)
+ {
+ /*
+ Contains_all_slave_gtid() returns false if there is any domain in
+ Gtid_list_event which is not in the requested slave position.
+
+ We may delete a domain from the slave state inside this loop, but
+ we only do this when it is the very last GTID logged for that
+ domain in earlier binlogs, and then we can not encounter it in any
+ further GTIDs in the Gtid_list.
+ */
+ DBUG_ASSERT(0);
+ } else if (gtid->server_id == glev->list[i].server_id &&
+ gtid->seq_no == glev->list[i].seq_no)
+ {
+ /*
+ The slave requested to start from the very beginning of this
+ domain in this binlog file. So delete the entry from the state,
+ we do not need to skip anything.
+ */
+ state->remove(gtid);
+ }
+
+ if (until_gtid_state &&
+ (gtid= until_gtid_state->find(glev->list[i].domain_id)) &&
+ gtid->server_id == glev->list[i].server_id &&
+ gtid->seq_no <= glev->list[i].seq_no)
+ {
+ /*
+ We've already reached the stop position in UNTIL for this domain,
+ since it is before the start position.
+ */
+ until_gtid_state->remove(gtid);
+ }
+ }
+ }
+
+ goto end;
+ }
+ delete glev;
+ glev= NULL;
+ list= list->next;
+ }
+
+ /* We reached the end without finding anything. */
+ errormsg= "Could not find GTID state requested by slave in any binlog "
+ "files. Probably the slave state is too old and required binlog files "
+ "have been purged.";
+
+end:
+ if (glev)
+ delete glev;
+
+ free_root(&memroot, MYF(0));
+ return errormsg;
+}
+
+
+/*
+ Given an old-style binlog position with file name and file offset, find the
+ corresponding gtid position. If the offset is not at an event boundary, give
+ an error.
+
+ Return NULL on ok, error message string on error.
+
+ ToDo: Improve the performance of this by using binlog index files.
+*/
+static const char *
+gtid_state_from_pos(const char *name, uint32 offset,
+ slave_connection_state *gtid_state)
+{
+ IO_CACHE cache;
+ File file;
+ const char *errormsg= NULL;
+ bool found_gtid_list_event= false;
+ bool found_format_description_event= false;
+ bool valid_pos= false;
+ uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
+ int err;
+ String packet;
+ Format_description_log_event *fdev= NULL;
+
+ if (gtid_state->load((const rpl_gtid *)NULL, 0))
+ {
+ errormsg= "Internal error (out of memory?) initializing slave state "
+ "while scanning binlog to find start position";
+ return errormsg;
+ }
+
+ if ((file= open_binlog(&cache, name, &errormsg)) == (File)-1)
+ return errormsg;
+
+ if (!(fdev= new Format_description_log_event(3)))
+ {
+ errormsg= "Out of memory initializing format_description event "
+ "while scanning binlog to find start position";
+ goto end;
+ }
+
+ /*
+ First we need to find the initial GTID_LIST_EVENT. We need this even
+ if the offset is at the very start of the binlog file.
+
+ But if we do not find any GTID_LIST_EVENT, then this is an old binlog
+ with no GTID information, so we return empty GTID state.
+ */
+ for (;;)
+ {
+ Log_event_type typ;
+ uint32 cur_pos;
+
+ cur_pos= (uint32)my_b_tell(&cache);
+ if (cur_pos == offset)
+ valid_pos= true;
+ if (found_format_description_event && found_gtid_list_event &&
+ cur_pos >= offset)
+ break;
+
+ packet.length(0);
+ err= Log_event::read_log_event(&cache, &packet, NULL,
+ current_checksum_alg);
+ if (err)
+ {
+ errormsg= "Could not read binlog while searching for slave start "
+ "position on master";
+ goto end;
+ }
+ /*
+ The cast to uchar is needed to avoid a signed char being converted to a
+ negative number.
+ */
+ typ= (Log_event_type)(uchar)packet[EVENT_TYPE_OFFSET];
+ if (typ == FORMAT_DESCRIPTION_EVENT)
+ {
+ Format_description_log_event *tmp;
+
+ if (found_format_description_event)
+ {
+ errormsg= "Duplicate format description log event found while "
+ "searching for old-style position in binlog";
+ goto end;
+ }
+
+ current_checksum_alg= get_checksum_alg(packet.ptr(), packet.length());
+ found_format_description_event= true;
+ if (!(tmp= new Format_description_log_event(packet.ptr(), packet.length(),
+ fdev)))
+ {
+ errormsg= "Corrupt Format_description event found or out-of-memory "
+ "while searching for old-style position in binlog";
+ goto end;
+ }
+ delete fdev;
+ fdev= tmp;
+ }
+ else if (typ != FORMAT_DESCRIPTION_EVENT && !found_format_description_event)
+ {
+ errormsg= "Did not find format description log event while searching "
+ "for old-style position in binlog";
+ goto end;
+ }
+ else if (typ == ROTATE_EVENT || typ == STOP_EVENT ||
+ typ == BINLOG_CHECKPOINT_EVENT)
+ continue; /* Continue looking */
+ else if (typ == GTID_LIST_EVENT)
+ {
+ rpl_gtid *gtid_list;
+ bool status;
+ uint32 list_len;
+
+ if (found_gtid_list_event)
+ {
+ errormsg= "Found duplicate Gtid_list_log_event while scanning binlog "
+ "to find slave start position";
+ goto end;
+ }
+ status= Gtid_list_log_event::peek(packet.ptr(), packet.length(),
+ current_checksum_alg,
+ &gtid_list, &list_len, fdev);
+ if (status)
+ {
+ errormsg= "Error reading Gtid_list_log_event while searching "
+ "for old-style position in binlog";
+ goto end;
+ }
+ err= gtid_state->load(gtid_list, list_len);
+ my_free(gtid_list);
+ if (err)
+ {
+ errormsg= "Internal error (out of memory?) initialising slave state "
+ "while scanning binlog to find start position";
+ goto end;
+ }
+ found_gtid_list_event= true;
+ }
+ else if (!found_gtid_list_event)
+ {
+ /* We did not find any Gtid_list_log_event, must be old binlog. */
+ goto end;
+ }
+ else if (typ == GTID_EVENT)
+ {
+ rpl_gtid gtid;
+ uchar flags2;
+ if (Gtid_log_event::peek(packet.ptr(), packet.length(),
+ current_checksum_alg, &gtid.domain_id,
+ &gtid.server_id, &gtid.seq_no, &flags2, fdev))
+ {
+ errormsg= "Corrupt gtid_log_event found while scanning binlog to find "
+ "initial slave position";
+ goto end;
+ }
+ if (gtid_state->update(&gtid))
+ {
+ errormsg= "Internal error (out of memory?) updating slave state while "
+ "scanning binlog to find start position";
+ goto end;
+ }
+ }
+ }
+
+ if (!valid_pos)
+ {
+ errormsg= "Slave requested incorrect position in master binlog. "
+ "Requested position %u in file '%s', but this position does not "
+ "correspond to the location of any binlog event.";
+ }
+
+end:
+ delete fdev;
+ end_io_cache(&cache);
+ mysql_file_close(file, MYF(MY_WME));
+
+ return errormsg;
+}
+
+
+int
+gtid_state_from_binlog_pos(const char *in_name, uint32 pos, String *out_str)
+{
+ slave_connection_state gtid_state;
+ const char *lookup_name;
+ char name_buf[FN_REFLEN];
+ LOG_INFO linfo;
+
+ if (!mysql_bin_log.is_open())
+ {
+ my_error(ER_NO_BINARY_LOGGING, MYF(0));
+ return 1;
+ }
+
+ if (in_name && in_name[0])
+ {
+ mysql_bin_log.make_log_name(name_buf, in_name);
+ lookup_name= name_buf;
+ }
+ else
+ lookup_name= NULL;
+ linfo.index_file_offset= 0;
+ if (mysql_bin_log.find_log_pos(&linfo, lookup_name, 1))
+ return 1;
+
+ if (pos < 4)
+ pos= 4;
+
+ if (gtid_state_from_pos(linfo.log_file_name, pos, &gtid_state) ||
+ gtid_state.to_string(out_str))
+ return 1;
+ return 0;
+}
+
+
+static bool
+is_until_reached(binlog_send_info *info, ulong *ev_offset,
+ Log_event_type event_type, const char **errmsg,
+ uint32 current_pos)
+{
+ switch (info->gtid_until_group)
+ {
+ case GTID_UNTIL_NOT_DONE:
+ return false;
+ case GTID_UNTIL_STOP_AFTER_STANDALONE:
+ if (Log_event::is_part_of_group(event_type))
+ return false;
+ break;
+ case GTID_UNTIL_STOP_AFTER_TRANSACTION:
+ if (event_type != XID_EVENT &&
+ (event_type != QUERY_EVENT ||
+ !Query_log_event::peek_is_commit_rollback
+ (info->packet->ptr()+*ev_offset,
+ info->packet->length()-*ev_offset,
+ info->current_checksum_alg)))
+ return false;
+ break;
+ }
+
+ /*
+ The last event group has been sent, now the START SLAVE UNTIL condition
+ has been reached.
+
+ Send a last fake Gtid_list_log_event with a flag set to mark that we
+ stop due to UNTIL condition.
+ */
+ if (reset_transmit_packet(info->thd, info->flags, ev_offset, errmsg))
+ return true;
+ Gtid_list_log_event glev(&info->until_binlog_state,
+ Gtid_list_log_event::FLAG_UNTIL_REACHED);
+ if (fake_gtid_list_event(info, &glev, errmsg, current_pos))
+ return true;
+ *errmsg= NULL;
+ return true;
+}
+
+
/*
Helper function for mysql_binlog_send() to write an event down the slave
connection.
@@ -561,22 +1569,313 @@ static int send_heartbeat_event(NET* net, String* packet,
Returns NULL on success, error message string on error.
*/
static const char *
-send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
- Log_event_type event_type, char *log_file_name,
- IO_CACHE *log)
+send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
+ IO_CACHE *log, ulong ev_offset, rpl_gtid *error_gtid)
{
my_off_t pos;
+ String* const packet= info->packet;
+ size_t len= packet->length();
+ int mariadb_slave_capability= info->mariadb_slave_capability;
+ uint8 current_checksum_alg= info->current_checksum_alg;
+ slave_connection_state *gtid_state= &info->gtid_state;
+ slave_connection_state *until_gtid_state= info->until_gtid_state;
+
+ if (event_type == GTID_LIST_EVENT &&
+ info->using_gtid_state && until_gtid_state)
+ {
+ rpl_gtid *gtid_list;
+ uint32 list_len;
+ bool err;
+
+ if (ev_offset > len ||
+ Gtid_list_log_event::peek(packet->ptr()+ev_offset, len - ev_offset,
+ current_checksum_alg,
+ &gtid_list, &list_len, info->fdev))
+ {
+ my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ return "Failed to read Gtid_list_log_event: corrupt binlog";
+ }
+ err= info->until_binlog_state.load(gtid_list, list_len);
+ my_free(gtid_list);
+ if (err)
+ {
+ my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ return "Failed in internal GTID book-keeping: Out of memory";
+ }
+ }
+
+ /* Skip GTID event groups until we reach slave position within a domain_id. */
+ if (event_type == GTID_EVENT && info->using_gtid_state)
+ {
+ uchar flags2;
+ slave_connection_state::entry *gtid_entry;
+ rpl_gtid *gtid;
+
+ if (gtid_state->count() > 0 || until_gtid_state)
+ {
+ rpl_gtid event_gtid;
+
+ if (ev_offset > len ||
+ Gtid_log_event::peek(packet->ptr()+ev_offset, len - ev_offset,
+ current_checksum_alg,
+ &event_gtid.domain_id, &event_gtid.server_id,
+ &event_gtid.seq_no, &flags2, info->fdev))
+ {
+ my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ return "Failed to read Gtid_log_event: corrupt binlog";
+ }
+
+ DBUG_EXECUTE_IF("gtid_force_reconnect_at_10_1_100",
+ {
+ rpl_gtid *dbug_gtid;
+ if ((dbug_gtid= info->until_binlog_state.find_nolock(10,1)) &&
+ dbug_gtid->seq_no == 100)
+ {
+ DBUG_SET("-d,gtid_force_reconnect_at_10_1_100");
+ DBUG_SET_INITIAL("-d,gtid_force_reconnect_at_10_1_100");
+ my_errno= ER_UNKNOWN_ERROR;
+ return "DBUG-injected forced reconnect";
+ }
+ });
+
+ if (info->until_binlog_state.update_nolock(&event_gtid, false))
+ {
+ my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ return "Failed in internal GTID book-keeping: Out of memory";
+ }
+
+ if (gtid_state->count() > 0)
+ {
+ gtid_entry= gtid_state->find_entry(event_gtid.domain_id);
+ if (gtid_entry != NULL)
+ {
+ gtid= &gtid_entry->gtid;
+ if (gtid_entry->flags & slave_connection_state::START_ON_EMPTY_DOMAIN)
+ {
+ rpl_gtid master_gtid;
+ if (!mysql_bin_log.find_in_binlog_state(gtid->domain_id,
+ gtid->server_id,
+ &master_gtid) ||
+ master_gtid.seq_no < gtid->seq_no)
+ {
+ int err;
+ const char *errormsg;
+ *error_gtid= *gtid;
+ give_error_start_pos_missing_in_binlog(&err, &errormsg, error_gtid);
+ my_errno= err;
+ return errormsg;
+ }
+ gtid_entry->flags&= ~(uint32)slave_connection_state::START_ON_EMPTY_DOMAIN;
+ }
+
+ /* Skip this event group if we have not yet reached slave start pos. */
+ if (event_gtid.server_id != gtid->server_id ||
+ event_gtid.seq_no <= gtid->seq_no)
+ info->gtid_skip_group= (flags2 & Gtid_log_event::FL_STANDALONE ?
+ GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
+ if (event_gtid.server_id == gtid->server_id &&
+ event_gtid.seq_no >= gtid->seq_no)
+ {
+ if (info->slave_gtid_strict_mode &&
+ event_gtid.seq_no > gtid->seq_no &&
+ !(gtid_entry->flags & slave_connection_state::START_OWN_SLAVE_POS))
+ {
+ /*
+ In strict mode, it is an error if the slave requests to start
+ in a "hole" in the master's binlog: a GTID that does not
+ exist, even though both the prior and subsequent seq_no exists
+ for same domain_id and server_id.
+ */
+ my_errno= ER_GTID_START_FROM_BINLOG_HOLE;
+ *error_gtid= *gtid;
+ return "The binlog on the master is missing the GTID requested "
+ "by the slave (even though both a prior and a subsequent "
+ "sequence number does exist), and GTID strict mode is enabled.";
+ }
+
+ /*
+ Send a fake Gtid_list event to the slave.
+ This allows the slave to update its current binlog position
+ so MASTER_POS_WAIT() and MASTER_GTID_WAIT() can work.
+ The fake event will be sent at the end of this event group.
+ */
+ info->send_fake_gtid_list= true;
+
+ /*
+ Delete this entry if we have reached slave start position (so we
+ will not skip subsequent events and won't have to look them up
+ and check).
+ */
+ gtid_state->remove(gtid);
+ }
+ }
+ }
+
+ if (until_gtid_state)
+ {
+ gtid= until_gtid_state->find(event_gtid.domain_id);
+ if (gtid == NULL)
+ {
+ /*
+ This domain already reached the START SLAVE UNTIL stop condition,
+ so skip this event group.
+ */
+ info->gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ?
+ GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
+ }
+ else if (event_gtid.server_id == gtid->server_id &&
+ event_gtid.seq_no >= gtid->seq_no)
+ {
+ /*
+ We have reached the stop condition.
+ Delete this domain_id from the hash, so we will skip all further
+ events in this domain and eventually stop when all domains are
+ done.
+ */
+ uint64 until_seq_no= gtid->seq_no;
+ until_gtid_state->remove(gtid);
+ if (until_gtid_state->count() == 0)
+ info->gtid_until_group= (flags2 & Gtid_log_event::FL_STANDALONE ?
+ GTID_UNTIL_STOP_AFTER_STANDALONE :
+ GTID_UNTIL_STOP_AFTER_TRANSACTION);
+ if (event_gtid.seq_no > until_seq_no)
+ {
+ /*
+ The GTID in START SLAVE UNTIL condition is missing in our binlog.
+ This should normally not happen (user error), but since we can be
+ sure that we are now beyond the position that the UNTIL condition
+ should be in, we can just stop now. And we also need to skip this
+ event group (as it is beyond the UNTIL condition).
+ */
+ info->gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ?
+ GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
+ }
+ }
+ }
+ }
+ }
+
+ /*
+ Skip event group if we have not yet reached the correct slave GTID position.
+
+ Note that slave that understands GTID can also tolerate holes, so there is
+ no need to supply dummy event.
+ */
+ switch (info->gtid_skip_group)
+ {
+ case GTID_SKIP_STANDALONE:
+ if (!Log_event::is_part_of_group(event_type))
+ info->gtid_skip_group= GTID_SKIP_NOT;
+ return NULL;
+ case GTID_SKIP_TRANSACTION:
+ if (event_type == XID_EVENT ||
+ (event_type == QUERY_EVENT &&
+ Query_log_event::peek_is_commit_rollback(packet->ptr() + ev_offset,
+ len - ev_offset,
+ current_checksum_alg)))
+ info->gtid_skip_group= GTID_SKIP_NOT;
+ return NULL;
+ case GTID_SKIP_NOT:
+ break;
+ }
/* Do not send annotate_rows events unless slave requested it. */
if (event_type == ANNOTATE_ROWS_EVENT &&
- !(flags & BINLOG_SEND_ANNOTATE_ROWS_EVENT))
- return NULL;
+ !(info->flags & BINLOG_SEND_ANNOTATE_ROWS_EVENT))
+ {
+ if (mariadb_slave_capability >= MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES)
+ {
+ /* This slave can tolerate events omitted from the binlog stream. */
+ return NULL;
+ }
+ else if (mariadb_slave_capability >= MARIA_SLAVE_CAPABILITY_ANNOTATE)
+ {
+ /*
+ The slave did not request ANNOTATE_ROWS_EVENT (it does not need them as
+ it will not log them in its own binary log). However, it understands the
+ event and will just ignore it, and it would break if we omitted it,
+ leaving a hole in the binlog stream. So just send the event as-is.
+ */
+ }
+ else
+ {
+ /*
+ The slave does not understand ANNOTATE_ROWS_EVENT.
+
+ Older MariaDB slaves (and MySQL slaves) will break replication if there
+ are holes in the binlog stream (they will miscompute the binlog offset
+ and request the wrong position when reconnecting).
+
+ So replace the event with a dummy event of the same size that will be
+ a no-operation on the slave.
+ */
+ if (Query_log_event::dummy_event(packet, ev_offset, current_checksum_alg))
+ {
+ my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ return "Failed to replace row annotate event with dummy: too small event.";
+ }
+ }
+ }
+
+ /*
+ Replace GTID events with old-style BEGIN events for slaves that do not
+ understand global transaction IDs. For stand-alone events, where there is
+ no terminating COMMIT query event, omit the GTID event or replace it with
+ a dummy event, as appropriate.
+ */
+ if (event_type == GTID_EVENT &&
+ mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_GTID)
+ {
+ bool need_dummy=
+ mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES;
+ bool err= Gtid_log_event::make_compatible_event(packet, &need_dummy,
+ ev_offset,
+ current_checksum_alg);
+ if (err)
+ {
+ my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ return "Failed to replace GTID event with backwards-compatible event: "
+ "currupt event.";
+ }
+ if (!need_dummy)
+ return NULL;
+ }
+
+ /*
+ Do not send binlog checkpoint or gtid list events to a slave that does not
+ understand it.
+ */
+ if ((unlikely(event_type == BINLOG_CHECKPOINT_EVENT) &&
+ mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_BINLOG_CHECKPOINT) ||
+ (unlikely(event_type == GTID_LIST_EVENT) &&
+ mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_GTID))
+ {
+ if (mariadb_slave_capability >= MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES)
+ {
+ /* This slave can tolerate events omitted from the binlog stream. */
+ return NULL;
+ }
+ else
+ {
+ /*
+ The slave does not understand BINLOG_CHECKPOINT_EVENT. Send a dummy
+ event instead, with same length so slave does not get confused about
+ binlog positions.
+ */
+ if (Query_log_event::dummy_event(packet, ev_offset, current_checksum_alg))
+ {
+ my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ return "Failed to replace binlog checkpoint or gtid list event with "
+ "dummy: too small event.";
+ }
+ }
+ }
/*
Skip events with the @@skip_replication flag set, if slave requested
skipping of such events.
*/
- if (thd->variables.option_bits & OPTION_SKIP_REPLICATION)
+ if (info->thd->variables.option_bits & OPTION_SKIP_REPLICATION)
{
/*
The first byte of the packet is a '\0' to distinguish it from an error
@@ -587,29 +1886,43 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
return NULL;
}
- thd_proc_info(thd, "Sending binlog event to slave");
+ THD_STAGE_INFO(info->thd, stage_sending_binlog_event_to_slave);
pos= my_b_tell(log);
if (RUN_HOOK(binlog_transmit, before_send_event,
- (thd, flags, packet, log_file_name, pos)))
+ (info->thd, info->flags, packet, info->log_file_name, pos)))
+ {
+ my_errno= ER_UNKNOWN_ERROR;
return "run 'before_send_event' hook failed";
+ }
- if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
+ if (my_net_write(info->net, (uchar*) packet->ptr(), len))
+ {
+ my_errno= ER_UNKNOWN_ERROR;
return "Failed on my_net_write()";
+ }
DBUG_PRINT("info", ("log event code %d", (*packet)[LOG_EVENT_OFFSET+1] ));
if (event_type == LOAD_EVENT)
{
- if (send_file(thd))
+ if (send_file(info->thd))
+ {
+ my_errno= ER_UNKNOWN_ERROR;
return "failed in send_file()";
+ }
}
- if (RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet)))
+ if (RUN_HOOK(binlog_transmit, after_send_event,
+ (info->thd, info->flags, packet)))
+ {
+ my_errno= ER_UNKNOWN_ERROR;
return "Failed to run hook 'after_send_event'";
+ }
return NULL; /* Success */
}
+
void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
ushort flags)
{
@@ -621,24 +1934,32 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
IO_CACHE log;
File file = -1;
- String* const packet = &thd->packet;
+ String* const packet= &thd->packet;
int error;
const char *errmsg = "Unknown error", *tmp_msg;
char error_text[MAX_SLAVE_ERRMSG]; // to be send to slave via my_message()
- NET* net = &thd->net;
mysql_mutex_t *log_lock;
mysql_cond_t *log_cond;
+ char str_buf[128];
+ String connect_gtid_state(str_buf, sizeof(str_buf), system_charset_info);
+ char str_buf2[128];
+ String slave_until_gtid_str(str_buf2, sizeof(str_buf2), system_charset_info);
+ slave_connection_state until_gtid_state_obj;
+ rpl_gtid error_gtid;
+ binlog_send_info info(thd, packet, flags, log_file_name);
+ bool has_transmit_started= false;
- uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
int old_max_allowed_packet= thd->variables.max_allowed_packet;
#ifndef DBUG_OFF
int left_events = max_binlog_dump_events;
+ uint dbug_reconnect_counter= 0;
#endif
DBUG_ENTER("mysql_binlog_send");
DBUG_PRINT("enter",("log_ident: '%s' pos: %ld", log_ident, (long) pos));
bzero((char*) &log,sizeof(log));
+ bzero(&error_gtid, sizeof(error_gtid));
/*
heartbeat_period from @master_heartbeat_period user variable
*/
@@ -649,21 +1970,43 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
*p_start_coord= &start_coord;
LOG_POS_COORD coord_buf= { log_file_name, BIN_LOG_HEADER_SIZE },
*p_coord= &coord_buf;
- if (heartbeat_period != LL(0))
+ if (heartbeat_period != 0)
{
heartbeat_ts= &heartbeat_buf;
set_timespec_nsec(*heartbeat_ts, 0);
}
- if (global_system_variables.log_warnings > 1)
- sql_print_information("Start binlog_dump to slave_server(%u), pos(%s, %lu)",
- thd->server_id, log_ident, (ulong)pos);
- if (RUN_HOOK(binlog_transmit, transmit_start, (thd, flags, log_ident, pos)))
+ info.mariadb_slave_capability= get_mariadb_slave_capability(thd);
+
+ connect_gtid_state.length(0);
+ info.using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state);
+ DBUG_EXECUTE_IF("simulate_non_gtid_aware_master", info.using_gtid_state= false;);
+ if (info.using_gtid_state)
{
- errmsg= "Failed to run hook 'transmit_start'";
- my_errno= ER_UNKNOWN_ERROR;
- goto err;
+ info.slave_gtid_strict_mode= get_slave_gtid_strict_mode(thd);
+ info.slave_gtid_ignore_duplicates= get_slave_gtid_ignore_duplicates(thd);
+ if(get_slave_until_gtid(thd, &slave_until_gtid_str))
+ info.until_gtid_state= &until_gtid_state_obj;
}
+ DBUG_EXECUTE_IF("binlog_force_reconnect_after_22_events",
+ {
+ DBUG_SET("-d,binlog_force_reconnect_after_22_events");
+ DBUG_SET_INITIAL("-d,binlog_force_reconnect_after_22_events");
+ dbug_reconnect_counter= 22;
+ });
+
+ /*
+ We want to corrupt the first event, in Log_event::read_log_event().
+ But we do not want the corruption to happen early, eg. when client does
+ BINLOG_GTID_POS(). So test case sets a DBUG trigger which causes us to
+ set the real DBUG injection here.
+ */
+ DBUG_EXECUTE_IF("corrupt_read_log_event2_set",
+ {
+ DBUG_SET("-d,corrupt_read_log_event2_set");
+ DBUG_SET("+d,corrupt_read_log_event2");
+ });
+
#ifndef DBUG_OFF
if (opt_sporadic_binlog_dump_fail && (binlog_dump_count++ % 2))
{
@@ -673,6 +2016,13 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
}
#endif
+ if (!(info.fdev= new Format_description_log_event(3)))
+ {
+ errmsg= "Out of memory initializing format_description event";
+ my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ goto err;
+ }
+
if (!mysql_bin_log.is_open())
{
errmsg = "Binary log is not open";
@@ -687,10 +2037,45 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
}
name=search_file_name;
- if (log_ident[0])
- mysql_bin_log.make_log_name(search_file_name, log_ident);
+ if (info.using_gtid_state)
+ {
+ if (info.gtid_state.load(connect_gtid_state.c_ptr_quick(),
+ connect_gtid_state.length()))
+ {
+ errmsg= "Out of memory or malformed slave request when obtaining start "
+ "position from GTID state";
+ my_errno= ER_UNKNOWN_ERROR;
+ goto err;
+ }
+ if (info.until_gtid_state &&
+ info.until_gtid_state->load(slave_until_gtid_str.c_ptr_quick(),
+ slave_until_gtid_str.length()))
+ {
+ errmsg= "Out of memory or malformed slave request when obtaining UNTIL "
+ "position sent from slave";
+ my_errno= ER_UNKNOWN_ERROR;
+ goto err;
+ }
+ if ((error= check_slave_start_position(&info, &errmsg, &error_gtid)))
+ {
+ my_errno= error;
+ goto err;
+ }
+ if ((errmsg= gtid_find_binlog_file(&info.gtid_state, search_file_name,
+ info.until_gtid_state)))
+ {
+ my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ goto err;
+ }
+ pos= 4;
+ }
else
- name=0; // Find first log
+ {
+ if (log_ident[0])
+ mysql_bin_log.make_log_name(search_file_name, log_ident);
+ else
+ name=0; // Find first log
+ }
linfo.index_file_offset = 0;
@@ -718,6 +2103,17 @@ impossible position";
goto err;
}
+ if (global_system_variables.log_warnings > 1)
+ sql_print_information("Start binlog_dump to slave_server(%lu), pos(%s, %lu)",
+ thd->variables.server_id, log_ident, (ulong)pos);
+ if (RUN_HOOK(binlog_transmit, transmit_start, (thd, flags, log_ident, pos)))
+ {
+ errmsg= "Failed to run hook 'transmit_start'";
+ my_errno= ER_UNKNOWN_ERROR;
+ goto err;
+ }
+ has_transmit_started= true;
+
/* reset transmit packet for the fake rotate event below */
if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
goto err;
@@ -750,7 +2146,7 @@ impossible position";
given that we want minimum modification of 4.0, we send the normal
and fake Rotates.
*/
- if (fake_rotate_event(net, packet, log_file_name, pos, &errmsg,
+ if (fake_rotate_event(&info, pos, &errmsg,
get_binlog_checksum_value_at_connect(thd)))
{
/*
@@ -800,14 +2196,16 @@ impossible position";
(*packet)[EVENT_TYPE_OFFSET+ev_offset]));
if ((*packet)[EVENT_TYPE_OFFSET+ev_offset] == FORMAT_DESCRIPTION_EVENT)
{
- current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset,
- packet->length() - ev_offset);
- DBUG_ASSERT(current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
- current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
- current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32);
+ Format_description_log_event *tmp;
+
+ info.current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset,
+ packet->length() - ev_offset);
+ DBUG_ASSERT(info.current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
+ info.current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
+ info.current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32);
if (!is_slave_checksum_aware(thd) &&
- current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
- current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
+ info.current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
+ info.current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
{
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
errmsg= "Slave can not handle replication events with the checksum "
@@ -817,6 +2215,18 @@ impossible position";
"slaves that cannot process them");
goto err;
}
+
+ if (!(tmp= new Format_description_log_event(packet->ptr()+ev_offset,
+ packet->length()-ev_offset,
+ info.fdev)))
+ {
+ my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ errmsg= "Corrupt Format_description event found or out-of-memory";
+ goto err;
+ }
+ delete info.fdev;
+ info.fdev= tmp;
+
(*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
/*
mark that this event with "log_pos=0", so the slave
@@ -832,12 +2242,12 @@ impossible position";
ST_CREATED_OFFSET+ev_offset, (ulong) 0);
/* fix the checksum due to latest changes in header */
- if (current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
- current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
+ if (info.current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
+ info.current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
fix_checksum(packet, ev_offset);
/* send it */
- if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
+ if (my_net_write(info.net, (uchar*) packet->ptr(), packet->length()))
{
errmsg = "Failed on my_net_write()";
my_errno= ER_UNKNOWN_ERROR;
@@ -867,12 +2277,22 @@ impossible position";
/* The Format_description_log_event event will be found naturally. */
}
+ /*
+ Handle the case of START SLAVE UNTIL with an UNTIL condition already
+ fulfilled at the start position.
+
+ We will send one event, the format_description, and then stop.
+ */
+ if (info.until_gtid_state && info.until_gtid_state->count() == 0)
+ info.gtid_until_group= GTID_UNTIL_STOP_AFTER_STANDALONE;
+
/* seek to the requested position, to start the requested dump */
my_b_seek(&log, pos); // Seek will done on next read
- while (!net->error && net->vio != 0 && !thd->killed)
+ while (!info.net->error && info.net->vio != 0 && !thd->killed)
{
Log_event_type event_type= UNKNOWN_EVENT;
+ killed_state killed;
/* reset the transmit packet for the event read from binary log
file */
@@ -880,15 +2300,16 @@ impossible position";
goto err;
bool is_active_binlog= false;
- while (!(error= Log_event::read_log_event(&log, packet, log_lock,
- current_checksum_alg,
+ while (!(killed= thd->killed) &&
+ !(error = Log_event::read_log_event(&log, packet, log_lock,
+ info.current_checksum_alg,
log_file_name,
&is_active_binlog)))
{
#ifndef DBUG_OFF
if (max_binlog_dump_events && !left_events--)
{
- net_flush(net);
+ net_flush(info.net);
errmsg = "Debugging binlog dump abort";
my_errno= ER_UNKNOWN_ERROR;
goto err;
@@ -906,7 +2327,7 @@ impossible position";
{
if (event_type == XID_EVENT)
{
- net_flush(net);
+ net_flush(info.net);
const char act[]=
"now "
"wait_for signal.continue";
@@ -923,14 +2344,16 @@ impossible position";
#endif
if (event_type == FORMAT_DESCRIPTION_EVENT)
{
- current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset,
+ Format_description_log_event *tmp;
+
+ info.current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset,
packet->length() - ev_offset);
- DBUG_ASSERT(current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
- current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
- current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32);
+ DBUG_ASSERT(info.current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF ||
+ info.current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
+ info.current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32);
if (!is_slave_checksum_aware(thd) &&
- current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
- current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
+ info.current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
+ info.current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
{
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
errmsg= "Slave can not handle replication events with the checksum "
@@ -941,22 +2364,94 @@ impossible position";
goto err;
}
+ if (!(tmp= new Format_description_log_event(packet->ptr()+ev_offset,
+ packet->length()-ev_offset,
+ info.fdev)))
+ {
+ my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ errmsg= "Corrupt Format_description event found or out-of-memory";
+ goto err;
+ }
+ delete info.fdev;
+ info.fdev= tmp;
+
(*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
+
+ if (info.using_gtid_state)
+ {
+ /*
+ If this event has the field `created' set, then it will cause the
+ slave to delete all active temporary tables. This must not happen
+ if the slave received any later GTIDs in a previous connect, as
+ those GTIDs might have created new temporary tables that are still
+ needed.
+
+ So here, we check if the starting GTID position was already
+ reached before this format description event. If not, we clear the
+ `created' flag to preserve temporary tables on the slave. (If the
+ slave connects at a position past this event, it means that it
+ already received and handled it in a previous connect).
+ */
+ if (!info.gtid_state.is_pos_reached())
+ {
+ int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
+ ST_CREATED_OFFSET+ev_offset, (ulong) 0);
+ if (info.current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
+ info.current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
+ fix_checksum(packet, ev_offset);
+ }
+ }
}
- if ((tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type,
- log_file_name, &log)))
+#ifndef DBUG_OFF
+ if (dbug_reconnect_counter > 0)
+ {
+ --dbug_reconnect_counter;
+ if (dbug_reconnect_counter == 0)
+ {
+ errmsg= "DBUG-injected forced reconnect";
+ my_errno= ER_UNKNOWN_ERROR;
+ goto err;
+ }
+ }
+#endif
+
+ if ((tmp_msg= send_event_to_slave(&info, event_type, &log,
+ ev_offset, &error_gtid)))
{
errmsg= tmp_msg;
- my_errno= ER_UNKNOWN_ERROR;
goto err;
}
+ if (unlikely(info.send_fake_gtid_list) &&
+ info.gtid_skip_group == GTID_SKIP_NOT)
+ {
+ Gtid_list_log_event glev(&info.until_binlog_state, 0);
+
+ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg) ||
+ fake_gtid_list_event(&info, &glev, &errmsg, my_b_tell(&log)))
+ {
+ my_errno= ER_UNKNOWN_ERROR;
+ goto err;
+ }
+ info.send_fake_gtid_list= false;
+ }
+ if (info.until_gtid_state &&
+ is_until_reached(&info, &ev_offset, event_type, &errmsg,
+ my_b_tell(&log)))
+ {
+ if (errmsg)
+ {
+ my_errno= ER_UNKNOWN_ERROR;
+ goto err;
+ }
+ goto end;
+ }
DBUG_EXECUTE_IF("dump_thread_wait_before_send_xid",
{
if (event_type == XID_EVENT)
{
- net_flush(net);
+ net_flush(info.net);
}
});
@@ -964,6 +2459,8 @@ impossible position";
if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
goto err;
}
+ if (killed)
+ goto end;
DBUG_EXECUTE_IF("wait_after_binlog_EOF",
{
@@ -991,7 +2488,7 @@ impossible position";
/*
Block until there is more data in the log
*/
- if (net_flush(net))
+ if (net_flush(info.net))
{
errmsg = "failed on net_flush()";
my_errno= ER_UNKNOWN_ERROR;
@@ -1034,7 +2531,7 @@ impossible position";
mysql_mutex_lock(log_lock);
switch (error= Log_event::read_log_event(&log, packet, (mysql_mutex_t*) 0,
- current_checksum_alg)) {
+ info.current_checksum_alg)) {
case 0:
/* we read successfully, so we'll need to send it to the slave */
mysql_mutex_unlock(log_lock);
@@ -1049,7 +2546,8 @@ impossible position";
int ret;
ulong signal_cnt;
DBUG_PRINT("wait",("waiting for data in binary log"));
- if (thd->server_id==0) // for mysqlbinlog (mysqlbinlog.server_id==0)
+ /* For mysqlbinlog (mysqlbinlog.server_id==0). */
+ if (thd->variables.server_id==0)
{
mysql_mutex_unlock(log_lock);
goto end;
@@ -1058,7 +2556,7 @@ impossible position";
#ifndef DBUG_OFF
ulong hb_info_counter= 0;
#endif
- const char* old_msg= thd->proc_info;
+ PSI_stage_info old_stage;
signal_cnt= mysql_bin_log.signal_cnt;
do
{
@@ -1067,9 +2565,11 @@ impossible position";
DBUG_ASSERT(heartbeat_ts);
set_timespec_nsec(*heartbeat_ts, heartbeat_period);
}
- thd->enter_cond(log_cond, log_lock,
- "Master has sent all binlog to slave; "
- "waiting for binlog to be updated");
+ thd->ENTER_COND(log_cond, log_lock,
+ &stage_master_has_sent_all_binlog_to_slave,
+ &old_stage);
+ if (thd->killed)
+ break;
ret= mysql_bin_log.wait_for_update_bin_log(thd, heartbeat_ts);
DBUG_ASSERT(ret == 0 || (heartbeat_period != 0));
if (ret == ETIMEDOUT || ret == ETIME)
@@ -1086,14 +2586,15 @@ impossible position";
/* reset transmit packet for the heartbeat event */
if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
{
- thd->exit_cond(old_msg);
+ thd->EXIT_COND(&old_stage);
goto err;
}
- if (send_heartbeat_event(net, packet, p_coord, current_checksum_alg))
+ if (send_heartbeat_event(info.net, packet, p_coord,
+ info.current_checksum_alg))
{
errmsg = "Failed on my_net_write()";
my_errno= ER_UNKNOWN_ERROR;
- thd->exit_cond(old_msg);
+ thd->EXIT_COND(&old_stage);
goto err;
}
}
@@ -1101,8 +2602,8 @@ impossible position";
{
DBUG_PRINT("wait",("binary log received update or a broadcast signal caught"));
}
- } while (signal_cnt == mysql_bin_log.signal_cnt && !thd->killed);
- thd->exit_cond(old_msg);
+ } while (signal_cnt == mysql_bin_log.signal_cnt);
+ thd->EXIT_COND(&old_stage);
}
break;
@@ -1112,14 +2613,39 @@ impossible position";
goto err;
}
- if (read_packet &&
- (tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type,
- log_file_name, &log)))
+ if (read_packet)
{
- errmsg= tmp_msg;
- my_errno= ER_UNKNOWN_ERROR;
- goto err;
- }
+ if ((tmp_msg= send_event_to_slave(&info, event_type, &log,
+ ev_offset, &error_gtid)))
+ {
+ errmsg= tmp_msg;
+ goto err;
+ }
+ if (unlikely(info.send_fake_gtid_list)
+ && info.gtid_skip_group == GTID_SKIP_NOT)
+ {
+ Gtid_list_log_event glev(&info.until_binlog_state, 0);
+
+ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg) ||
+ fake_gtid_list_event(&info, &glev, &errmsg, my_b_tell(&log)))
+ {
+ my_errno= ER_UNKNOWN_ERROR;
+ goto err;
+ }
+ info.send_fake_gtid_list= false;
+ }
+ if (info.until_gtid_state &&
+ is_until_reached(&info, &ev_offset, event_type, &errmsg,
+ my_b_tell(&log)))
+ {
+ if (errmsg)
+ {
+ my_errno= ER_UNKNOWN_ERROR;
+ goto err;
+ }
+ goto end;
+ }
+ }
log.error=0;
}
@@ -1129,7 +2655,7 @@ impossible position";
bool loop_breaker = 0;
/* need this to break out of the for loop from switch */
- thd_proc_info(thd, "Finished reading one binlog; switching to next binlog");
+ THD_STAGE_INFO(thd, stage_finished_reading_one_binlog_switching_to_next_binlog);
switch (mysql_bin_log.find_next_log(&linfo, 1)) {
case 0:
break;
@@ -1166,8 +2692,8 @@ impossible position";
read and send is Format_description_log_event.
*/
if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0 ||
- fake_rotate_event(net, packet, log_file_name, BIN_LOG_HEADER_SIZE,
- &errmsg, current_checksum_alg))
+ fake_rotate_event(&info, BIN_LOG_HEADER_SIZE, &errmsg,
+ info.current_checksum_alg))
{
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
goto err;
@@ -1181,17 +2707,19 @@ end:
end_io_cache(&log);
mysql_file_close(file, MYF(MY_WME));
- RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags));
+ if (has_transmit_started)
+ RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags));
my_eof(thd);
- thd_proc_info(thd, "Waiting to finalize termination");
+ THD_STAGE_INFO(thd, stage_waiting_to_finalize_termination);
mysql_mutex_lock(&LOCK_thread_count);
thd->current_linfo = 0;
mysql_mutex_unlock(&LOCK_thread_count);
thd->variables.max_allowed_packet= old_max_allowed_packet;
+ delete info.fdev;
DBUG_VOID_RETURN;
err:
- thd_proc_info(thd, "Waiting to finalize termination");
+ THD_STAGE_INFO(thd, stage_waiting_to_finalize_termination);
if (my_errno == ER_MASTER_FATAL_ERROR_READING_BINLOG && my_b_inited(&log))
{
/*
@@ -1207,10 +2735,50 @@ err:
my_basename(p_coord->file_name), p_coord->pos,
my_basename(log_file_name), my_b_tell(&log));
}
+ else if (my_errno == ER_GTID_POSITION_NOT_FOUND_IN_BINLOG)
+ {
+ my_snprintf(error_text, sizeof(error_text),
+ "Error: connecting slave requested to start from GTID "
+ "%u-%u-%llu, which is not in the master's binlog",
+ error_gtid.domain_id, error_gtid.server_id, error_gtid.seq_no);
+ /* Use this error code so slave will know not to try reconnect. */
+ my_errno = ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ }
+ else if (my_errno == ER_GTID_POSITION_NOT_FOUND_IN_BINLOG2)
+ {
+ my_snprintf(error_text, sizeof(error_text),
+ "Error: connecting slave requested to start from GTID "
+ "%u-%u-%llu, which is not in the master's binlog. Since the "
+ "master's binlog contains GTIDs with higher sequence numbers, "
+ "it probably means that the slave has diverged due to "
+ "executing extra erroneous transactions",
+ error_gtid.domain_id, error_gtid.server_id, error_gtid.seq_no);
+ /* Use this error code so slave will know not to try reconnect. */
+ my_errno = ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ }
+ else if (my_errno == ER_GTID_START_FROM_BINLOG_HOLE)
+ {
+ my_snprintf(error_text, sizeof(error_text),
+ "The binlog on the master is missing the GTID %u-%u-%llu "
+ "requested by the slave (even though both a prior and a "
+ "subsequent sequence number does exist), and GTID strict mode "
+ "is enabled",
+ error_gtid.domain_id, error_gtid.server_id, error_gtid.seq_no);
+ /* Use this error code so slave will know not to try reconnect. */
+ my_errno = ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ }
+ else if (my_errno == ER_CANNOT_LOAD_SLAVE_GTID_STATE)
+ {
+ my_snprintf(error_text, sizeof(error_text),
+ "Failed to load replication slave GTID state from table %s.%s",
+ "mysql", rpl_gtid_slave_state_table_name.str);
+ my_errno = ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ }
else
strcpy(error_text, errmsg);
end_io_cache(&log);
- RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags));
+ if (has_transmit_started)
+ RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags));
/*
Exclude iteration through thread list
this is needed for purge_logs() - it will iterate through
@@ -1224,6 +2792,7 @@ err:
if (file >= 0)
mysql_file_close(file, MYF(MY_WME));
thd->variables.max_allowed_packet= old_max_allowed_packet;
+ delete info.fdev;
my_message(my_errno, error_text, MYF(0));
DBUG_VOID_RETURN;
@@ -1242,18 +2811,61 @@ err:
@retval 0 success
@retval 1 error
+ @retval -1 fatal error
*/
+
int start_slave(THD* thd , Master_info* mi, bool net_report)
{
int slave_errno= 0;
int thread_mask;
+ char master_info_file_tmp[FN_REFLEN];
+ char relay_log_info_file_tmp[FN_REFLEN];
DBUG_ENTER("start_slave");
if (check_access(thd, SUPER_ACL, any_db, NULL, NULL, 0, 0))
- DBUG_RETURN(1);
- lock_slave_threads(mi); // this allows us to cleanly read slave_running
+ DBUG_RETURN(-1);
+
+ create_logfile_name_with_suffix(master_info_file_tmp,
+ sizeof(master_info_file_tmp),
+ master_info_file, 0,
+ &mi->cmp_connection_name);
+ create_logfile_name_with_suffix(relay_log_info_file_tmp,
+ sizeof(relay_log_info_file_tmp),
+ relay_log_info_file, 0,
+ &mi->cmp_connection_name);
+
+ mi->lock_slave_threads();
+ if (mi->killed)
+ {
+ /* connection was deleted while we waited for lock_slave_threads */
+ mi->unlock_slave_threads();
+ my_error(WARN_NO_MASTER_INFO, mi->connection_name.length,
+ mi->connection_name.str);
+ DBUG_RETURN(-1);
+ }
+
// Get a mask of _stopped_ threads
init_thread_mask(&thread_mask,mi,1 /* inverse */);
+
+ if (thd->lex->mi.gtid_pos_str.str)
+ {
+ if (thread_mask != (SLAVE_IO|SLAVE_SQL))
+ {
+ slave_errno= ER_SLAVE_WAS_RUNNING;
+ goto err;
+ }
+ if (thd->lex->slave_thd_opt)
+ {
+ slave_errno= ER_BAD_SLAVE_UNTIL_COND;
+ goto err;
+ }
+ if (mi->using_gtid == Master_info::USE_GTID_NO)
+ {
+ slave_errno= ER_UNTIL_REQUIRES_USING_GTID;
+ goto err;
+ }
+ }
+
/*
Below we will start all stopped threads. But if the user wants to
start only one thread, do as if the other thread was running (as we
@@ -1264,10 +2876,22 @@ int start_slave(THD* thd , Master_info* mi, bool net_report)
thread_mask&= thd->lex->slave_thd_opt;
if (thread_mask) //some threads are stopped, start them
{
- if (init_master_info(mi,master_info_file,relay_log_info_file, 0,
+ if (init_master_info(mi,master_info_file_tmp,relay_log_info_file_tmp, 0,
thread_mask))
slave_errno=ER_MASTER_INFO;
- else if (server_id_supplied && *mi->host)
+ else if (!server_id_supplied)
+ {
+ slave_errno= ER_BAD_SLAVE; net_report= 0;
+ my_message(slave_errno, "Misconfigured slave: server_id was not set; Fix in config file",
+ MYF(0));
+ }
+ else if (!*mi->host)
+ {
+ slave_errno= ER_BAD_SLAVE; net_report= 0;
+ my_message(slave_errno, "Misconfigured slave: MASTER_HOST was not set; Fix in config file or with CHANGE MASTER TO",
+ MYF(0));
+ }
+ else
{
/*
If we will start SQL thread we will care about UNTIL options If
@@ -1298,10 +2922,22 @@ int start_slave(THD* thd , Master_info* mi, bool net_report)
mi->rli.until_log_pos= thd->lex->mi.relay_log_pos;
strmake_buf(mi->rli.until_log_name, thd->lex->mi.relay_log_name);
}
+ else if (thd->lex->mi.gtid_pos_str.str)
+ {
+ if (mi->rli.until_gtid_pos.load(thd->lex->mi.gtid_pos_str.str,
+ thd->lex->mi.gtid_pos_str.length))
+ {
+ slave_errno= ER_INCORRECT_GTID_STATE;
+ mysql_mutex_unlock(&mi->rli.data_lock);
+ goto err;
+ }
+ mi->rli.until_condition= Relay_log_info::UNTIL_GTID;
+ }
else
mi->rli.clear_until_condition();
- if (mi->rli.until_condition != Relay_log_info::UNTIL_NONE)
+ if (mi->rli.until_condition == Relay_log_info::UNTIL_MASTER_POS ||
+ mi->rli.until_condition == Relay_log_info::UNTIL_RELAY_POS)
{
/* Preparing members for effective until condition checking */
const char *p= fn_ext(mi->rli.until_log_name);
@@ -1324,10 +2960,13 @@ int start_slave(THD* thd , Master_info* mi, bool net_report)
/* mark the cached result of the UNTIL comparison as "undefined" */
mi->rli.until_log_names_cmp_result=
Relay_log_info::UNTIL_LOG_NAMES_CMP_UNKNOWN;
+ }
+ if (mi->rli.until_condition != Relay_log_info::UNTIL_NONE)
+ {
/* Issuing warning then started without --skip-slave-start */
if (!opt_skip_slave_start)
- push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
+ push_warning(thd, Sql_condition::WARN_LEVEL_NOTE,
ER_MISSING_SKIP_SLAVE,
ER(ER_MISSING_SKIP_SLAVE));
}
@@ -1335,36 +2974,36 @@ int start_slave(THD* thd , Master_info* mi, bool net_report)
mysql_mutex_unlock(&mi->rli.data_lock);
}
else if (thd->lex->mi.pos || thd->lex->mi.relay_log_pos)
- push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_UNTIL_COND_IGNORED,
+ push_warning(thd, Sql_condition::WARN_LEVEL_NOTE, ER_UNTIL_COND_IGNORED,
ER(ER_UNTIL_COND_IGNORED));
if (!slave_errno)
- slave_errno = start_slave_threads(0 /*no mutex */,
- 1 /* wait for start */,
- mi,
- master_info_file,relay_log_info_file,
- thread_mask);
+ slave_errno = start_slave_threads(1,
+ 1 /* wait for start */,
+ mi,
+ master_info_file_tmp,
+ relay_log_info_file_tmp,
+ thread_mask);
}
- else
- slave_errno = ER_BAD_SLAVE;
}
else
{
/* no error if all threads are already started, only a warning */
- push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_RUNNING,
+ push_warning(thd, Sql_condition::WARN_LEVEL_NOTE, ER_SLAVE_WAS_RUNNING,
ER(ER_SLAVE_WAS_RUNNING));
}
- unlock_slave_threads(mi);
+err:
+ mi->unlock_slave_threads();
if (slave_errno)
{
if (net_report)
- my_message(slave_errno, ER(slave_errno), MYF(0));
- DBUG_RETURN(1);
+ my_error(slave_errno, MYF(0),
+ (int) mi->connection_name.length,
+ mi->connection_name.str);
+ DBUG_RETURN(slave_errno == ER_BAD_SLAVE ? -1 : 1);
}
- else if (net_report)
- my_ok(thd);
DBUG_RETURN(0);
}
@@ -1382,21 +3021,25 @@ int start_slave(THD* thd , Master_info* mi, bool net_report)
@retval 0 success
@retval 1 error
+ @retval -1 error
*/
+
int stop_slave(THD* thd, Master_info* mi, bool net_report )
{
- DBUG_ENTER("stop_slave");
-
int slave_errno;
- if (!thd)
- thd = current_thd;
+ DBUG_ENTER("stop_slave");
+ DBUG_PRINT("enter",("Connection: %s", mi->connection_name.str));
if (check_access(thd, SUPER_ACL, any_db, NULL, NULL, 0, 0))
- DBUG_RETURN(1);
- thd_proc_info(thd, "Killing slave");
+ DBUG_RETURN(-1);
+ THD_STAGE_INFO(thd, stage_killing_slave);
int thread_mask;
- lock_slave_threads(mi);
- // Get a mask of _running_ threads
+ mi->lock_slave_threads();
+ /*
+ Get a mask of _running_ threads.
+ We don't have to test for mi->killed as the thread_mask will take care
+ of checking if threads exists
+ */
init_thread_mask(&thread_mask,mi,0 /* not inverse*/);
/*
Below we will stop all running threads.
@@ -1409,18 +3052,17 @@ int stop_slave(THD* thd, Master_info* mi, bool net_report )
if (thread_mask)
{
- slave_errno= terminate_slave_threads(mi,thread_mask,
- 1 /*skip lock */);
+ slave_errno= terminate_slave_threads(mi,thread_mask, 0 /* get lock */);
}
else
{
//no error if both threads are already stopped, only a warning
slave_errno= 0;
- push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_NOT_RUNNING,
+ push_warning(thd, Sql_condition::WARN_LEVEL_NOTE, ER_SLAVE_WAS_NOT_RUNNING,
ER(ER_SLAVE_WAS_NOT_RUNNING));
}
- unlock_slave_threads(mi);
- thd_proc_info(thd, 0);
+
+ mi->unlock_slave_threads();
if (slave_errno)
{
@@ -1428,8 +3070,6 @@ int stop_slave(THD* thd, Master_info* mi, bool net_report )
my_message(slave_errno, ER(slave_errno), MYF(0));
DBUG_RETURN(1);
}
- else if (net_report)
- my_ok(thd);
DBUG_RETURN(0);
}
@@ -1452,16 +3092,28 @@ int reset_slave(THD *thd, Master_info* mi)
char fname[FN_REFLEN];
int thread_mask= 0, error= 0;
uint sql_errno=ER_UNKNOWN_ERROR;
- const char* errmsg= "Unknown error occured while reseting slave";
+ const char* errmsg= "Unknown error occurred while reseting slave";
+ char master_info_file_tmp[FN_REFLEN];
+ char relay_log_info_file_tmp[FN_REFLEN];
DBUG_ENTER("reset_slave");
- lock_slave_threads(mi);
+ mi->lock_slave_threads();
+ if (mi->killed)
+ {
+ /* connection was deleted while we waited for lock_slave_threads */
+ mi->unlock_slave_threads();
+ my_error(WARN_NO_MASTER_INFO, mi->connection_name.length,
+ mi->connection_name.str);
+ DBUG_RETURN(-1);
+ }
+
init_thread_mask(&thread_mask,mi,0 /* not inverse */);
if (thread_mask) // We refuse if any slave thread is running
{
- sql_errno= ER_SLAVE_MUST_STOP;
- error=1;
- goto err;
+ mi->unlock_slave_threads();
+ my_error(ER_SLAVE_MUST_STOP, MYF(0), (int) mi->connection_name.length,
+ mi->connection_name.str);
+ DBUG_RETURN(ER_SLAVE_MUST_STOP);
}
ha_reset_slave(thd);
@@ -1485,30 +3137,46 @@ int reset_slave(THD *thd, Master_info* mi)
mi->clear_error();
mi->rli.clear_error();
mi->rli.clear_until_condition();
+ mi->rli.slave_skip_counter= 0;
// close master_info_file, relay_log_info_file, set mi->inited=rli->inited=0
end_master_info(mi);
+
end_relay_log_info(&mi->rli);
// and delete these two files
- fn_format(fname, master_info_file, mysql_data_home, "", 4+32);
+ create_logfile_name_with_suffix(master_info_file_tmp,
+ sizeof(master_info_file_tmp),
+ master_info_file, 0,
+ &mi->cmp_connection_name);
+ create_logfile_name_with_suffix(relay_log_info_file_tmp,
+ sizeof(relay_log_info_file_tmp),
+ relay_log_info_file, 0,
+ &mi->cmp_connection_name);
+
+ fn_format(fname, master_info_file_tmp, mysql_data_home, "", 4+32);
if (mysql_file_stat(key_file_master_info, fname, &stat_area, MYF(0)) &&
mysql_file_delete(key_file_master_info, fname, MYF(MY_WME)))
{
error=1;
goto err;
}
+ else if (global_system_variables.log_warnings > 1)
+ sql_print_information("Deleted Master_info file '%s'.", fname);
+
// delete relay_log_info_file
- fn_format(fname, relay_log_info_file, mysql_data_home, "", 4+32);
+ fn_format(fname, relay_log_info_file_tmp, mysql_data_home, "", 4+32);
if (mysql_file_stat(key_file_relay_log_info, fname, &stat_area, MYF(0)) &&
mysql_file_delete(key_file_relay_log_info, fname, MYF(MY_WME)))
{
error=1;
goto err;
}
+ else if (global_system_variables.log_warnings > 1)
+ sql_print_information("Deleted Master_info file '%s'.", fname);
RUN_HOOK(binlog_relay_io, after_reset_slave, (thd, mi));
err:
- unlock_slave_threads(mi);
+ mi->unlock_slave_threads();
if (error)
my_error(sql_errno, MYF(0), errmsg);
DBUG_RETURN(error);
@@ -1542,8 +3210,8 @@ void kill_zombie_dump_threads(uint32 slave_server_id)
while ((tmp=it++))
{
- if (tmp->command == COM_BINLOG_DUMP &&
- tmp->server_id == slave_server_id)
+ if (tmp->get_command() == COM_BINLOG_DUMP &&
+ tmp->variables.server_id == slave_server_id)
{
mysql_mutex_lock(&tmp->LOCK_thd_data); // Lock from delete
break;
@@ -1598,10 +3266,14 @@ static bool get_string_parameter(char *to, const char *from, size_t length,
@param mi Pointer to Master_info object belonging to the slave's IO
thread.
+ @param master_info_added Out parameter saying if the Master_info *mi was
+ added to the global list of masters. This is useful in error conditions
+ to know if caller should free Master_info *mi.
+
@retval FALSE success
@retval TRUE error
*/
-bool change_master(THD* thd, Master_info* mi)
+bool change_master(THD* thd, Master_info* mi, bool *master_info_added)
{
int thread_mask;
const char* errmsg= 0;
@@ -1610,20 +3282,17 @@ bool change_master(THD* thd, Master_info* mi)
char saved_host[HOSTNAME_LENGTH + 1];
uint saved_port;
char saved_log_name[FN_REFLEN];
+ Master_info::enum_using_gtid saved_using_gtid;
+ char master_info_file_tmp[FN_REFLEN];
+ char relay_log_info_file_tmp[FN_REFLEN];
my_off_t saved_log_pos;
+ LEX_MASTER_INFO* lex_mi= &thd->lex->mi;
DBUG_ENTER("change_master");
- lock_slave_threads(mi);
- init_thread_mask(&thread_mask,mi,0 /*not inverse*/);
- LEX_MASTER_INFO* lex_mi= &thd->lex->mi;
- if (thread_mask) // We refuse if any slave thread is running
- {
- my_message(ER_SLAVE_MUST_STOP, ER(ER_SLAVE_MUST_STOP), MYF(0));
- ret= TRUE;
- goto err;
- }
+ DBUG_ASSERT(master_info_index);
+ mysql_mutex_assert_owner(&LOCK_active_mi);
- thd_proc_info(thd, "Changing master");
+ *master_info_added= false;
/*
We need to check if there is an empty master_host. Otherwise
change master succeeds, a master.info file is created containing
@@ -1631,17 +3300,74 @@ bool change_master(THD* thd, Master_info* mi)
is thrown stating that the server is not configured as slave.
(See BUG#28796).
*/
- if(lex_mi->host && !*lex_mi->host)
+ if (lex_mi->host && !*lex_mi->host)
{
my_error(ER_WRONG_ARGUMENTS, MYF(0), "MASTER_HOST");
- unlock_slave_threads(mi);
DBUG_RETURN(TRUE);
}
- // TODO: see if needs re-write
- if (init_master_info(mi, master_info_file, relay_log_info_file, 0,
+ if (master_info_index->check_duplicate_master_info(&lex_mi->connection_name,
+ lex_mi->host,
+ lex_mi->port))
+ DBUG_RETURN(TRUE);
+
+ mi->lock_slave_threads();
+ if (mi->killed)
+ {
+ /* connection was deleted while we waited for lock_slave_threads */
+ mi->unlock_slave_threads();
+ my_error(WARN_NO_MASTER_INFO, mi->connection_name.length,
+ mi->connection_name.str);
+ DBUG_RETURN(TRUE);
+ }
+
+ init_thread_mask(&thread_mask,mi,0 /*not inverse*/);
+ if (thread_mask) // We refuse if any slave thread is running
+ {
+ my_error(ER_SLAVE_MUST_STOP, MYF(0), (int) mi->connection_name.length,
+ mi->connection_name.str);
+ ret= TRUE;
+ goto err;
+ }
+
+ THD_STAGE_INFO(thd, stage_changing_master);
+
+ create_logfile_name_with_suffix(master_info_file_tmp,
+ sizeof(master_info_file_tmp),
+ master_info_file, 0,
+ &mi->cmp_connection_name);
+ create_logfile_name_with_suffix(relay_log_info_file_tmp,
+ sizeof(relay_log_info_file_tmp),
+ relay_log_info_file, 0,
+ &mi->cmp_connection_name);
+
+ /* if new Master_info doesn't exists, add it */
+ if (!master_info_index->get_master_info(&mi->connection_name,
+ Sql_condition::WARN_LEVEL_NOTE))
+ {
+ if (master_info_index->add_master_info(mi, TRUE))
+ {
+ my_error(ER_MASTER_INFO, MYF(0),
+ (int) lex_mi->connection_name.length,
+ lex_mi->connection_name.str);
+ ret= TRUE;
+ goto err;
+ }
+ *master_info_added= true;
+ }
+ if (global_system_variables.log_warnings > 1)
+ sql_print_information("Master connection name: '%.*s' "
+ "Master_info_file: '%s' "
+ "Relay_info_file: '%s'",
+ (int) mi->connection_name.length,
+ mi->connection_name.str,
+ master_info_file_tmp, relay_log_info_file_tmp);
+
+ if (init_master_info(mi, master_info_file_tmp, relay_log_info_file_tmp, 0,
thread_mask))
{
- my_message(ER_MASTER_INFO, ER(ER_MASTER_INFO), MYF(0));
+ my_error(ER_MASTER_INFO, MYF(0),
+ (int) lex_mi->connection_name.length,
+ lex_mi->connection_name.str);
ret= TRUE;
goto err;
}
@@ -1659,6 +3385,7 @@ bool change_master(THD* thd, Master_info* mi)
saved_port= mi->port;
strmake_buf(saved_log_name, mi->master_log_name);
saved_log_pos= mi->master_log_pos;
+ saved_using_gtid= mi->using_gtid;
/*
If the user specified host or port without binlog or position,
@@ -1698,9 +3425,9 @@ bool change_master(THD* thd, Master_info* mi)
if (lex_mi->heartbeat_opt != LEX_MASTER_INFO::LEX_MI_UNCHANGED)
mi->heartbeat_period = lex_mi->heartbeat_period;
else
- mi->heartbeat_period= (float) min(SLAVE_MAX_HEARTBEAT_PERIOD,
+ mi->heartbeat_period= (float) MY_MIN(SLAVE_MAX_HEARTBEAT_PERIOD,
(slave_net_timeout/2.0));
- mi->received_heartbeats= LL(0); // counter lives until master is CHANGEd
+ mi->received_heartbeats= 0; // counter lives until master is CHANGEd
/*
reset the last time server_id list if the current CHANGE MASTER
is mentioning IGNORE_SERVER_IDS= (...)
@@ -1711,7 +3438,7 @@ bool change_master(THD* thd, Master_info* mi)
{
ulong s_id;
get_dynamic(&lex_mi->repl_ignore_server_ids, (uchar*) &s_id, i);
- if (s_id == ::server_id && replicate_same_server_id)
+ if (s_id == global_system_variables.server_id && replicate_same_server_id)
{
my_error(ER_SLAVE_IGNORE_SERVER_IDS, MYF(0), static_cast<int>(s_id));
ret= TRUE;
@@ -1746,11 +3473,16 @@ bool change_master(THD* thd, Master_info* mi)
strmake_buf(mi->ssl_cipher, lex_mi->ssl_cipher);
if (lex_mi->ssl_key)
strmake_buf(mi->ssl_key, lex_mi->ssl_key);
+ if (lex_mi->ssl_crl)
+ strmake_buf(mi->ssl_crl, lex_mi->ssl_crl);
+ if (lex_mi->ssl_crlpath)
+ strmake_buf(mi->ssl_crlpath, lex_mi->ssl_crlpath);
+
#ifndef HAVE_OPENSSL
if (lex_mi->ssl || lex_mi->ssl_ca || lex_mi->ssl_capath ||
lex_mi->ssl_cert || lex_mi->ssl_cipher || lex_mi->ssl_key ||
- lex_mi->ssl_verify_server_cert )
- push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
+ lex_mi->ssl_verify_server_cert || lex_mi->ssl_crl || lex_mi->ssl_crlpath)
+ push_warning(thd, Sql_condition::WARN_LEVEL_NOTE,
ER_SLAVE_IGNORED_SSL_PARAMS, ER(ER_SLAVE_IGNORED_SSL_PARAMS));
#endif
@@ -1769,6 +3501,15 @@ bool change_master(THD* thd, Master_info* mi)
mi->rli.group_relay_log_pos= mi->rli.event_relay_log_pos= lex_mi->relay_log_pos;
}
+ if (lex_mi->use_gtid_opt == LEX_MASTER_INFO::LEX_GTID_SLAVE_POS)
+ mi->using_gtid= Master_info::USE_GTID_SLAVE_POS;
+ else if (lex_mi->use_gtid_opt == LEX_MASTER_INFO::LEX_GTID_CURRENT_POS)
+ mi->using_gtid= Master_info::USE_GTID_CURRENT_POS;
+ else if (lex_mi->use_gtid_opt == LEX_MASTER_INFO::LEX_GTID_NO ||
+ lex_mi->log_file_name || lex_mi->pos ||
+ lex_mi->relay_log_name || lex_mi->relay_log_pos)
+ mi->using_gtid= Master_info::USE_GTID_NO;
+
/*
If user did specify neither host nor port nor any log name nor any log
pos, i.e. he specified only user/password/master_connect_retry, he probably
@@ -1789,15 +3530,16 @@ bool change_master(THD* thd, Master_info* mi)
{
/*
Sometimes mi->rli.master_log_pos == 0 (it happens when the SQL thread is
- not initialized), so we use a max().
+ not initialized), so we use a MY_MAX().
What happens to mi->rli.master_log_pos during the initialization stages
of replication is not 100% clear, so we guard against problems using
- max().
+ MY_MAX().
*/
- mi->master_log_pos = max(BIN_LOG_HEADER_SIZE,
+ mi->master_log_pos = MY_MAX(BIN_LOG_HEADER_SIZE,
mi->rli.group_master_log_pos);
strmake_buf(mi->master_log_name, mi->rli.group_master_log_name);
}
+
/*
Relay log's IO_CACHE may not be inited, if rli->inited==0 (server was never
a slave before).
@@ -1810,8 +3552,7 @@ bool change_master(THD* thd, Master_info* mi)
}
if (need_relay_log_purge)
{
- relay_log_purge= 1;
- thd_proc_info(thd, "Purging old relay logs");
+ THD_STAGE_INFO(thd, stage_purging_old_relay_logs);
if (purge_relay_logs(&mi->rli, thd,
0 /* not only reset, but also reinit */,
&errmsg))
@@ -1824,7 +3565,6 @@ bool change_master(THD* thd, Master_info* mi)
else
{
const char* msg;
- relay_log_purge= 0;
/* Relay log is already initialized */
if (init_relay_log_pos(&mi->rli,
mi->rli.group_relay_log_name,
@@ -1859,6 +3599,7 @@ bool change_master(THD* thd, Master_info* mi)
/* Clear the errors, for a clean start */
mi->rli.clear_error();
mi->rli.clear_until_condition();
+ mi->rli.slave_skip_counter= 0;
sql_print_information("'CHANGE MASTER TO executed'. "
"Previous state master_host='%s', master_port='%u', master_log_file='%s', "
@@ -1867,6 +3608,11 @@ bool change_master(THD* thd, Master_info* mi)
"master_log_pos='%ld'.", saved_host, saved_port, saved_log_name,
(ulong) saved_log_pos, mi->host, mi->port, mi->master_log_name,
(ulong) mi->master_log_pos);
+ if (saved_using_gtid != Master_info::USE_GTID_NO ||
+ mi->using_gtid != Master_info::USE_GTID_NO)
+ sql_print_information("Previous Using_Gtid=%s. New Using_Gtid=%s",
+ mi->using_gtid_astext(saved_using_gtid),
+ mi->using_gtid_astext(mi->using_gtid));
/*
If we don't write new coordinates to disk now, then old will remain in
@@ -1875,13 +3621,13 @@ bool change_master(THD* thd, Master_info* mi)
in-memory value at restart (thus causing errors, as the old relay log does
not exist anymore).
*/
- flush_relay_log_info(&mi->rli);
+ if (flush_relay_log_info(&mi->rli))
+ ret= 1;
mysql_cond_broadcast(&mi->data_cond);
mysql_mutex_unlock(&mi->rli.data_lock);
err:
- unlock_slave_threads(mi);
- thd_proc_info(thd, 0);
+ mi->unlock_slave_threads();
if (ret == FALSE)
my_ok(thd);
DBUG_RETURN(ret);
@@ -1897,7 +3643,7 @@ err:
@retval 0 success
@retval 1 error
*/
-int reset_master(THD* thd)
+int reset_master(THD* thd, rpl_gtid *init_state, uint32 init_state_len)
{
if (!mysql_bin_log.is_open())
{
@@ -1906,7 +3652,7 @@ int reset_master(THD* thd)
return 1;
}
- if (mysql_bin_log.reset_logs(thd))
+ if (mysql_bin_log.reset_logs(thd, 1, init_state, init_state_len))
return 1;
RUN_HOOK(binlog_transmit, after_reset_master, (thd, 0 /* flags */));
return 0;
@@ -1932,8 +3678,8 @@ bool mysql_show_binlog_events(THD* thd)
File file = -1;
MYSQL_BIN_LOG *binary_log= NULL;
int old_max_allowed_packet= thd->variables.max_allowed_packet;
+ Master_info *mi= 0;
LOG_INFO linfo;
-
DBUG_ENTER("mysql_show_binlog_events");
Log_event::init_show_field_list(&field_list);
@@ -1941,13 +3687,10 @@ bool mysql_show_binlog_events(THD* thd)
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
DBUG_RETURN(TRUE);
- Format_description_log_event *description_event= new
- Format_description_log_event(3); /* MySQL 4.0 by default */
-
DBUG_ASSERT(thd->lex->sql_command == SQLCOM_SHOW_BINLOG_EVENTS ||
thd->lex->sql_command == SQLCOM_SHOW_RELAYLOG_EVENTS);
- /* select wich binary log to use: binlog or relay */
+ /* select which binary log to use: binlog or relay */
if ( thd->lex->sql_command == SQLCOM_SHOW_BINLOG_EVENTS )
{
/*
@@ -1961,23 +3704,35 @@ bool mysql_show_binlog_events(THD* thd)
}
else /* showing relay log contents */
{
- if (!active_mi)
+ if (!(mi= get_master_info(&thd->variables.default_master_connection,
+ Sql_condition::WARN_LEVEL_ERROR)))
+ {
DBUG_RETURN(TRUE);
-
- binary_log= &(active_mi->rli.relay_log);
+ }
+ binary_log= &(mi->rli.relay_log);
}
+ Format_description_log_event *description_event= new
+ Format_description_log_event(3); /* MySQL 4.0 by default */
+
if (binary_log->is_open())
{
LEX_MASTER_INFO *lex_mi= &thd->lex->mi;
SELECT_LEX_UNIT *unit= &thd->lex->unit;
ha_rows event_count, limit_start, limit_end;
- my_off_t pos = max(BIN_LOG_HEADER_SIZE, lex_mi->pos); // user-friendly
+ my_off_t pos = MY_MAX(BIN_LOG_HEADER_SIZE, lex_mi->pos); // user-friendly
char search_file_name[FN_REFLEN], *name;
const char *log_file_name = lex_mi->log_file_name;
mysql_mutex_t *log_lock = binary_log->get_log_lock();
Log_event* ev;
+ if (mi)
+ {
+ /* We can unlock the mutex as we have a lock on the file */
+ mi->release();
+ mi= 0;
+ }
+
unit->set_limit(thd->lex->current_select);
limit_start= unit->offset_limit_cnt;
limit_end= unit->select_limit_cnt;
@@ -1996,6 +3751,7 @@ bool mysql_show_binlog_events(THD* thd)
goto err;
}
+ /* These locks is here to enable syncronization with log_in_use() */
mysql_mutex_lock(&LOCK_thread_count);
thd->current_linfo = &linfo;
mysql_mutex_unlock(&LOCK_thread_count);
@@ -2072,6 +3828,9 @@ bool mysql_show_binlog_events(THD* thd)
mysql_mutex_unlock(log_lock);
}
+ else if (mi)
+ mi->release();
+
// Check that linfo is still on the function scope.
DEBUG_SYNC(thd, "after_show_binlog_events");
@@ -2091,8 +3850,9 @@ err:
else
my_eof(thd);
+ /* These locks is here to enable syncronization with log_in_use() */
mysql_mutex_lock(&LOCK_thread_count);
- thd->current_linfo = 0;
+ thd->current_linfo= 0;
mysql_mutex_unlock(&LOCK_thread_count);
thd->variables.max_allowed_packet= old_max_allowed_packet;
DBUG_RETURN(ret);
@@ -2252,14 +4012,14 @@ int log_loaded_block(IO_CACHE* file)
DBUG_RETURN(0);
for (block_len= (uint) (my_b_get_bytes_in_buffer(file)); block_len > 0;
- buffer += min(block_len, max_event_size),
- block_len -= min(block_len, max_event_size))
+ buffer += MY_MIN(block_len, max_event_size),
+ block_len -= MY_MIN(block_len, max_event_size))
{
lf_info->last_pos_in_file= my_b_get_pos_in_file(file);
if (lf_info->wrote_create_file)
{
Append_block_log_event a(lf_info->thd, lf_info->thd->db, buffer,
- min(block_len, max_event_size),
+ MY_MIN(block_len, max_event_size),
lf_info->log_delayed);
if (mysql_bin_log.write(&a))
DBUG_RETURN(1);
@@ -2268,7 +4028,7 @@ int log_loaded_block(IO_CACHE* file)
{
Begin_load_query_log_event b(lf_info->thd, lf_info->thd->db,
buffer,
- min(block_len, max_event_size),
+ MY_MIN(block_len, max_event_size),
lf_info->log_delayed);
if (mysql_bin_log.write(&b))
DBUG_RETURN(1);
@@ -2278,4 +4038,204 @@ int log_loaded_block(IO_CACHE* file)
DBUG_RETURN(0);
}
+
+/**
+ Initialise the slave replication state from the mysql.gtid_slave_pos table.
+
+ This is called each time an SQL thread starts, but the data is only actually
+ loaded on the first call.
+
+ The slave state is the last GTID applied on the slave within each
+ replication domain.
+
+ To avoid row lock contention, there are multiple rows for each domain_id.
+ The one containing the current slave state is the one with the maximal
+ sub_id value, within each domain_id.
+
+ CREATE TABLE mysql.gtid_slave_pos (
+ domain_id INT UNSIGNED NOT NULL,
+ sub_id BIGINT UNSIGNED NOT NULL,
+ server_id INT UNSIGNED NOT NULL,
+ seq_no BIGINT UNSIGNED NOT NULL,
+ PRIMARY KEY (domain_id, sub_id))
+*/
+
+void
+rpl_init_gtid_slave_state()
+{
+ rpl_global_gtid_slave_state= new rpl_slave_state;
+}
+
+
+void
+rpl_deinit_gtid_slave_state()
+{
+ delete rpl_global_gtid_slave_state;
+}
+
+
+void
+rpl_init_gtid_waiting()
+{
+ rpl_global_gtid_waiting.init();
+}
+
+
+void
+rpl_deinit_gtid_waiting()
+{
+ rpl_global_gtid_waiting.destroy();
+}
+
+
+/*
+ Format the current GTID state as a string, for returning the value of
+ @@global.gtid_slave_pos.
+
+ If the flag use_binlog is true, then the contents of the binary log (if
+ enabled) is merged into the current GTID state (@@global.gtid_current_pos).
+*/
+int
+rpl_append_gtid_state(String *dest, bool use_binlog)
+{
+ int err;
+ rpl_gtid *gtid_list= NULL;
+ uint32 num_gtids= 0;
+
+ if (use_binlog && opt_bin_log &&
+ (err= mysql_bin_log.get_most_recent_gtid_list(&gtid_list, &num_gtids)))
+ return err;
+
+ err= rpl_global_gtid_slave_state->tostring(dest, gtid_list, num_gtids);
+ my_free(gtid_list);
+
+ return err;
+}
+
+
+/*
+ Load the current GTID position into a slave_connection_state, for use when
+ connecting to a master server with GTID.
+
+ If the flag use_binlog is true, then the contents of the binary log (if
+ enabled) is merged into the current GTID state (master_use_gtid=current_pos).
+*/
+int
+rpl_load_gtid_state(slave_connection_state *state, bool use_binlog)
+{
+ int err;
+ rpl_gtid *gtid_list= NULL;
+ uint32 num_gtids= 0;
+
+ if (use_binlog && opt_bin_log &&
+ (err= mysql_bin_log.get_most_recent_gtid_list(&gtid_list, &num_gtids)))
+ return err;
+
+ err= state->load(rpl_global_gtid_slave_state, gtid_list, num_gtids);
+ my_free(gtid_list);
+
+ return err;
+}
+
+
+bool
+rpl_gtid_pos_check(THD *thd, char *str, size_t len)
+{
+ slave_connection_state tmp_slave_state;
+ bool gave_conflict_warning= false, gave_missing_warning= false;
+
+ /* Check that we can parse the supplied string. */
+ if (tmp_slave_state.load(str, len))
+ return true;
+
+ /*
+ Check our own binlog for any of our own transactions that are newer
+ than the GTID state the user is requesting. Any such transactions would
+ result in an out-of-order binlog, which could break anyone replicating
+ with us as master.
+
+ So give an error if this is found, requesting the user to do a
+ RESET MASTER (to clean up the binlog) if they really want this.
+ */
+ if (mysql_bin_log.is_open())
+ {
+ rpl_gtid *binlog_gtid_list= NULL;
+ uint32 num_binlog_gtids= 0;
+ uint32 i;
+
+ if (mysql_bin_log.get_most_recent_gtid_list(&binlog_gtid_list,
+ &num_binlog_gtids))
+ {
+ my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME));
+ return true;
+ }
+ for (i= 0; i < num_binlog_gtids; ++i)
+ {
+ rpl_gtid *binlog_gtid= &binlog_gtid_list[i];
+ rpl_gtid *slave_gtid;
+ if (binlog_gtid->server_id != global_system_variables.server_id)
+ continue;
+ if (!(slave_gtid= tmp_slave_state.find(binlog_gtid->domain_id)))
+ {
+ if (opt_gtid_strict_mode)
+ {
+ my_error(ER_MASTER_GTID_POS_MISSING_DOMAIN, MYF(0),
+ binlog_gtid->domain_id, binlog_gtid->domain_id,
+ binlog_gtid->server_id, binlog_gtid->seq_no);
+ break;
+ }
+ else if (!gave_missing_warning)
+ {
+ push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
+ ER_MASTER_GTID_POS_MISSING_DOMAIN,
+ ER(ER_MASTER_GTID_POS_MISSING_DOMAIN),
+ binlog_gtid->domain_id, binlog_gtid->domain_id,
+ binlog_gtid->server_id, binlog_gtid->seq_no);
+ gave_missing_warning= true;
+ }
+ }
+ else if (slave_gtid->seq_no < binlog_gtid->seq_no)
+ {
+ if (opt_gtid_strict_mode)
+ {
+ my_error(ER_MASTER_GTID_POS_CONFLICTS_WITH_BINLOG, MYF(0),
+ slave_gtid->domain_id, slave_gtid->server_id,
+ slave_gtid->seq_no, binlog_gtid->domain_id,
+ binlog_gtid->server_id, binlog_gtid->seq_no);
+ break;
+ }
+ else if (!gave_conflict_warning)
+ {
+ push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
+ ER_MASTER_GTID_POS_CONFLICTS_WITH_BINLOG,
+ ER(ER_MASTER_GTID_POS_CONFLICTS_WITH_BINLOG),
+ slave_gtid->domain_id, slave_gtid->server_id,
+ slave_gtid->seq_no, binlog_gtid->domain_id,
+ binlog_gtid->server_id, binlog_gtid->seq_no);
+ gave_conflict_warning= true;
+ }
+ }
+ }
+ my_free(binlog_gtid_list);
+ if (i != num_binlog_gtids)
+ return true;
+ }
+
+ return false;
+}
+
+
+bool
+rpl_gtid_pos_update(THD *thd, char *str, size_t len)
+{
+ if (rpl_global_gtid_slave_state->load(thd, str, len, true, true))
+ {
+ my_error(ER_FAILED_GTID_STATE_INIT, MYF(0));
+ return true;
+ }
+ else
+ return false;
+}
+
+
#endif /* HAVE_REPLICATION */