diff options
author | Vladislav Vaintroub <wlad@mariadb.com> | 2019-10-29 18:17:24 +0100 |
---|---|---|
committer | Vladislav Vaintroub <wlad@mariadb.com> | 2019-11-15 16:50:22 +0100 |
commit | 00ee8d85c925846acc76df2a6fc7c67a062c2ea6 (patch) | |
tree | 3b8c71b64f089d3a625eeda5c3c188098d50b07b /tpool | |
parent | 7e08dd85d6271be4750c5989ccd5053df281d2aa (diff) | |
download | mariadb-git-00ee8d85c925846acc76df2a6fc7c67a062c2ea6.tar.gz |
MDEV-16264: Add threadpool library
The library is capable of
- asynchronous execution of tasks (and optionally waiting for them)
- asynchronous file IO
This is implemented using libaio on Linux and completion ports on
Windows. Elsewhere, async io is "simulated", which means worker threads
are performing synchronous IO.
- timers, scheduling work asynchronously at some point in the future.
Periodic timers are also implemented.
Diffstat (limited to 'tpool')
-rw-r--r-- | tpool/CMakeLists.txt | 28 | ||||
-rw-r--r-- | tpool/aio_linux.cc | 157 | ||||
-rw-r--r-- | tpool/aio_simulated.cc | 180 | ||||
-rw-r--r-- | tpool/aio_win.cc | 139 | ||||
-rw-r--r-- | tpool/task.cc | 68 | ||||
-rw-r--r-- | tpool/task_group.cc | 90 | ||||
-rw-r--r-- | tpool/tpool.h | 241 | ||||
-rw-r--r-- | tpool/tpool_generic.cc | 770 | ||||
-rw-r--r-- | tpool/tpool_structs.h | 353 | ||||
-rw-r--r-- | tpool/tpool_win.cc | 291 |
10 files changed, 2317 insertions, 0 deletions
diff --git a/tpool/CMakeLists.txt b/tpool/CMakeLists.txt new file mode 100644 index 00000000000..fc33f9b6932 --- /dev/null +++ b/tpool/CMakeLists.txt @@ -0,0 +1,28 @@ +INCLUDE_DIRECTORIES(${CMAKE_CURRENT_SOURCE_DIR}) +IF(WIN32) + SET(EXTRA_SOURCES tpool_win.cc aio_win.cc) +ELSE() + SET(EXTRA_SOURCES aio_linux.cc) +ENDIF() + +IF(CMAKE_SYSTEM_NAME STREQUAL "Linux") + CHECK_INCLUDE_FILES (libaio.h HAVE_LIBAIO_H) + CHECK_LIBRARY_EXISTS(aio io_queue_init "" HAVE_LIBAIO) + IF(HAVE_LIBAIO_H AND HAVE_LIBAIO) + ADD_DEFINITIONS(-DLINUX_NATIVE_AIO=1) + LINK_LIBRARIES(aio) + ENDIF() +ENDIF() + +ADD_LIBRARY(tpool STATIC + aio_simulated.cc + tpool_structs.h + CMakeLists.txt + tpool.h + tpool_generic.cc + task_group.cc + task.cc + ${EXTRA_SOURCES} +) + +INCLUDE_DIRECTORIES(${PROJECT_SOURCE_DIR}/include)
\ No newline at end of file diff --git a/tpool/aio_linux.cc b/tpool/aio_linux.cc new file mode 100644 index 00000000000..0a4820a2412 --- /dev/null +++ b/tpool/aio_linux.cc @@ -0,0 +1,157 @@ +/* Copyright(C) 2019 MariaDB Corporation. + +This program is free software; you can redistribute itand /or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/ + +#include "tpool_structs.h" + +#include <stdlib.h> +#include <signal.h> +#include <assert.h> +#include "tpool.h" +#include <thread> +#ifdef LINUX_NATIVE_AIO +#include <libaio.h> +#endif +/* + Linux AIO implementation, based on native AIO. + Needs libaio.h and -laio at the compile time. + + submit_io() is used to submit async IO. + + There is a single thread, that collects the completion notification + with io_getevent(), and forwards io completion callback + the worker threadpool. 
+*/ +namespace tpool +{ +#ifdef LINUX_NATIVE_AIO + +class aio_linux : public aio /* Native Linux AIO backend: one dedicated thread reaps completions with io_getevents() and submits each completion callback to the thread pool as a task. */ +{ + int m_max_io_count; + thread_pool* m_pool; + io_context_t m_io_ctx; + bool m_in_shutdown; /* set by the destructor, polled by the getevent thread */ + std::thread m_getevent_thread; + + static void getevent_thread_routine(aio_linux* aio) /* Body of the completion-reaping thread; loops until m_in_shutdown is observed. */ + { + for (;;) + { + io_event event; + struct timespec ts{0, 500000000}; /* 0.5 second timeout, so the shutdown flag is re-checked periodically */ + int ret = io_getevents(aio->m_io_ctx, 1, 1, &event, &ts); + + if (aio->m_in_shutdown) + break; + + if (ret > 0) + { + aiocb* iocb = (aiocb*)event.obj; + long long res = (long) event.res; /* byte count, or negated errno on failure; going through long keeps the sign if the field is an unsigned long narrower than long long */ + if (res < 0) + { + iocb->m_err = (int) -res; + iocb->m_ret_len = 0; + } + else + { + iocb->m_ret_len = (int) res; /* bytes transferred; ret is only the number of events reaped (1 here) */ + iocb->m_err = 0; + } + + iocb->m_internal_task.m_func = iocb->m_callback; + iocb->m_internal_task.m_arg = iocb; + iocb->m_internal_task.m_group = iocb->m_group; + aio->m_pool->submit_task(&iocb->m_internal_task); + continue; + } + switch (ret) + { + case -EAGAIN: + usleep(1000); + continue; + case -EINTR: + case 0: + continue; + default: + fprintf(stderr, "io_getevents returned %d\n", ret); + abort(); + } + } + } + +public: + aio_linux(io_context_t ctx, thread_pool* pool, size_t max_count) /* Takes over ctx (destroyed in the destructor) and starts the getevent thread. */ + : m_max_io_count(max_count), m_pool(pool), m_io_ctx(ctx), + m_in_shutdown(), m_getevent_thread(getevent_thread_routine, this) + { + } + + ~aio_linux() /* Flags shutdown, joins the getevent thread, then destroys the io context. */ + { + m_in_shutdown = true; + m_getevent_thread.join(); + io_destroy(m_io_ctx); + } + + // Inherited via aio + virtual int submit_io(aiocb* cb) override /* Prepares and submits one read or write; returns 0 on success, otherwise sets errno from the io_submit() result and returns -1. */ + { + + if (cb->m_opcode == aio_opcode::AIO_PREAD) + io_prep_pread((iocb *)cb, cb->m_fh, cb->m_buffer, cb->m_len, + cb->m_offset); + else + io_prep_pwrite((iocb *)cb, cb->m_fh, cb->m_buffer, cb->m_len, + cb->m_offset); + + int ret; + ret = io_submit(m_io_ctx, 1, (iocb * *)& cb); + if (ret == 1) + return 0; + errno = -ret; + return -1; + } + + // Inherited via aio + virtual int bind(native_file_handle& fd) override /* no-op in this backend */ + { + return 0; + } + virtual int unbind(const native_file_handle& fd) override /* no-op in this backend */ + { + return 0; + } +}; + +aio* create_linux_aio(thread_pool* pool, int 
max_io) +{ + io_context_t ctx; + memset(&ctx, 0, sizeof(ctx)); + int ret = io_setup(max_io, &ctx); + if (ret) + { + fprintf(stderr, "io_setup(%d) returned %d\n", max_io, ret); + return nullptr; + } + return new aio_linux(ctx, pool, max_io); +} +#else +aio* create_linux_aio(thread_pool* pool, int max_aio) +{ + return nullptr; +} +#endif +} diff --git a/tpool/aio_simulated.cc b/tpool/aio_simulated.cc new file mode 100644 index 00000000000..41c4897336d --- /dev/null +++ b/tpool/aio_simulated.cc @@ -0,0 +1,180 @@ +/* Copyright(C) 2019 MariaDB Corporation. + +This program is free software; you can redistribute itand /or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/ + +#ifndef _WIN32 +#include <unistd.h> /* pread(), pwrite() */ +#endif +#include "tpool.h" +#include "tpool_structs.h" +#include <stdlib.h> +#include <string.h> + +namespace tpool +{ +#ifdef _WIN32 + +/* + In order to be able to execute synchronous IO even on file opened + with FILE_FLAG_OVERLAPPED, and to bypass to completion port, + we use valid event handle for the hEvent member of the OVERLAPPED structure, + with its low-order bit set. + + ´See MSDN docs for GetQueuedCompletionStatus() for description of this trick. 
+*/ +static DWORD fls_sync_io= FLS_OUT_OF_INDEXES; +HANDLE win_get_syncio_event() +{ + HANDLE h; + + h= (HANDLE) FlsGetValue(fls_sync_io); + if (h) + { + return h; + } + h= CreateEventA(NULL, FALSE, FALSE, NULL); + /* Set low-order bit to keeps I/O completion from being queued */ + h= (HANDLE)((uintptr_t) h | 1); + FlsSetValue(fls_sync_io, h); + return h; +} +#include <WinIoCtl.h> +static void __stdcall win_free_syncio_event(void *data) +{ + if (data) + { + CloseHandle((HANDLE) data); + } +} + +struct WinIoInit +{ + WinIoInit() + { + fls_sync_io= FlsAlloc(win_free_syncio_event); + if(fls_sync_io == FLS_OUT_OF_INDEXES) + abort(); + } + ~WinIoInit() { FlsFree(fls_sync_io); } +}; + +static WinIoInit win_io_init; + + +int pread(const native_file_handle &h, void *buf, size_t count, + unsigned long long offset) +{ + OVERLAPPED ov{}; + ULARGE_INTEGER uli; + uli.QuadPart= offset; + ov.Offset= uli.LowPart; + ov.OffsetHigh= uli.HighPart; + ov.hEvent= win_get_syncio_event(); + + if (ReadFile(h, buf, (DWORD) count, 0, &ov) || + (GetLastError() == ERROR_IO_PENDING)) + { + DWORD n_bytes; + if (GetOverlappedResult(h, &ov, &n_bytes, TRUE)) + return n_bytes; + } + + return -1; +} + +int pwrite(const native_file_handle &h, void *buf, size_t count, + unsigned long long offset) +{ + OVERLAPPED ov{}; + ULARGE_INTEGER uli; + uli.QuadPart= offset; + ov.Offset= uli.LowPart; + ov.OffsetHigh= uli.HighPart; + ov.hEvent= win_get_syncio_event(); + + if (WriteFile(h, buf, (DWORD) count, 0, &ov) || + (GetLastError() == ERROR_IO_PENDING)) + { + DWORD n_bytes; + if (GetOverlappedResult(h, &ov, &n_bytes, TRUE)) + return n_bytes; + } + return -1; +} +#endif + +/** + Simulated AIO. + + Executes IO synchronously in worker pool + and then calls the completion routine. 
+*/ +class simulated_aio : public aio +{ + thread_pool *m_pool; /* pool that receives the IO tasks */ + +public: + simulated_aio(thread_pool *tp) + : m_pool(tp) + { + } + + static void simulated_aio_callback(void *param) /* Task body: performs the pread()/pwrite() described by the aiocb passed in param, stores length and error code in it, then invokes its m_callback. */ + { + aiocb *cb= (aiocb *) param; + int ret_len; + int err= 0; + switch (cb->m_opcode) + { + case aio_opcode::AIO_PREAD: + ret_len= pread(cb->m_fh, cb->m_buffer, cb->m_len, cb->m_offset); + break; + case aio_opcode::AIO_PWRITE: + ret_len= pwrite(cb->m_fh, cb->m_buffer, cb->m_len, cb->m_offset); + break; + default: + abort(); /* any other opcode is treated as fatal */ + } + if (ret_len < 0) /* negative result: capture the platform error code */ + { +#ifdef _WIN32 + err= GetLastError(); +#else + err= errno; +#endif + } + cb->m_ret_len = ret_len; + cb->m_err = err; + cb->m_callback(cb); + } + + virtual int submit_io(aiocb *aiocb) override /* Points the aiocb's internal task at simulated_aio_callback (keeping the aiocb's task group) and submits it to the pool; always returns 0. */ + { + aiocb->m_internal_task.m_func = simulated_aio_callback; + aiocb->m_internal_task.m_arg = aiocb; + aiocb->m_internal_task.m_group = aiocb->m_group; + m_pool->submit_task(&aiocb->m_internal_task); + return 0; + } + + virtual int bind(native_file_handle &fd) override { return 0; } /* no-op */ + virtual int unbind(const native_file_handle &fd) override { return 0; } /* no-op */ +}; + +aio *create_simulated_aio(thread_pool *tp) /* Factory: returns a new simulated_aio that submits to tp. */ +{ + return new simulated_aio(tp); +} + +} // namespace tpool
\ No newline at end of file diff --git a/tpool/aio_win.cc b/tpool/aio_win.cc new file mode 100644 index 00000000000..b44f705bd1e --- /dev/null +++ b/tpool/aio_win.cc @@ -0,0 +1,139 @@ +/* Copyright(C) 2019 MariaDB Corporation. + +This program is free software; you can redistribute itand /or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/ + +#include "tpool_structs.h" +#include <algorithm> +#include <assert.h> +#include <condition_variable> +#include <iostream> +#include <limits.h> +#include <mutex> +#include <queue> +#include <stack> +#include <thread> +#include <vector> +#include <tpool.h> + +namespace tpool +{ + +/* + Windows AIO implementation, completion port based. + A single thread collects the completion notification with + GetQueuedCompletionStatus(), and forwards io completion callback + the worker threadpool +*/ +class tpool_generic_win_aio : public aio +{ + /* Thread that does collects completion status from the completion port. */ + std::thread m_thread; + + /* IOCP Completion port.*/ + HANDLE m_completion_port; + + /* The worker pool where completion routine is executed, as task. */ + thread_pool* m_pool; +public: + tpool_generic_win_aio(thread_pool* pool, int max_io) : m_pool(pool) + { + m_completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0); + m_thread = std::thread(aio_completion_thread_proc, this); + } + + /** + Task to be executed in the work pool. 
+ */ + static void io_completion_task(void* data) + { + auto cb = (aiocb*)data; + cb->execute_callback(); + } + + void completion_thread_work() + { + for (;;) + { + DWORD n_bytes; + aiocb* aiocb; + ULONG_PTR key; + if (!GetQueuedCompletionStatus(m_completion_port, &n_bytes, &key, + (LPOVERLAPPED*)& aiocb, INFINITE)) + break; + + aiocb->m_err = 0; + aiocb->m_ret_len = n_bytes; + + if (n_bytes != aiocb->m_len) + { + if (GetOverlappedResult(aiocb->m_fh, aiocb, + (LPDWORD)& aiocb->m_ret_len, FALSE)) + { + aiocb->m_err = GetLastError(); + } + } + aiocb->m_internal_task.m_func = aiocb->m_callback; + aiocb->m_internal_task.m_arg = aiocb; + aiocb->m_internal_task.m_group = aiocb->m_group; + m_pool->submit_task(&aiocb->m_internal_task); + } + } + + static void aio_completion_thread_proc(tpool_generic_win_aio* aio) + { + aio->completion_thread_work(); + } + + ~tpool_generic_win_aio() + { + if (m_completion_port) + CloseHandle(m_completion_port); + m_thread.join(); + } + + virtual int submit_io(aiocb* cb) override + { + memset((OVERLAPPED *)cb, 0, sizeof(OVERLAPPED)); + cb->m_internal = this; + ULARGE_INTEGER uli; + uli.QuadPart = cb->m_offset; + cb->Offset = uli.LowPart; + cb->OffsetHigh = uli.HighPart; + + BOOL ok; + if (cb->m_opcode == aio_opcode::AIO_PREAD) + ok = ReadFile(cb->m_fh.m_handle, cb->m_buffer, cb->m_len, 0, cb); + else + ok = WriteFile(cb->m_fh.m_handle, cb->m_buffer, cb->m_len, 0, cb); + + if (ok || (GetLastError() == ERROR_IO_PENDING)) + return 0; + return -1; + } + + // Inherited via aio + virtual int bind(native_file_handle& fd) override + { + return CreateIoCompletionPort(fd, m_completion_port, 0, 0) ? 
0 + : GetLastError(); + } + virtual int unbind(const native_file_handle& fd) override { return 0; } +}; + +aio* create_win_aio(thread_pool* pool, int max_io) +{ + return new tpool_generic_win_aio(pool, max_io); +} + +} // namespace tpool diff --git a/tpool/task.cc b/tpool/task.cc new file mode 100644 index 00000000000..6d456aa6f30 --- /dev/null +++ b/tpool/task.cc @@ -0,0 +1,68 @@ +/* Copyright(C) 2019 MariaDB Corporation. + +This program is free software; you can redistribute itand /or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/ + +#include <tpool.h> +#include <queue> +#include <mutex> +#include <condition_variable> +#include <tpool_structs.h> + +namespace tpool +{ + task::task(callback_func func, void* arg, task_group* group) : + m_func(func), m_arg(arg), m_group(group) {} + + void task::execute() + { + if (m_group) + { + /* Executing in a group (limiting concurrency).*/ + m_group->execute(this); + } + else + { + /* Execute directly. */ + m_func(m_arg); + release(); + } + } + + /* Task that provide wait() operation. 
*/ + waitable_task::waitable_task(callback_func func, void* arg, task_group* group) : + task(func,arg, group),m_mtx(),m_cv(),m_ref_count(),m_waiter_count(){} + + void waitable_task::add_ref() + { + std::unique_lock<std::mutex> lk(m_mtx); + m_ref_count++; + } + + void waitable_task::release() + { + std::unique_lock<std::mutex> lk(m_mtx); + m_ref_count--; + if (!m_ref_count && m_waiter_count) + m_cv.notify_all(); + } + void waitable_task::wait() + { + std::unique_lock<std::mutex> lk(m_mtx); + m_waiter_count++; + while (m_ref_count) + m_cv.wait(lk); + m_waiter_count--; + } + +}
\ No newline at end of file diff --git a/tpool/task_group.cc b/tpool/task_group.cc new file mode 100644 index 00000000000..d65c658a198 --- /dev/null +++ b/tpool/task_group.cc @@ -0,0 +1,90 @@ +/* Copyright(C) 2019 MariaDB Corporation. + +This program is free software; you can redistribute itand /or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/ + +#include <tpool.h> +#include <queue> +#include <mutex> +#include <condition_variable> +#include <tpool_structs.h> +#include <thread> +namespace tpool +{ + task_group::task_group(unsigned int max_concurrency) : + m_queue(8), + m_mtx(), + m_tasks_running(), + m_max_concurrent_tasks(max_concurrency) + {}; + + void task_group::set_max_tasks(unsigned int max_concurrency) + { + std::unique_lock<std::mutex> lk(m_mtx); + m_max_concurrent_tasks = max_concurrency; + } + void task_group::execute(task* t) + { + std::unique_lock<std::mutex> lk(m_mtx); + if (m_tasks_running == m_max_concurrent_tasks) + { + /* Queue for later execution by another thread.*/ + m_queue.push(t); + return; + } + m_tasks_running++; + for (;;) + { + lk.unlock(); + if (t) + { + t->m_func(t->m_arg); + t->release(); + } + lk.lock(); + + if (m_queue.empty()) + break; + t = m_queue.front(); + m_queue.pop(); + } + m_tasks_running--; + } + + void task_group::cancel_pending(task* t) + { + std::unique_lock<std::mutex> lk(m_mtx); + if (!t) + m_queue.clear(); + for (auto it = m_queue.begin(); it != m_queue.end(); it++) + { + if (*it == t) + { + 
(*it)->release(); + (*it) = nullptr; + } + } + } + + task_group::~task_group() + { + std::unique_lock<std::mutex> lk(m_mtx); + m_queue.clear(); + while (m_tasks_running) + { + lk.unlock(); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + lk.lock(); + } + } +}
\ No newline at end of file diff --git a/tpool/tpool.h b/tpool/tpool.h new file mode 100644 index 00000000000..7b504c5ef57 --- /dev/null +++ b/tpool/tpool.h @@ -0,0 +1,241 @@ +/* Copyright(C) 2019 MariaDB + +This program is free software; you can redistribute itand /or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/ + +#pragma once +#include <memory> /* unique_ptr */ +#include <condition_variable> +#include <mutex> +#include <atomic> +#include <tpool_structs.h> +#ifdef LINUX_NATIVE_AIO +#include <libaio.h> +#endif +#ifdef _WIN32 +#ifndef NOMINMAX +#define NOMINMAX +#endif +#include <windows.h> +/** + Windows-specific native file handle struct. + Apart from the actual handle, contains PTP_IO + used by the Windows threadpool. +*/ +struct native_file_handle +{ + HANDLE m_handle; + PTP_IO m_ptp_io; + native_file_handle(){}; + native_file_handle(HANDLE h) : m_handle(h), m_ptp_io() {} + operator HANDLE() const { return m_handle; } +}; +#else +#include <unistd.h> +typedef int native_file_handle; +#endif + +namespace tpool +{ +/** + Task callback function + */ +typedef void (*callback_func)(void *); +class task; + +/* A class that can be used e.g for +restricting concurrency for specific class of tasks. 
*/ + +class task_group +{ +private: + circular_queue<task*> m_queue; + std::mutex m_mtx; + std::condition_variable m_cv; + unsigned int m_tasks_running; + unsigned int m_max_concurrent_tasks; +public: + task_group(unsigned int max_concurrency= 100000); + void set_max_tasks(unsigned int max_concurrent_tasks); + void execute(task* t); + void cancel_pending(task *t); + ~task_group(); +}; + + +class task +{ +public: + callback_func m_func; + void *m_arg; + task_group* m_group; + virtual void add_ref() {}; + virtual void release() {}; + task() {}; + task(callback_func func, void* arg, task_group* group = nullptr); + void* get_arg() { return m_arg; } + callback_func get_func() { return m_func; } + virtual void execute(); + virtual ~task() {} +}; + +class waitable_task :public task +{ + std::mutex m_mtx; + std::condition_variable m_cv; + int m_ref_count; + int m_waiter_count; +public: + waitable_task(callback_func func, void* arg, task_group* group = nullptr); + void add_ref() override; + void release() override; + bool is_running() { return m_ref_count > 0; } + bool get_ref_count() {return m_ref_count;} + void wait(); + virtual ~waitable_task() {}; +}; +enum class aio_opcode +{ + AIO_PREAD, + AIO_PWRITE +}; +const int MAX_AIO_USERDATA_LEN= 40; +struct aiocb; + +/** IO control block, includes parameters for the IO, and the callback*/ + +struct aiocb +#ifdef _WIN32 + :OVERLAPPED +#elif defined LINUX_NATIVE_AIO + :iocb +#endif +{ + native_file_handle m_fh; + aio_opcode m_opcode; + unsigned long long m_offset; + void *m_buffer; + unsigned int m_len; + callback_func m_callback; + task_group* m_group; + /* Returned length and error code*/ + int m_ret_len; + int m_err; + void *m_internal; + task m_internal_task; + char m_userdata[MAX_AIO_USERDATA_LEN]; + + aiocb() : m_internal_task(nullptr, nullptr) + {} + void execute_callback() + { + task t(m_callback, this,m_group); + t.execute(); + } +}; + + +/** + AIO interface +*/ +class aio +{ +public: + /** + Submit asyncronous IO. 
+ On completion, cb->m_callback is executed. + */ + virtual int submit_io(aiocb *cb)= 0; + /** "Bind" file to AIO handler (used on Windows only) */ + virtual int bind(native_file_handle &fd)= 0; + /** "Unind" file to AIO handler (used on Windows only) */ + virtual int unbind(const native_file_handle &fd)= 0; + virtual ~aio(){}; +}; + +class timer +{ +public: + virtual void set_time(int initial_delay_ms, int period_ms) = 0; + virtual void disarm() = 0; + virtual ~timer(){} +}; + +class thread_pool; + +extern aio *create_simulated_aio(thread_pool *tp); + +class thread_pool +{ +protected: + /* AIO handler */ + std::unique_ptr<aio> m_aio; + virtual aio *create_native_aio(int max_io)= 0; + + /** + Functions to be called at worker thread start/end + can be used for example to set some TLS variables + */ + void (*m_worker_init_callback)(void); + void (*m_worker_destroy_callback)(void); + +public: + thread_pool() : m_aio(), m_worker_init_callback(), m_worker_destroy_callback() + { + } + virtual void submit_task(task *t)= 0; + virtual timer* create_timer(callback_func func, void *data=nullptr) = 0; + void set_thread_callbacks(void (*init)(), void (*destroy)()) + { + m_worker_init_callback= init; + m_worker_destroy_callback= destroy; + } + int configure_aio(bool use_native_aio, int max_io) + { + if (use_native_aio) + m_aio.reset(create_native_aio(max_io)); + if (!m_aio) + m_aio.reset(create_simulated_aio(this)); + return !m_aio ? 
-1 : 0; + } + void disable_aio() + { + m_aio.reset(); + } + int bind(native_file_handle &fd) { return m_aio->bind(fd); } + void unbind(const native_file_handle &fd) { m_aio->unbind(fd); } + int submit_io(aiocb *cb) { return m_aio->submit_io(cb); } + virtual ~thread_pool() {} +}; +const int DEFAULT_MIN_POOL_THREADS= 1; +const int DEFAULT_MAX_POOL_THREADS= 500; +extern thread_pool * +create_thread_pool_generic(int min_threads= DEFAULT_MIN_POOL_THREADS, + int max_threads= DEFAULT_MAX_POOL_THREADS); +#ifdef _WIN32 +extern thread_pool * +create_thread_pool_win(int min_threads= DEFAULT_MIN_POOL_THREADS, + int max_threads= DEFAULT_MAX_POOL_THREADS); + +/* + Helper functions, to execute pread/pwrite even if file is + opened with FILE_FLAG_OVERLAPPED, and bound to completion + port. +*/ +int pwrite(const native_file_handle &h, void *buf, size_t count, + unsigned long long offset); +int pread(const native_file_handle &h, void *buf, size_t count, + unsigned long long offset); +HANDLE win_get_syncio_event(); +#endif +} // namespace tpool diff --git a/tpool/tpool_generic.cc b/tpool/tpool_generic.cc new file mode 100644 index 00000000000..2a4971cd555 --- /dev/null +++ b/tpool/tpool_generic.cc @@ -0,0 +1,770 @@ +/* Copyright(C) 2019 MariaDB Corporation. + +This program is free software; you can redistribute itand /or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/ + +#include "tpool_structs.h" +#include <limits.h> +#include <algorithm> +#include <assert.h> +#include <atomic> +#include <chrono> +#include <condition_variable> +#include <iostream> +#include <limits.h> +#include <mutex> +#include <queue> +#include <stack> +#include <thread> +#include <vector> +#include "tpool.h" +#include <assert.h> +#include <my_global.h> +#include <my_dbug.h> +#include <thr_timer.h> +#include <stdlib.h> + +namespace tpool +{ + +#ifdef __linux__ + extern aio* create_linux_aio(thread_pool* tp, int max_io); +#endif +#ifdef _WIN32 + extern aio* create_win_aio(thread_pool* tp, int max_io); +#endif + + static const std::chrono::milliseconds LONG_TASK_DURATION = std::chrono::milliseconds(500); + static const int OVERSUBSCRIBE_FACTOR = 2; + +/** + Implementation of generic threadpool. + This threadpool consists of the following components + + - The task queue. This queue is populated by submit() + - Worker that execute the work items. + - Timer thread that takes care of pool health + + The task queue is populated by submit() method. + on submit(), a worker thread can be woken, or created + to execute tasks. + + The timer thread watches if work items are being dequeued, and if not, + this can indicate potential deadlock. + Thus the timer thread can also wake or create a thread, to ensure some progress. + + Optimizations: + + - worker threads that are idle for long time will shutdown. + - worker threads are woken in LIFO order, which minimizes context switching + and also ensures that idle timeout works well. LIFO wakeup order ensures + that active threads stay active, and idle ones stay idle. + + - to minimize spurious wakeups, some items are not put into the queue. Instead + submit() will pass the data directly to the thread it woke up. 
+*/ + +/** + Worker wakeup flags. +*/ +enum worker_wake_reason +{ + WAKE_REASON_NONE, + WAKE_REASON_TASK, + WAKE_REASON_SHUTDOWN +}; + + + +/* A per-worker thread structure.*/ +struct MY_ALIGNED(CPU_LEVEL1_DCACHE_LINESIZE) worker_data +{ + /** Condition variable to wakeup this worker.*/ + std::condition_variable m_cv; + + /** Reason why worker was woken. */ + worker_wake_reason m_wake_reason; + + /** + If worker wakes up with WAKE_REASON_TASK, this the task it needs to execute. + */ + task* m_task; + + /** Struct is member of intrusive doubly linked list */ + worker_data* m_prev; + worker_data* m_next; + + /* Current state of the worker.*/ + enum state + { + NONE = 0, + EXECUTING_TASK = 1, + LONG_TASK = 2 + }; + + int m_state; + + bool is_executing_task() + { + return m_state & EXECUTING_TASK; + } + bool is_long_task() + { + return m_state & LONG_TASK; + } + std::chrono::system_clock::time_point m_task_start_time; + worker_data() : + m_cv(), + m_wake_reason(WAKE_REASON_NONE), + m_task(), + m_prev(), + m_next(), + m_state(NONE), + m_task_start_time() + {} + + /*Define custom new/delete because of overaligned structure. */ + void* operator new(size_t size) + { +#ifdef _WIN32 + return _aligned_malloc(size, CPU_LEVEL1_DCACHE_LINESIZE); +#else + void* ptr; + int ret = posix_memalign(&ptr, CPU_LEVEL1_DCACHE_LINESIZE, size); + return ret ? 0 : ptr; +#endif + } + void operator delete(void* p) + { +#ifdef _WIN32 + _aligned_free(p); +#else + free(p); +#endif + } +}; + +class thread_pool_generic : public thread_pool +{ + /** Cache for per-worker structures */ + cache<worker_data> m_thread_data_cache; + + /** The task queue */ + circular_queue<task*> m_task_queue; + + /* List of standby (idle) workers.*/ + doubly_linked_list<worker_data> m_standby_threads; + + /** List of threads that are executing tasks. */ + doubly_linked_list<worker_data> m_active_threads; + + /* Mutex that protects the whole struct, most importantly + the standby threads list, and task queue. 
*/ + std::mutex m_mtx; + + /** Timeout after which idle worker shuts down.*/ + std::chrono::milliseconds m_thread_timeout; + + /** How often should timer wakeup.*/ + std::chrono::milliseconds m_timer_interval; + + /** Another condition variable, used in pool shutdown-*/ + std::condition_variable m_cv_no_threads; + + /** Condition variable for the timer thread. Signaled on shutdown.*/ + std::condition_variable m_cv_timer; + + /** Overall number of enqueues*/ + unsigned long long m_tasks_enqueued; + /** Overall number of dequeued tasks. */ + unsigned long long m_tasks_dequeued; + + /**Statistic related, number of worker thread wakeups.*/ + int m_wakeups; + + /** + Statistic related, number of spurious thread wakeups + (i.e thread woke up, and the task queue is empty) + */ + int m_spurious_wakeups; + + /** The desired concurrency. This number of workers should be actively executing.*/ + unsigned int m_concurrency; + + /** True, if threadpool is being shutdown, false otherwise */ + bool m_in_shutdown; + + /** time point when timer last ran, used as a coarse clock. */ + std::chrono::system_clock::time_point m_timestamp; + + /** Number of long running tasks. The long running tasks are excluded when + adjusting concurrency */ + int m_long_tasks_count; + + /** Last time thread was created*/ + std::chrono::system_clock::time_point m_last_thread_creation; + + /** Minimumum number of threads in this pool.*/ + unsigned int m_min_threads; + + /** Maximimum number of threads in this pool. 
*/ + unsigned int m_max_threads; + + /* Maintainence related statistics (see maintainence()) */ + size_t m_last_thread_count; + unsigned long long m_last_activity; + std::unique_ptr<timer> m_maintaince_timer_task; + + void worker_main(worker_data *thread_data); + void worker_end(worker_data* thread_data); + + /* Checks threadpool responsiveness, adjusts thread_counts */ + void maintainence(); + static void maintainence_func(void* arg) + { + ((thread_pool_generic *)arg)->maintainence(); + } + bool add_thread(); + bool wake(worker_wake_reason reason, task *t = nullptr); + void wake_or_create_thread(); + bool get_task(worker_data *thread_var, task **t); + bool wait_for_tasks(std::unique_lock<std::mutex> &lk, + worker_data *thread_var); + void cancel_pending(task* t); + + size_t thread_count() + { + return m_active_threads.size() + m_standby_threads.size(); + } +public: + thread_pool_generic(int min_threads, int max_threads); + ~thread_pool_generic(); + void submit_task(task *task) override; + virtual aio *create_native_aio(int max_io) override + { +#ifdef _WIN32 + return create_win_aio(this, max_io); +#elif defined(__linux__) + return create_linux_aio(this,max_io); +#else + return nullptr; +#endif + } + + class timer_generic : public thr_timer_t, public timer + { + thread_pool_generic* m_pool; + waitable_task m_task; + callback_func m_callback; + void* m_data; + int m_period; + std::mutex m_mtx; + bool m_on; + std::atomic<bool> m_running; + + void run() + { + /* + In rare cases, multiple callbacks can be scheduled, + e.g with set_time(0,0) in a loop. + We do not allow parallel execution, as user is not prepared. 
+ */ + bool expected = false; + if (!m_running.compare_exchange_strong(expected, true)) + return; + + m_callback(m_data); + + m_running = false; + + if (m_pool && m_period) + { + std::unique_lock<std::mutex> lk(m_mtx); + if (m_on) + { + thr_timer_end(this); + thr_timer_settime(this, 1000ULL * m_period); + } + } + } + + static void execute(void* arg) + { + auto timer = (timer_generic*)arg; + timer->run(); + } + + static void submit_task(void* arg) + { + timer_generic* timer = (timer_generic*)arg; + timer->m_pool->submit_task(&timer->m_task); + } + + public: + timer_generic(callback_func func, void* data, thread_pool_generic * pool): + m_pool(pool), + m_task(timer_generic::execute,this), + m_callback(func),m_data(data),m_period(0),m_mtx(), + m_on(true),m_running() + { + if (pool) + { + /* EXecute callback in threadpool*/ + thr_timer_init(this, submit_task, this); + } + else + { + /* run in "timer" thread */ + thr_timer_init(this, m_task.get_func(), m_task.get_arg()); + } + } + + void set_time(int initial_delay_ms, int period_ms) override + { + std::unique_lock<std::mutex> lk(m_mtx); + if (!m_on) + return; + thr_timer_end(this); + if (!m_pool) + thr_timer_set_period(this, 1000ULL * period_ms); + else + m_period = period_ms; + thr_timer_settime(this, 1000ULL * initial_delay_ms); + } + + void disarm() override + { + std::unique_lock<std::mutex> lk(m_mtx); + m_on = false; + thr_timer_end(this); + lk.unlock(); + + if (m_task.m_group) + { + m_task.m_group->cancel_pending(&m_task); + } + if (m_pool) + { + m_pool->cancel_pending(&m_task); + } + m_task.wait(); + } + + virtual ~timer_generic() + { + disarm(); + } + }; + + virtual timer* create_timer(callback_func func, void *data) override + { + return new timer_generic(func, data, this); + } +}; + +void thread_pool_generic::cancel_pending(task* t) +{ + std::unique_lock <std::mutex> lk(m_mtx); + for (auto it = m_task_queue.begin(); it != m_task_queue.end(); it++) + { + if (*it == t) + { + t->release(); + *it = nullptr; + } + } 
+} +/** + Register worker in standby list, and wait to be woken. + + @return + true - thread was woken + false - idle wait timeout exceeded (the current thread need to shutdown) +*/ +bool thread_pool_generic::wait_for_tasks(std::unique_lock<std::mutex> &lk, + worker_data *thread_data) +{ + assert(m_task_queue.empty()); + assert(!m_in_shutdown); + + thread_data->m_wake_reason= WAKE_REASON_NONE; + m_active_threads.erase(thread_data); + m_standby_threads.push_back(thread_data); + + for (;;) + { + thread_data->m_cv.wait_for(lk, m_thread_timeout); + + if (thread_data->m_wake_reason != WAKE_REASON_NONE) + { + /* Woke up not due to timeout.*/ + return true; + } + + if (thread_count() <= m_min_threads) + { + /* Do not shutdown thread, maintain required minimum of worker + threads.*/ + continue; + } + + /* + Woke up due to timeout, remove this thread's from the standby list. In + all other cases where it is signaled it is removed by the signaling + thread. + */ + m_standby_threads.erase(thread_data); + m_active_threads.push_back(thread_data); + return false; + } +} + +/** + Workers "get next task" routine. + + A task can be handed over to the current thread directly during submit(). + if thread_var->m_wake_reason == WAKE_REASON_TASK. + + Or a task can be taken from the task queue. + In case task queue is empty, the worker thread will park (wait for wakeup). 
+*/ +bool thread_pool_generic::get_task(worker_data *thread_var, task **t) +{ + std::unique_lock<std::mutex> lk(m_mtx); + + if (thread_var->is_long_task() && m_long_tasks_count) + m_long_tasks_count--; + + thread_var->m_state = worker_data::NONE; + + if (m_task_queue.empty()) + { + if (m_in_shutdown) + return false; + + if (!wait_for_tasks(lk, thread_var)) + return false; + + /* Task was handed over directly by signaling thread.*/ + if (thread_var->m_wake_reason == WAKE_REASON_TASK) + { + *t= thread_var->m_task; + goto end; + } + + if (m_task_queue.empty()) + return false; + } + + /* Dequeue from the task queue.*/ + *t= m_task_queue.front(); + m_task_queue.pop(); + m_tasks_dequeued++; + +end: + thread_var->m_state |= worker_data::EXECUTING_TASK; + thread_var->m_task_start_time = m_timestamp; + return true; +} + +/** Worker thread shutdown routine. */ +void thread_pool_generic::worker_end(worker_data* thread_data) +{ + std::lock_guard<std::mutex> lk(m_mtx); + m_active_threads.erase(thread_data); + m_thread_data_cache.put(thread_data); + + if (!thread_count() && m_in_shutdown) + { + /* Signal the destructor that no more threads are left. 
*/ + m_cv_no_threads.notify_all(); + } +} + +/* The worker get/execute task loop.*/ +void thread_pool_generic::worker_main(worker_data *thread_var) +{ + task* task; + + if(m_worker_init_callback) + m_worker_init_callback(); + + while (get_task(thread_var, &task) && task) + { + task->execute(); + } + + if (m_worker_destroy_callback) + m_worker_destroy_callback(); + + worker_end(thread_var); +} + +/* + Periodic job to fix thread count and concurrency, + in case of long tasks, etc +*/ +void thread_pool_generic::maintainence() +{ + /* + If pool is busy (i.e the its mutex is currently locked), we can + skip the maintainence task, some times, to lower mutex contention + */ + static int skip_counter; + const int MAX_SKIPS = 10; + std::unique_lock<std::mutex> lk(m_mtx, std::defer_lock); + if (skip_counter == MAX_SKIPS) + { + lk.lock(); + } + else if (!lk.try_lock()) + { + skip_counter++; + return; + } + + skip_counter = 0; + + m_timestamp = std::chrono::system_clock::now(); + + m_long_tasks_count = 0; + + if (m_task_queue.empty()) + { + m_last_activity = m_tasks_dequeued + m_wakeups; + return; + } + + for (auto thread_data = m_active_threads.front(); + thread_data; + thread_data = thread_data->m_next) + { + if (thread_data->is_executing_task() && + (thread_data->is_long_task() + || (m_timestamp - thread_data->m_task_start_time > LONG_TASK_DURATION))) + { + thread_data->m_state |= worker_data::LONG_TASK; + m_long_tasks_count++; + } + } + size_t thread_cnt = (int)thread_count(); + if (m_active_threads.size() - m_long_tasks_count < m_concurrency*OVERSUBSCRIBE_FACTOR) + { + wake_or_create_thread(); + return; + } + if (m_last_activity == m_tasks_dequeued + m_wakeups && + m_last_thread_count <= thread_cnt && m_active_threads.size() == thread_cnt) + { + // no progress made since last iteration. create new + // thread + add_thread(); + } + m_last_activity = m_tasks_dequeued + m_wakeups; + m_last_thread_count= thread_cnt; +} + +/* + Heuristic used for thread creation throttling. 
+ Returns interval in milliseconds between thread creation + (depending on number of threads already in the pool, and + desired concurrency level) +*/ +static int throttling_interval_ms(size_t n_threads,size_t concurrency) +{ + if (n_threads < concurrency*4) + return 0; + + if (n_threads < concurrency*8) + return 50; + + if (n_threads < concurrency*16) + return 100; + + return 200; +} + +/* Create a new worker.*/ +bool thread_pool_generic::add_thread() +{ + size_t n_threads = thread_count(); + + if (n_threads >= m_max_threads) + return false; + + if (n_threads >= m_min_threads) + { + auto now = std::chrono::system_clock::now(); + if (now - m_last_thread_creation < + std::chrono::milliseconds(throttling_interval_ms(n_threads, m_concurrency))) + { + /* Throttle thread creation.*/ + return false; + } + } + + worker_data *thread_data = m_thread_data_cache.get(); + m_active_threads.push_back(thread_data); + try + { + std::thread thread(&thread_pool_generic::worker_main, this, thread_data); + m_last_thread_creation = std::chrono::system_clock::now(); + thread.detach(); + } + catch (std::system_error& e) + { + m_active_threads.erase(thread_data); + m_thread_data_cache.put(thread_data); + static bool warning_written; + if (!warning_written) + { + fprintf(stderr, "Warning : threadpool thread could not be created :%s," + "current number of threads in pool %zu\n", e.what(), thread_count()); + warning_written = true; + } + return false; + } + return true; +} + +/** Wake a standby thread, and hand the given task over to this thread. 
*/ +bool thread_pool_generic::wake(worker_wake_reason reason, task *t) +{ + assert(reason != WAKE_REASON_NONE); + + if (m_standby_threads.empty()) + return false; + auto var= m_standby_threads.back(); + m_standby_threads.pop_back(); + m_active_threads.push_back(var); + assert(var->m_wake_reason == WAKE_REASON_NONE); + var->m_wake_reason= reason; + var->m_cv.notify_one(); + if (t) + { + var->m_task= t; + } + m_wakeups++; + return true; +} + + +thread_pool_generic::thread_pool_generic(int min_threads, int max_threads) : + m_thread_data_cache(max_threads), + m_task_queue(10000), + m_standby_threads(), + m_active_threads(), + m_mtx(), + m_thread_timeout(std::chrono::milliseconds(60000)), + m_timer_interval(std::chrono::milliseconds(400)), + m_cv_no_threads(), + m_cv_timer(), + m_tasks_enqueued(), + m_tasks_dequeued(), + m_wakeups(), + m_spurious_wakeups(), + m_concurrency(std::thread::hardware_concurrency()), + m_in_shutdown(), + m_timestamp(), + m_long_tasks_count(), + m_last_thread_creation(), + m_min_threads(min_threads), + m_max_threads(max_threads), + m_last_thread_count(), + m_last_activity(), + m_maintaince_timer_task() +{ + + if (m_max_threads < m_concurrency) + m_concurrency = m_max_threads; + if (m_min_threads > m_concurrency) + m_concurrency = min_threads; + if (!m_concurrency) + m_concurrency = 1; + + if (min_threads < max_threads) + { + m_maintaince_timer_task.reset(new timer_generic(thread_pool_generic::maintainence_func, this, nullptr)); + m_maintaince_timer_task->set_time((int)m_timer_interval.count(), (int)m_timer_interval.count()); + } +} + + +void thread_pool_generic::wake_or_create_thread() +{ + assert(!m_task_queue.empty()); + if (!m_standby_threads.empty()) + { + auto t= m_task_queue.front(); + m_task_queue.pop(); + wake(WAKE_REASON_TASK, t); + } + else + { + add_thread(); + } +} + + +/** Submit a new task*/ +void thread_pool_generic::submit_task(task* task) +{ + std::unique_lock<std::mutex> lk(m_mtx); + if (m_in_shutdown) + return; + 
task->add_ref(); + m_tasks_enqueued++; + m_task_queue.push(task); + + if (m_active_threads.size() - m_long_tasks_count < m_concurrency *OVERSUBSCRIBE_FACTOR) + wake_or_create_thread(); +} + +/** + Wake up all workers, and wait until they are gone + Stop the timer. +*/ +thread_pool_generic::~thread_pool_generic() +{ + /* + Stop AIO early. + This is needed to prevent AIO completion thread + from calling submit_task() on an object that is being destroyed. + */ + m_aio.reset(); + + /* Also stop the maintanence task early. */ + m_maintaince_timer_task.reset(); + + std::unique_lock<std::mutex> lk(m_mtx); + m_in_shutdown= true; + + /* Wake up idle threads. */ + while (wake(WAKE_REASON_SHUTDOWN)) + { + } + + while (thread_count()) + { + m_cv_no_threads.wait(lk); + } + + lk.unlock(); +} + +thread_pool *create_thread_pool_generic(int min_threads, int max_threads) +{ + return new thread_pool_generic(min_threads, max_threads); +} + +} // namespace tpool diff --git a/tpool/tpool_structs.h b/tpool/tpool_structs.h new file mode 100644 index 00000000000..6f47b3b3e40 --- /dev/null +++ b/tpool/tpool_structs.h @@ -0,0 +1,353 @@ +/* Copyright(C) 2019 MariaDB Corporation + +This program is free software; you can redistribute itand /or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/ + +#pragma once +#include <vector> +#include <stack> +#include <mutex> +#include <condition_variable> +#include <assert.h> +#include <algorithm> + +namespace tpool +{ + +enum cache_notification_mode +{ + NOTIFY_ONE, + NOTIFY_ALL +}; + +/** + Generic "pointer" cache of a fixed size + with fast put/get operations. + + Compared to STL containers, is faster/does not + do allocations. However, put() operation will wait + if there is no free items. +*/ +template<typename T> class cache +{ + std::mutex m_mtx; + std::condition_variable m_cv; + std::vector<T> m_base; + std::vector<T*> m_cache; + cache_notification_mode m_notification_mode; + int m_waiters; + + bool is_full() + { + return m_cache.size() == m_base.size(); + } + +public: + cache(size_t count, cache_notification_mode mode= tpool::cache_notification_mode::NOTIFY_ALL): + m_mtx(), m_cv(), m_base(count),m_cache(count), m_notification_mode(mode),m_waiters() + { + for(size_t i = 0 ; i < count; i++) + m_cache[i]=&m_base[i]; + } + + T* get(bool blocking=true) + { + std::unique_lock<std::mutex> lk(m_mtx); + if (blocking) + { + while(m_cache.empty()) + m_cv.wait(lk); + } + else + { + if(m_cache.empty()) + return nullptr; + } + T* ret = m_cache.back(); + m_cache.pop_back(); + return ret; + } + + + void put(T *ele) + { + std::unique_lock<std::mutex> lk(m_mtx); + m_cache.push_back(ele); + if (m_notification_mode == NOTIFY_ONE) + m_cv.notify_one(); + else if(m_cache.size() == 1) + m_cv.notify_all(); // Signal cache is not empty + else if(m_waiters && is_full()) + m_cv.notify_all(); // Signal cache is full + } + + bool contains(T* ele) + { + return ele >= &m_base[0] && ele <= &m_base[m_base.size() -1]; + } + + /* Wait until cache is full.*/ + void wait() + { + std::unique_lock<std::mutex> lk(m_mtx); + m_waiters++; + 
while(!is_full()) + m_cv.wait(lk); + m_waiters--; + } +}; + + +/** + Circular, fixed size queue + used for the task queue. + + Compared to STL queue, this one is + faster, and does not do memory allocations +*/ +template <typename T> class circular_queue +{ + +public: + circular_queue(size_t N = 16) + : m_capacity(N + 1), m_buffer(m_capacity), m_head(), m_tail() + { + } + bool empty() { return m_head == m_tail; } + bool full() { return (m_head + 1) % m_capacity == m_tail; } + void clear() { m_head = m_tail = 0; } + void resize(size_t new_size) + { + auto current_size = size(); + if (new_size <= current_size) + return; + size_t new_capacity = new_size - 1; + std::vector<T> new_buffer(new_capacity); + /* Figure out faster way to copy*/ + size_t i = 0; + while (!empty()) + { + T& ele = front(); + pop(); + new_buffer[i++] = ele; + } + m_buffer = new_buffer; + m_capacity = new_capacity; + m_tail = 0; + m_head = current_size; + } + void push(T ele) + { + if (full()) + { + assert(size() == m_capacity - 1); + resize(size() + 1024); + } + m_buffer[m_head] = ele; + m_head = (m_head + 1) % m_capacity; + } + void push_front(T ele) + { + if (full()) + { + resize(size() + 1024); + } + if (m_tail == 0) + m_tail = m_capacity - 1; + else + m_tail--; + m_buffer[m_tail] = ele; + } + T& front() + { + assert(!empty()); + return m_buffer[m_tail]; + } + void pop() + { + assert(!empty()); + m_tail = (m_tail + 1) % m_capacity; + } + size_t size() + { + if (m_head < m_tail) + { + return m_capacity - m_tail + m_head; + } + else + { + return m_head - m_tail; + } + } + + /*Iterator over elements in queue.*/ + class iterator + { + size_t m_pos; + circular_queue<T>* m_queue; + public: + explicit iterator(size_t pos , circular_queue<T>* q) : m_pos(pos), m_queue(q) {} + iterator& operator++() + { + m_pos = (m_pos + 1) % m_queue->m_capacity; + return *this; + } + iterator operator++(int) + { + iterator retval = *this; + ++(*this); + return retval; + } + bool operator==(iterator other) const + { + 
return m_pos == other.m_pos; + } + bool operator!=(iterator other) const + { + return !(*this == other); + } + T& operator*() const + { + return m_queue->m_buffer[m_pos]; + } + }; + + iterator begin() + { + return iterator(m_tail, this); + } + iterator end() + { + return iterator(m_head, this); + } +private: + size_t m_capacity; + std::vector<T> m_buffer; + size_t m_head; + size_t m_tail; + + +}; + +/* Doubly linked list. Intrusive, + requires element to have m_next and m_prev pointers. +*/ +template<typename T> class doubly_linked_list +{ +public: + T* m_first; + T* m_last; + size_t m_count; + doubly_linked_list():m_first(),m_last(),m_count() + {} + void check() + { + assert(!m_first || !m_first->m_prev); + assert(!m_last || !m_last->m_next); + assert((!m_first && !m_last && m_count == 0) + || (m_first != 0 && m_last != 0 && m_count > 0)); + T* current = m_first; + for(size_t i=1; i< m_count;i++) + { + current = current->m_next; + } + assert(current == m_last); + current = m_last; + for (size_t i = 1; i < m_count; i++) + { + current = current->m_prev; + } + assert(current == m_first); + } + T* front() + { + return m_first; + } + size_t size() + { + return m_count; + } + void push_back(T* ele) + { + ele->m_prev = m_last; + if (m_last) + m_last->m_next = ele; + + ele->m_next = 0; + m_last = ele; + if (!m_first) + m_first = m_last; + + m_count++; + } + T* back() + { + return m_last; + } + bool empty() + { + return m_count == 0; + } + void pop_back() + { + m_last = m_last->m_prev; + if (m_last) + m_last->m_next = 0; + else + m_first = 0; + m_count--; + } + bool contains(T* ele) + { + if (!ele) + return false; + T* current = m_first; + while(current) + { + if(current == ele) + return true; + current = current->m_next; + } + return false; + } + + void erase(T* ele) + { + assert(contains(ele)); + + if (ele == m_first) + { + m_first = ele->m_next; + if (m_first) + m_first->m_prev = 0; + else + m_last = 0; + } + else if (ele == m_last) + { + assert(ele->m_prev); + m_last = 
ele->m_prev; + m_last->m_next = 0; + } + else + { + assert(ele->m_next); + assert(ele->m_prev); + ele->m_next->m_prev = ele->m_prev; + ele->m_prev->m_next = ele->m_next; + } + m_count--; + } +}; + +} diff --git a/tpool/tpool_win.cc b/tpool/tpool_win.cc new file mode 100644 index 00000000000..623b1b5577b --- /dev/null +++ b/tpool/tpool_win.cc @@ -0,0 +1,291 @@ +/* Copyright(C) 2019 MariaDB + +This program is free software; you can redistribute itand /or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/ + +#include "tpool_structs.h" +#include <stdlib.h> +#include <tpool.h> +#include <windows.h> +#include <atomic> + +/** + Implementation of tpool/aio based on Windows native threadpool. +*/ + +namespace tpool +{ +/** + Pool, based on Windows native(Vista+) threadpool. +*/ +class thread_pool_win : public thread_pool +{ + /** + Handle per-thread init/term functions. + Since it is Windows that creates thread, and not us, + it is tricky. We employ thread local storage data + and check whether init function was called, inside every callback. + */ + struct tls_data + { + thread_pool_win *m_pool; + ~tls_data() + { + /* Call thread termination function. 
*/ + if (!m_pool) + return; + + if (m_pool->m_worker_destroy_callback) + m_pool->m_worker_destroy_callback(); + + m_pool->m_thread_count--; + } + /** This needs to be called before every IO or simple task callback.*/ + void callback_prolog(thread_pool_win* pool) + { + assert(pool); + assert(!m_pool || (m_pool == pool)); + if (m_pool) + { + // TLS data already initialized. + return; + } + m_pool = pool; + m_pool->m_thread_count++; + // Call the thread init function. + if (m_pool->m_worker_init_callback) + m_pool->m_worker_init_callback(); + } + }; + + static thread_local struct tls_data tls_data; + /** Timer */ + class native_timer : public timer + { + std::mutex m_mtx; // protects against parallel execution + std::mutex m_shutdown_mtx; // protects m_on + PTP_TIMER m_ptp_timer; + callback_func m_func; + void *m_data; + thread_pool_win& m_pool; + int m_period; + bool m_on; + + static void CALLBACK timer_callback(PTP_CALLBACK_INSTANCE callback_instance, void *context, + PTP_TIMER callback_timer) + { + native_timer *timer= (native_timer *) context; + tls_data.callback_prolog(&timer->m_pool); + std::unique_lock<std::mutex> lk(timer->m_mtx, std::defer_lock); + if (!lk.try_lock()) + { + /* Do not try to run timers in parallel */ + return; + } + timer->m_func(timer->m_data); + if (timer->m_period) + timer->set_time(timer->m_period, timer->m_period); + } + + public: + native_timer(thread_pool_win& pool, callback_func func, void* data) : + m_mtx(), m_func(func), m_data(data), m_pool(pool), m_period(), m_on(true) + { + m_ptp_timer= CreateThreadpoolTimer(timer_callback, this, &pool.m_env); + } + void set_time(int initial_delay_ms, int period_ms) override + { + std::unique_lock<std::mutex> lk(m_shutdown_mtx); + if (!m_on) + return; + long long initial_delay = -10000LL * initial_delay_ms; + SetThreadpoolTimer(m_ptp_timer, NULL, 0, 0); + SetThreadpoolTimer(m_ptp_timer, (PFILETIME)&initial_delay, 0, 100); + m_period = period_ms; + } + void disarm() override + { + 
std::unique_lock<std::mutex> lk(m_shutdown_mtx); + m_on = false; + SetThreadpoolTimer(m_ptp_timer, NULL , 0, 0); + lk.unlock(); + /* Don't do it in timer callback, that will hang*/ + WaitForThreadpoolTimerCallbacks(m_ptp_timer, TRUE); + } + + ~native_timer() + { + disarm(); + CloseThreadpoolTimer(m_ptp_timer); + } + }; + /** AIO handler */ + class native_aio : public aio + { + thread_pool_win& m_pool; + + public: + native_aio(thread_pool_win &pool, int max_io) + : m_pool(pool) + { + } + + /** + Submit async IO. + */ + virtual int submit_io(aiocb* cb) override + { + memset((OVERLAPPED *)cb, 0, sizeof(OVERLAPPED)); + + ULARGE_INTEGER uli; + uli.QuadPart = cb->m_offset; + cb->Offset = uli.LowPart; + cb->OffsetHigh = uli.HighPart; + cb->m_internal = this; + StartThreadpoolIo(cb->m_fh.m_ptp_io); + + BOOL ok; + if (cb->m_opcode == aio_opcode::AIO_PREAD) + ok = ReadFile(cb->m_fh.m_handle, cb->m_buffer, cb->m_len, 0, cb); + else + ok = WriteFile(cb->m_fh.m_handle, cb->m_buffer, cb->m_len, 0, cb); + + if (ok || (GetLastError() == ERROR_IO_PENDING)) + return 0; + + CancelThreadpoolIo(cb->m_fh.m_ptp_io); + return -1; + } + + /** + PTP_WIN32_IO_CALLBACK-typed function, required parameter for + CreateThreadpoolIo(). The user callback and other auxiliary data is put into + the extended OVERLAPPED parameter. + */ + static void CALLBACK io_completion_callback(PTP_CALLBACK_INSTANCE instance, + PVOID context, PVOID overlapped, + ULONG io_result, ULONG_PTR nbytes, + PTP_IO io) + { + aiocb* cb = (aiocb*)overlapped; + native_aio* aio = (native_aio*)cb->m_internal; + tls_data.callback_prolog(&aio->m_pool); + cb->m_err = io_result; + cb->m_ret_len = (int)nbytes; + cb->m_internal_task.m_func = cb->m_callback; + cb->m_internal_task.m_group = cb->m_group; + cb->m_internal_task.m_arg = cb; + cb->m_internal_task.execute(); + } + + /** + Binds the file handle via CreateThreadpoolIo(). 
+ */ + virtual int bind(native_file_handle& fd) override + { + fd.m_ptp_io = + CreateThreadpoolIo(fd.m_handle, io_completion_callback, 0, &(m_pool.m_env)); + if (fd.m_ptp_io) + return 0; + return -1; + } + + /** + Unbind the file handle via CloseThreadpoolIo. + */ + virtual int unbind(const native_file_handle& fd) override + { + if (fd.m_ptp_io) + CloseThreadpoolIo(fd.m_ptp_io); + return 0; + } + }; + + PTP_POOL m_ptp_pool; + TP_CALLBACK_ENVIRON m_env; + PTP_CLEANUP_GROUP m_cleanup; + const int TASK_CACHE_SIZE= 10000; + + struct task_cache_entry + { + thread_pool_win *m_pool; + task* m_task; + }; + cache<task_cache_entry> m_task_cache; + std::atomic<int> m_thread_count; +public: + thread_pool_win(int min_threads= 0, int max_threads= 0) + : m_task_cache(TASK_CACHE_SIZE),m_thread_count(0) + { + InitializeThreadpoolEnvironment(&m_env); + m_ptp_pool= CreateThreadpool(NULL); + m_cleanup= CreateThreadpoolCleanupGroup(); + SetThreadpoolCallbackPool(&m_env, m_ptp_pool); + SetThreadpoolCallbackCleanupGroup(&m_env, m_cleanup, 0); + if (min_threads) + SetThreadpoolThreadMinimum(m_ptp_pool, min_threads); + if (max_threads) + SetThreadpoolThreadMaximum(m_ptp_pool, max_threads); + } + ~thread_pool_win() + { + CloseThreadpoolCleanupGroupMembers(m_cleanup, TRUE, NULL); + CloseThreadpoolCleanupGroup(m_cleanup); + CloseThreadpool(m_ptp_pool); + + // Wait until all threads finished and TLS destructors ran. 
+ while(m_thread_count) + Sleep(1); + } + /** + PTP_SIMPLE_CALLBACK-typed function, used by TrySubmitThreadpoolCallback() + */ + static void CALLBACK task_callback(PTP_CALLBACK_INSTANCE, void *param) + { + auto entry= (task_cache_entry *) param; + auto task= entry->m_task; + + tls_data.callback_prolog(entry->m_pool); + + entry->m_pool->m_task_cache.put(entry); + + task->execute(); + } + virtual void submit_task(task *task) override + { + auto entry= m_task_cache.get(); + task->add_ref(); + entry->m_pool= this; + entry->m_task= task; + if (!TrySubmitThreadpoolCallback(task_callback, entry, &m_env)) + abort(); + } + + aio *create_native_aio(int max_io) override + { + return new native_aio(*this, max_io); + } + + timer* create_timer(callback_func func, void* data) override + { + return new native_timer(*this, func, data); + } +}; + +thread_local struct thread_pool_win::tls_data thread_pool_win::tls_data; + +thread_pool *create_thread_pool_win(int min_threads, int max_threads) +{ + return new thread_pool_win(min_threads, max_threads); +} +} // namespace tpool |