From c47fe0e9db7f3f111618a6a6e488aaf8257b231a Mon Sep 17 00:00:00 2001 From: Kristian Nielsen Date: Mon, 9 Mar 2015 13:06:32 +0100 Subject: MDEV-7668: Intermediate master groups CREATE TEMPORARY with INSERT, causing parallel replication failure [This commit cherry-picked to be able to merge MDEV-7936, of which it is a pre-requisite, into both 10.0 and 10.1.] Parallel replication depends on locking (table locks, row locks, etc.) to prevent two conflicting transactions from running and committing in parallel. But temporary tables are designed to be visible only to one thread, and have no such locking. In the concrete issue, an intermediate master could commit a CREATE TEMPORARY TABLE in the same group commit as in INSERT into that table. Thus, a lower-level master could attempt to run them in parallel and get an error. More generally, we need protection from parallel replication trying to run transactions in parallel that access a common temporary table. This patch simply causes use of a temporary table from parallel replication to wait for all previous transactions to commit, serialising the replication at that point. (A more fine-grained locking could be added later, possibly. However, using temporary tables in statement-based replication is in any case normally undesirable; for example a restart of the server will lose temporary tables and can break replication). Note that row-based replication is not affected, as it does not do any temporary tables on the slave-side. This patch also cleans up the locking around protecting the list of temporary tables in Relay_log_info. This used to take the rli->data_lock at the end of every statement, which is very bad for concurrency. With this patch, the lock is not taken unless temporary tables (with statement-based binlogging) are in use on the slave. --- .../suite/rpl/r/rpl_parallel_multilevel2.result | 52 ++++++++++++++ .../suite/rpl/t/rpl_parallel_multilevel2.cnf | 17 +++++ .../suite/rpl/t/rpl_parallel_multilevel2.test | 80 ++++++++++++++++++++++ sql/sql_base.cc | 56 +++++++++++---- 4 files changed, 192 insertions(+), 13 deletions(-) create mode 100644 mysql-test/suite/rpl/r/rpl_parallel_multilevel2.result create mode 100644 mysql-test/suite/rpl/t/rpl_parallel_multilevel2.cnf create mode 100644 mysql-test/suite/rpl/t/rpl_parallel_multilevel2.test diff --git a/mysql-test/suite/rpl/r/rpl_parallel_multilevel2.result b/mysql-test/suite/rpl/r/rpl_parallel_multilevel2.result new file mode 100644 index 00000000000..47bf2ff887f --- /dev/null +++ b/mysql-test/suite/rpl/r/rpl_parallel_multilevel2.result @@ -0,0 +1,52 @@ +include/rpl_init.inc [topology=1->2->3] +*** MDEV-7668: Intermediate master groups CREATE with INSERT, causing parallel replication failure *** +SET @old_updates= @@GLOBAL.binlog_direct_non_transactional_updates; +SET GLOBAL binlog_direct_non_transactional_updates=OFF; +SET SESSION binlog_direct_non_transactional_updates=OFF; +ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB; +CREATE TABLE t1 (a int PRIMARY KEY, b INT) ENGINE=InnoDB; +include/stop_slave.inc +SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads; +SET GLOBAL slave_parallel_threads=10; +SET @old_commit_count=@@GLOBAL.binlog_commit_wait_count; +SET GLOBAL binlog_commit_wait_count=2; +SET @old_commit_usec=@@GLOBAL.binlog_commit_wait_usec; +SET GLOBAL binlog_commit_wait_usec=2000000; +SET @old_updates= @@GLOBAL.binlog_direct_non_transactional_updates; +SET GLOBAL binlog_direct_non_transactional_updates=OFF; +SET SESSION binlog_direct_non_transactional_updates=OFF; +CHANGE MASTER TO master_use_gtid=current_pos; +SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads; +include/stop_slave.inc +SET GLOBAL slave_parallel_threads=10; +CHANGE MASTER TO master_use_gtid=current_pos; +BEGIN; +CREATE TEMPORARY TABLE t2 (a INT PRIMARY KEY) ENGINE=MEMORY; +COMMIT; +INSERT INTO t2 VALUES (1); +INSERT INTO t1 SELECT a, a*10 FROM t2; +DROP TABLE t2; +include/save_master_gtid.inc +include/start_slave.inc +include/sync_with_master_gtid.inc +SELECT * FROM t1 ORDER BY a; +a b +1 10 +include/start_slave.inc +include/sync_with_master_gtid.inc +SELECT * FROM t1 ORDER BY a; +a b +1 10 +include/stop_slave.inc +SET GLOBAL slave_parallel_threads=@old_parallel_threads; +SET GLOBAL binlog_commit_wait_count=@old_commit_count; +SET GLOBAL binlog_commit_wait_usec=@old_commit_usec; +SET GLOBAL binlog_direct_non_transactional_updates= @old_updates; +include/start_slave.inc +include/stop_slave.inc +SET GLOBAL slave_parallel_threads=@old_parallel_threads; +include/start_slave.inc +SET GLOBAL binlog_direct_non_transactional_updates= @old_updates; +CALL mtr.add_suppression("Statement accesses nontransactional table as well as transactional or temporary table"); +DROP TABLE t1; +include/rpl_end.inc diff --git a/mysql-test/suite/rpl/t/rpl_parallel_multilevel2.cnf b/mysql-test/suite/rpl/t/rpl_parallel_multilevel2.cnf new file mode 100644 index 00000000000..4e1d3878f03 --- /dev/null +++ b/mysql-test/suite/rpl/t/rpl_parallel_multilevel2.cnf @@ -0,0 +1,17 @@ +!include ../my.cnf + +[mysqld.1] +log-slave-updates +loose-innodb + +[mysqld.2] +log-slave-updates +loose-innodb + +[mysqld.3] +log-slave-updates +loose-innodb + +[ENV] +SERVER_MYPORT_3= @mysqld.3.port +SERVER_MYSOCK_3= @mysqld.3.socket diff --git a/mysql-test/suite/rpl/t/rpl_parallel_multilevel2.test b/mysql-test/suite/rpl/t/rpl_parallel_multilevel2.test new file mode 100644 index 00000000000..4125394ef80 --- /dev/null +++ b/mysql-test/suite/rpl/t/rpl_parallel_multilevel2.test @@ -0,0 +1,80 @@ +--source include/have_innodb.inc +--let $rpl_topology=1->2->3 +--source include/rpl_init.inc + +--echo *** MDEV-7668: Intermediate master groups CREATE with INSERT, causing parallel replication failure *** + +--connection server_1 +SET @old_updates= @@GLOBAL.binlog_direct_non_transactional_updates; +SET GLOBAL binlog_direct_non_transactional_updates=OFF; +SET SESSION binlog_direct_non_transactional_updates=OFF; +ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB; +CREATE TABLE t1 (a int PRIMARY KEY, b INT) ENGINE=InnoDB; +--save_master_pos + +--connection server_2 +--sync_with_master +--save_master_pos +--source include/stop_slave.inc +SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads; +SET GLOBAL slave_parallel_threads=10; +SET @old_commit_count=@@GLOBAL.binlog_commit_wait_count; +SET GLOBAL binlog_commit_wait_count=2; +SET @old_commit_usec=@@GLOBAL.binlog_commit_wait_usec; +SET GLOBAL binlog_commit_wait_usec=2000000; +SET @old_updates= @@GLOBAL.binlog_direct_non_transactional_updates; +SET GLOBAL binlog_direct_non_transactional_updates=OFF; +SET SESSION binlog_direct_non_transactional_updates=OFF; +CHANGE MASTER TO master_use_gtid=current_pos; + +--connection server_3 +--sync_with_master +--save_master_pos +SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads; +--source include/stop_slave.inc +SET GLOBAL slave_parallel_threads=10; +CHANGE MASTER TO master_use_gtid=current_pos; + + +--connection server_1 + +BEGIN; +CREATE TEMPORARY TABLE t2 (a INT PRIMARY KEY) ENGINE=MEMORY; +COMMIT; +INSERT INTO t2 VALUES (1); +INSERT INTO t1 SELECT a, a*10 FROM t2; +DROP TABLE t2; +--source include/save_master_gtid.inc + +--connection server_2 +--source include/start_slave.inc +--source include/sync_with_master_gtid.inc +SELECT * FROM t1 ORDER BY a; + +--connection server_3 +--source include/start_slave.inc +--source include/sync_with_master_gtid.inc +SELECT * FROM t1 ORDER BY a; + + +# Clean up + +--connection server_2 +--source include/stop_slave.inc +SET GLOBAL slave_parallel_threads=@old_parallel_threads; +SET GLOBAL binlog_commit_wait_count=@old_commit_count; +SET GLOBAL binlog_commit_wait_usec=@old_commit_usec; +SET GLOBAL binlog_direct_non_transactional_updates= @old_updates; +--source include/start_slave.inc + +--connection server_3 +--source include/stop_slave.inc +SET GLOBAL slave_parallel_threads=@old_parallel_threads; +--source include/start_slave.inc + +--connection server_1 +SET GLOBAL binlog_direct_non_transactional_updates= @old_updates; +CALL mtr.add_suppression("Statement accesses nontransactional table as well as transactional or temporary table"); +DROP TABLE t1; + +--source include/rpl_end.inc diff --git a/sql/sql_base.cc b/sql/sql_base.cc index d4fb2a8b8bb..9aa9d1e0266 100644 --- a/sql/sql_base.cc +++ b/sql/sql_base.cc @@ -661,20 +661,23 @@ static void mark_temp_tables_as_free_for_reuse(THD *thd) DBUG_VOID_RETURN; } - thd->lock_temporary_tables(); - for (TABLE *table= thd->temporary_tables ; table ; table= table->next) + if (thd->temporary_tables) { - if ((table->query_id == thd->query_id) && ! table->open_by_handler) - mark_tmp_table_for_reuse(table); - } - thd->unlock_temporary_tables(); - if (thd->rgi_slave) - { - /* - Temporary tables are shared with other by sql execution threads. - As a safety messure, clear the pointer to the common area. - */ - thd->temporary_tables= 0; + thd->lock_temporary_tables(); + for (TABLE *table= thd->temporary_tables ; table ; table= table->next) + { + if ((table->query_id == thd->query_id) && ! table->open_by_handler) + mark_tmp_table_for_reuse(table); + } + thd->unlock_temporary_tables(); + if (thd->rgi_slave) + { + /* + Temporary tables are shared with other by sql execution threads. + As a safety messure, clear the pointer to the common area. + */ + thd->temporary_tables= 0; + } } DBUG_VOID_RETURN; } @@ -5586,6 +5589,14 @@ TABLE *open_table_uncached(THD *thd, handlerton *hton, (uint) thd->variables.server_id, (ulong) thd->variables.pseudo_thread_id)); + if (add_to_temporary_tables_list) + { + /* Temporary tables are not safe for parallel replication. */ + if (thd->rgi_slave && thd->rgi_slave->is_parallel_exec && + thd->wait_for_prior_commit()) + return NULL; + } + /* Create the cache_key for temporary tables */ key_length= create_tmp_table_def_key(thd, cache_key, db, table_name); @@ -5822,6 +5833,25 @@ bool open_temporary_table(THD *thd, TABLE_LIST *tl) DBUG_RETURN(FALSE); } + /* + Temporary tables are not safe for parallel replication. They were + designed to be visible to one thread only, so have no table locking. + Thus there is no protection against two conflicting transactions + committing in parallel and things like that. + + So for now, anything that uses temporary tables will be serialised + with anything before it, when using parallel replication. + + ToDo: We might be able to introduce a reference count or something + on temp tables, and have slave worker threads wait for it to reach + zero before being allowed to use the temp table. Might not be worth + it though, as statement-based replication using temporary tables is + in any case rather fragile. + */ + if (thd->rgi_slave && thd->rgi_slave->is_parallel_exec && + thd->wait_for_prior_commit()) + DBUG_RETURN(true); + #ifdef WITH_PARTITION_STORAGE_ENGINE if (tl->partition_names) { -- cgit v1.2.1 From 60d094aeacb0c1433f2510e0228b2791745ae53e Mon Sep 17 00:00:00 2001 From: Kristian Nielsen Date: Mon, 13 Apr 2015 09:52:56 +0200 Subject: MDEV-7936: Assertion `!table || table->in_use == _current_thd()' failed on parallel replication in optimistic mode Make sure that in parallel replication, we execute wait_for_prior_commit() before setting table->in_use for a temporary table. Otherwise we can end up with two parallel replication worker threads competing with each other for use of a temporary table. Re-factor the use of find_temporary_table() to be able to handle errors in the caller (as wait_for_prior_commit() can return error in case of deadlock kill). --- .../suite/rpl/r/rpl_parallel_temptable.result | 20 ++++ mysql-test/suite/rpl/t/rpl_parallel_temptable.test | 23 +++++ sql/sql_base.cc | 106 +++++++++++++-------- sql/sql_base.h | 4 + sql/sql_table.cc | 4 +- 5 files changed, 116 insertions(+), 41 deletions(-) diff --git a/mysql-test/suite/rpl/r/rpl_parallel_temptable.result b/mysql-test/suite/rpl/r/rpl_parallel_temptable.result index 61eba2cab2f..290a10f7e03 100644 --- a/mysql-test/suite/rpl/r/rpl_parallel_temptable.result +++ b/mysql-test/suite/rpl/r/rpl_parallel_temptable.result @@ -116,6 +116,26 @@ SHOW STATUS LIKE 'Slave_open_temp_tables'; Variable_name Value Slave_open_temp_tables 0 FLUSH LOGS; +*** MDEV-7936: Assertion `!table || table->in_use == _current_thd()' failed on parallel replication in optimistic mode *** +CREATE TEMPORARY TABLE t4 (a INT PRIMARY KEY) ENGINE=InnoDB; +SET @old_dbug= @@SESSION.debug_dbug; +SET SESSION debug_dbug="+d,binlog_force_commit_id"; +SET @commit_id= 10000; +INSERT INTO t4 VALUES (30); +INSERT INTO t4 VALUES (31); +SET SESSION debug_dbug= @old_dbug; +INSERT INTO t1 SELECT a, "conservative" FROM t4; +DROP TEMPORARY TABLE t4; +SELECT * FROM t1 WHERE a >= 30 ORDER BY a; +a b +30 conservative +31 conservative +include/save_master_pos.inc +include/sync_with_master_gtid.inc +SELECT * FROM t1 WHERE a >= 30 ORDER BY a; +a b +30 conservative +31 conservative include/stop_slave.inc SET GLOBAL slave_parallel_threads=@old_parallel_threads; include/start_slave.inc diff --git a/mysql-test/suite/rpl/t/rpl_parallel_temptable.test b/mysql-test/suite/rpl/t/rpl_parallel_temptable.test index b13fa5a01b1..5dffbf18498 100644 --- a/mysql-test/suite/rpl/t/rpl_parallel_temptable.test +++ b/mysql-test/suite/rpl/t/rpl_parallel_temptable.test @@ -213,6 +213,29 @@ SHOW STATUS LIKE 'Slave_open_temp_tables'; FLUSH LOGS; +--echo *** MDEV-7936: Assertion `!table || table->in_use == _current_thd()' failed on parallel replication in optimistic mode *** + +--connection server_1 +CREATE TEMPORARY TABLE t4 (a INT PRIMARY KEY) ENGINE=InnoDB; +SET @old_dbug= @@SESSION.debug_dbug; +SET SESSION debug_dbug="+d,binlog_force_commit_id"; +SET @commit_id= 10000; +INSERT INTO t4 VALUES (30); +INSERT INTO t4 VALUES (31); +SET SESSION debug_dbug= @old_dbug; +INSERT INTO t1 SELECT a, "conservative" FROM t4; +DROP TEMPORARY TABLE t4; +SELECT * FROM t1 WHERE a >= 30 ORDER BY a; +--source include/save_master_pos.inc + +--connection server_2 +--source include/sync_with_master_gtid.inc + +SELECT * FROM t1 WHERE a >= 30 ORDER BY a; + + +# Clean up. + --connection server_2 --source include/stop_slave.inc SET GLOBAL slave_parallel_threads=@old_parallel_threads; diff --git a/sql/sql_base.cc b/sql/sql_base.cc index 9aa9d1e0266..02880f1c621 100644 --- a/sql/sql_base.cc +++ b/sql/sql_base.cc @@ -1550,6 +1550,69 @@ TABLE *find_temporary_table(THD *thd, const TABLE_LIST *tl) } +static bool +use_temporary_table(THD *thd, TABLE *table, TABLE **out_table) +{ + *out_table= table; + if (!table) + return false; + /* + Temporary tables are not safe for parallel replication. They were + designed to be visible to one thread only, so have no table locking. + Thus there is no protection against two conflicting transactions + committing in parallel and things like that. + + So for now, anything that uses temporary tables will be serialised + with anything before it, when using parallel replication. + + ToDo: We might be able to introduce a reference count or something + on temp tables, and have slave worker threads wait for it to reach + zero before being allowed to use the temp table. Might not be worth + it though, as statement-based replication using temporary tables is + in any case rather fragile. + */ + if (thd->rgi_slave && thd->rgi_slave->is_parallel_exec && + thd->wait_for_prior_commit()) + return true; + /* + We need to set the THD as it may be different in case of + parallel replication + */ + if (table->in_use != thd) + { + table->in_use= thd; +#ifdef REMOVE_AFTER_MERGE_WITH_10 + if (thd->rgi_slave) + { + /* + We may be stealing an opened temporary tables from one slave + thread to another, we need to let the performance schema know that, + for aggregates per thread to work properly. + */ + table->file->unbind_psi(); + table->file->rebind_psi(); + } +#endif + } + return false; +} + +bool +find_and_use_temporary_table(THD *thd, const char *db, const char *table_name, + TABLE **out_table) +{ + return use_temporary_table(thd, find_temporary_table(thd, db, table_name), + out_table); +} + + +bool +find_and_use_temporary_table(THD *thd, const TABLE_LIST *tl, TABLE **out_table) +{ + return use_temporary_table(thd, find_temporary_table(thd, tl), out_table); +} + + /** Find a temporary table specified by a key in the THD::temporary_tables list. @@ -1570,26 +1633,6 @@ TABLE *find_temporary_table(THD *thd, if (table->s->table_cache_key.length == table_key_length && !memcmp(table->s->table_cache_key.str, table_key, table_key_length)) { - /* - We need to set the THD as it may be different in case of - parallel replication - */ - if (table->in_use != thd) - { - table->in_use= thd; -#ifdef REMOVE_AFTER_MERGE_WITH_10 - if (thd->rgi_slave) - { - /* - We may be stealing an opened temporary tables from one slave - thread to another, we need to let the performance schema know that, - for aggregates per thread to work properly. - */ - table->file->unbind_psi(); - table->file->rebind_psi(); - } -#endif - } result= table; break; } @@ -5822,7 +5865,9 @@ bool open_temporary_table(THD *thd, TABLE_LIST *tl) DBUG_RETURN(FALSE); } - if (!(table= find_temporary_table(thd, tl))) + if (find_and_use_temporary_table(thd, tl, &table)) + DBUG_RETURN(TRUE); + if (!table) { if (tl->open_type == OT_TEMPORARY_ONLY && tl->open_strategy == TABLE_LIST::OPEN_NORMAL) @@ -5833,25 +5878,6 @@ bool open_temporary_table(THD *thd, TABLE_LIST *tl) DBUG_RETURN(FALSE); } - /* - Temporary tables are not safe for parallel replication. They were - designed to be visible to one thread only, so have no table locking. - Thus there is no protection against two conflicting transactions - committing in parallel and things like that. - - So for now, anything that uses temporary tables will be serialised - with anything before it, when using parallel replication. - - ToDo: We might be able to introduce a reference count or something - on temp tables, and have slave worker threads wait for it to reach - zero before being allowed to use the temp table. Might not be worth - it though, as statement-based replication using temporary tables is - in any case rather fragile. - */ - if (thd->rgi_slave && thd->rgi_slave->is_parallel_exec && - thd->wait_for_prior_commit()) - DBUG_RETURN(true); - #ifdef WITH_PARTITION_STORAGE_ENGINE if (tl->partition_names) { diff --git a/sql/sql_base.h b/sql/sql_base.h index 8a0a1e42500..a6d90199860 100644 --- a/sql/sql_base.h +++ b/sql/sql_base.h @@ -148,7 +148,11 @@ TABLE_LIST *find_table_in_list(TABLE_LIST *table, const char *db_name, const char *table_name); TABLE *find_temporary_table(THD *thd, const char *db, const char *table_name); +bool find_and_use_temporary_table(THD *thd, const char *db, + const char *table_name, TABLE **out_table); TABLE *find_temporary_table(THD *thd, const TABLE_LIST *tl); +bool find_and_use_temporary_table(THD *thd, const TABLE_LIST *tl, + TABLE **out_table); TABLE *find_temporary_table(THD *thd, const char *table_key, uint table_key_length); void close_thread_tables(THD *thd); diff --git a/sql/sql_table.cc b/sql/sql_table.cc index 48aac779215..46591e8b5c1 100644 --- a/sql/sql_table.cc +++ b/sql/sql_table.cc @@ -4687,7 +4687,9 @@ int create_table_impl(THD *thd, if (create_info->tmp_table()) { TABLE *tmp_table; - if ((tmp_table= find_temporary_table(thd, db, table_name))) + if (find_and_use_temporary_table(thd, db, table_name, &tmp_table)) + goto err; + if (tmp_table) { bool table_creation_was_logged= tmp_table->s->table_creation_was_logged; if (create_info->options & HA_LEX_CREATE_REPLACE) -- cgit v1.2.1