summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/violite.h1
-rw-r--r--sql/CMakeLists.txt2
-rw-r--r--sql/threadpool.h5
-rw-r--r--sql/threadpool_common.cc18
-rw-r--r--sql/threadpool_generic.cc72
-rw-r--r--sql/threadpool_generic.h20
-rw-r--r--sql/threadpool_win.cc163
-rw-r--r--sql/threadpool_winsockets.cc259
-rw-r--r--sql/threadpool_winsockets.h80
9 files changed, 448 insertions, 172 deletions
diff --git a/include/violite.h b/include/violite.h
index 59fac20c376..c7feca683cc 100644
--- a/include/violite.h
+++ b/include/violite.h
@@ -280,6 +280,7 @@ struct st_vio
#ifdef _WIN32
HANDLE hPipe;
OVERLAPPED overlapped;
+ void *tp_ctx; /* threadpool context */
#endif
};
#endif /* vio_violite_h_ */
diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt
index 0dc3caab507..c2bf0c68f2f 100644
--- a/sql/CMakeLists.txt
+++ b/sql/CMakeLists.txt
@@ -171,7 +171,7 @@ IF ((CMAKE_SYSTEM_NAME MATCHES "Linux" OR
AND (NOT DISABLE_THREADPOOL))
ADD_DEFINITIONS(-DHAVE_POOL_OF_THREADS)
IF(WIN32)
- SET(SQL_SOURCE ${SQL_SOURCE} threadpool_win.cc)
+ SET(SQL_SOURCE ${SQL_SOURCE} threadpool_win.cc threadpool_winsockets.cc threadpool_winsockets.h)
ENDIF()
SET(SQL_SOURCE ${SQL_SOURCE} threadpool_generic.cc)
SET(SQL_SOURCE ${SQL_SOURCE} threadpool_common.cc)
diff --git a/sql/threadpool.h b/sql/threadpool.h
index b781e50b992..285b46e3b27 100644
--- a/sql/threadpool.h
+++ b/sql/threadpool.h
@@ -37,6 +37,8 @@ extern uint threadpool_mode; /* Thread pool implementation , windows or generic
#define DEFAULT_THREADPOOL_STALL_LIMIT 500U
struct TP_connection;
+struct st_vio;
+
extern void tp_callback(TP_connection *c);
extern void tp_timeout_handler(TP_connection *c);
@@ -112,7 +114,7 @@ struct TP_connection
virtual void wait_begin(int type)= 0;
virtual void wait_end() = 0;
-
+ IF_WIN(virtual,) void init_vio(st_vio *){};
};
@@ -133,6 +135,7 @@ struct TP_pool
};
#ifdef _WIN32
+
struct TP_pool_win:TP_pool
{
TP_pool_win();
diff --git a/sql/threadpool_common.cc b/sql/threadpool_common.cc
index bd8573951b6..9acd5fed1e7 100644
--- a/sql/threadpool_common.cc
+++ b/sql/threadpool_common.cc
@@ -28,6 +28,10 @@
#include "wsrep_trans_observer.h"
#endif /* WITH_WSREP */
+#ifdef _WIN32
+#include "threadpool_winsockets.h"
+#endif
+
/* Threadpool parameters */
uint threadpool_min_threads;
@@ -48,7 +52,7 @@ TP_STATISTICS tp_stats;
static void threadpool_remove_connection(THD *thd);
static int threadpool_process_request(THD *thd);
-static THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data);
+static THD* threadpool_add_connection(CONNECT *connect, TP_connection *c);
extern bool do_command(THD*);
@@ -220,7 +224,7 @@ error:
}
-static THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data)
+static THD *threadpool_add_connection(CONNECT *connect, TP_connection *c)
{
THD *thd= NULL;
@@ -243,7 +247,7 @@ static THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data)
}
delete connect;
- thd->event_scheduler.data = scheduler_data;
+ thd->event_scheduler.data = c;
server_threads.insert(thd);
thd->set_mysys_var(mysys_var);
@@ -261,6 +265,8 @@ static THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data)
if (thd_prepare_connection(thd))
goto end;
+ c->init_vio(thd->net.vio);
+
/*
Check if THD is ok, as prepare_new_connection_state()
can fail, for example if init command failed.
@@ -397,6 +403,9 @@ static bool tp_init()
pool= 0;
return true;
}
+#ifdef _WIN32
+ init_win_aio_buffers(max_connections);
+#endif
return false;
}
@@ -484,6 +493,9 @@ static void tp_wait_end(THD *thd)
static void tp_end()
{
delete pool;
+#ifdef _WIN32
+ destroy_win_aio_buffers();
+#endif
}
static void tp_post_kill_notification(THD *thd)
diff --git a/sql/threadpool_generic.cc b/sql/threadpool_generic.cc
index d9286602ace..2a5587fa04a 100644
--- a/sql/threadpool_generic.cc
+++ b/sql/threadpool_generic.cc
@@ -29,8 +29,8 @@
#include <sql_plist.h>
#include <threadpool.h>
#include <algorithm>
-
-#ifdef HAVE_IOCP
+#ifdef _WIN32
+#include "threadpool_winsockets.h"
#define OPTIONAL_IO_POLL_READ_PARAM this
#else
#define OPTIONAL_IO_POLL_READ_PARAM 0
@@ -347,7 +347,7 @@ static void* native_event_get_userdata(native_event *event)
return event->portev_user;
}
-#elif defined(HAVE_IOCP)
+#elif defined(_WIN32)
static TP_file_handle io_poll_create()
@@ -358,29 +358,8 @@ static TP_file_handle io_poll_create()
int io_poll_start_read(TP_file_handle pollfd, TP_file_handle fd, void *, void *opt)
{
- static char c;
- TP_connection_generic *con= (TP_connection_generic *)opt;
- OVERLAPPED *overlapped= &con->overlapped;
- if (con->vio_type == VIO_TYPE_NAMEDPIPE)
- {
- if (ReadFile(fd, &c, 0, NULL, overlapped))
- return 0;
- }
- else
- {
- WSABUF buf;
- buf.buf= &c;
- buf.len= 0;
- DWORD flags=0;
-
- if (WSARecv((SOCKET)fd, &buf, 1,NULL, &flags,overlapped, NULL) == 0)
- return 0;
- }
-
- if (GetLastError() == ERROR_IO_PENDING)
- return 0;
-
- return 1;
+ auto c= (TP_connection_generic *) opt;
+ return (int) c->win_sock.begin_read();
}
@@ -429,20 +408,33 @@ int io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd)
}
-int io_poll_wait(TP_file_handle pollfd, native_event *events, int maxevents, int timeout_ms)
+static void *native_event_get_userdata(native_event *event)
{
- ULONG n;
- BOOL ok = GetQueuedCompletionStatusEx(pollfd, events,
- maxevents, &n, timeout_ms, FALSE);
-
- return ok ? (int)n : -1;
+ return (void *) event->lpCompletionKey;
}
-
-static void* native_event_get_userdata(native_event *event)
+int io_poll_wait(TP_file_handle pollfd, native_event *events, int maxevents,
+ int timeout_ms)
{
- return (void *)event->lpCompletionKey;
+ ULONG n;
+ if (!GetQueuedCompletionStatusEx(pollfd, events, maxevents, &n, timeout_ms, FALSE))
+ return -1;
+
+ /* Update win_sock with number of bytes read.*/
+ for (ULONG i= 0; i < n; i++)
+ {
+ auto ev= &events[i];
+ auto c= (TP_connection_generic *) native_event_get_userdata(ev);
+ /* null userdata zero means shutdown (see PostQueuedCompletionStatus() usage*/
+ if (c)
+ {
+ c->win_sock.end_read(ev->dwNumberOfBytesTransferred, 0);
+ }
+ }
+
+ return (int) n;
}
+
#endif
@@ -995,7 +987,7 @@ void thread_group_destroy(thread_group_t *thread_group)
io_poll_close(thread_group->pollfd);
thread_group->pollfd= INVALID_HANDLE_VALUE;
}
-#ifndef HAVE_IOCP
+#ifndef _WIN32
for(int i=0; i < 2; i++)
{
if(thread_group->shutdown_pipe[i] != -1)
@@ -1039,7 +1031,7 @@ static int wake_thread(thread_group_t *thread_group,bool due_to_stall)
*/
static int wake_listener(thread_group_t *thread_group)
{
-#ifndef HAVE_IOCP
+#ifndef _WIN32
if (pipe(thread_group->shutdown_pipe))
{
return -1;
@@ -1385,12 +1377,6 @@ TP_connection_generic::TP_connection_generic(CONNECT *c):
bound_to_poll_descriptor(false),
waiting(false),
fix_group(false)
-#ifdef HAVE_IOCP
-, overlapped()
-#endif
-#ifdef _WIN32
-, vio_type(c->vio_type)
-#endif
{
DBUG_ASSERT(c->vio_type != VIO_CLOSED);
diff --git a/sql/threadpool_generic.h b/sql/threadpool_generic.h
index acf5ec6978b..1d50294a04a 100644
--- a/sql/threadpool_generic.h
+++ b/sql/threadpool_generic.h
@@ -23,6 +23,7 @@
#ifdef _WIN32
#include <windows.h>
+#include "threadpool_winsockets.h"
/* AIX may define this, too ?*/
#define HAVE_IOCP
#endif
@@ -75,11 +76,11 @@ struct TP_connection_generic :public TP_connection
TP_connection_generic(CONNECT* c);
~TP_connection_generic();
- virtual int init() { return 0; };
- virtual void set_io_timeout(int sec);
- virtual int start_io();
- virtual void wait_begin(int type);
- virtual void wait_end();
+ int init() override { return 0; }
+ void set_io_timeout(int sec) override;
+ int start_io() override;
+ void wait_begin(int type) override;
+ void wait_end() override;
thread_group_t* thread_group;
TP_connection_generic* next_in_queue;
@@ -90,12 +91,13 @@ struct TP_connection_generic :public TP_connection
bool bound_to_poll_descriptor;
int waiting;
bool fix_group;
-#ifdef HAVE_IOCP
- OVERLAPPED overlapped;
-#endif
+
#ifdef _WIN32
- enum_vio_type vio_type;
+ win_aiosocket win_sock{};
+ void init_vio(st_vio *vio) override
+ { win_sock.init(vio);}
#endif
+
};
diff --git a/sql/threadpool_win.cc b/sql/threadpool_win.cc
index 6003b06bc7b..515bf0e02bc 100644
--- a/sql/threadpool_win.cc
+++ b/sql/threadpool_win.cc
@@ -31,6 +31,8 @@
#include <threadpool.h>
#include <windows.h>
+#include "threadpool_winsockets.h"
+
/* Log a warning */
static void tp_log_warning(const char *msg, const char *fct)
{
@@ -43,8 +45,6 @@ static PTP_POOL pool;
static TP_CALLBACK_ENVIRON callback_environ;
static DWORD fls;
-static bool skip_completion_port_on_success = false;
-
PTP_CALLBACK_ENVIRON get_threadpool_win_callback_environ()
{
return pool? &callback_environ: 0;
@@ -83,22 +83,21 @@ struct TP_connection_win:public TP_connection
public:
TP_connection_win(CONNECT*);
~TP_connection_win();
- virtual int init();
- virtual int start_io();
- virtual void set_io_timeout(int sec);
- virtual void wait_begin(int type);
- virtual void wait_end();
-
- ulonglong timeout;
- enum_vio_type vio_type;
- HANDLE handle;
- OVERLAPPED overlapped;
- PTP_CALLBACK_INSTANCE callback_instance;
- PTP_IO io;
- PTP_TIMER timer;
- PTP_WORK work;
- bool long_callback;
-
+ int init() override;
+ void init_vio(st_vio *vio) override;
+ int start_io() override;
+ void set_io_timeout(int sec) override;
+ void wait_begin(int type) override;
+ void wait_end() override;
+
+ ulonglong timeout=ULLONG_MAX;
+ OVERLAPPED overlapped{};
+ PTP_CALLBACK_INSTANCE callback_instance{};
+ PTP_IO io{};
+ PTP_TIMER timer{};
+ PTP_WORK work{};
+ bool long_callback{};
+ win_aiosocket sock;
};
struct TP_connection *new_TP_connection(CONNECT *connect)
@@ -125,120 +124,50 @@ void TP_pool_win::add(TP_connection *c)
}
}
+#define CHECK_ALLOC_ERROR(op) \
+ do \
+ { \
+ if (!(op)) \
+ { \
+ tp_log_warning("Allocation failed", #op); \
+ } \
+ } while (0)
TP_connection_win::TP_connection_win(CONNECT *c) :
- TP_connection(c),
- timeout(ULONGLONG_MAX),
- callback_instance(0),
- io(0),
- timer(0),
- work(0)
+ TP_connection(c)
{
-}
+ /* Assign io completion callback */
+ HANDLE h= c->vio_type == VIO_TYPE_NAMEDPIPE ? c->pipe
+ : (HANDLE)mysql_socket_getfd(c->sock);
-#define CHECK_ALLOC_ERROR(op) if (!(op)) {tp_log_warning("Allocation failed", #op); DBUG_ASSERT(0); return -1; }
+ CHECK_ALLOC_ERROR(io=CreateThreadpoolIo(h, io_completion_callback, this, &callback_environ));
+ CHECK_ALLOC_ERROR(timer= CreateThreadpoolTimer(timer_callback, this, &callback_environ));
+ CHECK_ALLOC_ERROR(work= CreateThreadpoolWork(work_callback, this, &callback_environ));
+}
int TP_connection_win::init()
{
-
- memset(&overlapped, 0, sizeof(OVERLAPPED));
- switch ((vio_type = connect->vio_type))
- {
- case VIO_TYPE_SSL:
- case VIO_TYPE_TCPIP:
- handle= (HANDLE) mysql_socket_getfd(connect->sock);
- break;
- case VIO_TYPE_NAMEDPIPE:
- handle= connect->pipe;
- break;
- default:
- abort();
- }
-
-
- /* Performance tweaks (s. MSDN documentation)*/
- UCHAR flags= FILE_SKIP_SET_EVENT_ON_HANDLE;
- if (skip_completion_port_on_success)
- {
- flags |= FILE_SKIP_COMPLETION_PORT_ON_SUCCESS;
- }
- (void)SetFileCompletionNotificationModes(handle, flags);
- /* Assign io completion callback */
- CHECK_ALLOC_ERROR(io= CreateThreadpoolIo(handle, io_completion_callback, this, &callback_environ));
- CHECK_ALLOC_ERROR(timer= CreateThreadpoolTimer(timer_callback, this, &callback_environ));
- CHECK_ALLOC_ERROR(work= CreateThreadpoolWork(work_callback, this, &callback_environ));
- return 0;
+ return !io || !timer || !work ;
}
+void TP_connection_win::init_vio(st_vio* vio)
+{
+ sock.init(vio);
+}
/*
Start asynchronous read
*/
int TP_connection_win::start_io()
{
- DWORD num_bytes = 0;
- static char c;
- WSABUF buf;
- buf.buf= &c;
- buf.len= 0;
- DWORD flags=0;
- DWORD last_error= 0;
-
- int retval;
StartThreadpoolIo(io);
-
- if (vio_type == VIO_TYPE_TCPIP || vio_type == VIO_TYPE_SSL)
+ if (sock.begin_read())
{
- /* Start async io (sockets). */
- if (WSARecv((SOCKET)handle , &buf, 1, &num_bytes, &flags,
- &overlapped, NULL) == 0)
- {
- retval= last_error= 0;
- }
- else
- {
- retval= -1;
- last_error= WSAGetLastError();
- }
- }
- else
- {
- /* Start async io (named pipe) */
- if (ReadFile(handle, &c, 0, &num_bytes,&overlapped))
- {
- retval= last_error= 0;
- }
- else
- {
- retval= -1;
- last_error= GetLastError();
- }
- }
-
- if (retval == 0 || last_error == ERROR_MORE_DATA)
- {
- /*
- IO successfully finished (synchronously).
- If skip_completion_port_on_success is set, we need to handle it right
- here, because completion callback would not be executed by the pool.
- */
- if (skip_completion_port_on_success)
- {
- CancelThreadpoolIo(io);
- io_completion_callback(callback_instance, this, &overlapped, last_error,
- num_bytes, io);
- }
- return 0;
- }
-
- if (last_error == ERROR_IO_PENDING)
- {
- return 0;
+ /* Some error occurred */
+ CancelThreadpoolIo(io);
+ return -1;
}
-
- /* Some error occurred */
- CancelThreadpoolIo(io);
- return -1;
+ return 0;
}
/*
@@ -305,7 +234,7 @@ void tp_win_callback_prolog()
{
/* Running in new worker thread*/
FlsSetValue(fls, (void *)1);
- statistic_increment(thread_created, &LOCK_status);
+ thread_created++;
tp_stats.num_worker_threads++;
my_thread_init();
}
@@ -350,6 +279,10 @@ static VOID CALLBACK io_completion_callback(PTP_CALLBACK_INSTANCE instance,
PVOID context, PVOID overlapped, ULONG io_result, ULONG_PTR nbytes, PTP_IO io)
{
TP_connection_win *c= (TP_connection_win *)context;
+
+ /* How many bytes were preread into read buffer */
+ c->sock.end_read((ULONG)nbytes, io_result);
+
/*
Execute high priority connections immediately.
'Yield' in case of low priority connections, i.e SubmitThreadpoolWork (with the same callback)
diff --git a/sql/threadpool_winsockets.cc b/sql/threadpool_winsockets.cc
new file mode 100644
index 00000000000..6b4758a451f
--- /dev/null
+++ b/sql/threadpool_winsockets.cc
@@ -0,0 +1,259 @@
+/* Copyright (C) 2012 Monty Program Ab
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
+ */
+
+#include <winsock2.h>
+#include <my_global.h>
+#include <violite.h>
+#include "threadpool_winsockets.h"
+#include <algorithm>
+#include <vector>
+#include <mutex>
+
+/*
+ A cache for IO buffers for asynchronous socket(or named pipe) reads.
+
+ Considerations on Windows : since Windows locks the AIO buffers in physical memory,
+ it is important that these buffers are compactly allocated.
+ We try to to prevent any kinds of memory fragmentation
+
+ A relatively small region (at most 1MB) is allocated, for equally sized smallish(256 bytes)
+ This allow buffers. The region is pagesize-aligned (via VirtualAlloc allocation)
+
+ We use smallish IO buffers, 256 bytes is probably large enough for most of
+ the queries. Larger buffers could have funny effects(thread hogginng)
+ on threadpool scheduling in case client is using protocol pipelining.
+
+ Also note, that even in an unlikely situation where cache runs out of buffers,
+ this does not lead to errors, zero szed reads will be used in WSARecv then.
+*/
+
+constexpr size_t READ_BUFSIZ= 256;
+class AIO_buffer_cache
+{
+ const size_t ITEM_SIZE= READ_BUFSIZ;
+
+ /** Limit the whole cache to 1MB*/
+ const size_t MAX_SIZE= 1048576;
+
+ /* Allocation base */
+ char *m_base= 0;
+
+ /* "Free list" with LIFO policy */
+ std::vector<char *> m_cache;
+ std::mutex m_mtx;
+ size_t m_elements=0;
+
+public:
+ void set_size(size_t n_items);
+ char *acquire_buffer();
+ void release_buffer(char *v);
+ void clear();
+ ~AIO_buffer_cache();
+};
+
+
+void AIO_buffer_cache::set_size(size_t n_items)
+{
+ DBUG_ASSERT(!m_base);
+ m_elements= std::min(n_items, MAX_SIZE / ITEM_SIZE);
+ auto sz= m_elements * ITEM_SIZE;
+
+ m_base=
+ (char *) VirtualAlloc(0, sz, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
+ if (!m_base)
+ {
+ m_elements= 0;
+ return;
+ }
+
+ /* Try to help memory manager here, by prelocking region in memory*/
+ (void) VirtualLock(m_base, sz);
+
+ m_cache.reserve(m_elements);
+ for (ssize_t i= m_elements - 1; i >= 0 ; i--)
+ m_cache.push_back(m_base + i * ITEM_SIZE);
+}
+
+/*
+ Returns a buffer, or NULL if no free buffers.
+
+ LIFO policy is implemented, so we do not touch too many
+ pages (no std::stack though)
+*/
+char *AIO_buffer_cache::acquire_buffer()
+{
+ std::unique_lock<std::mutex> lk(m_mtx);
+ if (m_cache.empty())
+ return nullptr;
+ auto p= m_cache.back();
+ m_cache.pop_back();
+ return p;
+}
+
+void AIO_buffer_cache::release_buffer(char *v)
+{
+ std::unique_lock<std::mutex> lk(m_mtx);
+ m_cache.push_back(v);
+}
+
+void AIO_buffer_cache::clear()
+{
+ if (!m_base)
+ return;
+
+ /* Check that all items are returned to the cache. */
+ DBUG_ASSERT(m_cache.size() == m_elements);
+ VirtualFree(m_base, 0, MEM_RELEASE);
+ m_cache.clear();
+ m_base= 0;
+ m_elements= 0;
+}
+
+AIO_buffer_cache::~AIO_buffer_cache() { clear(); }
+
+/* Global variable for the cache buffers.*/
+AIO_buffer_cache read_buffers;
+
+win_aiosocket::~win_aiosocket()
+{
+ if (m_buf_ptr)
+ read_buffers.release_buffer(m_buf_ptr);
+}
+
+
+/** Return number of unread bytes.*/
+size_t win_aiosocket::buffer_remaining()
+{
+ return m_buf_datalen - m_buf_off;
+}
+
+static my_bool my_vio_has_data(st_vio *vio)
+{
+ auto sock= (win_aiosocket *) vio->tp_ctx;
+ return sock->buffer_remaining() || sock->m_orig_vio_has_data(vio);
+}
+
+/*
+ (Half-)buffered read.
+
+ The buffer is filled once, by completion of the async IO.
+
+ We do not refill the buffer once it is read off,
+ does not make sense.
+*/
+static size_t my_vio_read(st_vio *vio, uchar *dest, size_t sz)
+{
+ auto sock= (win_aiosocket *) vio->tp_ctx;
+ DBUG_ASSERT(sock);
+
+ auto nbytes= std::min(sock->buffer_remaining(), sz);
+
+ if (nbytes > 0)
+ {
+ /* Copy to output, adjust the offset.*/
+ memcpy(dest, sock->m_buf_ptr + sock->m_buf_off, nbytes);
+ sock->m_buf_off += nbytes;
+ return nbytes;
+ }
+
+ return sock->m_orig_vio_read(vio, dest, sz);
+}
+
+DWORD win_aiosocket::begin_read()
+{
+ DWORD err = ERROR_SUCCESS;
+ static char c;
+ WSABUF buf;
+
+ DBUG_ASSERT(!buffer_remaining());
+
+ /*
+ If there is no internal buffer to store data,
+ we do zero size read, but still need a valid
+ pointer for the buffer parameter.
+ */
+ if (m_buf_ptr)
+ buf= {(ULONG)READ_BUFSIZ, m_buf_ptr};
+ else
+ buf= {0, &c};
+
+
+ if (!m_is_pipe)
+ {
+ /* Do async io (sockets). */
+ DWORD flags= 0;
+ if (WSARecv((SOCKET) m_handle, &buf, 1, 0, &flags, &m_overlapped, NULL))
+ err= WSAGetLastError();
+ }
+ else
+ {
+ /* Do async read (named pipe) */
+ if (ReadFile(m_handle, buf.buf, buf.len, 0, &m_overlapped))
+ err= GetLastError();
+ }
+
+ if (!err || err == ERROR_IO_PENDING)
+ return 0;
+ return err;
+}
+
+void win_aiosocket::end_read(ULONG nbytes, DWORD err)
+{
+ DBUG_ASSERT(!buffer_remaining());
+ DBUG_ASSERT(!nbytes || m_buf_ptr);
+ m_buf_off= 0;
+ m_buf_datalen= nbytes;
+}
+
+void win_aiosocket::init(Vio *vio)
+{
+ m_is_pipe= vio->type == VIO_TYPE_NAMEDPIPE;
+ m_handle=
+ m_is_pipe ? vio->hPipe : (HANDLE) mysql_socket_getfd(vio->mysql_socket);
+
+ SetFileCompletionNotificationModes(m_handle, FILE_SKIP_SET_EVENT_ON_HANDLE);
+ if (vio->type == VIO_TYPE_SSL)
+ {
+ /*
+ TODO : This requires fixing viossl to call our manipulated VIO
+ */
+ return;
+ }
+
+ if (!(m_buf_ptr = read_buffers.acquire_buffer()))
+ {
+ /* Ran out of buffers, that's fine.*/
+ return;
+ }
+
+ vio->tp_ctx= this;
+
+ m_orig_vio_has_data= vio->has_data;
+ vio->has_data= my_vio_has_data;
+
+ m_orig_vio_read= vio->read;
+ vio->read= my_vio_read;
+}
+
+void init_win_aio_buffers(unsigned int n_buffers)
+{
+ read_buffers.set_size(n_buffers);
+}
+
+extern void destroy_win_aio_buffers()
+{
+ read_buffers.clear();
+}
diff --git a/sql/threadpool_winsockets.h b/sql/threadpool_winsockets.h
new file mode 100644
index 00000000000..ca2068b759d
--- /dev/null
+++ b/sql/threadpool_winsockets.h
@@ -0,0 +1,80 @@
+/* Copyright (C) 2020 Monty Program Ab
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
+ */
+#pragma once
+
+#include <WinSock2.h>
+#include <windows.h>
+
+struct st_vio;
+
+struct win_aiosocket
+{
+ /** OVERLAPPED is needed by all Windows AIO*/
+ OVERLAPPED m_overlapped{};
+ /** Handle to pipe, or socket */
+ HANDLE m_handle{};
+ /** Whether the m_handle refers to pipe*/
+ bool m_is_pipe{};
+
+ /* Read buffer handling */
+
+ /** Pointer to buffer of size READ_BUFSIZ. Can be NULL.*/
+ char *m_buf_ptr{};
+ /** Offset to current buffer position*/
+ size_t m_buf_off{};
+ /** Size of valid data in the buffer*/
+ size_t m_buf_datalen{};
+
+ /* Vio handling */
+ /** Pointer to original vio->vio_read/vio->has_data function */
+ size_t (*m_orig_vio_read)(st_vio *, unsigned char *, size_t){};
+ char (*m_orig_vio_has_data)(st_vio *){};
+
+
+
+ /**
+ Begins asynchronnous reading from socket/pipe.
+ On IO completion, pre-read some bytes into internal buffer
+ */
+ DWORD begin_read();
+
+ /**
+ Update number of bytes returned, and IO error status
+
+ Should be called right after IO is completed
+ GetQueuedCompletionStatus() , or threadpool IO completion
+ callback would return nbytes and the error.
+
+ Sets the valid data length in the read buffer.
+ */
+ void end_read(ULONG nbytes, DWORD err);
+
+ /**
+ Override VIO routines with ours, accounting for
+ one-shot buffering.
+ */
+ void init(st_vio *vio);
+
+ /** Return number of unread bytes.*/
+ size_t buffer_remaining();
+
+ /* Frees the read buffer.*/
+ ~win_aiosocket();
+};
+
+/* Functions related to IO buffers caches.*/
+extern void init_win_aio_buffers(unsigned int n_buffers);
+extern void destroy_win_aio_buffers();