diff options
-rw-r--r-- | mysql-test/suite/rpl/r/rpl_parallel.result | 41 | ||||
-rw-r--r-- | mysql-test/suite/rpl/t/rpl_parallel.test | 58 | ||||
-rw-r--r-- | sql/rpl_parallel.cc | 30 | ||||
-rw-r--r-- | sql/rpl_parallel.h | 10 |
4 files changed, 131 insertions, 8 deletions
diff --git a/mysql-test/suite/rpl/r/rpl_parallel.result b/mysql-test/suite/rpl/r/rpl_parallel.result index 198b3d32619..fb86d46b01e 100644 --- a/mysql-test/suite/rpl/r/rpl_parallel.result +++ b/mysql-test/suite/rpl/r/rpl_parallel.result @@ -882,6 +882,47 @@ a SELECT * FROM t6 ORDER BY a; a 4 +*** MDEV-6551: Some replication errors are ignored if slave_parallel_threads > 0 *** +INSERT INTO t2 VALUES (31); +include/save_master_gtid.inc +include/sync_with_master_gtid.inc +include/stop_slave.inc +SET GLOBAL slave_parallel_threads= 0; +include/start_slave.inc +SET sql_log_bin= 0; +INSERT INTO t2 VALUES (32); +SET sql_log_bin= 1; +INSERT INTO t2 VALUES (32); +FLUSH LOGS; +INSERT INTO t2 VALUES (33); +INSERT INTO t2 VALUES (34); +SELECT * FROM t2 WHERE a >= 30 ORDER BY a; +a +31 +32 +33 +34 +include/save_master_gtid.inc +include/wait_for_slave_sql_error.inc [errno=1062] +include/stop_slave_io.inc +SET GLOBAL slave_parallel_threads=10; +START SLAVE; +include/wait_for_slave_sql_error.inc [errno=1062] +START SLAVE SQL_THREAD; +include/wait_for_slave_sql_error.inc [errno=1062] +SELECT * FROM t2 WHERE a >= 30 ORDER BY a; +a +31 +32 +SET sql_slave_skip_counter= 1; +include/start_slave.inc +include/sync_with_master_gtid.inc +SELECT * FROM t2 WHERE a >= 30 ORDER BY a; +a +31 +32 +33 +34 include/stop_slave.inc SET GLOBAL slave_parallel_threads=@old_parallel_threads; include/start_slave.inc diff --git a/mysql-test/suite/rpl/t/rpl_parallel.test b/mysql-test/suite/rpl/t/rpl_parallel.test index f834dbef2fd..4f01ef7765b 100644 --- a/mysql-test/suite/rpl/t/rpl_parallel.test +++ b/mysql-test/suite/rpl/t/rpl_parallel.test @@ -1408,6 +1408,64 @@ SELECT * FROM t6 ORDER BY a; SELECT * FROM t6 ORDER BY a; +--echo *** MDEV-6551: Some replication errors are ignored if slave_parallel_threads > 0 *** + +--connection server_1 +INSERT INTO t2 VALUES (31); +--source include/save_master_gtid.inc + +--connection server_2 +--source include/sync_with_master_gtid.inc +--source include/stop_slave.inc +SET GLOBAL slave_parallel_threads= 0; +--source include/start_slave.inc + +# Force a duplicate key error on the slave. +SET sql_log_bin= 0; +INSERT INTO t2 VALUES (32); +SET sql_log_bin= 1; + +--connection server_1 +INSERT INTO t2 VALUES (32); +# Rotate the binlog; the bug is triggered when the master binlog file changes +# after the event group that causes the duplicate key error. +FLUSH LOGS; +INSERT INTO t2 VALUES (33); +INSERT INTO t2 VALUES (34); +SELECT * FROM t2 WHERE a >= 30 ORDER BY a; +--source include/save_master_gtid.inc + +--connection server_2 +--let $slave_sql_errno= 1062 +--source include/wait_for_slave_sql_error.inc + +--connection server_2 +--source include/stop_slave_io.inc +SET GLOBAL slave_parallel_threads=10; +START SLAVE; + +--let $slave_sql_errno= 1062 +--source include/wait_for_slave_sql_error.inc + +# Note: IO thread is still running at this point. +# The bug seems to have been that restarting the SQL thread after an error with +# the IO thread still running, somehow picks up a later relay log position and +# thus ends up skipping the failing event, rather than re-executing. + +START SLAVE SQL_THREAD; +--let $slave_sql_errno= 1062 +--source include/wait_for_slave_sql_error.inc + +SELECT * FROM t2 WHERE a >= 30 ORDER BY a; + +# Skip the duplicate error, so we can proceed. +SET sql_slave_skip_counter= 1; +--source include/start_slave.inc +--source include/sync_with_master_gtid.inc + +SELECT * FROM t2 WHERE a >= 30 ORDER BY a; + + --connection server_2 --source include/stop_slave.inc SET GLOBAL slave_parallel_threads=@old_parallel_threads; diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 91bd636d3f5..eeb66821809 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -22,18 +22,22 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev, rpl_group_info *rgi= qev->rgi; Relay_log_info *rli= rgi->rli; THD *thd= rgi->thd; + Log_event *ev; + + DBUG_ASSERT(qev->typ == rpl_parallel_thread::queued_event::QUEUED_EVENT); + ev= qev->ev; thd->system_thread_info.rpl_sql_info->rpl_filter = rli->mi->rpl_filter; + ev->thd= thd; - /* ToDo: Access to thd, and what about rli, split out a parallel part? */ - mysql_mutex_lock(&rli->data_lock); - qev->ev->thd= thd; strcpy(rgi->event_relay_log_name_buf, qev->event_relay_log_name); rgi->event_relay_log_name= rgi->event_relay_log_name_buf; rgi->event_relay_log_pos= qev->event_relay_log_pos; rgi->future_event_relay_log_pos= qev->future_event_relay_log_pos; strcpy(rgi->future_event_master_log_name, qev->future_event_master_log_name); - err= apply_event_and_update_pos(qev->ev, thd, rgi, rpt); + mysql_mutex_lock(&rli->data_lock); + /* Mutex will be released in apply_event_and_update_pos(). */ + err= apply_event_and_update_pos(ev, thd, rgi, rpt); thread_safe_increment64(&rli->executed_entries, &slave_executed_entries_lock); @@ -47,6 +51,8 @@ handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev) { int cmp; Relay_log_info *rli; + rpl_parallel_entry *e; + /* Events that are not part of an event group, such as Format Description, Stop, GTID List and such, are executed directly in the driver SQL thread, @@ -57,6 +63,13 @@ handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev) if ((thd->variables.option_bits & OPTION_BEGIN) && opt_using_transactions) return; + + /* Do not update position if an earlier event group caused an error abort. */ + DBUG_ASSERT(qev->typ == rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE); + e= qev->entry_for_queued; + if (e->stop_on_error_sub_id < (uint64)ULONGLONG_MAX || e->force_abort) + return; + rli= qev->rgi->rli; mysql_mutex_lock(&rli->data_lock); cmp= strcmp(rli->group_relay_log_name, qev->event_relay_log_name); @@ -566,7 +579,7 @@ handle_rpl_parallel_thread(void *arg) bool end_of_group, group_ending; total_event_size+= events->event_size; - if (!events->ev) + if (events->typ == rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE) { handle_queued_pos_update(thd, events); events->next= qevs_to_free; @@ -574,6 +587,7 @@ handle_rpl_parallel_thread(void *arg) events= next; continue; } + DBUG_ASSERT(events->typ==rpl_parallel_thread::queued_event::QUEUED_EVENT); thd->rgi_slave= group_rgi= rgi; gco= rgi->gco; @@ -1082,6 +1096,7 @@ rpl_parallel_thread::get_qev_common(Log_event *ev, ulonglong event_size) my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*qev)); return NULL; } + qev->typ= rpl_parallel_thread::queued_event::QUEUED_EVENT; qev->ev= ev; qev->event_size= event_size; qev->next= NULL; @@ -1824,7 +1839,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, return 1; } /* - Queue an empty event, so that the position will be updated in a + Queue a position update, so that the position will be updated in a reasonable way relative to other events: - If the currently executing events are queued serially for a single @@ -1835,7 +1850,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, least the position will not be updated until one of them has reached the current point. */ - qev->ev= NULL; + qev->typ= rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE; + qev->entry_for_queued= e; } else { diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h index 415259cd3c4..c7e15528e97 100644 --- a/sql/rpl_parallel.h +++ b/sql/rpl_parallel.h @@ -72,7 +72,15 @@ struct rpl_parallel_thread { rpl_parallel_entry *current_entry; struct queued_event { queued_event *next; - Log_event *ev; + /* + queued_event can hold either an event to be executed, or just a binlog + position to be updated without any associated event. + */ + enum queued_event_t { QUEUED_EVENT, QUEUED_POS_UPDATE } typ; + union { + Log_event *ev; /* QUEUED_EVENT */ + rpl_parallel_entry *entry_for_queued; /* QUEUED_POS_UPDATE */ + }; rpl_group_info *rgi; inuse_relaylog *ir; ulonglong future_event_relay_log_pos; |