diff options
-rw-r--r-- | mysql-test/r/mysqld--help.result | 9 | ||||
-rw-r--r-- | mysql-test/suite/perfschema/r/dml_setup_instruments.result | 4 | ||||
-rw-r--r-- | mysql-test/suite/rpl/r/rpl_parallel.result | 88 | ||||
-rw-r--r-- | mysql-test/suite/rpl/t/rpl_parallel.test | 108 | ||||
-rw-r--r-- | mysql-test/suite/sys_vars/r/slave_domain_parallel_threads_basic.result | 13 | ||||
-rw-r--r-- | mysql-test/suite/sys_vars/t/slave_domain_parallel_threads_basic.test | 14 | ||||
-rw-r--r-- | mysys/mf_iocache.c | 4 | ||||
-rw-r--r-- | sql/log.cc | 29 | ||||
-rw-r--r-- | sql/mysqld.cc | 9 | ||||
-rw-r--r-- | sql/mysqld.h | 6 | ||||
-rw-r--r-- | sql/rpl_parallel.cc | 951 | ||||
-rw-r--r-- | sql/rpl_parallel.h | 146 | ||||
-rw-r--r-- | sql/rpl_rli.cc | 63 | ||||
-rw-r--r-- | sql/rpl_rli.h | 20 | ||||
-rw-r--r-- | sql/slave.cc | 6 | ||||
-rw-r--r-- | sql/sql_class.cc | 52 | ||||
-rw-r--r-- | sql/sql_class.h | 29 | ||||
-rw-r--r-- | sql/sys_vars.cc | 43 |
18 files changed, 1144 insertions, 450 deletions
diff --git a/mysql-test/r/mysqld--help.result b/mysql-test/r/mysqld--help.result index 953e9d926dd..9b124952095 100644 --- a/mysql-test/r/mysqld--help.result +++ b/mysql-test/r/mysqld--help.result @@ -782,6 +782,14 @@ The following options may be given as the first argument: is already the default. --slave-compressed-protocol Use compression on master/slave protocol + --slave-domain-parallel-threads=# + Maximum number of parallel threads to use on slave for + events in a single replication domain. When using + multiple domains, this can be used to limit a single + domain from grabbing all threads and thus stalling other + domains. The default of 0 means to allow a domain to grab + as many threads as it wants, up to the value of + slave_parallel_threads. --slave-exec-mode=name Modes for how replication events should be executed. Legal values are STRICT (default) and IDEMPOTENT. In @@ -1155,6 +1163,7 @@ skip-networking FALSE skip-show-database FALSE skip-slave-start FALSE slave-compressed-protocol FALSE +slave-domain-parallel-threads 0 slave-exec-mode STRICT slave-max-allowed-packet 1073741824 slave-net-timeout 3600 diff --git a/mysql-test/suite/perfschema/r/dml_setup_instruments.result b/mysql-test/suite/perfschema/r/dml_setup_instruments.result index 17087264e7c..b47283d9193 100644 --- a/mysql-test/suite/perfschema/r/dml_setup_instruments.result +++ b/mysql-test/suite/perfschema/r/dml_setup_instruments.result @@ -37,6 +37,7 @@ where name like 'Wait/Synch/Cond/sql/%' order by name limit 10; NAME ENABLED TIMED wait/synch/cond/sql/COND_flush_thread_cache YES YES +wait/synch/cond/sql/COND_group_commit_orderer YES YES wait/synch/cond/sql/COND_manager YES YES wait/synch/cond/sql/COND_parallel_entry YES YES wait/synch/cond/sql/COND_prepare_ordered YES YES @@ -44,8 +45,7 @@ wait/synch/cond/sql/COND_queue_state YES YES wait/synch/cond/sql/COND_rpl_status YES YES wait/synch/cond/sql/COND_rpl_thread YES YES wait/synch/cond/sql/COND_rpl_thread_pool YES YES -wait/synch/cond/sql/COND_server_started YES YES -wait/synch/cond/sql/COND_thread_cache YES YES +wait/synch/cond/sql/COND_rpl_thread_queue YES YES select * from performance_schema.setup_instruments where name='Wait'; select * from performance_schema.setup_instruments diff --git a/mysql-test/suite/rpl/r/rpl_parallel.result b/mysql-test/suite/rpl/r/rpl_parallel.result index 2ff6bd7cbe1..2531bf9457c 100644 --- a/mysql-test/suite/rpl/r/rpl_parallel.result +++ b/mysql-test/suite/rpl/r/rpl_parallel.result @@ -115,6 +115,7 @@ SET GLOBAL slave_parallel_threads=10; SET debug_sync='RESET'; include/start_slave.inc *** Test that group-committed transactions on the master can replicate in parallel on the slave. *** +SET debug_sync='RESET'; FLUSH LOGS; CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB; INSERT INTO t3 VALUES (1,1), (3,3), (5,5), (7,7); @@ -141,6 +142,7 @@ INSERT INTO t3 VALUES (6, foo(16, '')); SET debug_sync='now WAIT_FOR master_queued3'; SET debug_sync='now SIGNAL master_cont1'; +SET debug_sync='RESET'; SELECT * FROM t3 ORDER BY a; a b 1 1 @@ -213,6 +215,9 @@ slave-bin.000003 # Query # # use `test`; INSERT INTO t3 VALUES (6, foo(16, slave-bin.000003 # Xid # # COMMIT /* XID */ *** Test STOP SLAVE in parallel mode *** include/stop_slave.inc +SET debug_sync='RESET'; +SET GLOBAL slave_parallel_threads=0; +SET GLOBAL slave_parallel_threads=10; SET binlog_direct_non_transactional_updates=0; SET sql_log_bin=0; CALL mtr.add_suppression("Statement is unsafe because it accesses a non-transactional table after accessing a transactional table within the same transaction"); @@ -227,10 +232,15 @@ INSERT INTO t3 VALUES(21, 21); INSERT INTO t3 VALUES(22, 22); SET binlog_format=@old_format; BEGIN; -INSERT INTO t2 VALUES (21); +INSERT INTO t2 VALUES (21); START SLAVE; +SET @old_dbug= @@GLOBAL.debug_dbug; +SET GLOBAL debug_dbug="+d,rpl_parallel_wait_for_done_trigger"; STOP SLAVE; +SET debug_sync='now WAIT_FOR wait_for_done_waiting'; ROLLBACK; +SET GLOBAL debug_dbug=@old_dbug; +SET debug_sync='RESET'; include/wait_for_slave_to_stop.inc SELECT * FROM t1 WHERE a >= 20 ORDER BY a; a @@ -292,6 +302,7 @@ a b 32 32 33 33 34 34 +SET debug_sync='RESET'; SET sql_log_bin=0; CALL mtr.add_suppression("Query execution was interrupted"); CALL mtr.add_suppression("Commit failed due to failure of an earlier commit on which this one depends"); @@ -308,6 +319,7 @@ STOP SLAVE IO_THREAD; SELECT * FROM t3 WHERE a >= 30 ORDER BY a; a b 31 31 +SET debug_sync='RESET'; SET GLOBAL slave_parallel_threads=0; SET GLOBAL slave_parallel_threads=10; SET sql_log_bin=0; @@ -379,6 +391,7 @@ a b 42 42 43 43 44 44 +SET debug_sync='RESET'; SET debug_sync='now WAIT_FOR t2_query'; SET debug_sync='now SIGNAL t2_cont'; SET debug_sync='now WAIT_FOR t1_ready'; @@ -386,9 +399,7 @@ KILL THD_ID; SET debug_sync='now WAIT_FOR t2_killed'; SET debug_sync='now SIGNAL t1_cont'; include/wait_for_slave_sql_error.inc [errno=1317,1963] -SELECT * FROM t3 WHERE a >= 40 ORDER BY a; -a b -41 41 +SET debug_sync='RESET'; SET GLOBAL slave_parallel_threads=0; SET GLOBAL slave_parallel_threads=10; SET sql_log_bin=0; @@ -463,6 +474,7 @@ a b 52 52 53 53 54 54 +SET debug_sync='RESET'; SET debug_sync='now WAIT_FOR t2_query'; SET debug_sync='now SIGNAL t2_cont'; SET debug_sync='now WAIT_FOR t1_ready'; @@ -473,6 +485,7 @@ include/wait_for_slave_sql_error.inc [errno=1317,1963] SELECT * FROM t3 WHERE a >= 50 ORDER BY a; a b 51 51 +SET debug_sync='RESET'; SET GLOBAL slave_parallel_threads=0; SET GLOBAL slave_parallel_threads=10; SET sql_log_bin=0; @@ -514,14 +527,18 @@ include/start_slave.inc include/stop_slave.inc SET GLOBAL binlog_format=@old_format; SET GLOBAL slave_parallel_threads=0; -SET GLOBAL slave_parallel_threads=3; +SET GLOBAL slave_parallel_threads=4; include/start_slave.inc *** 4. Test killing thread that is waiting to start transaction until previous transaction commits *** SET binlog_format=statement; SET gtid_domain_id=2; +BEGIN; +INSERT INTO t3 VALUES (70, foo(70, +'rpl_parallel_start_waiting_for_prior SIGNAL t4_waiting', '')); INSERT INTO t3 VALUES (60, foo(60, 'ha_write_row_end SIGNAL d2_query WAIT_FOR d2_cont2', 'rpl_parallel_end_of_group SIGNAL d2_done WAIT_FOR d2_cont')); +COMMIT; SET gtid_domain_id=0; SET debug_sync='now WAIT_FOR d2_query'; SET gtid_domain_id=1; @@ -540,15 +557,27 @@ INSERT INTO t3 VALUES (63, foo(63, 'ha_write_row_end SIGNAL d0_query WAIT_FOR d0_cont2', 'rpl_parallel_end_of_group SIGNAL d0_done WAIT_FOR d0_cont')); SET debug_sync='now WAIT_FOR d0_query'; +SET gtid_domain_id=3; +BEGIN; +INSERT INTO t3 VALUES (68, foo(68, +'rpl_parallel_start_waiting_for_prior SIGNAL t2_waiting', '')); +INSERT INTO t3 VALUES (69, foo(69, +'ha_write_row_end SIGNAL d3_query WAIT_FOR d3_cont2', +'rpl_parallel_end_of_group SIGNAL d3_done WAIT_FOR d3_cont')); +COMMIT; +SET gtid_domain_id=0; +SET debug_sync='now WAIT_FOR d3_query'; SET debug_sync='now SIGNAL d2_cont2'; SET debug_sync='now WAIT_FOR d2_done'; SET debug_sync='now SIGNAL d1_cont2'; SET debug_sync='now WAIT_FOR d1_done'; SET debug_sync='now SIGNAL d0_cont2'; SET debug_sync='now WAIT_FOR d0_done'; +SET debug_sync='now SIGNAL d3_cont2'; +SET debug_sync='now WAIT_FOR d3_done'; SET binlog_format=statement; INSERT INTO t3 VALUES (64, foo(64, -'commit_before_prepare_ordered SIGNAL t1_waiting WAIT_FOR t1_cont', '')); +'rpl_parallel_before_mark_start_commit SIGNAL t1_waiting WAIT_FOR t1_cont', '')); SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2 WAIT_FOR master_cont2'; INSERT INTO t3 VALUES (65, foo(65, '', '')); SET debug_sync='now WAIT_FOR master_queued2'; @@ -569,23 +598,34 @@ a b 65 65 66 66 67 67 +68 68 +69 69 +70 70 +SET debug_sync='RESET'; SET debug_sync='now SIGNAL d0_cont'; SET debug_sync='now WAIT_FOR t1_waiting'; +SET debug_sync='now SIGNAL d3_cont'; +SET debug_sync='now WAIT_FOR t2_waiting'; SET debug_sync='now SIGNAL d1_cont'; SET debug_sync='now WAIT_FOR t3_waiting'; SET debug_sync='now SIGNAL d2_cont'; +SET debug_sync='now WAIT_FOR t4_waiting'; KILL THD_ID; SET debug_sync='now WAIT_FOR t3_killed'; SET debug_sync='now SIGNAL t1_cont'; include/wait_for_slave_sql_error.inc [errno=1317,1927,1963] STOP SLAVE IO_THREAD; -SELECT * FROM t3 WHERE a >= 60 ORDER BY a; +SELECT * FROM t3 WHERE a >= 60 AND a != 65 ORDER BY a; a b 60 60 61 61 62 62 63 63 64 64 +68 68 +69 69 +70 70 +SET debug_sync='RESET'; SET GLOBAL slave_parallel_threads=0; SET GLOBAL slave_parallel_threads=10; SET sql_log_bin=0; @@ -597,11 +637,11 @@ RETURN x; END || SET sql_log_bin=1; -INSERT INTO t3 VALUES (69,0); +UPDATE t3 SET b=b+1 WHERE a=60; include/start_slave.inc SELECT * FROM t3 WHERE a >= 60 ORDER BY a; a b -60 60 +60 61 61 61 62 62 63 63 @@ -609,7 +649,9 @@ a b 65 65 66 66 67 67 -69 0 +68 68 +69 69 +70 70 SET sql_log_bin=0; DROP FUNCTION foo; CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500)) @@ -634,37 +676,31 @@ include/start_slave.inc SET @old_max_queued= @@GLOBAL.slave_parallel_max_queued; SET GLOBAL slave_parallel_max_queued=9000; SET binlog_format=statement; -INSERT INTO t3 VALUES (70, foo(0, +INSERT INTO t3 VALUES (80, foo(0, 'ha_write_row_end SIGNAL query_waiting WAIT_FOR query_cont', '')); SET debug_sync='now WAIT_FOR query_waiting'; SET @old_dbug= @@GLOBAL.debug_dbug; SET GLOBAL debug_dbug="+d,rpl_parallel_wait_queue_max"; -INSERT INTO t3 VALUES (72, 0); -SELECT * FROM t3 WHERE a >= 70 ORDER BY a; +SELECT * FROM t3 WHERE a >= 80 ORDER BY a; a b -70 0 -71 10000 -72 0 +80 0 +81 10000 SET debug_sync='now WAIT_FOR wait_queue_ready'; KILL THD_ID; SET debug_sync='now WAIT_FOR wait_queue_killed'; SET debug_sync='now SIGNAL query_cont'; include/wait_for_slave_sql_error.inc [errno=1317,1927,1963] STOP SLAVE IO_THREAD; -SELECT * FROM t3 WHERE a >= 70 ORDER BY a; -a b -70 0 -71 10000 SET GLOBAL debug_dbug=@old_dbug; SET GLOBAL slave_parallel_max_queued= @old_max_queued; -INSERT INTO t3 VALUES (73,0); +INSERT INTO t3 VALUES (82,0); +SET debug_sync='RESET'; include/start_slave.inc -SELECT * FROM t3 WHERE a >= 70 ORDER BY a; +SELECT * FROM t3 WHERE a >= 80 ORDER BY a; a b -70 0 -71 10000 -72 0 -73 0 +80 0 +81 10000 +82 0 include/stop_slave.inc SET GLOBAL binlog_format=@old_format; SET GLOBAL slave_parallel_threads=0; diff --git a/mysql-test/suite/rpl/t/rpl_parallel.test b/mysql-test/suite/rpl/t/rpl_parallel.test index 72a0a72db7d..a71534d2eb1 100644 --- a/mysql-test/suite/rpl/t/rpl_parallel.test +++ b/mysql-test/suite/rpl/t/rpl_parallel.test @@ -163,6 +163,7 @@ SET debug_sync='RESET'; --echo *** Test that group-committed transactions on the master can replicate in parallel on the slave. *** --connection server_1 +SET debug_sync='RESET'; FLUSH LOGS; --source include/wait_for_binlog_checkpoint.inc CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB; @@ -230,6 +231,7 @@ REAP; REAP; --connection con_temp5 REAP; +SET debug_sync='RESET'; --connection server_1 SELECT * FROM t3 ORDER BY a; @@ -270,6 +272,10 @@ SELECT * FROM t3 ORDER BY a; --echo *** Test STOP SLAVE in parallel mode *** --connection server_2 --source include/stop_slave.inc +# Respawn all worker threads to clear any left-over debug_sync or other stuff. +SET debug_sync='RESET'; +SET GLOBAL slave_parallel_threads=0; +SET GLOBAL slave_parallel_threads=10; --connection server_1 # Set up a couple of transactions. The first will be blocked halfway @@ -288,7 +294,7 @@ BEGIN; INSERT INTO t2 VALUES (20); --disable_warnings INSERT INTO t1 VALUES (20); ---disable_warnings +--enable_warnings INSERT INTO t2 VALUES (21); INSERT INTO t3 VALUES (20, 20); COMMIT; @@ -300,7 +306,7 @@ SET binlog_format=@old_format; # Start a connection that will block the replicated transaction halfway. --connection con_temp1 BEGIN; -INSERT INTO t2 VALUES (21); +INSERT INTO t2 VALUES (21); --connection server_2 START SLAVE; @@ -312,13 +318,20 @@ START SLAVE; --connection con_temp2 # Initiate slave stop. It will have to wait for the current event group # to complete. +# The dbug injection causes debug_sync to signal 'wait_for_done_waiting' +# when the SQL driver thread is ready. +SET @old_dbug= @@GLOBAL.debug_dbug; +SET GLOBAL debug_dbug="+d,rpl_parallel_wait_for_done_trigger"; send STOP SLAVE; --connection con_temp1 +SET debug_sync='now WAIT_FOR wait_for_done_waiting'; ROLLBACK; --connection con_temp2 reap; +SET GLOBAL debug_dbug=@old_dbug; +SET debug_sync='RESET'; --connection server_2 --source include/wait_for_slave_to_stop.inc @@ -397,6 +410,7 @@ REAP; --connection server_1 SELECT * FROM t3 WHERE a >= 30 ORDER BY a; +SET debug_sync='RESET'; --connection server_2 SET sql_log_bin=0; @@ -431,6 +445,7 @@ SELECT * FROM t3 WHERE a >= 30 ORDER BY a; # Now we have to disable the debug_sync statements, so they do not trigger # when the events are retried. +SET debug_sync='RESET'; SET GLOBAL slave_parallel_threads=0; SET GLOBAL slave_parallel_threads=10; SET sql_log_bin=0; @@ -535,6 +550,7 @@ REAP; --connection server_1 SELECT * FROM t3 WHERE a >= 40 ORDER BY a; +SET debug_sync='RESET'; --connection server_2 # Wait until T2 is inside executing its insert of 42, then find it in SHOW @@ -559,10 +575,10 @@ SET debug_sync='now SIGNAL t1_cont'; --let $slave_sql_errno= 1317,1963 --source include/wait_for_slave_sql_error.inc -SELECT * FROM t3 WHERE a >= 40 ORDER BY a; # Now we have to disable the debug_sync statements, so they do not trigger # when the events are retried. +SET debug_sync='RESET'; SET GLOBAL slave_parallel_threads=0; SET GLOBAL slave_parallel_threads=10; SET sql_log_bin=0; @@ -673,6 +689,7 @@ REAP; --connection server_1 SELECT * FROM t3 WHERE a >= 50 ORDER BY a; +SET debug_sync='RESET'; --connection server_2 # Wait until T2 is inside executing its insert of 52, then find it in SHOW @@ -701,6 +718,7 @@ SELECT * FROM t3 WHERE a >= 50 ORDER BY a; # Now we have to disable the debug_sync statements, so they do not trigger # when the events are retried. +SET debug_sync='RESET'; SET GLOBAL slave_parallel_threads=0; SET GLOBAL slave_parallel_threads=10; SET sql_log_bin=0; @@ -752,7 +770,7 @@ CHANGE MASTER TO master_use_gtid=slave_pos; --source include/stop_slave.inc SET GLOBAL binlog_format=@old_format; SET GLOBAL slave_parallel_threads=0; -SET GLOBAL slave_parallel_threads=3; +SET GLOBAL slave_parallel_threads=4; --source include/start_slave.inc @@ -762,24 +780,29 @@ SET GLOBAL slave_parallel_threads=3; # can run in parallel with each other (same group commit and commit id), # but not in parallel with T1. # -# We use three worker threads. T1 and T2 will be queued on the first, T3 on -# the second, and T4 on the third. We will delay T1 commit, T3 will wait for -# T1 to commit before it can start. We will kill T3 during this wait, and +# We use four worker threads, each Ti will be queued on each their own +# worker thread. We will delay T1 commit, T3 will wait for T1 to begin +# commit before it can start. We will kill T3 during this wait, and # check that everything works correctly. # # It is rather tricky to get the correct thread id of the worker to kill. -# We start by injecting three dummy transactions in a debug_sync-controlled +# We start by injecting four dummy transactions in a debug_sync-controlled # manner to be able to get known thread ids for the workers in a pool with -# just 3 worker threads. Then we let in each of the real test transactions +# just 4 worker threads. Then we let in each of the real test transactions # T1-T4 one at a time in a way which allows us to know which transaction # ends up with which thread id. --connection server_1 SET binlog_format=statement; SET gtid_domain_id=2; +BEGIN; +# This debug_sync will linger on and be used to control T4 later. +INSERT INTO t3 VALUES (70, foo(70, + 'rpl_parallel_start_waiting_for_prior SIGNAL t4_waiting', '')); INSERT INTO t3 VALUES (60, foo(60, 'ha_write_row_end SIGNAL d2_query WAIT_FOR d2_cont2', 'rpl_parallel_end_of_group SIGNAL d2_done WAIT_FOR d2_cont')); +COMMIT; SET gtid_domain_id=0; --connection server_2 @@ -813,12 +836,30 @@ INSERT INTO t3 VALUES (63, foo(63, SET debug_sync='now WAIT_FOR d0_query'; --let $d0_thd_id= `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE INFO LIKE '%foo(63%' AND INFO NOT LIKE '%LIKE%'` +--connection server_1 +SET gtid_domain_id=3; +BEGIN; +# These debug_sync's will linger on and be used to control T2 later. +INSERT INTO t3 VALUES (68, foo(68, + 'rpl_parallel_start_waiting_for_prior SIGNAL t2_waiting', '')); +INSERT INTO t3 VALUES (69, foo(69, + 'ha_write_row_end SIGNAL d3_query WAIT_FOR d3_cont2', + 'rpl_parallel_end_of_group SIGNAL d3_done WAIT_FOR d3_cont')); +COMMIT; +SET gtid_domain_id=0; + +--connection server_2 +SET debug_sync='now WAIT_FOR d3_query'; +--let $d3_thd_id= `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE INFO LIKE '%foo(69%' AND INFO NOT LIKE '%LIKE%'` + SET debug_sync='now SIGNAL d2_cont2'; SET debug_sync='now WAIT_FOR d2_done'; SET debug_sync='now SIGNAL d1_cont2'; SET debug_sync='now WAIT_FOR d1_done'; SET debug_sync='now SIGNAL d0_cont2'; SET debug_sync='now WAIT_FOR d0_done'; +SET debug_sync='now SIGNAL d3_cont2'; +SET debug_sync='now WAIT_FOR d3_done'; # Now prepare the real transactions T1, T2, T3, T4 on the master. @@ -826,7 +867,7 @@ SET debug_sync='now WAIT_FOR d0_done'; # Create transaction T1. SET binlog_format=statement; INSERT INTO t3 VALUES (64, foo(64, - 'commit_before_prepare_ordered SIGNAL t1_waiting WAIT_FOR t1_cont', '')); + 'rpl_parallel_before_mark_start_commit SIGNAL t1_waiting WAIT_FOR t1_cont', '')); # Create transaction T2, as a group commit leader on the master. SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2 WAIT_FOR master_cont2'; @@ -861,6 +902,7 @@ REAP; --connection server_1 SELECT * FROM t3 WHERE a >= 60 ORDER BY a; +SET debug_sync='RESET'; --connection server_2 # Now we have the four transactions pending for replication on the slave. @@ -872,15 +914,20 @@ SELECT * FROM t3 WHERE a >= 60 ORDER BY a; SET debug_sync='now SIGNAL d0_cont'; SET debug_sync='now WAIT_FOR t1_waiting'; -# T2 will be queued on the same worker D0 as T1. +# Make the worker D3 free, and wait for T2 to be queued in it. +SET debug_sync='now SIGNAL d3_cont'; +SET debug_sync='now WAIT_FOR t2_waiting'; + # Now release worker D1, and wait for T3 to be queued in it. # T3 will wait for T1 to commit before it can start. SET debug_sync='now SIGNAL d1_cont'; SET debug_sync='now WAIT_FOR t3_waiting'; -# Release worker D2. T4 may or may not have time to be queued on it, but -# it will not be able to complete due to T3 being killed. +# Release worker D2. Wait for T4 to be queued, so we are sure it has +# received the debug_sync signal (else we might overwrite it with the +# next debug_sync). SET debug_sync='now SIGNAL d2_cont'; +SET debug_sync='now WAIT_FOR t4_waiting'; # Now we kill the waiting transaction T3 in worker D1. --replace_result $d1_thd_id THD_ID @@ -895,10 +942,15 @@ SET debug_sync='now SIGNAL t1_cont'; --let $slave_sql_errno= 1317,1927,1963 --source include/wait_for_slave_sql_error.inc STOP SLAVE IO_THREAD; -SELECT * FROM t3 WHERE a >= 60 ORDER BY a; +# Since T2, T3, and T4 run in parallel, we can not be sure if T2 will have time +# to commit or not before the stop. However, T1 should commit, and T3/T4 may +# not have committed. (After slave restart we check that all become committed +# eventually). +SELECT * FROM t3 WHERE a >= 60 AND a != 65 ORDER BY a; # Now we have to disable the debug_sync statements, so they do not trigger # when the events are retried. +SET debug_sync='RESET'; SET GLOBAL slave_parallel_threads=0; SET GLOBAL slave_parallel_threads=10; SET sql_log_bin=0; @@ -914,7 +966,7 @@ CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500)) SET sql_log_bin=1; --connection server_1 -INSERT INTO t3 VALUES (69,0); +UPDATE t3 SET b=b+1 WHERE a=60; --save_master_pos --connection server_2 @@ -951,6 +1003,7 @@ SET GLOBAL slave_parallel_threads=10; --echo *** 5. Test killing thread that is waiting for queue of max length to shorten *** +# Find the thread id of the driver SQL thread that we want to kill. --let $wait_condition= SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE STATE LIKE '%Slave has read all relay log%' --source include/wait_condition.inc --let $thd_id= `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE STATE LIKE '%Slave has read all relay log%'` @@ -961,12 +1014,8 @@ SET GLOBAL slave_parallel_max_queued=9000; --let bigstring= `SELECT REPEAT('x', 10000)` SET binlog_format=statement; # Create an event that will wait to be signalled. -INSERT INTO t3 VALUES (70, foo(0, +INSERT INTO t3 VALUES (80, foo(0, 'ha_write_row_end SIGNAL query_waiting WAIT_FOR query_cont', '')); ---disable_query_log -# Create an event that will fill up the queue. -eval INSERT INTO t3 VALUES (71, LENGTH('$bigstring')); ---enable_query_log --connection server_2 SET debug_sync='now WAIT_FOR query_waiting'; @@ -977,11 +1026,14 @@ SET @old_dbug= @@GLOBAL.debug_dbug; SET GLOBAL debug_dbug="+d,rpl_parallel_wait_queue_max"; --connection server_1 -# This event will have to wait for the queue to become shorter before it can -# be queued. We will test that things work when we kill the SQL driver thread -# during this wait. -INSERT INTO t3 VALUES (72, 0); -SELECT * FROM t3 WHERE a >= 70 ORDER BY a; +--disable_query_log +# Create an event that will fill up the queue. +# The Xid event at the end of the event group will have to wait for the Query +# event with the INSERT to drain so the queue becomes shorter. However that in +# turn waits for the prior event group to continue. +eval INSERT INTO t3 VALUES (81, LENGTH('$bigstring')); +--enable_query_log +SELECT * FROM t3 WHERE a >= 80 ORDER BY a; --connection server_2 SET debug_sync='now WAIT_FOR wait_queue_ready'; @@ -995,19 +1047,19 @@ SET debug_sync='now SIGNAL query_cont'; --let $slave_sql_errno= 1317,1927,1963 --source include/wait_for_slave_sql_error.inc STOP SLAVE IO_THREAD; -SELECT * FROM t3 WHERE a >= 70 ORDER BY a; SET GLOBAL debug_dbug=@old_dbug; SET GLOBAL slave_parallel_max_queued= @old_max_queued; --connection server_1 -INSERT INTO t3 VALUES (73,0); +INSERT INTO t3 VALUES (82,0); --save_master_pos --connection server_2 +SET debug_sync='RESET'; --source include/start_slave.inc --sync_with_master -SELECT * FROM t3 WHERE a >= 70 ORDER BY a; +SELECT * FROM t3 WHERE a >= 80 ORDER BY a; --connection server_2 diff --git a/mysql-test/suite/sys_vars/r/slave_domain_parallel_threads_basic.result b/mysql-test/suite/sys_vars/r/slave_domain_parallel_threads_basic.result new file mode 100644 index 00000000000..9e53d9bd891 --- /dev/null +++ b/mysql-test/suite/sys_vars/r/slave_domain_parallel_threads_basic.result @@ -0,0 +1,13 @@ +SET @save_slave_domain_parallel_threads= @@GLOBAL.slave_domain_parallel_threads; +SELECT @@GLOBAL.slave_domain_parallel_threads as 'must be zero because of default'; +must be zero because of default +0 +SELECT @@SESSION.slave_domain_parallel_threads as 'no session var'; +ERROR HY000: Variable 'slave_domain_parallel_threads' is a GLOBAL variable +SET GLOBAL slave_domain_parallel_threads= 0; +SET GLOBAL slave_domain_parallel_threads= DEFAULT; +SET GLOBAL slave_domain_parallel_threads= 10; +SELECT @@GLOBAL.slave_domain_parallel_threads; +@@GLOBAL.slave_domain_parallel_threads +10 +SET GLOBAL slave_domain_parallel_threads = @save_slave_domain_parallel_threads; diff --git a/mysql-test/suite/sys_vars/t/slave_domain_parallel_threads_basic.test b/mysql-test/suite/sys_vars/t/slave_domain_parallel_threads_basic.test new file mode 100644 index 00000000000..7be48fbd4c5 --- /dev/null +++ b/mysql-test/suite/sys_vars/t/slave_domain_parallel_threads_basic.test @@ -0,0 +1,14 @@ +--source include/not_embedded.inc + +SET @save_slave_domain_parallel_threads= @@GLOBAL.slave_domain_parallel_threads; + +SELECT @@GLOBAL.slave_domain_parallel_threads as 'must be zero because of default'; +--error ER_INCORRECT_GLOBAL_LOCAL_VAR +SELECT @@SESSION.slave_domain_parallel_threads as 'no session var'; + +SET GLOBAL slave_domain_parallel_threads= 0; +SET GLOBAL slave_domain_parallel_threads= DEFAULT; +SET GLOBAL slave_domain_parallel_threads= 10; +SELECT @@GLOBAL.slave_domain_parallel_threads; + +SET GLOBAL slave_domain_parallel_threads = @save_slave_domain_parallel_threads; diff --git a/mysys/mf_iocache.c b/mysys/mf_iocache.c index 02e5c5373ae..0c48b367942 100644 --- a/mysys/mf_iocache.c +++ b/mysys/mf_iocache.c @@ -1281,10 +1281,6 @@ read_append_buffer: size_t transfer_len; DBUG_ASSERT(info->append_read_pos <= info->write_pos); - /* - TODO: figure out if the assert below is needed or correct. - */ - DBUG_ASSERT(pos_in_file == info->end_of_file); copy_len=min(Count, len_in_buff); memcpy(Buffer, info->append_read_pos, copy_len); info->append_read_pos += copy_len; diff --git a/sql/log.cc b/sql/log.cc index 6fead95a4b1..a7f3a69cfc9 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -6644,13 +6644,15 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) */ wfc= orig_entry->thd->wait_for_commit_ptr; orig_entry->queued_by_other= false; - if (wfc && wfc->waiting_for_commit) + if (wfc && wfc->waitee) { mysql_mutex_lock(&wfc->LOCK_wait_commit); /* Do an extra check here, this time safely under lock. */ - if (wfc->waiting_for_commit) + if (wfc->waitee) { const char *old_msg; + wait_for_commit *loc_waitee; + /* By setting wfc->opaque_pointer to our own entry, we mark that we are ready to commit, but waiting for another transaction to commit before @@ -6661,21 +6663,20 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) queued_by_other flag is set. */ wfc->opaque_pointer= orig_entry; + DEBUG_SYNC(orig_entry->thd, "group_commit_waiting_for_prior"); old_msg= orig_entry->thd->enter_cond(&wfc->COND_wait_commit, &wfc->LOCK_wait_commit, "Waiting for prior transaction to commit"); - DEBUG_SYNC(orig_entry->thd, "group_commit_waiting_for_prior"); - while (wfc->waiting_for_commit && !orig_entry->thd->check_killed()) + while ((loc_waitee= wfc->waitee) && !orig_entry->thd->check_killed()) mysql_cond_wait(&wfc->COND_wait_commit, &wfc->LOCK_wait_commit); wfc->opaque_pointer= NULL; DBUG_PRINT("info", ("After waiting for prior commit, queued_by_other=%d", orig_entry->queued_by_other)); - if (wfc->waiting_for_commit) + if (loc_waitee) { /* Wait terminated due to kill. */ - wait_for_commit *loc_waitee= wfc->waitee; mysql_mutex_lock(&loc_waitee->LOCK_wait_commit); if (loc_waitee->wakeup_subsequent_commits_running || orig_entry->queued_by_other) @@ -6685,13 +6686,14 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) do { mysql_cond_wait(&wfc->COND_wait_commit, &wfc->LOCK_wait_commit); - } while (wfc->waiting_for_commit); + } while (wfc->waitee); } else { /* We were killed, so remove us from the list of waitee. */ wfc->remove_from_list(&loc_waitee->subsequent_commits_list); mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit); + wfc->waitee= NULL; orig_entry->thd->exit_cond(old_msg); /* Interrupted by kill. */ @@ -6707,12 +6709,11 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) } else mysql_mutex_unlock(&wfc->LOCK_wait_commit); - - if (wfc->wakeup_error) - { - my_error(ER_PRIOR_COMMIT_FAILED, MYF(0)); - DBUG_RETURN(-1); - } + } + if (wfc && wfc->wakeup_error) + { + my_error(ER_PRIOR_COMMIT_FAILED, MYF(0)); + DBUG_RETURN(-1); } /* @@ -9043,7 +9044,7 @@ start_binlog_background_thread() array_elements(all_binlog_threads)); #endif - if (mysql_thread_create(key_thread_binlog, &th, NULL, + if (mysql_thread_create(key_thread_binlog, &th, &connection_attrib, binlog_background_thread, NULL)) return 1; diff --git a/sql/mysqld.cc b/sql/mysqld.cc index d95d856c18f..b7cdeb0bde4 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -546,6 +546,7 @@ ulong rpl_recovery_rank=0; ulong stored_program_cache_size= 0; ulong opt_slave_parallel_threads= 0; +ulong opt_slave_domain_parallel_threads= 0; ulong opt_binlog_commit_wait_count= 0; ulong opt_binlog_commit_wait_usec= 0; ulong opt_slave_parallel_max_queued= 131072; @@ -895,8 +896,10 @@ PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready, key_COND_wait_commit; PSI_cond_key key_RELAYLOG_COND_queue_busy; PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy; -PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_pool, - key_COND_parallel_entry, key_COND_prepare_ordered; +PSI_cond_key key_COND_rpl_thread_queue, key_COND_rpl_thread, + key_COND_rpl_thread_pool, + key_COND_parallel_entry, key_COND_group_commit_orderer, + key_COND_prepare_ordered; PSI_cond_key key_COND_wait_gtid; static PSI_cond_info all_server_conds[]= @@ -941,8 +944,10 @@ static PSI_cond_info all_server_conds[]= { &key_COND_thread_cache, "COND_thread_cache", PSI_FLAG_GLOBAL}, { &key_COND_flush_thread_cache, "COND_flush_thread_cache", PSI_FLAG_GLOBAL}, { &key_COND_rpl_thread, "COND_rpl_thread", 0}, + { &key_COND_rpl_thread_queue, "COND_rpl_thread_queue", 0}, { &key_COND_rpl_thread_pool, "COND_rpl_thread_pool", 0}, { &key_COND_parallel_entry, "COND_parallel_entry", 0}, + { &key_COND_group_commit_orderer, "COND_group_commit_orderer", 0}, { &key_COND_prepare_ordered, "COND_prepare_ordered", 0}, { &key_COND_wait_gtid, "COND_wait_gtid", 0} }; diff --git a/sql/mysqld.h b/sql/mysqld.h index 4fdd34fd8be..dfdd54225fe 100644 --- a/sql/mysqld.h +++ b/sql/mysqld.h @@ -177,6 +177,7 @@ extern ulong opt_binlog_rows_event_max_size; extern ulong rpl_recovery_rank, thread_cache_size; extern ulong stored_program_cache_size; extern ulong opt_slave_parallel_threads; +extern ulong opt_slave_domain_parallel_threads; extern ulong opt_slave_parallel_max_queued; extern ulong opt_binlog_commit_wait_count; extern ulong opt_binlog_commit_wait_usec; @@ -284,8 +285,9 @@ extern PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready, key_COND_wait_commit; extern PSI_cond_key key_RELAYLOG_COND_queue_busy; extern PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy; -extern PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_pool, - key_COND_parallel_entry; +extern PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_queue, + key_COND_rpl_thread_pool, + key_COND_parallel_entry, key_COND_group_commit_orderer; extern PSI_cond_key key_COND_wait_gtid; extern PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert, diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index ef282611f70..eab5b980c02 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -93,32 +93,12 @@ handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev) } -static bool -sql_worker_killed(THD *thd, rpl_group_info *rgi, bool in_event_group) -{ - if (!rgi->rli->abort_slave && !abort_loop) - return false; - - /* - Do not abort in the middle of an event group that cannot be rolled back. - */ - if ((thd->transaction.all.modified_non_trans_table || - (thd->variables.option_bits & OPTION_KEEP_LOG)) - && in_event_group) - return false; - /* ToDo: should we add some timeout like in sql_slave_killed? - if (rgi->last_event_start_time == 0) - rgi->last_event_start_time= my_time(0); - */ - - return true; -} - - static void finish_event_group(THD *thd, int err, uint64 sub_id, - rpl_parallel_entry *entry, wait_for_commit *wfc) + rpl_parallel_entry *entry, rpl_group_info *rgi) { + wait_for_commit *wfc= &rgi->commit_orderer; + /* Remove any left-over registration to wait for a prior commit to complete. Normally, such wait would already have been removed at @@ -163,12 +143,26 @@ finish_event_group(THD *thd, int err, uint64 sub_id, */ mysql_mutex_lock(&entry->LOCK_parallel_entry); if (entry->last_committed_sub_id < sub_id) - { entry->last_committed_sub_id= sub_id; - mysql_cond_broadcast(&entry->COND_parallel_entry); - } + + /* + If this event group got error, then any following event groups that have + not yet started should just skip their group, preparing for stop of the + SQL driver thread. + */ + if (unlikely(rgi->is_error) && + entry->stop_on_error_sub_id == (uint64)ULONGLONG_MAX) + entry->stop_on_error_sub_id= sub_id; + /* + We need to mark that this event group started its commit phase, in case we + missed it before (otherwise we would deadlock the next event group that is + waiting for this). In most cases (normal DML), it will be a no-op. + */ + rgi->mark_start_commit_no_lock(); mysql_mutex_unlock(&entry->LOCK_parallel_entry); + thd->clear_error(); + thd->stmt_da->reset_diagnostics_area(); wfc->wakeup_subsequent_commits(err); } @@ -185,6 +179,20 @@ signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi) } +static void +unlock_or_exit_cond(THD *thd, mysql_mutex_t *lock, bool *did_enter_cond, + const char *old_msg) +{ + if (*did_enter_cond) + { + thd->exit_cond(old_msg); + *did_enter_cond= false; + } + else + mysql_mutex_unlock(lock); +} + + pthread_handler_t handle_rpl_parallel_thread(void *arg) { @@ -193,8 +201,14 @@ handle_rpl_parallel_thread(void *arg) struct rpl_parallel_thread::queued_event *events; bool group_standalone= true; bool in_event_group= false; + bool group_skip_for_stop= false; rpl_group_info *group_rgi= NULL; + group_commit_orderer *gco, *tmp_gco; uint64 event_gtid_sub_id= 0; + rpl_parallel_thread::queued_event *qevs_to_free; + rpl_group_info *rgis_to_free; + group_commit_orderer *gcos_to_free; + size_t total_event_size; int err; struct rpl_parallel_thread *rpt= (struct rpl_parallel_thread *)arg; @@ -234,44 +248,62 @@ handle_rpl_parallel_thread(void *arg) rpt->running= true; mysql_cond_signal(&rpt->COND_rpl_thread); - while (!rpt->stop && !thd->killed) + while (!rpt->stop) { - rpl_parallel_thread *list; - old_msg= thd->proc_info; thd->enter_cond(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread, "Waiting for work from SQL thread"); - while (!(events= rpt->event_queue) && !rpt->stop && !thd->killed && - !(rpt->current_entry && rpt->current_entry->force_abort)) + /* + There are 4 cases that should cause us to wake up: + - Events have been queued for us to handle. + - We have an owner, but no events and not inside event group -> we need + to release ourself to the thread pool + - SQL thread is stopping, and we have an owner but no events, and we are + inside an event group; no more events will be queued to us, so we need + to abort the group (force_abort==1). + - Thread pool shutdown (rpt->stop==1). + */ + while (!( (events= rpt->event_queue) || + (rpt->current_owner && !in_event_group) || + (rpt->current_owner && group_rgi->parallel_entry->force_abort) || + rpt->stop)) mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread); - rpt->dequeue(events); + rpt->dequeue1(events); thd->exit_cond(old_msg); - mysql_cond_signal(&rpt->COND_rpl_thread); more_events: + qevs_to_free= NULL; + rgis_to_free= NULL; + gcos_to_free= NULL; + total_event_size= 0; while (events) { struct rpl_parallel_thread::queued_event *next= events->next; Log_event_type event_type; rpl_group_info *rgi= events->rgi; rpl_parallel_entry *entry= rgi->parallel_entry; - uint64 wait_for_sub_id; - uint64 wait_start_sub_id; - bool end_of_group; + bool end_of_group, group_ending; + total_event_size+= events->event_size; if (!events->ev) { handle_queued_pos_update(thd, events); - my_free(events); + events->next= qevs_to_free; + qevs_to_free= events; events= next; continue; } err= 0; group_rgi= rgi; + gco= rgi->gco; /* Handle a new event group, which will be initiated by a GTID event. */ if ((event_type= events->ev->get_type_code()) == GTID_EVENT) { + bool did_enter_cond= false; + const char *old_msg= NULL; + uint64 wait_count; + in_event_group= true; /* If the standalone flag is set, then this event group consists of a @@ -293,50 +325,87 @@ handle_rpl_parallel_thread(void *arg) occured. Also do not start parallel execution of this event group until all - prior groups have committed that are not safe to run in parallel with. + prior groups have reached the commit phase that are not safe to run + in parallel with. */ - wait_for_sub_id= rgi->wait_commit_sub_id; - wait_start_sub_id= rgi->wait_start_sub_id; - if (wait_for_sub_id || wait_start_sub_id) + mysql_mutex_lock(&entry->LOCK_parallel_entry); + if (!gco->installed) { - bool did_enter_cond= false; - const char *old_msg= NULL; - - mysql_mutex_lock(&entry->LOCK_parallel_entry); - if (wait_start_sub_id) + if (gco->prev_gco) + gco->prev_gco->next_gco= gco; + gco->installed= true; + } + wait_count= gco->wait_count; + if (wait_count > entry->count_committing_event_groups) + { + DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior"); + old_msg= thd->enter_cond(&gco->COND_group_commit_orderer, + &entry->LOCK_parallel_entry, + "Waiting for prior transaction to start " + "commit before starting next transaction"); + did_enter_cond= true; + do { - old_msg= thd->enter_cond(&entry->COND_parallel_entry, - &entry->LOCK_parallel_entry, - "Waiting for prior transaction to commit " - "before starting next transaction"); - did_enter_cond= true; - DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior"); - while (wait_start_sub_id > entry->last_committed_sub_id && - !thd->check_killed()) - mysql_cond_wait(&entry->COND_parallel_entry, - &entry->LOCK_parallel_entry); - if (wait_start_sub_id > entry->last_committed_sub_id) + if (thd->check_killed() && !rgi->is_error) { - /* The thread got a kill signal. */ DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior_killed"); thd->send_kill_message(); slave_output_error_info(rgi->rli, thd); signal_error_to_sql_driver_thread(thd, rgi); + /* + Even though we were killed, we need to continue waiting for the + prior event groups to signal that we can continue. Otherwise we + mess up the accounting for ordering. However, now that we have + marked the error, events will just be skipped rather than + executed, and things will progress quickly towards stop. + */ } - rgi->wait_start_sub_id= 0; /* No need to check again. */ - } - if (wait_for_sub_id > entry->last_committed_sub_id) - { - wait_for_commit *waitee= - &rgi->wait_commit_group_info->commit_orderer; - rgi->commit_orderer.register_wait_for_prior_commit(waitee); - } - if (did_enter_cond) - thd->exit_cond(old_msg); - else - mysql_mutex_unlock(&entry->LOCK_parallel_entry); + mysql_cond_wait(&gco->COND_group_commit_orderer, + &entry->LOCK_parallel_entry); + } while (wait_count > entry->count_committing_event_groups); + } + + if ((tmp_gco= gco->prev_gco)) + { + /* + Now all the event groups in the previous batch have entered their + commit phase, and will no longer access their gco. So we can free + it here. + */ + DBUG_ASSERT(!tmp_gco->prev_gco); + gco->prev_gco= NULL; + tmp_gco->next_gco= gcos_to_free; + gcos_to_free= tmp_gco; } + if (entry->force_abort && wait_count > entry->stop_count) + { + /* + We are stopping (STOP SLAVE), and this event group is beyond the + point where we can safely stop. So set a flag that will cause us + to skip, rather than execute, the following events. + */ + group_skip_for_stop= true; + } + else + group_skip_for_stop= false; + + if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id)) + group_skip_for_stop= true; + else if (rgi->wait_commit_sub_id > entry->last_committed_sub_id) + { + /* + Register that the commit of this event group must wait for the + commit of the previous event group to complete before it may + complete itself, so that we preserve commit order. + */ + wait_for_commit *waitee= + &rgi->wait_commit_group_info->commit_orderer; + rgi->commit_orderer.register_wait_for_prior_commit(waitee); + } + unlock_or_exit_cond(thd, &entry->LOCK_parallel_entry, + &did_enter_cond, old_msg); + if(thd->wait_for_commit_ptr) { /* @@ -353,13 +422,23 @@ handle_rpl_parallel_thread(void *arg) thd->wait_for_commit_ptr= &rgi->commit_orderer; } + group_ending= event_type == XID_EVENT || + (event_type == QUERY_EVENT && + (((Query_log_event *)events->ev)->is_commit() || + ((Query_log_event *)events->ev)->is_rollback())); + if (group_ending) + { + DEBUG_SYNC(thd, "rpl_parallel_before_mark_start_commit"); + rgi->mark_start_commit(); + } + /* If the SQL thread is stopping, we just skip execution of all the following event groups. We still do all the normal waiting and wakeup processing between the event groups as a simple way to ensure that everything is stopped and cleaned up correctly. */ - if (!rgi->is_error && !sql_worker_killed(thd, rgi, in_event_group)) + if (!rgi->is_error && !group_skip_for_stop) err= rpt_handle_event(events, rpt); else err= thd->wait_for_prior_commit(); @@ -367,13 +446,11 @@ handle_rpl_parallel_thread(void *arg) end_of_group= in_event_group && ((group_standalone && !Log_event::is_part_of_group(event_type)) || - event_type == XID_EVENT || - (event_type == QUERY_EVENT && - (((Query_log_event *)events->ev)->is_commit() || - ((Query_log_event *)events->ev)->is_rollback()))); + group_ending); delete_or_keep_event_post_apply(rgi, event_type, events->ev); - my_free(events); + events->next= qevs_to_free; + qevs_to_free= events; if (err) { @@ -383,10 +460,11 @@ handle_rpl_parallel_thread(void *arg) if (end_of_group) { in_event_group= false; - finish_event_group(thd, err, event_gtid_sub_id, entry, - &rgi->commit_orderer); - delete rgi; + finish_event_group(thd, err, event_gtid_sub_id, entry, rgi); + rgi->next= rgis_to_free; + rgis_to_free= rgi; group_rgi= rgi= NULL; + group_skip_for_stop= false; DEBUG_SYNC(thd, "rpl_parallel_end_of_group"); } @@ -394,6 +472,29 @@ handle_rpl_parallel_thread(void *arg) } mysql_mutex_lock(&rpt->LOCK_rpl_thread); + /* Signal that our queue can now accept more events. */ + rpt->dequeue2(total_event_size); + mysql_cond_signal(&rpt->COND_rpl_thread_queue); + /* We need to delay the free here, to when we have the lock. */ + while (gcos_to_free) + { + group_commit_orderer *next= gcos_to_free->next_gco; + rpt->free_gco(gcos_to_free); + gcos_to_free= next; + } + while (rgis_to_free) + { + rpl_group_info *next= rgis_to_free->next; + rpt->free_rgi(rgis_to_free); + rgis_to_free= next; + } + while (qevs_to_free) + { + rpl_parallel_thread::queued_event *next= qevs_to_free->next; + rpt->free_qev(qevs_to_free); + qevs_to_free= next; + } + if ((events= rpt->event_queue) != NULL) { /* @@ -401,9 +502,8 @@ handle_rpl_parallel_thread(void *arg) This is faster than having to wakeup the pool manager thread to give us a new event. */ - rpt->dequeue(events); + rpt->dequeue1(events); mysql_mutex_unlock(&rpt->LOCK_rpl_thread); - mysql_cond_signal(&rpt->COND_rpl_thread); goto more_events; } @@ -418,27 +518,26 @@ handle_rpl_parallel_thread(void *arg) half-processed event group. */ mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + thd->wait_for_prior_commit(); finish_event_group(thd, 1, group_rgi->gtid_sub_id, - group_rgi->parallel_entry, &group_rgi->commit_orderer); + group_rgi->parallel_entry, group_rgi); signal_error_to_sql_driver_thread(thd, group_rgi); in_event_group= false; - delete group_rgi; - group_rgi= NULL; mysql_mutex_lock(&rpt->LOCK_rpl_thread); + rpt->free_rgi(group_rgi); + group_rgi= NULL; + group_skip_for_stop= false; } if (!in_event_group) { + rpt->current_owner= NULL; + /* Tell wait_for_done() that we are done, if it is waiting. */ + if (likely(rpt->current_entry) && + unlikely(rpt->current_entry->force_abort)) + mysql_cond_broadcast(&rpt->current_entry->COND_parallel_entry); rpt->current_entry= NULL; if (!rpt->stop) - { - mysql_mutex_lock(&rpt->pool->LOCK_rpl_thread_pool); - list= rpt->pool->free_list; - rpt->next= list; - rpt->pool->free_list= rpt; - if (!list) - mysql_cond_broadcast(&rpt->pool->COND_rpl_thread_pool); - mysql_mutex_unlock(&rpt->pool->LOCK_rpl_thread_pool); - } + rpt->pool->release_thread(rpt); } } @@ -467,6 +566,15 @@ handle_rpl_parallel_thread(void *arg) } +static void +dealloc_gco(group_commit_orderer *gco) +{ + DBUG_ASSERT(!gco->prev_gco /* Must only free after dealloc previous */); + mysql_cond_destroy(&gco->COND_group_commit_orderer); + my_free(gco); +} + + int rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, uint32 new_count, bool skip_check) @@ -501,8 +609,10 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, mysql_mutex_init(key_LOCK_rpl_thread, &new_list[i]->LOCK_rpl_thread, MY_MUTEX_INIT_SLOW); mysql_cond_init(key_COND_rpl_thread, &new_list[i]->COND_rpl_thread, NULL); + mysql_cond_init(key_COND_rpl_thread_queue, + &new_list[i]->COND_rpl_thread_queue, NULL); new_list[i]->pool= pool; - if (mysql_thread_create(key_rpl_parallel_thread, &th, NULL, + if (mysql_thread_create(key_rpl_parallel_thread, &th, &connection_attrib, handle_rpl_parallel_thread, new_list[i])) { my_error(ER_OUT_OF_RESOURCES, MYF(0)); @@ -539,7 +649,7 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, */ for (i= 0; i < pool->count; ++i) { - rpl_parallel_thread *rpt= pool->get_thread(NULL); + rpl_parallel_thread *rpt= pool->get_thread(NULL, NULL); rpt->stop= true; mysql_cond_signal(&rpt->COND_rpl_thread); mysql_mutex_unlock(&rpt->LOCK_rpl_thread); @@ -554,6 +664,24 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, mysql_mutex_unlock(&rpt->LOCK_rpl_thread); mysql_mutex_destroy(&rpt->LOCK_rpl_thread); mysql_cond_destroy(&rpt->COND_rpl_thread); + while (rpt->qev_free_list) + { + rpl_parallel_thread::queued_event *next= rpt->qev_free_list->next; + my_free(rpt->qev_free_list); + rpt->qev_free_list= next; + } + while (rpt->rgi_free_list) + { + rpl_group_info *next= rpt->rgi_free_list->next; + delete rpt->rgi_free_list; + rpt->rgi_free_list= next; + } + while (rpt->gco_free_list) + { + group_commit_orderer *next= rpt->gco_free_list->next_gco; + dealloc_gco(rpt->gco_free_list); + rpt->gco_free_list= next; + } } my_free(pool->threads); @@ -609,6 +737,121 @@ err: } +rpl_parallel_thread::queued_event * +rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size, + Relay_log_info *rli) +{ + queued_event *qev; + mysql_mutex_assert_owner(&LOCK_rpl_thread); + if ((qev= qev_free_list)) + qev_free_list= qev->next; + else if(!(qev= (queued_event *)my_malloc(sizeof(*qev), MYF(0)))) + { + my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*qev)); + return NULL; + } + qev->ev= ev; + qev->event_size= event_size; + qev->next= NULL; + strcpy(qev->event_relay_log_name, rli->event_relay_log_name); + qev->event_relay_log_pos= rli->event_relay_log_pos; + qev->future_event_relay_log_pos= rli->future_event_relay_log_pos; + strcpy(qev->future_event_master_log_name, rli->future_event_master_log_name); + return qev; +} + + +void +rpl_parallel_thread::free_qev(rpl_parallel_thread::queued_event *qev) +{ + mysql_mutex_assert_owner(&LOCK_rpl_thread); + qev->next= qev_free_list; + qev_free_list= qev; +} + + +rpl_group_info* +rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev, + rpl_parallel_entry *e) +{ + rpl_group_info *rgi; + mysql_mutex_assert_owner(&LOCK_rpl_thread); + if ((rgi= rgi_free_list)) + { + rgi_free_list= rgi->next; + rgi->reinit(rli); + } + else + { + if(!(rgi= new rpl_group_info(rli))) + { + my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*rgi)); + return NULL; + } + rgi->is_parallel_exec = true; + if ((rgi->deferred_events_collecting= rli->mi->rpl_filter->is_on())) + rgi->deferred_events= new Deferred_log_events(rli); + } + if (event_group_new_gtid(rgi, gtid_ev)) + { + free_rgi(rgi); + my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME)); + return NULL; + } + rgi->parallel_entry= e; + + return rgi; +} + + +void +rpl_parallel_thread::free_rgi(rpl_group_info *rgi) +{ + mysql_mutex_assert_owner(&LOCK_rpl_thread); + DBUG_ASSERT(rgi->commit_orderer.waitee == NULL); + rgi->free_annotate_event(); + if (rgi->deferred_events) + { + delete rgi->deferred_events; + rgi->deferred_events= NULL; + } + rgi->next= rgi_free_list; + rgi_free_list= rgi; +} + + +group_commit_orderer * +rpl_parallel_thread::get_gco(uint64 wait_count, group_commit_orderer *prev) +{ + group_commit_orderer *gco; + mysql_mutex_assert_owner(&LOCK_rpl_thread); + if ((gco= gco_free_list)) + gco_free_list= gco->next_gco; + else if(!(gco= (group_commit_orderer *)my_malloc(sizeof(*gco), MYF(0)))) + { + my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*gco)); + return NULL; + } + mysql_cond_init(key_COND_group_commit_orderer, + &gco->COND_group_commit_orderer, NULL); + gco->wait_count= wait_count; + gco->prev_gco= prev; + gco->next_gco= NULL; + gco->installed= false; + return gco; +} + + +void +rpl_parallel_thread::free_gco(group_commit_orderer *gco) +{ + mysql_mutex_assert_owner(&LOCK_rpl_thread); + DBUG_ASSERT(!gco->prev_gco /* Must not free until wait has completed. */); + gco->next_gco= gco_free_list; + gco_free_list= gco; +} + + rpl_parallel_thread_pool::rpl_parallel_thread_pool() : count(0), threads(0), free_list(0), changing(false), inited(false) { @@ -651,7 +894,8 @@ rpl_parallel_thread_pool::destroy() Note that we return with the worker threads's LOCK_rpl_thread mutex locked. */ struct rpl_parallel_thread * -rpl_parallel_thread_pool::get_thread(rpl_parallel_entry *entry) +rpl_parallel_thread_pool::get_thread(rpl_parallel_thread **owner, + rpl_parallel_entry *entry) { rpl_parallel_thread *rpt; @@ -661,16 +905,151 @@ rpl_parallel_thread_pool::get_thread(rpl_parallel_entry *entry) free_list= rpt->next; mysql_mutex_unlock(&LOCK_rpl_thread_pool); mysql_mutex_lock(&rpt->LOCK_rpl_thread); + rpt->current_owner= owner; rpt->current_entry= entry; return rpt; } +/* + Release a thread to the thread pool. + The thread should be locked, and should not have any work queued for it. +*/ +void +rpl_parallel_thread_pool::release_thread(rpl_parallel_thread *rpt) +{ + rpl_parallel_thread *list; + + mysql_mutex_assert_owner(&rpt->LOCK_rpl_thread); + DBUG_ASSERT(rpt->current_owner == NULL); + mysql_mutex_lock(&LOCK_rpl_thread_pool); + list= free_list; + rpt->next= list; + free_list= rpt; + if (!list) + mysql_cond_broadcast(&COND_rpl_thread_pool); + mysql_mutex_unlock(&LOCK_rpl_thread_pool); +} + + +/* + Obtain a worker thread that we can queue an event to. + + Each invocation allocates a new worker thread, to maximise + parallelism. However, only up to a maximum of + --slave-domain-parallel-threads workers can be occupied by a single + replication domain; after that point, we start re-using worker threads that + are still executing events that were queued earlier for this thread. + + We never queue more than --rpl-parallel-wait-queue_max amount of events + for one worker, to avoid the SQL driver thread using up all memory with + queued events while worker threads are stalling. + + Note that this function returns with rpl_parallel_thread::LOCK_rpl_thread + locked. Exception is if we were killed, in which case NULL is returned. + + The *did_enter_cond flag is set true if we had to wait for a worker thread + to become free (with mysql_cond_wait()). If so, *old_msg will also be set, + and the LOCK_rpl_thread must be released with THD::EXIT_COND() instead + of mysql_mutex_unlock. + + If the flag `reuse' is set, the last worker thread will be returned again, + if it is still available. Otherwise a new worker thread is allocated. +*/ +rpl_parallel_thread * +rpl_parallel_entry::choose_thread(Relay_log_info *rli, bool *did_enter_cond, + const char **old_msg, bool reuse) +{ + uint32 idx; + rpl_parallel_thread *thr; + + idx= rpl_thread_idx; + if (!reuse) + { + ++idx; + if (idx >= rpl_thread_max) + idx= 0; + rpl_thread_idx= idx; + } + thr= rpl_threads[idx]; + if (thr) + { + *did_enter_cond= false; + mysql_mutex_lock(&thr->LOCK_rpl_thread); + for (;;) + { + if (thr->current_owner != &rpl_threads[idx]) + { + /* + The worker thread became idle, and returned to the free list and + possibly was allocated to a different request. So we should allocate + a new worker thread. + */ + unlock_or_exit_cond(rli->sql_driver_thd, &thr->LOCK_rpl_thread, + did_enter_cond, *old_msg); + thr= NULL; + break; + } + else if (thr->queued_size <= opt_slave_parallel_max_queued) + { + /* The thread is ready to queue into. */ + break; + } + else if (rli->sql_driver_thd->check_killed()) + { + unlock_or_exit_cond(rli->sql_driver_thd, &thr->LOCK_rpl_thread, + did_enter_cond, *old_msg); + my_error(ER_CONNECTION_KILLED, MYF(0)); + DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max", + { + debug_sync_set_action(rli->sql_driver_thd, + STRING_WITH_LEN("now SIGNAL wait_queue_killed")); + };); + slave_output_error_info(rli, rli->sql_driver_thd); + return NULL; + } + else + { + /* + We have reached the limit of how much memory we are allowed to use + for queuing events, so wait for the thread to consume some of its + queue. + */ + if (!*did_enter_cond) + { + /* + We need to do the debug_sync before enter_cond(). + Because debug_sync changes the thd->mysys_var->current_mutex, + and this can cause THD::awake to use the wrong mutex. + */ + DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max", + { + debug_sync_set_action(rli->sql_driver_thd, + STRING_WITH_LEN("now SIGNAL wait_queue_ready")); + };); + *old_msg= rli->sql_driver_thd->enter_cond + (&thr->COND_rpl_thread_queue, &thr->LOCK_rpl_thread, + "Waiting for room in worker thread event queue"); + *did_enter_cond= true; + } + mysql_cond_wait(&thr->COND_rpl_thread_queue, &thr->LOCK_rpl_thread); + } + } + } + if (!thr) + rpl_threads[idx]= thr= global_rpl_thread_pool.get_thread(&rpl_threads[idx], + this); + + return thr; +} + static void free_rpl_parallel_entry(void *element) { rpl_parallel_entry *e= (rpl_parallel_entry *)element; + if (e->current_gco) + dealloc_gco(e->current_gco); mysql_cond_destroy(&e->COND_parallel_entry); mysql_mutex_destroy(&e->LOCK_parallel_entry); my_free(e); @@ -710,10 +1089,22 @@ rpl_parallel::find(uint32 domain_id) (const uchar *)&domain_id, 0))) { /* Allocate a new, empty one. */ - if (!(e= (struct rpl_parallel_entry *)my_malloc(sizeof(*e), - MYF(MY_ZEROFILL)))) + ulong count= opt_slave_domain_parallel_threads; + if (count == 0 || count > opt_slave_parallel_threads) + count= opt_slave_parallel_threads; + rpl_parallel_thread **p; + if (!my_multi_malloc(MYF(MY_WME|MY_ZEROFILL), + &e, sizeof(*e), + &p, count*sizeof(*p), + NULL)) + { + my_error(ER_OUTOFMEMORY, MYF(0), (int)(sizeof(*e)+count*sizeof(*p))); return NULL; + } + e->rpl_threads= p; + e->rpl_thread_max= count; e->domain_id= domain_id; + e->stop_on_error_sub_id= (uint64)ULONGLONG_MAX; if (my_hash_insert(&domain_hash, (uchar *)e)) { my_free(e); @@ -731,10 +1122,11 @@ rpl_parallel::find(uint32 domain_id) void -rpl_parallel::wait_for_done() +rpl_parallel::wait_for_done(THD *thd) { struct rpl_parallel_entry *e; - uint32 i; + rpl_parallel_thread *rpt; + uint32 i, j; /* First signal all workers that they must force quit; no more events will @@ -742,26 +1134,58 @@ rpl_parallel::wait_for_done() */ for (i= 0; i < domain_hash.records; ++i) { - rpl_parallel_thread *rpt; - e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i); + mysql_mutex_lock(&e->LOCK_parallel_entry); + /* + We want the worker threads to stop as quickly as is safe. If the slave + SQL threads are behind, we could have significant amount of events + queued for the workers, and we want to stop without waiting for them + all to be applied first. But if any event group has already started + executing in a worker, we want to be sure that all prior event groups + are also executed, so that we stop at a consistent point in the binlog + stream (per replication domain). + + All event groups wait for e->count_committing_event_groups to reach + the value of group_commit_orderer::wait_count before starting to + execute. Thus, at this point we know that any event group with a + strictly larger wait_count are safe to skip, none of them can have + started executing yet. So we set e->stop_count here and use it to + decide in the worker threads whether to continue executing an event + group or whether to skip it, when force_abort is set. + */ e->force_abort= true; - if ((rpt= e->rpl_thread)) + e->stop_count= e->count_committing_event_groups; + mysql_mutex_unlock(&e->LOCK_parallel_entry); + for (j= 0; j < e->rpl_thread_max; ++j) { - mysql_mutex_lock(&rpt->LOCK_rpl_thread); - if (rpt->current_entry == e) - mysql_cond_signal(&rpt->COND_rpl_thread); - mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + if ((rpt= e->rpl_threads[j])) + { + mysql_mutex_lock(&rpt->LOCK_rpl_thread); + if (rpt->current_owner == &e->rpl_threads[j]) + mysql_cond_signal(&rpt->COND_rpl_thread); + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + } } } + DBUG_EXECUTE_IF("rpl_parallel_wait_for_done_trigger", + { + debug_sync_set_action(thd, + STRING_WITH_LEN("now SIGNAL wait_for_done_waiting")); + };); for (i= 0; i < domain_hash.records; ++i) { e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i); - mysql_mutex_lock(&e->LOCK_parallel_entry); - while (e->current_sub_id > e->last_committed_sub_id) - mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry); - mysql_mutex_unlock(&e->LOCK_parallel_entry); + for (j= 0; j < e->rpl_thread_max; ++j) + { + if ((rpt= e->rpl_threads[j])) + { + mysql_mutex_lock(&rpt->LOCK_rpl_thread); + while (rpt->current_owner == &e->rpl_threads[j]) + mysql_cond_wait(&e->COND_parallel_entry, &rpt->LOCK_rpl_thread); + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + } + } } } @@ -788,6 +1212,21 @@ rpl_parallel::workers_idle() /* + This is used when we get an error during processing in do_event(); + We will not queue any event to the thread, but we still need to wake it up + to be sure that it will be returned to the pool. +*/ +static void +abandon_worker_thread(THD *thd, rpl_parallel_thread *cur_thread, + bool *did_enter_cond, const char *old_msg) +{ + unlock_or_exit_cond(thd, &cur_thread->LOCK_rpl_thread, + did_enter_cond, old_msg); + mysql_cond_signal(&cur_thread->COND_rpl_thread); +} + + +/* do_event() is executed by the sql_driver_thd thread. It's main purpose is to find a thread that can execute the query. @@ -815,6 +1254,10 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, /* Stop queueing additional event groups once the SQL thread is requested to stop. + + We have to queue any remaining events of any event group that has already + been partially queued, but after that we will just ignore any further + events the SQL driver thread may try to queue, and eventually it will stop. */ if (((typ= ev->get_type_code()) == GTID_EVENT || !(is_group_event= Log_event::is_group_event(typ))) && @@ -823,188 +1266,121 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, if (sql_thread_stopping) { delete ev; - /* QQ: Need a better comment why we return false here */ + /* + Return false ("no error"); normal stop is not an error, and otherwise the + error has already been recorded. + */ return false; } - if (!(qev= (rpl_parallel_thread::queued_event *)my_malloc(sizeof(*qev), - MYF(0)))) + if (typ == GTID_EVENT || unlikely(!current)) { - my_error(ER_OUT_OF_RESOURCES, MYF(0)); + uint32 domain_id; + if (likely(typ == GTID_EVENT)) + { + Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev); + domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ? + 0 : gtid_ev->domain_id); + } + else + domain_id= 0; + if (!(e= find(domain_id))) + { + my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME)); + delete ev; + return true; + } + current= e; + } + else + e= current; + + /* + Find a worker thread to queue the event for. + Prefer a new thread, so we maximise parallelism (at least for the group + commit). But do not exceed a limit of --slave-domain-parallel-threads; + instead re-use a thread that we queued for previously. + */ + cur_thread= + e->choose_thread(rli, &did_enter_cond, &old_msg, typ != GTID_EVENT); + if (!cur_thread) + { + /* This means we were killed. The error is already signalled. */ + delete ev; + return true; + } + + if (!(qev= cur_thread->get_qev(ev, event_size, rli))) + { + abandon_worker_thread(rli->sql_driver_thd, cur_thread, + &did_enter_cond, old_msg); delete ev; return true; } - qev->ev= ev; - qev->event_size= event_size; - qev->next= NULL; - strcpy(qev->event_relay_log_name, rli->event_relay_log_name); - qev->event_relay_log_pos= rli->event_relay_log_pos; - qev->future_event_relay_log_pos= rli->future_event_relay_log_pos; - strcpy(qev->future_event_master_log_name, rli->future_event_master_log_name); if (typ == GTID_EVENT) { Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev); - uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ? - 0 : gtid_ev->domain_id); - if (!(e= find(domain_id)) || - !(rgi= new rpl_group_info(rli)) || - event_group_new_gtid(rgi, gtid_ev)) + if (!(rgi= cur_thread->get_rgi(rli, gtid_ev, e))) { - my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME)); - delete rgi; - my_free(qev); + cur_thread->free_qev(qev); + abandon_worker_thread(rli->sql_driver_thd, cur_thread, + &did_enter_cond, old_msg); delete ev; return true; } - rgi->is_parallel_exec = true; - if ((rgi->deferred_events_collecting= rli->mi->rpl_filter->is_on())) - rgi->deferred_events= new Deferred_log_events(rli); - if ((gtid_ev->flags2 & Gtid_log_event::FL_GROUP_COMMIT_ID) && - e->last_commit_id == gtid_ev->commit_id) - { - /* - We are already executing something else in this domain. But the two - event groups were committed together in the same group commit on the - master, so we can still do them in parallel here on the slave. + /* + We queue the event group in a new worker thread, to run in parallel + with previous groups. - However, the commit of this event must wait for the commit of the prior - event, to preserve binlog commit order and visibility across all - servers in the replication hierarchy. + To preserve commit order within the replication domain, we set up + rgi->wait_commit_sub_id to make the new group commit only after the + previous group has committed. - In addition, we must not start executing this event until we have - finished the previous collection of event groups that group-committed - together; we use rgi->wait_start_sub_id to control this. - */ - rpl_parallel_thread *rpt= global_rpl_thread_pool.get_thread(e); - rgi->wait_commit_sub_id= e->current_sub_id; - rgi->wait_commit_group_info= e->current_group_info; - rgi->wait_start_sub_id= e->prev_groupcommit_sub_id; - e->rpl_thread= cur_thread= rpt; - /* get_thread() returns with the LOCK_rpl_thread locked. */ - } - else + Event groups that group-committed together on the master can be run + in parallel with each other without restrictions. But one batch of + group-commits may not start before all groups in the previous batch + have initiated their commit phase; we set up rgi->gco to ensure that. + */ + rgi->wait_commit_sub_id= e->current_sub_id; + rgi->wait_commit_group_info= e->current_group_info; + + if (!((gtid_ev->flags2 & Gtid_log_event::FL_GROUP_COMMIT_ID) && + e->last_commit_id == gtid_ev->commit_id)) { /* - Check if we already have a worker thread for this entry. - - We continue to queue more events up for the worker thread while it is - still executing the first ones, to be able to start executing a large - event group without having to wait for the end to be fetched from the - master. And we continue to queue up more events after the first group, - so that we can continue to process subsequent parts of the relay log in - parallel without having to wait for previous long-running events to - complete. - - But if the worker thread is idle at any point, it may return to the - idle list or start servicing a different request. So check this, and - allocate a new thread if the old one is no longer processing for us. + A new batch of transactions that group-committed together on the master. + + Remember the count that marks the end of the previous group committed + batch, and allocate a new gco. */ - cur_thread= e->rpl_thread; - if (cur_thread) - { - mysql_mutex_lock(&cur_thread->LOCK_rpl_thread); - for (;;) - { - if (cur_thread->current_entry != e) - { - /* - The worker thread became idle, and returned to the free list and - possibly was allocated to a different request. This also means - that everything previously queued has already been executed, - else the worker thread would not have become idle. So we should - allocate a new worker thread. - */ - mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread); - e->rpl_thread= cur_thread= NULL; - break; - } - else if (cur_thread->queued_size <= opt_slave_parallel_max_queued) - break; // The thread is ready to queue into - else if (rli->sql_driver_thd->check_killed()) - { - mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread); - my_error(ER_CONNECTION_KILLED, MYF(0)); - delete rgi; - my_free(qev); - delete ev; - DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max", - { - debug_sync_set_action(rli->sql_driver_thd, - STRING_WITH_LEN("now SIGNAL wait_queue_killed")); - };); - slave_output_error_info(rli, rli->sql_driver_thd); - return true; - } - else - { - /* - We have reached the limit of how much memory we are allowed to - use for queuing events, so wait for the thread to consume some - of its queue. - */ - if (!did_enter_cond) - { - old_msg= rli->sql_driver_thd->enter_cond - (&cur_thread->COND_rpl_thread, &cur_thread->LOCK_rpl_thread, - "Waiting for room in worker thread event queue"); - did_enter_cond= true; - DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max", - { - debug_sync_set_action(rli->sql_driver_thd, - STRING_WITH_LEN("now SIGNAL wait_queue_ready")); - };); - } - mysql_cond_wait(&cur_thread->COND_rpl_thread, - &cur_thread->LOCK_rpl_thread); - } - } - } + uint64 count= e->count_queued_event_groups; + group_commit_orderer *gco; - if (!cur_thread) - { - /* - Nothing else is currently running in this domain. We can - spawn a new thread to do this event group in parallel with - anything else that might be running in other domains. - */ - cur_thread= e->rpl_thread= global_rpl_thread_pool.get_thread(e); - /* get_thread() returns with the LOCK_rpl_thread locked. */ - } - else + if (!(gco= cur_thread->get_gco(count, e->current_gco))) { - /* - We are still executing the previous event group for this replication - domain, and we have to wait for that to finish before we can start on - the next one. So just re-use the thread. - */ + cur_thread->free_rgi(rgi); + cur_thread->free_qev(qev); + abandon_worker_thread(rli->sql_driver_thd, cur_thread, + &did_enter_cond, old_msg); + delete ev; + return true; } - - rgi->wait_commit_sub_id= 0; - rgi->wait_start_sub_id= 0; - e->prev_groupcommit_sub_id= e->current_sub_id; + e->current_gco= rgi->gco= gco; } - + else + rgi->gco= e->current_gco; if (gtid_ev->flags2 & Gtid_log_event::FL_GROUP_COMMIT_ID) - { - e->last_server_id= gtid_ev->server_id; - e->last_seq_no= gtid_ev->seq_no; e->last_commit_id= gtid_ev->commit_id; - } else - { - e->last_server_id= 0; - e->last_seq_no= 0; e->last_commit_id= 0; - } - qev->rgi= e->current_group_info= rgi; e->current_sub_id= rgi->gtid_sub_id; - current= rgi->parallel_entry= e; + ++e->count_queued_event_groups; } - else if (!is_group_event || !current) + else if (!is_group_event || !e) { my_off_t log_pos; int err; @@ -1014,7 +1390,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, Same for events not preceeded by GTID (we should not see those normally, but they might be from an old master). - The varuable `current' is NULL for the case where the master did not + The variable `e' is NULL for the case where the master did not have GTID, like a MariaDB 5.5 or MySQL master. */ qev->rgi= serial_rgi; @@ -1041,18 +1417,11 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, if (err) { - my_free(qev); + cur_thread->free_qev(qev); + abandon_worker_thread(rli->sql_driver_thd, cur_thread, + &did_enter_cond, old_msg); return true; } - qev->ev= NULL; - qev->future_event_master_log_pos= log_pos; - if (!current) - { - rli->event_relay_log_pos= rli->future_event_relay_log_pos; - handle_queued_pos_update(rli->sql_driver_thd, qev); - my_free(qev); - return false; - } /* Queue an empty event, so that the position will be updated in a reasonable way relative to other events: @@ -1065,40 +1434,12 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, least the position will not be updated until one of them has reached the current point. */ - cur_thread= current->rpl_thread; - if (cur_thread) - { - mysql_mutex_lock(&cur_thread->LOCK_rpl_thread); - if (cur_thread->current_entry != current) - { - /* Not ours anymore, we need to grab a new one. */ - mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread); - cur_thread= NULL; - } - } - if (!cur_thread) - cur_thread= current->rpl_thread= - global_rpl_thread_pool.get_thread(current); + qev->ev= NULL; + qev->future_event_master_log_pos= log_pos; } else { - cur_thread= current->rpl_thread; - if (cur_thread) - { - mysql_mutex_lock(&cur_thread->LOCK_rpl_thread); - if (cur_thread->current_entry != current) - { - /* Not ours anymore, we need to grab a new one. */ - mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread); - cur_thread= NULL; - } - } - if (!cur_thread) - { - cur_thread= current->rpl_thread= - global_rpl_thread_pool.get_thread(current); - } - qev->rgi= current->current_group_info; + qev->rgi= e->current_group_info; } /* @@ -1106,10 +1447,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, */ rli->event_relay_log_pos= rli->future_event_relay_log_pos; cur_thread->enqueue(qev); - if (did_enter_cond) - rli->sql_driver_thd->exit_cond(old_msg); - else - mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread); + unlock_or_exit_cond(rli->sql_driver_thd, &cur_thread->LOCK_rpl_thread, + &did_enter_cond, old_msg); mysql_cond_signal(&cur_thread->COND_rpl_thread); return false; diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h index 019a354c57d..90649230f98 100644 --- a/sql/rpl_parallel.h +++ b/sql/rpl_parallel.h @@ -9,16 +9,66 @@ struct rpl_parallel_entry; struct rpl_parallel_thread_pool; class Relay_log_info; + + +/* + Structure used to keep track of the parallel replication of a batch of + event-groups that group-committed together on the master. + + It is used to ensure that every event group in one batch has reached the + commit stage before the next batch starts executing. + + Note the lifetime of this structure: + + - It is allocated when the first event in a new batch of group commits + is queued, from the free list rpl_parallel_entry::gco_free_list. + + - The gco for the batch currently being queued is owned by + rpl_parallel_entry::current_gco. The gco for a previous batch that has + been fully queued is owned by the gco->prev_gco pointer of the gco for + the following batch. + + - The worker thread waits on gco->COND_group_commit_orderer for + rpl_parallel_entry::count_committing_event_groups to reach wait_count + before starting; the first waiter links the gco into the next_gco + pointer of the gco of the previous batch for signalling. + + - When an event group reaches the commit stage, it signals the + COND_group_commit_orderer if its gco->next_gco pointer is non-NULL and + rpl_parallel_entry::count_committing_event_groups has reached + gco->next_gco->wait_count. + + - When gco->wait_count is reached for a worker and the wait completes, + the worker frees gco->prev_gco; at this point it is guaranteed not to + be needed any longer. +*/ +struct group_commit_orderer { + /* Wakeup condition, used with rpl_parallel_entry::LOCK_parallel_entry. */ + mysql_cond_t COND_group_commit_orderer; + uint64 wait_count; + group_commit_orderer *prev_gco; + group_commit_orderer *next_gco; + bool installed; +}; + + struct rpl_parallel_thread { bool delay_start; bool running; bool stop; mysql_mutex_t LOCK_rpl_thread; mysql_cond_t COND_rpl_thread; + mysql_cond_t COND_rpl_thread_queue; struct rpl_parallel_thread *next; /* For free list. */ struct rpl_parallel_thread_pool *pool; THD *thd; - struct rpl_parallel_entry *current_entry; + /* + Who owns the thread, if any (it's a pointer into the + rpl_parallel_entry::rpl_threads array. + */ + struct rpl_parallel_thread **current_owner; + /* The rpl_parallel_entry of the owner. */ + rpl_parallel_entry *current_entry; struct queued_event { queued_event *next; Log_event *ev; @@ -31,6 +81,9 @@ struct rpl_parallel_thread { size_t event_size; } *event_queue, *last_in_queue; uint64 queued_size; + queued_event *qev_free_list; + rpl_group_info *rgi_free_list; + group_commit_orderer *gco_free_list; void enqueue(queued_event *qev) { @@ -42,15 +95,25 @@ struct rpl_parallel_thread { queued_size+= qev->event_size; } - void dequeue(queued_event *list) + void dequeue1(queued_event *list) { - queued_event *tmp; - DBUG_ASSERT(list == event_queue); event_queue= last_in_queue= NULL; - for (tmp= list; tmp; tmp= tmp->next) - queued_size-= tmp->event_size; } + + void dequeue2(size_t dequeue_size) + { + queued_size-= dequeue_size; + } + + queued_event *get_qev(Log_event *ev, ulonglong event_size, + Relay_log_info *rli); + void free_qev(queued_event *qev); + rpl_group_info *get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev, + rpl_parallel_entry *e); + void free_rgi(rpl_group_info *rgi); + group_commit_orderer *get_gco(uint64 wait_count, group_commit_orderer *prev); + void free_gco(group_commit_orderer *gco); }; @@ -66,14 +129,16 @@ struct rpl_parallel_thread_pool { rpl_parallel_thread_pool(); int init(uint32 size); void destroy(); - struct rpl_parallel_thread *get_thread(rpl_parallel_entry *entry); + struct rpl_parallel_thread *get_thread(rpl_parallel_thread **owner, + rpl_parallel_entry *entry); + void release_thread(rpl_parallel_thread *rpt); }; struct rpl_parallel_entry { + mysql_mutex_t LOCK_parallel_entry; + mysql_cond_t COND_parallel_entry; uint32 domain_id; - uint32 last_server_id; - uint64 last_seq_no; uint64 last_commit_id; bool active; /* @@ -82,15 +147,41 @@ struct rpl_parallel_entry { waiting for event groups to complete. */ bool force_abort; + /* + At STOP SLAVE (force_abort=true), we do not want to process all events in + the queue (which could unnecessarily delay stop, if a lot of events happen + to be queued). The stop_count provides a safe point at which to stop, so + that everything before becomes committed and nothing after does. The value + corresponds to group_commit_orderer::wait_count; if wait_count is less than + or equal to stop_count, we execute the associated event group, else we + skip it (and all following) and stop. + */ + uint64 stop_count; - rpl_parallel_thread *rpl_thread; + /* + Cyclic array recording the last rpl_thread_max worker threads that we + queued event for. This is used to limit how many workers a single domain + can occupy (--slave-domain-parallel-threads). + + Note that workers are never explicitly deleted from the array. Instead, + we need to check (under LOCK_rpl_thread) that the thread still belongs + to us before re-using (rpl_thread::current_owner). + */ + rpl_parallel_thread **rpl_threads; + uint32 rpl_thread_max; + uint32 rpl_thread_idx; /* The sub_id of the last transaction to commit within this domain_id. Must be accessed under LOCK_parallel_entry protection. + + Event groups commit in order, so the rpl_group_info for an event group + will be alive (at least) as long as + rpl_grou_info::gtid_sub_id > last_committed_sub_id. This can be used to + safely refer back to previous event groups if they are still executing, + and ignore them if they completed, without requiring explicit + synchronisation between the threads. */ uint64 last_committed_sub_id; - mysql_mutex_t LOCK_parallel_entry; - mysql_cond_t COND_parallel_entry; /* The sub_id of the last event group in this replication domain that was queued for execution by a worker thread. @@ -98,14 +189,29 @@ struct rpl_parallel_entry { uint64 current_sub_id; rpl_group_info *current_group_info; /* - The sub_id of the last event group in the previous batch of group-committed - transactions. - - When we spawn parallel worker threads for the next group-committed batch, - they first need to wait for this sub_id to be committed before it is safe - to start executing them. + If we get an error in some event group, we set the sub_id of that event + group here. Then later event groups (with higher sub_id) can know not to + try to start (event groups that already started will be rolled back when + wait_for_prior_commit() returns error). + The value is ULONGLONG_MAX when no error occured. + */ + uint64 stop_on_error_sub_id; + /* Total count of event groups queued so far. */ + uint64 count_queued_event_groups; + /* + Count of event groups that have started (but not necessarily completed) + the commit phase. We use this to know when every event group in a previous + batch of master group commits have started committing on the slave, so + that it is safe to start executing the events in the following batch. */ - uint64 prev_groupcommit_sub_id; + uint64 count_committing_event_groups; + /* The group_commit_orderer object for the events currently being queued. */ + group_commit_orderer *current_gco; + + rpl_parallel_thread * choose_thread(Relay_log_info *rli, bool *did_enter_cond, + const char **old_msg, bool reuse); + group_commit_orderer *get_gco(); + void free_gco(group_commit_orderer *gco); }; struct rpl_parallel { HASH domain_hash; @@ -116,7 +222,7 @@ struct rpl_parallel { ~rpl_parallel(); void reset(); rpl_parallel_entry *find(uint32 domain_id); - void wait_for_done(); + void wait_for_done(THD *thd); bool workers_idle(); bool do_event(rpl_group_info *serial_rgi, Log_event *ev, ulonglong event_size); diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index 797f5681ec5..ffe5e516069 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -1479,14 +1479,27 @@ end: } -rpl_group_info::rpl_group_info(Relay_log_info *rli_) - : rli(rli_), thd(0), gtid_sub_id(0), wait_commit_sub_id(0), - wait_commit_group_info(0), wait_start_sub_id(0), parallel_entry(0), - deferred_events(NULL), m_annotate_event(0), tables_to_lock(0), - tables_to_lock_count(0), trans_retries(0), last_event_start_time(0), - is_parallel_exec(false), is_error(false), - row_stmt_start_timestamp(0), long_find_row_note_printed(false) +void +rpl_group_info::reinit(Relay_log_info *rli) +{ + this->rli= rli; + tables_to_lock= NULL; + tables_to_lock_count= 0; + trans_retries= 0; + last_event_start_time= 0; + is_error= false; + row_stmt_start_timestamp= 0; + long_find_row_note_printed= false; + did_mark_start_commit= false; + commit_orderer.reinit(); +} + +rpl_group_info::rpl_group_info(Relay_log_info *rli) + : thd(0), gtid_sub_id(0), wait_commit_sub_id(0), + wait_commit_group_info(0), parallel_entry(0), + deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false) { + reinit(rli); bzero(¤t_gtid, sizeof(current_gtid)); mysql_mutex_init(key_rpl_group_info_sleep_lock, &sleep_lock, MY_MUTEX_INIT_FAST); @@ -1706,4 +1719,40 @@ void rpl_group_info::slave_close_thread_tables(THD *thd) } + +static void +mark_start_commit_inner(rpl_parallel_entry *e, group_commit_orderer *gco) +{ + uint64 count= ++e->count_committing_event_groups; + if (gco->next_gco && gco->next_gco->wait_count == count) + mysql_cond_broadcast(&gco->next_gco->COND_group_commit_orderer); +} + + +void +rpl_group_info::mark_start_commit_no_lock() +{ + if (did_mark_start_commit) + return; + mark_start_commit_inner(parallel_entry, gco); + did_mark_start_commit= true; +} + + +void +rpl_group_info::mark_start_commit() +{ + rpl_parallel_entry *e; + + if (did_mark_start_commit) + return; + + e= this->parallel_entry; + mysql_mutex_lock(&e->LOCK_parallel_entry); + mark_start_commit_inner(e, gco); + mysql_mutex_unlock(&e->LOCK_parallel_entry); + did_mark_start_commit= true; +} + + #endif diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h index 3f95849a926..0ba259b0efd 100644 --- a/sql/rpl_rli.h +++ b/sql/rpl_rli.h @@ -481,6 +481,7 @@ private: struct rpl_group_info { + rpl_group_info *next; /* For free list in rpl_parallel_thread */ Relay_log_info *rli; THD *thd; /* @@ -510,14 +511,15 @@ struct rpl_group_info uint64 wait_commit_sub_id; rpl_group_info *wait_commit_group_info; /* - If non-zero, the event group must wait for this sub_id to be committed - before the execution of the event group is allowed to start. + This holds a pointer to a struct that keeps track of the need to wait + for the previous batch of event groups to reach the commit stage, before + this batch can start to execute. (When we execute in parallel the transactions that group committed together on the master, we still need to wait for any prior transactions - to have commtted). + to have reached the commit stage). */ - uint64 wait_start_sub_id; + group_commit_orderer *gco; struct rpl_parallel_entry *parallel_entry; @@ -567,18 +569,22 @@ struct rpl_group_info char future_event_master_log_name[FN_REFLEN]; bool is_parallel_exec; bool is_error; + /* + Set true when we signalled that we reach the commit phase. Used to avoid + counting one event group twice. + */ + bool did_mark_start_commit; -private: /* Runtime state for printing a note when slave is taking too long while processing a row event. */ time_t row_stmt_start_timestamp; bool long_find_row_note_printed; -public: rpl_group_info(Relay_log_info *rli_); ~rpl_group_info(); + void reinit(Relay_log_info *rli); /* Returns true if the argument event resides in the containter; @@ -661,6 +667,8 @@ public: void clear_tables_to_lock(); void cleanup_context(THD *, bool); void slave_close_thread_tables(THD *); + void mark_start_commit_no_lock(); + void mark_start_commit(); time_t get_row_stmt_start_timestamp() { diff --git a/sql/slave.cc b/sql/slave.cc index 07209c30ff3..b081a3369f5 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -330,7 +330,7 @@ run_slave_init_thread() pthread_t th; slave_init_thread_running= true; - if (mysql_thread_create(key_thread_slave_init, &th, NULL, + if (mysql_thread_create(key_thread_slave_init, &th, &connection_attrib, handle_slave_init, NULL)) { sql_print_error("Failed to create thread while initialising slave"); @@ -4526,7 +4526,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME, } if (opt_slave_parallel_threads > 0) - rli->parallel.wait_for_done(); + rli->parallel.wait_for_done(thd); /* Thread stopped. Print the current replication position to the log */ { @@ -4552,7 +4552,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME, get the correct position printed.) */ if (opt_slave_parallel_threads > 0) - rli->parallel.wait_for_done(); + rli->parallel.wait_for_done(thd); /* Some events set some playgrounds, which won't be cleared because thread diff --git a/sql/sql_class.cc b/sql/sql_class.cc index 09ec7361fe3..b9e1bf91888 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -5667,14 +5667,23 @@ bool THD::rgi_have_temporary_tables() } +void +wait_for_commit::reinit() +{ + subsequent_commits_list= NULL; + next_subsequent_commit= NULL; + waitee= NULL; + opaque_pointer= NULL; + wakeup_error= 0; + wakeup_subsequent_commits_running= false; +} + + wait_for_commit::wait_for_commit() - : subsequent_commits_list(0), next_subsequent_commit(0), waitee(0), - opaque_pointer(0), - waiting_for_commit(false), wakeup_error(0), - wakeup_subsequent_commits_running(false) { mysql_mutex_init(key_LOCK_wait_commit, &LOCK_wait_commit, MY_MUTEX_INIT_FAST); mysql_cond_init(key_COND_wait_commit, &COND_wait_commit, 0); + reinit(); } @@ -5722,7 +5731,7 @@ wait_for_commit::wakeup(int wakeup_error) */ mysql_mutex_lock(&LOCK_wait_commit); - waiting_for_commit= false; + waitee= NULL; this->wakeup_error= wakeup_error; /* Note that it is critical that the mysql_cond_signal() here is done while @@ -5754,9 +5763,8 @@ wait_for_commit::wakeup(int wakeup_error) void wait_for_commit::register_wait_for_prior_commit(wait_for_commit *waitee) { - waiting_for_commit= true; - wakeup_error= 0; DBUG_ASSERT(!this->waitee /* No prior registration allowed */); + wakeup_error= 0; this->waitee= waitee; mysql_mutex_lock(&waitee->LOCK_wait_commit); @@ -5766,7 +5774,7 @@ wait_for_commit::register_wait_for_prior_commit(wait_for_commit *waitee) see comments on wakeup_subsequent_commits2() for details. */ if (waitee->wakeup_subsequent_commits_running) - waiting_for_commit= false; + this->waitee= NULL; else { /* @@ -5795,9 +5803,9 @@ wait_for_commit::wait_for_prior_commit2(THD *thd) DEBUG_SYNC(thd, "wait_for_prior_commit_waiting"); old_msg= thd->enter_cond(&COND_wait_commit, &LOCK_wait_commit, "Waiting for prior transaction to commit"); - while (waiting_for_commit && !thd->check_killed()) + while ((loc_waitee= this->waitee) && !thd->check_killed()) mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit); - if (!waiting_for_commit) + if (!loc_waitee) { if (wakeup_error) my_error(ER_PRIOR_COMMIT_FAILED, MYF(0)); @@ -5810,7 +5818,6 @@ wait_for_commit::wait_for_prior_commit2(THD *thd) waiter as to whether we succeed or fail (eg. we may roll back but waitee might attempt to commit both us and any subsequent commits waiting for us). */ - loc_waitee= this->waitee; mysql_mutex_lock(&loc_waitee->LOCK_wait_commit); if (loc_waitee->wakeup_subsequent_commits_running) { @@ -5819,21 +5826,29 @@ wait_for_commit::wait_for_prior_commit2(THD *thd) do { mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit); - } while (waiting_for_commit); + } while (this->waitee); + if (wakeup_error) + my_error(ER_PRIOR_COMMIT_FAILED, MYF(0)); goto end; } remove_from_list(&loc_waitee->subsequent_commits_list); mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit); + this->waitee= NULL; - DEBUG_SYNC(thd, "wait_for_prior_commit_killed"); wakeup_error= thd->killed_errno(); if (!wakeup_error) wakeup_error= ER_QUERY_INTERRUPTED; my_message(wakeup_error, ER(wakeup_error), MYF(0)); + thd->exit_cond(old_msg); + /* + Must do the DEBUG_SYNC() _after_ exit_cond(), as DEBUG_SYNC is not safe to + use within enter_cond/exit_cond. + */ + DEBUG_SYNC(thd, "wait_for_prior_commit_killed"); + return wakeup_error; end: thd->exit_cond(old_msg); - waitee= NULL; return wakeup_error; } @@ -5916,10 +5931,11 @@ wait_for_commit::wakeup_subsequent_commits2(int wakeup_error) void wait_for_commit::unregister_wait_for_prior_commit2() { + wait_for_commit *loc_waitee; + mysql_mutex_lock(&LOCK_wait_commit); - if (waiting_for_commit) + if ((loc_waitee= this->waitee)) { - wait_for_commit *loc_waitee= this->waitee; mysql_mutex_lock(&loc_waitee->LOCK_wait_commit); if (loc_waitee->wakeup_subsequent_commits_running) { @@ -5931,7 +5947,7 @@ wait_for_commit::unregister_wait_for_prior_commit2() See comments on wakeup_subsequent_commits2() for more details. */ mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit); - while (waiting_for_commit) + while (this->waitee) mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit); } else @@ -5939,10 +5955,10 @@ wait_for_commit::unregister_wait_for_prior_commit2() /* Remove ourselves from the list in the waitee. */ remove_from_list(&loc_waitee->subsequent_commits_list); mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit); + this->waitee= NULL; } } mysql_mutex_unlock(&LOCK_wait_commit); - this->waitee= NULL; } diff --git a/sql/sql_class.h b/sql/sql_class.h index 726b920edb1..57dd92db49d 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -1598,8 +1598,8 @@ struct wait_for_commit { /* The LOCK_wait_commit protects the fields subsequent_commits_list and - wakeup_subsequent_commits_running (for a waitee), and the flag - waiting_for_commit and associated COND_wait_commit (for a waiter). + wakeup_subsequent_commits_running (for a waitee), and the pointer + waiterr and associated COND_wait_commit (for a waiter). */ mysql_mutex_t LOCK_wait_commit; mysql_cond_t COND_wait_commit; @@ -1607,7 +1607,13 @@ struct wait_for_commit wait_for_commit *subsequent_commits_list; /* Link field for entries in subsequent_commits_list. */ wait_for_commit *next_subsequent_commit; - /* Our waitee, if we did register_wait_for_prior_commit(), else NULL. */ + /* + Our waitee, if we did register_wait_for_prior_commit(), and were not + yet woken up. Else NULL. + + When this is cleared for wakeup, the COND_wait_commit condition is + signalled. + */ wait_for_commit *waitee; /* Generic pointer for use by the transaction coordinator to optimise the @@ -1618,12 +1624,6 @@ struct wait_for_commit used by another transaction coordinator for similar purposes. */ void *opaque_pointer; - /* - The waiting_for_commit flag is cleared when a waiter has been woken - up. The COND_wait_commit condition is signalled when this has been - cleared. - */ - bool waiting_for_commit; /* The wakeup error code from the waitee. 0 means no error. */ int wakeup_error; /* @@ -1639,10 +1639,14 @@ struct wait_for_commit Quick inline check, to avoid function call and locking in the common case where no wakeup is registered, or a registered wait was already signalled. */ - if (waiting_for_commit) + if (waitee) return wait_for_prior_commit2(thd); else + { + if (wakeup_error) + my_error(ER_PRIOR_COMMIT_FAILED, MYF(0)); return wakeup_error; + } } void wakeup_subsequent_commits(int wakeup_error) { @@ -1663,7 +1667,7 @@ struct wait_for_commit } void unregister_wait_for_prior_commit() { - if (waiting_for_commit) + if (waitee) unregister_wait_for_prior_commit2(); } /* @@ -1683,7 +1687,7 @@ struct wait_for_commit } next_ptr_ptr= &cur->next_subsequent_commit; } - waiting_for_commit= false; + waitee= NULL; } void wakeup(int wakeup_error); @@ -1694,6 +1698,7 @@ struct wait_for_commit wait_for_commit(); ~wait_for_commit(); + void reinit(); }; diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index 69d93968f9f..35e1cd457ee 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -1609,6 +1609,49 @@ static Sys_var_ulong Sys_slave_parallel_threads( ON_UPDATE(fix_slave_parallel_threads)); +static bool +check_slave_domain_parallel_threads(sys_var *self, THD *thd, set_var *var) +{ + bool running; + + mysql_mutex_lock(&LOCK_active_mi); + running= master_info_index->give_error_if_slave_running(); + mysql_mutex_unlock(&LOCK_active_mi); + if (running) + return true; + + return false; +} + +static bool +fix_slave_domain_parallel_threads(sys_var *self, THD *thd, enum_var_type type) +{ + bool running; + + mysql_mutex_unlock(&LOCK_global_system_variables); + mysql_mutex_lock(&LOCK_active_mi); + running= master_info_index->give_error_if_slave_running(); + mysql_mutex_unlock(&LOCK_active_mi); + mysql_mutex_lock(&LOCK_global_system_variables); + + return running ? true : false; +} + + +static Sys_var_ulong Sys_slave_domain_parallel_threads( + "slave_domain_parallel_threads", + "Maximum number of parallel threads to use on slave for events in a " + "single replication domain. When using multiple domains, this can be " + "used to limit a single domain from grabbing all threads and thus " + "stalling other domains. The default of 0 means to allow a domain to " + "grab as many threads as it wants, up to the value of " + "slave_parallel_threads.", + GLOBAL_VAR(opt_slave_domain_parallel_threads), CMD_LINE(REQUIRED_ARG), + VALID_RANGE(0,16383), DEFAULT(0), BLOCK_SIZE(1), NO_MUTEX_GUARD, + NOT_IN_BINLOG, ON_CHECK(check_slave_domain_parallel_threads), + ON_UPDATE(fix_slave_domain_parallel_threads)); + + static Sys_var_ulong Sys_slave_parallel_max_queued( "slave_parallel_max_queued", "Limit on how much memory SQL threads should use per parallel " |