diff options
Diffstat (limited to 'innobase/trx/trx0trx.c')
-rw-r--r-- | innobase/trx/trx0trx.c | 165 |
1 files changed, 36 insertions, 129 deletions
diff --git a/innobase/trx/trx0trx.c b/innobase/trx/trx0trx.c index 335e1f69228..54993465f26 100644 --- a/innobase/trx/trx0trx.c +++ b/innobase/trx/trx0trx.c @@ -895,18 +895,15 @@ trx_assign_read_view( /******************************************************************** Commits a transaction. NOTE that the kernel mutex is temporarily released. */ static -void +que_thr_t* trx_handle_commit_sig_off_kernel( /*=============================*/ - trx_t* trx, /* in: transaction */ - que_thr_t** next_thr) /* in/out: next query thread to run; - if the value which is passed in is - a pointer to a NULL pointer, then the - calling function can start running - a new query thread */ + /* out: next query thread to run */ + trx_t* trx) /* in: transaction */ { trx_sig_t* sig; trx_sig_t* next_sig; + que_thr_t* next_thr = NULL; #ifdef UNIV_SYNC_DEBUG ut_ad(mutex_own(&kernel_mutex)); @@ -928,7 +925,8 @@ trx_handle_commit_sig_off_kernel( if (sig->type == TRX_SIG_COMMIT) { - trx_sig_reply(sig, next_thr); + ut_a(next_thr == NULL); + next_thr = trx_sig_reply(sig); trx_sig_remove(trx, sig); } @@ -936,6 +934,8 @@ trx_handle_commit_sig_off_kernel( } trx->que_state = TRX_QUE_RUNNING; + + return(next_thr); } /*************************************************************** @@ -997,39 +997,6 @@ trx_lock_wait_to_suspended( trx->que_state = TRX_QUE_RUNNING; } -/*************************************************************** -Moves the query threads in the sig reply wait list of trx to the SUSPENDED -state. */ -static -void -trx_sig_reply_wait_to_suspended( -/*============================*/ - trx_t* trx) /* in: transaction */ -{ - trx_sig_t* sig; - que_thr_t* thr; - -#ifdef UNIV_SYNC_DEBUG - ut_ad(mutex_own(&kernel_mutex)); -#endif /* UNIV_SYNC_DEBUG */ - - sig = UT_LIST_GET_FIRST(trx->reply_signals); - - while (sig != NULL) { - thr = sig->receiver; - - ut_ad(thr->state == QUE_THR_SIG_REPLY_WAIT); - - thr->state = QUE_THR_SUSPENDED; - - sig->receiver = NULL; - - UT_LIST_REMOVE(reply_signals, trx->reply_signals, sig); - - sig = UT_LIST_GET_FIRST(trx->reply_signals); - } -} - /********************************************************************* Checks the compatibility of a new signal with the other signals in the queue. */ @@ -1109,11 +1076,10 @@ trx_sig_is_compatible( /******************************************************************** Sends a signal to a trx object. */ -ibool +que_thr_t* trx_sig_send( /*=========*/ - /* out: TRUE if the signal was - successfully delivered */ + /* out: next query thread to run */ trx_t* trx, /* in: trx handle */ ulint type, /* in: signal type */ ulint sender, /* in: TRX_SIG_SELF or @@ -1121,14 +1087,8 @@ trx_sig_send( que_thr_t* receiver_thr, /* in: query thread which wants the reply, or NULL; if type is TRX_SIG_END_WAIT, this must be NULL */ - trx_savept_t* savept, /* in: possible rollback savepoint, or + trx_savept_t* savept) /* in: possible rollback savepoint, or NULL */ - que_thr_t** next_thr) /* in/out: next query thread to run; - if the value which is passed in is - a pointer to a NULL pointer, then the - calling function can start running - a new query thread; if the parameter - is NULL, it is ignored */ { trx_sig_t* sig; trx_t* receiver_trx; @@ -1138,14 +1098,7 @@ trx_sig_send( ut_ad(mutex_own(&kernel_mutex)); #endif /* UNIV_SYNC_DEBUG */ - if (!trx_sig_is_compatible(trx, type, sender)) { - /* The signal is not compatible with the other signals in - the queue: do nothing */ - - ut_error; - - return(FALSE); - } + ut_a(trx_sig_is_compatible(trx, type, sender)); /* Queue the signal object */ @@ -1179,11 +1132,6 @@ trx_sig_send( sig); } - if (trx->sess->state == SESS_ERROR) { - - trx_sig_reply_wait_to_suspended(trx); - } - if ((sender != TRX_SIG_SELF) || (type == TRX_SIG_BREAK_EXECUTION)) { /* The following call will add a TRX_SIG_ERROR_OCCURRED @@ -1198,10 +1146,10 @@ trx_sig_send( if (UT_LIST_GET_FIRST(trx->signals) == sig) { - trx_sig_start_handle(trx, next_thr); + return(trx_sig_start_handle(trx)); } - return(TRUE); + return(NULL); } /******************************************************************** @@ -1223,27 +1171,18 @@ trx_end_signal_handling( trx->handling_signals = FALSE; trx->graph = trx->graph_before_signal_handling; - - if (trx->graph && (trx->sess->state == SESS_ERROR)) { - - que_fork_error_handle(trx, trx->graph); - } } /******************************************************************** Starts handling of a trx signal. */ -void +que_thr_t* trx_sig_start_handle( /*=================*/ - trx_t* trx, /* in: trx handle */ - que_thr_t** next_thr) /* in/out: next query thread to run; - if the value which is passed in is - a pointer to a NULL pointer, then the - calling function can start running - a new query thread; if the parameter - is NULL, it is ignored */ + /* out: next query thread to run, or NULL */ + trx_t* trx) /* in: trx handle */ { + que_thr_t* next_thr = NULL; trx_sig_t* sig; ulint type; loop: @@ -1259,7 +1198,7 @@ loop: trx_end_signal_handling(trx); - return; + return(next_thr); } if (trx->conc_state == TRX_NOT_STARTED) { @@ -1275,23 +1214,13 @@ loop: trx_lock_wait_to_suspended(trx); } - /* If the session is in the error state and this trx has threads - waiting for reply from signals, moves these threads to the suspended - state, canceling wait reservations; note that if the transaction has - sent a commit or rollback signal to itself, and its session is not in - the error state, then nothing is done here. */ - - if (trx->sess->state == SESS_ERROR) { - trx_sig_reply_wait_to_suspended(trx); - } - /* If there are no running query threads, we can start processing of a signal, otherwise we have to wait until all query threads of this transaction are aware of the arrival of the signal. */ if (trx->n_active_thrs > 0) { - return; + return(NULL); } if (trx->handling_signals == FALSE) { @@ -1305,30 +1234,19 @@ loop: if (type == TRX_SIG_COMMIT) { - trx_handle_commit_sig_off_kernel(trx, next_thr); + next_thr = trx_handle_commit_sig_off_kernel(trx); } else if ((type == TRX_SIG_TOTAL_ROLLBACK) - || (type == TRX_SIG_ROLLBACK_TO_SAVEPT)) { - - trx_rollback(trx, sig, next_thr); - - /* No further signals can be handled until the rollback - completes, therefore we return */ - - return; - - } else if (type == TRX_SIG_ERROR_OCCURRED) { - - trx_rollback(trx, sig, next_thr); - + || (type == TRX_SIG_ROLLBACK_TO_SAVEPT) + || (type == TRX_SIG_ERROR_OCCURRED)) { /* No further signals can be handled until the rollback completes, therefore we return */ - return; + return(trx_rollback(trx, sig)); } else if (type == TRX_SIG_BREAK_EXECUTION) { - trx_sig_reply(sig, next_thr); + next_thr = trx_sig_reply(sig); trx_sig_remove(trx, sig); } else { ut_error; @@ -1341,17 +1259,14 @@ loop: Send the reply message when a signal in the queue of the trx has been handled. */ -void +que_thr_t* trx_sig_reply( /*==========*/ - trx_sig_t* sig, /* in: signal */ - que_thr_t** next_thr) /* in/out: next query thread to run; - if the value which is passed in is - a pointer to a NULL pointer, then the - calling function can start running - a new query thread */ + /* out: next query thread to run */ + trx_sig_t* sig) /* in: signal */ { - trx_t* receiver_trx; + trx_t* receiver_trx; + que_thr_t* next_thr = NULL; ut_ad(sig); #ifdef UNIV_SYNC_DEBUG @@ -1365,13 +1280,13 @@ trx_sig_reply( UT_LIST_REMOVE(reply_signals, receiver_trx->reply_signals, sig); - ut_ad(receiver_trx->sess->state != SESS_ERROR); - - que_thr_end_wait(sig->receiver, next_thr); + next_thr = que_thr_end_wait(sig->receiver); sig->receiver = NULL; } + + return(next_thr); } /******************************************************************** @@ -1427,7 +1342,6 @@ trx_commit_step( { commit_node_t* node; que_thr_t* next_thr; - ibool success; node = thr->run_node; @@ -1442,21 +1356,14 @@ trx_commit_step( node->state = COMMIT_NODE_WAIT; - next_thr = NULL; - thr->state = QUE_THR_SIG_REPLY_WAIT; /* Send the commit signal to the transaction */ - success = trx_sig_send(thr_get_trx(thr), TRX_SIG_COMMIT, - TRX_SIG_SELF, thr, NULL, &next_thr); - - mutex_exit(&kernel_mutex); + next_thr = trx_sig_send(thr_get_trx(thr), TRX_SIG_COMMIT, + TRX_SIG_SELF, thr, NULL); - if (!success) { - /* Error in delivering the commit signal */ - que_thr_handle_error(thr, DB_ERROR, NULL, 0); - } + mutex_exit(&kernel_mutex); return(next_thr); } |