diff options
Diffstat (limited to 'sql/transaction.cc')
-rw-r--r-- | sql/transaction.cc | 391 |
1 files changed, 24 insertions, 367 deletions
diff --git a/sql/transaction.cc b/sql/transaction.cc index 8c28543a0e7..aecee04c364 100644 --- a/sql/transaction.cc +++ b/sql/transaction.cc @@ -24,19 +24,20 @@ #include "debug_sync.h" // DEBUG_SYNC #include "sql_acl.h" #include "semisync_master.h" +#ifdef WITH_WSREP +#include "wsrep_trans_observer.h" +#endif /* WITH_WSREP */ -#ifndef EMBEDDED_LIBRARY /** Helper: Tell tracker (if any) that transaction ended. */ -static void trans_track_end_trx(THD *thd) +void trans_track_end_trx(THD *thd) { +#ifndef EMBEDDED_LIBRARY if (thd->variables.session_track_transaction_info > TX_TRACK_NONE) thd->session_tracker.transaction_info.end_trx(thd); -} -#else -#define trans_track_end_trx(A) do{}while(0) #endif //EMBEDDED_LIBRARY +} /** @@ -59,7 +60,6 @@ void trans_reset_one_shot_chistics(THD *thd) /* Conditions under which the transaction state must not change. */ static bool trans_check(THD *thd) { - enum xa_states xa_state= thd->transaction.xid_state.xa_state; DBUG_ENTER("trans_check"); /* @@ -70,8 +70,8 @@ static bool trans_check(THD *thd) if (unlikely(thd->in_sub_stmt)) my_error(ER_COMMIT_NOT_ALLOWED_IN_SF_OR_TRG, MYF(0)); - if (xa_state != XA_NOTR) - my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[xa_state]); + if (thd->transaction.xid_state.is_explicit_XA()) + thd->transaction.xid_state.er_xaer_rmfail(); else DBUG_RETURN(FALSE); @@ -80,67 +80,6 @@ static bool trans_check(THD *thd) /** - Mark a XA transaction as rollback-only if the RM unilaterally - rolled back the transaction branch. - - @note If a rollback was requested by the RM, this function sets - the appropriate rollback error code and transits the state - to XA_ROLLBACK_ONLY. - - @return TRUE if transaction was rolled back or if the transaction - state is XA_ROLLBACK_ONLY. FALSE otherwise. -*/ -static bool xa_trans_rolled_back(XID_STATE *xid_state) -{ - if (xid_state->rm_error) - { - switch (xid_state->rm_error) { - case ER_LOCK_WAIT_TIMEOUT: - my_error(ER_XA_RBTIMEOUT, MYF(0)); - break; - case ER_LOCK_DEADLOCK: - my_error(ER_XA_RBDEADLOCK, MYF(0)); - break; - default: - my_error(ER_XA_RBROLLBACK, MYF(0)); - } - xid_state->xa_state= XA_ROLLBACK_ONLY; - } - - return (xid_state->xa_state == XA_ROLLBACK_ONLY); -} - - -/** - Rollback the active XA transaction. - - @note Resets rm_error before calling ha_rollback(), so - the thd->transaction.xid structure gets reset - by ha_rollback() / THD::transaction::cleanup(). - - @return TRUE if the rollback failed, FALSE otherwise. -*/ - -static bool xa_trans_force_rollback(THD *thd) -{ - /* - We must reset rm_error before calling ha_rollback(), - so thd->transaction.xid structure gets reset - by ha_rollback()/THD::transaction::cleanup(). - */ - thd->transaction.xid_state.rm_error= 0; - if (WSREP_ON) - wsrep_register_hton(thd, TRUE); - if (ha_rollback_trans(thd, true)) - { - my_error(ER_XAER_RMERR, MYF(0)); - return true; - } - return false; -} - - -/** Begin a new transaction. @note Beginning a transaction implicitly commits any current @@ -169,14 +108,16 @@ bool trans_begin(THD *thd, uint flags) (thd->variables.option_bits & OPTION_TABLE_LOCK)) { thd->variables.option_bits&= ~OPTION_TABLE_LOCK; - if (WSREP_ON) - wsrep_register_hton(thd, TRUE); thd->server_status&= ~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY); DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS")); res= MY_TEST(ha_commit_trans(thd, TRUE)); - if (WSREP_ON) - wsrep_post_commit(thd, TRUE); +#ifdef WITH_WSREP + if (wsrep_thd_is_local(thd)) + { + res= res || wsrep_after_statement(thd); + } +#endif /* WITH_WSREP */ } thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG); @@ -237,9 +178,14 @@ bool trans_begin(THD *thd, uint flags) } #ifdef WITH_WSREP - thd->wsrep_PA_safe= true; - if (WSREP_CLIENT(thd) && wsrep_sync_wait(thd)) - DBUG_RETURN(TRUE); + if (wsrep_thd_is_local(thd)) + { + if (wsrep_sync_wait(thd)) + DBUG_RETURN(TRUE); + if (!thd->tx_read_only && + wsrep_start_transaction(thd, thd->wsrep_next_trx_id())) + DBUG_RETURN(TRUE); + } #endif /* WITH_WSREP */ thd->variables.option_bits|= OPTION_BEGIN; @@ -284,8 +230,6 @@ bool trans_commit(THD *thd) if (trans_check(thd)) DBUG_RETURN(TRUE); - if (WSREP_ON) - wsrep_register_hton(thd, TRUE); thd->server_status&= ~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY); DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS")); @@ -296,8 +240,6 @@ bool trans_commit(THD *thd) mysql_mutex_assert_not_owner(&LOCK_after_binlog_sync); mysql_mutex_assert_not_owner(&LOCK_commit_ordered); - if (WSREP_ON) - wsrep_post_commit(thd, TRUE); /* if res is non-zero, then ha_commit_trans has rolled back the transaction, so the hooks for rollback will be called. @@ -353,14 +295,10 @@ bool trans_commit_implicit(THD *thd) /* Safety if one did "drop table" on locked tables */ if (!thd->locked_tables_mode) thd->variables.option_bits&= ~OPTION_TABLE_LOCK; - if (WSREP_ON) - wsrep_register_hton(thd, TRUE); thd->server_status&= ~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY); DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS")); res= MY_TEST(ha_commit_trans(thd, TRUE)); - if (WSREP_ON) - wsrep_post_commit(thd, TRUE); } thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG); @@ -394,14 +332,9 @@ bool trans_rollback(THD *thd) int res; DBUG_ENTER("trans_rollback"); -#ifdef WITH_WSREP - thd->wsrep_PA_safe= true; -#endif /* WITH_WSREP */ if (trans_check(thd)) DBUG_RETURN(TRUE); - if (WSREP_ON) - wsrep_register_hton(thd, TRUE); thd->server_status&= ~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY); DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS")); @@ -500,14 +433,10 @@ bool trans_commit_stmt(THD *thd) if (thd->transaction.stmt.ha_list) { - if (WSREP_ON) - wsrep_register_hton(thd, FALSE); res= ha_commit_trans(thd, FALSE); if (! thd->in_active_multi_stmt_transaction()) { trans_reset_one_shot_chistics(thd); - if (WSREP_ON) - wsrep_post_commit(thd, FALSE); } } @@ -563,8 +492,6 @@ bool trans_rollback_stmt(THD *thd) if (thd->transaction.stmt.ha_list) { - if (WSREP_ON) - wsrep_register_hton(thd, FALSE); ha_rollback_trans(thd, FALSE); if (! thd->in_active_multi_stmt_transaction()) trans_reset_one_shot_chistics(thd); @@ -718,7 +645,8 @@ bool trans_rollback_to_savepoint(THD *thd, LEX_CSTRING name) logging is off. */ bool mdl_can_safely_rollback_to_savepoint= - (!(mysql_bin_log.is_open() && thd->variables.sql_log_bin) || + (!((WSREP_EMULATE_BINLOG_NNULL(thd) || mysql_bin_log.is_open()) + && thd->variables.sql_log_bin) || ha_rollback_to_savepoint_can_release_mdl(thd)); if (ha_rollback_to_savepoint(thd, sv)) @@ -772,274 +700,3 @@ bool trans_release_savepoint(THD *thd, LEX_CSTRING name) DBUG_RETURN(MY_TEST(res)); } - - -/** - Starts an XA transaction with the given xid value. - - @param thd Current thread - - @retval FALSE Success - @retval TRUE Failure -*/ - -bool trans_xa_start(THD *thd) -{ - enum xa_states xa_state= thd->transaction.xid_state.xa_state; - DBUG_ENTER("trans_xa_start"); - - if (xa_state == XA_IDLE && thd->lex->xa_opt == XA_RESUME) - { - bool not_equal= !thd->transaction.xid_state.xid.eq(thd->lex->xid); - if (not_equal) - my_error(ER_XAER_NOTA, MYF(0)); - else - thd->transaction.xid_state.xa_state= XA_ACTIVE; - DBUG_RETURN(not_equal); - } - - /* TODO: JOIN is not supported yet. */ - if (thd->lex->xa_opt != XA_NONE) - my_error(ER_XAER_INVAL, MYF(0)); - else if (xa_state != XA_NOTR) - my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[xa_state]); - else if (thd->locked_tables_mode || thd->in_active_multi_stmt_transaction()) - my_error(ER_XAER_OUTSIDE, MYF(0)); - else if (!trans_begin(thd)) - { - DBUG_ASSERT(thd->transaction.xid_state.xid.is_null()); - thd->transaction.xid_state.xa_state= XA_ACTIVE; - thd->transaction.xid_state.rm_error= 0; - thd->transaction.xid_state.xid.set(thd->lex->xid); - if (xid_cache_insert(thd, &thd->transaction.xid_state)) - { - thd->transaction.xid_state.xa_state= XA_NOTR; - thd->transaction.xid_state.xid.null(); - trans_rollback(thd); - DBUG_RETURN(true); - } - DBUG_RETURN(FALSE); - } - - DBUG_RETURN(TRUE); -} - - -/** - Put a XA transaction in the IDLE state. - - @param thd Current thread - - @retval FALSE Success - @retval TRUE Failure -*/ - -bool trans_xa_end(THD *thd) -{ - DBUG_ENTER("trans_xa_end"); - - /* TODO: SUSPEND and FOR MIGRATE are not supported yet. */ - if (thd->lex->xa_opt != XA_NONE) - my_error(ER_XAER_INVAL, MYF(0)); - else if (thd->transaction.xid_state.xa_state != XA_ACTIVE) - my_error(ER_XAER_RMFAIL, MYF(0), - xa_state_names[thd->transaction.xid_state.xa_state]); - else if (!thd->transaction.xid_state.xid.eq(thd->lex->xid)) - my_error(ER_XAER_NOTA, MYF(0)); - else if (!xa_trans_rolled_back(&thd->transaction.xid_state)) - thd->transaction.xid_state.xa_state= XA_IDLE; - - DBUG_RETURN(thd->is_error() || - thd->transaction.xid_state.xa_state != XA_IDLE); -} - - -/** - Put a XA transaction in the PREPARED state. - - @param thd Current thread - - @retval FALSE Success - @retval TRUE Failure -*/ - -bool trans_xa_prepare(THD *thd) -{ - DBUG_ENTER("trans_xa_prepare"); - - if (thd->transaction.xid_state.xa_state != XA_IDLE) - my_error(ER_XAER_RMFAIL, MYF(0), - xa_state_names[thd->transaction.xid_state.xa_state]); - else if (!thd->transaction.xid_state.xid.eq(thd->lex->xid)) - my_error(ER_XAER_NOTA, MYF(0)); - else if (ha_prepare(thd)) - { - xid_cache_delete(thd, &thd->transaction.xid_state); - thd->transaction.xid_state.xa_state= XA_NOTR; - my_error(ER_XA_RBROLLBACK, MYF(0)); - } - else - thd->transaction.xid_state.xa_state= XA_PREPARED; - - DBUG_RETURN(thd->is_error() || - thd->transaction.xid_state.xa_state != XA_PREPARED); -} - - -/** - Commit and terminate the a XA transaction. - - @param thd Current thread - - @retval FALSE Success - @retval TRUE Failure -*/ - -bool trans_xa_commit(THD *thd) -{ - bool res= TRUE; - enum xa_states xa_state= thd->transaction.xid_state.xa_state; - DBUG_ENTER("trans_xa_commit"); - - if (!thd->transaction.xid_state.xid.eq(thd->lex->xid)) - { - if (thd->fix_xid_hash_pins()) - { - my_error(ER_OUT_OF_RESOURCES, MYF(0)); - DBUG_RETURN(TRUE); - } - - XID_STATE *xs= xid_cache_search(thd, thd->lex->xid); - res= !xs; - if (res) - my_error(ER_XAER_NOTA, MYF(0)); - else - { - res= xa_trans_rolled_back(xs); - ha_commit_or_rollback_by_xid(thd->lex->xid, !res); - xid_cache_delete(thd, xs); - } - DBUG_RETURN(res); - } - - if (xa_trans_rolled_back(&thd->transaction.xid_state)) - { - xa_trans_force_rollback(thd); - res= thd->is_error(); - } - else if (xa_state == XA_IDLE && thd->lex->xa_opt == XA_ONE_PHASE) - { - if (WSREP_ON) - wsrep_register_hton(thd, TRUE); - int r= ha_commit_trans(thd, TRUE); - if ((res= MY_TEST(r))) - my_error(r == 1 ? ER_XA_RBROLLBACK : ER_XAER_RMERR, MYF(0)); - if (WSREP_ON) - wsrep_post_commit(thd, TRUE); - } - else if (xa_state == XA_PREPARED && thd->lex->xa_opt == XA_NONE) - { - MDL_request mdl_request; - - /* - Acquire metadata lock which will ensure that COMMIT is blocked - by active FLUSH TABLES WITH READ LOCK (and vice versa COMMIT in - progress blocks FTWRL). - - We allow FLUSHer to COMMIT; we assume FLUSHer knows what it does. - */ - mdl_request.init(MDL_key::COMMIT, "", "", MDL_INTENTION_EXCLUSIVE, - MDL_TRANSACTION); - - if (thd->mdl_context.acquire_lock(&mdl_request, - thd->variables.lock_wait_timeout)) - { - if (WSREP_ON) - wsrep_register_hton(thd, TRUE); - ha_rollback_trans(thd, TRUE); - my_error(ER_XAER_RMERR, MYF(0)); - } - else - { - DEBUG_SYNC(thd, "trans_xa_commit_after_acquire_commit_lock"); - - res= MY_TEST(ha_commit_one_phase(thd, 1)); - if (res) - my_error(ER_XAER_RMERR, MYF(0)); - } - } - else - { - my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[xa_state]); - DBUG_RETURN(TRUE); - } - - thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG); - thd->transaction.all.reset(); - thd->server_status&= - ~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY); - DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS")); - xid_cache_delete(thd, &thd->transaction.xid_state); - thd->transaction.xid_state.xa_state= XA_NOTR; - - trans_track_end_trx(thd); - - DBUG_RETURN(res); -} - - -/** - Roll back and terminate a XA transaction. - - @param thd Current thread - - @retval FALSE Success - @retval TRUE Failure -*/ - -bool trans_xa_rollback(THD *thd) -{ - bool res= TRUE; - enum xa_states xa_state= thd->transaction.xid_state.xa_state; - DBUG_ENTER("trans_xa_rollback"); - - if (!thd->transaction.xid_state.xid.eq(thd->lex->xid)) - { - if (thd->fix_xid_hash_pins()) - { - my_error(ER_OUT_OF_RESOURCES, MYF(0)); - DBUG_RETURN(TRUE); - } - - XID_STATE *xs= xid_cache_search(thd, thd->lex->xid); - if (!xs) - my_error(ER_XAER_NOTA, MYF(0)); - else - { - xa_trans_rolled_back(xs); - ha_commit_or_rollback_by_xid(thd->lex->xid, 0); - xid_cache_delete(thd, xs); - } - DBUG_RETURN(thd->get_stmt_da()->is_error()); - } - - if (xa_state != XA_IDLE && xa_state != XA_PREPARED && xa_state != XA_ROLLBACK_ONLY) - { - my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[xa_state]); - DBUG_RETURN(TRUE); - } - - res= xa_trans_force_rollback(thd); - - thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG); - thd->transaction.all.reset(); - thd->server_status&= - ~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY); - DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS")); - xid_cache_delete(thd, &thd->transaction.xid_state); - thd->transaction.xid_state.xa_state= XA_NOTR; - - trans_track_end_trx(thd); - - DBUG_RETURN(res); -} |