diff options
author | Vladislav Vaintroub <wlad@montyprogram.com> | 2011-12-08 19:17:49 +0100 |
---|---|---|
committer | Vladislav Vaintroub <wlad@montyprogram.com> | 2011-12-08 19:17:49 +0100 |
commit | e91bbca5fb080a8d988c156d78c7dc1b1daaad82 (patch) | |
tree | edeb15da451e956ae0d6874657c910ab0df111f8 /sql/threadpool_win.cc | |
parent | 5e7b949e61f4330e27013c8ec81fa3d450e5dce6 (diff) | |
download | mariadb-git-e91bbca5fb080a8d988c156d78c7dc1b1daaad82.tar.gz |
Initial threadpool implementation for MariaDB 5.5
Diffstat (limited to 'sql/threadpool_win.cc')
-rw-r--r-- | sql/threadpool_win.cc | 756 |
1 files changed, 756 insertions, 0 deletions
diff --git a/sql/threadpool_win.cc b/sql/threadpool_win.cc new file mode 100644 index 00000000000..4e46e393c24 --- /dev/null +++ b/sql/threadpool_win.cc @@ -0,0 +1,756 @@ +#ifdef _WIN32_WINNT +#undef _WIN32_WINNT +#endif + +#define _WIN32_WINNT 0x0601 + +#include <my_global.h> +#include <violite.h> +#include <sql_priv.h> +#include <sql_class.h> +#include <my_pthread.h> +#include <scheduler.h> +#include <sql_connect.h> +#include <mysqld.h> +#include <debug_sync.h> +#include <threadpool.h> +#include <windows.h> + + + +TP_STATISTICS tp_stats; + +#define WEAK_SYMBOL(return_type, function, ...) \ + typedef return_type (WINAPI *pFN_##function)(__VA_ARGS__); \ + static pFN_##function my_##function = (pFN_##function) \ + (GetProcAddress(GetModuleHandle("kernel32"),#function)) + +WEAK_SYMBOL(VOID, CancelThreadpoolIo, PTP_IO); +#define CancelThreadpoolIo my_CancelThreadpoolIo + +WEAK_SYMBOL(VOID, CloseThreadpool, PTP_POOL); +#define CloseThreadpool my_CloseThreadpool + +WEAK_SYMBOL(VOID, CloseThreadpoolIo, PTP_IO); +#define CloseThreadpoolIo my_CloseThreadpoolIo + +WEAK_SYMBOL(VOID, CloseThreadpoolTimer,PTP_TIMER); +#define CloseThreadpoolTimer my_CloseThreadpoolTimer + +WEAK_SYMBOL(VOID, CloseThreadpoolWait,PTP_WAIT); +#define CloseThreadpoolWait my_CloseThreadpoolWait + +WEAK_SYMBOL(PTP_POOL, CreateThreadpool,PVOID); +#define CreateThreadpool my_CreateThreadpool + +WEAK_SYMBOL(PTP_IO, CreateThreadpoolIo, HANDLE, PTP_WIN32_IO_CALLBACK, PVOID , + PTP_CALLBACK_ENVIRON); +#define CreateThreadpoolIo my_CreateThreadpoolIo + +WEAK_SYMBOL(PTP_TIMER, CreateThreadpoolTimer, PTP_TIMER_CALLBACK , + PVOID pv, PTP_CALLBACK_ENVIRON pcbe); +#define CreateThreadpoolTimer my_CreateThreadpoolTimer + +WEAK_SYMBOL(PTP_WAIT, CreateThreadpoolWait, PTP_WAIT_CALLBACK, PVOID, + PTP_CALLBACK_ENVIRON); +#define CreateThreadpoolWait my_CreateThreadpoolWait + +WEAK_SYMBOL(VOID, DisassociateCurrentThreadFromCallback, PTP_CALLBACK_INSTANCE); +#define DisassociateCurrentThreadFromCallback my_DisassociateCurrentThreadFromCallback + +WEAK_SYMBOL(DWORD, FlsAlloc, PFLS_CALLBACK_FUNCTION); +#define FlsAlloc my_FlsAlloc + +WEAK_SYMBOL(PVOID, FlsGetValue, DWORD); +#define FlsGetValue my_FlsGetValue + +WEAK_SYMBOL(BOOL, FlsSetValue, DWORD, PVOID); +#define FlsSetValue my_FlsSetValue + +WEAK_SYMBOL(VOID, SetThreadpoolThreadMaximum, PTP_POOL, DWORD); +#define SetThreadpoolThreadMaximum my_SetThreadpoolThreadMaximum + +WEAK_SYMBOL(BOOL, SetThreadpoolThreadMinimum, PTP_POOL, DWORD); +#define SetThreadpoolThreadMinimum my_SetThreadpoolThreadMinimum + +WEAK_SYMBOL(VOID, SetThreadpoolTimer, PTP_TIMER, PFILETIME,DWORD,DWORD); +#define SetThreadpoolTimer my_SetThreadpoolTimer + +WEAK_SYMBOL(VOID, SetThreadpoolWait, PTP_WAIT,HANDLE,PFILETIME); +#define SetThreadpoolWait my_SetThreadpoolWait + +WEAK_SYMBOL(VOID, StartThreadpoolIo, PTP_IO); +#define StartThreadpoolIo my_StartThreadpoolIo + +WEAK_SYMBOL(VOID, WaitForThreadpoolIoCallbacks,PTP_IO, BOOL); +#define WaitForThreadpoolIoCallbacks my_WaitForThreadpoolIoCallbacks + +WEAK_SYMBOL(VOID, WaitForThreadpoolTimerCallbacks, PTP_TIMER, BOOL); +#define WaitForThreadpoolTimerCallbacks my_WaitForThreadpoolTimerCallbacks + +WEAK_SYMBOL(VOID, WaitForThreadpoolWaitCallbacks, PTP_WAIT, BOOL); +#define WaitForThreadpoolWaitCallbacks my_WaitForThreadpoolWaitCallbacks + +WEAK_SYMBOL(BOOL, SetFileCompletionNotificationModes, HANDLE, UCHAR); +#define SetFileCompletionNotificationModes my_SetFileCompletionNotificationModes + +WEAK_SYMBOL(BOOL, TrySubmitThreadpoolCallback, PTP_SIMPLE_CALLBACK pfns, + PVOID pv,PTP_CALLBACK_ENVIRON pcbe); +#define TrySubmitThreadpoolCallback my_TrySubmitThreadpoolCallback + +WEAK_SYMBOL(PTP_WORK, CreateThreadpoolWork, PTP_WORK_CALLBACK pfnwk, PVOID pv, + PTP_CALLBACK_ENVIRON pcbe); +#define CreateThreadpoolWork my_CreateThreadpoolWork + +WEAK_SYMBOL(VOID, SubmitThreadpoolWork,PTP_WORK pwk); +#define SubmitThreadpoolWork my_SubmitThreadpoolWork + +WEAK_SYMBOL(VOID, CloseThreadpoolWork, PTP_WORK pwk); +#define CloseThreadpoolWork my_CloseThreadpoolWork + +#if _MSC_VER >= 1600 +/* Stack size manipulation available only on Win7+ /declarations in VS10 */ +WEAK_SYMBOL(BOOL, SetThreadpoolStackInformation, PTP_POOL, + PTP_POOL_STACK_INFORMATION); +#define SetThreadpoolStackInformation my_SetThreadpoolStackInformation +#endif + +#if _MSC_VER < 1600 +#define SetThreadpoolCallbackPriority(env,prio) +typedef enum _TP_CALLBACK_PRIORITY { + TP_CALLBACK_PRIORITY_HIGH, + TP_CALLBACK_PRIORITY_NORMAL, + TP_CALLBACK_PRIORITY_LOW, + TP_CALLBACK_PRIORITY_INVALID +} TP_CALLBACK_PRIORITY; +#endif + + +/* Log a warning */ +static void tp_log_warning(const char *msg, const char *fct) +{ + sql_print_warning("Threadpool: %s. %s failed (last error %d)",msg, fct, + GetLastError()); +} + + +PTP_POOL pool; +DWORD fls; +extern int skip_net_wait_timeout; + +static bool skip_completion_port_on_success = false; + +/* + Threadpool callbacks. + + io_completion_callback - handle client request + timer_callback - handle wait timeout (kill connection) + shm_read_callback, shm_close_callback - shared memory stuff + login_callback - user login (submitted as threadpool work) + +*/ + +static void CALLBACK timer_callback(PTP_CALLBACK_INSTANCE instance, + PVOID context, PTP_TIMER timer); + +static void CALLBACK io_completion_callback(PTP_CALLBACK_INSTANCE instance, + PVOID context, PVOID overlapped, ULONG io_result, ULONG_PTR nbytes, PTP_IO io); + +static void CALLBACK shm_read_callback(PTP_CALLBACK_INSTANCE instance, + PVOID Context, PTP_WAIT wait,TP_WAIT_RESULT wait_result); + +static void CALLBACK shm_close_callback(PTP_CALLBACK_INSTANCE instance, + PVOID Context, PTP_WAIT wait,TP_WAIT_RESULT wait_result); + +#define CONNECTION_SIGNATURE 0xAFFEAFFE + +static void check_thread_init(); + +/* Get current time as Windows time */ +static ulonglong now() +{ + ulonglong current_time; + GetSystemTimeAsFileTime((PFILETIME)¤t_time); + return current_time; +} + +/* + Connection structure, encapsulates THD + structures for asynchronous + IO and pool. +*/ + +struct connection_t +{ + THD *thd; + bool logged_in; + HANDLE handle; + OVERLAPPED overlapped; + + /* absolute time for wait timeout (as Windows time) */ + volatile ulonglong timeout; + + PTP_CLEANUP_GROUP cleanup_group; + TP_CALLBACK_ENVIRON callback_environ; + + PTP_IO io; + PTP_TIMER timer; + PTP_WAIT shm_read; +}; + +void init_connection(connection_t *connection) +{ + connection->logged_in = false; + connection->handle= 0; + connection->io= 0; + connection->shm_read= 0; + connection->timer= 0; + connection->logged_in = false; + connection->timeout= ULONGLONG_MAX; + memset(&connection->overlapped, 0, sizeof(OVERLAPPED)); + InitializeThreadpoolEnvironment(&connection->callback_environ); + SetThreadpoolCallbackPool(&connection->callback_environ, pool); + connection->thd = 0; +} + +int init_io(connection_t *connection, THD *thd) +{ + connection->thd= thd; + Vio *vio = thd->net.vio; + switch(vio->type) + { + case VIO_TYPE_SSL: + case VIO_TYPE_TCPIP: + connection->handle= (HANDLE)vio->sd; + break; + case VIO_TYPE_NAMEDPIPE: + connection->handle= (HANDLE)vio->hPipe; + break; + case VIO_TYPE_SHARED_MEMORY: + connection->shm_read= CreateThreadpoolWait(shm_read_callback, connection, + &connection->callback_environ); + if (!connection->shm_read) + { + tp_log_warning("Allocation failed", "CreateThreadpoolWait"); + return -1; + } + break; + default: + abort(); + } + + if (connection->handle) + { + /* Performance tweaks (s. MSDN documentation)*/ + UCHAR flags = FILE_SKIP_SET_EVENT_ON_HANDLE; + if (skip_completion_port_on_success) + { + flags |= FILE_SKIP_COMPLETION_PORT_ON_SUCCESS; + } + (void)SetFileCompletionNotificationModes(connection->handle, flags); + + /* Assign io completion callback */ + connection->io = CreateThreadpoolIo(connection->handle, + io_completion_callback, connection, &connection->callback_environ); + if(!connection->io) + { + tp_log_warning("Allocation failed", "CreateThreadpoolWait"); + return -1; + } + } + connection->timer = CreateThreadpoolTimer(timer_callback, connection, + &connection->callback_environ); + if (!connection->timer) + { + tp_log_warning("Allocation failed", "CreateThreadpoolWait"); + return -1; + } + + return 0; +} + + +/* + Start asynchronous read +*/ +int start_io(connection_t *connection, PTP_CALLBACK_INSTANCE instance) +{ + /* Start async read */ + DWORD num_bytes = 0; + static char c; + WSABUF buf; + buf.buf= &c; + buf.len= 0; + DWORD flags=0; + DWORD last_error= 0; + + int retval; + Vio *vio= connection->thd->net.vio; + + if (vio->type == VIO_TYPE_SHARED_MEMORY) + { + SetThreadpoolWait(connection->shm_read, vio->event_server_wrote, NULL); + return 0; + } + if (vio->type == VIO_CLOSED) + { + return -1; + } + + DBUG_ASSERT(vio->type == VIO_TYPE_TCPIP || + vio->type == VIO_TYPE_SSL || + vio->type == VIO_TYPE_NAMEDPIPE); + + OVERLAPPED *overlapped= &connection->overlapped; + PTP_IO io= connection->io; + StartThreadpoolIo(io); + + if (vio->type == VIO_TYPE_TCPIP || vio->type == VIO_TYPE_SSL) + { + /* Start async io (sockets). */ + if (WSARecv(vio->sd , &buf, 1, &num_bytes, &flags, + overlapped, NULL) == 0) + { + retval= last_error= 0; + } + else + { + retval= -1; + last_error= WSAGetLastError(); + } + } + else + { + /* Start async io (named pipe) */ + if (ReadFile(vio->hPipe, &c, 0, &num_bytes ,overlapped)) + { + retval= last_error= 0; + } + else + { + retval= -1; + last_error= GetLastError(); + } + } + + if (retval == 0 || last_error == ERROR_MORE_DATA) + { + /* + IO successfully finished (synchronously). + If skip_completion_port_on_success is set, we need to handle it right + here, because completion callback would not be executed by the pool. + */ + if(skip_completion_port_on_success) + { + CancelThreadpoolIo(io); + io_completion_callback(instance, connection, overlapped, last_error, + num_bytes, io); + } + return 0; + } + + if(last_error == ERROR_IO_PENDING) + { + return 0; + } + + /* Some error occured */ + CancelThreadpoolIo(io); + return -1; +} + +int login(connection_t *connection, PTP_CALLBACK_INSTANCE instance) +{ + if (threadpool_add_connection(connection->thd) == 0 + && init_io(connection, connection->thd) == 0 + && start_io(connection, instance) == 0) + { + return 0; + } + return -1; +} + +/* + Recalculate wait timeout, maybe reset timer. +*/ +void set_wait_timeout(connection_t *connection, ulonglong old_timeout) +{ + ulonglong new_timeout = now() + + 10000000LL*connection->thd->variables.net_wait_timeout; + + if (new_timeout < old_timeout) + { + SetThreadpoolTimer(connection->timer, (PFILETIME) &new_timeout, 0, 1000); + } + connection->timeout = new_timeout; +} + +/* + Terminates (idle) connection by closing the socket. + This will activate io_completion_callback() in a different thread +*/ +void post_kill_notification(connection_t *connection) +{ + check_thread_init(); + THD *thd=connection->thd; + mysql_mutex_lock(&thd->LOCK_thd_data); + thd->killed = KILL_CONNECTION; + vio_shutdown(thd->net.vio, SHUT_RDWR); + thd->mysys_var= NULL; + mysql_mutex_unlock(&thd->LOCK_thd_data); +} + + +/* Connection destructor */ +void destroy_connection(connection_t *connection) +{ + if (connection->thd) + { + threadpool_remove_connection(connection->thd); + } + + if (connection->io) + { + WaitForThreadpoolIoCallbacks(connection->io, TRUE); + CloseThreadpoolIo(connection->io); + } + + if(connection->shm_read) + { + WaitForThreadpoolWaitCallbacks(connection->shm_read, TRUE); + CloseThreadpoolWait(connection->shm_read); + } + + if(connection->timer) + { + SetThreadpoolTimer(connection->timer, 0, 0, 0); + WaitForThreadpoolTimerCallbacks(connection->timer, TRUE); + CloseThreadpoolTimer(connection->timer); + } + + DestroyThreadpoolEnvironment(&connection->callback_environ); +} + + + +/* + This function should be called first whenever a callback is invoked in the + threadpool, does my_thread_init() if not yet done +*/ +extern ulong thread_created; +static void check_thread_init() +{ + if (FlsGetValue(fls) == NULL) + { + FlsSetValue(fls, (void *)1); + my_thread_init(); + thread_created++; + InterlockedIncrement((volatile long *)&tp_stats.num_worker_threads); + } +} + + +/* + Take care of proper cleanup when threadpool threads exit. + We do not control how threads are created, thus it is our responsibility to + check that my_thread_init() is called on thread initialization and + my_thread_end() on thread destruction. On Windows, FlsAlloc() provides the + thread destruction callbacks. +*/ +static VOID WINAPI thread_destructor(void *data) +{ + if(data) + { + if (InterlockedDecrement((volatile long *)&tp_stats.num_worker_threads) >= 0) + { + /* + The above check for number of thread >= 0 is due to shutdown code ( + see tp_end()) where we forcefully set num_worker_threads to 0, even + if not all threads have shut down yet to the point they would ran Fls + destructors, even after CloseThreadpool(). See also comment in tp_end(). + */ + mysql_mutex_lock(&LOCK_thread_count); + my_thread_end(); + mysql_mutex_unlock(&LOCK_thread_count); + } + } +} + + +/* Scheduler callback : init */ +bool tp_init(void) +{ + fls= FlsAlloc(thread_destructor); + pool= CreateThreadpool(NULL); + if(!pool) + { + sql_print_error("Can't create threadpool. " + "CreateThreadpool() failed with %d. Likely cause is memory pressure", + GetLastError()); + exit(1); + } + + if (threadpool_max_threads) + { + SetThreadpoolThreadMaximum(pool,threadpool_max_threads); + } + + if (threadpool_min_threads) + { + if (!SetThreadpoolThreadMinimum(pool, threadpool_min_threads)) + { + tp_log_warning( "Can't set threadpool minimum threads", + "SetThreadpoolThreadMinimum"); + } + } + + /* + Control stack size (OS must be Win7 or later, plus corresponding SDK) + */ +#if _MSC_VER >=1600 + if (SetThreadpoolStackInformation) + { + TP_POOL_STACK_INFORMATION stackinfo; + stackinfo.StackCommit = 0; + stackinfo.StackReserve = my_thread_stack_size; + if (!SetThreadpoolStackInformation(pool, &stackinfo)) + { + tp_log_warning("Can't set threadpool stack size", + "SetThreadpoolStackInformation"); + } + } +#endif + + skip_net_wait_timeout = 1; + return 0; +} + + +/* + Scheduler callback : Destroy the scheduler. +*/ + +extern "C" uint THR_thread_count; +extern "C" mysql_mutex_t THR_LOCK_threads; +extern "C" mysql_cond_t THR_COND_threads; + +void tp_end(void) +{ + if(pool) + { + SetThreadpoolThreadMaximum(pool, 0); + CloseThreadpool(pool); + + /* + Tell my_global_thread_end() we're complete. + + This would not be necessary if CloseThreadpool() would synchronously + release all threads and wait until they disappear and call all their FLS + destrructors . However, threads in the pool are released asynchronously + and might spend some time in the CRT shutdown code. Thus zero + num_worker_threads, to avoid thread destructor's my_thread_end()s after + this point. + */ + LONG remaining_threads= + InterlockedExchange( (volatile long *)&tp_stats.num_worker_threads, 0); + + if (remaining_threads) + { + mysql_mutex_lock(&THR_LOCK_threads); + THR_thread_count -= remaining_threads; + mysql_cond_signal(&THR_COND_threads); + mysql_mutex_unlock(&THR_LOCK_threads); + } + } + skip_net_wait_timeout= 0; +} + +/* + Notify pool about connection being killed. +*/ +void tp_post_kill_notification(THD *thd) +{ + if (current_thd == thd) + return; /* There is nothing to do.*/ + + if (thd->system_thread) + return; /* Will crash if we attempt to kill system thread. */ + + Vio *vio= thd->net.vio; + + vio_shutdown(vio, SD_BOTH); + +} + +/* + Handle read completion/notification. +*/ +static VOID CALLBACK io_completion_callback(PTP_CALLBACK_INSTANCE instance, + PVOID context, PVOID overlapped, ULONG io_result, ULONG_PTR nbytes, PTP_IO io) +{ + if(instance) + { + check_thread_init(); + } + + connection_t *connection = (connection_t*)context; + THD *thd= connection->thd; + ulonglong old_timeout = connection->timeout; + connection->timeout = ULONGLONG_MAX; + + if (threadpool_process_request(connection->thd)) + goto error; + + set_wait_timeout(connection, old_timeout); + if(start_io(connection, instance)) + goto error; + + return; + +error: + /* Some error has occured. */ + if (instance) + DisassociateCurrentThreadFromCallback(instance); + + destroy_connection(connection); + my_free(connection); +} + + +/* Simple callback for login */ +static void CALLBACK login_callback(PTP_CALLBACK_INSTANCE instance, + PVOID context, PTP_WORK work) +{ + if(instance) + { + check_thread_init(); + } + + connection_t *connection =(connection_t *)context; + if (login(connection, instance) != 0) + { + destroy_connection(connection); + my_free(connection); + } +} + +/* + Timer callback. + Invoked when connection times out (wait_timeout) +*/ +static VOID CALLBACK timer_callback(PTP_CALLBACK_INSTANCE instance, + PVOID parameter, PTP_TIMER timer) +{ + check_thread_init(); + + connection_t *con= (connection_t*)parameter; + ulonglong timeout= con->timeout; + + if (timeout <= now()) + { + con->thd->killed = KILL_CONNECTION; + if(con->thd->net.vio) + vio_shutdown(con->thd->net.vio, SD_BOTH); + } + else if(timeout != ULONGLONG_MAX) + { + /* + Reset timer. + There is a tiny possibility of a race condition, since the value of timeout + could have changed to smaller value in the thread doing io callback. + + Given the relative unimportance of the wait timeout, we accept race + condition. + */ + SetThreadpoolTimer(timer, (PFILETIME)&timeout, 0, 1000); + } +} + + +/* + Shared memory read callback. + Invoked when read event is set on connection. +*/ +static void CALLBACK shm_read_callback(PTP_CALLBACK_INSTANCE instance, + PVOID context, PTP_WAIT wait,TP_WAIT_RESULT wait_result) +{ + connection_t *con= (connection_t *)context; + /* Disarm wait. */ + SetThreadpoolWait(wait, NULL, NULL); + + /* + This is an autoreset event, and one wakeup is eaten already by threadpool, + and the current state is "not set". Thus we need to reset the event again, + or vio_read will hang. + */ + HANDLE h = con->thd->net.vio->event_server_wrote; + SetEvent(h); + io_completion_callback(instance, context, NULL, 0, 0 , 0); +} + + +/* + Notify the thread pool about a new connection. + NOTE: LOCK_thread_count is locked on entry. This function must unlock it. +*/ +void tp_add_connection(THD *thd) +{ + bool success = false; + connection_t *con = (connection_t *)my_malloc(sizeof(connection_t), 0); + + if (con) + threads.append(thd); + mysql_mutex_unlock(&LOCK_thread_count); + + if(!con) + { + tp_log_warning("Allocation failed", "tp_add_connection"); + return; + } + + init_connection(con); + con->thd= thd; + /* Try to login asynchronously, using threads in the pool */ + PTP_WORK wrk = CreateThreadpoolWork(login_callback,con, &con->callback_environ); + if (wrk) + { + SubmitThreadpoolWork(wrk); + CloseThreadpoolWork(wrk); + } + else + { + /* Likely memory pressure */ + login_callback(NULL, con, NULL); /* deletes connection if something goes wrong */ + } +} + + + +/* + Sets the number of idle threads the thread pool maintains in anticipation of new + requests. +*/ +void tp_set_min_threads(uint val) +{ + if (pool) + SetThreadpoolThreadMinimum(pool, val); +} + +void tp_set_max_threads(uint val) +{ + if (pool) + SetThreadpoolThreadMaximum(pool, val); +} + +void tp_wait_begin(THD *thd, int type) +{ + if (thd && thd->event_scheduler.data) + { + /* TODO: call CallbackMayRunLong() */ + } +} + +void tp_wait_end(THD *thd) +{ + /* Do we need to do anything ? */ +} + |