summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVladislav Vaintroub <wlad@mariadb.com>2022-03-07 10:36:16 +0100
committerVladislav Vaintroub <wlad@mariadb.com>2022-03-11 11:11:45 +0100
commit24541d06140b20e8fcd27ac5c7a38c14c876fc6d (patch)
tree860ea5a4286a913002fb4bcc08c71006fc20b7dd
parentb95942a2a7b0127e0fc466876974bf48e2c53ed3 (diff)
downloadmariadb-git-bb-10.9-MDEV-26603-async-redo-write.tar.gz
MDEV-26603 asynchronous redo log writebb-10.9-MDEV-26603-async-redo-write
Split redo log write submission and completion. Currently, AIO is not really used much, because it turns out to be 10-15% slower than synchronous IO, in write-heavy benchmarks. It looks like that AIO with callbacks in threadpool currently adds more context switches, if submitting thread waits for it. With this patch, AIO is only used where latency is not that important, in "fire-and-forget" manner, e.g log_write_up_to() retry logic, or prior to LRU flushing,or in srv_master periodic timer task. Still, thanks to AIO, internal group commit logic could be improved and sped up. Apparently, "next groupcommit lead candidate" thread waiting was removed. After some benchmarking, spins in group_commit_lock::acquire() were removed as they did not have any positive effect. So overall the effect of that patch should be positive.
-rw-r--r--storage/innobase/buf/buf0flu.cc2
-rw-r--r--storage/innobase/handler/ha_innodb.cc3
-rw-r--r--storage/innobase/include/log0log.h36
-rw-r--r--storage/innobase/log/log0log.cc269
-rw-r--r--storage/innobase/log/log0sync.cc136
-rw-r--r--storage/innobase/log/log0sync.h19
-rw-r--r--storage/innobase/mtr/mtr0mtr.cc4
-rw-r--r--storage/innobase/os/os0file.cc8
-rw-r--r--storage/innobase/srv/srv0srv.cc9
-rw-r--r--storage/innobase/srv/srv0start.cc4
-rw-r--r--storage/innobase/trx/trx0trx.cc3
11 files changed, 323 insertions, 170 deletions
diff --git a/storage/innobase/buf/buf0flu.cc b/storage/innobase/buf/buf0flu.cc
index 090757ef7ce..3667d0d2d89 100644
--- a/storage/innobase/buf/buf0flu.cc
+++ b/storage/innobase/buf/buf0flu.cc
@@ -1660,7 +1660,7 @@ ulint buf_flush_LRU(ulint max_n)
if (buf_pool.n_flush_LRU())
return 0;
- log_buffer_flush_to_disk();
+ log_buffer_flush_to_disk_async();
mysql_mutex_lock(&buf_pool.mutex);
if (buf_pool.n_flush_LRU_)
diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc
index f1adfbe83a5..d50ec7f51c4 100644
--- a/storage/innobase/handler/ha_innodb.cc
+++ b/storage/innobase/handler/ha_innodb.cc
@@ -1775,6 +1775,7 @@ MYSQL_THD innobase_create_background_thd(const char* name)
MYSQL_THD thd= create_background_thd();
thd_proc_info(thd, name);
THDVAR(thd, background_thread) = true;
+ ut_ad(!thd_get_thread_id(thd));
return thd;
}
@@ -1782,7 +1783,7 @@ extern "C" void thd_increment_pending_ops(MYSQL_THD);
THD *innodb_thd_increment_pending_ops(THD *thd)
{
- if (!thd || THDVAR(thd, background_thread))
+ if (!thd || !thd_get_thread_id(thd))
return nullptr;
thd_increment_pending_ops(thd);
return thd;
diff --git a/storage/innobase/include/log0log.h b/storage/innobase/include/log0log.h
index 7e20fc12cda..f298b619107 100644
--- a/storage/innobase/include/log0log.h
+++ b/storage/innobase/include/log0log.h
@@ -70,15 +70,25 @@ void log_write_up_to(lsn_t lsn, bool durable,
const completion_callback *callback= nullptr);
/** Write to the log file up to the last log entry.
-@param durable whether to wait for a durable write to complete */
-void log_buffer_flush_to_disk(bool durable= true);
+@param durable whether to wait for a durable write to complete
+@param wait whether to wait for completion
+*/
+void log_buffer_flush_to_disk(bool durable= true, bool wait=true);
+
+/** Initiate log buffer write/flush */
+static inline void log_buffer_flush_to_disk_async()
+{
+ log_buffer_flush_to_disk(true, false);
+}
/** Prepare to invoke log_write_and_flush(), before acquiring log_sys.latch. */
ATTRIBUTE_COLD void log_write_and_flush_prepare();
-/** Durably write the log up to log_sys.get_lsn(). */
-ATTRIBUTE_COLD void log_write_and_flush();
+/** Durably write the log up to log_sys.get_lsn().
+@return lsn that log_write_up_to() must be invoked with
+@retval 0 if there is no need to invoke log_write_up_to() */
+ATTRIBUTE_COLD __attribute__((warn_unused_result)) lsn_t log_write_and_flush();
/** Make a checkpoint */
ATTRIBUTE_COLD void log_make_checkpoint();
@@ -136,7 +146,8 @@ public:
dberr_t close() noexcept;
dberr_t read(os_offset_t offset, span<byte> buf) noexcept;
- void write(os_offset_t offset, span<const byte> buf) noexcept;
+ void write(os_offset_t offset, span<const byte> buf,
+ tpool::aiocb *cb= nullptr) noexcept;
bool flush() const noexcept { return os_file_flush(m_file); }
#ifdef HAVE_PMEM
byte *mmap(bool read_only, const struct stat &st) noexcept;
@@ -485,8 +496,19 @@ public:
/** Write buf to ib_logfile0.
@tparam release_latch whether to invoke latch.wr_unlock()
- @return the current log sequence number */
- template<bool release_latch> inline lsn_t write_buf() noexcept;
+ @param durable whether to invoke a durable write
+ @param sync whether to invoke a synchronous write
+ @return new write target
+ @retval 0 if there is no need to call log_write_up_to() */
+ template<bool release_latch>
+ inline lsn_t write_buf(bool durable, bool sync) noexcept;
+
+ /** Complete write_buf().
+ @param lsn new value of write_lsn
+ @param durable whether the write was durable
+ @return new write target
+ @retval 0 if there is no need to call log_write_up_to() */
+ inline lsn_t complete_write_buf(lsn_t lsn, bool durable) noexcept;
/** Create the log. */
void create(lsn_t lsn) noexcept;
diff --git a/storage/innobase/log/log0log.cc b/storage/innobase/log/log0log.cc
index 83b78ebf385..f1dd89fd5d2 100644
--- a/storage/innobase/log/log0log.cc
+++ b/storage/innobase/log/log0log.cc
@@ -74,6 +74,38 @@ log_t log_sys;
#define LOG_BUF_FLUSH_MARGIN ((4 * 4096) /* cf. log_t::append_prepare() */ \
+ (4U << srv_page_size_shift))
+/**
+ group commit completion callback used for anything
+ that can run asynchronous
+*/
+static const completion_callback async_io_callback{nullptr, nullptr};
+
+/**
+ group commit completion callback that is forcing synchronous IO
+*/
+static const completion_callback sync_io_callback{nullptr, nullptr};
+
+#ifndef DBUG_OFF
+/**
+ Crashing after disk flush requested via dbug_debug flag.
+ flush can be executed by background thread,
+ where DBUG_EXECUTE_IF() does not work, this the value
+ is passed via global variable.
+*/
+static bool crash_after_flush;
+#endif
+
+static void report_aio_error(const char *text, tpool::aiocb *cb);
+
+/** AIO control block with auxilliary information, for async writing.
+Protected by write_lock.*/
+struct Log_aiocb : tpool::aiocb
+{
+ lsn_t lsn;
+ bool durable;
+};
+static Log_aiocb log_aiocb;
+
void log_t::set_capacity()
{
#ifndef SUX_LOCK_GENERIC
@@ -155,12 +187,29 @@ dberr_t log_file_t::read(os_offset_t offset, span<byte> buf) noexcept
return os_file_read(IORequestRead, m_file, buf.data(), offset, buf.size());
}
-void log_file_t::write(os_offset_t offset, span<const byte> buf) noexcept
+void log_file_t::write(os_offset_t offset, span<const byte> buf,
+ tpool::aiocb *iocb) noexcept
{
ut_ad(is_opened());
- if (dberr_t err= os_file_write_func(IORequestWrite, "ib_logfile0", m_file,
+ if (iocb)
+ {
+ ut_ad(buf.size() < UINT_MAX);
+ iocb->m_fh= m_file;
+ iocb->m_opcode= tpool::aio_opcode::AIO_PWRITE;
+ iocb->m_offset= offset;
+ iocb->m_buffer= (void *) buf.data();
+ iocb->m_len= (unsigned) buf.size();
+ if (srv_thread_pool->submit_io(iocb))
+ {
+ iocb->m_err= IF_WIN(GetLastError(), errno);
+ report_aio_error("submitting asynchronous write to ib_logfile0", iocb);
+ }
+ }
+ else if (dberr_t err= os_file_write(IORequestWrite, "ib_logfile0", m_file,
buf.data(), offset, buf.size()))
+ {
ib::fatal() << "write(\"ib_logfile0\") returned " << err;
+ }
}
#ifdef HAVE_PMEM
@@ -505,10 +554,13 @@ void log_t::resize_abort() noexcept
/** Write an aligned buffer to ib_logfile0.
@param buf buffer to be written
@param len length of data to be written
-@param offset log file offset */
-static void log_write_buf(const byte *buf, size_t len, lsn_t offset)
+@param offset log file offset
+@param cb completion callback */
+static void log_write_buf(const byte *buf, size_t len, lsn_t offset,
+ tpool::aiocb *cb)
{
- ut_ad(write_lock.is_owner());
+ ut_ad(cb ? !write_lock.has_owner() : write_lock.is_owner());
+ ut_ad(write_lock.locked());
ut_ad(!recv_no_log_write);
ut_d(const size_t block_size_1= log_sys.get_block_size() - 1);
ut_ad(!(offset & block_size_1));
@@ -519,7 +571,7 @@ static void log_write_buf(const byte *buf, size_t len, lsn_t offset)
if (UNIV_LIKELY(offset + len <= log_sys.file_size))
{
write:
- log_sys.log.write(offset, {buf, len});
+ log_sys.log.write(offset, {buf, len}, cb);
return;
}
@@ -730,30 +782,110 @@ ATTRIBUTE_COLD void log_t::resize_write_buf(size_t length) noexcept
resize_flush_buf, offset, length) == DB_SUCCESS);
}
+
+static void report_aio_error(const char *text, tpool::aiocb *cb)
+{
+ ib::fatal() << "IO Error "
+ << cb->m_err IF_WIN(, << " " << strerror(cb->m_err)) << " "
+ << text << "," << cb->m_len << " bytes at offset "
+ << cb->m_offset;
+}
+
+/** Ensure that previous log writes are durable.
+@return new durable lsn target
+@retval 0 if caller does not need to call log_write_up_to() again
+
+*/
+static lsn_t log_flush()
+{
+ ut_ad(!log_sys.is_pmem());
+ lsn_t lsn= write_lock.value();
+ ut_a(log_sys.flush(lsn));
+#ifndef DBUG_OFF
+ if (crash_after_flush)
+ DBUG_SUICIDE();
+#endif
+ return flush_lock.release(lsn);
+}
+
+
+/** Complete write_buf().
+@param lsn new value of write_lsn
+@param durable whether the write was durable
+@return new write target
+@retval 0 if there is no need to call log_write_up_to() */
+inline lsn_t log_t::complete_write_buf(lsn_t lsn, bool durable) noexcept
+{
+ ut_ad(write_lock.is_owner());
+ ut_ad(durable == flush_lock.is_owner());
+
+ ut_a(lsn >= write_lsn);
+
+ write_lsn= lsn;
+ lsn_t pending_lsn= write_lock.release(lsn);
+ if (durable)
+ pending_lsn= std::max(pending_lsn, log_flush());
+ return pending_lsn;
+}
+
+static void aio_complete_write_buf(void *p)
+{
+ ut_ad(write_lock.locked());
+
+ Log_aiocb *cb= static_cast<Log_aiocb *>(p);
+ if (cb->m_err)
+ report_aio_error("in asynchronous write to ib_logfile0", cb);
+ const bool durable{cb->durable};
+#ifdef UNIV_DEBUG
+ if (durable)
+ {
+ ut_ad(flush_lock.locked());
+ flush_lock.set_owner();
+ }
+ write_lock.set_owner();
+#endif
+
+ if (lsn_t ret_lsn= log_sys.complete_write_buf(cb->lsn, durable))
+ {
+ /** prevent stalls. Also, force special synchronous callback
+ as optimization. We'll avoid threadpool machinery and context
+ switching (we're already in the background thread here)
+ */
+ log_write_up_to(ret_lsn, durable, &sync_io_callback);
+ }
+}
+
+
/** Write buf to ib_logfile0.
@tparam release_latch whether to invoke latch.wr_unlock()
-@return the current log sequence number */
-template<bool release_latch> inline lsn_t log_t::write_buf() noexcept
+@param durable whether to invoke a durable write
+@param sync whether to invoke a synchronous write
+@return new write target
+@retval 0 if there is no need to call log_write_up_to() */
+template<bool release_latch>
+inline lsn_t log_t::write_buf(bool durable, bool sync) noexcept
{
#ifndef SUX_LOCK_GENERIC
ut_ad(latch.is_write_locked());
#endif
ut_ad(!srv_read_only_mode);
ut_ad(!is_pmem());
+ ut_ad(write_lock.is_owner());
+ ut_ad(durable == flush_lock.is_owner());
const lsn_t lsn{get_lsn(std::memory_order_relaxed)};
-
+ DBUG_EXECUTE_IF("crash_after_log_write_upto", crash_after_flush= true;);
if (write_lsn >= lsn)
{
if (release_latch)
latch.wr_unlock();
- ut_ad(write_lsn == lsn);
+ ut_a(write_lsn == lsn);
+ return complete_write_buf(lsn, durable);
}
else
{
ut_ad(!recv_no_log_write);
- write_lock.set_pending(lsn);
- ut_ad(write_lsn >= get_flushed_lsn());
+ ut_a(write_lsn >= get_flushed_lsn());
const size_t block_size_1{get_block_size() - 1};
lsn_t offset{calc_lsn_offset(write_lsn) & ~lsn_t{block_size_1}};
@@ -804,20 +936,34 @@ template<bool release_latch> inline lsn_t log_t::write_buf() noexcept
"InnoDB log write: " LSN_PF, write_lsn);
}
- /* Do the write to the log file */
- log_write_buf(write_buf, length, offset);
if (UNIV_LIKELY_NULL(resize_buf))
resize_write_buf(length);
- write_lsn= lsn;
- }
- return lsn;
+ /* Do the write to the log file */
+ if (sync)
+ {
+ log_write_buf(write_buf, length, offset, nullptr);
+ return complete_write_buf(lsn, durable);
+ }
+
+ /* Async log IO
+ Note : flush/write lock ownership is going to migrate to a
+ background thread*/
+ ut_d(write_lock.reset_owner());
+ ut_d(if (durable) flush_lock.reset_owner());
+
+ log_aiocb.m_callback= aio_complete_write_buf;
+ log_aiocb.durable= durable;
+ log_aiocb.lsn= lsn;
+ log_write_buf(write_buf, length, offset, &log_aiocb);
+ return 0;
+ }
}
bool log_t::flush(lsn_t lsn) noexcept
{
ut_ad(lsn >= get_flushed_lsn());
- flush_lock.set_pending(lsn);
+ ut_ad(flush_lock.is_owner());
const bool success{srv_file_flush_method == SRV_O_DSYNC || log.flush()};
if (UNIV_LIKELY(success))
{
@@ -827,22 +973,25 @@ bool log_t::flush(lsn_t lsn) noexcept
return success;
}
-/** Ensure that previous log writes are durable.
-@param lsn previously written LSN
-@return new durable lsn target
-@retval 0 if there are no pending callbacks on flush_lock
- or there is another group commit lead.
+/*
+ Decide about whether to do synchronous IO.
+ Async might not make sense because of the higher latency or CPU
+ overhead in threadpool, or because the file is cached,and say libaio
+ can't do AIO on cached files.
+
+ Async IO apparently makes sense always if the waiter does
+ not care about result (i.e callback with NULL function)
+
+ NOTE: currently, async IO is mostly unused, because it turns
+ out to be worse in benchmarks. Perhaps it is just too many threads
+ involved in waking and waiting.
*/
-static lsn_t log_flush(lsn_t lsn)
+static bool use_sync_log_write(bool /* durable */,
+ const completion_callback *cb)
{
- ut_ad(!log_sys.is_pmem());
- ut_a(log_sys.flush(lsn));
- DBUG_EXECUTE_IF("crash_after_log_write_upto", DBUG_SUICIDE(););
- return flush_lock.release(lsn);
+ return !cb || cb->m_callback || cb == &sync_io_callback;
}
-static const completion_callback dummy_callback{[](void *) {},nullptr};
-
/** Ensure that the log has been written to the log file up to a given
log entry (such as that of a transaction commit). Start a new write, or
wait and check if an already running write is covering the request.
@@ -859,7 +1008,7 @@ void log_write_up_to(lsn_t lsn, bool durable,
{
/* A non-final batch of recovery is active no writes to the log
are allowed yet. */
- ut_a(!callback);
+ ut_a(!callback || !callback->m_callback);
return;
}
@@ -868,7 +1017,7 @@ void log_write_up_to(lsn_t lsn, bool durable,
#ifdef HAVE_PMEM
if (log_sys.is_pmem())
{
- ut_ad(!callback);
+ ut_ad(!callback || !callback->m_callback);
if (durable)
log_sys.persist(lsn);
return;
@@ -876,42 +1025,49 @@ void log_write_up_to(lsn_t lsn, bool durable,
#endif
repeat:
- if (durable)
- {
- if (flush_lock.acquire(lsn, callback) != group_commit_lock::ACQUIRED)
- return;
- flush_lock.set_pending(log_sys.get_lsn());
- }
+ if (durable &&
+ flush_lock.acquire(lsn, callback) != group_commit_lock::ACQUIRED)
+ return;
- lsn_t pending_write_lsn= 0, pending_flush_lsn= 0;
+ lsn_t pending_lsn= 0;
if (write_lock.acquire(lsn, durable ? nullptr : callback) ==
group_commit_lock::ACQUIRED)
{
+ const bool sync{use_sync_log_write(durable, callback)};
log_sys.latch.wr_lock(SRW_LOCK_CALL);
- pending_write_lsn= write_lock.release(log_sys.write_buf<true>());
+ pending_lsn= log_sys.write_buf<true>(durable, sync);
+ if (!pending_lsn)
+ return;
}
- if (durable)
+ if (durable && !pending_lsn)
{
- pending_flush_lsn= log_flush(write_lock.value());
+ /* We only get here if flush_lock is acquired, but write_lock
+ is expired, i.e lsn was already written, but not flushed yet.*/
+ pending_lsn= log_flush();
}
- if (pending_write_lsn || pending_flush_lsn)
+ if (pending_lsn)
{
- /* There is no new group commit lead; some async waiters could stall. */
- callback= &dummy_callback;
- lsn= std::max(pending_write_lsn, pending_flush_lsn);
+ /* There is no new group commit lead; some waiters could stall.
+ If special sync_io_callback was used we'll continue to use it
+ as optimization to reduce context switches.
+ */
+ if (callback != &sync_io_callback)
+ callback= &async_io_callback;
+ lsn= pending_lsn;
goto repeat;
}
}
/** Write to the log file up to the last log entry.
@param durable whether to wait for a durable write to complete */
-void log_buffer_flush_to_disk(bool durable)
+void log_buffer_flush_to_disk(bool durable, bool wait)
{
ut_ad(!srv_read_only_mode);
- log_write_up_to(log_sys.get_lsn(std::memory_order_acquire), durable);
+ log_write_up_to(log_sys.get_lsn(std::memory_order_acquire), durable,
+ wait ? nullptr : &async_io_callback);
}
/** Prepare to invoke log_write_and_flush(), before acquiring log_sys.latch. */
@@ -926,20 +1082,19 @@ ATTRIBUTE_COLD void log_write_and_flush_prepare()
group_commit_lock::ACQUIRED);
}
-/** Durably write the log up to log_sys.get_lsn(). */
-ATTRIBUTE_COLD void log_write_and_flush()
+/** Durably write the log up to log_sys.get_lsn().
+@return lsn that log_write_up_to() must be invoked with
+@retval 0 if there is no need to invoke log_write_up_to() */
+ATTRIBUTE_COLD __attribute__((warn_unused_result)) lsn_t log_write_and_flush()
{
ut_ad(!srv_read_only_mode);
if (!log_sys.is_pmem())
- {
- const lsn_t lsn{log_sys.write_buf<false>()};
- write_lock.release(lsn);
- log_flush(lsn);
- }
+ return log_sys.write_buf<false>(true, true);
+
#ifdef HAVE_PMEM
- else
- log_sys.persist(log_sys.get_lsn());
+ log_sys.persist(log_sys.get_lsn());
#endif
+ return 0;
}
/********************************************************************
diff --git a/storage/innobase/log/log0sync.cc b/storage/innobase/log/log0sync.cc
index 6b14d1d3591..641d7e65c36 100644
--- a/storage/innobase/log/log0sync.cc
+++ b/storage/innobase/log/log0sync.cc
@@ -1,5 +1,5 @@
/*****************************************************************************
-Copyright (c) 2020 MariaDB Corporation.
+Copyright (c) 2020, 2022, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
@@ -72,12 +72,8 @@ Note that if write operation is very fast, a) or b) can be fine as alternative.
#include <thread>
#include <mutex>
#include <condition_variable>
-#include <my_cpu.h>
-#include <log0types.h>
#include "log0sync.h"
-#include <mysql/service_thd_wait.h>
-#include <sql_class.h>
/**
Helper class , used in group commit lock.
@@ -166,7 +162,7 @@ struct group_commit_waiter_t
};
group_commit_lock::group_commit_lock() :
- m_mtx(), m_value(0), m_pending_value(0), m_lock(false), m_waiters_list()
+ m_mtx(), m_value(0), m_lock(false), m_waiters_list()
{
}
@@ -175,60 +171,32 @@ group_commit_lock::value_type group_commit_lock::value() const
return m_value.load(std::memory_order::memory_order_relaxed);
}
-group_commit_lock::value_type group_commit_lock::pending() const
-{
- return m_pending_value.load(std::memory_order::memory_order_relaxed);
-}
-
-void group_commit_lock::set_pending(group_commit_lock::value_type num)
-{
- ut_a(num >= value());
- m_pending_value.store(num, std::memory_order::memory_order_relaxed);
-}
-const unsigned int MAX_SPINS = 1; /** max spins in acquire */
thread_local group_commit_waiter_t thread_local_waiter;
static inline void do_completion_callback(const completion_callback* cb)
{
- if (cb)
+ if (cb && cb->m_callback)
cb->m_callback(cb->m_param);
}
-group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num, const completion_callback *callback)
+inline void store_callback(lsn_t num, const completion_callback *cb,
+ std::vector<std::pair<lsn_t, completion_callback>> &v)
{
- unsigned int spins = MAX_SPINS;
-
- for(;;)
- {
- if (num <= value())
- {
- /* No need to wait.*/
- do_completion_callback(callback);
- return lock_return_code::EXPIRED;
- }
-
- if(spins-- == 0)
- break;
- if (num > pending())
- {
- /* Longer wait expected (longer than currently running operation),
- don't spin.*/
- break;
- }
- ut_delay(1);
- }
+ if (!cb || !cb->m_callback)
+ return;
+ v.push_back({num, *cb});
+}
- thread_local_waiter.m_value = num;
- thread_local_waiter.m_group_commit_leader= false;
+group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num, const completion_callback *callback)
+{
+ bool group_commit_leader= false;
std::unique_lock<std::mutex> lk(m_mtx, std::defer_lock);
- while (num > value() || thread_local_waiter.m_group_commit_leader)
+ while (num > value() || group_commit_leader)
{
lk.lock();
-
/* Re-read current value after acquiring the lock*/
- if (num <= value() &&
- (!thread_local_waiter.m_group_commit_leader || m_lock))
+ if (num <= value() && (!group_commit_leader || m_lock))
{
lk.unlock();
do_completion_callback(callback);
@@ -239,40 +207,30 @@ group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num, c
{
/* Take the lock, become group commit leader.*/
m_lock = true;
-#ifndef DBUG_OFF
- m_owner_id = std::this_thread::get_id();
-#endif
- if (callback)
- m_pending_callbacks.push_back({num,*callback});
+ ut_d(set_owner());
+ store_callback(num, callback,m_pending_callbacks);
return lock_return_code::ACQUIRED;
}
- if (callback && (m_waiters_list || num <= pending()))
+ if (callback)
{
- /*
- If num > pending(), we have a good candidate for the next group
- commit lead, that will be taking over the lock after current owner
- releases it. We put current thread into waiter's list so it sleeps
- and can be signaled and marked as group commit lead during lock release.
-
- For this to work well, pending() must deliver a good approximation for N
- in the next call to group_commit_lock::release(N).
- */
- m_pending_callbacks.push_back({num, *callback});
+ store_callback(num, callback, m_pending_callbacks);
return lock_return_code::CALLBACK_QUEUED;
}
/* Add yourself to waiters list.*/
- thread_local_waiter.m_group_commit_leader= false;
- thread_local_waiter.m_next = m_waiters_list;
- m_waiters_list = &thread_local_waiter;
+ auto *waiter= &thread_local_waiter;
+ waiter->m_value= num;
+ waiter->m_group_commit_leader= false;
+ waiter->m_next= m_waiters_list;
+ m_waiters_list= waiter;
lk.unlock();
/* Sleep until woken in release().*/
thd_wait_begin(0,THD_WAIT_GROUP_COMMIT);
- thread_local_waiter.m_sema.wait();
+ waiter->m_sema.wait();
thd_wait_end(0);
-
+ group_commit_leader= waiter->m_group_commit_leader;
}
do_completion_callback(callback);
return lock_return_code::EXPIRED;
@@ -282,9 +240,8 @@ group_commit_lock::value_type group_commit_lock::release(value_type num)
{
completion_callback callbacks[1000];
size_t callback_count = 0;
- value_type ret = 0;
+ value_type ret= 0;
std::unique_lock<std::mutex> lk(m_mtx);
- m_lock = false;
/* Update current value. */
ut_a(num >= value());
@@ -307,6 +264,12 @@ group_commit_lock::value_type group_commit_lock::release(value_type num)
}
}
+ auto it=
+ std::remove_if(m_pending_callbacks.begin(), m_pending_callbacks.end(),
+ [num](const pending_cb &c) { return c.first <= num; });
+
+ m_pending_callbacks.erase(it, m_pending_callbacks.end());
+
for (prev= nullptr, cur= m_waiters_list; cur; cur= next)
{
next= cur->m_next;
@@ -335,12 +298,6 @@ group_commit_lock::value_type group_commit_lock::release(value_type num)
}
}
- auto it= std::remove_if(
- m_pending_callbacks.begin(), m_pending_callbacks.end(),
- [num](const pending_cb &c) { return c.first <= num; });
-
- m_pending_callbacks.erase(it, m_pending_callbacks.end());
-
if (m_pending_callbacks.size() || m_waiters_list)
{
/*
@@ -370,7 +327,8 @@ group_commit_lock::value_type group_commit_lock::release(value_type num)
ret= m_pending_callbacks[0].first;
}
}
-
+ ut_d(reset_owner();)
+ m_lock= false;
lk.unlock();
/*
@@ -396,9 +354,29 @@ group_commit_lock::value_type group_commit_lock::release(value_type num)
}
#ifndef DBUG_OFF
-bool group_commit_lock::is_owner()
+#include <tpool_structs.h>
+TPOOL_SUPPRESS_TSAN
+bool group_commit_lock::locked() const noexcept { return m_lock; }
+
+TPOOL_SUPPRESS_TSAN bool group_commit_lock::is_owner() const noexcept
{
- return m_lock && std::this_thread::get_id() == m_owner_id;
+ return locked() && std::this_thread::get_id() == m_owner_id;
+}
+
+void TPOOL_SUPPRESS_TSAN group_commit_lock::set_owner()
+{
+ DBUG_ASSERT(locked() && !has_owner());
+ m_owner_id= std::this_thread::get_id();
}
-#endif
+void TPOOL_SUPPRESS_TSAN group_commit_lock::reset_owner()
+{
+ DBUG_ASSERT(is_owner());
+ m_owner_id= std::thread::id();
+}
+
+bool TPOOL_SUPPRESS_TSAN group_commit_lock::has_owner() const noexcept
+{
+ return m_owner_id != std::thread::id();
+}
+#endif
diff --git a/storage/innobase/log/log0sync.h b/storage/innobase/log/log0sync.h
index 00686d39dac..7dc9e2aaf38 100644
--- a/storage/innobase/log/log0sync.h
+++ b/storage/innobase/log/log0sync.h
@@ -1,5 +1,5 @@
/*****************************************************************************
-Copyright (c) 2020 MariaDB Corporation.
+Copyright (c) 2020, 2022, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
@@ -56,14 +56,6 @@ Operations supported on this semaphore
- might return some lsn, meaning there are some pending
callbacks left, and there is no new group commit lead
(i.e caller must do something to flush those pending callbacks)
-
-3. value()
-- read current value
-
-4. pending_value()
-- read pending value
-
-5. set_pending_value()
*/
class group_commit_lock
{
@@ -73,7 +65,6 @@ class group_commit_lock
#endif
std::mutex m_mtx;
std::atomic<value_type> m_value;
- std::atomic<value_type> m_pending_value;
bool m_lock;
group_commit_waiter_t* m_waiters_list;
@@ -91,9 +82,11 @@ public:
lock_return_code acquire(value_type num, const completion_callback *cb);
value_type release(value_type num);
value_type value() const;
- value_type pending() const;
- void set_pending(value_type num);
#ifndef DBUG_OFF
- bool is_owner();
+ bool locked() const noexcept;
+ bool is_owner() const noexcept;
+ bool has_owner() const noexcept;
+ void set_owner();
+ void reset_owner();
#endif
};
diff --git a/storage/innobase/mtr/mtr0mtr.cc b/storage/innobase/mtr/mtr0mtr.cc
index 4e5809952cb..b76d230b761 100644
--- a/storage/innobase/mtr/mtr0mtr.cc
+++ b/storage/innobase/mtr/mtr0mtr.cc
@@ -540,7 +540,7 @@ void mtr_t::commit_shrink(fil_space_t &space)
const lsn_t start_lsn= do_write(true).first;
/* Durably write the reduced FSP_SIZE before truncating the data file. */
- log_write_and_flush();
+ lsn_t pending_lsn= log_write_and_flush();
#ifndef SUX_LOCK_GENERIC
ut_ad(log_sys.latch.is_write_locked());
#endif
@@ -585,6 +585,8 @@ void mtr_t::commit_shrink(fil_space_t &space)
m_memo.for_each_block_in_reverse(CIterate<ReleaseLatches>());
release_resources();
+ if (pending_lsn)
+ log_buffer_flush_to_disk_async();
}
/** Commit a mini-transaction that did not modify any pages,
diff --git a/storage/innobase/os/os0file.cc b/storage/innobase/os/os0file.cc
index a077f5736da..7406775b0eb 100644
--- a/storage/innobase/os/os0file.cc
+++ b/storage/innobase/os/os0file.cc
@@ -2220,6 +2220,8 @@ os_file_create_func(
? FILE_FLAG_OVERLAPPED : 0;
if (type == OS_LOG_FILE) {
+ ut_a(read_only || srv_thread_pool);
+ attributes|= FILE_FLAG_OVERLAPPED;
if(srv_flush_log_at_trx_commit != 2 && !log_sys.is_opened())
attributes|= FILE_FLAG_NO_BUFFERING;
if (srv_file_flush_method == SRV_O_DSYNC)
@@ -3706,7 +3708,11 @@ int os_aio_init()
OS_AIO_N_PENDING_IOS_PER_THREAD);
int max_read_events= int(srv_n_read_io_threads *
OS_AIO_N_PENDING_IOS_PER_THREAD);
- int max_events= max_read_events + max_write_events;
+
+ /* Count one extra aiocb for innodb async redo writes, which
+ bypasses the slots.*/
+
+ int max_events= max_read_events + max_write_events + 1;
read_slots.reset(new io_slots(max_read_events, srv_n_read_io_threads));
write_slots.reset(new io_slots(max_write_events, srv_n_write_io_threads));
diff --git a/storage/innobase/srv/srv0srv.cc b/storage/innobase/srv/srv0srv.cc
index d9b2af54a32..9e093698af4 100644
--- a/storage/innobase/srv/srv0srv.cc
+++ b/storage/innobase/srv/srv0srv.cc
@@ -1455,7 +1455,7 @@ static void srv_sync_log_buffer_in_background()
srv_main_thread_op_info = "flushing log";
if (difftime(current_time, srv_last_log_flush_time)
>= srv_flush_log_at_timeout) {
- log_buffer_flush_to_disk();
+ log_buffer_flush_to_disk_async();
srv_last_log_flush_time = current_time;
srv_log_writes_and_flush++;
}
@@ -1567,15 +1567,14 @@ void srv_master_callback(void*)
if (!purge_state.m_running)
srv_wake_purge_thread_if_not_active();
ulonglong counter_time= microsecond_interval_timer();
- srv_sync_log_buffer_in_background();
- MONITOR_INC_TIME_IN_MICRO_SECS(MONITOR_SRV_LOG_FLUSH_MICROSECOND,
- counter_time);
-
if (srv_check_activity(&old_activity_count))
srv_master_do_active_tasks(counter_time);
else
srv_master_do_idle_tasks(counter_time);
+ srv_sync_log_buffer_in_background();
+ MONITOR_INC_TIME_IN_MICRO_SECS(MONITOR_SRV_LOG_FLUSH_MICROSECOND,
+ counter_time);
srv_main_thread_op_info= "sleeping";
}
diff --git a/storage/innobase/srv/srv0start.cc b/storage/innobase/srv/srv0start.cc
index 4322f2a4e38..a81f5cb2e82 100644
--- a/storage/innobase/srv/srv0start.cc
+++ b/storage/innobase/srv/srv0start.cc
@@ -203,7 +203,7 @@ static dberr_t create_log_file(bool create_new_db, lsn_t lsn)
os_file_t file{
os_file_create_func(logfile0.c_str(),
OS_FILE_CREATE | OS_FILE_ON_ERROR_NO_EXIT,
- OS_FILE_NORMAL, OS_LOG_FILE, false, &ret)
+ OS_FILE_AIO, OS_LOG_FILE, false, &ret)
};
if (!ret) {
@@ -524,7 +524,7 @@ srv_check_undo_redo_logs_exists()
fh = os_file_create_func(logfilename.c_str(),
OS_FILE_OPEN_RETRY | OS_FILE_ON_ERROR_NO_EXIT
| OS_FILE_ON_ERROR_SILENT,
- OS_FILE_NORMAL, OS_LOG_FILE,
+ OS_FILE_AIO, OS_LOG_FILE,
srv_read_only_mode, &ret);
if (ret) {
diff --git a/storage/innobase/trx/trx0trx.cc b/storage/innobase/trx/trx0trx.cc
index 7a4a51e4685..c880ed95f8f 100644
--- a/storage/innobase/trx/trx0trx.cc
+++ b/storage/innobase/trx/trx0trx.cc
@@ -1130,9 +1130,6 @@ static void trx_flush_log_if_needed_low(lsn_t lsn, const trx_t *trx)
if (!srv_flush_log_at_trx_commit)
return;
- if (log_sys.get_flushed_lsn(std::memory_order_relaxed) >= lsn)
- return;
-
completion_callback cb, *callback= nullptr;
if (trx->state != TRX_STATE_PREPARED && !log_sys.is_pmem() &&