diff options
Diffstat (limited to 'innobase/trx/trx0trx.c')
-rw-r--r-- | innobase/trx/trx0trx.c | 165 |
1 files changed, 129 insertions, 36 deletions
diff --git a/innobase/trx/trx0trx.c b/innobase/trx/trx0trx.c index afef08dc100..a9aae48dae6 100644 --- a/innobase/trx/trx0trx.c +++ b/innobase/trx/trx0trx.c @@ -897,15 +897,18 @@ trx_assign_read_view( /******************************************************************** Commits a transaction. NOTE that the kernel mutex is temporarily released. */ static -que_thr_t* +void trx_handle_commit_sig_off_kernel( /*=============================*/ - /* out: next query thread to run */ - trx_t* trx) /* in: transaction */ + trx_t* trx, /* in: transaction */ + que_thr_t** next_thr) /* in/out: next query thread to run; + if the value which is passed in is + a pointer to a NULL pointer, then the + calling function can start running + a new query thread */ { trx_sig_t* sig; trx_sig_t* next_sig; - que_thr_t* next_thr = NULL; #ifdef UNIV_SYNC_DEBUG ut_ad(mutex_own(&kernel_mutex)); @@ -927,8 +930,7 @@ trx_handle_commit_sig_off_kernel( if (sig->type == TRX_SIG_COMMIT) { - ut_a(next_thr == NULL); - next_thr = trx_sig_reply(sig); + trx_sig_reply(sig, next_thr); trx_sig_remove(trx, sig); } @@ -936,8 +938,6 @@ trx_handle_commit_sig_off_kernel( } trx->que_state = TRX_QUE_RUNNING; - - return(next_thr); } /*************************************************************** @@ -999,6 +999,39 @@ trx_lock_wait_to_suspended( trx->que_state = TRX_QUE_RUNNING; } +/*************************************************************** +Moves the query threads in the sig reply wait list of trx to the SUSPENDED +state. */ +static +void +trx_sig_reply_wait_to_suspended( +/*============================*/ + trx_t* trx) /* in: transaction */ +{ + trx_sig_t* sig; + que_thr_t* thr; + +#ifdef UNIV_SYNC_DEBUG + ut_ad(mutex_own(&kernel_mutex)); +#endif /* UNIV_SYNC_DEBUG */ + + sig = UT_LIST_GET_FIRST(trx->reply_signals); + + while (sig != NULL) { + thr = sig->receiver; + + ut_ad(thr->state == QUE_THR_SIG_REPLY_WAIT); + + thr->state = QUE_THR_SUSPENDED; + + sig->receiver = NULL; + + UT_LIST_REMOVE(reply_signals, trx->reply_signals, sig); + + sig = UT_LIST_GET_FIRST(trx->reply_signals); + } +} + /********************************************************************* Checks the compatibility of a new signal with the other signals in the queue. */ @@ -1078,10 +1111,11 @@ trx_sig_is_compatible( /******************************************************************** Sends a signal to a trx object. */ -que_thr_t* +ibool trx_sig_send( /*=========*/ - /* out: next query thread to run */ + /* out: TRUE if the signal was + successfully delivered */ trx_t* trx, /* in: trx handle */ ulint type, /* in: signal type */ ulint sender, /* in: TRX_SIG_SELF or @@ -1089,8 +1123,14 @@ trx_sig_send( que_thr_t* receiver_thr, /* in: query thread which wants the reply, or NULL; if type is TRX_SIG_END_WAIT, this must be NULL */ - trx_savept_t* savept) /* in: possible rollback savepoint, or + trx_savept_t* savept, /* in: possible rollback savepoint, or NULL */ + que_thr_t** next_thr) /* in/out: next query thread to run; + if the value which is passed in is + a pointer to a NULL pointer, then the + calling function can start running + a new query thread; if the parameter + is NULL, it is ignored */ { trx_sig_t* sig; trx_t* receiver_trx; @@ -1100,7 +1140,14 @@ trx_sig_send( ut_ad(mutex_own(&kernel_mutex)); #endif /* UNIV_SYNC_DEBUG */ - ut_a(trx_sig_is_compatible(trx, type, sender)); + if (!trx_sig_is_compatible(trx, type, sender)) { + /* The signal is not compatible with the other signals in + the queue: do nothing */ + + ut_error; + + return(FALSE); + } /* Queue the signal object */ @@ -1134,6 +1181,11 @@ trx_sig_send( sig); } + if (trx->sess->state == SESS_ERROR) { + + trx_sig_reply_wait_to_suspended(trx); + } + if ((sender != TRX_SIG_SELF) || (type == TRX_SIG_BREAK_EXECUTION)) { /* The following call will add a TRX_SIG_ERROR_OCCURRED @@ -1148,10 +1200,10 @@ trx_sig_send( if (UT_LIST_GET_FIRST(trx->signals) == sig) { - return(trx_sig_start_handle(trx)); + trx_sig_start_handle(trx, next_thr); } - return(NULL); + return(TRUE); } /******************************************************************** @@ -1173,18 +1225,27 @@ trx_end_signal_handling( trx->handling_signals = FALSE; trx->graph = trx->graph_before_signal_handling; + + if (trx->graph && (trx->sess->state == SESS_ERROR)) { + + que_fork_error_handle(trx, trx->graph); + } } /******************************************************************** Starts handling of a trx signal. */ -que_thr_t* +void trx_sig_start_handle( /*=================*/ - /* out: next query thread to run, or NULL */ - trx_t* trx) /* in: trx handle */ + trx_t* trx, /* in: trx handle */ + que_thr_t** next_thr) /* in/out: next query thread to run; + if the value which is passed in is + a pointer to a NULL pointer, then the + calling function can start running + a new query thread; if the parameter + is NULL, it is ignored */ { - que_thr_t* next_thr = NULL; trx_sig_t* sig; ulint type; loop: @@ -1200,7 +1261,7 @@ loop: trx_end_signal_handling(trx); - return(next_thr); + return; } if (trx->conc_state == TRX_NOT_STARTED) { @@ -1216,13 +1277,23 @@ loop: trx_lock_wait_to_suspended(trx); } + /* If the session is in the error state and this trx has threads + waiting for reply from signals, moves these threads to the suspended + state, canceling wait reservations; note that if the transaction has + sent a commit or rollback signal to itself, and its session is not in + the error state, then nothing is done here. */ + + if (trx->sess->state == SESS_ERROR) { + trx_sig_reply_wait_to_suspended(trx); + } + /* If there are no running query threads, we can start processing of a signal, otherwise we have to wait until all query threads of this transaction are aware of the arrival of the signal. */ if (trx->n_active_thrs > 0) { - return(NULL); + return; } if (trx->handling_signals == FALSE) { @@ -1236,19 +1307,30 @@ loop: if (type == TRX_SIG_COMMIT) { - next_thr = trx_handle_commit_sig_off_kernel(trx); + trx_handle_commit_sig_off_kernel(trx, next_thr); } else if ((type == TRX_SIG_TOTAL_ROLLBACK) - || (type == TRX_SIG_ROLLBACK_TO_SAVEPT) - || (type == TRX_SIG_ERROR_OCCURRED)) { + || (type == TRX_SIG_ROLLBACK_TO_SAVEPT)) { + + trx_rollback(trx, sig, next_thr); + + /* No further signals can be handled until the rollback + completes, therefore we return */ + + return; + + } else if (type == TRX_SIG_ERROR_OCCURRED) { + + trx_rollback(trx, sig, next_thr); + /* No further signals can be handled until the rollback completes, therefore we return */ - return(trx_rollback(trx, sig)); + return; } else if (type == TRX_SIG_BREAK_EXECUTION) { - next_thr = trx_sig_reply(sig); + trx_sig_reply(sig, next_thr); trx_sig_remove(trx, sig); } else { ut_error; @@ -1261,14 +1343,17 @@ loop: Send the reply message when a signal in the queue of the trx has been handled. */ -que_thr_t* +void trx_sig_reply( /*==========*/ - /* out: next query thread to run */ - trx_sig_t* sig) /* in: signal */ + trx_sig_t* sig, /* in: signal */ + que_thr_t** next_thr) /* in/out: next query thread to run; + if the value which is passed in is + a pointer to a NULL pointer, then the + calling function can start running + a new query thread */ { - trx_t* receiver_trx; - que_thr_t* next_thr = NULL; + trx_t* receiver_trx; ut_ad(sig); #ifdef UNIV_SYNC_DEBUG @@ -1282,13 +1367,13 @@ trx_sig_reply( UT_LIST_REMOVE(reply_signals, receiver_trx->reply_signals, sig); - next_thr = que_thr_end_wait(sig->receiver); + ut_ad(receiver_trx->sess->state != SESS_ERROR); + + que_thr_end_wait(sig->receiver, next_thr); sig->receiver = NULL; } - - return(next_thr); } /******************************************************************** @@ -1344,6 +1429,7 @@ trx_commit_step( { commit_node_t* node; que_thr_t* next_thr; + ibool success; node = thr->run_node; @@ -1358,15 +1444,22 @@ trx_commit_step( node->state = COMMIT_NODE_WAIT; + next_thr = NULL; + thr->state = QUE_THR_SIG_REPLY_WAIT; /* Send the commit signal to the transaction */ - next_thr = trx_sig_send(thr_get_trx(thr), TRX_SIG_COMMIT, - TRX_SIG_SELF, thr, NULL); - + success = trx_sig_send(thr_get_trx(thr), TRX_SIG_COMMIT, + TRX_SIG_SELF, thr, NULL, &next_thr); + mutex_exit(&kernel_mutex); + if (!success) { + /* Error in delivering the commit signal */ + que_thr_handle_error(thr, DB_ERROR, NULL, 0); + } + return(next_thr); } |