diff options
Diffstat (limited to 'sql/sql_repl.cc')
-rw-r--r-- | sql/sql_repl.cc | 995 |
1 files changed, 977 insertions, 18 deletions
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 62336073b28..d767fb50cae 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -16,10 +16,12 @@
 #include "sql_priv.h"
 #include "unireg.h"
+#include "sql_base.h"
 #include "sql_parse.h"                          // check_access
 #ifdef HAVE_REPLICATION
 #include "rpl_mi.h"
+#include "rpl_rli.h"
 #include "sql_repl.h"
 #include "sql_acl.h"                            // SUPER_ACL
 #include "log_event.h"
@@ -81,7 +83,7 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
   uint ident_len = (uint) strlen(p);
   ulong event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN +
     (do_checksum ? BINLOG_CHECKSUM_LEN : 0);
-  int4store(header + SERVER_ID_OFFSET, server_id);
+  int4store(header + SERVER_ID_OFFSET, global_system_variables.server_id);
   int4store(header + EVENT_LEN_OFFSET, event_len);
   int2store(header + FLAGS_OFFSET, LOG_EVENT_ARTIFICIAL_F);
 
@@ -505,6 +507,26 @@ get_mariadb_slave_capability(THD *thd)
 
 
 /*
+  Get the value of the @slave_connect_state user variable into the supplied
+  String (this is the GTID connect state requested by the connecting slave).
+
+  Returns false if error (ie. slave did not set the variable and does not
+  want to use GTID to set start position), true if success.
+*/
+static bool
+get_slave_connect_state(THD *thd, String *out_str)
+{
+  bool null_value;
+
+  const LEX_STRING name= { C_STRING_WITH_LEN("slave_connect_state") };
+  user_var_entry *entry=
+    (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str,
+                                     name.length);
+  return entry && entry->val_str(&null_value, out_str, 0) && !null_value;
+}
+
+
+/*
   Function prepares and sends repliation heartbeat event.
 
   @param net                net object of THD
@@ -539,7 +561,7 @@ static int send_heartbeat_event(NET* net, String* packet,
   uint ident_len = strlen(p);
   ulong event_len = ident_len + LOG_EVENT_HEADER_LEN +
     (do_checksum ? BINLOG_CHECKSUM_LEN : 0);
-  int4store(header + SERVER_ID_OFFSET, server_id);
+  int4store(header + SERVER_ID_OFFSET, global_system_variables.server_id);
   int4store(header + EVENT_LEN_OFFSET, event_len);
   int2store(header + FLAGS_OFFSET, 0);
 
@@ -567,6 +589,643 @@ static int send_heartbeat_event(NET* net, String* packet,
 }
 
 
+/*
+  One node of a singly linked list of binlog file names, as read from the
+  binlog index file by get_binlog_list(). Both the node and the name string
+  are allocated on the MEM_ROOT passed to get_binlog_list().
+*/
+struct binlog_file_entry
+{
+  binlog_file_entry *next;
+  char *name;
+};
+
+/*
+  Read the binlog index file into a linked list allocated on memroot.
+
+  Each entry is pushed on the front of the list, so the returned list is in
+  the reverse of the order of the index file (last line of the index first).
+
+  Returns NULL if the binlog is not open or on out of memory (my_error() has
+  been called in both cases). Note that NULL is also returned, without any
+  error being raised, when the index file contains no entries; callers
+  cannot distinguish that case from an error by the return value alone.
+*/
+static binlog_file_entry *
+get_binlog_list(MEM_ROOT *memroot)
+{
+  IO_CACHE *index_file;
+  char fname[FN_REFLEN];
+  size_t length;
+  binlog_file_entry *current_list= NULL, *e;
+  DBUG_ENTER("get_binlog_list");
+
+  if (!mysql_bin_log.is_open())
+  {
+    my_error(ER_NO_BINARY_LOGGING, MYF(0));
+    DBUG_RETURN(NULL);
+  }
+
+  /* The index lock is held for the whole scan and released on every exit. */
+  mysql_bin_log.lock_index();
+  index_file=mysql_bin_log.get_index_file();
+  reinit_io_cache(index_file, READ_CACHE, (my_off_t) 0, 0, 0);
+
+  /* The file ends with EOF or empty line */
+  while ((length=my_b_gets(index_file, fname, sizeof(fname))) > 1)
+  {
+    --length;                                   /* Remove the newline */
+    if (!(e= (binlog_file_entry *)alloc_root(memroot, sizeof(*e))) ||
+        !(e->name= strmake_root(memroot, fname, length)))
+    {
+      mysql_bin_log.unlock_index();
+      /*
+        my_error() is variadic, so the argument type must match the
+        conversion in the message. Pass the byte count as int rather than
+        size_t (assumes ER_OUTOFMEMORY formats it with %d - confirm against
+        the error message definitions).
+      */
+      my_error(ER_OUTOFMEMORY, MYF(0), (int)length + 1 + (int)sizeof(*e));
+      DBUG_RETURN(NULL);
+    }
+    e->next= current_list;
+    current_list= e;
+  }
+  mysql_bin_log.unlock_index();
+
+  DBUG_RETURN(current_list);
+}
+
+/*
+  Find the Gtid_list_log_event at the start of a binlog.
+
+  NULL for ok, non-NULL error message for error.
+
+  If ok, then the event is returned in *out_gtid_list. This can be NULL if we
+  get back to binlogs written by old server version without GTID support. If
+  so, it means we have reached the point to start from, as no GTID events can
+  exist in earlier binlogs.
+*/
+static const char *
+get_gtid_list_event(IO_CACHE *cache, Gtid_list_log_event **out_gtid_list)
+{
+  Format_description_log_event init_fdle(BINLOG_VERSION);
+  Format_description_log_event *fdle;
+  Log_event *ev;
+  const char *errormsg = NULL;
+
+  /* Make sure the caller never sees a stale pointer on the error paths. */
+  *out_gtid_list= NULL;
+
+  /*
+    The first event must be the format description event; it is needed to
+    read the events that follow it.
+  */
+  if (!(ev= Log_event::read_log_event(cache, 0, &init_fdle,
+                                      opt_master_verify_checksum)) ||
+      ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
+  {
+    if (ev)
+      delete ev;
+    return "Could not read format description log event while looking for "
+      "GTID position in binlog";
+  }
+
+  /* From here on fdle is owned by this function and freed before return. */
+  fdle= static_cast<Format_description_log_event *>(ev);
+
+  for (;;)
+  {
+    Log_event_type typ;
+
+    ev= Log_event::read_log_event(cache, 0, fdle, opt_master_verify_checksum);
+    if (!ev)
+    {
+      errormsg= "Could not read GTID list event while looking for GTID "
+        "position in binlog";
+      break;
+    }
+    /* Remember the type first; ev is deleted below unless it is the one. */
+    typ= ev->get_type_code();
+    if (typ == GTID_LIST_EVENT)
+      break;                                    /* Done, found it */
+    delete ev;
+    if (typ == ROTATE_EVENT || typ == STOP_EVENT ||
+        typ == FORMAT_DESCRIPTION_EVENT)
+      continue;                                 /* Continue looking */
+
+    /* We did not find any Gtid_list_log_event, must be old binlog. */
+    ev= NULL;
+    break;
+  }
+
+  delete fdle;
+  /*
+    ev is either the Gtid_list_log_event (ownership passes to the caller) or
+    NULL (read error, or old binlog without such an event).
+  */
+  *out_gtid_list= static_cast<Gtid_list_log_event *>(ev);
+  return errormsg;
+}
+
+
+/*
+  Check if every GTID requested by the slave is contained in this (or a later)
+  binlog file. Return true if so, false if not.
+
+  We do the check with a single scan of the list of GTIDs, avoiding the need
+  to build an in-memory hash or stuff like that.
+
+  We need to check that slave did not request GTID D-S-N1, when the
+  Gtid_list_log_event for this binlog file has D-S-N2 with N2 > N1.
+
+  In addition, we need to check that we do not have a GTID D-S-N3 in the
+  Gtid_list_log_event where D is not present in the requested slave state at
+  all. Since if D is not in requested slave state, it means that slave needs
+  to start at the very first GTID in domain D.
+*/
+static bool
+contains_all_slave_gtid(slave_connection_state *st, Gtid_list_log_event *glev)
+{
+  uint32 i;
+
+  for (i= 0; i < glev->count; ++i)
+  {
+    const rpl_gtid *gtid= st->find(glev->list[i].domain_id);
+    if (!gtid)
+    {
+      /*
+        The slave needs to start from the very beginning of this domain, which
+        is in an earlier binlog file. So we need to search back further.
+      */
+      return false;
+    }
+    if (gtid->server_id == glev->list[i].server_id &&
+        gtid->seq_no < glev->list[i].seq_no)
+    {
+      /*
+        The slave needs to receive gtid, but it is contained in an earlier
+        binlog file. So we need to search back further.
+      */
+      return false;
+    }
+  }
+
+  return true;
+}
+
+
+/*
+  Check the start GTID state requested by the slave against our binlog state.
+
+  Give an error if the slave requests something that we do not have in our
+  binlog.
+
+  The connection state in st may be modified: entries can be rewritten to
+  the position the slave should actually start from, or removed altogether
+  when we have nothing in our binlog for that domain.
+
+  Returns 0 on success. Otherwise returns an error code and sets *errormsg;
+  for ER_GTID_POSITION_NOT_FOUND_IN_BINLOG, *error_gtid is set to the
+  offending GTID requested by the slave.
+*/
+
+static int
+check_slave_start_position(THD *thd, slave_connection_state *st,
+                           const char **errormsg, rpl_gtid *error_gtid)
+{
+  uint32 i;
+  bool found;
+  int err;
+  rpl_gtid **delete_list= NULL;
+  uint32 delete_idx= 0;
+  bool slave_state_loaded= false;
+  uint32 missing_domains= 0;
+  rpl_gtid missing_domain_gtid;
+
+  for (i= 0; i < st->hash.records; ++i)
+  {
+    rpl_gtid *slave_gtid= (rpl_gtid *)my_hash_element(&st->hash, i);
+    rpl_gtid master_gtid;
+    rpl_gtid master_replication_gtid;
+    rpl_gtid start_gtid;
+
+    /* Common case: the requested GTID is covered by our binlog state. */
+    if ((found= mysql_bin_log.find_in_binlog_state(slave_gtid->domain_id,
+                                                   slave_gtid->server_id,
+                                                   &master_gtid)) &&
+        master_gtid.seq_no >= slave_gtid->seq_no)
+      continue;
+
+    /* Load our own slave state lazily, only when it is first needed. */
+    if (!slave_state_loaded)
+    {
+      if (rpl_load_gtid_slave_state(thd))
+      {
+        *errormsg= "Failed to load replication slave GTID state";
+        err= ER_CANNOT_LOAD_SLAVE_GTID_STATE;
+        goto end;
+      }
+      slave_state_loaded= true;
+    }
+
+    if (!rpl_global_gtid_slave_state.domain_to_gtid(slave_gtid->domain_id,
+                                                    &master_replication_gtid) ||
+        slave_gtid->server_id != master_replication_gtid.server_id ||
+        slave_gtid->seq_no != master_replication_gtid.seq_no)
+    {
+      rpl_gtid domain_gtid;
+
+      if (!mysql_bin_log.lookup_domain_in_binlog_state(slave_gtid->domain_id,
+                                                       &domain_gtid))
+      {
+        /*
+          We do not have anything in this domain, neither in the binlog nor
+          in the slave state. So we are probably one master in a multi-master
+          setup, and this domain is served by a different master.
+
+          This is not an error, however if we are missing _all_ domains
+          requested by the slave, then we still give error (below, after
+          the loop).
+        */
+        if (!missing_domains)
+          missing_domain_gtid= *slave_gtid;
+        ++missing_domains;
+        continue;
+      }
+      *errormsg= "Requested slave GTID state not found in binlog";
+      *error_gtid= *slave_gtid;
+      err= ER_GTID_POSITION_NOT_FOUND_IN_BINLOG;
+      goto end;
+    }
+
+    /*
+      Ok, so connecting slave asked to start at a GTID that we do not have in
+      our binlog, but it was in fact the last GTID we applied earlier, when we
+      were acting as a replication slave.
+
+      So this means that we were running as a replication slave without
+      --log-slave-updates, but now we switched to be a master. It is worth it
+      to handle this special case, as it allows users to run a simple
+      master -> slave without --log-slave-updates, and then exchange slave and
+      master, as long as they make sure the slave is caught up before switching.
+    */
+
+    /*
+      First check if we logged something ourselves as a master after being a
+      slave. This will be seen as a GTID with our own server_id and bigger
+      seq_no than what is in the slave state.
+
+      If we did not log anything ourselves, then start the connecting slave
+      replicating from the current binlog end position, which in this case
+      corresponds to our replication slave state and hence what the connecting
+      slave is requesting.
+    */
+    if (mysql_bin_log.find_in_binlog_state(slave_gtid->domain_id,
+                                           global_system_variables.server_id,
+                                           &start_gtid) &&
+        start_gtid.seq_no > slave_gtid->seq_no)
+    {
+      /*
+        Start replication within this domain at the first GTID that we logged
+        ourselves after becoming a master.
+      */
+      slave_gtid->server_id= global_system_variables.server_id;
+    }
+    else if (mysql_bin_log.lookup_domain_in_binlog_state(slave_gtid->domain_id,
+                                                         &start_gtid))
+    {
+      slave_gtid->server_id= start_gtid.server_id;
+      slave_gtid->seq_no= start_gtid.seq_no;
+    }
+    else
+    {
+      /*
+        We do not have _anything_ in our own binlog for this domain. Just
+        delete the entry in the slave connection state, then it will pick up
+        anything new that arrives.
+
+        We just queue up the deletion and do it later, after the loop, so that
+        we do not mess up the iteration over the hash.
+      */
+      if (!delete_list)
+      {
+        /*
+          Every hash element can end up queued for deletion, so reserve room
+          for all of them. Fail only when the allocation returns NULL.
+        */
+        if (!(delete_list=
+              (rpl_gtid **)my_malloc(sizeof(*delete_list) * st->hash.records,
+                                     MYF(MY_WME))))
+        {
+          *errormsg= "Out of memory while checking slave start position";
+          err= ER_OUT_OF_RESOURCES;
+          goto end;
+        }
+      }
+      delete_list[delete_idx++]= slave_gtid;
+    }
+  }
+
+  if (missing_domains == st->hash.records && missing_domains > 0)
+  {
+    *errormsg= "Requested slave GTID state not found in binlog";
+    *error_gtid= missing_domain_gtid;
+    err= ER_GTID_POSITION_NOT_FOUND_IN_BINLOG;
+    goto end;
+  }
+
+  /* Do any delayed deletes from the hash. */
+  if (delete_list)
+  {
+    for (i= 0; i < delete_idx; ++i)
+      st->remove(delete_list[i]);
+  }
+  err= 0;
+
+end:
+  if (delete_list)
+    my_free(delete_list);
+  return err;
+}
+
+/*
+  Find the name of the binlog file to start reading for a slave that connects
+  using GTID state.
+
+  Returns the file name in out_name, which must be of size at least FN_REFLEN.
+
+  Returns NULL on ok, error message on error.
+
+  In case of non-error return, the returned binlog file is guaranteed to
+  contain the first event to be transmitted to the slave for every domain
+  present in our binlogs. It is still necessary to skip all GTIDs up to
+  and including the GTID requested by slave within each domain.
+
+  However, as a special case, if the event to be sent to the slave is the very
+  first event (within that domain) in the returned binlog, then nothing should
+  be skipped, so that domain is deleted from the passed in slave connection
+  state.
+
+  This is necessary in case the slave requests a GTID within a replication
+  domain that has long been inactive. The binlog file containing that GTID may
+  have been long since purged. However, as long as no GTIDs after that have
+  been purged, we have the GTID requested by slave in the Gtid_list_log_event
+  of the latest binlog. So we can start from there, as long as we delete the
+  corresponding entry in the slave state so we do not wrongly skip any events
+  that might turn up if that domain becomes active again, vainly looking for
+  the requested GTID that was already purged.
+
+  If a binlog file without any Gtid_list_log_event is reached (written by an
+  old server version), that file is returned and the slave connection state
+  is left untouched.
+*/
+static const char *
+gtid_find_binlog_file(slave_connection_state *state, char *out_name)
+{
+  MEM_ROOT memroot;
+  binlog_file_entry *list;
+  Gtid_list_log_event *glev= NULL;
+  const char *errormsg= NULL;
+  char buf[FN_REFLEN];
+
+  init_alloc_root(&memroot, 10*(FN_REFLEN+sizeof(binlog_file_entry)), 0,
+                  MYF(MY_THREAD_SPECIFIC));
+  if (!(list= get_binlog_list(&memroot)))
+  {
+    errormsg= "Out of memory while looking for GTID position in binlog";
+    goto end;
+  }
+
+  /* Walk the binlog files in the order returned by get_binlog_list(). */
+  while (list)
+  {
+    File file;
+    IO_CACHE cache;
+
+    if (!list->next)
+    {
+      /*
+        It should be safe to read the currently used binlog, as we will only
+        read the header part that is already written.
+
+        But if that does not work on windows, then we will need to cache the
+        event somewhere in memory I suppose - that could work too.
+      */
+    }
+    /*
+      Read the Gtid_list_log_event at the start of the binlog file to
+      get the binlog state.
+    */
+    if (normalize_binlog_name(buf, list->name, false))
+    {
+      errormsg= "Failed to determine binlog file name while looking for "
+        "GTID position in binlog";
+      goto end;
+    }
+    bzero((char*) &cache, sizeof(cache));
+    if ((file= open_binlog(&cache, buf, &errormsg)) == (File)-1)
+      goto end;
+    errormsg= get_gtid_list_event(&cache, &glev);
+    end_io_cache(&cache);
+    mysql_file_close(file, MYF(MY_WME));
+    if (errormsg)
+      goto end;
+
+    if (!glev || contains_all_slave_gtid(state, glev))
+    {
+      strmake(out_name, buf, FN_REFLEN);
+
+      /*
+        glev is NULL for an old binlog without Gtid_list_log_event; there is
+        then no GTID list to compare against, so only adjust the slave state
+        when we actually have one.
+      */
+      if (glev)
+      {
+        uint32 i;
+
+        /*
+          As a special case, we allow to start from binlog file N if the
+          requested GTID is the last event (in the corresponding domain) in
+          binlog file (N-1), but then we need to remove that GTID from the
+          slave state, rather than skipping events waiting for it to turn up.
+        */
+        for (i= 0; i < glev->count; ++i)
+        {
+          const rpl_gtid *gtid= state->find(glev->list[i].domain_id);
+          if (!gtid)
+          {
+            /* contains_all_slave_gtid() would have returned false if so. */
+            DBUG_ASSERT(0);
+            continue;
+          }
+          if (gtid->server_id == glev->list[i].server_id &&
+              gtid->seq_no == glev->list[i].seq_no)
+          {
+            /*
+              The slave requested to start from the very beginning of this
+              domain in this binlog file. So delete the entry from the state,
+              we do not need to skip anything.
+            */
+            state->remove(gtid);
+          }
+        }
+      }
+
+      goto end;
+    }
+    delete glev;
+    glev= NULL;
+    list= list->next;
+  }
+
+  /* We reached the end without finding anything. */
+  errormsg= "Could not find GTID state requested by slave in any binlog "
+    "files. Probably the slave state is too old and required binlog files "
+    "have been purged.";
+
+end:
+  if (glev)
+    delete glev;
+
+  free_root(&memroot, MYF(0));
+  return errormsg;
+}
+
+
+/*
+  Given an old-style binlog position with file name and file offset, find the
+  corresponding gtid position. If the offset is not at an event boundary, give
+  an error.
+
+  Return NULL on ok, error message string on error.
+
+  ToDo: Improve the performance of this by using binlog index files.
+*/
+static const char *
+gtid_state_from_pos(const char *name, uint32 offset,
+                    slave_connection_state *gtid_state)
+{
+  IO_CACHE cache;
+  File file;
+  const char *errormsg= NULL;
+  bool found_gtid_list_event= false;
+  bool found_format_description_event= false;
+  bool valid_pos= false;
+  uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
+  int err;
+  String packet;
+
+  /* Start from an empty state; it is filled in while scanning the binlog. */
+  if (gtid_state->load((const rpl_gtid *)NULL, 0))
+  {
+    errormsg= "Internal error (out of memory?) initializing slave state "
+      "while scanning binlog to find start position";
+    return errormsg;
+  }
+
+  if ((file= open_binlog(&cache, name, &errormsg)) == (File)-1)
+    return errormsg;
+
+  /*
+    First we need to find the initial GTID_LIST_EVENT. We need this even
+    if the offset is at the very start of the binlog file.
+
+    But if we do not find any GTID_LIST_EVENT, then this is an old binlog
+    with no GTID information, so we return empty GTID state.
+  */
+  for (;;)
+  {
+    Log_event_type typ;
+    uint32 cur_pos;
+
+    /* The requested offset is only valid if it falls on an event boundary. */
+    cur_pos= (uint32)my_b_tell(&cache);
+    if (cur_pos == offset)
+      valid_pos= true;
+    if (found_format_description_event && found_gtid_list_event &&
+        cur_pos >= offset)
+      break;
+
+    packet.length(0);
+    err= Log_event::read_log_event(&cache, &packet, NULL,
+                                   current_checksum_alg);
+    if (err)
+    {
+      errormsg= "Could not read binlog while searching for slave start "
+        "position on master";
+      goto end;
+    }
+    /*
+      The cast to uchar is needed to avoid a signed char being converted to a
+      negative number.
+    */
+    typ= (Log_event_type)(uchar)packet[EVENT_TYPE_OFFSET];
+    if (typ == FORMAT_DESCRIPTION_EVENT)
+    {
+      if (found_format_description_event)
+      {
+        errormsg= "Duplicate format description log event found while "
+          "searching for old-style position in binlog";
+        goto end;
+      }
+
+      current_checksum_alg= get_checksum_alg(packet.ptr(), packet.length());
+      found_format_description_event= true;
+    }
+    else if (!found_format_description_event)
+    {
+      /* Any other event type before the format description is an error. */
+      errormsg= "Did not find format description log event while searching "
+        "for old-style position in binlog";
+      goto end;
+    }
+    else if (typ == ROTATE_EVENT || typ == STOP_EVENT ||
+             typ == BINLOG_CHECKPOINT_EVENT)
+      continue;                                   /* Continue looking */
+    else if (typ == GTID_LIST_EVENT)
+    {
+      rpl_gtid *gtid_list;
+      bool status;
+      uint32 list_len;
+
+      if (found_gtid_list_event)
+      {
+        errormsg= "Found duplicate Gtid_list_log_event while scanning binlog "
+          "to find slave start position";
+        goto end;
+      }
+      status= Gtid_list_log_event::peek(packet.ptr(), packet.length(),
+                                        &gtid_list, &list_len);
+      if (status)
+      {
+        errormsg= "Error reading Gtid_list_log_event while searching "
+          "for old-style position in binlog";
+        goto end;
+      }
+      /* The list returned by peek() is copied by load() and then freed. */
+      err= gtid_state->load(gtid_list, list_len);
+      my_free(gtid_list);
+      if (err)
+      {
+        errormsg= "Internal error (out of memory?) initialising slave state "
+          "while scanning binlog to find start position";
+        goto end;
+      }
+      found_gtid_list_event= true;
+    }
+    else if (!found_gtid_list_event)
+    {
+      /* We did not find any Gtid_list_log_event, must be old binlog. */
+      goto end;
+    }
+    else if (typ == GTID_EVENT)
+    {
+      rpl_gtid gtid;
+      uchar flags2;
+      if (Gtid_log_event::peek(packet.ptr(), packet.length(), &gtid.domain_id,
+                               &gtid.server_id, &gtid.seq_no, &flags2))
+      {
+        errormsg= "Corrupt gtid_log_event found while scanning binlog to find "
+          "initial slave position";
+        goto end;
+      }
+      if (gtid_state->update(&gtid))
+      {
+        errormsg= "Internal error (out of memory?) updating slave state while "
+          "scanning binlog to find start position";
+        goto end;
+      }
+    }
+  }
+
+  if (!valid_pos)
+  {
+    /*
+      NOTE(review): this message contains %u and %s conversions, but it is
+      returned as a plain string; nothing in this function substitutes the
+      position or the file name into it.
+    */
+    errormsg= "Slave requested incorrect position in master binlog. "
+      "Requested position %u in file '%s', but this position does not "
+      "correspond to the location of any binlog event.";
+  }
+
+end:
+  end_io_cache(&cache);
+  mysql_file_close(file, MYF(MY_WME));
+
+  return errormsg;
+}
+
+
+/*
+  Convert an old-style binlog position (file name and offset) into the
+  corresponding GTID state, formatted as a string into out_str.
+
+  An empty or NULL in_name means that the log is looked up with a NULL name
+  in find_log_pos(). Offsets below 4 are rounded up to 4.
+
+  Returns 0 on success, 1 on error.
+*/
+int
+gtid_state_from_binlog_pos(const char *in_name, uint32 pos, String *out_str)
+{
+  slave_connection_state gtid_state;
+  const char *lookup_name;
+  char name_buf[FN_REFLEN];
+  LOG_INFO linfo;
+
+  if (!mysql_bin_log.is_open())
+  {
+    my_error(ER_NO_BINARY_LOGGING, MYF(0));
+    return 1;
+  }
+
+  if (in_name && in_name[0])
+  {
+    mysql_bin_log.make_log_name(name_buf, in_name);
+    lookup_name= name_buf;
+  }
+  else
+    lookup_name= NULL;
+  linfo.index_file_offset= 0;
+  if (mysql_bin_log.find_log_pos(&linfo, lookup_name, 1))
+    return 1;
+
+  if (pos < 4)
+    pos= 4;
+
+  if (gtid_state_from_pos(linfo.log_file_name, pos, &gtid_state) ||
+      gtid_state.to_string(out_str))
+    return 1;
+  return 0;
+}
+
+
 /*
   Helper function for mysql_binlog_send() to write an event down the slave
   connection.
@@ -577,9 +1236,63 @@ static const char *
 send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
                     Log_event_type event_type, char *log_file_name,
                     IO_CACHE *log, int mariadb_slave_capability,
-                    ulong ev_offset, uint8 current_checksum_alg)
+                    ulong ev_offset, uint8 current_checksum_alg,
+                    bool using_gtid_state, slave_connection_state *gtid_state,
+                    enum_gtid_skip_type *gtid_skip_group)
 {
   my_off_t pos;
+  size_t len= packet->length();
+
+  /* Skip GTID event groups until we reach slave position within a domain_id.
*/ + if (event_type == GTID_EVENT && using_gtid_state && gtid_state->count() > 0) + { + uint32 server_id, domain_id; + uint64 seq_no; + uchar flags2; + rpl_gtid *gtid; + + if (ev_offset > len || + Gtid_log_event::peek(packet->ptr()+ev_offset, len - ev_offset, + &domain_id, &server_id, &seq_no, &flags2)) + return "Failed to read Gtid_log_event: corrupt binlog"; + gtid= gtid_state->find(domain_id); + if (gtid != NULL) + { + /* Skip this event group if we have not yet reached slave start pos. */ + if (server_id != gtid->server_id || seq_no <= gtid->seq_no) + *gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ? + GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION); + /* + Delete this entry if we have reached slave start position (so we will + not skip subsequent events and won't have to look them up and check). + */ + if (server_id == gtid->server_id && seq_no >= gtid->seq_no) + gtid_state->remove(gtid); + } + } + + /* + Skip event group if we have not yet reached the correct slave GTID position. + + Note that slave that understands GTID can also tolerate holes, so there is + no need to supply dummy event. + */ + switch (*gtid_skip_group) + { + case GTID_SKIP_STANDALONE: + if (!Log_event::is_part_of_group(event_type)) + *gtid_skip_group= GTID_SKIP_NOT; + return NULL; + case GTID_SKIP_TRANSACTION: + if (event_type == XID_EVENT || + (event_type == QUERY_EVENT && + Query_log_event::peek_is_commit_rollback(packet->ptr() + ev_offset, + len - ev_offset))) + *gtid_skip_group= GTID_SKIP_NOT; + return NULL; + case GTID_SKIP_NOT: + break; + } /* Do not send annotate_rows events unless slave requested it. */ if (event_type == ANNOTATE_ROWS_EVENT && !(flags & BINLOG_SEND_ANNOTATE_ROWS_EVENT)) @@ -616,10 +1329,34 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags, } /* - Do not send binlog checkpoint events to a slave that does not understand it. 
+ Replace GTID events with old-style BEGIN events for slaves that do not + understand global transaction IDs. For stand-alone events, where there is + no terminating COMMIT query event, omit the GTID event or replace it with + a dummy event, as appropriate. + */ + if (event_type == GTID_EVENT && + mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_GTID) + { + bool need_dummy= + mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES; + bool err= Gtid_log_event::make_compatible_event(packet, &need_dummy, + ev_offset, + current_checksum_alg); + if (err) + return "Failed to replace GTID event with backwards-compatible event: " + "currupt event."; + if (!need_dummy) + return NULL; + } + + /* + Do not send binlog checkpoint or gtid list events to a slave that does not + understand it. */ - if (unlikely(event_type == BINLOG_CHECKPOINT_EVENT) && - mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_BINLOG_CHECKPOINT) + if ((unlikely(event_type == BINLOG_CHECKPOINT_EVENT) && + mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_BINLOG_CHECKPOINT) || + (unlikely(event_type == GTID_LIST_EVENT) && + mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_GTID)) { if (mariadb_slave_capability >= MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES) { @@ -634,8 +1371,8 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags, binlog positions. 
*/ if (Query_log_event::dummy_event(packet, ev_offset, current_checksum_alg)) - return "Failed to replace binlog checkpoint event with dummy: " - "too small event."; + return "Failed to replace binlog checkpoint or gtid list event with " + "dummy: too small event."; } } @@ -661,7 +1398,7 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags, (thd, flags, packet, log_file_name, pos))) return "run 'before_send_event' hook failed"; - if (my_net_write(net, (uchar*) packet->ptr(), packet->length())) + if (my_net_write(net, (uchar*) packet->ptr(), len)) return "Failed on my_net_write()"; DBUG_PRINT("info", ("log event code %d", (*packet)[LOG_EVENT_OFFSET+1] )); @@ -696,6 +1433,12 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, mysql_mutex_t *log_lock; mysql_cond_t *log_cond; int mariadb_slave_capability; + char str_buf[256]; + String connect_gtid_state(str_buf, sizeof(str_buf), system_charset_info); + bool using_gtid_state; + slave_connection_state gtid_state, return_gtid_state; + rpl_gtid error_gtid; + enum_gtid_skip_type gtid_skip_group= GTID_SKIP_NOT; uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF; int old_max_allowed_packet= thd->variables.max_allowed_packet; @@ -706,6 +1449,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, DBUG_PRINT("enter",("log_ident: '%s' pos: %ld", log_ident, (long) pos)); bzero((char*) &log,sizeof(log)); + bzero(&error_gtid, sizeof(error_gtid)); /* heartbeat_period from @master_heartbeat_period user variable */ @@ -722,9 +1466,25 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, set_timespec_nsec(*heartbeat_ts, 0); } mariadb_slave_capability= get_mariadb_slave_capability(thd); + + connect_gtid_state.length(0); + using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state); + DBUG_EXECUTE_IF("simulate_non_gtid_aware_master", using_gtid_state= false;); + /* + We want to corrupt the first event, in Log_event::read_log_event(). 
+ But we do not want the corruption to happen early, eg. when client does + BINLOG_GTID_POS(). So test case sets a DBUG trigger which causes us to + set the real DBUG injection here. + */ + DBUG_EXECUTE_IF("corrupt_read_log_event2_set", + { + DBUG_SET("-d,corrupt_read_log_event2_set"); + DBUG_SET("+d,corrupt_read_log_event2"); + }); + if (global_system_variables.log_warnings > 1) sql_print_information("Start binlog_dump to slave_server(%d), pos(%s, %lu)", - thd->server_id, log_ident, (ulong)pos); + (int)thd->variables.server_id, log_ident, (ulong)pos); if (RUN_HOOK(binlog_transmit, transmit_start, (thd, flags, log_ident, pos))) { errmsg= "Failed to run hook 'transmit_start'"; @@ -755,10 +1515,36 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, } name=search_file_name; - if (log_ident[0]) - mysql_bin_log.make_log_name(search_file_name, log_ident); + if (using_gtid_state) + { + if (gtid_state.load(connect_gtid_state.c_ptr_quick(), + connect_gtid_state.length())) + { + errmsg= "Out of memory or malformed slave request when obtaining start " + "position from GTID state"; + my_errno= ER_UNKNOWN_ERROR; + goto err; + } + if ((error= check_slave_start_position(thd, >id_state, &errmsg, + &error_gtid))) + { + my_errno= error; + goto err; + } + if ((errmsg= gtid_find_binlog_file(>id_state, search_file_name))) + { + my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; + goto err; + } + pos= 4; + } else - name=0; // Find first log + { + if (log_ident[0]) + mysql_bin_log.make_log_name(search_file_name, log_ident); + else + name=0; // Find first log + } linfo.index_file_offset = 0; @@ -1012,7 +1798,8 @@ impossible position"; if ((tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type, log_file_name, &log, mariadb_slave_capability, ev_offset, - current_checksum_alg))) + current_checksum_alg, using_gtid_state, + >id_state, >id_skip_group))) { errmsg= tmp_msg; my_errno= ER_UNKNOWN_ERROR; @@ -1105,7 +1892,8 @@ impossible position"; int ret; ulong signal_cnt; 
DBUG_PRINT("wait",("waiting for data in binary log")); - if (thd->server_id==0) // for mysqlbinlog (mysqlbinlog.server_id==0) + /* For mysqlbinlog (mysqlbinlog.server_id==0). */ + if (thd->variables.server_id==0) { mysql_mutex_unlock(log_lock); goto end; @@ -1172,7 +1960,9 @@ impossible position"; (tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type, log_file_name, &log, mariadb_slave_capability, ev_offset, - current_checksum_alg))) + current_checksum_alg, + using_gtid_state, >id_state, + >id_skip_group))) { errmsg= tmp_msg; my_errno= ER_UNKNOWN_ERROR; @@ -1264,6 +2054,22 @@ err: my_basename(p_coord->file_name), p_coord->pos, my_basename(log_file_name), my_b_tell(&log)); } + else if (my_errno == ER_GTID_POSITION_NOT_FOUND_IN_BINLOG) + { + my_snprintf(error_text, sizeof(error_text), + "Error: connecting slave requested to start from GTID " + "%u-%u-%llu, which is not in the master's binlog", + error_gtid.domain_id, error_gtid.server_id, error_gtid.seq_no); + /* Use this error code so slave will know not to try reconnect. 
*/ + my_errno = ER_MASTER_FATAL_ERROR_READING_BINLOG; + } + else if (my_errno == ER_CANNOT_LOAD_SLAVE_GTID_STATE) + { + my_snprintf(error_text, sizeof(error_text), + "Failed to load replication slave GTID state from table %s.%s", + "mysql", rpl_gtid_slave_state_table_name.str); + my_errno = ER_MASTER_FATAL_ERROR_READING_BINLOG; + } else strcpy(error_text, errmsg); end_io_cache(&log); @@ -1629,7 +2435,7 @@ void kill_zombie_dump_threads(uint32 slave_server_id) while ((tmp=it++)) { if (tmp->command == COM_BINLOG_DUMP && - tmp->server_id == slave_server_id) + tmp->variables.server_id == slave_server_id) { mysql_mutex_lock(&tmp->LOCK_thd_data); // Lock from delete break; @@ -1838,7 +2644,7 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added) { ulong s_id; get_dynamic(&lex_mi->repl_ignore_server_ids, (uchar*) &s_id, i); - if (s_id == ::server_id && replicate_same_server_id) + if (s_id == global_system_variables.server_id && replicate_same_server_id) { my_error(ER_SLAVE_IGNORE_SERVER_IDS, MYF(0), static_cast<int>(s_id)); ret= TRUE; @@ -1898,6 +2704,13 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added) mi->rli.group_relay_log_pos= mi->rli.event_relay_log_pos= lex_mi->relay_log_pos; } + if (lex_mi->use_gtid_opt == LEX_MASTER_INFO::LEX_MI_ENABLE) + mi->using_gtid= true; + else if (lex_mi->use_gtid_opt == LEX_MASTER_INFO::LEX_MI_DISABLE || + lex_mi->log_file_name || lex_mi->pos || + lex_mi->relay_log_name || lex_mi->relay_log_pos) + mi->using_gtid= false; + /* If user did specify neither host nor port nor any log name nor any log pos, i.e. he specified only user/password/master_connect_retry, he probably @@ -1928,6 +2741,7 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added) strmake(mi->master_log_name, mi->rli.group_master_log_name, sizeof(mi->master_log_name)-1); } + /* Relay log's IO_CACHE may not be inited, if rli->inited==0 (server was never a slave before). 
@@ -1950,6 +2764,16 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added)
       ret= TRUE;
       goto err;
     }
