diff options
Diffstat (limited to 'sql/slave.cc')
-rw-r--r-- | sql/slave.cc | 379 |
1 files changed, 325 insertions, 54 deletions
diff --git a/sql/slave.cc b/sql/slave.cc index c7f4dc08096..46a7ddb28f3 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -114,7 +114,7 @@ static const char *reconnect_messages[SLAVE_RECON_ACT_MAX][SLAVE_RECON_MSG_MAX]= registration on master", "Reconnecting after a failed registration on master", "failed registering on master, reconnecting to try again, \ -log '%s' at position %s", +log '%s' at position %s%s", "COM_REGISTER_SLAVE", "Slave I/O thread killed during or after reconnect" }, @@ -122,7 +122,7 @@ log '%s' at position %s", "Waiting to reconnect after a failed binlog dump request", "Slave I/O thread killed while retrying master dump", "Reconnecting after a failed binlog dump request", - "failed dump request, reconnecting to try again, log '%s' at position %s", + "failed dump request, reconnecting to try again, log '%s' at position %s%s", "COM_BINLOG_DUMP", "Slave I/O thread killed during or after reconnect" }, @@ -131,7 +131,7 @@ log '%s' at position %s", "Slave I/O thread killed while waiting to reconnect after a failed read", "Reconnecting after a failed master event read", "Slave I/O thread: Failed reading log event, reconnecting to retry, \ -log '%s' at position %s", +log '%s' at position %s%s", "", "Slave I/O thread killed during or after a reconnect done to recover from \ failed read" @@ -253,6 +253,66 @@ static void init_slave_psi_keys(void) } #endif /* HAVE_PSI_INTERFACE */ + +static bool slave_init_thread_running; + + +pthread_handler_t +handle_slave_init(void *arg __attribute__((unused))) +{ + THD *thd; + + my_thread_init(); + thd= new THD; + thd->thread_stack= (char*) &thd; /* Set approximate stack start */ + mysql_mutex_lock(&LOCK_thread_count); + thd->thread_id= thread_id++; + mysql_mutex_unlock(&LOCK_thread_count); + thd->store_globals(); + + thd_proc_info(thd, "Loading slave GTID position from table"); + if (rpl_load_gtid_slave_state(thd)) + sql_print_warning("Failed to load slave replication state from table " + "%s.%s: %u: %s", "mysql", + rpl_gtid_slave_state_table_name.str, + thd->stmt_da->sql_errno(), thd->stmt_da->message()); + + mysql_mutex_lock(&LOCK_thread_count); + delete thd; + mysql_mutex_unlock(&LOCK_thread_count); + my_thread_end(); + + mysql_mutex_lock(&LOCK_thread_count); + slave_init_thread_running= false; + mysql_cond_signal(&COND_thread_count); + mysql_mutex_unlock(&LOCK_thread_count); + + return 0; +} + + +static int +run_slave_init_thread() +{ + pthread_t th; + + slave_init_thread_running= true; + if (mysql_thread_create(key_thread_slave_init, &th, NULL, + handle_slave_init, NULL)) + { + sql_print_error("Failed to create thread while initialising slave"); + return 1; + } + + mysql_mutex_lock(&LOCK_thread_count); + while (slave_init_thread_running) + mysql_cond_wait(&COND_thread_count, &LOCK_thread_count); + mysql_mutex_unlock(&LOCK_thread_count); + + return 0; +} + + /* Initialize slave structures */ int init_slave() @@ -264,6 +324,9 @@ int init_slave() init_slave_psi_keys(); #endif + if (run_slave_init_thread()) + return 1; + /* This is called when mysqld starts. Before client connections are accepted. However bootstrap may conflict with us if it does START SLAVE. @@ -381,16 +444,13 @@ int init_recovery(Master_info* mi, const char** errmsg) { mi->master_log_pos= max(BIN_LOG_HEADER_SIZE, rli->group_master_log_pos); - strmake(mi->master_log_name, rli->group_master_log_name, - sizeof(mi->master_log_name)-1); + strmake_buf(mi->master_log_name, rli->group_master_log_name); sql_print_warning("Recovery from master pos %ld and file %s.", (ulong) mi->master_log_pos, mi->master_log_name); - strmake(rli->group_relay_log_name, rli->relay_log.get_log_fname(), - sizeof(rli->group_relay_log_name)-1); - strmake(rli->event_relay_log_name, rli->relay_log.get_log_fname(), - sizeof(mi->rli.event_relay_log_name)-1); + strmake_buf(rli->group_relay_log_name, rli->relay_log.get_log_fname()); + strmake_buf(rli->event_relay_log_name, rli->relay_log.get_log_fname()); rli->group_relay_log_pos= rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE; } @@ -830,9 +890,13 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start, purge_relay_logs(&mi->rli, NULL, 0, &errmsg); mi->master_log_name[0]= 0; mi->master_log_pos= 0; + error= rpl_load_gtid_state(&mi->gtid_current_pos, mi->using_gtid == + Master_info::USE_GTID_CURRENT_POS); + mi->events_queued_since_last_gtid= 0; + mi->gtid_reconnect_event_skip_count= 0; } - if (thread_mask & SLAVE_IO) + if (!error && (thread_mask & SLAVE_IO)) error= start_slave_thread( #ifdef HAVE_PSI_INTERFACE key_thread_slave_io, @@ -1824,18 +1888,9 @@ past_checksum: after_set_capability: #endif - /* - Request dump start from slave replication GTID state. - - Only request GTID position the first time we connect after CHANGE MASTER - or after starting both IO or SQL thread. - - Otherwise, if the IO thread was ahead of the SQL thread before the - restart or reconnect, we might end up re-fetching and hence re-applying - the same event(s) again. - */ - if (mi->using_gtid != Master_info::USE_GTID_NO && !mi->master_log_name[0]) + if (mi->using_gtid != Master_info::USE_GTID_NO) { + /* Request dump to start from slave replication GTID state. */ int rc; char str_buf[256]; String query_str(str_buf, sizeof(str_buf), system_charset_info); @@ -1864,9 +1919,7 @@ after_set_capability: query_str.append(STRING_WITH_LEN("SET @slave_connect_state='"), system_charset_info); - if (rpl_append_gtid_state(&query_str, - mi->using_gtid == - Master_info::USE_GTID_CURRENT_POS)) + if (mi->gtid_current_pos.append_to_string(&query_str)) { err_code= ER_OUTOFMEMORY; errmsg= "The slave I/O thread stops because a fatal out-of-memory " @@ -1967,7 +2020,7 @@ after_set_capability: } } } - if (mi->using_gtid == Master_info::USE_GTID_NO) + else { /* If we are not using GTID to connect this time, then instead request @@ -2548,10 +2601,7 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full, protocol->store(mi->ssl_ca, &my_charset_bin); // Master_Ssl_Crlpath protocol->store(mi->ssl_capath, &my_charset_bin); - protocol->store((mi->using_gtid==Master_info::USE_GTID_NO ? "No" : - (mi->using_gtid==Master_info::USE_GTID_SLAVE_POS ? - "Slave_Pos" : "Current_Pos")), - &my_charset_bin); + protocol->store(mi->using_gtid_astext(mi->using_gtid), &my_charset_bin); if (full) { protocol->store((uint32) mi->rli.retried_trans); @@ -3383,8 +3433,22 @@ static int try_to_reconnect(THD *thd, MYSQL *mysql, Master_info *mi, if (!suppress_warnings) { char buf[256], llbuff[22]; + String tmp; + if (mi->using_gtid != Master_info::USE_GTID_NO) + { + tmp.append(STRING_WITH_LEN("; GTID position '")); + mi->gtid_current_pos.append_to_string(&tmp); + if (mi->events_queued_since_last_gtid == 0) + tmp.append(STRING_WITH_LEN("'")); + else + { + tmp.append(STRING_WITH_LEN("', GTID event skip ")); + tmp.append_ulonglong((ulonglong)mi->events_queued_since_last_gtid); + } + } my_snprintf(buf, sizeof(buf), messages[SLAVE_RECON_MSG_FAILED], - IO_RPL_LOG_NAME, llstr(mi->master_log_pos, llbuff)); + IO_RPL_LOG_NAME, llstr(mi->master_log_pos, llbuff), + tmp.c_ptr_safe()); /* Raise a warining during registering on master/requesting dump. Log a message reading event. @@ -3476,6 +3540,16 @@ pthread_handler_t handle_slave_io(void *arg) /* This must be called before run any binlog_relay_io hooks */ my_pthread_setspecific_ptr(RPL_MASTER_INFO, mi); + /* Load the set of seen GTIDs, if we did not already. */ + if (rpl_load_gtid_slave_state(thd)) + { + mi->report(ERROR_LEVEL, thd->stmt_da->sql_errno(), + "Unable to load replication GTID slave state from mysql.%s: %s", + rpl_gtid_slave_state_table_name.str, thd->stmt_da->message()); + goto err; + } + + if (RUN_HOOK(binlog_relay_io, thread_start, (thd, mi))) { mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, @@ -3494,11 +3568,21 @@ pthread_handler_t handle_slave_io(void *arg) // we can get killed during safe_connect if (!safe_connect(thd, mysql, mi)) { - sql_print_information("Slave I/O thread: connected to master '%s@%s:%d'," - "replication started in log '%s' at position %s", - mi->user, mi->host, mi->port, - IO_RPL_LOG_NAME, - llstr(mi->master_log_pos,llbuff)); + if (mi->using_gtid == Master_info::USE_GTID_NO) + sql_print_information("Slave I/O thread: connected to master '%s@%s:%d'," + "replication started in log '%s' at position %s", + mi->user, mi->host, mi->port, + IO_RPL_LOG_NAME, + llstr(mi->master_log_pos,llbuff)); + else + { + String tmp; + mi->gtid_current_pos.to_string(&tmp); + sql_print_information("Slave I/O thread: connected to master '%s@%s:%d'," + "replication starts at GTID position '%s'", + mi->user, mi->host, mi->port, tmp.c_ptr_safe()); + } + /* Adding MAX_LOG_EVENT_HEADER_LEN to the max_packet_size on the I/O thread, since a replication event can become this much larger than @@ -3515,6 +3599,25 @@ pthread_handler_t handle_slave_io(void *arg) connected: + if (mi->using_gtid != Master_info::USE_GTID_NO) + { + /* + When the IO thread (re)connects to the master using GTID, it will + connect at the start of an event group. But the IO thread may have + previously logged part of the following event group to the relay + log. + + When the IO and SQL thread are started together, we erase any previous + relay logs, but this is not possible/desirable while the SQL thread is + running. To avoid duplicating partial event groups in the relay logs in + this case, we remember the count of events in any partially logged event + group before the reconnect, and then here at connect we set up a counter + to skip the already-logged part of the group. + */ + mi->gtid_reconnect_event_skip_count= mi->events_queued_since_last_gtid; + mi->gtid_event_seen= false; + } + #ifdef ENABLED_DEBUG_SYNC DBUG_EXECUTE_IF("dbug.before_get_running_status_yes", { @@ -3740,8 +3843,19 @@ log space"); // error = 0; err: // print the current replication position - sql_print_information("Slave I/O thread exiting, read up to log '%s', position %s", - IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff)); + if (mi->using_gtid == Master_info::USE_GTID_NO) + sql_print_information("Slave I/O thread exiting, read up to log '%s', " + "position %s", + IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff)); + else + { + String tmp; + mi->gtid_current_pos.to_string(&tmp); + sql_print_information("Slave I/O thread exiting, read up to log '%s', " + "position %s; GTID position %s", + IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff), + tmp.c_ptr_safe()); + } RUN_HOOK(binlog_relay_io, thread_stop, (thd, mi)); thd->reset_query(); thd->reset_db(NULL, 0); @@ -4008,10 +4122,20 @@ pthread_handler_t handle_slave_sql(void *arg) rli->group_master_log_name, llstr(rli->group_master_log_pos,llbuff))); if (global_system_variables.log_warnings) + { + String tmp; + if (mi->using_gtid != Master_info::USE_GTID_NO) + { + tmp.append(STRING_WITH_LEN("; GTID position '")); + rpl_append_gtid_state(&tmp, + mi->using_gtid==Master_info::USE_GTID_CURRENT_POS); + tmp.append(STRING_WITH_LEN("'")); + } sql_print_information("Slave SQL thread initialized, starting replication in \ -log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME, +log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME, llstr(rli->group_master_log_pos,llbuff),rli->group_relay_log_name, - llstr(rli->group_relay_log_pos,llbuff1)); + llstr(rli->group_relay_log_pos,llbuff1), tmp.c_ptr_safe()); + } if (check_temp_dir(rli->slave_patternload_file)) { @@ -4049,8 +4173,8 @@ log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME, mysql_mutex_lock(&rli->data_lock); if (rli->slave_skip_counter) { - strmake(saved_log_name, rli->group_relay_log_name, FN_REFLEN - 1); - strmake(saved_master_log_name, rli->group_master_log_name, FN_REFLEN - 1); + strmake_buf(saved_log_name, rli->group_relay_log_name); + strmake_buf(saved_master_log_name, rli->group_master_log_name); saved_log_pos= rli->group_relay_log_pos; saved_master_log_pos= rli->group_master_log_pos; saved_skip= rli->slave_skip_counter; @@ -4145,16 +4269,35 @@ log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME, sql_print_warning("Slave: %s Error_code: %d", err->get_message_text(), err->get_sql_errno()); } if (udf_error) + { + String tmp; + if (mi->using_gtid != Master_info::USE_GTID_NO) + { + tmp.append(STRING_WITH_LEN("; GTID position '")); + rpl_append_gtid_state(&tmp, false); + tmp.append(STRING_WITH_LEN("'")); + } sql_print_error("Error loading user-defined library, slave SQL " "thread aborted. Install the missing library, and restart the " "slave SQL thread with \"SLAVE START\". We stopped at log '%s' " - "position %s", RPL_LOG_NAME, llstr(rli->group_master_log_pos, - llbuff)); + "position %s%s", RPL_LOG_NAME, llstr(rli->group_master_log_pos, + llbuff), tmp.c_ptr_safe()); + } else + { + String tmp; + if (mi->using_gtid != Master_info::USE_GTID_NO) + { + tmp.append(STRING_WITH_LEN("; GTID position '")); + rpl_append_gtid_state(&tmp, false); + tmp.append(STRING_WITH_LEN("'")); + } sql_print_error("\ Error running query, slave SQL thread aborted. Fix the problem, and restart \ the slave SQL thread with \"SLAVE START\". We stopped at log \ -'%s' position %s", RPL_LOG_NAME, llstr(rli->group_master_log_pos, llbuff)); +'%s' position %s%s", RPL_LOG_NAME, llstr(rli->group_master_log_pos, llbuff), + tmp.c_ptr_safe()); + } } goto err; } @@ -4162,9 +4305,20 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \ } /* Thread stopped. Print the current replication position to the log */ - sql_print_information("Slave SQL thread exiting, replication stopped in log " - "'%s' at position %s", - RPL_LOG_NAME, llstr(rli->group_master_log_pos,llbuff)); + { + String tmp; + if (mi->using_gtid != Master_info::USE_GTID_NO) + { + tmp.append(STRING_WITH_LEN("; GTID position '")); + rpl_append_gtid_state(&tmp, false); + tmp.append(STRING_WITH_LEN("'")); + } + sql_print_information("Slave SQL thread exiting, replication stopped in " + "log '%s' at position %s%s", + RPL_LOG_NAME, + llstr(rli->group_master_log_pos,llbuff), + tmp.c_ptr_safe()); + } err: @@ -4639,6 +4793,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) mysql_mutex_t *log_lock= rli->relay_log.get_log_lock(); ulong s_id; bool unlock_data_lock= TRUE; + bool gtid_skip_enqueue= false; + /* FD_q must have been prepared for the first R_a event inside get_master_version_and_clock() @@ -4826,6 +4982,19 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) mi->rli.relay_log.relay_log_checksum_alg= tmp->checksum_alg; /* + Do not queue any format description event that we receive after a + reconnect where we are skipping over a partial event group received + before the reconnect. + + (If we queued such an event, and it was the first format_description + event after master restart, the slave SQL thread would think that + the partial event group before it in the relay log was from a + previous master crash and should be rolled back). + */ + if (unlikely(mi->gtid_reconnect_event_skip_count && !mi->gtid_event_seen)) + gtid_skip_enqueue= true; + + /* Though this does some conversion to the slave's format, this will preserve the master's binlog format version, and number of event types. */ @@ -4920,18 +5089,113 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) " UNTIL master_gtid_pos %s", str.c_ptr_safe()); mi->abort_slave= true; } + event_pos= glev->log_pos; delete glev; /* - Do not update position for fake Gtid_list event (which has a zero - end_log_pos). + We use fake Gtid_list events to update the old-style position (among + other things). + + Early code created fake Gtid_list events with zero log_pos, those should + not modify old-style position. */ - inc_pos= uint4korr(buf+LOG_POS_OFFSET) ? event_len : 0; + if (event_pos == 0 || event_pos <= mi->master_log_pos) + inc_pos= 0; + else + inc_pos= event_pos - mi->master_log_pos; + } + break; + + case GTID_EVENT: + { + uchar dummy_flag; + + if (mi->using_gtid == Master_info::USE_GTID_NO) + goto default_action; + if (unlikely(!mi->gtid_event_seen)) + { + mi->gtid_event_seen= true; + if (mi->gtid_reconnect_event_skip_count) + { + rpl_gtid gtid; + + /* + If we are reconnecting, and we need to skip a partial event group + already queued to the relay log before the reconnect, then we check + that we actually get the same event group (same GTID) as before, so + we do not end up with half of one group and half another. + + The only way we should be able to receive a different GTID than what + we expect is if the binlog on the master (or more likely the whole + master server) was replaced with a different one, one the same IP + address, _and_ the new master happens to have domains in a different + order so we get the GTID from a different domain first. Still, it is + best to protect against this case. + */ + if (Gtid_log_event::peek(buf, event_len, checksum_alg, + >id.domain_id, >id.server_id, + >id.seq_no, &dummy_flag)) + { + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; + goto err; + } + if (gtid.domain_id != mi->last_queued_gtid.domain_id || + gtid.server_id != mi->last_queued_gtid.server_id || + gtid.seq_no != mi->last_queued_gtid.seq_no) + { + bool first; + error= ER_SLAVE_UNEXPECTED_MASTER_SWITCH; + error_msg.append(STRING_WITH_LEN("Expected: ")); + first= true; + rpl_slave_state_tostring_helper(&error_msg, &mi->last_queued_gtid, + &first); + error_msg.append(STRING_WITH_LEN(", received: ")); + first= true; + rpl_slave_state_tostring_helper(&error_msg, >id, &first); + goto err; + } + } + } + + if (unlikely(mi->gtid_reconnect_event_skip_count)) + { + goto default_action; + } + + /* + We have successfully queued to relay log everything before this GTID, so + in case of reconnect we can start from after any previous GTID. + */ + if (mi->events_queued_since_last_gtid) + { + mi->gtid_current_pos.update(&mi->last_queued_gtid); + mi->events_queued_since_last_gtid= 0; + } + if (Gtid_log_event::peek(buf, event_len, checksum_alg, + &mi->last_queued_gtid.domain_id, + &mi->last_queued_gtid.server_id, + &mi->last_queued_gtid.seq_no, &dummy_flag)) + { + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; + goto err; + } + ++mi->events_queued_since_last_gtid; } break; default: default_action: + if (mi->using_gtid != Master_info::USE_GTID_NO && mi->gtid_event_seen) + { + if (unlikely(mi->gtid_reconnect_event_skip_count)) + { + --mi->gtid_reconnect_event_skip_count; + gtid_skip_enqueue= true; + } + else if (mi->events_queued_since_last_gtid) + ++mi->events_queued_since_last_gtid; + } + inc_pos= event_len; break; } @@ -5016,8 +5280,16 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) } else { - /* write the event to the relay log */ - if (likely(!(rli->relay_log.appendv(buf,event_len,0)))) + /* + Write the event to the relay log, unless we reconnected in the middle + of an event group and now need to skip the initial part of the group that + we already wrote before reconnecting. + */ + if (unlikely(gtid_skip_enqueue)) + { + mi->master_log_pos+= inc_pos; + } + else if (likely(!(rli->relay_log.appendv(buf,event_len,0)))) { mi->master_log_pos+= inc_pos; DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos)); @@ -5755,8 +6027,7 @@ static Log_event* next_event(Relay_log_info* rli) goto err; } rli->event_relay_log_pos = BIN_LOG_HEADER_SIZE; - strmake(rli->event_relay_log_name,rli->linfo.log_file_name, - sizeof(rli->event_relay_log_name)-1); + strmake_buf(rli->event_relay_log_name,rli->linfo.log_file_name); flush_relay_log_info(rli); } |