diff options
Diffstat (limited to 'storage/xtradb/trx/trx0purge.c')
-rw-r--r-- | storage/xtradb/trx/trx0purge.c | 124 |
1 files changed, 120 insertions, 4 deletions
diff --git a/storage/xtradb/trx/trx0purge.c b/storage/xtradb/trx/trx0purge.c index cd79fd1c315..41e16b35e85 100644 --- a/storage/xtradb/trx/trx0purge.c +++ b/storage/xtradb/trx/trx0purge.c @@ -184,8 +184,9 @@ this query graph. @return own: the query graph */ static que_t* -trx_purge_graph_build(void) +trx_purge_graph_build( /*=======================*/ + trx_t* trx) { mem_heap_t* heap; que_fork_t* fork; @@ -194,7 +195,7 @@ trx_purge_graph_build(void) heap = mem_heap_create(512); fork = que_fork_create(NULL, NULL, QUE_FORK_PURGE, heap); - fork->trx = purge_sys->trx; + fork->trx = trx; thr = que_thr_create(fork, heap); @@ -243,10 +244,73 @@ trx_purge_sys_create(void) ut_a(trx_start_low(purge_sys->trx, ULINT_UNDEFINED)); - purge_sys->query = trx_purge_graph_build(); + purge_sys->query = trx_purge_graph_build(purge_sys->trx); purge_sys->view = read_view_oldest_copy_or_open_new(ut_dulint_zero, purge_sys->heap); + + purge_sys->n_worker = 0; + if (srv_use_purge_thread > 1) { + /* Use worker threads */ + ulint i; + + purge_sys->n_worker = srv_use_purge_thread - 1; + + purge_sys->sess_arr = mem_alloc(sizeof(sess_t*) * purge_sys->n_worker); + purge_sys->trx_arr = mem_alloc(sizeof(trx_t*) * purge_sys->n_worker); + purge_sys->query_arr = mem_alloc(sizeof(que_t*) * purge_sys->n_worker); + + purge_sys->worker_event = os_event_create(NULL); + os_event_reset(purge_sys->worker_event); + + for (i = 0; i < purge_sys->n_worker; i++) { + purge_sys->sess_arr[i] = sess_open(); + + purge_sys->trx_arr[i] = purge_sys->sess_arr[i]->trx; + purge_sys->trx_arr[i]->is_purge = 1; + ut_a(trx_start_low(purge_sys->trx_arr[i], ULINT_UNDEFINED)); + + purge_sys->query_arr[i] = trx_purge_graph_build(purge_sys->trx_arr[i]); + } + } +} + +/************************************************************************ +Frees the global purge system control structure. */ +UNIV_INTERN +void +trx_purge_sys_close(void) +/*======================*/ +{ + ut_ad(!mutex_own(&kernel_mutex)); + + que_graph_free(purge_sys->query); + + ut_a(purge_sys->sess->trx->is_purge); + purge_sys->sess->trx->conc_state = TRX_NOT_STARTED; + sess_close(purge_sys->sess); + purge_sys->sess = NULL; + + if (purge_sys->view != NULL) { + /* Because acquiring the kernel mutex is a pre-condition + of read_view_close(). We don't really need it here. */ + mutex_enter(&kernel_mutex); + + read_view_close(purge_sys->view); + purge_sys->view = NULL; + + mutex_exit(&kernel_mutex); + } + + trx_undo_arr_free(purge_sys->arr); + + rw_lock_free(&purge_sys->latch); + mutex_free(&purge_sys->mutex); + + mem_heap_free(purge_sys->heap); + mem_free(purge_sys); + + purge_sys = NULL; } /*================ UNDO LOG HISTORY LIST =============================*/ @@ -1110,7 +1174,7 @@ trx_purge(void) /* Handle at most 20 undo log pages in one purge batch */ - purge_sys->handle_limit = purge_sys->n_pages_handled + 20; + purge_sys->handle_limit = purge_sys->n_pages_handled + 20 * (srv_use_purge_thread + 1); old_pages_handled = purge_sys->n_pages_handled; @@ -1129,6 +1193,9 @@ trx_purge(void) mutex_exit(&kernel_mutex); + if (purge_sys->n_worker) + os_event_set(purge_sys->worker_event); + /* srv_que_task_enqueue(thr2); */ if (srv_print_thread_releases) { @@ -1138,6 +1205,9 @@ trx_purge(void) que_run_threads(thr); + if (purge_sys->n_worker) + os_event_reset(purge_sys->worker_event); + if (srv_print_thread_releases) { fprintf(stderr, @@ -1148,6 +1218,52 @@ trx_purge(void) return(purge_sys->n_pages_handled - old_pages_handled); } +/********************************************************************** +This function runs a purge worker batch */ +UNIV_INTERN +void +trx_purge_worker( +/*=============*/ + ulint worker_id) +{ + que_thr_t* thr; + + mutex_enter(&kernel_mutex); + + thr = que_fork_start_command(purge_sys->query_arr[worker_id]); + + ut_ad(thr); + + mutex_exit(&kernel_mutex); + + que_run_threads(thr); + + if (purge_sys->state == TRX_STOP_PURGE) { /* optimistic */ + os_event_reset(purge_sys->worker_event); + } +} + +/********************************************************************** +This function waits the event for worker batch */ +UNIV_INTERN +void +trx_purge_worker_wait(void) +/*=======================*/ +{ + os_event_wait(purge_sys->worker_event); +} + +/********************************************************************** +This function wakes the waiting worker batch */ +UNIV_INTERN +void +trx_purge_worker_wake(void) +/*=======================*/ +{ + if (purge_sys->n_worker) + os_event_set(purge_sys->worker_event); +} + /******************************************************************//** Prints information of the purge system to stderr. */ UNIV_INTERN |