diff options
author | Vladislav Vaintroub <wlad@mariadb.com> | 2020-02-07 22:12:35 +0100 |
---|---|---|
committer | Vladislav Vaintroub <wlad@mariadb.com> | 2020-03-01 19:02:21 +0100 |
commit | 30ea63b7d2077883713e63cbf4e661ba0345bf68 (patch) | |
tree | 043b5c2c777f4f17292a86ddf01d1c4715174d97 | |
parent | 607960c7722b083652e0e39eb634181aa4ddaf8e (diff) | |
download | mariadb-git-bb-10.5-MDEV-21534.tar.gz |
MDEV-21534 - Improve innodb redo log group commit performance [branch: bb-10.5-MDEV-21534]
Introduce special synchronization primitive group_commit_lock
for more efficient synchronization of redo log writing and flushing.
The goal is to reduce CPU consumption on log_write_up_to, to reduce
the spurious wakeups, and improve the throughput in write-intensive
benchmarks.
-rw-r--r-- | cmake/os/Windows.cmake | 6 | ||||
-rw-r--r-- | storage/innobase/CMakeLists.txt | 1 | ||||
-rw-r--r-- | storage/innobase/include/log0log.h | 37 | ||||
-rw-r--r-- | storage/innobase/log/log0log.cc | 217 | ||||
-rw-r--r-- | storage/innobase/log/log0sync.cc | 306 | ||||
-rw-r--r-- | storage/innobase/log/log0sync.h | 81 | ||||
-rw-r--r-- | storage/innobase/srv/srv0mon.cc | 5 | ||||
-rw-r--r-- | storage/innobase/srv/srv0start.cc | 7 |
8 files changed, 499 insertions, 161 deletions
diff --git a/cmake/os/Windows.cmake b/cmake/os/Windows.cmake index 08d231068de..1fbb1774bf7 100644 --- a/cmake/os/Windows.cmake +++ b/cmake/os/Windows.cmake @@ -203,9 +203,9 @@ IF(MSVC) ENDIF() ENDIF() -# Always link with socket library -STRING(APPEND CMAKE_C_STANDARD_LIBRARIES " ws2_32.lib") -STRING(APPEND CMAKE_CXX_STANDARD_LIBRARIES " ws2_32.lib") +# Always link with socket/synchronization libraries +STRING(APPEND CMAKE_C_STANDARD_LIBRARIES " ws2_32.lib synchronization.lib") +STRING(APPEND CMAKE_CXX_STANDARD_LIBRARIES " ws2_32.lib synchronization.lib") # System checks SET(SIGNAL_WITH_VIO_CLOSE 1) # Something that runtime team needs diff --git a/storage/innobase/CMakeLists.txt b/storage/innobase/CMakeLists.txt index e4286aac6a0..4e99cbc1766 100644 --- a/storage/innobase/CMakeLists.txt +++ b/storage/innobase/CMakeLists.txt @@ -84,6 +84,7 @@ SET(INNOBASE_SOURCES log/log0log.cc log/log0recv.cc log/log0crypt.cc + log/log0sync.cc mem/mem0mem.cc mtr/mtr0mtr.cc os/os0file.cc diff --git a/storage/innobase/include/log0log.h b/storage/innobase/include/log0log.h index 3e87e03f646..69d6064f49f 100644 --- a/storage/innobase/include/log0log.h +++ b/storage/innobase/include/log0log.h @@ -577,8 +577,6 @@ struct log_t{ MY_ALIGNED(CACHE_LINE_SIZE) LogSysMutex mutex; /*!< mutex protecting the log */ MY_ALIGNED(CACHE_LINE_SIZE) - LogSysMutex write_mutex; /*!< mutex protecting writing to log */ - MY_ALIGNED(CACHE_LINE_SIZE) FlushOrderMutex log_flush_order_mutex;/*!< mutex to serialize access to the flush list when we are putting dirty blocks in the list. 
The idea @@ -710,13 +708,7 @@ struct log_t{ AND flushed to disk */ std::atomic<size_t> pending_flushes; /*!< system calls in progress */ std::atomic<size_t> flushes; /*!< system calls counter */ - ulint n_pending_flushes;/*!< number of currently - pending flushes; protected by - log_sys.mutex */ - os_event_t flush_event; /*!< this event is in the reset state - when a flush is running; - os_event_set() and os_event_reset() - are protected by log_sys.mutex */ + ulint n_log_ios; /*!< number of log i/os initiated thus far */ ulint n_log_ios_old; /*!< number of log i/o's at the @@ -834,6 +826,9 @@ public: /** Redo log system */ extern log_t log_sys; +#ifdef UNIV_DEBUG +extern bool log_write_lock_own(); +#endif /** Gets the log capacity. It is OK to read the value without holding log_sys.mutex because it is constant. @@ -848,7 +843,7 @@ inline lsn_t log_t::file::calc_lsn_offset(lsn_t lsn) const ut_ad(this == &log_sys.log); /* The lsn parameters are updated while holding both the mutexes and it is ok to have either of them while reading */ - ut_ad(log_sys.mutex.is_owned() || log_sys.write_mutex.is_owned()); + ut_ad(log_sys.mutex.is_owned() || log_write_lock_own()); const lsn_t size = capacity(); lsn_t l= lsn - this->lsn; if (longlong(l) < 0) { @@ -862,12 +857,12 @@ inline lsn_t log_t::file::calc_lsn_offset(lsn_t lsn) const } inline void log_t::file::set_lsn(lsn_t a_lsn) { - ut_ad(log_sys.mutex.is_owned() || log_sys.write_mutex.is_owned()); + ut_ad(log_sys.mutex.is_owned() || log_write_lock_own()); lsn = a_lsn; } inline void log_t::file::set_lsn_offset(lsn_t a_lsn) { - ut_ad(log_sys.mutex.is_owned() || log_sys.write_mutex.is_owned()); + ut_ad(log_sys.mutex.is_owned() || log_write_lock_own()); ut_ad((lsn % OS_FILE_LOG_BLOCK_SIZE) == (a_lsn % OS_FILE_LOG_BLOCK_SIZE)); lsn_offset = a_lsn; } @@ -888,32 +883,14 @@ inline void log_t::file::set_lsn_offset(lsn_t a_lsn) { /** Test if log sys mutex is owned. 
*/ #define log_mutex_own() mutex_own(&log_sys.mutex) -/** Test if log sys write mutex is owned. */ -#define log_write_mutex_own() mutex_own(&log_sys.write_mutex) /** Acquire the log sys mutex. */ #define log_mutex_enter() mutex_enter(&log_sys.mutex) -/** Acquire the log sys write mutex. */ -#define log_write_mutex_enter() mutex_enter(&log_sys.write_mutex) - -/** Acquire all the log sys mutexes. */ -#define log_mutex_enter_all() do { \ - mutex_enter(&log_sys.write_mutex); \ - mutex_enter(&log_sys.mutex); \ -} while (0) /** Release the log sys mutex. */ #define log_mutex_exit() mutex_exit(&log_sys.mutex) -/** Release the log sys write mutex.*/ -#define log_write_mutex_exit() mutex_exit(&log_sys.write_mutex) - -/** Release all the log sys mutexes. */ -#define log_mutex_exit_all() do { \ - mutex_exit(&log_sys.mutex); \ - mutex_exit(&log_sys.write_mutex); \ -} while (0) /* log scrubbing speed, in bytes/sec */ extern ulonglong innodb_scrub_log_speed; diff --git a/storage/innobase/log/log0log.cc b/storage/innobase/log/log0log.cc index 689d24083b5..178251843b4 100644 --- a/storage/innobase/log/log0log.cc +++ b/storage/innobase/log/log0log.cc @@ -54,6 +54,7 @@ Created 12/9/1995 Heikki Tuuri #include "srv0mon.h" #include "sync0sync.h" #include "buf0dump.h" +#include "log0sync.h" /* General philosophy of InnoDB redo-logs: @@ -509,7 +510,6 @@ void log_t::create() m_initialised= true; mutex_create(LATCH_ID_LOG_SYS, &mutex); - mutex_create(LATCH_ID_LOG_WRITE, &write_mutex); mutex_create(LATCH_ID_LOG_FLUSH_ORDER, &log_flush_order_mutex); /* Start the lsn from one log block from zero: this way every @@ -535,9 +535,6 @@ void log_t::create() buf_next_to_write= 0; write_lsn= lsn; flushed_to_disk_lsn= 0; - n_pending_flushes= 0; - flush_event = os_event_create("log_flush_event"); - os_event_set(flush_event); n_log_ios= 0; n_log_ios_old= 0; log_capacity= 0; @@ -856,7 +853,7 @@ log_file_header_flush( lsn_t start_lsn) /*!< in: log file data starts at this lsn */ { - 
ut_ad(log_write_mutex_own()); + ut_ad(log_write_lock_own()); ut_ad(!recv_no_log_write); ut_ad(log_sys.log.format == log_t::FORMAT_10_5 || log_sys.log.format == log_t::FORMAT_ENC_10_5); @@ -909,7 +906,7 @@ log_write_buf( lsn_t next_offset; ulint i; - ut_ad(log_write_mutex_own()); + ut_ad(log_write_lock_own()); ut_ad(!recv_no_log_write); ut_a(len % OS_FILE_LOG_BLOCK_SIZE == 0); ut_a(start_lsn % OS_FILE_LOG_BLOCK_SIZE == 0); @@ -1002,20 +999,14 @@ loop: and invoke log_mutex_enter(). */ static void -log_write_flush_to_disk_low() +log_write_flush_to_disk_low(lsn_t lsn) { - /* FIXME: This is not holding log_sys.mutex while - calling os_event_set()! */ - ut_a(log_sys.n_pending_flushes == 1); /* No other threads here */ - log_sys.log.flush_data_only(); log_mutex_enter(); - log_sys.flushed_to_disk_lsn = log_sys.current_flush_lsn; - - log_sys.n_pending_flushes--; - - os_event_set(log_sys.flush_event); + ut_a(lsn >= log_sys.flushed_to_disk_lsn); + log_sys.flushed_to_disk_lsn = lsn; + log_mutex_exit(); } /** Switch the log buffer in use, and copy the content of last block @@ -1026,7 +1017,7 @@ void log_buffer_switch() { ut_ad(log_mutex_own()); - ut_ad(log_write_mutex_own()); + ut_ad(log_write_lock_own()); const byte* old_buf = log_sys.buf; ulong area_end = ut_calc_align( @@ -1053,86 +1044,24 @@ log_buffer_switch() log_sys.buf_next_to_write = log_sys.buf_free; } -/** Ensure that the log has been written to the log file up to a given -log entry (such as that of a transaction commit). Start a new write, or -wait and check if an already running write is covering the request. 
-@param[in] lsn log sequence number that should be -included in the redo log file write -@param[in] flush_to_disk whether the written log should also -be flushed to the file system -@param[in] rotate_key whether to rotate the encryption key */ -void log_write_up_to(lsn_t lsn, bool flush_to_disk, bool rotate_key) -{ -#ifdef UNIV_DEBUG - ulint loop_count = 0; -#endif /* UNIV_DEBUG */ - byte* write_buf; - lsn_t write_lsn; - - ut_ad(!srv_read_only_mode); - ut_ad(!rotate_key || flush_to_disk); - - if (recv_no_ibuf_operations) { - /* Recovery is running and no operations on the log file are - allowed yet (the variable name .._no_ibuf_.. is misleading) */ +/** +Writes log buffer to disk +which is the "write" part of log_write_up_to(). - return; - } +This function does not flush anything. -loop: - ut_ad(++loop_count < 128); - -#if UNIV_WORD_SIZE > 7 - /* We can do a dirty read of LSN. */ - /* NOTE: Currently doesn't do dirty read for - (flush_to_disk == true) case, because the log_mutex - contention also works as the arbitrator for write-IO - (fsync) bandwidth between log file and data files. */ - if (!flush_to_disk && log_sys.write_lsn >= lsn) { - return; - } -#endif +Note : the caller must have log_mutex locked, and this +mutex is released in the function. - log_write_mutex_enter(); +*/ +static void log_write(bool rotate_key) +{ + ut_ad(log_mutex_own()); ut_ad(!recv_no_log_write); - - lsn_t limit_lsn = flush_to_disk - ? log_sys.flushed_to_disk_lsn - : log_sys.write_lsn; - - if (limit_lsn >= lsn) { - log_write_mutex_exit(); - return; - } - - /* If it is a write call we should just go ahead and do it - as we checked that write_lsn is not where we'd like it to - be. If we have to flush as well then we check if there is a - pending flush and based on that we wait for it to finish - before proceeding further. */ - if (flush_to_disk - && (log_sys.n_pending_flushes > 0 - || !os_event_is_set(log_sys.flush_event))) { - /* Figure out if the current flush will do the job - for us. 
*/ - bool work_done = log_sys.current_flush_lsn >= lsn; - - log_write_mutex_exit(); - - os_event_wait(log_sys.flush_event); - - if (work_done) { - return; - } else { - goto loop; - } - } - - log_mutex_enter(); - if (!flush_to_disk - && log_sys.buf_free == log_sys.buf_next_to_write) { - /* Nothing to write and no flush to disk requested */ - log_mutex_exit_all(); + lsn_t write_lsn; + if (log_sys.buf_free == log_sys.buf_next_to_write) { + /* Nothing to write */ + log_mutex_exit(); return; } @@ -1146,19 +1075,7 @@ loop: DBUG_PRINT("ib_log", ("write " LSN_PF " to " LSN_PF, log_sys.write_lsn, log_sys.lsn)); - if (flush_to_disk) { - log_sys.n_pending_flushes++; - log_sys.current_flush_lsn = log_sys.lsn; - os_event_reset(log_sys.flush_event); - - if (log_sys.buf_free == log_sys.buf_next_to_write) { - /* Nothing to write, flush only */ - log_mutex_exit_all(); - log_write_flush_to_disk_low(); - log_mutex_exit(); - return; - } - } + start_offset = log_sys.buf_next_to_write; end_offset = log_sys.buf_free; @@ -1175,7 +1092,7 @@ loop: log_sys.next_checkpoint_no); write_lsn = log_sys.lsn; - write_buf = log_sys.buf; + byte *write_buf = log_sys.buf; log_buffer_switch(); @@ -1209,8 +1126,7 @@ loop: if (UNIV_UNLIKELY(srv_shutdown_state != SRV_SHUTDOWN_NONE)) { service_manager_extend_timeout(INNODB_EXTEND_TIMEOUT_INTERVAL, "InnoDB log write: " - LSN_PF "," LSN_PF, - log_sys.write_lsn, lsn); + LSN_PF, log_sys.write_lsn); } if (log_sys.is_encrypted()) { @@ -1230,16 +1146,76 @@ loop: start_offset - area_start); srv_stats.log_padded.add(pad_size); log_sys.write_lsn = write_lsn; + if (log_sys.log.writes_are_durable()) + log_sys.flushed_to_disk_lsn = write_lsn; + return; +} - log_write_mutex_exit(); +static group_commit_lock write_lock; +static group_commit_lock flush_lock; - if (flush_to_disk) { - log_write_flush_to_disk_low(); - ib_uint64_t flush_lsn = log_sys.flushed_to_disk_lsn; - log_mutex_exit(); +#ifdef UNIV_DEBUG +bool log_write_lock_own() +{ + return write_lock.is_owner(); +} 
+#endif - innobase_mysql_log_notify(flush_lsn); - } +/** Ensure that the log has been written to the log file up to a given +log entry (such as that of a transaction commit). Start a new write, or +wait and check if an already running write is covering the request. +@param[in] lsn log sequence number that should be +included in the redo log file write +@param[in] flush_to_disk whether the written log should also +be flushed to the file system +@param[in] rotate_key whether to rotate the encryption key */ +void log_write_up_to(lsn_t lsn, bool flush_to_disk, bool rotate_key) +{ + ut_ad(!srv_read_only_mode); + ut_ad(!rotate_key || flush_to_disk); + + if (recv_no_ibuf_operations) + { + /* Recovery is running and no operations on the log files are + allowed yet (the variable name .._no_ibuf_.. is misleading) */ + return; + } + + if (flush_to_disk && + flush_lock.acquire(lsn) != group_commit_lock::ACQUIRED) + { + return; + } + + if (write_lock.acquire(lsn) == group_commit_lock::ACQUIRED) + { + log_mutex_enter(); + auto write_lsn = log_sys.lsn; + write_lock.set_pending(write_lsn); + + log_write(rotate_key); + + ut_a(log_sys.write_lsn == write_lsn); + write_lock.release(write_lsn); + } + + if (!flush_to_disk) + { + return; + } + + /* Flush the highest written lsn.*/ + auto flush_lsn = write_lock.value(); + flush_lock.set_pending(flush_lsn); + + if (!log_sys.log.writes_are_durable()) + { + log_write_flush_to_disk_low(flush_lsn); + } + + flush_lock.release(flush_lsn); + + innobase_mysql_log_notify(flush_lsn); } /** write to the log file up to the last log entry. 
@@ -1270,8 +1246,7 @@ log_buffer_sync_in_background( lsn = log_sys.lsn; if (flush - && log_sys.n_pending_flushes > 0 - && log_sys.current_flush_lsn >= lsn) { + && log_sys.flushed_to_disk_lsn >= lsn) { /* The write + flush will write enough */ log_mutex_exit(); return; @@ -1836,7 +1811,7 @@ wait_suspend_loop: if (log_sys.is_initialised()) { log_mutex_enter(); const ulint n_write = log_sys.n_pending_checkpoint_writes; - const ulint n_flush = log_sys.n_pending_flushes; + const ulint n_flush = log_sys.pending_flushes; log_mutex_exit(); if (log_scrub_thread_active || n_write || n_flush) { @@ -2011,7 +1986,7 @@ log_print( ULINTPF " pending log flushes, " ULINTPF " pending chkp writes\n" ULINTPF " log i/o's done, %.2f log i/o's/second\n", - log_sys.n_pending_flushes, + log_sys.pending_flushes.load(), log_sys.n_pending_checkpoint_writes, log_sys.n_log_ios, static_cast<double>( @@ -2047,9 +2022,7 @@ void log_t::close() ut_free_dodump(buf, srv_log_buffer_size * 2); buf = NULL; - os_event_destroy(flush_event); mutex_free(&mutex); - mutex_free(&write_mutex); mutex_free(&log_flush_order_mutex); if (!srv_read_only_mode && srv_scrub_log) diff --git a/storage/innobase/log/log0sync.cc b/storage/innobase/log/log0sync.cc new file mode 100644 index 00000000000..cbac3692f26 --- /dev/null +++ b/storage/innobase/log/log0sync.cc @@ -0,0 +1,306 @@ +/***************************************************************************** +Copyright (c) 2020 MariaDB Corporation. + +This program is free software; you can redistribute it and/or modify it under +the terms of the GNU General Public License as published by the Free Software +Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, but WITHOUT +ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License along with +this program; if not, write to the Free Software Foundation, Inc., +51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA + +*****************************************************************************/ + +/* +The group commit synchronization used in log_write_up_to() +works as follows + +For simplicity, let's consider only the write operation; synchronization of +the flush operation works the same. + +Rules of the game + +A thread enters log_write_up_to() with lsn of the current transaction +1. If last written lsn is greater than or equal to the wait lsn (another thread already + wrote the log buffer), then there is no need to do anything. +2. If no other thread is currently writing, write the log buffer, + and update last written lsn. +3. Otherwise, wait, and go to step 1. + +Synchronization can be done in different ways, e.g. + +a) Simple mutex locking the entire check and write operation +Disadvantage: threads that could continue after updating +last written lsn, still wait. + +b) Spinlock, with periodic checks for last written lsn. +Fixes a) but burns CPU unnecessarily. + +c) Mutex / condition variable combo. + +Condition variable notifies (broadcast) all waiters, whenever +last written lsn is changed. + +Has a disadvantage of many spurious wakeups, stress on OS scheduler, +and mutex contention. + +d) Something else. +Make use of the waiter's lsn parameter, and only wakeup "right" waiting +threads. + +We chose d). Even if implementation is more complicated than alternatives +due to the need to maintain list of waiters, it provides the best performance. + +See group_commit_lock implementation for details. + +Note that if write operation is very fast, a) or b) can be fine as alternative.
+*/ +#ifdef _WIN32 +#include <windows.h> +#endif + +#ifdef __linux__ +#include <linux/futex.h> +#include <sys/syscall.h> +#endif + +#include <atomic> +#include <thread> +#include <mutex> +#include <condition_variable> +#include <my_cpu.h> + +#include <log0types.h> +#include "log0sync.h" + +/** + Helper class , used in group commit lock. + + Binary semaphore, or (same thing), an auto-reset event + Has state (signalled or not), and provides 2 operations. + wait() and wake() + + The implementation uses efficient locking primitives on Linux and Windows. + Or, mutex/condition combo elsewhere. +*/ + +class binary_semaphore +{ +public: + /**Wait until semaphore becomes signalled, and atomically reset the state + to non-signalled*/ + void wait(); + /** signals the semaphore */ + void wake(); + +private: +#if defined(__linux__) || defined (_WIN32) + std::atomic<int> m_signalled; + const std::memory_order mem_order = std::memory_order::memory_order_acq_rel; +public: + binary_semaphore() :m_signalled(0) {} +#else + std::mutex m_mtx{}; + std::condition_variable m_cv{}; + bool m_signalled = false; +#endif +}; + +#if defined (__linux__) || defined (_WIN32) +void binary_semaphore::wait() +{ + for (;;) + { + if (m_signalled.exchange(0, mem_order) == 1) + { + break; + } +#ifdef _WIN32 + int zero = 0; + WaitOnAddress(&m_signalled, &zero, sizeof(m_signalled), INFINITE); +#else + syscall(SYS_futex, &m_signalled, FUTEX_WAIT_PRIVATE, 0, NULL, NULL, 0); +#endif + } +} + +void binary_semaphore::wake() +{ + if (m_signalled.exchange(1, mem_order) == 0) + { +#ifdef _WIN32 + WakeByAddressSingle(&m_signalled); +#else + syscall(SYS_futex, &m_signalled, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0); +#endif + } +} +#else +/* Fallback wait(): block on the condition variable until m_signalled is set, then reset it. The loop re-checks the flag after every wakeup. */ +void binary_semaphore::wait() +{ + std::unique_lock<std::mutex> lk(m_mtx); + while (!m_signalled) + m_cv.wait(lk); + m_signalled = false; +} +/* Fallback wake(): set m_signalled under the mutex and notify one waiter. */ +void binary_semaphore::wake() +{ + std::unique_lock<std::mutex> lk(m_mtx); + m_signalled = true; + m_cv.notify_one(); +} +#endif + +/* A thread
helper structure, used in group commit lock below*/ +struct group_commit_waiter_t +{ + lsn_t m_value; + binary_semaphore m_sema; + group_commit_waiter_t* m_next; + group_commit_waiter_t() :m_value(), m_sema(), m_next() {} +}; + +group_commit_lock::group_commit_lock() : + m_mtx(), m_value(0), m_pending_value(0), m_lock(false), m_waiters_list() +{ +} + +group_commit_lock::value_type group_commit_lock::value() const +{ + return m_value.load(std::memory_order::memory_order_relaxed); +} + +group_commit_lock::value_type group_commit_lock::pending() const +{ + return m_pending_value.load(std::memory_order::memory_order_relaxed); +} + +void group_commit_lock::set_pending(group_commit_lock::value_type num) +{ + ut_a(num >= value()); + m_pending_value.store(num, std::memory_order::memory_order_relaxed); +} + +const unsigned int MAX_SPINS = 1; /** max spins in acquire */ +thread_local group_commit_waiter_t thread_local_waiter; + +group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num) +{ + unsigned int spins = MAX_SPINS; + + for(;;) + { + if (num <= value()) + { + /* No need to wait.*/ + return lock_return_code::EXPIRED; + } + + if(spins-- == 0) + break; + if (num > pending()) + { + /* Longer wait expected (longer than currently running operation), + don't spin.*/ + break; + } + ut_delay(1); + } + + thread_local_waiter.m_value = num; + std::unique_lock<std::mutex> lk(m_mtx, std::defer_lock); + while (num > value()) + { + lk.lock(); + + /* Re-read current value after acquiring the lock*/ + if (num <= value()) + { + return lock_return_code::EXPIRED; + } + + if (!m_lock) + { + /* Take the lock, become group commit leader.*/ + m_lock = true; +#ifndef DBUG_OFF + m_owner_id = std::this_thread::get_id(); +#endif + return lock_return_code::ACQUIRED; + } + + /* Add yourself to waiters list.*/ + thread_local_waiter.m_next = m_waiters_list; + m_waiters_list = &thread_local_waiter; + lk.unlock(); + + /* Sleep until woken in release().*/ + 
thread_local_waiter.m_sema.wait(); + } + return lock_return_code::EXPIRED; +} + +void group_commit_lock::release(value_type num) +{ + std::unique_lock<std::mutex> lk(m_mtx); + m_lock = false; + + /* Update current value. */ + ut_a(num >= value()); + m_value.store(num, std::memory_order_relaxed); + + /* + Wake waiters for value <= current value. + Wake one more waiter, who will become the group commit lead. + */ + group_commit_waiter_t* cur, * prev, * next; + group_commit_waiter_t* wakeup_list = nullptr; + int extra_wake = 0; + + for (cur = m_waiters_list; cur; cur = next) + { + next = cur->m_next; + if (cur->m_value <= num || extra_wake++ == 0) + { + /* Move current waiter to wakeup_list*/ + + if (cur == m_waiters_list) + { + /* Remove from the start of the list.*/ + m_waiters_list = next; + } + else + { + /* Remove from the middle of the list.*/ + prev->m_next = cur->m_next; + } + + /* Append entry to the wakeup list.*/ + cur->m_next = wakeup_list; + wakeup_list = cur; + } + else + { + prev = cur; + } + } + lk.unlock(); + + for (cur = wakeup_list; cur; cur = next) + { + next = cur->m_next; + cur->m_sema.wake(); + } +} + +#ifndef DBUG_OFF +bool group_commit_lock::is_owner() +{ + return m_lock && std::this_thread::get_id() == m_owner_id; +} +#endif + diff --git a/storage/innobase/log/log0sync.h b/storage/innobase/log/log0sync.h new file mode 100644 index 00000000000..40afbf74ecd --- /dev/null +++ b/storage/innobase/log/log0sync.h @@ -0,0 +1,81 @@ +/***************************************************************************** +Copyright (c) 2020 MariaDB Corporation. + +This program is free software; you can redistribute it and/or modify it under +the terms of the GNU General Public License as published by the Free Software +Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, but WITHOUT +ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +FOR A PARTICULAR PURPOSE. 
See the GNU General Public License for more details. + +You should have received a copy of the GNU General Public License along with +this program; if not, write to the Free Software Foundation, Inc., +51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA + +*****************************************************************************/ + +#include <atomic> +#include <thread> +#include <log0types.h> + +struct group_commit_waiter_t; + +/** +Special synchronization primitive, which is helpful for +performing group commit. + +It has a state consisting of + - locked (bool) + - current value (number). This value is always increasing. + - pending value (number). current value can soon become this number + This is only used for optimization, does not have to be exact + +Operations supported on this semaphore + +1.acquire(num): +- waits until current value exceeds num, or until lock is granted. + +- returns EXPIRED if current_value >= num, + or ACQUIRED, if current_value < num and lock is granted. + +2.release(num) +- releases lock +- sets new current value to max(num,current_value) +- releases some threads waiting in acquire() + +3. value() +- read current value + +4. pending_value() +- read pending value + +5. 
set_pending_value() +*/ +class group_commit_lock +{ + using value_type = lsn_t; +#ifndef DBUG_OFF + std::thread::id m_owner_id{}; +#endif + std::mutex m_mtx; + std::atomic<value_type> m_value; + std::atomic<value_type> m_pending_value; + bool m_lock; + group_commit_waiter_t* m_waiters_list; +public: + group_commit_lock(); + enum lock_return_code + { + ACQUIRED, + EXPIRED + }; + lock_return_code acquire(value_type num); + void release(value_type num); + value_type value() const; + value_type pending() const; + void set_pending(value_type num); +#ifndef DBUG_OFF + bool is_owner(); +#endif +}; diff --git a/storage/innobase/srv/srv0mon.cc b/storage/innobase/srv/srv0mon.cc index 8397d3a8ea3..ac5863d8efc 100644 --- a/storage/innobase/srv/srv0mon.cc +++ b/storage/innobase/srv/srv0mon.cc @@ -1976,9 +1976,8 @@ srv_mon_process_existing_counter( break; case MONITOR_PENDING_LOG_FLUSH: - mutex_enter(&log_sys.mutex); - value = static_cast<mon_type_t>(log_sys.n_pending_flushes); - mutex_exit(&log_sys.mutex); + value = static_cast<mon_type_t>(log_sys.pending_flushes); + break; case MONITOR_PENDING_CHECKPOINT_WRITE: diff --git a/storage/innobase/srv/srv0start.cc b/storage/innobase/srv/srv0start.cc index b6833543f44..65c89d718e8 100644 --- a/storage/innobase/srv/srv0start.cc +++ b/storage/innobase/srv/srv0start.cc @@ -1037,11 +1037,12 @@ static lsn_t srv_prepare_to_delete_redo_log_file(bool old_exists) } srv_start_lsn = flushed_lsn; - /* Flush the old log file. */ + bool do_flush_logs = flushed_lsn != log_sys.flushed_to_disk_lsn; log_mutex_exit(); - log_write_up_to(flushed_lsn, true); - + if (do_flush_logs) { + log_write_up_to(flushed_lsn, false); + } log_sys.log.flush_data_only(); ut_ad(flushed_lsn == log_get_lsn()); |