diff options
author | Vladislav Vaintroub <wlad@montyprogram.com> | 2012-01-26 04:35:54 +0100 |
---|---|---|
committer | Vladislav Vaintroub <wlad@montyprogram.com> | 2012-01-26 04:35:54 +0100 |
commit | 57b6cb39aa268a49ad05f86025386ddde6516670 (patch) | |
tree | f42cc3ac4dfd9b7d45b57c3126b19755e604ec62 /sql/threadpool_unix.cc | |
parent | 7ed6530a066ab1e1659c39ee614e503462d8d403 (diff) | |
download | mariadb-git-57b6cb39aa268a49ad05f86025386ddde6516670.tar.gz |
Further review points and simplify Windows implementation
Diffstat (limited to 'sql/threadpool_unix.cc')
-rw-r--r-- | sql/threadpool_unix.cc | 132 |
1 files changed, 71 insertions, 61 deletions
diff --git a/sql/threadpool_unix.cc b/sql/threadpool_unix.cc index 5dcc9d4420c..9dc3739dfb4 100644 --- a/sql/threadpool_unix.cc +++ b/sql/threadpool_unix.cc @@ -206,6 +206,7 @@ static void print_pool_blocked_message(bool); or io_poll_start_read() becomes readable. Data associated with descriptors can be retrieved from native_events array, using native_event_get_userdata() function. + On Linux: epoll_wait() */ @@ -248,6 +249,11 @@ int io_poll_disassociate_fd(int pollfd, int fd) } +/* + Wrapper around epoll_wait. + NOTE - in case of EINTR, it restarts with original timeout. Since we use + either infinite or 0 timeouts, this is not critical +*/ int io_poll_wait(int pollfd, native_event *native_events, int maxevents, int timeout_ms) { @@ -260,6 +266,7 @@ int io_poll_wait(int pollfd, native_event *native_events, int maxevents, return ret; } + static void *native_event_get_userdata(native_event *event) { return event->data.ptr; @@ -364,8 +371,10 @@ int io_poll_wait(int pollfd, native_event *events, int maxevents, int timeout_ms (timeout_ms >= 0)?&ts:NULL); } while (ret == -1 && errno == EINTR); - return nget; + DBUG_ASSERT(nget < INT_MAX); + return (int)nget; } + static void* native_event_get_userdata(native_event *event) { return event->portev_user; @@ -375,9 +384,8 @@ static void* native_event_get_userdata(native_event *event) #endif - - /* Dequeue element from a workqueue */ + static connection_t *queue_get(thread_group_t *thread_group) { DBUG_ENTER("queue_get"); @@ -391,12 +399,12 @@ static connection_t *queue_get(thread_group_t *thread_group) } - /* Handle wait timeout : Find connections that have been idle for too long and kill them. Also, recalculate time when next timeout check should run. */ + static void timeout_check(pool_timer_t *timer) { DBUG_ENTER("timeout_check"); @@ -418,7 +426,7 @@ static void timeout_check(pool_timer_t *timer) { /* Connection does not have scheduler data. This happens for example - if THD belongs to another scheduler, that is listening to extra_port. + if THD belongs to a different scheduler, that is listening to extra_port. */ continue; } @@ -458,18 +466,18 @@ static void timeout_check(pool_timer_t *timer) static void* timer_thread(void *param) { uint i; - pool_timer_t* timer=(pool_timer_t *)param; - timer->next_timeout_check= ULONGLONG_MAX; - timer->current_microtime= microsecond_interval_timer(); - + my_thread_init(); DBUG_ENTER("timer_thread"); - + timer->next_timeout_check= ULONGLONG_MAX; + timer->current_microtime= microsecond_interval_timer(); + for(;;) { struct timespec ts; int err; + set_timespec_nsec(ts,timer->tick_interval*1000000); mysql_mutex_lock(&timer->mutex); err= mysql_cond_timedwait(&timer->cond, &timer->mutex, &ts); @@ -543,7 +551,6 @@ void check_stall(thread_group_t *thread_group) } - static void start_timer(pool_timer_t* timer) { pthread_t thread_id; @@ -555,6 +562,7 @@ static void start_timer(pool_timer_t* timer) DBUG_VOID_RETURN; } + static void stop_timer(pool_timer_t *timer) { DBUG_ENTER("stop_timer"); @@ -664,50 +672,50 @@ static connection_t * listener(worker_thread_t *current_thread, thread_group->queue.push_back(c); } - - if(thread_group->active_thread_count==0 && !listener_picks_event) + if (listener_picks_event) { - /* Wake one worker thread */ + /* Handle the first event. */ + retval= (connection_t *)native_event_get_userdata(&ev[0]); + mysql_mutex_unlock(&thread_group->mutex); + break; + } + + if(thread_group->active_thread_count==0) + { + /* We added some work items to queue, now wake a worker. */ if(wake_thread(thread_group)) { /* - Wake failed, groups has no idle threads. - Now check if the group has at least one worker. + Wake failed, hence groups has no idle threads. Now check if there are + any threads in the group except listener. */ if(thread_group->thread_count == 1 && thread_group->pending_thread_start_count == 0) { - /* + /* Currently there is no worker thread in the group, as indicated by - thread_count == 1 (means listener is the only one thread in the - group). - - Rhe queue is not empty, and listener is not going to handle - events. In order to drain the queue, we create a worker here. - Alternatively, we could just rely on timer to detect stall, but - this would be an inefficient, pointless delay. + thread_count == 1 (this means listener is the only one thread in + the group). + The queue is not empty, and listener is not going to handle + events. In order to drain the queue, we create a worker here. + Alternatively, we could just rely on timer to detect stall, and + create thread, but waiting for timer would be an inefficient and + pointless delay. */ create_worker(thread_group); } } } mysql_mutex_unlock(&thread_group->mutex); - - if (listener_picks_event) - { - retval= (connection_t *)native_event_get_userdata(&ev[0]); - break; - } } - - + DBUG_RETURN(retval); } -/* +/** Creates a new worker thread. thread_mutex must be held when calling this function @@ -806,10 +814,10 @@ static int wake_or_create_thread(thread_group_t *thread_group) if (thread_group->active_thread_count == 0) { /* - We're better off creating a new thread here with no delay, - either there is no workers at all, or they all are all blocking - and there was no sleeping thread to wakeup. It smells like deadlock - or very slowly executing requests, e.g sleeps or user locks. + We're better off creating a new thread here with no delay, either there + are no workers at all, or they all are all blocking and there was no + idle thread to wakeup. Smells like a potential deadlock or very slowly + executing requests, e.g sleeps or user locks. */ DBUG_RETURN(create_worker(thread_group)); } @@ -862,7 +870,8 @@ void thread_group_destroy(thread_group_t *thread_group) /** Wake sleeping thread from waiting list - */ +*/ + static int wake_thread(thread_group_t *thread_group) { DBUG_ENTER("wake_thread"); @@ -879,16 +888,14 @@ static int wake_thread(thread_group_t *thread_group) } -/* +/** Initiate shutdown for thread group. - - The shutdown is asynchronous, we only care to wake all threads - in here, so they can finish. We do not wait here until threads - terminate, - - Final cleanup of the group (thread_group_destroy) will be done by - the last exiting threads. + + The shutdown is asynchronous, we only care to wake all threads in here, so + they can finish. We do not wait here until threads terminate. Final cleanup + of the group (thread_group_destroy) will be done by the last exiting threads. */ + static void thread_group_close(thread_group_t *thread_group) { DBUG_ENTER("thread_group_close"); @@ -938,10 +945,11 @@ static void thread_group_close(thread_group_t *thread_group) perform login (this is done in worker threads). */ + static void queue_put(thread_group_t *thread_group, connection_t *connection) { DBUG_ENTER("queue_put"); - + mysql_mutex_lock(&thread_group->mutex); thread_group->queue.push_back(connection); if (thread_group->active_thread_count == 0) @@ -949,14 +957,16 @@ static void queue_put(thread_group_t *thread_group, connection_t *connection) wake_or_create_thread(thread_group); } mysql_mutex_unlock(&thread_group->mutex); + DBUG_VOID_RETURN; } /* - This is used to prevent too many threads executing at the same time, - if the workload is not CPU bound. + Prevent too many threads executing at the same time,if the workload is + not CPU bound. */ + static bool too_many_threads(thread_group_t *thread_group) { return (thread_group->active_thread_count >= 1+(int)threadpool_oversubscribe @@ -964,7 +974,6 @@ static bool too_many_threads(thread_group_t *thread_group) } - /** Retrieve a connection with pending event. @@ -981,17 +990,15 @@ static bool too_many_threads(thread_group_t *thread_group) @return connection with pending event. NULL is returned if timeout has expired,or on shutdown. */ + connection_t *get_event(worker_thread_t *current_thread, thread_group_t *thread_group, struct timespec *abstime) { DBUG_ENTER("get_event"); - connection_t *connection = NULL; int err=0; mysql_mutex_lock(&thread_group->mutex); - - DBUG_ASSERT(thread_group->active_thread_count >= 0); do @@ -1083,6 +1090,7 @@ connection_t *get_event(worker_thread_t *current_thread, Tells the pool that worker starts waiting on IO, lock, condition, sleep() or similar. */ + void wait_begin(thread_group_t *thread_group) { DBUG_ENTER("wait_begin"); @@ -1200,7 +1208,7 @@ static void connection_abort(connection_t *connection) /** - MySQL scheduler callback : kill connection + MySQL scheduler callback : kill connection */ void tp_post_kill_notification(THD *thd) @@ -1215,7 +1223,7 @@ void tp_post_kill_notification(THD *thd) } /** - MySQL scheduler callback: wait begin + MySQL scheduler callback: wait begin */ void tp_wait_begin(THD *thd, int type) @@ -1237,7 +1245,7 @@ void tp_wait_begin(THD *thd, int type) /** - MySQL scheduler callback: wait end + MySQL scheduler callback: wait end */ void tp_wait_end(THD *thd) @@ -1256,7 +1264,7 @@ void tp_wait_end(THD *thd) DBUG_VOID_RETURN; } - + static void set_next_timeout_check(ulonglong abstime) { DBUG_ENTER("set_next_timeout_check"); @@ -1273,7 +1281,6 @@ static void set_next_timeout_check(ulonglong abstime) /** Set wait timeout for connection. */ - static void set_wait_timeout(connection_t *c) { DBUG_ENTER("set_wait_timeout"); @@ -1400,8 +1407,9 @@ static void handle_event(connection_t *connection) } /** - Worker thread's main + Worker thread's main */ + static void *worker_main(void *param) { @@ -1543,7 +1551,8 @@ void tp_set_threadpool_stall_limit(uint limit) Calculate number of idle/waiting threads in the pool. Sum idle threads over all groups. - Don't do any locking, it is not required for stats. + D + on't do any locking, it is not required for stats. */ int tp_get_idle_thread_count() { @@ -1601,7 +1610,8 @@ static void print_pool_blocked_message(bool max_threads_reached) else sql_print_error(create_thread_error_msg, my_errno); - sql_print_information("Threadpool has been blocked for %u seconds\n",(uint)(now- pool_block_start)); + sql_print_information("Threadpool has been blocked for %u seconds\n", + (uint)(now- pool_block_start)); /* avoid reperated messages for the same blocking situation */ msg_written= true; } |