diff options
-rw-r--r-- | mysql-test/suite/perfschema/r/stage_mdl_global.result | 1 | ||||
-rw-r--r-- | mysql-test/suite/rpl/r/rpl_parallel2.result | 92 | ||||
-rw-r--r-- | mysql-test/suite/rpl/t/rpl_parallel2.test | 137 | ||||
-rw-r--r-- | sql/mysqld.cc | 3 | ||||
-rw-r--r-- | sql/mysqld.h | 3 | ||||
-rw-r--r-- | sql/rpl_parallel.cc | 416 | ||||
-rw-r--r-- | sql/rpl_parallel.h | 38 | ||||
-rw-r--r-- | sql/rpl_rli.cc | 13 | ||||
-rw-r--r-- | sql/rpl_rli.h | 7 | ||||
-rw-r--r-- | sql/slave.cc | 7 | ||||
-rw-r--r-- | sql/sql_parse.cc | 13 |
11 files changed, 642 insertions, 88 deletions
diff --git a/mysql-test/suite/perfschema/r/stage_mdl_global.result b/mysql-test/suite/perfschema/r/stage_mdl_global.result index de5df8f189a..b476689338e 100644 --- a/mysql-test/suite/perfschema/r/stage_mdl_global.result +++ b/mysql-test/suite/perfschema/r/stage_mdl_global.result @@ -6,6 +6,7 @@ user1 statement/sql/flush flush tables with read lock username event_name nesting_event_type username event_name nesting_event_type user1 stage/sql/init STATEMENT +user1 stage/sql/init STATEMENT user1 stage/sql/Waiting for query cache lock STATEMENT user1 stage/sql/init STATEMENT user1 stage/sql/query end STATEMENT diff --git a/mysql-test/suite/rpl/r/rpl_parallel2.result b/mysql-test/suite/rpl/r/rpl_parallel2.result index de90bcd158f..2ca73738d84 100644 --- a/mysql-test/suite/rpl/r/rpl_parallel2.result +++ b/mysql-test/suite/rpl/r/rpl_parallel2.result @@ -29,8 +29,98 @@ include/start_slave.inc SELECT * FROM t1 WHERE a >= 10 ORDER BY a; a b 10 0 +*** MDEV-7818: Deadlock occurring with parallel replication and FTWRL *** +CREATE TABLE t2 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB; +INSERT INTO t2 VALUES (1,0), (2,0), (3,0); +include/stop_slave.inc +SET @old_dbug= @@SESSION.debug_dbug; +SET @commit_id= 4242; +SET SESSION debug_dbug="+d,binlog_force_commit_id"; +BEGIN; +UPDATE t2 SET b=b+1 WHERE a=2; +COMMIT; +BEGIN; +INSERT INTO t2 VALUES (4,10); +COMMIT; +SET SESSION debug_dbug= @old_dbug; +INSERT INTO t2 VALUES (5,0); +INSERT INTO t2 VALUES (6,0); +INSERT INTO t2 VALUES (7,0); +INSERT INTO t2 VALUES (8,0); +INSERT INTO t2 VALUES (9,0); +INSERT INTO t2 VALUES (10,0); +INSERT INTO t2 VALUES (11,0); +INSERT INTO t2 VALUES (12,0); +INSERT INTO t2 VALUES (13,0); +INSERT INTO t2 VALUES (14,0); +INSERT INTO t2 VALUES (15,0); +INSERT INTO t2 VALUES (16,0); +INSERT INTO t2 VALUES (17,0); +INSERT INTO t2 VALUES (18,0); +INSERT INTO t2 VALUES (19,0); +BEGIN; +SELECT * FROM t2 WHERE a=2 FOR UPDATE; +a b +2 0 +include/start_slave.inc +FLUSH TABLES WITH READ LOCK; +COMMIT; +STOP SLAVE; +SELECT * FROM t2 ORDER BY a; +a b +1 0 +2 1 +3 0 +4 10 +5 0 +6 0 +7 0 +8 0 +9 0 +10 0 +11 0 +12 0 +13 0 +14 0 +15 0 +16 0 +17 0 +18 0 +19 0 +UNLOCK TABLES; +include/wait_for_slave_to_stop.inc +include/start_slave.inc +SELECT * FROM t2 ORDER BY a; +a b +1 0 +2 1 +3 0 +4 10 +5 0 +6 0 +7 0 +8 0 +9 0 +10 0 +11 0 +12 0 +13 0 +14 0 +15 0 +16 0 +17 0 +18 0 +19 0 +*** MDEV-8318: Assertion `!pool->busy' failed in pool_mark_busy(rpl_parallel_thread_pool*) on concurrent FTWRL *** +LOCK TABLE t2 WRITE; +FLUSH TABLES WITH READ LOCK; +FLUSH TABLES WITH READ LOCK; +KILL QUERY CID; +ERROR 70100: Query execution was interrupted +UNLOCK TABLES; +UNLOCK TABLES; include/stop_slave.inc SET GLOBAL slave_parallel_threads=@old_parallel_threads; include/start_slave.inc -DROP TABLE t1; +DROP TABLE t1, t2; include/rpl_end.inc diff --git a/mysql-test/suite/rpl/t/rpl_parallel2.test b/mysql-test/suite/rpl/t/rpl_parallel2.test index 47b0e87a6b6..50617c63024 100644 --- a/mysql-test/suite/rpl/t/rpl_parallel2.test +++ b/mysql-test/suite/rpl/t/rpl_parallel2.test @@ -1,3 +1,5 @@ +--source include/have_debug.inc +--source include/have_innodb.inc --source include/have_binlog_format_statement.inc --let $rpl_topology=1->2 --source include/rpl_init.inc @@ -78,13 +80,144 @@ SET GLOBAL sql_slave_skip_counter= 1; SELECT * FROM t1 WHERE a >= 10 ORDER BY a; -# Clean up +--echo *** MDEV-7818: Deadlock occurring with parallel replication and FTWRL *** + +--connection server_1 +CREATE TABLE t2 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB; +INSERT INTO t2 VALUES (1,0), (2,0), (3,0); +--save_master_pos + +--connection server_2 +--sync_with_master +--source include/stop_slave.inc + +--connection server_1 +# Create a group commit with two transactions, will be used to provoke the +# problematic thread interaction with FTWRL on the slave. +SET @old_dbug= @@SESSION.debug_dbug; +SET @commit_id= 4242; +SET SESSION debug_dbug="+d,binlog_force_commit_id"; + +BEGIN; +UPDATE t2 SET b=b+1 WHERE a=2; +COMMIT; + +BEGIN; +INSERT INTO t2 VALUES (4,10); +COMMIT; + +SET SESSION debug_dbug= @old_dbug; + +INSERT INTO t2 VALUES (5,0); +INSERT INTO t2 VALUES (6,0); +INSERT INTO t2 VALUES (7,0); +INSERT INTO t2 VALUES (8,0); +INSERT INTO t2 VALUES (9,0); +INSERT INTO t2 VALUES (10,0); +INSERT INTO t2 VALUES (11,0); +INSERT INTO t2 VALUES (12,0); +INSERT INTO t2 VALUES (13,0); +INSERT INTO t2 VALUES (14,0); +INSERT INTO t2 VALUES (15,0); +INSERT INTO t2 VALUES (16,0); +INSERT INTO t2 VALUES (17,0); +INSERT INTO t2 VALUES (18,0); +INSERT INTO t2 VALUES (19,0); +--save_master_pos + +--connection server_2 + +--connect (s1, 127.0.0.1, root,, test, $SLAVE_MYPORT,) +# Block one transaction on a row lock. +BEGIN; +SELECT * FROM t2 WHERE a=2 FOR UPDATE; + +--connection server_2 + +# Wait for slave thread of the other transaction to have the commit lock. +--source include/start_slave.inc +--let $wait_condition= SELECT COUNT(*) > 0 FROM information_schema.processlist WHERE state = "Waiting for prior transaction to commit" +--source include/wait_condition.inc + +--connect (s2, 127.0.0.1, root,, test, $SLAVE_MYPORT,) +send FLUSH TABLES WITH READ LOCK; +# The bug was that at this point we were deadlocked. +# The FTWRL command would wait forever for T2 to commit. +# T2 would wait for T1 to commit first, but T1 is waiting for +# the global read lock to be released. + +--connection s1 +# Release the lock that blocs T1 from replicating. +COMMIT; + +--connection s1 +send STOP SLAVE; + +--connection s2 +reap; + +--connection server_1 +SELECT * FROM t2 ORDER BY a; + +--connection s2 +UNLOCK TABLES; + +--connection s1 +reap; + +--connection server_2 +--source include/wait_for_slave_to_stop.inc +--source include/start_slave.inc +--sync_with_master + +SELECT * FROM t2 ORDER BY a; + + + +--echo *** MDEV-8318: Assertion `!pool->busy' failed in pool_mark_busy(rpl_parallel_thread_pool*) on concurrent FTWRL *** + +--connection server_1 +LOCK TABLE t2 WRITE; + + +--connect (m1,localhost,root,,test) +--connection m1 +--let $cid=`SELECT CONNECTION_ID()` +send FLUSH TABLES WITH READ LOCK; + +--connect (m2,localhost,root,,test) +# We cannot force the race with DEBUG_SYNC, because the race does not +# exist after fixing the bug. At best we could force a debug sync to +# time out, which is effectively just a sleep. +# So just put a small sleep here; it is enough to trigger the bug in +# most run before the bug fix, and the code should work correctly +# however the thread scheduling happens. +--sleep 0.1 +send FLUSH TABLES WITH READ LOCK; + +--connection server_1 +--replace_result $cid CID +eval KILL QUERY $cid; + +--connection m1 +--error ER_QUERY_INTERRUPTED +reap; + +--connection server_1 +UNLOCK TABLES; + +--connection m2 +reap; +UNLOCK TABLES; + + +# Clean up. --connection server_2 --source include/stop_slave.inc SET GLOBAL slave_parallel_threads=@old_parallel_threads; --source include/start_slave.inc --connection server_1 -DROP TABLE t1; +DROP TABLE t1, t2; --source include/rpl_end.inc diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 932bb0c20ba..f9a3c2a0b3a 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -10117,6 +10117,9 @@ PSI_stage_info stage_waiting_for_prior_transaction_to_commit= { 0, "Waiting for PSI_stage_info stage_waiting_for_prior_transaction_to_start_commit= { 0, "Waiting for prior transaction to start commit before starting next transaction", 0}; PSI_stage_info stage_waiting_for_room_in_worker_thread= { 0, "Waiting for room in worker thread event queue", 0}; PSI_stage_info stage_waiting_for_workers_idle= { 0, "Waiting for worker threads to be idle", 0}; +PSI_stage_info stage_waiting_for_ftwrl= { 0, "Waiting due to global read lock", 0}; +PSI_stage_info stage_waiting_for_ftwrl_threads_to_pause= { 0, "Waiting for worker threads to pause for global read lock", 0}; +PSI_stage_info stage_waiting_for_rpl_thread_pool= { 0, "Waiting while replication worker thread pool is busy", 0}; PSI_stage_info stage_master_gtid_wait_primary= { 0, "Waiting in MASTER_GTID_WAIT() (primary waiter)", 0}; PSI_stage_info stage_master_gtid_wait= { 0, "Waiting in MASTER_GTID_WAIT()", 0}; PSI_stage_info stage_gtid_wait_other_connection= { 0, "Waiting for other master connection to process GTID received on multiple master connections", 0}; diff --git a/sql/mysqld.h b/sql/mysqld.h index 29164c7629e..081d89c48a4 100644 --- a/sql/mysqld.h +++ b/sql/mysqld.h @@ -486,6 +486,9 @@ extern PSI_stage_info stage_waiting_for_prior_transaction_to_commit; extern PSI_stage_info stage_waiting_for_prior_transaction_to_start_commit; extern PSI_stage_info stage_waiting_for_room_in_worker_thread; extern PSI_stage_info stage_waiting_for_workers_idle; +extern PSI_stage_info stage_waiting_for_ftwrl; +extern PSI_stage_info stage_waiting_for_ftwrl_threads_to_pause; +extern PSI_stage_info stage_waiting_for_rpl_thread_pool; extern PSI_stage_info stage_master_gtid_wait_primary; extern PSI_stage_info stage_master_gtid_wait; extern PSI_stage_info stage_gtid_wait_other_connection; diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index d5020dd4cba..526c3ec7a92 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -44,6 +44,9 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev, rgi->event_relay_log_pos= qev->event_relay_log_pos; rgi->future_event_relay_log_pos= qev->future_event_relay_log_pos; strcpy(rgi->future_event_master_log_name, qev->future_event_master_log_name); + if (!(ev->is_artificial_event() || ev->is_relay_log_event() || + (ev->when == 0))) + rgi->last_master_timestamp= ev->when + (time_t)ev->exec_time; mysql_mutex_lock(&rli->data_lock); /* Mutex will be released in apply_event_and_update_pos(). */ err= apply_event_and_update_pos(ev, thd, rgi, rpt); @@ -271,6 +274,284 @@ register_wait_for_prior_event_group_commit(rpl_group_info *rgi, } +/* + Do not start parallel execution of this event group until all prior groups + have reached the commit phase that are not safe to run in parallel with. +*/ +static bool +do_gco_wait(rpl_group_info *rgi, group_commit_orderer *gco, + bool *did_enter_cond, PSI_stage_info *old_stage) +{ + THD *thd= rgi->thd; + rpl_parallel_entry *entry= rgi->parallel_entry; + uint64 wait_count; + + mysql_mutex_assert_owner(&entry->LOCK_parallel_entry); + + if (!gco->installed) + { + group_commit_orderer *prev_gco= gco->prev_gco; + if (prev_gco) + { + prev_gco->last_sub_id= gco->prior_sub_id; + prev_gco->next_gco= gco; + } + gco->installed= true; + } + wait_count= gco->wait_count; + if (wait_count > entry->count_committing_event_groups) + { + DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior"); + thd->ENTER_COND(&gco->COND_group_commit_orderer, + &entry->LOCK_parallel_entry, + &stage_waiting_for_prior_transaction_to_start_commit, + old_stage); + *did_enter_cond= true; + do + { + if (thd->check_killed() && !rgi->worker_error) + { + DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior_killed"); + thd->clear_error(); + thd->get_stmt_da()->reset_diagnostics_area(); + thd->send_kill_message(); + slave_output_error_info(rgi, thd); + signal_error_to_sql_driver_thread(thd, rgi, 1); + /* + Even though we were killed, we need to continue waiting for the + prior event groups to signal that we can continue. Otherwise we + mess up the accounting for ordering. However, now that we have + marked the error, events will just be skipped rather than + executed, and things will progress quickly towards stop. + */ + } + mysql_cond_wait(&gco->COND_group_commit_orderer, + &entry->LOCK_parallel_entry); + } while (wait_count > entry->count_committing_event_groups); + } + + if (entry->force_abort && wait_count > entry->stop_count) + { + /* + We are stopping (STOP SLAVE), and this event group is beyond the point + where we can safely stop. So return a flag that will cause us to skip, + rather than execute, the following events. + */ + return true; + } + else + return false; +} + + +static void +do_ftwrl_wait(rpl_group_info *rgi, + bool *did_enter_cond, PSI_stage_info *old_stage) +{ + THD *thd= rgi->thd; + rpl_parallel_entry *entry= rgi->parallel_entry; + uint64 sub_id= rgi->gtid_sub_id; + + mysql_mutex_assert_owner(&entry->LOCK_parallel_entry); + + /* + If a FLUSH TABLES WITH READ LOCK (FTWRL) is pending, check if this + transaction is later than transactions that have priority to complete + before FTWRL. If so, wait here so that FTWRL can proceed and complete + first. + + (entry->pause_sub_id is ULONGLONG_MAX if no FTWRL is pending, which makes + this test false as required). + */ + if (unlikely(sub_id > entry->pause_sub_id)) + { + thd->ENTER_COND(&entry->COND_parallel_entry, &entry->LOCK_parallel_entry, + &stage_waiting_for_ftwrl, old_stage); + *did_enter_cond= true; + do + { + if (entry->force_abort || rgi->worker_error) + break; + if (thd->check_killed()) + { + thd->send_kill_message(); + slave_output_error_info(rgi, thd); + signal_error_to_sql_driver_thread(thd, rgi, 1); + break; + } + mysql_cond_wait(&entry->COND_parallel_entry, &entry->LOCK_parallel_entry); + } while (sub_id > entry->pause_sub_id); + + /* + We do not call EXIT_COND() here, as this will be done later by our + caller (since we set *did_enter_cond to true). + */ + } + + if (sub_id > entry->largest_started_sub_id) + entry->largest_started_sub_id= sub_id; +} + + +static int +pool_mark_busy(rpl_parallel_thread_pool *pool, THD *thd) +{ + PSI_stage_info old_stage; + int res= 0; + + /* + Wait here while the queue is busy. This is done to make FLUSH TABLES WITH + READ LOCK work correctly, without incuring extra locking penalties in + normal operation. FLUSH TABLES WITH READ LOCK needs to lock threads in the + thread pool, and for this we need to make sure the pool will not go away + during the operation. The LOCK_rpl_thread_pool is not suitable for + this. It is taken by release_thread() while holding LOCK_rpl_thread; so it + must be released before locking any LOCK_rpl_thread lock, or a deadlock + can occur. + + So we protect the infrequent operations of FLUSH TABLES WITH READ LOCK and + pool size changes with this condition wait. + */ + mysql_mutex_lock(&pool->LOCK_rpl_thread_pool); + if (thd) + thd->ENTER_COND(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool, + &stage_waiting_for_rpl_thread_pool, &old_stage); + while (pool->busy) + { + if (thd && thd->check_killed()) + { + thd->send_kill_message(); + res= 1; + break; + } + mysql_cond_wait(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool); + } + if (!res) + pool->busy= true; + if (thd) + thd->EXIT_COND(&old_stage); + else + mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool); + + return res; +} + + +static void +pool_mark_not_busy(rpl_parallel_thread_pool *pool) +{ + mysql_mutex_lock(&pool->LOCK_rpl_thread_pool); + DBUG_ASSERT(pool->busy); + pool->busy= false; + mysql_cond_broadcast(&pool->COND_rpl_thread_pool); + mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool); +} + + +void +rpl_unpause_after_ftwrl(THD *thd) +{ + uint32 i; + rpl_parallel_thread_pool *pool= &global_rpl_thread_pool; + + DBUG_ASSERT(pool->busy); + + for (i= 0; i < pool->count; ++i) + { + rpl_parallel_entry *e; + rpl_parallel_thread *rpt= pool->threads[i]; + + mysql_mutex_lock(&rpt->LOCK_rpl_thread); + if (!rpt->current_owner) + { + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + continue; + } + e= rpt->current_entry; + mysql_mutex_lock(&e->LOCK_parallel_entry); + rpt->pause_for_ftwrl = false; + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + e->pause_sub_id= (uint64)ULONGLONG_MAX; + mysql_cond_broadcast(&e->COND_parallel_entry); + mysql_mutex_unlock(&e->LOCK_parallel_entry); + } + + pool_mark_not_busy(pool); +} + + +/* + . + + Note: in case of error return, rpl_unpause_after_ftwrl() must _not_ be called. +*/ +int +rpl_pause_for_ftwrl(THD *thd) +{ + uint32 i; + rpl_parallel_thread_pool *pool= &global_rpl_thread_pool; + int err; + + /* + While the count_pending_pause_for_ftwrl counter is non-zero, the pool + cannot be shutdown/resized, so threads are guaranteed to not disappear. + + This is required to safely be able to access the individual threads below. + (We cannot lock an individual thread while holding LOCK_rpl_thread_pool, + as this can deadlock against release_thread()). + */ + if ((err= pool_mark_busy(pool, thd))) + return err; + + for (i= 0; i < pool->count; ++i) + { + PSI_stage_info old_stage; + rpl_parallel_entry *e; + rpl_parallel_thread *rpt= pool->threads[i]; + + mysql_mutex_lock(&rpt->LOCK_rpl_thread); + if (!rpt->current_owner) + { + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + continue; + } + e= rpt->current_entry; + mysql_mutex_lock(&e->LOCK_parallel_entry); + /* + Setting the rpt->pause_for_ftwrl flag makes sure that the thread will not + de-allocate itself until signalled to do so by rpl_unpause_after_ftwrl(). + */ + rpt->pause_for_ftwrl = true; + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + ++e->need_sub_id_signal; + if (e->pause_sub_id == (uint64)ULONGLONG_MAX) + e->pause_sub_id= e->largest_started_sub_id; + thd->ENTER_COND(&e->COND_parallel_entry, &e->LOCK_parallel_entry, + &stage_waiting_for_ftwrl_threads_to_pause, &old_stage); + while (e->pause_sub_id < (uint64)ULONGLONG_MAX && + e->last_committed_sub_id < e->pause_sub_id && + !err) + { + if (thd->check_killed()) + { + thd->send_kill_message(); + err= 1; + break; + } + mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry); + }; + --e->need_sub_id_signal; + thd->EXIT_COND(&old_stage); + if (err) + break; + } + + if (err) + rpl_unpause_after_ftwrl(thd); + return err; +} + + #ifndef DBUG_OFF static int dbug_simulate_tmp_error(rpl_group_info *rgi, THD *thd) @@ -792,7 +1073,6 @@ handle_rpl_parallel_thread(void *arg) { bool did_enter_cond= false; PSI_stage_info old_stage; - uint64 wait_count; DBUG_EXECUTE_IF("rpl_parallel_scheduled_gtid_0_x_100", { if (rgi->current_gtid.domain_id == 0 && @@ -831,72 +1111,19 @@ handle_rpl_parallel_thread(void *arg) event_gtid_sub_id= rgi->gtid_sub_id; rgi->thd= thd; + mysql_mutex_lock(&entry->LOCK_parallel_entry); + skip_event_group= do_gco_wait(rgi, gco, &did_enter_cond, &old_stage); + + if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id)) + skip_event_group= true; + if (likely(!skip_event_group)) + do_ftwrl_wait(rgi, &did_enter_cond, &old_stage); + /* Register ourself to wait for the previous commit, if we need to do such registration _and_ that previous commit has not already occured. - - Also do not start parallel execution of this event group until all - prior groups have reached the commit phase that are not safe to run - in parallel with. */ - mysql_mutex_lock(&entry->LOCK_parallel_entry); - if (!gco->installed) - { - group_commit_orderer *prev_gco= gco->prev_gco; - if (prev_gco) - { - prev_gco->last_sub_id= gco->prior_sub_id; - prev_gco->next_gco= gco; - } - gco->installed= true; - } - wait_count= gco->wait_count; - if (wait_count > entry->count_committing_event_groups) - { - DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior"); - thd->ENTER_COND(&gco->COND_group_commit_orderer, - &entry->LOCK_parallel_entry, - &stage_waiting_for_prior_transaction_to_start_commit, - &old_stage); - did_enter_cond= true; - do - { - if (thd->check_killed() && !rgi->worker_error) - { - DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior_killed"); - thd->clear_error(); - thd->get_stmt_da()->reset_diagnostics_area(); - thd->send_kill_message(); - slave_output_error_info(rgi, thd); - signal_error_to_sql_driver_thread(thd, rgi, 1); - /* - Even though we were killed, we need to continue waiting for the - prior event groups to signal that we can continue. Otherwise we - mess up the accounting for ordering. However, now that we have - marked the error, events will just be skipped rather than - executed, and things will progress quickly towards stop. - */ - } - mysql_cond_wait(&gco->COND_group_commit_orderer, - &entry->LOCK_parallel_entry); - } while (wait_count > entry->count_committing_event_groups); - } - - if (entry->force_abort && wait_count > entry->stop_count) - { - /* - We are stopping (STOP SLAVE), and this event group is beyond the - point where we can safely stop. So set a flag that will cause us - to skip, rather than execute, the following events. - */ - skip_event_group= true; - } - else - skip_event_group= false; - - if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id)) - skip_event_group= true; register_wait_for_prior_event_group_commit(rgi, entry); unlock_or_exit_cond(thd, &entry->LOCK_parallel_entry, @@ -1060,17 +1287,40 @@ handle_rpl_parallel_thread(void *arg) */ rpt->batch_free(); - if ((events= rpt->event_queue) != NULL) + for (;;) { + if ((events= rpt->event_queue) != NULL) + { + /* + Take next group of events from the replication pool. + This is faster than having to wakeup the pool manager thread to give + us a new event. + */ + rpt->dequeue1(events); + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + goto more_events; + } + if (!rpt->pause_for_ftwrl || + (in_event_group && !group_rgi->parallel_entry->force_abort)) + break; /* - Take next group of events from the replication pool. - This is faster than having to wakeup the pool manager thread to give us - a new event. + We are currently in the delicate process of pausing parallel + replication while FLUSH TABLES WITH READ LOCK is starting. We must + not de-allocate the thread (setting rpt->current_owner= NULL) until + rpl_unpause_after_ftwrl() has woken us up. */ - rpt->dequeue1(events); + mysql_mutex_lock(&rpt->current_entry->LOCK_parallel_entry); mysql_mutex_unlock(&rpt->LOCK_rpl_thread); - goto more_events; + mysql_cond_wait(&rpt->current_entry->COND_parallel_entry, + &rpt->current_entry->LOCK_parallel_entry); + mysql_mutex_unlock(&rpt->current_entry->LOCK_parallel_entry); + mysql_mutex_lock(&rpt->LOCK_rpl_thread); + /* + Now loop to check again for more events available, since we released + and re-aquired the LOCK_rpl_thread mutex. + */ } + rpt->inuse_relaylog_refcount_update(); if (in_event_group && group_rgi->parallel_entry->force_abort) @@ -1148,6 +1398,10 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, rpl_parallel_thread **new_list= NULL; rpl_parallel_thread *new_free_list= NULL; rpl_parallel_thread *rpt_array= NULL; + int res; + + if ((res= pool_mark_busy(pool, current_thd))) + return res; /* Allocate the new list of threads up-front. @@ -1196,7 +1450,14 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, */ for (i= 0; i < pool->count; ++i) { - rpl_parallel_thread *rpt= pool->get_thread(NULL, NULL); + rpl_parallel_thread *rpt; + + mysql_mutex_lock(&pool->LOCK_rpl_thread_pool); + while ((rpt= pool->free_list) == NULL) + mysql_cond_wait(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool); + pool->free_list= rpt->next; + mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool); + mysql_mutex_lock(&rpt->LOCK_rpl_thread); rpt->stop= true; mysql_cond_signal(&rpt->COND_rpl_thread); mysql_mutex_unlock(&rpt->LOCK_rpl_thread); @@ -1250,9 +1511,7 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, mysql_mutex_unlock(&pool->threads[i]->LOCK_rpl_thread); } - mysql_mutex_lock(&pool->LOCK_rpl_thread_pool); - mysql_cond_broadcast(&pool->COND_rpl_thread_pool); - mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool); + pool_mark_not_busy(pool); return 0; @@ -1276,6 +1535,7 @@ err: } my_free(new_list); } + pool_mark_not_busy(pool); return 1; } @@ -1538,7 +1798,7 @@ rpl_parallel_thread::loc_free_gco(group_commit_orderer *gco) rpl_parallel_thread_pool::rpl_parallel_thread_pool() - : count(0), threads(0), free_list(0), inited(false) + : threads(0), free_list(0), count(0), inited(false), busy(false) { } @@ -1546,9 +1806,10 @@ rpl_parallel_thread_pool::rpl_parallel_thread_pool() int rpl_parallel_thread_pool::init(uint32 size) { - count= 0; threads= NULL; free_list= NULL; + count= 0; + busy= false; mysql_mutex_init(key_LOCK_rpl_thread_pool, &LOCK_rpl_thread_pool, MY_MUTEX_INIT_SLOW); @@ -1589,7 +1850,7 @@ rpl_parallel_thread_pool::get_thread(rpl_parallel_thread **owner, rpl_parallel_thread *rpt; mysql_mutex_lock(&LOCK_rpl_thread_pool); - while ((rpt= free_list) == NULL) + while (unlikely(busy) || !(rpt= free_list)) mysql_cond_wait(&COND_rpl_thread_pool, &LOCK_rpl_thread_pool); free_list= rpt->next; mysql_mutex_unlock(&LOCK_rpl_thread_pool); @@ -1800,6 +2061,7 @@ rpl_parallel::find(uint32 domain_id) e->rpl_thread_max= count; e->domain_id= domain_id; e->stop_on_error_sub_id= (uint64)ULONGLONG_MAX; + e->pause_sub_id= (uint64)ULONGLONG_MAX; if (my_hash_insert(&domain_hash, (uchar *)e)) { my_free(e); @@ -2001,7 +2263,7 @@ rpl_parallel::wait_for_workers_idle(THD *thd) e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i); mysql_mutex_lock(&e->LOCK_parallel_entry); - e->need_sub_id_signal= true; + ++e->need_sub_id_signal; thd->ENTER_COND(&e->COND_parallel_entry, &e->LOCK_parallel_entry, &stage_waiting_for_workers_idle, &old_stage); while (e->current_sub_id > e->last_committed_sub_id) @@ -2014,7 +2276,7 @@ rpl_parallel::wait_for_workers_idle(THD *thd) } mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry); } - e->need_sub_id_signal= false; + --e->need_sub_id_signal; thd->EXIT_COND(&old_stage); if (err) return err; diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h index 0b6c0b460d0..6c1a650429b 100644 --- a/sql/rpl_parallel.h +++ b/sql/rpl_parallel.h @@ -93,6 +93,7 @@ struct rpl_parallel_thread { bool delay_start; bool running; bool stop; + bool pause_for_ftwrl; mysql_mutex_t LOCK_rpl_thread; mysql_cond_t COND_rpl_thread; mysql_cond_t COND_rpl_thread_queue; @@ -222,12 +223,18 @@ struct rpl_parallel_thread { struct rpl_parallel_thread_pool { - uint32 count; struct rpl_parallel_thread **threads; struct rpl_parallel_thread *free_list; mysql_mutex_t LOCK_rpl_thread_pool; mysql_cond_t COND_rpl_thread_pool; + uint32 count; bool inited; + /* + While FTWRL runs, this counter is incremented to make SQL thread or + STOP/START slave not try to start new activity while that operation + is in progress. + */ + bool busy; rpl_parallel_thread_pool(); int init(uint32 size); @@ -242,6 +249,12 @@ struct rpl_parallel_entry { mysql_mutex_t LOCK_parallel_entry; mysql_cond_t COND_parallel_entry; uint32 domain_id; + /* + Incremented by wait_for_workers_idle() and rpl_pause_for_ftwrl() to show + that they are waiting, so that finish_event_group knows to signal them + when last_committed_sub_id is increased. + */ + uint32 need_sub_id_signal; uint64 last_commit_id; bool active; /* @@ -251,12 +264,6 @@ struct rpl_parallel_entry { */ bool force_abort; /* - Set in wait_for_workers_idle() to show that it is waiting, so that - finish_event_group knows to signal it when last_committed_sub_id is - increased. - */ - bool need_sub_id_signal; - /* At STOP SLAVE (force_abort=true), we do not want to process all events in the queue (which could unnecessarily delay stop, if a lot of events happen to be queued). The stop_count provides a safe point at which to stop, so @@ -296,6 +303,15 @@ struct rpl_parallel_entry { queued for execution by a worker thread. */ uint64 current_sub_id; + /* + The largest sub_id that has started its transaction. Protected by + LOCK_parallel_entry. + + (Transactions can start out-of-order, so this value signifies that no + transactions with larger sub_id have started, but not necessarily that all + transactions with smaller sub_id have started). + */ + uint64 largest_started_sub_id; rpl_group_info *current_group_info; /* If we get an error in some event group, we set the sub_id of that event @@ -305,6 +321,12 @@ struct rpl_parallel_entry { The value is ULONGLONG_MAX when no error occured. */ uint64 stop_on_error_sub_id; + /* + During FLUSH TABLES WITH READ LOCK, transactions with sub_id larger than + this value must not start, but wait until the global read lock is released. + The value is set to ULONGLONG_MAX when no FTWRL is pending. + */ + uint64 pause_sub_id; /* Total count of event groups queued so far. */ uint64 count_queued_event_groups; /* @@ -345,5 +367,7 @@ extern struct rpl_parallel_thread_pool global_rpl_thread_pool; extern int rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool); extern int rpl_parallel_inactivate_pool(rpl_parallel_thread_pool *pool); extern bool process_gtid_for_restart_pos(Relay_log_info *rli, rpl_gtid *gtid); +extern int rpl_pause_for_ftwrl(THD *thd); +extern void rpl_unpause_after_ftwrl(THD *thd); #endif /* RPL_PARALLEL_H */ diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index 7453f57e64c..7bdd9c1861f 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -998,6 +998,18 @@ void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos, else if (group_master_log_pos < log_pos) group_master_log_pos= log_pos; } + + /* + In the parallel case, we only update the Seconds_Behind_Master at the + end of a transaction. In the non-parallel case, the value is updated as + soon as an event is read from the relay log; however this would be too + confusing for the user, seeing the slave reported as up-to-date when + potentially thousands of events are still queued up for worker threads + waiting for execution. + */ + if (rgi->last_master_timestamp && + rgi->last_master_timestamp > last_master_timestamp) + last_master_timestamp= rgi->last_master_timestamp; } else { @@ -1614,6 +1626,7 @@ rpl_group_info::reinit(Relay_log_info *rli) long_find_row_note_printed= false; did_mark_start_commit= false; gtid_ev_flags2= 0; + last_master_timestamp = 0; gtid_ignore_duplicate_state= GTID_DUPLICATE_NULL; speculation= SPECULATE_NO; commit_orderer.reinit(); diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h index 377db469276..23ab1664a7b 100644 --- a/sql/rpl_rli.h +++ b/sql/rpl_rli.h @@ -669,6 +669,13 @@ struct rpl_group_info char gtid_info_buf[5+10+1+10+1+20+1]; /* + The timestamp, from the master, of the commit event. + Used to do delayed update of rli->last_master_timestamp, for getting + reasonable values out of Seconds_Behind_Master in SHOW SLAVE STATUS. + */ + time_t last_master_timestamp; + + /* Information to be able to re-try an event group in case of a deadlock or other temporary error. */ diff --git a/sql/slave.cc b/sql/slave.cc index fed171bcc4c..235b26f81ae 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -3578,8 +3578,13 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli, If it is an artificial event, or a relay log event (IO thread generated event) or ev->when is set to 0, we don't update the last_master_timestamp. + + In parallel replication, we might queue a large number of events, and + the user might be surprised to see a claim that the slave is up to date + long before those queued events are actually executed. */ - if (!(ev->is_artificial_event() || ev->is_relay_log_event() || (ev->when == 0))) + if (!rli->mi->using_parallel() && + !(ev->is_artificial_event() || ev->is_relay_log_event() || (ev->when == 0))) { rli->last_master_timestamp= ev->when + (time_t) ev->exec_time; DBUG_ASSERT(rli->last_master_timestamp >= 0); diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index d6e5082bae6..b277f64ee2a 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -4811,6 +4811,17 @@ end_with_restore_list: } #endif /* WITH_WSREP*/ + if (lex->type & REFRESH_READ_LOCK) + { + /* + We need to pause any parallel replication slave workers during FLUSH + TABLES WITH READ LOCK. Otherwise we might cause a deadlock, as + worker threads eun run in arbitrary order but need to commit in a + specific given order. + */ + if (rpl_pause_for_ftwrl(thd)) + goto error; + } /* reload_acl_and_cache() will tell us if we are allowed to write to the binlog or not. @@ -4841,6 +4852,8 @@ end_with_restore_list: if (!res) my_ok(thd); } + if (lex->type & REFRESH_READ_LOCK) + rpl_unpause_after_ftwrl(thd); break; } |