summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKristian Nielsen <knielsen@knielsen-hq.org>2015-04-13 14:28:07 +0200
committerKristian Nielsen <knielsen@knielsen-hq.org>2015-04-13 14:28:07 +0200
commit2de8db6296fb5d5ea90f646b465b6afe33c32286 (patch)
treef0a149395b2ddc9a6c8d62ac0a0425dd692597a0
parent8a01a0acb332c36c8e1c63027fb9e65587ed2784 (diff)
parent60d094aeacb0c1433f2510e0228b2791745ae53e (diff)
downloadmariadb-git-2de8db6296fb5d5ea90f646b465b6afe33c32286.tar.gz
Merge MDEV-7936 into 10.1
-rw-r--r--mysql-test/suite/rpl/r/rpl_parallel_multilevel2.result52
-rw-r--r--mysql-test/suite/rpl/r/rpl_parallel_temptable.result20
-rw-r--r--mysql-test/suite/rpl/t/rpl_parallel_multilevel2.cnf17
-rw-r--r--mysql-test/suite/rpl/t/rpl_parallel_multilevel2.test80
-rw-r--r--mysql-test/suite/rpl/t/rpl_parallel_temptable.test23
-rw-r--r--sql/sql_base.cc124
-rw-r--r--sql/sql_base.h4
-rw-r--r--sql/sql_table.cc4
8 files changed, 289 insertions, 35 deletions
diff --git a/mysql-test/suite/rpl/r/rpl_parallel_multilevel2.result b/mysql-test/suite/rpl/r/rpl_parallel_multilevel2.result
new file mode 100644
index 00000000000..47bf2ff887f
--- /dev/null
+++ b/mysql-test/suite/rpl/r/rpl_parallel_multilevel2.result
@@ -0,0 +1,52 @@
+include/rpl_init.inc [topology=1->2->3]
+*** MDEV-7668: Intermediate master groups CREATE with INSERT, causing parallel replication failure ***
+SET @old_updates= @@GLOBAL.binlog_direct_non_transactional_updates;
+SET GLOBAL binlog_direct_non_transactional_updates=OFF;
+SET SESSION binlog_direct_non_transactional_updates=OFF;
+ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
+CREATE TABLE t1 (a int PRIMARY KEY, b INT) ENGINE=InnoDB;
+include/stop_slave.inc
+SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
+SET GLOBAL slave_parallel_threads=10;
+SET @old_commit_count=@@GLOBAL.binlog_commit_wait_count;
+SET GLOBAL binlog_commit_wait_count=2;
+SET @old_commit_usec=@@GLOBAL.binlog_commit_wait_usec;
+SET GLOBAL binlog_commit_wait_usec=2000000;
+SET @old_updates= @@GLOBAL.binlog_direct_non_transactional_updates;
+SET GLOBAL binlog_direct_non_transactional_updates=OFF;
+SET SESSION binlog_direct_non_transactional_updates=OFF;
+CHANGE MASTER TO master_use_gtid=current_pos;
+SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
+include/stop_slave.inc
+SET GLOBAL slave_parallel_threads=10;
+CHANGE MASTER TO master_use_gtid=current_pos;
+BEGIN;
+CREATE TEMPORARY TABLE t2 (a INT PRIMARY KEY) ENGINE=MEMORY;
+COMMIT;
+INSERT INTO t2 VALUES (1);
+INSERT INTO t1 SELECT a, a*10 FROM t2;
+DROP TABLE t2;
+include/save_master_gtid.inc
+include/start_slave.inc
+include/sync_with_master_gtid.inc
+SELECT * FROM t1 ORDER BY a;
+a b
+1 10
+include/start_slave.inc
+include/sync_with_master_gtid.inc
+SELECT * FROM t1 ORDER BY a;
+a b
+1 10
+include/stop_slave.inc
+SET GLOBAL slave_parallel_threads=@old_parallel_threads;
+SET GLOBAL binlog_commit_wait_count=@old_commit_count;
+SET GLOBAL binlog_commit_wait_usec=@old_commit_usec;
+SET GLOBAL binlog_direct_non_transactional_updates= @old_updates;
+include/start_slave.inc
+include/stop_slave.inc
+SET GLOBAL slave_parallel_threads=@old_parallel_threads;
+include/start_slave.inc
+SET GLOBAL binlog_direct_non_transactional_updates= @old_updates;
+CALL mtr.add_suppression("Statement accesses nontransactional table as well as transactional or temporary table");
+DROP TABLE t1;
+include/rpl_end.inc
diff --git a/mysql-test/suite/rpl/r/rpl_parallel_temptable.result b/mysql-test/suite/rpl/r/rpl_parallel_temptable.result
index 61eba2cab2f..290a10f7e03 100644
--- a/mysql-test/suite/rpl/r/rpl_parallel_temptable.result
+++ b/mysql-test/suite/rpl/r/rpl_parallel_temptable.result
@@ -116,6 +116,26 @@ SHOW STATUS LIKE 'Slave_open_temp_tables';
Variable_name Value
Slave_open_temp_tables 0
FLUSH LOGS;
+*** MDEV-7936: Assertion `!table || table->in_use == _current_thd()' failed on parallel replication in optimistic mode ***
+CREATE TEMPORARY TABLE t4 (a INT PRIMARY KEY) ENGINE=InnoDB;
+SET @old_dbug= @@SESSION.debug_dbug;
+SET SESSION debug_dbug="+d,binlog_force_commit_id";
+SET @commit_id= 10000;
+INSERT INTO t4 VALUES (30);
+INSERT INTO t4 VALUES (31);
+SET SESSION debug_dbug= @old_dbug;
+INSERT INTO t1 SELECT a, "conservative" FROM t4;
+DROP TEMPORARY TABLE t4;
+SELECT * FROM t1 WHERE a >= 30 ORDER BY a;
+a b
+30 conservative
+31 conservative
+include/save_master_pos.inc
+include/sync_with_master_gtid.inc
+SELECT * FROM t1 WHERE a >= 30 ORDER BY a;
+a b
+30 conservative
+31 conservative
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
include/start_slave.inc
diff --git a/mysql-test/suite/rpl/t/rpl_parallel_multilevel2.cnf b/mysql-test/suite/rpl/t/rpl_parallel_multilevel2.cnf
new file mode 100644
index 00000000000..4e1d3878f03
--- /dev/null
+++ b/mysql-test/suite/rpl/t/rpl_parallel_multilevel2.cnf
@@ -0,0 +1,17 @@
+!include ../my.cnf
+
+[mysqld.1]
+log-slave-updates
+loose-innodb
+
+[mysqld.2]
+log-slave-updates
+loose-innodb
+
+[mysqld.3]
+log-slave-updates
+loose-innodb
+
+[ENV]
+SERVER_MYPORT_3= @mysqld.3.port
+SERVER_MYSOCK_3= @mysqld.3.socket
diff --git a/mysql-test/suite/rpl/t/rpl_parallel_multilevel2.test b/mysql-test/suite/rpl/t/rpl_parallel_multilevel2.test
new file mode 100644
index 00000000000..4125394ef80
--- /dev/null
+++ b/mysql-test/suite/rpl/t/rpl_parallel_multilevel2.test
@@ -0,0 +1,80 @@
+--source include/have_innodb.inc
+--let $rpl_topology=1->2->3
+--source include/rpl_init.inc
+
+--echo *** MDEV-7668: Intermediate master groups CREATE with INSERT, causing parallel replication failure ***
+
+--connection server_1
+SET @old_updates= @@GLOBAL.binlog_direct_non_transactional_updates;
+SET GLOBAL binlog_direct_non_transactional_updates=OFF;
+SET SESSION binlog_direct_non_transactional_updates=OFF;
+ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
+CREATE TABLE t1 (a int PRIMARY KEY, b INT) ENGINE=InnoDB;
+--save_master_pos
+
+--connection server_2
+--sync_with_master
+--save_master_pos
+--source include/stop_slave.inc
+SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
+SET GLOBAL slave_parallel_threads=10;
+SET @old_commit_count=@@GLOBAL.binlog_commit_wait_count;
+SET GLOBAL binlog_commit_wait_count=2;
+SET @old_commit_usec=@@GLOBAL.binlog_commit_wait_usec;
+SET GLOBAL binlog_commit_wait_usec=2000000;
+SET @old_updates= @@GLOBAL.binlog_direct_non_transactional_updates;
+SET GLOBAL binlog_direct_non_transactional_updates=OFF;
+SET SESSION binlog_direct_non_transactional_updates=OFF;
+CHANGE MASTER TO master_use_gtid=current_pos;
+
+--connection server_3
+--sync_with_master
+--save_master_pos
+SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
+--source include/stop_slave.inc
+SET GLOBAL slave_parallel_threads=10;
+CHANGE MASTER TO master_use_gtid=current_pos;
+
+
+--connection server_1
+
+BEGIN;
+CREATE TEMPORARY TABLE t2 (a INT PRIMARY KEY) ENGINE=MEMORY;
+COMMIT;
+INSERT INTO t2 VALUES (1);
+INSERT INTO t1 SELECT a, a*10 FROM t2;
+DROP TABLE t2;
+--source include/save_master_gtid.inc
+
+--connection server_2
+--source include/start_slave.inc
+--source include/sync_with_master_gtid.inc
+SELECT * FROM t1 ORDER BY a;
+
+--connection server_3
+--source include/start_slave.inc
+--source include/sync_with_master_gtid.inc
+SELECT * FROM t1 ORDER BY a;
+
+
+# Clean up
+
+--connection server_2
+--source include/stop_slave.inc
+SET GLOBAL slave_parallel_threads=@old_parallel_threads;
+SET GLOBAL binlog_commit_wait_count=@old_commit_count;
+SET GLOBAL binlog_commit_wait_usec=@old_commit_usec;
+SET GLOBAL binlog_direct_non_transactional_updates= @old_updates;
+--source include/start_slave.inc
+
+--connection server_3
+--source include/stop_slave.inc
+SET GLOBAL slave_parallel_threads=@old_parallel_threads;
+--source include/start_slave.inc
+
+--connection server_1
+SET GLOBAL binlog_direct_non_transactional_updates= @old_updates;
+CALL mtr.add_suppression("Statement accesses nontransactional table as well as transactional or temporary table");
+DROP TABLE t1;
+
+--source include/rpl_end.inc
diff --git a/mysql-test/suite/rpl/t/rpl_parallel_temptable.test b/mysql-test/suite/rpl/t/rpl_parallel_temptable.test
index b13fa5a01b1..5dffbf18498 100644
--- a/mysql-test/suite/rpl/t/rpl_parallel_temptable.test
+++ b/mysql-test/suite/rpl/t/rpl_parallel_temptable.test
@@ -213,6 +213,29 @@ SHOW STATUS LIKE 'Slave_open_temp_tables';
FLUSH LOGS;
+--echo *** MDEV-7936: Assertion `!table || table->in_use == _current_thd()' failed on parallel replication in optimistic mode ***
+
+--connection server_1
+CREATE TEMPORARY TABLE t4 (a INT PRIMARY KEY) ENGINE=InnoDB;
+SET @old_dbug= @@SESSION.debug_dbug;
+SET SESSION debug_dbug="+d,binlog_force_commit_id";
+SET @commit_id= 10000;
+INSERT INTO t4 VALUES (30);
+INSERT INTO t4 VALUES (31);
+SET SESSION debug_dbug= @old_dbug;
+INSERT INTO t1 SELECT a, "conservative" FROM t4;
+DROP TEMPORARY TABLE t4;
+SELECT * FROM t1 WHERE a >= 30 ORDER BY a;
+--source include/save_master_pos.inc
+
+--connection server_2
+--source include/sync_with_master_gtid.inc
+
+SELECT * FROM t1 WHERE a >= 30 ORDER BY a;
+
+
+# Clean up.
+
--connection server_2
--source include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
diff --git a/sql/sql_base.cc b/sql/sql_base.cc
index b4c3e5ecafd..0e9f668722d 100644
--- a/sql/sql_base.cc
+++ b/sql/sql_base.cc
@@ -704,20 +704,23 @@ static void mark_temp_tables_as_free_for_reuse(THD *thd)
DBUG_VOID_RETURN;
}
- thd->lock_temporary_tables();
- for (TABLE *table= thd->temporary_tables ; table ; table= table->next)
- {
- if ((table->query_id == thd->query_id) && ! table->open_by_handler)
- mark_tmp_table_for_reuse(table);
- }
- thd->unlock_temporary_tables();
- if (thd->rgi_slave)
+ if (thd->temporary_tables)
{
- /*
- Temporary tables are shared with other by sql execution threads.
- As a safety messure, clear the pointer to the common area.
- */
- thd->temporary_tables= 0;
+ thd->lock_temporary_tables();
+ for (TABLE *table= thd->temporary_tables ; table ; table= table->next)
+ {
+ if ((table->query_id == thd->query_id) && ! table->open_by_handler)
+ mark_tmp_table_for_reuse(table);
+ }
+ thd->unlock_temporary_tables();
+ if (thd->rgi_slave)
+ {
+ /*
+ Temporary tables are shared with other by sql execution threads.
+ As a safety messure, clear the pointer to the common area.
+ */
+ thd->temporary_tables= 0;
+ }
}
DBUG_VOID_RETURN;
}
@@ -1590,6 +1593,69 @@ TABLE *find_temporary_table(THD *thd, const TABLE_LIST *tl)
}
+static bool
+use_temporary_table(THD *thd, TABLE *table, TABLE **out_table)
+{
+ *out_table= table;
+ if (!table)
+ return false;
+ /*
+ Temporary tables are not safe for parallel replication. They were
+ designed to be visible to one thread only, so have no table locking.
+ Thus there is no protection against two conflicting transactions
+ committing in parallel and things like that.
+
+ So for now, anything that uses temporary tables will be serialised
+ with anything before it, when using parallel replication.
+
+ ToDo: We might be able to introduce a reference count or something
+ on temp tables, and have slave worker threads wait for it to reach
+ zero before being allowed to use the temp table. Might not be worth
+ it though, as statement-based replication using temporary tables is
+ in any case rather fragile.
+ */
+ if (thd->rgi_slave && thd->rgi_slave->is_parallel_exec &&
+ thd->wait_for_prior_commit())
+ return true;
+ /*
+ We need to set the THD as it may be different in case of
+ parallel replication
+ */
+ if (table->in_use != thd)
+ {
+ table->in_use= thd;
+#ifdef REMOVE_AFTER_MERGE_WITH_10
+ if (thd->rgi_slave)
+ {
+ /*
+ We may be stealing an opened temporary tables from one slave
+ thread to another, we need to let the performance schema know that,
+ for aggregates per thread to work properly.
+ */
+ table->file->unbind_psi();
+ table->file->rebind_psi();
+ }
+#endif
+ }
+ return false;
+}
+
+bool
+find_and_use_temporary_table(THD *thd, const char *db, const char *table_name,
+ TABLE **out_table)
+{
+ return use_temporary_table(thd, find_temporary_table(thd, db, table_name),
+ out_table);
+}
+
+
+bool
+find_and_use_temporary_table(THD *thd, const TABLE_LIST *tl, TABLE **out_table)
+{
+ return use_temporary_table(thd, find_temporary_table(thd, tl), out_table);
+}
+
+
/**
Find a temporary table specified by a key in the THD::temporary_tables list.
@@ -1610,26 +1676,6 @@ TABLE *find_temporary_table(THD *thd,
if (table->s->table_cache_key.length == table_key_length &&
!memcmp(table->s->table_cache_key.str, table_key, table_key_length))
{
- /*
- We need to set the THD as it may be different in case of
- parallel replication
- */
- if (table->in_use != thd)
- {
- table->in_use= thd;
-#ifdef REMOVE_AFTER_MERGE_WITH_10
- if (thd->rgi_slave)
- {
- /*
- We may be stealing an opened temporary tables from one slave
- thread to another, we need to let the performance schema know that,
- for aggregates per thread to work properly.
- */
- table->file->unbind_psi();
- table->file->rebind_psi();
- }
-#endif
- }
result= table;
break;
}
@@ -5622,6 +5668,14 @@ TABLE *open_table_uncached(THD *thd, handlerton *hton,
(uint) thd->variables.server_id,
(ulong) thd->variables.pseudo_thread_id));
+ if (add_to_temporary_tables_list)
+ {
+ /* Temporary tables are not safe for parallel replication. */
+ if (thd->rgi_slave && thd->rgi_slave->is_parallel_exec &&
+ thd->wait_for_prior_commit())
+ return NULL;
+ }
+
/* Create the cache_key for temporary tables */
key_length= create_tmp_table_def_key(thd, cache_key, db, table_name);
@@ -5857,7 +5911,9 @@ bool open_temporary_table(THD *thd, TABLE_LIST *tl)
DBUG_RETURN(FALSE);
}
- if (!(table= find_temporary_table(thd, tl)))
+ if (find_and_use_temporary_table(thd, tl, &table))
+ DBUG_RETURN(TRUE);
+ if (!table)
{
if (tl->open_type == OT_TEMPORARY_ONLY &&
tl->open_strategy == TABLE_LIST::OPEN_NORMAL)
diff --git a/sql/sql_base.h b/sql/sql_base.h
index 0e492fa8cf7..876bc6e7ed7 100644
--- a/sql/sql_base.h
+++ b/sql/sql_base.h
@@ -143,7 +143,11 @@ TABLE_LIST *find_table_in_list(TABLE_LIST *table,
const char *db_name,
const char *table_name);
TABLE *find_temporary_table(THD *thd, const char *db, const char *table_name);
+bool find_and_use_temporary_table(THD *thd, const char *db,
+ const char *table_name, TABLE **out_table);
TABLE *find_temporary_table(THD *thd, const TABLE_LIST *tl);
+bool find_and_use_temporary_table(THD *thd, const TABLE_LIST *tl,
+ TABLE **out_table);
TABLE *find_temporary_table(THD *thd, const char *table_key,
uint table_key_length);
void close_thread_tables(THD *thd);
diff --git a/sql/sql_table.cc b/sql/sql_table.cc
index c5748459d6d..1f967934e44 100644
--- a/sql/sql_table.cc
+++ b/sql/sql_table.cc
@@ -4656,7 +4656,9 @@ int create_table_impl(THD *thd,
if (create_info->tmp_table())
{
TABLE *tmp_table;
- if ((tmp_table= find_temporary_table(thd, db, table_name)))
+ if (find_and_use_temporary_table(thd, db, table_name, &tmp_table))
+ goto err;
+ if (tmp_table)
{
bool table_creation_was_logged= tmp_table->s->table_creation_was_logged;
if (options.or_replace())