summaryrefslogtreecommitdiff
path: root/storage/xtradb/trx/trx0purge.c
diff options
context:
space:
mode:
Diffstat (limited to 'storage/xtradb/trx/trx0purge.c')
-rw-r--r--storage/xtradb/trx/trx0purge.c124
1 files changed, 120 insertions, 4 deletions
diff --git a/storage/xtradb/trx/trx0purge.c b/storage/xtradb/trx/trx0purge.c
index cd79fd1c315..41e16b35e85 100644
--- a/storage/xtradb/trx/trx0purge.c
+++ b/storage/xtradb/trx/trx0purge.c
@@ -184,8 +184,9 @@ this query graph.
@return own: the query graph */
static
que_t*
-trx_purge_graph_build(void)
+trx_purge_graph_build(
/*=======================*/
+ trx_t* trx)
{
mem_heap_t* heap;
que_fork_t* fork;
@@ -194,7 +195,7 @@ trx_purge_graph_build(void)
heap = mem_heap_create(512);
fork = que_fork_create(NULL, NULL, QUE_FORK_PURGE, heap);
- fork->trx = purge_sys->trx;
+ fork->trx = trx;
thr = que_thr_create(fork, heap);
@@ -243,10 +244,73 @@ trx_purge_sys_create(void)
ut_a(trx_start_low(purge_sys->trx, ULINT_UNDEFINED));
- purge_sys->query = trx_purge_graph_build();
+ purge_sys->query = trx_purge_graph_build(purge_sys->trx);
purge_sys->view = read_view_oldest_copy_or_open_new(ut_dulint_zero,
purge_sys->heap);
+
+ purge_sys->n_worker = 0;
+ if (srv_use_purge_thread > 1) {
+ /* Use worker threads */
+ ulint i;
+
+ purge_sys->n_worker = srv_use_purge_thread - 1;
+
+ purge_sys->sess_arr = mem_alloc(sizeof(sess_t*) * purge_sys->n_worker);
+ purge_sys->trx_arr = mem_alloc(sizeof(trx_t*) * purge_sys->n_worker);
+ purge_sys->query_arr = mem_alloc(sizeof(que_t*) * purge_sys->n_worker);
+
+ purge_sys->worker_event = os_event_create(NULL);
+ os_event_reset(purge_sys->worker_event);
+
+ for (i = 0; i < purge_sys->n_worker; i++) {
+ purge_sys->sess_arr[i] = sess_open();
+
+ purge_sys->trx_arr[i] = purge_sys->sess_arr[i]->trx;
+ purge_sys->trx_arr[i]->is_purge = 1;
+ ut_a(trx_start_low(purge_sys->trx_arr[i], ULINT_UNDEFINED));
+
+ purge_sys->query_arr[i] = trx_purge_graph_build(purge_sys->trx_arr[i]);
+ }
+ }
+}
+
+/************************************************************************
+Frees the global purge system control structure. */
+UNIV_INTERN
+void
+trx_purge_sys_close(void)
+/*======================*/
+{
+ ut_ad(!mutex_own(&kernel_mutex));
+
+ que_graph_free(purge_sys->query);
+
+ ut_a(purge_sys->sess->trx->is_purge);
+ purge_sys->sess->trx->conc_state = TRX_NOT_STARTED;
+ sess_close(purge_sys->sess);
+ purge_sys->sess = NULL;
+
+ if (purge_sys->view != NULL) {
+ /* Because acquiring the kernel mutex is a pre-condition
+ of read_view_close(). We don't really need it here. */
+ mutex_enter(&kernel_mutex);
+
+ read_view_close(purge_sys->view);
+ purge_sys->view = NULL;
+
+ mutex_exit(&kernel_mutex);
+ }
+
+ trx_undo_arr_free(purge_sys->arr);
+
+ rw_lock_free(&purge_sys->latch);
+ mutex_free(&purge_sys->mutex);
+
+ mem_heap_free(purge_sys->heap);
+ mem_free(purge_sys);
+
+ purge_sys = NULL;
}
/*================ UNDO LOG HISTORY LIST =============================*/
@@ -1110,7 +1174,7 @@ trx_purge(void)
/* Handle at most 20 undo log pages in one purge batch */
- purge_sys->handle_limit = purge_sys->n_pages_handled + 20;
+ purge_sys->handle_limit = purge_sys->n_pages_handled + 20 * (srv_use_purge_thread + 1);
old_pages_handled = purge_sys->n_pages_handled;
@@ -1129,6 +1193,9 @@ trx_purge(void)
mutex_exit(&kernel_mutex);
+ if (purge_sys->n_worker)
+ os_event_set(purge_sys->worker_event);
+
/* srv_que_task_enqueue(thr2); */
if (srv_print_thread_releases) {
@@ -1138,6 +1205,9 @@ trx_purge(void)
que_run_threads(thr);
+ if (purge_sys->n_worker)
+ os_event_reset(purge_sys->worker_event);
+
if (srv_print_thread_releases) {
fprintf(stderr,
@@ -1148,6 +1218,52 @@ trx_purge(void)
return(purge_sys->n_pages_handled - old_pages_handled);
}
+/**********************************************************************
+This function runs a purge worker batch */
+UNIV_INTERN
+void
+trx_purge_worker(
+/*=============*/
+ ulint worker_id)
+{
+ que_thr_t* thr;
+
+ mutex_enter(&kernel_mutex);
+
+ thr = que_fork_start_command(purge_sys->query_arr[worker_id]);
+
+ ut_ad(thr);
+
+ mutex_exit(&kernel_mutex);
+
+ que_run_threads(thr);
+
+ if (purge_sys->state == TRX_STOP_PURGE) { /* optimistic */
+ os_event_reset(purge_sys->worker_event);
+ }
+}
+
+/**********************************************************************
+This function waits the event for worker batch */
+UNIV_INTERN
+void
+trx_purge_worker_wait(void)
+/*=======================*/
+{
+ os_event_wait(purge_sys->worker_event);
+}
+
+/**********************************************************************
+This function wakes the waiting worker batch */
+UNIV_INTERN
+void
+trx_purge_worker_wake(void)
+/*=======================*/
+{
+ if (purge_sys->n_worker)
+ os_event_set(purge_sys->worker_event);
+}
+
/******************************************************************//**
Prints information of the purge system to stderr. */
UNIV_INTERN