diff options
author | Vladislav Vaintroub <wlad@mariadb.com> | 2019-05-26 13:25:12 +0200 |
---|---|---|
committer | Vladislav Vaintroub <wlad@mariadb.com> | 2019-05-26 19:20:35 +0200 |
commit | 2fc13d04d16f878ed693ad8ba56045b79ccb9650 (patch) | |
tree | 23cbb2dbe934d4879cae668b0164dbec0ba71503 /sql/threadpool_generic.cc | |
parent | 5f18bd3a3545bae97956fbbf4b29d2e9288a6505 (diff) | |
download | mariadb-git-2fc13d04d16f878ed693ad8ba56045b79ccb9650.tar.gz |
MDEV-19313 Threadpool : provide information schema tables for internals of generic threadpool
Added thread_pool_groups, thread_pool_queues, thread_pool_waits and
thread_pool_stats tables to information_schema.
Diffstat (limited to 'sql/threadpool_generic.cc')
-rw-r--r-- | sql/threadpool_generic.cc | 195 |
1 files changed, 55 insertions, 140 deletions
diff --git a/sql/threadpool_generic.cc b/sql/threadpool_generic.cc index 1998b8d281b..3cb7be26eb7 100644 --- a/sql/threadpool_generic.cc +++ b/sql/threadpool_generic.cc @@ -13,56 +13,27 @@ along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */ +#if (defined HAVE_POOL_OF_THREADS) && !defined(EMBEDDED_LIBRARY) + +#include "threadpool_generic.h" #include "mariadb.h" #include <violite.h> #include <sql_priv.h> #include <sql_class.h> #include <my_pthread.h> #include <scheduler.h> - -#ifdef HAVE_POOL_OF_THREADS - -#ifdef _WIN32 -/* AIX may define this, too ?*/ -#define HAVE_IOCP -#endif - -#ifdef HAVE_IOCP -#define OPTIONAL_IO_POLL_READ_PARAM this -#else -#define OPTIONAL_IO_POLL_READ_PARAM 0 -#endif - -#ifdef _WIN32 -typedef HANDLE TP_file_handle; -#else -typedef int TP_file_handle; -#define INVALID_HANDLE_VALUE -1 -#endif - - #include <sql_connect.h> #include <mysqld.h> #include <debug_sync.h> #include <time.h> #include <sql_plist.h> #include <threadpool.h> -#include <time.h> -#ifdef __linux__ -#include <sys/epoll.h> -typedef struct epoll_event native_event; -#elif defined(HAVE_KQUEUE) -#include <sys/event.h> -typedef struct kevent native_event; -#elif defined (__sun) -#include <port.h> -typedef port_event_t native_event; -#elif defined (HAVE_IOCP) -typedef OVERLAPPED_ENTRY native_event; -#else -#error threadpool is not available on this platform -#endif +#ifdef HAVE_IOCP +#define OPTIONAL_IO_POLL_READ_PARAM this +#else +#define OPTIONAL_IO_POLL_READ_PARAM 0 +#endif static void io_poll_close(TP_file_handle fd) { @@ -119,86 +90,7 @@ static PSI_thread_info thread_list[] = #define PSI_register(X) /* no-op */ #endif - -struct thread_group_t; - -/* Per-thread structure for workers */ -struct worker_thread_t -{ - ulonglong event_count; /* number of request handled by this thread */ - thread_group_t* thread_group; - worker_thread_t *next_in_list; - worker_thread_t **prev_in_list; - - mysql_cond_t cond; - bool woken; -}; - -typedef I_P_List<worker_thread_t, I_P_List_adapter<worker_thread_t, - &worker_thread_t::next_in_list, - &worker_thread_t::prev_in_list> - > -worker_list_t; - -struct TP_connection_generic:public TP_connection -{ - TP_connection_generic(CONNECT *c); - ~TP_connection_generic(); - - virtual int init(){ return 0; }; - virtual void set_io_timeout(int sec); - virtual int start_io(); - virtual void wait_begin(int type); - virtual void wait_end(); - - thread_group_t *thread_group; - TP_connection_generic *next_in_queue; - TP_connection_generic **prev_in_queue; - ulonglong abs_wait_timeout; - ulonglong dequeue_time; - TP_file_handle fd; - bool bound_to_poll_descriptor; - int waiting; -#ifdef HAVE_IOCP - OVERLAPPED overlapped; -#endif -#ifdef _WIN32 - enum_vio_type vio_type; -#endif -}; - - -typedef I_P_List<TP_connection_generic, - I_P_List_adapter<TP_connection_generic, - &TP_connection_generic::next_in_queue, - &TP_connection_generic::prev_in_queue>, - I_P_List_null_counter, - I_P_List_fast_push_back<TP_connection_generic> > -connection_queue_t; - -const int NQUEUES=2; /* We have high and low priority queues*/ - -struct MY_ALIGNED(CPU_LEVEL1_DCACHE_LINESIZE) thread_group_t -{ - mysql_mutex_t mutex; - connection_queue_t queues[NQUEUES]; - worker_list_t waiting_threads; - worker_thread_t *listener; - pthread_attr_t *pthread_attr; - TP_file_handle pollfd; - int thread_count; - int active_thread_count; - int connection_count; - /* Stats for the deadlock detection timer routine.*/ - int io_event_count; - int queue_event_count; - ulonglong last_thread_creation_time; - int shutdown_pipe[2]; - bool shutdown; - bool stalled; -}; - -static thread_group_t *all_groups; +thread_group_t *all_groups; static uint group_count; static int32 shutdown_group_count; @@ -224,9 +116,9 @@ static pool_timer_t pool_timer; static void queue_put(thread_group_t *thread_group, TP_connection_generic *connection); static void queue_put(thread_group_t *thread_group, native_event *ev, int cnt); -static int wake_thread(thread_group_t *thread_group); -static int wake_or_create_thread(thread_group_t *thread_group); -static int create_worker(thread_group_t *thread_group); +static int wake_thread(thread_group_t *thread_group,bool due_to_stall); +static int wake_or_create_thread(thread_group_t *thread_group, bool due_to_stall=false); +static int create_worker(thread_group_t *thread_group, bool due_to_stall); static void *worker_main(void *param); static void check_stall(thread_group_t *thread_group); static void set_next_timeout_check(ulonglong abstime); @@ -563,11 +455,11 @@ static void queue_init(thread_group_t *thread_group) static void queue_put(thread_group_t *thread_group, native_event *ev, int cnt) { - ulonglong now= pool_timer.current_microtime; + ulonglong now= threadpool_exact_stats?microsecond_interval_timer():pool_timer.current_microtime; for(int i=0; i < cnt; i++) { TP_connection_generic *c = (TP_connection_generic *)native_event_get_userdata(&ev[i]); - c->dequeue_time= now; + c->enqueue_time= now; thread_group->queues[c->priority].push_back(c); } } @@ -681,7 +573,7 @@ void check_stall(thread_group_t *thread_group) for (;;) { c= thread_group->queues[TP_PRIORITY_LOW].front(); - if (c && pool_timer.current_microtime - c->dequeue_time > 1000ULL * threadpool_prio_kickup_timer) + if (c && pool_timer.current_microtime - c->enqueue_time > 1000ULL * threadpool_prio_kickup_timer) { thread_group->queues[TP_PRIORITY_LOW].remove(c); thread_group->queues[TP_PRIORITY_HIGH].push_back(c); @@ -698,7 +590,7 @@ void check_stall(thread_group_t *thread_group) */ if (!thread_group->listener && !thread_group->io_event_count) { - wake_or_create_thread(thread_group); + wake_or_create_thread(thread_group, true); mysql_mutex_unlock(&thread_group->mutex); return; } @@ -735,7 +627,8 @@ void check_stall(thread_group_t *thread_group) if (!is_queue_empty(thread_group) && !thread_group->queue_event_count) { thread_group->stalled= true; - wake_or_create_thread(thread_group); + TP_INCREMENT_GROUP_COUNTER(thread_group,stalls); + wake_or_create_thread(thread_group,true); } /* Reset queue event count */ @@ -790,13 +683,13 @@ static TP_connection_generic * listener(worker_thread_t *current_thread, break; cnt = io_poll_wait(thread_group->pollfd, ev, MAX_EVENTS, -1); - + TP_INCREMENT_GROUP_COUNTER(thread_group, polls_by_listener); if (cnt <=0) { DBUG_ASSERT(thread_group->shutdown); break; } - + mysql_mutex_lock(&thread_group->mutex); if (thread_group->shutdown) @@ -864,7 +757,7 @@ static TP_connection_generic * listener(worker_thread_t *current_thread, if(thread_group->active_thread_count==0) { /* We added some work items to queue, now wake a worker. */ - if(wake_thread(thread_group)) + if(wake_thread(thread_group, false)) { /* Wake failed, hence groups has no idle threads. Now check if there are @@ -882,7 +775,7 @@ static TP_connection_generic * listener(worker_thread_t *current_thread, create thread, but waiting for timer would be an inefficient and pointless delay. */ - create_worker(thread_group); + create_worker(thread_group, false); } } } @@ -919,7 +812,7 @@ static void add_thread_count(thread_group_t *thread_group, int32 count) per group to prevent deadlocks (one listener + one worker) */ -static int create_worker(thread_group_t *thread_group) +static int create_worker(thread_group_t *thread_group, bool due_to_stall) { pthread_t thread_id; bool max_threads_reached= false; @@ -942,6 +835,11 @@ static int create_worker(thread_group_t *thread_group) thread_group->last_thread_creation_time=microsecond_interval_timer(); statistic_increment(thread_created,&LOCK_status); add_thread_count(thread_group, 1); + TP_INCREMENT_GROUP_COUNTER(thread_group,thread_creations); + if(due_to_stall) + { + TP_INCREMENT_GROUP_COUNTER(thread_group, thread_creations_due_to_stall); + } } else { @@ -993,15 +891,17 @@ static ulonglong microsecond_throttling_interval(thread_group_t *thread_group) Worker creation is throttled, so we avoid too many threads to be created during the short time. */ -static int wake_or_create_thread(thread_group_t *thread_group) +static int wake_or_create_thread(thread_group_t *thread_group, bool due_to_stall) { DBUG_ENTER("wake_or_create_thread"); if (thread_group->shutdown) DBUG_RETURN(0); - if (wake_thread(thread_group) == 0) + if (wake_thread(thread_group, due_to_stall) == 0) + { DBUG_RETURN(0); + } if (thread_group->thread_count > thread_group->connection_count) DBUG_RETURN(-1); @@ -1015,7 +915,7 @@ static int wake_or_create_thread(thread_group_t *thread_group) idle thread to wakeup. Smells like a potential deadlock or very slowly executing requests, e.g sleeps or user locks. */ - DBUG_RETURN(create_worker(thread_group)); + DBUG_RETURN(create_worker(thread_group, due_to_stall)); } ulonglong now = microsecond_interval_timer(); @@ -1026,9 +926,10 @@ static int wake_or_create_thread(thread_group_t *thread_group) if (time_since_last_thread_created > microsecond_throttling_interval(thread_group)) { - DBUG_RETURN(create_worker(thread_group)); + DBUG_RETURN(create_worker(thread_group, due_to_stall)); } - + + TP_INCREMENT_GROUP_COUNTER(thread_group,throttles); DBUG_RETURN(-1); } @@ -1074,7 +975,7 @@ void thread_group_destroy(thread_group_t *thread_group) Wake sleeping thread from waiting list */ -static int wake_thread(thread_group_t *thread_group) +static int wake_thread(thread_group_t *thread_group,bool due_to_stall) { DBUG_ENTER("wake_thread"); worker_thread_t *thread = thread_group->waiting_threads.front(); @@ -1083,6 +984,11 @@ static int wake_thread(thread_group_t *thread_group) thread->woken= true; thread_group->waiting_threads.remove(thread); mysql_cond_signal(&thread->cond); + TP_INCREMENT_GROUP_COUNTER(thread_group, wakes); + if (due_to_stall) + { + TP_INCREMENT_GROUP_COUNTER(thread_group, wakes_due_to_stall); + } DBUG_RETURN(0); } DBUG_RETURN(1); /* no thread in waiter list => missed wakeup */ @@ -1140,7 +1046,7 @@ static void thread_group_close(thread_group_t *thread_group) wake_listener(thread_group); /* Wake all workers. */ - while(wake_thread(thread_group) == 0) + while(wake_thread(thread_group, false) == 0) { } @@ -1224,7 +1130,10 @@ TP_connection_generic *get_event(worker_thread_t *current_thread, { connection = queue_get(thread_group); if(connection) + { + TP_INCREMENT_GROUP_COUNTER(thread_group,dequeues_by_worker); break; + } } /* If there is currently no listener in the group, become one. */ @@ -1235,7 +1144,10 @@ TP_connection_generic *get_event(worker_thread_t *current_thread, mysql_mutex_unlock(&thread_group->mutex); connection = listener(current_thread, thread_group); - + if (connection) + { + TP_INCREMENT_GROUP_COUNTER(thread_group, dequeues_by_listener); + } mysql_mutex_lock(&thread_group->mutex); thread_group->active_thread_count++; /* There is no listener anymore, it just returned. */ @@ -1251,9 +1163,9 @@ TP_connection_generic *get_event(worker_thread_t *current_thread, */ if (!oversubscribed) { - native_event ev[MAX_EVENTS]; int cnt = io_poll_wait(thread_group->pollfd, ev, MAX_EVENTS, 0); + TP_INCREMENT_GROUP_COUNTER(thread_group, polls_by_worker); if (cnt > 0) { queue_put(thread_group, ev, cnt); @@ -1300,6 +1212,7 @@ TP_connection_generic *get_event(worker_thread_t *current_thread, } thread_group->stalled= false; + mysql_mutex_unlock(&thread_group->mutex); DBUG_RETURN(connection); @@ -1515,7 +1428,7 @@ static int change_group(TP_connection_generic *c, new_group->connection_count++; /* Ensure that there is a listener in the new group. */ if (!new_group->thread_count) - ret= create_worker(new_group); + ret= create_worker(new_group, false); mysql_mutex_unlock(&new_group->mutex); return ret; } @@ -1775,4 +1688,6 @@ static void print_pool_blocked_message(bool max_threads_reached) } } + + #endif /* HAVE_POOL_OF_THREADS */ |