summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorunknown <knielsen@knielsen-hq.org>2014-01-07 11:57:03 +0100
committerunknown <knielsen@knielsen-hq.org>2014-01-07 11:57:03 +0100
commit90a76ff2fabc3deca2a4514aba5e838518dd6967 (patch)
treeeebc46185102e71138541893b28c65755428f48d
parente1b2de5f0b55badf3fe6d45db1d9306a4a33cf18 (diff)
parentcee925185186b02ec89bf988918fe958053da139 (diff)
downloadmariadb-git-90a76ff2fabc3deca2a4514aba5e838518dd6967.tar.gz
Merge into 10.0-base: MDEV-5363, make parallel replication waits killable.
There are some places in parallel replication where transactions wait for one another. Make sure those waits are killable by the (super)user. Upon kill, unfinished transactions will be rolled back and the SQL thread stops with an error. Add a number of test cases to test the different cases. Also fix some existing bugs found by those tests.
-rw-r--r--mysql-test/suite/rpl/r/rpl_parallel.result411
-rw-r--r--mysql-test/suite/rpl/r/rpl_parallel_no_log_slave_updates.result122
-rw-r--r--mysql-test/suite/rpl/t/rpl_parallel.test677
-rw-r--r--mysql-test/suite/rpl/t/rpl_parallel_no_log_slave_updates-slave.opt1
-rw-r--r--mysql-test/suite/rpl/t/rpl_parallel_no_log_slave_updates.test199
-rw-r--r--sql/handler.cc18
-rw-r--r--sql/log.cc166
-rw-r--r--sql/log.h2
-rw-r--r--sql/log_event.cc2
-rw-r--r--sql/rpl_parallel.cc96
-rw-r--r--sql/rpl_rli.cc6
-rw-r--r--sql/slave.cc11
-rw-r--r--sql/sql_class.cc60
-rw-r--r--sql/sql_class.h32
14 files changed, 1710 insertions, 93 deletions
diff --git a/mysql-test/suite/rpl/r/rpl_parallel.result b/mysql-test/suite/rpl/r/rpl_parallel.result
index 03b102a1af9..7a9fd69b2fb 100644
--- a/mysql-test/suite/rpl/r/rpl_parallel.result
+++ b/mysql-test/suite/rpl/r/rpl_parallel.result
@@ -7,6 +7,7 @@ SET GLOBAL slave_parallel_threads=10;
CHANGE MASTER TO master_use_gtid=slave_pos;
include/start_slave.inc
*** Test long-running query in domain 1 can run in parallel with short queries in domain 0 ***
+ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
CREATE TABLE t1 (a int PRIMARY KEY) ENGINE=MyISAM;
CREATE TABLE t2 (a int PRIMARY KEY) ENGINE=InnoDB;
INSERT INTO t1 VALUES (1);
@@ -259,6 +260,416 @@ SET GLOBAL binlog_format=@old_format;
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
include/start_slave.inc
+*** Test killing slave threads at various wait points ***
+*** 1. Test killing transaction waiting in commit for previous transaction to commit ***
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued1 WAIT_FOR master_cont1';
+SET binlog_format=statement;
+INSERT INTO t3 VALUES (31, foo(31,
+'commit_before_prepare_ordered WAIT_FOR t2_waiting',
+'commit_after_prepare_ordered SIGNAL t1_ready WAIT_FOR t1_cont'));
+SET debug_sync='now WAIT_FOR master_queued1';
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2';
+SET binlog_format=statement;
+BEGIN;
+INSERT INTO t3 VALUES (32, foo(32,
+'ha_write_row_end SIGNAL t2_query WAIT_FOR t2_cont',
+''));
+INSERT INTO t3 VALUES (33, foo(33,
+'group_commit_waiting_for_prior SIGNAL t2_waiting',
+'group_commit_waiting_for_prior_killed SIGNAL t2_killed'));
+COMMIT;
+SET debug_sync='now WAIT_FOR master_queued2';
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued3';
+SET binlog_format=statement;
+INSERT INTO t3 VALUES (34, foo(34,
+'',
+''));
+SET debug_sync='now WAIT_FOR master_queued3';
+SET debug_sync='now SIGNAL master_cont1';
+SELECT * FROM t3 WHERE a >= 30 ORDER BY a;
+a b
+31 31
+32 32
+33 33
+34 34
+SET sql_log_bin=0;
+CALL mtr.add_suppression("Query execution was interrupted");
+CALL mtr.add_suppression("Commit failed due to failure of an earlier commit on which this one depends");
+CALL mtr.add_suppression("Slave: Connection was killed");
+SET sql_log_bin=1;
+SET debug_sync='now WAIT_FOR t2_query';
+SET debug_sync='now SIGNAL t2_cont';
+SET debug_sync='now WAIT_FOR t1_ready';
+KILL THD_ID;
+SET debug_sync='now WAIT_FOR t2_killed';
+SET debug_sync='now SIGNAL t1_cont';
+include/wait_for_slave_sql_error.inc [errno=1317,1963]
+STOP SLAVE IO_THREAD;
+SELECT * FROM t3 WHERE a >= 30 ORDER BY a;
+a b
+31 31
+SET GLOBAL slave_parallel_threads=0;
+SET GLOBAL slave_parallel_threads=10;
+SET sql_log_bin=0;
+DROP FUNCTION foo;
+CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
+RETURNS INT DETERMINISTIC
+BEGIN
+RETURN x;
+END
+||
+SET sql_log_bin=1;
+INSERT INTO t3 VALUES (39,0);
+include/start_slave.inc
+SELECT * FROM t3 WHERE a >= 30 ORDER BY a;
+a b
+31 31
+32 32
+33 33
+34 34
+39 0
+SET sql_log_bin=0;
+DROP FUNCTION foo;
+CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
+RETURNS INT DETERMINISTIC
+BEGIN
+IF d1 != '' THEN
+SET debug_sync = d1;
+END IF;
+IF d2 != '' THEN
+SET debug_sync = d2;
+END IF;
+RETURN x;
+END
+||
+SET sql_log_bin=1;
+include/stop_slave.inc
+SET GLOBAL binlog_format=@old_format;
+SET GLOBAL slave_parallel_threads=0;
+SET GLOBAL slave_parallel_threads=10;
+include/start_slave.inc
+*** 2. Same as (1), but without restarting IO thread after kill of SQL threads ***
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued1 WAIT_FOR master_cont1';
+SET binlog_format=statement;
+INSERT INTO t3 VALUES (41, foo(41,
+'commit_before_prepare_ordered WAIT_FOR t2_waiting',
+'commit_after_prepare_ordered SIGNAL t1_ready WAIT_FOR t1_cont'));
+SET debug_sync='now WAIT_FOR master_queued1';
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2';
+SET binlog_format=statement;
+BEGIN;
+INSERT INTO t3 VALUES (42, foo(42,
+'ha_write_row_end SIGNAL t2_query WAIT_FOR t2_cont',
+''));
+INSERT INTO t3 VALUES (43, foo(43,
+'group_commit_waiting_for_prior SIGNAL t2_waiting',
+'group_commit_waiting_for_prior_killed SIGNAL t2_killed'));
+COMMIT;
+SET debug_sync='now WAIT_FOR master_queued2';
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued3';
+SET binlog_format=statement;
+INSERT INTO t3 VALUES (44, foo(44,
+'',
+''));
+SET debug_sync='now WAIT_FOR master_queued3';
+SET debug_sync='now SIGNAL master_cont1';
+SELECT * FROM t3 WHERE a >= 40 ORDER BY a;
+a b
+41 41
+42 42
+43 43
+44 44
+SET debug_sync='now WAIT_FOR t2_query';
+SET debug_sync='now SIGNAL t2_cont';
+SET debug_sync='now WAIT_FOR t1_ready';
+KILL THD_ID;
+SET debug_sync='now WAIT_FOR t2_killed';
+SET debug_sync='now SIGNAL t1_cont';
+include/wait_for_slave_sql_error.inc [errno=1317,1963]
+SELECT * FROM t3 WHERE a >= 40 ORDER BY a;
+a b
+41 41
+SET GLOBAL slave_parallel_threads=0;
+SET GLOBAL slave_parallel_threads=10;
+SET sql_log_bin=0;
+DROP FUNCTION foo;
+CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
+RETURNS INT DETERMINISTIC
+BEGIN
+RETURN x;
+END
+||
+SET sql_log_bin=1;
+INSERT INTO t3 VALUES (49,0);
+START SLAVE SQL_THREAD;
+SELECT * FROM t3 WHERE a >= 40 ORDER BY a;
+a b
+41 41
+42 42
+43 43
+44 44
+49 0
+SET sql_log_bin=0;
+DROP FUNCTION foo;
+CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
+RETURNS INT DETERMINISTIC
+BEGIN
+IF d1 != '' THEN
+SET debug_sync = d1;
+END IF;
+IF d2 != '' THEN
+SET debug_sync = d2;
+END IF;
+RETURN x;
+END
+||
+SET sql_log_bin=1;
+include/stop_slave.inc
+SET GLOBAL binlog_format=@old_format;
+SET GLOBAL slave_parallel_threads=0;
+SET GLOBAL slave_parallel_threads=10;
+include/start_slave.inc
+*** 3. Same as (2), but not using gtid mode ***
+include/stop_slave.inc
+CHANGE MASTER TO master_use_gtid=no;
+include/start_slave.inc
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued1 WAIT_FOR master_cont1';
+SET binlog_format=statement;
+INSERT INTO t3 VALUES (51, foo(51,
+'commit_before_prepare_ordered WAIT_FOR t2_waiting',
+'commit_after_prepare_ordered SIGNAL t1_ready WAIT_FOR t1_cont'));
+SET debug_sync='now WAIT_FOR master_queued1';
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2';
+SET binlog_format=statement;
+BEGIN;
+INSERT INTO t3 VALUES (52, foo(52,
+'ha_write_row_end SIGNAL t2_query WAIT_FOR t2_cont',
+''));
+INSERT INTO t3 VALUES (53, foo(53,
+'group_commit_waiting_for_prior SIGNAL t2_waiting',
+'group_commit_waiting_for_prior_killed SIGNAL t2_killed'));
+COMMIT;
+SET debug_sync='now WAIT_FOR master_queued2';
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued3';
+SET binlog_format=statement;
+INSERT INTO t3 VALUES (54, foo(54,
+'',
+''));
+SET debug_sync='now WAIT_FOR master_queued3';
+SET debug_sync='now SIGNAL master_cont1';
+SELECT * FROM t3 WHERE a >= 50 ORDER BY a;
+a b
+51 51
+52 52
+53 53
+54 54
+SET debug_sync='now WAIT_FOR t2_query';
+SET debug_sync='now SIGNAL t2_cont';
+SET debug_sync='now WAIT_FOR t1_ready';
+KILL THD_ID;
+SET debug_sync='now WAIT_FOR t2_killed';
+SET debug_sync='now SIGNAL t1_cont';
+include/wait_for_slave_sql_error.inc [errno=1317,1963]
+SELECT * FROM t3 WHERE a >= 50 ORDER BY a;
+a b
+51 51
+SET GLOBAL slave_parallel_threads=0;
+SET GLOBAL slave_parallel_threads=10;
+SET sql_log_bin=0;
+DROP FUNCTION foo;
+CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
+RETURNS INT DETERMINISTIC
+BEGIN
+RETURN x;
+END
+||
+SET sql_log_bin=1;
+INSERT INTO t3 VALUES (59,0);
+START SLAVE SQL_THREAD;
+SELECT * FROM t3 WHERE a >= 50 ORDER BY a;
+a b
+51 51
+52 52
+53 53
+54 54
+59 0
+SET sql_log_bin=0;
+DROP FUNCTION foo;
+CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
+RETURNS INT DETERMINISTIC
+BEGIN
+IF d1 != '' THEN
+SET debug_sync = d1;
+END IF;
+IF d2 != '' THEN
+SET debug_sync = d2;
+END IF;
+RETURN x;
+END
+||
+SET sql_log_bin=1;
+include/stop_slave.inc
+CHANGE MASTER TO master_use_gtid=slave_pos;
+include/start_slave.inc
+include/stop_slave.inc
+SET GLOBAL binlog_format=@old_format;
+SET GLOBAL slave_parallel_threads=0;
+SET GLOBAL slave_parallel_threads=3;
+include/start_slave.inc
+*** 4. Test killing thread that is waiting to start transaction until previous transaction commits ***
+SET binlog_format=statement;
+SET gtid_domain_id=2;
+INSERT INTO t3 VALUES (60, foo(60,
+'ha_write_row_end SIGNAL d2_query WAIT_FOR d2_cont2',
+'rpl_parallel_end_of_group SIGNAL d2_done WAIT_FOR d2_cont'));
+SET gtid_domain_id=0;
+SET debug_sync='now WAIT_FOR d2_query';
+SET gtid_domain_id=1;
+BEGIN;
+INSERT INTO t3 VALUES (61, foo(61,
+'rpl_parallel_start_waiting_for_prior SIGNAL t3_waiting',
+'rpl_parallel_start_waiting_for_prior_killed SIGNAL t3_killed'));
+INSERT INTO t3 VALUES (62, foo(62,
+'ha_write_row_end SIGNAL d1_query WAIT_FOR d1_cont2',
+'rpl_parallel_end_of_group SIGNAL d1_done WAIT_FOR d1_cont'));
+COMMIT;
+SET gtid_domain_id=0;
+SET debug_sync='now WAIT_FOR d1_query';
+SET gtid_domain_id=0;
+INSERT INTO t3 VALUES (63, foo(63,
+'ha_write_row_end SIGNAL d0_query WAIT_FOR d0_cont2',
+'rpl_parallel_end_of_group SIGNAL d0_done WAIT_FOR d0_cont'));
+SET debug_sync='now WAIT_FOR d0_query';
+SET debug_sync='now SIGNAL d2_cont2';
+SET debug_sync='now WAIT_FOR d2_done';
+SET debug_sync='now SIGNAL d1_cont2';
+SET debug_sync='now WAIT_FOR d1_done';
+SET debug_sync='now SIGNAL d0_cont2';
+SET debug_sync='now WAIT_FOR d0_done';
+SET binlog_format=statement;
+INSERT INTO t3 VALUES (64, foo(64,
+'commit_before_prepare_ordered SIGNAL t1_waiting WAIT_FOR t1_cont', ''));
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2 WAIT_FOR master_cont2';
+INSERT INTO t3 VALUES (65, foo(65, '', ''));
+SET debug_sync='now WAIT_FOR master_queued2';
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued3';
+INSERT INTO t3 VALUES (66, foo(66, '', ''));
+SET debug_sync='now WAIT_FOR master_queued3';
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued4';
+INSERT INTO t3 VALUES (67, foo(67, '', ''));
+SET debug_sync='now WAIT_FOR master_queued4';
+SET debug_sync='now SIGNAL master_cont2';
+SELECT * FROM t3 WHERE a >= 60 ORDER BY a;
+a b
+60 60
+61 61
+62 62
+63 63
+64 64
+65 65
+66 66
+67 67
+SET debug_sync='now SIGNAL d0_cont';
+SET debug_sync='now WAIT_FOR t1_waiting';
+SET debug_sync='now SIGNAL d1_cont';
+SET debug_sync='now WAIT_FOR t3_waiting';
+SET debug_sync='now SIGNAL d2_cont';
+KILL THD_ID;
+SET debug_sync='now WAIT_FOR t3_killed';
+SET debug_sync='now SIGNAL t1_cont';
+include/wait_for_slave_sql_error.inc [errno=1317,1927,1963]
+STOP SLAVE IO_THREAD;
+SELECT * FROM t3 WHERE a >= 60 ORDER BY a;
+a b
+60 60
+61 61
+62 62
+63 63
+64 64
+SET GLOBAL slave_parallel_threads=0;
+SET GLOBAL slave_parallel_threads=10;
+SET sql_log_bin=0;
+DROP FUNCTION foo;
+CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
+RETURNS INT DETERMINISTIC
+BEGIN
+RETURN x;
+END
+||
+SET sql_log_bin=1;
+INSERT INTO t3 VALUES (69,0);
+include/start_slave.inc
+SELECT * FROM t3 WHERE a >= 60 ORDER BY a;
+a b
+60 60
+61 61
+62 62
+63 63
+64 64
+65 65
+66 66
+67 67
+69 0
+SET sql_log_bin=0;
+DROP FUNCTION foo;
+CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
+RETURNS INT DETERMINISTIC
+BEGIN
+IF d1 != '' THEN
+SET debug_sync = d1;
+END IF;
+IF d2 != '' THEN
+SET debug_sync = d2;
+END IF;
+RETURN x;
+END
+||
+SET sql_log_bin=1;
+include/stop_slave.inc
+SET GLOBAL binlog_format=@old_format;
+SET GLOBAL slave_parallel_threads=0;
+SET GLOBAL slave_parallel_threads=10;
+include/start_slave.inc
+*** 5. Test killing thread that is waiting for queue of max length to shorten ***
+SET @old_max_queued= @@GLOBAL.slave_parallel_max_queued;
+SET GLOBAL slave_parallel_max_queued=9000;
+SET binlog_format=statement;
+INSERT INTO t3 VALUES (70, foo(0,
+'ha_write_row_end SIGNAL query_waiting WAIT_FOR query_cont', ''));
+SET debug_sync='now WAIT_FOR query_waiting';
+SET @old_dbug= @@GLOBAL.debug_dbug;
+SET GLOBAL debug_dbug="+d,rpl_parallel_wait_queue_max";
+INSERT INTO t3 VALUES (72, 0);
+SELECT * FROM t3 WHERE a >= 70 ORDER BY a;
+a b
+70 0
+71 10000
+72 0
+SET debug_sync='now WAIT_FOR wait_queue_ready';
+KILL THD_ID;
+SET debug_sync='now WAIT_FOR wait_queue_killed';
+SET debug_sync='now SIGNAL query_cont';
+include/wait_for_slave_sql_error.inc [errno=1317,1927,1963]
+STOP SLAVE IO_THREAD;
+SELECT * FROM t3 WHERE a >= 70 ORDER BY a;
+a b
+70 0
+71 10000
+SET GLOBAL debug_dbug=@old_dbug;
+SET GLOBAL slave_parallel_max_queued= @old_max_queued;
+INSERT INTO t3 VALUES (73,0);
+include/start_slave.inc
+SELECT * FROM t3 WHERE a >= 70 ORDER BY a;
+a b
+70 0
+71 10000
+72 0
+73 0
+include/stop_slave.inc
+SET GLOBAL binlog_format=@old_format;
+SET GLOBAL slave_parallel_threads=0;
+SET GLOBAL slave_parallel_threads=10;
+include/start_slave.inc
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
include/start_slave.inc
diff --git a/mysql-test/suite/rpl/r/rpl_parallel_no_log_slave_updates.result b/mysql-test/suite/rpl/r/rpl_parallel_no_log_slave_updates.result
new file mode 100644
index 00000000000..067d92a962f
--- /dev/null
+++ b/mysql-test/suite/rpl/r/rpl_parallel_no_log_slave_updates.result
@@ -0,0 +1,122 @@
+include/rpl_init.inc [topology=1->2]
+*** Test killing transaction waiting in commit for previous transaction to commit, when not using 2-phase commit ***
+SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
+include/stop_slave.inc
+SET GLOBAL slave_parallel_threads=10;
+CHANGE MASTER TO master_use_gtid=slave_pos;
+include/start_slave.inc
+ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
+SET sql_log_bin=0;
+CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
+RETURNS INT DETERMINISTIC
+BEGIN
+RETURN x;
+END
+||
+SET sql_log_bin=1;
+CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
+SET sql_log_bin=0;
+CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
+RETURNS INT DETERMINISTIC
+BEGIN
+IF d1 != '' THEN
+SET debug_sync = d1;
+END IF;
+IF d2 != '' THEN
+SET debug_sync = d2;
+END IF;
+RETURN x;
+END
+||
+SET sql_log_bin=1;
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued1 WAIT_FOR master_cont1';
+SET binlog_format=statement;
+INSERT INTO t3 VALUES (31, foo(31,
+'ha_commit_one_phase WAIT_FOR t2_waiting',
+'commit_one_phase_2 SIGNAL t1_ready WAIT_FOR t1_cont'));
+SET debug_sync='now WAIT_FOR master_queued1';
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2';
+SET binlog_format=statement;
+BEGIN;
+INSERT INTO t3 VALUES (32, foo(32,
+'ha_write_row_end SIGNAL t2_query WAIT_FOR t2_cont',
+''));
+INSERT INTO t3 VALUES (33, foo(33,
+'wait_for_prior_commit_waiting SIGNAL t2_waiting',
+'wait_for_prior_commit_killed SIGNAL t2_killed'));
+COMMIT;
+SET debug_sync='now WAIT_FOR master_queued2';
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued3';
+SET binlog_format=statement;
+INSERT INTO t3 VALUES (34, foo(34,
+'',
+''));
+SET debug_sync='now WAIT_FOR master_queued3';
+SET debug_sync='now SIGNAL master_cont1';
+SELECT * FROM t3 WHERE a >= 30 ORDER BY a;
+a b
+31 31
+32 32
+33 33
+34 34
+SET sql_log_bin=0;
+CALL mtr.add_suppression("Query execution was interrupted");
+CALL mtr.add_suppression("Commit failed due to failure of an earlier commit on which this one depends");
+CALL mtr.add_suppression("Slave: Connection was killed");
+SET sql_log_bin=1;
+SET debug_sync='now WAIT_FOR t2_query';
+SET debug_sync='now SIGNAL t2_cont';
+SET debug_sync='now WAIT_FOR t1_ready';
+KILL THD_ID;
+SET debug_sync='now WAIT_FOR t2_killed';
+SET debug_sync='now SIGNAL t1_cont';
+include/wait_for_slave_sql_error.inc [errno=1317,1927,1963]
+STOP SLAVE IO_THREAD;
+SELECT * FROM t3 WHERE a >= 30 ORDER BY a;
+a b
+31 31
+SET GLOBAL slave_parallel_threads=0;
+SET GLOBAL slave_parallel_threads=10;
+SET sql_log_bin=0;
+DROP FUNCTION foo;
+CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
+RETURNS INT DETERMINISTIC
+BEGIN
+RETURN x;
+END
+||
+SET sql_log_bin=1;
+INSERT INTO t3 VALUES (39,0);
+include/start_slave.inc
+SELECT * FROM t3 WHERE a >= 30 ORDER BY a;
+a b
+31 31
+32 32
+33 33
+34 34
+39 0
+SET sql_log_bin=0;
+DROP FUNCTION foo;
+CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
+RETURNS INT DETERMINISTIC
+BEGIN
+IF d1 != '' THEN
+SET debug_sync = d1;
+END IF;
+IF d2 != '' THEN
+SET debug_sync = d2;
+END IF;
+RETURN x;
+END
+||
+SET sql_log_bin=1;
+include/stop_slave.inc
+SET GLOBAL slave_parallel_threads=0;
+SET GLOBAL slave_parallel_threads=10;
+include/start_slave.inc
+include/stop_slave.inc
+SET GLOBAL slave_parallel_threads=@old_parallel_threads;
+include/start_slave.inc
+DROP function foo;
+DROP TABLE t3;
+include/rpl_end.inc
diff --git a/mysql-test/suite/rpl/t/rpl_parallel.test b/mysql-test/suite/rpl/t/rpl_parallel.test
index 5709cab19c0..a0232ac71e0 100644
--- a/mysql-test/suite/rpl/t/rpl_parallel.test
+++ b/mysql-test/suite/rpl/t/rpl_parallel.test
@@ -19,6 +19,7 @@ CHANGE MASTER TO master_use_gtid=slave_pos;
--echo *** Test long-running query in domain 1 can run in parallel with short queries in domain 0 ***
--connection server_1
+ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
CREATE TABLE t1 (a int PRIMARY KEY) ENGINE=MyISAM;
CREATE TABLE t2 (a int PRIMARY KEY) ENGINE=InnoDB;
INSERT INTO t1 VALUES (1);
@@ -334,6 +335,682 @@ SELECT * FROM t3 WHERE a >= 20 ORDER BY a;
--connection server_2
+# Respawn all worker threads to clear any left-over debug_sync or other stuff.
+--source include/stop_slave.inc
+SET GLOBAL binlog_format=@old_format;
+SET GLOBAL slave_parallel_threads=0;
+SET GLOBAL slave_parallel_threads=10;
+--source include/start_slave.inc
+
+
+--echo *** Test killing slave threads at various wait points ***
+--echo *** 1. Test killing transaction waiting in commit for previous transaction to commit ***
+
+# Set up three transactions on the master that will be group-committed
+# together so they can be replicated in parallel on the slave.
+--connection con_temp3
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued1 WAIT_FOR master_cont1';
+SET binlog_format=statement;
+send INSERT INTO t3 VALUES (31, foo(31,
+ 'commit_before_prepare_ordered WAIT_FOR t2_waiting',
+ 'commit_after_prepare_ordered SIGNAL t1_ready WAIT_FOR t1_cont'));
+
+--connection server_1
+SET debug_sync='now WAIT_FOR master_queued1';
+
+--connection con_temp4
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2';
+SET binlog_format=statement;
+BEGIN;
+# This insert is just so we can get T2 to wait while a query is running that we
+# can see in SHOW PROCESSLIST so we can get its thread_id to kill later.
+INSERT INTO t3 VALUES (32, foo(32,
+ 'ha_write_row_end SIGNAL t2_query WAIT_FOR t2_cont',
+ ''));
+# This insert sets up debug_sync points so that T2 will tell when it is at its
+# wait point where we want to kill it - and when it has been killed.
+INSERT INTO t3 VALUES (33, foo(33,
+ 'group_commit_waiting_for_prior SIGNAL t2_waiting',
+ 'group_commit_waiting_for_prior_killed SIGNAL t2_killed'));
+send COMMIT;
+
+--connection server_1
+SET debug_sync='now WAIT_FOR master_queued2';
+
+--connection con_temp5
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued3';
+SET binlog_format=statement;
+send INSERT INTO t3 VALUES (34, foo(34,
+ '',
+ ''));
+
+--connection server_1
+SET debug_sync='now WAIT_FOR master_queued3';
+SET debug_sync='now SIGNAL master_cont1';
+
+--connection con_temp3
+REAP;
+--connection con_temp4
+REAP;
+--connection con_temp5
+REAP;
+
+--connection server_1
+SELECT * FROM t3 WHERE a >= 30 ORDER BY a;
+
+--connection server_2
+SET sql_log_bin=0;
+CALL mtr.add_suppression("Query execution was interrupted");
+CALL mtr.add_suppression("Commit failed due to failure of an earlier commit on which this one depends");
+CALL mtr.add_suppression("Slave: Connection was killed");
+SET sql_log_bin=1;
+# Wait until T2 is inside executing its insert of 32, then find it in SHOW
+# PROCESSLIST to know its thread id for KILL later.
+SET debug_sync='now WAIT_FOR t2_query';
+--let $thd_id= `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE INFO LIKE '%foo(32%' AND INFO NOT LIKE '%LIKE%'`
+SET debug_sync='now SIGNAL t2_cont';
+
+# Wait until T2 has entered its wait for T1 to commit, and T1 has
+# progressed into its commit phase.
+SET debug_sync='now WAIT_FOR t1_ready';
+
+# Now kill the transaction T2.
+--replace_result $thd_id THD_ID
+eval KILL $thd_id;
+
+# Wait until T2 has reacted on the kill.
+SET debug_sync='now WAIT_FOR t2_killed';
+
+# Now we can allow T1 to proceed.
+SET debug_sync='now SIGNAL t1_cont';
+
+--let $slave_sql_errno= 1317,1963
+--source include/wait_for_slave_sql_error.inc
+STOP SLAVE IO_THREAD;
+SELECT * FROM t3 WHERE a >= 30 ORDER BY a;
+
+# Now we have to disable the debug_sync statements, so they do not trigger
+# when the events are retried.
+SET GLOBAL slave_parallel_threads=0;
+SET GLOBAL slave_parallel_threads=10;
+SET sql_log_bin=0;
+DROP FUNCTION foo;
+--delimiter ||
+CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
+ RETURNS INT DETERMINISTIC
+ BEGIN
+ RETURN x;
+ END
+||
+--delimiter ;
+SET sql_log_bin=1;
+
+--connection server_1
+INSERT INTO t3 VALUES (39,0);
+--save_master_pos
+
+--connection server_2
+--source include/start_slave.inc
+--sync_with_master
+SELECT * FROM t3 WHERE a >= 30 ORDER BY a;
+# Restore the foo() function.
+SET sql_log_bin=0;
+DROP FUNCTION foo;
+--delimiter ||
+CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
+ RETURNS INT DETERMINISTIC
+ BEGIN
+ IF d1 != '' THEN
+ SET debug_sync = d1;
+ END IF;
+ IF d2 != '' THEN
+ SET debug_sync = d2;
+ END IF;
+ RETURN x;
+ END
+||
+--delimiter ;
+SET sql_log_bin=1;
+
+
+--connection server_2
+# Respawn all worker threads to clear any left-over debug_sync or other stuff.
+--source include/stop_slave.inc
+SET GLOBAL binlog_format=@old_format;
+SET GLOBAL slave_parallel_threads=0;
+SET GLOBAL slave_parallel_threads=10;
+--source include/start_slave.inc
+
+
+--echo *** 2. Same as (1), but without restarting IO thread after kill of SQL threads ***
+
+# Set up three transactions on the master that will be group-committed
+# together so they can be replicated in parallel on the slave.
+--connection con_temp3
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued1 WAIT_FOR master_cont1';
+SET binlog_format=statement;
+send INSERT INTO t3 VALUES (41, foo(41,
+ 'commit_before_prepare_ordered WAIT_FOR t2_waiting',
+ 'commit_after_prepare_ordered SIGNAL t1_ready WAIT_FOR t1_cont'));
+
+--connection server_1
+SET debug_sync='now WAIT_FOR master_queued1';
+
+--connection con_temp4
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2';
+SET binlog_format=statement;
+BEGIN;
+# This insert is just so we can get T2 to wait while a query is running that we
+# can see in SHOW PROCESSLIST so we can get its thread_id to kill later.
+INSERT INTO t3 VALUES (42, foo(42,
+ 'ha_write_row_end SIGNAL t2_query WAIT_FOR t2_cont',
+ ''));
+# This insert sets up debug_sync points so that T2 will tell when it is at its
+# wait point where we want to kill it - and when it has been killed.
+INSERT INTO t3 VALUES (43, foo(43,
+ 'group_commit_waiting_for_prior SIGNAL t2_waiting',
+ 'group_commit_waiting_for_prior_killed SIGNAL t2_killed'));
+send COMMIT;
+
+--connection server_1
+SET debug_sync='now WAIT_FOR master_queued2';
+
+--connection con_temp5
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued3';
+SET binlog_format=statement;
+send INSERT INTO t3 VALUES (44, foo(44,
+ '',
+ ''));
+
+--connection server_1
+SET debug_sync='now WAIT_FOR master_queued3';
+SET debug_sync='now SIGNAL master_cont1';
+
+--connection con_temp3
+REAP;
+--connection con_temp4
+REAP;
+--connection con_temp5
+REAP;
+
+--connection server_1
+SELECT * FROM t3 WHERE a >= 40 ORDER BY a;
+
+--connection server_2
+# Wait until T2 is inside executing its insert of 42, then find it in SHOW
+# PROCESSLIST to know its thread id for KILL later.
+SET debug_sync='now WAIT_FOR t2_query';
+--let $thd_id= `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE INFO LIKE '%foo(42%' AND INFO NOT LIKE '%LIKE%'`
+SET debug_sync='now SIGNAL t2_cont';
+
+# Wait until T2 has entered its wait for T1 to commit, and T1 has
+# progressed into its commit phase.
+SET debug_sync='now WAIT_FOR t1_ready';
+
+# Now kill the transaction T2.
+--replace_result $thd_id THD_ID
+eval KILL $thd_id;
+
+# Wait until T2 has reacted on the kill.
+SET debug_sync='now WAIT_FOR t2_killed';
+
+# Now we can allow T1 to proceed.
+SET debug_sync='now SIGNAL t1_cont';
+
+--let $slave_sql_errno= 1317,1963
+--source include/wait_for_slave_sql_error.inc
+SELECT * FROM t3 WHERE a >= 40 ORDER BY a;
+
+# Now we have to disable the debug_sync statements, so they do not trigger
+# when the events are retried.
+SET GLOBAL slave_parallel_threads=0;
+SET GLOBAL slave_parallel_threads=10;
+SET sql_log_bin=0;
+DROP FUNCTION foo;
+--delimiter ||
+CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
+ RETURNS INT DETERMINISTIC
+ BEGIN
+ RETURN x;
+ END
+||
+--delimiter ;
+SET sql_log_bin=1;
+
+--connection server_1
+INSERT INTO t3 VALUES (49,0);
+--save_master_pos
+
+--connection server_2
+START SLAVE SQL_THREAD;
+--sync_with_master
+SELECT * FROM t3 WHERE a >= 40 ORDER BY a;
+# Restore the foo() function.
+SET sql_log_bin=0;
+DROP FUNCTION foo;
+--delimiter ||
+CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
+ RETURNS INT DETERMINISTIC
+ BEGIN
+ IF d1 != '' THEN
+ SET debug_sync = d1;
+ END IF;
+ IF d2 != '' THEN
+ SET debug_sync = d2;
+ END IF;
+ RETURN x;
+ END
+||
+--delimiter ;
+SET sql_log_bin=1;
+
+
+--connection server_2
+# Respawn all worker threads to clear any left-over debug_sync or other stuff.
+--source include/stop_slave.inc
+SET GLOBAL binlog_format=@old_format;
+SET GLOBAL slave_parallel_threads=0;
+SET GLOBAL slave_parallel_threads=10;
+--source include/start_slave.inc
+
+
+--echo *** 3. Same as (2), but not using gtid mode ***
+
+--connection server_2
+--source include/stop_slave.inc
+CHANGE MASTER TO master_use_gtid=no;
+--source include/start_slave.inc
+
+--connection server_1
+# Set up three transactions on the master that will be group-committed
+# together so they can be replicated in parallel on the slave.
+--connection con_temp3
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued1 WAIT_FOR master_cont1';
+SET binlog_format=statement;
+send INSERT INTO t3 VALUES (51, foo(51,
+ 'commit_before_prepare_ordered WAIT_FOR t2_waiting',
+ 'commit_after_prepare_ordered SIGNAL t1_ready WAIT_FOR t1_cont'));
+
+--connection server_1
+SET debug_sync='now WAIT_FOR master_queued1';
+
+--connection con_temp4
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2';
+SET binlog_format=statement;
+BEGIN;
+# This insert is just so we can get T2 to wait while a query is running that we
+# can see in SHOW PROCESSLIST so we can get its thread_id to kill later.
+INSERT INTO t3 VALUES (52, foo(52,
+ 'ha_write_row_end SIGNAL t2_query WAIT_FOR t2_cont',
+ ''));
+# This insert sets up debug_sync points so that T2 will tell when it is at its
+# wait point where we want to kill it - and when it has been killed.
+INSERT INTO t3 VALUES (53, foo(53,
+ 'group_commit_waiting_for_prior SIGNAL t2_waiting',
+ 'group_commit_waiting_for_prior_killed SIGNAL t2_killed'));
+send COMMIT;
+
+--connection server_1
+SET debug_sync='now WAIT_FOR master_queued2';
+
+--connection con_temp5
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued3';
+SET binlog_format=statement;
+send INSERT INTO t3 VALUES (54, foo(54,
+ '',
+ ''));
+
+--connection server_1
+SET debug_sync='now WAIT_FOR master_queued3';
+SET debug_sync='now SIGNAL master_cont1';
+
+--connection con_temp3
+REAP;
+--connection con_temp4
+REAP;
+--connection con_temp5
+REAP;
+
+--connection server_1
+SELECT * FROM t3 WHERE a >= 50 ORDER BY a;
+
+--connection server_2
+# Wait until T2 is inside executing its insert of 52, then find it in SHOW
+# PROCESSLIST to know its thread id for KILL later.
+SET debug_sync='now WAIT_FOR t2_query';
+--let $thd_id= `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE INFO LIKE '%foo(52%' AND INFO NOT LIKE '%LIKE%'`
+SET debug_sync='now SIGNAL t2_cont';
+
+# Wait until T2 has entered its wait for T1 to commit, and T1 has
+# progressed into its commit phase.
+SET debug_sync='now WAIT_FOR t1_ready';
+
+# Now kill the transaction T2.
+--replace_result $thd_id THD_ID
+eval KILL $thd_id;
+
+# Wait until T2 has reacted on the kill.
+SET debug_sync='now WAIT_FOR t2_killed';
+
+# Now we can allow T1 to proceed.
+SET debug_sync='now SIGNAL t1_cont';
+
+--let $slave_sql_errno= 1317,1963
+--source include/wait_for_slave_sql_error.inc
+SELECT * FROM t3 WHERE a >= 50 ORDER BY a;
+
+# Now we have to disable the debug_sync statements, so they do not trigger
+# when the events are retried.
+SET GLOBAL slave_parallel_threads=0;
+SET GLOBAL slave_parallel_threads=10;
+SET sql_log_bin=0;
+DROP FUNCTION foo;
+--delimiter ||
+CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
+ RETURNS INT DETERMINISTIC
+ BEGIN
+ RETURN x;
+ END
+||
+--delimiter ;
+SET sql_log_bin=1;
+
+--connection server_1
+INSERT INTO t3 VALUES (59,0);
+--save_master_pos
+
+--connection server_2
+START SLAVE SQL_THREAD;
+--sync_with_master
+SELECT * FROM t3 WHERE a >= 50 ORDER BY a;
+# Restore the foo() function.
+SET sql_log_bin=0;
+DROP FUNCTION foo;
+--delimiter ||
+CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
+ RETURNS INT DETERMINISTIC
+ BEGIN
+ IF d1 != '' THEN
+ SET debug_sync = d1;
+ END IF;
+ IF d2 != '' THEN
+ SET debug_sync = d2;
+ END IF;
+ RETURN x;
+ END
+||
+--delimiter ;
+SET sql_log_bin=1;
+
+
+--source include/stop_slave.inc
+CHANGE MASTER TO master_use_gtid=slave_pos;
+--source include/start_slave.inc
+
+--connection server_2
+# Respawn all worker threads to clear any left-over debug_sync or other stuff.
+--source include/stop_slave.inc
+SET GLOBAL binlog_format=@old_format;
+SET GLOBAL slave_parallel_threads=0;
+SET GLOBAL slave_parallel_threads=3;
+--source include/start_slave.inc
+
+
+--echo *** 4. Test killing thread that is waiting to start transaction until previous transaction commits ***
+
+# We set up four transactions T1, T2, T3, and T4 on the master. T2, T3, and T4
+# can run in parallel with each other (same group commit and commit id),
+# but not in parallel with T1.
+#
+# We use three worker threads. T1 and T2 will be queued on the first, T3 on
+# the second, and T4 on the third. We will delay T1 commit, T3 will wait for
+# T1 to commit before it can start. We will kill T3 during this wait, and
+# check that everything works correctly.
+#
+# It is rather tricky to get the correct thread id of the worker to kill.
+# We start by injecting three dummy transactions in a debug_sync-controlled
+# manner to be able to get known thread ids for the workers in a pool with
+# just 3 worker threads. Then we let in each of the real test transactions
+# T1-T4 one at a time in a way which allows us to know which transaction
+# ends up with which thread id.
+
+--connection server_1
+SET binlog_format=statement;
+SET gtid_domain_id=2;
+INSERT INTO t3 VALUES (60, foo(60,
+ 'ha_write_row_end SIGNAL d2_query WAIT_FOR d2_cont2',
+ 'rpl_parallel_end_of_group SIGNAL d2_done WAIT_FOR d2_cont'));
+SET gtid_domain_id=0;
+
+--connection server_2
+SET debug_sync='now WAIT_FOR d2_query';
+--let $d2_thd_id= `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE INFO LIKE '%foo(60%' AND INFO NOT LIKE '%LIKE%'`
+
+--connection server_1
+SET gtid_domain_id=1;
+BEGIN;
+# These debug_sync's will linger on and be used to control T3 later.
+INSERT INTO t3 VALUES (61, foo(61,
+ 'rpl_parallel_start_waiting_for_prior SIGNAL t3_waiting',
+ 'rpl_parallel_start_waiting_for_prior_killed SIGNAL t3_killed'));
+INSERT INTO t3 VALUES (62, foo(62,
+ 'ha_write_row_end SIGNAL d1_query WAIT_FOR d1_cont2',
+ 'rpl_parallel_end_of_group SIGNAL d1_done WAIT_FOR d1_cont'));
+COMMIT;
+SET gtid_domain_id=0;
+
+--connection server_2
+SET debug_sync='now WAIT_FOR d1_query';
+--let $d1_thd_id= `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE INFO LIKE '%foo(62%' AND INFO NOT LIKE '%LIKE%'`
+
+--connection server_1
+SET gtid_domain_id=0;
+INSERT INTO t3 VALUES (63, foo(63,
+ 'ha_write_row_end SIGNAL d0_query WAIT_FOR d0_cont2',
+ 'rpl_parallel_end_of_group SIGNAL d0_done WAIT_FOR d0_cont'));
+
+--connection server_2
+SET debug_sync='now WAIT_FOR d0_query';
+--let $d0_thd_id= `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE INFO LIKE '%foo(63%' AND INFO NOT LIKE '%LIKE%'`
+
+SET debug_sync='now SIGNAL d2_cont2';
+SET debug_sync='now WAIT_FOR d2_done';
+SET debug_sync='now SIGNAL d1_cont2';
+SET debug_sync='now WAIT_FOR d1_done';
+SET debug_sync='now SIGNAL d0_cont2';
+SET debug_sync='now WAIT_FOR d0_done';
+
+# Now prepare the real transactions T1, T2, T3, T4 on the master.
+
+--connection con_temp3
+# Create transaction T1.
+SET binlog_format=statement;
+INSERT INTO t3 VALUES (64, foo(64,
+ 'commit_before_prepare_ordered SIGNAL t1_waiting WAIT_FOR t1_cont', ''));
+
+# Create transaction T2, as a group commit leader on the master.
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2 WAIT_FOR master_cont2';
+send INSERT INTO t3 VALUES (65, foo(65, '', ''));
+
+--connection server_1
+SET debug_sync='now WAIT_FOR master_queued2';
+
+--connection con_temp4
+# Create transaction T3, participating in T2's group commit.
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued3';
+send INSERT INTO t3 VALUES (66, foo(66, '', ''));
+
+--connection server_1
+SET debug_sync='now WAIT_FOR master_queued3';
+
+--connection con_temp5
+# Create transaction T4, participating in group commit with T2 and T3.
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued4';
+send INSERT INTO t3 VALUES (67, foo(67, '', ''));
+
+--connection server_1
+SET debug_sync='now WAIT_FOR master_queued4';
+SET debug_sync='now SIGNAL master_cont2';
+
+--connection con_temp3
+REAP;
+--connection con_temp4
+REAP;
+--connection con_temp5
+REAP;
+
+--connection server_1
+SELECT * FROM t3 WHERE a >= 60 ORDER BY a;
+
+--connection server_2
+# Now we have the four transactions pending for replication on the slave.
+# Let them be queued for our three worker threads in a controlled fashion.
+# We put them at a stage where T1 is delayed and T3 is waiting for T1 to
+# commit before T3 can start. Then we kill T3.
+
+# Make the worker D0 free, and wait for T1 to be queued in it.
+SET debug_sync='now SIGNAL d0_cont';
+SET debug_sync='now WAIT_FOR t1_waiting';
+
+# T2 will be queued on the same worker D0 as T1.
+# Now release worker D1, and wait for T3 to be queued in it.
+# T3 will wait for T1 to commit before it can start.
+SET debug_sync='now SIGNAL d1_cont';
+SET debug_sync='now WAIT_FOR t3_waiting';
+
+# Release worker D2. T4 may or may not have time to be queued on it, but
+# it will not be able to complete due to T3 being killed.
+SET debug_sync='now SIGNAL d2_cont';
+
+# Now we kill the waiting transaction T3 in worker D1.
+--replace_result $d1_thd_id THD_ID
+eval KILL $d1_thd_id;
+
+# Wait until T3 has reacted on the kill.
+SET debug_sync='now WAIT_FOR t3_killed';
+
+# Now we can allow T1 to proceed.
+SET debug_sync='now SIGNAL t1_cont';
+
+--let $slave_sql_errno= 1317,1927,1963
+--source include/wait_for_slave_sql_error.inc
+STOP SLAVE IO_THREAD;
+SELECT * FROM t3 WHERE a >= 60 ORDER BY a;
+
+# Now we have to disable the debug_sync statements, so they do not trigger
+# when the events are retried.
+SET GLOBAL slave_parallel_threads=0;
+SET GLOBAL slave_parallel_threads=10;
+SET sql_log_bin=0;
+DROP FUNCTION foo;
+--delimiter ||
+CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
+ RETURNS INT DETERMINISTIC
+ BEGIN
+ RETURN x;
+ END
+||
+--delimiter ;
+SET sql_log_bin=1;
+
+--connection server_1
+INSERT INTO t3 VALUES (69,0);
+--save_master_pos
+
+--connection server_2
+--source include/start_slave.inc
+--sync_with_master
+SELECT * FROM t3 WHERE a >= 60 ORDER BY a;
+# Restore the foo() function.
+SET sql_log_bin=0;
+DROP FUNCTION foo;
+--delimiter ||
+CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
+ RETURNS INT DETERMINISTIC
+ BEGIN
+ IF d1 != '' THEN
+ SET debug_sync = d1;
+ END IF;
+ IF d2 != '' THEN
+ SET debug_sync = d2;
+ END IF;
+ RETURN x;
+ END
+||
+--delimiter ;
+SET sql_log_bin=1;
+
+--connection server_2
+# Respawn all worker threads to clear any left-over debug_sync or other stuff.
+--source include/stop_slave.inc
+SET GLOBAL binlog_format=@old_format;
+SET GLOBAL slave_parallel_threads=0;
+SET GLOBAL slave_parallel_threads=10;
+--source include/start_slave.inc
+
+
+--echo *** 5. Test killing thread that is waiting for queue of max length to shorten ***
+
+--let $wait_condition= SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE STATE LIKE '%Slave has read all relay log%'
+--source include/wait_condition.inc
+--let $thd_id= `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE STATE LIKE '%Slave has read all relay log%'`
+SET @old_max_queued= @@GLOBAL.slave_parallel_max_queued;
+SET GLOBAL slave_parallel_max_queued=9000;
+
+--connection server_1
+--let bigstring= `SELECT REPEAT('x', 10000)`
+SET binlog_format=statement;
+# Create an event that will wait to be signalled.
+INSERT INTO t3 VALUES (70, foo(0,
+ 'ha_write_row_end SIGNAL query_waiting WAIT_FOR query_cont', ''));
+--disable_query_log
+# Create an event that will fill up the queue.
+eval INSERT INTO t3 VALUES (71, LENGTH('$bigstring'));
+--enable_query_log
+
+--connection server_2
+SET debug_sync='now WAIT_FOR query_waiting';
+# Inject that the SQL driver thread will signal `wait_queue_ready' to debug_sync
+# as it goes to wait for the event queue to become smaller than the value of
+# @@slave_parallel_max_queued.
+SET @old_dbug= @@GLOBAL.debug_dbug;
+SET GLOBAL debug_dbug="+d,rpl_parallel_wait_queue_max";
+
+--connection server_1
+# This event will have to wait for the queue to become shorter before it can
+# be queued. We will test that things work when we kill the SQL driver thread
+# during this wait.
+INSERT INTO t3 VALUES (72, 0);
+SELECT * FROM t3 WHERE a >= 70 ORDER BY a;
+
+--connection server_2
+SET debug_sync='now WAIT_FOR wait_queue_ready';
+
+--replace_result $thd_id THD_ID
+eval KILL $thd_id;
+
+SET debug_sync='now WAIT_FOR wait_queue_killed';
+SET debug_sync='now SIGNAL query_cont';
+
+--let $slave_sql_errno= 1317,1927,1963
+--source include/wait_for_slave_sql_error.inc
+STOP SLAVE IO_THREAD;
+SELECT * FROM t3 WHERE a >= 70 ORDER BY a;
+
+SET GLOBAL debug_dbug=@old_dbug;
+SET GLOBAL slave_parallel_max_queued= @old_max_queued;
+
+--connection server_1
+INSERT INTO t3 VALUES (73,0);
+--save_master_pos
+
+--connection server_2
+--source include/start_slave.inc
+--sync_with_master
+SELECT * FROM t3 WHERE a >= 70 ORDER BY a;
+
+
+--connection server_2
--source include/stop_slave.inc
SET GLOBAL binlog_format=@old_format;
SET GLOBAL slave_parallel_threads=0;
diff --git a/mysql-test/suite/rpl/t/rpl_parallel_no_log_slave_updates-slave.opt b/mysql-test/suite/rpl/t/rpl_parallel_no_log_slave_updates-slave.opt
new file mode 100644
index 00000000000..acd68493e0a
--- /dev/null
+++ b/mysql-test/suite/rpl/t/rpl_parallel_no_log_slave_updates-slave.opt
@@ -0,0 +1 @@
+--log-slave-updates=0
diff --git a/mysql-test/suite/rpl/t/rpl_parallel_no_log_slave_updates.test b/mysql-test/suite/rpl/t/rpl_parallel_no_log_slave_updates.test
new file mode 100644
index 00000000000..98f919e4727
--- /dev/null
+++ b/mysql-test/suite/rpl/t/rpl_parallel_no_log_slave_updates.test
@@ -0,0 +1,199 @@
+--source include/have_innodb.inc
+--source include/have_debug.inc
+--source include/have_debug_sync.inc
+--source include/have_binlog_format_statement.inc
+--let $rpl_topology=1->2
+--source include/rpl_init.inc
+
+--echo *** Test killing transaction waiting in commit for previous transaction to commit, when not using 2-phase commit ***
+
+--connection server_2
+SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
+--source include/stop_slave.inc
+SET GLOBAL slave_parallel_threads=10;
+CHANGE MASTER TO master_use_gtid=slave_pos;
+--source include/start_slave.inc
+
+--connection server_1
+ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
+# Use a stored function to inject a debug_sync into the appropriate THD.
+# The function does nothing on the master, and on the slave it injects the
+# desired debug_sync action(s).
+SET sql_log_bin=0;
+--delimiter ||
+CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
+ RETURNS INT DETERMINISTIC
+ BEGIN
+ RETURN x;
+ END
+||
+--delimiter ;
+SET sql_log_bin=1;
+CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
+--save_master_pos
+
+--connection server_2
+SET sql_log_bin=0;
+--delimiter ||
+CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
+ RETURNS INT DETERMINISTIC
+ BEGIN
+ IF d1 != '' THEN
+ SET debug_sync = d1;
+ END IF;
+ IF d2 != '' THEN
+ SET debug_sync = d2;
+ END IF;
+ RETURN x;
+ END
+||
+--delimiter ;
+SET sql_log_bin=1;
+--sync_with_master
+
+
+# Set up three transactions on the master that will be group-committed
+# together so they can be replicated in parallel on the slave.
+--connect (con_temp3,127.0.0.1,root,,test,$SERVER_MYPORT_1,)
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued1 WAIT_FOR master_cont1';
+SET binlog_format=statement;
+send INSERT INTO t3 VALUES (31, foo(31,
+ 'ha_commit_one_phase WAIT_FOR t2_waiting',
+ 'commit_one_phase_2 SIGNAL t1_ready WAIT_FOR t1_cont'));
+
+--connection server_1
+SET debug_sync='now WAIT_FOR master_queued1';
+
+--connect (con_temp4,127.0.0.1,root,,test,$SERVER_MYPORT_1,)
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2';
+SET binlog_format=statement;
+BEGIN;
+# This insert is just so we can get T2 to wait while a query is running that we
+# can see in SHOW PROCESSLIST so we can get its thread_id to kill later.
+INSERT INTO t3 VALUES (32, foo(32,
+ 'ha_write_row_end SIGNAL t2_query WAIT_FOR t2_cont',
+ ''));
+# This insert sets up debug_sync points so that T2 will tell when it is at its
+# wait point where we want to kill it - and when it has been killed.
+INSERT INTO t3 VALUES (33, foo(33,
+ 'wait_for_prior_commit_waiting SIGNAL t2_waiting',
+ 'wait_for_prior_commit_killed SIGNAL t2_killed'));
+send COMMIT;
+
+--connection server_1
+SET debug_sync='now WAIT_FOR master_queued2';
+
+--connect (con_temp5,127.0.0.1,root,,test,$SERVER_MYPORT_1,)
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued3';
+SET binlog_format=statement;
+send INSERT INTO t3 VALUES (34, foo(34,
+ '',
+ ''));
+
+--connection server_1
+SET debug_sync='now WAIT_FOR master_queued3';
+SET debug_sync='now SIGNAL master_cont1';
+
+--connection con_temp3
+REAP;
+--connection con_temp4
+REAP;
+--connection con_temp5
+REAP;
+
+--connection server_1
+SELECT * FROM t3 WHERE a >= 30 ORDER BY a;
+
+--connection server_2
+SET sql_log_bin=0;
+CALL mtr.add_suppression("Query execution was interrupted");
+CALL mtr.add_suppression("Commit failed due to failure of an earlier commit on which this one depends");
+CALL mtr.add_suppression("Slave: Connection was killed");
+SET sql_log_bin=1;
+# Wait until T2 is inside executing its insert of 32, then find it in SHOW
+# PROCESSLIST to know its thread id for KILL later.
+SET debug_sync='now WAIT_FOR t2_query';
+--let $thd_id= `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE INFO LIKE '%foo(32%' AND INFO NOT LIKE '%LIKE%'`
+SET debug_sync='now SIGNAL t2_cont';
+
+# Wait until T2 has entered its wait for T1 to commit, and T1 has
+# progressed into its commit phase.
+SET debug_sync='now WAIT_FOR t1_ready';
+
+# Now kill the transaction T2.
+--replace_result $thd_id THD_ID
+eval KILL $thd_id;
+
+# Wait until T2 has reacted on the kill.
+SET debug_sync='now WAIT_FOR t2_killed';
+
+# Now we can allow T1 to proceed.
+SET debug_sync='now SIGNAL t1_cont';
+
+--let $slave_sql_errno= 1317,1927,1963
+--source include/wait_for_slave_sql_error.inc
+STOP SLAVE IO_THREAD;
+SELECT * FROM t3 WHERE a >= 30 ORDER BY a;
+
+# Now we have to disable the debug_sync statements, so they do not trigger
+# when the events are retried.
+SET GLOBAL slave_parallel_threads=0;
+SET GLOBAL slave_parallel_threads=10;
+SET sql_log_bin=0;
+DROP FUNCTION foo;
+--delimiter ||
+CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
+ RETURNS INT DETERMINISTIC
+ BEGIN
+ RETURN x;
+ END
+||
+--delimiter ;
+SET sql_log_bin=1;
+
+--connection server_1
+INSERT INTO t3 VALUES (39,0);
+--save_master_pos
+
+--connection server_2
+--source include/start_slave.inc
+--sync_with_master
+SELECT * FROM t3 WHERE a >= 30 ORDER BY a;
+# Restore the foo() function.
+SET sql_log_bin=0;
+DROP FUNCTION foo;
+--delimiter ||
+CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
+ RETURNS INT DETERMINISTIC
+ BEGIN
+ IF d1 != '' THEN
+ SET debug_sync = d1;
+ END IF;
+ IF d2 != '' THEN
+ SET debug_sync = d2;
+ END IF;
+ RETURN x;
+ END
+||
+--delimiter ;
+SET sql_log_bin=1;
+
+
+--connection server_2
+# Respawn all worker threads to clear any left-over debug_sync or other stuff.
+--source include/stop_slave.inc
+SET GLOBAL slave_parallel_threads=0;
+SET GLOBAL slave_parallel_threads=10;
+--source include/start_slave.inc
+
+
+--connection server_2
+--source include/stop_slave.inc
+SET GLOBAL slave_parallel_threads=@old_parallel_threads;
+--source include/start_slave.inc
+
+--connection server_1
+DROP function foo;
+DROP TABLE t3;
+
+--source include/rpl_end.inc
diff --git a/sql/handler.cc b/sql/handler.cc
index 1518f2baaa0..1c5d643ed96 100644
--- a/sql/handler.cc
+++ b/sql/handler.cc
@@ -1300,7 +1300,10 @@ int ha_commit_trans(THD *thd, bool all)
{
/* Free resources and perform other cleanup even for 'empty' transactions. */
if (is_real_trans)
+ {
thd->transaction.cleanup();
+ thd->wakeup_subsequent_commits(error);
+ }
DBUG_RETURN(0);
}
@@ -1334,6 +1337,7 @@ int ha_commit_trans(THD *thd, bool all)
thd->variables.lock_wait_timeout))
{
ha_rollback_trans(thd, all);
+ thd->wakeup_subsequent_commits(1);
DBUG_RETURN(1);
}
@@ -1421,6 +1425,7 @@ done:
err:
error= 1; /* Transaction was rolled back */
ha_rollback_trans(thd, all);
+ thd->wakeup_subsequent_commits(error);
end:
if (rw_trans && mdl_request.ticket)
@@ -1466,8 +1471,12 @@ int ha_commit_one_phase(THD *thd, bool all)
bool is_real_trans=all || thd->transaction.all.ha_list == 0;
int res;
DBUG_ENTER("ha_commit_one_phase");
- if (is_real_trans && (res= thd->wait_for_prior_commit()))
- DBUG_RETURN(res);
+ if (is_real_trans)
+ {
+ DEBUG_SYNC(thd, "ha_commit_one_phase");
+ if ((res= thd->wait_for_prior_commit()))
+ DBUG_RETURN(res);
+ }
res= commit_one_phase_2(thd, all, trans, is_real_trans);
DBUG_RETURN(res);
}
@@ -1479,6 +1488,8 @@ commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans)
int error= 0;
Ha_trx_info *ha_info= trans->ha_list, *ha_info_next;
DBUG_ENTER("commit_one_phase_2");
+ if (is_real_trans)
+ DEBUG_SYNC(thd, "commit_one_phase_2");
if (ha_info)
{
for (; ha_info; ha_info= ha_info_next)
@@ -1591,10 +1602,7 @@ int ha_rollback_trans(THD *thd, bool all)
/* Always cleanup. Even if nht==0. There may be savepoints. */
if (is_real_trans)
- {
- thd->wakeup_subsequent_commits(error);
thd->transaction.cleanup();
- }
if (all)
thd->transaction_rollback_request= FALSE;
diff --git a/sql/log.cc b/sql/log.cc
index ee7e22cbe0c..fbb73acf5d1 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -6612,16 +6612,17 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd,
to commit. If so, we add those to the queue as well, transitively for all
waiters.
- @retval TRUE If queued as the first entry in the queue (meaning this
- is the leader)
- @retval FALSE Otherwise
+ @retval < 0 Error
+ @retval > 0 If queued as the first entry in the queue (meaning this
+ is the leader)
+ @retval 0 Otherwise (queued as participant, leader handles the commit)
*/
-bool
+int
MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
{
group_commit_entry *entry, *orig_queue;
- wait_for_commit *list, *cur, *last;
+ wait_for_commit *cur, *last;
wait_for_commit *wfc;
DBUG_ENTER("MYSQL_BIN_LOG::queue_for_group_commit");
@@ -6641,6 +6642,7 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
/* Do an extra check here, this time safely under lock. */
if (wfc->waiting_for_commit)
{
+ const char *old_msg;
/*
By setting wfc->opaque_pointer to our own entry, we mark that we are
ready to commit, but waiting for another transaction to commit before
@@ -6651,16 +6653,58 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
queued_by_other flag is set.
*/
wfc->opaque_pointer= orig_entry;
+ old_msg=
+ orig_entry->thd->enter_cond(&wfc->COND_wait_commit,
+ &wfc->LOCK_wait_commit,
+ "Waiting for prior transaction to commit");
DEBUG_SYNC(orig_entry->thd, "group_commit_waiting_for_prior");
- do
- {
+ while (wfc->waiting_for_commit && !orig_entry->thd->check_killed())
mysql_cond_wait(&wfc->COND_wait_commit, &wfc->LOCK_wait_commit);
- } while (wfc->waiting_for_commit);
wfc->opaque_pointer= NULL;
DBUG_PRINT("info", ("After waiting for prior commit, queued_by_other=%d",
orig_entry->queued_by_other));
+
+ if (wfc->waiting_for_commit)
+ {
+ /* Wait terminated due to kill. */
+ wait_for_commit *loc_waitee= wfc->waitee;
+ mysql_mutex_lock(&loc_waitee->LOCK_wait_commit);
+ if (loc_waitee->wakeup_subsequent_commits_running ||
+ orig_entry->queued_by_other)
+ {
+ /* Our waitee is already waking us up, so ignore the kill. */
+ mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
+ do
+ {
+ mysql_cond_wait(&wfc->COND_wait_commit, &wfc->LOCK_wait_commit);
+ } while (wfc->waiting_for_commit);
+ }
+ else
+ {
+ /* We were killed, so remove us from the list of waitee. */
+ wfc->remove_from_list(&loc_waitee->subsequent_commits_list);
+ mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
+
+ orig_entry->thd->exit_cond(old_msg);
+ /* Interrupted by kill. */
+ DEBUG_SYNC(orig_entry->thd, "group_commit_waiting_for_prior_killed");
+ wfc->wakeup_error= orig_entry->thd->killed_errno();
+ if (wfc->wakeup_error)
+ wfc->wakeup_error= ER_QUERY_INTERRUPTED;
+ my_message(wfc->wakeup_error, ER(wfc->wakeup_error), MYF(0));
+ DBUG_RETURN(-1);
+ }
+ }
+ orig_entry->thd->exit_cond(old_msg);
+ }
+ else
+ mysql_mutex_unlock(&wfc->LOCK_wait_commit);
+
+ if (wfc->wakeup_error)
+ {
+ my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
+ DBUG_RETURN(-1);
}
- mysql_mutex_unlock(&wfc->LOCK_wait_commit);
}
/*
@@ -6669,7 +6713,7 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
then there is nothing else to do.
*/
if (orig_entry->queued_by_other)
- DBUG_RETURN(false);
+ DBUG_RETURN(0);
/* Now enqueue ourselves in the group commit queue. */
DEBUG_SYNC(orig_entry->thd, "commit_before_enqueue");
@@ -6707,9 +6751,8 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
used by the caller or any other function.
*/
- list= wfc;
- cur= list;
- last= list;
+ cur= wfc;
+ last= wfc;
entry= orig_entry;
for (;;)
{
@@ -6735,11 +6778,11 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
*/
if (cur->subsequent_commits_list)
{
- bool have_lock;
wait_for_commit *waiter;
+ wait_for_commit *wakeup_list= NULL;
+ wait_for_commit **wakeup_next_ptr= &wakeup_list;
mysql_mutex_lock(&cur->LOCK_wait_commit);
- have_lock= true;
/*
Grab the list, now safely under lock, and process it if still
non-empty.
@@ -6780,18 +6823,68 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
For this, we need to set the "wakeup running" flag and release
the waitee lock to avoid a deadlock, see comments on
THD::wakeup_subsequent_commits2() for details.
+
+ So we need to put these on a list and delay the wakeup until we
+ have released the lock.
*/
- if (have_lock)
+ *wakeup_next_ptr= waiter;
+ wakeup_next_ptr= &waiter->next_subsequent_commit;
+ }
+ waiter= next;
+ }
+ if (wakeup_list)
+ {
+ /* Now release our lock and do the wakeups that were delayed above. */
+ cur->wakeup_subsequent_commits_running= true;
+ mysql_mutex_unlock(&cur->LOCK_wait_commit);
+ for (;;)
+ {
+ wait_for_commit *next;
+
+ /*
+ ToDo: We wakeup the waiter here, so that it can have the chance to
+ reach its own commit state and queue up for this same group commit,
+ if it is still pending.
+
+ One problem with this is that if the waiter does not reach its own
+ commit state before this group commit starts, and then the group
+ commit fails (binlog write failure), we do not get to propagate
+ the error to the waiter.
+
+ A solution for this could be to delay the wakeup until commit is
+ successful. But then we need to set a flag in the waitee that it is
+ already queued for group commit, so that the waiter can check this
+ flag and queue itself if it _does_ reach the commit state in time.
+
+ (But error handling in case of binlog write failure is currently
+ broken in other ways, as well).
+ */
+ if (&wakeup_list->next_subsequent_commit == wakeup_next_ptr)
{
- have_lock= false;
- cur->wakeup_subsequent_commits_running= true;
- mysql_mutex_unlock(&cur->LOCK_wait_commit);
+ /* The last one in the list. */
+ wakeup_list->wakeup(0);
+ break;
}
- waiter->wakeup(0);
+ /*
+ Important: don't access wakeup_list->next after the wakeup() call,
+ it may be invalidated by the other thread.
+ */
+ next= wakeup_list->next_subsequent_commit;
+ wakeup_list->wakeup(0);
+ wakeup_list= next;
}
- waiter= next;
+ /*
+ We need a full memory barrier between walking the list and clearing
+ the flag wakeup_subsequent_commits_running. This barrier is needed
+ to ensure that no other thread will start to modify the list
+ pointers before we are done traversing the list.
+
+ But wait_for_commit::wakeup(), which was called above, does a full
+ memory barrier already (it locks a mutex).
+ */
+ cur->wakeup_subsequent_commits_running= false;
}
- if (have_lock)
+ else
mysql_mutex_unlock(&cur->LOCK_wait_commit);
}
if (cur == last)
@@ -6805,29 +6898,6 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
DBUG_ASSERT(entry != NULL);
}
- /*
- Now we need to clear the wakeup_subsequent_commits_running flags.
-
- We need a full memory barrier between walking the list above, and clearing
- the flag wakeup_subsequent_commits_running below. This barrier is needed
- to ensure that no other thread will start to modify the list pointers
- before we are done traversing the list.
-
- But wait_for_commit::wakeup(), which was called above for any other thread
- that might modify the list in parallel, does a full memory barrier already
- (it locks a mutex).
- */
- if (list)
- {
- for (;;)
- {
- list->wakeup_subsequent_commits_running= false;
- if (list == last)
- break;
- list= list->next_subsequent_commit;
- }
- }
-
if (opt_binlog_commit_wait_count > 0)
mysql_cond_signal(&COND_prepare_ordered);
mysql_mutex_unlock(&LOCK_prepare_ordered);
@@ -6841,13 +6911,15 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
bool
MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
{
- bool is_leader= queue_for_group_commit(entry);
+ int is_leader= queue_for_group_commit(entry);
/*
The first in the queue handles group commit for all; the others just wait
to be signalled when group commit is done.
*/
- if (is_leader)
+ if (is_leader < 0)
+ return true; /* Error */
+ else if (is_leader)
trx_group_commit_leader(entry);
else if (!entry->queued_by_other)
entry->thd->wait_for_wakeup_ready();
diff --git a/sql/log.h b/sql/log.h
index 73518d2594f..45381152d97 100644
--- a/sql/log.h
+++ b/sql/log.h
@@ -540,7 +540,7 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
void do_checkpoint_request(ulong binlog_id);
void purge();
int write_transaction_or_stmt(group_commit_entry *entry, uint64 commit_id);
- bool queue_for_group_commit(group_commit_entry *entry);
+ int queue_for_group_commit(group_commit_entry *entry);
bool write_transaction_to_binlog_events(group_commit_entry *entry);
void trx_group_commit_leader(group_commit_entry *leader);
bool is_xidlist_idle_nolock();
diff --git a/sql/log_event.cc b/sql/log_event.cc
index d6c7db1e215..d205a08e708 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -7158,7 +7158,7 @@ int Xid_log_event::do_apply_event(rpl_group_info *rgi)
res= trans_commit(thd); /* Automatically rolls back on error. */
thd->mdl_context.release_transactional_locks();
- if (sub_id)
+ if (!res && sub_id)
rpl_global_gtid_slave_state.update_state_hash(sub_id, &gtid);
/*
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 109db6ef681..af44d79038c 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -2,6 +2,7 @@
#include "rpl_parallel.h"
#include "slave.h"
#include "rpl_mi.h"
+#include "debug_sync.h"
/*
@@ -24,7 +25,7 @@ static int
rpt_handle_event(rpl_parallel_thread::queued_event *qev,
struct rpl_parallel_thread *rpt)
{
- int err __attribute__((unused));
+ int err;
rpl_group_info *rgi= qev->rgi;
Relay_log_info *rli= rgi->rli;
THD *thd= rgi->thd;
@@ -142,7 +143,7 @@ finish_event_group(THD *thd, int err, uint64 sub_id,
if (err)
wfc->unregister_wait_for_prior_commit();
else
- wfc->wait_for_prior_commit();
+ err= wfc->wait_for_prior_commit(thd);
thd->wait_for_commit_ptr= NULL;
/*
@@ -172,6 +173,18 @@ finish_event_group(THD *thd, int err, uint64 sub_id,
}
+static void
+signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi)
+{
+ rgi->is_error= true;
+ rgi->cleanup_context(thd, true);
+ rgi->rli->abort_slave= true;
+ mysql_mutex_lock(rgi->rli->relay_log.get_log_lock());
+ mysql_mutex_unlock(rgi->rli->relay_log.get_log_lock());
+ rgi->rli->relay_log.signal_update();
+}
+
+
pthread_handler_t
handle_rpl_parallel_thread(void *arg)
{
@@ -207,6 +220,7 @@ handle_rpl_parallel_thread(void *arg)
thd->variables.log_slow_filter= global_system_variables.log_slow_filter;
set_slave_thread_options(thd);
thd->client_capabilities = CLIENT_LOCAL_FILES;
+ thd->net.reading_or_writing= 0;
thd_proc_info(thd, "Waiting for work from main SQL threads");
thd->set_time();
thd->variables.lock_wait_timeout= LONG_TIMEOUT;
@@ -285,21 +299,42 @@ handle_rpl_parallel_thread(void *arg)
wait_start_sub_id= rgi->wait_start_sub_id;
if (wait_for_sub_id || wait_start_sub_id)
{
+ bool did_enter_cond= false;
+ const char *old_msg= NULL;
+
mysql_mutex_lock(&entry->LOCK_parallel_entry);
if (wait_start_sub_id)
{
- while (wait_start_sub_id > entry->last_committed_sub_id)
+ old_msg= thd->enter_cond(&entry->COND_parallel_entry,
+ &entry->LOCK_parallel_entry,
+ "Waiting for prior transaction to commit "
+ "before starting next transaction");
+ did_enter_cond= true;
+ DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior");
+ while (wait_start_sub_id > entry->last_committed_sub_id &&
+ !thd->check_killed())
mysql_cond_wait(&entry->COND_parallel_entry,
&entry->LOCK_parallel_entry);
+ if (wait_start_sub_id > entry->last_committed_sub_id)
+ {
+ /* The thread got a kill signal. */
+ DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior_killed");
+ thd->send_kill_message();
+ slave_output_error_info(rgi->rli, thd);
+ signal_error_to_sql_driver_thread(thd, rgi);
+ }
+ rgi->wait_start_sub_id= 0; /* No need to check again. */
}
- rgi->wait_start_sub_id= 0; /* No need to check again. */
if (wait_for_sub_id > entry->last_committed_sub_id)
{
wait_for_commit *waitee=
&rgi->wait_commit_group_info->commit_orderer;
rgi->commit_orderer.register_wait_for_prior_commit(waitee);
}
- mysql_mutex_unlock(&entry->LOCK_parallel_entry);
+ if (did_enter_cond)
+ thd->exit_cond(old_msg);
+ else
+ mysql_mutex_unlock(&entry->LOCK_parallel_entry);
}
if(thd->wait_for_commit_ptr)
@@ -342,10 +377,8 @@ handle_rpl_parallel_thread(void *arg)
if (err)
{
- rgi->is_error= true;
slave_output_error_info(rgi->rli, thd);
- rgi->cleanup_context(thd, true);
- rgi->rli->abort_slave= true;
+ signal_error_to_sql_driver_thread(thd, rgi);
}
if (end_of_group)
{
@@ -354,6 +387,7 @@ handle_rpl_parallel_thread(void *arg)
&rgi->commit_orderer);
delete rgi;
group_rgi= rgi= NULL;
+ DEBUG_SYNC(thd, "rpl_parallel_end_of_group");
}
events= next;
@@ -384,11 +418,9 @@ handle_rpl_parallel_thread(void *arg)
half-processed event group.
*/
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
- group_rgi->is_error= true;
finish_event_group(thd, 1, group_rgi->gtid_sub_id,
group_rgi->parallel_entry, &group_rgi->commit_orderer);
- group_rgi->cleanup_context(thd, true);
- group_rgi->rli->abort_slave= true;
+ signal_error_to_sql_driver_thread(thd, group_rgi);
in_event_group= false;
delete group_rgi;
group_rgi= NULL;
@@ -753,6 +785,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
Relay_log_info *rli= serial_rgi->rli;
enum Log_event_type typ;
bool is_group_event;
+ bool did_enter_cond= false;
+ const char *old_msg= NULL;
/* ToDo: what to do with this lock?!? */
mysql_mutex_unlock(&rli->data_lock);
@@ -767,6 +801,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
sql_thread_stopping= true;
if (sql_thread_stopping)
{
+ delete ev;
/* QQ: Need a better comment why we return false here */
return false;
}
@@ -775,6 +810,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
MYF(0))))
{
my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ delete ev;
return true;
}
qev->ev= ev;
@@ -797,6 +833,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
{
my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME));
delete rgi;
+ my_free(qev);
+ delete ev;
return true;
}
rgi->is_parallel_exec = true;
@@ -814,6 +852,10 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
However, the commit of this event must wait for the commit of the prior
event, to preserve binlog commit order and visibility across all
servers in the replication hierarchy.
+
+ In addition, we must not start executing this event until we have
+ finished the previous collection of event groups that group-committed
+ together; we use rgi->wait_start_sub_id to control this.
*/
rpl_parallel_thread *rpt= global_rpl_thread_pool.get_thread(e);
rgi->wait_commit_sub_id= e->current_sub_id;
@@ -860,6 +902,21 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
}
else if (cur_thread->queued_size <= opt_slave_parallel_max_queued)
break; // The thread is ready to queue into
+ else if (rli->sql_driver_thd->check_killed())
+ {
+ mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
+ my_error(ER_CONNECTION_KILLED, MYF(0));
+ delete rgi;
+ my_free(qev);
+ delete ev;
+ DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max",
+ {
+ debug_sync_set_action(rli->sql_driver_thd,
+ STRING_WITH_LEN("now SIGNAL wait_queue_killed"));
+ };);
+ slave_output_error_info(rli, rli->sql_driver_thd);
+ return true;
+ }
else
{
/*
@@ -867,6 +924,18 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
use for queuing events, so wait for the thread to consume some
of its queue.
*/
+ if (!did_enter_cond)
+ {
+ old_msg= rli->sql_driver_thd->enter_cond
+ (&cur_thread->COND_rpl_thread, &cur_thread->LOCK_rpl_thread,
+ "Waiting for room in worker thread event queue");
+ did_enter_cond= true;
+ DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max",
+ {
+ debug_sync_set_action(rli->sql_driver_thd,
+ STRING_WITH_LEN("now SIGNAL wait_queue_ready"));
+ };);
+ }
mysql_cond_wait(&cur_thread->COND_rpl_thread,
&cur_thread->LOCK_rpl_thread);
}
@@ -1016,7 +1085,10 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
*/
rli->event_relay_log_pos= rli->future_event_relay_log_pos;
cur_thread->enqueue(qev);
- mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
+ if (did_enter_cond)
+ rli->sql_driver_thd->exit_cond(old_msg);
+ else
+ mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
mysql_cond_signal(&cur_thread->COND_rpl_thread);
return false;
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index d4db81b79c1..cfa7c0f344f 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -886,11 +886,11 @@ void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos,
int cmp= strcmp(group_relay_log_name, event_relay_log_name);
if (cmp < 0)
{
- group_relay_log_pos= event_relay_log_pos;
+ group_relay_log_pos= rgi->future_event_relay_log_pos;
strmake_buf(group_relay_log_name, event_relay_log_name);
notify_group_relay_log_name_update();
- } else if (cmp == 0 && group_relay_log_pos < event_relay_log_pos)
- group_relay_log_pos= event_relay_log_pos;
+ } else if (cmp == 0 && group_relay_log_pos < rgi->future_event_relay_log_pos)
+ group_relay_log_pos= rgi->future_event_relay_log_pos;
/*
In the parallel case we need to update the master_log_name here, rather
diff --git a/sql/slave.cc b/sql/slave.cc
index 4be4a96d142..dab978be591 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -6182,6 +6182,17 @@ static Log_event* next_event(rpl_group_info *rgi, ulonglong *event_size)
}
/*
+ We have to check sql_slave_killed() here an extra time.
+ Otherwise we may miss a wakeup, since last check was done
+ without holding LOCK_log.
+ */
+ if (sql_slave_killed(rgi))
+ {
+ mysql_mutex_unlock(log_lock);
+ break;
+ }
+
+ /*
We can, and should release data_lock while we are waiting for
update. If we do not, show slave status will block
*/
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index d1edec140b6..8abdd53469f 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -5783,12 +5783,53 @@ wait_for_commit::register_wait_for_prior_commit(wait_for_commit *waitee)
returns immediately.
*/
int
-wait_for_commit::wait_for_prior_commit2()
+wait_for_commit::wait_for_prior_commit2(THD *thd)
{
+ const char *old_msg;
+ wait_for_commit *loc_waitee;
+
mysql_mutex_lock(&LOCK_wait_commit);
- while (waiting_for_commit)
+ DEBUG_SYNC(thd, "wait_for_prior_commit_waiting");
+ old_msg= thd->enter_cond(&COND_wait_commit, &LOCK_wait_commit,
+ "Waiting for prior transaction to commit");
+ while (waiting_for_commit && !thd->check_killed())
mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit);
- mysql_mutex_unlock(&LOCK_wait_commit);
+ if (!waiting_for_commit)
+ {
+ if (wakeup_error)
+ my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
+ goto end;
+ }
+ /*
+ Wait was interrupted by kill. We need to unregister our wait and give the
+ error. But if a wakeup is already in progress, then we must ignore the
+ kill and not give error, otherwise we get inconsistency between waitee and
+ waiter as to whether we succeed or fail (eg. we may roll back but waitee
+ might attempt to commit both us and any subsequent commits waiting for us).
+ */
+ loc_waitee= this->waitee;
+ mysql_mutex_lock(&loc_waitee->LOCK_wait_commit);
+ if (loc_waitee->wakeup_subsequent_commits_running)
+ {
+ /* We are being woken up; ignore the kill and just wait. */
+ mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
+ do
+ {
+ mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit);
+ } while (waiting_for_commit);
+ goto end;
+ }
+ remove_from_list(&loc_waitee->subsequent_commits_list);
+ mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
+
+ DEBUG_SYNC(thd, "wait_for_prior_commit_killed");
+ wakeup_error= thd->killed_errno();
+ if (!wakeup_error)
+ wakeup_error= ER_QUERY_INTERRUPTED;
+ my_message(wakeup_error, ER(wakeup_error), MYF(0));
+
+end:
+ thd->exit_cond(old_msg);
waitee= NULL;
return wakeup_error;
}
@@ -5876,7 +5917,6 @@ wait_for_commit::unregister_wait_for_prior_commit2()
if (waiting_for_commit)
{
wait_for_commit *loc_waitee= this->waitee;
- wait_for_commit **next_ptr_ptr, *cur;
mysql_mutex_lock(&loc_waitee->LOCK_wait_commit);
if (loc_waitee->wakeup_subsequent_commits_running)
{
@@ -5894,17 +5934,7 @@ wait_for_commit::unregister_wait_for_prior_commit2()
else
{
/* Remove ourselves from the list in the waitee. */
- next_ptr_ptr= &loc_waitee->subsequent_commits_list;
- while ((cur= *next_ptr_ptr) != NULL)
- {
- if (cur == this)
- {
- *next_ptr_ptr= this->next_subsequent_commit;
- break;
- }
- next_ptr_ptr= &cur->next_subsequent_commit;
- }
- waiting_for_commit= false;
+ remove_from_list(&loc_waitee->subsequent_commits_list);
mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
}
}
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 9f091ac2c2b..a3fc3a7866f 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -1627,14 +1627,14 @@ struct wait_for_commit
bool wakeup_subsequent_commits_running;
void register_wait_for_prior_commit(wait_for_commit *waitee);
- int wait_for_prior_commit()
+ int wait_for_prior_commit(THD *thd)
{
/*
Quick inline check, to avoid function call and locking in the common case
where no wakeup is registered, or a registered wait was already signalled.
*/
if (waiting_for_commit)
- return wait_for_prior_commit2();
+ return wait_for_prior_commit2(thd);
else
return wakeup_error;
}
@@ -1660,10 +1660,29 @@ struct wait_for_commit
if (waiting_for_commit)
unregister_wait_for_prior_commit2();
}
+ /*
+ Remove a waiter from the list in the waitee. Used to unregister a wait.
+ The caller must be holding the locks of both waiter and waitee.
+ */
+ void remove_from_list(wait_for_commit **next_ptr_ptr)
+ {
+ wait_for_commit *cur;
+
+ while ((cur= *next_ptr_ptr) != NULL)
+ {
+ if (cur == this)
+ {
+ *next_ptr_ptr= this->next_subsequent_commit;
+ break;
+ }
+ next_ptr_ptr= &cur->next_subsequent_commit;
+ }
+ waiting_for_commit= false;
+ }
void wakeup(int wakeup_error);
- int wait_for_prior_commit2();
+ int wait_for_prior_commit2(THD *thd);
void wakeup_subsequent_commits2(int wakeup_error);
void unregister_wait_for_prior_commit2();
@@ -3334,12 +3353,7 @@ public:
int wait_for_prior_commit()
{
if (wait_for_commit_ptr)
- {
- int err= wait_for_commit_ptr->wait_for_prior_commit();
- if (err)
- my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
- return err;
- }
+ return wait_for_commit_ptr->wait_for_prior_commit(this);
return 0;
}
void wakeup_subsequent_commits(int wakeup_error)