summary | refs | log | tree | commit | diff
path: root/storage/innobase/trx/trx0purge.cc
diff options
context:
space:
mode:
Diffstat (limited to 'storage/innobase/trx/trx0purge.cc')
-rw-r--r--  storage/innobase/trx/trx0purge.cc  415
1 files changed, 165 insertions, 250 deletions
diff --git a/storage/innobase/trx/trx0purge.cc b/storage/innobase/trx/trx0purge.cc
index b95a2ac565c..6e730faf567 100644
--- a/storage/innobase/trx/trx0purge.cc
+++ b/storage/innobase/trx/trx0purge.cc
@@ -43,6 +43,8 @@ Created 3/26/1996 Heikki Tuuri
#include "trx0trx.h"
#include <mysql/service_wsrep.h>
+#include <unordered_map>
+
/** Maximum allowable purge history length. <=0 means 'infinite'. */
ulong srv_max_purge_lag = 0;
@@ -160,10 +162,8 @@ purge_graph_build()
void purge_sys_t::create()
{
ut_ad(this == &purge_sys);
+ ut_ad(!heap);
ut_ad(!enabled());
- ut_ad(!event);
- event= os_event_create(0);
- ut_ad(event);
m_paused= 0;
query= purge_graph_build();
next_stored= false;
@@ -176,16 +176,17 @@ void purge_sys_t::create()
mutex_create(LATCH_ID_PURGE_SYS_PQ, &pq_mutex);
truncate.current= NULL;
truncate.last= NULL;
+ heap= mem_heap_create(4096);
}
/** Close the purge subsystem on shutdown. */
void purge_sys_t::close()
{
ut_ad(this == &purge_sys);
- if (!event) return;
+ if (!heap)
+ return;
ut_ad(!enabled());
- ut_ad(n_tasks.load(std::memory_order_relaxed) == 0);
trx_t* trx = query->trx;
que_graph_free(query);
ut_ad(!trx->id);
@@ -194,7 +195,8 @@ void purge_sys_t::close()
trx_free(trx);
rw_lock_free(&latch);
mutex_free(&pq_mutex);
- os_event_destroy(event);
+ mem_heap_free(heap);
+ heap= nullptr;
}
/*================ UNDO LOG HISTORY LIST =============================*/
@@ -213,15 +215,16 @@ trx_purge_add_undo_to_history(const trx_t* trx, trx_undo_t*& undo, mtr_t* mtr)
|| undo == trx->rsegs.m_redo.old_insert);
trx_rseg_t* rseg = trx->rsegs.m_redo.rseg;
ut_ad(undo->rseg == rseg);
- trx_rsegf_t* rseg_header = trx_rsegf_get(
+ buf_block_t* rseg_header = trx_rsegf_get(
rseg->space, rseg->page_no, mtr);
- page_t* undo_page = trx_undo_set_state_at_finish(
+ buf_block_t* undo_page = trx_undo_set_state_at_finish(
undo, mtr);
- trx_ulogf_t* undo_header = undo_page + undo->hdr_offset;
+ trx_ulogf_t* undo_header = undo_page->frame + undo->hdr_offset;
ut_ad(mach_read_from_2(undo_header + TRX_UNDO_NEEDS_PURGE) <= 1);
- if (UNIV_UNLIKELY(mach_read_from_4(TRX_RSEG_FORMAT + rseg_header))) {
+ if (UNIV_UNLIKELY(mach_read_from_4(TRX_RSEG + TRX_RSEG_FORMAT
+ + rseg_header->frame))) {
/* This database must have been upgraded from
before MariaDB 10.3.5. */
trx_rseg_format_upgrade(rseg_header, mtr);
@@ -230,23 +233,27 @@ trx_purge_add_undo_to_history(const trx_t* trx, trx_undo_t*& undo, mtr_t* mtr)
if (undo->state != TRX_UNDO_CACHED) {
/* The undo log segment will not be reused */
ut_a(undo->id < TRX_RSEG_N_SLOTS);
- trx_rsegf_set_nth_undo(rseg_header, undo->id, FIL_NULL, mtr);
+ compile_time_assert(FIL_NULL == 0xffffffff);
+ mtr->memset(rseg_header,
+ TRX_RSEG + TRX_RSEG_UNDO_SLOTS
+ + undo->id * TRX_RSEG_SLOT_SIZE, 4, 0xff);
MONITOR_DEC(MONITOR_NUM_UNDO_SLOT_USED);
uint32_t hist_size = mach_read_from_4(TRX_RSEG_HISTORY_SIZE
- + rseg_header);
+ + TRX_RSEG
+ + rseg_header->frame);
ut_ad(undo->size == flst_get_len(TRX_UNDO_SEG_HDR
+ TRX_UNDO_PAGE_LIST
- + undo_page));
-
- mlog_write_ulint(
- rseg_header + TRX_RSEG_HISTORY_SIZE,
- hist_size + undo->size, MLOG_4BYTES, mtr);
-
- mlog_write_ull(rseg_header + TRX_RSEG_MAX_TRX_ID,
- trx_sys.get_max_trx_id(), mtr);
+ + undo_page->frame));
+
+ mtr->write<4>(*rseg_header, TRX_RSEG + TRX_RSEG_HISTORY_SIZE
+ + rseg_header->frame,
+ hist_size + undo->size);
+ mtr->write<8>(*rseg_header, TRX_RSEG + TRX_RSEG_MAX_TRX_ID
+ + rseg_header->frame,
+ trx_sys.get_max_trx_id());
}
/* After the purge thread has been given permission to exit,
@@ -289,16 +296,18 @@ trx_purge_add_undo_to_history(const trx_t* trx, trx_undo_t*& undo, mtr_t* mtr)
}
/* Add the log as the first in the history list */
- flst_add_first(rseg_header + TRX_RSEG_HISTORY,
- undo_header + TRX_UNDO_HISTORY_NODE, mtr);
+ flst_add_first(rseg_header, TRX_RSEG + TRX_RSEG_HISTORY, undo_page,
+ static_cast<uint16_t>(undo->hdr_offset
+ + TRX_UNDO_HISTORY_NODE), mtr);
- mlog_write_ull(undo_header + TRX_UNDO_TRX_NO, trx->no, mtr);
+ mtr->write<8,mtr_t::MAYBE_NOP>(*undo_page,
+ undo_header + TRX_UNDO_TRX_NO, trx->no);
/* This is needed for upgrading old undo log pages from
before MariaDB 10.3.1. */
if (UNIV_UNLIKELY(!mach_read_from_2(undo_header
+ TRX_UNDO_NEEDS_PURGE))) {
- mlog_write_ulint(undo_header + TRX_UNDO_NEEDS_PURGE, 1,
- MLOG_2BYTES, mtr);
+ mtr->write<2>(*undo_page, undo_header + TRX_UNDO_NEEDS_PURGE,
+ 1U);
}
if (rseg->last_page_no == FIL_NULL) {
@@ -322,19 +331,16 @@ trx_purge_add_undo_to_history(const trx_t* trx, trx_undo_t*& undo, mtr_t* mtr)
}
/** Remove undo log header from the history list.
-@param[in,out] rseg_hdr rollback segment header
-@param[in] log_hdr undo log segment header
-@param[in,out] mtr mini transaction. */
-static
-void
-trx_purge_remove_log_hdr(
- trx_rsegf_t* rseg_hdr,
- trx_ulogf_t* log_hdr,
- mtr_t* mtr)
+@param[in,out] rseg rollback segment header page
+@param[in] log undo log segment header page
+@param[in] offset byte offset in the undo log segment header page
+@param[in,out] mtr mini-transaction */
+static void trx_purge_remove_log_hdr(buf_block_t *rseg, buf_block_t* log,
+ uint16_t offset, mtr_t *mtr)
{
- flst_remove(rseg_hdr + TRX_RSEG_HISTORY,
- log_hdr + TRX_UNDO_HISTORY_NODE, mtr);
- trx_sys.rseg_history_len--;
+ flst_remove(rseg, TRX_RSEG + TRX_RSEG_HISTORY,
+ log, static_cast<uint16_t>(offset + TRX_UNDO_HISTORY_NODE), mtr);
+ trx_sys.rseg_history_len--;
}
/** Free an undo log segment, and remove the header from the history list.
@@ -345,14 +351,12 @@ void
trx_purge_free_segment(trx_rseg_t* rseg, fil_addr_t hdr_addr)
{
mtr_t mtr;
- trx_rsegf_t* rseg_hdr;
- page_t* undo_page;
mtr.start();
mutex_enter(&rseg->mutex);
- rseg_hdr = trx_rsegf_get(rseg->space, rseg->page_no, &mtr);
- undo_page = trx_undo_page_get(
+ buf_block_t* rseg_hdr = trx_rsegf_get(rseg->space, rseg->page_no, &mtr);
+ buf_block_t* block = trx_undo_page_get(
page_id_t(rseg->space->id, hdr_addr.page), &mtr);
/* Mark the last undo log totally purged, so that if the
@@ -360,12 +364,12 @@ trx_purge_free_segment(trx_rseg_t* rseg, fil_addr_t hdr_addr)
again. The list of pages in the undo log tail gets
inconsistent during the freeing of the segment, and therefore
purge should not try to access them again. */
- mlog_write_ulint(undo_page + hdr_addr.boffset + TRX_UNDO_NEEDS_PURGE,
- 0, MLOG_2BYTES, &mtr);
+ mtr.write<2,mtr_t::MAYBE_NOP>(*block, block->frame + hdr_addr.boffset
+ + TRX_UNDO_NEEDS_PURGE, 0U);
while (!fseg_free_step_not_header(
TRX_UNDO_SEG_HDR + TRX_UNDO_FSEG_HEADER
- + undo_page, &mtr)) {
+ + block->frame, &mtr)) {
mutex_exit(&rseg->mutex);
mtr.commit();
@@ -375,7 +379,7 @@ trx_purge_free_segment(trx_rseg_t* rseg, fil_addr_t hdr_addr)
rseg_hdr = trx_rsegf_get(rseg->space, rseg->page_no, &mtr);
- undo_page = trx_undo_page_get(
+ block = trx_undo_page_get(
page_id_t(rseg->space->id, hdr_addr.page), &mtr);
}
@@ -383,15 +387,15 @@ trx_purge_free_segment(trx_rseg_t* rseg, fil_addr_t hdr_addr)
stored in the list base node tells us how big it was before we
started the freeing. */
- const ulint seg_size = flst_get_len(
- TRX_UNDO_SEG_HDR + TRX_UNDO_PAGE_LIST + undo_page);
+ const uint32_t seg_size = flst_get_len(
+ TRX_UNDO_SEG_HDR + TRX_UNDO_PAGE_LIST + block->frame);
/* We may free the undo log segment header page; it must be freed
within the same mtr as the undo log header is removed from the
history list: otherwise, in case of a database crash, the segment
could become inaccessible garbage in the file space. */
- trx_purge_remove_log_hdr(rseg_hdr, undo_page + hdr_addr.boffset, &mtr);
+ trx_purge_remove_log_hdr(rseg_hdr, block, hdr_addr.boffset, &mtr);
do {
@@ -401,14 +405,12 @@ trx_purge_free_segment(trx_rseg_t* rseg, fil_addr_t hdr_addr)
fsp0fsp.cc. */
} while (!fseg_free_step(TRX_UNDO_SEG_HDR + TRX_UNDO_FSEG_HEADER
- + undo_page, &mtr));
+ + block->frame, &mtr));
- const ulint hist_size = mach_read_from_4(rseg_hdr
- + TRX_RSEG_HISTORY_SIZE);
- ut_ad(hist_size >= seg_size);
+ byte* hist = TRX_RSEG + TRX_RSEG_HISTORY_SIZE + rseg_hdr->frame;
+ ut_ad(mach_read_from_4(hist) >= seg_size);
- mlog_write_ulint(rseg_hdr + TRX_RSEG_HISTORY_SIZE,
- hist_size - seg_size, MLOG_4BYTES, &mtr);
+ mtr.write<4>(*rseg_hdr, hist, mach_read_from_4(hist) - seg_size);
ut_ad(rseg->curr_size >= seg_size);
@@ -430,10 +432,6 @@ trx_purge_truncate_rseg_history(
{
fil_addr_t hdr_addr;
fil_addr_t prev_hdr_addr;
- trx_rsegf_t* rseg_hdr;
- page_t* undo_page;
- trx_ulogf_t* log_hdr;
- trx_usegf_t* seg_hdr;
mtr_t mtr;
trx_id_t undo_trx_no;
@@ -441,10 +439,13 @@ trx_purge_truncate_rseg_history(
ut_ad(rseg.is_persistent());
mutex_enter(&rseg.mutex);
- rseg_hdr = trx_rsegf_get(rseg.space, rseg.page_no, &mtr);
+ buf_block_t* rseg_hdr = trx_rsegf_get(rseg.space, rseg.page_no, &mtr);
+
+ hdr_addr = flst_get_last(TRX_RSEG + TRX_RSEG_HISTORY
+ + rseg_hdr->frame);
+ hdr_addr.boffset = static_cast<uint16_t>(hdr_addr.boffset
+ - TRX_UNDO_HISTORY_NODE);
- hdr_addr = trx_purge_get_log_from_hist(
- flst_get_last(rseg_hdr + TRX_RSEG_HISTORY, &mtr));
loop:
if (hdr_addr.page == FIL_NULL) {
func_exit:
@@ -453,12 +454,11 @@ func_exit:
return;
}
- undo_page = trx_undo_page_get(page_id_t(rseg.space->id, hdr_addr.page),
- &mtr);
-
- log_hdr = undo_page + hdr_addr.boffset;
-
- undo_trx_no = mach_read_from_8(log_hdr + TRX_UNDO_TRX_NO);
+ buf_block_t* block = trx_undo_page_get(page_id_t(rseg.space->id,
+ hdr_addr.page),
+ &mtr);
+ undo_trx_no = mach_read_from_8(block->frame + hdr_addr.boffset
+ + TRX_UNDO_TRX_NO);
if (undo_trx_no >= limit.trx_no()) {
if (undo_trx_no == limit.trx_no()) {
@@ -470,13 +470,15 @@ func_exit:
goto func_exit;
}
- prev_hdr_addr = trx_purge_get_log_from_hist(
- flst_get_prev_addr(log_hdr + TRX_UNDO_HISTORY_NODE, &mtr));
+ prev_hdr_addr = flst_get_prev_addr(block->frame + hdr_addr.boffset
+ + TRX_UNDO_HISTORY_NODE);
+ prev_hdr_addr.boffset = static_cast<uint16_t>(prev_hdr_addr.boffset
+ - TRX_UNDO_HISTORY_NODE);
- seg_hdr = undo_page + TRX_UNDO_SEG_HDR;
-
- if ((mach_read_from_2(seg_hdr + TRX_UNDO_STATE) == TRX_UNDO_TO_PURGE)
- && (mach_read_from_2(log_hdr + TRX_UNDO_NEXT_LOG) == 0)) {
+ if (mach_read_from_2(TRX_UNDO_SEG_HDR + TRX_UNDO_STATE + block->frame)
+ == TRX_UNDO_TO_PURGE
+ && !mach_read_from_2(block->frame + hdr_addr.boffset
+ + TRX_UNDO_NEXT_LOG)) {
/* We can free the whole log segment */
@@ -488,7 +490,8 @@ func_exit:
trx_purge_free_segment(&rseg, hdr_addr);
} else {
/* Remove the log hdr from the rseg history. */
- trx_purge_remove_log_hdr(rseg_hdr, log_hdr, &mtr);
+ trx_purge_remove_log_hdr(rseg_hdr, block, hdr_addr.boffset,
+ &mtr);
mutex_exit(&rseg.mutex);
mtr.commit();
@@ -569,7 +572,7 @@ static void trx_purge_truncate_history()
return;
}
- while (srv_undo_log_truncate && srv_undo_logs >= 3) {
+ while (srv_undo_log_truncate) {
if (!purge_sys.truncate.current) {
const ulint threshold = ulint(srv_max_undo_log_size
>> srv_page_size_shift);
@@ -675,14 +678,7 @@ not_free:
mini-transaction commit and the server was killed, then
discarding the to-be-trimmed pages without flushing would
break crash recovery. So, we cannot avoid the write. */
- {
- FlushObserver observer(
- purge_sys.truncate.current,
- UT_LIST_GET_FIRST(purge_sys.query->thrs)
- ->graph->trx,
- NULL);
- buf_LRU_flush_or_remove_pages(space.id, &observer);
- }
+ buf_LRU_flush_or_remove_pages(space.id, true);
log_free_check();
@@ -698,7 +694,7 @@ not_free:
const ulint size = SRV_UNDO_TABLESPACE_SIZE_IN_PAGES;
mtr.start();
mtr_x_lock_space(purge_sys.truncate.current, &mtr);
- fil_truncate_log(purge_sys.truncate.current, size, &mtr);
+ mtr.trim_pages(page_id_t(space.id, size));
fsp_header_init(purge_sys.truncate.current, size, &mtr);
mutex_enter(&fil_system.mutex);
purge_sys.truncate.current->size = file->size = size;
@@ -827,8 +823,6 @@ static void trx_purge_rseg_get_next_history_log(
ulint* n_pages_handled)/*!< in/out: number of UNDO pages
handled */
{
- page_t* undo_page;
- trx_ulogf_t* log_hdr;
fil_addr_t prev_log_addr;
trx_id_t trx_no;
mtr_t mtr;
@@ -843,18 +837,21 @@ static void trx_purge_rseg_get_next_history_log(
mtr.start();
- undo_page = trx_undo_page_get_s_latched(
+ const buf_block_t* undo_page = trx_undo_page_get_s_latched(
page_id_t(purge_sys.rseg->space->id,
purge_sys.rseg->last_page_no), &mtr);
- log_hdr = undo_page + purge_sys.rseg->last_offset;
+ const trx_ulogf_t* log_hdr = undo_page->frame
+ + purge_sys.rseg->last_offset;
/* Increase the purge page count by one for every handled log */
(*n_pages_handled)++;
- prev_log_addr = trx_purge_get_log_from_hist(
- flst_get_prev_addr(log_hdr + TRX_UNDO_HISTORY_NODE, &mtr));
+ prev_log_addr = flst_get_prev_addr(log_hdr + TRX_UNDO_HISTORY_NODE);
+ prev_log_addr.boffset = static_cast<uint16_t>(prev_log_addr.boffset
+ - TRX_UNDO_HISTORY_NODE);
+
const bool empty = prev_log_addr.page == FIL_NULL;
@@ -875,7 +872,7 @@ static void trx_purge_rseg_get_next_history_log(
log_hdr = trx_undo_page_get_s_latched(
page_id_t(purge_sys.rseg->space->id, prev_log_addr.page),
- &mtr)
+ &mtr)->frame
+ prev_log_addr.boffset;
trx_no = mach_read_from_8(log_hdr + TRX_UNDO_TRX_NO);
@@ -910,8 +907,8 @@ static
void
trx_purge_read_undo_rec()
{
- ulint offset;
- ulint page_no;
+ uint16_t offset;
+ uint32_t page_no;
ib_uint64_t undo_no;
purge_sys.hdr_offset = purge_sys.rseg->last_offset;
@@ -920,13 +917,15 @@ trx_purge_read_undo_rec()
if (purge_sys.rseg->needs_purge) {
mtr_t mtr;
mtr.start();
+ buf_block_t* undo_page;
if (trx_undo_rec_t* undo_rec = trx_undo_get_first_rec(
- purge_sys.rseg->space, purge_sys.hdr_page_no,
- purge_sys.hdr_offset, RW_S_LATCH, &mtr)) {
+ *purge_sys.rseg->space, purge_sys.hdr_page_no,
+ purge_sys.hdr_offset, RW_S_LATCH,
+ undo_page, &mtr)) {
offset = page_offset(undo_rec);
undo_no = trx_undo_rec_get_undo_no(undo_rec);
- page_no = page_get_page_no(page_align(undo_rec));
+ page_no = undo_page->page.id.page_no();
} else {
offset = 0;
undo_no = 0;
@@ -976,22 +975,14 @@ trx_purge_get_next_rec(
handled */
mem_heap_t* heap) /*!< in: memory heap where copied */
{
- trx_undo_rec_t* rec;
- trx_undo_rec_t* rec_copy;
- trx_undo_rec_t* rec2;
- page_t* undo_page;
- page_t* page;
- ulint offset;
- ulint page_no;
- ulint space;
mtr_t mtr;
ut_ad(purge_sys.next_stored);
ut_ad(purge_sys.tail.trx_no() < purge_sys.view.low_limit_no());
- space = purge_sys.rseg->space->id;
- page_no = purge_sys.page_no;
- offset = purge_sys.offset;
+ const ulint space = purge_sys.rseg->space->id;
+ const uint32_t page_no = purge_sys.page_no;
+ const uint16_t offset = purge_sys.offset;
if (offset == 0) {
/* It is the dummy undo log record, which means that there is
@@ -1008,16 +999,16 @@ trx_purge_get_next_rec(
mtr_start(&mtr);
- undo_page = trx_undo_page_get_s_latched(page_id_t(space, page_no),
- &mtr);
-
- rec = undo_page + offset;
+ buf_block_t* undo_page = trx_undo_page_get_s_latched(
+ page_id_t(space, page_no), &mtr);
+ buf_block_t* rec2_page = undo_page;
- rec2 = trx_undo_page_get_next_rec(rec, purge_sys.hdr_page_no,
- purge_sys.hdr_offset);
+ const trx_undo_rec_t* rec2 = trx_undo_page_get_next_rec(
+ undo_page, offset, purge_sys.hdr_page_no, purge_sys.hdr_offset);
if (rec2 == NULL) {
- rec2 = trx_undo_get_next_rec(rec, purge_sys.hdr_page_no,
+ rec2 = trx_undo_get_next_rec(rec2_page, offset,
+ purge_sys.hdr_page_no,
purge_sys.hdr_offset, &mtr);
}
@@ -1034,22 +1025,19 @@ trx_purge_get_next_rec(
undo_page = trx_undo_page_get_s_latched(
page_id_t(space, page_no), &mtr);
-
- rec = undo_page + offset;
} else {
- page = page_align(rec2);
-
- purge_sys.offset = ulint(rec2 - page);
- purge_sys.page_no = page_get_page_no(page);
+ purge_sys.offset = page_offset(rec2);
+ purge_sys.page_no = rec2_page->page.id.page_no();
purge_sys.tail.undo_no = trx_undo_rec_get_undo_no(rec2);
- if (undo_page != page) {
+ if (undo_page != rec2_page) {
/* We advance to a new page of the undo log: */
(*n_pages_handled)++;
}
}
- rec_copy = trx_undo_rec_copy(rec, heap);
+ trx_undo_rec_t* rec_copy = trx_undo_rec_copy(undo_page->frame + offset,
+ heap);
mtr_commit(&mtr);
@@ -1130,7 +1118,7 @@ trx_purge_attach_undo_recs(ulint n_purge_threads)
node = (purge_node_t*) thr->child;
ut_ad(que_node_get_type(node) == QUE_NODE_PURGE);
- ut_ad(node->undo_recs == NULL);
+ ut_ad(node->undo_recs.empty());
ut_ad(!node->in_progress);
ut_d(node->in_progress = true);
}
@@ -1149,11 +1137,13 @@ trx_purge_attach_undo_recs(ulint n_purge_threads)
i = 0;
- const ulint batch_size = srv_purge_batch_size;
+ const ulint batch_size = srv_purge_batch_size;
+ std::unordered_map<table_id_t, purge_node_t*> table_id_map;
+ mem_heap_empty(purge_sys.heap);
while (UNIV_LIKELY(srv_undo_sources) || !srv_fast_shutdown) {
purge_node_t* node;
- trx_purge_rec_t* purge_rec;
+ trx_purge_rec_t purge_rec;
ut_a(!thr->is_active);
@@ -1161,9 +1151,6 @@ trx_purge_attach_undo_recs(ulint n_purge_threads)
node = (purge_node_t*) thr->child;
ut_a(que_node_get_type(node) == QUE_NODE_PURGE);
- purge_rec = static_cast<trx_purge_rec_t*>(
- mem_heap_zalloc(node->heap, sizeof(*purge_rec)));
-
/* Track the max {trx_id, undo_no} for truncating the
UNDO logs once we have purged the records. */
@@ -1172,37 +1159,40 @@ trx_purge_attach_undo_recs(ulint n_purge_threads)
}
/* Fetch the next record, and advance the purge_sys.tail. */
- purge_rec->undo_rec = trx_purge_fetch_next_rec(
- &purge_rec->roll_ptr, &n_pages_handled, node->heap);
-
- if (purge_rec->undo_rec != NULL) {
-
- if (node->undo_recs == NULL) {
- node->undo_recs = ib_vector_create(
- ib_heap_allocator_create(node->heap),
- sizeof(trx_purge_rec_t),
- batch_size);
- } else {
- ut_a(!ib_vector_is_empty(node->undo_recs));
- }
+ purge_rec.undo_rec = trx_purge_fetch_next_rec(
+ &purge_rec.roll_ptr, &n_pages_handled,
+ purge_sys.heap);
- ib_vector_push(node->undo_recs, purge_rec);
+ if (purge_rec.undo_rec == NULL) {
+ break;
+ } else if (purge_rec.undo_rec == &trx_purge_dummy_rec) {
+ continue;
+ }
- if (n_pages_handled >= batch_size) {
+ table_id_t table_id = trx_undo_rec_get_table_id(
+ purge_rec.undo_rec);
- break;
- }
+ purge_node_t *& table_node = table_id_map[table_id];
+
+ if (table_node) {
+ node = table_node;
} else {
- break;
- }
+ thr = UT_LIST_GET_NEXT(thrs, thr);
- thr = UT_LIST_GET_NEXT(thrs, thr);
+ if (!(++i % n_purge_threads)) {
+ thr = UT_LIST_GET_FIRST(
+ purge_sys.query->thrs);
+ }
- if (!(++i % n_purge_threads)) {
- thr = UT_LIST_GET_FIRST(purge_sys.query->thrs);
+ ut_a(thr != NULL);
+ table_node = node;
}
- ut_a(thr != NULL);
+ node->undo_recs.push(purge_rec);
+
+ if (n_pages_handled >= batch_size) {
+ break;
+ }
}
ut_ad(purge_sys.head <= purge_sys.tail);
@@ -1228,9 +1218,8 @@ trx_purge_dml_delay(void)
without holding trx_sys.mutex. */
if (srv_max_purge_lag > 0) {
- float ratio;
-
- ratio = float(trx_sys.rseg_history_len) / srv_max_purge_lag;
+ double ratio = static_cast<double>(trx_sys.rseg_history_len) /
+ static_cast<double>(srv_max_purge_lag);
if (ratio > 1.0) {
/* If the history list length exceeds the
@@ -1250,51 +1239,40 @@ trx_purge_dml_delay(void)
return(delay);
}
+extern tpool::waitable_task purge_worker_task;
+
/** Wait for pending purge jobs to complete. */
-static
-void
-trx_purge_wait_for_workers_to_complete()
+static void trx_purge_wait_for_workers_to_complete()
{
- /* Ensure that the work queue empties out. */
- while (purge_sys.n_tasks.load(std::memory_order_acquire)) {
+ bool notify_wait = purge_worker_task.is_running();
- if (srv_get_task_queue_length() > 0) {
- srv_release_threads(SRV_WORKER, 1);
- }
+ if (notify_wait)
+ tpool::tpool_wait_begin();
- os_thread_yield();
- }
+ purge_worker_task.wait();
+
+ if(notify_wait)
+ tpool::tpool_wait_end();
- /* There should be no outstanding tasks as long
- as the worker threads are active. */
- ut_a(srv_get_task_queue_length() == 0);
+ /* There should be no outstanding tasks as long
+ as the worker threads are active. */
+ ut_ad(srv_get_task_queue_length() == 0);
}
-/*******************************************************************//**
-This function runs a purge batch.
+/**
+Run a purge batch.
+@param n_tasks number of purge tasks to submit to the queue
+@param truncate whether to truncate the history at the end of the batch
@return number of undo log pages handled in the batch */
-ulint
-trx_purge(
-/*======*/
- ulint n_purge_threads, /*!< in: number of purge tasks
- to submit to the work queue */
- bool truncate /*!< in: truncate history if true */
-#ifdef UNIV_DEBUG
- , srv_slot_t *slot /*!< in/out: purge coordinator
- thread slot */
-#endif
-)
+ulint trx_purge(ulint n_tasks, bool truncate)
{
que_thr_t* thr = NULL;
ulint n_pages_handled;
- ut_a(n_purge_threads > 0);
+ ut_ad(n_tasks > 0);
srv_dml_needed_delay = trx_purge_dml_delay();
- /* All submitted tasks should be completed. */
- ut_ad(purge_sys.n_tasks.load(std::memory_order_relaxed) == 0);
-
rw_lock_x_lock(&purge_sys.latch);
trx_sys.clone_oldest_view();
rw_lock_x_unlock(&purge_sys.latch);
@@ -1306,25 +1284,22 @@ trx_purge(
#endif /* UNIV_DEBUG */
/* Fetch the UNDO recs that need to be purged. */
- n_pages_handled = trx_purge_attach_undo_recs(n_purge_threads);
- purge_sys.n_tasks.store(n_purge_threads - 1, std::memory_order_relaxed);
+ n_pages_handled = trx_purge_attach_undo_recs(n_tasks);
/* Submit tasks to workers queue if using multi-threaded purge. */
- for (ulint i = n_purge_threads; --i; ) {
+ for (ulint i = n_tasks; --i; ) {
thr = que_fork_scheduler_round_robin(purge_sys.query, thr);
ut_a(thr);
srv_que_task_enqueue_low(thr);
+ srv_thread_pool->submit_task(&purge_worker_task);
}
thr = que_fork_scheduler_round_robin(purge_sys.query, thr);
- ut_d(thr->thread_slot = slot);
que_run_threads(thr);
trx_purge_wait_for_workers_to_complete();
- ut_ad(purge_sys.n_tasks.load(std::memory_order_relaxed) == 0);
-
if (truncate) {
trx_purge_truncate_history();
}
@@ -1334,63 +1309,3 @@ trx_purge(
return(n_pages_handled);
}
-
-/** Stop purge during FLUSH TABLES FOR EXPORT */
-void purge_sys_t::stop()
-{
- rw_lock_x_lock(&latch);
-
- if (!enabled())
- {
- /* Shutdown must have been initiated during FLUSH TABLES FOR EXPORT. */
- ut_ad(!srv_undo_sources);
- rw_lock_x_unlock(&latch);
- return;
- }
-
- ut_ad(srv_n_purge_threads > 0);
-
- if (m_paused++ == 0)
- {
- /* We need to wakeup the purge thread in case it is suspended, so
- that it can acknowledge the state change. */
- const int64_t sig_count = os_event_reset(event);
- rw_lock_x_unlock(&latch);
- ib::info() << "Stopping purge";
- srv_purge_wakeup();
- /* Wait for purge coordinator to signal that it is suspended. */
- os_event_wait_low(event, sig_count);
- MONITOR_ATOMIC_INC(MONITOR_PURGE_STOP_COUNT);
- return;
- }
-
- rw_lock_x_unlock(&latch);
-
- if (running())
- {
- ib::info() << "Waiting for purge to stop";
- while (running())
- os_thread_sleep(10000);
- }
-}
-
-/** Resume purge at UNLOCK TABLES after FLUSH TABLES FOR EXPORT */
-void purge_sys_t::resume()
-{
- if (!enabled())
- {
- /* Shutdown must have been initiated during FLUSH TABLES FOR EXPORT. */
- ut_ad(!srv_undo_sources);
- return;
- }
-
- int32_t paused= m_paused--;
- ut_a(paused);
-
- if (paused == 1)
- {
- ib::info() << "Resuming purge";
- srv_purge_wakeup();
- MONITOR_ATOMIC_INC(MONITOR_PURGE_RESUME_COUNT);
- }
-}