summaryrefslogtreecommitdiff
path: root/sql/rpl_gtid.cc
diff options
context:
space:
mode:
authorunknown <knielsen@knielsen-hq.org>2014-03-12 00:14:49 +0100
committerunknown <knielsen@knielsen-hq.org>2014-03-12 00:14:49 +0100
commit8b9b7ec395df111f886224a565b63a1a312e5679 (patch)
treea44e2719f727964a854ed9c45e26e7314f6b62a4 /sql/rpl_gtid.cc
parent2c2478b82260f5110ea2c5bed3c6c7bcd3558453 (diff)
downloadmariadb-git-8b9b7ec395df111f886224a565b63a1a312e5679.tar.gz
MDEV-5804: If same GTID is received on multiple master connections in multi-source replication, the event is double-executed causing corruption or replication failure
Some fixes, mainly to make it work in non-parallel replication mode also (--slave-parallel-threads=0). Patch should be fairly complete now.
Diffstat (limited to 'sql/rpl_gtid.cc')
-rw-r--r--sql/rpl_gtid.cc107
1 files changed, 89 insertions, 18 deletions
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc
index b66651ae5fd..17c3b15c902 100644
--- a/sql/rpl_gtid.cc
+++ b/sql/rpl_gtid.cc
@@ -34,7 +34,7 @@ const LEX_STRING rpl_gtid_slave_state_table_name=
void
rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid,
- const Relay_log_info *rli)
+ rpl_group_info *rgi)
{
int err;
/*
@@ -45,7 +45,7 @@ rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid,
it is even committed.
*/
mysql_mutex_lock(&LOCK_slave_state);
- err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no, rli);
+ err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no, rgi);
mysql_mutex_unlock(&LOCK_slave_state);
if (err)
{
@@ -75,9 +75,13 @@ rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi)
if ((sub_id= rgi->gtid_sub_id))
{
rgi->gtid_sub_id= 0;
- if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false))
- DBUG_RETURN(1);
- update_state_hash(sub_id, &rgi->current_gtid, rgi->rli);
+ if (rgi->gtid_ignore_duplicate_state!=rpl_group_info::GTID_DUPLICATE_IGNORE)
+ {
+ if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false))
+ DBUG_RETURN(1);
+ update_state_hash(sub_id, &rgi->current_gtid, rgi);
+ }
+ rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL;
}
DBUG_RETURN(0);
}
@@ -110,16 +114,21 @@ rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi)
-1 Error (out of memory to allocate a new element for the domain).
*/
int
-rpl_slave_state::check_duplicate_gtid(rpl_gtid *gtid, const Relay_log_info *rli)
+rpl_slave_state::check_duplicate_gtid(rpl_gtid *gtid, rpl_group_info *rgi)
{
uint32 domain_id= gtid->domain_id;
uint32 seq_no= gtid->seq_no;
rpl_slave_state::element *elem;
int res;
+ bool did_enter_cond;
+ PSI_stage_info old_stage;
+ THD *thd;
+ Relay_log_info *rli= rgi->rli;
mysql_mutex_lock(&LOCK_slave_state);
if (!(elem= get_element(domain_id)))
{
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
res= -1;
goto err;
}
@@ -129,13 +138,14 @@ rpl_slave_state::check_duplicate_gtid(rpl_gtid *gtid, const Relay_log_info *rli)
each lock release and re-take.
*/
- /* ToDo: Make this wait killable. */
+ did_enter_cond= false;
for (;;)
{
if (elem->highest_seq_no >= seq_no)
{
/* This sequence number is already applied, ignore it. */
res= 0;
+ rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_IGNORE;
break;
}
if (!elem->owner_rli)
@@ -143,6 +153,7 @@ rpl_slave_state::check_duplicate_gtid(rpl_gtid *gtid, const Relay_log_info *rli)
/* The domain became free, grab it and apply the event. */
elem->owner_rli= rli;
elem->owner_count= 1;
+ rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_OWNER;
res= 1;
break;
}
@@ -150,23 +161,78 @@ rpl_slave_state::check_duplicate_gtid(rpl_gtid *gtid, const Relay_log_info *rli)
{
/* Already own this domain, increment reference count and apply event. */
++elem->owner_count;
+ rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_OWNER;
res= 1;
break;
}
+ thd= rgi->thd;
+ if (thd->check_killed())
+ {
+ thd->send_kill_message();
+ res= -1;
+ break;
+ }
/*
Someone else is currently processing this GTID (or an earlier one).
Wait for them to complete (or fail), and then check again.
*/
+ if (!did_enter_cond)
+ {
+ thd->ENTER_COND(&elem->COND_gtid_ignore_duplicates, &LOCK_slave_state,
+ &stage_gtid_wait_other_connection, &old_stage);
+ did_enter_cond= true;
+ }
mysql_cond_wait(&elem->COND_gtid_ignore_duplicates,
&LOCK_slave_state);
}
err:
- mysql_mutex_unlock(&LOCK_slave_state);
+ if (did_enter_cond)
+ thd->EXIT_COND(&old_stage);
+ else
+ mysql_mutex_unlock(&LOCK_slave_state);
return res;
}
+void
+rpl_slave_state::release_domain_owner(rpl_group_info *rgi)
+{
+ element *elem= NULL;
+
+ mysql_mutex_lock(&LOCK_slave_state);
+ if (!(elem= get_element(rgi->current_gtid.domain_id)))
+ {
+ /*
+ We cannot really deal with error here, as we are already called in an
+ error handling case (transaction failure and rollback).
+
+ However, get_element() only fails if the element did not exist already
+ and could not be allocated due to out-of-memory - and if it did not
+ exist, then we would not get here in the first place.
+ */
+ mysql_mutex_unlock(&LOCK_slave_state);
+ return;
+ }
+
+ if (rgi->gtid_ignore_duplicate_state == rpl_group_info::GTID_DUPLICATE_OWNER)
+ {
+ uint32 count= elem->owner_count;
+ DBUG_ASSERT(count > 0);
+ DBUG_ASSERT(elem->owner_rli == rgi->rli);
+ --count;
+ elem->owner_count= count;
+ if (count == 0)
+ {
+ elem->owner_rli= NULL;
+ mysql_cond_broadcast(&elem->COND_gtid_ignore_duplicates);
+ }
+ }
+ rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL;
+ mysql_mutex_unlock(&LOCK_slave_state);
+}
+
+
static void
rpl_slave_state_free_element(void *arg)
{
@@ -233,7 +299,7 @@ rpl_slave_state::deinit()
int
rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
- uint64 seq_no, const Relay_log_info *rli)
+ uint64 seq_no, rpl_group_info *rgi)
{
element *elem= NULL;
list_element *list_elem= NULL;
@@ -256,18 +322,23 @@ rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
mysql_cond_broadcast(&elem->COND_wait_gtid);
}
- if (opt_gtid_ignore_duplicates && rli)
+ if (rgi)
{
- uint32 count= elem->owner_count;
- DBUG_ASSERT(count > 0);
- DBUG_ASSERT(elem->owner_rli == rli);
- --count;
- elem->owner_count= count;
- if (count == 0)
+ if (rgi->gtid_ignore_duplicate_state==rpl_group_info::GTID_DUPLICATE_OWNER)
{
- elem->owner_rli= NULL;
- mysql_cond_broadcast(&elem->COND_gtid_ignore_duplicates);
+ Relay_log_info *rli= rgi->rli;
+ uint32 count= elem->owner_count;
+ DBUG_ASSERT(count > 0);
+ DBUG_ASSERT(elem->owner_rli == rli);
+ --count;
+ elem->owner_count= count;
+ if (count == 0)
+ {
+ elem->owner_rli= NULL;
+ mysql_cond_broadcast(&elem->COND_gtid_ignore_duplicates);
+ }
}
+ rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL;
}
if (!(list_elem= (list_element *)my_malloc(sizeof(*list_elem), MYF(MY_WME))))