diff options
author | Vladislav Vaintroub <wlad@montyprogram.com> | 2012-01-26 04:35:54 +0100 |
---|---|---|
committer | Vladislav Vaintroub <wlad@montyprogram.com> | 2012-01-26 04:35:54 +0100 |
commit | 57b6cb39aa268a49ad05f86025386ddde6516670 (patch) | |
tree | f42cc3ac4dfd9b7d45b57c3126b19755e604ec62 /sql/threadpool_win.cc | |
parent | 7ed6530a066ab1e1659c39ee614e503462d8d403 (diff) | |
download | mariadb-git-57b6cb39aa268a49ad05f86025386ddde6516670.tar.gz |
Further review points and simplify Windows implementation
Diffstat (limited to 'sql/threadpool_win.cc')
-rw-r--r-- | sql/threadpool_win.cc | 125 |
1 files changed, 44 insertions, 81 deletions
diff --git a/sql/threadpool_win.cc b/sql/threadpool_win.cc index 7a2c8c0c6cf..ae186a614ee 100644 --- a/sql/threadpool_win.cc +++ b/sql/threadpool_win.cc @@ -17,6 +17,30 @@ #include <windows.h> +/* + Threadpool API is not available on XP. We still want to compile a single + version on Windows, but use the latest functionality if available. + We cannot use threadpool functionality directly, since executable won't + start on XP and loader will complain about missing symbols. + + We solve using the usual way it is done on Windows, i.e with dynamic loading. + We'll need to load a lot of function, and make this less painful with the + WEAK_SYMBOL macro below +*/ + +/* + WEAK_SYMBOL(return_type, function_name, argument_type1,..,argument_typeN) + + Declare and load function pointer from kernel32. The name of the static + variable that holds the function pointer is my_<original function name> + This should be combined with + #define <original function name> my_<original function name> + so that one could use Widows APIs transparently, without worrying whether + they are present in a particular version or not. + + Of course, prior to use of any function there should be a check for correct + Windows version, or check whether function pointer is not NULL. +*/ #define WEAK_SYMBOL(return_type, function, ...) \ typedef return_type (WINAPI *pFN_##function)(__VA_ARGS__); \ static pFN_##function my_##function = (pFN_##function) \ @@ -110,9 +134,7 @@ WEAK_SYMBOL(VOID, CloseThreadpoolWork, PTP_WORK pwk); WEAK_SYMBOL(BOOL, SetThreadpoolStackInformation, PTP_POOL, PTP_POOL_STACK_INFORMATION); #define SetThreadpoolStackInformation my_SetThreadpoolStackInformation -#endif - -#if _MSC_VER < 1600 +#else /* _MSC_VER < 1600 */ #define SetThreadpoolCallbackPriority(env,prio) typedef enum _TP_CALLBACK_PRIORITY { TP_CALLBACK_PRIORITY_HIGH, @@ -158,8 +180,6 @@ static void CALLBACK shm_read_callback(PTP_CALLBACK_INSTANCE instance, static void CALLBACK shm_close_callback(PTP_CALLBACK_INSTANCE instance, PVOID Context, PTP_WAIT wait,TP_WAIT_RESULT wait_result); -#define CONNECTION_SIGNATURE 0xAFFEAFFE - static void check_thread_init(); /* Get current time as Windows time */ @@ -178,21 +198,19 @@ static ulonglong now() struct connection_t { THD *thd; - bool logged_in; HANDLE handle; OVERLAPPED overlapped; - /* absolute time for wait timeout (as Windows time) */ volatile ulonglong timeout; - PTP_CLEANUP_GROUP cleanup_group; TP_CALLBACK_ENVIRON callback_environ; - PTP_IO io; PTP_TIMER timer; PTP_WAIT shm_read; + bool logged_in; }; + void init_connection(connection_t *connection) { connection->logged_in = false; @@ -208,6 +226,7 @@ void init_connection(connection_t *connection) connection->thd = 0; } + int init_io(connection_t *connection, THD *thd) { connection->thd= thd; @@ -237,7 +256,7 @@ int init_io(connection_t *connection, THD *thd) if (connection->handle) { /* Performance tweaks (s. MSDN documentation)*/ - UCHAR flags = FILE_SKIP_SET_EVENT_ON_HANDLE; + UCHAR flags= FILE_SKIP_SET_EVENT_ON_HANDLE; if (skip_completion_port_on_success) { flags |= FILE_SKIP_COMPLETION_PORT_ON_SUCCESS; @@ -245,7 +264,7 @@ int init_io(connection_t *connection, THD *thd) (void)SetFileCompletionNotificationModes(connection->handle, flags); /* Assign io completion callback */ - connection->io = CreateThreadpoolIo(connection->handle, + connection->io= CreateThreadpoolIo(connection->handle, io_completion_callback, connection, &connection->callback_environ); if(!connection->io) { @@ -253,7 +272,7 @@ int init_io(connection_t *connection, THD *thd) return -1; } } - connection->timer = CreateThreadpoolTimer(timer_callback, connection, + connection->timer= CreateThreadpoolTimer(timer_callback, connection, &connection->callback_environ); if (!connection->timer) { @@ -354,6 +373,7 @@ int start_io(connection_t *connection, PTP_CALLBACK_INSTANCE instance) return -1; } + int login(connection_t *connection, PTP_CALLBACK_INSTANCE instance) { if (threadpool_add_connection(connection->thd) == 0 @@ -380,21 +400,6 @@ void set_wait_timeout(connection_t *connection, ulonglong old_timeout) connection->timeout = new_timeout; } -/* - Terminates (idle) connection by closing the socket. - This will activate io_completion_callback() in a different thread -*/ -void post_kill_notification(connection_t *connection) -{ - check_thread_init(); - THD *thd=connection->thd; - mysql_mutex_lock(&thd->LOCK_thd_data); - thd->killed = KILL_CONNECTION; - vio_shutdown(thd->net.vio, SHUT_RDWR); - thd->mysys_var= NULL; - mysql_mutex_unlock(&thd->LOCK_thd_data); -} - /* Connection destructor */ void destroy_connection(connection_t *connection) @@ -438,7 +443,6 @@ static void check_thread_init() if (FlsGetValue(fls) == NULL) { FlsSetValue(fls, (void *)1); - my_thread_init(); thread_created++; InterlockedIncrement((volatile long *)&tp_stats.num_worker_threads); } @@ -446,28 +450,14 @@ static void check_thread_init() /* - Take care of proper cleanup when threadpool threads exit. - We do not control how threads are created, thus it is our responsibility to - check that my_thread_init() is called on thread initialization and - my_thread_end() on thread destruction. On Windows, FlsAlloc() provides the - thread destruction callbacks. + Decrement number of threads when a thread exits . + On Windows, FlsAlloc() provides the thread destruction callbacks. */ static VOID WINAPI thread_destructor(void *data) { if(data) { - if (InterlockedDecrement((volatile long *)&tp_stats.num_worker_threads) >= 0) - { - /* - The above check for number of thread >= 0 is due to shutdown code ( - see tp_end()) where we forcefully set num_worker_threads to 0, even - if not all threads have shut down yet to the point they would ran Fls - destructors, even after CloseThreadpool(). See also comment in tp_end(). - */ - mysql_mutex_lock(&LOCK_thread_count); - my_thread_end(); - mysql_mutex_unlock(&LOCK_thread_count); - } + InterlockedDecrement((volatile long *)&tp_stats.num_worker_threads); } } @@ -507,7 +497,7 @@ bool tp_init(void) { TP_POOL_STACK_INFORMATION stackinfo; stackinfo.StackCommit = 0; - stackinfo.StackReserve = my_thread_stack_size; + stackinfo.StackReserve = (SIZE_T)my_thread_stack_size; if (!SetThreadpoolStackInformation(pool, &stackinfo)) { tp_log_warning("Can't set threadpool stack size", @@ -520,45 +510,19 @@ bool tp_init(void) } -/* +/** Scheduler callback : Destroy the scheduler. */ - -extern "C" uint THR_thread_count; -extern "C" mysql_mutex_t THR_LOCK_threads; -extern "C" mysql_cond_t THR_COND_threads; - void tp_end(void) { if(pool) { SetThreadpoolThreadMaximum(pool, 0); CloseThreadpool(pool); - - /* - Tell my_global_thread_end() we're complete. - - This would not be necessary if CloseThreadpool() would synchronously - release all threads and wait until they disappear and call all their FLS - destructors . However, threads in the pool are released asynchronously - and might spend some time in the CRT shutdown code. Thus zero - num_worker_threads, to avoid thread destructor's my_thread_end()s after - this point. - */ - LONG remaining_threads= - InterlockedExchange( (volatile long *)&tp_stats.num_worker_threads, 0); - - if (remaining_threads) - { - mysql_mutex_lock(&THR_LOCK_threads); - THR_thread_count -= remaining_threads; - mysql_cond_signal(&THR_COND_threads); - mysql_mutex_unlock(&THR_LOCK_threads); - } } } -/* +/** Notify pool about connection being killed. */ void tp_post_kill_notification(THD *thd) @@ -606,7 +570,7 @@ error: DisassociateCurrentThreadFromCallback(instance); destroy_connection(connection); - my_free(connection); + free(connection); } @@ -623,7 +587,7 @@ static void CALLBACK login_callback(PTP_CALLBACK_INSTANCE instance, if (login(connection, instance) != 0) { destroy_connection(connection); - my_free(connection); + free(connection); } } @@ -688,9 +652,8 @@ static void CALLBACK shm_read_callback(PTP_CALLBACK_INSTANCE instance, */ void tp_add_connection(THD *thd) { - bool success = false; - connection_t *con = (connection_t *)my_malloc(sizeof(connection_t), 0); - + connection_t *con = (connection_t *)malloc(sizeof(connection_t)); + if (con) threads.append(thd); mysql_mutex_unlock(&LOCK_thread_count); @@ -698,6 +661,7 @@ void tp_add_connection(THD *thd) if(!con) { tp_log_warning("Allocation failed", "tp_add_connection"); + threadpool_remove_connection(thd); return; } @@ -718,8 +682,7 @@ void tp_add_connection(THD *thd) } - -/* +/** Sets the number of idle threads the thread pool maintains in anticipation of new requests. */ |