diff options
author | Vladislav Vaintroub <wlad@mariadb.com> | 2019-05-26 13:25:12 +0200 |
---|---|---|
committer | Vladislav Vaintroub <wlad@mariadb.com> | 2019-05-26 19:20:35 +0200 |
commit | 2fc13d04d16f878ed693ad8ba56045b79ccb9650 (patch) | |
tree | 23cbb2dbe934d4879cae668b0164dbec0ba71503 | |
parent | 5f18bd3a3545bae97956fbbf4b29d2e9288a6505 (diff) | |
download | mariadb-git-2fc13d04d16f878ed693ad8ba56045b79ccb9650.tar.gz |
MDEV-19313 Threadpool : provide information schema tables for internals of generic threadpool
Added thread_pool_groups, thread_pool_queues, thread_pool_waits and
thread_pool_stats tables to information_schema.
-rw-r--r-- | sql/CMakeLists.txt | 1 | ||||
-rw-r--r-- | sql/mysqld.cc | 2 | ||||
-rw-r--r-- | sql/thread_pool_info.cc | 340 | ||||
-rw-r--r-- | sql/threadpool.h | 4 | ||||
-rw-r--r-- | sql/threadpool_common.cc | 12 | ||||
-rw-r--r-- | sql/threadpool_generic.cc | 195 | ||||
-rw-r--r-- | sql/threadpool_generic.h | 150 |
7 files changed, 557 insertions, 147 deletions
diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt index 4d8918d962e..6cc561877b9 100644 --- a/sql/CMakeLists.txt +++ b/sql/CMakeLists.txt @@ -164,6 +164,7 @@ IF ((CMAKE_SYSTEM_NAME MATCHES "Linux" OR ENDIF() SET(SQL_SOURCE ${SQL_SOURCE} threadpool_generic.cc) SET(SQL_SOURCE ${SQL_SOURCE} threadpool_common.cc) + MYSQL_ADD_PLUGIN(thread_pool_info thread_pool_info.cc DEFAULT STATIC_ONLY NOT_EMBEDDED) ENDIF() IF(WIN32) diff --git a/sql/mysqld.cc b/sql/mysqld.cc index d3eb76f22f0..a035f629562 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -7461,7 +7461,7 @@ static int debug_status_func(THD *thd, SHOW_VAR *var, char *buff, #endif #ifdef HAVE_POOL_OF_THREADS -int show_threadpool_idle_threads(THD *thd, SHOW_VAR *var, char *buff, +static int show_threadpool_idle_threads(THD *thd, SHOW_VAR *var, char *buff, enum enum_var_type scope) { var->type= SHOW_INT; diff --git a/sql/thread_pool_info.cc b/sql/thread_pool_info.cc new file mode 100644 index 00000000000..56760105668 --- /dev/null +++ b/sql/thread_pool_info.cc @@ -0,0 +1,340 @@ +/* Copyright(C) 2019 MariaDB + +This program is free software; you can redistribute itand /or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/ + +#include <mysql_version.h> +#include <mysql/plugin.h> + +#include <my_global.h> +#include <sql_class.h> +#include <table.h> +#include <mysql/plugin.h> +#include <sql_show.h> +#include <threadpool_generic.h> + +static ST_FIELD_INFO groups_fields_info[] = +{ + {"GROUP_ID", 6, MYSQL_TYPE_LONG, 0, 0, 0, 0}, + {"CONNECTIONS", 6, MYSQL_TYPE_LONG, 0, 0, 0, 0}, + {"THREADS", 6, MYSQL_TYPE_LONG, 0, 0, 0, 0}, + {"ACTIVE_THREADS",6, MYSQL_TYPE_LONG, 0, 0, 0, 0}, + {"STANDBY_THREADS", 6, MYSQL_TYPE_LONG, 0, 0, 0,0}, + {"QUEUE_LENGTH", 6, MYSQL_TYPE_LONG, 0,0, 0, 0}, + {"HAS_LISTENER",1,MYSQL_TYPE_TINY, 0, 0, 0, 0}, + {"IS_STALLED",1,MYSQL_TYPE_TINY, 0, 0, 0, 0}, + {0, 0, MYSQL_TYPE_STRING, 0, 0, 0, 0} +}; + +static int groups_fill_table(THD* thd, TABLE_LIST* tables, COND*) +{ + if (!all_groups) + return 0; + + TABLE* table = tables->table; + for (uint i = 0; i < threadpool_max_size && all_groups[i].pollfd != INVALID_HANDLE_VALUE; i++) + { + thread_group_t* group = &all_groups[i]; + /* ID */ + table->field[0]->store(i, true); + /* CONNECTION_COUNT */ + table->field[1]->store(group->connection_count, true); + /* THREAD_COUNT */ + table->field[2]->store(group->thread_count, true); + /* ACTIVE_THREAD_COUNT */ + table->field[3]->store(group->active_thread_count, true); + /* STANDBY_THREAD_COUNT */ + table->field[4]->store(group->waiting_threads.elements(), true); + /* QUEUE LENGTH */ + uint queue_len = group->queues[TP_PRIORITY_LOW].elements() + + group->queues[TP_PRIORITY_HIGH].elements(); + table->field[5]->store(queue_len, true); + /* HAS_LISTENER */ + table->field[6]->store((longlong)(group->listener != 0), true); + /* IS_STALLED */ + table->field[7]->store(group->stalled, true); + + if (schema_table_store_record(thd, table)) + return 1; + } + return 0; +} + + +static int groups_init(void* p) +{ + ST_SCHEMA_TABLE* schema = (ST_SCHEMA_TABLE*)p; + schema->fields_info = groups_fields_info; + schema->fill_table = groups_fill_table; + return 0; +} + + +static ST_FIELD_INFO queues_field_info[] = +{ + {"GROUP_ID", 6, MYSQL_TYPE_LONG, 0, 0, 0, 0}, + {"POSITION",6,MYSQL_TYPE_LONG, 0, 0, 0, 0}, + {"PRIORITY", 1, MYSQL_TYPE_LONG, 0, 0, 0, 0}, + {"CONNECTION_ID", 19, MYSQL_TYPE_LONGLONG, MY_I_S_UNSIGNED, 0, 0, 0}, + {"QUEUEING_TIME_MICROSECONDS", 19, MYSQL_TYPE_LONGLONG, 0, 0, 0, 0}, + {0, 0, MYSQL_TYPE_NULL, 0, 0, 0, 0} +}; + +typedef connection_queue_t::Iterator connection_queue_iterator; + +static int queues_fill_table(THD* thd, TABLE_LIST* tables, COND*) +{ + if (!all_groups) + return 0; + + TABLE* table = tables->table; + for (uint group_id = 0; + group_id < threadpool_max_size && all_groups[group_id].pollfd != INVALID_HANDLE_VALUE; + group_id++) + { + thread_group_t* group = &all_groups[group_id]; + + mysql_mutex_lock(&group->mutex); + bool err = false; + int pos = 0; + ulonglong now = microsecond_interval_timer(); + for (uint prio = 0; prio < NQUEUES && !err; prio++) + { + connection_queue_iterator it(group->queues[prio]); + TP_connection_generic* c; + while ((c = it++) != 0) + { + /* GROUP_ID */ + table->field[0]->store(group_id, true); + /* POSITION */ + table->field[1]->store(pos++, true); + /* PRIORITY */ + table->field[2]->store(prio, true); + /* CONNECTION_ID */ + table->field[3]->store(c->thd->thread_id, true); + /* QUEUEING_TIME */ + table->field[4]->store(now - c->enqueue_time, true); + + err = schema_table_store_record(thd, table); + if (err) + break; + } + } + mysql_mutex_unlock(&group->mutex); + if (err) + return 1; + } + return 0; +} + +static int queues_init(void* p) +{ + ST_SCHEMA_TABLE* schema = (ST_SCHEMA_TABLE*)p; + schema->fields_info = queues_field_info; + schema->fill_table = queues_fill_table; + return 0; +} + +static ST_FIELD_INFO stats_fields_info[] = +{ + {"GROUP_ID", 6, MYSQL_TYPE_LONG, 0, 0, 0, 0}, + {"THREAD_CREATIONS",19,MYSQL_TYPE_LONGLONG,0,0, 0,0}, + {"THREAD_CREATIONS_DUE_TO_STALL",19,MYSQL_TYPE_LONGLONG,0,0, 0,0}, + {"WAKES",19,MYSQL_TYPE_LONGLONG,0,0, 0,0}, + {"WAKES_DUE_TO_STALL",19,MYSQL_TYPE_LONGLONG,0,0, 0,0}, + {"THROTTLES",19,MYSQL_TYPE_LONGLONG,0,0, 0,0}, + {"STALLS",19,MYSQL_TYPE_LONGLONG,0,0, 0, 0}, + {"POLLS_BY_LISTENER",19,MYSQL_TYPE_LONGLONG,0,0, 0, 0}, + {"POLLS_BY_WORKER",19,MYSQL_TYPE_LONGLONG,0,0, 0, 0}, + {"DEQUEUES_BY_LISTENER",19,MYSQL_TYPE_LONGLONG,0,0, 0, 0}, + {"DEQUEUES_BY_WORKER",19,MYSQL_TYPE_LONGLONG,0,0, 0, 0}, + {0, 0, MYSQL_TYPE_STRING, 0, 0, 0, 0} +}; + +static int stats_fill_table(THD* thd, TABLE_LIST* tables, COND*) +{ + if (!all_groups) + return 0; + + TABLE* table = tables->table; + for (uint i = 0; i < threadpool_max_size && all_groups[i].pollfd != INVALID_HANDLE_VALUE; i++) + { + table->field[0]->store(i, true); + thread_group_t* group = &all_groups[i]; + + mysql_mutex_lock(&group->mutex); + thread_group_counters_t* counters = &group->counters; + table->field[1]->store(counters->thread_creations, true); + table->field[2]->store(counters->thread_creations_due_to_stall, true); + table->field[3]->store(counters->wakes, true); + table->field[4]->store(counters->wakes_due_to_stall, true); + table->field[5]->store(counters->throttles, true); + table->field[6]->store(counters->stalls, true); + table->field[7]->store(counters->polls_by_listener, true); + table->field[8]->store(counters->polls_by_worker, true); + table->field[9]->store(counters->dequeues_by_listener, true); + table->field[10]->store(counters->dequeues_by_worker, true); + mysql_mutex_unlock(&group->mutex); + if (schema_table_store_record(thd, table)) + return 1; + } + return 0; +} + +static int stats_reset_table() +{ + for (uint i = 0; i < threadpool_max_size && all_groups[i].pollfd != INVALID_HANDLE_VALUE; i++) + { + thread_group_t* group = &all_groups[i]; + mysql_mutex_lock(&group->mutex); + memset(&group->counters, 0, sizeof(group->counters)); + mysql_mutex_unlock(&group->mutex); + } + return 0; +} + +static int stats_init(void* p) +{ + ST_SCHEMA_TABLE* schema = (ST_SCHEMA_TABLE*)p; + schema->fields_info = stats_fields_info; + schema->fill_table = stats_fill_table; + schema->reset_table = stats_reset_table; + return 0; +} + + + +static ST_FIELD_INFO waits_fields_info[] = +{ + {"REASON", 16, MYSQL_TYPE_STRING, 0, 0, 0, 0}, + {"COUNT",19,MYSQL_TYPE_LONGLONG,0,0, 0,0}, + {0, 0, MYSQL_TYPE_STRING, 0, 0, 0, 0} +}; + +/* See thd_wait_type enum for explanation*/ +static const LEX_CSTRING wait_reasons[THD_WAIT_LAST] = +{ + {STRING_WITH_LEN("UNKNOWN")}, + {STRING_WITH_LEN("SLEEP")}, + {STRING_WITH_LEN("DISKIO")}, + {STRING_WITH_LEN("ROW_LOCK")}, + {STRING_WITH_LEN("GLOBAL_LOCK")}, + {STRING_WITH_LEN("META_DATA_LOCK")}, + {STRING_WITH_LEN("TABLE_LOCK")}, + {STRING_WITH_LEN("USER_LOCK")}, + {STRING_WITH_LEN("BINLOG")}, + {STRING_WITH_LEN("GROUP_COMMIT")}, + {STRING_WITH_LEN("SYNC")}, + {STRING_WITH_LEN("NET")} +}; + +extern Atomic_counter<unsigned long long> tp_waits[THD_WAIT_LAST]; + +static int waits_fill_table(THD* thd, TABLE_LIST* tables, COND*) +{ + if (!all_groups) + return 0; + + TABLE* table = tables->table; + for (auto i = 0; i < THD_WAIT_LAST; i++) + { + table->field[0]->store(wait_reasons[i].str, wait_reasons[i].length, system_charset_info); + table->field[1]->store(tp_waits[i], true); + if (schema_table_store_record(thd, table)) + return 1; + } + return 0; +} + +static int waits_reset_table() +{ + for (auto i = 0; i < THD_WAIT_LAST; i++) + tp_waits[i] = 0; + + return 0; +} + +static int waits_init(void* p) +{ + ST_SCHEMA_TABLE* schema = (ST_SCHEMA_TABLE*)p; + schema->fields_info = waits_fields_info; + schema->fill_table = waits_fill_table; + schema->reset_table = waits_reset_table; + return 0; +} + +static struct st_mysql_information_schema plugin_descriptor = +{ MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION }; + +maria_declare_plugin(thread_pool_info) +{ + MYSQL_INFORMATION_SCHEMA_PLUGIN, + &plugin_descriptor, + "THREAD_POOL_GROUPS", + "Vladislav Vaintroub", + "Provides information about threadpool groups.", + PLUGIN_LICENSE_GPL, + groups_init, + 0, + 0x0100, + NULL, + NULL, + "1.0", + MariaDB_PLUGIN_MATURITY_STABLE +}, +{ + MYSQL_INFORMATION_SCHEMA_PLUGIN, + &plugin_descriptor, + "THREAD_POOL_QUEUES", + "Vladislav Vaintroub", + "Provides information about threadpool queues.", + PLUGIN_LICENSE_GPL, + queues_init, + 0, + 0x0100, + NULL, + NULL, + "1.0", + MariaDB_PLUGIN_MATURITY_STABLE +}, +{ + MYSQL_INFORMATION_SCHEMA_PLUGIN, + &plugin_descriptor, + "THREAD_POOL_STATS", + "Vladislav Vaintroub", + "Provides performance counter information for threadpool.", + PLUGIN_LICENSE_GPL, + stats_init, + 0, + 0x0100, + NULL, + NULL, + "1.0", + MariaDB_PLUGIN_MATURITY_STABLE +}, +{ + MYSQL_INFORMATION_SCHEMA_PLUGIN, + &plugin_descriptor, + "THREAD_POOL_WAITS", + "Vladislav Vaintroub", + "Provides wait counters for threadpool.", + PLUGIN_LICENSE_GPL, + waits_init, + 0, + 0x0100, + NULL, + NULL, + "1.0", + MariaDB_PLUGIN_MATURITY_STABLE +} +maria_declare_plugin_end; diff --git a/sql/threadpool.h b/sql/threadpool.h index 6299510d002..7d2c34e0363 100644 --- a/sql/threadpool.h +++ b/sql/threadpool.h @@ -64,8 +64,6 @@ extern int tp_get_thread_count(); /* Activate threadpool scheduler */ extern void tp_scheduler(void); -extern int show_threadpool_idle_threads(THD *thd, SHOW_VAR *var, char *buff, - enum enum_var_type scope); enum TP_PRIORITY { TP_PRIORITY_HIGH, @@ -88,6 +86,8 @@ enum TP_STATE inside threadpool_win.cc and threadpool_unix.cc */ +class CONNECT; + struct TP_connection { THD* thd; diff --git a/sql/threadpool_common.cc b/sql/threadpool_common.cc index 3628ceb9112..0cbd076016d 100644 --- a/sql/threadpool_common.cc +++ b/sql/threadpool_common.cc @@ -23,7 +23,7 @@ #include <sql_audit.h> #include <debug_sync.h> #include <threadpool.h> - +#include <my_counter.h> /* Threadpool parameters */ @@ -153,9 +153,8 @@ static TP_PRIORITY get_priority(TP_connection *c) DBUG_ASSERT(c->thd == current_thd); TP_PRIORITY prio= (TP_PRIORITY)c->thd->variables.threadpool_priority; if (prio == TP_PRIORITY_AUTO) - { - return c->thd->transaction.is_active() ? TP_PRIORITY_HIGH : TP_PRIORITY_LOW; - } + prio= c->thd->transaction.is_active() ? TP_PRIORITY_HIGH : TP_PRIORITY_LOW; + return prio; } @@ -463,12 +462,17 @@ void tp_timeout_handler(TP_connection *c) mysql_mutex_unlock(&thd->LOCK_thd_kill); } +MY_ALIGNED(CPU_LEVEL1_DCACHE_LINESIZE) Atomic_counter<unsigned long long> tp_waits[THD_WAIT_LAST]; static void tp_wait_begin(THD *thd, int type) { TP_connection *c = get_TP_connection(thd); if (c) + { + DBUG_ASSERT(type > 0 && type < THD_WAIT_LAST); + tp_waits[type]++; c->wait_begin(type); + } } diff --git a/sql/threadpool_generic.cc b/sql/threadpool_generic.cc index 1998b8d281b..3cb7be26eb7 100644 --- a/sql/threadpool_generic.cc +++ b/sql/threadpool_generic.cc @@ -13,56 +13,27 @@ along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */ +#if (defined HAVE_POOL_OF_THREADS) && !defined(EMBEDDED_LIBRARY) + +#include "threadpool_generic.h" #include "mariadb.h" #include <violite.h> #include <sql_priv.h> #include <sql_class.h> #include <my_pthread.h> #include <scheduler.h> - -#ifdef HAVE_POOL_OF_THREADS - -#ifdef _WIN32 -/* AIX may define this, too ?*/ -#define HAVE_IOCP -#endif - -#ifdef HAVE_IOCP -#define OPTIONAL_IO_POLL_READ_PARAM this -#else -#define OPTIONAL_IO_POLL_READ_PARAM 0 -#endif - -#ifdef _WIN32 -typedef HANDLE TP_file_handle; -#else -typedef int TP_file_handle; -#define INVALID_HANDLE_VALUE -1 -#endif - - #include <sql_connect.h> #include <mysqld.h> #include <debug_sync.h> #include <time.h> #include <sql_plist.h> #include <threadpool.h> -#include <time.h> -#ifdef __linux__ -#include <sys/epoll.h> -typedef struct epoll_event native_event; -#elif defined(HAVE_KQUEUE) -#include <sys/event.h> -typedef struct kevent native_event; -#elif defined (__sun) -#include <port.h> -typedef port_event_t native_event; -#elif defined (HAVE_IOCP) -typedef OVERLAPPED_ENTRY native_event; -#else -#error threadpool is not available on this platform -#endif +#ifdef HAVE_IOCP +#define OPTIONAL_IO_POLL_READ_PARAM this +#else +#define OPTIONAL_IO_POLL_READ_PARAM 0 +#endif static void io_poll_close(TP_file_handle fd) { @@ -119,86 +90,7 @@ static PSI_thread_info thread_list[] = #define PSI_register(X) /* no-op */ #endif - -struct thread_group_t; - -/* Per-thread structure for workers */ -struct worker_thread_t -{ - ulonglong event_count; /* number of request handled by this thread */ - thread_group_t* thread_group; - worker_thread_t *next_in_list; - worker_thread_t **prev_in_list; - - mysql_cond_t cond; - bool woken; -}; - -typedef I_P_List<worker_thread_t, I_P_List_adapter<worker_thread_t, - &worker_thread_t::next_in_list, - &worker_thread_t::prev_in_list> - > -worker_list_t; - -struct TP_connection_generic:public TP_connection -{ - TP_connection_generic(CONNECT *c); - ~TP_connection_generic(); - - virtual int init(){ return 0; }; - virtual void set_io_timeout(int sec); - virtual int start_io(); - virtual void wait_begin(int type); - virtual void wait_end(); - - thread_group_t *thread_group; - TP_connection_generic *next_in_queue; - TP_connection_generic **prev_in_queue; - ulonglong abs_wait_timeout; - ulonglong dequeue_time; - TP_file_handle fd; - bool bound_to_poll_descriptor; - int waiting; -#ifdef HAVE_IOCP - OVERLAPPED overlapped; -#endif -#ifdef _WIN32 - enum_vio_type vio_type; -#endif -}; - - -typedef I_P_List<TP_connection_generic, - I_P_List_adapter<TP_connection_generic, - &TP_connection_generic::next_in_queue, - &TP_connection_generic::prev_in_queue>, - I_P_List_null_counter, - I_P_List_fast_push_back<TP_connection_generic> > -connection_queue_t; - -const int NQUEUES=2; /* We have high and low priority queues*/ - -struct MY_ALIGNED(CPU_LEVEL1_DCACHE_LINESIZE) thread_group_t -{ - mysql_mutex_t mutex; - connection_queue_t queues[NQUEUES]; - worker_list_t waiting_threads; - worker_thread_t *listener; - pthread_attr_t *pthread_attr; - TP_file_handle pollfd; - int thread_count; - int active_thread_count; - int connection_count; - /* Stats for the deadlock detection timer routine.*/ - int io_event_count; - int queue_event_count; - ulonglong last_thread_creation_time; - int shutdown_pipe[2]; - bool shutdown; - bool stalled; -}; - -static thread_group_t *all_groups; +thread_group_t *all_groups; static uint group_count; static int32 shutdown_group_count; @@ -224,9 +116,9 @@ static pool_timer_t pool_timer; static void queue_put(thread_group_t *thread_group, TP_connection_generic *connection); static void queue_put(thread_group_t *thread_group, native_event *ev, int cnt); -static int wake_thread(thread_group_t *thread_group); -static int wake_or_create_thread(thread_group_t *thread_group); -static int create_worker(thread_group_t *thread_group); +static int wake_thread(thread_group_t *thread_group,bool due_to_stall); +static int wake_or_create_thread(thread_group_t *thread_group, bool due_to_stall=false); +static int create_worker(thread_group_t *thread_group, bool due_to_stall); static void *worker_main(void *param); static void check_stall(thread_group_t *thread_group); static void set_next_timeout_check(ulonglong abstime); @@ -563,11 +455,11 @@ static void queue_init(thread_group_t *thread_group) static void queue_put(thread_group_t *thread_group, native_event *ev, int cnt) { - ulonglong now= pool_timer.current_microtime; + ulonglong now= threadpool_exact_stats?microsecond_interval_timer():pool_timer.current_microtime; for(int i=0; i < cnt; i++) { TP_connection_generic *c = (TP_connection_generic *)native_event_get_userdata(&ev[i]); - c->dequeue_time= now; + c->enqueue_time= now; thread_group->queues[c->priority].push_back(c); } } @@ -681,7 +573,7 @@ void check_stall(thread_group_t *thread_group) for (;;) { c= thread_group->queues[TP_PRIORITY_LOW].front(); - if (c && pool_timer.current_microtime - c->dequeue_time > 1000ULL * threadpool_prio_kickup_timer) + if (c && pool_timer.current_microtime - c->enqueue_time > 1000ULL * threadpool_prio_kickup_timer) { thread_group->queues[TP_PRIORITY_LOW].remove(c); thread_group->queues[TP_PRIORITY_HIGH].push_back(c); @@ -698,7 +590,7 @@ void check_stall(thread_group_t *thread_group) */ if (!thread_group->listener && !thread_group->io_event_count) { - wake_or_create_thread(thread_group); + wake_or_create_thread(thread_group, true); mysql_mutex_unlock(&thread_group->mutex); return; } @@ -735,7 +627,8 @@ void check_stall(thread_group_t *thread_group) if (!is_queue_empty(thread_group) && !thread_group->queue_event_count) { thread_group->stalled= true; - wake_or_create_thread(thread_group); + TP_INCREMENT_GROUP_COUNTER(thread_group,stalls); + wake_or_create_thread(thread_group,true); } /* Reset queue event count */ @@ -790,13 +683,13 @@ static TP_connection_generic * listener(worker_thread_t *current_thread, break; cnt = io_poll_wait(thread_group->pollfd, ev, MAX_EVENTS, -1); - + TP_INCREMENT_GROUP_COUNTER(thread_group, polls_by_listener); if (cnt <=0) { DBUG_ASSERT(thread_group->shutdown); break; } - + mysql_mutex_lock(&thread_group->mutex); if (thread_group->shutdown) @@ -864,7 +757,7 @@ static TP_connection_generic * listener(worker_thread_t *current_thread, if(thread_group->active_thread_count==0) { /* We added some work items to queue, now wake a worker. */ - if(wake_thread(thread_group)) + if(wake_thread(thread_group, false)) { /* Wake failed, hence groups has no idle threads. Now check if there are @@ -882,7 +775,7 @@ static TP_connection_generic * listener(worker_thread_t *current_thread, create thread, but waiting for timer would be an inefficient and pointless delay. */ - create_worker(thread_group); + create_worker(thread_group, false); } } } @@ -919,7 +812,7 @@ static void add_thread_count(thread_group_t *thread_group, int32 count) per group to prevent deadlocks (one listener + one worker) */ -static int create_worker(thread_group_t *thread_group) +static int create_worker(thread_group_t *thread_group, bool due_to_stall) { pthread_t thread_id; bool max_threads_reached= false; @@ -942,6 +835,11 @@ static int create_worker(thread_group_t *thread_group) thread_group->last_thread_creation_time=microsecond_interval_timer(); statistic_increment(thread_created,&LOCK_status); add_thread_count(thread_group, 1); + TP_INCREMENT_GROUP_COUNTER(thread_group,thread_creations); + if(due_to_stall) + { + TP_INCREMENT_GROUP_COUNTER(thread_group, thread_creations_due_to_stall); + } } else { @@ -993,15 +891,17 @@ static ulonglong microsecond_throttling_interval(thread_group_t *thread_group) Worker creation is throttled, so we avoid too many threads to be created during the short time. */ -static int wake_or_create_thread(thread_group_t *thread_group) +static int wake_or_create_thread(thread_group_t *thread_group, bool due_to_stall) { DBUG_ENTER("wake_or_create_thread"); if (thread_group->shutdown) DBUG_RETURN(0); - if (wake_thread(thread_group) == 0) + if (wake_thread(thread_group, due_to_stall) == 0) + { DBUG_RETURN(0); + } if (thread_group->thread_count > thread_group->connection_count) DBUG_RETURN(-1); @@ -1015,7 +915,7 @@ static int wake_or_create_thread(thread_group_t *thread_group) idle thread to wakeup. Smells like a potential deadlock or very slowly executing requests, e.g sleeps or user locks. */ - DBUG_RETURN(create_worker(thread_group)); + DBUG_RETURN(create_worker(thread_group, due_to_stall)); } ulonglong now = microsecond_interval_timer(); @@ -1026,9 +926,10 @@ static int wake_or_create_thread(thread_group_t *thread_group) if (time_since_last_thread_created > microsecond_throttling_interval(thread_group)) { - DBUG_RETURN(create_worker(thread_group)); + DBUG_RETURN(create_worker(thread_group, due_to_stall)); } - + + TP_INCREMENT_GROUP_COUNTER(thread_group,throttles); DBUG_RETURN(-1); } @@ -1074,7 +975,7 @@ void thread_group_destroy(thread_group_t *thread_group) Wake sleeping thread from waiting list */ -static int wake_thread(thread_group_t *thread_group) +static int wake_thread(thread_group_t *thread_group,bool due_to_stall) { DBUG_ENTER("wake_thread"); worker_thread_t *thread = thread_group->waiting_threads.front(); @@ -1083,6 +984,11 @@ static int wake_thread(thread_group_t *thread_group) thread->woken= true; thread_group->waiting_threads.remove(thread); mysql_cond_signal(&thread->cond); + TP_INCREMENT_GROUP_COUNTER(thread_group, wakes); + if (due_to_stall) + { + TP_INCREMENT_GROUP_COUNTER(thread_group, wakes_due_to_stall); + } DBUG_RETURN(0); } DBUG_RETURN(1); /* no thread in waiter list => missed wakeup */ @@ -1140,7 +1046,7 @@ static void thread_group_close(thread_group_t *thread_group) wake_listener(thread_group); /* Wake all workers. */ - while(wake_thread(thread_group) == 0) + while(wake_thread(thread_group, false) == 0) { } @@ -1224,7 +1130,10 @@ TP_connection_generic *get_event(worker_thread_t *current_thread, { connection = queue_get(thread_group); if(connection) + { + TP_INCREMENT_GROUP_COUNTER(thread_group,dequeues_by_worker); break; + } } /* If there is currently no listener in the group, become one. */ @@ -1235,7 +1144,10 @@ TP_connection_generic *get_event(worker_thread_t *current_thread, mysql_mutex_unlock(&thread_group->mutex); connection = listener(current_thread, thread_group); - + if (connection) + { + TP_INCREMENT_GROUP_COUNTER(thread_group, dequeues_by_listener); + } mysql_mutex_lock(&thread_group->mutex); thread_group->active_thread_count++; /* There is no listener anymore, it just returned. */ @@ -1251,9 +1163,9 @@ TP_connection_generic *get_event(worker_thread_t *current_thread, */ if (!oversubscribed) { - native_event ev[MAX_EVENTS]; int cnt = io_poll_wait(thread_group->pollfd, ev, MAX_EVENTS, 0); + TP_INCREMENT_GROUP_COUNTER(thread_group, polls_by_worker); if (cnt > 0) { queue_put(thread_group, ev, cnt); @@ -1300,6 +1212,7 @@ TP_connection_generic *get_event(worker_thread_t *current_thread, } thread_group->stalled= false; + mysql_mutex_unlock(&thread_group->mutex); DBUG_RETURN(connection); @@ -1515,7 +1428,7 @@ static int change_group(TP_connection_generic *c, new_group->connection_count++; /* Ensure that there is a listener in the new group. */ if (!new_group->thread_count) - ret= create_worker(new_group); + ret= create_worker(new_group, false); mysql_mutex_unlock(&new_group->mutex); return ret; } @@ -1775,4 +1688,6 @@ static void print_pool_blocked_message(bool max_threads_reached) } } + + #endif /* HAVE_POOL_OF_THREADS */ diff --git a/sql/threadpool_generic.h b/sql/threadpool_generic.h new file mode 100644 index 00000000000..4b83e1d796f --- /dev/null +++ b/sql/threadpool_generic.h @@ -0,0 +1,150 @@ +/* Copyright(C) 2019 MariaDB + * + * This program is free software; you can redistribute itand /or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; version 2 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/ + +#if defined (HAVE_POOL_OF_THREADS) +#include <my_global.h> +#include <sql_plist.h> +#include <my_pthread.h> +#include <threadpool.h> +#include <mysqld.h> +#include <violite.h> + +#ifdef _WIN32 +#include <windows.h> +/* AIX may define this, too ?*/ +#define HAVE_IOCP +#endif + + +#ifdef _WIN32 +typedef HANDLE TP_file_handle; +#else +typedef int TP_file_handle; +#define INVALID_HANDLE_VALUE -1 +#endif + +#ifdef __linux__ +#include <sys/epoll.h> +typedef struct epoll_event native_event; +#elif defined(HAVE_KQUEUE) +#include <sys/event.h> +typedef struct kevent native_event; +#elif defined (__sun) +#include <port.h> +typedef port_event_t native_event; +#elif defined (HAVE_IOCP) +typedef OVERLAPPED_ENTRY native_event; +#else +#error threadpool is not available on this platform +#endif + +struct thread_group_t; + +/* Per-thread structure for workers */ +struct worker_thread_t +{ + ulonglong event_count; /* number of request handled by this thread */ + thread_group_t* thread_group; + worker_thread_t* next_in_list; + worker_thread_t** prev_in_list; + mysql_cond_t cond; + bool woken; +}; + +typedef I_P_List<worker_thread_t, I_P_List_adapter<worker_thread_t, + & worker_thread_t::next_in_list, + & worker_thread_t::prev_in_list>, + I_P_List_counter +> +worker_list_t; + +struct TP_connection_generic :public TP_connection +{ + TP_connection_generic(CONNECT* c); + ~TP_connection_generic(); + + virtual int init() { return 0; }; + virtual void set_io_timeout(int sec); + virtual int start_io(); + virtual void wait_begin(int type); + virtual void wait_end(); + + thread_group_t* thread_group; + TP_connection_generic* next_in_queue; + TP_connection_generic** prev_in_queue; + ulonglong abs_wait_timeout; + ulonglong enqueue_time; + TP_file_handle fd; + bool bound_to_poll_descriptor; + int waiting; +#ifdef HAVE_IOCP + OVERLAPPED overlapped; +#endif +#ifdef _WIN32 + enum_vio_type vio_type; +#endif +}; + + +typedef I_P_List<TP_connection_generic, + I_P_List_adapter<TP_connection_generic, + & TP_connection_generic::next_in_queue, + & TP_connection_generic::prev_in_queue>, + I_P_List_counter, + I_P_List_fast_push_back<TP_connection_generic> > + connection_queue_t; + +const int NQUEUES = 2; /* We have high and low priority queues*/ + +struct thread_group_counters_t +{ + ulonglong thread_creations; + ulonglong thread_creations_due_to_stall; + ulonglong wakes; + ulonglong wakes_due_to_stall; + ulonglong throttles; + ulonglong stalls; + ulonglong dequeues_by_worker; + ulonglong dequeues_by_listener; + ulonglong polls_by_listener; + ulonglong polls_by_worker; +}; + +struct MY_ALIGNED(CPU_LEVEL1_DCACHE_LINESIZE) thread_group_t +{ + mysql_mutex_t mutex; + connection_queue_t queues[NQUEUES]; + worker_list_t waiting_threads; + worker_thread_t* listener; + pthread_attr_t* pthread_attr; + TP_file_handle pollfd; + int thread_count; + int active_thread_count; + int connection_count; + /* Stats for the deadlock detection timer routine.*/ + int io_event_count; + int queue_event_count; + ulonglong last_thread_creation_time; + int shutdown_pipe[2]; + bool shutdown; + bool stalled; + thread_group_counters_t counters; +}; + +#define TP_INCREMENT_GROUP_COUNTER(group,var) group->counters.var++; + +extern thread_group_t* all_groups; +#endif + |