diff options
author | Kristian Nielsen <knielsen@knielsen-hq.org> | 2015-03-04 13:48:28 +0100 |
---|---|---|
committer | Kristian Nielsen <knielsen@knielsen-hq.org> | 2015-03-04 13:49:37 +0100 |
commit | 95d72088596c9c58c30ab87781061094309b4460 (patch) | |
tree | 52cbad4ea5bbe4c01c5e88a9efde3a05edd37107 | |
parent | f4f37533a09b9776e8d5ac3f3a27957f553c9043 (diff) | |
parent | 78c74dbe30d3a22feec5d069c7424d5a8a86ea4c (diff) | |
download | mariadb-git-95d72088596c9c58c30ab87781061094309b4460.tar.gz |
Merge MDEV-6589 and MDEV-6403 into 10.1.
Conflicts:
sql/log.cc
sql/rpl_rli.cc
sql/sql_repl.cc
-rw-r--r-- | mysql-test/suite/rpl/r/rpl_gtid_misc.result | 25 | ||||
-rw-r--r-- | mysql-test/suite/rpl/r/rpl_parallel_mdev6589.result | 147 | ||||
-rw-r--r-- | mysql-test/suite/rpl/t/rpl_gtid_misc.test | 50 | ||||
-rw-r--r-- | mysql-test/suite/rpl/t/rpl_parallel_mdev6589.test | 132 | ||||
-rw-r--r-- | sql/log.cc | 2 | ||||
-rw-r--r-- | sql/rpl_gtid.cc | 46 | ||||
-rw-r--r-- | sql/rpl_gtid.h | 2 | ||||
-rw-r--r-- | sql/rpl_parallel.cc | 98 | ||||
-rw-r--r-- | sql/rpl_parallel.h | 1 | ||||
-rw-r--r-- | sql/rpl_rli.cc | 71 | ||||
-rw-r--r-- | sql/rpl_rli.h | 26 | ||||
-rw-r--r-- | sql/slave.cc | 56 | ||||
-rw-r--r-- | sql/sql_repl.cc | 24 |
13 files changed, 666 insertions, 14 deletions
diff --git a/mysql-test/suite/rpl/r/rpl_gtid_misc.result b/mysql-test/suite/rpl/r/rpl_gtid_misc.result new file mode 100644 index 00000000000..cdaac1b1d34 --- /dev/null +++ b/mysql-test/suite/rpl/r/rpl_gtid_misc.result @@ -0,0 +1,25 @@ +include/master-slave.inc +[connection master] +*** MDEV-6403: Temporary tables lost at STOP SLAVE in GTID mode if master has not rotated binlog since restart *** +CREATE TABLE t1 (a INT PRIMARY KEY); +include/stop_slave.inc +SET sql_log_bin= 0; +INSERT INTO t1 VALUES (1); +SET sql_log_bin= 1; +CHANGE MASTER TO master_use_gtid= current_pos; +CREATE TEMPORARY TABLE t2 LIKE t1; +INSERT INTO t2 VALUE (1); +INSERT INTO t1 SELECT * FROM t2; +DROP TEMPORARY TABLE t2; +START SLAVE; +include/wait_for_slave_sql_error.inc [errno=1062] +STOP SLAVE IO_THREAD; +SET sql_log_bin= 0; +DELETE FROM t1 WHERE a=1; +SET sql_log_bin= 1; +include/start_slave.inc +SELECT * FROM t1 ORDER BY a; +a +1 +DROP TABLE t1; +include/rpl_end.inc diff --git a/mysql-test/suite/rpl/r/rpl_parallel_mdev6589.result b/mysql-test/suite/rpl/r/rpl_parallel_mdev6589.result new file mode 100644 index 00000000000..d681fd7546d --- /dev/null +++ b/mysql-test/suite/rpl/r/rpl_parallel_mdev6589.result @@ -0,0 +1,147 @@ +include/master-slave.inc +[connection master] +SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads; +include/stop_slave.inc +SET GLOBAL slave_parallel_threads=10; +CHANGE MASTER TO master_use_gtid=current_pos; +include/start_slave.inc +*** MDEV-6589: Incorrect relay log start position when restarting SQL thread after error in parallel replication *** +ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB; +CREATE TABLE t1 (a int PRIMARY KEY) ENGINE=MyISAM; +CREATE TABLE t2 (a int PRIMARY KEY) ENGINE=InnoDB; +INSERT INTO t1 VALUES (1); +INSERT INTO t2 VALUES (1); +SELECT * FROM t1; +a +1 +SELECT * FROM t2; +a +1 +SET sql_log_bin=0; +BEGIN; +INSERT INTO t2 VALUES (5); +SET gtid_domain_id=0; +INSERT INTO t1 VALUES (2); +INSERT INTO t2 VALUES (3); +FLUSH LOGS; +INSERT INTO 
t1 VALUES (4); +SET gtid_domain_id=1; +INSERT INTO t2 VALUES (5); +SET gtid_domain_id=0; +INSERT INTO t1 VALUES (6); +INSERT INTO t1 VALUES (7); +SET gtid_domain_id=2; +INSERT INTO t2 VALUES (8); +INSERT INTO t1 VALUES (9); +FLUSH LOGS; +SET gtid_domain_id=3; +INSERT INTO t2 VALUES (10); +INSERT INTO t1 VALUES (11); +SET gtid_domain_id=1; +INSERT INTO t1 VALUES (12); +INSERT INTO t2 VALUES (13); +SET gtid_domain_id=0; +INSERT INTO t2 VALUES (14); +FLUSH LOGS; +SET gtid_domain_id=3; +INSERT INTO t2 VALUES (15); +SET gtid_domain_id=2; +INSERT INTO t2 VALUES (16); +SET gtid_domain_id=0; +INSERT INTO t1 VALUES (17); +SET @gtid0 = @@last_gtid; +SET gtid_domain_id=2; +INSERT INTO t1 VALUES (18); +SET @gtid2 = @@last_gtid; +SET gtid_domain_id=3; +INSERT INTO t1 VALUES (19); +SET @gtid3 = @@last_gtid; +SELECT * FROM t1 ORDER BY a; +a +1 +2 +4 +6 +7 +9 +11 +12 +17 +18 +19 +SELECT * FROM t2 ORDER BY a; +a +1 +3 +5 +8 +10 +13 +14 +15 +16 +include/save_master_gtid.inc +SELECT MASTER_GTID_WAIT('WAIT_POS'); +MASTER_GTID_WAIT('WAIT_POS') +0 +COMMIT; +SET sql_log_bin=1; +include/wait_for_slave_sql_error.inc [errno=1062] +SELECT * FROM t1 ORDER BY a; +a +1 +2 +4 +6 +7 +9 +11 +17 +18 +19 +SELECT * FROM t2 ORDER BY a; +a +1 +3 +5 +8 +10 +14 +15 +16 +SET sql_log_bin=0; +DELETE FROM t2 WHERE a=5; +SET sql_log_bin=1; +include/start_slave.inc +include/sync_with_master_gtid.inc +SELECT * FROM t1 ORDER BY a; +a +1 +2 +4 +6 +7 +9 +11 +12 +17 +18 +19 +SELECT * FROM t2 ORDER BY a; +a +1 +3 +5 +8 +10 +13 +14 +15 +16 +include/stop_slave.inc +SET GLOBAL slave_parallel_threads=@old_parallel_threads; +include/start_slave.inc +SET DEBUG_SYNC= 'RESET'; +DROP TABLE t1,t2; +SET DEBUG_SYNC= 'RESET'; +include/rpl_end.inc diff --git a/mysql-test/suite/rpl/t/rpl_gtid_misc.test b/mysql-test/suite/rpl/t/rpl_gtid_misc.test new file mode 100644 index 00000000000..66d98ec8025 --- /dev/null +++ b/mysql-test/suite/rpl/t/rpl_gtid_misc.test @@ -0,0 +1,50 @@ +--source include/master-slave.inc + +--echo *** 
MDEV-6403: Temporary tables lost at STOP SLAVE in GTID mode if master has not rotated binlog since restart *** + +--connection master +CREATE TABLE t1 (a INT PRIMARY KEY); +--sync_slave_with_master + +--connection slave +--source include/stop_slave.inc +# Inject a duplicate key error that will make the slave stop in the middle of +# a sequence of transactions that use a temporary table. +SET sql_log_bin= 0; +INSERT INTO t1 VALUES (1); +SET sql_log_bin= 1; +CHANGE MASTER TO master_use_gtid= current_pos; + +--connection master + +# Make some queries that use a temporary table. +CREATE TEMPORARY TABLE t2 LIKE t1; +INSERT INTO t2 VALUE (1); +INSERT INTO t1 SELECT * FROM t2; +DROP TEMPORARY TABLE t2; +--save_master_pos + +--connection slave +START SLAVE; +--let $slave_sql_errno=1062 +--source include/wait_for_slave_sql_error.inc + +# Restart the slave. +# The bug was that the IO thread would receive again the restart +# format_description event at the start of the master's binlog, and this +# event would cause the SQL thread to discard all active temporary tables. 
+ +STOP SLAVE IO_THREAD; + +SET sql_log_bin= 0; +DELETE FROM t1 WHERE a=1; +SET sql_log_bin= 1; + +--source include/start_slave.inc +--sync_with_master +SELECT * FROM t1 ORDER BY a; + +--connection master +DROP TABLE t1; + +--source include/rpl_end.inc diff --git a/mysql-test/suite/rpl/t/rpl_parallel_mdev6589.test b/mysql-test/suite/rpl/t/rpl_parallel_mdev6589.test new file mode 100644 index 00000000000..5929fad71df --- /dev/null +++ b/mysql-test/suite/rpl/t/rpl_parallel_mdev6589.test @@ -0,0 +1,132 @@ +--source include/have_innodb.inc +--source include/have_debug.inc +--source include/have_debug_sync.inc +--source include/master-slave.inc + +--connection server_2 +SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads; +--source include/stop_slave.inc +SET GLOBAL slave_parallel_threads=10; +CHANGE MASTER TO master_use_gtid=current_pos; +--source include/start_slave.inc + + +--echo *** MDEV-6589: Incorrect relay log start position when restarting SQL thread after error in parallel replication *** + +--connection server_1 +ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB; +CREATE TABLE t1 (a int PRIMARY KEY) ENGINE=MyISAM; +CREATE TABLE t2 (a int PRIMARY KEY) ENGINE=InnoDB; +INSERT INTO t1 VALUES (1); +INSERT INTO t2 VALUES (1); +--save_master_pos + +--connection server_2 +--sync_with_master +SELECT * FROM t1; +SELECT * FROM t2; + +# Block one domain, which we will later cause to give an error. And let some +# other domains proceed so we can check that after restart, the slave is able +# to correctly restart each domain in a separate position. + +--connect (con_temp1,127.0.0.1,root,,test,$SERVER_MYPORT_2,) +SET sql_log_bin=0; +BEGIN; +INSERT INTO t2 VALUES (5); + +--connection server_1 +SET gtid_domain_id=0; +INSERT INTO t1 VALUES (2); +INSERT INTO t2 VALUES (3); +FLUSH LOGS; +INSERT INTO t1 VALUES (4); + +SET gtid_domain_id=1; +# This query will be blocked on the slave, and later give a duplicate key error. 
+INSERT INTO t2 VALUES (5); + +SET gtid_domain_id=0; +INSERT INTO t1 VALUES (6); +INSERT INTO t1 VALUES (7); + +SET gtid_domain_id=2; +INSERT INTO t2 VALUES (8); +INSERT INTO t1 VALUES (9); +FLUSH LOGS; + +SET gtid_domain_id=3; +INSERT INTO t2 VALUES (10); +INSERT INTO t1 VALUES (11); + +# These cannot be replicated before the error, as a prior commit is blocked. +SET gtid_domain_id=1; +INSERT INTO t1 VALUES (12); +INSERT INTO t2 VALUES (13); + +SET gtid_domain_id=0; +INSERT INTO t2 VALUES (14); +FLUSH LOGS; + +SET gtid_domain_id=3; +INSERT INTO t2 VALUES (15); + +SET gtid_domain_id=2; +INSERT INTO t2 VALUES (16); + +SET gtid_domain_id=0; +INSERT INTO t1 VALUES (17); +SET @gtid0 = @@last_gtid; +SET gtid_domain_id=2; +INSERT INTO t1 VALUES (18); +SET @gtid2 = @@last_gtid; +SET gtid_domain_id=3; +INSERT INTO t1 VALUES (19); +SET @gtid3 = @@last_gtid; +--let $wait_pos= `SELECT CONCAT(@gtid0, ",", @gtid2, ",", @gtid3)` + +SELECT * FROM t1 ORDER BY a; +SELECT * FROM t2 ORDER BY a; +--source include/save_master_gtid.inc + + +--connection server_2 +# First wait for domains 0, 2, and 3 to complete. +--replace_result $wait_pos WAIT_POS +eval SELECT MASTER_GTID_WAIT('$wait_pos'); + +# Then release the row lock, and wait for the domain 1 to fail with +# duplicate key error. +--connection con_temp1 +COMMIT; +SET sql_log_bin=1; + +--connection server_2 +--let $slave_sql_errno= 1062 +--source include/wait_for_slave_sql_error.inc + +SELECT * FROM t1 ORDER BY a; +SELECT * FROM t2 ORDER BY a; + +SET sql_log_bin=0; +DELETE FROM t2 WHERE a=5; +SET sql_log_bin=1; +--source include/start_slave.inc +--source include/sync_with_master_gtid.inc + +SELECT * FROM t1 ORDER BY a; +SELECT * FROM t2 ORDER BY a; + + +# Clean up. 
+--connection server_2 +--source include/stop_slave.inc +SET GLOBAL slave_parallel_threads=@old_parallel_threads; +--source include/start_slave.inc +SET DEBUG_SYNC= 'RESET'; + +--connection server_1 +DROP TABLE t1,t2; +SET DEBUG_SYNC= 'RESET'; + +--source include/rpl_end.inc diff --git a/sql/log.cc b/sql/log.cc index f94fe12a6f3..adb15f31198 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -4188,7 +4188,7 @@ int MYSQL_BIN_LOG::purge_first_log(Relay_log_info* rli, bool included) included= 1; to_purge_if_included= my_strdup(ir->name, MYF(0)); } - my_free(ir); + rli->free_inuse_relaylog(ir); ir= next; } rli->inuse_relaylog_list= ir; diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc index 8b339e0c34e..d5a672eb854 100644 --- a/sql/rpl_gtid.cc +++ b/sql/rpl_gtid.cc @@ -1164,6 +1164,27 @@ rpl_binlog_state::load(struct rpl_gtid *list, uint32 count) } +static int rpl_binlog_state_load_cb(rpl_gtid *gtid, void *data) +{ + rpl_binlog_state *self= (rpl_binlog_state *)data; + return self->update_nolock(gtid, false); +} + + +bool +rpl_binlog_state::load(rpl_slave_state *slave_pos) +{ + bool res= false; + + mysql_mutex_lock(&LOCK_binlog_state); + reset_nolock(); + if (slave_pos->iterate(rpl_binlog_state_load_cb, this, NULL, 0)) + res= true; + mysql_mutex_unlock(&LOCK_binlog_state); + return res; +} + + rpl_binlog_state::~rpl_binlog_state() { free(); @@ -1933,6 +1954,31 @@ slave_connection_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size) /* + Check if the GTID position has been reached, for mysql_binlog_send(). + + The position has not been reached if we have anything in the state, unless + it has either the START_ON_EMPTY_DOMAIN flag set (which means it does not + belong to this master at all), or the START_OWN_SLAVE_POS (which means that + we start on an old position from when the server was a slave with + --log-slave-updates=0). 
+*/ +bool +slave_connection_state::is_pos_reached() +{ + uint32 i; + + for (i= 0; i < hash.records; ++i) + { + entry *e= (entry *)my_hash_element(&hash, i); + if (!(e->flags & (START_OWN_SLAVE_POS|START_ON_EMPTY_DOMAIN))) + return false; + } + + return true; +} + + +/* Execute a MASTER_GTID_WAIT(). The position to wait for is in gtid_str in string form. The timeout in microseconds is in timeout_us, zero means no timeout. diff --git a/sql/rpl_gtid.h b/sql/rpl_gtid.h index bd58d091a02..98f6c505dac 100644 --- a/sql/rpl_gtid.h +++ b/sql/rpl_gtid.h @@ -241,6 +241,7 @@ struct rpl_binlog_state void reset(); void free(); bool load(struct rpl_gtid *list, uint32 count); + bool load(rpl_slave_state *slave_pos); int update_nolock(const struct rpl_gtid *gtid, bool strict); int update(const struct rpl_gtid *gtid, bool strict); int update_with_next_gtid(uint32 domain_id, uint32 server_id, @@ -296,6 +297,7 @@ struct slave_connection_state int to_string(String *out_str); int append_to_string(String *out_str); int get_gtid_list(rpl_gtid *gtid_list, uint32 list_size); + bool is_pos_reached(); }; diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 609f50952c0..756dafdf534 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -1894,6 +1894,41 @@ rpl_parallel::wait_for_workers_idle(THD *thd) /* + Handle seeing a GTID during slave restart in GTID mode. If we stopped with + different replication domains having reached different positions in the relay + log, we need to skip event groups in domains that are further progressed. + + Updates the state with the seen GTID, and returns true if this GTID should + be skipped, false otherwise. 
+*/ +bool +process_gtid_for_restart_pos(Relay_log_info *rli, rpl_gtid *gtid) +{ + slave_connection_state::entry *gtid_entry; + slave_connection_state *state= &rli->restart_gtid_pos; + + if (likely(state->count() == 0) || + !(gtid_entry= state->find_entry(gtid->domain_id))) + return false; + if (gtid->server_id == gtid_entry->gtid.server_id) + { + uint64 seq_no= gtid_entry->gtid.seq_no; + if (gtid->seq_no >= seq_no) + { + /* + This domain has reached its start position. So remove it, so that + further events will be processed normally. + */ + state->remove(&gtid_entry->gtid); + } + return gtid->seq_no <= seq_no; + } + else + return true; +} + + +/* This is used when we get an error during processing in do_event(); We will not queue any event to the thread, but we still need to wake it up to be sure that it will be returned to the pool. @@ -1955,13 +1990,15 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, return -1; /* Execute pre-10.0 event, which have no GTID, in single-threaded mode. */ - if (unlikely(!current) && typ != GTID_EVENT) + is_group_event= Log_event::is_group_event(typ); + if (unlikely(!current) && typ != GTID_EVENT && + !(unlikely(rli->gtid_skip_flag != GTID_SKIP_NOT) && is_group_event)) return -1; /* ToDo: what to do with this lock?!? 
*/ mysql_mutex_unlock(&rli->data_lock); - if (typ == FORMAT_DESCRIPTION_EVENT) + if (unlikely(typ == FORMAT_DESCRIPTION_EVENT)) { Format_description_log_event *fdev= static_cast<Format_description_log_event *>(ev); @@ -1987,6 +2024,19 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, } } } + else if (unlikely(typ == GTID_LIST_EVENT)) + { + Gtid_list_log_event *glev= static_cast<Gtid_list_log_event *>(ev); + rpl_gtid *list= glev->list; + uint32 count= glev->count; + rli->update_relay_log_state(list, count); + while (count) + { + process_gtid_for_restart_pos(rli, list); + ++list; + --count; + } + } /* Stop queueing additional event groups once the SQL thread is requested to @@ -1996,7 +2046,6 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, been partially queued, but after that we will just ignore any further events the SQL driver thread may try to queue, and eventually it will stop. */ - is_group_event= Log_event::is_group_event(typ); if ((typ == GTID_EVENT || !is_group_event) && rli->abort_slave) sql_thread_stopping= true; if (sql_thread_stopping) @@ -2009,8 +2058,34 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, return 0; } + if (unlikely(rli->gtid_skip_flag != GTID_SKIP_NOT) && is_group_event) + { + if (typ == GTID_EVENT) + rli->gtid_skip_flag= GTID_SKIP_NOT; + else + { + if (rli->gtid_skip_flag == GTID_SKIP_STANDALONE) + { + if (!Log_event::is_part_of_group(typ)) + rli->gtid_skip_flag= GTID_SKIP_NOT; + } + else + { + DBUG_ASSERT(rli->gtid_skip_flag == GTID_SKIP_TRANSACTION); + if (typ == XID_EVENT || + (typ == QUERY_EVENT && + (((Query_log_event *)ev)->is_commit() || + ((Query_log_event *)ev)->is_rollback()))) + rli->gtid_skip_flag= GTID_SKIP_NOT; + } + delete_or_keep_event_post_apply(serial_rgi, typ, ev); + return 0; + } + } + if (typ == GTID_EVENT) { + rpl_gtid gtid; Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev); uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO || 
rli->mi->parallel_mode <= SLAVE_PARALLEL_MINIMAL ? @@ -2022,6 +2097,23 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, return 1; } current= e; + + gtid.domain_id= gtid_ev->domain_id; + gtid.server_id= gtid_ev->server_id; + gtid.seq_no= gtid_ev->seq_no; + rli->update_relay_log_state(&gtid, 1); + if (process_gtid_for_restart_pos(rli, &gtid)) + { + /* + This domain has progressed further into the relay log before the last + SQL thread restart. So we need to skip this event group to not doubly + apply it. + */ + rli->gtid_skip_flag= ((gtid_ev->flags2 & Gtid_log_event::FL_STANDALONE) ? + GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION); + delete_or_keep_event_post_apply(serial_rgi, typ, ev); + return 0; + } } else e= current; diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h index f1145d4cc4e..eebb8d136a9 100644 --- a/sql/rpl_parallel.h +++ b/sql/rpl_parallel.h @@ -339,5 +339,6 @@ extern struct rpl_parallel_thread_pool global_rpl_thread_pool; extern int rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, uint32 new_count, bool skip_check= false); +extern bool process_gtid_for_restart_pos(Relay_log_info *rli, rpl_gtid *gtid); #endif /* RPL_PARALLEL_H */ diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index eb37b5c6cbc..d98d216d3f2 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -62,7 +62,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery) group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0), last_master_timestamp(0), sql_thread_caught_up(true), slave_skip_counter(0), abort_pos_wait(0), slave_run_id(0), sql_driver_thd(), - inited(0), abort_slave(0), stop_for_until(0), + gtid_skip_flag(GTID_SKIP_NOT), inited(0), abort_slave(0), stop_for_until(0), slave_running(0), until_condition(UNTIL_NONE), until_log_pos(0), retried_trans(0), executed_entries(0), m_flags(0) @@ -100,17 +100,9 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery) Relay_log_info::~Relay_log_info() { - inuse_relaylog *cur; 
DBUG_ENTER("Relay_log_info::~Relay_log_info"); - cur= inuse_relaylog_list; - while (cur) - { - DBUG_ASSERT(cur->queued_count == cur->dequeued_count); - inuse_relaylog *next= cur->next; - my_free(cur); - cur= next; - } + reset_inuse_relaylog(); mysql_mutex_destroy(&run_lock); mysql_mutex_destroy(&data_lock); mysql_mutex_destroy(&log_space_lock); @@ -1383,14 +1375,34 @@ int Relay_log_info::alloc_inuse_relaylog(const char *name) { inuse_relaylog *ir; + uint32 gtid_count; + rpl_gtid *gtid_list; if (!(ir= (inuse_relaylog *)my_malloc(sizeof(*ir), MYF(MY_WME|MY_ZEROFILL)))) { my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*ir)); return 1; } + gtid_count= relay_log_state.count(); + if (!(gtid_list= (rpl_gtid *)my_malloc(sizeof(*gtid_list)*gtid_count, + MYF(MY_WME)))) + { + my_free(ir); + my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*gtid_list)*gtid_count); + return 1; + } + if (relay_log_state.get_gtid_list(gtid_list, gtid_count)) + { + my_free(gtid_list); + my_free(ir); + DBUG_ASSERT(0 /* Should not be possible as we allocated correct length */); + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + return 1; + } ir->rli= this; strmake_buf(ir->name, name); + ir->relay_log_state= gtid_list; + ir->relay_log_state_count= gtid_count; if (!inuse_relaylog_list) inuse_relaylog_list= ir; @@ -1405,6 +1417,45 @@ Relay_log_info::alloc_inuse_relaylog(const char *name) } +void +Relay_log_info::free_inuse_relaylog(inuse_relaylog *ir) +{ + my_free(ir->relay_log_state); + my_atomic_rwlock_destroy(&ir->inuse_relaylog_atomic_lock); + my_free(ir); +} + + +void +Relay_log_info::reset_inuse_relaylog() +{ + inuse_relaylog *cur= inuse_relaylog_list; + while (cur) + { + DBUG_ASSERT(cur->queued_count == cur->dequeued_count); + inuse_relaylog *next= cur->next; + free_inuse_relaylog(cur); + cur= next; + } + inuse_relaylog_list= last_inuse_relaylog= NULL; +} + + +int +Relay_log_info::update_relay_log_state(rpl_gtid *gtid_list, uint32 count) +{ + int res= 0; + while (count) + { + if 
(relay_log_state.update_nolock(gtid_list, false)) + res= 1; + ++gtid_list; + --count; + } + return res; +} + + #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) int rpl_load_gtid_slave_state(THD *thd) diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h index 96db33206d9..56300c6ba14 100644 --- a/sql/rpl_rli.h +++ b/sql/rpl_rli.h @@ -269,6 +269,8 @@ public: int events_till_abort; #endif + enum_gtid_skip_type gtid_skip_flag; + /* inited changes its value within LOCK_active_mi-guarded critical sections at times of start_slave_threads() (0->1) and end_slave() (1->0). @@ -344,6 +346,21 @@ public: size_t slave_patternload_file_size; rpl_parallel parallel; + /* + The relay_log_state keeps track of the current binlog state of the execution + of the relay log. This is used to know where to resume current GTID position + if the slave thread is stopped and restarted. + It is only accessed from the SQL thread, so it does not need any locking. + */ + rpl_binlog_state relay_log_state; + /* + The restart_gtid_state is used when the SQL thread restarts on a relay log + in GTID mode. In multi-domain parallel replication, each domain may have a + separat position, so some events in more progressed domains may need to be + skipped. This keeps track of the domains that have not yet reached their + starting event. + */ + slave_connection_state restart_gtid_pos; Relay_log_info(bool is_slave_recovery); ~Relay_log_info(); @@ -408,6 +425,9 @@ public: time_t event_creation_time, THD *thd, rpl_group_info *rgi); int alloc_inuse_relaylog(const char *name); + void free_inuse_relaylog(inuse_relaylog *ir); + void reset_inuse_relaylog(); + int update_relay_log_state(rpl_gtid *gtid_list, uint32 count); /** Is the replication inside a group? @@ -497,6 +517,12 @@ private: struct inuse_relaylog { inuse_relaylog *next; Relay_log_info *rli; + /* + relay_log_state holds the binlog state corresponding to the start of this + relay log file. It is an array with relay_log_state_count elements. 
+ */ + rpl_gtid *relay_log_state; + uint32 relay_log_state_count; /* Number of events in this relay log queued for worker threads. */ int64 queued_count; /* Number of events completed by worker threads. */ diff --git a/sql/slave.cc b/sql/slave.cc index 6d64534faf9..d29f2161acf 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -943,6 +943,8 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start, Master_info::USE_GTID_CURRENT_POS); mi->events_queued_since_last_gtid= 0; mi->gtid_reconnect_event_skip_count= 0; + + mi->rli.restart_gtid_pos.reset(); } if (!error && (thread_mask & SLAVE_IO)) @@ -4479,6 +4481,16 @@ pthread_handler_t handle_slave_sql(void *arg) serial_rgi->gtid_sub_id= 0; serial_rgi->gtid_pending= false; + if (mi->using_gtid != Master_info::USE_GTID_NO) + { + /* + We initialize the relay log state from the know starting position. + It will then be updated as required by GTID and GTID_LIST events found + while applying events read from relay logs. + */ + rli->relay_log_state.load(&rpl_global_gtid_slave_state); + } + rli->gtid_skip_flag = GTID_SKIP_NOT; if (init_relay_log_pos(rli, rli->group_relay_log_name, rli->group_relay_log_pos, @@ -4489,6 +4501,7 @@ pthread_handler_t handle_slave_sql(void *arg) "Error initializing relay log position: %s", errmsg); goto err; } + rli->reset_inuse_relaylog(); if (rli->alloc_inuse_relaylog(rli->group_relay_log_name)) goto err; @@ -4705,7 +4718,49 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME, thd->reset_query(); thd->reset_db(NULL, 0); if (rli->mi->using_gtid != Master_info::USE_GTID_NO) + { + ulong domain_count; + flush_relay_log_info(rli); + if (opt_slave_parallel_threads > 0) + { + /* + In parallel replication GTID mode, we may stop with different domains + at different positions in the relay log. + + To handle this when we restart the SQL thread, mark the current + per-domain position in the Relay_log_info. 
+ */ + mysql_mutex_lock(&rpl_global_gtid_slave_state.LOCK_slave_state); + domain_count= rpl_global_gtid_slave_state.count(); + mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state); + if (domain_count > 1) + { + inuse_relaylog *ir; + + /* + Load the starting GTID position, so that we can skip already applied + GTIDs when we restart the SQL thread. And set the start position in + the relay log back to a known safe place to start (prior to any not + yet applied transaction in any domain). + */ + rli->restart_gtid_pos.load(&rpl_global_gtid_slave_state, NULL, 0); + if ((ir= rli->inuse_relaylog_list)) + { + rpl_gtid *gtid= ir->relay_log_state; + uint32 count= ir->relay_log_state_count; + while (count > 0) + { + process_gtid_for_restart_pos(rli, gtid); + ++gtid; + --count; + } + strmake_buf(rli->group_relay_log_name, ir->name); + rli->group_relay_log_pos= BIN_LOG_HEADER_SIZE; + } + } + } + } THD_STAGE_INFO(thd, stage_waiting_for_slave_mutex_on_exit); thd->add_status_to_global(); mysql_mutex_lock(&rli->run_lock); @@ -4718,6 +4773,7 @@ err_during_init: /* Forget the relay log's format */ delete rli->relay_log.description_event_for_exec; rli->relay_log.description_event_for_exec= 0; + rli->reset_inuse_relaylog(); /* Wake up master_pos_wait() */ mysql_mutex_unlock(&rli->data_lock); DBUG_PRINT("info",("Signaling possibly waiting master_pos_wait() functions")); diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index b44bd943f89..8e18d218d37 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -2320,6 +2320,30 @@ static int send_format_descriptor_event(binlog_send_info *info, info->current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) fix_checksum(packet, ev_offset); } + else if (info->using_gtid_state) + { + /* + If this event has the field `created' set, then it will cause the + slave to delete all active temporary tables. 
This must not happen + if the slave received any later GTIDs in a previous connect, as + those GTIDs might have created new temporary tables that are still + needed. + + So here, we check if the starting GTID position was already + reached before this format description event. If not, we clear the + `created' flag to preserve temporary tables on the slave. (If the + slave connects at a position past this event, it means that it + already received and handled it in a previous connect). + */ + if (!info.gtid_state.is_pos_reached()) + { + int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+ + ST_CREATED_OFFSET+ev_offset, (ulong) 0); + if (info.current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF && + info.current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) + fix_checksum(packet, ev_offset); + } + } /* send it */ if (my_net_write(info->net, (uchar*) packet->ptr(), packet->length())) |