summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
Diffstat (limited to 'sql')
-rw-r--r--sql/handler.cc43
-rw-r--r--sql/handler.h4
-rw-r--r--sql/mysqld.cc6
-rw-r--r--sql/sql_class.cc232
-rw-r--r--sql/sql_class.h15
-rw-r--r--sql/transaction.cc27
6 files changed, 212 insertions, 115 deletions
diff --git a/sql/handler.cc b/sql/handler.cc
index 12d7ffb2f5e..a45419f95ca 100644
--- a/sql/handler.cc
+++ b/sql/handler.cc
@@ -1931,12 +1931,28 @@ int ha_recover(HASH *commit_list)
so mysql_xa_recover does not filter XID's to ensure uniqueness.
It can be easily fixed later, if necessary.
*/
+
+static my_bool xa_recover_callback(XID_STATE *xs, Protocol *protocol)
+{
+ if (xs->xa_state == XA_PREPARED)
+ {
+ protocol->prepare_for_resend();
+ protocol->store_longlong((longlong) xs->xid.formatID, FALSE);
+ protocol->store_longlong((longlong) xs->xid.gtrid_length, FALSE);
+ protocol->store_longlong((longlong) xs->xid.bqual_length, FALSE);
+ protocol->store(xs->xid.data, xs->xid.gtrid_length + xs->xid.bqual_length,
+ &my_charset_bin);
+ if (protocol->write())
+ return TRUE;
+ }
+ return FALSE;
+}
+
+
bool mysql_xa_recover(THD *thd)
{
List<Item> field_list;
Protocol *protocol= thd->protocol;
- int i=0;
- XID_STATE *xs;
DBUG_ENTER("mysql_xa_recover");
field_list.push_back(new Item_int("formatID", 0, MY_INT32_NUM_DECIMAL_DIGITS));
@@ -1948,26 +1964,9 @@ bool mysql_xa_recover(THD *thd)
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
DBUG_RETURN(1);
- mysql_mutex_lock(&LOCK_xid_cache);
- while ((xs= (XID_STATE*) my_hash_element(&xid_cache, i++)))
- {
- if (xs->xa_state==XA_PREPARED)
- {
- protocol->prepare_for_resend();
- protocol->store_longlong((longlong)xs->xid.formatID, FALSE);
- protocol->store_longlong((longlong)xs->xid.gtrid_length, FALSE);
- protocol->store_longlong((longlong)xs->xid.bqual_length, FALSE);
- protocol->store(xs->xid.data, xs->xid.gtrid_length+xs->xid.bqual_length,
- &my_charset_bin);
- if (protocol->write())
- {
- mysql_mutex_unlock(&LOCK_xid_cache);
- DBUG_RETURN(1);
- }
- }
- }
-
- mysql_mutex_unlock(&LOCK_xid_cache);
+ if (xid_cache_iterate(thd, (my_hash_walk_action) xa_recover_callback,
+ protocol))
+ DBUG_RETURN(1);
my_eof(thd);
DBUG_RETURN(0);
}
diff --git a/sql/handler.h b/sql/handler.h
index 5ef92088df5..5ab67c045ec 100644
--- a/sql/handler.h
+++ b/sql/handler.h
@@ -612,11 +612,11 @@ struct xid_t {
return sizeof(formatID)+sizeof(gtrid_length)+sizeof(bqual_length)+
gtrid_length+bqual_length;
}
- uchar *key()
+ uchar *key() const
{
return (uchar *)&gtrid_length;
}
- uint key_length()
+ uint key_length() const
{
return sizeof(gtrid_length)+sizeof(bqual_length)+gtrid_length+bqual_length;
}
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index b97742d01ee..6c06240480e 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -4889,11 +4889,7 @@ static int init_server_components()
my_charset_error_reporter= charset_error_reporter;
#endif
- if (xid_cache_init())
- {
- sql_print_error("Out of memory");
- unireg_abort(1);
- }
+ xid_cache_init();
/*
initialize delegates for extension observers, errors have already
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index 5b0d04dc4a9..d59ef525ba2 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -914,7 +914,8 @@ THD::THD(bool is_wsrep_applier)
wait_for_commit_ptr(0),
main_da(0, false, false),
m_stmt_da(&main_da),
- tdc_hash_pins(0)
+ tdc_hash_pins(0),
+ xid_hash_pins(0)
#ifdef WITH_WSREP
,
wsrep_applier(is_wsrep_applier),
@@ -1593,7 +1594,7 @@ void THD::cleanup(void)
transaction.xid_state.xa_state= XA_NOTR;
trans_rollback(this);
- xid_cache_delete(&transaction.xid_state);
+ xid_cache_delete(this, &transaction.xid_state);
DBUG_ASSERT(open_tables == NULL);
/*
@@ -1704,6 +1705,8 @@ THD::~THD()
main_da.free_memory();
if (tdc_hash_pins)
lf_hash_put_pins(tdc_hash_pins);
+ if (xid_hash_pins)
+ lf_hash_put_pins(xid_hash_pins);
/* Ensure everything is freed */
if (status_var.local_memory_used != 0)
{
@@ -5106,120 +5109,205 @@ void mark_transaction_to_rollback(THD *thd, bool all)
/***************************************************************************
Handling of XA id cacheing
***************************************************************************/
+class XID_cache_element
+{
+ uint m_state;
+ static const uint DELETED= 1 << 31;
+public:
+ XID_STATE *m_xid_state;
+ bool lock()
+ {
+ if (my_atomic_add32_explicit(&m_state, 1, MY_MEMORY_ORDER_ACQUIRE) & DELETED)
+ {
+ unlock();
+ return false;
+ }
+ return true;
+ }
+ void unlock()
+ {
+ my_atomic_add32_explicit(&m_state, -1, MY_MEMORY_ORDER_RELEASE);
+ }
+ void mark_deleted()
+ {
+ uint old= 0;
+ while (!my_atomic_cas32_weak_explicit(&m_state, &old, DELETED,
+ MY_MEMORY_ORDER_RELAXED,
+ MY_MEMORY_ORDER_RELAXED))
+ old= 0;
+ }
+ void mark_not_deleted()
+ {
+ DBUG_ASSERT(m_state & DELETED);
+ my_atomic_add32_explicit(&m_state, -DELETED, MY_MEMORY_ORDER_RELAXED);
+ }
+ static void lf_hash_initializer(LF_HASH *hash __attribute__((unused)),
+ XID_cache_element *element,
+ XID_STATE *xid_state)
+ {
+ element->m_xid_state= xid_state;
+ xid_state->xid_cache_element= element;
+ }
+ static void lf_alloc_constructor(uchar *ptr)
+ {
+ XID_cache_element *element= (XID_cache_element*) (ptr + LF_HASH_OVERHEAD);
+ element->m_state= DELETED;
+ }
+ static void lf_alloc_destructor(uchar *ptr)
+ {
+ XID_cache_element *element= (XID_cache_element*) (ptr + LF_HASH_OVERHEAD);
+ if (element->m_state != DELETED)
+ {
+ DBUG_ASSERT(!element->m_xid_state->in_thd);
+ my_free(element->m_xid_state);
+ }
+ }
+ static uchar *key(const XID_cache_element *element, size_t *length,
+ my_bool not_used __attribute__((unused)))
+ {
+ *length= element->m_xid_state->xid.key_length();
+ return element->m_xid_state->xid.key();
+ }
+};
-mysql_mutex_t LOCK_xid_cache;
-HASH xid_cache;
-extern "C" uchar *xid_get_hash_key(const uchar *, size_t *, my_bool);
-extern "C" void xid_free_hash(void *);
+static LF_HASH xid_cache;
+static bool xid_cache_inited;
-uchar *xid_get_hash_key(const uchar *ptr, size_t *length,
- my_bool not_used __attribute__((unused)))
-{
- *length=((XID_STATE*)ptr)->xid.key_length();
- return ((XID_STATE*)ptr)->xid.key();
-}
-void xid_free_hash(void *ptr)
+bool THD::fix_xid_hash_pins()
{
- if (!((XID_STATE*)ptr)->in_thd)
- my_free(ptr);
+ if (!xid_hash_pins)
+ xid_hash_pins= lf_hash_get_pins(&xid_cache);
+ return !xid_hash_pins;
}
-#ifdef HAVE_PSI_INTERFACE
-static PSI_mutex_key key_LOCK_xid_cache;
-static PSI_mutex_info all_xid_mutexes[]=
+void xid_cache_init()
{
- { &key_LOCK_xid_cache, "LOCK_xid_cache", PSI_FLAG_GLOBAL}
-};
-
-static void init_xid_psi_keys(void)
-{
- const char* category= "sql";
- int count;
-
- if (PSI_server == NULL)
- return;
-
- count= array_elements(all_xid_mutexes);
- PSI_server->register_mutex(category, all_xid_mutexes, count);
+ xid_cache_inited= true;
+ lf_hash_init(&xid_cache, sizeof(XID_cache_element), LF_HASH_UNIQUE, 0, 0,
+ (my_hash_get_key) XID_cache_element::key, &my_charset_bin);
+ xid_cache.alloc.constructor= XID_cache_element::lf_alloc_constructor;
+ xid_cache.alloc.destructor= XID_cache_element::lf_alloc_destructor;
+ xid_cache.initializer=
+ (lf_hash_initializer) XID_cache_element::lf_hash_initializer;
}
-#endif /* HAVE_PSI_INTERFACE */
-
-bool xid_cache_init()
-{
-#ifdef HAVE_PSI_INTERFACE
- init_xid_psi_keys();
-#endif
- mysql_mutex_init(key_LOCK_xid_cache, &LOCK_xid_cache, MY_MUTEX_INIT_FAST);
- return my_hash_init(&xid_cache, &my_charset_bin, 100, 0, 0,
- xid_get_hash_key, xid_free_hash, 0) != 0;
-}
void xid_cache_free()
{
- if (my_hash_inited(&xid_cache))
+ if (xid_cache_inited)
{
- my_hash_free(&xid_cache);
- mysql_mutex_destroy(&LOCK_xid_cache);
+ lf_hash_destroy(&xid_cache);
+ xid_cache_inited= false;
}
}
-XID_STATE *xid_cache_search(XID *xid)
+
+XID_STATE *xid_cache_search(THD *thd, XID *xid)
{
- mysql_mutex_lock(&LOCK_xid_cache);
- XID_STATE *res=(XID_STATE *)my_hash_search(&xid_cache, xid->key(),
- xid->key_length());
- mysql_mutex_unlock(&LOCK_xid_cache);
- return res;
+ DBUG_ASSERT(thd->xid_hash_pins);
+ XID_cache_element *element=
+ (XID_cache_element*) lf_hash_search(&xid_cache, thd->xid_hash_pins,
+ xid->key(), xid->key_length());
+ if (element)
+ {
+ lf_hash_search_unpin(thd->xid_hash_pins);
+ return element->m_xid_state;
+ }
+ return 0;
}
bool xid_cache_insert(XID *xid, enum xa_states xa_state)
{
XID_STATE *xs;
- my_bool res;
- mysql_mutex_lock(&LOCK_xid_cache);
- if (my_hash_search(&xid_cache, xid->key(), xid->key_length()))
- res=0;
- else if (!(xs=(XID_STATE *)my_malloc(sizeof(*xs), MYF(MY_WME))))
- res=1;
- else
+ LF_PINS *pins;
+ int res= 1;
+
+ if (!(pins= lf_hash_get_pins(&xid_cache)))
+ return true;
+
+ if ((xs= (XID_STATE*) my_malloc(sizeof(*xs), MYF(MY_WME))))
{
xs->xa_state=xa_state;
xs->xid.set(xid);
xs->in_thd=0;
xs->rm_error=0;
- res=my_hash_insert(&xid_cache, (uchar*)xs);
+
+ if ((res= lf_hash_insert(&xid_cache, pins, xs)))
+ my_free(xs);
+ else
+ xs->xid_cache_element->mark_not_deleted();
+ if (res == 1)
+ res= 0;
}
- mysql_mutex_unlock(&LOCK_xid_cache);
+ lf_hash_put_pins(pins);
return res;
}
-bool xid_cache_insert(XID_STATE *xid_state)
+bool xid_cache_insert(THD *thd, XID_STATE *xid_state)
{
- mysql_mutex_lock(&LOCK_xid_cache);
- if (my_hash_search(&xid_cache, xid_state->xid.key(),
- xid_state->xid.key_length()))
+ if (thd->fix_xid_hash_pins())
+ return true;
+
+ int res= lf_hash_insert(&xid_cache, thd->xid_hash_pins, xid_state);
+ switch (res)
{
- mysql_mutex_unlock(&LOCK_xid_cache);
+ case 0:
+ xid_state->xid_cache_element->mark_not_deleted();
+ break;
+ case 1:
my_error(ER_XAER_DUPID, MYF(0));
- return true;
+ default:
+ xid_state->xid_cache_element= 0;
}
- bool res= my_hash_insert(&xid_cache, (uchar*)xid_state);
- mysql_mutex_unlock(&LOCK_xid_cache);
return res;
}
-void xid_cache_delete(XID_STATE *xid_state)
+void xid_cache_delete(THD *thd, XID_STATE *xid_state)
+{
+ if (xid_state->xid_cache_element)
+ {
+ DBUG_ASSERT(thd->xid_hash_pins);
+ xid_state->xid_cache_element->mark_deleted();
+ lf_hash_delete(&xid_cache, thd->xid_hash_pins,
+ xid_state->xid.key(), xid_state->xid.key_length());
+ xid_state->xid_cache_element= 0;
+ if (!xid_state->in_thd)
+ my_free(xid_state);
+ }
+}
+
+
+struct xid_cache_iterate_arg
+{
+ my_hash_walk_action action;
+ void *argument;
+};
+
+static my_bool xid_cache_iterate_callback(XID_cache_element *element,
+ xid_cache_iterate_arg *arg)
+{
+ my_bool res= FALSE;
+ if (element->lock())
+ {
+ res= arg->action(element->m_xid_state, arg->argument);
+ element->unlock();
+ }
+ return res;
+}
+
+int xid_cache_iterate(THD *thd, my_hash_walk_action action, void *arg)
{
- mysql_mutex_lock(&LOCK_xid_cache);
- my_hash_delete(&xid_cache, (uchar *)xid_state);
- mysql_mutex_unlock(&LOCK_xid_cache);
+ xid_cache_iterate_arg argument= { action, arg };
+ return thd->fix_xid_hash_pins() ? -1 :
+ lf_hash_iterate(&xid_cache, thd->xid_hash_pins,
+ (my_hash_walk_action) xid_cache_iterate_callback,
+ &argument);
}
diff --git a/sql/sql_class.h b/sql/sql_class.h
index bd145bd4d28..200f541f3f1 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -1105,6 +1105,7 @@ struct st_savepoint {
enum xa_states {XA_NOTR=0, XA_ACTIVE, XA_IDLE, XA_PREPARED, XA_ROLLBACK_ONLY};
extern const char *xa_state_names[];
+class XID_cache_element;
typedef struct st_xid_state {
/* For now, this is only used to catch duplicated external xids */
@@ -1113,16 +1114,16 @@ typedef struct st_xid_state {
bool in_thd;
/* Error reported by the Resource Manager (RM) to the Transaction Manager. */
uint rm_error;
+ XID_cache_element *xid_cache_element;
} XID_STATE;
-extern mysql_mutex_t LOCK_xid_cache;
-extern HASH xid_cache;
-bool xid_cache_init(void);
+void xid_cache_init(void);
void xid_cache_free(void);
-XID_STATE *xid_cache_search(XID *xid);
+XID_STATE *xid_cache_search(THD *thd, XID *xid);
bool xid_cache_insert(XID *xid, enum xa_states xa_state);
-bool xid_cache_insert(XID_STATE *xid_state);
-void xid_cache_delete(XID_STATE *xid_state);
+bool xid_cache_insert(THD *thd, XID_STATE *xid_state);
+void xid_cache_delete(THD *thd, XID_STATE *xid_state);
+int xid_cache_iterate(THD *thd, my_hash_walk_action action, void *argument);
/**
@class Security_context
@@ -3785,6 +3786,8 @@ public:
}
LF_PINS *tdc_hash_pins;
+ LF_PINS *xid_hash_pins;
+ bool fix_xid_hash_pins();
inline ulong wsrep_binlog_format() const
{
diff --git a/sql/transaction.cc b/sql/transaction.cc
index 22e3ad7f87c..d6ef160206b 100644
--- a/sql/transaction.cc
+++ b/sql/transaction.cc
@@ -738,7 +738,7 @@ bool trans_xa_start(THD *thd)
thd->transaction.xid_state.xa_state= XA_ACTIVE;
thd->transaction.xid_state.rm_error= 0;
thd->transaction.xid_state.xid.set(thd->lex->xid);
- if (xid_cache_insert(&thd->transaction.xid_state))
+ if (xid_cache_insert(thd, &thd->transaction.xid_state))
{
thd->transaction.xid_state.xa_state= XA_NOTR;
thd->transaction.xid_state.xid.null();
@@ -801,7 +801,7 @@ bool trans_xa_prepare(THD *thd)
my_error(ER_XAER_NOTA, MYF(0));
else if (ha_prepare(thd))
{
- xid_cache_delete(&thd->transaction.xid_state);
+ xid_cache_delete(thd, &thd->transaction.xid_state);
thd->transaction.xid_state.xa_state= XA_NOTR;
my_error(ER_XA_RBROLLBACK, MYF(0));
}
@@ -830,6 +830,11 @@ bool trans_xa_commit(THD *thd)
if (!thd->transaction.xid_state.xid.eq(thd->lex->xid))
{
+ if (thd->fix_xid_hash_pins())
+ {
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ DBUG_RETURN(TRUE);
+ }
/*
xid_state.in_thd is always true beside of xa recovery procedure.
Note, that there is no race condition here between xid_cache_search
@@ -840,7 +845,7 @@ bool trans_xa_commit(THD *thd)
xa_cache_insert(XID, xa_states), which is called before starting
client connections, and thus is always single-threaded.
*/
- XID_STATE *xs= xid_cache_search(thd->lex->xid);
+ XID_STATE *xs= xid_cache_search(thd, thd->lex->xid);
res= !xs || xs->in_thd;
if (res)
my_error(ER_XAER_NOTA, MYF(0));
@@ -848,7 +853,7 @@ bool trans_xa_commit(THD *thd)
{
res= xa_trans_rolled_back(xs);
ha_commit_or_rollback_by_xid(thd->lex->xid, !res);
- xid_cache_delete(xs);
+ xid_cache_delete(thd, xs);
}
DBUG_RETURN(res);
}
@@ -911,7 +916,7 @@ bool trans_xa_commit(THD *thd)
thd->server_status&=
~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY);
DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS"));
- xid_cache_delete(&thd->transaction.xid_state);
+ xid_cache_delete(thd, &thd->transaction.xid_state);
thd->transaction.xid_state.xa_state= XA_NOTR;
DBUG_RETURN(res);
@@ -935,14 +940,20 @@ bool trans_xa_rollback(THD *thd)
if (!thd->transaction.xid_state.xid.eq(thd->lex->xid))
{
- XID_STATE *xs= xid_cache_search(thd->lex->xid);
+ if (thd->fix_xid_hash_pins())
+ {
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ DBUG_RETURN(TRUE);
+ }
+
+ XID_STATE *xs= xid_cache_search(thd, thd->lex->xid);
if (!xs || xs->in_thd)
my_error(ER_XAER_NOTA, MYF(0));
else
{
xa_trans_rolled_back(xs);
ha_commit_or_rollback_by_xid(thd->lex->xid, 0);
- xid_cache_delete(xs);
+ xid_cache_delete(thd, xs);
}
DBUG_RETURN(thd->get_stmt_da()->is_error());
}
@@ -961,7 +972,7 @@ bool trans_xa_rollback(THD *thd)
thd->server_status&=
~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY);
DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS"));
- xid_cache_delete(&thd->transaction.xid_state);
+ xid_cache_delete(thd, &thd->transaction.xid_state);
thd->transaction.xid_state.xa_state= XA_NOTR;
DBUG_RETURN(res);