diff options
author | Vladislav Vaintroub <wlad@mariadb.com> | 2016-09-21 14:28:42 +0000 |
---|---|---|
committer | Vladislav Vaintroub <wlad@mariadb.com> | 2016-09-22 17:01:28 +0000 |
commit | f7a7c0c2fec3dcca331bb529f8314273360c72ae (patch) | |
tree | 2e04f4036bd7def676d85690e67e393ec0c41a8e /sql/threadpool_win.cc | |
parent | f32a5115584c9b33a2163df57830ad335cd2b3ab (diff) | |
download | mariadb-git-f7a7c0c2fec3dcca331bb529f8314273360c72ae.tar.gz |
MDEV-10297 Add priorization to threadpool
Also MDEV-10385 Threadpool refactoring
Diffstat (limited to 'sql/threadpool_win.cc')
-rw-r--r-- | sql/threadpool_win.cc | 550 |
1 files changed, 229 insertions, 321 deletions
diff --git a/sql/threadpool_win.cc b/sql/threadpool_win.cc index 9b1d8f6a7d8..dec898d92bb 100644 --- a/sql/threadpool_win.cc +++ b/sql/threadpool_win.cc @@ -64,8 +64,9 @@ static void tp_log_warning(const char *msg, const char *fct) } -PTP_POOL pool; -DWORD fls; +static PTP_POOL pool; +static TP_CALLBACK_ENVIRON callback_environ; +static DWORD fls; static bool skip_completion_port_on_success = false; @@ -85,13 +86,16 @@ static void CALLBACK timer_callback(PTP_CALLBACK_INSTANCE instance, static void CALLBACK io_completion_callback(PTP_CALLBACK_INSTANCE instance, PVOID context, PVOID overlapped, ULONG io_result, ULONG_PTR nbytes, PTP_IO io); + +static void CALLBACK work_callback(PTP_CALLBACK_INSTANCE instance, PVOID context, PTP_WORK work); + static void CALLBACK shm_read_callback(PTP_CALLBACK_INSTANCE instance, PVOID Context, PTP_WAIT wait,TP_WAIT_RESULT wait_result); static void CALLBACK shm_close_callback(PTP_CALLBACK_INSTANCE instance, PVOID Context, PTP_WAIT wait,TP_WAIT_RESULT wait_result); -static void check_thread_init(); +static void pre_callback(PVOID context, PTP_CALLBACK_INSTANCE instance); /* Get current time as Windows time */ static ulonglong now() @@ -101,74 +105,86 @@ static ulonglong now() return current_time; } -/* - Connection structure, encapsulates THD + structures for asynchronous - IO and pool. -*/ - -struct connection_t +struct TP_connection_win:public TP_connection { - THD *thd; +public: + TP_connection_win(CONNECT*); + ~TP_connection_win(); + virtual int init(); + virtual int start_io(); + virtual void set_io_timeout(int sec); + virtual void wait_begin(int type); + virtual void wait_end(); + + ulonglong timeout; + enum_vio_type vio_type; HANDLE handle; OVERLAPPED overlapped; - /* absolute time for wait timeout (as Windows time) */ - volatile ulonglong timeout; - TP_CALLBACK_ENVIRON callback_environ; + PTP_CALLBACK_INSTANCE callback_instance; PTP_IO io; PTP_TIMER timer; PTP_WAIT shm_read; - /* Callback instance, used to inform treadpool about long callbacks */ - PTP_CALLBACK_INSTANCE callback_instance; - CONNECT* connect; - bool logged_in; + PTP_WORK work; + bool long_callback; + }; +struct TP_connection *new_TP_connection(CONNECT *connect) +{ + TP_connection *c = new (std::nothrow) TP_connection_win(connect); + if (!c || c->init()) + { + delete c; + return 0; + } + return c; +} -void init_connection(connection_t *connection, CONNECT *connect) +void TP_pool_win::add(TP_connection *c) { - connection->logged_in = false; - connection->handle= 0; - connection->io= 0; - connection->shm_read= 0; - connection->timer= 0; - connection->logged_in = false; - connection->timeout= ULONGLONG_MAX; - connection->callback_instance= 0; - connection->thd= 0; - memset(&connection->overlapped, 0, sizeof(OVERLAPPED)); - InitializeThreadpoolEnvironment(&connection->callback_environ); - SetThreadpoolCallbackPool(&connection->callback_environ, pool); - connection->connect= connect; + SubmitThreadpoolWork(((TP_connection_win *)c)->work); } -int init_io(connection_t *connection, THD *thd) +TP_connection_win::TP_connection_win(CONNECT *c) : + TP_connection(c), + timeout(ULONGLONG_MAX), + callback_instance(0), + io(0), + shm_read(0), + timer(0), + work(0) { - connection->thd= thd; - Vio *vio = thd->net.vio; - switch(vio->type) +} + +#define CHECK_ALLOC_ERROR(op) if (!(op)) {tp_log_warning("Allocation failed", #op); DBUG_ASSERT(0); return -1; } + +int TP_connection_win::init() +{ + + memset(&overlapped, 0, sizeof(OVERLAPPED)); + Vio *vio = connect->vio; + switch ((vio_type = vio->type)) { - case VIO_TYPE_SSL: - case VIO_TYPE_TCPIP: - connection->handle= (HANDLE)mysql_socket_getfd(connection->thd->net.vio->mysql_socket); - break; - case VIO_TYPE_NAMEDPIPE: - connection->handle= (HANDLE)vio->hPipe; - break; - case VIO_TYPE_SHARED_MEMORY: - connection->shm_read= CreateThreadpoolWait(shm_read_callback, connection, - &connection->callback_environ); - if (!connection->shm_read) - { - tp_log_warning("Allocation failed", "CreateThreadpoolWait"); - return -1; - } - break; - default: - abort(); + case VIO_TYPE_SSL: + case VIO_TYPE_TCPIP: + handle= (HANDLE)mysql_socket_getfd(vio->mysql_socket); + break; + case VIO_TYPE_NAMEDPIPE: + handle= (HANDLE)vio->hPipe; + break; + case VIO_TYPE_SHARED_MEMORY: + handle= vio->event_server_wrote; + break; + default: + abort(); } - if (connection->handle) + if (vio_type == VIO_TYPE_SHARED_MEMORY) + { + CHECK_ALLOC_ERROR(shm_read= CreateThreadpoolWait(shm_read_callback, this, &callback_environ)); + } + else { /* Performance tweaks (s. MSDN documentation)*/ UCHAR flags= FILE_SKIP_SET_EVENT_ON_HANDLE; @@ -176,25 +192,13 @@ int init_io(connection_t *connection, THD *thd) { flags |= FILE_SKIP_COMPLETION_PORT_ON_SUCCESS; } - (void)SetFileCompletionNotificationModes(connection->handle, flags); - + (void)SetFileCompletionNotificationModes(handle, flags); /* Assign io completion callback */ - connection->io= CreateThreadpoolIo(connection->handle, - io_completion_callback, connection, &connection->callback_environ); - if(!connection->io) - { - tp_log_warning("Allocation failed", "CreateThreadpoolWait"); - return -1; - } - } - connection->timer= CreateThreadpoolTimer(timer_callback, connection, - &connection->callback_environ); - if (!connection->timer) - { - tp_log_warning("Allocation failed", "CreateThreadpoolWait"); - return -1; + CHECK_ALLOC_ERROR(io= CreateThreadpoolIo(handle, io_completion_callback, this, &callback_environ)); } + CHECK_ALLOC_ERROR(timer= CreateThreadpoolTimer(timer_callback, this, &callback_environ)); + CHECK_ALLOC_ERROR(work= CreateThreadpoolWork(work_callback, this, &callback_environ)); return 0; } @@ -202,9 +206,8 @@ int init_io(connection_t *connection, THD *thd) /* Start asynchronous read */ -int start_io(connection_t *connection, PTP_CALLBACK_INSTANCE instance) +int TP_connection_win::start_io() { - /* Start async read */ DWORD num_bytes = 0; static char c; WSABUF buf; @@ -214,33 +217,20 @@ int start_io(connection_t *connection, PTP_CALLBACK_INSTANCE instance) DWORD last_error= 0; int retval; - Vio *vio= connection->thd->net.vio; - - if (vio->type == VIO_TYPE_SHARED_MEMORY) + if (shm_read) { - SetThreadpoolWait(connection->shm_read, vio->event_server_wrote, NULL); - return 0; - } - if (vio->type == VIO_CLOSED) - { - return -1; + SetThreadpoolWait(shm_read, handle, NULL); + return 0; } - - DBUG_ASSERT(vio->type == VIO_TYPE_TCPIP || - vio->type == VIO_TYPE_SSL || - vio->type == VIO_TYPE_NAMEDPIPE); - - OVERLAPPED *overlapped= &connection->overlapped; - PTP_IO io= connection->io; StartThreadpoolIo(io); - if (vio->type == VIO_TYPE_TCPIP || vio->type == VIO_TYPE_SSL) + if (vio_type == VIO_TYPE_TCPIP || vio_type == VIO_TYPE_SSL) { /* Start async io (sockets). */ - if (WSARecv(mysql_socket_getfd(vio->mysql_socket) , &buf, 1, &num_bytes, &flags, - overlapped, NULL) == 0) + if (WSARecv((SOCKET)handle , &buf, 1, &num_bytes, &flags, + &overlapped, NULL) == 0) { - retval= last_error= 0; + retval= last_error= 0; } else { @@ -251,7 +241,7 @@ int start_io(connection_t *connection, PTP_CALLBACK_INSTANCE instance) else { /* Start async io (named pipe) */ - if (ReadFile(vio->hPipe, &c, 0, &num_bytes ,overlapped)) + if (ReadFile(handle, &c, 0, &num_bytes,&overlapped)) { retval= last_error= 0; } @@ -272,7 +262,7 @@ int start_io(connection_t *connection, PTP_CALLBACK_INSTANCE instance) if(skip_completion_port_on_success) { CancelThreadpoolIo(io); - io_completion_callback(instance, connection, overlapped, last_error, + io_completion_callback(callback_instance, this, &overlapped, last_error, num_bytes, io); } return 0; @@ -288,81 +278,81 @@ int start_io(connection_t *connection, PTP_CALLBACK_INSTANCE instance) return -1; } - -int login(connection_t *connection, PTP_CALLBACK_INSTANCE instance) -{ - if ((connection->thd= threadpool_add_connection(connection->connect, connection)) - && init_io(connection, connection->thd) == 0 - && start_io(connection, instance) == 0) - { - return 0; - } - return -1; -} - /* - Recalculate wait timeout, maybe reset timer. + Recalculate wait timeout, maybe reset timer. */ -void set_wait_timeout(connection_t *connection, ulonglong old_timeout) +void TP_connection_win::set_io_timeout(int timeout_sec) { - ulonglong new_timeout = now() + - 10000000LL*connection->thd->variables.net_wait_timeout; + ulonglong old_timeout= timeout; + ulonglong new_timeout = now() + 10000000LL * timeout_sec; if (new_timeout < old_timeout) { - SetThreadpoolTimer(connection->timer, (PFILETIME) &new_timeout, 0, 1000); + SetThreadpoolTimer(timer, (PFILETIME)&new_timeout, 0, 1000); } - connection->timeout = new_timeout; + /* new_timeout > old_timeout case is handled by expiring timer. */ + timeout = new_timeout; } -/* Connection destructor */ -void destroy_connection(connection_t *connection, PTP_CALLBACK_INSTANCE instance) +TP_connection_win::~TP_connection_win() { - if (instance) - DisassociateCurrentThreadFromCallback(instance); - if (connection->io) - { - WaitForThreadpoolIoCallbacks(connection->io, TRUE); - CloseThreadpoolIo(connection->io); - } + if (io) + CloseThreadpoolIo(io); - if(connection->shm_read) - { - WaitForThreadpoolWaitCallbacks(connection->shm_read, TRUE); - CloseThreadpoolWait(connection->shm_read); - } + if (shm_read) + CloseThreadpoolWait(shm_read); + + if (work) + CloseThreadpoolWork(work); - if(connection->timer) + if (timer) { - SetThreadpoolTimer(connection->timer, 0, 0, 0); - WaitForThreadpoolTimerCallbacks(connection->timer, TRUE); - CloseThreadpoolTimer(connection->timer); + WaitForThreadpoolTimerCallbacks(timer, TRUE); + CloseThreadpoolTimer(timer); } - - if (connection->thd) +} + +void TP_connection_win::wait_begin(int type) +{ + + /* + Signal to the threadpool whenever callback can run long. Currently, binlog + waits are a good candidate, its waits are really long + */ + if (type == THD_WAIT_BINLOG) { - threadpool_remove_connection(connection->thd); + if (!long_callback) + { + CallbackMayRunLong(callback_instance); + long_callback= true; + } } - - DestroyThreadpoolEnvironment(&connection->callback_environ); } - +void TP_connection_win::wait_end() +{ + /* Do we need to do anything ? */ +} /* This function should be called first whenever a callback is invoked in the threadpool, does my_thread_init() if not yet done */ extern ulong thread_created; -static void check_thread_init() +static void pre_callback(PVOID context, PTP_CALLBACK_INSTANCE instance) { if (FlsGetValue(fls) == NULL) { + /* Running in new worker thread*/ FlsSetValue(fls, (void *)1); statistic_increment(thread_created, &LOCK_status); InterlockedIncrement((volatile long *)&tp_stats.num_worker_threads); + my_thread_init(); } + TP_connection_win *c = (TP_connection_win *)context; + c->callback_instance = instance; + c->long_callback = false; } @@ -375,153 +365,61 @@ static VOID WINAPI thread_destructor(void *data) if(data) { InterlockedDecrement((volatile long *)&tp_stats.num_worker_threads); + my_thread_end(); } } -/* Scheduler callback : init */ -bool tp_init(void) -{ - fls= FlsAlloc(thread_destructor); - pool= CreateThreadpool(NULL); - if(!pool) - { - sql_print_error("Can't create threadpool. " - "CreateThreadpool() failed with %d. Likely cause is memory pressure", - GetLastError()); - exit(1); - } - - if (threadpool_max_threads) - { - SetThreadpoolThreadMaximum(pool,threadpool_max_threads); - } - - if (threadpool_min_threads) - { - if (!SetThreadpoolThreadMinimum(pool, threadpool_min_threads)) - { - tp_log_warning( "Can't set threadpool minimum threads", - "SetThreadpoolThreadMinimum"); - } - } - - /* - Control stack size (OS must be Win7 or later, plus corresponding SDK) - */ -#if _MSC_VER >=1600 - if (SetThreadpoolStackInformation) - { - TP_POOL_STACK_INFORMATION stackinfo; - stackinfo.StackCommit = 0; - stackinfo.StackReserve = (SIZE_T)my_thread_stack_size; - if (!SetThreadpoolStackInformation(pool, &stackinfo)) - { - tp_log_warning("Can't set threadpool stack size", - "SetThreadpoolStackInformation"); - } - } -#endif - - return 0; -} - -/** - Scheduler callback : Destroy the scheduler. -*/ -void tp_end(void) +static inline void tp_callback(PTP_CALLBACK_INSTANCE instance, PVOID context) { - if(pool) - { - SetThreadpoolThreadMaximum(pool, 0); - CloseThreadpool(pool); - } + pre_callback(context, instance); + tp_callback((TP_connection *)context); } + /* Handle read completion/notification. */ static VOID CALLBACK io_completion_callback(PTP_CALLBACK_INSTANCE instance, PVOID context, PVOID overlapped, ULONG io_result, ULONG_PTR nbytes, PTP_IO io) { - if(instance) - { - check_thread_init(); - } - - connection_t *connection = (connection_t*)context; - - if (io_result != ERROR_SUCCESS) - goto error; - - THD *thd= connection->thd; - ulonglong old_timeout = connection->timeout; - connection->timeout = ULONGLONG_MAX; - connection->callback_instance= instance; - if (threadpool_process_request(connection->thd)) - goto error; - - set_wait_timeout(connection, old_timeout); - if(start_io(connection, instance)) - goto error; - - return; - -error: - /* Some error has occurred. */ - - destroy_connection(connection, instance); - free(connection); + TP_connection_win *c= (TP_connection_win *)context; + /* + Execute high priority connections immediately. + 'Yield' in case of low priority connections, i.e SubmitThreadpoolWork (with the same callback) + which makes Windows threadpool place the items at the end of its internal work queue. + */ + if (c->priority == TP_PRIORITY_HIGH) + tp_callback(instance, context); + else + SubmitThreadpoolWork(c->work); } -/* Simple callback for login */ -static void CALLBACK login_callback(PTP_CALLBACK_INSTANCE instance, - PVOID context, PTP_WORK work) -{ - if(instance) - { - check_thread_init(); - } - - connection_t *connection =(connection_t *)context; - if (login(connection, instance) != 0) - { - destroy_connection(connection, instance); - free(connection); - } -} - /* Timer callback. Invoked when connection times out (wait_timeout) */ -static VOID CALLBACK timer_callback(PTP_CALLBACK_INSTANCE instance, +static VOID CALLBACK timer_callback(PTP_CALLBACK_INSTANCE instance, PVOID parameter, PTP_TIMER timer) { - check_thread_init(); - - connection_t *con= (connection_t*)parameter; - ulonglong timeout= con->timeout; - - if (timeout <= now()) + TP_connection_win *c = (TP_connection_win *)parameter; + if (c->timeout <= now()) { - con->thd->killed = KILL_CONNECTION; - if(con->thd->net.vio) - vio_shutdown(con->thd->net.vio, SD_BOTH); + tp_timeout_handler(c); } - else if(timeout != ULONGLONG_MAX) + else { - /* - Reset timer. - There is a tiny possibility of a race condition, since the value of timeout - could have changed to smaller value in the thread doing io callback. + /* + Reset timer. + There is a tiny possibility of a race condition, since the value of timeout + could have changed to smaller value in the thread doing io callback. - Given the relative unimportance of the wait timeout, we accept race + Given the relative unimportance of the wait timeout, we accept race condition. - */ - SetThreadpoolTimer(timer, (PFILETIME)&timeout, 0, 1000); + */ + SetThreadpoolTimer(timer, (PFILETIME)&c->timeout, 0, 1000); } } @@ -530,10 +428,11 @@ static VOID CALLBACK timer_callback(PTP_CALLBACK_INSTANCE instance, Shared memory read callback. Invoked when read event is set on connection. */ + static void CALLBACK shm_read_callback(PTP_CALLBACK_INSTANCE instance, PVOID context, PTP_WAIT wait,TP_WAIT_RESULT wait_result) { - connection_t *con= (connection_t *)context; + TP_connection_win *c= (TP_connection_win *)context; /* Disarm wait. */ SetThreadpoolWait(wait, NULL, NULL); @@ -542,97 +441,106 @@ static void CALLBACK shm_read_callback(PTP_CALLBACK_INSTANCE instance, and the current state is "not set". Thus we need to reset the event again, or vio_read will hang. */ - HANDLE h = con->thd->net.vio->event_server_wrote; - SetEvent(h); - io_completion_callback(instance, context, NULL, 0, 0 , 0); + SetEvent(c->handle); + tp_callback(instance, context); } -/* - Notify the thread pool about a new connection. -*/ +static void CALLBACK work_callback(PTP_CALLBACK_INSTANCE instance, PVOID context, PTP_WORK work) +{ + tp_callback(instance, context); +} + +TP_pool_win::TP_pool_win() +{} -void tp_add_connection(CONNECT *connect) +int TP_pool_win::init() { - connection_t *con; - con= (connection_t *)malloc(sizeof(connection_t)); - DBUG_EXECUTE_IF("simulate_failed_connection_1", free(con);con= 0; ); - if (!con) + fls= FlsAlloc(thread_destructor); + pool= CreateThreadpool(NULL); + + if (!pool) { - tp_log_warning("Allocation failed", "tp_add_connection"); - connect->close_and_delete(); - return; + sql_print_error("Can't create threadpool. " + "CreateThreadpool() failed with %d. Likely cause is memory pressure", + GetLastError()); + return -1; } - init_connection(con, connect); + InitializeThreadpoolEnvironment(&callback_environ); + SetThreadpoolCallbackPool(&callback_environ, pool); - /* Try to login asynchronously, using threads in the pool */ - PTP_WORK wrk = CreateThreadpoolWork(login_callback,con, &con->callback_environ); - if (wrk) + if (threadpool_max_threads) { - SubmitThreadpoolWork(wrk); - CloseThreadpoolWork(wrk); + SetThreadpoolThreadMaximum(pool, threadpool_max_threads); } - else + + if (threadpool_min_threads) { - /* Likely memory pressure */ - connect->close_and_delete(); + if (!SetThreadpoolThreadMinimum(pool, threadpool_min_threads)) + { + tp_log_warning("Can't set threadpool minimum threads", + "SetThreadpoolThreadMinimum"); + } } -} - - -/** - Sets the number of idle threads the thread pool maintains in anticipation of new - requests. -*/ -void tp_set_min_threads(uint val) -{ - if (pool) - SetThreadpoolThreadMinimum(pool, val); -} - -void tp_set_max_threads(uint val) -{ - if (pool) - SetThreadpoolThreadMaximum(pool, val); -} - -void tp_wait_begin(THD *thd, int type) -{ - DBUG_ASSERT(thd); /* - Signal to the threadpool whenever callback can run long. Currently, binlog - waits are a good candidate, its waits are really long + Control stack size (OS must be Win7 or later) */ - if (type == THD_WAIT_BINLOG) + if (SetThreadpoolStackInformation) { - connection_t *connection= (connection_t *)thd->event_scheduler.data; - if(connection && connection->callback_instance) + TP_POOL_STACK_INFORMATION stackinfo; + stackinfo.StackCommit = 0; + stackinfo.StackReserve = (SIZE_T)my_thread_stack_size; + if (!SetThreadpoolStackInformation(pool, &stackinfo)) { - CallbackMayRunLong(connection->callback_instance); - /* - Reset instance, to avoid calling CallbackMayRunLong twice within - the same callback (it is an error according to docs). - */ - connection->callback_instance= 0; + tp_log_warning("Can't set threadpool stack size", + "SetThreadpoolStackInformation"); } } + return 0; } -void tp_wait_end(THD *thd) + +/** + Scheduler callback : Destroy the scheduler. +*/ +TP_pool_win::~TP_pool_win() { - /* Do we need to do anything ? */ + if (!pool) + return; + DestroyThreadpoolEnvironment(&callback_environ); + SetThreadpoolThreadMaximum(pool, 0); + CloseThreadpool(pool); + if (!tp_stats.num_worker_threads) + FlsFree(fls); } - - /** - Number of idle threads in pool. - This info is not available in Windows implementation, - thus function always returns 0. + Sets the number of idle threads the thread pool maintains in anticipation of new + requests. */ -int tp_get_idle_thread_count() +int TP_pool_win::set_min_threads(uint val) +{ + SetThreadpoolThreadMinimum(pool, val); + return 0; +} + +int TP_pool_win::set_max_threads(uint val) { + SetThreadpoolThreadMaximum(pool, val); return 0; } + +TP_connection *TP_pool_win::new_connection(CONNECT *connect) +{ + TP_connection *c= new (std::nothrow) TP_connection_win(connect); + if (!c ) + return 0; + if (c->init()) + { + delete c; + return 0; + } + return c; +} |