summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
-rw-r--r-- mysql-test/suite/rpl/r/rpl_parallel.result | 35
-rw-r--r-- mysql-test/suite/rpl/t/rpl_parallel.test | 59
-rw-r--r-- sql/rpl_parallel.cc | 42
-rw-r--r-- sql/rpl_rli.cc | 2
-rw-r--r-- sql/rpl_rli.h | 2
5 files changed, 118 insertions, 22 deletions
diff --git a/mysql-test/suite/rpl/r/rpl_parallel.result b/mysql-test/suite/rpl/r/rpl_parallel.result
index fc4c3a3787e..3784ccb4dec 100644
--- a/mysql-test/suite/rpl/r/rpl_parallel.result
+++ b/mysql-test/suite/rpl/r/rpl_parallel.result
@@ -694,6 +694,7 @@ STOP SLAVE IO_THREAD;
SET GLOBAL debug_dbug=@old_dbug;
SET GLOBAL slave_parallel_max_queued= @old_max_queued;
INSERT INTO t3 VALUES (82,0);
+SET binlog_format=@old_format;
SET debug_sync='RESET';
include/start_slave.inc
SELECT * FROM t3 WHERE a >= 80 ORDER BY a;
@@ -726,6 +727,40 @@ SELECT * FROM t3 WHERE a >= 100 ORDER BY a;
a b
106 #
107 #
+*** MDEV-5921: In parallel replication, an error is not correctly signalled to the next transaction ***
+include/stop_slave.inc
+SET GLOBAL slave_parallel_threads=10;
+include/start_slave.inc
+INSERT INTO t3 VALUES (110, 1);
+SELECT * FROM t3 WHERE a >= 110 ORDER BY a;
+a b
+110 1
+SET sql_log_bin=0;
+INSERT INTO t3 VALUES (111, 666);
+SET sql_log_bin=1;
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued1 WAIT_FOR master_cont1';
+INSERT INTO t3 VALUES (111, 2);
+SET debug_sync='now WAIT_FOR master_queued1';
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2';
+INSERT INTO t3 VALUES (112, 3);
+SET debug_sync='now WAIT_FOR master_queued2';
+SET debug_sync='now SIGNAL master_cont1';
+SET debug_sync='RESET';
+include/wait_for_slave_sql_error.inc [errno=1062]
+include/wait_for_slave_sql_to_stop.inc
+SELECT * FROM t3 WHERE a >= 110 ORDER BY a;
+a b
+110 1
+111 666
+SET sql_log_bin=0;
+DELETE FROM t3 WHERE a=111 AND b=666;
+SET sql_log_bin=1;
+START SLAVE SQL_THREAD;
+SELECT * FROM t3 WHERE a >= 110 ORDER BY a;
+a b
+110 1
+111 2
+112 3
include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
include/start_slave.inc
diff --git a/mysql-test/suite/rpl/t/rpl_parallel.test b/mysql-test/suite/rpl/t/rpl_parallel.test
index 3ed4016380c..6bf339edb3a 100644
--- a/mysql-test/suite/rpl/t/rpl_parallel.test
+++ b/mysql-test/suite/rpl/t/rpl_parallel.test
@@ -1053,6 +1053,7 @@ SET GLOBAL slave_parallel_max_queued= @old_max_queued;
--connection server_1
INSERT INTO t3 VALUES (82,0);
+SET binlog_format=@old_format;
--save_master_pos
--connection server_2
@@ -1113,6 +1114,64 @@ INSERT INTO t3 VALUES (107, rand());
SELECT * FROM t3 WHERE a >= 100 ORDER BY a;
+--echo *** MDEV-5921: In parallel replication, an error is not correctly signalled to the next transaction ***
+
+--connection server_2
+--source include/stop_slave.inc
+SET GLOBAL slave_parallel_threads=10;
+--source include/start_slave.inc
+
+--connection server_1
+INSERT INTO t3 VALUES (110, 1);
+--save_master_pos
+
+--connection server_2
+--sync_with_master
+SELECT * FROM t3 WHERE a >= 110 ORDER BY a;
+# Inject a duplicate key error.
+SET sql_log_bin=0;
+INSERT INTO t3 VALUES (111, 666);
+SET sql_log_bin=1;
+
+--connection server_1
+
+# Create a group commit with two inserts, the first one conflicts with a row on the slave
+--connect (con1,127.0.0.1,root,,test,$SERVER_MYPORT_1,)
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued1 WAIT_FOR master_cont1';
+send INSERT INTO t3 VALUES (111, 2);
+--connection server_1
+SET debug_sync='now WAIT_FOR master_queued1';
+
+--connect (con2,127.0.0.1,root,,test,$SERVER_MYPORT_1,)
+SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2';
+send INSERT INTO t3 VALUES (112, 3);
+
+--connection server_1
+SET debug_sync='now WAIT_FOR master_queued2';
+SET debug_sync='now SIGNAL master_cont1';
+
+--connection con1
+REAP;
+--connection con2
+REAP;
+SET debug_sync='RESET';
+--save_master_pos
+
+--connection server_2
+--let $slave_sql_errno= 1062
+--source include/wait_for_slave_sql_error.inc
+--source include/wait_for_slave_sql_to_stop.inc
+# We should not see the row (112,3) here, it should be rolled back due to
+# error signal from the prior transaction.
+SELECT * FROM t3 WHERE a >= 110 ORDER BY a;
+SET sql_log_bin=0;
+DELETE FROM t3 WHERE a=111 AND b=666;
+SET sql_log_bin=1;
+START SLAVE SQL_THREAD;
+--sync_with_master
+SELECT * FROM t3 WHERE a >= 110 ORDER BY a;
+
+
--connection server_2
--source include/stop_slave.inc
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 3218acf2525..af797d55da3 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -20,6 +20,8 @@
struct rpl_parallel_thread_pool global_rpl_thread_pool;
+static void signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi,
+ int err);
static int
rpt_handle_event(rpl_parallel_thread::queued_event *qev,
@@ -94,10 +96,11 @@ handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev)
static void
-finish_event_group(THD *thd, int err, uint64 sub_id,
- rpl_parallel_entry *entry, rpl_group_info *rgi)
+finish_event_group(THD *thd, uint64 sub_id, rpl_parallel_entry *entry,
+ rpl_group_info *rgi)
{
wait_for_commit *wfc= &rgi->commit_orderer;
+ int err;
/*
Remove any left-over registration to wait for a prior commit to
@@ -120,10 +123,10 @@ finish_event_group(THD *thd, int err, uint64 sub_id,
waiting for us will in any case receive the error back from their
wait_for_prior_commit() call.
*/
- if (err)
+ if (rgi->worker_error)
wfc->unregister_wait_for_prior_commit();
- else
- err= wfc->wait_for_prior_commit(thd);
+ else if ((err= wfc->wait_for_prior_commit(thd)))
+ signal_error_to_sql_driver_thread(thd, rgi, err);
thd->wait_for_commit_ptr= NULL;
/*
@@ -150,7 +153,7 @@ finish_event_group(THD *thd, int err, uint64 sub_id,
not yet started should just skip their group, preparing for stop of the
SQL driver thread.
*/
- if (unlikely(rgi->is_error) &&
+ if (unlikely(rgi->worker_error) &&
entry->stop_on_error_sub_id == (uint64)ULONGLONG_MAX)
entry->stop_on_error_sub_id= sub_id;
/*
@@ -163,14 +166,14 @@ finish_event_group(THD *thd, int err, uint64 sub_id,
thd->clear_error();
thd->get_stmt_da()->reset_diagnostics_area();
- wfc->wakeup_subsequent_commits(err);
+ wfc->wakeup_subsequent_commits(rgi->worker_error);
}
static void
-signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi)
+signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi, int err)
{
- rgi->is_error= true;
+ rgi->worker_error= err;
rgi->cleanup_context(thd, true);
rgi->rli->abort_slave= true;
rgi->rli->stop_for_until= false;
@@ -294,7 +297,6 @@ handle_rpl_parallel_thread(void *arg)
continue;
}
- err= 0;
group_rgi= rgi;
gco= rgi->gco;
/* Handle a new event group, which will be initiated by a GTID event. */
@@ -346,12 +348,12 @@ handle_rpl_parallel_thread(void *arg)
did_enter_cond= true;
do
{
- if (thd->check_killed() && !rgi->is_error)
+ if (thd->check_killed() && !rgi->worker_error)
{
DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior_killed");
thd->send_kill_message();
slave_output_error_info(rgi->rli, thd);
- signal_error_to_sql_driver_thread(thd, rgi);
+ signal_error_to_sql_driver_thread(thd, rgi, 1);
/*
Even though we were killed, we need to continue waiting for the
prior event groups to signal that we can continue. Otherwise we
@@ -417,7 +419,7 @@ handle_rpl_parallel_thread(void *arg)
*/
rgi->cleanup_context(thd, true);
thd->wait_for_commit_ptr->unregister_wait_for_prior_commit();
- thd->wait_for_commit_ptr->wakeup_subsequent_commits(err);
+ thd->wait_for_commit_ptr->wakeup_subsequent_commits(rgi->worker_error);
}
thd->wait_for_commit_ptr= &rgi->commit_orderer;
@@ -430,7 +432,7 @@ handle_rpl_parallel_thread(void *arg)
{
/* Error. */
slave_output_error_info(rgi->rli, thd);
- signal_error_to_sql_driver_thread(thd, rgi);
+ signal_error_to_sql_driver_thread(thd, rgi, 1);
}
else if (!res)
{
@@ -460,7 +462,7 @@ handle_rpl_parallel_thread(void *arg)
processing between the event groups as a simple way to ensure that
everything is stopped and cleaned up correctly.
*/
- if (!rgi->is_error && !skip_event_group)
+ if (!rgi->worker_error && !skip_event_group)
err= rpt_handle_event(events, rpt);
else
err= thd->wait_for_prior_commit();
@@ -474,15 +476,15 @@ handle_rpl_parallel_thread(void *arg)
events->next= qevs_to_free;
qevs_to_free= events;
- if (err)
+ if (unlikely(err) && !rgi->worker_error)
{
slave_output_error_info(rgi->rli, thd);
- signal_error_to_sql_driver_thread(thd, rgi);
+ signal_error_to_sql_driver_thread(thd, rgi, err);
}
if (end_of_group)
{
in_event_group= false;
- finish_event_group(thd, err, event_gtid_sub_id, entry, rgi);
+ finish_event_group(thd, event_gtid_sub_id, entry, rgi);
rgi->next= rgis_to_free;
rgis_to_free= rgi;
group_rgi= rgi= NULL;
@@ -541,9 +543,9 @@ handle_rpl_parallel_thread(void *arg)
*/
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
thd->wait_for_prior_commit();
- finish_event_group(thd, 1, group_rgi->gtid_sub_id,
+ signal_error_to_sql_driver_thread(thd, group_rgi, 1);
+ finish_event_group(thd, group_rgi->gtid_sub_id,
group_rgi->parallel_entry, group_rgi);
- signal_error_to_sql_driver_thread(thd, group_rgi);
in_event_group= false;
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
rpt->free_rgi(group_rgi);
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index c9b4153d28e..0d0c8c9df70 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -1489,7 +1489,7 @@ rpl_group_info::reinit(Relay_log_info *rli)
tables_to_lock_count= 0;
trans_retries= 0;
last_event_start_time= 0;
- is_error= false;
+ worker_error= 0;
row_stmt_start_timestamp= 0;
long_find_row_note_printed= false;
did_mark_start_commit= false;
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index 100ce25fe9c..48193afce4d 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -569,7 +569,7 @@ struct rpl_group_info
*/
char future_event_master_log_name[FN_REFLEN];
bool is_parallel_exec;
- bool is_error;
+ int worker_error;
/*
Set true when we signalled that we reach the commit phase. Used to avoid
counting one event group twice.