diff options
author | unknown <knielsen@knielsen-hq.org> | 2013-06-05 14:32:47 +0200 |
---|---|---|
committer | unknown <knielsen@knielsen-hq.org> | 2013-06-05 14:32:47 +0200 |
commit | 5cb486d159e45b9b8dc4d647b2df2492a286cf4d (patch) | |
tree | 85bdb56bae9cad68033b742092012f0961172745 /sql | |
parent | 7ad47ab0e080ca66f8a41de461b036d3bdff25fb (diff) | |
download | mariadb-git-5cb486d159e45b9b8dc4d647b2df2492a286cf4d.tar.gz |
MDEV-26: Global transaction ID.
Fix problems related to reconnect. When we need to reconnect (ie. explict
stop/start of just the IO thread by user, or automatic reconnect due to
loosing network connection with the master), it is a bit complex to correctly
resume at the right point without causing duplicate or missing events in the
relay log. The previous code had multiple problems in this regard.
With this patch, the problem is solved as follows. The IO thread keeps track
(in memory) of which GTID was last queued to the relay log. If it needs to
reconnect, it resumes at that GTID position. It also counts number of events
received within the last, possibly partial, event group, and skips the same
number of events after a reconnect, so that events already enqueued before the
reconnect are not duplicated.
(There is no need to keep any persistent state; whenever we restart slave
threads after both of them being stopped (such as after server restart), we
erase the relay logs and start over from the last GTID applied by SQL thread.
But while the SQL thread is running, this patch is needed to get correct relay
log).
Diffstat (limited to 'sql')
-rw-r--r-- | sql/rpl_gtid.cc | 91 | ||||
-rw-r--r-- | sql/rpl_gtid.h | 4 | ||||
-rw-r--r-- | sql/rpl_mi.cc | 24 | ||||
-rw-r--r-- | sql/rpl_mi.h | 37 | ||||
-rw-r--r-- | sql/share/errmsg-utf8.txt | 2 | ||||
-rw-r--r-- | sql/slave.cc | 290 | ||||
-rw-r--r-- | sql/sql_parse.cc | 21 | ||||
-rw-r--r-- | sql/sql_repl.cc | 131 | ||||
-rw-r--r-- | sql/sql_repl.h | 3 |
9 files changed, 504 insertions, 99 deletions
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc index adaf9aa4e31..d5e9380296e 100644 --- a/sql/rpl_gtid.cc +++ b/sql/rpl_gtid.cc @@ -482,26 +482,10 @@ rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid, bool *first) } -/* - Prepare the current slave state as a string, suitable for sending to the - master to request to receive binlog events starting from that GTID state. - - The state consists of the most recently applied GTID for each domain_id, - ie. the one with the highest sub_id within each domain_id. - - Optinally, extra_gtids is a list of GTIDs from the binlog. This is used when - a server was previously a master and now needs to connect to a new master as - a slave. For each domain_id, if the GTID in the binlog was logged with our - own server_id _and_ has a higher seq_no than what is in the slave state, - then this should be used as the position to start replicating at. This - allows to promote a slave as new master, and connect the old master as a - slave with MASTER_GTID_POS=AUTO. -*/ - int -rpl_slave_state::tostring(String *dest, rpl_gtid *extra_gtids, uint32 num_extra) +rpl_slave_state::iterate(int (*cb)(rpl_gtid *, void *), void *data, + rpl_gtid *extra_gtids, uint32 num_extra) { - bool first= true; uint32 i; HASH gtid_hash; uchar *rec; @@ -555,7 +539,7 @@ rpl_slave_state::tostring(String *dest, rpl_gtid *extra_gtids, uint32 num_extra) } } - if (rpl_slave_state_tostring_helper(dest, &best_gtid, &first)) + if ((res= (*cb)(&best_gtid, data))) { unlock(); goto err; @@ -568,7 +552,7 @@ rpl_slave_state::tostring(String *dest, rpl_gtid *extra_gtids, uint32 num_extra) for (i= 0; i < gtid_hash.records; ++i) { gtid= (rpl_gtid *)my_hash_element(>id_hash, i); - if (rpl_slave_state_tostring_helper(dest, gtid, &first)) + if ((res= (*cb)(gtid, data))) goto err; } @@ -581,6 +565,44 @@ err: } +struct rpl_slave_state_tostring_data { + String *dest; + bool first; +}; +static int +rpl_slave_state_tostring_cb(rpl_gtid *gtid, void *data) +{ + rpl_slave_state_tostring_data *p= (rpl_slave_state_tostring_data *)data; + return rpl_slave_state_tostring_helper(p->dest, gtid, &p->first); +} + + +/* + Prepare the current slave state as a string, suitable for sending to the + master to request to receive binlog events starting from that GTID state. + + The state consists of the most recently applied GTID for each domain_id, + ie. the one with the highest sub_id within each domain_id. + + Optinally, extra_gtids is a list of GTIDs from the binlog. This is used when + a server was previously a master and now needs to connect to a new master as + a slave. For each domain_id, if the GTID in the binlog was logged with our + own server_id _and_ has a higher seq_no than what is in the slave state, + then this should be used as the position to start replicating at. This + allows to promote a slave as new master, and connect the old master as a + slave with MASTER_GTID_POS=AUTO. +*/ +int +rpl_slave_state::tostring(String *dest, rpl_gtid *extra_gtids, uint32 num_extra) +{ + struct rpl_slave_state_tostring_data data; + data.first= true; + data.dest= dest; + + return iterate(rpl_slave_state_tostring_cb, &data, extra_gtids, num_extra); +} + + /* Lookup a domain_id in the current replication slave state. @@ -626,9 +648,6 @@ rpl_slave_state::domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid) Parse a GTID at the start of a string, and update the pointer to point at the first character after the parsed GTID. - GTID can be in short form with domain_id=0 implied, SERVERID-SEQNO. - Or long form, DOMAINID-SERVERID-SEQNO. - Returns 0 on ok, non-zero on parse error. */ static int @@ -1217,7 +1236,7 @@ slave_connection_state::load(char *slave_request, size_t len) rpl_gtid *gtid; const rpl_gtid *gtid2; - my_hash_reset(&hash); + reset(); p= slave_request; end= slave_request + len; if (p == end) @@ -1270,7 +1289,7 @@ slave_connection_state::load(const rpl_gtid *gtid_list, uint32 count) { uint32 i; - my_hash_reset(&hash); + reset(); for (i= 0; i < count; ++i) if (update(>id_list[i])) return 1; @@ -1278,6 +1297,28 @@ slave_connection_state::load(const rpl_gtid *gtid_list, uint32 count) } +static int +slave_connection_state_load_cb(rpl_gtid *gtid, void *data) +{ + slave_connection_state *state= (slave_connection_state *)data; + return state->update(gtid); +} + + +/* + Same as rpl_slave_state::tostring(), but populates a slave_connection_state + instead. +*/ +int +slave_connection_state::load(rpl_slave_state *state, + rpl_gtid *extra_gtids, uint32 num_extra) +{ + reset(); + return state->iterate(slave_connection_state_load_cb, this, + extra_gtids, num_extra); +} + + rpl_gtid * slave_connection_state::find(uint32 domain_id) { diff --git a/sql/rpl_gtid.h b/sql/rpl_gtid.h index 40d51568357..4d5302020bf 100644 --- a/sql/rpl_gtid.h +++ b/sql/rpl_gtid.h @@ -93,6 +93,8 @@ struct rpl_slave_state int record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, bool in_transaction, bool in_statement); uint64 next_subid(uint32 domain_id); + int iterate(int (*cb)(rpl_gtid *, void *), void *data, + rpl_gtid *extra_gtids, uint32 num_extra); int tostring(String *dest, rpl_gtid *extra_gtids, uint32 num_extra); bool domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid); int load(THD *thd, char *state_from_master, size_t len, bool reset, @@ -178,8 +180,10 @@ struct slave_connection_state slave_connection_state(); ~slave_connection_state(); + void reset() { my_hash_reset(&hash); } int load(char *slave_request, size_t len); int load(const rpl_gtid *gtid_list, uint32 count); + int load(rpl_slave_state *state, rpl_gtid *extra_gtids, uint32 num_extra); rpl_gtid *find(uint32 domain_id); int update(const rpl_gtid *in_gtid); void remove(const rpl_gtid *gtid); diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc index 4ffe4f37cac..4a69eb4a6ee 100644 --- a/sql/rpl_mi.cc +++ b/sql/rpl_mi.cc @@ -38,7 +38,8 @@ Master_info::Master_info(LEX_STRING *connection_name_arg, connect_retry(DEFAULT_CONNECT_RETRY), inited(0), abort_slave(0), slave_running(0), slave_run_id(0), sync_counter(0), heartbeat_period(0), received_heartbeats(0), master_id(0), - using_gtid(USE_GTID_NO) + using_gtid(USE_GTID_NO), events_queued_since_last_gtid(0), + gtid_reconnect_event_skip_count(0), gtid_event_seen(false) { host[0] = 0; user[0] = 0; password[0] = 0; ssl_ca[0]= 0; ssl_capath[0]= 0; ssl_cert[0]= 0; @@ -147,6 +148,23 @@ void Master_info::clear_in_memory_info(bool all) } } + +const char * +Master_info::using_gtid_astext(enum enum_using_gtid arg) +{ + switch (arg) + { + case USE_GTID_NO: + return "No"; + case USE_GTID_SLAVE_POS: + return "Slave_Pos"; + default: + DBUG_ASSERT(arg == USE_GTID_CURRENT_POS); + return "Current_Pos"; + } +} + + void init_master_log_pos(Master_info* mi) { DBUG_ENTER("init_master_log_pos"); @@ -154,6 +172,10 @@ void init_master_log_pos(Master_info* mi) mi->master_log_name[0] = 0; mi->master_log_pos = BIN_LOG_HEADER_SIZE; // skip magic number mi->using_gtid= Master_info::USE_GTID_NO; + mi->gtid_current_pos.reset(); + mi->events_queued_since_last_gtid= 0; + mi->gtid_reconnect_event_skip_count= 0; + mi->gtid_event_seen= false; /* Intentionally init ssl_verify_server_cert to 0, no option available */ mi->ssl_verify_server_cert= 0; diff --git a/sql/rpl_mi.h b/sql/rpl_mi.h index 916bd0dae02..38daed0e260 100644 --- a/sql/rpl_mi.h +++ b/sql/rpl_mi.h @@ -61,6 +61,10 @@ typedef struct st_mysql MYSQL; class Master_info : public Slave_reporting_capability { public: + enum enum_using_gtid { + USE_GTID_NO= 0, USE_GTID_CURRENT_POS= 1, USE_GTID_SLAVE_POS= 2 + }; + Master_info(LEX_STRING *connection_name, bool is_slave_recovery); ~Master_info(); bool shall_ignore_server_id(ulong s_id); @@ -70,6 +74,7 @@ class Master_info : public Slave_reporting_capability /* If malloc() in initialization failed */ return connection_name.str == 0; } + static const char *using_gtid_astext(enum enum_using_gtid arg); /* the variables below are needed because we can change masters on the fly */ char master_log_name[FN_REFLEN+6]; /* Room for multi-*/ @@ -135,9 +140,35 @@ class Master_info : public Slave_reporting_capability Note that you can not change the numeric values of these, they are used in master.info. */ - enum { - USE_GTID_NO= 0, USE_GTID_CURRENT_POS= 1, USE_GTID_SLAVE_POS= 2 - } using_gtid; + enum enum_using_gtid using_gtid; + + /* + This GTID position records how far we have fetched into the relay logs. + This is used to continue fetching when the IO thread reconnects to the + master. + + (Full slave stop/start does not use it, as it resets the relay logs). + */ + slave_connection_state gtid_current_pos; + /* + If events_queued_since_last_gtid is non-zero, it is the number of events + queued so far in the relaylog of a GTID-prefixed event group. + It is zero when no partial event group has been queued at the moment. + */ + uint64 events_queued_since_last_gtid; + /* + The GTID of the partially-queued event group, when + events_queued_since_last_gtid is non-zero. + */ + rpl_gtid last_queued_gtid; + /* + When slave IO thread needs to reconnect, gtid_reconnect_event_skip_count + counts number of events to skip from the first GTID-prefixed event group, + to avoid duplicating events in the relay log. + */ + uint64 gtid_reconnect_event_skip_count; + /* gtid_event_seen is false until we receive first GTID event from master. */ + bool gtid_event_seen; }; int init_master_info(Master_info* mi, const char* master_info_fname, const char* slave_info_fname, diff --git a/sql/share/errmsg-utf8.txt b/sql/share/errmsg-utf8.txt index a6a331eeb18..9acdda3b78b 100644 --- a/sql/share/errmsg-utf8.txt +++ b/sql/share/errmsg-utf8.txt @@ -6549,3 +6549,5 @@ ER_GTID_STRICT_OUT_OF_ORDER eng "An attempt was made to binlog GTID %u-%u-%llu which would create an out-of-order sequence number with existing GTID %u-%u-%llu, and gtid strict mode is enabled." ER_GTID_START_FROM_BINLOG_HOLE eng "The binlog on the master is missing the GTID %u-%u-%llu requested by the slave (even though both a prior and a subsequent sequence number does exist), and GTID strict mode is enabled" +ER_SLAVE_UNEXPECTED_MASTER_SWITCH + eng "Unexpected GTID received from master after reconnect. This normally indicates that the master server was replaced without restarting the slave threads. %s" diff --git a/sql/slave.cc b/sql/slave.cc index 6b876c5e863..edd7a06d959 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -114,7 +114,7 @@ static const char *reconnect_messages[SLAVE_RECON_ACT_MAX][SLAVE_RECON_MSG_MAX]= registration on master", "Reconnecting after a failed registration on master", "failed registering on master, reconnecting to try again, \ -log '%s' at position %s", +log '%s' at position %s%s", "COM_REGISTER_SLAVE", "Slave I/O thread killed during or after reconnect" }, @@ -122,7 +122,7 @@ log '%s' at position %s", "Waiting to reconnect after a failed binlog dump request", "Slave I/O thread killed while retrying master dump", "Reconnecting after a failed binlog dump request", - "failed dump request, reconnecting to try again, log '%s' at position %s", + "failed dump request, reconnecting to try again, log '%s' at position %s%s", "COM_BINLOG_DUMP", "Slave I/O thread killed during or after reconnect" }, @@ -131,7 +131,7 @@ log '%s' at position %s", "Slave I/O thread killed while waiting to reconnect after a failed read", "Reconnecting after a failed master event read", "Slave I/O thread: Failed reading log event, reconnecting to retry, \ -log '%s' at position %s", +log '%s' at position %s%s", "", "Slave I/O thread killed during or after a reconnect done to recover from \ failed read" @@ -879,9 +879,13 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start, purge_relay_logs(&mi->rli, NULL, 0, &errmsg); mi->master_log_name[0]= 0; mi->master_log_pos= 0; + error= rpl_load_gtid_state(&mi->gtid_current_pos, mi->using_gtid == + Master_info::USE_GTID_CURRENT_POS); + mi->events_queued_since_last_gtid= 0; + mi->gtid_reconnect_event_skip_count= 0; } - if (thread_mask & SLAVE_IO) + if (!error && (thread_mask & SLAVE_IO)) error= start_slave_thread( #ifdef HAVE_PSI_INTERFACE key_thread_slave_io, @@ -1873,18 +1877,9 @@ past_checksum: after_set_capability: #endif - /* - Request dump start from slave replication GTID state. - - Only request GTID position the first time we connect after CHANGE MASTER - or after starting both IO or SQL thread. - - Otherwise, if the IO thread was ahead of the SQL thread before the - restart or reconnect, we might end up re-fetching and hence re-applying - the same event(s) again. - */ - if (mi->using_gtid != Master_info::USE_GTID_NO && !mi->master_log_name[0]) + if (mi->using_gtid != Master_info::USE_GTID_NO) { + /* Request dump to start from slave replication GTID state. */ int rc; char str_buf[256]; String query_str(str_buf, sizeof(str_buf), system_charset_info); @@ -1913,9 +1908,7 @@ after_set_capability: query_str.append(STRING_WITH_LEN("SET @slave_connect_state='"), system_charset_info); - if (rpl_append_gtid_state(&query_str, - mi->using_gtid == - Master_info::USE_GTID_CURRENT_POS)) + if (mi->gtid_current_pos.append_to_string(&query_str)) { err_code= ER_OUTOFMEMORY; errmsg= "The slave I/O thread stops because a fatal out-of-memory " @@ -2016,7 +2009,7 @@ after_set_capability: } } } - if (mi->using_gtid == Master_info::USE_GTID_NO) + else { /* If we are not using GTID to connect this time, then instead request @@ -2588,10 +2581,7 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full, } // Master_Server_id protocol->store((uint32) mi->master_id); - protocol->store((mi->using_gtid==Master_info::USE_GTID_NO ? "No" : - (mi->using_gtid==Master_info::USE_GTID_SLAVE_POS ? - "Slave_Pos" : "Current_Pos")), - &my_charset_bin); + protocol->store(mi->using_gtid_astext(mi->using_gtid), &my_charset_bin); if (full) { protocol->store((uint32) mi->rli.retried_trans); @@ -3424,8 +3414,22 @@ static int try_to_reconnect(THD *thd, MYSQL *mysql, Master_info *mi, if (!suppress_warnings) { char buf[256], llbuff[22]; + String tmp; + if (mi->using_gtid != Master_info::USE_GTID_NO) + { + tmp.append(STRING_WITH_LEN("; GTID position '")); + mi->gtid_current_pos.append_to_string(&tmp); + if (mi->events_queued_since_last_gtid == 0) + tmp.append(STRING_WITH_LEN("'")); + else + { + tmp.append(STRING_WITH_LEN("', GTID event skip ")); + tmp.append_ulonglong((ulonglong)mi->events_queued_since_last_gtid); + } + } my_snprintf(buf, sizeof(buf), messages[SLAVE_RECON_MSG_FAILED], - IO_RPL_LOG_NAME, llstr(mi->master_log_pos, llbuff)); + IO_RPL_LOG_NAME, llstr(mi->master_log_pos, llbuff), + tmp.c_ptr_safe()); /* Raise a warining during registering on master/requesting dump. Log a message reading event. @@ -3545,11 +3549,21 @@ pthread_handler_t handle_slave_io(void *arg) // we can get killed during safe_connect if (!safe_connect(thd, mysql, mi)) { - sql_print_information("Slave I/O thread: connected to master '%s@%s:%d'," - "replication started in log '%s' at position %s", - mi->user, mi->host, mi->port, - IO_RPL_LOG_NAME, - llstr(mi->master_log_pos,llbuff)); + if (mi->using_gtid == Master_info::USE_GTID_NO) + sql_print_information("Slave I/O thread: connected to master '%s@%s:%d'," + "replication started in log '%s' at position %s", + mi->user, mi->host, mi->port, + IO_RPL_LOG_NAME, + llstr(mi->master_log_pos,llbuff)); + else + { + String tmp; + mi->gtid_current_pos.to_string(&tmp); + sql_print_information("Slave I/O thread: connected to master '%s@%s:%d'," + "replication starts at GTID position '%s'", + mi->user, mi->host, mi->port, tmp.c_ptr_safe()); + } + /* Adding MAX_LOG_EVENT_HEADER_LEN to the max_packet_size on the I/O thread, since a replication event can become this much larger than @@ -3566,6 +3580,25 @@ pthread_handler_t handle_slave_io(void *arg) connected: + if (mi->using_gtid != Master_info::USE_GTID_NO) + { + /* + When the IO thread (re)connects to the master using GTID, it will + connect at the start of an event group. But the IO thread may have + previously logged part of the following event group to the relay + log. + + When the IO and SQL thread are started together, we erase any previous + relay logs, but this is not possible/desirable while the SQL thread is + running. To avoid duplicating partial event groups in the relay logs in + this case, we remember the count of events in any partially logged event + group before the reconnect, and then here at connect we set up a counter + to skip the already-logged part of the group. + */ + mi->gtid_reconnect_event_skip_count= mi->events_queued_since_last_gtid; + mi->gtid_event_seen= false; + } + #ifdef ENABLED_DEBUG_SYNC DBUG_EXECUTE_IF("dbug.before_get_running_status_yes", { @@ -3791,8 +3824,19 @@ log space"); // error = 0; err: // print the current replication position - sql_print_information("Slave I/O thread exiting, read up to log '%s', position %s", - IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff)); + if (mi->using_gtid == Master_info::USE_GTID_NO) + sql_print_information("Slave I/O thread exiting, read up to log '%s', " + "position %s", + IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff)); + else + { + String tmp; + mi->gtid_current_pos.to_string(&tmp); + sql_print_information("Slave I/O thread exiting, read up to log '%s', " + "position %s; GTID position %s", + IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff), + tmp.c_ptr_safe()); + } RUN_HOOK(binlog_relay_io, thread_stop, (thd, mi)); thd->reset_query(); thd->reset_db(NULL, 0); @@ -4059,10 +4103,20 @@ pthread_handler_t handle_slave_sql(void *arg) rli->group_master_log_name, llstr(rli->group_master_log_pos,llbuff))); if (global_system_variables.log_warnings) + { + String tmp; + if (mi->using_gtid != Master_info::USE_GTID_NO) + { + tmp.append(STRING_WITH_LEN("; GTID position '")); + rpl_append_gtid_state(&tmp, + mi->using_gtid==Master_info::USE_GTID_CURRENT_POS); + tmp.append(STRING_WITH_LEN("'")); + } sql_print_information("Slave SQL thread initialized, starting replication in \ -log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME, +log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME, llstr(rli->group_master_log_pos,llbuff),rli->group_relay_log_name, - llstr(rli->group_relay_log_pos,llbuff1)); + llstr(rli->group_relay_log_pos,llbuff1), tmp.c_ptr_safe()); + } if (check_temp_dir(rli->slave_patternload_file)) { @@ -4196,16 +4250,35 @@ log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME, sql_print_warning("Slave: %s Error_code: %d", err->get_message_text(), err->get_sql_errno()); } if (udf_error) + { + String tmp; + if (mi->using_gtid != Master_info::USE_GTID_NO) + { + tmp.append(STRING_WITH_LEN("; GTID position '")); + rpl_append_gtid_state(&tmp, false); + tmp.append(STRING_WITH_LEN("'")); + } sql_print_error("Error loading user-defined library, slave SQL " "thread aborted. Install the missing library, and restart the " "slave SQL thread with \"SLAVE START\". We stopped at log '%s' " - "position %s", RPL_LOG_NAME, llstr(rli->group_master_log_pos, - llbuff)); + "position %s%s", RPL_LOG_NAME, llstr(rli->group_master_log_pos, + llbuff), tmp.c_ptr_safe()); + } else + { + String tmp; + if (mi->using_gtid != Master_info::USE_GTID_NO) + { + tmp.append(STRING_WITH_LEN("; GTID position '")); + rpl_append_gtid_state(&tmp, false); + tmp.append(STRING_WITH_LEN("'")); + } sql_print_error("\ Error running query, slave SQL thread aborted. Fix the problem, and restart \ the slave SQL thread with \"SLAVE START\". We stopped at log \ -'%s' position %s", RPL_LOG_NAME, llstr(rli->group_master_log_pos, llbuff)); +'%s' position %s%s", RPL_LOG_NAME, llstr(rli->group_master_log_pos, llbuff), + tmp.c_ptr_safe()); + } } goto err; } @@ -4213,9 +4286,20 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \ } /* Thread stopped. Print the current replication position to the log */ - sql_print_information("Slave SQL thread exiting, replication stopped in log " - "'%s' at position %s", - RPL_LOG_NAME, llstr(rli->group_master_log_pos,llbuff)); + { + String tmp; + if (mi->using_gtid != Master_info::USE_GTID_NO) + { + tmp.append(STRING_WITH_LEN("; GTID position '")); + rpl_append_gtid_state(&tmp, false); + tmp.append(STRING_WITH_LEN("'")); + } + sql_print_information("Slave SQL thread exiting, replication stopped in " + "log '%s' at position %s%s", + RPL_LOG_NAME, + llstr(rli->group_master_log_pos,llbuff), + tmp.c_ptr_safe()); + } err: @@ -4690,6 +4774,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) mysql_mutex_t *log_lock= rli->relay_log.get_log_lock(); ulong s_id; bool unlock_data_lock= TRUE; + bool gtid_skip_enqueue= false; + /* FD_q must have been prepared for the first R_a event inside get_master_version_and_clock() @@ -4877,6 +4963,19 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) mi->rli.relay_log.relay_log_checksum_alg= tmp->checksum_alg; /* + Do not queue any format description event that we receive after a + reconnect where we are skipping over a partial event group received + before the reconnect. + + (If we queued such an event, and it was the first format_description + event after master restart, the slave SQL thread would think that + the partial event group before it in the relay log was from a + previous master crash and should be rolled back. + */ + if (unlikely(mi->gtid_reconnect_event_skip_count)) + gtid_skip_enqueue= true; + + /* Though this does some conversion to the slave's format, this will preserve the master's binlog format version, and number of event types. */ @@ -4971,18 +5070,113 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) " UNTIL master_gtid_pos %s", str.c_ptr_safe()); mi->abort_slave= true; } + event_pos= glev->log_pos; delete glev; /* - Do not update position for fake Gtid_list event (which has a zero - end_log_pos). + We use fake Gtid_list events to update the old-style position (among + other things). + + Early code created fake Gtid_list events with zero log_pos, those should + not modify old-style position. */ - inc_pos= uint4korr(buf+LOG_POS_OFFSET) ? event_len : 0; + if (event_pos == 0 || event_pos <= mi->master_log_pos) + inc_pos= 0; + else + inc_pos= event_pos - mi->master_log_pos; + } + break; + + case GTID_EVENT: + { + uchar dummy_flag; + + if (mi->using_gtid == Master_info::USE_GTID_NO) + goto default_action; + if (unlikely(!mi->gtid_event_seen)) + { + mi->gtid_event_seen= true; + if (mi->gtid_reconnect_event_skip_count) + { + rpl_gtid gtid; + + /* + If we are reconnecting, and we need to skip a partial event group + already queued to the relay log before the reconnect, then we check + that we actually get the same event group (same GTID) as before, so + we do not end up with half of one group and half another. + + The only way we should be able to receive a different GTID than what + we expect is if the binlog on the master (or more likely the whole + master server) was replaced with a different one, one the same IP + address, _and_ the new master happens to have domains in a different + order so we get the GTID from a different domain first. Still, it is + best to protect against this case. + */ + if (Gtid_log_event::peek(buf, event_len, checksum_alg, + >id.domain_id, >id.server_id, + >id.seq_no, &dummy_flag)) + { + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; + goto err; + } + if (gtid.domain_id != mi->last_queued_gtid.domain_id || + gtid.server_id != mi->last_queued_gtid.server_id || + gtid.seq_no != mi->last_queued_gtid.seq_no) + { + bool first; + error= ER_SLAVE_UNEXPECTED_MASTER_SWITCH; + error_msg.append(STRING_WITH_LEN("Expected: ")); + first= true; + rpl_slave_state_tostring_helper(&error_msg, &mi->last_queued_gtid, + &first); + error_msg.append(STRING_WITH_LEN(", received: ")); + first= true; + rpl_slave_state_tostring_helper(&error_msg, >id, &first); + goto err; + } + } + } + + if (unlikely(mi->gtid_reconnect_event_skip_count)) + { + goto default_action; + } + + /* + We have successfully queued to relay log everything before this GTID, so + in case of reconnect we can start from after any previous GTID. + */ + if (mi->events_queued_since_last_gtid) + { + mi->gtid_current_pos.update(&mi->last_queued_gtid); + mi->events_queued_since_last_gtid= 0; + } + if (Gtid_log_event::peek(buf, event_len, checksum_alg, + &mi->last_queued_gtid.domain_id, + &mi->last_queued_gtid.server_id, + &mi->last_queued_gtid.seq_no, &dummy_flag)) + { + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; + goto err; + } + ++mi->events_queued_since_last_gtid; } break; default: default_action: + if (mi->using_gtid != Master_info::USE_GTID_NO && mi->gtid_event_seen) + { + if (unlikely(mi->gtid_reconnect_event_skip_count)) + { + --mi->gtid_reconnect_event_skip_count; + gtid_skip_enqueue= true; + } + else if (mi->events_queued_since_last_gtid) + ++mi->events_queued_since_last_gtid; + } + inc_pos= event_len; break; } @@ -5067,8 +5261,16 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) } else { - /* write the event to the relay log */ - if (likely(!(rli->relay_log.appendv(buf,event_len,0)))) + /* + Write the event to the relay log, unless we reconnected in the middle + of an event group and now need to skip the initial part of the group that + we already wrote before reconnecting. + */ + if (unlikely(gtid_skip_enqueue)) + { + mi->master_log_pos+= inc_pos; + } + else if (likely(!(rli->relay_log.appendv(buf,event_len,0)))) { mi->master_log_pos+= inc_pos; DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos)); diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index c6372099600..4625a61f22c 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -2776,13 +2776,34 @@ end_with_restore_list: { LEX_MASTER_INFO* lex_mi= &thd->lex->mi; Master_info *mi; + int load_error; + + load_error= rpl_load_gtid_slave_state(thd); + mysql_mutex_lock(&LOCK_active_mi); if ((mi= (master_info_index-> get_master_info(&lex_mi->connection_name, MYSQL_ERROR::WARN_LEVEL_ERROR)))) + { + if (load_error) + { + /* + We cannot start a slave using GTID if we cannot load the GTID position + from the mysql.gtid_slave_pos table. But we can allow non-GTID + replication (useful eg. during upgrade). + */ + if (mi->using_gtid != Master_info::USE_GTID_NO) + { + mysql_mutex_unlock(&LOCK_active_mi); + break; + } + else + thd->clear_error(); + } if (!start_slave(thd, mi, 1 /* net report*/)) my_ok(thd); + } mysql_mutex_unlock(&LOCK_active_mi); break; } diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index fd9ead71472..d8aebbde8dc 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -50,7 +50,7 @@ extern TYPELIB binlog_checksum_typelib; static int fake_event_header(String* packet, Log_event_type event_type, ulong extra_len, my_bool *do_checksum, ha_checksum *crc, const char** errmsg, - uint8 checksum_alg_arg) + uint8 checksum_alg_arg, uint32 end_pos) { char header[LOG_EVENT_HEADER_LEN]; ulong event_len; @@ -70,7 +70,7 @@ fake_event_header(String* packet, Log_event_type event_type, ulong extra_len, int4store(header + EVENT_LEN_OFFSET, event_len); int2store(header + FLAGS_OFFSET, LOG_EVENT_ARTIFICIAL_F); // TODO: check what problems this may cause and fix them - int4store(header + LOG_POS_OFFSET, 0); + int4store(header + LOG_POS_OFFSET, end_pos); if (packet->append(header, sizeof(header))) { *errmsg= "Failed due to out-of-memory writing event"; @@ -146,7 +146,7 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name, if ((err= fake_event_header(packet, ROTATE_EVENT, ident_len + ROTATE_HEADER_LEN, &do_checksum, &crc, - errmsg, checksum_alg_arg))) + errmsg, checksum_alg_arg, 0))) DBUG_RETURN(err); int8store(buf+R_POS_OFFSET,position); @@ -169,7 +169,7 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name, static int fake_gtid_list_event(NET* net, String* packet, Gtid_list_log_event *glev, const char** errmsg, - uint8 checksum_alg_arg) + uint8 checksum_alg_arg, uint32 current_pos) { my_bool do_checksum; int err; @@ -185,7 +185,7 @@ static int fake_gtid_list_event(NET* net, String* packet, } if ((err= fake_event_header(packet, GTID_LIST_EVENT, str.length(), &do_checksum, &crc, - errmsg, checksum_alg_arg))) + errmsg, checksum_alg_arg, current_pos))) return err; packet->append(str); @@ -1406,7 +1406,7 @@ is_until_reached(THD *thd, NET *net, String *packet, ulong *ev_offset, enum_gtid_until_state gtid_until_group, Log_event_type event_type, uint8 current_checksum_alg, ushort flags, const char **errmsg, - rpl_binlog_state *until_binlog_state) + rpl_binlog_state *until_binlog_state, uint32 current_pos) { switch (gtid_until_group) { @@ -1437,7 +1437,8 @@ is_until_reached(THD *thd, NET *net, String *packet, ulong *ev_offset, return true; Gtid_list_log_event glev(until_binlog_state, Gtid_list_log_event::FLAG_UNTIL_REACHED); - if (fake_gtid_list_event(net, packet, &glev, errmsg, current_checksum_alg)) + if (fake_gtid_list_event(net, packet, &glev, errmsg, current_checksum_alg, + current_pos)) return true; *errmsg= NULL; return true; @@ -1508,6 +1509,19 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags, return "Failed to read Gtid_log_event: corrupt binlog"; } + DBUG_EXECUTE_IF("gtid_force_reconnect_at_10_1_100", + { + rpl_gtid *dbug_gtid; + if ((dbug_gtid= until_binlog_state->find(10,1)) && + dbug_gtid->seq_no == 100) + { + DBUG_SET("-d,gtid_force_reconnect_at_10_1_100"); + DBUG_SET_INITIAL("-d,gtid_force_reconnect_at_10_1_100"); + my_errno= ER_UNKNOWN_ERROR; + return "DBUG-injected forced reconnect"; + } + }); + if (until_binlog_state->update(&event_gtid, false)) { my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; @@ -1527,19 +1541,31 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags, if (event_gtid.server_id == gtid->server_id && event_gtid.seq_no >= gtid->seq_no) { - /* - In strict mode, it is an error if the slave requests to start in - a "hole" in the master's binlog: a GTID that does not exist, even - though both the prior and subsequent seq_no exists for same - domain_id and server_id. - */ - if (slave_gtid_strict_mode && event_gtid.seq_no > gtid->seq_no) + if (event_gtid.seq_no > gtid->seq_no) + { + /* + In strict mode, it is an error if the slave requests to start + in a "hole" in the master's binlog: a GTID that does not + exist, even though both the prior and subsequent seq_no exists + for same domain_id and server_id. + */ + if (slave_gtid_strict_mode) + { + my_errno= ER_GTID_START_FROM_BINLOG_HOLE; + *error_gtid= *gtid; + return "The binlog on the master is missing the GTID requested " + "by the slave (even though both a prior and a subsequent " + "sequence number does exist), and GTID strict mode is enabled."; + } + } + else { - my_errno= ER_GTID_START_FROM_BINLOG_HOLE; - *error_gtid= *gtid; - return "The binlog on the master is missing the GTID requested " - "by the slave (even though both a prior and a subsequent " - "sequence number does exist), and GTID strict mode is enabled."; + /* + Send a fake Gtid_list event to the slave. + This allows the slave to update its current binlog position + so MASTER_POS_WAIT() and MASTER_GTID_WAIT() can work. + */ +// send_fake_gtid_list_event(until_binlog_state); } /* Delete this entry if we have reached slave start position (so we @@ -1797,6 +1823,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, int old_max_allowed_packet= thd->variables.max_allowed_packet; #ifndef DBUG_OFF int left_events = max_binlog_dump_events; + uint dbug_reconnect_counter= 0; #endif DBUG_ENTER("mysql_binlog_send"); DBUG_PRINT("enter",("log_ident: '%s' pos: %ld", log_ident, (long) pos)); @@ -1830,6 +1857,13 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, until_gtid_state= &until_gtid_state_obj; } + DBUG_EXECUTE_IF("binlog_force_reconnect_after_22_events", + { + DBUG_SET("-d,binlog_force_reconnect_after_22_events"); + DBUG_SET_INITIAL("-d,binlog_force_reconnect_after_22_events"); + dbug_reconnect_counter= 22; + }); + /* We want to corrupt the first event, in Log_event::read_log_event(). But we do not want the corruption to happen early, eg. when client does @@ -2176,6 +2210,19 @@ impossible position"; (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F; } +#ifndef DBUG_OFF + if (dbug_reconnect_counter > 0) + { + --dbug_reconnect_counter; + if (dbug_reconnect_counter == 0) + { + errmsg= "DBUG-injected forced reconnect"; + my_errno= ER_UNKNOWN_ERROR; + goto err; + } + } +#endif + if ((tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type, log_file_name, &log, mariadb_slave_capability, ev_offset, @@ -2191,7 +2238,7 @@ impossible position"; if (until_gtid_state && is_until_reached(thd, net, packet, &ev_offset, gtid_until_group, event_type, current_checksum_alg, flags, &errmsg, - &until_binlog_state)) + &until_binlog_state, my_b_tell(&log))) { if (errmsg) { @@ -2373,7 +2420,7 @@ impossible position"; until_gtid_state && is_until_reached(thd, net, packet, &ev_offset, gtid_until_group, event_type, current_checksum_alg, flags, &errmsg, - &until_binlog_state)) + &until_binlog_state, my_b_tell(&log))) { if (errmsg) { @@ -2970,6 +3017,7 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added) char saved_host[HOSTNAME_LENGTH + 1]; uint saved_port; char saved_log_name[FN_REFLEN]; + Master_info::enum_using_gtid saved_using_gtid; char master_info_file_tmp[FN_REFLEN]; char relay_log_info_file_tmp[FN_REFLEN]; my_off_t saved_log_pos; @@ -3059,6 +3107,7 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added) saved_port= mi->port; strmake(saved_log_name, mi->master_log_name, FN_REFLEN - 1); saved_log_pos= mi->master_log_pos; + saved_using_gtid= mi->using_gtid; /* If the user specified host or port without binlog or position, @@ -3291,6 +3340,11 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added) "master_log_pos='%ld'.", saved_host, saved_port, saved_log_name, (ulong) saved_log_pos, mi->host, mi->port, mi->master_log_name, (ulong) mi->master_log_pos); + if (saved_using_gtid != Master_info::USE_GTID_NO || + mi->using_gtid != Master_info::USE_GTID_NO) + sql_print_information("Previous Using_Gtid=%s. New Using_Gtid=%s", + mi->using_gtid_astext(saved_using_gtid), + mi->using_gtid_astext(mi->using_gtid)); /* If we don't write new coordinates to disk now, then old will remain in @@ -3753,11 +3807,11 @@ rpl_deinit_gtid_slave_state() /* - Format the current GTID state as a string, for use when connecting to a - master server with GTID, or for returning the value of @@global.gtid_state. + Format the current GTID state as a string, for returning the value of + @@global.gtid_slave_pos. If the flag use_binlog is true, then the contents of the binary log (if - enabled) is merged into the current GTID state. + enabled) is merged into the current GTID state (@@global.gtid_current_pos). */ int rpl_append_gtid_state(String *dest, bool use_binlog) @@ -3770,10 +3824,35 @@ rpl_append_gtid_state(String *dest, bool use_binlog) (err= mysql_bin_log.get_most_recent_gtid_list(>id_list, &num_gtids))) return err; - rpl_global_gtid_slave_state.tostring(dest, gtid_list, num_gtids); + err= rpl_global_gtid_slave_state.tostring(dest, gtid_list, num_gtids); my_free(gtid_list); - return 0; + return err; +} + + +/* + Load the current GITD position into a slave_connection_state, for use when + connecting to a master server with GTID. + + If the flag use_binlog is true, then the contents of the binary log (if + enabled) is merged into the current GTID state (master_use_gtid=current_pos). +*/ +int +rpl_load_gtid_state(slave_connection_state *state, bool use_binlog) +{ + int err; + rpl_gtid *gtid_list= NULL; + uint32 num_gtids= 0; + + if (use_binlog && opt_bin_log && + (err= mysql_bin_log.get_most_recent_gtid_list(>id_list, &num_gtids))) + return err; + + err= state->load(&rpl_global_gtid_slave_state, gtid_list, num_gtids); + my_free(gtid_list); + + return err; } diff --git a/sql/sql_repl.h b/sql/sql_repl.h index 820ffed0928..a242fa4aeef 100644 --- a/sql/sql_repl.h +++ b/sql/sql_repl.h @@ -32,6 +32,8 @@ typedef struct st_slave_info THD* thd; } SLAVE_INFO; +class slave_connection_state; + extern my_bool opt_show_slave_auth_info; extern char *master_host, *master_info_file; extern bool server_id_supplied; @@ -70,6 +72,7 @@ void rpl_init_gtid_slave_state(); void rpl_deinit_gtid_slave_state(); int gtid_state_from_binlog_pos(const char *name, uint32 pos, String *out_str); int rpl_append_gtid_state(String *dest, bool use_binlog); +int rpl_load_gtid_state(slave_connection_state *state, bool use_binlog); bool rpl_gtid_pos_check(THD *thd, char *str, size_t len); bool rpl_gtid_pos_update(THD *thd, char *str, size_t len); |