diff options
Diffstat (limited to 'storage/innobase/mtr/mtr0mtr.cc')
-rw-r--r-- | storage/innobase/mtr/mtr0mtr.cc | 956 |
1 files changed, 606 insertions, 350 deletions
diff --git a/storage/innobase/mtr/mtr0mtr.cc b/storage/innobase/mtr/mtr0mtr.cc index b6d520d2e76..98f40bd01e4 100644 --- a/storage/innobase/mtr/mtr0mtr.cc +++ b/storage/innobase/mtr/mtr0mtr.cc @@ -24,18 +24,15 @@ Mini-transaction buffer Created 11/26/1995 Heikki Tuuri *******************************************************/ -#include "mtr0mtr.h" - +#include "mtr0log.h" #include "buf0buf.h" #include "buf0flu.h" -#include "fsp0sysspace.h" #include "page0types.h" -#include "mtr0log.h" -#include "log0recv.h" -#include "my_cpu.h" +#include "log0crypt.h" #ifdef BTR_CUR_HASH_ADAPT # include "btr0sea.h" #endif +#include "log.h" /** Iterate over a memo block in reverse. */ template <typename Functor> @@ -228,18 +225,19 @@ static void memo_slot_release(mtr_memo_slot_t *slot) default: buf_page_t *bpage= static_cast<buf_page_t*>(object); bpage->unfix(); - switch (auto latch= slot->type & ~MTR_MEMO_MODIFY) { + switch (type) { case MTR_MEMO_PAGE_S_FIX: bpage->lock.s_unlock(); - return; - case MTR_MEMO_PAGE_SX_FIX: - case MTR_MEMO_PAGE_X_FIX: - bpage->lock.u_or_x_unlock(latch == MTR_MEMO_PAGE_SX_FIX); - /* fall through */ + break; case MTR_MEMO_BUF_FIX: - return; + break; + default: + ut_ad(type == MTR_MEMO_PAGE_SX_FIX || + type == MTR_MEMO_PAGE_X_FIX || + type == MTR_MEMO_PAGE_SX_MODIFY || + type == MTR_MEMO_PAGE_X_MODIFY); + bpage->lock.u_or_x_unlock(type & MTR_MEMO_PAGE_SX_FIX); } - ut_ad("invalid type" == 0); } } @@ -355,28 +353,266 @@ struct DebugCheck { }; #endif -/** Release page latches held by the mini-transaction. */ -struct ReleaseBlocks +/** Prepare to insert a modified blcok into flush_list. +@param lsn start LSN of the mini-transaction +@return insert position for insert_into_flush_list() */ +inline buf_page_t *buf_pool_t::prepare_insert_into_flush_list(lsn_t lsn) + noexcept { +#ifndef SUX_LOCK_GENERIC + ut_ad(recv_recovery_is_on() || log_sys.latch.is_locked()); +#endif + ut_ad(lsn >= log_sys.last_checkpoint_lsn); + mysql_mutex_assert_owner(&flush_list_mutex); + static_assert(log_t::FIRST_LSN >= 2, "compatibility"); + +rescan: + buf_page_t *prev= UT_LIST_GET_FIRST(flush_list); + if (prev) + { + lsn_t om= prev->oldest_modification(); + if (om == 1) + { + delete_from_flush_list(prev); + goto rescan; + } + ut_ad(om > 2); + if (om <= lsn) + return nullptr; + while (buf_page_t *next= UT_LIST_GET_NEXT(list, prev)) + { + om= next->oldest_modification(); + if (om == 1) + { + delete_from_flush_list(next); + continue; + } + ut_ad(om > 2); + if (om <= lsn) + break; + prev= next; + } + flush_hp.adjust(prev); + } + return prev; +} + +/** Insert a modified block into the flush list. +@param prev insert position (from prepare_insert_into_flush_list()) +@param block modified block +@param lsn start LSN of the mini-transaction that modified the block */ +inline void buf_pool_t::insert_into_flush_list(buf_page_t *prev, + buf_block_t *block, lsn_t lsn) + noexcept +{ + ut_ad(!fsp_is_system_temporary(block->page.id().space())); + mysql_mutex_assert_owner(&flush_list_mutex); + + MEM_CHECK_DEFINED(block->page.zip.data + ? block->page.zip.data : block->page.frame, + block->physical_size()); + + if (const lsn_t old= block->page.oldest_modification()) + { + if (old > 1) + return; + flush_hp.adjust(&block->page); + UT_LIST_REMOVE(flush_list, &block->page); + } + else + stat.flush_list_bytes+= block->physical_size(); + + ut_ad(stat.flush_list_bytes <= curr_pool_size); + + if (prev) + UT_LIST_INSERT_AFTER(flush_list, prev, &block->page); + else + UT_LIST_ADD_FIRST(flush_list, &block->page); + + block->page.set_oldest_modification(lsn); +} + +/** Update modified pages of the mini-transaction. */ +struct ReleaseModified +{ + buf_page_t *const prev; const lsn_t start, end; - ReleaseBlocks(lsn_t start, lsn_t end) : start(start), end(end) {} + mutable size_t modified= 0; + + ReleaseModified(buf_page_t *prev, lsn_t start, lsn_t end) : + prev(prev), start(start), end(end) + { + ut_ad(start > 2); + ut_ad(end >= start); + } /** @return true always */ bool operator()(mtr_memo_slot_t *slot) const { - if (!slot->object) + if (!slot->object || !(slot->type & MTR_MEMO_MODIFY)) return true; - switch (slot->type) { - case MTR_MEMO_PAGE_X_MODIFY: - case MTR_MEMO_PAGE_SX_MODIFY: + ut_ad(slot->type == MTR_MEMO_PAGE_X_MODIFY || + slot->type == MTR_MEMO_PAGE_SX_MODIFY); + + modified++; + buf_block_t *b= static_cast<buf_block_t*>(slot->object); + ut_ad(b->page.id() < end_page_id); + ut_d(const auto s= b->page.state()); + ut_ad(s > buf_page_t::FREED); + ut_ad(s < buf_page_t::READ_FIX); + ut_ad(mach_read_from_8(b->page.frame + FIL_PAGE_LSN) <= end); + mach_write_to_8(b->page.frame + FIL_PAGE_LSN, end); + if (UNIV_LIKELY_NULL(b->page.zip.data)) + memcpy_aligned<8>(FIL_PAGE_LSN + b->page.zip.data, + FIL_PAGE_LSN + b->page.frame, 8); + buf_pool.insert_into_flush_list(prev, b, start); + return true; + } +}; + +/** Release latches to already dirtied pages. +This is a bit more than ReleaseAll, +kind of a combination of ReleaseLatches and a subset of ReleaseModified. */ +struct ReleaseSimple +{ + const lsn_t end; + mutable size_t modified; + ReleaseSimple(lsn_t end) : end(end), modified(0) { ut_ad(end); } + + /** @return true always */ + bool operator()(mtr_memo_slot_t *slot) const + { + void *object= slot->object; + if (!object) + return true; + slot->object= nullptr; + switch (const auto type= slot->type) { + case MTR_MEMO_S_LOCK: + static_cast<index_lock*>(object)->s_unlock(); + break; + case MTR_MEMO_SPACE_X_LOCK: + static_cast<fil_space_t*>(object)->set_committed_size(); + static_cast<fil_space_t*>(object)->x_unlock(); + break; + case MTR_MEMO_SPACE_S_LOCK: + static_cast<fil_space_t*>(object)->s_unlock(); + break; + case MTR_MEMO_X_LOCK: + case MTR_MEMO_SX_LOCK: + static_cast<index_lock*>(object)-> + u_or_x_unlock(type == MTR_MEMO_SX_LOCK); break; default: - ut_ad(!(slot->type & MTR_MEMO_MODIFY)); - return true; + buf_page_t *bpage= static_cast<buf_page_t*>(object); + if (type & MTR_MEMO_MODIFY) + { + ut_ad(slot->type == MTR_MEMO_PAGE_X_MODIFY || + slot->type == MTR_MEMO_PAGE_SX_MODIFY); + ut_ad(bpage->oldest_modification() > 1); + ut_ad(bpage->oldest_modification() < end); + ut_ad(bpage->id() < end_page_id); + ut_d(const auto s= bpage->state()); + ut_ad(s > buf_page_t::FREED); + ut_ad(s < buf_page_t::READ_FIX); + ut_ad(mach_read_from_8(bpage->frame + FIL_PAGE_LSN) <= end); + mach_write_to_8(bpage->frame + FIL_PAGE_LSN, end); + if (UNIV_LIKELY_NULL(bpage->zip.data)) + memcpy_aligned<8>(FIL_PAGE_LSN + bpage->zip.data, + FIL_PAGE_LSN + bpage->frame, 8); + modified++; + } + bpage->unfix(); + switch (auto latch= type & ~MTR_MEMO_MODIFY) { + case MTR_MEMO_PAGE_S_FIX: + bpage->lock.s_unlock(); + return true; + case MTR_MEMO_PAGE_SX_FIX: + case MTR_MEMO_PAGE_X_FIX: + bpage->lock.u_or_x_unlock(latch == MTR_MEMO_PAGE_SX_FIX); + /* fall through */ + case MTR_MEMO_BUF_FIX: + return true; + } + ut_ad("invalid type" == 0); } + return true; + } +}; - buf_block_t *block= static_cast<buf_block_t*>(slot->object); - buf_flush_note_modification(block, start, end); +ATTRIBUTE_COLD __attribute__((noinline)) +/** Insert a modified block into buf_pool.flush_list on IMPORT TABLESPACE. */ +static void insert_imported(buf_block_t *block) +{ + ut_d(const auto s= block->page.state()); + ut_ad(s > buf_page_t::FREED); + ut_ad(s < buf_page_t::READ_FIX); + if (block->page.oldest_modification() <= 1) + { + log_sys.latch.rd_lock(SRW_LOCK_CALL); + const lsn_t lsn= log_sys.last_checkpoint_lsn; + mysql_mutex_lock(&buf_pool.flush_list_mutex); + buf_pool.insert_into_flush_list + (buf_pool.prepare_insert_into_flush_list(lsn), block, lsn); + log_sys.latch.rd_unlock(); + mysql_mutex_unlock(&buf_pool.flush_list_mutex); + } +} + +/** Release latches to already pages when no log was written. +This is like ReleaseSimple, but it cover pages of the temporary tablespace +as well as pages modified during IMPORT TABLESPACE. */ +struct ReleaseUnlogged +{ + /** @return true always */ + bool operator()(mtr_memo_slot_t *slot) const + { + void *object= slot->object; + if (!object) + return true; + slot->object= nullptr; + switch (const auto type= slot->type) { + case MTR_MEMO_S_LOCK: + static_cast<index_lock*>(object)->s_unlock(); + break; + case MTR_MEMO_SPACE_X_LOCK: + static_cast<fil_space_t*>(object)->set_committed_size(); + static_cast<fil_space_t*>(object)->x_unlock(); + break; + case MTR_MEMO_SPACE_S_LOCK: + static_cast<fil_space_t*>(object)->s_unlock(); + break; + case MTR_MEMO_X_LOCK: + case MTR_MEMO_SX_LOCK: + static_cast<index_lock*>(object)-> + u_or_x_unlock(type == MTR_MEMO_SX_LOCK); + break; + default: + buf_block_t *block= static_cast<buf_block_t*>(object); + block->page.unfix(); + + if (type & MTR_MEMO_MODIFY) + { + ut_ad(type == MTR_MEMO_PAGE_X_MODIFY || + type == MTR_MEMO_PAGE_SX_MODIFY); + if (UNIV_LIKELY(block->page.id() >= end_page_id)) + block->page.set_temp_modified(); + else + insert_imported(block); + } + + switch (type) { + case MTR_MEMO_PAGE_S_FIX: + block->page.lock.s_unlock(); + break; + case MTR_MEMO_BUF_FIX: + break; + default: + ut_ad(type == MTR_MEMO_PAGE_SX_FIX || type == MTR_MEMO_PAGE_X_FIX || + type == MTR_MEMO_PAGE_SX_MODIFY || + type == MTR_MEMO_PAGE_X_MODIFY); + block->page.lock.u_or_x_unlock(type & MTR_MEMO_PAGE_SX_FIX); + } + } return true; } }; @@ -401,6 +637,7 @@ void mtr_t::start() new(&m_log) mtr_buf_t(); m_made_dirty= false; + m_latch_ex= false; m_inside_ibuf= false; m_modifications= false; m_log_mode= MTR_LOG_ALL; @@ -420,6 +657,44 @@ inline void mtr_t::release_resources() ut_d(m_commit= true); } +/** Handle any pages that were freed during the mini-transaction. */ +void mtr_t::process_freed_pages() +{ + if (m_freed_pages) + { + ut_ad(!m_freed_pages->empty()); + ut_ad(m_freed_space); + ut_ad(m_freed_space->is_owner()); + ut_ad(is_named_space(m_freed_space)); + + /* Update the last freed lsn */ + m_freed_space->freed_range_mutex.lock(); + m_freed_space->update_last_freed_lsn(m_commit_lsn); + if (!m_trim_pages) + for (const auto &range : *m_freed_pages) + m_freed_space->add_free_range(range); + else + m_freed_space->clear_freed_ranges(); + m_freed_space->freed_range_mutex.unlock(); + + delete m_freed_pages; + m_freed_pages= nullptr; + m_freed_space= nullptr; + /* mtr_t::start() will reset m_trim_pages */ + } + else + ut_ad(!m_freed_space); +} + +/** Release modified pages when no log was written. */ +void mtr_t::release_unlogged() +{ + ut_ad(m_log_mode == MTR_LOG_NO_REDO); + ut_ad(m_log.size() == 0); + process_freed_pages(); + m_memo.for_each_block_in_reverse(CIterate<ReleaseUnlogged>()); +} + /** Commit a mini-transaction. */ void mtr_t::commit() { @@ -429,73 +704,66 @@ void mtr_t::commit() /* This is a dirty read, for debugging. */ ut_ad(!m_modifications || !recv_no_log_write); ut_ad(!m_modifications || m_log_mode != MTR_LOG_NONE); + ut_ad(!m_latch_ex); if (m_modifications && (m_log_mode == MTR_LOG_NO_REDO || !m_log.empty())) { - ut_ad(!srv_read_only_mode || m_log_mode == MTR_LOG_NO_REDO); + if (UNIV_UNLIKELY(!is_logged())) + { + release_unlogged(); + goto func_exit; + } - std::pair<lsn_t,page_flush_ahead> lsns; + ut_ad(!srv_read_only_mode); + std::pair<lsn_t,page_flush_ahead> lsns{do_write()}; + process_freed_pages(); - if (UNIV_LIKELY(is_logged())) + if (m_made_dirty) { - lsns= do_write(); + mysql_mutex_lock(&buf_pool.flush_list_mutex); + { + CIterate<ReleaseModified> rm + {ReleaseModified{buf_pool.prepare_insert_into_flush_list(lsns.first), + lsns.first, m_commit_lsn}}; + m_memo.for_each_block_in_reverse(rm); + ut_ad(rm.functor.modified); + buf_pool.flush_list_requests+= rm.functor.modified; + } - if (m_made_dirty) - mysql_mutex_lock(&log_sys.flush_order_mutex); + buf_pool.page_cleaner_wakeup(); + mysql_mutex_unlock(&buf_pool.flush_list_mutex); - /* It is now safe to release log_sys.mutex because the - buf_pool.flush_order_mutex will ensure that we are the first one - to insert into buf_pool.flush_list. */ - mysql_mutex_unlock(&log_sys.mutex); + if (m_latch_ex) + { + log_sys.latch.wr_unlock(); + m_latch_ex= false; + } + else + log_sys.latch.rd_unlock(); + + m_memo.for_each_block_in_reverse(CIterate<ReleaseLatches>()); } else { - ut_ad(m_log_mode == MTR_LOG_NO_REDO); - ut_ad(m_log.size() == 0); - m_commit_lsn= log_sys.get_lsn(); - lsns= { m_commit_lsn, PAGE_FLUSH_NO }; - if (UNIV_UNLIKELY(m_made_dirty)) /* This should be IMPORT TABLESPACE */ - mysql_mutex_lock(&log_sys.flush_order_mutex); - } - - if (m_freed_pages) - { - ut_ad(!m_freed_pages->empty()); - ut_ad(m_freed_space); - ut_ad(m_freed_space->is_owner()); - ut_ad(is_named_space(m_freed_space)); - /* Update the last freed lsn */ - m_freed_space->update_last_freed_lsn(m_commit_lsn); - - if (!is_trim_pages()) - for (const auto &range : *m_freed_pages) - m_freed_space->add_free_range(range); + if (m_latch_ex) + { + log_sys.latch.wr_unlock(); + m_latch_ex= false; + } else - m_freed_space->clear_freed_ranges(); - delete m_freed_pages; - m_freed_pages= nullptr; - m_freed_space= nullptr; - /* mtr_t::start() will reset m_trim_pages */ + log_sys.latch.rd_unlock(); + Iterate<ReleaseSimple> rs{ReleaseSimple{m_commit_lsn}}; + m_memo.for_each_block_in_reverse(rs); + buf_pool.add_flush_list_requests(rs.functor.modified); } - else - ut_ad(!m_freed_space); - - m_memo.for_each_block_in_reverse - (CIterate<const ReleaseBlocks>(ReleaseBlocks(lsns.first, m_commit_lsn))); - if (m_made_dirty) - mysql_mutex_unlock(&log_sys.flush_order_mutex); - - m_memo.for_each_block_in_reverse(CIterate<ReleaseLatches>()); if (UNIV_UNLIKELY(lsns.second != PAGE_FLUSH_NO)) buf_flush_ahead(m_commit_lsn, lsns.second == PAGE_FLUSH_SYNC); - - if (m_made_dirty) - srv_stats.log_write_requests.inc(); } else m_memo.for_each_block_in_reverse(CIterate<ReleaseAll>()); +func_exit: release_resources(); } @@ -575,43 +843,38 @@ void mtr_t::commit_shrink(fil_space_t &space) ut_ad(UT_LIST_GET_LEN(space.chain) == 1); log_write_and_flush_prepare(); + m_latch_ex= true; + log_sys.latch.wr_lock(SRW_LOCK_CALL); const lsn_t start_lsn= do_write().first; ut_d(m_log.erase()); - mysql_mutex_lock(&log_sys.flush_order_mutex); /* Durably write the reduced FSP_SIZE before truncating the data file. */ log_write_and_flush(); +#ifndef SUX_LOCK_GENERIC + ut_ad(log_sys.latch.is_write_locked()); +#endif os_file_truncate(space.chain.start->name, space.chain.start->handle, os_offset_t{space.size} << srv_page_size_shift, true); - if (m_freed_pages) - { - ut_ad(!m_freed_pages->empty()); - ut_ad(m_freed_space == &space); - ut_ad(memo_contains(*m_freed_space)); - ut_ad(is_named_space(m_freed_space)); - m_freed_space->update_last_freed_lsn(m_commit_lsn); - - if (!is_trim_pages()) - for (const auto &range : *m_freed_pages) - m_freed_space->add_free_range(range); - else - m_freed_space->clear_freed_ranges(); - delete m_freed_pages; - m_freed_pages= nullptr; - m_freed_space= nullptr; - /* mtr_t::start() will reset m_trim_pages */ - } - else - ut_ad(!m_freed_space); + ut_ad(!m_freed_pages || m_freed_space == &space); + process_freed_pages(); m_memo.for_each_block_in_reverse(CIterate<Shrink>{space}); - - m_memo.for_each_block_in_reverse(CIterate<const ReleaseBlocks> - (ReleaseBlocks(start_lsn, m_commit_lsn))); - mysql_mutex_unlock(&log_sys.flush_order_mutex); + mysql_mutex_lock(&buf_pool.flush_list_mutex); + { + CIterate<ReleaseModified> rm + {ReleaseModified{buf_pool.prepare_insert_into_flush_list(start_lsn), + start_lsn, m_commit_lsn}}; + m_memo.for_each_block_in_reverse(rm); + ut_ad(rm.functor.modified); + buf_pool.flush_list_requests+= rm.functor.modified; + } + buf_pool.page_cleaner_wakeup(); + mysql_mutex_unlock(&buf_pool.flush_list_mutex); + log_sys.latch.wr_unlock(); + m_latch_ex= false; mysql_mutex_lock(&fil_system.mutex); ut_ad(space.is_being_truncated); @@ -621,7 +884,6 @@ void mtr_t::commit_shrink(fil_space_t &space) mysql_mutex_unlock(&fil_system.mutex); m_memo.for_each_block_in_reverse(CIterate<ReleaseLatches>()); - srv_stats.log_write_requests.inc(); release_resources(); } @@ -631,39 +893,58 @@ but generated some redo log on a higher level, such as FILE_MODIFY records and an optional FILE_CHECKPOINT marker. The caller must hold log_sys.mutex. This is to be used at log_checkpoint(). -@param[in] checkpoint_lsn log checkpoint LSN, or 0 */ -void mtr_t::commit_files(lsn_t checkpoint_lsn) +@param checkpoint_lsn the log sequence number of a checkpoint, or 0 +@return current LSN */ +lsn_t mtr_t::commit_files(lsn_t checkpoint_lsn) { - mysql_mutex_assert_owner(&log_sys.mutex); - ut_ad(is_active()); - ut_ad(!is_inside_ibuf()); - ut_ad(m_log_mode == MTR_LOG_ALL); - ut_ad(!m_made_dirty); - ut_ad(m_memo.size() == 0); - ut_ad(!srv_read_only_mode); - ut_ad(!m_freed_space); - ut_ad(!m_freed_pages); - - if (checkpoint_lsn) { - byte* ptr = m_log.push<byte*>(SIZE_OF_FILE_CHECKPOINT); - compile_time_assert(SIZE_OF_FILE_CHECKPOINT == 3 + 8 + 1); - *ptr = FILE_CHECKPOINT | (SIZE_OF_FILE_CHECKPOINT - 2); - ::memset(ptr + 1, 0, 2); - mach_write_to_8(ptr + 3, checkpoint_lsn); - ptr[3 + 8] = 0; - } else { - *m_log.push<byte*>(1) = 0; - } +#ifndef SUX_LOCK_GENERIC + ut_ad(log_sys.latch.is_write_locked()); +#endif + ut_ad(is_active()); + ut_ad(!is_inside_ibuf()); + ut_ad(m_log_mode == MTR_LOG_ALL); + ut_ad(!m_made_dirty); + ut_ad(m_memo.size() == 0); + ut_ad(!srv_read_only_mode); + ut_ad(!m_freed_space); + ut_ad(!m_freed_pages); + ut_ad(!m_user_space); + ut_ad(!m_latch_ex); - finish_write(m_log.size()); - srv_stats.log_write_requests.inc(); - release_resources(); + m_latch_ex= true; - if (checkpoint_lsn) { - DBUG_PRINT("ib_log", - ("FILE_CHECKPOINT(" LSN_PF ") written at " LSN_PF, - checkpoint_lsn, log_sys.get_lsn())); - } + if (checkpoint_lsn) + { + byte *ptr= m_log.push<byte*>(3 + 8); + *ptr= FILE_CHECKPOINT | (2 + 8); + ::memset(ptr + 1, 0, 2); + mach_write_to_8(ptr + 3, checkpoint_lsn); + } + + size_t size= m_log.size() + 5; + + if (log_sys.is_encrypted()) + { + /* We will not encrypt any FILE_ records, but we will reserve + a nonce at the end. */ + size+= 8; + m_commit_lsn= log_sys.get_lsn(); + } + else + m_commit_lsn= 0; + + m_crc= 0; + m_log.for_each_block([this](const mtr_buf_t::block_t *b) + { m_crc= my_crc32c(m_crc, b->begin(), b->used()); return true; }); + finish_write(size); + release_resources(); + + if (checkpoint_lsn) + DBUG_PRINT("ib_log", + ("FILE_CHECKPOINT(" LSN_PF ") written at " LSN_PF, + checkpoint_lsn, m_commit_lsn)); + + return m_commit_lsn; } #ifdef UNIV_DEBUG @@ -774,183 +1055,111 @@ mtr_t::release_page(const void* ptr, mtr_memo_type_t type) ut_ad(0); } -static bool log_margin_warned; -static time_t log_margin_warn_time; static bool log_close_warned; static time_t log_close_warn_time; -/** Check margin not to overwrite transaction log from the last checkpoint. -If would estimate the log write to exceed the log_capacity, -waits for the checkpoint is done enough. -@param len length of the data to be written */ -static void log_margin_checkpoint_age(ulint len) +/** Display a warning that the log tail is overwriting the head, +making the server crash-unsafe. */ +ATTRIBUTE_COLD static void log_overwrite_warning(lsn_t age, lsn_t capacity) { - const ulint framing_size= log_sys.framing_size(); - /* actual length stored per block */ - const ulint len_per_blk= OS_FILE_LOG_BLOCK_SIZE - framing_size; - - /* actual data length in last block already written */ - ulint extra_len= log_sys.buf_free % OS_FILE_LOG_BLOCK_SIZE; - - ut_ad(extra_len >= LOG_BLOCK_HDR_SIZE); - extra_len-= LOG_BLOCK_HDR_SIZE; - - /* total extra length for block header and trailer */ - extra_len= ((len + extra_len) / len_per_blk) * framing_size; - - const ulint margin= len + extra_len; - - mysql_mutex_assert_owner(&log_sys.mutex); - - const lsn_t lsn= log_sys.get_lsn(); - - if (UNIV_UNLIKELY(margin > log_sys.log_capacity)) + time_t t= time(nullptr); + if (!log_close_warned || difftime(t, log_close_warn_time) > 15) { - time_t t= time(nullptr); - - /* return with warning output to avoid deadlock */ - if (!log_margin_warned || difftime(t, log_margin_warn_time) > 15) - { - log_margin_warned= true; - log_margin_warn_time= t; + log_close_warned= true; + log_close_warn_time= t; - ib::error() << "innodb_log_file_size is too small " - "for mini-transaction size " << len; - } + sql_print_error("InnoDB: The age of the last checkpoint is " LSN_PF + ", which exceeds the log capacity " LSN_PF ".", + age, capacity); } - else if (UNIV_LIKELY(lsn + margin <= log_sys.last_checkpoint_lsn + - log_sys.log_capacity)) - return; - - log_sys.set_check_flush_or_checkpoint(); } - -/** Open the log for log_write_low(). The log must be closed with log_close(). -@param len length of the data to be written -@return start lsn of the log record */ -static lsn_t log_reserve_and_open(size_t len) +/** Wait in append_prepare() for buffer to become available +@param ex whether log_sys.latch is exclusively locked */ +ATTRIBUTE_COLD void log_t::append_prepare_wait(bool ex) noexcept { - for (ut_d(ulint count= 0);;) - { - mysql_mutex_assert_owner(&log_sys.mutex); - - /* Calculate an upper limit for the space the string may take in - the log buffer */ - - size_t len_upper_limit= (4 * OS_FILE_LOG_BLOCK_SIZE) + - srv_log_write_ahead_size + (5 * len) / 4; - - if (log_sys.buf_free + len_upper_limit <= srv_log_buffer_size) - break; - - mysql_mutex_unlock(&log_sys.mutex); - DEBUG_SYNC_C("log_buf_size_exceeded"); - - /* Not enough free space, do a write of the log buffer */ - log_write_up_to(log_sys.get_lsn(), false); + log_sys.waits++; + log_sys.unlock_lsn(); - srv_stats.log_waits.inc(); + if (ex) + log_sys.latch.wr_unlock(); + else + log_sys.latch.rd_unlock(); - ut_ad(++count < 50); + DEBUG_SYNC_C("log_buf_size_exceeded"); + log_buffer_flush_to_disk(log_sys.is_pmem()); - mysql_mutex_lock(&log_sys.mutex); - } + if (ex) + log_sys.latch.wr_lock(SRW_LOCK_CALL); + else + log_sys.latch.rd_lock(SRW_LOCK_CALL); - return log_sys.get_lsn(); + log_sys.lock_lsn(); } -/** Append data to the log buffer. */ -static void log_write_low(const void *str, size_t size) +/** Reserve space in the log buffer for appending data. +@tparam pmem log_sys.is_pmem() +@param size total length of the data to append(), in bytes +@param ex whether log_sys.latch is exclusively locked +@return the start LSN and the buffer position for append() */ +template<bool pmem> +inline +std::pair<lsn_t,byte*> log_t::append_prepare(size_t size, bool ex) noexcept { - mysql_mutex_assert_owner(&log_sys.mutex); - const ulint trailer_offset= log_sys.trailer_offset(); - - do +#ifndef SUX_LOCK_GENERIC + ut_ad(latch.is_locked()); +# ifndef _WIN32 // there is no accurate is_write_locked() on SRWLOCK + ut_ad(ex == latch.is_write_locked()); +# endif +#endif + ut_ad(pmem == is_pmem()); + const lsn_t checkpoint_margin{last_checkpoint_lsn + log_capacity - size}; + const size_t avail{(pmem ? size_t(capacity()) : buf_size) - size}; + lock_lsn(); + write_to_buf++; + + for (ut_d(int count= 50); + UNIV_UNLIKELY((pmem + ? size_t(get_lsn() - + get_flushed_lsn(std::memory_order_relaxed)) + : size_t{buf_free}) > avail); ) { - /* Calculate a part length */ - size_t len= size; - size_t data_len= (log_sys.buf_free % OS_FILE_LOG_BLOCK_SIZE) + size; - - if (data_len > trailer_offset) - { - data_len= trailer_offset; - len= trailer_offset - log_sys.buf_free % OS_FILE_LOG_BLOCK_SIZE; - } - - memcpy(log_sys.buf + log_sys.buf_free, str, len); - - size-= len; - str= static_cast<const char*>(str) + len; - - byte *log_block= static_cast<byte*>(ut_align_down(log_sys.buf + - log_sys.buf_free, - OS_FILE_LOG_BLOCK_SIZE)); - - log_block_set_data_len(log_block, data_len); - lsn_t lsn= log_sys.get_lsn(); - - if (data_len == trailer_offset) - { - /* This block became full */ - log_block_set_data_len(log_block, OS_FILE_LOG_BLOCK_SIZE); - log_block_set_checkpoint_no(log_block, log_sys.next_checkpoint_no); - len+= log_sys.framing_size(); - lsn+= len; - /* Initialize the next block header */ - log_block_init(log_block + OS_FILE_LOG_BLOCK_SIZE, lsn); - } - else - lsn+= len; - - log_sys.set_lsn(lsn); - log_sys.buf_free+= len; - - ut_ad(log_sys.buf_free <= size_t{srv_log_buffer_size}); + append_prepare_wait(ex); + ut_ad(count--); } - while (size); + + const lsn_t l{lsn.load(std::memory_order_relaxed)}; + lsn.store(l + size, std::memory_order_relaxed); + const size_t b{buf_free}; + size_t new_buf_free{b}; + new_buf_free+= size; + if (pmem && new_buf_free >= file_size) + new_buf_free-= size_t(capacity()); + buf_free= new_buf_free; + unlock_lsn(); + + if (UNIV_UNLIKELY(l > checkpoint_margin) || + (!pmem && b >= max_buf_free)) + set_check_flush_or_checkpoint(); + + return {l, &buf[b]}; } -/** Close the log at mini-transaction commit. -@return whether buffer pool flushing is needed */ -static mtr_t::page_flush_ahead log_close(lsn_t lsn) +/** Finish appending data to the log. +@param lsn the end LSN of the log record +@return whether buf_flush_ahead() will have to be invoked */ +static mtr_t::page_flush_ahead log_close(lsn_t lsn) noexcept { - mysql_mutex_assert_owner(&log_sys.mutex); - ut_ad(lsn == log_sys.get_lsn()); - - byte *log_block= static_cast<byte*>(ut_align_down(log_sys.buf + - log_sys.buf_free, - OS_FILE_LOG_BLOCK_SIZE)); - - if (!log_block_get_first_rec_group(log_block)) - { - /* We initialized a new log block which was not written - full by the current mtr: the next mtr log record group - will start within this block at the offset data_len */ - log_block_set_first_rec_group(log_block, - log_block_get_data_len(log_block)); - } - - if (log_sys.buf_free > log_sys.max_buf_free) - log_sys.set_check_flush_or_checkpoint(); +#ifndef SUX_LOCK_GENERIC + ut_ad(log_sys.latch.is_locked()); +#endif const lsn_t checkpoint_age= lsn - log_sys.last_checkpoint_lsn; if (UNIV_UNLIKELY(checkpoint_age >= log_sys.log_capacity) && /* silence message on create_log_file() after the log had been deleted */ checkpoint_age != lsn) - { - time_t t= time(nullptr); - if (!log_close_warned || difftime(t, log_close_warn_time) > 15) - { - log_close_warned= true; - log_close_warn_time= t; - - ib::error() << "The age of the last checkpoint is " << checkpoint_age - << ", which exceeds the log capacity " - << log_sys.log_capacity << "."; - } - } + log_overwrite_warning(checkpoint_age, log_sys.log_capacity); else if (UNIV_LIKELY(checkpoint_age <= log_sys.max_modified_age_async)) return mtr_t::PAGE_FLUSH_NO; else if (UNIV_LIKELY(checkpoint_age <= log_sys.max_checkpoint_age)) @@ -1022,97 +1231,143 @@ struct WriteOPT_PAGE_CHECKSUM } }; -/** Write the block contents to the REDO log */ -struct mtr_write_log -{ - /** Append a block to the redo log buffer. - @return whether the appending should continue */ - bool operator()(const mtr_buf_t::block_t *block) const - { - log_write_low(block->begin(), block->used()); - return true; - } -}; - std::pair<lsn_t,mtr_t::page_flush_ahead> mtr_t::do_write() { - ut_ad(!recv_no_log_write); - ut_ad(is_logged()); + ut_ad(!recv_no_log_write); + ut_ad(is_logged()); +#ifndef SUX_LOCK_GENERIC + ut_ad(!m_latch_ex || log_sys.latch.is_write_locked()); +#endif - ulint len = m_log.size(); - ut_ad(len > 0); + size_t len= m_log.size() + 5; + ut_ad(len > 5); #ifdef UNIV_DEBUG - if (m_log_mode == MTR_LOG_ALL) { - m_memo.for_each_block(CIterate<WriteOPT_PAGE_CHECKSUM>(*this)); - len = m_log.size(); - } + if (m_log_mode == MTR_LOG_ALL) + { + m_memo.for_each_block(CIterate<WriteOPT_PAGE_CHECKSUM>(*this)); + len= m_log.size() + 5; + } #endif - if (len > srv_log_buffer_size / 2) { - log_buffer_extend(ulong((len + 1) * 2)); - } - - fil_space_t* space = m_user_space; - - if (space != NULL && is_predefined_tablespace(space->id)) { - /* Omit FILE_MODIFY for predefined tablespaces. */ - space = NULL; - } - - mysql_mutex_lock(&log_sys.mutex); - - if (fil_names_write_if_was_clean(space)) { - len = m_log.size(); - } else { - /* This was not the first time of dirtying a - tablespace since the latest checkpoint. */ - ut_ad(len == m_log.size()); - } - - *m_log.push<byte*>(1) = 0; - len++; + if (log_sys.is_encrypted()) + { + len+= 8; + encrypt(); + } + else + { + m_crc= 0; + m_commit_lsn= 0; + m_log.for_each_block([this](const mtr_buf_t::block_t *b) + { m_crc= my_crc32c(m_crc, b->begin(), b->used()); return true; }); + } - /* check and attempt a checkpoint if exceeding capacity */ - log_margin_checkpoint_age(len); + if (!m_latch_ex) + log_sys.latch.rd_lock(SRW_LOCK_CALL); - return finish_write(len); + if (UNIV_UNLIKELY(m_user_space && !m_user_space->max_lsn && + !is_predefined_tablespace(m_user_space->id))) + { + if (!m_latch_ex) + { + m_latch_ex= true; + log_sys.latch.rd_unlock(); + log_sys.latch.wr_lock(SRW_LOCK_CALL); + if (UNIV_UNLIKELY(m_user_space->max_lsn != 0)) + goto func_exit; + } + name_write(); + } +func_exit: + return finish_write(len); } -/** Append the redo log records to the redo log buffer. +/** Write the mini-transaction log to the redo log buffer. @param len number of bytes to write @return {start_lsn,flush_ahead} */ -inline std::pair<lsn_t,mtr_t::page_flush_ahead> mtr_t::finish_write(ulint len) +std::pair<lsn_t,mtr_t::page_flush_ahead> +mtr_t::finish_write(size_t len) { - ut_ad(is_logged()); - mysql_mutex_assert_owner(&log_sys.mutex); - ut_ad(m_log.size() == len); - ut_ad(len > 0); + ut_ad(!recv_no_log_write); + ut_ad(is_logged()); +#ifndef SUX_LOCK_GENERIC +# ifndef _WIN32 // there is no accurate is_write_locked() on SRWLOCK + ut_ad(m_latch_ex == log_sys.latch.is_write_locked()); +# endif +#endif + + const size_t size{m_commit_lsn ? 5U + 8U : 5U}; + std::pair<lsn_t, byte*> start; + + if (!log_sys.is_pmem()) + { + start= log_sys.append_prepare<false>(len, m_latch_ex); + m_log.for_each_block([&start](const mtr_buf_t::block_t *b) + { log_sys.append(start.second, b->begin(), b->used()); return true; }); - lsn_t start_lsn; +#ifdef HAVE_PMEM + write_trailer: +#endif + *start.second++= log_sys.get_sequence_bit(start.first + len - size); + if (m_commit_lsn) + { + mach_write_to_8(start.second, m_commit_lsn); + m_crc= my_crc32c(m_crc, start.second, 8); + start.second+= 8; + } + mach_write_to_4(start.second, m_crc); + } +#ifdef HAVE_PMEM + else + { + start= log_sys.append_prepare<true>(len, m_latch_ex); + if (UNIV_LIKELY(start.second + len <= &log_sys.buf[log_sys.file_size])) + { + m_log.for_each_block([&start](const mtr_buf_t::block_t *b) + { log_sys.append(start.second, b->begin(), b->used()); return true; }); + goto write_trailer; + } + m_log.for_each_block([&start](const mtr_buf_t::block_t *b) + { + size_t size{b->used()}; + const size_t size_left(&log_sys.buf[log_sys.file_size] - start.second); + const byte *src= b->begin(); + if (size > size_left) + { + ::memcpy(start.second, src, size_left); + start.second= &log_sys.buf[log_sys.START_OFFSET]; + src+= size_left; + size-= size_left; + } + ::memcpy(start.second, src, size); + start.second+= size; + return true; + }); + const size_t size_left(&log_sys.buf[log_sys.file_size] - start.second); + if (size_left > size) + goto write_trailer; - if (m_log.is_small()) { - const mtr_buf_t::block_t* front = m_log.front(); - ut_ad(len <= front->used()); + byte tail[5 + 8]; + tail[0]= log_sys.get_sequence_bit(start.first + len - size); - m_commit_lsn = log_reserve_and_write_fast(front->begin(), len, - &start_lsn); + if (m_commit_lsn) + { + mach_write_to_8(tail + 1, m_commit_lsn); + m_crc= my_crc32c(m_crc, tail + 1, 8); + mach_write_to_4(tail + 9, m_crc); + } + else + mach_write_to_4(tail + 1, m_crc); - if (!m_commit_lsn) { - goto piecewise; - } - } else { -piecewise: - /* Open the database log for log_write_low */ - start_lsn = log_reserve_and_open(len); - mtr_write_log write_log; - m_log.for_each_block(write_log); - m_commit_lsn = log_sys.get_lsn(); - } - page_flush_ahead flush= log_close(m_commit_lsn); - DBUG_EXECUTE_IF("ib_log_flush_ahead", flush = PAGE_FLUSH_SYNC;); + ::memcpy(start.second, tail, size_left); + ::memcpy(log_sys.buf + log_sys.START_OFFSET, tail + size_left, + size - size_left); + } +#endif - return std::make_pair(start_lsn, flush); + m_commit_lsn= start.first + len; + return {start.first, log_close(m_commit_lsn)}; } /** Find out whether a block was not X-latched by the mini-transaction */ @@ -1460,7 +1715,8 @@ void mtr_t::modify(const buf_block_t &block) } Iterate<FindModified> iteration((FindModified(block))); - if (UNIV_UNLIKELY(m_memo.for_each_block(iteration))) + m_memo.for_each_block(iteration); + if (UNIV_UNLIKELY(!iteration.functor.found)) { ut_ad("modifying an unlatched page" == 0); return; |