diff options
author | unknown <knielsen@knielsen-hq.org> | 2014-03-12 00:14:49 +0100 |
---|---|---|
committer | unknown <knielsen@knielsen-hq.org> | 2014-03-12 00:14:49 +0100 |
commit | 8b9b7ec395df111f886224a565b63a1a312e5679 (patch) | |
tree | a44e2719f727964a854ed9c45e26e7314f6b62a4 /sql/rpl_gtid.cc | |
parent | 2c2478b82260f5110ea2c5bed3c6c7bcd3558453 (diff) | |
download | mariadb-git-8b9b7ec395df111f886224a565b63a1a312e5679.tar.gz |
MDEV-5804: If same GTID is received on multiple master connections in multi-source replication, the event is double-executed causing corruption or replication failure
Some fixes, mainly to make it work in non-parallel replication mode also
(--slave-parallel-threads=0).
Patch should be fairly complete now.
Diffstat (limited to 'sql/rpl_gtid.cc')
-rw-r--r-- | sql/rpl_gtid.cc | 107 |
1 files changed, 89 insertions, 18 deletions
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc index b66651ae5fd..17c3b15c902 100644 --- a/sql/rpl_gtid.cc +++ b/sql/rpl_gtid.cc @@ -34,7 +34,7 @@ const LEX_STRING rpl_gtid_slave_state_table_name= void rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid, - const Relay_log_info *rli) + rpl_group_info *rgi) { int err; /* @@ -45,7 +45,7 @@ rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid, it is even committed. */ mysql_mutex_lock(&LOCK_slave_state); - err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no, rli); + err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no, rgi); mysql_mutex_unlock(&LOCK_slave_state); if (err) { @@ -75,9 +75,13 @@ rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi) if ((sub_id= rgi->gtid_sub_id)) { rgi->gtid_sub_id= 0; - if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false)) - DBUG_RETURN(1); - update_state_hash(sub_id, &rgi->current_gtid, rgi->rli); + if (rgi->gtid_ignore_duplicate_state!=rpl_group_info::GTID_DUPLICATE_IGNORE) + { + if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false)) + DBUG_RETURN(1); + update_state_hash(sub_id, &rgi->current_gtid, rgi); + } + rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL; } DBUG_RETURN(0); } @@ -110,16 +114,21 @@ rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi) -1 Error (out of memory to allocate a new element for the domain). */ int -rpl_slave_state::check_duplicate_gtid(rpl_gtid *gtid, const Relay_log_info *rli) +rpl_slave_state::check_duplicate_gtid(rpl_gtid *gtid, rpl_group_info *rgi) { uint32 domain_id= gtid->domain_id; uint32 seq_no= gtid->seq_no; rpl_slave_state::element *elem; int res; + bool did_enter_cond; + PSI_stage_info old_stage; + THD *thd; + Relay_log_info *rli= rgi->rli; mysql_mutex_lock(&LOCK_slave_state); if (!(elem= get_element(domain_id))) { + my_error(ER_OUT_OF_RESOURCES, MYF(0)); res= -1; goto err; } @@ -129,13 +138,14 @@ rpl_slave_state::check_duplicate_gtid(rpl_gtid *gtid, const Relay_log_info *rli) each lock release and re-take. */ - /* ToDo: Make this wait killable. */ + did_enter_cond= false; for (;;) { if (elem->highest_seq_no >= seq_no) { /* This sequence number is already applied, ignore it. */ res= 0; + rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_IGNORE; break; } if (!elem->owner_rli) @@ -143,6 +153,7 @@ rpl_slave_state::check_duplicate_gtid(rpl_gtid *gtid, const Relay_log_info *rli) /* The domain became free, grab it and apply the event. */ elem->owner_rli= rli; elem->owner_count= 1; + rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_OWNER; res= 1; break; } @@ -150,23 +161,78 @@ rpl_slave_state::check_duplicate_gtid(rpl_gtid *gtid, const Relay_log_info *rli) { /* Already own this domain, increment reference count and apply event. */ ++elem->owner_count; + rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_OWNER; res= 1; break; } + thd= rgi->thd; + if (thd->check_killed()) + { + thd->send_kill_message(); + res= -1; + break; + } /* Someone else is currently processing this GTID (or an earlier one). Wait for them to complete (or fail), and then check again. */ + if (!did_enter_cond) + { + thd->ENTER_COND(&elem->COND_gtid_ignore_duplicates, &LOCK_slave_state, + &stage_gtid_wait_other_connection, &old_stage); + did_enter_cond= true; + } mysql_cond_wait(&elem->COND_gtid_ignore_duplicates, &LOCK_slave_state); } err: - mysql_mutex_unlock(&LOCK_slave_state); + if (did_enter_cond) + thd->EXIT_COND(&old_stage); + else + mysql_mutex_unlock(&LOCK_slave_state); return res; } +void +rpl_slave_state::release_domain_owner(rpl_group_info *rgi) +{ + element *elem= NULL; + + mysql_mutex_lock(&LOCK_slave_state); + if (!(elem= get_element(rgi->current_gtid.domain_id))) + { + /* + We cannot really deal with error here, as we are already called in an + error handling case (transaction failure and rollback). + + However, get_element() only fails if the element did not exist already + and could not be allocated due to out-of-memory - and if it did not + exist, then we would not get here in the first place. + */ + mysql_mutex_unlock(&LOCK_slave_state); + return; + } + + if (rgi->gtid_ignore_duplicate_state == rpl_group_info::GTID_DUPLICATE_OWNER) + { + uint32 count= elem->owner_count; + DBUG_ASSERT(count > 0); + DBUG_ASSERT(elem->owner_rli == rgi->rli); + --count; + elem->owner_count= count; + if (count == 0) + { + elem->owner_rli= NULL; + mysql_cond_broadcast(&elem->COND_gtid_ignore_duplicates); + } + } + rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL; + mysql_mutex_unlock(&LOCK_slave_state); +} + + static void rpl_slave_state_free_element(void *arg) { @@ -233,7 +299,7 @@ rpl_slave_state::deinit() int rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id, - uint64 seq_no, const Relay_log_info *rli) + uint64 seq_no, rpl_group_info *rgi) { element *elem= NULL; list_element *list_elem= NULL; @@ -256,18 +322,23 @@ rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id, mysql_cond_broadcast(&elem->COND_wait_gtid); } - if (opt_gtid_ignore_duplicates && rli) + if (rgi) { - uint32 count= elem->owner_count; - DBUG_ASSERT(count > 0); - DBUG_ASSERT(elem->owner_rli == rli); - --count; - elem->owner_count= count; - if (count == 0) + if (rgi->gtid_ignore_duplicate_state==rpl_group_info::GTID_DUPLICATE_OWNER) { - elem->owner_rli= NULL; - mysql_cond_broadcast(&elem->COND_gtid_ignore_duplicates); + Relay_log_info *rli= rgi->rli; + uint32 count= elem->owner_count; + DBUG_ASSERT(count > 0); + DBUG_ASSERT(elem->owner_rli == rli); + --count; + elem->owner_count= count; + if (count == 0) + { + elem->owner_rli= NULL; + mysql_cond_broadcast(&elem->COND_gtid_ignore_duplicates); + } } + rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL; } if (!(list_elem= (list_element *)my_malloc(sizeof(*list_elem), MYF(MY_WME)))) |