summaryrefslogtreecommitdiff
path: root/sql/sql_repl.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/sql_repl.cc')
-rw-r--r--sql/sql_repl.cc995
1 files changed, 977 insertions, 18 deletions
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 62336073b28..d767fb50cae 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -16,10 +16,12 @@
#include "sql_priv.h"
#include "unireg.h"
+#include "sql_base.h"
#include "sql_parse.h" // check_access
#ifdef HAVE_REPLICATION
#include "rpl_mi.h"
+#include "rpl_rli.h"
#include "sql_repl.h"
#include "sql_acl.h" // SUPER_ACL
#include "log_event.h"
@@ -81,7 +83,7 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
uint ident_len = (uint) strlen(p);
ulong event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN +
(do_checksum ? BINLOG_CHECKSUM_LEN : 0);
- int4store(header + SERVER_ID_OFFSET, server_id);
+ int4store(header + SERVER_ID_OFFSET, global_system_variables.server_id);
int4store(header + EVENT_LEN_OFFSET, event_len);
int2store(header + FLAGS_OFFSET, LOG_EVENT_ARTIFICIAL_F);
@@ -505,6 +507,26 @@ get_mariadb_slave_capability(THD *thd)
/*
+ Get the value of the @slave_connect_state user variable into the supplied
+ String (this is the GTID connect state requested by the connecting slave).
+
+ Returns false if error (ie. slave did not set the variable and does not
+ want to use GTID to set start position), true if success.
+*/
+static bool
+get_slave_connect_state(THD *thd, String *out_str)
+{
+  const LEX_STRING var_name= { C_STRING_WITH_LEN("slave_connect_state") };
+  bool is_null;
+  user_var_entry *var;
+
+  /* Look the variable up among the user variables of this connection. */
+  var= (user_var_entry*) my_hash_search(&thd->user_vars,
+                                        (uchar*) var_name.str,
+                                        var_name.length);
+  if (!var)
+    return false;                               /* Variable was never set */
+
+  /* Fetch the value as a string into the caller's buffer. */
+  if (!var->val_str(&is_null, out_str, 0))
+    return false;
+
+  /* A variable holding SQL NULL counts as "not set". */
+  return !is_null;
+}
+
+
+/*
Function prepares and sends repliation heartbeat event.
@param net net object of THD
@@ -539,7 +561,7 @@ static int send_heartbeat_event(NET* net, String* packet,
uint ident_len = strlen(p);
ulong event_len = ident_len + LOG_EVENT_HEADER_LEN +
(do_checksum ? BINLOG_CHECKSUM_LEN : 0);
- int4store(header + SERVER_ID_OFFSET, server_id);
+ int4store(header + SERVER_ID_OFFSET, global_system_variables.server_id);
int4store(header + EVENT_LEN_OFFSET, event_len);
int2store(header + FLAGS_OFFSET, 0);
@@ -567,6 +589,643 @@ static int send_heartbeat_event(NET* net, String* packet,
}
+/*
+  One node in the singly-linked list of binlog file names built by
+  get_binlog_list().
+*/
+struct binlog_file_entry
+{
+  binlog_file_entry *next;                      /* Next node, NULL at the end */
+  char *name;                                   /* File name from the index */
+};
+
+static binlog_file_entry *
+get_binlog_list(MEM_ROOT *memroot)
+{
+  IO_CACHE *index_cache;
+  char line[FN_REFLEN];
+  size_t line_len;
+  binlog_file_entry *head= NULL;
+  DBUG_ENTER("get_binlog_list");
+
+  if (!mysql_bin_log.is_open())
+  {
+    my_error(ER_NO_BINARY_LOGGING, MYF(0));
+    DBUG_RETURN(NULL);
+  }
+
+  /* Read the binlog index from its start while holding the index lock. */
+  mysql_bin_log.lock_index();
+  index_cache= mysql_bin_log.get_index_file();
+  reinit_io_cache(index_cache, READ_CACHE, (my_off_t) 0, 0, 0);
+
+  for (;;)
+  {
+    binlog_file_entry *node;
+
+    /* The file ends with EOF or empty line */
+    line_len= my_b_gets(index_cache, line, sizeof(line));
+    if (line_len <= 1)
+      break;
+    --line_len;                                 /* Remove the newline */
+
+    /* Both the node and the copy of the name live in the caller's memroot. */
+    node= (binlog_file_entry *)alloc_root(memroot, sizeof(*node));
+    if (node)
+      node->name= strmake_root(memroot, line, line_len);
+    if (!node || !node->name)
+    {
+      mysql_bin_log.unlock_index();
+      my_error(ER_OUTOFMEMORY, MYF(0), line_len + 1 + sizeof(*node));
+      DBUG_RETURN(NULL);
+    }
+
+    /* Prepend: the resulting list is in reverse order of the index file. */
+    node->next= head;
+    head= node;
+  }
+  mysql_bin_log.unlock_index();
+
+  DBUG_RETURN(head);
+}
+
+/*
+ Find the Gtid_list_log_event at the start of a binlog.
+
+ NULL for ok, non-NULL error message for error.
+
+ If ok, then the event is returned in *out_gtid_list. This can be NULL if we
+ get back to binlogs written by old server version without GTID support. If
+ so, it means we have reached the point to start from, as no GTID events can
+ exist in earlier binlogs.
+*/
+static const char *
+get_gtid_list_event(IO_CACHE *cache, Gtid_list_log_event **out_gtid_list)
+{
+  Format_description_log_event bootstrap_fdle(BINLOG_VERSION);
+  Format_description_log_event *file_fdle;
+  Log_event *event;
+  const char *errormsg= NULL;
+
+  *out_gtid_list= NULL;
+
+  /* The first event must be the format description of this binlog file. */
+  event= Log_event::read_log_event(cache, 0, &bootstrap_fdle,
+                                   opt_master_verify_checksum);
+  if (!event || event->get_type_code() != FORMAT_DESCRIPTION_EVENT)
+  {
+    delete event;
+    return "Could not read format description log event while looking for "
+      "GTID position in binlog";
+  }
+  file_fdle= static_cast<Format_description_log_event *>(event);
+
+  /* Scan forward using the file's own format description. */
+  for (;;)
+  {
+    Log_event_type typ;
+
+    event= Log_event::read_log_event(cache, 0, file_fdle,
+                                     opt_master_verify_checksum);
+    if (!event)
+    {
+      errormsg= "Could not read GTID list event while looking for GTID "
+        "position in binlog";
+      break;
+    }
+
+    typ= event->get_type_code();
+    if (typ == GTID_LIST_EVENT)
+      break;                                    /* Done, found it */
+
+    /* Anything else is of no interest to the caller. */
+    delete event;
+    event= NULL;
+
+    if (typ != ROTATE_EVENT && typ != STOP_EVENT &&
+        typ != FORMAT_DESCRIPTION_EVENT)
+    {
+      /* We did not find any Gtid_list_log_event, must be old binlog. */
+      break;
+    }
+    /* Otherwise continue looking */
+  }
+
+  delete file_fdle;
+  /* Either the GTID list event that was found, or NULL. */
+  *out_gtid_list= static_cast<Gtid_list_log_event *>(event);
+  return errormsg;
+}
+
+
+/*
+ Check if every GTID requested by the slave is contained in this (or a later)
+ binlog file. Return true if so, false if not.
+
+ We do the check with a single scan of the list of GTIDs, avoiding the need
+ to build an in-memory hash or stuff like that.
+
+ We need to check that slave did not request GTID D-S-N1, when the
+ Gtid_list_log_event for this binlog file has D-S-N2 with N2 > N1.
+
+ In addition, we need to check that we do not have a GTID D-S-N3 in the
+ Gtid_list_log_event where D is not present in the requested slave state at
+ all. Since if D is not in requested slave state, it means that slave needs
+ to start at the very first GTID in domain D.
+*/
+static bool
+contains_all_slave_gtid(slave_connection_state *st, Gtid_list_log_event *glev)
+{
+  uint32 idx= 0;
+
+  /* Single pass over the GTID list recorded at the start of the binlog. */
+  while (idx < glev->count)
+  {
+    const rpl_gtid *requested= st->find(glev->list[idx].domain_id);
+
+    /*
+      Domain not requested at all: the slave needs to start from the very
+      beginning of this domain, which is in an earlier binlog file. So we
+      need to search back further.
+    */
+    if (!requested)
+      return false;
+
+    /*
+      Same server, but the binlog already starts past the requested seq_no:
+      the slave needs to receive gtid, but it is contained in an earlier
+      binlog file. So we need to search back further.
+    */
+    if (requested->server_id == glev->list[idx].server_id &&
+        requested->seq_no < glev->list[idx].seq_no)
+      return false;
+
+    ++idx;
+  }
+
+  return true;
+}
+
+
+/*
+ Check the start GTID state requested by the slave against our binlog state.
+
+ Give an error if the slave requests something that we do not have in our
+ binlog.
+
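+  Returns 0 on success. On failure returns a non-zero error code and sets
+  *errormsg; for ER_GTID_POSITION_NOT_FOUND_IN_BINLOG, *error_gtid is also set
+  to the offending GTID.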
+*/
+
+static int
+check_slave_start_position(THD *thd, slave_connection_state *st,
+                           const char **errormsg, rpl_gtid *error_gtid)
+{
+  /*
+    st is the GTID state requested by the connecting slave; entries in it may
+    be rewritten or removed here. On error, *errormsg is set, and for a GTID
+    that is missing from the binlog *error_gtid is set as well.
+  */
+  uint32 i;
+  int err;
+  /* Entries of st queued for removal once the hash iteration is finished. */
+  rpl_gtid **delete_list= NULL;
+  uint32 delete_idx= 0;
+  bool slave_state_loaded= false;
+  uint32 missing_domains= 0;
+  rpl_gtid missing_domain_gtid;
+
+  for (i= 0; i < st->hash.records; ++i)
+  {
+    rpl_gtid *slave_gtid= (rpl_gtid *)my_hash_element(&st->hash, i);
+    rpl_gtid master_gtid;
+    rpl_gtid master_replication_gtid;
+    rpl_gtid start_gtid;
+
+    /* Fast path: the requested GTID is covered by our binlog state. */
+    if (mysql_bin_log.find_in_binlog_state(slave_gtid->domain_id,
+                                           slave_gtid->server_id,
+                                           &master_gtid) &&
+        master_gtid.seq_no >= slave_gtid->seq_no)
+      continue;
+
+    /* Load the replication slave state lazily, at most once per call. */
+    if (!slave_state_loaded)
+    {
+      if (rpl_load_gtid_slave_state(thd))
+      {
+        *errormsg= "Failed to load replication slave GTID state";
+        err= ER_CANNOT_LOAD_SLAVE_GTID_STATE;
+        goto end;
+      }
+      slave_state_loaded= true;
+    }
+
+    if (!rpl_global_gtid_slave_state.domain_to_gtid(slave_gtid->domain_id,
+                                                    &master_replication_gtid) ||
+        slave_gtid->server_id != master_replication_gtid.server_id ||
+        slave_gtid->seq_no != master_replication_gtid.seq_no)
+    {
+      rpl_gtid domain_gtid;
+
+      if (!mysql_bin_log.lookup_domain_in_binlog_state(slave_gtid->domain_id,
+                                                       &domain_gtid))
+      {
+        /*
+          We do not have anything in this domain, neither in the binlog nor
+          in the slave state. So we are probably one master in a multi-master
+          setup, and this domain is served by a different master.
+
+          This is not an error, however if we are missing _all_ domains
+          requested by the slave, then we still give error (below, after
+          the loop).
+        */
+        if (!missing_domains)
+          missing_domain_gtid= *slave_gtid;
+        ++missing_domains;
+        continue;
+      }
+      *errormsg= "Requested slave GTID state not found in binlog";
+      *error_gtid= *slave_gtid;
+      err= ER_GTID_POSITION_NOT_FOUND_IN_BINLOG;
+      goto end;
+    }
+
+    /*
+      Ok, so connecting slave asked to start at a GTID that we do not have in
+      our binlog, but it was in fact the last GTID we applied earlier, when we
+      were acting as a replication slave.
+
+      So this means that we were running as a replication slave without
+      --log-slave-updates, but now we switched to be a master. It is worth it
+      to handle this special case, as it allows users to run a simple
+      master -> slave without --log-slave-updates, and then exchange slave and
+      master, as long as they make sure the slave is caught up before switching.
+    */
+
+    /*
+      First check if we logged something ourselves as a master after being a
+      slave. This will be seen as a GTID with our own server_id and bigger
+      seq_no than what is in the slave state.
+
+      If we did not log anything ourselves, then start the connecting slave
+      replicating from the current binlog end position, which in this case
+      corresponds to our replication slave state and hence what the connecting
+      slave is requesting.
+    */
+    if (mysql_bin_log.find_in_binlog_state(slave_gtid->domain_id,
+                                           global_system_variables.server_id,
+                                           &start_gtid) &&
+        start_gtid.seq_no > slave_gtid->seq_no)
+    {
+      /*
+        Start replication within this domain at the first GTID that we logged
+        ourselves after becoming a master.
+      */
+      slave_gtid->server_id= global_system_variables.server_id;
+    }
+    else if (mysql_bin_log.lookup_domain_in_binlog_state(slave_gtid->domain_id,
+                                                         &start_gtid))
+    {
+      slave_gtid->server_id= start_gtid.server_id;
+      slave_gtid->seq_no= start_gtid.seq_no;
+    }
+    else
+    {
+      /*
+        We do not have _anything_ in our own binlog for this domain. Just
+        delete the entry in the slave connection state, then it will pick up
+        anything new that arrives.
+
+        We just queue up the deletion and do it later, after the loop, so that
+        we do not mess up the iteration over the hash.
+      */
+      if (!delete_list)
+      {
+        /*
+          Allocate room for every entry in the hash, since in the worst case
+          all of them get queued. A NULL result is the failure case.
+        */
+        if (!(delete_list= (rpl_gtid **)
+              my_malloc(sizeof(*delete_list) * st->hash.records,
+                        MYF(MY_WME))))
+        {
+          *errormsg= "Out of memory while checking slave start position";
+          err= ER_OUT_OF_RESOURCES;
+          goto end;
+        }
+      }
+      delete_list[delete_idx++]= slave_gtid;
+    }
+  }
+
+  /* Having none of the requested domains at all is still an error. */
+  if (missing_domains == st->hash.records && missing_domains > 0)
+  {
+    *errormsg= "Requested slave GTID state not found in binlog";
+    *error_gtid= missing_domain_gtid;
+    err= ER_GTID_POSITION_NOT_FOUND_IN_BINLOG;
+    goto end;
+  }
+
+  /* Do any delayed deletes from the hash. */
+  if (delete_list)
+  {
+    for (i= 0; i < delete_idx; ++i)
+      st->remove(delete_list[i]);
+  }
+  err= 0;
+
+end:
+  if (delete_list)
+    my_free(delete_list);
+  return err;
+}
+
+/*
+ Find the name of the binlog file to start reading for a slave that connects
+ using GTID state.
+
+ Returns the file name in out_name, which must be of size at least FN_REFLEN.
+
+ Returns NULL on ok, error message on error.
+
+ In case of non-error return, the returned binlog file is guaranteed to
+ contain the first event to be transmitted to the slave for every domain
+ present in our binlogs. It is still necessary to skip all GTIDs up to
+ and including the GTID requested by slave within each domain.
+
+ However, as a special case, if the event to be sent to the slave is the very
+ first event (within that domain) in the returned binlog, then nothing should
+ be skipped, so that domain is deleted from the passed in slave connection
+ state.
+
+ This is necessary in case the slave requests a GTID within a replication
+ domain that has long been inactive. The binlog file containing that GTID may
+ have been long since purged. However, as long as no GTIDs after that have
+ been purged, we have the GTID requested by slave in the Gtid_list_log_event
+ of the latest binlog. So we can start from there, as long as we delete the
+ corresponding entry in the slave state so we do not wrongly skip any events
+ that might turn up if that domain becomes active again, vainly looking for
+ the requested GTID that was already purged.
+*/
+static const char *
+gtid_find_binlog_file(slave_connection_state *state, char *out_name)
+{
+  /*
+    state is the slave's requested GTID state; entries may be removed from it.
+    out_name receives the chosen binlog file name. Returns NULL on success,
+    otherwise an error message.
+  */
+  MEM_ROOT memroot;
+  binlog_file_entry *list;
+  Gtid_list_log_event *glev= NULL;
+  const char *errormsg= NULL;
+  char buf[FN_REFLEN];
+
+  init_alloc_root(&memroot, 10*(FN_REFLEN+sizeof(binlog_file_entry)), 0,
+                  MYF(MY_THREAD_SPECIFIC));
+  if (!(list= get_binlog_list(&memroot)))
+  {
+    errormsg= "Out of memory while looking for GTID position in binlog";
+    goto end;
+  }
+
+  while (list)
+  {
+    File file;
+    IO_CACHE cache;
+
+    if (!list->next)
+    {
+      /*
+        It should be safe to read the currently used binlog, as we will only
+        read the header part that is already written.
+
+        But if that does not work on windows, then we will need to cache the
+        event somewhere in memory I suppose - that could work too.
+      */
+    }
+    /*
+      Read the Gtid_list_log_event at the start of the binlog file to
+      get the binlog state.
+    */
+    if (normalize_binlog_name(buf, list->name, false))
+    {
+      errormsg= "Failed to determine binlog file name while looking for "
+        "GTID position in binlog";
+      goto end;
+    }
+    bzero((char*) &cache, sizeof(cache));
+    if ((file= open_binlog(&cache, buf, &errormsg)) == (File)-1)
+      goto end;
+    errormsg= get_gtid_list_event(&cache, &glev);
+    end_io_cache(&cache);
+    mysql_file_close(file, MYF(MY_WME));
+    if (errormsg)
+      goto end;
+
+    if (!glev || contains_all_slave_gtid(state, glev))
+    {
+      /*
+        out_name is only guaranteed to hold FN_REFLEN bytes, so leave room
+        for the terminating NUL.
+      */
+      strmake(out_name, buf, FN_REFLEN-1);
+
+      /*
+        glev is NULL when the binlog has no Gtid_list_log_event (written by an
+        old server version); there is then nothing to adjust in the state,
+        and glev must not be dereferenced.
+      */
+      if (glev)
+      {
+        uint32 i;
+
+        /*
+          As a special case, we allow to start from binlog file N if the
+          requested GTID is the last event (in the corresponding domain) in
+          binlog file (N-1), but then we need to remove that GTID from the
+          slave state, rather than skipping events waiting for it to turn up.
+        */
+        for (i= 0; i < glev->count; ++i)
+        {
+          const rpl_gtid *gtid= state->find(glev->list[i].domain_id);
+          if (!gtid)
+          {
+            /* contains_all_slave_gtid() would have returned false if so. */
+            DBUG_ASSERT(0);
+            continue;
+          }
+          if (gtid->server_id == glev->list[i].server_id &&
+              gtid->seq_no == glev->list[i].seq_no)
+          {
+            /*
+              The slave requested to start from the very beginning of this
+              domain in this binlog file. So delete the entry from the state,
+              we do not need to skip anything.
+            */
+            state->remove(gtid);
+          }
+        }
+      }
+
+      goto end;
+    }
+    delete glev;
+    glev= NULL;
+    list= list->next;
+  }
+
+  /* We reached the end without finding anything. */
+  errormsg= "Could not find GTID state requested by slave in any binlog "
+    "files. Probably the slave state is too old and required binlog files "
+    "have been purged.";
+
+end:
+  if (glev)
+    delete glev;
+
+  free_root(&memroot, MYF(0));
+  return errormsg;
+}
+
+
+/*
+ Given an old-style binlog position with file name and file offset, find the
+ corresponding gtid position. If the offset is not at an event boundary, give
+ an error.
+
+ Return NULL on ok, error message string on error.
+
+ ToDo: Improve the performance of this by using binlog index files.
+*/
+static const char *
+gtid_state_from_pos(const char *name, uint32 offset,
+                    slave_connection_state *gtid_state)
+{
+  IO_CACHE cache;
+  File file;
+  const char *errormsg= NULL;
+  bool seen_gtid_list= false;
+  bool seen_format_description= false;
+  bool offset_on_boundary= false;
+  uint8 checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
+  int err;
+  String packet;
+
+  /* Start out from an empty state. */
+  if (gtid_state->load((const rpl_gtid *)NULL, 0))
+  {
+    errormsg= "Internal error (out of memory?) initializing slave state "
+      "while scanning binlog to find start position";
+    return errormsg;
+  }
+
+  file= open_binlog(&cache, name, &errormsg);
+  if (file == (File)-1)
+    return errormsg;
+
+  /*
+    First we need to find the initial GTID_LIST_EVENT. We need this even
+    if the offset is at the very start of the binlog file.
+
+    But if we do not find any GTID_LIST_EVENT, then this is an old binlog
+    with no GTID information, so we return empty GTID state.
+  */
+  for (;;)
+  {
+    Log_event_type typ;
+    uint32 cur_pos= (uint32)my_b_tell(&cache);
+
+    /* Remember whether the requested offset is the start of an event. */
+    if (cur_pos == offset)
+      offset_on_boundary= true;
+
+    /* Stop once both header events are seen and the offset is reached. */
+    if (seen_format_description && seen_gtid_list && cur_pos >= offset)
+      break;
+
+    packet.length(0);
+    err= Log_event::read_log_event(&cache, &packet, NULL, checksum_alg);
+    if (err)
+    {
+      errormsg= "Could not read binlog while searching for slave start "
+        "position on master";
+      goto end;
+    }
+
+    /*
+      The cast to uchar is needed to avoid a signed char being converted to a
+      negative number.
+    */
+    typ= (Log_event_type)(uchar)packet[EVENT_TYPE_OFFSET];
+
+    if (typ == FORMAT_DESCRIPTION_EVENT)
+    {
+      if (seen_format_description)
+      {
+        errormsg= "Duplicate format description log event found while "
+          "searching for old-style position in binlog";
+        goto end;
+      }
+
+      /* Later events are read with the checksum algorithm found here. */
+      checksum_alg= get_checksum_alg(packet.ptr(), packet.length());
+      seen_format_description= true;
+      continue;
+    }
+
+    /* Any other event before the format description is an error. */
+    if (!seen_format_description)
+    {
+      errormsg= "Did not find format description log event while searching "
+        "for old-style position in binlog";
+      goto end;
+    }
+
+    if (typ == ROTATE_EVENT || typ == STOP_EVENT ||
+        typ == BINLOG_CHECKPOINT_EVENT)
+      continue;                                 /* Continue looking */
+
+    if (typ == GTID_LIST_EVENT)
+    {
+      rpl_gtid *gtid_list;
+      bool status;
+      uint32 list_len;
+
+      if (seen_gtid_list)
+      {
+        errormsg= "Found duplicate Gtid_list_log_event while scanning binlog "
+          "to find slave start position";
+        goto end;
+      }
+      status= Gtid_list_log_event::peek(packet.ptr(), packet.length(),
+                                        &gtid_list, &list_len);
+      if (status)
+      {
+        errormsg= "Error reading Gtid_list_log_event while searching "
+          "for old-style position in binlog";
+        goto end;
+      }
+      /* Seed the state from the list, then free the list from peek(). */
+      err= gtid_state->load(gtid_list, list_len);
+      my_free(gtid_list);
+      if (err)
+      {
+        errormsg= "Internal error (out of memory?) initialising slave state "
+          "while scanning binlog to find start position";
+        goto end;
+      }
+      seen_gtid_list= true;
+      continue;
+    }
+
+    if (!seen_gtid_list)
+    {
+      /* We did not find any Gtid_list_log_event, must be old binlog. */
+      goto end;
+    }
+
+    /* Past the header, only GTID events change the state. */
+    if (typ != GTID_EVENT)
+      continue;
+
+    {
+      rpl_gtid gtid;
+      uchar flags2;
+
+      if (Gtid_log_event::peek(packet.ptr(), packet.length(), &gtid.domain_id,
+                               &gtid.server_id, &gtid.seq_no, &flags2))
+      {
+        errormsg= "Corrupt gtid_log_event found while scanning binlog to find "
+          "initial slave position";
+        goto end;
+      }
+      if (gtid_state->update(&gtid))
+      {
+        errormsg= "Internal error (out of memory?) updating slave state while "
+          "scanning binlog to find start position";
+        goto end;
+      }
+    }
+  }
+
+  /* The scan passed the offset without ever landing exactly on it. */
+  if (!offset_on_boundary)
+  {
+    errormsg= "Slave requested incorrect position in master binlog. "
+      "Requested position %u in file '%s', but this position does not "
+      "correspond to the location of any binlog event.";
+  }
+
+end:
+  end_io_cache(&cache);
+  mysql_file_close(file, MYF(MY_WME));
+
+  return errormsg;
+}
+
+
+/*
+  Convert an old-style binlog position (file name and offset) into the
+  corresponding GTID state, formatted as a string in out_str.
+
+  Returns 0 on success, 1 on error.
+*/
+int
+gtid_state_from_binlog_pos(const char *in_name, uint32 pos, String *out_str)
+{
+  slave_connection_state gtid_state;
+  const char *lookup_name= NULL;
+  char name_buf[FN_REFLEN];
+  LOG_INFO linfo;
+
+  if (!mysql_bin_log.is_open())
+  {
+    my_error(ER_NO_BINARY_LOGGING, MYF(0));
+    return 1;
+  }
+
+  /* An empty or missing name leaves lookup_name NULL for find_log_pos(). */
+  if (in_name && in_name[0])
+  {
+    mysql_bin_log.make_log_name(name_buf, in_name);
+    lookup_name= name_buf;
+  }
+
+  linfo.index_file_offset= 0;
+  if (mysql_bin_log.find_log_pos(&linfo, lookup_name, 1))
+    return 1;
+
+  /* Offsets below 4 are treated as 4. */
+  if (pos < 4)
+    pos= 4;
+
+  if (gtid_state_from_pos(linfo.log_file_name, pos, &gtid_state))
+    return 1;
+  if (gtid_state.to_string(out_str))
+    return 1;
+
+  return 0;
+}
+
+
/*
Helper function for mysql_binlog_send() to write an event down the slave
connection.
@@ -577,9 +1236,63 @@ static const char *
send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
Log_event_type event_type, char *log_file_name,
IO_CACHE *log, int mariadb_slave_capability,
- ulong ev_offset, uint8 current_checksum_alg)
+ ulong ev_offset, uint8 current_checksum_alg,
+ bool using_gtid_state, slave_connection_state *gtid_state,
+ enum_gtid_skip_type *gtid_skip_group)
{
my_off_t pos;
+ size_t len= packet->length();
+
+ /* Skip GTID event groups until we reach slave position within a domain_id. */
+ if (event_type == GTID_EVENT && using_gtid_state && gtid_state->count() > 0)
+ {
+ uint32 server_id, domain_id;
+ uint64 seq_no;
+ uchar flags2;
+ rpl_gtid *gtid;
+
+ if (ev_offset > len ||
+ Gtid_log_event::peek(packet->ptr()+ev_offset, len - ev_offset,
+ &domain_id, &server_id, &seq_no, &flags2))
+ return "Failed to read Gtid_log_event: corrupt binlog";
+ gtid= gtid_state->find(domain_id);
+ if (gtid != NULL)
+ {
+ /* Skip this event group if we have not yet reached slave start pos. */
+ if (server_id != gtid->server_id || seq_no <= gtid->seq_no)
+ *gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ?
+ GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
+ /*
+ Delete this entry if we have reached slave start position (so we will
+ not skip subsequent events and won't have to look them up and check).
+ */
+ if (server_id == gtid->server_id && seq_no >= gtid->seq_no)
+ gtid_state->remove(gtid);
+ }
+ }
+
+ /*
+ Skip event group if we have not yet reached the correct slave GTID position.
+
+ Note that slave that understands GTID can also tolerate holes, so there is
+ no need to supply dummy event.
+ */
+ switch (*gtid_skip_group)
+ {
+ case GTID_SKIP_STANDALONE:
+ if (!Log_event::is_part_of_group(event_type))
+ *gtid_skip_group= GTID_SKIP_NOT;
+ return NULL;
+ case GTID_SKIP_TRANSACTION:
+ if (event_type == XID_EVENT ||
+ (event_type == QUERY_EVENT &&
+ Query_log_event::peek_is_commit_rollback(packet->ptr() + ev_offset,
+ len - ev_offset)))
+ *gtid_skip_group= GTID_SKIP_NOT;
+ return NULL;
+ case GTID_SKIP_NOT:
+ break;
+ }
/* Do not send annotate_rows events unless slave requested it. */
if (event_type == ANNOTATE_ROWS_EVENT && !(flags & BINLOG_SEND_ANNOTATE_ROWS_EVENT))
@@ -616,10 +1329,34 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
}
/*
- Do not send binlog checkpoint events to a slave that does not understand it.
+ Replace GTID events with old-style BEGIN events for slaves that do not
+ understand global transaction IDs. For stand-alone events, where there is
+ no terminating COMMIT query event, omit the GTID event or replace it with
+ a dummy event, as appropriate.
+ */
+ if (event_type == GTID_EVENT &&
+ mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_GTID)
+ {
+ bool need_dummy=
+ mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES;
+ bool err= Gtid_log_event::make_compatible_event(packet, &need_dummy,
+ ev_offset,
+ current_checksum_alg);
+ if (err)
+ return "Failed to replace GTID event with backwards-compatible event: "
+ "currupt event.";
+ if (!need_dummy)
+ return NULL;
+ }
+
+ /*
+ Do not send binlog checkpoint or gtid list events to a slave that does not
+ understand it.
*/
- if (unlikely(event_type == BINLOG_CHECKPOINT_EVENT) &&
- mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_BINLOG_CHECKPOINT)
+ if ((unlikely(event_type == BINLOG_CHECKPOINT_EVENT) &&
+ mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_BINLOG_CHECKPOINT) ||
+ (unlikely(event_type == GTID_LIST_EVENT) &&
+ mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_GTID))
{
if (mariadb_slave_capability >= MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES)
{
@@ -634,8 +1371,8 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
binlog positions.
*/
if (Query_log_event::dummy_event(packet, ev_offset, current_checksum_alg))
- return "Failed to replace binlog checkpoint event with dummy: "
- "too small event.";
+ return "Failed to replace binlog checkpoint or gtid list event with "
+ "dummy: too small event.";
}
}
@@ -661,7 +1398,7 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
(thd, flags, packet, log_file_name, pos)))
return "run 'before_send_event' hook failed";
- if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
+ if (my_net_write(net, (uchar*) packet->ptr(), len))
return "Failed on my_net_write()";
DBUG_PRINT("info", ("log event code %d", (*packet)[LOG_EVENT_OFFSET+1] ));
@@ -696,6 +1433,12 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
mysql_mutex_t *log_lock;
mysql_cond_t *log_cond;
int mariadb_slave_capability;
+ char str_buf[256];
+ String connect_gtid_state(str_buf, sizeof(str_buf), system_charset_info);
+ bool using_gtid_state;
+ slave_connection_state gtid_state, return_gtid_state;
+ rpl_gtid error_gtid;
+ enum_gtid_skip_type gtid_skip_group= GTID_SKIP_NOT;
uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
int old_max_allowed_packet= thd->variables.max_allowed_packet;
@@ -706,6 +1449,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
DBUG_PRINT("enter",("log_ident: '%s' pos: %ld", log_ident, (long) pos));
bzero((char*) &log,sizeof(log));
+ bzero(&error_gtid, sizeof(error_gtid));
/*
heartbeat_period from @master_heartbeat_period user variable
*/
@@ -722,9 +1466,25 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
set_timespec_nsec(*heartbeat_ts, 0);
}
mariadb_slave_capability= get_mariadb_slave_capability(thd);
+
+ connect_gtid_state.length(0);
+ using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state);
+ DBUG_EXECUTE_IF("simulate_non_gtid_aware_master", using_gtid_state= false;);
+ /*
+ We want to corrupt the first event, in Log_event::read_log_event().
+ But we do not want the corruption to happen early, eg. when client does
+ BINLOG_GTID_POS(). So test case sets a DBUG trigger which causes us to
+ set the real DBUG injection here.
+ */
+ DBUG_EXECUTE_IF("corrupt_read_log_event2_set",
+ {
+ DBUG_SET("-d,corrupt_read_log_event2_set");
+ DBUG_SET("+d,corrupt_read_log_event2");
+ });
+
if (global_system_variables.log_warnings > 1)
sql_print_information("Start binlog_dump to slave_server(%d), pos(%s, %lu)",
- thd->server_id, log_ident, (ulong)pos);
+ (int)thd->variables.server_id, log_ident, (ulong)pos);
if (RUN_HOOK(binlog_transmit, transmit_start, (thd, flags, log_ident, pos)))
{
errmsg= "Failed to run hook 'transmit_start'";
@@ -755,10 +1515,36 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
}
name=search_file_name;
- if (log_ident[0])
- mysql_bin_log.make_log_name(search_file_name, log_ident);
+ if (using_gtid_state)
+ {
+ if (gtid_state.load(connect_gtid_state.c_ptr_quick(),
+ connect_gtid_state.length()))
+ {
+ errmsg= "Out of memory or malformed slave request when obtaining start "
+ "position from GTID state";
+ my_errno= ER_UNKNOWN_ERROR;
+ goto err;
+ }
+ if ((error= check_slave_start_position(thd, &gtid_state, &errmsg,
+ &error_gtid)))
+ {
+ my_errno= error;
+ goto err;
+ }
+ if ((errmsg= gtid_find_binlog_file(&gtid_state, search_file_name)))
+ {
+ my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ goto err;
+ }
+ pos= 4;
+ }
else
- name=0; // Find first log
+ {
+ if (log_ident[0])
+ mysql_bin_log.make_log_name(search_file_name, log_ident);
+ else
+ name=0; // Find first log
+ }
linfo.index_file_offset = 0;
@@ -1012,7 +1798,8 @@ impossible position";
if ((tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type,
log_file_name, &log,
mariadb_slave_capability, ev_offset,
- current_checksum_alg)))
+ current_checksum_alg, using_gtid_state,
+ &gtid_state, &gtid_skip_group)))
{
errmsg= tmp_msg;
my_errno= ER_UNKNOWN_ERROR;
@@ -1105,7 +1892,8 @@ impossible position";
int ret;
ulong signal_cnt;
DBUG_PRINT("wait",("waiting for data in binary log"));
- if (thd->server_id==0) // for mysqlbinlog (mysqlbinlog.server_id==0)
+ /* For mysqlbinlog (mysqlbinlog.server_id==0). */
+ if (thd->variables.server_id==0)
{
mysql_mutex_unlock(log_lock);
goto end;
@@ -1172,7 +1960,9 @@ impossible position";
(tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type,
log_file_name, &log,
mariadb_slave_capability, ev_offset,
- current_checksum_alg)))
+ current_checksum_alg,
+ using_gtid_state, &gtid_state,
+ &gtid_skip_group)))
{
errmsg= tmp_msg;
my_errno= ER_UNKNOWN_ERROR;
@@ -1264,6 +2054,22 @@ err:
my_basename(p_coord->file_name), p_coord->pos,
my_basename(log_file_name), my_b_tell(&log));
}
+ else if (my_errno == ER_GTID_POSITION_NOT_FOUND_IN_BINLOG)
+ {
+ my_snprintf(error_text, sizeof(error_text),
+ "Error: connecting slave requested to start from GTID "
+ "%u-%u-%llu, which is not in the master's binlog",
+ error_gtid.domain_id, error_gtid.server_id, error_gtid.seq_no);
+ /* Use this error code so slave will know not to try reconnect. */
+ my_errno = ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ }
+ else if (my_errno == ER_CANNOT_LOAD_SLAVE_GTID_STATE)
+ {
+ my_snprintf(error_text, sizeof(error_text),
+ "Failed to load replication slave GTID state from table %s.%s",
+ "mysql", rpl_gtid_slave_state_table_name.str);
+ my_errno = ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ }
else
strcpy(error_text, errmsg);
end_io_cache(&log);
@@ -1629,7 +2435,7 @@ void kill_zombie_dump_threads(uint32 slave_server_id)
while ((tmp=it++))
{
if (tmp->command == COM_BINLOG_DUMP &&
- tmp->server_id == slave_server_id)
+ tmp->variables.server_id == slave_server_id)
{
mysql_mutex_lock(&tmp->LOCK_thd_data); // Lock from delete
break;
@@ -1838,7 +2644,7 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added)
{
ulong s_id;
get_dynamic(&lex_mi->repl_ignore_server_ids, (uchar*) &s_id, i);
- if (s_id == ::server_id && replicate_same_server_id)
+ if (s_id == global_system_variables.server_id && replicate_same_server_id)
{
my_error(ER_SLAVE_IGNORE_SERVER_IDS, MYF(0), static_cast<int>(s_id));
ret= TRUE;
@@ -1898,6 +2704,13 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added)
mi->rli.group_relay_log_pos= mi->rli.event_relay_log_pos= lex_mi->relay_log_pos;
}
+ if (lex_mi->use_gtid_opt == LEX_MASTER_INFO::LEX_MI_ENABLE)
+ mi->using_gtid= true;
+ else if (lex_mi->use_gtid_opt == LEX_MASTER_INFO::LEX_MI_DISABLE ||
+ lex_mi->log_file_name || lex_mi->pos ||
+ lex_mi->relay_log_name || lex_mi->relay_log_pos)
+ mi->using_gtid= false;
+
/*
If user did specify neither host nor port nor any log name nor any log
pos, i.e. he specified only user/password/master_connect_retry, he probably
@@ -1928,6 +2741,7 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added)
strmake(mi->master_log_name, mi->rli.group_master_log_name,
sizeof(mi->master_log_name)-1);
}
+
/*
Relay log's IO_CACHE may not be inited, if rli->inited==0 (server was never
a slave before).
@@ -1950,6 +2764,16 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added)
ret= TRUE;
goto err;
}
+
+ if (mi->using_gtid)
+ {
+ /*
+ Clear the position in the master binlogs, so that we request the
+ correct GTID position.
+ */
+ mi->master_log_name[0]= 0;
+ mi->master_log_pos= 0;
+ }
}
else
{
@@ -2423,4 +3247,139 @@ int log_loaded_block(IO_CACHE* file)
DBUG_RETURN(0);
}
+
+/**
+ Initialise the slave replication state from the mysql.rpl_slave_state table.
+
+ This is called each time an SQL thread starts, but the data is only actually
+ loaded on the first call.
+
+ The slave state is the last GTID applied on the slave within each
+ replication domain.
+
+ To avoid row lock contention, there are multiple rows for each domain_id.
+ The one containing the current slave state is the one with the maximal
+ sub_id value, within each domain_id.
+
+ CREATE TABLE mysql.rpl_slave_state (
+ domain_id INT UNSIGNED NOT NULL,
+ sub_id BIGINT UNSIGNED NOT NULL,
+ server_id INT UNSIGNED NOT NULL,
+ seq_no BIGINT UNSIGNED NOT NULL,
+ PRIMARY KEY (domain_id, sub_id))
+*/
+
+void
+rpl_init_gtid_slave_state()
+{
+  /* Only sets up the global in-memory slave state object. */
+  rpl_global_gtid_slave_state.init();
+}
+
+
+void
+rpl_deinit_gtid_slave_state()
+{
+  /* Counterpart of rpl_init_gtid_slave_state(). */
+  rpl_global_gtid_slave_state.deinit();
+}
+
+
+/*
+ Format the current GTID state as a string, for use when connecting to a
+ master server with GTID, or for returning the value of @@global.gtid_state.
+
+ If the flag use_binlog is true, then the contents of the binary log (if
+ enabled) is merged into the current GTID state.
+*/
+int
+rpl_append_gtid_state(String *dest, bool use_binlog)
+{
+  int err= 0;
+  rpl_gtid *gtid_list= NULL;
+  uint32 num_gtids= 0;
+
+  /*
+    Only merge in the binlog contents when the caller asked for it and the
+    binary log is enabled. Otherwise gtid_list stays NULL/empty and only the
+    slave state is formatted.
+  */
+  if (use_binlog && opt_bin_log &&
+      (err= mysql_bin_log.get_most_recent_gtid_list(&gtid_list, &num_gtids)))
+    return err;
+
+  rpl_global_gtid_slave_state.tostring(dest, gtid_list, num_gtids);
+  /* gtid_list is NULL here if no list was fetched. */
+  my_free(gtid_list);
+
+  return 0;
+}
+
+
+/*
+  Validate a GTID position string supplied by the user.
+
+  Returns true on error (unparsable string, or a conflict with our own
+  binlog), false if the position is acceptable.
+*/
+bool
+rpl_gtid_pos_check(char *str, size_t len)
+{
+  slave_connection_state requested_state;
+  rpl_gtid *binlog_list= NULL;
+  uint32 binlog_count= 0;
+  uint32 idx;
+  bool conflict= false;
+
+  /* Check that we can parse the supplied string. */
+  if (requested_state.load(str, len))
+    return true;
+
+  /* Without an open binlog there is nothing to conflict with. */
+  if (!mysql_bin_log.is_open())
+    return false;
+
+  /*
+    Check our own binlog for any of our own transactions that are newer
+    than the GTID state the user is requesting. Any such transactions would
+    result in an out-of-order binlog, which could break anyone replicating
+    with us as master.
+
+    So give an error if this is found, requesting the user to do a
+    RESET MASTER (to clean up the binlog) if they really want this.
+  */
+  if (mysql_bin_log.get_most_recent_gtid_list(&binlog_list, &binlog_count))
+  {
+    my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME));
+    return true;
+  }
+
+  for (idx= 0; idx < binlog_count && !conflict; ++idx)
+  {
+    rpl_gtid *ours= &binlog_list[idx];
+    rpl_gtid *wanted;
+
+    /* Only GTIDs logged with our own server_id matter. */
+    if (ours->server_id != global_system_variables.server_id)
+      continue;
+
+    wanted= requested_state.find(ours->domain_id);
+    if (!wanted)
+    {
+      my_error(ER_MASTER_GTID_POS_MISSING_DOMAIN, MYF(0),
+               ours->domain_id, ours->domain_id,
+               ours->server_id, ours->seq_no);
+      conflict= true;
+    }
+    else if (wanted->seq_no < ours->seq_no)
+    {
+      my_error(ER_MASTER_GTID_POS_CONFLICTS_WITH_BINLOG, MYF(0),
+               wanted->domain_id, wanted->server_id,
+               wanted->seq_no, ours->domain_id,
+               ours->server_id, ours->seq_no);
+      conflict= true;
+    }
+  }
+  my_free(binlog_list);
+
+  return conflict;
+}
+
+
+/*
+  Load the given GTID position string into the global slave state.
+
+  Returns false on success; on failure raises ER_FAILED_GTID_STATE_INIT and
+  returns true.
+*/
+bool
+rpl_gtid_pos_update(THD *thd, char *str, size_t len)
+{
+  if (!rpl_global_gtid_slave_state.load(thd, str, len, true))
+    return false;
+
+  my_error(ER_FAILED_GTID_STATE_INIT, MYF(0));
+  return true;
+}
+
+
#endif /* HAVE_REPLICATION */