summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorunknown <knielsen@knielsen-hq.org>2014-02-07 19:15:28 +0100
committerunknown <knielsen@knielsen-hq.org>2014-02-07 19:15:28 +0100
commit4e6606acad4ad0ea75dec114ad316e0325efaf02 (patch)
treeabb53cd21ab57261299ec9c96fef4089f7025d1d
parentce02738d7f2f2688eeec7004dd6a30293d36044f (diff)
downloadmariadb-git-4e6606acad4ad0ea75dec114ad316e0325efaf02.tar.gz
MDEV-4984: Implement MASTER_GTID_WAIT() and @@LAST_GTID.
MASTER_GTID_WAIT() is similar to MASTER_POS_WAIT(), but works with a GTID position rather than an old-style filename/offset. @@LAST_GTID gives the GTID assigned to the last transaction written into the binlog. Together, the two can be used by applications to obtain the GTID of an update on the master, and then do a MASTER_GTID_WAIT() for that position on any read slave where it is important to get results that are caught up with the master at least to the point of the update. The implementation of MASTER_GTID_WAIT() is implemented in a way that tries to minimise the performance impact on the SQL threads, even in the presense of many waiters on single GTID positions (as from @@LAST_GTID).
-rw-r--r--client/mysql.cc1
-rw-r--r--include/queues.h1
-rw-r--r--mysql-test/include/sync_with_master_gtid.inc47
-rw-r--r--mysql-test/suite/perfschema/r/all_instances.result1
-rw-r--r--mysql-test/suite/perfschema/r/dml_setup_instruments.result2
-rw-r--r--mysql-test/suite/rpl/r/rpl_gtid_basic.result110
-rw-r--r--mysql-test/suite/rpl/t/rpl_gtid_basic.test164
-rw-r--r--mysql-test/suite/sys_vars/r/last_gtid_basic.result9
-rw-r--r--mysql-test/suite/sys_vars/t/last_gtid_basic.test9
-rw-r--r--sql/item_create.cc55
-rw-r--r--sql/item_func.cc28
-rw-r--r--sql/item_func.h16
-rw-r--r--sql/log.cc1
-rw-r--r--sql/mysqld.cc8
-rw-r--r--sql/mysqld.h2
-rw-r--r--sql/rpl_gtid.cc480
-rw-r--r--sql/rpl_gtid.h59
-rw-r--r--sql/rpl_rli.cc16
-rw-r--r--sql/rpl_rli.h1
-rw-r--r--sql/share/errmsg-utf8.txt2
-rw-r--r--sql/sql_class.cc1
-rw-r--r--sql/sql_class.h17
-rw-r--r--sql/sql_repl.cc14
-rw-r--r--sql/sql_repl.h2
-rw-r--r--sql/sys_vars.cc27
-rw-r--r--sql/sys_vars.h50
26 files changed, 1084 insertions, 39 deletions
diff --git a/client/mysql.cc b/client/mysql.cc
index 4f9b4c3bc92..fa0dd5faa1c 100644
--- a/client/mysql.cc
+++ b/client/mysql.cc
@@ -915,6 +915,7 @@ static COMMANDS commands[] = {
{ "MAKE_SET", 0, 0, 0, ""},
{ "MAKEDATE", 0, 0, 0, ""},
{ "MAKETIME", 0, 0, 0, ""},
+ { "MASTER_GTID_WAIT", 0, 0, 0, ""},
{ "MASTER_POS_WAIT", 0, 0, 0, ""},
{ "MAX", 0, 0, 0, ""},
{ "MBRCONTAINS", 0, 0, 0, ""},
diff --git a/include/queues.h b/include/queues.h
index 4fef72b149c..f341bbb8148 100644
--- a/include/queues.h
+++ b/include/queues.h
@@ -51,6 +51,7 @@ typedef struct st_queue {
#define queue_first_element(queue) 1
#define queue_last_element(queue) (queue)->elements
+#define queue_empty(queue) ((queue)->elements == 0)
#define queue_top(queue) ((queue)->root[1])
#define queue_element(queue,index) ((queue)->root[index])
#define queue_end(queue) ((queue)->root[(queue)->elements])
diff --git a/mysql-test/include/sync_with_master_gtid.inc b/mysql-test/include/sync_with_master_gtid.inc
new file mode 100644
index 00000000000..7512c045c6b
--- /dev/null
+++ b/mysql-test/include/sync_with_master_gtid.inc
@@ -0,0 +1,47 @@
+# ==== Purpose ====
+#
+# Wait until the slave has reached a certain GTID position.
+# Similar to --sync_with_master, but using GTID instead of old-style
+# binlog file/offset coordinates.
+#
+#
+# ==== Usage ====
+#
+# --let $master_pos= `SELECT @@GLOBAL.gtid_binlog_pos`
+# [--let $slave_timeout= NUMBER]
+# [--let $rpl_debug= 1]
+# --source include/sync_with_master_gtid.inc
+#
+# Syncs slave to the specified GTID position.
+#
+# Must be called on the slave.
+#
+# Parameters:
+# $master_pos
+# The GTID position to sync to. Typically obtained from
+# @@GLOBAL.gtid_binlog_pos on the master.
+#
+# $slave_timeout
+# Timeout in seconds. The default is 2 minutes.
+#
+# $rpl_debug
+# See include/rpl_init.inc
+
+--let $include_filename= sync_with_master_gtid.inc
+--source include/begin_include_file.inc
+
+let $_slave_timeout= $slave_timeout;
+if (!$_slave_timeout)
+{
+ let $_slave_timeout= 120;
+}
+
+--let $_result= `SELECT master_gtid_wait('$master_pos', $_slave_timeout)`
+if ($_result == -1)
+{
+ --let $_current_gtid_pos= `SELECT @@GLOBAL.gtid_slave_pos`
+ --die Timeout in master_gtid_wait('$master_pos', $_slave_timeout), current slave GTID position is: $_current_gtid_pos.
+}
+
+--let $include_filename= sync_with_master_gtid.inc
+--source include/end_include_file.inc
diff --git a/mysql-test/suite/perfschema/r/all_instances.result b/mysql-test/suite/perfschema/r/all_instances.result
index 266c3eb5a9c..fd20f27c929 100644
--- a/mysql-test/suite/perfschema/r/all_instances.result
+++ b/mysql-test/suite/perfschema/r/all_instances.result
@@ -37,6 +37,7 @@ wait/synch/mutex/mysys/THR_LOCK_threads
wait/synch/mutex/mysys/TMPDIR_mutex
wait/synch/mutex/sql/Cversion_lock
wait/synch/mutex/sql/Event_scheduler::LOCK_scheduler_state
+wait/synch/mutex/sql/gtid_waiting::LOCK_gtid_waiting
wait/synch/mutex/sql/hash_filo::lock
wait/synch/mutex/sql/LOCK_active_mi
wait/synch/mutex/sql/LOCK_audit_mask
diff --git a/mysql-test/suite/perfschema/r/dml_setup_instruments.result b/mysql-test/suite/perfschema/r/dml_setup_instruments.result
index 38c02cc2bf4..17087264e7c 100644
--- a/mysql-test/suite/perfschema/r/dml_setup_instruments.result
+++ b/mysql-test/suite/perfschema/r/dml_setup_instruments.result
@@ -7,13 +7,13 @@ NAME ENABLED TIMED
wait/synch/mutex/sql/Cversion_lock YES YES
wait/synch/mutex/sql/Delayed_insert::mutex YES YES
wait/synch/mutex/sql/Event_scheduler::LOCK_scheduler_state YES YES
+wait/synch/mutex/sql/gtid_waiting::LOCK_gtid_waiting YES YES
wait/synch/mutex/sql/hash_filo::lock YES YES
wait/synch/mutex/sql/HA_DATA_PARTITION::LOCK_auto_inc YES YES
wait/synch/mutex/sql/LOCK_active_mi YES YES
wait/synch/mutex/sql/LOCK_audit_mask YES YES
wait/synch/mutex/sql/LOCK_binlog_state YES YES
wait/synch/mutex/sql/LOCK_commit_ordered YES YES
-wait/synch/mutex/sql/LOCK_connection_count YES YES
select * from performance_schema.setup_instruments
where name like 'Wait/Synch/Rwlock/sql/%'
and name not in ('wait/synch/rwlock/sql/CRYPTO_dynlock_value::lock')
diff --git a/mysql-test/suite/rpl/r/rpl_gtid_basic.result b/mysql-test/suite/rpl/r/rpl_gtid_basic.result
index ecad4a6acf1..d15de315914 100644
--- a/mysql-test/suite/rpl/r/rpl_gtid_basic.result
+++ b/mysql-test/suite/rpl/r/rpl_gtid_basic.result
@@ -197,9 +197,119 @@ CREATE TABLE t1 (a INT PRIMARY KEY);
SET gtid_seq_no=100;
INSERT INTO t1 VALUES (1);
include/start_slave.inc
+include/sync_with_master_gtid.inc
SELECT * FROM t1;
a
1
Gtid_IO_Pos = '0-1-100'
+*** Test @@LAST_GTID and MASTER_GTID_WAIT() ***
+DROP TABLE t1;
+CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB;
+include/stop_slave.inc
+SELECT @@last_gtid;
+@@last_gtid
+
+SET gtid_seq_no=110;
+SELECT @@last_gtid;
+@@last_gtid
+
+BEGIN;
+SELECT @@last_gtid;
+@@last_gtid
+
+INSERT INTO t1 VALUES (2);
+SELECT @@last_gtid;
+@@last_gtid
+
+COMMIT;
+SELECT @@last_gtid;
+@@last_gtid
+0-1-110
+SET @pos= '0-1-110';
+SELECT master_gtid_wait(NULL);
+master_gtid_wait(NULL)
+NULL
+SELECT master_gtid_wait('', NULL);
+master_gtid_wait('', NULL)
+0
+SELECT master_gtid_wait(@pos, 0.5);
+master_gtid_wait(@pos, 0.5)
+-1
+SELECT * FROM t1 ORDER BY a;
+a
+SELECT master_gtid_wait(@pos);
+include/start_slave.inc
+master_gtid_wait(@pos)
+0
+SELECT * FROM t1 ORDER BY a;
+a
+2
+include/stop_slave.inc
+SET gtid_domain_id= 1;
+INSERT INTO t1 VALUES (3);
+SET @pos= '1-1-1,0-1-110';
+SELECT master_gtid_wait(@pos, 0);
+master_gtid_wait(@pos, 0)
+-1
+SELECT * FROM t1 WHERE a >= 3;
+a
+SELECT master_gtid_wait(@pos, -1);
+include/start_slave.inc
+master_gtid_wait(@pos, -1)
+0
+SELECT * FROM t1 WHERE a >= 3;
+a
+3
+SELECT master_gtid_wait('1-1-1', 0);
+master_gtid_wait('1-1-1', 0)
+0
+SELECT master_gtid_wait('2-1-1,1-1-4,0-1-110');
+SELECT master_gtid_wait('0-1-1000', 0.5);
+SELECT master_gtid_wait('0-1-2000');
+SELECT master_gtid_wait('2-1-10');
+SELECT master_gtid_wait('2-1-5', 1);
+SELECT master_gtid_wait('2-1-5');
+SELECT master_gtid_wait('2-1-10');
+SELECT master_gtid_wait('2-1-5,1-1-4,0-1-110');
+SELECT master_gtid_wait('2-1-2');
+SELECT master_gtid_wait('1-1-1');
+master_gtid_wait('1-1-1')
+0
+SELECT master_gtid_wait('0-1-109');
+SELECT master_gtid_wait('2-1-2', 0.5);
+master_gtid_wait('2-1-2', 0.5)
+-1
+KILL QUERY 22;
+ERROR 70100: Query execution was interrupted
+SET gtid_domain_id=2;
+SET gtid_seq_no=2;
+INSERT INTO t1 VALUES (4);
+master_gtid_wait('2-1-2')
+0
+KILL CONNECTION 25;
+ERROR HY000: Lost connection to MySQL server during query
+SET gtid_domain_id=1;
+SET gtid_seq_no=4;
+INSERT INTO t1 VALUES (5);
+SET gtid_domain_id=2;
+SET gtid_seq_no=5;
+INSERT INTO t1 VALUES (6);
+master_gtid_wait('2-1-5,1-1-4,0-1-110')
+0
+master_gtid_wait('2-1-1,1-1-4,0-1-110')
+0
+master_gtid_wait('0-1-1000', 0.5)
+-1
+master_gtid_wait('2-1-5', 1)
+0
+master_gtid_wait('0-1-109')
+0
+SET gtid_domain_id=2;
+SET gtid_seq_no=10;
+INSERT INTO t1 VALUES (7);
+master_gtid_wait('2-1-10')
+0
+master_gtid_wait('2-1-10')
+0
DROP TABLE t1;
include/rpl_end.inc
diff --git a/mysql-test/suite/rpl/t/rpl_gtid_basic.test b/mysql-test/suite/rpl/t/rpl_gtid_basic.test
index 687c0d62cb1..dff7609cb99 100644
--- a/mysql-test/suite/rpl/t/rpl_gtid_basic.test
+++ b/mysql-test/suite/rpl/t/rpl_gtid_basic.test
@@ -199,14 +199,174 @@ INSERT INTO t1 VALUES (1);
# We cannot just use sync_with_master as we've done RESET MASTER, so
# slave old-style position is wrong.
# So sync on gtid position instead.
---let $wait_condition= SELECT @@GLOBAL.gtid_binlog_pos = '$master_pos'
---source include/wait_condition.inc
+--source include/sync_with_master_gtid.inc
SELECT * FROM t1;
# Check that the IO gtid position in SHOW SLAVE STATUS is also correct.
--let $status_items= Gtid_IO_Pos
--source include/show_slave_status.inc
+--echo *** Test @@LAST_GTID and MASTER_GTID_WAIT() ***
+
+--connection server_1
+DROP TABLE t1;
+CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB;
+--save_master_pos
+
+--connection server_2
+--sync_with_master
+--source include/stop_slave.inc
+
+--connect (m1,127.0.0.1,root,,test,$SERVER_MYPORT_1,)
+SELECT @@last_gtid;
+SET gtid_seq_no=110;
+SELECT @@last_gtid;
+BEGIN;
+SELECT @@last_gtid;
+INSERT INTO t1 VALUES (2);
+SELECT @@last_gtid;
+COMMIT;
+SELECT @@last_gtid;
+--let $pos= `SELECT @@gtid_binlog_pos`
+
+--connect (s1,127.0.0.1,root,,test,$SERVER_MYPORT_2,)
+eval SET @pos= '$pos';
+# Check NULL argument.
+SELECT master_gtid_wait(NULL);
+# Check empty argument returns immediately.
+SELECT master_gtid_wait('', NULL);
+# Let's check that we get a timeout
+SELECT master_gtid_wait(@pos, 0.5);
+SELECT * FROM t1 ORDER BY a;
+# Now actually wait until the slave reaches the position
+send SELECT master_gtid_wait(@pos);
+
+--connection server_2
+--source include/start_slave.inc
+
+--connection s1
+reap;
+SELECT * FROM t1 ORDER BY a;
+
+# Test waiting on a domain that does not exist yet.
+--source include/stop_slave.inc
+
+--connection server_1
+SET gtid_domain_id= 1;
+INSERT INTO t1 VALUES (3);
+--let $pos= `SELECT @@gtid_binlog_pos`
+
+--connection s1
+eval SET @pos= '$pos';
+SELECT master_gtid_wait(@pos, 0);
+SELECT * FROM t1 WHERE a >= 3;
+send SELECT master_gtid_wait(@pos, -1);
+
+--connection server_2
+--source include/start_slave.inc
+
+--connection s1
+reap;
+SELECT * FROM t1 WHERE a >= 3;
+# Waiting for only part of the position.
+SELECT master_gtid_wait('1-1-1', 0);
+
+# Now test a lot of parallel master_gtid_wait() calls, completing in different
+# order, and some of which time out or get killed on the way.
+
+--connection s1
+send SELECT master_gtid_wait('2-1-1,1-1-4,0-1-110');
+
+--connect (s2,127.0.0.1,root,,test,$SERVER_MYPORT_2,)
+# This will time out.
+send SELECT master_gtid_wait('0-1-1000', 0.5);
+
+--connect (s3,127.0.0.1,root,,test,$SERVER_MYPORT_2,)
+# This one we will kill
+--let $kill1_id= `SELECT connection_id()`
+send SELECT master_gtid_wait('0-1-2000');
+
+--connect (s4,127.0.0.1,root,,test,$SERVER_MYPORT_2,)
+send SELECT master_gtid_wait('2-1-10');
+
+--connect (s5,127.0.0.1,root,,test,$SERVER_MYPORT_2,)
+send SELECT master_gtid_wait('2-1-5', 1);
+
+# This one we will kill also.
+--connect (s6,127.0.0.1,root,,test,$SERVER_MYPORT_2,)
+--let $kill2_id= `SELECT connection_id()`
+send SELECT master_gtid_wait('2-1-5');
+
+--connect (s7,127.0.0.1,root,,test,$SERVER_MYPORT_2,)
+send SELECT master_gtid_wait('2-1-10');
+
+--connect (s8,127.0.0.1,root,,test,$SERVER_MYPORT_2,)
+send SELECT master_gtid_wait('2-1-5,1-1-4,0-1-110');
+
+--connect (s9,127.0.0.1,root,,test,$SERVER_MYPORT_2,)
+send SELECT master_gtid_wait('2-1-2');
+
+--connection server_2
+# This one completes immediately.
+SELECT master_gtid_wait('1-1-1');
+
+--connect (s10,127.0.0.1,root,,test,$SERVER_MYPORT_2,)
+send SELECT master_gtid_wait('0-1-109');
+
+--connection server_2
+# This one should time out.
+SELECT master_gtid_wait('2-1-2', 0.5);
+
+eval KILL QUERY $kill1_id;
+--connection s3
+--error ER_QUERY_INTERRUPTED
+reap;
+
+--connection server_1
+SET gtid_domain_id=2;
+SET gtid_seq_no=2;
+INSERT INTO t1 VALUES (4);
+
+--connection s9
+reap;
+
+--connection server_2
+eval KILL CONNECTION $kill2_id;
+
+--connection s6
+--error 2013
+reap;
+
+--connection server_1
+SET gtid_domain_id=1;
+SET gtid_seq_no=4;
+INSERT INTO t1 VALUES (5);
+SET gtid_domain_id=2;
+SET gtid_seq_no=5;
+INSERT INTO t1 VALUES (6);
+
+--connection s8
+reap;
+--connection s1
+reap;
+--connection s2
+reap;
+--connection s5
+reap;
+--connection s10
+reap;
+
+--connection server_1
+SET gtid_domain_id=2;
+SET gtid_seq_no=10;
+INSERT INTO t1 VALUES (7);
+
+--connection s4
+reap;
+--connection s7
+reap;
+
+
--connection server_1
DROP TABLE t1;
diff --git a/mysql-test/suite/sys_vars/r/last_gtid_basic.result b/mysql-test/suite/sys_vars/r/last_gtid_basic.result
new file mode 100644
index 00000000000..d39b6595f04
--- /dev/null
+++ b/mysql-test/suite/sys_vars/r/last_gtid_basic.result
@@ -0,0 +1,9 @@
+SELECT @@global.last_gtid;
+ERROR HY000: Variable 'last_gtid' is a SESSION variable
+SET GLOBAL last_gtid= 10;
+ERROR HY000: Variable 'last_gtid' is a read only variable
+SET SESSION last_gtid= 20;
+ERROR HY000: Variable 'last_gtid' is a read only variable
+SELECT @@session.last_gtid;
+@@session.last_gtid
+
diff --git a/mysql-test/suite/sys_vars/t/last_gtid_basic.test b/mysql-test/suite/sys_vars/t/last_gtid_basic.test
new file mode 100644
index 00000000000..d1cd05f2c30
--- /dev/null
+++ b/mysql-test/suite/sys_vars/t/last_gtid_basic.test
@@ -0,0 +1,9 @@
+--error ER_INCORRECT_GLOBAL_LOCAL_VAR
+SELECT @@global.last_gtid;
+
+--error ER_INCORRECT_GLOBAL_LOCAL_VAR
+SET GLOBAL last_gtid= 10;
+--error ER_INCORRECT_GLOBAL_LOCAL_VAR
+SET SESSION last_gtid= 20;
+
+SELECT @@session.last_gtid;
diff --git a/sql/item_create.cc b/sql/item_create.cc
index 60eabe67c83..c158816bf32 100644
--- a/sql/item_create.cc
+++ b/sql/item_create.cc
@@ -1783,6 +1783,19 @@ protected:
};
+class Create_func_master_gtid_wait : public Create_native_func
+{
+public:
+ virtual Item *create_native(THD *thd, LEX_STRING name, List<Item> *item_list);
+
+ static Create_func_master_gtid_wait s_singleton;
+
+protected:
+ Create_func_master_gtid_wait() {}
+ virtual ~Create_func_master_gtid_wait() {}
+};
+
+
class Create_func_md5 : public Create_func_arg1
{
public:
@@ -4590,6 +4603,47 @@ Create_func_master_pos_wait::create_native(THD *thd, LEX_STRING name,
}
+Create_func_master_gtid_wait Create_func_master_gtid_wait::s_singleton;
+
+Item*
+Create_func_master_gtid_wait::create_native(THD *thd, LEX_STRING name,
+ List<Item> *item_list)
+{
+ Item *func= NULL;
+ int arg_count= 0;
+
+ thd->lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_FUNCTION);
+
+ if (item_list != NULL)
+ arg_count= item_list->elements;
+
+ if (arg_count < 1 || arg_count > 2)
+ {
+ my_error(ER_WRONG_PARAMCOUNT_TO_NATIVE_FCT, MYF(0), name.str);
+ return func;
+ }
+
+ thd->lex->safe_to_cache_query= 0;
+
+ Item *param_1= item_list->pop();
+ switch (arg_count) {
+ case 1:
+ {
+ func= new (thd->mem_root) Item_master_gtid_wait(param_1);
+ break;
+ }
+ case 2:
+ {
+ Item *param_2= item_list->pop();
+ func= new (thd->mem_root) Item_master_gtid_wait(param_1, param_2);
+ break;
+ }
+ }
+
+ return func;
+}
+
+
Create_func_md5 Create_func_md5::s_singleton;
Item*
@@ -5536,6 +5590,7 @@ static Native_func_registry func_array[] =
{ { C_STRING_WITH_LEN("MAKEDATE") }, BUILDER(Create_func_makedate)},
{ { C_STRING_WITH_LEN("MAKETIME") }, BUILDER(Create_func_maketime)},
{ { C_STRING_WITH_LEN("MAKE_SET") }, BUILDER(Create_func_make_set)},
+ { { C_STRING_WITH_LEN("MASTER_GTID_WAIT") }, BUILDER(Create_func_master_gtid_wait)},
{ { C_STRING_WITH_LEN("MASTER_POS_WAIT") }, BUILDER(Create_func_master_pos_wait)},
{ { C_STRING_WITH_LEN("MBRCONTAINS") }, GEOM_BUILDER(Create_func_mbr_contains)},
{ { C_STRING_WITH_LEN("MBRDISJOINT") }, GEOM_BUILDER(Create_func_mbr_disjoint)},
diff --git a/sql/item_func.cc b/sql/item_func.cc
index 5d9abbb0d8c..b2af80e6d96 100644
--- a/sql/item_func.cc
+++ b/sql/item_func.cc
@@ -3989,6 +3989,34 @@ err:
}
+longlong Item_master_gtid_wait::val_int()
+{
+ DBUG_ASSERT(fixed == 1);
+ longlong result= 0;
+
+ if (args[0]->null_value)
+ {
+ null_value= 1;
+ return 0;
+ }
+
+ null_value=0;
+#ifdef HAVE_REPLICATION
+ THD* thd= current_thd;
+ longlong timeout_us;
+ String *gtid_pos = args[0]->val_str(&value);
+
+ if (arg_count==2 && !args[1]->null_value)
+ timeout_us= (longlong)(1e6*args[1]->val_real());
+ else
+ timeout_us= (longlong)-1;
+
+ result= rpl_global_gtid_waiting.wait_for_pos(thd, gtid_pos, timeout_us);
+#endif
+ return result;
+}
+
+
/**
Enables a session to wait on a condition until a timeout or a network
disconnect occurs.
diff --git a/sql/item_func.h b/sql/item_func.h
index 384a6b535df..2e3f352e377 100644
--- a/sql/item_func.h
+++ b/sql/item_func.h
@@ -1642,6 +1642,22 @@ public:
};
+class Item_master_gtid_wait :public Item_int_func
+{
+ String value;
+public:
+ Item_master_gtid_wait(Item *a) :Item_int_func(a) {}
+ Item_master_gtid_wait(Item *a,Item *b) :Item_int_func(a,b) {}
+ longlong val_int();
+ const char *func_name() const { return "master_gtid_wait"; }
+ void fix_length_and_dec() { max_length=10+1+10+1+20+1; maybe_null=0;}
+ bool check_vcol_func_processor(uchar *int_arg)
+ {
+ return trace_unsupported_by_check_vcol_func_processor(func_name());
+ }
+};
+
+
/* Handling of user definable variables */
class user_var_entry;
diff --git a/sql/log.cc b/sql/log.cc
index fbb73acf5d1..c5cdc2cccc5 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -5446,6 +5446,7 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone,
}
if (err)
return true;
+ thd->last_commit_gtid= gtid;
Gtid_log_event gtid_event(thd, seq_no, domain_id, standalone,
LOG_EVENT_SUPPRESS_USE_F, is_transactional,
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index 4e6646feead..ebfa238df3f 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -780,6 +780,7 @@ PSI_mutex_key key_LOCK_stats,
key_LOCK_global_user_client_stats, key_LOCK_global_table_stats,
key_LOCK_global_index_stats,
key_LOCK_wakeup_ready, key_LOCK_wait_commit;
+PSI_mutex_key key_LOCK_gtid_waiting;
PSI_mutex_key key_LOCK_prepare_ordered, key_LOCK_commit_ordered;
@@ -825,6 +826,7 @@ static PSI_mutex_info all_server_mutexes[]=
{ &key_LOCK_global_index_stats, "LOCK_global_index_stats", PSI_FLAG_GLOBAL},
{ &key_LOCK_wakeup_ready, "THD::LOCK_wakeup_ready", 0},
{ &key_LOCK_wait_commit, "wait_for_commit::LOCK_wait_commit", 0},
+ { &key_LOCK_gtid_waiting, "gtid_waiting::LOCK_gtid_waiting", 0},
{ &key_LOCK_thd_data, "THD::LOCK_thd_data", 0},
{ &key_LOCK_user_conn, "LOCK_user_conn", PSI_FLAG_GLOBAL},
{ &key_LOCK_uuid_short_generator, "LOCK_uuid_short_generator", PSI_FLAG_GLOBAL},
@@ -895,6 +897,7 @@ PSI_cond_key key_RELAYLOG_COND_queue_busy;
PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_pool,
key_COND_parallel_entry, key_COND_prepare_ordered;
+PSI_cond_key key_COND_wait_gtid;
static PSI_cond_info all_server_conds[]=
{
@@ -940,7 +943,8 @@ static PSI_cond_info all_server_conds[]=
{ &key_COND_rpl_thread, "COND_rpl_thread", 0},
{ &key_COND_rpl_thread_pool, "COND_rpl_thread_pool", 0},
{ &key_COND_parallel_entry, "COND_parallel_entry", 0},
- { &key_COND_prepare_ordered, "COND_prepare_ordered", 0}
+ { &key_COND_prepare_ordered, "COND_prepare_ordered", 0},
+ { &key_COND_wait_gtid, "COND_wait_gtid", 0}
};
PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert,
@@ -1821,6 +1825,7 @@ static void mysqld_exit(int exit_code)
but if a kill -15 signal was sent, the signal thread did
spawn the kill_server_thread thread, which is running concurrently.
*/
+ rpl_deinit_gtid_waiting();
rpl_deinit_gtid_slave_state();
wait_for_signal_thread_to_end();
mysql_audit_finalize();
@@ -4201,6 +4206,7 @@ static int init_thread_environment()
#ifdef HAVE_REPLICATION
rpl_init_gtid_slave_state();
+ rpl_init_gtid_waiting();
#endif
DBUG_RETURN(0);
diff --git a/sql/mysqld.h b/sql/mysqld.h
index 2e10b0caeb5..4fdd34fd8be 100644
--- a/sql/mysqld.h
+++ b/sql/mysqld.h
@@ -256,6 +256,7 @@ extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state,
extern PSI_mutex_key key_LOCK_stats,
key_LOCK_global_user_client_stats, key_LOCK_global_table_stats,
key_LOCK_global_index_stats, key_LOCK_wakeup_ready, key_LOCK_wait_commit;
+extern PSI_mutex_key key_LOCK_gtid_waiting;
extern PSI_rwlock_key key_rwlock_LOCK_grant, key_rwlock_LOCK_logger,
key_rwlock_LOCK_sys_init_connect, key_rwlock_LOCK_sys_init_slave,
@@ -285,6 +286,7 @@ extern PSI_cond_key key_RELAYLOG_COND_queue_busy;
extern PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
extern PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_pool,
key_COND_parallel_entry;
+extern PSI_cond_key key_COND_wait_gtid;
extern PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert,
key_thread_handle_manager, key_thread_kill_server, key_thread_main,
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc
index 3f79a0cb528..00140fd3475 100644
--- a/sql/rpl_gtid.cc
+++ b/sql/rpl_gtid.cc
@@ -43,9 +43,9 @@ rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid)
there will not be an attempt to delete the corresponding table row before
it is even committed.
*/
- lock();
+ mysql_mutex_lock(&LOCK_slave_state);
err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no);
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
if (err)
{
sql_print_warning("Slave: Out of memory during slave state maintenance. "
@@ -82,11 +82,20 @@ rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi)
}
+static void
+rpl_slave_state_free_element(void *arg)
+{
+ struct rpl_slave_state::element *elem= (struct rpl_slave_state::element *)arg;
+ mysql_cond_destroy(&elem->COND_wait_gtid);
+ my_free(elem);
+}
+
+
rpl_slave_state::rpl_slave_state()
: last_sub_id(0), inited(false), loaded(false)
{
my_hash_init(&hash, &my_charset_bin, 32, offsetof(element, domain_id),
- sizeof(uint32), NULL, my_free, HASH_UNIQUE);
+ sizeof(uint32), NULL, rpl_slave_state_free_element, HASH_UNIQUE);
}
@@ -146,6 +155,21 @@ rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id,
if (!(elem= get_element(domain_id)))
return 1;
+ if (seq_no > elem->highest_seq_no)
+ elem->highest_seq_no= seq_no;
+ if (elem->min_wait_seq_no != 0 && elem->min_wait_seq_no <= seq_no)
+ {
+ /*
+ Someone was waiting in MASTER_GTID_WAIT() for this GTID to appear.
+ Signal (and remove) them. The waiter will handle all the processing
+ of all pending MASTER_GTID_WAIT(), so we do not slow down the
+ replication SQL thread.
+ */
+ mysql_mutex_assert_owner(&LOCK_slave_state);
+ elem->min_wait_seq_no= 0;
+ mysql_cond_broadcast(&elem->COND_wait_gtid);
+ }
+
if (!(list_elem= (list_element *)my_malloc(sizeof(*list_elem), MYF(MY_WME))))
return 1;
list_elem->server_id= server_id;
@@ -173,6 +197,9 @@ rpl_slave_state::get_element(uint32 domain_id)
return NULL;
elem->list= NULL;
elem->domain_id= domain_id;
+ elem->highest_seq_no= 0;
+ elem->min_wait_seq_no= 0;
+ mysql_cond_init(key_COND_wait_gtid, &elem->COND_wait_gtid, 0);
if (my_hash_insert(&hash, (uchar *)elem))
{
my_free(elem);
@@ -378,10 +405,10 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
goto end;
}
- lock();
+ mysql_mutex_lock(&LOCK_slave_state);
if ((elem= get_element(gtid->domain_id)) == NULL)
{
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
my_error(ER_OUT_OF_RESOURCES, MYF(0));
err= 1;
goto end;
@@ -410,7 +437,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
cur->next= NULL;
elem->list= cur;
}
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
if (!elist)
goto end;
@@ -470,9 +497,9 @@ end:
*/
if (elist)
{
- lock();
+ mysql_mutex_lock(&LOCK_slave_state);
put_back_list(gtid->domain_id, elist);
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
}
ha_rollback_trans(thd, FALSE);
@@ -499,9 +526,9 @@ rpl_slave_state::next_sub_id(uint32 domain_id)
{
uint64 sub_id= 0;
- lock();
+ mysql_mutex_lock(&LOCK_slave_state);
sub_id= ++last_sub_id;
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
return sub_id;
}
@@ -541,7 +568,7 @@ rpl_slave_state::iterate(int (*cb)(rpl_gtid *, void *), void *data,
my_hash_insert(&gtid_hash, (uchar *)(&extra_gtids[i])))
goto err;
- lock();
+ mysql_mutex_lock(&LOCK_slave_state);
for (i= 0; i < hash.records; ++i)
{
@@ -576,19 +603,19 @@ rpl_slave_state::iterate(int (*cb)(rpl_gtid *, void *), void *data,
memcpy(&best_gtid, gtid, sizeof(best_gtid));
if (my_hash_delete(&gtid_hash, rec))
{
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
goto err;
}
}
if ((res= (*cb)(&best_gtid, data)))
{
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
goto err;
}
}
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
/* Also add any remaining extra domain_ids. */
for (i= 0; i < gtid_hash.records; ++i)
@@ -659,11 +686,11 @@ rpl_slave_state::domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid)
list_element *list;
uint64 best_sub_id;
- lock();
+ mysql_mutex_lock(&LOCK_slave_state);
elem= (element *)my_hash_search(&hash, (const uchar *)&domain_id, 0);
if (!elem || !(list= elem->list))
{
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
return false;
}
@@ -681,7 +708,7 @@ rpl_slave_state::domain_to_gtid(uint32 domain_id, rpl_gtid *out_gtid)
out_gtid->seq_no= list->seq_no;
}
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
return true;
}
@@ -811,7 +838,7 @@ rpl_slave_state::is_empty()
uint32 i;
bool result= true;
- lock();
+ mysql_mutex_lock(&LOCK_slave_state);
for (i= 0; i < hash.records; ++i)
{
element *e= (element *)my_hash_element(&hash, i);
@@ -821,7 +848,7 @@ rpl_slave_state::is_empty()
break;
}
}
- unlock();
+ mysql_mutex_unlock(&LOCK_slave_state);
return result;
}
@@ -1647,3 +1674,418 @@ slave_connection_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size)
return 0;
}
+
+
+/*
+ Execute a MASTER_GTID_WAIT().
+ The position to wait for is in gtid_str in string form.
+ The timeout in microseconds is in timeout_us, zero means no timeout.
+
+ Returns:
+ 1 for error.
+ 0 for wait completed.
+ -1 for wait timed out.
+*/
+int
+gtid_waiting::wait_for_pos(THD *thd, String *gtid_str, longlong timeout_us)
+{
+ int err;
+ rpl_gtid *wait_pos;
+ uint32 count, i;
+ struct timespec wait_until, *wait_until_ptr;
+
+ /* Wait for the empty position returns immediately. */
+ if (gtid_str->length() == 0)
+ return 0;
+
+ if (!(wait_pos= gtid_parse_string_to_list(gtid_str->ptr(), gtid_str->length(),
+ &count)))
+ {
+ my_error(ER_INCORRECT_GTID_STATE, MYF(0));
+ return 1;
+ }
+
+ if (timeout_us >= 0)
+ {
+ set_timespec_nsec(wait_until, (ulonglong)1000*timeout_us);
+ wait_until_ptr= &wait_until;
+ }
+ else
+ wait_until_ptr= NULL;
+ err= 0;
+ for (i= 0; i < count; ++i)
+ {
+ if ((err= wait_for_gtid(thd, &wait_pos[i], wait_until_ptr)))
+ break;
+ }
+ my_free(wait_pos);
+ return err;
+}
+
+
+void
+gtid_waiting::promote_new_waiter(gtid_waiting::hash_element *he)
+{
+ queue_element *qe;
+
+ mysql_mutex_assert_owner(&LOCK_gtid_waiting);
+ if (queue_empty(&he->queue))
+ return;
+ qe= (queue_element *)queue_top(&he->queue);
+ qe->thd->wakeup_ready= true;
+ qe->wakeup_reason= queue_element::TAKEOVER;
+ mysql_cond_signal(&qe->thd->COND_wakeup_ready);
+}
+
+void
+gtid_waiting::process_wait_hash(uint64 wakeup_seq_no,
+ gtid_waiting::hash_element *he)
+{
+ mysql_mutex_assert_owner(&LOCK_gtid_waiting);
+
+ for (;;)
+ {
+ queue_element *qe;
+
+ if (queue_first_element(&he->queue) > queue_last_element(&he->queue))
+ break;
+ qe= (queue_element *)queue_top(&he->queue);
+ if (qe->wait_seq_no > wakeup_seq_no)
+ break;
+ queue_remove_top(&he->queue);
+ qe->thd->wakeup_ready= true;
+ qe->wakeup_reason= queue_element::DONE;
+ mysql_cond_signal(&qe->thd->COND_wakeup_ready);
+ }
+}
+
+
+/*
+ Execute a MASTER_GTID_WAIT() for one specific domain.
+
+ The implementation is optimised primarily for (1) minimal performance impact
+ on the slave replication threads, and secondarily for (2) quick performance
+ of MASTER_GTID_WAIT() on a single GTID, which can be useful for consistent
+ read to clients in an async replication read-scaleout scenario.
+
+ To achieve (1), we have a "small" wait and a "large" wait. The small wait
+ contends with the replication threads on the lock on the gtid_slave_pos, so
+ only minimal processing is done under that lock, and only a single waiter at
+ a time does the small wait.
+
+ If there is already a small waiter, a new thread will either replace the
+ small waiter (if it needs to wait for an earlier sequence number), or
+ instead to a "large" wait.
+
+ Once awoken on the small wait, the waiting thread releases the lock shared
+ with the SQL threads quickly, and then processes all waiters currently doing
+ the large wait.
+
+ This way, the SQL threads only need to do a single check + possibly a
+ pthread_cond_signal() when updating the gtid_slave_state, and the time that
+ non-SQL threads contend for the lock on gtid_slave_staste is minimized.
+*/
+int
+gtid_waiting::wait_for_gtid(THD *thd, rpl_gtid *wait_gtid,
+ struct timespec *wait_until)
+{
+ bool timed_out= false;
+#ifdef HAVE_REPLICATION
+ queue_element elem;
+ uint32_t domain_id= wait_gtid->domain_id;
+ uint64 seq_no= wait_gtid->seq_no;
+ hash_element *he;
+ rpl_slave_state::element *slave_state_elem= NULL;
+ const char *old_msg= NULL;
+ bool did_enter_cond= false;
+ bool takeover= false;
+
+ elem.wait_seq_no= seq_no;
+ elem.thd= thd;
+ /*
+ Register on the large wait before checking the small wait.
+ This ensures that if we find another waiter already doing the small wait,
+ we are sure to be woken up by that one, and thus we will not need to take
+ the lock on the small wait more than once in this case.
+ */
+ mysql_mutex_lock(&LOCK_gtid_waiting);
+ if (!(he= register_in_wait_hash(thd, wait_gtid, &elem)))
+ {
+ mysql_mutex_unlock(&LOCK_gtid_waiting);
+ return 1;
+ }
+
+ /*
+ Now check the small wait, and either do the large wait or the small one,
+ depending on whether there is already a suitable small waiter or not.
+
+ We may need to do this multiple times, as a previous small waiter may
+ complete and pass the small wait on to us.
+ */
+ for (;;)
+ {
+ uint64 wakeup_seq_no, cur_wait_seq_no;
+
+ mysql_mutex_assert_owner(&LOCK_gtid_waiting);
+ mysql_mutex_lock(&rpl_global_gtid_slave_state.LOCK_slave_state);
+ /*
+ The elements in the gtid_slave_state_hash are never re-allocated once
+ they enter the hash, so we do not need to re-do the lookup after releasing
+ and re-aquiring the lock.
+ */
+ if (!slave_state_elem &&
+ !(slave_state_elem= rpl_global_gtid_slave_state.get_element(domain_id)))
+ {
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
+ remove_from_wait_hash(he, &elem);
+ mysql_mutex_unlock(&LOCK_gtid_waiting);
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ return 1;
+ }
+
+ if ((wakeup_seq_no= slave_state_elem->highest_seq_no) >= seq_no)
+ {
+ /*
+ We do not have to wait. But we might need to wakeup other threads on
+ the large wait (can happen if we were woken up to take over the small
+ wait, and SQL thread raced with us to reach the waited-for GTID.
+ */
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
+ thd->wakeup_ready= 0;
+ process_wait_hash(wakeup_seq_no, he);
+ /*
+ Since we already checked wakeup_seq_no, we are sure that
+ process_wait_hash() will mark us done.
+ */
+ DBUG_ASSERT(thd->wakeup_ready);
+ if (thd->wakeup_ready)
+ {
+ if (takeover)
+ promote_new_waiter(he);
+ break;
+ }
+ }
+ else if ((cur_wait_seq_no= slave_state_elem->min_wait_seq_no) == 0 ||
+ cur_wait_seq_no > seq_no)
+ {
+ /*
+ We have to do the small wait ourselves (stealing it from any thread that
+ might already be waiting for a later seq_no).
+ */
+ slave_state_elem->min_wait_seq_no= seq_no;
+ if (cur_wait_seq_no != 0)
+ {
+ /* We stole the wait, so wake up the old waiting thread. */
+ mysql_cond_signal(&slave_state_elem->COND_wait_gtid);
+ }
+ /* Do the small wait. */
+ if (did_enter_cond)
+ thd->exit_cond(old_msg);
+ else
+ mysql_mutex_unlock(&LOCK_gtid_waiting);
+
+ old_msg= thd->enter_cond(&slave_state_elem->COND_wait_gtid,
+ &rpl_global_gtid_slave_state.LOCK_slave_state,
+ "Waiting in MASTER_GTID_WAIT() (primary waiter)");
+ do
+ {
+ if (thd->check_killed())
+ slave_state_elem->min_wait_seq_no = 0;
+ else if (wait_until)
+ {
+ int err=
+ mysql_cond_timedwait(&slave_state_elem->COND_wait_gtid,
+ &rpl_global_gtid_slave_state.LOCK_slave_state,
+ wait_until);
+ if (err == ETIMEDOUT || err == ETIME)
+ {
+ timed_out= true;
+ slave_state_elem->min_wait_seq_no = 0;
+ }
+ }
+ else
+ mysql_cond_wait(&slave_state_elem->COND_wait_gtid,
+ &rpl_global_gtid_slave_state.LOCK_slave_state);
+ } while (slave_state_elem->min_wait_seq_no == seq_no);
+ /*
+ Check the new gtid_slave_state. We could be woken up because our seq_no
+ has been reached, or because someone else stole the small wait from us.
+ (Or because of kill/timeout).
+ */
+ wakeup_seq_no= slave_state_elem->highest_seq_no;
+
+ thd->exit_cond(old_msg);
+ mysql_mutex_lock(&LOCK_gtid_waiting);
+ /*
+ Note that hash_entry pointers do not change once allocated, so we do
+ not need to lookup `he' again after re-aquiring the lock.
+ */
+ thd->wakeup_ready= 0;
+ process_wait_hash(wakeup_seq_no, he);
+ if (thd->wakeup_ready)
+ promote_new_waiter(he);
+ else if (thd->killed || timed_out)
+ {
+ remove_from_wait_hash(he, &elem);
+ promote_new_waiter(he);
+ if (thd->killed)
+ thd->send_kill_message();
+ break;
+ }
+ }
+ else
+ {
+ /* We have to do the large wait. */
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
+ thd->wakeup_ready= 0;
+ }
+
+ takeover= false;
+ old_msg= thd->enter_cond(&thd->COND_wakeup_ready, &LOCK_gtid_waiting,
+ "Waiting in MASTER_GTID_WAIT()");
+ while (!thd->wakeup_ready && !thd->check_killed() && !timed_out)
+ {
+ thd_wait_begin(thd, THD_WAIT_BINLOG);
+ if (wait_until)
+ {
+ int err= mysql_cond_timedwait(&thd->COND_wakeup_ready,
+ &LOCK_gtid_waiting, wait_until);
+ if (err == ETIMEDOUT || err == ETIME)
+ timed_out= true;
+ }
+ else
+ mysql_cond_wait(&thd->COND_wakeup_ready, &LOCK_gtid_waiting);
+ thd_wait_end(thd);
+ }
+
+ if (elem.wakeup_reason == queue_element::DONE)
+ break;
+ takeover= true;
+
+ if (thd->killed || timed_out)
+ {
+ remove_from_wait_hash(he, &elem);
+ /*
+ If we got kill/timeout _and_ we were asked to takeover the small wait,
+ we need to pass on that task to someone else.
+ */
+ if (thd->wakeup_ready && elem.wakeup_reason == queue_element::TAKEOVER)
+ promote_new_waiter(he);
+ if (thd->killed)
+ thd->send_kill_message();
+ break;
+ }
+ }
+
+ if (did_enter_cond)
+ thd->exit_cond(old_msg);
+ else
+ mysql_mutex_unlock(&LOCK_gtid_waiting);
+#endif /* HAVE_REPLICATION */
+ return timed_out ? -1 : 0;
+}
+
+
+static void
+free_hash_element(void *p)
+{
+ gtid_waiting::hash_element *e= (gtid_waiting::hash_element *)p;
+ delete_queue(&e->queue);
+ my_free(e);
+}
+
+
+void
+gtid_waiting::init()
+{
+ my_hash_init(&hash, &my_charset_bin, 32,
+ offsetof(hash_element, domain_id), sizeof(uint32), NULL,
+ free_hash_element, HASH_UNIQUE);
+ mysql_mutex_init(key_LOCK_gtid_waiting, &LOCK_gtid_waiting, 0);
+}
+
+
+void
+gtid_waiting::destroy()
+{
+ mysql_mutex_destroy(&LOCK_gtid_waiting);
+ my_hash_free(&hash);
+}
+
+
+static int
+cmp_queue_elem(void *, uchar *a, uchar *b)
+{
+ uint64 seq_no_a= *(uint64 *)a;
+ uint64 seq_no_b= *(uint64 *)b;
+ if (seq_no_a < seq_no_b)
+ return -1;
+ else if (seq_no_a == seq_no_b)
+ return 0;
+ else
+ return 1;
+}
+
+
+gtid_waiting::hash_element *
+gtid_waiting::get_entry(uint32 domain_id)
+{
+ hash_element *e;
+
+ if ((e= (hash_element *)my_hash_search(&hash, (const uchar *)&domain_id, 0)))
+ return e;
+
+ if (!(e= (hash_element *)my_malloc(sizeof(*e), MYF(MY_WME))))
+ {
+ my_error(ER_OUTOFMEMORY, MYF(0), sizeof(*e));
+ return NULL;
+ }
+
+ if (init_queue(&e->queue, 8, offsetof(queue_element, wait_seq_no), 0,
+ cmp_queue_elem, NULL, 1+offsetof(queue_element, queue_idx), 1))
+ {
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ my_free(e);
+ return NULL;
+ }
+ e->domain_id= domain_id;
+ if (my_hash_insert(&hash, (uchar *)e))
+ {
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ delete_queue(&e->queue);
+ my_free(e);
+ return NULL;
+ }
+ return e;
+}
+
+
+gtid_waiting::hash_element *
+gtid_waiting::register_in_wait_hash(THD *thd, rpl_gtid *wait_gtid,
+ gtid_waiting::queue_element *elem)
+{
+ hash_element *e;
+
+ mysql_mutex_assert_owner(&LOCK_gtid_waiting);
+
+ if (!(e= get_entry(wait_gtid->domain_id)))
+ return NULL;
+
+ if (queue_insert_safe(&e->queue, (uchar *)elem))
+ {
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ return NULL;
+ }
+
+ return e;
+}
+
+
+void
+gtid_waiting::remove_from_wait_hash(gtid_waiting::hash_element *e,
+ gtid_waiting::queue_element *elem)
+{
+ mysql_mutex_assert_owner(&LOCK_gtid_waiting);
+
+ queue_remove(&e->queue, elem->queue_idx);
+}
diff --git a/sql/rpl_gtid.h b/sql/rpl_gtid.h
index b0bc54900e7..1cf57c45018 100644
--- a/sql/rpl_gtid.h
+++ b/sql/rpl_gtid.h
@@ -16,6 +16,10 @@
#ifndef RPL_GTID_H
#define RPL_GTID_H
+#include "hash.h"
+#include "queues.h"
+
+
/* Definitions for MariaDB global transaction ID (GTID). */
@@ -61,6 +65,15 @@ struct rpl_slave_state
{
struct list_element *list;
uint32 domain_id;
+ /* Highest seq_no seen so far in this domain. */
+ uint64 highest_seq_no;
+ /*
+ If min_wait_seq_no is non-zero, then it is the smallest seq_no in this
+ domain that someone is doing MASTER_GTID_WAIT() on. When we reach this
+ seq_no, we need to signal the waiter on COND_wait_gtid.
+ */
+ uint64 min_wait_seq_no;
+ mysql_cond_t COND_wait_gtid;
list_element *grab_list() { list_element *l= list; list= NULL; return l; }
void add(list_element *l)
@@ -99,9 +112,6 @@ struct rpl_slave_state
bool in_statement);
bool is_empty();
- void lock() { DBUG_ASSERT(inited); mysql_mutex_lock(&LOCK_slave_state); }
- void unlock() { DBUG_ASSERT(inited); mysql_mutex_unlock(&LOCK_slave_state); }
-
element *get_element(uint32 domain_id);
int put_back_list(uint32 domain_id, list_element *list);
@@ -204,6 +214,49 @@ struct slave_connection_state
int get_gtid_list(rpl_gtid *gtid_list, uint32 list_size);
};
+
+/*
+ Structure to keep track of threads waiting in MASTER_GTID_WAIT().
+
+ Since replication is (mostly) single-threaded, we want to minimise the
+ performance impact on that from MASTER_GTID_WAIT(). To achieve this, we
+ are careful to keep the common lock between replication threads and
+ MASTER_GTID_WAIT threads held for as short as possible. We keep only
+ a single thread waiting to be notified by the replication threads; this
+ thread then handles all the (potentially heavy) lifting of dealing with
+ all current waiting threads.
+*/
+
+struct gtid_waiting {
+ /* Elements in the hash, basically a priority queue for each domain. */
+ struct hash_element {
+ QUEUE queue;
+ uint32 domain_id;
+ };
+ /* A priority queue to handle waiters in one domain in seq_no order. */
+ struct queue_element {
+ uint64 wait_seq_no;
+ THD *thd;
+ int queue_idx;
+ enum { DONE, TAKEOVER } wakeup_reason;
+ };
+
+ mysql_mutex_t LOCK_gtid_waiting;
+ HASH hash;
+
+ void init();
+ void destroy();
+ hash_element *get_entry(uint32 domain_id);
+ int wait_for_pos(THD *thd, String *gtid_str, longlong timeout_us);
+ void promote_new_waiter(gtid_waiting::hash_element *he);
+ int wait_for_gtid(THD *thd, rpl_gtid *wait_gtid, struct timespec *wait_until);
+ void process_wait_hash(uint64 wakeup_seq_no, gtid_waiting::hash_element *he);
+ hash_element *register_in_wait_hash(THD *thd, rpl_gtid *wait_gtid,
+ queue_element *elem);
+ void remove_from_wait_hash(hash_element *e, queue_element *elem);
+};
+
+
extern bool rpl_slave_state_tostring_helper(String *dest, const rpl_gtid *gtid,
bool *first);
extern int gtid_check_rpl_slave_state_table(TABLE *table);
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index cfa7c0f344f..8384297624c 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -37,6 +37,8 @@ static int count_relay_log_space(Relay_log_info* rli);
domain).
*/
rpl_slave_state rpl_global_gtid_slave_state;
+/* Object used for MASTER_GTID_WAIT(). */
+gtid_waiting rpl_global_gtid_waiting;
// Defined in slave.cc
@@ -1312,9 +1314,9 @@ rpl_load_gtid_slave_state(THD *thd)
uint32 i;
DBUG_ENTER("rpl_load_gtid_slave_state");
- rpl_global_gtid_slave_state.lock();
+ mysql_mutex_lock(&rpl_global_gtid_slave_state.LOCK_slave_state);
bool loaded= rpl_global_gtid_slave_state.loaded;
- rpl_global_gtid_slave_state.unlock();
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
if (loaded)
DBUG_RETURN(0);
@@ -1414,10 +1416,10 @@ rpl_load_gtid_slave_state(THD *thd)
}
}
- rpl_global_gtid_slave_state.lock();
+ mysql_mutex_lock(&rpl_global_gtid_slave_state.LOCK_slave_state);
if (rpl_global_gtid_slave_state.loaded)
{
- rpl_global_gtid_slave_state.unlock();
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
goto end;
}
@@ -1429,7 +1431,7 @@ rpl_load_gtid_slave_state(THD *thd)
tmp_entry.sub_id,
tmp_entry.gtid.seq_no)))
{
- rpl_global_gtid_slave_state.unlock();
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
my_error(ER_OUT_OF_RESOURCES, MYF(0));
goto end;
}
@@ -1442,14 +1444,14 @@ rpl_load_gtid_slave_state(THD *thd)
mysql_bin_log.bump_seq_no_counter_if_needed(entry->gtid.domain_id,
entry->gtid.seq_no))
{
- rpl_global_gtid_slave_state.unlock();
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
my_error(ER_OUT_OF_RESOURCES, MYF(0));
goto end;
}
}
rpl_global_gtid_slave_state.loaded= true;
- rpl_global_gtid_slave_state.unlock();
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
err= 0; /* Clear HA_ERR_END_OF_FILE */
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index ff2ffd0b366..ef18c7e7f9f 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -702,6 +702,7 @@ int init_relay_log_info(Relay_log_info* rli, const char* info_fname);
extern struct rpl_slave_state rpl_global_gtid_slave_state;
+extern gtid_waiting rpl_global_gtid_waiting;
int rpl_load_gtid_slave_state(THD *thd);
int event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev);
diff --git a/sql/share/errmsg-utf8.txt b/sql/share/errmsg-utf8.txt
index b74ffecf82e..db61983ef29 100644
--- a/sql/share/errmsg-utf8.txt
+++ b/sql/share/errmsg-utf8.txt
@@ -6547,7 +6547,7 @@ ER_UNTIL_REQUIRES_USING_GTID
ER_GTID_STRICT_OUT_OF_ORDER
eng "An attempt was made to binlog GTID %u-%u-%llu which would create an out-of-order sequence number with existing GTID %u-%u-%llu, and gtid strict mode is enabled."
ER_GTID_START_FROM_BINLOG_HOLE
- eng "The binlog on the master is missing the GTID %u-%u-%llu requested by the slave (even though both a prior and a subsequent sequence number does exist), and GTID strict mode is enabled"
+ eng "The binlog on the master is missing the GTID %u-%u-%llu requested by the slave (even though a subsequent sequence number does exist), and GTID strict mode is enabled"
ER_SLAVE_UNEXPECTED_MASTER_SWITCH
eng "Unexpected GTID received from master after reconnect. This normally indicates that the master server was replaced without restarting the slave threads. %s"
ER_INSIDE_TRANSACTION_PREVENTS_SWITCH_GTID_DOMAIN_ID_SEQ_NO
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index 8abdd53469f..b5201371e15 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -1266,6 +1266,7 @@ void THD::init(void)
set_status_var_init();
bzero((char *) &org_status_var, sizeof(org_status_var));
start_bytes_received= 0;
+ last_commit_gtid.seq_no= 0;
if (variables.sql_log_bin)
variables.option_bits|= OPTION_BIN_LOG;
diff --git a/sql/sql_class.h b/sql/sql_class.h
index a3fc3a7866f..c1cfd8b4d5b 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -44,6 +44,7 @@
#include "thr_lock.h" /* thr_lock_type, THR_LOCK_DATA,
THR_LOCK_INFO */
#include "my_apc.h"
+#include "rpl_gtid.h"
class Reprepare_observer;
class Relay_log_info;
@@ -3405,6 +3406,12 @@ private:
*/
LEX_STRING invoker_user;
LEX_STRING invoker_host;
+
+ /* Protect against add/delete of temporary tables in parallel replication */
+ void rgi_lock_temporary_tables();
+ void rgi_unlock_temporary_tables();
+ bool rgi_have_temporary_tables();
+public:
/*
Flag, mutex and condition for a thread to wait for a signal from another
thread.
@@ -3415,12 +3422,12 @@ private:
bool wakeup_ready;
mysql_mutex_t LOCK_wakeup_ready;
mysql_cond_t COND_wakeup_ready;
+ /*
+ The GTID assigned to the last commit. If no GTID was assigned to any commit
+ so far, this is indicated by last_commit_gtid.seq_no == 0.
+ */
+ rpl_gtid last_commit_gtid;
- /* Protect against add/delete of temporary tables in parallel replication */
- void rgi_lock_temporary_tables();
- void rgi_unlock_temporary_tables();
- bool rgi_have_temporary_tables();
-public:
inline void lock_temporary_tables()
{
if (rgi_slave)
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 363b53e05a6..7c4e5b1b383 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -3965,6 +3965,20 @@ rpl_deinit_gtid_slave_state()
}
+void
+rpl_init_gtid_waiting()
+{
+ rpl_global_gtid_waiting.init();
+}
+
+
+void
+rpl_deinit_gtid_waiting()
+{
+ rpl_global_gtid_waiting.destroy();
+}
+
+
/*
Format the current GTID state as a string, for returning the value of
@@global.gtid_slave_pos.
diff --git a/sql/sql_repl.h b/sql/sql_repl.h
index da55e3e863f..defb1b23f5b 100644
--- a/sql/sql_repl.h
+++ b/sql/sql_repl.h
@@ -70,6 +70,8 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, ushort flags);
extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state;
void rpl_init_gtid_slave_state();
void rpl_deinit_gtid_slave_state();
+void rpl_init_gtid_waiting();
+void rpl_deinit_gtid_waiting();
int gtid_state_from_binlog_pos(const char *name, uint32 pos, String *out_str);
int rpl_append_gtid_state(String *dest, bool use_binlog);
int rpl_load_gtid_state(slave_connection_state *state, bool use_binlog);
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
index 08b4953b2e4..cd24ad38eb2 100644
--- a/sql/sys_vars.cc
+++ b/sql/sys_vars.cc
@@ -1538,6 +1538,33 @@ static Sys_var_gtid_binlog_state Sys_gtid_binlog_state(
GLOBAL_VAR(opt_gtid_binlog_state_dummy), NO_CMD_LINE);
+static Sys_var_last_gtid Sys_last_gtid(
+ "last_gtid", "The GTID of the last commit (if binlogging was enabled), "
+ "or the empty string if none.",
+ READ_ONLY sys_var::ONLY_SESSION, NO_CMD_LINE);
+
+
+uchar *
+Sys_var_last_gtid::session_value_ptr(THD *thd, LEX_STRING *base)
+{
+ char buf[10+1+10+1+20+1];
+ String str(buf, sizeof(buf), system_charset_info);
+ char *p;
+ bool first= true;
+
+ str.length(0);
+ if ((thd->last_commit_gtid.seq_no > 0 &&
+ rpl_slave_state_tostring_helper(&str, &thd->last_commit_gtid, &first)) ||
+ !(p= thd->strmake(str.ptr(), str.length())))
+ {
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ return NULL;
+ }
+
+ return (uchar *)p;
+}
+
+
static bool
check_slave_parallel_threads(sys_var *self, THD *thd, set_var *var)
{
diff --git a/sql/sys_vars.h b/sql/sys_vars.h
index 3cc4da32811..6a84fc5fbc2 100644
--- a/sql/sys_vars.h
+++ b/sql/sys_vars.h
@@ -2211,3 +2211,53 @@ public:
}
uchar *global_value_ptr(THD *thd, LEX_STRING *base);
};
+
+
+/**
+ Class for @@session.last_gtid.
+*/
+class Sys_var_last_gtid: public sys_var
+{
+public:
+ Sys_var_last_gtid(const char *name_arg,
+ const char *comment, int flag_args, CMD_LINE getopt)
+ : sys_var(&all_sys_vars, name_arg, comment, flag_args, 0, getopt.id,
+ getopt.arg_type, SHOW_CHAR, 0, NULL, VARIABLE_NOT_IN_BINLOG,
+ NULL, NULL, NULL)
+ {
+ option.var_type= GET_STR;
+ }
+ bool do_check(THD *thd, set_var *var)
+ {
+ DBUG_ASSERT(false);
+ return true;
+ }
+ bool session_update(THD *thd, set_var *var)
+ {
+ DBUG_ASSERT(false);
+ return true;
+ }
+ bool global_update(THD *thd, set_var *var)
+ {
+ DBUG_ASSERT(false);
+ return true;
+ }
+ bool check_update_type(Item_result type) {
+ DBUG_ASSERT(false);
+ return false;
+ }
+ void session_save_default(THD *thd, set_var *var)
+ {
+ DBUG_ASSERT(false);
+ }
+ void global_save_default(THD *thd, set_var *var)
+ {
+ DBUG_ASSERT(false);
+ }
+ uchar *session_value_ptr(THD *thd, LEX_STRING *base);
+ uchar *global_value_ptr(THD *thd, LEX_STRING *base)
+ {
+ DBUG_ASSERT(false);
+ return NULL;
+ }
+};