diff options
-rw-r--r-- | libmysqld/CMakeLists.txt | 1 | ||||
-rw-r--r-- | sql/CMakeLists.txt | 2 | ||||
-rw-r--r-- | sql/handler.cc | 180 | ||||
-rw-r--r-- | sql/handler.h | 10 | ||||
-rw-r--r-- | sql/sql_class.cc | 257 | ||||
-rw-r--r-- | sql/sql_class.h | 45 | ||||
-rw-r--r-- | sql/sql_parse.cc | 4 | ||||
-rw-r--r-- | sql/transaction.cc | 332 | ||||
-rw-r--r-- | sql/transaction.h | 8 | ||||
-rw-r--r-- | sql/xa.cc | 797 | ||||
-rw-r--r-- | sql/xa.h | 67 |
11 files changed, 872 insertions, 831 deletions
diff --git a/libmysqld/CMakeLists.txt b/libmysqld/CMakeLists.txt index 6bd3a59dd2f..4d9b07195ba 100644 --- a/libmysqld/CMakeLists.txt +++ b/libmysqld/CMakeLists.txt @@ -126,6 +126,7 @@ SET(SQL_EMBEDDED_SOURCES emb_qcache.cc libmysqld.c lib_sql.cc ../sql/rowid_filter.cc ../sql/rowid_filter.h ../sql/item_vers.cc ../sql/opt_trace.cc + ../sql/xa.cc ${GEN_SOURCES} ${MYSYS_LIBWRAP_SOURCE} ) diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt index da4de86d950..2879970e9ff 100644 --- a/sql/CMakeLists.txt +++ b/sql/CMakeLists.txt @@ -143,7 +143,7 @@ SET (SQL_SOURCE opt_trace.cc ${WSREP_SOURCES} table_cache.cc encryption.cc temporary_tables.cc - proxy_protocol.cc backup.cc + proxy_protocol.cc backup.cc xa.cc ${CMAKE_CURRENT_BINARY_DIR}/sql_builtin.cc ${CMAKE_CURRENT_BINARY_DIR}/sql_yacc.cc ${CMAKE_CURRENT_BINARY_DIR}/sql_yacc_ora.cc diff --git a/sql/handler.cc b/sql/handler.cc index 917a5a56c6d..777a64b2c67 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -2221,186 +2221,6 @@ int ha_recover(HASH *commit_list) DBUG_RETURN(0); } -/** - return the XID as it appears in the SQL function's arguments. - So this string can be passed to XA START, XA PREPARE etc... - - @note - the 'buf' has to have space for at least SQL_XIDSIZE bytes. -*/ - - -/* - 'a'..'z' 'A'..'Z', '0'..'9' - and '-' '_' ' ' symbols don't have to be - converted. -*/ - -static const char xid_needs_conv[128]= -{ - 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, - 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, - 0,1,1,1,1,1,1,1,1,1,1,1,1,0,1,1, - 0,0,0,0,0,0,0,0,0,0,1,1,1,1,1,1, - 1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, - 0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,0, - 1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, - 0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,1 -}; - -uint get_sql_xid(XID *xid, char *buf) -{ - int tot_len= xid->gtrid_length + xid->bqual_length; - int i; - const char *orig_buf= buf; - - for (i=0; i<tot_len; i++) - { - uchar c= ((uchar *) xid->data)[i]; - if (c >= 128 || xid_needs_conv[c]) - break; - } - - if (i >= tot_len) - { - /* No need to convert characters to hexadecimals. */ - *buf++= '\''; - memcpy(buf, xid->data, xid->gtrid_length); - buf+= xid->gtrid_length; - *buf++= '\''; - if (xid->bqual_length > 0 || xid->formatID != 1) - { - *buf++= ','; - *buf++= '\''; - memcpy(buf, xid->data+xid->gtrid_length, xid->bqual_length); - buf+= xid->bqual_length; - *buf++= '\''; - } - } - else - { - *buf++= 'X'; - *buf++= '\''; - for (i= 0; i < xid->gtrid_length; i++) - { - *buf++=_dig_vec_lower[((uchar*) xid->data)[i] >> 4]; - *buf++=_dig_vec_lower[((uchar*) xid->data)[i] & 0x0f]; - } - *buf++= '\''; - if (xid->bqual_length > 0 || xid->formatID != 1) - { - *buf++= ','; - *buf++= 'X'; - *buf++= '\''; - for (; i < tot_len; i++) - { - *buf++=_dig_vec_lower[((uchar*) xid->data)[i] >> 4]; - *buf++=_dig_vec_lower[((uchar*) xid->data)[i] & 0x0f]; - } - *buf++= '\''; - } - } - - if (xid->formatID != 1) - { - *buf++= ','; - buf+= my_longlong10_to_str_8bit(&my_charset_bin, buf, - MY_INT64_NUM_DECIMAL_DIGITS, -10, xid->formatID); - } - - return (uint)(buf - orig_buf); -} - - -/** - return the list of XID's to a client, the same way SHOW commands do. - - @note - I didn't find in XA specs that an RM cannot return the same XID twice, - so mysql_xa_recover does not filter XID's to ensure uniqueness. - It can be easily fixed later, if necessary. -*/ - -static my_bool xa_recover_callback(XID_STATE *xs, Protocol *protocol, - char *data, uint data_len, CHARSET_INFO *data_cs) -{ - if (xs->xa_state == XA_PREPARED) - { - protocol->prepare_for_resend(); - protocol->store_longlong((longlong) xs->xid.formatID, FALSE); - protocol->store_longlong((longlong) xs->xid.gtrid_length, FALSE); - protocol->store_longlong((longlong) xs->xid.bqual_length, FALSE); - protocol->store(data, data_len, data_cs); - if (protocol->write()) - return TRUE; - } - return FALSE; -} - - -static my_bool xa_recover_callback_short(XID_STATE *xs, Protocol *protocol) -{ - return xa_recover_callback(xs, protocol, xs->xid.data, - xs->xid.gtrid_length + xs->xid.bqual_length, &my_charset_bin); -} - - -static my_bool xa_recover_callback_verbose(XID_STATE *xs, Protocol *protocol) -{ - char buf[SQL_XIDSIZE]; - uint len= get_sql_xid(&xs->xid, buf); - return xa_recover_callback(xs, protocol, buf, len, - &my_charset_utf8_general_ci); -} - - -bool mysql_xa_recover(THD *thd) -{ - List<Item> field_list; - Protocol *protocol= thd->protocol; - MEM_ROOT *mem_root= thd->mem_root; - my_hash_walk_action action; - DBUG_ENTER("mysql_xa_recover"); - - field_list.push_back(new (mem_root) - Item_int(thd, "formatID", 0, - MY_INT32_NUM_DECIMAL_DIGITS), mem_root); - field_list.push_back(new (mem_root) - Item_int(thd, "gtrid_length", 0, - MY_INT32_NUM_DECIMAL_DIGITS), mem_root); - field_list.push_back(new (mem_root) - Item_int(thd, "bqual_length", 0, - MY_INT32_NUM_DECIMAL_DIGITS), mem_root); - { - uint len; - CHARSET_INFO *cs; - - if (thd->lex->verbose) - { - len= SQL_XIDSIZE; - cs= &my_charset_utf8_general_ci; - action= (my_hash_walk_action) xa_recover_callback_verbose; - } - else - { - len= XIDDATASIZE; - cs= &my_charset_bin; - action= (my_hash_walk_action) xa_recover_callback_short; - } - - field_list.push_back(new (mem_root) - Item_empty_string(thd, "data", len, cs), mem_root); - } - - if (protocol->send_result_set_metadata(&field_list, - Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) - DBUG_RETURN(1); - - if (xid_cache_iterate(thd, action, protocol)) - DBUG_RETURN(1); - my_eof(thd); - DBUG_RETURN(0); -} /* Called by engine to notify TC that a new commit checkpoint has been reached. diff --git a/sql/handler.h b/sql/handler.h index 676632113c3..fb6862e4ce1 100644 --- a/sql/handler.h +++ b/sql/handler.h @@ -892,15 +892,6 @@ struct xid_t { }; typedef struct xid_t XID; -/* - The size of XID string representation in the form - 'gtrid', 'bqual', formatID - see xid_t::get_sql_string() for details. -*/ -#define SQL_XIDSIZE (XIDDATASIZE * 2 + 8 + MY_INT64_NUM_DECIMAL_DIGITS) -/* The 'buf' has to have space for at least SQL_XIDSIZE bytes. */ -uint get_sql_xid(XID *xid, char *buf); - /* for recover() handlerton call */ #define MIN_XID_LIST_SIZE 128 #define MAX_XID_LIST_SIZE (1024*128) @@ -4977,7 +4968,6 @@ void trans_register_ha(THD *thd, bool all, handlerton *ht); const char *get_canonical_filename(handler *file, const char *path, char *tmp_path); -bool mysql_xa_recover(THD *thd); void commit_checkpoint_notify_ha(handlerton *hton, void *cookie); inline const LEX_CSTRING *table_case_name(HA_CREATE_INFO *info, const LEX_CSTRING *name) diff --git a/sql/sql_class.cc b/sql/sql_class.cc index c0c89ee59b3..c77c8a1a05a 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -5641,263 +5641,6 @@ void THD::mark_transaction_to_rollback(bool all) is_fatal_sub_stmt_error= true; transaction_rollback_request= all; } -/*************************************************************************** - Handling of XA id cacheing -***************************************************************************/ -class XID_cache_element -{ - /* - m_state is used to prevent elements from being deleted while XA RECOVER - iterates xid cache and to prevent recovered elments from being acquired by - multiple threads. - - bits 1..29 are reference counter - bit 30 is RECOVERED flag - bit 31 is ACQUIRED flag (thread owns this xid) - bit 32 is unused - - Newly allocated and deleted elements have m_state set to 0. - - On lock() m_state is atomically incremented. It also creates load-ACQUIRE - memory barrier to make sure m_state is actually updated before furhter - memory accesses. Attempting to lock an element that has neither ACQUIRED - nor RECOVERED flag set returns failure and further accesses to element - memory are forbidden. - - On unlock() m_state is decremented. It also creates store-RELEASE memory - barrier to make sure m_state is actually updated after preceding memory - accesses. - - ACQUIRED flag is set when thread registers it's xid or when thread acquires - recovered xid. - - RECOVERED flag is set for elements found during crash recovery. - - ACQUIRED and RECOVERED flags are cleared before element is deleted from - hash in a spin loop, after last reference is released. - */ - std::atomic<int32_t> m_state; -public: - static const int32 ACQUIRED= 1 << 30; - static const int32 RECOVERED= 1 << 29; - XID_STATE *m_xid_state; - bool is_set(int32_t flag) - { return m_state.load(std::memory_order_relaxed) & flag; } - void set(int32_t flag) - { - DBUG_ASSERT(!is_set(ACQUIRED | RECOVERED)); - m_state.fetch_add(flag, std::memory_order_relaxed); - } - bool lock() - { - int32_t old= m_state.fetch_add(1, std::memory_order_acquire); - if (old & (ACQUIRED | RECOVERED)) - return true; - unlock(); - return false; - } - void unlock() - { m_state.fetch_sub(1, std::memory_order_release); } - void mark_uninitialized() - { - int32_t old= ACQUIRED; - while (!m_state.compare_exchange_weak(old, 0, - std::memory_order_relaxed, - std::memory_order_relaxed)) - { - old&= ACQUIRED | RECOVERED; - (void) LF_BACKOFF(); - } - } - bool acquire_recovered() - { - int32_t old= RECOVERED; - while (!m_state.compare_exchange_weak(old, ACQUIRED | RECOVERED, - std::memory_order_relaxed, - std::memory_order_relaxed)) - { - if (!(old & RECOVERED) || (old & ACQUIRED)) - return false; - old= RECOVERED; - (void) LF_BACKOFF(); - } - return true; - } - static void lf_hash_initializer(LF_HASH *hash __attribute__((unused)), - XID_cache_element *element, - XID_STATE *xid_state) - { - DBUG_ASSERT(!element->is_set(ACQUIRED | RECOVERED)); - element->m_xid_state= xid_state; - xid_state->xid_cache_element= element; - } - static void lf_alloc_constructor(uchar *ptr) - { - XID_cache_element *element= (XID_cache_element*) (ptr + LF_HASH_OVERHEAD); - element->m_state= 0; - } - static void lf_alloc_destructor(uchar *ptr) - { - XID_cache_element *element= (XID_cache_element*) (ptr + LF_HASH_OVERHEAD); - DBUG_ASSERT(!element->is_set(ACQUIRED)); - if (element->is_set(RECOVERED)) - my_free(element->m_xid_state); - } - static uchar *key(const XID_cache_element *element, size_t *length, - my_bool not_used __attribute__((unused))) - { - *length= element->m_xid_state->xid.key_length(); - return element->m_xid_state->xid.key(); - } -}; - - -static LF_HASH xid_cache; -static bool xid_cache_inited; - - -bool THD::fix_xid_hash_pins() -{ - if (!xid_hash_pins) - xid_hash_pins= lf_hash_get_pins(&xid_cache); - return !xid_hash_pins; -} - - -void xid_cache_init() -{ - xid_cache_inited= true; - lf_hash_init(&xid_cache, sizeof(XID_cache_element), LF_HASH_UNIQUE, 0, 0, - (my_hash_get_key) XID_cache_element::key, &my_charset_bin); - xid_cache.alloc.constructor= XID_cache_element::lf_alloc_constructor; - xid_cache.alloc.destructor= XID_cache_element::lf_alloc_destructor; - xid_cache.initializer= - (lf_hash_initializer) XID_cache_element::lf_hash_initializer; -} - - -void xid_cache_free() -{ - if (xid_cache_inited) - { - lf_hash_destroy(&xid_cache); - xid_cache_inited= false; - } -} - - -/** - Find recovered XA transaction by XID. -*/ - -XID_STATE *xid_cache_search(THD *thd, XID *xid) -{ - XID_STATE *xs= 0; - DBUG_ASSERT(thd->xid_hash_pins); - XID_cache_element *element= - (XID_cache_element*) lf_hash_search(&xid_cache, thd->xid_hash_pins, - xid->key(), xid->key_length()); - if (element) - { - if (element->acquire_recovered()) - xs= element->m_xid_state; - lf_hash_search_unpin(thd->xid_hash_pins); - DEBUG_SYNC(thd, "xa_after_search"); - } - return xs; -} - - -bool xid_cache_insert(XID *xid, enum xa_states xa_state) -{ - XID_STATE *xs; - LF_PINS *pins; - int res= 1; - - if (!(pins= lf_hash_get_pins(&xid_cache))) - return true; - - if ((xs= (XID_STATE*) my_malloc(sizeof(*xs), MYF(MY_WME)))) - { - xs->xa_state=xa_state; - xs->xid.set(xid); - xs->rm_error=0; - - if ((res= lf_hash_insert(&xid_cache, pins, xs))) - my_free(xs); - else - xs->xid_cache_element->set(XID_cache_element::RECOVERED); - if (res == 1) - res= 0; - } - lf_hash_put_pins(pins); - return res; -} - - -bool xid_cache_insert(THD *thd, XID_STATE *xid_state) -{ - if (thd->fix_xid_hash_pins()) - return true; - - int res= lf_hash_insert(&xid_cache, thd->xid_hash_pins, xid_state); - switch (res) - { - case 0: - xid_state->xid_cache_element->set(XID_cache_element::ACQUIRED); - break; - case 1: - my_error(ER_XAER_DUPID, MYF(0)); - /* fall through */ - default: - xid_state->xid_cache_element= 0; - } - return res; -} - - -void xid_cache_delete(THD *thd, XID_STATE *xid_state) -{ - if (xid_state->xid_cache_element) - { - bool recovered= xid_state->xid_cache_element->is_set(XID_cache_element::RECOVERED); - DBUG_ASSERT(thd->xid_hash_pins); - xid_state->xid_cache_element->mark_uninitialized(); - lf_hash_delete(&xid_cache, thd->xid_hash_pins, - xid_state->xid.key(), xid_state->xid.key_length()); - xid_state->xid_cache_element= 0; - if (recovered) - my_free(xid_state); - } -} - - -struct xid_cache_iterate_arg -{ - my_hash_walk_action action; - void *argument; -}; - -static my_bool xid_cache_iterate_callback(XID_cache_element *element, - xid_cache_iterate_arg *arg) -{ - my_bool res= FALSE; - if (element->lock()) - { - res= arg->action(element->m_xid_state, arg->argument); - element->unlock(); - } - return res; -} - -int xid_cache_iterate(THD *thd, my_hash_walk_action action, void *arg) -{ - xid_cache_iterate_arg argument= { action, arg }; - return thd->fix_xid_hash_pins() ? -1 : - lf_hash_iterate(&xid_cache, thd->xid_hash_pins, - (my_hash_walk_action) xid_cache_iterate_callback, - &argument); -} /** diff --git a/sql/sql_class.h b/sql/sql_class.h index f8dd9f78500..c68628be65d 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -47,6 +47,7 @@ #include <mysql_com_server.h> #include "session_tracker.h" #include "backup.h" +#include "xa.h" extern "C" void set_thd_stage_info(void *thd, @@ -1276,50 +1277,6 @@ struct st_savepoint { MDL_savepoint mdl_savepoint; }; -enum xa_states {XA_NOTR=0, XA_ACTIVE, XA_IDLE, XA_PREPARED, XA_ROLLBACK_ONLY}; -extern const char *xa_state_names[]; -class XID_cache_element; - -typedef struct st_xid_state { - /* For now, this is only used to catch duplicated external xids */ - XID xid; // transaction identifier - enum xa_states xa_state; // used by external XA only - /* Error reported by the Resource Manager (RM) to the Transaction Manager. */ - uint rm_error; - XID_cache_element *xid_cache_element; - - /** - Check that XA transaction has an uncommitted work. Report an error - to the user in case when there is an uncommitted work for XA transaction. - - @return result of check - @retval false XA transaction is NOT in state IDLE, PREPARED - or ROLLBACK_ONLY. - @retval true XA transaction is in state IDLE or PREPARED - or ROLLBACK_ONLY. - */ - - bool check_has_uncommitted_xa() const - { - if (xa_state == XA_IDLE || - xa_state == XA_PREPARED || - xa_state == XA_ROLLBACK_ONLY) - { - my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[xa_state]); - return true; - } - return false; - } -} XID_STATE; - -void xid_cache_init(void); -void xid_cache_free(void); -XID_STATE *xid_cache_search(THD *thd, XID *xid); -bool xid_cache_insert(XID *xid, enum xa_states xa_state); -bool xid_cache_insert(THD *thd, XID_STATE *xid_state); -void xid_cache_delete(THD *thd, XID_STATE *xid_state); -int xid_cache_iterate(THD *thd, my_hash_walk_action action, void *argument); - /** @class Security_context @brief A set of THD members describing the current authenticated user. diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index d5d721e0ede..0a476f47988 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -395,10 +395,6 @@ const LEX_CSTRING command_name[257]={ { STRING_WITH_LEN("Error") } // Last command number 255 }; -const char *xa_state_names[]={ - "NON-EXISTING", "ACTIVE", "IDLE", "PREPARED", "ROLLBACK ONLY" -}; - #ifdef HAVE_REPLICATION /** Returns true if all tables should be ignored. diff --git a/sql/transaction.cc b/sql/transaction.cc index 4d61d2a120d..15c45425528 100644 --- a/sql/transaction.cc +++ b/sql/transaction.cc @@ -28,21 +28,19 @@ #include "wsrep_trans_observer.h" #endif /* WITH_WSREP */ -#ifndef EMBEDDED_LIBRARY /** Helper: Tell tracker (if any) that transaction ended. */ -static void trans_track_end_trx(THD *thd) +void trans_track_end_trx(THD *thd) { +#ifndef EMBEDDED_LIBRARY if (thd->variables.session_track_transaction_info > TX_TRACK_NONE) { ((Transaction_state_tracker *) thd->session_tracker.get_tracker(TRANSACTION_INFO_TRACKER))->end_trx(thd); } -} -#else -#define trans_track_end_trx(A) do{}while(0) #endif //EMBEDDED_LIBRARY +} /** @@ -89,65 +87,6 @@ static bool trans_check(THD *thd) /** - Mark a XA transaction as rollback-only if the RM unilaterally - rolled back the transaction branch. - - @note If a rollback was requested by the RM, this function sets - the appropriate rollback error code and transits the state - to XA_ROLLBACK_ONLY. - - @return TRUE if transaction was rolled back or if the transaction - state is XA_ROLLBACK_ONLY. FALSE otherwise. -*/ -static bool xa_trans_rolled_back(XID_STATE *xid_state) -{ - if (xid_state->rm_error) - { - switch (xid_state->rm_error) { - case ER_LOCK_WAIT_TIMEOUT: - my_error(ER_XA_RBTIMEOUT, MYF(0)); - break; - case ER_LOCK_DEADLOCK: - my_error(ER_XA_RBDEADLOCK, MYF(0)); - break; - default: - my_error(ER_XA_RBROLLBACK, MYF(0)); - } - xid_state->xa_state= XA_ROLLBACK_ONLY; - } - - return (xid_state->xa_state == XA_ROLLBACK_ONLY); -} - - -/** - Rollback the active XA transaction. - - @note Resets rm_error before calling ha_rollback(), so - the thd->transaction.xid structure gets reset - by ha_rollback() / THD::transaction::cleanup(). - - @return TRUE if the rollback failed, FALSE otherwise. -*/ - -static bool xa_trans_force_rollback(THD *thd) -{ - /* - We must reset rm_error before calling ha_rollback(), - so thd->transaction.xid structure gets reset - by ha_rollback()/THD::transaction::cleanup(). - */ - thd->transaction.xid_state.rm_error= 0; - if (ha_rollback_trans(thd, true)) - { - my_error(ER_XAER_RMERR, MYF(0)); - return true; - } - return false; -} - - -/** Begin a new transaction. @note Beginning a transaction implicitly commits any current @@ -777,268 +716,3 @@ bool trans_release_savepoint(THD *thd, LEX_CSTRING name) DBUG_RETURN(MY_TEST(res)); } - - -/** - Starts an XA transaction with the given xid value. - - @param thd Current thread - - @retval FALSE Success - @retval TRUE Failure -*/ - -bool trans_xa_start(THD *thd) -{ - enum xa_states xa_state= thd->transaction.xid_state.xa_state; - DBUG_ENTER("trans_xa_start"); - - if (xa_state == XA_IDLE && thd->lex->xa_opt == XA_RESUME) - { - bool not_equal= !thd->transaction.xid_state.xid.eq(thd->lex->xid); - if (not_equal) - my_error(ER_XAER_NOTA, MYF(0)); - else - thd->transaction.xid_state.xa_state= XA_ACTIVE; - DBUG_RETURN(not_equal); - } - - /* TODO: JOIN is not supported yet. */ - if (thd->lex->xa_opt != XA_NONE) - my_error(ER_XAER_INVAL, MYF(0)); - else if (xa_state != XA_NOTR) - my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[xa_state]); - else if (thd->locked_tables_mode || thd->in_active_multi_stmt_transaction()) - my_error(ER_XAER_OUTSIDE, MYF(0)); - else if (!trans_begin(thd)) - { - DBUG_ASSERT(thd->transaction.xid_state.xid.is_null()); - thd->transaction.xid_state.xa_state= XA_ACTIVE; - thd->transaction.xid_state.rm_error= 0; - thd->transaction.xid_state.xid.set(thd->lex->xid); - if (xid_cache_insert(thd, &thd->transaction.xid_state)) - { - thd->transaction.xid_state.xa_state= XA_NOTR; - thd->transaction.xid_state.xid.null(); - trans_rollback(thd); - DBUG_RETURN(true); - } - DBUG_RETURN(FALSE); - } - - DBUG_RETURN(TRUE); -} - - -/** - Put a XA transaction in the IDLE state. - - @param thd Current thread - - @retval FALSE Success - @retval TRUE Failure -*/ - -bool trans_xa_end(THD *thd) -{ - DBUG_ENTER("trans_xa_end"); - - /* TODO: SUSPEND and FOR MIGRATE are not supported yet. */ - if (thd->lex->xa_opt != XA_NONE) - my_error(ER_XAER_INVAL, MYF(0)); - else if (thd->transaction.xid_state.xa_state != XA_ACTIVE) - my_error(ER_XAER_RMFAIL, MYF(0), - xa_state_names[thd->transaction.xid_state.xa_state]); - else if (!thd->transaction.xid_state.xid.eq(thd->lex->xid)) - my_error(ER_XAER_NOTA, MYF(0)); - else if (!xa_trans_rolled_back(&thd->transaction.xid_state)) - thd->transaction.xid_state.xa_state= XA_IDLE; - - DBUG_RETURN(thd->is_error() || - thd->transaction.xid_state.xa_state != XA_IDLE); -} - - -/** - Put a XA transaction in the PREPARED state. - - @param thd Current thread - - @retval FALSE Success - @retval TRUE Failure -*/ - -bool trans_xa_prepare(THD *thd) -{ - DBUG_ENTER("trans_xa_prepare"); - - if (thd->transaction.xid_state.xa_state != XA_IDLE) - my_error(ER_XAER_RMFAIL, MYF(0), - xa_state_names[thd->transaction.xid_state.xa_state]); - else if (!thd->transaction.xid_state.xid.eq(thd->lex->xid)) - my_error(ER_XAER_NOTA, MYF(0)); - else if (ha_prepare(thd)) - { - xid_cache_delete(thd, &thd->transaction.xid_state); - thd->transaction.xid_state.xa_state= XA_NOTR; - my_error(ER_XA_RBROLLBACK, MYF(0)); - } - else - thd->transaction.xid_state.xa_state= XA_PREPARED; - - DBUG_RETURN(thd->is_error() || - thd->transaction.xid_state.xa_state != XA_PREPARED); -} - - -/** - Commit and terminate the a XA transaction. - - @param thd Current thread - - @retval FALSE Success - @retval TRUE Failure -*/ - -bool trans_xa_commit(THD *thd) -{ - bool res= TRUE; - enum xa_states xa_state= thd->transaction.xid_state.xa_state; - DBUG_ENTER("trans_xa_commit"); - - if (!thd->transaction.xid_state.xid.eq(thd->lex->xid)) - { - if (thd->fix_xid_hash_pins()) - { - my_error(ER_OUT_OF_RESOURCES, MYF(0)); - DBUG_RETURN(TRUE); - } - - XID_STATE *xs= xid_cache_search(thd, thd->lex->xid); - res= !xs; - if (res) - my_error(ER_XAER_NOTA, MYF(0)); - else - { - res= xa_trans_rolled_back(xs); - ha_commit_or_rollback_by_xid(thd->lex->xid, !res); - xid_cache_delete(thd, xs); - } - DBUG_RETURN(res); - } - - if (xa_trans_rolled_back(&thd->transaction.xid_state)) - { - xa_trans_force_rollback(thd); - res= thd->is_error(); - } - else if (xa_state == XA_IDLE && thd->lex->xa_opt == XA_ONE_PHASE) - { - int r= ha_commit_trans(thd, TRUE); - if ((res= MY_TEST(r))) - my_error(r == 1 ? ER_XA_RBROLLBACK : ER_XAER_RMERR, MYF(0)); - } - else if (xa_state == XA_PREPARED && thd->lex->xa_opt == XA_NONE) - { - MDL_request mdl_request; - - /* - Acquire metadata lock which will ensure that COMMIT is blocked - by active FLUSH TABLES WITH READ LOCK (and vice versa COMMIT in - progress blocks FTWRL). - - We allow FLUSHer to COMMIT; we assume FLUSHer knows what it does. - */ - mdl_request.init(MDL_key::BACKUP, "", "", MDL_BACKUP_COMMIT, - MDL_TRANSACTION); - - if (thd->mdl_context.acquire_lock(&mdl_request, - thd->variables.lock_wait_timeout)) - { - ha_rollback_trans(thd, TRUE); - my_error(ER_XAER_RMERR, MYF(0)); - } - else - { - DEBUG_SYNC(thd, "trans_xa_commit_after_acquire_commit_lock"); - - res= MY_TEST(ha_commit_one_phase(thd, 1)); - if (res) - my_error(ER_XAER_RMERR, MYF(0)); - } - } - else - { - my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[xa_state]); - DBUG_RETURN(TRUE); - } - - thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG); - thd->transaction.all.reset(); - thd->server_status&= - ~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY); - DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS")); - xid_cache_delete(thd, &thd->transaction.xid_state); - thd->transaction.xid_state.xa_state= XA_NOTR; - - trans_track_end_trx(thd); - - DBUG_RETURN(res); -} - - -/** - Roll back and terminate a XA transaction. - - @param thd Current thread - - @retval FALSE Success - @retval TRUE Failure -*/ - -bool trans_xa_rollback(THD *thd) -{ - bool res= TRUE; - enum xa_states xa_state= thd->transaction.xid_state.xa_state; - DBUG_ENTER("trans_xa_rollback"); - - if (!thd->transaction.xid_state.xid.eq(thd->lex->xid)) - { - if (thd->fix_xid_hash_pins()) - { - my_error(ER_OUT_OF_RESOURCES, MYF(0)); - DBUG_RETURN(TRUE); - } - - XID_STATE *xs= xid_cache_search(thd, thd->lex->xid); - if (!xs) - my_error(ER_XAER_NOTA, MYF(0)); - else - { - xa_trans_rolled_back(xs); - ha_commit_or_rollback_by_xid(thd->lex->xid, 0); - xid_cache_delete(thd, xs); - } - DBUG_RETURN(thd->get_stmt_da()->is_error()); - } - - if (xa_state != XA_IDLE && xa_state != XA_PREPARED && xa_state != XA_ROLLBACK_ONLY) - { - my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[xa_state]); - DBUG_RETURN(TRUE); - } - - res= xa_trans_force_rollback(thd); - - thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG); - thd->transaction.all.reset(); - thd->server_status&= - ~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY); - DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS")); - xid_cache_delete(thd, &thd->transaction.xid_state); - thd->transaction.xid_state.xa_state= XA_NOTR; - - trans_track_end_trx(thd); - - DBUG_RETURN(res); -} diff --git a/sql/transaction.h b/sql/transaction.h index 7e34693a2eb..5eaa2b00027 100644 --- a/sql/transaction.h +++ b/sql/transaction.h @@ -24,6 +24,8 @@ class THD; +void trans_track_end_trx(THD *thd); + bool trans_begin(THD *thd, uint flags= 0); bool trans_commit(THD *thd); bool trans_commit_implicit(THD *thd); @@ -37,12 +39,6 @@ bool trans_savepoint(THD *thd, LEX_CSTRING name); bool trans_rollback_to_savepoint(THD *thd, LEX_CSTRING name); bool trans_release_savepoint(THD *thd, LEX_CSTRING name); -bool trans_xa_start(THD *thd); -bool trans_xa_end(THD *thd); -bool trans_xa_prepare(THD *thd); -bool trans_xa_commit(THD *thd); -bool trans_xa_rollback(THD *thd); - void trans_reset_one_shot_chistics(THD *thd); #endif /* TRANSACTION_H */ diff --git a/sql/xa.cc b/sql/xa.cc new file mode 100644 index 00000000000..4de6b101e38 --- /dev/null +++ b/sql/xa.cc @@ -0,0 +1,797 @@ +/* + Copyright (c) 2000, 2016, Oracle and/or its affiliates. + Copyright (c) 2009, 2019, MariaDB Corporation. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA +*/ + +#include "mariadb.h" +#include "sql_class.h" +#include "transaction.h" + + +/*************************************************************************** + Handling of XA id cacheing +***************************************************************************/ +class XID_cache_element +{ + /* + m_state is used to prevent elements from being deleted while XA RECOVER + iterates xid cache and to prevent recovered elments from being acquired by + multiple threads. + + bits 1..29 are reference counter + bit 30 is RECOVERED flag + bit 31 is ACQUIRED flag (thread owns this xid) + bit 32 is unused + + Newly allocated and deleted elements have m_state set to 0. + + On lock() m_state is atomically incremented. It also creates load-ACQUIRE + memory barrier to make sure m_state is actually updated before furhter + memory accesses. Attempting to lock an element that has neither ACQUIRED + nor RECOVERED flag set returns failure and further accesses to element + memory are forbidden. + + On unlock() m_state is decremented. It also creates store-RELEASE memory + barrier to make sure m_state is actually updated after preceding memory + accesses. + + ACQUIRED flag is set when thread registers it's xid or when thread acquires + recovered xid. + + RECOVERED flag is set for elements found during crash recovery. + + ACQUIRED and RECOVERED flags are cleared before element is deleted from + hash in a spin loop, after last reference is released. + */ + std::atomic<int32_t> m_state; +public: + static const int32 ACQUIRED= 1 << 30; + static const int32 RECOVERED= 1 << 29; + XID_STATE *m_xid_state; + bool is_set(int32_t flag) + { return m_state.load(std::memory_order_relaxed) & flag; } + void set(int32_t flag) + { + DBUG_ASSERT(!is_set(ACQUIRED | RECOVERED)); + m_state.fetch_add(flag, std::memory_order_relaxed); + } + bool lock() + { + int32_t old= m_state.fetch_add(1, std::memory_order_acquire); + if (old & (ACQUIRED | RECOVERED)) + return true; + unlock(); + return false; + } + void unlock() + { m_state.fetch_sub(1, std::memory_order_release); } + void mark_uninitialized() + { + int32_t old= ACQUIRED; + while (!m_state.compare_exchange_weak(old, 0, + std::memory_order_relaxed, + std::memory_order_relaxed)) + { + old&= ACQUIRED | RECOVERED; + (void) LF_BACKOFF(); + } + } + bool acquire_recovered() + { + int32_t old= RECOVERED; + while (!m_state.compare_exchange_weak(old, ACQUIRED | RECOVERED, + std::memory_order_relaxed, + std::memory_order_relaxed)) + { + if (!(old & RECOVERED) || (old & ACQUIRED)) + return false; + old= RECOVERED; + (void) LF_BACKOFF(); + } + return true; + } + static void lf_hash_initializer(LF_HASH *hash __attribute__((unused)), + XID_cache_element *element, + XID_STATE *xid_state) + { + DBUG_ASSERT(!element->is_set(ACQUIRED | RECOVERED)); + element->m_xid_state= xid_state; + xid_state->xid_cache_element= element; + } + static void lf_alloc_constructor(uchar *ptr) + { + XID_cache_element *element= (XID_cache_element*) (ptr + LF_HASH_OVERHEAD); + element->m_state= 0; + } + static void lf_alloc_destructor(uchar *ptr) + { + XID_cache_element *element= (XID_cache_element*) (ptr + LF_HASH_OVERHEAD); + DBUG_ASSERT(!element->is_set(ACQUIRED)); + if (element->is_set(RECOVERED)) + my_free(element->m_xid_state); + } + static uchar *key(const XID_cache_element *element, size_t *length, + my_bool not_used __attribute__((unused))) + { + *length= element->m_xid_state->xid.key_length(); + return element->m_xid_state->xid.key(); + } +}; + + +static LF_HASH xid_cache; +static bool xid_cache_inited; +const char *xa_state_names[]= { + "NON-EXISTING", "ACTIVE", "IDLE", "PREPARED", "ROLLBACK ONLY" +}; + + +bool THD::fix_xid_hash_pins() +{ + if (!xid_hash_pins) + xid_hash_pins= lf_hash_get_pins(&xid_cache); + return !xid_hash_pins; +} + + +void xid_cache_init() +{ + xid_cache_inited= true; + lf_hash_init(&xid_cache, sizeof(XID_cache_element), LF_HASH_UNIQUE, 0, 0, + (my_hash_get_key) XID_cache_element::key, &my_charset_bin); + xid_cache.alloc.constructor= XID_cache_element::lf_alloc_constructor; + xid_cache.alloc.destructor= XID_cache_element::lf_alloc_destructor; + xid_cache.initializer= + (lf_hash_initializer) XID_cache_element::lf_hash_initializer; +} + + +void xid_cache_free() +{ + if (xid_cache_inited) + { + lf_hash_destroy(&xid_cache); + xid_cache_inited= false; + } +} + + +/** + Find recovered XA transaction by XID. +*/ + +static XID_STATE *xid_cache_search(THD *thd, XID *xid) +{ + XID_STATE *xs= 0; + DBUG_ASSERT(thd->xid_hash_pins); + XID_cache_element *element= + (XID_cache_element*) lf_hash_search(&xid_cache, thd->xid_hash_pins, + xid->key(), xid->key_length()); + if (element) + { + if (element->acquire_recovered()) + xs= element->m_xid_state; + lf_hash_search_unpin(thd->xid_hash_pins); + DEBUG_SYNC(thd, "xa_after_search"); + } + return xs; +} + + +bool xid_cache_insert(XID *xid, enum xa_states xa_state) +{ + XID_STATE *xs; + LF_PINS *pins; + int res= 1; + + if (!(pins= lf_hash_get_pins(&xid_cache))) + return true; + + if ((xs= (XID_STATE*) my_malloc(sizeof(*xs), MYF(MY_WME)))) + { + xs->xa_state=xa_state; + xs->xid.set(xid); + xs->rm_error=0; + + if ((res= lf_hash_insert(&xid_cache, pins, xs))) + my_free(xs); + else + xs->xid_cache_element->set(XID_cache_element::RECOVERED); + if (res == 1) + res= 0; + } + lf_hash_put_pins(pins); + return res; +} + + +bool xid_cache_insert(THD *thd, XID_STATE *xid_state) +{ + if (thd->fix_xid_hash_pins()) + return true; + + int res= lf_hash_insert(&xid_cache, thd->xid_hash_pins, xid_state); + switch (res) + { + case 0: + xid_state->xid_cache_element->set(XID_cache_element::ACQUIRED); + break; + case 1: + my_error(ER_XAER_DUPID, MYF(0)); + /* fall through */ + default: + xid_state->xid_cache_element= 0; + } + return res; +} + + +void xid_cache_delete(THD *thd, XID_STATE *xid_state) +{ + if (xid_state->xid_cache_element) + { + bool recovered= xid_state->xid_cache_element->is_set(XID_cache_element::RECOVERED); + DBUG_ASSERT(thd->xid_hash_pins); + xid_state->xid_cache_element->mark_uninitialized(); + lf_hash_delete(&xid_cache, thd->xid_hash_pins, + xid_state->xid.key(), xid_state->xid.key_length()); + xid_state->xid_cache_element= 0; + if (recovered) + my_free(xid_state); + } +} + + +struct xid_cache_iterate_arg +{ + my_hash_walk_action action; + void *argument; +}; + +static my_bool xid_cache_iterate_callback(XID_cache_element *element, + xid_cache_iterate_arg *arg) +{ + my_bool res= FALSE; + if (element->lock()) + { + res= arg->action(element->m_xid_state, arg->argument); + element->unlock(); + } + return res; +} + +static int xid_cache_iterate(THD *thd, my_hash_walk_action action, void *arg) +{ + xid_cache_iterate_arg argument= { action, arg }; + return thd->fix_xid_hash_pins() ? -1 : + lf_hash_iterate(&xid_cache, thd->xid_hash_pins, + (my_hash_walk_action) xid_cache_iterate_callback, + &argument); +} + + +/** + Mark a XA transaction as rollback-only if the RM unilaterally + rolled back the transaction branch. + + @note If a rollback was requested by the RM, this function sets + the appropriate rollback error code and transits the state + to XA_ROLLBACK_ONLY. + + @return TRUE if transaction was rolled back or if the transaction + state is XA_ROLLBACK_ONLY. FALSE otherwise. +*/ +static bool xa_trans_rolled_back(XID_STATE *xid_state) +{ + if (xid_state->rm_error) + { + switch (xid_state->rm_error) { + case ER_LOCK_WAIT_TIMEOUT: + my_error(ER_XA_RBTIMEOUT, MYF(0)); + break; + case ER_LOCK_DEADLOCK: + my_error(ER_XA_RBDEADLOCK, MYF(0)); + break; + default: + my_error(ER_XA_RBROLLBACK, MYF(0)); + } + xid_state->xa_state= XA_ROLLBACK_ONLY; + } + + return (xid_state->xa_state == XA_ROLLBACK_ONLY); +} + + +/** + Rollback the active XA transaction. + + @note Resets rm_error before calling ha_rollback(), so + the thd->transaction.xid structure gets reset + by ha_rollback() / THD::transaction::cleanup(). + + @return TRUE if the rollback failed, FALSE otherwise. +*/ + +static bool xa_trans_force_rollback(THD *thd) +{ + /* + We must reset rm_error before calling ha_rollback(), + so thd->transaction.xid structure gets reset + by ha_rollback()/THD::transaction::cleanup(). + */ + thd->transaction.xid_state.rm_error= 0; + if (ha_rollback_trans(thd, true)) + { + my_error(ER_XAER_RMERR, MYF(0)); + return true; + } + return false; +} + + +/** + Starts an XA transaction with the given xid value. + + @param thd Current thread + + @retval FALSE Success + @retval TRUE Failure +*/ + +bool trans_xa_start(THD *thd) +{ + enum xa_states xa_state= thd->transaction.xid_state.xa_state; + DBUG_ENTER("trans_xa_start"); + + if (xa_state == XA_IDLE && thd->lex->xa_opt == XA_RESUME) + { + bool not_equal= !thd->transaction.xid_state.xid.eq(thd->lex->xid); + if (not_equal) + my_error(ER_XAER_NOTA, MYF(0)); + else + thd->transaction.xid_state.xa_state= XA_ACTIVE; + DBUG_RETURN(not_equal); + } + + /* TODO: JOIN is not supported yet. */ + if (thd->lex->xa_opt != XA_NONE) + my_error(ER_XAER_INVAL, MYF(0)); + else if (xa_state != XA_NOTR) + my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[xa_state]); + else if (thd->locked_tables_mode || thd->in_active_multi_stmt_transaction()) + my_error(ER_XAER_OUTSIDE, MYF(0)); + else if (!trans_begin(thd)) + { + DBUG_ASSERT(thd->transaction.xid_state.xid.is_null()); + thd->transaction.xid_state.xa_state= XA_ACTIVE; + thd->transaction.xid_state.rm_error= 0; + thd->transaction.xid_state.xid.set(thd->lex->xid); + if (xid_cache_insert(thd, &thd->transaction.xid_state)) + { + thd->transaction.xid_state.xa_state= XA_NOTR; + thd->transaction.xid_state.xid.null(); + trans_rollback(thd); + DBUG_RETURN(true); + } + DBUG_RETURN(FALSE); + } + + DBUG_RETURN(TRUE); +} + + +/** + Put a XA transaction in the IDLE state. + + @param thd Current thread + + @retval FALSE Success + @retval TRUE Failure +*/ + +bool trans_xa_end(THD *thd) +{ + DBUG_ENTER("trans_xa_end"); + + /* TODO: SUSPEND and FOR MIGRATE are not supported yet. */ + if (thd->lex->xa_opt != XA_NONE) + my_error(ER_XAER_INVAL, MYF(0)); + else if (thd->transaction.xid_state.xa_state != XA_ACTIVE) + my_error(ER_XAER_RMFAIL, MYF(0), + xa_state_names[thd->transaction.xid_state.xa_state]); + else if (!thd->transaction.xid_state.xid.eq(thd->lex->xid)) + my_error(ER_XAER_NOTA, MYF(0)); + else if (!xa_trans_rolled_back(&thd->transaction.xid_state)) + thd->transaction.xid_state.xa_state= XA_IDLE; + + DBUG_RETURN(thd->is_error() || + thd->transaction.xid_state.xa_state != XA_IDLE); +} + + +/** + Put a XA transaction in the PREPARED state. + + @param thd Current thread + + @retval FALSE Success + @retval TRUE Failure +*/ + +bool trans_xa_prepare(THD *thd) +{ + DBUG_ENTER("trans_xa_prepare"); + + if (thd->transaction.xid_state.xa_state != XA_IDLE) + my_error(ER_XAER_RMFAIL, MYF(0), + xa_state_names[thd->transaction.xid_state.xa_state]); + else if (!thd->transaction.xid_state.xid.eq(thd->lex->xid)) + my_error(ER_XAER_NOTA, MYF(0)); + else if (ha_prepare(thd)) + { + xid_cache_delete(thd, &thd->transaction.xid_state); + thd->transaction.xid_state.xa_state= XA_NOTR; + my_error(ER_XA_RBROLLBACK, MYF(0)); + } + else + thd->transaction.xid_state.xa_state= XA_PREPARED; + + DBUG_RETURN(thd->is_error() || + thd->transaction.xid_state.xa_state != XA_PREPARED); +} + + +/** + Commit and terminate the a XA transaction. + + @param thd Current thread + + @retval FALSE Success + @retval TRUE Failure +*/ + +bool trans_xa_commit(THD *thd) +{ + bool res= TRUE; + enum xa_states xa_state= thd->transaction.xid_state.xa_state; + DBUG_ENTER("trans_xa_commit"); + + if (!thd->transaction.xid_state.xid.eq(thd->lex->xid)) + { + if (thd->fix_xid_hash_pins()) + { + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + DBUG_RETURN(TRUE); + } + + XID_STATE *xs= xid_cache_search(thd, thd->lex->xid); + res= !xs; + if (res) + my_error(ER_XAER_NOTA, MYF(0)); + else + { + res= xa_trans_rolled_back(xs); + ha_commit_or_rollback_by_xid(thd->lex->xid, !res); + xid_cache_delete(thd, xs); + } + DBUG_RETURN(res); + } + + if (xa_trans_rolled_back(&thd->transaction.xid_state)) + { + xa_trans_force_rollback(thd); + res= thd->is_error(); + } + else if (xa_state == XA_IDLE && thd->lex->xa_opt == XA_ONE_PHASE) + { + int r= ha_commit_trans(thd, TRUE); + if ((res= MY_TEST(r))) + my_error(r == 1 ? ER_XA_RBROLLBACK : ER_XAER_RMERR, MYF(0)); + } + else if (xa_state == XA_PREPARED && thd->lex->xa_opt == XA_NONE) + { + MDL_request mdl_request; + + /* + Acquire metadata lock which will ensure that COMMIT is blocked + by active FLUSH TABLES WITH READ LOCK (and vice versa COMMIT in + progress blocks FTWRL). + + We allow FLUSHer to COMMIT; we assume FLUSHer knows what it does. + */ + mdl_request.init(MDL_key::BACKUP, "", "", MDL_BACKUP_COMMIT, + MDL_TRANSACTION); + + if (thd->mdl_context.acquire_lock(&mdl_request, + thd->variables.lock_wait_timeout)) + { + ha_rollback_trans(thd, TRUE); + my_error(ER_XAER_RMERR, MYF(0)); + } + else + { + DEBUG_SYNC(thd, "trans_xa_commit_after_acquire_commit_lock"); + + res= MY_TEST(ha_commit_one_phase(thd, 1)); + if (res) + my_error(ER_XAER_RMERR, MYF(0)); + } + } + else + { + my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[xa_state]); + DBUG_RETURN(TRUE); + } + + thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG); + thd->transaction.all.reset(); + thd->server_status&= + ~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY); + DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS")); + xid_cache_delete(thd, &thd->transaction.xid_state); + thd->transaction.xid_state.xa_state= XA_NOTR; + + trans_track_end_trx(thd); + + DBUG_RETURN(res); +} + + +/** + Roll back and terminate a XA transaction. + + @param thd Current thread + + @retval FALSE Success + @retval TRUE Failure +*/ + +bool trans_xa_rollback(THD *thd) +{ + bool res= TRUE; + enum xa_states xa_state= thd->transaction.xid_state.xa_state; + DBUG_ENTER("trans_xa_rollback"); + + if (!thd->transaction.xid_state.xid.eq(thd->lex->xid)) + { + if (thd->fix_xid_hash_pins()) + { + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + DBUG_RETURN(TRUE); + } + + XID_STATE *xs= xid_cache_search(thd, thd->lex->xid); + if (!xs) + my_error(ER_XAER_NOTA, MYF(0)); + else + { + xa_trans_rolled_back(xs); + ha_commit_or_rollback_by_xid(thd->lex->xid, 0); + xid_cache_delete(thd, xs); + } + DBUG_RETURN(thd->get_stmt_da()->is_error()); + } + + if (xa_state != XA_IDLE && xa_state != XA_PREPARED && xa_state != XA_ROLLBACK_ONLY) + { + my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[xa_state]); + DBUG_RETURN(TRUE); + } + + res= xa_trans_force_rollback(thd); + + thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG); + thd->transaction.all.reset(); + thd->server_status&= + ~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY); + DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS")); + xid_cache_delete(thd, &thd->transaction.xid_state); + thd->transaction.xid_state.xa_state= XA_NOTR; + + trans_track_end_trx(thd); + + DBUG_RETURN(res); +} + + +/** + return the XID as it appears in the SQL function's arguments. + So this string can be passed to XA START, XA PREPARE etc... + + @note + the 'buf' has to have space for at least SQL_XIDSIZE bytes. +*/ + + +/* + 'a'..'z' 'A'..'Z', '0'..'9' + and '-' '_' ' ' symbols don't have to be + converted. +*/ + +static const char xid_needs_conv[128]= +{ + 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, + 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, + 0,1,1,1,1,1,1,1,1,1,1,1,1,0,1,1, + 0,0,0,0,0,0,0,0,0,0,1,1,1,1,1,1, + 1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, + 0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,0, + 1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, + 0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,1 +}; + +/* + The size of XID string representation in the form + 'gtrid', 'bqual', formatID + see xid_t::get_sql_string() for details. +*/ +#define SQL_XIDSIZE (XIDDATASIZE * 2 + 8 + MY_INT64_NUM_DECIMAL_DIGITS) + +/* The 'buf' has to have space for at least SQL_XIDSIZE bytes. */ +static uint get_sql_xid(XID *xid, char *buf) +{ + int tot_len= xid->gtrid_length + xid->bqual_length; + int i; + const char *orig_buf= buf; + + for (i=0; i<tot_len; i++) + { + uchar c= ((uchar *) xid->data)[i]; + if (c >= 128 || xid_needs_conv[c]) + break; + } + + if (i >= tot_len) + { + /* No need to convert characters to hexadecimals. */ + *buf++= '\''; + memcpy(buf, xid->data, xid->gtrid_length); + buf+= xid->gtrid_length; + *buf++= '\''; + if (xid->bqual_length > 0 || xid->formatID != 1) + { + *buf++= ','; + *buf++= '\''; + memcpy(buf, xid->data+xid->gtrid_length, xid->bqual_length); + buf+= xid->bqual_length; + *buf++= '\''; + } + } + else + { + *buf++= 'X'; + *buf++= '\''; + for (i= 0; i < xid->gtrid_length; i++) + { + *buf++=_dig_vec_lower[((uchar*) xid->data)[i] >> 4]; + *buf++=_dig_vec_lower[((uchar*) xid->data)[i] & 0x0f]; + } + *buf++= '\''; + if (xid->bqual_length > 0 || xid->formatID != 1) + { + *buf++= ','; + *buf++= 'X'; + *buf++= '\''; + for (; i < tot_len; i++) + { + *buf++=_dig_vec_lower[((uchar*) xid->data)[i] >> 4]; + *buf++=_dig_vec_lower[((uchar*) xid->data)[i] & 0x0f]; + } + *buf++= '\''; + } + } + + if (xid->formatID != 1) + { + *buf++= ','; + buf+= my_longlong10_to_str_8bit(&my_charset_bin, buf, + MY_INT64_NUM_DECIMAL_DIGITS, -10, xid->formatID); + } + + return (uint)(buf - orig_buf); +} + + +/** + return the list of XID's to a client, the same way SHOW commands do. + + @note + I didn't find in XA specs that an RM cannot return the same XID twice, + so mysql_xa_recover does not filter XID's to ensure uniqueness. + It can be easily fixed later, if necessary. +*/ + +static my_bool xa_recover_callback(XID_STATE *xs, Protocol *protocol, + char *data, uint data_len, CHARSET_INFO *data_cs) +{ + if (xs->xa_state == XA_PREPARED) + { + protocol->prepare_for_resend(); + protocol->store_longlong((longlong) xs->xid.formatID, FALSE); + protocol->store_longlong((longlong) xs->xid.gtrid_length, FALSE); + protocol->store_longlong((longlong) xs->xid.bqual_length, FALSE); + protocol->store(data, data_len, data_cs); + if (protocol->write()) + return TRUE; + } + return FALSE; +} + + +static my_bool xa_recover_callback_short(XID_STATE *xs, Protocol *protocol) +{ + return xa_recover_callback(xs, protocol, xs->xid.data, + xs->xid.gtrid_length + xs->xid.bqual_length, &my_charset_bin); +} + + +static my_bool xa_recover_callback_verbose(XID_STATE *xs, Protocol *protocol) +{ + char buf[SQL_XIDSIZE]; + uint len= get_sql_xid(&xs->xid, buf); + return xa_recover_callback(xs, protocol, buf, len, + &my_charset_utf8_general_ci); +} + + +bool mysql_xa_recover(THD *thd) +{ + List<Item> field_list; + Protocol *protocol= thd->protocol; + MEM_ROOT *mem_root= thd->mem_root; + my_hash_walk_action action; + DBUG_ENTER("mysql_xa_recover"); + + field_list.push_back(new (mem_root) + Item_int(thd, "formatID", 0, + MY_INT32_NUM_DECIMAL_DIGITS), mem_root); + field_list.push_back(new (mem_root) + Item_int(thd, "gtrid_length", 0, + MY_INT32_NUM_DECIMAL_DIGITS), mem_root); + field_list.push_back(new (mem_root) + Item_int(thd, "bqual_length", 0, + MY_INT32_NUM_DECIMAL_DIGITS), mem_root); + { + uint len; + CHARSET_INFO *cs; + + if (thd->lex->verbose) + { + len= SQL_XIDSIZE; + cs= &my_charset_utf8_general_ci; + action= (my_hash_walk_action) xa_recover_callback_verbose; + } + else + { + len= XIDDATASIZE; + cs= &my_charset_bin; + action= (my_hash_walk_action) xa_recover_callback_short; + } + + field_list.push_back(new (mem_root) + Item_empty_string(thd, "data", len, cs), mem_root); + } + + if (protocol->send_result_set_metadata(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) + DBUG_RETURN(1); + + if (xid_cache_iterate(thd, action, protocol)) + DBUG_RETURN(1); + my_eof(thd); + DBUG_RETURN(0); +} diff --git a/sql/xa.h b/sql/xa.h new file mode 100644 index 00000000000..e0ae834648e --- /dev/null +++ b/sql/xa.h @@ -0,0 +1,67 @@ +/* + Copyright (c) 2000, 2016, Oracle and/or its affiliates. + Copyright (c) 2009, 2019, MariaDB Corporation. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA +*/ + + +enum xa_states {XA_NOTR=0, XA_ACTIVE, XA_IDLE, XA_PREPARED, XA_ROLLBACK_ONLY}; +extern const char *xa_state_names[]; +class XID_cache_element; + +struct XID_STATE { + /* For now, this is only used to catch duplicated external xids */ + XID xid; // transaction identifier + enum xa_states xa_state; // used by external XA only + /* Error reported by the Resource Manager (RM) to the Transaction Manager. */ + uint rm_error; + XID_cache_element *xid_cache_element; + + /** + Check that XA transaction has an uncommitted work. Report an error + to the user in case when there is an uncommitted work for XA transaction. + + @return result of check + @retval false XA transaction is NOT in state IDLE, PREPARED + or ROLLBACK_ONLY. + @retval true XA transaction is in state IDLE or PREPARED + or ROLLBACK_ONLY. + */ + + bool check_has_uncommitted_xa() const + { + if (xa_state == XA_IDLE || + xa_state == XA_PREPARED || + xa_state == XA_ROLLBACK_ONLY) + { + my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[xa_state]); + return true; + } + return false; + } +}; + +void xid_cache_init(void); +void xid_cache_free(void); +bool xid_cache_insert(XID *xid, enum xa_states xa_state); +bool xid_cache_insert(THD *thd, XID_STATE *xid_state); +void xid_cache_delete(THD *thd, XID_STATE *xid_state); + +bool trans_xa_start(THD *thd); +bool trans_xa_end(THD *thd); +bool trans_xa_prepare(THD *thd); +bool trans_xa_commit(THD *thd); +bool trans_xa_rollback(THD *thd); +bool mysql_xa_recover(THD *thd); |