diff options
author | Kristian Nielsen <knielsen@knielsen-hq.org> | 2015-03-04 13:36:54 +0100 |
---|---|---|
committer | Kristian Nielsen <knielsen@knielsen-hq.org> | 2015-03-04 13:36:54 +0100 |
commit | 3ef0b9b235a931be4264c096319976a180224354 (patch) | |
tree | 4b1ae3966d23fce4487b1c0a685699d2585b1d29 /sql/rpl_parallel.cc | |
parent | e33b48ae8b943b5d2aed4873f5ada7717860e162 (diff) | |
parent | 78c74dbe30d3a22feec5d069c7424d5a8a86ea4c (diff) | |
download | mariadb-git-3ef0b9b235a931be4264c096319976a180224354.tar.gz |
Merge MDEV-6589 and MDEV-6403 into 10.0.
Diffstat (limited to 'sql/rpl_parallel.cc')
-rw-r--r-- | sql/rpl_parallel.cc | 98 |
1 files changed, 95 insertions, 3 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index c6bb974f62f..ec177280fea 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -1833,6 +1833,41 @@ rpl_parallel::wait_for_workers_idle(THD *thd) /* + Handle seeing a GTID during slave restart in GTID mode. If we stopped with + different replication domains having reached different positions in the relay + log, we need to skip event groups in domains that are further progressed. + + Updates the state with the seen GTID, and returns true if this GTID should + be skipped, false otherwise. +*/ +bool +process_gtid_for_restart_pos(Relay_log_info *rli, rpl_gtid *gtid) +{ + slave_connection_state::entry *gtid_entry; + slave_connection_state *state= &rli->restart_gtid_pos; + + if (likely(state->count() == 0) || + !(gtid_entry= state->find_entry(gtid->domain_id))) + return false; + if (gtid->server_id == gtid_entry->gtid.server_id) + { + uint64 seq_no= gtid_entry->gtid.seq_no; + if (gtid->seq_no >= seq_no) + { + /* + This domain has reached its start position. So remove it, so that + further events will be processed normally. + */ + state->remove(>id_entry->gtid); + } + return gtid->seq_no <= seq_no; + } + else + return true; +} + + +/* This is used when we get an error during processing in do_event(); We will not queue any event to the thread, but we still need to wake it up to be sure that it will be returned to the pool. @@ -1893,13 +1928,15 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, return -1; /* Execute pre-10.0 event, which have no GTID, in single-threaded mode. */ - if (unlikely(!current) && typ != GTID_EVENT) + is_group_event= Log_event::is_group_event(typ); + if (unlikely(!current) && typ != GTID_EVENT && + !(unlikely(rli->gtid_skip_flag != GTID_SKIP_NOT) && is_group_event)) return -1; /* ToDo: what to do with this lock?!? */ mysql_mutex_unlock(&rli->data_lock); - if (typ == FORMAT_DESCRIPTION_EVENT) + if (unlikely(typ == FORMAT_DESCRIPTION_EVENT)) { Format_description_log_event *fdev= static_cast<Format_description_log_event *>(ev); @@ -1925,6 +1962,19 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, } } } + else if (unlikely(typ == GTID_LIST_EVENT)) + { + Gtid_list_log_event *glev= static_cast<Gtid_list_log_event *>(ev); + rpl_gtid *list= glev->list; + uint32 count= glev->count; + rli->update_relay_log_state(list, count); + while (count) + { + process_gtid_for_restart_pos(rli, list); + ++list; + --count; + } + } /* Stop queueing additional event groups once the SQL thread is requested to @@ -1934,7 +1984,6 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, been partially queued, but after that we will just ignore any further events the SQL driver thread may try to queue, and eventually it will stop. */ - is_group_event= Log_event::is_group_event(typ); if ((typ == GTID_EVENT || !is_group_event) && rli->abort_slave) sql_thread_stopping= true; if (sql_thread_stopping) @@ -1947,8 +1996,34 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, return 0; } + if (unlikely(rli->gtid_skip_flag != GTID_SKIP_NOT) && is_group_event) + { + if (typ == GTID_EVENT) + rli->gtid_skip_flag= GTID_SKIP_NOT; + else + { + if (rli->gtid_skip_flag == GTID_SKIP_STANDALONE) + { + if (!Log_event::is_part_of_group(typ)) + rli->gtid_skip_flag= GTID_SKIP_NOT; + } + else + { + DBUG_ASSERT(rli->gtid_skip_flag == GTID_SKIP_TRANSACTION); + if (typ == XID_EVENT || + (typ == QUERY_EVENT && + (((Query_log_event *)ev)->is_commit() || + ((Query_log_event *)ev)->is_rollback()))) + rli->gtid_skip_flag= GTID_SKIP_NOT; + } + delete_or_keep_event_post_apply(serial_rgi, typ, ev); + return 0; + } + } + if (typ == GTID_EVENT) { + rpl_gtid gtid; Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev); uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ? 0 : gtid_ev->domain_id); @@ -1959,6 +2034,23 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, return 1; } current= e; + + gtid.domain_id= gtid_ev->domain_id; + gtid.server_id= gtid_ev->server_id; + gtid.seq_no= gtid_ev->seq_no; + rli->update_relay_log_state(>id, 1); + if (process_gtid_for_restart_pos(rli, >id)) + { + /* + This domain has progressed further into the relay log before the last + SQL thread restart. So we need to skip this event group to not doubly + apply it. + */ + rli->gtid_skip_flag= ((gtid_ev->flags2 & Gtid_log_event::FL_STANDALONE) ? + GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION); + delete_or_keep_event_post_apply(serial_rgi, typ, ev); + return 0; + } } else e= current; |