summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r-- mysql-test/suite/rpl/r/rpl_parallel.result 41
-rw-r--r-- mysql-test/suite/rpl/t/rpl_parallel.test 58
-rw-r--r-- sql/rpl_parallel.cc 30
-rw-r--r-- sql/rpl_parallel.h 10
4 files changed, 131 insertions, 8 deletions
diff --git a/mysql-test/suite/rpl/r/rpl_parallel.result b/mysql-test/suite/rpl/r/rpl_parallel.result
index 198b3d32619..fb86d46b01e 100644
--- a/mysql-test/suite/rpl/r/rpl_parallel.result
+++ b/mysql-test/suite/rpl/r/rpl_parallel.result
@@ -882,6 +882,47 @@ a
SELECT * FROM t6 ORDER BY a;
a
4
+*** MDEV-6551: Some replication errors are ignored if slave_parallel_threads > 0 ***
+INSERT INTO t2 VALUES (31);
+include/save_master_gtid.inc
+include/sync_with_master_gtid.inc
+include/stop_slave.inc
+SET GLOBAL slave_parallel_threads= 0;
+include/start_slave.inc
+SET sql_log_bin= 0;
+INSERT INTO t2 VALUES (32);
+SET sql_log_bin= 1;
+INSERT INTO t2 VALUES (32);
+FLUSH LOGS;
+INSERT INTO t2 VALUES (33);
+INSERT INTO t2 VALUES (34);
+SELECT * FROM t2 WHERE a >= 30 ORDER BY a;
+a
+31
+32
+33
+34
+include/save_master_gtid.inc
+include/wait_for_slave_sql_error.inc [errno=1062]
+include/stop_slave_io.inc
+SET GLOBAL slave_parallel_threads=10;
+START SLAVE;
+include/wait_for_slave_sql_error.inc [errno=1062]
+START SLAVE SQL_THREAD;
+include/wait_for_slave_sql_error.inc [errno=1062]
+SELECT * FROM t2 WHERE a >= 30 ORDER BY a;
+a
+31
+32
+SET sql_slave_skip_counter= 1;
+include/start_slave.inc
+include/sync_with_master_gtid.inc
+SELECT * FROM t2 WHERE a >= 30 ORDER BY a;
+a
+31
+32
+33
+34
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
include/start_slave.inc
diff --git a/mysql-test/suite/rpl/t/rpl_parallel.test b/mysql-test/suite/rpl/t/rpl_parallel.test
index f834dbef2fd..4f01ef7765b 100644
--- a/mysql-test/suite/rpl/t/rpl_parallel.test
+++ b/mysql-test/suite/rpl/t/rpl_parallel.test
@@ -1408,6 +1408,64 @@ SELECT * FROM t6 ORDER BY a;
SELECT * FROM t6 ORDER BY a;
+--echo *** MDEV-6551: Some replication errors are ignored if slave_parallel_threads > 0 ***
+
+--connection server_1
+INSERT INTO t2 VALUES (31);
+--source include/save_master_gtid.inc
+
+--connection server_2
+--source include/sync_with_master_gtid.inc
+--source include/stop_slave.inc
+SET GLOBAL slave_parallel_threads= 0;
+--source include/start_slave.inc
+
+# Force a duplicate key error on the slave.
+SET sql_log_bin= 0;
+INSERT INTO t2 VALUES (32);
+SET sql_log_bin= 1;
+
+--connection server_1
+INSERT INTO t2 VALUES (32);
+# Rotate the binlog; the bug is triggered when the master binlog file changes
+# after the event group that causes the duplicate key error.
+FLUSH LOGS;
+INSERT INTO t2 VALUES (33);
+INSERT INTO t2 VALUES (34);
+SELECT * FROM t2 WHERE a >= 30 ORDER BY a;
+--source include/save_master_gtid.inc
+
+--connection server_2
+--let $slave_sql_errno= 1062
+--source include/wait_for_slave_sql_error.inc
+
+--connection server_2
+--source include/stop_slave_io.inc
+SET GLOBAL slave_parallel_threads=10;
+START SLAVE;
+
+--let $slave_sql_errno= 1062
+--source include/wait_for_slave_sql_error.inc
+
+# Note: IO thread is still running at this point.
+# The bug seems to have been that restarting the SQL thread after an error,
+# while the IO thread is still running, somehow picks up a later relay log
+# position and thus skips the failing event instead of re-executing it.
+
+START SLAVE SQL_THREAD;
+--let $slave_sql_errno= 1062
+--source include/wait_for_slave_sql_error.inc
+
+SELECT * FROM t2 WHERE a >= 30 ORDER BY a;
+
+# Skip the duplicate error, so we can proceed.
+SET sql_slave_skip_counter= 1;
+--source include/start_slave.inc
+--source include/sync_with_master_gtid.inc
+
+SELECT * FROM t2 WHERE a >= 30 ORDER BY a;
+
+
--connection server_2
--source include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 91bd636d3f5..eeb66821809 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -22,18 +22,22 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev,
rpl_group_info *rgi= qev->rgi;
Relay_log_info *rli= rgi->rli;
THD *thd= rgi->thd;
+ Log_event *ev;
+
+ DBUG_ASSERT(qev->typ == rpl_parallel_thread::queued_event::QUEUED_EVENT);
+ ev= qev->ev;
thd->system_thread_info.rpl_sql_info->rpl_filter = rli->mi->rpl_filter;
+ ev->thd= thd;
- /* ToDo: Access to thd, and what about rli, split out a parallel part? */
- mysql_mutex_lock(&rli->data_lock);
- qev->ev->thd= thd;
strcpy(rgi->event_relay_log_name_buf, qev->event_relay_log_name);
rgi->event_relay_log_name= rgi->event_relay_log_name_buf;
rgi->event_relay_log_pos= qev->event_relay_log_pos;
rgi->future_event_relay_log_pos= qev->future_event_relay_log_pos;
strcpy(rgi->future_event_master_log_name, qev->future_event_master_log_name);
- err= apply_event_and_update_pos(qev->ev, thd, rgi, rpt);
+ mysql_mutex_lock(&rli->data_lock);
+ /* Mutex will be released in apply_event_and_update_pos(). */
+ err= apply_event_and_update_pos(ev, thd, rgi, rpt);
thread_safe_increment64(&rli->executed_entries,
&slave_executed_entries_lock);
@@ -47,6 +51,8 @@ handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev)
{
int cmp;
Relay_log_info *rli;
+ rpl_parallel_entry *e;
+
/*
Events that are not part of an event group, such as Format Description,
Stop, GTID List and such, are executed directly in the driver SQL thread,
@@ -57,6 +63,13 @@ handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev)
if ((thd->variables.option_bits & OPTION_BEGIN) &&
opt_using_transactions)
return;
+
+ /* Do not update position if an earlier event group caused an error abort. */
+ DBUG_ASSERT(qev->typ == rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE);
+ e= qev->entry_for_queued;
+ if (e->stop_on_error_sub_id < (uint64)ULONGLONG_MAX || e->force_abort)
+ return;
+
rli= qev->rgi->rli;
mysql_mutex_lock(&rli->data_lock);
cmp= strcmp(rli->group_relay_log_name, qev->event_relay_log_name);
@@ -566,7 +579,7 @@ handle_rpl_parallel_thread(void *arg)
bool end_of_group, group_ending;
total_event_size+= events->event_size;
- if (!events->ev)
+ if (events->typ == rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE)
{
handle_queued_pos_update(thd, events);
events->next= qevs_to_free;
@@ -574,6 +587,7 @@ handle_rpl_parallel_thread(void *arg)
events= next;
continue;
}
+ DBUG_ASSERT(events->typ==rpl_parallel_thread::queued_event::QUEUED_EVENT);
thd->rgi_slave= group_rgi= rgi;
gco= rgi->gco;
@@ -1082,6 +1096,7 @@ rpl_parallel_thread::get_qev_common(Log_event *ev, ulonglong event_size)
my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*qev));
return NULL;
}
+ qev->typ= rpl_parallel_thread::queued_event::QUEUED_EVENT;
qev->ev= ev;
qev->event_size= event_size;
qev->next= NULL;
@@ -1824,7 +1839,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
return 1;
}
/*
- Queue an empty event, so that the position will be updated in a
+ Queue a position update, so that the position will be updated in a
reasonable way relative to other events:
- If the currently executing events are queued serially for a single
@@ -1835,7 +1850,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
least the position will not be updated until one of them has reached
the current point.
*/
- qev->ev= NULL;
+ qev->typ= rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE;
+ qev->entry_for_queued= e;
}
else
{
diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h
index 415259cd3c4..c7e15528e97 100644
--- a/sql/rpl_parallel.h
+++ b/sql/rpl_parallel.h
@@ -72,7 +72,15 @@ struct rpl_parallel_thread {
rpl_parallel_entry *current_entry;
struct queued_event {
queued_event *next;
- Log_event *ev;
+ /*
+ queued_event can hold either an event to be executed, or just a binlog
+ position to be updated without any associated event.
+ */
+ enum queued_event_t { QUEUED_EVENT, QUEUED_POS_UPDATE } typ;
+ union {
+ Log_event *ev; /* QUEUED_EVENT */
+ rpl_parallel_entry *entry_for_queued; /* QUEUED_POS_UPDATE */
+ };
rpl_group_info *rgi;
inuse_relaylog *ir;
ulonglong future_event_relay_log_pos;