Diffstat (limited to 'sql/rpl_gtid.cc')
-rw-r--r--  sql/rpl_gtid.cc  480
1 files changed, 461 insertions, 19 deletions
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc
index 3f79a0cb528..00140fd3475 100644
--- a/sql/rpl_gtid.cc
+++ b/sql/rpl_gtid.cc
@@ -43,9 +43,9 @@ rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid)
there will not be an attempt to delete the corresponding table row before
it is even committed.
*/
- lock();
+ mysql_mutex_lock(&LOCK_slave_state);
err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no);
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
if (err)
{
sql_print_warning("Slave: Out of memory during slave state maintenance. "
@@ -82,11 +82,20 @@ rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi)
}
+static void
+rpl_slave_state_free_element(void *arg)
+{
+ struct rpl_slave_state::element *elem= (struct rpl_slave_state::element *)arg;
+ mysql_cond_destroy(&elem->COND_wait_gtid);
+ my_free(elem);
+}
+
+
rpl_slave_state::rpl_slave_state()
: last_sub_id(0), inited(false), loaded(false)
{
my_hash_init(&hash, &my_charset_bin, 32, offsetof(element, domain_id),
- sizeof(uint32), NULL, my_free, HASH_UNIQUE);
+ sizeof(uint32), NULL, rpl_slave_state_free_element, HASH_UNIQUE);
}
@@ -146,6 +155,21 @@ rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
if (!(elem= get_element(domain_id)))
return 1;
+ if (seq_no > elem->highest_seq_no)
+ elem->highest_seq_no= seq_no;
+ if (elem->min_wait_seq_no != 0 && elem->min_wait_seq_no <= seq_no)
+ {
+ /*
+ Someone was waiting in MASTER_GTID_WAIT() for this GTID to appear.
+ Signal (and remove) them. The woken waiter will handle the processing
+ of all pending MASTER_GTID_WAIT() calls, so we do not slow down the
+ replication SQL thread.
+ */
+ mysql_mutex_assert_owner(&LOCK_slave_state);
+ elem->min_wait_seq_no= 0;
+ mysql_cond_broadcast(&elem->COND_wait_gtid);
+ }
+
if (!(list_elem= (list_element *)my_malloc(sizeof(*list_elem), MYF(MY_WME))))
return 1;
list_elem->server_id= server_id;
@@ -173,6 +197,9 @@ rpl_slave_state::get_element(uint32 domain_id)
return NULL;
elem->list= NULL;
elem->domain_id= domain_id;
+ elem->highest_seq_no= 0;
+ elem->min_wait_seq_no= 0;
+ mysql_cond_init(key_COND_wait_gtid, &elem->COND_wait_gtid, 0);
if (my_hash_insert(&hash, (uchar *)elem))
{
my_free(elem);
@@ -378,10 +405,10 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
goto end;
}
- lock();
+ mysql_mutex_lock(&LOCK_slave_state);
if ((elem= get_element(gtid->domain_id)) == NULL)
{
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
my_error(ER_OUT_OF_RESOURCES, MYF(0));
err= 1;
goto end;
@@ -410,7 +437,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
cur->next= NULL;
elem->list= cur;
}
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
if (!elist)
goto end;
@@ -470,9 +497,9 @@ end:
*/
if (elist)
{
- lock();
+ mysql_mutex_lock(&LOCK_slave_state);
put_back_list(gtid->domain_id, elist);
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
}
ha_rollback_trans(thd, FALSE);
@@ -499,9 +526,9 @@ rpl_slave_state::next_sub_id(uint32 domain_id)
{
uint64 sub_id= 0;
- lock();
+ mysql_mutex_lock(&LOCK_slave_state);
sub_id= ++last_sub_id;
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
return sub_id;
}
@@ -541,7 +568,7 @@ rpl_slave_state::iterate(int (*cb)(rpl_gtid *, void *), void *data,
my_hash_insert(&gtid_hash, (uchar *)(&extra_gtids[i])))
goto err;
- lock();
+ mysql_mutex_lock(&LOCK_slave_state);
for (i= 0; i < hash.records; ++i)
{
@@ -576,19 +603,19 @@ rpl_slave_state::iterate(int (*cb)(rpl_gtid *, void *), void *data,
memcpy(&best_gtid, gtid, sizeof(best_gtid));
if (my_hash_delete(&gtid_hash, rec))
{
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
goto err;
}
}
if ((res= (*cb)(&best_gtid, data)))
{
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
goto err;
}
}
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
/* Also add any remaining extra domain_ids. */
for (i= 0; i < gtid_hash.records; ++i)
@@ -659,11 +686,11 @@ rpl_slave_state::domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid)
list_element *list;
uint64 best_sub_id;
- lock();
+ mysql_mutex_lock(&LOCK_slave_state);
elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0);
if (!elem || !(list= elem->list))
{
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
return false;
}
@@ -681,7 +708,7 @@ rpl_slave_state::domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid)
out_gtid->seq_no= list->seq_no;
}
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
return true;
}
@@ -811,7 +838,7 @@ rpl_slave_state::is_empty()
uint32 i;
bool result= true;
- lock();
+ mysql_mutex_lock(&LOCK_slave_state);
for (i= 0; i < hash.records; ++i)
{
element *e= (element *)my_hash_element(&hash, i);
@@ -821,7 +848,7 @@ rpl_slave_state::is_empty()
break;
}
}
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
return result;
}
@@ -1647,3 +1674,418 @@ slave_connection_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size)
return 0;
}
+
+
+/*
+ Execute a MASTER_GTID_WAIT().
+ The position to wait for is in gtid_str in string form.
+ The timeout in microseconds is in timeout_us; a negative value means no timeout.
+
+ Returns:
+ 1 for error.
+ 0 for wait completed.
+ -1 for wait timed out.
+*/
+int
+gtid_waiting::wait_for_pos(THD *thd, String *gtid_str, longlong timeout_us)
+{
+ int err;
+ rpl_gtid *wait_pos;
+ uint32 count, i;
+ struct timespec wait_until, *wait_until_ptr;
+
+ /* A wait for the empty position returns immediately. */
+ if (gtid_str->length() == 0)
+ return 0;
+
+ if (!(wait_pos= gtid_parse_string_to_list(gtid_str->ptr(), gtid_str->length(),
+ &count)))
+ {
+ my_error(ER_INCORRECT_GTID_STATE, MYF(0));
+ return 1;
+ }
+
+ if (timeout_us >= 0)
+ {
+ set_timespec_nsec(wait_until, (ulonglong)1000*timeout_us);
+ wait_until_ptr= &wait_until;
+ }
+ else
+ wait_until_ptr= NULL;
+ err= 0;
+ for (i= 0; i < count; ++i)
+ {
+ if ((err= wait_for_gtid(thd, &wait_pos[i], wait_until_ptr)))
+ break;
+ }
+ my_free(wait_pos);
+ return err;
+}
+
+
+void
+gtid_waiting::promote_new_waiter(gtid_waiting::hash_element *he)
+{
+ queue_element *qe;
+
+ mysql_mutex_assert_owner(&LOCK_gtid_waiting);
+ if (queue_empty(&he->queue))
+ return;
+ qe= (queue_element *)queue_top(&he->queue);
+ qe->thd->wakeup_ready= true;
+ qe->wakeup_reason= queue_element::TAKEOVER;
+ mysql_cond_signal(&qe->thd->COND_wakeup_ready);
+}
+
+void
+gtid_waiting::process_wait_hash(uint64 wakeup_seq_no,
+ gtid_waiting::hash_element *he)
+{
+ mysql_mutex_assert_owner(&LOCK_gtid_waiting);
+
+ for (;;)
+ {
+ queue_element *qe;
+
+ if (queue_first_element(&he->queue) > queue_last_element(&he->queue))
+ break;
+ qe= (queue_element *)queue_top(&he->queue);
+ if (qe->wait_seq_no > wakeup_seq_no)
+ break;
+ queue_remove_top(&he->queue);
+ qe->thd->wakeup_ready= true;
+ qe->wakeup_reason= queue_element::DONE;
+ mysql_cond_signal(&qe->thd->COND_wakeup_ready);
+ }
+}
+
+
+/*
+ Execute a MASTER_GTID_WAIT() for one specific domain.
+
+ The implementation is optimised primarily for (1) minimal performance impact
+ on the slave replication threads, and secondarily for (2) quick performance
+ of MASTER_GTID_WAIT() on a single GTID, which can be useful for consistent
+ reads for clients in an async replication read-scaleout scenario.
+
+ To achieve (1), we have a "small" wait and a "large" wait. The small wait
+ contends with the replication threads on the lock on the gtid_slave_pos, so
+ only minimal processing is done under that lock, and only a single waiter at
+ a time does the small wait.
+
+ If there is already a small waiter, a new thread will either replace the
+ small waiter (if it needs to wait for an earlier sequence number), or
+ instead do a "large" wait.
+
+ Once awoken on the small wait, the waiting thread releases the lock shared
+ with the SQL threads quickly, and then processes all waiters currently doing
+ the large wait.
+
+ This way, the SQL threads only need to do a single check + possibly a
+ pthread_cond_signal() when updating the gtid_slave_state, and the time that
+ non-SQL threads contend for the lock on gtid_slave_state is minimized.
+*/
+int
+gtid_waiting::wait_for_gtid(THD *thd, rpl_gtid *wait_gtid,
+ struct timespec *wait_until)
+{
+ bool timed_out= false;
+#ifdef HAVE_REPLICATION
+ queue_element elem;
+ uint32_t domain_id= wait_gtid->domain_id;
+ uint64 seq_no= wait_gtid->seq_no;
+ hash_element *he;
+ rpl_slave_state::element *slave_state_elem= NULL;
+ const char *old_msg= NULL;
+ bool did_enter_cond= false;
+ bool takeover= false;
+
+ elem.wait_seq_no= seq_no;
+ elem.thd= thd;
+ /*
+ Register on the large wait before checking the small wait.
+ This ensures that if we find another waiter already doing the small wait,
+ we are sure to be woken up by that one, and thus we will not need to take
+ the lock on the small wait more than once in this case.
+ */
+ mysql_mutex_lock(&LOCK_gtid_waiting);
+ if (!(he= register_in_wait_hash(thd, wait_gtid, &elem)))
+ {
+ mysql_mutex_unlock(&LOCK_gtid_waiting);
+ return 1;
+ }
+
+ /*
+ Now check the small wait, and either do the large wait or the small one,
+ depending on whether there is already a suitable small waiter or not.
+
+ We may need to do this multiple times, as a previous small waiter may
+ complete and pass the small wait on to us.
+ */
+ for (;;)
+ {
+ uint64 wakeup_seq_no, cur_wait_seq_no;
+
+ mysql_mutex_assert_owner(&LOCK_gtid_waiting);
+ mysql_mutex_lock(&rpl_global_gtid_slave_state.LOCK_slave_state);
+ /*
+ The elements in the gtid_slave_state_hash are never re-allocated once
+ they enter the hash, so we do not need to re-do the lookup after releasing
+ and re-acquiring the lock.
+ */
+ if (!slave_state_elem &&
+ !(slave_state_elem= rpl_global_gtid_slave_state.get_element(domain_id)))
+ {
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
+ remove_from_wait_hash(he, &elem);
+ mysql_mutex_unlock(&LOCK_gtid_waiting);
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ return 1;
+ }
+
+ if ((wakeup_seq_no= slave_state_elem->highest_seq_no) >= seq_no)
+ {
+ /*
+ We do not have to wait. But we might need to wake up other threads on
+ the large wait (this can happen if we were woken up to take over the small
+ wait, and the SQL thread raced with us to reach the waited-for GTID).
+ */
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
+ thd->wakeup_ready= 0;
+ process_wait_hash(wakeup_seq_no, he);
+ /*
+ Since we already checked wakeup_seq_no, we are sure that
+ process_wait_hash() will mark us done.
+ */
+ DBUG_ASSERT(thd->wakeup_ready);
+ if (thd->wakeup_ready)
+ {
+ if (takeover)
+ promote_new_waiter(he);
+ break;
+ }
+ }
+ else if ((cur_wait_seq_no= slave_state_elem->min_wait_seq_no) == 0 ||
+ cur_wait_seq_no > seq_no)
+ {
+ /*
+ We have to do the small wait ourselves (stealing it from any thread that
+ might already be waiting for a later seq_no).
+ */
+ slave_state_elem->min_wait_seq_no= seq_no;
+ if (cur_wait_seq_no != 0)
+ {
+ /* We stole the wait, so wake up the old waiting thread. */
+ mysql_cond_signal(&slave_state_elem->COND_wait_gtid);
+ }
+ /* Do the small wait. */
+ if (did_enter_cond)
+ thd->exit_cond(old_msg);
+ else
+ mysql_mutex_unlock(&LOCK_gtid_waiting);
+
+ old_msg= thd->enter_cond(&slave_state_elem->COND_wait_gtid,
+ &rpl_global_gtid_slave_state.LOCK_slave_state,
+ "Waiting in MASTER_GTID_WAIT() (primary waiter)");
+ do
+ {
+ if (thd->check_killed())
+ slave_state_elem->min_wait_seq_no = 0;
+ else if (wait_until)
+ {
+ int err=
+ mysql_cond_timedwait(&slave_state_elem->COND_wait_gtid,
+ &rpl_global_gtid_slave_state.LOCK_slave_state,
+ wait_until);
+ if (err == ETIMEDOUT || err == ETIME)
+ {
+ timed_out= true;
+ slave_state_elem->min_wait_seq_no = 0;
+ }
+ }
+ else
+ mysql_cond_wait(&slave_state_elem->COND_wait_gtid,
+ &rpl_global_gtid_slave_state.LOCK_slave_state);
+ } while (slave_state_elem->min_wait_seq_no == seq_no);
+ /*
+ Check the new gtid_slave_state. We could be woken up because our seq_no
+ has been reached, or because someone else stole the small wait from us.
+ (Or because of kill/timeout).
+ */
+ wakeup_seq_no= slave_state_elem->highest_seq_no;
+
+ thd->exit_cond(old_msg);
+ mysql_mutex_lock(&LOCK_gtid_waiting);
+ /*
+ Note that hash_entry pointers do not change once allocated, so we do
+ not need to look up `he' again after re-acquiring the lock.
+ */
+ thd->wakeup_ready= 0;
+ process_wait_hash(wakeup_seq_no, he);
+ if (thd->wakeup_ready)
+ promote_new_waiter(he);
+ else if (thd->killed || timed_out)
+ {
+ remove_from_wait_hash(he, &elem);
+ promote_new_waiter(he);
+ if (thd->killed)
+ thd->send_kill_message();
+ break;
+ }
+ }
+ else
+ {
+ /* We have to do the large wait. */
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
+ thd->wakeup_ready= 0;
+ }
+
+ takeover= false;
+ old_msg= thd->enter_cond(&thd->COND_wakeup_ready, &LOCK_gtid_waiting,
+ "Waiting in MASTER_GTID_WAIT()");
+ while (!thd->wakeup_ready && !thd->check_killed() && !timed_out)
+ {
+ thd_wait_begin(thd, THD_WAIT_BINLOG);
+ if (wait_until)
+ {
+ int err= mysql_cond_timedwait(&thd->COND_wakeup_ready,
+ &LOCK_gtid_waiting, wait_until);
+ if (err == ETIMEDOUT || err == ETIME)
+ timed_out= true;
+ }
+ else
+ mysql_cond_wait(&thd->COND_wakeup_ready, &LOCK_gtid_waiting);
+ thd_wait_end(thd);
+ }
+
+ if (elem.wakeup_reason == queue_element::DONE)
+ break;
+ takeover= true;
+
+ if (thd->killed || timed_out)
+ {
+ remove_from_wait_hash(he, &elem);
+ /*
+ If we got kill/timeout _and_ we were asked to take over the small wait,
+ we need to pass on that task to someone else.
+ */
+ if (thd->wakeup_ready && elem.wakeup_reason == queue_element::TAKEOVER)
+ promote_new_waiter(he);
+ if (thd->killed)
+ thd->send_kill_message();
+ break;
+ }
+ }
+
+ if (did_enter_cond)
+ thd->exit_cond(old_msg);
+ else
+ mysql_mutex_unlock(&LOCK_gtid_waiting);
+#endif /* HAVE_REPLICATION */
+ return timed_out ? -1 : 0;
+}
+
+
+static void
+free_hash_element(void *p)
+{
+ gtid_waiting::hash_element *e= (gtid_waiting::hash_element *)p;
+ delete_queue(&e->queue);
+ my_free(e);
+}
+
+
+void
+gtid_waiting::init()
+{
+ my_hash_init(&hash, &my_charset_bin, 32,
+ offsetof(hash_element, domain_id), sizeof(uint32), NULL,
+ free_hash_element, HASH_UNIQUE);
+ mysql_mutex_init(key_LOCK_gtid_waiting, &LOCK_gtid_waiting, 0);
+}
+
+
+void
+gtid_waiting::destroy()
+{
+ mysql_mutex_destroy(&LOCK_gtid_waiting);
+ my_hash_free(&hash);
+}
+
+
+static int
+cmp_queue_elem(void *, uchar *a, uchar *b)
+{
+ uint64 seq_no_a= *(uint64 *)a;
+ uint64 seq_no_b= *(uint64 *)b;
+ if (seq_no_a < seq_no_b)
+ return -1;
+ else if (seq_no_a == seq_no_b)
+ return 0;
+ else
+ return 1;
+}
+
+
+gtid_waiting::hash_element *
+gtid_waiting::get_entry(uint32 domain_id)
+{
+ hash_element *e;
+
+ if ((e= (hash_element *)my_hash_search(&hash, (const uchar *)&domain_id, 0)))
+ return e;
+
+ if (!(e= (hash_element *)my_malloc(sizeof(*e), MYF(MY_WME))))
+ {
+ my_error(ER_OUTOFMEMORY, MYF(0), sizeof(*e));
+ return NULL;
+ }
+
+ if (init_queue(&e->queue, 8, offsetof(queue_element, wait_seq_no), 0,
+ cmp_queue_elem, NULL, 1+offsetof(queue_element, queue_idx), 1))
+ {
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ my_free(e);
+ return NULL;
+ }
+ e->domain_id= domain_id;
+ if (my_hash_insert(&hash, (uchar *)e))
+ {
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ delete_queue(&e->queue);
+ my_free(e);
+ return NULL;
+ }
+ return e;
+}
+
+
+gtid_waiting::hash_element *
+gtid_waiting::register_in_wait_hash(THD *thd, rpl_gtid *wait_gtid,
+ gtid_waiting::queue_element *elem)
+{
+ hash_element *e;
+
+ mysql_mutex_assert_owner(&LOCK_gtid_waiting);
+
+ if (!(e= get_entry(wait_gtid->domain_id)))
+ return NULL;
+
+ if (queue_insert_safe(&e->queue, (uchar *)elem))
+ {
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ return NULL;
+ }
+
+ return e;
+}
+
+
+void
+gtid_waiting::remove_from_wait_hash(gtid_waiting::hash_element *e,
+ gtid_waiting::queue_element *elem)
+{
+ mysql_mutex_assert_owner(&LOCK_gtid_waiting);
+
+ queue_remove(&e->queue, elem->queue_idx);
+}