diff options
Diffstat (limited to 'innobase/que/que0que.c')
-rw-r--r-- | innobase/que/que0que.c | 117 |
1 files changed, 92 insertions, 25 deletions
diff --git a/innobase/que/que0que.c b/innobase/que/que0que.c index 08bf7ce3e59..22878dec27f 100644 --- a/innobase/que/que0que.c +++ b/innobase/que/que0que.c @@ -12,6 +12,7 @@ Created 5/27/1996 Heikki Tuuri #include "que0que.ic" #endif +#include "srv0que.h" #include "usr0sess.h" #include "trx0trx.h" #include "trx0roll.h" @@ -174,15 +175,19 @@ a single worker thread to execute it. This function should be used to end the wait state of a query thread waiting for a lock or a stored procedure completion. */ -que_thr_t* +void que_thr_end_wait( /*=============*/ - /* out: next query thread to run; - NULL if none */ - que_thr_t* thr) /* in: query thread in the + que_thr_t* thr, /* in: query thread in the QUE_THR_LOCK_WAIT, or QUE_THR_PROCEDURE_WAIT, or QUE_THR_SIG_REPLY_WAIT state */ + que_thr_t** next_thr) /* in/out: next query thread to run; + if the value which is passed in is + a pointer to a NULL pointer, then the + calling function can start running + a new query thread; if NULL is passed + as the parameter, it is ignored */ { ibool was_active; @@ -190,8 +195,6 @@ que_thr_end_wait( ut_ad(mutex_own(&kernel_mutex)); #endif /* UNIV_SYNC_DEBUG */ ut_ad(thr); - ut_ad(next_thr); - ut_ad(*next_thr == NULL); ut_ad((thr->state == QUE_THR_LOCK_WAIT) || (thr->state == QUE_THR_PROCEDURE_WAIT) || (thr->state == QUE_THR_SIG_REPLY_WAIT)); @@ -203,8 +206,18 @@ que_thr_end_wait( que_thr_move_to_run_state(thr); - return(was_active ? NULL : thr); -} + if (was_active) { + + return; + } + + if (next_thr && *next_thr == NULL) { + *next_thr = thr; + } else { + ut_a(0); + srv_que_task_enqueue_low(thr); + } +} /************************************************************************** Same as que_thr_end_wait, but no parameter next_thr available. */ @@ -241,6 +254,8 @@ que_thr_end_wait_no_next_thr( for the lock to be released: */ srv_release_mysql_thread_if_suspended(thr); + + /* srv_que_task_enqueue_low(thr); */ } /************************************************************************** @@ -341,6 +356,49 @@ que_fork_start_command( return(NULL); } +/************************************************************************** +After signal handling is finished, returns control to a query graph error +handling routine. (Currently, just returns the control to the root of the +graph so that the graph can communicate an error message to the client.) */ + +void +que_fork_error_handle( +/*==================*/ + trx_t* trx __attribute__((unused)), /* in: trx */ + que_t* fork) /* in: query graph which was run before signal + handling started, NULL not allowed */ +{ + que_thr_t* thr; + +#ifdef UNIV_SYNC_DEBUG + ut_ad(mutex_own(&kernel_mutex)); +#endif /* UNIV_SYNC_DEBUG */ + ut_ad(trx->sess->state == SESS_ERROR); + ut_ad(UT_LIST_GET_LEN(trx->reply_signals) == 0); + ut_ad(UT_LIST_GET_LEN(trx->wait_thrs) == 0); + + thr = UT_LIST_GET_FIRST(fork->thrs); + + while (thr != NULL) { + ut_ad(!thr->is_active); + ut_ad(thr->state != QUE_THR_SIG_REPLY_WAIT); + ut_ad(thr->state != QUE_THR_LOCK_WAIT); + + thr->run_node = thr; + thr->prev_node = thr->child; + thr->state = QUE_THR_COMPLETED; + + thr = UT_LIST_GET_NEXT(thrs, thr); + } + + thr = UT_LIST_GET_FIRST(fork->thrs); + + que_thr_move_to_run_state(thr); + + ut_a(0); + srv_que_task_enqueue_low(thr); +} + /******************************************************************** Tests if all the query threads in the same fork have a given state. */ UNIV_INLINE @@ -709,18 +767,22 @@ this function may only be called from inside que_run_threads or que_thr_check_if_switch! These restrictions exist to make the rollback code easier to maintain. */ static -que_thr_t* +void que_thr_dec_refer_count( /*====================*/ - /* out: next query thread to run */ - que_thr_t* thr) /* in: query thread */ + que_thr_t* thr, /* in: query thread */ + que_thr_t** next_thr) /* in/out: next query thread to run; + if the value which is passed in is + a pointer to a NULL pointer, then the + calling function can start running + a new query thread */ { que_fork_t* fork; trx_t* trx; sess_t* sess; ulint fork_type; - que_thr_t* next_thr = NULL; - + ibool stopped; + fork = thr->common.parent; trx = thr->graph->trx; sess = trx->sess; @@ -731,7 +793,9 @@ que_thr_dec_refer_count( if (thr->state == QUE_THR_RUNNING) { - if (!que_thr_stop(thr)) { + stopped = que_thr_stop(thr); + + if (!stopped) { /* The reason for the thr suspension or wait was already canceled before we came here: continue running the thread */ @@ -739,9 +803,16 @@ que_thr_dec_refer_count( /* fputs("!!!!!!!! Wait already ended: continue thr\n", stderr); */ + if (next_thr && *next_thr == NULL) { + *next_thr = thr; + } else { + ut_a(0); + srv_que_task_enqueue_low(thr); + } + mutex_exit(&kernel_mutex); - return(thr); + return; } } @@ -757,7 +828,7 @@ que_thr_dec_refer_count( mutex_exit(&kernel_mutex); - return(next_thr); + return; } fork_type = fork->fork_type; @@ -773,7 +844,7 @@ que_thr_dec_refer_count( ut_ad(UT_LIST_GET_LEN(trx->signals) > 0); ut_ad(trx->handling_signals == TRUE); - next_thr = trx_finish_rollback_off_kernel(fork, trx); + trx_finish_rollback_off_kernel(fork, trx, next_thr); } else if (fork_type == QUE_FORK_PURGE) { @@ -795,7 +866,7 @@ que_thr_dec_refer_count( zero, then we start processing a signal; from it we may get a new query thread to run */ - next_thr = trx_sig_start_handle(trx); + trx_sig_start_handle(trx, next_thr); } if (trx->handling_signals && UT_LIST_GET_LEN(trx->signals) == 0) { @@ -804,8 +875,6 @@ que_thr_dec_refer_count( } mutex_exit(&kernel_mutex); - - return(next_thr); } /************************************************************************** @@ -970,10 +1039,9 @@ que_thr_stop_for_mysql_no_error( trx->n_active_thrs--; } -#ifdef UNIV_DEBUG /************************************************************************** Prints info of an SQL query graph node. */ -static + void que_node_print_info( /*================*/ @@ -1030,7 +1098,6 @@ que_node_print_info( fprintf(stderr, "Node type %lu: %s, address %p\n", (ulong) type, str, node); } -#endif /* UNIV_DEBUG */ /************************************************************************** Performs an execution step on a query thread. */ @@ -1179,7 +1246,6 @@ loop: /*-------------------------*/ next_thr = que_thr_step(thr); /*-------------------------*/ - ut_a(next_thr == thr || next_thr == NULL); /* Test the effect on performance of adding extra mutex reservations */ @@ -1192,7 +1258,8 @@ loop: loop_count++; if (next_thr != thr) { - next_thr = que_thr_dec_refer_count(thr); + ut_a(next_thr == NULL); + que_thr_dec_refer_count(thr, &next_thr); if (next_thr == NULL) { |