summaryrefslogtreecommitdiff
path: root/sql/transaction.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/transaction.cc')
-rw-r--r--sql/transaction.cc391
1 files changed, 24 insertions, 367 deletions
diff --git a/sql/transaction.cc b/sql/transaction.cc
index 8c28543a0e7..aecee04c364 100644
--- a/sql/transaction.cc
+++ b/sql/transaction.cc
@@ -24,19 +24,20 @@
#include "debug_sync.h" // DEBUG_SYNC
#include "sql_acl.h"
#include "semisync_master.h"
+#ifdef WITH_WSREP
+#include "wsrep_trans_observer.h"
+#endif /* WITH_WSREP */
-#ifndef EMBEDDED_LIBRARY
/**
Helper: Tell tracker (if any) that transaction ended.
*/
-static void trans_track_end_trx(THD *thd)
+void trans_track_end_trx(THD *thd)
{
+#ifndef EMBEDDED_LIBRARY
if (thd->variables.session_track_transaction_info > TX_TRACK_NONE)
thd->session_tracker.transaction_info.end_trx(thd);
-}
-#else
-#define trans_track_end_trx(A) do{}while(0)
#endif //EMBEDDED_LIBRARY
+}
/**
@@ -59,7 +60,6 @@ void trans_reset_one_shot_chistics(THD *thd)
/* Conditions under which the transaction state must not change. */
static bool trans_check(THD *thd)
{
- enum xa_states xa_state= thd->transaction.xid_state.xa_state;
DBUG_ENTER("trans_check");
/*
@@ -70,8 +70,8 @@ static bool trans_check(THD *thd)
if (unlikely(thd->in_sub_stmt))
my_error(ER_COMMIT_NOT_ALLOWED_IN_SF_OR_TRG, MYF(0));
- if (xa_state != XA_NOTR)
- my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[xa_state]);
+ if (thd->transaction.xid_state.is_explicit_XA())
+ thd->transaction.xid_state.er_xaer_rmfail();
else
DBUG_RETURN(FALSE);
@@ -80,67 +80,6 @@ static bool trans_check(THD *thd)
/**
- Mark a XA transaction as rollback-only if the RM unilaterally
- rolled back the transaction branch.
-
- @note If a rollback was requested by the RM, this function sets
- the appropriate rollback error code and transits the state
- to XA_ROLLBACK_ONLY.
-
- @return TRUE if transaction was rolled back or if the transaction
- state is XA_ROLLBACK_ONLY. FALSE otherwise.
-*/
-static bool xa_trans_rolled_back(XID_STATE *xid_state)
-{
- if (xid_state->rm_error)
- {
- switch (xid_state->rm_error) {
- case ER_LOCK_WAIT_TIMEOUT:
- my_error(ER_XA_RBTIMEOUT, MYF(0));
- break;
- case ER_LOCK_DEADLOCK:
- my_error(ER_XA_RBDEADLOCK, MYF(0));
- break;
- default:
- my_error(ER_XA_RBROLLBACK, MYF(0));
- }
- xid_state->xa_state= XA_ROLLBACK_ONLY;
- }
-
- return (xid_state->xa_state == XA_ROLLBACK_ONLY);
-}
-
-
-/**
- Rollback the active XA transaction.
-
- @note Resets rm_error before calling ha_rollback(), so
- the thd->transaction.xid structure gets reset
- by ha_rollback() / THD::transaction::cleanup().
-
- @return TRUE if the rollback failed, FALSE otherwise.
-*/
-
-static bool xa_trans_force_rollback(THD *thd)
-{
- /*
- We must reset rm_error before calling ha_rollback(),
- so thd->transaction.xid structure gets reset
- by ha_rollback()/THD::transaction::cleanup().
- */
- thd->transaction.xid_state.rm_error= 0;
- if (WSREP_ON)
- wsrep_register_hton(thd, TRUE);
- if (ha_rollback_trans(thd, true))
- {
- my_error(ER_XAER_RMERR, MYF(0));
- return true;
- }
- return false;
-}
-
-
-/**
Begin a new transaction.
@note Beginning a transaction implicitly commits any current
@@ -169,14 +108,16 @@ bool trans_begin(THD *thd, uint flags)
(thd->variables.option_bits & OPTION_TABLE_LOCK))
{
thd->variables.option_bits&= ~OPTION_TABLE_LOCK;
- if (WSREP_ON)
- wsrep_register_hton(thd, TRUE);
thd->server_status&=
~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY);
DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS"));
res= MY_TEST(ha_commit_trans(thd, TRUE));
- if (WSREP_ON)
- wsrep_post_commit(thd, TRUE);
+#ifdef WITH_WSREP
+ if (wsrep_thd_is_local(thd))
+ {
+ res= res || wsrep_after_statement(thd);
+ }
+#endif /* WITH_WSREP */
}
thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
@@ -237,9 +178,14 @@ bool trans_begin(THD *thd, uint flags)
}
#ifdef WITH_WSREP
- thd->wsrep_PA_safe= true;
- if (WSREP_CLIENT(thd) && wsrep_sync_wait(thd))
- DBUG_RETURN(TRUE);
+ if (wsrep_thd_is_local(thd))
+ {
+ if (wsrep_sync_wait(thd))
+ DBUG_RETURN(TRUE);
+ if (!thd->tx_read_only &&
+ wsrep_start_transaction(thd, thd->wsrep_next_trx_id()))
+ DBUG_RETURN(TRUE);
+ }
#endif /* WITH_WSREP */
thd->variables.option_bits|= OPTION_BEGIN;
@@ -284,8 +230,6 @@ bool trans_commit(THD *thd)
if (trans_check(thd))
DBUG_RETURN(TRUE);
- if (WSREP_ON)
- wsrep_register_hton(thd, TRUE);
thd->server_status&=
~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY);
DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS"));
@@ -296,8 +240,6 @@ bool trans_commit(THD *thd)
mysql_mutex_assert_not_owner(&LOCK_after_binlog_sync);
mysql_mutex_assert_not_owner(&LOCK_commit_ordered);
- if (WSREP_ON)
- wsrep_post_commit(thd, TRUE);
/*
if res is non-zero, then ha_commit_trans has rolled back the
transaction, so the hooks for rollback will be called.
@@ -353,14 +295,10 @@ bool trans_commit_implicit(THD *thd)
/* Safety if one did "drop table" on locked tables */
if (!thd->locked_tables_mode)
thd->variables.option_bits&= ~OPTION_TABLE_LOCK;
- if (WSREP_ON)
- wsrep_register_hton(thd, TRUE);
thd->server_status&=
~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY);
DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS"));
res= MY_TEST(ha_commit_trans(thd, TRUE));
- if (WSREP_ON)
- wsrep_post_commit(thd, TRUE);
}
thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
@@ -394,14 +332,9 @@ bool trans_rollback(THD *thd)
int res;
DBUG_ENTER("trans_rollback");
-#ifdef WITH_WSREP
- thd->wsrep_PA_safe= true;
-#endif /* WITH_WSREP */
if (trans_check(thd))
DBUG_RETURN(TRUE);
- if (WSREP_ON)
- wsrep_register_hton(thd, TRUE);
thd->server_status&=
~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY);
DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS"));
@@ -500,14 +433,10 @@ bool trans_commit_stmt(THD *thd)
if (thd->transaction.stmt.ha_list)
{
- if (WSREP_ON)
- wsrep_register_hton(thd, FALSE);
res= ha_commit_trans(thd, FALSE);
if (! thd->in_active_multi_stmt_transaction())
{
trans_reset_one_shot_chistics(thd);
- if (WSREP_ON)
- wsrep_post_commit(thd, FALSE);
}
}
@@ -563,8 +492,6 @@ bool trans_rollback_stmt(THD *thd)
if (thd->transaction.stmt.ha_list)
{
- if (WSREP_ON)
- wsrep_register_hton(thd, FALSE);
ha_rollback_trans(thd, FALSE);
if (! thd->in_active_multi_stmt_transaction())
trans_reset_one_shot_chistics(thd);
@@ -718,7 +645,8 @@ bool trans_rollback_to_savepoint(THD *thd, LEX_CSTRING name)
logging is off.
*/
bool mdl_can_safely_rollback_to_savepoint=
- (!(mysql_bin_log.is_open() && thd->variables.sql_log_bin) ||
+ (!((WSREP_EMULATE_BINLOG_NNULL(thd) || mysql_bin_log.is_open())
+ && thd->variables.sql_log_bin) ||
ha_rollback_to_savepoint_can_release_mdl(thd));
if (ha_rollback_to_savepoint(thd, sv))
@@ -772,274 +700,3 @@ bool trans_release_savepoint(THD *thd, LEX_CSTRING name)
DBUG_RETURN(MY_TEST(res));
}
-
-
-/**
- Starts an XA transaction with the given xid value.
-
- @param thd Current thread
-
- @retval FALSE Success
- @retval TRUE Failure
-*/
-
-bool trans_xa_start(THD *thd)
-{
- enum xa_states xa_state= thd->transaction.xid_state.xa_state;
- DBUG_ENTER("trans_xa_start");
-
- if (xa_state == XA_IDLE && thd->lex->xa_opt == XA_RESUME)
- {
- bool not_equal= !thd->transaction.xid_state.xid.eq(thd->lex->xid);
- if (not_equal)
- my_error(ER_XAER_NOTA, MYF(0));
- else
- thd->transaction.xid_state.xa_state= XA_ACTIVE;
- DBUG_RETURN(not_equal);
- }
-
- /* TODO: JOIN is not supported yet. */
- if (thd->lex->xa_opt != XA_NONE)
- my_error(ER_XAER_INVAL, MYF(0));
- else if (xa_state != XA_NOTR)
- my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[xa_state]);
- else if (thd->locked_tables_mode || thd->in_active_multi_stmt_transaction())
- my_error(ER_XAER_OUTSIDE, MYF(0));
- else if (!trans_begin(thd))
- {
- DBUG_ASSERT(thd->transaction.xid_state.xid.is_null());
- thd->transaction.xid_state.xa_state= XA_ACTIVE;
- thd->transaction.xid_state.rm_error= 0;
- thd->transaction.xid_state.xid.set(thd->lex->xid);
- if (xid_cache_insert(thd, &thd->transaction.xid_state))
- {
- thd->transaction.xid_state.xa_state= XA_NOTR;
- thd->transaction.xid_state.xid.null();
- trans_rollback(thd);
- DBUG_RETURN(true);
- }
- DBUG_RETURN(FALSE);
- }
-
- DBUG_RETURN(TRUE);
-}
-
-
-/**
- Put a XA transaction in the IDLE state.
-
- @param thd Current thread
-
- @retval FALSE Success
- @retval TRUE Failure
-*/
-
-bool trans_xa_end(THD *thd)
-{
- DBUG_ENTER("trans_xa_end");
-
- /* TODO: SUSPEND and FOR MIGRATE are not supported yet. */
- if (thd->lex->xa_opt != XA_NONE)
- my_error(ER_XAER_INVAL, MYF(0));
- else if (thd->transaction.xid_state.xa_state != XA_ACTIVE)
- my_error(ER_XAER_RMFAIL, MYF(0),
- xa_state_names[thd->transaction.xid_state.xa_state]);
- else if (!thd->transaction.xid_state.xid.eq(thd->lex->xid))
- my_error(ER_XAER_NOTA, MYF(0));
- else if (!xa_trans_rolled_back(&thd->transaction.xid_state))
- thd->transaction.xid_state.xa_state= XA_IDLE;
-
- DBUG_RETURN(thd->is_error() ||
- thd->transaction.xid_state.xa_state != XA_IDLE);
-}
-
-
-/**
- Put a XA transaction in the PREPARED state.
-
- @param thd Current thread
-
- @retval FALSE Success
- @retval TRUE Failure
-*/
-
-bool trans_xa_prepare(THD *thd)
-{
- DBUG_ENTER("trans_xa_prepare");
-
- if (thd->transaction.xid_state.xa_state != XA_IDLE)
- my_error(ER_XAER_RMFAIL, MYF(0),
- xa_state_names[thd->transaction.xid_state.xa_state]);
- else if (!thd->transaction.xid_state.xid.eq(thd->lex->xid))
- my_error(ER_XAER_NOTA, MYF(0));
- else if (ha_prepare(thd))
- {
- xid_cache_delete(thd, &thd->transaction.xid_state);
- thd->transaction.xid_state.xa_state= XA_NOTR;
- my_error(ER_XA_RBROLLBACK, MYF(0));
- }
- else
- thd->transaction.xid_state.xa_state= XA_PREPARED;
-
- DBUG_RETURN(thd->is_error() ||
- thd->transaction.xid_state.xa_state != XA_PREPARED);
-}
-
-
-/**
- Commit and terminate the a XA transaction.
-
- @param thd Current thread
-
- @retval FALSE Success
- @retval TRUE Failure
-*/
-
-bool trans_xa_commit(THD *thd)
-{
- bool res= TRUE;
- enum xa_states xa_state= thd->transaction.xid_state.xa_state;
- DBUG_ENTER("trans_xa_commit");
-
- if (!thd->transaction.xid_state.xid.eq(thd->lex->xid))
- {
- if (thd->fix_xid_hash_pins())
- {
- my_error(ER_OUT_OF_RESOURCES, MYF(0));
- DBUG_RETURN(TRUE);
- }
-
- XID_STATE *xs= xid_cache_search(thd, thd->lex->xid);
- res= !xs;
- if (res)
- my_error(ER_XAER_NOTA, MYF(0));
- else
- {
- res= xa_trans_rolled_back(xs);
- ha_commit_or_rollback_by_xid(thd->lex->xid, !res);
- xid_cache_delete(thd, xs);
- }
- DBUG_RETURN(res);
- }
-
- if (xa_trans_rolled_back(&thd->transaction.xid_state))
- {
- xa_trans_force_rollback(thd);
- res= thd->is_error();
- }
- else if (xa_state == XA_IDLE && thd->lex->xa_opt == XA_ONE_PHASE)
- {
- if (WSREP_ON)
- wsrep_register_hton(thd, TRUE);
- int r= ha_commit_trans(thd, TRUE);
- if ((res= MY_TEST(r)))
- my_error(r == 1 ? ER_XA_RBROLLBACK : ER_XAER_RMERR, MYF(0));
- if (WSREP_ON)
- wsrep_post_commit(thd, TRUE);
- }
- else if (xa_state == XA_PREPARED && thd->lex->xa_opt == XA_NONE)
- {
- MDL_request mdl_request;
-
- /*
- Acquire metadata lock which will ensure that COMMIT is blocked
- by active FLUSH TABLES WITH READ LOCK (and vice versa COMMIT in
- progress blocks FTWRL).
-
- We allow FLUSHer to COMMIT; we assume FLUSHer knows what it does.
- */
- mdl_request.init(MDL_key::COMMIT, "", "", MDL_INTENTION_EXCLUSIVE,
- MDL_TRANSACTION);
-
- if (thd->mdl_context.acquire_lock(&mdl_request,
- thd->variables.lock_wait_timeout))
- {
- if (WSREP_ON)
- wsrep_register_hton(thd, TRUE);
- ha_rollback_trans(thd, TRUE);
- my_error(ER_XAER_RMERR, MYF(0));
- }
- else
- {
- DEBUG_SYNC(thd, "trans_xa_commit_after_acquire_commit_lock");
-
- res= MY_TEST(ha_commit_one_phase(thd, 1));
- if (res)
- my_error(ER_XAER_RMERR, MYF(0));
- }
- }
- else
- {
- my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[xa_state]);
- DBUG_RETURN(TRUE);
- }
-
- thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
- thd->transaction.all.reset();
- thd->server_status&=
- ~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY);
- DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS"));
- xid_cache_delete(thd, &thd->transaction.xid_state);
- thd->transaction.xid_state.xa_state= XA_NOTR;
-
- trans_track_end_trx(thd);
-
- DBUG_RETURN(res);
-}
-
-
-/**
- Roll back and terminate a XA transaction.
-
- @param thd Current thread
-
- @retval FALSE Success
- @retval TRUE Failure
-*/
-
-bool trans_xa_rollback(THD *thd)
-{
- bool res= TRUE;
- enum xa_states xa_state= thd->transaction.xid_state.xa_state;
- DBUG_ENTER("trans_xa_rollback");
-
- if (!thd->transaction.xid_state.xid.eq(thd->lex->xid))
- {
- if (thd->fix_xid_hash_pins())
- {
- my_error(ER_OUT_OF_RESOURCES, MYF(0));
- DBUG_RETURN(TRUE);
- }
-
- XID_STATE *xs= xid_cache_search(thd, thd->lex->xid);
- if (!xs)
- my_error(ER_XAER_NOTA, MYF(0));
- else
- {
- xa_trans_rolled_back(xs);
- ha_commit_or_rollback_by_xid(thd->lex->xid, 0);
- xid_cache_delete(thd, xs);
- }
- DBUG_RETURN(thd->get_stmt_da()->is_error());
- }
-
- if (xa_state != XA_IDLE && xa_state != XA_PREPARED && xa_state != XA_ROLLBACK_ONLY)
- {
- my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[xa_state]);
- DBUG_RETURN(TRUE);
- }
-
- res= xa_trans_force_rollback(thd);
-
- thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
- thd->transaction.all.reset();
- thd->server_status&=
- ~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY);
- DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS"));
- xid_cache_delete(thd, &thd->transaction.xid_state);
- thd->transaction.xid_state.xa_state= XA_NOTR;
-
- trans_track_end_trx(thd);
-
- DBUG_RETURN(res);
-}