summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorunknown <knielsen@knielsen-hq.org>2014-02-08 22:28:41 +0100
committerunknown <knielsen@knielsen-hq.org>2014-02-08 22:28:41 +0100
commit76e929a92e6fb06b649ed9f199484b64946b6152 (patch)
tree4d11b143e928cfb6924e99298a3b9d2ff2aa013d /sql
parent3c97d24f74b8bfb72746b0a32a78193f20665c44 (diff)
downloadmariadb-git-76e929a92e6fb06b649ed9f199484b64946b6152.tar.gz
MDEV-4984: Implement MASTER_GTID_WAIT() and @@LAST_GTID.
Rewrite the gtid_waiting::wait_for_gtid() function. The code was rubbish (and buggy). Now the logic is much clearer. Also fix a missing slave sync that could cause test failure.
Diffstat (limited to 'sql')
-rw-r--r--sql/rpl_gtid.cc339
-rw-r--r--sql/rpl_gtid.h104
2 files changed, 242 insertions, 201 deletions
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc
index 2f2254d53af..d7923ed9130 100644
--- a/sql/rpl_gtid.cc
+++ b/sql/rpl_gtid.cc
@@ -157,7 +157,7 @@ rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
if (seq_no > elem->highest_seq_no)
elem->highest_seq_no= seq_no;
- if (elem->min_wait_seq_no != 0 && elem->min_wait_seq_no <= seq_no)
+ if (elem->gtid_waiter && elem->min_wait_seq_no <= seq_no)
{
/*
Someone was waiting in MASTER_GTID_WAIT() for this GTID to appear.
@@ -166,7 +166,7 @@ rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
replication SQL thread.
*/
mysql_mutex_assert_owner(&LOCK_slave_state);
- elem->min_wait_seq_no= 0;
+ elem->gtid_waiter= NULL;
mysql_cond_broadcast(&elem->COND_wait_gtid);
}
@@ -198,7 +198,7 @@ rpl_slave_state::get_element(uint32 domain_id)
elem->list= NULL;
elem->domain_id= domain_id;
elem->highest_seq_no= 0;
- elem->min_wait_seq_no= 0;
+ elem->gtid_waiter= NULL;
mysql_cond_init(key_COND_wait_gtid, &elem->COND_wait_gtid, 0);
if (my_hash_insert(&hash, (uchar *)elem))
{
@@ -1732,8 +1732,7 @@ gtid_waiting::promote_new_waiter(gtid_waiting::hash_element *he)
if (queue_empty(&he->queue))
return;
qe= (queue_element *)queue_top(&he->queue);
- qe->thd->wakeup_ready= true;
- qe->wakeup_reason= queue_element::TAKEOVER;
+ qe->do_small_wait= true;
mysql_cond_signal(&qe->thd->COND_wakeup_ready);
}
@@ -1747,14 +1746,14 @@ gtid_waiting::process_wait_hash(uint64 wakeup_seq_no,
{
queue_element *qe;
- if (queue_first_element(&he->queue) > queue_last_element(&he->queue))
+ if (queue_empty(&he->queue))
break;
qe= (queue_element *)queue_top(&he->queue);
if (qe->wait_seq_no > wakeup_seq_no)
break;
+ DBUG_ASSERT(!qe->done);
queue_remove_top(&he->queue);
- qe->thd->wakeup_ready= true;
- qe->wakeup_reason= queue_element::DONE;
+ qe->done= true;;
mysql_cond_signal(&qe->thd->COND_wakeup_ready);
}
}
@@ -1775,15 +1774,29 @@ gtid_waiting::process_wait_hash(uint64 wakeup_seq_no,
If there is already a small waiter, a new thread will either replace the
small waiter (if it needs to wait for an earlier sequence number), or
- instead to a "large" wait.
+ instead do a "large" wait.
Once awoken on the small wait, the waiting thread releases the lock shared
with the SQL threads quickly, and then processes all waiters currently doing
- the large wait.
+ the large wait using a different lock that does not impact replication.
This way, the SQL threads only need to do a single check + possibly a
pthread_cond_signal() when updating the gtid_slave_state, and the time that
- non-SQL threads contend for the lock on gtid_slave_staste is minimized.
+ non-SQL threads contend for the lock on gtid_slave_state is minimized.
+
+ There is always at least one thread that has the responsibility to ensure
+ that there is a small waiter; this thread has queue_element::do_small_wait
+ set to true. This thread will do the small wait until it is done, at which
+ point it will make sure to pass on the responsibility to another thread.
+ Normally only one thread has do_small_wait==true, but it can occasionally
+ happen that there is more than one, when threads race one another for the
+ lock on the small wait (this results in slightly increased activity on the
+ small lock but is otherwise harmless).
+
+ Returns:
+ 0 Wait completed normally
+ -1 Wait completed due to timeout
+ 1 An error (my_error() will have been called to set the error in the da)
*/
int
gtid_waiting::wait_for_gtid(THD *thd, rpl_gtid *wait_gtid,
@@ -1798,189 +1811,207 @@ gtid_waiting::wait_for_gtid(THD *thd, rpl_gtid *wait_gtid,
rpl_slave_state::element *slave_state_elem= NULL;
const char *old_msg= NULL;
bool did_enter_cond= false;
- bool takeover= false;
elem.wait_seq_no= seq_no;
elem.thd= thd;
- /*
- Register on the large wait before checking the small wait.
- This ensures that if we find another waiter already doing the small wait,
- we are sure to be woken up by that one, and thus we will not need to take
- the lock on the small wait more than once in this case.
- */
+ elem.done= false;
+
mysql_mutex_lock(&LOCK_gtid_waiting);
- if (!(he= register_in_wait_hash(thd, wait_gtid, &elem)))
+ if (!(he= get_entry(wait_gtid->domain_id)))
{
mysql_mutex_unlock(&LOCK_gtid_waiting);
return 1;
}
-
/*
- Now check the small wait, and either do the large wait or the small one,
- depending on whether there is already a suitable small waiter or not.
+ If there is already another waiter with seq_no no larger than our own,
+ we are sure that there is already a small waiter that will wake us up
+ (or later pass the small wait responsibility to us). So in this case, we
+ do not need to touch the small wait lock at all.
+ */
+ elem.do_small_wait=
+ (queue_empty(&he->queue) ||
+ ((queue_element *)queue_top(&he->queue))->wait_seq_no > seq_no);
- We may need to do this multiple times, as a previous small waiter may
- complete and pass the small wait on to us.
+ if (register_in_wait_queue(thd, wait_gtid, he, &elem))
+ {
+ mysql_mutex_unlock(&LOCK_gtid_waiting);
+ return 1;
+ }
+ /*
+ Loop, doing either the small or large wait as appropriate, until either
+ the position waited for is reached, or we get a kill or timeout.
*/
for (;;)
{
- uint64 wakeup_seq_no, cur_wait_seq_no;
-
mysql_mutex_assert_owner(&LOCK_gtid_waiting);
- mysql_mutex_lock(&rpl_global_gtid_slave_state.LOCK_slave_state);
- /*
- The elements in the gtid_slave_state_hash are never re-allocated once
- they enter the hash, so we do not need to re-do the lookup after releasing
- and re-aquiring the lock.
- */
- if (!slave_state_elem &&
- !(slave_state_elem= rpl_global_gtid_slave_state.get_element(domain_id)))
- {
- mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
- remove_from_wait_hash(he, &elem);
- mysql_mutex_unlock(&LOCK_gtid_waiting);
- my_error(ER_OUT_OF_RESOURCES, MYF(0));
- return 1;
- }
- if ((wakeup_seq_no= slave_state_elem->highest_seq_no) >= seq_no)
+ if (elem.do_small_wait)
{
+ uint64 wakeup_seq_no;
+ queue_element *cur_waiter;
+
+ mysql_mutex_lock(&rpl_global_gtid_slave_state.LOCK_slave_state);
/*
- We do not have to wait. But we might need to wakeup other threads on
- the large wait (can happen if we were woken up to take over the small
- wait, and SQL thread raced with us to reach the waited-for GTID.
- */
- mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
- thd->wakeup_ready= 0;
- process_wait_hash(wakeup_seq_no, he);
- /*
- Since we already checked wakeup_seq_no, we are sure that
- process_wait_hash() will mark us done.
+ The elements in the gtid_slave_state_hash are never re-allocated once
+ they enter the hash, so we do not need to re-do the lookup after releasing
+ and re-aquiring the lock.
*/
- DBUG_ASSERT(thd->wakeup_ready);
- if (thd->wakeup_ready)
+ if (!slave_state_elem &&
+ !(slave_state_elem= rpl_global_gtid_slave_state.get_element(domain_id)))
{
- if (takeover)
- promote_new_waiter(he);
- break;
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
+ remove_from_wait_queue(he, &elem);
+ promote_new_waiter(he);
+ if (did_enter_cond)
+ thd->exit_cond(old_msg);
+ else
+ mysql_mutex_unlock(&LOCK_gtid_waiting);
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ return 1;
}
- }
- else if ((cur_wait_seq_no= slave_state_elem->min_wait_seq_no) == 0 ||
- cur_wait_seq_no > seq_no)
- {
- /*
- We have to do the small wait ourselves (stealing it from any thread that
- might already be waiting for a later seq_no).
- */
- slave_state_elem->min_wait_seq_no= seq_no;
- if (cur_wait_seq_no != 0)
+
+ if ((wakeup_seq_no= slave_state_elem->highest_seq_no) >= seq_no)
{
- /* We stole the wait, so wake up the old waiting thread. */
- mysql_cond_signal(&slave_state_elem->COND_wait_gtid);
+ /*
+ We do not have to wait. (We will be removed from the wait queue when
+ we call process_wait_hash() below.
+ */
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
+ }
+ else if ((cur_waiter= slave_state_elem->gtid_waiter) &&
+ slave_state_elem->min_wait_seq_no <= seq_no)
+ {
+ /*
+ There is already a suitable small waiter, go do the large wait.
+ (Normally we would not have needed to check the small wait in this
+ case, but it can happen if we race with another thread for the small
+ lock).
+ */
+ elem.do_small_wait= false;
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
}
- /* Do the small wait. */
- if (did_enter_cond)
- thd->exit_cond(old_msg);
else
- mysql_mutex_unlock(&LOCK_gtid_waiting);
-
- old_msg= thd->enter_cond(&slave_state_elem->COND_wait_gtid,
- &rpl_global_gtid_slave_state.LOCK_slave_state,
- "Waiting in MASTER_GTID_WAIT() (primary waiter)");
- do
{
- if (thd->check_killed())
- slave_state_elem->min_wait_seq_no = 0;
- else if (wait_until)
+ /*
+ We have to do the small wait ourselves (stealing it from any thread
+ that might already be waiting for a later seq_no).
+ */
+ slave_state_elem->gtid_waiter= &elem;
+ slave_state_elem->min_wait_seq_no= seq_no;
+ if (cur_waiter)
{
- int err=
- mysql_cond_timedwait(&slave_state_elem->COND_wait_gtid,
- &rpl_global_gtid_slave_state.LOCK_slave_state,
- wait_until);
- if (err == ETIMEDOUT || err == ETIME)
+ /* We stole the wait, so wake up the old waiting thread. */
+ mysql_cond_signal(&slave_state_elem->COND_wait_gtid);
+ }
+
+ /* Release the large lock, and do the small wait. */
+ if (did_enter_cond)
+ {
+ thd->exit_cond(old_msg);
+ did_enter_cond= false;
+ }
+ else
+ mysql_mutex_unlock(&LOCK_gtid_waiting);
+ old_msg=
+ thd->enter_cond(&slave_state_elem->COND_wait_gtid,
+ &rpl_global_gtid_slave_state.LOCK_slave_state,
+ "Waiting in MASTER_GTID_WAIT() (primary waiter)");
+ do
+ {
+ if (thd->check_killed())
+ break;
+ else if (wait_until)
{
- timed_out= true;
- slave_state_elem->min_wait_seq_no = 0;
+ int err=
+ mysql_cond_timedwait(&slave_state_elem->COND_wait_gtid,
+ &rpl_global_gtid_slave_state.LOCK_slave_state,
+ wait_until);
+ if (err == ETIMEDOUT || err == ETIME)
+ {
+ timed_out= true;
+ break;
+ }
}
+ else
+ mysql_cond_wait(&slave_state_elem->COND_wait_gtid,
+ &rpl_global_gtid_slave_state.LOCK_slave_state);
+ } while (slave_state_elem->gtid_waiter == &elem);
+ wakeup_seq_no= slave_state_elem->highest_seq_no;
+ /*
+ If we aborted due to timeout or kill, remove us as waiter.
+
+ If we were replaced by another waiter with a smaller seq_no, then we
+ no longer have responsibility for the small wait.
+ */
+ if ((cur_waiter= slave_state_elem->gtid_waiter))
+ {
+ if (cur_waiter == &elem)
+ slave_state_elem->gtid_waiter= NULL;
+ else if (slave_state_elem->min_wait_seq_no <= seq_no)
+ elem.do_small_wait= false;
}
- else
- mysql_cond_wait(&slave_state_elem->COND_wait_gtid,
- &rpl_global_gtid_slave_state.LOCK_slave_state);
- } while (slave_state_elem->min_wait_seq_no == seq_no);
- /*
- Check the new gtid_slave_state. We could be woken up because our seq_no
- has been reached, or because someone else stole the small wait from us.
- (Or because of kill/timeout).
- */
- wakeup_seq_no= slave_state_elem->highest_seq_no;
+ thd->exit_cond(old_msg);
+
+ mysql_mutex_lock(&LOCK_gtid_waiting);
+ }
- thd->exit_cond(old_msg);
- mysql_mutex_lock(&LOCK_gtid_waiting);
/*
Note that hash_entry pointers do not change once allocated, so we do
- not need to lookup `he' again after re-aquiring the lock.
+ not need to lookup `he' again after re-aquiring LOCK_gtid_waiting.
*/
- thd->wakeup_ready= 0;
process_wait_hash(wakeup_seq_no, he);
- if (thd->wakeup_ready)
- promote_new_waiter(he);
- else if (thd->killed || timed_out)
- {
- remove_from_wait_hash(he, &elem);
- promote_new_waiter(he);
- if (thd->killed)
- thd->send_kill_message();
- break;
- }
}
else
{
- /* We have to do the large wait. */
- mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
- thd->wakeup_ready= 0;
- }
-
- takeover= false;
- old_msg= thd->enter_cond(&thd->COND_wakeup_ready, &LOCK_gtid_waiting,
- "Waiting in MASTER_GTID_WAIT()");
- while (!thd->wakeup_ready && !thd->check_killed() && !timed_out)
- {
- thd_wait_begin(thd, THD_WAIT_BINLOG);
- if (wait_until)
+ /* Do the large wait. */
+ if (!did_enter_cond)
{
- int err= mysql_cond_timedwait(&thd->COND_wakeup_ready,
- &LOCK_gtid_waiting, wait_until);
- if (err == ETIMEDOUT || err == ETIME)
- timed_out= true;
+ old_msg= thd->enter_cond(&thd->COND_wakeup_ready, &LOCK_gtid_waiting,
+ "Waiting in MASTER_GTID_WAIT()");
+ did_enter_cond= true;
+ }
+ while (!elem.done && !thd->check_killed())
+ {
+ thd_wait_begin(thd, THD_WAIT_BINLOG);
+ if (wait_until)
+ {
+ int err= mysql_cond_timedwait(&thd->COND_wakeup_ready,
+ &LOCK_gtid_waiting, wait_until);
+ if (err == ETIMEDOUT || err == ETIME)
+ timed_out= true;
+ }
+ else
+ mysql_cond_wait(&thd->COND_wakeup_ready, &LOCK_gtid_waiting);
+ thd_wait_end(thd);
+ if (elem.do_small_wait || timed_out)
+ break;
}
- else
- mysql_cond_wait(&thd->COND_wakeup_ready, &LOCK_gtid_waiting);
- thd_wait_end(thd);
}
- if (thd->killed || timed_out)
+ if ((thd->killed || timed_out) && !elem.done)
+ {
+ /* Aborted, so remove ourselves from the hash. */
+ remove_from_wait_queue(he, &elem);
+ elem.done= true;
+ }
+ if (elem.done)
{
- remove_from_wait_hash(he, &elem);
/*
- If we got kill/timeout _and_ we were asked to takeover the small wait,
- we need to pass on that task to someone else.
+ If our wait is done, but we have (or were passed) responsibility for
+ the small wait, then we need to pass on that task to someone else.
*/
- if (thd->wakeup_ready && elem.wakeup_reason == queue_element::TAKEOVER)
+ if (elem.do_small_wait)
promote_new_waiter(he);
- if (thd->killed)
- thd->send_kill_message();
break;
}
-
- if (elem.wakeup_reason == queue_element::DONE)
- break;
- takeover= true;
}
if (did_enter_cond)
thd->exit_cond(old_msg);
else
mysql_mutex_unlock(&LOCK_gtid_waiting);
+ if (thd->killed)
+ thd->send_kill_message();
#endif /* HAVE_REPLICATION */
return timed_out ? -1 : 0;
}
@@ -2060,32 +2091,28 @@ gtid_waiting::get_entry(uint32 domain_id)
}
-gtid_waiting::hash_element *
-gtid_waiting::register_in_wait_hash(THD *thd, rpl_gtid *wait_gtid,
- gtid_waiting::queue_element *elem)
+int
+gtid_waiting::register_in_wait_queue(THD *thd, rpl_gtid *wait_gtid,
+ gtid_waiting::hash_element *he,
+ gtid_waiting::queue_element *elem)
{
- hash_element *e;
-
mysql_mutex_assert_owner(&LOCK_gtid_waiting);
- if (!(e= get_entry(wait_gtid->domain_id)))
- return NULL;
-
- if (queue_insert_safe(&e->queue, (uchar *)elem))
+ if (queue_insert_safe(&he->queue, (uchar *)elem))
{
my_error(ER_OUT_OF_RESOURCES, MYF(0));
- return NULL;
+ return 1;
}
- return e;
+ return 0;
}
void
-gtid_waiting::remove_from_wait_hash(gtid_waiting::hash_element *e,
- gtid_waiting::queue_element *elem)
+gtid_waiting::remove_from_wait_queue(gtid_waiting::hash_element *he,
+ gtid_waiting::queue_element *elem)
{
mysql_mutex_assert_owner(&LOCK_gtid_waiting);
- queue_remove(&e->queue, elem->queue_idx);
+ queue_remove(&he->queue, elem->queue_idx);
}
diff --git a/sql/rpl_gtid.h b/sql/rpl_gtid.h
index 1cf57c45018..54f352661a7 100644
--- a/sql/rpl_gtid.h
+++ b/sql/rpl_gtid.h
@@ -41,6 +41,57 @@ enum enum_gtid_skip_type {
/*
+ Structure to keep track of threads waiting in MASTER_GTID_WAIT().
+
+ Since replication is (mostly) single-threaded, we want to minimise the
+ performance impact on that from MASTER_GTID_WAIT(). To achieve this, we
+ are careful to keep the common lock between replication threads and
+ MASTER_GTID_WAIT threads held for as short as possible. We keep only
+ a single thread waiting to be notified by the replication threads; this
+ thread then handles all the (potentially heavy) lifting of dealing with
+ all current waiting threads.
+*/
+struct gtid_waiting {
+ /* Elements in the hash, basically a priority queue for each domain. */
+ struct hash_element {
+ QUEUE queue;
+ uint32 domain_id;
+ };
+ /* A priority queue to handle waiters in one domain in seq_no order. */
+ struct queue_element {
+ uint64 wait_seq_no;
+ THD *thd;
+ int queue_idx;
+ /*
+ do_small_wait is true if we have responsibility for ensuring that there
+ is a small waiter.
+ */
+ bool do_small_wait;
+ /*
+ The flag `done' is set when the wait is completed (either due to reaching
+ the position waited for, or due to timeout or kill). The queue_element
+ is in the queue if and only if `done' is true.
+ */
+ bool done;
+ };
+
+ mysql_mutex_t LOCK_gtid_waiting;
+ HASH hash;
+
+ void init();
+ void destroy();
+ hash_element *get_entry(uint32 domain_id);
+ int wait_for_pos(THD *thd, String *gtid_str, longlong timeout_us);
+ void promote_new_waiter(gtid_waiting::hash_element *he);
+ int wait_for_gtid(THD *thd, rpl_gtid *wait_gtid, struct timespec *wait_until);
+ void process_wait_hash(uint64 wakeup_seq_no, gtid_waiting::hash_element *he);
+ int register_in_wait_queue(THD *thd, rpl_gtid *wait_gtid, hash_element *he,
+ queue_element *elem);
+ void remove_from_wait_queue(hash_element *he, queue_element *elem);
+};
+
+
+/*
Replication slave state.
For every independent replication stream (identified by domain_id), this
@@ -68,9 +119,14 @@ struct rpl_slave_state
/* Highest seq_no seen so far in this domain. */
uint64 highest_seq_no;
/*
- If min_wait_seq_no is non-zero, then it is the smallest seq_no in this
- domain that someone is doing MASTER_GTID_WAIT() on. When we reach this
- seq_no, we need to signal the waiter on COND_wait_gtid.
+ If this is non-NULL, then it is the waiter responsible for the small
+ wait in MASTER_GTID_WAIT().
+ */
+ gtid_waiting::queue_element *gtid_waiter;
+ /*
+ If gtid_waiter is non-NULL, then this is the seq_no that its
+ MASTER_GTID_WAIT() is waiting on. When we reach this seq_no, we need to
+ signal the waiter on COND_wait_gtid.
*/
uint64 min_wait_seq_no;
mysql_cond_t COND_wait_gtid;
@@ -215,48 +271,6 @@ struct slave_connection_state
};
-/*
- Structure to keep track of threads waiting in MASTER_GTID_WAIT().
-
- Since replication is (mostly) single-threaded, we want to minimise the
- performance impact on that from MASTER_GTID_WAIT(). To achieve this, we
- are careful to keep the common lock between replication threads and
- MASTER_GTID_WAIT threads held for as short as possible. We keep only
- a single thread waiting to be notified by the replication threads; this
- thread then handles all the (potentially heavy) lifting of dealing with
- all current waiting threads.
-*/
-
-struct gtid_waiting {
- /* Elements in the hash, basically a priority queue for each domain. */
- struct hash_element {
- QUEUE queue;
- uint32 domain_id;
- };
- /* A priority queue to handle waiters in one domain in seq_no order. */
- struct queue_element {
- uint64 wait_seq_no;
- THD *thd;
- int queue_idx;
- enum { DONE, TAKEOVER } wakeup_reason;
- };
-
- mysql_mutex_t LOCK_gtid_waiting;
- HASH hash;
-
- void init();
- void destroy();
- hash_element *get_entry(uint32 domain_id);
- int wait_for_pos(THD *thd, String *gtid_str, longlong timeout_us);
- void promote_new_waiter(gtid_waiting::hash_element *he);
- int wait_for_gtid(THD *thd, rpl_gtid *wait_gtid, struct timespec *wait_until);
- void process_wait_hash(uint64 wakeup_seq_no, gtid_waiting::hash_element *he);
- hash_element *register_in_wait_hash(THD *thd, rpl_gtid *wait_gtid,
- queue_element *elem);
- void remove_from_wait_hash(hash_element *e, queue_element *elem);
-};
-
-
extern bool rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid,
bool *first);
extern int gtid_check_rpl_slave_state_table(TABLE *table);