diff options
author | Marko Mäkelä <marko.makela@mariadb.com> | 2022-04-14 14:30:26 +0300 |
---|---|---|
committer | Marko Mäkelä <marko.makela@mariadb.com> | 2022-04-14 14:30:26 +0300 |
commit | 656906c9572e4272a6a9b30e2d9c9e6d78a29220 (patch) | |
tree | bc774e94d08966b439e850fc01c9519e81b23f70 | |
parent | 3ed0348846c4486aa19854e7aa4d89f60c75ae4e (diff) | |
parent | fbf8646335280150a6ecf5727effb1a719f26b22 (diff) | |
download | mariadb-git-bb-10.9-MDEV-21423-MDEV-26603.tar.gz |
-rw-r--r-- | storage/innobase/buf/buf0flu.cc | 2 | ||||
-rw-r--r-- | storage/innobase/handler/ha_innodb.cc | 3 | ||||
-rw-r--r-- | storage/innobase/include/log0log.h | 36 | ||||
-rw-r--r-- | storage/innobase/log/log0log.cc | 269 | ||||
-rw-r--r-- | storage/innobase/log/log0sync.cc | 136 | ||||
-rw-r--r-- | storage/innobase/log/log0sync.h | 19 | ||||
-rw-r--r-- | storage/innobase/mtr/mtr0mtr.cc | 4 | ||||
-rw-r--r-- | storage/innobase/os/os0file.cc | 8 | ||||
-rw-r--r-- | storage/innobase/srv/srv0srv.cc | 9 | ||||
-rw-r--r-- | storage/innobase/srv/srv0start.cc | 4 | ||||
-rw-r--r-- | storage/innobase/trx/trx0trx.cc | 3 |
11 files changed, 323 insertions, 170 deletions
diff --git a/storage/innobase/buf/buf0flu.cc b/storage/innobase/buf/buf0flu.cc index 0604234c5f6..d297695f6f4 100644 --- a/storage/innobase/buf/buf0flu.cc +++ b/storage/innobase/buf/buf0flu.cc @@ -1702,7 +1702,7 @@ ulint buf_flush_LRU(ulint max_n) if (buf_pool.n_flush_LRU()) return 0; - log_buffer_flush_to_disk(); + log_buffer_flush_to_disk_async(); mysql_mutex_lock(&buf_pool.mutex); if (buf_pool.n_flush_LRU_) diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc index ed68f8cdc55..cb8084bd3e2 100644 --- a/storage/innobase/handler/ha_innodb.cc +++ b/storage/innobase/handler/ha_innodb.cc @@ -1770,6 +1770,7 @@ MYSQL_THD innobase_create_background_thd(const char* name) MYSQL_THD thd= create_background_thd(); thd_proc_info(thd, name); THDVAR(thd, background_thread) = true; + ut_ad(!thd_get_thread_id(thd)); return thd; } @@ -1777,7 +1778,7 @@ extern "C" void thd_increment_pending_ops(MYSQL_THD); THD *innodb_thd_increment_pending_ops(THD *thd) { - if (!thd || THDVAR(thd, background_thread)) + if (!thd || !thd_get_thread_id(thd)) return nullptr; thd_increment_pending_ops(thd); return thd; diff --git a/storage/innobase/include/log0log.h b/storage/innobase/include/log0log.h index 04c3d7c371d..8f6fbaf30e2 100644 --- a/storage/innobase/include/log0log.h +++ b/storage/innobase/include/log0log.h @@ -70,15 +70,25 @@ void log_write_up_to(lsn_t lsn, bool durable, const completion_callback *callback= nullptr); /** Write to the log file up to the last log entry. -@param durable whether to wait for a durable write to complete */ -void log_buffer_flush_to_disk(bool durable= true); +@param durable whether to wait for a durable write to complete +@param wait whether to wait for completion +*/ +void log_buffer_flush_to_disk(bool durable= true, bool wait=true); + +/** Initiate log buffer write/flush */ +static inline void log_buffer_flush_to_disk_async() +{ + log_buffer_flush_to_disk(true, false); +} /** Prepare to invoke log_write_and_flush(), before acquiring log_sys.latch. */ ATTRIBUTE_COLD void log_write_and_flush_prepare(); -/** Durably write the log up to log_sys.get_lsn(). */ -ATTRIBUTE_COLD void log_write_and_flush(); +/** Durably write the log up to log_sys.get_lsn(). +@return lsn that log_write_up_to() must be invoked with +@retval 0 if there is no need to invoke log_write_up_to() */ +ATTRIBUTE_COLD __attribute__((warn_unused_result)) lsn_t log_write_and_flush(); /** Make a checkpoint */ ATTRIBUTE_COLD void log_make_checkpoint(); @@ -136,7 +146,8 @@ public: dberr_t close() noexcept; dberr_t read(os_offset_t offset, span<byte> buf) noexcept; - void write(os_offset_t offset, span<const byte> buf) noexcept; + void write(os_offset_t offset, span<const byte> buf, + tpool::aiocb *cb= nullptr) noexcept; bool flush() const noexcept { return os_file_flush(m_file); } #ifdef HAVE_PMEM byte *mmap(bool read_only, const struct stat &st) noexcept; @@ -485,8 +496,19 @@ public: /** Write buf to ib_logfile0. @tparam release_latch whether to invoke latch.wr_unlock() - @return the current log sequence number */ - template<bool release_latch> inline lsn_t write_buf() noexcept; + @param durable whether to invoke a durable write + @param sync whether to invoke a synchronous write + @return new write target + @retval 0 if there is no need to call log_write_up_to() */ + template<bool release_latch> + inline lsn_t write_buf(bool durable, bool sync) noexcept; + + /** Complete write_buf(). + @param lsn new value of write_lsn + @param durable whether the write was durable + @return new write target + @retval 0 if there is no need to call log_write_up_to() */ + inline lsn_t complete_write_buf(lsn_t lsn, bool durable) noexcept; /** Create the log. */ void create(lsn_t lsn) noexcept; diff --git a/storage/innobase/log/log0log.cc b/storage/innobase/log/log0log.cc index 9529db0af06..7de28c69321 100644 --- a/storage/innobase/log/log0log.cc +++ b/storage/innobase/log/log0log.cc @@ -74,6 +74,38 @@ log_t log_sys; #define LOG_BUF_FLUSH_MARGIN ((4 * 4096) /* cf. log_t::append_prepare() */ \ + (4U << srv_page_size_shift)) +/** + group commit completion callback used for anything + that can run asynchronous +*/ +static const completion_callback async_io_callback{nullptr, nullptr}; + +/** + group commit completion callback that is forcing synchronous IO +*/ +static const completion_callback sync_io_callback{nullptr, nullptr}; + +#ifndef DBUG_OFF +/** + Crashing after disk flush requested via dbug_debug flag. + flush can be executed by background thread, + where DBUG_EXECUTE_IF() does not work, this the value + is passed via global variable. +*/ +static bool crash_after_flush; +#endif + +static void report_aio_error(const char *text, tpool::aiocb *cb); + +/** AIO control block with auxilliary information, for async writing. +Protected by write_lock.*/ +struct Log_aiocb : tpool::aiocb +{ + lsn_t lsn; + bool durable; +}; +static Log_aiocb log_aiocb; + void log_t::set_capacity() { #ifndef SUX_LOCK_GENERIC @@ -155,12 +187,29 @@ dberr_t log_file_t::read(os_offset_t offset, span<byte> buf) noexcept return os_file_read(IORequestRead, m_file, buf.data(), offset, buf.size()); } -void log_file_t::write(os_offset_t offset, span<const byte> buf) noexcept +void log_file_t::write(os_offset_t offset, span<const byte> buf, + tpool::aiocb *iocb) noexcept { ut_ad(is_opened()); - if (dberr_t err= os_file_write_func(IORequestWrite, "ib_logfile0", m_file, + if (iocb) + { + ut_ad(buf.size() < UINT_MAX); + iocb->m_fh= m_file; + iocb->m_opcode= tpool::aio_opcode::AIO_PWRITE; + iocb->m_offset= offset; + iocb->m_buffer= (void *) buf.data(); + iocb->m_len= (unsigned) buf.size(); + if (srv_thread_pool->submit_io(iocb)) + { + iocb->m_err= IF_WIN(GetLastError(), errno); + report_aio_error("submitting asynchronous write to ib_logfile0", iocb); + } + } + else if (dberr_t err= os_file_write(IORequestWrite, "ib_logfile0", m_file, buf.data(), offset, buf.size())) + { ib::fatal() << "write(\"ib_logfile0\") returned " << err; + } } #ifdef HAVE_PMEM @@ -512,10 +561,13 @@ void log_t::resize_abort() noexcept /** Write an aligned buffer to ib_logfile0. @param buf buffer to be written @param len length of data to be written -@param offset log file offset */ -static void log_write_buf(const byte *buf, size_t len, lsn_t offset) +@param offset log file offset +@param cb completion callback */ +static void log_write_buf(const byte *buf, size_t len, lsn_t offset, + tpool::aiocb *cb) { - ut_ad(write_lock.is_owner()); + ut_ad(cb ? !write_lock.has_owner() : write_lock.is_owner()); + ut_ad(write_lock.locked()); ut_ad(!recv_no_log_write); ut_d(const size_t block_size_1= log_sys.get_block_size() - 1); ut_ad(!(offset & block_size_1)); @@ -526,7 +578,7 @@ static void log_write_buf(const byte *buf, size_t len, lsn_t offset) if (UNIV_LIKELY(offset + len <= log_sys.file_size)) { write: - log_sys.log.write(offset, {buf, len}); + log_sys.log.write(offset, {buf, len}, cb); return; } @@ -738,30 +790,110 @@ ATTRIBUTE_COLD void log_t::resize_write_buf(size_t length) noexcept resize_flush_buf, offset, length) == DB_SUCCESS); } + +static void report_aio_error(const char *text, tpool::aiocb *cb) +{ + ib::fatal() << "IO Error " + << cb->m_err IF_WIN(, << " " << strerror(cb->m_err)) << " " + << text << "," << cb->m_len << " bytes at offset " + << cb->m_offset; +} + +/** Ensure that previous log writes are durable. +@return new durable lsn target +@retval 0 if caller does not need to call log_write_up_to() again + +*/ +static lsn_t log_flush() +{ + ut_ad(!log_sys.is_pmem()); + lsn_t lsn= write_lock.value(); + ut_a(log_sys.flush(lsn)); +#ifndef DBUG_OFF + if (crash_after_flush) + DBUG_SUICIDE(); +#endif + return flush_lock.release(lsn); +} + + +/** Complete write_buf(). +@param lsn new value of write_lsn +@param durable whether the write was durable +@return new write target +@retval 0 if there is no need to call log_write_up_to() */ +inline lsn_t log_t::complete_write_buf(lsn_t lsn, bool durable) noexcept +{ + ut_ad(write_lock.is_owner()); + ut_ad(durable == flush_lock.is_owner()); + + ut_a(lsn >= write_lsn); + + write_lsn= lsn; + lsn_t pending_lsn= write_lock.release(lsn); + if (durable) + pending_lsn= std::max(pending_lsn, log_flush()); + return pending_lsn; +} + +static void aio_complete_write_buf(void *p) +{ + ut_ad(write_lock.locked()); + + Log_aiocb *cb= static_cast<Log_aiocb *>(p); + if (cb->m_err) + report_aio_error("in asynchronous write to ib_logfile0", cb); + const bool durable{cb->durable}; +#ifdef UNIV_DEBUG + if (durable) + { + ut_ad(flush_lock.locked()); + flush_lock.set_owner(); + } + write_lock.set_owner(); +#endif + + if (lsn_t ret_lsn= log_sys.complete_write_buf(cb->lsn, durable)) + { + /** prevent stalls. Also, force special synchronous callback + as optimization. We'll avoid threadpool machinery and context + switching (we're already in the background thread here) + */ + log_write_up_to(ret_lsn, durable, &sync_io_callback); + } +} + + /** Write buf to ib_logfile0. @tparam release_latch whether to invoke latch.wr_unlock() -@return the current log sequence number */ -template<bool release_latch> inline lsn_t log_t::write_buf() noexcept +@param durable whether to invoke a durable write +@param sync whether to invoke a synchronous write +@return new write target +@retval 0 if there is no need to call log_write_up_to() */ +template<bool release_latch> +inline lsn_t log_t::write_buf(bool durable, bool sync) noexcept { #ifndef SUX_LOCK_GENERIC ut_ad(latch.is_write_locked()); #endif ut_ad(!srv_read_only_mode); ut_ad(!is_pmem()); + ut_ad(write_lock.is_owner()); + ut_ad(durable == flush_lock.is_owner()); const lsn_t lsn{get_lsn(std::memory_order_relaxed)}; - + DBUG_EXECUTE_IF("crash_after_log_write_upto", crash_after_flush= true;); if (write_lsn >= lsn) { if (release_latch) latch.wr_unlock(); - ut_ad(write_lsn == lsn); + ut_a(write_lsn == lsn); + return complete_write_buf(lsn, durable); } else { ut_ad(!recv_no_log_write); - write_lock.set_pending(lsn); - ut_ad(write_lsn >= get_flushed_lsn()); + ut_a(write_lsn >= get_flushed_lsn()); const size_t block_size_1{get_block_size() - 1}; lsn_t offset{calc_lsn_offset(write_lsn) & ~lsn_t{block_size_1}}; @@ -812,20 +944,34 @@ template<bool release_latch> inline lsn_t log_t::write_buf() noexcept "InnoDB log write: " LSN_PF, write_lsn); } - /* Do the write to the log file */ - log_write_buf(write_buf, length, offset); if (UNIV_LIKELY_NULL(resize_buf)) resize_write_buf(length); - write_lsn= lsn; - } - return lsn; + /* Do the write to the log file */ + if (sync) + { + log_write_buf(write_buf, length, offset, nullptr); + return complete_write_buf(lsn, durable); + } + + /* Async log IO + Note : flush/write lock ownership is going to migrate to a + background thread*/ + ut_d(write_lock.reset_owner()); + ut_d(if (durable) flush_lock.reset_owner()); + + log_aiocb.m_callback= aio_complete_write_buf; + log_aiocb.durable= durable; + log_aiocb.lsn= lsn; + log_write_buf(write_buf, length, offset, &log_aiocb); + return 0; + } } bool log_t::flush(lsn_t lsn) noexcept { ut_ad(lsn >= get_flushed_lsn()); - flush_lock.set_pending(lsn); + ut_ad(flush_lock.is_owner()); const bool success{srv_file_flush_method == SRV_O_DSYNC || log.flush()}; if (UNIV_LIKELY(success)) { @@ -835,22 +981,25 @@ bool log_t::flush(lsn_t lsn) noexcept return success; } -/** Ensure that previous log writes are durable. -@param lsn previously written LSN -@return new durable lsn target -@retval 0 if there are no pending callbacks on flush_lock - or there is another group commit lead. +/* + Decide about whether to do synchronous IO. + Async might not make sense because of the higher latency or CPU + overhead in threadpool, or because the file is cached,and say libaio + can't do AIO on cached files. + + Async IO apparently makes sense always if the waiter does + not care about result (i.e callback with NULL function) + + NOTE: currently, async IO is mostly unused, because it turns + out to be worse in benchmarks. Perhaps it is just too many threads + involved in waking and waiting. */ -static lsn_t log_flush(lsn_t lsn) +static bool use_sync_log_write(bool /* durable */, + const completion_callback *cb) { - ut_ad(!log_sys.is_pmem()); - ut_a(log_sys.flush(lsn)); - DBUG_EXECUTE_IF("crash_after_log_write_upto", DBUG_SUICIDE();); - return flush_lock.release(lsn); + return !cb || cb->m_callback || cb == &sync_io_callback; } -static const completion_callback dummy_callback{[](void *) {},nullptr}; - /** Ensure that the log has been written to the log file up to a given log entry (such as that of a transaction commit). Start a new write, or wait and check if an already running write is covering the request. @@ -867,7 +1016,7 @@ void log_write_up_to(lsn_t lsn, bool durable, { /* A non-final batch of recovery is active no writes to the log are allowed yet. */ - ut_a(!callback); + ut_a(!callback || !callback->m_callback); return; } @@ -876,7 +1025,7 @@ void log_write_up_to(lsn_t lsn, bool durable, #ifdef HAVE_PMEM if (log_sys.is_pmem()) { - ut_ad(!callback); + ut_ad(!callback || !callback->m_callback); if (durable) log_sys.persist(lsn); return; @@ -884,42 +1033,49 @@ void log_write_up_to(lsn_t lsn, bool durable, #endif repeat: - if (durable) - { - if (flush_lock.acquire(lsn, callback) != group_commit_lock::ACQUIRED) - return; - flush_lock.set_pending(log_sys.get_lsn()); - } + if (durable && + flush_lock.acquire(lsn, callback) != group_commit_lock::ACQUIRED) + return; - lsn_t pending_write_lsn= 0, pending_flush_lsn= 0; + lsn_t pending_lsn= 0; if (write_lock.acquire(lsn, durable ? nullptr : callback) == group_commit_lock::ACQUIRED) { + const bool sync{use_sync_log_write(durable, callback)}; log_sys.latch.wr_lock(SRW_LOCK_CALL); - pending_write_lsn= write_lock.release(log_sys.write_buf<true>()); + pending_lsn= log_sys.write_buf<true>(durable, sync); + if (!pending_lsn) + return; } - if (durable) + if (durable && !pending_lsn) { - pending_flush_lsn= log_flush(write_lock.value()); + /* We only get here if flush_lock is acquired, but write_lock + is expired, i.e lsn was already written, but not flushed yet.*/ + pending_lsn= log_flush(); } - if (pending_write_lsn || pending_flush_lsn) + if (pending_lsn) { - /* There is no new group commit lead; some async waiters could stall. */ - callback= &dummy_callback; - lsn= std::max(pending_write_lsn, pending_flush_lsn); + /* There is no new group commit lead; some waiters could stall. + If special sync_io_callback was used we'll continue to use it + as optimization to reduce context switches. + */ + if (callback != &sync_io_callback) + callback= &async_io_callback; + lsn= pending_lsn; goto repeat; } } /** Write to the log file up to the last log entry. @param durable whether to wait for a durable write to complete */ -void log_buffer_flush_to_disk(bool durable) +void log_buffer_flush_to_disk(bool durable, bool wait) { ut_ad(!srv_read_only_mode); - log_write_up_to(log_sys.get_lsn(std::memory_order_acquire), durable); + log_write_up_to(log_sys.get_lsn(std::memory_order_acquire), durable, + wait ? nullptr : &async_io_callback); } /** Prepare to invoke log_write_and_flush(), before acquiring log_sys.latch. */ @@ -934,20 +1090,19 @@ ATTRIBUTE_COLD void log_write_and_flush_prepare() group_commit_lock::ACQUIRED); } -/** Durably write the log up to log_sys.get_lsn(). */ -ATTRIBUTE_COLD void log_write_and_flush() +/** Durably write the log up to log_sys.get_lsn(). +@return lsn that log_write_up_to() must be invoked with +@retval 0 if there is no need to invoke log_write_up_to() */ +ATTRIBUTE_COLD __attribute__((warn_unused_result)) lsn_t log_write_and_flush() { ut_ad(!srv_read_only_mode); if (!log_sys.is_pmem()) - { - const lsn_t lsn{log_sys.write_buf<false>()}; - write_lock.release(lsn); - log_flush(lsn); - } + return log_sys.write_buf<false>(true, true); + #ifdef HAVE_PMEM - else - log_sys.persist(log_sys.get_lsn()); + log_sys.persist(log_sys.get_lsn()); #endif + return 0; } /******************************************************************** diff --git a/storage/innobase/log/log0sync.cc b/storage/innobase/log/log0sync.cc index 6b14d1d3591..641d7e65c36 100644 --- a/storage/innobase/log/log0sync.cc +++ b/storage/innobase/log/log0sync.cc @@ -1,5 +1,5 @@ /***************************************************************************** -Copyright (c) 2020 MariaDB Corporation. +Copyright (c) 2020, 2022, MariaDB Corporation. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software @@ -72,12 +72,8 @@ Note that if write operation is very fast, a) or b) can be fine as alternative. #include <thread> #include <mutex> #include <condition_variable> -#include <my_cpu.h> -#include <log0types.h> #include "log0sync.h" -#include <mysql/service_thd_wait.h> -#include <sql_class.h> /** Helper class , used in group commit lock. @@ -166,7 +162,7 @@ struct group_commit_waiter_t }; group_commit_lock::group_commit_lock() : - m_mtx(), m_value(0), m_pending_value(0), m_lock(false), m_waiters_list() + m_mtx(), m_value(0), m_lock(false), m_waiters_list() { } @@ -175,60 +171,32 @@ group_commit_lock::value_type group_commit_lock::value() const return m_value.load(std::memory_order::memory_order_relaxed); } -group_commit_lock::value_type group_commit_lock::pending() const -{ - return m_pending_value.load(std::memory_order::memory_order_relaxed); -} - -void group_commit_lock::set_pending(group_commit_lock::value_type num) -{ - ut_a(num >= value()); - m_pending_value.store(num, std::memory_order::memory_order_relaxed); -} -const unsigned int MAX_SPINS = 1; /** max spins in acquire */ thread_local group_commit_waiter_t thread_local_waiter; static inline void do_completion_callback(const completion_callback* cb) { - if (cb) + if (cb && cb->m_callback) cb->m_callback(cb->m_param); } -group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num, const completion_callback *callback) +inline void store_callback(lsn_t num, const completion_callback *cb, + std::vector<std::pair<lsn_t, completion_callback>> &v) { - unsigned int spins = MAX_SPINS; - - for(;;) - { - if (num <= value()) - { - /* No need to wait.*/ - do_completion_callback(callback); - return lock_return_code::EXPIRED; - } - - if(spins-- == 0) - break; - if (num > pending()) - { - /* Longer wait expected (longer than currently running operation), - don't spin.*/ - break; - } - ut_delay(1); - } + if (!cb || !cb->m_callback) + return; + v.push_back({num, *cb}); +} - thread_local_waiter.m_value = num; - thread_local_waiter.m_group_commit_leader= false; +group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num, const completion_callback *callback) +{ + bool group_commit_leader= false; std::unique_lock<std::mutex> lk(m_mtx, std::defer_lock); - while (num > value() || thread_local_waiter.m_group_commit_leader) + while (num > value() || group_commit_leader) { lk.lock(); - /* Re-read current value after acquiring the lock*/ - if (num <= value() && - (!thread_local_waiter.m_group_commit_leader || m_lock)) + if (num <= value() && (!group_commit_leader || m_lock)) { lk.unlock(); do_completion_callback(callback); @@ -239,40 +207,30 @@ group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num, c { /* Take the lock, become group commit leader.*/ m_lock = true; -#ifndef DBUG_OFF - m_owner_id = std::this_thread::get_id(); -#endif - if (callback) - m_pending_callbacks.push_back({num,*callback}); + ut_d(set_owner()); + store_callback(num, callback,m_pending_callbacks); return lock_return_code::ACQUIRED; } - if (callback && (m_waiters_list || num <= pending())) + if (callback) { - /* - If num > pending(), we have a good candidate for the next group - commit lead, that will be taking over the lock after current owner - releases it. We put current thread into waiter's list so it sleeps - and can be signaled and marked as group commit lead during lock release. - - For this to work well, pending() must deliver a good approximation for N - in the next call to group_commit_lock::release(N). - */ - m_pending_callbacks.push_back({num, *callback}); + store_callback(num, callback, m_pending_callbacks); return lock_return_code::CALLBACK_QUEUED; } /* Add yourself to waiters list.*/ - thread_local_waiter.m_group_commit_leader= false; - thread_local_waiter.m_next = m_waiters_list; - m_waiters_list = &thread_local_waiter; + auto *waiter= &thread_local_waiter; + waiter->m_value= num; + waiter->m_group_commit_leader= false; + waiter->m_next= m_waiters_list; + m_waiters_list= waiter; lk.unlock(); /* Sleep until woken in release().*/ thd_wait_begin(0,THD_WAIT_GROUP_COMMIT); - thread_local_waiter.m_sema.wait(); + waiter->m_sema.wait(); thd_wait_end(0); - + group_commit_leader= waiter->m_group_commit_leader; } do_completion_callback(callback); return lock_return_code::EXPIRED; @@ -282,9 +240,8 @@ group_commit_lock::value_type group_commit_lock::release(value_type num) { completion_callback callbacks[1000]; size_t callback_count = 0; - value_type ret = 0; + value_type ret= 0; std::unique_lock<std::mutex> lk(m_mtx); - m_lock = false; /* Update current value. */ ut_a(num >= value()); @@ -307,6 +264,12 @@ group_commit_lock::value_type group_commit_lock::release(value_type num) } } + auto it= + std::remove_if(m_pending_callbacks.begin(), m_pending_callbacks.end(), + [num](const pending_cb &c) { return c.first <= num; }); + + m_pending_callbacks.erase(it, m_pending_callbacks.end()); + for (prev= nullptr, cur= m_waiters_list; cur; cur= next) { next= cur->m_next; @@ -335,12 +298,6 @@ group_commit_lock::value_type group_commit_lock::release(value_type num) } } - auto it= std::remove_if( - m_pending_callbacks.begin(), m_pending_callbacks.end(), - [num](const pending_cb &c) { return c.first <= num; }); - - m_pending_callbacks.erase(it, m_pending_callbacks.end()); - if (m_pending_callbacks.size() || m_waiters_list) { /* @@ -370,7 +327,8 @@ group_commit_lock::value_type group_commit_lock::release(value_type num) ret= m_pending_callbacks[0].first; } } - + ut_d(reset_owner();) + m_lock= false; lk.unlock(); /* @@ -396,9 +354,29 @@ group_commit_lock::value_type group_commit_lock::release(value_type num) } #ifndef DBUG_OFF -bool group_commit_lock::is_owner() +#include <tpool_structs.h> +TPOOL_SUPPRESS_TSAN +bool group_commit_lock::locked() const noexcept { return m_lock; } + +TPOOL_SUPPRESS_TSAN bool group_commit_lock::is_owner() const noexcept { - return m_lock && std::this_thread::get_id() == m_owner_id; + return locked() && std::this_thread::get_id() == m_owner_id; +} + +void TPOOL_SUPPRESS_TSAN group_commit_lock::set_owner() +{ + DBUG_ASSERT(locked() && !has_owner()); + m_owner_id= std::this_thread::get_id(); } -#endif +void TPOOL_SUPPRESS_TSAN group_commit_lock::reset_owner() +{ + DBUG_ASSERT(is_owner()); + m_owner_id= std::thread::id(); +} + +bool TPOOL_SUPPRESS_TSAN group_commit_lock::has_owner() const noexcept +{ + return m_owner_id != std::thread::id(); +} +#endif diff --git a/storage/innobase/log/log0sync.h b/storage/innobase/log/log0sync.h index 00686d39dac..7dc9e2aaf38 100644 --- a/storage/innobase/log/log0sync.h +++ b/storage/innobase/log/log0sync.h @@ -1,5 +1,5 @@ /***************************************************************************** -Copyright (c) 2020 MariaDB Corporation. +Copyright (c) 2020, 2022, MariaDB Corporation. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software @@ -56,14 +56,6 @@ Operations supported on this semaphore - might return some lsn, meaning there are some pending callbacks left, and there is no new group commit lead (i.e caller must do something to flush those pending callbacks) - -3. value() -- read current value - -4. pending_value() -- read pending value - -5. set_pending_value() */ class group_commit_lock { @@ -73,7 +65,6 @@ class group_commit_lock #endif std::mutex m_mtx; std::atomic<value_type> m_value; - std::atomic<value_type> m_pending_value; bool m_lock; group_commit_waiter_t* m_waiters_list; @@ -91,9 +82,11 @@ public: lock_return_code acquire(value_type num, const completion_callback *cb); value_type release(value_type num); value_type value() const; - value_type pending() const; - void set_pending(value_type num); #ifndef DBUG_OFF - bool is_owner(); + bool locked() const noexcept; + bool is_owner() const noexcept; + bool has_owner() const noexcept; + void set_owner(); + void reset_owner(); #endif }; diff --git a/storage/innobase/mtr/mtr0mtr.cc b/storage/innobase/mtr/mtr0mtr.cc index b68b820eb0c..e2b8740a5ac 100644 --- a/storage/innobase/mtr/mtr0mtr.cc +++ b/storage/innobase/mtr/mtr0mtr.cc @@ -614,7 +614,7 @@ void mtr_t::commit_shrink(fil_space_t &space) const lsn_t start_lsn= do_write().first; /* Durably write the reduced FSP_SIZE before truncating the data file. */ - log_write_and_flush(); + lsn_t pending_lsn= log_write_and_flush(); #ifndef SUX_LOCK_GENERIC ut_ad(log_sys.latch.is_write_locked()); #endif @@ -660,6 +660,8 @@ void mtr_t::commit_shrink(fil_space_t &space) m_memo.for_each_block_in_reverse(CIterate<ReleaseLatches>()); release_resources(); + if (pending_lsn) + log_buffer_flush_to_disk_async(); } /** Commit a mini-transaction that did not modify any pages, diff --git a/storage/innobase/os/os0file.cc b/storage/innobase/os/os0file.cc index a9c4776c147..3654982ac4b 100644 --- a/storage/innobase/os/os0file.cc +++ b/storage/innobase/os/os0file.cc @@ -2205,6 +2205,8 @@ os_file_create_func( ? FILE_FLAG_OVERLAPPED : 0; if (type == OS_LOG_FILE) { + ut_a(read_only || srv_thread_pool); + attributes|= FILE_FLAG_OVERLAPPED; if(srv_flush_log_at_trx_commit != 2 && !log_sys.is_opened()) attributes|= FILE_FLAG_NO_BUFFERING; if (srv_file_flush_method == SRV_O_DSYNC) @@ -3690,7 +3692,11 @@ int os_aio_init() OS_AIO_N_PENDING_IOS_PER_THREAD); int max_read_events= int(srv_n_read_io_threads * OS_AIO_N_PENDING_IOS_PER_THREAD); - int max_events= max_read_events + max_write_events; + + /* Count one extra aiocb for innodb async redo writes, which + bypasses the slots.*/ + + int max_events= max_read_events + max_write_events + 1; read_slots.reset(new io_slots(max_read_events, srv_n_read_io_threads)); write_slots.reset(new io_slots(max_write_events, srv_n_write_io_threads)); diff --git a/storage/innobase/srv/srv0srv.cc b/storage/innobase/srv/srv0srv.cc index 7e1cd5145c2..7ce1ebf145c 100644 --- a/storage/innobase/srv/srv0srv.cc +++ b/storage/innobase/srv/srv0srv.cc @@ -1454,7 +1454,7 @@ static void srv_sync_log_buffer_in_background() srv_main_thread_op_info = "flushing log"; if (difftime(current_time, srv_last_log_flush_time) >= srv_flush_log_at_timeout) { - log_buffer_flush_to_disk(); + log_buffer_flush_to_disk_async(); srv_last_log_flush_time = current_time; srv_log_writes_and_flush++; } @@ -1566,15 +1566,14 @@ void srv_master_callback(void*) if (!purge_state.m_running) srv_wake_purge_thread_if_not_active(); ulonglong counter_time= microsecond_interval_timer(); - srv_sync_log_buffer_in_background(); - MONITOR_INC_TIME_IN_MICRO_SECS(MONITOR_SRV_LOG_FLUSH_MICROSECOND, - counter_time); - if (srv_check_activity(&old_activity_count)) srv_master_do_active_tasks(counter_time); else srv_master_do_idle_tasks(counter_time); + srv_sync_log_buffer_in_background(); + MONITOR_INC_TIME_IN_MICRO_SECS(MONITOR_SRV_LOG_FLUSH_MICROSECOND, + counter_time); srv_main_thread_op_info= "sleeping"; } diff --git a/storage/innobase/srv/srv0start.cc b/storage/innobase/srv/srv0start.cc index 40ae4f3a3e7..12baa8e8b39 100644 --- a/storage/innobase/srv/srv0start.cc +++ b/storage/innobase/srv/srv0start.cc @@ -203,7 +203,7 @@ static dberr_t create_log_file(bool create_new_db, lsn_t lsn) os_file_t file{ os_file_create_func(logfile0.c_str(), OS_FILE_CREATE | OS_FILE_ON_ERROR_NO_EXIT, - OS_FILE_NORMAL, OS_LOG_FILE, false, &ret) + OS_FILE_AIO, OS_LOG_FILE, false, &ret) }; if (!ret) { @@ -524,7 +524,7 @@ srv_check_undo_redo_logs_exists() fh = os_file_create_func(logfilename.c_str(), OS_FILE_OPEN_RETRY | OS_FILE_ON_ERROR_NO_EXIT | OS_FILE_ON_ERROR_SILENT, - OS_FILE_NORMAL, OS_LOG_FILE, + OS_FILE_AIO, OS_LOG_FILE, srv_read_only_mode, &ret); if (ret) { diff --git a/storage/innobase/trx/trx0trx.cc b/storage/innobase/trx/trx0trx.cc index af00db6606e..8383d6194d9 100644 --- a/storage/innobase/trx/trx0trx.cc +++ b/storage/innobase/trx/trx0trx.cc @@ -1139,9 +1139,6 @@ static void trx_flush_log_if_needed_low(lsn_t lsn, const trx_t *trx) if (!srv_flush_log_at_trx_commit) return; - if (log_sys.get_flushed_lsn(std::memory_order_relaxed) >= lsn) - return; - completion_callback cb, *callback= nullptr; if (trx->state != TRX_STATE_PREPARED && !log_sys.is_pmem() && |