diff options
-rw-r--r-- | include/thread_pool_priv.h | 3 | ||||
-rw-r--r-- | libmysqld/lib_sql.cc | 9 | ||||
-rw-r--r-- | plugin/feedback/sender_thread.cc | 9 | ||||
-rw-r--r-- | plugin/handler_socket/handlersocket/database.cpp | 6 | ||||
-rw-r--r-- | sql/event_scheduler.cc | 24 | ||||
-rw-r--r-- | sql/mysqld.cc | 264 | ||||
-rw-r--r-- | sql/mysqld.h | 4 | ||||
-rw-r--r-- | sql/rpl_parallel.cc | 4 | ||||
-rw-r--r-- | sql/slave.cc | 8 | ||||
-rw-r--r-- | sql/sql_class.cc | 8 | ||||
-rw-r--r-- | sql/sql_class.h | 101 | ||||
-rw-r--r-- | sql/sql_connect.cc | 2 | ||||
-rw-r--r-- | sql/sql_insert.cc | 16 | ||||
-rw-r--r-- | sql/sql_parse.cc | 113 | ||||
-rw-r--r-- | sql/sql_plugin.cc | 30 | ||||
-rw-r--r-- | sql/sql_repl.cc | 110 | ||||
-rw-r--r-- | sql/sql_show.cc | 495 | ||||
-rw-r--r-- | sql/threadpool_common.cc | 2 | ||||
-rw-r--r-- | sql/threadpool_generic.cc | 50 | ||||
-rw-r--r-- | sql/wsrep_mysqld.cc | 165 |
20 files changed, 703 insertions, 720 deletions
diff --git a/include/thread_pool_priv.h b/include/thread_pool_priv.h index f5fdbfbdf47..cd53306e851 100644 --- a/include/thread_pool_priv.h +++ b/include/thread_pool_priv.h @@ -61,9 +61,6 @@ void thd_set_mysys_var(THD *thd, st_my_thread_var *mysys_var); my_socket thd_get_fd(THD *thd); int thd_store_globals(THD* thd); -THD *first_global_thread(); -THD *next_global_thread(THD *thd); - /* Print to the MySQL error log */ void sql_print_error(const char *format, ...); diff --git a/libmysqld/lib_sql.cc b/libmysqld/lib_sql.cc index 715f1dde5b5..305f6346c9e 100644 --- a/libmysqld/lib_sql.cc +++ b/libmysqld/lib_sql.cc @@ -432,11 +432,9 @@ int emb_unbuffered_fetch(MYSQL *mysql, char **row) static void emb_free_embedded_thd(MYSQL *mysql) { THD *thd= (THD*)mysql->thd; - mysql_mutex_lock(&LOCK_thread_count); + server_threads.erase(thd); thd->clear_data_list(); thd->store_globals(); - thd->unlink(); - mysql_mutex_unlock(&LOCK_thread_count); delete thd; my_pthread_setspecific_ptr(THR_THD, 0); mysql->thd=0; @@ -711,10 +709,7 @@ void *create_embedded_thd(int client_flag) thd->first_data= 0; thd->data_tail= &thd->first_data; bzero((char*) &thd->net, sizeof(thd->net)); - - mysql_mutex_lock(&LOCK_thread_count); - threads.append(thd); - mysql_mutex_unlock(&LOCK_thread_count); + server_threads.insert(thd); thd->mysys_var= 0; thd->reset_globals(); return thd; diff --git a/plugin/feedback/sender_thread.cc b/plugin/feedback/sender_thread.cc index b025879b6ee..591023d61fd 100644 --- a/plugin/feedback/sender_thread.cc +++ b/plugin/feedback/sender_thread.cc @@ -90,9 +90,7 @@ static int prepare_for_fill(TABLE_LIST *tables) in SHOW STATUS and we want to avoid skewing the statistics) */ thd->variables.pseudo_thread_id= thd->thread_id; - mysql_mutex_lock(&LOCK_thread_count); - threads.append(thd); - mysql_mutex_unlock(&LOCK_thread_count); + server_threads.insert(thd); thd->thread_stack= (char*) &tables; if (thd->store_globals()) return 1; @@ -258,12 +256,9 @@ ret: reset all thread local status variables to minimize the effect of the background thread on SHOW STATUS. */ - mysql_mutex_lock(&LOCK_thread_count); + server_threads.erase(thd); thd->set_status_var_init(); thd->killed= KILL_CONNECTION; - thd->unlink(); - mysql_cond_broadcast(&COND_thread_count); - mysql_mutex_unlock(&LOCK_thread_count); delete thd; thd= 0; } diff --git a/plugin/handler_socket/handlersocket/database.cpp b/plugin/handler_socket/handlersocket/database.cpp index a76428b29d3..52ea8f2a8c4 100644 --- a/plugin/handler_socket/handlersocket/database.cpp +++ b/plugin/handler_socket/handlersocket/database.cpp @@ -280,7 +280,7 @@ dbcontext::init_thread(const void *stack_bottom, volatile int& shutdown_flag) DBG_THR(fprintf(stderr, "thread_stack = %p sizeof(THD)=%zu sizeof(mtx)=%zu " "O: %zu %zu %zu %zu %zu %zu %zu\n", - thd->thread_stack, sizeof(THD), sizeof(LOCK_thread_count), + thd->thread_stack, sizeof(THD), sizeof(mysql_mutex_t), DENA_THR_OFFSETOF(mdl_context), DENA_THR_OFFSETOF(net), DENA_THR_OFFSETOF(LOCK_thd_data), @@ -307,7 +307,7 @@ dbcontext::init_thread(const void *stack_bottom, volatile int& shutdown_flag) } { thd->thread_id = next_thread_id(); - add_to_active_threads(thd); + server_threads.insert(thd); } DBG_THR(fprintf(stderr, "HNDSOCK init thread wsts\n")); @@ -341,10 +341,8 @@ dbcontext::term_thread() close_tables_if(); my_pthread_setspecific_ptr(THR_THD, 0); { - pthread_mutex_lock(&LOCK_thread_count); delete thd; thd = 0; - pthread_mutex_unlock(&LOCK_thread_count); my_thread_end(); } } diff --git a/sql/event_scheduler.cc b/sql/event_scheduler.cc index f459fd34aee..99b3c9b93fb 100644 --- a/sql/event_scheduler.cc +++ b/sql/event_scheduler.cc @@ -150,7 +150,7 @@ deinit_event_thread(THD *thd) { thd->proc_info= "Clearing"; DBUG_PRINT("exit", ("Event thread finishing")); - unlink_not_visible_thd(thd); + server_threads.erase(thd); delete thd; } @@ -185,7 +185,7 @@ pre_init_event_thread(THD* thd) thd->net.read_timeout= slave_net_timeout; thd->variables.option_bits|= OPTION_AUTO_IS_NULL; thd->client_capabilities|= CLIENT_MULTI_RESULTS; - add_to_active_threads(thd); + server_threads.insert(thd); /* Guarantees that we will see the thread in SHOW PROCESSLIST though its @@ -679,20 +679,20 @@ end: Event_scheduler::workers_count() */ +static my_bool workers_count_callback(THD *thd, uint32_t *count) +{ + if (thd->system_thread == SYSTEM_THREAD_EVENT_WORKER) + ++*count; + return 0; +} + + uint Event_scheduler::workers_count() { - THD *tmp; - uint count= 0; - + uint32_t count= 0; DBUG_ENTER("Event_scheduler::workers_count"); - mysql_mutex_lock(&LOCK_thread_count); // For unlink from list - I_List_iterator<THD> it(threads); - while ((tmp=it++)) - if (tmp->system_thread == SYSTEM_THREAD_EVENT_WORKER) - ++count; - mysql_mutex_unlock(&LOCK_thread_count); - DBUG_PRINT("exit", ("%d", count)); + server_threads.iterate(workers_count_callback, &count); DBUG_RETURN(count); } diff --git a/sql/mysqld.cc b/sql/mysqld.cc index f2065a55aed..e8806f2d3cb 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -651,26 +651,11 @@ Le_creator le_creator; int bootstrap_error; -I_List<THD> threads; +THD_list server_threads; Rpl_filter* cur_rpl_filter; Rpl_filter* global_rpl_filter; Rpl_filter* binlog_filter; -THD *first_global_thread() -{ - if (threads.is_empty()) - return NULL; - return threads.head(); -} - -THD *next_global_thread(THD *thd) -{ - if (threads.is_last(thd)) - return NULL; - struct ilink *next= thd->next; - return static_cast<THD*>(next); -} - struct system_variables global_system_variables; /** Following is just for options parsing, used with a difference against @@ -707,12 +692,7 @@ pthread_key(THD*, THR_THD); /* LOCK_thread_count protects the following variables: thread_count Number of threads with THD that servers queries. - threads Linked list of active THD's. - The effect of this is that one can't unlink and - delete a THD as long as one has locked - LOCK_thread_count. - ready_to_exit - delayed_insert_threads + ready_to_exit */ mysql_mutex_t LOCK_thread_count; @@ -910,7 +890,7 @@ PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list, key_structure_guard_mutex, key_TABLE_SHARE_LOCK_ha_data, key_LOCK_error_messages, key_LOCK_start_thread, - key_LOCK_thread_count, key_LOCK_thread_cache, + key_LOCK_thread_count, key_Thread_map_mutex, key_LOCK_thread_cache, key_PARTITION_LOCK_auto_inc; PSI_mutex_key key_RELAYLOG_LOCK_index; PSI_mutex_key key_LOCK_relaylog_end_pos; @@ -1004,6 +984,7 @@ static PSI_mutex_info all_server_mutexes[]= { &key_LOCK_commit_ordered, "LOCK_commit_ordered", PSI_FLAG_GLOBAL}, { &key_LOCK_slave_background, "LOCK_slave_background", PSI_FLAG_GLOBAL}, { &key_LOCK_thread_count, "LOCK_thread_count", PSI_FLAG_GLOBAL}, + { &key_Thread_map_mutex, "Thread_map::mutex", PSI_FLAG_GLOBAL }, { &key_LOCK_thread_cache, "LOCK_thread_cache", PSI_FLAG_GLOBAL}, { &key_PARTITION_LOCK_auto_inc, "HA_DATA_PARTITION::LOCK_auto_inc", 0}, { &key_LOCK_slave_state, "LOCK_slave_state", 0}, @@ -1549,6 +1530,106 @@ static void end_ssl(); ** Code to end mysqld ****************************************************************************/ +static my_bool kill_all_threads(THD *thd, void *) +{ + DBUG_PRINT("quit", ("Informing thread %ld that it's time to die", + (ulong) thd->thread_id)); + /* We skip slave threads on this first loop through. */ + if (thd->slave_thread) + return 0; + + if (DBUG_EVALUATE_IF("only_kill_system_threads", !thd->system_thread, 0)) + return 0; + +#ifdef WITH_WSREP + /* skip wsrep system threads as well */ + if (WSREP(thd) && (wsrep_thd_is_applying(thd) || thd->wsrep_applier)) + return 0; +#endif + thd->set_killed(KILL_SERVER_HARD); + MYSQL_CALLBACK(thread_scheduler, post_kill_notification, (thd)); + if (WSREP(thd)) mysql_mutex_lock(&thd->LOCK_thd_data); + mysql_mutex_lock(&thd->LOCK_thd_kill); + if (thd->mysys_var) + { + thd->mysys_var->abort= 1; + mysql_mutex_lock(&thd->mysys_var->mutex); + if (thd->mysys_var->current_cond) + { + for (uint i= 0; i < 2; i++) + { + int ret= mysql_mutex_trylock(thd->mysys_var->current_mutex); + mysql_cond_broadcast(thd->mysys_var->current_cond); + if (!ret) + { + /* Thread has surely got the signal, unlock and abort */ + mysql_mutex_unlock(thd->mysys_var->current_mutex); + break; + } + sleep(1); + } + } + mysql_mutex_unlock(&thd->mysys_var->mutex); + } + mysql_mutex_unlock(&thd->LOCK_thd_kill); + if (WSREP(thd)) mysql_mutex_unlock(&thd->LOCK_thd_data); + return 0; +} + + +static my_bool kill_all_threads_once_again(THD *thd, void *) +{ +#ifndef __bsdi__ // Bug in BSDI kernel + if (thd->vio_ok()) + { + if (global_system_variables.log_warnings) + sql_print_warning(ER_DEFAULT(ER_FORCING_CLOSE), my_progname, + (ulong) thd->thread_id, + (thd->main_security_ctx.user ? + thd->main_security_ctx.user : "")); + /* + close_connection() might need a valid current_thd + for memory allocation tracking. + */ + THD *save_thd= current_thd; + set_current_thd(thd); + close_connection(thd, ER_SERVER_SHUTDOWN); + set_current_thd(save_thd); + } +#endif + +#ifdef WITH_WSREP + /* + * WSREP_TODO: + * this code block may turn out redundant. wsrep->disconnect() + * should terminate slave threads gracefully, and we don't need + * to signal them here. + * The code here makes sure mysqld will not hang during shutdown + * even if wsrep provider has problems in shutting down. + */ + if (WSREP(thd) && wsrep_thd_is_applying(thd)) + { + sql_print_information("closing wsrep system thread"); + thd->set_killed(KILL_CONNECTION); + MYSQL_CALLBACK(thread_scheduler, post_kill_notification, (thd)); + if (thd->mysys_var) + { + thd->mysys_var->abort=1; + mysql_mutex_lock(&thd->mysys_var->mutex); + if (thd->mysys_var->current_cond) + { + mysql_mutex_lock(thd->mysys_var->current_mutex); + mysql_cond_broadcast(thd->mysys_var->current_cond); + mysql_mutex_unlock(thd->mysys_var->current_mutex); + } + mysql_mutex_unlock(&thd->mysys_var->mutex); + } + } +#endif + return 0; +} + + static void close_connections(void) { #ifdef EXTRA_DEBUG @@ -1626,58 +1707,7 @@ static void close_connections(void) This will give the threads some time to gracefully abort their statements and inform their clients that the server is about to die. */ - - THD *tmp; - mysql_mutex_lock(&LOCK_thread_count); // For unlink from list - - I_List_iterator<THD> it(threads); - while ((tmp=it++)) - { - DBUG_PRINT("quit",("Informing thread %ld that it's time to die", - (ulong) tmp->thread_id)); - /* We skip slave threads on this first loop through. */ - if (tmp->slave_thread) - continue; - - /* cannot use 'continue' inside DBUG_EXECUTE_IF()... */ - if (DBUG_EVALUATE_IF("only_kill_system_threads", !tmp->system_thread, 0)) - continue; - -#ifdef WITH_WSREP - /* skip wsrep system threads as well */ - if (WSREP(tmp) && (wsrep_thd_is_applying(tmp) || tmp->wsrep_applier)) - continue; -#endif - tmp->set_killed(KILL_SERVER_HARD); - MYSQL_CALLBACK(thread_scheduler, post_kill_notification, (tmp)); - if (WSREP(tmp)) mysql_mutex_lock(&tmp->LOCK_thd_data); - mysql_mutex_lock(&tmp->LOCK_thd_kill); - if (tmp->mysys_var) - { - tmp->mysys_var->abort=1; - mysql_mutex_lock(&tmp->mysys_var->mutex); - if (tmp->mysys_var->current_cond) - { - uint i; - for (i=0; i < 2; i++) - { - int ret= mysql_mutex_trylock(tmp->mysys_var->current_mutex); - mysql_cond_broadcast(tmp->mysys_var->current_cond); - if (!ret) - { - /* Thread has surely got the signal, unlock and abort */ - mysql_mutex_unlock(tmp->mysys_var->current_mutex); - break; - } - sleep(1); - } - } - mysql_mutex_unlock(&tmp->mysys_var->mutex); - } - mysql_mutex_unlock(&tmp->LOCK_thd_kill); - if (WSREP(tmp)) mysql_mutex_unlock(&tmp->LOCK_thd_data); - } - mysql_mutex_unlock(&LOCK_thread_count); // For unlink from list + server_threads.iterate(kill_all_threads); Events::deinit(); slave_prepare_for_shutdown(); @@ -1708,65 +1738,8 @@ static void close_connections(void) This will ensure that threads that are waiting for a command from the client on a blocking read call are aborted. */ + server_threads.iterate(kill_all_threads_once_again); - for (;;) - { - mysql_mutex_lock(&LOCK_thread_count); // For unlink from list - if (!(tmp=threads.get())) - { - mysql_mutex_unlock(&LOCK_thread_count); - break; - } -#ifndef __bsdi__ // Bug in BSDI kernel - if (tmp->vio_ok()) - { - if (global_system_variables.log_warnings) - sql_print_warning(ER_DEFAULT(ER_FORCING_CLOSE),my_progname, - (ulong) tmp->thread_id, - (tmp->main_security_ctx.user ? - tmp->main_security_ctx.user : "")); - /* - close_connection() might need a valid current_thd - for memory allocation tracking. - */ - THD* save_thd= current_thd; - set_current_thd(tmp); - close_connection(tmp,ER_SERVER_SHUTDOWN); - set_current_thd(save_thd); - } -#endif - -#ifdef WITH_WSREP - /* - * WSREP_TODO: - * this code block may turn out redundant. wsrep->disconnect() - * should terminate slave threads gracefully, and we don't need - * to signal them here. - * The code here makes sure mysqld will not hang during shutdown - * even if wsrep provider has problems in shutting down. - */ - if (WSREP(tmp) && wsrep_thd_is_applying(tmp)) - { - sql_print_information("closing wsrep system thread"); - tmp->set_killed(KILL_CONNECTION); - MYSQL_CALLBACK(thread_scheduler, post_kill_notification, (tmp)); - if (tmp->mysys_var) - { - tmp->mysys_var->abort=1; - mysql_mutex_lock(&tmp->mysys_var->mutex); - if (tmp->mysys_var->current_cond) - { - mysql_mutex_lock(tmp->mysys_var->current_mutex); - mysql_cond_broadcast(tmp->mysys_var->current_cond); - mysql_mutex_unlock(tmp->mysys_var->current_mutex); - } - mysql_mutex_unlock(&tmp->mysys_var->mutex); - } - } -#endif - DBUG_PRINT("quit",("Unlocking LOCK_thread_count")); - mysql_mutex_unlock(&LOCK_thread_count); - } end_slave(); #ifdef WITH_WSREP if (wsrep_inited == 1) @@ -2249,6 +2222,7 @@ static void wait_for_signal_thread_to_end() static void clean_up_mutexes() { DBUG_ENTER("clean_up_mutexes"); + server_threads.destroy(); mysql_rwlock_destroy(&LOCK_grant); mysql_mutex_destroy(&LOCK_thread_count); mysql_mutex_destroy(&LOCK_thread_cache); @@ -2798,7 +2772,7 @@ void unlink_thd(THD *thd) thd->cleanup(); thd->add_status_to_global(); - unlink_not_visible_thd(thd); + server_threads.erase(thd); #ifdef WITH_WSREP /* @@ -2910,7 +2884,7 @@ static bool cache_thread(THD *thd) thd->thr_create_utime= microsecond_interval_timer(); thd->start_utime= thd->thr_create_utime; - add_to_active_threads(thd); + server_threads.insert(thd); DBUG_RETURN(1); } } @@ -4627,6 +4601,7 @@ static int init_common_variables() static int init_thread_environment() { DBUG_ENTER("init_thread_environment"); + server_threads.init(); mysql_mutex_init(key_LOCK_thread_count, &LOCK_thread_count, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_LOCK_thread_cache, &LOCK_thread_cache, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_LOCK_start_thread, &LOCK_start_thread, MY_MUTEX_INIT_FAST); @@ -8265,7 +8240,6 @@ static int mysql_init_variables(void) global_query_id= 1; global_thread_id= 0; strnmov(server_version, MYSQL_SERVER_VERSION, sizeof(server_version)-1); - threads.empty(); thread_cache.empty(); key_caches.empty(); if (!(dflt_key_cache= get_or_create_key_cache(default_key_cache_base.str, @@ -9973,6 +9947,14 @@ static my_thread_id thread_id_max= UINT_MAX32; @param[out] low - lower bound for the range @param[out] high - upper bound for the range */ + +static my_bool recalculate_callback(THD *thd, std::vector<my_thread_id> *ids) +{ + ids->push_back(thd->thread_id); + return 0; +} + + static void recalculate_thread_id_range(my_thread_id *low, my_thread_id *high) { std::vector<my_thread_id> ids; @@ -9980,15 +9962,7 @@ static void recalculate_thread_id_range(my_thread_id *low, my_thread_id *high) // Add sentinels ids.push_back(0); ids.push_back(UINT_MAX32); - - mysql_mutex_lock(&LOCK_thread_count); - - I_List_iterator<THD> it(threads); - THD *thd; - while ((thd=it++)) - ids.push_back(thd->thread_id); - - mysql_mutex_unlock(&LOCK_thread_count); + server_threads.iterate(recalculate_callback, &ids); std::sort(ids.begin(), ids.end()); my_thread_id max_gap= 0; diff --git a/sql/mysqld.h b/sql/mysqld.h index 2acfc7db5d3..a8e2e3cff6c 100644 --- a/sql/mysqld.h +++ b/sql/mysqld.h @@ -299,7 +299,6 @@ extern pthread_attr_t connection_attrib; extern my_bool old_mode; extern LEX_STRING opt_init_connect, opt_init_slave; extern int bootstrap_error; -extern I_List<THD> threads; extern char err_shared_dir[]; extern ulong connection_errors_select; extern ulong connection_errors_accept; @@ -346,7 +345,8 @@ extern PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list, key_rpl_group_info_sleep_lock, key_structure_guard_mutex, key_TABLE_SHARE_LOCK_ha_data, key_LOCK_start_thread, - key_LOCK_error_messages, key_LOCK_thread_count, key_PARTITION_LOCK_auto_inc; + key_LOCK_error_messages, key_LOCK_thread_count, key_Thread_map_mutex, + key_PARTITION_LOCK_auto_inc; extern PSI_mutex_key key_RELAYLOG_LOCK_index; extern PSI_mutex_key key_LOCK_relaylog_end_pos; extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state, diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 144b12a9fdf..dc5e3ff1fbf 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -1023,7 +1023,7 @@ handle_rpl_parallel_thread(void *arg) my_thread_init(); thd = new THD(next_thread_id()); thd->thread_stack = (char*)&thd; - add_to_active_threads(thd); + server_threads.insert(thd); set_current_thd(thd); pthread_detach_this_thread(); thd->init_for_queries(); @@ -1432,7 +1432,7 @@ handle_rpl_parallel_thread(void *arg) thd->temporary_tables= 0; THD_CHECK_SENTRY(thd); - unlink_not_visible_thd(thd); + server_threads.erase(thd); delete thd; mysql_mutex_lock(&rpt->LOCK_rpl_thread); diff --git a/sql/slave.cc b/sql/slave.cc index f31021bc71e..345aa8aa53b 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -4698,7 +4698,7 @@ pthread_handler_t handle_slave_io(void *arg) goto err_during_init; } thd->system_thread_info.rpl_io_info= &io_info; - add_to_active_threads(thd); + server_threads.insert(thd); mi->slave_running = MYSQL_SLAVE_RUN_NOT_CONNECT; mi->abort_slave = 0; mysql_mutex_unlock(&mi->run_lock); @@ -5080,7 +5080,7 @@ err: flush_master_info(mi, TRUE, TRUE); THD_STAGE_INFO(thd, stage_waiting_for_slave_mutex_on_exit); thd->add_status_to_global(); - unlink_not_visible_thd(thd); + server_threads.erase(thd); mysql_mutex_lock(&mi->run_lock); err_during_init: @@ -5368,7 +5368,7 @@ pthread_handler_t handle_slave_sql(void *arg) /* Ensure that slave can exeute any alter table it gets from master */ thd->variables.alter_algorithm= (ulong) Alter_info::ALTER_TABLE_ALGORITHM_DEFAULT; - add_to_active_threads(thd); + server_threads.insert(thd); /* We are going to set slave_running to 1. Assuming slave I/O thread is alive and connected, this is going to make Seconds_Behind_Master be 0 @@ -5714,7 +5714,7 @@ pthread_handler_t handle_slave_sql(void *arg) } THD_STAGE_INFO(thd, stage_waiting_for_slave_mutex_on_exit); thd->add_status_to_global(); - unlink_not_visible_thd(thd); + server_threads.erase(thd); mysql_mutex_lock(&rli->run_lock); err_during_init: diff --git a/sql/sql_class.cc b/sql/sql_class.cc index 87e377c1819..e46b93f4bf1 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -1639,8 +1639,8 @@ THD::~THD() THD *orig_thd= current_thd; THD_CHECK_SENTRY(this); DBUG_ENTER("~THD()"); - /* Check that we have already called thd->unlink() */ - DBUG_ASSERT(prev == 0 && next == 0); + /* Make sure threads are not available via server_threads. */ + assert_not_linked(); /* This takes a long time so we should not do this under LOCK_thread_count */ mysql_mutex_assert_not_owner(&LOCK_thread_count); @@ -4772,14 +4772,14 @@ MYSQL_THD create_thd() thd->set_command(COM_DAEMON); thd->system_thread= SYSTEM_THREAD_GENERIC; thd->security_ctx->host_or_ip=""; - add_to_active_threads(thd); + server_threads.insert(thd); return thd; } void destroy_thd(MYSQL_THD thd) { thd->add_status_to_global(); - unlink_not_visible_thd(thd); + server_threads.erase(thd); delete thd; } diff --git a/sql/sql_class.h b/sql/sql_class.h index 6338eab9d45..5194f5cb925 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -5004,27 +5004,6 @@ public: } }; -inline void add_to_active_threads(THD *thd) -{ - mysql_mutex_lock(&LOCK_thread_count); - threads.append(thd); - mysql_mutex_unlock(&LOCK_thread_count); -} - -/* - This should be called when you want to delete a thd that was not - running any queries. - This function will assert that the THD is linked. -*/ - -inline void unlink_not_visible_thd(THD *thd) -{ - thd->assert_linked(); - mysql_mutex_lock(&LOCK_thread_count); - thd->unlink(); - mysql_mutex_unlock(&LOCK_thread_count); -} - /** A short cut for thd->get_stmt_da()->set_ok_status(). */ inline void @@ -6935,5 +6914,85 @@ private: THD *thd; }; + +/** THD registry */ +class THD_list +{ + I_List<THD> threads; + mutable mysql_mutex_t mutex; + +public: + /** + Constructor replacement. + + Unfortunately we can't use fair constructor to initialize mutex + for two reasons: PFS and embedded. The former can probably be fixed, + the latter can probably be dropped. + */ + void init() + { + mysql_mutex_init(key_Thread_map_mutex, &mutex, MY_MUTEX_INIT_FAST); + } + + /** Destructor replacement. */ + void destroy() + { + mysql_mutex_destroy(&mutex); + } + + /** + Inserts thread to registry. + + @param thd thread + + Thread becomes accessible via server_threads. + */ + void insert(THD *thd) + { + mysql_mutex_lock(&mutex); + threads.append(thd); + mysql_mutex_unlock(&mutex); + } + + /** + Removes thread from registry. + + @param thd thread + + Thread becomes not accessible via server_threads. + */ + void erase(THD *thd) + { + thd->assert_linked(); + mysql_mutex_lock(&mutex); + thd->unlink(); + mysql_mutex_unlock(&mutex); + } + + /** + Iterates registered threads. + + @param action called for every element + @param argument opque argument passed to action + + @return + @retval 0 iteration completed successfully + @retval 1 iteration was interrupted (action returned 1) + */ + template <typename T> int iterate(my_bool (*action)(THD *thd, T *arg), T *arg= 0) + { + int res= 0; + mysql_mutex_lock(&mutex); + I_List_iterator<THD> it(threads); + while (auto tmp= it++) + if ((res= action(tmp, arg))) + break; + mysql_mutex_unlock(&mutex); + return res; + } +}; + +extern THD_list server_threads; + #endif /* MYSQL_SERVER */ #endif /* SQL_CLASS_INCLUDED */ diff --git a/sql/sql_connect.cc b/sql/sql_connect.cc index ec46d84c7ce..ac2503d29ff 100644 --- a/sql/sql_connect.cc +++ b/sql/sql_connect.cc @@ -1360,7 +1360,7 @@ void do_handle_one_connection(CONNECT *connect) delete connect; /* Make THD visible in show processlist */ - add_to_active_threads(thd); + server_threads.insert(thd); thd->thr_create_utime= thr_create_utime; /* We need to set this because of time_out_user_resource_limits */ diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc index 785d7c12bd2..a8f074e1783 100644 --- a/sql/sql_insert.cc +++ b/sql/sql_insert.cc @@ -2188,11 +2188,11 @@ public: mysql_mutex_init(key_delayed_insert_mutex, &mutex, MY_MUTEX_INIT_FAST); mysql_cond_init(key_delayed_insert_cond, &cond, NULL); mysql_cond_init(key_delayed_insert_cond_client, &cond_client, NULL); - mysql_mutex_lock(&LOCK_thread_count); + mysql_mutex_lock(&LOCK_delayed_insert); delayed_insert_threads++; + mysql_mutex_unlock(&LOCK_delayed_insert); delayed_lock= global_system_variables.low_priority_updates ? TL_WRITE_LOW_PRIORITY : TL_WRITE; - mysql_mutex_unlock(&LOCK_thread_count); DBUG_VOID_RETURN; } ~Delayed_insert() @@ -2210,15 +2210,9 @@ public: mysql_cond_destroy(&cond); mysql_cond_destroy(&cond_client); - /* - We could use unlink_not_visible_threads() here, but as - delayed_insert_threads also needs to be protected by - the LOCK_thread_count mutex, we open code this. - */ - mysql_mutex_lock(&LOCK_thread_count); - thd.unlink(); // Must be unlinked under lock + server_threads.erase(&thd); + mysql_mutex_assert_owner(&LOCK_delayed_insert); delayed_insert_threads--; - mysql_mutex_unlock(&LOCK_thread_count); my_free(thd.query()); thd.security_ctx->user= 0; @@ -2940,7 +2934,7 @@ pthread_handler_t handle_delayed_insert(void *arg) pthread_detach_this_thread(); /* Add thread to THD list so that's it's visible in 'show processlist' */ thd->set_start_time(); - add_to_active_threads(thd); + server_threads.insert(thd); if (abort_loop) thd->set_killed(KILL_CONNECTION); else diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index 320b9580ba3..f543d3b8dd4 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -8955,24 +8955,35 @@ void add_join_natural(TABLE_LIST *a, TABLE_LIST *b, List<String> *using_fields, pointer - thread found, and its LOCK_thd_kill is locked. */ -THD *find_thread_by_id(longlong id, bool query_id) +struct find_thread_callback_arg { - THD *tmp; - mysql_mutex_lock(&LOCK_thread_count); // For unlink from list - I_List_iterator<THD> it(threads); - while ((tmp=it++)) + find_thread_callback_arg(longlong id_arg, bool query_id_arg): + thd(0), id(id_arg), query_id(query_id_arg) {} + THD *thd; + longlong id; + bool query_id; +}; + + +my_bool find_thread_callback(THD *thd, find_thread_callback_arg *arg) +{ + if (thd->get_command() != COM_DAEMON && + arg->id == (arg->query_id ? thd->query_id : (longlong) thd->thread_id)) { - if (tmp->get_command() == COM_DAEMON) - continue; - if (id == (query_id ? tmp->query_id : (longlong) tmp->thread_id)) - { - if (WSREP(tmp)) mysql_mutex_lock(&tmp->LOCK_thd_data); - mysql_mutex_lock(&tmp->LOCK_thd_kill); // Lock from delete - break; - } + if (WSREP(thd)) mysql_mutex_lock(&thd->LOCK_thd_data); + mysql_mutex_lock(&thd->LOCK_thd_kill); // Lock from delete + arg->thd= thd; + return 1; } - mysql_mutex_unlock(&LOCK_thread_count); - return tmp; + return 0; +} + + +THD *find_thread_by_id(longlong id, bool query_id) +{ + find_thread_callback_arg arg(id, query_id); + server_threads.iterate(find_thread_callback, &arg); + return arg.thd; } @@ -9056,53 +9067,63 @@ kill_one_thread(THD *thd, longlong id, killed_state kill_signal, killed_type typ are killed. */ -static uint kill_threads_for_user(THD *thd, LEX_USER *user, - killed_state kill_signal, ha_rows *rows) +struct kill_threads_callback_arg { - THD *tmp; + kill_threads_callback_arg(THD *thd_arg, LEX_USER *user_arg): + thd(thd_arg), user(user_arg) {} + THD *thd; + LEX_USER *user; List<THD> threads_to_kill; - DBUG_ENTER("kill_threads_for_user"); - - *rows= 0; - - if (unlikely(thd->is_fatal_error)) // If we run out of memory - DBUG_RETURN(ER_OUT_OF_RESOURCES); +}; - DBUG_PRINT("enter", ("user: %s signal: %u", user->user.str, - (uint) kill_signal)); - mysql_mutex_lock(&LOCK_thread_count); // For unlink from list - I_List_iterator<THD> it(threads); - while ((tmp=it++)) +static my_bool kill_threads_callback(THD *thd, kill_threads_callback_arg *arg) +{ + if (thd->security_ctx->user) { - if (!tmp->security_ctx->user) - continue; /* Check that hostname (if given) and user name matches. host.str[0] == '%' means that host name was not given. See sql_yacc.yy */ - if (((user->host.str[0] == '%' && !user->host.str[1]) || - !strcmp(tmp->security_ctx->host_or_ip, user->host.str)) && - !strcmp(tmp->security_ctx->user, user->user.str)) + if (((arg->user->host.str[0] == '%' && !arg->user->host.str[1]) || + !strcmp(thd->security_ctx->host_or_ip, arg->user->host.str)) && + !strcmp(thd->security_ctx->user, arg->user->user.str)) { - if (!(thd->security_ctx->master_access & SUPER_ACL) && - !thd->security_ctx->user_matches(tmp->security_ctx)) - { - mysql_mutex_unlock(&LOCK_thread_count); - DBUG_RETURN(ER_KILL_DENIED_ERROR); - } - if (!threads_to_kill.push_back(tmp, thd->mem_root)) + if (!(arg->thd->security_ctx->master_access & SUPER_ACL) && + !arg->thd->security_ctx->user_matches(thd->security_ctx)) + return 1; + if (!arg->threads_to_kill.push_back(thd, arg->thd->mem_root)) { - if (WSREP(tmp)) mysql_mutex_lock(&tmp->LOCK_thd_data); - mysql_mutex_lock(&tmp->LOCK_thd_kill); // Lock from delete + if (WSREP(thd)) mysql_mutex_lock(&thd->LOCK_thd_data); + mysql_mutex_lock(&thd->LOCK_thd_kill); // Lock from delete } } } - mysql_mutex_unlock(&LOCK_thread_count); - if (!threads_to_kill.is_empty()) + return 0; +} + + +static uint kill_threads_for_user(THD *thd, LEX_USER *user, + killed_state kill_signal, ha_rows *rows) +{ + kill_threads_callback_arg arg(thd, user); + DBUG_ENTER("kill_threads_for_user"); + + *rows= 0; + + if (unlikely(thd->is_fatal_error)) // If we run out of memory + DBUG_RETURN(ER_OUT_OF_RESOURCES); + + DBUG_PRINT("enter", ("user: %s signal: %u", user->user.str, + (uint) kill_signal)); + + if (server_threads.iterate(kill_threads_callback, &arg)) + DBUG_RETURN(ER_KILL_DENIED_ERROR); + + if (!arg.threads_to_kill.is_empty()) { - List_iterator_fast<THD> it2(threads_to_kill); + List_iterator_fast<THD> it2(arg.threads_to_kill); THD *next_ptr; THD *ptr= it2++; do diff --git a/sql/sql_plugin.cc b/sql/sql_plugin.cc index d448b7b9e02..21767b4de96 100644 --- a/sql/sql_plugin.cc +++ b/sql/sql_plugin.cc @@ -4354,25 +4354,23 @@ void wsrep_plugins_pre_init() members of wsrep startup threads with correct values, as these value were not available at the time these threads were created. */ -void wsrep_plugins_post_init() -{ - THD *thd; - I_List_iterator<THD> it(threads); - while ((thd= it++)) +my_bool post_init_callback(THD *thd, void *) +{ + if (thd->wsrep_applier) { - if (IF_WSREP(thd->wsrep_applier,1)) - { - // Save options_bits as it will get overwritten in plugin_thdvar_init() - ulonglong option_bits_saved= thd->variables.option_bits; - - plugin_thdvar_init(thd); - - // Restore option_bits - thd->variables.option_bits= option_bits_saved; - } + // Save options_bits as it will get overwritten in plugin_thdvar_init() + ulonglong option_bits_saved= thd->variables.option_bits; + plugin_thdvar_init(thd); + // Restore option_bits + thd->variables.option_bits= option_bits_saved; } + return 0; +} - return; + +void wsrep_plugins_post_init() +{ + server_threads.iterate(post_init_callback); } #endif /* WITH_WSREP */ diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index ab4a5c6a6b4..fc08399bb88 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -527,59 +527,48 @@ static enum enum_binlog_checksum_alg get_binlog_checksum_value_at_connect(THD * Now they sync is done for next read. */ -void adjust_linfo_offsets(my_off_t purge_offset) +static my_bool adjust_callback(THD *thd, my_off_t *purge_offset) { - THD *tmp; - - mysql_mutex_lock(&LOCK_thread_count); - I_List_iterator<THD> it(threads); - - while ((tmp=it++)) + mysql_mutex_lock(&thd->LOCK_thd_data); + if (auto linfo= thd->current_linfo) { - LOG_INFO* linfo; - mysql_mutex_lock(&tmp->LOCK_thd_data); - if ((linfo = tmp->current_linfo)) - { - /* - Index file offset can be less that purge offset only if - we just started reading the index file. In that case - we have nothing to adjust - */ - if (linfo->index_file_offset < purge_offset) - linfo->fatal = (linfo->index_file_offset != 0); - else - linfo->index_file_offset -= purge_offset; - } - mysql_mutex_unlock(&tmp->LOCK_thd_data); + /* + Index file offset can be less that purge offset only if + we just started reading the index file. In that case + we have nothing to adjust + */ + if (linfo->index_file_offset < *purge_offset) + linfo->fatal= (linfo->index_file_offset != 0); + else + linfo->index_file_offset-= *purge_offset; } - mysql_mutex_unlock(&LOCK_thread_count); + mysql_mutex_unlock(&thd->LOCK_thd_data); + return 0; } -bool log_in_use(const char* log_name) +void adjust_linfo_offsets(my_off_t purge_offset) { - size_t log_name_len = strlen(log_name) + 1; - THD *tmp; - bool result = 0; - - mysql_mutex_lock(&LOCK_thread_count); - I_List_iterator<THD> it(threads); + server_threads.iterate(adjust_callback, &purge_offset); +} - while ((tmp=it++)) - { - LOG_INFO* linfo; - mysql_mutex_lock(&tmp->LOCK_thd_data); - if ((linfo = tmp->current_linfo)) - result = !memcmp(log_name, linfo->log_file_name, log_name_len); - mysql_mutex_unlock(&tmp->LOCK_thd_data); - if (result) - break; - } - mysql_mutex_unlock(&LOCK_thread_count); +static my_bool log_in_use_callback(THD *thd, const char *log_name) +{ + my_bool result= 0; + mysql_mutex_lock(&thd->LOCK_thd_data); + if (auto linfo= thd->current_linfo) + result= !memcmp(log_name, linfo->log_file_name, strlen(log_name) + 1); + mysql_mutex_unlock(&thd->LOCK_thd_data); return result; } + +bool log_in_use(const char* log_name) +{ + return server_threads.iterate(log_in_use_callback, log_name); +} + bool purge_error_message(THD* thd, int res) { uint errcode; @@ -3367,31 +3356,40 @@ err: slave_server_id the slave's server id */ -void kill_zombie_dump_threads(uint32 slave_server_id) +struct kill_callback_arg { - mysql_mutex_lock(&LOCK_thread_count); - I_List_iterator<THD> it(threads); - THD *tmp; + kill_callback_arg(uint32 id): slave_server_id(id), thd(0) {} + uint32 slave_server_id; + THD *thd; +}; - while ((tmp=it++)) +static my_bool kill_callback(THD *thd, kill_callback_arg *arg) +{ + if (thd->get_command() == COM_BINLOG_DUMP && + thd->variables.server_id == arg->slave_server_id) { - if (tmp->get_command() == COM_BINLOG_DUMP && - tmp->variables.server_id == slave_server_id) - { - mysql_mutex_lock(&tmp->LOCK_thd_kill); // Lock from delete - break; - } + arg->thd= thd; + mysql_mutex_lock(&thd->LOCK_thd_kill); // Lock from delete + return 1; } - mysql_mutex_unlock(&LOCK_thread_count); - if (tmp) + return 0; +} + + +void kill_zombie_dump_threads(uint32 slave_server_id) +{ + kill_callback_arg arg(slave_server_id); + server_threads.iterate(kill_callback, &arg); + + if (arg.thd) { /* Here we do not call kill_one_thread() as it will be slow because it will iterate through the list again. We just to do kill the thread ourselves. */ - tmp->awake_no_mutex(KILL_SLAVE_SAME_ID); - mysql_mutex_unlock(&tmp->LOCK_thd_kill); + arg.thd->awake_no_mutex(KILL_SLAVE_SAME_ID); + mysql_mutex_unlock(&arg.thd->LOCK_thd_kill); } } diff --git a/sql/sql_show.cc b/sql/sql_show.cc index 5adee6731b5..70905bcb2e7 100644 --- a/sql/sql_show.cc +++ b/sql/sql_show.cc @@ -2734,13 +2734,111 @@ static const char *thread_state_info(THD *tmp) } +struct list_callback_arg +{ + list_callback_arg(const char *u, THD *t, ulong m): + user(u), thd(t), max_query_length(m) {} + I_List<thread_info> thread_infos; + const char *user; + THD *thd; + ulong max_query_length; +}; + + +static my_bool list_callback(THD *tmp, list_callback_arg *arg) +{ + + Security_context *tmp_sctx= tmp->security_ctx; + bool got_thd_data; + if ((tmp->vio_ok() || tmp->system_thread) && + (!arg->user || (!tmp->system_thread && + tmp_sctx->user && !strcmp(tmp_sctx->user, arg->user)))) + { + thread_info *thd_info= new (arg->thd->mem_root) thread_info; + + thd_info->thread_id=tmp->thread_id; + thd_info->os_thread_id=tmp->os_thread_id; + thd_info->user= arg->thd->strdup(tmp_sctx->user ? tmp_sctx->user : + (tmp->system_thread ? + "system user" : "unauthenticated user")); + if (tmp->peer_port && (tmp_sctx->host || tmp_sctx->ip) && + arg->thd->security_ctx->host_or_ip[0]) + { + if ((thd_info->host= (char*) arg->thd->alloc(LIST_PROCESS_HOST_LEN+1))) + my_snprintf((char *) thd_info->host, LIST_PROCESS_HOST_LEN, + "%s:%u", tmp_sctx->host_or_ip, tmp->peer_port); + } + else + thd_info->host= arg->thd->strdup(tmp_sctx->host_or_ip[0] ? + tmp_sctx->host_or_ip : + tmp_sctx->host ? tmp_sctx->host : ""); + thd_info->command=(int) tmp->get_command(); + + if ((got_thd_data= !trylock_short(&tmp->LOCK_thd_data))) + { + /* This is an approximation */ + thd_info->proc_info= (char*) (tmp->killed >= KILL_QUERY ? + "Killed" : 0); + + /* The following variables are only safe to access under a lock */ + thd_info->db= 0; + if (tmp->db.str) + thd_info->db= arg->thd->strmake(tmp->db.str, tmp->db.length); + + if (tmp->query()) + { + uint length= MY_MIN(arg->max_query_length, tmp->query_length()); + char *q= arg->thd->strmake(tmp->query(),length); + /* Safety: in case strmake failed, we set length to 0. */ + thd_info->query_string= + CSET_STRING(q, q ? length : 0, tmp->query_charset()); + } + + /* + Progress report. We need to do this under a lock to ensure that all + is from the same stage. + */ + if (tmp->progress.max_counter) + { + uint max_stage= MY_MAX(tmp->progress.max_stage, 1); + thd_info->progress= (((tmp->progress.stage / (double) max_stage) + + ((tmp->progress.counter / + (double) tmp->progress.max_counter) / + (double) max_stage)) * + 100.0); + set_if_smaller(thd_info->progress, 100); + } + else + thd_info->progress= 0.0; + } + else + { + thd_info->proc_info= "Busy"; + thd_info->progress= 0.0; + thd_info->db= ""; + } + + thd_info->state_info= thread_state_info(tmp); + thd_info->start_time= tmp->start_utime; + ulonglong utime_after_query_snapshot= tmp->utime_after_query; + if (thd_info->start_time < utime_after_query_snapshot) + thd_info->start_time= utime_after_query_snapshot; // COM_SLEEP + + if (got_thd_data) + mysql_mutex_unlock(&tmp->LOCK_thd_data); + arg->thread_infos.append(thd_info); + } + return 0; +} + + void mysqld_list_processes(THD *thd,const char *user, bool verbose) { Item *field; List<Item> field_list; - I_List<thread_info> thread_infos; - ulong max_query_length= (verbose ? thd->variables.max_allowed_packet : - PROCESS_LIST_WIDTH); + list_callback_arg arg(user, thd, + verbose ? thd->variables.max_allowed_packet : + PROCESS_LIST_WIDTH); Protocol *protocol= thd->protocol; MEM_ROOT *mem_root= thd->mem_root; DBUG_ENTER("mysqld_list_processes"); @@ -2771,7 +2869,7 @@ void mysqld_list_processes(THD *thd,const char *user, bool verbose) mem_root); field->maybe_null=1; field_list.push_back(field=new (mem_root) - Item_empty_string(thd, "Info", max_query_length), + Item_empty_string(thd, "Info", arg.max_query_length), mem_root); field->maybe_null=1; if (!thd->variables.old_mode && @@ -2790,102 +2888,13 @@ void mysqld_list_processes(THD *thd,const char *user, bool verbose) if (thd->killed) DBUG_VOID_RETURN; - mysql_mutex_lock(&LOCK_thread_count); // For unlink from list - I_List_iterator<THD> it(threads); - THD *tmp; - while ((tmp=it++)) - { - Security_context *tmp_sctx= tmp->security_ctx; - bool got_thd_data; - if ((tmp->vio_ok() || tmp->system_thread) && - (!user || (!tmp->system_thread && - tmp_sctx->user && !strcmp(tmp_sctx->user, user)))) - { - thread_info *thd_info= new (thd->mem_root) thread_info; - - thd_info->thread_id=tmp->thread_id; - thd_info->os_thread_id=tmp->os_thread_id; - thd_info->user= thd->strdup(tmp_sctx->user ? tmp_sctx->user : - (tmp->system_thread ? - "system user" : "unauthenticated user")); - if (tmp->peer_port && (tmp_sctx->host || tmp_sctx->ip) && - thd->security_ctx->host_or_ip[0]) - { - if ((thd_info->host= (char*) thd->alloc(LIST_PROCESS_HOST_LEN+1))) - my_snprintf((char *) thd_info->host, LIST_PROCESS_HOST_LEN, - "%s:%u", tmp_sctx->host_or_ip, tmp->peer_port); - } - else - thd_info->host= thd->strdup(tmp_sctx->host_or_ip[0] ? - tmp_sctx->host_or_ip : - tmp_sctx->host ? tmp_sctx->host : ""); - thd_info->command=(int) tmp->get_command(); - - if ((got_thd_data= !trylock_short(&tmp->LOCK_thd_data))) - { - /* This is an approximation */ - thd_info->proc_info= (char*) (tmp->killed >= KILL_QUERY ? - "Killed" : 0); - /* - The following variables are only safe to access under a lock - */ - - thd_info->db= 0; - if (tmp->db.str) - thd_info->db= thd->strmake(tmp->db.str, tmp->db.length); - - if (tmp->query()) - { - uint length= MY_MIN(max_query_length, tmp->query_length()); - char *q= thd->strmake(tmp->query(),length); - /* Safety: in case strmake failed, we set length to 0. */ - thd_info->query_string= - CSET_STRING(q, q ? length : 0, tmp->query_charset()); - } + server_threads.iterate(list_callback, &arg); - /* - Progress report. We need to do this under a lock to ensure that all - is from the same stage. - */ - if (tmp->progress.max_counter) - { - uint max_stage= MY_MAX(tmp->progress.max_stage, 1); - thd_info->progress= (((tmp->progress.stage / (double) max_stage) + - ((tmp->progress.counter / - (double) tmp->progress.max_counter) / - (double) max_stage)) * - 100.0); - set_if_smaller(thd_info->progress, 100); - } - else - thd_info->progress= 0.0; - } - else - { - thd_info->proc_info= "Busy"; - thd_info->progress= 0.0; - thd_info->db= ""; - } - - thd_info->state_info= thread_state_info(tmp); - thd_info->start_time= tmp->start_utime; - ulonglong utime_after_query_snapshot= tmp->utime_after_query; - if (thd_info->start_time < utime_after_query_snapshot) - thd_info->start_time= utime_after_query_snapshot; // COM_SLEEP - - if (got_thd_data) - mysql_mutex_unlock(&tmp->LOCK_thd_data); - thread_infos.append(thd_info); - } - } - mysql_mutex_unlock(&LOCK_thread_count); - - thread_info *thd_info; ulonglong now= microsecond_interval_timer(); char buff[20]; // For progress String store_buffer(buff, sizeof(buff), system_charset_info); - while ((thd_info=thread_infos.get())) + while (auto thd_info= arg.thread_infos.get()) { protocol->prepare_for_resend(); protocol->store(thd_info->thread_id); @@ -3169,152 +3178,150 @@ int fill_show_explain(THD *thd, TABLE_LIST *table, COND *cond) } -int fill_schema_processlist(THD* thd, TABLE_LIST* tables, COND* cond) +struct processlist_callback_arg { - TABLE *table= tables->table; - CHARSET_INFO *cs= system_charset_info; - char *user; - ulonglong unow= microsecond_interval_timer(); - DBUG_ENTER("fill_schema_processlist"); + processlist_callback_arg(THD *thd_arg, TABLE *table_arg): + thd(thd_arg), table(table_arg), unow(microsecond_interval_timer()) {} + THD *thd; + TABLE *table; + ulonglong unow; +}; - DEBUG_SYNC(thd,"fill_schema_processlist_after_unow"); - user= thd->security_ctx->master_access & PROCESS_ACL ? - NullS : thd->security_ctx->priv_user; +static my_bool processlist_callback(THD *tmp, processlist_callback_arg *arg) +{ + Security_context *tmp_sctx= tmp->security_ctx; + CHARSET_INFO *cs= system_charset_info; + const char *val; + ulonglong max_counter; + bool got_thd_data; + char *user= arg->thd->security_ctx->master_access & PROCESS_ACL ? + NullS : arg->thd->security_ctx->priv_user; + + if ((!tmp->vio_ok() && !tmp->system_thread) || + (user && (tmp->system_thread || !tmp_sctx->user || + strcmp(tmp_sctx->user, user)))) + return 0; - mysql_mutex_lock(&LOCK_thread_count); + restore_record(arg->table, s->default_values); + /* ID */ + arg->table->field[0]->store((longlong) tmp->thread_id, TRUE); + /* USER */ + val= tmp_sctx->user ? tmp_sctx->user : + (tmp->system_thread ? "system user" : "unauthenticated user"); + arg->table->field[1]->store(val, strlen(val), cs); + /* HOST */ + if (tmp->peer_port && (tmp_sctx->host || tmp_sctx->ip) && + arg->thd->security_ctx->host_or_ip[0]) + { + char host[LIST_PROCESS_HOST_LEN + 1]; + my_snprintf(host, LIST_PROCESS_HOST_LEN, "%s:%u", + tmp_sctx->host_or_ip, tmp->peer_port); + arg->table->field[2]->store(host, strlen(host), cs); + } + else + arg->table->field[2]->store(tmp_sctx->host_or_ip, + strlen(tmp_sctx->host_or_ip), cs); - if (!thd->killed) + if ((got_thd_data= !trylock_short(&tmp->LOCK_thd_data))) { - I_List_iterator<THD> it(threads); - THD* tmp; - - while ((tmp= it++)) + /* DB */ + if (tmp->db.str) { - Security_context *tmp_sctx= tmp->security_ctx; - const char *val; - ulonglong max_counter; - bool got_thd_data; - - if ((!tmp->vio_ok() && !tmp->system_thread) || - (user && (tmp->system_thread || !tmp_sctx->user || - strcmp(tmp_sctx->user, user)))) - continue; - - restore_record(table, s->default_values); - /* ID */ - table->field[0]->store((longlong) tmp->thread_id, TRUE); - /* USER */ - val= tmp_sctx->user ? tmp_sctx->user : - (tmp->system_thread ? "system user" : "unauthenticated user"); - table->field[1]->store(val, strlen(val), cs); - /* HOST */ - if (tmp->peer_port && (tmp_sctx->host || tmp_sctx->ip) && - thd->security_ctx->host_or_ip[0]) - { - char host[LIST_PROCESS_HOST_LEN + 1]; - my_snprintf(host, LIST_PROCESS_HOST_LEN, "%s:%u", - tmp_sctx->host_or_ip, tmp->peer_port); - table->field[2]->store(host, strlen(host), cs); - } - else - table->field[2]->store(tmp_sctx->host_or_ip, - strlen(tmp_sctx->host_or_ip), cs); + arg->table->field[3]->store(tmp->db.str, tmp->db.length, cs); + arg->table->field[3]->set_notnull(); + } + } - if ((got_thd_data= !trylock_short(&tmp->LOCK_thd_data))) - { - /* DB */ - if (tmp->db.str) - { - table->field[3]->store(tmp->db.str, tmp->db.length, cs); - table->field[3]->set_notnull(); - } - } + /* COMMAND */ + if ((val= (char *) (!got_thd_data ? "Busy" : + (tmp->killed >= KILL_QUERY ? + "Killed" : 0)))) + arg->table->field[4]->store(val, strlen(val), cs); + else + arg->table->field[4]->store(command_name[tmp->get_command()].str, + command_name[tmp->get_command()].length, cs); - /* COMMAND */ - if ((val= (char *) (!got_thd_data ? "Busy" : - (tmp->killed >= KILL_QUERY ? - "Killed" : 0)))) - table->field[4]->store(val, strlen(val), cs); - else - table->field[4]->store(command_name[tmp->get_command()].str, - command_name[tmp->get_command()].length, cs); + /* MYSQL_TIME */ + ulonglong utime= tmp->start_utime; + ulonglong utime_after_query_snapshot= tmp->utime_after_query; + if (utime < utime_after_query_snapshot) + utime= utime_after_query_snapshot; // COM_SLEEP + utime= utime && utime < arg->unow ? arg->unow - utime : 0; - /* MYSQL_TIME */ - ulonglong utime= tmp->start_utime; - ulonglong utime_after_query_snapshot= tmp->utime_after_query; - if (utime < utime_after_query_snapshot) - utime= utime_after_query_snapshot; // COM_SLEEP - utime= utime && utime < unow ? unow - utime : 0; + arg->table->field[5]->store(utime / HRTIME_RESOLUTION, TRUE); - table->field[5]->store(utime / HRTIME_RESOLUTION, TRUE); + if (got_thd_data) + { + if (tmp->query()) + { + arg->table->field[7]->store(tmp->query(), + MY_MIN(PROCESS_LIST_INFO_WIDTH, + tmp->query_length()), cs); + arg->table->field[7]->set_notnull(); - if (got_thd_data) - { - if (tmp->query()) - { - table->field[7]->store(tmp->query(), - MY_MIN(PROCESS_LIST_INFO_WIDTH, - tmp->query_length()), cs); - table->field[7]->set_notnull(); + /* INFO_BINARY */ + arg->table->field[16]->store(tmp->query(), + MY_MIN(PROCESS_LIST_INFO_WIDTH, + tmp->query_length()), + &my_charset_bin); + arg->table->field[16]->set_notnull(); + } - /* INFO_BINARY */ - table->field[16]->store(tmp->query(), - MY_MIN(PROCESS_LIST_INFO_WIDTH, - tmp->query_length()), - &my_charset_bin); - table->field[16]->set_notnull(); - } + /* + Progress report. We need to do this under a lock to ensure that all + is from the same stage. + */ + if ((max_counter= tmp->progress.max_counter)) + { + arg->table->field[9]->store((longlong) tmp->progress.stage + 1, 1); + arg->table->field[10]->store((longlong) tmp->progress.max_stage, 1); + arg->table->field[11]->store((double) tmp->progress.counter / + (double) max_counter*100.0); + } + mysql_mutex_unlock(&tmp->LOCK_thd_data); + } - /* - Progress report. We need to do this under a lock to ensure that all - is from the same stage. - */ - if ((max_counter= tmp->progress.max_counter)) - { - table->field[9]->store((longlong) tmp->progress.stage + 1, 1); - table->field[10]->store((longlong) tmp->progress.max_stage, 1); - table->field[11]->store((double) tmp->progress.counter / - (double) max_counter*100.0); - } - mysql_mutex_unlock(&tmp->LOCK_thd_data); - } + /* STATE */ + if ((val= thread_state_info(tmp))) + { + arg->table->field[6]->store(val, strlen(val), cs); + arg->table->field[6]->set_notnull(); + } - /* STATE */ - if ((val= thread_state_info(tmp))) - { - table->field[6]->store(val, strlen(val), cs); - table->field[6]->set_notnull(); - } + /* TIME_MS */ + arg->table->field[8]->store((double)(utime / (HRTIME_RESOLUTION / 1000.0))); - /* TIME_MS */ - table->field[8]->store((double)(utime / (HRTIME_RESOLUTION / 1000.0))); + /* + This may become negative if we free a memory allocated by another + thread in this thread. However it's better that we notice it eventually + than hide it. + */ + arg->table->field[12]->store((longlong) tmp->status_var.local_memory_used, + FALSE); + arg->table->field[13]->store((longlong) tmp->status_var.max_local_memory_used, + FALSE); + arg->table->field[14]->store((longlong) tmp->get_examined_row_count(), TRUE); - /* - This may become negative if we free a memory allocated by another - thread in this thread. However it's better that we notice it eventually - than hide it. - */ - table->field[12]->store((longlong) tmp->status_var.local_memory_used, - FALSE); - table->field[13]->store((longlong) tmp->status_var.max_local_memory_used, - FALSE); - table->field[14]->store((longlong) tmp->get_examined_row_count(), TRUE); + /* QUERY_ID */ + arg->table->field[15]->store(tmp->query_id, TRUE); - /* QUERY_ID */ - table->field[15]->store(tmp->query_id, TRUE); + arg->table->field[17]->store(tmp->os_thread_id); - table->field[17]->store(tmp->os_thread_id); + if (schema_table_store_record(arg->thd, arg->table)) + return 1; + return 0; +} - if (schema_table_store_record(thd, table)) - { - mysql_mutex_unlock(&LOCK_thread_count); - DBUG_RETURN(1); - } - } - } - mysql_mutex_unlock(&LOCK_thread_count); +int fill_schema_processlist(THD* thd, TABLE_LIST* tables, COND* cond) +{ + processlist_callback_arg arg(thd, tables->table); + DBUG_ENTER("fill_schema_processlist"); + DEBUG_SYNC(thd,"fill_schema_processlist_after_unow"); + if (!thd->killed && + server_threads.iterate(processlist_callback, &arg)) + DBUG_RETURN(1); DBUG_RETURN(0); } @@ -3770,36 +3777,38 @@ end: Return number of threads used */ -uint calc_sum_of_all_status(STATUS_VAR *to) +struct calc_sum_callback_arg { - uint count= 0; - DBUG_ENTER("calc_sum_of_all_status"); + calc_sum_callback_arg(STATUS_VAR *to_arg): to(to_arg), count(0) {} + STATUS_VAR *to; + uint count; +}; - /* Ensure that thread id not killed during loop */ - mysql_mutex_lock(&LOCK_thread_count); // For unlink from list - I_List_iterator<THD> it(threads); - THD *tmp; +static my_bool calc_sum_callback(THD *thd, calc_sum_callback_arg *arg) +{ + arg->count++; + if (!thd->status_in_global) + { + add_to_status(arg->to, &thd->status_var); + arg->to->local_memory_used+= thd->status_var.local_memory_used; + } + if (thd->get_command() != COM_SLEEP) + arg->to->threads_running++; + return 0; +} + + +uint calc_sum_of_all_status(STATUS_VAR *to) +{ + calc_sum_callback_arg arg(to); + DBUG_ENTER("calc_sum_of_all_status"); - /* Get global values as base */ *to= global_status_var; to->local_memory_used= 0; - /* Add to this status from existing threads */ - while ((tmp= it++)) - { - count++; - if (!tmp->status_in_global) - { - add_to_status(to, &tmp->status_var); - to->local_memory_used+= tmp->status_var.local_memory_used; - } - if (tmp->get_command() != COM_SLEEP) - to->threads_running++; - } - - mysql_mutex_unlock(&LOCK_thread_count); - DBUG_RETURN(count); + server_threads.iterate(calc_sum_callback, &arg); + DBUG_RETURN(arg.count); } diff --git a/sql/threadpool_common.cc b/sql/threadpool_common.cc index 24ab972776c..695623cd4ea 100644 --- a/sql/threadpool_common.cc +++ b/sql/threadpool_common.cc @@ -243,7 +243,7 @@ static THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data) return NULL; } delete connect; - add_to_active_threads(thd); + server_threads.insert(thd); thd->set_mysys_var(mysys_var); thd->event_scheduler.data= scheduler_data; diff --git a/sql/threadpool_generic.cc b/sql/threadpool_generic.cc index 641d01645ba..e37fd6f0cf4 100644 --- a/sql/threadpool_generic.cc +++ b/sql/threadpool_generic.cc @@ -578,43 +578,24 @@ static void queue_put(thread_group_t *thread_group, native_event *ev, int cnt) Also, recalculate time when next timeout check should run. */ -static void timeout_check(pool_timer_t *timer) +static my_bool timeout_check(THD *thd, pool_timer_t *timer) { DBUG_ENTER("timeout_check"); - - mysql_mutex_lock(&LOCK_thread_count); - I_List_iterator<THD> it(threads); - - /* Reset next timeout check, it will be recalculated in the loop below */ - my_atomic_fas64((volatile int64*)&timer->next_timeout_check, ULONGLONG_MAX); - - THD *thd; - while ((thd=it++)) + if (thd->net.reading_or_writing == 1) { - if (thd->net.reading_or_writing != 1) - continue; - - TP_connection_generic *connection= (TP_connection_generic *)thd->event_scheduler.data; - if (!connection) - { - /* - Connection does not have scheduler data. This happens for example - if THD belongs to a different scheduler, that is listening to extra_port. - */ - continue; - } - - if(connection->abs_wait_timeout < timer->current_microtime) - { - tp_timeout_handler(connection); - } - else + /* + Check if connection does not have scheduler data. This happens for example + if THD belongs to a different scheduler, that is listening to extra_port. + */ + if (auto connection= (TP_connection_generic *) thd->event_scheduler.data) { - set_next_timeout_check(connection->abs_wait_timeout); + if (connection->abs_wait_timeout < timer->current_microtime) + tp_timeout_handler(connection); + else + set_next_timeout_check(connection->abs_wait_timeout); } } - mysql_mutex_unlock(&LOCK_thread_count); - DBUG_VOID_RETURN; + DBUG_RETURN(0); } @@ -671,7 +652,12 @@ static void* timer_thread(void *param) /* Check if any client exceeded wait_timeout */ if (timer->next_timeout_check <= timer->current_microtime) - timeout_check(timer); + { + /* Reset next timeout check, it will be recalculated below */ + my_atomic_fas64((volatile int64*) &timer->next_timeout_check, + ULONGLONG_MAX); + server_threads.iterate(timeout_check, timer); + } } mysql_mutex_unlock(&timer->mutex); } diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc index e7f7db206a9..b33687afdbc 100644 --- a/sql/wsrep_mysqld.cc +++ b/sql/wsrep_mysqld.cc @@ -2204,22 +2204,16 @@ static inline bool is_committing_connection(THD *thd) return ret; } -static bool have_client_connections() +static my_bool have_client_connections(THD *thd, void*) { - THD *tmp; - - I_List_iterator<THD> it(threads); - while ((tmp=it++)) + DBUG_PRINT("quit",("Informing thread %lld that it's time to die", + (longlong) thd->thread_id)); + if (is_client_connection(thd) && thd->killed == KILL_CONNECTION) { - DBUG_PRINT("quit",("Informing thread %lld that it's time to die", - (longlong) tmp->thread_id)); - if (is_client_connection(tmp) && tmp->killed == KILL_CONNECTION) - { - (void)abort_replicated(tmp); - return true; - } + (void)abort_replicated(thd); + return 1; } - return false; + return 0; } static void wsrep_close_thread(THD *thd) @@ -2240,89 +2234,72 @@ static void wsrep_close_thread(THD *thd) } } -static my_bool have_committing_connections() +static my_bool have_committing_connections(THD *thd, void *) { - THD *tmp; - mysql_mutex_lock(&LOCK_thread_count); // For unlink from list - - I_List_iterator<THD> it(threads); - while ((tmp=it++)) - { - if (!is_client_connection(tmp)) - continue; - - if (is_committing_connection(tmp)) - { - mysql_mutex_unlock(&LOCK_thread_count); - return TRUE; - } - } - mysql_mutex_unlock(&LOCK_thread_count); - return FALSE; + return is_client_connection(thd) && is_committing_connection(thd) ? 1 : 0; } int wsrep_wait_committing_connections_close(int wait_time) { int sleep_time= 100; - while (have_committing_connections() && wait_time > 0) + while (server_threads.iterate(have_committing_connections) && wait_time > 0) { WSREP_DEBUG("wait for committing transaction to close: %d", wait_time); my_sleep(sleep_time); wait_time -= sleep_time; } - if (have_committing_connections()) + return server_threads.iterate(have_committing_connections); +} + +static my_bool kill_all_threads(THD *thd, THD *caller_thd) +{ + DBUG_PRINT("quit", ("Informing thread %lld that it's time to die", + (longlong) thd->thread_id)); + /* We skip slave threads & scheduler on this first loop through. */ + if (is_client_connection(thd) && thd != caller_thd) { - return 1; + if (is_replaying_connection(thd)) + thd->set_killed(KILL_CONNECTION); + else if (!abort_replicated(thd)) + { + /* replicated transactions must be skipped */ + WSREP_DEBUG("closing connection %lld", (longlong) thd->thread_id); + /* instead of wsrep_close_thread() we do now soft kill by THD::awake */ + thd->awake(KILL_CONNECTION); + } } return 0; } +static my_bool kill_remaining_threads(THD *thd, THD *caller_thd) +{ +#ifndef __bsdi__ // Bug in BSDI kernel + if (is_client_connection(thd) && + !abort_replicated(thd) && + !is_replaying_connection(thd) && + thd != caller_thd) + { + WSREP_INFO("killing local connection: %lld", (longlong) thd->thread_id); + close_connection(thd, 0); + } +#endif + return 0; +} + void wsrep_close_client_connections(my_bool wait_to_end, THD* except_caller_thd) { /* First signal all threads that it's time to die */ - THD *tmp; mysql_mutex_lock(&LOCK_thread_count); // For unlink from list bool kill_cached_threads_saved= kill_cached_threads; kill_cached_threads= true; // prevent future threads caching mysql_cond_broadcast(&COND_thread_cache); // tell cached threads to die - I_List_iterator<THD> it(threads); - while ((tmp=it++)) - { - DBUG_PRINT("quit",("Informing thread %lld that it's time to die", - (longlong) tmp->thread_id)); - /* We skip slave threads & scheduler on this first loop through. */ - if (!is_client_connection(tmp)) - continue; - - if (tmp == except_caller_thd) - { - DBUG_ASSERT(is_client_connection(tmp)); - continue; - } - - if (is_replaying_connection(tmp)) - { - tmp->set_killed(KILL_CONNECTION); - continue; - } - - /* replicated transactions must be skipped */ - if (abort_replicated(tmp)) - continue; - - WSREP_DEBUG("closing connection %lld", (longlong) tmp->thread_id); - - /* - instead of wsrep_close_thread() we do now soft kill by THD::awake - */ - tmp->awake(KILL_CONNECTION); - } + server_threads.iterate(kill_all_threads, except_caller_thd); mysql_mutex_unlock(&LOCK_thread_count); if (thread_count) @@ -2332,26 +2309,12 @@ void wsrep_close_client_connections(my_bool wait_to_end, THD* except_caller_thd) /* Force remaining threads to die by closing the connection to the client */ - - I_List_iterator<THD> it2(threads); - while ((tmp=it2++)) - { -#ifndef __bsdi__ // Bug in BSDI kernel - if (is_client_connection(tmp) && - !abort_replicated(tmp) && - !is_replaying_connection(tmp) && - tmp != except_caller_thd) - { - WSREP_INFO("killing local connection: %lld", (longlong) tmp->thread_id); - close_connection(tmp,0); - } -#endif - } + server_threads.iterate(kill_remaining_threads, except_caller_thd); DBUG_PRINT("quit",("Waiting for threads to die (count=%u)",thread_count)); WSREP_DEBUG("waiting for client connections to close: %u", thread_count); - while (wait_to_end && have_client_connections()) + while (wait_to_end && server_threads.iterate(have_client_connections)) { mysql_cond_wait(&COND_thread_count, &LOCK_thread_count); DBUG_PRINT("quit",("One thread died (count=%u)", thread_count)); @@ -2371,25 +2334,22 @@ void wsrep_close_applier(THD *thd) wsrep_close_thread(thd); } -void wsrep_close_threads(THD *thd) +static my_bool wsrep_close_threads_callback(THD *thd, THD *caller_thd) { - THD *tmp; - mysql_mutex_lock(&LOCK_thread_count); // For unlink from list - - I_List_iterator<THD> it(threads); - while ((tmp=it++)) + DBUG_PRINT("quit",("Informing thread %lld that it's time to die", + (longlong) thd->thread_id)); + /* We skip slave threads & scheduler on this first loop through. */ + if (thd->wsrep_applier && thd != caller_thd) { - DBUG_PRINT("quit",("Informing thread %lld that it's time to die", - (longlong) tmp->thread_id)); - /* We skip slave threads & scheduler on this first loop through. */ - if (tmp->wsrep_applier && tmp != thd) - { - WSREP_DEBUG("closing wsrep thread %lld", (longlong) tmp->thread_id); - wsrep_close_thread (tmp); - } + WSREP_DEBUG("closing wsrep thread %lld", (longlong) thd->thread_id); + wsrep_close_thread(thd); } + return 0; +} - mysql_mutex_unlock(&LOCK_thread_count); +void wsrep_close_threads(THD *thd) +{ + server_threads.iterate(wsrep_close_threads_callback, thd); } void wsrep_wait_appliers_close(THD *thd) @@ -2730,7 +2690,7 @@ void* start_wsrep_THD(void *arg) goto error; } - mysql_mutex_lock(&LOCK_thread_count); + statistic_increment(thread_created, &LOCK_status); if (wsrep_gtid_mode) { @@ -2739,14 +2699,13 @@ void* start_wsrep_THD(void *arg) } thd->real_id=pthread_self(); // Keep purify happy - thread_created++; - threads.append(thd); my_net_init(&thd->net,(st_vio*) 0, thd, MYF(0)); DBUG_PRINT("wsrep",(("creating thread %lld"), (long long)thd->thread_id)); thd->prior_thr_create_utime= thd->start_utime= microsecond_interval_timer(); - (void) mysql_mutex_unlock(&LOCK_thread_count); + + server_threads.insert(thd); /* from bootstrap()... */ thd->bootstrap=1; @@ -2842,7 +2801,7 @@ void* start_wsrep_THD(void *arg) */ } - unlink_not_visible_thd(thd); + server_threads.erase(thd); delete thd; my_thread_end(); return(NULL); |