diff options
-rw-r--r-- | sql/sql_class.h | 1 | ||||
-rw-r--r-- | sql/xa.cc | 154 | ||||
-rw-r--r-- | sql/xa.h | 4 | ||||
-rw-r--r-- | storage/spider/spd_trx.cc | 8 |
4 files changed, 82 insertions, 85 deletions
diff --git a/sql/sql_class.h b/sql/sql_class.h index 32165aed223..9e51f45126a 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -2618,7 +2618,6 @@ public: st_transactions() { bzero((char*)this, sizeof(*this)); - xid_state.xid.null(); implicit_xid.null(); init_sql_alloc(&mem_root, "THD::transactions", ALLOC_ROOT_MIN_BLOCK_SIZE, 0, diff --git a/sql/xa.cc b/sql/xa.cc index 0214059b401..64efa06253a 100644 --- a/sql/xa.cc +++ b/sql/xa.cc @@ -26,6 +26,18 @@ ***************************************************************************/ enum xa_states { XA_ACTIVE= 0, XA_IDLE, XA_PREPARED, XA_ROLLBACK_ONLY }; + +struct XID_cache_insert_element +{ + enum xa_states xa_state; + XID *xid; + XID_cache_element *xid_cache_element; + + XID_cache_insert_element(enum xa_states xa_state_arg, XID *xid_arg): + xa_state(xa_state_arg), xid(xid_arg) {} +}; + + class XID_cache_element { /* @@ -62,10 +74,10 @@ class XID_cache_element public: static const int32 ACQUIRED= 1 << 30; static const int32 RECOVERED= 1 << 29; - XID_STATE *m_xid_state; /* Error reported by the Resource Manager (RM) to the Transaction Manager. */ uint rm_error; enum xa_states xa_state; + XID xid; bool is_set(int32_t flag) { return m_state.load(std::memory_order_relaxed) & flag; } void set(int32_t flag) @@ -110,12 +122,13 @@ public: } static void lf_hash_initializer(LF_HASH *hash __attribute__((unused)), XID_cache_element *element, - XID_STATE *xid_state) + XID_cache_insert_element *new_element) { DBUG_ASSERT(!element->is_set(ACQUIRED | RECOVERED)); element->rm_error= 0; - element->m_xid_state= xid_state; - xid_state->xid_cache_element= element; + element->xa_state= new_element->xa_state; + element->xid.set(new_element->xid); + new_element->xid_cache_element= element; } static void lf_alloc_constructor(uchar *ptr) { @@ -126,14 +139,12 @@ public: { XID_cache_element *element= (XID_cache_element*) (ptr + LF_HASH_OVERHEAD); DBUG_ASSERT(!element->is_set(ACQUIRED)); - if (element->is_set(RECOVERED)) - my_free(element->m_xid_state); } static uchar *key(const XID_cache_element *element, size_t *length, my_bool not_used __attribute__((unused))) { - *length= element->m_xid_state->xid.key_length(); - return element->m_xid_state->xid.key(); + *length= element->xid.key_length(); + return element->xid.key(); } }; @@ -191,7 +202,7 @@ bool XID_STATE::check_has_uncommitted_xa() const XID *XID_STATE::get_xid() const { DBUG_ASSERT(is_explicit_XA()); - return const_cast<XID*>(&xid); + return &xid_cache_element->xid; } @@ -221,90 +232,81 @@ void xid_cache_free() Find recovered XA transaction by XID. */ -static XID_STATE *xid_cache_search(THD *thd, XID *xid) +static XID_cache_element *xid_cache_search(THD *thd, XID *xid) { - XID_STATE *xs= 0; DBUG_ASSERT(thd->xid_hash_pins); XID_cache_element *element= (XID_cache_element*) lf_hash_search(&xid_cache, thd->xid_hash_pins, xid->key(), xid->key_length()); if (element) { - if (element->acquire_recovered()) - xs= element->m_xid_state; + if (!element->acquire_recovered()) + element= 0; lf_hash_search_unpin(thd->xid_hash_pins); DEBUG_SYNC(thd, "xa_after_search"); } - return xs; + return element; } bool xid_cache_insert(XID *xid) { - XID_STATE *xs; + XID_cache_insert_element new_element(XA_PREPARED, xid); LF_PINS *pins; - int res= 1; if (!(pins= lf_hash_get_pins(&xid_cache))) return true; - if ((xs= (XID_STATE*) my_malloc(sizeof(*xs), MYF(MY_WME)))) + int res= lf_hash_insert(&xid_cache, pins, &new_element); + switch (res) { - xs->xid.set(xid); - - if ((res= lf_hash_insert(&xid_cache, pins, xs))) - my_free(xs); - else - { - xs->xid_cache_element->xa_state= XA_PREPARED; - xs->xid_cache_element->set(XID_cache_element::RECOVERED); - } - if (res == 1) - res= 0; + case 0: + new_element.xid_cache_element->set(XID_cache_element::RECOVERED); + break; + case 1: + res= 0; } lf_hash_put_pins(pins); return res; } -bool xid_cache_insert(THD *thd, XID_STATE *xid_state) +bool xid_cache_insert(THD *thd, XID_STATE *xid_state, XID *xid) { + XID_cache_insert_element new_element(XA_ACTIVE, xid); + if (thd->fix_xid_hash_pins()) return true; - int res= lf_hash_insert(&xid_cache, thd->xid_hash_pins, xid_state); + int res= lf_hash_insert(&xid_cache, thd->xid_hash_pins, &new_element); switch (res) { case 0: - xid_state->xid_cache_element->xa_state= XA_ACTIVE; + xid_state->xid_cache_element= new_element.xid_cache_element; xid_state->xid_cache_element->set(XID_cache_element::ACQUIRED); break; case 1: my_error(ER_XAER_DUPID, MYF(0)); - /* fall through */ - default: - xid_state->xid_cache_element= 0; } return res; } +static void xid_cache_delete(THD *thd, XID_cache_element *&element) +{ + DBUG_ASSERT(thd->xid_hash_pins); + element->mark_uninitialized(); + lf_hash_delete(&xid_cache, thd->xid_hash_pins, + element->xid.key(), element->xid.key_length()); +} + + void xid_cache_delete(THD *thd, XID_STATE *xid_state) { if (xid_state->xid_cache_element) { - bool recovered= xid_state->xid_cache_element->is_set(XID_cache_element::RECOVERED); - DBUG_ASSERT(thd->xid_hash_pins); - xid_state->xid_cache_element->mark_uninitialized(); - lf_hash_delete(&xid_cache, thd->xid_hash_pins, - xid_state->xid.key(), xid_state->xid.key_length()); - if (recovered) - my_free(xid_state); - else - { - xid_state->xid_cache_element= 0; - xid_state->xid.null(); - } + xid_cache_delete(thd, xid_state->xid_cache_element); + xid_state->xid_cache_element= 0; } } @@ -321,7 +323,7 @@ static my_bool xid_cache_iterate_callback(XID_cache_element *element, my_bool res= FALSE; if (element->lock()) { - res= arg->action(element->m_xid_state, arg->argument); + res= arg->action(element, arg->argument); element->unlock(); } return res; @@ -348,12 +350,11 @@ static int xid_cache_iterate(THD *thd, my_hash_walk_action action, void *arg) @return TRUE if transaction was rolled back or if the transaction state is XA_ROLLBACK_ONLY. FALSE otherwise. */ -static bool xa_trans_rolled_back(XID_STATE *xid_state) +static bool xa_trans_rolled_back(XID_cache_element *element) { - DBUG_ASSERT(xid_state->is_explicit_XA()); - if (xid_state->xid_cache_element->rm_error) + if (element->rm_error) { - switch (xid_state->xid_cache_element->rm_error) { + switch (element->rm_error) { case ER_LOCK_WAIT_TIMEOUT: my_error(ER_XA_RBTIMEOUT, MYF(0)); break; @@ -363,10 +364,10 @@ static bool xa_trans_rolled_back(XID_STATE *xid_state) default: my_error(ER_XA_RBROLLBACK, MYF(0)); } - xid_state->xid_cache_element->xa_state= XA_ROLLBACK_ONLY; + element->xa_state= XA_ROLLBACK_ONLY; } - return xid_state->xid_cache_element->xa_state == XA_ROLLBACK_ONLY; + return element->xa_state == XA_ROLLBACK_ONLY; } @@ -404,7 +405,8 @@ bool trans_xa_start(THD *thd) thd->transaction.xid_state.xid_cache_element->xa_state == XA_IDLE && thd->lex->xa_opt == XA_RESUME) { - bool not_equal= !thd->transaction.xid_state.xid.eq(thd->lex->xid); + bool not_equal= + !thd->transaction.xid_state.xid_cache_element->xid.eq(thd->lex->xid); if (not_equal) my_error(ER_XAER_NOTA, MYF(0)); else @@ -421,11 +423,8 @@ bool trans_xa_start(THD *thd) my_error(ER_XAER_OUTSIDE, MYF(0)); else if (!trans_begin(thd)) { - DBUG_ASSERT(thd->transaction.xid_state.xid.is_null()); - thd->transaction.xid_state.xid.set(thd->lex->xid); - if (xid_cache_insert(thd, &thd->transaction.xid_state)) + if (xid_cache_insert(thd, &thd->transaction.xid_state, thd->lex->xid)) { - thd->transaction.xid_state.xid.null(); trans_rollback(thd); DBUG_RETURN(true); } @@ -455,9 +454,9 @@ bool trans_xa_end(THD *thd) else if (!thd->transaction.xid_state.is_explicit_XA() || thd->transaction.xid_state.xid_cache_element->xa_state != XA_ACTIVE) thd->transaction.xid_state.er_xaer_rmfail(); - else if (!thd->transaction.xid_state.xid.eq(thd->lex->xid)) + else if (!thd->transaction.xid_state.xid_cache_element->xid.eq(thd->lex->xid)) my_error(ER_XAER_NOTA, MYF(0)); - else if (!xa_trans_rolled_back(&thd->transaction.xid_state)) + else if (!xa_trans_rolled_back(thd->transaction.xid_state.xid_cache_element)) thd->transaction.xid_state.xid_cache_element->xa_state= XA_IDLE; DBUG_RETURN(thd->is_error() || @@ -481,7 +480,7 @@ bool trans_xa_prepare(THD *thd) if (!thd->transaction.xid_state.is_explicit_XA() || thd->transaction.xid_state.xid_cache_element->xa_state != XA_IDLE) thd->transaction.xid_state.er_xaer_rmfail(); - else if (!thd->transaction.xid_state.xid.eq(thd->lex->xid)) + else if (!thd->transaction.xid_state.xid_cache_element->xid.eq(thd->lex->xid)) my_error(ER_XAER_NOTA, MYF(0)); else if (ha_prepare(thd)) { @@ -510,7 +509,8 @@ bool trans_xa_commit(THD *thd) bool res= TRUE; DBUG_ENTER("trans_xa_commit"); - if (!thd->transaction.xid_state.xid.eq(thd->lex->xid)) + if (!thd->transaction.xid_state.is_explicit_XA() || + !thd->transaction.xid_state.xid_cache_element->xid.eq(thd->lex->xid)) { if (thd->fix_xid_hash_pins()) { @@ -518,20 +518,18 @@ bool trans_xa_commit(THD *thd) DBUG_RETURN(TRUE); } - XID_STATE *xs= xid_cache_search(thd, thd->lex->xid); - res= !xs; - if (res) - my_error(ER_XAER_NOTA, MYF(0)); - else + if (auto xs= xid_cache_search(thd, thd->lex->xid)) { res= xa_trans_rolled_back(xs); ha_commit_or_rollback_by_xid(thd->lex->xid, !res); xid_cache_delete(thd, xs); } + else + my_error(ER_XAER_NOTA, MYF(0)); DBUG_RETURN(res); } - if (xa_trans_rolled_back(&thd->transaction.xid_state)) + if (xa_trans_rolled_back(thd->transaction.xid_state.xid_cache_element)) { xa_trans_force_rollback(thd); res= thd->is_error(); @@ -606,7 +604,8 @@ bool trans_xa_rollback(THD *thd) bool res= TRUE; DBUG_ENTER("trans_xa_rollback"); - if (!thd->transaction.xid_state.xid.eq(thd->lex->xid)) + if (!thd->transaction.xid_state.is_explicit_XA() || + !thd->transaction.xid_state.xid_cache_element->xid.eq(thd->lex->xid)) { if (thd->fix_xid_hash_pins()) { @@ -614,15 +613,14 @@ bool trans_xa_rollback(THD *thd) DBUG_RETURN(TRUE); } - XID_STATE *xs= xid_cache_search(thd, thd->lex->xid); - if (!xs) - my_error(ER_XAER_NOTA, MYF(0)); - else + if (auto xs= xid_cache_search(thd, thd->lex->xid)) { xa_trans_rolled_back(xs); ha_commit_or_rollback_by_xid(thd->lex->xid, 0); xid_cache_delete(thd, xs); } + else + my_error(ER_XAER_NOTA, MYF(0)); DBUG_RETURN(thd->get_stmt_da()->is_error()); } @@ -755,10 +753,10 @@ static uint get_sql_xid(XID *xid, char *buf) It can be easily fixed later, if necessary. */ -static my_bool xa_recover_callback(XID_STATE *xs, Protocol *protocol, +static my_bool xa_recover_callback(XID_cache_element *xs, Protocol *protocol, char *data, uint data_len, CHARSET_INFO *data_cs) { - if (xs->xid_cache_element->xa_state == XA_PREPARED) + if (xs->xa_state == XA_PREPARED) { protocol->prepare_for_resend(); protocol->store_longlong((longlong) xs->xid.formatID, FALSE); @@ -772,14 +770,16 @@ static my_bool xa_recover_callback(XID_STATE *xs, Protocol *protocol, } -static my_bool xa_recover_callback_short(XID_STATE *xs, Protocol *protocol) +static my_bool xa_recover_callback_short(XID_cache_element *xs, + Protocol *protocol) { return xa_recover_callback(xs, protocol, xs->xid.data, xs->xid.gtrid_length + xs->xid.bqual_length, &my_charset_bin); } -static my_bool xa_recover_callback_verbose(XID_STATE *xs, Protocol *protocol) +static my_bool xa_recover_callback_verbose(XID_cache_element *xs, + Protocol *protocol) { char buf[SQL_XIDSIZE]; uint len= get_sql_xid(&xs->xid, buf); @@ -20,8 +20,6 @@ class XID_cache_element; struct XID_STATE { - /* For now, this is only used to catch duplicated external xids */ - XID xid; // transaction identifier XID_cache_element *xid_cache_element; bool check_has_uncommitted_xa() const; @@ -34,7 +32,7 @@ struct XID_STATE { void xid_cache_init(void); void xid_cache_free(void); bool xid_cache_insert(XID *xid); -bool xid_cache_insert(THD *thd, XID_STATE *xid_state); +bool xid_cache_insert(THD *thd, XID_STATE *xid_state, XID *xid); void xid_cache_delete(THD *thd, XID_STATE *xid_state); bool trans_xa_start(THD *thd); diff --git a/storage/spider/spd_trx.cc b/storage/spider/spd_trx.cc index a41db0f2b31..a1f9bde6a2e 100644 --- a/storage/spider/spd_trx.cc +++ b/storage/spider/spd_trx.cc @@ -1708,7 +1708,8 @@ int spider_check_and_set_time_zone( } static int spider_xa_lock( - XID_STATE *xid_state + XID_STATE *xid_state, + XID *xid ) { THD *thd = current_thd; int error_num; @@ -1726,7 +1727,7 @@ static int spider_xa_lock( #endif old_proc_info = thd_proc_info(thd, "Locking xid by Spider"); #ifdef SPIDER_XID_USES_xid_cache_iterate - if (xid_cache_insert(thd, xid_state)) + if (xid_cache_insert(thd, xid_state, xid)) { error_num = (spider_stmt_da_sql_errno(thd) == ER_XAER_DUPID ? ER_SPIDER_XA_LOCKED_NUM : HA_ERR_OUT_OF_MEM); @@ -1948,11 +1949,10 @@ int spider_internal_start_trx( thd->server_id)); #endif - trx->internal_xid_state.xid.set(&trx->xid); #ifdef SPIDER_XID_STATE_HAS_in_thd trx->internal_xid_state.in_thd = 1; #endif - if ((error_num = spider_xa_lock(&trx->internal_xid_state))) + if ((error_num = spider_xa_lock(&trx->internal_xid_state, &trx->xid))) { if (error_num == ER_SPIDER_XA_LOCKED_NUM) my_message(error_num, ER_SPIDER_XA_LOCKED_STR, MYF(0)); |