summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--client/mysql.cc1
-rw-r--r--include/queues.h1
-rw-r--r--mysql-test/include/sync_with_master_gtid.inc47
-rw-r--r--mysql-test/suite/perfschema/r/all_instances.result1
-rw-r--r--mysql-test/suite/perfschema/r/dml_setup_instruments.result2
-rw-r--r--mysql-test/suite/rpl/r/rpl_gtid_basic.result110
-rw-r--r--mysql-test/suite/rpl/t/rpl_gtid_basic.test164
-rw-r--r--mysql-test/suite/sys_vars/r/last_gtid_basic.result9
-rw-r--r--mysql-test/suite/sys_vars/t/last_gtid_basic.test9
-rw-r--r--sql/item_create.cc55
-rw-r--r--sql/item_func.cc28
-rw-r--r--sql/item_func.h16
-rw-r--r--sql/log.cc1
-rw-r--r--sql/mysqld.cc8
-rw-r--r--sql/mysqld.h2
-rw-r--r--sql/rpl_gtid.cc480
-rw-r--r--sql/rpl_gtid.h59
-rw-r--r--sql/rpl_rli.cc16
-rw-r--r--sql/rpl_rli.h1
-rw-r--r--sql/share/errmsg-utf8.txt2
-rw-r--r--sql/sql_class.cc1
-rw-r--r--sql/sql_class.h17
-rw-r--r--sql/sql_repl.cc14
-rw-r--r--sql/sql_repl.h2
-rw-r--r--sql/sys_vars.cc27
-rw-r--r--sql/sys_vars.h50
26 files changed, 1084 insertions, 39 deletions
diff --git a/client/mysql.cc b/client/mysql.cc
index 4f9b4c3bc92..fa0dd5faa1c 100644
--- a/client/mysql.cc
+++ b/client/mysql.cc
@@ -915,6 +915,7 @@ static COMMANDS commands[] = {
{ "MAKE_SET", 0, 0, 0, ""},
{ "MAKEDATE", 0, 0, 0, ""},
{ "MAKETIME", 0, 0, 0, ""},
+ { "MASTER_GTID_WAIT", 0, 0, 0, ""},
{ "MASTER_POS_WAIT", 0, 0, 0, ""},
{ "MAX", 0, 0, 0, ""},
{ "MBRCONTAINS", 0, 0, 0, ""},
diff --git a/include/queues.h b/include/queues.h
index 4fef72b149c..f341bbb8148 100644
--- a/include/queues.h
+++ b/include/queues.h
@@ -51,6 +51,7 @@ typedef struct st_queue {
#define queue_first_element(queue) 1
#define queue_last_element(queue) (queue)->elements
+#define queue_empty(queue) ((queue)->elements == 0)
#define queue_top(queue) ((queue)->root[1])
#define queue_element(queue,index) ((queue)->root[index])
#define queue_end(queue) ((queue)->root[(queue)->elements])
diff --git a/mysql-test/include/sync_with_master_gtid.inc b/mysql-test/include/sync_with_master_gtid.inc
new file mode 100644
index 00000000000..7512c045c6b
--- /dev/null
+++ b/mysql-test/include/sync_with_master_gtid.inc
@@ -0,0 +1,47 @@
+# ==== Purpose ====
+#
+# Wait until the slave has reached a certain GTID position.
+# Similar to --sync_with_master, but using GTID instead of old-style
+# binlog file/offset coordinates.
+#
+#
+# ==== Usage ====
+#
+# --let $master_pos= `SELECT @@GLOBAL.gtid_binlog_pos`
+# [--let $slave_timeout= NUMBER]
+# [--let $rpl_debug= 1]
+# --source include/sync_with_master_gtid.inc
+#
+# Syncs slave to the specified GTID position.
+#
+# Must be called on the slave.
+#
+# Parameters:
+# $master_pos
+# The GTID position to sync to. Typically obtained from
+# @@GLOBAL.gtid_binlog_pos on the master.
+#
+# $slave_timeout
+# Timeout in seconds. The default is 2 minutes.
+#
+# $rpl_debug
+# See include/rpl_init.inc
+
+--let $include_filename= sync_with_master_gtid.inc
+--source include/begin_include_file.inc
+
+let $_slave_timeout= $slave_timeout;
+if (!$_slave_timeout)
+{
+ let $_slave_timeout= 120;
+}
+
+--let $_result= `SELECT master_gtid_wait('$master_pos', $_slave_timeout)`
+if ($_result == -1)
+{
+ --let $_current_gtid_pos= `SELECT @@GLOBAL.gtid_slave_pos`
+ --die Timeout in master_gtid_wait('$master_pos', $_slave_timeout), current slave GTID position is: $_current_gtid_pos.
+}
+
+--let $include_filename= sync_with_master_gtid.inc
+--source include/end_include_file.inc
diff --git a/mysql-test/suite/perfschema/r/all_instances.result b/mysql-test/suite/perfschema/r/all_instances.result
index 266c3eb5a9c..fd20f27c929 100644
--- a/mysql-test/suite/perfschema/r/all_instances.result
+++ b/mysql-test/suite/perfschema/r/all_instances.result
@@ -37,6 +37,7 @@ wait/synch/mutex/mysys/THR_LOCK_threads
wait/synch/mutex/mysys/TMPDIR_mutex
wait/synch/mutex/sql/Cversion_lock
wait/synch/mutex/sql/Event_scheduler::LOCK_scheduler_state
+wait/synch/mutex/sql/gtid_waiting::LOCK_gtid_waiting
wait/synch/mutex/sql/hash_filo::lock
wait/synch/mutex/sql/LOCK_active_mi
wait/synch/mutex/sql/LOCK_audit_mask
diff --git a/mysql-test/suite/perfschema/r/dml_setup_instruments.result b/mysql-test/suite/perfschema/r/dml_setup_instruments.result
index 38c02cc2bf4..17087264e7c 100644
--- a/mysql-test/suite/perfschema/r/dml_setup_instruments.result
+++ b/mysql-test/suite/perfschema/r/dml_setup_instruments.result
@@ -7,13 +7,13 @@ NAME ENABLED TIMED
wait/synch/mutex/sql/Cversion_lock YES YES
wait/synch/mutex/sql/Delayed_insert::mutex YES YES
wait/synch/mutex/sql/Event_scheduler::LOCK_scheduler_state YES YES
+wait/synch/mutex/sql/gtid_waiting::LOCK_gtid_waiting YES YES
wait/synch/mutex/sql/hash_filo::lock YES YES
wait/synch/mutex/sql/HA_DATA_PARTITION::LOCK_auto_inc YES YES
wait/synch/mutex/sql/LOCK_active_mi YES YES
wait/synch/mutex/sql/LOCK_audit_mask YES YES
wait/synch/mutex/sql/LOCK_binlog_state YES YES
wait/synch/mutex/sql/LOCK_commit_ordered YES YES
-wait/synch/mutex/sql/LOCK_connection_count YES YES
select * from performance_schema.setup_instruments
where name like 'Wait/Synch/Rwlock/sql/%'
and name not in ('wait/synch/rwlock/sql/CRYPTO_dynlock_value::lock')
diff --git a/mysql-test/suite/rpl/r/rpl_gtid_basic.result b/mysql-test/suite/rpl/r/rpl_gtid_basic.result
index ecad4a6acf1..d15de315914 100644
--- a/mysql-test/suite/rpl/r/rpl_gtid_basic.result
+++ b/mysql-test/suite/rpl/r/rpl_gtid_basic.result
@@ -197,9 +197,119 @@ CREATE TABLE t1 (a INT PRIMARY KEY);
SET gtid_seq_no=100;
INSERT INTO t1 VALUES (1);
include/start_slave.inc
+include/sync_with_master_gtid.inc
SELECT * FROM t1;
a
1
Gtid_IO_Pos = '0-1-100'
+*** Test @@LAST_GTID and MASTER_GTID_WAIT() ***
+DROP TABLE t1;
+CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB;
+include/stop_slave.inc
+SELECT @@last_gtid;
+@@last_gtid
+
+SET gtid_seq_no=110;
+SELECT @@last_gtid;
+@@last_gtid
+
+BEGIN;
+SELECT @@last_gtid;
+@@last_gtid
+
+INSERT INTO t1 VALUES (2);
+SELECT @@last_gtid;
+@@last_gtid
+
+COMMIT;
+SELECT @@last_gtid;
+@@last_gtid
+0-1-110
+SET @pos= '0-1-110';
+SELECT master_gtid_wait(NULL);
+master_gtid_wait(NULL)
+NULL
+SELECT master_gtid_wait('', NULL);
+master_gtid_wait('', NULL)
+0
+SELECT master_gtid_wait(@pos, 0.5);
+master_gtid_wait(@pos, 0.5)
+-1
+SELECT * FROM t1 ORDER BY a;
+a
+SELECT master_gtid_wait(@pos);
+include/start_slave.inc
+master_gtid_wait(@pos)
+0
+SELECT * FROM t1 ORDER BY a;
+a
+2
+include/stop_slave.inc
+SET gtid_domain_id= 1;
+INSERT INTO t1 VALUES (3);
+SET @pos= '1-1-1,0-1-110';
+SELECT master_gtid_wait(@pos, 0);
+master_gtid_wait(@pos, 0)
+-1
+SELECT * FROM t1 WHERE a >= 3;
+a
+SELECT master_gtid_wait(@pos, -1);
+include/start_slave.inc
+master_gtid_wait(@pos, -1)
+0
+SELECT * FROM t1 WHERE a >= 3;
+a
+3
+SELECT master_gtid_wait('1-1-1', 0);
+master_gtid_wait('1-1-1', 0)
+0
+SELECT master_gtid_wait('2-1-1,1-1-4,0-1-110');
+SELECT master_gtid_wait('0-1-1000', 0.5);
+SELECT master_gtid_wait('0-1-2000');
+SELECT master_gtid_wait('2-1-10');
+SELECT master_gtid_wait('2-1-5', 1);
+SELECT master_gtid_wait('2-1-5');
+SELECT master_gtid_wait('2-1-10');
+SELECT master_gtid_wait('2-1-5,1-1-4,0-1-110');
+SELECT master_gtid_wait('2-1-2');
+SELECT master_gtid_wait('1-1-1');
+master_gtid_wait('1-1-1')
+0
+SELECT master_gtid_wait('0-1-109');
+SELECT master_gtid_wait('2-1-2', 0.5);
+master_gtid_wait('2-1-2', 0.5)
+-1
+KILL QUERY 22;
+ERROR 70100: Query execution was interrupted
+SET gtid_domain_id=2;
+SET gtid_seq_no=2;
+INSERT INTO t1 VALUES (4);
+master_gtid_wait('2-1-2')
+0
+KILL CONNECTION 25;
+ERROR HY000: Lost connection to MySQL server during query
+SET gtid_domain_id=1;
+SET gtid_seq_no=4;
+INSERT INTO t1 VALUES (5);
+SET gtid_domain_id=2;
+SET gtid_seq_no=5;
+INSERT INTO t1 VALUES (6);
+master_gtid_wait('2-1-5,1-1-4,0-1-110')
+0
+master_gtid_wait('2-1-1,1-1-4,0-1-110')
+0
+master_gtid_wait('0-1-1000', 0.5)
+-1
+master_gtid_wait('2-1-5', 1)
+0
+master_gtid_wait('0-1-109')
+0
+SET gtid_domain_id=2;
+SET gtid_seq_no=10;
+INSERT INTO t1 VALUES (7);
+master_gtid_wait('2-1-10')
+0
+master_gtid_wait('2-1-10')
+0
DROP TABLE t1;
include/rpl_end.inc
diff --git a/mysql-test/suite/rpl/t/rpl_gtid_basic.test b/mysql-test/suite/rpl/t/rpl_gtid_basic.test
index 687c0d62cb1..dff7609cb99 100644
--- a/mysql-test/suite/rpl/t/rpl_gtid_basic.test
+++ b/mysql-test/suite/rpl/t/rpl_gtid_basic.test
@@ -199,14 +199,174 @@ INSERT INTO t1 VALUES (1);
# We cannot just use sync_with_master as we've done RESET MASTER, so
# slave old-style position is wrong.
# So sync on gtid position instead.
---let $wait_condition= SELECT @@GLOBAL.gtid_binlog_pos = '$master_pos'
---source include/wait_condition.inc
+--source include/sync_with_master_gtid.inc
SELECT * FROM t1;
# Check that the IO gtid position in SHOW SLAVE STATUS is also correct.
--let $status_items= Gtid_IO_Pos
--source include/show_slave_status.inc
+--echo *** Test @@LAST_GTID and MASTER_GTID_WAIT() ***
+
+--connection server_1
+DROP TABLE t1;
+CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB;
+--save_master_pos
+
+--connection server_2
+--sync_with_master
+--source include/stop_slave.inc
+
+--connect (m1,127.0.0.1,root,,test,$SERVER_MYPORT_1,)
+SELECT @@last_gtid;
+SET gtid_seq_no=110;
+SELECT @@last_gtid;
+BEGIN;
+SELECT @@last_gtid;
+INSERT INTO t1 VALUES (2);
+SELECT @@last_gtid;
+COMMIT;
+SELECT @@last_gtid;
+--let $pos= `SELECT @@gtid_binlog_pos`
+
+--connect (s1,127.0.0.1,root,,test,$SERVER_MYPORT_2,)
+eval SET @pos= '$pos';
+# Check NULL argument.
+SELECT master_gtid_wait(NULL);
+# Check empty argument returns immediately.
+SELECT master_gtid_wait('', NULL);
+# Let's check that we get a timeout
+SELECT master_gtid_wait(@pos, 0.5);
+SELECT * FROM t1 ORDER BY a;
+# Now actually wait until the slave reaches the position
+send SELECT master_gtid_wait(@pos);
+
+--connection server_2
+--source include/start_slave.inc
+
+--connection s1
+reap;
+SELECT * FROM t1 ORDER BY a;
+
+# Test waiting on a domain that does not exist yet.
+--source include/stop_slave.inc
+
+--connection server_1
+SET gtid_domain_id= 1;
+INSERT INTO t1 VALUES (3);
+--let $pos= `SELECT @@gtid_binlog_pos`
+
+--connection s1
+eval SET @pos= '$pos';
+SELECT master_gtid_wait(@pos, 0);
+SELECT * FROM t1 WHERE a >= 3;
+send SELECT master_gtid_wait(@pos, -1);
+
+--connection server_2
+--source include/start_slave.inc
+
+--connection s1
+reap;
+SELECT * FROM t1 WHERE a >= 3;
+# Waiting for only part of the position.
+SELECT master_gtid_wait('1-1-1', 0);
+
+# Now test a lot of parallel master_gtid_wait() calls, completing in different
+# order, and some of which time out or get killed on the way.
+
+--connection s1
+send SELECT master_gtid_wait('2-1-1,1-1-4,0-1-110');
+
+--connect (s2,127.0.0.1,root,,test,$SERVER_MYPORT_2,)
+# This will time out.
+send SELECT master_gtid_wait('0-1-1000', 0.5);
+
+--connect (s3,127.0.0.1,root,,test,$SERVER_MYPORT_2,)
+# This one we will kill
+--let $kill1_id= `SELECT connection_id()`
+send SELECT master_gtid_wait('0-1-2000');
+
+--connect (s4,127.0.0.1,root,,test,$SERVER_MYPORT_2,)
+send SELECT master_gtid_wait('2-1-10');
+
+--connect (s5,127.0.0.1,root,,test,$SERVER_MYPORT_2,)
+send SELECT master_gtid_wait('2-1-5', 1);
+
+# This one we will kill also.
+--connect (s6,127.0.0.1,root,,test,$SERVER_MYPORT_2,)
+--let $kill2_id= `SELECT connection_id()`
+send SELECT master_gtid_wait('2-1-5');
+
+--connect (s7,127.0.0.1,root,,test,$SERVER_MYPORT_2,)
+send SELECT master_gtid_wait('2-1-10');
+
+--connect (s8,127.0.0.1,root,,test,$SERVER_MYPORT_2,)
+send SELECT master_gtid_wait('2-1-5,1-1-4,0-1-110');
+
+--connect (s9,127.0.0.1,root,,test,$SERVER_MYPORT_2,)
+send SELECT master_gtid_wait('2-1-2');
+
+--connection server_2
+# This one completes immediately.
+SELECT master_gtid_wait('1-1-1');
+
+--connect (s10,127.0.0.1,root,,test,$SERVER_MYPORT_2,)
+send SELECT master_gtid_wait('0-1-109');
+
+--connection server_2
+# This one should time out.
+SELECT master_gtid_wait('2-1-2', 0.5);
+
+eval KILL QUERY $kill1_id;
+--connection s3
+--error ER_QUERY_INTERRUPTED
+reap;
+
+--connection server_1
+SET gtid_domain_id=2;
+SET gtid_seq_no=2;
+INSERT INTO t1 VALUES (4);
+
+--connection s9
+reap;
+
+--connection server_2
+eval KILL CONNECTION $kill2_id;
+
+--connection s6
+--error 2013
+reap;
+
+--connection server_1
+SET gtid_domain_id=1;
+SET gtid_seq_no=4;
+INSERT INTO t1 VALUES (5);
+SET gtid_domain_id=2;
+SET gtid_seq_no=5;
+INSERT INTO t1 VALUES (6);
+
+--connection s8
+reap;
+--connection s1
+reap;
+--connection s2
+reap;
+--connection s5
+reap;
+--connection s10
+reap;
+
+--connection server_1
+SET gtid_domain_id=2;
+SET gtid_seq_no=10;
+INSERT INTO t1 VALUES (7);
+
+--connection s4
+reap;
+--connection s7
+reap;
+
+
--connection server_1
DROP TABLE t1;
diff --git a/mysql-test/suite/sys_vars/r/last_gtid_basic.result b/mysql-test/suite/sys_vars/r/last_gtid_basic.result
new file mode 100644
index 00000000000..d39b6595f04
--- /dev/null
+++ b/mysql-test/suite/sys_vars/r/last_gtid_basic.result
@@ -0,0 +1,9 @@
+SELECT @@global.last_gtid;
+ERROR HY000: Variable 'last_gtid' is a SESSION variable
+SET GLOBAL last_gtid= 10;
+ERROR HY000: Variable 'last_gtid' is a read only variable
+SET SESSION last_gtid= 20;
+ERROR HY000: Variable 'last_gtid' is a read only variable
+SELECT @@session.last_gtid;
+@@session.last_gtid
+
diff --git a/mysql-test/suite/sys_vars/t/last_gtid_basic.test b/mysql-test/suite/sys_vars/t/last_gtid_basic.test
new file mode 100644
index 00000000000..d1cd05f2c30
--- /dev/null
+++ b/mysql-test/suite/sys_vars/t/last_gtid_basic.test
@@ -0,0 +1,9 @@
+--error ER_INCORRECT_GLOBAL_LOCAL_VAR
+SELECT @@global.last_gtid;
+
+--error ER_INCORRECT_GLOBAL_LOCAL_VAR
+SET GLOBAL last_gtid= 10;
+--error ER_INCORRECT_GLOBAL_LOCAL_VAR
+SET SESSION last_gtid= 20;
+
+SELECT @@session.last_gtid;
diff --git a/sql/item_create.cc b/sql/item_create.cc
index 60eabe67c83..c158816bf32 100644
--- a/sql/item_create.cc
+++ b/sql/item_create.cc
@@ -1783,6 +1783,19 @@ protected:
};
+/*
+ Builder for the native SQL function MASTER_GTID_WAIT().
+ It is registered under that name in func_array, and create_native()
+ constructs the corresponding Item_master_gtid_wait.
+*/
+class Create_func_master_gtid_wait : public Create_native_func
+{
+public:
+ virtual Item *create_native(THD *thd, LEX_STRING name, List<Item> *item_list);
+
+ /* The single instance handed to the function registry. */
+ static Create_func_master_gtid_wait s_singleton;
+
+protected:
+ /* Constructor and destructor are protected; the class is used via s_singleton. */
+ Create_func_master_gtid_wait() {}
+ virtual ~Create_func_master_gtid_wait() {}
+};
+
+
class Create_func_md5 : public Create_func_arg1
{
public:
@@ -4590,6 +4603,47 @@ Create_func_master_pos_wait::create_native(THD *thd, LEX_STRING name,
}
+Create_func_master_gtid_wait Create_func_master_gtid_wait::s_singleton;
+
+/*
+  Create the Item for a MASTER_GTID_WAIT(gtid_pos [, timeout]) call.
+
+  The statement is flagged with BINLOG_STMT_UNSAFE_SYSTEM_FUNCTION in all
+  cases. With a valid argument count (one or two), safe_to_cache_query is
+  also cleared and the arguments are popped from item_list in order.
+
+  Returns the new item, or NULL after raising
+  ER_WRONG_PARAMCOUNT_TO_NATIVE_FCT when the argument count is wrong.
+*/
+Item*
+Create_func_master_gtid_wait::create_native(THD *thd, LEX_STRING name,
+                                            List<Item> *item_list)
+{
+  thd->lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_FUNCTION);
+
+  /* A missing argument list counts as zero arguments. */
+  const int n_args= (item_list != NULL) ? (int) item_list->elements : 0;
+
+  if (n_args != 1 && n_args != 2)
+  {
+    my_error(ER_WRONG_PARAMCOUNT_TO_NATIVE_FCT, MYF(0), name.str);
+    return NULL;
+  }
+
+  thd->lex->safe_to_cache_query= 0;
+
+  Item *gtid_pos_arg= item_list->pop();
+  if (n_args == 1)
+    return new (thd->mem_root) Item_master_gtid_wait(gtid_pos_arg);
+
+  Item *timeout_arg= item_list->pop();
+  return new (thd->mem_root) Item_master_gtid_wait(gtid_pos_arg, timeout_arg);
+}
+
+
Create_func_md5 Create_func_md5::s_singleton;
Item*
@@ -5536,6 +5590,7 @@ static Native_func_registry func_array[] =
{ { C_STRING_WITH_LEN("MAKEDATE") }, BUILDER(Create_func_makedate)},
{ { C_STRING_WITH_LEN("MAKETIME") }, BUILDER(Create_func_maketime)},
{ { C_STRING_WITH_LEN("MAKE_SET") }, BUILDER(Create_func_make_set)},
+ { { C_STRING_WITH_LEN("MASTER_GTID_WAIT") }, BUILDER(Create_func_master_gtid_wait)},
{ { C_STRING_WITH_LEN("MASTER_POS_WAIT") }, BUILDER(Create_func_master_pos_wait)},
{ { C_STRING_WITH_LEN("MBRCONTAINS") }, GEOM_BUILDER(Create_func_mbr_contains)},
{ { C_STRING_WITH_LEN("MBRDISJOINT") }, GEOM_BUILDER(Create_func_mbr_disjoint)},
diff --git a/sql/item_func.cc b/sql/item_func.cc
index 5d9abbb0d8c..b2af80e6d96 100644
--- a/sql/item_func.cc
+++ b/sql/item_func.cc
@@ -3989,6 +3989,34 @@ err:
}
+/*
+  Evaluate MASTER_GTID_WAIT(gtid_pos [, timeout]).
+
+  args[0] is the GTID position as a string; a NULL position yields SQL NULL.
+  args[1], when present and not NULL, is the timeout in seconds and is
+  converted to microseconds. Otherwise -1 is passed, which wait_for_pos()
+  treats as "no timeout".
+
+  Returns the value of gtid_waiting::wait_for_pos(), or 0 when the server is
+  built without HAVE_REPLICATION.
+*/
+longlong Item_master_gtid_wait::val_int()
+{
+  DBUG_ASSERT(fixed == 1);
+  longlong result= 0;
+
+  /*
+    The argument must be evaluated before its null_value flag is inspected;
+    the flag only describes the most recent evaluation. Checking the returned
+    pointer as well guarantees wait_for_pos() never sees a NULL String.
+  */
+  String *gtid_pos= args[0]->val_str(&value);
+  if (gtid_pos == NULL || args[0]->null_value)
+  {
+    null_value= 1;
+    return 0;
+  }
+
+  null_value=0;
+#ifdef HAVE_REPLICATION
+  THD* thd= current_thd;
+  longlong timeout_us= (longlong)-1;
+
+  if (arg_count==2)
+  {
+    /* Same rule as above: evaluate first, then look at null_value. */
+    double timeout_sec= args[1]->val_real();
+    if (!args[1]->null_value)
+      timeout_us= (longlong)(1e6*timeout_sec);
+  }
+
+  result= rpl_global_gtid_waiting.wait_for_pos(thd, gtid_pos, timeout_us);
+#endif
+  return result;
+}
+
+
/**
Enables a session to wait on a condition until a timeout or a network
disconnect occurs.
diff --git a/sql/item_func.h b/sql/item_func.h
index 384a6b535df..2e3f352e377 100644
--- a/sql/item_func.h
+++ b/sql/item_func.h
@@ -1642,6 +1642,22 @@ public:
};
+/*
+  Item for the SQL function MASTER_GTID_WAIT(gtid_pos [, timeout]).
+  The one-argument form waits without a timeout; see val_int() for the
+  evaluation rules.
+*/
+class Item_master_gtid_wait :public Item_int_func
+{
+  /* Scratch buffer passed to args[0]->val_str() by val_int(). */
+  String value;
+public:
+  Item_master_gtid_wait(Item *a) :Item_int_func(a) {}
+  Item_master_gtid_wait(Item *a,Item *b) :Item_int_func(a,b) {}
+  longlong val_int();
+  const char *func_name() const { return "master_gtid_wait"; }
+  /*
+    val_int() sets null_value for a NULL position, so the result must be
+    declared nullable.
+  */
+  void fix_length_and_dec() { max_length=10+1+10+1+20+1; maybe_null=1;}
+  /* Reports the function via trace_unsupported_by_check_vcol_func_processor(). */
+  bool check_vcol_func_processor(uchar *int_arg)
+  {
+    return trace_unsupported_by_check_vcol_func_processor(func_name());
+  }
+};
+
+
/* Handling of user definable variables */
class user_var_entry;
diff --git a/sql/log.cc b/sql/log.cc
index fbb73acf5d1..c5cdc2cccc5 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -5446,6 +5446,7 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone,
}
if (err)
return true;
+ thd->last_commit_gtid= gtid;
Gtid_log_event gtid_event(thd, seq_no, domain_id, standalone,
LOG_EVENT_SUPPRESS_USE_F, is_transactional,
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index 4e6646feead..ebfa238df3f 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -780,6 +780,7 @@ PSI_mutex_key key_LOCK_stats,
key_LOCK_global_user_client_stats, key_LOCK_global_table_stats,
key_LOCK_global_index_stats,
key_LOCK_wakeup_ready, key_LOCK_wait_commit;
+PSI_mutex_key key_LOCK_gtid_waiting;
PSI_mutex_key key_LOCK_prepare_ordered, key_LOCK_commit_ordered;
@@ -825,6 +826,7 @@ static PSI_mutex_info all_server_mutexes[]=
{ &key_LOCK_global_index_stats, "LOCK_global_index_stats", PSI_FLAG_GLOBAL},
{ &key_LOCK_wakeup_ready, "THD::LOCK_wakeup_ready", 0},
{ &key_LOCK_wait_commit, "wait_for_commit::LOCK_wait_commit", 0},
+ { &key_LOCK_gtid_waiting, "gtid_waiting::LOCK_gtid_waiting", 0},
{ &key_LOCK_thd_data, "THD::LOCK_thd_data", 0},
{ &key_LOCK_user_conn, "LOCK_user_conn", PSI_FLAG_GLOBAL},
{ &key_LOCK_uuid_short_generator, "LOCK_uuid_short_generator", PSI_FLAG_GLOBAL},
@@ -895,6 +897,7 @@ PSI_cond_key key_RELAYLOG_COND_queue_busy;
PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_pool,
key_COND_parallel_entry, key_COND_prepare_ordered;
+PSI_cond_key key_COND_wait_gtid;
static PSI_cond_info all_server_conds[]=
{
@@ -940,7 +943,8 @@ static PSI_cond_info all_server_conds[]=
{ &key_COND_rpl_thread, "COND_rpl_thread", 0},
{ &key_COND_rpl_thread_pool, "COND_rpl_thread_pool", 0},
{ &key_COND_parallel_entry, "COND_parallel_entry", 0},
- { &key_COND_prepare_ordered, "COND_prepare_ordered", 0}
+ { &key_COND_prepare_ordered, "COND_prepare_ordered", 0},
+ { &key_COND_wait_gtid, "COND_wait_gtid", 0}
};
PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert,
@@ -1821,6 +1825,7 @@ static void mysqld_exit(int exit_code)
but if a kill -15 signal was sent, the signal thread did
spawn the kill_server_thread thread, which is running concurrently.
*/
+ rpl_deinit_gtid_waiting();
rpl_deinit_gtid_slave_state();
wait_for_signal_thread_to_end();
mysql_audit_finalize();
@@ -4201,6 +4206,7 @@ static int init_thread_environment()
#ifdef HAVE_REPLICATION
rpl_init_gtid_slave_state();
+ rpl_init_gtid_waiting();
#endif
DBUG_RETURN(0);
diff --git a/sql/mysqld.h b/sql/mysqld.h
index 2e10b0caeb5..4fdd34fd8be 100644
--- a/sql/mysqld.h
+++ b/sql/mysqld.h
@@ -256,6 +256,7 @@ extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state,
extern PSI_mutex_key key_LOCK_stats,
key_LOCK_global_user_client_stats, key_LOCK_global_table_stats,
key_LOCK_global_index_stats, key_LOCK_wakeup_ready, key_LOCK_wait_commit;
+extern PSI_mutex_key key_LOCK_gtid_waiting;
extern PSI_rwlock_key key_rwlock_LOCK_grant, key_rwlock_LOCK_logger,
key_rwlock_LOCK_sys_init_connect, key_rwlock_LOCK_sys_init_slave,
@@ -285,6 +286,7 @@ extern PSI_cond_key key_RELAYLOG_COND_queue_busy;
extern PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
extern PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_pool,
key_COND_parallel_entry;
+extern PSI_cond_key key_COND_wait_gtid;
extern PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert,
key_thread_handle_manager, key_thread_kill_server, key_thread_main,
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc
index 3f79a0cb528..00140fd3475 100644
--- a/sql/rpl_gtid.cc
+++ b/sql/rpl_gtid.cc
@@ -43,9 +43,9 @@ rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid)
there will not be an attempt to delete the corresponding table row before
it is even committed.
*/
- lock();
+ mysql_mutex_lock(&LOCK_slave_state);
err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no);
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
if (err)
{
sql_print_warning("Slave: Out of memory during slave state maintenance. "
@@ -82,11 +82,20 @@ rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi)
}
+/*
+  Free callback installed on the rpl_slave_state hash: destroys the
+  per-element condition variable, then releases the element's memory.
+*/
+static void
+rpl_slave_state_free_element(void *arg)
+{
+  rpl_slave_state::element *victim=
+    static_cast<rpl_slave_state::element *>(arg);
+
+  mysql_cond_destroy(&victim->COND_wait_gtid);
+  my_free(victim);
+}
+
+
rpl_slave_state::rpl_slave_state()
: last_sub_id(0), inited(false), loaded(false)
{
my_hash_init(&hash, &my_charset_bin, 32, offsetof(element, domain_id),
- sizeof(uint32), NULL, my_free, HASH_UNIQUE);
+ sizeof(uint32), NULL, rpl_slave_state_free_element, HASH_UNIQUE);
}
@@ -146,6 +155,21 @@ rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
if (!(elem= get_element(domain_id)))
return 1;
+ if (seq_no > elem->highest_seq_no)
+ elem->highest_seq_no= seq_no;
+ if (elem->min_wait_seq_no != 0 && elem->min_wait_seq_no <= seq_no)
+ {
+ /*
+ Someone was waiting in MASTER_GTID_WAIT() for this GTID to appear.
+ Signal (and remove) them. The waiter will handle all the processing
+ of all pending MASTER_GTID_WAIT(), so we do not slow down the
+ replication SQL thread.
+ */
+ mysql_mutex_assert_owner(&LOCK_slave_state);
+ elem->min_wait_seq_no= 0;
+ mysql_cond_broadcast(&elem->COND_wait_gtid);
+ }
+
if (!(list_elem= (list_element *)my_malloc(sizeof(*list_elem), MYF(MY_WME))))
return 1;
list_elem->server_id= server_id;
@@ -173,6 +197,9 @@ rpl_slave_state::get_element(uint32 domain_id)
return NULL;
elem->list= NULL;
elem->domain_id= domain_id;
+ elem->highest_seq_no= 0;
+ elem->min_wait_seq_no= 0;
+ mysql_cond_init(key_COND_wait_gtid, &elem->COND_wait_gtid, 0);
if (my_hash_insert(&hash, (uchar *)elem))
{
my_free(elem);
@@ -378,10 +405,10 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
goto end;
}
- lock();
+ mysql_mutex_lock(&LOCK_slave_state);
if ((elem= get_element(gtid->domain_id)) == NULL)
{
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
my_error(ER_OUT_OF_RESOURCES, MYF(0));
err= 1;
goto end;
@@ -410,7 +437,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
cur->next= NULL;
elem->list= cur;
}
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
if (!elist)
goto end;
@@ -470,9 +497,9 @@ end:
*/
if (elist)
{
- lock();
+ mysql_mutex_lock(&LOCK_slave_state);
put_back_list(gtid->domain_id, elist);
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
}
ha_rollback_trans(thd, FALSE);
@@ -499,9 +526,9 @@ rpl_slave_state::next_sub_id(uint32 domain_id)
{
uint64 sub_id= 0;
- lock();
+ mysql_mutex_lock(&LOCK_slave_state);
sub_id= ++last_sub_id;
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
return sub_id;
}
@@ -541,7 +568,7 @@ rpl_slave_state::iterate(int (*cb)(rpl_gtid *, void *), void *data,
my_hash_insert(&gtid_hash, (uchar *)(&extra_gtids[i])))
goto err;
- lock();
+ mysql_mutex_lock(&LOCK_slave_state);
for (i= 0; i < hash.records; ++i)
{
@@ -576,19 +603,19 @@ rpl_slave_state::iterate(int (*cb)(rpl_gtid *, void *), void *data,
memcpy(&best_gtid, gtid, sizeof(best_gtid));
if (my_hash_delete(&gtid_hash, rec))
{
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
goto err;
}
}
if ((res= (*cb)(&best_gtid, data)))
{
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
goto err;
}
}
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
/* Also add any remaining extra domain_ids. */
for (i= 0; i < gtid_hash.records; ++i)
@@ -659,11 +686,11 @@ rpl_slave_state::domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid)
list_element *list;
uint64 best_sub_id;
- lock();
+ mysql_mutex_lock(&LOCK_slave_state);
elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0);
if (!elem || !(list= elem->list))
{
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
return false;
}
@@ -681,7 +708,7 @@ rpl_slave_state::domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid)
out_gtid->seq_no= list->seq_no;
}
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
return true;
}
@@ -811,7 +838,7 @@ rpl_slave_state::is_empty()
uint32 i;
bool result= true;
- lock();
+ mysql_mutex_lock(&LOCK_slave_state);
for (i= 0; i < hash.records; ++i)
{
element *e= (element *)my_hash_element(&hash, i);
@@ -821,7 +848,7 @@ rpl_slave_state::is_empty()
break;
}
}
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
return result;
}
@@ -1647,3 +1674,418 @@ slave_connection_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size)
return 0;
}
+
+
+/*
+  Execute a MASTER_GTID_WAIT().
+
+  gtid_str holds the position to wait for, in string form. An empty string
+  returns success at once. Otherwise each GTID in the parsed list is waited
+  for in turn, stopping at the first non-zero result.
+
+  timeout_us is the timeout in microseconds. A value >= 0 sets a deadline
+  relative to now (so 0 expires immediately); a negative value means no
+  timeout.
+
+  Returns:
+   1 for error.
+   0 for wait completed.
+   -1 for wait timed out.
+*/
+int
+gtid_waiting::wait_for_pos(THD *thd, String *gtid_str, longlong timeout_us)
+{
+  /* Wait for the empty position returns immediately. */
+  if (gtid_str->length() == 0)
+    return 0;
+
+  uint32 n_gtids;
+  rpl_gtid *gtid_list= gtid_parse_string_to_list(gtid_str->ptr(),
+                                                 gtid_str->length(),
+                                                 &n_gtids);
+  if (!gtid_list)
+  {
+    my_error(ER_INCORRECT_GTID_STATE, MYF(0));
+    return 1;
+  }
+
+  /* A NULL deadline pointer tells wait_for_gtid() to wait without timeout. */
+  struct timespec deadline;
+  struct timespec *deadline_ptr= NULL;
+  if (timeout_us >= 0)
+  {
+    set_timespec_nsec(deadline, (ulonglong)1000*timeout_us);
+    deadline_ptr= &deadline;
+  }
+
+  int res= 0;
+  for (uint32 idx= 0; idx < n_gtids && !res; ++idx)
+    res= wait_for_gtid(thd, &gtid_list[idx], deadline_ptr);
+
+  my_free(gtid_list);
+  return res;
+}
+
+
+/*
+  Wake the waiter at the top of he->queue, if any, with reason TAKEOVER.
+  The waiter stays in the queue. The caller must hold LOCK_gtid_waiting.
+*/
+void
+gtid_waiting::promote_new_waiter(gtid_waiting::hash_element *he)
+{
+  mysql_mutex_assert_owner(&LOCK_gtid_waiting);
+
+  if (!queue_empty(&he->queue))
+  {
+    queue_element *next_waiter= (queue_element *)queue_top(&he->queue);
+
+    next_waiter->thd->wakeup_ready= true;
+    next_waiter->wakeup_reason= queue_element::TAKEOVER;
+    mysql_cond_signal(&next_waiter->thd->COND_wakeup_ready);
+  }
+}
+
+/*
+  Remove and wake, with reason DONE, every waiter at the top of he->queue
+  whose wait_seq_no is <= wakeup_seq_no. Stops at the first top element that
+  waits for a later seq_no (presumably the queue is ordered by wait_seq_no;
+  the ordering function is not visible here). The caller must hold
+  LOCK_gtid_waiting.
+*/
+void
+gtid_waiting::process_wait_hash(uint64 wakeup_seq_no,
+                                gtid_waiting::hash_element *he)
+{
+  mysql_mutex_assert_owner(&LOCK_gtid_waiting);
+
+  while (!queue_empty(&he->queue))
+  {
+    queue_element *waiter= (queue_element *)queue_top(&he->queue);
+
+    if (waiter->wait_seq_no > wakeup_seq_no)
+      break;
+
+    queue_remove_top(&he->queue);
+    waiter->thd->wakeup_ready= true;
+    waiter->wakeup_reason= queue_element::DONE;
+    mysql_cond_signal(&waiter->thd->COND_wakeup_ready);
+  }
+}
+
+
+/*
+ Execute a MASTER_GTID_WAIT() for one specific domain.
+
+ The implementation is optimised primarily for (1) minimal performance impact
+ on the slave replication threads, and secondarily for (2) quick performance
+ of MASTER_GTID_WAIT() on a single GTID, which can be useful for consistent
+ read to clients in an async replication read-scaleout scenario.
+
+ To achieve (1), we have a "small" wait and a "large" wait. The small wait
+ contends with the replication threads on the lock on the gtid_slave_pos, so
+ only minimal processing is done under that lock, and only a single waiter at
+ a time does the small wait.
+
+ If there is already a small waiter, a new thread will either replace the
+ small waiter (if it needs to wait for an earlier sequence number), or
+ instead do a "large" wait.
+
+ Once awoken on the small wait, the waiting thread releases the lock shared
+ with the SQL threads quickly, and then processes all waiters currently doing
+ the large wait.
+
+ This way, the SQL threads only need to do a single check + possibly a
+ mysql_cond_broadcast() when updating the gtid_slave_state, and the time that
+ non-SQL threads contend for the lock on gtid_slave_state is minimized.
+*/
+int
+gtid_waiting::wait_for_gtid(THD *thd, rpl_gtid *wait_gtid,
+ struct timespec *wait_until)
+{
+ bool timed_out= false;
+#ifdef HAVE_REPLICATION
+ queue_element elem;
+ uint32_t domain_id= wait_gtid->domain_id;
+ uint64 seq_no= wait_gtid->seq_no;
+ hash_element *he;
+ rpl_slave_state::element *slave_state_elem= NULL;
+ const char *old_msg= NULL;
+ bool did_enter_cond= false;
+ bool takeover= false;
+
+ elem.wait_seq_no= seq_no;
+ elem.thd= thd;
+ /*
+ Register on the large wait before checking the small wait.
+ This ensures that if we find another waiter already doing the small wait,
+ we are sure to be woken up by that one, and thus we will not need to take
+ the lock on the small wait more than once in this case.
+ */
+ mysql_mutex_lock(&LOCK_gtid_waiting);
+ if (!(he= register_in_wait_hash(thd, wait_gtid, &elem)))
+ {
+ mysql_mutex_unlock(&LOCK_gtid_waiting);
+ return 1;
+ }
+
+ /*
+ Now check the small wait, and either do the large wait or the small one,
+ depending on whether there is already a suitable small waiter or not.
+
+ We may need to do this multiple times, as a previous small waiter may
+ complete and pass the small wait on to us.
+ */
+ for (;;)
+ {
+ uint64 wakeup_seq_no, cur_wait_seq_no;
+
+ mysql_mutex_assert_owner(&LOCK_gtid_waiting);
+ mysql_mutex_lock(&rpl_global_gtid_slave_state.LOCK_slave_state);
+ /*
+ The elements in the gtid_slave_state_hash are never re-allocated once
+ they enter the hash, so we do not need to re-do the lookup after releasing
+ and re-acquiring the lock.
+ */
+ if (!slave_state_elem &&
+ !(slave_state_elem= rpl_global_gtid_slave_state.get_element(domain_id)))
+ {
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
+ remove_from_wait_hash(he, &elem);
+ mysql_mutex_unlock(&LOCK_gtid_waiting);
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ return 1;
+ }
+
+ if ((wakeup_seq_no= slave_state_elem->highest_seq_no) >= seq_no)
+ {
+ /*
+ We do not have to wait. But we might need to wake up other threads on
+ the large wait (can happen if we were woken up to take over the small
+ wait, and SQL thread raced with us to reach the waited-for GTID).
+ */
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
+ thd->wakeup_ready= 0;
+ process_wait_hash(wakeup_seq_no, he);
+ /*
+ Since we already checked wakeup_seq_no, we are sure that
+ process_wait_hash() will mark us done.
+ */
+ DBUG_ASSERT(thd->wakeup_ready);
+ if (thd->wakeup_ready)
+ {
+ if (takeover)
+ promote_new_waiter(he);
+ break;
+ }
+ }
+ else if ((cur_wait_seq_no= slave_state_elem->min_wait_seq_no) == 0 ||
+ cur_wait_seq_no > seq_no)
+ {
+ /*
+ We have to do the small wait ourselves (stealing it from any thread that
+ might already be waiting for a later seq_no).
+ */
+ slave_state_elem->min_wait_seq_no= seq_no;
+ if (cur_wait_seq_no != 0)
+ {
+ /* We stole the wait, so wake up the old waiting thread. */
+ mysql_cond_signal(&slave_state_elem->COND_wait_gtid);
+ }
+ /* Do the small wait. */
+ if (did_enter_cond)
+ thd->exit_cond(old_msg);
+ else
+ mysql_mutex_unlock(&LOCK_gtid_waiting);
+
+ old_msg= thd->enter_cond(&slave_state_elem->COND_wait_gtid,
+ &rpl_global_gtid_slave_state.LOCK_slave_state,
+ "Waiting in MASTER_GTID_WAIT() (primary waiter)");
+ do
+ {
+ if (thd->check_killed())
+ slave_state_elem->min_wait_seq_no = 0;
+ else if (wait_until)
+ {
+ int err=
+ mysql_cond_timedwait(&slave_state_elem->COND_wait_gtid,
+ &rpl_global_gtid_slave_state.LOCK_slave_state,
+ wait_until);
+ if (err == ETIMEDOUT || err == ETIME)
+ {
+ timed_out= true;
+ slave_state_elem->min_wait_seq_no = 0;
+ }
+ }
+ else
+ mysql_cond_wait(&slave_state_elem->COND_wait_gtid,
+ &rpl_global_gtid_slave_state.LOCK_slave_state);
+ } while (slave_state_elem->min_wait_seq_no == seq_no);
+ /*
+ Check the new gtid_slave_state. We could be woken up because our seq_no
+ has been reached, or because someone else stole the small wait from us.
+ (Or because of kill/timeout).
+ */
+ wakeup_seq_no= slave_state_elem->highest_seq_no;
+
+ thd->exit_cond(old_msg);
+ mysql_mutex_lock(&LOCK_gtid_waiting);
+ /*
+ Note that hash_entry pointers do not change once allocated, so we do
+ not need to look up `he' again after re-acquiring the lock.
+ */
+ thd->wakeup_ready= 0;
+ process_wait_hash(wakeup_seq_no, he);
+ if (thd->wakeup_ready)
+ promote_new_waiter(he);
+ else if (thd->killed || timed_out)
+ {
+ remove_from_wait_hash(he, &elem);
+ promote_new_waiter(he);
+ if (thd->killed)
+ thd->send_kill_message();
+ break;
+ }
+ }
+ else
+ {
+ /* We have to do the large wait. */
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
+ thd->wakeup_ready= 0;
+ }
+
+ takeover= false;
+ old_msg= thd->enter_cond(&thd->COND_wakeup_ready, &LOCK_gtid_waiting,
+ "Waiting in MASTER_GTID_WAIT()");
+ while (!thd->wakeup_ready && !thd->check_killed() && !timed_out)
+ {
+ thd_wait_begin(thd, THD_WAIT_BINLOG);
+ if (wait_until)
+ {
+ int err= mysql_cond_timedwait(&thd->COND_wakeup_ready,
+ &LOCK_gtid_waiting, wait_until);
+ if (err == ETIMEDOUT || err == ETIME)
+ timed_out= true;
+ }
+ else
+ mysql_cond_wait(&thd->COND_wakeup_ready, &LOCK_gtid_waiting);
+ thd_wait_end(thd);
+ }
+
+ if (elem.wakeup_reason == queue_element::DONE)
+ break;
+ takeover= true;
+
+ if (thd->killed || timed_out)
+ {
+ remove_from_wait_hash(he, &elem);
+ /*
+ If we got kill/timeout _and_ we were asked to takeover the small wait,
+ we need to pass on that task to someone else.
+ */
+ if (thd->wakeup_ready && elem.wakeup_reason == queue_element::TAKEOVER)
+ promote_new_waiter(he);
+ if (thd->killed)
+ thd->send_kill_message();
+ break;
+ }
+ }
+
+ if (did_enter_cond)
+ thd->exit_cond(old_msg);
+ else
+ mysql_mutex_unlock(&LOCK_gtid_waiting);
+#endif /* HAVE_REPLICATION */
+ return timed_out ? -1 : 0;
+}
+
+
+/*
+  Element destructor registered with the wait hash in gtid_waiting::init():
+  releases the per-domain priority queue, then the hash_element itself.
+*/
+static void
+free_hash_element(void *p)
+{
+  gtid_waiting::hash_element *entry=
+    static_cast<gtid_waiting::hash_element *>(p);
+
+  delete_queue(&entry->queue);
+  my_free(entry);
+}
+
+
+/*
+  Set up the wait hash (unique keys, keyed on hash_element::domain_id, with
+  free_hash_element() as the element destructor) and LOCK_gtid_waiting.
+*/
+void
+gtid_waiting::init()
+{
+  const size_t key_offset= offsetof(hash_element, domain_id);
+  const size_t key_length= sizeof(uint32);
+
+  my_hash_init(&hash, &my_charset_bin, 32, key_offset, key_length, NULL,
+               free_hash_element, HASH_UNIQUE);
+  mysql_mutex_init(key_LOCK_gtid_waiting, &LOCK_gtid_waiting, 0);
+}
+
+
+/* Undo init(): destroy LOCK_gtid_waiting, then free the wait hash. */
+void
+gtid_waiting::destroy()
+{
+  mysql_mutex_destroy(&LOCK_gtid_waiting);
+  /* Entries still in the hash are handed to free_hash_element(). */
+  my_hash_free(&hash);
+}
+
+
+/*
+  Queue comparison callback: orders two uint64 seq_no keys ascending.
+  Returns -1, 0 or 1 as *a is less than, equal to or greater than *b.
+  The first (context) argument is unused.
+*/
+static int
+cmp_queue_elem(void *, uchar *a, uchar *b)
+{
+  const uint64 lhs= *(uint64 *)a;
+  const uint64 rhs= *(uint64 *)b;
+
+  if (lhs == rhs)
+    return 0;
+  return lhs < rhs ? -1 : 1;
+}
+
+
+/*
+  Return the wait-hash entry for domain_id, creating and inserting an empty
+  one (with an initialised priority queue) if none exists yet.
+
+  Returns NULL, with an error already reported through my_error(), if the
+  allocation, the queue initialisation or the hash insertion fails. Anything
+  allocated on the way is released again before returning NULL.
+*/
+gtid_waiting::hash_element *
+gtid_waiting::get_entry(uint32 domain_id)
+{
+  hash_element *e;
+
+  /* Fast path: the domain already has an entry. */
+  if ((e= (hash_element *)my_hash_search(&hash, (const uchar *)&domain_id, 0)))
+    return e;
+
+  if (!(e= (hash_element *)my_malloc(sizeof(*e), MYF(MY_WME))))
+  {
+    /*
+      my_error() is variadic and the ER_OUTOFMEMORY message formats its
+      argument as an int, so pass an int here rather than a size_t.
+    */
+    my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*e));
+    return NULL;
+  }
+
+  /*
+    Queue ordered by queue_element::wait_seq_no via cmp_queue_elem(). The
+    queue_idx offset (passed +1) is what remove_from_wait_hash() later relies
+    on to find an element's current position in the queue.
+  */
+  if (init_queue(&e->queue, 8, offsetof(queue_element, wait_seq_no), 0,
+                 cmp_queue_elem, NULL, 1+offsetof(queue_element, queue_idx), 1))
+  {
+    my_error(ER_OUT_OF_RESOURCES, MYF(0));
+    my_free(e);
+    return NULL;
+  }
+  e->domain_id= domain_id;
+  if (my_hash_insert(&hash, (uchar *)e))
+  {
+    my_error(ER_OUT_OF_RESOURCES, MYF(0));
+    /* Not owned by the hash yet, so clean up by hand. */
+    delete_queue(&e->queue);
+    my_free(e);
+    return NULL;
+  }
+  return e;
+}
+
+
+/*
+  Add `elem' to the priority queue of the domain named by wait_gtid, creating
+  the per-domain hash entry on demand. The caller must hold LOCK_gtid_waiting.
+
+  Returns the domain's hash_element on success, or NULL after an error has
+  been reported.
+*/
+gtid_waiting::hash_element *
+gtid_waiting::register_in_wait_hash(THD *thd, rpl_gtid *wait_gtid,
+                                    gtid_waiting::queue_element *elem)
+{
+  mysql_mutex_assert_owner(&LOCK_gtid_waiting);
+
+  hash_element *entry= get_entry(wait_gtid->domain_id);
+  if (!entry)
+    return NULL;                        /* get_entry() reported the error. */
+
+  if (!queue_insert_safe(&entry->queue, (uchar *)elem))
+    return entry;
+
+  my_error(ER_OUT_OF_RESOURCES, MYF(0));
+  return NULL;
+}
+
+
+/*
+  Take `elem' out of the priority queue of hash entry `e', using the position
+  recorded in elem->queue_idx. The caller must hold LOCK_gtid_waiting.
+*/
+void
+gtid_waiting::remove_from_wait_hash(gtid_waiting::hash_element *e,
+                                    gtid_waiting::queue_element *elem)
+{
+  mysql_mutex_assert_owner(&LOCK_gtid_waiting);
+
+  QUEUE *waiters= &e->queue;
+  queue_remove(waiters, elem->queue_idx);
+}
diff --git a/sql/rpl_gtid.h b/sql/rpl_gtid.h
index b0bc54900e7..1cf57c45018 100644
--- a/sql/rpl_gtid.h
+++ b/sql/rpl_gtid.h
@@ -16,6 +16,10 @@
#ifndef RPL_GTID_H
#define RPL_GTID_H
+#include "hash.h"
+#include "queues.h"
+
+
/* Definitions for MariaDB global transaction ID (GTID). */
@@ -61,6 +65,15 @@ struct rpl_slave_state
{
struct list_element *list;
uint32 domain_id;
+ /* Highest seq_no seen so far in this domain. */
+ uint64 highest_seq_no;
+ /*
+ If min_wait_seq_no is non-zero, then it is the smallest seq_no in this
+ domain that someone is doing MASTER_GTID_WAIT() on. When we reach this
+ seq_no, we need to signal the waiter on COND_wait_gtid.
+ */
+ uint64 min_wait_seq_no;
+ mysql_cond_t COND_wait_gtid;
list_element *grab_list() { list_element *l= list; list= NULL; return l; }
void add(list_element *l)
@@ -99,9 +112,6 @@ struct rpl_slave_state
bool in_statement);
bool is_empty();
- void lock() { DBUG_ASSERT(inited); mysql_mutex_lock(&LOCK_slave_state); }
- void unlock() { DBUG_ASSERT(inited); mysql_mutex_unlock(&LOCK_slave_state); }
-
element *get_element(uint32 domain_id);
int put_back_list(uint32 domain_id, list_element *list);
@@ -204,6 +214,49 @@ struct slave_connection_state
int get_gtid_list(rpl_gtid *gtid_list, uint32 list_size);
};
+
+/*
+  Bookkeeping for threads blocked in MASTER_GTID_WAIT().
+
+  Replication is (mostly) single-threaded, so the cost MASTER_GTID_WAIT()
+  imposes on it has to stay small. The lock shared between the replication
+  threads and the MASTER_GTID_WAIT threads is therefore held as briefly as
+  possible, and only one thread at a time waits to be notified by the
+  replication threads. That thread then does the (potentially heavy) work of
+  dealing with all the other current waiters.
+*/
+
+struct gtid_waiting {
+  /* One entry per domain in `hash': a priority queue of that domain's waiters. */
+  struct hash_element {
+    QUEUE queue;
+    uint32 domain_id;                   /* Hash key. */
+  };
+  /* One waiting thread; queued within its domain in wait_seq_no order. */
+  struct queue_element {
+    uint64 wait_seq_no;                 /* seq_no the thread is waiting for. */
+    THD *thd;
+    int queue_idx;                      /* Position in the queue, for removal. */
+    enum { DONE, TAKEOVER } wakeup_reason;
+  };
+
+  mysql_mutex_t LOCK_gtid_waiting;
+  HASH hash;                            /* domain_id -> hash_element. */
+
+  /* Lifecycle. */
+  void init();
+  void destroy();
+
+  /* Wait-hash maintenance. */
+  hash_element *get_entry(uint32 domain_id);
+  hash_element *register_in_wait_hash(THD *thd, rpl_gtid *wait_gtid,
+                                      queue_element *elem);
+  void remove_from_wait_hash(hash_element *e, queue_element *elem);
+  void process_wait_hash(uint64 wakeup_seq_no, gtid_waiting::hash_element *he);
+  void promote_new_waiter(gtid_waiting::hash_element *he);
+
+  /* Waiting entry points. */
+  int wait_for_pos(THD *thd, String *gtid_str, longlong timeout_us);
+  int wait_for_gtid(THD *thd, rpl_gtid *wait_gtid, struct timespec *wait_until);
+};
+
+
extern bool rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid,
bool *first);
extern int gtid_check_rpl_slave_state_table(TABLE *table);
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index cfa7c0f344f..8384297624c 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -37,6 +37,8 @@ static int count_relay_log_space(Relay_log_info* rli);
domain).
*/
rpl_slave_state rpl_global_gtid_slave_state;
+/* Object used for MASTER_GTID_WAIT(). */
+gtid_waiting rpl_global_gtid_waiting;
// Defined in slave.cc
@@ -1312,9 +1314,9 @@ rpl_load_gtid_slave_state(THD *thd)
uint32 i;
DBUG_ENTER("rpl_load_gtid_slave_state");
- rpl_global_gtid_slave_state.lock();
+ mysql_mutex_lock(&rpl_global_gtid_slave_state.LOCK_slave_state);
bool loaded= rpl_global_gtid_slave_state.loaded;
- rpl_global_gtid_slave_state.unlock();
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
if (loaded)
DBUG_RETURN(0);
@@ -1414,10 +1416,10 @@ rpl_load_gtid_slave_state(THD *thd)
}
}
- rpl_global_gtid_slave_state.lock();
+ mysql_mutex_lock(&rpl_global_gtid_slave_state.LOCK_slave_state);
if (rpl_global_gtid_slave_state.loaded)
{
- rpl_global_gtid_slave_state.unlock();
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
goto end;
}
@@ -1429,7 +1431,7 @@ rpl_load_gtid_slave_state(THD *thd)
tmp_entry.sub_id,
tmp_entry.gtid.seq_no)))
{
- rpl_global_gtid_slave_state.unlock();
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
my_error(ER_OUT_OF_RESOURCES, MYF(0));
goto end;
}
@@ -1442,14 +1444,14 @@ rpl_load_gtid_slave_state(THD *thd)
mysql_bin_log.bump_seq_no_counter_if_needed(entry->gtid.domain_id,
entry->gtid.seq_no))
{
- rpl_global_gtid_slave_state.unlock();
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
my_error(ER_OUT_OF_RESOURCES, MYF(0));
goto end;
}
}
rpl_global_gtid_slave_state.loaded= true;
- rpl_global_gtid_slave_state.unlock();
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
err= 0; /* Clear HA_ERR_END_OF_FILE */
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index ff2ffd0b366..ef18c7e7f9f 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -702,6 +702,7 @@ int init_relay_log_info(Relay_log_info* rli, const char* info_fname);
extern struct rpl_slave_state rpl_global_gtid_slave_state;
+extern gtid_waiting rpl_global_gtid_waiting;
int rpl_load_gtid_slave_state(THD *thd);
int event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev);
diff --git a/sql/share/errmsg-utf8.txt b/sql/share/errmsg-utf8.txt
index b74ffecf82e..db61983ef29 100644
--- a/sql/share/errmsg-utf8.txt
+++ b/sql/share/errmsg-utf8.txt
@@ -6547,7 +6547,7 @@ ER_UNTIL_REQUIRES_USING_GTID
ER_GTID_STRICT_OUT_OF_ORDER
eng "An attempt was made to binlog GTID %u-%u-%llu which would create an out-of-order sequence number with existing GTID %u-%u-%llu, and gtid strict mode is enabled."
ER_GTID_START_FROM_BINLOG_HOLE
- eng "The binlog on the master is missing the GTID %u-%u-%llu requested by the slave (even though both a prior and a subsequent sequence number does exist), and GTID strict mode is enabled"
+ eng "The binlog on the master is missing the GTID %u-%u-%llu requested by the slave (even though a subsequent sequence number does exist), and GTID strict mode is enabled"
ER_SLAVE_UNEXPECTED_MASTER_SWITCH
eng "Unexpected GTID received from master after reconnect. This normally indicates that the master server was replaced without restarting the slave threads. %s"
ER_INSIDE_TRANSACTION_PREVENTS_SWITCH_GTID_DOMAIN_ID_SEQ_NO
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index 8abdd53469f..b5201371e15 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -1266,6 +1266,7 @@ void THD::init(void)
set_status_var_init();
bzero((char *) &org_status_var, sizeof(org_status_var));
start_bytes_received= 0;
+ last_commit_gtid.seq_no= 0;
if (variables.sql_log_bin)
variables.option_bits|= OPTION_BIN_LOG;
diff --git a/sql/sql_class.h b/sql/sql_class.h
index a3fc3a7866f..c1cfd8b4d5b 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -44,6 +44,7 @@
#include "thr_lock.h" /* thr_lock_type, THR_LOCK_DATA,
THR_LOCK_INFO */
#include "my_apc.h"
+#include "rpl_gtid.h"
class Reprepare_observer;
class Relay_log_info;
@@ -3405,6 +3406,12 @@ private:
*/
LEX_STRING invoker_user;
LEX_STRING invoker_host;
+
+ /* Protect against add/delete of temporary tables in parallel replication */
+ void rgi_lock_temporary_tables();
+ void rgi_unlock_temporary_tables();
+ bool rgi_have_temporary_tables();
+public:
/*
Flag, mutex and condition for a thread to wait for a signal from another
thread.
@@ -3415,12 +3422,12 @@ private:
bool wakeup_ready;
mysql_mutex_t LOCK_wakeup_ready;
mysql_cond_t COND_wakeup_ready;
+ /*
+ The GTID assigned to the last commit. If no GTID was assigned to any commit
+ so far, this is indicated by last_commit_gtid.seq_no == 0.
+ */
+ rpl_gtid last_commit_gtid;
- /* Protect against add/delete of temporary tables in parallel replication */
- void rgi_lock_temporary_tables();
- void rgi_unlock_temporary_tables();
- bool rgi_have_temporary_tables();
-public:
inline void lock_temporary_tables()
{
if (rgi_slave)
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 363b53e05a6..7c4e5b1b383 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -3965,6 +3965,20 @@ rpl_deinit_gtid_slave_state()
}
+/* Initialise the global MASTER_GTID_WAIT() bookkeeping object. */
+void rpl_init_gtid_waiting()
+{
+  rpl_global_gtid_waiting.init();
+}
+
+
+/* Release the global MASTER_GTID_WAIT() bookkeeping object. */
+void rpl_deinit_gtid_waiting()
+{
+  rpl_global_gtid_waiting.destroy();
+}
+
+
/*
Format the current GTID state as a string, for returning the value of
@@global.gtid_slave_pos.
diff --git a/sql/sql_repl.h b/sql/sql_repl.h
index da55e3e863f..defb1b23f5b 100644
--- a/sql/sql_repl.h
+++ b/sql/sql_repl.h
@@ -70,6 +70,8 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, ushort flags);
extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state;
void rpl_init_gtid_slave_state();
void rpl_deinit_gtid_slave_state();
+void rpl_init_gtid_waiting();
+void rpl_deinit_gtid_waiting();
int gtid_state_from_binlog_pos(const char *name, uint32 pos, String *out_str);
int rpl_append_gtid_state(String *dest, bool use_binlog);
int rpl_load_gtid_state(slave_connection_state *state, bool use_binlog);
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
index 08b4953b2e4..cd24ad38eb2 100644
--- a/sql/sys_vars.cc
+++ b/sql/sys_vars.cc
@@ -1538,6 +1538,33 @@ static Sys_var_gtid_binlog_state Sys_gtid_binlog_state(
GLOBAL_VAR(opt_gtid_binlog_state_dummy), NO_CMD_LINE);
+static Sys_var_last_gtid Sys_last_gtid(
+ "last_gtid", "The GTID of the last commit (if binlogging was enabled), "
+ "or the empty string if none.",
+ READ_ONLY sys_var::ONLY_SESSION, NO_CMD_LINE);
+
+
+/*
+  Produce the value of @@session.last_gtid: the textual form of
+  thd->last_commit_gtid, or the empty string while seq_no is still 0.
+
+  The result is copied with thd->strmake(). On formatting or allocation
+  failure ER_OUT_OF_RESOURCES is raised and NULL is returned.
+*/
+uchar *
+Sys_var_last_gtid::session_value_ptr(THD *thd, LEX_STRING *base)
+{
+  /* Room for domain_id '-' server_id '-' seq_no and the terminating NUL. */
+  char buf[10+1+10+1+20+1];
+  String str(buf, sizeof(buf), system_charset_info);
+  char *copy= NULL;
+  bool first= true;
+  bool failed= false;
+
+  str.length(0);
+
+  /* Only format when a GTID was actually assigned to some commit. */
+  if (thd->last_commit_gtid.seq_no > 0)
+    failed= rpl_slave_state_tostring_helper(&str, &thd->last_commit_gtid,
+                                            &first);
+  if (!failed)
+    copy= thd->strmake(str.ptr(), str.length());
+
+  if (failed || !copy)
+  {
+    my_error(ER_OUT_OF_RESOURCES, MYF(0));
+    return NULL;
+  }
+
+  return (uchar *)copy;
+}
+
+
static bool
check_slave_parallel_threads(sys_var *self, THD *thd, set_var *var)
{
diff --git a/sql/sys_vars.h b/sql/sys_vars.h
index 3cc4da32811..6a84fc5fbc2 100644
--- a/sql/sys_vars.h
+++ b/sql/sys_vars.h
@@ -2211,3 +2211,53 @@ public:
}
uchar *global_value_ptr(THD *thd, LEX_STRING *base);
};
+
+
+/**
+  System variable class for @@session.last_gtid.
+
+  Only session_value_ptr() carries real logic. Every other hook trips
+  DBUG_ASSERT(false) and then returns a fixed value.
+*/
+class Sys_var_last_gtid: public sys_var
+{
+public:
+  Sys_var_last_gtid(const char *name_arg,
+                    const char *comment, int flag_args, CMD_LINE getopt)
+    : sys_var(&all_sys_vars, name_arg, comment, flag_args, 0, getopt.id,
+              getopt.arg_type, SHOW_CHAR, 0, NULL, VARIABLE_NOT_IN_BINLOG,
+              NULL, NULL, NULL)
+  {
+    option.var_type= GET_STR;
+  }
+
+  /* Value access: only the session value is implemented. */
+  uchar *session_value_ptr(THD *thd, LEX_STRING *base);
+  uchar *global_value_ptr(THD *thd, LEX_STRING *base)
+  {
+    DBUG_ASSERT(false);
+    return NULL;
+  }
+
+  /* Update path: asserting stubs that report failure. */
+  bool do_check(THD *thd, set_var *var)
+  {
+    DBUG_ASSERT(false);
+    return true;
+  }
+  bool session_update(THD *thd, set_var *var)
+  {
+    DBUG_ASSERT(false);
+    return true;
+  }
+  bool global_update(THD *thd, set_var *var)
+  {
+    DBUG_ASSERT(false);
+    return true;
+  }
+  bool check_update_type(Item_result type)
+  {
+    DBUG_ASSERT(false);
+    return false;
+  }
+
+  /* Defaults: asserting stubs with nothing to save. */
+  void session_save_default(THD *thd, set_var *var)
+  {
+    DBUG_ASSERT(false);
+  }
+  void global_save_default(THD *thd, set_var *var)
+  {
+    DBUG_ASSERT(false);
+  }
+};