summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVladislav Vaintroub <wlad@mariadb.com>2020-02-07 22:12:35 +0100
committerVladislav Vaintroub <wlad@mariadb.com>2020-03-01 19:02:21 +0100
commit30ea63b7d2077883713e63cbf4e661ba0345bf68 (patch)
tree043b5c2c777f4f17292a86ddf01d1c4715174d97
parent607960c7722b083652e0e39eb634181aa4ddaf8e (diff)
downloadmariadb-git-bb-10.5-MDEV-21534.tar.gz
MDEV-21534 - Improve innodb redo log group commit performancebb-10.5-MDEV-21534
Introduce special synchronization primitive group_commit_lock for more efficient synchronization of redo log writing and flushing. The goal is to reduce CPU consumption on log_write_up_to, to reduce the spurious wakeups, and improve the throughput in write-intensive benchmarks.
-rw-r--r--cmake/os/Windows.cmake6
-rw-r--r--storage/innobase/CMakeLists.txt1
-rw-r--r--storage/innobase/include/log0log.h37
-rw-r--r--storage/innobase/log/log0log.cc217
-rw-r--r--storage/innobase/log/log0sync.cc306
-rw-r--r--storage/innobase/log/log0sync.h81
-rw-r--r--storage/innobase/srv/srv0mon.cc5
-rw-r--r--storage/innobase/srv/srv0start.cc7
8 files changed, 499 insertions, 161 deletions
diff --git a/cmake/os/Windows.cmake b/cmake/os/Windows.cmake
index 08d231068de..1fbb1774bf7 100644
--- a/cmake/os/Windows.cmake
+++ b/cmake/os/Windows.cmake
@@ -203,9 +203,9 @@ IF(MSVC)
ENDIF()
ENDIF()
-# Always link with socket library
-STRING(APPEND CMAKE_C_STANDARD_LIBRARIES " ws2_32.lib")
-STRING(APPEND CMAKE_CXX_STANDARD_LIBRARIES " ws2_32.lib")
+# Always link with socket/synchronization libraries
+STRING(APPEND CMAKE_C_STANDARD_LIBRARIES " ws2_32.lib synchronization.lib")
+STRING(APPEND CMAKE_CXX_STANDARD_LIBRARIES " ws2_32.lib synchronization.lib")
# System checks
SET(SIGNAL_WITH_VIO_CLOSE 1) # Something that runtime team needs
diff --git a/storage/innobase/CMakeLists.txt b/storage/innobase/CMakeLists.txt
index e4286aac6a0..4e99cbc1766 100644
--- a/storage/innobase/CMakeLists.txt
+++ b/storage/innobase/CMakeLists.txt
@@ -84,6 +84,7 @@ SET(INNOBASE_SOURCES
log/log0log.cc
log/log0recv.cc
log/log0crypt.cc
+ log/log0sync.cc
mem/mem0mem.cc
mtr/mtr0mtr.cc
os/os0file.cc
diff --git a/storage/innobase/include/log0log.h b/storage/innobase/include/log0log.h
index 3e87e03f646..69d6064f49f 100644
--- a/storage/innobase/include/log0log.h
+++ b/storage/innobase/include/log0log.h
@@ -577,8 +577,6 @@ struct log_t{
MY_ALIGNED(CACHE_LINE_SIZE)
LogSysMutex mutex; /*!< mutex protecting the log */
MY_ALIGNED(CACHE_LINE_SIZE)
- LogSysMutex write_mutex; /*!< mutex protecting writing to log */
- MY_ALIGNED(CACHE_LINE_SIZE)
FlushOrderMutex log_flush_order_mutex;/*!< mutex to serialize access to
the flush list when we are putting
dirty blocks in the list. The idea
@@ -710,13 +708,7 @@ struct log_t{
AND flushed to disk */
std::atomic<size_t> pending_flushes; /*!< system calls in progress */
std::atomic<size_t> flushes; /*!< system calls counter */
- ulint n_pending_flushes;/*!< number of currently
- pending flushes; protected by
- log_sys.mutex */
- os_event_t flush_event; /*!< this event is in the reset state
- when a flush is running;
- os_event_set() and os_event_reset()
- are protected by log_sys.mutex */
+
ulint n_log_ios; /*!< number of log i/os initiated thus
far */
ulint n_log_ios_old; /*!< number of log i/o's at the
@@ -834,6 +826,9 @@ public:
/** Redo log system */
extern log_t log_sys;
+#ifdef UNIV_DEBUG
+extern bool log_write_lock_own();
+#endif
/** Gets the log capacity. It is OK to read the value without
holding log_sys.mutex because it is constant.
@@ -848,7 +843,7 @@ inline lsn_t log_t::file::calc_lsn_offset(lsn_t lsn) const
ut_ad(this == &log_sys.log);
/* The lsn parameters are updated while holding both the mutexes
and it is ok to have either of them while reading */
- ut_ad(log_sys.mutex.is_owned() || log_sys.write_mutex.is_owned());
+ ut_ad(log_sys.mutex.is_owned() || log_write_lock_own());
const lsn_t size = capacity();
lsn_t l= lsn - this->lsn;
if (longlong(l) < 0) {
@@ -862,12 +857,12 @@ inline lsn_t log_t::file::calc_lsn_offset(lsn_t lsn) const
}
inline void log_t::file::set_lsn(lsn_t a_lsn) {
- ut_ad(log_sys.mutex.is_owned() || log_sys.write_mutex.is_owned());
+ ut_ad(log_sys.mutex.is_owned() || log_write_lock_own());
lsn = a_lsn;
}
inline void log_t::file::set_lsn_offset(lsn_t a_lsn) {
- ut_ad(log_sys.mutex.is_owned() || log_sys.write_mutex.is_owned());
+ ut_ad(log_sys.mutex.is_owned() || log_write_lock_own());
ut_ad((lsn % OS_FILE_LOG_BLOCK_SIZE) == (a_lsn % OS_FILE_LOG_BLOCK_SIZE));
lsn_offset = a_lsn;
}
@@ -888,32 +883,14 @@ inline void log_t::file::set_lsn_offset(lsn_t a_lsn) {
/** Test if log sys mutex is owned. */
#define log_mutex_own() mutex_own(&log_sys.mutex)
-/** Test if log sys write mutex is owned. */
-#define log_write_mutex_own() mutex_own(&log_sys.write_mutex)
/** Acquire the log sys mutex. */
#define log_mutex_enter() mutex_enter(&log_sys.mutex)
-/** Acquire the log sys write mutex. */
-#define log_write_mutex_enter() mutex_enter(&log_sys.write_mutex)
-
-/** Acquire all the log sys mutexes. */
-#define log_mutex_enter_all() do { \
- mutex_enter(&log_sys.write_mutex); \
- mutex_enter(&log_sys.mutex); \
-} while (0)
/** Release the log sys mutex. */
#define log_mutex_exit() mutex_exit(&log_sys.mutex)
-/** Release the log sys write mutex.*/
-#define log_write_mutex_exit() mutex_exit(&log_sys.write_mutex)
-
-/** Release all the log sys mutexes. */
-#define log_mutex_exit_all() do { \
- mutex_exit(&log_sys.mutex); \
- mutex_exit(&log_sys.write_mutex); \
-} while (0)
/* log scrubbing speed, in bytes/sec */
extern ulonglong innodb_scrub_log_speed;
diff --git a/storage/innobase/log/log0log.cc b/storage/innobase/log/log0log.cc
index 689d24083b5..178251843b4 100644
--- a/storage/innobase/log/log0log.cc
+++ b/storage/innobase/log/log0log.cc
@@ -54,6 +54,7 @@ Created 12/9/1995 Heikki Tuuri
#include "srv0mon.h"
#include "sync0sync.h"
#include "buf0dump.h"
+#include "log0sync.h"
/*
General philosophy of InnoDB redo-logs:
@@ -509,7 +510,6 @@ void log_t::create()
m_initialised= true;
mutex_create(LATCH_ID_LOG_SYS, &mutex);
- mutex_create(LATCH_ID_LOG_WRITE, &write_mutex);
mutex_create(LATCH_ID_LOG_FLUSH_ORDER, &log_flush_order_mutex);
/* Start the lsn from one log block from zero: this way every
@@ -535,9 +535,6 @@ void log_t::create()
buf_next_to_write= 0;
write_lsn= lsn;
flushed_to_disk_lsn= 0;
- n_pending_flushes= 0;
- flush_event = os_event_create("log_flush_event");
- os_event_set(flush_event);
n_log_ios= 0;
n_log_ios_old= 0;
log_capacity= 0;
@@ -856,7 +853,7 @@ log_file_header_flush(
lsn_t start_lsn) /*!< in: log file data starts at this
lsn */
{
- ut_ad(log_write_mutex_own());
+ ut_ad(log_write_lock_own());
ut_ad(!recv_no_log_write);
ut_ad(log_sys.log.format == log_t::FORMAT_10_5
|| log_sys.log.format == log_t::FORMAT_ENC_10_5);
@@ -909,7 +906,7 @@ log_write_buf(
lsn_t next_offset;
ulint i;
- ut_ad(log_write_mutex_own());
+ ut_ad(log_write_lock_own());
ut_ad(!recv_no_log_write);
ut_a(len % OS_FILE_LOG_BLOCK_SIZE == 0);
ut_a(start_lsn % OS_FILE_LOG_BLOCK_SIZE == 0);
@@ -1002,20 +999,14 @@ loop:
and invoke log_mutex_enter(). */
static
void
-log_write_flush_to_disk_low()
+log_write_flush_to_disk_low(lsn_t lsn)
{
- /* FIXME: This is not holding log_sys.mutex while
- calling os_event_set()! */
- ut_a(log_sys.n_pending_flushes == 1); /* No other threads here */
-
log_sys.log.flush_data_only();
log_mutex_enter();
- log_sys.flushed_to_disk_lsn = log_sys.current_flush_lsn;
-
- log_sys.n_pending_flushes--;
-
- os_event_set(log_sys.flush_event);
+ ut_a(lsn >= log_sys.flushed_to_disk_lsn);
+ log_sys.flushed_to_disk_lsn = lsn;
+ log_mutex_exit();
}
/** Switch the log buffer in use, and copy the content of last block
@@ -1026,7 +1017,7 @@ void
log_buffer_switch()
{
ut_ad(log_mutex_own());
- ut_ad(log_write_mutex_own());
+ ut_ad(log_write_lock_own());
const byte* old_buf = log_sys.buf;
ulong area_end = ut_calc_align(
@@ -1053,86 +1044,24 @@ log_buffer_switch()
log_sys.buf_next_to_write = log_sys.buf_free;
}
-/** Ensure that the log has been written to the log file up to a given
-log entry (such as that of a transaction commit). Start a new write, or
-wait and check if an already running write is covering the request.
-@param[in] lsn log sequence number that should be
-included in the redo log file write
-@param[in] flush_to_disk whether the written log should also
-be flushed to the file system
-@param[in] rotate_key whether to rotate the encryption key */
-void log_write_up_to(lsn_t lsn, bool flush_to_disk, bool rotate_key)
-{
-#ifdef UNIV_DEBUG
- ulint loop_count = 0;
-#endif /* UNIV_DEBUG */
- byte* write_buf;
- lsn_t write_lsn;
-
- ut_ad(!srv_read_only_mode);
- ut_ad(!rotate_key || flush_to_disk);
-
- if (recv_no_ibuf_operations) {
- /* Recovery is running and no operations on the log file are
- allowed yet (the variable name .._no_ibuf_.. is misleading) */
+/**
+Writes log buffer to disk
+which is the "write" part of log_write_up_to().
- return;
- }
+This function does not flush anything.
-loop:
- ut_ad(++loop_count < 128);
-
-#if UNIV_WORD_SIZE > 7
- /* We can do a dirty read of LSN. */
- /* NOTE: Currently doesn't do dirty read for
- (flush_to_disk == true) case, because the log_mutex
- contention also works as the arbitrator for write-IO
- (fsync) bandwidth between log file and data files. */
- if (!flush_to_disk && log_sys.write_lsn >= lsn) {
- return;
- }
-#endif
+Note : the caller must have log_mutex locked, and this
+mutex is released in the function.
- log_write_mutex_enter();
+*/
+static void log_write(bool rotate_key)
+{
+ ut_ad(log_mutex_own());
ut_ad(!recv_no_log_write);
-
- lsn_t limit_lsn = flush_to_disk
- ? log_sys.flushed_to_disk_lsn
- : log_sys.write_lsn;
-
- if (limit_lsn >= lsn) {
- log_write_mutex_exit();
- return;
- }
-
- /* If it is a write call we should just go ahead and do it
- as we checked that write_lsn is not where we'd like it to
- be. If we have to flush as well then we check if there is a
- pending flush and based on that we wait for it to finish
- before proceeding further. */
- if (flush_to_disk
- && (log_sys.n_pending_flushes > 0
- || !os_event_is_set(log_sys.flush_event))) {
- /* Figure out if the current flush will do the job
- for us. */
- bool work_done = log_sys.current_flush_lsn >= lsn;
-
- log_write_mutex_exit();
-
- os_event_wait(log_sys.flush_event);
-
- if (work_done) {
- return;
- } else {
- goto loop;
- }
- }
-
- log_mutex_enter();
- if (!flush_to_disk
- && log_sys.buf_free == log_sys.buf_next_to_write) {
- /* Nothing to write and no flush to disk requested */
- log_mutex_exit_all();
+ lsn_t write_lsn;
+ if (log_sys.buf_free == log_sys.buf_next_to_write) {
+ /* Nothing to write */
+ log_mutex_exit();
return;
}
@@ -1146,19 +1075,7 @@ loop:
DBUG_PRINT("ib_log", ("write " LSN_PF " to " LSN_PF,
log_sys.write_lsn,
log_sys.lsn));
- if (flush_to_disk) {
- log_sys.n_pending_flushes++;
- log_sys.current_flush_lsn = log_sys.lsn;
- os_event_reset(log_sys.flush_event);
-
- if (log_sys.buf_free == log_sys.buf_next_to_write) {
- /* Nothing to write, flush only */
- log_mutex_exit_all();
- log_write_flush_to_disk_low();
- log_mutex_exit();
- return;
- }
- }
+
start_offset = log_sys.buf_next_to_write;
end_offset = log_sys.buf_free;
@@ -1175,7 +1092,7 @@ loop:
log_sys.next_checkpoint_no);
write_lsn = log_sys.lsn;
- write_buf = log_sys.buf;
+ byte *write_buf = log_sys.buf;
log_buffer_switch();
@@ -1209,8 +1126,7 @@ loop:
if (UNIV_UNLIKELY(srv_shutdown_state != SRV_SHUTDOWN_NONE)) {
service_manager_extend_timeout(INNODB_EXTEND_TIMEOUT_INTERVAL,
"InnoDB log write: "
- LSN_PF "," LSN_PF,
- log_sys.write_lsn, lsn);
+ LSN_PF, log_sys.write_lsn);
}
if (log_sys.is_encrypted()) {
@@ -1230,16 +1146,76 @@ loop:
start_offset - area_start);
srv_stats.log_padded.add(pad_size);
log_sys.write_lsn = write_lsn;
+ if (log_sys.log.writes_are_durable())
+ log_sys.flushed_to_disk_lsn = write_lsn;
+ return;
+}
- log_write_mutex_exit();
+static group_commit_lock write_lock; /* group commit lock for the write step of log_write_up_to() */
+static group_commit_lock flush_lock; /* group commit lock for the flush step of log_write_up_to() */
- if (flush_to_disk) {
- log_write_flush_to_disk_low();
- ib_uint64_t flush_lsn = log_sys.flushed_to_disk_lsn;
- log_mutex_exit();
+#ifdef UNIV_DEBUG
+bool log_write_lock_own() /* Debug helper: forwards to write_lock.is_owner() for use in ut_ad() assertions. */
+{
+ return write_lock.is_owner();
+}
+#endif
- innobase_mysql_log_notify(flush_lsn);
- }
+/** Ensure that the log has been written to the log file up to a given
+log entry (such as that of a transaction commit). Start a new write, or
+wait and check if an already running write is covering the request.
+@param[in] lsn log sequence number that should be
+included in the redo log file write
+@param[in] flush_to_disk whether the written log should also
+be flushed to the file system
+@param[in] rotate_key whether to rotate the encryption key */
+void log_write_up_to(lsn_t lsn, bool flush_to_disk, bool rotate_key)
+{
+ ut_ad(!srv_read_only_mode);
+ ut_ad(!rotate_key || flush_to_disk);
+
+ if (recv_no_ibuf_operations)
+ {
+ /* Recovery is running and no operations on the log files are
+ allowed yet (the variable name .._no_ibuf_.. is misleading) */
+ return;
+ }
+ /* Flush requests first compete for flush_lock; if acquire() does not return ACQUIRED, a flush covering lsn has already completed. */
+ if (flush_to_disk &&
+ flush_lock.acquire(lsn) != group_commit_lock::ACQUIRED)
+ {
+ return;
+ }
+ /* Become the write leader, unless a write covering lsn has already completed. */
+ if (write_lock.acquire(lsn) == group_commit_lock::ACQUIRED)
+ {
+ log_mutex_enter(); /* log_write() releases this mutex */
+ auto write_lsn = log_sys.lsn; /* snapshot taken under the mutex; checked against log_sys.write_lsn below */
+ write_lock.set_pending(write_lsn); /* hint for threads spinning in acquire() */
+
+ log_write(rotate_key);
+
+ ut_a(log_sys.write_lsn == write_lsn);
+ write_lock.release(write_lsn); /* publishes the new written lsn and wakes waiters */
+ }
+
+ if (!flush_to_disk)
+ {
+ return;
+ }
+
+ /* Flush the highest written lsn. This thread holds flush_lock here. */
+ auto flush_lsn = write_lock.value();
+ flush_lock.set_pending(flush_lsn);
+
+ if (!log_sys.log.writes_are_durable()) /* when writes are durable, no separate flush call is made here */
+ {
+ log_write_flush_to_disk_low(flush_lsn);
+ }
+
+ flush_lock.release(flush_lsn); /* publishes the new flushed lsn and wakes waiters */
+
+ innobase_mysql_log_notify(flush_lsn);
}
/** write to the log file up to the last log entry.
@@ -1270,8 +1246,7 @@ log_buffer_sync_in_background(
lsn = log_sys.lsn;
if (flush
- && log_sys.n_pending_flushes > 0
- && log_sys.current_flush_lsn >= lsn) {
+ && log_sys.flushed_to_disk_lsn >= lsn) {
/* The write + flush will write enough */
log_mutex_exit();
return;
@@ -1836,7 +1811,7 @@ wait_suspend_loop:
if (log_sys.is_initialised()) {
log_mutex_enter();
const ulint n_write = log_sys.n_pending_checkpoint_writes;
- const ulint n_flush = log_sys.n_pending_flushes;
+ const ulint n_flush = log_sys.pending_flushes;
log_mutex_exit();
if (log_scrub_thread_active || n_write || n_flush) {
@@ -2011,7 +1986,7 @@ log_print(
ULINTPF " pending log flushes, "
ULINTPF " pending chkp writes\n"
ULINTPF " log i/o's done, %.2f log i/o's/second\n",
- log_sys.n_pending_flushes,
+ log_sys.pending_flushes.load(),
log_sys.n_pending_checkpoint_writes,
log_sys.n_log_ios,
static_cast<double>(
@@ -2047,9 +2022,7 @@ void log_t::close()
ut_free_dodump(buf, srv_log_buffer_size * 2);
buf = NULL;
- os_event_destroy(flush_event);
mutex_free(&mutex);
- mutex_free(&write_mutex);
mutex_free(&log_flush_order_mutex);
if (!srv_read_only_mode && srv_scrub_log)
diff --git a/storage/innobase/log/log0sync.cc b/storage/innobase/log/log0sync.cc
new file mode 100644
index 00000000000..cbac3692f26
--- /dev/null
+++ b/storage/innobase/log/log0sync.cc
@@ -0,0 +1,306 @@
+/*****************************************************************************
+Copyright (c) 2020 MariaDB Corporation.
+
+This program is free software; you can redistribute it and/or modify it under
+the terms of the GNU General Public License as published by the Free Software
+Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful, but WITHOUT
+ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License along with
+this program; if not, write to the Free Software Foundation, Inc.,
+51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
+
+*****************************************************************************/
+
+/*
+The group commit synchronization used in log_write_up_to()
+works as follows
+
+For simplicity, let's consider only the write operation; synchronization of
+the flush operation works the same way.
+
+Rules of the game
+
+A thread enters log_write_up_to() with lsn of the current transaction
+1. If last written lsn is greater than or equal to wait lsn (another thread
+ already wrote the log buffer), then there is no need to do anything.
+2. If no other thread is currently writing, write the log buffer,
+ and update last written lsn.
+3. Otherwise, wait, and go to step 1.
+
+Synchronization can be done in different ways, e.g
+
+a) Simple mutex locking the entire check and write operation
+Disadvantage that threads that could continue after updating
+last written lsn, still wait.
+
+b) Spinlock, with periodic checks for last written lsn.
+Fixes a) but burns CPU unnecessarily.
+
+c) Mutex / condition variable combo.
+
+Condition variable notifies (broadcast) all waiters, whenever
+last written lsn is changed.
+
+Has a disadvantage of many spurious wakeups, stress on OS scheduler,
+and mutex contention.
+
+d) Something else.
+Make use of the waiter's lsn parameter, and only wakeup "right" waiting
+threads.
+
+We chose d). Even if the implementation is more complicated than the alternatives
+due to the need to maintain a list of waiters, it provides the best performance.
+
+See group_commit_lock implementation for details.
+
+Note that if write operation is very fast, a) or b) can be fine as alternative.
+*/
+#ifdef _WIN32
+#include <windows.h>
+#endif
+
+#ifdef __linux__
+#include <linux/futex.h>
+#include <sys/syscall.h>
+#endif
+
+#include <atomic>
+#include <thread>
+#include <mutex>
+#include <condition_variable>
+#include <my_cpu.h>
+
+#include <log0types.h>
+#include "log0sync.h"
+
+/**
+ Helper class , used in group commit lock.
+
+ Binary semaphore, or (same thing), an auto-reset event
+ Has state (signalled or not), and provides 2 operations.
+ wait() and wake()
+
+ The implementation uses efficient locking primitives on Linux and Windows.
+ Or, mutex/condition combo elsewhere.
+*/
+
+class binary_semaphore
+{
+public:
+ /** Wait until the semaphore becomes signalled, and atomically reset the
+ state to non-signalled. */
+ void wait();
+ /** Signal the semaphore and wake at most one blocked waiter. */
+ void wake();
+
+private:
+#if defined(__linux__) || defined (_WIN32)
+ std::atomic<int> m_signalled; /* 1 = signalled, 0 = not signalled; also the address waited on */
+ const std::memory_order mem_order = std::memory_order::memory_order_acq_rel; /* ordering used by the exchange() calls */
+public:
+ binary_semaphore() :m_signalled(0) {} /* starts non-signalled */
+#else
+ std::mutex m_mtx{}; /* held while m_signalled is read or written */
+ std::condition_variable m_cv{}; /* waited on until m_signalled becomes true; notified with notify_one() */
+ bool m_signalled = false; /* starts non-signalled */
+#endif
+};
+
+#if defined (__linux__) || defined (_WIN32)
+void binary_semaphore::wait() /* Linux/Windows: block until signalled, consuming the signal. */
+{
+ for (;;)
+ {
+ if (m_signalled.exchange(0, mem_order) == 1) /* signal present: consume it and return */
+ {
+ break;
+ }
+#ifdef _WIN32
+ int zero = 0; /* comparison value: sleep only while m_signalled is still 0 */
+ WaitOnAddress(&m_signalled, &zero, sizeof(m_signalled), INFINITE);
+#else
+ syscall(SYS_futex, &m_signalled, FUTEX_WAIT_PRIVATE, 0, NULL, NULL, 0); /* sleeps only if the word is still 0 */
+#endif
+ } /* the state is re-checked after every return from the wait call */
+}
+
+void binary_semaphore::wake() /* Linux/Windows: set the signalled state and wake one waiter. */
+{
+ if (m_signalled.exchange(1, mem_order) == 0) /* only issue a wake-up on the 0 -> 1 transition */
+ {
+#ifdef _WIN32
+ WakeByAddressSingle(&m_signalled);
+#else
+ syscall(SYS_futex, &m_signalled, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0); /* wake at most 1 waiter */
+#endif
+ }
+}
+#else
+void binary_semaphore::wait() /* Portable fallback: block until signalled, then reset the state. */
+{
+ std::unique_lock<std::mutex> lk(m_mtx);
+ while (!m_signalled) /* loop guards against spurious condition variable wakeups */
+ m_cv.wait(lk);
+ m_signalled = false; /* consume the signal (auto-reset) */
+}
+void binary_semaphore::wake() /* Portable fallback: set the signalled state and notify one waiter. */
+{
+ std::unique_lock<std::mutex> lk(m_mtx);
+ m_signalled = true;
+ m_cv.notify_one(); /* notification is issued while m_mtx is still held */
+}
+#endif
+
+/* Per-thread waiter record, used as a list node by group_commit_lock below. */
+struct group_commit_waiter_t
+{
+ lsn_t m_value; /* value passed to acquire() by the waiting thread */
+ binary_semaphore m_sema; /* the thread sleeps on this; signalled from release() */
+ group_commit_waiter_t* m_next; /* next entry in the singly-linked waiters (or wakeup) list */
+ group_commit_waiter_t() :m_value(), m_sema(), m_next() {}
+};
+
+group_commit_lock::group_commit_lock() :
+ m_mtx(), m_value(0), m_pending_value(0), m_lock(false), m_waiters_list()
+{ /* Initial state: unlocked, current and pending value 0, empty waiters list. */
+}
+
+group_commit_lock::value_type group_commit_lock::value() const
+{ /* Returns the current value; relaxed atomic load, callable without holding m_mtx. */
+ return m_value.load(std::memory_order::memory_order_relaxed);
+}
+
+group_commit_lock::value_type group_commit_lock::pending() const
+{ /* Returns the pending value (relaxed atomic load); acquire() uses it to decide whether to spin. */
+ return m_pending_value.load(std::memory_order::memory_order_relaxed);
+}
+
+void group_commit_lock::set_pending(group_commit_lock::value_type num)
+{ /* Stores the value the running operation is expected to reach; must not be below the current value. */
+ ut_a(num >= value());
+ m_pending_value.store(num, std::memory_order::memory_order_relaxed);
+}
+
+const unsigned int MAX_SPINS = 1; /** max spin iterations in group_commit_lock::acquire() before it falls back to the mutex */
+thread_local group_commit_waiter_t thread_local_waiter; /* per-thread waiter record; acquire() links its address into the lock's waiters list */
+/* Returns EXPIRED once the current value has reached num, or ACQUIRED if the calling thread took the lock (it must then call release()). */
+group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num)
+{
+ unsigned int spins = MAX_SPINS;
+ /* Phase 1: lock-free check, with a short spin while the running operation is expected to cover num. */
+ for(;;)
+ {
+ if (num <= value())
+ {
+ /* No need to wait.*/
+ return lock_return_code::EXPIRED;
+ }
+
+ if(spins-- == 0) /* spin budget used up */
+ break;
+ if (num > pending())
+ {
+ /* Longer wait expected (longer than currently running operation),
+ don't spin.*/
+ break;
+ }
+ ut_delay(1);
+ }
+ /* Phase 2: under m_mtx, either become the lock holder or enqueue and sleep. */
+ thread_local_waiter.m_value = num;
+ std::unique_lock<std::mutex> lk(m_mtx, std::defer_lock);
+ while (num > value())
+ {
+ lk.lock();
+
+ /* Re-read current value after acquiring the lock*/
+ if (num <= value())
+ {
+ return lock_return_code::EXPIRED; /* lk's destructor unlocks m_mtx */
+ }
+
+ if (!m_lock)
+ {
+ /* Take the lock, become group commit leader.*/
+ m_lock = true;
+#ifndef DBUG_OFF
+ m_owner_id = std::this_thread::get_id();
+#endif
+ return lock_return_code::ACQUIRED; /* m_mtx is unlocked by lk's destructor; m_lock stays set until release() */
+ }
+
+ /* Add yourself to waiters list.*/
+ thread_local_waiter.m_next = m_waiters_list;
+ m_waiters_list = &thread_local_waiter;
+ lk.unlock();
+
+ /* Sleep until woken in release(), then re-check the value in the loop condition.*/
+ thread_local_waiter.m_sema.wait();
+ }
+ return lock_return_code::EXPIRED;
+}
+/* Called by the lock holder: clears the lock, publishes num as the new current value and wakes the matching waiters. */
+void group_commit_lock::release(value_type num)
+{
+ std::unique_lock<std::mutex> lk(m_mtx);
+ m_lock = false;
+
+ /* Update current value; it must never go backwards. */
+ ut_a(num >= value());
+ m_value.store(num, std::memory_order_relaxed);
+
+ /*
+ Wake waiters for value <= current value.
+ Wake one more waiter, who can become the next group commit lead.
+ */
+ group_commit_waiter_t* cur, * prev, * next; /* prev is only read when cur is not the list head, i.e. after an earlier kept entry assigned it */
+ group_commit_waiter_t* wakeup_list = nullptr;
+ int extra_wake = 0; /* becomes non-zero once the first not-yet-satisfied waiter has been seen */
+
+ for (cur = m_waiters_list; cur; cur = next)
+ {
+ next = cur->m_next;
+ if (cur->m_value <= num || extra_wake++ == 0)
+ {
+ /* Move current waiter to wakeup_list*/
+
+ if (cur == m_waiters_list)
+ {
+ /* Remove from the start of the list.*/
+ m_waiters_list = next;
+ }
+ else
+ {
+ /* Remove from the middle of the list.*/
+ prev->m_next = cur->m_next;
+ }
+
+ /* Push entry onto the front of the wakeup list.*/
+ cur->m_next = wakeup_list;
+ wakeup_list = cur;
+ }
+ else
+ {
+ prev = cur; /* entry stays queued; remember it as predecessor for later unlinking */
+ }
+ }
+ lk.unlock(); /* semaphores are signalled after m_mtx is released */
+
+ for (cur = wakeup_list; cur; cur = next)
+ {
+ next = cur->m_next; /* read before wake(); presumably because the woken thread may reuse its record - confirm */
+ cur->m_sema.wake();
+ }
+}
+
+#ifndef DBUG_OFF
+bool group_commit_lock::is_owner() /* Debug-only: true if the lock is held and the calling thread is the one recorded in acquire(). */
+{
+ return m_lock && std::this_thread::get_id() == m_owner_id; /* NOTE(review): reads m_lock and m_owner_id without taking m_mtx */
+}
+#endif
+
diff --git a/storage/innobase/log/log0sync.h b/storage/innobase/log/log0sync.h
new file mode 100644
index 00000000000..40afbf74ecd
--- /dev/null
+++ b/storage/innobase/log/log0sync.h
@@ -0,0 +1,81 @@
+/*****************************************************************************
+Copyright (c) 2020 MariaDB Corporation.
+
+This program is free software; you can redistribute it and/or modify it under
+the terms of the GNU General Public License as published by the Free Software
+Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful, but WITHOUT
+ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License along with
+this program; if not, write to the Free Software Foundation, Inc.,
+51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
+
+*****************************************************************************/
+
+#include <atomic>
+#include <thread>
+#include <log0types.h>
+
+struct group_commit_waiter_t;
+
+/**
+Special synchronization primitive, which is helpful for
+performing group commit.
+
+It has a state consisting of
+ - locked (bool)
+ - current value (number). This value never decreases.
+ - pending value (number). current value can soon become this number
+ This is only used for optimization, does not have to be exact
+
+Operations supported on this semaphore
+
+1. acquire(num):
+- waits until current value reaches num, or until lock is granted.
+
+- returns EXPIRED if current_value >= num,
+ or ACQUIRED, if current_value < num and lock is granted.
+
+2. release(num)
+- releases lock
+- sets new current value to num (num must be >= current_value)
+- wakes waiters whose value is <= num, plus one more waiter
+
+3. value()
+- read current value
+
+4. pending()
+- read pending value
+
+5. set_pending(num) - store a new pending value (num must be >= current value)
+*/
+class group_commit_lock
+{
+ using value_type = lsn_t;
+#ifndef DBUG_OFF
+ std::thread::id m_owner_id{}; /* debug-only: thread that last returned ACQUIRED from acquire() */
+#endif
+ std::mutex m_mtx; /* held while m_lock and m_waiters_list are modified; NOTE(review): this header does not include <mutex> itself */
+ std::atomic<value_type> m_value; /* current value */
+ std::atomic<value_type> m_pending_value; /* pending value, a hint only */
+ bool m_lock; /* true while some thread holds the lock */
+ group_commit_waiter_t* m_waiters_list; /* head of the singly-linked list of sleeping waiters */
+public:
+ group_commit_lock();
+ enum lock_return_code
+ {
+ ACQUIRED, /* caller holds the lock and must call release() */
+ EXPIRED /* current value already covers the requested value; lock not taken */
+ };
+ lock_return_code acquire(value_type num);
+ void release(value_type num);
+ value_type value() const;
+ value_type pending() const;
+ void set_pending(value_type num);
+#ifndef DBUG_OFF
+ bool is_owner(); /* debug-only ownership check for assertions */
+#endif
+};
diff --git a/storage/innobase/srv/srv0mon.cc b/storage/innobase/srv/srv0mon.cc
index 8397d3a8ea3..ac5863d8efc 100644
--- a/storage/innobase/srv/srv0mon.cc
+++ b/storage/innobase/srv/srv0mon.cc
@@ -1976,9 +1976,8 @@ srv_mon_process_existing_counter(
break;
case MONITOR_PENDING_LOG_FLUSH:
- mutex_enter(&log_sys.mutex);
- value = static_cast<mon_type_t>(log_sys.n_pending_flushes);
- mutex_exit(&log_sys.mutex);
+ value = static_cast<mon_type_t>(log_sys.pending_flushes);
+
break;
case MONITOR_PENDING_CHECKPOINT_WRITE:
diff --git a/storage/innobase/srv/srv0start.cc b/storage/innobase/srv/srv0start.cc
index b6833543f44..65c89d718e8 100644
--- a/storage/innobase/srv/srv0start.cc
+++ b/storage/innobase/srv/srv0start.cc
@@ -1037,11 +1037,12 @@ static lsn_t srv_prepare_to_delete_redo_log_file(bool old_exists)
}
srv_start_lsn = flushed_lsn;
- /* Flush the old log file. */
+ bool do_flush_logs = flushed_lsn != log_sys.flushed_to_disk_lsn;
log_mutex_exit();
- log_write_up_to(flushed_lsn, true);
-
+ if (do_flush_logs) {
+ log_write_up_to(flushed_lsn, false);
+ }
log_sys.log.flush_data_only();
ut_ad(flushed_lsn == log_get_lsn());