diff options
Diffstat (limited to 'sql/rpl_gtid.cc')
-rw-r--r-- | sql/rpl_gtid.cc | 189 |
1 files changed, 182 insertions, 7 deletions
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc index e51dee20c19..17c3b15c902 100644 --- a/sql/rpl_gtid.cc +++ b/sql/rpl_gtid.cc @@ -33,7 +33,8 @@ const LEX_STRING rpl_gtid_slave_state_table_name= void -rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid) +rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid, + rpl_group_info *rgi) { int err; /* @@ -44,7 +45,7 @@ rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid) it is even committed. */ mysql_mutex_lock(&LOCK_slave_state); - err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no); + err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no, rgi); mysql_mutex_unlock(&LOCK_slave_state); if (err) { @@ -74,19 +75,170 @@ rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi) if ((sub_id= rgi->gtid_sub_id)) { rgi->gtid_sub_id= 0; - if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false)) - DBUG_RETURN(1); - update_state_hash(sub_id, &rgi->current_gtid); + if (rgi->gtid_ignore_duplicate_state!=rpl_group_info::GTID_DUPLICATE_IGNORE) + { + if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false)) + DBUG_RETURN(1); + update_state_hash(sub_id, &rgi->current_gtid, rgi); + } + rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL; } DBUG_RETURN(0); } +/* + Check GTID event execution when --gtid-ignore-duplicates. + + The idea with --gtid-ignore-duplicates is that we allow multiple master + connections (in multi-source replication) to all receive the same GTIDs and + event groups. Only one instance of each is applied; we use the sequence + number in the GTID to decide whether a GTID has already been applied. + + So if the seq_no of a GTID (or a higher sequence number) has already been + applied, then the event should be skipped. If not then the event should be + applied. + + To avoid two master connections tring to apply the same event + simultaneously, only one is allowed to work in any given domain at any point + in time. The associated Relay_log_info object is called the owner of the + domain (and there can be multiple parallel worker threads working in that + domain for that Relay_log_info). Any other Relay_log_info/master connection + must wait for the domain to become free, or for their GTID to have been + applied, before being allowed to proceed. + + Returns: + 0 This GTID is already applied, it should be skipped. + 1 The GTID is not yet applied; this rli is now the owner, and must apply + the event and release the domain afterwards. + -1 Error (out of memory to allocate a new element for the domain). +*/ +int +rpl_slave_state::check_duplicate_gtid(rpl_gtid *gtid, rpl_group_info *rgi) +{ + uint32 domain_id= gtid->domain_id; + uint32 seq_no= gtid->seq_no; + rpl_slave_state::element *elem; + int res; + bool did_enter_cond; + PSI_stage_info old_stage; + THD *thd; + Relay_log_info *rli= rgi->rli; + + mysql_mutex_lock(&LOCK_slave_state); + if (!(elem= get_element(domain_id))) + { + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + res= -1; + goto err; + } + /* + Note that the elem pointer does not change once inserted in the hash. So + we can re-use the pointer without looking it up again in the hash after + each lock release and re-take. + */ + + did_enter_cond= false; + for (;;) + { + if (elem->highest_seq_no >= seq_no) + { + /* This sequence number is already applied, ignore it. */ + res= 0; + rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_IGNORE; + break; + } + if (!elem->owner_rli) + { + /* The domain became free, grab it and apply the event. */ + elem->owner_rli= rli; + elem->owner_count= 1; + rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_OWNER; + res= 1; + break; + } + if (elem->owner_rli == rli) + { + /* Already own this domain, increment reference count and apply event. */ + ++elem->owner_count; + rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_OWNER; + res= 1; + break; + } + thd= rgi->thd; + if (thd->check_killed()) + { + thd->send_kill_message(); + res= -1; + break; + } + /* + Someone else is currently processing this GTID (or an earlier one). + Wait for them to complete (or fail), and then check again. + */ + if (!did_enter_cond) + { + thd->ENTER_COND(&elem->COND_gtid_ignore_duplicates, &LOCK_slave_state, + &stage_gtid_wait_other_connection, &old_stage); + did_enter_cond= true; + } + mysql_cond_wait(&elem->COND_gtid_ignore_duplicates, + &LOCK_slave_state); + } + +err: + if (did_enter_cond) + thd->EXIT_COND(&old_stage); + else + mysql_mutex_unlock(&LOCK_slave_state); + return res; +} + + +void +rpl_slave_state::release_domain_owner(rpl_group_info *rgi) +{ + element *elem= NULL; + + mysql_mutex_lock(&LOCK_slave_state); + if (!(elem= get_element(rgi->current_gtid.domain_id))) + { + /* + We cannot really deal with error here, as we are already called in an + error handling case (transaction failure and rollback). + + However, get_element() only fails if the element did not exist already + and could not be allocated due to out-of-memory - and if it did not + exist, then we would not get here in the first place. + */ + mysql_mutex_unlock(&LOCK_slave_state); + return; + } + + if (rgi->gtid_ignore_duplicate_state == rpl_group_info::GTID_DUPLICATE_OWNER) + { + uint32 count= elem->owner_count; + DBUG_ASSERT(count > 0); + DBUG_ASSERT(elem->owner_rli == rgi->rli); + --count; + elem->owner_count= count; + if (count == 0) + { + elem->owner_rli= NULL; + mysql_cond_broadcast(&elem->COND_gtid_ignore_duplicates); + } + } + rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL; + mysql_mutex_unlock(&LOCK_slave_state); +} + + static void rpl_slave_state_free_element(void *arg) { struct rpl_slave_state::element *elem= (struct rpl_slave_state::element *)arg; mysql_cond_destroy(&elem->COND_wait_gtid); + mysql_cond_destroy(&elem->COND_gtid_ignore_duplicates); my_free(elem); } @@ -147,7 +299,7 @@ rpl_slave_state::deinit() int rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id, - uint64 seq_no) + uint64 seq_no, rpl_group_info *rgi) { element *elem= NULL; list_element *list_elem= NULL; @@ -170,6 +322,25 @@ rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id, mysql_cond_broadcast(&elem->COND_wait_gtid); } + if (rgi) + { + if (rgi->gtid_ignore_duplicate_state==rpl_group_info::GTID_DUPLICATE_OWNER) + { + Relay_log_info *rli= rgi->rli; + uint32 count= elem->owner_count; + DBUG_ASSERT(count > 0); + DBUG_ASSERT(elem->owner_rli == rli); + --count; + elem->owner_count= count; + if (count == 0) + { + elem->owner_rli= NULL; + mysql_cond_broadcast(&elem->COND_gtid_ignore_duplicates); + } + } + rgi->gtid_ignore_duplicate_state= rpl_group_info::GTID_DUPLICATE_NULL; + } + if (!(list_elem= (list_element *)my_malloc(sizeof(*list_elem), MYF(MY_WME)))) return 1; list_elem->server_id= server_id; @@ -199,7 +370,11 @@ rpl_slave_state::get_element(uint32 domain_id) elem->domain_id= domain_id; elem->highest_seq_no= 0; elem->gtid_waiter= NULL; + elem->owner_rli= NULL; + elem->owner_count= 0; mysql_cond_init(key_COND_wait_gtid, &elem->COND_wait_gtid, 0); + mysql_cond_init(key_COND_gtid_ignore_duplicates, + &elem->COND_gtid_ignore_duplicates, 0); if (my_hash_insert(&hash, (uchar *)elem)) { my_free(elem); @@ -821,7 +996,7 @@ rpl_slave_state::load(THD *thd, char *state_from_master, size_t len, if (gtid_parser_helper(&state_from_master, end, >id) || !(sub_id= next_sub_id(gtid.domain_id)) || record_gtid(thd, >id, sub_id, false, in_statement) || - update(gtid.domain_id, gtid.server_id, sub_id, gtid.seq_no)) + update(gtid.domain_id, gtid.server_id, sub_id, gtid.seq_no, NULL)) return 1; if (state_from_master == end) break; |