summaryrefslogtreecommitdiff
path: root/sql/threadpool_winsockets.cc
diff options
context:
space:
mode:
authorVladislav Vaintroub <wlad@mariadb.com>2020-06-26 14:43:56 +0200
committerVladislav Vaintroub <wlad@mariadb.com>2020-06-26 14:44:36 +0200
commitd15c839c0dab4016eb425fdb109a5dd8ecd918a4 (patch)
tree0a7c99f2ccc4f15d2da1dbb00caad4063ad40e2d /sql/threadpool_winsockets.cc
parentb3acad4a488c51d72ac63c8a686e496975b10479 (diff)
downloadmariadb-git-d15c839c0dab4016eb425fdb109a5dd8ecd918a4.tar.gz
MDEV-22990 Threadpool : Optimize network/named pipe IO for Windows
This patch reduces the overhead of system calls prior to a query, for threadpool. Previously, 3 system calls were done 1. WSARecv() to get notification of input data from client, asynchronous equivalent of select() in one-thread-per-connection 2. recv(4 bytes) - reading packet header length 3. recv(packet payload) Now there will be usually, just WSARecv(), which pre-reads user data into a buffer, so we spared 2 syscalls Profiler shows the most expensive call WSARecv(16%CPU) becomes 4% CPU, after the patch, benchmark results (network heavy ones like point-select) improve by ~20% The buffer management was rather carefully done to keep buffers together, as Windows would keeps the pages pinned in memory for the duration of async calls. At most 1MB memory is used for the buffers, and overhead per-connection is only 256 bytes, which should cover most of the uses. SSL does not yet use the optmization, so far it does not properly use VIO for reads and writes. Neither one-thread-per-connection would get any benefit, but that should be fine, it is not even default on Windows.
Diffstat (limited to 'sql/threadpool_winsockets.cc')
-rw-r--r--sql/threadpool_winsockets.cc259
1 files changed, 259 insertions, 0 deletions
diff --git a/sql/threadpool_winsockets.cc b/sql/threadpool_winsockets.cc
new file mode 100644
index 00000000000..6b4758a451f
--- /dev/null
+++ b/sql/threadpool_winsockets.cc
@@ -0,0 +1,259 @@
+/* Copyright (C) 2012 Monty Program Ab
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
+ */
+
+#include <winsock2.h>
+#include <my_global.h>
+#include <violite.h>
+#include "threadpool_winsockets.h"
+#include <algorithm>
+#include <vector>
+#include <mutex>
+
+/*
+ A cache for IO buffers for asynchronous socket(or named pipe) reads.
+
+ Considerations on Windows : since Windows locks the AIO buffers in physical memory,
+ it is important that these buffers are compactly allocated.
+ We try to to prevent any kinds of memory fragmentation
+
+ A relatively small region (at most 1MB) is allocated, for equally sized smallish(256 bytes)
+ This allow buffers. The region is pagesize-aligned (via VirtualAlloc allocation)
+
+ We use smallish IO buffers, 256 bytes is probably large enough for most of
+ the queries. Larger buffers could have funny effects(thread hogginng)
+ on threadpool scheduling in case client is using protocol pipelining.
+
+ Also note, that even in an unlikely situation where cache runs out of buffers,
+ this does not lead to errors, zero szed reads will be used in WSARecv then.
+*/
+
+constexpr size_t READ_BUFSIZ= 256;
+class AIO_buffer_cache
+{
+ const size_t ITEM_SIZE= READ_BUFSIZ;
+
+ /** Limit the whole cache to 1MB*/
+ const size_t MAX_SIZE= 1048576;
+
+ /* Allocation base */
+ char *m_base= 0;
+
+ /* "Free list" with LIFO policy */
+ std::vector<char *> m_cache;
+ std::mutex m_mtx;
+ size_t m_elements=0;
+
+public:
+ void set_size(size_t n_items);
+ char *acquire_buffer();
+ void release_buffer(char *v);
+ void clear();
+ ~AIO_buffer_cache();
+};
+
+
+void AIO_buffer_cache::set_size(size_t n_items)
+{
+ DBUG_ASSERT(!m_base);
+ m_elements= std::min(n_items, MAX_SIZE / ITEM_SIZE);
+ auto sz= m_elements * ITEM_SIZE;
+
+ m_base=
+ (char *) VirtualAlloc(0, sz, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
+ if (!m_base)
+ {
+ m_elements= 0;
+ return;
+ }
+
+ /* Try to help memory manager here, by prelocking region in memory*/
+ (void) VirtualLock(m_base, sz);
+
+ m_cache.reserve(m_elements);
+ for (ssize_t i= m_elements - 1; i >= 0 ; i--)
+ m_cache.push_back(m_base + i * ITEM_SIZE);
+}
+
+/*
+ Returns a buffer, or NULL if no free buffers.
+
+ LIFO policy is implemented, so we do not touch too many
+ pages (no std::stack though)
+*/
+char *AIO_buffer_cache::acquire_buffer()
+{
+ std::unique_lock<std::mutex> lk(m_mtx);
+ if (m_cache.empty())
+ return nullptr;
+ auto p= m_cache.back();
+ m_cache.pop_back();
+ return p;
+}
+
+void AIO_buffer_cache::release_buffer(char *v)
+{
+ std::unique_lock<std::mutex> lk(m_mtx);
+ m_cache.push_back(v);
+}
+
+void AIO_buffer_cache::clear()
+{
+ if (!m_base)
+ return;
+
+ /* Check that all items are returned to the cache. */
+ DBUG_ASSERT(m_cache.size() == m_elements);
+ VirtualFree(m_base, 0, MEM_RELEASE);
+ m_cache.clear();
+ m_base= 0;
+ m_elements= 0;
+}
+
+AIO_buffer_cache::~AIO_buffer_cache() { clear(); }
+
+/* Global variable for the cache buffers.*/
+AIO_buffer_cache read_buffers;
+
+win_aiosocket::~win_aiosocket()
+{
+ if (m_buf_ptr)
+ read_buffers.release_buffer(m_buf_ptr);
+}
+
+
+/** Return number of unread bytes.*/
+size_t win_aiosocket::buffer_remaining()
+{
+ return m_buf_datalen - m_buf_off;
+}
+
+static my_bool my_vio_has_data(st_vio *vio)
+{
+ auto sock= (win_aiosocket *) vio->tp_ctx;
+ return sock->buffer_remaining() || sock->m_orig_vio_has_data(vio);
+}
+
+/*
+ (Half-)buffered read.
+
+ The buffer is filled once, by completion of the async IO.
+
+ We do not refill the buffer once it is read off,
+ does not make sense.
+*/
+static size_t my_vio_read(st_vio *vio, uchar *dest, size_t sz)
+{
+ auto sock= (win_aiosocket *) vio->tp_ctx;
+ DBUG_ASSERT(sock);
+
+ auto nbytes= std::min(sock->buffer_remaining(), sz);
+
+ if (nbytes > 0)
+ {
+ /* Copy to output, adjust the offset.*/
+ memcpy(dest, sock->m_buf_ptr + sock->m_buf_off, nbytes);
+ sock->m_buf_off += nbytes;
+ return nbytes;
+ }
+
+ return sock->m_orig_vio_read(vio, dest, sz);
+}
+
+DWORD win_aiosocket::begin_read()
+{
+ DWORD err = ERROR_SUCCESS;
+ static char c;
+ WSABUF buf;
+
+ DBUG_ASSERT(!buffer_remaining());
+
+ /*
+ If there is no internal buffer to store data,
+ we do zero size read, but still need a valid
+ pointer for the buffer parameter.
+ */
+ if (m_buf_ptr)
+ buf= {(ULONG)READ_BUFSIZ, m_buf_ptr};
+ else
+ buf= {0, &c};
+
+
+ if (!m_is_pipe)
+ {
+ /* Do async io (sockets). */
+ DWORD flags= 0;
+ if (WSARecv((SOCKET) m_handle, &buf, 1, 0, &flags, &m_overlapped, NULL))
+ err= WSAGetLastError();
+ }
+ else
+ {
+ /* Do async read (named pipe) */
+ if (ReadFile(m_handle, buf.buf, buf.len, 0, &m_overlapped))
+ err= GetLastError();
+ }
+
+ if (!err || err == ERROR_IO_PENDING)
+ return 0;
+ return err;
+}
+
+void win_aiosocket::end_read(ULONG nbytes, DWORD err)
+{
+ DBUG_ASSERT(!buffer_remaining());
+ DBUG_ASSERT(!nbytes || m_buf_ptr);
+ m_buf_off= 0;
+ m_buf_datalen= nbytes;
+}
+
+void win_aiosocket::init(Vio *vio)
+{
+ m_is_pipe= vio->type == VIO_TYPE_NAMEDPIPE;
+ m_handle=
+ m_is_pipe ? vio->hPipe : (HANDLE) mysql_socket_getfd(vio->mysql_socket);
+
+ SetFileCompletionNotificationModes(m_handle, FILE_SKIP_SET_EVENT_ON_HANDLE);
+ if (vio->type == VIO_TYPE_SSL)
+ {
+ /*
+ TODO : This requires fixing viossl to call our manipulated VIO
+ */
+ return;
+ }
+
+ if (!(m_buf_ptr = read_buffers.acquire_buffer()))
+ {
+ /* Ran out of buffers, that's fine.*/
+ return;
+ }
+
+ vio->tp_ctx= this;
+
+ m_orig_vio_has_data= vio->has_data;
+ vio->has_data= my_vio_has_data;
+
+ m_orig_vio_read= vio->read;
+ vio->read= my_vio_read;
+}
+
+void init_win_aio_buffers(unsigned int n_buffers)
+{
+ read_buffers.set_size(n_buffers);
+}
+
+extern void destroy_win_aio_buffers()
+{
+ read_buffers.clear();
+}