diff options
Diffstat (limited to 'tpool')
-rw-r--r-- | tpool/CMakeLists.txt | 11 | ||||
-rw-r--r-- | tpool/aio_liburing.cc | 185 | ||||
-rw-r--r-- | tpool/aio_linux.cc | 6 | ||||
-rw-r--r-- | tpool/tpool.h | 5 | ||||
-rw-r--r-- | tpool/tpool_generic.cc | 4 |
5 files changed, 201 insertions, 10 deletions
diff --git a/tpool/CMakeLists.txt b/tpool/CMakeLists.txt index 3e3f8e0b42a..239745ab90c 100644 --- a/tpool/CMakeLists.txt +++ b/tpool/CMakeLists.txt @@ -1,16 +1,19 @@ INCLUDE_DIRECTORIES(${CMAKE_CURRENT_SOURCE_DIR}) IF(WIN32) SET(EXTRA_SOURCES tpool_win.cc aio_win.cc) -ELSE() - SET(EXTRA_SOURCES aio_linux.cc) ENDIF() -IF(CMAKE_SYSTEM_NAME STREQUAL "Linux") +IF(CMAKE_SYSTEM_NAME STREQUAL "Linux" AND LIBURING) + SET(EXTRA_SOURCES aio_liburing.cc) +ENDIF() + +IF(CMAKE_SYSTEM_NAME STREQUAL "Linux" AND NOT LIBURING) CHECK_INCLUDE_FILES (libaio.h HAVE_LIBAIO_H) CHECK_LIBRARY_EXISTS(aio io_queue_init "" HAVE_LIBAIO) IF(HAVE_LIBAIO_H AND HAVE_LIBAIO) ADD_DEFINITIONS(-DLINUX_NATIVE_AIO=1) LINK_LIBRARIES(aio) + SET(EXTRA_SOURCES aio_linux.cc) ENDIF() ENDIF() @@ -26,4 +29,4 @@ ADD_LIBRARY(tpool STATIC ${EXTRA_SOURCES} ) -INCLUDE_DIRECTORIES(${PROJECT_SOURCE_DIR}/include)
\ No newline at end of file +INCLUDE_DIRECTORIES(${PROJECT_SOURCE_DIR}/include) diff --git a/tpool/aio_liburing.cc b/tpool/aio_liburing.cc new file mode 100644 index 00000000000..14219f1d499 --- /dev/null +++ b/tpool/aio_liburing.cc @@ -0,0 +1,185 @@ +/* Copyright (C) 2021, MariaDB Corporation. + +This program is free software; you can redistribute itand /or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/ + +#include "tpool_structs.h" +#include "tpool.h" +#include "mysql/service_my_print_error.h" +#include "mysqld_error.h" + +#include <liburing.h> + +#include <algorithm> +#include <vector> +#include <thread> +#include <mutex> + +namespace +{ + +class aio_uring final : public tpool::aio +{ +public: + aio_uring(tpool::thread_pool *tpool, int max_aio) : tpool_(tpool) + { + if (io_uring_queue_init(max_aio, &uring_, 0) != 0) + { + switch (const auto e= errno) { + case ENOMEM: + case ENOSYS: + my_printf_error(ER_UNKNOWN_ERROR, e == ENOMEM + ? "io_uring_queue_init() failed with ENOMEM:" + " try larger ulimit -l\n" + : "io_uring_queue_init() failed with ENOSYS:" + " try uprading the kernel\n", + ME_ERROR_LOG | ME_WARNING); + break; + default: + my_printf_error(ER_UNKNOWN_ERROR, + "io_uring_queue_init() failed with errno %d\n", + ME_ERROR_LOG | ME_WARNING, e); + } + throw std::runtime_error("aio_uring()"); + } + + thread_= std::thread(thread_routine, this); + } + + ~aio_uring() noexcept + { + { + std::lock_guard<std::mutex> _(mutex_); + io_uring_sqe *sqe= io_uring_get_sqe(&uring_); + io_uring_prep_nop(sqe); + io_uring_sqe_set_data(sqe, nullptr); + auto ret= io_uring_submit(&uring_); + if (ret != 1) + { + my_printf_error(ER_UNKNOWN_ERROR, + "io_uring_submit() returned %d during shutdown:" + " this may cause a hang\n", + ME_ERROR_LOG | ME_FATAL, ret); + abort(); + } + } + thread_.join(); + io_uring_queue_exit(&uring_); + } + + int submit_io(tpool::aiocb *cb) final + { + cb->iov_base= cb->m_buffer; + cb->iov_len= cb->m_len; + + // The whole operation since io_uring_get_sqe() and till io_uring_submit() + // must be atomical. This is because liburing provides thread-unsafe calls. + std::lock_guard<std::mutex> _(mutex_); + + io_uring_sqe *sqe= io_uring_get_sqe(&uring_); + if (cb->m_opcode == tpool::aio_opcode::AIO_PREAD) + io_uring_prep_readv(sqe, cb->m_fh, static_cast<struct iovec *>(cb), 1, + cb->m_offset); + else + io_uring_prep_writev(sqe, cb->m_fh, static_cast<struct iovec *>(cb), 1, + cb->m_offset); + io_uring_sqe_set_data(sqe, cb); + + return io_uring_submit(&uring_) == 1 ? 0 : -1; + } + + int bind(native_file_handle &fd) final + { + std::lock_guard<std::mutex> _(files_mutex_); + auto it= std::lower_bound(files_.begin(), files_.end(), fd); + assert(it == files_.end() || *it != fd); + files_.insert(it, fd); + return io_uring_register_files_update(&uring_, 0, files_.data(), + files_.size()); + } + + int unbind(const native_file_handle &fd) final + { + std::lock_guard<std::mutex> _(files_mutex_); + auto it= std::lower_bound(files_.begin(), files_.end(), fd); + assert(*it == fd); + files_.erase(it); + return io_uring_register_files_update(&uring_, 0, files_.data(), + files_.size()); + } + +private: + static void thread_routine(aio_uring *aio) + { + for (;;) + { + io_uring_cqe *cqe; + if (int ret= io_uring_wait_cqe(&aio->uring_, &cqe)) + { + if (ret == -EINTR) // this may occur during shutdown + break; + my_printf_error(ER_UNKNOWN_ERROR, + "io_uring_wait_cqe() returned %d\n", + ME_ERROR_LOG | ME_FATAL, ret); + abort(); + } + + auto *iocb= static_cast<tpool::aiocb*>(io_uring_cqe_get_data(cqe)); + if (!iocb) + break; + + int res= cqe->res; + if (res < 0) + { + iocb->m_err= -res; + iocb->m_ret_len= 0; + } + else + { + iocb->m_err= 0; + iocb->m_ret_len= res; + } + + io_uring_cqe_seen(&aio->uring_, cqe); + + iocb->m_internal_task.m_func= iocb->m_callback; + iocb->m_internal_task.m_arg= iocb; + iocb->m_internal_task.m_group= iocb->m_group; + aio->tpool_->submit_task(&iocb->m_internal_task); + } + } + + io_uring uring_; + std::mutex mutex_; + tpool::thread_pool *tpool_; + std::thread thread_; + + std::vector<native_file_handle> files_; + std::mutex files_mutex_; +}; + +} // namespace + +namespace tpool +{ + +aio *create_linux_aio(thread_pool *pool, int max_aio) +{ + try { + return new aio_uring(pool, max_aio); + } catch (std::runtime_error& error) { + return nullptr; + } +} + +} // namespace tpool diff --git a/tpool/aio_linux.cc b/tpool/aio_linux.cc index d9aa8be2347..4abc2139881 100644 --- a/tpool/aio_linux.cc +++ b/tpool/aio_linux.cc @@ -16,7 +16,6 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/ #include "tpool_structs.h" #include "tpool.h" -#ifdef LINUX_NATIVE_AIO # include <thread> # include <atomic> # include <libaio.h> @@ -69,7 +68,6 @@ static int my_getevents(io_context_t ctx, long min_nr, long nr, io_event *ev) } return ret; } -#endif /* @@ -84,7 +82,6 @@ static int my_getevents(io_context_t ctx, long min_nr, long nr, io_event *ev) */ namespace tpool { -#ifdef LINUX_NATIVE_AIO class aio_linux final : public aio { @@ -187,7 +184,4 @@ aio *create_linux_aio(thread_pool *pool, int max_io) } return new aio_linux(ctx, pool); } -#else -aio *create_linux_aio(thread_pool*, int) { return nullptr; } -#endif } diff --git a/tpool/tpool.h b/tpool/tpool.h index 3a5658c0f36..d33c0608959 100644 --- a/tpool/tpool.h +++ b/tpool/tpool.h @@ -22,6 +22,9 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/ #ifdef LINUX_NATIVE_AIO #include <libaio.h> #endif +#ifdef HAVE_URING +#include <sys/uio.h> +#endif #ifdef _WIN32 #ifndef NOMINMAX #define NOMINMAX @@ -123,6 +126,8 @@ struct aiocb :OVERLAPPED #elif defined LINUX_NATIVE_AIO :iocb +#elif defined HAVE_URING + :iovec #endif { native_file_handle m_fh; diff --git a/tpool/tpool_generic.cc b/tpool/tpool_generic.cc index 7c645b09785..0c769d67c99 100644 --- a/tpool/tpool_generic.cc +++ b/tpool/tpool_generic.cc @@ -38,7 +38,11 @@ namespace tpool { #ifdef __linux__ +#if defined(HAVE_URING) || defined(LINUX_NATIVE_AIO) extern aio* create_linux_aio(thread_pool* tp, int max_io); +#else + aio *create_linux_aio(thread_pool *, int) { return nullptr; }; +#endif #endif #ifdef _WIN32 extern aio* create_win_aio(thread_pool* tp, int max_io); |