summaryrefslogtreecommitdiff
path: root/sql/threadpool_generic.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/threadpool_generic.cc')
-rw-r--r--sql/threadpool_generic.cc228
1 files changed, 72 insertions, 156 deletions
diff --git a/sql/threadpool_generic.cc b/sql/threadpool_generic.cc
index eb92846ed07..768dbab4e6b 100644
--- a/sql/threadpool_generic.cc
+++ b/sql/threadpool_generic.cc
@@ -13,56 +13,28 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
+#if (defined HAVE_POOL_OF_THREADS) && !defined(EMBEDDED_LIBRARY)
+
+#include "threadpool_generic.h"
#include "mariadb.h"
#include <violite.h>
#include <sql_priv.h>
#include <sql_class.h>
#include <my_pthread.h>
#include <scheduler.h>
-
-#ifdef HAVE_POOL_OF_THREADS
-
-#ifdef _WIN32
-/* AIX may define this, too ?*/
-#define HAVE_IOCP
-#endif
-
-#ifdef HAVE_IOCP
-#define OPTIONAL_IO_POLL_READ_PARAM this
-#else
-#define OPTIONAL_IO_POLL_READ_PARAM 0
-#endif
-
-#ifdef _WIN32
-typedef HANDLE TP_file_handle;
-#else
-typedef int TP_file_handle;
-#define INVALID_HANDLE_VALUE -1
-#endif
-
-
#include <sql_connect.h>
#include <mysqld.h>
#include <debug_sync.h>
#include <time.h>
#include <sql_plist.h>
#include <threadpool.h>
-#include <time.h>
-#ifdef __linux__
-#include <sys/epoll.h>
-typedef struct epoll_event native_event;
-#elif defined(HAVE_KQUEUE)
-#include <sys/event.h>
-typedef struct kevent native_event;
-#elif defined (__sun)
-#include <port.h>
-typedef port_event_t native_event;
-#elif defined (HAVE_IOCP)
-typedef OVERLAPPED_ENTRY native_event;
-#else
-#error threadpool is not available on this platform
-#endif
+#include <algorithm>
+#ifdef HAVE_IOCP
+#define OPTIONAL_IO_POLL_READ_PARAM this
+#else
+#define OPTIONAL_IO_POLL_READ_PARAM 0
+#endif
static void io_poll_close(TP_file_handle fd)
{
@@ -119,86 +91,7 @@ static PSI_thread_info thread_list[] =
#define PSI_register(X) /* no-op */
#endif
-
-struct thread_group_t;
-
-/* Per-thread structure for workers */
-struct worker_thread_t
-{
- ulonglong event_count; /* number of request handled by this thread */
- thread_group_t* thread_group;
- worker_thread_t *next_in_list;
- worker_thread_t **prev_in_list;
-
- mysql_cond_t cond;
- bool woken;
-};
-
-typedef I_P_List<worker_thread_t, I_P_List_adapter<worker_thread_t,
- &worker_thread_t::next_in_list,
- &worker_thread_t::prev_in_list>
- >
-worker_list_t;
-
-struct TP_connection_generic:public TP_connection
-{
- TP_connection_generic(CONNECT *c);
- ~TP_connection_generic();
-
- virtual int init(){ return 0; };
- virtual void set_io_timeout(int sec);
- virtual int start_io();
- virtual void wait_begin(int type);
- virtual void wait_end();
-
- thread_group_t *thread_group;
- TP_connection_generic *next_in_queue;
- TP_connection_generic **prev_in_queue;
- ulonglong abs_wait_timeout;
- ulonglong dequeue_time;
- TP_file_handle fd;
- bool bound_to_poll_descriptor;
- int waiting;
-#ifdef HAVE_IOCP
- OVERLAPPED overlapped;
-#endif
-#ifdef _WIN32
- enum_vio_type vio_type;
-#endif
-};
-
-
-typedef I_P_List<TP_connection_generic,
- I_P_List_adapter<TP_connection_generic,
- &TP_connection_generic::next_in_queue,
- &TP_connection_generic::prev_in_queue>,
- I_P_List_null_counter,
- I_P_List_fast_push_back<TP_connection_generic> >
-connection_queue_t;
-
-const int NQUEUES=2; /* We have high and low priority queues*/
-
-struct MY_ALIGNED(CPU_LEVEL1_DCACHE_LINESIZE) thread_group_t
-{
- mysql_mutex_t mutex;
- connection_queue_t queues[NQUEUES];
- worker_list_t waiting_threads;
- worker_thread_t *listener;
- pthread_attr_t *pthread_attr;
- TP_file_handle pollfd;
- int thread_count;
- int active_thread_count;
- int connection_count;
- /* Stats for the deadlock detection timer routine.*/
- int io_event_count;
- int queue_event_count;
- ulonglong last_thread_creation_time;
- int shutdown_pipe[2];
- bool shutdown;
- bool stalled;
-};
-
-static thread_group_t *all_groups;
+thread_group_t *all_groups;
static uint group_count;
static int32 shutdown_group_count;
@@ -224,9 +117,9 @@ static pool_timer_t pool_timer;
static void queue_put(thread_group_t *thread_group, TP_connection_generic *connection);
static void queue_put(thread_group_t *thread_group, native_event *ev, int cnt);
-static int wake_thread(thread_group_t *thread_group);
-static int wake_or_create_thread(thread_group_t *thread_group);
-static int create_worker(thread_group_t *thread_group);
+static int wake_thread(thread_group_t *thread_group,bool due_to_stall);
+static int wake_or_create_thread(thread_group_t *thread_group, bool due_to_stall=false);
+static int create_worker(thread_group_t *thread_group, bool due_to_stall);
static void *worker_main(void *param);
static void check_stall(thread_group_t *thread_group);
static void set_next_timeout_check(ulonglong abstime);
@@ -563,11 +456,11 @@ static void queue_init(thread_group_t *thread_group)
static void queue_put(thread_group_t *thread_group, native_event *ev, int cnt)
{
- ulonglong now= pool_timer.current_microtime;
+ ulonglong now= threadpool_exact_stats?microsecond_interval_timer():pool_timer.current_microtime;
for(int i=0; i < cnt; i++)
{
TP_connection_generic *c = (TP_connection_generic *)native_event_get_userdata(&ev[i]);
- c->dequeue_time= now;
+ c->enqueue_time= now;
thread_group->queues[c->priority].push_back(c);
}
}
@@ -681,7 +574,7 @@ void check_stall(thread_group_t *thread_group)
for (;;)
{
c= thread_group->queues[TP_PRIORITY_LOW].front();
- if (c && pool_timer.current_microtime - c->dequeue_time > 1000ULL * threadpool_prio_kickup_timer)
+ if (c && pool_timer.current_microtime - c->enqueue_time > 1000ULL * threadpool_prio_kickup_timer)
{
thread_group->queues[TP_PRIORITY_LOW].remove(c);
thread_group->queues[TP_PRIORITY_HIGH].push_back(c);
@@ -698,7 +591,7 @@ void check_stall(thread_group_t *thread_group)
*/
if (!thread_group->listener && !thread_group->io_event_count)
{
- wake_or_create_thread(thread_group);
+ wake_or_create_thread(thread_group, true);
mysql_mutex_unlock(&thread_group->mutex);
return;
}
@@ -735,7 +628,8 @@ void check_stall(thread_group_t *thread_group)
if (!is_queue_empty(thread_group) && !thread_group->queue_event_count)
{
thread_group->stalled= true;
- wake_or_create_thread(thread_group);
+ TP_INCREMENT_GROUP_COUNTER(thread_group,stalls);
+ wake_or_create_thread(thread_group,true);
}
/* Reset queue event count */
@@ -790,13 +684,13 @@ static TP_connection_generic * listener(worker_thread_t *current_thread,
break;
cnt = io_poll_wait(thread_group->pollfd, ev, MAX_EVENTS, -1);
-
+ TP_INCREMENT_GROUP_COUNTER(thread_group, polls_by_listener);
if (cnt <=0)
{
DBUG_ASSERT(thread_group->shutdown);
break;
}
-
+
mysql_mutex_lock(&thread_group->mutex);
if (thread_group->shutdown)
@@ -851,7 +745,7 @@ static TP_connection_generic * listener(worker_thread_t *current_thread,
more workers.
*/
- bool listener_picks_event=is_queue_empty(thread_group);
+ bool listener_picks_event=is_queue_empty(thread_group) && !threadpool_dedicated_listener;
queue_put(thread_group, ev, cnt);
if (listener_picks_event)
{
@@ -864,7 +758,7 @@ static TP_connection_generic * listener(worker_thread_t *current_thread,
if(thread_group->active_thread_count==0)
{
/* We added some work items to queue, now wake a worker. */
- if(wake_thread(thread_group))
+ if(wake_thread(thread_group, false))
{
/*
Wake failed, hence groups has no idle threads. Now check if there are
@@ -882,7 +776,7 @@ static TP_connection_generic * listener(worker_thread_t *current_thread,
create thread, but waiting for timer would be an inefficient and
pointless delay.
*/
- create_worker(thread_group);
+ create_worker(thread_group, false);
}
}
}
@@ -919,7 +813,7 @@ static void add_thread_count(thread_group_t *thread_group, int32 count)
per group to prevent deadlocks (one listener + one worker)
*/
-static int create_worker(thread_group_t *thread_group)
+static int create_worker(thread_group_t *thread_group, bool due_to_stall)
{
pthread_t thread_id;
bool max_threads_reached= false;
@@ -942,6 +836,11 @@ static int create_worker(thread_group_t *thread_group)
thread_group->last_thread_creation_time=microsecond_interval_timer();
statistic_increment(thread_created,&LOCK_status);
add_thread_count(thread_group, 1);
+ TP_INCREMENT_GROUP_COUNTER(thread_group,thread_creations);
+ if(due_to_stall)
+ {
+ TP_INCREMENT_GROUP_COUNTER(thread_group, thread_creations_due_to_stall);
+ }
}
else
{
@@ -967,23 +866,24 @@ end:
The actual values were not calculated using any scientific methods.
They just look right, and behave well in practice.
-
- TODO: Should throttling depend on thread_pool_stall_limit?
*/
+
+#define THROTTLING_FACTOR (threadpool_stall_limit/std::max(DEFAULT_THREADPOOL_STALL_LIMIT,threadpool_stall_limit))
+
static ulonglong microsecond_throttling_interval(thread_group_t *thread_group)
{
int count= thread_group->thread_count;
- if (count < 4)
+ if (count < 1+ (int)threadpool_oversubscribe)
return 0;
-
+
if (count < 8)
- return 50*1000;
-
+ return 50*1000*THROTTLING_FACTOR;
+
if(count < 16)
- return 100*1000;
-
- return 200*1000;
+ return 100*1000*THROTTLING_FACTOR;
+
+ return 200*100*THROTTLING_FACTOR;
}
@@ -993,15 +893,17 @@ static ulonglong microsecond_throttling_interval(thread_group_t *thread_group)
Worker creation is throttled, so we avoid too many threads
to be created during the short time.
*/
-static int wake_or_create_thread(thread_group_t *thread_group)
+static int wake_or_create_thread(thread_group_t *thread_group, bool due_to_stall)
{
DBUG_ENTER("wake_or_create_thread");
if (thread_group->shutdown)
DBUG_RETURN(0);
- if (wake_thread(thread_group) == 0)
+ if (wake_thread(thread_group, due_to_stall) == 0)
+ {
DBUG_RETURN(0);
+ }
if (thread_group->thread_count > thread_group->connection_count)
DBUG_RETURN(-1);
@@ -1015,7 +917,7 @@ static int wake_or_create_thread(thread_group_t *thread_group)
idle thread to wakeup. Smells like a potential deadlock or very slowly
executing requests, e.g sleeps or user locks.
*/
- DBUG_RETURN(create_worker(thread_group));
+ DBUG_RETURN(create_worker(thread_group, due_to_stall));
}
ulonglong now = microsecond_interval_timer();
@@ -1026,9 +928,10 @@ static int wake_or_create_thread(thread_group_t *thread_group)
if (time_since_last_thread_created >
microsecond_throttling_interval(thread_group))
{
- DBUG_RETURN(create_worker(thread_group));
+ DBUG_RETURN(create_worker(thread_group, due_to_stall));
}
-
+
+ TP_INCREMENT_GROUP_COUNTER(thread_group,throttles);
DBUG_RETURN(-1);
}
@@ -1074,7 +977,7 @@ void thread_group_destroy(thread_group_t *thread_group)
Wake sleeping thread from waiting list
*/
-static int wake_thread(thread_group_t *thread_group)
+static int wake_thread(thread_group_t *thread_group,bool due_to_stall)
{
DBUG_ENTER("wake_thread");
worker_thread_t *thread = thread_group->waiting_threads.front();
@@ -1083,6 +986,11 @@ static int wake_thread(thread_group_t *thread_group)
thread->woken= true;
thread_group->waiting_threads.remove(thread);
mysql_cond_signal(&thread->cond);
+ TP_INCREMENT_GROUP_COUNTER(thread_group, wakes);
+ if (due_to_stall)
+ {
+ TP_INCREMENT_GROUP_COUNTER(thread_group, wakes_due_to_stall);
+ }
DBUG_RETURN(0);
}
DBUG_RETURN(1); /* no thread in waiter list => missed wakeup */
@@ -1140,7 +1048,7 @@ static void thread_group_close(thread_group_t *thread_group)
wake_listener(thread_group);
/* Wake all workers. */
- while(wake_thread(thread_group) == 0)
+ while(wake_thread(thread_group, false) == 0)
{
}
@@ -1162,7 +1070,7 @@ static void queue_put(thread_group_t *thread_group, TP_connection_generic *conne
{
DBUG_ENTER("queue_put");
- connection->dequeue_time= pool_timer.current_microtime;
+ connection->enqueue_time= threadpool_exact_stats?microsecond_interval_timer():pool_timer.current_microtime;
thread_group->queues[connection->priority].push_back(connection);
if (thread_group->active_thread_count == 0)
@@ -1224,7 +1132,10 @@ TP_connection_generic *get_event(worker_thread_t *current_thread,
{
connection = queue_get(thread_group);
if(connection)
+ {
+ TP_INCREMENT_GROUP_COUNTER(thread_group,dequeues_by_worker);
break;
+ }
}
/* If there is currently no listener in the group, become one. */
@@ -1235,7 +1146,10 @@ TP_connection_generic *get_event(worker_thread_t *current_thread,
mysql_mutex_unlock(&thread_group->mutex);
connection = listener(current_thread, thread_group);
-
+ if (connection)
+ {
+ TP_INCREMENT_GROUP_COUNTER(thread_group, dequeues_by_listener);
+ }
mysql_mutex_lock(&thread_group->mutex);
thread_group->active_thread_count++;
/* There is no listener anymore, it just returned. */
@@ -1251,9 +1165,9 @@ TP_connection_generic *get_event(worker_thread_t *current_thread,
*/
if (!oversubscribed)
{
-
native_event ev[MAX_EVENTS];
int cnt = io_poll_wait(thread_group->pollfd, ev, MAX_EVENTS, 0);
+ TP_INCREMENT_GROUP_COUNTER(thread_group, polls_by_worker);
if (cnt > 0)
{
queue_put(thread_group, ev, cnt);
@@ -1300,6 +1214,7 @@ TP_connection_generic *get_event(worker_thread_t *current_thread,
}
thread_group->stalled= false;
+
mysql_mutex_unlock(&thread_group->mutex);
DBUG_RETURN(connection);
@@ -1433,14 +1348,13 @@ TP_connection_generic::TP_connection_generic(CONNECT *c):
, overlapped()
#endif
{
- DBUG_ASSERT(c->vio);
+ DBUG_ASSERT(c->vio_type != VIO_CLOSED);
#ifdef _WIN32
- vio_type= c->vio->type;
- fd= (vio_type == VIO_TYPE_NAMEDPIPE) ?
- c->vio->hPipe: (TP_file_handle)mysql_socket_getfd(c->vio->mysql_socket);
+ fd= (c->vio_type == VIO_TYPE_NAMEDPIPE) ?
+ c->pipe: (TP_file_handle) mysql_socket_getfd(c->sock);
#else
- fd= mysql_socket_getfd(c->vio->mysql_socket);
+ fd= mysql_socket_getfd(c->sock);
#endif
/* Assign connection to a group. */
@@ -1516,7 +1430,7 @@ static int change_group(TP_connection_generic *c,
new_group->connection_count++;
/* Ensure that there is a listener in the new group. */
if (!new_group->thread_count)
- ret= create_worker(new_group);
+ ret= create_worker(new_group, false);
mysql_mutex_unlock(&new_group->mutex);
return ret;
}
@@ -1776,4 +1690,6 @@ static void print_pool_blocked_message(bool max_threads_reached)
}
}
+
+
#endif /* HAVE_POOL_OF_THREADS */