summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--mysql-test/suite/rpl/r/rpl_parallel.result45
-rw-r--r--mysql-test/suite/rpl/t/rpl_parallel.test68
-rw-r--r--sql/rpl_parallel.cc56
-rw-r--r--sql/rpl_parallel.h1
4 files changed, 161 insertions, 9 deletions
diff --git a/mysql-test/suite/rpl/r/rpl_parallel.result b/mysql-test/suite/rpl/r/rpl_parallel.result
index 2e13d914e62..03b102a1af9 100644
--- a/mysql-test/suite/rpl/r/rpl_parallel.result
+++ b/mysql-test/suite/rpl/r/rpl_parallel.result
@@ -117,7 +117,6 @@ include/start_slave.inc
FLUSH LOGS;
CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
INSERT INTO t3 VALUES (1,1), (3,3), (5,5), (7,7);
-SET binlog_format=@old_format;
BEGIN;
INSERT INTO t3 VALUES (2,102);
BEGIN;
@@ -211,6 +210,50 @@ slave-bin.000003 # Query # # use `test`; INSERT INTO t3 VALUES (6, foo(16,
'group_commit_waiting_for_prior SIGNAL slave_queued3',
''))
slave-bin.000003 # Xid # # COMMIT /* XID */
+*** Test STOP SLAVE in parallel mode ***
+include/stop_slave.inc
+SET binlog_direct_non_transactional_updates=0;
+SET sql_log_bin=0;
+CALL mtr.add_suppression("Statement is unsafe because it accesses a non-transactional table after accessing a transactional table within the same transaction");
+SET sql_log_bin=1;
+BEGIN;
+INSERT INTO t2 VALUES (20);
+INSERT INTO t1 VALUES (20);
+INSERT INTO t2 VALUES (21);
+INSERT INTO t3 VALUES (20, 20);
+COMMIT;
+INSERT INTO t3 VALUES(21, 21);
+INSERT INTO t3 VALUES(22, 22);
+SET binlog_format=@old_format;
+BEGIN;
+INSERT INTO t2 VALUES (21);
+START SLAVE;
+STOP SLAVE;
+ROLLBACK;
+include/wait_for_slave_to_stop.inc
+SELECT * FROM t1 WHERE a >= 20 ORDER BY a;
+a
+20
+SELECT * FROM t2 WHERE a >= 20 ORDER BY a;
+a
+20
+21
+SELECT * FROM t3 WHERE a >= 20 ORDER BY a;
+a b
+20 20
+include/start_slave.inc
+SELECT * FROM t1 WHERE a >= 20 ORDER BY a;
+a
+20
+SELECT * FROM t2 WHERE a >= 20 ORDER BY a;
+a
+20
+21
+SELECT * FROM t3 WHERE a >= 20 ORDER BY a;
+a b
+20 20
+21 21
+22 22
include/stop_slave.inc
SET GLOBAL binlog_format=@old_format;
SET GLOBAL slave_parallel_threads=0;
diff --git a/mysql-test/suite/rpl/t/rpl_parallel.test b/mysql-test/suite/rpl/t/rpl_parallel.test
index bfc6283da66..89834b790d6 100644
--- a/mysql-test/suite/rpl/t/rpl_parallel.test
+++ b/mysql-test/suite/rpl/t/rpl_parallel.test
@@ -165,7 +165,6 @@ CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
# Create some sentinel rows so that the rows inserted in parallel fall into
# separate gaps and do not cause gap lock conflicts.
INSERT INTO t3 VALUES (1,1), (3,3), (5,5), (7,7);
-SET binlog_format=@old_format;
--save_master_pos
--connection server_2
--sync_with_master
@@ -264,6 +263,73 @@ SELECT * FROM t3 ORDER BY a;
--source include/show_binlog_events.inc
+--echo *** Test STOP SLAVE in parallel mode ***
+--connection server_2
+--source include/stop_slave.inc
+
+--connection server_1
+# Set up a couple of transactions. The first will be blocked halfway
+# through on a lock, and while it is blocked we initiate STOP SLAVE.
+# We then test that the halfway-initiated transaction is allowed to
+# complete, but no subsequent ones.
+# We have to use statement-based mode and set
+# binlog_direct_non_transactional_updates=0; otherwise the binlog will
+# be split into two event groups, one for the MyISAM part and one for the
+# InnoDB part.
+SET binlog_direct_non_transactional_updates=0;
+SET sql_log_bin=0;
+CALL mtr.add_suppression("Statement is unsafe because it accesses a non-transactional table after accessing a transactional table within the same transaction");
+SET sql_log_bin=1;
+BEGIN;
+INSERT INTO t2 VALUES (20);
+--disable_warnings
+INSERT INTO t1 VALUES (20);
+--disable_warnings
+INSERT INTO t2 VALUES (21);
+INSERT INTO t3 VALUES (20, 20);
+COMMIT;
+INSERT INTO t3 VALUES(21, 21);
+INSERT INTO t3 VALUES(22, 22);
+SET binlog_format=@old_format;
+--save_master_pos
+
+# Start a connection that will block the replicated transaction halfway.
+--connection con_temp1
+BEGIN;
+INSERT INTO t2 VALUES (21);
+
+--connection server_2
+START SLAVE;
+# Wait for the MyISAM change to be visible, after which replication will wait
+# for con_temp1 to roll back.
+--let $wait_condition= SELECT COUNT(*) = 1 FROM t1 WHERE a=20
+--source include/wait_condition.inc
+
+--connection con_temp2
+# Initiate slave stop. It will have to wait for the current event group
+# to complete.
+send STOP SLAVE;
+
+--connection con_temp1
+ROLLBACK;
+
+--connection con_temp2
+reap;
+
+--connection server_2
+--source include/wait_for_slave_to_stop.inc
+# We should see the first transaction applied, but not the two others.
+SELECT * FROM t1 WHERE a >= 20 ORDER BY a;
+SELECT * FROM t2 WHERE a >= 20 ORDER BY a;
+SELECT * FROM t3 WHERE a >= 20 ORDER BY a;
+
+--source include/start_slave.inc
+--sync_with_master
+SELECT * FROM t1 WHERE a >= 20 ORDER BY a;
+SELECT * FROM t2 WHERE a >= 20 ORDER BY a;
+SELECT * FROM t3 WHERE a >= 20 ORDER BY a;
+
+
--connection server_2
--source include/stop_slave.inc
SET GLOBAL binlog_format=@old_format;
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 6c8c5b5c3fa..e80512a3580 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -77,6 +77,28 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev,
}
+static bool
+sql_worker_killed(THD *thd, rpl_group_info *rgi, bool in_event_group)
+{
+ if (!rgi->rli->abort_slave && !abort_loop)
+ return false;
+
+ /*
+ Do not abort in the middle of an event group that cannot be rolled back.
+ */
+ if ((thd->transaction.all.modified_non_trans_table ||
+ (thd->variables.option_bits & OPTION_KEEP_LOG))
+ && in_event_group)
+ return false;
+ /* ToDo: should we add some timeout like in sql_slave_killed?
+ if (rgi->last_event_start_time == 0)
+ rgi->last_event_start_time= my_time(0);
+ */
+
+ return true;
+}
+
+
pthread_handler_t
handle_rpl_parallel_thread(void *arg)
{
@@ -131,7 +153,6 @@ handle_rpl_parallel_thread(void *arg)
"Waiting for work from SQL thread");
while (!(events= rpt->event_queue) && !rpt->stop && !thd->killed)
mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread);
- /* Mark that this thread is now executing */
rpt->event_queue= rpt->last_in_queue= NULL;
thd->exit_cond(old_msg);
@@ -159,7 +180,7 @@ handle_rpl_parallel_thread(void *arg)
(0 != (static_cast<Gtid_log_event *>(events->ev)->flags2 &
Gtid_log_event::FL_STANDALONE));
- /* Save this, as it gets cleared once event group commits. */
+ /* Save this, as it gets cleared when the event group commits. */
event_gtid_sub_id= rgi->gtid_sub_id;
rgi->thd= thd;
@@ -197,7 +218,16 @@ handle_rpl_parallel_thread(void *arg)
thd->wait_for_commit_ptr= &rgi->commit_orderer;
}
- rpt_handle_event(events, rpt);
+ /*
+ If the SQL thread is stopping, we just skip execution of all the
+ following event groups. We still do all the normal waiting and wakeup
+ processing between the event groups as a simple way to ensure that
+ everything is stopped and cleaned up correctly.
+ */
+ if (!sql_worker_killed(thd, rgi, in_event_group))
+ rpt_handle_event(events, rpt);
+ else
+ thd->wait_for_prior_commit();
end_of_group=
in_event_group &&
@@ -207,7 +237,6 @@ handle_rpl_parallel_thread(void *arg)
(!strcmp("COMMIT", ((Query_log_event *)events->ev)->query) ||
!strcmp("ROLLBACK", ((Query_log_event *)events->ev)->query))));
- /* ToDo: must use rgi here, not rli, for thread safety. */
delete_or_keep_event_post_apply(rgi, event_type, events->ev);
my_free(events);
@@ -516,7 +545,7 @@ free_rpl_parallel_entry(void *element)
rpl_parallel::rpl_parallel() :
- current(NULL)
+ current(NULL), sql_thread_stopping(false)
{
my_hash_init(&domain_hash, &my_charset_bin, 32,
offsetof(rpl_parallel_entry, domain_id), sizeof(uint32),
@@ -529,6 +558,7 @@ rpl_parallel::reset()
{
my_hash_reset(&domain_hash);
current= NULL;
+ sql_thread_stopping= false;
}
@@ -591,10 +621,22 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev)
rpl_group_info *rgi= NULL;
Relay_log_info *rli= serial_rgi->rli;
enum Log_event_type typ;
+ bool is_group_event;
/* ToDo: what to do with this lock?!? */
mysql_mutex_unlock(&rli->data_lock);
+ /*
+ Stop queueing additional event groups once the SQL thread is requested to
+ stop.
+ */
+ if (((typ= ev->get_type_code()) == GTID_EVENT ||
+ !(is_group_event= Log_event::is_group_event(typ))) &&
+ rli->abort_slave)
+ sql_thread_stopping= true;
+ if (sql_thread_stopping)
+ return false;
+
if (!(qev= (rpl_parallel_thread::queued_event *)my_malloc(sizeof(*qev),
MYF(0))))
{
@@ -604,7 +646,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev)
qev->ev= ev;
qev->next= NULL;
- if ((typ= ev->get_type_code()) == GTID_EVENT)
+ if (typ == GTID_EVENT)
{
Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
@@ -714,7 +756,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev)
e->current_sub_id= rgi->gtid_sub_id;
current= rgi->parallel_entry= e;
}
- else if (!Log_event::is_group_event(typ) || !current)
+ else if (!is_group_event || !current)
{
/*
Events like ROTATE and FORMAT_DESCRIPTION. Do not run in worker thread.
diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h
index 8dfd0297199..b9106392faf 100644
--- a/sql/rpl_parallel.h
+++ b/sql/rpl_parallel.h
@@ -76,6 +76,7 @@ struct rpl_parallel_entry {
struct rpl_parallel {
HASH domain_hash;
rpl_parallel_entry *current;
+ bool sql_thread_stopping;
rpl_parallel();
~rpl_parallel();