diff options
Diffstat (limited to 'sql/sql_repl.cc')
-rw-r--r-- | sql/sql_repl.cc | 2396 |
1 files changed, 2178 insertions, 218 deletions
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 91acdc27bfb..8f335fd3dc2 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -14,12 +14,15 @@ along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335 USA */ +#include <my_global.h> #include "sql_priv.h" #include "unireg.h" +#include "sql_base.h" #include "sql_parse.h" // check_access #ifdef HAVE_REPLICATION #include "rpl_mi.h" +#include "rpl_rli.h" #include "sql_repl.h" #include "sql_acl.h" // SUPER_ACL #include "log_event.h" @@ -28,22 +31,122 @@ #include "rpl_handler.h" #include "debug_sync.h" + +enum enum_gtid_until_state { + GTID_UNTIL_NOT_DONE, + GTID_UNTIL_STOP_AFTER_STANDALONE, + GTID_UNTIL_STOP_AFTER_TRANSACTION +}; + + int max_binlog_dump_events = 0; // unlimited my_bool opt_sporadic_binlog_dump_fail = 0; #ifndef DBUG_OFF static int binlog_dump_count = 0; #endif -/** - a copy of active_mi->rli->slave_skip_counter, for showing in SHOW VARIABLES, - INFORMATION_SCHEMA.GLOBAL_VARIABLES and @@sql_slave_skip_counter without - taking all the mutexes needed to access active_mi->rli->slave_skip_counter - properly. -*/ -uint sql_slave_skip_counter; - extern TYPELIB binlog_checksum_typelib; + +static int +fake_event_header(String* packet, Log_event_type event_type, ulong extra_len, + my_bool *do_checksum, ha_checksum *crc, const char** errmsg, + uint8 checksum_alg_arg, uint32 end_pos) +{ + char header[LOG_EVENT_HEADER_LEN]; + ulong event_len; + + *do_checksum= checksum_alg_arg != BINLOG_CHECKSUM_ALG_OFF && + checksum_alg_arg != BINLOG_CHECKSUM_ALG_UNDEF; + + /* + 'when' (the timestamp) is set to 0 so that slave could distinguish between + real and fake Rotate events (if necessary) + */ + memset(header, 0, 4); + header[EVENT_TYPE_OFFSET] = (uchar)event_type; + event_len= LOG_EVENT_HEADER_LEN + extra_len + + (*do_checksum ? BINLOG_CHECKSUM_LEN : 0); + int4store(header + SERVER_ID_OFFSET, global_system_variables.server_id); + int4store(header + EVENT_LEN_OFFSET, event_len); + int2store(header + FLAGS_OFFSET, LOG_EVENT_ARTIFICIAL_F); + // TODO: check what problems this may cause and fix them + int4store(header + LOG_POS_OFFSET, end_pos); + if (packet->append(header, sizeof(header))) + { + *errmsg= "Failed due to out-of-memory writing event"; + return -1; + } + if (*do_checksum) + { + *crc= my_checksum(0, (uchar*)header, sizeof(header)); + } + return 0; +} + + +static int +fake_event_footer(String *packet, my_bool do_checksum, ha_checksum crc, const char **errmsg) +{ + if (do_checksum) + { + char b[BINLOG_CHECKSUM_LEN]; + int4store(b, crc); + if (packet->append(b, sizeof(b))) + { + *errmsg= "Failed due to out-of-memory writing event checksum"; + return -1; + } + } + return 0; +} + + +static int +fake_event_write(NET *net, String *packet, const char **errmsg) +{ + if (my_net_write(net, (uchar*) packet->ptr(), packet->length())) + { + *errmsg = "failed on my_net_write()"; + return -1; + } + return 0; +} + + +/* + Helper structure, used to pass miscellaneous info from mysql_binlog_send() + into the helper functions that it calls. +*/ +struct binlog_send_info { + rpl_binlog_state until_binlog_state; + slave_connection_state gtid_state; + THD *thd; + NET *net; + String *packet; + char *log_file_name; + slave_connection_state *until_gtid_state; + Format_description_log_event *fdev; + int mariadb_slave_capability; + enum_gtid_skip_type gtid_skip_group; + enum_gtid_until_state gtid_until_group; + ushort flags; + uint8 current_checksum_alg; + bool slave_gtid_strict_mode; + bool send_fake_gtid_list; + bool slave_gtid_ignore_duplicates; + bool using_gtid_state; + + binlog_send_info(THD *thd_arg, String *packet_arg, ushort flags_arg, char *lfn) + : thd(thd_arg), net(&thd_arg->net), packet(packet_arg), + log_file_name(lfn), until_gtid_state(NULL), fdev(NULL), + gtid_skip_group(GTID_SKIP_NOT), gtid_until_group(GTID_UNTIL_NOT_DONE), + flags(flags_arg), current_checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF), + slave_gtid_strict_mode(false), send_fake_gtid_list(false), + slave_gtid_ignore_duplicates(false) + { } +}; + /* fake_rotate_event() builds a fake (=which does not exist physically in any binlog) Rotate event, which contains the name of the binlog we are going to @@ -62,64 +165,77 @@ extern TYPELIB binlog_checksum_typelib; part. */ -static int fake_rotate_event(NET* net, String* packet, char* log_file_name, - ulonglong position, const char** errmsg, - uint8 checksum_alg_arg) +static int fake_rotate_event(binlog_send_info *info, ulonglong position, + const char** errmsg, uint8 checksum_alg_arg) { DBUG_ENTER("fake_rotate_event"); - char header[LOG_EVENT_HEADER_LEN], buf[ROTATE_HEADER_LEN+100]; - - /* - this Rotate is to be sent with checksum if and only if - slave's get_master_version_and_clock time handshake value - of master's @@global.binlog_checksum was TRUE - */ - - my_bool do_checksum= checksum_alg_arg != BINLOG_CHECKSUM_ALG_OFF && - checksum_alg_arg != BINLOG_CHECKSUM_ALG_UNDEF; - - /* - 'when' (the timestamp) is set to 0 so that slave could distinguish between - real and fake Rotate events (if necessary) - */ - memset(header, 0, 4); - header[EVENT_TYPE_OFFSET] = ROTATE_EVENT; - - char* p = log_file_name+dirname_length(log_file_name); + char buf[ROTATE_HEADER_LEN+100]; + my_bool do_checksum; + int err; + char* p = info->log_file_name+dirname_length(info->log_file_name); uint ident_len = (uint) strlen(p); - ulong event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN + - (do_checksum ? BINLOG_CHECKSUM_LEN : 0); - int4store(header + SERVER_ID_OFFSET, server_id); - int4store(header + EVENT_LEN_OFFSET, event_len); - int2store(header + FLAGS_OFFSET, LOG_EVENT_ARTIFICIAL_F); + String *packet= info->packet; + ha_checksum crc; - // TODO: check what problems this may cause and fix them - int4store(header + LOG_POS_OFFSET, 0); + if ((err= fake_event_header(packet, ROTATE_EVENT, + ident_len + ROTATE_HEADER_LEN, &do_checksum, &crc, + errmsg, checksum_alg_arg, 0))) + DBUG_RETURN(err); - packet->append(header, sizeof(header)); int8store(buf+R_POS_OFFSET,position); packet->append(buf, ROTATE_HEADER_LEN); packet->append(p, ident_len); if (do_checksum) { - char b[BINLOG_CHECKSUM_LEN]; - ha_checksum crc= my_checksum(0L, NULL, 0); - crc= my_checksum(crc, (uchar*)header, sizeof(header)); crc= my_checksum(crc, (uchar*)buf, ROTATE_HEADER_LEN); crc= my_checksum(crc, (uchar*)p, ident_len); - int4store(b, crc); - packet->append(b, sizeof(b)); } - if (my_net_write(net, (uchar*) packet->ptr(), packet->length())) + if ((err= fake_event_footer(packet, do_checksum, crc, errmsg)) || + (err= fake_event_write(info->net, packet, errmsg))) + DBUG_RETURN(err); + + DBUG_RETURN(0); +} + + +static int fake_gtid_list_event(binlog_send_info *info, + Gtid_list_log_event *glev, const char** errmsg, + uint32 current_pos) +{ + my_bool do_checksum; + int err; + ha_checksum crc; + char buf[128]; + String str(buf, sizeof(buf), system_charset_info); + String* packet= info->packet; + + str.length(0); + if (glev->to_packet(&str)) { - *errmsg = "failed on my_net_write()"; - DBUG_RETURN(-1); + *errmsg= "Failed due to out-of-memory writing Gtid_list event"; + return -1; } - DBUG_RETURN(0); + if ((err= fake_event_header(packet, GTID_LIST_EVENT, + str.length(), &do_checksum, &crc, + errmsg, info->current_checksum_alg, current_pos))) + return err; + + packet->append(str); + if (do_checksum) + { + crc= my_checksum(crc, (uchar*)str.ptr(), str.length()); + } + + if ((err= fake_event_footer(packet, do_checksum, crc, errmsg)) || + (err= fake_event_write(info->net, packet, errmsg))) + return err; + + return 0; } + /* Reset thread transmit packet buffer for event sending @@ -492,6 +608,94 @@ static ulonglong get_heartbeat_period(THD * thd) } /* + Lookup the capabilities of the slave, which it announces by setting a value + MARIA_SLAVE_CAPABILITY_XXX in @mariadb_slave_capability. + + Older MariaDB slaves, and other MySQL slaves, do not set + @mariadb_slave_capability, corresponding to a capability of + MARIA_SLAVE_CAPABILITY_UNKNOWN (0). +*/ +static int +get_mariadb_slave_capability(THD *thd) +{ + bool null_value; + const LEX_STRING name= { C_STRING_WITH_LEN("mariadb_slave_capability") }; + const user_var_entry *entry= + (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str, + name.length); + return entry ? + (int)(entry->val_int(&null_value)) : MARIA_SLAVE_CAPABILITY_UNKNOWN; +} + + +/* + Get the value of the @slave_connect_state user variable into the supplied + String (this is the GTID connect state requested by the connecting slave). + + Returns false if error (ie. slave did not set the variable and does not + want to use GTID to set start position), true if success. +*/ +static bool +get_slave_connect_state(THD *thd, String *out_str) +{ + bool null_value; + + const LEX_STRING name= { C_STRING_WITH_LEN("slave_connect_state") }; + user_var_entry *entry= + (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str, + name.length); + return entry && entry->val_str(&null_value, out_str, 0) && !null_value; +} + + +static bool +get_slave_gtid_strict_mode(THD *thd) +{ + bool null_value; + + const LEX_STRING name= { C_STRING_WITH_LEN("slave_gtid_strict_mode") }; + user_var_entry *entry= + (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str, + name.length); + return entry && entry->val_int(&null_value) && !null_value; +} + + +static bool +get_slave_gtid_ignore_duplicates(THD *thd) +{ + bool null_value; + + const LEX_STRING name= { C_STRING_WITH_LEN("slave_gtid_ignore_duplicates") }; + user_var_entry *entry= + (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str, + name.length); + return entry && entry->val_int(&null_value) && !null_value; +} + + +/* + Get the value of the @slave_until_gtid user variable into the supplied + String (this is the GTID position specified for START SLAVE UNTIL + master_gtid_pos='xxx'). + + Returns false if error (ie. slave did not set the variable and is not doing + START SLAVE UNTIL mater_gtid_pos='xxx'), true if success. +*/ +static bool +get_slave_until_gtid(THD *thd, String *out_str) +{ + bool null_value; + + const LEX_STRING name= { C_STRING_WITH_LEN("slave_until_gtid") }; + user_var_entry *entry= + (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str, + name.length); + return entry && entry->val_str(&null_value, out_str, 0) && !null_value; +} + + +/* Function prepares and sends repliation heartbeat event. @param net net object of THD @@ -526,7 +730,7 @@ static int send_heartbeat_event(NET* net, String* packet, uint ident_len = strlen(p); ulong event_len = ident_len + LOG_EVENT_HEADER_LEN + (do_checksum ? BINLOG_CHECKSUM_LEN : 0); - int4store(header + SERVER_ID_OFFSET, server_id); + int4store(header + SERVER_ID_OFFSET, global_system_variables.server_id); int4store(header + EVENT_LEN_OFFSET, event_len); int2store(header + FLAGS_OFFSET, 0); @@ -538,8 +742,8 @@ static int send_heartbeat_event(NET* net, String* packet, if (do_checksum) { char b[BINLOG_CHECKSUM_LEN]; - ha_checksum crc= my_checksum(0L, NULL, 0); - crc= my_checksum(crc, (uchar*) header, sizeof(header)); + ha_checksum crc; + crc= my_checksum(0, (uchar*) header, sizeof(header)); crc= my_checksum(crc, (uchar*) p, ident_len); int4store(b, crc); packet->append(b, sizeof(b)); @@ -554,6 +758,810 @@ static int send_heartbeat_event(NET* net, String* packet, } +struct binlog_file_entry +{ + binlog_file_entry *next; + char *name; +}; + +static binlog_file_entry * +get_binlog_list(MEM_ROOT *memroot) +{ + IO_CACHE *index_file; + char fname[FN_REFLEN]; + size_t length; + binlog_file_entry *current_list= NULL, *e; + DBUG_ENTER("get_binlog_list"); + + if (!mysql_bin_log.is_open()) + { + my_error(ER_NO_BINARY_LOGGING, MYF(0)); + DBUG_RETURN(NULL); + } + + mysql_bin_log.lock_index(); + index_file=mysql_bin_log.get_index_file(); + reinit_io_cache(index_file, READ_CACHE, (my_off_t) 0, 0, 0); + + /* The file ends with EOF or empty line */ + while ((length=my_b_gets(index_file, fname, sizeof(fname))) > 1) + { + --length; /* Remove the newline */ + if (!(e= (binlog_file_entry *)alloc_root(memroot, sizeof(*e))) || + !(e->name= strmake_root(memroot, fname, length))) + { + mysql_bin_log.unlock_index(); + my_error(ER_OUTOFMEMORY, MYF(0), length + 1 + sizeof(*e)); + DBUG_RETURN(NULL); + } + e->next= current_list; + current_list= e; + } + mysql_bin_log.unlock_index(); + + DBUG_RETURN(current_list); +} + +/* + Find the Gtid_list_log_event at the start of a binlog. + + NULL for ok, non-NULL error message for error. + + If ok, then the event is returned in *out_gtid_list. This can be NULL if we + get back to binlogs written by old server version without GTID support. If + so, it means we have reached the point to start from, as no GTID events can + exist in earlier binlogs. +*/ +static const char * +get_gtid_list_event(IO_CACHE *cache, Gtid_list_log_event **out_gtid_list) +{ + Format_description_log_event init_fdle(BINLOG_VERSION); + Format_description_log_event *fdle; + Log_event *ev; + const char *errormsg = NULL; + + *out_gtid_list= NULL; + + if (!(ev= Log_event::read_log_event(cache, 0, &init_fdle, + opt_master_verify_checksum)) || + ev->get_type_code() != FORMAT_DESCRIPTION_EVENT) + { + if (ev) + delete ev; + return "Could not read format description log event while looking for " + "GTID position in binlog"; + } + + fdle= static_cast<Format_description_log_event *>(ev); + + for (;;) + { + Log_event_type typ; + + ev= Log_event::read_log_event(cache, 0, fdle, opt_master_verify_checksum); + if (!ev) + { + errormsg= "Could not read GTID list event while looking for GTID " + "position in binlog"; + break; + } + typ= ev->get_type_code(); + if (typ == GTID_LIST_EVENT) + break; /* Done, found it */ + delete ev; + if (typ == ROTATE_EVENT || typ == STOP_EVENT || + typ == FORMAT_DESCRIPTION_EVENT) + continue; /* Continue looking */ + + /* We did not find any Gtid_list_log_event, must be old binlog. */ + ev= NULL; + break; + } + + delete fdle; + *out_gtid_list= static_cast<Gtid_list_log_event *>(ev); + return errormsg; +} + + +/* + Check if every GTID requested by the slave is contained in this (or a later) + binlog file. Return true if so, false if not. + + We do the check with a single scan of the list of GTIDs, avoiding the need + to build an in-memory hash or stuff like that. + + We need to check that slave did not request GTID D-S-N1, when the + Gtid_list_log_event for this binlog file has D-S-N2 with N2 >= N1. + (Because this means that requested GTID is in an earlier binlog). + However, if the Gtid_list_log_event indicates that D-S-N1 is the very last + GTID for domain D in prior binlog files, then it is ok to start from the + very start of this binlog file. This special case is important, as it + allows to purge old logs even if some domain is unused for long. + + In addition, we need to check that we do not have a GTID D-S-N3 in the + Gtid_list_log_event where D is not present in the requested slave state at + all. Since if D is not in requested slave state, it means that slave needs + to start at the very first GTID in domain D. +*/ +static bool +contains_all_slave_gtid(slave_connection_state *st, Gtid_list_log_event *glev) +{ + uint32 i; + + for (i= 0; i < glev->count; ++i) + { + uint32 gl_domain_id= glev->list[i].domain_id; + const rpl_gtid *gtid= st->find(gl_domain_id); + if (!gtid) + { + /* + The slave needs to start from the very beginning of this domain, which + is in an earlier binlog file. So we need to search back further. + */ + return false; + } + if (gtid->server_id == glev->list[i].server_id && + gtid->seq_no <= glev->list[i].seq_no) + { + /* + The slave needs to start after gtid, but it is contained in an earlier + binlog file. So we need to search back further, unless it was the very + last gtid logged for the domain in earlier binlog files. + */ + if (gtid->seq_no < glev->list[i].seq_no) + return false; + + /* + The slave requested D-S-N1, which happens to be the last GTID logged + in prior binlog files with same domain id D and server id S. + + The Gtid_list is kept sorted on domain_id, with the last GTID in each + domain_id group being the last one logged. So if this is the last GTID + within the domain_id group, then it is ok to start from the very + beginning of this group, per the special case explained in comment at + the start of this function. If not, then we need to search back further. + */ + if (i+1 < glev->count && gl_domain_id == glev->list[i+1].domain_id) + return false; + } + } + + return true; +} + + +static void +give_error_start_pos_missing_in_binlog(int *err, const char **errormsg, + rpl_gtid *error_gtid) +{ + rpl_gtid binlog_gtid; + + if (mysql_bin_log.lookup_domain_in_binlog_state(error_gtid->domain_id, + &binlog_gtid) && + binlog_gtid.seq_no >= error_gtid->seq_no) + { + *errormsg= "Requested slave GTID state not found in binlog. The slave has " + "probably diverged due to executing erroneous transactions"; + *err= ER_GTID_POSITION_NOT_FOUND_IN_BINLOG2; + } + else + { + *errormsg= "Requested slave GTID state not found in binlog"; + *err= ER_GTID_POSITION_NOT_FOUND_IN_BINLOG; + } +} + + +/* + Check the start GTID state requested by the slave against our binlog state. + + Give an error if the slave requests something that we do not have in our + binlog. +*/ + +static int +check_slave_start_position(binlog_send_info *info, const char **errormsg, + rpl_gtid *error_gtid) +{ + uint32 i; + int err; + slave_connection_state::entry **delete_list= NULL; + uint32 delete_idx= 0; + slave_connection_state *st= &info->gtid_state; + + if (rpl_load_gtid_slave_state(info->thd)) + { + *errormsg= "Failed to load replication slave GTID state"; + err= ER_CANNOT_LOAD_SLAVE_GTID_STATE; + goto end; + } + + for (i= 0; i < st->hash.records; ++i) + { + slave_connection_state::entry *slave_gtid_entry= + (slave_connection_state::entry *)my_hash_element(&st->hash, i); + rpl_gtid *slave_gtid= &slave_gtid_entry->gtid; + rpl_gtid master_gtid; + rpl_gtid master_replication_gtid; + rpl_gtid start_gtid; + bool start_at_own_slave_pos= + rpl_global_gtid_slave_state->domain_to_gtid(slave_gtid->domain_id, + &master_replication_gtid) && + slave_gtid->server_id == master_replication_gtid.server_id && + slave_gtid->seq_no == master_replication_gtid.seq_no; + + if (mysql_bin_log.find_in_binlog_state(slave_gtid->domain_id, + slave_gtid->server_id, + &master_gtid) && + master_gtid.seq_no >= slave_gtid->seq_no) + { + /* + If connecting slave requests to start at the GTID we last applied when + we were ourselves a slave, then this GTID may not exist in our binlog + (in case of --log-slave-updates=0). So set the flag to disable the + error about missing GTID in the binlog in this case. + */ + if (start_at_own_slave_pos) + slave_gtid_entry->flags|= slave_connection_state::START_OWN_SLAVE_POS; + continue; + } + + if (!start_at_own_slave_pos) + { + rpl_gtid domain_gtid; + slave_connection_state *until_gtid_state= info->until_gtid_state; + rpl_gtid *until_gtid; + + if (!mysql_bin_log.lookup_domain_in_binlog_state(slave_gtid->domain_id, + &domain_gtid)) + { + /* + We do not have anything in this domain, neither in the binlog nor + in the slave state. So we are probably one master in a multi-master + setup, and this domain is served by a different master. + + But set a flag so that if we then ever _do_ happen to encounter + anything in this domain, then we will re-check that the requested + slave position exists, and give the error at that time if not. + */ + slave_gtid_entry->flags|= slave_connection_state::START_ON_EMPTY_DOMAIN; + continue; + } + + if (info->slave_gtid_ignore_duplicates && + domain_gtid.seq_no < slave_gtid->seq_no) + { + /* + When --gtid-ignore-duplicates, it is ok for the slave to request + something that we do not have (yet) - they might already have gotten + it through another path in a multi-path replication hierarchy. + */ + continue; + } + + if (until_gtid_state && + ( !(until_gtid= until_gtid_state->find(slave_gtid->domain_id)) || + (mysql_bin_log.find_in_binlog_state(until_gtid->domain_id, + until_gtid->server_id, + &master_gtid) && + master_gtid.seq_no >= until_gtid->seq_no))) + { + /* + The slave requested to start from a position that is not (yet) in + our binlog, but it also specified an UNTIL condition that _is_ in + our binlog (or a missing UNTIL, which means stop at the very + beginning). So the stop position is before the start position, and + we just delete the entry from the UNTIL hash to mark that this + domain has already reached the UNTIL condition. + */ + if(until_gtid) + until_gtid_state->remove(until_gtid); + continue; + } + + *error_gtid= *slave_gtid; + give_error_start_pos_missing_in_binlog(&err, errormsg, error_gtid); + goto end; + } + + /* + Ok, so connecting slave asked to start at a GTID that we do not have in + our binlog, but it was in fact the last GTID we applied earlier, when we + were acting as a replication slave. + + So this means that we were running as a replication slave without + --log-slave-updates, but now we switched to be a master. It is worth it + to handle this special case, as it allows users to run a simple + master -> slave without --log-slave-updates, and then exchange slave and + master, as long as they make sure the slave is caught up before switching. + */ + + /* + First check if we logged something ourselves as a master after being a + slave. This will be seen as a GTID with our own server_id and bigger + seq_no than what is in the slave state. + + If we did not log anything ourselves, then start the connecting slave + replicating from the current binlog end position, which in this case + corresponds to our replication slave state and hence what the connecting + slave is requesting. + */ + if (mysql_bin_log.find_in_binlog_state(slave_gtid->domain_id, + global_system_variables.server_id, + &start_gtid) && + start_gtid.seq_no > slave_gtid->seq_no) + { + /* + Start replication within this domain at the first GTID that we logged + ourselves after becoming a master. + + Remember that this starting point is in fact a "fake" GTID which may + not exists in the binlog, so that we do not complain about it in + --gtid-strict-mode. + */ + slave_gtid->server_id= global_system_variables.server_id; + slave_gtid_entry->flags|= slave_connection_state::START_OWN_SLAVE_POS; + } + else if (mysql_bin_log.lookup_domain_in_binlog_state(slave_gtid->domain_id, + &start_gtid)) + { + slave_gtid->server_id= start_gtid.server_id; + slave_gtid->seq_no= start_gtid.seq_no; + } + else + { + /* + We do not have _anything_ in our own binlog for this domain. Just + delete the entry in the slave connection state, then it will pick up + anything new that arrives. + + We just queue up the deletion and do it later, after the loop, so that + we do not mess up the iteration over the hash. + */ + if (!delete_list) + { + if (!(delete_list= (slave_connection_state::entry **) + my_malloc(sizeof(*delete_list) * st->hash.records, MYF(MY_WME)))) + { + *errormsg= "Out of memory while checking slave start position"; + err= ER_OUT_OF_RESOURCES; + goto end; + } + } + delete_list[delete_idx++]= slave_gtid_entry; + } + } + + /* Do any delayed deletes from the hash. */ + if (delete_list) + { + for (i= 0; i < delete_idx; ++i) + st->remove(&(delete_list[i]->gtid)); + } + err= 0; + +end: + if (delete_list) + my_free(delete_list); + return err; +} + +/* + Find the name of the binlog file to start reading for a slave that connects + using GTID state. + + Returns the file name in out_name, which must be of size at least FN_REFLEN. + + Returns NULL on ok, error message on error. + + In case of non-error return, the returned binlog file is guaranteed to + contain the first event to be transmitted to the slave for every domain + present in our binlogs. It is still necessary to skip all GTIDs up to + and including the GTID requested by slave within each domain. + + However, as a special case, if the event to be sent to the slave is the very + first event (within that domain) in the returned binlog, then nothing should + be skipped, so that domain is deleted from the passed in slave connection + state. + + This is necessary in case the slave requests a GTID within a replication + domain that has long been inactive. The binlog file containing that GTID may + have been long since purged. However, as long as no GTIDs after that have + been purged, we have the GTID requested by slave in the Gtid_list_log_event + of the latest binlog. So we can start from there, as long as we delete the + corresponding entry in the slave state so we do not wrongly skip any events + that might turn up if that domain becomes active again, vainly looking for + the requested GTID that was already purged. +*/ +static const char * +gtid_find_binlog_file(slave_connection_state *state, char *out_name, + slave_connection_state *until_gtid_state) +{ + MEM_ROOT memroot; + binlog_file_entry *list; + Gtid_list_log_event *glev= NULL; + const char *errormsg= NULL; + char buf[FN_REFLEN]; + + init_alloc_root(&memroot, 10*(FN_REFLEN+sizeof(binlog_file_entry)), 0, + MYF(MY_THREAD_SPECIFIC)); + if (!(list= get_binlog_list(&memroot))) + { + errormsg= "Out of memory while looking for GTID position in binlog"; + goto end; + } + + while (list) + { + File file; + IO_CACHE cache; + + if (!list->next) + { + /* + It should be safe to read the currently used binlog, as we will only + read the header part that is already written. + + But if that does not work on windows, then we will need to cache the + event somewhere in memory I suppose - that could work too. + */ + } + /* + Read the Gtid_list_log_event at the start of the binlog file to + get the binlog state. + */ + if (normalize_binlog_name(buf, list->name, false)) + { + errormsg= "Failed to determine binlog file name while looking for " + "GTID position in binlog"; + goto end; + } + bzero((char*) &cache, sizeof(cache)); + if ((file= open_binlog(&cache, buf, &errormsg)) == (File)-1) + goto end; + errormsg= get_gtid_list_event(&cache, &glev); + end_io_cache(&cache); + mysql_file_close(file, MYF(MY_WME)); + if (errormsg) + goto end; + + if (!glev || contains_all_slave_gtid(state, glev)) + { + strmake(out_name, buf, FN_REFLEN); + + if (glev) + { + uint32 i; + + /* + As a special case, we allow to start from binlog file N if the + requested GTID is the last event (in the corresponding domain) in + binlog file (N-1), but then we need to remove that GTID from the slave + state, rather than skipping events waiting for it to turn up. + + If slave is doing START SLAVE UNTIL, check for any UNTIL conditions + that are already included in a previous binlog file. Delete any such + from the UNTIL hash, to mark that such domains have already reached + their UNTIL condition. + */ + for (i= 0; i < glev->count; ++i) + { + const rpl_gtid *gtid= state->find(glev->list[i].domain_id); + if (!gtid) + { + /* + Contains_all_slave_gtid() returns false if there is any domain in + Gtid_list_event which is not in the requested slave position. + + We may delete a domain from the slave state inside this loop, but + we only do this when it is the very last GTID logged for that + domain in earlier binlogs, and then we can not encounter it in any + further GTIDs in the Gtid_list. + */ + DBUG_ASSERT(0); + } else if (gtid->server_id == glev->list[i].server_id && + gtid->seq_no == glev->list[i].seq_no) + { + /* + The slave requested to start from the very beginning of this + domain in this binlog file. So delete the entry from the state, + we do not need to skip anything. + */ + state->remove(gtid); + } + + if (until_gtid_state && + (gtid= until_gtid_state->find(glev->list[i].domain_id)) && + gtid->server_id == glev->list[i].server_id && + gtid->seq_no <= glev->list[i].seq_no) + { + /* + We've already reached the stop position in UNTIL for this domain, + since it is before the start position. + */ + until_gtid_state->remove(gtid); + } + } + } + + goto end; + } + delete glev; + glev= NULL; + list= list->next; + } + + /* We reached the end without finding anything. */ + errormsg= "Could not find GTID state requested by slave in any binlog " + "files. Probably the slave state is too old and required binlog files " + "have been purged."; + +end: + if (glev) + delete glev; + + free_root(&memroot, MYF(0)); + return errormsg; +} + + +/* + Given an old-style binlog position with file name and file offset, find the + corresponding gtid position. If the offset is not at an event boundary, give + an error. + + Return NULL on ok, error message string on error. + + ToDo: Improve the performance of this by using binlog index files. +*/ +static const char * +gtid_state_from_pos(const char *name, uint32 offset, + slave_connection_state *gtid_state) +{ + IO_CACHE cache; + File file; + const char *errormsg= NULL; + bool found_gtid_list_event= false; + bool found_format_description_event= false; + bool valid_pos= false; + uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF; + int err; + String packet; + Format_description_log_event *fdev= NULL; + + if (gtid_state->load((const rpl_gtid *)NULL, 0)) + { + errormsg= "Internal error (out of memory?) initializing slave state " + "while scanning binlog to find start position"; + return errormsg; + } + + if ((file= open_binlog(&cache, name, &errormsg)) == (File)-1) + return errormsg; + + if (!(fdev= new Format_description_log_event(3))) + { + errormsg= "Out of memory initializing format_description event " + "while scanning binlog to find start position"; + goto end; + } + + /* + First we need to find the initial GTID_LIST_EVENT. We need this even + if the offset is at the very start of the binlog file. + + But if we do not find any GTID_LIST_EVENT, then this is an old binlog + with no GTID information, so we return empty GTID state. + */ + for (;;) + { + Log_event_type typ; + uint32 cur_pos; + + cur_pos= (uint32)my_b_tell(&cache); + if (cur_pos == offset) + valid_pos= true; + if (found_format_description_event && found_gtid_list_event && + cur_pos >= offset) + break; + + packet.length(0); + err= Log_event::read_log_event(&cache, &packet, NULL, + current_checksum_alg); + if (err) + { + errormsg= "Could not read binlog while searching for slave start " + "position on master"; + goto end; + } + /* + The cast to uchar is needed to avoid a signed char being converted to a + negative number. + */ + typ= (Log_event_type)(uchar)packet[EVENT_TYPE_OFFSET]; + if (typ == FORMAT_DESCRIPTION_EVENT) + { + Format_description_log_event *tmp; + + if (found_format_description_event) + { + errormsg= "Duplicate format description log event found while " + "searching for old-style position in binlog"; + goto end; + } + + current_checksum_alg= get_checksum_alg(packet.ptr(), packet.length()); + found_format_description_event= true; + if (!(tmp= new Format_description_log_event(packet.ptr(), packet.length(), + fdev))) + { + errormsg= "Corrupt Format_description event found or out-of-memory " + "while searching for old-style position in binlog"; + goto end; + } + delete fdev; + fdev= tmp; + } + else if (typ != FORMAT_DESCRIPTION_EVENT && !found_format_description_event) + { + errormsg= "Did not find format description log event while searching " + "for old-style position in binlog"; + goto end; + } + else if (typ == ROTATE_EVENT || typ == STOP_EVENT || + typ == BINLOG_CHECKPOINT_EVENT) + continue; /* Continue looking */ + else if (typ == GTID_LIST_EVENT) + { + rpl_gtid *gtid_list; + bool status; + uint32 list_len; + + if (found_gtid_list_event) + { + errormsg= "Found duplicate Gtid_list_log_event while scanning binlog " + "to find slave start position"; + goto end; + } + status= Gtid_list_log_event::peek(packet.ptr(), packet.length(), + current_checksum_alg, + >id_list, &list_len, fdev); + if (status) + { + errormsg= "Error reading Gtid_list_log_event while searching " + "for old-style position in binlog"; + goto end; + } + err= gtid_state->load(gtid_list, list_len); + my_free(gtid_list); + if (err) + { + errormsg= "Internal error (out of memory?) initialising slave state " + "while scanning binlog to find start position"; + goto end; + } + found_gtid_list_event= true; + } + else if (!found_gtid_list_event) + { + /* We did not find any Gtid_list_log_event, must be old binlog. */ + goto end; + } + else if (typ == GTID_EVENT) + { + rpl_gtid gtid; + uchar flags2; + if (Gtid_log_event::peek(packet.ptr(), packet.length(), + current_checksum_alg, >id.domain_id, + >id.server_id, >id.seq_no, &flags2, fdev)) + { + errormsg= "Corrupt gtid_log_event found while scanning binlog to find " + "initial slave position"; + goto end; + } + if (gtid_state->update(>id)) + { + errormsg= "Internal error (out of memory?) updating slave state while " + "scanning binlog to find start position"; + goto end; + } + } + } + + if (!valid_pos) + { + errormsg= "Slave requested incorrect position in master binlog. " + "Requested position %u in file '%s', but this position does not " + "correspond to the location of any binlog event."; + } + +end: + delete fdev; + end_io_cache(&cache); + mysql_file_close(file, MYF(MY_WME)); + + return errormsg; +} + + +int +gtid_state_from_binlog_pos(const char *in_name, uint32 pos, String *out_str) +{ + slave_connection_state gtid_state; + const char *lookup_name; + char name_buf[FN_REFLEN]; + LOG_INFO linfo; + + if (!mysql_bin_log.is_open()) + { + my_error(ER_NO_BINARY_LOGGING, MYF(0)); + return 1; + } + + if (in_name && in_name[0]) + { + mysql_bin_log.make_log_name(name_buf, in_name); + lookup_name= name_buf; + } + else + lookup_name= NULL; + linfo.index_file_offset= 0; + if (mysql_bin_log.find_log_pos(&linfo, lookup_name, 1)) + return 1; + + if (pos < 4) + pos= 4; + + if (gtid_state_from_pos(linfo.log_file_name, pos, >id_state) || + gtid_state.to_string(out_str)) + return 1; + return 0; +} + + +static bool +is_until_reached(binlog_send_info *info, ulong *ev_offset, + Log_event_type event_type, const char **errmsg, + uint32 current_pos) +{ + switch (info->gtid_until_group) + { + case GTID_UNTIL_NOT_DONE: + return false; + case GTID_UNTIL_STOP_AFTER_STANDALONE: + if (Log_event::is_part_of_group(event_type)) + return false; + break; + case GTID_UNTIL_STOP_AFTER_TRANSACTION: + if (event_type != XID_EVENT && + (event_type != QUERY_EVENT || + !Query_log_event::peek_is_commit_rollback + (info->packet->ptr()+*ev_offset, + info->packet->length()-*ev_offset, + info->current_checksum_alg))) + return false; + break; + } + + /* + The last event group has been sent, now the START SLAVE UNTIL condition + has been reached. + + Send a last fake Gtid_list_log_event with a flag set to mark that we + stop due to UNTIL condition. + */ + if (reset_transmit_packet(info->thd, info->flags, ev_offset, errmsg)) + return true; + Gtid_list_log_event glev(&info->until_binlog_state, + Gtid_list_log_event::FLAG_UNTIL_REACHED); + if (fake_gtid_list_event(info, &glev, errmsg, current_pos)) + return true; + *errmsg= NULL; + return true; +} + + /* Helper function for mysql_binlog_send() to write an event down the slave connection. @@ -561,22 +1569,313 @@ static int send_heartbeat_event(NET* net, String* packet, Returns NULL on success, error message string on error. */ static const char * -send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags, - Log_event_type event_type, char *log_file_name, - IO_CACHE *log) +send_event_to_slave(binlog_send_info *info, Log_event_type event_type, + IO_CACHE *log, ulong ev_offset, rpl_gtid *error_gtid) { my_off_t pos; + String* const packet= info->packet; + size_t len= packet->length(); + int mariadb_slave_capability= info->mariadb_slave_capability; + uint8 current_checksum_alg= info->current_checksum_alg; + slave_connection_state *gtid_state= &info->gtid_state; + slave_connection_state *until_gtid_state= info->until_gtid_state; + + if (event_type == GTID_LIST_EVENT && + info->using_gtid_state && until_gtid_state) + { + rpl_gtid *gtid_list; + uint32 list_len; + bool err; + + if (ev_offset > len || + Gtid_list_log_event::peek(packet->ptr()+ev_offset, len - ev_offset, + current_checksum_alg, + >id_list, &list_len, info->fdev)) + { + my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; + return "Failed to read Gtid_list_log_event: corrupt binlog"; + } + err= info->until_binlog_state.load(gtid_list, list_len); + my_free(gtid_list); + if (err) + { + my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; + return "Failed in internal GTID book-keeping: Out of memory"; + } + } + + /* Skip GTID event groups until we reach slave position within a domain_id. */ + if (event_type == GTID_EVENT && info->using_gtid_state) + { + uchar flags2; + slave_connection_state::entry *gtid_entry; + rpl_gtid *gtid; + + if (gtid_state->count() > 0 || until_gtid_state) + { + rpl_gtid event_gtid; + + if (ev_offset > len || + Gtid_log_event::peek(packet->ptr()+ev_offset, len - ev_offset, + current_checksum_alg, + &event_gtid.domain_id, &event_gtid.server_id, + &event_gtid.seq_no, &flags2, info->fdev)) + { + my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; + return "Failed to read Gtid_log_event: corrupt binlog"; + } + + DBUG_EXECUTE_IF("gtid_force_reconnect_at_10_1_100", + { + rpl_gtid *dbug_gtid; + if ((dbug_gtid= info->until_binlog_state.find_nolock(10,1)) && + dbug_gtid->seq_no == 100) + { + DBUG_SET("-d,gtid_force_reconnect_at_10_1_100"); + DBUG_SET_INITIAL("-d,gtid_force_reconnect_at_10_1_100"); + my_errno= ER_UNKNOWN_ERROR; + return "DBUG-injected forced reconnect"; + } + }); + + if (info->until_binlog_state.update_nolock(&event_gtid, false)) + { + my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; + return "Failed in internal GTID book-keeping: Out of memory"; + } + + if (gtid_state->count() > 0) + { + gtid_entry= gtid_state->find_entry(event_gtid.domain_id); + if (gtid_entry != NULL) + { + gtid= >id_entry->gtid; + if (gtid_entry->flags & slave_connection_state::START_ON_EMPTY_DOMAIN) + { + rpl_gtid master_gtid; + if (!mysql_bin_log.find_in_binlog_state(gtid->domain_id, + gtid->server_id, + &master_gtid) || + master_gtid.seq_no < gtid->seq_no) + { + int err; + const char *errormsg; + *error_gtid= *gtid; + give_error_start_pos_missing_in_binlog(&err, &errormsg, error_gtid); + my_errno= err; + return errormsg; + } + gtid_entry->flags&= ~(uint32)slave_connection_state::START_ON_EMPTY_DOMAIN; + } + + /* Skip this event group if we have not yet reached slave start pos. */ + if (event_gtid.server_id != gtid->server_id || + event_gtid.seq_no <= gtid->seq_no) + info->gtid_skip_group= (flags2 & Gtid_log_event::FL_STANDALONE ? + GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION); + if (event_gtid.server_id == gtid->server_id && + event_gtid.seq_no >= gtid->seq_no) + { + if (info->slave_gtid_strict_mode && + event_gtid.seq_no > gtid->seq_no && + !(gtid_entry->flags & slave_connection_state::START_OWN_SLAVE_POS)) + { + /* + In strict mode, it is an error if the slave requests to start + in a "hole" in the master's binlog: a GTID that does not + exist, even though both the prior and subsequent seq_no exists + for same domain_id and server_id. + */ + my_errno= ER_GTID_START_FROM_BINLOG_HOLE; + *error_gtid= *gtid; + return "The binlog on the master is missing the GTID requested " + "by the slave (even though both a prior and a subsequent " + "sequence number does exist), and GTID strict mode is enabled."; + } + + /* + Send a fake Gtid_list event to the slave. + This allows the slave to update its current binlog position + so MASTER_POS_WAIT() and MASTER_GTID_WAIT() can work. + The fake event will be sent at the end of this event group. + */ + info->send_fake_gtid_list= true; + + /* + Delete this entry if we have reached slave start position (so we + will not skip subsequent events and won't have to look them up + and check). + */ + gtid_state->remove(gtid); + } + } + } + + if (until_gtid_state) + { + gtid= until_gtid_state->find(event_gtid.domain_id); + if (gtid == NULL) + { + /* + This domain already reached the START SLAVE UNTIL stop condition, + so skip this event group. + */ + info->gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ? + GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION); + } + else if (event_gtid.server_id == gtid->server_id && + event_gtid.seq_no >= gtid->seq_no) + { + /* + We have reached the stop condition. + Delete this domain_id from the hash, so we will skip all further + events in this domain and eventually stop when all domains are + done. + */ + uint64 until_seq_no= gtid->seq_no; + until_gtid_state->remove(gtid); + if (until_gtid_state->count() == 0) + info->gtid_until_group= (flags2 & Gtid_log_event::FL_STANDALONE ? + GTID_UNTIL_STOP_AFTER_STANDALONE : + GTID_UNTIL_STOP_AFTER_TRANSACTION); + if (event_gtid.seq_no > until_seq_no) + { + /* + The GTID in START SLAVE UNTIL condition is missing in our binlog. + This should normally not happen (user error), but since we can be + sure that we are now beyond the position that the UNTIL condition + should be in, we can just stop now. And we also need to skip this + event group (as it is beyond the UNTIL condition). + */ + info->gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ? + GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION); + } + } + } + } + } + + /* + Skip event group if we have not yet reached the correct slave GTID position. + + Note that slave that understands GTID can also tolerate holes, so there is + no need to supply dummy event. + */ + switch (info->gtid_skip_group) + { + case GTID_SKIP_STANDALONE: + if (!Log_event::is_part_of_group(event_type)) + info->gtid_skip_group= GTID_SKIP_NOT; + return NULL; + case GTID_SKIP_TRANSACTION: + if (event_type == XID_EVENT || + (event_type == QUERY_EVENT && + Query_log_event::peek_is_commit_rollback(packet->ptr() + ev_offset, + len - ev_offset, + current_checksum_alg))) + info->gtid_skip_group= GTID_SKIP_NOT; + return NULL; + case GTID_SKIP_NOT: + break; + } /* Do not send annotate_rows events unless slave requested it. */ if (event_type == ANNOTATE_ROWS_EVENT && - !(flags & BINLOG_SEND_ANNOTATE_ROWS_EVENT)) - return NULL; + !(info->flags & BINLOG_SEND_ANNOTATE_ROWS_EVENT)) + { + if (mariadb_slave_capability >= MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES) + { + /* This slave can tolerate events omitted from the binlog stream. */ + return NULL; + } + else if (mariadb_slave_capability >= MARIA_SLAVE_CAPABILITY_ANNOTATE) + { + /* + The slave did not request ANNOTATE_ROWS_EVENT (it does not need them as + it will not log them in its own binary log). However, it understands the + event and will just ignore it, and it would break if we omitted it, + leaving a hole in the binlog stream. So just send the event as-is. + */ + } + else + { + /* + The slave does not understand ANNOTATE_ROWS_EVENT. + + Older MariaDB slaves (and MySQL slaves) will break replication if there + are holes in the binlog stream (they will miscompute the binlog offset + and request the wrong position when reconnecting). + + So replace the event with a dummy event of the same size that will be + a no-operation on the slave. + */ + if (Query_log_event::dummy_event(packet, ev_offset, current_checksum_alg)) + { + my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; + return "Failed to replace row annotate event with dummy: too small event."; + } + } + } + + /* + Replace GTID events with old-style BEGIN events for slaves that do not + understand global transaction IDs. For stand-alone events, where there is + no terminating COMMIT query event, omit the GTID event or replace it with + a dummy event, as appropriate. + */ + if (event_type == GTID_EVENT && + mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_GTID) + { + bool need_dummy= + mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES; + bool err= Gtid_log_event::make_compatible_event(packet, &need_dummy, + ev_offset, + current_checksum_alg); + if (err) + { + my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; + return "Failed to replace GTID event with backwards-compatible event: " + "currupt event."; + } + if (!need_dummy) + return NULL; + } + + /* + Do not send binlog checkpoint or gtid list events to a slave that does not + understand it. + */ + if ((unlikely(event_type == BINLOG_CHECKPOINT_EVENT) && + mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_BINLOG_CHECKPOINT) || + (unlikely(event_type == GTID_LIST_EVENT) && + mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_GTID)) + { + if (mariadb_slave_capability >= MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES) + { + /* This slave can tolerate events omitted from the binlog stream. */ + return NULL; + } + else + { + /* + The slave does not understand BINLOG_CHECKPOINT_EVENT. Send a dummy + event instead, with same length so slave does not get confused about + binlog positions. + */ + if (Query_log_event::dummy_event(packet, ev_offset, current_checksum_alg)) + { + my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; + return "Failed to replace binlog checkpoint or gtid list event with " + "dummy: too small event."; + } + } + } /* Skip events with the @@skip_replication flag set, if slave requested skipping of such events. */ - if (thd->variables.option_bits & OPTION_SKIP_REPLICATION) + if (info->thd->variables.option_bits & OPTION_SKIP_REPLICATION) { /* The first byte of the packet is a '\0' to distinguish it from an error @@ -587,29 +1886,43 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags, return NULL; } - thd_proc_info(thd, "Sending binlog event to slave"); + THD_STAGE_INFO(info->thd, stage_sending_binlog_event_to_slave); pos= my_b_tell(log); if (RUN_HOOK(binlog_transmit, before_send_event, - (thd, flags, packet, log_file_name, pos))) + (info->thd, info->flags, packet, info->log_file_name, pos))) + { + my_errno= ER_UNKNOWN_ERROR; return "run 'before_send_event' hook failed"; + } - if (my_net_write(net, (uchar*) packet->ptr(), packet->length())) + if (my_net_write(info->net, (uchar*) packet->ptr(), len)) + { + my_errno= ER_UNKNOWN_ERROR; return "Failed on my_net_write()"; + } DBUG_PRINT("info", ("log event code %d", (*packet)[LOG_EVENT_OFFSET+1] )); if (event_type == LOAD_EVENT) { - if (send_file(thd)) + if (send_file(info->thd)) + { + my_errno= ER_UNKNOWN_ERROR; return "failed in send_file()"; + } } - if (RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet))) + if (RUN_HOOK(binlog_transmit, after_send_event, + (info->thd, info->flags, packet))) + { + my_errno= ER_UNKNOWN_ERROR; return "Failed to run hook 'after_send_event'"; + } return NULL; /* Success */ } + void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, ushort flags) { @@ -621,24 +1934,32 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, IO_CACHE log; File file = -1; - String* const packet = &thd->packet; + String* const packet= &thd->packet; int error; const char *errmsg = "Unknown error", *tmp_msg; char error_text[MAX_SLAVE_ERRMSG]; // to be send to slave via my_message() - NET* net = &thd->net; mysql_mutex_t *log_lock; mysql_cond_t *log_cond; + char str_buf[128]; + String connect_gtid_state(str_buf, sizeof(str_buf), system_charset_info); + char str_buf2[128]; + String slave_until_gtid_str(str_buf2, sizeof(str_buf2), system_charset_info); + slave_connection_state until_gtid_state_obj; + rpl_gtid error_gtid; + binlog_send_info info(thd, packet, flags, log_file_name); + bool has_transmit_started= false; - uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF; int old_max_allowed_packet= thd->variables.max_allowed_packet; #ifndef DBUG_OFF int left_events = max_binlog_dump_events; + uint dbug_reconnect_counter= 0; #endif DBUG_ENTER("mysql_binlog_send"); DBUG_PRINT("enter",("log_ident: '%s' pos: %ld", log_ident, (long) pos)); bzero((char*) &log,sizeof(log)); + bzero(&error_gtid, sizeof(error_gtid)); /* heartbeat_period from @master_heartbeat_period user variable */ @@ -649,21 +1970,43 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, *p_start_coord= &start_coord; LOG_POS_COORD coord_buf= { log_file_name, BIN_LOG_HEADER_SIZE }, *p_coord= &coord_buf; - if (heartbeat_period != LL(0)) + if (heartbeat_period != 0) { heartbeat_ts= &heartbeat_buf; set_timespec_nsec(*heartbeat_ts, 0); } - if (global_system_variables.log_warnings > 1) - sql_print_information("Start binlog_dump to slave_server(%u), pos(%s, %lu)", - thd->server_id, log_ident, (ulong)pos); - if (RUN_HOOK(binlog_transmit, transmit_start, (thd, flags, log_ident, pos))) + info.mariadb_slave_capability= get_mariadb_slave_capability(thd); + + connect_gtid_state.length(0); + info.using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state); + DBUG_EXECUTE_IF("simulate_non_gtid_aware_master", info.using_gtid_state= false;); + if (info.using_gtid_state) { - errmsg= "Failed to run hook 'transmit_start'"; - my_errno= ER_UNKNOWN_ERROR; - goto err; + info.slave_gtid_strict_mode= get_slave_gtid_strict_mode(thd); + info.slave_gtid_ignore_duplicates= get_slave_gtid_ignore_duplicates(thd); + if(get_slave_until_gtid(thd, &slave_until_gtid_str)) + info.until_gtid_state= &until_gtid_state_obj; } + DBUG_EXECUTE_IF("binlog_force_reconnect_after_22_events", + { + DBUG_SET("-d,binlog_force_reconnect_after_22_events"); + DBUG_SET_INITIAL("-d,binlog_force_reconnect_after_22_events"); + dbug_reconnect_counter= 22; + }); + + /* + We want to corrupt the first event, in Log_event::read_log_event(). + But we do not want the corruption to happen early, eg. when client does + BINLOG_GTID_POS(). So test case sets a DBUG trigger which causes us to + set the real DBUG injection here. + */ + DBUG_EXECUTE_IF("corrupt_read_log_event2_set", + { + DBUG_SET("-d,corrupt_read_log_event2_set"); + DBUG_SET("+d,corrupt_read_log_event2"); + }); + #ifndef DBUG_OFF if (opt_sporadic_binlog_dump_fail && (binlog_dump_count++ % 2)) { @@ -673,6 +2016,13 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, } #endif + if (!(info.fdev= new Format_description_log_event(3))) + { + errmsg= "Out of memory initializing format_description event"; + my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; + goto err; + } + if (!mysql_bin_log.is_open()) { errmsg = "Binary log is not open"; @@ -687,10 +2037,45 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, } name=search_file_name; - if (log_ident[0]) - mysql_bin_log.make_log_name(search_file_name, log_ident); + if (info.using_gtid_state) + { + if (info.gtid_state.load(connect_gtid_state.c_ptr_quick(), + connect_gtid_state.length())) + { + errmsg= "Out of memory or malformed slave request when obtaining start " + "position from GTID state"; + my_errno= ER_UNKNOWN_ERROR; + goto err; + } + if (info.until_gtid_state && + info.until_gtid_state->load(slave_until_gtid_str.c_ptr_quick(), + slave_until_gtid_str.length())) + { + errmsg= "Out of memory or malformed slave request when obtaining UNTIL " + "position sent from slave"; + my_errno= ER_UNKNOWN_ERROR; + goto err; + } + if ((error= check_slave_start_position(&info, &errmsg, &error_gtid))) + { + my_errno= error; + goto err; + } + if ((errmsg= gtid_find_binlog_file(&info.gtid_state, search_file_name, + info.until_gtid_state))) + { + my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; + goto err; + } + pos= 4; + } else - name=0; // Find first log + { + if (log_ident[0]) + mysql_bin_log.make_log_name(search_file_name, log_ident); + else + name=0; // Find first log + } linfo.index_file_offset = 0; @@ -718,6 +2103,17 @@ impossible position"; goto err; } + if (global_system_variables.log_warnings > 1) + sql_print_information("Start binlog_dump to slave_server(%lu), pos(%s, %lu)", + thd->variables.server_id, log_ident, (ulong)pos); + if (RUN_HOOK(binlog_transmit, transmit_start, (thd, flags, log_ident, pos))) + { + errmsg= "Failed to run hook 'transmit_start'"; + my_errno= ER_UNKNOWN_ERROR; + goto err; + } + has_transmit_started= true; + /* reset transmit packet for the fake rotate event below */ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg)) goto err; @@ -750,7 +2146,7 @@ impossible position"; given that we want minimum modification of 4.0, we send the normal and fake Rotates. */ - if (fake_rotate_event(net, packet, log_file_name, pos, &errmsg, + if (fake_rotate_event(&info, pos, &errmsg, get_binlog_checksum_value_at_connect(thd))) { /* @@ -800,14 +2196,16 @@ impossible position"; (*packet)[EVENT_TYPE_OFFSET+ev_offset])); if ((*packet)[EVENT_TYPE_OFFSET+ev_offset] == FORMAT_DESCRIPTION_EVENT) { - current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset, - packet->length() - ev_offset); - DBUG_ASSERT(current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF || - current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF || - current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32); + Format_description_log_event *tmp; + + info.current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset, + packet->length() - ev_offset); + DBUG_ASSERT(info.current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF || + info.current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF || + info.current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32); if (!is_slave_checksum_aware(thd) && - current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF && - current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) + info.current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF && + info.current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) { my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; errmsg= "Slave can not handle replication events with the checksum " @@ -817,6 +2215,18 @@ impossible position"; "slaves that cannot process them"); goto err; } + + if (!(tmp= new Format_description_log_event(packet->ptr()+ev_offset, + packet->length()-ev_offset, + info.fdev))) + { + my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; + errmsg= "Corrupt Format_description event found or out-of-memory"; + goto err; + } + delete info.fdev; + info.fdev= tmp; + (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F; /* mark that this event with "log_pos=0", so the slave @@ -832,12 +2242,12 @@ impossible position"; ST_CREATED_OFFSET+ev_offset, (ulong) 0); /* fix the checksum due to latest changes in header */ - if (current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF && - current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) + if (info.current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF && + info.current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) fix_checksum(packet, ev_offset); /* send it */ - if (my_net_write(net, (uchar*) packet->ptr(), packet->length())) + if (my_net_write(info.net, (uchar*) packet->ptr(), packet->length())) { errmsg = "Failed on my_net_write()"; my_errno= ER_UNKNOWN_ERROR; @@ -867,12 +2277,22 @@ impossible position"; /* The Format_description_log_event event will be found naturally. */ } + /* + Handle the case of START SLAVE UNTIL with an UNTIL condition already + fulfilled at the start position. + + We will send one event, the format_description, and then stop. + */ + if (info.until_gtid_state && info.until_gtid_state->count() == 0) + info.gtid_until_group= GTID_UNTIL_STOP_AFTER_STANDALONE; + /* seek to the requested position, to start the requested dump */ my_b_seek(&log, pos); // Seek will done on next read - while (!net->error && net->vio != 0 && !thd->killed) + while (!info.net->error && info.net->vio != 0 && !thd->killed) { Log_event_type event_type= UNKNOWN_EVENT; + killed_state killed; /* reset the transmit packet for the event read from binary log file */ @@ -880,15 +2300,16 @@ impossible position"; goto err; bool is_active_binlog= false; - while (!(error= Log_event::read_log_event(&log, packet, log_lock, - current_checksum_alg, + while (!(killed= thd->killed) && + !(error = Log_event::read_log_event(&log, packet, log_lock, + info.current_checksum_alg, log_file_name, &is_active_binlog))) { #ifndef DBUG_OFF if (max_binlog_dump_events && !left_events--) { - net_flush(net); + net_flush(info.net); errmsg = "Debugging binlog dump abort"; my_errno= ER_UNKNOWN_ERROR; goto err; @@ -906,7 +2327,7 @@ impossible position"; { if (event_type == XID_EVENT) { - net_flush(net); + net_flush(info.net); const char act[]= "now " "wait_for signal.continue"; @@ -923,14 +2344,16 @@ impossible position"; #endif if (event_type == FORMAT_DESCRIPTION_EVENT) { - current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset, + Format_description_log_event *tmp; + + info.current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset, packet->length() - ev_offset); - DBUG_ASSERT(current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF || - current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF || - current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32); + DBUG_ASSERT(info.current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF || + info.current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF || + info.current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32); if (!is_slave_checksum_aware(thd) && - current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF && - current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) + info.current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF && + info.current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) { my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; errmsg= "Slave can not handle replication events with the checksum " @@ -941,22 +2364,94 @@ impossible position"; goto err; } + if (!(tmp= new Format_description_log_event(packet->ptr()+ev_offset, + packet->length()-ev_offset, + info.fdev))) + { + my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; + errmsg= "Corrupt Format_description event found or out-of-memory"; + goto err; + } + delete info.fdev; + info.fdev= tmp; + (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F; + + if (info.using_gtid_state) + { + /* + If this event has the field `created' set, then it will cause the + slave to delete all active temporary tables. This must not happen + if the slave received any later GTIDs in a previous connect, as + those GTIDs might have created new temporary tables that are still + needed. + + So here, we check if the starting GTID position was already + reached before this format description event. If not, we clear the + `created' flag to preserve temporary tables on the slave. (If the + slave connects at a position past this event, it means that it + already received and handled it in a previous connect). + */ + if (!info.gtid_state.is_pos_reached()) + { + int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+ + ST_CREATED_OFFSET+ev_offset, (ulong) 0); + if (info.current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF && + info.current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) + fix_checksum(packet, ev_offset); + } + } } - if ((tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type, - log_file_name, &log))) +#ifndef DBUG_OFF + if (dbug_reconnect_counter > 0) + { + --dbug_reconnect_counter; + if (dbug_reconnect_counter == 0) + { + errmsg= "DBUG-injected forced reconnect"; + my_errno= ER_UNKNOWN_ERROR; + goto err; + } + } +#endif + + if ((tmp_msg= send_event_to_slave(&info, event_type, &log, + ev_offset, &error_gtid))) { errmsg= tmp_msg; - my_errno= ER_UNKNOWN_ERROR; goto err; } + if (unlikely(info.send_fake_gtid_list) && + info.gtid_skip_group == GTID_SKIP_NOT) + { + Gtid_list_log_event glev(&info.until_binlog_state, 0); + + if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg) || + fake_gtid_list_event(&info, &glev, &errmsg, my_b_tell(&log))) + { + my_errno= ER_UNKNOWN_ERROR; + goto err; + } + info.send_fake_gtid_list= false; + } + if (info.until_gtid_state && + is_until_reached(&info, &ev_offset, event_type, &errmsg, + my_b_tell(&log))) + { + if (errmsg) + { + my_errno= ER_UNKNOWN_ERROR; + goto err; + } + goto end; + } DBUG_EXECUTE_IF("dump_thread_wait_before_send_xid", { if (event_type == XID_EVENT) { - net_flush(net); + net_flush(info.net); } }); @@ -964,6 +2459,8 @@ impossible position"; if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg)) goto err; } + if (killed) + goto end; DBUG_EXECUTE_IF("wait_after_binlog_EOF", { @@ -991,7 +2488,7 @@ impossible position"; /* Block until there is more data in the log */ - if (net_flush(net)) + if (net_flush(info.net)) { errmsg = "failed on net_flush()"; my_errno= ER_UNKNOWN_ERROR; @@ -1034,7 +2531,7 @@ impossible position"; mysql_mutex_lock(log_lock); switch (error= Log_event::read_log_event(&log, packet, (mysql_mutex_t*) 0, - current_checksum_alg)) { + info.current_checksum_alg)) { case 0: /* we read successfully, so we'll need to send it to the slave */ mysql_mutex_unlock(log_lock); @@ -1049,7 +2546,8 @@ impossible position"; int ret; ulong signal_cnt; DBUG_PRINT("wait",("waiting for data in binary log")); - if (thd->server_id==0) // for mysqlbinlog (mysqlbinlog.server_id==0) + /* For mysqlbinlog (mysqlbinlog.server_id==0). */ + if (thd->variables.server_id==0) { mysql_mutex_unlock(log_lock); goto end; @@ -1058,7 +2556,7 @@ impossible position"; #ifndef DBUG_OFF ulong hb_info_counter= 0; #endif - const char* old_msg= thd->proc_info; + PSI_stage_info old_stage; signal_cnt= mysql_bin_log.signal_cnt; do { @@ -1067,9 +2565,11 @@ impossible position"; DBUG_ASSERT(heartbeat_ts); set_timespec_nsec(*heartbeat_ts, heartbeat_period); } - thd->enter_cond(log_cond, log_lock, - "Master has sent all binlog to slave; " - "waiting for binlog to be updated"); + thd->ENTER_COND(log_cond, log_lock, + &stage_master_has_sent_all_binlog_to_slave, + &old_stage); + if (thd->killed) + break; ret= mysql_bin_log.wait_for_update_bin_log(thd, heartbeat_ts); DBUG_ASSERT(ret == 0 || (heartbeat_period != 0)); if (ret == ETIMEDOUT || ret == ETIME) @@ -1086,14 +2586,15 @@ impossible position"; /* reset transmit packet for the heartbeat event */ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg)) { - thd->exit_cond(old_msg); + thd->EXIT_COND(&old_stage); goto err; } - if (send_heartbeat_event(net, packet, p_coord, current_checksum_alg)) + if (send_heartbeat_event(info.net, packet, p_coord, + info.current_checksum_alg)) { errmsg = "Failed on my_net_write()"; my_errno= ER_UNKNOWN_ERROR; - thd->exit_cond(old_msg); + thd->EXIT_COND(&old_stage); goto err; } } @@ -1101,8 +2602,8 @@ impossible position"; { DBUG_PRINT("wait",("binary log received update or a broadcast signal caught")); } - } while (signal_cnt == mysql_bin_log.signal_cnt && !thd->killed); - thd->exit_cond(old_msg); + } while (signal_cnt == mysql_bin_log.signal_cnt); + thd->EXIT_COND(&old_stage); } break; @@ -1112,14 +2613,39 @@ impossible position"; goto err; } - if (read_packet && - (tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type, - log_file_name, &log))) + if (read_packet) { - errmsg= tmp_msg; - my_errno= ER_UNKNOWN_ERROR; - goto err; - } + if ((tmp_msg= send_event_to_slave(&info, event_type, &log, + ev_offset, &error_gtid))) + { + errmsg= tmp_msg; + goto err; + } + if (unlikely(info.send_fake_gtid_list) + && info.gtid_skip_group == GTID_SKIP_NOT) + { + Gtid_list_log_event glev(&info.until_binlog_state, 0); + + if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg) || + fake_gtid_list_event(&info, &glev, &errmsg, my_b_tell(&log))) + { + my_errno= ER_UNKNOWN_ERROR; + goto err; + } + info.send_fake_gtid_list= false; + } + if (info.until_gtid_state && + is_until_reached(&info, &ev_offset, event_type, &errmsg, + my_b_tell(&log))) + { + if (errmsg) + { + my_errno= ER_UNKNOWN_ERROR; + goto err; + } + goto end; + } + } log.error=0; } @@ -1129,7 +2655,7 @@ impossible position"; bool loop_breaker = 0; /* need this to break out of the for loop from switch */ - thd_proc_info(thd, "Finished reading one binlog; switching to next binlog"); + THD_STAGE_INFO(thd, stage_finished_reading_one_binlog_switching_to_next_binlog); switch (mysql_bin_log.find_next_log(&linfo, 1)) { case 0: break; @@ -1166,8 +2692,8 @@ impossible position"; read and send is Format_description_log_event. */ if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0 || - fake_rotate_event(net, packet, log_file_name, BIN_LOG_HEADER_SIZE, - &errmsg, current_checksum_alg)) + fake_rotate_event(&info, BIN_LOG_HEADER_SIZE, &errmsg, + info.current_checksum_alg)) { my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; goto err; @@ -1181,17 +2707,19 @@ end: end_io_cache(&log); mysql_file_close(file, MYF(MY_WME)); - RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags)); + if (has_transmit_started) + RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags)); my_eof(thd); - thd_proc_info(thd, "Waiting to finalize termination"); + THD_STAGE_INFO(thd, stage_waiting_to_finalize_termination); mysql_mutex_lock(&LOCK_thread_count); thd->current_linfo = 0; mysql_mutex_unlock(&LOCK_thread_count); thd->variables.max_allowed_packet= old_max_allowed_packet; + delete info.fdev; DBUG_VOID_RETURN; err: - thd_proc_info(thd, "Waiting to finalize termination"); + THD_STAGE_INFO(thd, stage_waiting_to_finalize_termination); if (my_errno == ER_MASTER_FATAL_ERROR_READING_BINLOG && my_b_inited(&log)) { /* @@ -1207,10 +2735,50 @@ err: my_basename(p_coord->file_name), p_coord->pos, my_basename(log_file_name), my_b_tell(&log)); } + else if (my_errno == ER_GTID_POSITION_NOT_FOUND_IN_BINLOG) + { + my_snprintf(error_text, sizeof(error_text), + "Error: connecting slave requested to start from GTID " + "%u-%u-%llu, which is not in the master's binlog", + error_gtid.domain_id, error_gtid.server_id, error_gtid.seq_no); + /* Use this error code so slave will know not to try reconnect. */ + my_errno = ER_MASTER_FATAL_ERROR_READING_BINLOG; + } + else if (my_errno == ER_GTID_POSITION_NOT_FOUND_IN_BINLOG2) + { + my_snprintf(error_text, sizeof(error_text), + "Error: connecting slave requested to start from GTID " + "%u-%u-%llu, which is not in the master's binlog. Since the " + "master's binlog contains GTIDs with higher sequence numbers, " + "it probably means that the slave has diverged due to " + "executing extra erroneous transactions", + error_gtid.domain_id, error_gtid.server_id, error_gtid.seq_no); + /* Use this error code so slave will know not to try reconnect. */ + my_errno = ER_MASTER_FATAL_ERROR_READING_BINLOG; + } + else if (my_errno == ER_GTID_START_FROM_BINLOG_HOLE) + { + my_snprintf(error_text, sizeof(error_text), + "The binlog on the master is missing the GTID %u-%u-%llu " + "requested by the slave (even though both a prior and a " + "subsequent sequence number does exist), and GTID strict mode " + "is enabled", + error_gtid.domain_id, error_gtid.server_id, error_gtid.seq_no); + /* Use this error code so slave will know not to try reconnect. */ + my_errno = ER_MASTER_FATAL_ERROR_READING_BINLOG; + } + else if (my_errno == ER_CANNOT_LOAD_SLAVE_GTID_STATE) + { + my_snprintf(error_text, sizeof(error_text), + "Failed to load replication slave GTID state from table %s.%s", + "mysql", rpl_gtid_slave_state_table_name.str); + my_errno = ER_MASTER_FATAL_ERROR_READING_BINLOG; + } else strcpy(error_text, errmsg); end_io_cache(&log); - RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags)); + if (has_transmit_started) + RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags)); /* Exclude iteration through thread list this is needed for purge_logs() - it will iterate through @@ -1224,6 +2792,7 @@ err: if (file >= 0) mysql_file_close(file, MYF(MY_WME)); thd->variables.max_allowed_packet= old_max_allowed_packet; + delete info.fdev; my_message(my_errno, error_text, MYF(0)); DBUG_VOID_RETURN; @@ -1242,18 +2811,61 @@ err: @retval 0 success @retval 1 error + @retval -1 fatal error */ + int start_slave(THD* thd , Master_info* mi, bool net_report) { int slave_errno= 0; int thread_mask; + char master_info_file_tmp[FN_REFLEN]; + char relay_log_info_file_tmp[FN_REFLEN]; DBUG_ENTER("start_slave"); if (check_access(thd, SUPER_ACL, any_db, NULL, NULL, 0, 0)) - DBUG_RETURN(1); - lock_slave_threads(mi); // this allows us to cleanly read slave_running + DBUG_RETURN(-1); + + create_logfile_name_with_suffix(master_info_file_tmp, + sizeof(master_info_file_tmp), + master_info_file, 0, + &mi->cmp_connection_name); + create_logfile_name_with_suffix(relay_log_info_file_tmp, + sizeof(relay_log_info_file_tmp), + relay_log_info_file, 0, + &mi->cmp_connection_name); + + mi->lock_slave_threads(); + if (mi->killed) + { + /* connection was deleted while we waited for lock_slave_threads */ + mi->unlock_slave_threads(); + my_error(WARN_NO_MASTER_INFO, mi->connection_name.length, + mi->connection_name.str); + DBUG_RETURN(-1); + } + // Get a mask of _stopped_ threads init_thread_mask(&thread_mask,mi,1 /* inverse */); + + if (thd->lex->mi.gtid_pos_str.str) + { + if (thread_mask != (SLAVE_IO|SLAVE_SQL)) + { + slave_errno= ER_SLAVE_WAS_RUNNING; + goto err; + } + if (thd->lex->slave_thd_opt) + { + slave_errno= ER_BAD_SLAVE_UNTIL_COND; + goto err; + } + if (mi->using_gtid == Master_info::USE_GTID_NO) + { + slave_errno= ER_UNTIL_REQUIRES_USING_GTID; + goto err; + } + } + /* Below we will start all stopped threads. But if the user wants to start only one thread, do as if the other thread was running (as we @@ -1264,10 +2876,22 @@ int start_slave(THD* thd , Master_info* mi, bool net_report) thread_mask&= thd->lex->slave_thd_opt; if (thread_mask) //some threads are stopped, start them { - if (init_master_info(mi,master_info_file,relay_log_info_file, 0, + if (init_master_info(mi,master_info_file_tmp,relay_log_info_file_tmp, 0, thread_mask)) slave_errno=ER_MASTER_INFO; - else if (server_id_supplied && *mi->host) + else if (!server_id_supplied) + { + slave_errno= ER_BAD_SLAVE; net_report= 0; + my_message(slave_errno, "Misconfigured slave: server_id was not set; Fix in config file", + MYF(0)); + } + else if (!*mi->host) + { + slave_errno= ER_BAD_SLAVE; net_report= 0; + my_message(slave_errno, "Misconfigured slave: MASTER_HOST was not set; Fix in config file or with CHANGE MASTER TO", + MYF(0)); + } + else { /* If we will start SQL thread we will care about UNTIL options If @@ -1298,10 +2922,22 @@ int start_slave(THD* thd , Master_info* mi, bool net_report) mi->rli.until_log_pos= thd->lex->mi.relay_log_pos; strmake_buf(mi->rli.until_log_name, thd->lex->mi.relay_log_name); } + else if (thd->lex->mi.gtid_pos_str.str) + { + if (mi->rli.until_gtid_pos.load(thd->lex->mi.gtid_pos_str.str, + thd->lex->mi.gtid_pos_str.length)) + { + slave_errno= ER_INCORRECT_GTID_STATE; + mysql_mutex_unlock(&mi->rli.data_lock); + goto err; + } + mi->rli.until_condition= Relay_log_info::UNTIL_GTID; + } else mi->rli.clear_until_condition(); - if (mi->rli.until_condition != Relay_log_info::UNTIL_NONE) + if (mi->rli.until_condition == Relay_log_info::UNTIL_MASTER_POS || + mi->rli.until_condition == Relay_log_info::UNTIL_RELAY_POS) { /* Preparing members for effective until condition checking */ const char *p= fn_ext(mi->rli.until_log_name); @@ -1324,10 +2960,13 @@ int start_slave(THD* thd , Master_info* mi, bool net_report) /* mark the cached result of the UNTIL comparison as "undefined" */ mi->rli.until_log_names_cmp_result= Relay_log_info::UNTIL_LOG_NAMES_CMP_UNKNOWN; + } + if (mi->rli.until_condition != Relay_log_info::UNTIL_NONE) + { /* Issuing warning then started without --skip-slave-start */ if (!opt_skip_slave_start) - push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, + push_warning(thd, Sql_condition::WARN_LEVEL_NOTE, ER_MISSING_SKIP_SLAVE, ER(ER_MISSING_SKIP_SLAVE)); } @@ -1335,36 +2974,36 @@ int start_slave(THD* thd , Master_info* mi, bool net_report) mysql_mutex_unlock(&mi->rli.data_lock); } else if (thd->lex->mi.pos || thd->lex->mi.relay_log_pos) - push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_UNTIL_COND_IGNORED, + push_warning(thd, Sql_condition::WARN_LEVEL_NOTE, ER_UNTIL_COND_IGNORED, ER(ER_UNTIL_COND_IGNORED)); if (!slave_errno) - slave_errno = start_slave_threads(0 /*no mutex */, - 1 /* wait for start */, - mi, - master_info_file,relay_log_info_file, - thread_mask); + slave_errno = start_slave_threads(1, + 1 /* wait for start */, + mi, + master_info_file_tmp, + relay_log_info_file_tmp, + thread_mask); } - else - slave_errno = ER_BAD_SLAVE; } else { /* no error if all threads are already started, only a warning */ - push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_RUNNING, + push_warning(thd, Sql_condition::WARN_LEVEL_NOTE, ER_SLAVE_WAS_RUNNING, ER(ER_SLAVE_WAS_RUNNING)); } - unlock_slave_threads(mi); +err: + mi->unlock_slave_threads(); if (slave_errno) { if (net_report) - my_message(slave_errno, ER(slave_errno), MYF(0)); - DBUG_RETURN(1); + my_error(slave_errno, MYF(0), + (int) mi->connection_name.length, + mi->connection_name.str); + DBUG_RETURN(slave_errno == ER_BAD_SLAVE ? -1 : 1); } - else if (net_report) - my_ok(thd); DBUG_RETURN(0); } @@ -1382,21 +3021,25 @@ int start_slave(THD* thd , Master_info* mi, bool net_report) @retval 0 success @retval 1 error + @retval -1 error */ + int stop_slave(THD* thd, Master_info* mi, bool net_report ) { - DBUG_ENTER("stop_slave"); - int slave_errno; - if (!thd) - thd = current_thd; + DBUG_ENTER("stop_slave"); + DBUG_PRINT("enter",("Connection: %s", mi->connection_name.str)); if (check_access(thd, SUPER_ACL, any_db, NULL, NULL, 0, 0)) - DBUG_RETURN(1); - thd_proc_info(thd, "Killing slave"); + DBUG_RETURN(-1); + THD_STAGE_INFO(thd, stage_killing_slave); int thread_mask; - lock_slave_threads(mi); - // Get a mask of _running_ threads + mi->lock_slave_threads(); + /* + Get a mask of _running_ threads. + We don't have to test for mi->killed as the thread_mask will take care + of checking if threads exists + */ init_thread_mask(&thread_mask,mi,0 /* not inverse*/); /* Below we will stop all running threads. @@ -1409,18 +3052,17 @@ int stop_slave(THD* thd, Master_info* mi, bool net_report ) if (thread_mask) { - slave_errno= terminate_slave_threads(mi,thread_mask, - 1 /*skip lock */); + slave_errno= terminate_slave_threads(mi,thread_mask, 0 /* get lock */); } else { //no error if both threads are already stopped, only a warning slave_errno= 0; - push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_SLAVE_WAS_NOT_RUNNING, + push_warning(thd, Sql_condition::WARN_LEVEL_NOTE, ER_SLAVE_WAS_NOT_RUNNING, ER(ER_SLAVE_WAS_NOT_RUNNING)); } - unlock_slave_threads(mi); - thd_proc_info(thd, 0); + + mi->unlock_slave_threads(); if (slave_errno) { @@ -1428,8 +3070,6 @@ int stop_slave(THD* thd, Master_info* mi, bool net_report ) my_message(slave_errno, ER(slave_errno), MYF(0)); DBUG_RETURN(1); } - else if (net_report) - my_ok(thd); DBUG_RETURN(0); } @@ -1452,16 +3092,28 @@ int reset_slave(THD *thd, Master_info* mi) char fname[FN_REFLEN]; int thread_mask= 0, error= 0; uint sql_errno=ER_UNKNOWN_ERROR; - const char* errmsg= "Unknown error occured while reseting slave"; + const char* errmsg= "Unknown error occurred while reseting slave"; + char master_info_file_tmp[FN_REFLEN]; + char relay_log_info_file_tmp[FN_REFLEN]; DBUG_ENTER("reset_slave"); - lock_slave_threads(mi); + mi->lock_slave_threads(); + if (mi->killed) + { + /* connection was deleted while we waited for lock_slave_threads */ + mi->unlock_slave_threads(); + my_error(WARN_NO_MASTER_INFO, mi->connection_name.length, + mi->connection_name.str); + DBUG_RETURN(-1); + } + init_thread_mask(&thread_mask,mi,0 /* not inverse */); if (thread_mask) // We refuse if any slave thread is running { - sql_errno= ER_SLAVE_MUST_STOP; - error=1; - goto err; + mi->unlock_slave_threads(); + my_error(ER_SLAVE_MUST_STOP, MYF(0), (int) mi->connection_name.length, + mi->connection_name.str); + DBUG_RETURN(ER_SLAVE_MUST_STOP); } ha_reset_slave(thd); @@ -1485,30 +3137,46 @@ int reset_slave(THD *thd, Master_info* mi) mi->clear_error(); mi->rli.clear_error(); mi->rli.clear_until_condition(); + mi->rli.slave_skip_counter= 0; // close master_info_file, relay_log_info_file, set mi->inited=rli->inited=0 end_master_info(mi); + end_relay_log_info(&mi->rli); // and delete these two files - fn_format(fname, master_info_file, mysql_data_home, "", 4+32); + create_logfile_name_with_suffix(master_info_file_tmp, + sizeof(master_info_file_tmp), + master_info_file, 0, + &mi->cmp_connection_name); + create_logfile_name_with_suffix(relay_log_info_file_tmp, + sizeof(relay_log_info_file_tmp), + relay_log_info_file, 0, + &mi->cmp_connection_name); + + fn_format(fname, master_info_file_tmp, mysql_data_home, "", 4+32); if (mysql_file_stat(key_file_master_info, fname, &stat_area, MYF(0)) && mysql_file_delete(key_file_master_info, fname, MYF(MY_WME))) { error=1; goto err; } + else if (global_system_variables.log_warnings > 1) + sql_print_information("Deleted Master_info file '%s'.", fname); + // delete relay_log_info_file - fn_format(fname, relay_log_info_file, mysql_data_home, "", 4+32); + fn_format(fname, relay_log_info_file_tmp, mysql_data_home, "", 4+32); if (mysql_file_stat(key_file_relay_log_info, fname, &stat_area, MYF(0)) && mysql_file_delete(key_file_relay_log_info, fname, MYF(MY_WME))) { error=1; goto err; } + else if (global_system_variables.log_warnings > 1) + sql_print_information("Deleted Master_info file '%s'.", fname); RUN_HOOK(binlog_relay_io, after_reset_slave, (thd, mi)); err: - unlock_slave_threads(mi); + mi->unlock_slave_threads(); if (error) my_error(sql_errno, MYF(0), errmsg); DBUG_RETURN(error); @@ -1542,8 +3210,8 @@ void kill_zombie_dump_threads(uint32 slave_server_id) while ((tmp=it++)) { - if (tmp->command == COM_BINLOG_DUMP && - tmp->server_id == slave_server_id) + if (tmp->get_command() == COM_BINLOG_DUMP && + tmp->variables.server_id == slave_server_id) { mysql_mutex_lock(&tmp->LOCK_thd_data); // Lock from delete break; @@ -1598,10 +3266,14 @@ static bool get_string_parameter(char *to, const char *from, size_t length, @param mi Pointer to Master_info object belonging to the slave's IO thread. + @param master_info_added Out parameter saying if the Master_info *mi was + added to the global list of masters. This is useful in error conditions + to know if caller should free Master_info *mi. + @retval FALSE success @retval TRUE error */ -bool change_master(THD* thd, Master_info* mi) +bool change_master(THD* thd, Master_info* mi, bool *master_info_added) { int thread_mask; const char* errmsg= 0; @@ -1610,20 +3282,17 @@ bool change_master(THD* thd, Master_info* mi) char saved_host[HOSTNAME_LENGTH + 1]; uint saved_port; char saved_log_name[FN_REFLEN]; + Master_info::enum_using_gtid saved_using_gtid; + char master_info_file_tmp[FN_REFLEN]; + char relay_log_info_file_tmp[FN_REFLEN]; my_off_t saved_log_pos; + LEX_MASTER_INFO* lex_mi= &thd->lex->mi; DBUG_ENTER("change_master"); - lock_slave_threads(mi); - init_thread_mask(&thread_mask,mi,0 /*not inverse*/); - LEX_MASTER_INFO* lex_mi= &thd->lex->mi; - if (thread_mask) // We refuse if any slave thread is running - { - my_message(ER_SLAVE_MUST_STOP, ER(ER_SLAVE_MUST_STOP), MYF(0)); - ret= TRUE; - goto err; - } + DBUG_ASSERT(master_info_index); + mysql_mutex_assert_owner(&LOCK_active_mi); - thd_proc_info(thd, "Changing master"); + *master_info_added= false; /* We need to check if there is an empty master_host. Otherwise change master succeeds, a master.info file is created containing @@ -1631,17 +3300,74 @@ bool change_master(THD* thd, Master_info* mi) is thrown stating that the server is not configured as slave. (See BUG#28796). */ - if(lex_mi->host && !*lex_mi->host) + if (lex_mi->host && !*lex_mi->host) { my_error(ER_WRONG_ARGUMENTS, MYF(0), "MASTER_HOST"); - unlock_slave_threads(mi); DBUG_RETURN(TRUE); } - // TODO: see if needs re-write - if (init_master_info(mi, master_info_file, relay_log_info_file, 0, + if (master_info_index->check_duplicate_master_info(&lex_mi->connection_name, + lex_mi->host, + lex_mi->port)) + DBUG_RETURN(TRUE); + + mi->lock_slave_threads(); + if (mi->killed) + { + /* connection was deleted while we waited for lock_slave_threads */ + mi->unlock_slave_threads(); + my_error(WARN_NO_MASTER_INFO, mi->connection_name.length, + mi->connection_name.str); + DBUG_RETURN(TRUE); + } + + init_thread_mask(&thread_mask,mi,0 /*not inverse*/); + if (thread_mask) // We refuse if any slave thread is running + { + my_error(ER_SLAVE_MUST_STOP, MYF(0), (int) mi->connection_name.length, + mi->connection_name.str); + ret= TRUE; + goto err; + } + + THD_STAGE_INFO(thd, stage_changing_master); + + create_logfile_name_with_suffix(master_info_file_tmp, + sizeof(master_info_file_tmp), + master_info_file, 0, + &mi->cmp_connection_name); + create_logfile_name_with_suffix(relay_log_info_file_tmp, + sizeof(relay_log_info_file_tmp), + relay_log_info_file, 0, + &mi->cmp_connection_name); + + /* if new Master_info doesn't exists, add it */ + if (!master_info_index->get_master_info(&mi->connection_name, + Sql_condition::WARN_LEVEL_NOTE)) + { + if (master_info_index->add_master_info(mi, TRUE)) + { + my_error(ER_MASTER_INFO, MYF(0), + (int) lex_mi->connection_name.length, + lex_mi->connection_name.str); + ret= TRUE; + goto err; + } + *master_info_added= true; + } + if (global_system_variables.log_warnings > 1) + sql_print_information("Master connection name: '%.*s' " + "Master_info_file: '%s' " + "Relay_info_file: '%s'", + (int) mi->connection_name.length, + mi->connection_name.str, + master_info_file_tmp, relay_log_info_file_tmp); + + if (init_master_info(mi, master_info_file_tmp, relay_log_info_file_tmp, 0, thread_mask)) { - my_message(ER_MASTER_INFO, ER(ER_MASTER_INFO), MYF(0)); + my_error(ER_MASTER_INFO, MYF(0), + (int) lex_mi->connection_name.length, + lex_mi->connection_name.str); ret= TRUE; goto err; } @@ -1659,6 +3385,7 @@ bool change_master(THD* thd, Master_info* mi) saved_port= mi->port; strmake_buf(saved_log_name, mi->master_log_name); saved_log_pos= mi->master_log_pos; + saved_using_gtid= mi->using_gtid; /* If the user specified host or port without binlog or position, @@ -1698,9 +3425,9 @@ bool change_master(THD* thd, Master_info* mi) if (lex_mi->heartbeat_opt != LEX_MASTER_INFO::LEX_MI_UNCHANGED) mi->heartbeat_period = lex_mi->heartbeat_period; else - mi->heartbeat_period= (float) min(SLAVE_MAX_HEARTBEAT_PERIOD, + mi->heartbeat_period= (float) MY_MIN(SLAVE_MAX_HEARTBEAT_PERIOD, (slave_net_timeout/2.0)); - mi->received_heartbeats= LL(0); // counter lives until master is CHANGEd + mi->received_heartbeats= 0; // counter lives until master is CHANGEd /* reset the last time server_id list if the current CHANGE MASTER is mentioning IGNORE_SERVER_IDS= (...) @@ -1711,7 +3438,7 @@ bool change_master(THD* thd, Master_info* mi) { ulong s_id; get_dynamic(&lex_mi->repl_ignore_server_ids, (uchar*) &s_id, i); - if (s_id == ::server_id && replicate_same_server_id) + if (s_id == global_system_variables.server_id && replicate_same_server_id) { my_error(ER_SLAVE_IGNORE_SERVER_IDS, MYF(0), static_cast<int>(s_id)); ret= TRUE; @@ -1746,11 +3473,16 @@ bool change_master(THD* thd, Master_info* mi) strmake_buf(mi->ssl_cipher, lex_mi->ssl_cipher); if (lex_mi->ssl_key) strmake_buf(mi->ssl_key, lex_mi->ssl_key); + if (lex_mi->ssl_crl) + strmake_buf(mi->ssl_crl, lex_mi->ssl_crl); + if (lex_mi->ssl_crlpath) + strmake_buf(mi->ssl_crlpath, lex_mi->ssl_crlpath); + #ifndef HAVE_OPENSSL if (lex_mi->ssl || lex_mi->ssl_ca || lex_mi->ssl_capath || lex_mi->ssl_cert || lex_mi->ssl_cipher || lex_mi->ssl_key || - lex_mi->ssl_verify_server_cert ) - push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, + lex_mi->ssl_verify_server_cert || lex_mi->ssl_crl || lex_mi->ssl_crlpath) + push_warning(thd, Sql_condition::WARN_LEVEL_NOTE, ER_SLAVE_IGNORED_SSL_PARAMS, ER(ER_SLAVE_IGNORED_SSL_PARAMS)); #endif @@ -1769,6 +3501,15 @@ bool change_master(THD* thd, Master_info* mi) mi->rli.group_relay_log_pos= mi->rli.event_relay_log_pos= lex_mi->relay_log_pos; } + if (lex_mi->use_gtid_opt == LEX_MASTER_INFO::LEX_GTID_SLAVE_POS) + mi->using_gtid= Master_info::USE_GTID_SLAVE_POS; + else if (lex_mi->use_gtid_opt == LEX_MASTER_INFO::LEX_GTID_CURRENT_POS) + mi->using_gtid= Master_info::USE_GTID_CURRENT_POS; + else if (lex_mi->use_gtid_opt == LEX_MASTER_INFO::LEX_GTID_NO || + lex_mi->log_file_name || lex_mi->pos || + lex_mi->relay_log_name || lex_mi->relay_log_pos) + mi->using_gtid= Master_info::USE_GTID_NO; + /* If user did specify neither host nor port nor any log name nor any log pos, i.e. he specified only user/password/master_connect_retry, he probably @@ -1789,15 +3530,16 @@ bool change_master(THD* thd, Master_info* mi) { /* Sometimes mi->rli.master_log_pos == 0 (it happens when the SQL thread is - not initialized), so we use a max(). + not initialized), so we use a MY_MAX(). What happens to mi->rli.master_log_pos during the initialization stages of replication is not 100% clear, so we guard against problems using - max(). + MY_MAX(). */ - mi->master_log_pos = max(BIN_LOG_HEADER_SIZE, + mi->master_log_pos = MY_MAX(BIN_LOG_HEADER_SIZE, mi->rli.group_master_log_pos); strmake_buf(mi->master_log_name, mi->rli.group_master_log_name); } + /* Relay log's IO_CACHE may not be inited, if rli->inited==0 (server was never a slave before). @@ -1810,8 +3552,7 @@ bool change_master(THD* thd, Master_info* mi) } if (need_relay_log_purge) { - relay_log_purge= 1; - thd_proc_info(thd, "Purging old relay logs"); + THD_STAGE_INFO(thd, stage_purging_old_relay_logs); if (purge_relay_logs(&mi->rli, thd, 0 /* not only reset, but also reinit */, &errmsg)) @@ -1824,7 +3565,6 @@ bool change_master(THD* thd, Master_info* mi) else { const char* msg; - relay_log_purge= 0; /* Relay log is already initialized */ if (init_relay_log_pos(&mi->rli, mi->rli.group_relay_log_name, @@ -1859,6 +3599,7 @@ bool change_master(THD* thd, Master_info* mi) /* Clear the errors, for a clean start */ mi->rli.clear_error(); mi->rli.clear_until_condition(); + mi->rli.slave_skip_counter= 0; sql_print_information("'CHANGE MASTER TO executed'. " "Previous state master_host='%s', master_port='%u', master_log_file='%s', " @@ -1867,6 +3608,11 @@ bool change_master(THD* thd, Master_info* mi) "master_log_pos='%ld'.", saved_host, saved_port, saved_log_name, (ulong) saved_log_pos, mi->host, mi->port, mi->master_log_name, (ulong) mi->master_log_pos); + if (saved_using_gtid != Master_info::USE_GTID_NO || + mi->using_gtid != Master_info::USE_GTID_NO) + sql_print_information("Previous Using_Gtid=%s. New Using_Gtid=%s", + mi->using_gtid_astext(saved_using_gtid), + mi->using_gtid_astext(mi->using_gtid)); /* If we don't write new coordinates to disk now, then old will remain in @@ -1875,13 +3621,13 @@ bool change_master(THD* thd, Master_info* mi) in-memory value at restart (thus causing errors, as the old relay log does not exist anymore). */ - flush_relay_log_info(&mi->rli); + if (flush_relay_log_info(&mi->rli)) + ret= 1; mysql_cond_broadcast(&mi->data_cond); mysql_mutex_unlock(&mi->rli.data_lock); err: - unlock_slave_threads(mi); - thd_proc_info(thd, 0); + mi->unlock_slave_threads(); if (ret == FALSE) my_ok(thd); DBUG_RETURN(ret); @@ -1897,7 +3643,7 @@ err: @retval 0 success @retval 1 error */ -int reset_master(THD* thd) +int reset_master(THD* thd, rpl_gtid *init_state, uint32 init_state_len) { if (!mysql_bin_log.is_open()) { @@ -1906,7 +3652,7 @@ int reset_master(THD* thd) return 1; } - if (mysql_bin_log.reset_logs(thd)) + if (mysql_bin_log.reset_logs(thd, 1, init_state, init_state_len)) return 1; RUN_HOOK(binlog_transmit, after_reset_master, (thd, 0 /* flags */)); return 0; @@ -1932,8 +3678,8 @@ bool mysql_show_binlog_events(THD* thd) File file = -1; MYSQL_BIN_LOG *binary_log= NULL; int old_max_allowed_packet= thd->variables.max_allowed_packet; + Master_info *mi= 0; LOG_INFO linfo; - DBUG_ENTER("mysql_show_binlog_events"); Log_event::init_show_field_list(&field_list); @@ -1941,13 +3687,10 @@ bool mysql_show_binlog_events(THD* thd) Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(TRUE); - Format_description_log_event *description_event= new - Format_description_log_event(3); /* MySQL 4.0 by default */ - DBUG_ASSERT(thd->lex->sql_command == SQLCOM_SHOW_BINLOG_EVENTS || thd->lex->sql_command == SQLCOM_SHOW_RELAYLOG_EVENTS); - /* select wich binary log to use: binlog or relay */ + /* select which binary log to use: binlog or relay */ if ( thd->lex->sql_command == SQLCOM_SHOW_BINLOG_EVENTS ) { /* @@ -1961,23 +3704,35 @@ bool mysql_show_binlog_events(THD* thd) } else /* showing relay log contents */ { - if (!active_mi) + if (!(mi= get_master_info(&thd->variables.default_master_connection, + Sql_condition::WARN_LEVEL_ERROR))) + { DBUG_RETURN(TRUE); - - binary_log= &(active_mi->rli.relay_log); + } + binary_log= &(mi->rli.relay_log); } + Format_description_log_event *description_event= new + Format_description_log_event(3); /* MySQL 4.0 by default */ + if (binary_log->is_open()) { LEX_MASTER_INFO *lex_mi= &thd->lex->mi; SELECT_LEX_UNIT *unit= &thd->lex->unit; ha_rows event_count, limit_start, limit_end; - my_off_t pos = max(BIN_LOG_HEADER_SIZE, lex_mi->pos); // user-friendly + my_off_t pos = MY_MAX(BIN_LOG_HEADER_SIZE, lex_mi->pos); // user-friendly char search_file_name[FN_REFLEN], *name; const char *log_file_name = lex_mi->log_file_name; mysql_mutex_t *log_lock = binary_log->get_log_lock(); Log_event* ev; + if (mi) + { + /* We can unlock the mutex as we have a lock on the file */ + mi->release(); + mi= 0; + } + unit->set_limit(thd->lex->current_select); limit_start= unit->offset_limit_cnt; limit_end= unit->select_limit_cnt; @@ -1996,6 +3751,7 @@ bool mysql_show_binlog_events(THD* thd) goto err; } + /* These locks is here to enable syncronization with log_in_use() */ mysql_mutex_lock(&LOCK_thread_count); thd->current_linfo = &linfo; mysql_mutex_unlock(&LOCK_thread_count); @@ -2072,6 +3828,9 @@ bool mysql_show_binlog_events(THD* thd) mysql_mutex_unlock(log_lock); } + else if (mi) + mi->release(); + // Check that linfo is still on the function scope. DEBUG_SYNC(thd, "after_show_binlog_events"); @@ -2091,8 +3850,9 @@ err: else my_eof(thd); + /* These locks is here to enable syncronization with log_in_use() */ mysql_mutex_lock(&LOCK_thread_count); - thd->current_linfo = 0; + thd->current_linfo= 0; mysql_mutex_unlock(&LOCK_thread_count); thd->variables.max_allowed_packet= old_max_allowed_packet; DBUG_RETURN(ret); @@ -2252,14 +4012,14 @@ int log_loaded_block(IO_CACHE* file) DBUG_RETURN(0); for (block_len= (uint) (my_b_get_bytes_in_buffer(file)); block_len > 0; - buffer += min(block_len, max_event_size), - block_len -= min(block_len, max_event_size)) + buffer += MY_MIN(block_len, max_event_size), + block_len -= MY_MIN(block_len, max_event_size)) { lf_info->last_pos_in_file= my_b_get_pos_in_file(file); if (lf_info->wrote_create_file) { Append_block_log_event a(lf_info->thd, lf_info->thd->db, buffer, - min(block_len, max_event_size), + MY_MIN(block_len, max_event_size), lf_info->log_delayed); if (mysql_bin_log.write(&a)) DBUG_RETURN(1); @@ -2268,7 +4028,7 @@ int log_loaded_block(IO_CACHE* file) { Begin_load_query_log_event b(lf_info->thd, lf_info->thd->db, buffer, - min(block_len, max_event_size), + MY_MIN(block_len, max_event_size), lf_info->log_delayed); if (mysql_bin_log.write(&b)) DBUG_RETURN(1); @@ -2278,4 +4038,204 @@ int log_loaded_block(IO_CACHE* file) DBUG_RETURN(0); } + +/** + Initialise the slave replication state from the mysql.gtid_slave_pos table. + + This is called each time an SQL thread starts, but the data is only actually + loaded on the first call. + + The slave state is the last GTID applied on the slave within each + replication domain. + + To avoid row lock contention, there are multiple rows for each domain_id. + The one containing the current slave state is the one with the maximal + sub_id value, within each domain_id. + + CREATE TABLE mysql.gtid_slave_pos ( + domain_id INT UNSIGNED NOT NULL, + sub_id BIGINT UNSIGNED NOT NULL, + server_id INT UNSIGNED NOT NULL, + seq_no BIGINT UNSIGNED NOT NULL, + PRIMARY KEY (domain_id, sub_id)) +*/ + +void +rpl_init_gtid_slave_state() +{ + rpl_global_gtid_slave_state= new rpl_slave_state; +} + + +void +rpl_deinit_gtid_slave_state() +{ + delete rpl_global_gtid_slave_state; +} + + +void +rpl_init_gtid_waiting() +{ + rpl_global_gtid_waiting.init(); +} + + +void +rpl_deinit_gtid_waiting() +{ + rpl_global_gtid_waiting.destroy(); +} + + +/* + Format the current GTID state as a string, for returning the value of + @@global.gtid_slave_pos. + + If the flag use_binlog is true, then the contents of the binary log (if + enabled) is merged into the current GTID state (@@global.gtid_current_pos). +*/ +int +rpl_append_gtid_state(String *dest, bool use_binlog) +{ + int err; + rpl_gtid *gtid_list= NULL; + uint32 num_gtids= 0; + + if (use_binlog && opt_bin_log && + (err= mysql_bin_log.get_most_recent_gtid_list(>id_list, &num_gtids))) + return err; + + err= rpl_global_gtid_slave_state->tostring(dest, gtid_list, num_gtids); + my_free(gtid_list); + + return err; +} + + +/* + Load the current GTID position into a slave_connection_state, for use when + connecting to a master server with GTID. + + If the flag use_binlog is true, then the contents of the binary log (if + enabled) is merged into the current GTID state (master_use_gtid=current_pos). +*/ +int +rpl_load_gtid_state(slave_connection_state *state, bool use_binlog) +{ + int err; + rpl_gtid *gtid_list= NULL; + uint32 num_gtids= 0; + + if (use_binlog && opt_bin_log && + (err= mysql_bin_log.get_most_recent_gtid_list(>id_list, &num_gtids))) + return err; + + err= state->load(rpl_global_gtid_slave_state, gtid_list, num_gtids); + my_free(gtid_list); + + return err; +} + + +bool +rpl_gtid_pos_check(THD *thd, char *str, size_t len) +{ + slave_connection_state tmp_slave_state; + bool gave_conflict_warning= false, gave_missing_warning= false; + + /* Check that we can parse the supplied string. */ + if (tmp_slave_state.load(str, len)) + return true; + + /* + Check our own binlog for any of our own transactions that are newer + than the GTID state the user is requesting. Any such transactions would + result in an out-of-order binlog, which could break anyone replicating + with us as master. + + So give an error if this is found, requesting the user to do a + RESET MASTER (to clean up the binlog) if they really want this. + */ + if (mysql_bin_log.is_open()) + { + rpl_gtid *binlog_gtid_list= NULL; + uint32 num_binlog_gtids= 0; + uint32 i; + + if (mysql_bin_log.get_most_recent_gtid_list(&binlog_gtid_list, + &num_binlog_gtids)) + { + my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME)); + return true; + } + for (i= 0; i < num_binlog_gtids; ++i) + { + rpl_gtid *binlog_gtid= &binlog_gtid_list[i]; + rpl_gtid *slave_gtid; + if (binlog_gtid->server_id != global_system_variables.server_id) + continue; + if (!(slave_gtid= tmp_slave_state.find(binlog_gtid->domain_id))) + { + if (opt_gtid_strict_mode) + { + my_error(ER_MASTER_GTID_POS_MISSING_DOMAIN, MYF(0), + binlog_gtid->domain_id, binlog_gtid->domain_id, + binlog_gtid->server_id, binlog_gtid->seq_no); + break; + } + else if (!gave_missing_warning) + { + push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, + ER_MASTER_GTID_POS_MISSING_DOMAIN, + ER(ER_MASTER_GTID_POS_MISSING_DOMAIN), + binlog_gtid->domain_id, binlog_gtid->domain_id, + binlog_gtid->server_id, binlog_gtid->seq_no); + gave_missing_warning= true; + } + } + else if (slave_gtid->seq_no < binlog_gtid->seq_no) + { + if (opt_gtid_strict_mode) + { + my_error(ER_MASTER_GTID_POS_CONFLICTS_WITH_BINLOG, MYF(0), + slave_gtid->domain_id, slave_gtid->server_id, + slave_gtid->seq_no, binlog_gtid->domain_id, + binlog_gtid->server_id, binlog_gtid->seq_no); + break; + } + else if (!gave_conflict_warning) + { + push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, + ER_MASTER_GTID_POS_CONFLICTS_WITH_BINLOG, + ER(ER_MASTER_GTID_POS_CONFLICTS_WITH_BINLOG), + slave_gtid->domain_id, slave_gtid->server_id, + slave_gtid->seq_no, binlog_gtid->domain_id, + binlog_gtid->server_id, binlog_gtid->seq_no); + gave_conflict_warning= true; + } + } + } + my_free(binlog_gtid_list); + if (i != num_binlog_gtids) + return true; + } + + return false; +} + + +bool +rpl_gtid_pos_update(THD *thd, char *str, size_t len) +{ + if (rpl_global_gtid_slave_state->load(thd, str, len, true, true)) + { + my_error(ER_FAILED_GTID_STATE_INIT, MYF(0)); + return true; + } + else + return false; +} + + #endif /* HAVE_REPLICATION */ |