Diffstat:
 -rw-r--r--  mysql-test/r/mysqld--help.result | 9
 -rw-r--r--  mysql-test/suite/perfschema/r/dml_setup_instruments.result | 4
 -rw-r--r--  mysql-test/suite/rpl/r/rpl_parallel.result | 88
 -rw-r--r--  mysql-test/suite/rpl/t/rpl_parallel.test | 108
 -rw-r--r--  mysql-test/suite/sys_vars/r/slave_domain_parallel_threads_basic.result | 13
 -rw-r--r--  mysql-test/suite/sys_vars/t/slave_domain_parallel_threads_basic.test | 14
 -rw-r--r--  mysys/mf_iocache.c | 4
 -rw-r--r--  sql/log.cc | 29
 -rw-r--r--  sql/mysqld.cc | 9
 -rw-r--r--  sql/mysqld.h | 6
 -rw-r--r--  sql/rpl_parallel.cc | 951
 -rw-r--r--  sql/rpl_parallel.h | 146
 -rw-r--r--  sql/rpl_rli.cc | 63
 -rw-r--r--  sql/rpl_rli.h | 20
 -rw-r--r--  sql/slave.cc | 6
 -rw-r--r--  sql/sql_class.cc | 52
 -rw-r--r--  sql/sql_class.h | 29
 -rw-r--r--  sql/sys_vars.cc | 43
 18 files changed, 1144 insertions(+), 450 deletions(-)
diff --git a/mysql-test/r/mysqld--help.result b/mysql-test/r/mysqld--help.result
index 953e9d926dd..9b124952095 100644
--- a/mysql-test/r/mysqld--help.result
+++ b/mysql-test/r/mysqld--help.result
@@ -782,6 +782,14 @@ The following options may be given as the first argument:
is already the default.
--slave-compressed-protocol
Use compression on master/slave protocol
+ --slave-domain-parallel-threads=#
+ Maximum number of parallel threads to use on slave for
+ events in a single replication domain. When using
+ multiple domains, this can be used to limit a single
+ domain from grabbing all threads and thus stalling other
+ domains. The default of 0 means to allow a domain to grab
+ as many threads as it wants, up to the value of
+ slave_parallel_threads.
--slave-exec-mode=name
Modes for how replication events should be executed.
Legal values are STRICT (default) and IDEMPOTENT. In
@@ -1155,6 +1163,7 @@ skip-networking FALSE
skip-show-database FALSE
skip-slave-start FALSE
slave-compressed-protocol FALSE
+slave-domain-parallel-threads 0
slave-exec-mode STRICT
slave-max-allowed-packet 1073741824
slave-net-timeout 3600
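
The help text above documents the new --slave-domain-parallel-threads option. As a rough illustration only (the function and parameter names are invented here; the real computation appears later in this patch, in rpl_parallel::find()), the effective number of workers a single domain may occupy is derived like this:

/*
  Illustrative sketch, not server code: a per-domain value of 0 (the default)
  means "no extra limit", and the per-domain cap can never exceed the size of
  the global worker pool.
*/
static unsigned long
effective_domain_threads(unsigned long domain_threads,   /* --slave-domain-parallel-threads */
                         unsigned long parallel_threads) /* --slave-parallel-threads */
{
  if (domain_threads == 0 || domain_threads > parallel_threads)
    return parallel_threads;
  return domain_threads;
}

So with slave_parallel_threads=20 and slave_domain_parallel_threads=5, a single replication domain can occupy at most 5 of the 20 workers, leaving the rest free for other domains.
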
diff --git a/mysql-test/suite/perfschema/r/dml_setup_instruments.result b/mysql-test/suite/perfschema/r/dml_setup_instruments.result
index 17087264e7c..b47283d9193 100644
--- a/mysql-test/suite/perfschema/r/dml_setup_instruments.result
+++ b/mysql-test/suite/perfschema/r/dml_setup_instruments.result
@@ -37,6 +37,7 @@ where name like 'Wait/Synch/Cond/sql/%'
order by name limit 10;
NAME ENABLED TIMED
wait/synch/cond/sql/COND_flush_thread_cache YES YES
+wait/synch/cond/sql/COND_group_commit_orderer YES YES
wait/synch/cond/sql/COND_manager YES YES
wait/synch/cond/sql/COND_parallel_entry YES YES
wait/synch/cond/sql/COND_prepare_ordered YES YES
@@ -44,8 +45,7 @@ wait/synch/cond/sql/COND_queue_state YES YES
wait/synch/cond/sql/COND_rpl_status YES YES
wait/synch/cond/sql/COND_rpl_thread YES YES
wait/synch/cond/sql/COND_rpl_thread_pool YES YES
-wait/synch/cond/sql/COND_server_started YES YES
-wait/synch/cond/sql/COND_thread_cache YES YES
+wait/synch/cond/sql/COND_rpl_thread_queue YES YES
select * from performance_schema.setup_instruments
where name='Wait';
select * from performance_schema.setup_instruments
diff --git a/mysql-test/suite/rpl/r/rpl_parallel.result b/mysql-test/suite/rpl/r/rpl_parallel.result
index 2ff6bd7cbe1..2531bf9457c 100644
--- a/mysql-test/suite/rpl/r/rpl_parallel.result
+++ b/mysql-test/suite/rpl/r/rpl_parallel.result
@@ -115,6 +115,7 @@ SET GLOBAL slave_parallel_threads=10;
SET debug_sync='RESET';
include/start_slave.inc
*** Test that group-committed transactions on the master can replicate in parallel on the slave. ***
+SET debug_sync='RESET';
FLUSH LOGS;
CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
INSERT INTO t3 VALUES (1,1), (3,3), (5,5), (7,7);
@@ -141,6 +142,7 @@ INSERT INTO t3 VALUES (6, foo(16,
''));
SET debug_sync='now WAIT_FOR master_queued3';
SET debug_sync='now SIGNAL master_cont1';
+SET debug_sync='RESET';
SELECT * FROM t3 ORDER BY a;
a b
1 1
@@ -213,6 +215,9 @@ slave-bin.000003 # Query # # use `test`; INSERT INTO t3 VALUES (6, foo(16,
slave-bin.000003 # Xid # # COMMIT /* XID */
*** Test STOP SLAVE in parallel mode ***
include/stop_slave.inc
+SET debug_sync='RESET';
+SET GLOBAL slave_parallel_threads=0;
+SET GLOBAL slave_parallel_threads=10;
SET binlog_direct_non_transactional_updates=0;
SET sql_log_bin=0;
CALL mtr.add_suppression("Statement is unsafe because it accesses a non-transactional table after accessing a transactional table within the same transaction");
@@ -227,10 +232,15 @@ INSERT INTO t3 VALUES(21, 21);
INSERT INTO t3 VALUES(22, 22);
SET binlog_format=@old_format;
BEGIN;
-INSERT INTO t2 VALUES (21);
+INSERT INTO t2 VALUES (21);
START SLAVE;
+SET @old_dbug= @@GLOBAL.debug_dbug;
+SET GLOBAL debug_dbug="+d,rpl_parallel_wait_for_done_trigger";
STOP SLAVE;
+SET debug_sync='now WAIT_FOR wait_for_done_waiting';
ROLLBACK;
+SET GLOBAL debug_dbug=@old_dbug;
+SET debug_sync='RESET';
include/wait_for_slave_to_stop.inc
SELECT * FROM t1 WHERE a >= 20 ORDER BY a;
a
@@ -292,6 +302,7 @@ a b
32 32
33 33
34 34
+SET debug_sync='RESET';
SET sql_log_bin=0;
CALL mtr.add_suppression("Query execution was interrupted");
CALL mtr.add_suppression("Commit failed due to failure of an earlier commit on which this one depends");
@@ -308,6 +319,7 @@ STOP SLAVE IO_THREAD;
SELECT * FROM t3 WHERE a >= 30 ORDER BY a;
a b
31 31
+SET debug_sync='RESET';
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
SET sql_log_bin=0;
@@ -379,6 +391,7 @@ a b
42 42
43 43
44 44
+SET debug_sync='RESET';
SET debug_sync='now WAIT_FOR t2_query';
SET debug_sync='now SIGNAL t2_cont';
SET debug_sync='now WAIT_FOR t1_ready';
@@ -386,9 +399,7 @@ KILL THD_ID;
SET debug_sync='now WAIT_FOR t2_killed';
SET debug_sync='now SIGNAL t1_cont';
include/wait_for_slave_sql_error.inc [errno=1317,1963]
-SELECT * FROM t3 WHERE a >= 40 ORDER BY a;
-a b
-41 41
+SET debug_sync='RESET';
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
SET sql_log_bin=0;
@@ -463,6 +474,7 @@ a b
52 52
53 53
54 54
+SET debug_sync='RESET';
SET debug_sync='now WAIT_FOR t2_query';
SET debug_sync='now SIGNAL t2_cont';
SET debug_sync='now WAIT_FOR t1_ready';
@@ -473,6 +485,7 @@ include/wait_for_slave_sql_error.inc [errno=1317,1963]
SELECT * FROM t3 WHERE a >= 50 ORDER BY a;
a b
51 51
+SET debug_sync='RESET';
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
SET sql_log_bin=0;
@@ -514,14 +527,18 @@ include/start_slave.inc
include/stop_slave.inc
SET GLOBAL binlog_format=@old_format;
SET GLOBAL slave_parallel_threads=0;
-SET GLOBAL slave_parallel_threads=3;
+SET GLOBAL slave_parallel_threads=4;
include/start_slave.inc
*** 4. Test killing thread that is waiting to start transaction until previous transaction commits ***
SET binlog_format=statement;
SET gtid_domain_id=2;
+BEGIN;
+INSERT INTO t3 VALUES (70, foo(70,
+'rpl_parallel_start_waiting_for_prior SIGNAL t4_waiting', ''));
INSERT INTO t3 VALUES (60, foo(60,
'ha_write_row_end SIGNAL d2_query WAIT_FOR d2_cont2',
'rpl_parallel_end_of_group SIGNAL d2_done WAIT_FOR d2_cont'));
+COMMIT;
SET gtid_domain_id=0;
SET debug_sync='now WAIT_FOR d2_query';
SET gtid_domain_id=1;
@@ -540,15 +557,27 @@ INSERT INTO t3 VALUES (63, foo(63,
'ha_write_row_end SIGNAL d0_query WAIT_FOR d0_cont2',
'rpl_parallel_end_of_group SIGNAL d0_done WAIT_FOR d0_cont'));
SET debug_sync='now WAIT_FOR d0_query';
+SET gtid_domain_id=3;
+BEGIN;
+INSERT INTO t3 VALUES (68, foo(68,
+'rpl_parallel_start_waiting_for_prior SIGNAL t2_waiting', ''));
+INSERT INTO t3 VALUES (69, foo(69,
+'ha_write_row_end SIGNAL d3_query WAIT_FOR d3_cont2',
+'rpl_parallel_end_of_group SIGNAL d3_done WAIT_FOR d3_cont'));
+COMMIT;
+SET gtid_domain_id=0;
+SET debug_sync='now WAIT_FOR d3_query';
SET debug_sync='now SIGNAL d2_cont2';
SET debug_sync='now WAIT_FOR d2_done';
SET debug_sync='now SIGNAL d1_cont2';
SET debug_sync='now WAIT_FOR d1_done';
SET debug_sync='now SIGNAL d0_cont2';
SET debug_sync='now WAIT_FOR d0_done';
+SET debug_sync='now SIGNAL d3_cont2';
+SET debug_sync='now WAIT_FOR d3_done';
SET binlog_format=statement;
INSERT INTO t3 VALUES (64, foo(64,
-'commit_before_prepare_ordered SIGNAL t1_waiting WAIT_FOR t1_cont', ''));
+'rpl_parallel_before_mark_start_commit SIGNAL t1_waiting WAIT_FOR t1_cont', ''));
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2 WAIT_FOR master_cont2';
INSERT INTO t3 VALUES (65, foo(65, '', ''));
SET debug_sync='now WAIT_FOR master_queued2';
@@ -569,23 +598,34 @@ a b
65 65
66 66
67 67
+68 68
+69 69
+70 70
+SET debug_sync='RESET';
SET debug_sync='now SIGNAL d0_cont';
SET debug_sync='now WAIT_FOR t1_waiting';
+SET debug_sync='now SIGNAL d3_cont';
+SET debug_sync='now WAIT_FOR t2_waiting';
SET debug_sync='now SIGNAL d1_cont';
SET debug_sync='now WAIT_FOR t3_waiting';
SET debug_sync='now SIGNAL d2_cont';
+SET debug_sync='now WAIT_FOR t4_waiting';
KILL THD_ID;
SET debug_sync='now WAIT_FOR t3_killed';
SET debug_sync='now SIGNAL t1_cont';
include/wait_for_slave_sql_error.inc [errno=1317,1927,1963]
STOP SLAVE IO_THREAD;
-SELECT * FROM t3 WHERE a >= 60 ORDER BY a;
+SELECT * FROM t3 WHERE a >= 60 AND a != 65 ORDER BY a;
a b
60 60
61 61
62 62
63 63
64 64
+68 68
+69 69
+70 70
+SET debug_sync='RESET';
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
SET sql_log_bin=0;
@@ -597,11 +637,11 @@ RETURN x;
END
||
SET sql_log_bin=1;
-INSERT INTO t3 VALUES (69,0);
+UPDATE t3 SET b=b+1 WHERE a=60;
include/start_slave.inc
SELECT * FROM t3 WHERE a >= 60 ORDER BY a;
a b
-60 60
+60 61
61 61
62 62
63 63
@@ -609,7 +649,9 @@ a b
65 65
66 66
67 67
-69 0
+68 68
+69 69
+70 70
SET sql_log_bin=0;
DROP FUNCTION foo;
CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
@@ -634,37 +676,31 @@ include/start_slave.inc
SET @old_max_queued= @@GLOBAL.slave_parallel_max_queued;
SET GLOBAL slave_parallel_max_queued=9000;
SET binlog_format=statement;
-INSERT INTO t3 VALUES (70, foo(0,
+INSERT INTO t3 VALUES (80, foo(0,
'ha_write_row_end SIGNAL query_waiting WAIT_FOR query_cont', ''));
SET debug_sync='now WAIT_FOR query_waiting';
SET @old_dbug= @@GLOBAL.debug_dbug;
SET GLOBAL debug_dbug="+d,rpl_parallel_wait_queue_max";
-INSERT INTO t3 VALUES (72, 0);
-SELECT * FROM t3 WHERE a >= 70 ORDER BY a;
+SELECT * FROM t3 WHERE a >= 80 ORDER BY a;
a b
-70 0
-71 10000
-72 0
+80 0
+81 10000
SET debug_sync='now WAIT_FOR wait_queue_ready';
KILL THD_ID;
SET debug_sync='now WAIT_FOR wait_queue_killed';
SET debug_sync='now SIGNAL query_cont';
include/wait_for_slave_sql_error.inc [errno=1317,1927,1963]
STOP SLAVE IO_THREAD;
-SELECT * FROM t3 WHERE a >= 70 ORDER BY a;
-a b
-70 0
-71 10000
SET GLOBAL debug_dbug=@old_dbug;
SET GLOBAL slave_parallel_max_queued= @old_max_queued;
-INSERT INTO t3 VALUES (73,0);
+INSERT INTO t3 VALUES (82,0);
+SET debug_sync='RESET';
include/start_slave.inc
-SELECT * FROM t3 WHERE a >= 70 ORDER BY a;
+SELECT * FROM t3 WHERE a >= 80 ORDER BY a;
a b
-70 0
-71 10000
-72 0
-73 0
+80 0
+81 10000
+82 0
include/stop_slave.inc
SET GLOBAL binlog_format=@old_format;
SET GLOBAL slave_parallel_threads=0;
diff --git a/mysql-test/suite/rpl/t/rpl_parallel.test b/mysql-test/suite/rpl/t/rpl_parallel.test
index 72a0a72db7d..a71534d2eb1 100644
--- a/mysql-test/suite/rpl/t/rpl_parallel.test
+++ b/mysql-test/suite/rpl/t/rpl_parallel.test
@@ -163,6 +163,7 @@ SET debug_sync='RESET';
--echo *** Test that group-committed transactions on the master can replicate in parallel on the slave. ***
--connection server_1
+SET debug_sync='RESET';
FLUSH LOGS;
--source include/wait_for_binlog_checkpoint.inc
CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
@@ -230,6 +231,7 @@ REAP;
REAP;
--connection con_temp5
REAP;
+SET debug_sync='RESET';
--connection server_1
SELECT * FROM t3 ORDER BY a;
@@ -270,6 +272,10 @@ SELECT * FROM t3 ORDER BY a;
--echo *** Test STOP SLAVE in parallel mode ***
--connection server_2
--source include/stop_slave.inc
+# Respawn all worker threads to clear any left-over debug_sync or other stuff.
+SET debug_sync='RESET';
+SET GLOBAL slave_parallel_threads=0;
+SET GLOBAL slave_parallel_threads=10;
--connection server_1
# Set up a couple of transactions. The first will be blocked halfway
@@ -288,7 +294,7 @@ BEGIN;
INSERT INTO t2 VALUES (20);
--disable_warnings
INSERT INTO t1 VALUES (20);
---disable_warnings
+--enable_warnings
INSERT INTO t2 VALUES (21);
INSERT INTO t3 VALUES (20, 20);
COMMIT;
@@ -300,7 +306,7 @@ SET binlog_format=@old_format;
# Start a connection that will block the replicated transaction halfway.
--connection con_temp1
BEGIN;
-INSERT INTO t2 VALUES (21);
+INSERT INTO t2 VALUES (21);
--connection server_2
START SLAVE;
@@ -312,13 +318,20 @@ START SLAVE;
--connection con_temp2
# Initiate slave stop. It will have to wait for the current event group
# to complete.
+# The dbug injection causes debug_sync to signal 'wait_for_done_waiting'
+# when the SQL driver thread is ready.
+SET @old_dbug= @@GLOBAL.debug_dbug;
+SET GLOBAL debug_dbug="+d,rpl_parallel_wait_for_done_trigger";
send STOP SLAVE;
--connection con_temp1
+SET debug_sync='now WAIT_FOR wait_for_done_waiting';
ROLLBACK;
--connection con_temp2
reap;
+SET GLOBAL debug_dbug=@old_dbug;
+SET debug_sync='RESET';
--connection server_2
--source include/wait_for_slave_to_stop.inc
@@ -397,6 +410,7 @@ REAP;
--connection server_1
SELECT * FROM t3 WHERE a >= 30 ORDER BY a;
+SET debug_sync='RESET';
--connection server_2
SET sql_log_bin=0;
@@ -431,6 +445,7 @@ SELECT * FROM t3 WHERE a >= 30 ORDER BY a;
# Now we have to disable the debug_sync statements, so they do not trigger
# when the events are retried.
+SET debug_sync='RESET';
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
SET sql_log_bin=0;
@@ -535,6 +550,7 @@ REAP;
--connection server_1
SELECT * FROM t3 WHERE a >= 40 ORDER BY a;
+SET debug_sync='RESET';
--connection server_2
# Wait until T2 is inside executing its insert of 42, then find it in SHOW
@@ -559,10 +575,10 @@ SET debug_sync='now SIGNAL t1_cont';
--let $slave_sql_errno= 1317,1963
--source include/wait_for_slave_sql_error.inc
-SELECT * FROM t3 WHERE a >= 40 ORDER BY a;
# Now we have to disable the debug_sync statements, so they do not trigger
# when the events are retried.
+SET debug_sync='RESET';
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
SET sql_log_bin=0;
@@ -673,6 +689,7 @@ REAP;
--connection server_1
SELECT * FROM t3 WHERE a >= 50 ORDER BY a;
+SET debug_sync='RESET';
--connection server_2
# Wait until T2 is inside executing its insert of 52, then find it in SHOW
@@ -701,6 +718,7 @@ SELECT * FROM t3 WHERE a >= 50 ORDER BY a;
# Now we have to disable the debug_sync statements, so they do not trigger
# when the events are retried.
+SET debug_sync='RESET';
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
SET sql_log_bin=0;
@@ -752,7 +770,7 @@ CHANGE MASTER TO master_use_gtid=slave_pos;
--source include/stop_slave.inc
SET GLOBAL binlog_format=@old_format;
SET GLOBAL slave_parallel_threads=0;
-SET GLOBAL slave_parallel_threads=3;
+SET GLOBAL slave_parallel_threads=4;
--source include/start_slave.inc
@@ -762,24 +780,29 @@ SET GLOBAL slave_parallel_threads=3;
# can run in parallel with each other (same group commit and commit id),
# but not in parallel with T1.
#
-# We use three worker threads. T1 and T2 will be queued on the first, T3 on
-# the second, and T4 on the third. We will delay T1 commit, T3 will wait for
-# T1 to commit before it can start. We will kill T3 during this wait, and
+# We use four worker threads, each Ti will be queued on each their own
+# worker thread. We will delay T1 commit, T3 will wait for T1 to begin
+# commit before it can start. We will kill T3 during this wait, and
# check that everything works correctly.
#
# It is rather tricky to get the correct thread id of the worker to kill.
-# We start by injecting three dummy transactions in a debug_sync-controlled
+# We start by injecting four dummy transactions in a debug_sync-controlled
# manner to be able to get known thread ids for the workers in a pool with
-# just 3 worker threads. Then we let in each of the real test transactions
+# just 4 worker threads. Then we let in each of the real test transactions
# T1-T4 one at a time in a way which allows us to know which transaction
# ends up with which thread id.
--connection server_1
SET binlog_format=statement;
SET gtid_domain_id=2;
+BEGIN;
+# This debug_sync will linger on and be used to control T4 later.
+INSERT INTO t3 VALUES (70, foo(70,
+ 'rpl_parallel_start_waiting_for_prior SIGNAL t4_waiting', ''));
INSERT INTO t3 VALUES (60, foo(60,
'ha_write_row_end SIGNAL d2_query WAIT_FOR d2_cont2',
'rpl_parallel_end_of_group SIGNAL d2_done WAIT_FOR d2_cont'));
+COMMIT;
SET gtid_domain_id=0;
--connection server_2
@@ -813,12 +836,30 @@ INSERT INTO t3 VALUES (63, foo(63,
SET debug_sync='now WAIT_FOR d0_query';
--let $d0_thd_id= `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE INFO LIKE '%foo(63%' AND INFO NOT LIKE '%LIKE%'`
+--connection server_1
+SET gtid_domain_id=3;
+BEGIN;
+# These debug_sync's will linger on and be used to control T2 later.
+INSERT INTO t3 VALUES (68, foo(68,
+ 'rpl_parallel_start_waiting_for_prior SIGNAL t2_waiting', ''));
+INSERT INTO t3 VALUES (69, foo(69,
+ 'ha_write_row_end SIGNAL d3_query WAIT_FOR d3_cont2',
+ 'rpl_parallel_end_of_group SIGNAL d3_done WAIT_FOR d3_cont'));
+COMMIT;
+SET gtid_domain_id=0;
+
+--connection server_2
+SET debug_sync='now WAIT_FOR d3_query';
+--let $d3_thd_id= `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE INFO LIKE '%foo(69%' AND INFO NOT LIKE '%LIKE%'`
+
SET debug_sync='now SIGNAL d2_cont2';
SET debug_sync='now WAIT_FOR d2_done';
SET debug_sync='now SIGNAL d1_cont2';
SET debug_sync='now WAIT_FOR d1_done';
SET debug_sync='now SIGNAL d0_cont2';
SET debug_sync='now WAIT_FOR d0_done';
+SET debug_sync='now SIGNAL d3_cont2';
+SET debug_sync='now WAIT_FOR d3_done';
# Now prepare the real transactions T1, T2, T3, T4 on the master.
@@ -826,7 +867,7 @@ SET debug_sync='now WAIT_FOR d0_done';
# Create transaction T1.
SET binlog_format=statement;
INSERT INTO t3 VALUES (64, foo(64,
- 'commit_before_prepare_ordered SIGNAL t1_waiting WAIT_FOR t1_cont', ''));
+ 'rpl_parallel_before_mark_start_commit SIGNAL t1_waiting WAIT_FOR t1_cont', ''));
# Create transaction T2, as a group commit leader on the master.
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2 WAIT_FOR master_cont2';
@@ -861,6 +902,7 @@ REAP;
--connection server_1
SELECT * FROM t3 WHERE a >= 60 ORDER BY a;
+SET debug_sync='RESET';
--connection server_2
# Now we have the four transactions pending for replication on the slave.
@@ -872,15 +914,20 @@ SELECT * FROM t3 WHERE a >= 60 ORDER BY a;
SET debug_sync='now SIGNAL d0_cont';
SET debug_sync='now WAIT_FOR t1_waiting';
-# T2 will be queued on the same worker D0 as T1.
+# Make the worker D3 free, and wait for T2 to be queued in it.
+SET debug_sync='now SIGNAL d3_cont';
+SET debug_sync='now WAIT_FOR t2_waiting';
+
# Now release worker D1, and wait for T3 to be queued in it.
# T3 will wait for T1 to commit before it can start.
SET debug_sync='now SIGNAL d1_cont';
SET debug_sync='now WAIT_FOR t3_waiting';
-# Release worker D2. T4 may or may not have time to be queued on it, but
-# it will not be able to complete due to T3 being killed.
+# Release worker D2. Wait for T4 to be queued, so we are sure it has
+# received the debug_sync signal (else we might overwrite it with the
+# next debug_sync).
SET debug_sync='now SIGNAL d2_cont';
+SET debug_sync='now WAIT_FOR t4_waiting';
# Now we kill the waiting transaction T3 in worker D1.
--replace_result $d1_thd_id THD_ID
@@ -895,10 +942,15 @@ SET debug_sync='now SIGNAL t1_cont';
--let $slave_sql_errno= 1317,1927,1963
--source include/wait_for_slave_sql_error.inc
STOP SLAVE IO_THREAD;
-SELECT * FROM t3 WHERE a >= 60 ORDER BY a;
+# Since T2, T3, and T4 run in parallel, we can not be sure if T2 will have time
+# to commit or not before the stop. However, T1 should commit, and T3/T4 may
+# not have committed. (After slave restart we check that all become committed
+# eventually).
+SELECT * FROM t3 WHERE a >= 60 AND a != 65 ORDER BY a;
# Now we have to disable the debug_sync statements, so they do not trigger
# when the events are retried.
+SET debug_sync='RESET';
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
SET sql_log_bin=0;
@@ -914,7 +966,7 @@ CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
SET sql_log_bin=1;
--connection server_1
-INSERT INTO t3 VALUES (69,0);
+UPDATE t3 SET b=b+1 WHERE a=60;
--save_master_pos
--connection server_2
@@ -951,6 +1003,7 @@ SET GLOBAL slave_parallel_threads=10;
--echo *** 5. Test killing thread that is waiting for queue of max length to shorten ***
+# Find the thread id of the driver SQL thread that we want to kill.
--let $wait_condition= SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE STATE LIKE '%Slave has read all relay log%'
--source include/wait_condition.inc
--let $thd_id= `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE STATE LIKE '%Slave has read all relay log%'`
@@ -961,12 +1014,8 @@ SET GLOBAL slave_parallel_max_queued=9000;
--let bigstring= `SELECT REPEAT('x', 10000)`
SET binlog_format=statement;
# Create an event that will wait to be signalled.
-INSERT INTO t3 VALUES (70, foo(0,
+INSERT INTO t3 VALUES (80, foo(0,
'ha_write_row_end SIGNAL query_waiting WAIT_FOR query_cont', ''));
---disable_query_log
-# Create an event that will fill up the queue.
-eval INSERT INTO t3 VALUES (71, LENGTH('$bigstring'));
---enable_query_log
--connection server_2
SET debug_sync='now WAIT_FOR query_waiting';
@@ -977,11 +1026,14 @@ SET @old_dbug= @@GLOBAL.debug_dbug;
SET GLOBAL debug_dbug="+d,rpl_parallel_wait_queue_max";
--connection server_1
-# This event will have to wait for the queue to become shorter before it can
-# be queued. We will test that things work when we kill the SQL driver thread
-# during this wait.
-INSERT INTO t3 VALUES (72, 0);
-SELECT * FROM t3 WHERE a >= 70 ORDER BY a;
+--disable_query_log
+# Create an event that will fill up the queue.
+# The Xid event at the end of the event group will have to wait for the Query
+# event with the INSERT to drain so the queue becomes shorter. However that in
+# turn waits for the prior event group to continue.
+eval INSERT INTO t3 VALUES (81, LENGTH('$bigstring'));
+--enable_query_log
+SELECT * FROM t3 WHERE a >= 80 ORDER BY a;
--connection server_2
SET debug_sync='now WAIT_FOR wait_queue_ready';
@@ -995,19 +1047,19 @@ SET debug_sync='now SIGNAL query_cont';
--let $slave_sql_errno= 1317,1927,1963
--source include/wait_for_slave_sql_error.inc
STOP SLAVE IO_THREAD;
-SELECT * FROM t3 WHERE a >= 70 ORDER BY a;
SET GLOBAL debug_dbug=@old_dbug;
SET GLOBAL slave_parallel_max_queued= @old_max_queued;
--connection server_1
-INSERT INTO t3 VALUES (73,0);
+INSERT INTO t3 VALUES (82,0);
--save_master_pos
--connection server_2
+SET debug_sync='RESET';
--source include/start_slave.inc
--sync_with_master
-SELECT * FROM t3 WHERE a >= 70 ORDER BY a;
+SELECT * FROM t3 WHERE a >= 80 ORDER BY a;
--connection server_2
diff --git a/mysql-test/suite/sys_vars/r/slave_domain_parallel_threads_basic.result b/mysql-test/suite/sys_vars/r/slave_domain_parallel_threads_basic.result
new file mode 100644
index 00000000000..9e53d9bd891
--- /dev/null
+++ b/mysql-test/suite/sys_vars/r/slave_domain_parallel_threads_basic.result
@@ -0,0 +1,13 @@
+SET @save_slave_domain_parallel_threads= @@GLOBAL.slave_domain_parallel_threads;
+SELECT @@GLOBAL.slave_domain_parallel_threads as 'must be zero because of default';
+must be zero because of default
+0
+SELECT @@SESSION.slave_domain_parallel_threads as 'no session var';
+ERROR HY000: Variable 'slave_domain_parallel_threads' is a GLOBAL variable
+SET GLOBAL slave_domain_parallel_threads= 0;
+SET GLOBAL slave_domain_parallel_threads= DEFAULT;
+SET GLOBAL slave_domain_parallel_threads= 10;
+SELECT @@GLOBAL.slave_domain_parallel_threads;
+@@GLOBAL.slave_domain_parallel_threads
+10
+SET GLOBAL slave_domain_parallel_threads = @save_slave_domain_parallel_threads;
diff --git a/mysql-test/suite/sys_vars/t/slave_domain_parallel_threads_basic.test b/mysql-test/suite/sys_vars/t/slave_domain_parallel_threads_basic.test
new file mode 100644
index 00000000000..7be48fbd4c5
--- /dev/null
+++ b/mysql-test/suite/sys_vars/t/slave_domain_parallel_threads_basic.test
@@ -0,0 +1,14 @@
+--source include/not_embedded.inc
+
+SET @save_slave_domain_parallel_threads= @@GLOBAL.slave_domain_parallel_threads;
+
+SELECT @@GLOBAL.slave_domain_parallel_threads as 'must be zero because of default';
+--error ER_INCORRECT_GLOBAL_LOCAL_VAR
+SELECT @@SESSION.slave_domain_parallel_threads as 'no session var';
+
+SET GLOBAL slave_domain_parallel_threads= 0;
+SET GLOBAL slave_domain_parallel_threads= DEFAULT;
+SET GLOBAL slave_domain_parallel_threads= 10;
+SELECT @@GLOBAL.slave_domain_parallel_threads;
+
+SET GLOBAL slave_domain_parallel_threads = @save_slave_domain_parallel_threads;
diff --git a/mysys/mf_iocache.c b/mysys/mf_iocache.c
index 02e5c5373ae..0c48b367942 100644
--- a/mysys/mf_iocache.c
+++ b/mysys/mf_iocache.c
@@ -1281,10 +1281,6 @@ read_append_buffer:
size_t transfer_len;
DBUG_ASSERT(info->append_read_pos <= info->write_pos);
- /*
- TODO: figure out if the assert below is needed or correct.
- */
- DBUG_ASSERT(pos_in_file == info->end_of_file);
copy_len=min(Count, len_in_buff);
memcpy(Buffer, info->append_read_pos, copy_len);
info->append_read_pos += copy_len;
diff --git a/sql/log.cc b/sql/log.cc
index 6fead95a4b1..a7f3a69cfc9 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -6644,13 +6644,15 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
*/
wfc= orig_entry->thd->wait_for_commit_ptr;
orig_entry->queued_by_other= false;
- if (wfc && wfc->waiting_for_commit)
+ if (wfc && wfc->waitee)
{
mysql_mutex_lock(&wfc->LOCK_wait_commit);
/* Do an extra check here, this time safely under lock. */
- if (wfc->waiting_for_commit)
+ if (wfc->waitee)
{
const char *old_msg;
+ wait_for_commit *loc_waitee;
+
/*
By setting wfc->opaque_pointer to our own entry, we mark that we are
ready to commit, but waiting for another transaction to commit before
@@ -6661,21 +6663,20 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
queued_by_other flag is set.
*/
wfc->opaque_pointer= orig_entry;
+ DEBUG_SYNC(orig_entry->thd, "group_commit_waiting_for_prior");
old_msg=
orig_entry->thd->enter_cond(&wfc->COND_wait_commit,
&wfc->LOCK_wait_commit,
"Waiting for prior transaction to commit");
- DEBUG_SYNC(orig_entry->thd, "group_commit_waiting_for_prior");
- while (wfc->waiting_for_commit && !orig_entry->thd->check_killed())
+ while ((loc_waitee= wfc->waitee) && !orig_entry->thd->check_killed())
mysql_cond_wait(&wfc->COND_wait_commit, &wfc->LOCK_wait_commit);
wfc->opaque_pointer= NULL;
DBUG_PRINT("info", ("After waiting for prior commit, queued_by_other=%d",
orig_entry->queued_by_other));
- if (wfc->waiting_for_commit)
+ if (loc_waitee)
{
/* Wait terminated due to kill. */
- wait_for_commit *loc_waitee= wfc->waitee;
mysql_mutex_lock(&loc_waitee->LOCK_wait_commit);
if (loc_waitee->wakeup_subsequent_commits_running ||
orig_entry->queued_by_other)
@@ -6685,13 +6686,14 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
do
{
mysql_cond_wait(&wfc->COND_wait_commit, &wfc->LOCK_wait_commit);
- } while (wfc->waiting_for_commit);
+ } while (wfc->waitee);
}
else
{
/* We were killed, so remove us from the list of waitee. */
wfc->remove_from_list(&loc_waitee->subsequent_commits_list);
mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
+ wfc->waitee= NULL;
orig_entry->thd->exit_cond(old_msg);
/* Interrupted by kill. */
@@ -6707,12 +6709,11 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
}
else
mysql_mutex_unlock(&wfc->LOCK_wait_commit);
-
- if (wfc->wakeup_error)
- {
- my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
- DBUG_RETURN(-1);
- }
+ }
+ if (wfc && wfc->wakeup_error)
+ {
+ my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
+ DBUG_RETURN(-1);
}
/*
@@ -9043,7 +9044,7 @@ start_binlog_background_thread()
array_elements(all_binlog_threads));
#endif
- if (mysql_thread_create(key_thread_binlog, &th, NULL,
+ if (mysql_thread_create(key_thread_binlog, &th, &connection_attrib,
binlog_background_thread, NULL))
return 1;
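
The log.cc hunks above change queue_for_group_commit() to test the waitee pointer directly instead of the old waiting_for_commit flag, and to cache it in loc_waitee so the kill path still has a valid reference after the field is cleared concurrently. A minimal standalone sketch of that wait pattern, using raw pthread primitives instead of the server's mysql_mutex/mysql_cond wrappers (all names here are invented for illustration):

#include <pthread.h>

/*
  Sketch only: loop on the waitee pointer under the lock; the thread that
  wakes us up clears the pointer.  Caching the pointer in a local variable
  means that, if the wait is instead terminated by a kill, the caller still
  holds a reference to the waitee for cleanup (removing itself from the
  waitee's list), even though the field may be cleared concurrently.
*/
struct waiter
{
  pthread_mutex_t lock;
  pthread_cond_t  cond;
  struct waiter  *waitee;   /* non-NULL while a prior commit is pending */
};

static struct waiter *
wait_for_prior_commit(struct waiter *self, bool (*killed)())
{
  struct waiter *prior;
  pthread_mutex_lock(&self->lock);
  while ((prior= self->waitee) && !killed())
    pthread_cond_wait(&self->cond, &self->lock);
  pthread_mutex_unlock(&self->lock);
  return prior;   /* NULL: woken normally; non-NULL: interrupted by kill */
}
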
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index d95d856c18f..b7cdeb0bde4 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -546,6 +546,7 @@ ulong rpl_recovery_rank=0;
ulong stored_program_cache_size= 0;
ulong opt_slave_parallel_threads= 0;
+ulong opt_slave_domain_parallel_threads= 0;
ulong opt_binlog_commit_wait_count= 0;
ulong opt_binlog_commit_wait_usec= 0;
ulong opt_slave_parallel_max_queued= 131072;
@@ -895,8 +896,10 @@ PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready,
key_COND_wait_commit;
PSI_cond_key key_RELAYLOG_COND_queue_busy;
PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
-PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_pool,
- key_COND_parallel_entry, key_COND_prepare_ordered;
+PSI_cond_key key_COND_rpl_thread_queue, key_COND_rpl_thread,
+ key_COND_rpl_thread_pool,
+ key_COND_parallel_entry, key_COND_group_commit_orderer,
+ key_COND_prepare_ordered;
PSI_cond_key key_COND_wait_gtid;
static PSI_cond_info all_server_conds[]=
@@ -941,8 +944,10 @@ static PSI_cond_info all_server_conds[]=
{ &key_COND_thread_cache, "COND_thread_cache", PSI_FLAG_GLOBAL},
{ &key_COND_flush_thread_cache, "COND_flush_thread_cache", PSI_FLAG_GLOBAL},
{ &key_COND_rpl_thread, "COND_rpl_thread", 0},
+ { &key_COND_rpl_thread_queue, "COND_rpl_thread_queue", 0},
{ &key_COND_rpl_thread_pool, "COND_rpl_thread_pool", 0},
{ &key_COND_parallel_entry, "COND_parallel_entry", 0},
+ { &key_COND_group_commit_orderer, "COND_group_commit_orderer", 0},
{ &key_COND_prepare_ordered, "COND_prepare_ordered", 0},
{ &key_COND_wait_gtid, "COND_wait_gtid", 0}
};
diff --git a/sql/mysqld.h b/sql/mysqld.h
index 4fdd34fd8be..dfdd54225fe 100644
--- a/sql/mysqld.h
+++ b/sql/mysqld.h
@@ -177,6 +177,7 @@ extern ulong opt_binlog_rows_event_max_size;
extern ulong rpl_recovery_rank, thread_cache_size;
extern ulong stored_program_cache_size;
extern ulong opt_slave_parallel_threads;
+extern ulong opt_slave_domain_parallel_threads;
extern ulong opt_slave_parallel_max_queued;
extern ulong opt_binlog_commit_wait_count;
extern ulong opt_binlog_commit_wait_usec;
@@ -284,8 +285,9 @@ extern PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready,
key_COND_wait_commit;
extern PSI_cond_key key_RELAYLOG_COND_queue_busy;
extern PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
-extern PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_pool,
- key_COND_parallel_entry;
+extern PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_queue,
+ key_COND_rpl_thread_pool,
+ key_COND_parallel_entry, key_COND_group_commit_orderer;
extern PSI_cond_key key_COND_wait_gtid;
extern PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert,
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index ef282611f70..eab5b980c02 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -93,32 +93,12 @@ handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev)
}
-static bool
-sql_worker_killed(THD *thd, rpl_group_info *rgi, bool in_event_group)
-{
- if (!rgi->rli->abort_slave && !abort_loop)
- return false;
-
- /*
- Do not abort in the middle of an event group that cannot be rolled back.
- */
- if ((thd->transaction.all.modified_non_trans_table ||
- (thd->variables.option_bits & OPTION_KEEP_LOG))
- && in_event_group)
- return false;
- /* ToDo: should we add some timeout like in sql_slave_killed?
- if (rgi->last_event_start_time == 0)
- rgi->last_event_start_time= my_time(0);
- */
-
- return true;
-}
-
-
static void
finish_event_group(THD *thd, int err, uint64 sub_id,
- rpl_parallel_entry *entry, wait_for_commit *wfc)
+ rpl_parallel_entry *entry, rpl_group_info *rgi)
{
+ wait_for_commit *wfc= &rgi->commit_orderer;
+
/*
Remove any left-over registration to wait for a prior commit to
complete. Normally, such wait would already have been removed at
@@ -163,12 +143,26 @@ finish_event_group(THD *thd, int err, uint64 sub_id,
*/
mysql_mutex_lock(&entry->LOCK_parallel_entry);
if (entry->last_committed_sub_id < sub_id)
- {
entry->last_committed_sub_id= sub_id;
- mysql_cond_broadcast(&entry->COND_parallel_entry);
- }
+
+ /*
+ If this event group got error, then any following event groups that have
+ not yet started should just skip their group, preparing for stop of the
+ SQL driver thread.
+ */
+ if (unlikely(rgi->is_error) &&
+ entry->stop_on_error_sub_id == (uint64)ULONGLONG_MAX)
+ entry->stop_on_error_sub_id= sub_id;
+ /*
+ We need to mark that this event group started its commit phase, in case we
+ missed it before (otherwise we would deadlock the next event group that is
+ waiting for this). In most cases (normal DML), it will be a no-op.
+ */
+ rgi->mark_start_commit_no_lock();
mysql_mutex_unlock(&entry->LOCK_parallel_entry);
+ thd->clear_error();
+ thd->stmt_da->reset_diagnostics_area();
wfc->wakeup_subsequent_commits(err);
}
@@ -185,6 +179,20 @@ signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi)
}
+static void
+unlock_or_exit_cond(THD *thd, mysql_mutex_t *lock, bool *did_enter_cond,
+ const char *old_msg)
+{
+ if (*did_enter_cond)
+ {
+ thd->exit_cond(old_msg);
+ *did_enter_cond= false;
+ }
+ else
+ mysql_mutex_unlock(lock);
+}
+
+
pthread_handler_t
handle_rpl_parallel_thread(void *arg)
{
@@ -193,8 +201,14 @@ handle_rpl_parallel_thread(void *arg)
struct rpl_parallel_thread::queued_event *events;
bool group_standalone= true;
bool in_event_group= false;
+ bool group_skip_for_stop= false;
rpl_group_info *group_rgi= NULL;
+ group_commit_orderer *gco, *tmp_gco;
uint64 event_gtid_sub_id= 0;
+ rpl_parallel_thread::queued_event *qevs_to_free;
+ rpl_group_info *rgis_to_free;
+ group_commit_orderer *gcos_to_free;
+ size_t total_event_size;
int err;
struct rpl_parallel_thread *rpt= (struct rpl_parallel_thread *)arg;
@@ -234,44 +248,62 @@ handle_rpl_parallel_thread(void *arg)
rpt->running= true;
mysql_cond_signal(&rpt->COND_rpl_thread);
- while (!rpt->stop && !thd->killed)
+ while (!rpt->stop)
{
- rpl_parallel_thread *list;
-
old_msg= thd->proc_info;
thd->enter_cond(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread,
"Waiting for work from SQL thread");
- while (!(events= rpt->event_queue) && !rpt->stop && !thd->killed &&
- !(rpt->current_entry && rpt->current_entry->force_abort))
+ /*
+ There are 4 cases that should cause us to wake up:
+ - Events have been queued for us to handle.
+ - We have an owner, but no events and not inside event group -> we need
+ to release ourself to the thread pool
+ - SQL thread is stopping, and we have an owner but no events, and we are
+ inside an event group; no more events will be queued to us, so we need
+ to abort the group (force_abort==1).
+ - Thread pool shutdown (rpt->stop==1).
+ */
+ while (!( (events= rpt->event_queue) ||
+ (rpt->current_owner && !in_event_group) ||
+ (rpt->current_owner && group_rgi->parallel_entry->force_abort) ||
+ rpt->stop))
mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread);
- rpt->dequeue(events);
+ rpt->dequeue1(events);
thd->exit_cond(old_msg);
- mysql_cond_signal(&rpt->COND_rpl_thread);
more_events:
+ qevs_to_free= NULL;
+ rgis_to_free= NULL;
+ gcos_to_free= NULL;
+ total_event_size= 0;
while (events)
{
struct rpl_parallel_thread::queued_event *next= events->next;
Log_event_type event_type;
rpl_group_info *rgi= events->rgi;
rpl_parallel_entry *entry= rgi->parallel_entry;
- uint64 wait_for_sub_id;
- uint64 wait_start_sub_id;
- bool end_of_group;
+ bool end_of_group, group_ending;
+ total_event_size+= events->event_size;
if (!events->ev)
{
handle_queued_pos_update(thd, events);
- my_free(events);
+ events->next= qevs_to_free;
+ qevs_to_free= events;
events= next;
continue;
}
err= 0;
group_rgi= rgi;
+ gco= rgi->gco;
/* Handle a new event group, which will be initiated by a GTID event. */
if ((event_type= events->ev->get_type_code()) == GTID_EVENT)
{
+ bool did_enter_cond= false;
+ const char *old_msg= NULL;
+ uint64 wait_count;
+
in_event_group= true;
/*
If the standalone flag is set, then this event group consists of a
@@ -293,50 +325,87 @@ handle_rpl_parallel_thread(void *arg)
occured.
Also do not start parallel execution of this event group until all
- prior groups have committed that are not safe to run in parallel with.
+ prior groups have reached the commit phase that are not safe to run
+ in parallel with.
*/
- wait_for_sub_id= rgi->wait_commit_sub_id;
- wait_start_sub_id= rgi->wait_start_sub_id;
- if (wait_for_sub_id || wait_start_sub_id)
+ mysql_mutex_lock(&entry->LOCK_parallel_entry);
+ if (!gco->installed)
{
- bool did_enter_cond= false;
- const char *old_msg= NULL;
-
- mysql_mutex_lock(&entry->LOCK_parallel_entry);
- if (wait_start_sub_id)
+ if (gco->prev_gco)
+ gco->prev_gco->next_gco= gco;
+ gco->installed= true;
+ }
+ wait_count= gco->wait_count;
+ if (wait_count > entry->count_committing_event_groups)
+ {
+ DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior");
+ old_msg= thd->enter_cond(&gco->COND_group_commit_orderer,
+ &entry->LOCK_parallel_entry,
+ "Waiting for prior transaction to start "
+ "commit before starting next transaction");
+ did_enter_cond= true;
+ do
{
- old_msg= thd->enter_cond(&entry->COND_parallel_entry,
- &entry->LOCK_parallel_entry,
- "Waiting for prior transaction to commit "
- "before starting next transaction");
- did_enter_cond= true;
- DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior");
- while (wait_start_sub_id > entry->last_committed_sub_id &&
- !thd->check_killed())
- mysql_cond_wait(&entry->COND_parallel_entry,
- &entry->LOCK_parallel_entry);
- if (wait_start_sub_id > entry->last_committed_sub_id)
+ if (thd->check_killed() && !rgi->is_error)
{
- /* The thread got a kill signal. */
DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior_killed");
thd->send_kill_message();
slave_output_error_info(rgi->rli, thd);
signal_error_to_sql_driver_thread(thd, rgi);
+ /*
+ Even though we were killed, we need to continue waiting for the
+ prior event groups to signal that we can continue. Otherwise we
+ mess up the accounting for ordering. However, now that we have
+ marked the error, events will just be skipped rather than
+ executed, and things will progress quickly towards stop.
+ */
}
- rgi->wait_start_sub_id= 0; /* No need to check again. */
- }
- if (wait_for_sub_id > entry->last_committed_sub_id)
- {
- wait_for_commit *waitee=
- &rgi->wait_commit_group_info->commit_orderer;
- rgi->commit_orderer.register_wait_for_prior_commit(waitee);
- }
- if (did_enter_cond)
- thd->exit_cond(old_msg);
- else
- mysql_mutex_unlock(&entry->LOCK_parallel_entry);
+ mysql_cond_wait(&gco->COND_group_commit_orderer,
+ &entry->LOCK_parallel_entry);
+ } while (wait_count > entry->count_committing_event_groups);
+ }
+
+ if ((tmp_gco= gco->prev_gco))
+ {
+ /*
+ Now all the event groups in the previous batch have entered their
+ commit phase, and will no longer access their gco. So we can free
+ it here.
+ */
+ DBUG_ASSERT(!tmp_gco->prev_gco);
+ gco->prev_gco= NULL;
+ tmp_gco->next_gco= gcos_to_free;
+ gcos_to_free= tmp_gco;
}
+ if (entry->force_abort && wait_count > entry->stop_count)
+ {
+ /*
+ We are stopping (STOP SLAVE), and this event group is beyond the
+ point where we can safely stop. So set a flag that will cause us
+ to skip, rather than execute, the following events.
+ */
+ group_skip_for_stop= true;
+ }
+ else
+ group_skip_for_stop= false;
+
+ if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id))
+ group_skip_for_stop= true;
+ else if (rgi->wait_commit_sub_id > entry->last_committed_sub_id)
+ {
+ /*
+ Register that the commit of this event group must wait for the
+ commit of the previous event group to complete before it may
+ complete itself, so that we preserve commit order.
+ */
+ wait_for_commit *waitee=
+ &rgi->wait_commit_group_info->commit_orderer;
+ rgi->commit_orderer.register_wait_for_prior_commit(waitee);
+ }
+ unlock_or_exit_cond(thd, &entry->LOCK_parallel_entry,
+ &did_enter_cond, old_msg);
+
if(thd->wait_for_commit_ptr)
{
/*
@@ -353,13 +422,23 @@ handle_rpl_parallel_thread(void *arg)
thd->wait_for_commit_ptr= &rgi->commit_orderer;
}
+ group_ending= event_type == XID_EVENT ||
+ (event_type == QUERY_EVENT &&
+ (((Query_log_event *)events->ev)->is_commit() ||
+ ((Query_log_event *)events->ev)->is_rollback()));
+ if (group_ending)
+ {
+ DEBUG_SYNC(thd, "rpl_parallel_before_mark_start_commit");
+ rgi->mark_start_commit();
+ }
+
/*
If the SQL thread is stopping, we just skip execution of all the
following event groups. We still do all the normal waiting and wakeup
processing between the event groups as a simple way to ensure that
everything is stopped and cleaned up correctly.
*/
- if (!rgi->is_error && !sql_worker_killed(thd, rgi, in_event_group))
+ if (!rgi->is_error && !group_skip_for_stop)
err= rpt_handle_event(events, rpt);
else
err= thd->wait_for_prior_commit();
@@ -367,13 +446,11 @@ handle_rpl_parallel_thread(void *arg)
end_of_group=
in_event_group &&
((group_standalone && !Log_event::is_part_of_group(event_type)) ||
- event_type == XID_EVENT ||
- (event_type == QUERY_EVENT &&
- (((Query_log_event *)events->ev)->is_commit() ||
- ((Query_log_event *)events->ev)->is_rollback())));
+ group_ending);
delete_or_keep_event_post_apply(rgi, event_type, events->ev);
- my_free(events);
+ events->next= qevs_to_free;
+ qevs_to_free= events;
if (err)
{
@@ -383,10 +460,11 @@ handle_rpl_parallel_thread(void *arg)
if (end_of_group)
{
in_event_group= false;
- finish_event_group(thd, err, event_gtid_sub_id, entry,
- &rgi->commit_orderer);
- delete rgi;
+ finish_event_group(thd, err, event_gtid_sub_id, entry, rgi);
+ rgi->next= rgis_to_free;
+ rgis_to_free= rgi;
group_rgi= rgi= NULL;
+ group_skip_for_stop= false;
DEBUG_SYNC(thd, "rpl_parallel_end_of_group");
}
@@ -394,6 +472,29 @@ handle_rpl_parallel_thread(void *arg)
}
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+ /* Signal that our queue can now accept more events. */
+ rpt->dequeue2(total_event_size);
+ mysql_cond_signal(&rpt->COND_rpl_thread_queue);
+ /* We need to delay the free here, to when we have the lock. */
+ while (gcos_to_free)
+ {
+ group_commit_orderer *next= gcos_to_free->next_gco;
+ rpt->free_gco(gcos_to_free);
+ gcos_to_free= next;
+ }
+ while (rgis_to_free)
+ {
+ rpl_group_info *next= rgis_to_free->next;
+ rpt->free_rgi(rgis_to_free);
+ rgis_to_free= next;
+ }
+ while (qevs_to_free)
+ {
+ rpl_parallel_thread::queued_event *next= qevs_to_free->next;
+ rpt->free_qev(qevs_to_free);
+ qevs_to_free= next;
+ }
+
if ((events= rpt->event_queue) != NULL)
{
/*
@@ -401,9 +502,8 @@ handle_rpl_parallel_thread(void *arg)
This is faster than having to wakeup the pool manager thread to give us
a new event.
*/
- rpt->dequeue(events);
+ rpt->dequeue1(events);
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
- mysql_cond_signal(&rpt->COND_rpl_thread);
goto more_events;
}
@@ -418,27 +518,26 @@ handle_rpl_parallel_thread(void *arg)
half-processed event group.
*/
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+ thd->wait_for_prior_commit();
finish_event_group(thd, 1, group_rgi->gtid_sub_id,
- group_rgi->parallel_entry, &group_rgi->commit_orderer);
+ group_rgi->parallel_entry, group_rgi);
signal_error_to_sql_driver_thread(thd, group_rgi);
in_event_group= false;
- delete group_rgi;
- group_rgi= NULL;
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+ rpt->free_rgi(group_rgi);
+ group_rgi= NULL;
+ group_skip_for_stop= false;
}
if (!in_event_group)
{
+ rpt->current_owner= NULL;
+ /* Tell wait_for_done() that we are done, if it is waiting. */
+ if (likely(rpt->current_entry) &&
+ unlikely(rpt->current_entry->force_abort))
+ mysql_cond_broadcast(&rpt->current_entry->COND_parallel_entry);
rpt->current_entry= NULL;
if (!rpt->stop)
- {
- mysql_mutex_lock(&rpt->pool->LOCK_rpl_thread_pool);
- list= rpt->pool->free_list;
- rpt->next= list;
- rpt->pool->free_list= rpt;
- if (!list)
- mysql_cond_broadcast(&rpt->pool->COND_rpl_thread_pool);
- mysql_mutex_unlock(&rpt->pool->LOCK_rpl_thread_pool);
- }
+ rpt->pool->release_thread(rpt);
}
}
@@ -467,6 +566,15 @@ handle_rpl_parallel_thread(void *arg)
}
+static void
+dealloc_gco(group_commit_orderer *gco)
+{
+ DBUG_ASSERT(!gco->prev_gco /* Must only free after dealloc previous */);
+ mysql_cond_destroy(&gco->COND_group_commit_orderer);
+ my_free(gco);
+}
+
+
int
rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
uint32 new_count, bool skip_check)
@@ -501,8 +609,10 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
mysql_mutex_init(key_LOCK_rpl_thread, &new_list[i]->LOCK_rpl_thread,
MY_MUTEX_INIT_SLOW);
mysql_cond_init(key_COND_rpl_thread, &new_list[i]->COND_rpl_thread, NULL);
+ mysql_cond_init(key_COND_rpl_thread_queue,
+ &new_list[i]->COND_rpl_thread_queue, NULL);
new_list[i]->pool= pool;
- if (mysql_thread_create(key_rpl_parallel_thread, &th, NULL,
+ if (mysql_thread_create(key_rpl_parallel_thread, &th, &connection_attrib,
handle_rpl_parallel_thread, new_list[i]))
{
my_error(ER_OUT_OF_RESOURCES, MYF(0));
@@ -539,7 +649,7 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
*/
for (i= 0; i < pool->count; ++i)
{
- rpl_parallel_thread *rpt= pool->get_thread(NULL);
+ rpl_parallel_thread *rpt= pool->get_thread(NULL, NULL);
rpt->stop= true;
mysql_cond_signal(&rpt->COND_rpl_thread);
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
@@ -554,6 +664,24 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
mysql_mutex_destroy(&rpt->LOCK_rpl_thread);
mysql_cond_destroy(&rpt->COND_rpl_thread);
+ while (rpt->qev_free_list)
+ {
+ rpl_parallel_thread::queued_event *next= rpt->qev_free_list->next;
+ my_free(rpt->qev_free_list);
+ rpt->qev_free_list= next;
+ }
+ while (rpt->rgi_free_list)
+ {
+ rpl_group_info *next= rpt->rgi_free_list->next;
+ delete rpt->rgi_free_list;
+ rpt->rgi_free_list= next;
+ }
+ while (rpt->gco_free_list)
+ {
+ group_commit_orderer *next= rpt->gco_free_list->next_gco;
+ dealloc_gco(rpt->gco_free_list);
+ rpt->gco_free_list= next;
+ }
}
my_free(pool->threads);
@@ -609,6 +737,121 @@ err:
}
+rpl_parallel_thread::queued_event *
+rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size,
+ Relay_log_info *rli)
+{
+ queued_event *qev;
+ mysql_mutex_assert_owner(&LOCK_rpl_thread);
+ if ((qev= qev_free_list))
+ qev_free_list= qev->next;
+ else if(!(qev= (queued_event *)my_malloc(sizeof(*qev), MYF(0))))
+ {
+ my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*qev));
+ return NULL;
+ }
+ qev->ev= ev;
+ qev->event_size= event_size;
+ qev->next= NULL;
+ strcpy(qev->event_relay_log_name, rli->event_relay_log_name);
+ qev->event_relay_log_pos= rli->event_relay_log_pos;
+ qev->future_event_relay_log_pos= rli->future_event_relay_log_pos;
+ strcpy(qev->future_event_master_log_name, rli->future_event_master_log_name);
+ return qev;
+}
+
+
+void
+rpl_parallel_thread::free_qev(rpl_parallel_thread::queued_event *qev)
+{
+ mysql_mutex_assert_owner(&LOCK_rpl_thread);
+ qev->next= qev_free_list;
+ qev_free_list= qev;
+}
+
+
+rpl_group_info*
+rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
+ rpl_parallel_entry *e)
+{
+ rpl_group_info *rgi;
+ mysql_mutex_assert_owner(&LOCK_rpl_thread);
+ if ((rgi= rgi_free_list))
+ {
+ rgi_free_list= rgi->next;
+ rgi->reinit(rli);
+ }
+ else
+ {
+ if(!(rgi= new rpl_group_info(rli)))
+ {
+ my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*rgi));
+ return NULL;
+ }
+ rgi->is_parallel_exec = true;
+ if ((rgi->deferred_events_collecting= rli->mi->rpl_filter->is_on()))
+ rgi->deferred_events= new Deferred_log_events(rli);
+ }
+ if (event_group_new_gtid(rgi, gtid_ev))
+ {
+ free_rgi(rgi);
+ my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME));
+ return NULL;
+ }
+ rgi->parallel_entry= e;
+
+ return rgi;
+}
+
+
+void
+rpl_parallel_thread::free_rgi(rpl_group_info *rgi)
+{
+ mysql_mutex_assert_owner(&LOCK_rpl_thread);
+ DBUG_ASSERT(rgi->commit_orderer.waitee == NULL);
+ rgi->free_annotate_event();
+ if (rgi->deferred_events)
+ {
+ delete rgi->deferred_events;
+ rgi->deferred_events= NULL;
+ }
+ rgi->next= rgi_free_list;
+ rgi_free_list= rgi;
+}
+
+
+group_commit_orderer *
+rpl_parallel_thread::get_gco(uint64 wait_count, group_commit_orderer *prev)
+{
+ group_commit_orderer *gco;
+ mysql_mutex_assert_owner(&LOCK_rpl_thread);
+ if ((gco= gco_free_list))
+ gco_free_list= gco->next_gco;
+ else if(!(gco= (group_commit_orderer *)my_malloc(sizeof(*gco), MYF(0))))
+ {
+ my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*gco));
+ return NULL;
+ }
+ mysql_cond_init(key_COND_group_commit_orderer,
+ &gco->COND_group_commit_orderer, NULL);
+ gco->wait_count= wait_count;
+ gco->prev_gco= prev;
+ gco->next_gco= NULL;
+ gco->installed= false;
+ return gco;
+}
+
+
+void
+rpl_parallel_thread::free_gco(group_commit_orderer *gco)
+{
+ mysql_mutex_assert_owner(&LOCK_rpl_thread);
+ DBUG_ASSERT(!gco->prev_gco /* Must not free until wait has completed. */);
+ gco->next_gco= gco_free_list;
+ gco_free_list= gco;
+}
+
+
rpl_parallel_thread_pool::rpl_parallel_thread_pool()
: count(0), threads(0), free_list(0), changing(false), inited(false)
{
@@ -651,7 +894,8 @@ rpl_parallel_thread_pool::destroy()
Note that we return with the worker threads's LOCK_rpl_thread mutex locked.
*/
struct rpl_parallel_thread *
-rpl_parallel_thread_pool::get_thread(rpl_parallel_entry *entry)
+rpl_parallel_thread_pool::get_thread(rpl_parallel_thread **owner,
+ rpl_parallel_entry *entry)
{
rpl_parallel_thread *rpt;
@@ -661,16 +905,151 @@ rpl_parallel_thread_pool::get_thread(rpl_parallel_entry *entry)
free_list= rpt->next;
mysql_mutex_unlock(&LOCK_rpl_thread_pool);
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+ rpt->current_owner= owner;
rpt->current_entry= entry;
return rpt;
}
+/*
+ Release a thread to the thread pool.
+ The thread should be locked, and should not have any work queued for it.
+*/
+void
+rpl_parallel_thread_pool::release_thread(rpl_parallel_thread *rpt)
+{
+ rpl_parallel_thread *list;
+
+ mysql_mutex_assert_owner(&rpt->LOCK_rpl_thread);
+ DBUG_ASSERT(rpt->current_owner == NULL);
+ mysql_mutex_lock(&LOCK_rpl_thread_pool);
+ list= free_list;
+ rpt->next= list;
+ free_list= rpt;
+ if (!list)
+ mysql_cond_broadcast(&COND_rpl_thread_pool);
+ mysql_mutex_unlock(&LOCK_rpl_thread_pool);
+}
+
+
+/*
+ Obtain a worker thread that we can queue an event to.
+
+ Each invocation allocates a new worker thread, to maximise
+ parallelism. However, only up to a maximum of
+ --slave-domain-parallel-threads workers can be occupied by a single
+ replication domain; after that point, we start re-using worker threads that
+ are still executing events that were queued earlier for this thread.
+
+ We never queue more than --rpl-parallel-wait-queue_max amount of events
+ for one worker, to avoid the SQL driver thread using up all memory with
+ queued events while worker threads are stalling.
+
+ Note that this function returns with rpl_parallel_thread::LOCK_rpl_thread
+ locked. Exception is if we were killed, in which case NULL is returned.
+
+ The *did_enter_cond flag is set true if we had to wait for a worker thread
+ to become free (with mysql_cond_wait()). If so, *old_msg will also be set,
+ and the LOCK_rpl_thread must be released with THD::EXIT_COND() instead
+ of mysql_mutex_unlock.
+
+ If the flag `reuse' is set, the last worker thread will be returned again,
+ if it is still available. Otherwise a new worker thread is allocated.
+*/
+rpl_parallel_thread *
+rpl_parallel_entry::choose_thread(Relay_log_info *rli, bool *did_enter_cond,
+ const char **old_msg, bool reuse)
+{
+ uint32 idx;
+ rpl_parallel_thread *thr;
+
+ idx= rpl_thread_idx;
+ if (!reuse)
+ {
+ ++idx;
+ if (idx >= rpl_thread_max)
+ idx= 0;
+ rpl_thread_idx= idx;
+ }
+ thr= rpl_threads[idx];
+ if (thr)
+ {
+ *did_enter_cond= false;
+ mysql_mutex_lock(&thr->LOCK_rpl_thread);
+ for (;;)
+ {
+ if (thr->current_owner != &rpl_threads[idx])
+ {
+ /*
+ The worker thread became idle, and returned to the free list and
+ possibly was allocated to a different request. So we should allocate
+ a new worker thread.
+ */
+ unlock_or_exit_cond(rli->sql_driver_thd, &thr->LOCK_rpl_thread,
+ did_enter_cond, *old_msg);
+ thr= NULL;
+ break;
+ }
+ else if (thr->queued_size <= opt_slave_parallel_max_queued)
+ {
+ /* The thread is ready to queue into. */
+ break;
+ }
+ else if (rli->sql_driver_thd->check_killed())
+ {
+ unlock_or_exit_cond(rli->sql_driver_thd, &thr->LOCK_rpl_thread,
+ did_enter_cond, *old_msg);
+ my_error(ER_CONNECTION_KILLED, MYF(0));
+ DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max",
+ {
+ debug_sync_set_action(rli->sql_driver_thd,
+ STRING_WITH_LEN("now SIGNAL wait_queue_killed"));
+ };);
+ slave_output_error_info(rli, rli->sql_driver_thd);
+ return NULL;
+ }
+ else
+ {
+ /*
+ We have reached the limit of how much memory we are allowed to use
+ for queuing events, so wait for the thread to consume some of its
+ queue.
+ */
+ if (!*did_enter_cond)
+ {
+ /*
+ We need to do the debug_sync before enter_cond().
+ Because debug_sync changes the thd->mysys_var->current_mutex,
+ and this can cause THD::awake to use the wrong mutex.
+ */
+ DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max",
+ {
+ debug_sync_set_action(rli->sql_driver_thd,
+ STRING_WITH_LEN("now SIGNAL wait_queue_ready"));
+ };);
+ *old_msg= rli->sql_driver_thd->enter_cond
+ (&thr->COND_rpl_thread_queue, &thr->LOCK_rpl_thread,
+ "Waiting for room in worker thread event queue");
+ *did_enter_cond= true;
+ }
+ mysql_cond_wait(&thr->COND_rpl_thread_queue, &thr->LOCK_rpl_thread);
+ }
+ }
+ }
+ if (!thr)
+ rpl_threads[idx]= thr= global_rpl_thread_pool.get_thread(&rpl_threads[idx],
+ this);
+
+ return thr;
+}
+
static void
free_rpl_parallel_entry(void *element)
{
rpl_parallel_entry *e= (rpl_parallel_entry *)element;
+ if (e->current_gco)
+ dealloc_gco(e->current_gco);
mysql_cond_destroy(&e->COND_parallel_entry);
mysql_mutex_destroy(&e->LOCK_parallel_entry);
my_free(e);
@@ -710,10 +1089,22 @@ rpl_parallel::find(uint32 domain_id)
(const uchar *)&domain_id, 0)))
{
/* Allocate a new, empty one. */
- if (!(e= (struct rpl_parallel_entry *)my_malloc(sizeof(*e),
- MYF(MY_ZEROFILL))))
+ ulong count= opt_slave_domain_parallel_threads;
+ if (count == 0 || count > opt_slave_parallel_threads)
+ count= opt_slave_parallel_threads;
+ rpl_parallel_thread **p;
+ if (!my_multi_malloc(MYF(MY_WME|MY_ZEROFILL),
+ &e, sizeof(*e),
+ &p, count*sizeof(*p),
+ NULL))
+ {
+ my_error(ER_OUTOFMEMORY, MYF(0), (int)(sizeof(*e)+count*sizeof(*p)));
return NULL;
+ }
+ e->rpl_threads= p;
+ e->rpl_thread_max= count;
e->domain_id= domain_id;
+ e->stop_on_error_sub_id= (uint64)ULONGLONG_MAX;
if (my_hash_insert(&domain_hash, (uchar *)e))
{
my_free(e);
@@ -731,10 +1122,11 @@ rpl_parallel::find(uint32 domain_id)
void
-rpl_parallel::wait_for_done()
+rpl_parallel::wait_for_done(THD *thd)
{
struct rpl_parallel_entry *e;
- uint32 i;
+ rpl_parallel_thread *rpt;
+ uint32 i, j;
/*
First signal all workers that they must force quit; no more events will
@@ -742,26 +1134,58 @@ rpl_parallel::wait_for_done()
*/
for (i= 0; i < domain_hash.records; ++i)
{
- rpl_parallel_thread *rpt;
-
e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
+ mysql_mutex_lock(&e->LOCK_parallel_entry);
+ /*
+ We want the worker threads to stop as quickly as is safe. If the slave
+ SQL threads are behind, we could have significant amount of events
+ queued for the workers, and we want to stop without waiting for them
+ all to be applied first. But if any event group has already started
+ executing in a worker, we want to be sure that all prior event groups
+ are also executed, so that we stop at a consistent point in the binlog
+ stream (per replication domain).
+
+ All event groups wait for e->count_committing_event_groups to reach
+ the value of group_commit_orderer::wait_count before starting to
+ execute. Thus, at this point we know that any event group with a
+ strictly larger wait_count are safe to skip, none of them can have
+ started executing yet. So we set e->stop_count here and use it to
+ decide in the worker threads whether to continue executing an event
+ group or whether to skip it, when force_abort is set.
+ */
e->force_abort= true;
- if ((rpt= e->rpl_thread))
+ e->stop_count= e->count_committing_event_groups;
+ mysql_mutex_unlock(&e->LOCK_parallel_entry);
+ for (j= 0; j < e->rpl_thread_max; ++j)
{
- mysql_mutex_lock(&rpt->LOCK_rpl_thread);
- if (rpt->current_entry == e)
- mysql_cond_signal(&rpt->COND_rpl_thread);
- mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+ if ((rpt= e->rpl_threads[j]))
+ {
+ mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+ if (rpt->current_owner == &e->rpl_threads[j])
+ mysql_cond_signal(&rpt->COND_rpl_thread);
+ mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+ }
}
}
+ DBUG_EXECUTE_IF("rpl_parallel_wait_for_done_trigger",
+ {
+ debug_sync_set_action(thd,
+ STRING_WITH_LEN("now SIGNAL wait_for_done_waiting"));
+ };);
for (i= 0; i < domain_hash.records; ++i)
{
e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
- mysql_mutex_lock(&e->LOCK_parallel_entry);
- while (e->current_sub_id > e->last_committed_sub_id)
- mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry);
- mysql_mutex_unlock(&e->LOCK_parallel_entry);
+ for (j= 0; j < e->rpl_thread_max; ++j)
+ {
+ if ((rpt= e->rpl_threads[j]))
+ {
+ mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+ while (rpt->current_owner == &e->rpl_threads[j])
+ mysql_cond_wait(&e->COND_parallel_entry, &rpt->LOCK_rpl_thread);
+ mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+ }
+ }
}
}
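As a rough standalone illustration of the stop_count rule used above (not taken from the patch): at stop time we record how many event groups have already entered the commit phase, and a queued group runs only if its batch's wait_count does not exceed that value; everything later is skipped. The counter values below are made up.

// Illustrative model (not MariaDB code) of the stop_count decision.
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <vector>

struct queued_group {
  uint64_t wait_count;   // how many groups were queued before this group's batch
};

int main() {
  // Seven queued event groups, in batches whose wait_counts are 0, 2 and 5 ...
  std::vector<queued_group> queued = {{0}, {0}, {2}, {2}, {2}, {5}, {5}};
  // ... and at the moment of STOP SLAVE, three groups had started to commit.
  uint64_t stop_count = 3;

  for (std::size_t i = 0; i < queued.size(); ++i) {
    bool execute = queued[i].wait_count <= stop_count;
    std::printf("group %zu (wait_count=%llu): %s\n", i,
                (unsigned long long) queued[i].wait_count,
                execute ? "execute before stopping" : "skip");
  }
}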
@@ -788,6 +1212,21 @@ rpl_parallel::workers_idle()
/*
+ This is used when we get an error during processing in do_event(); we will
+ not queue any event to the thread, but we still need to wake it up to be
+ sure that it is returned to the pool.
+*/
+static void
+abandon_worker_thread(THD *thd, rpl_parallel_thread *cur_thread,
+ bool *did_enter_cond, const char *old_msg)
+{
+ unlock_or_exit_cond(thd, &cur_thread->LOCK_rpl_thread,
+ did_enter_cond, old_msg);
+ mysql_cond_signal(&cur_thread->COND_rpl_thread);
+}
+
+
+/*
do_event() is executed by the sql_driver_thd thread.
Its main purpose is to find a thread that can execute the query.
@@ -815,6 +1254,10 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
/*
Stop queueing additional event groups once the SQL thread is requested to
stop.
+
+ We have to queue any remaining events of any event group that has already
+ been partially queued, but after that we will just ignore any further
+ events the SQL driver thread may try to queue, and eventually it will stop.
*/
if (((typ= ev->get_type_code()) == GTID_EVENT ||
!(is_group_event= Log_event::is_group_event(typ))) &&
@@ -823,188 +1266,121 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
if (sql_thread_stopping)
{
delete ev;
- /* QQ: Need a better comment why we return false here */
+ /*
+ Return false ("no error"); a normal stop is not an error, and any real
+ error has already been recorded.
+ */
return false;
}
- if (!(qev= (rpl_parallel_thread::queued_event *)my_malloc(sizeof(*qev),
- MYF(0))))
+ if (typ == GTID_EVENT || unlikely(!current))
{
- my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ uint32 domain_id;
+ if (likely(typ == GTID_EVENT))
+ {
+ Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
+ domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ?
+ 0 : gtid_ev->domain_id);
+ }
+ else
+ domain_id= 0;
+ if (!(e= find(domain_id)))
+ {
+ my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME));
+ delete ev;
+ return true;
+ }
+ current= e;
+ }
+ else
+ e= current;
+
+ /*
+ Find a worker thread to queue the event for.
+ Prefer a new thread, so we maximise parallelism (at least for the group
+ commit). But do not exceed the limit of --slave-domain-parallel-threads;
+ instead re-use a thread that we queued for previously.
+ */
+ cur_thread=
+ e->choose_thread(rli, &did_enter_cond, &old_msg, typ != GTID_EVENT);
+ if (!cur_thread)
+ {
+ /* This means we were killed. The error is already signalled. */
+ delete ev;
+ return true;
+ }
+
+ if (!(qev= cur_thread->get_qev(ev, event_size, rli)))
+ {
+ abandon_worker_thread(rli->sql_driver_thd, cur_thread,
+ &did_enter_cond, old_msg);
delete ev;
return true;
}
- qev->ev= ev;
- qev->event_size= event_size;
- qev->next= NULL;
- strcpy(qev->event_relay_log_name, rli->event_relay_log_name);
- qev->event_relay_log_pos= rli->event_relay_log_pos;
- qev->future_event_relay_log_pos= rli->future_event_relay_log_pos;
- strcpy(qev->future_event_master_log_name, rli->future_event_master_log_name);
if (typ == GTID_EVENT)
{
Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
- uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ?
- 0 : gtid_ev->domain_id);
- if (!(e= find(domain_id)) ||
- !(rgi= new rpl_group_info(rli)) ||
- event_group_new_gtid(rgi, gtid_ev))
+ if (!(rgi= cur_thread->get_rgi(rli, gtid_ev, e)))
{
- my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME));
- delete rgi;
- my_free(qev);
+ cur_thread->free_qev(qev);
+ abandon_worker_thread(rli->sql_driver_thd, cur_thread,
+ &did_enter_cond, old_msg);
delete ev;
return true;
}
- rgi->is_parallel_exec = true;
- if ((rgi->deferred_events_collecting= rli->mi->rpl_filter->is_on()))
- rgi->deferred_events= new Deferred_log_events(rli);
- if ((gtid_ev->flags2 & Gtid_log_event::FL_GROUP_COMMIT_ID) &&
- e->last_commit_id == gtid_ev->commit_id)
- {
- /*
- We are already executing something else in this domain. But the two
- event groups were committed together in the same group commit on the
- master, so we can still do them in parallel here on the slave.
+ /*
+ We queue the event group in a new worker thread, to run in parallel
+ with previous groups.
- However, the commit of this event must wait for the commit of the prior
- event, to preserve binlog commit order and visibility across all
- servers in the replication hierarchy.
+ To preserve commit order within the replication domain, we set up
+ rgi->wait_commit_sub_id to make the new group commit only after the
+ previous group has committed.
- In addition, we must not start executing this event until we have
- finished the previous collection of event groups that group-committed
- together; we use rgi->wait_start_sub_id to control this.
- */
- rpl_parallel_thread *rpt= global_rpl_thread_pool.get_thread(e);
- rgi->wait_commit_sub_id= e->current_sub_id;
- rgi->wait_commit_group_info= e->current_group_info;
- rgi->wait_start_sub_id= e->prev_groupcommit_sub_id;
- e->rpl_thread= cur_thread= rpt;
- /* get_thread() returns with the LOCK_rpl_thread locked. */
- }
- else
+ Event groups that group-committed together on the master can be run
+ in parallel with each other without restrictions. But one batch of
+ group-commits may not start before all groups in the previous batch
+ have initiated their commit phase; we set up rgi->gco to ensure that.
+ */
+ rgi->wait_commit_sub_id= e->current_sub_id;
+ rgi->wait_commit_group_info= e->current_group_info;
+
+ if (!((gtid_ev->flags2 & Gtid_log_event::FL_GROUP_COMMIT_ID) &&
+ e->last_commit_id == gtid_ev->commit_id))
{
/*
- Check if we already have a worker thread for this entry.
-
- We continue to queue more events up for the worker thread while it is
- still executing the first ones, to be able to start executing a large
- event group without having to wait for the end to be fetched from the
- master. And we continue to queue up more events after the first group,
- so that we can continue to process subsequent parts of the relay log in
- parallel without having to wait for previous long-running events to
- complete.
-
- But if the worker thread is idle at any point, it may return to the
- idle list or start servicing a different request. So check this, and
- allocate a new thread if the old one is no longer processing for us.
+ A new batch of transactions that group-committed together on the master.
+
+ Remember the count that marks the end of the previous group-committed
+ batch, and allocate a new gco.
*/
- cur_thread= e->rpl_thread;
- if (cur_thread)
- {
- mysql_mutex_lock(&cur_thread->LOCK_rpl_thread);
- for (;;)
- {
- if (cur_thread->current_entry != e)
- {
- /*
- The worker thread became idle, and returned to the free list and
- possibly was allocated to a different request. This also means
- that everything previously queued has already been executed,
- else the worker thread would not have become idle. So we should
- allocate a new worker thread.
- */
- mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
- e->rpl_thread= cur_thread= NULL;
- break;
- }
- else if (cur_thread->queued_size <= opt_slave_parallel_max_queued)
- break; // The thread is ready to queue into
- else if (rli->sql_driver_thd->check_killed())
- {
- mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
- my_error(ER_CONNECTION_KILLED, MYF(0));
- delete rgi;
- my_free(qev);
- delete ev;
- DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max",
- {
- debug_sync_set_action(rli->sql_driver_thd,
- STRING_WITH_LEN("now SIGNAL wait_queue_killed"));
- };);
- slave_output_error_info(rli, rli->sql_driver_thd);
- return true;
- }
- else
- {
- /*
- We have reached the limit of how much memory we are allowed to
- use for queuing events, so wait for the thread to consume some
- of its queue.
- */
- if (!did_enter_cond)
- {
- old_msg= rli->sql_driver_thd->enter_cond
- (&cur_thread->COND_rpl_thread, &cur_thread->LOCK_rpl_thread,
- "Waiting for room in worker thread event queue");
- did_enter_cond= true;
- DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max",
- {
- debug_sync_set_action(rli->sql_driver_thd,
- STRING_WITH_LEN("now SIGNAL wait_queue_ready"));
- };);
- }
- mysql_cond_wait(&cur_thread->COND_rpl_thread,
- &cur_thread->LOCK_rpl_thread);
- }
- }
- }
+ uint64 count= e->count_queued_event_groups;
+ group_commit_orderer *gco;
- if (!cur_thread)
- {
- /*
- Nothing else is currently running in this domain. We can
- spawn a new thread to do this event group in parallel with
- anything else that might be running in other domains.
- */
- cur_thread= e->rpl_thread= global_rpl_thread_pool.get_thread(e);
- /* get_thread() returns with the LOCK_rpl_thread locked. */
- }
- else
+ if (!(gco= cur_thread->get_gco(count, e->current_gco)))
{
- /*
- We are still executing the previous event group for this replication
- domain, and we have to wait for that to finish before we can start on
- the next one. So just re-use the thread.
- */
+ cur_thread->free_rgi(rgi);
+ cur_thread->free_qev(qev);
+ abandon_worker_thread(rli->sql_driver_thd, cur_thread,
+ &did_enter_cond, old_msg);
+ delete ev;
+ return true;
}
-
- rgi->wait_commit_sub_id= 0;
- rgi->wait_start_sub_id= 0;
- e->prev_groupcommit_sub_id= e->current_sub_id;
+ e->current_gco= rgi->gco= gco;
}
-
+ else
+ rgi->gco= e->current_gco;
if (gtid_ev->flags2 & Gtid_log_event::FL_GROUP_COMMIT_ID)
- {
- e->last_server_id= gtid_ev->server_id;
- e->last_seq_no= gtid_ev->seq_no;
e->last_commit_id= gtid_ev->commit_id;
- }
else
- {
- e->last_server_id= 0;
- e->last_seq_no= 0;
e->last_commit_id= 0;
- }
-
qev->rgi= e->current_group_info= rgi;
e->current_sub_id= rgi->gtid_sub_id;
- current= rgi->parallel_entry= e;
+ ++e->count_queued_event_groups;
}
- else if (!is_group_event || !current)
+ else if (!is_group_event || !e)
{
my_off_t log_pos;
int err;
@@ -1014,7 +1390,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
Same for events not preceded by GTID (we should not see those normally,
but they might be from an old master).
- The varuable `current' is NULL for the case where the master did not
+ The variable `e' is NULL for the case where the master did not
have GTID, like a MariaDB 5.5 or MySQL master.
*/
qev->rgi= serial_rgi;
@@ -1041,18 +1417,11 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
if (err)
{
- my_free(qev);
+ cur_thread->free_qev(qev);
+ abandon_worker_thread(rli->sql_driver_thd, cur_thread,
+ &did_enter_cond, old_msg);
return true;
}
- qev->ev= NULL;
- qev->future_event_master_log_pos= log_pos;
- if (!current)
- {
- rli->event_relay_log_pos= rli->future_event_relay_log_pos;
- handle_queued_pos_update(rli->sql_driver_thd, qev);
- my_free(qev);
- return false;
- }
/*
Queue an empty event, so that the position will be updated in a
reasonable way relative to other events:
@@ -1065,40 +1434,12 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
least the position will not be updated until one of them has reached
the current point.
*/
- cur_thread= current->rpl_thread;
- if (cur_thread)
- {
- mysql_mutex_lock(&cur_thread->LOCK_rpl_thread);
- if (cur_thread->current_entry != current)
- {
- /* Not ours anymore, we need to grab a new one. */
- mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
- cur_thread= NULL;
- }
- }
- if (!cur_thread)
- cur_thread= current->rpl_thread=
- global_rpl_thread_pool.get_thread(current);
+ qev->ev= NULL;
+ qev->future_event_master_log_pos= log_pos;
}
else
{
- cur_thread= current->rpl_thread;
- if (cur_thread)
- {
- mysql_mutex_lock(&cur_thread->LOCK_rpl_thread);
- if (cur_thread->current_entry != current)
- {
- /* Not ours anymore, we need to grab a new one. */
- mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
- cur_thread= NULL;
- }
- }
- if (!cur_thread)
- {
- cur_thread= current->rpl_thread=
- global_rpl_thread_pool.get_thread(current);
- }
- qev->rgi= current->current_group_info;
+ qev->rgi= e->current_group_info;
}
/*
@@ -1106,10 +1447,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
*/
rli->event_relay_log_pos= rli->future_event_relay_log_pos;
cur_thread->enqueue(qev);
- if (did_enter_cond)
- rli->sql_driver_thd->exit_cond(old_msg);
- else
- mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
+ unlock_or_exit_cond(rli->sql_driver_thd, &cur_thread->LOCK_rpl_thread,
+ &did_enter_cond, old_msg);
mysql_cond_signal(&cur_thread->COND_rpl_thread);
return false;
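A hedged sketch of the batching decision made in do_event() above for GTID events (standalone, not server code): a GTID carrying the same commit_id as the previous one joins the current group_commit_orderer, while a different commit_id starts a new batch whose wait_count is the number of event groups queued so far. The commit_id sequence below is invented.

// Standalone sketch (not MariaDB code) of the commit_id batching decision.
#include <cstdint>
#include <cstdio>
#include <vector>

int main() {
  // commit_id values taken from successive Gtid_log_events (invented here).
  std::vector<uint64_t> commit_ids = {7, 7, 7, 9, 9, 12};
  uint64_t last_commit_id = 0;      // 0: previous GTID carried no commit id
  uint64_t count_queued = 0;        // rpl_parallel_entry::count_queued_event_groups
  uint64_t current_wait_count = 0;  // wait_count of the gco currently being filled

  for (uint64_t id : commit_ids) {
    bool same_batch = (id != 0 && id == last_commit_id);
    if (!same_batch) {
      current_wait_count = count_queued;   // new batch waits for all prior groups
      std::printf("new batch, wait_count=%llu\n",
                  (unsigned long long) current_wait_count);
    }
    std::printf("  group with commit_id=%llu joins batch wait_count=%llu\n",
                (unsigned long long) id, (unsigned long long) current_wait_count);
    last_commit_id = id;
    ++count_queued;
  }
}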
diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h
index 019a354c57d..90649230f98 100644
--- a/sql/rpl_parallel.h
+++ b/sql/rpl_parallel.h
@@ -9,16 +9,66 @@ struct rpl_parallel_entry;
struct rpl_parallel_thread_pool;
class Relay_log_info;
+
+
+/*
+ Structure used to keep track of the parallel replication of a batch of
+ event-groups that group-committed together on the master.
+
+ It is used to ensure that every event group in one batch has reached the
+ commit stage before the next batch starts executing.
+
+ Note the lifetime of this structure:
+
+ - It is allocated when the first event in a new batch of group commits
+ is queued, from the free list rpl_parallel_thread::gco_free_list.
+
+ - The gco for the batch currently being queued is owned by
+ rpl_parallel_entry::current_gco. The gco for a previous batch that has
+ been fully queued is owned by the gco->prev_gco pointer of the gco for
+ the following batch.
+
+ - The worker thread waits on gco->COND_group_commit_orderer for
+ rpl_parallel_entry::count_committing_event_groups to reach wait_count
+ before starting; the first waiter links the gco into the next_gco
+ pointer of the gco of the previous batch for signalling.
+
+ - When an event group reaches the commit stage, it signals the
+ COND_group_commit_orderer if its gco->next_gco pointer is non-NULL and
+ rpl_parallel_entry::count_committing_event_groups has reached
+ gco->next_gco->wait_count.
+
+ - When gco->wait_count is reached for a worker and the wait completes,
+ the worker frees gco->prev_gco; at this point it is guaranteed not to
+ be needed any longer.
+*/
+struct group_commit_orderer {
+ /* Wakeup condition, used with rpl_parallel_entry::LOCK_parallel_entry. */
+ mysql_cond_t COND_group_commit_orderer;
+ uint64 wait_count;
+ group_commit_orderer *prev_gco;
+ group_commit_orderer *next_gco;
+ bool installed;
+};
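The wait/wakeup protocol this struct describes can be sketched with standard C++ synchronisation primitives roughly as follows. This is a simplified model, not the server implementation: it broadcasts unconditionally instead of checking next_gco->wait_count, and the batch sizes are arbitrary.

// Minimal standalone model (not MariaDB code) of the wait_count gate.
#include <condition_variable>
#include <cstdint>
#include <cstdio>
#include <mutex>
#include <thread>
#include <vector>

struct entry {
  std::mutex lock;                       // plays the role of LOCK_parallel_entry
  std::condition_variable orderer;       // plays the role of COND_group_commit_orderer
  uint64_t count_committing = 0;         // count_committing_event_groups

  void run_group(uint64_t wait_count) {
    {
      std::unique_lock<std::mutex> g(lock);
      // Wait until every group of all previous batches has started to commit.
      orderer.wait(g, [&] { return count_committing >= wait_count; });
    }
    // ... apply the event group here ...
    {
      std::lock_guard<std::mutex> g(lock);
      ++count_committing;                // corresponds to mark_start_commit()
      orderer.notify_all();              // wake the next batch, if it is waiting
    }
  }
};

int main() {
  entry e;
  std::vector<std::thread> workers;
  // Two batches of group commits: three groups with wait_count 0, two with 3.
  for (uint64_t wc : {0, 0, 0, 3, 3})
    workers.emplace_back([&e, wc] { e.run_group(wc); });
  for (auto &t : workers) t.join();
  std::printf("groups that started committing: %llu\n",
              (unsigned long long) e.count_committing);
}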
+
+
struct rpl_parallel_thread {
bool delay_start;
bool running;
bool stop;
mysql_mutex_t LOCK_rpl_thread;
mysql_cond_t COND_rpl_thread;
+ mysql_cond_t COND_rpl_thread_queue;
struct rpl_parallel_thread *next; /* For free list. */
struct rpl_parallel_thread_pool *pool;
THD *thd;
- struct rpl_parallel_entry *current_entry;
+ /*
+ Who owns the thread, if any (it's a pointer into the
+ rpl_parallel_entry::rpl_threads array.
+ */
+ struct rpl_parallel_thread **current_owner;
+ /* The rpl_parallel_entry of the owner. */
+ rpl_parallel_entry *current_entry;
struct queued_event {
queued_event *next;
Log_event *ev;
@@ -31,6 +81,9 @@ struct rpl_parallel_thread {
size_t event_size;
} *event_queue, *last_in_queue;
uint64 queued_size;
+ queued_event *qev_free_list;
+ rpl_group_info *rgi_free_list;
+ group_commit_orderer *gco_free_list;
void enqueue(queued_event *qev)
{
@@ -42,15 +95,25 @@ struct rpl_parallel_thread {
queued_size+= qev->event_size;
}
- void dequeue(queued_event *list)
+ void dequeue1(queued_event *list)
{
- queued_event *tmp;
-
DBUG_ASSERT(list == event_queue);
event_queue= last_in_queue= NULL;
- for (tmp= list; tmp; tmp= tmp->next)
- queued_size-= tmp->event_size;
}
+
+ void dequeue2(size_t dequeue_size)
+ {
+ queued_size-= dequeue_size;
+ }
+
+ queued_event *get_qev(Log_event *ev, ulonglong event_size,
+ Relay_log_info *rli);
+ void free_qev(queued_event *qev);
+ rpl_group_info *get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
+ rpl_parallel_entry *e);
+ void free_rgi(rpl_group_info *rgi);
+ group_commit_orderer *get_gco(uint64 wait_count, group_commit_orderer *prev);
+ void free_gco(group_commit_orderer *gco);
};
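The new get_qev()/free_qev() style helpers amount to per-worker free lists. A minimal standalone model of that reuse pattern (not the server code, with simplified types) might look like this:

// Illustrative sketch (not MariaDB code) of a per-worker free list.
#include <cstddef>
#include <cstdio>
#include <mutex>

struct queued_event {
  queued_event *next = nullptr;   // free-list link, as in the real struct
  std::size_t event_size = 0;
};

struct worker {
  std::mutex lock;                     // plays the role of LOCK_rpl_thread
  queued_event *qev_free_list = nullptr;

  queued_event *get_qev(std::size_t event_size) {
    std::lock_guard<std::mutex> g(lock);
    queued_event *qev = qev_free_list;
    if (qev)
      qev_free_list = qev->next;       // reuse a previously freed object
    else
      qev = new queued_event();        // fall back to a fresh allocation
    qev->next = nullptr;
    qev->event_size = event_size;
    return qev;
  }

  void free_qev(queued_event *qev) {
    std::lock_guard<std::mutex> g(lock);
    qev->next = qev_free_list;         // push back for later reuse
    qev_free_list = qev;
  }
};

int main() {
  worker w;
  queued_event *a = w.get_qev(100);
  w.free_qev(a);
  queued_event *b = w.get_qev(200);    // same object, recycled from the list
  std::printf("recycled: %s\n", (a == b) ? "yes" : "no");
  w.free_qev(b);
  // Objects left on the free list are deliberately leaked in this tiny demo.
}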
@@ -66,14 +129,16 @@ struct rpl_parallel_thread_pool {
rpl_parallel_thread_pool();
int init(uint32 size);
void destroy();
- struct rpl_parallel_thread *get_thread(rpl_parallel_entry *entry);
+ struct rpl_parallel_thread *get_thread(rpl_parallel_thread **owner,
+ rpl_parallel_entry *entry);
+ void release_thread(rpl_parallel_thread *rpt);
};
struct rpl_parallel_entry {
+ mysql_mutex_t LOCK_parallel_entry;
+ mysql_cond_t COND_parallel_entry;
uint32 domain_id;
- uint32 last_server_id;
- uint64 last_seq_no;
uint64 last_commit_id;
bool active;
/*
@@ -82,15 +147,41 @@ struct rpl_parallel_entry {
waiting for event groups to complete.
*/
bool force_abort;
+ /*
+ At STOP SLAVE (force_abort=true), we do not want to process all events in
+ the queue (which could unnecessarily delay the stop if a lot of events happen
+ to be queued). The stop_count provides a safe point at which to stop, so
+ that everything before it gets committed and nothing after it does. The value
+ corresponds to group_commit_orderer::wait_count; if wait_count is less than
+ or equal to stop_count, we execute the associated event group, else we
+ skip it (and all following) and stop.
+ */
+ uint64 stop_count;
- rpl_parallel_thread *rpl_thread;
+ /*
+ Cyclic array recording the last rpl_thread_max worker threads that we
+ queued events for. This is used to limit how many workers a single domain
+ can occupy (--slave-domain-parallel-threads).
+
+ Note that workers are never explicitly deleted from the array. Instead,
+ we need to check (under LOCK_rpl_thread) that the thread still belongs
+ to us before re-using it (see rpl_parallel_thread::current_owner).
+ */
+ rpl_parallel_thread **rpl_threads;
+ uint32 rpl_thread_max;
+ uint32 rpl_thread_idx;
/*
The sub_id of the last transaction to commit within this domain_id.
Must be accessed under LOCK_parallel_entry protection.
+
+ Event groups commit in order, so the rpl_group_info for an event group
+ will be alive (at least) as long as
+ rpl_group_info::gtid_sub_id > last_committed_sub_id. This can be used to
+ safely refer back to previous event groups if they are still executing,
+ and ignore them if they completed, without requiring explicit
+ synchronisation between the threads.
*/
uint64 last_committed_sub_id;
- mysql_mutex_t LOCK_parallel_entry;
- mysql_cond_t COND_parallel_entry;
/*
The sub_id of the last event group in this replication domain that was
queued for execution by a worker thread.
@@ -98,14 +189,29 @@ struct rpl_parallel_entry {
uint64 current_sub_id;
rpl_group_info *current_group_info;
/*
- The sub_id of the last event group in the previous batch of group-committed
- transactions.
-
- When we spawn parallel worker threads for the next group-committed batch,
- they first need to wait for this sub_id to be committed before it is safe
- to start executing them.
+ If we get an error in some event group, we set the sub_id of that event
+ group here. Then later event groups (with higher sub_id) can know not to
+ try to start (event groups that already started will be rolled back when
+ wait_for_prior_commit() returns an error).
+ The value is ULONGLONG_MAX when no error has occurred.
+ */
+ uint64 stop_on_error_sub_id;
+ /* Total count of event groups queued so far. */
+ uint64 count_queued_event_groups;
+ /*
+ Count of event groups that have started (but not necessarily completed)
+ the commit phase. We use this to know when every event group in a previous
+ batch of master group commits has started committing on the slave, so
+ that it is safe to start executing the events in the following batch.
*/
- uint64 prev_groupcommit_sub_id;
+ uint64 count_committing_event_groups;
+ /* The group_commit_orderer object for the events currently being queued. */
+ group_commit_orderer *current_gco;
+
+ rpl_parallel_thread * choose_thread(Relay_log_info *rli, bool *did_enter_cond,
+ const char **old_msg, bool reuse);
+ group_commit_orderer *get_gco();
+ void free_gco(group_commit_orderer *gco);
};
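A standalone sketch of how the cyclic rpl_threads[] array and the current_owner check could bound a domain to a fixed set of workers. This is illustrative only: locking is omitted, the global thread pool is replaced by a counter, and the workers are leaked at program end.

// Standalone sketch (not MariaDB code) of round-robin worker reuse per domain.
#include <cstdio>
#include <vector>

struct worker {
  worker **current_owner;   // which entry slot owns this worker, if any
  int id;
};

struct domain_entry {
  std::vector<worker *> rpl_threads;  // last rpl_thread_max workers used
  unsigned rpl_thread_idx = 0;
  int next_worker_id = 0;             // stand-in for the global thread pool

  worker *choose_thread() {
    unsigned idx = rpl_thread_idx;
    rpl_thread_idx = (idx + 1) % rpl_threads.size();
    worker *w = rpl_threads[idx];
    if (w && w->current_owner == &rpl_threads[idx])
      return w;                       // still ours: queue more work on it
    w = new worker{nullptr, next_worker_id++};  // otherwise grab a new worker
    w->current_owner = &rpl_threads[idx];
    rpl_threads[idx] = w;
    return w;
  }
};

int main() {
  domain_entry e;
  e.rpl_threads.assign(2, nullptr);   // as if --slave-domain-parallel-threads=2
  for (int i = 0; i < 5; ++i)
    std::printf("queued on worker %d\n", e.choose_thread()->id);
  // With a limit of 2, the domain cycles between the same two workers.
}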
struct rpl_parallel {
HASH domain_hash;
@@ -116,7 +222,7 @@ struct rpl_parallel {
~rpl_parallel();
void reset();
rpl_parallel_entry *find(uint32 domain_id);
- void wait_for_done();
+ void wait_for_done(THD *thd);
bool workers_idle();
bool do_event(rpl_group_info *serial_rgi, Log_event *ev,
ulonglong event_size);
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index 797f5681ec5..ffe5e516069 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -1479,14 +1479,27 @@ end:
}
-rpl_group_info::rpl_group_info(Relay_log_info *rli_)
- : rli(rli_), thd(0), gtid_sub_id(0), wait_commit_sub_id(0),
- wait_commit_group_info(0), wait_start_sub_id(0), parallel_entry(0),
- deferred_events(NULL), m_annotate_event(0), tables_to_lock(0),
- tables_to_lock_count(0), trans_retries(0), last_event_start_time(0),
- is_parallel_exec(false), is_error(false),
- row_stmt_start_timestamp(0), long_find_row_note_printed(false)
+void
+rpl_group_info::reinit(Relay_log_info *rli)
+{
+ this->rli= rli;
+ tables_to_lock= NULL;
+ tables_to_lock_count= 0;
+ trans_retries= 0;
+ last_event_start_time= 0;
+ is_error= false;
+ row_stmt_start_timestamp= 0;
+ long_find_row_note_printed= false;
+ did_mark_start_commit= false;
+ commit_orderer.reinit();
+}
+
+rpl_group_info::rpl_group_info(Relay_log_info *rli)
+ : thd(0), gtid_sub_id(0), wait_commit_sub_id(0),
+ wait_commit_group_info(0), parallel_entry(0),
+ deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false)
{
+ reinit(rli);
bzero(&current_gtid, sizeof(current_gtid));
mysql_mutex_init(key_rpl_group_info_sleep_lock, &sleep_lock,
MY_MUTEX_INIT_FAST);
@@ -1706,4 +1719,40 @@ void rpl_group_info::slave_close_thread_tables(THD *thd)
}
+
+static void
+mark_start_commit_inner(rpl_parallel_entry *e, group_commit_orderer *gco)
+{
+ uint64 count= ++e->count_committing_event_groups;
+ if (gco->next_gco && gco->next_gco->wait_count == count)
+ mysql_cond_broadcast(&gco->next_gco->COND_group_commit_orderer);
+}
+
+
+void
+rpl_group_info::mark_start_commit_no_lock()
+{
+ if (did_mark_start_commit)
+ return;
+ mark_start_commit_inner(parallel_entry, gco);
+ did_mark_start_commit= true;
+}
+
+
+void
+rpl_group_info::mark_start_commit()
+{
+ rpl_parallel_entry *e;
+
+ if (did_mark_start_commit)
+ return;
+
+ e= this->parallel_entry;
+ mysql_mutex_lock(&e->LOCK_parallel_entry);
+ mark_start_commit_inner(e, gco);
+ mysql_mutex_unlock(&e->LOCK_parallel_entry);
+ did_mark_start_commit= true;
+}
+
+
#endif
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index 3f95849a926..0ba259b0efd 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -481,6 +481,7 @@ private:
struct rpl_group_info
{
+ rpl_group_info *next; /* For free list in rpl_parallel_thread */
Relay_log_info *rli;
THD *thd;
/*
@@ -510,14 +511,15 @@ struct rpl_group_info
uint64 wait_commit_sub_id;
rpl_group_info *wait_commit_group_info;
/*
- If non-zero, the event group must wait for this sub_id to be committed
- before the execution of the event group is allowed to start.
+ This holds a pointer to a struct that keeps track of the need to wait
+ for the previous batch of event groups to reach the commit stage, before
+ this batch can start to execute.
(When we execute in parallel the transactions that group committed
together on the master, we still need to wait for any prior transactions
- to have commtted).
+ to have reached the commit stage).
*/
- uint64 wait_start_sub_id;
+ group_commit_orderer *gco;
struct rpl_parallel_entry *parallel_entry;
@@ -567,18 +569,22 @@ struct rpl_group_info
char future_event_master_log_name[FN_REFLEN];
bool is_parallel_exec;
bool is_error;
+ /*
+ Set to true when we have signalled that we reached the commit phase. Used to avoid
+ counting one event group twice.
+ */
+ bool did_mark_start_commit;
-private:
/*
Runtime state for printing a note when slave is taking
too long while processing a row event.
*/
time_t row_stmt_start_timestamp;
bool long_find_row_note_printed;
-public:
rpl_group_info(Relay_log_info *rli_);
~rpl_group_info();
+ void reinit(Relay_log_info *rli);
/*
Returns true if the argument event resides in the container;
@@ -661,6 +667,8 @@ public:
void clear_tables_to_lock();
void cleanup_context(THD *, bool);
void slave_close_thread_tables(THD *);
+ void mark_start_commit_no_lock();
+ void mark_start_commit();
time_t get_row_stmt_start_timestamp()
{
diff --git a/sql/slave.cc b/sql/slave.cc
index 07209c30ff3..b081a3369f5 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -330,7 +330,7 @@ run_slave_init_thread()
pthread_t th;
slave_init_thread_running= true;
- if (mysql_thread_create(key_thread_slave_init, &th, NULL,
+ if (mysql_thread_create(key_thread_slave_init, &th, &connection_attrib,
handle_slave_init, NULL))
{
sql_print_error("Failed to create thread while initialising slave");
@@ -4526,7 +4526,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
}
if (opt_slave_parallel_threads > 0)
- rli->parallel.wait_for_done();
+ rli->parallel.wait_for_done(thd);
/* Thread stopped. Print the current replication position to the log */
{
@@ -4552,7 +4552,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
get the correct position printed.)
*/
if (opt_slave_parallel_threads > 0)
- rli->parallel.wait_for_done();
+ rli->parallel.wait_for_done(thd);
/*
Some events set some playgrounds, which won't be cleared because thread
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index 09ec7361fe3..b9e1bf91888 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -5667,14 +5667,23 @@ bool THD::rgi_have_temporary_tables()
}
+void
+wait_for_commit::reinit()
+{
+ subsequent_commits_list= NULL;
+ next_subsequent_commit= NULL;
+ waitee= NULL;
+ opaque_pointer= NULL;
+ wakeup_error= 0;
+ wakeup_subsequent_commits_running= false;
+}
+
+
wait_for_commit::wait_for_commit()
- : subsequent_commits_list(0), next_subsequent_commit(0), waitee(0),
- opaque_pointer(0),
- waiting_for_commit(false), wakeup_error(0),
- wakeup_subsequent_commits_running(false)
{
mysql_mutex_init(key_LOCK_wait_commit, &LOCK_wait_commit, MY_MUTEX_INIT_FAST);
mysql_cond_init(key_COND_wait_commit, &COND_wait_commit, 0);
+ reinit();
}
@@ -5722,7 +5731,7 @@ wait_for_commit::wakeup(int wakeup_error)
*/
mysql_mutex_lock(&LOCK_wait_commit);
- waiting_for_commit= false;
+ waitee= NULL;
this->wakeup_error= wakeup_error;
/*
Note that it is critical that the mysql_cond_signal() here is done while
@@ -5754,9 +5763,8 @@ wait_for_commit::wakeup(int wakeup_error)
void
wait_for_commit::register_wait_for_prior_commit(wait_for_commit *waitee)
{
- waiting_for_commit= true;
- wakeup_error= 0;
DBUG_ASSERT(!this->waitee /* No prior registration allowed */);
+ wakeup_error= 0;
this->waitee= waitee;
mysql_mutex_lock(&waitee->LOCK_wait_commit);
@@ -5766,7 +5774,7 @@ wait_for_commit::register_wait_for_prior_commit(wait_for_commit *waitee)
see comments on wakeup_subsequent_commits2() for details.
*/
if (waitee->wakeup_subsequent_commits_running)
- waiting_for_commit= false;
+ this->waitee= NULL;
else
{
/*
@@ -5795,9 +5803,9 @@ wait_for_commit::wait_for_prior_commit2(THD *thd)
DEBUG_SYNC(thd, "wait_for_prior_commit_waiting");
old_msg= thd->enter_cond(&COND_wait_commit, &LOCK_wait_commit,
"Waiting for prior transaction to commit");
- while (waiting_for_commit && !thd->check_killed())
+ while ((loc_waitee= this->waitee) && !thd->check_killed())
mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit);
- if (!waiting_for_commit)
+ if (!loc_waitee)
{
if (wakeup_error)
my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
@@ -5810,7 +5818,6 @@ wait_for_commit::wait_for_prior_commit2(THD *thd)
waiter as to whether we succeed or fail (eg. we may roll back but waitee
might attempt to commit both us and any subsequent commits waiting for us).
*/
- loc_waitee= this->waitee;
mysql_mutex_lock(&loc_waitee->LOCK_wait_commit);
if (loc_waitee->wakeup_subsequent_commits_running)
{
@@ -5819,21 +5826,29 @@ wait_for_commit::wait_for_prior_commit2(THD *thd)
do
{
mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit);
- } while (waiting_for_commit);
+ } while (this->waitee);
+ if (wakeup_error)
+ my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
goto end;
}
remove_from_list(&loc_waitee->subsequent_commits_list);
mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
+ this->waitee= NULL;
- DEBUG_SYNC(thd, "wait_for_prior_commit_killed");
wakeup_error= thd->killed_errno();
if (!wakeup_error)
wakeup_error= ER_QUERY_INTERRUPTED;
my_message(wakeup_error, ER(wakeup_error), MYF(0));
+ thd->exit_cond(old_msg);
+ /*
+ Must do the DEBUG_SYNC() _after_ exit_cond(), as DEBUG_SYNC is not safe to
+ use within enter_cond/exit_cond.
+ */
+ DEBUG_SYNC(thd, "wait_for_prior_commit_killed");
+ return wakeup_error;
end:
thd->exit_cond(old_msg);
- waitee= NULL;
return wakeup_error;
}
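The switch from a separate waiting_for_commit flag to using the waitee pointer itself as the "registered and not yet woken" state can be modelled in isolation like this (standard C++ primitives and invented names, not the server code):

// Minimal standalone model (not MariaDB code) of the waitee-pointer-as-flag change.
#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <thread>

struct waiter {
  std::mutex lock;                    // plays the role of LOCK_wait_commit
  std::condition_variable cond;       // plays the role of COND_wait_commit
  waiter *waitee = nullptr;           // non-null only while registered and unwoken
  int wakeup_error = 0;

  void register_wait(waiter *prior) { waitee = prior; }

  int wait_for_prior_commit() {
    std::unique_lock<std::mutex> g(lock);
    cond.wait(g, [&] { return waitee == nullptr; });  // cleared by wakeup()
    return wakeup_error;
  }

  void wakeup(int err) {              // called on behalf of the prior transaction
    std::lock_guard<std::mutex> g(lock);
    waitee = nullptr;                 // clearing the pointer is the wakeup signal
    wakeup_error = err;
    cond.notify_one();
  }
};

int main() {
  waiter prior, current;
  current.register_wait(&prior);
  std::thread t([&] {
    std::printf("woken with error=%d\n", current.wait_for_prior_commit());
  });
  current.wakeup(0);                  // prior commit completed OK
  t.join();
}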
@@ -5916,10 +5931,11 @@ wait_for_commit::wakeup_subsequent_commits2(int wakeup_error)
void
wait_for_commit::unregister_wait_for_prior_commit2()
{
+ wait_for_commit *loc_waitee;
+
mysql_mutex_lock(&LOCK_wait_commit);
- if (waiting_for_commit)
+ if ((loc_waitee= this->waitee))
{
- wait_for_commit *loc_waitee= this->waitee;
mysql_mutex_lock(&loc_waitee->LOCK_wait_commit);
if (loc_waitee->wakeup_subsequent_commits_running)
{
@@ -5931,7 +5947,7 @@ wait_for_commit::unregister_wait_for_prior_commit2()
See comments on wakeup_subsequent_commits2() for more details.
*/
mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
- while (waiting_for_commit)
+ while (this->waitee)
mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit);
}
else
@@ -5939,10 +5955,10 @@ wait_for_commit::unregister_wait_for_prior_commit2()
/* Remove ourselves from the list in the waitee. */
remove_from_list(&loc_waitee->subsequent_commits_list);
mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
+ this->waitee= NULL;
}
}
mysql_mutex_unlock(&LOCK_wait_commit);
- this->waitee= NULL;
}
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 726b920edb1..57dd92db49d 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -1598,8 +1598,8 @@ struct wait_for_commit
{
/*
The LOCK_wait_commit protects the fields subsequent_commits_list and
- wakeup_subsequent_commits_running (for a waitee), and the flag
- waiting_for_commit and associated COND_wait_commit (for a waiter).
+ wakeup_subsequent_commits_running (for a waitee), and the pointer
+ waitee and associated COND_wait_commit (for a waiter).
*/
mysql_mutex_t LOCK_wait_commit;
mysql_cond_t COND_wait_commit;
@@ -1607,7 +1607,13 @@ struct wait_for_commit
wait_for_commit *subsequent_commits_list;
/* Link field for entries in subsequent_commits_list. */
wait_for_commit *next_subsequent_commit;
- /* Our waitee, if we did register_wait_for_prior_commit(), else NULL. */
+ /*
+ Our waitee, if we did register_wait_for_prior_commit(), and were not
+ yet woken up. Else NULL.
+
+ When this is cleared for wakeup, the COND_wait_commit condition is
+ signalled.
+ */
wait_for_commit *waitee;
/*
Generic pointer for use by the transaction coordinator to optimise the
@@ -1618,12 +1624,6 @@ struct wait_for_commit
used by another transaction coordinator for similar purposes.
*/
void *opaque_pointer;
- /*
- The waiting_for_commit flag is cleared when a waiter has been woken
- up. The COND_wait_commit condition is signalled when this has been
- cleared.
- */
- bool waiting_for_commit;
/* The wakeup error code from the waitee. 0 means no error. */
int wakeup_error;
/*
@@ -1639,10 +1639,14 @@ struct wait_for_commit
Quick inline check, to avoid function call and locking in the common case
where no wakeup is registered, or a registered wait was already signalled.
*/
- if (waiting_for_commit)
+ if (waitee)
return wait_for_prior_commit2(thd);
else
+ {
+ if (wakeup_error)
+ my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
return wakeup_error;
+ }
}
void wakeup_subsequent_commits(int wakeup_error)
{
@@ -1663,7 +1667,7 @@ struct wait_for_commit
}
void unregister_wait_for_prior_commit()
{
- if (waiting_for_commit)
+ if (waitee)
unregister_wait_for_prior_commit2();
}
/*
@@ -1683,7 +1687,7 @@ struct wait_for_commit
}
next_ptr_ptr= &cur->next_subsequent_commit;
}
- waiting_for_commit= false;
+ waitee= NULL;
}
void wakeup(int wakeup_error);
@@ -1694,6 +1698,7 @@ struct wait_for_commit
wait_for_commit();
~wait_for_commit();
+ void reinit();
};
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
index 69d93968f9f..35e1cd457ee 100644
--- a/sql/sys_vars.cc
+++ b/sql/sys_vars.cc
@@ -1609,6 +1609,49 @@ static Sys_var_ulong Sys_slave_parallel_threads(
ON_UPDATE(fix_slave_parallel_threads));
+static bool
+check_slave_domain_parallel_threads(sys_var *self, THD *thd, set_var *var)
+{
+ bool running;
+
+ mysql_mutex_lock(&LOCK_active_mi);
+ running= master_info_index->give_error_if_slave_running();
+ mysql_mutex_unlock(&LOCK_active_mi);
+ if (running)
+ return true;
+
+ return false;
+}
+
+static bool
+fix_slave_domain_parallel_threads(sys_var *self, THD *thd, enum_var_type type)
+{
+ bool running;
+
+ mysql_mutex_unlock(&LOCK_global_system_variables);
+ mysql_mutex_lock(&LOCK_active_mi);
+ running= master_info_index->give_error_if_slave_running();
+ mysql_mutex_unlock(&LOCK_active_mi);
+ mysql_mutex_lock(&LOCK_global_system_variables);
+
+ return running ? true : false;
+}
+
+
+static Sys_var_ulong Sys_slave_domain_parallel_threads(
+ "slave_domain_parallel_threads",
+ "Maximum number of parallel threads to use on slave for events in a "
+ "single replication domain. When using multiple domains, this can be "
+ "used to limit a single domain from grabbing all threads and thus "
+ "stalling other domains. The default of 0 means to allow a domain to "
+ "grab as many threads as it wants, up to the value of "
+ "slave_parallel_threads.",
+ GLOBAL_VAR(opt_slave_domain_parallel_threads), CMD_LINE(REQUIRED_ARG),
+ VALID_RANGE(0,16383), DEFAULT(0), BLOCK_SIZE(1), NO_MUTEX_GUARD,
+ NOT_IN_BINLOG, ON_CHECK(check_slave_domain_parallel_threads),
+ ON_UPDATE(fix_slave_domain_parallel_threads));
+
+
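As a hedged illustration of the variable's semantics (mirroring the clamp added to rpl_parallel::find() earlier in this patch, not new server code): 0 means no per-domain limit, and any larger value is capped by the size of the overall worker pool.

// Standalone sketch (not MariaDB code) of the effective per-domain limit.
#include <cstdio>

static unsigned long effective_domain_threads(unsigned long domain_threads,
                                              unsigned long parallel_threads) {
  if (domain_threads == 0 || domain_threads > parallel_threads)
    return parallel_threads;          // fall back to the whole worker pool
  return domain_threads;
}

int main() {
  std::printf("%lu\n", effective_domain_threads(0, 10));   // 10: unlimited
  std::printf("%lu\n", effective_domain_threads(4, 10));   // 4
  std::printf("%lu\n", effective_domain_threads(20, 10));  // 10: capped
}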
static Sys_var_ulong Sys_slave_parallel_max_queued(
"slave_parallel_max_queued",
"Limit on how much memory SQL threads should use per parallel "