summaryrefslogtreecommitdiff
path: root/sql/wsrep_mysqld.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/wsrep_mysqld.cc')
-rw-r--r--sql/wsrep_mysqld.cc102
1 files changed, 89 insertions, 13 deletions
diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc
index ba6c2d24f77..54dda0a1caf 100644
--- a/sql/wsrep_mysqld.cc
+++ b/sql/wsrep_mysqld.cc
@@ -38,7 +38,8 @@
#include <cstdlib>
#include "log_event.h"
#include "sql_plugin.h" /* wsrep_plugins_pre_init() */
-#include <vector>
+#include <list>
+#include <algorithm>
wsrep_t *wsrep = NULL;
/*
@@ -133,6 +134,8 @@ mysql_cond_t COND_wsrep_replaying;
mysql_mutex_t LOCK_wsrep_slave_threads;
mysql_mutex_t LOCK_wsrep_desync;
mysql_mutex_t LOCK_wsrep_config_state;
+mysql_mutex_t LOCK_wsrep_kill;
+mysql_cond_t COND_wsrep_kill;
int wsrep_replaying= 0;
ulong wsrep_running_threads = 0; // # of currently running wsrep
@@ -140,6 +143,7 @@ ulong wsrep_running_threads = 0; // # of currently running wsrep
ulong wsrep_running_applier_threads = 0; // # of running applier threads
ulong wsrep_running_rollbacker_threads = 0; // # of running
// # rollbacker threads
+ulong wsrep_running_killer_threads = 0;
ulong my_bind_addr;
#ifdef HAVE_PSI_INTERFACE
@@ -147,11 +151,13 @@ PSI_mutex_key key_LOCK_wsrep_rollback,
key_LOCK_wsrep_replaying, key_LOCK_wsrep_ready, key_LOCK_wsrep_sst,
key_LOCK_wsrep_sst_thread, key_LOCK_wsrep_sst_init,
key_LOCK_wsrep_slave_threads, key_LOCK_wsrep_desync,
- key_LOCK_wsrep_config_state;
+ key_LOCK_wsrep_config_state,
+ key_LOCK_wsrep_kill;
PSI_cond_key key_COND_wsrep_rollback,
key_COND_wsrep_replaying, key_COND_wsrep_ready, key_COND_wsrep_sst,
- key_COND_wsrep_sst_init, key_COND_wsrep_sst_thread;
+ key_COND_wsrep_sst_init, key_COND_wsrep_sst_thread,
+ key_COND_wsrep_kill;
PSI_file_key key_file_wsrep_gra_log;
@@ -166,7 +172,8 @@ static PSI_mutex_info wsrep_mutexes[]=
{ &key_LOCK_wsrep_replaying, "LOCK_wsrep_replaying", PSI_FLAG_GLOBAL},
{ &key_LOCK_wsrep_slave_threads, "LOCK_wsrep_slave_threads", PSI_FLAG_GLOBAL},
{ &key_LOCK_wsrep_desync, "LOCK_wsrep_desync", PSI_FLAG_GLOBAL},
- { &key_LOCK_wsrep_config_state, "LOCK_wsrep_config_state", PSI_FLAG_GLOBAL}
+ { &key_LOCK_wsrep_config_state, "LOCK_wsrep_config_state", PSI_FLAG_GLOBAL},
+ { &key_LOCK_wsrep_kill, "LOCK_wsrep_kill", PSI_FLAG_GLOBAL}
};
static PSI_cond_info wsrep_conds[]=
@@ -176,7 +183,8 @@ static PSI_cond_info wsrep_conds[]=
{ &key_COND_wsrep_sst_init, "COND_wsrep_sst_init", PSI_FLAG_GLOBAL},
{ &key_COND_wsrep_sst_thread, "wsrep_sst_thread", 0},
{ &key_COND_wsrep_rollback, "COND_wsrep_rollback", PSI_FLAG_GLOBAL},
- { &key_COND_wsrep_replaying, "COND_wsrep_replaying", PSI_FLAG_GLOBAL}
+ { &key_COND_wsrep_replaying, "COND_wsrep_replaying", PSI_FLAG_GLOBAL},
+ { &key_COND_wsrep_kill, "COND_wsrep_kill", PSI_FLAG_GLOBAL}
};
static PSI_file_info wsrep_files[]=
@@ -185,14 +193,15 @@ static PSI_file_info wsrep_files[]=
};
PSI_thread_key key_wsrep_sst_joiner, key_wsrep_sst_donor,
- key_wsrep_rollbacker, key_wsrep_applier;
+ key_wsrep_rollbacker, key_wsrep_applier, key_wsrep_killer;
static PSI_thread_info wsrep_threads[]=
{
{&key_wsrep_sst_joiner, "wsrep_sst_joiner_thread", PSI_FLAG_GLOBAL},
{&key_wsrep_sst_donor, "wsrep_sst_donor_thread", PSI_FLAG_GLOBAL},
{&key_wsrep_rollbacker, "wsrep_rollbacker_thread", PSI_FLAG_GLOBAL},
- {&key_wsrep_applier, "wsrep_applier_thread", PSI_FLAG_GLOBAL}
+ {&key_wsrep_applier, "wsrep_applier_thread", PSI_FLAG_GLOBAL},
+ {&key_wsrep_killer, "wsrep_killer_thread", PSI_FLAG_GLOBAL}
};
#endif /* HAVE_PSI_INTERFACE */
@@ -239,6 +248,7 @@ wsp::Config_state *wsrep_config_state;
// if there was no state gap on receiving first view event.
static my_bool wsrep_startup = TRUE;
+std::list< wsrep_kill_t* > wsrep_kill_list;
static void wsrep_log_cb(wsrep_log_level_t level, const char *msg) {
switch (level) {
@@ -820,6 +830,8 @@ void wsrep_thr_init()
mysql_mutex_init(key_LOCK_wsrep_slave_threads, &LOCK_wsrep_slave_threads, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_LOCK_wsrep_desync, &LOCK_wsrep_desync, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_LOCK_wsrep_config_state, &LOCK_wsrep_config_state, MY_MUTEX_INIT_FAST);
+ mysql_mutex_init(key_LOCK_wsrep_kill, &LOCK_wsrep_kill, MY_MUTEX_INIT_FAST);
+ mysql_cond_init(key_COND_wsrep_kill, &COND_wsrep_kill, NULL);
DBUG_VOID_RETURN;
}
@@ -856,6 +868,7 @@ void wsrep_init_startup (bool first)
if (!wsrep_start_replication()) unireg_abort(1);
wsrep_create_rollbacker();
+ wsrep_create_killer();
wsrep_create_appliers(1);
if (first && !wsrep_sst_wait()) unireg_abort(1);// wait until SST is completed
@@ -897,6 +910,8 @@ void wsrep_thr_deinit()
mysql_mutex_destroy(&LOCK_wsrep_slave_threads);
mysql_mutex_destroy(&LOCK_wsrep_desync);
mysql_mutex_destroy(&LOCK_wsrep_config_state);
+ mysql_mutex_destroy(&LOCK_wsrep_kill);
+ mysql_cond_destroy(&COND_wsrep_kill);
delete wsrep_config_state;
wsrep_config_state= 0; // Safety
}
@@ -1648,7 +1663,7 @@ static int wsrep_TOI_begin(THD *thd, char *db_, char *table_,
if (wsrep_can_run_in_toi(thd, db_, table_, table_list) == false)
{
- WSREP_DEBUG("No TOI for %s", WSREP_QUERY(thd));
+ WSREP_DEBUG("No TOI for %s", wsrep_thd_query(thd));
return 1;
}
@@ -2144,9 +2159,13 @@ pthread_handler_t start_wsrep_THD(void *arg)
case WSREP_ROLLBACKER_THREAD:
wsrep_running_rollbacker_threads++;
break;
+ case WSREP_KILLER_THREAD:
+ wsrep_running_killer_threads++;
+ thd->wsrep_killer= true;
+ break;
default:
WSREP_ERROR("Incorrect wsrep thread type: %d", args->thread_type);
- break;
+ assert(0);
}
mysql_cond_broadcast(&COND_thread_count);
@@ -2169,9 +2188,13 @@ pthread_handler_t start_wsrep_THD(void *arg)
DBUG_ASSERT(wsrep_running_rollbacker_threads > 0);
wsrep_running_rollbacker_threads--;
break;
+ case WSREP_KILLER_THREAD:
+ DBUG_ASSERT(wsrep_running_killer_threads > 0);
+ wsrep_running_killer_threads--;
+ break;
default:
WSREP_ERROR("Incorrect wsrep thread type: %d", args->thread_type);
- break;
+ assert(0);
}
my_free(args);
@@ -2418,7 +2441,11 @@ void wsrep_close_client_connections(my_bool wait_to_end, THD *except_caller_thd)
}
DBUG_PRINT("quit",("Waiting for threads to die (count=%u)",thread_count));
- WSREP_DEBUG("waiting for client connections to close: %u", thread_count);
+ WSREP_DEBUG("Waiting for client connections to close: %u", thread_count);
+ WSREP_DEBUG("Waiting for rollbacker threads to close: %lu", wsrep_running_rollbacker_threads);
+ WSREP_DEBUG("Waiting for applier threads to close: %lu", wsrep_running_applier_threads);
+ WSREP_DEBUG("Waiting for killer threads to close: %lu", wsrep_running_killer_threads);
+ WSREP_DEBUG("Waiting for wsrep threads to close: %lu", wsrep_running_threads);
while (wait_to_end && have_client_connections())
{
@@ -2452,7 +2479,7 @@ void wsrep_close_threads(THD *thd)
DBUG_PRINT("quit",("Informing thread %lld that it's time to die",
(longlong) tmp->thread_id));
/* We skip slave threads & scheduler on this first loop through. */
- if (tmp->wsrep_applier && tmp != thd)
+ if ((tmp->wsrep_applier || tmp->wsrep_killer) && tmp != thd)
{
WSREP_DEBUG("closing wsrep thread %lld", (longlong) tmp->thread_id);
wsrep_close_thread (tmp);
@@ -2466,7 +2493,7 @@ void wsrep_wait_appliers_close(THD *thd)
{
/* Wait for wsrep appliers to gracefully exit */
mysql_mutex_lock(&LOCK_thread_count);
- while (wsrep_running_threads > 1)
+ while (wsrep_running_threads > 2)
// 1 is for rollbacker thread which needs to be killed explicitly.
// This gotta be fixed in a more elegant manner if we gonna have arbitrary
// number of non-applier wsrep threads.
@@ -2995,3 +3022,52 @@ bool wsrep_node_is_synced()
{
return (WSREP_ON) ? (wsrep_config_state->get_status() == 4) : false;
}
+
+bool wsrep_enqueue_background_kill(wsrep_kill_t *item)
+{
+ std::list< wsrep_kill_t* >::iterator it;
+ bool inserted= false;
+
+ mysql_mutex_lock(&LOCK_wsrep_kill);
+
+ for (it = wsrep_kill_list.begin(); it != wsrep_kill_list.end(); it++)
+ {
+ if ((*it)->victim_thd == item->victim_thd)
+ break;
+ }
+
+ if(it != wsrep_kill_list.end())
+ {
+ WSREP_DEBUG("Thread: %lld query: %s already on kill list",
+ item->victim_thd->thread_id, wsrep_thd_query(item->victim_thd));
+ }
+ else
+ {
+ wsrep_kill_list.push_back(item);
+ mysql_cond_signal(&COND_wsrep_kill);
+ inserted= true;
+ }
+
+ mysql_mutex_unlock(&LOCK_wsrep_kill);
+ return inserted;
+}
+
+wsrep_kill_t* wsrep_dequeue_background_kill()
+{
+ wsrep_kill_t* item= NULL;
+
+ mysql_mutex_assert_owner(&LOCK_wsrep_kill);
+
+ if (!wsrep_kill_list.empty())
+ {
+ item= wsrep_kill_list.front();
+ wsrep_kill_list.pop_front();
+ }
+
+ return item;
+}
+
+bool wsrep_kill_empty()
+{
+ return wsrep_kill_list.empty();
+}