summaryrefslogtreecommitdiff
path: root/sql/threadpool_unix.cc
diff options
context:
space:
mode:
authorVladislav Vaintroub <wlad@montyprogram.com>2012-01-26 04:35:54 +0100
committerVladislav Vaintroub <wlad@montyprogram.com>2012-01-26 04:35:54 +0100
commit57b6cb39aa268a49ad05f86025386ddde6516670 (patch)
treef42cc3ac4dfd9b7d45b57c3126b19755e604ec62 /sql/threadpool_unix.cc
parent7ed6530a066ab1e1659c39ee614e503462d8d403 (diff)
downloadmariadb-git-57b6cb39aa268a49ad05f86025386ddde6516670.tar.gz
Further review points and simplify Windows implementation
Diffstat (limited to 'sql/threadpool_unix.cc')
-rw-r--r--sql/threadpool_unix.cc132
1 files changed, 71 insertions, 61 deletions
diff --git a/sql/threadpool_unix.cc b/sql/threadpool_unix.cc
index 5dcc9d4420c..9dc3739dfb4 100644
--- a/sql/threadpool_unix.cc
+++ b/sql/threadpool_unix.cc
@@ -206,6 +206,7 @@ static void print_pool_blocked_message(bool);
or io_poll_start_read() becomes readable. Data associated with
descriptors can be retrieved from native_events array, using
native_event_get_userdata() function.
+
On Linux: epoll_wait()
*/
@@ -248,6 +249,11 @@ int io_poll_disassociate_fd(int pollfd, int fd)
}
+/*
+ Wrapper around epoll_wait.
+ NOTE - in case of EINTR, it restarts with original timeout. Since we use
+ either infinite or 0 timeouts, this is not critical
+*/
int io_poll_wait(int pollfd, native_event *native_events, int maxevents,
int timeout_ms)
{
@@ -260,6 +266,7 @@ int io_poll_wait(int pollfd, native_event *native_events, int maxevents,
return ret;
}
+
static void *native_event_get_userdata(native_event *event)
{
return event->data.ptr;
@@ -364,8 +371,10 @@ int io_poll_wait(int pollfd, native_event *events, int maxevents, int timeout_ms
(timeout_ms >= 0)?&ts:NULL);
}
while (ret == -1 && errno == EINTR);
- return nget;
+ DBUG_ASSERT(nget < INT_MAX);
+ return (int)nget;
}
+
static void* native_event_get_userdata(native_event *event)
{
return event->portev_user;
@@ -375,9 +384,8 @@ static void* native_event_get_userdata(native_event *event)
#endif
-
-
/* Dequeue element from a workqueue */
+
static connection_t *queue_get(thread_group_t *thread_group)
{
DBUG_ENTER("queue_get");
@@ -391,12 +399,12 @@ static connection_t *queue_get(thread_group_t *thread_group)
}
-
/*
Handle wait timeout :
Find connections that have been idle for too long and kill them.
Also, recalculate time when next timeout check should run.
*/
+
static void timeout_check(pool_timer_t *timer)
{
DBUG_ENTER("timeout_check");
@@ -418,7 +426,7 @@ static void timeout_check(pool_timer_t *timer)
{
/*
Connection does not have scheduler data. This happens for example
- if THD belongs to another scheduler, that is listening to extra_port.
+ if THD belongs to a different scheduler, that is listening to extra_port.
*/
continue;
}
@@ -458,18 +466,18 @@ static void timeout_check(pool_timer_t *timer)
static void* timer_thread(void *param)
{
uint i;
-
pool_timer_t* timer=(pool_timer_t *)param;
- timer->next_timeout_check= ULONGLONG_MAX;
- timer->current_microtime= microsecond_interval_timer();
-
+
my_thread_init();
DBUG_ENTER("timer_thread");
-
+ timer->next_timeout_check= ULONGLONG_MAX;
+ timer->current_microtime= microsecond_interval_timer();
+
for(;;)
{
struct timespec ts;
int err;
+
set_timespec_nsec(ts,timer->tick_interval*1000000);
mysql_mutex_lock(&timer->mutex);
err= mysql_cond_timedwait(&timer->cond, &timer->mutex, &ts);
@@ -543,7 +551,6 @@ void check_stall(thread_group_t *thread_group)
}
-
static void start_timer(pool_timer_t* timer)
{
pthread_t thread_id;
@@ -555,6 +562,7 @@ static void start_timer(pool_timer_t* timer)
DBUG_VOID_RETURN;
}
+
static void stop_timer(pool_timer_t *timer)
{
DBUG_ENTER("stop_timer");
@@ -664,50 +672,50 @@ static connection_t * listener(worker_thread_t *current_thread,
thread_group->queue.push_back(c);
}
-
- if(thread_group->active_thread_count==0 && !listener_picks_event)
+ if (listener_picks_event)
{
- /* Wake one worker thread */
+ /* Handle the first event. */
+ retval= (connection_t *)native_event_get_userdata(&ev[0]);
+ mysql_mutex_unlock(&thread_group->mutex);
+ break;
+ }
+
+ if(thread_group->active_thread_count==0)
+ {
+ /* We added some work items to queue, now wake a worker. */
if(wake_thread(thread_group))
{
/*
- Wake failed, groups has no idle threads.
- Now check if the group has at least one worker.
+ Wake failed, hence groups has no idle threads. Now check if there are
+ any threads in the group except listener.
*/
if(thread_group->thread_count == 1 &&
thread_group->pending_thread_start_count == 0)
{
- /*
+ /*
Currently there is no worker thread in the group, as indicated by
- thread_count == 1 (means listener is the only one thread in the
- group).
-
- Rhe queue is not empty, and listener is not going to handle
- events. In order to drain the queue, we create a worker here.
- Alternatively, we could just rely on timer to detect stall, but
- this would be an inefficient, pointless delay.
+ thread_count == 1 (this means listener is the only one thread in
+ the group).
+ The queue is not empty, and listener is not going to handle
+ events. In order to drain the queue, we create a worker here.
+ Alternatively, we could just rely on timer to detect stall, and
+ create thread, but waiting for timer would be an inefficient and
+ pointless delay.
*/
create_worker(thread_group);
}
}
}
mysql_mutex_unlock(&thread_group->mutex);
-
- if (listener_picks_event)
- {
- retval= (connection_t *)native_event_get_userdata(&ev[0]);
- break;
- }
}
-
-
+
DBUG_RETURN(retval);
}
-/*
+/**
Creates a new worker thread.
thread_mutex must be held when calling this function
@@ -806,10 +814,10 @@ static int wake_or_create_thread(thread_group_t *thread_group)
if (thread_group->active_thread_count == 0)
{
/*
- We're better off creating a new thread here with no delay,
- either there is no workers at all, or they all are all blocking
- and there was no sleeping thread to wakeup. It smells like deadlock
- or very slowly executing requests, e.g sleeps or user locks.
+ We're better off creating a new thread here with no delay, either there
+ are no workers at all, or they all are all blocking and there was no
+ idle thread to wakeup. Smells like a potential deadlock or very slowly
+ executing requests, e.g sleeps or user locks.
*/
DBUG_RETURN(create_worker(thread_group));
}
@@ -862,7 +870,8 @@ void thread_group_destroy(thread_group_t *thread_group)
/**
Wake sleeping thread from waiting list
- */
+*/
+
static int wake_thread(thread_group_t *thread_group)
{
DBUG_ENTER("wake_thread");
@@ -879,16 +888,14 @@ static int wake_thread(thread_group_t *thread_group)
}
-/*
+/**
Initiate shutdown for thread group.
-
- The shutdown is asynchronous, we only care to wake all threads
- in here, so they can finish. We do not wait here until threads
- terminate,
-
- Final cleanup of the group (thread_group_destroy) will be done by
- the last exiting threads.
+
+ The shutdown is asynchronous, we only care to wake all threads in here, so
+ they can finish. We do not wait here until threads terminate. Final cleanup
+ of the group (thread_group_destroy) will be done by the last exiting threads.
*/
+
static void thread_group_close(thread_group_t *thread_group)
{
DBUG_ENTER("thread_group_close");
@@ -938,10 +945,11 @@ static void thread_group_close(thread_group_t *thread_group)
perform login (this is done in worker threads).
*/
+
static void queue_put(thread_group_t *thread_group, connection_t *connection)
{
DBUG_ENTER("queue_put");
-
+
mysql_mutex_lock(&thread_group->mutex);
thread_group->queue.push_back(connection);
if (thread_group->active_thread_count == 0)
@@ -949,14 +957,16 @@ static void queue_put(thread_group_t *thread_group, connection_t *connection)
wake_or_create_thread(thread_group);
}
mysql_mutex_unlock(&thread_group->mutex);
+
DBUG_VOID_RETURN;
}
/*
- This is used to prevent too many threads executing at the same time,
- if the workload is not CPU bound.
+ Prevent too many threads executing at the same time,if the workload is
+ not CPU bound.
*/
+
static bool too_many_threads(thread_group_t *thread_group)
{
return (thread_group->active_thread_count >= 1+(int)threadpool_oversubscribe
@@ -964,7 +974,6 @@ static bool too_many_threads(thread_group_t *thread_group)
}
-
/**
Retrieve a connection with pending event.
@@ -981,17 +990,15 @@ static bool too_many_threads(thread_group_t *thread_group)
@return
connection with pending event. NULL is returned if timeout has expired,or on shutdown.
*/
+
connection_t *get_event(worker_thread_t *current_thread,
thread_group_t *thread_group, struct timespec *abstime)
{
DBUG_ENTER("get_event");
-
connection_t *connection = NULL;
int err=0;
mysql_mutex_lock(&thread_group->mutex);
-
-
DBUG_ASSERT(thread_group->active_thread_count >= 0);
do
@@ -1083,6 +1090,7 @@ connection_t *get_event(worker_thread_t *current_thread,
Tells the pool that worker starts waiting on IO, lock, condition,
sleep() or similar.
*/
+
void wait_begin(thread_group_t *thread_group)
{
DBUG_ENTER("wait_begin");
@@ -1200,7 +1208,7 @@ static void connection_abort(connection_t *connection)
/**
- MySQL scheduler callback : kill connection
+ MySQL scheduler callback : kill connection
*/
void tp_post_kill_notification(THD *thd)
@@ -1215,7 +1223,7 @@ void tp_post_kill_notification(THD *thd)
}
/**
- MySQL scheduler callback: wait begin
+ MySQL scheduler callback: wait begin
*/
void tp_wait_begin(THD *thd, int type)
@@ -1237,7 +1245,7 @@ void tp_wait_begin(THD *thd, int type)
/**
- MySQL scheduler callback: wait end
+ MySQL scheduler callback: wait end
*/
void tp_wait_end(THD *thd)
@@ -1256,7 +1264,7 @@ void tp_wait_end(THD *thd)
DBUG_VOID_RETURN;
}
-
+
static void set_next_timeout_check(ulonglong abstime)
{
DBUG_ENTER("set_next_timeout_check");
@@ -1273,7 +1281,6 @@ static void set_next_timeout_check(ulonglong abstime)
/**
Set wait timeout for connection.
*/
-
static void set_wait_timeout(connection_t *c)
{
DBUG_ENTER("set_wait_timeout");
@@ -1400,8 +1407,9 @@ static void handle_event(connection_t *connection)
}
/**
- Worker thread's main
+ Worker thread's main
*/
+
static void *worker_main(void *param)
{
@@ -1543,7 +1551,8 @@ void tp_set_threadpool_stall_limit(uint limit)
Calculate number of idle/waiting threads in the pool.
Sum idle threads over all groups.
- Don't do any locking, it is not required for stats.
+ D
+ on't do any locking, it is not required for stats.
*/
int tp_get_idle_thread_count()
{
@@ -1601,7 +1610,8 @@ static void print_pool_blocked_message(bool max_threads_reached)
else
sql_print_error(create_thread_error_msg, my_errno);
- sql_print_information("Threadpool has been blocked for %u seconds\n",(uint)(now- pool_block_start));
+ sql_print_information("Threadpool has been blocked for %u seconds\n",
+ (uint)(now- pool_block_start));
/* avoid reperated messages for the same blocking situation */
msg_written= true;
}