diff options
-rw-r--r-- | mysql-test/suite/rpl/r/rpl_parallel.result | 35 | ||||
-rw-r--r-- | mysql-test/suite/rpl/t/rpl_parallel.test | 59 | ||||
-rw-r--r-- | sql/rpl_parallel.cc | 42 | ||||
-rw-r--r-- | sql/rpl_rli.cc | 2 | ||||
-rw-r--r-- | sql/rpl_rli.h | 2 |
5 files changed, 118 insertions, 22 deletions
diff --git a/mysql-test/suite/rpl/r/rpl_parallel.result b/mysql-test/suite/rpl/r/rpl_parallel.result index fc4c3a3787e..3784ccb4dec 100644 --- a/mysql-test/suite/rpl/r/rpl_parallel.result +++ b/mysql-test/suite/rpl/r/rpl_parallel.result @@ -694,6 +694,7 @@ STOP SLAVE IO_THREAD; SET GLOBAL debug_dbug=@old_dbug; SET GLOBAL slave_parallel_max_queued= @old_max_queued; INSERT INTO t3 VALUES (82,0); +SET binlog_format=@old_format; SET debug_sync='RESET'; include/start_slave.inc SELECT * FROM t3 WHERE a >= 80 ORDER BY a; @@ -726,6 +727,40 @@ SELECT * FROM t3 WHERE a >= 100 ORDER BY a; a b 106 # 107 # +*** MDEV-5921: In parallel replication, an error is not correctly signalled to the next transaction *** +include/stop_slave.inc +SET GLOBAL slave_parallel_threads=10; +include/start_slave.inc +INSERT INTO t3 VALUES (110, 1); +SELECT * FROM t3 WHERE a >= 110 ORDER BY a; +a b +110 1 +SET sql_log_bin=0; +INSERT INTO t3 VALUES (111, 666); +SET sql_log_bin=1; +SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued1 WAIT_FOR master_cont1'; +INSERT INTO t3 VALUES (111, 2); +SET debug_sync='now WAIT_FOR master_queued1'; +SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2'; +INSERT INTO t3 VALUES (112, 3); +SET debug_sync='now WAIT_FOR master_queued2'; +SET debug_sync='now SIGNAL master_cont1'; +SET debug_sync='RESET'; +include/wait_for_slave_sql_error.inc [errno=1062] +include/wait_for_slave_sql_to_stop.inc +SELECT * FROM t3 WHERE a >= 110 ORDER BY a; +a b +110 1 +111 666 +SET sql_log_bin=0; +DELETE FROM t3 WHERE a=111 AND b=666; +SET sql_log_bin=1; +START SLAVE SQL_THREAD; +SELECT * FROM t3 WHERE a >= 110 ORDER BY a; +a b +110 1 +111 2 +112 3 include/stop_slave.inc SET GLOBAL slave_parallel_threads=@old_parallel_threads; include/start_slave.inc diff --git a/mysql-test/suite/rpl/t/rpl_parallel.test b/mysql-test/suite/rpl/t/rpl_parallel.test index 3ed4016380c..6bf339edb3a 100644 --- a/mysql-test/suite/rpl/t/rpl_parallel.test +++ b/mysql-test/suite/rpl/t/rpl_parallel.test @@ -1053,6 +1053,7 @@ SET GLOBAL slave_parallel_max_queued= @old_max_queued; --connection server_1 INSERT INTO t3 VALUES (82,0); +SET binlog_format=@old_format; --save_master_pos --connection server_2 @@ -1113,6 +1114,64 @@ INSERT INTO t3 VALUES (107, rand()); SELECT * FROM t3 WHERE a >= 100 ORDER BY a; +--echo *** MDEV-5921: In parallel replication, an error is not correctly signalled to the next transaction *** + +--connection server_2 +--source include/stop_slave.inc +SET GLOBAL slave_parallel_threads=10; +--source include/start_slave.inc + +--connection server_1 +INSERT INTO t3 VALUES (110, 1); +--save_master_pos + +--connection server_2 +--sync_with_master +SELECT * FROM t3 WHERE a >= 110 ORDER BY a; +# Inject a duplicate key error. +SET sql_log_bin=0; +INSERT INTO t3 VALUES (111, 666); +SET sql_log_bin=1; + +--connection server_1 + +# Create a group commit with two inserts, the first one conflicts with a row on the slave +--connect (con1,127.0.0.1,root,,test,$SERVER_MYPORT_1,) +SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued1 WAIT_FOR master_cont1'; +send INSERT INTO t3 VALUES (111, 2); +--connection server_1 +SET debug_sync='now WAIT_FOR master_queued1'; + +--connect (con2,127.0.0.1,root,,test,$SERVER_MYPORT_1,) +SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2'; +send INSERT INTO t3 VALUES (112, 3); + +--connection server_1 +SET debug_sync='now WAIT_FOR master_queued2'; +SET debug_sync='now SIGNAL master_cont1'; + +--connection con1 +REAP; +--connection con2 +REAP; +SET debug_sync='RESET'; +--save_master_pos + +--connection server_2 +--let $slave_sql_errno= 1062 +--source include/wait_for_slave_sql_error.inc +--source include/wait_for_slave_sql_to_stop.inc +# We should not see the row (112,3) here, it should be rolled back due to +# error signal from the prior transaction. +SELECT * FROM t3 WHERE a >= 110 ORDER BY a; +SET sql_log_bin=0; +DELETE FROM t3 WHERE a=111 AND b=666; +SET sql_log_bin=1; +START SLAVE SQL_THREAD; +--sync_with_master +SELECT * FROM t3 WHERE a >= 110 ORDER BY a; + + --connection server_2 --source include/stop_slave.inc SET GLOBAL slave_parallel_threads=@old_parallel_threads; diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 3218acf2525..af797d55da3 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -20,6 +20,8 @@ struct rpl_parallel_thread_pool global_rpl_thread_pool; +static void signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi, + int err); static int rpt_handle_event(rpl_parallel_thread::queued_event *qev, @@ -94,10 +96,11 @@ handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev) static void -finish_event_group(THD *thd, int err, uint64 sub_id, - rpl_parallel_entry *entry, rpl_group_info *rgi) +finish_event_group(THD *thd, uint64 sub_id, rpl_parallel_entry *entry, + rpl_group_info *rgi) { wait_for_commit *wfc= &rgi->commit_orderer; + int err; /* Remove any left-over registration to wait for a prior commit to @@ -120,10 +123,10 @@ finish_event_group(THD *thd, int err, uint64 sub_id, waiting for us will in any case receive the error back from their wait_for_prior_commit() call. */ - if (err) + if (rgi->worker_error) wfc->unregister_wait_for_prior_commit(); - else - err= wfc->wait_for_prior_commit(thd); + else if ((err= wfc->wait_for_prior_commit(thd))) + signal_error_to_sql_driver_thread(thd, rgi, err); thd->wait_for_commit_ptr= NULL; /* @@ -150,7 +153,7 @@ finish_event_group(THD *thd, int err, uint64 sub_id, not yet started should just skip their group, preparing for stop of the SQL driver thread. */ - if (unlikely(rgi->is_error) && + if (unlikely(rgi->worker_error) && entry->stop_on_error_sub_id == (uint64)ULONGLONG_MAX) entry->stop_on_error_sub_id= sub_id; /* @@ -163,14 +166,14 @@ finish_event_group(THD *thd, int err, uint64 sub_id, thd->clear_error(); thd->get_stmt_da()->reset_diagnostics_area(); - wfc->wakeup_subsequent_commits(err); + wfc->wakeup_subsequent_commits(rgi->worker_error); } static void -signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi) +signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi, int err) { - rgi->is_error= true; + rgi->worker_error= err; rgi->cleanup_context(thd, true); rgi->rli->abort_slave= true; rgi->rli->stop_for_until= false; @@ -294,7 +297,6 @@ handle_rpl_parallel_thread(void *arg) continue; } - err= 0; group_rgi= rgi; gco= rgi->gco; /* Handle a new event group, which will be initiated by a GTID event. */ @@ -346,12 +348,12 @@ handle_rpl_parallel_thread(void *arg) did_enter_cond= true; do { - if (thd->check_killed() && !rgi->is_error) + if (thd->check_killed() && !rgi->worker_error) { DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior_killed"); thd->send_kill_message(); slave_output_error_info(rgi->rli, thd); - signal_error_to_sql_driver_thread(thd, rgi); + signal_error_to_sql_driver_thread(thd, rgi, 1); /* Even though we were killed, we need to continue waiting for the prior event groups to signal that we can continue. Otherwise we @@ -417,7 +419,7 @@ handle_rpl_parallel_thread(void *arg) */ rgi->cleanup_context(thd, true); thd->wait_for_commit_ptr->unregister_wait_for_prior_commit(); - thd->wait_for_commit_ptr->wakeup_subsequent_commits(err); + thd->wait_for_commit_ptr->wakeup_subsequent_commits(rgi->worker_error); } thd->wait_for_commit_ptr= &rgi->commit_orderer; @@ -430,7 +432,7 @@ handle_rpl_parallel_thread(void *arg) { /* Error. */ slave_output_error_info(rgi->rli, thd); - signal_error_to_sql_driver_thread(thd, rgi); + signal_error_to_sql_driver_thread(thd, rgi, 1); } else if (!res) { @@ -460,7 +462,7 @@ handle_rpl_parallel_thread(void *arg) processing between the event groups as a simple way to ensure that everything is stopped and cleaned up correctly. */ - if (!rgi->is_error && !skip_event_group) + if (!rgi->worker_error && !skip_event_group) err= rpt_handle_event(events, rpt); else err= thd->wait_for_prior_commit(); @@ -474,15 +476,15 @@ handle_rpl_parallel_thread(void *arg) events->next= qevs_to_free; qevs_to_free= events; - if (err) + if (unlikely(err) && !rgi->worker_error) { slave_output_error_info(rgi->rli, thd); - signal_error_to_sql_driver_thread(thd, rgi); + signal_error_to_sql_driver_thread(thd, rgi, err); } if (end_of_group) { in_event_group= false; - finish_event_group(thd, err, event_gtid_sub_id, entry, rgi); + finish_event_group(thd, event_gtid_sub_id, entry, rgi); rgi->next= rgis_to_free; rgis_to_free= rgi; group_rgi= rgi= NULL; @@ -541,9 +543,9 @@ handle_rpl_parallel_thread(void *arg) */ mysql_mutex_unlock(&rpt->LOCK_rpl_thread); thd->wait_for_prior_commit(); - finish_event_group(thd, 1, group_rgi->gtid_sub_id, + signal_error_to_sql_driver_thread(thd, group_rgi, 1); + finish_event_group(thd, group_rgi->gtid_sub_id, group_rgi->parallel_entry, group_rgi); - signal_error_to_sql_driver_thread(thd, group_rgi); in_event_group= false; mysql_mutex_lock(&rpt->LOCK_rpl_thread); rpt->free_rgi(group_rgi); diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index c9b4153d28e..0d0c8c9df70 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -1489,7 +1489,7 @@ rpl_group_info::reinit(Relay_log_info *rli) tables_to_lock_count= 0; trans_retries= 0; last_event_start_time= 0; - is_error= false; + worker_error= 0; row_stmt_start_timestamp= 0; long_find_row_note_printed= false; did_mark_start_commit= false; diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h index 100ce25fe9c..48193afce4d 100644 --- a/sql/rpl_rli.h +++ b/sql/rpl_rli.h @@ -569,7 +569,7 @@ struct rpl_group_info */ char future_event_master_log_name[FN_REFLEN]; bool is_parallel_exec; - bool is_error; + int worker_error; /* Set true when we signalled that we reach the commit phase. Used to avoid counting one event group twice. |