diff options
author | Vladislav Vaintroub <wlad@mariadb.com> | 2019-12-20 12:49:37 +0100 |
---|---|---|
committer | Vladislav Vaintroub <wlad@mariadb.com> | 2019-12-24 18:16:51 +0100 |
commit | c303ff05b0350657d07d53c04e933c43b0b35184 (patch) | |
tree | f532addc806550ed079d26e52767246620b4bded | |
parent | d35d4d5f7149032244e36063e57f10f03bae7650 (diff) | |
download | mariadb-git-c303ff05b0350657d07d53c04e933c43b0b35184.tar.gz |
-rw-r--r-- | include/my_pthread.h | 14 | ||||
-rw-r--r-- | sql/scheduler.cc | 9 | ||||
-rw-r--r-- | sql/scheduler.h | 3 | ||||
-rw-r--r-- | sql/sql_class.cc | 19 | ||||
-rw-r--r-- | sql/threadpool.h | 7 | ||||
-rw-r--r-- | sql/threadpool_common.cc | 63 | ||||
-rw-r--r-- | sql/threadpool_win.cc | 156 | ||||
-rw-r--r-- | storage/innobase/log/log0log.cc | 81 |
8 files changed, 321 insertions, 31 deletions
diff --git a/include/my_pthread.h b/include/my_pthread.h index 17813047ce5..3fb2a9e18c5 100644 --- a/include/my_pthread.h +++ b/include/my_pthread.h @@ -113,13 +113,13 @@ int pthread_cancel(pthread_t thread); #undef SAFE_MUTEX /* This will cause conflicts */ #define pthread_key(T,V) DWORD V -#define pthread_key_create(A,B) ((*A=TlsAlloc())==0xFFFFFFFF) -#define pthread_key_delete(A) TlsFree(A) -#define my_pthread_setspecific_ptr(T,V) (!TlsSetValue((T),(V))) -#define pthread_setspecific(A,B) (!TlsSetValue((A),(B))) -#define pthread_getspecific(A) (TlsGetValue(A)) -#define my_pthread_getspecific(T,A) ((T) TlsGetValue(A)) -#define my_pthread_getspecific_ptr(T,V) ((T) TlsGetValue(V)) +#define pthread_key_create(A,B) ((*A=FlsAlloc(NULL))==0xFFFFFFFF) +#define pthread_key_delete(A) FlsFree(A) +#define my_pthread_setspecific_ptr(T,V) (!FlsSetValue((T),(V))) +#define pthread_setspecific(A,B) (!FlsSetValue((A),(B))) +#define pthread_getspecific(A) (FlsGetValue(A)) +#define my_pthread_getspecific(T,A) ((T) FlsGetValue(A)) +#define my_pthread_getspecific_ptr(T,V) ((T) FlsGetValue(V)) #define pthread_equal(A,B) ((A) == (B)) #define pthread_mutex_init(A,B) (InitializeCriticalSection(A),0) diff --git a/sql/scheduler.cc b/sql/scheduler.cc index 7380b134f13..526aa135805 100644 --- a/sql/scheduler.cc +++ b/sql/scheduler.cc @@ -109,7 +109,14 @@ void post_kill_notification(THD *thd) */ #ifndef EMBEDDED_LIBRARY +static void* my_scheduler_yield() +{ + return NULL; +} +static void my_scheduler_resume(void *) +{ +} void one_thread_per_connection_scheduler(scheduler_functions *func, ulong *arg_max_connections, Atomic_counter<uint> *arg_connection_count) @@ -120,6 +127,7 @@ void one_thread_per_connection_scheduler(scheduler_functions *func, func->connection_count= arg_connection_count; func->add_connection= create_thread_to_handle_connection; func->post_kill_notification= post_kill_notification; + } #else void handle_connection_in_main_thread(CONNECT *connect) @@ -139,3 +147,4 @@ void one_thread_scheduler(scheduler_functions *func) func->connection_count= &connection_count; func->add_connection= handle_connection_in_main_thread; } + diff --git a/sql/scheduler.h b/sql/scheduler.h index 676262f6454..8c6f408e329 100644 --- a/sql/scheduler.h +++ b/sql/scheduler.h @@ -40,6 +40,9 @@ struct scheduler_functions void (*thd_wait_end)(THD *thd); void (*post_kill_notification)(THD *thd); void (*end)(void); + void* (*get_context)(); + void (*yield)(void); + void (*resume)(void *); }; diff --git a/sql/sql_class.cc b/sql/sql_class.cc index 4577f1007be..53b11c345b6 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -4917,6 +4917,25 @@ void reset_thd(MYSQL_THD thd) free_root(thd->mem_root, MYF(MY_KEEP_PREALLOC)); } +int my_scheduler_get_context(void **ctx, void (**resume_fct)(void*)) +{ + THD *thd = _current_thd(); + if (thd && thd->scheduler && thd->scheduler->yield) + { + auto s = thd->scheduler; + *ctx = s->get_context(); + *resume_fct =s->resume; + return ctx?0:1; + } + return 1; +} + +void my_scheduler_yield() +{ + THD* thd = _current_thd(); + thd->scheduler->yield(); +} + unsigned long long thd_get_query_id(const MYSQL_THD thd) { return((unsigned long long)thd->query_id); diff --git a/sql/threadpool.h b/sql/threadpool.h index fe77100a82a..cf7b74d2de9 100644 --- a/sql/threadpool.h +++ b/sql/threadpool.h @@ -135,6 +135,10 @@ struct TP_pool virtual int set_stall_limit(uint){ return 0; } virtual int get_thread_count() { return tp_stats.num_worker_threads; } virtual int get_idle_thread_count(){ return 0; } + virtual void* get_context() {return nullptr;} + virtual void yield() {} + virtual void resume(void *) {} + }; #ifdef _WIN32 @@ -147,6 +151,9 @@ struct TP_pool_win:TP_pool virtual void add(TP_connection *); virtual int set_max_threads(uint); virtual int set_min_threads(uint); + virtual void* get_context(); + virtual void yield(); + virtual void resume(void *); }; #endif diff --git a/sql/threadpool_common.cc b/sql/threadpool_common.cc index 0588562ae61..076fd34bf3b 100644 --- a/sql/threadpool_common.cc +++ b/sql/threadpool_common.cc @@ -169,15 +169,14 @@ static TP_PRIORITY get_priority(TP_connection *c) return prio; } - -void tp_callback(TP_connection *c) +int tp_callback_internal(TP_connection* c) { DBUG_ASSERT(c); Worker_thread_context worker_context; worker_context.save(); - THD *thd= c->thd; + THD* thd = c->thd; c->state = TP_STATE_RUNNING; @@ -185,13 +184,14 @@ void tp_callback(TP_connection *c) { /* No THD, need to login first. */ DBUG_ASSERT(c->connect); - thd= c->thd= threadpool_add_connection(c->connect, c); + my_thread_init(); + thd = c->thd = threadpool_add_connection(c->connect, c); if (!thd) { /* Bail out on connect error.*/ goto error; } - c->connect= 0; + c->connect = 0; } else if (threadpool_process_request(thd)) { @@ -200,28 +200,43 @@ void tp_callback(TP_connection *c) } /* Set priority */ - c->priority= get_priority(c); + c->priority = get_priority(c); /* Read next command from client. */ c->set_io_timeout(thd->get_net_wait_timeout()); - c->state= TP_STATE_IDLE; - if (c->start_io()) - goto error; + c->state = TP_STATE_IDLE; worker_context.restore(); - return; + return 0; error: - c->thd= 0; - delete c; + worker_context.restore(); + return -1; +} +void tp_destroy_connection(TP_connection *c) +{ + Worker_thread_context worker_context; + worker_context.save(); + + THD *thd = c->thd; + c->thd = 0; + delete c; if (thd) - { threadpool_remove_connection(thd); - } worker_context.restore(); } +void tp_callback(TP_connection *c) +{ + if (tp_callback_internal(c) + || c->start_io()) + { + tp_destroy_connection(c); + } +} + + static THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data) { @@ -509,6 +524,21 @@ static void tp_post_kill_notification(THD *thd) post_kill_notification(thd); } +static void tp_yield() +{ + pool->yield(); +} + +static void* tp_get_context() +{ + return pool->get_context(); +} + +static void tp_resume(void* context) +{ + pool->resume(context); +} + static scheduler_functions tp_scheduler_functions= { 0, // max_threads @@ -519,7 +549,10 @@ static scheduler_functions tp_scheduler_functions= tp_wait_begin, // thd_wait_begin tp_wait_end, // thd_wait_end tp_post_kill_notification, // post kill notification - tp_end // end + tp_end, // end + tp_get_context, + tp_yield, + tp_resume }; void pool_of_threads_scheduler(struct scheduler_functions *func, diff --git a/sql/threadpool_win.cc b/sql/threadpool_win.cc index c9968d48c06..f25402a4f51 100644 --- a/sql/threadpool_win.cc +++ b/sql/threadpool_win.cc @@ -41,6 +41,7 @@ static void tp_log_warning(const char *msg, const char *fct) static PTP_POOL pool; static TP_CALLBACK_ENVIRON callback_environ; +static TP_CALLBACK_ENVIRON callback_environ_highprio; static DWORD fls; static bool skip_completion_port_on_success = false; @@ -50,6 +51,25 @@ PTP_CALLBACK_ENVIRON get_threadpool_win_callback_environ() return pool? &callback_environ: 0; } +enum class child_return +{ + SUCCESS, + ERROR, + YIELD, + YIELD_CONFIRM, + PARENT_ERROR +}; + +struct tp_fiber_data +{ + void* parent_fiber; + void* child_fiber; + void* child_data; + child_return child_ret; + tp_fiber_data():parent_fiber(),child_fiber(),child_data(), child_ret(){} +}; + + /* Threadpool callbacks. @@ -97,8 +117,13 @@ public: PTP_IO io; PTP_TIMER timer; PTP_WORK work; + PTP_WORK work_highprio; bool long_callback; - + void *fiber; + void *parent_fiber; + volatile child_return fiber_ret; + std::atomic<int> executing; + DWORD executing_thread_id; }; struct TP_connection *new_TP_connection(CONNECT *connect) @@ -112,27 +137,136 @@ struct TP_connection *new_TP_connection(CONNECT *connect) return c; } +extern int tp_callback_internal(TP_connection* c); +extern void tp_destroy_connection(TP_connection* c); + +static void tp_fiber(void *param) +{ + auto c = (TP_connection_win*)param; + void *parent_fiber; + for(;;) + { + if(c->fiber_ret == child_return::PARENT_ERROR) + { + parent_fiber = c->parent_fiber; + tp_destroy_connection(c); + SwitchToFiber(parent_fiber); + } + else + { + c->fiber_ret = child_return::YIELD; + if (tp_callback_internal(c)) + c->fiber_ret = child_return::ERROR; + else + c->fiber_ret = child_return::SUCCESS; + SwitchToFiber(c->parent_fiber); + } + } +} + +static void tp_callback_fiber(TP_connection_win* c) +{ + c->parent_fiber = GetCurrentFiber(); + if (!c->fiber) + c->fiber = CreateFiber(my_thread_stack_size, tp_fiber, c); + if (!c->fiber) + { + tp_destroy_connection(c); + return; + } + void* child_fiber = c->fiber; + auto ret = c->fiber_ret; + switch (ret) + { + case child_return::ERROR: + case child_return::PARENT_ERROR: + abort(); + break; + case child_return::YIELD: + SubmitThreadpoolWork(c->work); + return; + case child_return::YIELD_CONFIRM: + break; + case child_return::SUCCESS: + break; + } + + if (c->executing++) + abort(); + c->executing_thread_id = GetCurrentThreadId(); + SwitchToFiber(c->fiber); + c->executing --; + + switch (c->fiber_ret) + { + case child_return::SUCCESS: + if (c->start_io()) + { + c->fiber_ret = child_return::PARENT_ERROR; + + SwitchToFiber(child_fiber); + DeleteFiber(child_fiber); + } + break; + case child_return::ERROR: + c->fiber_ret = child_return::PARENT_ERROR; + SwitchToFiber(child_fiber); + DeleteFiber(child_fiber); + break; + case child_return::YIELD: + c->fiber_ret = child_return::YIELD_CONFIRM; + break; + case child_return::PARENT_ERROR: + case child_return::YIELD_CONFIRM: + abort(); + } +} + void TP_pool_win::add(TP_connection *c) { if(FlsGetValue(fls)) { /* Inside threadpool(), execute callback directly. */ - tp_callback(c); + tp_callback_fiber((TP_connection_win*)c); } else { - SubmitThreadpoolWork(((TP_connection_win *)c)->work); + TP_connection_win *c_w = ((TP_connection_win*)c); + SubmitThreadpoolWork(c_w->work); } } +void TP_pool_win::yield() +{ + TP_connection_win *c = (TP_connection_win *) GetFiberData(); + c->fiber_ret = child_return::YIELD; + SwitchToFiber(c->parent_fiber); +} + +void* TP_pool_win::get_context() +{ + return GetFiberData(); +} + +void TP_pool_win::resume(void* context) +{ + auto c = (TP_connection_win*) context; + if (c->fiber_ret != child_return::YIELD && c->fiber_ret != child_return::YIELD_CONFIRM) + abort(); + SubmitThreadpoolWork(c->work); +} TP_connection_win::TP_connection_win(CONNECT *c) : TP_connection(c), - timeout(ULONGLONG_MAX), + timeout(ULONGLONG_MAX), callback_instance(0), io(0), timer(0), - work(0) + work(0), + parent_fiber(), + fiber(), + fiber_ret(), + executing(0) { } @@ -167,6 +301,7 @@ int TP_connection_win::init() CHECK_ALLOC_ERROR(io= CreateThreadpoolIo(handle, io_completion_callback, this, &callback_environ)); CHECK_ALLOC_ERROR(timer= CreateThreadpoolTimer(timer_callback, this, &callback_environ)); CHECK_ALLOC_ERROR(work= CreateThreadpoolWork(work_callback, this, &callback_environ)); + CHECK_ALLOC_ERROR(work_highprio = CreateThreadpoolWork(work_callback, this, &callback_environ_highprio)); return 0; } @@ -266,6 +401,9 @@ TP_connection_win::~TP_connection_win() if (work) CloseThreadpoolWork(work); + if (work_highprio) + CloseThreadpoolWork(work_highprio); + if (timer) { SetThreadpoolTimer(timer, 0, 0, 0); @@ -304,6 +442,7 @@ void tp_win_callback_prolog() if (FlsGetValue(fls) == NULL) { /* Running in new worker thread*/ + ConvertThreadToFiber(NULL); FlsSetValue(fls, (void *)1); statistic_increment(thread_created, &LOCK_status); InterlockedIncrement((volatile long *)&tp_stats.num_worker_threads); @@ -329,6 +468,7 @@ static VOID WINAPI thread_destructor(void *data) { if(data) { + ConvertFiberToThread(); InterlockedDecrement((volatile long *)&tp_stats.num_worker_threads); my_thread_end(); } @@ -339,7 +479,7 @@ static VOID WINAPI thread_destructor(void *data) static inline void tp_callback(PTP_CALLBACK_INSTANCE instance, PVOID context) { pre_callback(context, instance); - tp_callback((TP_connection *)context); + tp_callback_fiber((TP_connection_win *)context); } @@ -412,6 +552,9 @@ int TP_pool_win::init() InitializeThreadpoolEnvironment(&callback_environ); SetThreadpoolCallbackPool(&callback_environ, pool); + InitializeThreadpoolEnvironment(&callback_environ_highprio); + SetThreadpoolCallbackPriority(&callback_environ_highprio,TP_CALLBACK_PRIORITY_HIGH); + SetThreadpoolCallbackPool(&callback_environ_highprio, pool); if (threadpool_max_threads) { SetThreadpoolThreadMaximum(pool, threadpool_max_threads); @@ -446,6 +589,7 @@ TP_pool_win::~TP_pool_win() if (!pool) return; DestroyThreadpoolEnvironment(&callback_environ); + DestroyThreadpoolEnvironment(&callback_environ_highprio); SetThreadpoolThreadMaximum(pool, 0); CloseThreadpool(pool); if (!tp_stats.num_worker_threads) diff --git a/storage/innobase/log/log0log.cc b/storage/innobase/log/log0log.cc index 9f9253e0b9c..f9c7f6bcb28 100644 --- a/storage/innobase/log/log0log.cc +++ b/storage/innobase/log/log0log.cc @@ -843,6 +843,66 @@ log_buffer_switch() log_sys.buf_next_to_write = log_sys.buf_free; } +struct flush_notify_t +{ + lsn_t m_lsn; + void *m_data; + void (*m_func)(void *); +}; +struct cmp_flush_notify +{ + bool operator()(const flush_notify_t& lhs, const flush_notify_t& rhs) + { + return lhs.m_lsn > rhs.m_lsn; + } +}; + + +#include <queue> +#include <mutex> +class flush_notify_queue_t +{ + OSMutex m_mtx; + std::vector<flush_notify_t> m_queue; + +public: + flush_notify_queue_t():m_mtx(),m_queue() + { + m_mtx.init(); + m_queue.reserve(1000); + } + void push(flush_notify_t e) + { + m_mtx.enter(); + m_queue.push_back(e); + m_mtx.exit(); + } + void notify(lsn_t lsn) + { + m_mtx.enter(); + for(auto e: m_queue) + { + if(e.m_lsn <= lsn) + e.m_func(e.m_data); + } + m_queue.erase(std::remove_if(m_queue.begin(), m_queue.end(), + [lsn](flush_notify_t const& e) + { + return e.m_lsn <= lsn; + }), m_queue.end()); + m_mtx.exit(); + } + ~flush_notify_queue_t() + { + ut_a(m_queue.empty()); + m_mtx.destroy(); + } +}; + +flush_notify_queue_t flush_notify_queue; +extern int my_scheduler_get_context(void** ctx, void (**resume_fct)(void*)); +extern void my_scheduler_yield(); + /** Ensure that the log has been written to the log file up to a given log entry (such as that of a transaction commit). Start a new write, or wait and check if an already running write is covering the request. @@ -906,10 +966,25 @@ loop: /* Figure out if the current flush will do the job for us. */ bool work_done = log_sys.current_flush_lsn >= lsn; - + bool do_yield = false; + flush_notify_t fn; + if (!my_scheduler_get_context(&(fn.m_data), &(fn.m_func))) + { + fn.m_lsn = lsn; + flush_notify_queue.push(fn); + do_yield = true; + } log_write_mutex_exit(); - os_event_wait(log_sys.flush_event); + if (do_yield) { + my_scheduler_yield(); +#if (UNIV_WORD_SIZE >=8) + ut_a(log_sys.write_lsn >= lsn); +#endif + return; + } else { + os_event_wait(log_sys.flush_event); + } if (work_done) { return; @@ -1034,8 +1109,8 @@ loop: log_write_flush_to_disk_low(); ib_uint64_t flush_lsn = log_sys.flushed_to_disk_lsn; log_mutex_exit(); - innobase_mysql_log_notify(flush_lsn); + flush_notify_queue.notify(flush_lsn); } } |