diff options
author | Vladislav Vaintroub <wlad@mariadb.com> | 2017-11-20 20:39:59 +0000 |
---|---|---|
committer | Vladislav Vaintroub <wlad@mariadb.com> | 2017-11-28 13:23:51 +0000 |
commit | e02b8608612ad827dd789a6ac8113fee62a9d782 (patch) | |
tree | 254646378b93d3e806e2e057a02f3f07d3014ae8 /sql/threadpool_generic.cc | |
parent | 77872e4519a9ebba2cd2ab4f5794faccf0a1f879 (diff) | |
download | mariadb-git-e02b8608612ad827dd789a6ac8113fee62a9d782.tar.gz |
Windows, generic threadpool cleanups
- get rid of casts between int and HANDLE.
- store file descriptor in TP_connection_generic to get
rid of multiple mysql_socket_getfd(c->thd->net.vio->mysql_socket))
Also support named pipes (no reason not to support, since it is easy)
Diffstat (limited to 'sql/threadpool_generic.cc')
-rw-r--r-- | sql/threadpool_generic.cc | 129 |
1 files changed, 77 insertions, 52 deletions
diff --git a/sql/threadpool_generic.cc b/sql/threadpool_generic.cc index e90f87fffc4..f6fdd97c6df 100644 --- a/sql/threadpool_generic.cc +++ b/sql/threadpool_generic.cc @@ -28,11 +28,19 @@ #endif #ifdef HAVE_IOCP -#define OPTIONAL_IO_POLL_READ_PARAM &overlapped +#define OPTIONAL_IO_POLL_READ_PARAM this #else #define OPTIONAL_IO_POLL_READ_PARAM 0 #endif +#ifdef _WIN32 +typedef HANDLE TP_file_handle; +#else +typedef int TP_file_handle; +#define INVALID_HANDLE_VALUE -1 +#endif + + #include <sql_connect.h> #include <mysqld.h> #include <debug_sync.h> @@ -59,10 +67,10 @@ typedef OVERLAPPED_ENTRY native_event; #pragma warning (disable : 4312) #endif -static void io_poll_close(int fd) +static void io_poll_close(TP_file_handle fd) { #ifdef _WIN32 - CloseHandle((HANDLE)fd); + CloseHandle(fd); #else close(fd); #endif @@ -151,14 +159,17 @@ struct TP_connection_generic:public TP_connection TP_connection_generic **prev_in_queue; ulonglong abs_wait_timeout; ulonglong dequeue_time; + TP_file_handle fd; bool bound_to_poll_descriptor; int waiting; #ifdef HAVE_IOCP OVERLAPPED overlapped; #endif +#ifdef _WIN32 + enum_vio_type vio_type; +#endif }; -typedef TP_connection_generic TP_connection_generic; typedef I_P_List<TP_connection_generic, I_P_List_adapter<TP_connection_generic, @@ -177,7 +188,7 @@ struct thread_group_t worker_list_t waiting_threads; worker_thread_t *listener; pthread_attr_t *pthread_attr; - int pollfd; + TP_file_handle pollfd; int thread_count; int active_thread_count; int connection_count; @@ -245,11 +256,11 @@ static void print_pool_blocked_message(bool); Creates an io_poll descriptor On Linux: epoll_create() - - io_poll_associate_fd(int poll_fd, int fd, void *data, void *opt) + - io_poll_associate_fd(int poll_fd, TP_file_handle fd, void *data, void *opt) Associate file descriptor with io poll descriptor On Linux : epoll_ctl(..EPOLL_CTL_ADD)) - - io_poll_disassociate_fd(int pollfd, int fd) + - io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd) Associate file descriptor with io poll descriptor On Linux: epoll_ctl(..EPOLL_CTL_DEL) @@ -259,7 +270,7 @@ static void print_pool_blocked_message(bool); io_poll_associate_fd() was called. On Linux : epoll_ctl(..EPOLL_CTL_MOD) - - io_poll_wait (int pollfd, native_event *native_events, int maxevents, + - io_poll_wait (TP_file_handle pollfd, native_event *native_events, int maxevents, int timeout_ms) wait until one or more descriptors added with io_poll_associate_fd() @@ -276,13 +287,13 @@ static void print_pool_blocked_message(bool); /* Early 2.6 kernel did not have EPOLLRDHUP */ #define EPOLLRDHUP 0 #endif -static int io_poll_create() +static TP_file_handle io_poll_create() { return epoll_create(1); } -int io_poll_associate_fd(int pollfd, int fd, void *data, void*) +int io_poll_associate_fd(TP_file_handle pollfd, TP_file_handle fd, void *data, void*) { struct epoll_event ev; ev.data.u64= 0; /* Keep valgrind happy */ @@ -293,7 +304,7 @@ int io_poll_associate_fd(int pollfd, int fd, void *data, void*) -int io_poll_start_read(int pollfd, int fd, void *data, void *) +int io_poll_start_read(TP_file_handle pollfd, TP_file_handle fd, void *data, void *) { struct epoll_event ev; ev.data.u64= 0; /* Keep valgrind happy */ @@ -302,7 +313,7 @@ int io_poll_start_read(int pollfd, int fd, void *data, void *) return epoll_ctl(pollfd, EPOLL_CTL_MOD, fd, &ev); } -int io_poll_disassociate_fd(int pollfd, int fd) +int io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd) { struct epoll_event ev; return epoll_ctl(pollfd, EPOLL_CTL_DEL, fd, &ev); @@ -314,7 +325,7 @@ int io_poll_disassociate_fd(int pollfd, int fd) NOTE - in case of EINTR, it restarts with original timeout. Since we use either infinite or 0 timeouts, this is not critical */ -int io_poll_wait(int pollfd, native_event *native_events, int maxevents, +int io_poll_wait(TP_file_handle pollfd, native_event *native_events, int maxevents, int timeout_ms) { int ret; @@ -347,12 +358,12 @@ static void *native_event_get_userdata(native_event *event) #endif -int io_poll_create() +TP_file_handle io_poll_create() { return kqueue(); } -int io_poll_start_read(int pollfd, int fd, void *data,void *) +int io_poll_start_read(TP_file_handle pollfd, TP_file_handle fd, void *data,void *) { struct kevent ke; MY_EV_SET(&ke, fd, EVFILT_READ, EV_ADD|EV_ONESHOT, @@ -361,7 +372,7 @@ int io_poll_start_read(int pollfd, int fd, void *data,void *) } -int io_poll_associate_fd(int pollfd, int fd, void *data,void *) +int io_poll_associate_fd(TP_file_handle pollfd, TP_file_handle fd, void *data,void *) { struct kevent ke; MY_EV_SET(&ke, fd, EVFILT_READ, EV_ADD|EV_ONESHOT, @@ -370,7 +381,7 @@ int io_poll_associate_fd(int pollfd, int fd, void *data,void *) } -int io_poll_disassociate_fd(int pollfd, int fd) +int io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd) { struct kevent ke; MY_EV_SET(&ke,fd, EVFILT_READ, EV_DELETE, 0, 0, 0); @@ -378,7 +389,7 @@ int io_poll_disassociate_fd(int pollfd, int fd) } -int io_poll_wait(int pollfd, struct kevent *events, int maxevents, int timeout_ms) +int io_poll_wait(TP_file_handle pollfd, struct kevent *events, int maxevents, int timeout_ms) { struct timespec ts; int ret; @@ -403,27 +414,27 @@ static void* native_event_get_userdata(native_event *event) #elif defined (__sun) -static int io_poll_create() +static TP_file_handle io_poll_create() { return port_create(); } -int io_poll_start_read(int pollfd, int fd, void *data, void *) +int io_poll_start_read(TP_file_handle pollfd, TP_file_handle fd, void *data, void *) { return port_associate(pollfd, PORT_SOURCE_FD, fd, POLLIN, data); } -static int io_poll_associate_fd(int pollfd, int fd, void *data, void *) +static int io_poll_associate_fd(TP_file_handle pollfd, TP_file_handle fd, void *data, void *) { return io_poll_start_read(pollfd, fd, data, 0); } -int io_poll_disassociate_fd(int pollfd, int fd) +int io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd) { return port_dissociate(pollfd, PORT_SOURCE_FD, fd); } -int io_poll_wait(int pollfd, native_event *events, int maxevents, int timeout_ms) +int io_poll_wait(TP_file_handle pollfd, native_event *events, int maxevents, int timeout_ms) { struct timespec ts; int ret; @@ -451,25 +462,32 @@ static void* native_event_get_userdata(native_event *event) #elif defined(HAVE_IOCP) -static int io_poll_create() +static TP_file_handle io_poll_create() { - HANDLE h= CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0); - return PtrToInt(h); + return CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0); } -int io_poll_start_read(int pollfd, int fd, void *, void *opt) +int io_poll_start_read(TP_file_handle pollfd, TP_file_handle fd, void *, void *opt) { - DWORD num_bytes = 0; static char c; + TP_connection_generic *con= (TP_connection_generic *)opt; + OVERLAPPED *overlapped= &con->overlapped; + if (con->vio_type == VIO_TYPE_NAMEDPIPE) + { + if (ReadFile(fd, &c, 0, NULL, overlapped)) + return 0; + } + else + { + WSABUF buf; + buf.buf= &c; + buf.len= 0; + DWORD flags=0; - WSABUF buf; - buf.buf= &c; - buf.len= 0; - DWORD flags=0; - - if (WSARecv((SOCKET)fd, &buf, 1, &num_bytes, &flags, (OVERLAPPED *)opt, NULL) == 0) - return 0; + if (WSARecv((SOCKET)fd, &buf, 1,NULL, &flags,overlapped, NULL) == 0) + return 0; + } if (GetLastError() == ERROR_IO_PENDING) return 0; @@ -478,26 +496,26 @@ int io_poll_start_read(int pollfd, int fd, void *, void *opt) } -static int io_poll_associate_fd(int pollfd, int fd, void *data, void *opt) +static int io_poll_associate_fd(TP_file_handle pollfd, TP_file_handle fd, void *data, void *opt) { - HANDLE h= CreateIoCompletionPort(IntToPtr(fd), IntToPtr(pollfd), (ULONG_PTR)data, 0); + HANDLE h= CreateIoCompletionPort(fd, pollfd, (ULONG_PTR)data, 0); if (!h) return -1; return io_poll_start_read(pollfd,fd, 0, opt); } -int io_poll_disassociate_fd(int pollfd, int fd) +int io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd) { /* Not possible to unbind/rebind file descriptor in IOCP. */ return 0; } -int io_poll_wait(int pollfd, native_event *events, int maxevents, int timeout_ms) +int io_poll_wait(TP_file_handle pollfd, native_event *events, int maxevents, int timeout_ms) { ULONG n; - BOOL ok = GetQueuedCompletionStatusEx((HANDLE)pollfd, events, + BOOL ok = GetQueuedCompletionStatusEx(pollfd, events, maxevents, &n, timeout_ms, FALSE); return ok ? (int)n : -1; @@ -1038,7 +1056,7 @@ int thread_group_init(thread_group_t *thread_group, pthread_attr_t* thread_attr) DBUG_ENTER("thread_group_init"); thread_group->pthread_attr = thread_attr; mysql_mutex_init(key_group_mutex, &thread_group->mutex, NULL); - thread_group->pollfd= -1; + thread_group->pollfd= INVALID_HANDLE_VALUE; thread_group->shutdown_pipe[0]= -1; thread_group->shutdown_pipe[1]= -1; queue_init(thread_group); @@ -1049,10 +1067,10 @@ int thread_group_init(thread_group_t *thread_group, pthread_attr_t* thread_attr) void thread_group_destroy(thread_group_t *thread_group) { mysql_mutex_destroy(&thread_group->mutex); - if (thread_group->pollfd != -1) + if (thread_group->pollfd != INVALID_HANDLE_VALUE) { io_poll_close(thread_group->pollfd); - thread_group->pollfd= -1; + thread_group->pollfd= INVALID_HANDLE_VALUE; } #ifndef HAVE_IOCP for(int i=0; i < 2; i++) @@ -1109,7 +1127,7 @@ static int wake_listener(thread_group_t *thread_group) if (write(thread_group->shutdown_pipe[1], &c, 1) < 0) return -1; #else - PostQueuedCompletionStatus((HANDLE)thread_group->pollfd, 0, 0, 0); + PostQueuedCompletionStatus(thread_group->pollfd, 0, 0, 0); #endif return 0; } @@ -1432,6 +1450,16 @@ TP_connection_generic::TP_connection_generic(CONNECT *c): , overlapped() #endif { + DBUG_ASSERT(c->vio); + +#ifdef _WIN32 + vio_type= c->vio->type; + fd= (vio_type == VIO_TYPE_NAMEDPIPE) ? + c->vio->hPipe: (TP_file_handle)mysql_socket_getfd(c->vio->mysql_socket); +#else + fd= mysql_socket_getfd(c->vio->mysql_socket); +#endif + /* Assign connection to a group. */ thread_group_t *group= &all_groups[c->thread_id%group_count]; @@ -1486,7 +1514,6 @@ static int change_group(TP_connection_generic *c, thread_group_t *new_group) { int ret= 0; - int fd= (int)mysql_socket_getfd(c->thd->net.vio->mysql_socket); DBUG_ASSERT(c->thread_group == old_group); @@ -1494,7 +1521,7 @@ static int change_group(TP_connection_generic *c, mysql_mutex_lock(&old_group->mutex); if (c->bound_to_poll_descriptor) { - io_poll_disassociate_fd(old_group->pollfd,fd); + io_poll_disassociate_fd(old_group->pollfd,c->fd); c->bound_to_poll_descriptor= false; } c->thread_group->connection_count--; @@ -1513,9 +1540,7 @@ static int change_group(TP_connection_generic *c, int TP_connection_generic::start_io() -{ - int fd= (int)mysql_socket_getfd(thd->net.vio->mysql_socket); - +{ #ifndef HAVE_IOCP /* Usually, connection will stay in the same group for the entire @@ -1666,10 +1691,10 @@ int TP_pool_generic::set_pool_size(uint size) { thread_group_t *group= &all_groups[i]; mysql_mutex_lock(&group->mutex); - if (group->pollfd == -1) + if (group->pollfd == INVALID_HANDLE_VALUE) { group->pollfd= io_poll_create(); - success= (group->pollfd >= 0); + success= (group->pollfd != INVALID_HANDLE_VALUE); if(!success) { sql_print_error("io_poll_create() failed, errno=%d\n", errno); @@ -1707,7 +1732,7 @@ int TP_pool_generic::set_stall_limit(uint limit) int TP_pool_generic::get_idle_thread_count() { int sum=0; - for (uint i= 0; i < threadpool_max_size && all_groups[i].pollfd >= 0; i++) + for (uint i= 0; i < threadpool_max_size && all_groups[i].pollfd != INVALID_HANDLE_VALUE; i++) { sum+= (all_groups[i].thread_count - all_groups[i].active_thread_count); } |