summaryrefslogtreecommitdiff
path: root/tpool
diff options
context:
space:
mode:
Diffstat (limited to 'tpool')
-rw-r--r--tpool/CMakeLists.txt11
-rw-r--r--tpool/aio_liburing.cc185
-rw-r--r--tpool/aio_linux.cc6
-rw-r--r--tpool/tpool.h5
-rw-r--r--tpool/tpool_generic.cc4
5 files changed, 201 insertions, 10 deletions
diff --git a/tpool/CMakeLists.txt b/tpool/CMakeLists.txt
index 3e3f8e0b42a..239745ab90c 100644
--- a/tpool/CMakeLists.txt
+++ b/tpool/CMakeLists.txt
@@ -1,16 +1,19 @@
INCLUDE_DIRECTORIES(${CMAKE_CURRENT_SOURCE_DIR})
IF(WIN32)
SET(EXTRA_SOURCES tpool_win.cc aio_win.cc)
-ELSE()
- SET(EXTRA_SOURCES aio_linux.cc)
ENDIF()
-IF(CMAKE_SYSTEM_NAME STREQUAL "Linux")
+IF(CMAKE_SYSTEM_NAME STREQUAL "Linux" AND LIBURING)
+ SET(EXTRA_SOURCES aio_liburing.cc)
+ENDIF()
+
+IF(CMAKE_SYSTEM_NAME STREQUAL "Linux" AND NOT LIBURING)
CHECK_INCLUDE_FILES (libaio.h HAVE_LIBAIO_H)
CHECK_LIBRARY_EXISTS(aio io_queue_init "" HAVE_LIBAIO)
IF(HAVE_LIBAIO_H AND HAVE_LIBAIO)
ADD_DEFINITIONS(-DLINUX_NATIVE_AIO=1)
LINK_LIBRARIES(aio)
+ SET(EXTRA_SOURCES aio_linux.cc)
ENDIF()
ENDIF()
@@ -26,4 +29,4 @@ ADD_LIBRARY(tpool STATIC
${EXTRA_SOURCES}
)
-INCLUDE_DIRECTORIES(${PROJECT_SOURCE_DIR}/include) \ No newline at end of file
+INCLUDE_DIRECTORIES(${PROJECT_SOURCE_DIR}/include)
diff --git a/tpool/aio_liburing.cc b/tpool/aio_liburing.cc
new file mode 100644
index 00000000000..14219f1d499
--- /dev/null
+++ b/tpool/aio_liburing.cc
@@ -0,0 +1,185 @@
+/* Copyright (C) 2021, MariaDB Corporation.
+
+This program is free software; you can redistribute itand /or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
+
+#include "tpool_structs.h"
+#include "tpool.h"
+#include "mysql/service_my_print_error.h"
+#include "mysqld_error.h"
+
+#include <liburing.h>
+
+#include <algorithm>
+#include <vector>
+#include <thread>
+#include <mutex>
+
+namespace
+{
+
+class aio_uring final : public tpool::aio
+{
+public:
+ aio_uring(tpool::thread_pool *tpool, int max_aio) : tpool_(tpool)
+ {
+ if (io_uring_queue_init(max_aio, &uring_, 0) != 0)
+ {
+ switch (const auto e= errno) {
+ case ENOMEM:
+ case ENOSYS:
+ my_printf_error(ER_UNKNOWN_ERROR, e == ENOMEM
+ ? "io_uring_queue_init() failed with ENOMEM:"
+ " try larger ulimit -l\n"
+ : "io_uring_queue_init() failed with ENOSYS:"
+ " try uprading the kernel\n",
+ ME_ERROR_LOG | ME_WARNING);
+ break;
+ default:
+ my_printf_error(ER_UNKNOWN_ERROR,
+ "io_uring_queue_init() failed with errno %d\n",
+ ME_ERROR_LOG | ME_WARNING, e);
+ }
+ throw std::runtime_error("aio_uring()");
+ }
+
+ thread_= std::thread(thread_routine, this);
+ }
+
+ ~aio_uring() noexcept
+ {
+ {
+ std::lock_guard<std::mutex> _(mutex_);
+ io_uring_sqe *sqe= io_uring_get_sqe(&uring_);
+ io_uring_prep_nop(sqe);
+ io_uring_sqe_set_data(sqe, nullptr);
+ auto ret= io_uring_submit(&uring_);
+ if (ret != 1)
+ {
+ my_printf_error(ER_UNKNOWN_ERROR,
+ "io_uring_submit() returned %d during shutdown:"
+ " this may cause a hang\n",
+ ME_ERROR_LOG | ME_FATAL, ret);
+ abort();
+ }
+ }
+ thread_.join();
+ io_uring_queue_exit(&uring_);
+ }
+
+ int submit_io(tpool::aiocb *cb) final
+ {
+ cb->iov_base= cb->m_buffer;
+ cb->iov_len= cb->m_len;
+
+ // The whole operation since io_uring_get_sqe() and till io_uring_submit()
+ // must be atomical. This is because liburing provides thread-unsafe calls.
+ std::lock_guard<std::mutex> _(mutex_);
+
+ io_uring_sqe *sqe= io_uring_get_sqe(&uring_);
+ if (cb->m_opcode == tpool::aio_opcode::AIO_PREAD)
+ io_uring_prep_readv(sqe, cb->m_fh, static_cast<struct iovec *>(cb), 1,
+ cb->m_offset);
+ else
+ io_uring_prep_writev(sqe, cb->m_fh, static_cast<struct iovec *>(cb), 1,
+ cb->m_offset);
+ io_uring_sqe_set_data(sqe, cb);
+
+ return io_uring_submit(&uring_) == 1 ? 0 : -1;
+ }
+
+ int bind(native_file_handle &fd) final
+ {
+ std::lock_guard<std::mutex> _(files_mutex_);
+ auto it= std::lower_bound(files_.begin(), files_.end(), fd);
+ assert(it == files_.end() || *it != fd);
+ files_.insert(it, fd);
+ return io_uring_register_files_update(&uring_, 0, files_.data(),
+ files_.size());
+ }
+
+ int unbind(const native_file_handle &fd) final
+ {
+ std::lock_guard<std::mutex> _(files_mutex_);
+ auto it= std::lower_bound(files_.begin(), files_.end(), fd);
+ assert(*it == fd);
+ files_.erase(it);
+ return io_uring_register_files_update(&uring_, 0, files_.data(),
+ files_.size());
+ }
+
+private:
+ static void thread_routine(aio_uring *aio)
+ {
+ for (;;)
+ {
+ io_uring_cqe *cqe;
+ if (int ret= io_uring_wait_cqe(&aio->uring_, &cqe))
+ {
+ if (ret == -EINTR) // this may occur during shutdown
+ break;
+ my_printf_error(ER_UNKNOWN_ERROR,
+ "io_uring_wait_cqe() returned %d\n",
+ ME_ERROR_LOG | ME_FATAL, ret);
+ abort();
+ }
+
+ auto *iocb= static_cast<tpool::aiocb*>(io_uring_cqe_get_data(cqe));
+ if (!iocb)
+ break;
+
+ int res= cqe->res;
+ if (res < 0)
+ {
+ iocb->m_err= -res;
+ iocb->m_ret_len= 0;
+ }
+ else
+ {
+ iocb->m_err= 0;
+ iocb->m_ret_len= res;
+ }
+
+ io_uring_cqe_seen(&aio->uring_, cqe);
+
+ iocb->m_internal_task.m_func= iocb->m_callback;
+ iocb->m_internal_task.m_arg= iocb;
+ iocb->m_internal_task.m_group= iocb->m_group;
+ aio->tpool_->submit_task(&iocb->m_internal_task);
+ }
+ }
+
+ io_uring uring_;
+ std::mutex mutex_;
+ tpool::thread_pool *tpool_;
+ std::thread thread_;
+
+ std::vector<native_file_handle> files_;
+ std::mutex files_mutex_;
+};
+
+} // namespace
+
+namespace tpool
+{
+
+aio *create_linux_aio(thread_pool *pool, int max_aio)
+{
+ try {
+ return new aio_uring(pool, max_aio);
+ } catch (std::runtime_error& error) {
+ return nullptr;
+ }
+}
+
+} // namespace tpool
diff --git a/tpool/aio_linux.cc b/tpool/aio_linux.cc
index d9aa8be2347..4abc2139881 100644
--- a/tpool/aio_linux.cc
+++ b/tpool/aio_linux.cc
@@ -16,7 +16,6 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
#include "tpool_structs.h"
#include "tpool.h"
-#ifdef LINUX_NATIVE_AIO
# include <thread>
# include <atomic>
# include <libaio.h>
@@ -69,7 +68,6 @@ static int my_getevents(io_context_t ctx, long min_nr, long nr, io_event *ev)
}
return ret;
}
-#endif
/*
@@ -84,7 +82,6 @@ static int my_getevents(io_context_t ctx, long min_nr, long nr, io_event *ev)
*/
namespace tpool
{
-#ifdef LINUX_NATIVE_AIO
class aio_linux final : public aio
{
@@ -187,7 +184,4 @@ aio *create_linux_aio(thread_pool *pool, int max_io)
}
return new aio_linux(ctx, pool);
}
-#else
-aio *create_linux_aio(thread_pool*, int) { return nullptr; }
-#endif
}
diff --git a/tpool/tpool.h b/tpool/tpool.h
index 3a5658c0f36..d33c0608959 100644
--- a/tpool/tpool.h
+++ b/tpool/tpool.h
@@ -22,6 +22,9 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
#ifdef LINUX_NATIVE_AIO
#include <libaio.h>
#endif
+#ifdef HAVE_URING
+#include <sys/uio.h>
+#endif
#ifdef _WIN32
#ifndef NOMINMAX
#define NOMINMAX
@@ -123,6 +126,8 @@ struct aiocb
:OVERLAPPED
#elif defined LINUX_NATIVE_AIO
:iocb
+#elif defined HAVE_URING
+ :iovec
#endif
{
native_file_handle m_fh;
diff --git a/tpool/tpool_generic.cc b/tpool/tpool_generic.cc
index 7c645b09785..0c769d67c99 100644
--- a/tpool/tpool_generic.cc
+++ b/tpool/tpool_generic.cc
@@ -38,7 +38,11 @@ namespace tpool
{
#ifdef __linux__
+#if defined(HAVE_URING) || defined(LINUX_NATIVE_AIO)
extern aio* create_linux_aio(thread_pool* tp, int max_io);
+#else
+ aio *create_linux_aio(thread_pool *, int) { return nullptr; };
+#endif
#endif
#ifdef _WIN32
extern aio* create_win_aio(thread_pool* tp, int max_io);