diff options
author | Vladislav Vaintroub <wlad@montyprogram.com> | 2012-01-28 01:09:28 +0100 |
---|---|---|
committer | Vladislav Vaintroub <wlad@montyprogram.com> | 2012-01-28 01:09:28 +0100 |
commit | 4d1c7b794730b101619c5dac33af71e7abcd54d2 (patch) | |
tree | 134e697087703e5ade58d9f286395f596cc74983 /sql/threadpool_unix.cc | |
parent | d782e098a7dea389ab79d313c9467129a9aac3f4 (diff) | |
download | mariadb-git-4d1c7b794730b101619c5dac33af71e7abcd54d2.tar.gz |
some more whitespace, remove pending_thread_start_count. increment counters (thread_group->count, thread_group->active_thread_count) whenever mysql_create_thread returns success.
Diffstat (limited to 'sql/threadpool_unix.cc')
-rw-r--r-- | sql/threadpool_unix.cc | 93 |
1 files changed, 49 insertions, 44 deletions
diff --git a/sql/threadpool_unix.cc b/sql/threadpool_unix.cc index bdb9b2fc8a7..0a1e3ceb8a7 100644 --- a/sql/threadpool_unix.cc +++ b/sql/threadpool_unix.cc @@ -58,7 +58,7 @@ static PSI_thread_info thread_list[] = /* Macro to simplify performance schema registration */ #define PSI_register(X) \ - if(PSI_server) PSI_server->register_ ## X("threadpool", X ## _list, array_elements(X ## _list)) + if(PSI_server) PSI_server->register_ ## X("threadpool", X ## _list, array_elements(X ## _list)) struct thread_group_t; @@ -112,7 +112,6 @@ struct thread_group_t int pollfd; int thread_count; int active_thread_count; - int pending_thread_start_count; int connection_count; /* Stats for the deadlock detection timer routine.*/ int io_event_count; @@ -691,8 +690,7 @@ static connection_t * listener(worker_thread_t *current_thread, Wake failed, hence groups has no idle threads. Now check if there are any threads in the group except listener. */ - if(thread_group->thread_count == 1 && - thread_group->pending_thread_start_count == 0) + if(thread_group->thread_count == 1) { /* Currently there is no worker thread in the group, as indicated by @@ -714,7 +712,22 @@ static connection_t * listener(worker_thread_t *current_thread, DBUG_RETURN(retval); } +/** + Adjust thread counters in group or global + whenever thread is created or is about to exit + + @param thread_group + @param count - 1, when new thread is created + -1, when thread is about to exit +*/ +static void add_thread_count(thread_group_t *thread_group, int32 count) +{ + thread_group->thread_count += count; + /* worker starts out and end in "active" state */ + thread_group->active_thread_count += count; + my_atomic_add32(&tp_stats.num_worker_threads, count); +} /** @@ -740,13 +753,15 @@ static int create_worker(thread_group_t *thread_group) max_threads_reached= true; goto end; } + err= mysql_thread_create(key_worker_thread, &thread_id, thread_group->pthread_attr, worker_main, thread_group); if (!err) { - thread_group->pending_thread_start_count++; thread_group->last_thread_creation_time=microsecond_interval_timer(); + thread_created++; + add_thread_count(thread_group, 1); } else { @@ -803,12 +818,12 @@ static int wake_or_create_thread(thread_group_t *thread_group) { DBUG_ENTER("wake_or_create_thread"); + if (thread_group->shutdown) + DBUG_RETURN(0); + if (wake_thread(thread_group) == 0) DBUG_RETURN(0); - if (thread_group->pending_thread_start_count > 0) - DBUG_RETURN(-1); - if (thread_group->thread_count > thread_group->connection_count) DBUG_RETURN(-1); @@ -903,8 +918,7 @@ static void thread_group_close(thread_group_t *thread_group) DBUG_ENTER("thread_group_close"); mysql_mutex_lock(&thread_group->mutex); - if (thread_group->thread_count == 0 && - thread_group->pending_thread_start_count == 0) + if (thread_group->thread_count == 0) { mysql_mutex_unlock(&thread_group->mutex); thread_group_destroy(thread_group); @@ -954,10 +968,10 @@ static void queue_put(thread_group_t *thread_group, connection_t *connection) mysql_mutex_lock(&thread_group->mutex); thread_group->queue.push_back(connection); + if (thread_group->active_thread_count == 0) - { wake_or_create_thread(thread_group); - } + mysql_mutex_unlock(&thread_group->mutex); DBUG_VOID_RETURN; @@ -1060,7 +1074,7 @@ connection_t *get_event(worker_thread_t *current_thread, thread_group->waiting_threads.push_front(current_thread); thread_group->active_thread_count--; - if(abstime) + if (abstime) { err = mysql_cond_timedwait(¤t_thread->cond, &thread_group->mutex, abstime); @@ -1081,7 +1095,7 @@ connection_t *get_event(worker_thread_t *current_thread, thread_group->waiting_threads.remove(current_thread); } - if(err) + if (err) break; } @@ -1107,7 +1121,7 @@ void wait_begin(thread_group_t *thread_group) DBUG_ASSERT(thread_group->active_thread_count >=0); DBUG_ASSERT(thread_group->connection_count > 0); - if((thread_group->active_thread_count == 0) && + if ((thread_group->active_thread_count == 0) && (thread_group->queue.is_empty() || !thread_group->listener)) { /* @@ -1168,7 +1182,7 @@ void tp_add_connection(THD *thd) threads.append(thd); mysql_mutex_unlock(&LOCK_thread_count); connection_t *connection= alloc_connection(thd); - if(connection) + if (connection) { thd->event_scheduler.data= connection; @@ -1243,7 +1257,7 @@ void tp_wait_begin(THD *thd, int type) DBUG_VOID_RETURN; connection_t *connection = (connection_t *)thd->event_scheduler.data; - if(connection) + if (connection) { DBUG_ASSERT(!connection->waiting); connection->waiting= true; @@ -1264,7 +1278,7 @@ void tp_wait_end(THD *thd) DBUG_VOID_RETURN; connection_t *connection = (connection_t *)thd->event_scheduler.data; - if(connection) + if (connection) { DBUG_ASSERT(connection->waiting); connection->waiting = false; @@ -1290,6 +1304,7 @@ static void set_next_timeout_check(ulonglong abstime) /** Set wait timeout for connection. */ + static void set_wait_timeout(connection_t *c) { DBUG_ENTER("set_wait_timeout"); @@ -1316,6 +1331,7 @@ static void set_wait_timeout(connection_t *c) migrate to a different group because group_count has changed after thread_pool_size setting. */ + static int change_group(connection_t *c, thread_group_t *old_group, thread_group_t *new_group) @@ -1340,7 +1356,7 @@ static int change_group(connection_t *c, c->thread_group= new_group; new_group->connection_count++; /* Ensure that there is a listener in the new group. */ - if(!new_group->thread_count && !new_group->pending_thread_start_count) + if (!new_group->thread_count) ret= create_worker(new_group); mysql_mutex_unlock(&new_group->mutex); return ret; @@ -1373,7 +1389,7 @@ static int start_io(connection_t *connection) /* Bind to poll descriptor if not yet done. */ - if(!connection->bound_to_poll_descriptor) + if (!connection->bound_to_poll_descriptor) { connection->bound_to_poll_descriptor= true; return io_poll_associate_fd(group->pollfd, fd, connection); @@ -1400,7 +1416,7 @@ static void handle_event(connection_t *connection) err= threadpool_process_request(connection->thd); } - if(!err) + if (!err) { set_wait_timeout(connection); err= start_io(connection); @@ -1427,7 +1443,6 @@ static void *worker_main(void *param) DBUG_ENTER("worker_main"); - thread_created++; thread_group_t *thread_group = (thread_group_t *)param; /* Init per-thread structure */ @@ -1435,13 +1450,6 @@ static void *worker_main(void *param) this_thread.thread_group= thread_group; this_thread.event_count=0; - my_atomic_add32(&tp_stats.num_worker_threads, 1); - mysql_mutex_lock(&thread_group->mutex); - thread_group->thread_count++; - thread_group->active_thread_count++; - thread_group->pending_thread_start_count--; - mysql_mutex_unlock(&thread_group->mutex); - /* Run event loop */ for(;;) { @@ -1450,9 +1458,7 @@ static void *worker_main(void *param) set_timespec(ts,threadpool_idle_timeout); connection = get_event(&this_thread, thread_group, &ts); if (!connection) - { break; - } this_thread.event_count++; handle_event(connection); } @@ -1460,19 +1466,16 @@ static void *worker_main(void *param) /* Thread shutdown: cleanup per-worker-thread structure. */ mysql_cond_destroy(&this_thread.cond); + bool last_thread; /* last thread in group exits */ mysql_mutex_lock(&thread_group->mutex); - thread_group->active_thread_count--; - thread_group->thread_count--; + add_thread_count(thread_group, -1); + last_thread= ((thread_group->thread_count == 0) && thread_group->shutdown); mysql_mutex_unlock(&thread_group->mutex); - my_atomic_add32(&tp_stats.num_worker_threads, -1); - /* If it is the last thread in group and pool is terminating, destroy group.*/ - if (thread_group->shutdown - && thread_group->thread_count == 0 - && thread_group->pending_thread_start_count == 0) - { + /* Last thread in group exits and pool is terminating, destroy group.*/ + if (last_thread) thread_group_destroy(thread_group); - } + my_thread_end(); return NULL; } @@ -1519,6 +1522,7 @@ void tp_end() /** Ensure that poll descriptors are created when threadpool_size changes */ + int tp_set_threadpool_size(uint size) { bool success= true; @@ -1560,9 +1564,9 @@ void tp_set_threadpool_stall_limit(uint limit) Calculate number of idle/waiting threads in the pool. Sum idle threads over all groups. - D - on't do any locking, it is not required for stats. + Don't do any locking, it is not required for stats. */ + int tp_get_idle_thread_count() { int sum=0; @@ -1599,6 +1603,7 @@ static const char *create_thread_error_msg= It will be just a single message for each blocking situation (to prevent log flood). */ + static void print_pool_blocked_message(bool max_threads_reached) { time_t now; @@ -1612,9 +1617,9 @@ static void print_pool_blocked_message(bool max_threads_reached) return; } - if(now > pool_block_start + BLOCK_MSG_DELAY && !msg_written) + if (now > pool_block_start + BLOCK_MSG_DELAY && !msg_written) { - if(max_threads_reached) + if (max_threads_reached) sql_print_error(max_threads_reached_msg); else sql_print_error(create_thread_error_msg, my_errno); |