summaryrefslogtreecommitdiff
path: root/sql/threadpool_unix.cc
diff options
context:
space:
mode:
authorVladislav Vaintroub <wlad@montyprogram.com>2011-12-19 13:28:30 +0100
committerVladislav Vaintroub <wlad@montyprogram.com>2011-12-19 13:28:30 +0100
commitdf48c9bf20955c4e49009c95c5c7e1856c212e81 (patch)
treef39850b5c53f661416a4923c0d0ba869449ebbcd /sql/threadpool_unix.cc
parent2e4bde4c0febf1282639ce2f13e24c707e1f45b0 (diff)
downloadmariadb-git-df48c9bf20955c4e49009c95c5c7e1856c212e81.tar.gz
allow changing thread_pool_size without server restart
Diffstat (limited to 'sql/threadpool_unix.cc')
-rw-r--r--sql/threadpool_unix.cc186
1 files changed, 147 insertions, 39 deletions
diff --git a/sql/threadpool_unix.cc b/sql/threadpool_unix.cc
index a8ceb250e5c..0b5c151d93b 100644
--- a/sql/threadpool_unix.cc
+++ b/sql/threadpool_unix.cc
@@ -98,7 +98,8 @@ struct thread_group_t
ulonglong queue_event_count;
} MY_ALIGNED(512);
-static thread_group_t all_groups[128];
+static thread_group_t all_groups[MAX_THREAD_GROUPS];
+static uint group_count;
/* Global timer for all groups */
struct pool_timer_t
@@ -213,10 +214,10 @@ int io_poll_start_read(int pollfd, int fd, void *data)
return epoll_ctl(pollfd, EPOLL_CTL_MOD, fd, &ev);
}
-void io_poll_disassociate_fd(int pollfd, int fd)
+int io_poll_disassociate_fd(int pollfd, int fd)
{
struct epoll_event ev;
- epoll_ctl(pollfd, EPOLL_CTL_DEL, fd, &ev);
+ return epoll_ctl(pollfd, EPOLL_CTL_DEL, fd, &ev);
}
@@ -258,11 +259,11 @@ int io_poll_associate_fd(int pollfd, int fd, void *data)
}
-int io_poll_disassociate_fd(thread_group_t *thread_group, int fd)
+int io_poll_disassociate_fd(int pollfd, int fd)
{
struct kevent ke;
EV_SET(&ke,fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
- return kevent(thread_group->pollfd, &ke, 1, 0, 0, 0);
+ return kevent(pollfd, &ke, 1, 0, 0, 0);
}
@@ -315,6 +316,11 @@ static int io_poll_associate_fd(int pollfd, int fd, void *data)
return io_poll_start_read(pollfd, fd, data);
}
+int io_poll_disassociate_fd(int pollfd, int fd)
+{
+ return 0;
+}
+
int io_poll_wait(int pollfd, native_event *events, int maxevents, int timeout_ms)
{
struct timespec ts;
@@ -466,7 +472,7 @@ static void* timer_thread(void *param)
timer->current_microtime= microsecond_interval_timer();
/* Check stallls in thread groups */
- for(i=0; i< threadpool_size;i++)
+ for(i=0; i< array_elements(all_groups);i++)
{
if(all_groups[i].connection_count)
check_stall(&all_groups[i]);
@@ -723,21 +729,9 @@ int thread_group_init(thread_group_t *thread_group, pthread_attr_t* thread_attr)
SLIST_INIT(&thread_group->waiting_threads);
thread_group->pending_thread_start_count= 0;
- thread_group->pollfd= io_poll_create();
thread_group->stalled= false;
- if (thread_group->pollfd < 0)
- {
- DBUG_RETURN(-1);
- }
- if (pipe(thread_group->shutdown_pipe))
- {
- DBUG_RETURN(-1);
- }
- if (io_poll_associate_fd(thread_group->pollfd,
- thread_group->shutdown_pipe[0], &POOL_SHUTDOWN_EVENT))
- {
- DBUG_RETURN(-1);
- }
+
+ thread_group->pollfd= -1;
DBUG_RETURN(0);
}
@@ -772,9 +766,29 @@ static void thread_group_close(thread_group_t *thread_group)
char c= 0;
mysql_mutex_lock(&thread_group->mutex);
+ if (thread_group->thread_count == 0 &&
+ thread_group->pending_thread_start_count == 0)
+ {
+ if (thread_group->pollfd >= 0)
+ close(thread_group->pollfd);
+ mysql_mutex_unlock(&thread_group->mutex);
+ mysql_mutex_destroy(&thread_group->mutex);
+ DBUG_VOID_RETURN;
+ }
+
thread_group->shutdown= true;
thread_group->listener= NULL;
+ if (pipe(thread_group->shutdown_pipe))
+ {
+ DBUG_VOID_RETURN;
+ }
+ if (io_poll_associate_fd(thread_group->pollfd,
+ thread_group->shutdown_pipe[0], &POOL_SHUTDOWN_EVENT))
+ {
+ DBUG_VOID_RETURN;
+ }
+
/* Wake listener. */
if (write(thread_group->shutdown_pipe[1], &c, 1) < 0)
DBUG_VOID_RETURN;
@@ -1000,7 +1014,7 @@ void tp_add_connection(THD *thd)
connection_t *c= alloc_connection(thd);
if(c)
{
- c->thread_group= &all_groups[c->thd->thread_id%threadpool_size];
+ c->thread_group= &all_groups[c->thd->thread_id%group_count];
mysql_mutex_lock(&c->thread_group->mutex);
c->thread_group->connection_count++;
mysql_mutex_unlock(&c->thread_group->mutex);
@@ -1101,6 +1115,87 @@ static void set_wait_timeout(connection_t *c)
DBUG_VOID_RETURN;
}
+
+
+/*
+ Handle a (rare) special case,where connection needs to
+ migrate to a different group because group_count has changed
+ as a result of thread_pool_size setting.
+*/
+static int change_group(connection_t *c,
+ thread_group_t *old_group,
+ thread_group_t *new_group)
+{
+ int ret= 0;
+ int fd = c->thd->net.vio->sd;
+
+ DBUG_ASSERT(c->thread_group == old_group);
+
+ /* Remove connection from the old group. */
+ mysql_mutex_lock(&old_group->mutex);
+ if (c->logged_in)
+ io_poll_disassociate_fd(old_group->pollfd,fd);
+ c->thread_group->connection_count--;
+ mysql_mutex_lock(&old_group->mutex);
+
+ /* Add connection to the new group. */
+ mysql_mutex_lock(&new_group->mutex);
+
+ c->thread_group= new_group;
+ new_group->connection_count++;
+
+ /* Ensure that there is a listener in the new group. */
+ if(!new_group->thread_count && !new_group->pending_thread_start_count)
+ ret= create_worker(new_group);
+
+ mysql_mutex_unlock(&new_group->mutex);
+ return ret;
+}
+
+
+static int start_io(connection_t *c)
+{
+ int fd = c->thd->net.vio->sd;
+
+ /*
+ Usually, connection will stay in the same group for the entire
+ connection's life. However, we do allow group_count to
+ change at runtime, which means in rare cases when it changes is
+ connection should need to migrate to another group, this ensures
+ to ensure equal load between groups.
+
+ So we recalculate in which group the connection should be, based
+ on thread_id and current group count, and migrate if necessary.
+ */
+ thread_group_t *g = &all_groups[c->thd->thread_id%group_count];
+
+ if (g != c->thread_group)
+ {
+ if (!change_group(c, c->thread_group, g))
+ {
+ c->logged_in= true;
+ return io_poll_associate_fd(c->thread_group->pollfd, fd, c);
+ }
+ else
+ return -1;
+ }
+
+
+ /*
+ Handle case where connection is not yet logged in, i.e
+ not associated with poll fd.
+ */
+ if(!c->logged_in)
+ {
+ c->logged_in= true;
+ return io_poll_associate_fd(c->thread_group->pollfd, fd, c);
+ }
+
+ return io_poll_start_read(c->thread_group->pollfd, fd, c);
+}
+
+
+
static void handle_event(pool_event_t *ev)
{
@@ -1108,13 +1203,11 @@ static void handle_event(pool_event_t *ev)
/* Normal case, handle query on connection */
connection_t *c = (connection_t*)(void *)ev;
- bool do_login = (!c->logged_in);
int ret;
- if (do_login)
+ if (!c->logged_in)
{
ret= threadpool_add_connection(c->thd);
- c->logged_in= true;
}
else
{
@@ -1124,13 +1217,7 @@ static void handle_event(pool_event_t *ev)
if(!ret)
{
set_wait_timeout(c);
- int fd = c->thd->net.vio->sd;
- if (do_login)
- {
- ret= io_poll_associate_fd(c->thread_group->pollfd, fd, c);
- }
- else
- ret= io_poll_start_read(c->thread_group->pollfd, fd, c);
+ ret= start_io(c);
}
if (ret)
@@ -1159,8 +1246,8 @@ static void *worker_main(void *param)
this_thread.thread_group= thread_group;
this_thread.event_count=0;
+ my_atomic_add32(&tp_stats.num_worker_threads, 1);
mysql_mutex_lock(&thread_group->mutex);
- tp_stats.num_worker_threads++;
thread_group->thread_count++;
thread_group->active_thread_count++;
thread_group->pending_thread_start_count--;
@@ -1187,8 +1274,8 @@ static void *worker_main(void *param)
mysql_mutex_lock(&thread_group->mutex);
thread_group->active_thread_count--;
thread_group->thread_count--;
- tp_stats.num_worker_threads--;
mysql_mutex_unlock(&thread_group->mutex);
+ my_atomic_add32(&tp_stats.num_worker_threads, -1);
/* If it is the last thread in pool and pool is terminating, destroy pool.*/
if (thread_group->shutdown && (thread_group->thread_count == 0))
@@ -1209,15 +1296,12 @@ bool tp_init()
DBUG_ENTER("tp_init");
started = true;
scheduler_init();
- if (threadpool_size == 0)
- {
- threadpool_size= my_getncpus();
- }
- for(uint i=0; i < threadpool_size; i++)
+ for(uint i=0; i < array_elements(all_groups); i++)
{
thread_group_init(&all_groups[i], get_connection_attrib());
}
+ tp_set_threadpool_size(threadpool_size);
#define PSI_register(X) \
if(PSI_server) PSI_server->register_ ## X("threadpool", X ## _list, array_elements(X ## _list))
@@ -1239,9 +1323,33 @@ void tp_end()
DBUG_VOID_RETURN;
stop_timer(&pool_timer);
- for(uint i=0; i< threadpool_size; i++)
+ for(uint i=0; i< array_elements(all_groups); i++)
{
thread_group_close(&all_groups[i]);
}
DBUG_VOID_RETURN;
}
+
+/* Ensure that poll descriptors are created when threadpool_size changes */
+int tp_set_threadpool_size(uint size)
+{
+ bool success= true;
+ for(uint i=0; i< size; i++)
+ {
+ thread_group_t *group= &all_groups[i];
+ mysql_mutex_lock(&group->mutex);
+ if (group->pollfd == -1)
+ {
+ group->pollfd= io_poll_create();
+ success= (group->pollfd >= 0);
+ }
+ mysql_mutex_unlock(&all_groups[i].mutex);
+ if (!success)
+ {
+ group_count= i-1;
+ return -1;
+ }
+ }
+ group_count= size;
+ return 0;
+}