diff options
author | Kristian Nielsen <knielsen@knielsen-hq.org> | 2015-04-13 14:28:07 +0200 |
---|---|---|
committer | Kristian Nielsen <knielsen@knielsen-hq.org> | 2015-04-13 14:28:07 +0200 |
commit | 2de8db6296fb5d5ea90f646b465b6afe33c32286 (patch) | |
tree | f0a149395b2ddc9a6c8d62ac0a0425dd692597a0 | |
parent | 8a01a0acb332c36c8e1c63027fb9e65587ed2784 (diff) | |
parent | 60d094aeacb0c1433f2510e0228b2791745ae53e (diff) | |
download | mariadb-git-2de8db6296fb5d5ea90f646b465b6afe33c32286.tar.gz |
Merge MDEV-7936 into 10.1
-rw-r--r-- | mysql-test/suite/rpl/r/rpl_parallel_multilevel2.result | 52 | ||||
-rw-r--r-- | mysql-test/suite/rpl/r/rpl_parallel_temptable.result | 20 | ||||
-rw-r--r-- | mysql-test/suite/rpl/t/rpl_parallel_multilevel2.cnf | 17 | ||||
-rw-r--r-- | mysql-test/suite/rpl/t/rpl_parallel_multilevel2.test | 80 | ||||
-rw-r--r-- | mysql-test/suite/rpl/t/rpl_parallel_temptable.test | 23 | ||||
-rw-r--r-- | sql/sql_base.cc | 124 | ||||
-rw-r--r-- | sql/sql_base.h | 4 | ||||
-rw-r--r-- | sql/sql_table.cc | 4 |
8 files changed, 289 insertions, 35 deletions
diff --git a/mysql-test/suite/rpl/r/rpl_parallel_multilevel2.result b/mysql-test/suite/rpl/r/rpl_parallel_multilevel2.result new file mode 100644 index 00000000000..47bf2ff887f --- /dev/null +++ b/mysql-test/suite/rpl/r/rpl_parallel_multilevel2.result @@ -0,0 +1,52 @@ +include/rpl_init.inc [topology=1->2->3] +*** MDEV-7668: Intermediate master groups CREATE with INSERT, causing parallel replication failure *** +SET @old_updates= @@GLOBAL.binlog_direct_non_transactional_updates; +SET GLOBAL binlog_direct_non_transactional_updates=OFF; +SET SESSION binlog_direct_non_transactional_updates=OFF; +ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB; +CREATE TABLE t1 (a int PRIMARY KEY, b INT) ENGINE=InnoDB; +include/stop_slave.inc +SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads; +SET GLOBAL slave_parallel_threads=10; +SET @old_commit_count=@@GLOBAL.binlog_commit_wait_count; +SET GLOBAL binlog_commit_wait_count=2; +SET @old_commit_usec=@@GLOBAL.binlog_commit_wait_usec; +SET GLOBAL binlog_commit_wait_usec=2000000; +SET @old_updates= @@GLOBAL.binlog_direct_non_transactional_updates; +SET GLOBAL binlog_direct_non_transactional_updates=OFF; +SET SESSION binlog_direct_non_transactional_updates=OFF; +CHANGE MASTER TO master_use_gtid=current_pos; +SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads; +include/stop_slave.inc +SET GLOBAL slave_parallel_threads=10; +CHANGE MASTER TO master_use_gtid=current_pos; +BEGIN; +CREATE TEMPORARY TABLE t2 (a INT PRIMARY KEY) ENGINE=MEMORY; +COMMIT; +INSERT INTO t2 VALUES (1); +INSERT INTO t1 SELECT a, a*10 FROM t2; +DROP TABLE t2; +include/save_master_gtid.inc +include/start_slave.inc +include/sync_with_master_gtid.inc +SELECT * FROM t1 ORDER BY a; +a b +1 10 +include/start_slave.inc +include/sync_with_master_gtid.inc +SELECT * FROM t1 ORDER BY a; +a b +1 10 +include/stop_slave.inc +SET GLOBAL slave_parallel_threads=@old_parallel_threads; +SET GLOBAL binlog_commit_wait_count=@old_commit_count; +SET GLOBAL binlog_commit_wait_usec=@old_commit_usec; +SET GLOBAL binlog_direct_non_transactional_updates= @old_updates; +include/start_slave.inc +include/stop_slave.inc +SET GLOBAL slave_parallel_threads=@old_parallel_threads; +include/start_slave.inc +SET GLOBAL binlog_direct_non_transactional_updates= @old_updates; +CALL mtr.add_suppression("Statement accesses nontransactional table as well as transactional or temporary table"); +DROP TABLE t1; +include/rpl_end.inc diff --git a/mysql-test/suite/rpl/r/rpl_parallel_temptable.result b/mysql-test/suite/rpl/r/rpl_parallel_temptable.result index 61eba2cab2f..290a10f7e03 100644 --- a/mysql-test/suite/rpl/r/rpl_parallel_temptable.result +++ b/mysql-test/suite/rpl/r/rpl_parallel_temptable.result @@ -116,6 +116,26 @@ SHOW STATUS LIKE 'Slave_open_temp_tables'; Variable_name Value Slave_open_temp_tables 0 FLUSH LOGS; +*** MDEV-7936: Assertion `!table || table->in_use == _current_thd()' failed on parallel replication in optimistic mode *** +CREATE TEMPORARY TABLE t4 (a INT PRIMARY KEY) ENGINE=InnoDB; +SET @old_dbug= @@SESSION.debug_dbug; +SET SESSION debug_dbug="+d,binlog_force_commit_id"; +SET @commit_id= 10000; +INSERT INTO t4 VALUES (30); +INSERT INTO t4 VALUES (31); +SET SESSION debug_dbug= @old_dbug; +INSERT INTO t1 SELECT a, "conservative" FROM t4; +DROP TEMPORARY TABLE t4; +SELECT * FROM t1 WHERE a >= 30 ORDER BY a; +a b +30 conservative +31 conservative +include/save_master_pos.inc +include/sync_with_master_gtid.inc +SELECT * FROM t1 WHERE a >= 30 ORDER BY a; +a b +30 conservative +31 conservative include/stop_slave.inc SET GLOBAL slave_parallel_threads=@old_parallel_threads; include/start_slave.inc diff --git a/mysql-test/suite/rpl/t/rpl_parallel_multilevel2.cnf b/mysql-test/suite/rpl/t/rpl_parallel_multilevel2.cnf new file mode 100644 index 00000000000..4e1d3878f03 --- /dev/null +++ b/mysql-test/suite/rpl/t/rpl_parallel_multilevel2.cnf @@ -0,0 +1,17 @@ +!include ../my.cnf + +[mysqld.1] +log-slave-updates +loose-innodb + +[mysqld.2] +log-slave-updates +loose-innodb + +[mysqld.3] +log-slave-updates +loose-innodb + +[ENV] +SERVER_MYPORT_3= @mysqld.3.port +SERVER_MYSOCK_3= @mysqld.3.socket diff --git a/mysql-test/suite/rpl/t/rpl_parallel_multilevel2.test b/mysql-test/suite/rpl/t/rpl_parallel_multilevel2.test new file mode 100644 index 00000000000..4125394ef80 --- /dev/null +++ b/mysql-test/suite/rpl/t/rpl_parallel_multilevel2.test @@ -0,0 +1,80 @@ +--source include/have_innodb.inc +--let $rpl_topology=1->2->3 +--source include/rpl_init.inc + +--echo *** MDEV-7668: Intermediate master groups CREATE with INSERT, causing parallel replication failure *** + +--connection server_1 +SET @old_updates= @@GLOBAL.binlog_direct_non_transactional_updates; +SET GLOBAL binlog_direct_non_transactional_updates=OFF; +SET SESSION binlog_direct_non_transactional_updates=OFF; +ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB; +CREATE TABLE t1 (a int PRIMARY KEY, b INT) ENGINE=InnoDB; +--save_master_pos + +--connection server_2 +--sync_with_master +--save_master_pos +--source include/stop_slave.inc +SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads; +SET GLOBAL slave_parallel_threads=10; +SET @old_commit_count=@@GLOBAL.binlog_commit_wait_count; +SET GLOBAL binlog_commit_wait_count=2; +SET @old_commit_usec=@@GLOBAL.binlog_commit_wait_usec; +SET GLOBAL binlog_commit_wait_usec=2000000; +SET @old_updates= @@GLOBAL.binlog_direct_non_transactional_updates; +SET GLOBAL binlog_direct_non_transactional_updates=OFF; +SET SESSION binlog_direct_non_transactional_updates=OFF; +CHANGE MASTER TO master_use_gtid=current_pos; + +--connection server_3 +--sync_with_master +--save_master_pos +SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads; +--source include/stop_slave.inc +SET GLOBAL slave_parallel_threads=10; +CHANGE MASTER TO master_use_gtid=current_pos; + + +--connection server_1 + +BEGIN; +CREATE TEMPORARY TABLE t2 (a INT PRIMARY KEY) ENGINE=MEMORY; +COMMIT; +INSERT INTO t2 VALUES (1); +INSERT INTO t1 SELECT a, a*10 FROM t2; +DROP TABLE t2; +--source include/save_master_gtid.inc + +--connection server_2 +--source include/start_slave.inc +--source include/sync_with_master_gtid.inc +SELECT * FROM t1 ORDER BY a; + +--connection server_3 +--source include/start_slave.inc +--source include/sync_with_master_gtid.inc +SELECT * FROM t1 ORDER BY a; + + +# Clean up + +--connection server_2 +--source include/stop_slave.inc +SET GLOBAL slave_parallel_threads=@old_parallel_threads; +SET GLOBAL binlog_commit_wait_count=@old_commit_count; +SET GLOBAL binlog_commit_wait_usec=@old_commit_usec; +SET GLOBAL binlog_direct_non_transactional_updates= @old_updates; +--source include/start_slave.inc + +--connection server_3 +--source include/stop_slave.inc +SET GLOBAL slave_parallel_threads=@old_parallel_threads; +--source include/start_slave.inc + +--connection server_1 +SET GLOBAL binlog_direct_non_transactional_updates= @old_updates; +CALL mtr.add_suppression("Statement accesses nontransactional table as well as transactional or temporary table"); +DROP TABLE t1; + +--source include/rpl_end.inc diff --git a/mysql-test/suite/rpl/t/rpl_parallel_temptable.test b/mysql-test/suite/rpl/t/rpl_parallel_temptable.test index b13fa5a01b1..5dffbf18498 100644 --- a/mysql-test/suite/rpl/t/rpl_parallel_temptable.test +++ b/mysql-test/suite/rpl/t/rpl_parallel_temptable.test @@ -213,6 +213,29 @@ SHOW STATUS LIKE 'Slave_open_temp_tables'; FLUSH LOGS; +--echo *** MDEV-7936: Assertion `!table || table->in_use == _current_thd()' failed on parallel replication in optimistic mode *** + +--connection server_1 +CREATE TEMPORARY TABLE t4 (a INT PRIMARY KEY) ENGINE=InnoDB; +SET @old_dbug= @@SESSION.debug_dbug; +SET SESSION debug_dbug="+d,binlog_force_commit_id"; +SET @commit_id= 10000; +INSERT INTO t4 VALUES (30); +INSERT INTO t4 VALUES (31); +SET SESSION debug_dbug= @old_dbug; +INSERT INTO t1 SELECT a, "conservative" FROM t4; +DROP TEMPORARY TABLE t4; +SELECT * FROM t1 WHERE a >= 30 ORDER BY a; +--source include/save_master_pos.inc + +--connection server_2 +--source include/sync_with_master_gtid.inc + +SELECT * FROM t1 WHERE a >= 30 ORDER BY a; + + +# Clean up. + --connection server_2 --source include/stop_slave.inc SET GLOBAL slave_parallel_threads=@old_parallel_threads; diff --git a/sql/sql_base.cc b/sql/sql_base.cc index b4c3e5ecafd..0e9f668722d 100644 --- a/sql/sql_base.cc +++ b/sql/sql_base.cc @@ -704,20 +704,23 @@ static void mark_temp_tables_as_free_for_reuse(THD *thd) DBUG_VOID_RETURN; } - thd->lock_temporary_tables(); - for (TABLE *table= thd->temporary_tables ; table ; table= table->next) - { - if ((table->query_id == thd->query_id) && ! table->open_by_handler) - mark_tmp_table_for_reuse(table); - } - thd->unlock_temporary_tables(); - if (thd->rgi_slave) + if (thd->temporary_tables) { - /* - Temporary tables are shared with other by sql execution threads. - As a safety messure, clear the pointer to the common area. - */ - thd->temporary_tables= 0; + thd->lock_temporary_tables(); + for (TABLE *table= thd->temporary_tables ; table ; table= table->next) + { + if ((table->query_id == thd->query_id) && ! table->open_by_handler) + mark_tmp_table_for_reuse(table); + } + thd->unlock_temporary_tables(); + if (thd->rgi_slave) + { + /* + Temporary tables are shared with other by sql execution threads. + As a safety messure, clear the pointer to the common area. + */ + thd->temporary_tables= 0; + } } DBUG_VOID_RETURN; } @@ -1590,6 +1593,69 @@ TABLE *find_temporary_table(THD *thd, const TABLE_LIST *tl) } +static bool +use_temporary_table(THD *thd, TABLE *table, TABLE **out_table) +{ + *out_table= table; + if (!table) + return false; + /* + Temporary tables are not safe for parallel replication. They were + designed to be visible to one thread only, so have no table locking. + Thus there is no protection against two conflicting transactions + committing in parallel and things like that. + + So for now, anything that uses temporary tables will be serialised + with anything before it, when using parallel replication. + + ToDo: We might be able to introduce a reference count or something + on temp tables, and have slave worker threads wait for it to reach + zero before being allowed to use the temp table. Might not be worth + it though, as statement-based replication using temporary tables is + in any case rather fragile. + */ + if (thd->rgi_slave && thd->rgi_slave->is_parallel_exec && + thd->wait_for_prior_commit()) + return true; + /* + We need to set the THD as it may be different in case of + parallel replication + */ + if (table->in_use != thd) + { + table->in_use= thd; +#ifdef REMOVE_AFTER_MERGE_WITH_10 + if (thd->rgi_slave) + { + /* + We may be stealing an opened temporary tables from one slave + thread to another, we need to let the performance schema know that, + for aggregates per thread to work properly. + */ + table->file->unbind_psi(); + table->file->rebind_psi(); + } +#endif + } + return false; +} + +bool +find_and_use_temporary_table(THD *thd, const char *db, const char *table_name, + TABLE **out_table) +{ + return use_temporary_table(thd, find_temporary_table(thd, db, table_name), + out_table); +} + + +bool +find_and_use_temporary_table(THD *thd, const TABLE_LIST *tl, TABLE **out_table) +{ + return use_temporary_table(thd, find_temporary_table(thd, tl), out_table); +} + + /** Find a temporary table specified by a key in the THD::temporary_tables list. @@ -1610,26 +1676,6 @@ TABLE *find_temporary_table(THD *thd, if (table->s->table_cache_key.length == table_key_length && !memcmp(table->s->table_cache_key.str, table_key, table_key_length)) { - /* - We need to set the THD as it may be different in case of - parallel replication - */ - if (table->in_use != thd) - { - table->in_use= thd; -#ifdef REMOVE_AFTER_MERGE_WITH_10 - if (thd->rgi_slave) - { - /* - We may be stealing an opened temporary tables from one slave - thread to another, we need to let the performance schema know that, - for aggregates per thread to work properly. - */ - table->file->unbind_psi(); - table->file->rebind_psi(); - } -#endif - } result= table; break; } @@ -5622,6 +5668,14 @@ TABLE *open_table_uncached(THD *thd, handlerton *hton, (uint) thd->variables.server_id, (ulong) thd->variables.pseudo_thread_id)); + if (add_to_temporary_tables_list) + { + /* Temporary tables are not safe for parallel replication. */ + if (thd->rgi_slave && thd->rgi_slave->is_parallel_exec && + thd->wait_for_prior_commit()) + return NULL; + } + /* Create the cache_key for temporary tables */ key_length= create_tmp_table_def_key(thd, cache_key, db, table_name); @@ -5857,7 +5911,9 @@ bool open_temporary_table(THD *thd, TABLE_LIST *tl) DBUG_RETURN(FALSE); } - if (!(table= find_temporary_table(thd, tl))) + if (find_and_use_temporary_table(thd, tl, &table)) + DBUG_RETURN(TRUE); + if (!table) { if (tl->open_type == OT_TEMPORARY_ONLY && tl->open_strategy == TABLE_LIST::OPEN_NORMAL) diff --git a/sql/sql_base.h b/sql/sql_base.h index 0e492fa8cf7..876bc6e7ed7 100644 --- a/sql/sql_base.h +++ b/sql/sql_base.h @@ -143,7 +143,11 @@ TABLE_LIST *find_table_in_list(TABLE_LIST *table, const char *db_name, const char *table_name); TABLE *find_temporary_table(THD *thd, const char *db, const char *table_name); +bool find_and_use_temporary_table(THD *thd, const char *db, + const char *table_name, TABLE **out_table); TABLE *find_temporary_table(THD *thd, const TABLE_LIST *tl); +bool find_and_use_temporary_table(THD *thd, const TABLE_LIST *tl, + TABLE **out_table); TABLE *find_temporary_table(THD *thd, const char *table_key, uint table_key_length); void close_thread_tables(THD *thd); diff --git a/sql/sql_table.cc b/sql/sql_table.cc index c5748459d6d..1f967934e44 100644 --- a/sql/sql_table.cc +++ b/sql/sql_table.cc @@ -4656,7 +4656,9 @@ int create_table_impl(THD *thd, if (create_info->tmp_table()) { TABLE *tmp_table; - if ((tmp_table= find_temporary_table(thd, db, table_name))) + if (find_and_use_temporary_table(thd, db, table_name, &tmp_table)) + goto err; + if (tmp_table) { bool table_creation_was_logged= tmp_table->s->table_creation_was_logged; if (options.or_replace()) |