summaryrefslogtreecommitdiff
path: root/tpool/aio_linux.cc
diff options
context:
space:
mode:
authorVladislav Vaintroub <wlad@mariadb.com>2019-10-29 18:17:24 +0100
committerVladislav Vaintroub <wlad@mariadb.com>2019-11-15 16:50:22 +0100
commit00ee8d85c925846acc76df2a6fc7c67a062c2ea6 (patch)
tree3b8c71b64f089d3a625eeda5c3c188098d50b07b /tpool/aio_linux.cc
parent7e08dd85d6271be4750c5989ccd5053df281d2aa (diff)
downloadmariadb-git-00ee8d85c925846acc76df2a6fc7c67a062c2ea6.tar.gz
MDEV-16264: Add threadpool library
The library is capable of - asynchronous execution of tasks (and optionally waiting for them) - asynchronous file IO This is implemented using libaio on Linux and completion ports on Windows. Elsewhere, async io is "simulated", which means worker threads are performing synchronous IO. - timers, scheduling work asynchronously in some point of the future. Also periodic timers are implemented.
Diffstat (limited to 'tpool/aio_linux.cc')
-rw-r--r--tpool/aio_linux.cc157
1 files changed, 157 insertions, 0 deletions
diff --git a/tpool/aio_linux.cc b/tpool/aio_linux.cc
new file mode 100644
index 00000000000..0a4820a2412
--- /dev/null
+++ b/tpool/aio_linux.cc
@@ -0,0 +1,157 @@
+/* Copyright(C) 2019 MariaDB Corporation.
+
+This program is free software; you can redistribute itand /or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
+
+#include "tpool_structs.h"
+
+#include <stdlib.h>
+#include <signal.h>
+#include <assert.h>
+#include "tpool.h"
+#include <thread>
+#ifdef LINUX_NATIVE_AIO
+#include <libaio.h>
+#endif
+/*
+ Linux AIO implementation, based on native AIO.
+ Needs libaio.h and -laio at the compile time.
+
+ submit_io() is used to submit async IO.
+
+ There is a single thread, that collects the completion notification
+ with io_getevent(), and forwards io completion callback
+ the worker threadpool.
+*/
+namespace tpool
+{
+#ifdef LINUX_NATIVE_AIO
+
+class aio_linux : public aio
+{
+ int m_max_io_count;
+ thread_pool* m_pool;
+ io_context_t m_io_ctx;
+ bool m_in_shutdown;
+ std::thread m_getevent_thread;
+
+ static void getevent_thread_routine(aio_linux* aio)
+ {
+ for (;;)
+ {
+ io_event event;
+ struct timespec ts{0, 500000000};
+ int ret = io_getevents(aio->m_io_ctx, 1, 1, &event, &ts);
+
+ if (aio->m_in_shutdown)
+ break;
+
+ if (ret > 0)
+ {
+ aiocb* iocb = (aiocb*)event.obj;
+ long long res = event.res;
+ if (res < 0)
+ {
+ iocb->m_err = -res;
+ iocb->m_ret_len = 0;
+ }
+ else
+ {
+ iocb->m_ret_len = ret;
+ iocb->m_err = 0;
+ }
+
+ iocb->m_internal_task.m_func = iocb->m_callback;
+ iocb->m_internal_task.m_arg = iocb;
+ iocb->m_internal_task.m_group = iocb->m_group;
+ aio->m_pool->submit_task(&iocb->m_internal_task);
+ continue;
+ }
+ switch (ret)
+ {
+ case -EAGAIN:
+ usleep(1000);
+ continue;
+ case -EINTR:
+ case 0:
+ continue;
+ default:
+ fprintf(stderr, "io_getevents returned %d\n", ret);
+ abort();
+ }
+ }
+ }
+
+public:
+ aio_linux(io_context_t ctx, thread_pool* pool, size_t max_count)
+ : m_max_io_count(max_count), m_pool(pool), m_io_ctx(ctx),
+ m_in_shutdown(), m_getevent_thread(getevent_thread_routine, this)
+ {
+ }
+
+ ~aio_linux()
+ {
+ m_in_shutdown = true;
+ m_getevent_thread.join();
+ io_destroy(m_io_ctx);
+ }
+
+ // Inherited via aio
+ virtual int submit_io(aiocb* cb) override
+ {
+
+ if (cb->m_opcode == aio_opcode::AIO_PREAD)
+ io_prep_pread((iocb *)cb, cb->m_fh, cb->m_buffer, cb->m_len,
+ cb->m_offset);
+ else
+ io_prep_pwrite((iocb *)cb, cb->m_fh, cb->m_buffer, cb->m_len,
+ cb->m_offset);
+
+ int ret;
+ ret = io_submit(m_io_ctx, 1, (iocb * *)& cb);
+ if (ret == 1)
+ return 0;
+ errno = -ret;
+ return -1;
+ }
+
+ // Inherited via aio
+ virtual int bind(native_file_handle& fd) override
+ {
+ return 0;
+ }
+ virtual int unbind(const native_file_handle& fd) override
+ {
+ return 0;
+ }
+};
+
+aio* create_linux_aio(thread_pool* pool, int max_io)
+{
+ io_context_t ctx;
+ memset(&ctx, 0, sizeof(ctx));
+ int ret = io_setup(max_io, &ctx);
+ if (ret)
+ {
+ fprintf(stderr, "io_setup(%d) returned %d\n", max_io, ret);
+ return nullptr;
+ }
+ return new aio_linux(ctx, pool, max_io);
+}
+#else
+aio* create_linux_aio(thread_pool* pool, int max_aio)
+{
+ return nullptr;
+}
+#endif
+}