diff options
Diffstat (limited to 'storage/innobase/log/log0sync.cc')
-rw-r--r-- | storage/innobase/log/log0sync.cc | 115 |
1 files changed, 105 insertions, 10 deletions
diff --git a/storage/innobase/log/log0sync.cc b/storage/innobase/log/log0sync.cc index 2a6e1b8b853..6b14d1d3591 100644 --- a/storage/innobase/log/log0sync.cc +++ b/storage/innobase/log/log0sync.cc @@ -77,6 +77,7 @@ Note that if write operation is very fast, a) or b) can be fine as alternative. #include <log0types.h> #include "log0sync.h" #include <mysql/service_thd_wait.h> +#include <sql_class.h> /** Helper class , used in group commit lock. @@ -158,10 +159,10 @@ void binary_semaphore::wake() /* A thread helper structure, used in group commit lock below*/ struct group_commit_waiter_t { - lsn_t m_value; - binary_semaphore m_sema; - group_commit_waiter_t* m_next; - group_commit_waiter_t() :m_value(), m_sema(), m_next() {} + lsn_t m_value=0; + binary_semaphore m_sema{}; + group_commit_waiter_t* m_next= nullptr; + bool m_group_commit_leader=false; }; group_commit_lock::group_commit_lock() : @@ -188,7 +189,13 @@ void group_commit_lock::set_pending(group_commit_lock::value_type num) const unsigned int MAX_SPINS = 1; /** max spins in acquire */ thread_local group_commit_waiter_t thread_local_waiter; -group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num) +static inline void do_completion_callback(const completion_callback* cb) +{ + if (cb) + cb->m_callback(cb->m_param); +} + +group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num, const completion_callback *callback) { unsigned int spins = MAX_SPINS; @@ -197,6 +204,7 @@ group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num) if (num <= value()) { /* No need to wait.*/ + do_completion_callback(callback); return lock_return_code::EXPIRED; } @@ -212,14 +220,18 @@ group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num) } thread_local_waiter.m_value = num; + thread_local_waiter.m_group_commit_leader= false; std::unique_lock<std::mutex> lk(m_mtx, std::defer_lock); - while (num > value()) + while (num > value() || thread_local_waiter.m_group_commit_leader) { lk.lock(); /* Re-read current value after acquiring the lock*/ - if (num <= value()) + if (num <= value() && + (!thread_local_waiter.m_group_commit_leader || m_lock)) { + lk.unlock(); + do_completion_callback(callback); return lock_return_code::EXPIRED; } @@ -230,10 +242,28 @@ group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num) #ifndef DBUG_OFF m_owner_id = std::this_thread::get_id(); #endif + if (callback) + m_pending_callbacks.push_back({num,*callback}); return lock_return_code::ACQUIRED; } + if (callback && (m_waiters_list || num <= pending())) + { + /* + If num > pending(), we have a good candidate for the next group + commit lead, that will be taking over the lock after current owner + releases it. We put current thread into waiter's list so it sleeps + and can be signaled and marked as group commit lead during lock release. + + For this to work well, pending() must deliver a good approximation for N + in the next call to group_commit_lock::release(N). + */ + m_pending_callbacks.push_back({num, *callback}); + return lock_return_code::CALLBACK_QUEUED; + } + /* Add yourself to waiters list.*/ + thread_local_waiter.m_group_commit_leader= false; thread_local_waiter.m_next = m_waiters_list; m_waiters_list = &thread_local_waiter; lk.unlock(); @@ -244,11 +274,15 @@ group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num) thd_wait_end(0); } + do_completion_callback(callback); return lock_return_code::EXPIRED; } -void group_commit_lock::release(value_type num) +group_commit_lock::value_type group_commit_lock::release(value_type num) { + completion_callback callbacks[1000]; + size_t callback_count = 0; + value_type ret = 0; std::unique_lock<std::mutex> lk(m_mtx); m_lock = false; @@ -262,12 +296,21 @@ void group_commit_lock::release(value_type num) */ group_commit_waiter_t* cur, * prev, * next; group_commit_waiter_t* wakeup_list = nullptr; - int extra_wake = 0; + for (auto& c : m_pending_callbacks) + { + if (c.first <= num) + { + if (callback_count < array_elements(callbacks)) + callbacks[callback_count++] = c.second; + else + c.second.m_callback(c.second.m_param); + } + } for (prev= nullptr, cur= m_waiters_list; cur; cur= next) { next= cur->m_next; - if (cur->m_value <= num || extra_wake++ == 0) + if (cur->m_value <= num) { /* Move current waiter to wakeup_list*/ @@ -291,13 +334,65 @@ void group_commit_lock::release(value_type num) prev= cur; } } + + auto it= std::remove_if( + m_pending_callbacks.begin(), m_pending_callbacks.end(), + [num](const pending_cb &c) { return c.first <= num; }); + + m_pending_callbacks.erase(it, m_pending_callbacks.end()); + + if (m_pending_callbacks.size() || m_waiters_list) + { + /* + Ensure that after this thread released the lock, + there is a new group commit leader + We take this from waiters list or wakeup list. It + might look like a spurious wake, but in fact we just + ensure the waiter do not wait for eternity. + */ + if (m_waiters_list) + { + /* Move one waiter to wakeup list */ + auto e= m_waiters_list; + m_waiters_list= m_waiters_list->m_next; + e->m_next= wakeup_list; + e->m_group_commit_leader= true; + wakeup_list = e; + } + else if (wakeup_list) + { + wakeup_list->m_group_commit_leader=true; + } + else + { + /* Tell the caller that some pending callbacks left, and he should + do something to prevent stalls. This should be a rare situation.*/ + ret= m_pending_callbacks[0].first; + } + } + lk.unlock(); + /* + Release designated next group commit lead first, + to minimize spurious wakeups. + */ + if (wakeup_list && wakeup_list->m_group_commit_leader) + { + next = wakeup_list->m_next; + wakeup_list->m_sema.wake(); + wakeup_list= next; + } + + for (size_t i = 0; i < callback_count; i++) + callbacks[i].m_callback(callbacks[i].m_param); + for (cur= wakeup_list; cur; cur= next) { next= cur->m_next; cur->m_sema.wake(); } + return ret; } #ifndef DBUG_OFF |