diff options
author | Kristian Nielsen <knielsen@knielsen-hq.org> | 2014-08-15 11:31:13 +0200 |
---|---|---|
committer | Kristian Nielsen <knielsen@knielsen-hq.org> | 2014-08-15 11:31:13 +0200 |
commit | cfa1ce81bb7992c362958bb95f41325ce2109834 (patch) | |
tree | 1e80daded3dc2dc6df1c37f93b03f4ffd7a58807 /sql/rpl_parallel.cc | |
parent | 65ac881c8096cd069e622da6cd699ff1d4aaac57 (diff) | |
download | mariadb-git-cfa1ce81bb7992c362958bb95f41325ce2109834.tar.gz |
MDEV-6551: Some replication errors are ignored if slave_parallel_threads > 0
The problem occured when using parallel replication, and an error occured that
caused the SQL thread to stop when the IO thread had already reached a
following binlog file from the master (or otherwise performed a relay log
rotation).
In this case, the Rotate Event at the end of the relay log file could still be
executed, even though an earlier event in that relay log file had gotten an
error. This would cause the position to be incorrectly updated, so that upon
restart of the SQL thread, the event that had failed would be silently skipped
and ignored, causing replication corruption.
Fixed by checking before executing Rotate Event, whether an earlier event
has failed. If so, the Rotate Event is not executed, just dequeued, same as
for other normal events following a failing event.
Diffstat (limited to 'sql/rpl_parallel.cc')
-rw-r--r-- | sql/rpl_parallel.cc | 30 |
1 files changed, 23 insertions, 7 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 91bd636d3f5..eeb66821809 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -22,18 +22,22 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev, rpl_group_info *rgi= qev->rgi; Relay_log_info *rli= rgi->rli; THD *thd= rgi->thd; + Log_event *ev; + + DBUG_ASSERT(qev->typ == rpl_parallel_thread::queued_event::QUEUED_EVENT); + ev= qev->ev; thd->system_thread_info.rpl_sql_info->rpl_filter = rli->mi->rpl_filter; + ev->thd= thd; - /* ToDo: Access to thd, and what about rli, split out a parallel part? */ - mysql_mutex_lock(&rli->data_lock); - qev->ev->thd= thd; strcpy(rgi->event_relay_log_name_buf, qev->event_relay_log_name); rgi->event_relay_log_name= rgi->event_relay_log_name_buf; rgi->event_relay_log_pos= qev->event_relay_log_pos; rgi->future_event_relay_log_pos= qev->future_event_relay_log_pos; strcpy(rgi->future_event_master_log_name, qev->future_event_master_log_name); - err= apply_event_and_update_pos(qev->ev, thd, rgi, rpt); + mysql_mutex_lock(&rli->data_lock); + /* Mutex will be released in apply_event_and_update_pos(). */ + err= apply_event_and_update_pos(ev, thd, rgi, rpt); thread_safe_increment64(&rli->executed_entries, &slave_executed_entries_lock); @@ -47,6 +51,8 @@ handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev) { int cmp; Relay_log_info *rli; + rpl_parallel_entry *e; + /* Events that are not part of an event group, such as Format Description, Stop, GTID List and such, are executed directly in the driver SQL thread, @@ -57,6 +63,13 @@ handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev) if ((thd->variables.option_bits & OPTION_BEGIN) && opt_using_transactions) return; + + /* Do not update position if an earlier event group caused an error abort. */ + DBUG_ASSERT(qev->typ == rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE); + e= qev->entry_for_queued; + if (e->stop_on_error_sub_id < (uint64)ULONGLONG_MAX || e->force_abort) + return; + rli= qev->rgi->rli; mysql_mutex_lock(&rli->data_lock); cmp= strcmp(rli->group_relay_log_name, qev->event_relay_log_name); @@ -566,7 +579,7 @@ handle_rpl_parallel_thread(void *arg) bool end_of_group, group_ending; total_event_size+= events->event_size; - if (!events->ev) + if (events->typ == rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE) { handle_queued_pos_update(thd, events); events->next= qevs_to_free; @@ -574,6 +587,7 @@ handle_rpl_parallel_thread(void *arg) events= next; continue; } + DBUG_ASSERT(events->typ==rpl_parallel_thread::queued_event::QUEUED_EVENT); thd->rgi_slave= group_rgi= rgi; gco= rgi->gco; @@ -1082,6 +1096,7 @@ rpl_parallel_thread::get_qev_common(Log_event *ev, ulonglong event_size) my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*qev)); return NULL; } + qev->typ= rpl_parallel_thread::queued_event::QUEUED_EVENT; qev->ev= ev; qev->event_size= event_size; qev->next= NULL; @@ -1824,7 +1839,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, return 1; } /* - Queue an empty event, so that the position will be updated in a + Queue a position update, so that the position will be updated in a reasonable way relative to other events: - If the currently executing events are queued serially for a single @@ -1835,7 +1850,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, least the position will not be updated until one of them has reached the current point. */ - qev->ev= NULL; + qev->typ= rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE; + qev->entry_for_queued= e; } else { |