author     unknown <knielsen@knielsen-hq.org>   2014-02-26 15:02:09 +0100
committer  unknown <knielsen@knielsen-hq.org>   2014-02-26 15:02:09 +0100
commit     e90f68c0ba1802d58b06bc7178513f77b27b662b
tree       de9f14296cb3a637b036b9358471215f7bd60dca
parent     50323f12630fbc5b19b40f1a9affdc550cc7cfd9
MDEV-5657: Parallel replication.
Clean up and improve the parallel replication implementation, mainly related to
scheduling of work to threads and handling of stop and errors. Fix a lot of bugs
in various corner cases that could lead to crashes or corruption.

Fix that a single replication domain could easily grab all worker threads and
stall all other domains; a new configuration variable
--slave-domain-parallel-threads allows limiting the number of workers one
domain can occupy.

Allow the next event group to start as soon as the previous group begins its
commit phase (rather than when it completes it); this allows multiple event
groups on the slave to participate in group commit, even when no other
opportunities for parallelism are available.

Various fixes:

 - Fix some races in the rpl.rpl_parallel test case.

 - Fix an old incorrect assertion in Log_event iocache read.

 - Fix repeated malloc/free of wait_for_commit and rpl_group_info objects.

 - Simplify wait_for_commit wakeup logic.

 - Fix one case in queue_for_group_commit() where killing one thread would
   fail to correctly signal the error to the next, causing loss of the
   transaction after slave restart.

 - Fix leaking of pthreads (and their allocated stack) due to a missing
   PTHREAD_CREATE_DETACHED attribute.

 - Fix how one batch of group-committed transactions waits for the previous
   batch before starting to execute. The old code had a very complex
   scheduling where the first transaction was handled differently, with
   subtle bugs in corner cases. Now each event group is always scheduled on a
   new worker (in a round-robin fashion amongst the available workers). Keep
   a count of how many transactions have started to commit, and wait for that
   counter to reach the appropriate value.

 - Fix slave stop to wait for all workers to actually complete processing;
   before, the wait was for the update of last_committed_sub_id, which
   happens a bit earlier and could leave worker threads accessing bits of the
   replication state that are no longer valid after slave stop.

 - Fix a couple of places where the test suite would kill a thread waiting
   inside enter_cond() in connection with debug_sync; debug_sync + kill can
   crash in rare cases due to a race with mysys_var_current_mutex.

 - Fix some corner cases where we had enter_cond() but no exit_cond().

 - Fix that we could get a failure in wait_for_prior_commit() but forget to
   flag the error with my_error().

 - Fix slave stop (both normal stop and stop due to error). At stop we now
   pick a specific safe point (in terms of event groups executed) and make
   sure that all event groups before that point are executed to completion,
   and that no event group after it starts executing; this ensures a safe
   place to restart replication, even for non-transactional statements/DDL.
   On error stop, make sure that all prior event groups are allowed to
   execute to completion, and that any later event groups that have started
   are rolled back, if possible. The old code could leave e.g. T1 and T3
   committed but not T2, or it could even leave half a transaction not rolled
   back in some random worker, which would cause big problems when that
   worker was later reused after slave restart.

 - Fix the accounting of the amount of events queued for one worker. Before,
   the amount was reduced as soon as the events were dequeued (which happens
   all at once); this allowed twice the intended amount of events to be
   queued in memory for each worker, which is not what users would expect.

 - Fix that an error set during execution of one event was sometimes not
   cleared before executing the next, causing problems with error reporting.

 - Fix incorrect handling of thd->killed in worker threads.
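
The scheduling change described above (an event group only waits until all
groups in the previous batch have *started* to commit, tracked by a counter)
can be illustrated with a small standalone sketch. This is not the server
code: it uses std::mutex/std::condition_variable in place of mysql_mutex_t/
mysql_cond_t, and the names DomainEntry, wait_for_prior_groups() and
mark_start_commit() are illustrative stand-ins for rpl_parallel_entry,
the worker-side wait on group_commit_orderer, and rgi->mark_start_commit()
in the patch below.

// Minimal sketch of the wait-count idea: every event group in one
// group-commit batch shares an orderer with a fixed wait_count; a worker
// may start executing its group only once the number of earlier groups
// that have *begun* committing reaches that count.
#include <condition_variable>
#include <cstdint>
#include <mutex>

struct DomainEntry {                      // stands in for rpl_parallel_entry
  std::mutex lock;                        // LOCK_parallel_entry
  std::condition_variable orderer_cond;   // COND_group_commit_orderer
  uint64_t count_committing_event_groups = 0;
};

struct GroupCommitOrderer {               // stands in for group_commit_orderer
  uint64_t wait_count;                    // groups that must start committing first
};

// Called by a worker before executing the first event of its group.
void wait_for_prior_groups(DomainEntry &e, const GroupCommitOrderer &gco) {
  std::unique_lock<std::mutex> guard(e.lock);
  e.orderer_cond.wait(guard, [&] {
    return e.count_committing_event_groups >= gco.wait_count;
  });
}

// Called by a worker as soon as its group enters the commit phase (not when
// the commit finishes); this is what lets the next batch overlap with the
// current one and join the same group commit on the slave.
void mark_start_commit(DomainEntry &e) {
  {
    std::lock_guard<std::mutex> guard(e.lock);
    ++e.count_committing_event_groups;
  }
  e.orderer_cond.notify_all();
}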
-rw-r--r--  mysql-test/r/mysqld--help.result                                        |   9
-rw-r--r--  mysql-test/suite/perfschema/r/dml_setup_instruments.result              |   4
-rw-r--r--  mysql-test/suite/rpl/r/rpl_parallel.result                              |  88
-rw-r--r--  mysql-test/suite/rpl/t/rpl_parallel.test                                | 108
-rw-r--r--  mysql-test/suite/sys_vars/r/slave_domain_parallel_threads_basic.result  |  13
-rw-r--r--  mysql-test/suite/sys_vars/t/slave_domain_parallel_threads_basic.test    |  14
-rw-r--r--  mysys/mf_iocache.c                                                      |   4
-rw-r--r--  sql/log.cc                                                              |  29
-rw-r--r--  sql/mysqld.cc                                                           |   9
-rw-r--r--  sql/mysqld.h                                                            |   6
-rw-r--r--  sql/rpl_parallel.cc                                                     | 951
-rw-r--r--  sql/rpl_parallel.h                                                      | 146
-rw-r--r--  sql/rpl_rli.cc                                                          |  63
-rw-r--r--  sql/rpl_rli.h                                                           |  20
-rw-r--r--  sql/slave.cc                                                            |   6
-rw-r--r--  sql/sql_class.cc                                                        |  52
-rw-r--r--  sql/sql_class.h                                                         |  29
-rw-r--r--  sql/sys_vars.cc                                                         |  43
18 files changed, 1144 insertions, 450 deletions
diff --git a/mysql-test/r/mysqld--help.result b/mysql-test/r/mysqld--help.result
index 953e9d926dd..9b124952095 100644
--- a/mysql-test/r/mysqld--help.result
+++ b/mysql-test/r/mysqld--help.result
@@ -782,6 +782,14 @@ The following options may be given as the first argument:
is already the default.
--slave-compressed-protocol
Use compression on master/slave protocol
+ --slave-domain-parallel-threads=#
+ Maximum number of parallel threads to use on slave for
+ events in a single replication domain. When using
+ multiple domains, this can be used to limit a single
+ domain from grabbing all threads and thus stalling other
+ domains. The default of 0 means to allow a domain to grab
+ as many threads as it wants, up to the value of
+ slave_parallel_threads.
--slave-exec-mode=name
Modes for how replication events should be executed.
Legal values are STRICT (default) and IDEMPOTENT. In
@@ -1155,6 +1163,7 @@ skip-networking FALSE
skip-show-database FALSE
skip-slave-start FALSE
slave-compressed-protocol FALSE
+slave-domain-parallel-threads 0
slave-exec-mode STRICT
slave-max-allowed-packet 1073741824
slave-net-timeout 3600
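
For reference, the effective per-domain worker limit implied by the help text
above can be summarised by the small clamping rule added to
rpl_parallel::find() further down in this patch. The sketch below is
illustrative only; the function name workers_for_domain is not part of the
server code.

// 0 means "no per-domain limit"; the value is always capped by the
// global slave_parallel_threads.
unsigned long workers_for_domain(unsigned long domain_parallel_threads,
                                 unsigned long parallel_threads) {
  if (domain_parallel_threads == 0 ||
      domain_parallel_threads > parallel_threads)
    return parallel_threads;          // default: a domain may use all workers
  return domain_parallel_threads;     // otherwise the per-domain cap applies
}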
diff --git a/mysql-test/suite/perfschema/r/dml_setup_instruments.result b/mysql-test/suite/perfschema/r/dml_setup_instruments.result
index 17087264e7c..b47283d9193 100644
--- a/mysql-test/suite/perfschema/r/dml_setup_instruments.result
+++ b/mysql-test/suite/perfschema/r/dml_setup_instruments.result
@@ -37,6 +37,7 @@ where name like 'Wait/Synch/Cond/sql/%'
order by name limit 10;
NAME ENABLED TIMED
wait/synch/cond/sql/COND_flush_thread_cache YES YES
+wait/synch/cond/sql/COND_group_commit_orderer YES YES
wait/synch/cond/sql/COND_manager YES YES
wait/synch/cond/sql/COND_parallel_entry YES YES
wait/synch/cond/sql/COND_prepare_ordered YES YES
@@ -44,8 +45,7 @@ wait/synch/cond/sql/COND_queue_state YES YES
wait/synch/cond/sql/COND_rpl_status YES YES
wait/synch/cond/sql/COND_rpl_thread YES YES
wait/synch/cond/sql/COND_rpl_thread_pool YES YES
-wait/synch/cond/sql/COND_server_started YES YES
-wait/synch/cond/sql/COND_thread_cache YES YES
+wait/synch/cond/sql/COND_rpl_thread_queue YES YES
select * from performance_schema.setup_instruments
where name='Wait';
select * from performance_schema.setup_instruments
diff --git a/mysql-test/suite/rpl/r/rpl_parallel.result b/mysql-test/suite/rpl/r/rpl_parallel.result
index 2ff6bd7cbe1..2531bf9457c 100644
--- a/mysql-test/suite/rpl/r/rpl_parallel.result
+++ b/mysql-test/suite/rpl/r/rpl_parallel.result
@@ -115,6 +115,7 @@ SET GLOBAL slave_parallel_threads=10;
SET debug_sync='RESET';
include/start_slave.inc
*** Test that group-committed transactions on the master can replicate in parallel on the slave. ***
+SET debug_sync='RESET';
FLUSH LOGS;
CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
INSERT INTO t3 VALUES (1,1), (3,3), (5,5), (7,7);
@@ -141,6 +142,7 @@ INSERT INTO t3 VALUES (6, foo(16,
''));
SET debug_sync='now WAIT_FOR master_queued3';
SET debug_sync='now SIGNAL master_cont1';
+SET debug_sync='RESET';
SELECT * FROM t3 ORDER BY a;
a b
1 1
@@ -213,6 +215,9 @@ slave-bin.000003 # Query # # use `test`; INSERT INTO t3 VALUES (6, foo(16,
slave-bin.000003 # Xid # # COMMIT /* XID */
*** Test STOP SLAVE in parallel mode ***
include/stop_slave.inc
+SET debug_sync='RESET';
+SET GLOBAL slave_parallel_threads=0;
+SET GLOBAL slave_parallel_threads=10;
SET binlog_direct_non_transactional_updates=0;
SET sql_log_bin=0;
CALL mtr.add_suppression("Statement is unsafe because it accesses a non-transactional table after accessing a transactional table within the same transaction");
@@ -227,10 +232,15 @@ INSERT INTO t3 VALUES(21, 21);
INSERT INTO t3 VALUES(22, 22);
SET binlog_format=@old_format;
BEGIN;
-INSERT INTO t2 VALUES (21);
+INSERT INTO t2 VALUES (21);
START SLAVE;
+SET @old_dbug= @@GLOBAL.debug_dbug;
+SET GLOBAL debug_dbug="+d,rpl_parallel_wait_for_done_trigger";
STOP SLAVE;
+SET debug_sync='now WAIT_FOR wait_for_done_waiting';
ROLLBACK;
+SET GLOBAL debug_dbug=@old_dbug;
+SET debug_sync='RESET';
include/wait_for_slave_to_stop.inc
SELECT * FROM t1 WHERE a >= 20 ORDER BY a;
a
@@ -292,6 +302,7 @@ a b
32 32
33 33
34 34
+SET debug_sync='RESET';
SET sql_log_bin=0;
CALL mtr.add_suppression("Query execution was interrupted");
CALL mtr.add_suppression("Commit failed due to failure of an earlier commit on which this one depends");
@@ -308,6 +319,7 @@ STOP SLAVE IO_THREAD;
SELECT * FROM t3 WHERE a >= 30 ORDER BY a;
a b
31 31
+SET debug_sync='RESET';
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
SET sql_log_bin=0;
@@ -379,6 +391,7 @@ a b
42 42
43 43
44 44
+SET debug_sync='RESET';
SET debug_sync='now WAIT_FOR t2_query';
SET debug_sync='now SIGNAL t2_cont';
SET debug_sync='now WAIT_FOR t1_ready';
@@ -386,9 +399,7 @@ KILL THD_ID;
SET debug_sync='now WAIT_FOR t2_killed';
SET debug_sync='now SIGNAL t1_cont';
include/wait_for_slave_sql_error.inc [errno=1317,1963]
-SELECT * FROM t3 WHERE a >= 40 ORDER BY a;
-a b
-41 41
+SET debug_sync='RESET';
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
SET sql_log_bin=0;
@@ -463,6 +474,7 @@ a b
52 52
53 53
54 54
+SET debug_sync='RESET';
SET debug_sync='now WAIT_FOR t2_query';
SET debug_sync='now SIGNAL t2_cont';
SET debug_sync='now WAIT_FOR t1_ready';
@@ -473,6 +485,7 @@ include/wait_for_slave_sql_error.inc [errno=1317,1963]
SELECT * FROM t3 WHERE a >= 50 ORDER BY a;
a b
51 51
+SET debug_sync='RESET';
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
SET sql_log_bin=0;
@@ -514,14 +527,18 @@ include/start_slave.inc
include/stop_slave.inc
SET GLOBAL binlog_format=@old_format;
SET GLOBAL slave_parallel_threads=0;
-SET GLOBAL slave_parallel_threads=3;
+SET GLOBAL slave_parallel_threads=4;
include/start_slave.inc
*** 4. Test killing thread that is waiting to start transaction until previous transaction commits ***
SET binlog_format=statement;
SET gtid_domain_id=2;
+BEGIN;
+INSERT INTO t3 VALUES (70, foo(70,
+'rpl_parallel_start_waiting_for_prior SIGNAL t4_waiting', ''));
INSERT INTO t3 VALUES (60, foo(60,
'ha_write_row_end SIGNAL d2_query WAIT_FOR d2_cont2',
'rpl_parallel_end_of_group SIGNAL d2_done WAIT_FOR d2_cont'));
+COMMIT;
SET gtid_domain_id=0;
SET debug_sync='now WAIT_FOR d2_query';
SET gtid_domain_id=1;
@@ -540,15 +557,27 @@ INSERT INTO t3 VALUES (63, foo(63,
'ha_write_row_end SIGNAL d0_query WAIT_FOR d0_cont2',
'rpl_parallel_end_of_group SIGNAL d0_done WAIT_FOR d0_cont'));
SET debug_sync='now WAIT_FOR d0_query';
+SET gtid_domain_id=3;
+BEGIN;
+INSERT INTO t3 VALUES (68, foo(68,
+'rpl_parallel_start_waiting_for_prior SIGNAL t2_waiting', ''));
+INSERT INTO t3 VALUES (69, foo(69,
+'ha_write_row_end SIGNAL d3_query WAIT_FOR d3_cont2',
+'rpl_parallel_end_of_group SIGNAL d3_done WAIT_FOR d3_cont'));
+COMMIT;
+SET gtid_domain_id=0;
+SET debug_sync='now WAIT_FOR d3_query';
SET debug_sync='now SIGNAL d2_cont2';
SET debug_sync='now WAIT_FOR d2_done';
SET debug_sync='now SIGNAL d1_cont2';
SET debug_sync='now WAIT_FOR d1_done';
SET debug_sync='now SIGNAL d0_cont2';
SET debug_sync='now WAIT_FOR d0_done';
+SET debug_sync='now SIGNAL d3_cont2';
+SET debug_sync='now WAIT_FOR d3_done';
SET binlog_format=statement;
INSERT INTO t3 VALUES (64, foo(64,
-'commit_before_prepare_ordered SIGNAL t1_waiting WAIT_FOR t1_cont', ''));
+'rpl_parallel_before_mark_start_commit SIGNAL t1_waiting WAIT_FOR t1_cont', ''));
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2 WAIT_FOR master_cont2';
INSERT INTO t3 VALUES (65, foo(65, '', ''));
SET debug_sync='now WAIT_FOR master_queued2';
@@ -569,23 +598,34 @@ a b
65 65
66 66
67 67
+68 68
+69 69
+70 70
+SET debug_sync='RESET';
SET debug_sync='now SIGNAL d0_cont';
SET debug_sync='now WAIT_FOR t1_waiting';
+SET debug_sync='now SIGNAL d3_cont';
+SET debug_sync='now WAIT_FOR t2_waiting';
SET debug_sync='now SIGNAL d1_cont';
SET debug_sync='now WAIT_FOR t3_waiting';
SET debug_sync='now SIGNAL d2_cont';
+SET debug_sync='now WAIT_FOR t4_waiting';
KILL THD_ID;
SET debug_sync='now WAIT_FOR t3_killed';
SET debug_sync='now SIGNAL t1_cont';
include/wait_for_slave_sql_error.inc [errno=1317,1927,1963]
STOP SLAVE IO_THREAD;
-SELECT * FROM t3 WHERE a >= 60 ORDER BY a;
+SELECT * FROM t3 WHERE a >= 60 AND a != 65 ORDER BY a;
a b
60 60
61 61
62 62
63 63
64 64
+68 68
+69 69
+70 70
+SET debug_sync='RESET';
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
SET sql_log_bin=0;
@@ -597,11 +637,11 @@ RETURN x;
END
||
SET sql_log_bin=1;
-INSERT INTO t3 VALUES (69,0);
+UPDATE t3 SET b=b+1 WHERE a=60;
include/start_slave.inc
SELECT * FROM t3 WHERE a >= 60 ORDER BY a;
a b
-60 60
+60 61
61 61
62 62
63 63
@@ -609,7 +649,9 @@ a b
65 65
66 66
67 67
-69 0
+68 68
+69 69
+70 70
SET sql_log_bin=0;
DROP FUNCTION foo;
CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
@@ -634,37 +676,31 @@ include/start_slave.inc
SET @old_max_queued= @@GLOBAL.slave_parallel_max_queued;
SET GLOBAL slave_parallel_max_queued=9000;
SET binlog_format=statement;
-INSERT INTO t3 VALUES (70, foo(0,
+INSERT INTO t3 VALUES (80, foo(0,
'ha_write_row_end SIGNAL query_waiting WAIT_FOR query_cont', ''));
SET debug_sync='now WAIT_FOR query_waiting';
SET @old_dbug= @@GLOBAL.debug_dbug;
SET GLOBAL debug_dbug="+d,rpl_parallel_wait_queue_max";
-INSERT INTO t3 VALUES (72, 0);
-SELECT * FROM t3 WHERE a >= 70 ORDER BY a;
+SELECT * FROM t3 WHERE a >= 80 ORDER BY a;
a b
-70 0
-71 10000
-72 0
+80 0
+81 10000
SET debug_sync='now WAIT_FOR wait_queue_ready';
KILL THD_ID;
SET debug_sync='now WAIT_FOR wait_queue_killed';
SET debug_sync='now SIGNAL query_cont';
include/wait_for_slave_sql_error.inc [errno=1317,1927,1963]
STOP SLAVE IO_THREAD;
-SELECT * FROM t3 WHERE a >= 70 ORDER BY a;
-a b
-70 0
-71 10000
SET GLOBAL debug_dbug=@old_dbug;
SET GLOBAL slave_parallel_max_queued= @old_max_queued;
-INSERT INTO t3 VALUES (73,0);
+INSERT INTO t3 VALUES (82,0);
+SET debug_sync='RESET';
include/start_slave.inc
-SELECT * FROM t3 WHERE a >= 70 ORDER BY a;
+SELECT * FROM t3 WHERE a >= 80 ORDER BY a;
a b
-70 0
-71 10000
-72 0
-73 0
+80 0
+81 10000
+82 0
include/stop_slave.inc
SET GLOBAL binlog_format=@old_format;
SET GLOBAL slave_parallel_threads=0;
diff --git a/mysql-test/suite/rpl/t/rpl_parallel.test b/mysql-test/suite/rpl/t/rpl_parallel.test
index 72a0a72db7d..a71534d2eb1 100644
--- a/mysql-test/suite/rpl/t/rpl_parallel.test
+++ b/mysql-test/suite/rpl/t/rpl_parallel.test
@@ -163,6 +163,7 @@ SET debug_sync='RESET';
--echo *** Test that group-committed transactions on the master can replicate in parallel on the slave. ***
--connection server_1
+SET debug_sync='RESET';
FLUSH LOGS;
--source include/wait_for_binlog_checkpoint.inc
CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
@@ -230,6 +231,7 @@ REAP;
REAP;
--connection con_temp5
REAP;
+SET debug_sync='RESET';
--connection server_1
SELECT * FROM t3 ORDER BY a;
@@ -270,6 +272,10 @@ SELECT * FROM t3 ORDER BY a;
--echo *** Test STOP SLAVE in parallel mode ***
--connection server_2
--source include/stop_slave.inc
+# Respawn all worker threads to clear any left-over debug_sync or other stuff.
+SET debug_sync='RESET';
+SET GLOBAL slave_parallel_threads=0;
+SET GLOBAL slave_parallel_threads=10;
--connection server_1
# Set up a couple of transactions. The first will be blocked halfway
@@ -288,7 +294,7 @@ BEGIN;
INSERT INTO t2 VALUES (20);
--disable_warnings
INSERT INTO t1 VALUES (20);
---disable_warnings
+--enable_warnings
INSERT INTO t2 VALUES (21);
INSERT INTO t3 VALUES (20, 20);
COMMIT;
@@ -300,7 +306,7 @@ SET binlog_format=@old_format;
# Start a connection that will block the replicated transaction halfway.
--connection con_temp1
BEGIN;
-INSERT INTO t2 VALUES (21);
+INSERT INTO t2 VALUES (21);
--connection server_2
START SLAVE;
@@ -312,13 +318,20 @@ START SLAVE;
--connection con_temp2
# Initiate slave stop. It will have to wait for the current event group
# to complete.
+# The dbug injection causes debug_sync to signal 'wait_for_done_waiting'
+# when the SQL driver thread is ready.
+SET @old_dbug= @@GLOBAL.debug_dbug;
+SET GLOBAL debug_dbug="+d,rpl_parallel_wait_for_done_trigger";
send STOP SLAVE;
--connection con_temp1
+SET debug_sync='now WAIT_FOR wait_for_done_waiting';
ROLLBACK;
--connection con_temp2
reap;
+SET GLOBAL debug_dbug=@old_dbug;
+SET debug_sync='RESET';
--connection server_2
--source include/wait_for_slave_to_stop.inc
@@ -397,6 +410,7 @@ REAP;
--connection server_1
SELECT * FROM t3 WHERE a >= 30 ORDER BY a;
+SET debug_sync='RESET';
--connection server_2
SET sql_log_bin=0;
@@ -431,6 +445,7 @@ SELECT * FROM t3 WHERE a >= 30 ORDER BY a;
# Now we have to disable the debug_sync statements, so they do not trigger
# when the events are retried.
+SET debug_sync='RESET';
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
SET sql_log_bin=0;
@@ -535,6 +550,7 @@ REAP;
--connection server_1
SELECT * FROM t3 WHERE a >= 40 ORDER BY a;
+SET debug_sync='RESET';
--connection server_2
# Wait until T2 is inside executing its insert of 42, then find it in SHOW
@@ -559,10 +575,10 @@ SET debug_sync='now SIGNAL t1_cont';
--let $slave_sql_errno= 1317,1963
--source include/wait_for_slave_sql_error.inc
-SELECT * FROM t3 WHERE a >= 40 ORDER BY a;
# Now we have to disable the debug_sync statements, so they do not trigger
# when the events are retried.
+SET debug_sync='RESET';
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
SET sql_log_bin=0;
@@ -673,6 +689,7 @@ REAP;
--connection server_1
SELECT * FROM t3 WHERE a >= 50 ORDER BY a;
+SET debug_sync='RESET';
--connection server_2
# Wait until T2 is inside executing its insert of 52, then find it in SHOW
@@ -701,6 +718,7 @@ SELECT * FROM t3 WHERE a >= 50 ORDER BY a;
# Now we have to disable the debug_sync statements, so they do not trigger
# when the events are retried.
+SET debug_sync='RESET';
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
SET sql_log_bin=0;
@@ -752,7 +770,7 @@ CHANGE MASTER TO master_use_gtid=slave_pos;
--source include/stop_slave.inc
SET GLOBAL binlog_format=@old_format;
SET GLOBAL slave_parallel_threads=0;
-SET GLOBAL slave_parallel_threads=3;
+SET GLOBAL slave_parallel_threads=4;
--source include/start_slave.inc
@@ -762,24 +780,29 @@ SET GLOBAL slave_parallel_threads=3;
# can run in parallel with each other (same group commit and commit id),
# but not in parallel with T1.
#
-# We use three worker threads. T1 and T2 will be queued on the first, T3 on
-# the second, and T4 on the third. We will delay T1 commit, T3 will wait for
-# T1 to commit before it can start. We will kill T3 during this wait, and
+# We use four worker threads, each Ti will be queued on each their own
+# worker thread. We will delay T1 commit, T3 will wait for T1 to begin
+# commit before it can start. We will kill T3 during this wait, and
# check that everything works correctly.
#
# It is rather tricky to get the correct thread id of the worker to kill.
-# We start by injecting three dummy transactions in a debug_sync-controlled
+# We start by injecting four dummy transactions in a debug_sync-controlled
# manner to be able to get known thread ids for the workers in a pool with
-# just 3 worker threads. Then we let in each of the real test transactions
+# just 4 worker threads. Then we let in each of the real test transactions
# T1-T4 one at a time in a way which allows us to know which transaction
# ends up with which thread id.
--connection server_1
SET binlog_format=statement;
SET gtid_domain_id=2;
+BEGIN;
+# This debug_sync will linger on and be used to control T4 later.
+INSERT INTO t3 VALUES (70, foo(70,
+ 'rpl_parallel_start_waiting_for_prior SIGNAL t4_waiting', ''));
INSERT INTO t3 VALUES (60, foo(60,
'ha_write_row_end SIGNAL d2_query WAIT_FOR d2_cont2',
'rpl_parallel_end_of_group SIGNAL d2_done WAIT_FOR d2_cont'));
+COMMIT;
SET gtid_domain_id=0;
--connection server_2
@@ -813,12 +836,30 @@ INSERT INTO t3 VALUES (63, foo(63,
SET debug_sync='now WAIT_FOR d0_query';
--let $d0_thd_id= `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE INFO LIKE '%foo(63%' AND INFO NOT LIKE '%LIKE%'`
+--connection server_1
+SET gtid_domain_id=3;
+BEGIN;
+# These debug_sync's will linger on and be used to control T2 later.
+INSERT INTO t3 VALUES (68, foo(68,
+ 'rpl_parallel_start_waiting_for_prior SIGNAL t2_waiting', ''));
+INSERT INTO t3 VALUES (69, foo(69,
+ 'ha_write_row_end SIGNAL d3_query WAIT_FOR d3_cont2',
+ 'rpl_parallel_end_of_group SIGNAL d3_done WAIT_FOR d3_cont'));
+COMMIT;
+SET gtid_domain_id=0;
+
+--connection server_2
+SET debug_sync='now WAIT_FOR d3_query';
+--let $d3_thd_id= `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE INFO LIKE '%foo(69%' AND INFO NOT LIKE '%LIKE%'`
+
SET debug_sync='now SIGNAL d2_cont2';
SET debug_sync='now WAIT_FOR d2_done';
SET debug_sync='now SIGNAL d1_cont2';
SET debug_sync='now WAIT_FOR d1_done';
SET debug_sync='now SIGNAL d0_cont2';
SET debug_sync='now WAIT_FOR d0_done';
+SET debug_sync='now SIGNAL d3_cont2';
+SET debug_sync='now WAIT_FOR d3_done';
# Now prepare the real transactions T1, T2, T3, T4 on the master.
@@ -826,7 +867,7 @@ SET debug_sync='now WAIT_FOR d0_done';
# Create transaction T1.
SET binlog_format=statement;
INSERT INTO t3 VALUES (64, foo(64,
- 'commit_before_prepare_ordered SIGNAL t1_waiting WAIT_FOR t1_cont', ''));
+ 'rpl_parallel_before_mark_start_commit SIGNAL t1_waiting WAIT_FOR t1_cont', ''));
# Create transaction T2, as a group commit leader on the master.
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2 WAIT_FOR master_cont2';
@@ -861,6 +902,7 @@ REAP;
--connection server_1
SELECT * FROM t3 WHERE a >= 60 ORDER BY a;
+SET debug_sync='RESET';
--connection server_2
# Now we have the four transactions pending for replication on the slave.
@@ -872,15 +914,20 @@ SELECT * FROM t3 WHERE a >= 60 ORDER BY a;
SET debug_sync='now SIGNAL d0_cont';
SET debug_sync='now WAIT_FOR t1_waiting';
-# T2 will be queued on the same worker D0 as T1.
+# Make the worker D3 free, and wait for T2 to be queued in it.
+SET debug_sync='now SIGNAL d3_cont';
+SET debug_sync='now WAIT_FOR t2_waiting';
+
# Now release worker D1, and wait for T3 to be queued in it.
# T3 will wait for T1 to commit before it can start.
SET debug_sync='now SIGNAL d1_cont';
SET debug_sync='now WAIT_FOR t3_waiting';
-# Release worker D2. T4 may or may not have time to be queued on it, but
-# it will not be able to complete due to T3 being killed.
+# Release worker D2. Wait for T4 to be queued, so we are sure it has
+# received the debug_sync signal (else we might overwrite it with the
+# next debug_sync).
SET debug_sync='now SIGNAL d2_cont';
+SET debug_sync='now WAIT_FOR t4_waiting';
# Now we kill the waiting transaction T3 in worker D1.
--replace_result $d1_thd_id THD_ID
@@ -895,10 +942,15 @@ SET debug_sync='now SIGNAL t1_cont';
--let $slave_sql_errno= 1317,1927,1963
--source include/wait_for_slave_sql_error.inc
STOP SLAVE IO_THREAD;
-SELECT * FROM t3 WHERE a >= 60 ORDER BY a;
+# Since T2, T3, and T4 run in parallel, we can not be sure if T2 will have time
+# to commit or not before the stop. However, T1 should commit, and T3/T4 may
+# not have committed. (After slave restart we check that all become committed
+# eventually).
+SELECT * FROM t3 WHERE a >= 60 AND a != 65 ORDER BY a;
# Now we have to disable the debug_sync statements, so they do not trigger
# when the events are retried.
+SET debug_sync='RESET';
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
SET sql_log_bin=0;
@@ -914,7 +966,7 @@ CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
SET sql_log_bin=1;
--connection server_1
-INSERT INTO t3 VALUES (69,0);
+UPDATE t3 SET b=b+1 WHERE a=60;
--save_master_pos
--connection server_2
@@ -951,6 +1003,7 @@ SET GLOBAL slave_parallel_threads=10;
--echo *** 5. Test killing thread that is waiting for queue of max length to shorten ***
+# Find the thread id of the driver SQL thread that we want to kill.
--let $wait_condition= SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE STATE LIKE '%Slave has read all relay log%'
--source include/wait_condition.inc
--let $thd_id= `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE STATE LIKE '%Slave has read all relay log%'`
@@ -961,12 +1014,8 @@ SET GLOBAL slave_parallel_max_queued=9000;
--let bigstring= `SELECT REPEAT('x', 10000)`
SET binlog_format=statement;
# Create an event that will wait to be signalled.
-INSERT INTO t3 VALUES (70, foo(0,
+INSERT INTO t3 VALUES (80, foo(0,
'ha_write_row_end SIGNAL query_waiting WAIT_FOR query_cont', ''));
---disable_query_log
-# Create an event that will fill up the queue.
-eval INSERT INTO t3 VALUES (71, LENGTH('$bigstring'));
---enable_query_log
--connection server_2
SET debug_sync='now WAIT_FOR query_waiting';
@@ -977,11 +1026,14 @@ SET @old_dbug= @@GLOBAL.debug_dbug;
SET GLOBAL debug_dbug="+d,rpl_parallel_wait_queue_max";
--connection server_1
-# This event will have to wait for the queue to become shorter before it can
-# be queued. We will test that things work when we kill the SQL driver thread
-# during this wait.
-INSERT INTO t3 VALUES (72, 0);
-SELECT * FROM t3 WHERE a >= 70 ORDER BY a;
+--disable_query_log
+# Create an event that will fill up the queue.
+# The Xid event at the end of the event group will have to wait for the Query
+# event with the INSERT to drain so the queue becomes shorter. However that in
+# turn waits for the prior event group to continue.
+eval INSERT INTO t3 VALUES (81, LENGTH('$bigstring'));
+--enable_query_log
+SELECT * FROM t3 WHERE a >= 80 ORDER BY a;
--connection server_2
SET debug_sync='now WAIT_FOR wait_queue_ready';
@@ -995,19 +1047,19 @@ SET debug_sync='now SIGNAL query_cont';
--let $slave_sql_errno= 1317,1927,1963
--source include/wait_for_slave_sql_error.inc
STOP SLAVE IO_THREAD;
-SELECT * FROM t3 WHERE a >= 70 ORDER BY a;
SET GLOBAL debug_dbug=@old_dbug;
SET GLOBAL slave_parallel_max_queued= @old_max_queued;
--connection server_1
-INSERT INTO t3 VALUES (73,0);
+INSERT INTO t3 VALUES (82,0);
--save_master_pos
--connection server_2
+SET debug_sync='RESET';
--source include/start_slave.inc
--sync_with_master
-SELECT * FROM t3 WHERE a >= 70 ORDER BY a;
+SELECT * FROM t3 WHERE a >= 80 ORDER BY a;
--connection server_2
diff --git a/mysql-test/suite/sys_vars/r/slave_domain_parallel_threads_basic.result b/mysql-test/suite/sys_vars/r/slave_domain_parallel_threads_basic.result
new file mode 100644
index 00000000000..9e53d9bd891
--- /dev/null
+++ b/mysql-test/suite/sys_vars/r/slave_domain_parallel_threads_basic.result
@@ -0,0 +1,13 @@
+SET @save_slave_domain_parallel_threads= @@GLOBAL.slave_domain_parallel_threads;
+SELECT @@GLOBAL.slave_domain_parallel_threads as 'must be zero because of default';
+must be zero because of default
+0
+SELECT @@SESSION.slave_domain_parallel_threads as 'no session var';
+ERROR HY000: Variable 'slave_domain_parallel_threads' is a GLOBAL variable
+SET GLOBAL slave_domain_parallel_threads= 0;
+SET GLOBAL slave_domain_parallel_threads= DEFAULT;
+SET GLOBAL slave_domain_parallel_threads= 10;
+SELECT @@GLOBAL.slave_domain_parallel_threads;
+@@GLOBAL.slave_domain_parallel_threads
+10
+SET GLOBAL slave_domain_parallel_threads = @save_slave_domain_parallel_threads;
diff --git a/mysql-test/suite/sys_vars/t/slave_domain_parallel_threads_basic.test b/mysql-test/suite/sys_vars/t/slave_domain_parallel_threads_basic.test
new file mode 100644
index 00000000000..7be48fbd4c5
--- /dev/null
+++ b/mysql-test/suite/sys_vars/t/slave_domain_parallel_threads_basic.test
@@ -0,0 +1,14 @@
+--source include/not_embedded.inc
+
+SET @save_slave_domain_parallel_threads= @@GLOBAL.slave_domain_parallel_threads;
+
+SELECT @@GLOBAL.slave_domain_parallel_threads as 'must be zero because of default';
+--error ER_INCORRECT_GLOBAL_LOCAL_VAR
+SELECT @@SESSION.slave_domain_parallel_threads as 'no session var';
+
+SET GLOBAL slave_domain_parallel_threads= 0;
+SET GLOBAL slave_domain_parallel_threads= DEFAULT;
+SET GLOBAL slave_domain_parallel_threads= 10;
+SELECT @@GLOBAL.slave_domain_parallel_threads;
+
+SET GLOBAL slave_domain_parallel_threads = @save_slave_domain_parallel_threads;
diff --git a/mysys/mf_iocache.c b/mysys/mf_iocache.c
index 02e5c5373ae..0c48b367942 100644
--- a/mysys/mf_iocache.c
+++ b/mysys/mf_iocache.c
@@ -1281,10 +1281,6 @@ read_append_buffer:
size_t transfer_len;
DBUG_ASSERT(info->append_read_pos <= info->write_pos);
- /*
- TODO: figure out if the assert below is needed or correct.
- */
- DBUG_ASSERT(pos_in_file == info->end_of_file);
copy_len=min(Count, len_in_buff);
memcpy(Buffer, info->append_read_pos, copy_len);
info->append_read_pos += copy_len;
diff --git a/sql/log.cc b/sql/log.cc
index 6fead95a4b1..a7f3a69cfc9 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -6644,13 +6644,15 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
*/
wfc= orig_entry->thd->wait_for_commit_ptr;
orig_entry->queued_by_other= false;
- if (wfc && wfc->waiting_for_commit)
+ if (wfc && wfc->waitee)
{
mysql_mutex_lock(&wfc->LOCK_wait_commit);
/* Do an extra check here, this time safely under lock. */
- if (wfc->waiting_for_commit)
+ if (wfc->waitee)
{
const char *old_msg;
+ wait_for_commit *loc_waitee;
+
/*
By setting wfc->opaque_pointer to our own entry, we mark that we are
ready to commit, but waiting for another transaction to commit before
@@ -6661,21 +6663,20 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
queued_by_other flag is set.
*/
wfc->opaque_pointer= orig_entry;
+ DEBUG_SYNC(orig_entry->thd, "group_commit_waiting_for_prior");
old_msg=
orig_entry->thd->enter_cond(&wfc->COND_wait_commit,
&wfc->LOCK_wait_commit,
"Waiting for prior transaction to commit");
- DEBUG_SYNC(orig_entry->thd, "group_commit_waiting_for_prior");
- while (wfc->waiting_for_commit && !orig_entry->thd->check_killed())
+ while ((loc_waitee= wfc->waitee) && !orig_entry->thd->check_killed())
mysql_cond_wait(&wfc->COND_wait_commit, &wfc->LOCK_wait_commit);
wfc->opaque_pointer= NULL;
DBUG_PRINT("info", ("After waiting for prior commit, queued_by_other=%d",
orig_entry->queued_by_other));
- if (wfc->waiting_for_commit)
+ if (loc_waitee)
{
/* Wait terminated due to kill. */
- wait_for_commit *loc_waitee= wfc->waitee;
mysql_mutex_lock(&loc_waitee->LOCK_wait_commit);
if (loc_waitee->wakeup_subsequent_commits_running ||
orig_entry->queued_by_other)
@@ -6685,13 +6686,14 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
do
{
mysql_cond_wait(&wfc->COND_wait_commit, &wfc->LOCK_wait_commit);
- } while (wfc->waiting_for_commit);
+ } while (wfc->waitee);
}
else
{
/* We were killed, so remove us from the list of waitee. */
wfc->remove_from_list(&loc_waitee->subsequent_commits_list);
mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
+ wfc->waitee= NULL;
orig_entry->thd->exit_cond(old_msg);
/* Interrupted by kill. */
@@ -6707,12 +6709,11 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
}
else
mysql_mutex_unlock(&wfc->LOCK_wait_commit);
-
- if (wfc->wakeup_error)
- {
- my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
- DBUG_RETURN(-1);
- }
+ }
+ if (wfc && wfc->wakeup_error)
+ {
+ my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
+ DBUG_RETURN(-1);
}
/*
@@ -9043,7 +9044,7 @@ start_binlog_background_thread()
array_elements(all_binlog_threads));
#endif
- if (mysql_thread_create(key_thread_binlog, &th, NULL,
+ if (mysql_thread_create(key_thread_binlog, &th, &connection_attrib,
binlog_background_thread, NULL))
return 1;
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index d95d856c18f..b7cdeb0bde4 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -546,6 +546,7 @@ ulong rpl_recovery_rank=0;
ulong stored_program_cache_size= 0;
ulong opt_slave_parallel_threads= 0;
+ulong opt_slave_domain_parallel_threads= 0;
ulong opt_binlog_commit_wait_count= 0;
ulong opt_binlog_commit_wait_usec= 0;
ulong opt_slave_parallel_max_queued= 131072;
@@ -895,8 +896,10 @@ PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready,
key_COND_wait_commit;
PSI_cond_key key_RELAYLOG_COND_queue_busy;
PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
-PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_pool,
- key_COND_parallel_entry, key_COND_prepare_ordered;
+PSI_cond_key key_COND_rpl_thread_queue, key_COND_rpl_thread,
+ key_COND_rpl_thread_pool,
+ key_COND_parallel_entry, key_COND_group_commit_orderer,
+ key_COND_prepare_ordered;
PSI_cond_key key_COND_wait_gtid;
static PSI_cond_info all_server_conds[]=
@@ -941,8 +944,10 @@ static PSI_cond_info all_server_conds[]=
{ &key_COND_thread_cache, "COND_thread_cache", PSI_FLAG_GLOBAL},
{ &key_COND_flush_thread_cache, "COND_flush_thread_cache", PSI_FLAG_GLOBAL},
{ &key_COND_rpl_thread, "COND_rpl_thread", 0},
+ { &key_COND_rpl_thread_queue, "COND_rpl_thread_queue", 0},
{ &key_COND_rpl_thread_pool, "COND_rpl_thread_pool", 0},
{ &key_COND_parallel_entry, "COND_parallel_entry", 0},
+ { &key_COND_group_commit_orderer, "COND_group_commit_orderer", 0},
{ &key_COND_prepare_ordered, "COND_prepare_ordered", 0},
{ &key_COND_wait_gtid, "COND_wait_gtid", 0}
};
diff --git a/sql/mysqld.h b/sql/mysqld.h
index 4fdd34fd8be..dfdd54225fe 100644
--- a/sql/mysqld.h
+++ b/sql/mysqld.h
@@ -177,6 +177,7 @@ extern ulong opt_binlog_rows_event_max_size;
extern ulong rpl_recovery_rank, thread_cache_size;
extern ulong stored_program_cache_size;
extern ulong opt_slave_parallel_threads;
+extern ulong opt_slave_domain_parallel_threads;
extern ulong opt_slave_parallel_max_queued;
extern ulong opt_binlog_commit_wait_count;
extern ulong opt_binlog_commit_wait_usec;
@@ -284,8 +285,9 @@ extern PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready,
key_COND_wait_commit;
extern PSI_cond_key key_RELAYLOG_COND_queue_busy;
extern PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
-extern PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_pool,
- key_COND_parallel_entry;
+extern PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_queue,
+ key_COND_rpl_thread_pool,
+ key_COND_parallel_entry, key_COND_group_commit_orderer;
extern PSI_cond_key key_COND_wait_gtid;
extern PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert,
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index ef282611f70..eab5b980c02 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -93,32 +93,12 @@ handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev)
}
-static bool
-sql_worker_killed(THD *thd, rpl_group_info *rgi, bool in_event_group)
-{
- if (!rgi->rli->abort_slave && !abort_loop)
- return false;
-
- /*
- Do not abort in the middle of an event group that cannot be rolled back.
- */
- if ((thd->transaction.all.modified_non_trans_table ||
- (thd->variables.option_bits & OPTION_KEEP_LOG))
- && in_event_group)
- return false;
- /* ToDo: should we add some timeout like in sql_slave_killed?
- if (rgi->last_event_start_time == 0)
- rgi->last_event_start_time= my_time(0);
- */
-
- return true;
-}
-
-
static void
finish_event_group(THD *thd, int err, uint64 sub_id,
- rpl_parallel_entry *entry, wait_for_commit *wfc)
+ rpl_parallel_entry *entry, rpl_group_info *rgi)
{
+ wait_for_commit *wfc= &rgi->commit_orderer;
+
/*
Remove any left-over registration to wait for a prior commit to
complete. Normally, such wait would already have been removed at
@@ -163,12 +143,26 @@ finish_event_group(THD *thd, int err, uint64 sub_id,
*/
mysql_mutex_lock(&entry->LOCK_parallel_entry);
if (entry->last_committed_sub_id < sub_id)
- {
entry->last_committed_sub_id= sub_id;
- mysql_cond_broadcast(&entry->COND_parallel_entry);
- }
+
+ /*
+ If this event group got error, then any following event groups that have
+ not yet started should just skip their group, preparing for stop of the
+ SQL driver thread.
+ */
+ if (unlikely(rgi->is_error) &&
+ entry->stop_on_error_sub_id == (uint64)ULONGLONG_MAX)
+ entry->stop_on_error_sub_id= sub_id;
+ /*
+ We need to mark that this event group started its commit phase, in case we
+ missed it before (otherwise we would deadlock the next event group that is
+ waiting for this). In most cases (normal DML), it will be a no-op.
+ */
+ rgi->mark_start_commit_no_lock();
mysql_mutex_unlock(&entry->LOCK_parallel_entry);
+ thd->clear_error();
+ thd->stmt_da->reset_diagnostics_area();
wfc->wakeup_subsequent_commits(err);
}
@@ -185,6 +179,20 @@ signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi)
}
+static void
+unlock_or_exit_cond(THD *thd, mysql_mutex_t *lock, bool *did_enter_cond,
+ const char *old_msg)
+{
+ if (*did_enter_cond)
+ {
+ thd->exit_cond(old_msg);
+ *did_enter_cond= false;
+ }
+ else
+ mysql_mutex_unlock(lock);
+}
+
+
pthread_handler_t
handle_rpl_parallel_thread(void *arg)
{
@@ -193,8 +201,14 @@ handle_rpl_parallel_thread(void *arg)
struct rpl_parallel_thread::queued_event *events;
bool group_standalone= true;
bool in_event_group= false;
+ bool group_skip_for_stop= false;
rpl_group_info *group_rgi= NULL;
+ group_commit_orderer *gco, *tmp_gco;
uint64 event_gtid_sub_id= 0;
+ rpl_parallel_thread::queued_event *qevs_to_free;
+ rpl_group_info *rgis_to_free;
+ group_commit_orderer *gcos_to_free;
+ size_t total_event_size;
int err;
struct rpl_parallel_thread *rpt= (struct rpl_parallel_thread *)arg;
@@ -234,44 +248,62 @@ handle_rpl_parallel_thread(void *arg)
rpt->running= true;
mysql_cond_signal(&rpt->COND_rpl_thread);
- while (!rpt->stop && !thd->killed)
+ while (!rpt->stop)
{
- rpl_parallel_thread *list;
-
old_msg= thd->proc_info;
thd->enter_cond(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread,
"Waiting for work from SQL thread");
- while (!(events= rpt->event_queue) && !rpt->stop && !thd->killed &&
- !(rpt->current_entry && rpt->current_entry->force_abort))
+ /*
+ There are 4 cases that should cause us to wake up:
+ - Events have been queued for us to handle.
+ - We have an owner, but no events and not inside event group -> we need
+ to release ourself to the thread pool
+ - SQL thread is stopping, and we have an owner but no events, and we are
+ inside an event group; no more events will be queued to us, so we need
+ to abort the group (force_abort==1).
+ - Thread pool shutdown (rpt->stop==1).
+ */
+ while (!( (events= rpt->event_queue) ||
+ (rpt->current_owner && !in_event_group) ||
+ (rpt->current_owner && group_rgi->parallel_entry->force_abort) ||
+ rpt->stop))
mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread);
- rpt->dequeue(events);
+ rpt->dequeue1(events);
thd->exit_cond(old_msg);
- mysql_cond_signal(&rpt->COND_rpl_thread);
more_events:
+ qevs_to_free= NULL;
+ rgis_to_free= NULL;
+ gcos_to_free= NULL;
+ total_event_size= 0;
while (events)
{
struct rpl_parallel_thread::queued_event *next= events->next;
Log_event_type event_type;
rpl_group_info *rgi= events->rgi;
rpl_parallel_entry *entry= rgi->parallel_entry;
- uint64 wait_for_sub_id;
- uint64 wait_start_sub_id;
- bool end_of_group;
+ bool end_of_group, group_ending;
+ total_event_size+= events->event_size;
if (!events->ev)
{
handle_queued_pos_update(thd, events);
- my_free(events);
+ events->next= qevs_to_free;
+ qevs_to_free= events;
events= next;
continue;
}
err= 0;
group_rgi= rgi;
+ gco= rgi->gco;
/* Handle a new event group, which will be initiated by a GTID event. */
if ((event_type= events->ev->get_type_code()) == GTID_EVENT)
{
+ bool did_enter_cond= false;
+ const char *old_msg= NULL;
+ uint64 wait_count;
+
in_event_group= true;
/*
If the standalone flag is set, then this event group consists of a
@@ -293,50 +325,87 @@ handle_rpl_parallel_thread(void *arg)
occured.
Also do not start parallel execution of this event group until all
- prior groups have committed that are not safe to run in parallel with.
+ prior groups have reached the commit phase that are not safe to run
+ in parallel with.
*/
- wait_for_sub_id= rgi->wait_commit_sub_id;
- wait_start_sub_id= rgi->wait_start_sub_id;
- if (wait_for_sub_id || wait_start_sub_id)
+ mysql_mutex_lock(&entry->LOCK_parallel_entry);
+ if (!gco->installed)
{
- bool did_enter_cond= false;
- const char *old_msg= NULL;
-
- mysql_mutex_lock(&entry->LOCK_parallel_entry);
- if (wait_start_sub_id)
+ if (gco->prev_gco)
+ gco->prev_gco->next_gco= gco;
+ gco->installed= true;
+ }
+ wait_count= gco->wait_count;
+ if (wait_count > entry->count_committing_event_groups)
+ {
+ DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior");
+ old_msg= thd->enter_cond(&gco->COND_group_commit_orderer,
+ &entry->LOCK_parallel_entry,
+ "Waiting for prior transaction to start "
+ "commit before starting next transaction");
+ did_enter_cond= true;
+ do
{
- old_msg= thd->enter_cond(&entry->COND_parallel_entry,
- &entry->LOCK_parallel_entry,
- "Waiting for prior transaction to commit "
- "before starting next transaction");
- did_enter_cond= true;
- DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior");
- while (wait_start_sub_id > entry->last_committed_sub_id &&
- !thd->check_killed())
- mysql_cond_wait(&entry->COND_parallel_entry,
- &entry->LOCK_parallel_entry);
- if (wait_start_sub_id > entry->last_committed_sub_id)
+ if (thd->check_killed() && !rgi->is_error)
{
- /* The thread got a kill signal. */
DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior_killed");
thd->send_kill_message();
slave_output_error_info(rgi->rli, thd);
signal_error_to_sql_driver_thread(thd, rgi);
+ /*
+ Even though we were killed, we need to continue waiting for the
+ prior event groups to signal that we can continue. Otherwise we
+ mess up the accounting for ordering. However, now that we have
+ marked the error, events will just be skipped rather than
+ executed, and things will progress quickly towards stop.
+ */
}
- rgi->wait_start_sub_id= 0; /* No need to check again. */
- }
- if (wait_for_sub_id > entry->last_committed_sub_id)
- {
- wait_for_commit *waitee=
- &rgi->wait_commit_group_info->commit_orderer;
- rgi->commit_orderer.register_wait_for_prior_commit(waitee);
- }
- if (did_enter_cond)
- thd->exit_cond(old_msg);
- else
- mysql_mutex_unlock(&entry->LOCK_parallel_entry);
+ mysql_cond_wait(&gco->COND_group_commit_orderer,
+ &entry->LOCK_parallel_entry);
+ } while (wait_count > entry->count_committing_event_groups);
+ }
+
+ if ((tmp_gco= gco->prev_gco))
+ {
+ /*
+ Now all the event groups in the previous batch have entered their
+ commit phase, and will no longer access their gco. So we can free
+ it here.
+ */
+ DBUG_ASSERT(!tmp_gco->prev_gco);
+ gco->prev_gco= NULL;
+ tmp_gco->next_gco= gcos_to_free;
+ gcos_to_free= tmp_gco;
}
+ if (entry->force_abort && wait_count > entry->stop_count)
+ {
+ /*
+ We are stopping (STOP SLAVE), and this event group is beyond the
+ point where we can safely stop. So set a flag that will cause us
+ to skip, rather than execute, the following events.
+ */
+ group_skip_for_stop= true;
+ }
+ else
+ group_skip_for_stop= false;
+
+ if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id))
+ group_skip_for_stop= true;
+ else if (rgi->wait_commit_sub_id > entry->last_committed_sub_id)
+ {
+ /*
+ Register that the commit of this event group must wait for the
+ commit of the previous event group to complete before it may
+ complete itself, so that we preserve commit order.
+ */
+ wait_for_commit *waitee=
+ &rgi->wait_commit_group_info->commit_orderer;
+ rgi->commit_orderer.register_wait_for_prior_commit(waitee);
+ }
+ unlock_or_exit_cond(thd, &entry->LOCK_parallel_entry,
+ &did_enter_cond, old_msg);
+
if(thd->wait_for_commit_ptr)
{
/*
@@ -353,13 +422,23 @@ handle_rpl_parallel_thread(void *arg)
thd->wait_for_commit_ptr= &rgi->commit_orderer;
}
+ group_ending= event_type == XID_EVENT ||
+ (event_type == QUERY_EVENT &&
+ (((Query_log_event *)events->ev)->is_commit() ||
+ ((Query_log_event *)events->ev)->is_rollback()));
+ if (group_ending)
+ {
+ DEBUG_SYNC(thd, "rpl_parallel_before_mark_start_commit");
+ rgi->mark_start_commit();
+ }
+
/*
If the SQL thread is stopping, we just skip execution of all the
following event groups. We still do all the normal waiting and wakeup
processing between the event groups as a simple way to ensure that
everything is stopped and cleaned up correctly.
*/
- if (!rgi->is_error && !sql_worker_killed(thd, rgi, in_event_group))
+ if (!rgi->is_error && !group_skip_for_stop)
err= rpt_handle_event(events, rpt);
else
err= thd->wait_for_prior_commit();
@@ -367,13 +446,11 @@ handle_rpl_parallel_thread(void *arg)
end_of_group=
in_event_group &&
((group_standalone && !Log_event::is_part_of_group(event_type)) ||
- event_type == XID_EVENT ||
- (event_type == QUERY_EVENT &&
- (((Query_log_event *)events->ev)->is_commit() ||
- ((Query_log_event *)events->ev)->is_rollback())));
+ group_ending);
delete_or_keep_event_post_apply(rgi, event_type, events->ev);
- my_free(events);
+ events->next= qevs_to_free;
+ qevs_to_free= events;
if (err)
{
@@ -383,10 +460,11 @@ handle_rpl_parallel_thread(void *arg)
if (end_of_group)
{
in_event_group= false;
- finish_event_group(thd, err, event_gtid_sub_id, entry,
- &rgi->commit_orderer);
- delete rgi;
+ finish_event_group(thd, err, event_gtid_sub_id, entry, rgi);
+ rgi->next= rgis_to_free;
+ rgis_to_free= rgi;
group_rgi= rgi= NULL;
+ group_skip_for_stop= false;
DEBUG_SYNC(thd, "rpl_parallel_end_of_group");
}
@@ -394,6 +472,29 @@ handle_rpl_parallel_thread(void *arg)
}
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+ /* Signal that our queue can now accept more events. */
+ rpt->dequeue2(total_event_size);
+ mysql_cond_signal(&rpt->COND_rpl_thread_queue);
+ /* We need to delay the free here, to when we have the lock. */
+ while (gcos_to_free)
+ {
+ group_commit_orderer *next= gcos_to_free->next_gco;
+ rpt->free_gco(gcos_to_free);
+ gcos_to_free= next;
+ }
+ while (rgis_to_free)
+ {
+ rpl_group_info *next= rgis_to_free->next;
+ rpt->free_rgi(rgis_to_free);
+ rgis_to_free= next;
+ }
+ while (qevs_to_free)
+ {
+ rpl_parallel_thread::queued_event *next= qevs_to_free->next;
+ rpt->free_qev(qevs_to_free);
+ qevs_to_free= next;
+ }
+
if ((events= rpt->event_queue) != NULL)
{
/*
@@ -401,9 +502,8 @@ handle_rpl_parallel_thread(void *arg)
This is faster than having to wakeup the pool manager thread to give us
a new event.
*/
- rpt->dequeue(events);
+ rpt->dequeue1(events);
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
- mysql_cond_signal(&rpt->COND_rpl_thread);
goto more_events;
}
@@ -418,27 +518,26 @@ handle_rpl_parallel_thread(void *arg)
half-processed event group.
*/
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+ thd->wait_for_prior_commit();
finish_event_group(thd, 1, group_rgi->gtid_sub_id,
- group_rgi->parallel_entry, &group_rgi->commit_orderer);
+ group_rgi->parallel_entry, group_rgi);
signal_error_to_sql_driver_thread(thd, group_rgi);
in_event_group= false;
- delete group_rgi;
- group_rgi= NULL;
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+ rpt->free_rgi(group_rgi);
+ group_rgi= NULL;
+ group_skip_for_stop= false;
}
if (!in_event_group)
{
+ rpt->current_owner= NULL;
+ /* Tell wait_for_done() that we are done, if it is waiting. */
+ if (likely(rpt->current_entry) &&
+ unlikely(rpt->current_entry->force_abort))
+ mysql_cond_broadcast(&rpt->current_entry->COND_parallel_entry);
rpt->current_entry= NULL;
if (!rpt->stop)
- {
- mysql_mutex_lock(&rpt->pool->LOCK_rpl_thread_pool);
- list= rpt->pool->free_list;
- rpt->next= list;
- rpt->pool->free_list= rpt;
- if (!list)
- mysql_cond_broadcast(&rpt->pool->COND_rpl_thread_pool);
- mysql_mutex_unlock(&rpt->pool->LOCK_rpl_thread_pool);
- }
+ rpt->pool->release_thread(rpt);
}
}
@@ -467,6 +566,15 @@ handle_rpl_parallel_thread(void *arg)
}
+static void
+dealloc_gco(group_commit_orderer *gco)
+{
+ DBUG_ASSERT(!gco->prev_gco /* Must only free after dealloc previous */);
+ mysql_cond_destroy(&gco->COND_group_commit_orderer);
+ my_free(gco);
+}
+
+
int
rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
uint32 new_count, bool skip_check)
@@ -501,8 +609,10 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
mysql_mutex_init(key_LOCK_rpl_thread, &new_list[i]->LOCK_rpl_thread,
MY_MUTEX_INIT_SLOW);
mysql_cond_init(key_COND_rpl_thread, &new_list[i]->COND_rpl_thread, NULL);
+ mysql_cond_init(key_COND_rpl_thread_queue,
+ &new_list[i]->COND_rpl_thread_queue, NULL);
new_list[i]->pool= pool;
- if (mysql_thread_create(key_rpl_parallel_thread, &th, NULL,
+ if (mysql_thread_create(key_rpl_parallel_thread, &th, &connection_attrib,
handle_rpl_parallel_thread, new_list[i]))
{
my_error(ER_OUT_OF_RESOURCES, MYF(0));
@@ -539,7 +649,7 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
*/
for (i= 0; i < pool->count; ++i)
{
- rpl_parallel_thread *rpt= pool->get_thread(NULL);
+ rpl_parallel_thread *rpt= pool->get_thread(NULL, NULL);
rpt->stop= true;
mysql_cond_signal(&rpt->COND_rpl_thread);
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
@@ -554,6 +664,24 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
mysql_mutex_destroy(&rpt->LOCK_rpl_thread);
mysql_cond_destroy(&rpt->COND_rpl_thread);
+ while (rpt->qev_free_list)
+ {
+ rpl_parallel_thread::queued_event *next= rpt->qev_free_list->next;
+ my_free(rpt->qev_free_list);
+ rpt->qev_free_list= next;
+ }
+ while (rpt->rgi_free_list)
+ {
+ rpl_group_info *next= rpt->rgi_free_list->next;
+ delete rpt->rgi_free_list;
+ rpt->rgi_free_list= next;
+ }
+ while (rpt->gco_free_list)
+ {
+ group_commit_orderer *next= rpt->gco_free_list->next_gco;
+ dealloc_gco(rpt->gco_free_list);
+ rpt->gco_free_list= next;
+ }
}
my_free(pool->threads);
@@ -609,6 +737,121 @@ err:
}
+rpl_parallel_thread::queued_event *
+rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size,
+ Relay_log_info *rli)
+{
+ queued_event *qev;
+ mysql_mutex_assert_owner(&LOCK_rpl_thread);
+ if ((qev= qev_free_list))
+ qev_free_list= qev->next;
+ else if(!(qev= (queued_event *)my_malloc(sizeof(*qev), MYF(0))))
+ {
+ my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*qev));
+ return NULL;
+ }
+ qev->ev= ev;
+ qev->event_size= event_size;
+ qev->next= NULL;
+ strcpy(qev->event_relay_log_name, rli->event_relay_log_name);
+ qev->event_relay_log_pos= rli->event_relay_log_pos;
+ qev->future_event_relay_log_pos= rli->future_event_relay_log_pos;
+ strcpy(qev->future_event_master_log_name, rli->future_event_master_log_name);
+ return qev;
+}
+
+
+void
+rpl_parallel_thread::free_qev(rpl_parallel_thread::queued_event *qev)
+{
+ mysql_mutex_assert_owner(&LOCK_rpl_thread);
+ qev->next= qev_free_list;
+ qev_free_list= qev;
+}
+
+
+rpl_group_info*
+rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
+ rpl_parallel_entry *e)
+{
+ rpl_group_info *rgi;
+ mysql_mutex_assert_owner(&LOCK_rpl_thread);
+ if ((rgi= rgi_free_list))
+ {
+ rgi_free_list= rgi->next;
+ rgi->reinit(rli);
+ }
+ else
+ {
+ if(!(rgi= new rpl_group_info(rli)))
+ {
+ my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*rgi));
+ return NULL;
+ }
+ rgi->is_parallel_exec = true;
+ if ((rgi->deferred_events_collecting= rli->mi->rpl_filter->is_on()))
+ rgi->deferred_events= new Deferred_log_events(rli);
+ }
+ if (event_group_new_gtid(rgi, gtid_ev))
+ {
+ free_rgi(rgi);
+ my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME));
+ return NULL;
+ }
+ rgi->parallel_entry= e;
+
+ return rgi;
+}
+
+
+void
+rpl_parallel_thread::free_rgi(rpl_group_info *rgi)
+{
+ mysql_mutex_assert_owner(&LOCK_rpl_thread);
+ DBUG_ASSERT(rgi->commit_orderer.waitee == NULL);
+ rgi->free_annotate_event();
+ if (rgi->deferred_events)
+ {
+ delete rgi->deferred_events;
+ rgi->deferred_events= NULL;
+ }
+ rgi->next= rgi_free_list;
+ rgi_free_list= rgi;
+}
+
+
+group_commit_orderer *
+rpl_parallel_thread::get_gco(uint64 wait_count, group_commit_orderer *prev)
+{
+ group_commit_orderer *gco;
+ mysql_mutex_assert_owner(&LOCK_rpl_thread);
+ if ((gco= gco_free_list))
+ gco_free_list= gco->next_gco;
+ else if(!(gco= (group_commit_orderer *)my_malloc(sizeof(*gco), MYF(0))))
+ {
+ my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*gco));
+ return NULL;
+ }
+ mysql_cond_init(key_COND_group_commit_orderer,
+ &gco->COND_group_commit_orderer, NULL);
+ gco->wait_count= wait_count;
+ gco->prev_gco= prev;
+ gco->next_gco= NULL;
+ gco->installed= false;
+ return gco;
+}
+
+
+void
+rpl_parallel_thread::free_gco(group_commit_orderer *gco)
+{
+ mysql_mutex_assert_owner(&LOCK_rpl_thread);
+ DBUG_ASSERT(!gco->prev_gco /* Must not free until wait has completed. */);
+ gco->next_gco= gco_free_list;
+ gco_free_list= gco;
+}
+
+
rpl_parallel_thread_pool::rpl_parallel_thread_pool()
: count(0), threads(0), free_list(0), changing(false), inited(false)
{
@@ -651,7 +894,8 @@ rpl_parallel_thread_pool::destroy()
Note that we return with the worker threads's LOCK_rpl_thread mutex locked.
*/
struct rpl_parallel_thread *
-rpl_parallel_thread_pool::get_thread(rpl_parallel_entry *entry)
+rpl_parallel_thread_pool::get_thread(rpl_parallel_thread **owner,
+ rpl_parallel_entry *entry)
{
rpl_parallel_thread *rpt;
@@ -661,16 +905,151 @@ rpl_parallel_thread_pool::get_thread(rpl_parallel_entry *entry)
free_list= rpt->next;
mysql_mutex_unlock(&LOCK_rpl_thread_pool);
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+ rpt->current_owner= owner;
rpt->current_entry= entry;
return rpt;
}
+/*
+ Release a thread to the thread pool.
+ The thread should be locked, and should not have any work queued for it.
+*/
+void
+rpl_parallel_thread_pool::release_thread(rpl_parallel_thread *rpt)
+{
+ rpl_parallel_thread *list;
+
+ mysql_mutex_assert_owner(&rpt->LOCK_rpl_thread);
+ DBUG_ASSERT(rpt->current_owner == NULL);
+ mysql_mutex_lock(&LOCK_rpl_thread_pool);
+ list= free_list;
+ rpt->next= list;
+ free_list= rpt;
+ if (!list)
+ mysql_cond_broadcast(&COND_rpl_thread_pool);
+ mysql_mutex_unlock(&LOCK_rpl_thread_pool);
+}
+
+
+/*
+ Obtain a worker thread that we can queue an event to.
+
+ Each invocation allocates a new worker thread, to maximise
+ parallelism. However, only up to a maximum of
+ --slave-domain-parallel-threads workers can be occupied by a single
+ replication domain; after that point, we start re-using worker threads that
+ are still executing events that were queued earlier for this thread.
+
+ We never queue more than --rpl-parallel-wait-queue_max amount of events
+ for one worker, to avoid the SQL driver thread using up all memory with
+ queued events while worker threads are stalling.
+
+ Note that this function returns with rpl_parallel_thread::LOCK_rpl_thread
+ locked. Exception is if we were killed, in which case NULL is returned.
+
+ The *did_enter_cond flag is set true if we had to wait for a worker thread
+ to become free (with mysql_cond_wait()). If so, *old_msg will also be set,
+ and the LOCK_rpl_thread must be released with THD::EXIT_COND() instead
+ of mysql_mutex_unlock.
+
+ If the flag `reuse' is set, the last worker thread will be returned again,
+ if it is still available. Otherwise a new worker thread is allocated.
+*/
+rpl_parallel_thread *
+rpl_parallel_entry::choose_thread(Relay_log_info *rli, bool *did_enter_cond,
+ const char **old_msg, bool reuse)
+{
+ uint32 idx;
+ rpl_parallel_thread *thr;
+
+ idx= rpl_thread_idx;
+ if (!reuse)
+ {
+ ++idx;
+ if (idx >= rpl_thread_max)
+ idx= 0;
+ rpl_thread_idx= idx;
+ }
+ thr= rpl_threads[idx];
+ if (thr)
+ {
+ *did_enter_cond= false;
+ mysql_mutex_lock(&thr->LOCK_rpl_thread);
+ for (;;)
+ {
+ if (thr->current_owner != &rpl_threads[idx])
+ {
+ /*
+ The worker thread became idle, and returned to the free list and
+ possibly was allocated to a different request. So we should allocate
+ a new worker thread.
+ */
+ unlock_or_exit_cond(rli->sql_driver_thd, &thr->LOCK_rpl_thread,
+ did_enter_cond, *old_msg);
+ thr= NULL;
+ break;
+ }
+ else if (thr->queued_size <= opt_slave_parallel_max_queued)
+ {
+ /* The thread is ready to queue into. */
+ break;
+ }
+ else if (rli->sql_driver_thd->check_killed())
+ {
+ unlock_or_exit_cond(rli->sql_driver_thd, &thr->LOCK_rpl_thread,
+ did_enter_cond, *old_msg);
+ my_error(ER_CONNECTION_KILLED, MYF(0));
+ DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max",
+ {
+ debug_sync_set_action(rli->sql_driver_thd,
+ STRING_WITH_LEN("now SIGNAL wait_queue_killed"));
+ };);
+ slave_output_error_info(rli, rli->sql_driver_thd);
+ return NULL;
+ }
+ else
+ {
+ /*
+ We have reached the limit of how much memory we are allowed to use
+ for queuing events, so wait for the thread to consume some of its
+ queue.
+ */
+ if (!*did_enter_cond)
+ {
+ /*
+ We need to do the debug_sync before enter_cond(), because debug_sync
+ changes thd->mysys_var->current_mutex, and this can cause THD::awake()
+ to use the wrong mutex.
+ */
+ DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max",
+ {
+ debug_sync_set_action(rli->sql_driver_thd,
+ STRING_WITH_LEN("now SIGNAL wait_queue_ready"));
+ };);
+ *old_msg= rli->sql_driver_thd->enter_cond
+ (&thr->COND_rpl_thread_queue, &thr->LOCK_rpl_thread,
+ "Waiting for room in worker thread event queue");
+ *did_enter_cond= true;
+ }
+ mysql_cond_wait(&thr->COND_rpl_thread_queue, &thr->LOCK_rpl_thread);
+ }
+ }
+ }
+ if (!thr)
+ rpl_threads[idx]= thr= global_rpl_thread_pool.get_thread(&rpl_threads[idx],
+ this);
+
+ return thr;
+}
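A minimal sketch of the slot selection in choose_thread(), with hypothetical names and the locking, queue-size wait and kill handling stripped out; it only shows the round-robin stepping of rpl_thread_idx and the ownership re-check against the slot's own address before a worker is re-used.

struct Slot {                       // stands in for rpl_parallel_thread
  Slot **current_owner= nullptr;    // points back into the owner's slot array
};

struct Domain {                     // stands in for rpl_parallel_entry
  static const unsigned max_slots= 4;  // --slave-domain-parallel-threads
  Slot *slots[max_slots]= {};
  unsigned idx= 0;

  Slot *choose(bool reuse, Slot *(*alloc_from_pool)(Slot **owner)) {
    if (!reuse)
      idx= (idx + 1) % max_slots;   // prefer a fresh worker for a new group
    Slot *s= slots[idx];
    if (s && s->current_owner != &slots[idx])
      s= nullptr;                   // worker went back to the pool; cannot re-use
    if (!s)
      slots[idx]= s= alloc_from_pool(&slots[idx]);
    return s;
  }
};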
+
static void
free_rpl_parallel_entry(void *element)
{
rpl_parallel_entry *e= (rpl_parallel_entry *)element;
+ if (e->current_gco)
+ dealloc_gco(e->current_gco);
mysql_cond_destroy(&e->COND_parallel_entry);
mysql_mutex_destroy(&e->LOCK_parallel_entry);
my_free(e);
@@ -710,10 +1089,22 @@ rpl_parallel::find(uint32 domain_id)
(const uchar *)&domain_id, 0)))
{
/* Allocate a new, empty one. */
- if (!(e= (struct rpl_parallel_entry *)my_malloc(sizeof(*e),
- MYF(MY_ZEROFILL))))
+ ulong count= opt_slave_domain_parallel_threads;
+ if (count == 0 || count > opt_slave_parallel_threads)
+ count= opt_slave_parallel_threads;
+ rpl_parallel_thread **p;
+ if (!my_multi_malloc(MYF(MY_WME|MY_ZEROFILL),
+ &e, sizeof(*e),
+ &p, count*sizeof(*p),
+ NULL))
+ {
+ my_error(ER_OUTOFMEMORY, MYF(0), (int)(sizeof(*e)+count*sizeof(*p)));
return NULL;
+ }
+ e->rpl_threads= p;
+ e->rpl_thread_max= count;
e->domain_id= domain_id;
+ e->stop_on_error_sub_id= (uint64)ULONGLONG_MAX;
if (my_hash_insert(&domain_hash, (uchar *)e))
{
my_free(e);
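The single my_multi_malloc() above replaces two separate allocations; a rough standalone equivalent (hypothetical names, plain calloc instead of my_multi_malloc) of the clamping and the joint entry-plus-array layout could look like this:

#include <cstddef>
#include <cstdlib>

struct Worker;                            // worker threads, opaque here

struct Entry {                            // stands in for rpl_parallel_entry
  Worker **workers;
  unsigned max_workers;
};

Entry *alloc_entry(unsigned long domain_threads, unsigned long global_threads)
{
  unsigned long count= domain_threads;
  if (count == 0 || count > global_threads)
    count= global_threads;                // 0 means "no per-domain limit"
  std::size_t bytes= sizeof(Entry) + count * sizeof(Worker *);
  char *block= static_cast<char *>(std::calloc(1, bytes));
  if (!block)
    return nullptr;                       // caller reports the OOM error
  Entry *e= reinterpret_cast<Entry *>(block);
  e->workers= reinterpret_cast<Worker **>(block + sizeof(Entry));
  e->max_workers= static_cast<unsigned>(count);
  return e;
}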
@@ -731,10 +1122,11 @@ rpl_parallel::find(uint32 domain_id)
void
-rpl_parallel::wait_for_done()
+rpl_parallel::wait_for_done(THD *thd)
{
struct rpl_parallel_entry *e;
- uint32 i;
+ rpl_parallel_thread *rpt;
+ uint32 i, j;
/*
First signal all workers that they must force quit; no more events will
@@ -742,26 +1134,58 @@ rpl_parallel::wait_for_done()
*/
for (i= 0; i < domain_hash.records; ++i)
{
- rpl_parallel_thread *rpt;
-
e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
+ mysql_mutex_lock(&e->LOCK_parallel_entry);
+ /*
+ We want the worker threads to stop as quickly as is safe. If the slave
+ SQL threads are behind, we could have significant amount of events
+ queued for the workers, and we want to stop without waiting for them
+ all to be applied first. But if any event group has already started
+ executing in a worker, we want to be sure that all prior event groups
+ are also executed, so that we stop at a consistent point in the binlog
+ stream (per replication domain).
+
+ All event groups wait for e->count_committing_event_groups to reach
+ the value of group_commit_orderer::wait_count before starting to
+ execute. Thus, at this point we know that any event group with a
+ strictly larger wait_count is safe to skip; none of them can have
+ started executing yet. So we set e->stop_count here and use it to
+ decide in the worker threads whether to continue executing an event
+ group or whether to skip it, when force_abort is set.
+ */
e->force_abort= true;
- if ((rpt= e->rpl_thread))
+ e->stop_count= e->count_committing_event_groups;
+ mysql_mutex_unlock(&e->LOCK_parallel_entry);
+ for (j= 0; j < e->rpl_thread_max; ++j)
{
- mysql_mutex_lock(&rpt->LOCK_rpl_thread);
- if (rpt->current_entry == e)
- mysql_cond_signal(&rpt->COND_rpl_thread);
- mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+ if ((rpt= e->rpl_threads[j]))
+ {
+ mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+ if (rpt->current_owner == &e->rpl_threads[j])
+ mysql_cond_signal(&rpt->COND_rpl_thread);
+ mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+ }
}
}
+ DBUG_EXECUTE_IF("rpl_parallel_wait_for_done_trigger",
+ {
+ debug_sync_set_action(thd,
+ STRING_WITH_LEN("now SIGNAL wait_for_done_waiting"));
+ };);
for (i= 0; i < domain_hash.records; ++i)
{
e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
- mysql_mutex_lock(&e->LOCK_parallel_entry);
- while (e->current_sub_id > e->last_committed_sub_id)
- mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry);
- mysql_mutex_unlock(&e->LOCK_parallel_entry);
+ for (j= 0; j < e->rpl_thread_max; ++j)
+ {
+ if ((rpt= e->rpl_threads[j]))
+ {
+ mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+ while (rpt->current_owner == &e->rpl_threads[j])
+ mysql_cond_wait(&e->COND_parallel_entry, &rpt->LOCK_rpl_thread);
+ mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+ }
+ }
}
}
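A simplified standalone sketch of the two phases of wait_for_done() above, with hypothetical types; unlike the real code, which waits on COND_parallel_entry, this version waits on each worker's own condition variable, and the event skipping driven by stop_count is not shown.

#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <vector>

struct Worker {
  std::mutex lock;                     // LOCK_rpl_thread
  std::condition_variable cond;        // signalled when the worker lets go
  Worker **current_owner= nullptr;
};

struct Domain {
  std::mutex lock;                     // LOCK_parallel_entry
  bool force_abort= false;
  unsigned long long stop_count= 0;
  unsigned long long count_committing_event_groups= 0;
  std::vector<Worker *> slots;         // rpl_threads[0..rpl_thread_max)
};

void wait_for_done(Domain &d)
{
  {                                    // phase 1: fix the stop point, wake workers
    std::lock_guard<std::mutex> g(d.lock);
    d.force_abort= true;
    d.stop_count= d.count_committing_event_groups;
  }
  for (std::size_t i= 0; i < d.slots.size(); ++i)
    if (Worker *w= d.slots[i]) {
      std::lock_guard<std::mutex> g(w->lock);
      if (w->current_owner == &d.slots[i])
        w->cond.notify_one();
    }
  for (std::size_t i= 0; i < d.slots.size(); ++i)  // phase 2: wait until released
    if (Worker *w= d.slots[i]) {
      std::unique_lock<std::mutex> g(w->lock);
      w->cond.wait(g, [&]{ return w->current_owner != &d.slots[i]; });
    }
}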
@@ -788,6 +1212,21 @@ rpl_parallel::workers_idle()
/*
+ This is used when we get an error during processing in do_event();
+ We will not queue any event to the thread, but we still need to wake it up
+ to be sure that it will be returned to the pool.
+*/
+static void
+abandon_worker_thread(THD *thd, rpl_parallel_thread *cur_thread,
+ bool *did_enter_cond, const char *old_msg)
+{
+ unlock_or_exit_cond(thd, &cur_thread->LOCK_rpl_thread,
+ did_enter_cond, old_msg);
+ mysql_cond_signal(&cur_thread->COND_rpl_thread);
+}
+
+
+/*
do_event() is executed by the sql_driver_thd thread.
Its main purpose is to find a thread that can execute the query.
@@ -815,6 +1254,10 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
/*
Stop queueing additional event groups once the SQL thread is requested to
stop.
+
+ We have to queue any remaining events of any event group that has already
+ been partially queued, but after that we will just ignore any further
+ events the SQL driver thread may try to queue, and eventually it will stop.
*/
if (((typ= ev->get_type_code()) == GTID_EVENT ||
!(is_group_event= Log_event::is_group_event(typ))) &&
@@ -823,188 +1266,121 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
if (sql_thread_stopping)
{
delete ev;
- /* QQ: Need a better comment why we return false here */
+ /*
+ Return false ("no error"); a normal stop is not an error, and if we are
+ stopping due to an error, that error has already been recorded.
+ */
return false;
}
- if (!(qev= (rpl_parallel_thread::queued_event *)my_malloc(sizeof(*qev),
- MYF(0))))
+ if (typ == GTID_EVENT || unlikely(!current))
{
- my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ uint32 domain_id;
+ if (likely(typ == GTID_EVENT))
+ {
+ Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
+ domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ?
+ 0 : gtid_ev->domain_id);
+ }
+ else
+ domain_id= 0;
+ if (!(e= find(domain_id)))
+ {
+ my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME));
+ delete ev;
+ return true;
+ }
+ current= e;
+ }
+ else
+ e= current;
+
+ /*
+ Find a worker thread to queue the event for.
+ Prefer a new thread, so we maximise parallelism (at least for the group
+ commit). But do not exceed a limit of --slave-domain-parallel-threads;
+ instead re-use a thread that we queued for previously.
+ */
+ cur_thread=
+ e->choose_thread(rli, &did_enter_cond, &old_msg, typ != GTID_EVENT);
+ if (!cur_thread)
+ {
+ /* This means we were killed. The error is already signalled. */
+ delete ev;
+ return true;
+ }
+
+ if (!(qev= cur_thread->get_qev(ev, event_size, rli)))
+ {
+ abandon_worker_thread(rli->sql_driver_thd, cur_thread,
+ &did_enter_cond, old_msg);
delete ev;
return true;
}
- qev->ev= ev;
- qev->event_size= event_size;
- qev->next= NULL;
- strcpy(qev->event_relay_log_name, rli->event_relay_log_name);
- qev->event_relay_log_pos= rli->event_relay_log_pos;
- qev->future_event_relay_log_pos= rli->future_event_relay_log_pos;
- strcpy(qev->future_event_master_log_name, rli->future_event_master_log_name);
if (typ == GTID_EVENT)
{
Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
- uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ?
- 0 : gtid_ev->domain_id);
- if (!(e= find(domain_id)) ||
- !(rgi= new rpl_group_info(rli)) ||
- event_group_new_gtid(rgi, gtid_ev))
+ if (!(rgi= cur_thread->get_rgi(rli, gtid_ev, e)))
{
- my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME));
- delete rgi;
- my_free(qev);
+ cur_thread->free_qev(qev);
+ abandon_worker_thread(rli->sql_driver_thd, cur_thread,
+ &did_enter_cond, old_msg);
delete ev;
return true;
}
- rgi->is_parallel_exec = true;
- if ((rgi->deferred_events_collecting= rli->mi->rpl_filter->is_on()))
- rgi->deferred_events= new Deferred_log_events(rli);
- if ((gtid_ev->flags2 & Gtid_log_event::FL_GROUP_COMMIT_ID) &&
- e->last_commit_id == gtid_ev->commit_id)
- {
- /*
- We are already executing something else in this domain. But the two
- event groups were committed together in the same group commit on the
- master, so we can still do them in parallel here on the slave.
+ /*
+ We queue the event group in a new worker thread, to run in parallel
+ with previous groups.
- However, the commit of this event must wait for the commit of the prior
- event, to preserve binlog commit order and visibility across all
- servers in the replication hierarchy.
+ To preserve commit order within the replication domain, we set up
+ rgi->wait_commit_sub_id to make the new group commit only after the
+ previous group has committed.
- In addition, we must not start executing this event until we have
- finished the previous collection of event groups that group-committed
- together; we use rgi->wait_start_sub_id to control this.
- */
- rpl_parallel_thread *rpt= global_rpl_thread_pool.get_thread(e);
- rgi->wait_commit_sub_id= e->current_sub_id;
- rgi->wait_commit_group_info= e->current_group_info;
- rgi->wait_start_sub_id= e->prev_groupcommit_sub_id;
- e->rpl_thread= cur_thread= rpt;
- /* get_thread() returns with the LOCK_rpl_thread locked. */
- }
- else
+ Event groups that group-committed together on the master can be run
+ in parallel with each other without restrictions. But one batch of
+ group-commits may not start before all groups in the previous batch
+ have initiated their commit phase; we set up rgi->gco to ensure that.
+ */
+ rgi->wait_commit_sub_id= e->current_sub_id;
+ rgi->wait_commit_group_info= e->current_group_info;
+
+ if (!((gtid_ev->flags2 & Gtid_log_event::FL_GROUP_COMMIT_ID) &&
+ e->last_commit_id == gtid_ev->commit_id))
{
/*
- Check if we already have a worker thread for this entry.
-
- We continue to queue more events up for the worker thread while it is
- still executing the first ones, to be able to start executing a large
- event group without having to wait for the end to be fetched from the
- master. And we continue to queue up more events after the first group,
- so that we can continue to process subsequent parts of the relay log in
- parallel without having to wait for previous long-running events to
- complete.
-
- But if the worker thread is idle at any point, it may return to the
- idle list or start servicing a different request. So check this, and
- allocate a new thread if the old one is no longer processing for us.
+ A new batch of transactions that group-committed together on the master.
+
+ Remember the count that marks the end of the previous group committed
+ batch, and allocate a new gco.
*/
- cur_thread= e->rpl_thread;
- if (cur_thread)
- {
- mysql_mutex_lock(&cur_thread->LOCK_rpl_thread);
- for (;;)
- {
- if (cur_thread->current_entry != e)
- {
- /*
- The worker thread became idle, and returned to the free list and
- possibly was allocated to a different request. This also means
- that everything previously queued has already been executed,
- else the worker thread would not have become idle. So we should
- allocate a new worker thread.
- */
- mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
- e->rpl_thread= cur_thread= NULL;
- break;
- }
- else if (cur_thread->queued_size <= opt_slave_parallel_max_queued)
- break; // The thread is ready to queue into
- else if (rli->sql_driver_thd->check_killed())
- {
- mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
- my_error(ER_CONNECTION_KILLED, MYF(0));
- delete rgi;
- my_free(qev);
- delete ev;
- DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max",
- {
- debug_sync_set_action(rli->sql_driver_thd,
- STRING_WITH_LEN("now SIGNAL wait_queue_killed"));
- };);
- slave_output_error_info(rli, rli->sql_driver_thd);
- return true;
- }
- else
- {
- /*
- We have reached the limit of how much memory we are allowed to
- use for queuing events, so wait for the thread to consume some
- of its queue.
- */
- if (!did_enter_cond)
- {
- old_msg= rli->sql_driver_thd->enter_cond
- (&cur_thread->COND_rpl_thread, &cur_thread->LOCK_rpl_thread,
- "Waiting for room in worker thread event queue");
- did_enter_cond= true;
- DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max",
- {
- debug_sync_set_action(rli->sql_driver_thd,
- STRING_WITH_LEN("now SIGNAL wait_queue_ready"));
- };);
- }
- mysql_cond_wait(&cur_thread->COND_rpl_thread,
- &cur_thread->LOCK_rpl_thread);
- }
- }
- }
+ uint64 count= e->count_queued_event_groups;
+ group_commit_orderer *gco;
- if (!cur_thread)
- {
- /*
- Nothing else is currently running in this domain. We can
- spawn a new thread to do this event group in parallel with
- anything else that might be running in other domains.
- */
- cur_thread= e->rpl_thread= global_rpl_thread_pool.get_thread(e);
- /* get_thread() returns with the LOCK_rpl_thread locked. */
- }
- else
+ if (!(gco= cur_thread->get_gco(count, e->current_gco)))
{
- /*
- We are still executing the previous event group for this replication
- domain, and we have to wait for that to finish before we can start on
- the next one. So just re-use the thread.
- */
+ cur_thread->free_rgi(rgi);
+ cur_thread->free_qev(qev);
+ abandon_worker_thread(rli->sql_driver_thd, cur_thread,
+ &did_enter_cond, old_msg);
+ delete ev;
+ return true;
}
-
- rgi->wait_commit_sub_id= 0;
- rgi->wait_start_sub_id= 0;
- e->prev_groupcommit_sub_id= e->current_sub_id;
+ e->current_gco= rgi->gco= gco;
}
-
+ else
+ rgi->gco= e->current_gco;
if (gtid_ev->flags2 & Gtid_log_event::FL_GROUP_COMMIT_ID)
- {
- e->last_server_id= gtid_ev->server_id;
- e->last_seq_no= gtid_ev->seq_no;
e->last_commit_id= gtid_ev->commit_id;
- }
else
- {
- e->last_server_id= 0;
- e->last_seq_no= 0;
e->last_commit_id= 0;
- }
-
qev->rgi= e->current_group_info= rgi;
e->current_sub_id= rgi->gtid_sub_id;
- current= rgi->parallel_entry= e;
+ ++e->count_queued_event_groups;
}
- else if (!is_group_event || !current)
+ else if (!is_group_event || !e)
{
my_off_t log_pos;
int err;
@@ -1014,7 +1390,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
Same for events not preceded by GTID (we should not see those normally,
but they might be from an old master).
- The varuable `current' is NULL for the case where the master did not
+ The variable `e' is NULL for the case where the master did not
have GTID, like a MariaDB 5.5 or MySQL master.
*/
qev->rgi= serial_rgi;
@@ -1041,18 +1417,11 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
if (err)
{
- my_free(qev);
+ cur_thread->free_qev(qev);
+ abandon_worker_thread(rli->sql_driver_thd, cur_thread,
+ &did_enter_cond, old_msg);
return true;
}
- qev->ev= NULL;
- qev->future_event_master_log_pos= log_pos;
- if (!current)
- {
- rli->event_relay_log_pos= rli->future_event_relay_log_pos;
- handle_queued_pos_update(rli->sql_driver_thd, qev);
- my_free(qev);
- return false;
- }
/*
Queue an empty event, so that the position will be updated in a
reasonable way relative to other events:
@@ -1065,40 +1434,12 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
least the position will not be updated until one of them has reached
the current point.
*/
- cur_thread= current->rpl_thread;
- if (cur_thread)
- {
- mysql_mutex_lock(&cur_thread->LOCK_rpl_thread);
- if (cur_thread->current_entry != current)
- {
- /* Not ours anymore, we need to grab a new one. */
- mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
- cur_thread= NULL;
- }
- }
- if (!cur_thread)
- cur_thread= current->rpl_thread=
- global_rpl_thread_pool.get_thread(current);
+ qev->ev= NULL;
+ qev->future_event_master_log_pos= log_pos;
}
else
{
- cur_thread= current->rpl_thread;
- if (cur_thread)
- {
- mysql_mutex_lock(&cur_thread->LOCK_rpl_thread);
- if (cur_thread->current_entry != current)
- {
- /* Not ours anymore, we need to grab a new one. */
- mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
- cur_thread= NULL;
- }
- }
- if (!cur_thread)
- {
- cur_thread= current->rpl_thread=
- global_rpl_thread_pool.get_thread(current);
- }
- qev->rgi= current->current_group_info;
+ qev->rgi= e->current_group_info;
}
/*
@@ -1106,10 +1447,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
*/
rli->event_relay_log_pos= rli->future_event_relay_log_pos;
cur_thread->enqueue(qev);
- if (did_enter_cond)
- rli->sql_driver_thd->exit_cond(old_msg);
- else
- mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
+ unlock_or_exit_cond(rli->sql_driver_thd, &cur_thread->LOCK_rpl_thread,
+ &did_enter_cond, old_msg);
mysql_cond_signal(&cur_thread->COND_rpl_thread);
return false;
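To make the commit_id handling in the GTID branch above easier to follow, here is a hypothetical, much-reduced sketch: groups that shared a commit_id on the master keep the current orderer, while a different commit_id starts a new orderer whose wait_count is the number of groups queued before it (memory pooling, locking and the zero-commit_id edge case are ignored).

#include <cstdint>

struct Gco {                        // stands in for group_commit_orderer
  uint64_t wait_count;
  Gco *prev_gco;
};

struct Domain {                     // stands in for rpl_parallel_entry
  uint64_t last_commit_id= 0;
  uint64_t count_queued_event_groups= 0;
  Gco *current_gco= nullptr;
};

// Returns the orderer the newly queued event group must wait on.
Gco *orderer_for_group(Domain &d, bool has_commit_id, uint64_t commit_id)
{
  if (!(has_commit_id && d.current_gco && commit_id == d.last_commit_id)) {
    // New batch: everything queued so far must reach commit before we start.
    d.current_gco= new Gco{ d.count_queued_event_groups, d.current_gco };
  }
  d.last_commit_id= has_commit_id ? commit_id : 0;
  ++d.count_queued_event_groups;
  return d.current_gco;
}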
diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h
index 019a354c57d..90649230f98 100644
--- a/sql/rpl_parallel.h
+++ b/sql/rpl_parallel.h
@@ -9,16 +9,66 @@ struct rpl_parallel_entry;
struct rpl_parallel_thread_pool;
class Relay_log_info;
+
+
+/*
+ Structure used to keep track of the parallel replication of a batch of
+ event-groups that group-committed together on the master.
+
+ It is used to ensure that every event group in one batch has reached the
+ commit stage before the next batch starts executing.
+
+ Note the lifetime of this structure:
+
+ - It is allocated when the first event in a new batch of group commits
+ is queued, from the free list rpl_parallel_thread::gco_free_list.
+
+ - The gco for the batch currently being queued is owned by
+ rpl_parallel_entry::current_gco. The gco for a previous batch that has
+ been fully queued is owned by the gco->prev_gco pointer of the gco for
+ the following batch.
+
+ - The worker thread waits on gco->COND_group_commit_orderer for
+ rpl_parallel_entry::count_committing_event_groups to reach wait_count
+ before starting; the first waiter links the gco into the next_gco
+ pointer of the gco of the previous batch for signalling.
+
+ - When an event group reaches the commit stage, it signals the
+ COND_group_commit_orderer if its gco->next_gco pointer is non-NULL and
+ rpl_parallel_entry::count_committing_event_groups has reached
+ gco->next_gco->wait_count.
+
+ - When gco->wait_count is reached for a worker and the wait completes,
+ the worker frees gco->prev_gco; at this point it is guaranteed not to
+ be needed any longer.
+*/
+struct group_commit_orderer {
+ /* Wakeup condition, used with rpl_parallel_entry::LOCK_parallel_entry. */
+ mysql_cond_t COND_group_commit_orderer;
+ uint64 wait_count;
+ group_commit_orderer *prev_gco;
+ group_commit_orderer *next_gco;
+ bool installed;
+};
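As a companion to the lifetime notes above, a hypothetical standalone sketch of the worker-side wait (std primitives, no error or abort handling): the worker links itself into the previous orderer for signalling, blocks until enough prior groups have entered commit, and then discards the previous orderer.

#include <condition_variable>
#include <cstdint>
#include <mutex>

struct Gco {
  std::condition_variable cond;      // COND_group_commit_orderer
  uint64_t wait_count= 0;
  Gco *prev_gco= nullptr;
  Gco *next_gco= nullptr;
  bool installed= false;
};

struct Domain {
  std::mutex lock;                   // LOCK_parallel_entry
  uint64_t count_committing_event_groups= 0;
};

void wait_for_my_turn(Domain &d, Gco *gco)
{
  std::unique_lock<std::mutex> g(d.lock);
  if (!gco->installed && gco->prev_gco) {
    gco->prev_gco->next_gco= gco;    // first waiter hooks us up for signalling
    gco->installed= true;
  }
  gco->cond.wait(g, [&]{
    return d.count_committing_event_groups >= gco->wait_count;
  });
  delete gco->prev_gco;              // guaranteed unused once our wait is over
  gco->prev_gco= nullptr;
}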
+
+
struct rpl_parallel_thread {
bool delay_start;
bool running;
bool stop;
mysql_mutex_t LOCK_rpl_thread;
mysql_cond_t COND_rpl_thread;
+ mysql_cond_t COND_rpl_thread_queue;
struct rpl_parallel_thread *next; /* For free list. */
struct rpl_parallel_thread_pool *pool;
THD *thd;
- struct rpl_parallel_entry *current_entry;
+ /*
+ Who owns the thread, if any (it's a pointer into the
+ rpl_parallel_entry::rpl_threads array.
+ */
+ struct rpl_parallel_thread **current_owner;
+ /* The rpl_parallel_entry of the owner. */
+ rpl_parallel_entry *current_entry;
struct queued_event {
queued_event *next;
Log_event *ev;
@@ -31,6 +81,9 @@ struct rpl_parallel_thread {
size_t event_size;
} *event_queue, *last_in_queue;
uint64 queued_size;
+ queued_event *qev_free_list;
+ rpl_group_info *rgi_free_list;
+ group_commit_orderer *gco_free_list;
void enqueue(queued_event *qev)
{
@@ -42,15 +95,25 @@ struct rpl_parallel_thread {
queued_size+= qev->event_size;
}
- void dequeue(queued_event *list)
+ void dequeue1(queued_event *list)
{
- queued_event *tmp;
-
DBUG_ASSERT(list == event_queue);
event_queue= last_in_queue= NULL;
- for (tmp= list; tmp; tmp= tmp->next)
- queued_size-= tmp->event_size;
}
+
+ void dequeue2(size_t dequeue_size)
+ {
+ queued_size-= dequeue_size;
+ }
+
+ queued_event *get_qev(Log_event *ev, ulonglong event_size,
+ Relay_log_info *rli);
+ void free_qev(queued_event *qev);
+ rpl_group_info *get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
+ rpl_parallel_entry *e);
+ void free_rgi(rpl_group_info *rgi);
+ group_commit_orderer *get_gco(uint64 wait_count, group_commit_orderer *prev);
+ void free_gco(group_commit_orderer *gco);
};
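The dequeue1()/dequeue2() split above exists so that queued_size is only reduced after events have actually been processed, not at the moment the whole queue is detached; a hypothetical standalone sketch of that pattern (event cleanup and the real processing loop omitted):

#include <cstddef>
#include <mutex>

struct Ev { Ev *next; std::size_t size; };

struct Worker {
  std::mutex lock;                 // LOCK_rpl_thread
  Ev *queue= nullptr;
  std::size_t queued_size= 0;      // what the driver checks before queueing more

  Ev *detach() {                   // like dequeue1(): called under the lock
    Ev *list= queue;
    queue= nullptr;
    return list;
  }
  void account(std::size_t done) { // like dequeue2(): under the lock, afterwards
    queued_size-= done;
  }
};

void process_batch(Worker &w, void (*process)(Ev *))
{
  Ev *list;
  { std::lock_guard<std::mutex> g(w.lock); list= w.detach(); }
  std::size_t done= 0;
  for (Ev *e= list; e; e= e->next) { process(e); done+= e->size; }
  std::lock_guard<std::mutex> g(w.lock);
  w.account(done);                 // only now may the driver refill this much
}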
@@ -66,14 +129,16 @@ struct rpl_parallel_thread_pool {
rpl_parallel_thread_pool();
int init(uint32 size);
void destroy();
- struct rpl_parallel_thread *get_thread(rpl_parallel_entry *entry);
+ struct rpl_parallel_thread *get_thread(rpl_parallel_thread **owner,
+ rpl_parallel_entry *entry);
+ void release_thread(rpl_parallel_thread *rpt);
};
struct rpl_parallel_entry {
+ mysql_mutex_t LOCK_parallel_entry;
+ mysql_cond_t COND_parallel_entry;
uint32 domain_id;
- uint32 last_server_id;
- uint64 last_seq_no;
uint64 last_commit_id;
bool active;
/*
@@ -82,15 +147,41 @@ struct rpl_parallel_entry {
waiting for event groups to complete.
*/
bool force_abort;
+ /*
+ At STOP SLAVE (force_abort=true), we do not want to process all events in
+ the queue (which could unnecessarily delay the stop, if a lot of events happen
+ to be queued). The stop_count provides a safe point at which to stop, so
+ that everything before becomes committed and nothing after does. The value
+ corresponds to group_commit_orderer::wait_count; if wait_count is less than
+ or equal to stop_count, we execute the associated event group, else we
+ skip it (and all following) and stop.
+ */
+ uint64 stop_count;
- rpl_parallel_thread *rpl_thread;
+ /*
+ Cyclic array recording the last rpl_thread_max worker threads that we
+ queued event for. This is used to limit how many workers a single domain
+ can occupy (--slave-domain-parallel-threads).
+
+ Note that workers are never explicitly deleted from the array. Instead,
+ we need to check (under LOCK_rpl_thread) that the thread still belongs
+ to us before re-using it (rpl_parallel_thread::current_owner).
+ */
+ rpl_parallel_thread **rpl_threads;
+ uint32 rpl_thread_max;
+ uint32 rpl_thread_idx;
/*
The sub_id of the last transaction to commit within this domain_id.
Must be accessed under LOCK_parallel_entry protection.
+
+ Event groups commit in order, so the rpl_group_info for an event group
+ will be alive (at least) as long as
+ rpl_group_info::gtid_sub_id > last_committed_sub_id. This can be used to
+ safely refer back to previous event groups if they are still executing,
+ and ignore them if they have completed, without requiring explicit
+ synchronisation between the threads.
*/
uint64 last_committed_sub_id;
- mysql_mutex_t LOCK_parallel_entry;
- mysql_cond_t COND_parallel_entry;
/*
The sub_id of the last event group in this replication domain that was
queued for execution by a worker thread.
@@ -98,14 +189,29 @@ struct rpl_parallel_entry {
uint64 current_sub_id;
rpl_group_info *current_group_info;
/*
- The sub_id of the last event group in the previous batch of group-committed
- transactions.
-
- When we spawn parallel worker threads for the next group-committed batch,
- they first need to wait for this sub_id to be committed before it is safe
- to start executing them.
+ If we get an error in some event group, we set the sub_id of that event
+ group here. Then later event groups (with higher sub_id) can know not to
+ try to start (event groups that already started will be rolled back when
+ wait_for_prior_commit() returns an error).
+ The value is ULONGLONG_MAX when no error has occurred.
+ */
+ uint64 stop_on_error_sub_id;
+ /* Total count of event groups queued so far. */
+ uint64 count_queued_event_groups;
+ /*
+ Count of event groups that have started (but not necessarily completed)
+ the commit phase. We use this to know when every event group in a previous
+ batch of master group commits has started committing on the slave, so
+ that it is safe to start executing the events in the following batch.
*/
- uint64 prev_groupcommit_sub_id;
+ uint64 count_committing_event_groups;
+ /* The group_commit_orderer object for the events currently being queued. */
+ group_commit_orderer *current_gco;
+
+ rpl_parallel_thread * choose_thread(Relay_log_info *rli, bool *did_enter_cond,
+ const char **old_msg, bool reuse);
+ group_commit_orderer *get_gco();
+ void free_gco(group_commit_orderer *gco);
};
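The stop_count comment above boils down to one small predicate, sketched here with hypothetical names (the real check happens under LOCK_parallel_entry in the worker):

#include <cstdint>

struct Gco    { uint64_t wait_count; };
struct Domain { bool force_abort; uint64_t stop_count; };

// Once force_abort is set, only event groups at or before the recorded stop
// point may run; everything after is skipped so the slave stops consistently.
bool should_execute(const Domain &d, const Gco &gco)
{
  return !d.force_abort || gco.wait_count <= d.stop_count;
}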
struct rpl_parallel {
HASH domain_hash;
@@ -116,7 +222,7 @@ struct rpl_parallel {
~rpl_parallel();
void reset();
rpl_parallel_entry *find(uint32 domain_id);
- void wait_for_done();
+ void wait_for_done(THD *thd);
bool workers_idle();
bool do_event(rpl_group_info *serial_rgi, Log_event *ev,
ulonglong event_size);
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index 797f5681ec5..ffe5e516069 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -1479,14 +1479,27 @@ end:
}
-rpl_group_info::rpl_group_info(Relay_log_info *rli_)
- : rli(rli_), thd(0), gtid_sub_id(0), wait_commit_sub_id(0),
- wait_commit_group_info(0), wait_start_sub_id(0), parallel_entry(0),
- deferred_events(NULL), m_annotate_event(0), tables_to_lock(0),
- tables_to_lock_count(0), trans_retries(0), last_event_start_time(0),
- is_parallel_exec(false), is_error(false),
- row_stmt_start_timestamp(0), long_find_row_note_printed(false)
+void
+rpl_group_info::reinit(Relay_log_info *rli)
+{
+ this->rli= rli;
+ tables_to_lock= NULL;
+ tables_to_lock_count= 0;
+ trans_retries= 0;
+ last_event_start_time= 0;
+ is_error= false;
+ row_stmt_start_timestamp= 0;
+ long_find_row_note_printed= false;
+ did_mark_start_commit= false;
+ commit_orderer.reinit();
+}
+
+rpl_group_info::rpl_group_info(Relay_log_info *rli)
+ : thd(0), gtid_sub_id(0), wait_commit_sub_id(0),
+ wait_commit_group_info(0), parallel_entry(0),
+ deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false)
{
+ reinit(rli);
bzero(&current_gtid, sizeof(current_gtid));
mysql_mutex_init(key_rpl_group_info_sleep_lock, &sleep_lock,
MY_MUTEX_INIT_FAST);
@@ -1706,4 +1719,40 @@ void rpl_group_info::slave_close_thread_tables(THD *thd)
}
+
+static void
+mark_start_commit_inner(rpl_parallel_entry *e, group_commit_orderer *gco)
+{
+ uint64 count= ++e->count_committing_event_groups;
+ if (gco->next_gco && gco->next_gco->wait_count == count)
+ mysql_cond_broadcast(&gco->next_gco->COND_group_commit_orderer);
+}
+
+
+void
+rpl_group_info::mark_start_commit_no_lock()
+{
+ if (did_mark_start_commit)
+ return;
+ mark_start_commit_inner(parallel_entry, gco);
+ did_mark_start_commit= true;
+}
+
+
+void
+rpl_group_info::mark_start_commit()
+{
+ rpl_parallel_entry *e;
+
+ if (did_mark_start_commit)
+ return;
+
+ e= this->parallel_entry;
+ mysql_mutex_lock(&e->LOCK_parallel_entry);
+ mark_start_commit_inner(e, gco);
+ mysql_mutex_unlock(&e->LOCK_parallel_entry);
+ did_mark_start_commit= true;
+}
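For symmetry with mark_start_commit_inner() above, a hypothetical sketch of the signalling side using std primitives; the did_mark flag plays the role of rpl_group_info::did_mark_start_commit.

#include <condition_variable>
#include <cstdint>
#include <mutex>

struct Gco {
  std::condition_variable cond;    // COND_group_commit_orderer
  uint64_t wait_count= 0;
  Gco *next_gco= nullptr;
};

struct Domain {
  std::mutex lock;                 // LOCK_parallel_entry
  uint64_t count_committing_event_groups= 0;
};

void mark_start_commit(Domain &d, Gco *gco, bool &did_mark)
{
  if (did_mark)                    // count each event group at most once
    return;
  std::lock_guard<std::mutex> g(d.lock);
  uint64_t count= ++d.count_committing_event_groups;
  if (gco->next_gco && gco->next_gco->wait_count == count)
    gco->next_gco->cond.notify_all();  // the next batch may start now
  did_mark= true;
}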
+
+
#endif
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index 3f95849a926..0ba259b0efd 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -481,6 +481,7 @@ private:
struct rpl_group_info
{
+ rpl_group_info *next; /* For free list in rpl_parallel_thread */
Relay_log_info *rli;
THD *thd;
/*
@@ -510,14 +511,15 @@ struct rpl_group_info
uint64 wait_commit_sub_id;
rpl_group_info *wait_commit_group_info;
/*
- If non-zero, the event group must wait for this sub_id to be committed
- before the execution of the event group is allowed to start.
+ This holds a pointer to a struct that keeps track of the need to wait
+ for the previous batch of event groups to reach the commit stage, before
+ this batch can start to execute.
(When we execute in parallel the transactions that group committed
together on the master, we still need to wait for any prior transactions
- to have commtted).
+ to have reached the commit stage).
*/
- uint64 wait_start_sub_id;
+ group_commit_orderer *gco;
struct rpl_parallel_entry *parallel_entry;
@@ -567,18 +569,22 @@ struct rpl_group_info
char future_event_master_log_name[FN_REFLEN];
bool is_parallel_exec;
bool is_error;
+ /*
+ Set to true when we have signalled that we reached the commit phase. Used to avoid
+ counting one event group twice.
+ */
+ bool did_mark_start_commit;
-private:
/*
Runtime state for printing a note when slave is taking
too long while processing a row event.
*/
time_t row_stmt_start_timestamp;
bool long_find_row_note_printed;
-public:
rpl_group_info(Relay_log_info *rli_);
~rpl_group_info();
+ void reinit(Relay_log_info *rli);
/*
Returns true if the argument event resides in the container;
@@ -661,6 +667,8 @@ public:
void clear_tables_to_lock();
void cleanup_context(THD *, bool);
void slave_close_thread_tables(THD *);
+ void mark_start_commit_no_lock();
+ void mark_start_commit();
time_t get_row_stmt_start_timestamp()
{
diff --git a/sql/slave.cc b/sql/slave.cc
index 07209c30ff3..b081a3369f5 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -330,7 +330,7 @@ run_slave_init_thread()
pthread_t th;
slave_init_thread_running= true;
- if (mysql_thread_create(key_thread_slave_init, &th, NULL,
+ if (mysql_thread_create(key_thread_slave_init, &th, &connection_attrib,
handle_slave_init, NULL))
{
sql_print_error("Failed to create thread while initialising slave");
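The one-line change above passes a prepared attribute object instead of NULL; the usual point of such a change is to create the thread detached, so its stack is reclaimed on exit without a join. A plain-pthreads sketch of that idea (generic attribute handling, not the server's wrapper):

#include <cstdio>
#include <pthread.h>

static void *handle_init(void *) { return nullptr; }  // stand-in thread body

int start_detached_init_thread()
{
  pthread_attr_t attr;
  pthread_t th;
  pthread_attr_init(&attr);
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
  int err= pthread_create(&th, &attr, handle_init, nullptr);
  pthread_attr_destroy(&attr);
  if (err)
    std::fprintf(stderr, "Failed to create thread while initialising slave\n");
  return err;
}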
@@ -4526,7 +4526,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
}
if (opt_slave_parallel_threads > 0)
- rli->parallel.wait_for_done();
+ rli->parallel.wait_for_done(thd);
/* Thread stopped. Print the current replication position to the log */
{
@@ -4552,7 +4552,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
get the correct position printed.)
*/
if (opt_slave_parallel_threads > 0)
- rli->parallel.wait_for_done();
+ rli->parallel.wait_for_done(thd);
/*
Some events set some playgrounds, which won't be cleared because thread
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index 09ec7361fe3..b9e1bf91888 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -5667,14 +5667,23 @@ bool THD::rgi_have_temporary_tables()
}
+void
+wait_for_commit::reinit()
+{
+ subsequent_commits_list= NULL;
+ next_subsequent_commit= NULL;
+ waitee= NULL;
+ opaque_pointer= NULL;
+ wakeup_error= 0;
+ wakeup_subsequent_commits_running= false;
+}
+
+
wait_for_commit::wait_for_commit()
- : subsequent_commits_list(0), next_subsequent_commit(0), waitee(0),
- opaque_pointer(0),
- waiting_for_commit(false), wakeup_error(0),
- wakeup_subsequent_commits_running(false)
{
mysql_mutex_init(key_LOCK_wait_commit, &LOCK_wait_commit, MY_MUTEX_INIT_FAST);
mysql_cond_init(key_COND_wait_commit, &COND_wait_commit, 0);
+ reinit();
}
@@ -5722,7 +5731,7 @@ wait_for_commit::wakeup(int wakeup_error)
*/
mysql_mutex_lock(&LOCK_wait_commit);
- waiting_for_commit= false;
+ waitee= NULL;
this->wakeup_error= wakeup_error;
/*
Note that it is critical that the mysql_cond_signal() here is done while
@@ -5754,9 +5763,8 @@ wait_for_commit::wakeup(int wakeup_error)
void
wait_for_commit::register_wait_for_prior_commit(wait_for_commit *waitee)
{
- waiting_for_commit= true;
- wakeup_error= 0;
DBUG_ASSERT(!this->waitee /* No prior registration allowed */);
+ wakeup_error= 0;
this->waitee= waitee;
mysql_mutex_lock(&waitee->LOCK_wait_commit);
@@ -5766,7 +5774,7 @@ wait_for_commit::register_wait_for_prior_commit(wait_for_commit *waitee)
see comments on wakeup_subsequent_commits2() for details.
*/
if (waitee->wakeup_subsequent_commits_running)
- waiting_for_commit= false;
+ this->waitee= NULL;
else
{
/*
@@ -5795,9 +5803,9 @@ wait_for_commit::wait_for_prior_commit2(THD *thd)
DEBUG_SYNC(thd, "wait_for_prior_commit_waiting");
old_msg= thd->enter_cond(&COND_wait_commit, &LOCK_wait_commit,
"Waiting for prior transaction to commit");
- while (waiting_for_commit && !thd->check_killed())
+ while ((loc_waitee= this->waitee) && !thd->check_killed())
mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit);
- if (!waiting_for_commit)
+ if (!loc_waitee)
{
if (wakeup_error)
my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
@@ -5810,7 +5818,6 @@ wait_for_commit::wait_for_prior_commit2(THD *thd)
waiter as to whether we succeed or fail (eg. we may roll back but waitee
might attempt to commit both us and any subsequent commits waiting for us).
*/
- loc_waitee= this->waitee;
mysql_mutex_lock(&loc_waitee->LOCK_wait_commit);
if (loc_waitee->wakeup_subsequent_commits_running)
{
@@ -5819,21 +5826,29 @@ wait_for_commit::wait_for_prior_commit2(THD *thd)
do
{
mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit);
- } while (waiting_for_commit);
+ } while (this->waitee);
+ if (wakeup_error)
+ my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
goto end;
}
remove_from_list(&loc_waitee->subsequent_commits_list);
mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
+ this->waitee= NULL;
- DEBUG_SYNC(thd, "wait_for_prior_commit_killed");
wakeup_error= thd->killed_errno();
if (!wakeup_error)
wakeup_error= ER_QUERY_INTERRUPTED;
my_message(wakeup_error, ER(wakeup_error), MYF(0));
+ thd->exit_cond(old_msg);
+ /*
+ Must do the DEBUG_SYNC() _after_ exit_cond(), as DEBUG_SYNC is not safe to
+ use within enter_cond/exit_cond.
+ */
+ DEBUG_SYNC(thd, "wait_for_prior_commit_killed");
+ return wakeup_error;
end:
thd->exit_cond(old_msg);
- waitee= NULL;
return wakeup_error;
}
@@ -5916,10 +5931,11 @@ wait_for_commit::wakeup_subsequent_commits2(int wakeup_error)
void
wait_for_commit::unregister_wait_for_prior_commit2()
{
+ wait_for_commit *loc_waitee;
+
mysql_mutex_lock(&LOCK_wait_commit);
- if (waiting_for_commit)
+ if ((loc_waitee= this->waitee))
{
- wait_for_commit *loc_waitee= this->waitee;
mysql_mutex_lock(&loc_waitee->LOCK_wait_commit);
if (loc_waitee->wakeup_subsequent_commits_running)
{
@@ -5931,7 +5947,7 @@ wait_for_commit::unregister_wait_for_prior_commit2()
See comments on wakeup_subsequent_commits2() for more details.
*/
mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
- while (waiting_for_commit)
+ while (this->waitee)
mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit);
}
else
@@ -5939,10 +5955,10 @@ wait_for_commit::unregister_wait_for_prior_commit2()
/* Remove ourselves from the list in the waitee. */
remove_from_list(&loc_waitee->subsequent_commits_list);
mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
+ this->waitee= NULL;
}
}
mysql_mutex_unlock(&LOCK_wait_commit);
- this->waitee= NULL;
}
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 726b920edb1..57dd92db49d 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -1598,8 +1598,8 @@ struct wait_for_commit
{
/*
The LOCK_wait_commit protects the fields subsequent_commits_list and
- wakeup_subsequent_commits_running (for a waitee), and the flag
- waiting_for_commit and associated COND_wait_commit (for a waiter).
+ wakeup_subsequent_commits_running (for a waitee), and the pointer
+ waitee and associated COND_wait_commit (for a waiter).
*/
mysql_mutex_t LOCK_wait_commit;
mysql_cond_t COND_wait_commit;
@@ -1607,7 +1607,13 @@ struct wait_for_commit
wait_for_commit *subsequent_commits_list;
/* Link field for entries in subsequent_commits_list. */
wait_for_commit *next_subsequent_commit;
- /* Our waitee, if we did register_wait_for_prior_commit(), else NULL. */
+ /*
+ Our waitee, if we did register_wait_for_prior_commit(), and were not
+ yet woken up. Else NULL.
+
+ When this is cleared for wakeup, the COND_wait_commit condition is
+ signalled.
+ */
wait_for_commit *waitee;
/*
Generic pointer for use by the transaction coordinator to optimise the
@@ -1618,12 +1624,6 @@ struct wait_for_commit
used by another transaction coordinator for similar purposes.
*/
void *opaque_pointer;
- /*
- The waiting_for_commit flag is cleared when a waiter has been woken
- up. The COND_wait_commit condition is signalled when this has been
- cleared.
- */
- bool waiting_for_commit;
/* The wakeup error code from the waitee. 0 means no error. */
int wakeup_error;
/*
@@ -1639,10 +1639,14 @@ struct wait_for_commit
Quick inline check, to avoid function call and locking in the common case
where no wakeup is registered, or a registered wait was already signalled.
*/
- if (waiting_for_commit)
+ if (waitee)
return wait_for_prior_commit2(thd);
else
+ {
+ if (wakeup_error)
+ my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
return wakeup_error;
+ }
}
void wakeup_subsequent_commits(int wakeup_error)
{
@@ -1663,7 +1667,7 @@ struct wait_for_commit
}
void unregister_wait_for_prior_commit()
{
- if (waiting_for_commit)
+ if (waitee)
unregister_wait_for_prior_commit2();
}
/*
@@ -1683,7 +1687,7 @@ struct wait_for_commit
}
next_ptr_ptr= &cur->next_subsequent_commit;
}
- waiting_for_commit= false;
+ waitee= NULL;
}
void wakeup(int wakeup_error);
@@ -1694,6 +1698,7 @@ struct wait_for_commit
wait_for_commit();
~wait_for_commit();
+ void reinit();
};
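A heavily simplified, hypothetical sketch of the waitee-pointer protocol that replaces the old waiting_for_commit flag: the pointer doubles as the still-waiting flag, clearing it under the waiter's mutex is the wakeup, and the fast path just checks the pointer. Registration on the waitee's subsequent-commits list, kill handling and unregistration are all omitted.

#include <condition_variable>
#include <mutex>

struct Waiter {
  std::mutex lock;                 // LOCK_wait_commit
  std::condition_variable cond;    // COND_wait_commit
  Waiter *waitee= nullptr;
  int wakeup_error= 0;

  void register_wait(Waiter *prior) { waitee= prior; }

  void wakeup(int err) {           // called by the waitee when it has committed
    std::lock_guard<std::mutex> g(lock);
    wakeup_error= err;
    waitee= nullptr;               // cleared under the mutex, then signalled
    cond.notify_one();
  }

  int wait_for_prior_commit() {
    if (!waitee)                   // fast path: never registered or already woken
      return wakeup_error;
    std::unique_lock<std::mutex> g(lock);
    cond.wait(g, [this]{ return waitee == nullptr; });
    return wakeup_error;
  }
};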
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
index 69d93968f9f..35e1cd457ee 100644
--- a/sql/sys_vars.cc
+++ b/sql/sys_vars.cc
@@ -1609,6 +1609,49 @@ static Sys_var_ulong Sys_slave_parallel_threads(
ON_UPDATE(fix_slave_parallel_threads));
+static bool
+check_slave_domain_parallel_threads(sys_var *self, THD *thd, set_var *var)
+{
+ bool running;
+
+ mysql_mutex_lock(&LOCK_active_mi);
+ running= master_info_index->give_error_if_slave_running();
+ mysql_mutex_unlock(&LOCK_active_mi);
+ if (running)
+ return true;
+
+ return false;
+}
+
+static bool
+fix_slave_domain_parallel_threads(sys_var *self, THD *thd, enum_var_type type)
+{
+ bool running;
+
+ mysql_mutex_unlock(&LOCK_global_system_variables);
+ mysql_mutex_lock(&LOCK_active_mi);
+ running= master_info_index->give_error_if_slave_running();
+ mysql_mutex_unlock(&LOCK_active_mi);
+ mysql_mutex_lock(&LOCK_global_system_variables);
+
+ return running ? true : false;
+}
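The temporary unlock/relock above looks like a lock-ordering dance, presumably because LOCK_active_mi must not be acquired while LOCK_global_system_variables is held. A generic sketch of that shape with stand-in locks and a stubbed check (hypothetical names throughout):

#include <mutex>

std::mutex global_sysvars_lock;    // stands in for LOCK_global_system_variables
std::mutex active_mi_lock;         // stands in for LOCK_active_mi

static bool slave_running() { return false; }  // stub for the real check

// Called with global_sysvars_lock held; must return with it held again.
bool refuse_if_slave_running()
{
  global_sysvars_lock.unlock();    // release before taking the other lock
  active_mi_lock.lock();
  bool running= slave_running();
  active_mi_lock.unlock();
  global_sysvars_lock.lock();      // restore the caller's expectation
  return running;                  // true => reject the variable change
}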
+
+
+static Sys_var_ulong Sys_slave_domain_parallel_threads(
+ "slave_domain_parallel_threads",
+ "Maximum number of parallel threads to use on slave for events in a "
+ "single replication domain. When using multiple domains, this can be "
+ "used to limit a single domain from grabbing all threads and thus "
+ "stalling other domains. The default of 0 means to allow a domain to "
+ "grab as many threads as it wants, up to the value of "
+ "slave_parallel_threads.",
+ GLOBAL_VAR(opt_slave_domain_parallel_threads), CMD_LINE(REQUIRED_ARG),
+ VALID_RANGE(0,16383), DEFAULT(0), BLOCK_SIZE(1), NO_MUTEX_GUARD,
+ NOT_IN_BINLOG, ON_CHECK(check_slave_domain_parallel_threads),
+ ON_UPDATE(fix_slave_domain_parallel_threads));
+
+
static Sys_var_ulong Sys_slave_parallel_max_queued(
"slave_parallel_max_queued",
"Limit on how much memory SQL threads should use per parallel "