summaryrefslogtreecommitdiff
path: root/storage/innobase/log/log0sync.cc
diff options
context:
space:
mode:
Diffstat (limited to 'storage/innobase/log/log0sync.cc')
-rw-r--r--storage/innobase/log/log0sync.cc115
1 files changed, 105 insertions, 10 deletions
diff --git a/storage/innobase/log/log0sync.cc b/storage/innobase/log/log0sync.cc
index 2a6e1b8b853..6b14d1d3591 100644
--- a/storage/innobase/log/log0sync.cc
+++ b/storage/innobase/log/log0sync.cc
@@ -77,6 +77,7 @@ Note that if write operation is very fast, a) or b) can be fine as alternative.
#include <log0types.h>
#include "log0sync.h"
#include <mysql/service_thd_wait.h>
+#include <sql_class.h>
/**
Helper class , used in group commit lock.
@@ -158,10 +159,10 @@ void binary_semaphore::wake()
/* A thread helper structure, used in group commit lock below*/
struct group_commit_waiter_t
{
- lsn_t m_value;
- binary_semaphore m_sema;
- group_commit_waiter_t* m_next;
- group_commit_waiter_t() :m_value(), m_sema(), m_next() {}
+ lsn_t m_value=0;
+ binary_semaphore m_sema{};
+ group_commit_waiter_t* m_next= nullptr;
+ bool m_group_commit_leader=false;
};
group_commit_lock::group_commit_lock() :
@@ -188,7 +189,13 @@ void group_commit_lock::set_pending(group_commit_lock::value_type num)
const unsigned int MAX_SPINS = 1; /** max spins in acquire */
thread_local group_commit_waiter_t thread_local_waiter;
-group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num)
+static inline void do_completion_callback(const completion_callback* cb)
+{
+ if (cb)
+ cb->m_callback(cb->m_param);
+}
+
+group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num, const completion_callback *callback)
{
unsigned int spins = MAX_SPINS;
@@ -197,6 +204,7 @@ group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num)
if (num <= value())
{
/* No need to wait.*/
+ do_completion_callback(callback);
return lock_return_code::EXPIRED;
}
@@ -212,14 +220,18 @@ group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num)
}
thread_local_waiter.m_value = num;
+ thread_local_waiter.m_group_commit_leader= false;
std::unique_lock<std::mutex> lk(m_mtx, std::defer_lock);
- while (num > value())
+ while (num > value() || thread_local_waiter.m_group_commit_leader)
{
lk.lock();
/* Re-read current value after acquiring the lock*/
- if (num <= value())
+ if (num <= value() &&
+ (!thread_local_waiter.m_group_commit_leader || m_lock))
{
+ lk.unlock();
+ do_completion_callback(callback);
return lock_return_code::EXPIRED;
}
@@ -230,10 +242,28 @@ group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num)
#ifndef DBUG_OFF
m_owner_id = std::this_thread::get_id();
#endif
+ if (callback)
+ m_pending_callbacks.push_back({num,*callback});
return lock_return_code::ACQUIRED;
}
+ if (callback && (m_waiters_list || num <= pending()))
+ {
+ /*
+ If num > pending(), we have a good candidate for the next group
+ commit lead, that will be taking over the lock after current owner
+ releases it. We put current thread into waiter's list so it sleeps
+ and can be signaled and marked as group commit lead during lock release.
+
+ For this to work well, pending() must deliver a good approximation for N
+ in the next call to group_commit_lock::release(N).
+ */
+ m_pending_callbacks.push_back({num, *callback});
+ return lock_return_code::CALLBACK_QUEUED;
+ }
+
/* Add yourself to waiters list.*/
+ thread_local_waiter.m_group_commit_leader= false;
thread_local_waiter.m_next = m_waiters_list;
m_waiters_list = &thread_local_waiter;
lk.unlock();
@@ -244,11 +274,15 @@ group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num)
thd_wait_end(0);
}
+ do_completion_callback(callback);
return lock_return_code::EXPIRED;
}
-void group_commit_lock::release(value_type num)
+group_commit_lock::value_type group_commit_lock::release(value_type num)
{
+ completion_callback callbacks[1000];
+ size_t callback_count = 0;
+ value_type ret = 0;
std::unique_lock<std::mutex> lk(m_mtx);
m_lock = false;
@@ -262,12 +296,21 @@ void group_commit_lock::release(value_type num)
*/
group_commit_waiter_t* cur, * prev, * next;
group_commit_waiter_t* wakeup_list = nullptr;
- int extra_wake = 0;
+ for (auto& c : m_pending_callbacks)
+ {
+ if (c.first <= num)
+ {
+ if (callback_count < array_elements(callbacks))
+ callbacks[callback_count++] = c.second;
+ else
+ c.second.m_callback(c.second.m_param);
+ }
+ }
for (prev= nullptr, cur= m_waiters_list; cur; cur= next)
{
next= cur->m_next;
- if (cur->m_value <= num || extra_wake++ == 0)
+ if (cur->m_value <= num)
{
/* Move current waiter to wakeup_list*/
@@ -291,13 +334,65 @@ void group_commit_lock::release(value_type num)
prev= cur;
}
}
+
+ auto it= std::remove_if(
+ m_pending_callbacks.begin(), m_pending_callbacks.end(),
+ [num](const pending_cb &c) { return c.first <= num; });
+
+ m_pending_callbacks.erase(it, m_pending_callbacks.end());
+
+ if (m_pending_callbacks.size() || m_waiters_list)
+ {
+ /*
+ Ensure that after this thread released the lock,
+ there is a new group commit leader
+ We take this from waiters list or wakeup list. It
+ might look like a spurious wake, but in fact we just
+ ensure the waiter do not wait for eternity.
+ */
+ if (m_waiters_list)
+ {
+ /* Move one waiter to wakeup list */
+ auto e= m_waiters_list;
+ m_waiters_list= m_waiters_list->m_next;
+ e->m_next= wakeup_list;
+ e->m_group_commit_leader= true;
+ wakeup_list = e;
+ }
+ else if (wakeup_list)
+ {
+ wakeup_list->m_group_commit_leader=true;
+ }
+ else
+ {
+ /* Tell the caller that some pending callbacks left, and he should
+ do something to prevent stalls. This should be a rare situation.*/
+ ret= m_pending_callbacks[0].first;
+ }
+ }
+
lk.unlock();
+ /*
+ Release designated next group commit lead first,
+ to minimize spurious wakeups.
+ */
+ if (wakeup_list && wakeup_list->m_group_commit_leader)
+ {
+ next = wakeup_list->m_next;
+ wakeup_list->m_sema.wake();
+ wakeup_list= next;
+ }
+
+ for (size_t i = 0; i < callback_count; i++)
+ callbacks[i].m_callback(callbacks[i].m_param);
+
for (cur= wakeup_list; cur; cur= next)
{
next= cur->m_next;
cur->m_sema.wake();
}
+ return ret;
}
#ifndef DBUG_OFF