diff options
author | Vladislav Vaintroub <wlad@mariadb.com> | 2019-10-29 18:17:24 +0100 |
---|---|---|
committer | Vladislav Vaintroub <wlad@mariadb.com> | 2019-11-15 16:50:22 +0100 |
commit | 00ee8d85c925846acc76df2a6fc7c67a062c2ea6 (patch) | |
tree | 3b8c71b64f089d3a625eeda5c3c188098d50b07b /tpool/aio_simulated.cc | |
parent | 7e08dd85d6271be4750c5989ccd5053df281d2aa (diff) | |
download | mariadb-git-00ee8d85c925846acc76df2a6fc7c67a062c2ea6.tar.gz |
MDEV-16264: Add threadpool library
The library is capable of
- asynchronous execution of tasks (and optionally waiting for them)
- asynchronous file IO
This is implemented using libaio on Linux and completion ports on
Windows. Elsewhere, async io is "simulated", which means worker threads
are performing synchronous IO.
- timers, scheduling work asynchronously in some point of the future.
Also periodic timers are implemented.
Diffstat (limited to 'tpool/aio_simulated.cc')
-rw-r--r-- | tpool/aio_simulated.cc | 180 |
1 files changed, 180 insertions, 0 deletions
diff --git a/tpool/aio_simulated.cc b/tpool/aio_simulated.cc new file mode 100644 index 00000000000..41c4897336d --- /dev/null +++ b/tpool/aio_simulated.cc @@ -0,0 +1,180 @@ +/* Copyright(C) 2019 MariaDB Corporation. + +This program is free software; you can redistribute itand /or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/ + +#ifndef _WIN32 +#include <unistd.h> /* pread(), pwrite() */ +#endif +#include "tpool.h" +#include "tpool_structs.h" +#include <stdlib.h> +#include <string.h> + +namespace tpool +{ +#ifdef _WIN32 + +/* + In order to be able to execute synchronous IO even on file opened + with FILE_FLAG_OVERLAPPED, and to bypass to completion port, + we use valid event handle for the hEvent member of the OVERLAPPED structure, + with its low-order bit set. + + īSee MSDN docs for GetQueuedCompletionStatus() for description of this trick. +*/ +static DWORD fls_sync_io= FLS_OUT_OF_INDEXES; +HANDLE win_get_syncio_event() +{ + HANDLE h; + + h= (HANDLE) FlsGetValue(fls_sync_io); + if (h) + { + return h; + } + h= CreateEventA(NULL, FALSE, FALSE, NULL); + /* Set low-order bit to keeps I/O completion from being queued */ + h= (HANDLE)((uintptr_t) h | 1); + FlsSetValue(fls_sync_io, h); + return h; +} +#include <WinIoCtl.h> +static void __stdcall win_free_syncio_event(void *data) +{ + if (data) + { + CloseHandle((HANDLE) data); + } +} + +struct WinIoInit +{ + WinIoInit() + { + fls_sync_io= FlsAlloc(win_free_syncio_event); + if(fls_sync_io == FLS_OUT_OF_INDEXES) + abort(); + } + ~WinIoInit() { FlsFree(fls_sync_io); } +}; + +static WinIoInit win_io_init; + + +int pread(const native_file_handle &h, void *buf, size_t count, + unsigned long long offset) +{ + OVERLAPPED ov{}; + ULARGE_INTEGER uli; + uli.QuadPart= offset; + ov.Offset= uli.LowPart; + ov.OffsetHigh= uli.HighPart; + ov.hEvent= win_get_syncio_event(); + + if (ReadFile(h, buf, (DWORD) count, 0, &ov) || + (GetLastError() == ERROR_IO_PENDING)) + { + DWORD n_bytes; + if (GetOverlappedResult(h, &ov, &n_bytes, TRUE)) + return n_bytes; + } + + return -1; +} + +int pwrite(const native_file_handle &h, void *buf, size_t count, + unsigned long long offset) +{ + OVERLAPPED ov{}; + ULARGE_INTEGER uli; + uli.QuadPart= offset; + ov.Offset= uli.LowPart; + ov.OffsetHigh= uli.HighPart; + ov.hEvent= win_get_syncio_event(); + + if (WriteFile(h, buf, (DWORD) count, 0, &ov) || + (GetLastError() == ERROR_IO_PENDING)) + { + DWORD n_bytes; + if (GetOverlappedResult(h, &ov, &n_bytes, TRUE)) + return n_bytes; + } + return -1; +} +#endif + +/** + Simulated AIO. + + Executes IO synchronously in worker pool + and then calls the completion routine. +*/ +class simulated_aio : public aio +{ + thread_pool *m_pool; + +public: + simulated_aio(thread_pool *tp) + : m_pool(tp) + { + } + + static void simulated_aio_callback(void *param) + { + aiocb *cb= (aiocb *) param; + int ret_len; + int err= 0; + switch (cb->m_opcode) + { + case aio_opcode::AIO_PREAD: + ret_len= pread(cb->m_fh, cb->m_buffer, cb->m_len, cb->m_offset); + break; + case aio_opcode::AIO_PWRITE: + ret_len= pwrite(cb->m_fh, cb->m_buffer, cb->m_len, cb->m_offset); + break; + default: + abort(); + } + if (ret_len < 0) + { +#ifdef _WIN32 + err= GetLastError(); +#else + err= errno; +#endif + } + cb->m_ret_len = ret_len; + cb->m_err = err; + cb->m_callback(cb); + } + + virtual int submit_io(aiocb *aiocb) override + { + aiocb->m_internal_task.m_func = simulated_aio_callback; + aiocb->m_internal_task.m_arg = aiocb; + aiocb->m_internal_task.m_group = aiocb->m_group; + m_pool->submit_task(&aiocb->m_internal_task); + return 0; + } + + virtual int bind(native_file_handle &fd) override { return 0; } + virtual int unbind(const native_file_handle &fd) override { return 0; } +}; + +aio *create_simulated_aio(thread_pool *tp) +{ + return new simulated_aio(tp); +} + +} // namespace tpool
\ No newline at end of file |