summaryrefslogtreecommitdiff
path: root/sql/rpl_gtid.cc
diff options
context:
space:
mode:
authorunknown <knielsen@knielsen-hq.org>2014-02-10 12:39:26 +0100
committerunknown <knielsen@knielsen-hq.org>2014-02-10 12:39:26 +0100
commitfefdb576bb6b49adcff914f909429781452dd7bf (patch)
tree9a9e5f5f6e796e4e621eb593ed7992be57aa3683 /sql/rpl_gtid.cc
parent8cc6e90d74f4377491bcb7a0f1acd41ccf9fbcae (diff)
parent4a976545e518e70d5124e356a057c8d9624038a3 (diff)
downloadmariadb-git-fefdb576bb6b49adcff914f909429781452dd7bf.tar.gz
Merge of MDEV-4984, MDEV-4726, and MDEV-5636 into 10.0-base.
MDEV-4984: Implement MASTER_GTID_WAIT() and @@LAST_GTID. MDEV-4726: Race in mysql-test/suite/rpl/t/rpl_gtid_stop_start.test MDEV-5636: Deadlock in RESET MASTER
Diffstat (limited to 'sql/rpl_gtid.cc')
-rw-r--r--sql/rpl_gtid.cc507
1 files changed, 488 insertions, 19 deletions
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc
index 3f79a0cb528..d7923ed9130 100644
--- a/sql/rpl_gtid.cc
+++ b/sql/rpl_gtid.cc
@@ -43,9 +43,9 @@ rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid)
there will not be an attempt to delete the corresponding table row before
it is even committed.
*/
- lock();
+ mysql_mutex_lock(&LOCK_slave_state);
err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no);
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
if (err)
{
sql_print_warning("Slave: Out of memory during slave state maintenance. "
@@ -82,11 +82,20 @@ rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi)
}
+static void
+rpl_slave_state_free_element(void *arg)
+{
+ struct rpl_slave_state::element *elem= (struct rpl_slave_state::element *)arg;
+ mysql_cond_destroy(&elem->COND_wait_gtid);
+ my_free(elem);
+}
+
+
rpl_slave_state::rpl_slave_state()
: last_sub_id(0), inited(false), loaded(false)
{
my_hash_init(&hash, &my_charset_bin, 32, offsetof(element, domain_id),
- sizeof(uint32), NULL, my_free, HASH_UNIQUE);
+ sizeof(uint32), NULL, rpl_slave_state_free_element, HASH_UNIQUE);
}
@@ -146,6 +155,21 @@ rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
if (!(elem= get_element(domain_id)))
return 1;
+ if (seq_no > elem->highest_seq_no)
+ elem->highest_seq_no= seq_no;
+ if (elem->gtid_waiter && elem->min_wait_seq_no <= seq_no)
+ {
+ /*
+ Someone was waiting in MASTER_GTID_WAIT() for this GTID to appear.
+ Signal (and remove) them. The waiter will handle all the processing
+ of all pending MASTER_GTID_WAIT(), so we do not slow down the
+ replication SQL thread.
+ */
+ mysql_mutex_assert_owner(&LOCK_slave_state);
+ elem->gtid_waiter= NULL;
+ mysql_cond_broadcast(&elem->COND_wait_gtid);
+ }
+
if (!(list_elem= (list_element *)my_malloc(sizeof(*list_elem), MYF(MY_WME))))
return 1;
list_elem->server_id= server_id;
@@ -173,6 +197,9 @@ rpl_slave_state::get_element(uint32 domain_id)
return NULL;
elem->list= NULL;
elem->domain_id= domain_id;
+ elem->highest_seq_no= 0;
+ elem->gtid_waiter= NULL;
+ mysql_cond_init(key_COND_wait_gtid, &elem->COND_wait_gtid, 0);
if (my_hash_insert(&hash, (uchar *)elem))
{
my_free(elem);
@@ -378,10 +405,10 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
goto end;
}
- lock();
+ mysql_mutex_lock(&LOCK_slave_state);
if ((elem= get_element(gtid->domain_id)) == NULL)
{
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
my_error(ER_OUT_OF_RESOURCES, MYF(0));
err= 1;
goto end;
@@ -410,7 +437,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
cur->next= NULL;
elem->list= cur;
}
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
if (!elist)
goto end;
@@ -470,9 +497,9 @@ end:
*/
if (elist)
{
- lock();
+ mysql_mutex_lock(&LOCK_slave_state);
put_back_list(gtid->domain_id, elist);
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
}
ha_rollback_trans(thd, FALSE);
@@ -499,9 +526,9 @@ rpl_slave_state::next_sub_id(uint32 domain_id)
{
uint64 sub_id= 0;
- lock();
+ mysql_mutex_lock(&LOCK_slave_state);
sub_id= ++last_sub_id;
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
return sub_id;
}
@@ -541,7 +568,7 @@ rpl_slave_state::iterate(int (*cb)(rpl_gtid *, void *), void *data,
my_hash_insert(&gtid_hash, (uchar *)(&extra_gtids[i])))
goto err;
- lock();
+ mysql_mutex_lock(&LOCK_slave_state);
for (i= 0; i < hash.records; ++i)
{
@@ -576,19 +603,19 @@ rpl_slave_state::iterate(int (*cb)(rpl_gtid *, void *), void *data,
memcpy(&best_gtid, gtid, sizeof(best_gtid));
if (my_hash_delete(&gtid_hash, rec))
{
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
goto err;
}
}
if ((res= (*cb)(&best_gtid, data)))
{
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
goto err;
}
}
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
/* Also add any remaining extra domain_ids. */
for (i= 0; i < gtid_hash.records; ++i)
@@ -659,11 +686,11 @@ rpl_slave_state::domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid)
list_element *list;
uint64 best_sub_id;
- lock();
+ mysql_mutex_lock(&LOCK_slave_state);
elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0);
if (!elem || !(list= elem->list))
{
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
return false;
}
@@ -681,7 +708,7 @@ rpl_slave_state::domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid)
out_gtid->seq_no= list->seq_no;
}
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
return true;
}
@@ -811,7 +838,7 @@ rpl_slave_state::is_empty()
uint32 i;
bool result= true;
- lock();
+ mysql_mutex_lock(&LOCK_slave_state);
for (i= 0; i < hash.records; ++i)
{
element *e= (element *)my_hash_element(&hash, i);
@@ -821,7 +848,7 @@ rpl_slave_state::is_empty()
break;
}
}
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
return result;
}
@@ -1647,3 +1674,445 @@ slave_connection_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size)
return 0;
}
+
+
+/*
+ Execute a MASTER_GTID_WAIT().
+ The position to wait for is in gtid_str in string form.
+ The timeout in microseconds is in timeout_us, zero means no timeout.
+
+ Returns:
+ 1 for error.
+ 0 for wait completed.
+ -1 for wait timed out.
+*/
+int
+gtid_waiting::wait_for_pos(THD *thd, String *gtid_str, longlong timeout_us)
+{
+ int err;
+ rpl_gtid *wait_pos;
+ uint32 count, i;
+ struct timespec wait_until, *wait_until_ptr;
+
+ /* Wait for the empty position returns immediately. */
+ if (gtid_str->length() == 0)
+ return 0;
+
+ if (!(wait_pos= gtid_parse_string_to_list(gtid_str->ptr(), gtid_str->length(),
+ &count)))
+ {
+ my_error(ER_INCORRECT_GTID_STATE, MYF(0));
+ return 1;
+ }
+
+ if (timeout_us >= 0)
+ {
+ set_timespec_nsec(wait_until, (ulonglong)1000*timeout_us);
+ wait_until_ptr= &wait_until;
+ }
+ else
+ wait_until_ptr= NULL;
+ err= 0;
+ for (i= 0; i < count; ++i)
+ {
+ if ((err= wait_for_gtid(thd, &wait_pos[i], wait_until_ptr)))
+ break;
+ }
+ my_free(wait_pos);
+ return err;
+}
+
+
+void
+gtid_waiting::promote_new_waiter(gtid_waiting::hash_element *he)
+{
+ queue_element *qe;
+
+ mysql_mutex_assert_owner(&LOCK_gtid_waiting);
+ if (queue_empty(&he->queue))
+ return;
+ qe= (queue_element *)queue_top(&he->queue);
+ qe->do_small_wait= true;
+ mysql_cond_signal(&qe->thd->COND_wakeup_ready);
+}
+
+void
+gtid_waiting::process_wait_hash(uint64 wakeup_seq_no,
+ gtid_waiting::hash_element *he)
+{
+ mysql_mutex_assert_owner(&LOCK_gtid_waiting);
+
+ for (;;)
+ {
+ queue_element *qe;
+
+ if (queue_empty(&he->queue))
+ break;
+ qe= (queue_element *)queue_top(&he->queue);
+ if (qe->wait_seq_no > wakeup_seq_no)
+ break;
+ DBUG_ASSERT(!qe->done);
+ queue_remove_top(&he->queue);
+ qe->done= true;;
+ mysql_cond_signal(&qe->thd->COND_wakeup_ready);
+ }
+}
+
+
+/*
+ Execute a MASTER_GTID_WAIT() for one specific domain.
+
+ The implementation is optimised primarily for (1) minimal performance impact
+ on the slave replication threads, and secondarily for (2) quick performance
+ of MASTER_GTID_WAIT() on a single GTID, which can be useful for consistent
+ read to clients in an async replication read-scaleout scenario.
+
+ To achieve (1), we have a "small" wait and a "large" wait. The small wait
+ contends with the replication threads on the lock on the gtid_slave_pos, so
+ only minimal processing is done under that lock, and only a single waiter at
+ a time does the small wait.
+
+ If there is already a small waiter, a new thread will either replace the
+ small waiter (if it needs to wait for an earlier sequence number), or
+ instead do a "large" wait.
+
+ Once awoken on the small wait, the waiting thread releases the lock shared
+ with the SQL threads quickly, and then processes all waiters currently doing
+ the large wait using a different lock that does not impact replication.
+
+ This way, the SQL threads only need to do a single check + possibly a
+ pthread_cond_signal() when updating the gtid_slave_state, and the time that
+ non-SQL threads contend for the lock on gtid_slave_state is minimized.
+
+ There is always at least one thread that has the responsibility to ensure
+ that there is a small waiter; this thread has queue_element::do_small_wait
+ set to true. This thread will do the small wait until it is done, at which
+ point it will make sure to pass on the responsibility to another thread.
+ Normally only one thread has do_small_wait==true, but it can occasionally
+ happen that there is more than one, when threads race one another for the
+ lock on the small wait (this results in slightly increased activity on the
+ small lock but is otherwise harmless).
+
+ Returns:
+ 0 Wait completed normally
+ -1 Wait completed due to timeout
+ 1 An error (my_error() will have been called to set the error in the da)
+*/
+int
+gtid_waiting::wait_for_gtid(THD *thd, rpl_gtid *wait_gtid,
+ struct timespec *wait_until)
+{
+ bool timed_out= false;
+#ifdef HAVE_REPLICATION
+ queue_element elem;
+ uint32 domain_id= wait_gtid->domain_id;
+ uint64 seq_no= wait_gtid->seq_no;
+ hash_element *he;
+ rpl_slave_state::element *slave_state_elem= NULL;
+ const char *old_msg= NULL;
+ bool did_enter_cond= false;
+
+ elem.wait_seq_no= seq_no;
+ elem.thd= thd;
+ elem.done= false;
+
+ mysql_mutex_lock(&LOCK_gtid_waiting);
+ if (!(he= get_entry(wait_gtid->domain_id)))
+ {
+ mysql_mutex_unlock(&LOCK_gtid_waiting);
+ return 1;
+ }
+ /*
+ If there is already another waiter with seq_no no larger than our own,
+ we are sure that there is already a small waiter that will wake us up
+ (or later pass the small wait responsibility to us). So in this case, we
+ do not need to touch the small wait lock at all.
+ */
+ elem.do_small_wait=
+ (queue_empty(&he->queue) ||
+ ((queue_element *)queue_top(&he->queue))->wait_seq_no > seq_no);
+
+ if (register_in_wait_queue(thd, wait_gtid, he, &elem))
+ {
+ mysql_mutex_unlock(&LOCK_gtid_waiting);
+ return 1;
+ }
+ /*
+ Loop, doing either the small or large wait as appropriate, until either
+ the position waited for is reached, or we get a kill or timeout.
+ */
+ for (;;)
+ {
+ mysql_mutex_assert_owner(&LOCK_gtid_waiting);
+
+ if (elem.do_small_wait)
+ {
+ uint64 wakeup_seq_no;
+ queue_element *cur_waiter;
+
+ mysql_mutex_lock(&rpl_global_gtid_slave_state.LOCK_slave_state);
+ /*
+ The elements in the gtid_slave_state_hash are never re-allocated once
+ they enter the hash, so we do not need to re-do the lookup after releasing
+ and re-aquiring the lock.
+ */
+ if (!slave_state_elem &&
+ !(slave_state_elem= rpl_global_gtid_slave_state.get_element(domain_id)))
+ {
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
+ remove_from_wait_queue(he, &elem);
+ promote_new_waiter(he);
+ if (did_enter_cond)
+ thd->exit_cond(old_msg);
+ else
+ mysql_mutex_unlock(&LOCK_gtid_waiting);
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ return 1;
+ }
+
+ if ((wakeup_seq_no= slave_state_elem->highest_seq_no) >= seq_no)
+ {
+ /*
+ We do not have to wait. (We will be removed from the wait queue when
+ we call process_wait_hash() below.
+ */
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
+ }
+ else if ((cur_waiter= slave_state_elem->gtid_waiter) &&
+ slave_state_elem->min_wait_seq_no <= seq_no)
+ {
+ /*
+ There is already a suitable small waiter, go do the large wait.
+ (Normally we would not have needed to check the small wait in this
+ case, but it can happen if we race with another thread for the small
+ lock).
+ */
+ elem.do_small_wait= false;
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
+ }
+ else
+ {
+ /*
+ We have to do the small wait ourselves (stealing it from any thread
+ that might already be waiting for a later seq_no).
+ */
+ slave_state_elem->gtid_waiter= &elem;
+ slave_state_elem->min_wait_seq_no= seq_no;
+ if (cur_waiter)
+ {
+ /* We stole the wait, so wake up the old waiting thread. */
+ mysql_cond_signal(&slave_state_elem->COND_wait_gtid);
+ }
+
+ /* Release the large lock, and do the small wait. */
+ if (did_enter_cond)
+ {
+ thd->exit_cond(old_msg);
+ did_enter_cond= false;
+ }
+ else
+ mysql_mutex_unlock(&LOCK_gtid_waiting);
+ old_msg=
+ thd->enter_cond(&slave_state_elem->COND_wait_gtid,
+ &rpl_global_gtid_slave_state.LOCK_slave_state,
+ "Waiting in MASTER_GTID_WAIT() (primary waiter)");
+ do
+ {
+ if (thd->check_killed())
+ break;
+ else if (wait_until)
+ {
+ int err=
+ mysql_cond_timedwait(&slave_state_elem->COND_wait_gtid,
+ &rpl_global_gtid_slave_state.LOCK_slave_state,
+ wait_until);
+ if (err == ETIMEDOUT || err == ETIME)
+ {
+ timed_out= true;
+ break;
+ }
+ }
+ else
+ mysql_cond_wait(&slave_state_elem->COND_wait_gtid,
+ &rpl_global_gtid_slave_state.LOCK_slave_state);
+ } while (slave_state_elem->gtid_waiter == &elem);
+ wakeup_seq_no= slave_state_elem->highest_seq_no;
+ /*
+ If we aborted due to timeout or kill, remove us as waiter.
+
+ If we were replaced by another waiter with a smaller seq_no, then we
+ no longer have responsibility for the small wait.
+ */
+ if ((cur_waiter= slave_state_elem->gtid_waiter))
+ {
+ if (cur_waiter == &elem)
+ slave_state_elem->gtid_waiter= NULL;
+ else if (slave_state_elem->min_wait_seq_no <= seq_no)
+ elem.do_small_wait= false;
+ }
+ thd->exit_cond(old_msg);
+
+ mysql_mutex_lock(&LOCK_gtid_waiting);
+ }
+
+ /*
+ Note that hash_entry pointers do not change once allocated, so we do
+ not need to lookup `he' again after re-aquiring LOCK_gtid_waiting.
+ */
+ process_wait_hash(wakeup_seq_no, he);
+ }
+ else
+ {
+ /* Do the large wait. */
+ if (!did_enter_cond)
+ {
+ old_msg= thd->enter_cond(&thd->COND_wakeup_ready, &LOCK_gtid_waiting,
+ "Waiting in MASTER_GTID_WAIT()");
+ did_enter_cond= true;
+ }
+ while (!elem.done && !thd->check_killed())
+ {
+ thd_wait_begin(thd, THD_WAIT_BINLOG);
+ if (wait_until)
+ {
+ int err= mysql_cond_timedwait(&thd->COND_wakeup_ready,
+ &LOCK_gtid_waiting, wait_until);
+ if (err == ETIMEDOUT || err == ETIME)
+ timed_out= true;
+ }
+ else
+ mysql_cond_wait(&thd->COND_wakeup_ready, &LOCK_gtid_waiting);
+ thd_wait_end(thd);
+ if (elem.do_small_wait || timed_out)
+ break;
+ }
+ }
+
+ if ((thd->killed || timed_out) && !elem.done)
+ {
+ /* Aborted, so remove ourselves from the hash. */
+ remove_from_wait_queue(he, &elem);
+ elem.done= true;
+ }
+ if (elem.done)
+ {
+ /*
+ If our wait is done, but we have (or were passed) responsibility for
+ the small wait, then we need to pass on that task to someone else.
+ */
+ if (elem.do_small_wait)
+ promote_new_waiter(he);
+ break;
+ }
+ }
+
+ if (did_enter_cond)
+ thd->exit_cond(old_msg);
+ else
+ mysql_mutex_unlock(&LOCK_gtid_waiting);
+ if (thd->killed)
+ thd->send_kill_message();
+#endif /* HAVE_REPLICATION */
+ return timed_out ? -1 : 0;
+}
+
+
+static void
+free_hash_element(void *p)
+{
+ gtid_waiting::hash_element *e= (gtid_waiting::hash_element *)p;
+ delete_queue(&e->queue);
+ my_free(e);
+}
+
+
+void
+gtid_waiting::init()
+{
+ my_hash_init(&hash, &my_charset_bin, 32,
+ offsetof(hash_element, domain_id), sizeof(uint32), NULL,
+ free_hash_element, HASH_UNIQUE);
+ mysql_mutex_init(key_LOCK_gtid_waiting, &LOCK_gtid_waiting, 0);
+}
+
+
+void
+gtid_waiting::destroy()
+{
+ mysql_mutex_destroy(&LOCK_gtid_waiting);
+ my_hash_free(&hash);
+}
+
+
+static int
+cmp_queue_elem(void *, uchar *a, uchar *b)
+{
+ uint64 seq_no_a= *(uint64 *)a;
+ uint64 seq_no_b= *(uint64 *)b;
+ if (seq_no_a < seq_no_b)
+ return -1;
+ else if (seq_no_a == seq_no_b)
+ return 0;
+ else
+ return 1;
+}
+
+
+gtid_waiting::hash_element *
+gtid_waiting::get_entry(uint32 domain_id)
+{
+ hash_element *e;
+
+ if ((e= (hash_element *)my_hash_search(&hash, (const uchar *)&domain_id, 0)))
+ return e;
+
+ if (!(e= (hash_element *)my_malloc(sizeof(*e), MYF(MY_WME))))
+ {
+ my_error(ER_OUTOFMEMORY, MYF(0), sizeof(*e));
+ return NULL;
+ }
+
+ if (init_queue(&e->queue, 8, offsetof(queue_element, wait_seq_no), 0,
+ cmp_queue_elem, NULL, 1+offsetof(queue_element, queue_idx), 1))
+ {
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ my_free(e);
+ return NULL;
+ }
+ e->domain_id= domain_id;
+ if (my_hash_insert(&hash, (uchar *)e))
+ {
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ delete_queue(&e->queue);
+ my_free(e);
+ return NULL;
+ }
+ return e;
+}
+
+
+int
+gtid_waiting::register_in_wait_queue(THD *thd, rpl_gtid *wait_gtid,
+ gtid_waiting::hash_element *he,
+ gtid_waiting::queue_element *elem)
+{
+ mysql_mutex_assert_owner(&LOCK_gtid_waiting);
+
+ if (queue_insert_safe(&he->queue, (uchar *)elem))
+ {
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ return 1;
+ }
+
+ return 0;
+}
+
+
+void
+gtid_waiting::remove_from_wait_queue(gtid_waiting::hash_element *he,
+ gtid_waiting::queue_element *elem)
+{
+ mysql_mutex_assert_owner(&LOCK_gtid_waiting);
+
+ queue_remove(&he->queue, elem->queue_idx);
+}