summaryrefslogtreecommitdiff
path: root/sql/threadpool_generic.cc
diff options
context:
space:
mode:
authorVladislav Vaintroub <wlad@mariadb.com>2017-11-20 20:39:59 +0000
committerVladislav Vaintroub <wlad@mariadb.com>2017-11-28 13:23:51 +0000
commite02b8608612ad827dd789a6ac8113fee62a9d782 (patch)
tree254646378b93d3e806e2e057a02f3f07d3014ae8 /sql/threadpool_generic.cc
parent77872e4519a9ebba2cd2ab4f5794faccf0a1f879 (diff)
downloadmariadb-git-e02b8608612ad827dd789a6ac8113fee62a9d782.tar.gz
Windows, generic threadpool cleanups
- get rid of casts between int and HANDLE. - store file descriptor in TP_connection_generic to get rid of multiple mysql_socket_getfd(c->thd->net.vio->mysql_socket)) Also support named pipes (no reason not to support, since it is easy)
Diffstat (limited to 'sql/threadpool_generic.cc')
-rw-r--r--sql/threadpool_generic.cc129
1 files changed, 77 insertions, 52 deletions
diff --git a/sql/threadpool_generic.cc b/sql/threadpool_generic.cc
index e90f87fffc4..f6fdd97c6df 100644
--- a/sql/threadpool_generic.cc
+++ b/sql/threadpool_generic.cc
@@ -28,11 +28,19 @@
#endif
#ifdef HAVE_IOCP
-#define OPTIONAL_IO_POLL_READ_PARAM &overlapped
+#define OPTIONAL_IO_POLL_READ_PARAM this
#else
#define OPTIONAL_IO_POLL_READ_PARAM 0
#endif
+#ifdef _WIN32
+typedef HANDLE TP_file_handle;
+#else
+typedef int TP_file_handle;
+#define INVALID_HANDLE_VALUE -1
+#endif
+
+
#include <sql_connect.h>
#include <mysqld.h>
#include <debug_sync.h>
@@ -59,10 +67,10 @@ typedef OVERLAPPED_ENTRY native_event;
#pragma warning (disable : 4312)
#endif
-static void io_poll_close(int fd)
+static void io_poll_close(TP_file_handle fd)
{
#ifdef _WIN32
- CloseHandle((HANDLE)fd);
+ CloseHandle(fd);
#else
close(fd);
#endif
@@ -151,14 +159,17 @@ struct TP_connection_generic:public TP_connection
TP_connection_generic **prev_in_queue;
ulonglong abs_wait_timeout;
ulonglong dequeue_time;
+ TP_file_handle fd;
bool bound_to_poll_descriptor;
int waiting;
#ifdef HAVE_IOCP
OVERLAPPED overlapped;
#endif
+#ifdef _WIN32
+ enum_vio_type vio_type;
+#endif
};
-typedef TP_connection_generic TP_connection_generic;
typedef I_P_List<TP_connection_generic,
I_P_List_adapter<TP_connection_generic,
@@ -177,7 +188,7 @@ struct thread_group_t
worker_list_t waiting_threads;
worker_thread_t *listener;
pthread_attr_t *pthread_attr;
- int pollfd;
+ TP_file_handle pollfd;
int thread_count;
int active_thread_count;
int connection_count;
@@ -245,11 +256,11 @@ static void print_pool_blocked_message(bool);
Creates an io_poll descriptor
On Linux: epoll_create()
- - io_poll_associate_fd(int poll_fd, int fd, void *data, void *opt)
+ - io_poll_associate_fd(int poll_fd, TP_file_handle fd, void *data, void *opt)
Associate file descriptor with io poll descriptor
On Linux : epoll_ctl(..EPOLL_CTL_ADD))
- - io_poll_disassociate_fd(int pollfd, int fd)
+ - io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd)
Associate file descriptor with io poll descriptor
On Linux: epoll_ctl(..EPOLL_CTL_DEL)
@@ -259,7 +270,7 @@ static void print_pool_blocked_message(bool);
io_poll_associate_fd() was called.
On Linux : epoll_ctl(..EPOLL_CTL_MOD)
- - io_poll_wait (int pollfd, native_event *native_events, int maxevents,
+ - io_poll_wait (TP_file_handle pollfd, native_event *native_events, int maxevents,
int timeout_ms)
wait until one or more descriptors added with io_poll_associate_fd()
@@ -276,13 +287,13 @@ static void print_pool_blocked_message(bool);
/* Early 2.6 kernel did not have EPOLLRDHUP */
#define EPOLLRDHUP 0
#endif
-static int io_poll_create()
+static TP_file_handle io_poll_create()
{
return epoll_create(1);
}
-int io_poll_associate_fd(int pollfd, int fd, void *data, void*)
+int io_poll_associate_fd(TP_file_handle pollfd, TP_file_handle fd, void *data, void*)
{
struct epoll_event ev;
ev.data.u64= 0; /* Keep valgrind happy */
@@ -293,7 +304,7 @@ int io_poll_associate_fd(int pollfd, int fd, void *data, void*)
-int io_poll_start_read(int pollfd, int fd, void *data, void *)
+int io_poll_start_read(TP_file_handle pollfd, TP_file_handle fd, void *data, void *)
{
struct epoll_event ev;
ev.data.u64= 0; /* Keep valgrind happy */
@@ -302,7 +313,7 @@ int io_poll_start_read(int pollfd, int fd, void *data, void *)
return epoll_ctl(pollfd, EPOLL_CTL_MOD, fd, &ev);
}
-int io_poll_disassociate_fd(int pollfd, int fd)
+int io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd)
{
struct epoll_event ev;
return epoll_ctl(pollfd, EPOLL_CTL_DEL, fd, &ev);
@@ -314,7 +325,7 @@ int io_poll_disassociate_fd(int pollfd, int fd)
NOTE - in case of EINTR, it restarts with original timeout. Since we use
either infinite or 0 timeouts, this is not critical
*/
-int io_poll_wait(int pollfd, native_event *native_events, int maxevents,
+int io_poll_wait(TP_file_handle pollfd, native_event *native_events, int maxevents,
int timeout_ms)
{
int ret;
@@ -347,12 +358,12 @@ static void *native_event_get_userdata(native_event *event)
#endif
-int io_poll_create()
+TP_file_handle io_poll_create()
{
return kqueue();
}
-int io_poll_start_read(int pollfd, int fd, void *data,void *)
+int io_poll_start_read(TP_file_handle pollfd, TP_file_handle fd, void *data,void *)
{
struct kevent ke;
MY_EV_SET(&ke, fd, EVFILT_READ, EV_ADD|EV_ONESHOT,
@@ -361,7 +372,7 @@ int io_poll_start_read(int pollfd, int fd, void *data,void *)
}
-int io_poll_associate_fd(int pollfd, int fd, void *data,void *)
+int io_poll_associate_fd(TP_file_handle pollfd, TP_file_handle fd, void *data,void *)
{
struct kevent ke;
MY_EV_SET(&ke, fd, EVFILT_READ, EV_ADD|EV_ONESHOT,
@@ -370,7 +381,7 @@ int io_poll_associate_fd(int pollfd, int fd, void *data,void *)
}
-int io_poll_disassociate_fd(int pollfd, int fd)
+int io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd)
{
struct kevent ke;
MY_EV_SET(&ke,fd, EVFILT_READ, EV_DELETE, 0, 0, 0);
@@ -378,7 +389,7 @@ int io_poll_disassociate_fd(int pollfd, int fd)
}
-int io_poll_wait(int pollfd, struct kevent *events, int maxevents, int timeout_ms)
+int io_poll_wait(TP_file_handle pollfd, struct kevent *events, int maxevents, int timeout_ms)
{
struct timespec ts;
int ret;
@@ -403,27 +414,27 @@ static void* native_event_get_userdata(native_event *event)
#elif defined (__sun)
-static int io_poll_create()
+static TP_file_handle io_poll_create()
{
return port_create();
}
-int io_poll_start_read(int pollfd, int fd, void *data, void *)
+int io_poll_start_read(TP_file_handle pollfd, TP_file_handle fd, void *data, void *)
{
return port_associate(pollfd, PORT_SOURCE_FD, fd, POLLIN, data);
}
-static int io_poll_associate_fd(int pollfd, int fd, void *data, void *)
+static int io_poll_associate_fd(TP_file_handle pollfd, TP_file_handle fd, void *data, void *)
{
return io_poll_start_read(pollfd, fd, data, 0);
}
-int io_poll_disassociate_fd(int pollfd, int fd)
+int io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd)
{
return port_dissociate(pollfd, PORT_SOURCE_FD, fd);
}
-int io_poll_wait(int pollfd, native_event *events, int maxevents, int timeout_ms)
+int io_poll_wait(TP_file_handle pollfd, native_event *events, int maxevents, int timeout_ms)
{
struct timespec ts;
int ret;
@@ -451,25 +462,32 @@ static void* native_event_get_userdata(native_event *event)
#elif defined(HAVE_IOCP)
-static int io_poll_create()
+static TP_file_handle io_poll_create()
{
- HANDLE h= CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
- return PtrToInt(h);
+ return CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
}
-int io_poll_start_read(int pollfd, int fd, void *, void *opt)
+int io_poll_start_read(TP_file_handle pollfd, TP_file_handle fd, void *, void *opt)
{
- DWORD num_bytes = 0;
static char c;
+ TP_connection_generic *con= (TP_connection_generic *)opt;
+ OVERLAPPED *overlapped= &con->overlapped;
+ if (con->vio_type == VIO_TYPE_NAMEDPIPE)
+ {
+ if (ReadFile(fd, &c, 0, NULL, overlapped))
+ return 0;
+ }
+ else
+ {
+ WSABUF buf;
+ buf.buf= &c;
+ buf.len= 0;
+ DWORD flags=0;
- WSABUF buf;
- buf.buf= &c;
- buf.len= 0;
- DWORD flags=0;
-
- if (WSARecv((SOCKET)fd, &buf, 1, &num_bytes, &flags, (OVERLAPPED *)opt, NULL) == 0)
- return 0;
+ if (WSARecv((SOCKET)fd, &buf, 1,NULL, &flags,overlapped, NULL) == 0)
+ return 0;
+ }
if (GetLastError() == ERROR_IO_PENDING)
return 0;
@@ -478,26 +496,26 @@ int io_poll_start_read(int pollfd, int fd, void *, void *opt)
}
-static int io_poll_associate_fd(int pollfd, int fd, void *data, void *opt)
+static int io_poll_associate_fd(TP_file_handle pollfd, TP_file_handle fd, void *data, void *opt)
{
- HANDLE h= CreateIoCompletionPort(IntToPtr(fd), IntToPtr(pollfd), (ULONG_PTR)data, 0);
+ HANDLE h= CreateIoCompletionPort(fd, pollfd, (ULONG_PTR)data, 0);
if (!h)
return -1;
return io_poll_start_read(pollfd,fd, 0, opt);
}
-int io_poll_disassociate_fd(int pollfd, int fd)
+int io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd)
{
/* Not possible to unbind/rebind file descriptor in IOCP. */
return 0;
}
-int io_poll_wait(int pollfd, native_event *events, int maxevents, int timeout_ms)
+int io_poll_wait(TP_file_handle pollfd, native_event *events, int maxevents, int timeout_ms)
{
ULONG n;
- BOOL ok = GetQueuedCompletionStatusEx((HANDLE)pollfd, events,
+ BOOL ok = GetQueuedCompletionStatusEx(pollfd, events,
maxevents, &n, timeout_ms, FALSE);
return ok ? (int)n : -1;
@@ -1038,7 +1056,7 @@ int thread_group_init(thread_group_t *thread_group, pthread_attr_t* thread_attr)
DBUG_ENTER("thread_group_init");
thread_group->pthread_attr = thread_attr;
mysql_mutex_init(key_group_mutex, &thread_group->mutex, NULL);
- thread_group->pollfd= -1;
+ thread_group->pollfd= INVALID_HANDLE_VALUE;
thread_group->shutdown_pipe[0]= -1;
thread_group->shutdown_pipe[1]= -1;
queue_init(thread_group);
@@ -1049,10 +1067,10 @@ int thread_group_init(thread_group_t *thread_group, pthread_attr_t* thread_attr)
void thread_group_destroy(thread_group_t *thread_group)
{
mysql_mutex_destroy(&thread_group->mutex);
- if (thread_group->pollfd != -1)
+ if (thread_group->pollfd != INVALID_HANDLE_VALUE)
{
io_poll_close(thread_group->pollfd);
- thread_group->pollfd= -1;
+ thread_group->pollfd= INVALID_HANDLE_VALUE;
}
#ifndef HAVE_IOCP
for(int i=0; i < 2; i++)
@@ -1109,7 +1127,7 @@ static int wake_listener(thread_group_t *thread_group)
if (write(thread_group->shutdown_pipe[1], &c, 1) < 0)
return -1;
#else
- PostQueuedCompletionStatus((HANDLE)thread_group->pollfd, 0, 0, 0);
+ PostQueuedCompletionStatus(thread_group->pollfd, 0, 0, 0);
#endif
return 0;
}
@@ -1432,6 +1450,16 @@ TP_connection_generic::TP_connection_generic(CONNECT *c):
, overlapped()
#endif
{
+ DBUG_ASSERT(c->vio);
+
+#ifdef _WIN32
+ vio_type= c->vio->type;
+ fd= (vio_type == VIO_TYPE_NAMEDPIPE) ?
+ c->vio->hPipe: (TP_file_handle)mysql_socket_getfd(c->vio->mysql_socket);
+#else
+ fd= mysql_socket_getfd(c->vio->mysql_socket);
+#endif
+
/* Assign connection to a group. */
thread_group_t *group=
&all_groups[c->thread_id%group_count];
@@ -1486,7 +1514,6 @@ static int change_group(TP_connection_generic *c,
thread_group_t *new_group)
{
int ret= 0;
- int fd= (int)mysql_socket_getfd(c->thd->net.vio->mysql_socket);
DBUG_ASSERT(c->thread_group == old_group);
@@ -1494,7 +1521,7 @@ static int change_group(TP_connection_generic *c,
mysql_mutex_lock(&old_group->mutex);
if (c->bound_to_poll_descriptor)
{
- io_poll_disassociate_fd(old_group->pollfd,fd);
+ io_poll_disassociate_fd(old_group->pollfd,c->fd);
c->bound_to_poll_descriptor= false;
}
c->thread_group->connection_count--;
@@ -1513,9 +1540,7 @@ static int change_group(TP_connection_generic *c,
int TP_connection_generic::start_io()
-{
- int fd= (int)mysql_socket_getfd(thd->net.vio->mysql_socket);
-
+{
#ifndef HAVE_IOCP
/*
Usually, connection will stay in the same group for the entire
@@ -1666,10 +1691,10 @@ int TP_pool_generic::set_pool_size(uint size)
{
thread_group_t *group= &all_groups[i];
mysql_mutex_lock(&group->mutex);
- if (group->pollfd == -1)
+ if (group->pollfd == INVALID_HANDLE_VALUE)
{
group->pollfd= io_poll_create();
- success= (group->pollfd >= 0);
+ success= (group->pollfd != INVALID_HANDLE_VALUE);
if(!success)
{
sql_print_error("io_poll_create() failed, errno=%d\n", errno);
@@ -1707,7 +1732,7 @@ int TP_pool_generic::set_stall_limit(uint limit)
int TP_pool_generic::get_idle_thread_count()
{
int sum=0;
- for (uint i= 0; i < threadpool_max_size && all_groups[i].pollfd >= 0; i++)
+ for (uint i= 0; i < threadpool_max_size && all_groups[i].pollfd != INVALID_HANDLE_VALUE; i++)
{
sum+= (all_groups[i].thread_count - all_groups[i].active_thread_count);
}