diff options
author | Marko Mäkelä <marko.makela@mariadb.com> | 2023-04-06 16:54:33 +0300 |
---|---|---|
committer | Marko Mäkelä <marko.makela@mariadb.com> | 2023-04-06 16:54:33 +0300 |
commit | 392d9c6dcf17abe5de5ba9ca2b54dfbc9d27bf00 (patch) | |
tree | 4a2f47adbe45b16dae97c931b039ecf546ea3f91 | |
parent | f9a040b755b4d4f4572a9a9e7224ff7873256d43 (diff) | |
download | mariadb-git-392d9c6dcf17abe5de5ba9ca2b54dfbc9d27bf00.tar.gz |
Reduce contention on recv_sys.mutex
recv_sys_t::apply_batch(): Choose a number of successive pages
for a recovery batch.
recv_sys_t::erase(recv_sys_t::map::iterator): Remove log records for a
page whose recovery is not in progress. Log application threads
will not invoke this; they will only set being_processed=-1 to indicate
that the entry is no longer needed.
recv_sys_t::garbage_collect(): Remove all being_processed=-1 entries.
recv_sys_t::wait_for_pool(): Wait for some space to become available
in the buffer pool.
Fake reads will not touch buf_pool.n_pend_reads.
TODO: Even simpler fake reads (no changes to tpool), and replace
buf_pool.n_pend_reads with something based on read_slots.
-rw-r--r-- | storage/innobase/buf/buf0flu.cc | 5 | ||||
-rw-r--r-- | storage/innobase/buf/buf0lru.cc | 3 | ||||
-rw-r--r-- | storage/innobase/buf/buf0rea.cc | 92 | ||||
-rw-r--r-- | storage/innobase/include/buf0rea.h | 9 | ||||
-rw-r--r-- | storage/innobase/include/log0recv.h | 100 | ||||
-rw-r--r-- | storage/innobase/log/log0recv.cc | 919 |
6 files changed, 571 insertions, 557 deletions
diff --git a/storage/innobase/buf/buf0flu.cc b/storage/innobase/buf/buf0flu.cc index 0072d0c22b8..4ac7c059818 100644 --- a/storage/innobase/buf/buf0flu.cc +++ b/storage/innobase/buf/buf0flu.cc @@ -113,7 +113,12 @@ void buf_pool_t::page_cleaner_wakeup(bool for_LRU) { ut_d(buf_flush_validate_skip()); if (!page_cleaner_idle()) + { + if (for_LRU) + /* Ensure that the page cleaner is not in a timed wait. */ + pthread_cond_signal(&do_flush_list); return; + } double dirty_pct= double(UT_LIST_GET_LEN(buf_pool.flush_list)) * 100.0 / double(UT_LIST_GET_LEN(buf_pool.LRU) + UT_LIST_GET_LEN(buf_pool.free)); double pct_lwm= srv_max_dirty_pages_pct_lwm; diff --git a/storage/innobase/buf/buf0lru.cc b/storage/innobase/buf/buf0lru.cc index e4e20e8335f..213bd3bb030 100644 --- a/storage/innobase/buf/buf0lru.cc +++ b/storage/innobase/buf/buf0lru.cc @@ -1233,6 +1233,7 @@ void buf_pool_t::corrupted_evict(buf_page_t *bpage, uint32_t state) buf_pool_t::hash_chain &chain= buf_pool.page_hash.cell_get(id.fold()); page_hash_latch &hash_lock= buf_pool.page_hash.lock_get(chain); + recv_sys.free_corrupted_page(id); mysql_mutex_lock(&mutex); hash_lock.lock(); @@ -1257,8 +1258,6 @@ void buf_pool_t::corrupted_evict(buf_page_t *bpage, uint32_t state) buf_LRU_block_free_hashed_page(reinterpret_cast<buf_block_t*>(bpage)); mysql_mutex_unlock(&mutex); - - recv_sys.free_corrupted_page(id); } /** Update buf_pool.LRU_old_ratio. 
diff --git a/storage/innobase/buf/buf0rea.cc b/storage/innobase/buf/buf0rea.cc index 9f2e7c8c199..48163951b1d 100644 --- a/storage/innobase/buf/buf0rea.cc +++ b/storage/innobase/buf/buf0rea.cc @@ -188,7 +188,6 @@ page_exists: buf_pool.stat.n_pages_read++; mysql_mutex_unlock(&buf_pool.mutex); - buf_pool.n_pend_reads++; return bpage; func_exit: mysql_mutex_unlock(&buf_pool.mutex); @@ -233,6 +232,7 @@ buf_read_page_low( return DB_SUCCESS_LOCKED_REC; } + buf_pool.n_pend_reads++; ut_ad(bpage->in_file()); if (sync) { @@ -657,82 +657,42 @@ failed: return count; } -/** @return whether a page has been freed */ -inline bool fil_space_t::is_freed(uint32_t page) +/** Schedule a page for recovery. +@param space tablespace +@param id page identifier +@param no_read whether the page can be recovered without reading it */ +void buf_read_recover(fil_space_t *space, const page_id_t id, bool no_read) { - std::lock_guard<std::mutex> freed_lock(freed_range_mutex); - return freed_ranges.contains(page); -} - -/** Issues read requests for pages which recovery wants to read in. -@param space_id tablespace identifier -@param page_nos page numbers to read, in ascending order */ -void buf_read_recv_pages(uint32_t space_id, st_::span<uint64_t> page_nos) -{ - fil_space_t *space= fil_space_t::get(space_id); - - if (!space) - /* The tablespace is missing or unreadable: do nothing */ - return; - + ut_ad(space->id == id.space()); + space->reacquire(); const ulint zip_size= space->zip_size() | 1; - mysql_mutex_lock(&buf_pool.mutex); - if (UT_LIST_GET_LEN(buf_pool.free) < page_nos.size()) - { - mysql_mutex_unlock(&buf_pool.mutex); - os_aio_wait_until_no_pending_reads(); - mysql_mutex_lock(&buf_pool.mutex); - } - buf_block_t* block= buf_LRU_get_free_block(have_mutex); - mysql_mutex_unlock(&buf_pool.mutex); - - for (ulint i= 0; i < page_nos.size(); i++) - { - /* Ignore if the page already present in freed ranges. 
*/ - const uint32_t page_no= static_cast<uint32_t>(page_nos[i]); - if (space->is_freed(page_no)) - continue; + buf_pool_t::hash_chain &chain= buf_pool.page_hash.cell_get(id.fold()); - const page_id_t id{space_id, page_no}; - ut_ad(!buf_dblwr.is_inside(id)); + buf_block_t *block= buf_LRU_get_free_block(have_no_mutex); - buf_pool_t::hash_chain &chain= buf_pool.page_hash.cell_get(id.fold()); - - if (page_nos[i] >> 63) + if (no_read) + { + if (buf_page_t *bpage= buf_page_init_for_read(id, zip_size, chain, block)) { - buf_page_t *bpage= buf_page_init_for_read(id, zip_size, chain, block); - if (!bpage) - { - ut_ad(block); - continue; - } ut_ad(bpage->in_file()); - space->reacquire(); os_fake_read(IORequest{bpage, nullptr, UT_LIST_GET_FIRST(space->chain), IORequest::READ_ASYNC}); - goto allocate; + return; } - - space->reacquire(); - - switch (buf_read_page_low(id, zip_size, chain, space, block)) { - case DB_SUCCESS: - allocate: - ut_ad(!block); - block= buf_LRU_get_free_block(have_no_mutex); - break; - case DB_SUCCESS_LOCKED_REC: - break; - default: + } + else if (dberr_t err= buf_read_page_low(id, zip_size, chain, space, block)) + { + if (err != DB_SUCCESS_LOCKED_REC) sql_print_error("InnoDB: Recovery failed to read page " - UINT32PF " from %s", page_no, space->chain.start->name); - } - ut_ad(block); + UINT32PF " from %s", + id.page_no(), space->chain.start->name); + } + else + { + ut_ad(!block); + return; } - DBUG_PRINT("ib_buf", ("recovery read (%zu pages) for %s", - page_nos.size(), space->chain.start->name)); - space->release(); - buf_read_release(block); + buf_LRU_block_free_non_file_page(block); } diff --git a/storage/innobase/include/buf0rea.h b/storage/innobase/include/buf0rea.h index d3e47895a2e..e2a6987aba8 100644 --- a/storage/innobase/include/buf0rea.h +++ b/storage/innobase/include/buf0rea.h @@ -91,7 +91,8 @@ latches! 
@return number of page read requests issued */ ulint buf_read_ahead_linear(const page_id_t page_id, ulint zip_size); -/** Issue read requests for pages that need to be recovered. -@param space_id tablespace identifier -@param page_nos page numbers to read, in ascending order */ -void buf_read_recv_pages(uint32_t space_id, st_::span<uint64_t> page_nos); +/** Schedule a page for recovery. +@param space tablespace +@param id page identifier +@param no_read whether the page can be recovered without reading it */ +void buf_read_recover(fil_space_t *space, const page_id_t id, bool no_read); diff --git a/storage/innobase/include/log0recv.h b/storage/innobase/include/log0recv.h index 951694b7ae0..12a84b9a66d 100644 --- a/storage/innobase/include/log0recv.h +++ b/storage/innobase/include/log0recv.h @@ -38,9 +38,9 @@ Created 9/20/1997 Heikki Tuuri #define recv_recovery_is_on() UNIV_UNLIKELY(recv_sys.recovery_on) ATTRIBUTE_COLD MY_ATTRIBUTE((nonnull, warn_unused_result)) -/** Apply any buffered redo log to a page that was just read from a data file. -@param[in,out] space tablespace -@param[in,out] bpage buffer pool page +/** Apply any buffered redo log to a page. 
+@param space tablespace +@param bpage buffer pool page @return whether the page was recovered correctly */ bool recv_recover_page(fil_space_t* space, buf_page_t* bpage); @@ -114,23 +114,15 @@ struct recv_dblwr_t list pages; }; -/** the recovery state and buffered records for a page */ +/** recv_sys.pages entry; protected by recv_sys.mutex */ struct page_recv_t { - /** Recovery state; protected by recv_sys.mutex */ - enum - { - /** not yet processed */ - RECV_NOT_PROCESSED, - /** not processed; the page will be reinitialized */ - RECV_WILL_NOT_READ, - /** page is being fake read */ - RECV_BEING_FAKE_READ, - /** page is being read */ - RECV_BEING_READ, - /** log records are being applied on the page */ - RECV_BEING_PROCESSED - } state= RECV_NOT_PROCESSED; + /** Recovery status: 0=not in progress, 1=log is being applied, + -1=log has been applied and the entry may be erased. + Transitions from 1 to -1 are NOT protected by recv_sys.mutex. */ + Atomic_relaxed<int8_t> being_processed{0}; + /** Whether reading the page will be skipped */ + bool skip_read= false; /** Latest written byte offset when applying the log records. @see mtr_t::m_last_offset */ uint16_t last_offset= 1; @@ -175,7 +167,7 @@ struct page_recv_t iterator end() { return NULL; } bool empty() const { ut_ad(!head == !tail); return !head; } /** Clear and free the records; @see recv_sys_t::add() */ - inline void clear(); + void clear(); } log; /** Trim old log records for a page. @@ -184,21 +176,14 @@ struct page_recv_t inline bool trim(lsn_t start_lsn); /** Ignore any earlier redo log records for this page. 
*/ inline void will_not_read(); - /** @return whether the log records for the page are being processed */ - bool is_being_processed() const { return state == RECV_BEING_PROCESSED; } }; /** Recovery system data structure */ struct recv_sys_t { - /** mutex protecting apply_log_recs and page_recv_t::state */ - mysql_mutex_t mutex; + /** mutex protecting this as well as some of page_recv_t */ + alignas(CPU_LEVEL1_DCACHE_LINESIZE) mysql_mutex_t mutex; private: - /** condition variable for - !apply_batch_on || pages.empty() || found_corrupt_log || found_corrupt_fs */ - pthread_cond_t cond; - /** whether recv_apply_hashed_log_recs() is running */ - bool apply_batch_on; /** set when finding a corrupt log block or record, or there is a log parsing buffer overflow */ bool found_corrupt_log; @@ -232,7 +217,7 @@ public: map pages; private: - /** iterator to parse, used by parse() */ + /** iterator to pages, used by parse() */ map::iterator pages_it; /** Process a record that indicates that a tablespace size is being shrunk. @@ -261,21 +246,21 @@ public: private: /** Attempt to initialize a page based on redo log records. - @param page_id page identifier - @param p iterator pointing to page_id + @param p iterator @param mtr mini-transaction @param b pre-allocated buffer pool block + @param init_lsn LSN of the page initialization @return the recovered block @retval nullptr if the page cannot be initialized based on log records @retval -1 if the page cannot be recovered due to corruption */ - inline buf_block_t *recover_low(const page_id_t page_id, map::iterator &p, - mtr_t &mtr, buf_block_t *b); + inline buf_block_t *recover_low(const map::iterator &p, mtr_t &mtr, + buf_block_t *b, lsn_t init_lsn); /** Attempt to initialize a page based on redo log records. 
@param page_id page identifier @return the recovered block @retval nullptr if the page cannot be initialized based on log records @retval -1 if the page cannot be recovered due to corruption */ - buf_block_t *recover_low(const page_id_t page_id); + ATTRIBUTE_COLD buf_block_t *recover_low(const page_id_t page_id); /** All found log files (multiple ones are possible if we are upgrading from before MariaDB Server 10.5.1) */ @@ -284,6 +269,26 @@ private: /** Base node of the redo block list. List elements are linked via buf_block_t::unzip_LRU. */ UT_LIST_BASE_NODE_T(buf_block_t) blocks; + + /** Allocate a block from the buffer pool for recv_sys.pages */ + ATTRIBUTE_COLD buf_block_t *add_block(); + + /** Wait for buffer pool to become available. + @param pages number of buffer pool pages needed */ + ATTRIBUTE_COLD void wait_for_pool(size_t pages); + + /** Free log for processed pages. */ + void garbage_collect(); + + /** Apply a recovery batch. + @param space_id current tablespace identifier + @param space current tablespace + @param free_block spare buffer block + @param last_batch whether it is possible to write more redo log + @return whether the caller must provide a new free_block */ + bool apply_batch(uint32_t space_id, fil_space_t *&space, + buf_block_t *&free_block, bool last_batch); + public: /** Apply buffered log to persistent data pages. @param last_batch whether it is possible to write more redo log */ @@ -363,21 +368,17 @@ public: { return parse_mtr<store>(if_exists); } #endif + /** Erase log records for a page. */ + void erase(map::iterator p); + /** Clear a fully processed set of stored redo log records. */ - inline void clear(); + void clear(); /** Determine whether redo log recovery progress should be reported. 
@param time the current time @return whether progress should be reported (the last report was at least 15 seconds ago) */ - bool report(time_t time) - { - if (time - progress_time < 15) - return false; - - progress_time= time; - return true; - } + bool report(time_t time); /** The alloc() memory alignment, in bytes */ static constexpr size_t ALIGNMENT= sizeof(size_t); @@ -395,8 +396,6 @@ public: ATTRIBUTE_COLD void set_corrupt_fs(); /** Flag log file corruption during recovery. */ ATTRIBUTE_COLD void set_corrupt_log(); - /** Possibly finish a recovery batch. */ - inline void maybe_finish_batch(); /** @return whether data file corruption was found */ bool is_corrupt_fs() const { return UNIV_UNLIKELY(found_corrupt_fs); } @@ -414,13 +413,14 @@ public: } /** Try to recover a tablespace that was not readable earlier - @param p iterator, initially pointing to page_id_t{space_id,0}; - the records will be freed and the iterator advanced + @param p iterator @param name tablespace file name @param free_block spare buffer block - @return whether recovery failed */ - bool recover_deferred(map::iterator &p, const std::string &name, - buf_block_t *&free_block); + @return recovered tablespace + @retval nullptr if recovery failed */ + fil_space_t *recover_deferred(const map::iterator &p, + const std::string &name, + buf_block_t *&free_block); }; /** The recovery system */ diff --git a/storage/innobase/log/log0recv.cc b/storage/innobase/log/log0recv.cc index a700feaca00..f834a61d65d 100644 --- a/storage/innobase/log/log0recv.cc +++ b/storage/innobase/log/log0recv.cc @@ -726,7 +726,7 @@ static struct { retry: log_sys.latch.wr_unlock(); - bool fail= false; + fil_space_t *space= fil_system.sys_space; buf_block_t *free_block= buf_LRU_get_free_block(have_no_mutex); log_sys.latch.wr_lock(SRW_LOCK_CALL); mysql_mutex_lock(&recv_sys.mutex); @@ -745,9 +745,9 @@ retry: for dict_drop_index_tree(). 
*/ while (p != recv_sys.pages.end() && p->first.space() == space_id) { + ut_ad(!p->second.being_processed); recv_sys_t::map::iterator r= p++; - r->second.log.clear(); - recv_sys.pages.erase(r); + recv_sys.erase(r); } recv_spaces_t::iterator it{recv_spaces.find(space_id)}; if (it != recv_spaces.end()) @@ -769,11 +769,13 @@ retry: } } else - fail= recv_sys.recover_deferred(p, d->second.file_name, free_block); + space= recv_sys.recover_deferred(p, d->second.file_name, free_block); processed: - defers.erase(d++); - if (fail) + auto e= d++; + defers.erase(e); + if (!space) break; + space->release(); if (free_block) continue; mysql_mutex_unlock(&recv_sys.mutex); @@ -784,7 +786,7 @@ processed: mysql_mutex_unlock(&recv_sys.mutex); if (free_block) buf_pool.free_block(free_block); - return fail; + return !space; } /** Create tablespace metadata for a data file that was initially @@ -890,110 +892,6 @@ free_space: } deferred_spaces; -/** Try to recover a tablespace that was not readable earlier -@param p iterator, initially pointing to page_id_t{space_id,0}; - the records will be freed and the iterator advanced -@param name tablespace file name -@param free_block spare buffer block -@return whether recovery failed */ -bool recv_sys_t::recover_deferred(recv_sys_t::map::iterator &p, - const std::string &name, - buf_block_t *&free_block) -{ - mysql_mutex_assert_owner(&mutex); - - const page_id_t first{p->first}; - ut_ad(first.space()); - - recv_spaces_t::iterator it{recv_spaces.find(first.space())}; - ut_ad(it != recv_spaces.end()); - - if (!first.page_no() && p->second.state == page_recv_t::RECV_WILL_NOT_READ) - { - mtr_t mtr; - buf_block_t *block= recover_low(first, p, mtr, free_block); - ut_ad(block == free_block || block == reinterpret_cast<buf_block_t*>(-1)); - free_block= nullptr; - if (UNIV_UNLIKELY(!block || block == reinterpret_cast<buf_block_t*>(-1))) - goto fail; - const byte *page= UNIV_LIKELY_NULL(block->page.zip.data) - ? 
block->page.zip.data - : block->page.frame; - const uint32_t space_id= mach_read_from_4(page + FIL_PAGE_SPACE_ID); - const uint32_t flags= fsp_header_get_flags(page); - const uint32_t page_no= mach_read_from_4(page + FIL_PAGE_OFFSET); - const uint32_t size= fsp_header_get_field(page, FSP_SIZE); - - ut_ad(it != recv_spaces.end()); - - if (page_id_t{space_id, page_no} == first && size >= 4 && - it != recv_spaces.end() && - fil_space_t::is_valid_flags(flags, space_id) && - fil_space_t::logical_size(flags) == srv_page_size) - { - fil_space_t *space= deferred_spaces.create(it, name, flags, - fil_space_read_crypt_data - (fil_space_t::zip_size(flags), - page), size); - if (!space) - goto release_and_fail; - space->free_limit= fsp_header_get_field(page, FSP_FREE_LIMIT); - space->free_len= flst_get_len(FSP_HEADER_OFFSET + FSP_FREE + page); - fil_node_t *node= UT_LIST_GET_FIRST(space->chain); - node->deferred= true; - if (!space->acquire()) - goto release_and_fail; - fil_names_dirty(space); - const bool is_compressed= fil_space_t::is_compressed(flags); -#ifdef _WIN32 - const bool is_sparse= is_compressed; - if (is_compressed) - os_file_set_sparse_win32(node->handle); -#else - const bool is_sparse= is_compressed && - DB_SUCCESS == os_file_punch_hole(node->handle, 0, 4096) && - !my_test_if_thinly_provisioned(node->handle); -#endif - /* Mimic fil_node_t::read_page0() in case the file exists and - has already been extended to a larger size. 
*/ - ut_ad(node->size == size); - const os_offset_t file_size= os_file_get_size(node->handle); - if (file_size != os_offset_t(-1)) - { - const uint32_t n_pages= - uint32_t(file_size / fil_space_t::physical_size(flags)); - if (n_pages > size) - { - space->size= node->size= n_pages; - space->set_committed_size(); - goto size_set; - } - } - if (!os_file_set_size(node->name, node->handle, - (size * fil_space_t::physical_size(flags)) & - ~4095ULL, is_sparse)) - { - space->release(); - goto release_and_fail; - } - size_set: - node->deferred= false; - space->release(); - it->second.space= space; - block->page.lock.x_unlock(); - return false; - } - - release_and_fail: - block->page.lock.x_unlock(); - } - -fail: - ib::error() << "Cannot apply log to " << first - << " of corrupted file '" << name << "'"; - return true; -} - /** Report an operation to create, delete, or rename a file during backup. @param[in] space_id tablespace identifier @param[in] type redo log type @@ -1081,6 +979,112 @@ public: static mlog_init_t mlog_init; +/** Try to recover a tablespace that was not readable earlier +@param p iterator to the page +@param name tablespace file name +@param free_block spare buffer block +@return recovered tablespace +@retval nullptr if recovery failed */ +fil_space_t *recv_sys_t::recover_deferred(const recv_sys_t::map::iterator &p, + const std::string &name, + buf_block_t *&free_block) +{ + mysql_mutex_assert_owner(&mutex); + + ut_ad(p->first.space()); + + recv_spaces_t::iterator it{recv_spaces.find(p->first.space())}; + ut_ad(it != recv_spaces.end()); + + if (!p->first.page_no() && p->second.skip_read) + { + mtr_t mtr; + ut_ad(!p->second.being_processed); + p->second.being_processed= 1; + const lsn_t init_lsn= mlog_init.last(p->first); + mysql_mutex_unlock(&mutex); + buf_block_t *block= recover_low(p, mtr, free_block, init_lsn); + mysql_mutex_lock(&mutex); + p->second.being_processed= -1; + ut_ad(block == free_block || block == reinterpret_cast<buf_block_t*>(-1)); + 
free_block= nullptr; + if (UNIV_UNLIKELY(!block || block == reinterpret_cast<buf_block_t*>(-1))) + goto fail; + const byte *page= UNIV_LIKELY_NULL(block->page.zip.data) + ? block->page.zip.data + : block->page.frame; + const uint32_t space_id= mach_read_from_4(page + FIL_PAGE_SPACE_ID); + const uint32_t flags= fsp_header_get_flags(page); + const uint32_t page_no= mach_read_from_4(page + FIL_PAGE_OFFSET); + const uint32_t size= fsp_header_get_field(page, FSP_SIZE); + + if (page_id_t{space_id, page_no} == p->first && size >= 4 && + fil_space_t::is_valid_flags(flags, space_id) && + fil_space_t::logical_size(flags) == srv_page_size) + { + fil_space_t *space= deferred_spaces.create(it, name, flags, + fil_space_read_crypt_data + (fil_space_t::zip_size(flags), + page), size); + if (!space) + goto release_and_fail; + space->free_limit= fsp_header_get_field(page, FSP_FREE_LIMIT); + space->free_len= flst_get_len(FSP_HEADER_OFFSET + FSP_FREE + page); + fil_node_t *node= UT_LIST_GET_FIRST(space->chain); + node->deferred= true; + if (!space->acquire()) + goto release_and_fail; + fil_names_dirty(space); + const bool is_compressed= fil_space_t::is_compressed(flags); +#ifdef _WIN32 + const bool is_sparse= is_compressed; + if (is_compressed) + os_file_set_sparse_win32(node->handle); +#else + const bool is_sparse= is_compressed && + DB_SUCCESS == os_file_punch_hole(node->handle, 0, 4096) && + !my_test_if_thinly_provisioned(node->handle); +#endif + /* Mimic fil_node_t::read_page0() in case the file exists and + has already been extended to a larger size. 
*/ + ut_ad(node->size == size); + const os_offset_t file_size= os_file_get_size(node->handle); + if (file_size != os_offset_t(-1)) + { + const uint32_t n_pages= + uint32_t(file_size / fil_space_t::physical_size(flags)); + if (n_pages > size) + { + space->size= node->size= n_pages; + space->set_committed_size(); + goto size_set; + } + } + if (!os_file_set_size(node->name, node->handle, + (size * fil_space_t::physical_size(flags)) & + ~4095ULL, is_sparse)) + { + space->release(); + goto release_and_fail; + } + size_set: + node->deferred= false; + it->second.space= space; + block->page.lock.x_unlock(); + p->second.being_processed= -1; + return space; + } + + release_and_fail: + block->page.lock.x_unlock(); + } + +fail: + ib::error() << "Cannot apply log to " << p->first + << " of corrupted file '" << name << "'"; + return nullptr; +} + /** Process a record that indicates that a tablespace is being shrunk in size. @param page_id first page identifier that is not in the file @@ -1096,6 +1100,7 @@ inline void recv_sys_t::trim(const page_id_t page_id, lsn_t lsn) p != pages.end() && p->first.space() == page_id.space();) { recv_sys_t::map::iterator r = p++; if (r->second.trim(lsn)) { + ut_ad(!r->second.being_processed); pages.erase(r); } } @@ -1279,7 +1284,6 @@ void recv_sys_t::close() lsn= 0; mysql_mutex_destroy(&mutex); - pthread_cond_destroy(&cond); } recv_spaces.clear(); @@ -1294,10 +1298,8 @@ void recv_sys_t::create() ut_ad(this == &recv_sys); ut_ad(!is_initialised()); mysql_mutex_init(recv_sys_mutex_key, &mutex, nullptr); - pthread_cond_init(&cond, nullptr); apply_log_recs = false; - apply_batch_on = false; len = 0; offset = 0; @@ -1314,11 +1316,10 @@ void recv_sys_t::create() } /** Clear a fully processed set of stored redo log records. 
*/ -inline void recv_sys_t::clear() +void recv_sys_t::clear() { mysql_mutex_assert_owner(&mutex); apply_log_recs= false; - apply_batch_on= false; ut_ad(!after_apply || found_corrupt_fs || !UT_LIST_GET_LAST(blocks)); pages.clear(); pages_it= pages.end(); @@ -1332,8 +1333,6 @@ inline void recv_sys_t::clear() buf_block_free(block); block= prev_block; } - - pthread_cond_broadcast(&cond); } /** Free most recovery data structures. */ @@ -1870,7 +1869,7 @@ void page_recv_t::recs_t::rewind(lsn_t start_lsn) } -inline void page_recv_t::recs_t::clear() +void page_recv_t::recs_t::clear() { mysql_mutex_assert_owner(&recv_sys.mutex); for (const log_rec_t *l= head; l; ) @@ -1882,15 +1881,78 @@ inline void page_recv_t::recs_t::clear() head= tail= nullptr; } - /** Ignore any earlier redo log records for this page. */ inline void page_recv_t::will_not_read() { - ut_ad(state == RECV_NOT_PROCESSED || state == RECV_WILL_NOT_READ); - state= RECV_WILL_NOT_READ; + ut_ad(!being_processed); + skip_read= true; log.clear(); } +void recv_sys_t::erase(map::iterator p) +{ + ut_ad(p->second.being_processed <= 0); + p->second.log.clear(); + pages.erase(p); +} + +/** Free log for processed pages. 
*/ +void recv_sys_t::garbage_collect() +{ + mysql_mutex_assert_owner(&mutex); + + for (map::iterator p= pages.begin(); p != pages.end(); ) + { + if (p->second.being_processed < 0) + { + map::iterator r= p++; + erase(r); + } + else + p++; + } +} + +/** Allocate a block from the buffer pool for recv_sys.pages */ +ATTRIBUTE_COLD buf_block_t *recv_sys_t::add_block() +{ + for (bool freed= false;;) + { + const auto rs= UT_LIST_GET_LEN(blocks) * 2; + mysql_mutex_lock(&buf_pool.mutex); + const auto bs= + UT_LIST_GET_LEN(buf_pool.free) + UT_LIST_GET_LEN(buf_pool.LRU); + if (UNIV_LIKELY(bs > BUF_LRU_MIN_LEN || rs < bs)) + { + buf_block_t *block= buf_LRU_get_free_block(have_mutex); + mysql_mutex_unlock(&buf_pool.mutex); + return block; + } + /* out of memory: redo log occupies more than 1/3 of buf_pool + and there are fewer than BUF_LRU_MIN_LEN pages left */ + mysql_mutex_unlock(&buf_pool.mutex); + if (freed) + return nullptr; + freed= true; + if (pages_it != pages.end() && pages_it->second.being_processed < 0) + pages_it= pages.end(); + garbage_collect(); + } +} + +/** Wait for buffer pool to become available. */ +ATTRIBUTE_COLD void recv_sys_t::wait_for_pool(size_t pages) +{ + mysql_mutex_unlock(&mutex); + os_aio_wait_until_no_pending_reads(); + mysql_mutex_lock(&mutex); + garbage_collect(); + mysql_mutex_lock(&buf_pool.mutex); + bool need_more= UT_LIST_GET_LEN(buf_pool.free) < pages; + mysql_mutex_unlock(&buf_pool.mutex); + if (need_more) + buf_flush_sync_batch(lsn); +} /** Register a redo log snippet for a page. 
@param it page iterator @@ -1954,19 +2016,9 @@ append: if (UNIV_UNLIKELY(!block)) { create_block: - const auto rs= UT_LIST_GET_LEN(blocks) * 2; - mysql_mutex_lock(&buf_pool.mutex); - const auto bs= - UT_LIST_GET_LEN(buf_pool.free) + UT_LIST_GET_LEN(buf_pool.LRU); - if (bs <= BUF_LRU_MIN_LEN && rs >= bs) - { - /* out of memory: redo log occupies more than 1/3 of buf_pool - and there are fewer than BUF_LRU_MIN_LEN pages left */ - mysql_mutex_unlock(&buf_pool.mutex); + block= add_block(); + if (UNIV_UNLIKELY(!block)) return true; - } - block= buf_LRU_get_free_block(have_mutex); - mysql_mutex_unlock(&buf_pool.mutex); block->page.access_time= 1U << 16 | ut_calc_align<uint16_t>(static_cast<uint16_t>(size), ALIGNMENT); static_assert(ut_is_2pow(ALIGNMENT), "ALIGNMENT must be a power of 2"); @@ -2267,7 +2319,6 @@ void recv_sys_t::rewind(source &l, source &begin) noexcept { ut_ad(srv_operation != SRV_OPERATION_BACKUP); mysql_mutex_assert_owner(&mutex); - pages_it= pages.end(); const source end= l; uint32_t rlen; @@ -2311,28 +2362,24 @@ void recv_sys_t::rewind(source &l, source &begin) noexcept if (pages_it == pages.end() || pages_it->first != id) { pages_it= pages.find(id); - if (pages_it != pages.end()) - { - const log_phys_t *head= - static_cast<log_phys_t*>(*pages_it->second.log.begin()); - if (!head) - { - erase: - pages.erase(pages_it); - pages_it= pages.end(); - } - else if (head->start_lsn == lsn) - { - pages_it->second.log.clear(); - goto erase; - } - else - pages_it->second.log.rewind(lsn); - } + if (pages_it == pages.end()) + continue; } + + ut_ad(!pages_it->second.being_processed); + const log_phys_t *head= + static_cast<log_phys_t*>(*pages_it->second.log.begin()); + if (!head || head->start_lsn == lsn) + { + erase(pages_it); + pages_it= pages.end(); + } + else + pages_it->second.log.rewind(lsn); } l= begin; + pages_it= pages.end(); } /** Parse and register one log_t::FORMAT_10_8 mini-transaction. 
@@ -2470,7 +2517,6 @@ recv_sys_t::parse_mtr_result recv_sys_t::parse(source &l, bool if_exists) sql_print_error("InnoDB: Unknown log record at LSN " LSN_PF, lsn); corrupted: found_corrupt_log= true; - pthread_cond_broadcast(&cond); return GOT_EOF; } @@ -2774,9 +2820,8 @@ recv_sys_t::parse_mtr_result recv_sys_t::parse(source &l, bool if_exists) apply(false); goto restart; } - DBUG_PRINT("ib_log", - ("Ran out of memory and last stored lsn " LSN_PF - " last stored offset %zu\n", lsn, offset)); + sql_print_information("InnoDB recovery ran out of memory at LSN " + LSN_PF, lsn); return GOT_OOM; } } @@ -2790,8 +2835,8 @@ recv_sys_t::parse_mtr_result recv_sys_t::parse(source &l, bool if_exists) if (pages_it == pages.end()) continue; } - pages_it->second.log.clear(); - pages.erase(pages_it++); + map::iterator r= pages_it++; + erase(r); } } else if (rlen) @@ -2977,11 +3022,11 @@ static buf_block_t *recv_recover_page(buf_block_t *block, mtr_t &mtr, fil_space_t *space= nullptr, lsn_t init_lsn= 0) { - mysql_mutex_assert_owner(&recv_sys.mutex); + mysql_mutex_assert_not_owner(&recv_sys.mutex); ut_ad(recv_sys.apply_log_recs); ut_ad(recv_needed_recovery); ut_ad(block->page.id() == p->first); - ut_ad(!p->second.is_being_processed()); + ut_ad(p->second.being_processed == 1); ut_ad(!space || space->id == block->page.id().space()); ut_ad(log_sys.is_latest()); @@ -2993,10 +3038,6 @@ static buf_block_t *recv_recover_page(buf_block_t *block, mtr_t &mtr, block->page.id().space(), block->page.id().page_no())); - p->second.state = page_recv_t::RECV_BEING_PROCESSED; - - mysql_mutex_unlock(&recv_sys.mutex); - byte *frame = UNIV_LIKELY_NULL(block->page.zip.data) ? block->page.zip.data : block->page.frame; @@ -3187,26 +3228,11 @@ set_start_lsn: mtr.commit(); done: - time_t now = time(NULL); - - mysql_mutex_lock(&recv_sys.mutex); - + /* FIXME: do this in page read, protected with recv_sys.mutex! 
*/ if (recv_max_page_lsn < page_lsn) { recv_max_page_lsn = page_lsn; } - ut_ad(!block || p->second.is_being_processed()); - ut_ad(!block || !recv_sys.pages.empty()); - - if (recv_sys.report(now)) { - const size_t n = recv_sys.pages.size(); - sql_print_information("InnoDB: To recover: %zu pages from log", - n); - service_manager_extend_timeout(INNODB_EXTEND_TIMEOUT_INTERVAL, - "To recover: %zu pages" - " from log", n); - } - return block; } @@ -3220,52 +3246,40 @@ ATTRIBUTE_COLD void recv_sys_t::free_corrupted_page(page_id_t page_id) mysql_mutex_lock(&mutex); map::iterator p= pages.find(page_id); - if (p != pages.end()) + if (p == pages.end()) { - p->second.log.clear(); - pages.erase(p); - if (!srv_force_recovery) - { - set_corrupt_fs(); - ib::error() << "Unable to apply log to corrupted page " << page_id - << "; set innodb_force_recovery to ignore"; - } - else - ib::warn() << "Discarding log for corrupted page " << page_id; + mysql_mutex_unlock(&mutex); + return; } - if (pages.empty()) - pthread_cond_broadcast(&cond); + p->second.being_processed= -1; + if (!srv_force_recovery) + set_corrupt_fs(); mysql_mutex_unlock(&mutex); -} -/** Possibly finish a recovery batch. 
*/ -inline void recv_sys_t::maybe_finish_batch() -{ - mysql_mutex_assert_owner(&mutex); - ut_ad(recovery_on); - if (!apply_batch_on || pages.empty() || is_corrupt_log() || is_corrupt_fs()) - pthread_cond_broadcast(&cond); + ib::error_or_warn(!srv_force_recovery) + << "Unable to apply log to corrupted page " << page_id; } ATTRIBUTE_COLD void recv_sys_t::set_corrupt_log() { mysql_mutex_lock(&mutex); found_corrupt_log= true; - pthread_cond_broadcast(&cond); mysql_mutex_unlock(&mutex); } ATTRIBUTE_COLD void recv_sys_t::set_corrupt_fs() { mysql_mutex_assert_owner(&mutex); + if (!srv_force_recovery) + sql_print_information("InnoDB: Set innodb_force_recovery=1" + " to ignore corrupted pages."); found_corrupt_fs= true; - pthread_cond_broadcast(&cond); } -/** Apply any buffered redo log to a page that was just read from a data file. -@param[in,out] space tablespace -@param[in,out] bpage buffer pool page +/** Apply any buffered redo log to a page. +@param space tablespace +@param bpage buffer pool page @return whether the page was recovered correctly */ bool recv_recover_page(fil_space_t* space, buf_page_t* bpage) { @@ -3289,23 +3303,23 @@ bool recv_recover_page(fil_space_t* space, buf_page_t* bpage) { const page_id_t id{bpage->id()}; recv_sys_t::map::iterator p= recv_sys.pages.find(id); - if (p != recv_sys.pages.end() && !p->second.is_being_processed()) + if (p == recv_sys.pages.end()); + else if (p->second.being_processed < 0) + recv_sys.erase(p); + else { - success= recv_recover_page(success, mtr, p, space, p->second.state == - page_recv_t::RECV_BEING_FAKE_READ); - if (UNIV_LIKELY(!!success)) - { - p->second.log.clear(); - recv_sys.pages.erase(p); - } - recv_sys.maybe_finish_batch(); + p->second.being_processed= 1; + const lsn_t init_lsn= p->second.skip_read ? 
mlog_init.last(id) : 0; + mysql_mutex_unlock(&recv_sys.mutex); + success= recv_recover_page(success, mtr, p, space, init_lsn); + p->second.being_processed= -1; goto func_exit; } } + mysql_mutex_unlock(&recv_sys.mutex); mtr.commit(); func_exit: - mysql_mutex_unlock(&recv_sys.mutex); ut_ad(mtr.has_committed()); return success; } @@ -3318,79 +3332,210 @@ void IORequest::fake_read_complete() const ut_ad(bpage->frame); ut_ad(recv_recovery_is_on()); - ut_d(auto n=) buf_pool.n_pend_reads--; - ut_ad(n > 0); - if (recv_recover_page(node->space, bpage)) bpage->lock.x_unlock(true); node->space->release(); } -/** Read pages for which log needs to be applied. -@param page_id first page identifier to read -@param i iterator to recv_sys.pages -@param last_batch whether it is possible to write more redo log */ -TRANSACTIONAL_TARGET -static void recv_read_in_area(page_id_t page_id, recv_sys_t::map::iterator i, - bool last_batch) + +/** @return whether a page has been freed */ +inline bool fil_space_t::is_freed(uint32_t page) { - static constexpr uint32_t read_area= 32; + std::lock_guard<std::mutex> freed_lock(freed_range_mutex); + return freed_ranges.contains(page); +} - uint64_t page_nos[read_area]; - ut_ad(page_id == i->first); - page_id.set_page_no(ut_2pow_round(page_id.page_no(), read_area)); - const page_id_t up_limit{page_id + (read_area - 1)}; - uint64_t *p= page_nos; +bool recv_sys_t::report(time_t time) +{ + if (time - progress_time < 15) + return false; + progress_time= time; + return true; +} + +/** Apply a recovery batch. 
+@param space_id current tablespace identifier +@param space current tablespace +@param free_block spare buffer block +@param last_batch whether it is possible to write more redo log +@return whether the caller must provide a new free_block */ +bool recv_sys_t::apply_batch(uint32_t space_id, fil_space_t *&space, + buf_block_t *&free_block, bool last_batch) +{ + mysql_mutex_assert_owner(&mutex); + ut_ad(pages_it != pages.end()); + ut_ad(!pages_it->second.log.empty()); + + mysql_mutex_lock(&buf_pool.mutex); + size_t n= 0, max_n= std::min<size_t>(BUF_LRU_MIN_LEN, + UT_LIST_GET_LEN(buf_pool.LRU) + + UT_LIST_GET_LEN(buf_pool.free)); + mysql_mutex_unlock(&buf_pool.mutex); - for (; i != recv_sys.pages.end() && i->first <= up_limit; i++) + map::iterator begin{pages_it}; + + while (pages_it != pages.end() && n < max_n) { - switch (auto &state= i->second.state) { - default: + ut_ad(!buf_dblwr.is_inside(pages_it->first)); + const auto being_processed= pages_it->second.being_processed; + + if (being_processed < 0) + { + discard: + map::iterator p{pages_it++}; + erase(p); continue; - case page_recv_t::RECV_WILL_NOT_READ: - state= page_recv_t::RECV_BEING_FAKE_READ; - *p++= 1ULL << 63 | i->first.page_no(); - break; - case page_recv_t::RECV_NOT_PROCESSED: - state= page_recv_t::RECV_BEING_READ; - *p++= i->first.raw(); } + else if (!being_processed) + { + if (space_id != pages_it->first.space()) + { + space_id= pages_it->first.space(); + if (space) + space->release(); + space= fil_space_t::get(space_id); + if (!space) + { + auto d= deferred_spaces.defers.find(space_id); + if (d == deferred_spaces.defers.end() || d->second.deleted) + /* For deleted files we preserve the deferred_spaces entry */; + else if (!free_block) + return true; + else + { + space= recover_deferred(pages_it, d->second.file_name, free_block); + deferred_spaces.defers.erase(d); + if (!space && !srv_force_recovery) + { + set_corrupt_fs(); + return false; + } + } + } + } + if (!space || 
space->is_freed(pages_it->first.page_no())) + goto discard; + if (!n++) + begin= pages_it; + } + pages_it++; + } + + if (!last_batch) + log_sys.latch.wr_unlock(); + + pages_it= begin; + const page_id_t begin_id{pages_it->first}; + + if (report(time(nullptr))) + { + const size_t n= pages.size(); + sql_print_information("InnoDB: To recover: %zu pages from log", n); + service_manager_extend_timeout(INNODB_EXTEND_TIMEOUT_INTERVAL, + "To recover: %zu pages from log", n); } - if (p != page_nos) + if (!n) + goto wait; + + mysql_mutex_lock(&buf_pool.mutex); + if (free_block) + buf_LRU_block_free_non_file_page(free_block); + free_block= nullptr; + + if (UNIV_UNLIKELY(UT_LIST_GET_LEN(buf_pool.free) < n)) { - mysql_mutex_unlock(&recv_sys.mutex); - if (!last_batch) log_sys.latch.wr_unlock(); - buf_read_recv_pages(page_id.space(), {page_nos, p}); - if (!last_batch) log_sys.latch.wr_lock(SRW_LOCK_CALL); - mysql_mutex_lock(&recv_sys.mutex); + mysql_mutex_unlock(&buf_pool.mutex); + wait: + wait_for_pool(n); + if (n); + else if (!last_batch) + goto unlock_relock; + else + goto get_last; + pages_it= pages.lower_bound(begin_id); + ut_ad(pages_it != pages.end()); + } + else + mysql_mutex_unlock(&buf_pool.mutex); + + while (pages_it != pages.end()) + { + ut_ad(!buf_dblwr.is_inside(pages_it->first)); + const auto being_processed= pages_it->second.being_processed; + if (being_processed < 0) + { + discard_again: + map::iterator p{pages_it++}; + erase(p); + } + else if (!being_processed) + { + const page_id_t id{pages_it->first}; + + if (space_id != id.space()) + { + space_id= id.space(); + if (space) + space->release(); + space= fil_space_t::get(space_id); + } + if (!space || space->is_freed(id.page_no())) + goto discard_again; + + pages_it->second.being_processed= 1; + mysql_mutex_unlock(&mutex); + buf_read_recover(space, id, pages_it->second.skip_read); + if (!--n) + { + if (last_batch) + goto relock_last; + goto relock; + } + mysql_mutex_lock(&mutex); + pages_it= pages.lower_bound(id); 
+ } + else + pages_it++; + } + + if (!last_batch) + { + unlock_relock: + mysql_mutex_unlock(&mutex); + relock: + log_sys.latch.wr_lock(SRW_LOCK_CALL); + relock_last: + mysql_mutex_lock(&mutex); + get_last: + pages_it= pages.lower_bound(begin_id); } + + return false; } /** Attempt to initialize a page based on redo log records. -@param page_id page identifier -@param p iterator pointing to page_id +@param p iterator @param mtr mini-transaction @param b pre-allocated buffer pool block +@param init_lsn LSN of the page initialization @return the recovered block @retval nullptr if the page cannot be initialized based on log records @retval -1 if the page cannot be recovered due to corruption */ -inline buf_block_t *recv_sys_t::recover_low(const page_id_t page_id, - map::iterator &p, mtr_t &mtr, - buf_block_t *b) +inline buf_block_t *recv_sys_t::recover_low(const map::iterator &p, mtr_t &mtr, + buf_block_t *b, lsn_t init_lsn) { - mysql_mutex_assert_owner(&mutex); - ut_ad(p->first == page_id); + mysql_mutex_assert_not_owner(&mutex); page_recv_t &recs= p->second; - ut_ad(recs.state == page_recv_t::RECV_WILL_NOT_READ); + ut_ad(recs.skip_read); + ut_ad(recs.being_processed == 1); buf_block_t* block= nullptr; - const lsn_t init_lsn= mlog_init.last(page_id); const lsn_t end_lsn= recs.log.last()->lsn; if (end_lsn < init_lsn) - DBUG_LOG("ib_log", "skip log for page " << page_id + DBUG_LOG("ib_log", "skip log for page " << p->first << " LSN " << end_lsn << " < " << init_lsn); - fil_space_t *space= fil_space_t::get(page_id.space()); + fil_space_t *space= fil_space_t::get(p->first.space()); mtr.start(); mtr.set_log_mode(MTR_LOG_NO_REDO); @@ -3399,81 +3544,76 @@ inline buf_block_t *recv_sys_t::recover_low(const page_id_t page_id, if (!space) { - if (page_id.page_no() != 0) + if (p->first.page_no() != 0) { nothing_recoverable: mtr.commit(); return nullptr; } - auto it= recv_spaces.find(page_id.space()); + auto it= recv_spaces.find(p->first.space()); ut_ad(it != recv_spaces.end()); 
uint32_t flags= it->second.flags; zip_size= fil_space_t::zip_size(flags); - block= buf_page_create_deferred(page_id.space(), zip_size, &mtr, b); + block= buf_page_create_deferred(p->first.space(), zip_size, &mtr, b); ut_ad(block == b); block->page.lock.x_lock_recursive(); } else { - block= buf_page_create(space, page_id.page_no(), zip_size, &mtr, b); + block= buf_page_create(space, p->first.page_no(), zip_size, &mtr, b); if (UNIV_UNLIKELY(block != b)) { /* The page happened to exist in the buffer pool, or it was just being read in. Before the exclusive page latch was acquired by buf_page_create(), all changes to the page must have been applied. */ - ut_ad(pages.find(page_id) == pages.end()); + ut_d(mysql_mutex_lock(&mutex)); + ut_ad(pages.find(p->first) == pages.end()); + ut_d(mysql_mutex_unlock(&mutex)); space->release(); goto nothing_recoverable; } } - ut_ad(&recs == &pages.find(page_id)->second); - map::iterator r= p++; - block= recv_recover_page(block, mtr, r, space, init_lsn); + ut_d(mysql_mutex_lock(&mutex)); + ut_ad(&recs == &pages.find(p->first)->second); + ut_d(mysql_mutex_unlock(&mutex)); + block= recv_recover_page(block, mtr, p, space, init_lsn); ut_ad(mtr.has_committed()); - if (block) - { - recs.log.clear(); - pages.erase(r); - } - else - block= reinterpret_cast<buf_block_t*>(-1); - - if (pages.empty()) - pthread_cond_signal(&cond); - if (space) space->release(); - return block; + return block ? block : reinterpret_cast<buf_block_t*>(-1); } /** Attempt to initialize a page based on redo log records. 
@param page_id page identifier @return recovered block @retval nullptr if the page cannot be initialized based on log records */ -buf_block_t *recv_sys_t::recover_low(const page_id_t page_id) +ATTRIBUTE_COLD buf_block_t *recv_sys_t::recover_low(const page_id_t page_id) { - buf_block_t *free_block= buf_LRU_get_free_block(have_no_mutex); - buf_block_t *block= nullptr; - mysql_mutex_lock(&mutex); map::iterator p= pages.find(page_id); - if (p != pages.end() && p->second.state == page_recv_t::RECV_WILL_NOT_READ) + if (p != pages.end() && !p->second.being_processed && p->second.skip_read) { + p->second.being_processed= 1; + const lsn_t init_lsn= mlog_init.last(page_id); + mysql_mutex_unlock(&mutex); + buf_block_t *free_block= buf_LRU_get_free_block(have_no_mutex); mtr_t mtr; - block= recover_low(page_id, p, mtr, free_block); + buf_block_t *block= recover_low(p, mtr, free_block, init_lsn); + p->second.being_processed= -1; ut_ad(!block || block == reinterpret_cast<buf_block_t*>(-1) || block == free_block); + if (UNIV_UNLIKELY(!block)) + buf_pool.free_block(free_block); + return block; } mysql_mutex_unlock(&mutex); - if (UNIV_UNLIKELY(!block)) - buf_pool.free_block(free_block); - return block; + return nullptr; } inline fil_space_t *fil_system_t::find(const char *path) const @@ -3521,32 +3661,13 @@ void recv_sys_t::apply(bool last_batch) mysql_mutex_assert_owner(&mutex); - timespec abstime; - - while (apply_batch_on) - { - if (is_corrupt_log()) - return; - if (last_batch) - my_cond_wait(&cond, &mutex.m_mutex); - else - { -#ifndef SUX_LOCK_GENERIC - ut_ad(log_sys.latch.is_write_locked()); -#endif - log_sys.latch.wr_unlock(); - set_timespec_nsec(abstime, 500000000ULL); /* 0.5s */ - my_cond_timedwait(&cond, &mutex.m_mutex, &abstime); - mysql_mutex_unlock(&mutex); - log_sys.latch.wr_lock(SRW_LOCK_CALL); - mysql_mutex_lock(&mutex); - } - } - - mtr_t mtr; + if (pages_it != pages.end() && pages_it->second.being_processed < 0) + pages_it= pages.end(); + garbage_collect(); if 
(!pages.empty()) { + mtr_t mtr; const char *msg= last_batch ? "Starting final batch to recover" : "Starting a batch to recover"; @@ -3555,7 +3676,6 @@ void recv_sys_t::apply(bool last_batch) sd_notifyf(0, "STATUS=%s %zu pages from redo log", msg, n); apply_log_recs= true; - apply_batch_on= true; for (auto id= srv_undo_tablespaces_open; id--;) { @@ -3581,145 +3701,76 @@ void recv_sys_t::apply(bool last_batch) fil_system.extend_to_recv_size(); - /* We must release log_sys.latch and recv_sys.mutex before - invoking buf_LRU_get_free_block(). Allocating a block may initiate - a redo log write and therefore acquire log_sys.latch. To avoid - deadlocks, log_sys.latch must not be acquired while holding - recv_sys.mutex. */ - mysql_mutex_unlock(&mutex); - if (!last_batch) - log_sys.latch.wr_unlock(); - - buf_block_t *free_block= buf_LRU_get_free_block(have_no_mutex); - - if (!last_batch) - log_sys.latch.wr_lock(SRW_LOCK_CALL); - mysql_mutex_lock(&mutex); + fil_space_t *space= nullptr; + uint32_t space_id= ~0; - for (map::iterator p= pages.begin(); p != pages.end(); ) + for (pages_it= pages.begin(); pages_it != pages.end(); + pages_it= pages.begin()) { - const page_id_t page_id= p->first; - ut_ad(!p->second.log.empty()); - - const uint32_t space_id= page_id.space(); - auto d= deferred_spaces.defers.find(space_id); - if (d != deferred_spaces.defers.end()) + mysql_mutex_lock(&buf_pool.mutex); + buf_block_t *free_block= pages_it == pages.begin() + ? 
nullptr + : buf_LRU_get_free_only(); + if (!free_block) { - if (d->second.deleted) - { - /* For deleted files we must preserve the entry in deferred_spaces */ -erase_for_space: - while (p != pages.end() && p->first.space() == space_id) - { - map::iterator r= p++; - r->second.log.clear(); - pages.erase(r); - } - } - else if (recover_deferred(p, d->second.file_name, free_block)) - { - if (!srv_force_recovery) - set_corrupt_fs(); - deferred_spaces.defers.erase(d); - goto erase_for_space; - } - else - deferred_spaces.defers.erase(d); - if (!free_block) - { - mysql_mutex_unlock(&mutex); - if (!last_batch) - log_sys.latch.wr_unlock(); - free_block= buf_LRU_get_free_block(have_no_mutex); - if (!last_batch) - log_sys.latch.wr_lock(SRW_LOCK_CALL); - mysql_mutex_lock(&mutex); - } - ut_ad(p == pages.end() || p->first > page_id); - continue; - } - - switch (p->second.state) { - case page_recv_t::RECV_BEING_FAKE_READ: - case page_recv_t::RECV_BEING_READ: - case page_recv_t::RECV_BEING_PROCESSED: - p++; - continue; - case page_recv_t::RECV_WILL_NOT_READ: - case page_recv_t::RECV_NOT_PROCESSED: - recv_read_in_area(page_id, p, last_batch); + mysql_mutex_unlock(&buf_pool.mutex); + if (!last_batch) + log_sys.latch.wr_unlock(); + wait_for_pool(1); + pages_it= pages.begin(); + mysql_mutex_unlock(&mutex); + /* We must release log_sys.latch and recv_sys.mutex before + invoking buf_LRU_get_free_block(). Allocating a block may initiate + a redo log write and therefore acquire log_sys.latch. To avoid + deadlocks, log_sys.latch must not be acquired while holding + recv_sys.mutex. */ + free_block= buf_LRU_get_free_block(have_no_mutex); + if (!last_batch) + log_sys.latch.wr_lock(SRW_LOCK_CALL); + mysql_mutex_lock(&mutex); + pages_it= pages.begin(); } - p= pages.lower_bound(page_id); - /* Ensure that progress will be made. 
*/ - ut_ad(p == pages.end() || p->first > page_id || - p->second.state >= page_recv_t::RECV_BEING_FAKE_READ); - } - - buf_pool.free_block(free_block); - - /* Wait until all the pages have been processed */ - for (;;) - { - const bool empty= pages.empty(); - if (empty && !buf_pool.n_pend_reads) - break; + else + mysql_mutex_unlock(&buf_pool.mutex); - if (!is_corrupt_fs() && !is_corrupt_log()) + while (pages_it != pages.end()) { - if (last_batch) + if (is_corrupt_fs() || is_corrupt_log()) { - if (!empty) - my_cond_wait(&cond, &mutex.m_mutex); - else - { - mysql_mutex_unlock(&mutex); - os_aio_wait_until_no_pending_reads(); - ut_ad(!buf_pool.n_pend_reads); - mysql_mutex_lock(&mutex); - ut_ad(pages.empty()); - } - } - else - { -#ifndef SUX_LOCK_GENERIC - ut_ad(log_sys.latch.is_write_locked()); -#endif - log_sys.latch.wr_unlock(); - set_timespec_nsec(abstime, 500000000ULL); /* 0.5s */ - my_cond_timedwait(&cond, &mutex.m_mutex, &abstime); - mysql_mutex_unlock(&mutex); - log_sys.latch.wr_lock(SRW_LOCK_CALL); - mysql_mutex_lock(&mutex); + if (space) + space->release(); + return; } - continue; + if (apply_batch(space_id, space, free_block, last_batch)) + break; } - if (is_corrupt_fs() && !srv_force_recovery) - sql_print_information("InnoDB: Set innodb_force_recovery=1" - " to ignore corrupted pages."); - return; + + garbage_collect(); } - } - if (!last_batch) - log_sys.latch.wr_unlock(); + if (space) + space->release(); + } mysql_mutex_unlock(&mutex); - if (last_batch && srv_operation != SRV_OPERATION_RESTORE && - srv_operation != SRV_OPERATION_RESTORE_EXPORT) - /* Instead of flushing, last_batch sorts the buf_pool.flush_list - in ascending order of buf_page_t::oldest_modification. 
*/ - log_sort_flush_list(); - else - buf_flush_sync_batch(lsn); - if (!last_batch) { + log_sys.latch.wr_unlock(); + buf_flush_sync_batch(lsn); buf_pool_invalidate(); log_sys.latch.wr_lock(SRW_LOCK_CALL); } + else if (srv_operation == SRV_OPERATION_RESTORE || + srv_operation == SRV_OPERATION_RESTORE_EXPORT) + buf_flush_sync_batch(lsn); + else + /* Instead of flushing, last_batch sorts the buf_pool.flush_list + in ascending order of buf_page_t::oldest_modification. */ + log_sort_flush_list(); + #ifdef HAVE_PMEM - else if (log_sys.is_pmem()) + if (last_batch && log_sys.is_pmem()) mprotect(log_sys.buf, len, PROT_READ | PROT_WRITE); #endif @@ -3920,7 +3971,6 @@ static bool recv_scan_log(bool last_phase) } } - recv_sys.maybe_finish_batch(); if (last_phase) { ut_ad(!rewound_lsn); @@ -4030,8 +4080,7 @@ next: /* fall through */ case file_name_t::DELETED: recv_sys_t::map::iterator r = p++; - r->second.log.clear(); - recv_sys.pages.erase(r); + recv_sys.erase(r); continue; } ut_ad(0); |