author     unknown <knielsen@knielsen-hq.org>  2014-02-10 12:39:26 +0100
committer  unknown <knielsen@knielsen-hq.org>  2014-02-10 12:39:26 +0100
commit     fefdb576bb6b49adcff914f909429781452dd7bf (patch)
tree       9a9e5f5f6e796e4e621eb593ed7992be57aa3683 /sql
parent     8cc6e90d74f4377491bcb7a0f1acd41ccf9fbcae (diff)
parent     4a976545e518e70d5124e356a057c8d9624038a3 (diff)
download   mariadb-git-fefdb576bb6b49adcff914f909429781452dd7bf.tar.gz
Merge of MDEV-4984, MDEV-4726, and MDEV-5636 into 10.0-base.
MDEV-4984: Implement MASTER_GTID_WAIT() and @@LAST_GTID.
MDEV-4726: Race in mysql-test/suite/rpl/t/rpl_gtid_stop_start.test
MDEV-5636: Deadlock in RESET MASTER
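For context, the intended client-side use of the two new primitives (not part of the patch itself) is read-your-writes against an asynchronous slave: commit on the master, read @@last_gtid there, then call MASTER_GTID_WAIT() on the slave before reading. Below is a minimal sketch using the MySQL C API; host names, credentials, and the table t1 and its column are hypothetical placeholders, and error handling is reduced to the bare minimum.

/* Sketch only: read-your-writes with MASTER_GTID_WAIT() and @@last_gtid.
   Hosts, credentials and the table/column names are made-up placeholders. */
#include <mysql.h>
#include <cstdio>

/* Run a single-value SELECT and copy the result (or "") into buf. */
static bool fetch_scalar(MYSQL *con, const char *query, char *buf, size_t len)
{
  if (mysql_query(con, query))
    return false;
  MYSQL_RES *res= mysql_store_result(con);
  if (!res)
    return false;
  MYSQL_ROW row= mysql_fetch_row(res);
  snprintf(buf, len, "%s", (row && row[0]) ? row[0] : "");
  mysql_free_result(res);
  return true;
}

int main()
{
  MYSQL *master= mysql_init(NULL);
  MYSQL *slave= mysql_init(NULL);
  if (!mysql_real_connect(master, "master-host", "user", "pwd", "db", 0, NULL, 0) ||
      !mysql_real_connect(slave,  "slave-host",  "user", "pwd", "db", 0, NULL, 0))
    return 1;

  /* 1. Commit something on the master (binlogging must be enabled). */
  mysql_query(master, "INSERT INTO t1 (a) VALUES (42)");

  /* 2. @@last_gtid is the GTID of that commit, or "" if nothing was binlogged. */
  char gtid[64], status[32], wait_query[128];
  fetch_scalar(master, "SELECT @@last_gtid", gtid, sizeof(gtid));

  /* 3. Block on the slave until it has applied that GTID, or 1.5 seconds pass.
        Per this patch, MASTER_GTID_WAIT() returns 0 when the position is
        reached and -1 on timeout. */
  snprintf(wait_query, sizeof(wait_query),
           "SELECT MASTER_GTID_WAIT('%s', 1.5)", gtid);
  fetch_scalar(slave, wait_query, status, sizeof(status));

  /* 4. The slave has caught up; the row written above is now visible here. */
  if (status[0] == '0')
  {
    mysql_query(slave, "SELECT a FROM t1 WHERE a = 42");
    MYSQL_RES *res= mysql_store_result(slave);
    if (res)
      mysql_free_result(res);
  }

  mysql_close(master);
  mysql_close(slave);
  return 0;
}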
Diffstat (limited to 'sql')
-rw-r--r--  sql/item_create.cc          55
-rw-r--r--  sql/item_func.cc            28
-rw-r--r--  sql/item_func.h             16
-rw-r--r--  sql/log.cc                  36
-rw-r--r--  sql/log.h                    1
-rw-r--r--  sql/mysqld.cc                8
-rw-r--r--  sql/mysqld.h                 2
-rw-r--r--  sql/rpl_gtid.cc            507
-rw-r--r--  sql/rpl_gtid.h              73
-rw-r--r--  sql/rpl_rli.cc              16
-rw-r--r--  sql/rpl_rli.h                1
-rw-r--r--  sql/share/errmsg-utf8.txt    2
-rw-r--r--  sql/sql_class.cc             1
-rw-r--r--  sql/sql_class.h             17
-rw-r--r--  sql/sql_repl.cc             14
-rw-r--r--  sql/sql_repl.h               2
-rw-r--r--  sql/sys_vars.cc             27
-rw-r--r--  sql/sys_vars.h              50
18 files changed, 808 insertions(+), 48 deletions(-)
diff --git a/sql/item_create.cc b/sql/item_create.cc
index 60eabe67c83..c158816bf32 100644
--- a/sql/item_create.cc
+++ b/sql/item_create.cc
@@ -1783,6 +1783,19 @@ protected:
};
+class Create_func_master_gtid_wait : public Create_native_func
+{
+public:
+ virtual Item *create_native(THD *thd, LEX_STRING name, List<Item> *item_list);
+
+ static Create_func_master_gtid_wait s_singleton;
+
+protected:
+ Create_func_master_gtid_wait() {}
+ virtual ~Create_func_master_gtid_wait() {}
+};
+
+
class Create_func_md5 : public Create_func_arg1
{
public:
@@ -4590,6 +4603,47 @@ Create_func_master_pos_wait::create_native(THD *thd, LEX_STRING name,
}
+Create_func_master_gtid_wait Create_func_master_gtid_wait::s_singleton;
+
+Item*
+Create_func_master_gtid_wait::create_native(THD *thd, LEX_STRING name,
+ List<Item> *item_list)
+{
+ Item *func= NULL;
+ int arg_count= 0;
+
+ thd->lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_FUNCTION);
+
+ if (item_list != NULL)
+ arg_count= item_list->elements;
+
+ if (arg_count < 1 || arg_count > 2)
+ {
+ my_error(ER_WRONG_PARAMCOUNT_TO_NATIVE_FCT, MYF(0), name.str);
+ return func;
+ }
+
+ thd->lex->safe_to_cache_query= 0;
+
+ Item *param_1= item_list->pop();
+ switch (arg_count) {
+ case 1:
+ {
+ func= new (thd->mem_root) Item_master_gtid_wait(param_1);
+ break;
+ }
+ case 2:
+ {
+ Item *param_2= item_list->pop();
+ func= new (thd->mem_root) Item_master_gtid_wait(param_1, param_2);
+ break;
+ }
+ }
+
+ return func;
+}
+
+
Create_func_md5 Create_func_md5::s_singleton;
Item*
@@ -5536,6 +5590,7 @@ static Native_func_registry func_array[] =
{ { C_STRING_WITH_LEN("MAKEDATE") }, BUILDER(Create_func_makedate)},
{ { C_STRING_WITH_LEN("MAKETIME") }, BUILDER(Create_func_maketime)},
{ { C_STRING_WITH_LEN("MAKE_SET") }, BUILDER(Create_func_make_set)},
+ { { C_STRING_WITH_LEN("MASTER_GTID_WAIT") }, BUILDER(Create_func_master_gtid_wait)},
{ { C_STRING_WITH_LEN("MASTER_POS_WAIT") }, BUILDER(Create_func_master_pos_wait)},
{ { C_STRING_WITH_LEN("MBRCONTAINS") }, GEOM_BUILDER(Create_func_mbr_contains)},
{ { C_STRING_WITH_LEN("MBRDISJOINT") }, GEOM_BUILDER(Create_func_mbr_disjoint)},
diff --git a/sql/item_func.cc b/sql/item_func.cc
index 2c6e195075a..df7ccea3caf 100644
--- a/sql/item_func.cc
+++ b/sql/item_func.cc
@@ -3991,6 +3991,34 @@ err:
}
+longlong Item_master_gtid_wait::val_int()
+{
+ DBUG_ASSERT(fixed == 1);
+ longlong result= 0;
+
+ if (args[0]->null_value)
+ {
+ null_value= 1;
+ return 0;
+ }
+
+ null_value=0;
+#ifdef HAVE_REPLICATION
+ THD* thd= current_thd;
+ longlong timeout_us;
+ String *gtid_pos = args[0]->val_str(&value);
+
+ if (arg_count==2 && !args[1]->null_value)
+ timeout_us= (longlong)(1e6*args[1]->val_real());
+ else
+ timeout_us= (longlong)-1;
+
+ result= rpl_global_gtid_waiting.wait_for_pos(thd, gtid_pos, timeout_us);
+#endif
+ return result;
+}
+
+
/**
Enables a session to wait on a condition until a timeout or a network
disconnect occurs.
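The SQL-level timeout accepted by Item_master_gtid_wait::val_int() above is a (possibly fractional) number of seconds; val_int() converts it to microseconds, and wait_for_pos() later turns a non-negative microsecond count into an absolute deadline for mysql_cond_timedwait() (see the rpl_gtid.cc hunks further down). The standalone sketch below mirrors that conversion, with plain POSIX clock_gettime() standing in for the server's set_timespec_nsec() macro; it is illustrative only.

/* Sketch of the MASTER_GTID_WAIT() timeout handling: fractional seconds ->
   microseconds -> absolute deadline (or "no deadline" for negative values).
   clock_gettime() stands in for the server's set_timespec_nsec(). */
#include <stdio.h>
#include <time.h>

static bool compute_deadline(double timeout_seconds, timespec *deadline)
{
  /* Same conversion as Item_master_gtid_wait::val_int(): seconds * 1e6,
     with a negative value meaning "wait forever". */
  long long timeout_us= timeout_seconds >= 0 ?
    (long long)(1e6 * timeout_seconds) : -1;
  if (timeout_us < 0)
    return false;                           /* caller will pass no deadline */

  clock_gettime(CLOCK_REALTIME, deadline);
  long long nsec= deadline->tv_nsec + 1000LL * timeout_us;
  deadline->tv_sec+= nsec / 1000000000LL;
  deadline->tv_nsec= nsec % 1000000000LL;
  return true;
}

int main()
{
  timespec ts;
  if (compute_deadline(1.5, &ts))           /* MASTER_GTID_WAIT('...', 1.5) */
    printf("deadline: %lld.%09ld\n", (long long)ts.tv_sec, ts.tv_nsec);
  if (!compute_deadline(-1, &ts))           /* MASTER_GTID_WAIT('...') */
    printf("no deadline: wait until the GTID arrives or the query is killed\n");
  return 0;
}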
diff --git a/sql/item_func.h b/sql/item_func.h
index 384a6b535df..2e3f352e377 100644
--- a/sql/item_func.h
+++ b/sql/item_func.h
@@ -1642,6 +1642,22 @@ public:
};
+class Item_master_gtid_wait :public Item_int_func
+{
+ String value;
+public:
+ Item_master_gtid_wait(Item *a) :Item_int_func(a) {}
+ Item_master_gtid_wait(Item *a,Item *b) :Item_int_func(a,b) {}
+ longlong val_int();
+ const char *func_name() const { return "master_gtid_wait"; }
+ void fix_length_and_dec() { max_length=10+1+10+1+20+1; maybe_null=0;}
+ bool check_vcol_func_processor(uchar *int_arg)
+ {
+ return trace_unsupported_by_check_vcol_func_processor(func_name());
+ }
+};
+
+
/* Handling of user definable variables */
class user_var_entry;
diff --git a/sql/log.cc b/sql/log.cc
index fbb73acf5d1..6fead95a4b1 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -2933,7 +2933,7 @@ const char *MYSQL_LOG::generate_name(const char *log_name,
MYSQL_BIN_LOG::MYSQL_BIN_LOG(uint *sync_period)
- :reset_master_pending(false),
+ :reset_master_pending(false), mark_xid_done_waiting(0),
bytes_written(0), file_id(1), open_count(1),
group_commit_queue(0), group_commit_queue_busy(FALSE),
num_commits(0), num_group_commits(0),
@@ -3749,22 +3749,11 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd, bool create_new_log,
const char* save_name;
DBUG_ENTER("reset_logs");
- if (thd)
- ha_reset_logs(thd);
- /*
- We need to get both locks to be sure that no one is trying to
- write to the index log file.
- */
- mysql_mutex_lock(&LOCK_log);
- mysql_mutex_lock(&LOCK_index);
-
if (!is_relay_log)
{
if (init_state && !is_empty_state())
{
my_error(ER_BINLOG_MUST_BE_EMPTY, MYF(0));
- mysql_mutex_unlock(&LOCK_index);
- mysql_mutex_unlock(&LOCK_log);
DBUG_RETURN(1);
}
@@ -3773,11 +3762,29 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd, bool create_new_log,
This ensures that a binlog checkpoint will not try to write binlog
checkpoint events, which would be useless (as we are deleting the binlog
anyway) and could deadlock, as we are holding LOCK_log.
+
+ Wait for any mark_xid_done() calls that might already be running to
+ complete (i.e. for the mark_xid_done_waiting counter to drop to zero);
+ we need to do this before taking LOCK_log, to avoid a deadlock.
*/
mysql_mutex_lock(&LOCK_xid_list);
reset_master_pending= true;
+ while (mark_xid_done_waiting > 0)
+ mysql_cond_wait(&COND_xid_list, &LOCK_xid_list);
mysql_mutex_unlock(&LOCK_xid_list);
+ }
+ if (thd)
+ ha_reset_logs(thd);
+ /*
+ We need to get both locks to be sure that no one is trying to
+ write to the index log file.
+ */
+ mysql_mutex_lock(&LOCK_log);
+ mysql_mutex_lock(&LOCK_index);
+
+ if (!is_relay_log)
+ {
/*
We are going to nuke all binary log files.
Without binlog, we cannot XA recover prepared-but-not-committed
@@ -5446,6 +5453,7 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone,
}
if (err)
return true;
+ thd->last_commit_gtid= gtid;
Gtid_log_event gtid_event(thd, seq_no, domain_id, standalone,
LOG_EVENT_SUPPRESS_USE_F, is_transactional,
@@ -8833,9 +8841,13 @@ TC_LOG_BINLOG::mark_xid_done(ulong binlog_id, bool write_checkpoint)
locks in the opposite order.
*/
+ ++mark_xid_done_waiting;
mysql_mutex_unlock(&LOCK_xid_list);
mysql_mutex_lock(&LOCK_log);
mysql_mutex_lock(&LOCK_xid_list);
+ --mark_xid_done_waiting;
+ if (unlikely(reset_master_pending))
+ mysql_cond_signal(&COND_xid_list);
/* We need to reload current_binlog_id due to release/re-take of lock. */
current= current_binlog_id;
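For readers following the MDEV-5636 part of the merge: the hunks above reorder RESET MASTER so that it drains in-flight mark_xid_done() calls (which temporarily drop LOCK_xid_list in order to take LOCK_log in the canonical order) before it takes LOCK_log itself. The standalone model below captures just that counter-plus-condition-variable pattern, using C++11 primitives in place of the server's mysql_mutex/mysql_cond wrappers; the real code additionally uses reset_master_pending to turn racing checkpoint writes into no-ops.

/* Simplified model (not server code) of the MDEV-5636 fix: RESET MASTER must
   not take LOCK_log while a mark_xid_done() call is between "released
   LOCK_xid_list" and "acquired LOCK_log", or the two can deadlock. A counter
   of such in-flight calls, protected by LOCK_xid_list, lets RESET MASTER
   drain them first. */
#include <condition_variable>
#include <mutex>

struct BinlogModel
{
  std::mutex LOCK_log, LOCK_xid_list;
  std::condition_variable COND_xid_list;
  unsigned mark_xid_done_waiting= 0;
  bool reset_master_pending= false;

  void mark_xid_done()
  {
    std::unique_lock<std::mutex> xid(LOCK_xid_list);
    /* ...decide that a binlog checkpoint must be logged... */
    ++mark_xid_done_waiting;        /* announce the pending lock-order switch */
    xid.unlock();                   /* re-take the locks in canonical order   */
    std::lock_guard<std::mutex> log(LOCK_log);
    xid.lock();
    --mark_xid_done_waiting;
    if (reset_master_pending)       /* a RESET MASTER may be draining us */
      COND_xid_list.notify_one();
    /* ...write the checkpoint event while holding both locks... */
  }

  void reset_logs()
  {
    {
      std::unique_lock<std::mutex> xid(LOCK_xid_list);
      reset_master_pending= true;
      /* Drain in-flight mark_xid_done() calls *before* taking LOCK_log;
         waiting for them afterwards is exactly the old deadlock. */
      COND_xid_list.wait(xid, [this] { return mark_xid_done_waiting == 0; });
    }
    std::lock_guard<std::mutex> log(LOCK_log);
    std::lock_guard<std::mutex> xid(LOCK_xid_list);
    /* ...delete and recreate the binlog files... */
    reset_master_pending= false;
  }
};

int main() { BinlogModel b; b.mark_xid_done(); b.reset_logs(); return 0; }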
diff --git a/sql/log.h b/sql/log.h
index 45381152d97..4249246277f 100644
--- a/sql/log.h
+++ b/sql/log.h
@@ -471,6 +471,7 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
checkpoint arrives - when all have arrived, RESET MASTER will complete.
*/
bool reset_master_pending;
+ ulong mark_xid_done_waiting;
/* LOCK_log and LOCK_index are inited by init_pthread_objects() */
mysql_mutex_t LOCK_index;
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index 923b8db779e..d95d856c18f 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -780,6 +780,7 @@ PSI_mutex_key key_LOCK_stats,
key_LOCK_global_user_client_stats, key_LOCK_global_table_stats,
key_LOCK_global_index_stats,
key_LOCK_wakeup_ready, key_LOCK_wait_commit;
+PSI_mutex_key key_LOCK_gtid_waiting;
PSI_mutex_key key_LOCK_prepare_ordered, key_LOCK_commit_ordered;
@@ -825,6 +826,7 @@ static PSI_mutex_info all_server_mutexes[]=
{ &key_LOCK_global_index_stats, "LOCK_global_index_stats", PSI_FLAG_GLOBAL},
{ &key_LOCK_wakeup_ready, "THD::LOCK_wakeup_ready", 0},
{ &key_LOCK_wait_commit, "wait_for_commit::LOCK_wait_commit", 0},
+ { &key_LOCK_gtid_waiting, "gtid_waiting::LOCK_gtid_waiting", 0},
{ &key_LOCK_thd_data, "THD::LOCK_thd_data", 0},
{ &key_LOCK_user_conn, "LOCK_user_conn", PSI_FLAG_GLOBAL},
{ &key_LOCK_uuid_short_generator, "LOCK_uuid_short_generator", PSI_FLAG_GLOBAL},
@@ -895,6 +897,7 @@ PSI_cond_key key_RELAYLOG_COND_queue_busy;
PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_pool,
key_COND_parallel_entry, key_COND_prepare_ordered;
+PSI_cond_key key_COND_wait_gtid;
static PSI_cond_info all_server_conds[]=
{
@@ -940,7 +943,8 @@ static PSI_cond_info all_server_conds[]=
{ &key_COND_rpl_thread, "COND_rpl_thread", 0},
{ &key_COND_rpl_thread_pool, "COND_rpl_thread_pool", 0},
{ &key_COND_parallel_entry, "COND_parallel_entry", 0},
- { &key_COND_prepare_ordered, "COND_prepare_ordered", 0}
+ { &key_COND_prepare_ordered, "COND_prepare_ordered", 0},
+ { &key_COND_wait_gtid, "COND_wait_gtid", 0}
};
PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert,
@@ -1821,6 +1825,7 @@ static void mysqld_exit(int exit_code)
but if a kill -15 signal was sent, the signal thread did
spawn the kill_server_thread thread, which is running concurrently.
*/
+ rpl_deinit_gtid_waiting();
rpl_deinit_gtid_slave_state();
wait_for_signal_thread_to_end();
mysql_audit_finalize();
@@ -4202,6 +4207,7 @@ static int init_thread_environment()
#ifdef HAVE_REPLICATION
rpl_init_gtid_slave_state();
+ rpl_init_gtid_waiting();
#endif
DBUG_RETURN(0);
diff --git a/sql/mysqld.h b/sql/mysqld.h
index 2e10b0caeb5..4fdd34fd8be 100644
--- a/sql/mysqld.h
+++ b/sql/mysqld.h
@@ -256,6 +256,7 @@ extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state,
extern PSI_mutex_key key_LOCK_stats,
key_LOCK_global_user_client_stats, key_LOCK_global_table_stats,
key_LOCK_global_index_stats, key_LOCK_wakeup_ready, key_LOCK_wait_commit;
+extern PSI_mutex_key key_LOCK_gtid_waiting;
extern PSI_rwlock_key key_rwlock_LOCK_grant, key_rwlock_LOCK_logger,
key_rwlock_LOCK_sys_init_connect, key_rwlock_LOCK_sys_init_slave,
@@ -285,6 +286,7 @@ extern PSI_cond_key key_RELAYLOG_COND_queue_busy;
extern PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
extern PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_pool,
key_COND_parallel_entry;
+extern PSI_cond_key key_COND_wait_gtid;
extern PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert,
key_thread_handle_manager, key_thread_kill_server, key_thread_main,
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc
index 3f79a0cb528..d7923ed9130 100644
--- a/sql/rpl_gtid.cc
+++ b/sql/rpl_gtid.cc
@@ -43,9 +43,9 @@ rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid)
there will not be an attempt to delete the corresponding table row before
it is even committed.
*/
- lock();
+ mysql_mutex_lock(&LOCK_slave_state);
err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no);
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
if (err)
{
sql_print_warning("Slave: Out of memory during slave state maintenance. "
@@ -82,11 +82,20 @@ rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi)
}
+static void
+rpl_slave_state_free_element(void *arg)
+{
+ struct rpl_slave_state::element *elem= (struct rpl_slave_state::element *)arg;
+ mysql_cond_destroy(&elem->COND_wait_gtid);
+ my_free(elem);
+}
+
+
rpl_slave_state::rpl_slave_state()
: last_sub_id(0), inited(false), loaded(false)
{
my_hash_init(&hash, &my_charset_bin, 32, offsetof(element, domain_id),
- sizeof(uint32), NULL, my_free, HASH_UNIQUE);
+ sizeof(uint32), NULL, rpl_slave_state_free_element, HASH_UNIQUE);
}
@@ -146,6 +155,21 @@ rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
if (!(elem= get_element(domain_id)))
return 1;
+ if (seq_no > elem->highest_seq_no)
+ elem->highest_seq_no= seq_no;
+ if (elem->gtid_waiter && elem->min_wait_seq_no <= seq_no)
+ {
+ /*
+ Someone was waiting in MASTER_GTID_WAIT() for this GTID to appear.
+ Signal (and remove) them. The waiter will handle all the processing
+ of all pending MASTER_GTID_WAIT(), so we do not slow down the
+ replication SQL thread.
+ */
+ mysql_mutex_assert_owner(&LOCK_slave_state);
+ elem->gtid_waiter= NULL;
+ mysql_cond_broadcast(&elem->COND_wait_gtid);
+ }
+
if (!(list_elem= (list_element *)my_malloc(sizeof(*list_elem), MYF(MY_WME))))
return 1;
list_elem->server_id= server_id;
@@ -173,6 +197,9 @@ rpl_slave_state::get_element(uint32 domain_id)
return NULL;
elem->list= NULL;
elem->domain_id= domain_id;
+ elem->highest_seq_no= 0;
+ elem->gtid_waiter= NULL;
+ mysql_cond_init(key_COND_wait_gtid, &elem->COND_wait_gtid, 0);
if (my_hash_insert(&hash, (uchar *)elem))
{
my_free(elem);
@@ -378,10 +405,10 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
goto end;
}
- lock();
+ mysql_mutex_lock(&LOCK_slave_state);
if ((elem= get_element(gtid->domain_id)) == NULL)
{
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
my_error(ER_OUT_OF_RESOURCES, MYF(0));
err= 1;
goto end;
@@ -410,7 +437,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
cur->next= NULL;
elem->list= cur;
}
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
if (!elist)
goto end;
@@ -470,9 +497,9 @@ end:
*/
if (elist)
{
- lock();
+ mysql_mutex_lock(&LOCK_slave_state);
put_back_list(gtid->domain_id, elist);
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
}
ha_rollback_trans(thd, FALSE);
@@ -499,9 +526,9 @@ rpl_slave_state::next_sub_id(uint32 domain_id)
{
uint64 sub_id= 0;
- lock();
+ mysql_mutex_lock(&LOCK_slave_state);
sub_id= ++last_sub_id;
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
return sub_id;
}
@@ -541,7 +568,7 @@ rpl_slave_state::iterate(int (*cb)(rpl_gtid *, void *), void *data,
my_hash_insert(&gtid_hash, (uchar *)(&extra_gtids[i])))
goto err;
- lock();
+ mysql_mutex_lock(&LOCK_slave_state);
for (i= 0; i < hash.records; ++i)
{
@@ -576,19 +603,19 @@ rpl_slave_state::iterate(int (*cb)(rpl_gtid *, void *), void *data,
memcpy(&best_gtid, gtid, sizeof(best_gtid));
if (my_hash_delete(&gtid_hash, rec))
{
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
goto err;
}
}
if ((res= (*cb)(&best_gtid, data)))
{
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
goto err;
}
}
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
/* Also add any remaining extra domain_ids. */
for (i= 0; i < gtid_hash.records; ++i)
@@ -659,11 +686,11 @@ rpl_slave_state::domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid)
list_element *list;
uint64 best_sub_id;
- lock();
+ mysql_mutex_lock(&LOCK_slave_state);
elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0);
if (!elem || !(list= elem->list))
{
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
return false;
}
@@ -681,7 +708,7 @@ rpl_slave_state::domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid)
out_gtid->seq_no= list->seq_no;
}
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
return true;
}
@@ -811,7 +838,7 @@ rpl_slave_state::is_empty()
uint32 i;
bool result= true;
- lock();
+ mysql_mutex_lock(&LOCK_slave_state);
for (i= 0; i < hash.records; ++i)
{
element *e= (element *)my_hash_element(&hash, i);
@@ -821,7 +848,7 @@ rpl_slave_state::is_empty()
break;
}
}
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
return result;
}
@@ -1647,3 +1674,445 @@ slave_connection_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size)
return 0;
}
+
+
+/*
+ Execute a MASTER_GTID_WAIT().
+ The position to wait for is in gtid_str in string form.
+ The timeout, in microseconds, is in timeout_us; a negative value means no timeout.
+
+ Returns:
+ 1 for error.
+ 0 for wait completed.
+ -1 for wait timed out.
+*/
+int
+gtid_waiting::wait_for_pos(THD *thd, String *gtid_str, longlong timeout_us)
+{
+ int err;
+ rpl_gtid *wait_pos;
+ uint32 count, i;
+ struct timespec wait_until, *wait_until_ptr;
+
+ /* A wait for the empty position returns immediately. */
+ if (gtid_str->length() == 0)
+ return 0;
+
+ if (!(wait_pos= gtid_parse_string_to_list(gtid_str->ptr(), gtid_str->length(),
+ &count)))
+ {
+ my_error(ER_INCORRECT_GTID_STATE, MYF(0));
+ return 1;
+ }
+
+ if (timeout_us >= 0)
+ {
+ set_timespec_nsec(wait_until, (ulonglong)1000*timeout_us);
+ wait_until_ptr= &wait_until;
+ }
+ else
+ wait_until_ptr= NULL;
+ err= 0;
+ for (i= 0; i < count; ++i)
+ {
+ if ((err= wait_for_gtid(thd, &wait_pos[i], wait_until_ptr)))
+ break;
+ }
+ my_free(wait_pos);
+ return err;
+}
+
+
+void
+gtid_waiting::promote_new_waiter(gtid_waiting::hash_element *he)
+{
+ queue_element *qe;
+
+ mysql_mutex_assert_owner(&LOCK_gtid_waiting);
+ if (queue_empty(&he->queue))
+ return;
+ qe= (queue_element *)queue_top(&he->queue);
+ qe->do_small_wait= true;
+ mysql_cond_signal(&qe->thd->COND_wakeup_ready);
+}
+
+void
+gtid_waiting::process_wait_hash(uint64 wakeup_seq_no,
+ gtid_waiting::hash_element *he)
+{
+ mysql_mutex_assert_owner(&LOCK_gtid_waiting);
+
+ for (;;)
+ {
+ queue_element *qe;
+
+ if (queue_empty(&he->queue))
+ break;
+ qe= (queue_element *)queue_top(&he->queue);
+ if (qe->wait_seq_no > wakeup_seq_no)
+ break;
+ DBUG_ASSERT(!qe->done);
+ queue_remove_top(&he->queue);
+ qe->done= true;
+ mysql_cond_signal(&qe->thd->COND_wakeup_ready);
+ }
+}
+
+
+/*
+ Execute a MASTER_GTID_WAIT() for one specific domain.
+
+ The implementation is optimised primarily for (1) minimal performance impact
+ on the slave replication threads, and secondarily for (2) quick performance
+ of MASTER_GTID_WAIT() on a single GTID, which can be useful for providing
+ consistent reads to clients in an async replication read-scaleout scenario.
+
+ To achieve (1), we have a "small" wait and a "large" wait. The small wait
+ contends with the replication threads on the lock on the gtid_slave_pos, so
+ only minimal processing is done under that lock, and only a single waiter at
+ a time does the small wait.
+
+ If there is already a small waiter, a new thread will either replace the
+ small waiter (if it needs to wait for an earlier sequence number), or
+ instead do a "large" wait.
+
+ Once awoken on the small wait, the waiting thread releases the lock shared
+ with the SQL threads quickly, and then processes all waiters currently doing
+ the large wait using a different lock that does not impact replication.
+
+ This way, the SQL threads only need to do a single check + possibly a
+ pthread_cond_signal() when updating the gtid_slave_state, and the time that
+ non-SQL threads contend for the lock on gtid_slave_state is minimized.
+
+ There is always at least one thread that has the responsibility to ensure
+ that there is a small waiter; this thread has queue_element::do_small_wait
+ set to true. This thread will do the small wait until it is done, at which
+ point it will make sure to pass on the responsibility to another thread.
+ Normally only one thread has do_small_wait==true, but it can occasionally
+ happen that there is more than one, when threads race one another for the
+ lock on the small wait (this results in slightly increased activity on the
+ small lock but is otherwise harmless).
+
+ Returns:
+ 0 Wait completed normally
+ -1 Wait completed due to timeout
+ 1 An error (my_error() will have been called to set the error in the da)
+*/
+int
+gtid_waiting::wait_for_gtid(THD *thd, rpl_gtid *wait_gtid,
+ struct timespec *wait_until)
+{
+ bool timed_out= false;
+#ifdef HAVE_REPLICATION
+ queue_element elem;
+ uint32 domain_id= wait_gtid->domain_id;
+ uint64 seq_no= wait_gtid->seq_no;
+ hash_element *he;
+ rpl_slave_state::element *slave_state_elem= NULL;
+ const char *old_msg= NULL;
+ bool did_enter_cond= false;
+
+ elem.wait_seq_no= seq_no;
+ elem.thd= thd;
+ elem.done= false;
+
+ mysql_mutex_lock(&LOCK_gtid_waiting);
+ if (!(he= get_entry(wait_gtid->domain_id)))
+ {
+ mysql_mutex_unlock(&LOCK_gtid_waiting);
+ return 1;
+ }
+ /*
+ If there is already another waiter with seq_no no larger than our own,
+ we are sure that there is already a small waiter that will wake us up
+ (or later pass the small wait responsibility to us). So in this case, we
+ do not need to touch the small wait lock at all.
+ */
+ elem.do_small_wait=
+ (queue_empty(&he->queue) ||
+ ((queue_element *)queue_top(&he->queue))->wait_seq_no > seq_no);
+
+ if (register_in_wait_queue(thd, wait_gtid, he, &elem))
+ {
+ mysql_mutex_unlock(&LOCK_gtid_waiting);
+ return 1;
+ }
+ /*
+ Loop, doing either the small or large wait as appropriate, until either
+ the position waited for is reached, or we get a kill or timeout.
+ */
+ for (;;)
+ {
+ mysql_mutex_assert_owner(&LOCK_gtid_waiting);
+
+ if (elem.do_small_wait)
+ {
+ uint64 wakeup_seq_no;
+ queue_element *cur_waiter;
+
+ mysql_mutex_lock(&rpl_global_gtid_slave_state.LOCK_slave_state);
+ /*
+ The elements in the gtid_slave_state_hash are never re-allocated once
+ they enter the hash, so we do not need to re-do the lookup after releasing
+ and re-acquiring the lock.
+ */
+ if (!slave_state_elem &&
+ !(slave_state_elem= rpl_global_gtid_slave_state.get_element(domain_id)))
+ {
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
+ remove_from_wait_queue(he, &elem);
+ promote_new_waiter(he);
+ if (did_enter_cond)
+ thd->exit_cond(old_msg);
+ else
+ mysql_mutex_unlock(&LOCK_gtid_waiting);
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ return 1;
+ }
+
+ if ((wakeup_seq_no= slave_state_elem->highest_seq_no) >= seq_no)
+ {
+ /*
+ We do not have to wait. (We will be removed from the wait queue when
+ we call process_wait_hash() below.)
+ */
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
+ }
+ else if ((cur_waiter= slave_state_elem->gtid_waiter) &&
+ slave_state_elem->min_wait_seq_no <= seq_no)
+ {
+ /*
+ There is already a suitable small waiter, go do the large wait.
+ (Normally we would not have needed to check the small wait in this
+ case, but it can happen if we race with another thread for the small
+ lock).
+ */
+ elem.do_small_wait= false;
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
+ }
+ else
+ {
+ /*
+ We have to do the small wait ourselves (stealing it from any thread
+ that might already be waiting for a later seq_no).
+ */
+ slave_state_elem->gtid_waiter= &elem;
+ slave_state_elem->min_wait_seq_no= seq_no;
+ if (cur_waiter)
+ {
+ /* We stole the wait, so wake up the old waiting thread. */
+ mysql_cond_signal(&slave_state_elem->COND_wait_gtid);
+ }
+
+ /* Release the large lock, and do the small wait. */
+ if (did_enter_cond)
+ {
+ thd->exit_cond(old_msg);
+ did_enter_cond= false;
+ }
+ else
+ mysql_mutex_unlock(&LOCK_gtid_waiting);
+ old_msg=
+ thd->enter_cond(&slave_state_elem->COND_wait_gtid,
+ &rpl_global_gtid_slave_state.LOCK_slave_state,
+ "Waiting in MASTER_GTID_WAIT() (primary waiter)");
+ do
+ {
+ if (thd->check_killed())
+ break;
+ else if (wait_until)
+ {
+ int err=
+ mysql_cond_timedwait(&slave_state_elem->COND_wait_gtid,
+ &rpl_global_gtid_slave_state.LOCK_slave_state,
+ wait_until);
+ if (err == ETIMEDOUT || err == ETIME)
+ {
+ timed_out= true;
+ break;
+ }
+ }
+ else
+ mysql_cond_wait(&slave_state_elem->COND_wait_gtid,
+ &rpl_global_gtid_slave_state.LOCK_slave_state);
+ } while (slave_state_elem->gtid_waiter == &elem);
+ wakeup_seq_no= slave_state_elem->highest_seq_no;
+ /*
+ If we aborted due to timeout or kill, remove us as waiter.
+
+ If we were replaced by another waiter with a smaller seq_no, then we
+ no longer have responsibility for the small wait.
+ */
+ if ((cur_waiter= slave_state_elem->gtid_waiter))
+ {
+ if (cur_waiter == &elem)
+ slave_state_elem->gtid_waiter= NULL;
+ else if (slave_state_elem->min_wait_seq_no <= seq_no)
+ elem.do_small_wait= false;
+ }
+ thd->exit_cond(old_msg);
+
+ mysql_mutex_lock(&LOCK_gtid_waiting);
+ }
+
+ /*
+ Note that hash_entry pointers do not change once allocated, so we do
+ not need to look up `he' again after re-acquiring LOCK_gtid_waiting.
+ */
+ process_wait_hash(wakeup_seq_no, he);
+ }
+ else
+ {
+ /* Do the large wait. */
+ if (!did_enter_cond)
+ {
+ old_msg= thd->enter_cond(&thd->COND_wakeup_ready, &LOCK_gtid_waiting,
+ "Waiting in MASTER_GTID_WAIT()");
+ did_enter_cond= true;
+ }
+ while (!elem.done && !thd->check_killed())
+ {
+ thd_wait_begin(thd, THD_WAIT_BINLOG);
+ if (wait_until)
+ {
+ int err= mysql_cond_timedwait(&thd->COND_wakeup_ready,
+ &LOCK_gtid_waiting, wait_until);
+ if (err == ETIMEDOUT || err == ETIME)
+ timed_out= true;
+ }
+ else
+ mysql_cond_wait(&thd->COND_wakeup_ready, &LOCK_gtid_waiting);
+ thd_wait_end(thd);
+ if (elem.do_small_wait || timed_out)
+ break;
+ }
+ }
+
+ if ((thd->killed || timed_out) && !elem.done)
+ {
+ /* Aborted, so remove ourselves from the hash. */
+ remove_from_wait_queue(he, &elem);
+ elem.done= true;
+ }
+ if (elem.done)
+ {
+ /*
+ If our wait is done, but we have (or were passed) responsibility for
+ the small wait, then we need to pass on that task to someone else.
+ */
+ if (elem.do_small_wait)
+ promote_new_waiter(he);
+ break;
+ }
+ }
+
+ if (did_enter_cond)
+ thd->exit_cond(old_msg);
+ else
+ mysql_mutex_unlock(&LOCK_gtid_waiting);
+ if (thd->killed)
+ thd->send_kill_message();
+#endif /* HAVE_REPLICATION */
+ return timed_out ? -1 : 0;
+}
+
+
+static void
+free_hash_element(void *p)
+{
+ gtid_waiting::hash_element *e= (gtid_waiting::hash_element *)p;
+ delete_queue(&e->queue);
+ my_free(e);
+}
+
+
+void
+gtid_waiting::init()
+{
+ my_hash_init(&hash, &my_charset_bin, 32,
+ offsetof(hash_element, domain_id), sizeof(uint32), NULL,
+ free_hash_element, HASH_UNIQUE);
+ mysql_mutex_init(key_LOCK_gtid_waiting, &LOCK_gtid_waiting, 0);
+}
+
+
+void
+gtid_waiting::destroy()
+{
+ mysql_mutex_destroy(&LOCK_gtid_waiting);
+ my_hash_free(&hash);
+}
+
+
+static int
+cmp_queue_elem(void *, uchar *a, uchar *b)
+{
+ uint64 seq_no_a= *(uint64 *)a;
+ uint64 seq_no_b= *(uint64 *)b;
+ if (seq_no_a < seq_no_b)
+ return -1;
+ else if (seq_no_a == seq_no_b)
+ return 0;
+ else
+ return 1;
+}
+
+
+gtid_waiting::hash_element *
+gtid_waiting::get_entry(uint32 domain_id)
+{
+ hash_element *e;
+
+ if ((e= (hash_element *)my_hash_search(&hash, (const uchar *)&domain_id, 0)))
+ return e;
+
+ if (!(e= (hash_element *)my_malloc(sizeof(*e), MYF(MY_WME))))
+ {
+ my_error(ER_OUTOFMEMORY, MYF(0), sizeof(*e));
+ return NULL;
+ }
+
+ if (init_queue(&e->queue, 8, offsetof(queue_element, wait_seq_no), 0,
+ cmp_queue_elem, NULL, 1+offsetof(queue_element, queue_idx), 1))
+ {
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ my_free(e);
+ return NULL;
+ }
+ e->domain_id= domain_id;
+ if (my_hash_insert(&hash, (uchar *)e))
+ {
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ delete_queue(&e->queue);
+ my_free(e);
+ return NULL;
+ }
+ return e;
+}
+
+
+int
+gtid_waiting::register_in_wait_queue(THD *thd, rpl_gtid *wait_gtid,
+ gtid_waiting::hash_element *he,
+ gtid_waiting::queue_element *elem)
+{
+ mysql_mutex_assert_owner(&LOCK_gtid_waiting);
+
+ if (queue_insert_safe(&he->queue, (uchar *)elem))
+ {
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ return 1;
+ }
+
+ return 0;
+}
+
+
+void
+gtid_waiting::remove_from_wait_queue(gtid_waiting::hash_element *he,
+ gtid_waiting::queue_element *elem)
+{
+ mysql_mutex_assert_owner(&LOCK_gtid_waiting);
+
+ queue_remove(&he->queue, elem->queue_idx);
+}
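The long comment above wait_for_gtid() is the heart of MDEV-4984, so a condensed, single-domain model of the small-wait/large-wait split may help: exactly one waiter at a time shares a condition variable (and therefore a mutex) with the producer that stands in for the replication SQL thread, while every other MASTER_GTID_WAIT() blocks on a separate lock and is woken by whoever did the small wait. This is a sketch using C++11 threads, not the server code: it omits per-domain hashes, priority queues, timeouts and kill handling, and it wakes the small waiter on every applied event, whereas the real code tracks min_wait_seq_no so the SQL thread only signals when a waited-for position is actually reached.

#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <thread>
#include <vector>

struct Domain                       /* stands in for rpl_slave_state::element */
{
  std::mutex LOCK_slave_state;      /* shared with the producer               */
  std::condition_variable COND_wait_gtid;
  unsigned long long highest_seq_no= 0;
  bool have_small_waiter= false;    /* stands in for the gtid_waiter pointer  */
} dom;

struct Waiters                      /* stands in for gtid_waiting (one domain) */
{
  std::mutex LOCK_gtid_waiting;     /* never touched by the producer          */
  std::condition_variable COND_wakeup_ready;
  unsigned long long reached= 0;    /* highest seq_no already fanned out      */
  bool small_wait_taken= false;     /* someone is responsible for the small wait */
} waiters;

/* The "replication SQL thread": one cheap check + at most one signal. */
static void apply_event(unsigned long long seq_no)
{
  std::lock_guard<std::mutex> lk(dom.LOCK_slave_state);
  dom.highest_seq_no= seq_no;
  if (dom.have_small_waiter)
    dom.COND_wait_gtid.notify_one();
}

static void master_gtid_wait(unsigned long long wanted)
{
  std::unique_lock<std::mutex> big(waiters.LOCK_gtid_waiting);
  while (waiters.reached < wanted)
  {
    if (!waiters.small_wait_taken)
    {
      waiters.small_wait_taken= true;            /* become the small waiter    */
      unsigned long long seen= waiters.reached, now_at;
      big.unlock();
      {
        std::unique_lock<std::mutex> small(dom.LOCK_slave_state);
        dom.have_small_waiter= true;
        dom.COND_wait_gtid.wait(small,
                                [&] { return dom.highest_seq_no > seen; });
        dom.have_small_waiter= false;
        now_at= dom.highest_seq_no;
      }                                          /* producer lock held briefly */
      big.lock();
      waiters.reached= now_at;                   /* fan out under the big lock */
      waiters.small_wait_taken= false;
      waiters.COND_wakeup_ready.notify_all();
    }
    else
      waiters.COND_wakeup_ready.wait(big);       /* the "large" wait           */
  }
}

int main()
{
  const unsigned long long wants[]= { 3, 5, 5, 9 };
  std::vector<std::thread> clients;
  for (unsigned long long w : wants)
    clients.emplace_back([w] {
      master_gtid_wait(w);
      std::printf("MASTER_GTID_WAIT() for seq_no %llu done\n", w);
    });
  std::thread sql_thread([] {
    for (unsigned long long s= 1; s <= 10; ++s)
      apply_event(s);                            /* replicate events 1..10     */
  });
  for (std::thread &t : clients)
    t.join();
  sql_thread.join();
  return 0;
}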
diff --git a/sql/rpl_gtid.h b/sql/rpl_gtid.h
index b0bc54900e7..54f352661a7 100644
--- a/sql/rpl_gtid.h
+++ b/sql/rpl_gtid.h
@@ -16,6 +16,10 @@
#ifndef RPL_GTID_H
#define RPL_GTID_H
+#include "hash.h"
+#include "queues.h"
+
+
/* Definitions for MariaDB global transaction ID (GTID). */
@@ -37,6 +41,57 @@ enum enum_gtid_skip_type {
/*
+ Structure to keep track of threads waiting in MASTER_GTID_WAIT().
+
+ Since replication is (mostly) single-threaded, we want to minimise the
+ performance impact on that from MASTER_GTID_WAIT(). To achieve this, we
+ are careful to keep the common lock between replication threads and
+ MASTER_GTID_WAIT threads held for as short a time as possible. We keep only
+ a single thread waiting to be notified by the replication threads; this
+ thread then handles all the (potentially heavy) lifting of dealing with
+ all current waiting threads.
+*/
+struct gtid_waiting {
+ /* Elements in the hash, basically a priority queue for each domain. */
+ struct hash_element {
+ QUEUE queue;
+ uint32 domain_id;
+ };
+ /* A priority queue to handle waiters in one domain in seq_no order. */
+ struct queue_element {
+ uint64 wait_seq_no;
+ THD *thd;
+ int queue_idx;
+ /*
+ do_small_wait is true if we have responsibility for ensuring that there
+ is a small waiter.
+ */
+ bool do_small_wait;
+ /*
+ The flag `done' is set when the wait is completed (either due to reaching
+ the position waited for, or due to timeout or kill). The queue_element
+ is in the queue if and only if `done' is false.
+ */
+ bool done;
+ };
+
+ mysql_mutex_t LOCK_gtid_waiting;
+ HASH hash;
+
+ void init();
+ void destroy();
+ hash_element *get_entry(uint32 domain_id);
+ int wait_for_pos(THD *thd, String *gtid_str, longlong timeout_us);
+ void promote_new_waiter(gtid_waiting::hash_element *he);
+ int wait_for_gtid(THD *thd, rpl_gtid *wait_gtid, struct timespec *wait_until);
+ void process_wait_hash(uint64 wakeup_seq_no, gtid_waiting::hash_element *he);
+ int register_in_wait_queue(THD *thd, rpl_gtid *wait_gtid, hash_element *he,
+ queue_element *elem);
+ void remove_from_wait_queue(hash_element *he, queue_element *elem);
+};
+
+
+/*
Replication slave state.
For every independent replication stream (identified by domain_id), this
@@ -61,6 +116,20 @@ struct rpl_slave_state
{
struct list_element *list;
uint32 domain_id;
+ /* Highest seq_no seen so far in this domain. */
+ uint64 highest_seq_no;
+ /*
+ If this is non-NULL, then it is the waiter responsible for the small
+ wait in MASTER_GTID_WAIT().
+ */
+ gtid_waiting::queue_element *gtid_waiter;
+ /*
+ If gtid_waiter is non-NULL, then this is the seq_no that its
+ MASTER_GTID_WAIT() is waiting on. When we reach this seq_no, we need to
+ signal the waiter on COND_wait_gtid.
+ */
+ uint64 min_wait_seq_no;
+ mysql_cond_t COND_wait_gtid;
list_element *grab_list() { list_element *l= list; list= NULL; return l; }
void add(list_element *l)
@@ -99,9 +168,6 @@ struct rpl_slave_state
bool in_statement);
bool is_empty();
- void lock() { DBUG_ASSERT(inited); mysql_mutex_lock(&LOCK_slave_state); }
- void unlock() { DBUG_ASSERT(inited); mysql_mutex_unlock(&LOCK_slave_state); }
-
element *get_element(uint32 domain_id);
int put_back_list(uint32 domain_id, list_element *list);
@@ -204,6 +270,7 @@ struct slave_connection_state
int get_gtid_list(rpl_gtid *gtid_list, uint32 list_size);
};
+
extern bool rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid,
bool *first);
extern int gtid_check_rpl_slave_state_table(TABLE *table);
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index 7f0ec702b87..797f5681ec5 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -37,6 +37,8 @@ static int count_relay_log_space(Relay_log_info* rli);
domain).
*/
rpl_slave_state rpl_global_gtid_slave_state;
+/* Object used for MASTER_GTID_WAIT(). */
+gtid_waiting rpl_global_gtid_waiting;
// Defined in slave.cc
@@ -1317,9 +1319,9 @@ rpl_load_gtid_slave_state(THD *thd)
uint32 i;
DBUG_ENTER("rpl_load_gtid_slave_state");
- rpl_global_gtid_slave_state.lock();
+ mysql_mutex_lock(&rpl_global_gtid_slave_state.LOCK_slave_state);
bool loaded= rpl_global_gtid_slave_state.loaded;
- rpl_global_gtid_slave_state.unlock();
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
if (loaded)
DBUG_RETURN(0);
@@ -1419,10 +1421,10 @@ rpl_load_gtid_slave_state(THD *thd)
}
}
- rpl_global_gtid_slave_state.lock();
+ mysql_mutex_lock(&rpl_global_gtid_slave_state.LOCK_slave_state);
if (rpl_global_gtid_slave_state.loaded)
{
- rpl_global_gtid_slave_state.unlock();
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
goto end;
}
@@ -1434,7 +1436,7 @@ rpl_load_gtid_slave_state(THD *thd)
tmp_entry.sub_id,
tmp_entry.gtid.seq_no)))
{
- rpl_global_gtid_slave_state.unlock();
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
my_error(ER_OUT_OF_RESOURCES, MYF(0));
goto end;
}
@@ -1447,14 +1449,14 @@ rpl_load_gtid_slave_state(THD *thd)
mysql_bin_log.bump_seq_no_counter_if_needed(entry->gtid.domain_id,
entry->gtid.seq_no))
{
- rpl_global_gtid_slave_state.unlock();
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
my_error(ER_OUT_OF_RESOURCES, MYF(0));
goto end;
}
}
rpl_global_gtid_slave_state.loaded= true;
- rpl_global_gtid_slave_state.unlock();
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
err= 0; /* Clear HA_ERR_END_OF_FILE */
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index 6716718d354..3f95849a926 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -708,6 +708,7 @@ int init_relay_log_info(Relay_log_info* rli, const char* info_fname);
extern struct rpl_slave_state rpl_global_gtid_slave_state;
+extern gtid_waiting rpl_global_gtid_waiting;
int rpl_load_gtid_slave_state(THD *thd);
int event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev);
diff --git a/sql/share/errmsg-utf8.txt b/sql/share/errmsg-utf8.txt
index 0c9c84d4964..1ed2a2ba462 100644
--- a/sql/share/errmsg-utf8.txt
+++ b/sql/share/errmsg-utf8.txt
@@ -6524,7 +6524,7 @@ ER_UNTIL_REQUIRES_USING_GTID
ER_GTID_STRICT_OUT_OF_ORDER
eng "An attempt was made to binlog GTID %u-%u-%llu which would create an out-of-order sequence number with existing GTID %u-%u-%llu, and gtid strict mode is enabled."
ER_GTID_START_FROM_BINLOG_HOLE
- eng "The binlog on the master is missing the GTID %u-%u-%llu requested by the slave (even though both a prior and a subsequent sequence number does exist), and GTID strict mode is enabled"
+ eng "The binlog on the master is missing the GTID %u-%u-%llu requested by the slave (even though a subsequent sequence number does exist), and GTID strict mode is enabled"
ER_SLAVE_UNEXPECTED_MASTER_SWITCH
eng "Unexpected GTID received from master after reconnect. This normally indicates that the master server was replaced without restarting the slave threads. %s"
ER_INSIDE_TRANSACTION_PREVENTS_SWITCH_GTID_DOMAIN_ID_SEQ_NO
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index c446d901055..09ec7361fe3 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -1267,6 +1267,7 @@ void THD::init(void)
set_status_var_init();
bzero((char *) &org_status_var, sizeof(org_status_var));
start_bytes_received= 0;
+ last_commit_gtid.seq_no= 0;
if (variables.sql_log_bin)
variables.option_bits|= OPTION_BIN_LOG;
diff --git a/sql/sql_class.h b/sql/sql_class.h
index a7e3290c543..726b920edb1 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -44,6 +44,7 @@
#include "thr_lock.h" /* thr_lock_type, THR_LOCK_DATA,
THR_LOCK_INFO */
#include "my_apc.h"
+#include "rpl_gtid.h"
class Reprepare_observer;
class Relay_log_info;
@@ -3410,6 +3411,12 @@ private:
*/
LEX_STRING invoker_user;
LEX_STRING invoker_host;
+
+ /* Protect against add/delete of temporary tables in parallel replication */
+ void rgi_lock_temporary_tables();
+ void rgi_unlock_temporary_tables();
+ bool rgi_have_temporary_tables();
+public:
/*
Flag, mutex and condition for a thread to wait for a signal from another
thread.
@@ -3420,12 +3427,12 @@ private:
bool wakeup_ready;
mysql_mutex_t LOCK_wakeup_ready;
mysql_cond_t COND_wakeup_ready;
+ /*
+ The GTID assigned to the last commit. If no GTID was assigned to any commit
+ so far, this is indicated by last_commit_gtid.seq_no == 0.
+ */
+ rpl_gtid last_commit_gtid;
- /* Protect against add/delete of temporary tables in parallel replication */
- void rgi_lock_temporary_tables();
- void rgi_unlock_temporary_tables();
- bool rgi_have_temporary_tables();
-public:
inline void lock_temporary_tables()
{
if (rgi_slave)
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 363b53e05a6..7c4e5b1b383 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -3965,6 +3965,20 @@ rpl_deinit_gtid_slave_state()
}
+void
+rpl_init_gtid_waiting()
+{
+ rpl_global_gtid_waiting.init();
+}
+
+
+void
+rpl_deinit_gtid_waiting()
+{
+ rpl_global_gtid_waiting.destroy();
+}
+
+
/*
Format the current GTID state as a string, for returning the value of
@@global.gtid_slave_pos.
diff --git a/sql/sql_repl.h b/sql/sql_repl.h
index da55e3e863f..defb1b23f5b 100644
--- a/sql/sql_repl.h
+++ b/sql/sql_repl.h
@@ -70,6 +70,8 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, ushort flags);
extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state;
void rpl_init_gtid_slave_state();
void rpl_deinit_gtid_slave_state();
+void rpl_init_gtid_waiting();
+void rpl_deinit_gtid_waiting();
int gtid_state_from_binlog_pos(const char *name, uint32 pos, String *out_str);
int rpl_append_gtid_state(String *dest, bool use_binlog);
int rpl_load_gtid_state(slave_connection_state *state, bool use_binlog);
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
index 7b0b64f9961..0495ee3e240 100644
--- a/sql/sys_vars.cc
+++ b/sql/sys_vars.cc
@@ -1538,6 +1538,33 @@ static Sys_var_gtid_binlog_state Sys_gtid_binlog_state(
GLOBAL_VAR(opt_gtid_binlog_state_dummy), NO_CMD_LINE);
+static Sys_var_last_gtid Sys_last_gtid(
+ "last_gtid", "The GTID of the last commit (if binlogging was enabled), "
+ "or the empty string if none.",
+ READ_ONLY sys_var::ONLY_SESSION, NO_CMD_LINE);
+
+
+uchar *
+Sys_var_last_gtid::session_value_ptr(THD *thd, LEX_STRING *base)
+{
+ char buf[10+1+10+1+20+1];
+ String str(buf, sizeof(buf), system_charset_info);
+ char *p;
+ bool first= true;
+
+ str.length(0);
+ if ((thd->last_commit_gtid.seq_no > 0 &&
+ rpl_slave_state_tostring_helper(&str, &thd->last_commit_gtid, &first)) ||
+ !(p= thd->strmake(str.ptr(), str.length())))
+ {
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ return NULL;
+ }
+
+ return (uchar *)p;
+}
+
+
static bool
check_slave_parallel_threads(sys_var *self, THD *thd, set_var *var)
{
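The buf[10+1+10+1+20+1] in Sys_var_last_gtid::session_value_ptr() above is sized for the textual GTID form "domain_id-server_id-seq_no" (the same %u-%u-%llu format seen in the errmsg-utf8.txt strings earlier in this diff): at most 10 decimal digits for each uint32, 20 for the uint64, two '-' separators and a terminating NUL. A tiny standalone check with arbitrary worst-case values, illustrative only:

/* Why buf[10+1+10+1+20+1] is enough for @@last_gtid's "domain-server-seq_no"
   text: a uint32 needs at most 10 decimal digits, a uint64 at most 20, plus
   two '-' separators and a terminating NUL. Values below are examples only. */
#include <cinttypes>
#include <cstdio>

int main()
{
  char buf[10 + 1 + 10 + 1 + 20 + 1];
  uint32_t domain_id= 4294967295u;              /* largest uint32, 10 digits */
  uint32_t server_id= 4294967295u;
  uint64_t seq_no= UINT64_MAX;                  /* largest uint64, 20 digits */

  int n= snprintf(buf, sizeof(buf), "%" PRIu32 "-%" PRIu32 "-%" PRIu64,
                  domain_id, server_id, seq_no);
  printf("%s (%d chars, buffer holds %zu bytes)\n", buf, n, sizeof(buf));
  return 0;
}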
diff --git a/sql/sys_vars.h b/sql/sys_vars.h
index 3cc4da32811..6a84fc5fbc2 100644
--- a/sql/sys_vars.h
+++ b/sql/sys_vars.h
@@ -2211,3 +2211,53 @@ public:
}
uchar *global_value_ptr(THD *thd, LEX_STRING *base);
};
+
+
+/**
+ Class for @@session.last_gtid.
+*/
+class Sys_var_last_gtid: public sys_var
+{
+public:
+ Sys_var_last_gtid(const char *name_arg,
+ const char *comment, int flag_args, CMD_LINE getopt)
+ : sys_var(&all_sys_vars, name_arg, comment, flag_args, 0, getopt.id,
+ getopt.arg_type, SHOW_CHAR, 0, NULL, VARIABLE_NOT_IN_BINLOG,
+ NULL, NULL, NULL)
+ {
+ option.var_type= GET_STR;
+ }
+ bool do_check(THD *thd, set_var *var)
+ {
+ DBUG_ASSERT(false);
+ return true;
+ }
+ bool session_update(THD *thd, set_var *var)
+ {
+ DBUG_ASSERT(false);
+ return true;
+ }
+ bool global_update(THD *thd, set_var *var)
+ {
+ DBUG_ASSERT(false);
+ return true;
+ }
+ bool check_update_type(Item_result type) {
+ DBUG_ASSERT(false);
+ return false;
+ }
+ void session_save_default(THD *thd, set_var *var)
+ {
+ DBUG_ASSERT(false);
+ }
+ void global_save_default(THD *thd, set_var *var)
+ {
+ DBUG_ASSERT(false);
+ }
+ uchar *session_value_ptr(THD *thd, LEX_STRING *base);
+ uchar *global_value_ptr(THD *thd, LEX_STRING *base)
+ {
+ DBUG_ASSERT(false);
+ return NULL;
+ }
+};