author     Vladislav Vaintroub <wlad@mariadb.com>    2016-09-21 14:28:42 +0000
committer  Vladislav Vaintroub <wlad@mariadb.com>    2016-09-22 17:01:28 +0000
commit     f7a7c0c2fec3dcca331bb529f8314273360c72ae (patch)
tree       2e04f4036bd7def676d85690e67e393ec0c41a8e /sql
parent     f32a5115584c9b33a2163df57830ad335cd2b3ab (diff)
download   mariadb-git-f7a7c0c2fec3dcca331bb529f8314273360c72ae.tar.gz
MDEV-10297 Add prioritization to threadpool
Also MDEV-10385 Threadpool refactoring
Diffstat (limited to 'sql')
-rw-r--r--   sql/CMakeLists.txt                                                |   4
-rw-r--r--   sql/mysqld.cc                                                     |   2
-rw-r--r--   sql/sql_class.h                                                   |   1
-rw-r--r--   sql/sys_vars.cc                                                   |  35
-rw-r--r--   sql/threadpool.h                                                  | 121
-rw-r--r--   sql/threadpool_common.cc                                          | 214
-rw-r--r--   sql/threadpool_generic.cc (renamed from sql/threadpool_unix.cc)   | 573
-rw-r--r--   sql/threadpool_win.cc                                             | 550
8 files changed, 886 insertions(+), 614 deletions(-)
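The heart of the change: each thread group's single work queue becomes an array of queues, one per priority level, drained high-priority-first. A minimal standalone sketch of that scheme, with Conn as a stand-in for the patch's TP_connection_generic (the real queue_put()/queue_get() also hold the group mutex and stamp dequeue_time):

    #include <cstdio>
    #include <deque>

    // Stand-in for TP_connection_generic: only what the queueing needs.
    struct Conn { int id; int priority; };      // 0 = high, 1 = low

    enum { NQUEUES= 2 };                        // mirrors NQUEUES in the patch
    static std::deque<Conn*> queues[NQUEUES];

    // Enqueue into the queue matching the connection's priority.
    static void queue_put(Conn *c)
    {
      queues[c->priority].push_back(c);
    }

    // Dequeue: drain the high-priority queue before touching the low one,
    // the same scan order the patched queue_get() uses.
    static Conn *queue_get()
    {
      for (int i= 0; i < NQUEUES; i++)
      {
        if (!queues[i].empty())
        {
          Conn *c= queues[i].front();
          queues[i].pop_front();
          return c;
        }
      }
      return NULL;
    }

    int main()
    {
      Conn low= {1, 1}, high= {2, 0};
      queue_put(&low);                          // enqueued first, but low priority
      queue_put(&high);
      printf("first dequeued: %d\n", queue_get()->id);   // prints 2
      return 0;
    }

Starvation of the low queue is prevented separately, by the kickup timer (see the check_stall() hunk further down).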
diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt
index 65b7775dc3c..28072375bbc 100644
--- a/sql/CMakeLists.txt
+++ b/sql/CMakeLists.txt
@@ -157,9 +157,9 @@ IF (CMAKE_SYSTEM_NAME MATCHES "Linux" OR
 ADD_DEFINITIONS(-DHAVE_POOL_OF_THREADS)
 IF(WIN32)
   SET(SQL_SOURCE ${SQL_SOURCE} threadpool_win.cc)
-ELSE()
-  SET(SQL_SOURCE ${SQL_SOURCE} threadpool_unix.cc)
 ENDIF()
+SET(SQL_SOURCE ${SQL_SOURCE} threadpool_generic.cc)
+ENDIF()
 
 MYSQL_ADD_PLUGIN(partition ha_partition.cc STORAGE_ENGINE DEFAULT STATIC_ONLY
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index 789cd6788aa..cf9e99b54a6 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -4425,7 +4425,7 @@ static int init_common_variables()
 #endif /* HAVE_SOLARIS_LARGE_PAGES */
 
-#if defined(HAVE_POOL_OF_THREADS) && !defined(_WIN32)
+#if defined(HAVE_POOL_OF_THREADS)
   if (IS_SYSVAR_AUTOSIZE(&threadpool_size))
     SYSVAR_AUTOSIZE(threadpool_size, my_getncpus());
 #endif
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 2240b5fe0a9..d2a489bfef6 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -697,6 +697,7 @@ typedef struct system_variables
   my_bool session_track_schema;
   my_bool session_track_state_change;
 
+  ulong threadpool_priority;
 } SV;
 
 /**
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
index 55f2864a93e..5cc81585ed5 100644
--- a/sql/sys_vars.cc
+++ b/sql/sys_vars.cc
@@ -3241,23 +3241,17 @@ static Sys_var_ulong Sys_thread_cache_size(
 #ifdef HAVE_POOL_OF_THREADS
 static bool fix_tp_max_threads(sys_var *, THD *, enum_var_type)
 {
-#ifdef _WIN32
   tp_set_max_threads(threadpool_max_threads);
-#endif
   return false;
 }
 
-#ifdef _WIN32
 static bool fix_tp_min_threads(sys_var *, THD *, enum_var_type)
 {
   tp_set_min_threads(threadpool_min_threads);
   return false;
 }
-#endif
-
-#ifndef _WIN32
 static bool check_threadpool_size(sys_var *self, THD *thd, set_var *var)
 {
   ulonglong v= var->save_result.ulonglong_value;
@@ -3282,7 +3276,6 @@ static bool fix_threadpool_stall_limit(sys_var*, THD*, enum_var_type)
   tp_set_threadpool_stall_limit(threadpool_stall_limit);
   return false;
 }
-#endif
 
 #ifdef _WIN32
 static Sys_var_uint Sys_threadpool_min_threads(
@@ -3293,7 +3286,24 @@ static Sys_var_uint Sys_threadpool_min_threads(
   NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),
   ON_UPDATE(fix_tp_min_threads)
   );
-#else
+
+static const char *threadpool_mode_names[]={ "windows", "generic", 0 };
+static Sys_var_enum Sys_threadpool_mode(
+  "thread_pool_mode",
+  "Choose implementation of the threadpool",
+  READ_ONLY GLOBAL_VAR(threadpool_mode), CMD_LINE(REQUIRED_ARG),
+  threadpool_mode_names, DEFAULT(TP_MODE_WINDOWS)
+  );
+#endif
+
+static const char *threadpool_priority_names[]={ "high", "low", "auto", 0 };
+static Sys_var_enum Sys_thread_pool_priority(
+  "thread_pool_priority",
+  "Threadpool priority. High priority connections usually start executing earlier than low priority. "
+  "If priority is set to 'auto', the actual priority (low or high) is determined based on whether or not the connection is inside a transaction.",
+  SESSION_VAR(threadpool_priority), CMD_LINE(REQUIRED_ARG),
+  threadpool_priority_names, DEFAULT(TP_PRIORITY_AUTO));
+
 static Sys_var_uint Sys_threadpool_idle_thread_timeout(
   "thread_pool_idle_timeout",
   "Timeout in seconds for an idle thread in the thread pool."
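The 'auto' setting documented above is resolved per request: a connection in the middle of a transaction is treated as high priority so it can finish and release its locks sooner. A restatement of that rule as a pure function (effective_priority() is an illustrative stand-in for get_priority() in threadpool_common.cc, which consults thd->transaction.is_active() and the session variable):

    enum TP_PRIORITY { TP_PRIORITY_HIGH, TP_PRIORITY_LOW, TP_PRIORITY_AUTO };

    // An explicit session setting wins; 'auto' maps to HIGH inside an open
    // transaction and LOW otherwise.
    static TP_PRIORITY effective_priority(TP_PRIORITY session_setting,
                                          bool inside_transaction)
    {
      if (session_setting == TP_PRIORITY_AUTO)
        return inside_transaction ? TP_PRIORITY_HIGH : TP_PRIORITY_LOW;
      return session_setting;
    }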
@@ -3328,7 +3338,7 @@ static Sys_var_uint Sys_threadpool_stall_limit( NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0), ON_UPDATE(fix_threadpool_stall_limit) ); -#endif /* !WIN32 */ + static Sys_var_uint Sys_threadpool_max_threads( "thread_pool_max_threads", "Maximum allowed number of worker threads in the thread pool", @@ -3337,6 +3347,13 @@ static Sys_var_uint Sys_threadpool_max_threads( NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0), ON_UPDATE(fix_tp_max_threads) ); + +static Sys_var_uint Sys_threadpool_threadpool_prio_kickup_timer( + "thread_pool_prio_kickup_timer", + "The number of milliseconds before a dequeued low-priority statement is moved to the high-priority queue", + GLOBAL_VAR(threadpool_prio_kickup_timer), CMD_LINE(REQUIRED_ARG), + VALID_RANGE(0, UINT_MAX), DEFAULT(1000), BLOCK_SIZE(1) +); #endif /* HAVE_POOL_OF_THREADS */ /** diff --git a/sql/threadpool.h b/sql/threadpool.h index 7ddc661565f..17c8b6ea4ea 100644 --- a/sql/threadpool.h +++ b/sql/threadpool.h @@ -23,28 +23,19 @@ extern uint threadpool_max_size; extern uint threadpool_stall_limit; /* time interval in 10 ms units for stall checks*/ extern uint threadpool_max_threads; /* Maximum threads in pool */ extern uint threadpool_oversubscribe; /* Maximum active threads in group */ +extern uint threadpool_prio_kickup_timer; /* Time before low prio item gets prio boost */ +#ifdef _WIN32 +extern uint threadpool_mode; /* Thread pool implementation , windows or generic */ +#define TP_MODE_WINDOWS 0 +#define TP_MODE_GENERIC 1 +#endif +struct TP_connection; +extern void tp_callback(TP_connection *c); +extern void tp_timeout_handler(TP_connection *c); -/* Common thread pool routines, suitable for different implementations */ -extern void threadpool_remove_connection(THD *thd); -extern int threadpool_process_request(THD *thd); -extern THD* threadpool_add_connection(CONNECT *connect, void *scheduled_data); -/* - Functions used by scheduler. - OS-specific implementations are in - threadpool_unix.cc or threadpool_win.cc -*/ -extern bool tp_init(); -extern void tp_add_connection(CONNECT *); -extern void tp_wait_begin(THD *, int); -extern void tp_wait_end(THD*); -extern void tp_post_kill_notification(THD *thd); -extern void tp_end(void); - -/* Used in SHOW for threadpool_idle_thread_count */ -extern int tp_get_idle_thread_count(); /* Threadpool statistics @@ -63,9 +54,103 @@ extern void tp_set_min_threads(uint val); extern void tp_set_max_threads(uint val); extern void tp_set_threadpool_size(uint val); extern void tp_set_threadpool_stall_limit(uint val); +extern int tp_get_idle_thread_count(); +extern int tp_get_thread_count(); /* Activate threadpool scheduler */ extern void tp_scheduler(void); extern int show_threadpool_idle_threads(THD *thd, SHOW_VAR *var, char *buff, enum enum_var_type scope); + +enum TP_PRIORITY { + TP_PRIORITY_HIGH, + TP_PRIORITY_LOW, + TP_PRIORITY_AUTO +}; + + +enum TP_STATE +{ + TP_STATE_IDLE, + TP_STATE_RUNNING, +}; + +/* + Connection structure, encapsulates THD + structures for asynchronous + IO and pool. 
+ + Platform specific parts are specified in subclasses called connection_t, + inside threadpool_win.cc and threadpool_unix.cc +*/ + +struct TP_connection +{ + THD* thd; + CONNECT* connect; + TP_STATE state; + TP_PRIORITY priority; + TP_connection(CONNECT *c) : + thd(0), + connect(c), + state(TP_STATE_IDLE), + priority(TP_PRIORITY_HIGH) + {} + + virtual ~TP_connection() + {}; + + /* Initialize io structures windows threadpool, epoll etc */ + virtual int init() = 0; + + virtual void set_io_timeout(int sec) = 0; + + /* Read for the next client command (async) with specified timeout */ + virtual int start_io() = 0; + + virtual void wait_begin(int type)= 0; + virtual void wait_end() = 0; + +}; + + +struct TP_pool +{ + virtual ~TP_pool(){}; + virtual int init()= 0; + virtual TP_connection *new_connection(CONNECT *)= 0; + virtual void add(TP_connection *c)= 0; + virtual int set_max_threads(uint){ return 0; } + virtual int set_min_threads(uint){ return 0; } + virtual int set_pool_size(uint){ return 0; } + virtual int set_idle_timeout(uint){ return 0; } + virtual int set_oversubscribe(uint){ return 0; } + virtual int set_stall_limit(uint){ return 0; } + virtual int get_thread_count() { return tp_stats.num_worker_threads; } + virtual int get_idle_thread_count(){ return 0; } +}; + +#ifdef _WIN32 +struct TP_pool_win:TP_pool +{ + TP_pool_win(); + virtual int init(); + virtual ~TP_pool_win(); + virtual TP_connection *new_connection(CONNECT *c); + virtual void add(TP_connection *); + virtual int set_max_threads(uint); + virtual int set_min_threads(uint); +}; +#endif + +struct TP_pool_generic :TP_pool +{ + TP_pool_generic(); + ~TP_pool_generic(); + virtual int init(); + virtual TP_connection *new_connection(CONNECT *c); + virtual void add(TP_connection *); + virtual int set_pool_size(uint); + virtual int set_stall_limit(uint); + virtual int get_idle_thread_count(); +}; diff --git a/sql/threadpool_common.cc b/sql/threadpool_common.cc index d6c343dc04e..2308f4277d6 100644 --- a/sql/threadpool_common.cc +++ b/sql/threadpool_common.cc @@ -34,14 +34,25 @@ uint threadpool_max_size; uint threadpool_stall_limit; uint threadpool_max_threads; uint threadpool_oversubscribe; +uint threadpool_mode; +uint threadpool_prio_kickup_timer; /* Stats */ TP_STATISTICS tp_stats; +static void threadpool_remove_connection(THD *thd); +static int threadpool_process_request(THD *thd); +static THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data); + extern "C" pthread_key(struct st_my_thread_var*, THR_KEY_mysys); extern bool do_command(THD*); +static inline TP_connection *get_TP_connection(THD *thd) +{ + return (TP_connection *)thd->event_scheduler.data; +} + /* Worker threads contexts, and THD contexts. ========================================= @@ -105,15 +116,81 @@ static void thread_attach(THD* thd) #endif } +/* + Determine connection priority , using current + transaction state and 'threadpool_priority' variable value. +*/ +static TP_PRIORITY get_priority(TP_connection *c) +{ + DBUG_ASSERT(c->thd == current_thd); + TP_PRIORITY prio= (TP_PRIORITY)c->thd->variables.threadpool_priority; + if (prio == TP_PRIORITY_AUTO) + { + return c->thd->transaction.is_active() ? 
TP_PRIORITY_HIGH : TP_PRIORITY_LOW; + } + return prio; +} -THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data) + +void tp_callback(TP_connection *c) { - THD *thd= NULL; - int error=1; + DBUG_ASSERT(c); Worker_thread_context worker_context; worker_context.save(); + THD *thd= c->thd; + + c->state = TP_STATE_RUNNING; + + if (!thd) + { + /* No THD, need to login first. */ + DBUG_ASSERT(c->connect); + thd= c->thd= threadpool_add_connection(c->connect, c); + if (!thd) + { + /* Bail out on connect error.*/ + goto error; + } + c->connect= 0; + } + else if (threadpool_process_request(thd)) + { + /* QUIT or an error occured. */ + goto error; + } + + /* Set priority */ + c->priority= get_priority(c); + + /* Read next command from client. */ + c->set_io_timeout(thd->variables.net_wait_timeout); + c->state= TP_STATE_IDLE; + if (c->start_io()) + goto error; + + worker_context.restore(); + return; + +error: + c->thd= 0; + delete c; + + if (thd) + { + threadpool_remove_connection(thd); + } + worker_context.restore(); +} + + +static THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data) +{ + THD *thd= NULL; + int error=1; + + /* Create a new connection context: mysys_thread_var and PSI thread Store them in THD. @@ -137,7 +214,6 @@ THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data) #endif my_thread_end(); } - worker_context.restore(); return NULL; } delete connect; @@ -184,17 +260,14 @@ THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data) threadpool_remove_connection(thd); thd= NULL; } - worker_context.restore(); return thd; } -void threadpool_remove_connection(THD *thd) +static void threadpool_remove_connection(THD *thd) { - Worker_thread_context worker_context; - worker_context.save(); thread_attach(thd); - + thd->event_scheduler.data= 0; thd->net.reading_or_writing = 0; end_connection(thd); close_connection(thd, 0); @@ -206,19 +279,14 @@ void threadpool_remove_connection(THD *thd) mysys thread_var and PSI thread. */ my_thread_end(); - - worker_context.restore(); } /** Process a single client request or a single batch. */ -int threadpool_process_request(THD *thd) +static int threadpool_process_request(THD *thd) { int retval= 0; - Worker_thread_context worker_context; - worker_context.save(); - thread_attach(thd); if (thd->killed >= KILL_CONNECTION) @@ -268,7 +336,6 @@ int threadpool_process_request(THD *thd) } end: - worker_context.restore(); return retval; } @@ -286,6 +353,119 @@ static bool tp_end_thread(THD *, bool) return 0; } +static TP_pool *pool; + +static bool tp_init() +{ + +#ifdef _WIN32 + if (threadpool_mode == TP_MODE_WINDOWS) + pool= new (std::nothrow) TP_pool_win; + else + pool= new (std::nothrow) TP_pool_generic; +#else + pool= new (std::nothrow) TP_pool_generic; +#endif + if (!pool) + return true; + if (pool->init()) + { + delete pool; + pool= 0; + return true; + } + return false; +} + +static void tp_add_connection(CONNECT *connect) +{ + TP_connection *c= pool->new_connection(connect); + DBUG_EXECUTE_IF("simulate_failed_connection_1", delete c ; c= 0;); + if (c) + pool->add(c); + else + connect->close_and_delete(); +} + +int tp_get_idle_thread_count() +{ + return pool? pool->get_idle_thread_count(): 0; +} + +int tp_get_thread_count() +{ + return pool ? 
pool->get_thread_count() : 0; +} + +void tp_set_min_threads(uint val) +{ + if (pool) + pool->set_min_threads(val); +} + + +void tp_set_max_threads(uint val) +{ + if (pool) + pool->set_max_threads(val); +} + +void tp_set_threadpool_size(uint val) +{ + if (pool) + pool->set_pool_size(val); +} + + +void tp_set_threadpool_stall_limit(uint val) +{ + if (pool) + pool->set_stall_limit(val); +} + + +void tp_timeout_handler(TP_connection *c) +{ + if (c->state != TP_STATE_IDLE) + return; + THD *thd=c->thd; + mysql_mutex_lock(&thd->LOCK_thd_data); + thd->killed= KILL_CONNECTION; + c->priority= TP_PRIORITY_HIGH; + post_kill_notification(thd); + mysql_mutex_unlock(&thd->LOCK_thd_data); +} + + +static void tp_wait_begin(THD *thd, int type) +{ + TP_connection *c = get_TP_connection(thd); + if (c) + c->wait_begin(type); +} + + +static void tp_wait_end(THD *thd) +{ + TP_connection *c = get_TP_connection(thd); + if (c) + c->wait_end(); +} + + +static void tp_end() +{ + delete pool; +} + +static void tp_post_kill_notification(THD *thd) +{ + TP_connection *c= get_TP_connection(thd); + if (c) + c->priority= TP_PRIORITY_HIGH; + post_kill_notification(thd); +} + static scheduler_functions tp_scheduler_functions= { 0, // max_threads @@ -296,7 +476,7 @@ static scheduler_functions tp_scheduler_functions= tp_add_connection, // add_connection tp_wait_begin, // thd_wait_begin tp_wait_end, // thd_wait_end - post_kill_notification, // post_kill_notification + tp_post_kill_notification, // post kill notification tp_end_thread, // Dummy function tp_end // end }; diff --git a/sql/threadpool_unix.cc b/sql/threadpool_generic.cc index 4079091e217..87c74d18aea 100644 --- a/sql/threadpool_unix.cc +++ b/sql/threadpool_generic.cc @@ -22,6 +22,17 @@ #ifdef HAVE_POOL_OF_THREADS +#ifdef _WIN32 +/* AIX may define this, too ?*/ +#define HAVE_IOCP +#endif + +#ifdef HAVE_IOCP +#define OPTIONAL_IO_POLL_READ_PARAM &overlapped +#else +#define OPTIONAL_IO_POLL_READ_PARAM 0 +#endif + #include <sql_connect.h> #include <mysqld.h> #include <debug_sync.h> @@ -38,10 +49,23 @@ typedef struct kevent native_event; #elif defined (__sun) #include <port.h> typedef port_event_t native_event; +#elif defined (HAVE_IOCP) +typedef OVERLAPPED_ENTRY native_event; #else #error threadpool is not available on this platform #endif + +static void io_poll_close(int fd) +{ +#ifdef _WIN32 + CloseHandle((HANDLE)fd); +#else + close(fd); +#endif +} + + /** Maximum number of native events a listener can read in one go */ #define MAX_EVENTS 1024 @@ -108,32 +132,45 @@ typedef I_P_List<worker_thread_t, I_P_List_adapter<worker_thread_t, > worker_list_t; -struct connection_t +struct TP_connection_generic:public TP_connection { + TP_connection_generic(CONNECT *c); + ~TP_connection_generic(); + + virtual int init(){ return 0; }; + virtual void set_io_timeout(int sec); + virtual int start_io(); + virtual void wait_begin(int type); + virtual void wait_end(); - THD *thd; thread_group_t *thread_group; - connection_t *next_in_queue; - connection_t **prev_in_queue; + TP_connection_generic *next_in_queue; + TP_connection_generic **prev_in_queue; ulonglong abs_wait_timeout; - CONNECT* connect; - bool logged_in; + ulonglong dequeue_time; bool bound_to_poll_descriptor; - bool waiting; + int waiting; +#ifdef HAVE_IOCP + OVERLAPPED overlapped; +#endif }; -typedef I_P_List<connection_t, - I_P_List_adapter<connection_t, - &connection_t::next_in_queue, - &connection_t::prev_in_queue>, +typedef TP_connection_generic TP_connection_generic; + +typedef I_P_List<TP_connection_generic, + 
I_P_List_adapter<TP_connection_generic, + &TP_connection_generic::next_in_queue, + &TP_connection_generic::prev_in_queue>, I_P_List_null_counter, - I_P_List_fast_push_back<connection_t> > + I_P_List_fast_push_back<TP_connection_generic> > connection_queue_t; +const int NQUEUES=2; /* We have high and low priority queues*/ + struct thread_group_t { mysql_mutex_t mutex; - connection_queue_t queue; + connection_queue_t queues[NQUEUES]; worker_list_t waiting_threads; worker_thread_t *listener; pthread_attr_t *pthread_attr; @@ -147,9 +184,8 @@ struct thread_group_t ulonglong last_thread_creation_time; int shutdown_pipe[2]; bool shutdown; - bool stalled; - -} MY_ALIGNED(512); + bool stalled; +} MY_ALIGNED(CPU_LEVEL1_DCACHE_LINESIZE); static thread_group_t *all_groups; static uint group_count; @@ -175,15 +211,13 @@ struct pool_timer_t static pool_timer_t pool_timer; -static void queue_put(thread_group_t *thread_group, connection_t *connection); +static void queue_put(thread_group_t *thread_group, TP_connection_generic *connection); +static void queue_put(thread_group_t *thread_group, native_event *ev, int cnt); static int wake_thread(thread_group_t *thread_group); -static void handle_event(connection_t *connection); static int wake_or_create_thread(thread_group_t *thread_group); static int create_worker(thread_group_t *thread_group); static void *worker_main(void *param); static void check_stall(thread_group_t *thread_group); -static void connection_abort(connection_t *connection); -static void set_wait_timeout(connection_t *connection); static void set_next_timeout_check(ulonglong abstime); static void print_pool_blocked_message(bool); @@ -194,12 +228,12 @@ static void print_pool_blocked_message(bool); This maps to different APIs on different Unixes. Supported are currently Linux with epoll, Solaris with event ports, - OSX and BSD with kevent. All those API's are used with one-shot flags + OSX and BSD with kevent, Windows with IOCP. All those API's are used with one-shot flags (the event is signalled once client has written something into the socket, then socket is removed from the "poll-set" until the command is finished, and we need to re-arm/re-register socket) - No implementation for poll/select/AIO is currently provided. + No implementation for poll/select is currently provided. The API closely resembles all of the above mentioned platform APIs and consists of following functions. @@ -208,7 +242,7 @@ static void print_pool_blocked_message(bool); Creates an io_poll descriptor On Linux: epoll_create() - - io_poll_associate_fd(int poll_fd, int fd, void *data) + - io_poll_associate_fd(int poll_fd, int fd, void *data, void *opt) Associate file descriptor with io poll descriptor On Linux : epoll_ctl(..EPOLL_CTL_ADD)) @@ -217,7 +251,7 @@ static void print_pool_blocked_message(bool); On Linux: epoll_ctl(..EPOLL_CTL_DEL) - - io_poll_start_read(int poll_fd,int fd, void *data) + - io_poll_start_read(int poll_fd,int fd, void *data, void *opt) The same as io_poll_associate_fd(), but cannot be used before io_poll_associate_fd() was called. 
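On Linux the contract sketched above is satisfied with one-shot epoll: a fired event disarms the descriptor until it is explicitly re-armed, which is what makes the "socket is removed from the poll-set until the command is finished" behaviour cheap. A Linux-only sketch of the three calls (trailing underscores mark the functions as illustrative stand-ins; the server's actual event-flag set may differ):

    #include <sys/epoll.h>

    static int io_poll_create_()
    {
      return epoll_create1(0);
    }

    static int io_poll_associate_fd_(int pollfd, int fd, void *data)
    {
      struct epoll_event ev= {};
      ev.data.ptr= data;                       /* returned with the event */
      ev.events= EPOLLIN | EPOLLONESHOT;       /* fires once, then disarms */
      return epoll_ctl(pollfd, EPOLL_CTL_ADD, fd, &ev);
    }

    static int io_poll_start_read_(int pollfd, int fd, void *data)
    {
      struct epoll_event ev= {};
      ev.data.ptr= data;
      ev.events= EPOLLIN | EPOLLONESHOT;
      return epoll_ctl(pollfd, EPOLL_CTL_MOD, fd, &ev);  /* re-arm */
    }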
On Linux : epoll_ctl(..EPOLL_CTL_MOD) @@ -245,7 +279,7 @@ static int io_poll_create() } -int io_poll_associate_fd(int pollfd, int fd, void *data) +int io_poll_associate_fd(int pollfd, int fd, void *data, void*) { struct epoll_event ev; ev.data.u64= 0; /* Keep valgrind happy */ @@ -256,7 +290,7 @@ int io_poll_associate_fd(int pollfd, int fd, void *data) -int io_poll_start_read(int pollfd, int fd, void *data) +int io_poll_start_read(int pollfd, int fd, void *data, void *) { struct epoll_event ev; ev.data.u64= 0; /* Keep valgrind happy */ @@ -315,7 +349,7 @@ int io_poll_create() return kqueue(); } -int io_poll_start_read(int pollfd, int fd, void *data) +int io_poll_start_read(int pollfd, int fd, void *data,void *) { struct kevent ke; MY_EV_SET(&ke, fd, EVFILT_READ, EV_ADD|EV_ONESHOT, @@ -324,12 +358,12 @@ int io_poll_start_read(int pollfd, int fd, void *data) } -int io_poll_associate_fd(int pollfd, int fd, void *data) +int io_poll_associate_fd(int pollfd, int fd, void *data,void *) { struct kevent ke; MY_EV_SET(&ke, fd, EVFILT_READ, EV_ADD|EV_ONESHOT, 0, 0, data); - return io_poll_start_read(pollfd,fd, data); + return io_poll_start_read(pollfd,fd, data, 0); } @@ -371,14 +405,14 @@ static int io_poll_create() return port_create(); } -int io_poll_start_read(int pollfd, int fd, void *data) +int io_poll_start_read(int pollfd, int fd, void *data, void *) { return port_associate(pollfd, PORT_SOURCE_FD, fd, POLLIN, data); } -static int io_poll_associate_fd(int pollfd, int fd, void *data) +static int io_poll_associate_fd(int pollfd, int fd, void *data, void *) { - return io_poll_start_read(pollfd, fd, data); + return io_poll_start_read(pollfd, fd, data, 0); } int io_poll_disassociate_fd(int pollfd, int fd) @@ -410,23 +444,115 @@ static void* native_event_get_userdata(native_event *event) { return event->portev_user; } + +#elif defined(HAVE_IOCP) + +static int io_poll_create() +{ + HANDLE h= CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0); + return (int)h; +} + + +int io_poll_start_read(int pollfd, int fd, void *, void *opt) +{ + DWORD num_bytes = 0; + static char c; + + WSABUF buf; + buf.buf= &c; + buf.len= 0; + DWORD flags=0; + + if (WSARecv((SOCKET)fd, &buf, 1, &num_bytes, &flags, (OVERLAPPED *)opt, NULL) == 0) + return 0; + + if (GetLastError() == ERROR_IO_PENDING) + return 0; + + return 1; +} + + +static int io_poll_associate_fd(int pollfd, int fd, void *data, void *opt) +{ + HANDLE h= CreateIoCompletionPort((HANDLE)fd, (HANDLE)pollfd, (ULONG_PTR)data, 0); + if (!h) + return -1; + return io_poll_start_read(pollfd,fd, 0, opt); +} + + +int io_poll_disassociate_fd(int pollfd, int fd) +{ + /* Not possible to unbind/rebind file descriptor in IOCP. */ + return 0; +} + + +int io_poll_wait(int pollfd, native_event *events, int maxevents, int timeout_ms) +{ + ULONG n; + BOOL ok = GetQueuedCompletionStatusEx((HANDLE)pollfd, events, + maxevents, &n, timeout_ms, FALSE); + + return ok ? 
(int)n : -1; +} + + +static void* native_event_get_userdata(native_event *event) +{ + return (void *)event->lpCompletionKey; +} + #endif /* Dequeue element from a workqueue */ -static connection_t *queue_get(thread_group_t *thread_group) +static TP_connection_generic *queue_get(thread_group_t *thread_group) { DBUG_ENTER("queue_get"); thread_group->queue_event_count++; - connection_t *c= thread_group->queue.front(); - if (c) + TP_connection_generic *c; + for (int i=0; i < NQUEUES;i++) { - thread_group->queue.remove(c); + c= thread_group->queues[i].pop_front(); + if (c) + DBUG_RETURN(c); } - DBUG_RETURN(c); + DBUG_RETURN(0); } +static bool is_queue_empty(thread_group_t *thread_group) +{ + for (int i=0; i < NQUEUES; i++) + { + if (!thread_group->queues[i].is_empty()) + return false; + } + return true; +} + + +static void queue_init(thread_group_t *thread_group) +{ + for (int i=0; i < NQUEUES; i++) + { + thread_group->queues[i].empty(); + } +} + +static void queue_put(thread_group_t *thread_group, native_event *ev, int cnt) +{ + ulonglong now= pool_timer.current_microtime; + for(int i=0; i < cnt; i++) + { + TP_connection_generic *c = (TP_connection_generic *)native_event_get_userdata(&ev[i]); + c->dequeue_time= now; + thread_group->queues[c->priority].push_back(c); + } +} /* Handle wait timeout : @@ -450,7 +576,7 @@ static void timeout_check(pool_timer_t *timer) if (thd->net.reading_or_writing != 1) continue; - connection_t *connection= (connection_t *)thd->event_scheduler.data; + TP_connection_generic *connection= (TP_connection_generic *)thd->event_scheduler.data; if (!connection) { /* @@ -462,11 +588,7 @@ static void timeout_check(pool_timer_t *timer) if(connection->abs_wait_timeout < timer->current_microtime) { - /* Wait timeout exceeded, kill connection. */ - mysql_mutex_lock(&thd->LOCK_thd_data); - thd->killed = KILL_CONNECTION; - post_kill_notification(thd); - mysql_mutex_unlock(&thd->LOCK_thd_data); + tp_timeout_handler(connection); } else { @@ -545,10 +667,23 @@ static void* timer_thread(void *param) void check_stall(thread_group_t *thread_group) { - if (mysql_mutex_trylock(&thread_group->mutex) != 0) + mysql_mutex_lock(&thread_group->mutex); + + /* + Bump priority for the low priority connections that spent too much + time in low prio queue. + */ + TP_connection_generic *c; + for (;;) { - /* Something happens. Don't disturb */ - return; + c= thread_group->queues[TP_PRIORITY_LOW].front(); + if (c && pool_timer.current_microtime - c->dequeue_time > 1000ULL * threadpool_prio_kickup_timer) + { + thread_group->queues[TP_PRIORITY_LOW].remove(c); + thread_group->queues[TP_PRIORITY_HIGH].push_back(c); + } + else + break; } /* @@ -593,7 +728,7 @@ void check_stall(thread_group_t *thread_group) do wait and indicate that via thd_wait_begin/end callbacks, thread creation will be faster. */ - if (!thread_group->queue.is_empty() && !thread_group->queue_event_count) + if (!is_queue_empty(thread_group) && !thread_group->queue_event_count) { thread_group->stalled= true; wake_or_create_thread(thread_group); @@ -636,11 +771,11 @@ static void stop_timer(pool_timer_t *timer) @return a ready connection, or NULL on shutdown */ -static connection_t * listener(worker_thread_t *current_thread, +static TP_connection_generic * listener(worker_thread_t *current_thread, thread_group_t *thread_group) { DBUG_ENTER("listener"); - connection_t *retval= NULL; + TP_connection_generic *retval= NULL; for(;;) { @@ -707,28 +842,17 @@ static connection_t * listener(worker_thread_t *current_thread, and wake a worker. 
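The promotion loop in check_stall() above reduces to one comparison per queued connection; a standalone restatement (timestamps in microseconds, as with pool_timer.current_microtime; needs_kickup() is an illustrative helper, not server code):

    #include <stdint.h>

    // True once a connection has sat in the low-priority queue longer than
    // thread_pool_prio_kickup_timer (given in milliseconds).
    static bool needs_kickup(uint64_t now_us, uint64_t dequeue_time_us,
                             unsigned kickup_timer_ms)
    {
      return now_us - dequeue_time_us > 1000ULL * kickup_timer_ms;
    }

Because each queue is FIFO, the loop can stop at the first entry that does not qualify: everything behind it was enqueued later.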
NOTE: Currently nothing is done to detect or prevent long queuing times. - A solutionc for the future would be to give up "one active thread per + A solution for the future would be to give up "one active thread per group" principle, if events stay in the queue for too long, and just wake more workers. */ - bool listener_picks_event= thread_group->queue.is_empty(); - - /* - If listener_picks_event is set, listener thread will handle first event, - and put the rest into the queue. If listener_pick_event is not set, all - events go to the queue. - */ - for(int i=(listener_picks_event)?1:0; i < cnt ; i++) - { - connection_t *c= (connection_t *)native_event_get_userdata(&ev[i]); - thread_group->queue.push_back(c); - } - + bool listener_picks_event=is_queue_empty(thread_group); + queue_put(thread_group, ev, cnt); if (listener_picks_event) { /* Handle the first event. */ - retval= (connection_t *)native_event_get_userdata(&ev[0]); + retval= queue_get(thread_group); mysql_mutex_unlock(&thread_group->mutex); break; } @@ -914,7 +1038,7 @@ int thread_group_init(thread_group_t *thread_group, pthread_attr_t* thread_attr) thread_group->pollfd= -1; thread_group->shutdown_pipe[0]= -1; thread_group->shutdown_pipe[1]= -1; - thread_group->queue.empty(); + queue_init(thread_group); DBUG_RETURN(0); } @@ -924,9 +1048,10 @@ void thread_group_destroy(thread_group_t *thread_group) mysql_mutex_destroy(&thread_group->mutex); if (thread_group->pollfd != -1) { - close(thread_group->pollfd); + io_poll_close(thread_group->pollfd); thread_group->pollfd= -1; } +#ifndef HAVE_IOCP for(int i=0; i < 2; i++) { if(thread_group->shutdown_pipe[i] != -1) @@ -935,6 +1060,8 @@ void thread_group_destroy(thread_group_t *thread_group) thread_group->shutdown_pipe[i]= -1; } } +#endif + if (my_atomic_add32(&shutdown_group_count, -1) == 1) my_free(all_groups); } @@ -957,7 +1084,32 @@ static int wake_thread(thread_group_t *thread_group) DBUG_RETURN(1); /* no thread in waiter list => missed wakeup */ } +/* + Wake listener thread (during shutdown) + Self-pipe trick is used in most cases,except IOCP. +*/ +static int wake_listener(thread_group_t *thread_group) +{ +#ifndef HAVE_IOCP + if (pipe(thread_group->shutdown_pipe)) + { + return -1; + } + /* Wake listener */ + if (io_poll_associate_fd(thread_group->pollfd, + thread_group->shutdown_pipe[0], NULL, NULL)) + { + return -1; + } + char c= 0; + if (write(thread_group->shutdown_pipe[1], &c, 1) < 0) + return -1; +#else + PostQueuedCompletionStatus((HANDLE)thread_group->pollfd, 0, 0, 0); +#endif + return 0; +} /** Initiate shutdown for thread group. @@ -981,20 +1133,7 @@ static void thread_group_close(thread_group_t *thread_group) thread_group->shutdown= true; thread_group->listener= NULL; - if (pipe(thread_group->shutdown_pipe)) - { - DBUG_VOID_RETURN; - } - - /* Wake listener */ - if (io_poll_associate_fd(thread_group->pollfd, - thread_group->shutdown_pipe[0], NULL)) - { - DBUG_VOID_RETURN; - } - char c= 0; - if (write(thread_group->shutdown_pipe[1], &c, 1) < 0) - DBUG_VOID_RETURN; + wake_listener(thread_group); /* Wake all workers. 
*/ while(wake_thread(thread_group) == 0) @@ -1015,18 +1154,16 @@ static void thread_group_close(thread_group_t *thread_group) */ -static void queue_put(thread_group_t *thread_group, connection_t *connection) +static void queue_put(thread_group_t *thread_group, TP_connection_generic *connection) { DBUG_ENTER("queue_put"); - mysql_mutex_lock(&thread_group->mutex); - thread_group->queue.push_back(connection); + connection->dequeue_time= pool_timer.current_microtime; + thread_group->queues[connection->priority].push_back(connection); if (thread_group->active_thread_count == 0) wake_or_create_thread(thread_group); - mysql_mutex_unlock(&thread_group->mutex); - DBUG_VOID_RETURN; } @@ -1061,18 +1198,19 @@ static bool too_many_threads(thread_group_t *thread_group) NULL is returned if timeout has expired,or on shutdown. */ -connection_t *get_event(worker_thread_t *current_thread, +TP_connection_generic *get_event(worker_thread_t *current_thread, thread_group_t *thread_group, struct timespec *abstime) { DBUG_ENTER("get_event"); - connection_t *connection = NULL; - int err=0; + TP_connection_generic *connection = NULL; + mysql_mutex_lock(&thread_group->mutex); DBUG_ASSERT(thread_group->active_thread_count >= 0); for(;;) { + int err=0; bool oversubscribed = too_many_threads(thread_group); if (thread_group->shutdown) break; @@ -1100,22 +1238,27 @@ connection_t *get_event(worker_thread_t *current_thread, thread_group->listener= NULL; break; } - + + /* Last thing we try before going to sleep is to - pick a single event via epoll, without waiting (timeout 0) + non-blocking event poll, i.e with timeout = 0. + If this returns events, pick one */ if (!oversubscribed) { - native_event nev; - if (io_poll_wait(thread_group->pollfd,&nev,1, 0) == 1) + + native_event ev[MAX_EVENTS]; + int cnt = io_poll_wait(thread_group->pollfd, ev, MAX_EVENTS, 0); + if (cnt > 0) { - thread_group->io_event_count++; - connection = (connection_t *)native_event_get_userdata(&nev); + queue_put(thread_group, ev, cnt); + connection= queue_get(thread_group); break; } } + /* And now, finally sleep */ current_thread->woken = false; /* wake() sets this to true */ @@ -1173,9 +1316,9 @@ void wait_begin(thread_group_t *thread_group) DBUG_ASSERT(thread_group->active_thread_count >=0); DBUG_ASSERT(thread_group->connection_count > 0); - + if ((thread_group->active_thread_count == 0) && - (thread_group->queue.is_empty() || !thread_group->listener)) + (is_queue_empty(thread_group) || !thread_group->listener)) { /* Group might stall while this thread waits, thus wake @@ -1202,103 +1345,47 @@ void wait_end(thread_group_t *thread_group) } -/** - Allocate/initialize a new connection structure. -*/ -connection_t *alloc_connection() + +TP_connection * TP_pool_generic::new_connection(CONNECT *c) { - connection_t* connection; - DBUG_ENTER("alloc_connection"); - DBUG_EXECUTE_IF("simulate_failed_connection_1", DBUG_RETURN(0); ); - - if ((connection = (connection_t *)my_malloc(sizeof(connection_t),0))) - { - connection->waiting= false; - connection->logged_in= false; - connection->bound_to_poll_descriptor= false; - connection->abs_wait_timeout= ULONGLONG_MAX; - connection->thd= 0; - } - DBUG_RETURN(connection); + return new (std::nothrow) TP_connection_generic(c); } - - /** Add a new connection to thread pool.. 
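wake_listener() above uses the classic self-pipe trick: the pipe's read end is registered with the poll descriptor, so writing a single byte makes a blocked io_poll_wait() return. A POSIX-only sketch of the idea (the IOCP build has no pipe to poll, hence its PostQueuedCompletionStatus() branch):

    #include <unistd.h>

    struct SelfPipe
    {
      int fds[2];        /* fds[0]: read end (polled), fds[1]: write end */
      bool open() { return pipe(fds) == 0; }
      bool wake() { char c= 0; return write(fds[1], &c, 1) == 1; }
    };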
*/ -void tp_add_connection(CONNECT *connect) +void TP_pool_generic::add(TP_connection *c) { - connection_t *connection; DBUG_ENTER("tp_add_connection"); - connection= alloc_connection(); - if (!connection) - { - connect->close_and_delete(); - DBUG_VOID_RETURN; - } - connection->connect= connect; - - /* Assign connection to a group. */ - thread_group_t *group= - &all_groups[connect->thread_id%group_count]; - - connection->thread_group=group; - - mysql_mutex_lock(&group->mutex); - group->connection_count++; - mysql_mutex_unlock(&group->mutex); - + TP_connection_generic *connection=(TP_connection_generic *)c; + thread_group_t *thread_group= connection->thread_group; /* Add connection to the work queue.Actual logon will be done by a worker thread. */ - queue_put(group, connection); + mysql_mutex_lock(&thread_group->mutex); + queue_put(thread_group, connection); + mysql_mutex_unlock(&thread_group->mutex); DBUG_VOID_RETURN; } -/** - Terminate connection. -*/ - -static void connection_abort(connection_t *connection) -{ - DBUG_ENTER("connection_abort"); - thread_group_t *group= connection->thread_group; - - if (connection->thd) - { - threadpool_remove_connection(connection->thd); - } - - mysql_mutex_lock(&group->mutex); - group->connection_count--; - mysql_mutex_unlock(&group->mutex); - - my_free(connection); - DBUG_VOID_RETURN; -} - /** MySQL scheduler callback: wait begin */ -void tp_wait_begin(THD *thd, int type) +void TP_connection_generic::wait_begin(int type) { - DBUG_ENTER("tp_wait_begin"); - DBUG_ASSERT(thd); - connection_t *connection = (connection_t *)thd->event_scheduler.data; - if (connection) - { - DBUG_ASSERT(!connection->waiting); - connection->waiting= true; - wait_begin(connection->thread_group); - } + DBUG_ENTER("wait_begin"); + + DBUG_ASSERT(!waiting); + waiting++; + if (waiting == 1) + ::wait_begin(thread_group); DBUG_VOID_RETURN; } @@ -1307,18 +1394,13 @@ void tp_wait_begin(THD *thd, int type) MySQL scheduler callback: wait end */ -void tp_wait_end(THD *thd) +void TP_connection_generic::wait_end() { - DBUG_ENTER("tp_wait_end"); - DBUG_ASSERT(thd); - - connection_t *connection = (connection_t *)thd->event_scheduler.data; - if (connection) - { - DBUG_ASSERT(connection->waiting); - connection->waiting = false; - wait_end(connection->thread_group); - } + DBUG_ENTER("wait_end"); + DBUG_ASSERT(waiting); + waiting--; + if (waiting == 0) + ::wait_end(thread_group); DBUG_VOID_RETURN; } @@ -1335,12 +1417,41 @@ static void set_next_timeout_check(ulonglong abstime) DBUG_VOID_RETURN; } +TP_connection_generic::TP_connection_generic(CONNECT *c): + TP_connection(c), + thread_group(0), + next_in_queue(0), + prev_in_queue(0), + abs_wait_timeout(ULONGLONG_MAX), + bound_to_poll_descriptor(false), + waiting(false) +#ifdef HAVE_IOCP +, overlapped() +#endif +{ + /* Assign connection to a group. */ + thread_group_t *group= + &all_groups[c->thread_id%group_count]; + + thread_group=group; + + mysql_mutex_lock(&group->mutex); + group->connection_count++; + mysql_mutex_unlock(&group->mutex); +} + +TP_connection_generic::~TP_connection_generic() +{ + mysql_mutex_lock(&thread_group->mutex); + thread_group->connection_count--; + mysql_mutex_unlock(&thread_group->mutex); +} /** Set wait timeout for connection. */ -static void set_wait_timeout(connection_t *c) +void TP_connection_generic::set_io_timeout(int timeout_sec) { DBUG_ENTER("set_wait_timeout"); /* @@ -1351,11 +1462,11 @@ static void set_wait_timeout(connection_t *c) one tick interval. 
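The deadline arithmetic described in that comment can be restated as a one-liner (abs_wait_deadline() is an illustrative helper; tick_interval is in milliseconds, timestamps in microseconds):

    #include <stdint.h>

    // "now + one timer tick + net_wait_timeout": the extra tick guarantees a
    // client is never killed before its wait timeout has fully elapsed.
    static uint64_t abs_wait_deadline(uint64_t now_us,
                                      uint64_t tick_interval_ms,
                                      int timeout_sec)
    {
      return now_us + 1000ULL * tick_interval_ms +
             1000000ULL * (uint64_t) timeout_sec;
    }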
*/ - c->abs_wait_timeout= pool_timer.current_microtime + + abs_wait_timeout= pool_timer.current_microtime + 1000LL*pool_timer.tick_interval + - 1000000LL*c->thd->variables.net_wait_timeout; + 1000000LL*timeout_sec; - set_next_timeout_check(c->abs_wait_timeout); + set_next_timeout_check(abs_wait_timeout); DBUG_VOID_RETURN; } @@ -1367,7 +1478,7 @@ static void set_wait_timeout(connection_t *c) after thread_pool_size setting. */ -static int change_group(connection_t *c, +static int change_group(TP_connection_generic *c, thread_group_t *old_group, thread_group_t *new_group) { @@ -1398,10 +1509,11 @@ static int change_group(connection_t *c, } -static int start_io(connection_t *connection) +int TP_connection_generic::start_io() { - int fd = mysql_socket_getfd(connection->thd->net.vio->mysql_socket); + int fd= mysql_socket_getfd(thd->net.vio->mysql_socket); +#ifndef HAVE_IOCP /* Usually, connection will stay in the same group for the entire connection's life. However, we do allow group_count to @@ -1413,56 +1525,25 @@ static int start_io(connection_t *connection) on thread_id and current group count, and migrate if necessary. */ thread_group_t *group = - &all_groups[connection->thd->thread_id%group_count]; + &all_groups[thd->thread_id%group_count]; - if (group != connection->thread_group) + if (group != thread_group) { - if (change_group(connection, connection->thread_group, group)) + if (change_group(this, thread_group, group)) return -1; } - +#endif + /* Bind to poll descriptor if not yet done. */ - if (!connection->bound_to_poll_descriptor) + if (!bound_to_poll_descriptor) { - connection->bound_to_poll_descriptor= true; - return io_poll_associate_fd(group->pollfd, fd, connection); + bound_to_poll_descriptor= true; + return io_poll_associate_fd(thread_group->pollfd, fd, this, OPTIONAL_IO_POLL_READ_PARAM); } - return io_poll_start_read(group->pollfd, fd, connection); -} - - - -static void handle_event(connection_t *connection) -{ - - DBUG_ENTER("handle_event"); - int err; - - if (!connection->logged_in) - { - connection->thd = threadpool_add_connection(connection->connect, connection); - err= (connection->thd == NULL); - connection->logged_in= true; - } - else - { - err= threadpool_process_request(connection->thd); - } - - if(err) - goto end; - - set_wait_timeout(connection); - err= start_io(connection); - -end: - if (err) - connection_abort(connection); - - DBUG_VOID_RETURN; + return io_poll_start_read(thread_group->pollfd, fd, this, OPTIONAL_IO_POLL_READ_PARAM); } @@ -1490,14 +1571,14 @@ static void *worker_main(void *param) /* Run event loop */ for(;;) { - connection_t *connection; + TP_connection_generic *connection; struct timespec ts; set_timespec(ts,threadpool_idle_timeout); connection = get_event(&this_thread, thread_group, &ts); if (!connection) break; this_thread.event_count++; - handle_event(connection); + tp_callback(connection); } /* Thread shutdown: cleanup per-worker-thread structure. 
*/ @@ -1518,30 +1599,33 @@ static void *worker_main(void *param) } -bool tp_init() +TP_pool_generic::TP_pool_generic() +{} + +int TP_pool_generic::init() { - DBUG_ENTER("tp_init"); + DBUG_ENTER("TP_pool_generic::TP_pool_generic"); threadpool_max_size= MY_MAX(threadpool_size, 128); all_groups= (thread_group_t *) my_malloc(sizeof(thread_group_t) * threadpool_max_size, MYF(MY_WME|MY_ZEROFILL)); if (!all_groups) { threadpool_max_size= 0; - DBUG_RETURN(1); + sql_print_error("Allocation failed"); + DBUG_RETURN(-1); } - threadpool_started= true; scheduler_init(); - + threadpool_started= true; for (uint i= 0; i < threadpool_max_size; i++) { thread_group_init(&all_groups[i], get_connection_attrib()); } - tp_set_threadpool_size(threadpool_size); + set_pool_size(threadpool_size); if(group_count == 0) { /* Something went wrong */ sql_print_error("Can't set threadpool size to %d",threadpool_size); - DBUG_RETURN(1); + DBUG_RETURN(-1); } PSI_register(mutex); PSI_register(cond); @@ -1552,7 +1636,7 @@ bool tp_init() DBUG_RETURN(0); } -void tp_end() +TP_pool_generic::~TP_pool_generic() { DBUG_ENTER("tp_end"); @@ -1571,13 +1655,10 @@ void tp_end() /** Ensure that poll descriptors are created when threadpool_size changes */ - -void tp_set_threadpool_size(uint size) +int TP_pool_generic::set_pool_size(uint size) { bool success= true; - if (!threadpool_started) - return; - + for(uint i=0; i< size; i++) { thread_group_t *group= &all_groups[i]; @@ -1596,20 +1677,20 @@ void tp_set_threadpool_size(uint size) if (!success) { group_count= i; - return; + return -1; } } group_count= size; + return 0; } -void tp_set_threadpool_stall_limit(uint limit) +int TP_pool_generic::set_stall_limit(uint limit) { - if (!threadpool_started) - return; mysql_mutex_lock(&(pool_timer.mutex)); pool_timer.tick_interval= limit; mysql_mutex_unlock(&(pool_timer.mutex)); mysql_cond_signal(&(pool_timer.cond)); + return 0; } @@ -1620,7 +1701,7 @@ void tp_set_threadpool_stall_limit(uint limit) Don't do any locking, it is not required for stats. */ -int tp_get_idle_thread_count() +int TP_pool_generic::get_idle_thread_count() { int sum=0; for (uint i= 0; i < threadpool_max_size && all_groups[i].pollfd >= 0; i++) diff --git a/sql/threadpool_win.cc b/sql/threadpool_win.cc index 9b1d8f6a7d8..dec898d92bb 100644 --- a/sql/threadpool_win.cc +++ b/sql/threadpool_win.cc @@ -64,8 +64,9 @@ static void tp_log_warning(const char *msg, const char *fct) } -PTP_POOL pool; -DWORD fls; +static PTP_POOL pool; +static TP_CALLBACK_ENVIRON callback_environ; +static DWORD fls; static bool skip_completion_port_on_success = false; @@ -85,13 +86,16 @@ static void CALLBACK timer_callback(PTP_CALLBACK_INSTANCE instance, static void CALLBACK io_completion_callback(PTP_CALLBACK_INSTANCE instance, PVOID context, PVOID overlapped, ULONG io_result, ULONG_PTR nbytes, PTP_IO io); + +static void CALLBACK work_callback(PTP_CALLBACK_INSTANCE instance, PVOID context, PTP_WORK work); + static void CALLBACK shm_read_callback(PTP_CALLBACK_INSTANCE instance, PVOID Context, PTP_WAIT wait,TP_WAIT_RESULT wait_result); static void CALLBACK shm_close_callback(PTP_CALLBACK_INSTANCE instance, PVOID Context, PTP_WAIT wait,TP_WAIT_RESULT wait_result); -static void check_thread_init(); +static void pre_callback(PVOID context, PTP_CALLBACK_INSTANCE instance); /* Get current time as Windows time */ static ulonglong now() @@ -101,74 +105,86 @@ static ulonglong now() return current_time; } -/* - Connection structure, encapsulates THD + structures for asynchronous - IO and pool. 
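The rewritten Windows backend gives every connection a PTP_WORK object (the work member added below), so a request can be handed back to the native pool at any time. A Windows-only sketch of the general shape of that Vista threadpool API, with a hypothetical demo_callback:

    #include <windows.h>

    static void CALLBACK demo_callback(PTP_CALLBACK_INSTANCE, PVOID context,
                                       PTP_WORK)
    {
      /* context is the pointer captured at creation time */
    }

    static PTP_WORK make_work(void *context)
    {
      /* Bind callback + context once; each later SubmitThreadpoolWork() on
         the returned object queues one execution of demo_callback, and
         CloseThreadpoolWork() releases it. */
      return CreateThreadpoolWork(demo_callback, context, NULL);
    }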
-*/ - -struct connection_t +struct TP_connection_win:public TP_connection { - THD *thd; +public: + TP_connection_win(CONNECT*); + ~TP_connection_win(); + virtual int init(); + virtual int start_io(); + virtual void set_io_timeout(int sec); + virtual void wait_begin(int type); + virtual void wait_end(); + + ulonglong timeout; + enum_vio_type vio_type; HANDLE handle; OVERLAPPED overlapped; - /* absolute time for wait timeout (as Windows time) */ - volatile ulonglong timeout; - TP_CALLBACK_ENVIRON callback_environ; + PTP_CALLBACK_INSTANCE callback_instance; PTP_IO io; PTP_TIMER timer; PTP_WAIT shm_read; - /* Callback instance, used to inform treadpool about long callbacks */ - PTP_CALLBACK_INSTANCE callback_instance; - CONNECT* connect; - bool logged_in; + PTP_WORK work; + bool long_callback; + }; +struct TP_connection *new_TP_connection(CONNECT *connect) +{ + TP_connection *c = new (std::nothrow) TP_connection_win(connect); + if (!c || c->init()) + { + delete c; + return 0; + } + return c; +} -void init_connection(connection_t *connection, CONNECT *connect) +void TP_pool_win::add(TP_connection *c) { - connection->logged_in = false; - connection->handle= 0; - connection->io= 0; - connection->shm_read= 0; - connection->timer= 0; - connection->logged_in = false; - connection->timeout= ULONGLONG_MAX; - connection->callback_instance= 0; - connection->thd= 0; - memset(&connection->overlapped, 0, sizeof(OVERLAPPED)); - InitializeThreadpoolEnvironment(&connection->callback_environ); - SetThreadpoolCallbackPool(&connection->callback_environ, pool); - connection->connect= connect; + SubmitThreadpoolWork(((TP_connection_win *)c)->work); } -int init_io(connection_t *connection, THD *thd) +TP_connection_win::TP_connection_win(CONNECT *c) : + TP_connection(c), + timeout(ULONGLONG_MAX), + callback_instance(0), + io(0), + shm_read(0), + timer(0), + work(0) { - connection->thd= thd; - Vio *vio = thd->net.vio; - switch(vio->type) +} + +#define CHECK_ALLOC_ERROR(op) if (!(op)) {tp_log_warning("Allocation failed", #op); DBUG_ASSERT(0); return -1; } + +int TP_connection_win::init() +{ + + memset(&overlapped, 0, sizeof(OVERLAPPED)); + Vio *vio = connect->vio; + switch ((vio_type = vio->type)) { - case VIO_TYPE_SSL: - case VIO_TYPE_TCPIP: - connection->handle= (HANDLE)mysql_socket_getfd(connection->thd->net.vio->mysql_socket); - break; - case VIO_TYPE_NAMEDPIPE: - connection->handle= (HANDLE)vio->hPipe; - break; - case VIO_TYPE_SHARED_MEMORY: - connection->shm_read= CreateThreadpoolWait(shm_read_callback, connection, - &connection->callback_environ); - if (!connection->shm_read) - { - tp_log_warning("Allocation failed", "CreateThreadpoolWait"); - return -1; - } - break; - default: - abort(); + case VIO_TYPE_SSL: + case VIO_TYPE_TCPIP: + handle= (HANDLE)mysql_socket_getfd(vio->mysql_socket); + break; + case VIO_TYPE_NAMEDPIPE: + handle= (HANDLE)vio->hPipe; + break; + case VIO_TYPE_SHARED_MEMORY: + handle= vio->event_server_wrote; + break; + default: + abort(); } - if (connection->handle) + if (vio_type == VIO_TYPE_SHARED_MEMORY) + { + CHECK_ALLOC_ERROR(shm_read= CreateThreadpoolWait(shm_read_callback, this, &callback_environ)); + } + else { /* Performance tweaks (s. 
MSDN documentation)*/ UCHAR flags= FILE_SKIP_SET_EVENT_ON_HANDLE; @@ -176,25 +192,13 @@ int init_io(connection_t *connection, THD *thd) { flags |= FILE_SKIP_COMPLETION_PORT_ON_SUCCESS; } - (void)SetFileCompletionNotificationModes(connection->handle, flags); - + (void)SetFileCompletionNotificationModes(handle, flags); /* Assign io completion callback */ - connection->io= CreateThreadpoolIo(connection->handle, - io_completion_callback, connection, &connection->callback_environ); - if(!connection->io) - { - tp_log_warning("Allocation failed", "CreateThreadpoolWait"); - return -1; - } - } - connection->timer= CreateThreadpoolTimer(timer_callback, connection, - &connection->callback_environ); - if (!connection->timer) - { - tp_log_warning("Allocation failed", "CreateThreadpoolWait"); - return -1; + CHECK_ALLOC_ERROR(io= CreateThreadpoolIo(handle, io_completion_callback, this, &callback_environ)); } + CHECK_ALLOC_ERROR(timer= CreateThreadpoolTimer(timer_callback, this, &callback_environ)); + CHECK_ALLOC_ERROR(work= CreateThreadpoolWork(work_callback, this, &callback_environ)); return 0; } @@ -202,9 +206,8 @@ int init_io(connection_t *connection, THD *thd) /* Start asynchronous read */ -int start_io(connection_t *connection, PTP_CALLBACK_INSTANCE instance) +int TP_connection_win::start_io() { - /* Start async read */ DWORD num_bytes = 0; static char c; WSABUF buf; @@ -214,33 +217,20 @@ int start_io(connection_t *connection, PTP_CALLBACK_INSTANCE instance) DWORD last_error= 0; int retval; - Vio *vio= connection->thd->net.vio; - - if (vio->type == VIO_TYPE_SHARED_MEMORY) + if (shm_read) { - SetThreadpoolWait(connection->shm_read, vio->event_server_wrote, NULL); - return 0; - } - if (vio->type == VIO_CLOSED) - { - return -1; + SetThreadpoolWait(shm_read, handle, NULL); + return 0; } - - DBUG_ASSERT(vio->type == VIO_TYPE_TCPIP || - vio->type == VIO_TYPE_SSL || - vio->type == VIO_TYPE_NAMEDPIPE); - - OVERLAPPED *overlapped= &connection->overlapped; - PTP_IO io= connection->io; StartThreadpoolIo(io); - if (vio->type == VIO_TYPE_TCPIP || vio->type == VIO_TYPE_SSL) + if (vio_type == VIO_TYPE_TCPIP || vio_type == VIO_TYPE_SSL) { /* Start async io (sockets). */ - if (WSARecv(mysql_socket_getfd(vio->mysql_socket) , &buf, 1, &num_bytes, &flags, - overlapped, NULL) == 0) + if (WSARecv((SOCKET)handle , &buf, 1, &num_bytes, &flags, + &overlapped, NULL) == 0) { - retval= last_error= 0; + retval= last_error= 0; } else { @@ -251,7 +241,7 @@ int start_io(connection_t *connection, PTP_CALLBACK_INSTANCE instance) else { /* Start async io (named pipe) */ - if (ReadFile(vio->hPipe, &c, 0, &num_bytes ,overlapped)) + if (ReadFile(handle, &c, 0, &num_bytes,&overlapped)) { retval= last_error= 0; } @@ -272,7 +262,7 @@ int start_io(connection_t *connection, PTP_CALLBACK_INSTANCE instance) if(skip_completion_port_on_success) { CancelThreadpoolIo(io); - io_completion_callback(instance, connection, overlapped, last_error, + io_completion_callback(callback_instance, this, &overlapped, last_error, num_bytes, io); } return 0; @@ -288,81 +278,81 @@ int start_io(connection_t *connection, PTP_CALLBACK_INSTANCE instance) return -1; } - -int login(connection_t *connection, PTP_CALLBACK_INSTANCE instance) -{ - if ((connection->thd= threadpool_add_connection(connection->connect, connection)) - && init_io(connection, connection->thd) == 0 - && start_io(connection, instance) == 0) - { - return 0; - } - return -1; -} - /* - Recalculate wait timeout, maybe reset timer. + Recalculate wait timeout, maybe reset timer. 
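start_io() above relies on the zero-byte read idiom: a 0-length WSARecv() asks the kernel for a completion when data arrives without consuming anything, so whichever worker later services the request still reads the whole command itself. An illustrative standalone version (request_read_notification() is not server code):

    #include <winsock2.h>

    static int request_read_notification(SOCKET s, WSAOVERLAPPED *ov)
    {
      static char dummy;
      WSABUF buf= { 0, &dummy };     /* len == 0: notify only, copy nothing */
      DWORD bytes= 0, flags= 0;

      if (WSARecv(s, &buf, 1, &bytes, &flags, ov, NULL) == 0)
        return 0;                    /* completed synchronously */
      return (WSAGetLastError() == WSA_IO_PENDING) ? 0 : -1;
    }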
*/ -void set_wait_timeout(connection_t *connection, ulonglong old_timeout) +void TP_connection_win::set_io_timeout(int timeout_sec) { - ulonglong new_timeout = now() + - 10000000LL*connection->thd->variables.net_wait_timeout; + ulonglong old_timeout= timeout; + ulonglong new_timeout = now() + 10000000LL * timeout_sec; if (new_timeout < old_timeout) { - SetThreadpoolTimer(connection->timer, (PFILETIME) &new_timeout, 0, 1000); + SetThreadpoolTimer(timer, (PFILETIME)&new_timeout, 0, 1000); } - connection->timeout = new_timeout; + /* new_timeout > old_timeout case is handled by expiring timer. */ + timeout = new_timeout; } -/* Connection destructor */ -void destroy_connection(connection_t *connection, PTP_CALLBACK_INSTANCE instance) +TP_connection_win::~TP_connection_win() { - if (instance) - DisassociateCurrentThreadFromCallback(instance); - if (connection->io) - { - WaitForThreadpoolIoCallbacks(connection->io, TRUE); - CloseThreadpoolIo(connection->io); - } + if (io) + CloseThreadpoolIo(io); - if(connection->shm_read) - { - WaitForThreadpoolWaitCallbacks(connection->shm_read, TRUE); - CloseThreadpoolWait(connection->shm_read); - } + if (shm_read) + CloseThreadpoolWait(shm_read); + + if (work) + CloseThreadpoolWork(work); - if(connection->timer) + if (timer) { - SetThreadpoolTimer(connection->timer, 0, 0, 0); - WaitForThreadpoolTimerCallbacks(connection->timer, TRUE); - CloseThreadpoolTimer(connection->timer); + WaitForThreadpoolTimerCallbacks(timer, TRUE); + CloseThreadpoolTimer(timer); } - - if (connection->thd) +} + +void TP_connection_win::wait_begin(int type) +{ + + /* + Signal to the threadpool whenever callback can run long. Currently, binlog + waits are a good candidate, its waits are really long + */ + if (type == THD_WAIT_BINLOG) { - threadpool_remove_connection(connection->thd); + if (!long_callback) + { + CallbackMayRunLong(callback_instance); + long_callback= true; + } } - - DestroyThreadpoolEnvironment(&connection->callback_environ); } - +void TP_connection_win::wait_end() +{ + /* Do we need to do anything ? */ +} /* This function should be called first whenever a callback is invoked in the threadpool, does my_thread_init() if not yet done */ extern ulong thread_created; -static void check_thread_init() +static void pre_callback(PVOID context, PTP_CALLBACK_INSTANCE instance) { if (FlsGetValue(fls) == NULL) { + /* Running in new worker thread*/ FlsSetValue(fls, (void *)1); statistic_increment(thread_created, &LOCK_status); InterlockedIncrement((volatile long *)&tp_stats.num_worker_threads); + my_thread_init(); } + TP_connection_win *c = (TP_connection_win *)context; + c->callback_instance = instance; + c->long_callback = false; } @@ -375,153 +365,61 @@ static VOID WINAPI thread_destructor(void *data) if(data) { InterlockedDecrement((volatile long *)&tp_stats.num_worker_threads); + my_thread_end(); } } -/* Scheduler callback : init */ -bool tp_init(void) -{ - fls= FlsAlloc(thread_destructor); - pool= CreateThreadpool(NULL); - if(!pool) - { - sql_print_error("Can't create threadpool. " - "CreateThreadpool() failed with %d. 
Likely cause is memory pressure", - GetLastError()); - exit(1); - } - - if (threadpool_max_threads) - { - SetThreadpoolThreadMaximum(pool,threadpool_max_threads); - } - - if (threadpool_min_threads) - { - if (!SetThreadpoolThreadMinimum(pool, threadpool_min_threads)) - { - tp_log_warning( "Can't set threadpool minimum threads", - "SetThreadpoolThreadMinimum"); - } - } - - /* - Control stack size (OS must be Win7 or later, plus corresponding SDK) - */ -#if _MSC_VER >=1600 - if (SetThreadpoolStackInformation) - { - TP_POOL_STACK_INFORMATION stackinfo; - stackinfo.StackCommit = 0; - stackinfo.StackReserve = (SIZE_T)my_thread_stack_size; - if (!SetThreadpoolStackInformation(pool, &stackinfo)) - { - tp_log_warning("Can't set threadpool stack size", - "SetThreadpoolStackInformation"); - } - } -#endif - - return 0; -} - -/** - Scheduler callback : Destroy the scheduler. -*/ -void tp_end(void) +static inline void tp_callback(PTP_CALLBACK_INSTANCE instance, PVOID context) { - if(pool) - { - SetThreadpoolThreadMaximum(pool, 0); - CloseThreadpool(pool); - } + pre_callback(context, instance); + tp_callback((TP_connection *)context); } + /* Handle read completion/notification. */ static VOID CALLBACK io_completion_callback(PTP_CALLBACK_INSTANCE instance, PVOID context, PVOID overlapped, ULONG io_result, ULONG_PTR nbytes, PTP_IO io) { - if(instance) - { - check_thread_init(); - } - - connection_t *connection = (connection_t*)context; - - if (io_result != ERROR_SUCCESS) - goto error; - - THD *thd= connection->thd; - ulonglong old_timeout = connection->timeout; - connection->timeout = ULONGLONG_MAX; - connection->callback_instance= instance; - if (threadpool_process_request(connection->thd)) - goto error; - - set_wait_timeout(connection, old_timeout); - if(start_io(connection, instance)) - goto error; - - return; - -error: - /* Some error has occurred. */ - - destroy_connection(connection, instance); - free(connection); + TP_connection_win *c= (TP_connection_win *)context; + /* + Execute high priority connections immediately. + 'Yield' in case of low priority connections, i.e SubmitThreadpoolWork (with the same callback) + which makes Windows threadpool place the items at the end of its internal work queue. + */ + if (c->priority == TP_PRIORITY_HIGH) + tp_callback(instance, context); + else + SubmitThreadpoolWork(c->work); } -/* Simple callback for login */ -static void CALLBACK login_callback(PTP_CALLBACK_INSTANCE instance, - PVOID context, PTP_WORK work) -{ - if(instance) - { - check_thread_init(); - } - - connection_t *connection =(connection_t *)context; - if (login(connection, instance) != 0) - { - destroy_connection(connection, instance); - free(connection); - } -} - /* Timer callback. Invoked when connection times out (wait_timeout) */ -static VOID CALLBACK timer_callback(PTP_CALLBACK_INSTANCE instance, +static VOID CALLBACK timer_callback(PTP_CALLBACK_INSTANCE instance, PVOID parameter, PTP_TIMER timer) { - check_thread_init(); - - connection_t *con= (connection_t*)parameter; - ulonglong timeout= con->timeout; - - if (timeout <= now()) + TP_connection_win *c = (TP_connection_win *)parameter; + if (c->timeout <= now()) { - con->thd->killed = KILL_CONNECTION; - if(con->thd->net.vio) - vio_shutdown(con->thd->net.vio, SD_BOTH); + tp_timeout_handler(c); } - else if(timeout != ULONGLONG_MAX) + else { - /* - Reset timer. - There is a tiny possibility of a race condition, since the value of timeout - could have changed to smaller value in the thread doing io callback. + /* + Reset timer. 
+ There is a tiny possibility of a race condition, since the value of timeout + could have changed to smaller value in the thread doing io callback. - Given the relative unimportance of the wait timeout, we accept race + Given the relative unimportance of the wait timeout, we accept race condition. - */ - SetThreadpoolTimer(timer, (PFILETIME)&timeout, 0, 1000); + */ + SetThreadpoolTimer(timer, (PFILETIME)&c->timeout, 0, 1000); } } @@ -530,10 +428,11 @@ static VOID CALLBACK timer_callback(PTP_CALLBACK_INSTANCE instance, Shared memory read callback. Invoked when read event is set on connection. */ + static void CALLBACK shm_read_callback(PTP_CALLBACK_INSTANCE instance, PVOID context, PTP_WAIT wait,TP_WAIT_RESULT wait_result) { - connection_t *con= (connection_t *)context; + TP_connection_win *c= (TP_connection_win *)context; /* Disarm wait. */ SetThreadpoolWait(wait, NULL, NULL); @@ -542,97 +441,106 @@ static void CALLBACK shm_read_callback(PTP_CALLBACK_INSTANCE instance, and the current state is "not set". Thus we need to reset the event again, or vio_read will hang. */ - HANDLE h = con->thd->net.vio->event_server_wrote; - SetEvent(h); - io_completion_callback(instance, context, NULL, 0, 0 , 0); + SetEvent(c->handle); + tp_callback(instance, context); } -/* - Notify the thread pool about a new connection. -*/ +static void CALLBACK work_callback(PTP_CALLBACK_INSTANCE instance, PVOID context, PTP_WORK work) +{ + tp_callback(instance, context); +} + +TP_pool_win::TP_pool_win() +{} -void tp_add_connection(CONNECT *connect) +int TP_pool_win::init() { - connection_t *con; - con= (connection_t *)malloc(sizeof(connection_t)); - DBUG_EXECUTE_IF("simulate_failed_connection_1", free(con);con= 0; ); - if (!con) + fls= FlsAlloc(thread_destructor); + pool= CreateThreadpool(NULL); + + if (!pool) { - tp_log_warning("Allocation failed", "tp_add_connection"); - connect->close_and_delete(); - return; + sql_print_error("Can't create threadpool. " + "CreateThreadpool() failed with %d. Likely cause is memory pressure", + GetLastError()); + return -1; } - init_connection(con, connect); + InitializeThreadpoolEnvironment(&callback_environ); + SetThreadpoolCallbackPool(&callback_environ, pool); - /* Try to login asynchronously, using threads in the pool */ - PTP_WORK wrk = CreateThreadpoolWork(login_callback,con, &con->callback_environ); - if (wrk) + if (threadpool_max_threads) { - SubmitThreadpoolWork(wrk); - CloseThreadpoolWork(wrk); + SetThreadpoolThreadMaximum(pool, threadpool_max_threads); } - else + + if (threadpool_min_threads) { - /* Likely memory pressure */ - connect->close_and_delete(); + if (!SetThreadpoolThreadMinimum(pool, threadpool_min_threads)) + { + tp_log_warning("Can't set threadpool minimum threads", + "SetThreadpoolThreadMinimum"); + } } -} - - -/** - Sets the number of idle threads the thread pool maintains in anticipation of new - requests. -*/ -void tp_set_min_threads(uint val) -{ - if (pool) - SetThreadpoolThreadMinimum(pool, val); -} - -void tp_set_max_threads(uint val) -{ - if (pool) - SetThreadpoolThreadMaximum(pool, val); -} - -void tp_wait_begin(THD *thd, int type) -{ - DBUG_ASSERT(thd); /* - Signal to the threadpool whenever callback can run long. 
Currently, binlog - waits are a good candidate, its waits are really long + Control stack size (OS must be Win7 or later) */ - if (type == THD_WAIT_BINLOG) + if (SetThreadpoolStackInformation) { - connection_t *connection= (connection_t *)thd->event_scheduler.data; - if(connection && connection->callback_instance) + TP_POOL_STACK_INFORMATION stackinfo; + stackinfo.StackCommit = 0; + stackinfo.StackReserve = (SIZE_T)my_thread_stack_size; + if (!SetThreadpoolStackInformation(pool, &stackinfo)) { - CallbackMayRunLong(connection->callback_instance); - /* - Reset instance, to avoid calling CallbackMayRunLong twice within - the same callback (it is an error according to docs). - */ - connection->callback_instance= 0; + tp_log_warning("Can't set threadpool stack size", + "SetThreadpoolStackInformation"); } } + return 0; } -void tp_wait_end(THD *thd) + +/** + Scheduler callback : Destroy the scheduler. +*/ +TP_pool_win::~TP_pool_win() { - /* Do we need to do anything ? */ + if (!pool) + return; + DestroyThreadpoolEnvironment(&callback_environ); + SetThreadpoolThreadMaximum(pool, 0); + CloseThreadpool(pool); + if (!tp_stats.num_worker_threads) + FlsFree(fls); } - - /** - Number of idle threads in pool. - This info is not available in Windows implementation, - thus function always returns 0. + Sets the number of idle threads the thread pool maintains in anticipation of new + requests. */ -int tp_get_idle_thread_count() +int TP_pool_win::set_min_threads(uint val) +{ + SetThreadpoolThreadMinimum(pool, val); + return 0; +} + +int TP_pool_win::set_max_threads(uint val) { + SetThreadpoolThreadMaximum(pool, val); return 0; } + +TP_connection *TP_pool_win::new_connection(CONNECT *connect) +{ + TP_connection *c= new (std::nothrow) TP_connection_win(connect); + if (!c ) + return 0; + if (c->init()) + { + delete c; + return 0; + } + return c; +} |
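The pieces meet in io_completion_callback() above: high-priority connections run straight on the completion thread, while low-priority ones "yield" by being resubmitted as work items, which Windows appends to the back of its internal queue. A compressed restatement (Conn and run_request() are illustrative stand-ins for TP_connection_win and tp_callback()):

    #include <windows.h>

    enum Prio { PRIO_HIGH, PRIO_LOW };
    struct Conn { Prio priority; PTP_WORK work; };

    static void run_request(Conn *) { /* process the client command */ }

    static void on_io_completion(Conn *c)
    {
      if (c->priority == PRIO_HIGH)
        run_request(c);                 /* execute immediately, this thread */
      else
        SubmitThreadpoolWork(c->work);  /* go to the back of the pool queue */
    }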