summaryrefslogtreecommitdiff
path: root/sql/wsrep_mysqld.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/wsrep_mysqld.cc')
-rw-r--r--sql/wsrep_mysqld.cc94
1 files changed, 81 insertions, 13 deletions
diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc
index cfba0ace2cb..df2cbc3f4d3 100644
--- a/sql/wsrep_mysqld.cc
+++ b/sql/wsrep_mysqld.cc
@@ -37,6 +37,8 @@
#include <cstdio>
#include <cstdlib>
#include "log_event.h"
+#include <list>
+#include <algorithm>
wsrep_t *wsrep = NULL;
/*
@@ -131,6 +133,8 @@ mysql_cond_t COND_wsrep_replaying;
mysql_mutex_t LOCK_wsrep_slave_threads;
mysql_mutex_t LOCK_wsrep_desync;
mysql_mutex_t LOCK_wsrep_config_state;
+mysql_mutex_t LOCK_wsrep_kill;
+mysql_cond_t COND_wsrep_kill;
int wsrep_replaying= 0;
ulong wsrep_running_threads = 0; // # of currently running wsrep
@@ -138,6 +142,7 @@ ulong wsrep_running_threads = 0; // # of currently running wsrep
ulong wsrep_running_applier_threads = 0; // # of running applier threads
ulong wsrep_running_rollbacker_threads = 0; // # of running
// # rollbacker threads
+ulong wsrep_running_killer_threads = 0;
ulong my_bind_addr;
#ifdef HAVE_PSI_INTERFACE
@@ -145,11 +150,13 @@ PSI_mutex_key key_LOCK_wsrep_rollback,
key_LOCK_wsrep_replaying, key_LOCK_wsrep_ready, key_LOCK_wsrep_sst,
key_LOCK_wsrep_sst_thread, key_LOCK_wsrep_sst_init,
key_LOCK_wsrep_slave_threads, key_LOCK_wsrep_desync,
- key_LOCK_wsrep_config_state;
+ key_LOCK_wsrep_config_state,
+ key_LOCK_wsrep_kill;
PSI_cond_key key_COND_wsrep_rollback,
key_COND_wsrep_replaying, key_COND_wsrep_ready, key_COND_wsrep_sst,
- key_COND_wsrep_sst_init, key_COND_wsrep_sst_thread;
+ key_COND_wsrep_sst_init, key_COND_wsrep_sst_thread,
+ key_COND_wsrep_kill;
PSI_file_key key_file_wsrep_gra_log;
@@ -164,7 +171,8 @@ static PSI_mutex_info wsrep_mutexes[]=
{ &key_LOCK_wsrep_replaying, "LOCK_wsrep_replaying", PSI_FLAG_GLOBAL},
{ &key_LOCK_wsrep_slave_threads, "LOCK_wsrep_slave_threads", PSI_FLAG_GLOBAL},
{ &key_LOCK_wsrep_desync, "LOCK_wsrep_desync", PSI_FLAG_GLOBAL},
- { &key_LOCK_wsrep_config_state, "LOCK_wsrep_config_state", PSI_FLAG_GLOBAL}
+ { &key_LOCK_wsrep_config_state, "LOCK_wsrep_config_state", PSI_FLAG_GLOBAL},
+ { &key_LOCK_wsrep_kill, "LOCK_wsrep_kill", PSI_FLAG_GLOBAL}
};
static PSI_cond_info wsrep_conds[]=
@@ -174,7 +182,8 @@ static PSI_cond_info wsrep_conds[]=
{ &key_COND_wsrep_sst_init, "COND_wsrep_sst_init", PSI_FLAG_GLOBAL},
{ &key_COND_wsrep_sst_thread, "wsrep_sst_thread", 0},
{ &key_COND_wsrep_rollback, "COND_wsrep_rollback", PSI_FLAG_GLOBAL},
- { &key_COND_wsrep_replaying, "COND_wsrep_replaying", PSI_FLAG_GLOBAL}
+ { &key_COND_wsrep_replaying, "COND_wsrep_replaying", PSI_FLAG_GLOBAL},
+ { &key_COND_wsrep_kill, "COND_wsrep_kill", PSI_FLAG_GLOBAL}
};
static PSI_file_info wsrep_files[]=
@@ -183,14 +192,15 @@ static PSI_file_info wsrep_files[]=
};
PSI_thread_key key_wsrep_sst_joiner, key_wsrep_sst_donor,
- key_wsrep_rollbacker, key_wsrep_applier;
+ key_wsrep_rollbacker, key_wsrep_applier, key_wsrep_killer;
static PSI_thread_info wsrep_threads[]=
{
{&key_wsrep_sst_joiner, "wsrep_sst_joiner_thread", PSI_FLAG_GLOBAL},
{&key_wsrep_sst_donor, "wsrep_sst_donor_thread", PSI_FLAG_GLOBAL},
{&key_wsrep_rollbacker, "wsrep_rollbacker_thread", PSI_FLAG_GLOBAL},
- {&key_wsrep_applier, "wsrep_applier_thread", PSI_FLAG_GLOBAL}
+ {&key_wsrep_applier, "wsrep_applier_thread", PSI_FLAG_GLOBAL},
+ {&key_wsrep_killer, "wsrep_killer_thread", PSI_FLAG_GLOBAL}
};
#endif /* HAVE_PSI_INTERFACE */
@@ -237,6 +247,7 @@ wsp::Config_state *wsrep_config_state;
// if there was no state gap on receiving first view event.
static my_bool wsrep_startup = TRUE;
+std::list< wsrep_kill_t > wsrep_kill_list;
static void wsrep_log_cb(wsrep_log_level_t level, const char *msg) {
switch (level) {
@@ -829,6 +840,8 @@ void wsrep_thr_init()
mysql_mutex_init(key_LOCK_wsrep_slave_threads, &LOCK_wsrep_slave_threads, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_LOCK_wsrep_desync, &LOCK_wsrep_desync, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_LOCK_wsrep_config_state, &LOCK_wsrep_config_state, MY_MUTEX_INIT_FAST);
+ mysql_mutex_init(key_LOCK_wsrep_kill, &LOCK_wsrep_kill, MY_MUTEX_INIT_FAST);
+ mysql_cond_init(key_COND_wsrep_kill, &COND_wsrep_kill, NULL);
DBUG_VOID_RETURN;
}
@@ -865,6 +878,7 @@ void wsrep_init_startup (bool first)
if (!wsrep_start_replication()) unireg_abort(1);
wsrep_create_rollbacker();
+ wsrep_create_killer();
wsrep_create_appliers(1);
if (first && !wsrep_sst_wait()) unireg_abort(1);// wait until SST is completed
@@ -906,6 +920,8 @@ void wsrep_thr_deinit()
mysql_mutex_destroy(&LOCK_wsrep_slave_threads);
mysql_mutex_destroy(&LOCK_wsrep_desync);
mysql_mutex_destroy(&LOCK_wsrep_config_state);
+ mysql_mutex_destroy(&LOCK_wsrep_kill);
+ mysql_cond_destroy(&COND_wsrep_kill);
delete wsrep_config_state;
wsrep_config_state= 0; // Safety
}
@@ -1657,7 +1673,7 @@ static int wsrep_TOI_begin(THD *thd, const char *db_, const char *table_,
if (wsrep_can_run_in_toi(thd, db_, table_, table_list) == false)
{
- WSREP_DEBUG("No TOI for %s", WSREP_QUERY(thd));
+ WSREP_DEBUG("No TOI for %s", wsrep_thd_query(thd));
return 1;
}
@@ -2147,9 +2163,13 @@ pthread_handler_t start_wsrep_THD(void *arg)
case WSREP_ROLLBACKER_THREAD:
wsrep_running_rollbacker_threads++;
break;
+ case WSREP_KILLER_THREAD:
+ wsrep_running_killer_threads++;
+ thd->wsrep_killer= true;
+ break;
default:
WSREP_ERROR("Incorrect wsrep thread type: %d", args->thread_type);
- break;
+ assert(0);
}
mysql_cond_broadcast(&COND_thread_count);
@@ -2172,9 +2192,13 @@ pthread_handler_t start_wsrep_THD(void *arg)
DBUG_ASSERT(wsrep_running_rollbacker_threads > 0);
wsrep_running_rollbacker_threads--;
break;
+ case WSREP_KILLER_THREAD:
+ DBUG_ASSERT(wsrep_running_killer_threads > 0);
+ wsrep_running_killer_threads--;
+ break;
default:
WSREP_ERROR("Incorrect wsrep thread type: %d", args->thread_type);
- break;
+ assert(0);
}
my_free(args);
@@ -2416,7 +2440,11 @@ void wsrep_close_client_connections(my_bool wait_to_end, THD *except_caller_thd)
}
DBUG_PRINT("quit",("Waiting for threads to die (count=%u)",thread_count));
- WSREP_DEBUG("waiting for client connections to close: %u", thread_count);
+ WSREP_DEBUG("Waiting for client connections to close: %u", thread_count);
+ WSREP_DEBUG("Waiting for rollbacker threads to close: %lu", wsrep_running_rollbacker_threads);
+ WSREP_DEBUG("Waiting for applier threads to close: %lu", wsrep_running_applier_threads);
+ WSREP_DEBUG("Waiting for killer threads to close: %lu", wsrep_running_killer_threads);
+ WSREP_DEBUG("Waiting for wsrep threads to close: %lu", wsrep_running_threads);
while (wait_to_end && have_client_connections())
{
@@ -2450,7 +2478,7 @@ void wsrep_close_threads(THD *thd)
DBUG_PRINT("quit",("Informing thread %lld that it's time to die",
(longlong) tmp->thread_id));
/* We skip slave threads & scheduler on this first loop through. */
- if (tmp->wsrep_applier && tmp != thd)
+ if ((tmp->wsrep_applier || tmp->wsrep_killer) && tmp != thd)
{
WSREP_DEBUG("closing wsrep thread %lld", (longlong) tmp->thread_id);
wsrep_close_thread (tmp);
@@ -2464,7 +2492,7 @@ void wsrep_wait_appliers_close(THD *thd)
{
/* Wait for wsrep appliers to gracefully exit */
mysql_mutex_lock(&LOCK_thread_count);
- while (wsrep_running_threads > 1)
+ while (wsrep_running_threads > 2)
// 1 is for rollbacker thread which needs to be killed explicitly.
// This gotta be fixed in a more elegant manner if we gonna have arbitrary
// number of non-applier wsrep threads.
@@ -2738,9 +2766,12 @@ extern "C" void wsrep_thd_set_wsrep_last_query_id(THD *thd, query_id_t id)
extern "C" void wsrep_thd_awake(THD *thd, my_bool signal)
{
+ mysql_mutex_assert_owner(&thd->LOCK_thd_data);
+ mysql_mutex_assert_owner(&thd->LOCK_thd_kill);
+
if (signal)
{
- thd->awake(KILL_QUERY);
+ thd->awake_no_mutex(KILL_QUERY);
}
else
{
@@ -2748,6 +2779,9 @@ extern "C" void wsrep_thd_awake(THD *thd, my_bool signal)
mysql_cond_broadcast(&COND_wsrep_replaying);
mysql_mutex_unlock(&LOCK_wsrep_replaying);
}
+
+ mysql_mutex_unlock(&thd->LOCK_thd_kill);
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
}
@@ -3001,3 +3035,37 @@ bool wsrep_node_is_synced()
{
return (WSREP_ON) ? (wsrep_config_state->get_status() == 4) : false;
}
+
+bool wsrep_enqueue_background_kill(wsrep_kill_t item)
+{
+ std::list< wsrep_kill_t >::iterator it;
+ bool inserted= false;
+
+ mysql_mutex_lock(&LOCK_wsrep_kill);
+
+ for (it = wsrep_kill_list.begin(); it != wsrep_kill_list.end(); it++)
+ {
+ if ((*it).victim_thd_id == item.victim_thd_id)
+ break;
+ }
+
+ if(it != wsrep_kill_list.end())
+ {
+ WSREP_DEBUG("Thread: %lu already on kill list", item.victim_thd_id);
+ }
+ else
+ {
+ wsrep_kill_list.push_back(item);
+ mysql_cond_signal(&COND_wsrep_kill);
+ inserted= true;
+ }
+
+ mysql_mutex_unlock(&LOCK_wsrep_kill);
+ return inserted;
+}
+
+void wsrep_LOCK(THD* thd)
+{
+ mysql_mutex_lock(&thd->LOCK_thd_data);
+ mysql_mutex_lock(&thd->LOCK_thd_kill);
+}