diff options
author | unknown <knielsen@knielsen-hq.org> | 2014-02-08 22:28:41 +0100 |
---|---|---|
committer | unknown <knielsen@knielsen-hq.org> | 2014-02-08 22:28:41 +0100 |
commit | 76e929a92e6fb06b649ed9f199484b64946b6152 (patch) | |
tree | 4d11b143e928cfb6924e99298a3b9d2ff2aa013d /sql | |
parent | 3c97d24f74b8bfb72746b0a32a78193f20665c44 (diff) | |
download | mariadb-git-76e929a92e6fb06b649ed9f199484b64946b6152.tar.gz |
MDEV-4984: Implement MASTER_GTID_WAIT() and @@LAST_GTID.
Rewrite the gtid_waiting::wait_for_gtid() function.
The code was rubbish (and buggy). Now the logic is
much clearer.
Also fix a missing slave sync that could cause test failure.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/rpl_gtid.cc | 339 | ||||
-rw-r--r-- | sql/rpl_gtid.h | 104 |
2 files changed, 242 insertions, 201 deletions
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc index 2f2254d53af..d7923ed9130 100644 --- a/sql/rpl_gtid.cc +++ b/sql/rpl_gtid.cc @@ -157,7 +157,7 @@ rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id, if (seq_no > elem->highest_seq_no) elem->highest_seq_no= seq_no; - if (elem->min_wait_seq_no != 0 && elem->min_wait_seq_no <= seq_no) + if (elem->gtid_waiter && elem->min_wait_seq_no <= seq_no) { /* Someone was waiting in MASTER_GTID_WAIT() for this GTID to appear. @@ -166,7 +166,7 @@ rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id, replication SQL thread. */ mysql_mutex_assert_owner(&LOCK_slave_state); - elem->min_wait_seq_no= 0; + elem->gtid_waiter= NULL; mysql_cond_broadcast(&elem->COND_wait_gtid); } @@ -198,7 +198,7 @@ rpl_slave_state::get_element(uint32 domain_id) elem->list= NULL; elem->domain_id= domain_id; elem->highest_seq_no= 0; - elem->min_wait_seq_no= 0; + elem->gtid_waiter= NULL; mysql_cond_init(key_COND_wait_gtid, &elem->COND_wait_gtid, 0); if (my_hash_insert(&hash, (uchar *)elem)) { @@ -1732,8 +1732,7 @@ gtid_waiting::promote_new_waiter(gtid_waiting::hash_element *he) if (queue_empty(&he->queue)) return; qe= (queue_element *)queue_top(&he->queue); - qe->thd->wakeup_ready= true; - qe->wakeup_reason= queue_element::TAKEOVER; + qe->do_small_wait= true; mysql_cond_signal(&qe->thd->COND_wakeup_ready); } @@ -1747,14 +1746,14 @@ gtid_waiting::process_wait_hash(uint64 wakeup_seq_no, { queue_element *qe; - if (queue_first_element(&he->queue) > queue_last_element(&he->queue)) + if (queue_empty(&he->queue)) break; qe= (queue_element *)queue_top(&he->queue); if (qe->wait_seq_no > wakeup_seq_no) break; + DBUG_ASSERT(!qe->done); queue_remove_top(&he->queue); - qe->thd->wakeup_ready= true; - qe->wakeup_reason= queue_element::DONE; + qe->done= true;; mysql_cond_signal(&qe->thd->COND_wakeup_ready); } } @@ -1775,15 +1774,29 @@ gtid_waiting::process_wait_hash(uint64 wakeup_seq_no, If there is already a small waiter, a new thread will either replace the small waiter (if it needs to wait for an earlier sequence number), or - instead to a "large" wait. + instead do a "large" wait. Once awoken on the small wait, the waiting thread releases the lock shared with the SQL threads quickly, and then processes all waiters currently doing - the large wait. + the large wait using a different lock that does not impact replication. This way, the SQL threads only need to do a single check + possibly a pthread_cond_signal() when updating the gtid_slave_state, and the time that - non-SQL threads contend for the lock on gtid_slave_staste is minimized. + non-SQL threads contend for the lock on gtid_slave_state is minimized. + + There is always at least one thread that has the responsibility to ensure + that there is a small waiter; this thread has queue_element::do_small_wait + set to true. This thread will do the small wait until it is done, at which + point it will make sure to pass on the responsibility to another thread. + Normally only one thread has do_small_wait==true, but it can occasionally + happen that there is more than one, when threads race one another for the + lock on the small wait (this results in slightly increased activity on the + small lock but is otherwise harmless). + + Returns: + 0 Wait completed normally + -1 Wait completed due to timeout + 1 An error (my_error() will have been called to set the error in the da) */ int gtid_waiting::wait_for_gtid(THD *thd, rpl_gtid *wait_gtid, @@ -1798,189 +1811,207 @@ gtid_waiting::wait_for_gtid(THD *thd, rpl_gtid *wait_gtid, rpl_slave_state::element *slave_state_elem= NULL; const char *old_msg= NULL; bool did_enter_cond= false; - bool takeover= false; elem.wait_seq_no= seq_no; elem.thd= thd; - /* - Register on the large wait before checking the small wait. - This ensures that if we find another waiter already doing the small wait, - we are sure to be woken up by that one, and thus we will not need to take - the lock on the small wait more than once in this case. - */ + elem.done= false; + mysql_mutex_lock(&LOCK_gtid_waiting); - if (!(he= register_in_wait_hash(thd, wait_gtid, &elem))) + if (!(he= get_entry(wait_gtid->domain_id))) { mysql_mutex_unlock(&LOCK_gtid_waiting); return 1; } - /* - Now check the small wait, and either do the large wait or the small one, - depending on whether there is already a suitable small waiter or not. + If there is already another waiter with seq_no no larger than our own, + we are sure that there is already a small waiter that will wake us up + (or later pass the small wait responsibility to us). So in this case, we + do not need to touch the small wait lock at all. + */ + elem.do_small_wait= + (queue_empty(&he->queue) || + ((queue_element *)queue_top(&he->queue))->wait_seq_no > seq_no); - We may need to do this multiple times, as a previous small waiter may - complete and pass the small wait on to us. + if (register_in_wait_queue(thd, wait_gtid, he, &elem)) + { + mysql_mutex_unlock(&LOCK_gtid_waiting); + return 1; + } + /* + Loop, doing either the small or large wait as appropriate, until either + the position waited for is reached, or we get a kill or timeout. */ for (;;) { - uint64 wakeup_seq_no, cur_wait_seq_no; - mysql_mutex_assert_owner(&LOCK_gtid_waiting); - mysql_mutex_lock(&rpl_global_gtid_slave_state.LOCK_slave_state); - /* - The elements in the gtid_slave_state_hash are never re-allocated once - they enter the hash, so we do not need to re-do the lookup after releasing - and re-aquiring the lock. - */ - if (!slave_state_elem && - !(slave_state_elem= rpl_global_gtid_slave_state.get_element(domain_id))) - { - mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state); - remove_from_wait_hash(he, &elem); - mysql_mutex_unlock(&LOCK_gtid_waiting); - my_error(ER_OUT_OF_RESOURCES, MYF(0)); - return 1; - } - if ((wakeup_seq_no= slave_state_elem->highest_seq_no) >= seq_no) + if (elem.do_small_wait) { + uint64 wakeup_seq_no; + queue_element *cur_waiter; + + mysql_mutex_lock(&rpl_global_gtid_slave_state.LOCK_slave_state); /* - We do not have to wait. But we might need to wakeup other threads on - the large wait (can happen if we were woken up to take over the small - wait, and SQL thread raced with us to reach the waited-for GTID. - */ - mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state); - thd->wakeup_ready= 0; - process_wait_hash(wakeup_seq_no, he); - /* - Since we already checked wakeup_seq_no, we are sure that - process_wait_hash() will mark us done. + The elements in the gtid_slave_state_hash are never re-allocated once + they enter the hash, so we do not need to re-do the lookup after releasing + and re-aquiring the lock. */ - DBUG_ASSERT(thd->wakeup_ready); - if (thd->wakeup_ready) + if (!slave_state_elem && + !(slave_state_elem= rpl_global_gtid_slave_state.get_element(domain_id))) { - if (takeover) - promote_new_waiter(he); - break; + mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state); + remove_from_wait_queue(he, &elem); + promote_new_waiter(he); + if (did_enter_cond) + thd->exit_cond(old_msg); + else + mysql_mutex_unlock(&LOCK_gtid_waiting); + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + return 1; } - } - else if ((cur_wait_seq_no= slave_state_elem->min_wait_seq_no) == 0 || - cur_wait_seq_no > seq_no) - { - /* - We have to do the small wait ourselves (stealing it from any thread that - might already be waiting for a later seq_no). - */ - slave_state_elem->min_wait_seq_no= seq_no; - if (cur_wait_seq_no != 0) + + if ((wakeup_seq_no= slave_state_elem->highest_seq_no) >= seq_no) { - /* We stole the wait, so wake up the old waiting thread. */ - mysql_cond_signal(&slave_state_elem->COND_wait_gtid); + /* + We do not have to wait. (We will be removed from the wait queue when + we call process_wait_hash() below. + */ + mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state); + } + else if ((cur_waiter= slave_state_elem->gtid_waiter) && + slave_state_elem->min_wait_seq_no <= seq_no) + { + /* + There is already a suitable small waiter, go do the large wait. + (Normally we would not have needed to check the small wait in this + case, but it can happen if we race with another thread for the small + lock). + */ + elem.do_small_wait= false; + mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state); } - /* Do the small wait. */ - if (did_enter_cond) - thd->exit_cond(old_msg); else - mysql_mutex_unlock(&LOCK_gtid_waiting); - - old_msg= thd->enter_cond(&slave_state_elem->COND_wait_gtid, - &rpl_global_gtid_slave_state.LOCK_slave_state, - "Waiting in MASTER_GTID_WAIT() (primary waiter)"); - do { - if (thd->check_killed()) - slave_state_elem->min_wait_seq_no = 0; - else if (wait_until) + /* + We have to do the small wait ourselves (stealing it from any thread + that might already be waiting for a later seq_no). + */ + slave_state_elem->gtid_waiter= &elem; + slave_state_elem->min_wait_seq_no= seq_no; + if (cur_waiter) { - int err= - mysql_cond_timedwait(&slave_state_elem->COND_wait_gtid, - &rpl_global_gtid_slave_state.LOCK_slave_state, - wait_until); - if (err == ETIMEDOUT || err == ETIME) + /* We stole the wait, so wake up the old waiting thread. */ + mysql_cond_signal(&slave_state_elem->COND_wait_gtid); + } + + /* Release the large lock, and do the small wait. */ + if (did_enter_cond) + { + thd->exit_cond(old_msg); + did_enter_cond= false; + } + else + mysql_mutex_unlock(&LOCK_gtid_waiting); + old_msg= + thd->enter_cond(&slave_state_elem->COND_wait_gtid, + &rpl_global_gtid_slave_state.LOCK_slave_state, + "Waiting in MASTER_GTID_WAIT() (primary waiter)"); + do + { + if (thd->check_killed()) + break; + else if (wait_until) { - timed_out= true; - slave_state_elem->min_wait_seq_no = 0; + int err= + mysql_cond_timedwait(&slave_state_elem->COND_wait_gtid, + &rpl_global_gtid_slave_state.LOCK_slave_state, + wait_until); + if (err == ETIMEDOUT || err == ETIME) + { + timed_out= true; + break; + } } + else + mysql_cond_wait(&slave_state_elem->COND_wait_gtid, + &rpl_global_gtid_slave_state.LOCK_slave_state); + } while (slave_state_elem->gtid_waiter == &elem); + wakeup_seq_no= slave_state_elem->highest_seq_no; + /* + If we aborted due to timeout or kill, remove us as waiter. + + If we were replaced by another waiter with a smaller seq_no, then we + no longer have responsibility for the small wait. + */ + if ((cur_waiter= slave_state_elem->gtid_waiter)) + { + if (cur_waiter == &elem) + slave_state_elem->gtid_waiter= NULL; + else if (slave_state_elem->min_wait_seq_no <= seq_no) + elem.do_small_wait= false; } - else - mysql_cond_wait(&slave_state_elem->COND_wait_gtid, - &rpl_global_gtid_slave_state.LOCK_slave_state); - } while (slave_state_elem->min_wait_seq_no == seq_no); - /* - Check the new gtid_slave_state. We could be woken up because our seq_no - has been reached, or because someone else stole the small wait from us. - (Or because of kill/timeout). - */ - wakeup_seq_no= slave_state_elem->highest_seq_no; + thd->exit_cond(old_msg); + + mysql_mutex_lock(&LOCK_gtid_waiting); + } - thd->exit_cond(old_msg); - mysql_mutex_lock(&LOCK_gtid_waiting); /* Note that hash_entry pointers do not change once allocated, so we do - not need to lookup `he' again after re-aquiring the lock. + not need to lookup `he' again after re-aquiring LOCK_gtid_waiting. */ - thd->wakeup_ready= 0; process_wait_hash(wakeup_seq_no, he); - if (thd->wakeup_ready) - promote_new_waiter(he); - else if (thd->killed || timed_out) - { - remove_from_wait_hash(he, &elem); - promote_new_waiter(he); - if (thd->killed) - thd->send_kill_message(); - break; - } } else { - /* We have to do the large wait. */ - mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state); - thd->wakeup_ready= 0; - } - - takeover= false; - old_msg= thd->enter_cond(&thd->COND_wakeup_ready, &LOCK_gtid_waiting, - "Waiting in MASTER_GTID_WAIT()"); - while (!thd->wakeup_ready && !thd->check_killed() && !timed_out) - { - thd_wait_begin(thd, THD_WAIT_BINLOG); - if (wait_until) + /* Do the large wait. */ + if (!did_enter_cond) { - int err= mysql_cond_timedwait(&thd->COND_wakeup_ready, - &LOCK_gtid_waiting, wait_until); - if (err == ETIMEDOUT || err == ETIME) - timed_out= true; + old_msg= thd->enter_cond(&thd->COND_wakeup_ready, &LOCK_gtid_waiting, + "Waiting in MASTER_GTID_WAIT()"); + did_enter_cond= true; + } + while (!elem.done && !thd->check_killed()) + { + thd_wait_begin(thd, THD_WAIT_BINLOG); + if (wait_until) + { + int err= mysql_cond_timedwait(&thd->COND_wakeup_ready, + &LOCK_gtid_waiting, wait_until); + if (err == ETIMEDOUT || err == ETIME) + timed_out= true; + } + else + mysql_cond_wait(&thd->COND_wakeup_ready, &LOCK_gtid_waiting); + thd_wait_end(thd); + if (elem.do_small_wait || timed_out) + break; } - else - mysql_cond_wait(&thd->COND_wakeup_ready, &LOCK_gtid_waiting); - thd_wait_end(thd); } - if (thd->killed || timed_out) + if ((thd->killed || timed_out) && !elem.done) + { + /* Aborted, so remove ourselves from the hash. */ + remove_from_wait_queue(he, &elem); + elem.done= true; + } + if (elem.done) { - remove_from_wait_hash(he, &elem); /* - If we got kill/timeout _and_ we were asked to takeover the small wait, - we need to pass on that task to someone else. + If our wait is done, but we have (or were passed) responsibility for + the small wait, then we need to pass on that task to someone else. */ - if (thd->wakeup_ready && elem.wakeup_reason == queue_element::TAKEOVER) + if (elem.do_small_wait) promote_new_waiter(he); - if (thd->killed) - thd->send_kill_message(); break; } - - if (elem.wakeup_reason == queue_element::DONE) - break; - takeover= true; } if (did_enter_cond) thd->exit_cond(old_msg); else mysql_mutex_unlock(&LOCK_gtid_waiting); + if (thd->killed) + thd->send_kill_message(); #endif /* HAVE_REPLICATION */ return timed_out ? -1 : 0; } @@ -2060,32 +2091,28 @@ gtid_waiting::get_entry(uint32 domain_id) } -gtid_waiting::hash_element * -gtid_waiting::register_in_wait_hash(THD *thd, rpl_gtid *wait_gtid, - gtid_waiting::queue_element *elem) +int +gtid_waiting::register_in_wait_queue(THD *thd, rpl_gtid *wait_gtid, + gtid_waiting::hash_element *he, + gtid_waiting::queue_element *elem) { - hash_element *e; - mysql_mutex_assert_owner(&LOCK_gtid_waiting); - if (!(e= get_entry(wait_gtid->domain_id))) - return NULL; - - if (queue_insert_safe(&e->queue, (uchar *)elem)) + if (queue_insert_safe(&he->queue, (uchar *)elem)) { my_error(ER_OUT_OF_RESOURCES, MYF(0)); - return NULL; + return 1; } - return e; + return 0; } void -gtid_waiting::remove_from_wait_hash(gtid_waiting::hash_element *e, - gtid_waiting::queue_element *elem) +gtid_waiting::remove_from_wait_queue(gtid_waiting::hash_element *he, + gtid_waiting::queue_element *elem) { mysql_mutex_assert_owner(&LOCK_gtid_waiting); - queue_remove(&e->queue, elem->queue_idx); + queue_remove(&he->queue, elem->queue_idx); } diff --git a/sql/rpl_gtid.h b/sql/rpl_gtid.h index 1cf57c45018..54f352661a7 100644 --- a/sql/rpl_gtid.h +++ b/sql/rpl_gtid.h @@ -41,6 +41,57 @@ enum enum_gtid_skip_type { /* + Structure to keep track of threads waiting in MASTER_GTID_WAIT(). + + Since replication is (mostly) single-threaded, we want to minimise the + performance impact on that from MASTER_GTID_WAIT(). To achieve this, we + are careful to keep the common lock between replication threads and + MASTER_GTID_WAIT threads held for as short as possible. We keep only + a single thread waiting to be notified by the replication threads; this + thread then handles all the (potentially heavy) lifting of dealing with + all current waiting threads. +*/ +struct gtid_waiting { + /* Elements in the hash, basically a priority queue for each domain. */ + struct hash_element { + QUEUE queue; + uint32 domain_id; + }; + /* A priority queue to handle waiters in one domain in seq_no order. */ + struct queue_element { + uint64 wait_seq_no; + THD *thd; + int queue_idx; + /* + do_small_wait is true if we have responsibility for ensuring that there + is a small waiter. + */ + bool do_small_wait; + /* + The flag `done' is set when the wait is completed (either due to reaching + the position waited for, or due to timeout or kill). The queue_element + is in the queue if and only if `done' is true. + */ + bool done; + }; + + mysql_mutex_t LOCK_gtid_waiting; + HASH hash; + + void init(); + void destroy(); + hash_element *get_entry(uint32 domain_id); + int wait_for_pos(THD *thd, String *gtid_str, longlong timeout_us); + void promote_new_waiter(gtid_waiting::hash_element *he); + int wait_for_gtid(THD *thd, rpl_gtid *wait_gtid, struct timespec *wait_until); + void process_wait_hash(uint64 wakeup_seq_no, gtid_waiting::hash_element *he); + int register_in_wait_queue(THD *thd, rpl_gtid *wait_gtid, hash_element *he, + queue_element *elem); + void remove_from_wait_queue(hash_element *he, queue_element *elem); +}; + + +/* Replication slave state. For every independent replication stream (identified by domain_id), this @@ -68,9 +119,14 @@ struct rpl_slave_state /* Highest seq_no seen so far in this domain. */ uint64 highest_seq_no; /* - If min_wait_seq_no is non-zero, then it is the smallest seq_no in this - domain that someone is doing MASTER_GTID_WAIT() on. When we reach this - seq_no, we need to signal the waiter on COND_wait_gtid. + If this is non-NULL, then it is the waiter responsible for the small + wait in MASTER_GTID_WAIT(). + */ + gtid_waiting::queue_element *gtid_waiter; + /* + If gtid_waiter is non-NULL, then this is the seq_no that its + MASTER_GTID_WAIT() is waiting on. When we reach this seq_no, we need to + signal the waiter on COND_wait_gtid. */ uint64 min_wait_seq_no; mysql_cond_t COND_wait_gtid; @@ -215,48 +271,6 @@ struct slave_connection_state }; -/* - Structure to keep track of threads waiting in MASTER_GTID_WAIT(). - - Since replication is (mostly) single-threaded, we want to minimise the - performance impact on that from MASTER_GTID_WAIT(). To achieve this, we - are careful to keep the common lock between replication threads and - MASTER_GTID_WAIT threads held for as short as possible. We keep only - a single thread waiting to be notified by the replication threads; this - thread then handles all the (potentially heavy) lifting of dealing with - all current waiting threads. -*/ - -struct gtid_waiting { - /* Elements in the hash, basically a priority queue for each domain. */ - struct hash_element { - QUEUE queue; - uint32 domain_id; - }; - /* A priority queue to handle waiters in one domain in seq_no order. */ - struct queue_element { - uint64 wait_seq_no; - THD *thd; - int queue_idx; - enum { DONE, TAKEOVER } wakeup_reason; - }; - - mysql_mutex_t LOCK_gtid_waiting; - HASH hash; - - void init(); - void destroy(); - hash_element *get_entry(uint32 domain_id); - int wait_for_pos(THD *thd, String *gtid_str, longlong timeout_us); - void promote_new_waiter(gtid_waiting::hash_element *he); - int wait_for_gtid(THD *thd, rpl_gtid *wait_gtid, struct timespec *wait_until); - void process_wait_hash(uint64 wakeup_seq_no, gtid_waiting::hash_element *he); - hash_element *register_in_wait_hash(THD *thd, rpl_gtid *wait_gtid, - queue_element *elem); - void remove_from_wait_hash(hash_element *e, queue_element *elem); -}; - - extern bool rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid, bool *first); extern int gtid_check_rpl_slave_state_table(TABLE *table); |