summaryrefslogtreecommitdiff
path: root/tpool
diff options
context:
space:
mode:
Diffstat (limited to 'tpool')
-rw-r--r--tpool/aio_linux.cc161
1 files changed, 79 insertions, 82 deletions
diff --git a/tpool/aio_linux.cc b/tpool/aio_linux.cc
index 24bc04c75ba..51b656a604b 100644
--- a/tpool/aio_linux.cc
+++ b/tpool/aio_linux.cc
@@ -1,4 +1,4 @@
-/* Copyright(C) 2019 MariaDB Corporation.
+/* Copyright (C) 2019, 2020, MariaDB Corporation.
This program is free software; you can redistribute itand /or modify
it under the terms of the GNU General Public License as published by
@@ -14,133 +14,133 @@ along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
#include "tpool_structs.h"
-
-#include <stdlib.h>
-#include <signal.h>
-#include <assert.h>
#include "tpool.h"
-#include <thread>
+
#ifdef LINUX_NATIVE_AIO
-#include <libaio.h>
+# include <thread>
+# include <atomic>
+# include <libaio.h>
+# include <sys/syscall.h>
+
+/** A simpler alternative to io_getevents(), without
+aio_ring_is_empty() that may trigger SIGSEGV */
+static int my_getevents(io_context_t ctx, long min_nr, long nr, io_event *ev)
+{
+ int saved_errno= errno;
+ int ret= syscall(__NR_io_getevents, reinterpret_cast<long>(ctx),
+ min_nr, nr, ev, 0);
+ if (ret < 0)
+ {
+ ret= -errno;
+ errno= saved_errno;
+ }
+ return ret;
+}
#endif
/*
Linux AIO implementation, based on native AIO.
Needs libaio.h and -laio at the compile time.
- submit_io() is used to submit async IO.
+ io_submit() is used to submit async IO.
- There is a single thread, that collects the completion notification
- with io_getevent(), and forwards io completion callback
+ A single thread will collect the completion notification
+ with io_getevents() and forward io completion callback to
the worker threadpool.
*/
namespace tpool
{
#ifdef LINUX_NATIVE_AIO
-class aio_linux : public aio
+class aio_linux final : public aio
{
- thread_pool* m_pool;
+ thread_pool *m_pool;
io_context_t m_io_ctx;
- bool m_in_shutdown;
std::thread m_getevent_thread;
+ static std::atomic<bool> shutdown_in_progress;
- static void getevent_thread_routine(aio_linux* aio)
+ static void getevent_thread_routine(aio_linux *aio)
{
+ io_event events[1];
for (;;)
{
- io_event event;
- struct timespec ts{0, 500000000};
- int ret = io_getevents(aio->m_io_ctx, 1, 1, &event, &ts);
-
- if (aio->m_in_shutdown)
- break;
-
- if (ret > 0)
- {
- aiocb* iocb = (aiocb*)event.obj;
- long long res = event.res;
- if (res < 0)
+ switch (int ret= my_getevents(aio->m_io_ctx, 1, 1, events)) {
+ case -EINTR:
+ case 0:
+ continue;
+ case -EINVAL:
+ if (shutdown_in_progress)
+ return;
+ /* fall through */
+ default:
+ if (ret != 1)
{
- iocb->m_err = static_cast<int>(-res);
- iocb->m_ret_len = 0;
+ fprintf(stderr, "io_getevents returned %d\n", ret);
+ abort();
+ return;
}
else
{
- iocb->m_ret_len = ret;
- iocb->m_err = 0;
+ const io_event &event= events[0];
+ aiocb *iocb= static_cast<aiocb*>(event.obj);
+ if (static_cast<int>(event.res) < 0)
+ {
+ iocb->m_err= -event.res;
+ iocb->m_ret_len= 0;
+ }
+ else
+ {
+ iocb->m_ret_len= event.res;
+ iocb->m_err= 0;
+ }
+ iocb->m_internal_task.m_func= iocb->m_callback;
+ iocb->m_internal_task.m_arg= iocb;
+ iocb->m_internal_task.m_group= iocb->m_group;
+ aio->m_pool->submit_task(&iocb->m_internal_task);
}
-
- iocb->m_internal_task.m_func = iocb->m_callback;
- iocb->m_internal_task.m_arg = iocb;
- iocb->m_internal_task.m_group = iocb->m_group;
- aio->m_pool->submit_task(&iocb->m_internal_task);
- continue;
- }
- switch (ret)
- {
- case -EAGAIN:
- usleep(1000);
- continue;
- case -EINTR:
- case 0:
- continue;
- default:
- fprintf(stderr, "io_getevents returned %d\n", ret);
- abort();
}
}
}
public:
- aio_linux(io_context_t ctx, thread_pool* pool)
+ aio_linux(io_context_t ctx, thread_pool *pool)
: m_pool(pool), m_io_ctx(ctx),
- m_in_shutdown(), m_getevent_thread(getevent_thread_routine, this)
+ m_getevent_thread(getevent_thread_routine, this)
{
}
~aio_linux()
{
- m_in_shutdown = true;
- m_getevent_thread.join();
+ shutdown_in_progress= true;
io_destroy(m_io_ctx);
+ m_getevent_thread.join();
+ shutdown_in_progress= false;
}
- // Inherited via aio
- virtual int submit_io(aiocb* cb) override
+ int submit_io(aiocb *cb) override
{
-
- if (cb->m_opcode == aio_opcode::AIO_PREAD)
- io_prep_pread((iocb *)cb, cb->m_fh, cb->m_buffer, cb->m_len,
- cb->m_offset);
- else
- io_prep_pwrite((iocb *)cb, cb->m_fh, cb->m_buffer, cb->m_len,
- cb->m_offset);
-
- int ret;
- ret = io_submit(m_io_ctx, 1, (iocb * *)& cb);
+ io_prep_pread(static_cast<iocb*>(cb), cb->m_fh, cb->m_buffer, cb->m_len,
+ cb->m_offset);
+ if (cb->m_opcode != aio_opcode::AIO_PREAD)
+ cb->aio_lio_opcode= IO_CMD_PWRITE;
+ iocb *icb= static_cast<iocb*>(cb);
+ int ret= io_submit(m_io_ctx, 1, &icb);
if (ret == 1)
return 0;
- errno = -ret;
+ errno= -ret;
return -1;
}
- // Inherited via aio
- virtual int bind(native_file_handle& fd) override
- {
- return 0;
- }
- virtual int unbind(const native_file_handle& fd) override
- {
- return 0;
- }
+ int bind(native_file_handle&) override { return 0; }
+ int unbind(const native_file_handle&) override { return 0; }
};
-aio* create_linux_aio(thread_pool* pool, int max_io)
+std::atomic<bool> aio_linux::shutdown_in_progress;
+
+aio *create_linux_aio(thread_pool *pool, int max_io)
{
io_context_t ctx;
- memset(&ctx, 0, sizeof(ctx));
- int ret = io_setup(max_io, &ctx);
- if (ret)
+ memset(&ctx, 0, sizeof ctx);
+ if (int ret= io_setup(max_io, &ctx))
{
fprintf(stderr, "io_setup(%d) returned %d\n", max_io, ret);
return nullptr;
@@ -148,9 +148,6 @@ aio* create_linux_aio(thread_pool* pool, int max_io)
return new aio_linux(ctx, pool);
}
#else
-aio* create_linux_aio(thread_pool* pool, int max_aio)
-{
- return nullptr;
-}
+aio *create_linux_aio(thread_pool*, int) { return nullptr; }
#endif
}