+
+    if (mi->using_gtid)
+    {
+      /*
+        Clear the position in the master binlogs, so that we request the
+        correct GTID position.
+      */
+      mi->master_log_name[0]= 0;
+      mi->master_log_pos= 0;
+    }
   }
   else
   {
@@ -2423,4 +3247,139 @@ int log_loaded_block(IO_CACHE* file)
   DBUG_RETURN(0);
 }
 
+
+/**
+  Initialise the slave replication state from the mysql.rpl_slave_state table.
+
+  This is called each time an SQL thread starts, but the data is only actually
+  loaded on the first call.
+
+  The slave state is the last GTID applied on the slave within each
+  replication domain.
+
+  To avoid row lock contention, there are multiple rows for each domain_id.
+  The one containing the current slave state is the one with the maximal
+  sub_id value, within each domain_id.
+
+    CREATE TABLE mysql.rpl_slave_state (
+      domain_id INT UNSIGNED NOT NULL,
+      sub_id BIGINT UNSIGNED NOT NULL,
+      server_id INT UNSIGNED NOT NULL,
+      seq_no BIGINT UNSIGNED NOT NULL,
+      PRIMARY KEY (domain_id, sub_id))
+*/
+
+void
+rpl_init_gtid_slave_state()
+{
+  rpl_global_gtid_slave_state.init();
+}
+
+
+void
+rpl_deinit_gtid_slave_state()
+{
+  rpl_global_gtid_slave_state.deinit();
+}
+
+
+/*
+  Format the current GTID state as a string, for use when connecting to a
+  master server with GTID, or for returning the value of @@global.gtid_state.
+
+  If the flag use_binlog is true, then the contents of the binary log (if
+  enabled) is merged into the current GTID state.
+
+  Returns 0 on success, or the error returned when fetching the most recent
+  GTID list from the binlog.
+*/
+int
+rpl_append_gtid_state(String *dest, bool use_binlog)
+{
+  int err;
+  rpl_gtid *gtid_list= NULL;
+  uint32 num_gtids= 0;
+
+  /* Only consult the binlog when the caller asked for it to be merged in. */
+  if (use_binlog && opt_bin_log &&
+      (err= mysql_bin_log.get_most_recent_gtid_list(&gtid_list, &num_gtids)))
+    return err;
+
+  /* gtid_list stays NULL (with num_gtids 0) when the binlog is not used. */
+  rpl_global_gtid_slave_state.tostring(dest, gtid_list, num_gtids);
+  my_free(gtid_list);
+
+  return 0;
+}
+
+
+/*
+  Validate a GTID position string supplied by the user.
+
+  Returns true (error) if the string cannot be parsed, or if our own binlog
+  contains transactions from this server that conflict with the requested
+  position; an error has been raised with my_error() in the latter case.
+  Returns false if the position is acceptable.
+*/
+bool
+rpl_gtid_pos_check(char *str, size_t len)
+{
+  slave_connection_state tmp_slave_state;
+
+  /* Check that we can parse the supplied string. */
+  if (tmp_slave_state.load(str, len))
+    return true;
+
+  /*
+    Check our own binlog for any of our own transactions that are newer
+    than the GTID state the user is requesting. Any such transactions would
+    result in an out-of-order binlog, which could break anyone replicating
+    with us as master.
+
+    So give an error if this is found, requesting the user to do a
+    RESET MASTER (to clean up the binlog) if they really want this.
+  */
+  if (mysql_bin_log.is_open())
+  {
+    rpl_gtid *binlog_gtid_list= NULL;
+    uint32 num_binlog_gtids= 0;
+    uint32 i;
+
+    if (mysql_bin_log.get_most_recent_gtid_list(&binlog_gtid_list,
+                                                &num_binlog_gtids))
+    {
+      my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME));
+      return true;
+    }
+    for (i= 0; i < num_binlog_gtids; ++i)
+    {
+      rpl_gtid *binlog_gtid= &binlog_gtid_list[i];
+      rpl_gtid *slave_gtid;
+      /* Only transactions originating on this server are of interest. */
+      if (binlog_gtid->server_id != global_system_variables.server_id)
+        continue;
+      if (!(slave_gtid= tmp_slave_state.find(binlog_gtid->domain_id)))
+      {
+        my_error(ER_MASTER_GTID_POS_MISSING_DOMAIN, MYF(0),
+                 binlog_gtid->domain_id, binlog_gtid->domain_id,
+                 binlog_gtid->server_id, binlog_gtid->seq_no);
+        break;
+      }
+      if (slave_gtid->seq_no < binlog_gtid->seq_no)
+      {
+        my_error(ER_MASTER_GTID_POS_CONFLICTS_WITH_BINLOG, MYF(0),
+                 slave_gtid->domain_id, slave_gtid->server_id,
+                 slave_gtid->seq_no, binlog_gtid->domain_id,
+                 binlog_gtid->server_id, binlog_gtid->seq_no);
+        break;
+      }
+    }
+    my_free(binlog_gtid_list);
+    /* An early break out of the loop means a conflict was reported. */
+    if (i != num_binlog_gtids)
+      return true;
+  }
+
+  return false;
+}
+
+
+/*
+  Load the GTID position given in str into the global slave state.
+
+  Returns false on success, true on error (ER_FAILED_GTID_STATE_INIT has
+  then been raised).
+*/
+bool
+rpl_gtid_pos_update(THD *thd, char *str, size_t len)
+{
+  if (rpl_global_gtid_slave_state.load(thd, str, len, true))
+  {
+    my_error(ER_FAILED_GTID_STATE_INIT, MYF(0));
+    return true;
+  }
+  else
+    return false;
+}
+
+
 #endif /* HAVE_REPLICATION */