diff options
Diffstat (limited to 'tpool')
-rw-r--r-- | tpool/aio_linux.cc | 161 |
1 files changed, 79 insertions, 82 deletions
diff --git a/tpool/aio_linux.cc b/tpool/aio_linux.cc index 24bc04c75ba..51b656a604b 100644 --- a/tpool/aio_linux.cc +++ b/tpool/aio_linux.cc @@ -1,4 +1,4 @@ -/* Copyright(C) 2019 MariaDB Corporation. +/* Copyright (C) 2019, 2020, MariaDB Corporation. This program is free software; you can redistribute itand /or modify it under the terms of the GNU General Public License as published by @@ -14,133 +14,133 @@ along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/ #include "tpool_structs.h" - -#include <stdlib.h> -#include <signal.h> -#include <assert.h> #include "tpool.h" -#include <thread> + #ifdef LINUX_NATIVE_AIO -#include <libaio.h> +# include <thread> +# include <atomic> +# include <libaio.h> +# include <sys/syscall.h> + +/** A simpler alternative to io_getevents(), without +aio_ring_is_empty() that may trigger SIGSEGV */ +static int my_getevents(io_context_t ctx, long min_nr, long nr, io_event *ev) +{ + int saved_errno= errno; + int ret= syscall(__NR_io_getevents, reinterpret_cast<long>(ctx), + min_nr, nr, ev, 0); + if (ret < 0) + { + ret= -errno; + errno= saved_errno; + } + return ret; +} #endif /* Linux AIO implementation, based on native AIO. Needs libaio.h and -laio at the compile time. - submit_io() is used to submit async IO. + io_submit() is used to submit async IO. - There is a single thread, that collects the completion notification - with io_getevent(), and forwards io completion callback + A single thread will collect the completion notification + with io_getevents() and forward io completion callback to the worker threadpool. */ namespace tpool { #ifdef LINUX_NATIVE_AIO -class aio_linux : public aio +class aio_linux final : public aio { - thread_pool* m_pool; + thread_pool *m_pool; io_context_t m_io_ctx; - bool m_in_shutdown; std::thread m_getevent_thread; + static std::atomic<bool> shutdown_in_progress; - static void getevent_thread_routine(aio_linux* aio) + static void getevent_thread_routine(aio_linux *aio) { + io_event events[1]; for (;;) { - io_event event; - struct timespec ts{0, 500000000}; - int ret = io_getevents(aio->m_io_ctx, 1, 1, &event, &ts); - - if (aio->m_in_shutdown) - break; - - if (ret > 0) - { - aiocb* iocb = (aiocb*)event.obj; - long long res = event.res; - if (res < 0) + switch (int ret= my_getevents(aio->m_io_ctx, 1, 1, events)) { + case -EINTR: + case 0: + continue; + case -EINVAL: + if (shutdown_in_progress) + return; + /* fall through */ + default: + if (ret != 1) { - iocb->m_err = static_cast<int>(-res); - iocb->m_ret_len = 0; + fprintf(stderr, "io_getevents returned %d\n", ret); + abort(); + return; } else { - iocb->m_ret_len = ret; - iocb->m_err = 0; + const io_event &event= events[0]; + aiocb *iocb= static_cast<aiocb*>(event.obj); + if (static_cast<int>(event.res) < 0) + { + iocb->m_err= -event.res; + iocb->m_ret_len= 0; + } + else + { + iocb->m_ret_len= event.res; + iocb->m_err= 0; + } + iocb->m_internal_task.m_func= iocb->m_callback; + iocb->m_internal_task.m_arg= iocb; + iocb->m_internal_task.m_group= iocb->m_group; + aio->m_pool->submit_task(&iocb->m_internal_task); } - - iocb->m_internal_task.m_func = iocb->m_callback; - iocb->m_internal_task.m_arg = iocb; - iocb->m_internal_task.m_group = iocb->m_group; - aio->m_pool->submit_task(&iocb->m_internal_task); - continue; - } - switch (ret) - { - case -EAGAIN: - usleep(1000); - continue; - case -EINTR: - case 0: - continue; - default: - fprintf(stderr, "io_getevents returned %d\n", ret); - abort(); } } } public: - aio_linux(io_context_t ctx, thread_pool* pool) + aio_linux(io_context_t ctx, thread_pool *pool) : m_pool(pool), m_io_ctx(ctx), - m_in_shutdown(), m_getevent_thread(getevent_thread_routine, this) + m_getevent_thread(getevent_thread_routine, this) { } ~aio_linux() { - m_in_shutdown = true; - m_getevent_thread.join(); + shutdown_in_progress= true; io_destroy(m_io_ctx); + m_getevent_thread.join(); + shutdown_in_progress= false; } - // Inherited via aio - virtual int submit_io(aiocb* cb) override + int submit_io(aiocb *cb) override { - - if (cb->m_opcode == aio_opcode::AIO_PREAD) - io_prep_pread((iocb *)cb, cb->m_fh, cb->m_buffer, cb->m_len, - cb->m_offset); - else - io_prep_pwrite((iocb *)cb, cb->m_fh, cb->m_buffer, cb->m_len, - cb->m_offset); - - int ret; - ret = io_submit(m_io_ctx, 1, (iocb * *)& cb); + io_prep_pread(static_cast<iocb*>(cb), cb->m_fh, cb->m_buffer, cb->m_len, + cb->m_offset); + if (cb->m_opcode != aio_opcode::AIO_PREAD) + cb->aio_lio_opcode= IO_CMD_PWRITE; + iocb *icb= static_cast<iocb*>(cb); + int ret= io_submit(m_io_ctx, 1, &icb); if (ret == 1) return 0; - errno = -ret; + errno= -ret; return -1; } - // Inherited via aio - virtual int bind(native_file_handle& fd) override - { - return 0; - } - virtual int unbind(const native_file_handle& fd) override - { - return 0; - } + int bind(native_file_handle&) override { return 0; } + int unbind(const native_file_handle&) override { return 0; } }; -aio* create_linux_aio(thread_pool* pool, int max_io) +std::atomic<bool> aio_linux::shutdown_in_progress; + +aio *create_linux_aio(thread_pool *pool, int max_io) { io_context_t ctx; - memset(&ctx, 0, sizeof(ctx)); - int ret = io_setup(max_io, &ctx); - if (ret) + memset(&ctx, 0, sizeof ctx); + if (int ret= io_setup(max_io, &ctx)) { fprintf(stderr, "io_setup(%d) returned %d\n", max_io, ret); return nullptr; @@ -148,9 +148,6 @@ aio* create_linux_aio(thread_pool* pool, int max_io) return new aio_linux(ctx, pool); } #else -aio* create_linux_aio(thread_pool* pool, int max_aio) -{ - return nullptr; -} +aio *create_linux_aio(thread_pool*, int) { return nullptr; } #endif } |