summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVladislav Vaintroub <wlad@mariadb.com>2021-09-20 17:11:12 +0200
committerVladislav Vaintroub <wlad@mariadb.com>2021-09-20 18:05:03 +0200
commit550b11f12acc1792d87b2e1761d752b5f7cd9c39 (patch)
treebd07b036d03d96c938c210dfb57f42c92c77ae21
parentaad6d2f1498185f535479b1160b23d2a397988de (diff)
downloadmariadb-git-10.7-wlad-async-log-io.tar.gz
MDEV-26603 asynchronous redo log write10.7-wlad-async-log-io
- Split log_write_up_to into 2 functions - the one initiating the write and the write completion. In case of async write, the completion is called by another thread. - Allow asyncronous writes to log, if caller does not want to wait. - Change srv_sync_log_buffer_in_background() so it does not wait for write completion.
-rw-r--r--storage/innobase/include/log0log.h27
-rw-r--r--storage/innobase/include/os0file.h7
-rw-r--r--storage/innobase/log/log0log.cc327
-rw-r--r--storage/innobase/os/os0file.cc22
-rw-r--r--storage/innobase/srv/srv0srv.cc2
5 files changed, 313 insertions, 72 deletions
diff --git a/storage/innobase/include/log0log.h b/storage/innobase/include/log0log.h
index a651a56e8a0..a9cd97a0838 100644
--- a/storage/innobase/include/log0log.h
+++ b/storage/innobase/include/log0log.h
@@ -118,7 +118,7 @@ void log_write_up_to(lsn_t lsn, bool flush_to_disk, bool rotate_key = false,
also to be flushed to disk. */
void
log_buffer_flush_to_disk(
- bool sync = true);
+ bool sync = true, bool wait=true);
/** Make a checkpoint */
ATTRIBUTE_COLD void log_make_checkpoint();
@@ -379,9 +379,9 @@ public:
const char *new_path) noexcept= 0;
virtual dberr_t close() noexcept= 0;
virtual dberr_t read(os_offset_t offset, span<byte> buf) noexcept= 0;
- virtual dberr_t write(const char *path, os_offset_t offset,
- span<const byte> buf) noexcept= 0;
+ virtual dberr_t write(const char *path, tpool::aiocb *cb, bool sync) noexcept= 0;
virtual dberr_t flush() noexcept= 0;
+ virtual bool async_io_supported() const noexcept= 0;
/** Durable writes doesn't require calling flush() */
bool writes_are_durable() const noexcept { return m_durable_writes; }
@@ -405,9 +405,9 @@ public:
dberr_t rename(const char *old_path, const char *new_path) noexcept final;
dberr_t close() noexcept final;
dberr_t read(os_offset_t offset, span<byte> buf) noexcept final;
- dberr_t write(const char *path, os_offset_t offset,
- span<const byte> buf) noexcept final;
+ dberr_t write(const char *path, tpool::aiocb *cb, bool sync) noexcept final;
dberr_t flush() noexcept final;
+ bool async_io_supported() const noexcept final {return true;}
private:
pfs_os_file_t m_fd{OS_FILE_CLOSED};
@@ -428,7 +428,8 @@ public:
dberr_t close() noexcept;
dberr_t read(os_offset_t offset, span<byte> buf) noexcept;
bool writes_are_durable() const noexcept;
- dberr_t write(os_offset_t offset, span<const byte> buf) noexcept;
+ bool async_io_supported() const noexcept;
+ dberr_t write(tpool::aiocb *cb, bool sync) noexcept;
dberr_t flush() noexcept;
void free()
{
@@ -526,6 +527,20 @@ public:
void read(os_offset_t offset, span<byte> buf);
/** Tells whether writes require calling flush() */
bool writes_are_durable() const noexcept;
+ /* Whether there is a AIO support.
+ False for memory mapped files , true for anything else.*/
+ bool async_io_supported() const noexcept;
+ /** Perform async, or sync IO on file.
+ @param[in] cb - IO control block
+ @param[in] sync - true if synchronous IO should be used
+ @note cb can contain m_callback parameter, the function
+ that is called after IO is finished. This callback is also
+ executed in case of synchronous IO
+ */
+ void begin_write(tpool::aiocb *cb, bool sync);
+ /**Finish write on file, which was started with begin_write
+ This function mainly updates statistic counters */
+ void complete_write(tpool::aiocb* cb);
/** writes buffer to log file
@param[in] offset offset in log file
@param[in] buf buffer from which to write */
diff --git a/storage/innobase/include/os0file.h b/storage/innobase/include/os0file.h
index 7e190b340de..b1d91a6fdf3 100644
--- a/storage/innobase/include/os0file.h
+++ b/storage/innobase/include/os0file.h
@@ -1111,6 +1111,13 @@ void os_aio_free();
@retval DB_IO_ERROR on I/O error */
dberr_t os_aio(const IORequest &type, void *buf, os_offset_t offset, size_t n);
+/** Submit async IO to threadpool.
+
+A light wrapper of srv_thread_pool->submit_io, which adds
+PFS and error handling */
+dberr_t os_file_submit_aio(pfs_os_file_t handle, const char *name,
+ tpool::aiocb *cb);
+
/** Wait until there are no pending asynchronous writes.
Only used on FLUSH TABLES...FOR EXPORT. */
void os_aio_wait_until_no_pending_writes();
diff --git a/storage/innobase/log/log0log.cc b/storage/innobase/log/log0log.cc
index 2757571b52c..bbf2fa306d2 100644
--- a/storage/innobase/log/log0log.cc
+++ b/storage/innobase/log/log0log.cc
@@ -285,7 +285,7 @@ dberr_t file_os_io::open(const char *path, bool read_only) noexcept
bool success;
auto tmp_fd= os_file_create(
innodb_log_file_key, path, OS_FILE_OPEN | OS_FILE_ON_ERROR_NO_EXIT,
- OS_FILE_NORMAL, OS_LOG_FILE, read_only, &success);
+ OS_FILE_AIO, OS_LOG_FILE, read_only, &success);
if (!success)
return DB_ERROR;
@@ -314,11 +314,15 @@ dberr_t file_os_io::read(os_offset_t offset, span<byte> buf) noexcept
return os_file_read(IORequestRead, m_fd, buf.data(), offset, buf.size());
}
-dberr_t file_os_io::write(const char *path, os_offset_t offset,
- span<const byte> buf) noexcept
+dberr_t file_os_io::write(const char *path, tpool::aiocb *cb, bool sync) noexcept
{
- return os_file_write(IORequestWrite, path, m_fd, buf.data(), offset,
- buf.size());
+ if (sync)
+ {
+ cb->m_err = os_file_write(IORequestWrite, path, m_fd, cb->m_buffer, cb->m_offset,
+ cb->m_len);
+ return cb->m_err == 0?DB_SUCCESS:DB_IO_ERROR;
+ }
+ return os_file_submit_aio(m_fd, path, cb);
}
dberr_t file_os_io::flush() noexcept
@@ -356,10 +360,11 @@ public:
memcpy(buf.data(), m_file.data() + offset, buf.size());
return DB_SUCCESS;
}
- dberr_t write(const char *, os_offset_t offset,
- span<const byte> buf) noexcept final
+ dberr_t write(const char *, tpool::aiocb *cb,
+ bool sync) noexcept final
{
- pmem_memcpy_persist(m_file.data() + offset, buf.data(), buf.size());
+ ut_ad(sync);
+ pmem_memcpy_persist(m_file.data() + cb->m_offset, cb->m_buffer, cb->m_len);
return DB_SUCCESS;
}
dberr_t flush() noexcept final
@@ -367,7 +372,10 @@ public:
ut_ad(0);
return DB_SUCCESS;
}
-
+ bool async_io_supported noexcept final
+ {
+ return false;
+ }
private:
mapped_file_t m_file;
};
@@ -428,10 +436,15 @@ bool log_file_t::writes_are_durable() const noexcept
return m_file->writes_are_durable();
}
-dberr_t log_file_t::write(os_offset_t offset, span<const byte> buf) noexcept
+bool log_file_t::async_io_supported() const noexcept
+{
+ return m_file->async_io_supported();
+}
+
+dberr_t log_file_t::write(tpool::aiocb *cb, bool sync) noexcept
{
ut_ad(is_opened());
- return m_file->write(m_path.c_str(), offset, buf);
+ return m_file->write(m_path.c_str(),cb, sync);
}
dberr_t log_file_t::flush() noexcept
@@ -490,17 +503,43 @@ bool log_t::file::writes_are_durable() const noexcept
return fd.writes_are_durable();
}
-void log_t::file::write(os_offset_t offset, span<byte> buf)
+bool log_t::file::async_io_supported() const noexcept
+{
+ return fd.async_io_supported();
+}
+
+void log_t::file::begin_write(tpool::aiocb *cb, bool sync)
{
srv_stats.os_log_pending_writes.inc();
- if (const dberr_t err= fd.write(offset, buf))
+ if (const dberr_t err= fd.write(cb, sync))
ib::fatal() << "write(" << fd.get_path() << ") returned " << err;
+}
+
+void log_t::file::complete_write(tpool::aiocb *cb)
+{
+ if (cb->m_err)
+ ib::fatal() << "write(" << fd.get_path() << ") returned " << cb->m_err;
+
srv_stats.os_log_pending_writes.dec();
- srv_stats.os_log_written.add(buf.size());
+ srv_stats.os_log_written.add(cb->m_len);
srv_stats.log_writes.inc();
log_sys.n_log_ios++;
}
+void log_t::file::write(os_offset_t offset, span<byte> buf)
+{
+ tpool::aiocb cb;
+ cb.m_offset = offset;
+ cb.m_buffer=buf.begin();
+ ut_a(buf.size() < INT_MAX);
+ cb.m_len= (int)buf.size();
+ cb.m_callback=nullptr; /* No callback function */
+
+ begin_write(&cb,true);
+ complete_write(&cb);
+}
+
+
void log_t::file::flush()
{
log_sys.pending_flushes.fetch_add(1, std::memory_order_acquire);
@@ -510,6 +549,7 @@ void log_t::file::flush()
log_sys.flushes.fetch_add(1, std::memory_order_release);
}
+
void log_t::file::close_file()
{
if (fd.is_opened())
@@ -547,10 +587,12 @@ log_write_buf(
lsn_t start_lsn, /*!< in: start lsn of the buffer; must
be divisible by
OS_FILE_LOG_BLOCK_SIZE */
- ulint new_data_offset)/*!< in: start offset of new data in
+ ulint new_data_offset,/*!< in: start offset of new data in
buf: this parameter is used to decide
if we have to write a new log file
header */
+ bool sync,
+ tpool::aiocb *cb)
{
ulint write_len;
lsn_t next_offset;
@@ -610,15 +652,20 @@ loop:
}
ut_a((next_offset >> srv_page_size_shift) <= ULINT_MAX);
-
- log_sys.log.write(static_cast<size_t>(next_offset), {buf, write_len});
-
- if (write_len < len) {
- start_lsn += write_len;
- len -= write_len;
- buf += write_len;
- goto loop;
- }
+ bool last_write= write_len >= len;
+ if (!last_write)
+ {
+ log_sys.log.write(next_offset, {buf, write_len});
+ start_lsn+= write_len;
+ len-= write_len;
+ buf+= write_len;
+ goto loop;
+ }
+ cb->m_offset= next_offset;
+ cb->m_buffer = buf;
+ ut_a(write_len < INT_MAX);
+ cb->m_len = (int)write_len;
+ log_sys.log.begin_write(cb,sync);
}
/** Flush the recently written changes to the log file.
@@ -660,6 +707,34 @@ log_buffer_switch()
log writes have been completed. */
void log_flush_notify(lsn_t flush_lsn);
+static void log_write_complete_io(tpool::aiocb *cb);
+
+/**
+ Information which is passed during async log IO
+ the completion thread. We keep it global variable
+ as it is protected by the group commit lock, and
+ there can be only single outstanding IO.
+*/
+
+struct Log_aio_cb:tpool::aiocb
+{
+ lsn_t write_lsn=0;
+ bool synchronous_io=false;
+ bool do_write;
+ bool do_flush;
+#ifndef DBUG_OFF
+ bool crash_on_completion= false;
+#endif
+ Log_aio_cb(bool write, bool flush):do_write(write), do_flush(flush)
+ {
+ m_opcode= tpool::aio_opcode::AIO_PWRITE;
+ m_callback = (tpool::callback_func)log_write_complete_io;
+ }
+};
+
+static Log_aio_cb flush_aio_cb(true, true);
+static Log_aio_cb write_aio_cb(true, false);
+
/**
Writes log buffer to disk
which is the "write" part of log_write_up_to().
@@ -670,17 +745,12 @@ Note : the caller must have log_sys.mutex locked, and this
mutex is released in the function.
*/
-static void log_write(bool rotate_key)
+static void log_write(bool rotate_key, bool sync, tpool::aiocb *cb)
{
mysql_mutex_assert_owner(&log_sys.mutex);
ut_ad(!recv_no_log_write);
lsn_t write_lsn;
- if (log_sys.buf_free == log_sys.buf_next_to_write) {
- /* Nothing to write */
- mysql_mutex_unlock(&log_sys.mutex);
- return;
- }
-
+ ut_a (log_sys.buf_free != log_sys.buf_next_to_write);
ulint start_offset;
ulint end_offset;
ulint area_start;
@@ -750,7 +820,7 @@ static void log_write(bool rotate_key)
area_end - area_start,
rotate_key ? LOG_ENCRYPT_ROTATE_KEY : LOG_ENCRYPT);
}
-
+ srv_stats.log_padded.add(pad_size);
/* Do the write to the log file */
log_write_buf(
write_buf + area_start, area_end - area_start + pad_size,
@@ -759,13 +829,7 @@ static void log_write(bool rotate_key)
#endif /* UNIV_DEBUG */
ut_uint64_align_down(log_sys.write_lsn,
OS_FILE_LOG_BLOCK_SIZE),
- start_offset - area_start);
- srv_stats.log_padded.add(pad_size);
- log_sys.write_lsn = write_lsn;
- if (log_sys.log.writes_are_durable()) {
- log_sys.set_flushed_lsn(write_lsn);
- log_flush_notify(write_lsn);
- }
+ start_offset - area_start, sync, cb);
return;
}
@@ -779,6 +843,89 @@ bool log_write_lock_own()
}
#endif
+/*
+ Used to take over the group commit role
+ in log_write_up_to(), but only if nobody else is currently doing it.
+ group commit lock supports not waiting and not signaling caller,
+ if the callback function is NULL.
+*/
+static const completion_callback log_nowait_callback{nullptr, nullptr};
+static const completion_callback log_force_nowait_callback{nullptr, nullptr};
+
+static void log_write_complete_io(tpool::aiocb *aiocb)
+{
+ Log_aio_cb *cb= (Log_aio_cb *) aiocb;
+ ut_ad(aiocb->m_opcode == tpool::aio_opcode::AIO_PWRITE);
+ ut_ad(cb->do_write || cb->do_flush);
+ lsn_t write_waiter_lsn= 0;
+ lsn_t flush_waiter_lsn= 0;
+ if (cb->do_write)
+ {
+ log_sys.log.complete_write(cb);
+ ut_ad(cb->write_lsn);
+ log_sys.write_lsn= cb->write_lsn;
+ write_waiter_lsn= write_lock.release(cb->write_lsn);
+ }
+
+ if (cb->do_flush)
+ {
+ /* Flush the highest written lsn.*/
+ auto flush_lsn= write_lock.value();
+ flush_lock.set_pending(flush_lsn);
+ log_write_flush_to_disk_low(flush_lsn);
+ flush_waiter_lsn= flush_lock.release(flush_lsn);
+#ifndef DBUG_OFF
+ if (cb->crash_on_completion)
+ DBUG_SUICIDE();
+#endif
+ log_flush_notify(flush_lsn);
+ }
+
+ /*
+ If there are async waiters pending, and no new lead,
+ we have to do something to release them.
+
+ Initiate an log_write_up_to, asynchronously, with dummy completion
+ so that current thread could become new commit lead for short moment
+ Not, that if current call was synchronous, this
+ introduces a (limited) recursion, but we'll leave this thread
+ anyway via one of below
+ - fast return from group_commit_lock::acquire() with
+ return code EXPIRED or CALLBACK_QUEUED,
+ - after async IO initiation in log_write,
+ - after thread_pool::submit_task for flush only
+ */
+
+ if (flush_waiter_lsn)
+ {
+ log_write_up_to(flush_waiter_lsn, true, false, &log_force_nowait_callback);
+ }
+ else if (write_waiter_lsn)
+ {
+ ut_ad(log_sys.log.async_io_supported());
+ log_write_up_to(write_waiter_lsn, false, false,
+ &log_force_nowait_callback);
+ }
+
+}
+
+/*
+ Whether synchronous log IO should be used for log writing and flushing.
+ TODO : figure out conditions where sync.write is more beneficial.
+*/
+static bool should_do_sync_log_write(const completion_callback *callback,
+ bool with_flush)
+{
+ if (!callback)
+ return true; /* Caller wants to wait */
+ if (!log_sys.log.async_io_supported())
+ return true; /* Can't do async IO*/
+ if (!callback->m_callback)
+ return false; /* fire-and-forget call.*/
+ if (!with_flush)
+ return true;
+ return false;
+}
/** Ensure that the log has been written to the log file up to a given
log entry (such as that of a transaction commit). Start a new write, or
@@ -787,14 +934,18 @@ wait and check if an already running write is covering the request.
included in the redo log file write
@param[in] flush_to_disk whether the written log should also
be flushed to the file system
-@param[in] rotate_key whether to rotate the encryption key */
+@param[in] rotate_key whether to rotate the encryption key
+@param[in] callback - signal completion by the callback.
+ Function will return, instead of waiting, if it would have to wait otherwise.
+ A special callback value {0,0} initiate write/flush if nobody else does,
+ does not wait for completion, does not use callbacks.
+*/
void log_write_up_to(lsn_t lsn, bool flush_to_disk, bool rotate_key,
const completion_callback *callback)
{
ut_ad(!srv_read_only_mode);
ut_ad(!rotate_key || flush_to_disk);
ut_ad(lsn != LSN_MAX);
-
if (recv_no_ibuf_operations)
{
/* Recovery is running and no operations on the log files are
@@ -803,43 +954,89 @@ void log_write_up_to(lsn_t lsn, bool flush_to_disk, bool rotate_key,
return;
}
- if (flush_to_disk &&
- flush_lock.acquire(lsn, callback) != group_commit_lock::ACQUIRED)
- return;
+ Log_aio_cb *aiocb;
+ if (flush_to_disk)
+ {
+ aiocb= &flush_aio_cb;
+ if (flush_lock.acquire(lsn, callback) != group_commit_lock::ACQUIRED)
+ return;
+ /*
+ Check if we should write. May be someone else has written already
+ up to lsn, then we only flush. Use synchroous wait, no callbacks, we need
+ to ensure that we wrote up to this lsn.
+ */
+ aiocb->do_write=
+ write_lock.acquire(lsn, nullptr) == group_commit_lock::ACQUIRED;
+ ut_ad(aiocb->do_flush);
+ }
+ else
+ {
+ /* Write only, don't flush */
+ if (write_lock.acquire(lsn, callback) != group_commit_lock::ACQUIRED)
+ return;
+ aiocb= &write_aio_cb;
+ ut_ad(aiocb->do_write);
+ ut_ad(!aiocb->do_flush);
+ }
+ ut_ad(aiocb->m_opcode == tpool::aio_opcode::AIO_PWRITE);
+ DBUG_EXECUTE_IF("crash_after_log_write_upto",
+ aiocb->crash_on_completion= true;);
- if (write_lock.acquire(lsn, flush_to_disk ? nullptr : callback) ==
- group_commit_lock::ACQUIRED)
+ if (aiocb->do_write)
{
+ bool do_sync_write= should_do_sync_log_write(callback, aiocb->do_flush);
mysql_mutex_lock(&log_sys.mutex);
lsn_t write_lsn= log_sys.get_lsn();
+ if (log_sys.buf_free == log_sys.buf_next_to_write)
+ {
+ /* Nothing to write */
+ mysql_mutex_unlock(&log_sys.mutex);
+ aiocb->do_write= false;
+ write_lock.release(write_lsn);
+ if (flush_to_disk)
+ goto complete_io;
+ return;
+ }
write_lock.set_pending(write_lsn);
-
- log_write(rotate_key);
-
- ut_a(log_sys.write_lsn == write_lsn);
- write_lock.release(write_lsn);
+ aiocb->write_lsn= write_lsn;
+ ut_ad(aiocb->m_callback == (tpool::callback_func) log_write_complete_io);
+ log_write(rotate_key, do_sync_write, aiocb);
+ /* log_sys.mutex will be released by log_write. */
+ if (!do_sync_write)
+ {
+ /* Eventually, log_write_complete_io is executed by AIO completion thread.*/
+ return;
+ }
}
- if (!flush_to_disk)
- return;
-
- /* Flush the highest written lsn.*/
- auto flush_lsn = write_lock.value();
- flush_lock.set_pending(flush_lsn);
- log_write_flush_to_disk_low(flush_lsn);
- flush_lock.release(flush_lsn);
-
- log_flush_notify(flush_lsn);
- DBUG_EXECUTE_IF("crash_after_log_write_upto", DBUG_SUICIDE(););
+complete_io:
+ if (callback == &log_force_nowait_callback)
+ {
+ /* Special callback indicates that we were called from
+ log_write_complete_io(). Execute in another thread,to avoid recursion */
+ static tpool::task async_complete_io(
+ (tpool::callback_func) log_write_complete_io, aiocb);
+ srv_thread_pool->submit_task(&async_complete_io);
+ }
+ else
+ {
+ log_write_complete_io(aiocb);
+ }
}
/** write to the log file up to the last log entry.
@param[in] sync whether we want the written log
-also to be flushed to disk. */
-void log_buffer_flush_to_disk(bool sync)
+also to be flushed to disk.
+@param[in] wait wait for completion
+*/
+void log_buffer_flush_to_disk(bool sync, bool wait)
{
ut_ad(!srv_read_only_mode);
- log_write_up_to(log_sys.get_lsn(std::memory_order_acquire), sync);
+ auto lsn= log_sys.get_lsn(std::memory_order_acquire);
+ if (wait)
+ log_write_up_to(lsn, sync);
+ else
+ log_write_up_to(lsn, sync, false, &log_nowait_callback);
}
/********************************************************************
diff --git a/storage/innobase/os/os0file.cc b/storage/innobase/os/os0file.cc
index 613bef9b724..8640e8f75b6 100644
--- a/storage/innobase/os/os0file.cc
+++ b/storage/innobase/os/os0file.cc
@@ -3911,6 +3911,28 @@ func_exit:
goto func_exit;
}
+
+dberr_t os_file_submit_aio(pfs_os_file_t handle, const char *name, tpool::aiocb * cb)
+{
+ bool is_read= cb->m_opcode==tpool::aio_opcode::AIO_PREAD;
+#ifdef UNIV_PFS_IO
+ auto n= cb->m_len;
+ PSI_file_locker_state state;
+ PSI_file_locker *locker= nullptr;
+ register_pfs_file_io_begin(&state, locker, handle, n,
+ is_read?PSI_FILE_READ:PSI_FILE_WRITE,__FILE__, __LINE__);
+#endif
+ cb->m_fh= handle;
+ auto err= srv_thread_pool->submit_io(cb);
+ if (err)
+ os_file_handle_error(name, is_read? "aio read": "aio write");
+#ifdef UNIV_PFS_IO
+ register_pfs_file_io_end(locker, n);
+#endif
+ return err?DB_IO_ERROR:DB_SUCCESS;
+}
+
+
/** Prints info of the aio arrays.
@param[in,out] file file where to print */
void
diff --git a/storage/innobase/srv/srv0srv.cc b/storage/innobase/srv/srv0srv.cc
index 7416bbd415d..530c46274a7 100644
--- a/storage/innobase/srv/srv0srv.cc
+++ b/storage/innobase/srv/srv0srv.cc
@@ -1493,7 +1493,7 @@ static void srv_sync_log_buffer_in_background()
srv_main_thread_op_info = "flushing log";
if (difftime(current_time, srv_last_log_flush_time)
>= srv_flush_log_at_timeout) {
- log_buffer_flush_to_disk();
+ log_buffer_flush_to_disk(true,false);
srv_last_log_flush_time = current_time;
srv_log_writes_and_flush++;
}