diff options
author | unknown <knielsen@knielsen-hq.org> | 2014-02-10 15:12:17 +0100 |
---|---|---|
committer | unknown <knielsen@knielsen-hq.org> | 2014-02-10 15:12:17 +0100 |
commit | dd93ec5633c325748118ec8f23028715d1935820 (patch) | |
tree | 29aa2272a2782792024a4d1fd79b845f72122b96 /sql/rpl_gtid.cc | |
parent | e8193eeb8d90148f1cd46f085c96b2f3e3139d39 (diff) | |
parent | f3a6f86ec3452b346de513075f72dbd02549a5fb (diff) | |
download | mariadb-git-dd93ec5633c325748118ec8f23028715d1935820.tar.gz |
Merge MariaDB 10.0-base to 10.0.
Diffstat (limited to 'sql/rpl_gtid.cc')
-rw-r--r-- | sql/rpl_gtid.cc | 506 |
1 files changed, 487 insertions, 19 deletions
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc index a36a15b3c27..e51dee20c19 100644 --- a/sql/rpl_gtid.cc +++ b/sql/rpl_gtid.cc @@ -43,9 +43,9 @@ rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid) there will not be an attempt to delete the corresponding table row before it is even committed. */ - lock(); + mysql_mutex_lock(&LOCK_slave_state); err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no); - unlock(); + mysql_mutex_unlock(&LOCK_slave_state); if (err) { sql_print_warning("Slave: Out of memory during slave state maintenance. " @@ -82,11 +82,20 @@ rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi) } +static void +rpl_slave_state_free_element(void *arg) +{ + struct rpl_slave_state::element *elem= (struct rpl_slave_state::element *)arg; + mysql_cond_destroy(&elem->COND_wait_gtid); + my_free(elem); +} + + rpl_slave_state::rpl_slave_state() : last_sub_id(0), inited(false), loaded(false) { my_hash_init(&hash, &my_charset_bin, 32, offsetof(element, domain_id), - sizeof(uint32), NULL, my_free, HASH_UNIQUE); + sizeof(uint32), NULL, rpl_slave_state_free_element, HASH_UNIQUE); } @@ -146,6 +155,21 @@ rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id, if (!(elem= get_element(domain_id))) return 1; + if (seq_no > elem->highest_seq_no) + elem->highest_seq_no= seq_no; + if (elem->gtid_waiter && elem->min_wait_seq_no <= seq_no) + { + /* + Someone was waiting in MASTER_GTID_WAIT() for this GTID to appear. + Signal (and remove) them. The waiter will handle all the processing + of all pending MASTER_GTID_WAIT(), so we do not slow down the + replication SQL thread. + */ + mysql_mutex_assert_owner(&LOCK_slave_state); + elem->gtid_waiter= NULL; + mysql_cond_broadcast(&elem->COND_wait_gtid); + } + if (!(list_elem= (list_element *)my_malloc(sizeof(*list_elem), MYF(MY_WME)))) return 1; list_elem->server_id= server_id; @@ -173,6 +197,9 @@ rpl_slave_state::get_element(uint32 domain_id) return NULL; elem->list= NULL; elem->domain_id= domain_id; + elem->highest_seq_no= 0; + elem->gtid_waiter= NULL; + mysql_cond_init(key_COND_wait_gtid, &elem->COND_wait_gtid, 0); if (my_hash_insert(&hash, (uchar *)elem)) { my_free(elem); @@ -379,10 +406,10 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, goto end; } - lock(); + mysql_mutex_lock(&LOCK_slave_state); if ((elem= get_element(gtid->domain_id)) == NULL) { - unlock(); + mysql_mutex_unlock(&LOCK_slave_state); my_error(ER_OUT_OF_RESOURCES, MYF(0)); err= 1; goto end; @@ -411,7 +438,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, cur->next= NULL; elem->list= cur; } - unlock(); + mysql_mutex_unlock(&LOCK_slave_state); if (!elist) goto end; @@ -471,9 +498,9 @@ end: */ if (elist) { - lock(); + mysql_mutex_lock(&LOCK_slave_state); put_back_list(gtid->domain_id, elist); - unlock(); + mysql_mutex_unlock(&LOCK_slave_state); } ha_rollback_trans(thd, FALSE); @@ -500,9 +527,9 @@ rpl_slave_state::next_sub_id(uint32 domain_id) { uint64 sub_id= 0; - lock(); + mysql_mutex_lock(&LOCK_slave_state); sub_id= ++last_sub_id; - unlock(); + mysql_mutex_unlock(&LOCK_slave_state); return sub_id; } @@ -542,7 +569,7 @@ rpl_slave_state::iterate(int (*cb)(rpl_gtid *, void *), void *data, my_hash_insert(>id_hash, (uchar *)(&extra_gtids[i]))) goto err; - lock(); + mysql_mutex_lock(&LOCK_slave_state); for (i= 0; i < hash.records; ++i) { @@ -577,19 +604,19 @@ rpl_slave_state::iterate(int (*cb)(rpl_gtid *, void *), void *data, memcpy(&best_gtid, gtid, sizeof(best_gtid)); if (my_hash_delete(>id_hash, rec)) { - unlock(); + mysql_mutex_unlock(&LOCK_slave_state); goto err; } } if ((res= (*cb)(&best_gtid, data))) { - unlock(); + mysql_mutex_unlock(&LOCK_slave_state); goto err; } } - unlock(); + mysql_mutex_unlock(&LOCK_slave_state); /* Also add any remaining extra domain_ids. */ for (i= 0; i < gtid_hash.records; ++i) @@ -660,11 +687,11 @@ rpl_slave_state::domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid) list_element *list; uint64 best_sub_id; - lock(); + mysql_mutex_lock(&LOCK_slave_state); elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0); if (!elem || !(list= elem->list)) { - unlock(); + mysql_mutex_unlock(&LOCK_slave_state); return false; } @@ -682,7 +709,7 @@ rpl_slave_state::domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid) out_gtid->seq_no= list->seq_no; } - unlock(); + mysql_mutex_unlock(&LOCK_slave_state); return true; } @@ -812,7 +839,7 @@ rpl_slave_state::is_empty() uint32 i; bool result= true; - lock(); + mysql_mutex_lock(&LOCK_slave_state); for (i= 0; i < hash.records; ++i) { element *e= (element *)my_hash_element(&hash, i); @@ -822,7 +849,7 @@ rpl_slave_state::is_empty() break; } } - unlock(); + mysql_mutex_unlock(&LOCK_slave_state); return result; } @@ -1648,3 +1675,444 @@ slave_connection_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size) return 0; } + + +/* + Execute a MASTER_GTID_WAIT(). + The position to wait for is in gtid_str in string form. + The timeout in microseconds is in timeout_us, zero means no timeout. + + Returns: + 1 for error. + 0 for wait completed. + -1 for wait timed out. +*/ +int +gtid_waiting::wait_for_pos(THD *thd, String *gtid_str, longlong timeout_us) +{ + int err; + rpl_gtid *wait_pos; + uint32 count, i; + struct timespec wait_until, *wait_until_ptr; + + /* Wait for the empty position returns immediately. */ + if (gtid_str->length() == 0) + return 0; + + if (!(wait_pos= gtid_parse_string_to_list(gtid_str->ptr(), gtid_str->length(), + &count))) + { + my_error(ER_INCORRECT_GTID_STATE, MYF(0)); + return 1; + } + + if (timeout_us >= 0) + { + set_timespec_nsec(wait_until, (ulonglong)1000*timeout_us); + wait_until_ptr= &wait_until; + } + else + wait_until_ptr= NULL; + err= 0; + for (i= 0; i < count; ++i) + { + if ((err= wait_for_gtid(thd, &wait_pos[i], wait_until_ptr))) + break; + } + my_free(wait_pos); + return err; +} + + +void +gtid_waiting::promote_new_waiter(gtid_waiting::hash_element *he) +{ + queue_element *qe; + + mysql_mutex_assert_owner(&LOCK_gtid_waiting); + if (queue_empty(&he->queue)) + return; + qe= (queue_element *)queue_top(&he->queue); + qe->do_small_wait= true; + mysql_cond_signal(&qe->thd->COND_wakeup_ready); +} + +void +gtid_waiting::process_wait_hash(uint64 wakeup_seq_no, + gtid_waiting::hash_element *he) +{ + mysql_mutex_assert_owner(&LOCK_gtid_waiting); + + for (;;) + { + queue_element *qe; + + if (queue_empty(&he->queue)) + break; + qe= (queue_element *)queue_top(&he->queue); + if (qe->wait_seq_no > wakeup_seq_no) + break; + DBUG_ASSERT(!qe->done); + queue_remove_top(&he->queue); + qe->done= true;; + mysql_cond_signal(&qe->thd->COND_wakeup_ready); + } +} + + +/* + Execute a MASTER_GTID_WAIT() for one specific domain. + + The implementation is optimised primarily for (1) minimal performance impact + on the slave replication threads, and secondarily for (2) quick performance + of MASTER_GTID_WAIT() on a single GTID, which can be useful for consistent + read to clients in an async replication read-scaleout scenario. + + To achieve (1), we have a "small" wait and a "large" wait. The small wait + contends with the replication threads on the lock on the gtid_slave_pos, so + only minimal processing is done under that lock, and only a single waiter at + a time does the small wait. + + If there is already a small waiter, a new thread will either replace the + small waiter (if it needs to wait for an earlier sequence number), or + instead do a "large" wait. + + Once awoken on the small wait, the waiting thread releases the lock shared + with the SQL threads quickly, and then processes all waiters currently doing + the large wait using a different lock that does not impact replication. + + This way, the SQL threads only need to do a single check + possibly a + pthread_cond_signal() when updating the gtid_slave_state, and the time that + non-SQL threads contend for the lock on gtid_slave_state is minimized. + + There is always at least one thread that has the responsibility to ensure + that there is a small waiter; this thread has queue_element::do_small_wait + set to true. This thread will do the small wait until it is done, at which + point it will make sure to pass on the responsibility to another thread. + Normally only one thread has do_small_wait==true, but it can occasionally + happen that there is more than one, when threads race one another for the + lock on the small wait (this results in slightly increased activity on the + small lock but is otherwise harmless). + + Returns: + 0 Wait completed normally + -1 Wait completed due to timeout + 1 An error (my_error() will have been called to set the error in the da) +*/ +int +gtid_waiting::wait_for_gtid(THD *thd, rpl_gtid *wait_gtid, + struct timespec *wait_until) +{ + bool timed_out= false; +#ifdef HAVE_REPLICATION + queue_element elem; + uint32 domain_id= wait_gtid->domain_id; + uint64 seq_no= wait_gtid->seq_no; + hash_element *he; + rpl_slave_state::element *slave_state_elem= NULL; + PSI_stage_info old_stage; + bool did_enter_cond= false; + + elem.wait_seq_no= seq_no; + elem.thd= thd; + elem.done= false; + + mysql_mutex_lock(&LOCK_gtid_waiting); + if (!(he= get_entry(wait_gtid->domain_id))) + { + mysql_mutex_unlock(&LOCK_gtid_waiting); + return 1; + } + /* + If there is already another waiter with seq_no no larger than our own, + we are sure that there is already a small waiter that will wake us up + (or later pass the small wait responsibility to us). So in this case, we + do not need to touch the small wait lock at all. + */ + elem.do_small_wait= + (queue_empty(&he->queue) || + ((queue_element *)queue_top(&he->queue))->wait_seq_no > seq_no); + + if (register_in_wait_queue(thd, wait_gtid, he, &elem)) + { + mysql_mutex_unlock(&LOCK_gtid_waiting); + return 1; + } + /* + Loop, doing either the small or large wait as appropriate, until either + the position waited for is reached, or we get a kill or timeout. + */ + for (;;) + { + mysql_mutex_assert_owner(&LOCK_gtid_waiting); + + if (elem.do_small_wait) + { + uint64 wakeup_seq_no; + queue_element *cur_waiter; + + mysql_mutex_lock(&rpl_global_gtid_slave_state.LOCK_slave_state); + /* + The elements in the gtid_slave_state_hash are never re-allocated once + they enter the hash, so we do not need to re-do the lookup after releasing + and re-aquiring the lock. + */ + if (!slave_state_elem && + !(slave_state_elem= rpl_global_gtid_slave_state.get_element(domain_id))) + { + mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state); + remove_from_wait_queue(he, &elem); + promote_new_waiter(he); + if (did_enter_cond) + thd->EXIT_COND(&old_stage); + else + mysql_mutex_unlock(&LOCK_gtid_waiting); + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + return 1; + } + + if ((wakeup_seq_no= slave_state_elem->highest_seq_no) >= seq_no) + { + /* + We do not have to wait. (We will be removed from the wait queue when + we call process_wait_hash() below. + */ + mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state); + } + else if ((cur_waiter= slave_state_elem->gtid_waiter) && + slave_state_elem->min_wait_seq_no <= seq_no) + { + /* + There is already a suitable small waiter, go do the large wait. + (Normally we would not have needed to check the small wait in this + case, but it can happen if we race with another thread for the small + lock). + */ + elem.do_small_wait= false; + mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state); + } + else + { + /* + We have to do the small wait ourselves (stealing it from any thread + that might already be waiting for a later seq_no). + */ + slave_state_elem->gtid_waiter= &elem; + slave_state_elem->min_wait_seq_no= seq_no; + if (cur_waiter) + { + /* We stole the wait, so wake up the old waiting thread. */ + mysql_cond_signal(&slave_state_elem->COND_wait_gtid); + } + + /* Release the large lock, and do the small wait. */ + if (did_enter_cond) + { + thd->EXIT_COND(&old_stage); + did_enter_cond= false; + } + else + mysql_mutex_unlock(&LOCK_gtid_waiting); + thd->ENTER_COND(&slave_state_elem->COND_wait_gtid, + &rpl_global_gtid_slave_state.LOCK_slave_state, + &stage_master_gtid_wait_primary, &old_stage); + do + { + if (thd->check_killed()) + break; + else if (wait_until) + { + int err= + mysql_cond_timedwait(&slave_state_elem->COND_wait_gtid, + &rpl_global_gtid_slave_state.LOCK_slave_state, + wait_until); + if (err == ETIMEDOUT || err == ETIME) + { + timed_out= true; + break; + } + } + else + mysql_cond_wait(&slave_state_elem->COND_wait_gtid, + &rpl_global_gtid_slave_state.LOCK_slave_state); + } while (slave_state_elem->gtid_waiter == &elem); + wakeup_seq_no= slave_state_elem->highest_seq_no; + /* + If we aborted due to timeout or kill, remove us as waiter. + + If we were replaced by another waiter with a smaller seq_no, then we + no longer have responsibility for the small wait. + */ + if ((cur_waiter= slave_state_elem->gtid_waiter)) + { + if (cur_waiter == &elem) + slave_state_elem->gtid_waiter= NULL; + else if (slave_state_elem->min_wait_seq_no <= seq_no) + elem.do_small_wait= false; + } + thd->EXIT_COND(&old_stage); + + mysql_mutex_lock(&LOCK_gtid_waiting); + } + + /* + Note that hash_entry pointers do not change once allocated, so we do + not need to lookup `he' again after re-aquiring LOCK_gtid_waiting. + */ + process_wait_hash(wakeup_seq_no, he); + } + else + { + /* Do the large wait. */ + if (!did_enter_cond) + { + thd->ENTER_COND(&thd->COND_wakeup_ready, &LOCK_gtid_waiting, + &stage_master_gtid_wait, &old_stage); + did_enter_cond= true; + } + while (!elem.done && !thd->check_killed()) + { + thd_wait_begin(thd, THD_WAIT_BINLOG); + if (wait_until) + { + int err= mysql_cond_timedwait(&thd->COND_wakeup_ready, + &LOCK_gtid_waiting, wait_until); + if (err == ETIMEDOUT || err == ETIME) + timed_out= true; + } + else + mysql_cond_wait(&thd->COND_wakeup_ready, &LOCK_gtid_waiting); + thd_wait_end(thd); + if (elem.do_small_wait || timed_out) + break; + } + } + + if ((thd->killed || timed_out) && !elem.done) + { + /* Aborted, so remove ourselves from the hash. */ + remove_from_wait_queue(he, &elem); + elem.done= true; + } + if (elem.done) + { + /* + If our wait is done, but we have (or were passed) responsibility for + the small wait, then we need to pass on that task to someone else. + */ + if (elem.do_small_wait) + promote_new_waiter(he); + break; + } + } + + if (did_enter_cond) + thd->EXIT_COND(&old_stage); + else + mysql_mutex_unlock(&LOCK_gtid_waiting); + if (thd->killed) + thd->send_kill_message(); +#endif /* HAVE_REPLICATION */ + return timed_out ? -1 : 0; +} + + +static void +free_hash_element(void *p) +{ + gtid_waiting::hash_element *e= (gtid_waiting::hash_element *)p; + delete_queue(&e->queue); + my_free(e); +} + + +void +gtid_waiting::init() +{ + my_hash_init(&hash, &my_charset_bin, 32, + offsetof(hash_element, domain_id), sizeof(uint32), NULL, + free_hash_element, HASH_UNIQUE); + mysql_mutex_init(key_LOCK_gtid_waiting, &LOCK_gtid_waiting, 0); +} + + +void +gtid_waiting::destroy() +{ + mysql_mutex_destroy(&LOCK_gtid_waiting); + my_hash_free(&hash); +} + + +static int +cmp_queue_elem(void *, uchar *a, uchar *b) +{ + uint64 seq_no_a= *(uint64 *)a; + uint64 seq_no_b= *(uint64 *)b; + if (seq_no_a < seq_no_b) + return -1; + else if (seq_no_a == seq_no_b) + return 0; + else + return 1; +} + + +gtid_waiting::hash_element * +gtid_waiting::get_entry(uint32 domain_id) +{ + hash_element *e; + + if ((e= (hash_element *)my_hash_search(&hash, (const uchar *)&domain_id, 0))) + return e; + + if (!(e= (hash_element *)my_malloc(sizeof(*e), MYF(MY_WME)))) + { + my_error(ER_OUTOFMEMORY, MYF(0), sizeof(*e)); + return NULL; + } + + if (init_queue(&e->queue, 8, offsetof(queue_element, wait_seq_no), 0, + cmp_queue_elem, NULL, 1+offsetof(queue_element, queue_idx), 1)) + { + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + my_free(e); + return NULL; + } + e->domain_id= domain_id; + if (my_hash_insert(&hash, (uchar *)e)) + { + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + delete_queue(&e->queue); + my_free(e); + return NULL; + } + return e; +} + + +int +gtid_waiting::register_in_wait_queue(THD *thd, rpl_gtid *wait_gtid, + gtid_waiting::hash_element *he, + gtid_waiting::queue_element *elem) +{ + mysql_mutex_assert_owner(&LOCK_gtid_waiting); + + if (queue_insert_safe(&he->queue, (uchar *)elem)) + { + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + return 1; + } + + return 0; +} + + +void +gtid_waiting::remove_from_wait_queue(gtid_waiting::hash_element *he, + gtid_waiting::queue_element *elem) +{ + mysql_mutex_assert_owner(&LOCK_gtid_waiting); + + queue_remove(&he->queue, elem->queue_idx); +} |