summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVladislav Vaintroub <wlad@mariadb.com>2019-12-20 12:49:37 +0100
committerVladislav Vaintroub <wlad@mariadb.com>2019-12-24 18:16:51 +0100
commitc303ff05b0350657d07d53c04e933c43b0b35184 (patch)
treef532addc806550ed079d26e52767246620b4bded
parentd35d4d5f7149032244e36063e57f10f03bae7650 (diff)
downloadmariadb-git-c303ff05b0350657d07d53c04e933c43b0b35184.tar.gz
-rw-r--r--include/my_pthread.h14
-rw-r--r--sql/scheduler.cc9
-rw-r--r--sql/scheduler.h3
-rw-r--r--sql/sql_class.cc19
-rw-r--r--sql/threadpool.h7
-rw-r--r--sql/threadpool_common.cc63
-rw-r--r--sql/threadpool_win.cc156
-rw-r--r--storage/innobase/log/log0log.cc81
8 files changed, 321 insertions, 31 deletions
diff --git a/include/my_pthread.h b/include/my_pthread.h
index 17813047ce5..3fb2a9e18c5 100644
--- a/include/my_pthread.h
+++ b/include/my_pthread.h
@@ -113,13 +113,13 @@ int pthread_cancel(pthread_t thread);
#undef SAFE_MUTEX /* This will cause conflicts */
#define pthread_key(T,V) DWORD V
-#define pthread_key_create(A,B) ((*A=TlsAlloc())==0xFFFFFFFF)
-#define pthread_key_delete(A) TlsFree(A)
-#define my_pthread_setspecific_ptr(T,V) (!TlsSetValue((T),(V)))
-#define pthread_setspecific(A,B) (!TlsSetValue((A),(B)))
-#define pthread_getspecific(A) (TlsGetValue(A))
-#define my_pthread_getspecific(T,A) ((T) TlsGetValue(A))
-#define my_pthread_getspecific_ptr(T,V) ((T) TlsGetValue(V))
+#define pthread_key_create(A,B) ((*A=FlsAlloc(NULL))==0xFFFFFFFF)
+#define pthread_key_delete(A) FlsFree(A)
+#define my_pthread_setspecific_ptr(T,V) (!FlsSetValue((T),(V)))
+#define pthread_setspecific(A,B) (!FlsSetValue((A),(B)))
+#define pthread_getspecific(A) (FlsGetValue(A))
+#define my_pthread_getspecific(T,A) ((T) FlsGetValue(A))
+#define my_pthread_getspecific_ptr(T,V) ((T) FlsGetValue(V))
#define pthread_equal(A,B) ((A) == (B))
#define pthread_mutex_init(A,B) (InitializeCriticalSection(A),0)
diff --git a/sql/scheduler.cc b/sql/scheduler.cc
index 7380b134f13..526aa135805 100644
--- a/sql/scheduler.cc
+++ b/sql/scheduler.cc
@@ -109,7 +109,14 @@ void post_kill_notification(THD *thd)
*/
#ifndef EMBEDDED_LIBRARY
+static void* my_scheduler_yield()
+{
+ return NULL;
+}
+static void my_scheduler_resume(void *)
+{
+}
void one_thread_per_connection_scheduler(scheduler_functions *func,
ulong *arg_max_connections,
Atomic_counter<uint> *arg_connection_count)
@@ -120,6 +127,7 @@ void one_thread_per_connection_scheduler(scheduler_functions *func,
func->connection_count= arg_connection_count;
func->add_connection= create_thread_to_handle_connection;
func->post_kill_notification= post_kill_notification;
+
}
#else
void handle_connection_in_main_thread(CONNECT *connect)
@@ -139,3 +147,4 @@ void one_thread_scheduler(scheduler_functions *func)
func->connection_count= &connection_count;
func->add_connection= handle_connection_in_main_thread;
}
+
diff --git a/sql/scheduler.h b/sql/scheduler.h
index 676262f6454..8c6f408e329 100644
--- a/sql/scheduler.h
+++ b/sql/scheduler.h
@@ -40,6 +40,9 @@ struct scheduler_functions
void (*thd_wait_end)(THD *thd);
void (*post_kill_notification)(THD *thd);
void (*end)(void);
+ void* (*get_context)();
+ void (*yield)(void);
+ void (*resume)(void *);
};
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index 4577f1007be..53b11c345b6 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -4917,6 +4917,25 @@ void reset_thd(MYSQL_THD thd)
free_root(thd->mem_root, MYF(MY_KEEP_PREALLOC));
}
+int my_scheduler_get_context(void **ctx, void (**resume_fct)(void*))
+{
+ THD *thd = _current_thd();
+ if (thd && thd->scheduler && thd->scheduler->yield)
+ {
+ auto s = thd->scheduler;
+ *ctx = s->get_context();
+ *resume_fct =s->resume;
+ return ctx?0:1;
+ }
+ return 1;
+}
+
+void my_scheduler_yield()
+{
+ THD* thd = _current_thd();
+ thd->scheduler->yield();
+}
+
unsigned long long thd_get_query_id(const MYSQL_THD thd)
{
return((unsigned long long)thd->query_id);
diff --git a/sql/threadpool.h b/sql/threadpool.h
index fe77100a82a..cf7b74d2de9 100644
--- a/sql/threadpool.h
+++ b/sql/threadpool.h
@@ -135,6 +135,10 @@ struct TP_pool
virtual int set_stall_limit(uint){ return 0; }
virtual int get_thread_count() { return tp_stats.num_worker_threads; }
virtual int get_idle_thread_count(){ return 0; }
+ virtual void* get_context() {return nullptr;}
+ virtual void yield() {}
+ virtual void resume(void *) {}
+
};
#ifdef _WIN32
@@ -147,6 +151,9 @@ struct TP_pool_win:TP_pool
virtual void add(TP_connection *);
virtual int set_max_threads(uint);
virtual int set_min_threads(uint);
+ virtual void* get_context();
+ virtual void yield();
+ virtual void resume(void *);
};
#endif
diff --git a/sql/threadpool_common.cc b/sql/threadpool_common.cc
index 0588562ae61..076fd34bf3b 100644
--- a/sql/threadpool_common.cc
+++ b/sql/threadpool_common.cc
@@ -169,15 +169,14 @@ static TP_PRIORITY get_priority(TP_connection *c)
return prio;
}
-
-void tp_callback(TP_connection *c)
+int tp_callback_internal(TP_connection* c)
{
DBUG_ASSERT(c);
Worker_thread_context worker_context;
worker_context.save();
- THD *thd= c->thd;
+ THD* thd = c->thd;
c->state = TP_STATE_RUNNING;
@@ -185,13 +184,14 @@ void tp_callback(TP_connection *c)
{
/* No THD, need to login first. */
DBUG_ASSERT(c->connect);
- thd= c->thd= threadpool_add_connection(c->connect, c);
+ my_thread_init();
+ thd = c->thd = threadpool_add_connection(c->connect, c);
if (!thd)
{
/* Bail out on connect error.*/
goto error;
}
- c->connect= 0;
+ c->connect = 0;
}
else if (threadpool_process_request(thd))
{
@@ -200,28 +200,43 @@ void tp_callback(TP_connection *c)
}
/* Set priority */
- c->priority= get_priority(c);
+ c->priority = get_priority(c);
/* Read next command from client. */
c->set_io_timeout(thd->get_net_wait_timeout());
- c->state= TP_STATE_IDLE;
- if (c->start_io())
- goto error;
+ c->state = TP_STATE_IDLE;
worker_context.restore();
- return;
+ return 0;
error:
- c->thd= 0;
- delete c;
+ worker_context.restore();
+ return -1;
+}
+void tp_destroy_connection(TP_connection *c)
+{
+ Worker_thread_context worker_context;
+ worker_context.save();
+
+ THD *thd = c->thd;
+ c->thd = 0;
+ delete c;
if (thd)
- {
threadpool_remove_connection(thd);
- }
worker_context.restore();
}
+void tp_callback(TP_connection *c)
+{
+ if (tp_callback_internal(c)
+ || c->start_io())
+ {
+ tp_destroy_connection(c);
+ }
+}
+
+
static THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data)
{
@@ -509,6 +524,21 @@ static void tp_post_kill_notification(THD *thd)
post_kill_notification(thd);
}
+static void tp_yield()
+{
+ pool->yield();
+}
+
+static void* tp_get_context()
+{
+ return pool->get_context();
+}
+
+static void tp_resume(void* context)
+{
+ pool->resume(context);
+}
+
static scheduler_functions tp_scheduler_functions=
{
0, // max_threads
@@ -519,7 +549,10 @@ static scheduler_functions tp_scheduler_functions=
tp_wait_begin, // thd_wait_begin
tp_wait_end, // thd_wait_end
tp_post_kill_notification, // post kill notification
- tp_end // end
+ tp_end, // end
+ tp_get_context,
+ tp_yield,
+ tp_resume
};
void pool_of_threads_scheduler(struct scheduler_functions *func,
diff --git a/sql/threadpool_win.cc b/sql/threadpool_win.cc
index c9968d48c06..f25402a4f51 100644
--- a/sql/threadpool_win.cc
+++ b/sql/threadpool_win.cc
@@ -41,6 +41,7 @@ static void tp_log_warning(const char *msg, const char *fct)
static PTP_POOL pool;
static TP_CALLBACK_ENVIRON callback_environ;
+static TP_CALLBACK_ENVIRON callback_environ_highprio;
static DWORD fls;
static bool skip_completion_port_on_success = false;
@@ -50,6 +51,25 @@ PTP_CALLBACK_ENVIRON get_threadpool_win_callback_environ()
return pool? &callback_environ: 0;
}
+enum class child_return
+{
+ SUCCESS,
+ ERROR,
+ YIELD,
+ YIELD_CONFIRM,
+ PARENT_ERROR
+};
+
+struct tp_fiber_data
+{
+ void* parent_fiber;
+ void* child_fiber;
+ void* child_data;
+ child_return child_ret;
+ tp_fiber_data():parent_fiber(),child_fiber(),child_data(), child_ret(){}
+};
+
+
/*
Threadpool callbacks.
@@ -97,8 +117,13 @@ public:
PTP_IO io;
PTP_TIMER timer;
PTP_WORK work;
+ PTP_WORK work_highprio;
bool long_callback;
-
+ void *fiber;
+ void *parent_fiber;
+ volatile child_return fiber_ret;
+ std::atomic<int> executing;
+ DWORD executing_thread_id;
};
struct TP_connection *new_TP_connection(CONNECT *connect)
@@ -112,27 +137,136 @@ struct TP_connection *new_TP_connection(CONNECT *connect)
return c;
}
+extern int tp_callback_internal(TP_connection* c);
+extern void tp_destroy_connection(TP_connection* c);
+
+static void tp_fiber(void *param)
+{
+ auto c = (TP_connection_win*)param;
+ void *parent_fiber;
+ for(;;)
+ {
+ if(c->fiber_ret == child_return::PARENT_ERROR)
+ {
+ parent_fiber = c->parent_fiber;
+ tp_destroy_connection(c);
+ SwitchToFiber(parent_fiber);
+ }
+ else
+ {
+ c->fiber_ret = child_return::YIELD;
+ if (tp_callback_internal(c))
+ c->fiber_ret = child_return::ERROR;
+ else
+ c->fiber_ret = child_return::SUCCESS;
+ SwitchToFiber(c->parent_fiber);
+ }
+ }
+}
+
+static void tp_callback_fiber(TP_connection_win* c)
+{
+ c->parent_fiber = GetCurrentFiber();
+ if (!c->fiber)
+ c->fiber = CreateFiber(my_thread_stack_size, tp_fiber, c);
+ if (!c->fiber)
+ {
+ tp_destroy_connection(c);
+ return;
+ }
+ void* child_fiber = c->fiber;
+ auto ret = c->fiber_ret;
+ switch (ret)
+ {
+ case child_return::ERROR:
+ case child_return::PARENT_ERROR:
+ abort();
+ break;
+ case child_return::YIELD:
+ SubmitThreadpoolWork(c->work);
+ return;
+ case child_return::YIELD_CONFIRM:
+ break;
+ case child_return::SUCCESS:
+ break;
+ }
+
+ if (c->executing++)
+ abort();
+ c->executing_thread_id = GetCurrentThreadId();
+ SwitchToFiber(c->fiber);
+ c->executing --;
+
+ switch (c->fiber_ret)
+ {
+ case child_return::SUCCESS:
+ if (c->start_io())
+ {
+ c->fiber_ret = child_return::PARENT_ERROR;
+
+ SwitchToFiber(child_fiber);
+ DeleteFiber(child_fiber);
+ }
+ break;
+ case child_return::ERROR:
+ c->fiber_ret = child_return::PARENT_ERROR;
+ SwitchToFiber(child_fiber);
+ DeleteFiber(child_fiber);
+ break;
+ case child_return::YIELD:
+ c->fiber_ret = child_return::YIELD_CONFIRM;
+ break;
+ case child_return::PARENT_ERROR:
+ case child_return::YIELD_CONFIRM:
+ abort();
+ }
+}
+
void TP_pool_win::add(TP_connection *c)
{
if(FlsGetValue(fls))
{
/* Inside threadpool(), execute callback directly. */
- tp_callback(c);
+ tp_callback_fiber((TP_connection_win*)c);
}
else
{
- SubmitThreadpoolWork(((TP_connection_win *)c)->work);
+ TP_connection_win *c_w = ((TP_connection_win*)c);
+ SubmitThreadpoolWork(c_w->work);
}
}
+void TP_pool_win::yield()
+{
+ TP_connection_win *c = (TP_connection_win *) GetFiberData();
+ c->fiber_ret = child_return::YIELD;
+ SwitchToFiber(c->parent_fiber);
+}
+
+void* TP_pool_win::get_context()
+{
+ return GetFiberData();
+}
+
+void TP_pool_win::resume(void* context)
+{
+ auto c = (TP_connection_win*) context;
+ if (c->fiber_ret != child_return::YIELD && c->fiber_ret != child_return::YIELD_CONFIRM)
+ abort();
+ SubmitThreadpoolWork(c->work);
+}
TP_connection_win::TP_connection_win(CONNECT *c) :
TP_connection(c),
- timeout(ULONGLONG_MAX),
+ timeout(ULONGLONG_MAX),
callback_instance(0),
io(0),
timer(0),
- work(0)
+ work(0),
+ parent_fiber(),
+ fiber(),
+ fiber_ret(),
+ executing(0)
{
}
@@ -167,6 +301,7 @@ int TP_connection_win::init()
CHECK_ALLOC_ERROR(io= CreateThreadpoolIo(handle, io_completion_callback, this, &callback_environ));
CHECK_ALLOC_ERROR(timer= CreateThreadpoolTimer(timer_callback, this, &callback_environ));
CHECK_ALLOC_ERROR(work= CreateThreadpoolWork(work_callback, this, &callback_environ));
+ CHECK_ALLOC_ERROR(work_highprio = CreateThreadpoolWork(work_callback, this, &callback_environ_highprio));
return 0;
}
@@ -266,6 +401,9 @@ TP_connection_win::~TP_connection_win()
if (work)
CloseThreadpoolWork(work);
+ if (work_highprio)
+ CloseThreadpoolWork(work_highprio);
+
if (timer)
{
SetThreadpoolTimer(timer, 0, 0, 0);
@@ -304,6 +442,7 @@ void tp_win_callback_prolog()
if (FlsGetValue(fls) == NULL)
{
/* Running in new worker thread*/
+ ConvertThreadToFiber(NULL);
FlsSetValue(fls, (void *)1);
statistic_increment(thread_created, &LOCK_status);
InterlockedIncrement((volatile long *)&tp_stats.num_worker_threads);
@@ -329,6 +468,7 @@ static VOID WINAPI thread_destructor(void *data)
{
if(data)
{
+ ConvertFiberToThread();
InterlockedDecrement((volatile long *)&tp_stats.num_worker_threads);
my_thread_end();
}
@@ -339,7 +479,7 @@ static VOID WINAPI thread_destructor(void *data)
static inline void tp_callback(PTP_CALLBACK_INSTANCE instance, PVOID context)
{
pre_callback(context, instance);
- tp_callback((TP_connection *)context);
+ tp_callback_fiber((TP_connection_win *)context);
}
@@ -412,6 +552,9 @@ int TP_pool_win::init()
InitializeThreadpoolEnvironment(&callback_environ);
SetThreadpoolCallbackPool(&callback_environ, pool);
+ InitializeThreadpoolEnvironment(&callback_environ_highprio);
+ SetThreadpoolCallbackPriority(&callback_environ_highprio,TP_CALLBACK_PRIORITY_HIGH);
+ SetThreadpoolCallbackPool(&callback_environ_highprio, pool);
if (threadpool_max_threads)
{
SetThreadpoolThreadMaximum(pool, threadpool_max_threads);
@@ -446,6 +589,7 @@ TP_pool_win::~TP_pool_win()
if (!pool)
return;
DestroyThreadpoolEnvironment(&callback_environ);
+ DestroyThreadpoolEnvironment(&callback_environ_highprio);
SetThreadpoolThreadMaximum(pool, 0);
CloseThreadpool(pool);
if (!tp_stats.num_worker_threads)
diff --git a/storage/innobase/log/log0log.cc b/storage/innobase/log/log0log.cc
index 9f9253e0b9c..f9c7f6bcb28 100644
--- a/storage/innobase/log/log0log.cc
+++ b/storage/innobase/log/log0log.cc
@@ -843,6 +843,66 @@ log_buffer_switch()
log_sys.buf_next_to_write = log_sys.buf_free;
}
+struct flush_notify_t
+{
+ lsn_t m_lsn;
+ void *m_data;
+ void (*m_func)(void *);
+};
+struct cmp_flush_notify
+{
+ bool operator()(const flush_notify_t& lhs, const flush_notify_t& rhs)
+ {
+ return lhs.m_lsn > rhs.m_lsn;
+ }
+};
+
+
+#include <queue>
+#include <mutex>
+class flush_notify_queue_t
+{
+ OSMutex m_mtx;
+ std::vector<flush_notify_t> m_queue;
+
+public:
+ flush_notify_queue_t():m_mtx(),m_queue()
+ {
+ m_mtx.init();
+ m_queue.reserve(1000);
+ }
+ void push(flush_notify_t e)
+ {
+ m_mtx.enter();
+ m_queue.push_back(e);
+ m_mtx.exit();
+ }
+ void notify(lsn_t lsn)
+ {
+ m_mtx.enter();
+ for(auto e: m_queue)
+ {
+ if(e.m_lsn <= lsn)
+ e.m_func(e.m_data);
+ }
+ m_queue.erase(std::remove_if(m_queue.begin(), m_queue.end(),
+ [lsn](flush_notify_t const& e)
+ {
+ return e.m_lsn <= lsn;
+ }), m_queue.end());
+ m_mtx.exit();
+ }
+ ~flush_notify_queue_t()
+ {
+ ut_a(m_queue.empty());
+ m_mtx.destroy();
+ }
+};
+
+flush_notify_queue_t flush_notify_queue;
+extern int my_scheduler_get_context(void** ctx, void (**resume_fct)(void*));
+extern void my_scheduler_yield();
+
/** Ensure that the log has been written to the log file up to a given
log entry (such as that of a transaction commit). Start a new write, or
wait and check if an already running write is covering the request.
@@ -906,10 +966,25 @@ loop:
/* Figure out if the current flush will do the job
for us. */
bool work_done = log_sys.current_flush_lsn >= lsn;
-
+ bool do_yield = false;
+ flush_notify_t fn;
+ if (!my_scheduler_get_context(&(fn.m_data), &(fn.m_func)))
+ {
+ fn.m_lsn = lsn;
+ flush_notify_queue.push(fn);
+ do_yield = true;
+ }
log_write_mutex_exit();
- os_event_wait(log_sys.flush_event);
+ if (do_yield) {
+ my_scheduler_yield();
+#if (UNIV_WORD_SIZE >=8)
+ ut_a(log_sys.write_lsn >= lsn);
+#endif
+ return;
+ } else {
+ os_event_wait(log_sys.flush_event);
+ }
if (work_done) {
return;
@@ -1034,8 +1109,8 @@ loop:
log_write_flush_to_disk_low();
ib_uint64_t flush_lsn = log_sys.flushed_to_disk_lsn;
log_mutex_exit();
-
innobase_mysql_log_notify(flush_lsn);
+ flush_notify_queue.notify(flush_lsn);
}
}