diff options
author | Kristian Nielsen <knielsen@knielsen-hq.org> | 2015-02-18 12:22:50 +0100 |
---|---|---|
committer | Kristian Nielsen <knielsen@knielsen-hq.org> | 2015-03-04 13:36:04 +0100 |
commit | ad0d203f2ec9b3c696e6c688fe9314f498efc232 (patch) | |
tree | 7ee3ae1f2507cece7ab6297968a3ed94e04f77d2 /sql/rpl_rli.cc | |
parent | fb71449b10100e9a0f887b1585000fbfab294f3c (diff) | |
download | mariadb-git-ad0d203f2ec9b3c696e6c688fe9314f498efc232.tar.gz |
MDEV-6589: Incorrect relay log start position when restarting SQL thread after error in parallel replication
The problem occurs in parallel replication in GTID mode, when we are using
multiple replication domains. In this case, if the SQL thread stops, the
slave GTID position may refer to a different point in the relay log for each
domain.
The bug was that when the SQL thread was stopped and restarted (but the IO
thread was kept running), the SQL thread would resume applying the relay log
from the point of the most advanced replication domain, silently skipping all
earlier events within other domains. This caused replication corruption.
This patch solves the problem by storing the current GTID position when the
SQL thread stops with multiple parallel replication domains active.
Additionally, the current position in the relay logs is moved back
to a point known to be earlier than the current position of any replication
domain. Then, when the SQL thread restarts from the earlier position, each GTID
encountered is compared against the stored GTID position. Any GTID that was
already applied before the stop is skipped to avoid applying it twice.
This patch should have no effect if multi-domain GTID parallel replication is
not used. Similarly, if both the SQL and IO threads are stopped and restarted, the
patch has no effect, as in this case the existing relay logs are removed and
re-fetched from the master at the current global @@gtid_slave_pos.
Diffstat (limited to 'sql/rpl_rli.cc')
-rw-r--r-- | sql/rpl_rli.cc | 72 |
1 files changed, 61 insertions, 11 deletions
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index a751dd16650..4ca8282956c 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -62,7 +62,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery) group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0), last_master_timestamp(0), sql_thread_caught_up(true), slave_skip_counter(0), abort_pos_wait(0), slave_run_id(0), sql_driver_thd(), - inited(0), abort_slave(0), stop_for_until(0), + gtid_skip_flag(GTID_SKIP_NOT), inited(0), abort_slave(0), stop_for_until(0), slave_running(0), until_condition(UNTIL_NONE), until_log_pos(0), retried_trans(0), executed_entries(0), m_flags(0) @@ -100,18 +100,9 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery) Relay_log_info::~Relay_log_info() { - inuse_relaylog *cur; DBUG_ENTER("Relay_log_info::~Relay_log_info"); - cur= inuse_relaylog_list; - while (cur) - { - DBUG_ASSERT(cur->queued_count == cur->dequeued_count); - inuse_relaylog *next= cur->next; - my_atomic_rwlock_destroy(&cur->inuse_relaylog_atomic_lock); - my_free(cur); - cur= next; - } + reset_inuse_relaylog(); mysql_mutex_destroy(&run_lock); mysql_mutex_destroy(&data_lock); mysql_mutex_destroy(&log_space_lock); @@ -1384,14 +1375,34 @@ int Relay_log_info::alloc_inuse_relaylog(const char *name) { inuse_relaylog *ir; + uint32 gtid_count; + rpl_gtid *gtid_list; if (!(ir= (inuse_relaylog *)my_malloc(sizeof(*ir), MYF(MY_WME|MY_ZEROFILL)))) { my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*ir)); return 1; } + gtid_count= relay_log_state.count(); + if (!(gtid_list= (rpl_gtid *)my_malloc(sizeof(*gtid_list)*gtid_count, + MYF(MY_WME)))) + { + my_free(ir); + my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*gtid_list)*gtid_count); + return 1; + } + if (relay_log_state.get_gtid_list(gtid_list, gtid_count)) + { + my_free(gtid_list); + my_free(ir); + DBUG_ASSERT(0 /* Should not be possible as we allocated correct length */); + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + return 1; + } ir->rli= this; strmake_buf(ir->name, name); + 
ir->relay_log_state= gtid_list; + ir->relay_log_state_count= gtid_count; if (!inuse_relaylog_list) inuse_relaylog_list= ir; @@ -1407,6 +1418,45 @@ Relay_log_info::alloc_inuse_relaylog(const char *name) } +void +Relay_log_info::free_inuse_relaylog(inuse_relaylog *ir) +{ + my_free(ir->relay_log_state); + my_atomic_rwlock_destroy(&ir->inuse_relaylog_atomic_lock); + my_free(ir); +} + + +void +Relay_log_info::reset_inuse_relaylog() +{ + inuse_relaylog *cur= inuse_relaylog_list; + while (cur) + { + DBUG_ASSERT(cur->queued_count == cur->dequeued_count); + inuse_relaylog *next= cur->next; + free_inuse_relaylog(cur); + cur= next; + } + inuse_relaylog_list= last_inuse_relaylog= NULL; +} + + +int +Relay_log_info::update_relay_log_state(rpl_gtid *gtid_list, uint32 count) +{ + int res= 0; + while (count) + { + if (relay_log_state.update_nolock(gtid_list, false)) + res= 1; + ++gtid_list; + --count; + } + return res; +} + + #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) int rpl_load_gtid_slave_state(THD *thd) |