diff options
author | Vladislav Vaintroub <wlad@montyprogram.com> | 2011-12-19 13:28:30 +0100 |
---|---|---|
committer | Vladislav Vaintroub <wlad@montyprogram.com> | 2011-12-19 13:28:30 +0100 |
commit | df48c9bf20955c4e49009c95c5c7e1856c212e81 (patch) | |
tree | f39850b5c53f661416a4923c0d0ba869449ebbcd /sql/threadpool_unix.cc | |
parent | 2e4bde4c0febf1282639ce2f13e24c707e1f45b0 (diff) | |
download | mariadb-git-df48c9bf20955c4e49009c95c5c7e1856c212e81.tar.gz |
allow changing thread_pool_size without server restart
Diffstat (limited to 'sql/threadpool_unix.cc')
-rw-r--r-- | sql/threadpool_unix.cc | 186 |
1 files changed, 147 insertions, 39 deletions
diff --git a/sql/threadpool_unix.cc b/sql/threadpool_unix.cc index a8ceb250e5c..0b5c151d93b 100644 --- a/sql/threadpool_unix.cc +++ b/sql/threadpool_unix.cc @@ -98,7 +98,8 @@ struct thread_group_t ulonglong queue_event_count; } MY_ALIGNED(512); -static thread_group_t all_groups[128]; +static thread_group_t all_groups[MAX_THREAD_GROUPS]; +static uint group_count; /* Global timer for all groups */ struct pool_timer_t @@ -213,10 +214,10 @@ int io_poll_start_read(int pollfd, int fd, void *data) return epoll_ctl(pollfd, EPOLL_CTL_MOD, fd, &ev); } -void io_poll_disassociate_fd(int pollfd, int fd) +int io_poll_disassociate_fd(int pollfd, int fd) { struct epoll_event ev; - epoll_ctl(pollfd, EPOLL_CTL_DEL, fd, &ev); + return epoll_ctl(pollfd, EPOLL_CTL_DEL, fd, &ev); } @@ -258,11 +259,11 @@ int io_poll_associate_fd(int pollfd, int fd, void *data) } -int io_poll_disassociate_fd(thread_group_t *thread_group, int fd) +int io_poll_disassociate_fd(int pollfd, int fd) { struct kevent ke; EV_SET(&ke,fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); - return kevent(thread_group->pollfd, &ke, 1, 0, 0, 0); + return kevent(pollfd, &ke, 1, 0, 0, 0); } @@ -315,6 +316,11 @@ static int io_poll_associate_fd(int pollfd, int fd, void *data) return io_poll_start_read(pollfd, fd, data); } +int io_poll_disassociate_fd(int pollfd, int fd) +{ + return 0; +} + int io_poll_wait(int pollfd, native_event *events, int maxevents, int timeout_ms) { struct timespec ts; @@ -466,7 +472,7 @@ static void* timer_thread(void *param) timer->current_microtime= microsecond_interval_timer(); /* Check stallls in thread groups */ - for(i=0; i< threadpool_size;i++) + for(i=0; i< array_elements(all_groups);i++) { if(all_groups[i].connection_count) check_stall(&all_groups[i]); @@ -723,21 +729,9 @@ int thread_group_init(thread_group_t *thread_group, pthread_attr_t* thread_attr) SLIST_INIT(&thread_group->waiting_threads); thread_group->pending_thread_start_count= 0; - thread_group->pollfd= io_poll_create(); thread_group->stalled= false; - if (thread_group->pollfd < 0) - { - DBUG_RETURN(-1); - } - if (pipe(thread_group->shutdown_pipe)) - { - DBUG_RETURN(-1); - } - if (io_poll_associate_fd(thread_group->pollfd, - thread_group->shutdown_pipe[0], &POOL_SHUTDOWN_EVENT)) - { - DBUG_RETURN(-1); - } + + thread_group->pollfd= -1; DBUG_RETURN(0); } @@ -772,9 +766,29 @@ static void thread_group_close(thread_group_t *thread_group) char c= 0; mysql_mutex_lock(&thread_group->mutex); + if (thread_group->thread_count == 0 && + thread_group->pending_thread_start_count == 0) + { + if (thread_group->pollfd >= 0) + close(thread_group->pollfd); + mysql_mutex_unlock(&thread_group->mutex); + mysql_mutex_destroy(&thread_group->mutex); + DBUG_VOID_RETURN; + } + thread_group->shutdown= true; thread_group->listener= NULL; + if (pipe(thread_group->shutdown_pipe)) + { + DBUG_VOID_RETURN; + } + if (io_poll_associate_fd(thread_group->pollfd, + thread_group->shutdown_pipe[0], &POOL_SHUTDOWN_EVENT)) + { + DBUG_VOID_RETURN; + } + /* Wake listener. */ if (write(thread_group->shutdown_pipe[1], &c, 1) < 0) DBUG_VOID_RETURN; @@ -1000,7 +1014,7 @@ void tp_add_connection(THD *thd) connection_t *c= alloc_connection(thd); if(c) { - c->thread_group= &all_groups[c->thd->thread_id%threadpool_size]; + c->thread_group= &all_groups[c->thd->thread_id%group_count]; mysql_mutex_lock(&c->thread_group->mutex); c->thread_group->connection_count++; mysql_mutex_unlock(&c->thread_group->mutex); @@ -1101,6 +1115,87 @@ static void set_wait_timeout(connection_t *c) DBUG_VOID_RETURN; } + + +/* + Handle a (rare) special case,where connection needs to + migrate to a different group because group_count has changed + as a result of thread_pool_size setting. +*/ +static int change_group(connection_t *c, + thread_group_t *old_group, + thread_group_t *new_group) +{ + int ret= 0; + int fd = c->thd->net.vio->sd; + + DBUG_ASSERT(c->thread_group == old_group); + + /* Remove connection from the old group. */ + mysql_mutex_lock(&old_group->mutex); + if (c->logged_in) + io_poll_disassociate_fd(old_group->pollfd,fd); + c->thread_group->connection_count--; + mysql_mutex_lock(&old_group->mutex); + + /* Add connection to the new group. */ + mysql_mutex_lock(&new_group->mutex); + + c->thread_group= new_group; + new_group->connection_count++; + + /* Ensure that there is a listener in the new group. */ + if(!new_group->thread_count && !new_group->pending_thread_start_count) + ret= create_worker(new_group); + + mysql_mutex_unlock(&new_group->mutex); + return ret; +} + + +static int start_io(connection_t *c) +{ + int fd = c->thd->net.vio->sd; + + /* + Usually, connection will stay in the same group for the entire + connection's life. However, we do allow group_count to + change at runtime, which means in rare cases when it changes is + connection should need to migrate to another group, this ensures + to ensure equal load between groups. + + So we recalculate in which group the connection should be, based + on thread_id and current group count, and migrate if necessary. + */ + thread_group_t *g = &all_groups[c->thd->thread_id%group_count]; + + if (g != c->thread_group) + { + if (!change_group(c, c->thread_group, g)) + { + c->logged_in= true; + return io_poll_associate_fd(c->thread_group->pollfd, fd, c); + } + else + return -1; + } + + + /* + Handle case where connection is not yet logged in, i.e + not associated with poll fd. + */ + if(!c->logged_in) + { + c->logged_in= true; + return io_poll_associate_fd(c->thread_group->pollfd, fd, c); + } + + return io_poll_start_read(c->thread_group->pollfd, fd, c); +} + + + static void handle_event(pool_event_t *ev) { @@ -1108,13 +1203,11 @@ static void handle_event(pool_event_t *ev) /* Normal case, handle query on connection */ connection_t *c = (connection_t*)(void *)ev; - bool do_login = (!c->logged_in); int ret; - if (do_login) + if (!c->logged_in) { ret= threadpool_add_connection(c->thd); - c->logged_in= true; } else { @@ -1124,13 +1217,7 @@ static void handle_event(pool_event_t *ev) if(!ret) { set_wait_timeout(c); - int fd = c->thd->net.vio->sd; - if (do_login) - { - ret= io_poll_associate_fd(c->thread_group->pollfd, fd, c); - } - else - ret= io_poll_start_read(c->thread_group->pollfd, fd, c); + ret= start_io(c); } if (ret) @@ -1159,8 +1246,8 @@ static void *worker_main(void *param) this_thread.thread_group= thread_group; this_thread.event_count=0; + my_atomic_add32(&tp_stats.num_worker_threads, 1); mysql_mutex_lock(&thread_group->mutex); - tp_stats.num_worker_threads++; thread_group->thread_count++; thread_group->active_thread_count++; thread_group->pending_thread_start_count--; @@ -1187,8 +1274,8 @@ static void *worker_main(void *param) mysql_mutex_lock(&thread_group->mutex); thread_group->active_thread_count--; thread_group->thread_count--; - tp_stats.num_worker_threads--; mysql_mutex_unlock(&thread_group->mutex); + my_atomic_add32(&tp_stats.num_worker_threads, -1); /* If it is the last thread in pool and pool is terminating, destroy pool.*/ if (thread_group->shutdown && (thread_group->thread_count == 0)) @@ -1209,15 +1296,12 @@ bool tp_init() DBUG_ENTER("tp_init"); started = true; scheduler_init(); - if (threadpool_size == 0) - { - threadpool_size= my_getncpus(); - } - for(uint i=0; i < threadpool_size; i++) + for(uint i=0; i < array_elements(all_groups); i++) { thread_group_init(&all_groups[i], get_connection_attrib()); } + tp_set_threadpool_size(threadpool_size); #define PSI_register(X) \ if(PSI_server) PSI_server->register_ ## X("threadpool", X ## _list, array_elements(X ## _list)) @@ -1239,9 +1323,33 @@ void tp_end() DBUG_VOID_RETURN; stop_timer(&pool_timer); - for(uint i=0; i< threadpool_size; i++) + for(uint i=0; i< array_elements(all_groups); i++) { thread_group_close(&all_groups[i]); } DBUG_VOID_RETURN; } + +/* Ensure that poll descriptors are created when threadpool_size changes */ +int tp_set_threadpool_size(uint size) +{ + bool success= true; + for(uint i=0; i< size; i++) + { + thread_group_t *group= &all_groups[i]; + mysql_mutex_lock(&group->mutex); + if (group->pollfd == -1) + { + group->pollfd= io_poll_create(); + success= (group->pollfd >= 0); + } + mysql_mutex_unlock(&all_groups[i].mutex); + if (!success) + { + group_count= i-1; + return -1; + } + } + group_count= size; + return 0; +} |