summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--libmysqld/CMakeLists.txt1
-rw-r--r--sql/CMakeLists.txt2
-rw-r--r--sql/handler.cc180
-rw-r--r--sql/handler.h10
-rw-r--r--sql/sql_class.cc257
-rw-r--r--sql/sql_class.h45
-rw-r--r--sql/sql_parse.cc4
-rw-r--r--sql/transaction.cc332
-rw-r--r--sql/transaction.h8
-rw-r--r--sql/xa.cc797
-rw-r--r--sql/xa.h67
11 files changed, 872 insertions, 831 deletions
diff --git a/libmysqld/CMakeLists.txt b/libmysqld/CMakeLists.txt
index 6bd3a59dd2f..4d9b07195ba 100644
--- a/libmysqld/CMakeLists.txt
+++ b/libmysqld/CMakeLists.txt
@@ -126,6 +126,7 @@ SET(SQL_EMBEDDED_SOURCES emb_qcache.cc libmysqld.c lib_sql.cc
../sql/rowid_filter.cc ../sql/rowid_filter.h
../sql/item_vers.cc
../sql/opt_trace.cc
+ ../sql/xa.cc
${GEN_SOURCES}
${MYSYS_LIBWRAP_SOURCE}
)
diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt
index da4de86d950..2879970e9ff 100644
--- a/sql/CMakeLists.txt
+++ b/sql/CMakeLists.txt
@@ -143,7 +143,7 @@ SET (SQL_SOURCE
opt_trace.cc
${WSREP_SOURCES}
table_cache.cc encryption.cc temporary_tables.cc
- proxy_protocol.cc backup.cc
+ proxy_protocol.cc backup.cc xa.cc
${CMAKE_CURRENT_BINARY_DIR}/sql_builtin.cc
${CMAKE_CURRENT_BINARY_DIR}/sql_yacc.cc
${CMAKE_CURRENT_BINARY_DIR}/sql_yacc_ora.cc
diff --git a/sql/handler.cc b/sql/handler.cc
index 917a5a56c6d..777a64b2c67 100644
--- a/sql/handler.cc
+++ b/sql/handler.cc
@@ -2221,186 +2221,6 @@ int ha_recover(HASH *commit_list)
DBUG_RETURN(0);
}
-/**
- return the XID as it appears in the SQL function's arguments.
- So this string can be passed to XA START, XA PREPARE etc...
-
- @note
- the 'buf' has to have space for at least SQL_XIDSIZE bytes.
-*/
-
-
-/*
- 'a'..'z' 'A'..'Z', '0'..'9'
- and '-' '_' ' ' symbols don't have to be
- converted.
-*/
-
-static const char xid_needs_conv[128]=
-{
- 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
- 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
- 0,1,1,1,1,1,1,1,1,1,1,1,1,0,1,1,
- 0,0,0,0,0,0,0,0,0,0,1,1,1,1,1,1,
- 1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,
- 0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,0,
- 1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,
- 0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,1
-};
-
-uint get_sql_xid(XID *xid, char *buf)
-{
- int tot_len= xid->gtrid_length + xid->bqual_length;
- int i;
- const char *orig_buf= buf;
-
- for (i=0; i<tot_len; i++)
- {
- uchar c= ((uchar *) xid->data)[i];
- if (c >= 128 || xid_needs_conv[c])
- break;
- }
-
- if (i >= tot_len)
- {
- /* No need to convert characters to hexadecimals. */
- *buf++= '\'';
- memcpy(buf, xid->data, xid->gtrid_length);
- buf+= xid->gtrid_length;
- *buf++= '\'';
- if (xid->bqual_length > 0 || xid->formatID != 1)
- {
- *buf++= ',';
- *buf++= '\'';
- memcpy(buf, xid->data+xid->gtrid_length, xid->bqual_length);
- buf+= xid->bqual_length;
- *buf++= '\'';
- }
- }
- else
- {
- *buf++= 'X';
- *buf++= '\'';
- for (i= 0; i < xid->gtrid_length; i++)
- {
- *buf++=_dig_vec_lower[((uchar*) xid->data)[i] >> 4];
- *buf++=_dig_vec_lower[((uchar*) xid->data)[i] & 0x0f];
- }
- *buf++= '\'';
- if (xid->bqual_length > 0 || xid->formatID != 1)
- {
- *buf++= ',';
- *buf++= 'X';
- *buf++= '\'';
- for (; i < tot_len; i++)
- {
- *buf++=_dig_vec_lower[((uchar*) xid->data)[i] >> 4];
- *buf++=_dig_vec_lower[((uchar*) xid->data)[i] & 0x0f];
- }
- *buf++= '\'';
- }
- }
-
- if (xid->formatID != 1)
- {
- *buf++= ',';
- buf+= my_longlong10_to_str_8bit(&my_charset_bin, buf,
- MY_INT64_NUM_DECIMAL_DIGITS, -10, xid->formatID);
- }
-
- return (uint)(buf - orig_buf);
-}
-
-
-/**
- return the list of XID's to a client, the same way SHOW commands do.
-
- @note
- I didn't find in XA specs that an RM cannot return the same XID twice,
- so mysql_xa_recover does not filter XID's to ensure uniqueness.
- It can be easily fixed later, if necessary.
-*/
-
-static my_bool xa_recover_callback(XID_STATE *xs, Protocol *protocol,
- char *data, uint data_len, CHARSET_INFO *data_cs)
-{
- if (xs->xa_state == XA_PREPARED)
- {
- protocol->prepare_for_resend();
- protocol->store_longlong((longlong) xs->xid.formatID, FALSE);
- protocol->store_longlong((longlong) xs->xid.gtrid_length, FALSE);
- protocol->store_longlong((longlong) xs->xid.bqual_length, FALSE);
- protocol->store(data, data_len, data_cs);
- if (protocol->write())
- return TRUE;
- }
- return FALSE;
-}
-
-
-static my_bool xa_recover_callback_short(XID_STATE *xs, Protocol *protocol)
-{
- return xa_recover_callback(xs, protocol, xs->xid.data,
- xs->xid.gtrid_length + xs->xid.bqual_length, &my_charset_bin);
-}
-
-
-static my_bool xa_recover_callback_verbose(XID_STATE *xs, Protocol *protocol)
-{
- char buf[SQL_XIDSIZE];
- uint len= get_sql_xid(&xs->xid, buf);
- return xa_recover_callback(xs, protocol, buf, len,
- &my_charset_utf8_general_ci);
-}
-
-
-bool mysql_xa_recover(THD *thd)
-{
- List<Item> field_list;
- Protocol *protocol= thd->protocol;
- MEM_ROOT *mem_root= thd->mem_root;
- my_hash_walk_action action;
- DBUG_ENTER("mysql_xa_recover");
-
- field_list.push_back(new (mem_root)
- Item_int(thd, "formatID", 0,
- MY_INT32_NUM_DECIMAL_DIGITS), mem_root);
- field_list.push_back(new (mem_root)
- Item_int(thd, "gtrid_length", 0,
- MY_INT32_NUM_DECIMAL_DIGITS), mem_root);
- field_list.push_back(new (mem_root)
- Item_int(thd, "bqual_length", 0,
- MY_INT32_NUM_DECIMAL_DIGITS), mem_root);
- {
- uint len;
- CHARSET_INFO *cs;
-
- if (thd->lex->verbose)
- {
- len= SQL_XIDSIZE;
- cs= &my_charset_utf8_general_ci;
- action= (my_hash_walk_action) xa_recover_callback_verbose;
- }
- else
- {
- len= XIDDATASIZE;
- cs= &my_charset_bin;
- action= (my_hash_walk_action) xa_recover_callback_short;
- }
-
- field_list.push_back(new (mem_root)
- Item_empty_string(thd, "data", len, cs), mem_root);
- }
-
- if (protocol->send_result_set_metadata(&field_list,
- Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
- DBUG_RETURN(1);
-
- if (xid_cache_iterate(thd, action, protocol))
- DBUG_RETURN(1);
- my_eof(thd);
- DBUG_RETURN(0);
-}
/*
Called by engine to notify TC that a new commit checkpoint has been reached.
diff --git a/sql/handler.h b/sql/handler.h
index 676632113c3..fb6862e4ce1 100644
--- a/sql/handler.h
+++ b/sql/handler.h
@@ -892,15 +892,6 @@ struct xid_t {
};
typedef struct xid_t XID;
-/*
- The size of XID string representation in the form
- 'gtrid', 'bqual', formatID
- see xid_t::get_sql_string() for details.
-*/
-#define SQL_XIDSIZE (XIDDATASIZE * 2 + 8 + MY_INT64_NUM_DECIMAL_DIGITS)
-/* The 'buf' has to have space for at least SQL_XIDSIZE bytes. */
-uint get_sql_xid(XID *xid, char *buf);
-
/* for recover() handlerton call */
#define MIN_XID_LIST_SIZE 128
#define MAX_XID_LIST_SIZE (1024*128)
@@ -4977,7 +4968,6 @@ void trans_register_ha(THD *thd, bool all, handlerton *ht);
const char *get_canonical_filename(handler *file, const char *path,
char *tmp_path);
-bool mysql_xa_recover(THD *thd);
void commit_checkpoint_notify_ha(handlerton *hton, void *cookie);
inline const LEX_CSTRING *table_case_name(HA_CREATE_INFO *info, const LEX_CSTRING *name)
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index c0c89ee59b3..c77c8a1a05a 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -5641,263 +5641,6 @@ void THD::mark_transaction_to_rollback(bool all)
is_fatal_sub_stmt_error= true;
transaction_rollback_request= all;
}
-/***************************************************************************
- Handling of XA id cacheing
-***************************************************************************/
-class XID_cache_element
-{
- /*
- m_state is used to prevent elements from being deleted while XA RECOVER
- iterates xid cache and to prevent recovered elments from being acquired by
- multiple threads.
-
- bits 1..29 are reference counter
- bit 30 is RECOVERED flag
- bit 31 is ACQUIRED flag (thread owns this xid)
- bit 32 is unused
-
- Newly allocated and deleted elements have m_state set to 0.
-
- On lock() m_state is atomically incremented. It also creates load-ACQUIRE
- memory barrier to make sure m_state is actually updated before furhter
- memory accesses. Attempting to lock an element that has neither ACQUIRED
- nor RECOVERED flag set returns failure and further accesses to element
- memory are forbidden.
-
- On unlock() m_state is decremented. It also creates store-RELEASE memory
- barrier to make sure m_state is actually updated after preceding memory
- accesses.
-
- ACQUIRED flag is set when thread registers it's xid or when thread acquires
- recovered xid.
-
- RECOVERED flag is set for elements found during crash recovery.
-
- ACQUIRED and RECOVERED flags are cleared before element is deleted from
- hash in a spin loop, after last reference is released.
- */
- std::atomic<int32_t> m_state;
-public:
- static const int32 ACQUIRED= 1 << 30;
- static const int32 RECOVERED= 1 << 29;
- XID_STATE *m_xid_state;
- bool is_set(int32_t flag)
- { return m_state.load(std::memory_order_relaxed) & flag; }
- void set(int32_t flag)
- {
- DBUG_ASSERT(!is_set(ACQUIRED | RECOVERED));
- m_state.fetch_add(flag, std::memory_order_relaxed);
- }
- bool lock()
- {
- int32_t old= m_state.fetch_add(1, std::memory_order_acquire);
- if (old & (ACQUIRED | RECOVERED))
- return true;
- unlock();
- return false;
- }
- void unlock()
- { m_state.fetch_sub(1, std::memory_order_release); }
- void mark_uninitialized()
- {
- int32_t old= ACQUIRED;
- while (!m_state.compare_exchange_weak(old, 0,
- std::memory_order_relaxed,
- std::memory_order_relaxed))
- {
- old&= ACQUIRED | RECOVERED;
- (void) LF_BACKOFF();
- }
- }
- bool acquire_recovered()
- {
- int32_t old= RECOVERED;
- while (!m_state.compare_exchange_weak(old, ACQUIRED | RECOVERED,
- std::memory_order_relaxed,
- std::memory_order_relaxed))
- {
- if (!(old & RECOVERED) || (old & ACQUIRED))
- return false;
- old= RECOVERED;
- (void) LF_BACKOFF();
- }
- return true;
- }
- static void lf_hash_initializer(LF_HASH *hash __attribute__((unused)),
- XID_cache_element *element,
- XID_STATE *xid_state)
- {
- DBUG_ASSERT(!element->is_set(ACQUIRED | RECOVERED));
- element->m_xid_state= xid_state;
- xid_state->xid_cache_element= element;
- }
- static void lf_alloc_constructor(uchar *ptr)
- {
- XID_cache_element *element= (XID_cache_element*) (ptr + LF_HASH_OVERHEAD);
- element->m_state= 0;
- }
- static void lf_alloc_destructor(uchar *ptr)
- {
- XID_cache_element *element= (XID_cache_element*) (ptr + LF_HASH_OVERHEAD);
- DBUG_ASSERT(!element->is_set(ACQUIRED));
- if (element->is_set(RECOVERED))
- my_free(element->m_xid_state);
- }
- static uchar *key(const XID_cache_element *element, size_t *length,
- my_bool not_used __attribute__((unused)))
- {
- *length= element->m_xid_state->xid.key_length();
- return element->m_xid_state->xid.key();
- }
-};
-
-
-static LF_HASH xid_cache;
-static bool xid_cache_inited;
-
-
-bool THD::fix_xid_hash_pins()
-{
- if (!xid_hash_pins)
- xid_hash_pins= lf_hash_get_pins(&xid_cache);
- return !xid_hash_pins;
-}
-
-
-void xid_cache_init()
-{
- xid_cache_inited= true;
- lf_hash_init(&xid_cache, sizeof(XID_cache_element), LF_HASH_UNIQUE, 0, 0,
- (my_hash_get_key) XID_cache_element::key, &my_charset_bin);
- xid_cache.alloc.constructor= XID_cache_element::lf_alloc_constructor;
- xid_cache.alloc.destructor= XID_cache_element::lf_alloc_destructor;
- xid_cache.initializer=
- (lf_hash_initializer) XID_cache_element::lf_hash_initializer;
-}
-
-
-void xid_cache_free()
-{
- if (xid_cache_inited)
- {
- lf_hash_destroy(&xid_cache);
- xid_cache_inited= false;
- }
-}
-
-
-/**
- Find recovered XA transaction by XID.
-*/
-
-XID_STATE *xid_cache_search(THD *thd, XID *xid)
-{
- XID_STATE *xs= 0;
- DBUG_ASSERT(thd->xid_hash_pins);
- XID_cache_element *element=
- (XID_cache_element*) lf_hash_search(&xid_cache, thd->xid_hash_pins,
- xid->key(), xid->key_length());
- if (element)
- {
- if (element->acquire_recovered())
- xs= element->m_xid_state;
- lf_hash_search_unpin(thd->xid_hash_pins);
- DEBUG_SYNC(thd, "xa_after_search");
- }
- return xs;
-}
-
-
-bool xid_cache_insert(XID *xid, enum xa_states xa_state)
-{
- XID_STATE *xs;
- LF_PINS *pins;
- int res= 1;
-
- if (!(pins= lf_hash_get_pins(&xid_cache)))
- return true;
-
- if ((xs= (XID_STATE*) my_malloc(sizeof(*xs), MYF(MY_WME))))
- {
- xs->xa_state=xa_state;
- xs->xid.set(xid);
- xs->rm_error=0;
-
- if ((res= lf_hash_insert(&xid_cache, pins, xs)))
- my_free(xs);
- else
- xs->xid_cache_element->set(XID_cache_element::RECOVERED);
- if (res == 1)
- res= 0;
- }
- lf_hash_put_pins(pins);
- return res;
-}
-
-
-bool xid_cache_insert(THD *thd, XID_STATE *xid_state)
-{
- if (thd->fix_xid_hash_pins())
- return true;
-
- int res= lf_hash_insert(&xid_cache, thd->xid_hash_pins, xid_state);
- switch (res)
- {
- case 0:
- xid_state->xid_cache_element->set(XID_cache_element::ACQUIRED);
- break;
- case 1:
- my_error(ER_XAER_DUPID, MYF(0));
- /* fall through */
- default:
- xid_state->xid_cache_element= 0;
- }
- return res;
-}
-
-
-void xid_cache_delete(THD *thd, XID_STATE *xid_state)
-{
- if (xid_state->xid_cache_element)
- {
- bool recovered= xid_state->xid_cache_element->is_set(XID_cache_element::RECOVERED);
- DBUG_ASSERT(thd->xid_hash_pins);
- xid_state->xid_cache_element->mark_uninitialized();
- lf_hash_delete(&xid_cache, thd->xid_hash_pins,
- xid_state->xid.key(), xid_state->xid.key_length());
- xid_state->xid_cache_element= 0;
- if (recovered)
- my_free(xid_state);
- }
-}
-
-
-struct xid_cache_iterate_arg
-{
- my_hash_walk_action action;
- void *argument;
-};
-
-static my_bool xid_cache_iterate_callback(XID_cache_element *element,
- xid_cache_iterate_arg *arg)
-{
- my_bool res= FALSE;
- if (element->lock())
- {
- res= arg->action(element->m_xid_state, arg->argument);
- element->unlock();
- }
- return res;
-}
-
-int xid_cache_iterate(THD *thd, my_hash_walk_action action, void *arg)
-{
- xid_cache_iterate_arg argument= { action, arg };
- return thd->fix_xid_hash_pins() ? -1 :
- lf_hash_iterate(&xid_cache, thd->xid_hash_pins,
- (my_hash_walk_action) xid_cache_iterate_callback,
- &argument);
-}
/**
diff --git a/sql/sql_class.h b/sql/sql_class.h
index f8dd9f78500..c68628be65d 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -47,6 +47,7 @@
#include <mysql_com_server.h>
#include "session_tracker.h"
#include "backup.h"
+#include "xa.h"
extern "C"
void set_thd_stage_info(void *thd,
@@ -1276,50 +1277,6 @@ struct st_savepoint {
MDL_savepoint mdl_savepoint;
};
-enum xa_states {XA_NOTR=0, XA_ACTIVE, XA_IDLE, XA_PREPARED, XA_ROLLBACK_ONLY};
-extern const char *xa_state_names[];
-class XID_cache_element;
-
-typedef struct st_xid_state {
- /* For now, this is only used to catch duplicated external xids */
- XID xid; // transaction identifier
- enum xa_states xa_state; // used by external XA only
- /* Error reported by the Resource Manager (RM) to the Transaction Manager. */
- uint rm_error;
- XID_cache_element *xid_cache_element;
-
- /**
- Check that XA transaction has an uncommitted work. Report an error
- to the user in case when there is an uncommitted work for XA transaction.
-
- @return result of check
- @retval false XA transaction is NOT in state IDLE, PREPARED
- or ROLLBACK_ONLY.
- @retval true XA transaction is in state IDLE or PREPARED
- or ROLLBACK_ONLY.
- */
-
- bool check_has_uncommitted_xa() const
- {
- if (xa_state == XA_IDLE ||
- xa_state == XA_PREPARED ||
- xa_state == XA_ROLLBACK_ONLY)
- {
- my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[xa_state]);
- return true;
- }
- return false;
- }
-} XID_STATE;
-
-void xid_cache_init(void);
-void xid_cache_free(void);
-XID_STATE *xid_cache_search(THD *thd, XID *xid);
-bool xid_cache_insert(XID *xid, enum xa_states xa_state);
-bool xid_cache_insert(THD *thd, XID_STATE *xid_state);
-void xid_cache_delete(THD *thd, XID_STATE *xid_state);
-int xid_cache_iterate(THD *thd, my_hash_walk_action action, void *argument);
-
/**
@class Security_context
@brief A set of THD members describing the current authenticated user.
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index d5d721e0ede..0a476f47988 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -395,10 +395,6 @@ const LEX_CSTRING command_name[257]={
{ STRING_WITH_LEN("Error") } // Last command number 255
};
-const char *xa_state_names[]={
- "NON-EXISTING", "ACTIVE", "IDLE", "PREPARED", "ROLLBACK ONLY"
-};
-
#ifdef HAVE_REPLICATION
/**
Returns true if all tables should be ignored.
diff --git a/sql/transaction.cc b/sql/transaction.cc
index 4d61d2a120d..15c45425528 100644
--- a/sql/transaction.cc
+++ b/sql/transaction.cc
@@ -28,21 +28,19 @@
#include "wsrep_trans_observer.h"
#endif /* WITH_WSREP */
-#ifndef EMBEDDED_LIBRARY
/**
Helper: Tell tracker (if any) that transaction ended.
*/
-static void trans_track_end_trx(THD *thd)
+void trans_track_end_trx(THD *thd)
{
+#ifndef EMBEDDED_LIBRARY
if (thd->variables.session_track_transaction_info > TX_TRACK_NONE)
{
((Transaction_state_tracker *)
thd->session_tracker.get_tracker(TRANSACTION_INFO_TRACKER))->end_trx(thd);
}
-}
-#else
-#define trans_track_end_trx(A) do{}while(0)
#endif //EMBEDDED_LIBRARY
+}
/**
@@ -89,65 +87,6 @@ static bool trans_check(THD *thd)
/**
- Mark a XA transaction as rollback-only if the RM unilaterally
- rolled back the transaction branch.
-
- @note If a rollback was requested by the RM, this function sets
- the appropriate rollback error code and transits the state
- to XA_ROLLBACK_ONLY.
-
- @return TRUE if transaction was rolled back or if the transaction
- state is XA_ROLLBACK_ONLY. FALSE otherwise.
-*/
-static bool xa_trans_rolled_back(XID_STATE *xid_state)
-{
- if (xid_state->rm_error)
- {
- switch (xid_state->rm_error) {
- case ER_LOCK_WAIT_TIMEOUT:
- my_error(ER_XA_RBTIMEOUT, MYF(0));
- break;
- case ER_LOCK_DEADLOCK:
- my_error(ER_XA_RBDEADLOCK, MYF(0));
- break;
- default:
- my_error(ER_XA_RBROLLBACK, MYF(0));
- }
- xid_state->xa_state= XA_ROLLBACK_ONLY;
- }
-
- return (xid_state->xa_state == XA_ROLLBACK_ONLY);
-}
-
-
-/**
- Rollback the active XA transaction.
-
- @note Resets rm_error before calling ha_rollback(), so
- the thd->transaction.xid structure gets reset
- by ha_rollback() / THD::transaction::cleanup().
-
- @return TRUE if the rollback failed, FALSE otherwise.
-*/
-
-static bool xa_trans_force_rollback(THD *thd)
-{
- /*
- We must reset rm_error before calling ha_rollback(),
- so thd->transaction.xid structure gets reset
- by ha_rollback()/THD::transaction::cleanup().
- */
- thd->transaction.xid_state.rm_error= 0;
- if (ha_rollback_trans(thd, true))
- {
- my_error(ER_XAER_RMERR, MYF(0));
- return true;
- }
- return false;
-}
-
-
-/**
Begin a new transaction.
@note Beginning a transaction implicitly commits any current
@@ -777,268 +716,3 @@ bool trans_release_savepoint(THD *thd, LEX_CSTRING name)
DBUG_RETURN(MY_TEST(res));
}
-
-
-/**
- Starts an XA transaction with the given xid value.
-
- @param thd Current thread
-
- @retval FALSE Success
- @retval TRUE Failure
-*/
-
-bool trans_xa_start(THD *thd)
-{
- enum xa_states xa_state= thd->transaction.xid_state.xa_state;
- DBUG_ENTER("trans_xa_start");
-
- if (xa_state == XA_IDLE && thd->lex->xa_opt == XA_RESUME)
- {
- bool not_equal= !thd->transaction.xid_state.xid.eq(thd->lex->xid);
- if (not_equal)
- my_error(ER_XAER_NOTA, MYF(0));
- else
- thd->transaction.xid_state.xa_state= XA_ACTIVE;
- DBUG_RETURN(not_equal);
- }
-
- /* TODO: JOIN is not supported yet. */
- if (thd->lex->xa_opt != XA_NONE)
- my_error(ER_XAER_INVAL, MYF(0));
- else if (xa_state != XA_NOTR)
- my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[xa_state]);
- else if (thd->locked_tables_mode || thd->in_active_multi_stmt_transaction())
- my_error(ER_XAER_OUTSIDE, MYF(0));
- else if (!trans_begin(thd))
- {
- DBUG_ASSERT(thd->transaction.xid_state.xid.is_null());
- thd->transaction.xid_state.xa_state= XA_ACTIVE;
- thd->transaction.xid_state.rm_error= 0;
- thd->transaction.xid_state.xid.set(thd->lex->xid);
- if (xid_cache_insert(thd, &thd->transaction.xid_state))
- {
- thd->transaction.xid_state.xa_state= XA_NOTR;
- thd->transaction.xid_state.xid.null();
- trans_rollback(thd);
- DBUG_RETURN(true);
- }
- DBUG_RETURN(FALSE);
- }
-
- DBUG_RETURN(TRUE);
-}
-
-
-/**
- Put a XA transaction in the IDLE state.
-
- @param thd Current thread
-
- @retval FALSE Success
- @retval TRUE Failure
-*/
-
-bool trans_xa_end(THD *thd)
-{
- DBUG_ENTER("trans_xa_end");
-
- /* TODO: SUSPEND and FOR MIGRATE are not supported yet. */
- if (thd->lex->xa_opt != XA_NONE)
- my_error(ER_XAER_INVAL, MYF(0));
- else if (thd->transaction.xid_state.xa_state != XA_ACTIVE)
- my_error(ER_XAER_RMFAIL, MYF(0),
- xa_state_names[thd->transaction.xid_state.xa_state]);
- else if (!thd->transaction.xid_state.xid.eq(thd->lex->xid))
- my_error(ER_XAER_NOTA, MYF(0));
- else if (!xa_trans_rolled_back(&thd->transaction.xid_state))
- thd->transaction.xid_state.xa_state= XA_IDLE;
-
- DBUG_RETURN(thd->is_error() ||
- thd->transaction.xid_state.xa_state != XA_IDLE);
-}
-
-
-/**
- Put a XA transaction in the PREPARED state.
-
- @param thd Current thread
-
- @retval FALSE Success
- @retval TRUE Failure
-*/
-
-bool trans_xa_prepare(THD *thd)
-{
- DBUG_ENTER("trans_xa_prepare");
-
- if (thd->transaction.xid_state.xa_state != XA_IDLE)
- my_error(ER_XAER_RMFAIL, MYF(0),
- xa_state_names[thd->transaction.xid_state.xa_state]);
- else if (!thd->transaction.xid_state.xid.eq(thd->lex->xid))
- my_error(ER_XAER_NOTA, MYF(0));
- else if (ha_prepare(thd))
- {
- xid_cache_delete(thd, &thd->transaction.xid_state);
- thd->transaction.xid_state.xa_state= XA_NOTR;
- my_error(ER_XA_RBROLLBACK, MYF(0));
- }
- else
- thd->transaction.xid_state.xa_state= XA_PREPARED;
-
- DBUG_RETURN(thd->is_error() ||
- thd->transaction.xid_state.xa_state != XA_PREPARED);
-}
-
-
-/**
- Commit and terminate the a XA transaction.
-
- @param thd Current thread
-
- @retval FALSE Success
- @retval TRUE Failure
-*/
-
-bool trans_xa_commit(THD *thd)
-{
- bool res= TRUE;
- enum xa_states xa_state= thd->transaction.xid_state.xa_state;
- DBUG_ENTER("trans_xa_commit");
-
- if (!thd->transaction.xid_state.xid.eq(thd->lex->xid))
- {
- if (thd->fix_xid_hash_pins())
- {
- my_error(ER_OUT_OF_RESOURCES, MYF(0));
- DBUG_RETURN(TRUE);
- }
-
- XID_STATE *xs= xid_cache_search(thd, thd->lex->xid);
- res= !xs;
- if (res)
- my_error(ER_XAER_NOTA, MYF(0));
- else
- {
- res= xa_trans_rolled_back(xs);
- ha_commit_or_rollback_by_xid(thd->lex->xid, !res);
- xid_cache_delete(thd, xs);
- }
- DBUG_RETURN(res);
- }
-
- if (xa_trans_rolled_back(&thd->transaction.xid_state))
- {
- xa_trans_force_rollback(thd);
- res= thd->is_error();
- }
- else if (xa_state == XA_IDLE && thd->lex->xa_opt == XA_ONE_PHASE)
- {
- int r= ha_commit_trans(thd, TRUE);
- if ((res= MY_TEST(r)))
- my_error(r == 1 ? ER_XA_RBROLLBACK : ER_XAER_RMERR, MYF(0));
- }
- else if (xa_state == XA_PREPARED && thd->lex->xa_opt == XA_NONE)
- {
- MDL_request mdl_request;
-
- /*
- Acquire metadata lock which will ensure that COMMIT is blocked
- by active FLUSH TABLES WITH READ LOCK (and vice versa COMMIT in
- progress blocks FTWRL).
-
- We allow FLUSHer to COMMIT; we assume FLUSHer knows what it does.
- */
- mdl_request.init(MDL_key::BACKUP, "", "", MDL_BACKUP_COMMIT,
- MDL_TRANSACTION);
-
- if (thd->mdl_context.acquire_lock(&mdl_request,
- thd->variables.lock_wait_timeout))
- {
- ha_rollback_trans(thd, TRUE);
- my_error(ER_XAER_RMERR, MYF(0));
- }
- else
- {
- DEBUG_SYNC(thd, "trans_xa_commit_after_acquire_commit_lock");
-
- res= MY_TEST(ha_commit_one_phase(thd, 1));
- if (res)
- my_error(ER_XAER_RMERR, MYF(0));
- }
- }
- else
- {
- my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[xa_state]);
- DBUG_RETURN(TRUE);
- }
-
- thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
- thd->transaction.all.reset();
- thd->server_status&=
- ~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY);
- DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS"));
- xid_cache_delete(thd, &thd->transaction.xid_state);
- thd->transaction.xid_state.xa_state= XA_NOTR;
-
- trans_track_end_trx(thd);
-
- DBUG_RETURN(res);
-}
-
-
-/**
- Roll back and terminate a XA transaction.
-
- @param thd Current thread
-
- @retval FALSE Success
- @retval TRUE Failure
-*/
-
-bool trans_xa_rollback(THD *thd)
-{
- bool res= TRUE;
- enum xa_states xa_state= thd->transaction.xid_state.xa_state;
- DBUG_ENTER("trans_xa_rollback");
-
- if (!thd->transaction.xid_state.xid.eq(thd->lex->xid))
- {
- if (thd->fix_xid_hash_pins())
- {
- my_error(ER_OUT_OF_RESOURCES, MYF(0));
- DBUG_RETURN(TRUE);
- }
-
- XID_STATE *xs= xid_cache_search(thd, thd->lex->xid);
- if (!xs)
- my_error(ER_XAER_NOTA, MYF(0));
- else
- {
- xa_trans_rolled_back(xs);
- ha_commit_or_rollback_by_xid(thd->lex->xid, 0);
- xid_cache_delete(thd, xs);
- }
- DBUG_RETURN(thd->get_stmt_da()->is_error());
- }
-
- if (xa_state != XA_IDLE && xa_state != XA_PREPARED && xa_state != XA_ROLLBACK_ONLY)
- {
- my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[xa_state]);
- DBUG_RETURN(TRUE);
- }
-
- res= xa_trans_force_rollback(thd);
-
- thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
- thd->transaction.all.reset();
- thd->server_status&=
- ~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY);
- DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS"));
- xid_cache_delete(thd, &thd->transaction.xid_state);
- thd->transaction.xid_state.xa_state= XA_NOTR;
-
- trans_track_end_trx(thd);
-
- DBUG_RETURN(res);
-}
diff --git a/sql/transaction.h b/sql/transaction.h
index 7e34693a2eb..5eaa2b00027 100644
--- a/sql/transaction.h
+++ b/sql/transaction.h
@@ -24,6 +24,8 @@
class THD;
+void trans_track_end_trx(THD *thd);
+
bool trans_begin(THD *thd, uint flags= 0);
bool trans_commit(THD *thd);
bool trans_commit_implicit(THD *thd);
@@ -37,12 +39,6 @@ bool trans_savepoint(THD *thd, LEX_CSTRING name);
bool trans_rollback_to_savepoint(THD *thd, LEX_CSTRING name);
bool trans_release_savepoint(THD *thd, LEX_CSTRING name);
-bool trans_xa_start(THD *thd);
-bool trans_xa_end(THD *thd);
-bool trans_xa_prepare(THD *thd);
-bool trans_xa_commit(THD *thd);
-bool trans_xa_rollback(THD *thd);
-
void trans_reset_one_shot_chistics(THD *thd);
#endif /* TRANSACTION_H */
diff --git a/sql/xa.cc b/sql/xa.cc
new file mode 100644
index 00000000000..4de6b101e38
--- /dev/null
+++ b/sql/xa.cc
@@ -0,0 +1,797 @@
+/*
+ Copyright (c) 2000, 2016, Oracle and/or its affiliates.
+ Copyright (c) 2009, 2019, MariaDB Corporation.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
+*/
+
+#include "mariadb.h"
+#include "sql_class.h"
+#include "transaction.h"
+
+
+/***************************************************************************
+ Handling of XA id cacheing
+***************************************************************************/
+class XID_cache_element
+{
+ /*
+ m_state is used to prevent elements from being deleted while XA RECOVER
+ iterates xid cache and to prevent recovered elments from being acquired by
+ multiple threads.
+
+ bits 1..29 are reference counter
+ bit 30 is RECOVERED flag
+ bit 31 is ACQUIRED flag (thread owns this xid)
+ bit 32 is unused
+
+ Newly allocated and deleted elements have m_state set to 0.
+
+ On lock() m_state is atomically incremented. It also creates load-ACQUIRE
+ memory barrier to make sure m_state is actually updated before furhter
+ memory accesses. Attempting to lock an element that has neither ACQUIRED
+ nor RECOVERED flag set returns failure and further accesses to element
+ memory are forbidden.
+
+ On unlock() m_state is decremented. It also creates store-RELEASE memory
+ barrier to make sure m_state is actually updated after preceding memory
+ accesses.
+
+ ACQUIRED flag is set when thread registers it's xid or when thread acquires
+ recovered xid.
+
+ RECOVERED flag is set for elements found during crash recovery.
+
+ ACQUIRED and RECOVERED flags are cleared before element is deleted from
+ hash in a spin loop, after last reference is released.
+ */
+ std::atomic<int32_t> m_state;
+public:
+ static const int32 ACQUIRED= 1 << 30;
+ static const int32 RECOVERED= 1 << 29;
+ XID_STATE *m_xid_state;
+ bool is_set(int32_t flag)
+ { return m_state.load(std::memory_order_relaxed) & flag; }
+ void set(int32_t flag)
+ {
+ DBUG_ASSERT(!is_set(ACQUIRED | RECOVERED));
+ m_state.fetch_add(flag, std::memory_order_relaxed);
+ }
+ bool lock()
+ {
+ int32_t old= m_state.fetch_add(1, std::memory_order_acquire);
+ if (old & (ACQUIRED | RECOVERED))
+ return true;
+ unlock();
+ return false;
+ }
+ void unlock()
+ { m_state.fetch_sub(1, std::memory_order_release); }
+ void mark_uninitialized()
+ {
+ int32_t old= ACQUIRED;
+ while (!m_state.compare_exchange_weak(old, 0,
+ std::memory_order_relaxed,
+ std::memory_order_relaxed))
+ {
+ old&= ACQUIRED | RECOVERED;
+ (void) LF_BACKOFF();
+ }
+ }
+ bool acquire_recovered()
+ {
+ int32_t old= RECOVERED;
+ while (!m_state.compare_exchange_weak(old, ACQUIRED | RECOVERED,
+ std::memory_order_relaxed,
+ std::memory_order_relaxed))
+ {
+ if (!(old & RECOVERED) || (old & ACQUIRED))
+ return false;
+ old= RECOVERED;
+ (void) LF_BACKOFF();
+ }
+ return true;
+ }
+ static void lf_hash_initializer(LF_HASH *hash __attribute__((unused)),
+ XID_cache_element *element,
+ XID_STATE *xid_state)
+ {
+ DBUG_ASSERT(!element->is_set(ACQUIRED | RECOVERED));
+ element->m_xid_state= xid_state;
+ xid_state->xid_cache_element= element;
+ }
+ static void lf_alloc_constructor(uchar *ptr)
+ {
+ XID_cache_element *element= (XID_cache_element*) (ptr + LF_HASH_OVERHEAD);
+ element->m_state= 0;
+ }
+ static void lf_alloc_destructor(uchar *ptr)
+ {
+ XID_cache_element *element= (XID_cache_element*) (ptr + LF_HASH_OVERHEAD);
+ DBUG_ASSERT(!element->is_set(ACQUIRED));
+ if (element->is_set(RECOVERED))
+ my_free(element->m_xid_state);
+ }
+ static uchar *key(const XID_cache_element *element, size_t *length,
+ my_bool not_used __attribute__((unused)))
+ {
+ *length= element->m_xid_state->xid.key_length();
+ return element->m_xid_state->xid.key();
+ }
+};
+
+
+static LF_HASH xid_cache;
+static bool xid_cache_inited;
+const char *xa_state_names[]= {
+ "NON-EXISTING", "ACTIVE", "IDLE", "PREPARED", "ROLLBACK ONLY"
+};
+
+
+bool THD::fix_xid_hash_pins()
+{
+ if (!xid_hash_pins)
+ xid_hash_pins= lf_hash_get_pins(&xid_cache);
+ return !xid_hash_pins;
+}
+
+
+void xid_cache_init()
+{
+ xid_cache_inited= true;
+ lf_hash_init(&xid_cache, sizeof(XID_cache_element), LF_HASH_UNIQUE, 0, 0,
+ (my_hash_get_key) XID_cache_element::key, &my_charset_bin);
+ xid_cache.alloc.constructor= XID_cache_element::lf_alloc_constructor;
+ xid_cache.alloc.destructor= XID_cache_element::lf_alloc_destructor;
+ xid_cache.initializer=
+ (lf_hash_initializer) XID_cache_element::lf_hash_initializer;
+}
+
+
+void xid_cache_free()
+{
+ if (xid_cache_inited)
+ {
+ lf_hash_destroy(&xid_cache);
+ xid_cache_inited= false;
+ }
+}
+
+
+/**
+ Find recovered XA transaction by XID.
+*/
+
+static XID_STATE *xid_cache_search(THD *thd, XID *xid)
+{
+ XID_STATE *xs= 0;
+ DBUG_ASSERT(thd->xid_hash_pins);
+ XID_cache_element *element=
+ (XID_cache_element*) lf_hash_search(&xid_cache, thd->xid_hash_pins,
+ xid->key(), xid->key_length());
+ if (element)
+ {
+ if (element->acquire_recovered())
+ xs= element->m_xid_state;
+ lf_hash_search_unpin(thd->xid_hash_pins);
+ DEBUG_SYNC(thd, "xa_after_search");
+ }
+ return xs;
+}
+
+
+bool xid_cache_insert(XID *xid, enum xa_states xa_state)
+{
+ XID_STATE *xs;
+ LF_PINS *pins;
+ int res= 1;
+
+ if (!(pins= lf_hash_get_pins(&xid_cache)))
+ return true;
+
+ if ((xs= (XID_STATE*) my_malloc(sizeof(*xs), MYF(MY_WME))))
+ {
+ xs->xa_state=xa_state;
+ xs->xid.set(xid);
+ xs->rm_error=0;
+
+ if ((res= lf_hash_insert(&xid_cache, pins, xs)))
+ my_free(xs);
+ else
+ xs->xid_cache_element->set(XID_cache_element::RECOVERED);
+ if (res == 1)
+ res= 0;
+ }
+ lf_hash_put_pins(pins);
+ return res;
+}
+
+
+bool xid_cache_insert(THD *thd, XID_STATE *xid_state)
+{
+ if (thd->fix_xid_hash_pins())
+ return true;
+
+ int res= lf_hash_insert(&xid_cache, thd->xid_hash_pins, xid_state);
+ switch (res)
+ {
+ case 0:
+ xid_state->xid_cache_element->set(XID_cache_element::ACQUIRED);
+ break;
+ case 1:
+ my_error(ER_XAER_DUPID, MYF(0));
+ /* fall through */
+ default:
+ xid_state->xid_cache_element= 0;
+ }
+ return res;
+}
+
+
+void xid_cache_delete(THD *thd, XID_STATE *xid_state)
+{
+ if (xid_state->xid_cache_element)
+ {
+ bool recovered= xid_state->xid_cache_element->is_set(XID_cache_element::RECOVERED);
+ DBUG_ASSERT(thd->xid_hash_pins);
+ xid_state->xid_cache_element->mark_uninitialized();
+ lf_hash_delete(&xid_cache, thd->xid_hash_pins,
+ xid_state->xid.key(), xid_state->xid.key_length());
+ xid_state->xid_cache_element= 0;
+ if (recovered)
+ my_free(xid_state);
+ }
+}
+
+
+struct xid_cache_iterate_arg
+{
+ my_hash_walk_action action;
+ void *argument;
+};
+
+static my_bool xid_cache_iterate_callback(XID_cache_element *element,
+ xid_cache_iterate_arg *arg)
+{
+ my_bool res= FALSE;
+ if (element->lock())
+ {
+ res= arg->action(element->m_xid_state, arg->argument);
+ element->unlock();
+ }
+ return res;
+}
+
+static int xid_cache_iterate(THD *thd, my_hash_walk_action action, void *arg)
+{
+ xid_cache_iterate_arg argument= { action, arg };
+ return thd->fix_xid_hash_pins() ? -1 :
+ lf_hash_iterate(&xid_cache, thd->xid_hash_pins,
+ (my_hash_walk_action) xid_cache_iterate_callback,
+ &argument);
+}
+
+
+/**
+ Mark a XA transaction as rollback-only if the RM unilaterally
+ rolled back the transaction branch.
+
+ @note If a rollback was requested by the RM, this function sets
+ the appropriate rollback error code and transits the state
+ to XA_ROLLBACK_ONLY.
+
+ @return TRUE if transaction was rolled back or if the transaction
+ state is XA_ROLLBACK_ONLY. FALSE otherwise.
+*/
+static bool xa_trans_rolled_back(XID_STATE *xid_state)
+{
+ if (xid_state->rm_error)
+ {
+ switch (xid_state->rm_error) {
+ case ER_LOCK_WAIT_TIMEOUT:
+ my_error(ER_XA_RBTIMEOUT, MYF(0));
+ break;
+ case ER_LOCK_DEADLOCK:
+ my_error(ER_XA_RBDEADLOCK, MYF(0));
+ break;
+ default:
+ my_error(ER_XA_RBROLLBACK, MYF(0));
+ }
+ xid_state->xa_state= XA_ROLLBACK_ONLY;
+ }
+
+ return (xid_state->xa_state == XA_ROLLBACK_ONLY);
+}
+
+
+/**
+ Rollback the active XA transaction.
+
+ @note Resets rm_error before calling ha_rollback(), so
+ the thd->transaction.xid structure gets reset
+ by ha_rollback() / THD::transaction::cleanup().
+
+ @return TRUE if the rollback failed, FALSE otherwise.
+*/
+
+static bool xa_trans_force_rollback(THD *thd)
+{
+ /*
+ We must reset rm_error before calling ha_rollback(),
+ so thd->transaction.xid structure gets reset
+ by ha_rollback()/THD::transaction::cleanup().
+ */
+ thd->transaction.xid_state.rm_error= 0;
+ if (ha_rollback_trans(thd, true))
+ {
+ my_error(ER_XAER_RMERR, MYF(0));
+ return true;
+ }
+ return false;
+}
+
+
+/**
+ Starts an XA transaction with the given xid value.
+
+ @param thd Current thread
+
+ @retval FALSE Success
+ @retval TRUE Failure
+*/
+
+bool trans_xa_start(THD *thd)
+{
+ enum xa_states xa_state= thd->transaction.xid_state.xa_state;
+ DBUG_ENTER("trans_xa_start");
+
+ if (xa_state == XA_IDLE && thd->lex->xa_opt == XA_RESUME)
+ {
+ bool not_equal= !thd->transaction.xid_state.xid.eq(thd->lex->xid);
+ if (not_equal)
+ my_error(ER_XAER_NOTA, MYF(0));
+ else
+ thd->transaction.xid_state.xa_state= XA_ACTIVE;
+ DBUG_RETURN(not_equal);
+ }
+
+ /* TODO: JOIN is not supported yet. */
+ if (thd->lex->xa_opt != XA_NONE)
+ my_error(ER_XAER_INVAL, MYF(0));
+ else if (xa_state != XA_NOTR)
+ my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[xa_state]);
+ else if (thd->locked_tables_mode || thd->in_active_multi_stmt_transaction())
+ my_error(ER_XAER_OUTSIDE, MYF(0));
+ else if (!trans_begin(thd))
+ {
+ DBUG_ASSERT(thd->transaction.xid_state.xid.is_null());
+ thd->transaction.xid_state.xa_state= XA_ACTIVE;
+ thd->transaction.xid_state.rm_error= 0;
+ thd->transaction.xid_state.xid.set(thd->lex->xid);
+ if (xid_cache_insert(thd, &thd->transaction.xid_state))
+ {
+ thd->transaction.xid_state.xa_state= XA_NOTR;
+ thd->transaction.xid_state.xid.null();
+ trans_rollback(thd);
+ DBUG_RETURN(true);
+ }
+ DBUG_RETURN(FALSE);
+ }
+
+ DBUG_RETURN(TRUE);
+}
+
+
+/**
+ Put a XA transaction in the IDLE state.
+
+ @param thd Current thread
+
+ @retval FALSE Success
+ @retval TRUE Failure
+*/
+
+bool trans_xa_end(THD *thd)
+{
+ DBUG_ENTER("trans_xa_end");
+
+ /* TODO: SUSPEND and FOR MIGRATE are not supported yet. */
+ if (thd->lex->xa_opt != XA_NONE)
+ my_error(ER_XAER_INVAL, MYF(0));
+ else if (thd->transaction.xid_state.xa_state != XA_ACTIVE)
+ my_error(ER_XAER_RMFAIL, MYF(0),
+ xa_state_names[thd->transaction.xid_state.xa_state]);
+ else if (!thd->transaction.xid_state.xid.eq(thd->lex->xid))
+ my_error(ER_XAER_NOTA, MYF(0));
+ else if (!xa_trans_rolled_back(&thd->transaction.xid_state))
+ thd->transaction.xid_state.xa_state= XA_IDLE;
+
+ DBUG_RETURN(thd->is_error() ||
+ thd->transaction.xid_state.xa_state != XA_IDLE);
+}
+
+
+/**
+ Put a XA transaction in the PREPARED state.
+
+ @param thd Current thread
+
+ @retval FALSE Success
+ @retval TRUE Failure
+*/
+
+bool trans_xa_prepare(THD *thd)
+{
+ DBUG_ENTER("trans_xa_prepare");
+
+ if (thd->transaction.xid_state.xa_state != XA_IDLE)
+ my_error(ER_XAER_RMFAIL, MYF(0),
+ xa_state_names[thd->transaction.xid_state.xa_state]);
+ else if (!thd->transaction.xid_state.xid.eq(thd->lex->xid))
+ my_error(ER_XAER_NOTA, MYF(0));
+ else if (ha_prepare(thd))
+ {
+ xid_cache_delete(thd, &thd->transaction.xid_state);
+ thd->transaction.xid_state.xa_state= XA_NOTR;
+ my_error(ER_XA_RBROLLBACK, MYF(0));
+ }
+ else
+ thd->transaction.xid_state.xa_state= XA_PREPARED;
+
+ DBUG_RETURN(thd->is_error() ||
+ thd->transaction.xid_state.xa_state != XA_PREPARED);
+}
+
+
+/**
+ Commit and terminate the a XA transaction.
+
+ @param thd Current thread
+
+ @retval FALSE Success
+ @retval TRUE Failure
+*/
+
+bool trans_xa_commit(THD *thd)
+{
+ bool res= TRUE;
+ enum xa_states xa_state= thd->transaction.xid_state.xa_state;
+ DBUG_ENTER("trans_xa_commit");
+
+ if (!thd->transaction.xid_state.xid.eq(thd->lex->xid))
+ {
+ if (thd->fix_xid_hash_pins())
+ {
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ DBUG_RETURN(TRUE);
+ }
+
+ XID_STATE *xs= xid_cache_search(thd, thd->lex->xid);
+ res= !xs;
+ if (res)
+ my_error(ER_XAER_NOTA, MYF(0));
+ else
+ {
+ res= xa_trans_rolled_back(xs);
+ ha_commit_or_rollback_by_xid(thd->lex->xid, !res);
+ xid_cache_delete(thd, xs);
+ }
+ DBUG_RETURN(res);
+ }
+
+ if (xa_trans_rolled_back(&thd->transaction.xid_state))
+ {
+ xa_trans_force_rollback(thd);
+ res= thd->is_error();
+ }
+ else if (xa_state == XA_IDLE && thd->lex->xa_opt == XA_ONE_PHASE)
+ {
+ int r= ha_commit_trans(thd, TRUE);
+ if ((res= MY_TEST(r)))
+ my_error(r == 1 ? ER_XA_RBROLLBACK : ER_XAER_RMERR, MYF(0));
+ }
+ else if (xa_state == XA_PREPARED && thd->lex->xa_opt == XA_NONE)
+ {
+ MDL_request mdl_request;
+
+ /*
+ Acquire metadata lock which will ensure that COMMIT is blocked
+ by active FLUSH TABLES WITH READ LOCK (and vice versa COMMIT in
+ progress blocks FTWRL).
+
+ We allow FLUSHer to COMMIT; we assume FLUSHer knows what it does.
+ */
+ mdl_request.init(MDL_key::BACKUP, "", "", MDL_BACKUP_COMMIT,
+ MDL_TRANSACTION);
+
+ if (thd->mdl_context.acquire_lock(&mdl_request,
+ thd->variables.lock_wait_timeout))
+ {
+ ha_rollback_trans(thd, TRUE);
+ my_error(ER_XAER_RMERR, MYF(0));
+ }
+ else
+ {
+ DEBUG_SYNC(thd, "trans_xa_commit_after_acquire_commit_lock");
+
+ res= MY_TEST(ha_commit_one_phase(thd, 1));
+ if (res)
+ my_error(ER_XAER_RMERR, MYF(0));
+ }
+ }
+ else
+ {
+ my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[xa_state]);
+ DBUG_RETURN(TRUE);
+ }
+
+ thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
+ thd->transaction.all.reset();
+ thd->server_status&=
+ ~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY);
+ DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS"));
+ xid_cache_delete(thd, &thd->transaction.xid_state);
+ thd->transaction.xid_state.xa_state= XA_NOTR;
+
+ trans_track_end_trx(thd);
+
+ DBUG_RETURN(res);
+}
+
+
+/**
+ Roll back and terminate a XA transaction.
+
+ @param thd Current thread
+
+ @retval FALSE Success
+ @retval TRUE Failure
+*/
+
+bool trans_xa_rollback(THD *thd)
+{
+ bool res= TRUE;
+ enum xa_states xa_state= thd->transaction.xid_state.xa_state;
+ DBUG_ENTER("trans_xa_rollback");
+
+ if (!thd->transaction.xid_state.xid.eq(thd->lex->xid))
+ {
+ if (thd->fix_xid_hash_pins())
+ {
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ DBUG_RETURN(TRUE);
+ }
+
+ XID_STATE *xs= xid_cache_search(thd, thd->lex->xid);
+ if (!xs)
+ my_error(ER_XAER_NOTA, MYF(0));
+ else
+ {
+ xa_trans_rolled_back(xs);
+ ha_commit_or_rollback_by_xid(thd->lex->xid, 0);
+ xid_cache_delete(thd, xs);
+ }
+ DBUG_RETURN(thd->get_stmt_da()->is_error());
+ }
+
+ if (xa_state != XA_IDLE && xa_state != XA_PREPARED && xa_state != XA_ROLLBACK_ONLY)
+ {
+ my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[xa_state]);
+ DBUG_RETURN(TRUE);
+ }
+
+ res= xa_trans_force_rollback(thd);
+
+ thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
+ thd->transaction.all.reset();
+ thd->server_status&=
+ ~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY);
+ DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS"));
+ xid_cache_delete(thd, &thd->transaction.xid_state);
+ thd->transaction.xid_state.xa_state= XA_NOTR;
+
+ trans_track_end_trx(thd);
+
+ DBUG_RETURN(res);
+}
+
+
+/**
+ return the XID as it appears in the SQL function's arguments.
+ So this string can be passed to XA START, XA PREPARE etc...
+
+ @note
+ the 'buf' has to have space for at least SQL_XIDSIZE bytes.
+*/
+
+
+/*
+ 'a'..'z' 'A'..'Z', '0'..'9'
+ and '-' '_' ' ' symbols don't have to be
+ converted.
+*/
+
+static const char xid_needs_conv[128]=
+{
+ 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
+ 1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,
+ 0,1,1,1,1,1,1,1,1,1,1,1,1,0,1,1,
+ 0,0,0,0,0,0,0,0,0,0,1,1,1,1,1,1,
+ 1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,
+ 0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,0,
+ 1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,
+ 0,0,0,0,0,0,0,0,0,0,0,1,1,1,1,1
+};
+
+/*
+ The size of XID string representation in the form
+ 'gtrid', 'bqual', formatID
+ see xid_t::get_sql_string() for details.
+*/
+#define SQL_XIDSIZE (XIDDATASIZE * 2 + 8 + MY_INT64_NUM_DECIMAL_DIGITS)
+
+/* The 'buf' has to have space for at least SQL_XIDSIZE bytes. */
+static uint get_sql_xid(XID *xid, char *buf)
+{
+ int tot_len= xid->gtrid_length + xid->bqual_length;
+ int i;
+ const char *orig_buf= buf;
+
+ for (i=0; i<tot_len; i++)
+ {
+ uchar c= ((uchar *) xid->data)[i];
+ if (c >= 128 || xid_needs_conv[c])
+ break;
+ }
+
+ if (i >= tot_len)
+ {
+ /* No need to convert characters to hexadecimals. */
+ *buf++= '\'';
+ memcpy(buf, xid->data, xid->gtrid_length);
+ buf+= xid->gtrid_length;
+ *buf++= '\'';
+ if (xid->bqual_length > 0 || xid->formatID != 1)
+ {
+ *buf++= ',';
+ *buf++= '\'';
+ memcpy(buf, xid->data+xid->gtrid_length, xid->bqual_length);
+ buf+= xid->bqual_length;
+ *buf++= '\'';
+ }
+ }
+ else
+ {
+ *buf++= 'X';
+ *buf++= '\'';
+ for (i= 0; i < xid->gtrid_length; i++)
+ {
+ *buf++=_dig_vec_lower[((uchar*) xid->data)[i] >> 4];
+ *buf++=_dig_vec_lower[((uchar*) xid->data)[i] & 0x0f];
+ }
+ *buf++= '\'';
+ if (xid->bqual_length > 0 || xid->formatID != 1)
+ {
+ *buf++= ',';
+ *buf++= 'X';
+ *buf++= '\'';
+ for (; i < tot_len; i++)
+ {
+ *buf++=_dig_vec_lower[((uchar*) xid->data)[i] >> 4];
+ *buf++=_dig_vec_lower[((uchar*) xid->data)[i] & 0x0f];
+ }
+ *buf++= '\'';
+ }
+ }
+
+ if (xid->formatID != 1)
+ {
+ *buf++= ',';
+ buf+= my_longlong10_to_str_8bit(&my_charset_bin, buf,
+ MY_INT64_NUM_DECIMAL_DIGITS, -10, xid->formatID);
+ }
+
+ return (uint)(buf - orig_buf);
+}
+
+
+/**
+ return the list of XID's to a client, the same way SHOW commands do.
+
+ @note
+ I didn't find in XA specs that an RM cannot return the same XID twice,
+ so mysql_xa_recover does not filter XID's to ensure uniqueness.
+ It can be easily fixed later, if necessary.
+*/
+
+static my_bool xa_recover_callback(XID_STATE *xs, Protocol *protocol,
+ char *data, uint data_len, CHARSET_INFO *data_cs)
+{
+ if (xs->xa_state == XA_PREPARED)
+ {
+ protocol->prepare_for_resend();
+ protocol->store_longlong((longlong) xs->xid.formatID, FALSE);
+ protocol->store_longlong((longlong) xs->xid.gtrid_length, FALSE);
+ protocol->store_longlong((longlong) xs->xid.bqual_length, FALSE);
+ protocol->store(data, data_len, data_cs);
+ if (protocol->write())
+ return TRUE;
+ }
+ return FALSE;
+}
+
+
+static my_bool xa_recover_callback_short(XID_STATE *xs, Protocol *protocol)
+{
+ return xa_recover_callback(xs, protocol, xs->xid.data,
+ xs->xid.gtrid_length + xs->xid.bqual_length, &my_charset_bin);
+}
+
+
+static my_bool xa_recover_callback_verbose(XID_STATE *xs, Protocol *protocol)
+{
+ char buf[SQL_XIDSIZE];
+ uint len= get_sql_xid(&xs->xid, buf);
+ return xa_recover_callback(xs, protocol, buf, len,
+ &my_charset_utf8_general_ci);
+}
+
+
+bool mysql_xa_recover(THD *thd)
+{
+ List<Item> field_list;
+ Protocol *protocol= thd->protocol;
+ MEM_ROOT *mem_root= thd->mem_root;
+ my_hash_walk_action action;
+ DBUG_ENTER("mysql_xa_recover");
+
+ field_list.push_back(new (mem_root)
+ Item_int(thd, "formatID", 0,
+ MY_INT32_NUM_DECIMAL_DIGITS), mem_root);
+ field_list.push_back(new (mem_root)
+ Item_int(thd, "gtrid_length", 0,
+ MY_INT32_NUM_DECIMAL_DIGITS), mem_root);
+ field_list.push_back(new (mem_root)
+ Item_int(thd, "bqual_length", 0,
+ MY_INT32_NUM_DECIMAL_DIGITS), mem_root);
+ {
+ uint len;
+ CHARSET_INFO *cs;
+
+ if (thd->lex->verbose)
+ {
+ len= SQL_XIDSIZE;
+ cs= &my_charset_utf8_general_ci;
+ action= (my_hash_walk_action) xa_recover_callback_verbose;
+ }
+ else
+ {
+ len= XIDDATASIZE;
+ cs= &my_charset_bin;
+ action= (my_hash_walk_action) xa_recover_callback_short;
+ }
+
+ field_list.push_back(new (mem_root)
+ Item_empty_string(thd, "data", len, cs), mem_root);
+ }
+
+ if (protocol->send_result_set_metadata(&field_list,
+ Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
+ DBUG_RETURN(1);
+
+ if (xid_cache_iterate(thd, action, protocol))
+ DBUG_RETURN(1);
+ my_eof(thd);
+ DBUG_RETURN(0);
+}
diff --git a/sql/xa.h b/sql/xa.h
new file mode 100644
index 00000000000..e0ae834648e
--- /dev/null
+++ b/sql/xa.h
@@ -0,0 +1,67 @@
+/*
+ Copyright (c) 2000, 2016, Oracle and/or its affiliates.
+ Copyright (c) 2009, 2019, MariaDB Corporation.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
+*/
+
+
+enum xa_states {XA_NOTR=0, XA_ACTIVE, XA_IDLE, XA_PREPARED, XA_ROLLBACK_ONLY};
+extern const char *xa_state_names[];
+class XID_cache_element;
+
+struct XID_STATE {
+ /* For now, this is only used to catch duplicated external xids */
+ XID xid; // transaction identifier
+ enum xa_states xa_state; // used by external XA only
+ /* Error reported by the Resource Manager (RM) to the Transaction Manager. */
+ uint rm_error;
+ XID_cache_element *xid_cache_element;
+
+ /**
+ Check that XA transaction has an uncommitted work. Report an error
+ to the user in case when there is an uncommitted work for XA transaction.
+
+ @return result of check
+ @retval false XA transaction is NOT in state IDLE, PREPARED
+ or ROLLBACK_ONLY.
+ @retval true XA transaction is in state IDLE or PREPARED
+ or ROLLBACK_ONLY.
+ */
+
+ bool check_has_uncommitted_xa() const
+ {
+ if (xa_state == XA_IDLE ||
+ xa_state == XA_PREPARED ||
+ xa_state == XA_ROLLBACK_ONLY)
+ {
+ my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[xa_state]);
+ return true;
+ }
+ return false;
+ }
+};
+
+void xid_cache_init(void);
+void xid_cache_free(void);
+bool xid_cache_insert(XID *xid, enum xa_states xa_state);
+bool xid_cache_insert(THD *thd, XID_STATE *xid_state);
+void xid_cache_delete(THD *thd, XID_STATE *xid_state);
+
+bool trans_xa_start(THD *thd);
+bool trans_xa_end(THD *thd);
+bool trans_xa_prepare(THD *thd);
+bool trans_xa_commit(THD *thd);
+bool trans_xa_rollback(THD *thd);
+bool mysql_xa_recover(THD *thd);