Diffstat (limited to 'sql')
-rw-r--r--  sql/item_create.cc           |  55
-rw-r--r--  sql/item_func.cc             |  28
-rw-r--r--  sql/item_func.h              |  16
-rw-r--r--  sql/log.cc                   |   1
-rw-r--r--  sql/mysqld.cc                |   8
-rw-r--r--  sql/mysqld.h                 |   2
-rw-r--r--  sql/rpl_gtid.cc              | 480
-rw-r--r--  sql/rpl_gtid.h               |  59
-rw-r--r--  sql/rpl_rli.cc               |  16
-rw-r--r--  sql/rpl_rli.h                |   1
-rw-r--r--  sql/share/errmsg-utf8.txt    |   2
-rw-r--r--  sql/sql_class.cc             |   1
-rw-r--r--  sql/sql_class.h              |  17
-rw-r--r--  sql/sql_repl.cc              |  14
-rw-r--r--  sql/sql_repl.h               |   2
-rw-r--r--  sql/sys_vars.cc              |  27
-rw-r--r--  sql/sys_vars.h               |  50
17 files changed, 743 insertions(+), 36 deletions(-)
diff --git a/sql/item_create.cc b/sql/item_create.cc
index 60eabe67c83..c158816bf32 100644
--- a/sql/item_create.cc
+++ b/sql/item_create.cc
@@ -1783,6 +1783,19 @@ protected:
};
+class Create_func_master_gtid_wait : public Create_native_func
+{
+public:
+ virtual Item *create_native(THD *thd, LEX_STRING name, List<Item> *item_list);
+
+ static Create_func_master_gtid_wait s_singleton;
+
+protected:
+ Create_func_master_gtid_wait() {}
+ virtual ~Create_func_master_gtid_wait() {}
+};
+
+
class Create_func_md5 : public Create_func_arg1
{
public:
@@ -4590,6 +4603,47 @@ Create_func_master_pos_wait::create_native(THD *thd, LEX_STRING name,
}
+Create_func_master_gtid_wait Create_func_master_gtid_wait::s_singleton;
+
+Item*
+Create_func_master_gtid_wait::create_native(THD *thd, LEX_STRING name,
+ List<Item> *item_list)
+{
+ Item *func= NULL;
+ int arg_count= 0;
+
+ thd->lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_FUNCTION);
+
+ if (item_list != NULL)
+ arg_count= item_list->elements;
+
+ if (arg_count < 1 || arg_count > 2)
+ {
+ my_error(ER_WRONG_PARAMCOUNT_TO_NATIVE_FCT, MYF(0), name.str);
+ return func;
+ }
+
+ thd->lex->safe_to_cache_query= 0;
+
+ Item *param_1= item_list->pop();
+ switch (arg_count) {
+ case 1:
+ {
+ func= new (thd->mem_root) Item_master_gtid_wait(param_1);
+ break;
+ }
+ case 2:
+ {
+ Item *param_2= item_list->pop();
+ func= new (thd->mem_root) Item_master_gtid_wait(param_1, param_2);
+ break;
+ }
+ }
+
+ return func;
+}
+
+
Create_func_md5 Create_func_md5::s_singleton;
Item*
@@ -5536,6 +5590,7 @@ static Native_func_registry func_array[] =
{ { C_STRING_WITH_LEN("MAKEDATE") }, BUILDER(Create_func_makedate)},
{ { C_STRING_WITH_LEN("MAKETIME") }, BUILDER(Create_func_maketime)},
{ { C_STRING_WITH_LEN("MAKE_SET") }, BUILDER(Create_func_make_set)},
+ { { C_STRING_WITH_LEN("MASTER_GTID_WAIT") }, BUILDER(Create_func_master_gtid_wait)},
{ { C_STRING_WITH_LEN("MASTER_POS_WAIT") }, BUILDER(Create_func_master_pos_wait)},
{ { C_STRING_WITH_LEN("MBRCONTAINS") }, GEOM_BUILDER(Create_func_mbr_contains)},
{ { C_STRING_WITH_LEN("MBRDISJOINT") }, GEOM_BUILDER(Create_func_mbr_disjoint)},
diff --git a/sql/item_func.cc b/sql/item_func.cc
index 5d9abbb0d8c..b2af80e6d96 100644
--- a/sql/item_func.cc
+++ b/sql/item_func.cc
@@ -3989,6 +3989,34 @@ err:
}
+longlong Item_master_gtid_wait::val_int()
+{
+ DBUG_ASSERT(fixed == 1);
+ longlong result= 0;
+
+ if (args[0]->null_value)
+ {
+ null_value= 1;
+ return 0;
+ }
+
+ null_value=0;
+#ifdef HAVE_REPLICATION
+ THD* thd= current_thd;
+ longlong timeout_us;
+ String *gtid_pos = args[0]->val_str(&value);
+
+ if (arg_count==2 && !args[1]->null_value)
+ timeout_us= (longlong)(1e6*args[1]->val_real());
+ else
+ timeout_us= (longlong)-1;
+
+ result= rpl_global_gtid_waiting.wait_for_pos(thd, gtid_pos, timeout_us);
+#endif
+ return result;
+}
+
+
/**
Enables a session to wait on a condition until a timeout or a network
disconnect occurs.
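A note on the timeout argument handled above: the optional second argument to MASTER_GTID_WAIT() is a number of seconds (fractions allowed) that val_int() converts to microseconds, with -1 used internally when the argument is omitted or NULL. A minimal standalone sketch of just that conversion, assuming nothing beyond the standard library (the helper name is made up for illustration):

#include <cstdio>

// Hypothetical helper mirroring the conversion in
// Item_master_gtid_wait::val_int(): seconds -> microseconds,
// with -1 meaning "no timeout, wait indefinitely".
static long long gtid_wait_timeout_us(bool have_timeout, double seconds)
{
  if (!have_timeout)
    return -1;                    // second argument omitted or NULL
  return (long long)(1e6 * seconds);
}

int main()
{
  std::printf("%lld\n", gtid_wait_timeout_us(true, 1.5));  // 1500000
  std::printf("%lld\n", gtid_wait_timeout_us(true, 0.0));  // 0: expires at once
  std::printf("%lld\n", gtid_wait_timeout_us(false, 0.0)); // -1: wait forever
  return 0;
}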
diff --git a/sql/item_func.h b/sql/item_func.h
index 384a6b535df..2e3f352e377 100644
--- a/sql/item_func.h
+++ b/sql/item_func.h
@@ -1642,6 +1642,22 @@ public:
};
+class Item_master_gtid_wait :public Item_int_func
+{
+ String value;
+public:
+ Item_master_gtid_wait(Item *a) :Item_int_func(a) {}
+ Item_master_gtid_wait(Item *a,Item *b) :Item_int_func(a,b) {}
+ longlong val_int();
+ const char *func_name() const { return "master_gtid_wait"; }
+ void fix_length_and_dec() { max_length=10+1+10+1+20+1; maybe_null=0;}
+ bool check_vcol_func_processor(uchar *int_arg)
+ {
+ return trace_unsupported_by_check_vcol_func_processor(func_name());
+ }
+};
+
+
/* Handling of user definable variables */
class user_var_entry;
diff --git a/sql/log.cc b/sql/log.cc
index fbb73acf5d1..c5cdc2cccc5 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -5446,6 +5446,7 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone,
}
if (err)
return true;
+ thd->last_commit_gtid= gtid;
Gtid_log_event gtid_event(thd, seq_no, domain_id, standalone,
LOG_EVENT_SUPPRESS_USE_F, is_transactional,
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index 4e6646feead..ebfa238df3f 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -780,6 +780,7 @@ PSI_mutex_key key_LOCK_stats,
key_LOCK_global_user_client_stats, key_LOCK_global_table_stats,
key_LOCK_global_index_stats,
key_LOCK_wakeup_ready, key_LOCK_wait_commit;
+PSI_mutex_key key_LOCK_gtid_waiting;
PSI_mutex_key key_LOCK_prepare_ordered, key_LOCK_commit_ordered;
@@ -825,6 +826,7 @@ static PSI_mutex_info all_server_mutexes[]=
{ &key_LOCK_global_index_stats, "LOCK_global_index_stats", PSI_FLAG_GLOBAL},
{ &key_LOCK_wakeup_ready, "THD::LOCK_wakeup_ready", 0},
{ &key_LOCK_wait_commit, "wait_for_commit::LOCK_wait_commit", 0},
+ { &key_LOCK_gtid_waiting, "gtid_waiting::LOCK_gtid_waiting", 0},
{ &key_LOCK_thd_data, "THD::LOCK_thd_data", 0},
{ &key_LOCK_user_conn, "LOCK_user_conn", PSI_FLAG_GLOBAL},
{ &key_LOCK_uuid_short_generator, "LOCK_uuid_short_generator", PSI_FLAG_GLOBAL},
@@ -895,6 +897,7 @@ PSI_cond_key key_RELAYLOG_COND_queue_busy;
PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_pool,
key_COND_parallel_entry, key_COND_prepare_ordered;
+PSI_cond_key key_COND_wait_gtid;
static PSI_cond_info all_server_conds[]=
{
@@ -940,7 +943,8 @@ static PSI_cond_info all_server_conds[]=
{ &key_COND_rpl_thread, "COND_rpl_thread", 0},
{ &key_COND_rpl_thread_pool, "COND_rpl_thread_pool", 0},
{ &key_COND_parallel_entry, "COND_parallel_entry", 0},
- { &key_COND_prepare_ordered, "COND_prepare_ordered", 0}
+ { &key_COND_prepare_ordered, "COND_prepare_ordered", 0},
+ { &key_COND_wait_gtid, "COND_wait_gtid", 0}
};
PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert,
@@ -1821,6 +1825,7 @@ static void mysqld_exit(int exit_code)
but if a kill -15 signal was sent, the signal thread did
spawn the kill_server_thread thread, which is running concurrently.
*/
+ rpl_deinit_gtid_waiting();
rpl_deinit_gtid_slave_state();
wait_for_signal_thread_to_end();
mysql_audit_finalize();
@@ -4201,6 +4206,7 @@ static int init_thread_environment()
#ifdef HAVE_REPLICATION
rpl_init_gtid_slave_state();
+ rpl_init_gtid_waiting();
#endif
DBUG_RETURN(0);
diff --git a/sql/mysqld.h b/sql/mysqld.h
index 2e10b0caeb5..4fdd34fd8be 100644
--- a/sql/mysqld.h
+++ b/sql/mysqld.h
@@ -256,6 +256,7 @@ extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state,
extern PSI_mutex_key key_LOCK_stats,
key_LOCK_global_user_client_stats, key_LOCK_global_table_stats,
key_LOCK_global_index_stats, key_LOCK_wakeup_ready, key_LOCK_wait_commit;
+extern PSI_mutex_key key_LOCK_gtid_waiting;
extern PSI_rwlock_key key_rwlock_LOCK_grant, key_rwlock_LOCK_logger,
key_rwlock_LOCK_sys_init_connect, key_rwlock_LOCK_sys_init_slave,
@@ -285,6 +286,7 @@ extern PSI_cond_key key_RELAYLOG_COND_queue_busy;
extern PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
extern PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_pool,
key_COND_parallel_entry;
+extern PSI_cond_key key_COND_wait_gtid;
extern PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert,
key_thread_handle_manager, key_thread_kill_server, key_thread_main,
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc
index 3f79a0cb528..00140fd3475 100644
--- a/sql/rpl_gtid.cc
+++ b/sql/rpl_gtid.cc
@@ -43,9 +43,9 @@ rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid)
there will not be an attempt to delete the corresponding table row before
it is even committed.
*/
- lock();
+ mysql_mutex_lock(&LOCK_slave_state);
err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no);
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
if (err)
{
sql_print_warning("Slave: Out of memory during slave state maintenance. "
@@ -82,11 +82,20 @@ rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi)
}
+static void
+rpl_slave_state_free_element(void *arg)
+{
+ struct rpl_slave_state::element *elem= (struct rpl_slave_state::element *)arg;
+ mysql_cond_destroy(&elem->COND_wait_gtid);
+ my_free(elem);
+}
+
+
rpl_slave_state::rpl_slave_state()
: last_sub_id(0), inited(false), loaded(false)
{
my_hash_init(&hash, &my_charset_bin, 32, offsetof(element, domain_id),
- sizeof(uint32), NULL, my_free, HASH_UNIQUE);
+ sizeof(uint32), NULL, rpl_slave_state_free_element, HASH_UNIQUE);
}
@@ -146,6 +155,21 @@ rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
if (!(elem= get_element(domain_id)))
return 1;
+ if (seq_no > elem->highest_seq_no)
+ elem->highest_seq_no= seq_no;
+ if (elem->min_wait_seq_no != 0 && elem->min_wait_seq_no <= seq_no)
+ {
+ /*
+ Someone was waiting in MASTER_GTID_WAIT() for this GTID to appear.
+ Signal (and remove) them. The waiter will handle all the processing
+ of all pending MASTER_GTID_WAIT(), so we do not slow down the
+ replication SQL thread.
+ */
+ mysql_mutex_assert_owner(&LOCK_slave_state);
+ elem->min_wait_seq_no= 0;
+ mysql_cond_broadcast(&elem->COND_wait_gtid);
+ }
+
if (!(list_elem= (list_element *)my_malloc(sizeof(*list_elem), MYF(MY_WME))))
return 1;
list_elem->server_id= server_id;
@@ -173,6 +197,9 @@ rpl_slave_state::get_element(uint32 domain_id)
return NULL;
elem->list= NULL;
elem->domain_id= domain_id;
+ elem->highest_seq_no= 0;
+ elem->min_wait_seq_no= 0;
+ mysql_cond_init(key_COND_wait_gtid, &elem->COND_wait_gtid, 0);
if (my_hash_insert(&hash, (uchar *)elem))
{
my_free(elem);
@@ -378,10 +405,10 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
goto end;
}
- lock();
+ mysql_mutex_lock(&LOCK_slave_state);
if ((elem= get_element(gtid->domain_id)) == NULL)
{
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
my_error(ER_OUT_OF_RESOURCES, MYF(0));
err= 1;
goto end;
@@ -410,7 +437,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
cur->next= NULL;
elem->list= cur;
}
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
if (!elist)
goto end;
@@ -470,9 +497,9 @@ end:
*/
if (elist)
{
- lock();
+ mysql_mutex_lock(&LOCK_slave_state);
put_back_list(gtid->domain_id, elist);
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
}
ha_rollback_trans(thd, FALSE);
@@ -499,9 +526,9 @@ rpl_slave_state::next_sub_id(uint32 domain_id)
{
uint64 sub_id= 0;
- lock();
+ mysql_mutex_lock(&LOCK_slave_state);
sub_id= ++last_sub_id;
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
return sub_id;
}
@@ -541,7 +568,7 @@ rpl_slave_state::iterate(int (*cb)(rpl_gtid *, void *), void *data,
my_hash_insert(&gtid_hash, (uchar *)(&extra_gtids[i])))
goto err;
- lock();
+ mysql_mutex_lock(&LOCK_slave_state);
for (i= 0; i < hash.records; ++i)
{
@@ -576,19 +603,19 @@ rpl_slave_state::iterate(int (*cb)(rpl_gtid *, void *), void *data,
memcpy(&best_gtid, gtid, sizeof(best_gtid));
if (my_hash_delete(&gtid_hash, rec))
{
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
goto err;
}
}
if ((res= (*cb)(&best_gtid, data)))
{
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
goto err;
}
}
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
/* Also add any remaining extra domain_ids. */
for (i= 0; i < gtid_hash.records; ++i)
@@ -659,11 +686,11 @@ rpl_slave_state::domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid)
list_element *list;
uint64 best_sub_id;
- lock();
+ mysql_mutex_lock(&LOCK_slave_state);
elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0);
if (!elem || !(list= elem->list))
{
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
return false;
}
@@ -681,7 +708,7 @@ rpl_slave_state::domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid)
out_gtid->seq_no= list->seq_no;
}
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
return true;
}
@@ -811,7 +838,7 @@ rpl_slave_state::is_empty()
uint32 i;
bool result= true;
- lock();
+ mysql_mutex_lock(&LOCK_slave_state);
for (i= 0; i < hash.records; ++i)
{
element *e= (element *)my_hash_element(&hash, i);
@@ -821,7 +848,7 @@ rpl_slave_state::is_empty()
break;
}
}
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
return result;
}
@@ -1647,3 +1674,418 @@ slave_connection_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size)
return 0;
}
+
+
+/*
+ Execute a MASTER_GTID_WAIT().
+ The position to wait for is in gtid_str in string form.
+  The timeout in microseconds is in timeout_us; a negative value means no timeout.
+
+ Returns:
+ 1 for error.
+ 0 for wait completed.
+ -1 for wait timed out.
+*/
+int
+gtid_waiting::wait_for_pos(THD *thd, String *gtid_str, longlong timeout_us)
+{
+ int err;
+ rpl_gtid *wait_pos;
+ uint32 count, i;
+ struct timespec wait_until, *wait_until_ptr;
+
+  /* A wait for the empty position returns immediately. */
+ if (gtid_str->length() == 0)
+ return 0;
+
+ if (!(wait_pos= gtid_parse_string_to_list(gtid_str->ptr(), gtid_str->length(),
+ &count)))
+ {
+ my_error(ER_INCORRECT_GTID_STATE, MYF(0));
+ return 1;
+ }
+
+ if (timeout_us >= 0)
+ {
+ set_timespec_nsec(wait_until, (ulonglong)1000*timeout_us);
+ wait_until_ptr= &wait_until;
+ }
+ else
+ wait_until_ptr= NULL;
+ err= 0;
+ for (i= 0; i < count; ++i)
+ {
+ if ((err= wait_for_gtid(thd, &wait_pos[i], wait_until_ptr)))
+ break;
+ }
+ my_free(wait_pos);
+ return err;
+}
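Note that the deadline above is computed once, before the per-domain loop, so the timeout bounds the whole MASTER_GTID_WAIT() call rather than each listed domain separately. A hedged sketch of what the set_timespec_nsec() step amounts to, assuming POSIX clock_gettime() and a CLOCK_REALTIME-based deadline (the helper name is illustrative, not the mysys API):

#include <time.h>

// Hypothetical stand-in for the set_timespec_nsec() step above: turn a
// relative timeout in microseconds into an absolute timespec deadline,
// or report "no deadline" for a negative timeout (the caller then passes
// a NULL pointer to the timed waits).
static bool make_deadline(long long timeout_us, struct timespec *deadline)
{
  if (timeout_us < 0)
    return false;                           // wait without a deadline
  clock_gettime(CLOCK_REALTIME, deadline);
  deadline->tv_sec+=  timeout_us / 1000000;
  deadline->tv_nsec+= (timeout_us % 1000000) * 1000;
  if (deadline->tv_nsec >= 1000000000L)     // carry nanoseconds into seconds
  {
    deadline->tv_sec+= 1;
    deadline->tv_nsec-= 1000000000L;
  }
  return true;
}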
+
+
+void
+gtid_waiting::promote_new_waiter(gtid_waiting::hash_element *he)
+{
+ queue_element *qe;
+
+ mysql_mutex_assert_owner(&LOCK_gtid_waiting);
+ if (queue_empty(&he->queue))
+ return;
+ qe= (queue_element *)queue_top(&he->queue);
+ qe->thd->wakeup_ready= true;
+ qe->wakeup_reason= queue_element::TAKEOVER;
+ mysql_cond_signal(&qe->thd->COND_wakeup_ready);
+}
+
+void
+gtid_waiting::process_wait_hash(uint64 wakeup_seq_no,
+ gtid_waiting::hash_element *he)
+{
+ mysql_mutex_assert_owner(&LOCK_gtid_waiting);
+
+ for (;;)
+ {
+ queue_element *qe;
+
+ if (queue_first_element(&he->queue) > queue_last_element(&he->queue))
+ break;
+ qe= (queue_element *)queue_top(&he->queue);
+ if (qe->wait_seq_no > wakeup_seq_no)
+ break;
+ queue_remove_top(&he->queue);
+ qe->thd->wakeup_ready= true;
+ qe->wakeup_reason= queue_element::DONE;
+ mysql_cond_signal(&qe->thd->COND_wakeup_ready);
+ }
+}
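The queue inside each hash_element keeps the "large" waiters ordered by wait_seq_no, so process_wait_hash() only ever has to inspect the top of the queue. A rough sketch of that idea with std::priority_queue standing in for the mysys QUEUE (the types and the collect-instead-of-signal behaviour are illustrative only):

#include <cstdint>
#include <queue>
#include <vector>

// Hypothetical stand-in for one hash_element's queue: a min-heap on
// wait_seq_no instead of the mysys QUEUE used by the server.
struct Waiter { std::uint64_t wait_seq_no; /* THD *thd, ... */ };

struct CmpBySeqNo
{
  bool operator()(const Waiter &a, const Waiter &b) const
  { return a.wait_seq_no > b.wait_seq_no; }   // inverted => min-heap
};

typedef std::priority_queue<Waiter, std::vector<Waiter>, CmpBySeqNo> WaitQueue;

// Same shape as gtid_waiting::process_wait_hash(): pop every waiter whose
// seq_no has been reached (the real code signals the owning THD), leave
// the rest queued for a later wakeup.
static std::vector<Waiter> process_wait_queue(WaitQueue &q,
                                              std::uint64_t wakeup_seq_no)
{
  std::vector<Waiter> woken;
  while (!q.empty() && q.top().wait_seq_no <= wakeup_seq_no)
  {
    woken.push_back(q.top());
    q.pop();
  }
  return woken;
}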
+
+
+/*
+ Execute a MASTER_GTID_WAIT() for one specific domain.
+
+ The implementation is optimised primarily for (1) minimal performance impact
+ on the slave replication threads, and secondarily for (2) quick performance
+ of MASTER_GTID_WAIT() on a single GTID, which can be useful for consistent
+  reads for clients in an async replication read-scaleout scenario.
+
+ To achieve (1), we have a "small" wait and a "large" wait. The small wait
+ contends with the replication threads on the lock on the gtid_slave_pos, so
+ only minimal processing is done under that lock, and only a single waiter at
+ a time does the small wait.
+
+ If there is already a small waiter, a new thread will either replace the
+ small waiter (if it needs to wait for an earlier sequence number), or
+  instead do a "large" wait.
+
+ Once awoken on the small wait, the waiting thread releases the lock shared
+ with the SQL threads quickly, and then processes all waiters currently doing
+ the large wait.
+
+ This way, the SQL threads only need to do a single check + possibly a
+ pthread_cond_signal() when updating the gtid_slave_state, and the time that
+  non-SQL threads contend for the lock on gtid_slave_state is minimized.
+*/
+int
+gtid_waiting::wait_for_gtid(THD *thd, rpl_gtid *wait_gtid,
+ struct timespec *wait_until)
+{
+ bool timed_out= false;
+#ifdef HAVE_REPLICATION
+ queue_element elem;
+ uint32_t domain_id= wait_gtid->domain_id;
+ uint64 seq_no= wait_gtid->seq_no;
+ hash_element *he;
+ rpl_slave_state::element *slave_state_elem= NULL;
+ const char *old_msg= NULL;
+ bool did_enter_cond= false;
+ bool takeover= false;
+
+ elem.wait_seq_no= seq_no;
+ elem.thd= thd;
+ /*
+ Register on the large wait before checking the small wait.
+ This ensures that if we find another waiter already doing the small wait,
+ we are sure to be woken up by that one, and thus we will not need to take
+ the lock on the small wait more than once in this case.
+ */
+ mysql_mutex_lock(&LOCK_gtid_waiting);
+ if (!(he= register_in_wait_hash(thd, wait_gtid, &elem)))
+ {
+ mysql_mutex_unlock(&LOCK_gtid_waiting);
+ return 1;
+ }
+
+ /*
+ Now check the small wait, and either do the large wait or the small one,
+ depending on whether there is already a suitable small waiter or not.
+
+ We may need to do this multiple times, as a previous small waiter may
+ complete and pass the small wait on to us.
+ */
+ for (;;)
+ {
+ uint64 wakeup_seq_no, cur_wait_seq_no;
+
+ mysql_mutex_assert_owner(&LOCK_gtid_waiting);
+ mysql_mutex_lock(&rpl_global_gtid_slave_state.LOCK_slave_state);
+ /*
+ The elements in the gtid_slave_state_hash are never re-allocated once
+ they enter the hash, so we do not need to re-do the lookup after releasing
+      and re-acquiring the lock.
+ */
+ if (!slave_state_elem &&
+ !(slave_state_elem= rpl_global_gtid_slave_state.get_element(domain_id)))
+ {
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
+ remove_from_wait_hash(he, &elem);
+ mysql_mutex_unlock(&LOCK_gtid_waiting);
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ return 1;
+ }
+
+ if ((wakeup_seq_no= slave_state_elem->highest_seq_no) >= seq_no)
+ {
+ /*
+        We do not have to wait. But we might need to wake up other threads on
+        the large wait (this can happen if we were woken up to take over the
+        small wait, and the SQL thread raced with us to reach the waited-for
+        GTID).
+ */
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
+ thd->wakeup_ready= 0;
+ process_wait_hash(wakeup_seq_no, he);
+ /*
+ Since we already checked wakeup_seq_no, we are sure that
+ process_wait_hash() will mark us done.
+ */
+ DBUG_ASSERT(thd->wakeup_ready);
+ if (thd->wakeup_ready)
+ {
+ if (takeover)
+ promote_new_waiter(he);
+ break;
+ }
+ }
+ else if ((cur_wait_seq_no= slave_state_elem->min_wait_seq_no) == 0 ||
+ cur_wait_seq_no > seq_no)
+ {
+ /*
+ We have to do the small wait ourselves (stealing it from any thread that
+ might already be waiting for a later seq_no).
+ */
+ slave_state_elem->min_wait_seq_no= seq_no;
+ if (cur_wait_seq_no != 0)
+ {
+ /* We stole the wait, so wake up the old waiting thread. */
+ mysql_cond_signal(&slave_state_elem->COND_wait_gtid);
+ }
+ /* Do the small wait. */
+ if (did_enter_cond)
+ thd->exit_cond(old_msg);
+ else
+ mysql_mutex_unlock(&LOCK_gtid_waiting);
+
+ old_msg= thd->enter_cond(&slave_state_elem->COND_wait_gtid,
+ &rpl_global_gtid_slave_state.LOCK_slave_state,
+ "Waiting in MASTER_GTID_WAIT() (primary waiter)");
+ do
+ {
+ if (thd->check_killed())
+ slave_state_elem->min_wait_seq_no = 0;
+ else if (wait_until)
+ {
+ int err=
+ mysql_cond_timedwait(&slave_state_elem->COND_wait_gtid,
+ &rpl_global_gtid_slave_state.LOCK_slave_state,
+ wait_until);
+ if (err == ETIMEDOUT || err == ETIME)
+ {
+ timed_out= true;
+ slave_state_elem->min_wait_seq_no = 0;
+ }
+ }
+ else
+ mysql_cond_wait(&slave_state_elem->COND_wait_gtid,
+ &rpl_global_gtid_slave_state.LOCK_slave_state);
+ } while (slave_state_elem->min_wait_seq_no == seq_no);
+ /*
+ Check the new gtid_slave_state. We could be woken up because our seq_no
+ has been reached, or because someone else stole the small wait from us.
+ (Or because of kill/timeout).
+ */
+ wakeup_seq_no= slave_state_elem->highest_seq_no;
+
+ thd->exit_cond(old_msg);
+ mysql_mutex_lock(&LOCK_gtid_waiting);
+ /*
+        Note that hash_element pointers do not change once allocated, so we do
+        not need to look up `he' again after re-acquiring the lock.
+ */
+ thd->wakeup_ready= 0;
+ process_wait_hash(wakeup_seq_no, he);
+ if (thd->wakeup_ready)
+ promote_new_waiter(he);
+ else if (thd->killed || timed_out)
+ {
+ remove_from_wait_hash(he, &elem);
+ promote_new_waiter(he);
+ if (thd->killed)
+ thd->send_kill_message();
+ break;
+ }
+ }
+ else
+ {
+ /* We have to do the large wait. */
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
+ thd->wakeup_ready= 0;
+ }
+
+ takeover= false;
+ old_msg= thd->enter_cond(&thd->COND_wakeup_ready, &LOCK_gtid_waiting,
+ "Waiting in MASTER_GTID_WAIT()");
+ while (!thd->wakeup_ready && !thd->check_killed() && !timed_out)
+ {
+ thd_wait_begin(thd, THD_WAIT_BINLOG);
+ if (wait_until)
+ {
+ int err= mysql_cond_timedwait(&thd->COND_wakeup_ready,
+ &LOCK_gtid_waiting, wait_until);
+ if (err == ETIMEDOUT || err == ETIME)
+ timed_out= true;
+ }
+ else
+ mysql_cond_wait(&thd->COND_wakeup_ready, &LOCK_gtid_waiting);
+ thd_wait_end(thd);
+ }
+
+ if (elem.wakeup_reason == queue_element::DONE)
+ break;
+ takeover= true;
+
+ if (thd->killed || timed_out)
+ {
+ remove_from_wait_hash(he, &elem);
+ /*
+        If we got kill/timeout _and_ we were asked to take over the small wait,
+ we need to pass on that task to someone else.
+ */
+ if (thd->wakeup_ready && elem.wakeup_reason == queue_element::TAKEOVER)
+ promote_new_waiter(he);
+ if (thd->killed)
+ thd->send_kill_message();
+ break;
+ }
+ }
+
+ if (did_enter_cond)
+ thd->exit_cond(old_msg);
+ else
+ mysql_mutex_unlock(&LOCK_gtid_waiting);
+#endif /* HAVE_REPLICATION */
+ return timed_out ? -1 : 0;
+}
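To make the small-wait/large-wait protocol above easier to follow in isolation, here is a deliberately simplified, hypothetical model of the same locking pattern for a single domain, written against std::mutex and std::condition_variable rather than mysql_mutex_t/mysql_cond_t. Timeouts, KILL handling, the per-domain hash and the explicit TAKEOVER bookkeeping are all left out (a displaced or finished primary simply broadcasts and every waiter re-checks), so this is a sketch of the idea, not the server implementation.

#include <condition_variable>
#include <cstdint>
#include <mutex>

class TwoTierWait
{
  // State shared with the applier ("SQL") thread; held as briefly as
  // possible, and at most one session sleeps on applier_cv at a time.
  std::mutex applier_mtx;                  // ~ LOCK_slave_state
  std::condition_variable applier_cv;      // ~ element::COND_wait_gtid
  std::uint64_t highest_seq_no= 0;
  std::uint64_t min_wait_seq_no= 0;        // 0 == no "small" (primary) waiter

  // State touched only by waiting client sessions.
  std::mutex waiters_mtx;                  // ~ LOCK_gtid_waiting
  std::condition_variable waiters_cv;      // ~ THD::COND_wakeup_ready
  std::uint64_t published_seq_no= 0;       // highest value seen by a primary

  void publish(std::uint64_t reached)      // caller holds waiters_mtx
  {
    if (reached > published_seq_no)
      published_seq_no= reached;
    waiters_cv.notify_all();               // wake "large" waiters to re-check
  }

public:
  // Applier thread: O(1) work and at most one broadcast per transaction.
  void applier_update(std::uint64_t seq_no)
  {
    std::lock_guard<std::mutex> a(applier_mtx);
    if (seq_no > highest_seq_no)
      highest_seq_no= seq_no;
    if (min_wait_seq_no != 0 && min_wait_seq_no <= seq_no)
    {
      min_wait_seq_no= 0;                  // hand further processing back
      applier_cv.notify_all();             // to the primary waiter
    }
  }

  // Client session: block until seq_no has been applied.
  void wait_for(std::uint64_t seq_no)
  {
    std::unique_lock<std::mutex> w(waiters_mtx);
    for (;;)
    {
      if (published_seq_no >= seq_no)
        return;                            // someone already saw our seq_no

      std::unique_lock<std::mutex> a(applier_mtx);
      if (highest_seq_no >= seq_no)
      {
        std::uint64_t reached= highest_seq_no;
        a.unlock();
        publish(reached);                  // nothing to wait for
        return;
      }
      if (min_wait_seq_no == 0 || seq_no < min_wait_seq_no)
      {
        // Become the primary ("small") waiter, displacing any primary that
        // waits for a later seq_no (it falls back to the large wait below).
        bool displaced_other= (min_wait_seq_no != 0);
        min_wait_seq_no= seq_no;
        if (displaced_other)
          applier_cv.notify_all();
        w.unlock();                        // never sleep holding both locks
        applier_cv.wait(a, [&] { return highest_seq_no >= seq_no ||
                                        min_wait_seq_no != seq_no; });
        std::uint64_t reached= highest_seq_no;
        a.unlock();
        w.lock();
        publish(reached);                  // the heavy lifting happens here
        if (reached >= seq_no)
          return;
        continue;                          // we were displaced; start over
      }
      // An earlier seq_no owns the small wait: do the cheap "large" wait.
      a.unlock();
      waiters_cv.wait(w);                  // woken by publish(); then re-check
    }
  }
};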
+
+
+static void
+free_hash_element(void *p)
+{
+ gtid_waiting::hash_element *e= (gtid_waiting::hash_element *)p;
+ delete_queue(&e->queue);
+ my_free(e);
+}
+
+
+void
+gtid_waiting::init()
+{
+ my_hash_init(&hash, &my_charset_bin, 32,
+ offsetof(hash_element, domain_id), sizeof(uint32), NULL,
+ free_hash_element, HASH_UNIQUE);
+ mysql_mutex_init(key_LOCK_gtid_waiting, &LOCK_gtid_waiting, 0);
+}
+
+
+void
+gtid_waiting::destroy()
+{
+ mysql_mutex_destroy(&LOCK_gtid_waiting);
+ my_hash_free(&hash);
+}
+
+
+static int
+cmp_queue_elem(void *, uchar *a, uchar *b)
+{
+ uint64 seq_no_a= *(uint64 *)a;
+ uint64 seq_no_b= *(uint64 *)b;
+ if (seq_no_a < seq_no_b)
+ return -1;
+ else if (seq_no_a == seq_no_b)
+ return 0;
+ else
+ return 1;
+}
+
+
+gtid_waiting::hash_element *
+gtid_waiting::get_entry(uint32 domain_id)
+{
+ hash_element *e;
+
+ if ((e= (hash_element *)my_hash_search(&hash, (const uchar *)&domain_id, 0)))
+ return e;
+
+ if (!(e= (hash_element *)my_malloc(sizeof(*e), MYF(MY_WME))))
+ {
+ my_error(ER_OUTOFMEMORY, MYF(0), sizeof(*e));
+ return NULL;
+ }
+
+ if (init_queue(&e->queue, 8, offsetof(queue_element, wait_seq_no), 0,
+ cmp_queue_elem, NULL, 1+offsetof(queue_element, queue_idx), 1))
+ {
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ my_free(e);
+ return NULL;
+ }
+ e->domain_id= domain_id;
+ if (my_hash_insert(&hash, (uchar *)e))
+ {
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ delete_queue(&e->queue);
+ my_free(e);
+ return NULL;
+ }
+ return e;
+}
+
+
+gtid_waiting::hash_element *
+gtid_waiting::register_in_wait_hash(THD *thd, rpl_gtid *wait_gtid,
+ gtid_waiting::queue_element *elem)
+{
+ hash_element *e;
+
+ mysql_mutex_assert_owner(&LOCK_gtid_waiting);
+
+ if (!(e= get_entry(wait_gtid->domain_id)))
+ return NULL;
+
+ if (queue_insert_safe(&e->queue, (uchar *)elem))
+ {
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ return NULL;
+ }
+
+ return e;
+}
+
+
+void
+gtid_waiting::remove_from_wait_hash(gtid_waiting::hash_element *e,
+ gtid_waiting::queue_element *elem)
+{
+ mysql_mutex_assert_owner(&LOCK_gtid_waiting);
+
+ queue_remove(&e->queue, elem->queue_idx);
+}
diff --git a/sql/rpl_gtid.h b/sql/rpl_gtid.h
index b0bc54900e7..1cf57c45018 100644
--- a/sql/rpl_gtid.h
+++ b/sql/rpl_gtid.h
@@ -16,6 +16,10 @@
#ifndef RPL_GTID_H
#define RPL_GTID_H
+#include "hash.h"
+#include "queues.h"
+
+
/* Definitions for MariaDB global transaction ID (GTID). */
@@ -61,6 +65,15 @@ struct rpl_slave_state
{
struct list_element *list;
uint32 domain_id;
+ /* Highest seq_no seen so far in this domain. */
+ uint64 highest_seq_no;
+ /*
+ If min_wait_seq_no is non-zero, then it is the smallest seq_no in this
+ domain that someone is doing MASTER_GTID_WAIT() on. When we reach this
+ seq_no, we need to signal the waiter on COND_wait_gtid.
+ */
+ uint64 min_wait_seq_no;
+ mysql_cond_t COND_wait_gtid;
list_element *grab_list() { list_element *l= list; list= NULL; return l; }
void add(list_element *l)
@@ -99,9 +112,6 @@ struct rpl_slave_state
bool in_statement);
bool is_empty();
- void lock() { DBUG_ASSERT(inited); mysql_mutex_lock(&LOCK_slave_state); }
- void unlock() { DBUG_ASSERT(inited); mysql_mutex_unlock(&LOCK_slave_state); }
-
element *get_element(uint32 domain_id);
int put_back_list(uint32 domain_id, list_element *list);
@@ -204,6 +214,49 @@ struct slave_connection_state
int get_gtid_list(rpl_gtid *gtid_list, uint32 list_size);
};
+
+/*
+ Structure to keep track of threads waiting in MASTER_GTID_WAIT().
+
+ Since replication is (mostly) single-threaded, we want to minimise the
+  performance impact of MASTER_GTID_WAIT() on it. To achieve this, we
+ are careful to keep the common lock between replication threads and
+  MASTER_GTID_WAIT threads held for as short a time as possible. We keep only
+ a single thread waiting to be notified by the replication threads; this
+ thread then handles all the (potentially heavy) lifting of dealing with
+ all current waiting threads.
+*/
+
+struct gtid_waiting {
+ /* Elements in the hash, basically a priority queue for each domain. */
+ struct hash_element {
+ QUEUE queue;
+ uint32 domain_id;
+ };
+ /* A priority queue to handle waiters in one domain in seq_no order. */
+ struct queue_element {
+ uint64 wait_seq_no;
+ THD *thd;
+ int queue_idx;
+ enum { DONE, TAKEOVER } wakeup_reason;
+ };
+
+ mysql_mutex_t LOCK_gtid_waiting;
+ HASH hash;
+
+ void init();
+ void destroy();
+ hash_element *get_entry(uint32 domain_id);
+ int wait_for_pos(THD *thd, String *gtid_str, longlong timeout_us);
+ void promote_new_waiter(gtid_waiting::hash_element *he);
+ int wait_for_gtid(THD *thd, rpl_gtid *wait_gtid, struct timespec *wait_until);
+ void process_wait_hash(uint64 wakeup_seq_no, gtid_waiting::hash_element *he);
+ hash_element *register_in_wait_hash(THD *thd, rpl_gtid *wait_gtid,
+ queue_element *elem);
+ void remove_from_wait_hash(hash_element *e, queue_element *elem);
+};
+
+
extern bool rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid,
bool *first);
extern int gtid_check_rpl_slave_state_table(TABLE *table);
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index cfa7c0f344f..8384297624c 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -37,6 +37,8 @@ static int count_relay_log_space(Relay_log_info* rli);
domain).
*/
rpl_slave_state rpl_global_gtid_slave_state;
+/* Object used for MASTER_GTID_WAIT(). */
+gtid_waiting rpl_global_gtid_waiting;
// Defined in slave.cc
@@ -1312,9 +1314,9 @@ rpl_load_gtid_slave_state(THD *thd)
uint32 i;
DBUG_ENTER("rpl_load_gtid_slave_state");
- rpl_global_gtid_slave_state.lock();
+ mysql_mutex_lock(&rpl_global_gtid_slave_state.LOCK_slave_state);
bool loaded= rpl_global_gtid_slave_state.loaded;
- rpl_global_gtid_slave_state.unlock();
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
if (loaded)
DBUG_RETURN(0);
@@ -1414,10 +1416,10 @@ rpl_load_gtid_slave_state(THD *thd)
}
}
- rpl_global_gtid_slave_state.lock();
+ mysql_mutex_lock(&rpl_global_gtid_slave_state.LOCK_slave_state);
if (rpl_global_gtid_slave_state.loaded)
{
- rpl_global_gtid_slave_state.unlock();
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
goto end;
}
@@ -1429,7 +1431,7 @@ rpl_load_gtid_slave_state(THD *thd)
tmp_entry.sub_id,
tmp_entry.gtid.seq_no)))
{
- rpl_global_gtid_slave_state.unlock();
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
my_error(ER_OUT_OF_RESOURCES, MYF(0));
goto end;
}
@@ -1442,14 +1444,14 @@ rpl_load_gtid_slave_state(THD *thd)
mysql_bin_log.bump_seq_no_counter_if_needed(entry->gtid.domain_id,
entry->gtid.seq_no))
{
- rpl_global_gtid_slave_state.unlock();
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
my_error(ER_OUT_OF_RESOURCES, MYF(0));
goto end;
}
}
rpl_global_gtid_slave_state.loaded= true;
- rpl_global_gtid_slave_state.unlock();
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
err= 0; /* Clear HA_ERR_END_OF_FILE */
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index ff2ffd0b366..ef18c7e7f9f 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -702,6 +702,7 @@ int init_relay_log_info(Relay_log_info* rli, const char* info_fname);
extern struct rpl_slave_state rpl_global_gtid_slave_state;
+extern gtid_waiting rpl_global_gtid_waiting;
int rpl_load_gtid_slave_state(THD *thd);
int event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev);
diff --git a/sql/share/errmsg-utf8.txt b/sql/share/errmsg-utf8.txt
index b74ffecf82e..db61983ef29 100644
--- a/sql/share/errmsg-utf8.txt
+++ b/sql/share/errmsg-utf8.txt
@@ -6547,7 +6547,7 @@ ER_UNTIL_REQUIRES_USING_GTID
ER_GTID_STRICT_OUT_OF_ORDER
eng "An attempt was made to binlog GTID %u-%u-%llu which would create an out-of-order sequence number with existing GTID %u-%u-%llu, and gtid strict mode is enabled."
ER_GTID_START_FROM_BINLOG_HOLE
- eng "The binlog on the master is missing the GTID %u-%u-%llu requested by the slave (even though both a prior and a subsequent sequence number does exist), and GTID strict mode is enabled"
+ eng "The binlog on the master is missing the GTID %u-%u-%llu requested by the slave (even though a subsequent sequence number does exist), and GTID strict mode is enabled"
ER_SLAVE_UNEXPECTED_MASTER_SWITCH
eng "Unexpected GTID received from master after reconnect. This normally indicates that the master server was replaced without restarting the slave threads. %s"
ER_INSIDE_TRANSACTION_PREVENTS_SWITCH_GTID_DOMAIN_ID_SEQ_NO
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index 8abdd53469f..b5201371e15 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -1266,6 +1266,7 @@ void THD::init(void)
set_status_var_init();
bzero((char *) &org_status_var, sizeof(org_status_var));
start_bytes_received= 0;
+ last_commit_gtid.seq_no= 0;
if (variables.sql_log_bin)
variables.option_bits|= OPTION_BIN_LOG;
diff --git a/sql/sql_class.h b/sql/sql_class.h
index a3fc3a7866f..c1cfd8b4d5b 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -44,6 +44,7 @@
#include "thr_lock.h" /* thr_lock_type, THR_LOCK_DATA,
THR_LOCK_INFO */
#include "my_apc.h"
+#include "rpl_gtid.h"
class Reprepare_observer;
class Relay_log_info;
@@ -3405,6 +3406,12 @@ private:
*/
LEX_STRING invoker_user;
LEX_STRING invoker_host;
+
+ /* Protect against add/delete of temporary tables in parallel replication */
+ void rgi_lock_temporary_tables();
+ void rgi_unlock_temporary_tables();
+ bool rgi_have_temporary_tables();
+public:
/*
Flag, mutex and condition for a thread to wait for a signal from another
thread.
@@ -3415,12 +3422,12 @@ private:
bool wakeup_ready;
mysql_mutex_t LOCK_wakeup_ready;
mysql_cond_t COND_wakeup_ready;
+ /*
+ The GTID assigned to the last commit. If no GTID was assigned to any commit
+ so far, this is indicated by last_commit_gtid.seq_no == 0.
+ */
+ rpl_gtid last_commit_gtid;
- /* Protect against add/delete of temporary tables in parallel replication */
- void rgi_lock_temporary_tables();
- void rgi_unlock_temporary_tables();
- bool rgi_have_temporary_tables();
-public:
inline void lock_temporary_tables()
{
if (rgi_slave)
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 363b53e05a6..7c4e5b1b383 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -3965,6 +3965,20 @@ rpl_deinit_gtid_slave_state()
}
+void
+rpl_init_gtid_waiting()
+{
+ rpl_global_gtid_waiting.init();
+}
+
+
+void
+rpl_deinit_gtid_waiting()
+{
+ rpl_global_gtid_waiting.destroy();
+}
+
+
/*
Format the current GTID state as a string, for returning the value of
@@global.gtid_slave_pos.
diff --git a/sql/sql_repl.h b/sql/sql_repl.h
index da55e3e863f..defb1b23f5b 100644
--- a/sql/sql_repl.h
+++ b/sql/sql_repl.h
@@ -70,6 +70,8 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, ushort flags);
extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state;
void rpl_init_gtid_slave_state();
void rpl_deinit_gtid_slave_state();
+void rpl_init_gtid_waiting();
+void rpl_deinit_gtid_waiting();
int gtid_state_from_binlog_pos(const char *name, uint32 pos, String *out_str);
int rpl_append_gtid_state(String *dest, bool use_binlog);
int rpl_load_gtid_state(slave_connection_state *state, bool use_binlog);
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
index 08b4953b2e4..cd24ad38eb2 100644
--- a/sql/sys_vars.cc
+++ b/sql/sys_vars.cc
@@ -1538,6 +1538,33 @@ static Sys_var_gtid_binlog_state Sys_gtid_binlog_state(
GLOBAL_VAR(opt_gtid_binlog_state_dummy), NO_CMD_LINE);
+static Sys_var_last_gtid Sys_last_gtid(
+ "last_gtid", "The GTID of the last commit (if binlogging was enabled), "
+ "or the empty string if none.",
+ READ_ONLY sys_var::ONLY_SESSION, NO_CMD_LINE);
+
+
+uchar *
+Sys_var_last_gtid::session_value_ptr(THD *thd, LEX_STRING *base)
+{
+ char buf[10+1+10+1+20+1];
+ String str(buf, sizeof(buf), system_charset_info);
+ char *p;
+ bool first= true;
+
+ str.length(0);
+ if ((thd->last_commit_gtid.seq_no > 0 &&
+ rpl_slave_state_tostring_helper(&str, &thd->last_commit_gtid, &first)) ||
+ !(p= thd->strmake(str.ptr(), str.length())))
+ {
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ return NULL;
+ }
+
+ return (uchar *)p;
+}
+
+
static bool
check_slave_parallel_threads(sys_var *self, THD *thd, set_var *var)
{
diff --git a/sql/sys_vars.h b/sql/sys_vars.h
index 3cc4da32811..6a84fc5fbc2 100644
--- a/sql/sys_vars.h
+++ b/sql/sys_vars.h
@@ -2211,3 +2211,53 @@ public:
}
uchar *global_value_ptr(THD *thd, LEX_STRING *base);
};
+
+
+/**
+ Class for @@session.last_gtid.
+*/
+class Sys_var_last_gtid: public sys_var
+{
+public:
+ Sys_var_last_gtid(const char *name_arg,
+ const char *comment, int flag_args, CMD_LINE getopt)
+ : sys_var(&all_sys_vars, name_arg, comment, flag_args, 0, getopt.id,
+ getopt.arg_type, SHOW_CHAR, 0, NULL, VARIABLE_NOT_IN_BINLOG,
+ NULL, NULL, NULL)
+ {
+ option.var_type= GET_STR;
+ }
+ bool do_check(THD *thd, set_var *var)
+ {
+ DBUG_ASSERT(false);
+ return true;
+ }
+ bool session_update(THD *thd, set_var *var)
+ {
+ DBUG_ASSERT(false);
+ return true;
+ }
+ bool global_update(THD *thd, set_var *var)
+ {
+ DBUG_ASSERT(false);
+ return true;
+ }
+ bool check_update_type(Item_result type) {
+ DBUG_ASSERT(false);
+ return false;
+ }
+ void session_save_default(THD *thd, set_var *var)
+ {
+ DBUG_ASSERT(false);
+ }
+ void global_save_default(THD *thd, set_var *var)
+ {
+ DBUG_ASSERT(false);
+ }
+ uchar *session_value_ptr(THD *thd, LEX_STRING *base);
+ uchar *global_value_ptr(THD *thd, LEX_STRING *base)
+ {
+ DBUG_ASSERT(false);
+ return NULL;
+ }
+};
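Taken together, @@last_gtid and MASTER_GTID_WAIT() enable a client-side read-your-own-writes pattern: commit on the master, read the session's @@last_gtid, and have a slave session wait for that GTID before reading. Below is a hedged sketch of that flow using the MySQL/MariaDB C client API (assumed available as mysql.h); host names, credentials and the t1 table are placeholders, and error handling is trimmed to the essentials.

#include <cstddef>
#include <cstdio>
#include <mysql.h>

// Hypothetical read-your-own-writes sketch combining @@last_gtid and
// MASTER_GTID_WAIT().  Hosts, credentials and the table are placeholders.

// Run a statement and copy the first column of the first row (if any).
static void query_value(MYSQL *con, const char *sql, char *out,
                        std::size_t len)
{
  out[0]= '\0';
  if (mysql_query(con, sql))
    return;
  MYSQL_RES *res= mysql_store_result(con);
  if (!res)
    return;                                // statement produced no rows
  MYSQL_ROW row= mysql_fetch_row(res);
  if (row && row[0])
    std::snprintf(out, len, "%s", row[0]);
  mysql_free_result(res);
}

int main()
{
  MYSQL *master= mysql_init(NULL);
  MYSQL *slave= mysql_init(NULL);
  if (!mysql_real_connect(master, "master.example", "app", "secret",
                          "test", 0, NULL, 0) ||
      !mysql_real_connect(slave, "slave.example", "app", "secret",
                          "test", 0, NULL, 0))
    return 1;

  /* 1. Do the write on the master. */
  mysql_query(master, "INSERT INTO t1 (msg) VALUES ('hello')");

  /* 2. Pick up the GTID assigned to that commit (empty if not binlogged;
        waiting for the empty string returns immediately). */
  char gtid[64];
  query_value(master, "SELECT @@last_gtid", gtid, sizeof(gtid));

  /* 3. On the slave, wait up to 1.5 seconds for that GTID to be applied:
        MASTER_GTID_WAIT() returns 0 once reached, -1 on timeout. */
  char sql[160], status[16];
  std::snprintf(sql, sizeof(sql), "SELECT MASTER_GTID_WAIT('%s', 1.5)", gtid);
  query_value(slave, sql, status, sizeof(status));

  /* 4. If status is "0", this read is guaranteed to see the insert above. */
  char msg[256];
  query_value(slave, "SELECT msg FROM t1 LIMIT 1", msg, sizeof(msg));
  std::printf("wait=%s msg=%s\n", status, msg);

  mysql_close(master);
  mysql_close(slave);
  return 0;
}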