diff options
44 files changed, 1521 insertions, 533 deletions
diff --git a/include/my_global.h b/include/my_global.h index c3d9ac21861..e9a472e686e 100644 --- a/include/my_global.h +++ b/include/my_global.h @@ -144,6 +144,7 @@ /* Workaround for _LARGE_FILES and _LARGE_FILE_API incompatibility on AIX */ #if defined(_AIX) && defined(_LARGE_FILE_API) #undef _LARGE_FILE_API +#undef __GNUG__ #endif /* @@ -264,6 +265,16 @@ #endif #endif + +#ifdef _AIX +/* + AIX includes inttypes.h from sys/types.h + Explicitly request format macros before the first inclusion of inttypes.h +*/ +#define __STDC_FORMAT_MACROS +#endif + + #if !defined(__WIN__) #ifndef _POSIX_PTHREAD_SEMANTICS #define _POSIX_PTHREAD_SEMANTICS /* We want posix threads */ @@ -316,6 +327,13 @@ C_MODE_END #define _LONG_LONG 1 /* For AIX string library */ #endif +/* Workaround for _LARGE_FILES and _LARGE_FILE_API incompatibility on AIX */ +#if defined(_AIX) && defined(_LARGE_FILE_API) +#undef _LARGE_FILE_API +#undef __GNUG__ +#endif + + #ifndef stdin #include <stdio.h> #endif @@ -341,6 +359,14 @@ C_MODE_END #ifdef HAVE_SYS_TYPES_H #include <sys/types.h> #endif + +/* Workaround for _LARGE_FILES and _LARGE_FILE_API incompatibility on AIX */ +#if defined(_AIX) && defined(_LARGE_FILE_API) +#undef _LARGE_FILE_API +#undef __GNUG__ +#endif + + #ifdef HAVE_FCNTL_H #include <fcntl.h> #endif @@ -1214,4 +1240,10 @@ static inline double rint(double x) #define HAVE_EXTERNAL_CLIENT #endif /* EMBEDDED_LIBRARY */ +/* Workaround for _LARGE_FILES and _LARGE_FILE_API incompatibility on AIX */ +#if defined(_AIX) && defined(_LARGE_FILE_API) +#undef _LARGE_FILE_API +#undef __GNUG__ +#endif + #endif /* my_global_h */ diff --git a/mysql-test/r/mysqld--help.result b/mysql-test/r/mysqld--help.result index 9b2b6e12a14..19f982ef554 100644 --- a/mysql-test/r/mysqld--help.result +++ b/mysql-test/r/mysqld--help.result @@ -867,6 +867,14 @@ The following options may be given as the first argument: operations that are idempotent. 
This means that CREATE TABLE is treated CREATE TABLE OR REPLACE and DROP TABLE is threated as DROP TABLE IF EXISTS. + --slave-domain-parallel-threads=# + Maximum number of parallel threads to use on slave for + events in a single replication domain. When using + multiple domains, this can be used to limit a single + domain from grabbing all threads and thus stalling other + domains. The default of 0 means to allow a domain to grab + as many threads as it wants, up to the value of + slave_parallel_threads. --slave-exec-mode=name Modes for how replication events should be executed. Legal values are STRICT (default) and IDEMPOTENT. In @@ -1275,6 +1283,7 @@ skip-show-database FALSE skip-slave-start FALSE slave-compressed-protocol FALSE slave-ddl-exec-mode IDEMPOTENT +slave-domain-parallel-threads 0 slave-exec-mode STRICT slave-max-allowed-packet 1073741824 slave-net-timeout 3600 diff --git a/mysql-test/suite/csv/csv.result b/mysql-test/suite/csv/csv.result index fc6aab530c7..8d497f52b31 100644 --- a/mysql-test/suite/csv/csv.result +++ b/mysql-test/suite/csv/csv.result @@ -5485,3 +5485,11 @@ SELECT * FROM t1; ERROR HY000: Table 't1' is marked as crashed and should be repaired DROP TABLE t1; End of 5.1 tests +# +# MDEV-5612 - my_rename() deletes files when it shouldn't +# +CREATE TABLE t1(a INT NOT NULL) ENGINE=CSV; +RENAME TABLE t1 TO t2; +SELECT * FROM t2; +a +DROP TABLE t2; diff --git a/mysql-test/suite/csv/csv.test b/mysql-test/suite/csv/csv.test index 768a21912a2..90617d06599 100644 --- a/mysql-test/suite/csv/csv.test +++ b/mysql-test/suite/csv/csv.test @@ -1917,3 +1917,12 @@ SELECT * FROM t1; DROP TABLE t1; --echo End of 5.1 tests + +--echo # +--echo # MDEV-5612 - my_rename() deletes files when it shouldn't +--echo # +CREATE TABLE t1(a INT NOT NULL) ENGINE=CSV; +move_file $MYSQLD_DATADIR/test/t1.CSV $MYSQLD_DATADIR/test/t2.CSV; +RENAME TABLE t1 TO t2; +SELECT * FROM t2; +DROP TABLE t2; diff --git a/mysql-test/suite/perfschema/r/dml_setup_instruments.result 
b/mysql-test/suite/perfschema/r/dml_setup_instruments.result index ec4bfb5b338..c2a9be78385 100644 --- a/mysql-test/suite/perfschema/r/dml_setup_instruments.result +++ b/mysql-test/suite/perfschema/r/dml_setup_instruments.result @@ -37,15 +37,15 @@ where name like 'Wait/Synch/Cond/sql/%' order by name limit 10; NAME ENABLED TIMED wait/synch/cond/sql/COND_flush_thread_cache YES YES +wait/synch/cond/sql/COND_group_commit_orderer YES YES wait/synch/cond/sql/COND_manager YES YES wait/synch/cond/sql/COND_parallel_entry YES YES wait/synch/cond/sql/COND_prepare_ordered YES YES wait/synch/cond/sql/COND_queue_state YES YES wait/synch/cond/sql/COND_rpl_thread YES YES wait/synch/cond/sql/COND_rpl_thread_pool YES YES +wait/synch/cond/sql/COND_rpl_thread_queue YES YES wait/synch/cond/sql/COND_server_started YES YES -wait/synch/cond/sql/COND_thread_cache YES YES -wait/synch/cond/sql/COND_thread_count YES YES select * from performance_schema.setup_instruments where name='Wait'; select * from performance_schema.setup_instruments diff --git a/mysql-test/suite/rpl/r/rpl_parallel.result b/mysql-test/suite/rpl/r/rpl_parallel.result index 7dae4de075c..b04d9e7748e 100644 --- a/mysql-test/suite/rpl/r/rpl_parallel.result +++ b/mysql-test/suite/rpl/r/rpl_parallel.result @@ -115,6 +115,7 @@ SET GLOBAL slave_parallel_threads=10; SET debug_sync='RESET'; include/start_slave.inc *** Test that group-committed transactions on the master can replicate in parallel on the slave. 
*** +SET debug_sync='RESET'; FLUSH LOGS; CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB; INSERT INTO t3 VALUES (1,1), (3,3), (5,5), (7,7); @@ -141,6 +142,7 @@ INSERT INTO t3 VALUES (6, foo(16, '')); SET debug_sync='now WAIT_FOR master_queued3'; SET debug_sync='now SIGNAL master_cont1'; +SET debug_sync='RESET'; SELECT * FROM t3 ORDER BY a; a b 1 1 @@ -213,6 +215,9 @@ slave-bin.000003 # Query # # use `test`; INSERT INTO t3 VALUES (6, foo(16, slave-bin.000003 # Xid # # COMMIT /* XID */ *** Test STOP SLAVE in parallel mode *** include/stop_slave.inc +SET debug_sync='RESET'; +SET GLOBAL slave_parallel_threads=0; +SET GLOBAL slave_parallel_threads=10; SET binlog_direct_non_transactional_updates=0; SET sql_log_bin=0; CALL mtr.add_suppression("Statement is unsafe because it accesses a non-transactional table after accessing a transactional table within the same transaction"); @@ -227,10 +232,15 @@ INSERT INTO t3 VALUES(21, 21); INSERT INTO t3 VALUES(22, 22); SET binlog_format=@old_format; BEGIN; -INSERT INTO t2 VALUES (21); +INSERT INTO t2 VALUES (21); START SLAVE; +SET @old_dbug= @@GLOBAL.debug_dbug; +SET GLOBAL debug_dbug="+d,rpl_parallel_wait_for_done_trigger"; STOP SLAVE; +SET debug_sync='now WAIT_FOR wait_for_done_waiting'; ROLLBACK; +SET GLOBAL debug_dbug=@old_dbug; +SET debug_sync='RESET'; include/wait_for_slave_to_stop.inc SELECT * FROM t1 WHERE a >= 20 ORDER BY a; a @@ -292,6 +302,7 @@ a b 32 32 33 33 34 34 +SET debug_sync='RESET'; SET sql_log_bin=0; CALL mtr.add_suppression("Query execution was interrupted"); CALL mtr.add_suppression("Commit failed due to failure of an earlier commit on which this one depends"); @@ -308,6 +319,7 @@ STOP SLAVE IO_THREAD; SELECT * FROM t3 WHERE a >= 30 ORDER BY a; a b 31 31 +SET debug_sync='RESET'; SET GLOBAL slave_parallel_threads=0; SET GLOBAL slave_parallel_threads=10; SET sql_log_bin=0; @@ -379,6 +391,7 @@ a b 42 42 43 43 44 44 +SET debug_sync='RESET'; SET debug_sync='now WAIT_FOR t2_query'; SET debug_sync='now SIGNAL 
t2_cont'; SET debug_sync='now WAIT_FOR t1_ready'; @@ -386,9 +399,7 @@ KILL THD_ID; SET debug_sync='now WAIT_FOR t2_killed'; SET debug_sync='now SIGNAL t1_cont'; include/wait_for_slave_sql_error.inc [errno=1317,1964] -SELECT * FROM t3 WHERE a >= 40 ORDER BY a; -a b -41 41 +SET debug_sync='RESET'; SET GLOBAL slave_parallel_threads=0; SET GLOBAL slave_parallel_threads=10; SET sql_log_bin=0; @@ -463,6 +474,7 @@ a b 52 52 53 53 54 54 +SET debug_sync='RESET'; SET debug_sync='now WAIT_FOR t2_query'; SET debug_sync='now SIGNAL t2_cont'; SET debug_sync='now WAIT_FOR t1_ready'; @@ -473,6 +485,7 @@ include/wait_for_slave_sql_error.inc [errno=1317,1964] SELECT * FROM t3 WHERE a >= 50 ORDER BY a; a b 51 51 +SET debug_sync='RESET'; SET GLOBAL slave_parallel_threads=0; SET GLOBAL slave_parallel_threads=10; SET sql_log_bin=0; @@ -514,14 +527,18 @@ include/start_slave.inc include/stop_slave.inc SET GLOBAL binlog_format=@old_format; SET GLOBAL slave_parallel_threads=0; -SET GLOBAL slave_parallel_threads=3; +SET GLOBAL slave_parallel_threads=4; include/start_slave.inc *** 4. 
Test killing thread that is waiting to start transaction until previous transaction commits *** SET binlog_format=statement; SET gtid_domain_id=2; +BEGIN; +INSERT INTO t3 VALUES (70, foo(70, +'rpl_parallel_start_waiting_for_prior SIGNAL t4_waiting', '')); INSERT INTO t3 VALUES (60, foo(60, 'ha_write_row_end SIGNAL d2_query WAIT_FOR d2_cont2', 'rpl_parallel_end_of_group SIGNAL d2_done WAIT_FOR d2_cont')); +COMMIT; SET gtid_domain_id=0; SET debug_sync='now WAIT_FOR d2_query'; SET gtid_domain_id=1; @@ -540,15 +557,27 @@ INSERT INTO t3 VALUES (63, foo(63, 'ha_write_row_end SIGNAL d0_query WAIT_FOR d0_cont2', 'rpl_parallel_end_of_group SIGNAL d0_done WAIT_FOR d0_cont')); SET debug_sync='now WAIT_FOR d0_query'; +SET gtid_domain_id=3; +BEGIN; +INSERT INTO t3 VALUES (68, foo(68, +'rpl_parallel_start_waiting_for_prior SIGNAL t2_waiting', '')); +INSERT INTO t3 VALUES (69, foo(69, +'ha_write_row_end SIGNAL d3_query WAIT_FOR d3_cont2', +'rpl_parallel_end_of_group SIGNAL d3_done WAIT_FOR d3_cont')); +COMMIT; +SET gtid_domain_id=0; +SET debug_sync='now WAIT_FOR d3_query'; SET debug_sync='now SIGNAL d2_cont2'; SET debug_sync='now WAIT_FOR d2_done'; SET debug_sync='now SIGNAL d1_cont2'; SET debug_sync='now WAIT_FOR d1_done'; SET debug_sync='now SIGNAL d0_cont2'; SET debug_sync='now WAIT_FOR d0_done'; +SET debug_sync='now SIGNAL d3_cont2'; +SET debug_sync='now WAIT_FOR d3_done'; SET binlog_format=statement; INSERT INTO t3 VALUES (64, foo(64, -'commit_before_prepare_ordered SIGNAL t1_waiting WAIT_FOR t1_cont', '')); +'rpl_parallel_before_mark_start_commit SIGNAL t1_waiting WAIT_FOR t1_cont', '')); SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2 WAIT_FOR master_cont2'; INSERT INTO t3 VALUES (65, foo(65, '', '')); SET debug_sync='now WAIT_FOR master_queued2'; @@ -569,23 +598,34 @@ a b 65 65 66 66 67 67 +68 68 +69 69 +70 70 +SET debug_sync='RESET'; SET debug_sync='now SIGNAL d0_cont'; SET debug_sync='now WAIT_FOR t1_waiting'; +SET debug_sync='now SIGNAL 
d3_cont'; +SET debug_sync='now WAIT_FOR t2_waiting'; SET debug_sync='now SIGNAL d1_cont'; SET debug_sync='now WAIT_FOR t3_waiting'; SET debug_sync='now SIGNAL d2_cont'; +SET debug_sync='now WAIT_FOR t4_waiting'; KILL THD_ID; SET debug_sync='now WAIT_FOR t3_killed'; SET debug_sync='now SIGNAL t1_cont'; include/wait_for_slave_sql_error.inc [errno=1317,1927,1964] STOP SLAVE IO_THREAD; -SELECT * FROM t3 WHERE a >= 60 ORDER BY a; +SELECT * FROM t3 WHERE a >= 60 AND a != 65 ORDER BY a; a b 60 60 61 61 62 62 63 63 64 64 +68 68 +69 69 +70 70 +SET debug_sync='RESET'; SET GLOBAL slave_parallel_threads=0; SET GLOBAL slave_parallel_threads=10; SET sql_log_bin=0; @@ -597,11 +637,11 @@ RETURN x; END || SET sql_log_bin=1; -INSERT INTO t3 VALUES (69,0); +UPDATE t3 SET b=b+1 WHERE a=60; include/start_slave.inc SELECT * FROM t3 WHERE a >= 60 ORDER BY a; a b -60 60 +60 61 61 61 62 62 63 63 @@ -609,7 +649,9 @@ a b 65 65 66 66 67 67 -69 0 +68 68 +69 69 +70 70 SET sql_log_bin=0; DROP FUNCTION foo; CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500)) @@ -634,37 +676,31 @@ include/start_slave.inc SET @old_max_queued= @@GLOBAL.slave_parallel_max_queued; SET GLOBAL slave_parallel_max_queued=9000; SET binlog_format=statement; -INSERT INTO t3 VALUES (70, foo(0, +INSERT INTO t3 VALUES (80, foo(0, 'ha_write_row_end SIGNAL query_waiting WAIT_FOR query_cont', '')); SET debug_sync='now WAIT_FOR query_waiting'; SET @old_dbug= @@GLOBAL.debug_dbug; SET GLOBAL debug_dbug="+d,rpl_parallel_wait_queue_max"; -INSERT INTO t3 VALUES (72, 0); -SELECT * FROM t3 WHERE a >= 70 ORDER BY a; +SELECT * FROM t3 WHERE a >= 80 ORDER BY a; a b -70 0 -71 10000 -72 0 +80 0 +81 10000 SET debug_sync='now WAIT_FOR wait_queue_ready'; KILL THD_ID; SET debug_sync='now WAIT_FOR wait_queue_killed'; SET debug_sync='now SIGNAL query_cont'; include/wait_for_slave_sql_error.inc [errno=1317,1927,1964] STOP SLAVE IO_THREAD; -SELECT * FROM t3 WHERE a >= 70 ORDER BY a; -a b -70 0 -71 10000 SET GLOBAL debug_dbug=@old_dbug; SET 
GLOBAL slave_parallel_max_queued= @old_max_queued; -INSERT INTO t3 VALUES (73,0); +INSERT INTO t3 VALUES (82,0); +SET debug_sync='RESET'; include/start_slave.inc -SELECT * FROM t3 WHERE a >= 70 ORDER BY a; +SELECT * FROM t3 WHERE a >= 80 ORDER BY a; a b -70 0 -71 10000 -72 0 -73 0 +80 0 +81 10000 +82 0 include/stop_slave.inc SET GLOBAL binlog_format=@old_format; SET GLOBAL slave_parallel_threads=0; diff --git a/mysql-test/suite/rpl/t/rpl_parallel.test b/mysql-test/suite/rpl/t/rpl_parallel.test index 82cec0735fc..0a064e7a227 100644 --- a/mysql-test/suite/rpl/t/rpl_parallel.test +++ b/mysql-test/suite/rpl/t/rpl_parallel.test @@ -163,6 +163,7 @@ SET debug_sync='RESET'; --echo *** Test that group-committed transactions on the master can replicate in parallel on the slave. *** --connection server_1 +SET debug_sync='RESET'; FLUSH LOGS; --source include/wait_for_binlog_checkpoint.inc CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB; @@ -230,6 +231,7 @@ REAP; REAP; --connection con_temp5 REAP; +SET debug_sync='RESET'; --connection server_1 SELECT * FROM t3 ORDER BY a; @@ -270,6 +272,10 @@ SELECT * FROM t3 ORDER BY a; --echo *** Test STOP SLAVE in parallel mode *** --connection server_2 --source include/stop_slave.inc +# Respawn all worker threads to clear any left-over debug_sync or other stuff. +SET debug_sync='RESET'; +SET GLOBAL slave_parallel_threads=0; +SET GLOBAL slave_parallel_threads=10; --connection server_1 # Set up a couple of transactions. The first will be blocked halfway @@ -288,7 +294,7 @@ BEGIN; INSERT INTO t2 VALUES (20); --disable_warnings INSERT INTO t1 VALUES (20); ---disable_warnings +--enable_warnings INSERT INTO t2 VALUES (21); INSERT INTO t3 VALUES (20, 20); COMMIT; @@ -300,7 +306,7 @@ SET binlog_format=@old_format; # Start a connection that will block the replicated transaction halfway. 
--connection con_temp1 BEGIN; -INSERT INTO t2 VALUES (21); +INSERT INTO t2 VALUES (21); --connection server_2 START SLAVE; @@ -312,13 +318,20 @@ START SLAVE; --connection con_temp2 # Initiate slave stop. It will have to wait for the current event group # to complete. +# The dbug injection causes debug_sync to signal 'wait_for_done_waiting' +# when the SQL driver thread is ready. +SET @old_dbug= @@GLOBAL.debug_dbug; +SET GLOBAL debug_dbug="+d,rpl_parallel_wait_for_done_trigger"; send STOP SLAVE; --connection con_temp1 +SET debug_sync='now WAIT_FOR wait_for_done_waiting'; ROLLBACK; --connection con_temp2 reap; +SET GLOBAL debug_dbug=@old_dbug; +SET debug_sync='RESET'; --connection server_2 --source include/wait_for_slave_to_stop.inc @@ -397,6 +410,7 @@ REAP; --connection server_1 SELECT * FROM t3 WHERE a >= 30 ORDER BY a; +SET debug_sync='RESET'; --connection server_2 SET sql_log_bin=0; @@ -431,6 +445,7 @@ SELECT * FROM t3 WHERE a >= 30 ORDER BY a; # Now we have to disable the debug_sync statements, so they do not trigger # when the events are retried. +SET debug_sync='RESET'; SET GLOBAL slave_parallel_threads=0; SET GLOBAL slave_parallel_threads=10; SET sql_log_bin=0; @@ -535,6 +550,7 @@ REAP; --connection server_1 SELECT * FROM t3 WHERE a >= 40 ORDER BY a; +SET debug_sync='RESET'; --connection server_2 # Wait until T2 is inside executing its insert of 42, then find it in SHOW @@ -559,10 +575,10 @@ SET debug_sync='now SIGNAL t1_cont'; --let $slave_sql_errno= 1317,1964 --source include/wait_for_slave_sql_error.inc -SELECT * FROM t3 WHERE a >= 40 ORDER BY a; # Now we have to disable the debug_sync statements, so they do not trigger # when the events are retried. 
+SET debug_sync='RESET'; SET GLOBAL slave_parallel_threads=0; SET GLOBAL slave_parallel_threads=10; SET sql_log_bin=0; @@ -673,6 +689,7 @@ REAP; --connection server_1 SELECT * FROM t3 WHERE a >= 50 ORDER BY a; +SET debug_sync='RESET'; --connection server_2 # Wait until T2 is inside executing its insert of 52, then find it in SHOW @@ -701,6 +718,7 @@ SELECT * FROM t3 WHERE a >= 50 ORDER BY a; # Now we have to disable the debug_sync statements, so they do not trigger # when the events are retried. +SET debug_sync='RESET'; SET GLOBAL slave_parallel_threads=0; SET GLOBAL slave_parallel_threads=10; SET sql_log_bin=0; @@ -752,7 +770,7 @@ CHANGE MASTER TO master_use_gtid=slave_pos; --source include/stop_slave.inc SET GLOBAL binlog_format=@old_format; SET GLOBAL slave_parallel_threads=0; -SET GLOBAL slave_parallel_threads=3; +SET GLOBAL slave_parallel_threads=4; --source include/start_slave.inc @@ -762,24 +780,29 @@ SET GLOBAL slave_parallel_threads=3; # can run in parallel with each other (same group commit and commit id), # but not in parallel with T1. # -# We use three worker threads. T1 and T2 will be queued on the first, T3 on -# the second, and T4 on the third. We will delay T1 commit, T3 will wait for -# T1 to commit before it can start. We will kill T3 during this wait, and +# We use four worker threads, each Ti will be queued on each their own +# worker thread. We will delay T1 commit, T3 will wait for T1 to begin +# commit before it can start. We will kill T3 during this wait, and # check that everything works correctly. # # It is rather tricky to get the correct thread id of the worker to kill. -# We start by injecting three dummy transactions in a debug_sync-controlled +# We start by injecting four dummy transactions in a debug_sync-controlled # manner to be able to get known thread ids for the workers in a pool with -# just 3 worker threads. Then we let in each of the real test transactions +# just 4 worker threads. 
Then we let in each of the real test transactions # T1-T4 one at a time in a way which allows us to know which transaction # ends up with which thread id. --connection server_1 SET binlog_format=statement; SET gtid_domain_id=2; +BEGIN; +# This debug_sync will linger on and be used to control T4 later. +INSERT INTO t3 VALUES (70, foo(70, + 'rpl_parallel_start_waiting_for_prior SIGNAL t4_waiting', '')); INSERT INTO t3 VALUES (60, foo(60, 'ha_write_row_end SIGNAL d2_query WAIT_FOR d2_cont2', 'rpl_parallel_end_of_group SIGNAL d2_done WAIT_FOR d2_cont')); +COMMIT; SET gtid_domain_id=0; --connection server_2 @@ -813,12 +836,30 @@ INSERT INTO t3 VALUES (63, foo(63, SET debug_sync='now WAIT_FOR d0_query'; --let $d0_thd_id= `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE INFO LIKE '%foo(63%' AND INFO NOT LIKE '%LIKE%'` +--connection server_1 +SET gtid_domain_id=3; +BEGIN; +# These debug_sync's will linger on and be used to control T2 later. +INSERT INTO t3 VALUES (68, foo(68, + 'rpl_parallel_start_waiting_for_prior SIGNAL t2_waiting', '')); +INSERT INTO t3 VALUES (69, foo(69, + 'ha_write_row_end SIGNAL d3_query WAIT_FOR d3_cont2', + 'rpl_parallel_end_of_group SIGNAL d3_done WAIT_FOR d3_cont')); +COMMIT; +SET gtid_domain_id=0; + +--connection server_2 +SET debug_sync='now WAIT_FOR d3_query'; +--let $d3_thd_id= `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE INFO LIKE '%foo(69%' AND INFO NOT LIKE '%LIKE%'` + SET debug_sync='now SIGNAL d2_cont2'; SET debug_sync='now WAIT_FOR d2_done'; SET debug_sync='now SIGNAL d1_cont2'; SET debug_sync='now WAIT_FOR d1_done'; SET debug_sync='now SIGNAL d0_cont2'; SET debug_sync='now WAIT_FOR d0_done'; +SET debug_sync='now SIGNAL d3_cont2'; +SET debug_sync='now WAIT_FOR d3_done'; # Now prepare the real transactions T1, T2, T3, T4 on the master. @@ -826,7 +867,7 @@ SET debug_sync='now WAIT_FOR d0_done'; # Create transaction T1. 
SET binlog_format=statement; INSERT INTO t3 VALUES (64, foo(64, - 'commit_before_prepare_ordered SIGNAL t1_waiting WAIT_FOR t1_cont', '')); + 'rpl_parallel_before_mark_start_commit SIGNAL t1_waiting WAIT_FOR t1_cont', '')); # Create transaction T2, as a group commit leader on the master. SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2 WAIT_FOR master_cont2'; @@ -861,6 +902,7 @@ REAP; --connection server_1 SELECT * FROM t3 WHERE a >= 60 ORDER BY a; +SET debug_sync='RESET'; --connection server_2 # Now we have the four transactions pending for replication on the slave. @@ -872,15 +914,20 @@ SELECT * FROM t3 WHERE a >= 60 ORDER BY a; SET debug_sync='now SIGNAL d0_cont'; SET debug_sync='now WAIT_FOR t1_waiting'; -# T2 will be queued on the same worker D0 as T1. +# Make the worker D3 free, and wait for T2 to be queued in it. +SET debug_sync='now SIGNAL d3_cont'; +SET debug_sync='now WAIT_FOR t2_waiting'; + # Now release worker D1, and wait for T3 to be queued in it. # T3 will wait for T1 to commit before it can start. SET debug_sync='now SIGNAL d1_cont'; SET debug_sync='now WAIT_FOR t3_waiting'; -# Release worker D2. T4 may or may not have time to be queued on it, but -# it will not be able to complete due to T3 being killed. +# Release worker D2. Wait for T4 to be queued, so we are sure it has +# received the debug_sync signal (else we might overwrite it with the +# next debug_sync). SET debug_sync='now SIGNAL d2_cont'; +SET debug_sync='now WAIT_FOR t4_waiting'; # Now we kill the waiting transaction T3 in worker D1. --replace_result $d1_thd_id THD_ID @@ -895,10 +942,15 @@ SET debug_sync='now SIGNAL t1_cont'; --let $slave_sql_errno= 1317,1927,1964 --source include/wait_for_slave_sql_error.inc STOP SLAVE IO_THREAD; -SELECT * FROM t3 WHERE a >= 60 ORDER BY a; +# Since T2, T3, and T4 run in parallel, we can not be sure if T2 will have time +# to commit or not before the stop. However, T1 should commit, and T3/T4 may +# not have committed. 
(After slave restart we check that all become committed +# eventually). +SELECT * FROM t3 WHERE a >= 60 AND a != 65 ORDER BY a; # Now we have to disable the debug_sync statements, so they do not trigger # when the events are retried. +SET debug_sync='RESET'; SET GLOBAL slave_parallel_threads=0; SET GLOBAL slave_parallel_threads=10; SET sql_log_bin=0; @@ -914,7 +966,7 @@ CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500)) SET sql_log_bin=1; --connection server_1 -INSERT INTO t3 VALUES (69,0); +UPDATE t3 SET b=b+1 WHERE a=60; --save_master_pos --connection server_2 @@ -951,6 +1003,7 @@ SET GLOBAL slave_parallel_threads=10; --echo *** 5. Test killing thread that is waiting for queue of max length to shorten *** +# Find the thread id of the driver SQL thread that we want to kill. --let $wait_condition= SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE STATE LIKE '%Slave has read all relay log%' --source include/wait_condition.inc --let $thd_id= `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE STATE LIKE '%Slave has read all relay log%'` @@ -961,12 +1014,8 @@ SET GLOBAL slave_parallel_max_queued=9000; --let bigstring= `SELECT REPEAT('x', 10000)` SET binlog_format=statement; # Create an event that will wait to be signalled. -INSERT INTO t3 VALUES (70, foo(0, +INSERT INTO t3 VALUES (80, foo(0, 'ha_write_row_end SIGNAL query_waiting WAIT_FOR query_cont', '')); ---disable_query_log -# Create an event that will fill up the queue. -eval INSERT INTO t3 VALUES (71, LENGTH('$bigstring')); ---enable_query_log --connection server_2 SET debug_sync='now WAIT_FOR query_waiting'; @@ -977,11 +1026,14 @@ SET @old_dbug= @@GLOBAL.debug_dbug; SET GLOBAL debug_dbug="+d,rpl_parallel_wait_queue_max"; --connection server_1 -# This event will have to wait for the queue to become shorter before it can -# be queued. We will test that things work when we kill the SQL driver thread -# during this wait. 
-INSERT INTO t3 VALUES (72, 0); -SELECT * FROM t3 WHERE a >= 70 ORDER BY a; +--disable_query_log +# Create an event that will fill up the queue. +# The Xid event at the end of the event group will have to wait for the Query +# event with the INSERT to drain so the queue becomes shorter. However that in +# turn waits for the prior event group to continue. +eval INSERT INTO t3 VALUES (81, LENGTH('$bigstring')); +--enable_query_log +SELECT * FROM t3 WHERE a >= 80 ORDER BY a; --connection server_2 SET debug_sync='now WAIT_FOR wait_queue_ready'; @@ -995,19 +1047,19 @@ SET debug_sync='now SIGNAL query_cont'; --let $slave_sql_errno= 1317,1927,1964 --source include/wait_for_slave_sql_error.inc STOP SLAVE IO_THREAD; -SELECT * FROM t3 WHERE a >= 70 ORDER BY a; SET GLOBAL debug_dbug=@old_dbug; SET GLOBAL slave_parallel_max_queued= @old_max_queued; --connection server_1 -INSERT INTO t3 VALUES (73,0); +INSERT INTO t3 VALUES (82,0); --save_master_pos --connection server_2 +SET debug_sync='RESET'; --source include/start_slave.inc --sync_with_master -SELECT * FROM t3 WHERE a >= 70 ORDER BY a; +SELECT * FROM t3 WHERE a >= 80 ORDER BY a; --connection server_2 diff --git a/mysql-test/suite/sys_vars/r/slave_domain_parallel_threads_basic.result b/mysql-test/suite/sys_vars/r/slave_domain_parallel_threads_basic.result new file mode 100644 index 00000000000..9e53d9bd891 --- /dev/null +++ b/mysql-test/suite/sys_vars/r/slave_domain_parallel_threads_basic.result @@ -0,0 +1,13 @@ +SET @save_slave_domain_parallel_threads= @@GLOBAL.slave_domain_parallel_threads; +SELECT @@GLOBAL.slave_domain_parallel_threads as 'must be zero because of default'; +must be zero because of default +0 +SELECT @@SESSION.slave_domain_parallel_threads as 'no session var'; +ERROR HY000: Variable 'slave_domain_parallel_threads' is a GLOBAL variable +SET GLOBAL slave_domain_parallel_threads= 0; +SET GLOBAL slave_domain_parallel_threads= DEFAULT; +SET GLOBAL slave_domain_parallel_threads= 10; +SELECT 
@@GLOBAL.slave_domain_parallel_threads; +@@GLOBAL.slave_domain_parallel_threads +10 +SET GLOBAL slave_domain_parallel_threads = @save_slave_domain_parallel_threads; diff --git a/mysql-test/suite/sys_vars/t/slave_domain_parallel_threads_basic.test b/mysql-test/suite/sys_vars/t/slave_domain_parallel_threads_basic.test new file mode 100644 index 00000000000..7be48fbd4c5 --- /dev/null +++ b/mysql-test/suite/sys_vars/t/slave_domain_parallel_threads_basic.test @@ -0,0 +1,14 @@ +--source include/not_embedded.inc + +SET @save_slave_domain_parallel_threads= @@GLOBAL.slave_domain_parallel_threads; + +SELECT @@GLOBAL.slave_domain_parallel_threads as 'must be zero because of default'; +--error ER_INCORRECT_GLOBAL_LOCAL_VAR +SELECT @@SESSION.slave_domain_parallel_threads as 'no session var'; + +SET GLOBAL slave_domain_parallel_threads= 0; +SET GLOBAL slave_domain_parallel_threads= DEFAULT; +SET GLOBAL slave_domain_parallel_threads= 10; +SELECT @@GLOBAL.slave_domain_parallel_threads; + +SET GLOBAL slave_domain_parallel_threads = @save_slave_domain_parallel_threads; diff --git a/mysys/mf_iocache.c b/mysys/mf_iocache.c index 2008a6f8860..a3cbaff68b0 100644 --- a/mysys/mf_iocache.c +++ b/mysys/mf_iocache.c @@ -1281,10 +1281,6 @@ read_append_buffer: size_t transfer_len; DBUG_ASSERT(info->append_read_pos <= info->write_pos); - /* - TODO: figure out if the assert below is needed or correct. 
- */ - DBUG_ASSERT(pos_in_file == info->end_of_file); copy_len=MY_MIN(Count, len_in_buff); memcpy(Buffer, info->append_read_pos, copy_len); info->append_read_pos += copy_len; diff --git a/mysys/my_rename.c b/mysys/my_rename.c index 8a9e6eb3dfd..09e7eafa980 100644 --- a/mysys/my_rename.c +++ b/mysys/my_rename.c @@ -27,19 +27,18 @@ int my_rename(const char *from, const char *to, myf MyFlags) DBUG_ENTER("my_rename"); DBUG_PRINT("my",("from %s to %s MyFlags %lu", from, to, MyFlags)); -#if defined(HAVE_RENAME) #if defined(__WIN__) - /* - On windows we can't rename over an existing file: - Remove any conflicting files: - */ - (void) my_delete(to, MYF(0)); -#endif + if (!MoveFileEx(from, to, MOVEFILE_COPY_ALLOWED | + MOVEFILE_REPLACE_EXISTING)) + { + my_osmaperr(GetLastError()); +#elif defined(HAVE_RENAME) if (rename(from,to)) + { #else if (link(from, to) || unlink(from)) -#endif { +#endif my_errno=errno; error = -1; if (MyFlags & (MY_FAE+MY_WME)) diff --git a/plugin/handler_socket/libhsclient/auto_file.hpp b/plugin/handler_socket/libhsclient/auto_file.hpp index 841351e54cd..03c357f4d4e 100644 --- a/plugin/handler_socket/libhsclient/auto_file.hpp +++ b/plugin/handler_socket/libhsclient/auto_file.hpp @@ -9,6 +9,11 @@ #ifndef DENA_AUTO_FILE_HPP #define DENA_AUTO_FILE_HPP +/* Workaround for _LARGE_FILES and _LARGE_FILE_API incompatibility on AIX */ +#if defined(_AIX) && defined(_LARGE_FILE_API) +#undef _LARGE_FILE_API +#endif + #include <unistd.h> #include <sys/types.h> #include <dirent.h> diff --git a/sql-common/mysql_async.c b/sql-common/mysql_async.c index 8f3a91e26fa..ef01f292180 100644 --- a/sql-common/mysql_async.c +++ b/sql-common/mysql_async.c @@ -121,6 +121,12 @@ my_connect_async(struct mysql_async_context *b, my_socket fd, IF_WIN(WSAGetLastError() != WSAEWOULDBLOCK, \ (errno != EAGAIN && errno != EINTR)) +#ifdef _AIX +#ifndef MSG_DONTWAIT +#define MSG_DONTWAIT 0 +#endif +#endif + ssize_t my_recv_async(struct mysql_async_context *b, int fd, unsigned char *buf, 
size_t size, int timeout) diff --git a/sql/field.h b/sql/field.h index 916f6211bee..cbd9175f26c 100644 --- a/sql/field.h +++ b/sql/field.h @@ -75,6 +75,8 @@ struct ha_field_option_struct; struct st_cache_field; int field_conv(Field *to,Field *from); +int field_conv_incompatible(Field *to,Field *from); +bool memcpy_field_possible(Field *to, Field *from); int truncate_double(double *nr, uint field_length, uint dec, bool unsigned_flag, double max_value); longlong double_to_longlong(double nr, bool unsigned_flag, bool *error); @@ -2451,7 +2453,7 @@ public: uint max_packed_col_length(uint max_length); void free() { value.free(); } inline void clear_temporary() { bzero((uchar*) &value,sizeof(value)); } - friend int field_conv(Field *to,Field *from); + friend int field_conv_incompatible(Field *to,Field *from); uint size_of() const { return sizeof(*this); } bool has_charset(void) const { return charset() == &my_charset_bin ? FALSE : TRUE; } diff --git a/sql/field_conv.cc b/sql/field_conv.cc index 97c82b9b7bf..f13694e2a13 100644 --- a/sql/field_conv.cc +++ b/sql/field_conv.cc @@ -823,40 +823,76 @@ Copy_field::get_copy_func(Field *to,Field *from) return do_field_eq; } +/** + Check if it is possible just copy value of the fields + + @param to The field to copy to + @param from The field to copy from + + @retval TRUE - it is possible to just copy value of 'from' to 'to'. 
+ @retval FALSE - conversion is needed +*/ + +bool memcpy_field_possible(Field *to,Field *from) +{ + const enum_field_types to_real_type= to->real_type(); + const enum_field_types from_real_type= from->real_type(); + const enum_field_types to_type= from->type(); + return (to_real_type == from_real_type && + !(to->flags & BLOB_FLAG && to->table->copy_blobs) && + to->pack_length() == from->pack_length() && + !(to->flags & UNSIGNED_FLAG && !(from->flags & UNSIGNED_FLAG)) && + to->decimals() == from->decimals() && + to_real_type != MYSQL_TYPE_ENUM && + to_real_type != MYSQL_TYPE_SET && + to_real_type != MYSQL_TYPE_BIT && + (to_real_type != MYSQL_TYPE_NEWDECIMAL || + to->field_length == from->field_length) && + from->charset() == to->charset() && + (!sql_mode_for_dates(to->table->in_use) || + (to_type != MYSQL_TYPE_DATE && + to_type != MYSQL_TYPE_DATETIME)) && + (from_real_type != MYSQL_TYPE_VARCHAR || + ((Field_varstring*)from)->length_bytes == + ((Field_varstring*)to)->length_bytes)); +} + /** Simple quick field convert that is called on insert. */ int field_conv(Field *to,Field *from) { - if (to->real_type() == from->real_type() && - !(to->flags & BLOB_FLAG && to->table->copy_blobs)) - { - if (to->pack_length() == from->pack_length() && - !(to->flags & UNSIGNED_FLAG && !(from->flags & UNSIGNED_FLAG)) && - to->decimals() == from->decimals() && - to->real_type() != MYSQL_TYPE_ENUM && - to->real_type() != MYSQL_TYPE_SET && - to->real_type() != MYSQL_TYPE_BIT && - (to->real_type() != MYSQL_TYPE_NEWDECIMAL || - to->field_length == from->field_length) && - from->charset() == to->charset() && - (!sql_mode_for_dates(to->table->in_use) || - (to->type() != MYSQL_TYPE_DATE && - to->type() != MYSQL_TYPE_DATETIME)) && - (from->real_type() != MYSQL_TYPE_VARCHAR || - ((Field_varstring*)from)->length_bytes == - ((Field_varstring*)to)->length_bytes)) - { // Identical fields - /* - This may happen if one does 'UPDATE ... 
SET x=x' - The test is here mostly for valgrind, but can also be relevant - if memcpy() is implemented with prefetch-write - */ - if (to->ptr != from->ptr) - memcpy(to->ptr,from->ptr,to->pack_length()); - return 0; - } + if (memcpy_field_possible(to, from)) + { // Identical fields + /* + This may happen if one does 'UPDATE ... SET x=x' + The test is here mostly for valgrind, but can also be relevant + if memcpy() is implemented with prefetch-write + */ + if (to->ptr != from->ptr) + memcpy(to->ptr, from->ptr, to->pack_length()); + return 0; } + return field_conv_incompatible(to, from); +} + + +/** + Copy value of the field with conversion. + + @note Impossibility of simple copy should be checked before this call. + + @param to The field to copy to + @param from The field to copy from + + @retval TRUE ERROR + @retval FALSE OK +*/ + +int field_conv_incompatible(Field *to, Field *from) +{ + const enum_field_types to_real_type= to->real_type(); + const enum_field_types from_real_type= from->real_type(); if (to->flags & BLOB_FLAG) { // Be sure the value is stored Field_blob *blob=(Field_blob*) to; @@ -867,21 +903,22 @@ int field_conv(Field *to,Field *from) */ if (to->table->copy_blobs || (!blob->value.is_alloced() && - from->real_type() != MYSQL_TYPE_STRING && - from->real_type() != MYSQL_TYPE_VARCHAR)) + from_real_type != MYSQL_TYPE_STRING && + from_real_type != MYSQL_TYPE_VARCHAR)) blob->value.copy(); return blob->store(blob->value.ptr(),blob->value.length(),from->charset()); } - if (from->real_type() == MYSQL_TYPE_ENUM && - to->real_type() == MYSQL_TYPE_ENUM && + if (from_real_type == MYSQL_TYPE_ENUM && + to_real_type == MYSQL_TYPE_ENUM && from->val_int() == 0) { ((Field_enum *)(to))->store_type(0); return 0; } - if (from->result_type() == REAL_RESULT) + Item_result from_result_type= from->result_type(); + if (from_result_type == REAL_RESULT) return to->store(from->val_real()); - if (from->result_type() == DECIMAL_RESULT) + if (from_result_type == DECIMAL_RESULT) { 
my_decimal buff; return to->store_decimal(from->val_decimal(&buff)); @@ -894,10 +931,10 @@ int field_conv(Field *to,Field *from) else return to->store_time_dec(<ime, from->decimals()); } - if ((from->result_type() == STRING_RESULT && + if ((from_result_type == STRING_RESULT && (to->result_type() == STRING_RESULT || - (from->real_type() != MYSQL_TYPE_ENUM && - from->real_type() != MYSQL_TYPE_SET))) || + (from_real_type != MYSQL_TYPE_ENUM && + from_real_type != MYSQL_TYPE_SET))) || to->type() == MYSQL_TYPE_DECIMAL) { char buff[MAX_FIELD_WIDTH]; diff --git a/sql/item.cc b/sql/item.cc index bcba505a265..7901f1186d8 100644 --- a/sql/item.cc +++ b/sql/item.cc @@ -5919,13 +5919,51 @@ static int save_field_in_field(Field *from, bool *null_value, } +static int memcpy_field_value(Field *to, Field *from) +{ + if (to->ptr != from->ptr) + memcpy(to->ptr,from->ptr, to->pack_length()); + return 0; +} + +fast_field_copier Item_field::setup_fast_field_copier(Field *to) +{ + DBUG_ENTER("Item_field::setup_fast_field_copier"); + DBUG_RETURN(memcpy_field_possible(to, field) ? + &memcpy_field_value : + &field_conv_incompatible); +} + + /** Set a field's value from a item. 
*/ -void Item_field::save_org_in_field(Field *to) +void Item_field::save_org_in_field(Field *to, + fast_field_copier fast_field_copier_func) { - save_field_in_field(field, &null_value, to, TRUE); + DBUG_ENTER("Item_field::save_org_in_field"); + DBUG_PRINT("enter", ("setup: 0x%lx data: 0x%lx", + (ulong) to, (ulong) fast_field_copier_func)); + if (fast_field_copier_func) + { + if (field->is_null()) + { + null_value= TRUE; + set_field_to_null_with_conversions(to, TRUE); + DBUG_VOID_RETURN; + } + to->set_notnull(); + if (to == field) + { + null_value= 0; + DBUG_VOID_RETURN; + } + (*fast_field_copier_func)(to, field); + } + else + save_field_in_field(field, &null_value, to, TRUE); + DBUG_VOID_RETURN; } @@ -7446,9 +7484,9 @@ int Item_ref::save_in_field(Field *to, bool no_conversions) } -void Item_ref::save_org_in_field(Field *field) +void Item_ref::save_org_in_field(Field *field, fast_field_copier optimizer_data) { - (*ref)->save_org_in_field(field); + (*ref)->save_org_in_field(field, optimizer_data); } diff --git a/sql/item.h b/sql/item.h index f96ae41ef46..1faed26e1ee 100644 --- a/sql/item.h +++ b/sql/item.h @@ -709,8 +709,12 @@ public: /* Function returns 1 on overflow and -1 on fatal errors */ int save_in_field_no_warnings(Field *field, bool no_conversions); virtual int save_in_field(Field *field, bool no_conversions); - virtual void save_org_in_field(Field *field) + virtual void save_org_in_field(Field *field, + fast_field_copier data + __attribute__ ((__unused__))) { (void) save_in_field(field, 1); } + virtual fast_field_copier setup_fast_field_copier(Field *field) + { return NULL; } virtual int save_safe_in_field(Field *field) { return save_in_field(field, 1); } virtual bool send(Protocol *protocol, String *str); @@ -946,7 +950,7 @@ public: save_val() is method of val_* family which stores value in the given field. 
*/ - virtual void save_val(Field *to) { save_org_in_field(to); } + virtual void save_val(Field *to) { save_org_in_field(to, NULL); } /* save_result() is method of val*result() family which stores value in the given field. @@ -2070,7 +2074,8 @@ public: void fix_after_pullout(st_select_lex *new_parent, Item **ref); void make_field(Send_field *tmp_field); int save_in_field(Field *field,bool no_conversions); - void save_org_in_field(Field *field); + void save_org_in_field(Field *field, fast_field_copier optimizer_data); + fast_field_copier setup_fast_field_copier(Field *field); table_map used_tables() const; table_map all_used_tables() const; enum Item_result result_type () const @@ -3103,7 +3108,9 @@ public: bool fix_fields(THD *, Item **); void fix_after_pullout(st_select_lex *new_parent, Item **ref); int save_in_field(Field *field, bool no_conversions); - void save_org_in_field(Field *field); + void save_org_in_field(Field *field, fast_field_copier optimizer_data); + fast_field_copier setup_fast_field_copier(Field *field) + { return (*ref)->setup_fast_field_copier(field); } enum Item_result result_type () const { return (*ref)->result_type(); } enum_field_types field_type() const { return (*ref)->field_type(); } Field *get_tmp_table_field() @@ -3332,7 +3339,8 @@ public: bool is_null(); bool get_date(MYSQL_TIME *ltime, ulonglong fuzzydate); bool send(Protocol *protocol, String *buffer); - void save_org_in_field(Field *field) + void save_org_in_field(Field *field, + fast_field_copier data __attribute__ ((__unused__))) { save_val(field); } @@ -3529,7 +3537,8 @@ public: return Item_direct_ref::get_date(ltime, fuzzydate); } bool send(Protocol *protocol, String *buffer); - void save_org_in_field(Field *field) + void save_org_in_field(Field *field, + fast_field_copier data __attribute__ ((__unused__))) { if (check_null_ref()) field->set_null(); @@ -3596,7 +3605,7 @@ public: {} void save_in_result_field(bool no_conversions) { - outer_ref->save_org_in_field(result_field); + 
outer_ref->save_org_in_field(result_field, NULL); } bool fix_fields(THD *, Item **); void fix_after_pullout(st_select_lex *new_parent, Item **ref); diff --git a/sql/item_create.cc b/sql/item_create.cc index 8466319b66f..a3e0dc6012b 100644 --- a/sql/item_create.cc +++ b/sql/item_create.cc @@ -3197,6 +3197,13 @@ Create_func_binlog_gtid_pos Create_func_binlog_gtid_pos::s_singleton; Item* Create_func_binlog_gtid_pos::create_2_arg(THD *thd, Item *arg1, Item *arg2) { +#ifdef HAVE_REPLICATION + if (!mysql_bin_log.is_open()) +#endif + { + my_error(ER_NO_BINARY_LOGGING, MYF(0)); + return NULL; + } thd->lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_FUNCTION); return new (thd->mem_root) Item_func_binlog_gtid_pos(arg1, arg2); } diff --git a/sql/item_func.h b/sql/item_func.h index aaba24a5841..c1a92573eec 100644 --- a/sql/item_func.h +++ b/sql/item_func.h @@ -1740,7 +1740,9 @@ public: { return save_in_field(field, no_conversions, 1); } - void save_org_in_field(Field *field) { (void)save_in_field(field, 1, 0); } + void save_org_in_field(Field *field, + fast_field_copier data __attribute__ ((__unused__))) + { (void)save_in_field(field, 1, 0); } bool register_field_in_read_map(uchar *arg); bool register_field_in_bitmap(uchar *arg); bool set_entry(THD *thd, bool create_if_not_exists); diff --git a/sql/log.cc b/sql/log.cc index b5b3e1061f7..ebfbba953fa 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -6710,13 +6710,15 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) */ wfc= orig_entry->thd->wait_for_commit_ptr; orig_entry->queued_by_other= false; - if (wfc && wfc->waiting_for_commit) + if (wfc && wfc->waitee) { mysql_mutex_lock(&wfc->LOCK_wait_commit); /* Do an extra check here, this time safely under lock. 
*/ - if (wfc->waiting_for_commit) + if (wfc->waitee) { PSI_stage_info old_stage; + wait_for_commit *loc_waitee; + /* By setting wfc->opaque_pointer to our own entry, we mark that we are ready to commit, but waiting for another transaction to commit before @@ -6727,21 +6729,20 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) queued_by_other flag is set. */ wfc->opaque_pointer= orig_entry; + DEBUG_SYNC(orig_entry->thd, "group_commit_waiting_for_prior"); orig_entry->thd->ENTER_COND(&wfc->COND_wait_commit, &wfc->LOCK_wait_commit, &stage_waiting_for_prior_transaction_to_commit, &old_stage); - DEBUG_SYNC(orig_entry->thd, "group_commit_waiting_for_prior"); - while (wfc->waiting_for_commit && !orig_entry->thd->check_killed()) + while ((loc_waitee= wfc->waitee) && !orig_entry->thd->check_killed()) mysql_cond_wait(&wfc->COND_wait_commit, &wfc->LOCK_wait_commit); wfc->opaque_pointer= NULL; DBUG_PRINT("info", ("After waiting for prior commit, queued_by_other=%d", orig_entry->queued_by_other)); - if (wfc->waiting_for_commit) + if (loc_waitee) { /* Wait terminated due to kill. */ - wait_for_commit *loc_waitee= wfc->waitee; mysql_mutex_lock(&loc_waitee->LOCK_wait_commit); if (loc_waitee->wakeup_subsequent_commits_running || orig_entry->queued_by_other) @@ -6751,13 +6752,14 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) do { mysql_cond_wait(&wfc->COND_wait_commit, &wfc->LOCK_wait_commit); - } while (wfc->waiting_for_commit); + } while (wfc->waitee); } else { /* We were killed, so remove us from the list of waitee. */ wfc->remove_from_list(&loc_waitee->subsequent_commits_list); mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit); + wfc->waitee= NULL; orig_entry->thd->EXIT_COND(&old_stage); /* Interrupted by kill. 
*/ @@ -6773,12 +6775,11 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) } else mysql_mutex_unlock(&wfc->LOCK_wait_commit); - - if (wfc->wakeup_error) - { - my_error(ER_PRIOR_COMMIT_FAILED, MYF(0)); - DBUG_RETURN(-1); - } + } + if (wfc && wfc->wakeup_error) + { + my_error(ER_PRIOR_COMMIT_FAILED, MYF(0)); + DBUG_RETURN(-1); } /* @@ -9110,7 +9111,7 @@ start_binlog_background_thread() array_elements(all_binlog_threads)); #endif - if (mysql_thread_create(key_thread_binlog, &th, NULL, + if (mysql_thread_create(key_thread_binlog, &th, &connection_attrib, binlog_background_thread, NULL)) return 1; diff --git a/sql/mysqld.cc b/sql/mysqld.cc index d95dc0e0d87..36d0edee660 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -549,6 +549,7 @@ ulong rpl_recovery_rank=0; ulong stored_program_cache_size= 0; ulong opt_slave_parallel_threads= 0; +ulong opt_slave_domain_parallel_threads= 0; ulong opt_binlog_commit_wait_count= 0; ulong opt_binlog_commit_wait_usec= 0; ulong opt_slave_parallel_max_queued= 131072; @@ -982,8 +983,10 @@ PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready, key_COND_wait_commit; PSI_cond_key key_RELAYLOG_COND_queue_busy; PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy; -PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_pool, - key_COND_parallel_entry, key_COND_prepare_ordered; +PSI_cond_key key_COND_rpl_thread_queue, key_COND_rpl_thread, + key_COND_rpl_thread_pool, + key_COND_parallel_entry, key_COND_group_commit_orderer, + key_COND_prepare_ordered; PSI_cond_key key_COND_wait_gtid; static PSI_cond_info all_server_conds[]= @@ -1027,8 +1030,10 @@ static PSI_cond_info all_server_conds[]= { &key_COND_thread_cache, "COND_thread_cache", PSI_FLAG_GLOBAL}, { &key_COND_flush_thread_cache, "COND_flush_thread_cache", PSI_FLAG_GLOBAL}, { &key_COND_rpl_thread, "COND_rpl_thread", 0}, + { &key_COND_rpl_thread_queue, "COND_rpl_thread_queue", 0}, { &key_COND_rpl_thread_pool, "COND_rpl_thread_pool", 0}, { &key_COND_parallel_entry, 
"COND_parallel_entry", 0}, + { &key_COND_group_commit_orderer, "COND_group_commit_orderer", 0}, { &key_COND_prepare_ordered, "COND_prepare_ordered", 0}, { &key_COND_wait_gtid, "COND_wait_gtid", 0} }; @@ -8139,7 +8144,7 @@ static int mysql_init_variables(void) my_atomic_rwlock_init(&thread_running_lock); my_atomic_rwlock_init(&thread_count_lock); my_atomic_rwlock_init(&statistics_lock); - my_atomic_rwlock_init(slave_executed_entries_lock); + my_atomic_rwlock_init(&slave_executed_entries_lock); strmov(server_version, MYSQL_SERVER_VERSION); threads.empty(); thread_cache.empty(); @@ -9436,7 +9441,7 @@ PSI_stage_info stage_binlog_waiting_background_tasks= { 0, "Waiting for backgrou PSI_stage_info stage_binlog_processing_checkpoint_notify= { 0, "Processing binlog checkpoint notification", 0}; PSI_stage_info stage_binlog_stopping_background_thread= { 0, "Stopping binlog background thread", 0}; PSI_stage_info stage_waiting_for_work_from_sql_thread= { 0, "Waiting for work from SQL thread", 0}; -PSI_stage_info stage_waiting_for_prior_transaction_to_commit= { 0, "Waiting for prior transaction to commit", 0}; +PSI_stage_info stage_waiting_for_prior_transaction_to_commit= { 0, "Waiting for prior transaction to start commit before starting next transaction", 0}; PSI_stage_info stage_waiting_for_room_in_worker_thread= { 0, "Waiting for room in worker thread event queue", 0}; PSI_stage_info stage_master_gtid_wait_primary= { 0, "Waiting in MASTER_GTID_WAIT() (primary waiter)", 0}; PSI_stage_info stage_master_gtid_wait= { 0, "Waiting in MASTER_GTID_WAIT()", 0}; diff --git a/sql/mysqld.h b/sql/mysqld.h index 75b4c3ba6c4..0d4b23b12e7 100644 --- a/sql/mysqld.h +++ b/sql/mysqld.h @@ -180,6 +180,7 @@ extern ulong opt_binlog_rows_event_max_size; extern ulong rpl_recovery_rank, thread_cache_size; extern ulong stored_program_cache_size; extern ulong opt_slave_parallel_threads; +extern ulong opt_slave_domain_parallel_threads; extern ulong opt_slave_parallel_max_queued; extern ulong 
opt_binlog_commit_wait_count; extern ulong opt_binlog_commit_wait_usec; @@ -295,8 +296,9 @@ extern PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready, key_COND_wait_commit; extern PSI_cond_key key_RELAYLOG_COND_queue_busy; extern PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy; -extern PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_pool, - key_COND_parallel_entry; +extern PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_queue, + key_COND_rpl_thread_pool, + key_COND_parallel_entry, key_COND_group_commit_orderer; extern PSI_cond_key key_COND_wait_gtid; extern PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert, diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 7e9a4aa6239..5947fb70330 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -93,32 +93,12 @@ handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev) } -static bool -sql_worker_killed(THD *thd, rpl_group_info *rgi, bool in_event_group) -{ - if (!rgi->rli->abort_slave && !abort_loop) - return false; - - /* - Do not abort in the middle of an event group that cannot be rolled back. - */ - if ((thd->transaction.all.modified_non_trans_table || - (thd->variables.option_bits & OPTION_KEEP_LOG)) - && in_event_group) - return false; - /* ToDo: should we add some timeout like in sql_slave_killed? - if (rgi->last_event_start_time == 0) - rgi->last_event_start_time= my_time(0); - */ - - return true; -} - - static void finish_event_group(THD *thd, int err, uint64 sub_id, - rpl_parallel_entry *entry, wait_for_commit *wfc) + rpl_parallel_entry *entry, rpl_group_info *rgi) { + wait_for_commit *wfc= &rgi->commit_orderer; + /* Remove any left-over registration to wait for a prior commit to complete. 
Normally, such wait would already have been removed at @@ -163,12 +143,26 @@ finish_event_group(THD *thd, int err, uint64 sub_id, */ mysql_mutex_lock(&entry->LOCK_parallel_entry); if (entry->last_committed_sub_id < sub_id) - { entry->last_committed_sub_id= sub_id; - mysql_cond_broadcast(&entry->COND_parallel_entry); - } + + /* + If this event group got error, then any following event groups that have + not yet started should just skip their group, preparing for stop of the + SQL driver thread. + */ + if (unlikely(rgi->is_error) && + entry->stop_on_error_sub_id == (uint64)ULONGLONG_MAX) + entry->stop_on_error_sub_id= sub_id; + /* + We need to mark that this event group started its commit phase, in case we + missed it before (otherwise we would deadlock the next event group that is + waiting for this). In most cases (normal DML), it will be a no-op. + */ + rgi->mark_start_commit_no_lock(); mysql_mutex_unlock(&entry->LOCK_parallel_entry); + thd->clear_error(); + thd->get_stmt_da()->reset_diagnostics_area(); wfc->wakeup_subsequent_commits(err); } @@ -185,6 +179,20 @@ signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi) } +static void +unlock_or_exit_cond(THD *thd, mysql_mutex_t *lock, bool *did_enter_cond, + PSI_stage_info *old_stage) +{ + if (*did_enter_cond) + { + thd->EXIT_COND(old_stage); + *did_enter_cond= false; + } + else + mysql_mutex_unlock(lock); +} + + pthread_handler_t handle_rpl_parallel_thread(void *arg) { @@ -193,8 +201,14 @@ handle_rpl_parallel_thread(void *arg) struct rpl_parallel_thread::queued_event *events; bool group_standalone= true; bool in_event_group= false; + bool group_skip_for_stop= false; rpl_group_info *group_rgi= NULL; + group_commit_orderer *gco, *tmp_gco; uint64 event_gtid_sub_id= 0; + rpl_parallel_thread::queued_event *qevs_to_free; + rpl_group_info *rgis_to_free; + group_commit_orderer *gcos_to_free; + size_t total_event_size; int err; struct rpl_parallel_thread *rpt= (struct rpl_parallel_thread *)arg; @@ -234,43 +248,61 
@@ handle_rpl_parallel_thread(void *arg) rpt->running= true; mysql_cond_signal(&rpt->COND_rpl_thread); - while (!rpt->stop && !thd->killed) + while (!rpt->stop) { - rpl_parallel_thread *list; - thd->ENTER_COND(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread, &stage_waiting_for_work_from_sql_thread, &old_stage); - while (!(events= rpt->event_queue) && !rpt->stop && !thd->killed && - !(rpt->current_entry && rpt->current_entry->force_abort)) + /* + There are 4 cases that should cause us to wake up: + - Events have been queued for us to handle. + - We have an owner, but no events and not inside event group -> we need + to release ourself to the thread pool + - SQL thread is stopping, and we have an owner but no events, and we are + inside an event group; no more events will be queued to us, so we need + to abort the group (force_abort==1). + - Thread pool shutdown (rpt->stop==1). + */ + while (!( (events= rpt->event_queue) || + (rpt->current_owner && !in_event_group) || + (rpt->current_owner && group_rgi->parallel_entry->force_abort) || + rpt->stop)) mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread); - rpt->dequeue(events); + rpt->dequeue1(events); thd->EXIT_COND(&old_stage); - mysql_cond_signal(&rpt->COND_rpl_thread); more_events: + qevs_to_free= NULL; + rgis_to_free= NULL; + gcos_to_free= NULL; + total_event_size= 0; while (events) { struct rpl_parallel_thread::queued_event *next= events->next; Log_event_type event_type; rpl_group_info *rgi= events->rgi; rpl_parallel_entry *entry= rgi->parallel_entry; - uint64 wait_for_sub_id; - uint64 wait_start_sub_id; - bool end_of_group; + bool end_of_group, group_ending; + total_event_size+= events->event_size; if (!events->ev) { handle_queued_pos_update(thd, events); - my_free(events); + events->next= qevs_to_free; + qevs_to_free= events; events= next; continue; } err= 0; group_rgi= rgi; + gco= rgi->gco; /* Handle a new event group, which will be initiated by a GTID event. 
*/ if ((event_type= events->ev->get_type_code()) == GTID_EVENT) { + bool did_enter_cond= false; + PSI_stage_info old_stage; + uint64 wait_count; + in_event_group= true; /* If the standalone flag is set, then this event group consists of a @@ -292,50 +324,87 @@ handle_rpl_parallel_thread(void *arg) occured. Also do not start parallel execution of this event group until all - prior groups have committed that are not safe to run in parallel with. + prior groups have reached the commit phase that are not safe to run + in parallel with. */ - wait_for_sub_id= rgi->wait_commit_sub_id; - wait_start_sub_id= rgi->wait_start_sub_id; - if (wait_for_sub_id || wait_start_sub_id) + mysql_mutex_lock(&entry->LOCK_parallel_entry); + if (!gco->installed) { - bool did_enter_cond= false; - PSI_stage_info old_stage; - - mysql_mutex_lock(&entry->LOCK_parallel_entry); - if (wait_start_sub_id) + if (gco->prev_gco) + gco->prev_gco->next_gco= gco; + gco->installed= true; + } + wait_count= gco->wait_count; + if (wait_count > entry->count_committing_event_groups) + { + DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior"); + thd->ENTER_COND(&gco->COND_group_commit_orderer, + &entry->LOCK_parallel_entry, + &stage_waiting_for_prior_transaction_to_commit, + &old_stage); + did_enter_cond= true; + do { - thd->ENTER_COND(&entry->COND_parallel_entry, - &entry->LOCK_parallel_entry, - &stage_waiting_for_prior_transaction_to_commit, - &old_stage); - did_enter_cond= true; - DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior"); - while (wait_start_sub_id > entry->last_committed_sub_id && - !thd->check_killed()) - mysql_cond_wait(&entry->COND_parallel_entry, - &entry->LOCK_parallel_entry); - if (wait_start_sub_id > entry->last_committed_sub_id) + if (thd->check_killed() && !rgi->is_error) { - /* The thread got a kill signal. 
*/ DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior_killed"); thd->send_kill_message(); slave_output_error_info(rgi->rli, thd); signal_error_to_sql_driver_thread(thd, rgi); + /* + Even though we were killed, we need to continue waiting for the + prior event groups to signal that we can continue. Otherwise we + mess up the accounting for ordering. However, now that we have + marked the error, events will just be skipped rather than + executed, and things will progress quickly towards stop. + */ } - rgi->wait_start_sub_id= 0; /* No need to check again. */ - } - if (wait_for_sub_id > entry->last_committed_sub_id) - { - wait_for_commit *waitee= - &rgi->wait_commit_group_info->commit_orderer; - rgi->commit_orderer.register_wait_for_prior_commit(waitee); - } - if (did_enter_cond) - thd->EXIT_COND(&old_stage); - else - mysql_mutex_unlock(&entry->LOCK_parallel_entry); + mysql_cond_wait(&gco->COND_group_commit_orderer, + &entry->LOCK_parallel_entry); + } while (wait_count > entry->count_committing_event_groups); } + if ((tmp_gco= gco->prev_gco)) + { + /* + Now all the event groups in the previous batch have entered their + commit phase, and will no longer access their gco. So we can free + it here. + */ + DBUG_ASSERT(!tmp_gco->prev_gco); + gco->prev_gco= NULL; + tmp_gco->next_gco= gcos_to_free; + gcos_to_free= tmp_gco; + } + + if (entry->force_abort && wait_count > entry->stop_count) + { + /* + We are stopping (STOP SLAVE), and this event group is beyond the + point where we can safely stop. So set a flag that will cause us + to skip, rather than execute, the following events. 
+ */ + group_skip_for_stop= true; + } + else + group_skip_for_stop= false; + + if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id)) + group_skip_for_stop= true; + else if (rgi->wait_commit_sub_id > entry->last_committed_sub_id) + { + /* + Register that the commit of this event group must wait for the + commit of the previous event group to complete before it may + complete itself, so that we preserve commit order. + */ + wait_for_commit *waitee= + &rgi->wait_commit_group_info->commit_orderer; + rgi->commit_orderer.register_wait_for_prior_commit(waitee); + } + unlock_or_exit_cond(thd, &entry->LOCK_parallel_entry, + &did_enter_cond, &old_stage); + if(thd->wait_for_commit_ptr) { /* @@ -352,13 +421,23 @@ handle_rpl_parallel_thread(void *arg) thd->wait_for_commit_ptr= &rgi->commit_orderer; } + group_ending= event_type == XID_EVENT || + (event_type == QUERY_EVENT && + (((Query_log_event *)events->ev)->is_commit() || + ((Query_log_event *)events->ev)->is_rollback())); + if (group_ending) + { + DEBUG_SYNC(thd, "rpl_parallel_before_mark_start_commit"); + rgi->mark_start_commit(); + } + /* If the SQL thread is stopping, we just skip execution of all the following event groups. We still do all the normal waiting and wakeup processing between the event groups as a simple way to ensure that everything is stopped and cleaned up correctly. 
*/ - if (!rgi->is_error && !sql_worker_killed(thd, rgi, in_event_group)) + if (!rgi->is_error && !group_skip_for_stop) err= rpt_handle_event(events, rpt); else err= thd->wait_for_prior_commit(); @@ -366,13 +445,11 @@ handle_rpl_parallel_thread(void *arg) end_of_group= in_event_group && ((group_standalone && !Log_event::is_part_of_group(event_type)) || - event_type == XID_EVENT || - (event_type == QUERY_EVENT && - (((Query_log_event *)events->ev)->is_commit() || - ((Query_log_event *)events->ev)->is_rollback()))); + group_ending); delete_or_keep_event_post_apply(rgi, event_type, events->ev); - my_free(events); + events->next= qevs_to_free; + qevs_to_free= events; if (err) { @@ -382,10 +459,11 @@ handle_rpl_parallel_thread(void *arg) if (end_of_group) { in_event_group= false; - finish_event_group(thd, err, event_gtid_sub_id, entry, - &rgi->commit_orderer); - delete rgi; + finish_event_group(thd, err, event_gtid_sub_id, entry, rgi); + rgi->next= rgis_to_free; + rgis_to_free= rgi; group_rgi= rgi= NULL; + group_skip_for_stop= false; DEBUG_SYNC(thd, "rpl_parallel_end_of_group"); } @@ -393,6 +471,29 @@ handle_rpl_parallel_thread(void *arg) } mysql_mutex_lock(&rpt->LOCK_rpl_thread); + /* Signal that our queue can now accept more events. */ + rpt->dequeue2(total_event_size); + mysql_cond_signal(&rpt->COND_rpl_thread_queue); + /* We need to delay the free here, to when we have the lock. 
*/ + while (gcos_to_free) + { + group_commit_orderer *next= gcos_to_free->next_gco; + rpt->free_gco(gcos_to_free); + gcos_to_free= next; + } + while (rgis_to_free) + { + rpl_group_info *next= rgis_to_free->next; + rpt->free_rgi(rgis_to_free); + rgis_to_free= next; + } + while (qevs_to_free) + { + rpl_parallel_thread::queued_event *next= qevs_to_free->next; + rpt->free_qev(qevs_to_free); + qevs_to_free= next; + } + if ((events= rpt->event_queue) != NULL) { /* @@ -400,9 +501,8 @@ handle_rpl_parallel_thread(void *arg) This is faster than having to wakeup the pool manager thread to give us a new event. */ - rpt->dequeue(events); + rpt->dequeue1(events); mysql_mutex_unlock(&rpt->LOCK_rpl_thread); - mysql_cond_signal(&rpt->COND_rpl_thread); goto more_events; } @@ -417,27 +517,26 @@ handle_rpl_parallel_thread(void *arg) half-processed event group. */ mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + thd->wait_for_prior_commit(); finish_event_group(thd, 1, group_rgi->gtid_sub_id, - group_rgi->parallel_entry, &group_rgi->commit_orderer); + group_rgi->parallel_entry, group_rgi); signal_error_to_sql_driver_thread(thd, group_rgi); in_event_group= false; - delete group_rgi; - group_rgi= NULL; mysql_mutex_lock(&rpt->LOCK_rpl_thread); + rpt->free_rgi(group_rgi); + group_rgi= NULL; + group_skip_for_stop= false; } if (!in_event_group) { + rpt->current_owner= NULL; + /* Tell wait_for_done() that we are done, if it is waiting. 
*/ + if (likely(rpt->current_entry) && + unlikely(rpt->current_entry->force_abort)) + mysql_cond_broadcast(&rpt->current_entry->COND_parallel_entry); rpt->current_entry= NULL; if (!rpt->stop) - { - mysql_mutex_lock(&rpt->pool->LOCK_rpl_thread_pool); - list= rpt->pool->free_list; - rpt->next= list; - rpt->pool->free_list= rpt; - if (!list) - mysql_cond_broadcast(&rpt->pool->COND_rpl_thread_pool); - mysql_mutex_unlock(&rpt->pool->LOCK_rpl_thread_pool); - } + rpt->pool->release_thread(rpt); } } @@ -466,6 +565,15 @@ handle_rpl_parallel_thread(void *arg) } +static void +dealloc_gco(group_commit_orderer *gco) +{ + DBUG_ASSERT(!gco->prev_gco /* Must only free after dealloc previous */); + mysql_cond_destroy(&gco->COND_group_commit_orderer); + my_free(gco); +} + + int rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, uint32 new_count, bool skip_check) @@ -500,8 +608,10 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, mysql_mutex_init(key_LOCK_rpl_thread, &new_list[i]->LOCK_rpl_thread, MY_MUTEX_INIT_SLOW); mysql_cond_init(key_COND_rpl_thread, &new_list[i]->COND_rpl_thread, NULL); + mysql_cond_init(key_COND_rpl_thread_queue, + &new_list[i]->COND_rpl_thread_queue, NULL); new_list[i]->pool= pool; - if (mysql_thread_create(key_rpl_parallel_thread, &th, NULL, + if (mysql_thread_create(key_rpl_parallel_thread, &th, &connection_attrib, handle_rpl_parallel_thread, new_list[i])) { my_error(ER_OUT_OF_RESOURCES, MYF(0)); @@ -538,7 +648,7 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, */ for (i= 0; i < pool->count; ++i) { - rpl_parallel_thread *rpt= pool->get_thread(NULL); + rpl_parallel_thread *rpt= pool->get_thread(NULL, NULL); rpt->stop= true; mysql_cond_signal(&rpt->COND_rpl_thread); mysql_mutex_unlock(&rpt->LOCK_rpl_thread); @@ -553,6 +663,24 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, mysql_mutex_unlock(&rpt->LOCK_rpl_thread); mysql_mutex_destroy(&rpt->LOCK_rpl_thread); 
mysql_cond_destroy(&rpt->COND_rpl_thread); + while (rpt->qev_free_list) + { + rpl_parallel_thread::queued_event *next= rpt->qev_free_list->next; + my_free(rpt->qev_free_list); + rpt->qev_free_list= next; + } + while (rpt->rgi_free_list) + { + rpl_group_info *next= rpt->rgi_free_list->next; + delete rpt->rgi_free_list; + rpt->rgi_free_list= next; + } + while (rpt->gco_free_list) + { + group_commit_orderer *next= rpt->gco_free_list->next_gco; + dealloc_gco(rpt->gco_free_list); + rpt->gco_free_list= next; + } } my_free(pool->threads); @@ -608,6 +736,121 @@ err: } +rpl_parallel_thread::queued_event * +rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size, + Relay_log_info *rli) +{ + queued_event *qev; + mysql_mutex_assert_owner(&LOCK_rpl_thread); + if ((qev= qev_free_list)) + qev_free_list= qev->next; + else if(!(qev= (queued_event *)my_malloc(sizeof(*qev), MYF(0)))) + { + my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*qev)); + return NULL; + } + qev->ev= ev; + qev->event_size= event_size; + qev->next= NULL; + strcpy(qev->event_relay_log_name, rli->event_relay_log_name); + qev->event_relay_log_pos= rli->event_relay_log_pos; + qev->future_event_relay_log_pos= rli->future_event_relay_log_pos; + strcpy(qev->future_event_master_log_name, rli->future_event_master_log_name); + return qev; +} + + +void +rpl_parallel_thread::free_qev(rpl_parallel_thread::queued_event *qev) +{ + mysql_mutex_assert_owner(&LOCK_rpl_thread); + qev->next= qev_free_list; + qev_free_list= qev; +} + + +rpl_group_info* +rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev, + rpl_parallel_entry *e) +{ + rpl_group_info *rgi; + mysql_mutex_assert_owner(&LOCK_rpl_thread); + if ((rgi= rgi_free_list)) + { + rgi_free_list= rgi->next; + rgi->reinit(rli); + } + else + { + if(!(rgi= new rpl_group_info(rli))) + { + my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*rgi)); + return NULL; + } + rgi->is_parallel_exec = true; + if ((rgi->deferred_events_collecting= 
rli->mi->rpl_filter->is_on())) + rgi->deferred_events= new Deferred_log_events(rli); + } + if (event_group_new_gtid(rgi, gtid_ev)) + { + free_rgi(rgi); + my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME)); + return NULL; + } + rgi->parallel_entry= e; + + return rgi; +} + + +void +rpl_parallel_thread::free_rgi(rpl_group_info *rgi) +{ + mysql_mutex_assert_owner(&LOCK_rpl_thread); + DBUG_ASSERT(rgi->commit_orderer.waitee == NULL); + rgi->free_annotate_event(); + if (rgi->deferred_events) + { + delete rgi->deferred_events; + rgi->deferred_events= NULL; + } + rgi->next= rgi_free_list; + rgi_free_list= rgi; +} + + +group_commit_orderer * +rpl_parallel_thread::get_gco(uint64 wait_count, group_commit_orderer *prev) +{ + group_commit_orderer *gco; + mysql_mutex_assert_owner(&LOCK_rpl_thread); + if ((gco= gco_free_list)) + gco_free_list= gco->next_gco; + else if(!(gco= (group_commit_orderer *)my_malloc(sizeof(*gco), MYF(0)))) + { + my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*gco)); + return NULL; + } + mysql_cond_init(key_COND_group_commit_orderer, + &gco->COND_group_commit_orderer, NULL); + gco->wait_count= wait_count; + gco->prev_gco= prev; + gco->next_gco= NULL; + gco->installed= false; + return gco; +} + + +void +rpl_parallel_thread::free_gco(group_commit_orderer *gco) +{ + mysql_mutex_assert_owner(&LOCK_rpl_thread); + DBUG_ASSERT(!gco->prev_gco /* Must not free until wait has completed. */); + gco->next_gco= gco_free_list; + gco_free_list= gco; +} + + rpl_parallel_thread_pool::rpl_parallel_thread_pool() : count(0), threads(0), free_list(0), changing(false), inited(false) { @@ -650,7 +893,8 @@ rpl_parallel_thread_pool::destroy() Note that we return with the worker threads's LOCK_rpl_thread mutex locked. 
*/ struct rpl_parallel_thread * -rpl_parallel_thread_pool::get_thread(rpl_parallel_entry *entry) +rpl_parallel_thread_pool::get_thread(rpl_parallel_thread **owner, + rpl_parallel_entry *entry) { rpl_parallel_thread *rpt; @@ -660,16 +904,152 @@ rpl_parallel_thread_pool::get_thread(rpl_parallel_entry *entry) free_list= rpt->next; mysql_mutex_unlock(&LOCK_rpl_thread_pool); mysql_mutex_lock(&rpt->LOCK_rpl_thread); + rpt->current_owner= owner; rpt->current_entry= entry; return rpt; } +/* + Release a thread to the thread pool. + The thread should be locked, and should not have any work queued for it. +*/ +void +rpl_parallel_thread_pool::release_thread(rpl_parallel_thread *rpt) +{ + rpl_parallel_thread *list; + + mysql_mutex_assert_owner(&rpt->LOCK_rpl_thread); + DBUG_ASSERT(rpt->current_owner == NULL); + mysql_mutex_lock(&LOCK_rpl_thread_pool); + list= free_list; + rpt->next= list; + free_list= rpt; + if (!list) + mysql_cond_broadcast(&COND_rpl_thread_pool); + mysql_mutex_unlock(&LOCK_rpl_thread_pool); +} + + +/* + Obtain a worker thread that we can queue an event to. + + Each invocation allocates a new worker thread, to maximise + parallelism. However, only up to a maximum of + --slave-domain-parallel-threads workers can be occupied by a single + replication domain; after that point, we start re-using worker threads that + are still executing events that were queued earlier for this thread. + + We never queue more than --rpl-parallel-wait-queue_max amount of events + for one worker, to avoid the SQL driver thread using up all memory with + queued events while worker threads are stalling. + + Note that this function returns with rpl_parallel_thread::LOCK_rpl_thread + locked. Exception is if we were killed, in which case NULL is returned. + + The *did_enter_cond flag is set true if we had to wait for a worker thread + to become free (with mysql_cond_wait()). 
If so, old_stage will also be set, + and the LOCK_rpl_thread must be released with THD::EXIT_COND() instead + of mysql_mutex_unlock. + + If the flag `reuse' is set, the last worker thread will be returned again, + if it is still available. Otherwise a new worker thread is allocated. +*/ +rpl_parallel_thread * +rpl_parallel_entry::choose_thread(Relay_log_info *rli, bool *did_enter_cond, + PSI_stage_info *old_stage, bool reuse) +{ + uint32 idx; + rpl_parallel_thread *thr; + + idx= rpl_thread_idx; + if (!reuse) + { + ++idx; + if (idx >= rpl_thread_max) + idx= 0; + rpl_thread_idx= idx; + } + thr= rpl_threads[idx]; + if (thr) + { + *did_enter_cond= false; + mysql_mutex_lock(&thr->LOCK_rpl_thread); + for (;;) + { + if (thr->current_owner != &rpl_threads[idx]) + { + /* + The worker thread became idle, and returned to the free list and + possibly was allocated to a different request. So we should allocate + a new worker thread. + */ + unlock_or_exit_cond(rli->sql_driver_thd, &thr->LOCK_rpl_thread, + did_enter_cond, old_stage); + thr= NULL; + break; + } + else if (thr->queued_size <= opt_slave_parallel_max_queued) + { + /* The thread is ready to queue into. */ + break; + } + else if (rli->sql_driver_thd->check_killed()) + { + unlock_or_exit_cond(rli->sql_driver_thd, &thr->LOCK_rpl_thread, + did_enter_cond, old_stage); + my_error(ER_CONNECTION_KILLED, MYF(0)); + DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max", + { + debug_sync_set_action(rli->sql_driver_thd, + STRING_WITH_LEN("now SIGNAL wait_queue_killed")); + };); + slave_output_error_info(rli, rli->sql_driver_thd); + return NULL; + } + else + { + /* + We have reached the limit of how much memory we are allowed to use + for queuing events, so wait for the thread to consume some of its + queue. + */ + if (!*did_enter_cond) + { + /* + We need to do the debug_sync before ENTER_COND(). + Because debug_sync changes the thd->mysys_var->current_mutex, + and this can cause THD::awake to use the wrong mutex. 
+ */ + DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max", + { + debug_sync_set_action(rli->sql_driver_thd, + STRING_WITH_LEN("now SIGNAL wait_queue_ready")); + };); + rli->sql_driver_thd->ENTER_COND(&thr->COND_rpl_thread_queue, + &thr->LOCK_rpl_thread, + &stage_waiting_for_room_in_worker_thread, + old_stage); + *did_enter_cond= true; + } + mysql_cond_wait(&thr->COND_rpl_thread_queue, &thr->LOCK_rpl_thread); + } + } + } + if (!thr) + rpl_threads[idx]= thr= global_rpl_thread_pool.get_thread(&rpl_threads[idx], + this); + + return thr; +} + static void free_rpl_parallel_entry(void *element) { rpl_parallel_entry *e= (rpl_parallel_entry *)element; + if (e->current_gco) + dealloc_gco(e->current_gco); mysql_cond_destroy(&e->COND_parallel_entry); mysql_mutex_destroy(&e->LOCK_parallel_entry); my_free(e); @@ -709,10 +1089,22 @@ rpl_parallel::find(uint32 domain_id) (const uchar *)&domain_id, 0))) { /* Allocate a new, empty one. */ - if (!(e= (struct rpl_parallel_entry *)my_malloc(sizeof(*e), - MYF(MY_ZEROFILL)))) + ulong count= opt_slave_domain_parallel_threads; + if (count == 0 || count > opt_slave_parallel_threads) + count= opt_slave_parallel_threads; + rpl_parallel_thread **p; + if (!my_multi_malloc(MYF(MY_WME|MY_ZEROFILL), + &e, sizeof(*e), + &p, count*sizeof(*p), + NULL)) + { + my_error(ER_OUTOFMEMORY, MYF(0), (int)(sizeof(*e)+count*sizeof(*p))); return NULL; + } + e->rpl_threads= p; + e->rpl_thread_max= count; e->domain_id= domain_id; + e->stop_on_error_sub_id= (uint64)ULONGLONG_MAX; if (my_hash_insert(&domain_hash, (uchar *)e)) { my_free(e); @@ -730,10 +1122,11 @@ rpl_parallel::find(uint32 domain_id) void -rpl_parallel::wait_for_done() +rpl_parallel::wait_for_done(THD *thd) { struct rpl_parallel_entry *e; - uint32 i; + rpl_parallel_thread *rpt; + uint32 i, j; /* First signal all workers that they must force quit; no more events will @@ -741,26 +1134,58 @@ rpl_parallel::wait_for_done() */ for (i= 0; i < domain_hash.records; ++i) { - rpl_parallel_thread *rpt; - e= (struct 
rpl_parallel_entry *)my_hash_element(&domain_hash, i); + mysql_mutex_lock(&e->LOCK_parallel_entry); + /* + We want the worker threads to stop as quickly as is safe. If the slave + SQL threads are behind, we could have significant amount of events + queued for the workers, and we want to stop without waiting for them + all to be applied first. But if any event group has already started + executing in a worker, we want to be sure that all prior event groups + are also executed, so that we stop at a consistent point in the binlog + stream (per replication domain). + + All event groups wait for e->count_committing_event_groups to reach + the value of group_commit_orderer::wait_count before starting to + execute. Thus, at this point we know that any event group with a + strictly larger wait_count are safe to skip, none of them can have + started executing yet. So we set e->stop_count here and use it to + decide in the worker threads whether to continue executing an event + group or whether to skip it, when force_abort is set. 
+ */ e->force_abort= true; - if ((rpt= e->rpl_thread)) + e->stop_count= e->count_committing_event_groups; + mysql_mutex_unlock(&e->LOCK_parallel_entry); + for (j= 0; j < e->rpl_thread_max; ++j) { - mysql_mutex_lock(&rpt->LOCK_rpl_thread); - if (rpt->current_entry == e) - mysql_cond_signal(&rpt->COND_rpl_thread); - mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + if ((rpt= e->rpl_threads[j])) + { + mysql_mutex_lock(&rpt->LOCK_rpl_thread); + if (rpt->current_owner == &e->rpl_threads[j]) + mysql_cond_signal(&rpt->COND_rpl_thread); + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + } } } + DBUG_EXECUTE_IF("rpl_parallel_wait_for_done_trigger", + { + debug_sync_set_action(thd, + STRING_WITH_LEN("now SIGNAL wait_for_done_waiting")); + };); for (i= 0; i < domain_hash.records; ++i) { e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i); - mysql_mutex_lock(&e->LOCK_parallel_entry); - while (e->current_sub_id > e->last_committed_sub_id) - mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry); - mysql_mutex_unlock(&e->LOCK_parallel_entry); + for (j= 0; j < e->rpl_thread_max; ++j) + { + if ((rpt= e->rpl_threads[j])) + { + mysql_mutex_lock(&rpt->LOCK_rpl_thread); + while (rpt->current_owner == &e->rpl_threads[j]) + mysql_cond_wait(&e->COND_parallel_entry, &rpt->LOCK_rpl_thread); + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + } + } } } @@ -787,6 +1212,21 @@ rpl_parallel::workers_idle() /* + This is used when we get an error during processing in do_event(); + We will not queue any event to the thread, but we still need to wake it up + to be sure that it will be returned to the pool. +*/ +static void +abandon_worker_thread(THD *thd, rpl_parallel_thread *cur_thread, + bool *did_enter_cond, PSI_stage_info *old_stage) +{ + unlock_or_exit_cond(thd, &cur_thread->LOCK_rpl_thread, + did_enter_cond, old_stage); + mysql_cond_signal(&cur_thread->COND_rpl_thread); +} + + +/* do_event() is executed by the sql_driver_thd thread. 
It's main purpose is to find a thread that can execute the query. @@ -814,6 +1254,10 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, /* Stop queueing additional event groups once the SQL thread is requested to stop. + + We have to queue any remaining events of any event group that has already + been partially queued, but after that we will just ignore any further + events the SQL driver thread may try to queue, and eventually it will stop. */ if (((typ= ev->get_type_code()) == GTID_EVENT || !(is_group_event= Log_event::is_group_event(typ))) && @@ -822,188 +1266,121 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, if (sql_thread_stopping) { delete ev; - /* QQ: Need a better comment why we return false here */ + /* + Return false ("no error"); normal stop is not an error, and otherwise the + error has already been recorded. + */ return false; } - if (!(qev= (rpl_parallel_thread::queued_event *)my_malloc(sizeof(*qev), - MYF(0)))) + if (typ == GTID_EVENT || unlikely(!current)) + { + uint32 domain_id; + if (likely(typ == GTID_EVENT)) + { + Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev); + domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ? + 0 : gtid_ev->domain_id); + } + else + domain_id= 0; + if (!(e= find(domain_id))) + { + my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME)); + delete ev; + return true; + } + current= e; + } + else + e= current; + + /* + Find a worker thread to queue the event for. + Prefer a new thread, so we maximise parallelism (at least for the group + commit). But do not exceed a limit of --slave-domain-parallel-threads; + instead re-use a thread that we queued for previously. + */ + cur_thread= + e->choose_thread(rli, &did_enter_cond, &old_stage, typ != GTID_EVENT); + if (!cur_thread) { - my_error(ER_OUT_OF_RESOURCES, MYF(0)); + /* This means we were killed. The error is already signalled. 
*/ + delete ev; + return true; + } + + if (!(qev= cur_thread->get_qev(ev, event_size, rli))) + { + abandon_worker_thread(rli->sql_driver_thd, cur_thread, + &did_enter_cond, &old_stage); delete ev; return true; } - qev->ev= ev; - qev->event_size= event_size; - qev->next= NULL; - strcpy(qev->event_relay_log_name, rli->event_relay_log_name); - qev->event_relay_log_pos= rli->event_relay_log_pos; - qev->future_event_relay_log_pos= rli->future_event_relay_log_pos; - strcpy(qev->future_event_master_log_name, rli->future_event_master_log_name); if (typ == GTID_EVENT) { Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev); - uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ? - 0 : gtid_ev->domain_id); - if (!(e= find(domain_id)) || - !(rgi= new rpl_group_info(rli)) || - event_group_new_gtid(rgi, gtid_ev)) + if (!(rgi= cur_thread->get_rgi(rli, gtid_ev, e))) { - my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME)); - delete rgi; - my_free(qev); + cur_thread->free_qev(qev); + abandon_worker_thread(rli->sql_driver_thd, cur_thread, + &did_enter_cond, &old_stage); delete ev; return true; } - rgi->is_parallel_exec = true; - if ((rgi->deferred_events_collecting= rli->mi->rpl_filter->is_on())) - rgi->deferred_events= new Deferred_log_events(rli); - if ((gtid_ev->flags2 & Gtid_log_event::FL_GROUP_COMMIT_ID) && - e->last_commit_id == gtid_ev->commit_id) - { - /* - We are already executing something else in this domain. But the two - event groups were committed together in the same group commit on the - master, so we can still do them in parallel here on the slave. + /* + We queue the event group in a new worker thread, to run in parallel + with previous groups. - However, the commit of this event must wait for the commit of the prior - event, to preserve binlog commit order and visibility across all - servers in the replication hierarchy. 
+ To preserve commit order within the replication domain, we set up + rgi->wait_commit_sub_id to make the new group commit only after the + previous group has committed. - In addition, we must not start executing this event until we have - finished the previous collection of event groups that group-committed - together; we use rgi->wait_start_sub_id to control this. - */ - rpl_parallel_thread *rpt= global_rpl_thread_pool.get_thread(e); - rgi->wait_commit_sub_id= e->current_sub_id; - rgi->wait_commit_group_info= e->current_group_info; - rgi->wait_start_sub_id= e->prev_groupcommit_sub_id; - e->rpl_thread= cur_thread= rpt; - /* get_thread() returns with the LOCK_rpl_thread locked. */ - } - else + Event groups that group-committed together on the master can be run + in parallel with each other without restrictions. But one batch of + group-commits may not start before all groups in the previous batch + have initiated their commit phase; we set up rgi->gco to ensure that. + */ + rgi->wait_commit_sub_id= e->current_sub_id; + rgi->wait_commit_group_info= e->current_group_info; + + if (!((gtid_ev->flags2 & Gtid_log_event::FL_GROUP_COMMIT_ID) && + e->last_commit_id == gtid_ev->commit_id)) { /* - Check if we already have a worker thread for this entry. - - We continue to queue more events up for the worker thread while it is - still executing the first ones, to be able to start executing a large - event group without having to wait for the end to be fetched from the - master. And we continue to queue up more events after the first group, - so that we can continue to process subsequent parts of the relay log in - parallel without having to wait for previous long-running events to - complete. - - But if the worker thread is idle at any point, it may return to the - idle list or start servicing a different request. So check this, and - allocate a new thread if the old one is no longer processing for us. + A new batch of transactions that group-committed together on the master. 
+ + Remember the count that marks the end of the previous group committed + batch, and allocate a new gco. */ - cur_thread= e->rpl_thread; - if (cur_thread) - { - mysql_mutex_lock(&cur_thread->LOCK_rpl_thread); - for (;;) - { - if (cur_thread->current_entry != e) - { - /* - The worker thread became idle, and returned to the free list and - possibly was allocated to a different request. This also means - that everything previously queued has already been executed, - else the worker thread would not have become idle. So we should - allocate a new worker thread. - */ - mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread); - e->rpl_thread= cur_thread= NULL; - break; - } - else if (cur_thread->queued_size <= opt_slave_parallel_max_queued) - break; // The thread is ready to queue into - else if (rli->sql_driver_thd->check_killed()) - { - mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread); - my_error(ER_CONNECTION_KILLED, MYF(0)); - delete rgi; - my_free(qev); - delete ev; - DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max", - { - debug_sync_set_action(rli->sql_driver_thd, - STRING_WITH_LEN("now SIGNAL wait_queue_killed")); - };); - slave_output_error_info(rli, rli->sql_driver_thd); - return true; - } - else - { - /* - We have reached the limit of how much memory we are allowed to - use for queuing events, so wait for the thread to consume some - of its queue. - */ - if (!did_enter_cond) - { - rli->sql_driver_thd->ENTER_COND(&cur_thread->COND_rpl_thread, - &cur_thread->LOCK_rpl_thread, - &stage_waiting_for_room_in_worker_thread, &old_stage); - did_enter_cond= true; - DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max", - { - debug_sync_set_action(rli->sql_driver_thd, - STRING_WITH_LEN("now SIGNAL wait_queue_ready")); - };); - } - mysql_cond_wait(&cur_thread->COND_rpl_thread, - &cur_thread->LOCK_rpl_thread); - } - } - } + uint64 count= e->count_queued_event_groups; + group_commit_orderer *gco; - if (!cur_thread) - { - /* - Nothing else is currently running in this domain. 
We can - spawn a new thread to do this event group in parallel with - anything else that might be running in other domains. - */ - cur_thread= e->rpl_thread= global_rpl_thread_pool.get_thread(e); - /* get_thread() returns with the LOCK_rpl_thread locked. */ - } - else + if (!(gco= cur_thread->get_gco(count, e->current_gco))) { - /* - We are still executing the previous event group for this replication - domain, and we have to wait for that to finish before we can start on - the next one. So just re-use the thread. - */ + cur_thread->free_rgi(rgi); + cur_thread->free_qev(qev); + abandon_worker_thread(rli->sql_driver_thd, cur_thread, + &did_enter_cond, &old_stage); + delete ev; + return true; } - - rgi->wait_commit_sub_id= 0; - rgi->wait_start_sub_id= 0; - e->prev_groupcommit_sub_id= e->current_sub_id; + e->current_gco= rgi->gco= gco; } - + else + rgi->gco= e->current_gco; if (gtid_ev->flags2 & Gtid_log_event::FL_GROUP_COMMIT_ID) - { - e->last_server_id= gtid_ev->server_id; - e->last_seq_no= gtid_ev->seq_no; e->last_commit_id= gtid_ev->commit_id; - } else - { - e->last_server_id= 0; - e->last_seq_no= 0; e->last_commit_id= 0; - } - qev->rgi= e->current_group_info= rgi; e->current_sub_id= rgi->gtid_sub_id; - current= rgi->parallel_entry= e; + ++e->count_queued_event_groups; } - else if (!is_group_event || !current) + else if (!is_group_event || !e) { my_off_t log_pos; int err; @@ -1013,7 +1390,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, Same for events not preceeded by GTID (we should not see those normally, but they might be from an old master). - The varuable `current' is NULL for the case where the master did not + The variable `e' is NULL for the case where the master did not have GTID, like a MariaDB 5.5 or MySQL master. 
*/ qev->rgi= serial_rgi; @@ -1040,18 +1417,11 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, if (err) { - my_free(qev); + cur_thread->free_qev(qev); + abandon_worker_thread(rli->sql_driver_thd, cur_thread, + &did_enter_cond, &old_stage); return true; } - qev->ev= NULL; - qev->future_event_master_log_pos= log_pos; - if (!current) - { - rli->event_relay_log_pos= rli->future_event_relay_log_pos; - handle_queued_pos_update(rli->sql_driver_thd, qev); - my_free(qev); - return false; - } /* Queue an empty event, so that the position will be updated in a reasonable way relative to other events: @@ -1064,40 +1434,12 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, least the position will not be updated until one of them has reached the current point. */ - cur_thread= current->rpl_thread; - if (cur_thread) - { - mysql_mutex_lock(&cur_thread->LOCK_rpl_thread); - if (cur_thread->current_entry != current) - { - /* Not ours anymore, we need to grab a new one. */ - mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread); - cur_thread= NULL; - } - } - if (!cur_thread) - cur_thread= current->rpl_thread= - global_rpl_thread_pool.get_thread(current); + qev->ev= NULL; + qev->future_event_master_log_pos= log_pos; } else { - cur_thread= current->rpl_thread; - if (cur_thread) - { - mysql_mutex_lock(&cur_thread->LOCK_rpl_thread); - if (cur_thread->current_entry != current) - { - /* Not ours anymore, we need to grab a new one. 
*/ - mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread); - cur_thread= NULL; - } - } - if (!cur_thread) - { - cur_thread= current->rpl_thread= - global_rpl_thread_pool.get_thread(current); - } - qev->rgi= current->current_group_info; + qev->rgi= e->current_group_info; } /* @@ -1105,10 +1447,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, */ rli->event_relay_log_pos= rli->future_event_relay_log_pos; cur_thread->enqueue(qev); - if (did_enter_cond) - rli->sql_driver_thd->EXIT_COND(&old_stage); - else - mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread); + unlock_or_exit_cond(rli->sql_driver_thd, &cur_thread->LOCK_rpl_thread, + &did_enter_cond, &old_stage); mysql_cond_signal(&cur_thread->COND_rpl_thread); return false; diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h index 019a354c57d..956a31e4b7f 100644 --- a/sql/rpl_parallel.h +++ b/sql/rpl_parallel.h @@ -9,16 +9,66 @@ struct rpl_parallel_entry; struct rpl_parallel_thread_pool; class Relay_log_info; + + +/* + Structure used to keep track of the parallel replication of a batch of + event-groups that group-committed together on the master. + + It is used to ensure that every event group in one batch has reached the + commit stage before the next batch starts executing. + + Note the lifetime of this structure: + + - It is allocated when the first event in a new batch of group commits + is queued, from the free list rpl_parallel_entry::gco_free_list. + + - The gco for the batch currently being queued is owned by + rpl_parallel_entry::current_gco. The gco for a previous batch that has + been fully queued is owned by the gco->prev_gco pointer of the gco for + the following batch. + + - The worker thread waits on gco->COND_group_commit_orderer for + rpl_parallel_entry::count_committing_event_groups to reach wait_count + before starting; the first waiter links the gco into the next_gco + pointer of the gco of the previous batch for signalling. 
+ + - When an event group reaches the commit stage, it signals the + COND_group_commit_orderer if its gco->next_gco pointer is non-NULL and + rpl_parallel_entry::count_committing_event_groups has reached + gco->next_gco->wait_count. + + - When gco->wait_count is reached for a worker and the wait completes, + the worker frees gco->prev_gco; at this point it is guaranteed not to + be needed any longer. +*/ +struct group_commit_orderer { + /* Wakeup condition, used with rpl_parallel_entry::LOCK_parallel_entry. */ + mysql_cond_t COND_group_commit_orderer; + uint64 wait_count; + group_commit_orderer *prev_gco; + group_commit_orderer *next_gco; + bool installed; +}; + + struct rpl_parallel_thread { bool delay_start; bool running; bool stop; mysql_mutex_t LOCK_rpl_thread; mysql_cond_t COND_rpl_thread; + mysql_cond_t COND_rpl_thread_queue; struct rpl_parallel_thread *next; /* For free list. */ struct rpl_parallel_thread_pool *pool; THD *thd; - struct rpl_parallel_entry *current_entry; + /* + Who owns the thread, if any (it's a pointer into the + rpl_parallel_entry::rpl_threads array. + */ + struct rpl_parallel_thread **current_owner; + /* The rpl_parallel_entry of the owner. 
*/ + rpl_parallel_entry *current_entry; struct queued_event { queued_event *next; Log_event *ev; @@ -31,6 +81,9 @@ struct rpl_parallel_thread { size_t event_size; } *event_queue, *last_in_queue; uint64 queued_size; + queued_event *qev_free_list; + rpl_group_info *rgi_free_list; + group_commit_orderer *gco_free_list; void enqueue(queued_event *qev) { @@ -42,15 +95,25 @@ struct rpl_parallel_thread { queued_size+= qev->event_size; } - void dequeue(queued_event *list) + void dequeue1(queued_event *list) { - queued_event *tmp; - DBUG_ASSERT(list == event_queue); event_queue= last_in_queue= NULL; - for (tmp= list; tmp; tmp= tmp->next) - queued_size-= tmp->event_size; } + + void dequeue2(size_t dequeue_size) + { + queued_size-= dequeue_size; + } + + queued_event *get_qev(Log_event *ev, ulonglong event_size, + Relay_log_info *rli); + void free_qev(queued_event *qev); + rpl_group_info *get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev, + rpl_parallel_entry *e); + void free_rgi(rpl_group_info *rgi); + group_commit_orderer *get_gco(uint64 wait_count, group_commit_orderer *prev); + void free_gco(group_commit_orderer *gco); }; @@ -66,14 +129,16 @@ struct rpl_parallel_thread_pool { rpl_parallel_thread_pool(); int init(uint32 size); void destroy(); - struct rpl_parallel_thread *get_thread(rpl_parallel_entry *entry); + struct rpl_parallel_thread *get_thread(rpl_parallel_thread **owner, + rpl_parallel_entry *entry); + void release_thread(rpl_parallel_thread *rpt); }; struct rpl_parallel_entry { + mysql_mutex_t LOCK_parallel_entry; + mysql_cond_t COND_parallel_entry; uint32 domain_id; - uint32 last_server_id; - uint64 last_seq_no; uint64 last_commit_id; bool active; /* @@ -82,15 +147,41 @@ struct rpl_parallel_entry { waiting for event groups to complete. */ bool force_abort; + /* + At STOP SLAVE (force_abort=true), we do not want to process all events in + the queue (which could unnecessarily delay stop, if a lot of events happen + to be queued). 
The stop_count provides a safe point at which to stop, so + that everything before becomes committed and nothing after does. The value + corresponds to group_commit_orderer::wait_count; if wait_count is less than + or equal to stop_count, we execute the associated event group, else we + skip it (and all following) and stop. + */ + uint64 stop_count; - rpl_parallel_thread *rpl_thread; + /* + Cyclic array recording the last rpl_thread_max worker threads that we + queued event for. This is used to limit how many workers a single domain + can occupy (--slave-domain-parallel-threads). + + Note that workers are never explicitly deleted from the array. Instead, + we need to check (under LOCK_rpl_thread) that the thread still belongs + to us before re-using (rpl_thread::current_owner). + */ + rpl_parallel_thread **rpl_threads; + uint32 rpl_thread_max; + uint32 rpl_thread_idx; /* The sub_id of the last transaction to commit within this domain_id. Must be accessed under LOCK_parallel_entry protection. + + Event groups commit in order, so the rpl_group_info for an event group + will be alive (at least) as long as + rpl_grou_info::gtid_sub_id > last_committed_sub_id. This can be used to + safely refer back to previous event groups if they are still executing, + and ignore them if they completed, without requiring explicit + synchronisation between the threads. */ uint64 last_committed_sub_id; - mysql_mutex_t LOCK_parallel_entry; - mysql_cond_t COND_parallel_entry; /* The sub_id of the last event group in this replication domain that was queued for execution by a worker thread. @@ -98,14 +189,29 @@ struct rpl_parallel_entry { uint64 current_sub_id; rpl_group_info *current_group_info; /* - The sub_id of the last event group in the previous batch of group-committed - transactions. - - When we spawn parallel worker threads for the next group-committed batch, - they first need to wait for this sub_id to be committed before it is safe - to start executing them. 
+ If we get an error in some event group, we set the sub_id of that event + group here. Then later event groups (with higher sub_id) can know not to + try to start (event groups that already started will be rolled back when + wait_for_prior_commit() returns error). + The value is ULONGLONG_MAX when no error occured. + */ + uint64 stop_on_error_sub_id; + /* Total count of event groups queued so far. */ + uint64 count_queued_event_groups; + /* + Count of event groups that have started (but not necessarily completed) + the commit phase. We use this to know when every event group in a previous + batch of master group commits have started committing on the slave, so + that it is safe to start executing the events in the following batch. */ - uint64 prev_groupcommit_sub_id; + uint64 count_committing_event_groups; + /* The group_commit_orderer object for the events currently being queued. */ + group_commit_orderer *current_gco; + + rpl_parallel_thread * choose_thread(Relay_log_info *rli, bool *did_enter_cond, + PSI_stage_info *old_stage, bool reuse); + group_commit_orderer *get_gco(); + void free_gco(group_commit_orderer *gco); }; struct rpl_parallel { HASH domain_hash; @@ -116,7 +222,7 @@ struct rpl_parallel { ~rpl_parallel(); void reset(); rpl_parallel_entry *find(uint32 domain_id); - void wait_for_done(); + void wait_for_done(THD *thd); bool workers_idle(); bool do_event(rpl_group_info *serial_rgi, Log_event *ev, ulonglong event_size); diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index c9e8c79d5fc..399852744f8 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -1479,14 +1479,27 @@ end: } -rpl_group_info::rpl_group_info(Relay_log_info *rli_) - : rli(rli_), thd(0), gtid_sub_id(0), wait_commit_sub_id(0), - wait_commit_group_info(0), wait_start_sub_id(0), parallel_entry(0), - deferred_events(NULL), m_annotate_event(0), tables_to_lock(0), - tables_to_lock_count(0), trans_retries(0), last_event_start_time(0), - is_parallel_exec(false), is_error(false), - 
row_stmt_start_timestamp(0), long_find_row_note_printed(false) +void +rpl_group_info::reinit(Relay_log_info *rli) +{ + this->rli= rli; + tables_to_lock= NULL; + tables_to_lock_count= 0; + trans_retries= 0; + last_event_start_time= 0; + is_error= false; + row_stmt_start_timestamp= 0; + long_find_row_note_printed= false; + did_mark_start_commit= false; + commit_orderer.reinit(); +} + +rpl_group_info::rpl_group_info(Relay_log_info *rli) + : thd(0), gtid_sub_id(0), wait_commit_sub_id(0), + wait_commit_group_info(0), parallel_entry(0), + deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false) { + reinit(rli); bzero(¤t_gtid, sizeof(current_gtid)); mysql_mutex_init(key_rpl_group_info_sleep_lock, &sleep_lock, MY_MUTEX_INIT_FAST); @@ -1710,4 +1723,40 @@ void rpl_group_info::slave_close_thread_tables(THD *thd) } + +static void +mark_start_commit_inner(rpl_parallel_entry *e, group_commit_orderer *gco) +{ + uint64 count= ++e->count_committing_event_groups; + if (gco->next_gco && gco->next_gco->wait_count == count) + mysql_cond_broadcast(&gco->next_gco->COND_group_commit_orderer); +} + + +void +rpl_group_info::mark_start_commit_no_lock() +{ + if (did_mark_start_commit) + return; + mark_start_commit_inner(parallel_entry, gco); + did_mark_start_commit= true; +} + + +void +rpl_group_info::mark_start_commit() +{ + rpl_parallel_entry *e; + + if (did_mark_start_commit) + return; + + e= this->parallel_entry; + mysql_mutex_lock(&e->LOCK_parallel_entry); + mark_start_commit_inner(e, gco); + mysql_mutex_unlock(&e->LOCK_parallel_entry); + did_mark_start_commit= true; +} + + #endif diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h index 3f95849a926..0ba259b0efd 100644 --- a/sql/rpl_rli.h +++ b/sql/rpl_rli.h @@ -481,6 +481,7 @@ private: struct rpl_group_info { + rpl_group_info *next; /* For free list in rpl_parallel_thread */ Relay_log_info *rli; THD *thd; /* @@ -510,14 +511,15 @@ struct rpl_group_info uint64 wait_commit_sub_id; rpl_group_info *wait_commit_group_info; /* - If non-zero, 
the event group must wait for this sub_id to be committed - before the execution of the event group is allowed to start. + This holds a pointer to a struct that keeps track of the need to wait + for the previous batch of event groups to reach the commit stage, before + this batch can start to execute. (When we execute in parallel the transactions that group committed together on the master, we still need to wait for any prior transactions - to have commtted). + to have reached the commit stage). */ - uint64 wait_start_sub_id; + group_commit_orderer *gco; struct rpl_parallel_entry *parallel_entry; @@ -567,18 +569,22 @@ struct rpl_group_info char future_event_master_log_name[FN_REFLEN]; bool is_parallel_exec; bool is_error; + /* + Set true when we signalled that we reach the commit phase. Used to avoid + counting one event group twice. + */ + bool did_mark_start_commit; -private: /* Runtime state for printing a note when slave is taking too long while processing a row event. */ time_t row_stmt_start_timestamp; bool long_find_row_note_printed; -public: rpl_group_info(Relay_log_info *rli_); ~rpl_group_info(); + void reinit(Relay_log_info *rli); /* Returns true if the argument event resides in the containter; @@ -661,6 +667,8 @@ public: void clear_tables_to_lock(); void cleanup_context(THD *, bool); void slave_close_thread_tables(THD *); + void mark_start_commit_no_lock(); + void mark_start_commit(); time_t get_row_stmt_start_timestamp() { diff --git a/sql/scheduler.h b/sql/scheduler.h index 4e200e86d74..06c17c7b114 100644 --- a/sql/scheduler.h +++ b/sql/scheduler.h @@ -99,7 +99,8 @@ public: void *data; /* scheduler-specific data structure */ }; -#if !defined(EMBEDDED_LIBRARY) +#undef HAVE_POOL_OF_THREADS +#if !defined(EMBEDDED_LIBRARY) && !defined(_AIX) #define HAVE_POOL_OF_THREADS 1 void pool_of_threads_scheduler(scheduler_functions* func, ulong *arg_max_connections, diff --git a/sql/slave.cc b/sql/slave.cc index f74f5ec6267..25480da79a1 100644 --- a/sql/slave.cc +++ 
b/sql/slave.cc @@ -331,7 +331,7 @@ run_slave_init_thread() pthread_t th; slave_init_thread_running= true; - if (mysql_thread_create(key_thread_slave_init, &th, NULL, + if (mysql_thread_create(key_thread_slave_init, &th, &connection_attrib, handle_slave_init, NULL)) { sql_print_error("Failed to create thread while initialising slave"); @@ -4542,7 +4542,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME, } if (opt_slave_parallel_threads > 0) - rli->parallel.wait_for_done(); + rli->parallel.wait_for_done(thd); /* Thread stopped. Print the current replication position to the log */ { @@ -4568,7 +4568,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME, get the correct position printed.) */ if (opt_slave_parallel_threads > 0) - rli->parallel.wait_for_done(); + rli->parallel.wait_for_done(thd); /* Some events set some playgrounds, which won't be cleared because thread diff --git a/sql/sql_class.cc b/sql/sql_class.cc index 2be1b895f39..8a4a7006c63 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -6118,14 +6118,23 @@ bool THD::rgi_have_temporary_tables() } +void +wait_for_commit::reinit() +{ + subsequent_commits_list= NULL; + next_subsequent_commit= NULL; + waitee= NULL; + opaque_pointer= NULL; + wakeup_error= 0; + wakeup_subsequent_commits_running= false; +} + + wait_for_commit::wait_for_commit() - : subsequent_commits_list(0), next_subsequent_commit(0), waitee(0), - opaque_pointer(0), - waiting_for_commit(false), wakeup_error(0), - wakeup_subsequent_commits_running(false) { mysql_mutex_init(key_LOCK_wait_commit, &LOCK_wait_commit, MY_MUTEX_INIT_FAST); mysql_cond_init(key_COND_wait_commit, &COND_wait_commit, 0); + reinit(); } @@ -6173,7 +6182,7 @@ wait_for_commit::wakeup(int wakeup_error) */ mysql_mutex_lock(&LOCK_wait_commit); - waiting_for_commit= false; + waitee= NULL; this->wakeup_error= wakeup_error; /* Note that it is critical that the mysql_cond_signal() here is done while @@ -6205,9 +6214,8 @@ 
wait_for_commit::wakeup(int wakeup_error) void wait_for_commit::register_wait_for_prior_commit(wait_for_commit *waitee) { - waiting_for_commit= true; - wakeup_error= 0; DBUG_ASSERT(!this->waitee /* No prior registration allowed */); + wakeup_error= 0; this->waitee= waitee; mysql_mutex_lock(&waitee->LOCK_wait_commit); @@ -6217,7 +6225,7 @@ wait_for_commit::register_wait_for_prior_commit(wait_for_commit *waitee) see comments on wakeup_subsequent_commits2() for details. */ if (waitee->wakeup_subsequent_commits_running) - waiting_for_commit= false; + this->waitee= NULL; else { /* @@ -6247,9 +6255,9 @@ wait_for_commit::wait_for_prior_commit2(THD *thd) thd->ENTER_COND(&COND_wait_commit, &LOCK_wait_commit, &stage_waiting_for_prior_transaction_to_commit, &old_stage); - while (waiting_for_commit && !thd->check_killed()) + while ((loc_waitee= this->waitee) && !thd->check_killed()) mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit); - if (!waiting_for_commit) + if (!loc_waitee) { if (wakeup_error) my_error(ER_PRIOR_COMMIT_FAILED, MYF(0)); @@ -6262,7 +6270,6 @@ wait_for_commit::wait_for_prior_commit2(THD *thd) waiter as to whether we succeed or fail (eg. we may roll back but waitee might attempt to commit both us and any subsequent commits waiting for us). 
*/ - loc_waitee= this->waitee; mysql_mutex_lock(&loc_waitee->LOCK_wait_commit); if (loc_waitee->wakeup_subsequent_commits_running) { @@ -6271,21 +6278,29 @@ wait_for_commit::wait_for_prior_commit2(THD *thd) do { mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit); - } while (waiting_for_commit); + } while (this->waitee); + if (wakeup_error) + my_error(ER_PRIOR_COMMIT_FAILED, MYF(0)); goto end; } remove_from_list(&loc_waitee->subsequent_commits_list); mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit); + this->waitee= NULL; - DEBUG_SYNC(thd, "wait_for_prior_commit_killed"); wakeup_error= thd->killed_errno(); if (!wakeup_error) wakeup_error= ER_QUERY_INTERRUPTED; my_message(wakeup_error, ER(wakeup_error), MYF(0)); + thd->EXIT_COND(&old_stage); + /* + Must do the DEBUG_SYNC() _after_ exit_cond(), as DEBUG_SYNC is not safe to + use within enter_cond/exit_cond. + */ + DEBUG_SYNC(thd, "wait_for_prior_commit_killed"); + return wakeup_error; end: thd->EXIT_COND(&old_stage); - waitee= NULL; return wakeup_error; } @@ -6368,10 +6383,11 @@ wait_for_commit::wakeup_subsequent_commits2(int wakeup_error) void wait_for_commit::unregister_wait_for_prior_commit2() { + wait_for_commit *loc_waitee; + mysql_mutex_lock(&LOCK_wait_commit); - if (waiting_for_commit) + if ((loc_waitee= this->waitee)) { - wait_for_commit *loc_waitee= this->waitee; mysql_mutex_lock(&loc_waitee->LOCK_wait_commit); if (loc_waitee->wakeup_subsequent_commits_running) { @@ -6383,7 +6399,7 @@ wait_for_commit::unregister_wait_for_prior_commit2() See comments on wakeup_subsequent_commits2() for more details. */ mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit); - while (waiting_for_commit) + while (this->waitee) mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit); } else @@ -6391,10 +6407,10 @@ wait_for_commit::unregister_wait_for_prior_commit2() /* Remove ourselves from the list in the waitee. 
*/ remove_from_list(&loc_waitee->subsequent_commits_list); mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit); + this->waitee= NULL; } } mysql_mutex_unlock(&LOCK_wait_commit); - this->waitee= NULL; } diff --git a/sql/sql_class.h b/sql/sql_class.h index defc31547d0..ef28d860b04 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -1656,8 +1656,8 @@ struct wait_for_commit { /* The LOCK_wait_commit protects the fields subsequent_commits_list and - wakeup_subsequent_commits_running (for a waitee), and the flag - waiting_for_commit and associated COND_wait_commit (for a waiter). + wakeup_subsequent_commits_running (for a waitee), and the pointer + waitee and associated COND_wait_commit (for a waiter). */ mysql_mutex_t LOCK_wait_commit; mysql_cond_t COND_wait_commit; @@ -1665,7 +1665,13 @@ struct wait_for_commit wait_for_commit *subsequent_commits_list; /* Link field for entries in subsequent_commits_list. */ wait_for_commit *next_subsequent_commit; - /* Our waitee, if we did register_wait_for_prior_commit(), else NULL. */ + /* + Our waitee, if we did register_wait_for_prior_commit(), and were not + yet woken up. Else NULL. + + When this is cleared for wakeup, the COND_wait_commit condition is + signalled. + */ wait_for_commit *waitee; /* Generic pointer for use by the transaction coordinator to optimise the @@ -1676,12 +1682,6 @@ struct wait_for_commit used by another transaction coordinator for similar purposes. */ void *opaque_pointer; - /* - The waiting_for_commit flag is cleared when a waiter has been woken - up. The COND_wait_commit condition is signalled when this has been - cleared. - */ - bool waiting_for_commit; /* The wakeup error code from the waitee. 0 means no error. */ int wakeup_error; /* @@ -1697,10 +1697,14 @@ struct wait_for_commit Quick inline check, to avoid function call and locking in the common case where no wakeup is registered, or a registered wait was already signalled. 
*/ - if (waiting_for_commit) + if (waitee) return wait_for_prior_commit2(thd); else + { + if (wakeup_error) + my_error(ER_PRIOR_COMMIT_FAILED, MYF(0)); return wakeup_error; + } } void wakeup_subsequent_commits(int wakeup_error) { @@ -1721,7 +1725,7 @@ struct wait_for_commit } void unregister_wait_for_prior_commit() { - if (waiting_for_commit) + if (waitee) unregister_wait_for_prior_commit2(); } /* @@ -1741,7 +1745,7 @@ struct wait_for_commit } next_ptr_ptr= &cur->next_subsequent_commit; } - waiting_for_commit= false; + waitee= NULL; } void wakeup(int wakeup_error); @@ -1752,6 +1756,7 @@ struct wait_for_commit wait_for_commit(); ~wait_for_commit(); + void reinit(); }; diff --git a/sql/sql_select.cc b/sql/sql_select.cc index 37e9c642dbb..06a48d9157b 100644 --- a/sql/sql_select.cc +++ b/sql/sql_select.cc @@ -18800,7 +18800,16 @@ end_update(JOIN *join, JOIN_TAB *join_tab __attribute__((unused)), for (group=table->group ; group ; group=group->next) { Item *item= *group->item; - item->save_org_in_field(group->field); + if (group->fast_field_copier_setup != group->field) + { + DBUG_PRINT("info", ("new setup 0x%lx -> 0x%lx", + (ulong)group->fast_field_copier_setup, + (ulong)group->field)); + group->fast_field_copier_setup= group->field; + group->fast_field_copier_func= + item->setup_fast_field_copier(group->field); + } + item->save_org_in_field(group->field, group->fast_field_copier_func); /* Store in the used key if the field was 0 */ if (item->maybe_null) group->buff[-1]= (char) group->field->is_null(); diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index a9684947ba4..20784ce9301 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -1768,6 +1768,49 @@ static Sys_var_ulong Sys_slave_parallel_threads( ON_UPDATE(fix_slave_parallel_threads)); +static bool +check_slave_domain_parallel_threads(sys_var *self, THD *thd, set_var *var) +{ + bool running; + + mysql_mutex_lock(&LOCK_active_mi); + running= master_info_index->give_error_if_slave_running(); + 
mysql_mutex_unlock(&LOCK_active_mi); + if (running) + return true; + + return false; +} + +static bool +fix_slave_domain_parallel_threads(sys_var *self, THD *thd, enum_var_type type) +{ + bool running; + + mysql_mutex_unlock(&LOCK_global_system_variables); + mysql_mutex_lock(&LOCK_active_mi); + running= master_info_index->give_error_if_slave_running(); + mysql_mutex_unlock(&LOCK_active_mi); + mysql_mutex_lock(&LOCK_global_system_variables); + + return running ? true : false; +} + + +static Sys_var_ulong Sys_slave_domain_parallel_threads( + "slave_domain_parallel_threads", + "Maximum number of parallel threads to use on slave for events in a " + "single replication domain. When using multiple domains, this can be " + "used to limit a single domain from grabbing all threads and thus " + "stalling other domains. The default of 0 means to allow a domain to " + "grab as many threads as it wants, up to the value of " + "slave_parallel_threads.", + GLOBAL_VAR(opt_slave_domain_parallel_threads), CMD_LINE(REQUIRED_ARG), + VALID_RANGE(0,16383), DEFAULT(0), BLOCK_SIZE(1), NO_MUTEX_GUARD, + NOT_IN_BINLOG, ON_CHECK(check_slave_domain_parallel_threads), + ON_UPDATE(fix_slave_domain_parallel_threads)); + + static Sys_var_ulong Sys_slave_parallel_max_queued( "slave_parallel_max_queued", "Limit on how much memory SQL threads should use per parallel " diff --git a/sql/table.h b/sql/table.h index e3290410657..e1e66898a6d 100644 --- a/sql/table.h +++ b/sql/table.h @@ -194,10 +194,20 @@ private: /* Order clause list element */ +typedef int (*fast_field_copier)(Field *to, Field *from); + + typedef struct st_order { struct st_order *next; Item **item; /* Point at item in select fields */ Item *item_ptr; /* Storage for initial item */ + /* + Reference to the function we are trying to optimize copy to + a temporary table + */ + fast_field_copier fast_field_copier_func; + /* Field for which above optimizer function setup */ + Field *fast_field_copier_setup; int counter; /* position in 
SELECT list, correct only if counter_used is true*/ bool asc; /* true if ascending */ diff --git a/sql/threadpool_unix.cc b/sql/threadpool_unix.cc index f0454cfedb0..68c032fb67b 100644 --- a/sql/threadpool_unix.cc +++ b/sql/threadpool_unix.cc @@ -19,6 +19,9 @@ #include <sql_class.h> #include <my_pthread.h> #include <scheduler.h> + +#ifdef HAVE_POOL_OF_THREADS + #include <sql_connect.h> #include <mysqld.h> #include <debug_sync.h> @@ -1678,3 +1681,5 @@ static void print_pool_blocked_message(bool max_threads_reached) msg_written= true; } } + +#endif /* HAVE_POOL_OF_THREADS */ diff --git a/storage/connect/os.h b/storage/connect/os.h index e3d452bf7b8..8e94f4241bb 100644 --- a/storage/connect/os.h +++ b/storage/connect/os.h @@ -9,6 +9,12 @@ typedef off_t off64_t; #define O_LARGEFILE 0 #endif +#ifdef _AIX +#ifndef O_LARGEFILE +#define O_LARGEFILE 0 +#endif +#endif + #if defined(WIN32) typedef __int64 BIGINT; #else // !WIN32 diff --git a/storage/oqgraph/ha_oqgraph.h b/storage/oqgraph/ha_oqgraph.h index 2a998425c27..14490270031 100644 --- a/storage/oqgraph/ha_oqgraph.h +++ b/storage/oqgraph/ha_oqgraph.h @@ -115,6 +115,20 @@ public: virtual const char *table_type() const { return hton_name(ht)->str; } #endif + my_bool register_query_cache_table(THD *thd, char *table_key, + uint key_length, + qc_engine_callback + *engine_callback, + ulonglong *engine_data) + { + /* + Do not put data from OQGRAPH tables into query cache (because there + is no way to tell whether the data in the backing table has changed or + not) + */ + return FALSE; + } + private: int oqgraph_check_table_structure (TABLE *table_arg); diff --git a/storage/oqgraph/mysql-test/oqgraph/regression_mdev5744.opt b/storage/oqgraph/mysql-test/oqgraph/regression_mdev5744.opt new file mode 100644 index 00000000000..a4548161f9b --- /dev/null +++ b/storage/oqgraph/mysql-test/oqgraph/regression_mdev5744.opt @@ -0,0 +1 @@ +--query_cache_type=ON diff --git a/storage/oqgraph/mysql-test/oqgraph/regression_mdev5744.result 
b/storage/oqgraph/mysql-test/oqgraph/regression_mdev5744.result new file mode 100644 index 00000000000..efe520e16a5 --- /dev/null +++ b/storage/oqgraph/mysql-test/oqgraph/regression_mdev5744.result @@ -0,0 +1,41 @@ +DROP TABLE IF EXISTS graph_base; +DROP TABLE IF EXISTS graph; +CREATE TABLE graph_base ( +from_id INT UNSIGNED NOT NULL, +to_id INT UNSIGNED NOT NULL, +PRIMARY KEY (from_id,to_id), +INDEX (to_id) +) ENGINE=MyISAM; +CREATE TABLE graph ( +latch VARCHAR(32) NULL, +origid BIGINT UNSIGNED NULL, +destid BIGINT UNSIGNED NULL, +weight DOUBLE NULL, +seq BIGINT UNSIGNED NULL, +linkid BIGINT UNSIGNED NULL, +KEY (latch, origid, destid) USING HASH, +KEY (latch, destid, origid) USING HASH +) ENGINE=OQGRAPH DATA_TABLE='graph_base' ORIGID='from_id', DESTID='to_id'; +INSERT INTO graph_base(from_id, to_id) VALUES (1,2), (2,1); +SET @query_cache_size.saved = @@query_cache_size; +SET GLOBAL query_cache_size = 1024*1024; +SELECT * FROM graph; +latch origid destid weight seq linkid +NULL 1 2 1 NULL NULL +NULL 2 1 1 NULL NULL +UPDATE graph_base SET to_id = 20 WHERE from_id = 1; +SELECT * FROM graph; +latch origid destid weight seq linkid +NULL 1 20 1 NULL NULL +NULL 2 1 1 NULL NULL +SELECT SQL_NO_CACHE * FROM graph; +latch origid destid weight seq linkid +NULL 1 20 1 NULL NULL +NULL 2 1 1 NULL NULL +SET GLOBAL query_cache_size = 0; +SELECT SQL_NO_CACHE * FROM graph; +latch origid destid weight seq linkid +NULL 1 20 1 NULL NULL +NULL 2 1 1 NULL NULL +DROP TABLE graph_base, graph; +SET GLOBAL query_cache_size = @query_cache_size.saved; diff --git a/storage/oqgraph/mysql-test/oqgraph/regression_mdev5744.test b/storage/oqgraph/mysql-test/oqgraph/regression_mdev5744.test new file mode 100644 index 00000000000..9ba30ee2f76 --- /dev/null +++ b/storage/oqgraph/mysql-test/oqgraph/regression_mdev5744.test @@ -0,0 +1,48 @@ +# Regression test for https://mariadb.atlassian.net/browse/MDEV-5744 +#--reproduce bug where changes to backing table data are not reflected +# in a graph table due 
to query caching + +--disable_warnings +DROP TABLE IF EXISTS graph_base; +DROP TABLE IF EXISTS graph; +--enable_warnings + +# Create the backing store +CREATE TABLE graph_base ( + from_id INT UNSIGNED NOT NULL, + to_id INT UNSIGNED NOT NULL, + PRIMARY KEY (from_id,to_id), + INDEX (to_id) + ) ENGINE=MyISAM; + + +CREATE TABLE graph ( + latch VARCHAR(32) NULL, + origid BIGINT UNSIGNED NULL, + destid BIGINT UNSIGNED NULL, + weight DOUBLE NULL, + seq BIGINT UNSIGNED NULL, + linkid BIGINT UNSIGNED NULL, + KEY (latch, origid, destid) USING HASH, + KEY (latch, destid, origid) USING HASH + ) ENGINE=OQGRAPH DATA_TABLE='graph_base' ORIGID='from_id', DESTID='to_id'; + + +INSERT INTO graph_base(from_id, to_id) VALUES (1,2), (2,1); + +SET @query_cache_size.saved = @@query_cache_size; +SET GLOBAL query_cache_size = 1024*1024; + +SELECT * FROM graph; +UPDATE graph_base SET to_id = 20 WHERE from_id = 1; + +SELECT * FROM graph; +SELECT SQL_NO_CACHE * FROM graph; + +SET GLOBAL query_cache_size = 0; + +SELECT SQL_NO_CACHE * FROM graph; + +DROP TABLE graph_base, graph; +SET GLOBAL query_cache_size = @query_cache_size.saved; + diff --git a/storage/perfschema/table_events_waits.cc b/storage/perfschema/table_events_waits.cc index 8fb7ca91c44..35b90816713 100644 --- a/storage/perfschema/table_events_waits.cc +++ b/storage/perfschema/table_events_waits.cc @@ -315,11 +315,15 @@ int table_events_waits_common::make_socket_object_columns(volatile PFS_events_wa uint port; char port_str[128]; char ip_str[INET6_ADDRSTRLEN+1]; - uint ip_len= 0; + /* + "ip_length" was "ip_len" originally. + but it conflicted with some macro on AIX. Renamed. 
+ */ + uint ip_length= 0; port_str[0]= ':'; /* Get the IP address and port number */ - ip_len= pfs_get_socket_address(ip_str, sizeof(ip_str), &port, + ip_length= pfs_get_socket_address(ip_str, sizeof(ip_str), &port, &safe_socket->m_sock_addr, safe_socket->m_addr_len); @@ -327,15 +331,15 @@ int table_events_waits_common::make_socket_object_columns(volatile PFS_events_wa int port_len= int10_to_str(port, (port_str+1), 10) - port_str + 1; /* OBJECT NAME */ - m_row.m_object_name_length= ip_len + port_len; + m_row.m_object_name_length= ip_length + port_len; if (unlikely((m_row.m_object_name_length == 0) || (m_row.m_object_name_length > sizeof(m_row.m_object_name)))) return 1; char *name= m_row.m_object_name; - memcpy(name, ip_str, ip_len); - memcpy(name + ip_len, port_str, port_len); + memcpy(name, ip_str, ip_length); + memcpy(name + ip_length, port_str, port_len); } else { diff --git a/storage/xtradb/CMakeLists.txt b/storage/xtradb/CMakeLists.txt index 0c63afd744e..0aa177c6657 100644 --- a/storage/xtradb/CMakeLists.txt +++ b/storage/xtradb/CMakeLists.txt @@ -237,7 +237,7 @@ ENDIF() IF(MSVC) ADD_DEFINITIONS(-DHAVE_WINDOWS_ATOMICS) - #SET(XTRADB_OK 1) + SET(XTRADB_OK 1) # Avoid "unreferenced label" warning in generated file GET_FILENAME_COMPONENT(_SRC_DIR ${CMAKE_CURRENT_LIST_FILE} PATH) diff --git a/storage/xtradb/include/sync0sync.h b/storage/xtradb/include/sync0sync.h index 03914741f96..19cfcddd1f5 100644 --- a/storage/xtradb/include/sync0sync.h +++ b/storage/xtradb/include/sync0sync.h @@ -199,10 +199,10 @@ necessary only if the memory block containing it is freed. 
*/ pfs_mutex_enter_nowait_func((M), __FILE__, __LINE__) # define mutex_enter_first(M) \ - pfs_mutex_enter_func((M), __FILE__, __LINE__, HIGH_PRIO) + pfs_mutex_enter_func((M), __FILE__, __LINE__, IB_HIGH_PRIO) # define mutex_enter_last(M) \ - pfs_mutex_enter_func((M), __FILE__, __LINE__, LOW_PRIO) + pfs_mutex_enter_func((M), __FILE__, __LINE__, IB_LOW_PRIO) # define mutex_exit(M) pfs_mutex_exit_func(M) @@ -231,10 +231,10 @@ original non-instrumented functions */ mutex_enter_nowait_func((M), __FILE__, __LINE__) # define mutex_enter_first(M) \ - mutex_enter_func((M), __FILE__, __LINE__, HIGH_PRIO) + mutex_enter_func((M), __FILE__, __LINE__, IB_HIGH_PRIO) # define mutex_enter_last(M) \ - mutex_enter_func((M), __FILE__, __LINE__, LOW_PRIO) + mutex_enter_func((M), __FILE__, __LINE__, IB_LOW_PRIO) # define mutex_exit(M) mutex_exit_func(M) @@ -326,8 +326,8 @@ directly. Locks a priority mutex for the current thread. If the mutex is reserved the function spins a preset time (controlled by SYNC_SPIN_ROUNDS) waiting for the mutex before suspending the thread. If the thread is suspended, the priority argument value determines the relative order for its wake up. Any -HIGH_PRIO waiters will be woken up before any LOW_PRIO waiters. In case of -DEFAULT_PRIO, the relative priority will be set according to +IB_HIGH_PRIO waiters will be woken up before any IB_LOW_PRIO waiters. In case of +IB_DEFAULT_PRIO, the relative priority will be set according to srv_current_thread_priority. 
*/ UNIV_INLINE void @@ -337,7 +337,7 @@ mutex_enter_func( const char* file_name, /*!< in: file name where locked */ ulint line, /*!< in: line where locked */ - enum ib_sync_priority priority = DEFAULT_PRIO); + enum ib_sync_priority priority = IB_DEFAULT_PRIO); /*!<in: mutex acquisition priority */ /********************************************************************//** @@ -454,7 +454,7 @@ pfs_mutex_enter_func( const char* file_name, /*!< in: file name where locked */ ulint line, /*!< in: line where locked */ - enum ib_sync_priority priority = DEFAULT_PRIO); + enum ib_sync_priority priority = IB_DEFAULT_PRIO); /*!<in: mutex acquisition priority */ /********************************************************************//** diff --git a/storage/xtradb/include/sync0sync.ic b/storage/xtradb/include/sync0sync.ic index 4a1707654cb..d6a95156ff4 100644 --- a/storage/xtradb/include/sync0sync.ic +++ b/storage/xtradb/include/sync0sync.ic @@ -277,8 +277,8 @@ directly. Locks a priority mutex for the current thread. If the mutex is reserved the function spins a preset time (controlled by SYNC_SPIN_ROUNDS) waiting for the mutex before suspending the thread. If the thread is suspended, the priority argument value determines the relative order for its wake up. Any -HIGH_PRIO waiters will be woken up before any LOW_PRIO waiters. In case of -DEFAULT_PRIO, the relative priority will be set according to +IB_HIGH_PRIO waiters will be woken up before any IB_LOW_PRIO waiters. In case +of IB_DEFAULT_PRIO, the relative priority will be set according to srv_current_thread_priority. */ UNIV_INLINE void @@ -308,10 +308,10 @@ mutex_enter_func( return; /* Succeeded! 
*/ } - if (UNIV_LIKELY(priority == DEFAULT_PRIO)) { + if (UNIV_LIKELY(priority == IB_DEFAULT_PRIO)) { high_priority = srv_current_thread_priority; } else { - high_priority = (priority == HIGH_PRIO); + high_priority = (priority == IB_HIGH_PRIO); } mutex_spin_wait(mutex, high_priority, file_name, line); } diff --git a/storage/xtradb/include/sync0types.h b/storage/xtradb/include/sync0types.h index 67f613ab8ae..04baaa0339d 100644 --- a/storage/xtradb/include/sync0types.h +++ b/storage/xtradb/include/sync0types.h @@ -36,9 +36,9 @@ struct ib_prio_mutex_t; /** Priority mutex and rwlatch acquisition priorities */ enum ib_sync_priority { - DEFAULT_PRIO, - LOW_PRIO, - HIGH_PRIO + IB_DEFAULT_PRIO, + IB_LOW_PRIO, + IB_HIGH_PRIO }; #endif |