summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMarko Mäkelä <marko.makela@mariadb.com>2023-04-06 16:54:33 +0300
committerMarko Mäkelä <marko.makela@mariadb.com>2023-04-06 16:54:33 +0300
commit392d9c6dcf17abe5de5ba9ca2b54dfbc9d27bf00 (patch)
tree4a2f47adbe45b16dae97c931b039ecf546ea3f91
parentf9a040b755b4d4f4572a9a9e7224ff7873256d43 (diff)
downloadmariadb-git-392d9c6dcf17abe5de5ba9ca2b54dfbc9d27bf00.tar.gz
Reduce contention on recv_sys.mutex
recv_sys_t::apply_batch(): Choose a number of successive pages for a recovery batch.

recv_sys_t::erase(recv_sys_t::map::iterator): Remove log records for a page whose recovery is not in progress. Log application threads will not invoke this; they will only set being_processed=-1 to indicate that the entry is no longer needed.

recv_sys_t::garbage_collect(): Remove all being_processed=-1 entries.

recv_sys_t::wait_for_pool(): Wait for some space to become available in the buffer pool.

Fake reads will not touch buf_pool.n_pend_reads.

TODO: Even simpler fake reads (no changes to tpool), and replace buf_pool.n_pend_reads with something based on read_slots.
-rw-r--r--storage/innobase/buf/buf0flu.cc5
-rw-r--r--storage/innobase/buf/buf0lru.cc3
-rw-r--r--storage/innobase/buf/buf0rea.cc92
-rw-r--r--storage/innobase/include/buf0rea.h9
-rw-r--r--storage/innobase/include/log0recv.h100
-rw-r--r--storage/innobase/log/log0recv.cc919
6 files changed, 571 insertions, 557 deletions
diff --git a/storage/innobase/buf/buf0flu.cc b/storage/innobase/buf/buf0flu.cc
index 0072d0c22b8..4ac7c059818 100644
--- a/storage/innobase/buf/buf0flu.cc
+++ b/storage/innobase/buf/buf0flu.cc
@@ -113,7 +113,12 @@ void buf_pool_t::page_cleaner_wakeup(bool for_LRU)
{
ut_d(buf_flush_validate_skip());
if (!page_cleaner_idle())
+ {
+ if (for_LRU)
+ /* Ensure that the page cleaner is not in a timed wait. */
+ pthread_cond_signal(&do_flush_list);
return;
+ }
double dirty_pct= double(UT_LIST_GET_LEN(buf_pool.flush_list)) * 100.0 /
double(UT_LIST_GET_LEN(buf_pool.LRU) + UT_LIST_GET_LEN(buf_pool.free));
double pct_lwm= srv_max_dirty_pages_pct_lwm;
diff --git a/storage/innobase/buf/buf0lru.cc b/storage/innobase/buf/buf0lru.cc
index e4e20e8335f..213bd3bb030 100644
--- a/storage/innobase/buf/buf0lru.cc
+++ b/storage/innobase/buf/buf0lru.cc
@@ -1233,6 +1233,7 @@ void buf_pool_t::corrupted_evict(buf_page_t *bpage, uint32_t state)
buf_pool_t::hash_chain &chain= buf_pool.page_hash.cell_get(id.fold());
page_hash_latch &hash_lock= buf_pool.page_hash.lock_get(chain);
+ recv_sys.free_corrupted_page(id);
mysql_mutex_lock(&mutex);
hash_lock.lock();
@@ -1257,8 +1258,6 @@ void buf_pool_t::corrupted_evict(buf_page_t *bpage, uint32_t state)
buf_LRU_block_free_hashed_page(reinterpret_cast<buf_block_t*>(bpage));
mysql_mutex_unlock(&mutex);
-
- recv_sys.free_corrupted_page(id);
}
/** Update buf_pool.LRU_old_ratio.
diff --git a/storage/innobase/buf/buf0rea.cc b/storage/innobase/buf/buf0rea.cc
index 9f2e7c8c199..48163951b1d 100644
--- a/storage/innobase/buf/buf0rea.cc
+++ b/storage/innobase/buf/buf0rea.cc
@@ -188,7 +188,6 @@ page_exists:
buf_pool.stat.n_pages_read++;
mysql_mutex_unlock(&buf_pool.mutex);
- buf_pool.n_pend_reads++;
return bpage;
func_exit:
mysql_mutex_unlock(&buf_pool.mutex);
@@ -233,6 +232,7 @@ buf_read_page_low(
return DB_SUCCESS_LOCKED_REC;
}
+ buf_pool.n_pend_reads++;
ut_ad(bpage->in_file());
if (sync) {
@@ -657,82 +657,42 @@ failed:
return count;
}
-/** @return whether a page has been freed */
-inline bool fil_space_t::is_freed(uint32_t page)
+/** Schedule a page for recovery.
+@param space tablespace
+@param id page identifier
+@param no_read whether the page can be recovered without reading it */
+void buf_read_recover(fil_space_t *space, const page_id_t id, bool no_read)
{
- std::lock_guard<std::mutex> freed_lock(freed_range_mutex);
- return freed_ranges.contains(page);
-}
-
-/** Issues read requests for pages which recovery wants to read in.
-@param space_id tablespace identifier
-@param page_nos page numbers to read, in ascending order */
-void buf_read_recv_pages(uint32_t space_id, st_::span<uint64_t> page_nos)
-{
- fil_space_t *space= fil_space_t::get(space_id);
-
- if (!space)
- /* The tablespace is missing or unreadable: do nothing */
- return;
-
+ ut_ad(space->id == id.space());
+ space->reacquire();
const ulint zip_size= space->zip_size() | 1;
- mysql_mutex_lock(&buf_pool.mutex);
- if (UT_LIST_GET_LEN(buf_pool.free) < page_nos.size())
- {
- mysql_mutex_unlock(&buf_pool.mutex);
- os_aio_wait_until_no_pending_reads();
- mysql_mutex_lock(&buf_pool.mutex);
- }
- buf_block_t* block= buf_LRU_get_free_block(have_mutex);
- mysql_mutex_unlock(&buf_pool.mutex);
-
- for (ulint i= 0; i < page_nos.size(); i++)
- {
- /* Ignore if the page already present in freed ranges. */
- const uint32_t page_no= static_cast<uint32_t>(page_nos[i]);
- if (space->is_freed(page_no))
- continue;
+ buf_pool_t::hash_chain &chain= buf_pool.page_hash.cell_get(id.fold());
- const page_id_t id{space_id, page_no};
- ut_ad(!buf_dblwr.is_inside(id));
+ buf_block_t *block= buf_LRU_get_free_block(have_no_mutex);
- buf_pool_t::hash_chain &chain= buf_pool.page_hash.cell_get(id.fold());
-
- if (page_nos[i] >> 63)
+ if (no_read)
+ {
+ if (buf_page_t *bpage= buf_page_init_for_read(id, zip_size, chain, block))
{
- buf_page_t *bpage= buf_page_init_for_read(id, zip_size, chain, block);
- if (!bpage)
- {
- ut_ad(block);
- continue;
- }
ut_ad(bpage->in_file());
- space->reacquire();
os_fake_read(IORequest{bpage, nullptr, UT_LIST_GET_FIRST(space->chain),
IORequest::READ_ASYNC});
- goto allocate;
+ return;
}
-
- space->reacquire();
-
- switch (buf_read_page_low(id, zip_size, chain, space, block)) {
- case DB_SUCCESS:
- allocate:
- ut_ad(!block);
- block= buf_LRU_get_free_block(have_no_mutex);
- break;
- case DB_SUCCESS_LOCKED_REC:
- break;
- default:
+ }
+ else if (dberr_t err= buf_read_page_low(id, zip_size, chain, space, block))
+ {
+ if (err != DB_SUCCESS_LOCKED_REC)
sql_print_error("InnoDB: Recovery failed to read page "
- UINT32PF " from %s", page_no, space->chain.start->name);
- }
- ut_ad(block);
+ UINT32PF " from %s",
+ id.page_no(), space->chain.start->name);
+ }
+ else
+ {
+ ut_ad(!block);
+ return;
}
- DBUG_PRINT("ib_buf", ("recovery read (%zu pages) for %s",
- page_nos.size(), space->chain.start->name));
- space->release();
- buf_read_release(block);
+ buf_LRU_block_free_non_file_page(block);
}
diff --git a/storage/innobase/include/buf0rea.h b/storage/innobase/include/buf0rea.h
index d3e47895a2e..e2a6987aba8 100644
--- a/storage/innobase/include/buf0rea.h
+++ b/storage/innobase/include/buf0rea.h
@@ -91,7 +91,8 @@ latches!
@return number of page read requests issued */
ulint buf_read_ahead_linear(const page_id_t page_id, ulint zip_size);
-/** Issue read requests for pages that need to be recovered.
-@param space_id tablespace identifier
-@param page_nos page numbers to read, in ascending order */
-void buf_read_recv_pages(uint32_t space_id, st_::span<uint64_t> page_nos);
+/** Schedule a page for recovery.
+@param space tablespace
+@param id page identifier
+@param no_read whether the page can be recovered without reading it */
+void buf_read_recover(fil_space_t *space, const page_id_t id, bool no_read);
diff --git a/storage/innobase/include/log0recv.h b/storage/innobase/include/log0recv.h
index 951694b7ae0..12a84b9a66d 100644
--- a/storage/innobase/include/log0recv.h
+++ b/storage/innobase/include/log0recv.h
@@ -38,9 +38,9 @@ Created 9/20/1997 Heikki Tuuri
#define recv_recovery_is_on() UNIV_UNLIKELY(recv_sys.recovery_on)
ATTRIBUTE_COLD MY_ATTRIBUTE((nonnull, warn_unused_result))
-/** Apply any buffered redo log to a page that was just read from a data file.
-@param[in,out] space tablespace
-@param[in,out] bpage buffer pool page
+/** Apply any buffered redo log to a page.
+@param space tablespace
+@param bpage buffer pool page
@return whether the page was recovered correctly */
bool recv_recover_page(fil_space_t* space, buf_page_t* bpage);
@@ -114,23 +114,15 @@ struct recv_dblwr_t
list pages;
};
-/** the recovery state and buffered records for a page */
+/** recv_sys.pages entry; protected by recv_sys.mutex */
struct page_recv_t
{
- /** Recovery state; protected by recv_sys.mutex */
- enum
- {
- /** not yet processed */
- RECV_NOT_PROCESSED,
- /** not processed; the page will be reinitialized */
- RECV_WILL_NOT_READ,
- /** page is being fake read */
- RECV_BEING_FAKE_READ,
- /** page is being read */
- RECV_BEING_READ,
- /** log records are being applied on the page */
- RECV_BEING_PROCESSED
- } state= RECV_NOT_PROCESSED;
+ /** Recovery status: 0=not in progress, 1=log is being applied,
+ -1=log has been applied and the entry may be erased.
+ Transitions from 1 to -1 are NOT protected by recv_sys.mutex. */
+ Atomic_relaxed<int8_t> being_processed{0};
+ /** Whether reading the page will be skipped */
+ bool skip_read= false;
/** Latest written byte offset when applying the log records.
@see mtr_t::m_last_offset */
uint16_t last_offset= 1;
@@ -175,7 +167,7 @@ struct page_recv_t
iterator end() { return NULL; }
bool empty() const { ut_ad(!head == !tail); return !head; }
/** Clear and free the records; @see recv_sys_t::add() */
- inline void clear();
+ void clear();
} log;
/** Trim old log records for a page.
@@ -184,21 +176,14 @@ struct page_recv_t
inline bool trim(lsn_t start_lsn);
/** Ignore any earlier redo log records for this page. */
inline void will_not_read();
- /** @return whether the log records for the page are being processed */
- bool is_being_processed() const { return state == RECV_BEING_PROCESSED; }
};
/** Recovery system data structure */
struct recv_sys_t
{
- /** mutex protecting apply_log_recs and page_recv_t::state */
- mysql_mutex_t mutex;
+ /** mutex protecting this as well as some of page_recv_t */
+ alignas(CPU_LEVEL1_DCACHE_LINESIZE) mysql_mutex_t mutex;
private:
- /** condition variable for
- !apply_batch_on || pages.empty() || found_corrupt_log || found_corrupt_fs */
- pthread_cond_t cond;
- /** whether recv_apply_hashed_log_recs() is running */
- bool apply_batch_on;
/** set when finding a corrupt log block or record, or there is a
log parsing buffer overflow */
bool found_corrupt_log;
@@ -232,7 +217,7 @@ public:
map pages;
private:
- /** iterator to parse, used by parse() */
+ /** iterator to pages, used by parse() */
map::iterator pages_it;
/** Process a record that indicates that a tablespace size is being shrunk.
@@ -261,21 +246,21 @@ public:
private:
/** Attempt to initialize a page based on redo log records.
- @param page_id page identifier
- @param p iterator pointing to page_id
+ @param p iterator
@param mtr mini-transaction
@param b pre-allocated buffer pool block
+ @param init_lsn LSN of the page initialization
@return the recovered block
@retval nullptr if the page cannot be initialized based on log records
@retval -1 if the page cannot be recovered due to corruption */
- inline buf_block_t *recover_low(const page_id_t page_id, map::iterator &p,
- mtr_t &mtr, buf_block_t *b);
+ inline buf_block_t *recover_low(const map::iterator &p, mtr_t &mtr,
+ buf_block_t *b, lsn_t init_lsn);
/** Attempt to initialize a page based on redo log records.
@param page_id page identifier
@return the recovered block
@retval nullptr if the page cannot be initialized based on log records
@retval -1 if the page cannot be recovered due to corruption */
- buf_block_t *recover_low(const page_id_t page_id);
+ ATTRIBUTE_COLD buf_block_t *recover_low(const page_id_t page_id);
/** All found log files (multiple ones are possible if we are upgrading
from before MariaDB Server 10.5.1) */
@@ -284,6 +269,26 @@ private:
/** Base node of the redo block list.
List elements are linked via buf_block_t::unzip_LRU. */
UT_LIST_BASE_NODE_T(buf_block_t) blocks;
+
+ /** Allocate a block from the buffer pool for recv_sys.pages */
+ ATTRIBUTE_COLD buf_block_t *add_block();
+
+ /** Wait for buffer pool to become available.
+ @param pages number of buffer pool pages needed */
+ ATTRIBUTE_COLD void wait_for_pool(size_t pages);
+
+ /** Free log for processed pages. */
+ void garbage_collect();
+
+ /** Apply a recovery batch.
+ @param space_id current tablespace identifier
+ @param space current tablespace
+ @param free_block spare buffer block
+ @param last_batch whether it is possible to write more redo log
+ @return whether the caller must provide a new free_block */
+ bool apply_batch(uint32_t space_id, fil_space_t *&space,
+ buf_block_t *&free_block, bool last_batch);
+
public:
/** Apply buffered log to persistent data pages.
@param last_batch whether it is possible to write more redo log */
@@ -363,21 +368,17 @@ public:
{ return parse_mtr<store>(if_exists); }
#endif
+ /** Erase log records for a page. */
+ void erase(map::iterator p);
+
/** Clear a fully processed set of stored redo log records. */
- inline void clear();
+ void clear();
/** Determine whether redo log recovery progress should be reported.
@param time the current time
@return whether progress should be reported
(the last report was at least 15 seconds ago) */
- bool report(time_t time)
- {
- if (time - progress_time < 15)
- return false;
-
- progress_time= time;
- return true;
- }
+ bool report(time_t time);
/** The alloc() memory alignment, in bytes */
static constexpr size_t ALIGNMENT= sizeof(size_t);
@@ -395,8 +396,6 @@ public:
ATTRIBUTE_COLD void set_corrupt_fs();
/** Flag log file corruption during recovery. */
ATTRIBUTE_COLD void set_corrupt_log();
- /** Possibly finish a recovery batch. */
- inline void maybe_finish_batch();
/** @return whether data file corruption was found */
bool is_corrupt_fs() const { return UNIV_UNLIKELY(found_corrupt_fs); }
@@ -414,13 +413,14 @@ public:
}
/** Try to recover a tablespace that was not readable earlier
- @param p iterator, initially pointing to page_id_t{space_id,0};
- the records will be freed and the iterator advanced
+ @param p iterator
@param name tablespace file name
@param free_block spare buffer block
- @return whether recovery failed */
- bool recover_deferred(map::iterator &p, const std::string &name,
- buf_block_t *&free_block);
+ @return recovered tablespace
+ @retval nullptr if recovery failed */
+ fil_space_t *recover_deferred(const map::iterator &p,
+ const std::string &name,
+ buf_block_t *&free_block);
};
/** The recovery system */
diff --git a/storage/innobase/log/log0recv.cc b/storage/innobase/log/log0recv.cc
index a700feaca00..f834a61d65d 100644
--- a/storage/innobase/log/log0recv.cc
+++ b/storage/innobase/log/log0recv.cc
@@ -726,7 +726,7 @@ static struct
{
retry:
log_sys.latch.wr_unlock();
- bool fail= false;
+ fil_space_t *space= fil_system.sys_space;
buf_block_t *free_block= buf_LRU_get_free_block(have_no_mutex);
log_sys.latch.wr_lock(SRW_LOCK_CALL);
mysql_mutex_lock(&recv_sys.mutex);
@@ -745,9 +745,9 @@ retry:
for dict_drop_index_tree(). */
while (p != recv_sys.pages.end() && p->first.space() == space_id)
{
+ ut_ad(!p->second.being_processed);
recv_sys_t::map::iterator r= p++;
- r->second.log.clear();
- recv_sys.pages.erase(r);
+ recv_sys.erase(r);
}
recv_spaces_t::iterator it{recv_spaces.find(space_id)};
if (it != recv_spaces.end())
@@ -769,11 +769,13 @@ retry:
}
}
else
- fail= recv_sys.recover_deferred(p, d->second.file_name, free_block);
+ space= recv_sys.recover_deferred(p, d->second.file_name, free_block);
processed:
- defers.erase(d++);
- if (fail)
+ auto e= d++;
+ defers.erase(e);
+ if (!space)
break;
+ space->release();
if (free_block)
continue;
mysql_mutex_unlock(&recv_sys.mutex);
@@ -784,7 +786,7 @@ processed:
mysql_mutex_unlock(&recv_sys.mutex);
if (free_block)
buf_pool.free_block(free_block);
- return fail;
+ return !space;
}
/** Create tablespace metadata for a data file that was initially
@@ -890,110 +892,6 @@ free_space:
}
deferred_spaces;
-/** Try to recover a tablespace that was not readable earlier
-@param p iterator, initially pointing to page_id_t{space_id,0};
- the records will be freed and the iterator advanced
-@param name tablespace file name
-@param free_block spare buffer block
-@return whether recovery failed */
-bool recv_sys_t::recover_deferred(recv_sys_t::map::iterator &p,
- const std::string &name,
- buf_block_t *&free_block)
-{
- mysql_mutex_assert_owner(&mutex);
-
- const page_id_t first{p->first};
- ut_ad(first.space());
-
- recv_spaces_t::iterator it{recv_spaces.find(first.space())};
- ut_ad(it != recv_spaces.end());
-
- if (!first.page_no() && p->second.state == page_recv_t::RECV_WILL_NOT_READ)
- {
- mtr_t mtr;
- buf_block_t *block= recover_low(first, p, mtr, free_block);
- ut_ad(block == free_block || block == reinterpret_cast<buf_block_t*>(-1));
- free_block= nullptr;
- if (UNIV_UNLIKELY(!block || block == reinterpret_cast<buf_block_t*>(-1)))
- goto fail;
- const byte *page= UNIV_LIKELY_NULL(block->page.zip.data)
- ? block->page.zip.data
- : block->page.frame;
- const uint32_t space_id= mach_read_from_4(page + FIL_PAGE_SPACE_ID);
- const uint32_t flags= fsp_header_get_flags(page);
- const uint32_t page_no= mach_read_from_4(page + FIL_PAGE_OFFSET);
- const uint32_t size= fsp_header_get_field(page, FSP_SIZE);
-
- ut_ad(it != recv_spaces.end());
-
- if (page_id_t{space_id, page_no} == first && size >= 4 &&
- it != recv_spaces.end() &&
- fil_space_t::is_valid_flags(flags, space_id) &&
- fil_space_t::logical_size(flags) == srv_page_size)
- {
- fil_space_t *space= deferred_spaces.create(it, name, flags,
- fil_space_read_crypt_data
- (fil_space_t::zip_size(flags),
- page), size);
- if (!space)
- goto release_and_fail;
- space->free_limit= fsp_header_get_field(page, FSP_FREE_LIMIT);
- space->free_len= flst_get_len(FSP_HEADER_OFFSET + FSP_FREE + page);
- fil_node_t *node= UT_LIST_GET_FIRST(space->chain);
- node->deferred= true;
- if (!space->acquire())
- goto release_and_fail;
- fil_names_dirty(space);
- const bool is_compressed= fil_space_t::is_compressed(flags);
-#ifdef _WIN32
- const bool is_sparse= is_compressed;
- if (is_compressed)
- os_file_set_sparse_win32(node->handle);
-#else
- const bool is_sparse= is_compressed &&
- DB_SUCCESS == os_file_punch_hole(node->handle, 0, 4096) &&
- !my_test_if_thinly_provisioned(node->handle);
-#endif
- /* Mimic fil_node_t::read_page0() in case the file exists and
- has already been extended to a larger size. */
- ut_ad(node->size == size);
- const os_offset_t file_size= os_file_get_size(node->handle);
- if (file_size != os_offset_t(-1))
- {
- const uint32_t n_pages=
- uint32_t(file_size / fil_space_t::physical_size(flags));
- if (n_pages > size)
- {
- space->size= node->size= n_pages;
- space->set_committed_size();
- goto size_set;
- }
- }
- if (!os_file_set_size(node->name, node->handle,
- (size * fil_space_t::physical_size(flags)) &
- ~4095ULL, is_sparse))
- {
- space->release();
- goto release_and_fail;
- }
- size_set:
- node->deferred= false;
- space->release();
- it->second.space= space;
- block->page.lock.x_unlock();
- return false;
- }
-
- release_and_fail:
- block->page.lock.x_unlock();
- }
-
-fail:
- ib::error() << "Cannot apply log to " << first
- << " of corrupted file '" << name << "'";
- return true;
-}
-
/** Report an operation to create, delete, or rename a file during backup.
@param[in] space_id tablespace identifier
@param[in] type redo log type
@@ -1081,6 +979,112 @@ public:
static mlog_init_t mlog_init;
+/** Try to recover a tablespace that was not readable earlier
+@param p iterator to the page
+@param name tablespace file name
+@param free_block spare buffer block
+@return recovered tablespace
+@retval nullptr if recovery failed */
+fil_space_t *recv_sys_t::recover_deferred(const recv_sys_t::map::iterator &p,
+ const std::string &name,
+ buf_block_t *&free_block)
+{
+ mysql_mutex_assert_owner(&mutex);
+
+ ut_ad(p->first.space());
+
+ recv_spaces_t::iterator it{recv_spaces.find(p->first.space())};
+ ut_ad(it != recv_spaces.end());
+
+ if (!p->first.page_no() && p->second.skip_read)
+ {
+ mtr_t mtr;
+ ut_ad(!p->second.being_processed);
+ p->second.being_processed= 1;
+ const lsn_t init_lsn= mlog_init.last(p->first);
+ mysql_mutex_unlock(&mutex);
+ buf_block_t *block= recover_low(p, mtr, free_block, init_lsn);
+ mysql_mutex_lock(&mutex);
+ p->second.being_processed= -1;
+ ut_ad(block == free_block || block == reinterpret_cast<buf_block_t*>(-1));
+ free_block= nullptr;
+ if (UNIV_UNLIKELY(!block || block == reinterpret_cast<buf_block_t*>(-1)))
+ goto fail;
+ const byte *page= UNIV_LIKELY_NULL(block->page.zip.data)
+ ? block->page.zip.data
+ : block->page.frame;
+ const uint32_t space_id= mach_read_from_4(page + FIL_PAGE_SPACE_ID);
+ const uint32_t flags= fsp_header_get_flags(page);
+ const uint32_t page_no= mach_read_from_4(page + FIL_PAGE_OFFSET);
+ const uint32_t size= fsp_header_get_field(page, FSP_SIZE);
+
+ if (page_id_t{space_id, page_no} == p->first && size >= 4 &&
+ fil_space_t::is_valid_flags(flags, space_id) &&
+ fil_space_t::logical_size(flags) == srv_page_size)
+ {
+ fil_space_t *space= deferred_spaces.create(it, name, flags,
+ fil_space_read_crypt_data
+ (fil_space_t::zip_size(flags),
+ page), size);
+ if (!space)
+ goto release_and_fail;
+ space->free_limit= fsp_header_get_field(page, FSP_FREE_LIMIT);
+ space->free_len= flst_get_len(FSP_HEADER_OFFSET + FSP_FREE + page);
+ fil_node_t *node= UT_LIST_GET_FIRST(space->chain);
+ node->deferred= true;
+ if (!space->acquire())
+ goto release_and_fail;
+ fil_names_dirty(space);
+ const bool is_compressed= fil_space_t::is_compressed(flags);
+#ifdef _WIN32
+ const bool is_sparse= is_compressed;
+ if (is_compressed)
+ os_file_set_sparse_win32(node->handle);
+#else
+ const bool is_sparse= is_compressed &&
+ DB_SUCCESS == os_file_punch_hole(node->handle, 0, 4096) &&
+ !my_test_if_thinly_provisioned(node->handle);
+#endif
+ /* Mimic fil_node_t::read_page0() in case the file exists and
+ has already been extended to a larger size. */
+ ut_ad(node->size == size);
+ const os_offset_t file_size= os_file_get_size(node->handle);
+ if (file_size != os_offset_t(-1))
+ {
+ const uint32_t n_pages=
+ uint32_t(file_size / fil_space_t::physical_size(flags));
+ if (n_pages > size)
+ {
+ space->size= node->size= n_pages;
+ space->set_committed_size();
+ goto size_set;
+ }
+ }
+ if (!os_file_set_size(node->name, node->handle,
+ (size * fil_space_t::physical_size(flags)) &
+ ~4095ULL, is_sparse))
+ {
+ space->release();
+ goto release_and_fail;
+ }
+ size_set:
+ node->deferred= false;
+ it->second.space= space;
+ block->page.lock.x_unlock();
+ p->second.being_processed= -1;
+ return space;
+ }
+
+ release_and_fail:
+ block->page.lock.x_unlock();
+ }
+
+fail:
+ ib::error() << "Cannot apply log to " << p->first
+ << " of corrupted file '" << name << "'";
+ return nullptr;
+}
+
/** Process a record that indicates that a tablespace is
being shrunk in size.
@param page_id first page identifier that is not in the file
@@ -1096,6 +1100,7 @@ inline void recv_sys_t::trim(const page_id_t page_id, lsn_t lsn)
p != pages.end() && p->first.space() == page_id.space();) {
recv_sys_t::map::iterator r = p++;
if (r->second.trim(lsn)) {
+ ut_ad(!r->second.being_processed);
pages.erase(r);
}
}
@@ -1279,7 +1284,6 @@ void recv_sys_t::close()
lsn= 0;
mysql_mutex_destroy(&mutex);
- pthread_cond_destroy(&cond);
}
recv_spaces.clear();
@@ -1294,10 +1298,8 @@ void recv_sys_t::create()
ut_ad(this == &recv_sys);
ut_ad(!is_initialised());
mysql_mutex_init(recv_sys_mutex_key, &mutex, nullptr);
- pthread_cond_init(&cond, nullptr);
apply_log_recs = false;
- apply_batch_on = false;
len = 0;
offset = 0;
@@ -1314,11 +1316,10 @@ void recv_sys_t::create()
}
/** Clear a fully processed set of stored redo log records. */
-inline void recv_sys_t::clear()
+void recv_sys_t::clear()
{
mysql_mutex_assert_owner(&mutex);
apply_log_recs= false;
- apply_batch_on= false;
ut_ad(!after_apply || found_corrupt_fs || !UT_LIST_GET_LAST(blocks));
pages.clear();
pages_it= pages.end();
@@ -1332,8 +1333,6 @@ inline void recv_sys_t::clear()
buf_block_free(block);
block= prev_block;
}
-
- pthread_cond_broadcast(&cond);
}
/** Free most recovery data structures. */
@@ -1870,7 +1869,7 @@ void page_recv_t::recs_t::rewind(lsn_t start_lsn)
}
-inline void page_recv_t::recs_t::clear()
+void page_recv_t::recs_t::clear()
{
mysql_mutex_assert_owner(&recv_sys.mutex);
for (const log_rec_t *l= head; l; )
@@ -1882,15 +1881,78 @@ inline void page_recv_t::recs_t::clear()
head= tail= nullptr;
}
-
/** Ignore any earlier redo log records for this page. */
inline void page_recv_t::will_not_read()
{
- ut_ad(state == RECV_NOT_PROCESSED || state == RECV_WILL_NOT_READ);
- state= RECV_WILL_NOT_READ;
+ ut_ad(!being_processed);
+ skip_read= true;
log.clear();
}
+void recv_sys_t::erase(map::iterator p)
+{
+ ut_ad(p->second.being_processed <= 0);
+ p->second.log.clear();
+ pages.erase(p);
+}
+
+/** Free log for processed pages. */
+void recv_sys_t::garbage_collect()
+{
+ mysql_mutex_assert_owner(&mutex);
+
+ for (map::iterator p= pages.begin(); p != pages.end(); )
+ {
+ if (p->second.being_processed < 0)
+ {
+ map::iterator r= p++;
+ erase(r);
+ }
+ else
+ p++;
+ }
+}
+
+/** Allocate a block from the buffer pool for recv_sys.pages */
+ATTRIBUTE_COLD buf_block_t *recv_sys_t::add_block()
+{
+ for (bool freed= false;;)
+ {
+ const auto rs= UT_LIST_GET_LEN(blocks) * 2;
+ mysql_mutex_lock(&buf_pool.mutex);
+ const auto bs=
+ UT_LIST_GET_LEN(buf_pool.free) + UT_LIST_GET_LEN(buf_pool.LRU);
+ if (UNIV_LIKELY(bs > BUF_LRU_MIN_LEN || rs < bs))
+ {
+ buf_block_t *block= buf_LRU_get_free_block(have_mutex);
+ mysql_mutex_unlock(&buf_pool.mutex);
+ return block;
+ }
+ /* out of memory: redo log occupies more than 1/3 of buf_pool
+ and there are fewer than BUF_LRU_MIN_LEN pages left */
+ mysql_mutex_unlock(&buf_pool.mutex);
+ if (freed)
+ return nullptr;
+ freed= true;
+ if (pages_it != pages.end() && pages_it->second.being_processed < 0)
+ pages_it= pages.end();
+ garbage_collect();
+ }
+}
+
+/** Wait for buffer pool to become available. */
+ATTRIBUTE_COLD void recv_sys_t::wait_for_pool(size_t pages)
+{
+ mysql_mutex_unlock(&mutex);
+ os_aio_wait_until_no_pending_reads();
+ mysql_mutex_lock(&mutex);
+ garbage_collect();
+ mysql_mutex_lock(&buf_pool.mutex);
+ bool need_more= UT_LIST_GET_LEN(buf_pool.free) < pages;
+ mysql_mutex_unlock(&buf_pool.mutex);
+ if (need_more)
+ buf_flush_sync_batch(lsn);
+}
/** Register a redo log snippet for a page.
@param it page iterator
@@ -1954,19 +2016,9 @@ append:
if (UNIV_UNLIKELY(!block))
{
create_block:
- const auto rs= UT_LIST_GET_LEN(blocks) * 2;
- mysql_mutex_lock(&buf_pool.mutex);
- const auto bs=
- UT_LIST_GET_LEN(buf_pool.free) + UT_LIST_GET_LEN(buf_pool.LRU);
- if (bs <= BUF_LRU_MIN_LEN && rs >= bs)
- {
- /* out of memory: redo log occupies more than 1/3 of buf_pool
- and there are fewer than BUF_LRU_MIN_LEN pages left */
- mysql_mutex_unlock(&buf_pool.mutex);
+ block= add_block();
+ if (UNIV_UNLIKELY(!block))
return true;
- }
- block= buf_LRU_get_free_block(have_mutex);
- mysql_mutex_unlock(&buf_pool.mutex);
block->page.access_time= 1U << 16 |
ut_calc_align<uint16_t>(static_cast<uint16_t>(size), ALIGNMENT);
static_assert(ut_is_2pow(ALIGNMENT), "ALIGNMENT must be a power of 2");
@@ -2267,7 +2319,6 @@ void recv_sys_t::rewind(source &l, source &begin) noexcept
{
ut_ad(srv_operation != SRV_OPERATION_BACKUP);
mysql_mutex_assert_owner(&mutex);
- pages_it= pages.end();
const source end= l;
uint32_t rlen;
@@ -2311,28 +2362,24 @@ void recv_sys_t::rewind(source &l, source &begin) noexcept
if (pages_it == pages.end() || pages_it->first != id)
{
pages_it= pages.find(id);
- if (pages_it != pages.end())
- {
- const log_phys_t *head=
- static_cast<log_phys_t*>(*pages_it->second.log.begin());
- if (!head)
- {
- erase:
- pages.erase(pages_it);
- pages_it= pages.end();
- }
- else if (head->start_lsn == lsn)
- {
- pages_it->second.log.clear();
- goto erase;
- }
- else
- pages_it->second.log.rewind(lsn);
- }
+ if (pages_it == pages.end())
+ continue;
}
+
+ ut_ad(!pages_it->second.being_processed);
+ const log_phys_t *head=
+ static_cast<log_phys_t*>(*pages_it->second.log.begin());
+ if (!head || head->start_lsn == lsn)
+ {
+ erase(pages_it);
+ pages_it= pages.end();
+ }
+ else
+ pages_it->second.log.rewind(lsn);
}
l= begin;
+ pages_it= pages.end();
}
/** Parse and register one log_t::FORMAT_10_8 mini-transaction.
@@ -2470,7 +2517,6 @@ recv_sys_t::parse_mtr_result recv_sys_t::parse(source &l, bool if_exists)
sql_print_error("InnoDB: Unknown log record at LSN " LSN_PF, lsn);
corrupted:
found_corrupt_log= true;
- pthread_cond_broadcast(&cond);
return GOT_EOF;
}
@@ -2774,9 +2820,8 @@ recv_sys_t::parse_mtr_result recv_sys_t::parse(source &l, bool if_exists)
apply(false);
goto restart;
}
- DBUG_PRINT("ib_log",
- ("Ran out of memory and last stored lsn " LSN_PF
- " last stored offset %zu\n", lsn, offset));
+ sql_print_information("InnoDB recovery ran out of memory at LSN "
+ LSN_PF, lsn);
return GOT_OOM;
}
}
@@ -2790,8 +2835,8 @@ recv_sys_t::parse_mtr_result recv_sys_t::parse(source &l, bool if_exists)
if (pages_it == pages.end())
continue;
}
- pages_it->second.log.clear();
- pages.erase(pages_it++);
+ map::iterator r= pages_it++;
+ erase(r);
}
}
else if (rlen)
@@ -2977,11 +3022,11 @@ static buf_block_t *recv_recover_page(buf_block_t *block, mtr_t &mtr,
fil_space_t *space= nullptr,
lsn_t init_lsn= 0)
{
- mysql_mutex_assert_owner(&recv_sys.mutex);
+ mysql_mutex_assert_not_owner(&recv_sys.mutex);
ut_ad(recv_sys.apply_log_recs);
ut_ad(recv_needed_recovery);
ut_ad(block->page.id() == p->first);
- ut_ad(!p->second.is_being_processed());
+ ut_ad(p->second.being_processed == 1);
ut_ad(!space || space->id == block->page.id().space());
ut_ad(log_sys.is_latest());
@@ -2993,10 +3038,6 @@ static buf_block_t *recv_recover_page(buf_block_t *block, mtr_t &mtr,
block->page.id().space(),
block->page.id().page_no()));
- p->second.state = page_recv_t::RECV_BEING_PROCESSED;
-
- mysql_mutex_unlock(&recv_sys.mutex);
-
byte *frame = UNIV_LIKELY_NULL(block->page.zip.data)
? block->page.zip.data
: block->page.frame;
@@ -3187,26 +3228,11 @@ set_start_lsn:
mtr.commit();
done:
- time_t now = time(NULL);
-
- mysql_mutex_lock(&recv_sys.mutex);
-
+ /* FIXME: do this in page read, protected with recv_sys.mutex! */
if (recv_max_page_lsn < page_lsn) {
recv_max_page_lsn = page_lsn;
}
- ut_ad(!block || p->second.is_being_processed());
- ut_ad(!block || !recv_sys.pages.empty());
-
- if (recv_sys.report(now)) {
- const size_t n = recv_sys.pages.size();
- sql_print_information("InnoDB: To recover: %zu pages from log",
- n);
- service_manager_extend_timeout(INNODB_EXTEND_TIMEOUT_INTERVAL,
- "To recover: %zu pages"
- " from log", n);
- }
-
return block;
}
@@ -3220,52 +3246,40 @@ ATTRIBUTE_COLD void recv_sys_t::free_corrupted_page(page_id_t page_id)
mysql_mutex_lock(&mutex);
map::iterator p= pages.find(page_id);
- if (p != pages.end())
+ if (p == pages.end())
{
- p->second.log.clear();
- pages.erase(p);
- if (!srv_force_recovery)
- {
- set_corrupt_fs();
- ib::error() << "Unable to apply log to corrupted page " << page_id
- << "; set innodb_force_recovery to ignore";
- }
- else
- ib::warn() << "Discarding log for corrupted page " << page_id;
+ mysql_mutex_unlock(&mutex);
+ return;
}
- if (pages.empty())
- pthread_cond_broadcast(&cond);
+ p->second.being_processed= -1;
+ if (!srv_force_recovery)
+ set_corrupt_fs();
mysql_mutex_unlock(&mutex);
-}
-/** Possibly finish a recovery batch. */
-inline void recv_sys_t::maybe_finish_batch()
-{
- mysql_mutex_assert_owner(&mutex);
- ut_ad(recovery_on);
- if (!apply_batch_on || pages.empty() || is_corrupt_log() || is_corrupt_fs())
- pthread_cond_broadcast(&cond);
+ ib::error_or_warn(!srv_force_recovery)
+ << "Unable to apply log to corrupted page " << page_id;
}
ATTRIBUTE_COLD void recv_sys_t::set_corrupt_log()
{
mysql_mutex_lock(&mutex);
found_corrupt_log= true;
- pthread_cond_broadcast(&cond);
mysql_mutex_unlock(&mutex);
}
ATTRIBUTE_COLD void recv_sys_t::set_corrupt_fs()
{
mysql_mutex_assert_owner(&mutex);
+ if (!srv_force_recovery)
+ sql_print_information("InnoDB: Set innodb_force_recovery=1"
+ " to ignore corrupted pages.");
found_corrupt_fs= true;
- pthread_cond_broadcast(&cond);
}
-/** Apply any buffered redo log to a page that was just read from a data file.
-@param[in,out] space tablespace
-@param[in,out] bpage buffer pool page
+/** Apply any buffered redo log to a page.
+@param space tablespace
+@param bpage buffer pool page
@return whether the page was recovered correctly */
bool recv_recover_page(fil_space_t* space, buf_page_t* bpage)
{
@@ -3289,23 +3303,23 @@ bool recv_recover_page(fil_space_t* space, buf_page_t* bpage)
{
const page_id_t id{bpage->id()};
recv_sys_t::map::iterator p= recv_sys.pages.find(id);
- if (p != recv_sys.pages.end() && !p->second.is_being_processed())
+ if (p == recv_sys.pages.end());
+ else if (p->second.being_processed < 0)
+ recv_sys.erase(p);
+ else
{
- success= recv_recover_page(success, mtr, p, space, p->second.state ==
- page_recv_t::RECV_BEING_FAKE_READ);
- if (UNIV_LIKELY(!!success))
- {
- p->second.log.clear();
- recv_sys.pages.erase(p);
- }
- recv_sys.maybe_finish_batch();
+ p->second.being_processed= 1;
+ const lsn_t init_lsn= p->second.skip_read ? mlog_init.last(id) : 0;
+ mysql_mutex_unlock(&recv_sys.mutex);
+ success= recv_recover_page(success, mtr, p, space, init_lsn);
+ p->second.being_processed= -1;
goto func_exit;
}
}
+ mysql_mutex_unlock(&recv_sys.mutex);
mtr.commit();
func_exit:
- mysql_mutex_unlock(&recv_sys.mutex);
ut_ad(mtr.has_committed());
return success;
}
@@ -3318,79 +3332,210 @@ void IORequest::fake_read_complete() const
ut_ad(bpage->frame);
ut_ad(recv_recovery_is_on());
- ut_d(auto n=) buf_pool.n_pend_reads--;
- ut_ad(n > 0);
-
if (recv_recover_page(node->space, bpage))
bpage->lock.x_unlock(true);
node->space->release();
}
-/** Read pages for which log needs to be applied.
-@param page_id first page identifier to read
-@param i iterator to recv_sys.pages
-@param last_batch whether it is possible to write more redo log */
-TRANSACTIONAL_TARGET
-static void recv_read_in_area(page_id_t page_id, recv_sys_t::map::iterator i,
- bool last_batch)
+
+/** @return whether a page has been freed */
+inline bool fil_space_t::is_freed(uint32_t page)
{
- static constexpr uint32_t read_area= 32;
+ std::lock_guard<std::mutex> freed_lock(freed_range_mutex);
+ return freed_ranges.contains(page);
+}
- uint64_t page_nos[read_area];
- ut_ad(page_id == i->first);
- page_id.set_page_no(ut_2pow_round(page_id.page_no(), read_area));
- const page_id_t up_limit{page_id + (read_area - 1)};
- uint64_t *p= page_nos;
+bool recv_sys_t::report(time_t time)
+{
+ if (time - progress_time < 15)
+ return false;
+ progress_time= time;
+ return true;
+}
+
+/** Apply a recovery batch.
+@param space_id current tablespace identifier
+@param space current tablespace
+@param free_block spare buffer block
+@param last_batch whether it is possible to write more redo log
+@return whether the caller must provide a new free_block */
+bool recv_sys_t::apply_batch(uint32_t space_id, fil_space_t *&space,
+ buf_block_t *&free_block, bool last_batch)
+{
+ mysql_mutex_assert_owner(&mutex);
+ ut_ad(pages_it != pages.end());
+ ut_ad(!pages_it->second.log.empty());
+
+ mysql_mutex_lock(&buf_pool.mutex);
+ size_t n= 0, max_n= std::min<size_t>(BUF_LRU_MIN_LEN,
+ UT_LIST_GET_LEN(buf_pool.LRU) +
+ UT_LIST_GET_LEN(buf_pool.free));
+ mysql_mutex_unlock(&buf_pool.mutex);
- for (; i != recv_sys.pages.end() && i->first <= up_limit; i++)
+ map::iterator begin{pages_it};
+
+ while (pages_it != pages.end() && n < max_n)
{
- switch (auto &state= i->second.state) {
- default:
+ ut_ad(!buf_dblwr.is_inside(pages_it->first));
+ const auto being_processed= pages_it->second.being_processed;
+
+ if (being_processed < 0)
+ {
+ discard:
+ map::iterator p{pages_it++};
+ erase(p);
continue;
- case page_recv_t::RECV_WILL_NOT_READ:
- state= page_recv_t::RECV_BEING_FAKE_READ;
- *p++= 1ULL << 63 | i->first.page_no();
- break;
- case page_recv_t::RECV_NOT_PROCESSED:
- state= page_recv_t::RECV_BEING_READ;
- *p++= i->first.raw();
}
+ else if (!being_processed)
+ {
+ if (space_id != pages_it->first.space())
+ {
+ space_id= pages_it->first.space();
+ if (space)
+ space->release();
+ space= fil_space_t::get(space_id);
+ if (!space)
+ {
+ auto d= deferred_spaces.defers.find(space_id);
+ if (d == deferred_spaces.defers.end() || d->second.deleted)
+ /* For deleted files we preserve the deferred_spaces entry */;
+ else if (!free_block)
+ return true;
+ else
+ {
+ space= recover_deferred(pages_it, d->second.file_name, free_block);
+ deferred_spaces.defers.erase(d);
+ if (!space && !srv_force_recovery)
+ {
+ set_corrupt_fs();
+ return false;
+ }
+ }
+ }
+ }
+ if (!space || space->is_freed(pages_it->first.page_no()))
+ goto discard;
+ if (!n++)
+ begin= pages_it;
+ }
+ pages_it++;
+ }
+
+ if (!last_batch)
+ log_sys.latch.wr_unlock();
+
+ pages_it= begin;
+ const page_id_t begin_id{pages_it->first};
+
+ if (report(time(nullptr)))
+ {
+ const size_t n= pages.size();
+ sql_print_information("InnoDB: To recover: %zu pages from log", n);
+ service_manager_extend_timeout(INNODB_EXTEND_TIMEOUT_INTERVAL,
+ "To recover: %zu pages from log", n);
}
- if (p != page_nos)
+ if (!n)
+ goto wait;
+
+ mysql_mutex_lock(&buf_pool.mutex);
+ if (free_block)
+ buf_LRU_block_free_non_file_page(free_block);
+ free_block= nullptr;
+
+ if (UNIV_UNLIKELY(UT_LIST_GET_LEN(buf_pool.free) < n))
{
- mysql_mutex_unlock(&recv_sys.mutex);
- if (!last_batch) log_sys.latch.wr_unlock();
- buf_read_recv_pages(page_id.space(), {page_nos, p});
- if (!last_batch) log_sys.latch.wr_lock(SRW_LOCK_CALL);
- mysql_mutex_lock(&recv_sys.mutex);
+ mysql_mutex_unlock(&buf_pool.mutex);
+ wait:
+ wait_for_pool(n);
+ if (n);
+ else if (!last_batch)
+ goto unlock_relock;
+ else
+ goto get_last;
+ pages_it= pages.lower_bound(begin_id);
+ ut_ad(pages_it != pages.end());
+ }
+ else
+ mysql_mutex_unlock(&buf_pool.mutex);
+
+ while (pages_it != pages.end())
+ {
+ ut_ad(!buf_dblwr.is_inside(pages_it->first));
+ const auto being_processed= pages_it->second.being_processed;
+ if (being_processed < 0)
+ {
+ discard_again:
+ map::iterator p{pages_it++};
+ erase(p);
+ }
+ else if (!being_processed)
+ {
+ const page_id_t id{pages_it->first};
+
+ if (space_id != id.space())
+ {
+ space_id= id.space();
+ if (space)
+ space->release();
+ space= fil_space_t::get(space_id);
+ }
+ if (!space || space->is_freed(id.page_no()))
+ goto discard_again;
+
+ pages_it->second.being_processed= 1;
+ mysql_mutex_unlock(&mutex);
+ buf_read_recover(space, id, pages_it->second.skip_read);
+ if (!--n)
+ {
+ if (last_batch)
+ goto relock_last;
+ goto relock;
+ }
+ mysql_mutex_lock(&mutex);
+ pages_it= pages.lower_bound(id);
+ }
+ else
+ pages_it++;
+ }
+
+ if (!last_batch)
+ {
+ unlock_relock:
+ mysql_mutex_unlock(&mutex);
+ relock:
+ log_sys.latch.wr_lock(SRW_LOCK_CALL);
+ relock_last:
+ mysql_mutex_lock(&mutex);
+ get_last:
+ pages_it= pages.lower_bound(begin_id);
}
+
+ return false;
}
/** Attempt to initialize a page based on redo log records.
-@param page_id page identifier
-@param p iterator pointing to page_id
+@param p iterator
@param mtr mini-transaction
@param b pre-allocated buffer pool block
+@param init_lsn LSN of the page initialization
@return the recovered block
@retval nullptr if the page cannot be initialized based on log records
@retval -1 if the page cannot be recovered due to corruption */
-inline buf_block_t *recv_sys_t::recover_low(const page_id_t page_id,
- map::iterator &p, mtr_t &mtr,
- buf_block_t *b)
+inline buf_block_t *recv_sys_t::recover_low(const map::iterator &p, mtr_t &mtr,
+ buf_block_t *b, lsn_t init_lsn)
{
- mysql_mutex_assert_owner(&mutex);
- ut_ad(p->first == page_id);
+ mysql_mutex_assert_not_owner(&mutex);
page_recv_t &recs= p->second;
- ut_ad(recs.state == page_recv_t::RECV_WILL_NOT_READ);
+ ut_ad(recs.skip_read);
+ ut_ad(recs.being_processed == 1);
buf_block_t* block= nullptr;
- const lsn_t init_lsn= mlog_init.last(page_id);
const lsn_t end_lsn= recs.log.last()->lsn;
if (end_lsn < init_lsn)
- DBUG_LOG("ib_log", "skip log for page " << page_id
+ DBUG_LOG("ib_log", "skip log for page " << p->first
<< " LSN " << end_lsn << " < " << init_lsn);
- fil_space_t *space= fil_space_t::get(page_id.space());
+ fil_space_t *space= fil_space_t::get(p->first.space());
mtr.start();
mtr.set_log_mode(MTR_LOG_NO_REDO);
@@ -3399,81 +3544,76 @@ inline buf_block_t *recv_sys_t::recover_low(const page_id_t page_id,
if (!space)
{
- if (page_id.page_no() != 0)
+ if (p->first.page_no() != 0)
{
nothing_recoverable:
mtr.commit();
return nullptr;
}
- auto it= recv_spaces.find(page_id.space());
+ auto it= recv_spaces.find(p->first.space());
ut_ad(it != recv_spaces.end());
uint32_t flags= it->second.flags;
zip_size= fil_space_t::zip_size(flags);
- block= buf_page_create_deferred(page_id.space(), zip_size, &mtr, b);
+ block= buf_page_create_deferred(p->first.space(), zip_size, &mtr, b);
ut_ad(block == b);
block->page.lock.x_lock_recursive();
}
else
{
- block= buf_page_create(space, page_id.page_no(), zip_size, &mtr, b);
+ block= buf_page_create(space, p->first.page_no(), zip_size, &mtr, b);
if (UNIV_UNLIKELY(block != b))
{
/* The page happened to exist in the buffer pool, or it
was just being read in. Before the exclusive page latch was acquired by
buf_page_create(), all changes to the page must have been applied. */
- ut_ad(pages.find(page_id) == pages.end());
+ ut_d(mysql_mutex_lock(&mutex));
+ ut_ad(pages.find(p->first) == pages.end());
+ ut_d(mysql_mutex_unlock(&mutex));
space->release();
goto nothing_recoverable;
}
}
- ut_ad(&recs == &pages.find(page_id)->second);
- map::iterator r= p++;
- block= recv_recover_page(block, mtr, r, space, init_lsn);
+ ut_d(mysql_mutex_lock(&mutex));
+ ut_ad(&recs == &pages.find(p->first)->second);
+ ut_d(mysql_mutex_unlock(&mutex));
+ block= recv_recover_page(block, mtr, p, space, init_lsn);
ut_ad(mtr.has_committed());
- if (block)
- {
- recs.log.clear();
- pages.erase(r);
- }
- else
- block= reinterpret_cast<buf_block_t*>(-1);
-
- if (pages.empty())
- pthread_cond_signal(&cond);
-
if (space)
space->release();
- return block;
+ return block ? block : reinterpret_cast<buf_block_t*>(-1);
}
/** Attempt to initialize a page based on redo log records.
@param page_id page identifier
@return recovered block
@retval nullptr if the page cannot be initialized based on log records */
-buf_block_t *recv_sys_t::recover_low(const page_id_t page_id)
+ATTRIBUTE_COLD buf_block_t *recv_sys_t::recover_low(const page_id_t page_id)
{
- buf_block_t *free_block= buf_LRU_get_free_block(have_no_mutex);
- buf_block_t *block= nullptr;
-
mysql_mutex_lock(&mutex);
map::iterator p= pages.find(page_id);
- if (p != pages.end() && p->second.state == page_recv_t::RECV_WILL_NOT_READ)
+ if (p != pages.end() && !p->second.being_processed && p->second.skip_read)
{
+ p->second.being_processed= 1;
+ const lsn_t init_lsn= mlog_init.last(page_id);
+ mysql_mutex_unlock(&mutex);
+ buf_block_t *free_block= buf_LRU_get_free_block(have_no_mutex);
mtr_t mtr;
- block= recover_low(page_id, p, mtr, free_block);
+ buf_block_t *block= recover_low(p, mtr, free_block, init_lsn);
+ p->second.being_processed= -1;
ut_ad(!block || block == reinterpret_cast<buf_block_t*>(-1) ||
block == free_block);
+ if (UNIV_UNLIKELY(!block))
+ buf_pool.free_block(free_block);
+ return block;
}
mysql_mutex_unlock(&mutex);
- if (UNIV_UNLIKELY(!block))
- buf_pool.free_block(free_block);
- return block;
+ return nullptr;
}
inline fil_space_t *fil_system_t::find(const char *path) const
@@ -3521,32 +3661,13 @@ void recv_sys_t::apply(bool last_batch)
mysql_mutex_assert_owner(&mutex);
- timespec abstime;
-
- while (apply_batch_on)
- {
- if (is_corrupt_log())
- return;
- if (last_batch)
- my_cond_wait(&cond, &mutex.m_mutex);
- else
- {
-#ifndef SUX_LOCK_GENERIC
- ut_ad(log_sys.latch.is_write_locked());
-#endif
- log_sys.latch.wr_unlock();
- set_timespec_nsec(abstime, 500000000ULL); /* 0.5s */
- my_cond_timedwait(&cond, &mutex.m_mutex, &abstime);
- mysql_mutex_unlock(&mutex);
- log_sys.latch.wr_lock(SRW_LOCK_CALL);
- mysql_mutex_lock(&mutex);
- }
- }
-
- mtr_t mtr;
+ if (pages_it != pages.end() && pages_it->second.being_processed < 0)
+ pages_it= pages.end();
+ garbage_collect();
if (!pages.empty())
{
+ mtr_t mtr;
const char *msg= last_batch
? "Starting final batch to recover"
: "Starting a batch to recover";
@@ -3555,7 +3676,6 @@ void recv_sys_t::apply(bool last_batch)
sd_notifyf(0, "STATUS=%s %zu pages from redo log", msg, n);
apply_log_recs= true;
- apply_batch_on= true;
for (auto id= srv_undo_tablespaces_open; id--;)
{
@@ -3581,145 +3701,76 @@ void recv_sys_t::apply(bool last_batch)
fil_system.extend_to_recv_size();
- /* We must release log_sys.latch and recv_sys.mutex before
- invoking buf_LRU_get_free_block(). Allocating a block may initiate
- a redo log write and therefore acquire log_sys.latch. To avoid
- deadlocks, log_sys.latch must not be acquired while holding
- recv_sys.mutex. */
- mysql_mutex_unlock(&mutex);
- if (!last_batch)
- log_sys.latch.wr_unlock();
-
- buf_block_t *free_block= buf_LRU_get_free_block(have_no_mutex);
-
- if (!last_batch)
- log_sys.latch.wr_lock(SRW_LOCK_CALL);
- mysql_mutex_lock(&mutex);
+ fil_space_t *space= nullptr;
+ uint32_t space_id= ~0;
- for (map::iterator p= pages.begin(); p != pages.end(); )
+ for (pages_it= pages.begin(); pages_it != pages.end();
+ pages_it= pages.begin())
{
- const page_id_t page_id= p->first;
- ut_ad(!p->second.log.empty());
-
- const uint32_t space_id= page_id.space();
- auto d= deferred_spaces.defers.find(space_id);
- if (d != deferred_spaces.defers.end())
+ mysql_mutex_lock(&buf_pool.mutex);
+ buf_block_t *free_block= pages_it == pages.begin()
+ ? nullptr
+ : buf_LRU_get_free_only();
+ if (!free_block)
{
- if (d->second.deleted)
- {
- /* For deleted files we must preserve the entry in deferred_spaces */
-erase_for_space:
- while (p != pages.end() && p->first.space() == space_id)
- {
- map::iterator r= p++;
- r->second.log.clear();
- pages.erase(r);
- }
- }
- else if (recover_deferred(p, d->second.file_name, free_block))
- {
- if (!srv_force_recovery)
- set_corrupt_fs();
- deferred_spaces.defers.erase(d);
- goto erase_for_space;
- }
- else
- deferred_spaces.defers.erase(d);
- if (!free_block)
- {
- mysql_mutex_unlock(&mutex);
- if (!last_batch)
- log_sys.latch.wr_unlock();
- free_block= buf_LRU_get_free_block(have_no_mutex);
- if (!last_batch)
- log_sys.latch.wr_lock(SRW_LOCK_CALL);
- mysql_mutex_lock(&mutex);
- }
- ut_ad(p == pages.end() || p->first > page_id);
- continue;
- }
-
- switch (p->second.state) {
- case page_recv_t::RECV_BEING_FAKE_READ:
- case page_recv_t::RECV_BEING_READ:
- case page_recv_t::RECV_BEING_PROCESSED:
- p++;
- continue;
- case page_recv_t::RECV_WILL_NOT_READ:
- case page_recv_t::RECV_NOT_PROCESSED:
- recv_read_in_area(page_id, p, last_batch);
+ mysql_mutex_unlock(&buf_pool.mutex);
+ if (!last_batch)
+ log_sys.latch.wr_unlock();
+ wait_for_pool(1);
+ pages_it= pages.begin();
+ mysql_mutex_unlock(&mutex);
+ /* We must release log_sys.latch and recv_sys.mutex before
+ invoking buf_LRU_get_free_block(). Allocating a block may initiate
+ a redo log write and therefore acquire log_sys.latch. To avoid
+ deadlocks, log_sys.latch must not be acquired while holding
+ recv_sys.mutex. */
+ free_block= buf_LRU_get_free_block(have_no_mutex);
+ if (!last_batch)
+ log_sys.latch.wr_lock(SRW_LOCK_CALL);
+ mysql_mutex_lock(&mutex);
+ pages_it= pages.begin();
}
- p= pages.lower_bound(page_id);
- /* Ensure that progress will be made. */
- ut_ad(p == pages.end() || p->first > page_id ||
- p->second.state >= page_recv_t::RECV_BEING_FAKE_READ);
- }
-
- buf_pool.free_block(free_block);
-
- /* Wait until all the pages have been processed */
- for (;;)
- {
- const bool empty= pages.empty();
- if (empty && !buf_pool.n_pend_reads)
- break;
+ else
+ mysql_mutex_unlock(&buf_pool.mutex);
- if (!is_corrupt_fs() && !is_corrupt_log())
+ while (pages_it != pages.end())
{
- if (last_batch)
+ if (is_corrupt_fs() || is_corrupt_log())
{
- if (!empty)
- my_cond_wait(&cond, &mutex.m_mutex);
- else
- {
- mysql_mutex_unlock(&mutex);
- os_aio_wait_until_no_pending_reads();
- ut_ad(!buf_pool.n_pend_reads);
- mysql_mutex_lock(&mutex);
- ut_ad(pages.empty());
- }
- }
- else
- {
-#ifndef SUX_LOCK_GENERIC
- ut_ad(log_sys.latch.is_write_locked());
-#endif
- log_sys.latch.wr_unlock();
- set_timespec_nsec(abstime, 500000000ULL); /* 0.5s */
- my_cond_timedwait(&cond, &mutex.m_mutex, &abstime);
- mysql_mutex_unlock(&mutex);
- log_sys.latch.wr_lock(SRW_LOCK_CALL);
- mysql_mutex_lock(&mutex);
+ if (space)
+ space->release();
+ return;
}
- continue;
+ if (apply_batch(space_id, space, free_block, last_batch))
+ break;
}
- if (is_corrupt_fs() && !srv_force_recovery)
- sql_print_information("InnoDB: Set innodb_force_recovery=1"
- " to ignore corrupted pages.");
- return;
+
+ garbage_collect();
}
- }
- if (!last_batch)
- log_sys.latch.wr_unlock();
+ if (space)
+ space->release();
+ }
mysql_mutex_unlock(&mutex);
- if (last_batch && srv_operation != SRV_OPERATION_RESTORE &&
- srv_operation != SRV_OPERATION_RESTORE_EXPORT)
- /* Instead of flushing, last_batch sorts the buf_pool.flush_list
- in ascending order of buf_page_t::oldest_modification. */
- log_sort_flush_list();
- else
- buf_flush_sync_batch(lsn);
-
if (!last_batch)
{
+ log_sys.latch.wr_unlock();
+ buf_flush_sync_batch(lsn);
buf_pool_invalidate();
log_sys.latch.wr_lock(SRW_LOCK_CALL);
}
+ else if (srv_operation == SRV_OPERATION_RESTORE ||
+ srv_operation == SRV_OPERATION_RESTORE_EXPORT)
+ buf_flush_sync_batch(lsn);
+ else
+ /* Instead of flushing, last_batch sorts the buf_pool.flush_list
+ in ascending order of buf_page_t::oldest_modification. */
+ log_sort_flush_list();
+
#ifdef HAVE_PMEM
- else if (log_sys.is_pmem())
+ if (last_batch && log_sys.is_pmem())
mprotect(log_sys.buf, len, PROT_READ | PROT_WRITE);
#endif
@@ -3920,7 +3971,6 @@ static bool recv_scan_log(bool last_phase)
}
}
- recv_sys.maybe_finish_batch();
if (last_phase)
{
ut_ad(!rewound_lsn);
@@ -4030,8 +4080,7 @@ next:
/* fall through */
case file_name_t::DELETED:
recv_sys_t::map::iterator r = p++;
- r->second.log.clear();
- recv_sys.pages.erase(r);
+ recv_sys.erase(r);
continue;
}
ut_ad(0);