diff options
-rw-r--r-- | mysql-test/suite/rpl/r/rpl_parallel.result | 45 | ||||
-rw-r--r-- | mysql-test/suite/rpl/t/rpl_parallel.test | 68 | ||||
-rw-r--r-- | sql/rpl_parallel.cc | 56 | ||||
-rw-r--r-- | sql/rpl_parallel.h | 1 |
4 files changed, 161 insertions, 9 deletions
diff --git a/mysql-test/suite/rpl/r/rpl_parallel.result b/mysql-test/suite/rpl/r/rpl_parallel.result index 2e13d914e62..03b102a1af9 100644 --- a/mysql-test/suite/rpl/r/rpl_parallel.result +++ b/mysql-test/suite/rpl/r/rpl_parallel.result @@ -117,7 +117,6 @@ include/start_slave.inc FLUSH LOGS; CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB; INSERT INTO t3 VALUES (1,1), (3,3), (5,5), (7,7); -SET binlog_format=@old_format; BEGIN; INSERT INTO t3 VALUES (2,102); BEGIN; @@ -211,6 +210,50 @@ slave-bin.000003 # Query # # use `test`; INSERT INTO t3 VALUES (6, foo(16, 'group_commit_waiting_for_prior SIGNAL slave_queued3', '')) slave-bin.000003 # Xid # # COMMIT /* XID */ +*** Test STOP SLAVE in parallel mode *** +include/stop_slave.inc +SET binlog_direct_non_transactional_updates=0; +SET sql_log_bin=0; +CALL mtr.add_suppression("Statement is unsafe because it accesses a non-transactional table after accessing a transactional table within the same transaction"); +SET sql_log_bin=1; +BEGIN; +INSERT INTO t2 VALUES (20); +INSERT INTO t1 VALUES (20); +INSERT INTO t2 VALUES (21); +INSERT INTO t3 VALUES (20, 20); +COMMIT; +INSERT INTO t3 VALUES(21, 21); +INSERT INTO t3 VALUES(22, 22); +SET binlog_format=@old_format; +BEGIN; +INSERT INTO t2 VALUES (21); +START SLAVE; +STOP SLAVE; +ROLLBACK; +include/wait_for_slave_to_stop.inc +SELECT * FROM t1 WHERE a >= 20 ORDER BY a; +a +20 +SELECT * FROM t2 WHERE a >= 20 ORDER BY a; +a +20 +21 +SELECT * FROM t3 WHERE a >= 20 ORDER BY a; +a b +20 20 +include/start_slave.inc +SELECT * FROM t1 WHERE a >= 20 ORDER BY a; +a +20 +SELECT * FROM t2 WHERE a >= 20 ORDER BY a; +a +20 +21 +SELECT * FROM t3 WHERE a >= 20 ORDER BY a; +a b +20 20 +21 21 +22 22 include/stop_slave.inc SET GLOBAL binlog_format=@old_format; SET GLOBAL slave_parallel_threads=0; diff --git a/mysql-test/suite/rpl/t/rpl_parallel.test b/mysql-test/suite/rpl/t/rpl_parallel.test index bfc6283da66..89834b790d6 100644 --- a/mysql-test/suite/rpl/t/rpl_parallel.test +++ b/mysql-test/suite/rpl/t/rpl_parallel.test @@ -165,7 +165,6 @@ CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB; # Create some sentinel rows so that the rows inserted in parallel fall into # separate gaps and do not cause gap lock conflicts. INSERT INTO t3 VALUES (1,1), (3,3), (5,5), (7,7); -SET binlog_format=@old_format; --save_master_pos --connection server_2 --sync_with_master @@ -264,6 +263,73 @@ SELECT * FROM t3 ORDER BY a; --source include/show_binlog_events.inc +--echo *** Test STOP SLAVE in parallel mode *** +--connection server_2 +--source include/stop_slave.inc + +--connection server_1 +# Set up a couple of transactions. The first will be blocked halfway +# through on a lock, and while it is blocked we initiate STOP SLAVE. +# We then test that the halfway-initiated transaction is allowed to +# complete, but no subsequent ones. +# We have to use statement-based mode and set +# binlog_direct_non_transactional_updates=0; otherwise the binlog will +# be split into two event groups, one for the MyISAM part and one for the +# InnoDB part. +SET binlog_direct_non_transactional_updates=0; +SET sql_log_bin=0; +CALL mtr.add_suppression("Statement is unsafe because it accesses a non-transactional table after accessing a transactional table within the same transaction"); +SET sql_log_bin=1; +BEGIN; +INSERT INTO t2 VALUES (20); +--disable_warnings +INSERT INTO t1 VALUES (20); +--disable_warnings +INSERT INTO t2 VALUES (21); +INSERT INTO t3 VALUES (20, 20); +COMMIT; +INSERT INTO t3 VALUES(21, 21); +INSERT INTO t3 VALUES(22, 22); +SET binlog_format=@old_format; +--save_master_pos + +# Start a connection that will block the replicated transaction halfway. +--connection con_temp1 +BEGIN; +INSERT INTO t2 VALUES (21); + +--connection server_2 +START SLAVE; +# Wait for the MyISAM change to be visible, after which replication will wait +# for con_temp1 to roll back. +--let $wait_condition= SELECT COUNT(*) = 1 FROM t1 WHERE a=20 +--source include/wait_condition.inc + +--connection con_temp2 +# Initiate slave stop. It will have to wait for the current event group +# to complete. +send STOP SLAVE; + +--connection con_temp1 +ROLLBACK; + +--connection con_temp2 +reap; + +--connection server_2 +--source include/wait_for_slave_to_stop.inc +# We should see the first transaction applied, but not the two others. +SELECT * FROM t1 WHERE a >= 20 ORDER BY a; +SELECT * FROM t2 WHERE a >= 20 ORDER BY a; +SELECT * FROM t3 WHERE a >= 20 ORDER BY a; + +--source include/start_slave.inc +--sync_with_master +SELECT * FROM t1 WHERE a >= 20 ORDER BY a; +SELECT * FROM t2 WHERE a >= 20 ORDER BY a; +SELECT * FROM t3 WHERE a >= 20 ORDER BY a; + + --connection server_2 --source include/stop_slave.inc SET GLOBAL binlog_format=@old_format; diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 6c8c5b5c3fa..e80512a3580 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -77,6 +77,28 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev, } +static bool +sql_worker_killed(THD *thd, rpl_group_info *rgi, bool in_event_group) +{ + if (!rgi->rli->abort_slave && !abort_loop) + return false; + + /* + Do not abort in the middle of an event group that cannot be rolled back. + */ + if ((thd->transaction.all.modified_non_trans_table || + (thd->variables.option_bits & OPTION_KEEP_LOG)) + && in_event_group) + return false; + /* ToDo: should we add some timeout like in sql_slave_killed? + if (rgi->last_event_start_time == 0) + rgi->last_event_start_time= my_time(0); + */ + + return true; +} + + pthread_handler_t handle_rpl_parallel_thread(void *arg) { @@ -131,7 +153,6 @@ handle_rpl_parallel_thread(void *arg) "Waiting for work from SQL thread"); while (!(events= rpt->event_queue) && !rpt->stop && !thd->killed) mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread); - /* Mark that this thread is now executing */ rpt->event_queue= rpt->last_in_queue= NULL; thd->exit_cond(old_msg); @@ -159,7 +180,7 @@ handle_rpl_parallel_thread(void *arg) (0 != (static_cast<Gtid_log_event *>(events->ev)->flags2 & Gtid_log_event::FL_STANDALONE)); - /* Save this, as it gets cleared once event group commits. */ + /* Save this, as it gets cleared when the event group commits. */ event_gtid_sub_id= rgi->gtid_sub_id; rgi->thd= thd; @@ -197,7 +218,16 @@ handle_rpl_parallel_thread(void *arg) thd->wait_for_commit_ptr= &rgi->commit_orderer; } - rpt_handle_event(events, rpt); + /* + If the SQL thread is stopping, we just skip execution of all the + following event groups. We still do all the normal waiting and wakeup + processing between the event groups as a simple way to ensure that + everything is stopped and cleaned up correctly. + */ + if (!sql_worker_killed(thd, rgi, in_event_group)) + rpt_handle_event(events, rpt); + else + thd->wait_for_prior_commit(); end_of_group= in_event_group && @@ -207,7 +237,6 @@ handle_rpl_parallel_thread(void *arg) (!strcmp("COMMIT", ((Query_log_event *)events->ev)->query) || !strcmp("ROLLBACK", ((Query_log_event *)events->ev)->query)))); - /* ToDo: must use rgi here, not rli, for thread safety. */ delete_or_keep_event_post_apply(rgi, event_type, events->ev); my_free(events); @@ -516,7 +545,7 @@ free_rpl_parallel_entry(void *element) rpl_parallel::rpl_parallel() : - current(NULL) + current(NULL), sql_thread_stopping(false) { my_hash_init(&domain_hash, &my_charset_bin, 32, offsetof(rpl_parallel_entry, domain_id), sizeof(uint32), @@ -529,6 +558,7 @@ rpl_parallel::reset() { my_hash_reset(&domain_hash); current= NULL; + sql_thread_stopping= false; } @@ -591,10 +621,22 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev) rpl_group_info *rgi= NULL; Relay_log_info *rli= serial_rgi->rli; enum Log_event_type typ; + bool is_group_event; /* ToDo: what to do with this lock?!? */ mysql_mutex_unlock(&rli->data_lock); + /* + Stop queueing additional event groups once the SQL thread is requested to + stop. + */ + if (((typ= ev->get_type_code()) == GTID_EVENT || + !(is_group_event= Log_event::is_group_event(typ))) && + rli->abort_slave) + sql_thread_stopping= true; + if (sql_thread_stopping) + return false; + if (!(qev= (rpl_parallel_thread::queued_event *)my_malloc(sizeof(*qev), MYF(0)))) { @@ -604,7 +646,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev) qev->ev= ev; qev->next= NULL; - if ((typ= ev->get_type_code()) == GTID_EVENT) + if (typ == GTID_EVENT) { Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev); @@ -714,7 +756,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev) e->current_sub_id= rgi->gtid_sub_id; current= rgi->parallel_entry= e; } - else if (!Log_event::is_group_event(typ) || !current) + else if (!is_group_event || !current) { /* Events like ROTATE and FORMAT_DESCRIPTION. Do not run in worker thread. diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h index 8dfd0297199..b9106392faf 100644 --- a/sql/rpl_parallel.h +++ b/sql/rpl_parallel.h @@ -76,6 +76,7 @@ struct rpl_parallel_entry { struct rpl_parallel { HASH domain_hash; rpl_parallel_entry *current; + bool sql_thread_stopping; rpl_parallel(); ~rpl_parallel(); |