diff options
-rw-r--r-- | sql/threadpool.h | 5 | ||||
-rw-r--r-- | sql/threadpool_common.cc | 11 | ||||
-rw-r--r-- | sql/threadpool_unix.cc | 68 | ||||
-rw-r--r-- | sql/threadpool_win.cc | 8 |
4 files changed, 60 insertions, 32 deletions
diff --git a/sql/threadpool.h b/sql/threadpool.h index a2c61f8a6ca..754b8f1e1a8 100644 --- a/sql/threadpool.h +++ b/sql/threadpool.h @@ -115,6 +115,7 @@ struct TP_connection struct TP_pool { virtual ~TP_pool(){}; + virtual int init()= 0; virtual TP_connection *new_connection(CONNECT *)= 0; virtual void add(TP_connection *c)= 0; virtual int set_max_threads(uint){ return 0; } @@ -130,7 +131,8 @@ struct TP_pool #ifdef _WIN32 struct TP_pool_win:TP_pool { - TP_pool_win(); + TP_pool_win(); + virtual int init(); virtual ~TP_pool_win(); virtual TP_connection *new_connection(CONNECT *c); virtual void add(TP_connection *); @@ -143,6 +145,7 @@ struct TP_pool_unix :TP_pool { TP_pool_unix(); ~TP_pool_unix(); + virtual int init(); virtual TP_connection *new_connection(CONNECT *c); virtual void add(TP_connection *); virtual int set_pool_size(uint); diff --git a/sql/threadpool_common.cc b/sql/threadpool_common.cc index 730ea2bd365..e3f968003b7 100644 --- a/sql/threadpool_common.cc +++ b/sql/threadpool_common.cc @@ -351,11 +351,18 @@ static bool tp_init() #ifdef _WIN32 pool = new (std::nothrow) TP_pool_win; - return 0; #else pool= new (std::nothrow) TP_pool_unix; - return 0; #endif + if (!pool) + return true; + if (pool->init()) + { + delete pool; + pool= 0; + return true; + } + return false; } static void tp_add_connection(CONNECT *connect) diff --git a/sql/threadpool_unix.cc b/sql/threadpool_unix.cc index 59df1d5403a..d3964714dd5 100644 --- a/sql/threadpool_unix.cc +++ b/sql/threadpool_unix.cc @@ -134,19 +134,9 @@ worker_list_t; struct TP_connection_unix:public TP_connection { - TP_connection_unix(CONNECT *c): - TP_connection(c), - thread_group(0), - next_in_queue(0), - prev_in_queue(0), - abs_wait_timeout(ULONGLONG_MAX), - bound_to_poll_descriptor(false), - waiting(false) -#ifdef HAVE_IOCP - ,overlapped() -#endif - { - } + TP_connection_unix(CONNECT *c); + ~TP_connection_unix(); + virtual int init(){ return 0; }; virtual void set_io_timeout(int sec); virtual int start_io(); @@ -1331,21 +1321,12 @@ void TP_pool_unix::add(TP_connection *c) DBUG_ENTER("tp_add_connection"); TP_connection_unix *connection=(TP_connection_unix *)c; - /* Assign connection to a group. */ - thread_group_t *group= - &all_groups[c->connect->thread_id%group_count]; - connection->thread_group=group; - - mysql_mutex_lock(&group->mutex); - group->connection_count++; - mysql_mutex_unlock(&group->mutex); - /* Add connection to the work queue.Actual logon will be done by a worker thread. */ - queue_put(group, connection); + queue_put(connection->thread_group, connection); DBUG_VOID_RETURN; } @@ -1392,6 +1373,35 @@ static void set_next_timeout_check(ulonglong abstime) DBUG_VOID_RETURN; } +TP_connection_unix::TP_connection_unix(CONNECT *c): + TP_connection(c), + thread_group(0), + next_in_queue(0), + prev_in_queue(0), + abs_wait_timeout(ULONGLONG_MAX), + bound_to_poll_descriptor(false), + waiting(false) +#ifdef HAVE_IOCP +, overlapped() +#endif +{ + /* Assign connection to a group. */ + thread_group_t *group= + &all_groups[c->thread_id%group_count]; + + thread_group=group; + + mysql_mutex_lock(&group->mutex); + group->connection_count++; + mysql_mutex_unlock(&group->mutex); +} + +TP_connection_unix::~TP_connection_unix() +{ + mysql_mutex_lock(&thread_group->mutex); + thread_group->connection_count--; + mysql_mutex_unlock(&thread_group->mutex); +} /** Set wait timeout for connection. @@ -1546,6 +1556,9 @@ static void *worker_main(void *param) TP_pool_unix::TP_pool_unix() +{} + +int TP_pool_unix::init() { DBUG_ENTER("TP_pool_unix::TP_pool_unix"); threadpool_max_size= MY_MAX(threadpool_size, 128); @@ -1555,11 +1568,10 @@ TP_pool_unix::TP_pool_unix() { threadpool_max_size= 0; sql_print_error("Allocation failed"); - exit(1); + DBUG_RETURN(-1); } - threadpool_started= true; scheduler_init(); - + threadpool_started= true; for (uint i= 0; i < threadpool_max_size; i++) { thread_group_init(&all_groups[i], get_connection_attrib()); @@ -1569,7 +1581,7 @@ TP_pool_unix::TP_pool_unix() { /* Something went wrong */ sql_print_error("Can't set threadpool size to %d",threadpool_size); - exit(1); + DBUG_RETURN(-1); } PSI_register(mutex); PSI_register(cond); @@ -1577,7 +1589,7 @@ TP_pool_unix::TP_pool_unix() pool_timer.tick_interval= threadpool_stall_limit; start_timer(&pool_timer); - DBUG_VOID_RETURN; + DBUG_RETURN(0); } TP_pool_unix::~TP_pool_unix() diff --git a/sql/threadpool_win.cc b/sql/threadpool_win.cc index 2c012a9e77b..cc8b3081ff6 100644 --- a/sql/threadpool_win.cc +++ b/sql/threadpool_win.cc @@ -443,6 +443,9 @@ static void CALLBACK work_callback(PTP_CALLBACK_INSTANCE instance, PVOID context } TP_pool_win::TP_pool_win() +{} + +int TP_pool_win::init() { fls= FlsAlloc(thread_destructor); pool= CreateThreadpool(NULL); @@ -452,7 +455,7 @@ TP_pool_win::TP_pool_win() sql_print_error("Can't create threadpool. " "CreateThreadpool() failed with %d. Likely cause is memory pressure", GetLastError()); - exit(1); + return -1; } InitializeThreadpoolEnvironment(&callback_environ); @@ -486,6 +489,7 @@ TP_pool_win::TP_pool_win() "SetThreadpoolStackInformation"); } } + return 0; } @@ -494,6 +498,8 @@ TP_pool_win::TP_pool_win() */ TP_pool_win::~TP_pool_win() { + if (!pool) + return; DestroyThreadpoolEnvironment(&callback_environ); SetThreadpoolThreadMaximum(pool, 0); CloseThreadpool(pool); |