summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKristian Nielsen <knielsen@knielsen-hq.org>2015-03-04 13:48:28 +0100
committerKristian Nielsen <knielsen@knielsen-hq.org>2015-03-04 13:49:37 +0100
commit95d72088596c9c58c30ab87781061094309b4460 (patch)
tree52cbad4ea5bbe4c01c5e88a9efde3a05edd37107
parentf4f37533a09b9776e8d5ac3f3a27957f553c9043 (diff)
parent78c74dbe30d3a22feec5d069c7424d5a8a86ea4c (diff)
downloadmariadb-git-95d72088596c9c58c30ab87781061094309b4460.tar.gz
Merge MDEV-6589 and MDEV-6403 into 10.1.
Conflicts: sql/log.cc sql/rpl_rli.cc sql/sql_repl.cc
-rw-r--r--mysql-test/suite/rpl/r/rpl_gtid_misc.result25
-rw-r--r--mysql-test/suite/rpl/r/rpl_parallel_mdev6589.result147
-rw-r--r--mysql-test/suite/rpl/t/rpl_gtid_misc.test50
-rw-r--r--mysql-test/suite/rpl/t/rpl_parallel_mdev6589.test132
-rw-r--r--sql/log.cc2
-rw-r--r--sql/rpl_gtid.cc46
-rw-r--r--sql/rpl_gtid.h2
-rw-r--r--sql/rpl_parallel.cc98
-rw-r--r--sql/rpl_parallel.h1
-rw-r--r--sql/rpl_rli.cc71
-rw-r--r--sql/rpl_rli.h26
-rw-r--r--sql/slave.cc56
-rw-r--r--sql/sql_repl.cc24
13 files changed, 666 insertions, 14 deletions
diff --git a/mysql-test/suite/rpl/r/rpl_gtid_misc.result b/mysql-test/suite/rpl/r/rpl_gtid_misc.result
new file mode 100644
index 00000000000..cdaac1b1d34
--- /dev/null
+++ b/mysql-test/suite/rpl/r/rpl_gtid_misc.result
@@ -0,0 +1,25 @@
+include/master-slave.inc
+[connection master]
+*** MDEV-6403: Temporary tables lost at STOP SLAVE in GTID mode if master has not rotated binlog since restart ***
+CREATE TABLE t1 (a INT PRIMARY KEY);
+include/stop_slave.inc
+SET sql_log_bin= 0;
+INSERT INTO t1 VALUES (1);
+SET sql_log_bin= 1;
+CHANGE MASTER TO master_use_gtid= current_pos;
+CREATE TEMPORARY TABLE t2 LIKE t1;
+INSERT INTO t2 VALUE (1);
+INSERT INTO t1 SELECT * FROM t2;
+DROP TEMPORARY TABLE t2;
+START SLAVE;
+include/wait_for_slave_sql_error.inc [errno=1062]
+STOP SLAVE IO_THREAD;
+SET sql_log_bin= 0;
+DELETE FROM t1 WHERE a=1;
+SET sql_log_bin= 1;
+include/start_slave.inc
+SELECT * FROM t1 ORDER BY a;
+a
+1
+DROP TABLE t1;
+include/rpl_end.inc
diff --git a/mysql-test/suite/rpl/r/rpl_parallel_mdev6589.result b/mysql-test/suite/rpl/r/rpl_parallel_mdev6589.result
new file mode 100644
index 00000000000..d681fd7546d
--- /dev/null
+++ b/mysql-test/suite/rpl/r/rpl_parallel_mdev6589.result
@@ -0,0 +1,147 @@
+include/master-slave.inc
+[connection master]
+SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
+include/stop_slave.inc
+SET GLOBAL slave_parallel_threads=10;
+CHANGE MASTER TO master_use_gtid=current_pos;
+include/start_slave.inc
+*** MDEV-6589: Incorrect relay log start position when restarting SQL thread after error in parallel replication ***
+ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
+CREATE TABLE t1 (a int PRIMARY KEY) ENGINE=MyISAM;
+CREATE TABLE t2 (a int PRIMARY KEY) ENGINE=InnoDB;
+INSERT INTO t1 VALUES (1);
+INSERT INTO t2 VALUES (1);
+SELECT * FROM t1;
+a
+1
+SELECT * FROM t2;
+a
+1
+SET sql_log_bin=0;
+BEGIN;
+INSERT INTO t2 VALUES (5);
+SET gtid_domain_id=0;
+INSERT INTO t1 VALUES (2);
+INSERT INTO t2 VALUES (3);
+FLUSH LOGS;
+INSERT INTO t1 VALUES (4);
+SET gtid_domain_id=1;
+INSERT INTO t2 VALUES (5);
+SET gtid_domain_id=0;
+INSERT INTO t1 VALUES (6);
+INSERT INTO t1 VALUES (7);
+SET gtid_domain_id=2;
+INSERT INTO t2 VALUES (8);
+INSERT INTO t1 VALUES (9);
+FLUSH LOGS;
+SET gtid_domain_id=3;
+INSERT INTO t2 VALUES (10);
+INSERT INTO t1 VALUES (11);
+SET gtid_domain_id=1;
+INSERT INTO t1 VALUES (12);
+INSERT INTO t2 VALUES (13);
+SET gtid_domain_id=0;
+INSERT INTO t2 VALUES (14);
+FLUSH LOGS;
+SET gtid_domain_id=3;
+INSERT INTO t2 VALUES (15);
+SET gtid_domain_id=2;
+INSERT INTO t2 VALUES (16);
+SET gtid_domain_id=0;
+INSERT INTO t1 VALUES (17);
+SET @gtid0 = @@last_gtid;
+SET gtid_domain_id=2;
+INSERT INTO t1 VALUES (18);
+SET @gtid2 = @@last_gtid;
+SET gtid_domain_id=3;
+INSERT INTO t1 VALUES (19);
+SET @gtid3 = @@last_gtid;
+SELECT * FROM t1 ORDER BY a;
+a
+1
+2
+4
+6
+7
+9
+11
+12
+17
+18
+19
+SELECT * FROM t2 ORDER BY a;
+a
+1
+3
+5
+8
+10
+13
+14
+15
+16
+include/save_master_gtid.inc
+SELECT MASTER_GTID_WAIT('WAIT_POS');
+MASTER_GTID_WAIT('WAIT_POS')
+0
+COMMIT;
+SET sql_log_bin=1;
+include/wait_for_slave_sql_error.inc [errno=1062]
+SELECT * FROM t1 ORDER BY a;
+a
+1
+2
+4
+6
+7
+9
+11
+17
+18
+19
+SELECT * FROM t2 ORDER BY a;
+a
+1
+3
+5
+8
+10
+14
+15
+16
+SET sql_log_bin=0;
+DELETE FROM t2 WHERE a=5;
+SET sql_log_bin=1;
+include/start_slave.inc
+include/sync_with_master_gtid.inc
+SELECT * FROM t1 ORDER BY a;
+a
+1
+2
+4
+6
+7
+9
+11
+12
+17
+18
+19
+SELECT * FROM t2 ORDER BY a;
+a
+1
+3
+5
+8
+10
+13
+14
+15
+16
+include/stop_slave.inc
+SET GLOBAL slave_parallel_threads=@old_parallel_threads;
+include/start_slave.inc
+SET DEBUG_SYNC= 'RESET';
+DROP TABLE t1,t2;
+SET DEBUG_SYNC= 'RESET';
+include/rpl_end.inc
diff --git a/mysql-test/suite/rpl/t/rpl_gtid_misc.test b/mysql-test/suite/rpl/t/rpl_gtid_misc.test
new file mode 100644
index 00000000000..66d98ec8025
--- /dev/null
+++ b/mysql-test/suite/rpl/t/rpl_gtid_misc.test
@@ -0,0 +1,50 @@
+--source include/master-slave.inc
+
+--echo *** MDEV-6403: Temporary tables lost at STOP SLAVE in GTID mode if master has not rotated binlog since restart ***
+
+--connection master
+CREATE TABLE t1 (a INT PRIMARY KEY);
+--sync_slave_with_master
+
+--connection slave
+--source include/stop_slave.inc
+# Inject a duplicate key error that will make the slave stop in the middle of
+# a sequence of transactions that use a temporary table.
+SET sql_log_bin= 0;
+INSERT INTO t1 VALUES (1);
+SET sql_log_bin= 1;
+CHANGE MASTER TO master_use_gtid= current_pos;
+
+--connection master
+
+# Make some queries that use a temporary table.
+CREATE TEMPORARY TABLE t2 LIKE t1;
+INSERT INTO t2 VALUE (1);
+INSERT INTO t1 SELECT * FROM t2;
+DROP TEMPORARY TABLE t2;
+--save_master_pos
+
+--connection slave
+START SLAVE;
+--let $slave_sql_errno=1062
+--source include/wait_for_slave_sql_error.inc
+
+# Restart the slave.
+# The bug was that the IO thread would receive again the restart
+# format_description event at the start of the master's binlog, and this
+# event would cause the SQL thread to discard all active temporary tables.
+
+STOP SLAVE IO_THREAD;
+
+SET sql_log_bin= 0;
+DELETE FROM t1 WHERE a=1;
+SET sql_log_bin= 1;
+
+--source include/start_slave.inc
+--sync_with_master
+SELECT * FROM t1 ORDER BY a;
+
+--connection master
+DROP TABLE t1;
+
+--source include/rpl_end.inc
diff --git a/mysql-test/suite/rpl/t/rpl_parallel_mdev6589.test b/mysql-test/suite/rpl/t/rpl_parallel_mdev6589.test
new file mode 100644
index 00000000000..5929fad71df
--- /dev/null
+++ b/mysql-test/suite/rpl/t/rpl_parallel_mdev6589.test
@@ -0,0 +1,132 @@
+--source include/have_innodb.inc
+--source include/have_debug.inc
+--source include/have_debug_sync.inc
+--source include/master-slave.inc
+
+--connection server_2
+SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
+--source include/stop_slave.inc
+SET GLOBAL slave_parallel_threads=10;
+CHANGE MASTER TO master_use_gtid=current_pos;
+--source include/start_slave.inc
+
+
+--echo *** MDEV-6589: Incorrect relay log start position when restarting SQL thread after error in parallel replication ***
+
+--connection server_1
+ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
+CREATE TABLE t1 (a int PRIMARY KEY) ENGINE=MyISAM;
+CREATE TABLE t2 (a int PRIMARY KEY) ENGINE=InnoDB;
+INSERT INTO t1 VALUES (1);
+INSERT INTO t2 VALUES (1);
+--save_master_pos
+
+--connection server_2
+--sync_with_master
+SELECT * FROM t1;
+SELECT * FROM t2;
+
+# Block one domain, which we will later cause to give an error. And let some
+# other domains proceed so we can check that after restart, the slave is able
+# to correctly restart each domain in a separate position.
+
+--connect (con_temp1,127.0.0.1,root,,test,$SERVER_MYPORT_2,)
+SET sql_log_bin=0;
+BEGIN;
+INSERT INTO t2 VALUES (5);
+
+--connection server_1
+SET gtid_domain_id=0;
+INSERT INTO t1 VALUES (2);
+INSERT INTO t2 VALUES (3);
+FLUSH LOGS;
+INSERT INTO t1 VALUES (4);
+
+SET gtid_domain_id=1;
+# This query will be blocked on the slave, and later give a duplicate key error.
+INSERT INTO t2 VALUES (5);
+
+SET gtid_domain_id=0;
+INSERT INTO t1 VALUES (6);
+INSERT INTO t1 VALUES (7);
+
+SET gtid_domain_id=2;
+INSERT INTO t2 VALUES (8);
+INSERT INTO t1 VALUES (9);
+FLUSH LOGS;
+
+SET gtid_domain_id=3;
+INSERT INTO t2 VALUES (10);
+INSERT INTO t1 VALUES (11);
+
+# These cannot be replicated before the error, as a prior commit is blocked.
+SET gtid_domain_id=1;
+INSERT INTO t1 VALUES (12);
+INSERT INTO t2 VALUES (13);
+
+SET gtid_domain_id=0;
+INSERT INTO t2 VALUES (14);
+FLUSH LOGS;
+
+SET gtid_domain_id=3;
+INSERT INTO t2 VALUES (15);
+
+SET gtid_domain_id=2;
+INSERT INTO t2 VALUES (16);
+
+SET gtid_domain_id=0;
+INSERT INTO t1 VALUES (17);
+SET @gtid0 = @@last_gtid;
+SET gtid_domain_id=2;
+INSERT INTO t1 VALUES (18);
+SET @gtid2 = @@last_gtid;
+SET gtid_domain_id=3;
+INSERT INTO t1 VALUES (19);
+SET @gtid3 = @@last_gtid;
+--let $wait_pos= `SELECT CONCAT(@gtid0, ",", @gtid2, ",", @gtid3)`
+
+SELECT * FROM t1 ORDER BY a;
+SELECT * FROM t2 ORDER BY a;
+--source include/save_master_gtid.inc
+
+
+--connection server_2
+# First wait for domains 0, 2, and 3 to complete.
+--replace_result $wait_pos WAIT_POS
+eval SELECT MASTER_GTID_WAIT('$wait_pos');
+
+# Then release the row lock, and wait for the domain 1 to fail with
+# duplicate key error.
+--connection con_temp1
+COMMIT;
+SET sql_log_bin=1;
+
+--connection server_2
+--let $slave_sql_errno= 1062
+--source include/wait_for_slave_sql_error.inc
+
+SELECT * FROM t1 ORDER BY a;
+SELECT * FROM t2 ORDER BY a;
+
+SET sql_log_bin=0;
+DELETE FROM t2 WHERE a=5;
+SET sql_log_bin=1;
+--source include/start_slave.inc
+--source include/sync_with_master_gtid.inc
+
+SELECT * FROM t1 ORDER BY a;
+SELECT * FROM t2 ORDER BY a;
+
+
+# Clean up.
+--connection server_2
+--source include/stop_slave.inc
+SET GLOBAL slave_parallel_threads=@old_parallel_threads;
+--source include/start_slave.inc
+SET DEBUG_SYNC= 'RESET';
+
+--connection server_1
+DROP TABLE t1,t2;
+SET DEBUG_SYNC= 'RESET';
+
+--source include/rpl_end.inc
diff --git a/sql/log.cc b/sql/log.cc
index f94fe12a6f3..adb15f31198 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -4188,7 +4188,7 @@ int MYSQL_BIN_LOG::purge_first_log(Relay_log_info* rli, bool included)
included= 1;
to_purge_if_included= my_strdup(ir->name, MYF(0));
}
- my_free(ir);
+ rli->free_inuse_relaylog(ir);
ir= next;
}
rli->inuse_relaylog_list= ir;
diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc
index 8b339e0c34e..d5a672eb854 100644
--- a/sql/rpl_gtid.cc
+++ b/sql/rpl_gtid.cc
@@ -1164,6 +1164,27 @@ rpl_binlog_state::load(struct rpl_gtid *list, uint32 count)
}
+static int rpl_binlog_state_load_cb(rpl_gtid *gtid, void *data)
+{
+ rpl_binlog_state *self= (rpl_binlog_state *)data;
+ return self->update_nolock(gtid, false);
+}
+
+
+bool
+rpl_binlog_state::load(rpl_slave_state *slave_pos)
+{
+ bool res= false;
+
+ mysql_mutex_lock(&LOCK_binlog_state);
+ reset_nolock();
+ if (slave_pos->iterate(rpl_binlog_state_load_cb, this, NULL, 0))
+ res= true;
+ mysql_mutex_unlock(&LOCK_binlog_state);
+ return res;
+}
+
+
rpl_binlog_state::~rpl_binlog_state()
{
free();
@@ -1933,6 +1954,31 @@ slave_connection_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size)
/*
+ Check if the GTID position has been reached, for mysql_binlog_send().
+
+ The position has not been reached if we have anything in the state, unless
+ it has either the START_ON_EMPTY_DOMAIN flag set (which means it does not
+ belong to this master at all), or the START_OWN_SLAVE_POS (which means that
+ we start on an old position from when the server was a slave with
+ --log-slave-updates=0).
+*/
+bool
+slave_connection_state::is_pos_reached()
+{
+ uint32 i;
+
+ for (i= 0; i < hash.records; ++i)
+ {
+ entry *e= (entry *)my_hash_element(&hash, i);
+ if (!(e->flags & (START_OWN_SLAVE_POS|START_ON_EMPTY_DOMAIN)))
+ return false;
+ }
+
+ return true;
+}
+
+
+/*
Execute a MASTER_GTID_WAIT().
The position to wait for is in gtid_str in string form.
The timeout in microseconds is in timeout_us, zero means no timeout.
diff --git a/sql/rpl_gtid.h b/sql/rpl_gtid.h
index bd58d091a02..98f6c505dac 100644
--- a/sql/rpl_gtid.h
+++ b/sql/rpl_gtid.h
@@ -241,6 +241,7 @@ struct rpl_binlog_state
void reset();
void free();
bool load(struct rpl_gtid *list, uint32 count);
+ bool load(rpl_slave_state *slave_pos);
int update_nolock(const struct rpl_gtid *gtid, bool strict);
int update(const struct rpl_gtid *gtid, bool strict);
int update_with_next_gtid(uint32 domain_id, uint32 server_id,
@@ -296,6 +297,7 @@ struct slave_connection_state
int to_string(String *out_str);
int append_to_string(String *out_str);
int get_gtid_list(rpl_gtid *gtid_list, uint32 list_size);
+ bool is_pos_reached();
};
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 609f50952c0..756dafdf534 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -1894,6 +1894,41 @@ rpl_parallel::wait_for_workers_idle(THD *thd)
/*
+ Handle seeing a GTID during slave restart in GTID mode. If we stopped with
+ different replication domains having reached different positions in the relay
+ log, we need to skip event groups in domains that are further progressed.
+
+ Updates the state with the seen GTID, and returns true if this GTID should
+ be skipped, false otherwise.
+*/
+bool
+process_gtid_for_restart_pos(Relay_log_info *rli, rpl_gtid *gtid)
+{
+ slave_connection_state::entry *gtid_entry;
+ slave_connection_state *state= &rli->restart_gtid_pos;
+
+ if (likely(state->count() == 0) ||
+ !(gtid_entry= state->find_entry(gtid->domain_id)))
+ return false;
+ if (gtid->server_id == gtid_entry->gtid.server_id)
+ {
+ uint64 seq_no= gtid_entry->gtid.seq_no;
+ if (gtid->seq_no >= seq_no)
+ {
+ /*
+ This domain has reached its start position. So remove it, so that
+ further events will be processed normally.
+ */
+ state->remove(&gtid_entry->gtid);
+ }
+ return gtid->seq_no <= seq_no;
+ }
+ else
+ return true;
+}
+
+
+/*
This is used when we get an error during processing in do_event();
We will not queue any event to the thread, but we still need to wake it up
to be sure that it will be returned to the pool.
@@ -1955,13 +1990,15 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
return -1;
/* Execute pre-10.0 event, which have no GTID, in single-threaded mode. */
- if (unlikely(!current) && typ != GTID_EVENT)
+ is_group_event= Log_event::is_group_event(typ);
+ if (unlikely(!current) && typ != GTID_EVENT &&
+ !(unlikely(rli->gtid_skip_flag != GTID_SKIP_NOT) && is_group_event))
return -1;
/* ToDo: what to do with this lock?!? */
mysql_mutex_unlock(&rli->data_lock);
- if (typ == FORMAT_DESCRIPTION_EVENT)
+ if (unlikely(typ == FORMAT_DESCRIPTION_EVENT))
{
Format_description_log_event *fdev=
static_cast<Format_description_log_event *>(ev);
@@ -1987,6 +2024,19 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
}
}
}
+ else if (unlikely(typ == GTID_LIST_EVENT))
+ {
+ Gtid_list_log_event *glev= static_cast<Gtid_list_log_event *>(ev);
+ rpl_gtid *list= glev->list;
+ uint32 count= glev->count;
+ rli->update_relay_log_state(list, count);
+ while (count)
+ {
+ process_gtid_for_restart_pos(rli, list);
+ ++list;
+ --count;
+ }
+ }
/*
Stop queueing additional event groups once the SQL thread is requested to
@@ -1996,7 +2046,6 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
been partially queued, but after that we will just ignore any further
events the SQL driver thread may try to queue, and eventually it will stop.
*/
- is_group_event= Log_event::is_group_event(typ);
if ((typ == GTID_EVENT || !is_group_event) && rli->abort_slave)
sql_thread_stopping= true;
if (sql_thread_stopping)
@@ -2009,8 +2058,34 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
return 0;
}
+ if (unlikely(rli->gtid_skip_flag != GTID_SKIP_NOT) && is_group_event)
+ {
+ if (typ == GTID_EVENT)
+ rli->gtid_skip_flag= GTID_SKIP_NOT;
+ else
+ {
+ if (rli->gtid_skip_flag == GTID_SKIP_STANDALONE)
+ {
+ if (!Log_event::is_part_of_group(typ))
+ rli->gtid_skip_flag= GTID_SKIP_NOT;
+ }
+ else
+ {
+ DBUG_ASSERT(rli->gtid_skip_flag == GTID_SKIP_TRANSACTION);
+ if (typ == XID_EVENT ||
+ (typ == QUERY_EVENT &&
+ (((Query_log_event *)ev)->is_commit() ||
+ ((Query_log_event *)ev)->is_rollback())))
+ rli->gtid_skip_flag= GTID_SKIP_NOT;
+ }
+ delete_or_keep_event_post_apply(serial_rgi, typ, ev);
+ return 0;
+ }
+ }
+
if (typ == GTID_EVENT)
{
+ rpl_gtid gtid;
Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ||
rli->mi->parallel_mode <= SLAVE_PARALLEL_MINIMAL ?
@@ -2022,6 +2097,23 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
return 1;
}
current= e;
+
+ gtid.domain_id= gtid_ev->domain_id;
+ gtid.server_id= gtid_ev->server_id;
+ gtid.seq_no= gtid_ev->seq_no;
+ rli->update_relay_log_state(&gtid, 1);
+ if (process_gtid_for_restart_pos(rli, &gtid))
+ {
+ /*
+ This domain has progressed further into the relay log before the last
+ SQL thread restart. So we need to skip this event group to not doubly
+ apply it.
+ */
+ rli->gtid_skip_flag= ((gtid_ev->flags2 & Gtid_log_event::FL_STANDALONE) ?
+ GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
+ delete_or_keep_event_post_apply(serial_rgi, typ, ev);
+ return 0;
+ }
}
else
e= current;
diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h
index f1145d4cc4e..eebb8d136a9 100644
--- a/sql/rpl_parallel.h
+++ b/sql/rpl_parallel.h
@@ -339,5 +339,6 @@ extern struct rpl_parallel_thread_pool global_rpl_thread_pool;
extern int rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
uint32 new_count,
bool skip_check= false);
+extern bool process_gtid_for_restart_pos(Relay_log_info *rli, rpl_gtid *gtid);
#endif /* RPL_PARALLEL_H */
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index eb37b5c6cbc..d98d216d3f2 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -62,7 +62,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0),
last_master_timestamp(0), sql_thread_caught_up(true), slave_skip_counter(0),
abort_pos_wait(0), slave_run_id(0), sql_driver_thd(),
- inited(0), abort_slave(0), stop_for_until(0),
+ gtid_skip_flag(GTID_SKIP_NOT), inited(0), abort_slave(0), stop_for_until(0),
slave_running(0), until_condition(UNTIL_NONE),
until_log_pos(0), retried_trans(0), executed_entries(0),
m_flags(0)
@@ -100,17 +100,9 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
Relay_log_info::~Relay_log_info()
{
- inuse_relaylog *cur;
DBUG_ENTER("Relay_log_info::~Relay_log_info");
- cur= inuse_relaylog_list;
- while (cur)
- {
- DBUG_ASSERT(cur->queued_count == cur->dequeued_count);
- inuse_relaylog *next= cur->next;
- my_free(cur);
- cur= next;
- }
+ reset_inuse_relaylog();
mysql_mutex_destroy(&run_lock);
mysql_mutex_destroy(&data_lock);
mysql_mutex_destroy(&log_space_lock);
@@ -1383,14 +1375,34 @@ int
Relay_log_info::alloc_inuse_relaylog(const char *name)
{
inuse_relaylog *ir;
+ uint32 gtid_count;
+ rpl_gtid *gtid_list;
if (!(ir= (inuse_relaylog *)my_malloc(sizeof(*ir), MYF(MY_WME|MY_ZEROFILL))))
{
my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*ir));
return 1;
}
+ gtid_count= relay_log_state.count();
+ if (!(gtid_list= (rpl_gtid *)my_malloc(sizeof(*gtid_list)*gtid_count,
+ MYF(MY_WME))))
+ {
+ my_free(ir);
+ my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*gtid_list)*gtid_count);
+ return 1;
+ }
+ if (relay_log_state.get_gtid_list(gtid_list, gtid_count))
+ {
+ my_free(gtid_list);
+ my_free(ir);
+ DBUG_ASSERT(0 /* Should not be possible as we allocated correct length */);
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ return 1;
+ }
ir->rli= this;
strmake_buf(ir->name, name);
+ ir->relay_log_state= gtid_list;
+ ir->relay_log_state_count= gtid_count;
if (!inuse_relaylog_list)
inuse_relaylog_list= ir;
@@ -1405,6 +1417,45 @@ Relay_log_info::alloc_inuse_relaylog(const char *name)
}
+void
+Relay_log_info::free_inuse_relaylog(inuse_relaylog *ir)
+{
+ my_free(ir->relay_log_state);
+ my_atomic_rwlock_destroy(&ir->inuse_relaylog_atomic_lock);
+ my_free(ir);
+}
+
+
+void
+Relay_log_info::reset_inuse_relaylog()
+{
+ inuse_relaylog *cur= inuse_relaylog_list;
+ while (cur)
+ {
+ DBUG_ASSERT(cur->queued_count == cur->dequeued_count);
+ inuse_relaylog *next= cur->next;
+ free_inuse_relaylog(cur);
+ cur= next;
+ }
+ inuse_relaylog_list= last_inuse_relaylog= NULL;
+}
+
+
+int
+Relay_log_info::update_relay_log_state(rpl_gtid *gtid_list, uint32 count)
+{
+ int res= 0;
+ while (count)
+ {
+ if (relay_log_state.update_nolock(gtid_list, false))
+ res= 1;
+ ++gtid_list;
+ --count;
+ }
+ return res;
+}
+
+
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
int
rpl_load_gtid_slave_state(THD *thd)
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index 96db33206d9..56300c6ba14 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -269,6 +269,8 @@ public:
int events_till_abort;
#endif
+ enum_gtid_skip_type gtid_skip_flag;
+
/*
inited changes its value within LOCK_active_mi-guarded critical
sections at times of start_slave_threads() (0->1) and end_slave() (1->0).
@@ -344,6 +346,21 @@ public:
size_t slave_patternload_file_size;
rpl_parallel parallel;
+ /*
+ The relay_log_state keeps track of the current binlog state of the execution
+ of the relay log. This is used to know where to resume current GTID position
+ if the slave thread is stopped and restarted.
+ It is only accessed from the SQL thread, so it does not need any locking.
+ */
+ rpl_binlog_state relay_log_state;
+ /*
+ The restart_gtid_pos is used when the SQL thread restarts on a relay log
+ in GTID mode. In multi-domain parallel replication, each domain may have a
+ separate position, so some events in more progressed domains may need to be
+ skipped. This keeps track of the domains that have not yet reached their
+ starting event.
+ */
+ slave_connection_state restart_gtid_pos;
Relay_log_info(bool is_slave_recovery);
~Relay_log_info();
@@ -408,6 +425,9 @@ public:
time_t event_creation_time, THD *thd,
rpl_group_info *rgi);
int alloc_inuse_relaylog(const char *name);
+ void free_inuse_relaylog(inuse_relaylog *ir);
+ void reset_inuse_relaylog();
+ int update_relay_log_state(rpl_gtid *gtid_list, uint32 count);
/**
Is the replication inside a group?
@@ -497,6 +517,12 @@ private:
struct inuse_relaylog {
inuse_relaylog *next;
Relay_log_info *rli;
+ /*
+ relay_log_state holds the binlog state corresponding to the start of this
+ relay log file. It is an array with relay_log_state_count elements.
+ */
+ rpl_gtid *relay_log_state;
+ uint32 relay_log_state_count;
/* Number of events in this relay log queued for worker threads. */
int64 queued_count;
/* Number of events completed by worker threads. */
diff --git a/sql/slave.cc b/sql/slave.cc
index 6d64534faf9..d29f2161acf 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -943,6 +943,8 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start,
Master_info::USE_GTID_CURRENT_POS);
mi->events_queued_since_last_gtid= 0;
mi->gtid_reconnect_event_skip_count= 0;
+
+ mi->rli.restart_gtid_pos.reset();
}
if (!error && (thread_mask & SLAVE_IO))
@@ -4479,6 +4481,16 @@ pthread_handler_t handle_slave_sql(void *arg)
serial_rgi->gtid_sub_id= 0;
serial_rgi->gtid_pending= false;
+ if (mi->using_gtid != Master_info::USE_GTID_NO)
+ {
+ /*
+ We initialize the relay log state from the known starting position.
+ It will then be updated as required by GTID and GTID_LIST events found
+ while applying events read from relay logs.
+ */
+ rli->relay_log_state.load(&rpl_global_gtid_slave_state);
+ }
+ rli->gtid_skip_flag = GTID_SKIP_NOT;
if (init_relay_log_pos(rli,
rli->group_relay_log_name,
rli->group_relay_log_pos,
@@ -4489,6 +4501,7 @@ pthread_handler_t handle_slave_sql(void *arg)
"Error initializing relay log position: %s", errmsg);
goto err;
}
+ rli->reset_inuse_relaylog();
if (rli->alloc_inuse_relaylog(rli->group_relay_log_name))
goto err;
@@ -4705,7 +4718,49 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
thd->reset_query();
thd->reset_db(NULL, 0);
if (rli->mi->using_gtid != Master_info::USE_GTID_NO)
+ {
+ ulong domain_count;
+
flush_relay_log_info(rli);
+ if (opt_slave_parallel_threads > 0)
+ {
+ /*
+ In parallel replication GTID mode, we may stop with different domains
+ at different positions in the relay log.
+
+ To handle this when we restart the SQL thread, mark the current
+ per-domain position in the Relay_log_info.
+ */
+ mysql_mutex_lock(&rpl_global_gtid_slave_state.LOCK_slave_state);
+ domain_count= rpl_global_gtid_slave_state.count();
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
+ if (domain_count > 1)
+ {
+ inuse_relaylog *ir;
+
+ /*
+ Load the starting GTID position, so that we can skip already applied
+ GTIDs when we restart the SQL thread. And set the start position in
+ the relay log back to a known safe place to start (prior to any not
+ yet applied transaction in any domain).
+ */
+ rli->restart_gtid_pos.load(&rpl_global_gtid_slave_state, NULL, 0);
+ if ((ir= rli->inuse_relaylog_list))
+ {
+ rpl_gtid *gtid= ir->relay_log_state;
+ uint32 count= ir->relay_log_state_count;
+ while (count > 0)
+ {
+ process_gtid_for_restart_pos(rli, gtid);
+ ++gtid;
+ --count;
+ }
+ strmake_buf(rli->group_relay_log_name, ir->name);
+ rli->group_relay_log_pos= BIN_LOG_HEADER_SIZE;
+ }
+ }
+ }
+ }
THD_STAGE_INFO(thd, stage_waiting_for_slave_mutex_on_exit);
thd->add_status_to_global();
mysql_mutex_lock(&rli->run_lock);
@@ -4718,6 +4773,7 @@ err_during_init:
/* Forget the relay log's format */
delete rli->relay_log.description_event_for_exec;
rli->relay_log.description_event_for_exec= 0;
+ rli->reset_inuse_relaylog();
/* Wake up master_pos_wait() */
mysql_mutex_unlock(&rli->data_lock);
DBUG_PRINT("info",("Signaling possibly waiting master_pos_wait() functions"));
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index b44bd943f89..8e18d218d37 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -2320,6 +2320,30 @@ static int send_format_descriptor_event(binlog_send_info *info,
info->current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
fix_checksum(packet, ev_offset);
}
+ else if (info->using_gtid_state)
+ {
+ /*
+ If this event has the field `created' set, then it will cause the
+ slave to delete all active temporary tables. This must not happen
+ if the slave received any later GTIDs in a previous connect, as
+ those GTIDs might have created new temporary tables that are still
+ needed.
+
+ So here, we check if the starting GTID position was already
+ reached before this format description event. If not, we clear the
+ `created' flag to preserve temporary tables on the slave. (If the
+ slave connects at a position past this event, it means that it
+ already received and handled it in a previous connect).
+ */
+ if (!info->gtid_state.is_pos_reached())
+ {
+ int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
+ ST_CREATED_OFFSET+ev_offset, (ulong) 0);
+ if (info->current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
+ info->current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
+ fix_checksum(packet, ev_offset);
+ }
+ }
/* send it */
if (my_net_write(info->net, (uchar*) packet->ptr(), packet->length()))