summaryrefslogtreecommitdiff
path: root/tpool/aio_simulated.cc
diff options
context:
space:
mode:
authorVladislav Vaintroub <wlad@mariadb.com>2019-10-29 18:17:24 +0100
committerVladislav Vaintroub <wlad@mariadb.com>2019-11-15 16:50:22 +0100
commit00ee8d85c925846acc76df2a6fc7c67a062c2ea6 (patch)
tree3b8c71b64f089d3a625eeda5c3c188098d50b07b /tpool/aio_simulated.cc
parent7e08dd85d6271be4750c5989ccd5053df281d2aa (diff)
downloadmariadb-git-00ee8d85c925846acc76df2a6fc7c67a062c2ea6.tar.gz
MDEV-16264: Add threadpool library
The library is capable of - asynchronous execution of tasks (and optionally waiting for them) - asynchronous file IO This is implemented using libaio on Linux and completion ports on Windows. Elsewhere, async io is "simulated", which means worker threads are performing synchronous IO. - timers, scheduling work asynchronously in some point of the future. Also periodic timers are implemented.
Diffstat (limited to 'tpool/aio_simulated.cc')
-rw-r--r--tpool/aio_simulated.cc180
1 files changed, 180 insertions, 0 deletions
diff --git a/tpool/aio_simulated.cc b/tpool/aio_simulated.cc
new file mode 100644
index 00000000000..41c4897336d
--- /dev/null
+++ b/tpool/aio_simulated.cc
@@ -0,0 +1,180 @@
+/* Copyright(C) 2019 MariaDB Corporation.
+
+This program is free software; you can redistribute itand /or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
+
+#ifndef _WIN32
+#include <unistd.h> /* pread(), pwrite() */
+#endif
+#include "tpool.h"
+#include "tpool_structs.h"
+#include <stdlib.h>
+#include <string.h>
+
+namespace tpool
+{
+#ifdef _WIN32
+
+/*
+ In order to be able to execute synchronous IO even on file opened
+ with FILE_FLAG_OVERLAPPED, and to bypass to completion port,
+ we use valid event handle for the hEvent member of the OVERLAPPED structure,
+ with its low-order bit set.
+
+ īSee MSDN docs for GetQueuedCompletionStatus() for description of this trick.
+*/
+static DWORD fls_sync_io= FLS_OUT_OF_INDEXES;
+HANDLE win_get_syncio_event()
+{
+ HANDLE h;
+
+ h= (HANDLE) FlsGetValue(fls_sync_io);
+ if (h)
+ {
+ return h;
+ }
+ h= CreateEventA(NULL, FALSE, FALSE, NULL);
+ /* Set low-order bit to keeps I/O completion from being queued */
+ h= (HANDLE)((uintptr_t) h | 1);
+ FlsSetValue(fls_sync_io, h);
+ return h;
+}
+#include <WinIoCtl.h>
+static void __stdcall win_free_syncio_event(void *data)
+{
+ if (data)
+ {
+ CloseHandle((HANDLE) data);
+ }
+}
+
+struct WinIoInit
+{
+ WinIoInit()
+ {
+ fls_sync_io= FlsAlloc(win_free_syncio_event);
+ if(fls_sync_io == FLS_OUT_OF_INDEXES)
+ abort();
+ }
+ ~WinIoInit() { FlsFree(fls_sync_io); }
+};
+
+static WinIoInit win_io_init;
+
+
+int pread(const native_file_handle &h, void *buf, size_t count,
+ unsigned long long offset)
+{
+ OVERLAPPED ov{};
+ ULARGE_INTEGER uli;
+ uli.QuadPart= offset;
+ ov.Offset= uli.LowPart;
+ ov.OffsetHigh= uli.HighPart;
+ ov.hEvent= win_get_syncio_event();
+
+ if (ReadFile(h, buf, (DWORD) count, 0, &ov) ||
+ (GetLastError() == ERROR_IO_PENDING))
+ {
+ DWORD n_bytes;
+ if (GetOverlappedResult(h, &ov, &n_bytes, TRUE))
+ return n_bytes;
+ }
+
+ return -1;
+}
+
+int pwrite(const native_file_handle &h, void *buf, size_t count,
+ unsigned long long offset)
+{
+ OVERLAPPED ov{};
+ ULARGE_INTEGER uli;
+ uli.QuadPart= offset;
+ ov.Offset= uli.LowPart;
+ ov.OffsetHigh= uli.HighPart;
+ ov.hEvent= win_get_syncio_event();
+
+ if (WriteFile(h, buf, (DWORD) count, 0, &ov) ||
+ (GetLastError() == ERROR_IO_PENDING))
+ {
+ DWORD n_bytes;
+ if (GetOverlappedResult(h, &ov, &n_bytes, TRUE))
+ return n_bytes;
+ }
+ return -1;
+}
+#endif
+
+/**
+ Simulated AIO.
+
+ Executes IO synchronously in worker pool
+ and then calls the completion routine.
+*/
+class simulated_aio : public aio
+{
+ thread_pool *m_pool;
+
+public:
+ simulated_aio(thread_pool *tp)
+ : m_pool(tp)
+ {
+ }
+
+ static void simulated_aio_callback(void *param)
+ {
+ aiocb *cb= (aiocb *) param;
+ int ret_len;
+ int err= 0;
+ switch (cb->m_opcode)
+ {
+ case aio_opcode::AIO_PREAD:
+ ret_len= pread(cb->m_fh, cb->m_buffer, cb->m_len, cb->m_offset);
+ break;
+ case aio_opcode::AIO_PWRITE:
+ ret_len= pwrite(cb->m_fh, cb->m_buffer, cb->m_len, cb->m_offset);
+ break;
+ default:
+ abort();
+ }
+ if (ret_len < 0)
+ {
+#ifdef _WIN32
+ err= GetLastError();
+#else
+ err= errno;
+#endif
+ }
+ cb->m_ret_len = ret_len;
+ cb->m_err = err;
+ cb->m_callback(cb);
+ }
+
+ virtual int submit_io(aiocb *aiocb) override
+ {
+ aiocb->m_internal_task.m_func = simulated_aio_callback;
+ aiocb->m_internal_task.m_arg = aiocb;
+ aiocb->m_internal_task.m_group = aiocb->m_group;
+ m_pool->submit_task(&aiocb->m_internal_task);
+ return 0;
+ }
+
+ virtual int bind(native_file_handle &fd) override { return 0; }
+ virtual int unbind(const native_file_handle &fd) override { return 0; }
+};
+
+aio *create_simulated_aio(thread_pool *tp)
+{
+ return new simulated_aio(tp);
+}
+
+} // namespace tpool \ No newline at end of file