diff options
Diffstat (limited to 'storage/innobase/trx/trx0purge.cc')
-rw-r--r-- | storage/innobase/trx/trx0purge.cc | 415 |
1 files changed, 165 insertions, 250 deletions
diff --git a/storage/innobase/trx/trx0purge.cc b/storage/innobase/trx/trx0purge.cc index b95a2ac565c..6e730faf567 100644 --- a/storage/innobase/trx/trx0purge.cc +++ b/storage/innobase/trx/trx0purge.cc @@ -43,6 +43,8 @@ Created 3/26/1996 Heikki Tuuri #include "trx0trx.h" #include <mysql/service_wsrep.h> +#include <unordered_map> + /** Maximum allowable purge history length. <=0 means 'infinite'. */ ulong srv_max_purge_lag = 0; @@ -160,10 +162,8 @@ purge_graph_build() void purge_sys_t::create() { ut_ad(this == &purge_sys); + ut_ad(!heap); ut_ad(!enabled()); - ut_ad(!event); - event= os_event_create(0); - ut_ad(event); m_paused= 0; query= purge_graph_build(); next_stored= false; @@ -176,16 +176,17 @@ void purge_sys_t::create() mutex_create(LATCH_ID_PURGE_SYS_PQ, &pq_mutex); truncate.current= NULL; truncate.last= NULL; + heap= mem_heap_create(4096); } /** Close the purge subsystem on shutdown. */ void purge_sys_t::close() { ut_ad(this == &purge_sys); - if (!event) return; + if (!heap) + return; ut_ad(!enabled()); - ut_ad(n_tasks.load(std::memory_order_relaxed) == 0); trx_t* trx = query->trx; que_graph_free(query); ut_ad(!trx->id); @@ -194,7 +195,8 @@ void purge_sys_t::close() trx_free(trx); rw_lock_free(&latch); mutex_free(&pq_mutex); - os_event_destroy(event); + mem_heap_free(heap); + heap= nullptr; } /*================ UNDO LOG HISTORY LIST =============================*/ @@ -213,15 +215,16 @@ trx_purge_add_undo_to_history(const trx_t* trx, trx_undo_t*& undo, mtr_t* mtr) || undo == trx->rsegs.m_redo.old_insert); trx_rseg_t* rseg = trx->rsegs.m_redo.rseg; ut_ad(undo->rseg == rseg); - trx_rsegf_t* rseg_header = trx_rsegf_get( + buf_block_t* rseg_header = trx_rsegf_get( rseg->space, rseg->page_no, mtr); - page_t* undo_page = trx_undo_set_state_at_finish( + buf_block_t* undo_page = trx_undo_set_state_at_finish( undo, mtr); - trx_ulogf_t* undo_header = undo_page + undo->hdr_offset; + trx_ulogf_t* undo_header = undo_page->frame + undo->hdr_offset; ut_ad(mach_read_from_2(undo_header + TRX_UNDO_NEEDS_PURGE) <= 1); - if (UNIV_UNLIKELY(mach_read_from_4(TRX_RSEG_FORMAT + rseg_header))) { + if (UNIV_UNLIKELY(mach_read_from_4(TRX_RSEG + TRX_RSEG_FORMAT + + rseg_header->frame))) { /* This database must have been upgraded from before MariaDB 10.3.5. */ trx_rseg_format_upgrade(rseg_header, mtr); @@ -230,23 +233,27 @@ trx_purge_add_undo_to_history(const trx_t* trx, trx_undo_t*& undo, mtr_t* mtr) if (undo->state != TRX_UNDO_CACHED) { /* The undo log segment will not be reused */ ut_a(undo->id < TRX_RSEG_N_SLOTS); - trx_rsegf_set_nth_undo(rseg_header, undo->id, FIL_NULL, mtr); + compile_time_assert(FIL_NULL == 0xffffffff); + mtr->memset(rseg_header, + TRX_RSEG + TRX_RSEG_UNDO_SLOTS + + undo->id * TRX_RSEG_SLOT_SIZE, 4, 0xff); MONITOR_DEC(MONITOR_NUM_UNDO_SLOT_USED); uint32_t hist_size = mach_read_from_4(TRX_RSEG_HISTORY_SIZE - + rseg_header); + + TRX_RSEG + + rseg_header->frame); ut_ad(undo->size == flst_get_len(TRX_UNDO_SEG_HDR + TRX_UNDO_PAGE_LIST - + undo_page)); - - mlog_write_ulint( - rseg_header + TRX_RSEG_HISTORY_SIZE, - hist_size + undo->size, MLOG_4BYTES, mtr); - - mlog_write_ull(rseg_header + TRX_RSEG_MAX_TRX_ID, - trx_sys.get_max_trx_id(), mtr); + + undo_page->frame)); + + mtr->write<4>(*rseg_header, TRX_RSEG + TRX_RSEG_HISTORY_SIZE + + rseg_header->frame, + hist_size + undo->size); + mtr->write<8>(*rseg_header, TRX_RSEG + TRX_RSEG_MAX_TRX_ID + + rseg_header->frame, + trx_sys.get_max_trx_id()); } /* After the purge thread has been given permission to exit, @@ -289,16 +296,18 @@ trx_purge_add_undo_to_history(const trx_t* trx, trx_undo_t*& undo, mtr_t* mtr) } /* Add the log as the first in the history list */ - flst_add_first(rseg_header + TRX_RSEG_HISTORY, - undo_header + TRX_UNDO_HISTORY_NODE, mtr); + flst_add_first(rseg_header, TRX_RSEG + TRX_RSEG_HISTORY, undo_page, + static_cast<uint16_t>(undo->hdr_offset + + TRX_UNDO_HISTORY_NODE), mtr); - mlog_write_ull(undo_header + TRX_UNDO_TRX_NO, trx->no, mtr); + mtr->write<8,mtr_t::MAYBE_NOP>(*undo_page, + undo_header + TRX_UNDO_TRX_NO, trx->no); /* This is needed for upgrading old undo log pages from before MariaDB 10.3.1. */ if (UNIV_UNLIKELY(!mach_read_from_2(undo_header + TRX_UNDO_NEEDS_PURGE))) { - mlog_write_ulint(undo_header + TRX_UNDO_NEEDS_PURGE, 1, - MLOG_2BYTES, mtr); + mtr->write<2>(*undo_page, undo_header + TRX_UNDO_NEEDS_PURGE, + 1U); } if (rseg->last_page_no == FIL_NULL) { @@ -322,19 +331,16 @@ trx_purge_add_undo_to_history(const trx_t* trx, trx_undo_t*& undo, mtr_t* mtr) } /** Remove undo log header from the history list. -@param[in,out] rseg_hdr rollback segment header -@param[in] log_hdr undo log segment header -@param[in,out] mtr mini transaction. */ -static -void -trx_purge_remove_log_hdr( - trx_rsegf_t* rseg_hdr, - trx_ulogf_t* log_hdr, - mtr_t* mtr) +@param[in,out] rseg rollback segment header page +@param[in] log undo log segment header page +@param[in] offset byte offset in the undo log segment header page +@param[in,out] mtr mini-transaction */ +static void trx_purge_remove_log_hdr(buf_block_t *rseg, buf_block_t* log, + uint16_t offset, mtr_t *mtr) { - flst_remove(rseg_hdr + TRX_RSEG_HISTORY, - log_hdr + TRX_UNDO_HISTORY_NODE, mtr); - trx_sys.rseg_history_len--; + flst_remove(rseg, TRX_RSEG + TRX_RSEG_HISTORY, + log, static_cast<uint16_t>(offset + TRX_UNDO_HISTORY_NODE), mtr); + trx_sys.rseg_history_len--; } /** Free an undo log segment, and remove the header from the history list. @@ -345,14 +351,12 @@ void trx_purge_free_segment(trx_rseg_t* rseg, fil_addr_t hdr_addr) { mtr_t mtr; - trx_rsegf_t* rseg_hdr; - page_t* undo_page; mtr.start(); mutex_enter(&rseg->mutex); - rseg_hdr = trx_rsegf_get(rseg->space, rseg->page_no, &mtr); - undo_page = trx_undo_page_get( + buf_block_t* rseg_hdr = trx_rsegf_get(rseg->space, rseg->page_no, &mtr); + buf_block_t* block = trx_undo_page_get( page_id_t(rseg->space->id, hdr_addr.page), &mtr); /* Mark the last undo log totally purged, so that if the @@ -360,12 +364,12 @@ trx_purge_free_segment(trx_rseg_t* rseg, fil_addr_t hdr_addr) again. The list of pages in the undo log tail gets inconsistent during the freeing of the segment, and therefore purge should not try to access them again. */ - mlog_write_ulint(undo_page + hdr_addr.boffset + TRX_UNDO_NEEDS_PURGE, - 0, MLOG_2BYTES, &mtr); + mtr.write<2,mtr_t::MAYBE_NOP>(*block, block->frame + hdr_addr.boffset + + TRX_UNDO_NEEDS_PURGE, 0U); while (!fseg_free_step_not_header( TRX_UNDO_SEG_HDR + TRX_UNDO_FSEG_HEADER - + undo_page, &mtr)) { + + block->frame, &mtr)) { mutex_exit(&rseg->mutex); mtr.commit(); @@ -375,7 +379,7 @@ trx_purge_free_segment(trx_rseg_t* rseg, fil_addr_t hdr_addr) rseg_hdr = trx_rsegf_get(rseg->space, rseg->page_no, &mtr); - undo_page = trx_undo_page_get( + block = trx_undo_page_get( page_id_t(rseg->space->id, hdr_addr.page), &mtr); } @@ -383,15 +387,15 @@ trx_purge_free_segment(trx_rseg_t* rseg, fil_addr_t hdr_addr) stored in the list base node tells us how big it was before we started the freeing. */ - const ulint seg_size = flst_get_len( - TRX_UNDO_SEG_HDR + TRX_UNDO_PAGE_LIST + undo_page); + const uint32_t seg_size = flst_get_len( + TRX_UNDO_SEG_HDR + TRX_UNDO_PAGE_LIST + block->frame); /* We may free the undo log segment header page; it must be freed within the same mtr as the undo log header is removed from the history list: otherwise, in case of a database crash, the segment could become inaccessible garbage in the file space. */ - trx_purge_remove_log_hdr(rseg_hdr, undo_page + hdr_addr.boffset, &mtr); + trx_purge_remove_log_hdr(rseg_hdr, block, hdr_addr.boffset, &mtr); do { @@ -401,14 +405,12 @@ trx_purge_free_segment(trx_rseg_t* rseg, fil_addr_t hdr_addr) fsp0fsp.cc. */ } while (!fseg_free_step(TRX_UNDO_SEG_HDR + TRX_UNDO_FSEG_HEADER - + undo_page, &mtr)); + + block->frame, &mtr)); - const ulint hist_size = mach_read_from_4(rseg_hdr - + TRX_RSEG_HISTORY_SIZE); - ut_ad(hist_size >= seg_size); + byte* hist = TRX_RSEG + TRX_RSEG_HISTORY_SIZE + rseg_hdr->frame; + ut_ad(mach_read_from_4(hist) >= seg_size); - mlog_write_ulint(rseg_hdr + TRX_RSEG_HISTORY_SIZE, - hist_size - seg_size, MLOG_4BYTES, &mtr); + mtr.write<4>(*rseg_hdr, hist, mach_read_from_4(hist) - seg_size); ut_ad(rseg->curr_size >= seg_size); @@ -430,10 +432,6 @@ trx_purge_truncate_rseg_history( { fil_addr_t hdr_addr; fil_addr_t prev_hdr_addr; - trx_rsegf_t* rseg_hdr; - page_t* undo_page; - trx_ulogf_t* log_hdr; - trx_usegf_t* seg_hdr; mtr_t mtr; trx_id_t undo_trx_no; @@ -441,10 +439,13 @@ trx_purge_truncate_rseg_history( ut_ad(rseg.is_persistent()); mutex_enter(&rseg.mutex); - rseg_hdr = trx_rsegf_get(rseg.space, rseg.page_no, &mtr); + buf_block_t* rseg_hdr = trx_rsegf_get(rseg.space, rseg.page_no, &mtr); + + hdr_addr = flst_get_last(TRX_RSEG + TRX_RSEG_HISTORY + + rseg_hdr->frame); + hdr_addr.boffset = static_cast<uint16_t>(hdr_addr.boffset + - TRX_UNDO_HISTORY_NODE); - hdr_addr = trx_purge_get_log_from_hist( - flst_get_last(rseg_hdr + TRX_RSEG_HISTORY, &mtr)); loop: if (hdr_addr.page == FIL_NULL) { func_exit: @@ -453,12 +454,11 @@ func_exit: return; } - undo_page = trx_undo_page_get(page_id_t(rseg.space->id, hdr_addr.page), - &mtr); - - log_hdr = undo_page + hdr_addr.boffset; - - undo_trx_no = mach_read_from_8(log_hdr + TRX_UNDO_TRX_NO); + buf_block_t* block = trx_undo_page_get(page_id_t(rseg.space->id, + hdr_addr.page), + &mtr); + undo_trx_no = mach_read_from_8(block->frame + hdr_addr.boffset + + TRX_UNDO_TRX_NO); if (undo_trx_no >= limit.trx_no()) { if (undo_trx_no == limit.trx_no()) { @@ -470,13 +470,15 @@ func_exit: goto func_exit; } - prev_hdr_addr = trx_purge_get_log_from_hist( - flst_get_prev_addr(log_hdr + TRX_UNDO_HISTORY_NODE, &mtr)); + prev_hdr_addr = flst_get_prev_addr(block->frame + hdr_addr.boffset + + TRX_UNDO_HISTORY_NODE); + prev_hdr_addr.boffset = static_cast<uint16_t>(prev_hdr_addr.boffset + - TRX_UNDO_HISTORY_NODE); - seg_hdr = undo_page + TRX_UNDO_SEG_HDR; - - if ((mach_read_from_2(seg_hdr + TRX_UNDO_STATE) == TRX_UNDO_TO_PURGE) - && (mach_read_from_2(log_hdr + TRX_UNDO_NEXT_LOG) == 0)) { + if (mach_read_from_2(TRX_UNDO_SEG_HDR + TRX_UNDO_STATE + block->frame) + == TRX_UNDO_TO_PURGE + && !mach_read_from_2(block->frame + hdr_addr.boffset + + TRX_UNDO_NEXT_LOG)) { /* We can free the whole log segment */ @@ -488,7 +490,8 @@ func_exit: trx_purge_free_segment(&rseg, hdr_addr); } else { /* Remove the log hdr from the rseg history. */ - trx_purge_remove_log_hdr(rseg_hdr, log_hdr, &mtr); + trx_purge_remove_log_hdr(rseg_hdr, block, hdr_addr.boffset, + &mtr); mutex_exit(&rseg.mutex); mtr.commit(); @@ -569,7 +572,7 @@ static void trx_purge_truncate_history() return; } - while (srv_undo_log_truncate && srv_undo_logs >= 3) { + while (srv_undo_log_truncate) { if (!purge_sys.truncate.current) { const ulint threshold = ulint(srv_max_undo_log_size >> srv_page_size_shift); @@ -675,14 +678,7 @@ not_free: mini-transaction commit and the server was killed, then discarding the to-be-trimmed pages without flushing would break crash recovery. So, we cannot avoid the write. */ - { - FlushObserver observer( - purge_sys.truncate.current, - UT_LIST_GET_FIRST(purge_sys.query->thrs) - ->graph->trx, - NULL); - buf_LRU_flush_or_remove_pages(space.id, &observer); - } + buf_LRU_flush_or_remove_pages(space.id, true); log_free_check(); @@ -698,7 +694,7 @@ not_free: const ulint size = SRV_UNDO_TABLESPACE_SIZE_IN_PAGES; mtr.start(); mtr_x_lock_space(purge_sys.truncate.current, &mtr); - fil_truncate_log(purge_sys.truncate.current, size, &mtr); + mtr.trim_pages(page_id_t(space.id, size)); fsp_header_init(purge_sys.truncate.current, size, &mtr); mutex_enter(&fil_system.mutex); purge_sys.truncate.current->size = file->size = size; @@ -827,8 +823,6 @@ static void trx_purge_rseg_get_next_history_log( ulint* n_pages_handled)/*!< in/out: number of UNDO pages handled */ { - page_t* undo_page; - trx_ulogf_t* log_hdr; fil_addr_t prev_log_addr; trx_id_t trx_no; mtr_t mtr; @@ -843,18 +837,21 @@ static void trx_purge_rseg_get_next_history_log( mtr.start(); - undo_page = trx_undo_page_get_s_latched( + const buf_block_t* undo_page = trx_undo_page_get_s_latched( page_id_t(purge_sys.rseg->space->id, purge_sys.rseg->last_page_no), &mtr); - log_hdr = undo_page + purge_sys.rseg->last_offset; + const trx_ulogf_t* log_hdr = undo_page->frame + + purge_sys.rseg->last_offset; /* Increase the purge page count by one for every handled log */ (*n_pages_handled)++; - prev_log_addr = trx_purge_get_log_from_hist( - flst_get_prev_addr(log_hdr + TRX_UNDO_HISTORY_NODE, &mtr)); + prev_log_addr = flst_get_prev_addr(log_hdr + TRX_UNDO_HISTORY_NODE); + prev_log_addr.boffset = static_cast<uint16_t>(prev_log_addr.boffset + - TRX_UNDO_HISTORY_NODE); + const bool empty = prev_log_addr.page == FIL_NULL; @@ -875,7 +872,7 @@ static void trx_purge_rseg_get_next_history_log( log_hdr = trx_undo_page_get_s_latched( page_id_t(purge_sys.rseg->space->id, prev_log_addr.page), - &mtr) + &mtr)->frame + prev_log_addr.boffset; trx_no = mach_read_from_8(log_hdr + TRX_UNDO_TRX_NO); @@ -910,8 +907,8 @@ static void trx_purge_read_undo_rec() { - ulint offset; - ulint page_no; + uint16_t offset; + uint32_t page_no; ib_uint64_t undo_no; purge_sys.hdr_offset = purge_sys.rseg->last_offset; @@ -920,13 +917,15 @@ trx_purge_read_undo_rec() if (purge_sys.rseg->needs_purge) { mtr_t mtr; mtr.start(); + buf_block_t* undo_page; if (trx_undo_rec_t* undo_rec = trx_undo_get_first_rec( - purge_sys.rseg->space, purge_sys.hdr_page_no, - purge_sys.hdr_offset, RW_S_LATCH, &mtr)) { + *purge_sys.rseg->space, purge_sys.hdr_page_no, + purge_sys.hdr_offset, RW_S_LATCH, + undo_page, &mtr)) { offset = page_offset(undo_rec); undo_no = trx_undo_rec_get_undo_no(undo_rec); - page_no = page_get_page_no(page_align(undo_rec)); + page_no = undo_page->page.id.page_no(); } else { offset = 0; undo_no = 0; @@ -976,22 +975,14 @@ trx_purge_get_next_rec( handled */ mem_heap_t* heap) /*!< in: memory heap where copied */ { - trx_undo_rec_t* rec; - trx_undo_rec_t* rec_copy; - trx_undo_rec_t* rec2; - page_t* undo_page; - page_t* page; - ulint offset; - ulint page_no; - ulint space; mtr_t mtr; ut_ad(purge_sys.next_stored); ut_ad(purge_sys.tail.trx_no() < purge_sys.view.low_limit_no()); - space = purge_sys.rseg->space->id; - page_no = purge_sys.page_no; - offset = purge_sys.offset; + const ulint space = purge_sys.rseg->space->id; + const uint32_t page_no = purge_sys.page_no; + const uint16_t offset = purge_sys.offset; if (offset == 0) { /* It is the dummy undo log record, which means that there is @@ -1008,16 +999,16 @@ trx_purge_get_next_rec( mtr_start(&mtr); - undo_page = trx_undo_page_get_s_latched(page_id_t(space, page_no), - &mtr); - - rec = undo_page + offset; + buf_block_t* undo_page = trx_undo_page_get_s_latched( + page_id_t(space, page_no), &mtr); + buf_block_t* rec2_page = undo_page; - rec2 = trx_undo_page_get_next_rec(rec, purge_sys.hdr_page_no, - purge_sys.hdr_offset); + const trx_undo_rec_t* rec2 = trx_undo_page_get_next_rec( + undo_page, offset, purge_sys.hdr_page_no, purge_sys.hdr_offset); if (rec2 == NULL) { - rec2 = trx_undo_get_next_rec(rec, purge_sys.hdr_page_no, + rec2 = trx_undo_get_next_rec(rec2_page, offset, + purge_sys.hdr_page_no, purge_sys.hdr_offset, &mtr); } @@ -1034,22 +1025,19 @@ trx_purge_get_next_rec( undo_page = trx_undo_page_get_s_latched( page_id_t(space, page_no), &mtr); - - rec = undo_page + offset; } else { - page = page_align(rec2); - - purge_sys.offset = ulint(rec2 - page); - purge_sys.page_no = page_get_page_no(page); + purge_sys.offset = page_offset(rec2); + purge_sys.page_no = rec2_page->page.id.page_no(); purge_sys.tail.undo_no = trx_undo_rec_get_undo_no(rec2); - if (undo_page != page) { + if (undo_page != rec2_page) { /* We advance to a new page of the undo log: */ (*n_pages_handled)++; } } - rec_copy = trx_undo_rec_copy(rec, heap); + trx_undo_rec_t* rec_copy = trx_undo_rec_copy(undo_page->frame + offset, + heap); mtr_commit(&mtr); @@ -1130,7 +1118,7 @@ trx_purge_attach_undo_recs(ulint n_purge_threads) node = (purge_node_t*) thr->child; ut_ad(que_node_get_type(node) == QUE_NODE_PURGE); - ut_ad(node->undo_recs == NULL); + ut_ad(node->undo_recs.empty()); ut_ad(!node->in_progress); ut_d(node->in_progress = true); } @@ -1149,11 +1137,13 @@ trx_purge_attach_undo_recs(ulint n_purge_threads) i = 0; - const ulint batch_size = srv_purge_batch_size; + const ulint batch_size = srv_purge_batch_size; + std::unordered_map<table_id_t, purge_node_t*> table_id_map; + mem_heap_empty(purge_sys.heap); while (UNIV_LIKELY(srv_undo_sources) || !srv_fast_shutdown) { purge_node_t* node; - trx_purge_rec_t* purge_rec; + trx_purge_rec_t purge_rec; ut_a(!thr->is_active); @@ -1161,9 +1151,6 @@ trx_purge_attach_undo_recs(ulint n_purge_threads) node = (purge_node_t*) thr->child; ut_a(que_node_get_type(node) == QUE_NODE_PURGE); - purge_rec = static_cast<trx_purge_rec_t*>( - mem_heap_zalloc(node->heap, sizeof(*purge_rec))); - /* Track the max {trx_id, undo_no} for truncating the UNDO logs once we have purged the records. */ @@ -1172,37 +1159,40 @@ trx_purge_attach_undo_recs(ulint n_purge_threads) } /* Fetch the next record, and advance the purge_sys.tail. */ - purge_rec->undo_rec = trx_purge_fetch_next_rec( - &purge_rec->roll_ptr, &n_pages_handled, node->heap); - - if (purge_rec->undo_rec != NULL) { - - if (node->undo_recs == NULL) { - node->undo_recs = ib_vector_create( - ib_heap_allocator_create(node->heap), - sizeof(trx_purge_rec_t), - batch_size); - } else { - ut_a(!ib_vector_is_empty(node->undo_recs)); - } + purge_rec.undo_rec = trx_purge_fetch_next_rec( + &purge_rec.roll_ptr, &n_pages_handled, + purge_sys.heap); - ib_vector_push(node->undo_recs, purge_rec); + if (purge_rec.undo_rec == NULL) { + break; + } else if (purge_rec.undo_rec == &trx_purge_dummy_rec) { + continue; + } - if (n_pages_handled >= batch_size) { + table_id_t table_id = trx_undo_rec_get_table_id( + purge_rec.undo_rec); - break; - } + purge_node_t *& table_node = table_id_map[table_id]; + + if (table_node) { + node = table_node; } else { - break; - } + thr = UT_LIST_GET_NEXT(thrs, thr); - thr = UT_LIST_GET_NEXT(thrs, thr); + if (!(++i % n_purge_threads)) { + thr = UT_LIST_GET_FIRST( + purge_sys.query->thrs); + } - if (!(++i % n_purge_threads)) { - thr = UT_LIST_GET_FIRST(purge_sys.query->thrs); + ut_a(thr != NULL); + table_node = node; } - ut_a(thr != NULL); + node->undo_recs.push(purge_rec); + + if (n_pages_handled >= batch_size) { + break; + } } ut_ad(purge_sys.head <= purge_sys.tail); @@ -1228,9 +1218,8 @@ trx_purge_dml_delay(void) without holding trx_sys.mutex. */ if (srv_max_purge_lag > 0) { - float ratio; - - ratio = float(trx_sys.rseg_history_len) / srv_max_purge_lag; + double ratio = static_cast<double>(trx_sys.rseg_history_len) / + static_cast<double>(srv_max_purge_lag); if (ratio > 1.0) { /* If the history list length exceeds the @@ -1250,51 +1239,40 @@ trx_purge_dml_delay(void) return(delay); } +extern tpool::waitable_task purge_worker_task; + /** Wait for pending purge jobs to complete. */ -static -void -trx_purge_wait_for_workers_to_complete() +static void trx_purge_wait_for_workers_to_complete() { - /* Ensure that the work queue empties out. */ - while (purge_sys.n_tasks.load(std::memory_order_acquire)) { + bool notify_wait = purge_worker_task.is_running(); - if (srv_get_task_queue_length() > 0) { - srv_release_threads(SRV_WORKER, 1); - } + if (notify_wait) + tpool::tpool_wait_begin(); - os_thread_yield(); - } + purge_worker_task.wait(); + + if(notify_wait) + tpool::tpool_wait_end(); - /* There should be no outstanding tasks as long - as the worker threads are active. */ - ut_a(srv_get_task_queue_length() == 0); + /* There should be no outstanding tasks as long + as the worker threads are active. */ + ut_ad(srv_get_task_queue_length() == 0); } -/*******************************************************************//** -This function runs a purge batch. +/** +Run a purge batch. +@param n_tasks number of purge tasks to submit to the queue +@param truncate whether to truncate the history at the end of the batch @return number of undo log pages handled in the batch */ -ulint -trx_purge( -/*======*/ - ulint n_purge_threads, /*!< in: number of purge tasks - to submit to the work queue */ - bool truncate /*!< in: truncate history if true */ -#ifdef UNIV_DEBUG - , srv_slot_t *slot /*!< in/out: purge coordinator - thread slot */ -#endif -) +ulint trx_purge(ulint n_tasks, bool truncate) { que_thr_t* thr = NULL; ulint n_pages_handled; - ut_a(n_purge_threads > 0); + ut_ad(n_tasks > 0); srv_dml_needed_delay = trx_purge_dml_delay(); - /* All submitted tasks should be completed. */ - ut_ad(purge_sys.n_tasks.load(std::memory_order_relaxed) == 0); - rw_lock_x_lock(&purge_sys.latch); trx_sys.clone_oldest_view(); rw_lock_x_unlock(&purge_sys.latch); @@ -1306,25 +1284,22 @@ trx_purge( #endif /* UNIV_DEBUG */ /* Fetch the UNDO recs that need to be purged. */ - n_pages_handled = trx_purge_attach_undo_recs(n_purge_threads); - purge_sys.n_tasks.store(n_purge_threads - 1, std::memory_order_relaxed); + n_pages_handled = trx_purge_attach_undo_recs(n_tasks); /* Submit tasks to workers queue if using multi-threaded purge. */ - for (ulint i = n_purge_threads; --i; ) { + for (ulint i = n_tasks; --i; ) { thr = que_fork_scheduler_round_robin(purge_sys.query, thr); ut_a(thr); srv_que_task_enqueue_low(thr); + srv_thread_pool->submit_task(&purge_worker_task); } thr = que_fork_scheduler_round_robin(purge_sys.query, thr); - ut_d(thr->thread_slot = slot); que_run_threads(thr); trx_purge_wait_for_workers_to_complete(); - ut_ad(purge_sys.n_tasks.load(std::memory_order_relaxed) == 0); - if (truncate) { trx_purge_truncate_history(); } @@ -1334,63 +1309,3 @@ trx_purge( return(n_pages_handled); } - -/** Stop purge during FLUSH TABLES FOR EXPORT */ -void purge_sys_t::stop() -{ - rw_lock_x_lock(&latch); - - if (!enabled()) - { - /* Shutdown must have been initiated during FLUSH TABLES FOR EXPORT. */ - ut_ad(!srv_undo_sources); - rw_lock_x_unlock(&latch); - return; - } - - ut_ad(srv_n_purge_threads > 0); - - if (m_paused++ == 0) - { - /* We need to wakeup the purge thread in case it is suspended, so - that it can acknowledge the state change. */ - const int64_t sig_count = os_event_reset(event); - rw_lock_x_unlock(&latch); - ib::info() << "Stopping purge"; - srv_purge_wakeup(); - /* Wait for purge coordinator to signal that it is suspended. */ - os_event_wait_low(event, sig_count); - MONITOR_ATOMIC_INC(MONITOR_PURGE_STOP_COUNT); - return; - } - - rw_lock_x_unlock(&latch); - - if (running()) - { - ib::info() << "Waiting for purge to stop"; - while (running()) - os_thread_sleep(10000); - } -} - -/** Resume purge at UNLOCK TABLES after FLUSH TABLES FOR EXPORT */ -void purge_sys_t::resume() -{ - if (!enabled()) - { - /* Shutdown must have been initiated during FLUSH TABLES FOR EXPORT. */ - ut_ad(!srv_undo_sources); - return; - } - - int32_t paused= m_paused--; - ut_a(paused); - - if (paused == 1) - { - ib::info() << "Resuming purge"; - srv_purge_wakeup(); - MONITOR_ATOMIC_INC(MONITOR_PURGE_RESUME_COUNT); - } -} |