summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/thread_pool_priv.h3
-rw-r--r--libmysqld/lib_sql.cc9
-rw-r--r--plugin/feedback/sender_thread.cc9
-rw-r--r--plugin/handler_socket/handlersocket/database.cpp6
-rw-r--r--sql/event_scheduler.cc24
-rw-r--r--sql/mysqld.cc264
-rw-r--r--sql/mysqld.h4
-rw-r--r--sql/rpl_parallel.cc4
-rw-r--r--sql/slave.cc8
-rw-r--r--sql/sql_class.cc8
-rw-r--r--sql/sql_class.h101
-rw-r--r--sql/sql_connect.cc2
-rw-r--r--sql/sql_insert.cc16
-rw-r--r--sql/sql_parse.cc113
-rw-r--r--sql/sql_plugin.cc30
-rw-r--r--sql/sql_repl.cc110
-rw-r--r--sql/sql_show.cc495
-rw-r--r--sql/threadpool_common.cc2
-rw-r--r--sql/threadpool_generic.cc50
-rw-r--r--sql/wsrep_mysqld.cc165
20 files changed, 703 insertions, 720 deletions
diff --git a/include/thread_pool_priv.h b/include/thread_pool_priv.h
index f5fdbfbdf47..cd53306e851 100644
--- a/include/thread_pool_priv.h
+++ b/include/thread_pool_priv.h
@@ -61,9 +61,6 @@ void thd_set_mysys_var(THD *thd, st_my_thread_var *mysys_var);
my_socket thd_get_fd(THD *thd);
int thd_store_globals(THD* thd);
-THD *first_global_thread();
-THD *next_global_thread(THD *thd);
-
/* Print to the MySQL error log */
void sql_print_error(const char *format, ...);
diff --git a/libmysqld/lib_sql.cc b/libmysqld/lib_sql.cc
index 715f1dde5b5..305f6346c9e 100644
--- a/libmysqld/lib_sql.cc
+++ b/libmysqld/lib_sql.cc
@@ -432,11 +432,9 @@ int emb_unbuffered_fetch(MYSQL *mysql, char **row)
static void emb_free_embedded_thd(MYSQL *mysql)
{
THD *thd= (THD*)mysql->thd;
- mysql_mutex_lock(&LOCK_thread_count);
+ server_threads.erase(thd);
thd->clear_data_list();
thd->store_globals();
- thd->unlink();
- mysql_mutex_unlock(&LOCK_thread_count);
delete thd;
my_pthread_setspecific_ptr(THR_THD, 0);
mysql->thd=0;
@@ -711,10 +709,7 @@ void *create_embedded_thd(int client_flag)
thd->first_data= 0;
thd->data_tail= &thd->first_data;
bzero((char*) &thd->net, sizeof(thd->net));
-
- mysql_mutex_lock(&LOCK_thread_count);
- threads.append(thd);
- mysql_mutex_unlock(&LOCK_thread_count);
+ server_threads.insert(thd);
thd->mysys_var= 0;
thd->reset_globals();
return thd;
diff --git a/plugin/feedback/sender_thread.cc b/plugin/feedback/sender_thread.cc
index b025879b6ee..591023d61fd 100644
--- a/plugin/feedback/sender_thread.cc
+++ b/plugin/feedback/sender_thread.cc
@@ -90,9 +90,7 @@ static int prepare_for_fill(TABLE_LIST *tables)
in SHOW STATUS and we want to avoid skewing the statistics)
*/
thd->variables.pseudo_thread_id= thd->thread_id;
- mysql_mutex_lock(&LOCK_thread_count);
- threads.append(thd);
- mysql_mutex_unlock(&LOCK_thread_count);
+ server_threads.insert(thd);
thd->thread_stack= (char*) &tables;
if (thd->store_globals())
return 1;
@@ -258,12 +256,9 @@ ret:
reset all thread local status variables to minimize
the effect of the background thread on SHOW STATUS.
*/
- mysql_mutex_lock(&LOCK_thread_count);
+ server_threads.erase(thd);
thd->set_status_var_init();
thd->killed= KILL_CONNECTION;
- thd->unlink();
- mysql_cond_broadcast(&COND_thread_count);
- mysql_mutex_unlock(&LOCK_thread_count);
delete thd;
thd= 0;
}
diff --git a/plugin/handler_socket/handlersocket/database.cpp b/plugin/handler_socket/handlersocket/database.cpp
index a76428b29d3..52ea8f2a8c4 100644
--- a/plugin/handler_socket/handlersocket/database.cpp
+++ b/plugin/handler_socket/handlersocket/database.cpp
@@ -280,7 +280,7 @@ dbcontext::init_thread(const void *stack_bottom, volatile int& shutdown_flag)
DBG_THR(fprintf(stderr,
"thread_stack = %p sizeof(THD)=%zu sizeof(mtx)=%zu "
"O: %zu %zu %zu %zu %zu %zu %zu\n",
- thd->thread_stack, sizeof(THD), sizeof(LOCK_thread_count),
+ thd->thread_stack, sizeof(THD), sizeof(mysql_mutex_t),
DENA_THR_OFFSETOF(mdl_context),
DENA_THR_OFFSETOF(net),
DENA_THR_OFFSETOF(LOCK_thd_data),
@@ -307,7 +307,7 @@ dbcontext::init_thread(const void *stack_bottom, volatile int& shutdown_flag)
}
{
thd->thread_id = next_thread_id();
- add_to_active_threads(thd);
+ server_threads.insert(thd);
}
DBG_THR(fprintf(stderr, "HNDSOCK init thread wsts\n"));
@@ -341,10 +341,8 @@ dbcontext::term_thread()
close_tables_if();
my_pthread_setspecific_ptr(THR_THD, 0);
{
- pthread_mutex_lock(&LOCK_thread_count);
delete thd;
thd = 0;
- pthread_mutex_unlock(&LOCK_thread_count);
my_thread_end();
}
}
diff --git a/sql/event_scheduler.cc b/sql/event_scheduler.cc
index f459fd34aee..99b3c9b93fb 100644
--- a/sql/event_scheduler.cc
+++ b/sql/event_scheduler.cc
@@ -150,7 +150,7 @@ deinit_event_thread(THD *thd)
{
thd->proc_info= "Clearing";
DBUG_PRINT("exit", ("Event thread finishing"));
- unlink_not_visible_thd(thd);
+ server_threads.erase(thd);
delete thd;
}
@@ -185,7 +185,7 @@ pre_init_event_thread(THD* thd)
thd->net.read_timeout= slave_net_timeout;
thd->variables.option_bits|= OPTION_AUTO_IS_NULL;
thd->client_capabilities|= CLIENT_MULTI_RESULTS;
- add_to_active_threads(thd);
+ server_threads.insert(thd);
/*
Guarantees that we will see the thread in SHOW PROCESSLIST though its
@@ -679,20 +679,20 @@ end:
Event_scheduler::workers_count()
*/
+static my_bool workers_count_callback(THD *thd, uint32_t *count)
+{
+ if (thd->system_thread == SYSTEM_THREAD_EVENT_WORKER)
+ ++*count;
+ return 0;
+}
+
+
uint
Event_scheduler::workers_count()
{
- THD *tmp;
- uint count= 0;
-
+ uint32_t count= 0;
DBUG_ENTER("Event_scheduler::workers_count");
- mysql_mutex_lock(&LOCK_thread_count); // For unlink from list
- I_List_iterator<THD> it(threads);
- while ((tmp=it++))
- if (tmp->system_thread == SYSTEM_THREAD_EVENT_WORKER)
- ++count;
- mysql_mutex_unlock(&LOCK_thread_count);
- DBUG_PRINT("exit", ("%d", count));
+ server_threads.iterate(workers_count_callback, &count);
DBUG_RETURN(count);
}
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index f2065a55aed..e8806f2d3cb 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -651,26 +651,11 @@ Le_creator le_creator;
int bootstrap_error;
-I_List<THD> threads;
+THD_list server_threads;
Rpl_filter* cur_rpl_filter;
Rpl_filter* global_rpl_filter;
Rpl_filter* binlog_filter;
-THD *first_global_thread()
-{
- if (threads.is_empty())
- return NULL;
- return threads.head();
-}
-
-THD *next_global_thread(THD *thd)
-{
- if (threads.is_last(thd))
- return NULL;
- struct ilink *next= thd->next;
- return static_cast<THD*>(next);
-}
-
struct system_variables global_system_variables;
/**
Following is just for options parsing, used with a difference against
@@ -707,12 +692,7 @@ pthread_key(THD*, THR_THD);
/*
LOCK_thread_count protects the following variables:
thread_count Number of threads with THD that servers queries.
- threads Linked list of active THD's.
- The effect of this is that one can't unlink and
- delete a THD as long as one has locked
- LOCK_thread_count.
- ready_to_exit
- delayed_insert_threads
+ ready_to_exit
*/
mysql_mutex_t LOCK_thread_count;
@@ -910,7 +890,7 @@ PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list,
key_structure_guard_mutex, key_TABLE_SHARE_LOCK_ha_data,
key_LOCK_error_messages,
key_LOCK_start_thread,
- key_LOCK_thread_count, key_LOCK_thread_cache,
+ key_LOCK_thread_count, key_Thread_map_mutex, key_LOCK_thread_cache,
key_PARTITION_LOCK_auto_inc;
PSI_mutex_key key_RELAYLOG_LOCK_index;
PSI_mutex_key key_LOCK_relaylog_end_pos;
@@ -1004,6 +984,7 @@ static PSI_mutex_info all_server_mutexes[]=
{ &key_LOCK_commit_ordered, "LOCK_commit_ordered", PSI_FLAG_GLOBAL},
{ &key_LOCK_slave_background, "LOCK_slave_background", PSI_FLAG_GLOBAL},
{ &key_LOCK_thread_count, "LOCK_thread_count", PSI_FLAG_GLOBAL},
+ { &key_Thread_map_mutex, "Thread_map::mutex", PSI_FLAG_GLOBAL },
{ &key_LOCK_thread_cache, "LOCK_thread_cache", PSI_FLAG_GLOBAL},
{ &key_PARTITION_LOCK_auto_inc, "HA_DATA_PARTITION::LOCK_auto_inc", 0},
{ &key_LOCK_slave_state, "LOCK_slave_state", 0},
@@ -1549,6 +1530,106 @@ static void end_ssl();
** Code to end mysqld
****************************************************************************/
+static my_bool kill_all_threads(THD *thd, void *)
+{
+ DBUG_PRINT("quit", ("Informing thread %ld that it's time to die",
+ (ulong) thd->thread_id));
+ /* We skip slave threads on this first loop through. */
+ if (thd->slave_thread)
+ return 0;
+
+ if (DBUG_EVALUATE_IF("only_kill_system_threads", !thd->system_thread, 0))
+ return 0;
+
+#ifdef WITH_WSREP
+ /* skip wsrep system threads as well */
+ if (WSREP(thd) && (wsrep_thd_is_applying(thd) || thd->wsrep_applier))
+ return 0;
+#endif
+ thd->set_killed(KILL_SERVER_HARD);
+ MYSQL_CALLBACK(thread_scheduler, post_kill_notification, (thd));
+ if (WSREP(thd)) mysql_mutex_lock(&thd->LOCK_thd_data);
+ mysql_mutex_lock(&thd->LOCK_thd_kill);
+ if (thd->mysys_var)
+ {
+ thd->mysys_var->abort= 1;
+ mysql_mutex_lock(&thd->mysys_var->mutex);
+ if (thd->mysys_var->current_cond)
+ {
+ for (uint i= 0; i < 2; i++)
+ {
+ int ret= mysql_mutex_trylock(thd->mysys_var->current_mutex);
+ mysql_cond_broadcast(thd->mysys_var->current_cond);
+ if (!ret)
+ {
+ /* Thread has surely got the signal, unlock and abort */
+ mysql_mutex_unlock(thd->mysys_var->current_mutex);
+ break;
+ }
+ sleep(1);
+ }
+ }
+ mysql_mutex_unlock(&thd->mysys_var->mutex);
+ }
+ mysql_mutex_unlock(&thd->LOCK_thd_kill);
+ if (WSREP(thd)) mysql_mutex_unlock(&thd->LOCK_thd_data);
+ return 0;
+}
+
+
+static my_bool kill_all_threads_once_again(THD *thd, void *)
+{
+#ifndef __bsdi__ // Bug in BSDI kernel
+ if (thd->vio_ok())
+ {
+ if (global_system_variables.log_warnings)
+ sql_print_warning(ER_DEFAULT(ER_FORCING_CLOSE), my_progname,
+ (ulong) thd->thread_id,
+ (thd->main_security_ctx.user ?
+ thd->main_security_ctx.user : ""));
+ /*
+ close_connection() might need a valid current_thd
+ for memory allocation tracking.
+ */
+ THD *save_thd= current_thd;
+ set_current_thd(thd);
+ close_connection(thd, ER_SERVER_SHUTDOWN);
+ set_current_thd(save_thd);
+ }
+#endif
+
+#ifdef WITH_WSREP
+ /*
+ * WSREP_TODO:
+ * this code block may turn out redundant. wsrep->disconnect()
+ * should terminate slave threads gracefully, and we don't need
+ * to signal them here.
+ * The code here makes sure mysqld will not hang during shutdown
+ * even if wsrep provider has problems in shutting down.
+ */
+ if (WSREP(thd) && wsrep_thd_is_applying(thd))
+ {
+ sql_print_information("closing wsrep system thread");
+ thd->set_killed(KILL_CONNECTION);
+ MYSQL_CALLBACK(thread_scheduler, post_kill_notification, (thd));
+ if (thd->mysys_var)
+ {
+ thd->mysys_var->abort=1;
+ mysql_mutex_lock(&thd->mysys_var->mutex);
+ if (thd->mysys_var->current_cond)
+ {
+ mysql_mutex_lock(thd->mysys_var->current_mutex);
+ mysql_cond_broadcast(thd->mysys_var->current_cond);
+ mysql_mutex_unlock(thd->mysys_var->current_mutex);
+ }
+ mysql_mutex_unlock(&thd->mysys_var->mutex);
+ }
+ }
+#endif
+ return 0;
+}
+
+
static void close_connections(void)
{
#ifdef EXTRA_DEBUG
@@ -1626,58 +1707,7 @@ static void close_connections(void)
This will give the threads some time to gracefully abort their
statements and inform their clients that the server is about to die.
*/
-
- THD *tmp;
- mysql_mutex_lock(&LOCK_thread_count); // For unlink from list
-
- I_List_iterator<THD> it(threads);
- while ((tmp=it++))
- {
- DBUG_PRINT("quit",("Informing thread %ld that it's time to die",
- (ulong) tmp->thread_id));
- /* We skip slave threads on this first loop through. */
- if (tmp->slave_thread)
- continue;
-
- /* cannot use 'continue' inside DBUG_EXECUTE_IF()... */
- if (DBUG_EVALUATE_IF("only_kill_system_threads", !tmp->system_thread, 0))
- continue;
-
-#ifdef WITH_WSREP
- /* skip wsrep system threads as well */
- if (WSREP(tmp) && (wsrep_thd_is_applying(tmp) || tmp->wsrep_applier))
- continue;
-#endif
- tmp->set_killed(KILL_SERVER_HARD);
- MYSQL_CALLBACK(thread_scheduler, post_kill_notification, (tmp));
- if (WSREP(tmp)) mysql_mutex_lock(&tmp->LOCK_thd_data);
- mysql_mutex_lock(&tmp->LOCK_thd_kill);
- if (tmp->mysys_var)
- {
- tmp->mysys_var->abort=1;
- mysql_mutex_lock(&tmp->mysys_var->mutex);
- if (tmp->mysys_var->current_cond)
- {
- uint i;
- for (i=0; i < 2; i++)
- {
- int ret= mysql_mutex_trylock(tmp->mysys_var->current_mutex);
- mysql_cond_broadcast(tmp->mysys_var->current_cond);
- if (!ret)
- {
- /* Thread has surely got the signal, unlock and abort */
- mysql_mutex_unlock(tmp->mysys_var->current_mutex);
- break;
- }
- sleep(1);
- }
- }
- mysql_mutex_unlock(&tmp->mysys_var->mutex);
- }
- mysql_mutex_unlock(&tmp->LOCK_thd_kill);
- if (WSREP(tmp)) mysql_mutex_unlock(&tmp->LOCK_thd_data);
- }
- mysql_mutex_unlock(&LOCK_thread_count); // For unlink from list
+ server_threads.iterate(kill_all_threads);
Events::deinit();
slave_prepare_for_shutdown();
@@ -1708,65 +1738,8 @@ static void close_connections(void)
This will ensure that threads that are waiting for a command from the
client on a blocking read call are aborted.
*/
+ server_threads.iterate(kill_all_threads_once_again);
- for (;;)
- {
- mysql_mutex_lock(&LOCK_thread_count); // For unlink from list
- if (!(tmp=threads.get()))
- {
- mysql_mutex_unlock(&LOCK_thread_count);
- break;
- }
-#ifndef __bsdi__ // Bug in BSDI kernel
- if (tmp->vio_ok())
- {
- if (global_system_variables.log_warnings)
- sql_print_warning(ER_DEFAULT(ER_FORCING_CLOSE),my_progname,
- (ulong) tmp->thread_id,
- (tmp->main_security_ctx.user ?
- tmp->main_security_ctx.user : ""));
- /*
- close_connection() might need a valid current_thd
- for memory allocation tracking.
- */
- THD* save_thd= current_thd;
- set_current_thd(tmp);
- close_connection(tmp,ER_SERVER_SHUTDOWN);
- set_current_thd(save_thd);
- }
-#endif
-
-#ifdef WITH_WSREP
- /*
- * WSREP_TODO:
- * this code block may turn out redundant. wsrep->disconnect()
- * should terminate slave threads gracefully, and we don't need
- * to signal them here.
- * The code here makes sure mysqld will not hang during shutdown
- * even if wsrep provider has problems in shutting down.
- */
- if (WSREP(tmp) && wsrep_thd_is_applying(tmp))
- {
- sql_print_information("closing wsrep system thread");
- tmp->set_killed(KILL_CONNECTION);
- MYSQL_CALLBACK(thread_scheduler, post_kill_notification, (tmp));
- if (tmp->mysys_var)
- {
- tmp->mysys_var->abort=1;
- mysql_mutex_lock(&tmp->mysys_var->mutex);
- if (tmp->mysys_var->current_cond)
- {
- mysql_mutex_lock(tmp->mysys_var->current_mutex);
- mysql_cond_broadcast(tmp->mysys_var->current_cond);
- mysql_mutex_unlock(tmp->mysys_var->current_mutex);
- }
- mysql_mutex_unlock(&tmp->mysys_var->mutex);
- }
- }
-#endif
- DBUG_PRINT("quit",("Unlocking LOCK_thread_count"));
- mysql_mutex_unlock(&LOCK_thread_count);
- }
end_slave();
#ifdef WITH_WSREP
if (wsrep_inited == 1)
@@ -2249,6 +2222,7 @@ static void wait_for_signal_thread_to_end()
static void clean_up_mutexes()
{
DBUG_ENTER("clean_up_mutexes");
+ server_threads.destroy();
mysql_rwlock_destroy(&LOCK_grant);
mysql_mutex_destroy(&LOCK_thread_count);
mysql_mutex_destroy(&LOCK_thread_cache);
@@ -2798,7 +2772,7 @@ void unlink_thd(THD *thd)
thd->cleanup();
thd->add_status_to_global();
- unlink_not_visible_thd(thd);
+ server_threads.erase(thd);
#ifdef WITH_WSREP
/*
@@ -2910,7 +2884,7 @@ static bool cache_thread(THD *thd)
thd->thr_create_utime= microsecond_interval_timer();
thd->start_utime= thd->thr_create_utime;
- add_to_active_threads(thd);
+ server_threads.insert(thd);
DBUG_RETURN(1);
}
}
@@ -4627,6 +4601,7 @@ static int init_common_variables()
static int init_thread_environment()
{
DBUG_ENTER("init_thread_environment");
+ server_threads.init();
mysql_mutex_init(key_LOCK_thread_count, &LOCK_thread_count, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_LOCK_thread_cache, &LOCK_thread_cache, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_LOCK_start_thread, &LOCK_start_thread, MY_MUTEX_INIT_FAST);
@@ -8265,7 +8240,6 @@ static int mysql_init_variables(void)
global_query_id= 1;
global_thread_id= 0;
strnmov(server_version, MYSQL_SERVER_VERSION, sizeof(server_version)-1);
- threads.empty();
thread_cache.empty();
key_caches.empty();
if (!(dflt_key_cache= get_or_create_key_cache(default_key_cache_base.str,
@@ -9973,6 +9947,14 @@ static my_thread_id thread_id_max= UINT_MAX32;
@param[out] low - lower bound for the range
@param[out] high - upper bound for the range
*/
+
+static my_bool recalculate_callback(THD *thd, std::vector<my_thread_id> *ids)
+{
+ ids->push_back(thd->thread_id);
+ return 0;
+}
+
+
static void recalculate_thread_id_range(my_thread_id *low, my_thread_id *high)
{
std::vector<my_thread_id> ids;
@@ -9980,15 +9962,7 @@ static void recalculate_thread_id_range(my_thread_id *low, my_thread_id *high)
// Add sentinels
ids.push_back(0);
ids.push_back(UINT_MAX32);
-
- mysql_mutex_lock(&LOCK_thread_count);
-
- I_List_iterator<THD> it(threads);
- THD *thd;
- while ((thd=it++))
- ids.push_back(thd->thread_id);
-
- mysql_mutex_unlock(&LOCK_thread_count);
+ server_threads.iterate(recalculate_callback, &ids);
std::sort(ids.begin(), ids.end());
my_thread_id max_gap= 0;
diff --git a/sql/mysqld.h b/sql/mysqld.h
index 2acfc7db5d3..a8e2e3cff6c 100644
--- a/sql/mysqld.h
+++ b/sql/mysqld.h
@@ -299,7 +299,6 @@ extern pthread_attr_t connection_attrib;
extern my_bool old_mode;
extern LEX_STRING opt_init_connect, opt_init_slave;
extern int bootstrap_error;
-extern I_List<THD> threads;
extern char err_shared_dir[];
extern ulong connection_errors_select;
extern ulong connection_errors_accept;
@@ -346,7 +345,8 @@ extern PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list,
key_rpl_group_info_sleep_lock,
key_structure_guard_mutex, key_TABLE_SHARE_LOCK_ha_data,
key_LOCK_start_thread,
- key_LOCK_error_messages, key_LOCK_thread_count, key_PARTITION_LOCK_auto_inc;
+ key_LOCK_error_messages, key_LOCK_thread_count, key_Thread_map_mutex,
+ key_PARTITION_LOCK_auto_inc;
extern PSI_mutex_key key_RELAYLOG_LOCK_index;
extern PSI_mutex_key key_LOCK_relaylog_end_pos;
extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state,
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 144b12a9fdf..dc5e3ff1fbf 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -1023,7 +1023,7 @@ handle_rpl_parallel_thread(void *arg)
my_thread_init();
thd = new THD(next_thread_id());
thd->thread_stack = (char*)&thd;
- add_to_active_threads(thd);
+ server_threads.insert(thd);
set_current_thd(thd);
pthread_detach_this_thread();
thd->init_for_queries();
@@ -1432,7 +1432,7 @@ handle_rpl_parallel_thread(void *arg)
thd->temporary_tables= 0;
THD_CHECK_SENTRY(thd);
- unlink_not_visible_thd(thd);
+ server_threads.erase(thd);
delete thd;
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
diff --git a/sql/slave.cc b/sql/slave.cc
index f31021bc71e..345aa8aa53b 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -4698,7 +4698,7 @@ pthread_handler_t handle_slave_io(void *arg)
goto err_during_init;
}
thd->system_thread_info.rpl_io_info= &io_info;
- add_to_active_threads(thd);
+ server_threads.insert(thd);
mi->slave_running = MYSQL_SLAVE_RUN_NOT_CONNECT;
mi->abort_slave = 0;
mysql_mutex_unlock(&mi->run_lock);
@@ -5080,7 +5080,7 @@ err:
flush_master_info(mi, TRUE, TRUE);
THD_STAGE_INFO(thd, stage_waiting_for_slave_mutex_on_exit);
thd->add_status_to_global();
- unlink_not_visible_thd(thd);
+ server_threads.erase(thd);
mysql_mutex_lock(&mi->run_lock);
err_during_init:
@@ -5368,7 +5368,7 @@ pthread_handler_t handle_slave_sql(void *arg)
/* Ensure that slave can exeute any alter table it gets from master */
thd->variables.alter_algorithm= (ulong) Alter_info::ALTER_TABLE_ALGORITHM_DEFAULT;
- add_to_active_threads(thd);
+ server_threads.insert(thd);
/*
We are going to set slave_running to 1. Assuming slave I/O thread is
alive and connected, this is going to make Seconds_Behind_Master be 0
@@ -5714,7 +5714,7 @@ pthread_handler_t handle_slave_sql(void *arg)
}
THD_STAGE_INFO(thd, stage_waiting_for_slave_mutex_on_exit);
thd->add_status_to_global();
- unlink_not_visible_thd(thd);
+ server_threads.erase(thd);
mysql_mutex_lock(&rli->run_lock);
err_during_init:
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index 87e377c1819..e46b93f4bf1 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -1639,8 +1639,8 @@ THD::~THD()
THD *orig_thd= current_thd;
THD_CHECK_SENTRY(this);
DBUG_ENTER("~THD()");
- /* Check that we have already called thd->unlink() */
- DBUG_ASSERT(prev == 0 && next == 0);
+ /* Make sure threads are not available via server_threads. */
+ assert_not_linked();
/* This takes a long time so we should not do this under LOCK_thread_count */
mysql_mutex_assert_not_owner(&LOCK_thread_count);
@@ -4772,14 +4772,14 @@ MYSQL_THD create_thd()
thd->set_command(COM_DAEMON);
thd->system_thread= SYSTEM_THREAD_GENERIC;
thd->security_ctx->host_or_ip="";
- add_to_active_threads(thd);
+ server_threads.insert(thd);
return thd;
}
void destroy_thd(MYSQL_THD thd)
{
thd->add_status_to_global();
- unlink_not_visible_thd(thd);
+ server_threads.erase(thd);
delete thd;
}
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 6338eab9d45..5194f5cb925 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -5004,27 +5004,6 @@ public:
}
};
-inline void add_to_active_threads(THD *thd)
-{
- mysql_mutex_lock(&LOCK_thread_count);
- threads.append(thd);
- mysql_mutex_unlock(&LOCK_thread_count);
-}
-
-/*
- This should be called when you want to delete a thd that was not
- running any queries.
- This function will assert that the THD is linked.
-*/
-
-inline void unlink_not_visible_thd(THD *thd)
-{
- thd->assert_linked();
- mysql_mutex_lock(&LOCK_thread_count);
- thd->unlink();
- mysql_mutex_unlock(&LOCK_thread_count);
-}
-
/** A short cut for thd->get_stmt_da()->set_ok_status(). */
inline void
@@ -6935,5 +6914,85 @@ private:
THD *thd;
};
+
+/** THD registry */
+class THD_list
+{
+ I_List<THD> threads;
+ mutable mysql_mutex_t mutex;
+
+public:
+ /**
+ Constructor replacement.
+
+ Unfortunately we can't use fair constructor to initialize mutex
+ for two reasons: PFS and embedded. The former can probably be fixed,
+ the latter can probably be dropped.
+ */
+ void init()
+ {
+ mysql_mutex_init(key_Thread_map_mutex, &mutex, MY_MUTEX_INIT_FAST);
+ }
+
+ /** Destructor replacement. */
+ void destroy()
+ {
+ mysql_mutex_destroy(&mutex);
+ }
+
+ /**
+ Inserts thread to registry.
+
+ @param thd thread
+
+ Thread becomes accessible via server_threads.
+ */
+ void insert(THD *thd)
+ {
+ mysql_mutex_lock(&mutex);
+ threads.append(thd);
+ mysql_mutex_unlock(&mutex);
+ }
+
+ /**
+ Removes thread from registry.
+
+ @param thd thread
+
+ Thread becomes not accessible via server_threads.
+ */
+ void erase(THD *thd)
+ {
+ thd->assert_linked();
+ mysql_mutex_lock(&mutex);
+ thd->unlink();
+ mysql_mutex_unlock(&mutex);
+ }
+
+ /**
+ Iterates registered threads.
+
+ @param action called for every element
+ @param argument opque argument passed to action
+
+ @return
+ @retval 0 iteration completed successfully
+ @retval 1 iteration was interrupted (action returned 1)
+ */
+ template <typename T> int iterate(my_bool (*action)(THD *thd, T *arg), T *arg= 0)
+ {
+ int res= 0;
+ mysql_mutex_lock(&mutex);
+ I_List_iterator<THD> it(threads);
+ while (auto tmp= it++)
+ if ((res= action(tmp, arg)))
+ break;
+ mysql_mutex_unlock(&mutex);
+ return res;
+ }
+};
+
+extern THD_list server_threads;
+
#endif /* MYSQL_SERVER */
#endif /* SQL_CLASS_INCLUDED */
diff --git a/sql/sql_connect.cc b/sql/sql_connect.cc
index ec46d84c7ce..ac2503d29ff 100644
--- a/sql/sql_connect.cc
+++ b/sql/sql_connect.cc
@@ -1360,7 +1360,7 @@ void do_handle_one_connection(CONNECT *connect)
delete connect;
/* Make THD visible in show processlist */
- add_to_active_threads(thd);
+ server_threads.insert(thd);
thd->thr_create_utime= thr_create_utime;
/* We need to set this because of time_out_user_resource_limits */
diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc
index 785d7c12bd2..a8f074e1783 100644
--- a/sql/sql_insert.cc
+++ b/sql/sql_insert.cc
@@ -2188,11 +2188,11 @@ public:
mysql_mutex_init(key_delayed_insert_mutex, &mutex, MY_MUTEX_INIT_FAST);
mysql_cond_init(key_delayed_insert_cond, &cond, NULL);
mysql_cond_init(key_delayed_insert_cond_client, &cond_client, NULL);
- mysql_mutex_lock(&LOCK_thread_count);
+ mysql_mutex_lock(&LOCK_delayed_insert);
delayed_insert_threads++;
+ mysql_mutex_unlock(&LOCK_delayed_insert);
delayed_lock= global_system_variables.low_priority_updates ?
TL_WRITE_LOW_PRIORITY : TL_WRITE;
- mysql_mutex_unlock(&LOCK_thread_count);
DBUG_VOID_RETURN;
}
~Delayed_insert()
@@ -2210,15 +2210,9 @@ public:
mysql_cond_destroy(&cond);
mysql_cond_destroy(&cond_client);
- /*
- We could use unlink_not_visible_threads() here, but as
- delayed_insert_threads also needs to be protected by
- the LOCK_thread_count mutex, we open code this.
- */
- mysql_mutex_lock(&LOCK_thread_count);
- thd.unlink(); // Must be unlinked under lock
+ server_threads.erase(&thd);
+ mysql_mutex_assert_owner(&LOCK_delayed_insert);
delayed_insert_threads--;
- mysql_mutex_unlock(&LOCK_thread_count);
my_free(thd.query());
thd.security_ctx->user= 0;
@@ -2940,7 +2934,7 @@ pthread_handler_t handle_delayed_insert(void *arg)
pthread_detach_this_thread();
/* Add thread to THD list so that's it's visible in 'show processlist' */
thd->set_start_time();
- add_to_active_threads(thd);
+ server_threads.insert(thd);
if (abort_loop)
thd->set_killed(KILL_CONNECTION);
else
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index 320b9580ba3..f543d3b8dd4 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -8955,24 +8955,35 @@ void add_join_natural(TABLE_LIST *a, TABLE_LIST *b, List<String> *using_fields,
pointer - thread found, and its LOCK_thd_kill is locked.
*/
-THD *find_thread_by_id(longlong id, bool query_id)
+struct find_thread_callback_arg
{
- THD *tmp;
- mysql_mutex_lock(&LOCK_thread_count); // For unlink from list
- I_List_iterator<THD> it(threads);
- while ((tmp=it++))
+ find_thread_callback_arg(longlong id_arg, bool query_id_arg):
+ thd(0), id(id_arg), query_id(query_id_arg) {}
+ THD *thd;
+ longlong id;
+ bool query_id;
+};
+
+
+my_bool find_thread_callback(THD *thd, find_thread_callback_arg *arg)
+{
+ if (thd->get_command() != COM_DAEMON &&
+ arg->id == (arg->query_id ? thd->query_id : (longlong) thd->thread_id))
{
- if (tmp->get_command() == COM_DAEMON)
- continue;
- if (id == (query_id ? tmp->query_id : (longlong) tmp->thread_id))
- {
- if (WSREP(tmp)) mysql_mutex_lock(&tmp->LOCK_thd_data);
- mysql_mutex_lock(&tmp->LOCK_thd_kill); // Lock from delete
- break;
- }
+ if (WSREP(thd)) mysql_mutex_lock(&thd->LOCK_thd_data);
+ mysql_mutex_lock(&thd->LOCK_thd_kill); // Lock from delete
+ arg->thd= thd;
+ return 1;
}
- mysql_mutex_unlock(&LOCK_thread_count);
- return tmp;
+ return 0;
+}
+
+
+THD *find_thread_by_id(longlong id, bool query_id)
+{
+ find_thread_callback_arg arg(id, query_id);
+ server_threads.iterate(find_thread_callback, &arg);
+ return arg.thd;
}
@@ -9056,53 +9067,63 @@ kill_one_thread(THD *thd, longlong id, killed_state kill_signal, killed_type typ
are killed.
*/
-static uint kill_threads_for_user(THD *thd, LEX_USER *user,
- killed_state kill_signal, ha_rows *rows)
+struct kill_threads_callback_arg
{
- THD *tmp;
+ kill_threads_callback_arg(THD *thd_arg, LEX_USER *user_arg):
+ thd(thd_arg), user(user_arg) {}
+ THD *thd;
+ LEX_USER *user;
List<THD> threads_to_kill;
- DBUG_ENTER("kill_threads_for_user");
-
- *rows= 0;
-
- if (unlikely(thd->is_fatal_error)) // If we run out of memory
- DBUG_RETURN(ER_OUT_OF_RESOURCES);
+};
- DBUG_PRINT("enter", ("user: %s signal: %u", user->user.str,
- (uint) kill_signal));
- mysql_mutex_lock(&LOCK_thread_count); // For unlink from list
- I_List_iterator<THD> it(threads);
- while ((tmp=it++))
+static my_bool kill_threads_callback(THD *thd, kill_threads_callback_arg *arg)
+{
+ if (thd->security_ctx->user)
{
- if (!tmp->security_ctx->user)
- continue;
/*
Check that hostname (if given) and user name matches.
host.str[0] == '%' means that host name was not given. See sql_yacc.yy
*/
- if (((user->host.str[0] == '%' && !user->host.str[1]) ||
- !strcmp(tmp->security_ctx->host_or_ip, user->host.str)) &&
- !strcmp(tmp->security_ctx->user, user->user.str))
+ if (((arg->user->host.str[0] == '%' && !arg->user->host.str[1]) ||
+ !strcmp(thd->security_ctx->host_or_ip, arg->user->host.str)) &&
+ !strcmp(thd->security_ctx->user, arg->user->user.str))
{
- if (!(thd->security_ctx->master_access & SUPER_ACL) &&
- !thd->security_ctx->user_matches(tmp->security_ctx))
- {
- mysql_mutex_unlock(&LOCK_thread_count);
- DBUG_RETURN(ER_KILL_DENIED_ERROR);
- }
- if (!threads_to_kill.push_back(tmp, thd->mem_root))
+ if (!(arg->thd->security_ctx->master_access & SUPER_ACL) &&
+ !arg->thd->security_ctx->user_matches(thd->security_ctx))
+ return 1;
+ if (!arg->threads_to_kill.push_back(thd, arg->thd->mem_root))
{
- if (WSREP(tmp)) mysql_mutex_lock(&tmp->LOCK_thd_data);
- mysql_mutex_lock(&tmp->LOCK_thd_kill); // Lock from delete
+ if (WSREP(thd)) mysql_mutex_lock(&thd->LOCK_thd_data);
+ mysql_mutex_lock(&thd->LOCK_thd_kill); // Lock from delete
}
}
}
- mysql_mutex_unlock(&LOCK_thread_count);
- if (!threads_to_kill.is_empty())
+ return 0;
+}
+
+
+static uint kill_threads_for_user(THD *thd, LEX_USER *user,
+ killed_state kill_signal, ha_rows *rows)
+{
+ kill_threads_callback_arg arg(thd, user);
+ DBUG_ENTER("kill_threads_for_user");
+
+ *rows= 0;
+
+ if (unlikely(thd->is_fatal_error)) // If we run out of memory
+ DBUG_RETURN(ER_OUT_OF_RESOURCES);
+
+ DBUG_PRINT("enter", ("user: %s signal: %u", user->user.str,
+ (uint) kill_signal));
+
+ if (server_threads.iterate(kill_threads_callback, &arg))
+ DBUG_RETURN(ER_KILL_DENIED_ERROR);
+
+ if (!arg.threads_to_kill.is_empty())
{
- List_iterator_fast<THD> it2(threads_to_kill);
+ List_iterator_fast<THD> it2(arg.threads_to_kill);
THD *next_ptr;
THD *ptr= it2++;
do
diff --git a/sql/sql_plugin.cc b/sql/sql_plugin.cc
index d448b7b9e02..21767b4de96 100644
--- a/sql/sql_plugin.cc
+++ b/sql/sql_plugin.cc
@@ -4354,25 +4354,23 @@ void wsrep_plugins_pre_init()
members of wsrep startup threads with correct values, as these value
were not available at the time these threads were created.
*/
-void wsrep_plugins_post_init()
-{
- THD *thd;
- I_List_iterator<THD> it(threads);
- while ((thd= it++))
+my_bool post_init_callback(THD *thd, void *)
+{
+ if (thd->wsrep_applier)
{
- if (IF_WSREP(thd->wsrep_applier,1))
- {
- // Save options_bits as it will get overwritten in plugin_thdvar_init()
- ulonglong option_bits_saved= thd->variables.option_bits;
-
- plugin_thdvar_init(thd);
-
- // Restore option_bits
- thd->variables.option_bits= option_bits_saved;
- }
+ // Save options_bits as it will get overwritten in plugin_thdvar_init()
+ ulonglong option_bits_saved= thd->variables.option_bits;
+ plugin_thdvar_init(thd);
+ // Restore option_bits
+ thd->variables.option_bits= option_bits_saved;
}
+ return 0;
+}
- return;
+
+void wsrep_plugins_post_init()
+{
+ server_threads.iterate(post_init_callback);
}
#endif /* WITH_WSREP */
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index ab4a5c6a6b4..fc08399bb88 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -527,59 +527,48 @@ static enum enum_binlog_checksum_alg get_binlog_checksum_value_at_connect(THD *
Now they sync is done for next read.
*/
-void adjust_linfo_offsets(my_off_t purge_offset)
+static my_bool adjust_callback(THD *thd, my_off_t *purge_offset)
{
- THD *tmp;
-
- mysql_mutex_lock(&LOCK_thread_count);
- I_List_iterator<THD> it(threads);
-
- while ((tmp=it++))
+ mysql_mutex_lock(&thd->LOCK_thd_data);
+ if (auto linfo= thd->current_linfo)
{
- LOG_INFO* linfo;
- mysql_mutex_lock(&tmp->LOCK_thd_data);
- if ((linfo = tmp->current_linfo))
- {
- /*
- Index file offset can be less that purge offset only if
- we just started reading the index file. In that case
- we have nothing to adjust
- */
- if (linfo->index_file_offset < purge_offset)
- linfo->fatal = (linfo->index_file_offset != 0);
- else
- linfo->index_file_offset -= purge_offset;
- }
- mysql_mutex_unlock(&tmp->LOCK_thd_data);
+ /*
+ Index file offset can be less that purge offset only if
+ we just started reading the index file. In that case
+ we have nothing to adjust
+ */
+ if (linfo->index_file_offset < *purge_offset)
+ linfo->fatal= (linfo->index_file_offset != 0);
+ else
+ linfo->index_file_offset-= *purge_offset;
}
- mysql_mutex_unlock(&LOCK_thread_count);
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
+ return 0;
}
-bool log_in_use(const char* log_name)
+void adjust_linfo_offsets(my_off_t purge_offset)
{
- size_t log_name_len = strlen(log_name) + 1;
- THD *tmp;
- bool result = 0;
-
- mysql_mutex_lock(&LOCK_thread_count);
- I_List_iterator<THD> it(threads);
+ server_threads.iterate(adjust_callback, &purge_offset);
+}
- while ((tmp=it++))
- {
- LOG_INFO* linfo;
- mysql_mutex_lock(&tmp->LOCK_thd_data);
- if ((linfo = tmp->current_linfo))
- result = !memcmp(log_name, linfo->log_file_name, log_name_len);
- mysql_mutex_unlock(&tmp->LOCK_thd_data);
- if (result)
- break;
- }
- mysql_mutex_unlock(&LOCK_thread_count);
+static my_bool log_in_use_callback(THD *thd, const char *log_name)
+{
+ my_bool result= 0;
+ mysql_mutex_lock(&thd->LOCK_thd_data);
+ if (auto linfo= thd->current_linfo)
+ result= !memcmp(log_name, linfo->log_file_name, strlen(log_name) + 1);
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
return result;
}
+
+bool log_in_use(const char* log_name)
+{
+ return server_threads.iterate(log_in_use_callback, log_name);
+}
+
bool purge_error_message(THD* thd, int res)
{
uint errcode;
@@ -3367,31 +3356,40 @@ err:
slave_server_id the slave's server id
*/
-void kill_zombie_dump_threads(uint32 slave_server_id)
+struct kill_callback_arg
{
- mysql_mutex_lock(&LOCK_thread_count);
- I_List_iterator<THD> it(threads);
- THD *tmp;
+ kill_callback_arg(uint32 id): slave_server_id(id), thd(0) {}
+ uint32 slave_server_id;
+ THD *thd;
+};
- while ((tmp=it++))
+static my_bool kill_callback(THD *thd, kill_callback_arg *arg)
+{
+ if (thd->get_command() == COM_BINLOG_DUMP &&
+ thd->variables.server_id == arg->slave_server_id)
{
- if (tmp->get_command() == COM_BINLOG_DUMP &&
- tmp->variables.server_id == slave_server_id)
- {
- mysql_mutex_lock(&tmp->LOCK_thd_kill); // Lock from delete
- break;
- }
+ arg->thd= thd;
+ mysql_mutex_lock(&thd->LOCK_thd_kill); // Lock from delete
+ return 1;
}
- mysql_mutex_unlock(&LOCK_thread_count);
- if (tmp)
+ return 0;
+}
+
+
+void kill_zombie_dump_threads(uint32 slave_server_id)
+{
+ kill_callback_arg arg(slave_server_id);
+ server_threads.iterate(kill_callback, &arg);
+
+ if (arg.thd)
{
/*
Here we do not call kill_one_thread() as
it will be slow because it will iterate through the list
again. We just to do kill the thread ourselves.
*/
- tmp->awake_no_mutex(KILL_SLAVE_SAME_ID);
- mysql_mutex_unlock(&tmp->LOCK_thd_kill);
+ arg.thd->awake_no_mutex(KILL_SLAVE_SAME_ID);
+ mysql_mutex_unlock(&arg.thd->LOCK_thd_kill);
}
}
diff --git a/sql/sql_show.cc b/sql/sql_show.cc
index 5adee6731b5..70905bcb2e7 100644
--- a/sql/sql_show.cc
+++ b/sql/sql_show.cc
@@ -2734,13 +2734,111 @@ static const char *thread_state_info(THD *tmp)
}
+struct list_callback_arg
+{
+ list_callback_arg(const char *u, THD *t, ulong m):
+ user(u), thd(t), max_query_length(m) {}
+ I_List<thread_info> thread_infos;
+ const char *user;
+ THD *thd;
+ ulong max_query_length;
+};
+
+
+static my_bool list_callback(THD *tmp, list_callback_arg *arg)
+{
+
+ Security_context *tmp_sctx= tmp->security_ctx;
+ bool got_thd_data;
+ if ((tmp->vio_ok() || tmp->system_thread) &&
+ (!arg->user || (!tmp->system_thread &&
+ tmp_sctx->user && !strcmp(tmp_sctx->user, arg->user))))
+ {
+ thread_info *thd_info= new (arg->thd->mem_root) thread_info;
+
+ thd_info->thread_id=tmp->thread_id;
+ thd_info->os_thread_id=tmp->os_thread_id;
+ thd_info->user= arg->thd->strdup(tmp_sctx->user ? tmp_sctx->user :
+ (tmp->system_thread ?
+ "system user" : "unauthenticated user"));
+ if (tmp->peer_port && (tmp_sctx->host || tmp_sctx->ip) &&
+ arg->thd->security_ctx->host_or_ip[0])
+ {
+ if ((thd_info->host= (char*) arg->thd->alloc(LIST_PROCESS_HOST_LEN+1)))
+ my_snprintf((char *) thd_info->host, LIST_PROCESS_HOST_LEN,
+ "%s:%u", tmp_sctx->host_or_ip, tmp->peer_port);
+ }
+ else
+ thd_info->host= arg->thd->strdup(tmp_sctx->host_or_ip[0] ?
+ tmp_sctx->host_or_ip :
+ tmp_sctx->host ? tmp_sctx->host : "");
+ thd_info->command=(int) tmp->get_command();
+
+ if ((got_thd_data= !trylock_short(&tmp->LOCK_thd_data)))
+ {
+ /* This is an approximation */
+ thd_info->proc_info= (char*) (tmp->killed >= KILL_QUERY ?
+ "Killed" : 0);
+
+ /* The following variables are only safe to access under a lock */
+ thd_info->db= 0;
+ if (tmp->db.str)
+ thd_info->db= arg->thd->strmake(tmp->db.str, tmp->db.length);
+
+ if (tmp->query())
+ {
+ uint length= MY_MIN(arg->max_query_length, tmp->query_length());
+ char *q= arg->thd->strmake(tmp->query(),length);
+ /* Safety: in case strmake failed, we set length to 0. */
+ thd_info->query_string=
+ CSET_STRING(q, q ? length : 0, tmp->query_charset());
+ }
+
+ /*
+ Progress report. We need to do this under a lock to ensure that all
+ is from the same stage.
+ */
+ if (tmp->progress.max_counter)
+ {
+ uint max_stage= MY_MAX(tmp->progress.max_stage, 1);
+ thd_info->progress= (((tmp->progress.stage / (double) max_stage) +
+ ((tmp->progress.counter /
+ (double) tmp->progress.max_counter) /
+ (double) max_stage)) *
+ 100.0);
+ set_if_smaller(thd_info->progress, 100);
+ }
+ else
+ thd_info->progress= 0.0;
+ }
+ else
+ {
+ thd_info->proc_info= "Busy";
+ thd_info->progress= 0.0;
+ thd_info->db= "";
+ }
+
+ thd_info->state_info= thread_state_info(tmp);
+ thd_info->start_time= tmp->start_utime;
+ ulonglong utime_after_query_snapshot= tmp->utime_after_query;
+ if (thd_info->start_time < utime_after_query_snapshot)
+ thd_info->start_time= utime_after_query_snapshot; // COM_SLEEP
+
+ if (got_thd_data)
+ mysql_mutex_unlock(&tmp->LOCK_thd_data);
+ arg->thread_infos.append(thd_info);
+ }
+ return 0;
+}
+
+
void mysqld_list_processes(THD *thd,const char *user, bool verbose)
{
Item *field;
List<Item> field_list;
- I_List<thread_info> thread_infos;
- ulong max_query_length= (verbose ? thd->variables.max_allowed_packet :
- PROCESS_LIST_WIDTH);
+ list_callback_arg arg(user, thd,
+ verbose ? thd->variables.max_allowed_packet :
+ PROCESS_LIST_WIDTH);
Protocol *protocol= thd->protocol;
MEM_ROOT *mem_root= thd->mem_root;
DBUG_ENTER("mysqld_list_processes");
@@ -2771,7 +2869,7 @@ void mysqld_list_processes(THD *thd,const char *user, bool verbose)
mem_root);
field->maybe_null=1;
field_list.push_back(field=new (mem_root)
- Item_empty_string(thd, "Info", max_query_length),
+ Item_empty_string(thd, "Info", arg.max_query_length),
mem_root);
field->maybe_null=1;
if (!thd->variables.old_mode &&
@@ -2790,102 +2888,13 @@ void mysqld_list_processes(THD *thd,const char *user, bool verbose)
if (thd->killed)
DBUG_VOID_RETURN;
- mysql_mutex_lock(&LOCK_thread_count); // For unlink from list
- I_List_iterator<THD> it(threads);
- THD *tmp;
- while ((tmp=it++))
- {
- Security_context *tmp_sctx= tmp->security_ctx;
- bool got_thd_data;
- if ((tmp->vio_ok() || tmp->system_thread) &&
- (!user || (!tmp->system_thread &&
- tmp_sctx->user && !strcmp(tmp_sctx->user, user))))
- {
- thread_info *thd_info= new (thd->mem_root) thread_info;
-
- thd_info->thread_id=tmp->thread_id;
- thd_info->os_thread_id=tmp->os_thread_id;
- thd_info->user= thd->strdup(tmp_sctx->user ? tmp_sctx->user :
- (tmp->system_thread ?
- "system user" : "unauthenticated user"));
- if (tmp->peer_port && (tmp_sctx->host || tmp_sctx->ip) &&
- thd->security_ctx->host_or_ip[0])
- {
- if ((thd_info->host= (char*) thd->alloc(LIST_PROCESS_HOST_LEN+1)))
- my_snprintf((char *) thd_info->host, LIST_PROCESS_HOST_LEN,
- "%s:%u", tmp_sctx->host_or_ip, tmp->peer_port);
- }
- else
- thd_info->host= thd->strdup(tmp_sctx->host_or_ip[0] ?
- tmp_sctx->host_or_ip :
- tmp_sctx->host ? tmp_sctx->host : "");
- thd_info->command=(int) tmp->get_command();
-
- if ((got_thd_data= !trylock_short(&tmp->LOCK_thd_data)))
- {
- /* This is an approximation */
- thd_info->proc_info= (char*) (tmp->killed >= KILL_QUERY ?
- "Killed" : 0);
- /*
- The following variables are only safe to access under a lock
- */
-
- thd_info->db= 0;
- if (tmp->db.str)
- thd_info->db= thd->strmake(tmp->db.str, tmp->db.length);
-
- if (tmp->query())
- {
- uint length= MY_MIN(max_query_length, tmp->query_length());
- char *q= thd->strmake(tmp->query(),length);
- /* Safety: in case strmake failed, we set length to 0. */
- thd_info->query_string=
- CSET_STRING(q, q ? length : 0, tmp->query_charset());
- }
+ server_threads.iterate(list_callback, &arg);
- /*
- Progress report. We need to do this under a lock to ensure that all
- is from the same stage.
- */
- if (tmp->progress.max_counter)
- {
- uint max_stage= MY_MAX(tmp->progress.max_stage, 1);
- thd_info->progress= (((tmp->progress.stage / (double) max_stage) +
- ((tmp->progress.counter /
- (double) tmp->progress.max_counter) /
- (double) max_stage)) *
- 100.0);
- set_if_smaller(thd_info->progress, 100);
- }
- else
- thd_info->progress= 0.0;
- }
- else
- {
- thd_info->proc_info= "Busy";
- thd_info->progress= 0.0;
- thd_info->db= "";
- }
-
- thd_info->state_info= thread_state_info(tmp);
- thd_info->start_time= tmp->start_utime;
- ulonglong utime_after_query_snapshot= tmp->utime_after_query;
- if (thd_info->start_time < utime_after_query_snapshot)
- thd_info->start_time= utime_after_query_snapshot; // COM_SLEEP
-
- if (got_thd_data)
- mysql_mutex_unlock(&tmp->LOCK_thd_data);
- thread_infos.append(thd_info);
- }
- }
- mysql_mutex_unlock(&LOCK_thread_count);
-
- thread_info *thd_info;
ulonglong now= microsecond_interval_timer();
char buff[20]; // For progress
String store_buffer(buff, sizeof(buff), system_charset_info);
- while ((thd_info=thread_infos.get()))
+ while (auto thd_info= arg.thread_infos.get())
{
protocol->prepare_for_resend();
protocol->store(thd_info->thread_id);
@@ -3169,152 +3178,150 @@ int fill_show_explain(THD *thd, TABLE_LIST *table, COND *cond)
}
-int fill_schema_processlist(THD* thd, TABLE_LIST* tables, COND* cond)
+struct processlist_callback_arg
{
- TABLE *table= tables->table;
- CHARSET_INFO *cs= system_charset_info;
- char *user;
- ulonglong unow= microsecond_interval_timer();
- DBUG_ENTER("fill_schema_processlist");
+ processlist_callback_arg(THD *thd_arg, TABLE *table_arg):
+ thd(thd_arg), table(table_arg), unow(microsecond_interval_timer()) {}
+ THD *thd;
+ TABLE *table;
+ ulonglong unow;
+};
- DEBUG_SYNC(thd,"fill_schema_processlist_after_unow");
- user= thd->security_ctx->master_access & PROCESS_ACL ?
- NullS : thd->security_ctx->priv_user;
+static my_bool processlist_callback(THD *tmp, processlist_callback_arg *arg)
+{
+ Security_context *tmp_sctx= tmp->security_ctx;
+ CHARSET_INFO *cs= system_charset_info;
+ const char *val;
+ ulonglong max_counter;
+ bool got_thd_data;
+ char *user= arg->thd->security_ctx->master_access & PROCESS_ACL ?
+ NullS : arg->thd->security_ctx->priv_user;
+
+ if ((!tmp->vio_ok() && !tmp->system_thread) ||
+ (user && (tmp->system_thread || !tmp_sctx->user ||
+ strcmp(tmp_sctx->user, user))))
+ return 0;
- mysql_mutex_lock(&LOCK_thread_count);
+ restore_record(arg->table, s->default_values);
+ /* ID */
+ arg->table->field[0]->store((longlong) tmp->thread_id, TRUE);
+ /* USER */
+ val= tmp_sctx->user ? tmp_sctx->user :
+ (tmp->system_thread ? "system user" : "unauthenticated user");
+ arg->table->field[1]->store(val, strlen(val), cs);
+ /* HOST */
+ if (tmp->peer_port && (tmp_sctx->host || tmp_sctx->ip) &&
+ arg->thd->security_ctx->host_or_ip[0])
+ {
+ char host[LIST_PROCESS_HOST_LEN + 1];
+ my_snprintf(host, LIST_PROCESS_HOST_LEN, "%s:%u",
+ tmp_sctx->host_or_ip, tmp->peer_port);
+ arg->table->field[2]->store(host, strlen(host), cs);
+ }
+ else
+ arg->table->field[2]->store(tmp_sctx->host_or_ip,
+ strlen(tmp_sctx->host_or_ip), cs);
- if (!thd->killed)
+ if ((got_thd_data= !trylock_short(&tmp->LOCK_thd_data)))
{
- I_List_iterator<THD> it(threads);
- THD* tmp;
-
- while ((tmp= it++))
+ /* DB */
+ if (tmp->db.str)
{
- Security_context *tmp_sctx= tmp->security_ctx;
- const char *val;
- ulonglong max_counter;
- bool got_thd_data;
-
- if ((!tmp->vio_ok() && !tmp->system_thread) ||
- (user && (tmp->system_thread || !tmp_sctx->user ||
- strcmp(tmp_sctx->user, user))))
- continue;
-
- restore_record(table, s->default_values);
- /* ID */
- table->field[0]->store((longlong) tmp->thread_id, TRUE);
- /* USER */
- val= tmp_sctx->user ? tmp_sctx->user :
- (tmp->system_thread ? "system user" : "unauthenticated user");
- table->field[1]->store(val, strlen(val), cs);
- /* HOST */
- if (tmp->peer_port && (tmp_sctx->host || tmp_sctx->ip) &&
- thd->security_ctx->host_or_ip[0])
- {
- char host[LIST_PROCESS_HOST_LEN + 1];
- my_snprintf(host, LIST_PROCESS_HOST_LEN, "%s:%u",
- tmp_sctx->host_or_ip, tmp->peer_port);
- table->field[2]->store(host, strlen(host), cs);
- }
- else
- table->field[2]->store(tmp_sctx->host_or_ip,
- strlen(tmp_sctx->host_or_ip), cs);
+ arg->table->field[3]->store(tmp->db.str, tmp->db.length, cs);
+ arg->table->field[3]->set_notnull();
+ }
+ }
- if ((got_thd_data= !trylock_short(&tmp->LOCK_thd_data)))
- {
- /* DB */
- if (tmp->db.str)
- {
- table->field[3]->store(tmp->db.str, tmp->db.length, cs);
- table->field[3]->set_notnull();
- }
- }
+ /* COMMAND */
+ if ((val= (char *) (!got_thd_data ? "Busy" :
+ (tmp->killed >= KILL_QUERY ?
+ "Killed" : 0))))
+ arg->table->field[4]->store(val, strlen(val), cs);
+ else
+ arg->table->field[4]->store(command_name[tmp->get_command()].str,
+ command_name[tmp->get_command()].length, cs);
- /* COMMAND */
- if ((val= (char *) (!got_thd_data ? "Busy" :
- (tmp->killed >= KILL_QUERY ?
- "Killed" : 0))))
- table->field[4]->store(val, strlen(val), cs);
- else
- table->field[4]->store(command_name[tmp->get_command()].str,
- command_name[tmp->get_command()].length, cs);
+ /* MYSQL_TIME */
+ ulonglong utime= tmp->start_utime;
+ ulonglong utime_after_query_snapshot= tmp->utime_after_query;
+ if (utime < utime_after_query_snapshot)
+ utime= utime_after_query_snapshot; // COM_SLEEP
+ utime= utime && utime < arg->unow ? arg->unow - utime : 0;
- /* MYSQL_TIME */
- ulonglong utime= tmp->start_utime;
- ulonglong utime_after_query_snapshot= tmp->utime_after_query;
- if (utime < utime_after_query_snapshot)
- utime= utime_after_query_snapshot; // COM_SLEEP
- utime= utime && utime < unow ? unow - utime : 0;
+ arg->table->field[5]->store(utime / HRTIME_RESOLUTION, TRUE);
- table->field[5]->store(utime / HRTIME_RESOLUTION, TRUE);
+ if (got_thd_data)
+ {
+ if (tmp->query())
+ {
+ arg->table->field[7]->store(tmp->query(),
+ MY_MIN(PROCESS_LIST_INFO_WIDTH,
+ tmp->query_length()), cs);
+ arg->table->field[7]->set_notnull();
- if (got_thd_data)
- {
- if (tmp->query())
- {
- table->field[7]->store(tmp->query(),
- MY_MIN(PROCESS_LIST_INFO_WIDTH,
- tmp->query_length()), cs);
- table->field[7]->set_notnull();
+ /* INFO_BINARY */
+ arg->table->field[16]->store(tmp->query(),
+ MY_MIN(PROCESS_LIST_INFO_WIDTH,
+ tmp->query_length()),
+ &my_charset_bin);
+ arg->table->field[16]->set_notnull();
+ }
- /* INFO_BINARY */
- table->field[16]->store(tmp->query(),
- MY_MIN(PROCESS_LIST_INFO_WIDTH,
- tmp->query_length()),
- &my_charset_bin);
- table->field[16]->set_notnull();
- }
+ /*
+ Progress report. We need to do this under a lock to ensure that all
+ is from the same stage.
+ */
+ if ((max_counter= tmp->progress.max_counter))
+ {
+ arg->table->field[9]->store((longlong) tmp->progress.stage + 1, 1);
+ arg->table->field[10]->store((longlong) tmp->progress.max_stage, 1);
+ arg->table->field[11]->store((double) tmp->progress.counter /
+ (double) max_counter*100.0);
+ }
+ mysql_mutex_unlock(&tmp->LOCK_thd_data);
+ }
- /*
- Progress report. We need to do this under a lock to ensure that all
- is from the same stage.
- */
- if ((max_counter= tmp->progress.max_counter))
- {
- table->field[9]->store((longlong) tmp->progress.stage + 1, 1);
- table->field[10]->store((longlong) tmp->progress.max_stage, 1);
- table->field[11]->store((double) tmp->progress.counter /
- (double) max_counter*100.0);
- }
- mysql_mutex_unlock(&tmp->LOCK_thd_data);
- }
+ /* STATE */
+ if ((val= thread_state_info(tmp)))
+ {
+ arg->table->field[6]->store(val, strlen(val), cs);
+ arg->table->field[6]->set_notnull();
+ }
- /* STATE */
- if ((val= thread_state_info(tmp)))
- {
- table->field[6]->store(val, strlen(val), cs);
- table->field[6]->set_notnull();
- }
+ /* TIME_MS */
+ arg->table->field[8]->store((double)(utime / (HRTIME_RESOLUTION / 1000.0)));
- /* TIME_MS */
- table->field[8]->store((double)(utime / (HRTIME_RESOLUTION / 1000.0)));
+ /*
+ This may become negative if we free a memory allocated by another
+ thread in this thread. However it's better that we notice it eventually
+ than hide it.
+ */
+ arg->table->field[12]->store((longlong) tmp->status_var.local_memory_used,
+ FALSE);
+ arg->table->field[13]->store((longlong) tmp->status_var.max_local_memory_used,
+ FALSE);
+ arg->table->field[14]->store((longlong) tmp->get_examined_row_count(), TRUE);
- /*
- This may become negative if we free a memory allocated by another
- thread in this thread. However it's better that we notice it eventually
- than hide it.
- */
- table->field[12]->store((longlong) tmp->status_var.local_memory_used,
- FALSE);
- table->field[13]->store((longlong) tmp->status_var.max_local_memory_used,
- FALSE);
- table->field[14]->store((longlong) tmp->get_examined_row_count(), TRUE);
+ /* QUERY_ID */
+ arg->table->field[15]->store(tmp->query_id, TRUE);
- /* QUERY_ID */
- table->field[15]->store(tmp->query_id, TRUE);
+ arg->table->field[17]->store(tmp->os_thread_id);
- table->field[17]->store(tmp->os_thread_id);
+ if (schema_table_store_record(arg->thd, arg->table))
+ return 1;
+ return 0;
+}
- if (schema_table_store_record(thd, table))
- {
- mysql_mutex_unlock(&LOCK_thread_count);
- DBUG_RETURN(1);
- }
- }
- }
- mysql_mutex_unlock(&LOCK_thread_count);
+int fill_schema_processlist(THD* thd, TABLE_LIST* tables, COND* cond)
+{
+ processlist_callback_arg arg(thd, tables->table);
+ DBUG_ENTER("fill_schema_processlist");
+ DEBUG_SYNC(thd,"fill_schema_processlist_after_unow");
+ if (!thd->killed &&
+ server_threads.iterate(processlist_callback, &arg))
+ DBUG_RETURN(1);
DBUG_RETURN(0);
}
@@ -3770,36 +3777,38 @@ end:
Return number of threads used
*/
-uint calc_sum_of_all_status(STATUS_VAR *to)
+struct calc_sum_callback_arg
{
- uint count= 0;
- DBUG_ENTER("calc_sum_of_all_status");
+ calc_sum_callback_arg(STATUS_VAR *to_arg): to(to_arg), count(0) {}
+ STATUS_VAR *to;
+ uint count;
+};
- /* Ensure that thread id not killed during loop */
- mysql_mutex_lock(&LOCK_thread_count); // For unlink from list
- I_List_iterator<THD> it(threads);
- THD *tmp;
+static my_bool calc_sum_callback(THD *thd, calc_sum_callback_arg *arg)
+{
+ arg->count++;
+ if (!thd->status_in_global)
+ {
+ add_to_status(arg->to, &thd->status_var);
+ arg->to->local_memory_used+= thd->status_var.local_memory_used;
+ }
+ if (thd->get_command() != COM_SLEEP)
+ arg->to->threads_running++;
+ return 0;
+}
+
+
+uint calc_sum_of_all_status(STATUS_VAR *to)
+{
+ calc_sum_callback_arg arg(to);
+ DBUG_ENTER("calc_sum_of_all_status");
- /* Get global values as base */
*to= global_status_var;
to->local_memory_used= 0;
-
/* Add to this status from existing threads */
- while ((tmp= it++))
- {
- count++;
- if (!tmp->status_in_global)
- {
- add_to_status(to, &tmp->status_var);
- to->local_memory_used+= tmp->status_var.local_memory_used;
- }
- if (tmp->get_command() != COM_SLEEP)
- to->threads_running++;
- }
-
- mysql_mutex_unlock(&LOCK_thread_count);
- DBUG_RETURN(count);
+ server_threads.iterate(calc_sum_callback, &arg);
+ DBUG_RETURN(arg.count);
}
diff --git a/sql/threadpool_common.cc b/sql/threadpool_common.cc
index 24ab972776c..695623cd4ea 100644
--- a/sql/threadpool_common.cc
+++ b/sql/threadpool_common.cc
@@ -243,7 +243,7 @@ static THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data)
return NULL;
}
delete connect;
- add_to_active_threads(thd);
+ server_threads.insert(thd);
thd->set_mysys_var(mysys_var);
thd->event_scheduler.data= scheduler_data;
diff --git a/sql/threadpool_generic.cc b/sql/threadpool_generic.cc
index 641d01645ba..e37fd6f0cf4 100644
--- a/sql/threadpool_generic.cc
+++ b/sql/threadpool_generic.cc
@@ -578,43 +578,24 @@ static void queue_put(thread_group_t *thread_group, native_event *ev, int cnt)
Also, recalculate time when next timeout check should run.
*/
-static void timeout_check(pool_timer_t *timer)
+static my_bool timeout_check(THD *thd, pool_timer_t *timer)
{
DBUG_ENTER("timeout_check");
-
- mysql_mutex_lock(&LOCK_thread_count);
- I_List_iterator<THD> it(threads);
-
- /* Reset next timeout check, it will be recalculated in the loop below */
- my_atomic_fas64((volatile int64*)&timer->next_timeout_check, ULONGLONG_MAX);
-
- THD *thd;
- while ((thd=it++))
+ if (thd->net.reading_or_writing == 1)
{
- if (thd->net.reading_or_writing != 1)
- continue;
-
- TP_connection_generic *connection= (TP_connection_generic *)thd->event_scheduler.data;
- if (!connection)
- {
- /*
- Connection does not have scheduler data. This happens for example
- if THD belongs to a different scheduler, that is listening to extra_port.
- */
- continue;
- }
-
- if(connection->abs_wait_timeout < timer->current_microtime)
- {
- tp_timeout_handler(connection);
- }
- else
+ /*
+ Check if connection does not have scheduler data. This happens for example
+ if THD belongs to a different scheduler, that is listening to extra_port.
+ */
+ if (auto connection= (TP_connection_generic *) thd->event_scheduler.data)
{
- set_next_timeout_check(connection->abs_wait_timeout);
+ if (connection->abs_wait_timeout < timer->current_microtime)
+ tp_timeout_handler(connection);
+ else
+ set_next_timeout_check(connection->abs_wait_timeout);
}
}
- mysql_mutex_unlock(&LOCK_thread_count);
- DBUG_VOID_RETURN;
+ DBUG_RETURN(0);
}
@@ -671,7 +652,12 @@ static void* timer_thread(void *param)
/* Check if any client exceeded wait_timeout */
if (timer->next_timeout_check <= timer->current_microtime)
- timeout_check(timer);
+ {
+ /* Reset next timeout check, it will be recalculated below */
+ my_atomic_fas64((volatile int64*) &timer->next_timeout_check,
+ ULONGLONG_MAX);
+ server_threads.iterate(timeout_check, timer);
+ }
}
mysql_mutex_unlock(&timer->mutex);
}
diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc
index e7f7db206a9..b33687afdbc 100644
--- a/sql/wsrep_mysqld.cc
+++ b/sql/wsrep_mysqld.cc
@@ -2204,22 +2204,16 @@ static inline bool is_committing_connection(THD *thd)
return ret;
}
-static bool have_client_connections()
+static my_bool have_client_connections(THD *thd, void*)
{
- THD *tmp;
-
- I_List_iterator<THD> it(threads);
- while ((tmp=it++))
+ DBUG_PRINT("quit",("Informing thread %lld that it's time to die",
+ (longlong) thd->thread_id));
+ if (is_client_connection(thd) && thd->killed == KILL_CONNECTION)
{
- DBUG_PRINT("quit",("Informing thread %lld that it's time to die",
- (longlong) tmp->thread_id));
- if (is_client_connection(tmp) && tmp->killed == KILL_CONNECTION)
- {
- (void)abort_replicated(tmp);
- return true;
- }
+ (void)abort_replicated(thd);
+ return 1;
}
- return false;
+ return 0;
}
static void wsrep_close_thread(THD *thd)
@@ -2240,89 +2234,72 @@ static void wsrep_close_thread(THD *thd)
}
}
-static my_bool have_committing_connections()
+static my_bool have_committing_connections(THD *thd, void *)
{
- THD *tmp;
- mysql_mutex_lock(&LOCK_thread_count); // For unlink from list
-
- I_List_iterator<THD> it(threads);
- while ((tmp=it++))
- {
- if (!is_client_connection(tmp))
- continue;
-
- if (is_committing_connection(tmp))
- {
- mysql_mutex_unlock(&LOCK_thread_count);
- return TRUE;
- }
- }
- mysql_mutex_unlock(&LOCK_thread_count);
- return FALSE;
+ return is_client_connection(thd) && is_committing_connection(thd) ? 1 : 0;
}
int wsrep_wait_committing_connections_close(int wait_time)
{
int sleep_time= 100;
- while (have_committing_connections() && wait_time > 0)
+ while (server_threads.iterate(have_committing_connections) && wait_time > 0)
{
WSREP_DEBUG("wait for committing transaction to close: %d", wait_time);
my_sleep(sleep_time);
wait_time -= sleep_time;
}
- if (have_committing_connections())
+ return server_threads.iterate(have_committing_connections);
+}
+
+static my_bool kill_all_threads(THD *thd, THD *caller_thd)
+{
+ DBUG_PRINT("quit", ("Informing thread %lld that it's time to die",
+ (longlong) thd->thread_id));
+ /* We skip slave threads & scheduler on this first loop through. */
+ if (is_client_connection(thd) && thd != caller_thd)
{
- return 1;
+ if (is_replaying_connection(thd))
+ thd->set_killed(KILL_CONNECTION);
+ else if (!abort_replicated(thd))
+ {
+ /* replicated transactions must be skipped */
+ WSREP_DEBUG("closing connection %lld", (longlong) thd->thread_id);
+ /* instead of wsrep_close_thread() we do now soft kill by THD::awake */
+ thd->awake(KILL_CONNECTION);
+ }
}
return 0;
}
+static my_bool kill_remaining_threads(THD *thd, THD *caller_thd)
+{
+#ifndef __bsdi__ // Bug in BSDI kernel
+ if (is_client_connection(thd) &&
+ !abort_replicated(thd) &&
+ !is_replaying_connection(thd) &&
+ thd != caller_thd)
+ {
+ WSREP_INFO("killing local connection: %lld", (longlong) thd->thread_id);
+ close_connection(thd, 0);
+ }
+#endif
+ return 0;
+}
+
void wsrep_close_client_connections(my_bool wait_to_end, THD* except_caller_thd)
{
/*
First signal all threads that it's time to die
*/
- THD *tmp;
mysql_mutex_lock(&LOCK_thread_count); // For unlink from list
bool kill_cached_threads_saved= kill_cached_threads;
kill_cached_threads= true; // prevent future threads caching
mysql_cond_broadcast(&COND_thread_cache); // tell cached threads to die
- I_List_iterator<THD> it(threads);
- while ((tmp=it++))
- {
- DBUG_PRINT("quit",("Informing thread %lld that it's time to die",
- (longlong) tmp->thread_id));
- /* We skip slave threads & scheduler on this first loop through. */
- if (!is_client_connection(tmp))
- continue;
-
- if (tmp == except_caller_thd)
- {
- DBUG_ASSERT(is_client_connection(tmp));
- continue;
- }
-
- if (is_replaying_connection(tmp))
- {
- tmp->set_killed(KILL_CONNECTION);
- continue;
- }
-
- /* replicated transactions must be skipped */
- if (abort_replicated(tmp))
- continue;
-
- WSREP_DEBUG("closing connection %lld", (longlong) tmp->thread_id);
-
- /*
- instead of wsrep_close_thread() we do now soft kill by THD::awake
- */
- tmp->awake(KILL_CONNECTION);
- }
+ server_threads.iterate(kill_all_threads, except_caller_thd);
mysql_mutex_unlock(&LOCK_thread_count);
if (thread_count)
@@ -2332,26 +2309,12 @@ void wsrep_close_client_connections(my_bool wait_to_end, THD* except_caller_thd)
/*
Force remaining threads to die by closing the connection to the client
*/
-
- I_List_iterator<THD> it2(threads);
- while ((tmp=it2++))
- {
-#ifndef __bsdi__ // Bug in BSDI kernel
- if (is_client_connection(tmp) &&
- !abort_replicated(tmp) &&
- !is_replaying_connection(tmp) &&
- tmp != except_caller_thd)
- {
- WSREP_INFO("killing local connection: %lld", (longlong) tmp->thread_id);
- close_connection(tmp,0);
- }
-#endif
- }
+ server_threads.iterate(kill_remaining_threads, except_caller_thd);
DBUG_PRINT("quit",("Waiting for threads to die (count=%u)",thread_count));
WSREP_DEBUG("waiting for client connections to close: %u", thread_count);
- while (wait_to_end && have_client_connections())
+ while (wait_to_end && server_threads.iterate(have_client_connections))
{
mysql_cond_wait(&COND_thread_count, &LOCK_thread_count);
DBUG_PRINT("quit",("One thread died (count=%u)", thread_count));
@@ -2371,25 +2334,22 @@ void wsrep_close_applier(THD *thd)
wsrep_close_thread(thd);
}
-void wsrep_close_threads(THD *thd)
+static my_bool wsrep_close_threads_callback(THD *thd, THD *caller_thd)
{
- THD *tmp;
- mysql_mutex_lock(&LOCK_thread_count); // For unlink from list
-
- I_List_iterator<THD> it(threads);
- while ((tmp=it++))
+ DBUG_PRINT("quit",("Informing thread %lld that it's time to die",
+ (longlong) thd->thread_id));
+ /* We skip slave threads & scheduler on this first loop through. */
+ if (thd->wsrep_applier && thd != caller_thd)
{
- DBUG_PRINT("quit",("Informing thread %lld that it's time to die",
- (longlong) tmp->thread_id));
- /* We skip slave threads & scheduler on this first loop through. */
- if (tmp->wsrep_applier && tmp != thd)
- {
- WSREP_DEBUG("closing wsrep thread %lld", (longlong) tmp->thread_id);
- wsrep_close_thread (tmp);
- }
+ WSREP_DEBUG("closing wsrep thread %lld", (longlong) thd->thread_id);
+ wsrep_close_thread(thd);
}
+ return 0;
+}
- mysql_mutex_unlock(&LOCK_thread_count);
+void wsrep_close_threads(THD *thd)
+{
+ server_threads.iterate(wsrep_close_threads_callback, thd);
}
void wsrep_wait_appliers_close(THD *thd)
@@ -2730,7 +2690,7 @@ void* start_wsrep_THD(void *arg)
goto error;
}
- mysql_mutex_lock(&LOCK_thread_count);
+ statistic_increment(thread_created, &LOCK_status);
if (wsrep_gtid_mode)
{
@@ -2739,14 +2699,13 @@ void* start_wsrep_THD(void *arg)
}
thd->real_id=pthread_self(); // Keep purify happy
- thread_created++;
- threads.append(thd);
my_net_init(&thd->net,(st_vio*) 0, thd, MYF(0));
DBUG_PRINT("wsrep",(("creating thread %lld"), (long long)thd->thread_id));
thd->prior_thr_create_utime= thd->start_utime= microsecond_interval_timer();
- (void) mysql_mutex_unlock(&LOCK_thread_count);
+
+ server_threads.insert(thd);
/* from bootstrap()... */
thd->bootstrap=1;
@@ -2842,7 +2801,7 @@ void* start_wsrep_THD(void *arg)
*/
}
- unlink_not_visible_thd(thd);
+ server_threads.erase(thd);
delete thd;
my_thread_end();
return(NULL);