diff options
author | Vladislav Vaintroub <wlad@mariadb.com> | 2018-09-25 16:06:32 +0100 |
---|---|---|
committer | Vladislav Vaintroub <wlad@mariadb.com> | 2018-10-05 09:29:22 +0100 |
commit | 8f329e8d37e08c28d4e39d5c87ea22300285d9cd (patch) | |
tree | 127f33d9649277bd515d61435f8cb4a193bd2b92 | |
parent | 25ad38abe5b3fb6bb2aafff315de256b8f1e7839 (diff) | |
download | mariadb-git-8f329e8d37e08c28d4e39d5c87ea22300285d9cd.tar.gz |
MDEV-10384 Windows : Refactor threading in mysqld startup.
Remove threads that are doing nothing but wait
- main thread now handles the connections
(if threadpool is used, also threadpool threads would wait for connections)
- thread for socket and pipe connections are removed
- shutdown thread is now removed, we wait for shutdown
notification in main thread as well
- kill_server() is also called inside the main thread, after connection
loop finished.
-rw-r--r-- | sql/CMakeLists.txt | 1 | ||||
-rw-r--r-- | sql/handle_connections_win.cc | 555 | ||||
-rw-r--r-- | sql/handle_connections_win.h | 20 | ||||
-rw-r--r-- | sql/init.h | 2 | ||||
-rw-r--r-- | sql/mysqld.cc | 541 | ||||
-rw-r--r-- | sql/mysqld.h | 6 | ||||
-rw-r--r-- | sql/threadpool_win.cc | 31 |
7 files changed, 730 insertions, 426 deletions
diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt index 708c36a58b0..0b48f3b8254 100644 --- a/sql/CMakeLists.txt +++ b/sql/CMakeLists.txt @@ -152,6 +152,7 @@ IF (CMAKE_SYSTEM_NAME MATCHES "Linux" OR ADD_DEFINITIONS(-DHAVE_POOL_OF_THREADS) IF(WIN32) SET(SQL_SOURCE ${SQL_SOURCE} threadpool_win.cc) + SET(SQL_SOURCE ${SQL_SOURCE} handle_connections_win.cc) ENDIF() SET(SQL_SOURCE ${SQL_SOURCE} threadpool_generic.cc) diff --git a/sql/handle_connections_win.cc b/sql/handle_connections_win.cc new file mode 100644 index 00000000000..b37b4dedad1 --- /dev/null +++ b/sql/handle_connections_win.cc @@ -0,0 +1,555 @@ +/* Copyright (c) 2018 MariaDB Corporation. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */ + +/* Accepting connections on Windows */ + +#include <my_global.h> +#include <sql_class.h> +#include <sql_connect.h> +#include <mysqld.h> +#include <mswsock.h> +#include <mysql/psi/mysql_socket.h> +#include <sddl.h> + +#include <handle_connections_win.h> + +/* From mysqld.cc */ +extern HANDLE hEventShutdown; +extern MYSQL_SOCKET base_ip_sock, extra_ip_sock; +extern PTP_CALLBACK_ENVIRON get_threadpool_win_callback_environ(); +extern void tp_win_callback_prolog(); +static SECURITY_ATTRIBUTES pipe_security; + +/** + Abstract base class for accepting new connection, + asynchronously (i.e the accept() operation can be posted, + and result is retrieved later) , and creating a new connection. +*/ + +struct Listener +{ + /** Windows handle of the Listener. + Subclasses would use SOCKET or named pipe handle + */ + HANDLE m_handle; + /** Required for all async IO*/ + OVERLAPPED m_overlapped; + + /** Create new listener + @param handle - @see m_handle + @param wait_handle - usually, event handle or INVALID_HANDLE_VALUE + @see wait_handle + */ + Listener(HANDLE handle, HANDLE wait_handle): + m_handle(handle), m_overlapped() + { + m_overlapped.hEvent= wait_handle; + } + + /** + if not NULL, this handle can be be used in WaitForSingle/MultipleObject(s). + This handle will be closed when object is destroyed. + + If NULL, the completion notification happens in threadpool. + */ + HANDLE wait_handle() + { + return m_overlapped.hEvent; + } + + /* Start waiting for new client connection. */ + virtual void begin_accept()= 0; + + /** + Completion callback,called whenever IO posted by begin_accept is finisjed + Listener needs to create a new THD then (or, call scheduler so it creates one) + + @param success - whether IO completed successfull + */ + virtual void completion_callback(bool success)= 0; + + /** + Completion callback for Listener, that uses events for waiting + to IO. Not suitable for threadpool etc. Retrieves the status of + completed IO from the OVERLAPPED structure + */ + void completion_callback() + { + DBUG_ASSERT(wait_handle() && (wait_handle() != INVALID_HANDLE_VALUE)); + DWORD bytes; + return completion_callback( + GetOverlappedResult(wait_handle(), &m_overlapped, &bytes, FALSE)); + } + + /** Cancel an in-progress IO. Useful for threadpool-bound IO */ + void cancel() + { + CancelIoEx(m_handle, &m_overlapped); + } + + /* Destructor. Closes wait handle, if it was passed in constructor */ + virtual ~Listener() + { + if (m_overlapped.hEvent) + CloseHandle(m_overlapped.hEvent); + }; +}; + +/* Winsock extension finctions. */ +static LPFN_ACCEPTEX my_AcceptEx; +static LPFN_GETACCEPTEXSOCKADDRS my_GetAcceptExSockaddrs; + +/** + Listener that handles socket connections. + Can be threadpool-bound (i.e the completion is executed in threadpool thread), + or use events for waits. + + Threadpool-bound listener should be used with theradpool scheduler, for better + performance. +*/ +struct Socket_Listener: public Listener +{ + /** Client socket passed to AcceptEx() call.*/ + SOCKET m_client_socket; + + /** Buffer for sockaddrs passed to AcceptEx()/GetAcceptExSockaddrs() */ + char m_buffer[2 * sizeof(sockaddr_storage) + 32]; + + /* Threadpool IO struct.*/ + PTP_IO m_tp_io; + + /** + Callback for Windows threadpool's StartThreadpoolIo() function. + */ + static void CALLBACK tp_accept_completion_callback( + PTP_CALLBACK_INSTANCE, PVOID context, PVOID , ULONG io_result, + ULONG_PTR, PTP_IO io) + { + tp_win_callback_prolog(); + Listener *listener= (Listener *)context; + + if (io_result == ERROR_OPERATION_ABORTED) + { + /* ERROR_OPERATION_ABORTED caused by CancelIoEx()*/ + CloseThreadpoolIo(io); + delete listener; + return; + } + listener->completion_callback(io_result == 0); + } + + /** + Constructor + @param listen_socket - listening socket + @PTP_CALLBACK_ENVIRON callback_environ - threadpool environment, or NULL + if threadpool is not used for completion callbacks. + */ + Socket_Listener(MYSQL_SOCKET listen_socket, PTP_CALLBACK_ENVIRON callback_environ) : + Listener((HANDLE)listen_socket.fd,0), + m_client_socket(INVALID_SOCKET) + { + if (callback_environ) + { + /* Accept executed in threadpool. */ + m_tp_io= CreateThreadpoolIo(m_handle, + tp_accept_completion_callback, this, callback_environ); + } + else + { + /* Completion signaled via event. */ + m_tp_io= 0; + m_overlapped.hEvent= CreateEvent(0, FALSE , FALSE, 0); + } + } + + /* + Use AcceptEx to asynchronously wait for new connection; + */ + void begin_accept() + { +retry : + m_client_socket= socket(server_socket_ai_family, SOCK_STREAM, IPPROTO_TCP); + if (m_client_socket == INVALID_SOCKET) + { + sql_perror("socket() call failed."); + unireg_abort(1); + } + + DWORD bytes_received; + if (m_tp_io) + StartThreadpoolIo(m_tp_io); + + BOOL ret= my_AcceptEx( + (SOCKET)m_handle, + m_client_socket, + m_buffer, + 0, + sizeof(sockaddr_storage) + 16, + sizeof(sockaddr_storage) + 16, + &bytes_received, + &m_overlapped); + + DWORD last_error= ret? 0: WSAGetLastError(); + if (last_error == WSAECONNRESET) + { + if (m_tp_io) + CancelThreadpoolIo(m_tp_io); + goto retry; + } + + if (ret || last_error == ERROR_IO_PENDING || abort_loop) + return; + + sql_print_error("my_AcceptEx failed, last error %u", last_error); + abort(); + } + + /* Create new socket connection.*/ + void completion_callback(bool success) + { + if (!success) + { + /* my_AcceptEx() returned error */ + closesocket(m_client_socket); + begin_accept(); + return; + } + + MYSQL_SOCKET s_client{m_client_socket}; + MYSQL_SOCKET s_listen{(SOCKET)m_handle}; + +#ifdef HAVE_PSI_SOCKET_INTERFACE + /* Parse socket addresses buffer filled by AcceptEx(), + only needed for PSI instrumentation. */ + sockaddr *local_addr, *remote_addr; + int local_addr_len, remote_addr_len; + + my_GetAcceptExSockaddrs(m_buffer, + 0, sizeof(sockaddr_storage) + 16, sizeof(sockaddr_storage) + 16, + &local_addr, &local_addr_len, &remote_addr, &remote_addr_len); + + s_client.m_psi= PSI_SOCKET_CALL(init_socket) + (key_socket_client_connection, (const my_socket*)&s_listen.fd, remote_addr, remote_addr_len); +#endif + + /* Start accepting new connection. After this point, do not use + any member data, they could be used by a different (threadpool) thread. */ + begin_accept(); + + /* Some chores post-AcceptEx() that we need to create a normal socket.*/ + if (setsockopt(s_client.fd, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, + (char *)&s_listen.fd, sizeof(s_listen.fd))) + { + if (!abort_loop) + { + sql_perror("setsockopt(SO_UPDATE_ACCEPT_CONTEXT) failed."); + abort(); + } + } + + /* Create a new connection.*/ + handle_accepted_socket(s_client, s_listen); + } + + ~Socket_Listener() + { + if (m_client_socket != INVALID_SOCKET) + closesocket(m_client_socket); + } + + /* + Retrieve the pointer to the Winsock extension functions + AcceptEx and GetAcceptExSockaddrs. + */ + static void init_winsock_extensions() + { + SOCKET s= mysql_socket_getfd(base_ip_sock); + if (s == INVALID_SOCKET) + s= mysql_socket_getfd(extra_ip_sock); + if (s == INVALID_SOCKET) + { + /* --skip-networking was used*/ + return; + } + GUID guid_AcceptEx= WSAID_ACCEPTEX; + GUID guid_GetAcceptExSockaddrs= WSAID_GETACCEPTEXSOCKADDRS; + + GUID *guids[]= { &guid_AcceptEx, &guid_GetAcceptExSockaddrs }; + void *funcs[]= { &my_AcceptEx, &my_GetAcceptExSockaddrs }; + DWORD bytes; + for (int i= 0; i < array_elements(guids); i++) + { + if (WSAIoctl(s, + SIO_GET_EXTENSION_FUNCTION_POINTER, + guids[i], sizeof(GUID), + funcs[i], sizeof(void *), + &bytes, 0, 0) == -1) + { + sql_print_error("WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER) failed"); + unireg_abort(1); + } + } + } +}; + + +/** + Pipe Listener. + Only event notification mode is implemented, no threadpool +*/ +struct Pipe_Listener : public Listener +{ + PTP_CALLBACK_ENVIRON m_tp_env; + Pipe_Listener(): + Listener(INVALID_HANDLE_VALUE, CreateEvent(0, FALSE, FALSE, 0)), + m_tp_env(get_threadpool_win_callback_environ()) + { + } + + /* + Creates local named pipe instance \\.\pipe\$socket for named pipe connection. + */ + static HANDLE create_named_pipe() + { + static bool first_instance= true; + static char pipe_name[512]; + DWORD open_mode= PIPE_ACCESS_DUPLEX | + FILE_FLAG_OVERLAPPED; + + if (first_instance) + { + snprintf(pipe_name, sizeof(pipe_name), "\\\\.\\pipe\\%s", mysqld_unix_port); + open_mode |= FILE_FLAG_FIRST_PIPE_INSTANCE; + if (!ConvertStringSecurityDescriptorToSecurityDescriptorA( + "S:(ML;; NW;;; LW) D:(A;; FRFW;;; WD)", + 1, &pipe_security.lpSecurityDescriptor, NULL)) + { + sql_perror("Can't start server : Initialize security descriptor"); + unireg_abort(1); + } + pipe_security.nLength= sizeof(SECURITY_ATTRIBUTES); + pipe_security.bInheritHandle= FALSE; + } + HANDLE pipe_handle= CreateNamedPipe(pipe_name, + open_mode, + PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT, + PIPE_UNLIMITED_INSTANCES, + (int)global_system_variables.net_buffer_length, + (int)global_system_variables.net_buffer_length, + NMPWAIT_USE_DEFAULT_WAIT, + &pipe_security); + if (pipe_handle == INVALID_HANDLE_VALUE) + { + sql_perror("Create named pipe failed"); + sql_print_error("Aborting\n"); + exit(1); + } + first_instance= false; + return pipe_handle; + } + + static void create_pipe_connection(HANDLE pipe) + { + CONNECT *connect; + if (!(connect= new CONNECT) || !(connect->vio= vio_new_win32pipe(pipe))) + { + CloseHandle(pipe); + delete connect; + statistic_increment(aborted_connects, &LOCK_status); + statistic_increment(connection_errors_internal, &LOCK_status); + return; + } + connect->host= my_localhost; + create_new_thread(connect); + } + + /* Threadpool callback.*/ + static void CALLBACK tp_create_pipe_connection( + PTP_CALLBACK_INSTANCE,void *Context) + { + tp_win_callback_prolog(); + create_pipe_connection(Context); + } + + void begin_accept() + { + m_handle= create_named_pipe(); + BOOL connected= ConnectNamedPipe(m_handle, &m_overlapped); + if (connected) + { + /* Overlapped ConnectNamedPipe should return zero. */ + sql_perror("Overlapped ConnectNamedPipe() already connected."); + abort(); + } + DWORD last_error= GetLastError(); + switch (last_error) + { + case ERROR_PIPE_CONNECTED: + /* Client is already connected, so signal an event.*/ + { + /* + Cleanup overlapped (so that subsequent GetOverlappedResult() + does not show results of previous IO + */ + HANDLE e= m_overlapped.hEvent; + memset(&m_overlapped, 0, sizeof(m_overlapped)); + m_overlapped.hEvent = e; + } + if (!SetEvent(m_overlapped.hEvent)) + { + sql_perror("SetEvent() failed for connected pipe."); + abort(); + } + break; + case ERROR_IO_PENDING: + break; + default: + sql_perror("ConnectNamedPipe() failed."); + abort(); + break; + } + } + + void completion_callback(bool success) + { + if (!success) + { +#ifdef DBUG_OFF + sql_print_warning("ConnectNamedPipe completed with %u", GetLastError()); +#endif + CloseHandle(m_handle); + m_handle= INVALID_HANDLE_VALUE; + begin_accept(); + return; + } + HANDLE pipe= m_handle; + begin_accept(); + // If threadpool is on, create connection in threadpool thread + if (!m_tp_env || !TrySubmitThreadpoolCallback(tp_create_pipe_connection, pipe, m_tp_env)) + create_pipe_connection(pipe); + } + + ~Pipe_Listener() + { + if (m_handle != INVALID_HANDLE_VALUE) + { + CloseHandle(m_handle); + } + } + + static void cleanup() + { + LocalFree(pipe_security.lpSecurityDescriptor); + } +}; + +/** + Accept new client connections on Windows. + + Since we deal with pipe and sockets, they cannot be put into a select/loop. + But we can use asynchronous IO, and WaitForMultipleObject() loop. + + In addition, for slightly better performance, if we're using threadpool, + socket connections are accepted directly in the threadpool. + + The mode of operation is therefore + + 1. There is WaitForMultipleObject() loop that waits for shutdown notification + (hEventShutdown),and possibly pipes and sockets(e.g if threadpool is not used) + This loop ends when shutdown notification is detected. + + 2. If threadpool is used, new socket connections are accepted there. +*/ + + +#define MAX_WAIT_HANDLES 32 +#define NUM_PIPE_LISTENERS 24 +#define SHUTDOWN_IDX 0 +#define LISTENER_START_IDX 1 + +void handle_connections_win() +{ + Listener* all_listeners[MAX_WAIT_HANDLES]= {}; + HANDLE wait_events[MAX_WAIT_HANDLES]= {}; + int n_listeners= 0; + int n_waits= 0; + + Socket_Listener::init_winsock_extensions(); + + /* Listen for TCP connections on "extra-port" (no threadpool).*/ + if (extra_ip_sock.fd != INVALID_SOCKET) + all_listeners[n_listeners++]= new Socket_Listener(extra_ip_sock, 0); + + /* Listen for named pipe connections */ + if (mysqld_unix_port[0] && !opt_bootstrap && opt_enable_named_pipe) + { + /* + Use several listeners for pipe, to reduce ERROR_PIPE_BUSY on client side. + */ + for (int i= 0; i < NUM_PIPE_LISTENERS; i++) + all_listeners[n_listeners++]= new Pipe_Listener(); + } + + if (base_ip_sock.fd != INVALID_SOCKET) + { + /* Wait for TCP connections.*/ + SetFileCompletionNotificationModes((HANDLE)base_ip_sock.fd, FILE_SKIP_SET_EVENT_ON_HANDLE); + all_listeners[n_listeners++]= new Socket_Listener(base_ip_sock, get_threadpool_win_callback_environ()); + } + + if (!n_listeners && !opt_bootstrap) + { + sql_print_error("Either TCP connections or named pipe connections must be enabled."); + unireg_abort(1); + } + + wait_events[SHUTDOWN_IDX]= hEventShutdown; + n_waits = 1; + + for (int i= 0; i < n_listeners; i++) + { + HANDLE wait_handle= all_listeners[i]->wait_handle(); + if(wait_handle) + { + DBUG_ASSERT((i == 0) || (all_listeners[i-1]->wait_handle() != 0)); + wait_events[n_waits++]= wait_handle; + } + all_listeners[i]->begin_accept(); + } + + for (;;) + { + DWORD idx = WaitForMultipleObjects(n_waits ,wait_events, FALSE, INFINITE); + DBUG_ASSERT((int)idx >= 0 && (int)idx < n_waits); + + if (idx == SHUTDOWN_IDX) + break; + + all_listeners[idx - LISTENER_START_IDX]->completion_callback(); + } + + /* Cleanup */ + for (int i= 0; i < n_listeners; i++) + { + Listener *listener= all_listeners[i]; + if (listener->wait_handle()) + delete listener; + else + // Threadpool-bound listener will be deleted in threadpool + // Do not call destructor, because callback maybe running. + listener->cancel(); + } + Pipe_Listener::cleanup(); +}
\ No newline at end of file diff --git a/sql/handle_connections_win.h b/sql/handle_connections_win.h new file mode 100644 index 00000000000..a81f4346fb2 --- /dev/null +++ b/sql/handle_connections_win.h @@ -0,0 +1,20 @@ +/* Copyright (c) 2018 MariaDB Corporation. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */ + +/** + Handles incoming socket and pipe connections, on Windows. + Creates new (THD) connections.. +*/ +extern void handle_connections_win(); diff --git a/sql/init.h b/sql/init.h index e8dec0c1e2e..e96bb478cee 100644 --- a/sql/init.h +++ b/sql/init.h @@ -17,6 +17,6 @@ #define INIT_INCLUDED void unireg_init(ulong options); -ATTRIBUTE_NORETURN void unireg_end(void); +void unireg_end(void); #endif /* INIT_INCLUDED */ diff --git a/sql/mysqld.cc b/sql/mysqld.cc index acb7532f922..93927ed994c 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -117,6 +117,10 @@ #include <poll.h> #endif +#ifdef _WIN32 +#include <handle_connections_win.h> +#endif + #include <my_service_manager.h> #define mysqld_charset &my_charset_latin1 @@ -319,16 +323,6 @@ MY_TIMER_INFO sys_timer_info; /* static variables */ #ifdef HAVE_PSI_INTERFACE -#if defined(_WIN32) && !defined(EMBEDDED_LIBRARY) -static PSI_thread_key key_thread_handle_con_namedpipes; -static PSI_cond_key key_COND_handler_count; -static PSI_thread_key key_thread_handle_con_sockets; -#endif /* _WIN32 |&& !EMBEDDED_LIBRARY */ - -#ifdef _WIN32 -static PSI_thread_key key_thread_handle_shutdown; -#endif - #ifdef HAVE_OPENSSL10 static PSI_rwlock_key key_rwlock_openssl; #endif @@ -364,6 +358,7 @@ static char *character_set_filesystem_name; static char *lc_messages; static char *lc_time_names_name; char *my_bind_addr_str; +int server_socket_ai_family; static char *default_collation_name; char *default_storage_engine, *default_tmp_storage_engine; char *enforced_storage_engine=NULL; @@ -737,7 +732,6 @@ mysql_mutex_t LOCK_thread_count; other threads. It also protects these variables: - handler_count in_bootstrap select_thread_in_use slave_init_thread_running @@ -1091,9 +1085,6 @@ PSI_cond_key key_COND_ack_receiver; static PSI_cond_info all_server_conds[]= { -#if defined(_WIN32) && !defined(EMBEDDED_LIBRARY) - { &key_COND_handler_count, "COND_handler_count", PSI_FLAG_GLOBAL}, -#endif /* _WIN32 && !EMBEDDED_LIBRARY */ #ifdef HAVE_MMAP { &key_PAGE_cond, "PAGE::cond", 0}, { &key_COND_active, "TC_LOG_MMAP::COND_active", 0}, @@ -1154,12 +1145,6 @@ PSI_thread_key key_thread_ack_receiver; static PSI_thread_info all_server_threads[]= { -#if (defined (_WIN32) && !defined (EMBEDDED_LIBRARY)) - { &key_thread_handle_con_namedpipes, "con_named_pipes", PSI_FLAG_GLOBAL}, - { &key_thread_handle_con_sockets, "con_sockets", PSI_FLAG_GLOBAL}, - { &key_thread_handle_shutdown, "shutdown", PSI_FLAG_GLOBAL}, -#endif - { &key_thread_bootstrap, "bootstrap", PSI_FLAG_GLOBAL}, { &key_thread_delayed_insert, "delayed_insert", 0}, { &key_thread_handle_manager, "manager", PSI_FLAG_GLOBAL}, @@ -1402,10 +1387,10 @@ void Buffered_logs::print() /** Logs reported before a logger is available. */ static Buffered_logs buffered_logs; -static MYSQL_SOCKET unix_sock, base_ip_sock, extra_ip_sock; struct my_rnd_struct sql_rand; ///< used by sql_class.cc:THD::THD() #ifndef EMBEDDED_LIBRARY +MYSQL_SOCKET unix_sock, base_ip_sock, extra_ip_sock; /** Error reporter that buffer log messages. @param level log message level @@ -1461,27 +1446,18 @@ static pthread_t select_thread; #undef getpid #include <process.h> -static mysql_cond_t COND_handler_count; -static uint handler_count; static bool start_mode=0, use_opt_args; static int opt_argc; static char **opt_argv; #if !defined(EMBEDDED_LIBRARY) -static HANDLE hEventShutdown; +HANDLE hEventShutdown; static char shutdown_event_name[40]; #include "nt_servc.h" static NTService Service; ///< Service object for WinNT #endif /* EMBEDDED_LIBRARY */ #endif /* __WIN__ */ -#ifdef _WIN32 -#include <sddl.h> /* ConvertStringSecurityDescriptorToSecurityDescriptor */ -static char pipe_name[512]; -static SECURITY_ATTRIBUTES saPipeSecurity; -static HANDLE hPipe = INVALID_HANDLE_VALUE; -#endif - #ifndef EMBEDDED_LIBRARY bool mysqld_embedded=0; #else @@ -1554,16 +1530,13 @@ extern "C" my_bool mysqld_get_one_option(int, const struct my_option *, char *); static int init_thread_environment(); static char *get_relative_path(const char *path); static int fix_paths(void); +#ifndef _WIN32 void handle_connections_sockets(); -#ifdef _WIN32 -pthread_handler_t handle_connections_sockets_thread(void *arg); #endif + pthread_handler_t kill_server_thread(void *arg); static void bootstrap(MYSQL_FILE *file); static bool read_init_file(char *file_name); -#ifdef _WIN32 -pthread_handler_t handle_connections_namedpipes(void *arg); -#endif pthread_handler_t handle_slave(void *arg); static void clean_up(bool print_message); static int test_if_case_insensitive(const char *dir_name); @@ -1598,6 +1571,7 @@ static void close_connections(void) kill_cached_threads++; flush_thread_cache(); + /* kill connection thread */ #if !defined(__WIN__) DBUG_PRINT("quit", ("waiting for select thread: %lu", @@ -1647,30 +1621,7 @@ static void close_connections(void) extra_ip_sock= MYSQL_INVALID_SOCKET; } } -#ifdef _WIN32 - if (hPipe != INVALID_HANDLE_VALUE && opt_enable_named_pipe) - { - HANDLE temp; - DBUG_PRINT("quit", ("Closing named pipes") ); - - /* Create connection to the handle named pipe handler to break the loop */ - if ((temp = CreateFile(pipe_name, - GENERIC_READ | GENERIC_WRITE, - 0, - NULL, - OPEN_EXISTING, - 0, - NULL )) != INVALID_HANDLE_VALUE) - { - WaitNamedPipe(pipe_name, 1000); - DWORD dwMode = PIPE_READMODE_BYTE | PIPE_WAIT; - SetNamedPipeHandleState(temp, &dwMode, NULL, NULL); - CancelIo(temp); - DisconnectNamedPipe(temp); - CloseHandle(temp); - } - } -#endif + #ifdef HAVE_SYS_UN_H if (mysql_socket_getfd(unix_sock) != INVALID_SOCKET) { @@ -1910,12 +1861,6 @@ void kill_mysql(THD *thd) { DBUG_PRINT("error",("Got error: %ld from SetEvent",GetLastError())); } - /* - or: - HANDLE hEvent=OpenEvent(0, FALSE, "MySqlShutdown"); - SetEvent(hEventShutdown); - CloseHandle(hEvent); - */ } #endif #elif defined(HAVE_PTHREAD_KILL) @@ -1947,7 +1892,7 @@ void kill_mysql(THD *thd) /** Force server down. Kill all connections and threads and exit. - @param sig_ptr Signal number that caused kill_server to be called. + @param sig Signal number that caused kill_server to be called. @note A signal number of 0 mean that the function was not called @@ -1955,22 +1900,14 @@ void kill_mysql(THD *thd) or stop, we just want to kill the server. */ -#if !defined(__WIN__) -static void *kill_server(void *sig_ptr) -#define RETURN_FROM_KILL_SERVER return 0 -#else -static void __cdecl kill_server(int sig_ptr) -#define RETURN_FROM_KILL_SERVER return -#endif +static void kill_server(int sig) { DBUG_ENTER("kill_server"); #ifndef EMBEDDED_LIBRARY - int sig=(int) (long) sig_ptr; // This is passed a int // if there is a signal during the kill in progress, ignore the other if (kill_in_progress) // Safety { - DBUG_LEAVE; - RETURN_FROM_KILL_SERVER; + DBUG_VOID_RETURN; } kill_in_progress=TRUE; abort_loop=1; // This should be set @@ -2004,20 +1941,9 @@ static void __cdecl kill_server(int sig_ptr) else unireg_end(); - /* purecov: begin deadcode */ - DBUG_LEAVE; // Must match DBUG_ENTER() - my_thread_end(); - pthread_exit(0); - /* purecov: end */ - - RETURN_FROM_KILL_SERVER; // Avoid compiler warnings - -#else /* EMBEDDED_LIBRARY*/ - - DBUG_LEAVE; - RETURN_FROM_KILL_SERVER; +#endif /* EMBEDDED_LIBRARY*/ -#endif /* EMBEDDED_LIBRARY */ + DBUG_VOID_RETURN; } @@ -2026,11 +1952,9 @@ pthread_handler_t kill_server_thread(void *arg __attribute__((unused))) { my_thread_init(); // Initialize new thread kill_server(0); - /* purecov: begin deadcode */ my_thread_end(); pthread_exit(0); return 0; - /* purecov: end */ } #endif @@ -2076,13 +2000,7 @@ static void clean_up_error_log_mutex() void unireg_end(void) { clean_up(1); - my_thread_end(); sd_notify(0, "STATUS=MariaDB server is down"); -#if defined(SIGNALS_DONT_BREAK_READ) - exit(0); -#else - pthread_exit(0); // Exit is in main thread -#endif } @@ -2577,6 +2495,7 @@ static MYSQL_SOCKET activate_tcp_port(uint port) } else { + server_socket_ai_family= a->ai_family; sql_print_information("Server socket created on IP: '%s'.", (const char *) ip_addr); break; @@ -2703,44 +2622,6 @@ static void network_init(void) extra_ip_sock= activate_tcp_port(mysqld_extra_port); } -#ifdef _WIN32 - /* create named pipe */ - if (mysqld_unix_port[0] && !opt_bootstrap && - opt_enable_named_pipe) - { - - strxnmov(pipe_name, sizeof(pipe_name)-1, "\\\\.\\pipe\\", - mysqld_unix_port, NullS); - /* - Create a security descriptor for pipe. - - Use low integrity level, so that it is possible to connect - from any process. - - Give Everyone read/write access to pipe. - */ - if (!ConvertStringSecurityDescriptorToSecurityDescriptor( - "S:(ML;; NW;;; LW) D:(A;; FRFW;;; WD)", - SDDL_REVISION_1, &saPipeSecurity.lpSecurityDescriptor, NULL)) - { - sql_perror("Can't start server : Initialize security descriptor"); - unireg_abort(1); - } - saPipeSecurity.nLength = sizeof(SECURITY_ATTRIBUTES); - saPipeSecurity.bInheritHandle = FALSE; - if ((hPipe= CreateNamedPipe(pipe_name, - PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED | FILE_FLAG_FIRST_PIPE_INSTANCE, - PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT, - PIPE_UNLIMITED_INSTANCES, - (int) global_system_variables.net_buffer_length, - (int) global_system_variables.net_buffer_length, - NMPWAIT_USE_DEFAULT_WAIT, - &saPipeSecurity)) == INVALID_HANDLE_VALUE) - { - sql_perror("Create named pipe failed"); - unireg_abort(1); - } - } -#endif - #if defined(HAVE_SYS_UN_H) /* ** Create the UNIX socket @@ -3559,7 +3440,7 @@ pthread_handler_t signal_hand(void *arg __attribute__((unused))) sql_print_error("Can't create thread to kill server (errno= %d)", error); #else - kill_server((void*) sig); // MIT THREAD has a alarm thread + kill_server(sig); // MIT THREAD has a alarm thread #endif } break; @@ -3675,23 +3556,6 @@ void *my_str_malloc_mysqld(size_t size) } -#ifdef __WIN__ - -pthread_handler_t handle_shutdown(void *arg) -{ - MSG msg; - my_thread_init(); - - /* this call should create the message queue for this thread */ - PeekMessage(&msg, NULL, 1, 65534,PM_NOREMOVE); -#if !defined(EMBEDDED_LIBRARY) - if (WaitForSingleObject(hEventShutdown,INFINITE)==WAIT_OBJECT_0) -#endif /* EMBEDDED_LIBRARY */ - kill_server(MYSQL_KILL_SIGNAL); - return 0; -} -#endif - #include <mysqld_default_groups.h> #if defined(__WIN__) && !defined(EMBEDDED_LIBRARY) @@ -5590,79 +5454,14 @@ static int init_server_components() #ifndef EMBEDDED_LIBRARY #ifdef _WIN32 -static void create_shutdown_thread() +static void create_shutdown_event() { -#ifdef __WIN__ hEventShutdown=CreateEvent(0, FALSE, FALSE, shutdown_event_name); - pthread_t hThread; - int error; - if (unlikely((error= mysql_thread_create(key_thread_handle_shutdown, - &hThread, &connection_attrib, - handle_shutdown, 0)))) - sql_print_warning("Can't create thread to handle shutdown requests" - " (errno= %d)", error); - // On "Stop Service" we have to do regular shutdown Service.SetShutdownEvent(hEventShutdown); -#endif /* __WIN__ */ } - -static void handle_connections_methods() -{ - pthread_t hThread; - int error; - DBUG_ENTER("handle_connections_methods"); - if (hPipe == INVALID_HANDLE_VALUE && opt_disable_networking) - { - sql_print_error("TCP/IP, or --named-pipe should be configured on Windows"); - unireg_abort(1); // Will not return - } - - mysql_mutex_lock(&LOCK_start_thread); - mysql_cond_init(key_COND_handler_count, &COND_handler_count, NULL); - handler_count=0; - if (hPipe != INVALID_HANDLE_VALUE) - { - handler_count++; - if ((error= mysql_thread_create(key_thread_handle_con_namedpipes, - &hThread, &connection_attrib, - handle_connections_namedpipes, 0))) - { - sql_print_warning("Can't create thread to handle named pipes" - " (errno= %d)", error); - handler_count--; - } - } - if (have_tcpip && !opt_disable_networking) - { - handler_count++; - if ((error= mysql_thread_create(key_thread_handle_con_sockets, - &hThread, &connection_attrib, - handle_connections_sockets_thread, 0))) - { - sql_print_warning("Can't create thread to handle TCP/IP", - " (errno= %d)", error); - handler_count--; - } - } - - while (handler_count > 0) - mysql_cond_wait(&COND_handler_count, &LOCK_start_thread); - mysql_mutex_unlock(&LOCK_start_thread); - DBUG_VOID_RETURN; -} - -void decrement_handler_count() -{ - mysql_mutex_lock(&LOCK_start_thread); - if (--handler_count == 0) - mysql_cond_signal(&COND_handler_count); - mysql_mutex_unlock(&LOCK_start_thread); - my_thread_end(); -} -#else /* WIN32*/ -#define create_shutdown_thread() -#define decrement_handler_count() +#else /*_WIN32*/ +#define create_shutdown_event() #endif #endif /* EMBEDDED_LIBRARY */ @@ -6067,7 +5866,7 @@ int mysqld_main(int argc, char **argv) } } - create_shutdown_thread(); + create_shutdown_event(); start_handle_manager(); /* Copy default global rpl_filter to global_rpl_filter */ @@ -6137,7 +5936,8 @@ int mysqld_main(int argc, char **argv) start_memory_used= global_status_var.global_memory_used; #ifdef _WIN32 - handle_connections_methods(); + handle_connections_win(); + kill_server(0); #else handle_connections_sockets(); #endif /* _WIN32 */ @@ -6549,7 +6349,7 @@ void create_thread_to_handle_connection(CONNECT *connect) @param[in,out] thd Thread handle of future thread. */ -static void create_new_thread(CONNECT *connect) +void create_new_thread(CONNECT *connect) { DBUG_ENTER("create_new_thread"); @@ -6617,18 +6417,107 @@ inline void kill_broken_server() #ifndef EMBEDDED_LIBRARY +void handle_accepted_socket(MYSQL_SOCKET new_sock, MYSQL_SOCKET sock) +{ + CONNECT *connect; + bool is_unix_sock; + +#ifdef FD_CLOEXEC + (void) fcntl(mysql_socket_getfd(new_sock), F_SETFD, FD_CLOEXEC); +#endif + +#ifdef HAVE_LIBWRAP + { + if (mysql_socket_getfd(sock) == mysql_socket_getfd(base_ip_sock) || + mysql_socket_getfd(sock) == mysql_socket_getfd(extra_ip_sock)) + { + struct request_info req; + signal(SIGCHLD, SIG_DFL); + request_init(&req, RQ_DAEMON, libwrapName, RQ_FILE, + mysql_socket_getfd(new_sock), NULL); + my_fromhost(&req); + if (!my_hosts_access(&req)) + { + /* + This may be stupid but refuse() includes an exit(0) + which we surely don't want... + clean_exit() - same stupid thing ... + */ + syslog(deny_severity, "refused connect from %s", + my_eval_client(&req)); + + /* + C++ sucks (the gibberish in front just translates the supplied + sink function pointer in the req structure from a void (*sink)(); + to a void(*sink)(int) if you omit the cast, the C++ compiler + will cry... + */ + if (req.sink) + ((void(*)(int))req.sink)(req.fd); + + (void)mysql_socket_shutdown(new_sock, SHUT_RDWR); + (void)mysql_socket_close(new_sock); + /* + The connection was refused by TCP wrappers. + There are no details (by client IP) available to update the + host_cache. + */ + statistic_increment(connection_errors_tcpwrap, &LOCK_status); + return; + } + } + } +#endif /* HAVE_LIBWRAP */ + + DBUG_PRINT("info", ("Creating CONNECT for new connection")); + + if ((connect= new CONNECT())) + { + is_unix_sock= (mysql_socket_getfd(sock) == + mysql_socket_getfd(unix_sock)); + + if (!(connect->vio= + mysql_socket_vio_new(new_sock, + is_unix_sock ? VIO_TYPE_SOCKET : + VIO_TYPE_TCPIP, + is_unix_sock ? VIO_LOCALHOST : 0))) + { + delete connect; + connect= 0; // Error handling below + } + } + + if (!connect) + { + /* Connect failure */ + (void)mysql_socket_close(new_sock); + statistic_increment(aborted_connects, &LOCK_status); + statistic_increment(connection_errors_internal, &LOCK_status); + return; + } + + if (is_unix_sock) + connect->host= my_localhost; + + if (mysql_socket_getfd(sock) == mysql_socket_getfd(extra_ip_sock)) + { + connect->extra_port= 1; + connect->scheduler= extra_thread_scheduler; + } + create_new_thread(connect); +} + +#ifndef _WIN32 void handle_connections_sockets() { MYSQL_SOCKET sock= mysql_socket_invalid(); MYSQL_SOCKET new_sock= mysql_socket_invalid(); uint error_count=0; - CONNECT *connect; struct sockaddr_storage cAddr; int ip_flags __attribute__((unused))=0; int socket_flags __attribute__((unused))= 0; int extra_ip_flags __attribute__((unused))=0; int flags=0,retval; - bool is_unix_sock; #ifdef HAVE_POLL int socket_count= 0; struct pollfd fds[3]; // for ip_sock, unix_sock and extra_ip_sock @@ -6760,10 +6649,7 @@ void handle_connections_sockets() } #endif } -#if !defined(NO_FCNTL_NONBLOCK) - if (!(test_flags & TEST_BLOCKING)) - fcntl(mysql_socket_getfd(sock), F_SETFL, flags); -#endif + if (mysql_socket_getfd(new_sock) == INVALID_SOCKET) { /* @@ -6779,199 +6665,18 @@ void handle_connections_sockets() sleep(1); // Give other threads some time continue; } -#ifdef FD_CLOEXEC - (void) fcntl(mysql_socket_getfd(new_sock), F_SETFD, FD_CLOEXEC); +#if !defined(NO_FCNTL_NONBLOCK) + if (!(test_flags & TEST_BLOCKING)) + fcntl(mysql_socket_getfd(sock), F_SETFL, flags); #endif - -#ifdef HAVE_LIBWRAP - { - if (mysql_socket_getfd(sock) == mysql_socket_getfd(base_ip_sock) || - mysql_socket_getfd(sock) == mysql_socket_getfd(extra_ip_sock)) - { - struct request_info req; - signal(SIGCHLD, SIG_DFL); - request_init(&req, RQ_DAEMON, libwrapName, RQ_FILE, - mysql_socket_getfd(new_sock), NULL); - my_fromhost(&req); - if (!my_hosts_access(&req)) - { - /* - This may be stupid but refuse() includes an exit(0) - which we surely don't want... - clean_exit() - same stupid thing ... - */ - syslog(deny_severity, "refused connect from %s", - my_eval_client(&req)); - - /* - C++ sucks (the gibberish in front just translates the supplied - sink function pointer in the req structure from a void (*sink)(); - to a void(*sink)(int) if you omit the cast, the C++ compiler - will cry... - */ - if (req.sink) - ((void (*)(int))req.sink)(req.fd); - - (void) mysql_socket_shutdown(new_sock, SHUT_RDWR); - (void) mysql_socket_close(new_sock); - /* - The connection was refused by TCP wrappers. - There are no details (by client IP) available to update the - host_cache. - */ - statistic_increment(connection_errors_tcpwrap, &LOCK_status); - continue; - } - } - } -#endif /* HAVE_LIBWRAP */ - - DBUG_PRINT("info", ("Creating CONNECT for new connection")); - - if ((connect= new CONNECT())) - { - is_unix_sock= (mysql_socket_getfd(sock) == - mysql_socket_getfd(unix_sock)); - - if (!(connect->vio= - mysql_socket_vio_new(new_sock, - is_unix_sock ? VIO_TYPE_SOCKET : - VIO_TYPE_TCPIP, - is_unix_sock ? VIO_LOCALHOST: 0))) - { - delete connect; - connect= 0; // Error handling below - } - } - - if (!connect) - { - /* Connect failure */ - (void) mysql_socket_shutdown(new_sock, SHUT_RDWR); - (void) mysql_socket_close(new_sock); - statistic_increment(aborted_connects,&LOCK_status); - statistic_increment(connection_errors_internal, &LOCK_status); - continue; - } - - if (is_unix_sock) - connect->host= my_localhost; - - if (mysql_socket_getfd(sock) == mysql_socket_getfd(extra_ip_sock)) - { - connect->extra_port= 1; - connect->scheduler= extra_thread_scheduler; - } - create_new_thread(connect); + handle_accepted_socket(new_sock, sock); } sd_notify(0, "STOPPING=1\n" "STATUS=Shutdown in progress\n"); DBUG_VOID_RETURN; } - -#ifdef _WIN32 -pthread_handler_t handle_connections_sockets_thread(void *arg) -{ - my_thread_init(); - handle_connections_sockets(); - decrement_handler_count(); - return 0; -} - -pthread_handler_t handle_connections_namedpipes(void *arg) -{ - HANDLE hConnectedPipe; - OVERLAPPED connectOverlapped= {0}; - my_thread_init(); - DBUG_ENTER("handle_connections_namedpipes"); - connectOverlapped.hEvent= CreateEvent(NULL, TRUE, FALSE, NULL); - if (!connectOverlapped.hEvent) - { - sql_print_error("Can't create event, last error=%u", GetLastError()); - unireg_abort(1); - } - DBUG_PRINT("general",("Waiting for named pipe connections.")); - while (!abort_loop) - { - /* wait for named pipe connection */ - BOOL fConnected= ConnectNamedPipe(hPipe, &connectOverlapped); - if (!fConnected && (GetLastError() == ERROR_IO_PENDING)) - { - /* - ERROR_IO_PENDING says async IO has started but not yet finished. - GetOverlappedResult will wait for completion. - */ - DWORD bytes; - fConnected= GetOverlappedResult(hPipe, &connectOverlapped,&bytes, TRUE); - } - if (abort_loop) - break; - if (!fConnected) - fConnected = GetLastError() == ERROR_PIPE_CONNECTED; - if (!fConnected) - { - CloseHandle(hPipe); - if ((hPipe= CreateNamedPipe(pipe_name, - PIPE_ACCESS_DUPLEX | - FILE_FLAG_OVERLAPPED, - PIPE_TYPE_BYTE | - PIPE_READMODE_BYTE | - PIPE_WAIT, - PIPE_UNLIMITED_INSTANCES, - (int) global_system_variables. - net_buffer_length, - (int) global_system_variables. - net_buffer_length, - NMPWAIT_USE_DEFAULT_WAIT, - &saPipeSecurity)) == - INVALID_HANDLE_VALUE) - { - sql_perror("Can't create new named pipe!"); - break; // Abort - } - } - hConnectedPipe = hPipe; - /* create new pipe for new connection */ - if ((hPipe = CreateNamedPipe(pipe_name, - PIPE_ACCESS_DUPLEX | - FILE_FLAG_OVERLAPPED, - PIPE_TYPE_BYTE | - PIPE_READMODE_BYTE | - PIPE_WAIT, - PIPE_UNLIMITED_INSTANCES, - (int) global_system_variables.net_buffer_length, - (int) global_system_variables.net_buffer_length, - NMPWAIT_USE_DEFAULT_WAIT, - &saPipeSecurity)) == - INVALID_HANDLE_VALUE) - { - sql_perror("Can't create new named pipe!"); - hPipe=hConnectedPipe; - continue; // We have to try again - } - CONNECT *connect; - if (!(connect= new CONNECT) || - !(connect->vio= vio_new_win32pipe(hConnectedPipe))) - { - DisconnectNamedPipe(hConnectedPipe); - CloseHandle(hConnectedPipe); - delete connect; - statistic_increment(aborted_connects,&LOCK_status); - statistic_increment(connection_errors_internal, &LOCK_status); - continue; - } - connect->host= my_localhost; - create_new_thread(connect); - } - LocalFree(saPipeSecurity.lpSecurityDescriptor); - CloseHandle(connectOverlapped.hEvent); - DBUG_LEAVE; - decrement_handler_count(); - return 0; -} -#endif /* _WIN32 */ - +#endif /* _WIN32*/ #endif /* EMBEDDED_LIBRARY */ @@ -8661,7 +8366,9 @@ static int mysql_init_variables(void) character_set_filesystem= &my_charset_bin; opt_specialflag= SPECIAL_ENGLISH; +#ifndef EMBEDDED_LIBRARY unix_sock= base_ip_sock= extra_ip_sock= MYSQL_INVALID_SOCKET; +#endif mysql_home_ptr= mysql_home; log_error_file_ptr= log_error_file; protocol_version= PROTOCOL_VERSION; diff --git a/sql/mysqld.h b/sql/mysqld.h index 6c0c7308d91..4ee795115a7 100644 --- a/sql/mysqld.h +++ b/sql/mysqld.h @@ -24,6 +24,7 @@ #include "mysql_com.h" /* SERVER_VERSION_LENGTH */ #include "my_atomic.h" #include "mysql/psi/mysql_file.h" /* MYSQL_FILE */ +#include "mysql/psi/mysql_socket.h" /* MYSQL_SOCKET */ #include "sql_list.h" /* I_List */ #include "sql_cmd.h" #include <my_rnd.h> @@ -92,6 +93,8 @@ void refresh_status(THD *thd); bool is_secure_file_path(char *path); void dec_connection_count(scheduler_functions *scheduler); extern void init_net_server_extension(THD *thd); +extern void handle_accepted_socket(MYSQL_SOCKET new_sock, MYSQL_SOCKET sock); +extern void create_new_thread(CONNECT *connect); extern "C" MYSQL_PLUGIN_IMPORT CHARSET_INFO *system_charset_info; extern MYSQL_PLUGIN_IMPORT CHARSET_INFO *files_charset_info ; @@ -152,6 +155,7 @@ extern ulong opt_replicate_events_marked_for_skip; extern char *default_tz_name; extern Time_zone *default_tz; extern char *my_bind_addr_str; +extern int server_socket_ai_family; extern char *default_storage_engine, *default_tmp_storage_engine; extern char *enforced_storage_engine; extern char *gtid_pos_auto_engines; @@ -759,7 +763,7 @@ enum enum_query_type /* query_id */ extern query_id_t global_query_id; -ATTRIBUTE_NORETURN void unireg_end(void); +void unireg_end(void); /* increment query_id and return it. */ inline __attribute__((warn_unused_result)) query_id_t next_query_id() diff --git a/sql/threadpool_win.cc b/sql/threadpool_win.cc index 3bc1ae3b371..67a8e783208 100644 --- a/sql/threadpool_win.cc +++ b/sql/threadpool_win.cc @@ -70,6 +70,11 @@ static DWORD fls; static bool skip_completion_port_on_success = false; +PTP_CALLBACK_ENVIRON get_threadpool_win_callback_environ() +{ + return pool? &callback_environ: 0; +} + /* Threadpool callbacks. @@ -134,7 +139,15 @@ struct TP_connection *new_TP_connection(CONNECT *connect) void TP_pool_win::add(TP_connection *c) { - SubmitThreadpoolWork(((TP_connection_win *)c)->work); + if(FlsGetValue(fls)) + { + /* Inside threadpool(), execute callback directly. */ + tp_callback(c); + } + else + { + SubmitThreadpoolWork(((TP_connection_win *)c)->work); + } } @@ -288,14 +301,13 @@ TP_connection_win::~TP_connection_win() void TP_connection_win::wait_begin(int type) { - /* Signal to the threadpool whenever callback can run long. Currently, binlog waits are a good candidate, its waits are really long */ if (type == THD_WAIT_BINLOG) { - if (!long_callback) + if (!long_callback && callback_instance) { CallbackMayRunLong(callback_instance); long_callback= true; @@ -308,12 +320,11 @@ void TP_connection_win::wait_end() /* Do we need to do anything ? */ } -/* - This function should be called first whenever a callback is invoked in the +/* + This function should be called first whenever a callback is invoked in the threadpool, does my_thread_init() if not yet done */ -extern ulong thread_created; -static void pre_callback(PVOID context, PTP_CALLBACK_INSTANCE instance) +void tp_win_callback_prolog() { if (FlsGetValue(fls) == NULL) { @@ -323,6 +334,12 @@ static void pre_callback(PVOID context, PTP_CALLBACK_INSTANCE instance) InterlockedIncrement((volatile long *)&tp_stats.num_worker_threads); my_thread_init(); } +} + +extern ulong thread_created; +static void pre_callback(PVOID context, PTP_CALLBACK_INSTANCE instance) +{ + tp_win_callback_prolog(); TP_connection_win *c = (TP_connection_win *)context; c->callback_instance = instance; c->long_callback = false; |