summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorBrandon Nesterenko <brandon.nesterenko@mariadb.com>2021-10-20 20:13:45 -0600
committerBrandon Nesterenko <brandon.nesterenko@mariadb.com>2021-11-01 09:51:30 -0600
commite9c3de0502a14b720ead3cf053c3d3f4363b65c5 (patch)
tree237f202a5e949a8a162f5f607f631f92831ae52e /sql
parent36f8cca6f31941ca6bf5f45cbfdbc9ea676707d9 (diff)
downloadmariadb-git-10.4-MDEV-11853.tar.gz
MDEV-11853: semisync thread can be killed after sync binlog but before ACK in the sync state10.4-MDEV-11853
Problem: ======== If a primary is shutdown during an active semi-sync connection during the period when the primary is awaiting an ACK, the primary hard kills the active communication thread and does not ensure the transaction was received by a replica. This can lead to an inconsistent replication state. Solution: ======== During shutdown, the primary should wait for an ACK or timeout before hard killing a thread which is awaiting a communication. We extend the `SHUTDOWN WAIT FOR SLAVES` logic to identify and ignore any threads waiting for a semi-sync ACK in phase 1. Then, before stopping the ack receiver thread, the shutdown is delayed until all waiting semi-sync connections receive an ACK or time out. The connections are then killed in phase 2. Reviewed By: ============
Diffstat (limited to 'sql')
-rw-r--r--sql/mysqld.cc113
-rw-r--r--sql/semisync_master.cc60
-rw-r--r--sql/semisync_master.h13
-rw-r--r--sql/semisync_master_ack_receiver.cc1
-rw-r--r--sql/semisync_slave.cc48
-rw-r--r--sql/semisync_slave.h2
-rw-r--r--sql/slave.cc13
-rw-r--r--sql/sql_class.cc3
-rw-r--r--sql/sql_class.h14
9 files changed, 218 insertions, 49 deletions
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index ec77366129a..fde371ec991 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -1522,10 +1522,20 @@ static void kill_thread(THD *thd)
/**
First shutdown everything but slave threads and binlog dump connections
*/
-static my_bool kill_thread_phase_1(THD *thd, void *)
+static my_bool kill_thread_phase_1(THD *thd, DYNAMIC_ARRAY *phase_2_kill_threads)
{
DBUG_PRINT("quit", ("Informing thread %ld that it's time to die",
(ulong) thd->thread_id));
+
+ if (thd->is_awaiting_semisync_ack())
+ {
+ insert_dynamic(phase_2_kill_threads, (const void *) &(thd->thread_id));
+ DBUG_PRINT("quit",
+ ("Thread %ld kill delayed to phase 2", (ulong) thd->thread_id));
+
+ return 0;
+ }
+
if (thd->slave_thread || thd->is_binlog_dump_thread())
return 0;
@@ -1542,30 +1552,64 @@ static my_bool kill_thread_phase_1(THD *thd, void *)
/**
Last shutdown binlog dump connections
*/
-static my_bool kill_thread_phase_2(THD *thd, void *)
+static my_bool kill_thread_phase_2(THD *thd, DYNAMIC_ARRAY *phase_2_kill_threads)
{
if (shutdown_wait_for_slaves)
{
- thd->set_killed(KILL_SERVER);
- }
- else
- {
- thd->set_killed(KILL_SERVER_HARD);
- MYSQL_CALLBACK(thread_scheduler, post_kill_notification, (thd));
+ uint i;
+ long long unsigned int test_tid;
+ bool hard_kill= FALSE;
+
+ for(i= 0; i < phase_2_kill_threads->elements; i++)
+ {
+ get_dynamic(phase_2_kill_threads, (void *)(&test_tid), i);
+ if (test_tid == thd->thread_id)
+ {
+ hard_kill= TRUE;
+ break;
+ }
+ }
+
+ if (!hard_kill)
+ {
+ thd->set_killed(KILL_SERVER);
+ goto end;
+ }
}
+
+ thd->set_killed(KILL_SERVER_HARD);
+ MYSQL_CALLBACK(thread_scheduler, post_kill_notification, (thd));
+
+end:
kill_thread(thd);
return 0;
}
/* associated with the kill thread phase 1 */
-static my_bool warn_threads_active_after_phase_1(THD *thd, void *)
+static my_bool warn_threads_active_after_phase_1(THD *thd, DYNAMIC_ARRAY *phase_2_kill_threads)
{
- if (!thd->is_binlog_dump_thread() && thd->vio_ok())
- sql_print_warning("%s: Thread %llu (user : '%s') did not exit\n", my_progname,
- (ulonglong) thd->thread_id,
- (thd->main_security_ctx.user ?
- thd->main_security_ctx.user : ""));
+ uint i;
+ long long unsigned int test_tid;
+
+ if (thd->is_binlog_dump_thread() || !thd->vio_ok())
+ goto end;
+
+ /* If the thread is identified for phase 2 kill */
+ for(i= 0; i < phase_2_kill_threads->elements; i++)
+ {
+ get_dynamic(phase_2_kill_threads, (void *)(&test_tid), i);
+ if (test_tid == thd->thread_id)
+ goto end;
+ }
+
+ /* Unknown reason for this thread being alive */
+ sql_print_warning("%s: Thread %llu (user : '%s') did not exit\n", my_progname,
+ (ulonglong) thd->thread_id,
+ (thd->main_security_ctx.user ?
+ thd->main_security_ctx.user : ""));
+
+end:
return 0;
}
@@ -1727,7 +1771,28 @@ static void close_connections(void)
This will give the threads some time to gracefully abort their
statements and inform their clients that the server is about to die.
*/
- server_threads.iterate(kill_thread_phase_1);
+ DYNAMIC_ARRAY phase_2_hard_kill_threads;
+ my_init_dynamic_array(&phase_2_hard_kill_threads, sizeof(long long unsigned int), 4, 4, MYF(0));
+ DBUG_EXECUTE_IF("mysqld_delay_kill_threads_phase_1", my_sleep(200000););
+ server_threads.iterate(kill_thread_phase_1, &phase_2_hard_kill_threads);
+
+ /*
+ If we are waiting on any ACKs, delay killing the thread until either an ACK
+ is received or the timeout is hit.
+
+ Allow at max the number of sessions to await a timeout; however, if all
+ ACKs have been received in less iterations, then quit early
+ */
+ if (shutdown_wait_for_slaves)
+ {
+ int delay_attempts= rpl_semi_sync_master_wait_sessions;
+ if (delay_attempts)
+ sql_print_information("Delaying shutdown to await semi-sync ACK");
+
+ while (rpl_semi_sync_master_wait_sessions && (delay_attempts-- > 0))
+ repl_semisync_master.await_slave_reply();
+ }
+ DBUG_EXECUTE_IF("delay_shutdown_phase_2_after_semisync_wait", my_sleep(500000););
Events::deinit();
slave_prepare_for_shutdown();
@@ -1750,11 +1815,15 @@ static void close_connections(void)
*/
DBUG_PRINT("info", ("THD_count: %u", THD_count::value()));
- for (int i= 0; (THD_count::value() - binlog_dump_thread_count) && i < 1000; i++)
+ for (int i= 0; (THD_count::value() - binlog_dump_thread_count -
+ phase_2_hard_kill_threads.elements) &&
+ i < 1000;
+ i++)
my_sleep(20000);
if (global_system_variables.log_warnings)
- server_threads.iterate(warn_threads_active_after_phase_1);
+ server_threads.iterate(warn_threads_active_after_phase_1,
+ &phase_2_hard_kill_threads);
#ifdef WITH_WSREP
if (wsrep_inited == 1)
@@ -1763,13 +1832,17 @@ static void close_connections(void)
}
#endif
/* All threads has now been aborted */
- DBUG_PRINT("quit", ("Waiting for threads to die (count=%u)", THD_count::value()));
+ DBUG_PRINT("quit", ("Waiting for threads to die (count=%u)",
+ THD_count::value() - binlog_dump_thread_count -
+ phase_2_hard_kill_threads.elements));
- while (THD_count::value() - binlog_dump_thread_count)
+ while (THD_count::value() - binlog_dump_thread_count -
+ phase_2_hard_kill_threads.elements)
my_sleep(1000);
/* Kill phase 2 */
- server_threads.iterate(kill_thread_phase_2);
+ server_threads.iterate(kill_thread_phase_2, &phase_2_hard_kill_threads);
+ delete_dynamic(&phase_2_hard_kill_threads);
for (uint64 i= 0; THD_count::value(); i++)
{
/*
diff --git a/sql/semisync_master.cc b/sql/semisync_master.cc
index b239a9776a7..8c07775d254 100644
--- a/sql/semisync_master.cc
+++ b/sql/semisync_master.cc
@@ -463,6 +463,28 @@ void Repl_semi_sync_master::cleanup()
delete m_active_tranxs;
}
+void Repl_semi_sync_master::create_timeout(struct timespec *out,
+ struct timespec *start_arg)
+{
+ struct timespec *start_ts;
+ struct timespec now_ts;
+ if (!start_arg)
+ {
+ set_timespec(now_ts, 0);
+ start_ts= &now_ts;
+ }
+ else
+ {
+ start_ts= start_arg;
+ }
+
+ long diff_secs= (long) (m_wait_timeout / TIME_THOUSAND);
+ long diff_nsecs= (long) ((m_wait_timeout % TIME_THOUSAND) * TIME_MILLION);
+ long nsecs= start_ts->tv_nsec + diff_nsecs;
+ out->tv_sec= start_ts->tv_sec + diff_secs + nsecs / TIME_BILLION;
+ out->tv_nsec= nsecs % TIME_BILLION;
+}
+
void Repl_semi_sync_master::lock()
{
mysql_mutex_lock(&LOCK_binlog);
@@ -862,13 +884,6 @@ int Repl_semi_sync_master::commit_trx(const char* trx_wait_binlog_name,
m_wait_file_name, (ulong)m_wait_file_pos));
}
- /* Calcuate the waiting period. */
- long diff_secs = (long) (m_wait_timeout / TIME_THOUSAND);
- long diff_nsecs = (long) ((m_wait_timeout % TIME_THOUSAND) * TIME_MILLION);
- long nsecs = start_ts.tv_nsec + diff_nsecs;
- abstime.tv_sec = start_ts.tv_sec + diff_secs + nsecs/TIME_BILLION;
- abstime.tv_nsec = nsecs % TIME_BILLION;
-
/* In semi-synchronous replication, we wait until the binlog-dump
* thread has received the reply on the relevant binlog segment from the
* replication slave.
@@ -876,16 +891,28 @@ int Repl_semi_sync_master::commit_trx(const char* trx_wait_binlog_name,
* Let us suspend this thread to wait on the condition;
* when replication has progressed far enough, we will release
* these waiting threads.
+ *
+ * Additionally, the thread state and system variable which represent
+ * this suspended thread need to be synchronized to ensure this thread is
+ * not immediately killed while awaiting the ACK if a shutdown is issued.
*/
+ mysql_mutex_lock(&thd->LOCK_thd_data);
rpl_semi_sync_master_wait_sessions++;
+ thd->set_awaiting_semisync_ack(TRUE);
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
DBUG_PRINT("semisync", ("%s: wait %lu ms for binlog sent (%s, %lu)",
"Repl_semi_sync_master::commit_trx",
m_wait_timeout,
m_wait_file_name, (ulong)m_wait_file_pos));
+ create_timeout(&abstime, &start_ts);
wait_result = cond_timewait(&abstime);
+
+ mysql_mutex_lock(&thd->LOCK_thd_data);
rpl_semi_sync_master_wait_sessions--;
+ thd->set_awaiting_semisync_ack(FALSE);
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
if (wait_result != 0)
{
@@ -1319,6 +1346,25 @@ void Repl_semi_sync_master::set_export_stats()
unlock();
}
+void Repl_semi_sync_master::await_slave_reply()
+{
+ struct timespec abstime;
+
+ DBUG_ENTER("Repl_semi_sync_master::::await_slave_reply");
+ lock();
+
+ /* Just return if there is nothing to wait for */
+ if (!rpl_semi_sync_master_wait_sessions)
+ goto end;
+
+ create_timeout(&abstime, NULL);
+ cond_timewait(&abstime);
+
+end:
+ unlock();
+ DBUG_VOID_RETURN;
+}
+
/* Get the waiting time given the wait's staring time.
*
* Return:
diff --git a/sql/semisync_master.h b/sql/semisync_master.h
index 74f6c24c8ea..3f0af1bfe17 100644
--- a/sql/semisync_master.h
+++ b/sql/semisync_master.h
@@ -472,6 +472,19 @@ class Repl_semi_sync_master
m_wait_timeout = wait_timeout;
}
+ /*
+ Calculates a timeout that is m_wait_timeout after start_arg and saves it
+ in out. If start_arg is NULL, the timeout is m_wait_timeout after the
+ current system time.
+ */
+ void create_timeout(struct timespec *out, struct timespec *start_arg);
+
+ /*
+ Blocks the calling thread until the ack_receiver either receives an ACK
+ or times out (from rpl_semi_sync_master_timeout)
+ */
+ void await_slave_reply();
+
/*set the ACK point, after binlog sync or after transaction commit*/
void set_wait_point(unsigned long ack_point)
{
diff --git a/sql/semisync_master_ack_receiver.cc b/sql/semisync_master_ack_receiver.cc
index b24d6452480..691c5a70e04 100644
--- a/sql/semisync_master_ack_receiver.cc
+++ b/sql/semisync_master_ack_receiver.cc
@@ -257,6 +257,7 @@ void Ack_receiver::run()
continue;
}
+
set_stage_info(stage_reading_semi_sync_ack);
Slave_ilist_iterator it(m_slaves);
while ((slave= it++))
diff --git a/sql/semisync_slave.cc b/sql/semisync_slave.cc
index df8baf045ac..b09571d6dc7 100644
--- a/sql/semisync_slave.cc
+++ b/sql/semisync_slave.cc
@@ -114,11 +114,19 @@ int Repl_semi_sync_slave::slave_start(Master_info *mi)
int Repl_semi_sync_slave::slave_stop(Master_info *mi)
{
+ int ret= 0;
+ if (get_slave_enabled())
+ {
+ if ((ret= kill_connection(mi->mysql)))
+ {
+ sql_print_warning("Failed to kill the active semi-sync connection");
+ }
+ }
+
if (rpl_semi_sync_slave_status)
rpl_semi_sync_slave_status= 0;
- if (get_slave_enabled())
- kill_connection(mi->mysql);
- return 0;
+
+ return ret;
}
int Repl_semi_sync_slave::reset_slave(Master_info *mi)
@@ -126,31 +134,35 @@ int Repl_semi_sync_slave::reset_slave(Master_info *mi)
return 0;
}
-void Repl_semi_sync_slave::kill_connection(MYSQL *mysql)
+int Repl_semi_sync_slave::kill_connection(MYSQL *mysql)
{
- if (!mysql)
- return;
-
+ bool ret= 0;
char kill_buffer[30];
MYSQL *kill_mysql = NULL;
+ size_t kill_buffer_length;
+
+ if (!mysql)
+ goto end;
+
kill_mysql = mysql_init(kill_mysql);
mysql_options(kill_mysql, MYSQL_OPT_CONNECT_TIMEOUT, &m_kill_conn_timeout);
mysql_options(kill_mysql, MYSQL_OPT_READ_TIMEOUT, &m_kill_conn_timeout);
mysql_options(kill_mysql, MYSQL_OPT_WRITE_TIMEOUT, &m_kill_conn_timeout);
- bool ret= (!mysql_real_connect(kill_mysql, mysql->host,
+ ret= (!mysql_real_connect(kill_mysql, mysql->host,
mysql->user, mysql->passwd,0, mysql->port, mysql->unix_socket, 0));
- if (DBUG_EVALUATE_IF("semisync_slave_failed_kill", 1, 0) || ret)
- {
- sql_print_information("cannot connect to master to kill slave io_thread's "
- "connection");
- mysql_close(kill_mysql);
- return;
- }
- size_t kill_buffer_length = my_snprintf(kill_buffer, 30, "KILL %lu",
- mysql->thread_id);
- mysql_real_query(kill_mysql, kill_buffer, (ulong)kill_buffer_length);
+ if (DBUG_EVALUATE_IF("semisync_slave_failed_kill", (ret= 1), 0) || ret)
+ goto end;
+
+ DBUG_EXECUTE_IF("slave_delay_killing_semisync_connection", my_sleep(400000););
+
+ kill_buffer_length=
+ my_snprintf(kill_buffer, 30, "KILL %lu", mysql->thread_id);
+ ret= mysql_real_query(kill_mysql, kill_buffer, (ulong)kill_buffer_length);
+
+end:
mysql_close(kill_mysql);
+ return ret;
}
int Repl_semi_sync_slave::request_transmit(Master_info *mi)
diff --git a/sql/semisync_slave.h b/sql/semisync_slave.h
index d65262f151d..85f515caaf8 100644
--- a/sql/semisync_slave.h
+++ b/sql/semisync_slave.h
@@ -90,7 +90,7 @@ public:
int slave_start(Master_info *mi);
int slave_stop(Master_info *mi);
int request_transmit(Master_info*);
- void kill_connection(MYSQL *mysql);
+ int kill_connection(MYSQL *mysql);
int reset_slave(Master_info *mi);
private:
diff --git a/sql/slave.cc b/sql/slave.cc
index 31bd9372a14..e712e8a7010 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -4520,6 +4520,7 @@ pthread_handler_t handle_slave_io(void *arg)
uint retry_count;
bool suppress_warnings;
int ret;
+ int err_stopping_semisync;
rpl_io_thread_info io_info;
#ifndef DBUG_OFF
mi->dbug_do_disconnect= false;
@@ -4848,6 +4849,7 @@ Stopping slave I/O thread due to out-of-memory error from master");
not cause the slave IO thread to stop, and the error messages are
already reported.
*/
+ DBUG_EXECUTE_IF("simulate_delay_semisync_slave_reply", my_sleep(800000););
(void)repl_semisync_slave.slave_reply(mi);
}
@@ -4919,7 +4921,7 @@ err:
tmp.c_ptr_safe());
sql_print_information("master was %s:%d", mi->host, mi->port);
}
- repl_semisync_slave.slave_stop(mi);
+ err_stopping_semisync= repl_semisync_slave.slave_stop(mi);
thd->reset_query();
thd->reset_db(&null_clex_str);
if (mysql)
@@ -4935,7 +4937,14 @@ err:
#ifdef SIGNAL_WITH_VIO_CLOSE
thd->clear_active_vio();
#endif
- mysql_close(mysql);
+ /*
+ Don't close the connection if it is semi-sync and failed to close
+ gracefully in `repl_semisync_slave.slave_stop`, e.g. if the master is
+ shutting down and force killed the kill_mysql connection before issuing
+ `KILL {mysql->thread_id}`. Let it time out instead.
+ */
+ if (!err_stopping_semisync)
+ mysql_close(mysql);
mi->mysql=0;
}
write_ignored_events_info_to_relay_log(thd, mi);
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index 28bf77c94e8..f0ee3f39ba5 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -640,7 +640,8 @@ THD::THD(my_thread_id id, bool is_wsrep_applier)
#ifdef HAVE_REPLICATION
,
current_linfo(0),
- slave_info(0)
+ slave_info(0),
+ awaiting_semisync_ack(0)
#endif
#ifdef WITH_WSREP
,
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 7570211f586..a65e21f80c1 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -4826,12 +4826,26 @@ public:
LOG_INFO *current_linfo;
Slave_info *slave_info;
+ /*
+ Indicates if this thread is suspunded due to awaiting an ACK from a
+ replica. True if suspended, false otherwise.
+ */
+ bool awaiting_semisync_ack;
+
void set_current_linfo(LOG_INFO *linfo);
void reset_current_linfo() { set_current_linfo(0); }
+ void set_awaiting_semisync_ack(bool status) { awaiting_semisync_ack= status; }
int register_slave(uchar *packet, size_t packet_length);
void unregister_slave();
bool is_binlog_dump_thread();
+ bool is_awaiting_semisync_ack()
+ {
+ mysql_mutex_lock(&LOCK_thd_data);
+ bool res= awaiting_semisync_ack;
+ mysql_mutex_unlock(&LOCK_thd_data);
+ return res;
+ }
#endif
inline ulong wsrep_binlog_format() const