summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
Diffstat (limited to 'sql')
-rw-r--r--sql/item_create.cc55
-rw-r--r--sql/item_func.cc28
-rw-r--r--sql/item_func.h16
-rw-r--r--sql/log.cc36
-rw-r--r--sql/log.h1
-rw-r--r--sql/mysqld.cc14
-rw-r--r--sql/mysqld.h4
-rw-r--r--sql/rpl_gtid.cc506
-rw-r--r--sql/rpl_gtid.h73
-rw-r--r--sql/rpl_parallel.cc21
-rw-r--r--sql/rpl_parallel.h1
-rw-r--r--sql/rpl_rli.cc25
-rw-r--r--sql/rpl_rli.h7
-rw-r--r--sql/share/errmsg-utf8.txt2
-rw-r--r--sql/slave.cc39
-rw-r--r--sql/sql_class.cc1
-rw-r--r--sql/sql_class.h17
-rw-r--r--sql/sql_repl.cc14
-rw-r--r--sql/sql_repl.h2
-rw-r--r--sql/sys_vars.cc27
-rw-r--r--sql/sys_vars.h50
21 files changed, 877 insertions, 62 deletions
diff --git a/sql/item_create.cc b/sql/item_create.cc
index fa8c016d61b..8466319b66f 100644
--- a/sql/item_create.cc
+++ b/sql/item_create.cc
@@ -1797,6 +1797,19 @@ protected:
};
+class Create_func_master_gtid_wait : public Create_native_func
+{
+public:
+ virtual Item *create_native(THD *thd, LEX_STRING name, List<Item> *item_list);
+
+ static Create_func_master_gtid_wait s_singleton;
+
+protected:
+ Create_func_master_gtid_wait() {}
+ virtual ~Create_func_master_gtid_wait() {}
+};
+
+
class Create_func_md5 : public Create_func_arg1
{
public:
@@ -4614,6 +4627,47 @@ Create_func_master_pos_wait::create_native(THD *thd, LEX_STRING name,
}
+Create_func_master_gtid_wait Create_func_master_gtid_wait::s_singleton;
+
+Item*
+Create_func_master_gtid_wait::create_native(THD *thd, LEX_STRING name,
+ List<Item> *item_list)
+{
+ Item *func= NULL;
+ int arg_count= 0;
+
+ thd->lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_FUNCTION);
+
+ if (item_list != NULL)
+ arg_count= item_list->elements;
+
+ if (arg_count < 1 || arg_count > 2)
+ {
+ my_error(ER_WRONG_PARAMCOUNT_TO_NATIVE_FCT, MYF(0), name.str);
+ return func;
+ }
+
+ thd->lex->safe_to_cache_query= 0;
+
+ Item *param_1= item_list->pop();
+ switch (arg_count) {
+ case 1:
+ {
+ func= new (thd->mem_root) Item_master_gtid_wait(param_1);
+ break;
+ }
+ case 2:
+ {
+ Item *param_2= item_list->pop();
+ func= new (thd->mem_root) Item_master_gtid_wait(param_1, param_2);
+ break;
+ }
+ }
+
+ return func;
+}
+
+
Create_func_md5 Create_func_md5::s_singleton;
Item*
@@ -5558,6 +5612,7 @@ static Native_func_registry func_array[] =
{ { C_STRING_WITH_LEN("MAKEDATE") }, BUILDER(Create_func_makedate)},
{ { C_STRING_WITH_LEN("MAKETIME") }, BUILDER(Create_func_maketime)},
{ { C_STRING_WITH_LEN("MAKE_SET") }, BUILDER(Create_func_make_set)},
+ { { C_STRING_WITH_LEN("MASTER_GTID_WAIT") }, BUILDER(Create_func_master_gtid_wait)},
{ { C_STRING_WITH_LEN("MASTER_POS_WAIT") }, BUILDER(Create_func_master_pos_wait)},
{ { C_STRING_WITH_LEN("MBRCONTAINS") }, GEOM_BUILDER(Create_func_mbr_contains)},
{ { C_STRING_WITH_LEN("MBRDISJOINT") }, GEOM_BUILDER(Create_func_mbr_disjoint)},
diff --git a/sql/item_func.cc b/sql/item_func.cc
index 2ff93b5538a..9918b7c40f6 100644
--- a/sql/item_func.cc
+++ b/sql/item_func.cc
@@ -3987,6 +3987,34 @@ err:
}
+longlong Item_master_gtid_wait::val_int()
+{
+ DBUG_ASSERT(fixed == 1);
+ longlong result= 0;
+
+ if (args[0]->null_value)
+ {
+ null_value= 1;
+ return 0;
+ }
+
+ null_value=0;
+#ifdef HAVE_REPLICATION
+ THD* thd= current_thd;
+ longlong timeout_us;
+ String *gtid_pos = args[0]->val_str(&value);
+
+ if (arg_count==2 && !args[1]->null_value)
+ timeout_us= (longlong)(1e6*args[1]->val_real());
+ else
+ timeout_us= (longlong)-1;
+
+ result= rpl_global_gtid_waiting.wait_for_pos(thd, gtid_pos, timeout_us);
+#endif
+ return result;
+}
+
+
/**
Enables a session to wait on a condition until a timeout or a network
disconnect occurs.
diff --git a/sql/item_func.h b/sql/item_func.h
index 270c031955a..4da5bfa91bb 100644
--- a/sql/item_func.h
+++ b/sql/item_func.h
@@ -1643,6 +1643,22 @@ public:
};
+class Item_master_gtid_wait :public Item_int_func
+{
+ String value;
+public:
+ Item_master_gtid_wait(Item *a) :Item_int_func(a) {}
+ Item_master_gtid_wait(Item *a,Item *b) :Item_int_func(a,b) {}
+ longlong val_int();
+ const char *func_name() const { return "master_gtid_wait"; }
+ void fix_length_and_dec() { max_length=10+1+10+1+20+1; maybe_null=0;}
+ bool check_vcol_func_processor(uchar *int_arg)
+ {
+ return trace_unsupported_by_check_vcol_func_processor(func_name());
+ }
+};
+
+
/* Handling of user definable variables */
class user_var_entry;
diff --git a/sql/log.cc b/sql/log.cc
index f2fc7eb1554..33897073a3d 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -2975,7 +2975,7 @@ const char *MYSQL_LOG::generate_name(const char *log_name,
MYSQL_BIN_LOG::MYSQL_BIN_LOG(uint *sync_period)
- :reset_master_pending(false),
+ :reset_master_pending(false), mark_xid_done_waiting(0),
bytes_written(0), file_id(1), open_count(1),
group_commit_queue(0), group_commit_queue_busy(FALSE),
num_commits(0), num_group_commits(0),
@@ -3791,22 +3791,11 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd, bool create_new_log,
const char* save_name;
DBUG_ENTER("reset_logs");
- if (thd)
- ha_reset_logs(thd);
- /*
- We need to get both locks to be sure that no one is trying to
- write to the index log file.
- */
- mysql_mutex_lock(&LOCK_log);
- mysql_mutex_lock(&LOCK_index);
-
if (!is_relay_log)
{
if (init_state && !is_empty_state())
{
my_error(ER_BINLOG_MUST_BE_EMPTY, MYF(0));
- mysql_mutex_unlock(&LOCK_index);
- mysql_mutex_unlock(&LOCK_log);
DBUG_RETURN(1);
}
@@ -3815,11 +3804,29 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd, bool create_new_log,
This ensures that a binlog checkpoint will not try to write binlog
checkpoint events, which would be useless (as we are deleting the binlog
anyway) and could deadlock, as we are holding LOCK_log.
+
+ Wait for any mark_xid_done() calls that might be already running to
+ complete (mark_xid_done_waiting counter to drop to zero); we need to
+ do this before we take the LOCK_log to not deadlock.
*/
mysql_mutex_lock(&LOCK_xid_list);
reset_master_pending= true;
+ while (mark_xid_done_waiting > 0)
+ mysql_cond_wait(&COND_xid_list, &LOCK_xid_list);
mysql_mutex_unlock(&LOCK_xid_list);
+ }
+ if (thd)
+ ha_reset_logs(thd);
+ /*
+ We need to get both locks to be sure that no one is trying to
+ write to the index log file.
+ */
+ mysql_mutex_lock(&LOCK_log);
+ mysql_mutex_lock(&LOCK_index);
+
+ if (!is_relay_log)
+ {
/*
We are going to nuke all binary log files.
Without binlog, we cannot XA recover prepared-but-not-committed
@@ -5503,6 +5510,7 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone,
}
if (err)
DBUG_RETURN(true);
+ thd->last_commit_gtid= gtid;
Gtid_log_event gtid_event(thd, seq_no, domain_id, standalone,
LOG_EVENT_SUPPRESS_USE_F, is_transactional,
@@ -8899,9 +8907,13 @@ TC_LOG_BINLOG::mark_xid_done(ulong binlog_id, bool write_checkpoint)
locks in the opposite order.
*/
+ ++mark_xid_done_waiting;
mysql_mutex_unlock(&LOCK_xid_list);
mysql_mutex_lock(&LOCK_log);
mysql_mutex_lock(&LOCK_xid_list);
+ --mark_xid_done_waiting;
+ if (unlikely(reset_master_pending))
+ mysql_cond_signal(&COND_xid_list);
/* We need to reload current_binlog_id due to release/re-take of lock. */
current= current_binlog_id;
diff --git a/sql/log.h b/sql/log.h
index d6ae7bbb1bb..2577cc5225a 100644
--- a/sql/log.h
+++ b/sql/log.h
@@ -471,6 +471,7 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
checkpoint arrives - when all have arrived, RESET MASTER will complete.
*/
bool reset_master_pending;
+ ulong mark_xid_done_waiting;
/* LOCK_log and LOCK_index are inited by init_pthread_objects() */
mysql_mutex_t LOCK_index;
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index 808a0d47ac9..75495491980 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -866,6 +866,7 @@ PSI_mutex_key key_LOCK_stats,
key_LOCK_global_user_client_stats, key_LOCK_global_table_stats,
key_LOCK_global_index_stats,
key_LOCK_wakeup_ready, key_LOCK_wait_commit;
+PSI_mutex_key key_LOCK_gtid_waiting;
PSI_mutex_key key_LOCK_prepare_ordered, key_LOCK_commit_ordered;
PSI_mutex_key key_TABLE_SHARE_LOCK_share;
@@ -911,6 +912,7 @@ static PSI_mutex_info all_server_mutexes[]=
{ &key_LOCK_global_index_stats, "LOCK_global_index_stats", PSI_FLAG_GLOBAL},
{ &key_LOCK_wakeup_ready, "THD::LOCK_wakeup_ready", 0},
{ &key_LOCK_wait_commit, "wait_for_commit::LOCK_wait_commit", 0},
+ { &key_LOCK_gtid_waiting, "gtid_waiting::LOCK_gtid_waiting", 0},
{ &key_LOCK_thd_data, "THD::LOCK_thd_data", 0},
{ &key_LOCK_user_conn, "LOCK_user_conn", PSI_FLAG_GLOBAL},
{ &key_LOCK_uuid_short_generator, "LOCK_uuid_short_generator", PSI_FLAG_GLOBAL},
@@ -982,6 +984,7 @@ PSI_cond_key key_RELAYLOG_COND_queue_busy;
PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_pool,
key_COND_parallel_entry, key_COND_prepare_ordered;
+PSI_cond_key key_COND_wait_gtid;
static PSI_cond_info all_server_conds[]=
{
@@ -1026,7 +1029,8 @@ static PSI_cond_info all_server_conds[]=
{ &key_COND_rpl_thread, "COND_rpl_thread", 0},
{ &key_COND_rpl_thread_pool, "COND_rpl_thread_pool", 0},
{ &key_COND_parallel_entry, "COND_parallel_entry", 0},
- { &key_COND_prepare_ordered, "COND_prepare_ordered", 0}
+ { &key_COND_prepare_ordered, "COND_prepare_ordered", 0},
+ { &key_COND_wait_gtid, "COND_wait_gtid", 0}
};
PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert,
@@ -1941,6 +1945,7 @@ static void mysqld_exit(int exit_code)
but if a kill -15 signal was sent, the signal thread did
spawn the kill_server_thread thread, which is running concurrently.
*/
+ rpl_deinit_gtid_waiting();
rpl_deinit_gtid_slave_state();
wait_for_signal_thread_to_end();
mysql_audit_finalize();
@@ -4400,6 +4405,7 @@ static int init_thread_environment()
#ifdef HAVE_REPLICATION
rpl_init_gtid_slave_state();
+ rpl_init_gtid_waiting();
#endif
DBUG_RETURN(0);
@@ -9432,6 +9438,8 @@ PSI_stage_info stage_binlog_stopping_background_thread= { 0, "Stopping binlog ba
PSI_stage_info stage_waiting_for_work_from_sql_thread= { 0, "Waiting for work from SQL thread", 0};
PSI_stage_info stage_waiting_for_prior_transaction_to_commit= { 0, "Waiting for prior transaction to commit", 0};
PSI_stage_info stage_waiting_for_room_in_worker_thread= { 0, "Waiting for room in worker thread event queue", 0};
+PSI_stage_info stage_master_gtid_wait_primary= { 0, "Waiting in MASTER_GTID_WAIT() (primary waiter)", 0};
+PSI_stage_info stage_master_gtid_wait= { 0, "Waiting in MASTER_GTID_WAIT()", 0};
#ifdef HAVE_PSI_INTERFACE
@@ -9548,7 +9556,9 @@ PSI_stage_info *all_server_stages[]=
& stage_waiting_for_the_slave_thread_to_advance_position,
& stage_waiting_for_work_from_sql_thread,
& stage_waiting_to_finalize_termination,
- & stage_waiting_to_get_readlock
+ & stage_waiting_to_get_readlock,
+ & stage_master_gtid_wait_primary,
+ & stage_master_gtid_wait
};
PSI_socket_key key_socket_tcpip, key_socket_unix, key_socket_client_connection;
diff --git a/sql/mysqld.h b/sql/mysqld.h
index dd80a6cf423..5e5eb16d7c3 100644
--- a/sql/mysqld.h
+++ b/sql/mysqld.h
@@ -267,6 +267,7 @@ extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state,
extern PSI_mutex_key key_TABLE_SHARE_LOCK_share, key_LOCK_stats,
key_LOCK_global_user_client_stats, key_LOCK_global_table_stats,
key_LOCK_global_index_stats, key_LOCK_wakeup_ready, key_LOCK_wait_commit;
+extern PSI_mutex_key key_LOCK_gtid_waiting;
extern PSI_rwlock_key key_rwlock_LOCK_grant, key_rwlock_LOCK_logger,
key_rwlock_LOCK_sys_init_connect, key_rwlock_LOCK_sys_init_slave,
@@ -296,6 +297,7 @@ extern PSI_cond_key key_RELAYLOG_COND_queue_busy;
extern PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
extern PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_pool,
key_COND_parallel_entry;
+extern PSI_cond_key key_COND_wait_gtid;
extern PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert,
key_thread_handle_manager, key_thread_kill_server, key_thread_main,
@@ -434,6 +436,8 @@ extern PSI_stage_info stage_binlog_stopping_background_thread;
extern PSI_stage_info stage_waiting_for_work_from_sql_thread;
extern PSI_stage_info stage_waiting_for_prior_transaction_to_commit;
extern PSI_stage_info stage_waiting_for_room_in_worker_thread;
+extern PSI_stage_info stage_master_gtid_wait_primary;
+extern PSI_stage_info stage_master_gtid_wait;
#ifdef HAVE_PSI_STATEMENT_INTERFACE
/**
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc
index a36a15b3c27..e51dee20c19 100644
--- a/sql/rpl_gtid.cc
+++ b/sql/rpl_gtid.cc
@@ -43,9 +43,9 @@ rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid)
there will not be an attempt to delete the corresponding table row before
it is even committed.
*/
- lock();
+ mysql_mutex_lock(&LOCK_slave_state);
err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no);
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
if (err)
{
sql_print_warning("Slave: Out of memory during slave state maintenance. "
@@ -82,11 +82,20 @@ rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi)
}
+static void
+rpl_slave_state_free_element(void *arg)
+{
+ struct rpl_slave_state::element *elem= (struct rpl_slave_state::element *)arg;
+ mysql_cond_destroy(&elem->COND_wait_gtid);
+ my_free(elem);
+}
+
+
rpl_slave_state::rpl_slave_state()
: last_sub_id(0), inited(false), loaded(false)
{
my_hash_init(&hash, &my_charset_bin, 32, offsetof(element, domain_id),
- sizeof(uint32), NULL, my_free, HASH_UNIQUE);
+ sizeof(uint32), NULL, rpl_slave_state_free_element, HASH_UNIQUE);
}
@@ -146,6 +155,21 @@ rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
if (!(elem= get_element(domain_id)))
return 1;
+ if (seq_no > elem->highest_seq_no)
+ elem->highest_seq_no= seq_no;
+ if (elem->gtid_waiter && elem->min_wait_seq_no <= seq_no)
+ {
+ /*
+ Someone was waiting in MASTER_GTID_WAIT() for this GTID to appear.
+ Signal (and remove) them. The waiter will handle all the processing
+ of all pending MASTER_GTID_WAIT(), so we do not slow down the
+ replication SQL thread.
+ */
+ mysql_mutex_assert_owner(&LOCK_slave_state);
+ elem->gtid_waiter= NULL;
+ mysql_cond_broadcast(&elem->COND_wait_gtid);
+ }
+
if (!(list_elem= (list_element *)my_malloc(sizeof(*list_elem), MYF(MY_WME))))
return 1;
list_elem->server_id= server_id;
@@ -173,6 +197,9 @@ rpl_slave_state::get_element(uint32 domain_id)
return NULL;
elem->list= NULL;
elem->domain_id= domain_id;
+ elem->highest_seq_no= 0;
+ elem->gtid_waiter= NULL;
+ mysql_cond_init(key_COND_wait_gtid, &elem->COND_wait_gtid, 0);
if (my_hash_insert(&hash, (uchar *)elem))
{
my_free(elem);
@@ -379,10 +406,10 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
goto end;
}
- lock();
+ mysql_mutex_lock(&LOCK_slave_state);
if ((elem= get_element(gtid->domain_id)) == NULL)
{
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
my_error(ER_OUT_OF_RESOURCES, MYF(0));
err= 1;
goto end;
@@ -411,7 +438,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
cur->next= NULL;
elem->list= cur;
}
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
if (!elist)
goto end;
@@ -471,9 +498,9 @@ end:
*/
if (elist)
{
- lock();
+ mysql_mutex_lock(&LOCK_slave_state);
put_back_list(gtid->domain_id, elist);
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
}
ha_rollback_trans(thd, FALSE);
@@ -500,9 +527,9 @@ rpl_slave_state::next_sub_id(uint32 domain_id)
{
uint64 sub_id= 0;
- lock();
+ mysql_mutex_lock(&LOCK_slave_state);
sub_id= ++last_sub_id;
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
return sub_id;
}
@@ -542,7 +569,7 @@ rpl_slave_state::iterate(int (*cb)(rpl_gtid *, void *), void *data,
my_hash_insert(&gtid_hash, (uchar *)(&extra_gtids[i])))
goto err;
- lock();
+ mysql_mutex_lock(&LOCK_slave_state);
for (i= 0; i < hash.records; ++i)
{
@@ -577,19 +604,19 @@ rpl_slave_state::iterate(int (*cb)(rpl_gtid *, void *), void *data,
memcpy(&best_gtid, gtid, sizeof(best_gtid));
if (my_hash_delete(&gtid_hash, rec))
{
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
goto err;
}
}
if ((res= (*cb)(&best_gtid, data)))
{
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
goto err;
}
}
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
/* Also add any remaining extra domain_ids. */
for (i= 0; i < gtid_hash.records; ++i)
@@ -660,11 +687,11 @@ rpl_slave_state::domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid)
list_element *list;
uint64 best_sub_id;
- lock();
+ mysql_mutex_lock(&LOCK_slave_state);
elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0);
if (!elem || !(list= elem->list))
{
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
return false;
}
@@ -682,7 +709,7 @@ rpl_slave_state::domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid)
out_gtid->seq_no= list->seq_no;
}
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
return true;
}
@@ -812,7 +839,7 @@ rpl_slave_state::is_empty()
uint32 i;
bool result= true;
- lock();
+ mysql_mutex_lock(&LOCK_slave_state);
for (i= 0; i < hash.records; ++i)
{
element *e= (element *)my_hash_element(&hash, i);
@@ -822,7 +849,7 @@ rpl_slave_state::is_empty()
break;
}
}
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
return result;
}
@@ -1648,3 +1675,444 @@ slave_connection_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size)
return 0;
}
+
+
+/*
+ Execute a MASTER_GTID_WAIT().
+ The position to wait for is in gtid_str in string form.
+ The timeout in microseconds is in timeout_us, zero means no timeout.
+
+ Returns:
+ 1 for error.
+ 0 for wait completed.
+ -1 for wait timed out.
+*/
+int
+gtid_waiting::wait_for_pos(THD *thd, String *gtid_str, longlong timeout_us)
+{
+ int err;
+ rpl_gtid *wait_pos;
+ uint32 count, i;
+ struct timespec wait_until, *wait_until_ptr;
+
+ /* Wait for the empty position returns immediately. */
+ if (gtid_str->length() == 0)
+ return 0;
+
+ if (!(wait_pos= gtid_parse_string_to_list(gtid_str->ptr(), gtid_str->length(),
+ &count)))
+ {
+ my_error(ER_INCORRECT_GTID_STATE, MYF(0));
+ return 1;
+ }
+
+ if (timeout_us >= 0)
+ {
+ set_timespec_nsec(wait_until, (ulonglong)1000*timeout_us);
+ wait_until_ptr= &wait_until;
+ }
+ else
+ wait_until_ptr= NULL;
+ err= 0;
+ for (i= 0; i < count; ++i)
+ {
+ if ((err= wait_for_gtid(thd, &wait_pos[i], wait_until_ptr)))
+ break;
+ }
+ my_free(wait_pos);
+ return err;
+}
+
+
+void
+gtid_waiting::promote_new_waiter(gtid_waiting::hash_element *he)
+{
+ queue_element *qe;
+
+ mysql_mutex_assert_owner(&LOCK_gtid_waiting);
+ if (queue_empty(&he->queue))
+ return;
+ qe= (queue_element *)queue_top(&he->queue);
+ qe->do_small_wait= true;
+ mysql_cond_signal(&qe->thd->COND_wakeup_ready);
+}
+
+void
+gtid_waiting::process_wait_hash(uint64 wakeup_seq_no,
+ gtid_waiting::hash_element *he)
+{
+ mysql_mutex_assert_owner(&LOCK_gtid_waiting);
+
+ for (;;)
+ {
+ queue_element *qe;
+
+ if (queue_empty(&he->queue))
+ break;
+ qe= (queue_element *)queue_top(&he->queue);
+ if (qe->wait_seq_no > wakeup_seq_no)
+ break;
+ DBUG_ASSERT(!qe->done);
+ queue_remove_top(&he->queue);
+ qe->done= true;;
+ mysql_cond_signal(&qe->thd->COND_wakeup_ready);
+ }
+}
+
+
+/*
+ Execute a MASTER_GTID_WAIT() for one specific domain.
+
+ The implementation is optimised primarily for (1) minimal performance impact
+ on the slave replication threads, and secondarily for (2) quick performance
+ of MASTER_GTID_WAIT() on a single GTID, which can be useful for consistent
+ read to clients in an async replication read-scaleout scenario.
+
+ To achieve (1), we have a "small" wait and a "large" wait. The small wait
+ contends with the replication threads on the lock on the gtid_slave_pos, so
+ only minimal processing is done under that lock, and only a single waiter at
+ a time does the small wait.
+
+ If there is already a small waiter, a new thread will either replace the
+ small waiter (if it needs to wait for an earlier sequence number), or
+ instead do a "large" wait.
+
+ Once awoken on the small wait, the waiting thread releases the lock shared
+ with the SQL threads quickly, and then processes all waiters currently doing
+ the large wait using a different lock that does not impact replication.
+
+ This way, the SQL threads only need to do a single check + possibly a
+ pthread_cond_signal() when updating the gtid_slave_state, and the time that
+ non-SQL threads contend for the lock on gtid_slave_state is minimized.
+
+ There is always at least one thread that has the responsibility to ensure
+ that there is a small waiter; this thread has queue_element::do_small_wait
+ set to true. This thread will do the small wait until it is done, at which
+ point it will make sure to pass on the responsibility to another thread.
+ Normally only one thread has do_small_wait==true, but it can occasionally
+ happen that there is more than one, when threads race one another for the
+ lock on the small wait (this results in slightly increased activity on the
+ small lock but is otherwise harmless).
+
+ Returns:
+ 0 Wait completed normally
+ -1 Wait completed due to timeout
+ 1 An error (my_error() will have been called to set the error in the da)
+*/
+int
+gtid_waiting::wait_for_gtid(THD *thd, rpl_gtid *wait_gtid,
+ struct timespec *wait_until)
+{
+ bool timed_out= false;
+#ifdef HAVE_REPLICATION
+ queue_element elem;
+ uint32 domain_id= wait_gtid->domain_id;
+ uint64 seq_no= wait_gtid->seq_no;
+ hash_element *he;
+ rpl_slave_state::element *slave_state_elem= NULL;
+ PSI_stage_info old_stage;
+ bool did_enter_cond= false;
+
+ elem.wait_seq_no= seq_no;
+ elem.thd= thd;
+ elem.done= false;
+
+ mysql_mutex_lock(&LOCK_gtid_waiting);
+ if (!(he= get_entry(wait_gtid->domain_id)))
+ {
+ mysql_mutex_unlock(&LOCK_gtid_waiting);
+ return 1;
+ }
+ /*
+ If there is already another waiter with seq_no no larger than our own,
+ we are sure that there is already a small waiter that will wake us up
+ (or later pass the small wait responsibility to us). So in this case, we
+ do not need to touch the small wait lock at all.
+ */
+ elem.do_small_wait=
+ (queue_empty(&he->queue) ||
+ ((queue_element *)queue_top(&he->queue))->wait_seq_no > seq_no);
+
+ if (register_in_wait_queue(thd, wait_gtid, he, &elem))
+ {
+ mysql_mutex_unlock(&LOCK_gtid_waiting);
+ return 1;
+ }
+ /*
+ Loop, doing either the small or large wait as appropriate, until either
+ the position waited for is reached, or we get a kill or timeout.
+ */
+ for (;;)
+ {
+ mysql_mutex_assert_owner(&LOCK_gtid_waiting);
+
+ if (elem.do_small_wait)
+ {
+ uint64 wakeup_seq_no;
+ queue_element *cur_waiter;
+
+ mysql_mutex_lock(&rpl_global_gtid_slave_state.LOCK_slave_state);
+ /*
+ The elements in the gtid_slave_state_hash are never re-allocated once
+ they enter the hash, so we do not need to re-do the lookup after releasing
+ and re-aquiring the lock.
+ */
+ if (!slave_state_elem &&
+ !(slave_state_elem= rpl_global_gtid_slave_state.get_element(domain_id)))
+ {
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
+ remove_from_wait_queue(he, &elem);
+ promote_new_waiter(he);
+ if (did_enter_cond)
+ thd->EXIT_COND(&old_stage);
+ else
+ mysql_mutex_unlock(&LOCK_gtid_waiting);
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ return 1;
+ }
+
+ if ((wakeup_seq_no= slave_state_elem->highest_seq_no) >= seq_no)
+ {
+ /*
+ We do not have to wait. (We will be removed from the wait queue when
+ we call process_wait_hash() below.
+ */
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
+ }
+ else if ((cur_waiter= slave_state_elem->gtid_waiter) &&
+ slave_state_elem->min_wait_seq_no <= seq_no)
+ {
+ /*
+ There is already a suitable small waiter, go do the large wait.
+ (Normally we would not have needed to check the small wait in this
+ case, but it can happen if we race with another thread for the small
+ lock).
+ */
+ elem.do_small_wait= false;
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
+ }
+ else
+ {
+ /*
+ We have to do the small wait ourselves (stealing it from any thread
+ that might already be waiting for a later seq_no).
+ */
+ slave_state_elem->gtid_waiter= &elem;
+ slave_state_elem->min_wait_seq_no= seq_no;
+ if (cur_waiter)
+ {
+ /* We stole the wait, so wake up the old waiting thread. */
+ mysql_cond_signal(&slave_state_elem->COND_wait_gtid);
+ }
+
+ /* Release the large lock, and do the small wait. */
+ if (did_enter_cond)
+ {
+ thd->EXIT_COND(&old_stage);
+ did_enter_cond= false;
+ }
+ else
+ mysql_mutex_unlock(&LOCK_gtid_waiting);
+ thd->ENTER_COND(&slave_state_elem->COND_wait_gtid,
+ &rpl_global_gtid_slave_state.LOCK_slave_state,
+ &stage_master_gtid_wait_primary, &old_stage);
+ do
+ {
+ if (thd->check_killed())
+ break;
+ else if (wait_until)
+ {
+ int err=
+ mysql_cond_timedwait(&slave_state_elem->COND_wait_gtid,
+ &rpl_global_gtid_slave_state.LOCK_slave_state,
+ wait_until);
+ if (err == ETIMEDOUT || err == ETIME)
+ {
+ timed_out= true;
+ break;
+ }
+ }
+ else
+ mysql_cond_wait(&slave_state_elem->COND_wait_gtid,
+ &rpl_global_gtid_slave_state.LOCK_slave_state);
+ } while (slave_state_elem->gtid_waiter == &elem);
+ wakeup_seq_no= slave_state_elem->highest_seq_no;
+ /*
+ If we aborted due to timeout or kill, remove us as waiter.
+
+ If we were replaced by another waiter with a smaller seq_no, then we
+ no longer have responsibility for the small wait.
+ */
+ if ((cur_waiter= slave_state_elem->gtid_waiter))
+ {
+ if (cur_waiter == &elem)
+ slave_state_elem->gtid_waiter= NULL;
+ else if (slave_state_elem->min_wait_seq_no <= seq_no)
+ elem.do_small_wait= false;
+ }
+ thd->EXIT_COND(&old_stage);
+
+ mysql_mutex_lock(&LOCK_gtid_waiting);
+ }
+
+ /*
+ Note that hash_entry pointers do not change once allocated, so we do
+ not need to lookup `he' again after re-aquiring LOCK_gtid_waiting.
+ */
+ process_wait_hash(wakeup_seq_no, he);
+ }
+ else
+ {
+ /* Do the large wait. */
+ if (!did_enter_cond)
+ {
+ thd->ENTER_COND(&thd->COND_wakeup_ready, &LOCK_gtid_waiting,
+ &stage_master_gtid_wait, &old_stage);
+ did_enter_cond= true;
+ }
+ while (!elem.done && !thd->check_killed())
+ {
+ thd_wait_begin(thd, THD_WAIT_BINLOG);
+ if (wait_until)
+ {
+ int err= mysql_cond_timedwait(&thd->COND_wakeup_ready,
+ &LOCK_gtid_waiting, wait_until);
+ if (err == ETIMEDOUT || err == ETIME)
+ timed_out= true;
+ }
+ else
+ mysql_cond_wait(&thd->COND_wakeup_ready, &LOCK_gtid_waiting);
+ thd_wait_end(thd);
+ if (elem.do_small_wait || timed_out)
+ break;
+ }
+ }
+
+ if ((thd->killed || timed_out) && !elem.done)
+ {
+ /* Aborted, so remove ourselves from the hash. */
+ remove_from_wait_queue(he, &elem);
+ elem.done= true;
+ }
+ if (elem.done)
+ {
+ /*
+ If our wait is done, but we have (or were passed) responsibility for
+ the small wait, then we need to pass on that task to someone else.
+ */
+ if (elem.do_small_wait)
+ promote_new_waiter(he);
+ break;
+ }
+ }
+
+ if (did_enter_cond)
+ thd->EXIT_COND(&old_stage);
+ else
+ mysql_mutex_unlock(&LOCK_gtid_waiting);
+ if (thd->killed)
+ thd->send_kill_message();
+#endif /* HAVE_REPLICATION */
+ return timed_out ? -1 : 0;
+}
+
+
+static void
+free_hash_element(void *p)
+{
+ gtid_waiting::hash_element *e= (gtid_waiting::hash_element *)p;
+ delete_queue(&e->queue);
+ my_free(e);
+}
+
+
+void
+gtid_waiting::init()
+{
+ my_hash_init(&hash, &my_charset_bin, 32,
+ offsetof(hash_element, domain_id), sizeof(uint32), NULL,
+ free_hash_element, HASH_UNIQUE);
+ mysql_mutex_init(key_LOCK_gtid_waiting, &LOCK_gtid_waiting, 0);
+}
+
+
+void
+gtid_waiting::destroy()
+{
+ mysql_mutex_destroy(&LOCK_gtid_waiting);
+ my_hash_free(&hash);
+}
+
+
+static int
+cmp_queue_elem(void *, uchar *a, uchar *b)
+{
+ uint64 seq_no_a= *(uint64 *)a;
+ uint64 seq_no_b= *(uint64 *)b;
+ if (seq_no_a < seq_no_b)
+ return -1;
+ else if (seq_no_a == seq_no_b)
+ return 0;
+ else
+ return 1;
+}
+
+
+gtid_waiting::hash_element *
+gtid_waiting::get_entry(uint32 domain_id)
+{
+ hash_element *e;
+
+ if ((e= (hash_element *)my_hash_search(&hash, (const uchar *)&domain_id, 0)))
+ return e;
+
+ if (!(e= (hash_element *)my_malloc(sizeof(*e), MYF(MY_WME))))
+ {
+ my_error(ER_OUTOFMEMORY, MYF(0), sizeof(*e));
+ return NULL;
+ }
+
+ if (init_queue(&e->queue, 8, offsetof(queue_element, wait_seq_no), 0,
+ cmp_queue_elem, NULL, 1+offsetof(queue_element, queue_idx), 1))
+ {
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ my_free(e);
+ return NULL;
+ }
+ e->domain_id= domain_id;
+ if (my_hash_insert(&hash, (uchar *)e))
+ {
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ delete_queue(&e->queue);
+ my_free(e);
+ return NULL;
+ }
+ return e;
+}
+
+
+int
+gtid_waiting::register_in_wait_queue(THD *thd, rpl_gtid *wait_gtid,
+ gtid_waiting::hash_element *he,
+ gtid_waiting::queue_element *elem)
+{
+ mysql_mutex_assert_owner(&LOCK_gtid_waiting);
+
+ if (queue_insert_safe(&he->queue, (uchar *)elem))
+ {
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ return 1;
+ }
+
+ return 0;
+}
+
+
+void
+gtid_waiting::remove_from_wait_queue(gtid_waiting::hash_element *he,
+ gtid_waiting::queue_element *elem)
+{
+ mysql_mutex_assert_owner(&LOCK_gtid_waiting);
+
+ queue_remove(&he->queue, elem->queue_idx);
+}
diff --git a/sql/rpl_gtid.h b/sql/rpl_gtid.h
index b0bc54900e7..54f352661a7 100644
--- a/sql/rpl_gtid.h
+++ b/sql/rpl_gtid.h
@@ -16,6 +16,10 @@
#ifndef RPL_GTID_H
#define RPL_GTID_H
+#include "hash.h"
+#include "queues.h"
+
+
/* Definitions for MariaDB global transaction ID (GTID). */
@@ -37,6 +41,57 @@ enum enum_gtid_skip_type {
/*
+ Structure to keep track of threads waiting in MASTER_GTID_WAIT().
+
+ Since replication is (mostly) single-threaded, we want to minimise the
+ performance impact on that from MASTER_GTID_WAIT(). To achieve this, we
+ are careful to keep the common lock between replication threads and
+ MASTER_GTID_WAIT threads held for as short as possible. We keep only
+ a single thread waiting to be notified by the replication threads; this
+ thread then handles all the (potentially heavy) lifting of dealing with
+ all current waiting threads.
+*/
+struct gtid_waiting {
+ /* Elements in the hash, basically a priority queue for each domain. */
+ struct hash_element {
+ QUEUE queue;
+ uint32 domain_id;
+ };
+ /* A priority queue to handle waiters in one domain in seq_no order. */
+ struct queue_element {
+ uint64 wait_seq_no;
+ THD *thd;
+ int queue_idx;
+ /*
+ do_small_wait is true if we have responsibility for ensuring that there
+ is a small waiter.
+ */
+ bool do_small_wait;
+ /*
+ The flag `done' is set when the wait is completed (either due to reaching
+ the position waited for, or due to timeout or kill). The queue_element
+ is in the queue if and only if `done' is true.
+ */
+ bool done;
+ };
+
+ mysql_mutex_t LOCK_gtid_waiting;
+ HASH hash;
+
+ void init();
+ void destroy();
+ hash_element *get_entry(uint32 domain_id);
+ int wait_for_pos(THD *thd, String *gtid_str, longlong timeout_us);
+ void promote_new_waiter(gtid_waiting::hash_element *he);
+ int wait_for_gtid(THD *thd, rpl_gtid *wait_gtid, struct timespec *wait_until);
+ void process_wait_hash(uint64 wakeup_seq_no, gtid_waiting::hash_element *he);
+ int register_in_wait_queue(THD *thd, rpl_gtid *wait_gtid, hash_element *he,
+ queue_element *elem);
+ void remove_from_wait_queue(hash_element *he, queue_element *elem);
+};
+
+
+/*
Replication slave state.
For every independent replication stream (identified by domain_id), this
@@ -61,6 +116,20 @@ struct rpl_slave_state
{
struct list_element *list;
uint32 domain_id;
+ /* Highest seq_no seen so far in this domain. */
+ uint64 highest_seq_no;
+ /*
+ If this is non-NULL, then it is the waiter responsible for the small
+ wait in MASTER_GTID_WAIT().
+ */
+ gtid_waiting::queue_element *gtid_waiter;
+ /*
+ If gtid_waiter is non-NULL, then this is the seq_no that its
+ MASTER_GTID_WAIT() is waiting on. When we reach this seq_no, we need to
+ signal the waiter on COND_wait_gtid.
+ */
+ uint64 min_wait_seq_no;
+ mysql_cond_t COND_wait_gtid;
list_element *grab_list() { list_element *l= list; list= NULL; return l; }
void add(list_element *l)
@@ -99,9 +168,6 @@ struct rpl_slave_state
bool in_statement);
bool is_empty();
- void lock() { DBUG_ASSERT(inited); mysql_mutex_lock(&LOCK_slave_state); }
- void unlock() { DBUG_ASSERT(inited); mysql_mutex_unlock(&LOCK_slave_state); }
-
element *get_element(uint32 domain_id);
int put_back_list(uint32 domain_id, list_element *list);
@@ -204,6 +270,7 @@ struct slave_connection_state
int get_gtid_list(rpl_gtid *gtid_list, uint32 list_size);
};
+
extern bool rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid,
bool *first);
extern int gtid_check_rpl_slave_state_table(TABLE *table);
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index cc65856e37b..7e9a4aa6239 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -765,6 +765,27 @@ rpl_parallel::wait_for_done()
}
+bool
+rpl_parallel::workers_idle()
+{
+ struct rpl_parallel_entry *e;
+ uint32 i, max_i;
+
+ max_i= domain_hash.records;
+ for (i= 0; i < max_i; ++i)
+ {
+ bool active;
+ e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
+ mysql_mutex_lock(&e->LOCK_parallel_entry);
+ active= e->current_sub_id > e->last_committed_sub_id;
+ mysql_mutex_unlock(&e->LOCK_parallel_entry);
+ if (active)
+ break;
+ }
+ return (i == max_i);
+}
+
+
/*
do_event() is executed by the sql_driver_thd thread.
It's main purpose is to find a thread that can execute the query.
diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h
index 0e88e09652b..019a354c57d 100644
--- a/sql/rpl_parallel.h
+++ b/sql/rpl_parallel.h
@@ -117,6 +117,7 @@ struct rpl_parallel {
void reset();
rpl_parallel_entry *find(uint32 domain_id);
void wait_for_done();
+ bool workers_idle();
bool do_event(rpl_group_info *serial_rgi, Log_event *ev,
ulonglong event_size);
};
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index c5c59f3aebb..c9e8c79d5fc 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -37,6 +37,8 @@ static int count_relay_log_space(Relay_log_info* rli);
domain).
*/
rpl_slave_state rpl_global_gtid_slave_state;
+/* Object used for MASTER_GTID_WAIT(). */
+gtid_waiting rpl_global_gtid_waiting;
// Defined in slave.cc
@@ -56,7 +58,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
is_fake(FALSE),
#endif
group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0),
- last_master_timestamp(0), slave_skip_counter(0),
+ last_master_timestamp(0), sql_thread_caught_up(true), slave_skip_counter(0),
abort_pos_wait(0), slave_run_id(0), sql_driver_thd(),
inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE),
until_log_pos(0), retried_trans(0), executed_entries(0),
@@ -1287,9 +1289,14 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
(probably ok - except in some very rare cases, only consequence
is that value may take some time to display in
Seconds_Behind_Master - not critical).
+
+ In parallel replication, we take care to not set last_master_timestamp
+ backwards, in case of out-of-order calls here.
*/
if (!(event_creation_time == 0 &&
- IF_DBUG(debug_not_change_ts_if_art_event > 0, 1)))
+ IF_DBUG(debug_not_change_ts_if_art_event > 0, 1)) &&
+ !(rgi->is_parallel_exec && event_creation_time <= last_master_timestamp)
+ )
last_master_timestamp= event_creation_time;
}
DBUG_VOID_RETURN;
@@ -1312,9 +1319,9 @@ rpl_load_gtid_slave_state(THD *thd)
uint32 i;
DBUG_ENTER("rpl_load_gtid_slave_state");
- rpl_global_gtid_slave_state.lock();
+ mysql_mutex_lock(&rpl_global_gtid_slave_state.LOCK_slave_state);
bool loaded= rpl_global_gtid_slave_state.loaded;
- rpl_global_gtid_slave_state.unlock();
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
if (loaded)
DBUG_RETURN(0);
@@ -1414,10 +1421,10 @@ rpl_load_gtid_slave_state(THD *thd)
}
}
- rpl_global_gtid_slave_state.lock();
+ mysql_mutex_lock(&rpl_global_gtid_slave_state.LOCK_slave_state);
if (rpl_global_gtid_slave_state.loaded)
{
- rpl_global_gtid_slave_state.unlock();
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
goto end;
}
@@ -1429,7 +1436,7 @@ rpl_load_gtid_slave_state(THD *thd)
tmp_entry.sub_id,
tmp_entry.gtid.seq_no)))
{
- rpl_global_gtid_slave_state.unlock();
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
my_error(ER_OUT_OF_RESOURCES, MYF(0));
goto end;
}
@@ -1442,14 +1449,14 @@ rpl_load_gtid_slave_state(THD *thd)
mysql_bin_log.bump_seq_no_counter_if_needed(entry->gtid.domain_id,
entry->gtid.seq_no))
{
- rpl_global_gtid_slave_state.unlock();
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
my_error(ER_OUT_OF_RESOURCES, MYF(0));
goto end;
}
}
rpl_global_gtid_slave_state.loaded= true;
- rpl_global_gtid_slave_state.unlock();
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
err= 0; /* Clear HA_ERR_END_OF_FILE */
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index ff2ffd0b366..3f95849a926 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -221,6 +221,12 @@ public:
bool sql_force_rotate_relay;
time_t last_master_timestamp;
+ /*
+ The SQL driver thread sets this true while it is waiting at the end of the
+ relay log for more events to arrive. SHOW SLAVE STATUS uses this to report
+ Seconds_Behind_Master as zero while the SQL thread is so waiting.
+ */
+ bool sql_thread_caught_up;
void clear_until_condition();
@@ -702,6 +708,7 @@ int init_relay_log_info(Relay_log_info* rli, const char* info_fname);
extern struct rpl_slave_state rpl_global_gtid_slave_state;
+extern gtid_waiting rpl_global_gtid_waiting;
int rpl_load_gtid_slave_state(THD *thd);
int event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev);
diff --git a/sql/share/errmsg-utf8.txt b/sql/share/errmsg-utf8.txt
index 160bf2e6c87..585a70af33d 100644
--- a/sql/share/errmsg-utf8.txt
+++ b/sql/share/errmsg-utf8.txt
@@ -7030,7 +7030,7 @@ ER_UNTIL_REQUIRES_USING_GTID
ER_GTID_STRICT_OUT_OF_ORDER
eng "An attempt was made to binlog GTID %u-%u-%llu which would create an out-of-order sequence number with existing GTID %u-%u-%llu, and gtid strict mode is enabled."
ER_GTID_START_FROM_BINLOG_HOLE
- eng "The binlog on the master is missing the GTID %u-%u-%llu requested by the slave (even though both a prior and a subsequent sequence number does exist), and GTID strict mode is enabled"
+ eng "The binlog on the master is missing the GTID %u-%u-%llu requested by the slave (even though a subsequent sequence number does exist), and GTID strict mode is enabled"
ER_SLAVE_UNEXPECTED_MASTER_SWITCH
eng "Unexpected GTID received from master after reconnect. This normally indicates that the master server was replaced without restarting the slave threads. %s"
ER_INSIDE_TRANSACTION_PREVENTS_SWITCH_GTID_DOMAIN_ID_SEQ_NO
diff --git a/sql/slave.cc b/sql/slave.cc
index a2ed89e0c9c..f281bdf06a2 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -2628,8 +2628,24 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full,
if ((mi->slave_running == MYSQL_SLAVE_RUN_CONNECT) &&
mi->rli.slave_running)
{
- long time_diff= ((long)(time(0) - mi->rli.last_master_timestamp)
- - mi->clock_diff_with_master);
+ long time_diff;
+ bool idle;
+ time_t stamp= mi->rli.last_master_timestamp;
+
+ if (!stamp)
+ idle= true;
+ else
+ {
+ idle= mi->rli.sql_thread_caught_up;
+ if (opt_slave_parallel_threads > 0 && idle &&
+ !mi->rli.parallel.workers_idle())
+ idle= false;
+ }
+ if (idle)
+ time_diff= 0;
+ else
+ {
+ time_diff= ((long)(time(0) - stamp) - mi->clock_diff_with_master);
/*
Apparently on some systems time_diff can be <0. Here are possible
reasons related to MySQL:
@@ -2645,13 +2661,15 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full,
slave is 2. At SHOW SLAVE STATUS time, assume that the difference
between timestamp of slave and rli->last_master_timestamp is 0
(i.e. they are in the same second), then we get 0-(2-1)=-1 as a result.
- This confuses users, so we don't go below 0: hence the MY_MAX().
+ This confuses users, so we don't go below 0.
last_master_timestamp == 0 (an "impossible" timestamp 1970) is a
special marker to say "consider we have caught up".
*/
- protocol->store((longlong)(mi->rli.last_master_timestamp ?
- MY_MAX(0, time_diff) : 0));
+ if (time_diff < 0)
+ time_diff= 0;
+ }
+ protocol->store((longlong)time_diff);
}
else
{
@@ -6120,6 +6138,7 @@ static Log_event* next_event(rpl_group_info *rgi, ulonglong *event_size)
if (hot_log)
mysql_mutex_unlock(log_lock);
+ rli->sql_thread_caught_up= false;
DBUG_RETURN(ev);
}
if (opt_reckless_slave) // For mysql-test
@@ -6157,12 +6176,10 @@ static Log_event* next_event(rpl_group_info *rgi, ulonglong *event_size)
Seconds_Behind_Master would be zero only when master has no
more updates in binlog for slave. The heartbeat can be sent
in a (small) fraction of slave_net_timeout. Until it's done
- rli->last_master_timestamp is temporarely (for time of
- waiting for the following event) reset whenever EOF is
- reached.
+ rli->sql_thread_caught_up is temporarely (for time of waiting for
+ the following event) set whenever EOF is reached.
*/
- time_t save_timestamp= rli->last_master_timestamp;
- rli->last_master_timestamp= 0;
+ rli->sql_thread_caught_up= true;
DBUG_ASSERT(rli->relay_log.get_open_count() ==
rli->cur_log_old_open_count);
@@ -6288,7 +6305,7 @@ static Log_event* next_event(rpl_group_info *rgi, ulonglong *event_size)
rli->relay_log.wait_for_update_relay_log(rli->sql_driver_thd);
// re-acquire data lock since we released it earlier
mysql_mutex_lock(&rli->data_lock);
- rli->last_master_timestamp= save_timestamp;
+ rli->sql_thread_caught_up= false;
continue;
}
/*
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index ee0cc43dca2..af908c0e71f 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -1386,6 +1386,7 @@ void THD::init(void)
set_status_var_init();
bzero((char *) &org_status_var, sizeof(org_status_var));
start_bytes_received= 0;
+ last_commit_gtid.seq_no= 0;
if (variables.sql_log_bin)
variables.option_bits|= OPTION_BIN_LOG;
diff --git a/sql/sql_class.h b/sql/sql_class.h
index e3668321bec..81040624353 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -55,6 +55,7 @@ void set_thd_stage_info(void *thd,
(thd)->enter_stage(& stage, NULL, __func__, __FILE__, __LINE__)
#include "my_apc.h"
+#include "rpl_gtid.h"
class Reprepare_observer;
class Relay_log_info;
@@ -3661,6 +3662,12 @@ private:
*/
LEX_STRING invoker_user;
LEX_STRING invoker_host;
+
+ /* Protect against add/delete of temporary tables in parallel replication */
+ void rgi_lock_temporary_tables();
+ void rgi_unlock_temporary_tables();
+ bool rgi_have_temporary_tables();
+public:
/*
Flag, mutex and condition for a thread to wait for a signal from another
thread.
@@ -3671,12 +3678,12 @@ private:
bool wakeup_ready;
mysql_mutex_t LOCK_wakeup_ready;
mysql_cond_t COND_wakeup_ready;
+ /*
+ The GTID assigned to the last commit. If no GTID was assigned to any commit
+ so far, this is indicated by last_commit_gtid.seq_no == 0.
+ */
+ rpl_gtid last_commit_gtid;
- /* Protect against add/delete of temporary tables in parallel replication */
- void rgi_lock_temporary_tables();
- void rgi_unlock_temporary_tables();
- bool rgi_have_temporary_tables();
-public:
inline void lock_temporary_tables()
{
if (rgi_slave)
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index b2b111dadc3..418e0a1549f 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -3966,6 +3966,20 @@ rpl_deinit_gtid_slave_state()
}
+void
+rpl_init_gtid_waiting()
+{
+ rpl_global_gtid_waiting.init();
+}
+
+
+void
+rpl_deinit_gtid_waiting()
+{
+ rpl_global_gtid_waiting.destroy();
+}
+
+
/*
Format the current GTID state as a string, for returning the value of
@@global.gtid_slave_pos.
diff --git a/sql/sql_repl.h b/sql/sql_repl.h
index 018b23673ee..7f7751b8f44 100644
--- a/sql/sql_repl.h
+++ b/sql/sql_repl.h
@@ -72,6 +72,8 @@ extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state;
#endif
void rpl_init_gtid_slave_state();
void rpl_deinit_gtid_slave_state();
+void rpl_init_gtid_waiting();
+void rpl_deinit_gtid_waiting();
int gtid_state_from_binlog_pos(const char *name, uint32 pos, String *out_str);
int rpl_append_gtid_state(String *dest, bool use_binlog);
int rpl_load_gtid_state(slave_connection_state *state, bool use_binlog);
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
index d7c30f532e0..c87368b7080 100644
--- a/sql/sys_vars.cc
+++ b/sql/sys_vars.cc
@@ -1696,6 +1696,33 @@ static Sys_var_gtid_binlog_state Sys_gtid_binlog_state(
GLOBAL_VAR(opt_gtid_binlog_state_dummy), NO_CMD_LINE);
+static Sys_var_last_gtid Sys_last_gtid(
+ "last_gtid", "The GTID of the last commit (if binlogging was enabled), "
+ "or the empty string if none.",
+ READ_ONLY sys_var::ONLY_SESSION, NO_CMD_LINE);
+
+
+uchar *
+Sys_var_last_gtid::session_value_ptr(THD *thd, LEX_STRING *base)
+{
+ char buf[10+1+10+1+20+1];
+ String str(buf, sizeof(buf), system_charset_info);
+ char *p;
+ bool first= true;
+
+ str.length(0);
+ if ((thd->last_commit_gtid.seq_no > 0 &&
+ rpl_slave_state_tostring_helper(&str, &thd->last_commit_gtid, &first)) ||
+ !(p= thd->strmake(str.ptr(), str.length())))
+ {
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ return NULL;
+ }
+
+ return (uchar *)p;
+}
+
+
static bool
check_slave_parallel_threads(sys_var *self, THD *thd, set_var *var)
{
diff --git a/sql/sys_vars.h b/sql/sys_vars.h
index 6c0228fd6a0..495099b9c59 100644
--- a/sql/sys_vars.h
+++ b/sql/sys_vars.h
@@ -2239,3 +2239,53 @@ public:
}
uchar *global_value_ptr(THD *thd, LEX_STRING *base);
};
+
+
+/**
+ Class for @@session.last_gtid.
+*/
+class Sys_var_last_gtid: public sys_var
+{
+public:
+ Sys_var_last_gtid(const char *name_arg,
+ const char *comment, int flag_args, CMD_LINE getopt)
+ : sys_var(&all_sys_vars, name_arg, comment, flag_args, 0, getopt.id,
+ getopt.arg_type, SHOW_CHAR, 0, NULL, VARIABLE_NOT_IN_BINLOG,
+ NULL, NULL, NULL)
+ {
+ option.var_type= GET_STR;
+ }
+ bool do_check(THD *thd, set_var *var)
+ {
+ DBUG_ASSERT(false);
+ return true;
+ }
+ bool session_update(THD *thd, set_var *var)
+ {
+ DBUG_ASSERT(false);
+ return true;
+ }
+ bool global_update(THD *thd, set_var *var)
+ {
+ DBUG_ASSERT(false);
+ return true;
+ }
+ bool check_update_type(Item_result type) {
+ DBUG_ASSERT(false);
+ return false;
+ }
+ void session_save_default(THD *thd, set_var *var)
+ {
+ DBUG_ASSERT(false);
+ }
+ void global_save_default(THD *thd, set_var *var)
+ {
+ DBUG_ASSERT(false);
+ }
+ uchar *session_value_ptr(THD *thd, LEX_STRING *base);
+ uchar *global_value_ptr(THD *thd, LEX_STRING *base)
+ {
+ DBUG_ASSERT(false);
+ return NULL;
+ }
+};