diff options
author | Vladislav Vaintroub <wlad@mariadb.com> | 2021-09-20 17:11:12 +0200 |
---|---|---|
committer | Vladislav Vaintroub <wlad@mariadb.com> | 2021-09-20 18:05:03 +0200 |
commit | 550b11f12acc1792d87b2e1761d752b5f7cd9c39 (patch) | |
tree | bd07b036d03d96c938c210dfb57f42c92c77ae21 | |
parent | aad6d2f1498185f535479b1160b23d2a397988de (diff) | |
download | mariadb-git-10.7-wlad-async-log-io.tar.gz |
MDEV-26603 asynchronous redo log write10.7-wlad-async-log-io
- Split log_write_up_to into 2 functions - the one initiating the write
and the write completion. In case of async write, the completion is called
by another thread.
- Allow asyncronous writes to log, if caller does not want to wait.
- Change srv_sync_log_buffer_in_background() so it does not wait for write
completion.
-rw-r--r-- | storage/innobase/include/log0log.h | 27 | ||||
-rw-r--r-- | storage/innobase/include/os0file.h | 7 | ||||
-rw-r--r-- | storage/innobase/log/log0log.cc | 327 | ||||
-rw-r--r-- | storage/innobase/os/os0file.cc | 22 | ||||
-rw-r--r-- | storage/innobase/srv/srv0srv.cc | 2 |
5 files changed, 313 insertions, 72 deletions
diff --git a/storage/innobase/include/log0log.h b/storage/innobase/include/log0log.h index a651a56e8a0..a9cd97a0838 100644 --- a/storage/innobase/include/log0log.h +++ b/storage/innobase/include/log0log.h @@ -118,7 +118,7 @@ void log_write_up_to(lsn_t lsn, bool flush_to_disk, bool rotate_key = false, also to be flushed to disk. */ void log_buffer_flush_to_disk( - bool sync = true); + bool sync = true, bool wait=true); /** Make a checkpoint */ ATTRIBUTE_COLD void log_make_checkpoint(); @@ -379,9 +379,9 @@ public: const char *new_path) noexcept= 0; virtual dberr_t close() noexcept= 0; virtual dberr_t read(os_offset_t offset, span<byte> buf) noexcept= 0; - virtual dberr_t write(const char *path, os_offset_t offset, - span<const byte> buf) noexcept= 0; + virtual dberr_t write(const char *path, tpool::aiocb *cb, bool sync) noexcept= 0; virtual dberr_t flush() noexcept= 0; + virtual bool async_io_supported() const noexcept= 0; /** Durable writes doesn't require calling flush() */ bool writes_are_durable() const noexcept { return m_durable_writes; } @@ -405,9 +405,9 @@ public: dberr_t rename(const char *old_path, const char *new_path) noexcept final; dberr_t close() noexcept final; dberr_t read(os_offset_t offset, span<byte> buf) noexcept final; - dberr_t write(const char *path, os_offset_t offset, - span<const byte> buf) noexcept final; + dberr_t write(const char *path, tpool::aiocb *cb, bool sync) noexcept final; dberr_t flush() noexcept final; + bool async_io_supported() const noexcept final {return true;} private: pfs_os_file_t m_fd{OS_FILE_CLOSED}; @@ -428,7 +428,8 @@ public: dberr_t close() noexcept; dberr_t read(os_offset_t offset, span<byte> buf) noexcept; bool writes_are_durable() const noexcept; - dberr_t write(os_offset_t offset, span<const byte> buf) noexcept; + bool async_io_supported() const noexcept; + dberr_t write(tpool::aiocb *cb, bool sync) noexcept; dberr_t flush() noexcept; void free() { @@ -526,6 +527,20 @@ public: void read(os_offset_t offset, span<byte> buf); /** Tells whether writes require calling flush() */ bool writes_are_durable() const noexcept; + /* Whether there is a AIO support. + False for memory mapped files , true for anything else.*/ + bool async_io_supported() const noexcept; + /** Perform async, or sync IO on file. + @param[in] cb - IO control block + @param[in] sync - true if synchronous IO should be used + @note cb can contain m_callback parameter, the function + that is called after IO is finished. This callback is also + executed in case of synchronous IO + */ + void begin_write(tpool::aiocb *cb, bool sync); + /**Finish write on file, which was started with begin_write + This function mainly updates statistic counters */ + void complete_write(tpool::aiocb* cb); /** writes buffer to log file @param[in] offset offset in log file @param[in] buf buffer from which to write */ diff --git a/storage/innobase/include/os0file.h b/storage/innobase/include/os0file.h index 7e190b340de..b1d91a6fdf3 100644 --- a/storage/innobase/include/os0file.h +++ b/storage/innobase/include/os0file.h @@ -1111,6 +1111,13 @@ void os_aio_free(); @retval DB_IO_ERROR on I/O error */ dberr_t os_aio(const IORequest &type, void *buf, os_offset_t offset, size_t n); +/** Submit async IO to threadpool. + +A light wrapper of srv_thread_pool->submit_io, which adds +PFS and error handling */ +dberr_t os_file_submit_aio(pfs_os_file_t handle, const char *name, + tpool::aiocb *cb); + /** Wait until there are no pending asynchronous writes. Only used on FLUSH TABLES...FOR EXPORT. */ void os_aio_wait_until_no_pending_writes(); diff --git a/storage/innobase/log/log0log.cc b/storage/innobase/log/log0log.cc index 2757571b52c..bbf2fa306d2 100644 --- a/storage/innobase/log/log0log.cc +++ b/storage/innobase/log/log0log.cc @@ -285,7 +285,7 @@ dberr_t file_os_io::open(const char *path, bool read_only) noexcept bool success; auto tmp_fd= os_file_create( innodb_log_file_key, path, OS_FILE_OPEN | OS_FILE_ON_ERROR_NO_EXIT, - OS_FILE_NORMAL, OS_LOG_FILE, read_only, &success); + OS_FILE_AIO, OS_LOG_FILE, read_only, &success); if (!success) return DB_ERROR; @@ -314,11 +314,15 @@ dberr_t file_os_io::read(os_offset_t offset, span<byte> buf) noexcept return os_file_read(IORequestRead, m_fd, buf.data(), offset, buf.size()); } -dberr_t file_os_io::write(const char *path, os_offset_t offset, - span<const byte> buf) noexcept +dberr_t file_os_io::write(const char *path, tpool::aiocb *cb, bool sync) noexcept { - return os_file_write(IORequestWrite, path, m_fd, buf.data(), offset, - buf.size()); + if (sync) + { + cb->m_err = os_file_write(IORequestWrite, path, m_fd, cb->m_buffer, cb->m_offset, + cb->m_len); + return cb->m_err == 0?DB_SUCCESS:DB_IO_ERROR; + } + return os_file_submit_aio(m_fd, path, cb); } dberr_t file_os_io::flush() noexcept @@ -356,10 +360,11 @@ public: memcpy(buf.data(), m_file.data() + offset, buf.size()); return DB_SUCCESS; } - dberr_t write(const char *, os_offset_t offset, - span<const byte> buf) noexcept final + dberr_t write(const char *, tpool::aiocb *cb, + bool sync) noexcept final { - pmem_memcpy_persist(m_file.data() + offset, buf.data(), buf.size()); + ut_ad(sync); + pmem_memcpy_persist(m_file.data() + cb->m_offset, cb->m_buffer, cb->m_len); return DB_SUCCESS; } dberr_t flush() noexcept final @@ -367,7 +372,10 @@ public: ut_ad(0); return DB_SUCCESS; } - + bool async_io_supported noexcept final + { + return false; + } private: mapped_file_t m_file; }; @@ -428,10 +436,15 @@ bool log_file_t::writes_are_durable() const noexcept return m_file->writes_are_durable(); } -dberr_t log_file_t::write(os_offset_t offset, span<const byte> buf) noexcept +bool log_file_t::async_io_supported() const noexcept +{ + return m_file->async_io_supported(); +} + +dberr_t log_file_t::write(tpool::aiocb *cb, bool sync) noexcept { ut_ad(is_opened()); - return m_file->write(m_path.c_str(), offset, buf); + return m_file->write(m_path.c_str(),cb, sync); } dberr_t log_file_t::flush() noexcept @@ -490,17 +503,43 @@ bool log_t::file::writes_are_durable() const noexcept return fd.writes_are_durable(); } -void log_t::file::write(os_offset_t offset, span<byte> buf) +bool log_t::file::async_io_supported() const noexcept +{ + return fd.async_io_supported(); +} + +void log_t::file::begin_write(tpool::aiocb *cb, bool sync) { srv_stats.os_log_pending_writes.inc(); - if (const dberr_t err= fd.write(offset, buf)) + if (const dberr_t err= fd.write(cb, sync)) ib::fatal() << "write(" << fd.get_path() << ") returned " << err; +} + +void log_t::file::complete_write(tpool::aiocb *cb) +{ + if (cb->m_err) + ib::fatal() << "write(" << fd.get_path() << ") returned " << cb->m_err; + srv_stats.os_log_pending_writes.dec(); - srv_stats.os_log_written.add(buf.size()); + srv_stats.os_log_written.add(cb->m_len); srv_stats.log_writes.inc(); log_sys.n_log_ios++; } +void log_t::file::write(os_offset_t offset, span<byte> buf) +{ + tpool::aiocb cb; + cb.m_offset = offset; + cb.m_buffer=buf.begin(); + ut_a(buf.size() < INT_MAX); + cb.m_len= (int)buf.size(); + cb.m_callback=nullptr; /* No callback function */ + + begin_write(&cb,true); + complete_write(&cb); +} + + void log_t::file::flush() { log_sys.pending_flushes.fetch_add(1, std::memory_order_acquire); @@ -510,6 +549,7 @@ void log_t::file::flush() log_sys.flushes.fetch_add(1, std::memory_order_release); } + void log_t::file::close_file() { if (fd.is_opened()) @@ -547,10 +587,12 @@ log_write_buf( lsn_t start_lsn, /*!< in: start lsn of the buffer; must be divisible by OS_FILE_LOG_BLOCK_SIZE */ - ulint new_data_offset)/*!< in: start offset of new data in + ulint new_data_offset,/*!< in: start offset of new data in buf: this parameter is used to decide if we have to write a new log file header */ + bool sync, + tpool::aiocb *cb) { ulint write_len; lsn_t next_offset; @@ -610,15 +652,20 @@ loop: } ut_a((next_offset >> srv_page_size_shift) <= ULINT_MAX); - - log_sys.log.write(static_cast<size_t>(next_offset), {buf, write_len}); - - if (write_len < len) { - start_lsn += write_len; - len -= write_len; - buf += write_len; - goto loop; - } + bool last_write= write_len >= len; + if (!last_write) + { + log_sys.log.write(next_offset, {buf, write_len}); + start_lsn+= write_len; + len-= write_len; + buf+= write_len; + goto loop; + } + cb->m_offset= next_offset; + cb->m_buffer = buf; + ut_a(write_len < INT_MAX); + cb->m_len = (int)write_len; + log_sys.log.begin_write(cb,sync); } /** Flush the recently written changes to the log file. @@ -660,6 +707,34 @@ log_buffer_switch() log writes have been completed. */ void log_flush_notify(lsn_t flush_lsn); +static void log_write_complete_io(tpool::aiocb *cb); + +/** + Information which is passed during async log IO + the completion thread. We keep it global variable + as it is protected by the group commit lock, and + there can be only single outstanding IO. +*/ + +struct Log_aio_cb:tpool::aiocb +{ + lsn_t write_lsn=0; + bool synchronous_io=false; + bool do_write; + bool do_flush; +#ifndef DBUG_OFF + bool crash_on_completion= false; +#endif + Log_aio_cb(bool write, bool flush):do_write(write), do_flush(flush) + { + m_opcode= tpool::aio_opcode::AIO_PWRITE; + m_callback = (tpool::callback_func)log_write_complete_io; + } +}; + +static Log_aio_cb flush_aio_cb(true, true); +static Log_aio_cb write_aio_cb(true, false); + /** Writes log buffer to disk which is the "write" part of log_write_up_to(). @@ -670,17 +745,12 @@ Note : the caller must have log_sys.mutex locked, and this mutex is released in the function. */ -static void log_write(bool rotate_key) +static void log_write(bool rotate_key, bool sync, tpool::aiocb *cb) { mysql_mutex_assert_owner(&log_sys.mutex); ut_ad(!recv_no_log_write); lsn_t write_lsn; - if (log_sys.buf_free == log_sys.buf_next_to_write) { - /* Nothing to write */ - mysql_mutex_unlock(&log_sys.mutex); - return; - } - + ut_a (log_sys.buf_free != log_sys.buf_next_to_write); ulint start_offset; ulint end_offset; ulint area_start; @@ -750,7 +820,7 @@ static void log_write(bool rotate_key) area_end - area_start, rotate_key ? LOG_ENCRYPT_ROTATE_KEY : LOG_ENCRYPT); } - + srv_stats.log_padded.add(pad_size); /* Do the write to the log file */ log_write_buf( write_buf + area_start, area_end - area_start + pad_size, @@ -759,13 +829,7 @@ static void log_write(bool rotate_key) #endif /* UNIV_DEBUG */ ut_uint64_align_down(log_sys.write_lsn, OS_FILE_LOG_BLOCK_SIZE), - start_offset - area_start); - srv_stats.log_padded.add(pad_size); - log_sys.write_lsn = write_lsn; - if (log_sys.log.writes_are_durable()) { - log_sys.set_flushed_lsn(write_lsn); - log_flush_notify(write_lsn); - } + start_offset - area_start, sync, cb); return; } @@ -779,6 +843,89 @@ bool log_write_lock_own() } #endif +/* + Used to take over the group commit role + in log_write_up_to(), but only if nobody else is currently doing it. + group commit lock supports not waiting and not signaling caller, + if the callback function is NULL. +*/ +static const completion_callback log_nowait_callback{nullptr, nullptr}; +static const completion_callback log_force_nowait_callback{nullptr, nullptr}; + +static void log_write_complete_io(tpool::aiocb *aiocb) +{ + Log_aio_cb *cb= (Log_aio_cb *) aiocb; + ut_ad(aiocb->m_opcode == tpool::aio_opcode::AIO_PWRITE); + ut_ad(cb->do_write || cb->do_flush); + lsn_t write_waiter_lsn= 0; + lsn_t flush_waiter_lsn= 0; + if (cb->do_write) + { + log_sys.log.complete_write(cb); + ut_ad(cb->write_lsn); + log_sys.write_lsn= cb->write_lsn; + write_waiter_lsn= write_lock.release(cb->write_lsn); + } + + if (cb->do_flush) + { + /* Flush the highest written lsn.*/ + auto flush_lsn= write_lock.value(); + flush_lock.set_pending(flush_lsn); + log_write_flush_to_disk_low(flush_lsn); + flush_waiter_lsn= flush_lock.release(flush_lsn); +#ifndef DBUG_OFF + if (cb->crash_on_completion) + DBUG_SUICIDE(); +#endif + log_flush_notify(flush_lsn); + } + + /* + If there are async waiters pending, and no new lead, + we have to do something to release them. + + Initiate an log_write_up_to, asynchronously, with dummy completion + so that current thread could become new commit lead for short moment + Not, that if current call was synchronous, this + introduces a (limited) recursion, but we'll leave this thread + anyway via one of below + - fast return from group_commit_lock::acquire() with + return code EXPIRED or CALLBACK_QUEUED, + - after async IO initiation in log_write, + - after thread_pool::submit_task for flush only + */ + + if (flush_waiter_lsn) + { + log_write_up_to(flush_waiter_lsn, true, false, &log_force_nowait_callback); + } + else if (write_waiter_lsn) + { + ut_ad(log_sys.log.async_io_supported()); + log_write_up_to(write_waiter_lsn, false, false, + &log_force_nowait_callback); + } + +} + +/* + Whether synchronous log IO should be used for log writing and flushing. + TODO : figure out conditions where sync.write is more beneficial. +*/ +static bool should_do_sync_log_write(const completion_callback *callback, + bool with_flush) +{ + if (!callback) + return true; /* Caller wants to wait */ + if (!log_sys.log.async_io_supported()) + return true; /* Can't do async IO*/ + if (!callback->m_callback) + return false; /* fire-and-forget call.*/ + if (!with_flush) + return true; + return false; +} /** Ensure that the log has been written to the log file up to a given log entry (such as that of a transaction commit). Start a new write, or @@ -787,14 +934,18 @@ wait and check if an already running write is covering the request. included in the redo log file write @param[in] flush_to_disk whether the written log should also be flushed to the file system -@param[in] rotate_key whether to rotate the encryption key */ +@param[in] rotate_key whether to rotate the encryption key +@param[in] callback - signal completion by the callback. + Function will return, instead of waiting, if it would have to wait otherwise. + A special callback value {0,0} initiate write/flush if nobody else does, + does not wait for completion, does not use callbacks. +*/ void log_write_up_to(lsn_t lsn, bool flush_to_disk, bool rotate_key, const completion_callback *callback) { ut_ad(!srv_read_only_mode); ut_ad(!rotate_key || flush_to_disk); ut_ad(lsn != LSN_MAX); - if (recv_no_ibuf_operations) { /* Recovery is running and no operations on the log files are @@ -803,43 +954,89 @@ void log_write_up_to(lsn_t lsn, bool flush_to_disk, bool rotate_key, return; } - if (flush_to_disk && - flush_lock.acquire(lsn, callback) != group_commit_lock::ACQUIRED) - return; + Log_aio_cb *aiocb; + if (flush_to_disk) + { + aiocb= &flush_aio_cb; + if (flush_lock.acquire(lsn, callback) != group_commit_lock::ACQUIRED) + return; + /* + Check if we should write. May be someone else has written already + up to lsn, then we only flush. Use synchroous wait, no callbacks, we need + to ensure that we wrote up to this lsn. + */ + aiocb->do_write= + write_lock.acquire(lsn, nullptr) == group_commit_lock::ACQUIRED; + ut_ad(aiocb->do_flush); + } + else + { + /* Write only, don't flush */ + if (write_lock.acquire(lsn, callback) != group_commit_lock::ACQUIRED) + return; + aiocb= &write_aio_cb; + ut_ad(aiocb->do_write); + ut_ad(!aiocb->do_flush); + } + ut_ad(aiocb->m_opcode == tpool::aio_opcode::AIO_PWRITE); + DBUG_EXECUTE_IF("crash_after_log_write_upto", + aiocb->crash_on_completion= true;); - if (write_lock.acquire(lsn, flush_to_disk ? nullptr : callback) == - group_commit_lock::ACQUIRED) + if (aiocb->do_write) { + bool do_sync_write= should_do_sync_log_write(callback, aiocb->do_flush); mysql_mutex_lock(&log_sys.mutex); lsn_t write_lsn= log_sys.get_lsn(); + if (log_sys.buf_free == log_sys.buf_next_to_write) + { + /* Nothing to write */ + mysql_mutex_unlock(&log_sys.mutex); + aiocb->do_write= false; + write_lock.release(write_lsn); + if (flush_to_disk) + goto complete_io; + return; + } write_lock.set_pending(write_lsn); - - log_write(rotate_key); - - ut_a(log_sys.write_lsn == write_lsn); - write_lock.release(write_lsn); + aiocb->write_lsn= write_lsn; + ut_ad(aiocb->m_callback == (tpool::callback_func) log_write_complete_io); + log_write(rotate_key, do_sync_write, aiocb); + /* log_sys.mutex will be released by log_write. */ + if (!do_sync_write) + { + /* Eventually, log_write_complete_io is executed by AIO completion thread.*/ + return; + } } - if (!flush_to_disk) - return; - - /* Flush the highest written lsn.*/ - auto flush_lsn = write_lock.value(); - flush_lock.set_pending(flush_lsn); - log_write_flush_to_disk_low(flush_lsn); - flush_lock.release(flush_lsn); - - log_flush_notify(flush_lsn); - DBUG_EXECUTE_IF("crash_after_log_write_upto", DBUG_SUICIDE();); +complete_io: + if (callback == &log_force_nowait_callback) + { + /* Special callback indicates that we were called from + log_write_complete_io(). Execute in another thread,to avoid recursion */ + static tpool::task async_complete_io( + (tpool::callback_func) log_write_complete_io, aiocb); + srv_thread_pool->submit_task(&async_complete_io); + } + else + { + log_write_complete_io(aiocb); + } } /** write to the log file up to the last log entry. @param[in] sync whether we want the written log -also to be flushed to disk. */ -void log_buffer_flush_to_disk(bool sync) +also to be flushed to disk. +@param[in] wait wait for completion +*/ +void log_buffer_flush_to_disk(bool sync, bool wait) { ut_ad(!srv_read_only_mode); - log_write_up_to(log_sys.get_lsn(std::memory_order_acquire), sync); + auto lsn= log_sys.get_lsn(std::memory_order_acquire); + if (wait) + log_write_up_to(lsn, sync); + else + log_write_up_to(lsn, sync, false, &log_nowait_callback); } /******************************************************************** diff --git a/storage/innobase/os/os0file.cc b/storage/innobase/os/os0file.cc index 613bef9b724..8640e8f75b6 100644 --- a/storage/innobase/os/os0file.cc +++ b/storage/innobase/os/os0file.cc @@ -3911,6 +3911,28 @@ func_exit: goto func_exit; } + +dberr_t os_file_submit_aio(pfs_os_file_t handle, const char *name, tpool::aiocb * cb) +{ + bool is_read= cb->m_opcode==tpool::aio_opcode::AIO_PREAD; +#ifdef UNIV_PFS_IO + auto n= cb->m_len; + PSI_file_locker_state state; + PSI_file_locker *locker= nullptr; + register_pfs_file_io_begin(&state, locker, handle, n, + is_read?PSI_FILE_READ:PSI_FILE_WRITE,__FILE__, __LINE__); +#endif + cb->m_fh= handle; + auto err= srv_thread_pool->submit_io(cb); + if (err) + os_file_handle_error(name, is_read? "aio read": "aio write"); +#ifdef UNIV_PFS_IO + register_pfs_file_io_end(locker, n); +#endif + return err?DB_IO_ERROR:DB_SUCCESS; +} + + /** Prints info of the aio arrays. @param[in,out] file file where to print */ void diff --git a/storage/innobase/srv/srv0srv.cc b/storage/innobase/srv/srv0srv.cc index 7416bbd415d..530c46274a7 100644 --- a/storage/innobase/srv/srv0srv.cc +++ b/storage/innobase/srv/srv0srv.cc @@ -1493,7 +1493,7 @@ static void srv_sync_log_buffer_in_background() srv_main_thread_op_info = "flushing log"; if (difftime(current_time, srv_last_log_flush_time) >= srv_flush_log_at_timeout) { - log_buffer_flush_to_disk(); + log_buffer_flush_to_disk(true,false); srv_last_log_flush_time = current_time; srv_log_writes_and_flush++; } |