summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorVladislav Vaintroub <wlad@mariadb.com>2016-09-21 14:28:42 +0000
committerVladislav Vaintroub <wlad@mariadb.com>2016-09-22 17:01:28 +0000
commitf7a7c0c2fec3dcca331bb529f8314273360c72ae (patch)
tree2e04f4036bd7def676d85690e67e393ec0c41a8e /sql
parentf32a5115584c9b33a2163df57830ad335cd2b3ab (diff)
downloadmariadb-git-f7a7c0c2fec3dcca331bb529f8314273360c72ae.tar.gz
MDEV-10297 Add priorization to threadpool
Also MDEV-10385 Threadpool refactoring
Diffstat (limited to 'sql')
-rw-r--r--sql/CMakeLists.txt4
-rw-r--r--sql/mysqld.cc2
-rw-r--r--sql/sql_class.h1
-rw-r--r--sql/sys_vars.cc35
-rw-r--r--sql/threadpool.h121
-rw-r--r--sql/threadpool_common.cc214
-rw-r--r--sql/threadpool_generic.cc (renamed from sql/threadpool_unix.cc)573
-rw-r--r--sql/threadpool_win.cc550
8 files changed, 886 insertions, 614 deletions
diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt
index 65b7775dc3c..28072375bbc 100644
--- a/sql/CMakeLists.txt
+++ b/sql/CMakeLists.txt
@@ -157,9 +157,9 @@ IF (CMAKE_SYSTEM_NAME MATCHES "Linux" OR
ADD_DEFINITIONS(-DHAVE_POOL_OF_THREADS)
IF(WIN32)
SET(SQL_SOURCE ${SQL_SOURCE} threadpool_win.cc)
- ELSE()
- SET(SQL_SOURCE ${SQL_SOURCE} threadpool_unix.cc)
ENDIF()
+ SET(SQL_SOURCE ${SQL_SOURCE} threadpool_generic.cc)
+
ENDIF()
MYSQL_ADD_PLUGIN(partition ha_partition.cc STORAGE_ENGINE DEFAULT STATIC_ONLY
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index 789cd6788aa..cf9e99b54a6 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -4425,7 +4425,7 @@ static int init_common_variables()
#endif /* HAVE_SOLARIS_LARGE_PAGES */
-#if defined(HAVE_POOL_OF_THREADS) && !defined(_WIN32)
+#if defined(HAVE_POOL_OF_THREADS)
if (IS_SYSVAR_AUTOSIZE(&threadpool_size))
SYSVAR_AUTOSIZE(threadpool_size, my_getncpus());
#endif
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 2240b5fe0a9..d2a489bfef6 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -697,6 +697,7 @@ typedef struct system_variables
my_bool session_track_schema;
my_bool session_track_state_change;
+ ulong threadpool_priority;
} SV;
/**
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
index 55f2864a93e..5cc81585ed5 100644
--- a/sql/sys_vars.cc
+++ b/sql/sys_vars.cc
@@ -3241,23 +3241,17 @@ static Sys_var_ulong Sys_thread_cache_size(
#ifdef HAVE_POOL_OF_THREADS
static bool fix_tp_max_threads(sys_var *, THD *, enum_var_type)
{
-#ifdef _WIN32
tp_set_max_threads(threadpool_max_threads);
-#endif
return false;
}
-#ifdef _WIN32
static bool fix_tp_min_threads(sys_var *, THD *, enum_var_type)
{
tp_set_min_threads(threadpool_min_threads);
return false;
}
-#endif
-
-#ifndef _WIN32
static bool check_threadpool_size(sys_var *self, THD *thd, set_var *var)
{
ulonglong v= var->save_result.ulonglong_value;
@@ -3282,7 +3276,6 @@ static bool fix_threadpool_stall_limit(sys_var*, THD*, enum_var_type)
tp_set_threadpool_stall_limit(threadpool_stall_limit);
return false;
}
-#endif
#ifdef _WIN32
static Sys_var_uint Sys_threadpool_min_threads(
@@ -3293,7 +3286,24 @@ static Sys_var_uint Sys_threadpool_min_threads(
NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),
ON_UPDATE(fix_tp_min_threads)
);
-#else
+
+static const char *threadpool_mode_names[]={ "windows", "generic", 0 };
+static Sys_var_enum Sys_threadpool_mode(
+ "thread_pool_mode",
+ "Chose implementation of the threadpool",
+ READ_ONLY GLOBAL_VAR(threadpool_mode), CMD_LINE(REQUIRED_ARG),
+ threadpool_mode_names, DEFAULT(TP_MODE_WINDOWS)
+ );
+#endif
+
+static const char *threadpool_priority_names[]={ "high", "low", "auto", 0 };
+static Sys_var_enum Sys_thread_pool_priority(
+ "thread_pool_priority",
+ "Threadpool priority. High priority connections usually start executing earlier than low priority."
+ "If priority set to 'auto', the the actual priority(low or high) is determined based on whether or not connection is inside transaction.",
+ SESSION_VAR(threadpool_priority), CMD_LINE(REQUIRED_ARG),
+ threadpool_priority_names, DEFAULT(TP_PRIORITY_AUTO));
+
static Sys_var_uint Sys_threadpool_idle_thread_timeout(
"thread_pool_idle_timeout",
"Timeout in seconds for an idle thread in the thread pool."
@@ -3328,7 +3338,7 @@ static Sys_var_uint Sys_threadpool_stall_limit(
NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),
ON_UPDATE(fix_threadpool_stall_limit)
);
-#endif /* !WIN32 */
+
static Sys_var_uint Sys_threadpool_max_threads(
"thread_pool_max_threads",
"Maximum allowed number of worker threads in the thread pool",
@@ -3337,6 +3347,13 @@ static Sys_var_uint Sys_threadpool_max_threads(
NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),
ON_UPDATE(fix_tp_max_threads)
);
+
+static Sys_var_uint Sys_threadpool_threadpool_prio_kickup_timer(
+ "thread_pool_prio_kickup_timer",
+ "The number of milliseconds before a dequeued low-priority statement is moved to the high-priority queue",
+ GLOBAL_VAR(threadpool_prio_kickup_timer), CMD_LINE(REQUIRED_ARG),
+ VALID_RANGE(0, UINT_MAX), DEFAULT(1000), BLOCK_SIZE(1)
+);
#endif /* HAVE_POOL_OF_THREADS */
/**
diff --git a/sql/threadpool.h b/sql/threadpool.h
index 7ddc661565f..17c8b6ea4ea 100644
--- a/sql/threadpool.h
+++ b/sql/threadpool.h
@@ -23,28 +23,19 @@ extern uint threadpool_max_size;
extern uint threadpool_stall_limit; /* time interval in 10 ms units for stall checks*/
extern uint threadpool_max_threads; /* Maximum threads in pool */
extern uint threadpool_oversubscribe; /* Maximum active threads in group */
+extern uint threadpool_prio_kickup_timer; /* Time before low prio item gets prio boost */
+#ifdef _WIN32
+extern uint threadpool_mode; /* Thread pool implementation , windows or generic */
+#define TP_MODE_WINDOWS 0
+#define TP_MODE_GENERIC 1
+#endif
+struct TP_connection;
+extern void tp_callback(TP_connection *c);
+extern void tp_timeout_handler(TP_connection *c);
-/* Common thread pool routines, suitable for different implementations */
-extern void threadpool_remove_connection(THD *thd);
-extern int threadpool_process_request(THD *thd);
-extern THD* threadpool_add_connection(CONNECT *connect, void *scheduled_data);
-/*
- Functions used by scheduler.
- OS-specific implementations are in
- threadpool_unix.cc or threadpool_win.cc
-*/
-extern bool tp_init();
-extern void tp_add_connection(CONNECT *);
-extern void tp_wait_begin(THD *, int);
-extern void tp_wait_end(THD*);
-extern void tp_post_kill_notification(THD *thd);
-extern void tp_end(void);
-
-/* Used in SHOW for threadpool_idle_thread_count */
-extern int tp_get_idle_thread_count();
/*
Threadpool statistics
@@ -63,9 +54,103 @@ extern void tp_set_min_threads(uint val);
extern void tp_set_max_threads(uint val);
extern void tp_set_threadpool_size(uint val);
extern void tp_set_threadpool_stall_limit(uint val);
+extern int tp_get_idle_thread_count();
+extern int tp_get_thread_count();
/* Activate threadpool scheduler */
extern void tp_scheduler(void);
extern int show_threadpool_idle_threads(THD *thd, SHOW_VAR *var, char *buff,
enum enum_var_type scope);
+
+enum TP_PRIORITY {
+ TP_PRIORITY_HIGH,
+ TP_PRIORITY_LOW,
+ TP_PRIORITY_AUTO
+};
+
+
+enum TP_STATE
+{
+ TP_STATE_IDLE,
+ TP_STATE_RUNNING,
+};
+
+/*
+ Connection structure, encapsulates THD + structures for asynchronous
+ IO and pool.
+
+ Platform specific parts are specified in subclasses called connection_t,
+ inside threadpool_win.cc and threadpool_unix.cc
+*/
+
+struct TP_connection
+{
+ THD* thd;
+ CONNECT* connect;
+ TP_STATE state;
+ TP_PRIORITY priority;
+ TP_connection(CONNECT *c) :
+ thd(0),
+ connect(c),
+ state(TP_STATE_IDLE),
+ priority(TP_PRIORITY_HIGH)
+ {}
+
+ virtual ~TP_connection()
+ {};
+
+ /* Initialize io structures windows threadpool, epoll etc */
+ virtual int init() = 0;
+
+ virtual void set_io_timeout(int sec) = 0;
+
+ /* Read for the next client command (async) with specified timeout */
+ virtual int start_io() = 0;
+
+ virtual void wait_begin(int type)= 0;
+ virtual void wait_end() = 0;
+
+};
+
+
+struct TP_pool
+{
+ virtual ~TP_pool(){};
+ virtual int init()= 0;
+ virtual TP_connection *new_connection(CONNECT *)= 0;
+ virtual void add(TP_connection *c)= 0;
+ virtual int set_max_threads(uint){ return 0; }
+ virtual int set_min_threads(uint){ return 0; }
+ virtual int set_pool_size(uint){ return 0; }
+ virtual int set_idle_timeout(uint){ return 0; }
+ virtual int set_oversubscribe(uint){ return 0; }
+ virtual int set_stall_limit(uint){ return 0; }
+ virtual int get_thread_count() { return tp_stats.num_worker_threads; }
+ virtual int get_idle_thread_count(){ return 0; }
+};
+
+#ifdef _WIN32
+struct TP_pool_win:TP_pool
+{
+ TP_pool_win();
+ virtual int init();
+ virtual ~TP_pool_win();
+ virtual TP_connection *new_connection(CONNECT *c);
+ virtual void add(TP_connection *);
+ virtual int set_max_threads(uint);
+ virtual int set_min_threads(uint);
+};
+#endif
+
+struct TP_pool_generic :TP_pool
+{
+ TP_pool_generic();
+ ~TP_pool_generic();
+ virtual int init();
+ virtual TP_connection *new_connection(CONNECT *c);
+ virtual void add(TP_connection *);
+ virtual int set_pool_size(uint);
+ virtual int set_stall_limit(uint);
+ virtual int get_idle_thread_count();
+};
diff --git a/sql/threadpool_common.cc b/sql/threadpool_common.cc
index d6c343dc04e..2308f4277d6 100644
--- a/sql/threadpool_common.cc
+++ b/sql/threadpool_common.cc
@@ -34,14 +34,25 @@ uint threadpool_max_size;
uint threadpool_stall_limit;
uint threadpool_max_threads;
uint threadpool_oversubscribe;
+uint threadpool_mode;
+uint threadpool_prio_kickup_timer;
/* Stats */
TP_STATISTICS tp_stats;
+static void threadpool_remove_connection(THD *thd);
+static int threadpool_process_request(THD *thd);
+static THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data);
+
extern "C" pthread_key(struct st_my_thread_var*, THR_KEY_mysys);
extern bool do_command(THD*);
+static inline TP_connection *get_TP_connection(THD *thd)
+{
+ return (TP_connection *)thd->event_scheduler.data;
+}
+
/*
Worker threads contexts, and THD contexts.
=========================================
@@ -105,15 +116,81 @@ static void thread_attach(THD* thd)
#endif
}
+/*
+ Determine connection priority , using current
+ transaction state and 'threadpool_priority' variable value.
+*/
+static TP_PRIORITY get_priority(TP_connection *c)
+{
+ DBUG_ASSERT(c->thd == current_thd);
+ TP_PRIORITY prio= (TP_PRIORITY)c->thd->variables.threadpool_priority;
+ if (prio == TP_PRIORITY_AUTO)
+ {
+ return c->thd->transaction.is_active() ? TP_PRIORITY_HIGH : TP_PRIORITY_LOW;
+ }
+ return prio;
+}
-THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data)
+
+void tp_callback(TP_connection *c)
{
- THD *thd= NULL;
- int error=1;
+ DBUG_ASSERT(c);
Worker_thread_context worker_context;
worker_context.save();
+ THD *thd= c->thd;
+
+ c->state = TP_STATE_RUNNING;
+
+ if (!thd)
+ {
+ /* No THD, need to login first. */
+ DBUG_ASSERT(c->connect);
+ thd= c->thd= threadpool_add_connection(c->connect, c);
+ if (!thd)
+ {
+ /* Bail out on connect error.*/
+ goto error;
+ }
+ c->connect= 0;
+ }
+ else if (threadpool_process_request(thd))
+ {
+ /* QUIT or an error occured. */
+ goto error;
+ }
+
+ /* Set priority */
+ c->priority= get_priority(c);
+
+ /* Read next command from client. */
+ c->set_io_timeout(thd->variables.net_wait_timeout);
+ c->state= TP_STATE_IDLE;
+ if (c->start_io())
+ goto error;
+
+ worker_context.restore();
+ return;
+
+error:
+ c->thd= 0;
+ delete c;
+
+ if (thd)
+ {
+ threadpool_remove_connection(thd);
+ }
+ worker_context.restore();
+}
+
+
+static THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data)
+{
+ THD *thd= NULL;
+ int error=1;
+
+
/*
Create a new connection context: mysys_thread_var and PSI thread
Store them in THD.
@@ -137,7 +214,6 @@ THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data)
#endif
my_thread_end();
}
- worker_context.restore();
return NULL;
}
delete connect;
@@ -184,17 +260,14 @@ THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data)
threadpool_remove_connection(thd);
thd= NULL;
}
- worker_context.restore();
return thd;
}
-void threadpool_remove_connection(THD *thd)
+static void threadpool_remove_connection(THD *thd)
{
- Worker_thread_context worker_context;
- worker_context.save();
thread_attach(thd);
-
+ thd->event_scheduler.data= 0;
thd->net.reading_or_writing = 0;
end_connection(thd);
close_connection(thd, 0);
@@ -206,19 +279,14 @@ void threadpool_remove_connection(THD *thd)
mysys thread_var and PSI thread.
*/
my_thread_end();
-
- worker_context.restore();
}
/**
Process a single client request or a single batch.
*/
-int threadpool_process_request(THD *thd)
+static int threadpool_process_request(THD *thd)
{
int retval= 0;
- Worker_thread_context worker_context;
- worker_context.save();
-
thread_attach(thd);
if (thd->killed >= KILL_CONNECTION)
@@ -268,7 +336,6 @@ int threadpool_process_request(THD *thd)
}
end:
- worker_context.restore();
return retval;
}
@@ -286,6 +353,119 @@ static bool tp_end_thread(THD *, bool)
return 0;
}
+static TP_pool *pool;
+
+static bool tp_init()
+{
+
+#ifdef _WIN32
+ if (threadpool_mode == TP_MODE_WINDOWS)
+ pool= new (std::nothrow) TP_pool_win;
+ else
+ pool= new (std::nothrow) TP_pool_generic;
+#else
+ pool= new (std::nothrow) TP_pool_generic;
+#endif
+ if (!pool)
+ return true;
+ if (pool->init())
+ {
+ delete pool;
+ pool= 0;
+ return true;
+ }
+ return false;
+}
+
+static void tp_add_connection(CONNECT *connect)
+{
+ TP_connection *c= pool->new_connection(connect);
+ DBUG_EXECUTE_IF("simulate_failed_connection_1", delete c ; c= 0;);
+ if (c)
+ pool->add(c);
+ else
+ connect->close_and_delete();
+}
+
+int tp_get_idle_thread_count()
+{
+ return pool? pool->get_idle_thread_count(): 0;
+}
+
+int tp_get_thread_count()
+{
+ return pool ? pool->get_thread_count() : 0;
+}
+
+void tp_set_min_threads(uint val)
+{
+ if (pool)
+ pool->set_min_threads(val);
+}
+
+
+void tp_set_max_threads(uint val)
+{
+ if (pool)
+ pool->set_max_threads(val);
+}
+
+void tp_set_threadpool_size(uint val)
+{
+ if (pool)
+ pool->set_pool_size(val);
+}
+
+
+void tp_set_threadpool_stall_limit(uint val)
+{
+ if (pool)
+ pool->set_stall_limit(val);
+}
+
+
+void tp_timeout_handler(TP_connection *c)
+{
+ if (c->state != TP_STATE_IDLE)
+ return;
+ THD *thd=c->thd;
+ mysql_mutex_lock(&thd->LOCK_thd_data);
+ thd->killed= KILL_CONNECTION;
+ c->priority= TP_PRIORITY_HIGH;
+ post_kill_notification(thd);
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
+}
+
+
+static void tp_wait_begin(THD *thd, int type)
+{
+ TP_connection *c = get_TP_connection(thd);
+ if (c)
+ c->wait_begin(type);
+}
+
+
+static void tp_wait_end(THD *thd)
+{
+ TP_connection *c = get_TP_connection(thd);
+ if (c)
+ c->wait_end();
+}
+
+
+static void tp_end()
+{
+ delete pool;
+}
+
+static void tp_post_kill_notification(THD *thd)
+{
+ TP_connection *c= get_TP_connection(thd);
+ if (c)
+ c->priority= TP_PRIORITY_HIGH;
+ post_kill_notification(thd);
+}
+
static scheduler_functions tp_scheduler_functions=
{
0, // max_threads
@@ -296,7 +476,7 @@ static scheduler_functions tp_scheduler_functions=
tp_add_connection, // add_connection
tp_wait_begin, // thd_wait_begin
tp_wait_end, // thd_wait_end
- post_kill_notification, // post_kill_notification
+ tp_post_kill_notification, // post kill notification
tp_end_thread, // Dummy function
tp_end // end
};
diff --git a/sql/threadpool_unix.cc b/sql/threadpool_generic.cc
index 4079091e217..87c74d18aea 100644
--- a/sql/threadpool_unix.cc
+++ b/sql/threadpool_generic.cc
@@ -22,6 +22,17 @@
#ifdef HAVE_POOL_OF_THREADS
+#ifdef _WIN32
+/* AIX may define this, too ?*/
+#define HAVE_IOCP
+#endif
+
+#ifdef HAVE_IOCP
+#define OPTIONAL_IO_POLL_READ_PARAM &overlapped
+#else
+#define OPTIONAL_IO_POLL_READ_PARAM 0
+#endif
+
#include <sql_connect.h>
#include <mysqld.h>
#include <debug_sync.h>
@@ -38,10 +49,23 @@ typedef struct kevent native_event;
#elif defined (__sun)
#include <port.h>
typedef port_event_t native_event;
+#elif defined (HAVE_IOCP)
+typedef OVERLAPPED_ENTRY native_event;
#else
#error threadpool is not available on this platform
#endif
+
+static void io_poll_close(int fd)
+{
+#ifdef _WIN32
+ CloseHandle((HANDLE)fd);
+#else
+ close(fd);
+#endif
+}
+
+
/** Maximum number of native events a listener can read in one go */
#define MAX_EVENTS 1024
@@ -108,32 +132,45 @@ typedef I_P_List<worker_thread_t, I_P_List_adapter<worker_thread_t,
>
worker_list_t;
-struct connection_t
+struct TP_connection_generic:public TP_connection
{
+ TP_connection_generic(CONNECT *c);
+ ~TP_connection_generic();
+
+ virtual int init(){ return 0; };
+ virtual void set_io_timeout(int sec);
+ virtual int start_io();
+ virtual void wait_begin(int type);
+ virtual void wait_end();
- THD *thd;
thread_group_t *thread_group;
- connection_t *next_in_queue;
- connection_t **prev_in_queue;
+ TP_connection_generic *next_in_queue;
+ TP_connection_generic **prev_in_queue;
ulonglong abs_wait_timeout;
- CONNECT* connect;
- bool logged_in;
+ ulonglong dequeue_time;
bool bound_to_poll_descriptor;
- bool waiting;
+ int waiting;
+#ifdef HAVE_IOCP
+ OVERLAPPED overlapped;
+#endif
};
-typedef I_P_List<connection_t,
- I_P_List_adapter<connection_t,
- &connection_t::next_in_queue,
- &connection_t::prev_in_queue>,
+typedef TP_connection_generic TP_connection_generic;
+
+typedef I_P_List<TP_connection_generic,
+ I_P_List_adapter<TP_connection_generic,
+ &TP_connection_generic::next_in_queue,
+ &TP_connection_generic::prev_in_queue>,
I_P_List_null_counter,
- I_P_List_fast_push_back<connection_t> >
+ I_P_List_fast_push_back<TP_connection_generic> >
connection_queue_t;
+const int NQUEUES=2; /* We have high and low priority queues*/
+
struct thread_group_t
{
mysql_mutex_t mutex;
- connection_queue_t queue;
+ connection_queue_t queues[NQUEUES];
worker_list_t waiting_threads;
worker_thread_t *listener;
pthread_attr_t *pthread_attr;
@@ -147,9 +184,8 @@ struct thread_group_t
ulonglong last_thread_creation_time;
int shutdown_pipe[2];
bool shutdown;
- bool stalled;
-
-} MY_ALIGNED(512);
+ bool stalled;
+} MY_ALIGNED(CPU_LEVEL1_DCACHE_LINESIZE);
static thread_group_t *all_groups;
static uint group_count;
@@ -175,15 +211,13 @@ struct pool_timer_t
static pool_timer_t pool_timer;
-static void queue_put(thread_group_t *thread_group, connection_t *connection);
+static void queue_put(thread_group_t *thread_group, TP_connection_generic *connection);
+static void queue_put(thread_group_t *thread_group, native_event *ev, int cnt);
static int wake_thread(thread_group_t *thread_group);
-static void handle_event(connection_t *connection);
static int wake_or_create_thread(thread_group_t *thread_group);
static int create_worker(thread_group_t *thread_group);
static void *worker_main(void *param);
static void check_stall(thread_group_t *thread_group);
-static void connection_abort(connection_t *connection);
-static void set_wait_timeout(connection_t *connection);
static void set_next_timeout_check(ulonglong abstime);
static void print_pool_blocked_message(bool);
@@ -194,12 +228,12 @@ static void print_pool_blocked_message(bool);
This maps to different APIs on different Unixes.
Supported are currently Linux with epoll, Solaris with event ports,
- OSX and BSD with kevent. All those API's are used with one-shot flags
+ OSX and BSD with kevent, Windows with IOCP. All those API's are used with one-shot flags
(the event is signalled once client has written something into the socket,
then socket is removed from the "poll-set" until the command is finished,
and we need to re-arm/re-register socket)
- No implementation for poll/select/AIO is currently provided.
+ No implementation for poll/select is currently provided.
The API closely resembles all of the above mentioned platform APIs
and consists of following functions.
@@ -208,7 +242,7 @@ static void print_pool_blocked_message(bool);
Creates an io_poll descriptor
On Linux: epoll_create()
- - io_poll_associate_fd(int poll_fd, int fd, void *data)
+ - io_poll_associate_fd(int poll_fd, int fd, void *data, void *opt)
Associate file descriptor with io poll descriptor
On Linux : epoll_ctl(..EPOLL_CTL_ADD))
@@ -217,7 +251,7 @@ static void print_pool_blocked_message(bool);
On Linux: epoll_ctl(..EPOLL_CTL_DEL)
- - io_poll_start_read(int poll_fd,int fd, void *data)
+ - io_poll_start_read(int poll_fd,int fd, void *data, void *opt)
The same as io_poll_associate_fd(), but cannot be used before
io_poll_associate_fd() was called.
On Linux : epoll_ctl(..EPOLL_CTL_MOD)
@@ -245,7 +279,7 @@ static int io_poll_create()
}
-int io_poll_associate_fd(int pollfd, int fd, void *data)
+int io_poll_associate_fd(int pollfd, int fd, void *data, void*)
{
struct epoll_event ev;
ev.data.u64= 0; /* Keep valgrind happy */
@@ -256,7 +290,7 @@ int io_poll_associate_fd(int pollfd, int fd, void *data)
-int io_poll_start_read(int pollfd, int fd, void *data)
+int io_poll_start_read(int pollfd, int fd, void *data, void *)
{
struct epoll_event ev;
ev.data.u64= 0; /* Keep valgrind happy */
@@ -315,7 +349,7 @@ int io_poll_create()
return kqueue();
}
-int io_poll_start_read(int pollfd, int fd, void *data)
+int io_poll_start_read(int pollfd, int fd, void *data,void *)
{
struct kevent ke;
MY_EV_SET(&ke, fd, EVFILT_READ, EV_ADD|EV_ONESHOT,
@@ -324,12 +358,12 @@ int io_poll_start_read(int pollfd, int fd, void *data)
}
-int io_poll_associate_fd(int pollfd, int fd, void *data)
+int io_poll_associate_fd(int pollfd, int fd, void *data,void *)
{
struct kevent ke;
MY_EV_SET(&ke, fd, EVFILT_READ, EV_ADD|EV_ONESHOT,
0, 0, data);
- return io_poll_start_read(pollfd,fd, data);
+ return io_poll_start_read(pollfd,fd, data, 0);
}
@@ -371,14 +405,14 @@ static int io_poll_create()
return port_create();
}
-int io_poll_start_read(int pollfd, int fd, void *data)
+int io_poll_start_read(int pollfd, int fd, void *data, void *)
{
return port_associate(pollfd, PORT_SOURCE_FD, fd, POLLIN, data);
}
-static int io_poll_associate_fd(int pollfd, int fd, void *data)
+static int io_poll_associate_fd(int pollfd, int fd, void *data, void *)
{
- return io_poll_start_read(pollfd, fd, data);
+ return io_poll_start_read(pollfd, fd, data, 0);
}
int io_poll_disassociate_fd(int pollfd, int fd)
@@ -410,23 +444,115 @@ static void* native_event_get_userdata(native_event *event)
{
return event->portev_user;
}
+
+#elif defined(HAVE_IOCP)
+
+static int io_poll_create()
+{
+ HANDLE h= CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
+ return (int)h;
+}
+
+
+int io_poll_start_read(int pollfd, int fd, void *, void *opt)
+{
+ DWORD num_bytes = 0;
+ static char c;
+
+ WSABUF buf;
+ buf.buf= &c;
+ buf.len= 0;
+ DWORD flags=0;
+
+ if (WSARecv((SOCKET)fd, &buf, 1, &num_bytes, &flags, (OVERLAPPED *)opt, NULL) == 0)
+ return 0;
+
+ if (GetLastError() == ERROR_IO_PENDING)
+ return 0;
+
+ return 1;
+}
+
+
+static int io_poll_associate_fd(int pollfd, int fd, void *data, void *opt)
+{
+ HANDLE h= CreateIoCompletionPort((HANDLE)fd, (HANDLE)pollfd, (ULONG_PTR)data, 0);
+ if (!h)
+ return -1;
+ return io_poll_start_read(pollfd,fd, 0, opt);
+}
+
+
+int io_poll_disassociate_fd(int pollfd, int fd)
+{
+ /* Not possible to unbind/rebind file descriptor in IOCP. */
+ return 0;
+}
+
+
+int io_poll_wait(int pollfd, native_event *events, int maxevents, int timeout_ms)
+{
+ ULONG n;
+ BOOL ok = GetQueuedCompletionStatusEx((HANDLE)pollfd, events,
+ maxevents, &n, timeout_ms, FALSE);
+
+ return ok ? (int)n : -1;
+}
+
+
+static void* native_event_get_userdata(native_event *event)
+{
+ return (void *)event->lpCompletionKey;
+}
+
#endif
/* Dequeue element from a workqueue */
-static connection_t *queue_get(thread_group_t *thread_group)
+static TP_connection_generic *queue_get(thread_group_t *thread_group)
{
DBUG_ENTER("queue_get");
thread_group->queue_event_count++;
- connection_t *c= thread_group->queue.front();
- if (c)
+ TP_connection_generic *c;
+ for (int i=0; i < NQUEUES;i++)
{
- thread_group->queue.remove(c);
+ c= thread_group->queues[i].pop_front();
+ if (c)
+ DBUG_RETURN(c);
}
- DBUG_RETURN(c);
+ DBUG_RETURN(0);
}
+static bool is_queue_empty(thread_group_t *thread_group)
+{
+ for (int i=0; i < NQUEUES; i++)
+ {
+ if (!thread_group->queues[i].is_empty())
+ return false;
+ }
+ return true;
+}
+
+
+static void queue_init(thread_group_t *thread_group)
+{
+ for (int i=0; i < NQUEUES; i++)
+ {
+ thread_group->queues[i].empty();
+ }
+}
+
+static void queue_put(thread_group_t *thread_group, native_event *ev, int cnt)
+{
+ ulonglong now= pool_timer.current_microtime;
+ for(int i=0; i < cnt; i++)
+ {
+ TP_connection_generic *c = (TP_connection_generic *)native_event_get_userdata(&ev[i]);
+ c->dequeue_time= now;
+ thread_group->queues[c->priority].push_back(c);
+ }
+}
/*
Handle wait timeout :
@@ -450,7 +576,7 @@ static void timeout_check(pool_timer_t *timer)
if (thd->net.reading_or_writing != 1)
continue;
- connection_t *connection= (connection_t *)thd->event_scheduler.data;
+ TP_connection_generic *connection= (TP_connection_generic *)thd->event_scheduler.data;
if (!connection)
{
/*
@@ -462,11 +588,7 @@ static void timeout_check(pool_timer_t *timer)
if(connection->abs_wait_timeout < timer->current_microtime)
{
- /* Wait timeout exceeded, kill connection. */
- mysql_mutex_lock(&thd->LOCK_thd_data);
- thd->killed = KILL_CONNECTION;
- post_kill_notification(thd);
- mysql_mutex_unlock(&thd->LOCK_thd_data);
+ tp_timeout_handler(connection);
}
else
{
@@ -545,10 +667,23 @@ static void* timer_thread(void *param)
void check_stall(thread_group_t *thread_group)
{
- if (mysql_mutex_trylock(&thread_group->mutex) != 0)
+ mysql_mutex_lock(&thread_group->mutex);
+
+ /*
+ Bump priority for the low priority connections that spent too much
+ time in low prio queue.
+ */
+ TP_connection_generic *c;
+ for (;;)
{
- /* Something happens. Don't disturb */
- return;
+ c= thread_group->queues[TP_PRIORITY_LOW].front();
+ if (c && pool_timer.current_microtime - c->dequeue_time > 1000ULL * threadpool_prio_kickup_timer)
+ {
+ thread_group->queues[TP_PRIORITY_LOW].remove(c);
+ thread_group->queues[TP_PRIORITY_HIGH].push_back(c);
+ }
+ else
+ break;
}
/*
@@ -593,7 +728,7 @@ void check_stall(thread_group_t *thread_group)
do wait and indicate that via thd_wait_begin/end callbacks, thread creation
will be faster.
*/
- if (!thread_group->queue.is_empty() && !thread_group->queue_event_count)
+ if (!is_queue_empty(thread_group) && !thread_group->queue_event_count)
{
thread_group->stalled= true;
wake_or_create_thread(thread_group);
@@ -636,11 +771,11 @@ static void stop_timer(pool_timer_t *timer)
@return a ready connection, or NULL on shutdown
*/
-static connection_t * listener(worker_thread_t *current_thread,
+static TP_connection_generic * listener(worker_thread_t *current_thread,
thread_group_t *thread_group)
{
DBUG_ENTER("listener");
- connection_t *retval= NULL;
+ TP_connection_generic *retval= NULL;
for(;;)
{
@@ -707,28 +842,17 @@ static connection_t * listener(worker_thread_t *current_thread,
and wake a worker.
NOTE: Currently nothing is done to detect or prevent long queuing times.
- A solutionc for the future would be to give up "one active thread per
+ A solution for the future would be to give up "one active thread per
group" principle, if events stay in the queue for too long, and just wake
more workers.
*/
- bool listener_picks_event= thread_group->queue.is_empty();
-
- /*
- If listener_picks_event is set, listener thread will handle first event,
- and put the rest into the queue. If listener_pick_event is not set, all
- events go to the queue.
- */
- for(int i=(listener_picks_event)?1:0; i < cnt ; i++)
- {
- connection_t *c= (connection_t *)native_event_get_userdata(&ev[i]);
- thread_group->queue.push_back(c);
- }
-
+ bool listener_picks_event=is_queue_empty(thread_group);
+ queue_put(thread_group, ev, cnt);
if (listener_picks_event)
{
/* Handle the first event. */
- retval= (connection_t *)native_event_get_userdata(&ev[0]);
+ retval= queue_get(thread_group);
mysql_mutex_unlock(&thread_group->mutex);
break;
}
@@ -914,7 +1038,7 @@ int thread_group_init(thread_group_t *thread_group, pthread_attr_t* thread_attr)
thread_group->pollfd= -1;
thread_group->shutdown_pipe[0]= -1;
thread_group->shutdown_pipe[1]= -1;
- thread_group->queue.empty();
+ queue_init(thread_group);
DBUG_RETURN(0);
}
@@ -924,9 +1048,10 @@ void thread_group_destroy(thread_group_t *thread_group)
mysql_mutex_destroy(&thread_group->mutex);
if (thread_group->pollfd != -1)
{
- close(thread_group->pollfd);
+ io_poll_close(thread_group->pollfd);
thread_group->pollfd= -1;
}
+#ifndef HAVE_IOCP
for(int i=0; i < 2; i++)
{
if(thread_group->shutdown_pipe[i] != -1)
@@ -935,6 +1060,8 @@ void thread_group_destroy(thread_group_t *thread_group)
thread_group->shutdown_pipe[i]= -1;
}
}
+#endif
+
if (my_atomic_add32(&shutdown_group_count, -1) == 1)
my_free(all_groups);
}
@@ -957,7 +1084,32 @@ static int wake_thread(thread_group_t *thread_group)
DBUG_RETURN(1); /* no thread in waiter list => missed wakeup */
}
+/*
+ Wake listener thread (during shutdown)
+ Self-pipe trick is used in most cases,except IOCP.
+*/
+static int wake_listener(thread_group_t *thread_group)
+{
+#ifndef HAVE_IOCP
+ if (pipe(thread_group->shutdown_pipe))
+ {
+ return -1;
+ }
+ /* Wake listener */
+ if (io_poll_associate_fd(thread_group->pollfd,
+ thread_group->shutdown_pipe[0], NULL, NULL))
+ {
+ return -1;
+ }
+ char c= 0;
+ if (write(thread_group->shutdown_pipe[1], &c, 1) < 0)
+ return -1;
+#else
+ PostQueuedCompletionStatus((HANDLE)thread_group->pollfd, 0, 0, 0);
+#endif
+ return 0;
+}
/**
Initiate shutdown for thread group.
@@ -981,20 +1133,7 @@ static void thread_group_close(thread_group_t *thread_group)
thread_group->shutdown= true;
thread_group->listener= NULL;
- if (pipe(thread_group->shutdown_pipe))
- {
- DBUG_VOID_RETURN;
- }
-
- /* Wake listener */
- if (io_poll_associate_fd(thread_group->pollfd,
- thread_group->shutdown_pipe[0], NULL))
- {
- DBUG_VOID_RETURN;
- }
- char c= 0;
- if (write(thread_group->shutdown_pipe[1], &c, 1) < 0)
- DBUG_VOID_RETURN;
+ wake_listener(thread_group);
/* Wake all workers. */
while(wake_thread(thread_group) == 0)
@@ -1015,18 +1154,16 @@ static void thread_group_close(thread_group_t *thread_group)
*/
-static void queue_put(thread_group_t *thread_group, connection_t *connection)
+static void queue_put(thread_group_t *thread_group, TP_connection_generic *connection)
{
DBUG_ENTER("queue_put");
- mysql_mutex_lock(&thread_group->mutex);
- thread_group->queue.push_back(connection);
+ connection->dequeue_time= pool_timer.current_microtime;
+ thread_group->queues[connection->priority].push_back(connection);
if (thread_group->active_thread_count == 0)
wake_or_create_thread(thread_group);
- mysql_mutex_unlock(&thread_group->mutex);
-
DBUG_VOID_RETURN;
}
@@ -1061,18 +1198,19 @@ static bool too_many_threads(thread_group_t *thread_group)
NULL is returned if timeout has expired,or on shutdown.
*/
-connection_t *get_event(worker_thread_t *current_thread,
+TP_connection_generic *get_event(worker_thread_t *current_thread,
thread_group_t *thread_group, struct timespec *abstime)
{
DBUG_ENTER("get_event");
- connection_t *connection = NULL;
- int err=0;
+ TP_connection_generic *connection = NULL;
+
mysql_mutex_lock(&thread_group->mutex);
DBUG_ASSERT(thread_group->active_thread_count >= 0);
for(;;)
{
+ int err=0;
bool oversubscribed = too_many_threads(thread_group);
if (thread_group->shutdown)
break;
@@ -1100,22 +1238,27 @@ connection_t *get_event(worker_thread_t *current_thread,
thread_group->listener= NULL;
break;
}
-
+
+
/*
Last thing we try before going to sleep is to
- pick a single event via epoll, without waiting (timeout 0)
+ non-blocking event poll, i.e with timeout = 0.
+ If this returns events, pick one
*/
if (!oversubscribed)
{
- native_event nev;
- if (io_poll_wait(thread_group->pollfd,&nev,1, 0) == 1)
+
+ native_event ev[MAX_EVENTS];
+ int cnt = io_poll_wait(thread_group->pollfd, ev, MAX_EVENTS, 0);
+ if (cnt > 0)
{
- thread_group->io_event_count++;
- connection = (connection_t *)native_event_get_userdata(&nev);
+ queue_put(thread_group, ev, cnt);
+ connection= queue_get(thread_group);
break;
}
}
+
/* And now, finally sleep */
current_thread->woken = false; /* wake() sets this to true */
@@ -1173,9 +1316,9 @@ void wait_begin(thread_group_t *thread_group)
DBUG_ASSERT(thread_group->active_thread_count >=0);
DBUG_ASSERT(thread_group->connection_count > 0);
-
+
if ((thread_group->active_thread_count == 0) &&
- (thread_group->queue.is_empty() || !thread_group->listener))
+ (is_queue_empty(thread_group) || !thread_group->listener))
{
/*
Group might stall while this thread waits, thus wake
@@ -1202,103 +1345,47 @@ void wait_end(thread_group_t *thread_group)
}
-/**
- Allocate/initialize a new connection structure.
-*/
-connection_t *alloc_connection()
+
+TP_connection * TP_pool_generic::new_connection(CONNECT *c)
{
- connection_t* connection;
- DBUG_ENTER("alloc_connection");
- DBUG_EXECUTE_IF("simulate_failed_connection_1", DBUG_RETURN(0); );
-
- if ((connection = (connection_t *)my_malloc(sizeof(connection_t),0)))
- {
- connection->waiting= false;
- connection->logged_in= false;
- connection->bound_to_poll_descriptor= false;
- connection->abs_wait_timeout= ULONGLONG_MAX;
- connection->thd= 0;
- }
- DBUG_RETURN(connection);
+ return new (std::nothrow) TP_connection_generic(c);
}
-
-
/**
Add a new connection to thread pool..
*/
-void tp_add_connection(CONNECT *connect)
+void TP_pool_generic::add(TP_connection *c)
{
- connection_t *connection;
DBUG_ENTER("tp_add_connection");
- connection= alloc_connection();
- if (!connection)
- {
- connect->close_and_delete();
- DBUG_VOID_RETURN;
- }
- connection->connect= connect;
-
- /* Assign connection to a group. */
- thread_group_t *group=
- &all_groups[connect->thread_id%group_count];
-
- connection->thread_group=group;
-
- mysql_mutex_lock(&group->mutex);
- group->connection_count++;
- mysql_mutex_unlock(&group->mutex);
-
+ TP_connection_generic *connection=(TP_connection_generic *)c;
+ thread_group_t *thread_group= connection->thread_group;
/*
Add connection to the work queue.Actual logon
will be done by a worker thread.
*/
- queue_put(group, connection);
+ mysql_mutex_lock(&thread_group->mutex);
+ queue_put(thread_group, connection);
+ mysql_mutex_unlock(&thread_group->mutex);
DBUG_VOID_RETURN;
}
-/**
- Terminate connection.
-*/
-
-static void connection_abort(connection_t *connection)
-{
- DBUG_ENTER("connection_abort");
- thread_group_t *group= connection->thread_group;
-
- if (connection->thd)
- {
- threadpool_remove_connection(connection->thd);
- }
-
- mysql_mutex_lock(&group->mutex);
- group->connection_count--;
- mysql_mutex_unlock(&group->mutex);
-
- my_free(connection);
- DBUG_VOID_RETURN;
-}
-
/**
MySQL scheduler callback: wait begin
*/
-void tp_wait_begin(THD *thd, int type)
+void TP_connection_generic::wait_begin(int type)
{
- DBUG_ENTER("tp_wait_begin");
- DBUG_ASSERT(thd);
- connection_t *connection = (connection_t *)thd->event_scheduler.data;
- if (connection)
- {
- DBUG_ASSERT(!connection->waiting);
- connection->waiting= true;
- wait_begin(connection->thread_group);
- }
+ DBUG_ENTER("wait_begin");
+
+ DBUG_ASSERT(!waiting);
+ waiting++;
+ if (waiting == 1)
+ ::wait_begin(thread_group);
DBUG_VOID_RETURN;
}
@@ -1307,18 +1394,13 @@ void tp_wait_begin(THD *thd, int type)
MySQL scheduler callback: wait end
*/
-void tp_wait_end(THD *thd)
+void TP_connection_generic::wait_end()
{
- DBUG_ENTER("tp_wait_end");
- DBUG_ASSERT(thd);
-
- connection_t *connection = (connection_t *)thd->event_scheduler.data;
- if (connection)
- {
- DBUG_ASSERT(connection->waiting);
- connection->waiting = false;
- wait_end(connection->thread_group);
- }
+ DBUG_ENTER("wait_end");
+ DBUG_ASSERT(waiting);
+ waiting--;
+ if (waiting == 0)
+ ::wait_end(thread_group);
DBUG_VOID_RETURN;
}
@@ -1335,12 +1417,41 @@ static void set_next_timeout_check(ulonglong abstime)
DBUG_VOID_RETURN;
}
+TP_connection_generic::TP_connection_generic(CONNECT *c):
+ TP_connection(c),
+ thread_group(0),
+ next_in_queue(0),
+ prev_in_queue(0),
+ abs_wait_timeout(ULONGLONG_MAX),
+ bound_to_poll_descriptor(false),
+ waiting(false)
+#ifdef HAVE_IOCP
+, overlapped()
+#endif
+{
+ /* Assign connection to a group. */
+ thread_group_t *group=
+ &all_groups[c->thread_id%group_count];
+
+ thread_group=group;
+
+ mysql_mutex_lock(&group->mutex);
+ group->connection_count++;
+ mysql_mutex_unlock(&group->mutex);
+}
+
+TP_connection_generic::~TP_connection_generic()
+{
+ mysql_mutex_lock(&thread_group->mutex);
+ thread_group->connection_count--;
+ mysql_mutex_unlock(&thread_group->mutex);
+}
/**
Set wait timeout for connection.
*/
-static void set_wait_timeout(connection_t *c)
+void TP_connection_generic::set_io_timeout(int timeout_sec)
{
DBUG_ENTER("set_wait_timeout");
/*
@@ -1351,11 +1462,11 @@ static void set_wait_timeout(connection_t *c)
one tick interval.
*/
- c->abs_wait_timeout= pool_timer.current_microtime +
+ abs_wait_timeout= pool_timer.current_microtime +
1000LL*pool_timer.tick_interval +
- 1000000LL*c->thd->variables.net_wait_timeout;
+ 1000000LL*timeout_sec;
- set_next_timeout_check(c->abs_wait_timeout);
+ set_next_timeout_check(abs_wait_timeout);
DBUG_VOID_RETURN;
}
@@ -1367,7 +1478,7 @@ static void set_wait_timeout(connection_t *c)
after thread_pool_size setting.
*/
-static int change_group(connection_t *c,
+static int change_group(TP_connection_generic *c,
thread_group_t *old_group,
thread_group_t *new_group)
{
@@ -1398,10 +1509,11 @@ static int change_group(connection_t *c,
}
-static int start_io(connection_t *connection)
+int TP_connection_generic::start_io()
{
- int fd = mysql_socket_getfd(connection->thd->net.vio->mysql_socket);
+ int fd= mysql_socket_getfd(thd->net.vio->mysql_socket);
+#ifndef HAVE_IOCP
/*
Usually, connection will stay in the same group for the entire
connection's life. However, we do allow group_count to
@@ -1413,56 +1525,25 @@ static int start_io(connection_t *connection)
on thread_id and current group count, and migrate if necessary.
*/
thread_group_t *group =
- &all_groups[connection->thd->thread_id%group_count];
+ &all_groups[thd->thread_id%group_count];
- if (group != connection->thread_group)
+ if (group != thread_group)
{
- if (change_group(connection, connection->thread_group, group))
+ if (change_group(this, thread_group, group))
return -1;
}
-
+#endif
+
/*
Bind to poll descriptor if not yet done.
*/
- if (!connection->bound_to_poll_descriptor)
+ if (!bound_to_poll_descriptor)
{
- connection->bound_to_poll_descriptor= true;
- return io_poll_associate_fd(group->pollfd, fd, connection);
+ bound_to_poll_descriptor= true;
+ return io_poll_associate_fd(thread_group->pollfd, fd, this, OPTIONAL_IO_POLL_READ_PARAM);
}
- return io_poll_start_read(group->pollfd, fd, connection);
-}
-
-
-
-static void handle_event(connection_t *connection)
-{
-
- DBUG_ENTER("handle_event");
- int err;
-
- if (!connection->logged_in)
- {
- connection->thd = threadpool_add_connection(connection->connect, connection);
- err= (connection->thd == NULL);
- connection->logged_in= true;
- }
- else
- {
- err= threadpool_process_request(connection->thd);
- }
-
- if(err)
- goto end;
-
- set_wait_timeout(connection);
- err= start_io(connection);
-
-end:
- if (err)
- connection_abort(connection);
-
- DBUG_VOID_RETURN;
+ return io_poll_start_read(thread_group->pollfd, fd, this, OPTIONAL_IO_POLL_READ_PARAM);
}
@@ -1490,14 +1571,14 @@ static void *worker_main(void *param)
/* Run event loop */
for(;;)
{
- connection_t *connection;
+ TP_connection_generic *connection;
struct timespec ts;
set_timespec(ts,threadpool_idle_timeout);
connection = get_event(&this_thread, thread_group, &ts);
if (!connection)
break;
this_thread.event_count++;
- handle_event(connection);
+ tp_callback(connection);
}
/* Thread shutdown: cleanup per-worker-thread structure. */
@@ -1518,30 +1599,33 @@ static void *worker_main(void *param)
}
-bool tp_init()
+TP_pool_generic::TP_pool_generic()
+{}
+
+int TP_pool_generic::init()
{
- DBUG_ENTER("tp_init");
+ DBUG_ENTER("TP_pool_generic::TP_pool_generic");
threadpool_max_size= MY_MAX(threadpool_size, 128);
all_groups= (thread_group_t *)
my_malloc(sizeof(thread_group_t) * threadpool_max_size, MYF(MY_WME|MY_ZEROFILL));
if (!all_groups)
{
threadpool_max_size= 0;
- DBUG_RETURN(1);
+ sql_print_error("Allocation failed");
+ DBUG_RETURN(-1);
}
- threadpool_started= true;
scheduler_init();
-
+ threadpool_started= true;
for (uint i= 0; i < threadpool_max_size; i++)
{
thread_group_init(&all_groups[i], get_connection_attrib());
}
- tp_set_threadpool_size(threadpool_size);
+ set_pool_size(threadpool_size);
if(group_count == 0)
{
/* Something went wrong */
sql_print_error("Can't set threadpool size to %d",threadpool_size);
- DBUG_RETURN(1);
+ DBUG_RETURN(-1);
}
PSI_register(mutex);
PSI_register(cond);
@@ -1552,7 +1636,7 @@ bool tp_init()
DBUG_RETURN(0);
}
-void tp_end()
+TP_pool_generic::~TP_pool_generic()
{
DBUG_ENTER("tp_end");
@@ -1571,13 +1655,10 @@ void tp_end()
/** Ensure that poll descriptors are created when threadpool_size changes */
-
-void tp_set_threadpool_size(uint size)
+int TP_pool_generic::set_pool_size(uint size)
{
bool success= true;
- if (!threadpool_started)
- return;
-
+
for(uint i=0; i< size; i++)
{
thread_group_t *group= &all_groups[i];
@@ -1596,20 +1677,20 @@ void tp_set_threadpool_size(uint size)
if (!success)
{
group_count= i;
- return;
+ return -1;
}
}
group_count= size;
+ return 0;
}
-void tp_set_threadpool_stall_limit(uint limit)
+int TP_pool_generic::set_stall_limit(uint limit)
{
- if (!threadpool_started)
- return;
mysql_mutex_lock(&(pool_timer.mutex));
pool_timer.tick_interval= limit;
mysql_mutex_unlock(&(pool_timer.mutex));
mysql_cond_signal(&(pool_timer.cond));
+ return 0;
}
@@ -1620,7 +1701,7 @@ void tp_set_threadpool_stall_limit(uint limit)
Don't do any locking, it is not required for stats.
*/
-int tp_get_idle_thread_count()
+int TP_pool_generic::get_idle_thread_count()
{
int sum=0;
for (uint i= 0; i < threadpool_max_size && all_groups[i].pollfd >= 0; i++)
diff --git a/sql/threadpool_win.cc b/sql/threadpool_win.cc
index 9b1d8f6a7d8..dec898d92bb 100644
--- a/sql/threadpool_win.cc
+++ b/sql/threadpool_win.cc
@@ -64,8 +64,9 @@ static void tp_log_warning(const char *msg, const char *fct)
}
-PTP_POOL pool;
-DWORD fls;
+static PTP_POOL pool;
+static TP_CALLBACK_ENVIRON callback_environ;
+static DWORD fls;
static bool skip_completion_port_on_success = false;
@@ -85,13 +86,16 @@ static void CALLBACK timer_callback(PTP_CALLBACK_INSTANCE instance,
static void CALLBACK io_completion_callback(PTP_CALLBACK_INSTANCE instance,
PVOID context, PVOID overlapped, ULONG io_result, ULONG_PTR nbytes, PTP_IO io);
+
+static void CALLBACK work_callback(PTP_CALLBACK_INSTANCE instance, PVOID context, PTP_WORK work);
+
static void CALLBACK shm_read_callback(PTP_CALLBACK_INSTANCE instance,
PVOID Context, PTP_WAIT wait,TP_WAIT_RESULT wait_result);
static void CALLBACK shm_close_callback(PTP_CALLBACK_INSTANCE instance,
PVOID Context, PTP_WAIT wait,TP_WAIT_RESULT wait_result);
-static void check_thread_init();
+static void pre_callback(PVOID context, PTP_CALLBACK_INSTANCE instance);
/* Get current time as Windows time */
static ulonglong now()
@@ -101,74 +105,86 @@ static ulonglong now()
return current_time;
}
-/*
- Connection structure, encapsulates THD + structures for asynchronous
- IO and pool.
-*/
-
-struct connection_t
+struct TP_connection_win:public TP_connection
{
- THD *thd;
+public:
+ TP_connection_win(CONNECT*);
+ ~TP_connection_win();
+ virtual int init();
+ virtual int start_io();
+ virtual void set_io_timeout(int sec);
+ virtual void wait_begin(int type);
+ virtual void wait_end();
+
+ ulonglong timeout;
+ enum_vio_type vio_type;
HANDLE handle;
OVERLAPPED overlapped;
- /* absolute time for wait timeout (as Windows time) */
- volatile ulonglong timeout;
- TP_CALLBACK_ENVIRON callback_environ;
+ PTP_CALLBACK_INSTANCE callback_instance;
PTP_IO io;
PTP_TIMER timer;
PTP_WAIT shm_read;
- /* Callback instance, used to inform treadpool about long callbacks */
- PTP_CALLBACK_INSTANCE callback_instance;
- CONNECT* connect;
- bool logged_in;
+ PTP_WORK work;
+ bool long_callback;
+
};
+struct TP_connection *new_TP_connection(CONNECT *connect)
+{
+ TP_connection *c = new (std::nothrow) TP_connection_win(connect);
+ if (!c || c->init())
+ {
+ delete c;
+ return 0;
+ }
+ return c;
+}
-void init_connection(connection_t *connection, CONNECT *connect)
+void TP_pool_win::add(TP_connection *c)
{
- connection->logged_in = false;
- connection->handle= 0;
- connection->io= 0;
- connection->shm_read= 0;
- connection->timer= 0;
- connection->logged_in = false;
- connection->timeout= ULONGLONG_MAX;
- connection->callback_instance= 0;
- connection->thd= 0;
- memset(&connection->overlapped, 0, sizeof(OVERLAPPED));
- InitializeThreadpoolEnvironment(&connection->callback_environ);
- SetThreadpoolCallbackPool(&connection->callback_environ, pool);
- connection->connect= connect;
+ SubmitThreadpoolWork(((TP_connection_win *)c)->work);
}
-int init_io(connection_t *connection, THD *thd)
+TP_connection_win::TP_connection_win(CONNECT *c) :
+ TP_connection(c),
+ timeout(ULONGLONG_MAX),
+ callback_instance(0),
+ io(0),
+ shm_read(0),
+ timer(0),
+ work(0)
{
- connection->thd= thd;
- Vio *vio = thd->net.vio;
- switch(vio->type)
+}
+
+#define CHECK_ALLOC_ERROR(op) if (!(op)) {tp_log_warning("Allocation failed", #op); DBUG_ASSERT(0); return -1; }
+
+int TP_connection_win::init()
+{
+
+ memset(&overlapped, 0, sizeof(OVERLAPPED));
+ Vio *vio = connect->vio;
+ switch ((vio_type = vio->type))
{
- case VIO_TYPE_SSL:
- case VIO_TYPE_TCPIP:
- connection->handle= (HANDLE)mysql_socket_getfd(connection->thd->net.vio->mysql_socket);
- break;
- case VIO_TYPE_NAMEDPIPE:
- connection->handle= (HANDLE)vio->hPipe;
- break;
- case VIO_TYPE_SHARED_MEMORY:
- connection->shm_read= CreateThreadpoolWait(shm_read_callback, connection,
- &connection->callback_environ);
- if (!connection->shm_read)
- {
- tp_log_warning("Allocation failed", "CreateThreadpoolWait");
- return -1;
- }
- break;
- default:
- abort();
+ case VIO_TYPE_SSL:
+ case VIO_TYPE_TCPIP:
+ handle= (HANDLE)mysql_socket_getfd(vio->mysql_socket);
+ break;
+ case VIO_TYPE_NAMEDPIPE:
+ handle= (HANDLE)vio->hPipe;
+ break;
+ case VIO_TYPE_SHARED_MEMORY:
+ handle= vio->event_server_wrote;
+ break;
+ default:
+ abort();
}
- if (connection->handle)
+ if (vio_type == VIO_TYPE_SHARED_MEMORY)
+ {
+ CHECK_ALLOC_ERROR(shm_read= CreateThreadpoolWait(shm_read_callback, this, &callback_environ));
+ }
+ else
{
/* Performance tweaks (s. MSDN documentation)*/
UCHAR flags= FILE_SKIP_SET_EVENT_ON_HANDLE;
@@ -176,25 +192,13 @@ int init_io(connection_t *connection, THD *thd)
{
flags |= FILE_SKIP_COMPLETION_PORT_ON_SUCCESS;
}
- (void)SetFileCompletionNotificationModes(connection->handle, flags);
-
+ (void)SetFileCompletionNotificationModes(handle, flags);
/* Assign io completion callback */
- connection->io= CreateThreadpoolIo(connection->handle,
- io_completion_callback, connection, &connection->callback_environ);
- if(!connection->io)
- {
- tp_log_warning("Allocation failed", "CreateThreadpoolWait");
- return -1;
- }
- }
- connection->timer= CreateThreadpoolTimer(timer_callback, connection,
- &connection->callback_environ);
- if (!connection->timer)
- {
- tp_log_warning("Allocation failed", "CreateThreadpoolWait");
- return -1;
+ CHECK_ALLOC_ERROR(io= CreateThreadpoolIo(handle, io_completion_callback, this, &callback_environ));
}
+ CHECK_ALLOC_ERROR(timer= CreateThreadpoolTimer(timer_callback, this, &callback_environ));
+ CHECK_ALLOC_ERROR(work= CreateThreadpoolWork(work_callback, this, &callback_environ));
return 0;
}
@@ -202,9 +206,8 @@ int init_io(connection_t *connection, THD *thd)
/*
Start asynchronous read
*/
-int start_io(connection_t *connection, PTP_CALLBACK_INSTANCE instance)
+int TP_connection_win::start_io()
{
- /* Start async read */
DWORD num_bytes = 0;
static char c;
WSABUF buf;
@@ -214,33 +217,20 @@ int start_io(connection_t *connection, PTP_CALLBACK_INSTANCE instance)
DWORD last_error= 0;
int retval;
- Vio *vio= connection->thd->net.vio;
-
- if (vio->type == VIO_TYPE_SHARED_MEMORY)
+ if (shm_read)
{
- SetThreadpoolWait(connection->shm_read, vio->event_server_wrote, NULL);
- return 0;
- }
- if (vio->type == VIO_CLOSED)
- {
- return -1;
+ SetThreadpoolWait(shm_read, handle, NULL);
+ return 0;
}
-
- DBUG_ASSERT(vio->type == VIO_TYPE_TCPIP ||
- vio->type == VIO_TYPE_SSL ||
- vio->type == VIO_TYPE_NAMEDPIPE);
-
- OVERLAPPED *overlapped= &connection->overlapped;
- PTP_IO io= connection->io;
StartThreadpoolIo(io);
- if (vio->type == VIO_TYPE_TCPIP || vio->type == VIO_TYPE_SSL)
+ if (vio_type == VIO_TYPE_TCPIP || vio_type == VIO_TYPE_SSL)
{
/* Start async io (sockets). */
- if (WSARecv(mysql_socket_getfd(vio->mysql_socket) , &buf, 1, &num_bytes, &flags,
- overlapped, NULL) == 0)
+ if (WSARecv((SOCKET)handle , &buf, 1, &num_bytes, &flags,
+ &overlapped, NULL) == 0)
{
- retval= last_error= 0;
+ retval= last_error= 0;
}
else
{
@@ -251,7 +241,7 @@ int start_io(connection_t *connection, PTP_CALLBACK_INSTANCE instance)
else
{
/* Start async io (named pipe) */
- if (ReadFile(vio->hPipe, &c, 0, &num_bytes ,overlapped))
+ if (ReadFile(handle, &c, 0, &num_bytes,&overlapped))
{
retval= last_error= 0;
}
@@ -272,7 +262,7 @@ int start_io(connection_t *connection, PTP_CALLBACK_INSTANCE instance)
if(skip_completion_port_on_success)
{
CancelThreadpoolIo(io);
- io_completion_callback(instance, connection, overlapped, last_error,
+ io_completion_callback(callback_instance, this, &overlapped, last_error,
num_bytes, io);
}
return 0;
@@ -288,81 +278,81 @@ int start_io(connection_t *connection, PTP_CALLBACK_INSTANCE instance)
return -1;
}
-
-int login(connection_t *connection, PTP_CALLBACK_INSTANCE instance)
-{
- if ((connection->thd= threadpool_add_connection(connection->connect, connection))
- && init_io(connection, connection->thd) == 0
- && start_io(connection, instance) == 0)
- {
- return 0;
- }
- return -1;
-}
-
/*
- Recalculate wait timeout, maybe reset timer.
+ Recalculate wait timeout, maybe reset timer.
*/
-void set_wait_timeout(connection_t *connection, ulonglong old_timeout)
+void TP_connection_win::set_io_timeout(int timeout_sec)
{
- ulonglong new_timeout = now() +
- 10000000LL*connection->thd->variables.net_wait_timeout;
+ ulonglong old_timeout= timeout;
+ ulonglong new_timeout = now() + 10000000LL * timeout_sec;
if (new_timeout < old_timeout)
{
- SetThreadpoolTimer(connection->timer, (PFILETIME) &new_timeout, 0, 1000);
+ SetThreadpoolTimer(timer, (PFILETIME)&new_timeout, 0, 1000);
}
- connection->timeout = new_timeout;
+ /* new_timeout > old_timeout case is handled by expiring timer. */
+ timeout = new_timeout;
}
-/* Connection destructor */
-void destroy_connection(connection_t *connection, PTP_CALLBACK_INSTANCE instance)
+TP_connection_win::~TP_connection_win()
{
- if (instance)
- DisassociateCurrentThreadFromCallback(instance);
- if (connection->io)
- {
- WaitForThreadpoolIoCallbacks(connection->io, TRUE);
- CloseThreadpoolIo(connection->io);
- }
+ if (io)
+ CloseThreadpoolIo(io);
- if(connection->shm_read)
- {
- WaitForThreadpoolWaitCallbacks(connection->shm_read, TRUE);
- CloseThreadpoolWait(connection->shm_read);
- }
+ if (shm_read)
+ CloseThreadpoolWait(shm_read);
+
+ if (work)
+ CloseThreadpoolWork(work);
- if(connection->timer)
+ if (timer)
{
- SetThreadpoolTimer(connection->timer, 0, 0, 0);
- WaitForThreadpoolTimerCallbacks(connection->timer, TRUE);
- CloseThreadpoolTimer(connection->timer);
+ WaitForThreadpoolTimerCallbacks(timer, TRUE);
+ CloseThreadpoolTimer(timer);
}
-
- if (connection->thd)
+}
+
+void TP_connection_win::wait_begin(int type)
+{
+
+ /*
+ Signal to the threadpool whenever callback can run long. Currently, binlog
+ waits are a good candidate, its waits are really long
+ */
+ if (type == THD_WAIT_BINLOG)
{
- threadpool_remove_connection(connection->thd);
+ if (!long_callback)
+ {
+ CallbackMayRunLong(callback_instance);
+ long_callback= true;
+ }
}
-
- DestroyThreadpoolEnvironment(&connection->callback_environ);
}
-
+void TP_connection_win::wait_end()
+{
+ /* Do we need to do anything ? */
+}
/*
This function should be called first whenever a callback is invoked in the
threadpool, does my_thread_init() if not yet done
*/
extern ulong thread_created;
-static void check_thread_init()
+static void pre_callback(PVOID context, PTP_CALLBACK_INSTANCE instance)
{
if (FlsGetValue(fls) == NULL)
{
+ /* Running in new worker thread*/
FlsSetValue(fls, (void *)1);
statistic_increment(thread_created, &LOCK_status);
InterlockedIncrement((volatile long *)&tp_stats.num_worker_threads);
+ my_thread_init();
}
+ TP_connection_win *c = (TP_connection_win *)context;
+ c->callback_instance = instance;
+ c->long_callback = false;
}
@@ -375,153 +365,61 @@ static VOID WINAPI thread_destructor(void *data)
if(data)
{
InterlockedDecrement((volatile long *)&tp_stats.num_worker_threads);
+ my_thread_end();
}
}
-/* Scheduler callback : init */
-bool tp_init(void)
-{
- fls= FlsAlloc(thread_destructor);
- pool= CreateThreadpool(NULL);
- if(!pool)
- {
- sql_print_error("Can't create threadpool. "
- "CreateThreadpool() failed with %d. Likely cause is memory pressure",
- GetLastError());
- exit(1);
- }
-
- if (threadpool_max_threads)
- {
- SetThreadpoolThreadMaximum(pool,threadpool_max_threads);
- }
-
- if (threadpool_min_threads)
- {
- if (!SetThreadpoolThreadMinimum(pool, threadpool_min_threads))
- {
- tp_log_warning( "Can't set threadpool minimum threads",
- "SetThreadpoolThreadMinimum");
- }
- }
-
- /*
- Control stack size (OS must be Win7 or later, plus corresponding SDK)
- */
-#if _MSC_VER >=1600
- if (SetThreadpoolStackInformation)
- {
- TP_POOL_STACK_INFORMATION stackinfo;
- stackinfo.StackCommit = 0;
- stackinfo.StackReserve = (SIZE_T)my_thread_stack_size;
- if (!SetThreadpoolStackInformation(pool, &stackinfo))
- {
- tp_log_warning("Can't set threadpool stack size",
- "SetThreadpoolStackInformation");
- }
- }
-#endif
-
- return 0;
-}
-
-/**
- Scheduler callback : Destroy the scheduler.
-*/
-void tp_end(void)
+static inline void tp_callback(PTP_CALLBACK_INSTANCE instance, PVOID context)
{
- if(pool)
- {
- SetThreadpoolThreadMaximum(pool, 0);
- CloseThreadpool(pool);
- }
+ pre_callback(context, instance);
+ tp_callback((TP_connection *)context);
}
+
/*
Handle read completion/notification.
*/
static VOID CALLBACK io_completion_callback(PTP_CALLBACK_INSTANCE instance,
PVOID context, PVOID overlapped, ULONG io_result, ULONG_PTR nbytes, PTP_IO io)
{
- if(instance)
- {
- check_thread_init();
- }
-
- connection_t *connection = (connection_t*)context;
-
- if (io_result != ERROR_SUCCESS)
- goto error;
-
- THD *thd= connection->thd;
- ulonglong old_timeout = connection->timeout;
- connection->timeout = ULONGLONG_MAX;
- connection->callback_instance= instance;
- if (threadpool_process_request(connection->thd))
- goto error;
-
- set_wait_timeout(connection, old_timeout);
- if(start_io(connection, instance))
- goto error;
-
- return;
-
-error:
- /* Some error has occurred. */
-
- destroy_connection(connection, instance);
- free(connection);
+ TP_connection_win *c= (TP_connection_win *)context;
+ /*
+ Execute high priority connections immediately.
+ 'Yield' in case of low priority connections, i.e SubmitThreadpoolWork (with the same callback)
+ which makes Windows threadpool place the items at the end of its internal work queue.
+ */
+ if (c->priority == TP_PRIORITY_HIGH)
+ tp_callback(instance, context);
+ else
+ SubmitThreadpoolWork(c->work);
}
-/* Simple callback for login */
-static void CALLBACK login_callback(PTP_CALLBACK_INSTANCE instance,
- PVOID context, PTP_WORK work)
-{
- if(instance)
- {
- check_thread_init();
- }
-
- connection_t *connection =(connection_t *)context;
- if (login(connection, instance) != 0)
- {
- destroy_connection(connection, instance);
- free(connection);
- }
-}
-
/*
Timer callback.
Invoked when connection times out (wait_timeout)
*/
-static VOID CALLBACK timer_callback(PTP_CALLBACK_INSTANCE instance,
+static VOID CALLBACK timer_callback(PTP_CALLBACK_INSTANCE instance,
PVOID parameter, PTP_TIMER timer)
{
- check_thread_init();
-
- connection_t *con= (connection_t*)parameter;
- ulonglong timeout= con->timeout;
-
- if (timeout <= now())
+ TP_connection_win *c = (TP_connection_win *)parameter;
+ if (c->timeout <= now())
{
- con->thd->killed = KILL_CONNECTION;
- if(con->thd->net.vio)
- vio_shutdown(con->thd->net.vio, SD_BOTH);
+ tp_timeout_handler(c);
}
- else if(timeout != ULONGLONG_MAX)
+ else
{
- /*
- Reset timer.
- There is a tiny possibility of a race condition, since the value of timeout
- could have changed to smaller value in the thread doing io callback.
+ /*
+ Reset timer.
+ There is a tiny possibility of a race condition, since the value of timeout
+ could have changed to smaller value in the thread doing io callback.
- Given the relative unimportance of the wait timeout, we accept race
+ Given the relative unimportance of the wait timeout, we accept race
condition.
- */
- SetThreadpoolTimer(timer, (PFILETIME)&timeout, 0, 1000);
+ */
+ SetThreadpoolTimer(timer, (PFILETIME)&c->timeout, 0, 1000);
}
}
@@ -530,10 +428,11 @@ static VOID CALLBACK timer_callback(PTP_CALLBACK_INSTANCE instance,
Shared memory read callback.
Invoked when read event is set on connection.
*/
+
static void CALLBACK shm_read_callback(PTP_CALLBACK_INSTANCE instance,
PVOID context, PTP_WAIT wait,TP_WAIT_RESULT wait_result)
{
- connection_t *con= (connection_t *)context;
+ TP_connection_win *c= (TP_connection_win *)context;
/* Disarm wait. */
SetThreadpoolWait(wait, NULL, NULL);
@@ -542,97 +441,106 @@ static void CALLBACK shm_read_callback(PTP_CALLBACK_INSTANCE instance,
and the current state is "not set". Thus we need to reset the event again,
or vio_read will hang.
*/
- HANDLE h = con->thd->net.vio->event_server_wrote;
- SetEvent(h);
- io_completion_callback(instance, context, NULL, 0, 0 , 0);
+ SetEvent(c->handle);
+ tp_callback(instance, context);
}
-/*
- Notify the thread pool about a new connection.
-*/
+static void CALLBACK work_callback(PTP_CALLBACK_INSTANCE instance, PVOID context, PTP_WORK work)
+{
+ tp_callback(instance, context);
+}
+
+TP_pool_win::TP_pool_win()
+{}
-void tp_add_connection(CONNECT *connect)
+int TP_pool_win::init()
{
- connection_t *con;
- con= (connection_t *)malloc(sizeof(connection_t));
- DBUG_EXECUTE_IF("simulate_failed_connection_1", free(con);con= 0; );
- if (!con)
+ fls= FlsAlloc(thread_destructor);
+ pool= CreateThreadpool(NULL);
+
+ if (!pool)
{
- tp_log_warning("Allocation failed", "tp_add_connection");
- connect->close_and_delete();
- return;
+ sql_print_error("Can't create threadpool. "
+ "CreateThreadpool() failed with %d. Likely cause is memory pressure",
+ GetLastError());
+ return -1;
}
- init_connection(con, connect);
+ InitializeThreadpoolEnvironment(&callback_environ);
+ SetThreadpoolCallbackPool(&callback_environ, pool);
- /* Try to login asynchronously, using threads in the pool */
- PTP_WORK wrk = CreateThreadpoolWork(login_callback,con, &con->callback_environ);
- if (wrk)
+ if (threadpool_max_threads)
{
- SubmitThreadpoolWork(wrk);
- CloseThreadpoolWork(wrk);
+ SetThreadpoolThreadMaximum(pool, threadpool_max_threads);
}
- else
+
+ if (threadpool_min_threads)
{
- /* Likely memory pressure */
- connect->close_and_delete();
+ if (!SetThreadpoolThreadMinimum(pool, threadpool_min_threads))
+ {
+ tp_log_warning("Can't set threadpool minimum threads",
+ "SetThreadpoolThreadMinimum");
+ }
}
-}
-
-
-/**
- Sets the number of idle threads the thread pool maintains in anticipation of new
- requests.
-*/
-void tp_set_min_threads(uint val)
-{
- if (pool)
- SetThreadpoolThreadMinimum(pool, val);
-}
-
-void tp_set_max_threads(uint val)
-{
- if (pool)
- SetThreadpoolThreadMaximum(pool, val);
-}
-
-void tp_wait_begin(THD *thd, int type)
-{
- DBUG_ASSERT(thd);
/*
- Signal to the threadpool whenever callback can run long. Currently, binlog
- waits are a good candidate, its waits are really long
+ Control stack size (OS must be Win7 or later)
*/
- if (type == THD_WAIT_BINLOG)
+ if (SetThreadpoolStackInformation)
{
- connection_t *connection= (connection_t *)thd->event_scheduler.data;
- if(connection && connection->callback_instance)
+ TP_POOL_STACK_INFORMATION stackinfo;
+ stackinfo.StackCommit = 0;
+ stackinfo.StackReserve = (SIZE_T)my_thread_stack_size;
+ if (!SetThreadpoolStackInformation(pool, &stackinfo))
{
- CallbackMayRunLong(connection->callback_instance);
- /*
- Reset instance, to avoid calling CallbackMayRunLong twice within
- the same callback (it is an error according to docs).
- */
- connection->callback_instance= 0;
+ tp_log_warning("Can't set threadpool stack size",
+ "SetThreadpoolStackInformation");
}
}
+ return 0;
}
-void tp_wait_end(THD *thd)
+
+/**
+ Scheduler callback : Destroy the scheduler.
+*/
+TP_pool_win::~TP_pool_win()
{
- /* Do we need to do anything ? */
+ if (!pool)
+ return;
+ DestroyThreadpoolEnvironment(&callback_environ);
+ SetThreadpoolThreadMaximum(pool, 0);
+ CloseThreadpool(pool);
+ if (!tp_stats.num_worker_threads)
+ FlsFree(fls);
}
-
-
/**
- Number of idle threads in pool.
- This info is not available in Windows implementation,
- thus function always returns 0.
+ Sets the number of idle threads the thread pool maintains in anticipation of new
+ requests.
*/
-int tp_get_idle_thread_count()
+int TP_pool_win::set_min_threads(uint val)
+{
+ SetThreadpoolThreadMinimum(pool, val);
+ return 0;
+}
+
+int TP_pool_win::set_max_threads(uint val)
{
+ SetThreadpoolThreadMaximum(pool, val);
return 0;
}
+
+TP_connection *TP_pool_win::new_connection(CONNECT *connect)
+{
+ TP_connection *c= new (std::nothrow) TP_connection_win(connect);
+ if (!c )
+ return 0;
+ if (c->init())
+ {
+ delete c;
+ return 0;
+ }
+ return c;
+}