-rw-r--r--  include/my_global.h | 32
-rw-r--r--  mysql-test/r/mysqld--help.result | 9
-rw-r--r--  mysql-test/suite/csv/csv.result | 8
-rw-r--r--  mysql-test/suite/csv/csv.test | 9
-rw-r--r--  mysql-test/suite/perfschema/r/dml_setup_instruments.result | 4
-rw-r--r--  mysql-test/suite/rpl/r/rpl_parallel.result | 88
-rw-r--r--  mysql-test/suite/rpl/t/rpl_parallel.test | 108
-rw-r--r--  mysql-test/suite/sys_vars/r/slave_domain_parallel_threads_basic.result | 13
-rw-r--r--  mysql-test/suite/sys_vars/t/slave_domain_parallel_threads_basic.test | 14
-rw-r--r--  mysys/mf_iocache.c | 4
-rw-r--r--  mysys/my_rename.c | 15
-rw-r--r--  plugin/handler_socket/libhsclient/auto_file.hpp | 5
-rw-r--r--  sql-common/mysql_async.c | 6
-rw-r--r--  sql/field.h | 4
-rw-r--r--  sql/field_conv.cc | 111
-rw-r--r--  sql/item.cc | 46
-rw-r--r--  sql/item.h | 23
-rw-r--r--  sql/item_create.cc | 7
-rw-r--r--  sql/item_func.h | 4
-rw-r--r--  sql/log.cc | 29
-rw-r--r--  sql/mysqld.cc | 13
-rw-r--r--  sql/mysqld.h | 6
-rw-r--r--  sql/rpl_parallel.cc | 952
-rw-r--r--  sql/rpl_parallel.h | 146
-rw-r--r--  sql/rpl_rli.cc | 63
-rw-r--r--  sql/rpl_rli.h | 20
-rw-r--r--  sql/scheduler.h | 3
-rw-r--r--  sql/slave.cc | 6
-rw-r--r--  sql/sql_class.cc | 52
-rw-r--r--  sql/sql_class.h | 29
-rw-r--r--  sql/sql_select.cc | 11
-rw-r--r--  sql/sys_vars.cc | 43
-rw-r--r--  sql/table.h | 10
-rw-r--r--  sql/threadpool_unix.cc | 5
-rw-r--r--  storage/connect/os.h | 6
-rw-r--r--  storage/oqgraph/ha_oqgraph.h | 14
-rw-r--r--  storage/oqgraph/mysql-test/oqgraph/regression_mdev5744.opt | 1
-rw-r--r--  storage/oqgraph/mysql-test/oqgraph/regression_mdev5744.result | 41
-rw-r--r--  storage/oqgraph/mysql-test/oqgraph/regression_mdev5744.test | 48
-rw-r--r--  storage/perfschema/table_events_waits.cc | 14
-rw-r--r--  storage/xtradb/CMakeLists.txt | 2
-rw-r--r--  storage/xtradb/include/sync0sync.h | 16
-rw-r--r--  storage/xtradb/include/sync0sync.ic | 8
-rw-r--r--  storage/xtradb/include/sync0types.h | 6
44 files changed, 1521 insertions(+), 533 deletions(-)
diff --git a/include/my_global.h b/include/my_global.h
index c3d9ac21861..e9a472e686e 100644
--- a/include/my_global.h
+++ b/include/my_global.h
@@ -144,6 +144,7 @@
/* Workaround for _LARGE_FILES and _LARGE_FILE_API incompatibility on AIX */
#if defined(_AIX) && defined(_LARGE_FILE_API)
#undef _LARGE_FILE_API
+#undef __GNUG__
#endif
/*
@@ -264,6 +265,16 @@
#endif
#endif
+
+#ifdef _AIX
+/*
+ AIX includes inttypes.h from sys/types.h
+ Explicitly request format macros before the first inclusion of inttypes.h
+*/
+#define __STDC_FORMAT_MACROS
+#endif
+
+
#if !defined(__WIN__)
#ifndef _POSIX_PTHREAD_SEMANTICS
#define _POSIX_PTHREAD_SEMANTICS /* We want posix threads */
@@ -316,6 +327,13 @@ C_MODE_END
#define _LONG_LONG 1 /* For AIX string library */
#endif
+/* Workaround for _LARGE_FILES and _LARGE_FILE_API incompatibility on AIX */
+#if defined(_AIX) && defined(_LARGE_FILE_API)
+#undef _LARGE_FILE_API
+#undef __GNUG__
+#endif
+
+
#ifndef stdin
#include <stdio.h>
#endif
@@ -341,6 +359,14 @@ C_MODE_END
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
#endif
+
+/* Workaround for _LARGE_FILES and _LARGE_FILE_API incompatibility on AIX */
+#if defined(_AIX) && defined(_LARGE_FILE_API)
+#undef _LARGE_FILE_API
+#undef __GNUG__
+#endif
+
+
#ifdef HAVE_FCNTL_H
#include <fcntl.h>
#endif
@@ -1214,4 +1240,10 @@ static inline double rint(double x)
#define HAVE_EXTERNAL_CLIENT
#endif /* EMBEDDED_LIBRARY */
+/* Workaround for _LARGE_FILES and _LARGE_FILE_API incompatibility on AIX */
+#if defined(_AIX) && defined(_LARGE_FILE_API)
+#undef _LARGE_FILE_API
+#undef __GNUG__
+#endif
+
#endif /* my_global_h */
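
[Editor's note, not part of the patch] The __STDC_FORMAT_MACROS hunk above exists because AIX's <sys/types.h> pulls in <inttypes.h> very early; if that first inclusion happens without the macro, older C++ libraries never define PRIu64 and friends. A minimal standalone sketch of the pattern, for illustration only:

    /* Illustrative only: request the C99 format macros before any header can
       include <inttypes.h> for the first time (older C++ standard libraries
       only define PRIu64 etc. when __STDC_FORMAT_MACROS is set). */
    #define __STDC_FORMAT_MACROS
    #include <inttypes.h>
    #include <cstdio>

    int main()
    {
      uint64_t binlog_pos= 1234567890123ULL;   /* arbitrary example value */
      std::printf("pos=%" PRIu64 "\n", binlog_pos);
      return 0;
    }
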
diff --git a/mysql-test/r/mysqld--help.result b/mysql-test/r/mysqld--help.result
index 9b2b6e12a14..19f982ef554 100644
--- a/mysql-test/r/mysqld--help.result
+++ b/mysql-test/r/mysqld--help.result
@@ -867,6 +867,14 @@ The following options may be given as the first argument:
operations that are idempotent. This means that CREATE
TABLE is treated CREATE TABLE OR REPLACE and DROP TABLE
is threated as DROP TABLE IF EXISTS.
+ --slave-domain-parallel-threads=#
+ Maximum number of parallel threads to use on slave for
+ events in a single replication domain. When using
+ multiple domains, this can be used to limit a single
+ domain from grabbing all threads and thus stalling other
+ domains. The default of 0 means to allow a domain to grab
+ as many threads as it wants, up to the value of
+ slave_parallel_threads.
--slave-exec-mode=name
Modes for how replication events should be executed.
Legal values are STRICT (default) and IDEMPOTENT. In
@@ -1275,6 +1283,7 @@ skip-show-database FALSE
skip-slave-start FALSE
slave-compressed-protocol FALSE
slave-ddl-exec-mode IDEMPOTENT
+slave-domain-parallel-threads 0
slave-exec-mode STRICT
slave-max-allowed-packet 1073741824
slave-net-timeout 3600
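
[Editor's note, not part of the patch] The help text above defines the semantics: slave_parallel_threads sizes the whole worker pool, and a non-zero slave_domain_parallel_threads caps how much of that pool one replication domain may reserve. A hedged sketch of that cap; the helper function is hypothetical, only the two option names come from the patch (opt_slave_parallel_threads, opt_slave_domain_parallel_threads in sql/mysqld.cc below):

    #include <algorithm>

    /* Hypothetical helper, for illustration only (not in the patch). */
    static unsigned long
    max_threads_for_domain(unsigned long pool_size,         /* slave_parallel_threads */
                           unsigned long per_domain_limit)  /* slave_domain_parallel_threads */
    {
      if (per_domain_limit == 0)           /* 0 = no per-domain cap */
        return pool_size;                  /* a domain may grab the whole pool */
      return std::min(per_domain_limit, pool_size);
    }
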
diff --git a/mysql-test/suite/csv/csv.result b/mysql-test/suite/csv/csv.result
index fc6aab530c7..8d497f52b31 100644
--- a/mysql-test/suite/csv/csv.result
+++ b/mysql-test/suite/csv/csv.result
@@ -5485,3 +5485,11 @@ SELECT * FROM t1;
ERROR HY000: Table 't1' is marked as crashed and should be repaired
DROP TABLE t1;
End of 5.1 tests
+#
+# MDEV-5612 - my_rename() deletes files when it shouldn't
+#
+CREATE TABLE t1(a INT NOT NULL) ENGINE=CSV;
+RENAME TABLE t1 TO t2;
+SELECT * FROM t2;
+a
+DROP TABLE t2;
diff --git a/mysql-test/suite/csv/csv.test b/mysql-test/suite/csv/csv.test
index 768a21912a2..90617d06599 100644
--- a/mysql-test/suite/csv/csv.test
+++ b/mysql-test/suite/csv/csv.test
@@ -1917,3 +1917,12 @@ SELECT * FROM t1;
DROP TABLE t1;
--echo End of 5.1 tests
+
+--echo #
+--echo # MDEV-5612 - my_rename() deletes files when it shouldn't
+--echo #
+CREATE TABLE t1(a INT NOT NULL) ENGINE=CSV;
+move_file $MYSQLD_DATADIR/test/t1.CSV $MYSQLD_DATADIR/test/t2.CSV;
+RENAME TABLE t1 TO t2;
+SELECT * FROM t2;
+DROP TABLE t2;
diff --git a/mysql-test/suite/perfschema/r/dml_setup_instruments.result b/mysql-test/suite/perfschema/r/dml_setup_instruments.result
index ec4bfb5b338..c2a9be78385 100644
--- a/mysql-test/suite/perfschema/r/dml_setup_instruments.result
+++ b/mysql-test/suite/perfschema/r/dml_setup_instruments.result
@@ -37,15 +37,15 @@ where name like 'Wait/Synch/Cond/sql/%'
order by name limit 10;
NAME ENABLED TIMED
wait/synch/cond/sql/COND_flush_thread_cache YES YES
+wait/synch/cond/sql/COND_group_commit_orderer YES YES
wait/synch/cond/sql/COND_manager YES YES
wait/synch/cond/sql/COND_parallel_entry YES YES
wait/synch/cond/sql/COND_prepare_ordered YES YES
wait/synch/cond/sql/COND_queue_state YES YES
wait/synch/cond/sql/COND_rpl_thread YES YES
wait/synch/cond/sql/COND_rpl_thread_pool YES YES
+wait/synch/cond/sql/COND_rpl_thread_queue YES YES
wait/synch/cond/sql/COND_server_started YES YES
-wait/synch/cond/sql/COND_thread_cache YES YES
-wait/synch/cond/sql/COND_thread_count YES YES
select * from performance_schema.setup_instruments
where name='Wait';
select * from performance_schema.setup_instruments
diff --git a/mysql-test/suite/rpl/r/rpl_parallel.result b/mysql-test/suite/rpl/r/rpl_parallel.result
index 7dae4de075c..b04d9e7748e 100644
--- a/mysql-test/suite/rpl/r/rpl_parallel.result
+++ b/mysql-test/suite/rpl/r/rpl_parallel.result
@@ -115,6 +115,7 @@ SET GLOBAL slave_parallel_threads=10;
SET debug_sync='RESET';
include/start_slave.inc
*** Test that group-committed transactions on the master can replicate in parallel on the slave. ***
+SET debug_sync='RESET';
FLUSH LOGS;
CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
INSERT INTO t3 VALUES (1,1), (3,3), (5,5), (7,7);
@@ -141,6 +142,7 @@ INSERT INTO t3 VALUES (6, foo(16,
''));
SET debug_sync='now WAIT_FOR master_queued3';
SET debug_sync='now SIGNAL master_cont1';
+SET debug_sync='RESET';
SELECT * FROM t3 ORDER BY a;
a b
1 1
@@ -213,6 +215,9 @@ slave-bin.000003 # Query # # use `test`; INSERT INTO t3 VALUES (6, foo(16,
slave-bin.000003 # Xid # # COMMIT /* XID */
*** Test STOP SLAVE in parallel mode ***
include/stop_slave.inc
+SET debug_sync='RESET';
+SET GLOBAL slave_parallel_threads=0;
+SET GLOBAL slave_parallel_threads=10;
SET binlog_direct_non_transactional_updates=0;
SET sql_log_bin=0;
CALL mtr.add_suppression("Statement is unsafe because it accesses a non-transactional table after accessing a transactional table within the same transaction");
@@ -227,10 +232,15 @@ INSERT INTO t3 VALUES(21, 21);
INSERT INTO t3 VALUES(22, 22);
SET binlog_format=@old_format;
BEGIN;
-INSERT INTO t2 VALUES (21);
+INSERT INTO t2 VALUES (21);
START SLAVE;
+SET @old_dbug= @@GLOBAL.debug_dbug;
+SET GLOBAL debug_dbug="+d,rpl_parallel_wait_for_done_trigger";
STOP SLAVE;
+SET debug_sync='now WAIT_FOR wait_for_done_waiting';
ROLLBACK;
+SET GLOBAL debug_dbug=@old_dbug;
+SET debug_sync='RESET';
include/wait_for_slave_to_stop.inc
SELECT * FROM t1 WHERE a >= 20 ORDER BY a;
a
@@ -292,6 +302,7 @@ a b
32 32
33 33
34 34
+SET debug_sync='RESET';
SET sql_log_bin=0;
CALL mtr.add_suppression("Query execution was interrupted");
CALL mtr.add_suppression("Commit failed due to failure of an earlier commit on which this one depends");
@@ -308,6 +319,7 @@ STOP SLAVE IO_THREAD;
SELECT * FROM t3 WHERE a >= 30 ORDER BY a;
a b
31 31
+SET debug_sync='RESET';
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
SET sql_log_bin=0;
@@ -379,6 +391,7 @@ a b
42 42
43 43
44 44
+SET debug_sync='RESET';
SET debug_sync='now WAIT_FOR t2_query';
SET debug_sync='now SIGNAL t2_cont';
SET debug_sync='now WAIT_FOR t1_ready';
@@ -386,9 +399,7 @@ KILL THD_ID;
SET debug_sync='now WAIT_FOR t2_killed';
SET debug_sync='now SIGNAL t1_cont';
include/wait_for_slave_sql_error.inc [errno=1317,1964]
-SELECT * FROM t3 WHERE a >= 40 ORDER BY a;
-a b
-41 41
+SET debug_sync='RESET';
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
SET sql_log_bin=0;
@@ -463,6 +474,7 @@ a b
52 52
53 53
54 54
+SET debug_sync='RESET';
SET debug_sync='now WAIT_FOR t2_query';
SET debug_sync='now SIGNAL t2_cont';
SET debug_sync='now WAIT_FOR t1_ready';
@@ -473,6 +485,7 @@ include/wait_for_slave_sql_error.inc [errno=1317,1964]
SELECT * FROM t3 WHERE a >= 50 ORDER BY a;
a b
51 51
+SET debug_sync='RESET';
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
SET sql_log_bin=0;
@@ -514,14 +527,18 @@ include/start_slave.inc
include/stop_slave.inc
SET GLOBAL binlog_format=@old_format;
SET GLOBAL slave_parallel_threads=0;
-SET GLOBAL slave_parallel_threads=3;
+SET GLOBAL slave_parallel_threads=4;
include/start_slave.inc
*** 4. Test killing thread that is waiting to start transaction until previous transaction commits ***
SET binlog_format=statement;
SET gtid_domain_id=2;
+BEGIN;
+INSERT INTO t3 VALUES (70, foo(70,
+'rpl_parallel_start_waiting_for_prior SIGNAL t4_waiting', ''));
INSERT INTO t3 VALUES (60, foo(60,
'ha_write_row_end SIGNAL d2_query WAIT_FOR d2_cont2',
'rpl_parallel_end_of_group SIGNAL d2_done WAIT_FOR d2_cont'));
+COMMIT;
SET gtid_domain_id=0;
SET debug_sync='now WAIT_FOR d2_query';
SET gtid_domain_id=1;
@@ -540,15 +557,27 @@ INSERT INTO t3 VALUES (63, foo(63,
'ha_write_row_end SIGNAL d0_query WAIT_FOR d0_cont2',
'rpl_parallel_end_of_group SIGNAL d0_done WAIT_FOR d0_cont'));
SET debug_sync='now WAIT_FOR d0_query';
+SET gtid_domain_id=3;
+BEGIN;
+INSERT INTO t3 VALUES (68, foo(68,
+'rpl_parallel_start_waiting_for_prior SIGNAL t2_waiting', ''));
+INSERT INTO t3 VALUES (69, foo(69,
+'ha_write_row_end SIGNAL d3_query WAIT_FOR d3_cont2',
+'rpl_parallel_end_of_group SIGNAL d3_done WAIT_FOR d3_cont'));
+COMMIT;
+SET gtid_domain_id=0;
+SET debug_sync='now WAIT_FOR d3_query';
SET debug_sync='now SIGNAL d2_cont2';
SET debug_sync='now WAIT_FOR d2_done';
SET debug_sync='now SIGNAL d1_cont2';
SET debug_sync='now WAIT_FOR d1_done';
SET debug_sync='now SIGNAL d0_cont2';
SET debug_sync='now WAIT_FOR d0_done';
+SET debug_sync='now SIGNAL d3_cont2';
+SET debug_sync='now WAIT_FOR d3_done';
SET binlog_format=statement;
INSERT INTO t3 VALUES (64, foo(64,
-'commit_before_prepare_ordered SIGNAL t1_waiting WAIT_FOR t1_cont', ''));
+'rpl_parallel_before_mark_start_commit SIGNAL t1_waiting WAIT_FOR t1_cont', ''));
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2 WAIT_FOR master_cont2';
INSERT INTO t3 VALUES (65, foo(65, '', ''));
SET debug_sync='now WAIT_FOR master_queued2';
@@ -569,23 +598,34 @@ a b
65 65
66 66
67 67
+68 68
+69 69
+70 70
+SET debug_sync='RESET';
SET debug_sync='now SIGNAL d0_cont';
SET debug_sync='now WAIT_FOR t1_waiting';
+SET debug_sync='now SIGNAL d3_cont';
+SET debug_sync='now WAIT_FOR t2_waiting';
SET debug_sync='now SIGNAL d1_cont';
SET debug_sync='now WAIT_FOR t3_waiting';
SET debug_sync='now SIGNAL d2_cont';
+SET debug_sync='now WAIT_FOR t4_waiting';
KILL THD_ID;
SET debug_sync='now WAIT_FOR t3_killed';
SET debug_sync='now SIGNAL t1_cont';
include/wait_for_slave_sql_error.inc [errno=1317,1927,1964]
STOP SLAVE IO_THREAD;
-SELECT * FROM t3 WHERE a >= 60 ORDER BY a;
+SELECT * FROM t3 WHERE a >= 60 AND a != 65 ORDER BY a;
a b
60 60
61 61
62 62
63 63
64 64
+68 68
+69 69
+70 70
+SET debug_sync='RESET';
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
SET sql_log_bin=0;
@@ -597,11 +637,11 @@ RETURN x;
END
||
SET sql_log_bin=1;
-INSERT INTO t3 VALUES (69,0);
+UPDATE t3 SET b=b+1 WHERE a=60;
include/start_slave.inc
SELECT * FROM t3 WHERE a >= 60 ORDER BY a;
a b
-60 60
+60 61
61 61
62 62
63 63
@@ -609,7 +649,9 @@ a b
65 65
66 66
67 67
-69 0
+68 68
+69 69
+70 70
SET sql_log_bin=0;
DROP FUNCTION foo;
CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
@@ -634,37 +676,31 @@ include/start_slave.inc
SET @old_max_queued= @@GLOBAL.slave_parallel_max_queued;
SET GLOBAL slave_parallel_max_queued=9000;
SET binlog_format=statement;
-INSERT INTO t3 VALUES (70, foo(0,
+INSERT INTO t3 VALUES (80, foo(0,
'ha_write_row_end SIGNAL query_waiting WAIT_FOR query_cont', ''));
SET debug_sync='now WAIT_FOR query_waiting';
SET @old_dbug= @@GLOBAL.debug_dbug;
SET GLOBAL debug_dbug="+d,rpl_parallel_wait_queue_max";
-INSERT INTO t3 VALUES (72, 0);
-SELECT * FROM t3 WHERE a >= 70 ORDER BY a;
+SELECT * FROM t3 WHERE a >= 80 ORDER BY a;
a b
-70 0
-71 10000
-72 0
+80 0
+81 10000
SET debug_sync='now WAIT_FOR wait_queue_ready';
KILL THD_ID;
SET debug_sync='now WAIT_FOR wait_queue_killed';
SET debug_sync='now SIGNAL query_cont';
include/wait_for_slave_sql_error.inc [errno=1317,1927,1964]
STOP SLAVE IO_THREAD;
-SELECT * FROM t3 WHERE a >= 70 ORDER BY a;
-a b
-70 0
-71 10000
SET GLOBAL debug_dbug=@old_dbug;
SET GLOBAL slave_parallel_max_queued= @old_max_queued;
-INSERT INTO t3 VALUES (73,0);
+INSERT INTO t3 VALUES (82,0);
+SET debug_sync='RESET';
include/start_slave.inc
-SELECT * FROM t3 WHERE a >= 70 ORDER BY a;
+SELECT * FROM t3 WHERE a >= 80 ORDER BY a;
a b
-70 0
-71 10000
-72 0
-73 0
+80 0
+81 10000
+82 0
include/stop_slave.inc
SET GLOBAL binlog_format=@old_format;
SET GLOBAL slave_parallel_threads=0;
diff --git a/mysql-test/suite/rpl/t/rpl_parallel.test b/mysql-test/suite/rpl/t/rpl_parallel.test
index 82cec0735fc..0a064e7a227 100644
--- a/mysql-test/suite/rpl/t/rpl_parallel.test
+++ b/mysql-test/suite/rpl/t/rpl_parallel.test
@@ -163,6 +163,7 @@ SET debug_sync='RESET';
--echo *** Test that group-committed transactions on the master can replicate in parallel on the slave. ***
--connection server_1
+SET debug_sync='RESET';
FLUSH LOGS;
--source include/wait_for_binlog_checkpoint.inc
CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
@@ -230,6 +231,7 @@ REAP;
REAP;
--connection con_temp5
REAP;
+SET debug_sync='RESET';
--connection server_1
SELECT * FROM t3 ORDER BY a;
@@ -270,6 +272,10 @@ SELECT * FROM t3 ORDER BY a;
--echo *** Test STOP SLAVE in parallel mode ***
--connection server_2
--source include/stop_slave.inc
+# Respawn all worker threads to clear any left-over debug_sync or other stuff.
+SET debug_sync='RESET';
+SET GLOBAL slave_parallel_threads=0;
+SET GLOBAL slave_parallel_threads=10;
--connection server_1
# Set up a couple of transactions. The first will be blocked halfway
@@ -288,7 +294,7 @@ BEGIN;
INSERT INTO t2 VALUES (20);
--disable_warnings
INSERT INTO t1 VALUES (20);
---disable_warnings
+--enable_warnings
INSERT INTO t2 VALUES (21);
INSERT INTO t3 VALUES (20, 20);
COMMIT;
@@ -300,7 +306,7 @@ SET binlog_format=@old_format;
# Start a connection that will block the replicated transaction halfway.
--connection con_temp1
BEGIN;
-INSERT INTO t2 VALUES (21);
+INSERT INTO t2 VALUES (21);
--connection server_2
START SLAVE;
@@ -312,13 +318,20 @@ START SLAVE;
--connection con_temp2
# Initiate slave stop. It will have to wait for the current event group
# to complete.
+# The dbug injection causes debug_sync to signal 'wait_for_done_waiting'
+# when the SQL driver thread is ready.
+SET @old_dbug= @@GLOBAL.debug_dbug;
+SET GLOBAL debug_dbug="+d,rpl_parallel_wait_for_done_trigger";
send STOP SLAVE;
--connection con_temp1
+SET debug_sync='now WAIT_FOR wait_for_done_waiting';
ROLLBACK;
--connection con_temp2
reap;
+SET GLOBAL debug_dbug=@old_dbug;
+SET debug_sync='RESET';
--connection server_2
--source include/wait_for_slave_to_stop.inc
@@ -397,6 +410,7 @@ REAP;
--connection server_1
SELECT * FROM t3 WHERE a >= 30 ORDER BY a;
+SET debug_sync='RESET';
--connection server_2
SET sql_log_bin=0;
@@ -431,6 +445,7 @@ SELECT * FROM t3 WHERE a >= 30 ORDER BY a;
# Now we have to disable the debug_sync statements, so they do not trigger
# when the events are retried.
+SET debug_sync='RESET';
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
SET sql_log_bin=0;
@@ -535,6 +550,7 @@ REAP;
--connection server_1
SELECT * FROM t3 WHERE a >= 40 ORDER BY a;
+SET debug_sync='RESET';
--connection server_2
# Wait until T2 is inside executing its insert of 42, then find it in SHOW
@@ -559,10 +575,10 @@ SET debug_sync='now SIGNAL t1_cont';
--let $slave_sql_errno= 1317,1964
--source include/wait_for_slave_sql_error.inc
-SELECT * FROM t3 WHERE a >= 40 ORDER BY a;
# Now we have to disable the debug_sync statements, so they do not trigger
# when the events are retried.
+SET debug_sync='RESET';
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
SET sql_log_bin=0;
@@ -673,6 +689,7 @@ REAP;
--connection server_1
SELECT * FROM t3 WHERE a >= 50 ORDER BY a;
+SET debug_sync='RESET';
--connection server_2
# Wait until T2 is inside executing its insert of 52, then find it in SHOW
@@ -701,6 +718,7 @@ SELECT * FROM t3 WHERE a >= 50 ORDER BY a;
# Now we have to disable the debug_sync statements, so they do not trigger
# when the events are retried.
+SET debug_sync='RESET';
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
SET sql_log_bin=0;
@@ -752,7 +770,7 @@ CHANGE MASTER TO master_use_gtid=slave_pos;
--source include/stop_slave.inc
SET GLOBAL binlog_format=@old_format;
SET GLOBAL slave_parallel_threads=0;
-SET GLOBAL slave_parallel_threads=3;
+SET GLOBAL slave_parallel_threads=4;
--source include/start_slave.inc
@@ -762,24 +780,29 @@ SET GLOBAL slave_parallel_threads=3;
# can run in parallel with each other (same group commit and commit id),
# but not in parallel with T1.
#
-# We use three worker threads. T1 and T2 will be queued on the first, T3 on
-# the second, and T4 on the third. We will delay T1 commit, T3 will wait for
-# T1 to commit before it can start. We will kill T3 during this wait, and
+# We use four worker threads, each Ti will be queued on each their own
+# worker thread. We will delay T1 commit, T3 will wait for T1 to begin
+# commit before it can start. We will kill T3 during this wait, and
# check that everything works correctly.
#
# It is rather tricky to get the correct thread id of the worker to kill.
-# We start by injecting three dummy transactions in a debug_sync-controlled
+# We start by injecting four dummy transactions in a debug_sync-controlled
# manner to be able to get known thread ids for the workers in a pool with
-# just 3 worker threads. Then we let in each of the real test transactions
+# just 4 worker threads. Then we let in each of the real test transactions
# T1-T4 one at a time in a way which allows us to know which transaction
# ends up with which thread id.
--connection server_1
SET binlog_format=statement;
SET gtid_domain_id=2;
+BEGIN;
+# This debug_sync will linger on and be used to control T4 later.
+INSERT INTO t3 VALUES (70, foo(70,
+ 'rpl_parallel_start_waiting_for_prior SIGNAL t4_waiting', ''));
INSERT INTO t3 VALUES (60, foo(60,
'ha_write_row_end SIGNAL d2_query WAIT_FOR d2_cont2',
'rpl_parallel_end_of_group SIGNAL d2_done WAIT_FOR d2_cont'));
+COMMIT;
SET gtid_domain_id=0;
--connection server_2
@@ -813,12 +836,30 @@ INSERT INTO t3 VALUES (63, foo(63,
SET debug_sync='now WAIT_FOR d0_query';
--let $d0_thd_id= `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE INFO LIKE '%foo(63%' AND INFO NOT LIKE '%LIKE%'`
+--connection server_1
+SET gtid_domain_id=3;
+BEGIN;
+# These debug_sync's will linger on and be used to control T2 later.
+INSERT INTO t3 VALUES (68, foo(68,
+ 'rpl_parallel_start_waiting_for_prior SIGNAL t2_waiting', ''));
+INSERT INTO t3 VALUES (69, foo(69,
+ 'ha_write_row_end SIGNAL d3_query WAIT_FOR d3_cont2',
+ 'rpl_parallel_end_of_group SIGNAL d3_done WAIT_FOR d3_cont'));
+COMMIT;
+SET gtid_domain_id=0;
+
+--connection server_2
+SET debug_sync='now WAIT_FOR d3_query';
+--let $d3_thd_id= `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE INFO LIKE '%foo(69%' AND INFO NOT LIKE '%LIKE%'`
+
SET debug_sync='now SIGNAL d2_cont2';
SET debug_sync='now WAIT_FOR d2_done';
SET debug_sync='now SIGNAL d1_cont2';
SET debug_sync='now WAIT_FOR d1_done';
SET debug_sync='now SIGNAL d0_cont2';
SET debug_sync='now WAIT_FOR d0_done';
+SET debug_sync='now SIGNAL d3_cont2';
+SET debug_sync='now WAIT_FOR d3_done';
# Now prepare the real transactions T1, T2, T3, T4 on the master.
@@ -826,7 +867,7 @@ SET debug_sync='now WAIT_FOR d0_done';
# Create transaction T1.
SET binlog_format=statement;
INSERT INTO t3 VALUES (64, foo(64,
- 'commit_before_prepare_ordered SIGNAL t1_waiting WAIT_FOR t1_cont', ''));
+ 'rpl_parallel_before_mark_start_commit SIGNAL t1_waiting WAIT_FOR t1_cont', ''));
# Create transaction T2, as a group commit leader on the master.
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2 WAIT_FOR master_cont2';
@@ -861,6 +902,7 @@ REAP;
--connection server_1
SELECT * FROM t3 WHERE a >= 60 ORDER BY a;
+SET debug_sync='RESET';
--connection server_2
# Now we have the four transactions pending for replication on the slave.
@@ -872,15 +914,20 @@ SELECT * FROM t3 WHERE a >= 60 ORDER BY a;
SET debug_sync='now SIGNAL d0_cont';
SET debug_sync='now WAIT_FOR t1_waiting';
-# T2 will be queued on the same worker D0 as T1.
+# Make the worker D3 free, and wait for T2 to be queued in it.
+SET debug_sync='now SIGNAL d3_cont';
+SET debug_sync='now WAIT_FOR t2_waiting';
+
# Now release worker D1, and wait for T3 to be queued in it.
# T3 will wait for T1 to commit before it can start.
SET debug_sync='now SIGNAL d1_cont';
SET debug_sync='now WAIT_FOR t3_waiting';
-# Release worker D2. T4 may or may not have time to be queued on it, but
-# it will not be able to complete due to T3 being killed.
+# Release worker D2. Wait for T4 to be queued, so we are sure it has
+# received the debug_sync signal (else we might overwrite it with the
+# next debug_sync).
SET debug_sync='now SIGNAL d2_cont';
+SET debug_sync='now WAIT_FOR t4_waiting';
# Now we kill the waiting transaction T3 in worker D1.
--replace_result $d1_thd_id THD_ID
@@ -895,10 +942,15 @@ SET debug_sync='now SIGNAL t1_cont';
--let $slave_sql_errno= 1317,1927,1964
--source include/wait_for_slave_sql_error.inc
STOP SLAVE IO_THREAD;
-SELECT * FROM t3 WHERE a >= 60 ORDER BY a;
+# Since T2, T3, and T4 run in parallel, we can not be sure if T2 will have time
+# to commit or not before the stop. However, T1 should commit, and T3/T4 may
+# not have committed. (After slave restart we check that all become committed
+# eventually).
+SELECT * FROM t3 WHERE a >= 60 AND a != 65 ORDER BY a;
# Now we have to disable the debug_sync statements, so they do not trigger
# when the events are retried.
+SET debug_sync='RESET';
SET GLOBAL slave_parallel_threads=0;
SET GLOBAL slave_parallel_threads=10;
SET sql_log_bin=0;
@@ -914,7 +966,7 @@ CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
SET sql_log_bin=1;
--connection server_1
-INSERT INTO t3 VALUES (69,0);
+UPDATE t3 SET b=b+1 WHERE a=60;
--save_master_pos
--connection server_2
@@ -951,6 +1003,7 @@ SET GLOBAL slave_parallel_threads=10;
--echo *** 5. Test killing thread that is waiting for queue of max length to shorten ***
+# Find the thread id of the driver SQL thread that we want to kill.
--let $wait_condition= SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE STATE LIKE '%Slave has read all relay log%'
--source include/wait_condition.inc
--let $thd_id= `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE STATE LIKE '%Slave has read all relay log%'`
@@ -961,12 +1014,8 @@ SET GLOBAL slave_parallel_max_queued=9000;
--let bigstring= `SELECT REPEAT('x', 10000)`
SET binlog_format=statement;
# Create an event that will wait to be signalled.
-INSERT INTO t3 VALUES (70, foo(0,
+INSERT INTO t3 VALUES (80, foo(0,
'ha_write_row_end SIGNAL query_waiting WAIT_FOR query_cont', ''));
---disable_query_log
-# Create an event that will fill up the queue.
-eval INSERT INTO t3 VALUES (71, LENGTH('$bigstring'));
---enable_query_log
--connection server_2
SET debug_sync='now WAIT_FOR query_waiting';
@@ -977,11 +1026,14 @@ SET @old_dbug= @@GLOBAL.debug_dbug;
SET GLOBAL debug_dbug="+d,rpl_parallel_wait_queue_max";
--connection server_1
-# This event will have to wait for the queue to become shorter before it can
-# be queued. We will test that things work when we kill the SQL driver thread
-# during this wait.
-INSERT INTO t3 VALUES (72, 0);
-SELECT * FROM t3 WHERE a >= 70 ORDER BY a;
+--disable_query_log
+# Create an event that will fill up the queue.
+# The Xid event at the end of the event group will have to wait for the Query
+# event with the INSERT to drain so the queue becomes shorter. However that in
+# turn waits for the prior event group to continue.
+eval INSERT INTO t3 VALUES (81, LENGTH('$bigstring'));
+--enable_query_log
+SELECT * FROM t3 WHERE a >= 80 ORDER BY a;
--connection server_2
SET debug_sync='now WAIT_FOR wait_queue_ready';
@@ -995,19 +1047,19 @@ SET debug_sync='now SIGNAL query_cont';
--let $slave_sql_errno= 1317,1927,1964
--source include/wait_for_slave_sql_error.inc
STOP SLAVE IO_THREAD;
-SELECT * FROM t3 WHERE a >= 70 ORDER BY a;
SET GLOBAL debug_dbug=@old_dbug;
SET GLOBAL slave_parallel_max_queued= @old_max_queued;
--connection server_1
-INSERT INTO t3 VALUES (73,0);
+INSERT INTO t3 VALUES (82,0);
--save_master_pos
--connection server_2
+SET debug_sync='RESET';
--source include/start_slave.inc
--sync_with_master
-SELECT * FROM t3 WHERE a >= 70 ORDER BY a;
+SELECT * FROM t3 WHERE a >= 80 ORDER BY a;
--connection server_2
diff --git a/mysql-test/suite/sys_vars/r/slave_domain_parallel_threads_basic.result b/mysql-test/suite/sys_vars/r/slave_domain_parallel_threads_basic.result
new file mode 100644
index 00000000000..9e53d9bd891
--- /dev/null
+++ b/mysql-test/suite/sys_vars/r/slave_domain_parallel_threads_basic.result
@@ -0,0 +1,13 @@
+SET @save_slave_domain_parallel_threads= @@GLOBAL.slave_domain_parallel_threads;
+SELECT @@GLOBAL.slave_domain_parallel_threads as 'must be zero because of default';
+must be zero because of default
+0
+SELECT @@SESSION.slave_domain_parallel_threads as 'no session var';
+ERROR HY000: Variable 'slave_domain_parallel_threads' is a GLOBAL variable
+SET GLOBAL slave_domain_parallel_threads= 0;
+SET GLOBAL slave_domain_parallel_threads= DEFAULT;
+SET GLOBAL slave_domain_parallel_threads= 10;
+SELECT @@GLOBAL.slave_domain_parallel_threads;
+@@GLOBAL.slave_domain_parallel_threads
+10
+SET GLOBAL slave_domain_parallel_threads = @save_slave_domain_parallel_threads;
diff --git a/mysql-test/suite/sys_vars/t/slave_domain_parallel_threads_basic.test b/mysql-test/suite/sys_vars/t/slave_domain_parallel_threads_basic.test
new file mode 100644
index 00000000000..7be48fbd4c5
--- /dev/null
+++ b/mysql-test/suite/sys_vars/t/slave_domain_parallel_threads_basic.test
@@ -0,0 +1,14 @@
+--source include/not_embedded.inc
+
+SET @save_slave_domain_parallel_threads= @@GLOBAL.slave_domain_parallel_threads;
+
+SELECT @@GLOBAL.slave_domain_parallel_threads as 'must be zero because of default';
+--error ER_INCORRECT_GLOBAL_LOCAL_VAR
+SELECT @@SESSION.slave_domain_parallel_threads as 'no session var';
+
+SET GLOBAL slave_domain_parallel_threads= 0;
+SET GLOBAL slave_domain_parallel_threads= DEFAULT;
+SET GLOBAL slave_domain_parallel_threads= 10;
+SELECT @@GLOBAL.slave_domain_parallel_threads;
+
+SET GLOBAL slave_domain_parallel_threads = @save_slave_domain_parallel_threads;
diff --git a/mysys/mf_iocache.c b/mysys/mf_iocache.c
index 2008a6f8860..a3cbaff68b0 100644
--- a/mysys/mf_iocache.c
+++ b/mysys/mf_iocache.c
@@ -1281,10 +1281,6 @@ read_append_buffer:
size_t transfer_len;
DBUG_ASSERT(info->append_read_pos <= info->write_pos);
- /*
- TODO: figure out if the assert below is needed or correct.
- */
- DBUG_ASSERT(pos_in_file == info->end_of_file);
copy_len=MY_MIN(Count, len_in_buff);
memcpy(Buffer, info->append_read_pos, copy_len);
info->append_read_pos += copy_len;
diff --git a/mysys/my_rename.c b/mysys/my_rename.c
index 8a9e6eb3dfd..09e7eafa980 100644
--- a/mysys/my_rename.c
+++ b/mysys/my_rename.c
@@ -27,19 +27,18 @@ int my_rename(const char *from, const char *to, myf MyFlags)
DBUG_ENTER("my_rename");
DBUG_PRINT("my",("from %s to %s MyFlags %lu", from, to, MyFlags));
-#if defined(HAVE_RENAME)
#if defined(__WIN__)
- /*
- On windows we can't rename over an existing file:
- Remove any conflicting files:
- */
- (void) my_delete(to, MYF(0));
-#endif
+ if (!MoveFileEx(from, to, MOVEFILE_COPY_ALLOWED |
+ MOVEFILE_REPLACE_EXISTING))
+ {
+ my_osmaperr(GetLastError());
+#elif defined(HAVE_RENAME)
if (rename(from,to))
+ {
#else
if (link(from, to) || unlink(from))
-#endif
{
+#endif
my_errno=errno;
error = -1;
if (MyFlags & (MY_FAE+MY_WME))
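
[Editor's note, not part of the patch] The my_rename() rewrite above drops the old delete-then-rename sequence on Windows (the behaviour behind the MDEV-5612 csv test earlier in this patch) in favour of a single MoveFileEx() call that replaces the destination without removing it up front. A reduced sketch of the same control flow, with the errno/my_errno mapping left out:

    /* Illustrative sketch only; the real code maps GetLastError()/errno into
       my_errno and honours MyFlags. */
    #include <stdio.h>
    #ifdef _WIN32
    #include <windows.h>
    #endif

    static int rename_replacing(const char *from, const char *to)
    {
    #ifdef _WIN32
      /* Replace 'to' if it exists, but never delete it before the move. */
      if (!MoveFileEx(from, to,
                      MOVEFILE_COPY_ALLOWED | MOVEFILE_REPLACE_EXISTING))
        return -1;
    #else
      if (rename(from, to))        /* POSIX rename() already replaces 'to' */
        return -1;
    #endif
      return 0;
    }
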
diff --git a/plugin/handler_socket/libhsclient/auto_file.hpp b/plugin/handler_socket/libhsclient/auto_file.hpp
index 841351e54cd..03c357f4d4e 100644
--- a/plugin/handler_socket/libhsclient/auto_file.hpp
+++ b/plugin/handler_socket/libhsclient/auto_file.hpp
@@ -9,6 +9,11 @@
#ifndef DENA_AUTO_FILE_HPP
#define DENA_AUTO_FILE_HPP
+/* Workaround for _LARGE_FILES and _LARGE_FILE_API incompatibility on AIX */
+#if defined(_AIX) && defined(_LARGE_FILE_API)
+#undef _LARGE_FILE_API
+#endif
+
#include <unistd.h>
#include <sys/types.h>
#include <dirent.h>
diff --git a/sql-common/mysql_async.c b/sql-common/mysql_async.c
index 8f3a91e26fa..ef01f292180 100644
--- a/sql-common/mysql_async.c
+++ b/sql-common/mysql_async.c
@@ -121,6 +121,12 @@ my_connect_async(struct mysql_async_context *b, my_socket fd,
IF_WIN(WSAGetLastError() != WSAEWOULDBLOCK, \
(errno != EAGAIN && errno != EINTR))
+#ifdef _AIX
+#ifndef MSG_DONTWAIT
+#define MSG_DONTWAIT 0
+#endif
+#endif
+
ssize_t
my_recv_async(struct mysql_async_context *b, int fd,
unsigned char *buf, size_t size, int timeout)
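
[Editor's note, not part of the patch] Defining MSG_DONTWAIT to 0 keeps the recv()/send() calls compiling on AIX. Where a genuinely non-blocking call is needed on a platform without the flag, the usual portable fallback is to set O_NONBLOCK on the descriptor instead; a minimal sketch of that fallback, shown only for context (it is not what the patch does):

    #include <fcntl.h>
    #include <sys/socket.h>

    #ifndef MSG_DONTWAIT
    #define MSG_DONTWAIT 0    /* same no-op fallback the patch uses for AIX */
    #endif

    /* Illustration only: make a single recv() non-blocking even when
       MSG_DONTWAIT is unavailable, by turning on O_NONBLOCK. */
    static ssize_t recv_nonblocking(int fd, void *buf, size_t len)
    {
      int flags= fcntl(fd, F_GETFL, 0);
      if (flags >= 0 && !(flags & O_NONBLOCK))
        fcntl(fd, F_SETFL, flags | O_NONBLOCK);
      return recv(fd, buf, len, MSG_DONTWAIT);
    }
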
diff --git a/sql/field.h b/sql/field.h
index 916f6211bee..cbd9175f26c 100644
--- a/sql/field.h
+++ b/sql/field.h
@@ -75,6 +75,8 @@ struct ha_field_option_struct;
struct st_cache_field;
int field_conv(Field *to,Field *from);
+int field_conv_incompatible(Field *to,Field *from);
+bool memcpy_field_possible(Field *to, Field *from);
int truncate_double(double *nr, uint field_length, uint dec,
bool unsigned_flag, double max_value);
longlong double_to_longlong(double nr, bool unsigned_flag, bool *error);
@@ -2451,7 +2453,7 @@ public:
uint max_packed_col_length(uint max_length);
void free() { value.free(); }
inline void clear_temporary() { bzero((uchar*) &value,sizeof(value)); }
- friend int field_conv(Field *to,Field *from);
+ friend int field_conv_incompatible(Field *to,Field *from);
uint size_of() const { return sizeof(*this); }
bool has_charset(void) const
{ return charset() == &my_charset_bin ? FALSE : TRUE; }
diff --git a/sql/field_conv.cc b/sql/field_conv.cc
index 97c82b9b7bf..f13694e2a13 100644
--- a/sql/field_conv.cc
+++ b/sql/field_conv.cc
@@ -823,40 +823,76 @@ Copy_field::get_copy_func(Field *to,Field *from)
return do_field_eq;
}
+/**
+ Check if it is possible just copy value of the fields
+
+ @param to The field to copy to
+ @param from The field to copy from
+
+ @retval TRUE - it is possible to just copy value of 'from' to 'to'.
+ @retval FALSE - conversion is needed
+*/
+
+bool memcpy_field_possible(Field *to,Field *from)
+{
+ const enum_field_types to_real_type= to->real_type();
+ const enum_field_types from_real_type= from->real_type();
+ const enum_field_types to_type= from->type();
+ return (to_real_type == from_real_type &&
+ !(to->flags & BLOB_FLAG && to->table->copy_blobs) &&
+ to->pack_length() == from->pack_length() &&
+ !(to->flags & UNSIGNED_FLAG && !(from->flags & UNSIGNED_FLAG)) &&
+ to->decimals() == from->decimals() &&
+ to_real_type != MYSQL_TYPE_ENUM &&
+ to_real_type != MYSQL_TYPE_SET &&
+ to_real_type != MYSQL_TYPE_BIT &&
+ (to_real_type != MYSQL_TYPE_NEWDECIMAL ||
+ to->field_length == from->field_length) &&
+ from->charset() == to->charset() &&
+ (!sql_mode_for_dates(to->table->in_use) ||
+ (to_type != MYSQL_TYPE_DATE &&
+ to_type != MYSQL_TYPE_DATETIME)) &&
+ (from_real_type != MYSQL_TYPE_VARCHAR ||
+ ((Field_varstring*)from)->length_bytes ==
+ ((Field_varstring*)to)->length_bytes));
+}
+
/** Simple quick field convert that is called on insert. */
int field_conv(Field *to,Field *from)
{
- if (to->real_type() == from->real_type() &&
- !(to->flags & BLOB_FLAG && to->table->copy_blobs))
- {
- if (to->pack_length() == from->pack_length() &&
- !(to->flags & UNSIGNED_FLAG && !(from->flags & UNSIGNED_FLAG)) &&
- to->decimals() == from->decimals() &&
- to->real_type() != MYSQL_TYPE_ENUM &&
- to->real_type() != MYSQL_TYPE_SET &&
- to->real_type() != MYSQL_TYPE_BIT &&
- (to->real_type() != MYSQL_TYPE_NEWDECIMAL ||
- to->field_length == from->field_length) &&
- from->charset() == to->charset() &&
- (!sql_mode_for_dates(to->table->in_use) ||
- (to->type() != MYSQL_TYPE_DATE &&
- to->type() != MYSQL_TYPE_DATETIME)) &&
- (from->real_type() != MYSQL_TYPE_VARCHAR ||
- ((Field_varstring*)from)->length_bytes ==
- ((Field_varstring*)to)->length_bytes))
- { // Identical fields
- /*
- This may happen if one does 'UPDATE ... SET x=x'
- The test is here mostly for valgrind, but can also be relevant
- if memcpy() is implemented with prefetch-write
- */
- if (to->ptr != from->ptr)
- memcpy(to->ptr,from->ptr,to->pack_length());
- return 0;
- }
+ if (memcpy_field_possible(to, from))
+ { // Identical fields
+ /*
+ This may happen if one does 'UPDATE ... SET x=x'
+ The test is here mostly for valgrind, but can also be relevant
+ if memcpy() is implemented with prefetch-write
+ */
+ if (to->ptr != from->ptr)
+ memcpy(to->ptr, from->ptr, to->pack_length());
+ return 0;
}
+ return field_conv_incompatible(to, from);
+}
+
+
+/**
+ Copy value of the field with conversion.
+
+ @note Impossibility of simple copy should be checked before this call.
+
+ @param to The field to copy to
+ @param from The field to copy from
+
+ @retval TRUE ERROR
+ @retval FALSE OK
+*/
+
+int field_conv_incompatible(Field *to, Field *from)
+{
+ const enum_field_types to_real_type= to->real_type();
+ const enum_field_types from_real_type= from->real_type();
if (to->flags & BLOB_FLAG)
{ // Be sure the value is stored
Field_blob *blob=(Field_blob*) to;
@@ -867,21 +903,22 @@ int field_conv(Field *to,Field *from)
*/
if (to->table->copy_blobs ||
(!blob->value.is_alloced() &&
- from->real_type() != MYSQL_TYPE_STRING &&
- from->real_type() != MYSQL_TYPE_VARCHAR))
+ from_real_type != MYSQL_TYPE_STRING &&
+ from_real_type != MYSQL_TYPE_VARCHAR))
blob->value.copy();
return blob->store(blob->value.ptr(),blob->value.length(),from->charset());
}
- if (from->real_type() == MYSQL_TYPE_ENUM &&
- to->real_type() == MYSQL_TYPE_ENUM &&
+ if (from_real_type == MYSQL_TYPE_ENUM &&
+ to_real_type == MYSQL_TYPE_ENUM &&
from->val_int() == 0)
{
((Field_enum *)(to))->store_type(0);
return 0;
}
- if (from->result_type() == REAL_RESULT)
+ Item_result from_result_type= from->result_type();
+ if (from_result_type == REAL_RESULT)
return to->store(from->val_real());
- if (from->result_type() == DECIMAL_RESULT)
+ if (from_result_type == DECIMAL_RESULT)
{
my_decimal buff;
return to->store_decimal(from->val_decimal(&buff));
@@ -894,10 +931,10 @@ int field_conv(Field *to,Field *from)
else
return to->store_time_dec(&ltime, from->decimals());
}
- if ((from->result_type() == STRING_RESULT &&
+ if ((from_result_type == STRING_RESULT &&
(to->result_type() == STRING_RESULT ||
- (from->real_type() != MYSQL_TYPE_ENUM &&
- from->real_type() != MYSQL_TYPE_SET))) ||
+ (from_real_type != MYSQL_TYPE_ENUM &&
+ from_real_type != MYSQL_TYPE_SET))) ||
to->type() == MYSQL_TYPE_DECIMAL)
{
char buff[MAX_FIELD_WIDTH];
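
[Editor's note, not part of the patch] memcpy_field_possible()/field_conv_incompatible() split the old field_conv() so the compatibility test can be made once per column rather than once per row; item.cc below then caches the chosen routine as a function pointer (fast_field_copier). A self-contained sketch of that setup-once/copy-many pattern, using simplified stand-in types rather than the server's Field class:

    #include <cstring>

    /* Simplified stand-in for Field: a raw buffer plus layout info. */
    struct FieldStub { unsigned char *ptr; size_t pack_length; bool compatible_layout; };

    typedef int (*fast_field_copier)(FieldStub *to, FieldStub *from);

    static int memcpy_field_value(FieldStub *to, FieldStub *from)
    {
      if (to->ptr != from->ptr)
        std::memcpy(to->ptr, from->ptr, to->pack_length);
      return 0;
    }

    static int convert_field_value(FieldStub *to, FieldStub *from)
    {
      (void) to; (void) from;   /* placeholder for the slow, type-aware path */
      return 0;
    }

    /* Decide once per column which copy routine every row will use. */
    static fast_field_copier setup_copier(FieldStub *to, FieldStub *from)
    {
      bool same= to->compatible_layout && from->compatible_layout &&
                 to->pack_length == from->pack_length;
      return same ? &memcpy_field_value : &convert_field_value;
    }
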
diff --git a/sql/item.cc b/sql/item.cc
index bcba505a265..7901f1186d8 100644
--- a/sql/item.cc
+++ b/sql/item.cc
@@ -5919,13 +5919,51 @@ static int save_field_in_field(Field *from, bool *null_value,
}
+static int memcpy_field_value(Field *to, Field *from)
+{
+ if (to->ptr != from->ptr)
+ memcpy(to->ptr,from->ptr, to->pack_length());
+ return 0;
+}
+
+fast_field_copier Item_field::setup_fast_field_copier(Field *to)
+{
+ DBUG_ENTER("Item_field::setup_fast_field_copier");
+ DBUG_RETURN(memcpy_field_possible(to, field) ?
+ &memcpy_field_value :
+ &field_conv_incompatible);
+}
+
+
/**
Set a field's value from a item.
*/
-void Item_field::save_org_in_field(Field *to)
+void Item_field::save_org_in_field(Field *to,
+ fast_field_copier fast_field_copier_func)
{
- save_field_in_field(field, &null_value, to, TRUE);
+ DBUG_ENTER("Item_field::save_org_in_field");
+ DBUG_PRINT("enter", ("setup: 0x%lx data: 0x%lx",
+ (ulong) to, (ulong) fast_field_copier_func));
+ if (fast_field_copier_func)
+ {
+ if (field->is_null())
+ {
+ null_value= TRUE;
+ set_field_to_null_with_conversions(to, TRUE);
+ DBUG_VOID_RETURN;
+ }
+ to->set_notnull();
+ if (to == field)
+ {
+ null_value= 0;
+ DBUG_VOID_RETURN;
+ }
+ (*fast_field_copier_func)(to, field);
+ }
+ else
+ save_field_in_field(field, &null_value, to, TRUE);
+ DBUG_VOID_RETURN;
}
@@ -7446,9 +7484,9 @@ int Item_ref::save_in_field(Field *to, bool no_conversions)
}
-void Item_ref::save_org_in_field(Field *field)
+void Item_ref::save_org_in_field(Field *field, fast_field_copier optimizer_data)
{
- (*ref)->save_org_in_field(field);
+ (*ref)->save_org_in_field(field, optimizer_data);
}
diff --git a/sql/item.h b/sql/item.h
index f96ae41ef46..1faed26e1ee 100644
--- a/sql/item.h
+++ b/sql/item.h
@@ -709,8 +709,12 @@ public:
/* Function returns 1 on overflow and -1 on fatal errors */
int save_in_field_no_warnings(Field *field, bool no_conversions);
virtual int save_in_field(Field *field, bool no_conversions);
- virtual void save_org_in_field(Field *field)
+ virtual void save_org_in_field(Field *field,
+ fast_field_copier data
+ __attribute__ ((__unused__)))
{ (void) save_in_field(field, 1); }
+ virtual fast_field_copier setup_fast_field_copier(Field *field)
+ { return NULL; }
virtual int save_safe_in_field(Field *field)
{ return save_in_field(field, 1); }
virtual bool send(Protocol *protocol, String *str);
@@ -946,7 +950,7 @@ public:
save_val() is method of val_* family which stores value in the given
field.
*/
- virtual void save_val(Field *to) { save_org_in_field(to); }
+ virtual void save_val(Field *to) { save_org_in_field(to, NULL); }
/*
save_result() is method of val*result() family which stores value in
the given field.
@@ -2070,7 +2074,8 @@ public:
void fix_after_pullout(st_select_lex *new_parent, Item **ref);
void make_field(Send_field *tmp_field);
int save_in_field(Field *field,bool no_conversions);
- void save_org_in_field(Field *field);
+ void save_org_in_field(Field *field, fast_field_copier optimizer_data);
+ fast_field_copier setup_fast_field_copier(Field *field);
table_map used_tables() const;
table_map all_used_tables() const;
enum Item_result result_type () const
@@ -3103,7 +3108,9 @@ public:
bool fix_fields(THD *, Item **);
void fix_after_pullout(st_select_lex *new_parent, Item **ref);
int save_in_field(Field *field, bool no_conversions);
- void save_org_in_field(Field *field);
+ void save_org_in_field(Field *field, fast_field_copier optimizer_data);
+ fast_field_copier setup_fast_field_copier(Field *field)
+ { return (*ref)->setup_fast_field_copier(field); }
enum Item_result result_type () const { return (*ref)->result_type(); }
enum_field_types field_type() const { return (*ref)->field_type(); }
Field *get_tmp_table_field()
@@ -3332,7 +3339,8 @@ public:
bool is_null();
bool get_date(MYSQL_TIME *ltime, ulonglong fuzzydate);
bool send(Protocol *protocol, String *buffer);
- void save_org_in_field(Field *field)
+ void save_org_in_field(Field *field,
+ fast_field_copier data __attribute__ ((__unused__)))
{
save_val(field);
}
@@ -3529,7 +3537,8 @@ public:
return Item_direct_ref::get_date(ltime, fuzzydate);
}
bool send(Protocol *protocol, String *buffer);
- void save_org_in_field(Field *field)
+ void save_org_in_field(Field *field,
+ fast_field_copier data __attribute__ ((__unused__)))
{
if (check_null_ref())
field->set_null();
@@ -3596,7 +3605,7 @@ public:
{}
void save_in_result_field(bool no_conversions)
{
- outer_ref->save_org_in_field(result_field);
+ outer_ref->save_org_in_field(result_field, NULL);
}
bool fix_fields(THD *, Item **);
void fix_after_pullout(st_select_lex *new_parent, Item **ref);
diff --git a/sql/item_create.cc b/sql/item_create.cc
index 8466319b66f..a3e0dc6012b 100644
--- a/sql/item_create.cc
+++ b/sql/item_create.cc
@@ -3197,6 +3197,13 @@ Create_func_binlog_gtid_pos Create_func_binlog_gtid_pos::s_singleton;
Item*
Create_func_binlog_gtid_pos::create_2_arg(THD *thd, Item *arg1, Item *arg2)
{
+#ifdef HAVE_REPLICATION
+ if (!mysql_bin_log.is_open())
+#endif
+ {
+ my_error(ER_NO_BINARY_LOGGING, MYF(0));
+ return NULL;
+ }
thd->lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_SYSTEM_FUNCTION);
return new (thd->mem_root) Item_func_binlog_gtid_pos(arg1, arg2);
}
diff --git a/sql/item_func.h b/sql/item_func.h
index aaba24a5841..c1a92573eec 100644
--- a/sql/item_func.h
+++ b/sql/item_func.h
@@ -1740,7 +1740,9 @@ public:
{
return save_in_field(field, no_conversions, 1);
}
- void save_org_in_field(Field *field) { (void)save_in_field(field, 1, 0); }
+ void save_org_in_field(Field *field,
+ fast_field_copier data __attribute__ ((__unused__)))
+ { (void)save_in_field(field, 1, 0); }
bool register_field_in_read_map(uchar *arg);
bool register_field_in_bitmap(uchar *arg);
bool set_entry(THD *thd, bool create_if_not_exists);
diff --git a/sql/log.cc b/sql/log.cc
index b5b3e1061f7..ebfbba953fa 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -6710,13 +6710,15 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
*/
wfc= orig_entry->thd->wait_for_commit_ptr;
orig_entry->queued_by_other= false;
- if (wfc && wfc->waiting_for_commit)
+ if (wfc && wfc->waitee)
{
mysql_mutex_lock(&wfc->LOCK_wait_commit);
/* Do an extra check here, this time safely under lock. */
- if (wfc->waiting_for_commit)
+ if (wfc->waitee)
{
PSI_stage_info old_stage;
+ wait_for_commit *loc_waitee;
+
/*
By setting wfc->opaque_pointer to our own entry, we mark that we are
ready to commit, but waiting for another transaction to commit before
@@ -6727,21 +6729,20 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
queued_by_other flag is set.
*/
wfc->opaque_pointer= orig_entry;
+ DEBUG_SYNC(orig_entry->thd, "group_commit_waiting_for_prior");
orig_entry->thd->ENTER_COND(&wfc->COND_wait_commit,
&wfc->LOCK_wait_commit,
&stage_waiting_for_prior_transaction_to_commit,
&old_stage);
- DEBUG_SYNC(orig_entry->thd, "group_commit_waiting_for_prior");
- while (wfc->waiting_for_commit && !orig_entry->thd->check_killed())
+ while ((loc_waitee= wfc->waitee) && !orig_entry->thd->check_killed())
mysql_cond_wait(&wfc->COND_wait_commit, &wfc->LOCK_wait_commit);
wfc->opaque_pointer= NULL;
DBUG_PRINT("info", ("After waiting for prior commit, queued_by_other=%d",
orig_entry->queued_by_other));
- if (wfc->waiting_for_commit)
+ if (loc_waitee)
{
/* Wait terminated due to kill. */
- wait_for_commit *loc_waitee= wfc->waitee;
mysql_mutex_lock(&loc_waitee->LOCK_wait_commit);
if (loc_waitee->wakeup_subsequent_commits_running ||
orig_entry->queued_by_other)
@@ -6751,13 +6752,14 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
do
{
mysql_cond_wait(&wfc->COND_wait_commit, &wfc->LOCK_wait_commit);
- } while (wfc->waiting_for_commit);
+ } while (wfc->waitee);
}
else
{
/* We were killed, so remove us from the list of waitee. */
wfc->remove_from_list(&loc_waitee->subsequent_commits_list);
mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
+ wfc->waitee= NULL;
orig_entry->thd->EXIT_COND(&old_stage);
/* Interrupted by kill. */
@@ -6773,12 +6775,11 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
}
else
mysql_mutex_unlock(&wfc->LOCK_wait_commit);
-
- if (wfc->wakeup_error)
- {
- my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
- DBUG_RETURN(-1);
- }
+ }
+ if (wfc && wfc->wakeup_error)
+ {
+ my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
+ DBUG_RETURN(-1);
}
/*
@@ -9110,7 +9111,7 @@ start_binlog_background_thread()
array_elements(all_binlog_threads));
#endif
- if (mysql_thread_create(key_thread_binlog, &th, NULL,
+ if (mysql_thread_create(key_thread_binlog, &th, &connection_attrib,
binlog_background_thread, NULL))
return 1;
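
[Editor's note, not part of the patch] The log.cc hunks replace the waiting_for_commit boolean with a test on the waitee pointer itself, and keep a local copy (loc_waitee) of the pointer observed under the lock, so the post-wait cleanup still has a valid object to lock after another thread clears wfc->waitee. A reduced sketch of that "snapshot the waitee under the mutex" wait loop, using standard C++ primitives instead of the server's mysql_cond_* wrappers:

    #include <mutex>
    #include <condition_variable>

    struct waiter
    {
      std::mutex lock;
      std::condition_variable cond;
      waiter *waitee= nullptr;   /* non-null while a prior commit is pending */
      bool killed= false;
    };

    /* Illustration only: wait until the prior commit wakes us, remembering
       which waitee we were registered on in case we are killed mid-wait. */
    static waiter *wait_for_prior(waiter &self)
    {
      std::unique_lock<std::mutex> guard(self.lock);
      waiter *loc_waitee;
      while ((loc_waitee= self.waitee) && !self.killed)
        self.cond.wait(guard);
      /* Non-null here means we were killed and must deregister from
         loc_waitee, which stays usable even if self.waitee changes now. */
      return loc_waitee;
    }
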
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index d95dc0e0d87..36d0edee660 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -549,6 +549,7 @@ ulong rpl_recovery_rank=0;
ulong stored_program_cache_size= 0;
ulong opt_slave_parallel_threads= 0;
+ulong opt_slave_domain_parallel_threads= 0;
ulong opt_binlog_commit_wait_count= 0;
ulong opt_binlog_commit_wait_usec= 0;
ulong opt_slave_parallel_max_queued= 131072;
@@ -982,8 +983,10 @@ PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready,
key_COND_wait_commit;
PSI_cond_key key_RELAYLOG_COND_queue_busy;
PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
-PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_pool,
- key_COND_parallel_entry, key_COND_prepare_ordered;
+PSI_cond_key key_COND_rpl_thread_queue, key_COND_rpl_thread,
+ key_COND_rpl_thread_pool,
+ key_COND_parallel_entry, key_COND_group_commit_orderer,
+ key_COND_prepare_ordered;
PSI_cond_key key_COND_wait_gtid;
static PSI_cond_info all_server_conds[]=
@@ -1027,8 +1030,10 @@ static PSI_cond_info all_server_conds[]=
{ &key_COND_thread_cache, "COND_thread_cache", PSI_FLAG_GLOBAL},
{ &key_COND_flush_thread_cache, "COND_flush_thread_cache", PSI_FLAG_GLOBAL},
{ &key_COND_rpl_thread, "COND_rpl_thread", 0},
+ { &key_COND_rpl_thread_queue, "COND_rpl_thread_queue", 0},
{ &key_COND_rpl_thread_pool, "COND_rpl_thread_pool", 0},
{ &key_COND_parallel_entry, "COND_parallel_entry", 0},
+ { &key_COND_group_commit_orderer, "COND_group_commit_orderer", 0},
{ &key_COND_prepare_ordered, "COND_prepare_ordered", 0},
{ &key_COND_wait_gtid, "COND_wait_gtid", 0}
};
@@ -8139,7 +8144,7 @@ static int mysql_init_variables(void)
my_atomic_rwlock_init(&thread_running_lock);
my_atomic_rwlock_init(&thread_count_lock);
my_atomic_rwlock_init(&statistics_lock);
- my_atomic_rwlock_init(slave_executed_entries_lock);
+ my_atomic_rwlock_init(&slave_executed_entries_lock);
strmov(server_version, MYSQL_SERVER_VERSION);
threads.empty();
thread_cache.empty();
@@ -9436,7 +9441,7 @@ PSI_stage_info stage_binlog_waiting_background_tasks= { 0, "Waiting for backgrou
PSI_stage_info stage_binlog_processing_checkpoint_notify= { 0, "Processing binlog checkpoint notification", 0};
PSI_stage_info stage_binlog_stopping_background_thread= { 0, "Stopping binlog background thread", 0};
PSI_stage_info stage_waiting_for_work_from_sql_thread= { 0, "Waiting for work from SQL thread", 0};
-PSI_stage_info stage_waiting_for_prior_transaction_to_commit= { 0, "Waiting for prior transaction to commit", 0};
+PSI_stage_info stage_waiting_for_prior_transaction_to_commit= { 0, "Waiting for prior transaction to start commit before starting next transaction", 0};
PSI_stage_info stage_waiting_for_room_in_worker_thread= { 0, "Waiting for room in worker thread event queue", 0};
PSI_stage_info stage_master_gtid_wait_primary= { 0, "Waiting in MASTER_GTID_WAIT() (primary waiter)", 0};
PSI_stage_info stage_master_gtid_wait= { 0, "Waiting in MASTER_GTID_WAIT()", 0};
diff --git a/sql/mysqld.h b/sql/mysqld.h
index 75b4c3ba6c4..0d4b23b12e7 100644
--- a/sql/mysqld.h
+++ b/sql/mysqld.h
@@ -180,6 +180,7 @@ extern ulong opt_binlog_rows_event_max_size;
extern ulong rpl_recovery_rank, thread_cache_size;
extern ulong stored_program_cache_size;
extern ulong opt_slave_parallel_threads;
+extern ulong opt_slave_domain_parallel_threads;
extern ulong opt_slave_parallel_max_queued;
extern ulong opt_binlog_commit_wait_count;
extern ulong opt_binlog_commit_wait_usec;
@@ -295,8 +296,9 @@ extern PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready,
key_COND_wait_commit;
extern PSI_cond_key key_RELAYLOG_COND_queue_busy;
extern PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
-extern PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_pool,
- key_COND_parallel_entry;
+extern PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_queue,
+ key_COND_rpl_thread_pool,
+ key_COND_parallel_entry, key_COND_group_commit_orderer;
extern PSI_cond_key key_COND_wait_gtid;
extern PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert,
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 7e9a4aa6239..5947fb70330 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -93,32 +93,12 @@ handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev)
}
-static bool
-sql_worker_killed(THD *thd, rpl_group_info *rgi, bool in_event_group)
-{
- if (!rgi->rli->abort_slave && !abort_loop)
- return false;
-
- /*
- Do not abort in the middle of an event group that cannot be rolled back.
- */
- if ((thd->transaction.all.modified_non_trans_table ||
- (thd->variables.option_bits & OPTION_KEEP_LOG))
- && in_event_group)
- return false;
- /* ToDo: should we add some timeout like in sql_slave_killed?
- if (rgi->last_event_start_time == 0)
- rgi->last_event_start_time= my_time(0);
- */
-
- return true;
-}
-
-
static void
finish_event_group(THD *thd, int err, uint64 sub_id,
- rpl_parallel_entry *entry, wait_for_commit *wfc)
+ rpl_parallel_entry *entry, rpl_group_info *rgi)
{
+ wait_for_commit *wfc= &rgi->commit_orderer;
+
/*
Remove any left-over registration to wait for a prior commit to
complete. Normally, such wait would already have been removed at
@@ -163,12 +143,26 @@ finish_event_group(THD *thd, int err, uint64 sub_id,
*/
mysql_mutex_lock(&entry->LOCK_parallel_entry);
if (entry->last_committed_sub_id < sub_id)
- {
entry->last_committed_sub_id= sub_id;
- mysql_cond_broadcast(&entry->COND_parallel_entry);
- }
+
+ /*
+ If this event group got error, then any following event groups that have
+ not yet started should just skip their group, preparing for stop of the
+ SQL driver thread.
+ */
+ if (unlikely(rgi->is_error) &&
+ entry->stop_on_error_sub_id == (uint64)ULONGLONG_MAX)
+ entry->stop_on_error_sub_id= sub_id;
+ /*
+ We need to mark that this event group started its commit phase, in case we
+ missed it before (otherwise we would deadlock the next event group that is
+ waiting for this). In most cases (normal DML), it will be a no-op.
+ */
+ rgi->mark_start_commit_no_lock();
mysql_mutex_unlock(&entry->LOCK_parallel_entry);
+ thd->clear_error();
+ thd->get_stmt_da()->reset_diagnostics_area();
wfc->wakeup_subsequent_commits(err);
}
@@ -185,6 +179,20 @@ signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi)
}
+static void
+unlock_or_exit_cond(THD *thd, mysql_mutex_t *lock, bool *did_enter_cond,
+ PSI_stage_info *old_stage)
+{
+ if (*did_enter_cond)
+ {
+ thd->EXIT_COND(old_stage);
+ *did_enter_cond= false;
+ }
+ else
+ mysql_mutex_unlock(lock);
+}
+
+
pthread_handler_t
handle_rpl_parallel_thread(void *arg)
{
@@ -193,8 +201,14 @@ handle_rpl_parallel_thread(void *arg)
struct rpl_parallel_thread::queued_event *events;
bool group_standalone= true;
bool in_event_group= false;
+ bool group_skip_for_stop= false;
rpl_group_info *group_rgi= NULL;
+ group_commit_orderer *gco, *tmp_gco;
uint64 event_gtid_sub_id= 0;
+ rpl_parallel_thread::queued_event *qevs_to_free;
+ rpl_group_info *rgis_to_free;
+ group_commit_orderer *gcos_to_free;
+ size_t total_event_size;
int err;
struct rpl_parallel_thread *rpt= (struct rpl_parallel_thread *)arg;
@@ -234,43 +248,61 @@ handle_rpl_parallel_thread(void *arg)
rpt->running= true;
mysql_cond_signal(&rpt->COND_rpl_thread);
- while (!rpt->stop && !thd->killed)
+ while (!rpt->stop)
{
- rpl_parallel_thread *list;
-
thd->ENTER_COND(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread,
&stage_waiting_for_work_from_sql_thread, &old_stage);
- while (!(events= rpt->event_queue) && !rpt->stop && !thd->killed &&
- !(rpt->current_entry && rpt->current_entry->force_abort))
+ /*
+ There are 4 cases that should cause us to wake up:
+ - Events have been queued for us to handle.
+ - We have an owner, but no events and not inside event group -> we need
+ to release ourself to the thread pool
+ - SQL thread is stopping, and we have an owner but no events, and we are
+ inside an event group; no more events will be queued to us, so we need
+ to abort the group (force_abort==1).
+ - Thread pool shutdown (rpt->stop==1).
+ */
+ while (!( (events= rpt->event_queue) ||
+ (rpt->current_owner && !in_event_group) ||
+ (rpt->current_owner && group_rgi->parallel_entry->force_abort) ||
+ rpt->stop))
mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread);
- rpt->dequeue(events);
+ rpt->dequeue1(events);
thd->EXIT_COND(&old_stage);
- mysql_cond_signal(&rpt->COND_rpl_thread);
more_events:
+ qevs_to_free= NULL;
+ rgis_to_free= NULL;
+ gcos_to_free= NULL;
+ total_event_size= 0;
while (events)
{
struct rpl_parallel_thread::queued_event *next= events->next;
Log_event_type event_type;
rpl_group_info *rgi= events->rgi;
rpl_parallel_entry *entry= rgi->parallel_entry;
- uint64 wait_for_sub_id;
- uint64 wait_start_sub_id;
- bool end_of_group;
+ bool end_of_group, group_ending;
+ total_event_size+= events->event_size;
if (!events->ev)
{
handle_queued_pos_update(thd, events);
- my_free(events);
+ events->next= qevs_to_free;
+ qevs_to_free= events;
events= next;
continue;
}
err= 0;
group_rgi= rgi;
+ gco= rgi->gco;
/* Handle a new event group, which will be initiated by a GTID event. */
if ((event_type= events->ev->get_type_code()) == GTID_EVENT)
{
+ bool did_enter_cond= false;
+ PSI_stage_info old_stage;
+ uint64 wait_count;
+
in_event_group= true;
/*
If the standalone flag is set, then this event group consists of a
@@ -292,50 +324,87 @@ handle_rpl_parallel_thread(void *arg)
occured.
Also do not start parallel execution of this event group until all
- prior groups have committed that are not safe to run in parallel with.
+ prior groups have reached the commit phase that are not safe to run
+ in parallel with.
*/
- wait_for_sub_id= rgi->wait_commit_sub_id;
- wait_start_sub_id= rgi->wait_start_sub_id;
- if (wait_for_sub_id || wait_start_sub_id)
+ mysql_mutex_lock(&entry->LOCK_parallel_entry);
+ if (!gco->installed)
{
- bool did_enter_cond= false;
- PSI_stage_info old_stage;
-
- mysql_mutex_lock(&entry->LOCK_parallel_entry);
- if (wait_start_sub_id)
+ if (gco->prev_gco)
+ gco->prev_gco->next_gco= gco;
+ gco->installed= true;
+ }
+ wait_count= gco->wait_count;
+ if (wait_count > entry->count_committing_event_groups)
+ {
+ DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior");
+ thd->ENTER_COND(&gco->COND_group_commit_orderer,
+ &entry->LOCK_parallel_entry,
+ &stage_waiting_for_prior_transaction_to_commit,
+ &old_stage);
+ did_enter_cond= true;
+ do
{
- thd->ENTER_COND(&entry->COND_parallel_entry,
- &entry->LOCK_parallel_entry,
- &stage_waiting_for_prior_transaction_to_commit,
- &old_stage);
- did_enter_cond= true;
- DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior");
- while (wait_start_sub_id > entry->last_committed_sub_id &&
- !thd->check_killed())
- mysql_cond_wait(&entry->COND_parallel_entry,
- &entry->LOCK_parallel_entry);
- if (wait_start_sub_id > entry->last_committed_sub_id)
+ if (thd->check_killed() && !rgi->is_error)
{
- /* The thread got a kill signal. */
DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior_killed");
thd->send_kill_message();
slave_output_error_info(rgi->rli, thd);
signal_error_to_sql_driver_thread(thd, rgi);
+ /*
+ Even though we were killed, we need to continue waiting for the
+ prior event groups to signal that we can continue. Otherwise we
+ mess up the accounting for ordering. However, now that we have
+ marked the error, events will just be skipped rather than
+ executed, and things will progress quickly towards stop.
+ */
}
- rgi->wait_start_sub_id= 0; /* No need to check again. */
- }
- if (wait_for_sub_id > entry->last_committed_sub_id)
- {
- wait_for_commit *waitee=
- &rgi->wait_commit_group_info->commit_orderer;
- rgi->commit_orderer.register_wait_for_prior_commit(waitee);
- }
- if (did_enter_cond)
- thd->EXIT_COND(&old_stage);
- else
- mysql_mutex_unlock(&entry->LOCK_parallel_entry);
+ mysql_cond_wait(&gco->COND_group_commit_orderer,
+ &entry->LOCK_parallel_entry);
+ } while (wait_count > entry->count_committing_event_groups);
}
+ if ((tmp_gco= gco->prev_gco))
+ {
+ /*
+ Now all the event groups in the previous batch have entered their
+ commit phase, and will no longer access their gco. So we can free
+ it here.
+ */
+ DBUG_ASSERT(!tmp_gco->prev_gco);
+ gco->prev_gco= NULL;
+ tmp_gco->next_gco= gcos_to_free;
+ gcos_to_free= tmp_gco;
+ }
+
+ if (entry->force_abort && wait_count > entry->stop_count)
+ {
+ /*
+ We are stopping (STOP SLAVE), and this event group is beyond the
+ point where we can safely stop. So set a flag that will cause us
+ to skip, rather than execute, the following events.
+ */
+ group_skip_for_stop= true;
+ }
+ else
+ group_skip_for_stop= false;
+
+ if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id))
+ group_skip_for_stop= true;
+ else if (rgi->wait_commit_sub_id > entry->last_committed_sub_id)
+ {
+ /*
+ Register that the commit of this event group must wait for the
+ commit of the previous event group to complete before it may
+ complete itself, so that we preserve commit order.
+ */
+ wait_for_commit *waitee=
+ &rgi->wait_commit_group_info->commit_orderer;
+ rgi->commit_orderer.register_wait_for_prior_commit(waitee);
+ }
+ unlock_or_exit_cond(thd, &entry->LOCK_parallel_entry,
+ &did_enter_cond, &old_stage);
+
if(thd->wait_for_commit_ptr)
{
/*
@@ -352,13 +421,23 @@ handle_rpl_parallel_thread(void *arg)
thd->wait_for_commit_ptr= &rgi->commit_orderer;
}
+ group_ending= event_type == XID_EVENT ||
+ (event_type == QUERY_EVENT &&
+ (((Query_log_event *)events->ev)->is_commit() ||
+ ((Query_log_event *)events->ev)->is_rollback()));
+ if (group_ending)
+ {
+ DEBUG_SYNC(thd, "rpl_parallel_before_mark_start_commit");
+ rgi->mark_start_commit();
+ }
+
/*
If the SQL thread is stopping, we just skip execution of all the
following event groups. We still do all the normal waiting and wakeup
processing between the event groups as a simple way to ensure that
everything is stopped and cleaned up correctly.
*/
- if (!rgi->is_error && !sql_worker_killed(thd, rgi, in_event_group))
+ if (!rgi->is_error && !group_skip_for_stop)
err= rpt_handle_event(events, rpt);
else
err= thd->wait_for_prior_commit();
@@ -366,13 +445,11 @@ handle_rpl_parallel_thread(void *arg)
end_of_group=
in_event_group &&
((group_standalone && !Log_event::is_part_of_group(event_type)) ||
- event_type == XID_EVENT ||
- (event_type == QUERY_EVENT &&
- (((Query_log_event *)events->ev)->is_commit() ||
- ((Query_log_event *)events->ev)->is_rollback())));
+ group_ending);
delete_or_keep_event_post_apply(rgi, event_type, events->ev);
- my_free(events);
+ events->next= qevs_to_free;
+ qevs_to_free= events;
if (err)
{
@@ -382,10 +459,11 @@ handle_rpl_parallel_thread(void *arg)
if (end_of_group)
{
in_event_group= false;
- finish_event_group(thd, err, event_gtid_sub_id, entry,
- &rgi->commit_orderer);
- delete rgi;
+ finish_event_group(thd, err, event_gtid_sub_id, entry, rgi);
+ rgi->next= rgis_to_free;
+ rgis_to_free= rgi;
group_rgi= rgi= NULL;
+ group_skip_for_stop= false;
DEBUG_SYNC(thd, "rpl_parallel_end_of_group");
}
@@ -393,6 +471,29 @@ handle_rpl_parallel_thread(void *arg)
}
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+ /* Signal that our queue can now accept more events. */
+ rpt->dequeue2(total_event_size);
+ mysql_cond_signal(&rpt->COND_rpl_thread_queue);
+    /* We delay freeing these until here, when we hold LOCK_rpl_thread. */
+ while (gcos_to_free)
+ {
+ group_commit_orderer *next= gcos_to_free->next_gco;
+ rpt->free_gco(gcos_to_free);
+ gcos_to_free= next;
+ }
+ while (rgis_to_free)
+ {
+ rpl_group_info *next= rgis_to_free->next;
+ rpt->free_rgi(rgis_to_free);
+ rgis_to_free= next;
+ }
+ while (qevs_to_free)
+ {
+ rpl_parallel_thread::queued_event *next= qevs_to_free->next;
+ rpt->free_qev(qevs_to_free);
+ qevs_to_free= next;
+ }
+
if ((events= rpt->event_queue) != NULL)
{
/*
@@ -400,9 +501,8 @@ handle_rpl_parallel_thread(void *arg)
This is faster than having to wakeup the pool manager thread to give us
a new event.
*/
- rpt->dequeue(events);
+ rpt->dequeue1(events);
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
- mysql_cond_signal(&rpt->COND_rpl_thread);
goto more_events;
}
@@ -417,27 +517,26 @@ handle_rpl_parallel_thread(void *arg)
half-processed event group.
*/
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+ thd->wait_for_prior_commit();
finish_event_group(thd, 1, group_rgi->gtid_sub_id,
- group_rgi->parallel_entry, &group_rgi->commit_orderer);
+ group_rgi->parallel_entry, group_rgi);
signal_error_to_sql_driver_thread(thd, group_rgi);
in_event_group= false;
- delete group_rgi;
- group_rgi= NULL;
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+ rpt->free_rgi(group_rgi);
+ group_rgi= NULL;
+ group_skip_for_stop= false;
}
if (!in_event_group)
{
+ rpt->current_owner= NULL;
+ /* Tell wait_for_done() that we are done, if it is waiting. */
+ if (likely(rpt->current_entry) &&
+ unlikely(rpt->current_entry->force_abort))
+ mysql_cond_broadcast(&rpt->current_entry->COND_parallel_entry);
rpt->current_entry= NULL;
if (!rpt->stop)
- {
- mysql_mutex_lock(&rpt->pool->LOCK_rpl_thread_pool);
- list= rpt->pool->free_list;
- rpt->next= list;
- rpt->pool->free_list= rpt;
- if (!list)
- mysql_cond_broadcast(&rpt->pool->COND_rpl_thread_pool);
- mysql_mutex_unlock(&rpt->pool->LOCK_rpl_thread_pool);
- }
+ rpt->pool->release_thread(rpt);
}
}
@@ -466,6 +565,15 @@ handle_rpl_parallel_thread(void *arg)
}
+static void
+dealloc_gco(group_commit_orderer *gco)
+{
+ DBUG_ASSERT(!gco->prev_gco /* Must only free after dealloc previous */);
+ mysql_cond_destroy(&gco->COND_group_commit_orderer);
+ my_free(gco);
+}
+
+
int
rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
uint32 new_count, bool skip_check)
@@ -500,8 +608,10 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
mysql_mutex_init(key_LOCK_rpl_thread, &new_list[i]->LOCK_rpl_thread,
MY_MUTEX_INIT_SLOW);
mysql_cond_init(key_COND_rpl_thread, &new_list[i]->COND_rpl_thread, NULL);
+ mysql_cond_init(key_COND_rpl_thread_queue,
+ &new_list[i]->COND_rpl_thread_queue, NULL);
new_list[i]->pool= pool;
- if (mysql_thread_create(key_rpl_parallel_thread, &th, NULL,
+ if (mysql_thread_create(key_rpl_parallel_thread, &th, &connection_attrib,
handle_rpl_parallel_thread, new_list[i]))
{
my_error(ER_OUT_OF_RESOURCES, MYF(0));
@@ -538,7 +648,7 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
*/
for (i= 0; i < pool->count; ++i)
{
- rpl_parallel_thread *rpt= pool->get_thread(NULL);
+ rpl_parallel_thread *rpt= pool->get_thread(NULL, NULL);
rpt->stop= true;
mysql_cond_signal(&rpt->COND_rpl_thread);
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
@@ -553,6 +663,24 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
mysql_mutex_destroy(&rpt->LOCK_rpl_thread);
mysql_cond_destroy(&rpt->COND_rpl_thread);
+ while (rpt->qev_free_list)
+ {
+ rpl_parallel_thread::queued_event *next= rpt->qev_free_list->next;
+ my_free(rpt->qev_free_list);
+ rpt->qev_free_list= next;
+ }
+ while (rpt->rgi_free_list)
+ {
+ rpl_group_info *next= rpt->rgi_free_list->next;
+ delete rpt->rgi_free_list;
+ rpt->rgi_free_list= next;
+ }
+ while (rpt->gco_free_list)
+ {
+ group_commit_orderer *next= rpt->gco_free_list->next_gco;
+ dealloc_gco(rpt->gco_free_list);
+ rpt->gco_free_list= next;
+ }
}
my_free(pool->threads);
@@ -608,6 +736,121 @@ err:
}
+rpl_parallel_thread::queued_event *
+rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size,
+ Relay_log_info *rli)
+{
+ queued_event *qev;
+ mysql_mutex_assert_owner(&LOCK_rpl_thread);
+ if ((qev= qev_free_list))
+ qev_free_list= qev->next;
+  else if (!(qev= (queued_event *)my_malloc(sizeof(*qev), MYF(0))))
+ {
+ my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*qev));
+ return NULL;
+ }
+ qev->ev= ev;
+ qev->event_size= event_size;
+ qev->next= NULL;
+ strcpy(qev->event_relay_log_name, rli->event_relay_log_name);
+ qev->event_relay_log_pos= rli->event_relay_log_pos;
+ qev->future_event_relay_log_pos= rli->future_event_relay_log_pos;
+ strcpy(qev->future_event_master_log_name, rli->future_event_master_log_name);
+ return qev;
+}
+
+
+void
+rpl_parallel_thread::free_qev(rpl_parallel_thread::queued_event *qev)
+{
+ mysql_mutex_assert_owner(&LOCK_rpl_thread);
+ qev->next= qev_free_list;
+ qev_free_list= qev;
+}
+
+
+rpl_group_info*
+rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
+ rpl_parallel_entry *e)
+{
+ rpl_group_info *rgi;
+ mysql_mutex_assert_owner(&LOCK_rpl_thread);
+ if ((rgi= rgi_free_list))
+ {
+ rgi_free_list= rgi->next;
+ rgi->reinit(rli);
+ }
+ else
+ {
+    if (!(rgi= new rpl_group_info(rli)))
+ {
+ my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*rgi));
+ return NULL;
+ }
+    rgi->is_parallel_exec= true;
+ if ((rgi->deferred_events_collecting= rli->mi->rpl_filter->is_on()))
+ rgi->deferred_events= new Deferred_log_events(rli);
+ }
+ if (event_group_new_gtid(rgi, gtid_ev))
+ {
+ free_rgi(rgi);
+ my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME));
+ return NULL;
+ }
+ rgi->parallel_entry= e;
+
+ return rgi;
+}
+
+
+void
+rpl_parallel_thread::free_rgi(rpl_group_info *rgi)
+{
+ mysql_mutex_assert_owner(&LOCK_rpl_thread);
+ DBUG_ASSERT(rgi->commit_orderer.waitee == NULL);
+ rgi->free_annotate_event();
+ if (rgi->deferred_events)
+ {
+ delete rgi->deferred_events;
+ rgi->deferred_events= NULL;
+ }
+ rgi->next= rgi_free_list;
+ rgi_free_list= rgi;
+}
+
+
+group_commit_orderer *
+rpl_parallel_thread::get_gco(uint64 wait_count, group_commit_orderer *prev)
+{
+ group_commit_orderer *gco;
+ mysql_mutex_assert_owner(&LOCK_rpl_thread);
+ if ((gco= gco_free_list))
+ gco_free_list= gco->next_gco;
+  else if (!(gco= (group_commit_orderer *)my_malloc(sizeof(*gco), MYF(0))))
+ {
+ my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*gco));
+ return NULL;
+ }
+ mysql_cond_init(key_COND_group_commit_orderer,
+ &gco->COND_group_commit_orderer, NULL);
+ gco->wait_count= wait_count;
+ gco->prev_gco= prev;
+ gco->next_gco= NULL;
+ gco->installed= false;
+ return gco;
+}
+
+
+void
+rpl_parallel_thread::free_gco(group_commit_orderer *gco)
+{
+ mysql_mutex_assert_owner(&LOCK_rpl_thread);
+ DBUG_ASSERT(!gco->prev_gco /* Must not free until wait has completed. */);
+ gco->next_gco= gco_free_list;
+ gco_free_list= gco;
+}
+
+
rpl_parallel_thread_pool::rpl_parallel_thread_pool()
: count(0), threads(0), free_list(0), changing(false), inited(false)
{
@@ -650,7 +893,8 @@ rpl_parallel_thread_pool::destroy()
Note that we return with the worker threads's LOCK_rpl_thread mutex locked.
*/
struct rpl_parallel_thread *
-rpl_parallel_thread_pool::get_thread(rpl_parallel_entry *entry)
+rpl_parallel_thread_pool::get_thread(rpl_parallel_thread **owner,
+ rpl_parallel_entry *entry)
{
rpl_parallel_thread *rpt;
@@ -660,16 +904,152 @@ rpl_parallel_thread_pool::get_thread(rpl_parallel_entry *entry)
free_list= rpt->next;
mysql_mutex_unlock(&LOCK_rpl_thread_pool);
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+ rpt->current_owner= owner;
rpt->current_entry= entry;
return rpt;
}
+/*
+ Release a thread to the thread pool.
+ The thread should be locked, and should not have any work queued for it.
+*/
+void
+rpl_parallel_thread_pool::release_thread(rpl_parallel_thread *rpt)
+{
+ rpl_parallel_thread *list;
+
+ mysql_mutex_assert_owner(&rpt->LOCK_rpl_thread);
+ DBUG_ASSERT(rpt->current_owner == NULL);
+ mysql_mutex_lock(&LOCK_rpl_thread_pool);
+ list= free_list;
+ rpt->next= list;
+ free_list= rpt;
+ if (!list)
+ mysql_cond_broadcast(&COND_rpl_thread_pool);
+ mysql_mutex_unlock(&LOCK_rpl_thread_pool);
+}
+
+
+/*
+ Obtain a worker thread that we can queue an event to.
+
+ Each invocation allocates a new worker thread, to maximise
+ parallelism. However, only up to a maximum of
+ --slave-domain-parallel-threads workers can be occupied by a single
+ replication domain; after that point, we start re-using worker threads that
+  are still executing events that were queued earlier for this domain.
+
+  We never queue more than --slave-parallel-max-queued bytes of events
+  for one worker, to avoid the SQL driver thread using up all memory with
+  queued events while worker threads are stalling.
+
+  Note that this function returns with rpl_parallel_thread::LOCK_rpl_thread
+  locked. The exception is if we were killed, in which case NULL is returned.
+
+ The *did_enter_cond flag is set true if we had to wait for a worker thread
+ to become free (with mysql_cond_wait()). If so, old_stage will also be set,
+ and the LOCK_rpl_thread must be released with THD::EXIT_COND() instead
+ of mysql_mutex_unlock.
+
+ If the flag `reuse' is set, the last worker thread will be returned again,
+ if it is still available. Otherwise a new worker thread is allocated.
+*/
+rpl_parallel_thread *
+rpl_parallel_entry::choose_thread(Relay_log_info *rli, bool *did_enter_cond,
+ PSI_stage_info *old_stage, bool reuse)
+{
+ uint32 idx;
+ rpl_parallel_thread *thr;
+
+ idx= rpl_thread_idx;
+ if (!reuse)
+ {
+ ++idx;
+ if (idx >= rpl_thread_max)
+ idx= 0;
+ rpl_thread_idx= idx;
+ }
+ thr= rpl_threads[idx];
+ if (thr)
+ {
+ *did_enter_cond= false;
+ mysql_mutex_lock(&thr->LOCK_rpl_thread);
+ for (;;)
+ {
+ if (thr->current_owner != &rpl_threads[idx])
+ {
+ /*
+          The worker thread became idle, returned to the free list, and was
+          possibly allocated to a different request. So we should allocate
+ a new worker thread.
+ */
+ unlock_or_exit_cond(rli->sql_driver_thd, &thr->LOCK_rpl_thread,
+ did_enter_cond, old_stage);
+ thr= NULL;
+ break;
+ }
+ else if (thr->queued_size <= opt_slave_parallel_max_queued)
+ {
+ /* The thread is ready to queue into. */
+ break;
+ }
+ else if (rli->sql_driver_thd->check_killed())
+ {
+ unlock_or_exit_cond(rli->sql_driver_thd, &thr->LOCK_rpl_thread,
+ did_enter_cond, old_stage);
+ my_error(ER_CONNECTION_KILLED, MYF(0));
+ DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max",
+ {
+ debug_sync_set_action(rli->sql_driver_thd,
+ STRING_WITH_LEN("now SIGNAL wait_queue_killed"));
+ };);
+ slave_output_error_info(rli, rli->sql_driver_thd);
+ return NULL;
+ }
+ else
+ {
+ /*
+ We have reached the limit of how much memory we are allowed to use
+ for queuing events, so wait for the thread to consume some of its
+ queue.
+ */
+ if (!*did_enter_cond)
+ {
+ /*
+              We need to do the debug_sync before ENTER_COND(), because
+              debug_sync changes thd->mysys_var->current_mutex, and this can
+              cause THD::awake() to use the wrong mutex.
+ */
+ DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max",
+ {
+ debug_sync_set_action(rli->sql_driver_thd,
+ STRING_WITH_LEN("now SIGNAL wait_queue_ready"));
+ };);
+ rli->sql_driver_thd->ENTER_COND(&thr->COND_rpl_thread_queue,
+ &thr->LOCK_rpl_thread,
+ &stage_waiting_for_room_in_worker_thread,
+ old_stage);
+ *did_enter_cond= true;
+ }
+ mysql_cond_wait(&thr->COND_rpl_thread_queue, &thr->LOCK_rpl_thread);
+ }
+ }
+ }
+ if (!thr)
+ rpl_threads[idx]= thr= global_rpl_thread_pool.get_thread(&rpl_threads[idx],
+ this);
+
+ return thr;
+}
+
static void
free_rpl_parallel_entry(void *element)
{
rpl_parallel_entry *e= (rpl_parallel_entry *)element;
+ if (e->current_gco)
+ dealloc_gco(e->current_gco);
mysql_cond_destroy(&e->COND_parallel_entry);
mysql_mutex_destroy(&e->LOCK_parallel_entry);
my_free(e);
@@ -709,10 +1089,22 @@ rpl_parallel::find(uint32 domain_id)
(const uchar *)&domain_id, 0)))
{
/* Allocate a new, empty one. */
- if (!(e= (struct rpl_parallel_entry *)my_malloc(sizeof(*e),
- MYF(MY_ZEROFILL))))
+ ulong count= opt_slave_domain_parallel_threads;
+ if (count == 0 || count > opt_slave_parallel_threads)
+ count= opt_slave_parallel_threads;
+ rpl_parallel_thread **p;
+ if (!my_multi_malloc(MYF(MY_WME|MY_ZEROFILL),
+ &e, sizeof(*e),
+ &p, count*sizeof(*p),
+ NULL))
+ {
+ my_error(ER_OUTOFMEMORY, MYF(0), (int)(sizeof(*e)+count*sizeof(*p)));
return NULL;
+ }
+ e->rpl_threads= p;
+ e->rpl_thread_max= count;
e->domain_id= domain_id;
+ e->stop_on_error_sub_id= (uint64)ULONGLONG_MAX;
if (my_hash_insert(&domain_hash, (uchar *)e))
{
my_free(e);
@@ -730,10 +1122,11 @@ rpl_parallel::find(uint32 domain_id)
void
-rpl_parallel::wait_for_done()
+rpl_parallel::wait_for_done(THD *thd)
{
struct rpl_parallel_entry *e;
- uint32 i;
+ rpl_parallel_thread *rpt;
+ uint32 i, j;
/*
First signal all workers that they must force quit; no more events will
@@ -741,26 +1134,58 @@ rpl_parallel::wait_for_done()
*/
for (i= 0; i < domain_hash.records; ++i)
{
- rpl_parallel_thread *rpt;
-
e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
+ mysql_mutex_lock(&e->LOCK_parallel_entry);
+ /*
+ We want the worker threads to stop as quickly as is safe. If the slave
+      SQL threads are behind, we could have a significant amount of events
+ queued for the workers, and we want to stop without waiting for them
+ all to be applied first. But if any event group has already started
+ executing in a worker, we want to be sure that all prior event groups
+ are also executed, so that we stop at a consistent point in the binlog
+ stream (per replication domain).
+
+ All event groups wait for e->count_committing_event_groups to reach
+ the value of group_commit_orderer::wait_count before starting to
+ execute. Thus, at this point we know that any event group with a
+    strictly larger wait_count is safe to skip; none of them can have
+    started executing yet. So we set e->stop_count here and use it to
+ decide in the worker threads whether to continue executing an event
+ group or whether to skip it, when force_abort is set.
+ */
e->force_abort= true;
- if ((rpt= e->rpl_thread))
+ e->stop_count= e->count_committing_event_groups;
+ mysql_mutex_unlock(&e->LOCK_parallel_entry);
+ for (j= 0; j < e->rpl_thread_max; ++j)
{
- mysql_mutex_lock(&rpt->LOCK_rpl_thread);
- if (rpt->current_entry == e)
- mysql_cond_signal(&rpt->COND_rpl_thread);
- mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+ if ((rpt= e->rpl_threads[j]))
+ {
+ mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+ if (rpt->current_owner == &e->rpl_threads[j])
+ mysql_cond_signal(&rpt->COND_rpl_thread);
+ mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+ }
}
}
+ DBUG_EXECUTE_IF("rpl_parallel_wait_for_done_trigger",
+ {
+ debug_sync_set_action(thd,
+ STRING_WITH_LEN("now SIGNAL wait_for_done_waiting"));
+ };);
for (i= 0; i < domain_hash.records; ++i)
{
e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
- mysql_mutex_lock(&e->LOCK_parallel_entry);
- while (e->current_sub_id > e->last_committed_sub_id)
- mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry);
- mysql_mutex_unlock(&e->LOCK_parallel_entry);
+ for (j= 0; j < e->rpl_thread_max; ++j)
+ {
+ if ((rpt= e->rpl_threads[j]))
+ {
+ mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+ while (rpt->current_owner == &e->rpl_threads[j])
+ mysql_cond_wait(&e->COND_parallel_entry, &rpt->LOCK_rpl_thread);
+ mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+ }
+ }
}
}
@@ -787,6 +1212,21 @@ rpl_parallel::workers_idle()
/*
+  This is used when we get an error during processing in do_event().
+ We will not queue any event to the thread, but we still need to wake it up
+ to be sure that it will be returned to the pool.
+*/
+static void
+abandon_worker_thread(THD *thd, rpl_parallel_thread *cur_thread,
+ bool *did_enter_cond, PSI_stage_info *old_stage)
+{
+ unlock_or_exit_cond(thd, &cur_thread->LOCK_rpl_thread,
+ did_enter_cond, old_stage);
+ mysql_cond_signal(&cur_thread->COND_rpl_thread);
+}
+
+
+/*
do_event() is executed by the sql_driver_thd thread.
   Its main purpose is to find a thread that can execute the query.
@@ -814,6 +1254,10 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
/*
Stop queueing additional event groups once the SQL thread is requested to
stop.
+
+ We have to queue any remaining events of any event group that has already
+ been partially queued, but after that we will just ignore any further
+ events the SQL driver thread may try to queue, and eventually it will stop.
*/
if (((typ= ev->get_type_code()) == GTID_EVENT ||
!(is_group_event= Log_event::is_group_event(typ))) &&
@@ -822,188 +1266,121 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
if (sql_thread_stopping)
{
delete ev;
- /* QQ: Need a better comment why we return false here */
+ /*
+ Return false ("no error"); normal stop is not an error, and otherwise the
+ error has already been recorded.
+ */
return false;
}
- if (!(qev= (rpl_parallel_thread::queued_event *)my_malloc(sizeof(*qev),
- MYF(0))))
+ if (typ == GTID_EVENT || unlikely(!current))
+ {
+ uint32 domain_id;
+ if (likely(typ == GTID_EVENT))
+ {
+ Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
+ domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ?
+ 0 : gtid_ev->domain_id);
+ }
+ else
+ domain_id= 0;
+ if (!(e= find(domain_id)))
+ {
+ my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME));
+ delete ev;
+ return true;
+ }
+ current= e;
+ }
+ else
+ e= current;
+
+ /*
+ Find a worker thread to queue the event for.
+ Prefer a new thread, so we maximise parallelism (at least for the group
+ commit). But do not exceed a limit of --slave-domain-parallel-threads;
+ instead re-use a thread that we queued for previously.
+ */
+ cur_thread=
+ e->choose_thread(rli, &did_enter_cond, &old_stage, typ != GTID_EVENT);
+ if (!cur_thread)
{
- my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ /* This means we were killed. The error is already signalled. */
+ delete ev;
+ return true;
+ }
+
+ if (!(qev= cur_thread->get_qev(ev, event_size, rli)))
+ {
+ abandon_worker_thread(rli->sql_driver_thd, cur_thread,
+ &did_enter_cond, &old_stage);
delete ev;
return true;
}
- qev->ev= ev;
- qev->event_size= event_size;
- qev->next= NULL;
- strcpy(qev->event_relay_log_name, rli->event_relay_log_name);
- qev->event_relay_log_pos= rli->event_relay_log_pos;
- qev->future_event_relay_log_pos= rli->future_event_relay_log_pos;
- strcpy(qev->future_event_master_log_name, rli->future_event_master_log_name);
if (typ == GTID_EVENT)
{
Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
- uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ?
- 0 : gtid_ev->domain_id);
- if (!(e= find(domain_id)) ||
- !(rgi= new rpl_group_info(rli)) ||
- event_group_new_gtid(rgi, gtid_ev))
+ if (!(rgi= cur_thread->get_rgi(rli, gtid_ev, e)))
{
- my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME));
- delete rgi;
- my_free(qev);
+ cur_thread->free_qev(qev);
+ abandon_worker_thread(rli->sql_driver_thd, cur_thread,
+ &did_enter_cond, &old_stage);
delete ev;
return true;
}
- rgi->is_parallel_exec = true;
- if ((rgi->deferred_events_collecting= rli->mi->rpl_filter->is_on()))
- rgi->deferred_events= new Deferred_log_events(rli);
- if ((gtid_ev->flags2 & Gtid_log_event::FL_GROUP_COMMIT_ID) &&
- e->last_commit_id == gtid_ev->commit_id)
- {
- /*
- We are already executing something else in this domain. But the two
- event groups were committed together in the same group commit on the
- master, so we can still do them in parallel here on the slave.
+ /*
+ We queue the event group in a new worker thread, to run in parallel
+ with previous groups.
- However, the commit of this event must wait for the commit of the prior
- event, to preserve binlog commit order and visibility across all
- servers in the replication hierarchy.
+ To preserve commit order within the replication domain, we set up
+ rgi->wait_commit_sub_id to make the new group commit only after the
+ previous group has committed.
- In addition, we must not start executing this event until we have
- finished the previous collection of event groups that group-committed
- together; we use rgi->wait_start_sub_id to control this.
- */
- rpl_parallel_thread *rpt= global_rpl_thread_pool.get_thread(e);
- rgi->wait_commit_sub_id= e->current_sub_id;
- rgi->wait_commit_group_info= e->current_group_info;
- rgi->wait_start_sub_id= e->prev_groupcommit_sub_id;
- e->rpl_thread= cur_thread= rpt;
- /* get_thread() returns with the LOCK_rpl_thread locked. */
- }
- else
+ Event groups that group-committed together on the master can be run
+ in parallel with each other without restrictions. But one batch of
+ group-commits may not start before all groups in the previous batch
+ have initiated their commit phase; we set up rgi->gco to ensure that.
+ */
+ rgi->wait_commit_sub_id= e->current_sub_id;
+ rgi->wait_commit_group_info= e->current_group_info;
+
+ if (!((gtid_ev->flags2 & Gtid_log_event::FL_GROUP_COMMIT_ID) &&
+ e->last_commit_id == gtid_ev->commit_id))
{
/*
- Check if we already have a worker thread for this entry.
-
- We continue to queue more events up for the worker thread while it is
- still executing the first ones, to be able to start executing a large
- event group without having to wait for the end to be fetched from the
- master. And we continue to queue up more events after the first group,
- so that we can continue to process subsequent parts of the relay log in
- parallel without having to wait for previous long-running events to
- complete.
-
- But if the worker thread is idle at any point, it may return to the
- idle list or start servicing a different request. So check this, and
- allocate a new thread if the old one is no longer processing for us.
+ A new batch of transactions that group-committed together on the master.
+
+      Remember the count that marks the end of the previous group-committed
+ batch, and allocate a new gco.
*/
- cur_thread= e->rpl_thread;
- if (cur_thread)
- {
- mysql_mutex_lock(&cur_thread->LOCK_rpl_thread);
- for (;;)
- {
- if (cur_thread->current_entry != e)
- {
- /*
- The worker thread became idle, and returned to the free list and
- possibly was allocated to a different request. This also means
- that everything previously queued has already been executed,
- else the worker thread would not have become idle. So we should
- allocate a new worker thread.
- */
- mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
- e->rpl_thread= cur_thread= NULL;
- break;
- }
- else if (cur_thread->queued_size <= opt_slave_parallel_max_queued)
- break; // The thread is ready to queue into
- else if (rli->sql_driver_thd->check_killed())
- {
- mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
- my_error(ER_CONNECTION_KILLED, MYF(0));
- delete rgi;
- my_free(qev);
- delete ev;
- DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max",
- {
- debug_sync_set_action(rli->sql_driver_thd,
- STRING_WITH_LEN("now SIGNAL wait_queue_killed"));
- };);
- slave_output_error_info(rli, rli->sql_driver_thd);
- return true;
- }
- else
- {
- /*
- We have reached the limit of how much memory we are allowed to
- use for queuing events, so wait for the thread to consume some
- of its queue.
- */
- if (!did_enter_cond)
- {
- rli->sql_driver_thd->ENTER_COND(&cur_thread->COND_rpl_thread,
- &cur_thread->LOCK_rpl_thread,
- &stage_waiting_for_room_in_worker_thread, &old_stage);
- did_enter_cond= true;
- DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max",
- {
- debug_sync_set_action(rli->sql_driver_thd,
- STRING_WITH_LEN("now SIGNAL wait_queue_ready"));
- };);
- }
- mysql_cond_wait(&cur_thread->COND_rpl_thread,
- &cur_thread->LOCK_rpl_thread);
- }
- }
- }
+ uint64 count= e->count_queued_event_groups;
+ group_commit_orderer *gco;
- if (!cur_thread)
- {
- /*
- Nothing else is currently running in this domain. We can
- spawn a new thread to do this event group in parallel with
- anything else that might be running in other domains.
- */
- cur_thread= e->rpl_thread= global_rpl_thread_pool.get_thread(e);
- /* get_thread() returns with the LOCK_rpl_thread locked. */
- }
- else
+ if (!(gco= cur_thread->get_gco(count, e->current_gco)))
{
- /*
- We are still executing the previous event group for this replication
- domain, and we have to wait for that to finish before we can start on
- the next one. So just re-use the thread.
- */
+ cur_thread->free_rgi(rgi);
+ cur_thread->free_qev(qev);
+ abandon_worker_thread(rli->sql_driver_thd, cur_thread,
+ &did_enter_cond, &old_stage);
+ delete ev;
+ return true;
}
-
- rgi->wait_commit_sub_id= 0;
- rgi->wait_start_sub_id= 0;
- e->prev_groupcommit_sub_id= e->current_sub_id;
+ e->current_gco= rgi->gco= gco;
}
-
+ else
+ rgi->gco= e->current_gco;
if (gtid_ev->flags2 & Gtid_log_event::FL_GROUP_COMMIT_ID)
- {
- e->last_server_id= gtid_ev->server_id;
- e->last_seq_no= gtid_ev->seq_no;
e->last_commit_id= gtid_ev->commit_id;
- }
else
- {
- e->last_server_id= 0;
- e->last_seq_no= 0;
e->last_commit_id= 0;
- }
-
qev->rgi= e->current_group_info= rgi;
e->current_sub_id= rgi->gtid_sub_id;
- current= rgi->parallel_entry= e;
+ ++e->count_queued_event_groups;
}
- else if (!is_group_event || !current)
+ else if (!is_group_event || !e)
{
my_off_t log_pos;
int err;
@@ -1013,7 +1390,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
     Same for events not preceded by GTID (we should not see those normally,
but they might be from an old master).
- The varuable `current' is NULL for the case where the master did not
+ The variable `e' is NULL for the case where the master did not
have GTID, like a MariaDB 5.5 or MySQL master.
*/
qev->rgi= serial_rgi;
@@ -1040,18 +1417,11 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
if (err)
{
- my_free(qev);
+ cur_thread->free_qev(qev);
+ abandon_worker_thread(rli->sql_driver_thd, cur_thread,
+ &did_enter_cond, &old_stage);
return true;
}
- qev->ev= NULL;
- qev->future_event_master_log_pos= log_pos;
- if (!current)
- {
- rli->event_relay_log_pos= rli->future_event_relay_log_pos;
- handle_queued_pos_update(rli->sql_driver_thd, qev);
- my_free(qev);
- return false;
- }
/*
Queue an empty event, so that the position will be updated in a
reasonable way relative to other events:
@@ -1064,40 +1434,12 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
least the position will not be updated until one of them has reached
the current point.
*/
- cur_thread= current->rpl_thread;
- if (cur_thread)
- {
- mysql_mutex_lock(&cur_thread->LOCK_rpl_thread);
- if (cur_thread->current_entry != current)
- {
- /* Not ours anymore, we need to grab a new one. */
- mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
- cur_thread= NULL;
- }
- }
- if (!cur_thread)
- cur_thread= current->rpl_thread=
- global_rpl_thread_pool.get_thread(current);
+ qev->ev= NULL;
+ qev->future_event_master_log_pos= log_pos;
}
else
{
- cur_thread= current->rpl_thread;
- if (cur_thread)
- {
- mysql_mutex_lock(&cur_thread->LOCK_rpl_thread);
- if (cur_thread->current_entry != current)
- {
- /* Not ours anymore, we need to grab a new one. */
- mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
- cur_thread= NULL;
- }
- }
- if (!cur_thread)
- {
- cur_thread= current->rpl_thread=
- global_rpl_thread_pool.get_thread(current);
- }
- qev->rgi= current->current_group_info;
+ qev->rgi= e->current_group_info;
}
/*
@@ -1105,10 +1447,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
*/
rli->event_relay_log_pos= rli->future_event_relay_log_pos;
cur_thread->enqueue(qev);
- if (did_enter_cond)
- rli->sql_driver_thd->EXIT_COND(&old_stage);
- else
- mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
+ unlock_or_exit_cond(rli->sql_driver_thd, &cur_thread->LOCK_rpl_thread,
+ &did_enter_cond, &old_stage);
mysql_cond_signal(&cur_thread->COND_rpl_thread);
return false;
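The new get_qev()/free_qev(), get_rgi()/free_rgi() and get_gco()/free_gco() helpers all follow the same pattern: recycle objects through a per-worker free list protected by that worker's LOCK_rpl_thread, and fall back to heap allocation only when the list is empty (the lists are drained again in rpl_parallel_change_thread_count() above). Below is a minimal standalone sketch of that idiom; the names and the use of std::mutex/std::malloc are invented stand-ins, not the server's types.

    #include <cstdlib>
    #include <mutex>

    struct queued_event_t {                 // stand-in for rpl_parallel_thread::queued_event
      queued_event_t *next;
    };

    struct worker_t {                       // stand-in for rpl_parallel_thread
      std::mutex lock;                      // plays the role of LOCK_rpl_thread
      queued_event_t *qev_free_list= nullptr;

      // Caller holds 'lock' (the server asserts this with mysql_mutex_assert_owner()).
      queued_event_t *get_qev()
      {
        queued_event_t *qev= qev_free_list;
        if (qev)
          qev_free_list= qev->next;         // reuse a recycled object, no malloc
        else
          qev= static_cast<queued_event_t *>(std::malloc(sizeof(*qev)));
        return qev;                         // may be nullptr on out-of-memory
      }

      // Also called with 'lock' held: freeing is just pushing onto the list.
      void free_qev(queued_event_t *qev)
      {
        qev->next= qev_free_list;
        qev_free_list= qev;
      }
    };

    int main()
    {
      worker_t w;
      std::lock_guard<std::mutex> lk(w.lock);
      queued_event_t *qev= w.get_qev();     // first call allocates
      w.free_qev(qev);                      // goes back on the free list
      qev= w.get_qev();                     // second call reuses the same object
      w.free_qev(qev);
      std::free(w.qev_free_list);           // the server drains the lists at pool resize
      return 0;
    }

The point of the design is that allocation and deallocation happen while the worker already holds its own mutex, so no extra global lock or allocator traffic is added to the per-event fast path.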
diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h
index 019a354c57d..956a31e4b7f 100644
--- a/sql/rpl_parallel.h
+++ b/sql/rpl_parallel.h
@@ -9,16 +9,66 @@ struct rpl_parallel_entry;
struct rpl_parallel_thread_pool;
class Relay_log_info;
+
+
+/*
+ Structure used to keep track of the parallel replication of a batch of
+ event-groups that group-committed together on the master.
+
+ It is used to ensure that every event group in one batch has reached the
+ commit stage before the next batch starts executing.
+
+ Note the lifetime of this structure:
+
+ - It is allocated when the first event in a new batch of group commits
+    is queued, from the free list rpl_parallel_thread::gco_free_list.
+
+ - The gco for the batch currently being queued is owned by
+ rpl_parallel_entry::current_gco. The gco for a previous batch that has
+ been fully queued is owned by the gco->prev_gco pointer of the gco for
+ the following batch.
+
+ - The worker thread waits on gco->COND_group_commit_orderer for
+ rpl_parallel_entry::count_committing_event_groups to reach wait_count
+ before starting; the first waiter links the gco into the next_gco
+ pointer of the gco of the previous batch for signalling.
+
+ - When an event group reaches the commit stage, it signals the
+ COND_group_commit_orderer if its gco->next_gco pointer is non-NULL and
+ rpl_parallel_entry::count_committing_event_groups has reached
+ gco->next_gco->wait_count.
+
+ - When gco->wait_count is reached for a worker and the wait completes,
+ the worker frees gco->prev_gco; at this point it is guaranteed not to
+ be needed any longer.
+*/
+struct group_commit_orderer {
+ /* Wakeup condition, used with rpl_parallel_entry::LOCK_parallel_entry. */
+ mysql_cond_t COND_group_commit_orderer;
+ uint64 wait_count;
+ group_commit_orderer *prev_gco;
+ group_commit_orderer *next_gco;
+ bool installed;
+};
+
+
struct rpl_parallel_thread {
bool delay_start;
bool running;
bool stop;
mysql_mutex_t LOCK_rpl_thread;
mysql_cond_t COND_rpl_thread;
+ mysql_cond_t COND_rpl_thread_queue;
struct rpl_parallel_thread *next; /* For free list. */
struct rpl_parallel_thread_pool *pool;
THD *thd;
- struct rpl_parallel_entry *current_entry;
+ /*
+    Who owns the thread, if any (this is a pointer into the owner's
+    rpl_parallel_entry::rpl_threads array).
+ */
+ struct rpl_parallel_thread **current_owner;
+ /* The rpl_parallel_entry of the owner. */
+ rpl_parallel_entry *current_entry;
struct queued_event {
queued_event *next;
Log_event *ev;
@@ -31,6 +81,9 @@ struct rpl_parallel_thread {
size_t event_size;
} *event_queue, *last_in_queue;
uint64 queued_size;
+ queued_event *qev_free_list;
+ rpl_group_info *rgi_free_list;
+ group_commit_orderer *gco_free_list;
void enqueue(queued_event *qev)
{
@@ -42,15 +95,25 @@ struct rpl_parallel_thread {
queued_size+= qev->event_size;
}
- void dequeue(queued_event *list)
+ void dequeue1(queued_event *list)
{
- queued_event *tmp;
-
DBUG_ASSERT(list == event_queue);
event_queue= last_in_queue= NULL;
- for (tmp= list; tmp; tmp= tmp->next)
- queued_size-= tmp->event_size;
}
+
+ void dequeue2(size_t dequeue_size)
+ {
+ queued_size-= dequeue_size;
+ }
+
+ queued_event *get_qev(Log_event *ev, ulonglong event_size,
+ Relay_log_info *rli);
+ void free_qev(queued_event *qev);
+ rpl_group_info *get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
+ rpl_parallel_entry *e);
+ void free_rgi(rpl_group_info *rgi);
+ group_commit_orderer *get_gco(uint64 wait_count, group_commit_orderer *prev);
+ void free_gco(group_commit_orderer *gco);
};
@@ -66,14 +129,16 @@ struct rpl_parallel_thread_pool {
rpl_parallel_thread_pool();
int init(uint32 size);
void destroy();
- struct rpl_parallel_thread *get_thread(rpl_parallel_entry *entry);
+ struct rpl_parallel_thread *get_thread(rpl_parallel_thread **owner,
+ rpl_parallel_entry *entry);
+ void release_thread(rpl_parallel_thread *rpt);
};
struct rpl_parallel_entry {
+ mysql_mutex_t LOCK_parallel_entry;
+ mysql_cond_t COND_parallel_entry;
uint32 domain_id;
- uint32 last_server_id;
- uint64 last_seq_no;
uint64 last_commit_id;
bool active;
/*
@@ -82,15 +147,41 @@ struct rpl_parallel_entry {
waiting for event groups to complete.
*/
bool force_abort;
+ /*
+ At STOP SLAVE (force_abort=true), we do not want to process all events in
+    the queue (which could unnecessarily delay the stop if many events happen
+    to be queued). The stop_count provides a safe point at which to stop, so
+    that everything before it gets committed and nothing after it does. The value
+ corresponds to group_commit_orderer::wait_count; if wait_count is less than
+ or equal to stop_count, we execute the associated event group, else we
+ skip it (and all following) and stop.
+ */
+ uint64 stop_count;
- rpl_parallel_thread *rpl_thread;
+ /*
+ Cyclic array recording the last rpl_thread_max worker threads that we
+    queued events for. This is used to limit how many workers a single domain
+ can occupy (--slave-domain-parallel-threads).
+
+ Note that workers are never explicitly deleted from the array. Instead,
+ we need to check (under LOCK_rpl_thread) that the thread still belongs
+    to us before re-using it (see rpl_parallel_thread::current_owner).
+ */
+ rpl_parallel_thread **rpl_threads;
+ uint32 rpl_thread_max;
+ uint32 rpl_thread_idx;
/*
The sub_id of the last transaction to commit within this domain_id.
Must be accessed under LOCK_parallel_entry protection.
+
+ Event groups commit in order, so the rpl_group_info for an event group
+ will be alive (at least) as long as
+    rpl_group_info::gtid_sub_id > last_committed_sub_id. This can be used to
+ safely refer back to previous event groups if they are still executing,
+ and ignore them if they completed, without requiring explicit
+ synchronisation between the threads.
*/
uint64 last_committed_sub_id;
- mysql_mutex_t LOCK_parallel_entry;
- mysql_cond_t COND_parallel_entry;
/*
The sub_id of the last event group in this replication domain that was
queued for execution by a worker thread.
@@ -98,14 +189,29 @@ struct rpl_parallel_entry {
uint64 current_sub_id;
rpl_group_info *current_group_info;
/*
- The sub_id of the last event group in the previous batch of group-committed
- transactions.
-
- When we spawn parallel worker threads for the next group-committed batch,
- they first need to wait for this sub_id to be committed before it is safe
- to start executing them.
+ If we get an error in some event group, we set the sub_id of that event
+ group here. Then later event groups (with higher sub_id) can know not to
+ try to start (event groups that already started will be rolled back when
+    wait_for_prior_commit() returns an error).
+    The value is ULONGLONG_MAX when no error occurred.
+ */
+ uint64 stop_on_error_sub_id;
+ /* Total count of event groups queued so far. */
+ uint64 count_queued_event_groups;
+ /*
+ Count of event groups that have started (but not necessarily completed)
+ the commit phase. We use this to know when every event group in a previous
+ batch of master group commits have started committing on the slave, so
+ that it is safe to start executing the events in the following batch.
*/
- uint64 prev_groupcommit_sub_id;
+ uint64 count_committing_event_groups;
+ /* The group_commit_orderer object for the events currently being queued. */
+ group_commit_orderer *current_gco;
+
+ rpl_parallel_thread * choose_thread(Relay_log_info *rli, bool *did_enter_cond,
+ PSI_stage_info *old_stage, bool reuse);
+ group_commit_orderer *get_gco();
+ void free_gco(group_commit_orderer *gco);
};
struct rpl_parallel {
HASH domain_hash;
@@ -116,7 +222,7 @@ struct rpl_parallel {
~rpl_parallel();
void reset();
rpl_parallel_entry *find(uint32 domain_id);
- void wait_for_done();
+ void wait_for_done(THD *thd);
bool workers_idle();
bool do_event(rpl_group_info *serial_rgi, Log_event *ev,
ulonglong event_size);
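The group_commit_orderer machinery described above boils down to a counter and a condition wait per replication domain: each batch records how many event groups were queued before it (wait_count), and a worker may not start its group until the domain's count of groups that reached the commit phase has caught up with that number. The following is a simplified single-file model of the wait and signal sides; it uses one condition variable per domain and invented names, so it is an illustration of the ordering rule, not the server's implementation.

    #include <condition_variable>
    #include <cstdint>
    #include <cstdio>
    #include <mutex>

    struct domain_t {                        // models rpl_parallel_entry for one domain
      std::mutex lock;                       // LOCK_parallel_entry
      std::condition_variable cond;          // stands in for COND_group_commit_orderer
      uint64_t count_committing= 0;          // count_committing_event_groups
    };

    // Worker side: do not start this batch until every event group queued
    // before it (wait_count of them) has reached the commit phase.
    static void wait_for_prior_batch(domain_t &d, uint64_t wait_count)
    {
      std::unique_lock<std::mutex> lk(d.lock);
      d.cond.wait(lk, [&] { return d.count_committing >= wait_count; });
    }

    // Commit side: the equivalent of rpl_group_info::mark_start_commit().
    static void mark_start_commit(domain_t &d)
    {
      {
        std::lock_guard<std::mutex> lk(d.lock);
        ++d.count_committing;
      }
      d.cond.notify_all();                   // wake any batch whose wait_count is now satisfied
    }

    int main()
    {
      domain_t d;
      mark_start_commit(d);                  // one prior group reaches its commit phase
      wait_for_prior_batch(d, 1);            // the next batch needed 1 prior group, so it proceeds
      std::printf("batch may start\n");
      return 0;
    }

In the server each gco carries its own condition variable and mark_start_commit_inner() only broadcasts when gco->next_gco->wait_count has been reached, which avoids waking workers that still have to wait; the sketch collapses that into a single condition per domain for brevity.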
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index c9e8c79d5fc..399852744f8 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -1479,14 +1479,27 @@ end:
}
-rpl_group_info::rpl_group_info(Relay_log_info *rli_)
- : rli(rli_), thd(0), gtid_sub_id(0), wait_commit_sub_id(0),
- wait_commit_group_info(0), wait_start_sub_id(0), parallel_entry(0),
- deferred_events(NULL), m_annotate_event(0), tables_to_lock(0),
- tables_to_lock_count(0), trans_retries(0), last_event_start_time(0),
- is_parallel_exec(false), is_error(false),
- row_stmt_start_timestamp(0), long_find_row_note_printed(false)
+void
+rpl_group_info::reinit(Relay_log_info *rli)
+{
+ this->rli= rli;
+ tables_to_lock= NULL;
+ tables_to_lock_count= 0;
+ trans_retries= 0;
+ last_event_start_time= 0;
+ is_error= false;
+ row_stmt_start_timestamp= 0;
+ long_find_row_note_printed= false;
+ did_mark_start_commit= false;
+ commit_orderer.reinit();
+}
+
+rpl_group_info::rpl_group_info(Relay_log_info *rli)
+ : thd(0), gtid_sub_id(0), wait_commit_sub_id(0),
+ wait_commit_group_info(0), parallel_entry(0),
+ deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false)
{
+ reinit(rli);
bzero(&current_gtid, sizeof(current_gtid));
mysql_mutex_init(key_rpl_group_info_sleep_lock, &sleep_lock,
MY_MUTEX_INIT_FAST);
@@ -1710,4 +1723,40 @@ void rpl_group_info::slave_close_thread_tables(THD *thd)
}
+
+static void
+mark_start_commit_inner(rpl_parallel_entry *e, group_commit_orderer *gco)
+{
+ uint64 count= ++e->count_committing_event_groups;
+ if (gco->next_gco && gco->next_gco->wait_count == count)
+ mysql_cond_broadcast(&gco->next_gco->COND_group_commit_orderer);
+}
+
+
+void
+rpl_group_info::mark_start_commit_no_lock()
+{
+ if (did_mark_start_commit)
+ return;
+ mark_start_commit_inner(parallel_entry, gco);
+ did_mark_start_commit= true;
+}
+
+
+void
+rpl_group_info::mark_start_commit()
+{
+ rpl_parallel_entry *e;
+
+ if (did_mark_start_commit)
+ return;
+
+ e= this->parallel_entry;
+ mysql_mutex_lock(&e->LOCK_parallel_entry);
+ mark_start_commit_inner(e, gco);
+ mysql_mutex_unlock(&e->LOCK_parallel_entry);
+ did_mark_start_commit= true;
+}
+
+
#endif
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index 3f95849a926..0ba259b0efd 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -481,6 +481,7 @@ private:
struct rpl_group_info
{
+ rpl_group_info *next; /* For free list in rpl_parallel_thread */
Relay_log_info *rli;
THD *thd;
/*
@@ -510,14 +511,15 @@ struct rpl_group_info
uint64 wait_commit_sub_id;
rpl_group_info *wait_commit_group_info;
/*
- If non-zero, the event group must wait for this sub_id to be committed
- before the execution of the event group is allowed to start.
+ This holds a pointer to a struct that keeps track of the need to wait
+ for the previous batch of event groups to reach the commit stage, before
+ this batch can start to execute.
(When we execute in parallel the transactions that group committed
together on the master, we still need to wait for any prior transactions
- to have commtted).
+ to have reached the commit stage).
*/
- uint64 wait_start_sub_id;
+ group_commit_orderer *gco;
struct rpl_parallel_entry *parallel_entry;
@@ -567,18 +569,22 @@ struct rpl_group_info
char future_event_master_log_name[FN_REFLEN];
bool is_parallel_exec;
bool is_error;
+ /*
+    Set true when we have signalled that we reached the commit phase. Used
+    to avoid counting one event group twice.
+ */
+ bool did_mark_start_commit;
-private:
/*
Runtime state for printing a note when slave is taking
too long while processing a row event.
*/
time_t row_stmt_start_timestamp;
bool long_find_row_note_printed;
-public:
rpl_group_info(Relay_log_info *rli_);
~rpl_group_info();
+ void reinit(Relay_log_info *rli);
/*
     Returns true if the argument event resides in the container;
@@ -661,6 +667,8 @@ public:
void clear_tables_to_lock();
void cleanup_context(THD *, bool);
void slave_close_thread_tables(THD *);
+ void mark_start_commit_no_lock();
+ void mark_start_commit();
time_t get_row_stmt_start_timestamp()
{
diff --git a/sql/scheduler.h b/sql/scheduler.h
index 4e200e86d74..06c17c7b114 100644
--- a/sql/scheduler.h
+++ b/sql/scheduler.h
@@ -99,7 +99,8 @@ public:
void *data; /* scheduler-specific data structure */
};
-#if !defined(EMBEDDED_LIBRARY)
+#undef HAVE_POOL_OF_THREADS
+#if !defined(EMBEDDED_LIBRARY) && !defined(_AIX)
#define HAVE_POOL_OF_THREADS 1
void pool_of_threads_scheduler(scheduler_functions* func,
ulong *arg_max_connections,
diff --git a/sql/slave.cc b/sql/slave.cc
index f74f5ec6267..25480da79a1 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -331,7 +331,7 @@ run_slave_init_thread()
pthread_t th;
slave_init_thread_running= true;
- if (mysql_thread_create(key_thread_slave_init, &th, NULL,
+ if (mysql_thread_create(key_thread_slave_init, &th, &connection_attrib,
handle_slave_init, NULL))
{
sql_print_error("Failed to create thread while initialising slave");
@@ -4542,7 +4542,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
}
if (opt_slave_parallel_threads > 0)
- rli->parallel.wait_for_done();
+ rli->parallel.wait_for_done(thd);
/* Thread stopped. Print the current replication position to the log */
{
@@ -4568,7 +4568,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
get the correct position printed.)
*/
if (opt_slave_parallel_threads > 0)
- rli->parallel.wait_for_done();
+ rli->parallel.wait_for_done(thd);
/*
Some events set some playgrounds, which won't be cleared because thread
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index 2be1b895f39..8a4a7006c63 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -6118,14 +6118,23 @@ bool THD::rgi_have_temporary_tables()
}
+void
+wait_for_commit::reinit()
+{
+ subsequent_commits_list= NULL;
+ next_subsequent_commit= NULL;
+ waitee= NULL;
+ opaque_pointer= NULL;
+ wakeup_error= 0;
+ wakeup_subsequent_commits_running= false;
+}
+
+
wait_for_commit::wait_for_commit()
- : subsequent_commits_list(0), next_subsequent_commit(0), waitee(0),
- opaque_pointer(0),
- waiting_for_commit(false), wakeup_error(0),
- wakeup_subsequent_commits_running(false)
{
mysql_mutex_init(key_LOCK_wait_commit, &LOCK_wait_commit, MY_MUTEX_INIT_FAST);
mysql_cond_init(key_COND_wait_commit, &COND_wait_commit, 0);
+ reinit();
}
@@ -6173,7 +6182,7 @@ wait_for_commit::wakeup(int wakeup_error)
*/
mysql_mutex_lock(&LOCK_wait_commit);
- waiting_for_commit= false;
+ waitee= NULL;
this->wakeup_error= wakeup_error;
/*
Note that it is critical that the mysql_cond_signal() here is done while
@@ -6205,9 +6214,8 @@ wait_for_commit::wakeup(int wakeup_error)
void
wait_for_commit::register_wait_for_prior_commit(wait_for_commit *waitee)
{
- waiting_for_commit= true;
- wakeup_error= 0;
DBUG_ASSERT(!this->waitee /* No prior registration allowed */);
+ wakeup_error= 0;
this->waitee= waitee;
mysql_mutex_lock(&waitee->LOCK_wait_commit);
@@ -6217,7 +6225,7 @@ wait_for_commit::register_wait_for_prior_commit(wait_for_commit *waitee)
see comments on wakeup_subsequent_commits2() for details.
*/
if (waitee->wakeup_subsequent_commits_running)
- waiting_for_commit= false;
+ this->waitee= NULL;
else
{
/*
@@ -6247,9 +6255,9 @@ wait_for_commit::wait_for_prior_commit2(THD *thd)
thd->ENTER_COND(&COND_wait_commit, &LOCK_wait_commit,
&stage_waiting_for_prior_transaction_to_commit,
&old_stage);
- while (waiting_for_commit && !thd->check_killed())
+ while ((loc_waitee= this->waitee) && !thd->check_killed())
mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit);
- if (!waiting_for_commit)
+ if (!loc_waitee)
{
if (wakeup_error)
my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
@@ -6262,7 +6270,6 @@ wait_for_commit::wait_for_prior_commit2(THD *thd)
waiter as to whether we succeed or fail (eg. we may roll back but waitee
might attempt to commit both us and any subsequent commits waiting for us).
*/
- loc_waitee= this->waitee;
mysql_mutex_lock(&loc_waitee->LOCK_wait_commit);
if (loc_waitee->wakeup_subsequent_commits_running)
{
@@ -6271,21 +6278,29 @@ wait_for_commit::wait_for_prior_commit2(THD *thd)
do
{
mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit);
- } while (waiting_for_commit);
+ } while (this->waitee);
+ if (wakeup_error)
+ my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
goto end;
}
remove_from_list(&loc_waitee->subsequent_commits_list);
mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
+ this->waitee= NULL;
- DEBUG_SYNC(thd, "wait_for_prior_commit_killed");
wakeup_error= thd->killed_errno();
if (!wakeup_error)
wakeup_error= ER_QUERY_INTERRUPTED;
my_message(wakeup_error, ER(wakeup_error), MYF(0));
+ thd->EXIT_COND(&old_stage);
+ /*
+ Must do the DEBUG_SYNC() _after_ exit_cond(), as DEBUG_SYNC is not safe to
+ use within enter_cond/exit_cond.
+ */
+ DEBUG_SYNC(thd, "wait_for_prior_commit_killed");
+ return wakeup_error;
end:
thd->EXIT_COND(&old_stage);
- waitee= NULL;
return wakeup_error;
}
@@ -6368,10 +6383,11 @@ wait_for_commit::wakeup_subsequent_commits2(int wakeup_error)
void
wait_for_commit::unregister_wait_for_prior_commit2()
{
+ wait_for_commit *loc_waitee;
+
mysql_mutex_lock(&LOCK_wait_commit);
- if (waiting_for_commit)
+ if ((loc_waitee= this->waitee))
{
- wait_for_commit *loc_waitee= this->waitee;
mysql_mutex_lock(&loc_waitee->LOCK_wait_commit);
if (loc_waitee->wakeup_subsequent_commits_running)
{
@@ -6383,7 +6399,7 @@ wait_for_commit::unregister_wait_for_prior_commit2()
See comments on wakeup_subsequent_commits2() for more details.
*/
mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
- while (waiting_for_commit)
+ while (this->waitee)
mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit);
}
else
@@ -6391,10 +6407,10 @@ wait_for_commit::unregister_wait_for_prior_commit2()
/* Remove ourselves from the list in the waitee. */
remove_from_list(&loc_waitee->subsequent_commits_list);
mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
+ this->waitee= NULL;
}
}
mysql_mutex_unlock(&LOCK_wait_commit);
- this->waitee= NULL;
}
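The wait_for_commit changes above (and the matching header changes below) retire the separate waiting_for_commit flag: the waitee pointer itself now records whether a wait is registered; it is set at registration and cleared, under LOCK_wait_commit with a signal on COND_wait_commit, at wakeup. Here is a condensed sketch of that protocol; it deliberately ignores the subsequent_commits list, kill handling and the wakeup_subsequent_commits_running race, and the names/types are simplifications rather than the server's API.

    #include <condition_variable>
    #include <cstdio>
    #include <mutex>

    struct commit_waiter_t {                 // loosely models wait_for_commit
      std::mutex lock;                       // LOCK_wait_commit
      std::condition_variable cond;          // COND_wait_commit
      commit_waiter_t *waitee= nullptr;      // non-null while a wait is registered
      int wakeup_error= 0;

      void register_wait(commit_waiter_t *prior)   // register_wait_for_prior_commit()
      {
        std::lock_guard<std::mutex> lk(lock);
        wakeup_error= 0;
        waitee= prior;                       // one field records both "waiting" and "on whom"
      }

      void wakeup(int err)                   // called on the waiter when its waitee commits
      {
        {
          std::lock_guard<std::mutex> lk(lock);
          wakeup_error= err;
          waitee= nullptr;                   // clearing the pointer is the wakeup signal
        }
        cond.notify_one();
      }

      int wait_for_prior_commit()            // wait_for_prior_commit2(), minus kill handling
      {
        std::unique_lock<std::mutex> lk(lock);
        cond.wait(lk, [&] { return waitee == nullptr; });
        return wakeup_error;
      }
    };

    int main()
    {
      commit_waiter_t prior, me;
      me.register_wait(&prior);
      me.wakeup(0);                          // in the server, done on behalf of the prior commit
      std::printf("prior commit done: %d\n", me.wait_for_prior_commit());
      return 0;
    }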
diff --git a/sql/sql_class.h b/sql/sql_class.h
index defc31547d0..ef28d860b04 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -1656,8 +1656,8 @@ struct wait_for_commit
{
/*
The LOCK_wait_commit protects the fields subsequent_commits_list and
- wakeup_subsequent_commits_running (for a waitee), and the flag
- waiting_for_commit and associated COND_wait_commit (for a waiter).
+ wakeup_subsequent_commits_running (for a waitee), and the pointer
+    waitee and associated COND_wait_commit (for a waiter).
*/
mysql_mutex_t LOCK_wait_commit;
mysql_cond_t COND_wait_commit;
@@ -1665,7 +1665,13 @@ struct wait_for_commit
wait_for_commit *subsequent_commits_list;
/* Link field for entries in subsequent_commits_list. */
wait_for_commit *next_subsequent_commit;
- /* Our waitee, if we did register_wait_for_prior_commit(), else NULL. */
+ /*
+ Our waitee, if we did register_wait_for_prior_commit(), and were not
+ yet woken up. Else NULL.
+
+ When this is cleared for wakeup, the COND_wait_commit condition is
+ signalled.
+ */
wait_for_commit *waitee;
/*
Generic pointer for use by the transaction coordinator to optimise the
@@ -1676,12 +1682,6 @@ struct wait_for_commit
used by another transaction coordinator for similar purposes.
*/
void *opaque_pointer;
- /*
- The waiting_for_commit flag is cleared when a waiter has been woken
- up. The COND_wait_commit condition is signalled when this has been
- cleared.
- */
- bool waiting_for_commit;
/* The wakeup error code from the waitee. 0 means no error. */
int wakeup_error;
/*
@@ -1697,10 +1697,14 @@ struct wait_for_commit
Quick inline check, to avoid function call and locking in the common case
where no wakeup is registered, or a registered wait was already signalled.
*/
- if (waiting_for_commit)
+ if (waitee)
return wait_for_prior_commit2(thd);
else
+ {
+ if (wakeup_error)
+ my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
return wakeup_error;
+ }
}
void wakeup_subsequent_commits(int wakeup_error)
{
@@ -1721,7 +1725,7 @@ struct wait_for_commit
}
void unregister_wait_for_prior_commit()
{
- if (waiting_for_commit)
+ if (waitee)
unregister_wait_for_prior_commit2();
}
/*
@@ -1741,7 +1745,7 @@ struct wait_for_commit
}
next_ptr_ptr= &cur->next_subsequent_commit;
}
- waiting_for_commit= false;
+ waitee= NULL;
}
void wakeup(int wakeup_error);
@@ -1752,6 +1756,7 @@ struct wait_for_commit
wait_for_commit();
~wait_for_commit();
+ void reinit();
};
diff --git a/sql/sql_select.cc b/sql/sql_select.cc
index 37e9c642dbb..06a48d9157b 100644
--- a/sql/sql_select.cc
+++ b/sql/sql_select.cc
@@ -18800,7 +18800,16 @@ end_update(JOIN *join, JOIN_TAB *join_tab __attribute__((unused)),
for (group=table->group ; group ; group=group->next)
{
Item *item= *group->item;
- item->save_org_in_field(group->field);
+ if (group->fast_field_copier_setup != group->field)
+ {
+ DBUG_PRINT("info", ("new setup 0x%lx -> 0x%lx",
+ (ulong)group->fast_field_copier_setup,
+ (ulong)group->field));
+ group->fast_field_copier_setup= group->field;
+ group->fast_field_copier_func=
+ item->setup_fast_field_copier(group->field);
+ }
+ item->save_org_in_field(group->field, group->fast_field_copier_func);
/* Store in the used key if the field was 0 */
if (item->maybe_null)
group->buff[-1]= (char) group->field->is_null();
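The end_update() hunk above is a per-row memoization: the copier returned by setup_fast_field_copier() is computed once per GROUP BY element and reused until its target field changes, instead of redoing the setup for every row. The sketch below shows the caching idiom in isolation; the types and the plain_copy function are invented stand-ins, not the server's Item/Field API.

    #include <cstdio>

    typedef int (*copier_fn)(int *to, const int *from);   // stands in for fast_field_copier

    static int plain_copy(int *to, const int *from) { *to= *from; return 0; }

    struct group_elem_t {                 // models the new st_order fields
      int *field;                         // destination field for this GROUP BY element
      copier_fn copier_func;              // fast_field_copier_func
      int *copier_setup;                  // fast_field_copier_setup: field the func was built for
    };

    // Called once per row; the expensive "setup" only runs when the target field changes.
    static void save_in_field(group_elem_t &g, const int *src)
    {
      if (g.copier_setup != g.field)      // same check end_update() now does
      {
        g.copier_setup= g.field;
        g.copier_func= plain_copy;        // in the server: item->setup_fast_field_copier(field)
      }
      g.copier_func(g.field, src);        // item->save_org_in_field(field, cached copier)
    }

    int main()
    {
      int dst= 0, src= 42;
      group_elem_t g= { &dst, nullptr, nullptr };
      save_in_field(g, &src);             // first call performs the setup
      save_in_field(g, &src);             // later calls reuse the cached copier
      std::printf("%d\n", dst);
      return 0;
    }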
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
index a9684947ba4..20784ce9301 100644
--- a/sql/sys_vars.cc
+++ b/sql/sys_vars.cc
@@ -1768,6 +1768,49 @@ static Sys_var_ulong Sys_slave_parallel_threads(
ON_UPDATE(fix_slave_parallel_threads));
+static bool
+check_slave_domain_parallel_threads(sys_var *self, THD *thd, set_var *var)
+{
+ bool running;
+
+ mysql_mutex_lock(&LOCK_active_mi);
+ running= master_info_index->give_error_if_slave_running();
+ mysql_mutex_unlock(&LOCK_active_mi);
+ if (running)
+ return true;
+
+ return false;
+}
+
+static bool
+fix_slave_domain_parallel_threads(sys_var *self, THD *thd, enum_var_type type)
+{
+ bool running;
+
+ mysql_mutex_unlock(&LOCK_global_system_variables);
+ mysql_mutex_lock(&LOCK_active_mi);
+ running= master_info_index->give_error_if_slave_running();
+ mysql_mutex_unlock(&LOCK_active_mi);
+ mysql_mutex_lock(&LOCK_global_system_variables);
+
+ return running ? true : false;
+}
+
+
+static Sys_var_ulong Sys_slave_domain_parallel_threads(
+ "slave_domain_parallel_threads",
+ "Maximum number of parallel threads to use on slave for events in a "
+ "single replication domain. When using multiple domains, this can be "
+ "used to limit a single domain from grabbing all threads and thus "
+ "stalling other domains. The default of 0 means to allow a domain to "
+ "grab as many threads as it wants, up to the value of "
+ "slave_parallel_threads.",
+ GLOBAL_VAR(opt_slave_domain_parallel_threads), CMD_LINE(REQUIRED_ARG),
+ VALID_RANGE(0,16383), DEFAULT(0), BLOCK_SIZE(1), NO_MUTEX_GUARD,
+ NOT_IN_BINLOG, ON_CHECK(check_slave_domain_parallel_threads),
+ ON_UPDATE(fix_slave_domain_parallel_threads));
+
+
static Sys_var_ulong Sys_slave_parallel_max_queued(
"slave_parallel_max_queued",
"Limit on how much memory SQL threads should use per parallel "
diff --git a/sql/table.h b/sql/table.h
index e3290410657..e1e66898a6d 100644
--- a/sql/table.h
+++ b/sql/table.h
@@ -194,10 +194,20 @@ private:
/* Order clause list element */
+typedef int (*fast_field_copier)(Field *to, Field *from);
+
+
typedef struct st_order {
struct st_order *next;
Item **item; /* Point at item in select fields */
Item *item_ptr; /* Storage for initial item */
+ /*
+    Pointer to the function used to speed up copying this item into the
+    corresponding field of a temporary table.
+ */
+ fast_field_copier fast_field_copier_func;
+  /* Field for which the above copier function was set up */
+ Field *fast_field_copier_setup;
int counter; /* position in SELECT list, correct
only if counter_used is true*/
bool asc; /* true if ascending */
diff --git a/sql/threadpool_unix.cc b/sql/threadpool_unix.cc
index f0454cfedb0..68c032fb67b 100644
--- a/sql/threadpool_unix.cc
+++ b/sql/threadpool_unix.cc
@@ -19,6 +19,9 @@
#include <sql_class.h>
#include <my_pthread.h>
#include <scheduler.h>
+
+#ifdef HAVE_POOL_OF_THREADS
+
#include <sql_connect.h>
#include <mysqld.h>
#include <debug_sync.h>
@@ -1678,3 +1681,5 @@ static void print_pool_blocked_message(bool max_threads_reached)
msg_written= true;
}
}
+
+#endif /* HAVE_POOL_OF_THREADS */
diff --git a/storage/connect/os.h b/storage/connect/os.h
index e3d452bf7b8..8e94f4241bb 100644
--- a/storage/connect/os.h
+++ b/storage/connect/os.h
@@ -9,6 +9,12 @@ typedef off_t off64_t;
#define O_LARGEFILE 0
#endif
+#ifdef _AIX
+#ifndef O_LARGEFILE
+#define O_LARGEFILE 0
+#endif
+#endif
+
#if defined(WIN32)
typedef __int64 BIGINT;
#else // !WIN32
diff --git a/storage/oqgraph/ha_oqgraph.h b/storage/oqgraph/ha_oqgraph.h
index 2a998425c27..14490270031 100644
--- a/storage/oqgraph/ha_oqgraph.h
+++ b/storage/oqgraph/ha_oqgraph.h
@@ -115,6 +115,20 @@ public:
virtual const char *table_type() const { return hton_name(ht)->str; }
#endif
+ my_bool register_query_cache_table(THD *thd, char *table_key,
+ uint key_length,
+ qc_engine_callback
+ *engine_callback,
+ ulonglong *engine_data)
+ {
+ /*
+ Do not put data from OQGRAPH tables into query cache (because there
+ is no way to tell whether the data in the backing table has changed or
+ not)
+ */
+ return FALSE;
+ }
+
private:
int oqgraph_check_table_structure (TABLE *table_arg);
diff --git a/storage/oqgraph/mysql-test/oqgraph/regression_mdev5744.opt b/storage/oqgraph/mysql-test/oqgraph/regression_mdev5744.opt
new file mode 100644
index 00000000000..a4548161f9b
--- /dev/null
+++ b/storage/oqgraph/mysql-test/oqgraph/regression_mdev5744.opt
@@ -0,0 +1 @@
+--query_cache_type=ON
diff --git a/storage/oqgraph/mysql-test/oqgraph/regression_mdev5744.result b/storage/oqgraph/mysql-test/oqgraph/regression_mdev5744.result
new file mode 100644
index 00000000000..efe520e16a5
--- /dev/null
+++ b/storage/oqgraph/mysql-test/oqgraph/regression_mdev5744.result
@@ -0,0 +1,41 @@
+DROP TABLE IF EXISTS graph_base;
+DROP TABLE IF EXISTS graph;
+CREATE TABLE graph_base (
+from_id INT UNSIGNED NOT NULL,
+to_id INT UNSIGNED NOT NULL,
+PRIMARY KEY (from_id,to_id),
+INDEX (to_id)
+) ENGINE=MyISAM;
+CREATE TABLE graph (
+latch VARCHAR(32) NULL,
+origid BIGINT UNSIGNED NULL,
+destid BIGINT UNSIGNED NULL,
+weight DOUBLE NULL,
+seq BIGINT UNSIGNED NULL,
+linkid BIGINT UNSIGNED NULL,
+KEY (latch, origid, destid) USING HASH,
+KEY (latch, destid, origid) USING HASH
+) ENGINE=OQGRAPH DATA_TABLE='graph_base' ORIGID='from_id', DESTID='to_id';
+INSERT INTO graph_base(from_id, to_id) VALUES (1,2), (2,1);
+SET @query_cache_size.saved = @@query_cache_size;
+SET GLOBAL query_cache_size = 1024*1024;
+SELECT * FROM graph;
+latch origid destid weight seq linkid
+NULL 1 2 1 NULL NULL
+NULL 2 1 1 NULL NULL
+UPDATE graph_base SET to_id = 20 WHERE from_id = 1;
+SELECT * FROM graph;
+latch origid destid weight seq linkid
+NULL 1 20 1 NULL NULL
+NULL 2 1 1 NULL NULL
+SELECT SQL_NO_CACHE * FROM graph;
+latch origid destid weight seq linkid
+NULL 1 20 1 NULL NULL
+NULL 2 1 1 NULL NULL
+SET GLOBAL query_cache_size = 0;
+SELECT SQL_NO_CACHE * FROM graph;
+latch origid destid weight seq linkid
+NULL 1 20 1 NULL NULL
+NULL 2 1 1 NULL NULL
+DROP TABLE graph_base, graph;
+SET GLOBAL query_cache_size = @query_cache_size.saved;
diff --git a/storage/oqgraph/mysql-test/oqgraph/regression_mdev5744.test b/storage/oqgraph/mysql-test/oqgraph/regression_mdev5744.test
new file mode 100644
index 00000000000..9ba30ee2f76
--- /dev/null
+++ b/storage/oqgraph/mysql-test/oqgraph/regression_mdev5744.test
@@ -0,0 +1,48 @@
+# Regression test for https://mariadb.atlassian.net/browse/MDEV-5744
+# -- reproduces the bug where changes to the backing table data are not
+# reflected in a graph table due to query caching
+
+--disable_warnings
+DROP TABLE IF EXISTS graph_base;
+DROP TABLE IF EXISTS graph;
+--enable_warnings
+
+# Create the backing store
+CREATE TABLE graph_base (
+ from_id INT UNSIGNED NOT NULL,
+ to_id INT UNSIGNED NOT NULL,
+ PRIMARY KEY (from_id,to_id),
+ INDEX (to_id)
+ ) ENGINE=MyISAM;
+
+
+CREATE TABLE graph (
+ latch VARCHAR(32) NULL,
+ origid BIGINT UNSIGNED NULL,
+ destid BIGINT UNSIGNED NULL,
+ weight DOUBLE NULL,
+ seq BIGINT UNSIGNED NULL,
+ linkid BIGINT UNSIGNED NULL,
+ KEY (latch, origid, destid) USING HASH,
+ KEY (latch, destid, origid) USING HASH
+ ) ENGINE=OQGRAPH DATA_TABLE='graph_base' ORIGID='from_id', DESTID='to_id';
+
+
+INSERT INTO graph_base(from_id, to_id) VALUES (1,2), (2,1);
+
+SET @query_cache_size.saved = @@query_cache_size;
+SET GLOBAL query_cache_size = 1024*1024;
+
+SELECT * FROM graph;
+UPDATE graph_base SET to_id = 20 WHERE from_id = 1;
+
+SELECT * FROM graph;
+SELECT SQL_NO_CACHE * FROM graph;
+
+SET GLOBAL query_cache_size = 0;
+
+SELECT SQL_NO_CACHE * FROM graph;
+
+DROP TABLE graph_base, graph;
+SET GLOBAL query_cache_size = @query_cache_size.saved;
+
diff --git a/storage/perfschema/table_events_waits.cc b/storage/perfschema/table_events_waits.cc
index 8fb7ca91c44..35b90816713 100644
--- a/storage/perfschema/table_events_waits.cc
+++ b/storage/perfschema/table_events_waits.cc
@@ -315,11 +315,15 @@ int table_events_waits_common::make_socket_object_columns(volatile PFS_events_wa
uint port;
char port_str[128];
char ip_str[INET6_ADDRSTRLEN+1];
- uint ip_len= 0;
+ /*
+ Originally named "ip_len", but that conflicted with a macro on AIX,
+ so it was renamed to "ip_length".
+ */
+ uint ip_length= 0;
port_str[0]= ':';
/* Get the IP address and port number */
- ip_len= pfs_get_socket_address(ip_str, sizeof(ip_str), &port,
+ ip_length= pfs_get_socket_address(ip_str, sizeof(ip_str), &port,
&safe_socket->m_sock_addr,
safe_socket->m_addr_len);
@@ -327,15 +331,15 @@ int table_events_waits_common::make_socket_object_columns(volatile PFS_events_wa
int port_len= int10_to_str(port, (port_str+1), 10) - port_str + 1;
/* OBJECT NAME */
- m_row.m_object_name_length= ip_len + port_len;
+ m_row.m_object_name_length= ip_length + port_len;
if (unlikely((m_row.m_object_name_length == 0) ||
(m_row.m_object_name_length > sizeof(m_row.m_object_name))))
return 1;
char *name= m_row.m_object_name;
- memcpy(name, ip_str, ip_len);
- memcpy(name + ip_len, port_str, port_len);
+ memcpy(name, ip_str, ip_length);
+ memcpy(name + ip_length, port_str, port_len);
}
else
{
diff --git a/storage/xtradb/CMakeLists.txt b/storage/xtradb/CMakeLists.txt
index 0c63afd744e..0aa177c6657 100644
--- a/storage/xtradb/CMakeLists.txt
+++ b/storage/xtradb/CMakeLists.txt
@@ -237,7 +237,7 @@ ENDIF()
IF(MSVC)
ADD_DEFINITIONS(-DHAVE_WINDOWS_ATOMICS)
- #SET(XTRADB_OK 1)
+ SET(XTRADB_OK 1)
# Avoid "unreferenced label" warning in generated file
GET_FILENAME_COMPONENT(_SRC_DIR ${CMAKE_CURRENT_LIST_FILE} PATH)
diff --git a/storage/xtradb/include/sync0sync.h b/storage/xtradb/include/sync0sync.h
index 03914741f96..19cfcddd1f5 100644
--- a/storage/xtradb/include/sync0sync.h
+++ b/storage/xtradb/include/sync0sync.h
@@ -199,10 +199,10 @@ necessary only if the memory block containing it is freed. */
pfs_mutex_enter_nowait_func((M), __FILE__, __LINE__)
# define mutex_enter_first(M) \
- pfs_mutex_enter_func((M), __FILE__, __LINE__, HIGH_PRIO)
+ pfs_mutex_enter_func((M), __FILE__, __LINE__, IB_HIGH_PRIO)
# define mutex_enter_last(M) \
- pfs_mutex_enter_func((M), __FILE__, __LINE__, LOW_PRIO)
+ pfs_mutex_enter_func((M), __FILE__, __LINE__, IB_LOW_PRIO)
# define mutex_exit(M) pfs_mutex_exit_func(M)
@@ -231,10 +231,10 @@ original non-instrumented functions */
mutex_enter_nowait_func((M), __FILE__, __LINE__)
# define mutex_enter_first(M) \
- mutex_enter_func((M), __FILE__, __LINE__, HIGH_PRIO)
+ mutex_enter_func((M), __FILE__, __LINE__, IB_HIGH_PRIO)
# define mutex_enter_last(M) \
- mutex_enter_func((M), __FILE__, __LINE__, LOW_PRIO)
+ mutex_enter_func((M), __FILE__, __LINE__, IB_LOW_PRIO)
# define mutex_exit(M) mutex_exit_func(M)
@@ -326,8 +326,8 @@ directly. Locks a priority mutex for the current thread. If the mutex is
reserved the function spins a preset time (controlled by SYNC_SPIN_ROUNDS)
waiting for the mutex before suspending the thread. If the thread is suspended,
the priority argument value determines the relative order for its wake up. Any
-HIGH_PRIO waiters will be woken up before any LOW_PRIO waiters. In case of
-DEFAULT_PRIO, the relative priority will be set according to
+IB_HIGH_PRIO waiters will be woken up before any IB_LOW_PRIO waiters. In case of
+IB_DEFAULT_PRIO, the relative priority will be set according to
srv_current_thread_priority. */
UNIV_INLINE
void
@@ -337,7 +337,7 @@ mutex_enter_func(
const char* file_name, /*!< in: file name where
locked */
ulint line, /*!< in: line where locked */
- enum ib_sync_priority priority = DEFAULT_PRIO);
+ enum ib_sync_priority priority = IB_DEFAULT_PRIO);
/*!<in: mutex acquisition
priority */
/********************************************************************//**
@@ -454,7 +454,7 @@ pfs_mutex_enter_func(
const char* file_name, /*!< in: file name where
locked */
ulint line, /*!< in: line where locked */
- enum ib_sync_priority priority = DEFAULT_PRIO);
+ enum ib_sync_priority priority = IB_DEFAULT_PRIO);
/*!<in: mutex acquisition
priority */
/********************************************************************//**
diff --git a/storage/xtradb/include/sync0sync.ic b/storage/xtradb/include/sync0sync.ic
index 4a1707654cb..d6a95156ff4 100644
--- a/storage/xtradb/include/sync0sync.ic
+++ b/storage/xtradb/include/sync0sync.ic
@@ -277,8 +277,8 @@ directly. Locks a priority mutex for the current thread. If the mutex is
reserved the function spins a preset time (controlled by SYNC_SPIN_ROUNDS)
waiting for the mutex before suspending the thread. If the thread is suspended,
the priority argument value determines the relative order for its wake up. Any
-HIGH_PRIO waiters will be woken up before any LOW_PRIO waiters. In case of
-DEFAULT_PRIO, the relative priority will be set according to
+IB_HIGH_PRIO waiters will be woken up before any IB_LOW_PRIO waiters. In case
+of IB_DEFAULT_PRIO, the relative priority will be set according to
srv_current_thread_priority. */
UNIV_INLINE
void
@@ -308,10 +308,10 @@ mutex_enter_func(
return; /* Succeeded! */
}
- if (UNIV_LIKELY(priority == DEFAULT_PRIO)) {
+ if (UNIV_LIKELY(priority == IB_DEFAULT_PRIO)) {
high_priority = srv_current_thread_priority;
} else {
- high_priority = (priority == HIGH_PRIO);
+ high_priority = (priority == IB_HIGH_PRIO);
}
mutex_spin_wait(mutex, high_priority, file_name, line);
}
diff --git a/storage/xtradb/include/sync0types.h b/storage/xtradb/include/sync0types.h
index 67f613ab8ae..04baaa0339d 100644
--- a/storage/xtradb/include/sync0types.h
+++ b/storage/xtradb/include/sync0types.h
@@ -36,9 +36,9 @@ struct ib_prio_mutex_t;
/** Priority mutex and rwlatch acquisition priorities */
enum ib_sync_priority {
- DEFAULT_PRIO,
- LOW_PRIO,
- HIGH_PRIO
+ IB_DEFAULT_PRIO,
+ IB_LOW_PRIO,
+ IB_HIGH_PRIO
};
#endif
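
The rename keeps the three-way semantics described in the comments above: IB_HIGH_PRIO waiters are woken before IB_LOW_PRIO ones, and IB_DEFAULT_PRIO defers to srv_current_thread_priority. A compilable toy version of just the priority-resolution branch shown in mutex_enter_func() (a plain bool stands in for srv_current_thread_priority here; no real locking is involved):

    #include <cassert>

    /* Same shape as the renamed enum in sync0types.h. */
    enum ib_sync_priority { IB_DEFAULT_PRIO, IB_LOW_PRIO, IB_HIGH_PRIO };

    static bool srv_current_thread_priority= false;   // stand-in global flag

    /*
      Mirrors the branch in mutex_enter_func(): explicit priorities win,
      IB_DEFAULT_PRIO falls back to the per-thread flag.
    */
    static bool resolve_high_priority(enum ib_sync_priority priority)
    {
      if (priority == IB_DEFAULT_PRIO)
        return srv_current_thread_priority;
      return priority == IB_HIGH_PRIO;
    }

    int main()
    {
      assert(!resolve_high_priority(IB_LOW_PRIO));
      assert(resolve_high_priority(IB_HIGH_PRIO));
      srv_current_thread_priority= true;
      assert(resolve_high_priority(IB_DEFAULT_PRIO));
      return 0;
    }
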