summaryrefslogtreecommitdiff
path: root/sql/rpl_parallel.cc
diff options
context:
space:
mode:
authorKristian Nielsen <knielsen@knielsen-hq.org>2014-08-15 11:31:13 +0200
committerKristian Nielsen <knielsen@knielsen-hq.org>2014-08-15 11:31:13 +0200
commitcfa1ce81bb7992c362958bb95f41325ce2109834 (patch)
tree1e80daded3dc2dc6df1c37f93b03f4ffd7a58807 /sql/rpl_parallel.cc
parent65ac881c8096cd069e622da6cd699ff1d4aaac57 (diff)
downloadmariadb-git-cfa1ce81bb7992c362958bb95f41325ce2109834.tar.gz
MDEV-6551: Some replication errors are ignored if slave_parallel_threads > 0
The problem occured when using parallel replication, and an error occured that caused the SQL thread to stop when the IO thread had already reached a following binlog file from the master (or otherwise performed a relay log rotation). In this case, the Rotate Event at the end of the relay log file could still be executed, even though an earlier event in that relay log file had gotten an error. This would cause the position to be incorrectly updated, so that upon restart of the SQL thread, the event that had failed would be silently skipped and ignored, causing replication corruption. Fixed by checking before executing Rotate Event, whether an earlier event has failed. If so, the Rotate Event is not executed, just dequeued, same as for other normal events following a failing event.
Diffstat (limited to 'sql/rpl_parallel.cc')
-rw-r--r--sql/rpl_parallel.cc30
1 files changed, 23 insertions, 7 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 91bd636d3f5..eeb66821809 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -22,18 +22,22 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev,
rpl_group_info *rgi= qev->rgi;
Relay_log_info *rli= rgi->rli;
THD *thd= rgi->thd;
+ Log_event *ev;
+
+ DBUG_ASSERT(qev->typ == rpl_parallel_thread::queued_event::QUEUED_EVENT);
+ ev= qev->ev;
thd->system_thread_info.rpl_sql_info->rpl_filter = rli->mi->rpl_filter;
+ ev->thd= thd;
- /* ToDo: Access to thd, and what about rli, split out a parallel part? */
- mysql_mutex_lock(&rli->data_lock);
- qev->ev->thd= thd;
strcpy(rgi->event_relay_log_name_buf, qev->event_relay_log_name);
rgi->event_relay_log_name= rgi->event_relay_log_name_buf;
rgi->event_relay_log_pos= qev->event_relay_log_pos;
rgi->future_event_relay_log_pos= qev->future_event_relay_log_pos;
strcpy(rgi->future_event_master_log_name, qev->future_event_master_log_name);
- err= apply_event_and_update_pos(qev->ev, thd, rgi, rpt);
+ mysql_mutex_lock(&rli->data_lock);
+ /* Mutex will be released in apply_event_and_update_pos(). */
+ err= apply_event_and_update_pos(ev, thd, rgi, rpt);
thread_safe_increment64(&rli->executed_entries,
&slave_executed_entries_lock);
@@ -47,6 +51,8 @@ handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev)
{
int cmp;
Relay_log_info *rli;
+ rpl_parallel_entry *e;
+
/*
Events that are not part of an event group, such as Format Description,
Stop, GTID List and such, are executed directly in the driver SQL thread,
@@ -57,6 +63,13 @@ handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev)
if ((thd->variables.option_bits & OPTION_BEGIN) &&
opt_using_transactions)
return;
+
+ /* Do not update position if an earlier event group caused an error abort. */
+ DBUG_ASSERT(qev->typ == rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE);
+ e= qev->entry_for_queued;
+ if (e->stop_on_error_sub_id < (uint64)ULONGLONG_MAX || e->force_abort)
+ return;
+
rli= qev->rgi->rli;
mysql_mutex_lock(&rli->data_lock);
cmp= strcmp(rli->group_relay_log_name, qev->event_relay_log_name);
@@ -566,7 +579,7 @@ handle_rpl_parallel_thread(void *arg)
bool end_of_group, group_ending;
total_event_size+= events->event_size;
- if (!events->ev)
+ if (events->typ == rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE)
{
handle_queued_pos_update(thd, events);
events->next= qevs_to_free;
@@ -574,6 +587,7 @@ handle_rpl_parallel_thread(void *arg)
events= next;
continue;
}
+ DBUG_ASSERT(events->typ==rpl_parallel_thread::queued_event::QUEUED_EVENT);
thd->rgi_slave= group_rgi= rgi;
gco= rgi->gco;
@@ -1082,6 +1096,7 @@ rpl_parallel_thread::get_qev_common(Log_event *ev, ulonglong event_size)
my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*qev));
return NULL;
}
+ qev->typ= rpl_parallel_thread::queued_event::QUEUED_EVENT;
qev->ev= ev;
qev->event_size= event_size;
qev->next= NULL;
@@ -1824,7 +1839,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
return 1;
}
/*
- Queue an empty event, so that the position will be updated in a
+ Queue a position update, so that the position will be updated in a
reasonable way relative to other events:
- If the currently executing events are queued serially for a single
@@ -1835,7 +1850,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
least the position will not be updated until one of them has reached
the current point.
*/
- qev->ev= NULL;
+ qev->typ= rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE;
+ qev->entry_for_queued= e;
}
else
{