summaryrefslogtreecommitdiff
path: root/tpool
diff options
context:
space:
mode:
authorVladislav Vaintroub <wlad@mariadb.com>2019-10-29 18:17:24 +0100
committerVladislav Vaintroub <wlad@mariadb.com>2019-11-15 16:50:22 +0100
commit00ee8d85c925846acc76df2a6fc7c67a062c2ea6 (patch)
tree3b8c71b64f089d3a625eeda5c3c188098d50b07b /tpool
parent7e08dd85d6271be4750c5989ccd5053df281d2aa (diff)
downloadmariadb-git-00ee8d85c925846acc76df2a6fc7c67a062c2ea6.tar.gz
MDEV-16264: Add threadpool library
The library is capable of - asynchronous execution of tasks (and optionally waiting for them) - asynchronous file IO This is implemented using libaio on Linux and completion ports on Windows. Elsewhere, async io is "simulated", which means worker threads are performing synchronous IO. - timers, scheduling work asynchronously in some point of the future. Also periodic timers are implemented.
Diffstat (limited to 'tpool')
-rw-r--r--tpool/CMakeLists.txt28
-rw-r--r--tpool/aio_linux.cc157
-rw-r--r--tpool/aio_simulated.cc180
-rw-r--r--tpool/aio_win.cc139
-rw-r--r--tpool/task.cc68
-rw-r--r--tpool/task_group.cc90
-rw-r--r--tpool/tpool.h241
-rw-r--r--tpool/tpool_generic.cc770
-rw-r--r--tpool/tpool_structs.h353
-rw-r--r--tpool/tpool_win.cc291
10 files changed, 2317 insertions, 0 deletions
diff --git a/tpool/CMakeLists.txt b/tpool/CMakeLists.txt
new file mode 100644
index 00000000000..fc33f9b6932
--- /dev/null
+++ b/tpool/CMakeLists.txt
@@ -0,0 +1,28 @@
+# Build script for the static "tpool" (thread pool / async IO) library.
+INCLUDE_DIRECTORIES(${CMAKE_CURRENT_SOURCE_DIR})
+# Platform specific sources: Windows gets its own pool and AIO
+# implementation, every other platform compiles aio_linux.cc.
+IF(WIN32)
+ SET(EXTRA_SOURCES tpool_win.cc aio_win.cc)
+ELSE()
+ SET(EXTRA_SOURCES aio_linux.cc)
+ENDIF()
+
+# Native AIO on Linux is enabled only if both the libaio header and the
+# library are found. In that case LINUX_NATIVE_AIO is defined and libaio
+# is linked.
+IF(CMAKE_SYSTEM_NAME STREQUAL "Linux")
+ CHECK_INCLUDE_FILES (libaio.h HAVE_LIBAIO_H)
+ CHECK_LIBRARY_EXISTS(aio io_queue_init "" HAVE_LIBAIO)
+ IF(HAVE_LIBAIO_H AND HAVE_LIBAIO)
+ ADD_DEFINITIONS(-DLINUX_NATIVE_AIO=1)
+ LINK_LIBRARIES(aio)
+ ENDIF()
+ENDIF()
+
+# Sources common to all platforms, plus the platform specific ones.
+ADD_LIBRARY(tpool STATIC
+ aio_simulated.cc
+ tpool_structs.h
+ CMakeLists.txt
+ tpool.h
+ tpool_generic.cc
+ task_group.cc
+ task.cc
+ ${EXTRA_SOURCES}
+)
+
+# Project-wide include directory, in addition to this directory.
+INCLUDE_DIRECTORIES(${PROJECT_SOURCE_DIR}/include) \ No newline at end of file
diff --git a/tpool/aio_linux.cc b/tpool/aio_linux.cc
new file mode 100644
index 00000000000..0a4820a2412
--- /dev/null
+++ b/tpool/aio_linux.cc
@@ -0,0 +1,157 @@
+/* Copyright(C) 2019 MariaDB Corporation.
+
+This program is free software; you can redistribute itand /or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
+
+#include "tpool_structs.h"
+
+#include <stdlib.h>
+#include <signal.h>
+#include <assert.h>
+#include "tpool.h"
+#include <thread>
+#ifdef LINUX_NATIVE_AIO
+#include <libaio.h>
+#endif
+/*
+ Linux AIO implementation, based on native AIO.
+ Needs libaio.h and -laio at the compile time.
+
+ submit_io() is used to submit async IO.
+
+ There is a single thread that collects the completion notifications
+ with io_getevents(), and forwards each io completion callback
+ to the worker threadpool.
+*/
+namespace tpool
+{
+#ifdef LINUX_NATIVE_AIO
+
+/**
+  AIO backend on top of Linux native AIO (libaio).
+
+  Requests are submitted with io_submit(). A single dedicated thread
+  collects completions with io_getevents() and submits the completion
+  callback of each finished request to the thread pool as a task.
+*/
+class aio_linux : public aio
+{
+  int m_max_io_count;
+  thread_pool* m_pool;
+  io_context_t m_io_ctx;
+  /*
+    Written by the destructor and polled by the getevents thread,
+    therefore atomic rather than a plain bool.
+  */
+  std::atomic<bool> m_in_shutdown;
+  /* Declared last, so that all other members exist when the thread starts. */
+  std::thread m_getevent_thread;
+
+  /**
+    Body of the completion collector thread.
+
+    Waits for one completion at a time, stores the result of the IO in
+    the control block (m_ret_len / m_err) and submits the callback to
+    the thread pool. Returns once shutdown was requested.
+
+    @param aio  the aio_linux instance that owns this thread
+  */
+  static void getevent_thread_routine(aio_linux* aio)
+  {
+    for (;;)
+    {
+      io_event event;
+      /* 0.5s timeout, so that the shutdown flag is checked regularly. */
+      struct timespec ts{0, 500000000};
+      int ret = io_getevents(aio->m_io_ctx, 1, 1, &event, &ts);
+
+      if (aio->m_in_shutdown)
+        break;
+
+      if (ret > 0)
+      {
+        aiocb* cb = (aiocb*)event.obj;
+        /*
+          event.res is the number of bytes transferred, or a negated errno
+          value. libaio declares the field as unsigned long; convert it to
+          the signed type of the same width first, so that the sign test
+          below also works where long is narrower than long long.
+        */
+        long long res = (long)event.res;
+        if (res < 0)
+        {
+          cb->m_err = (int)-res;
+          cb->m_ret_len = 0;
+        }
+        else
+        {
+          /* The transferred length is in res; ret is only the event count. */
+          cb->m_ret_len = (int)res;
+          cb->m_err = 0;
+        }
+
+        cb->m_internal_task.m_func = cb->m_callback;
+        cb->m_internal_task.m_arg = cb;
+        cb->m_internal_task.m_group = cb->m_group;
+        aio->m_pool->submit_task(&cb->m_internal_task);
+        continue;
+      }
+      switch (ret)
+      {
+      case -EAGAIN:
+        usleep(1000);
+        continue;
+      case -EINTR:
+      case 0:
+        /* Interrupted or timed out, just retry. */
+        continue;
+      default:
+        fprintf(stderr, "io_getevents returned %d\n", ret);
+        abort();
+      }
+    }
+  }
+
+public:
+  /**
+    @param ctx        io context, already initialized with io_setup();
+                      destroyed by the destructor
+    @param pool       thread pool that executes the completion callbacks
+    @param max_count  maximum number of IOs the context was set up for
+  */
+  aio_linux(io_context_t ctx, thread_pool* pool, size_t max_count)
+    : m_max_io_count(max_count), m_pool(pool), m_io_ctx(ctx),
+    m_in_shutdown(false), m_getevent_thread(getevent_thread_routine, this)
+  {
+  }
+
+  /** Stops the collector thread, then destroys the io context. */
+  ~aio_linux()
+  {
+    m_in_shutdown = true;
+    m_getevent_thread.join();
+    io_destroy(m_io_ctx);
+  }
+
+  /**
+    Submit a single read or write request.
+
+    @param cb  control block; anything other than AIO_PREAD is submitted
+               as a write
+    @return 0 on success, -1 on error (errno is set from the io_submit()
+            return value)
+  */
+  virtual int submit_io(aiocb* cb) override
+  {
+
+    if (cb->m_opcode == aio_opcode::AIO_PREAD)
+      io_prep_pread((iocb *)cb, cb->m_fh, cb->m_buffer, cb->m_len,
+        cb->m_offset);
+    else
+      io_prep_pwrite((iocb *)cb, cb->m_fh, cb->m_buffer, cb->m_len,
+        cb->m_offset);
+
+    int ret;
+    ret = io_submit(m_io_ctx, 1, (iocb * *)& cb);
+    if (ret == 1)
+      return 0;
+    errno = -ret;
+    return -1;
+  }
+
+  /* Binding a file is a no-op in this implementation. */
+  virtual int bind(native_file_handle& fd) override
+  {
+    return 0;
+  }
+  /* Unbinding a file is a no-op in this implementation. */
+  virtual int unbind(const native_file_handle& fd) override
+  {
+    return 0;
+  }
+};
+
+/**
+  Create the native Linux AIO handler.
+
+  @param pool    thread pool that will execute the completion callbacks
+  @param max_io  number of events passed to io_setup()
+  @return new aio_linux object, or nullptr if io_setup() failed
+*/
+aio* create_linux_aio(thread_pool* pool, int max_io)
+{
+  io_context_t io_ctx;
+  memset(&io_ctx, 0, sizeof(io_ctx));
+
+  const int rc = io_setup(max_io, &io_ctx);
+  if (rc == 0)
+    return new aio_linux(io_ctx, pool, max_io);
+
+  fprintf(stderr, "io_setup(%d) returned %d\n", max_io, rc);
+  return nullptr;
+}
+#else
+/*
+  Variant compiled when LINUX_NATIVE_AIO is not defined: there is no native
+  implementation, so nullptr is returned. Both parameters are unused.
+*/
+aio* create_linux_aio(thread_pool* pool, int max_aio)
+{
+ return nullptr;
+}
+#endif
+}
diff --git a/tpool/aio_simulated.cc b/tpool/aio_simulated.cc
new file mode 100644
index 00000000000..41c4897336d
--- /dev/null
+++ b/tpool/aio_simulated.cc
@@ -0,0 +1,180 @@
+/* Copyright(C) 2019 MariaDB Corporation.
+
+This program is free software; you can redistribute itand /or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
+
+#ifndef _WIN32
+#include <unistd.h> /* pread(), pwrite() */
+#endif
+#include "tpool.h"
+#include "tpool_structs.h"
+#include <stdlib.h>
+#include <string.h>
+
+namespace tpool
+{
+#ifdef _WIN32
+
+/*
+ In order to be able to execute synchronous IO even on a file opened
+ with FILE_FLAG_OVERLAPPED, and to bypass the completion port,
+ we use a valid event handle for the hEvent member of the OVERLAPPED structure,
+ with its low-order bit set.
+
+ See MSDN docs for GetQueuedCompletionStatus() for description of this trick.
+*/
+/* FLS slot that holds the per-thread (per-fiber) event for synchronous IO. */
+static DWORD fls_sync_io= FLS_OUT_OF_INDEXES;
+
+/**
+  Return the event handle to put into OVERLAPPED::hEvent for synchronous IO.
+
+  The event is created on first use and cached in the fls_sync_io slot.
+  The returned handle has its low-order bit set.
+*/
+HANDLE win_get_syncio_event()
+{
+  HANDLE cached= (HANDLE) FlsGetValue(fls_sync_io);
+  if (!cached)
+  {
+    HANDLE event= CreateEventA(NULL, FALSE, FALSE, NULL);
+    /* The low-order bit keeps the IO completion from being queued */
+    cached= (HANDLE)((uintptr_t) event | 1);
+    FlsSetValue(fls_sync_io, cached);
+  }
+  return cached;
+}
+#include <WinIoCtl.h>
+/*
+  Callback registered with FlsAlloc(): closes the event handle
+  stored in the slot, if there is one.
+*/
+static void __stdcall win_free_syncio_event(void *data)
+{
+  if (!data)
+    return;
+  CloseHandle((HANDLE) data);
+}
+
+/*
+  Allocates the FLS slot for the synchronous IO event when constructed
+  (aborting if no slot is available), and frees it when destroyed.
+*/
+struct WinIoInit
+{
+  WinIoInit()
+  {
+    const DWORD slot= FlsAlloc(win_free_syncio_event);
+    if (slot == FLS_OUT_OF_INDEXES)
+      abort();
+    fls_sync_io= slot;
+  }
+  ~WinIoInit()
+  {
+    FlsFree(fls_sync_io);
+  }
+};
+
+/* The static instance performs the slot allocation and release. */
+static WinIoInit win_io_init;
+
+
+/**
+  Positional read that waits for completion, using the per-thread
+  synchronous IO event (see win_get_syncio_event()).
+
+  @param h       file handle
+  @param buf     destination buffer
+  @param count   number of bytes to read (passed to ReadFile() as DWORD)
+  @param offset  file offset to read from
+  @return number of bytes read, or -1 on error
+*/
+int pread(const native_file_handle &h, void *buf, size_t count,
+          unsigned long long offset)
+{
+  OVERLAPPED ov{};
+  ov.Offset= (DWORD) (offset & 0xFFFFFFFFULL);
+  ov.OffsetHigh= (DWORD) (offset >> 32);
+  ov.hEvent= win_get_syncio_event();
+
+  const bool started= ReadFile(h, buf, (DWORD) count, 0, &ov) ||
+                      GetLastError() == ERROR_IO_PENDING;
+  if (!started)
+    return -1;
+
+  /* Block until the operation has finished. */
+  DWORD transferred;
+  if (!GetOverlappedResult(h, &ov, &transferred, TRUE))
+    return -1;
+  return transferred;
+}
+
+/**
+  Positional write that waits for completion, using the per-thread
+  synchronous IO event (see win_get_syncio_event()).
+
+  @param h       file handle
+  @param buf     source buffer
+  @param count   number of bytes to write (passed to WriteFile() as DWORD)
+  @param offset  file offset to write at
+  @return number of bytes written, or -1 on error
+*/
+int pwrite(const native_file_handle &h, void *buf, size_t count,
+           unsigned long long offset)
+{
+  OVERLAPPED ov{};
+  ov.Offset= (DWORD) (offset & 0xFFFFFFFFULL);
+  ov.OffsetHigh= (DWORD) (offset >> 32);
+  ov.hEvent= win_get_syncio_event();
+
+  const bool started= WriteFile(h, buf, (DWORD) count, 0, &ov) ||
+                      GetLastError() == ERROR_IO_PENDING;
+  if (!started)
+    return -1;
+
+  /* Block until the operation has finished. */
+  DWORD transferred;
+  if (!GetOverlappedResult(h, &ov, &transferred, TRUE))
+    return -1;
+  return transferred;
+}
+#endif
+
+/**
+ Simulated AIO.
+
+ Executes IO synchronously in worker pool
+ and then calls the completion routine.
+*/
+/**
+  AIO handler that performs the IO synchronously, with pread()/pwrite(),
+  inside a thread pool task, and calls the completion callback afterwards.
+*/
+class simulated_aio : public aio
+{
+  thread_pool *m_pool;
+
+public:
+  simulated_aio(thread_pool *tp)
+    : m_pool(tp)
+  {
+  }
+
+  /**
+    Task function: performs the IO described by the control block,
+    stores the result in m_ret_len / m_err and calls m_callback.
+
+    @param param  the aiocb of the request
+  */
+  static void simulated_aio_callback(void *param)
+  {
+    aiocb *req= (aiocb *) param;
+    int n_bytes;
+    switch (req->m_opcode)
+    {
+    case aio_opcode::AIO_PREAD:
+      n_bytes= pread(req->m_fh, req->m_buffer, req->m_len, req->m_offset);
+      break;
+    case aio_opcode::AIO_PWRITE:
+      n_bytes= pwrite(req->m_fh, req->m_buffer, req->m_len, req->m_offset);
+      break;
+    default:
+      abort();
+    }
+
+    /* The error code is only fetched if the IO failed. */
+    int error= 0;
+    if (n_bytes < 0)
+    {
+#ifdef _WIN32
+      error= GetLastError();
+#else
+      error= errno;
+#endif
+    }
+    req->m_ret_len = n_bytes;
+    req->m_err = error;
+    req->m_callback(req);
+  }
+
+  /**
+    Queue the request as a task in the thread pool.
+    @return always 0
+  */
+  virtual int submit_io(aiocb *aiocb) override
+  {
+    task &io_task= aiocb->m_internal_task;
+    io_task.m_func = simulated_aio_callback;
+    io_task.m_arg = aiocb;
+    io_task.m_group = aiocb->m_group;
+    m_pool->submit_task(&io_task);
+    return 0;
+  }
+
+  /* Nothing to bind or unbind in this implementation. */
+  virtual int bind(native_file_handle &fd) override { return 0; }
+  virtual int unbind(const native_file_handle &fd) override { return 0; }
+};
+
+/**
+  Create a simulated AIO handler that executes IO in the given thread pool.
+  The object is allocated with new.
+*/
+aio *create_simulated_aio(thread_pool *tp)
+{
+ return new simulated_aio(tp);
+}
+
+} // namespace tpool \ No newline at end of file
diff --git a/tpool/aio_win.cc b/tpool/aio_win.cc
new file mode 100644
index 00000000000..b44f705bd1e
--- /dev/null
+++ b/tpool/aio_win.cc
@@ -0,0 +1,139 @@
+/* Copyright(C) 2019 MariaDB Corporation.
+
+This program is free software; you can redistribute itand /or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
+
+#include "tpool_structs.h"
+#include <algorithm>
+#include <assert.h>
+#include <condition_variable>
+#include <iostream>
+#include <limits.h>
+#include <mutex>
+#include <queue>
+#include <stack>
+#include <thread>
+#include <vector>
+#include <tpool.h>
+
+namespace tpool
+{
+
+/*
+ Windows AIO implementation, completion port based.
+ A single thread collects the completion notifications with
+ GetQueuedCompletionStatus(), and forwards each io completion callback
+ to the worker threadpool.
+*/
+/**
+  AIO handler based on a Windows IO completion port.
+
+  Files are associated with the port in bind(). A dedicated thread
+  dequeues completion packets and submits the completion callback of
+  each request to the thread pool as a task.
+*/
+class tpool_generic_win_aio : public aio
+{
+  /* Thread that collects completion status from the completion port. */
+  std::thread m_thread;
+
+  /* IOCP Completion port.*/
+  HANDLE m_completion_port;
+
+  /* The worker pool where completion routine is executed, as task. */
+  thread_pool* m_pool;
+public:
+  /**
+    Creates the completion port and starts the collector thread.
+    max_io is not used by this implementation.
+  */
+  tpool_generic_win_aio(thread_pool* pool, int max_io) : m_pool(pool)
+  {
+    m_completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
+    m_thread = std::thread(aio_completion_thread_proc, this);
+  }
+
+  /**
+    Task to be executed in the work pool.
+  */
+  static void io_completion_task(void* data)
+  {
+    auto cb = (aiocb*)data;
+    cb->execute_callback();
+  }
+
+  /**
+    Main loop of the collector thread.
+
+    Dequeues completion packets until no packet can be dequeued any more
+    (which is what happens once the destructor has closed the port).
+    For every packet, the result of the IO is stored in the control
+    block (m_ret_len / m_err) and the callback is submitted to the pool.
+  */
+  void completion_thread_work()
+  {
+    for (;;)
+    {
+      DWORD n_bytes = 0;
+      ULONG_PTR key;
+      LPOVERLAPPED ov = nullptr;
+      const BOOL ok = GetQueuedCompletionStatus(m_completion_port, &n_bytes,
+                                                &key, &ov, INFINITE);
+      /* Fetch the error right away, before any other API call. */
+      const DWORD dequeue_err = ok ? 0 : GetLastError();
+
+      /*
+        Without an OVERLAPPED pointer no packet was dequeued, e.g. because
+        the port was closed. A failure with a non-null pointer is the
+        completion of a failed IO, which still has to be reported to its
+        callback rather than terminating this thread.
+      */
+      if (!ov)
+        break;
+
+      aiocb* cb = static_cast<aiocb*>(ov);
+      cb->m_ret_len = n_bytes;
+      cb->m_err = dequeue_err;
+
+      if (ok && n_bytes != cb->m_len)
+      {
+        /*
+          Partial transfer. GetOverlappedResult() returns nonzero on
+          success, so an error is recorded only if it fails.
+        */
+        DWORD transferred = 0;
+        if (GetOverlappedResult(cb->m_fh, cb, &transferred, FALSE))
+          cb->m_ret_len = transferred;
+        else
+          cb->m_err = GetLastError();
+      }
+      cb->m_internal_task.m_func = cb->m_callback;
+      cb->m_internal_task.m_arg = cb;
+      cb->m_internal_task.m_group = cb->m_group;
+      m_pool->submit_task(&cb->m_internal_task);
+    }
+  }
+
+  /* Thread entry point, forwards to completion_thread_work(). */
+  static void aio_completion_thread_proc(tpool_generic_win_aio* aio)
+  {
+    aio->completion_thread_work();
+  }
+
+  /* Closes the completion port, then waits for the collector thread. */
+  ~tpool_generic_win_aio()
+  {
+    if (m_completion_port)
+      CloseHandle(m_completion_port);
+    m_thread.join();
+  }
+
+  /**
+    Start an overlapped read or write.
+
+    @param cb  control block; anything other than AIO_PREAD is submitted
+               as a write
+    @return 0 if the IO completed or is pending, -1 on error
+  */
+  virtual int submit_io(aiocb* cb) override
+  {
+    memset((OVERLAPPED *)cb, 0, sizeof(OVERLAPPED));
+    cb->m_internal = this;
+    ULARGE_INTEGER uli;
+    uli.QuadPart = cb->m_offset;
+    cb->Offset = uli.LowPart;
+    cb->OffsetHigh = uli.HighPart;
+
+    BOOL ok;
+    if (cb->m_opcode == aio_opcode::AIO_PREAD)
+      ok = ReadFile(cb->m_fh.m_handle, cb->m_buffer, cb->m_len, 0, cb);
+    else
+      ok = WriteFile(cb->m_fh.m_handle, cb->m_buffer, cb->m_len, 0, cb);
+
+    if (ok || (GetLastError() == ERROR_IO_PENDING))
+      return 0;
+    return -1;
+  }
+
+  /**
+    Associate the file with the completion port.
+    @return 0 on success, otherwise the value of GetLastError()
+  */
+  virtual int bind(native_file_handle& fd) override
+  {
+    return CreateIoCompletionPort(fd, m_completion_port, 0, 0) ? 0
+      : GetLastError();
+  }
+  /* Nothing to do on unbind. */
+  virtual int unbind(const native_file_handle& fd) override { return 0; }
+};
+
+/**
+  Create the completion port based AIO handler for the given pool.
+  The object is allocated with new; max_io is passed through to the
+  constructor.
+*/
+aio* create_win_aio(thread_pool* pool, int max_io)
+{
+ return new tpool_generic_win_aio(pool, max_io);
+}
+
+} // namespace tpool
diff --git a/tpool/task.cc b/tpool/task.cc
new file mode 100644
index 00000000000..6d456aa6f30
--- /dev/null
+++ b/tpool/task.cc
@@ -0,0 +1,68 @@
+/* Copyright(C) 2019 MariaDB Corporation.
+
+This program is free software; you can redistribute itand /or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
+
+#include <tpool.h>
+#include <queue>
+#include <mutex>
+#include <condition_variable>
+#include <tpool_structs.h>
+
+namespace tpool
+{
+ /* Stores the callback, its argument and the (optional) task group. */
+ task::task(callback_func func, void* arg, task_group* group) :
+ m_func(func), m_arg(arg), m_group(group) {}
+
+  /*
+    Run the task. A task without a group calls its callback directly and
+    then release(); a task that belongs to a group is handed over to
+    task_group::execute().
+  */
+  void task::execute()
+  {
+    if (!m_group)
+    {
+      m_func(m_arg);
+      release();
+      return;
+    }
+    /* The group limits concurrency, and may queue the task. */
+    m_group->execute(this);
+  }
+
+ /*
+ Task that provides a wait() operation.
+ The reference count and the waiter count both start at zero.
+ */
+ waitable_task::waitable_task(callback_func func, void* arg, task_group* group) :
+ task(func,arg, group),m_mtx(),m_cv(),m_ref_count(),m_waiter_count(){}
+
+  /* Increment the reference count, under the task's mutex. */
+  void waitable_task::add_ref()
+  {
+    std::lock_guard<std::mutex> guard(m_mtx);
+    ++m_ref_count;
+  }
+
+  /*
+    Decrement the reference count. When it drops to zero and there are
+    threads blocked in wait(), wake all of them.
+  */
+  void waitable_task::release()
+  {
+    std::lock_guard<std::mutex> guard(m_mtx);
+    --m_ref_count;
+    if (m_ref_count == 0 && m_waiter_count != 0)
+      m_cv.notify_all();
+  }
+  /* Block the caller until the reference count is zero. */
+  void waitable_task::wait()
+  {
+    std::unique_lock<std::mutex> guard(m_mtx);
+    ++m_waiter_count;
+    m_cv.wait(guard, [this] { return m_ref_count == 0; });
+    --m_waiter_count;
+  }
+
+} \ No newline at end of file
diff --git a/tpool/task_group.cc b/tpool/task_group.cc
new file mode 100644
index 00000000000..d65c658a198
--- /dev/null
+++ b/tpool/task_group.cc
@@ -0,0 +1,90 @@
+/* Copyright(C) 2019 MariaDB Corporation.
+
+This program is free software; you can redistribute itand /or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
+
+#include <tpool.h>
+#include <queue>
+#include <mutex>
+#include <condition_variable>
+#include <tpool_structs.h>
+#include <thread>
+namespace tpool
+{
+ /*
+ Create a group that runs at most max_concurrency tasks at a time.
+ No task is running initially. The queue is constructed with the
+ argument 8 (presumably its initial capacity, circular_queue is
+ declared in tpool_structs.h - TODO confirm).
+ */
+ task_group::task_group(unsigned int max_concurrency) :
+ m_queue(8),
+ m_mtx(),
+ m_tasks_running(),
+ m_max_concurrent_tasks(max_concurrency)
+ {};
+
+  /* Change the concurrency limit of the group, under the group mutex. */
+  void task_group::set_max_tasks(unsigned int max_concurrency)
+  {
+    std::lock_guard<std::mutex> guard(m_mtx);
+    m_max_concurrent_tasks = max_concurrency;
+  }
+  /*
+    Execute a task within the group, respecting the concurrency limit.
+
+    If the limit is reached, the task is queued and the function returns;
+    the task is then run by a thread that is already executing in this
+    group. Otherwise the calling thread runs the task and afterwards
+    drains the queue, until the queue is empty.
+
+    Queue entries may be nullptr (cancel_pending() replaces cancelled
+    tasks with nullptr); such entries are skipped.
+  */
+  void task_group::execute(task* t)
+  {
+    std::unique_lock<std::mutex> lk(m_mtx);
+    /*
+      ">=" rather than "==": set_max_tasks() can lower the limit below the
+      number of tasks that are currently running, and the group must keep
+      queueing in that case.
+    */
+    if (m_tasks_running >= m_max_concurrent_tasks)
+    {
+      /* Queue for later execution by another thread.*/
+      m_queue.push(t);
+      return;
+    }
+    m_tasks_running++;
+    for (;;)
+    {
+      /* Callbacks run without the group mutex held. */
+      lk.unlock();
+      if (t)
+      {
+        t->m_func(t->m_arg);
+        t->release();
+      }
+      lk.lock();
+
+      if (m_queue.empty())
+        break;
+      t = m_queue.front();
+      m_queue.pop();
+    }
+    m_tasks_running--;
+  }
+
+ /*
+ Cancel queued, not yet started, tasks.
+
+ With t == nullptr the whole queue is cleared. Otherwise every queue
+ entry equal to t is released and replaced by nullptr, which
+ execute() skips. Tasks that are already running are not affected.
+ */
+ void task_group::cancel_pending(task* t)
+ {
+ std::unique_lock<std::mutex> lk(m_mtx);
+ if (!t)
+ m_queue.clear();
+ /* NOTE(review): for t == nullptr this loop relies on the queue being
+ empty after clear() - confirm against circular_queue. */
+ for (auto it = m_queue.begin(); it != m_queue.end(); it++)
+ {
+ if (*it == t)
+ {
+ (*it)->release();
+ (*it) = nullptr;
+ }
+ }
+ }
+
+  /*
+    Drop all queued tasks, then wait (polling once per millisecond)
+    until no task of this group is running any more.
+  */
+  task_group::~task_group()
+  {
+    const std::chrono::milliseconds poll_interval(1);
+
+    std::unique_lock<std::mutex> guard(m_mtx);
+    m_queue.clear();
+    while (m_tasks_running != 0)
+    {
+      /* Sleep without the mutex, so that running tasks can finish. */
+      guard.unlock();
+      std::this_thread::sleep_for(poll_interval);
+      guard.lock();
+    }
+  }
+} \ No newline at end of file
diff --git a/tpool/tpool.h b/tpool/tpool.h
new file mode 100644
index 00000000000..7b504c5ef57
--- /dev/null
+++ b/tpool/tpool.h
@@ -0,0 +1,241 @@
+/* Copyright(C) 2019 MariaDB
+
+This program is free software; you can redistribute itand /or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
+
+#pragma once
+#include <memory> /* unique_ptr */
+#include <condition_variable>
+#include <mutex>
+#include <atomic>
+#include <tpool_structs.h>
+#ifdef LINUX_NATIVE_AIO
+#include <libaio.h>
+#endif
+#ifdef _WIN32
+#ifndef NOMINMAX
+#define NOMINMAX
+#endif
+#include <windows.h>
+/**
+ Windows-specific native file handle struct.
+ Apart from the actual handle, contains PTP_IO
+ used by the Windows threadpool.
+*/
+struct native_file_handle
+{
+  /* The file handle itself. */
+  HANDLE m_handle;
+  /* IO object for the Windows threadpool. */
+  PTP_IO m_ptp_io;
+  /* Both members are zero-initialized, rather than left indeterminate. */
+  native_file_handle() : m_handle(), m_ptp_io() {}
+  /* Implicit conversion from a plain HANDLE, m_ptp_io is set to null. */
+  native_file_handle(HANDLE h) : m_handle(h), m_ptp_io() {}
+  /* Allows passing the struct wherever a HANDLE is expected. */
+  operator HANDLE() const { return m_handle; }
+};
+#else
+#include <unistd.h>
+typedef int native_file_handle;
+#endif
+
+namespace tpool
+{
+/**
+ Task callback function
+ */
+typedef void (*callback_func)(void *);
+class task;
+
+/* A class that can be used e.g for
+restricting concurrency for specific class of tasks. */
+
+class task_group
+{
+private:
+ /* Tasks that wait for execution, because the limit was reached. */
+ circular_queue<task*> m_queue;
+ /* Mutex that the member functions in task_group.cc hold while
+ accessing the other members. */
+ std::mutex m_mtx;
+ std::condition_variable m_cv;
+ /* Number of threads currently executing tasks of this group. */
+ unsigned int m_tasks_running;
+ /* Concurrency limit, see set_max_tasks(). */
+ unsigned int m_max_concurrent_tasks;
+public:
+ task_group(unsigned int max_concurrency= 100000);
+ /* Change the concurrency limit. */
+ void set_max_tasks(unsigned int max_concurrent_tasks);
+ /* Run the task now, or queue it if the limit is reached. */
+ void execute(task* t);
+ /* Cancel queued instances of t; nullptr clears the whole queue. */
+ void cancel_pending(task *t);
+ ~task_group();
+};
+
+
+/**
+  Unit of work for the thread pool: a callback, its argument,
+  and an optional task group that limits concurrency.
+*/
+class task
+{
+public:
+  /* Function to call. */
+  callback_func m_func;
+  /* Argument passed to m_func. */
+  void *m_arg;
+  /* Group the task executes in, nullptr if none. */
+  task_group* m_group;
+  /* Reference counting hooks, no-ops in this base class. */
+  virtual void add_ref() {};
+  virtual void release() {};
+  /*
+    All members are zero-initialized; execute() reads m_group,
+    which therefore must not be left indeterminate.
+  */
+  task() : m_func(), m_arg(), m_group() {}
+  task(callback_func func, void* arg, task_group* group = nullptr);
+  void* get_arg() { return m_arg; }
+  callback_func get_func() { return m_func; }
+  /* Runs the callback, via the group if there is one. */
+  virtual void execute();
+  virtual ~task() {}
+};
+
+/**
+  Task with a reference count, and a wait() function that blocks
+  until the reference count has dropped to zero.
+*/
+class waitable_task :public task
+{
+  /* Protects the two counters in add_ref(), release() and wait(). */
+  std::mutex m_mtx;
+  /* Signaled by release() when the reference count reaches zero. */
+  std::condition_variable m_cv;
+  int m_ref_count;
+  /* Number of threads currently blocked in wait(). */
+  int m_waiter_count;
+public:
+  waitable_task(callback_func func, void* arg, task_group* group = nullptr);
+  void add_ref() override;
+  void release() override;
+  /* Note: reads the counter without taking the mutex. */
+  bool is_running() { return m_ref_count > 0; }
+  /*
+    Returns the counter itself. The return type is int, a bool would
+    collapse every nonzero count to 1. Reads without taking the mutex.
+  */
+  int get_ref_count() {return m_ref_count;}
+  void wait();
+  virtual ~waitable_task() {};
+};
+/** Kind of IO operation described by an aiocb. */
+enum class aio_opcode
+{
+ AIO_PREAD, /* positional read */
+ AIO_PWRITE /* positional write */
+};
+const int MAX_AIO_USERDATA_LEN= 40;
+struct aiocb;
+
+/** IO control block, includes parameters for the IO, and the callback*/
+
+/*
+ On Windows and with Linux native AIO, the struct derives from the native
+ control block (OVERLAPPED / iocb), so that a pointer to it can be passed
+ to the native API and converted back on completion.
+*/
+struct aiocb
+#ifdef _WIN32
+ :OVERLAPPED
+#elif defined LINUX_NATIVE_AIO
+ :iocb
+#endif
+{
+ /* File to operate on. */
+ native_file_handle m_fh;
+ /* Read or write. */
+ aio_opcode m_opcode;
+ /* File offset of the operation. */
+ unsigned long long m_offset;
+ /* Data buffer, and the number of bytes to transfer. */
+ void *m_buffer;
+ unsigned int m_len;
+ /* Completion callback; it is called with this aiocb as argument. */
+ callback_func m_callback;
+ /* Task group the completion callback runs in, may be null. */
+ task_group* m_group;
+ /* Returned length and error code*/
+ int m_ret_len;
+ int m_err;
+ void *m_internal;
+ /* Task object used to submit the completion to the thread pool. */
+ task m_internal_task;
+ /* Space for caller data; not accessed by the code in this header. */
+ char m_userdata[MAX_AIO_USERDATA_LEN];
+
+ /* Only m_internal_task is initialized here, the remaining members
+ have to be filled in before the request is submitted. */
+ aiocb() : m_internal_task(nullptr, nullptr)
+ {}
+ /* Run the completion callback in the calling thread, through a
+ temporary task (so that m_group is respected). */
+ void execute_callback()
+ {
+ task t(m_callback, this,m_group);
+ t.execute();
+ }
+};
+
+
+/**
+ AIO interface
+*/
+class aio
+{
+public:
+ /**
+ Submit asynchronous IO.
+ On completion, cb->m_callback is executed.
+ */
+ virtual int submit_io(aiocb *cb)= 0;
+ /** "Bind" file to AIO handler (used on Windows only) */
+ virtual int bind(native_file_handle &fd)= 0;
+ /** "Unbind" file from AIO handler (used on Windows only) */
+ virtual int unbind(const native_file_handle &fd)= 0;
+ virtual ~aio(){};
+};
+
+/**
+ Timer interface.
+*/
+class timer
+{
+public:
+ /** Arm the timer; both the initial delay and the period are given
+ in milliseconds. */
+ virtual void set_time(int initial_delay_ms, int period_ms) = 0;
+ /** Stop the timer. */
+ virtual void disarm() = 0;
+ virtual ~timer(){}
+};
+
+class thread_pool;
+
+extern aio *create_simulated_aio(thread_pool *tp);
+
+/**
+  Thread pool interface.
+
+  Concrete pools implement task submission, timer creation and creation
+  of the native AIO handler. AIO requests are forwarded to the handler
+  installed by configure_aio().
+*/
+class thread_pool
+{
+protected:
+  /* AIO handler */
+  std::unique_ptr<aio> m_aio;
+  virtual aio *create_native_aio(int max_io)= 0;
+
+  /**
+    Functions to be called at worker thread start/end
+    can be used for example to set some TLS variables
+  */
+  void (*m_worker_init_callback)(void);
+  void (*m_worker_destroy_callback)(void);
+
+public:
+  thread_pool()
+    : m_aio(nullptr),
+      m_worker_init_callback(nullptr),
+      m_worker_destroy_callback(nullptr)
+  {
+  }
+  virtual void submit_task(task *t)= 0;
+  virtual timer* create_timer(callback_func func, void *data=nullptr) = 0;
+  void set_thread_callbacks(void (*init)(), void (*destroy)())
+  {
+    m_worker_init_callback= init;
+    m_worker_destroy_callback= destroy;
+  }
+  /**
+    Install the AIO handler. If native AIO is not requested, or could
+    not be created, simulated AIO is used, unless a handler is installed
+    already.
+    @return 0 if an AIO handler is installed, -1 otherwise
+  */
+  int configure_aio(bool use_native_aio, int max_io)
+  {
+    if (use_native_aio)
+      m_aio.reset(create_native_aio(max_io));
+    if (m_aio == nullptr)
+      m_aio.reset(create_simulated_aio(this));
+    return m_aio ? 0 : -1;
+  }
+  /** Destroy the AIO handler. */
+  void disable_aio()
+  {
+    m_aio= nullptr;
+  }
+  /* The functions below forward to the AIO handler. */
+  int bind(native_file_handle &fd)
+  {
+    return m_aio->bind(fd);
+  }
+  void unbind(const native_file_handle &fd)
+  {
+    m_aio->unbind(fd);
+  }
+  int submit_io(aiocb *cb)
+  {
+    return m_aio->submit_io(cb);
+  }
+  virtual ~thread_pool() {}
+};
+/* Default thread count limits, used as default arguments of the pool
+ factory functions. */
+const int DEFAULT_MIN_POOL_THREADS= 1;
+const int DEFAULT_MAX_POOL_THREADS= 500;
+/* Factory function for the generic thread pool implementation. */
+extern thread_pool *
+create_thread_pool_generic(int min_threads= DEFAULT_MIN_POOL_THREADS,
+ int max_threads= DEFAULT_MAX_POOL_THREADS);
+#ifdef _WIN32
+extern thread_pool *
+create_thread_pool_win(int min_threads= DEFAULT_MIN_POOL_THREADS,
+ int max_threads= DEFAULT_MAX_POOL_THREADS);
+
+/*
+ Helper functions, to execute pread/pwrite even if file is
+ opened with FILE_FLAG_OVERLAPPED, and bound to completion
+ port.
+*/
+int pwrite(const native_file_handle &h, void *buf, size_t count,
+ unsigned long long offset);
+int pread(const native_file_handle &h, void *buf, size_t count,
+ unsigned long long offset);
+HANDLE win_get_syncio_event();
+#endif
+} // namespace tpool
diff --git a/tpool/tpool_generic.cc b/tpool/tpool_generic.cc
new file mode 100644
index 00000000000..2a4971cd555
--- /dev/null
+++ b/tpool/tpool_generic.cc
@@ -0,0 +1,770 @@
+/* Copyright(C) 2019 MariaDB Corporation.
+
+This program is free software; you can redistribute itand /or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
+
+#include "tpool_structs.h"
+#include <limits.h>
+#include <algorithm>
+#include <assert.h>
+#include <atomic>
+#include <chrono>
+#include <condition_variable>
+#include <iostream>
+#include <limits.h>
+#include <mutex>
+#include <queue>
+#include <stack>
+#include <thread>
+#include <vector>
+#include "tpool.h"
+#include <assert.h>
+#include <my_global.h>
+#include <my_dbug.h>
+#include <thr_timer.h>
+#include <stdlib.h>
+
+namespace tpool
+{
+
+#ifdef __linux__
+ extern aio* create_linux_aio(thread_pool* tp, int max_io);
+#endif
+#ifdef _WIN32
+ extern aio* create_win_aio(thread_pool* tp, int max_io);
+#endif
+
+ /* 500ms. Presumably the run time after which a task counts as "long"
+ (see worker_data::LONG_TASK) - the use is outside this view, confirm. */
+ static const std::chrono::milliseconds LONG_TASK_DURATION = std::chrono::milliseconds(500);
+ /* NOTE(review): the use is outside this view; judging by the name, a
+ factor applied to the desired concurrency - confirm. */
+ static const int OVERSUBSCRIBE_FACTOR = 2;
+
+/**
+ Implementation of generic threadpool.
+ This threadpool consists of the following components
+
+ - The task queue. This queue is populated by submit()
+ - Worker that execute the work items.
+ - Timer thread that takes care of pool health
+
+ The task queue is populated by submit() method.
+ on submit(), a worker thread can be woken, or created
+ to execute tasks.
+
+ The timer thread watches if work items are being dequeued, and if not,
+ this can indicate potential deadlock.
+ Thus the timer thread can also wake or create a thread, to ensure some progress.
+
+ Optimizations:
+
+ - worker threads that are idle for long time will shutdown.
+ - worker threads are woken in LIFO order, which minimizes context switching
+ and also ensures that idle timeout works well. LIFO wakeup order ensures
+ that active threads stay active, and idle ones stay idle.
+
+ - to minimize spurious wakeups, some items are not put into the queue. Instead
+ submit() will pass the data directly to the thread it woke up.
+*/
+
+/**
+ Worker wakeup flags.
+*/
+enum worker_wake_reason
+{
+ /* Initial value of worker_data::m_wake_reason. */
+ WAKE_REASON_NONE,
+ /* Woken to execute the task stored in worker_data::m_task. */
+ WAKE_REASON_TASK,
+ WAKE_REASON_SHUTDOWN
+};
+
+
+
+/* A per-worker thread structure.*/
+/*
+  Per-worker state. The struct is aligned to CPU_LEVEL1_DCACHE_LINESIZE,
+  which is why it comes with its own operator new/delete.
+*/
+struct MY_ALIGNED(CPU_LEVEL1_DCACHE_LINESIZE) worker_data
+{
+  /** Condition variable to wakeup this worker.*/
+  std::condition_variable m_cv;
+
+  /** Reason why worker was woken. */
+  worker_wake_reason m_wake_reason;
+
+  /**
+    If worker wakes up with WAKE_REASON_TASK, this the task it needs to execute.
+  */
+  task* m_task;
+
+  /** Struct is member of intrusive doubly linked list */
+  worker_data* m_prev;
+  worker_data* m_next;
+
+  /* Current state of the worker, the values are bit flags for m_state. */
+  enum state
+  {
+    NONE = 0,
+    EXECUTING_TASK = 1,
+    LONG_TASK = 2
+  };
+
+  int m_state;
+
+  bool is_executing_task()
+  {
+    return m_state & EXECUTING_TASK;
+  }
+  bool is_long_task()
+  {
+    return m_state & LONG_TASK;
+  }
+  std::chrono::system_clock::time_point m_task_start_time;
+  worker_data() :
+    m_cv(),
+    m_wake_reason(WAKE_REASON_NONE),
+    m_task(),
+    m_prev(),
+    m_next(),
+    m_state(NONE),
+    m_task_start_time()
+  {}
+
+  /*
+    Define custom new/delete because of overaligned structure.
+
+    The allocation function reports failure by returning null, so it has
+    to be declared noexcept: only then does a new-expression check the
+    result and yield a null pointer, instead of running the constructor
+    on a null pointer.
+  */
+  void* operator new(size_t size) noexcept
+  {
+#ifdef _WIN32
+    return _aligned_malloc(size, CPU_LEVEL1_DCACHE_LINESIZE);
+#else
+    void* ptr;
+    int ret = posix_memalign(&ptr, CPU_LEVEL1_DCACHE_LINESIZE, size);
+    return ret ? 0 : ptr;
+#endif
+  }
+  void operator delete(void* p)
+  {
+#ifdef _WIN32
+    _aligned_free(p);
+#else
+    free(p);
+#endif
+  }
+};
+
+class thread_pool_generic : public thread_pool
+{
+ /** Cache for per-worker structures */
+ cache<worker_data> m_thread_data_cache;
+
+ /** The task queue */
+ circular_queue<task*> m_task_queue;
+
+ /* List of standby (idle) workers.*/
+ doubly_linked_list<worker_data> m_standby_threads;
+
+ /** List of threads that are executing tasks. */
+ doubly_linked_list<worker_data> m_active_threads;
+
+ /* Mutex that protects the whole struct, most importantly
+ the standby threads list, and task queue. */
+ std::mutex m_mtx;
+
+ /** Timeout after which idle worker shuts down.*/
+ std::chrono::milliseconds m_thread_timeout;
+
+ /** How often should timer wakeup.*/
+ std::chrono::milliseconds m_timer_interval;
+
+ /** Another condition variable, used in pool shutdown-*/
+ std::condition_variable m_cv_no_threads;
+
+ /** Condition variable for the timer thread. Signaled on shutdown.*/
+ std::condition_variable m_cv_timer;
+
+ /** Overall number of enqueues*/
+ unsigned long long m_tasks_enqueued;
+ /** Overall number of dequeued tasks. */
+ unsigned long long m_tasks_dequeued;
+
+ /**Statistic related, number of worker thread wakeups.*/
+ int m_wakeups;
+
+ /**
+ Statistic related, number of spurious thread wakeups
+ (i.e thread woke up, and the task queue is empty)
+ */
+ int m_spurious_wakeups;
+
+ /** The desired concurrency. This number of workers should be actively executing.*/
+ unsigned int m_concurrency;
+
+ /** True, if threadpool is being shutdown, false otherwise */
+ bool m_in_shutdown;
+
+ /** time point when timer last ran, used as a coarse clock. */
+ std::chrono::system_clock::time_point m_timestamp;
+
+ /** Number of long running tasks. The long running tasks are excluded when
+ adjusting concurrency */
+ int m_long_tasks_count;
+
+ /** Last time thread was created*/
+ std::chrono::system_clock::time_point m_last_thread_creation;
+
+ /** Minimumum number of threads in this pool.*/
+ unsigned int m_min_threads;
+
+ /** Maximimum number of threads in this pool. */
+ unsigned int m_max_threads;
+
+ /* Maintainence related statistics (see maintainence()) */
+ size_t m_last_thread_count;
+ unsigned long long m_last_activity;
+ std::unique_ptr<timer> m_maintaince_timer_task;
+
+ void worker_main(worker_data *thread_data);
+ void worker_end(worker_data* thread_data);
+
+ /* Checks threadpool responsiveness, adjusts thread_counts */
+ void maintainence();
+ static void maintainence_func(void* arg)
+ {
+ ((thread_pool_generic *)arg)->maintainence();
+ }
+ bool add_thread();
+ bool wake(worker_wake_reason reason, task *t = nullptr);
+ void wake_or_create_thread();
+ bool get_task(worker_data *thread_var, task **t);
+ bool wait_for_tasks(std::unique_lock<std::mutex> &lk,
+ worker_data *thread_var);
+ void cancel_pending(task* t);
+
+ size_t thread_count()
+ {
+ return m_active_threads.size() + m_standby_threads.size();
+ }
+public:
+ thread_pool_generic(int min_threads, int max_threads);
+ ~thread_pool_generic();
+ void submit_task(task *task) override;
+ virtual aio *create_native_aio(int max_io) override
+ {
+#ifdef _WIN32
+ return create_win_aio(this, max_io);
+#elif defined(__linux__)
+ return create_linux_aio(this,max_io);
+#else
+ return nullptr;
+#endif
+ }
+
+ /**
+   Timer built on top of thr_timer.
+
+   When constructed with a pool, expiration submits m_task to that pool and
+   the user callback runs on a pool worker; a periodic timer is re-armed by
+   run() after each callback. When constructed without a pool, the callback
+   is registered directly with thr_timer_init() (it runs in the "timer"
+   thread) and the period is passed to thr_timer_set_period().
+ */
+ class timer_generic : public thr_timer_t, public timer
+ {
+   thread_pool_generic* m_pool;   /* pool that runs the callback, or nullptr */
+   waitable_task m_task;          /* task wrapping execute(this) */
+   callback_func m_callback;      /* user callback */
+   void* m_data;                  /* argument passed to m_callback */
+   int m_period;                  /* re-arm period in ms, pool mode only */
+   std::mutex m_mtx;              /* guards m_on, m_period and (re)arming */
+   bool m_on;                     /* cleared by disarm() */
+   std::atomic<bool> m_running;   /* true while the callback executes */
+
+   /* Run the user callback once and, in pool mode, re-arm a periodic timer. */
+   void run()
+   {
+     /*
+       In rare cases, multiple callbacks can be scheduled,
+       e.g with set_time(0,0) in a loop.
+       We do not allow parallel execution, as user is not prepared.
+     */
+     bool expected = false;
+     if (!m_running.compare_exchange_strong(expected, true))
+       return;
+
+     m_callback(m_data);
+
+     m_running = false;
+
+     if (!m_pool)
+       return;
+
+     /*
+       m_period is written by set_time() with m_mtx held, so it must only
+       be read under the same mutex; reading it before locking would race
+       with a concurrent set_time().
+     */
+     std::unique_lock<std::mutex> lk(m_mtx);
+     if (m_on && m_period)
+     {
+       thr_timer_end(this);
+       thr_timer_settime(this, 1000ULL * m_period);
+     }
+   }
+
+   /* Task entry point; arg is the timer_generic that owns the task. */
+   static void execute(void* arg)
+   {
+     auto timer = (timer_generic*)arg;
+     timer->run();
+   }
+
+   /* thr_timer callback used in pool mode: queue m_task on the pool. */
+   static void submit_task(void* arg)
+   {
+     timer_generic* timer = (timer_generic*)arg;
+     timer->m_pool->submit_task(&timer->m_task);
+   }
+
+ public:
+   /*
+     @param func  user callback
+     @param data  argument for the callback
+     @param pool  pool that executes the callback; nullptr to run it from
+                  the function registered with thr_timer_init()
+   */
+   timer_generic(callback_func func, void* data, thread_pool_generic * pool):
+     m_pool(pool),
+     m_task(timer_generic::execute,this),
+     m_callback(func),m_data(data),m_period(0),m_mtx(),
+     m_on(true),m_running()
+   {
+     if (pool)
+     {
+       /* Execute callback in threadpool */
+       thr_timer_init(this, submit_task, this);
+     }
+     else
+     {
+       /* run in "timer" thread */
+       thr_timer_init(this, m_task.get_func(), m_task.get_arg());
+     }
+   }
+
+   /*
+     (Re)arm the timer. Does nothing after disarm().
+     Both values are in milliseconds; they are multiplied by 1000 before
+     being handed to the thr_timer functions.
+   */
+   void set_time(int initial_delay_ms, int period_ms) override
+   {
+     std::unique_lock<std::mutex> lk(m_mtx);
+     if (!m_on)
+       return;
+     thr_timer_end(this);
+     if (!m_pool)
+       thr_timer_set_period(this, 1000ULL * period_ms);
+     else
+       m_period = period_ms;
+     thr_timer_settime(this, 1000ULL * initial_delay_ms);
+   }
+
+   /*
+     Stop the timer for good: forbid re-arming, cancel queued executions of
+     m_task (in its group and in the pool) and wait for m_task.
+   */
+   void disarm() override
+   {
+     std::unique_lock<std::mutex> lk(m_mtx);
+     m_on = false;
+     thr_timer_end(this);
+     lk.unlock();
+
+     if (m_task.m_group)
+     {
+       m_task.m_group->cancel_pending(&m_task);
+     }
+     if (m_pool)
+     {
+       m_pool->cancel_pending(&m_task);
+     }
+     m_task.wait();
+   }
+
+   virtual ~timer_generic()
+   {
+     disarm();
+   }
+ };
+
+ /* Allocate a timer whose callback is executed by this pool. */
+ virtual timer* create_timer(callback_func func, void *data) override
+ {
+   timer_generic *pool_timer = new timer_generic(func, data, this);
+   return pool_timer;
+ }
+};
+
+/*
+  Cancel every queued occurrence of task t.
+  Matching queue slots are not removed; each one is set to nullptr and the
+  task is released once per occurrence.
+*/
+void thread_pool_generic::cancel_pending(task* t)
+{
+  std::unique_lock <std::mutex> guard(m_mtx);
+  for (auto &slot : m_task_queue)
+  {
+    if (slot != t)
+      continue;
+    t->release();
+    slot = nullptr;
+  }
+}
+/**
+  Park the calling worker: move it from the active to the standby list and
+  block on its condition variable until it is woken or may time out.
+
+  @param lk           lock on m_mtx, held by the caller
+  @param thread_data  the calling worker
+
+  @return
+  true  - a wake reason was set for this worker
+  false - idle timeout expired and the pool has more than the minimum
+          number of threads (the current thread needs to shut down)
+*/
+bool thread_pool_generic::wait_for_tasks(std::unique_lock<std::mutex> &lk,
+                                         worker_data *thread_data)
+{
+  assert(m_task_queue.empty());
+  assert(!m_in_shutdown);
+
+  thread_data->m_wake_reason= WAKE_REASON_NONE;
+  m_active_threads.erase(thread_data);
+  m_standby_threads.push_back(thread_data);
+
+  while (true)
+  {
+    thread_data->m_cv.wait_for(lk, m_thread_timeout);
+
+    /* A wake reason means the wait did not end due to timeout. */
+    if (thread_data->m_wake_reason != WAKE_REASON_NONE)
+      return true;
+
+    /* Keep waiting while the pool is at (or below) its minimum size. */
+    if (thread_count() > m_min_threads)
+      break;
+  }
+
+  /*
+    Timed out: this thread removes itself from the standby list. In all
+    other cases the signaling thread has already done that.
+  */
+  m_standby_threads.erase(thread_data);
+  m_active_threads.push_back(thread_data);
+  return false;
+}
+
+/**
+  Fetch the next task for a worker.
+
+  The task either comes from the task queue, or - when the queue is empty
+  and the worker had to park - it may have been handed over directly by
+  the waking thread (m_wake_reason == WAKE_REASON_TASK).
+
+  @param thread_var  the calling worker
+  @param t           receives the task
+
+  @return false if the worker should exit (shutdown, idle timeout, or
+          woken with nothing to do), true if *t was set.
+*/
+bool thread_pool_generic::get_task(worker_data *thread_var, task **t)
+{
+  std::unique_lock<std::mutex> lk(m_mtx);
+
+  /* A worker that was flagged as running a long task gives that up now. */
+  if (thread_var->is_long_task() && m_long_tasks_count)
+    m_long_tasks_count--;
+
+  thread_var->m_state = worker_data::NONE;
+
+  bool handed_over = false;
+  if (m_task_queue.empty())
+  {
+    if (m_in_shutdown || !wait_for_tasks(lk, thread_var))
+      return false;
+
+    /* Task was handed over directly by signaling thread? */
+    handed_over = (thread_var->m_wake_reason == WAKE_REASON_TASK);
+
+    if (!handed_over && m_task_queue.empty())
+      return false;
+  }
+
+  if (handed_over)
+  {
+    *t= thread_var->m_task;
+  }
+  else
+  {
+    /* Dequeue from the task queue.*/
+    *t= m_task_queue.front();
+    m_task_queue.pop();
+    m_tasks_dequeued++;
+  }
+
+  thread_var->m_state |= worker_data::EXECUTING_TASK;
+  thread_var->m_task_start_time = m_timestamp;
+  return true;
+}
+
+/**
+  Worker thread exit routine: unregister the worker from the active list
+  and return its worker_data to the cache.
+*/
+void thread_pool_generic::worker_end(worker_data* thread_data)
+{
+  std::lock_guard<std::mutex> lk(m_mtx);
+  m_active_threads.erase(thread_data);
+  m_thread_data_cache.put(thread_data);
+
+  const bool no_workers_left = (thread_count() == 0);
+  if (no_workers_left && m_in_shutdown)
+  {
+    /* Tell the destructor that the last thread is gone. */
+    m_cv_no_threads.notify_all();
+  }
+}
+
+/*
+  The worker get/execute task loop.
+
+  Runs the optional per-thread init callback, executes tasks until
+  get_task() tells the worker to stop, then runs the optional destroy
+  callback and unregisters the worker.
+*/
+void thread_pool_generic::worker_main(worker_data *thread_var)
+{
+  task* task = nullptr;
+
+  if(m_worker_init_callback)
+    m_worker_init_callback();
+
+  while (get_task(thread_var, &task))
+  {
+    /*
+      cancel_pending() leaves nullptr in the queue slots of cancelled
+      tasks. Such a slot only means "nothing to run"; it must not end
+      the worker, so skip it and fetch the next task.
+    */
+    if (task)
+      task->execute();
+  }
+
+  if (m_worker_destroy_callback)
+    m_worker_destroy_callback();
+
+  worker_end(thread_var);
+}
+
+/*
+  Periodic job to fix thread count and concurrency,
+  in case of long tasks, etc.
+
+  Refreshes m_timestamp, recounts long running tasks and, if tasks are
+  queued, wakes or creates a worker when the pool looks undersubscribed
+  or stalled.
+*/
+void thread_pool_generic::maintainence()
+{
+  /*
+    If the pool mutex is currently locked, the run is skipped to lower
+    mutex contention - but after MAX_SKIPS skipped runs in a row the
+    mutex is waited for.
+  */
+  static int skip_counter;
+  const int MAX_SKIPS = 10;
+  std::unique_lock<std::mutex> lk(m_mtx, std::defer_lock);
+  if (skip_counter != MAX_SKIPS)
+  {
+    if (!lk.try_lock())
+    {
+      skip_counter++;
+      return;
+    }
+  }
+  else
+  {
+    lk.lock();
+  }
+
+  skip_counter = 0;
+
+  m_timestamp = std::chrono::system_clock::now();
+
+  m_long_tasks_count = 0;
+
+  /* Neither counter changes on the paths below that still use this value. */
+  const auto activity = m_tasks_dequeued + m_wakeups;
+
+  if (m_task_queue.empty())
+  {
+    m_last_activity = activity;
+    return;
+  }
+
+  /* Flag (and count) workers whose current task runs for too long. */
+  for (auto worker = m_active_threads.front(); worker; worker = worker->m_next)
+  {
+    if (!worker->is_executing_task())
+      continue;
+
+    const bool long_running = worker->is_long_task() ||
+      (m_timestamp - worker->m_task_start_time > LONG_TASK_DURATION);
+    if (long_running)
+    {
+      worker->m_state |= worker_data::LONG_TASK;
+      m_long_tasks_count++;
+    }
+  }
+
+  size_t thread_cnt = static_cast<int>(thread_count());
+
+  if (m_active_threads.size() - m_long_tasks_count < m_concurrency*OVERSUBSCRIBE_FACTOR)
+  {
+    wake_or_create_thread();
+    return;
+  }
+
+  const bool stalled = m_last_activity == activity &&
+                       m_last_thread_count <= thread_cnt &&
+                       m_active_threads.size() == thread_cnt;
+  if (stalled)
+  {
+    /* No progress made since last iteration: create new thread. */
+    add_thread();
+  }
+
+  m_last_activity = activity;
+  m_last_thread_count= thread_cnt;
+}
+
+/*
+  Heuristic used for thread creation throttling.
+
+  @param n_threads    number of threads already in the pool
+  @param concurrency  desired concurrency level
+  @return minimal interval, in milliseconds, between two thread creations:
+          0 below 4x concurrency, 50 below 8x, 100 below 16x, else 200.
+*/
+static int throttling_interval_ms(size_t n_threads,size_t concurrency)
+{
+  static const struct
+  {
+    size_t factor;
+    int interval_ms;
+  } steps[] = { {4, 0}, {8, 50}, {16, 100} };
+
+  for (const auto &step : steps)
+  {
+    if (n_threads < concurrency * step.factor)
+      return step.interval_ms;
+  }
+  return 200;
+}
+
+/*
+  Create a new worker thread.
+
+  @return true if a thread was started; false if the pool is at its
+          maximum size, creation is currently throttled, or the thread
+          could not be created.
+*/
+bool thread_pool_generic::add_thread()
+{
+  const size_t n_threads = thread_count();
+
+  if (n_threads >= m_max_threads)
+    return false;
+
+  /* Throttling only applies once the minimum number of threads exists. */
+  if (n_threads >= m_min_threads)
+  {
+    const auto since_last_creation =
+      std::chrono::system_clock::now() - m_last_thread_creation;
+    const std::chrono::milliseconds min_interval(
+      throttling_interval_ms(n_threads, m_concurrency));
+    if (since_last_creation < min_interval)
+      return false;
+  }
+
+  /* Register the worker before it starts; undone below on failure. */
+  worker_data *new_worker = m_thread_data_cache.get();
+  m_active_threads.push_back(new_worker);
+  try
+  {
+    std::thread thread(&thread_pool_generic::worker_main, this, new_worker);
+    m_last_thread_creation = std::chrono::system_clock::now();
+    thread.detach();
+  }
+  catch (std::system_error& e)
+  {
+    m_active_threads.erase(new_worker);
+    m_thread_data_cache.put(new_worker);
+
+    /* The warning is printed only once per process. */
+    static bool warning_written;
+    if (!warning_written)
+    {
+      fprintf(stderr, "Warning : threadpool thread could not be created :%s,"
+              "current number of threads in pool %zu\n", e.what(), thread_count());
+      warning_written = true;
+    }
+    return false;
+  }
+  return true;
+}
+
+/**
+  Wake a standby thread, and hand the given task over to this thread.
+
+  The most recently parked worker is moved from the standby list to the
+  active list and signaled. The callers visible in this file invoke this
+  with m_mtx held.
+
+  @param reason  why the worker is woken; must not be WAKE_REASON_NONE
+  @param t       task handed over to the worker; may be nullptr
+
+  @return false if there is no standby thread, true otherwise
+*/
+bool thread_pool_generic::wake(worker_wake_reason reason, task *t)
+{
+  assert(reason != WAKE_REASON_NONE);
+
+  if (m_standby_threads.empty())
+    return false;
+  auto var= m_standby_threads.back();
+  m_standby_threads.pop_back();
+  m_active_threads.push_back(var);
+  assert(var->m_wake_reason == WAKE_REASON_NONE);
+  var->m_wake_reason= reason;
+  /*
+    Always overwrite the hand-over slot. t can be nullptr even for
+    WAKE_REASON_TASK (a queue slot nulled by cancel_pending()); leaving
+    the old value in place would make the worker pick up a stale task
+    pointer from an earlier hand-over.
+  */
+  var->m_task= t;
+  var->m_cv.notify_one();
+  m_wakeups++;
+  return true;
+}
+
+
+/*
+  Construct the pool.
+
+  Concurrency starts at the hardware concurrency, is clamped into
+  [min_threads, max_threads] and is never 0. The periodic maintenance
+  timer is only armed when the pool size can actually vary
+  (min_threads < max_threads).
+*/
+thread_pool_generic::thread_pool_generic(int min_threads, int max_threads) :
+  m_thread_data_cache(max_threads),
+  m_task_queue(10000),
+  m_standby_threads(),
+  m_active_threads(),
+  m_mtx(),
+  m_thread_timeout(std::chrono::milliseconds(60000)),
+  m_timer_interval(std::chrono::milliseconds(400)),
+  m_cv_no_threads(),
+  m_cv_timer(),
+  m_tasks_enqueued(),
+  m_tasks_dequeued(),
+  m_wakeups(),
+  m_spurious_wakeups(),
+  m_concurrency(std::thread::hardware_concurrency()),
+  m_in_shutdown(),
+  m_timestamp(),
+  m_long_tasks_count(),
+  m_last_thread_creation(),
+  m_min_threads(min_threads),
+  m_max_threads(max_threads),
+  m_last_thread_count(),
+  m_last_activity(),
+  m_maintaince_timer_task()
+{
+  if (m_concurrency > m_max_threads)
+    m_concurrency = m_max_threads;
+  if (m_concurrency < m_min_threads)
+    m_concurrency = min_threads;
+  if (m_concurrency == 0)
+    m_concurrency = 1;
+
+  if (min_threads >= max_threads)
+    return;
+
+  /* No pool is passed, so maintenance runs from the timer itself. */
+  m_maintaince_timer_task.reset(
+    new timer_generic(thread_pool_generic::maintainence_func, this, nullptr));
+  const int interval_ms = (int)m_timer_interval.count();
+  m_maintaince_timer_task->set_time(interval_ms, interval_ms);
+}
+
+
+/*
+  Get one more worker going for the queued tasks: if a worker is parked,
+  take the first queued task and hand it over while waking that worker;
+  otherwise try to create a new thread. The task queue must not be empty.
+*/
+void thread_pool_generic::wake_or_create_thread()
+{
+  assert(!m_task_queue.empty());
+
+  if (m_standby_threads.empty())
+  {
+    add_thread();
+    return;
+  }
+
+  auto next_task= m_task_queue.front();
+  m_task_queue.pop();
+  wake(WAKE_REASON_TASK, next_task);
+}
+
+
+/**
+  Submit a new task.
+
+  The task gets an extra reference and is appended to the task queue. If
+  fewer workers than the oversubscribed concurrency target are active
+  (long running tasks not counted), a worker is woken or created.
+  Nothing happens once the pool is shutting down.
+*/
+void thread_pool_generic::submit_task(task* task)
+{
+  std::unique_lock<std::mutex> lk(m_mtx);
+  if (!m_in_shutdown)
+  {
+    task->add_ref();
+    m_tasks_enqueued++;
+    m_task_queue.push(task);
+
+    const bool below_target =
+      m_active_threads.size() - m_long_tasks_count < m_concurrency *OVERSUBSCRIBE_FACTOR;
+    if (below_target)
+      wake_or_create_thread();
+  }
+}
+
+/**
+  Shut the pool down: stop AIO and the maintenance timer, wake up all
+  standby workers, and wait until every worker is gone.
+*/
+thread_pool_generic::~thread_pool_generic()
+{
+  /*
+    AIO goes first, so that the AIO completion thread cannot call
+    submit_task() on an object that is being destroyed.
+  */
+  m_aio.reset();
+
+  /* The maintenance task is stopped early as well. */
+  m_maintaince_timer_task.reset();
+
+  std::unique_lock<std::mutex> lk(m_mtx);
+  m_in_shutdown= true;
+
+  /* Wake up idle threads, one per call, until none is parked. */
+  bool woke_one;
+  do
+  {
+    woke_one = wake(WAKE_REASON_SHUTDOWN);
+  } while (woke_one);
+
+  /* worker_end() signals m_cv_no_threads when the last worker leaves. */
+  m_cv_no_threads.wait(lk, [this] { return thread_count() == 0; });
+
+  lk.unlock();
+}
+
+/* Factory for the generic thread pool; the result is allocated with new. */
+thread_pool *create_thread_pool_generic(int min_threads, int max_threads)
+{
+  thread_pool_generic *pool = new thread_pool_generic(min_threads, max_threads);
+  return pool;
+}
+
+} // namespace tpool
diff --git a/tpool/tpool_structs.h b/tpool/tpool_structs.h
new file mode 100644
index 00000000000..6f47b3b3e40
--- /dev/null
+++ b/tpool/tpool_structs.h
@@ -0,0 +1,353 @@
+/* Copyright(C) 2019 MariaDB Corporation
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
+
+#pragma once
+#include <vector>
+#include <stack>
+#include <mutex>
+#include <condition_variable>
+#include <assert.h>
+#include <algorithm>
+
+namespace tpool
+{
+
+/** Selects how cache::put() signals threads waiting on the cache. */
+enum cache_notification_mode
+{
+  NOTIFY_ONE = 0, /* notify_one() on every put() */
+  NOTIFY_ALL = 1  /* notify_all() when the cache turns non-empty or full */
+};
+
+/**
+  Generic "pointer" cache of a fixed size
+  with fast put/get operations.
+
+  All elements are allocated once, in the constructor; put() and get()
+  only move pointers to them in and out of the free list. A blocking
+  get() waits while there are no free items.
+*/
+template<typename T> class cache
+{
+  std::mutex m_mtx;                 /* protects all members below */
+  std::condition_variable m_cv;     /* signaled by put() */
+  std::vector<T> m_base;            /* storage for all elements */
+  std::vector<T*> m_cache;          /* currently free elements */
+  cache_notification_mode m_notification_mode;
+  int m_waiters;                    /* threads blocked in wait() */
+
+  /* True if every element is in the cache. Call with m_mtx held. */
+  bool is_full()
+  {
+    return m_cache.size() == m_base.size();
+  }
+
+public:
+  /**
+    @param count  number of elements
+    @param mode   how put() notifies waiting threads
+  */
+  cache(size_t count, cache_notification_mode mode= tpool::cache_notification_mode::NOTIFY_ALL):
+  m_mtx(), m_cv(), m_base(count),m_cache(count), m_notification_mode(mode),m_waiters()
+  {
+    for(size_t i = 0 ; i < count; i++)
+      m_cache[i]=&m_base[i];
+  }
+
+  /**
+    Take a free element out of the cache.
+
+    @param blocking  if true, wait until an element is available;
+                     if false, return nullptr when the cache is empty
+  */
+  T* get(bool blocking=true)
+  {
+    std::unique_lock<std::mutex> lk(m_mtx);
+    if (blocking)
+    {
+      while(m_cache.empty())
+        m_cv.wait(lk);
+    }
+    else
+    {
+      if(m_cache.empty())
+        return nullptr;
+    }
+    T* ret = m_cache.back();
+    m_cache.pop_back();
+    return ret;
+  }
+
+
+  /** Give an element back to the cache and notify waiting threads. */
+  void put(T *ele)
+  {
+    std::unique_lock<std::mutex> lk(m_mtx);
+    m_cache.push_back(ele);
+    if (m_notification_mode == NOTIFY_ONE)
+      m_cv.notify_one();
+    else if(m_cache.size() == 1)
+      m_cv.notify_all(); // Signal cache is not empty
+    else if(m_waiters && is_full())
+      m_cv.notify_all(); // Signal cache is full
+  }
+
+  /**
+    Check whether ele points into the storage owned by this cache.
+    Always false for a cache created with count == 0.
+  */
+  bool contains(T* ele)
+  {
+    /* An empty vector has no element 0 (or size-1) to take the address of. */
+    if (m_base.empty())
+      return false;
+    T* first = m_base.data();
+    T* last = first + (m_base.size() - 1);
+    return ele >= first && ele <= last;
+  }
+
+  /* Wait until cache is full.*/
+  void wait()
+  {
+    std::unique_lock<std::mutex> lk(m_mtx);
+    m_waiters++;
+    while(!is_full())
+      m_cv.wait(lk);
+    m_waiters--;
+  }
+};
+
+
+/**
+  Circular queue used for the task queue.
+
+  Elements live in a ring buffer in which one slot always stays unused,
+  so that a full queue (head + 1 == tail) can be told apart from an empty
+  one (head == tail). Memory is only (re)allocated in the constructor and
+  in resize(); push() and push_front() grow the queue by 1024 elements
+  when it is full.
+*/
+template <typename T> class circular_queue
+{
+
+public:
+  /** @param N  number of elements the queue can hold before it grows */
+  circular_queue(size_t N = 16)
+    : m_capacity(N + 1), m_buffer(m_capacity), m_head(), m_tail()
+  {
+  }
+  bool empty() { return m_head == m_tail; }
+  bool full() { return (m_head + 1) % m_capacity == m_tail; }
+  void clear() { m_head = m_tail = 0; }
+
+  /**
+    Reallocate the buffer so that the queue can hold new_size elements.
+    Does nothing unless new_size is larger than the current number of
+    elements. Existing elements keep their order.
+  */
+  void resize(size_t new_size)
+  {
+    auto current_size = size();
+    if (new_size <= current_size)
+      return;
+    /*
+      Same rule as in the constructor: one extra slot for the full/empty
+      distinction. With fewer slots m_head could end up equal to
+      m_capacity (out of bounds), or m_capacity could become 0.
+    */
+    size_t new_capacity = new_size + 1;
+    std::vector<T> new_buffer(new_capacity);
+    /* Figure out faster way to copy*/
+    size_t i = 0;
+    while (!empty())
+    {
+      new_buffer[i++] = front();
+      pop();
+    }
+    m_buffer.swap(new_buffer);
+    m_capacity = new_capacity;
+    m_tail = 0;
+    m_head = current_size;
+  }
+
+  /** Append an element at the back, growing the queue if it is full. */
+  void push(T ele)
+  {
+    if (full())
+    {
+      assert(size() == m_capacity - 1);
+      resize(size() + 1024);
+    }
+    m_buffer[m_head] = ele;
+    m_head = (m_head + 1) % m_capacity;
+  }
+
+  /** Insert an element at the front, growing the queue if it is full. */
+  void push_front(T ele)
+  {
+    if (full())
+    {
+      resize(size() + 1024);
+    }
+    if (m_tail == 0)
+      m_tail = m_capacity - 1;
+    else
+      m_tail--;
+    m_buffer[m_tail] = ele;
+  }
+
+  /** Oldest element. The queue must not be empty. */
+  T& front()
+  {
+    assert(!empty());
+    return m_buffer[m_tail];
+  }
+
+  /** Drop the oldest element. The queue must not be empty. */
+  void pop()
+  {
+    assert(!empty());
+    m_tail = (m_tail + 1) % m_capacity;
+  }
+
+  /** Number of elements currently in the queue. */
+  size_t size()
+  {
+    if (m_head < m_tail)
+    {
+      return m_capacity - m_tail + m_head;
+    }
+    else
+    {
+      return m_head - m_tail;
+    }
+  }
+
+  /*Iterator over elements in queue.*/
+  class iterator
+  {
+    size_t m_pos;
+    circular_queue<T>* m_queue;
+  public:
+    explicit iterator(size_t pos , circular_queue<T>* q) : m_pos(pos), m_queue(q) {}
+    iterator& operator++()
+    {
+      m_pos = (m_pos + 1) % m_queue->m_capacity;
+      return *this;
+    }
+    iterator operator++(int)
+    {
+      iterator retval = *this;
+      ++(*this);
+      return retval;
+    }
+    /* Only positions are compared, not the queues iterated over. */
+    bool operator==(iterator other) const
+    {
+      return m_pos == other.m_pos;
+    }
+    bool operator!=(iterator other) const
+    {
+      return !(*this == other);
+    }
+    T& operator*() const
+    {
+      return m_queue->m_buffer[m_pos];
+    }
+  };
+
+  /** Iterator at the oldest element. */
+  iterator begin()
+  {
+    return iterator(m_tail, this);
+  }
+  /** Iterator one past the newest element. */
+  iterator end()
+  {
+    return iterator(m_head, this);
+  }
+private:
+  size_t m_capacity;        /* number of slots in m_buffer */
+  std::vector<T> m_buffer;  /* ring buffer storage */
+  size_t m_head;            /* slot the next push() writes to */
+  size_t m_tail;            /* slot of the oldest element */
+
+
+};
+
+/*
+  Intrusive doubly linked list.
+
+  The list does not own its elements; it only links them through the
+  m_next and m_prev pointers that the element type T has to provide.
+*/
+template<typename T> class doubly_linked_list
+{
+public:
+  T* m_first;      /* head of the list, null when empty */
+  T* m_last;       /* tail of the list, null when empty */
+  size_t m_count;  /* number of linked elements */
+
+  doubly_linked_list():m_first(),m_last(),m_count()
+  {}
+
+  /* Consistency check (asserts only): walks the list in both directions. */
+  void check()
+  {
+    assert(!m_first || !m_first->m_prev);
+    assert(!m_last || !m_last->m_next);
+    assert((!m_first && !m_last && m_count == 0)
+     || (m_first != 0 && m_last != 0 && m_count > 0));
+
+    T* node = m_first;
+    for (size_t step = 1; step < m_count; step++)
+      node = node->m_next;
+    assert(node == m_last);
+
+    node = m_last;
+    for (size_t step = 1; step < m_count; step++)
+      node = node->m_prev;
+    assert(node == m_first);
+  }
+
+  T* front() { return m_first; }
+
+  T* back() { return m_last; }
+
+  size_t size() { return m_count; }
+
+  bool empty() { return m_count == 0; }
+
+  /* Link ele in as the new last element. */
+  void push_back(T* ele)
+  {
+    T* old_last = m_last;
+
+    ele->m_prev = old_last;
+    if (old_last)
+      old_last->m_next = ele;
+    ele->m_next = nullptr;
+
+    m_last = ele;
+    if (!m_first)
+      m_first = ele;
+
+    m_count++;
+  }
+
+  /* Unlink the last element. The list must not be empty. */
+  void pop_back()
+  {
+    m_last = m_last->m_prev;
+    if (!m_last)
+      m_first = nullptr;
+    else
+      m_last->m_next = nullptr;
+    m_count--;
+  }
+
+  /* Linear search for ele; a null pointer is never contained. */
+  bool contains(T* ele)
+  {
+    if (!ele)
+      return false;
+    for (T* node = m_first; node; node = node->m_next)
+    {
+      if (node == ele)
+        return true;
+    }
+    return false;
+  }
+
+  /* Unlink ele, which must be an element of this list. */
+  void erase(T* ele)
+  {
+    assert(contains(ele));
+
+    const bool is_first = (ele == m_first);
+    const bool is_last = (ele == m_last);
+
+    if (is_first)
+    {
+      /* Covers the single-element list as well. */
+      m_first = ele->m_next;
+      if (!m_first)
+        m_last = nullptr;
+      else
+        m_first->m_prev = nullptr;
+    }
+    else if (is_last)
+    {
+      assert(ele->m_prev);
+      m_last = ele->m_prev;
+      m_last->m_next = nullptr;
+    }
+    else
+    {
+      /* Inner element: connect both neighbours to each other. */
+      assert(ele->m_next);
+      assert(ele->m_prev);
+      ele->m_next->m_prev = ele->m_prev;
+      ele->m_prev->m_next = ele->m_next;
+    }
+    m_count--;
+  }
+};
+
+}
diff --git a/tpool/tpool_win.cc b/tpool/tpool_win.cc
new file mode 100644
index 00000000000..623b1b5577b
--- /dev/null
+++ b/tpool/tpool_win.cc
@@ -0,0 +1,291 @@
+/* Copyright(C) 2019 MariaDB
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
+
+#include "tpool_structs.h"
+#include <stdlib.h>
+#include <tpool.h>
+#include <windows.h>
+#include <atomic>
+
+/**
+ Implementation of tpool/aio based on Windows native threadpool.
+*/
+
+namespace tpool
+{
+/**
+ Pool, based on Windows native(Vista+) threadpool.
+
+ Tasks, timers and IO completions are all run by the Windows threadpool
+ created in the constructor; this class only adapts them to the
+ thread_pool interface and keeps track of the threads that ran callbacks.
+*/
+class thread_pool_win : public thread_pool
+{
+ /**
+ Handle per-thread init/term functions.
+ Since it is Windows that creates thread, and not us,
+ it is tricky. We employ thread local storage data
+ and check whether init function was called, inside every callback.
+ */
+ struct tls_data
+ {
+ /* Pool this thread registered with in callback_prolog(); stays unset
+ until then (NOTE(review): relies on the thread_local object being
+ zero-initialized - confirm). */
+ thread_pool_win *m_pool;
+ /* Runs when the thread-local object is destroyed: calls the worker
+ destroy callback and decrements the pool's m_thread_count. */
+ ~tls_data()
+ {
+ /* Call thread termination function. */
+ if (!m_pool)
+ return;
+
+ if (m_pool->m_worker_destroy_callback)
+ m_pool->m_worker_destroy_callback();
+
+ m_pool->m_thread_count--;
+ }
+ /** This needs to be called before every IO or simple task callback.*/
+ /* On the first call in a thread it records the pool, increments
+ m_thread_count and runs the worker init callback; later calls are
+ no-ops. */
+ void callback_prolog(thread_pool_win* pool)
+ {
+ assert(pool);
+ assert(!m_pool || (m_pool == pool));
+ if (m_pool)
+ {
+ // TLS data already initialized.
+ return;
+ }
+ m_pool = pool;
+ m_pool->m_thread_count++;
+ // Call the thread init function.
+ if (m_pool->m_worker_init_callback)
+ m_pool->m_worker_init_callback();
+ }
+ };
+
+ static thread_local struct tls_data tls_data;
+ /** Timer */
+ class native_timer : public timer
+ {
+ std::mutex m_mtx; // protects against parallel execution
+ std::mutex m_shutdown_mtx; // protects m_on
+ PTP_TIMER m_ptp_timer;
+ callback_func m_func;
+ void *m_data;
+ thread_pool_win& m_pool;
+ int m_period;
+ bool m_on;
+
+ /* Threadpool timer callback; context is the native_timer. Runs the
+ user function unless another invocation of this timer is still in
+ progress, then re-arms the timer if a period is set. */
+ static void CALLBACK timer_callback(PTP_CALLBACK_INSTANCE callback_instance, void *context,
+ PTP_TIMER callback_timer)
+ {
+ native_timer *timer= (native_timer *) context;
+ tls_data.callback_prolog(&timer->m_pool);
+ std::unique_lock<std::mutex> lk(timer->m_mtx, std::defer_lock);
+ if (!lk.try_lock())
+ {
+ /* Do not try to run timers in parallel */
+ return;
+ }
+ timer->m_func(timer->m_data);
+ /* The Windows timer is always armed as one-shot (see set_time()), so
+ periodic timers are re-armed here after every run. */
+ if (timer->m_period)
+ timer->set_time(timer->m_period, timer->m_period);
+ }
+
+ public:
+ native_timer(thread_pool_win& pool, callback_func func, void* data) :
+ m_mtx(), m_func(func), m_data(data), m_pool(pool), m_period(), m_on(true)
+ {
+ m_ptp_timer= CreateThreadpoolTimer(timer_callback, this, &pool.m_env);
+ }
+ /* (Re)arm the timer; both arguments are in milliseconds. Does nothing
+ once disarm() was called. */
+ void set_time(int initial_delay_ms, int period_ms) override
+ {
+ std::unique_lock<std::mutex> lk(m_shutdown_mtx);
+ if (!m_on)
+ return;
+ /* Milliseconds converted to 100ns units; per the SetThreadpoolTimer
+ documentation a negative due time is relative to the current time. */
+ long long initial_delay = -10000LL * initial_delay_ms;
+ /* First call (NULL due time) stops the timer, second one arms it with
+ period 0, i.e one-shot. */
+ SetThreadpoolTimer(m_ptp_timer, NULL, 0, 0);
+ SetThreadpoolTimer(m_ptp_timer, (PFILETIME)&initial_delay, 0, 100);
+ m_period = period_ms;
+ }
+ /* Stop the timer for good: forbid re-arming, stop the Windows timer and
+ wait for its callbacks. */
+ void disarm() override
+ {
+ std::unique_lock<std::mutex> lk(m_shutdown_mtx);
+ m_on = false;
+ SetThreadpoolTimer(m_ptp_timer, NULL , 0, 0);
+ lk.unlock();
+ /* Don't do it in timer callback, that will hang*/
+ WaitForThreadpoolTimerCallbacks(m_ptp_timer, TRUE);
+ }
+
+ ~native_timer()
+ {
+ disarm();
+ CloseThreadpoolTimer(m_ptp_timer);
+ }
+ };
+ /** AIO handler */
+ class native_aio : public aio
+ {
+ thread_pool_win& m_pool;
+
+ public:
+ /* max_io is not used by this implementation. */
+ native_aio(thread_pool_win &pool, int max_io)
+ : m_pool(pool)
+ {
+ }
+
+ /**
+ Submit async IO.
+
+ @return 0 if the IO completed or is pending, -1 on failure
+ */
+ virtual int submit_io(aiocb* cb) override
+ {
+ /* Reset the OVERLAPPED part of the control block, then store the
+ 64-bit file offset in its low/high halves. */
+ memset((OVERLAPPED *)cb, 0, sizeof(OVERLAPPED));
+
+ ULARGE_INTEGER uli;
+ uli.QuadPart = cb->m_offset;
+ cb->Offset = uli.LowPart;
+ cb->OffsetHigh = uli.HighPart;
+ /* Lets io_completion_callback() find this object again. */
+ cb->m_internal = this;
+ StartThreadpoolIo(cb->m_fh.m_ptp_io);
+
+ BOOL ok;
+ if (cb->m_opcode == aio_opcode::AIO_PREAD)
+ ok = ReadFile(cb->m_fh.m_handle, cb->m_buffer, cb->m_len, 0, cb);
+ else
+ ok = WriteFile(cb->m_fh.m_handle, cb->m_buffer, cb->m_len, 0, cb);
+
+ if (ok || (GetLastError() == ERROR_IO_PENDING))
+ return 0;
+
+ /* The IO was not started: undo StartThreadpoolIo(). */
+ CancelThreadpoolIo(cb->m_fh.m_ptp_io);
+ return -1;
+ }
+
+ /**
+ PTP_WIN32_IO_CALLBACK-typed function, required parameter for
+ CreateThreadpoolIo(). The user callback and other auxiliary data is put into
+ the extended OVERLAPPED parameter.
+ */
+ static void CALLBACK io_completion_callback(PTP_CALLBACK_INSTANCE instance,
+ PVOID context, PVOID overlapped,
+ ULONG io_result, ULONG_PTR nbytes,
+ PTP_IO io)
+ {
+ aiocb* cb = (aiocb*)overlapped;
+ native_aio* aio = (native_aio*)cb->m_internal;
+ tls_data.callback_prolog(&aio->m_pool);
+ cb->m_err = io_result;
+ cb->m_ret_len = (int)nbytes;
+ /* The user callback is wrapped into the control block's internal task
+ and executed right here, in the completion callback. */
+ cb->m_internal_task.m_func = cb->m_callback;
+ cb->m_internal_task.m_group = cb->m_group;
+ cb->m_internal_task.m_arg = cb;
+ cb->m_internal_task.execute();
+ }
+
+ /**
+ Binds the file handle via CreateThreadpoolIo().
+
+ @return 0 on success, -1 if CreateThreadpoolIo() failed
+ */
+ virtual int bind(native_file_handle& fd) override
+ {
+ fd.m_ptp_io =
+ CreateThreadpoolIo(fd.m_handle, io_completion_callback, 0, &(m_pool.m_env));
+ if (fd.m_ptp_io)
+ return 0;
+ return -1;
+ }
+
+ /**
+ Unbind the file handle via CloseThreadpoolIo.
+ Always returns 0.
+ */
+ virtual int unbind(const native_file_handle& fd) override
+ {
+ if (fd.m_ptp_io)
+ CloseThreadpoolIo(fd.m_ptp_io);
+ return 0;
+ }
+ };
+
+ PTP_POOL m_ptp_pool;
+ TP_CALLBACK_ENVIRON m_env;
+ PTP_CLEANUP_GROUP m_cleanup;
+ /* Number of entries in m_task_cache. */
+ const int TASK_CACHE_SIZE= 10000;
+
+ /* Parameter block passed to task_callback(). */
+ struct task_cache_entry
+ {
+ thread_pool_win *m_pool;
+ task* m_task;
+ };
+ cache<task_cache_entry> m_task_cache;
+ /* Number of threads that ran callback_prolog() and whose tls_data was
+ not destroyed yet. */
+ std::atomic<int> m_thread_count;
+public:
+ /* Creates the Windows threadpool, its callback environment and cleanup
+ group. A min_threads/max_threads of 0 leaves the corresponding
+ Windows setting untouched. */
+ thread_pool_win(int min_threads= 0, int max_threads= 0)
+ : m_task_cache(TASK_CACHE_SIZE),m_thread_count(0)
+ {
+ InitializeThreadpoolEnvironment(&m_env);
+ m_ptp_pool= CreateThreadpool(NULL);
+ m_cleanup= CreateThreadpoolCleanupGroup();
+ SetThreadpoolCallbackPool(&m_env, m_ptp_pool);
+ SetThreadpoolCallbackCleanupGroup(&m_env, m_cleanup, 0);
+ if (min_threads)
+ SetThreadpoolThreadMinimum(m_ptp_pool, min_threads);
+ if (max_threads)
+ SetThreadpoolThreadMaximum(m_ptp_pool, max_threads);
+ }
+ ~thread_pool_win()
+ {
+ CloseThreadpoolCleanupGroupMembers(m_cleanup, TRUE, NULL);
+ CloseThreadpoolCleanupGroup(m_cleanup);
+ CloseThreadpool(m_ptp_pool);
+
+ // Wait until all threads finished and TLS destructors ran.
+ // (m_thread_count is decremented in ~tls_data(); polled once per Sleep(1).)
+ while(m_thread_count)
+ Sleep(1);
+ }
+ /**
+ PTP_SIMPLE_CALLBACK-typed function, used by TrySubmitThreadpoolCallback()
+ */
+ static void CALLBACK task_callback(PTP_CALLBACK_INSTANCE, void *param)
+ {
+ auto entry= (task_cache_entry *) param;
+ auto task= entry->m_task;
+
+ tls_data.callback_prolog(entry->m_pool);
+
+ /* The entry goes back to the cache before the task runs; the task
+ pointer was copied out above. */
+ entry->m_pool->m_task_cache.put(entry);
+
+ task->execute();
+ }
+ /* Queue the task on the Windows threadpool. Blocks in m_task_cache.get()
+ while no cache entry is free; aborts the process if the callback
+ cannot be submitted. */
+ virtual void submit_task(task *task) override
+ {
+ auto entry= m_task_cache.get();
+ task->add_ref();
+ entry->m_pool= this;
+ entry->m_task= task;
+ if (!TrySubmitThreadpoolCallback(task_callback, entry, &m_env))
+ abort();
+ }
+
+ /* The returned object is allocated with new. */
+ aio *create_native_aio(int max_io) override
+ {
+ return new native_aio(*this, max_io);
+ }
+
+ /* The returned object is allocated with new. */
+ timer* create_timer(callback_func func, void* data) override
+ {
+ return new native_timer(*this, func, data);
+ }
+};
+
+/* Out-of-class definition of the thread-local static member declared in
+   thread_pool_win; each thread gets its own instance. */
+thread_local struct thread_pool_win::tls_data thread_pool_win::tls_data;
+
+/* Factory for the Windows thread pool; the result is allocated with new. */
+thread_pool *create_thread_pool_win(int min_threads, int max_threads)
+{
+  thread_pool_win *pool = new thread_pool_win(min_threads, max_threads);
+  return pool;
+}
+} // namespace tpool