Diffstat (limited to 'storage/innobase/trx/trx0purge.cc')
-rw-r--r-- | storage/innobase/trx/trx0purge.cc | 1371
1 files changed, 1371 insertions, 0 deletions
diff --git a/storage/innobase/trx/trx0purge.cc b/storage/innobase/trx/trx0purge.cc
new file mode 100644
index 00000000000..62c140879aa
--- /dev/null
+++ b/storage/innobase/trx/trx0purge.cc
@@ -0,0 +1,1371 @@
+/*****************************************************************************
+
+Copyright (c) 1996, 2012, Oracle and/or its affiliates. All Rights Reserved.
+
+This program is free software; you can redistribute it and/or modify it under
+the terms of the GNU General Public License as published by the Free Software
+Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful, but WITHOUT
+ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License along with
+this program; if not, write to the Free Software Foundation, Inc.,
+51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
+
+*****************************************************************************/
+
+/**************************************************//**
+@file trx/trx0purge.cc
+Purge old versions
+
+Created 3/26/1996 Heikki Tuuri
+*******************************************************/
+
+#include "trx0purge.h"
+
+#ifdef UNIV_NONINL
+#include "trx0purge.ic"
+#endif
+
+#include "fsp0fsp.h"
+#include "mach0data.h"
+#include "trx0rseg.h"
+#include "trx0trx.h"
+#include "trx0roll.h"
+#include "read0read.h"
+#include "fut0fut.h"
+#include "que0que.h"
+#include "row0purge.h"
+#include "row0upd.h"
+#include "trx0rec.h"
+#include "srv0srv.h"
+#include "srv0start.h"
+#include "os0thread.h"
+#include "srv0mon.h"
+#include "mtr0log.h"
+
+/** Maximum allowable purge history length. <=0 means 'infinite'. */
+UNIV_INTERN ulong	srv_max_purge_lag = 0;
+
+/** Max DML user threads delay in micro-seconds. */
+UNIV_INTERN ulong	srv_max_purge_lag_delay = 0;
+
+/** The global data structure coordinating a purge */
+UNIV_INTERN trx_purge_t*	purge_sys = NULL;
+
+/** A dummy undo record used as a return value when we have a whole undo log
+which needs no purge */
+UNIV_INTERN trx_undo_rec_t	trx_purge_dummy_rec;
+
+#ifdef UNIV_PFS_RWLOCK
+/* Key to register trx_purge_latch with performance schema */
+UNIV_INTERN mysql_pfs_key_t	trx_purge_latch_key;
+#endif /* UNIV_PFS_RWLOCK */
+
+#ifdef UNIV_PFS_MUTEX
+/* Key to register purge_sys_bh_mutex with performance schema */
+UNIV_INTERN mysql_pfs_key_t	purge_sys_bh_mutex_key;
+#endif /* UNIV_PFS_MUTEX */
+
+/********************************************************************//**
+Fetches the next undo log record from the history list to purge. It must be
+released with the corresponding release function.
+@return copy of an undo log record or pointer to trx_purge_dummy_rec,
+if the whole undo log can be skipped in purge; NULL if none left */
+static
+trx_undo_rec_t*
+trx_purge_fetch_next_rec(
+/*=====================*/
+	roll_ptr_t*	roll_ptr,	/*!< out: roll pointer to undo record */
+	ulint*		n_pages_handled,/*!< in/out: number of UNDO log pages
+					handled */
+	mem_heap_t*	heap);		/*!< in: memory heap where copied */
+
+/****************************************************************//**
+Builds a purge 'query' graph. The actual purge is performed by executing
+this query graph.
+@return own: the query graph */
+static
+que_t*
+trx_purge_graph_build(
+/*==================*/
+	trx_t*		trx,		/*!< in: transaction */
+	ulint		n_purge_threads)/*!< in: number of purge
+					threads */
+{
+	ulint		i;
+	mem_heap_t*	heap;
+	que_fork_t*	fork;
+
+	heap = mem_heap_create(512);
+	fork = que_fork_create(NULL, NULL, QUE_FORK_PURGE, heap);
+	fork->trx = trx;
+
+	for (i = 0; i < n_purge_threads; ++i) {
+		que_thr_t*	thr;
+
+		thr = que_thr_create(fork, heap);
+
+		thr->child = row_purge_node_create(thr, heap);
+	}
+
+	return(fork);
+}
+
+/********************************************************************//**
+Creates the global purge system control structure and inits the history
+mutex. */
+UNIV_INTERN
+void
+trx_purge_sys_create(
+/*=================*/
+	ulint		n_purge_threads,	/*!< in: number of purge
+						threads */
+	ib_bh_t*	ib_bh)			/*!< in, own: UNDO log min
+						binary heap */
+{
+	purge_sys = static_cast<trx_purge_t*>(mem_zalloc(sizeof(*purge_sys)));
+
+	purge_sys->state = PURGE_STATE_INIT;
+	purge_sys->event = os_event_create("purge");
+
+	/* Take ownership of ib_bh, we are responsible for freeing it. */
+	purge_sys->ib_bh = ib_bh;
+
+	rw_lock_create(trx_purge_latch_key,
+		       &purge_sys->latch, SYNC_PURGE_LATCH);
+
+	mutex_create(
+		purge_sys_bh_mutex_key, &purge_sys->bh_mutex,
+		SYNC_PURGE_QUEUE);
+
+	purge_sys->heap = mem_heap_create(256);
+
+	ut_a(n_purge_threads > 0);
+
+	purge_sys->sess = sess_open();
+
+	purge_sys->trx = purge_sys->sess->trx;
+
+	ut_a(purge_sys->trx->sess == purge_sys->sess);
+
+	/* A purge transaction is not a real transaction, we use a transaction
+	here only because the query threads code requires it. It is otherwise
+	quite unnecessary. We should get rid of it eventually. */
+	purge_sys->trx->id = 0;
+	purge_sys->trx->start_time = ut_time();
+	purge_sys->trx->state = TRX_STATE_ACTIVE;
+	purge_sys->trx->op_info = "purge trx";
+
+	purge_sys->query = trx_purge_graph_build(
+		purge_sys->trx, n_purge_threads);
+
+	purge_sys->view = read_view_purge_open(purge_sys->heap);
+}
+
+/************************************************************************
+Frees the global purge system control structure. */
+UNIV_INTERN
+void
+trx_purge_sys_close(void)
+/*======================*/
+{
+	que_graph_free(purge_sys->query);
+
+	ut_a(purge_sys->trx->id == 0);
+	ut_a(purge_sys->sess->trx == purge_sys->trx);
+
+	purge_sys->trx->state = TRX_STATE_NOT_STARTED;
+
+	sess_close(purge_sys->sess);
+
+	purge_sys->sess = NULL;
+
+	purge_sys->view = NULL;
+
+	rw_lock_free(&purge_sys->latch);
+	mutex_free(&purge_sys->bh_mutex);
+
+	mem_heap_free(purge_sys->heap);
+
+	ib_bh_free(purge_sys->ib_bh);
+
+	os_event_free(purge_sys->event);
+
+	purge_sys->event = NULL;
+
+	mem_free(purge_sys);
+
+	purge_sys = NULL;
+}
+
+/*================ UNDO LOG HISTORY LIST =============================*/
+
+/********************************************************************//**
+Adds the update undo log as the first log in the history list. Removes the
+update undo log segment from the rseg slot if it is too big for reuse. */
+UNIV_INTERN
+void
+trx_purge_add_update_undo_to_history(
+/*=================================*/
+	trx_t*	trx,		/*!< in: transaction */
+	page_t*	undo_page,	/*!< in: update undo log header page,
+				x-latched */
+	mtr_t*	mtr)		/*!< in: mtr */
+{
+	trx_undo_t*	undo;
+	trx_rseg_t*	rseg;
+	trx_rsegf_t*	rseg_header;
+	trx_ulogf_t*	undo_header;
+
+	undo = trx->update_undo;
+	rseg = undo->rseg;
+
+	rseg_header = trx_rsegf_get(
+		undo->rseg->space, undo->rseg->zip_size, undo->rseg->page_no,
+		mtr);
+
+	undo_header = undo_page + undo->hdr_offset;
+
+	if (undo->state != TRX_UNDO_CACHED) {
+		ulint		hist_size;
+#ifdef UNIV_DEBUG
+		trx_usegf_t*	seg_header = undo_page + TRX_UNDO_SEG_HDR;
+#endif /* UNIV_DEBUG */
+
+		/* The undo log segment will not be reused */
+
+		if (UNIV_UNLIKELY(undo->id >= TRX_RSEG_N_SLOTS)) {
+			fprintf(stderr,
+				"InnoDB: Error: undo->id is %lu\n",
+				(ulong) undo->id);
+			ut_error;
+		}
+
+		trx_rsegf_set_nth_undo(rseg_header, undo->id, FIL_NULL, mtr);
+
+		MONITOR_DEC(MONITOR_NUM_UNDO_SLOT_USED);
+
+		hist_size = mtr_read_ulint(
+			rseg_header + TRX_RSEG_HISTORY_SIZE, MLOG_4BYTES, mtr);
+
+		ut_ad(undo->size == flst_get_len(
+			seg_header + TRX_UNDO_PAGE_LIST, mtr));
+
+		mlog_write_ulint(
+			rseg_header + TRX_RSEG_HISTORY_SIZE,
+			hist_size + undo->size, MLOG_4BYTES, mtr);
+	}
+
+	/* Add the log as the first in the history list */
+	flst_add_first(rseg_header + TRX_RSEG_HISTORY,
+		       undo_header + TRX_UNDO_HISTORY_NODE, mtr);
+
+#ifdef HAVE_ATOMIC_BUILTINS
+	os_atomic_increment_ulint(&trx_sys->rseg_history_len, 1);
+#else
+	mutex_enter(&trx_sys->mutex);
+	++trx_sys->rseg_history_len;
+	mutex_exit(&trx_sys->mutex);
+#endif /* HAVE_ATOMIC_BUILTINS */
+
+	srv_wake_purge_thread_if_not_active();
+
+	/* Write the trx number to the undo log header */
+	mlog_write_ull(undo_header + TRX_UNDO_TRX_NO, trx->no, mtr);
+
+	/* Write information about delete markings to the undo log header */
+
+	if (!undo->del_marks) {
+		mlog_write_ulint(undo_header + TRX_UNDO_DEL_MARKS, FALSE,
+				 MLOG_2BYTES, mtr);
+	}
+
+	if (rseg->last_page_no == FIL_NULL) {
+		rseg->last_page_no = undo->hdr_page_no;
+		rseg->last_offset = undo->hdr_offset;
+		rseg->last_trx_no = trx->no;
+		rseg->last_del_marks = undo->del_marks;
+	}
+}
+
+/**********************************************************************//**
+Frees an undo log segment which is in the history list. Cuts the end of the
+history list at the youngest undo log in this segment. */
+static
+void
+trx_purge_free_segment(
+/*===================*/
+	trx_rseg_t*	rseg,		/*!< in: rollback segment */
+	fil_addr_t	hdr_addr,	/*!< in: the file address of log_hdr */
+	ulint		n_removed_logs)	/*!< in: count of how many undo logs we
+					will cut off from the end of the
+					history list */
+{
+	mtr_t		mtr;
+	trx_rsegf_t*	rseg_hdr;
+	trx_ulogf_t*	log_hdr;
+	trx_usegf_t*	seg_hdr;
+	ulint		seg_size;
+	ulint		hist_size;
+	ibool		marked = FALSE;
+
+	/* fputs("Freeing an update undo log segment\n", stderr); */
+
+	for (;;) {
+		page_t*	undo_page;
+
+		mtr_start(&mtr);
+
+		mutex_enter(&rseg->mutex);
+
+		rseg_hdr = trx_rsegf_get(
+			rseg->space, rseg->zip_size, rseg->page_no, &mtr);
+
+		undo_page = trx_undo_page_get(
+			rseg->space, rseg->zip_size, hdr_addr.page, &mtr);
+
+		seg_hdr = undo_page + TRX_UNDO_SEG_HDR;
+		log_hdr = undo_page + hdr_addr.boffset;
+
+		/* Mark the last undo log totally purged, so that if the
+		system crashes, the tail of the undo log will not get accessed
+		again. The list of pages in the undo log tail gets
+		inconsistent during the freeing of the segment, and therefore
+		purge should not try to access them again. */
+
+		if (!marked) {
+			mlog_write_ulint(
+				log_hdr + TRX_UNDO_DEL_MARKS, FALSE,
+				MLOG_2BYTES, &mtr);
+
+			marked = TRUE;
+		}
+
+		if (fseg_free_step_not_header(
+			    seg_hdr + TRX_UNDO_FSEG_HEADER, &mtr)) {
+
+			break;
+		}
+
+		mutex_exit(&rseg->mutex);
+
+		mtr_commit(&mtr);
+	}
+
+	/* The page list may now be inconsistent, but the length field
+	stored in the list base node tells us how big it was before we
+	started the freeing. */
+
+	seg_size = flst_get_len(seg_hdr + TRX_UNDO_PAGE_LIST, &mtr);
+
+	/* We may free the undo log segment header page; it must be freed
+	within the same mtr as the undo log header is removed from the
+	history list: otherwise, in case of a database crash, the segment
+	could become inaccessible garbage in the file space. */
+
+	flst_cut_end(rseg_hdr + TRX_RSEG_HISTORY,
+		     log_hdr + TRX_UNDO_HISTORY_NODE, n_removed_logs, &mtr);
+
+#ifdef HAVE_ATOMIC_BUILTINS
+	os_atomic_decrement_ulint(&trx_sys->rseg_history_len, n_removed_logs);
+#else
+	mutex_enter(&trx_sys->mutex);
+	trx_sys->rseg_history_len -= n_removed_logs;
+	mutex_exit(&trx_sys->mutex);
+#endif /* HAVE_ATOMIC_BUILTINS */
+
+	do {
+
+		/* Here we assume that a file segment with just the header
+		page can be freed in a few steps, so that the buffer pool
+		is not flooded with bufferfixed pages: see the note in
+		fsp0fsp.cc. */
+
+	} while(!fseg_free_step(seg_hdr + TRX_UNDO_FSEG_HEADER, &mtr));
+
+	hist_size = mtr_read_ulint(rseg_hdr + TRX_RSEG_HISTORY_SIZE,
+				   MLOG_4BYTES, &mtr);
+	ut_ad(hist_size >= seg_size);
+
+	mlog_write_ulint(rseg_hdr + TRX_RSEG_HISTORY_SIZE,
+			 hist_size - seg_size, MLOG_4BYTES, &mtr);
+
+	ut_ad(rseg->curr_size >= seg_size);
+
+	rseg->curr_size -= seg_size;
+
+	mutex_exit(&(rseg->mutex));
+
+	mtr_commit(&mtr);
+}
+
+/********************************************************************//**
+Removes unnecessary history data from a rollback segment. */
+static
+void
+trx_purge_truncate_rseg_history(
+/*============================*/
+	trx_rseg_t*		rseg,	/*!< in: rollback segment */
+	const purge_iter_t*	limit)	/*!< in: truncate offset */
+{
+	fil_addr_t	hdr_addr;
+	fil_addr_t	prev_hdr_addr;
+	trx_rsegf_t*	rseg_hdr;
+	page_t*		undo_page;
+	trx_ulogf_t*	log_hdr;
+	trx_usegf_t*	seg_hdr;
+	ulint		n_removed_logs = 0;
+	mtr_t		mtr;
+	trx_id_t	undo_trx_no;
+
+	mtr_start(&mtr);
+	mutex_enter(&(rseg->mutex));
+
+	rseg_hdr = trx_rsegf_get(rseg->space, rseg->zip_size,
+				 rseg->page_no, &mtr);
+
+	hdr_addr = trx_purge_get_log_from_hist(
+		flst_get_last(rseg_hdr + TRX_RSEG_HISTORY, &mtr));
+loop:
+	if (hdr_addr.page == FIL_NULL) {
+
+		mutex_exit(&(rseg->mutex));
+
+		mtr_commit(&mtr);
+
+		return;
+	}
+
+	undo_page = trx_undo_page_get(rseg->space, rseg->zip_size,
+				      hdr_addr.page, &mtr);
+
+	log_hdr = undo_page + hdr_addr.boffset;
+
+	undo_trx_no = mach_read_from_8(log_hdr + TRX_UNDO_TRX_NO);
+
+	if (undo_trx_no >= limit->trx_no) {
+
+		if (undo_trx_no == limit->trx_no) {
+
+			trx_undo_truncate_start(
+				rseg, rseg->space, hdr_addr.page,
+				hdr_addr.boffset, limit->undo_no);
+		}
+
+#ifdef HAVE_ATOMIC_BUILTINS
+		os_atomic_decrement_ulint(
+			&trx_sys->rseg_history_len, n_removed_logs);
+#else
+		mutex_enter(&trx_sys->mutex);
+		trx_sys->rseg_history_len -= n_removed_logs;
+		mutex_exit(&trx_sys->mutex);
+#endif /* HAVE_ATOMIC_BUILTINS */
+
+		flst_truncate_end(rseg_hdr + TRX_RSEG_HISTORY,
+				  log_hdr + TRX_UNDO_HISTORY_NODE,
+				  n_removed_logs, &mtr);
+
+		mutex_exit(&(rseg->mutex));
+		mtr_commit(&mtr);
+
+		return;
+	}
+
+	prev_hdr_addr = trx_purge_get_log_from_hist(
+		flst_get_prev_addr(log_hdr + TRX_UNDO_HISTORY_NODE, &mtr));
+	n_removed_logs++;
+
+	seg_hdr = undo_page + TRX_UNDO_SEG_HDR;
+
+	if ((mach_read_from_2(seg_hdr + TRX_UNDO_STATE) == TRX_UNDO_TO_PURGE)
+	    && (mach_read_from_2(log_hdr + TRX_UNDO_NEXT_LOG) == 0)) {
+
+		/* We can free the whole log segment */
+
+		mutex_exit(&(rseg->mutex));
+		mtr_commit(&mtr);
+
+		trx_purge_free_segment(rseg, hdr_addr, n_removed_logs);
+
+		n_removed_logs = 0;
+	} else {
+		mutex_exit(&(rseg->mutex));
+		mtr_commit(&mtr);
+	}
+
+	mtr_start(&mtr);
+	mutex_enter(&(rseg->mutex));
+
+	rseg_hdr = trx_rsegf_get(rseg->space, rseg->zip_size,
+				 rseg->page_no, &mtr);
+
+	hdr_addr = prev_hdr_addr;
+
+	goto loop;
+}
+
+/********************************************************************//**
+Removes unnecessary history data from rollback segments. NOTE that when this
+function is called, the caller must not have any latches on undo log pages! */
+static
+void
+trx_purge_truncate_history(
+/*========================*/
+	purge_iter_t*		limit,	/*!< in: truncate limit */
+	const read_view_t*	view)	/*!< in: purge view */
+{
+	ulint		i;
+
+	/* We play safe and set the truncate limit at most to the purge view
+	low_limit number, though this is not necessary */
+
+	if (limit->trx_no >= view->low_limit_no) {
+		limit->trx_no = view->low_limit_no;
+		limit->undo_no = 0;
+	}
+
+	ut_ad(limit->trx_no <= purge_sys->view->low_limit_no);
+
+	for (i = 0; i < TRX_SYS_N_RSEGS; ++i) {
+		trx_rseg_t*	rseg = trx_sys->rseg_array[i];
+
+		if (rseg != NULL) {
+			ut_a(rseg->id == i);
+			trx_purge_truncate_rseg_history(rseg, limit);
+		}
+	}
+}
+
+
+/***********************************************************************//**
+Updates the last not yet purged history log info in rseg when we have purged
+a whole undo log. Advances also purge_sys->iter.trx_no past the purged log. */
+static
+void
+trx_purge_rseg_get_next_history_log(
+/*================================*/
+	trx_rseg_t*	rseg,		/*!< in: rollback segment */
+	ulint*		n_pages_handled)/*!< in/out: number of UNDO pages
+					handled */
+{
+	const void*	ptr;
+	page_t*		undo_page;
+	trx_ulogf_t*	log_hdr;
+	fil_addr_t	prev_log_addr;
+	trx_id_t	trx_no;
+	ibool		del_marks;
+	mtr_t		mtr;
+	rseg_queue_t	rseg_queue;
+
+	mutex_enter(&(rseg->mutex));
+
+	ut_a(rseg->last_page_no != FIL_NULL);
+
+	purge_sys->iter.trx_no = rseg->last_trx_no + 1;
+	purge_sys->iter.undo_no = 0;
+	purge_sys->next_stored = FALSE;
+
+	mtr_start(&mtr);
+
+	undo_page = trx_undo_page_get_s_latched(
+		rseg->space, rseg->zip_size, rseg->last_page_no, &mtr);
+
+	log_hdr = undo_page + rseg->last_offset;
+
+	/* Increase the purge page count by one for every handled log */
+
+	(*n_pages_handled)++;
+
+	prev_log_addr = trx_purge_get_log_from_hist(
+		flst_get_prev_addr(log_hdr + TRX_UNDO_HISTORY_NODE, &mtr));
+
+	if (prev_log_addr.page == FIL_NULL) {
+		/* No logs left in the history list */
+
+		rseg->last_page_no = FIL_NULL;
+
+		mutex_exit(&(rseg->mutex));
+		mtr_commit(&mtr);
+
+		mutex_enter(&trx_sys->mutex);
+
+		/* Add debug code to track history list corruption reported
+		on the MySQL mailing list on Nov 9, 2004. The fut0lst.cc
+		file-based list was corrupt. The prev node pointer was
+		FIL_NULL, even though the list length was over 8 million nodes!
+		We assume that purge truncates the history list in large
+		size pieces, and if we here reach the head of the list, the
+		list cannot be longer than 2000 000 undo logs now. */
+
+		if (trx_sys->rseg_history_len > 2000000) {
+			ut_print_timestamp(stderr);
+			fprintf(stderr,
+				" InnoDB: Warning: purge reached the"
+				" head of the history list,\n"
+				"InnoDB: but its length is still"
+				" reported as %lu! Make a detailed bug\n"
+				"InnoDB: report, and submit it"
+				" to http://bugs.mysql.com\n",
+				(ulong) trx_sys->rseg_history_len);
+			ut_ad(0);
+		}
+
+		mutex_exit(&trx_sys->mutex);
+
+		return;
+	}
+
+	mutex_exit(&rseg->mutex);
+
+	mtr_commit(&mtr);
+
+	/* Read the trx number and del marks from the previous log header */
+	mtr_start(&mtr);
+
+	log_hdr = trx_undo_page_get_s_latched(rseg->space, rseg->zip_size,
+					      prev_log_addr.page, &mtr)
+		+ prev_log_addr.boffset;
+
+	trx_no = mach_read_from_8(log_hdr + TRX_UNDO_TRX_NO);
+
+	del_marks = mach_read_from_2(log_hdr + TRX_UNDO_DEL_MARKS);
+
+	mtr_commit(&mtr);
+
+	mutex_enter(&(rseg->mutex));
+
+	rseg->last_page_no = prev_log_addr.page;
+	rseg->last_offset = prev_log_addr.boffset;
+	rseg->last_trx_no = trx_no;
+	rseg->last_del_marks = del_marks;
+
+	rseg_queue.rseg = rseg;
+	rseg_queue.trx_no = rseg->last_trx_no;
+
+	/* Purge can also produce events, however these are already ordered
+	in the rollback segment and any user generated event will be greater
+	than the events that Purge produces. ie. Purge can never produce
+	events from an empty rollback segment. */
+
+	mutex_enter(&purge_sys->bh_mutex);
+
+	ptr = ib_bh_push(purge_sys->ib_bh, &rseg_queue);
+	ut_a(ptr != NULL);
+
+	mutex_exit(&purge_sys->bh_mutex);
+
+	mutex_exit(&rseg->mutex);
+}
+
+/***********************************************************************//**
+Chooses the rollback segment with the smallest trx_id.
+@return zip_size if log is for a compressed table, ULINT_UNDEFINED if
+	no rollback segments to purge, 0 for non compressed tables. */
+static
+ulint
+trx_purge_get_rseg_with_min_trx_id(
+/*===============================*/
+	trx_purge_t*	purge_sys)	/*!< in/out: purge instance */
+
+{
+	ulint		zip_size = 0;
+
+	mutex_enter(&purge_sys->bh_mutex);
+
+	/* Only purge consumes events from the binary heap, user
+	threads only produce the events. */
+
+	if (!ib_bh_is_empty(purge_sys->ib_bh)) {
+		trx_rseg_t*	rseg;
+
+		rseg = ((rseg_queue_t*) ib_bh_first(purge_sys->ib_bh))->rseg;
+		ib_bh_pop(purge_sys->ib_bh);
+
+		mutex_exit(&purge_sys->bh_mutex);
+
+		purge_sys->rseg = rseg;
+	} else {
+		mutex_exit(&purge_sys->bh_mutex);
+
+		purge_sys->rseg = NULL;
+
+		return(ULINT_UNDEFINED);
+	}
+
+	ut_a(purge_sys->rseg != NULL);
+
+	mutex_enter(&purge_sys->rseg->mutex);
+
+	ut_a(purge_sys->rseg->last_page_no != FIL_NULL);
+
+	/* We assume in purge of externally stored fields that space id is
+	in the range of UNDO tablespace space ids */
+	ut_a(purge_sys->rseg->space <= srv_undo_tablespaces);
+
+	zip_size = purge_sys->rseg->zip_size;
+
+	ut_a(purge_sys->iter.trx_no <= purge_sys->rseg->last_trx_no);
+
+	purge_sys->iter.trx_no = purge_sys->rseg->last_trx_no;
+	purge_sys->hdr_offset = purge_sys->rseg->last_offset;
+	purge_sys->hdr_page_no = purge_sys->rseg->last_page_no;
+
+	mutex_exit(&purge_sys->rseg->mutex);
+
+	return(zip_size);
+}
+
+/***********************************************************************//**
+Position the purge sys "iterator" on the undo record to use for purging. */
+static
+void
+trx_purge_read_undo_rec(
+/*====================*/
+	trx_purge_t*	purge_sys,	/*!< in/out: purge instance */
+	ulint		zip_size)	/*!< in: block size or 0 */
+{
+	ulint		offset;
+	ulint		page_no;
+	ib_uint64_t	undo_no;
+
+	purge_sys->hdr_offset = purge_sys->rseg->last_offset;
+	page_no = purge_sys->hdr_page_no = purge_sys->rseg->last_page_no;
+
+	if (purge_sys->rseg->last_del_marks) {
+		mtr_t		mtr;
+		trx_undo_rec_t*	undo_rec = NULL;
+
+		mtr_start(&mtr);
+
+		undo_rec = trx_undo_get_first_rec(
+			purge_sys->rseg->space,
+			zip_size,
+			purge_sys->hdr_page_no,
+			purge_sys->hdr_offset, RW_S_LATCH, &mtr);
+
+		if (undo_rec != NULL) {
+			offset = page_offset(undo_rec);
+			undo_no = trx_undo_rec_get_undo_no(undo_rec);
+			page_no = page_get_page_no(page_align(undo_rec));
+		} else {
+			offset = 0;
+			undo_no = 0;
+		}
+
+		mtr_commit(&mtr);
+	} else {
+		offset = 0;
+		undo_no = 0;
+	}
+
+	purge_sys->offset = offset;
+	purge_sys->page_no = page_no;
+	purge_sys->iter.undo_no = undo_no;
+
+	purge_sys->next_stored = TRUE;
+}
+
+/***********************************************************************//**
+Chooses the next undo log to purge and updates the info in purge_sys. This
+function is used to initialize purge_sys when the next record to purge is
+not known, and also to update the purge system info on the next record when
+purge has handled the whole undo log for a transaction. */
+static
+void
+trx_purge_choose_next_log(void)
+/*===========================*/
+{
+	ulint	zip_size;
+
+	ut_ad(purge_sys->next_stored == FALSE);
+
+	zip_size = trx_purge_get_rseg_with_min_trx_id(purge_sys);
+
+	if (purge_sys->rseg != NULL) {
+		trx_purge_read_undo_rec(purge_sys, zip_size);
+	} else {
+		/* There is nothing to do yet. */
+		os_thread_yield();
+	}
+}
+
+/***********************************************************************//**
+Gets the next record to purge and updates the info in the purge system.
+@return copy of an undo log record or pointer to the dummy undo log record */
+static
+trx_undo_rec_t*
+trx_purge_get_next_rec(
+/*===================*/
+	ulint*		n_pages_handled,/*!< in/out: number of UNDO pages
+					handled */
+	mem_heap_t*	heap)		/*!< in: memory heap where copied */
+{
+	trx_undo_rec_t*	rec;
+	trx_undo_rec_t*	rec_copy;
+	trx_undo_rec_t*	rec2;
+	page_t*		undo_page;
+	page_t*		page;
+	ulint		offset;
+	ulint		page_no;
+	ulint		space;
+	ulint		zip_size;
+	mtr_t		mtr;
+
+	ut_ad(purge_sys->next_stored);
+	ut_ad(purge_sys->iter.trx_no < purge_sys->view->low_limit_no);
+
+	space = purge_sys->rseg->space;
+	zip_size = purge_sys->rseg->zip_size;
+	page_no = purge_sys->page_no;
+	offset = purge_sys->offset;
+
+	if (offset == 0) {
+		/* It is the dummy undo log record, which means that there is
+		no need to purge this undo log */
+
+		trx_purge_rseg_get_next_history_log(
+			purge_sys->rseg, n_pages_handled);
+
+		/* Look for the next undo log and record to purge */
+
+		trx_purge_choose_next_log();
+
+		return(&trx_purge_dummy_rec);
+	}
+
+	mtr_start(&mtr);
+
+	undo_page = trx_undo_page_get_s_latched(space, zip_size, page_no, &mtr);
+
+	rec = undo_page + offset;
+
+	rec2 = rec;
+
+	for (;;) {
+		ulint		type;
+		trx_undo_rec_t*	next_rec;
+		ulint		cmpl_info;
+
+		/* Try first to find the next record which requires a purge
+		operation from the same page of the same undo log */
+
+		next_rec = trx_undo_page_get_next_rec(
+			rec2, purge_sys->hdr_page_no, purge_sys->hdr_offset);
+
+		if (next_rec == NULL) {
+			rec2 = trx_undo_get_next_rec(
+				rec2, purge_sys->hdr_page_no,
+				purge_sys->hdr_offset, &mtr);
+			break;
+		}
+
+		rec2 = next_rec;
+
+		type = trx_undo_rec_get_type(rec2);
+
+		if (type == TRX_UNDO_DEL_MARK_REC) {
+
+			break;
+		}
+
+		cmpl_info = trx_undo_rec_get_cmpl_info(rec2);
+
+		if (trx_undo_rec_get_extern_storage(rec2)) {
+			break;
+		}
+
+		if ((type == TRX_UNDO_UPD_EXIST_REC)
+		    && !(cmpl_info & UPD_NODE_NO_ORD_CHANGE)) {
+			break;
+		}
+	}
+
+	if (rec2 == NULL) {
+		mtr_commit(&mtr);
+
+		trx_purge_rseg_get_next_history_log(
+			purge_sys->rseg, n_pages_handled);
+
+		/* Look for the next undo log and record to purge */
+
+		trx_purge_choose_next_log();
+
+		mtr_start(&mtr);
+
+		undo_page = trx_undo_page_get_s_latched(
+			space, zip_size, page_no, &mtr);
+
+		rec = undo_page + offset;
+	} else {
+		page = page_align(rec2);
+
+		purge_sys->offset = rec2 - page;
+		purge_sys->page_no = page_get_page_no(page);
+		purge_sys->iter.undo_no = trx_undo_rec_get_undo_no(rec2);
+
+		if (undo_page != page) {
+			/* We advance to a new page of the undo log: */
+			(*n_pages_handled)++;
+		}
+	}
+
+	rec_copy = trx_undo_rec_copy(rec, heap);
+
+	mtr_commit(&mtr);
+
+	return(rec_copy);
+}
+
+/********************************************************************//**
+Fetches the next undo log record from the history list to purge. It must be
+released with the corresponding release function.
+@return copy of an undo log record or pointer to trx_purge_dummy_rec,
+if the whole undo log can be skipped in purge; NULL if none left */
+static
+trx_undo_rec_t*
+trx_purge_fetch_next_rec(
+/*=====================*/
+	roll_ptr_t*	roll_ptr,	/*!< out: roll pointer to undo record */
+	ulint*		n_pages_handled,/*!< in/out: number of UNDO log pages
+					handled */
+	mem_heap_t*	heap)		/*!< in: memory heap where copied */
+{
+	if (!purge_sys->next_stored) {
+		trx_purge_choose_next_log();
+
+		if (!purge_sys->next_stored) {
+
+			if (srv_print_thread_releases) {
+				fprintf(stderr,
+					"Purge: No logs left in the"
+					" history list\n");
+			}
+
+			return(NULL);
+		}
+	}
+
+	if (purge_sys->iter.trx_no >= purge_sys->view->low_limit_no) {
+
+		return(NULL);
+	}
+
+	/* fprintf(stderr, "Thread %lu purging trx %llu undo record %llu\n",
+	os_thread_get_curr_id(), iter->trx_no, iter->undo_no); */
+
+	*roll_ptr = trx_undo_build_roll_ptr(
+		FALSE, purge_sys->rseg->id,
+		purge_sys->page_no, purge_sys->offset);
+
+	/* The following call will advance the stored values of the
+	purge iterator. */
+
+	return(trx_purge_get_next_rec(n_pages_handled, heap));
+}
+
+/*******************************************************************//**
+This function runs a purge batch.
+@return number of undo log pages handled in the batch */
+static
+ulint
+trx_purge_attach_undo_recs(
+/*=======================*/
+	ulint		n_purge_threads,/*!< in: number of purge threads */
+	trx_purge_t*	purge_sys,	/*!< in/out: purge instance */
+	purge_iter_t*	limit,		/*!< out: records read up to */
+	ulint		batch_size)	/*!< in: no. of pages to purge */
+{
+	que_thr_t*	thr;
+	ulint		i = 0;
+	ulint		n_pages_handled = 0;
+	ulint		n_thrs = UT_LIST_GET_LEN(purge_sys->query->thrs);
+
+	ut_a(n_purge_threads > 0);
+
+	*limit = purge_sys->iter;
+
+	/* Debug code to validate some pre-requisites and reset done flag. */
+	for (thr = UT_LIST_GET_FIRST(purge_sys->query->thrs);
+	     thr != NULL && i < n_purge_threads;
+	     thr = UT_LIST_GET_NEXT(thrs, thr), ++i) {
+
+		purge_node_t*	node;
+
+		/* Get the purge node. */
+		node = (purge_node_t*) thr->child;
+
+		ut_a(que_node_get_type(node) == QUE_NODE_PURGE);
+		ut_a(node->undo_recs == NULL);
+		ut_a(node->done);
+
+		node->done = FALSE;
+	}
+
+	/* There should never be fewer nodes than threads, the inverse
+	however is allowed because we only use purge threads as needed. */
+	ut_a(i == n_purge_threads);
+
+	/* Fetch and parse the UNDO records. The UNDO records are added
+	to a per purge node vector. */
+	thr = UT_LIST_GET_FIRST(purge_sys->query->thrs);
+	ut_a(n_thrs > 0 && thr != NULL);
+
+	ut_ad(trx_purge_check_limit());
+
+	i = 0;
+
+	for (;;) {
+		purge_node_t*		node;
+		trx_purge_rec_t*	purge_rec;
+
+		ut_a(!thr->is_active);
+
+		/* Get the purge node. */
+		node = (purge_node_t*) thr->child;
+		ut_a(que_node_get_type(node) == QUE_NODE_PURGE);
+
+		purge_rec = static_cast<trx_purge_rec_t*>(
+			mem_heap_zalloc(node->heap, sizeof(*purge_rec)));
+
+		/* Track the max {trx_id, undo_no} for truncating the
+		UNDO logs once we have purged the records. */
+
+		if (purge_sys->iter.trx_no > limit->trx_no
+		    || (purge_sys->iter.trx_no == limit->trx_no
+			&& purge_sys->iter.undo_no >= limit->undo_no)) {
+
+			*limit = purge_sys->iter;
+		}
+
+		/* Fetch the next record, and advance the purge_sys->iter. */
+		purge_rec->undo_rec = trx_purge_fetch_next_rec(
+			&purge_rec->roll_ptr, &n_pages_handled, node->heap);
+
+		if (purge_rec->undo_rec != NULL) {
+
+			if (node->undo_recs == NULL) {
+				node->undo_recs = ib_vector_create(
+					ib_heap_allocator_create(node->heap),
+					sizeof(trx_purge_rec_t),
+					batch_size);
+			} else {
+				ut_a(!ib_vector_is_empty(node->undo_recs));
+			}
+
+			ib_vector_push(node->undo_recs, purge_rec);
+
+			if (n_pages_handled >= batch_size) {
+
+				break;
+			}
+		} else {
+			break;
+		}
+
+		thr = UT_LIST_GET_NEXT(thrs, thr);
+
+		if (!(++i % n_purge_threads)) {
+			thr = UT_LIST_GET_FIRST(purge_sys->query->thrs);
+		}
+
+		ut_a(thr != NULL);
+	}
+
+	ut_ad(trx_purge_check_limit());
+
+	return(n_pages_handled);
+}
+
+/*******************************************************************//**
+Calculate the DML delay required.
+@return delay in microseconds or ULINT_MAX */
+static
+ulint
+trx_purge_dml_delay(void)
+/*=====================*/
+{
+	/* Determine how much data manipulation language (DML) statements
+	need to be delayed in order to reduce the lagging of the purge
+	thread. */
+	ulint	delay = 0; /* in microseconds; default: no delay */
+
+	/* If purge lag is set (ie. > 0) then calculate the new DML delay.
+	Note: we do a dirty read of the trx_sys_t data structure here,
+	without holding trx_sys->mutex. */
+
+	if (srv_max_purge_lag > 0) {
+		float	ratio;
+
+		ratio = float(trx_sys->rseg_history_len) / srv_max_purge_lag;
+
+		if (ratio > 1.0) {
+			/* If the history list length exceeds the
+			srv_max_purge_lag, the data manipulation
+			statements are delayed by at least 5000
+			microseconds. */
+			delay = (ulint) ((ratio - .5) * 10000);
+		}
+
+		if (delay > srv_max_purge_lag_delay) {
+			delay = srv_max_purge_lag_delay;
+		}
+
+		MONITOR_SET(MONITOR_DML_PURGE_DELAY, delay);
+	}
+
+	return(delay);
+}
+
+/*******************************************************************//**
+Wait for pending purge jobs to complete. */
+static
+void
+trx_purge_wait_for_workers_to_complete(
+/*===================================*/
+	trx_purge_t*	purge_sys)	/*!< in: purge instance */
+{
+	ulint	n_submitted = purge_sys->n_submitted;
+
+#ifdef HAVE_ATOMIC_BUILTINS
+	/* Ensure that the work queue empties out. */
+	while (!os_compare_and_swap_ulint(
+		       &purge_sys->n_completed, n_submitted, n_submitted)) {
+#else
+	mutex_enter(&purge_sys->bh_mutex);
+
+	while (purge_sys->n_completed < n_submitted) {
+#endif /* HAVE_ATOMIC_BUILTINS */
+
+#ifndef HAVE_ATOMIC_BUILTINS
+		mutex_exit(&purge_sys->bh_mutex);
+#endif /* !HAVE_ATOMIC_BUILTINS */
+
+		if (srv_get_task_queue_length() > 0) {
+			srv_release_threads(SRV_WORKER, 1);
+		}
+
+		os_thread_yield();
+
+#ifndef HAVE_ATOMIC_BUILTINS
+		mutex_enter(&purge_sys->bh_mutex);
+#endif /* !HAVE_ATOMIC_BUILTINS */
+	}
+
+#ifndef HAVE_ATOMIC_BUILTINS
+	mutex_exit(&purge_sys->bh_mutex);
+#endif /* !HAVE_ATOMIC_BUILTINS */
+
+	/* None of the worker threads should be doing any work. */
+	ut_a(purge_sys->n_submitted == purge_sys->n_completed);
+
+	/* There should be no outstanding tasks as long
+	as the worker threads are active. */
+	ut_a(srv_get_task_queue_length() == 0);
+}
+
+/******************************************************************//**
+Remove old historical changes from the rollback segments. */
+static
+void
+trx_purge_truncate(void)
+/*====================*/
+{
+	ut_ad(trx_purge_check_limit());
+
+	if (purge_sys->limit.trx_no == 0) {
+		trx_purge_truncate_history(&purge_sys->iter, purge_sys->view);
+	} else {
+		trx_purge_truncate_history(&purge_sys->limit, purge_sys->view);
+	}
+}
+
+/*******************************************************************//**
+This function runs a purge batch.
+@return number of undo log pages handled in the batch */
+UNIV_INTERN
+ulint
+trx_purge(
+/*======*/
+	ulint	n_purge_threads,	/*!< in: number of purge tasks
+					to submit to the work queue */
+	ulint	batch_size,		/*!< in: the maximum number of records
+					to purge in one batch */
+	bool	truncate)		/*!< in: truncate history if true */
+{
+	que_thr_t*	thr = NULL;
+	ulint		n_pages_handled;
+
+	ut_a(n_purge_threads > 0);
+
+	srv_dml_needed_delay = trx_purge_dml_delay();
+
+	/* The number of tasks submitted should be completed. */
+	ut_a(purge_sys->n_submitted == purge_sys->n_completed);
+
+	rw_lock_x_lock(&purge_sys->latch);
+
+	purge_sys->view = NULL;
+
+	mem_heap_empty(purge_sys->heap);
+
+	purge_sys->view = read_view_purge_open(purge_sys->heap);
+
+	rw_lock_x_unlock(&purge_sys->latch);
+
+	/* Fetch the UNDO recs that need to be purged. */
+	n_pages_handled = trx_purge_attach_undo_recs(
+		n_purge_threads, purge_sys, &purge_sys->limit, batch_size);
+
+	/* Do we do an asynchronous purge or not ? */
+	if (n_purge_threads > 1) {
+		ulint	i = 0;
+
+		/* Submit the tasks to the work queue. */
+		for (i = 0; i < n_purge_threads - 1; ++i) {
+			thr = que_fork_scheduler_round_robin(
+				purge_sys->query, thr);
+
+			ut_a(thr != NULL);
+
+			srv_que_task_enqueue_low(thr);
+		}
+
+		thr = que_fork_scheduler_round_robin(purge_sys->query, thr);
+		ut_a(thr != NULL);
+
+		purge_sys->n_submitted += n_purge_threads - 1;
+
+		goto run_synchronously;
+
+	/* Do it synchronously. */
+	} else {
+		thr = que_fork_scheduler_round_robin(purge_sys->query, NULL);
+		ut_ad(thr);
+
+run_synchronously:
+		++purge_sys->n_submitted;
+
+		que_run_threads(thr);
+
+		os_atomic_inc_ulint(
+			&purge_sys->bh_mutex, &purge_sys->n_completed, 1);
+
+		if (n_purge_threads > 1) {
+			trx_purge_wait_for_workers_to_complete(purge_sys);
+		}
+	}
+
+	ut_a(purge_sys->n_submitted == purge_sys->n_completed);
+
+	if (truncate) {
+		trx_purge_truncate();
+	}
+
+	MONITOR_INC_VALUE(MONITOR_PURGE_INVOKED, 1);
+	MONITOR_INC_VALUE(MONITOR_PURGE_N_PAGE_HANDLED, n_pages_handled);
+
+	return(n_pages_handled);
+}
+
+/*******************************************************************//**
+Get the purge state.
+@return purge state. */
+UNIV_INTERN
+purge_state_t
+trx_purge_state(void)
+/*=================*/
+{
+	purge_state_t	state;
+
+	rw_lock_x_lock(&purge_sys->latch);
+
+	state = purge_sys->state;
+
+	rw_lock_x_unlock(&purge_sys->latch);
+
+	return(state);
+}
+
+/*******************************************************************//**
+Stop purge and wait for it to stop, move to PURGE_STATE_STOP. */
+UNIV_INTERN
+void
+trx_purge_stop(void)
+/*================*/
+{
+	purge_state_t	state;
+	ib_int64_t	sig_count = os_event_reset(purge_sys->event);
+
+	ut_a(srv_n_purge_threads > 0);
+
+	rw_lock_x_lock(&purge_sys->latch);
+
+	ut_a(purge_sys->state != PURGE_STATE_INIT);
+	ut_a(purge_sys->state != PURGE_STATE_EXIT);
+
+	++purge_sys->n_stop;
+
+	state = purge_sys->state;
+
+	if (state == PURGE_STATE_RUN) {
+		ut_print_timestamp(stderr);
+		fprintf(stderr, " InnoDB: Stopping purge.\n");
+
+		/* We need to wakeup the purge thread in case it is suspended,
+		so that it can acknowledge the state change. */
+
+		srv_wake_purge_thread_if_not_active();
+	}
+
+	purge_sys->state = PURGE_STATE_STOP;
+
+	rw_lock_x_unlock(&purge_sys->latch);
+
+	if (state != PURGE_STATE_STOP) {
+
+		/* Wait for purge coordinator to signal that it
+		is suspended. */
+		os_event_wait_low(purge_sys->event, sig_count);
+	}
+
+	MONITOR_INC_VALUE(MONITOR_PURGE_STOP_COUNT, 1);
+}
+
+/*******************************************************************//**
+Resume purge, move to PURGE_STATE_RUN. */
+UNIV_INTERN
+void
+trx_purge_run(void)
+/*===============*/
+{
+	rw_lock_x_lock(&purge_sys->latch);
+
+	ut_a(purge_sys->state != PURGE_STATE_INIT);
+	ut_a(purge_sys->state != PURGE_STATE_EXIT);
+
+	if (purge_sys->n_stop > 0) {
+
+		ut_a(purge_sys->state == PURGE_STATE_STOP);
+
+		--purge_sys->n_stop;
+
+		if (purge_sys->n_stop == 0) {
+
+			ut_print_timestamp(stderr);
+			fprintf(stderr, " InnoDB: Resuming purge.\n");
+
+			purge_sys->state = PURGE_STATE_RUN;
+		}
+
+		MONITOR_INC_VALUE(MONITOR_PURGE_RESUME_COUNT, 1);
+	} else {
+		ut_a(purge_sys->state == PURGE_STATE_RUN);
+	}
+
+	rw_lock_x_unlock(&purge_sys->latch);
+
+	srv_wake_purge_thread_if_not_active();
+}
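
The delay computation in trx_purge_dml_delay() is easy to check in isolation. The following standalone sketch reproduces the formula with plain C++ types; the configuration values and history lengths are made-up examples, not InnoDB defaults:

// A standalone model of the delay formula in trx_purge_dml_delay() above,
// using plain C++ types instead of InnoDB's ulint. The settings mirror
// srv_max_purge_lag / srv_max_purge_lag_delay; the values are illustrative.
#include <algorithm>
#include <cstdio>

static unsigned long max_purge_lag = 100000;       // assumed innodb_max_purge_lag
static unsigned long max_purge_lag_delay = 50000;  // assumed cap, microseconds

// Returns the DML delay in microseconds for a given history list length.
static unsigned long purge_dml_delay(unsigned long rseg_history_len)
{
	unsigned long delay = 0;	/* default: no delay */

	if (max_purge_lag > 0) {
		float ratio = float(rseg_history_len) / max_purge_lag;

		if (ratio > 1.0) {
			/* At a ratio just above 1.0 this yields ~5000 us,
			matching the "at least 5000 microseconds" comment. */
			delay = (unsigned long) ((ratio - .5) * 10000);
		}

		delay = std::min(delay, max_purge_lag_delay);
	}

	return(delay);
}

int main()
{
	/* History list at, just over and well above the configured limit. */
	const unsigned long lengths[] = {50000, 100001, 300000, 2000000};

	for (unsigned long len : lengths) {
		printf("history=%lu -> delay=%lu us\n",
		       len, purge_dml_delay(len));
	}
	return(0);
}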
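
The purge queue (purge_sys->ib_bh, guarded by bh_mutex) is a min-heap of rseg_queue_t entries ordered by trx_no. A minimal model of that ordering, with std::priority_queue standing in for InnoDB's ib_bh_t and invented segment numbers:

// User threads push a (trx_no, rollback segment) entry when a segment gains
// a new history log; the purge coordinator always pops the segment whose
// oldest log has the smallest trx_no, as in trx_purge_get_rseg_with_min_trx_id().
#include <cstdint>
#include <cstdio>
#include <queue>
#include <vector>

struct RsegQueueEntry {
	uint64_t	trx_no;		/* trx no of the segment's oldest log */
	int		rseg_id;	/* hypothetical segment number */
};

/* Order by trx_no, smallest on top, as ib_bh does for rseg_queue_t. */
struct OldestFirst {
	bool operator()(const RsegQueueEntry& a,
			const RsegQueueEntry& b) const {
		return(a.trx_no > b.trx_no);
	}
};

int main()
{
	std::priority_queue<RsegQueueEntry, std::vector<RsegQueueEntry>,
			    OldestFirst> bh;

	/* Producers (committing transactions) push in arbitrary order. */
	bh.push({42, 3});
	bh.push({17, 1});
	bh.push({99, 5});

	/* The consumer sees the entries strictly in trx_no order. */
	while (!bh.empty()) {
		RsegQueueEntry e = bh.top();
		bh.pop();
		printf("purge rseg %d, trx_no %llu\n",
		       e.rseg_id, (unsigned long long) e.trx_no);
	}
	return(0);
}

Keeping the heap keyed on each segment's oldest trx_no is what lets the coordinator consume history in commit order across all rollback segments, even though producers push in arbitrary order.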
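
trx_purge_attach_undo_recs() hands fetched records to the purge nodes round-robin, wrapping back to the first que_thr_t after every n_purge_threads assignments. A small sketch of just that distribution logic, with arbitrary record and thread counts:

// Models the "if (!(++i % n_purge_threads)) thr = first" wrap in
// trx_purge_attach_undo_recs(); vector indices stand in for que_thr_t.
#include <cstdio>
#include <vector>

int main()
{
	const unsigned n_purge_threads = 4;	/* arbitrary */
	const unsigned n_records = 10;		/* arbitrary */

	std::vector<std::vector<unsigned> > per_thread(n_purge_threads);

	unsigned i = 0;
	unsigned thr = 0;	/* index of the current thread */

	for (unsigned rec = 0; rec < n_records; ++rec) {
		per_thread[thr].push_back(rec);

		/* Advance to the next thread; the modulus test wraps us
		back to the first thread exactly when we run off the end. */
		++thr;
		if (!(++i % n_purge_threads)) {
			thr = 0;
		}
	}

	for (unsigned t = 0; t < n_purge_threads; ++t) {
		printf("thread %u:", t);
		for (unsigned rec : per_thread[t]) {
			printf(" %u", rec);
		}
		printf("\n");
	}
	return(0);
}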
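
trx_purge_stop() and trx_purge_run() form a nested stop/resume protocol: n_stop counts outstanding stop requests, and the state only returns to PURGE_STATE_RUN when the counter drops back to zero. A toy model of just the counter logic, omitting the latch and os_event plumbing:

// Each stop bumps the counter and forces STOP; purge resumes only when
// every stop has been matched by a run call, as the assertions in
// trx_purge_run() above require.
#include <cassert>
#include <cstdio>

enum PurgeState { PURGE_STATE_RUN, PURGE_STATE_STOP };

struct PurgeSys {
	PurgeState	state;
	int		n_stop;
};

static void purge_stop(PurgeSys* p)
{
	++p->n_stop;
	p->state = PURGE_STATE_STOP;
}

static void purge_run(PurgeSys* p)
{
	if (p->n_stop > 0) {
		assert(p->state == PURGE_STATE_STOP);

		if (--p->n_stop == 0) {
			p->state = PURGE_STATE_RUN;
		}
	} else {
		assert(p->state == PURGE_STATE_RUN);
	}
}

int main()
{
	PurgeSys p = {PURGE_STATE_RUN, 0};

	purge_stop(&p);		/* first stopper */
	purge_stop(&p);		/* nested stopper */
	purge_run(&p);		/* still stopped: one stopper remains */
	printf("after one run: %s\n",
	       p.state == PURGE_STATE_STOP ? "STOP" : "RUN");
	purge_run(&p);		/* last stopper released: purge resumes */
	printf("after two runs: %s\n",
	       p.state == PURGE_STATE_STOP ? "STOP" : "RUN");
	return(0);
}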