summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorunknown <knielsen@knielsen-hq.org>2013-10-08 14:36:06 +0200
committerunknown <knielsen@knielsen-hq.org>2013-10-08 14:36:06 +0200
commit12c760ef71167a1ce6e1adaa084fb196b88e2e55 (patch)
treeefd36c70ae7510e9ba220af9e0ba7dc38497025a
parent45c3c71513b68b8de79f3e0a5e9779e7e8021716 (diff)
downloadmariadb-git-12c760ef71167a1ce6e1adaa084fb196b88e2e55.tar.gz
MDEV-4506: Parallel replication.
Improve STOP SLAVE in parallel mode. Now, the parallel part will queue the current event group to the end, and then stop queueing any more events. Each worker will complete the current event group, and then just skip any further queued events.
-rw-r--r--mysql-test/suite/rpl/r/rpl_parallel.result45
-rw-r--r--mysql-test/suite/rpl/t/rpl_parallel.test68
-rw-r--r--sql/rpl_parallel.cc56
-rw-r--r--sql/rpl_parallel.h1
4 files changed, 161 insertions, 9 deletions
diff --git a/mysql-test/suite/rpl/r/rpl_parallel.result b/mysql-test/suite/rpl/r/rpl_parallel.result
index 2e13d914e62..03b102a1af9 100644
--- a/mysql-test/suite/rpl/r/rpl_parallel.result
+++ b/mysql-test/suite/rpl/r/rpl_parallel.result
@@ -117,7 +117,6 @@ include/start_slave.inc
FLUSH LOGS;
CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
INSERT INTO t3 VALUES (1,1), (3,3), (5,5), (7,7);
-SET binlog_format=@old_format;
BEGIN;
INSERT INTO t3 VALUES (2,102);
BEGIN;
@@ -211,6 +210,50 @@ slave-bin.000003 # Query # # use `test`; INSERT INTO t3 VALUES (6, foo(16,
'group_commit_waiting_for_prior SIGNAL slave_queued3',
''))
slave-bin.000003 # Xid # # COMMIT /* XID */
+*** Test STOP SLAVE in parallel mode ***
+include/stop_slave.inc
+SET binlog_direct_non_transactional_updates=0;
+SET sql_log_bin=0;
+CALL mtr.add_suppression("Statement is unsafe because it accesses a non-transactional table after accessing a transactional table within the same transaction");
+SET sql_log_bin=1;
+BEGIN;
+INSERT INTO t2 VALUES (20);
+INSERT INTO t1 VALUES (20);
+INSERT INTO t2 VALUES (21);
+INSERT INTO t3 VALUES (20, 20);
+COMMIT;
+INSERT INTO t3 VALUES(21, 21);
+INSERT INTO t3 VALUES(22, 22);
+SET binlog_format=@old_format;
+BEGIN;
+INSERT INTO t2 VALUES (21);
+START SLAVE;
+STOP SLAVE;
+ROLLBACK;
+include/wait_for_slave_to_stop.inc
+SELECT * FROM t1 WHERE a >= 20 ORDER BY a;
+a
+20
+SELECT * FROM t2 WHERE a >= 20 ORDER BY a;
+a
+20
+21
+SELECT * FROM t3 WHERE a >= 20 ORDER BY a;
+a b
+20 20
+include/start_slave.inc
+SELECT * FROM t1 WHERE a >= 20 ORDER BY a;
+a
+20
+SELECT * FROM t2 WHERE a >= 20 ORDER BY a;
+a
+20
+21
+SELECT * FROM t3 WHERE a >= 20 ORDER BY a;
+a b
+20 20
+21 21
+22 22
include/stop_slave.inc
SET GLOBAL binlog_format=@old_format;
SET GLOBAL slave_parallel_threads=0;
diff --git a/mysql-test/suite/rpl/t/rpl_parallel.test b/mysql-test/suite/rpl/t/rpl_parallel.test
index bfc6283da66..89834b790d6 100644
--- a/mysql-test/suite/rpl/t/rpl_parallel.test
+++ b/mysql-test/suite/rpl/t/rpl_parallel.test
@@ -165,7 +165,6 @@ CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
# Create some sentinel rows so that the rows inserted in parallel fall into
# separate gaps and do not cause gap lock conflicts.
INSERT INTO t3 VALUES (1,1), (3,3), (5,5), (7,7);
-SET binlog_format=@old_format;
--save_master_pos
--connection server_2
--sync_with_master
@@ -264,6 +263,73 @@ SELECT * FROM t3 ORDER BY a;
--source include/show_binlog_events.inc
+--echo *** Test STOP SLAVE in parallel mode ***
+--connection server_2
+--source include/stop_slave.inc
+
+--connection server_1
+# Set up a couple of transactions. The first will be blocked halfway
+# through on a lock, and while it is blocked we initiate STOP SLAVE.
+# We then test that the halfway-initiated transaction is allowed to
+# complete, but no subsequent ones.
+# We have to use statement-based mode and set
+# binlog_direct_non_transactional_updates=0; otherwise the binlog will
+# be split into two event groups, one for the MyISAM part and one for the
+# InnoDB part.
+SET binlog_direct_non_transactional_updates=0;
+SET sql_log_bin=0;
+CALL mtr.add_suppression("Statement is unsafe because it accesses a non-transactional table after accessing a transactional table within the same transaction");
+SET sql_log_bin=1;
+BEGIN;
+INSERT INTO t2 VALUES (20);
+--disable_warnings
+INSERT INTO t1 VALUES (20);
+--enable_warnings
+INSERT INTO t2 VALUES (21);
+INSERT INTO t3 VALUES (20, 20);
+COMMIT;
+INSERT INTO t3 VALUES(21, 21);
+INSERT INTO t3 VALUES(22, 22);
+SET binlog_format=@old_format;
+--save_master_pos
+
+# Start a connection that will block the replicated transaction halfway.
+--connection con_temp1
+BEGIN;
+INSERT INTO t2 VALUES (21);
+
+--connection server_2
+START SLAVE;
+# Wait for the MyISAM change to be visible, after which replication will wait
+# for con_temp1 to roll back.
+--let $wait_condition= SELECT COUNT(*) = 1 FROM t1 WHERE a=20
+--source include/wait_condition.inc
+
+--connection con_temp2
+# Initiate slave stop. It will have to wait for the current event group
+# to complete.
+send STOP SLAVE;
+
+--connection con_temp1
+ROLLBACK;
+
+--connection con_temp2
+reap;
+
+--connection server_2
+--source include/wait_for_slave_to_stop.inc
+# We should see the first transaction applied, but not the two others.
+SELECT * FROM t1 WHERE a >= 20 ORDER BY a;
+SELECT * FROM t2 WHERE a >= 20 ORDER BY a;
+SELECT * FROM t3 WHERE a >= 20 ORDER BY a;
+
+--source include/start_slave.inc
+--sync_with_master
+SELECT * FROM t1 WHERE a >= 20 ORDER BY a;
+SELECT * FROM t2 WHERE a >= 20 ORDER BY a;
+SELECT * FROM t3 WHERE a >= 20 ORDER BY a;
+
+
--connection server_2
--source include/stop_slave.inc
SET GLOBAL binlog_format=@old_format;
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 6c8c5b5c3fa..e80512a3580 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -77,6 +77,28 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev,
}
+static bool
+sql_worker_killed(THD *thd, rpl_group_info *rgi, bool in_event_group)
+{
+ if (!rgi->rli->abort_slave && !abort_loop)
+ return false;
+
+ /*
+ Do not abort in the middle of an event group that cannot be rolled back.
+ */
+ if ((thd->transaction.all.modified_non_trans_table ||
+ (thd->variables.option_bits & OPTION_KEEP_LOG))
+ && in_event_group)
+ return false;
+ /* ToDo: should we add some timeout like in sql_slave_killed?
+ if (rgi->last_event_start_time == 0)
+ rgi->last_event_start_time= my_time(0);
+ */
+
+ return true;
+}
+
+
pthread_handler_t
handle_rpl_parallel_thread(void *arg)
{
@@ -131,7 +153,6 @@ handle_rpl_parallel_thread(void *arg)
"Waiting for work from SQL thread");
while (!(events= rpt->event_queue) && !rpt->stop && !thd->killed)
mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread);
- /* Mark that this thread is now executing */
rpt->event_queue= rpt->last_in_queue= NULL;
thd->exit_cond(old_msg);
@@ -159,7 +180,7 @@ handle_rpl_parallel_thread(void *arg)
(0 != (static_cast<Gtid_log_event *>(events->ev)->flags2 &
Gtid_log_event::FL_STANDALONE));
- /* Save this, as it gets cleared once event group commits. */
+ /* Save this, as it gets cleared when the event group commits. */
event_gtid_sub_id= rgi->gtid_sub_id;
rgi->thd= thd;
@@ -197,7 +218,16 @@ handle_rpl_parallel_thread(void *arg)
thd->wait_for_commit_ptr= &rgi->commit_orderer;
}
- rpt_handle_event(events, rpt);
+ /*
+ If the SQL thread is stopping, we just skip execution of all the
+ following event groups. We still do all the normal waiting and wakeup
+ processing between the event groups as a simple way to ensure that
+ everything is stopped and cleaned up correctly.
+ */
+ if (!sql_worker_killed(thd, rgi, in_event_group))
+ rpt_handle_event(events, rpt);
+ else
+ thd->wait_for_prior_commit();
end_of_group=
in_event_group &&
@@ -207,7 +237,6 @@ handle_rpl_parallel_thread(void *arg)
(!strcmp("COMMIT", ((Query_log_event *)events->ev)->query) ||
!strcmp("ROLLBACK", ((Query_log_event *)events->ev)->query))));
- /* ToDo: must use rgi here, not rli, for thread safety. */
delete_or_keep_event_post_apply(rgi, event_type, events->ev);
my_free(events);
@@ -516,7 +545,7 @@ free_rpl_parallel_entry(void *element)
rpl_parallel::rpl_parallel() :
- current(NULL)
+ current(NULL), sql_thread_stopping(false)
{
my_hash_init(&domain_hash, &my_charset_bin, 32,
offsetof(rpl_parallel_entry, domain_id), sizeof(uint32),
@@ -529,6 +558,7 @@ rpl_parallel::reset()
{
my_hash_reset(&domain_hash);
current= NULL;
+ sql_thread_stopping= false;
}
@@ -591,10 +621,22 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev)
rpl_group_info *rgi= NULL;
Relay_log_info *rli= serial_rgi->rli;
enum Log_event_type typ;
+ bool is_group_event;
/* ToDo: what to do with this lock?!? */
mysql_mutex_unlock(&rli->data_lock);
+ /*
+ Stop queueing additional event groups once the SQL thread is requested to
+ stop.
+ */
+ if (((typ= ev->get_type_code()) == GTID_EVENT ||
+ !(is_group_event= Log_event::is_group_event(typ))) &&
+ rli->abort_slave)
+ sql_thread_stopping= true;
+ if (sql_thread_stopping)
+ return false;
+
if (!(qev= (rpl_parallel_thread::queued_event *)my_malloc(sizeof(*qev),
MYF(0))))
{
@@ -604,7 +646,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev)
qev->ev= ev;
qev->next= NULL;
- if ((typ= ev->get_type_code()) == GTID_EVENT)
+ if (typ == GTID_EVENT)
{
Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
@@ -714,7 +756,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev)
e->current_sub_id= rgi->gtid_sub_id;
current= rgi->parallel_entry= e;
}
- else if (!Log_event::is_group_event(typ) || !current)
+ else if (!is_group_event || !current)
{
/*
Events like ROTATE and FORMAT_DESCRIPTION. Do not run in worker thread.
diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h
index 8dfd0297199..b9106392faf 100644
--- a/sql/rpl_parallel.h
+++ b/sql/rpl_parallel.h
@@ -76,6 +76,7 @@ struct rpl_parallel_entry {
struct rpl_parallel {
HASH domain_hash;
rpl_parallel_entry *current;
+ bool sql_thread_stopping;
rpl_parallel();
~rpl_parallel();