summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVladislav Vaintroub <wlad@mariadb.com>2019-05-26 13:25:12 +0200
committerVladislav Vaintroub <wlad@mariadb.com>2019-05-26 19:20:35 +0200
commit2fc13d04d16f878ed693ad8ba56045b79ccb9650 (patch)
tree23cbb2dbe934d4879cae668b0164dbec0ba71503
parent5f18bd3a3545bae97956fbbf4b29d2e9288a6505 (diff)
downloadmariadb-git-2fc13d04d16f878ed693ad8ba56045b79ccb9650.tar.gz
MDEV-19313 Threadpool : provide information schema tables for internals of generic threadpool
Added thread_pool_groups, thread_pool_queues, thread_pool_waits and thread_pool_stats tables to information_schema.
-rw-r--r--sql/CMakeLists.txt1
-rw-r--r--sql/mysqld.cc2
-rw-r--r--sql/thread_pool_info.cc340
-rw-r--r--sql/threadpool.h4
-rw-r--r--sql/threadpool_common.cc12
-rw-r--r--sql/threadpool_generic.cc195
-rw-r--r--sql/threadpool_generic.h150
7 files changed, 557 insertions, 147 deletions
diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt
index 4d8918d962e..6cc561877b9 100644
--- a/sql/CMakeLists.txt
+++ b/sql/CMakeLists.txt
@@ -164,6 +164,7 @@ IF ((CMAKE_SYSTEM_NAME MATCHES "Linux" OR
ENDIF()
SET(SQL_SOURCE ${SQL_SOURCE} threadpool_generic.cc)
SET(SQL_SOURCE ${SQL_SOURCE} threadpool_common.cc)
+ MYSQL_ADD_PLUGIN(thread_pool_info thread_pool_info.cc DEFAULT STATIC_ONLY NOT_EMBEDDED)
ENDIF()
IF(WIN32)
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index d3eb76f22f0..a035f629562 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -7461,7 +7461,7 @@ static int debug_status_func(THD *thd, SHOW_VAR *var, char *buff,
#endif
#ifdef HAVE_POOL_OF_THREADS
-int show_threadpool_idle_threads(THD *thd, SHOW_VAR *var, char *buff,
+static int show_threadpool_idle_threads(THD *thd, SHOW_VAR *var, char *buff,
enum enum_var_type scope)
{
var->type= SHOW_INT;
diff --git a/sql/thread_pool_info.cc b/sql/thread_pool_info.cc
new file mode 100644
index 00000000000..56760105668
--- /dev/null
+++ b/sql/thread_pool_info.cc
@@ -0,0 +1,340 @@
+/* Copyright(C) 2019 MariaDB
+
+This program is free software; you can redistribute itand /or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
+
+#include <mysql_version.h>
+#include <mysql/plugin.h>
+
+#include <my_global.h>
+#include <sql_class.h>
+#include <table.h>
+#include <mysql/plugin.h>
+#include <sql_show.h>
+#include <threadpool_generic.h>
+
+static ST_FIELD_INFO groups_fields_info[] =
+{
+ {"GROUP_ID", 6, MYSQL_TYPE_LONG, 0, 0, 0, 0},
+ {"CONNECTIONS", 6, MYSQL_TYPE_LONG, 0, 0, 0, 0},
+ {"THREADS", 6, MYSQL_TYPE_LONG, 0, 0, 0, 0},
+ {"ACTIVE_THREADS",6, MYSQL_TYPE_LONG, 0, 0, 0, 0},
+ {"STANDBY_THREADS", 6, MYSQL_TYPE_LONG, 0, 0, 0,0},
+ {"QUEUE_LENGTH", 6, MYSQL_TYPE_LONG, 0,0, 0, 0},
+ {"HAS_LISTENER",1,MYSQL_TYPE_TINY, 0, 0, 0, 0},
+ {"IS_STALLED",1,MYSQL_TYPE_TINY, 0, 0, 0, 0},
+ {0, 0, MYSQL_TYPE_STRING, 0, 0, 0, 0}
+};
+
+static int groups_fill_table(THD* thd, TABLE_LIST* tables, COND*)
+{
+ if (!all_groups)
+ return 0;
+
+ TABLE* table = tables->table;
+ for (uint i = 0; i < threadpool_max_size && all_groups[i].pollfd != INVALID_HANDLE_VALUE; i++)
+ {
+ thread_group_t* group = &all_groups[i];
+ /* ID */
+ table->field[0]->store(i, true);
+ /* CONNECTION_COUNT */
+ table->field[1]->store(group->connection_count, true);
+ /* THREAD_COUNT */
+ table->field[2]->store(group->thread_count, true);
+ /* ACTIVE_THREAD_COUNT */
+ table->field[3]->store(group->active_thread_count, true);
+ /* STANDBY_THREAD_COUNT */
+ table->field[4]->store(group->waiting_threads.elements(), true);
+ /* QUEUE LENGTH */
+ uint queue_len = group->queues[TP_PRIORITY_LOW].elements()
+ + group->queues[TP_PRIORITY_HIGH].elements();
+ table->field[5]->store(queue_len, true);
+ /* HAS_LISTENER */
+ table->field[6]->store((longlong)(group->listener != 0), true);
+ /* IS_STALLED */
+ table->field[7]->store(group->stalled, true);
+
+ if (schema_table_store_record(thd, table))
+ return 1;
+ }
+ return 0;
+}
+
+
+static int groups_init(void* p)
+{
+ ST_SCHEMA_TABLE* schema = (ST_SCHEMA_TABLE*)p;
+ schema->fields_info = groups_fields_info;
+ schema->fill_table = groups_fill_table;
+ return 0;
+}
+
+
+static ST_FIELD_INFO queues_field_info[] =
+{
+ {"GROUP_ID", 6, MYSQL_TYPE_LONG, 0, 0, 0, 0},
+ {"POSITION",6,MYSQL_TYPE_LONG, 0, 0, 0, 0},
+ {"PRIORITY", 1, MYSQL_TYPE_LONG, 0, 0, 0, 0},
+ {"CONNECTION_ID", 19, MYSQL_TYPE_LONGLONG, MY_I_S_UNSIGNED, 0, 0, 0},
+ {"QUEUEING_TIME_MICROSECONDS", 19, MYSQL_TYPE_LONGLONG, 0, 0, 0, 0},
+ {0, 0, MYSQL_TYPE_NULL, 0, 0, 0, 0}
+};
+
+typedef connection_queue_t::Iterator connection_queue_iterator;
+
+static int queues_fill_table(THD* thd, TABLE_LIST* tables, COND*)
+{
+ if (!all_groups)
+ return 0;
+
+ TABLE* table = tables->table;
+ for (uint group_id = 0;
+ group_id < threadpool_max_size && all_groups[group_id].pollfd != INVALID_HANDLE_VALUE;
+ group_id++)
+ {
+ thread_group_t* group = &all_groups[group_id];
+
+ mysql_mutex_lock(&group->mutex);
+ bool err = false;
+ int pos = 0;
+ ulonglong now = microsecond_interval_timer();
+ for (uint prio = 0; prio < NQUEUES && !err; prio++)
+ {
+ connection_queue_iterator it(group->queues[prio]);
+ TP_connection_generic* c;
+ while ((c = it++) != 0)
+ {
+ /* GROUP_ID */
+ table->field[0]->store(group_id, true);
+ /* POSITION */
+ table->field[1]->store(pos++, true);
+ /* PRIORITY */
+ table->field[2]->store(prio, true);
+ /* CONNECTION_ID */
+ table->field[3]->store(c->thd->thread_id, true);
+ /* QUEUEING_TIME */
+ table->field[4]->store(now - c->enqueue_time, true);
+
+ err = schema_table_store_record(thd, table);
+ if (err)
+ break;
+ }
+ }
+ mysql_mutex_unlock(&group->mutex);
+ if (err)
+ return 1;
+ }
+ return 0;
+}
+
+static int queues_init(void* p)
+{
+ ST_SCHEMA_TABLE* schema = (ST_SCHEMA_TABLE*)p;
+ schema->fields_info = queues_field_info;
+ schema->fill_table = queues_fill_table;
+ return 0;
+}
+
+static ST_FIELD_INFO stats_fields_info[] =
+{
+ {"GROUP_ID", 6, MYSQL_TYPE_LONG, 0, 0, 0, 0},
+ {"THREAD_CREATIONS",19,MYSQL_TYPE_LONGLONG,0,0, 0,0},
+ {"THREAD_CREATIONS_DUE_TO_STALL",19,MYSQL_TYPE_LONGLONG,0,0, 0,0},
+ {"WAKES",19,MYSQL_TYPE_LONGLONG,0,0, 0,0},
+ {"WAKES_DUE_TO_STALL",19,MYSQL_TYPE_LONGLONG,0,0, 0,0},
+ {"THROTTLES",19,MYSQL_TYPE_LONGLONG,0,0, 0,0},
+ {"STALLS",19,MYSQL_TYPE_LONGLONG,0,0, 0, 0},
+ {"POLLS_BY_LISTENER",19,MYSQL_TYPE_LONGLONG,0,0, 0, 0},
+ {"POLLS_BY_WORKER",19,MYSQL_TYPE_LONGLONG,0,0, 0, 0},
+ {"DEQUEUES_BY_LISTENER",19,MYSQL_TYPE_LONGLONG,0,0, 0, 0},
+ {"DEQUEUES_BY_WORKER",19,MYSQL_TYPE_LONGLONG,0,0, 0, 0},
+ {0, 0, MYSQL_TYPE_STRING, 0, 0, 0, 0}
+};
+
+static int stats_fill_table(THD* thd, TABLE_LIST* tables, COND*)
+{
+ if (!all_groups)
+ return 0;
+
+ TABLE* table = tables->table;
+ for (uint i = 0; i < threadpool_max_size && all_groups[i].pollfd != INVALID_HANDLE_VALUE; i++)
+ {
+ table->field[0]->store(i, true);
+ thread_group_t* group = &all_groups[i];
+
+ mysql_mutex_lock(&group->mutex);
+ thread_group_counters_t* counters = &group->counters;
+ table->field[1]->store(counters->thread_creations, true);
+ table->field[2]->store(counters->thread_creations_due_to_stall, true);
+ table->field[3]->store(counters->wakes, true);
+ table->field[4]->store(counters->wakes_due_to_stall, true);
+ table->field[5]->store(counters->throttles, true);
+ table->field[6]->store(counters->stalls, true);
+ table->field[7]->store(counters->polls_by_listener, true);
+ table->field[8]->store(counters->polls_by_worker, true);
+ table->field[9]->store(counters->dequeues_by_listener, true);
+ table->field[10]->store(counters->dequeues_by_worker, true);
+ mysql_mutex_unlock(&group->mutex);
+ if (schema_table_store_record(thd, table))
+ return 1;
+ }
+ return 0;
+}
+
+static int stats_reset_table()
+{
+ for (uint i = 0; i < threadpool_max_size && all_groups[i].pollfd != INVALID_HANDLE_VALUE; i++)
+ {
+ thread_group_t* group = &all_groups[i];
+ mysql_mutex_lock(&group->mutex);
+ memset(&group->counters, 0, sizeof(group->counters));
+ mysql_mutex_unlock(&group->mutex);
+ }
+ return 0;
+}
+
+static int stats_init(void* p)
+{
+ ST_SCHEMA_TABLE* schema = (ST_SCHEMA_TABLE*)p;
+ schema->fields_info = stats_fields_info;
+ schema->fill_table = stats_fill_table;
+ schema->reset_table = stats_reset_table;
+ return 0;
+}
+
+
+
+static ST_FIELD_INFO waits_fields_info[] =
+{
+ {"REASON", 16, MYSQL_TYPE_STRING, 0, 0, 0, 0},
+ {"COUNT",19,MYSQL_TYPE_LONGLONG,0,0, 0,0},
+ {0, 0, MYSQL_TYPE_STRING, 0, 0, 0, 0}
+};
+
+/* See thd_wait_type enum for explanation*/
+static const LEX_CSTRING wait_reasons[THD_WAIT_LAST] =
+{
+ {STRING_WITH_LEN("UNKNOWN")},
+ {STRING_WITH_LEN("SLEEP")},
+ {STRING_WITH_LEN("DISKIO")},
+ {STRING_WITH_LEN("ROW_LOCK")},
+ {STRING_WITH_LEN("GLOBAL_LOCK")},
+ {STRING_WITH_LEN("META_DATA_LOCK")},
+ {STRING_WITH_LEN("TABLE_LOCK")},
+ {STRING_WITH_LEN("USER_LOCK")},
+ {STRING_WITH_LEN("BINLOG")},
+ {STRING_WITH_LEN("GROUP_COMMIT")},
+ {STRING_WITH_LEN("SYNC")},
+ {STRING_WITH_LEN("NET")}
+};
+
+extern Atomic_counter<unsigned long long> tp_waits[THD_WAIT_LAST];
+
+static int waits_fill_table(THD* thd, TABLE_LIST* tables, COND*)
+{
+ if (!all_groups)
+ return 0;
+
+ TABLE* table = tables->table;
+ for (auto i = 0; i < THD_WAIT_LAST; i++)
+ {
+ table->field[0]->store(wait_reasons[i].str, wait_reasons[i].length, system_charset_info);
+ table->field[1]->store(tp_waits[i], true);
+ if (schema_table_store_record(thd, table))
+ return 1;
+ }
+ return 0;
+}
+
+static int waits_reset_table()
+{
+ for (auto i = 0; i < THD_WAIT_LAST; i++)
+ tp_waits[i] = 0;
+
+ return 0;
+}
+
+static int waits_init(void* p)
+{
+ ST_SCHEMA_TABLE* schema = (ST_SCHEMA_TABLE*)p;
+ schema->fields_info = waits_fields_info;
+ schema->fill_table = waits_fill_table;
+ schema->reset_table = waits_reset_table;
+ return 0;
+}
+
+static struct st_mysql_information_schema plugin_descriptor =
+{ MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION };
+
+maria_declare_plugin(thread_pool_info)
+{
+ MYSQL_INFORMATION_SCHEMA_PLUGIN,
+ &plugin_descriptor,
+ "THREAD_POOL_GROUPS",
+ "Vladislav Vaintroub",
+ "Provides information about threadpool groups.",
+ PLUGIN_LICENSE_GPL,
+ groups_init,
+ 0,
+ 0x0100,
+ NULL,
+ NULL,
+ "1.0",
+ MariaDB_PLUGIN_MATURITY_STABLE
+},
+{
+ MYSQL_INFORMATION_SCHEMA_PLUGIN,
+ &plugin_descriptor,
+ "THREAD_POOL_QUEUES",
+ "Vladislav Vaintroub",
+ "Provides information about threadpool queues.",
+ PLUGIN_LICENSE_GPL,
+ queues_init,
+ 0,
+ 0x0100,
+ NULL,
+ NULL,
+ "1.0",
+ MariaDB_PLUGIN_MATURITY_STABLE
+},
+{
+ MYSQL_INFORMATION_SCHEMA_PLUGIN,
+ &plugin_descriptor,
+ "THREAD_POOL_STATS",
+ "Vladislav Vaintroub",
+ "Provides performance counter information for threadpool.",
+ PLUGIN_LICENSE_GPL,
+ stats_init,
+ 0,
+ 0x0100,
+ NULL,
+ NULL,
+ "1.0",
+ MariaDB_PLUGIN_MATURITY_STABLE
+},
+{
+ MYSQL_INFORMATION_SCHEMA_PLUGIN,
+ &plugin_descriptor,
+ "THREAD_POOL_WAITS",
+ "Vladislav Vaintroub",
+ "Provides wait counters for threadpool.",
+ PLUGIN_LICENSE_GPL,
+ waits_init,
+ 0,
+ 0x0100,
+ NULL,
+ NULL,
+ "1.0",
+ MariaDB_PLUGIN_MATURITY_STABLE
+}
+maria_declare_plugin_end;
diff --git a/sql/threadpool.h b/sql/threadpool.h
index 6299510d002..7d2c34e0363 100644
--- a/sql/threadpool.h
+++ b/sql/threadpool.h
@@ -64,8 +64,6 @@ extern int tp_get_thread_count();
/* Activate threadpool scheduler */
extern void tp_scheduler(void);
-extern int show_threadpool_idle_threads(THD *thd, SHOW_VAR *var, char *buff,
- enum enum_var_type scope);
enum TP_PRIORITY {
TP_PRIORITY_HIGH,
@@ -88,6 +86,8 @@ enum TP_STATE
inside threadpool_win.cc and threadpool_unix.cc
*/
+class CONNECT;
+
struct TP_connection
{
THD* thd;
diff --git a/sql/threadpool_common.cc b/sql/threadpool_common.cc
index 3628ceb9112..0cbd076016d 100644
--- a/sql/threadpool_common.cc
+++ b/sql/threadpool_common.cc
@@ -23,7 +23,7 @@
#include <sql_audit.h>
#include <debug_sync.h>
#include <threadpool.h>
-
+#include <my_counter.h>
/* Threadpool parameters */
@@ -153,9 +153,8 @@ static TP_PRIORITY get_priority(TP_connection *c)
DBUG_ASSERT(c->thd == current_thd);
TP_PRIORITY prio= (TP_PRIORITY)c->thd->variables.threadpool_priority;
if (prio == TP_PRIORITY_AUTO)
- {
- return c->thd->transaction.is_active() ? TP_PRIORITY_HIGH : TP_PRIORITY_LOW;
- }
+ prio= c->thd->transaction.is_active() ? TP_PRIORITY_HIGH : TP_PRIORITY_LOW;
+
return prio;
}
@@ -463,12 +462,17 @@ void tp_timeout_handler(TP_connection *c)
mysql_mutex_unlock(&thd->LOCK_thd_kill);
}
+MY_ALIGNED(CPU_LEVEL1_DCACHE_LINESIZE) Atomic_counter<unsigned long long> tp_waits[THD_WAIT_LAST];
static void tp_wait_begin(THD *thd, int type)
{
TP_connection *c = get_TP_connection(thd);
if (c)
+ {
+ DBUG_ASSERT(type > 0 && type < THD_WAIT_LAST);
+ tp_waits[type]++;
c->wait_begin(type);
+ }
}
diff --git a/sql/threadpool_generic.cc b/sql/threadpool_generic.cc
index 1998b8d281b..3cb7be26eb7 100644
--- a/sql/threadpool_generic.cc
+++ b/sql/threadpool_generic.cc
@@ -13,56 +13,27 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
+#if (defined HAVE_POOL_OF_THREADS) && !defined(EMBEDDED_LIBRARY)
+
+#include "threadpool_generic.h"
#include "mariadb.h"
#include <violite.h>
#include <sql_priv.h>
#include <sql_class.h>
#include <my_pthread.h>
#include <scheduler.h>
-
-#ifdef HAVE_POOL_OF_THREADS
-
-#ifdef _WIN32
-/* AIX may define this, too ?*/
-#define HAVE_IOCP
-#endif
-
-#ifdef HAVE_IOCP
-#define OPTIONAL_IO_POLL_READ_PARAM this
-#else
-#define OPTIONAL_IO_POLL_READ_PARAM 0
-#endif
-
-#ifdef _WIN32
-typedef HANDLE TP_file_handle;
-#else
-typedef int TP_file_handle;
-#define INVALID_HANDLE_VALUE -1
-#endif
-
-
#include <sql_connect.h>
#include <mysqld.h>
#include <debug_sync.h>
#include <time.h>
#include <sql_plist.h>
#include <threadpool.h>
-#include <time.h>
-#ifdef __linux__
-#include <sys/epoll.h>
-typedef struct epoll_event native_event;
-#elif defined(HAVE_KQUEUE)
-#include <sys/event.h>
-typedef struct kevent native_event;
-#elif defined (__sun)
-#include <port.h>
-typedef port_event_t native_event;
-#elif defined (HAVE_IOCP)
-typedef OVERLAPPED_ENTRY native_event;
-#else
-#error threadpool is not available on this platform
-#endif
+#ifdef HAVE_IOCP
+#define OPTIONAL_IO_POLL_READ_PARAM this
+#else
+#define OPTIONAL_IO_POLL_READ_PARAM 0
+#endif
static void io_poll_close(TP_file_handle fd)
{
@@ -119,86 +90,7 @@ static PSI_thread_info thread_list[] =
#define PSI_register(X) /* no-op */
#endif
-
-struct thread_group_t;
-
-/* Per-thread structure for workers */
-struct worker_thread_t
-{
- ulonglong event_count; /* number of request handled by this thread */
- thread_group_t* thread_group;
- worker_thread_t *next_in_list;
- worker_thread_t **prev_in_list;
-
- mysql_cond_t cond;
- bool woken;
-};
-
-typedef I_P_List<worker_thread_t, I_P_List_adapter<worker_thread_t,
- &worker_thread_t::next_in_list,
- &worker_thread_t::prev_in_list>
- >
-worker_list_t;
-
-struct TP_connection_generic:public TP_connection
-{
- TP_connection_generic(CONNECT *c);
- ~TP_connection_generic();
-
- virtual int init(){ return 0; };
- virtual void set_io_timeout(int sec);
- virtual int start_io();
- virtual void wait_begin(int type);
- virtual void wait_end();
-
- thread_group_t *thread_group;
- TP_connection_generic *next_in_queue;
- TP_connection_generic **prev_in_queue;
- ulonglong abs_wait_timeout;
- ulonglong dequeue_time;
- TP_file_handle fd;
- bool bound_to_poll_descriptor;
- int waiting;
-#ifdef HAVE_IOCP
- OVERLAPPED overlapped;
-#endif
-#ifdef _WIN32
- enum_vio_type vio_type;
-#endif
-};
-
-
-typedef I_P_List<TP_connection_generic,
- I_P_List_adapter<TP_connection_generic,
- &TP_connection_generic::next_in_queue,
- &TP_connection_generic::prev_in_queue>,
- I_P_List_null_counter,
- I_P_List_fast_push_back<TP_connection_generic> >
-connection_queue_t;
-
-const int NQUEUES=2; /* We have high and low priority queues*/
-
-struct MY_ALIGNED(CPU_LEVEL1_DCACHE_LINESIZE) thread_group_t
-{
- mysql_mutex_t mutex;
- connection_queue_t queues[NQUEUES];
- worker_list_t waiting_threads;
- worker_thread_t *listener;
- pthread_attr_t *pthread_attr;
- TP_file_handle pollfd;
- int thread_count;
- int active_thread_count;
- int connection_count;
- /* Stats for the deadlock detection timer routine.*/
- int io_event_count;
- int queue_event_count;
- ulonglong last_thread_creation_time;
- int shutdown_pipe[2];
- bool shutdown;
- bool stalled;
-};
-
-static thread_group_t *all_groups;
+thread_group_t *all_groups;
static uint group_count;
static int32 shutdown_group_count;
@@ -224,9 +116,9 @@ static pool_timer_t pool_timer;
static void queue_put(thread_group_t *thread_group, TP_connection_generic *connection);
static void queue_put(thread_group_t *thread_group, native_event *ev, int cnt);
-static int wake_thread(thread_group_t *thread_group);
-static int wake_or_create_thread(thread_group_t *thread_group);
-static int create_worker(thread_group_t *thread_group);
+static int wake_thread(thread_group_t *thread_group,bool due_to_stall);
+static int wake_or_create_thread(thread_group_t *thread_group, bool due_to_stall=false);
+static int create_worker(thread_group_t *thread_group, bool due_to_stall);
static void *worker_main(void *param);
static void check_stall(thread_group_t *thread_group);
static void set_next_timeout_check(ulonglong abstime);
@@ -563,11 +455,11 @@ static void queue_init(thread_group_t *thread_group)
static void queue_put(thread_group_t *thread_group, native_event *ev, int cnt)
{
- ulonglong now= pool_timer.current_microtime;
+ ulonglong now= threadpool_exact_stats?microsecond_interval_timer():pool_timer.current_microtime;
for(int i=0; i < cnt; i++)
{
TP_connection_generic *c = (TP_connection_generic *)native_event_get_userdata(&ev[i]);
- c->dequeue_time= now;
+ c->enqueue_time= now;
thread_group->queues[c->priority].push_back(c);
}
}
@@ -681,7 +573,7 @@ void check_stall(thread_group_t *thread_group)
for (;;)
{
c= thread_group->queues[TP_PRIORITY_LOW].front();
- if (c && pool_timer.current_microtime - c->dequeue_time > 1000ULL * threadpool_prio_kickup_timer)
+ if (c && pool_timer.current_microtime - c->enqueue_time > 1000ULL * threadpool_prio_kickup_timer)
{
thread_group->queues[TP_PRIORITY_LOW].remove(c);
thread_group->queues[TP_PRIORITY_HIGH].push_back(c);
@@ -698,7 +590,7 @@ void check_stall(thread_group_t *thread_group)
*/
if (!thread_group->listener && !thread_group->io_event_count)
{
- wake_or_create_thread(thread_group);
+ wake_or_create_thread(thread_group, true);
mysql_mutex_unlock(&thread_group->mutex);
return;
}
@@ -735,7 +627,8 @@ void check_stall(thread_group_t *thread_group)
if (!is_queue_empty(thread_group) && !thread_group->queue_event_count)
{
thread_group->stalled= true;
- wake_or_create_thread(thread_group);
+ TP_INCREMENT_GROUP_COUNTER(thread_group,stalls);
+ wake_or_create_thread(thread_group,true);
}
/* Reset queue event count */
@@ -790,13 +683,13 @@ static TP_connection_generic * listener(worker_thread_t *current_thread,
break;
cnt = io_poll_wait(thread_group->pollfd, ev, MAX_EVENTS, -1);
-
+ TP_INCREMENT_GROUP_COUNTER(thread_group, polls_by_listener);
if (cnt <=0)
{
DBUG_ASSERT(thread_group->shutdown);
break;
}
-
+
mysql_mutex_lock(&thread_group->mutex);
if (thread_group->shutdown)
@@ -864,7 +757,7 @@ static TP_connection_generic * listener(worker_thread_t *current_thread,
if(thread_group->active_thread_count==0)
{
/* We added some work items to queue, now wake a worker. */
- if(wake_thread(thread_group))
+ if(wake_thread(thread_group, false))
{
/*
Wake failed, hence groups has no idle threads. Now check if there are
@@ -882,7 +775,7 @@ static TP_connection_generic * listener(worker_thread_t *current_thread,
create thread, but waiting for timer would be an inefficient and
pointless delay.
*/
- create_worker(thread_group);
+ create_worker(thread_group, false);
}
}
}
@@ -919,7 +812,7 @@ static void add_thread_count(thread_group_t *thread_group, int32 count)
per group to prevent deadlocks (one listener + one worker)
*/
-static int create_worker(thread_group_t *thread_group)
+static int create_worker(thread_group_t *thread_group, bool due_to_stall)
{
pthread_t thread_id;
bool max_threads_reached= false;
@@ -942,6 +835,11 @@ static int create_worker(thread_group_t *thread_group)
thread_group->last_thread_creation_time=microsecond_interval_timer();
statistic_increment(thread_created,&LOCK_status);
add_thread_count(thread_group, 1);
+ TP_INCREMENT_GROUP_COUNTER(thread_group,thread_creations);
+ if(due_to_stall)
+ {
+ TP_INCREMENT_GROUP_COUNTER(thread_group, thread_creations_due_to_stall);
+ }
}
else
{
@@ -993,15 +891,17 @@ static ulonglong microsecond_throttling_interval(thread_group_t *thread_group)
Worker creation is throttled, so we avoid too many threads
to be created during the short time.
*/
-static int wake_or_create_thread(thread_group_t *thread_group)
+static int wake_or_create_thread(thread_group_t *thread_group, bool due_to_stall)
{
DBUG_ENTER("wake_or_create_thread");
if (thread_group->shutdown)
DBUG_RETURN(0);
- if (wake_thread(thread_group) == 0)
+ if (wake_thread(thread_group, due_to_stall) == 0)
+ {
DBUG_RETURN(0);
+ }
if (thread_group->thread_count > thread_group->connection_count)
DBUG_RETURN(-1);
@@ -1015,7 +915,7 @@ static int wake_or_create_thread(thread_group_t *thread_group)
idle thread to wakeup. Smells like a potential deadlock or very slowly
executing requests, e.g sleeps or user locks.
*/
- DBUG_RETURN(create_worker(thread_group));
+ DBUG_RETURN(create_worker(thread_group, due_to_stall));
}
ulonglong now = microsecond_interval_timer();
@@ -1026,9 +926,10 @@ static int wake_or_create_thread(thread_group_t *thread_group)
if (time_since_last_thread_created >
microsecond_throttling_interval(thread_group))
{
- DBUG_RETURN(create_worker(thread_group));
+ DBUG_RETURN(create_worker(thread_group, due_to_stall));
}
-
+
+ TP_INCREMENT_GROUP_COUNTER(thread_group,throttles);
DBUG_RETURN(-1);
}
@@ -1074,7 +975,7 @@ void thread_group_destroy(thread_group_t *thread_group)
Wake sleeping thread from waiting list
*/
-static int wake_thread(thread_group_t *thread_group)
+static int wake_thread(thread_group_t *thread_group,bool due_to_stall)
{
DBUG_ENTER("wake_thread");
worker_thread_t *thread = thread_group->waiting_threads.front();
@@ -1083,6 +984,11 @@ static int wake_thread(thread_group_t *thread_group)
thread->woken= true;
thread_group->waiting_threads.remove(thread);
mysql_cond_signal(&thread->cond);
+ TP_INCREMENT_GROUP_COUNTER(thread_group, wakes);
+ if (due_to_stall)
+ {
+ TP_INCREMENT_GROUP_COUNTER(thread_group, wakes_due_to_stall);
+ }
DBUG_RETURN(0);
}
DBUG_RETURN(1); /* no thread in waiter list => missed wakeup */
@@ -1140,7 +1046,7 @@ static void thread_group_close(thread_group_t *thread_group)
wake_listener(thread_group);
/* Wake all workers. */
- while(wake_thread(thread_group) == 0)
+ while(wake_thread(thread_group, false) == 0)
{
}
@@ -1224,7 +1130,10 @@ TP_connection_generic *get_event(worker_thread_t *current_thread,
{
connection = queue_get(thread_group);
if(connection)
+ {
+ TP_INCREMENT_GROUP_COUNTER(thread_group,dequeues_by_worker);
break;
+ }
}
/* If there is currently no listener in the group, become one. */
@@ -1235,7 +1144,10 @@ TP_connection_generic *get_event(worker_thread_t *current_thread,
mysql_mutex_unlock(&thread_group->mutex);
connection = listener(current_thread, thread_group);
-
+ if (connection)
+ {
+ TP_INCREMENT_GROUP_COUNTER(thread_group, dequeues_by_listener);
+ }
mysql_mutex_lock(&thread_group->mutex);
thread_group->active_thread_count++;
/* There is no listener anymore, it just returned. */
@@ -1251,9 +1163,9 @@ TP_connection_generic *get_event(worker_thread_t *current_thread,
*/
if (!oversubscribed)
{
-
native_event ev[MAX_EVENTS];
int cnt = io_poll_wait(thread_group->pollfd, ev, MAX_EVENTS, 0);
+ TP_INCREMENT_GROUP_COUNTER(thread_group, polls_by_worker);
if (cnt > 0)
{
queue_put(thread_group, ev, cnt);
@@ -1300,6 +1212,7 @@ TP_connection_generic *get_event(worker_thread_t *current_thread,
}
thread_group->stalled= false;
+
mysql_mutex_unlock(&thread_group->mutex);
DBUG_RETURN(connection);
@@ -1515,7 +1428,7 @@ static int change_group(TP_connection_generic *c,
new_group->connection_count++;
/* Ensure that there is a listener in the new group. */
if (!new_group->thread_count)
- ret= create_worker(new_group);
+ ret= create_worker(new_group, false);
mysql_mutex_unlock(&new_group->mutex);
return ret;
}
@@ -1775,4 +1688,6 @@ static void print_pool_blocked_message(bool max_threads_reached)
}
}
+
+
#endif /* HAVE_POOL_OF_THREADS */
diff --git a/sql/threadpool_generic.h b/sql/threadpool_generic.h
new file mode 100644
index 00000000000..4b83e1d796f
--- /dev/null
+++ b/sql/threadpool_generic.h
@@ -0,0 +1,150 @@
+/* Copyright(C) 2019 MariaDB
+ *
+ * This program is free software; you can redistribute itand /or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; version 2 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
+
+#if defined (HAVE_POOL_OF_THREADS)
+#include <my_global.h>
+#include <sql_plist.h>
+#include <my_pthread.h>
+#include <threadpool.h>
+#include <mysqld.h>
+#include <violite.h>
+
+#ifdef _WIN32
+#include <windows.h>
+/* AIX may define this, too ?*/
+#define HAVE_IOCP
+#endif
+
+
+#ifdef _WIN32
+typedef HANDLE TP_file_handle;
+#else
+typedef int TP_file_handle;
+#define INVALID_HANDLE_VALUE -1
+#endif
+
+#ifdef __linux__
+#include <sys/epoll.h>
+typedef struct epoll_event native_event;
+#elif defined(HAVE_KQUEUE)
+#include <sys/event.h>
+typedef struct kevent native_event;
+#elif defined (__sun)
+#include <port.h>
+typedef port_event_t native_event;
+#elif defined (HAVE_IOCP)
+typedef OVERLAPPED_ENTRY native_event;
+#else
+#error threadpool is not available on this platform
+#endif
+
+struct thread_group_t;
+
+/* Per-thread structure for workers */
+struct worker_thread_t
+{
+ ulonglong event_count; /* number of request handled by this thread */
+ thread_group_t* thread_group;
+ worker_thread_t* next_in_list;
+ worker_thread_t** prev_in_list;
+ mysql_cond_t cond;
+ bool woken;
+};
+
+typedef I_P_List<worker_thread_t, I_P_List_adapter<worker_thread_t,
+ & worker_thread_t::next_in_list,
+ & worker_thread_t::prev_in_list>,
+ I_P_List_counter
+>
+worker_list_t;
+
+struct TP_connection_generic :public TP_connection
+{
+ TP_connection_generic(CONNECT* c);
+ ~TP_connection_generic();
+
+ virtual int init() { return 0; };
+ virtual void set_io_timeout(int sec);
+ virtual int start_io();
+ virtual void wait_begin(int type);
+ virtual void wait_end();
+
+ thread_group_t* thread_group;
+ TP_connection_generic* next_in_queue;
+ TP_connection_generic** prev_in_queue;
+ ulonglong abs_wait_timeout;
+ ulonglong enqueue_time;
+ TP_file_handle fd;
+ bool bound_to_poll_descriptor;
+ int waiting;
+#ifdef HAVE_IOCP
+ OVERLAPPED overlapped;
+#endif
+#ifdef _WIN32
+ enum_vio_type vio_type;
+#endif
+};
+
+
+typedef I_P_List<TP_connection_generic,
+ I_P_List_adapter<TP_connection_generic,
+ & TP_connection_generic::next_in_queue,
+ & TP_connection_generic::prev_in_queue>,
+ I_P_List_counter,
+ I_P_List_fast_push_back<TP_connection_generic> >
+ connection_queue_t;
+
+const int NQUEUES = 2; /* We have high and low priority queues*/
+
+struct thread_group_counters_t
+{
+ ulonglong thread_creations;
+ ulonglong thread_creations_due_to_stall;
+ ulonglong wakes;
+ ulonglong wakes_due_to_stall;
+ ulonglong throttles;
+ ulonglong stalls;
+ ulonglong dequeues_by_worker;
+ ulonglong dequeues_by_listener;
+ ulonglong polls_by_listener;
+ ulonglong polls_by_worker;
+};
+
+struct MY_ALIGNED(CPU_LEVEL1_DCACHE_LINESIZE) thread_group_t
+{
+ mysql_mutex_t mutex;
+ connection_queue_t queues[NQUEUES];
+ worker_list_t waiting_threads;
+ worker_thread_t* listener;
+ pthread_attr_t* pthread_attr;
+ TP_file_handle pollfd;
+ int thread_count;
+ int active_thread_count;
+ int connection_count;
+ /* Stats for the deadlock detection timer routine.*/
+ int io_event_count;
+ int queue_event_count;
+ ulonglong last_thread_creation_time;
+ int shutdown_pipe[2];
+ bool shutdown;
+ bool stalled;
+ thread_group_counters_t counters;
+};
+
+#define TP_INCREMENT_GROUP_COUNTER(group,var) group->counters.var++;
+
+extern thread_group_t* all_groups;
+#endif
+