diff options
author | Kristian Nielsen <knielsen@knielsen-hq.org> | 2015-11-13 14:24:40 +0100 |
---|---|---|
committer | Kristian Nielsen <knielsen@knielsen-hq.org> | 2015-11-13 14:24:40 +0100 |
commit | 8f2e05f41cf3b92bdb4c409e542ef126d5fe8f95 (patch) | |
tree | 701b5599a3c9478e125535580e294f8b627f2834 /sql/rpl_parallel.cc | |
parent | 2828c2be554b62646fc990ac28b4aef20cd9b9d2 (diff) | |
parent | ba02550166eb39c0375a6422ecaa4731421250b6 (diff) | |
download | mariadb-git-8f2e05f41cf3b92bdb4c409e542ef126d5fe8f95.tar.gz |
Merge branch 'mdev7818-4' into 10.1
Conflicts:
mysql-test/suite/perfschema/r/stage_mdl_global.result
sql/rpl_rli.cc
sql/sql_parse.cc
Diffstat (limited to 'sql/rpl_parallel.cc')
-rw-r--r-- | sql/rpl_parallel.cc | 416 |
1 files changed, 339 insertions, 77 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index d5020dd4cba..526c3ec7a92 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -44,6 +44,9 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev, rgi->event_relay_log_pos= qev->event_relay_log_pos; rgi->future_event_relay_log_pos= qev->future_event_relay_log_pos; strcpy(rgi->future_event_master_log_name, qev->future_event_master_log_name); + if (!(ev->is_artificial_event() || ev->is_relay_log_event() || + (ev->when == 0))) + rgi->last_master_timestamp= ev->when + (time_t)ev->exec_time; mysql_mutex_lock(&rli->data_lock); /* Mutex will be released in apply_event_and_update_pos(). */ err= apply_event_and_update_pos(ev, thd, rgi, rpt); @@ -271,6 +274,284 @@ register_wait_for_prior_event_group_commit(rpl_group_info *rgi, } +/* + Do not start parallel execution of this event group until all prior groups + have reached the commit phase that are not safe to run in parallel with. +*/ +static bool +do_gco_wait(rpl_group_info *rgi, group_commit_orderer *gco, + bool *did_enter_cond, PSI_stage_info *old_stage) +{ + THD *thd= rgi->thd; + rpl_parallel_entry *entry= rgi->parallel_entry; + uint64 wait_count; + + mysql_mutex_assert_owner(&entry->LOCK_parallel_entry); + + if (!gco->installed) + { + group_commit_orderer *prev_gco= gco->prev_gco; + if (prev_gco) + { + prev_gco->last_sub_id= gco->prior_sub_id; + prev_gco->next_gco= gco; + } + gco->installed= true; + } + wait_count= gco->wait_count; + if (wait_count > entry->count_committing_event_groups) + { + DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior"); + thd->ENTER_COND(&gco->COND_group_commit_orderer, + &entry->LOCK_parallel_entry, + &stage_waiting_for_prior_transaction_to_start_commit, + old_stage); + *did_enter_cond= true; + do + { + if (thd->check_killed() && !rgi->worker_error) + { + DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior_killed"); + thd->clear_error(); + thd->get_stmt_da()->reset_diagnostics_area(); + thd->send_kill_message(); + slave_output_error_info(rgi, thd); + signal_error_to_sql_driver_thread(thd, rgi, 1); + /* + Even though we were killed, we need to continue waiting for the + prior event groups to signal that we can continue. Otherwise we + mess up the accounting for ordering. However, now that we have + marked the error, events will just be skipped rather than + executed, and things will progress quickly towards stop. + */ + } + mysql_cond_wait(&gco->COND_group_commit_orderer, + &entry->LOCK_parallel_entry); + } while (wait_count > entry->count_committing_event_groups); + } + + if (entry->force_abort && wait_count > entry->stop_count) + { + /* + We are stopping (STOP SLAVE), and this event group is beyond the point + where we can safely stop. So return a flag that will cause us to skip, + rather than execute, the following events. + */ + return true; + } + else + return false; +} + + +static void +do_ftwrl_wait(rpl_group_info *rgi, + bool *did_enter_cond, PSI_stage_info *old_stage) +{ + THD *thd= rgi->thd; + rpl_parallel_entry *entry= rgi->parallel_entry; + uint64 sub_id= rgi->gtid_sub_id; + + mysql_mutex_assert_owner(&entry->LOCK_parallel_entry); + + /* + If a FLUSH TABLES WITH READ LOCK (FTWRL) is pending, check if this + transaction is later than transactions that have priority to complete + before FTWRL. If so, wait here so that FTWRL can proceed and complete + first. + + (entry->pause_sub_id is ULONGLONG_MAX if no FTWRL is pending, which makes + this test false as required). + */ + if (unlikely(sub_id > entry->pause_sub_id)) + { + thd->ENTER_COND(&entry->COND_parallel_entry, &entry->LOCK_parallel_entry, + &stage_waiting_for_ftwrl, old_stage); + *did_enter_cond= true; + do + { + if (entry->force_abort || rgi->worker_error) + break; + if (thd->check_killed()) + { + thd->send_kill_message(); + slave_output_error_info(rgi, thd); + signal_error_to_sql_driver_thread(thd, rgi, 1); + break; + } + mysql_cond_wait(&entry->COND_parallel_entry, &entry->LOCK_parallel_entry); + } while (sub_id > entry->pause_sub_id); + + /* + We do not call EXIT_COND() here, as this will be done later by our + caller (since we set *did_enter_cond to true). + */ + } + + if (sub_id > entry->largest_started_sub_id) + entry->largest_started_sub_id= sub_id; +} + + +static int +pool_mark_busy(rpl_parallel_thread_pool *pool, THD *thd) +{ + PSI_stage_info old_stage; + int res= 0; + + /* + Wait here while the queue is busy. This is done to make FLUSH TABLES WITH + READ LOCK work correctly, without incuring extra locking penalties in + normal operation. FLUSH TABLES WITH READ LOCK needs to lock threads in the + thread pool, and for this we need to make sure the pool will not go away + during the operation. The LOCK_rpl_thread_pool is not suitable for + this. It is taken by release_thread() while holding LOCK_rpl_thread; so it + must be released before locking any LOCK_rpl_thread lock, or a deadlock + can occur. + + So we protect the infrequent operations of FLUSH TABLES WITH READ LOCK and + pool size changes with this condition wait. + */ + mysql_mutex_lock(&pool->LOCK_rpl_thread_pool); + if (thd) + thd->ENTER_COND(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool, + &stage_waiting_for_rpl_thread_pool, &old_stage); + while (pool->busy) + { + if (thd && thd->check_killed()) + { + thd->send_kill_message(); + res= 1; + break; + } + mysql_cond_wait(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool); + } + if (!res) + pool->busy= true; + if (thd) + thd->EXIT_COND(&old_stage); + else + mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool); + + return res; +} + + +static void +pool_mark_not_busy(rpl_parallel_thread_pool *pool) +{ + mysql_mutex_lock(&pool->LOCK_rpl_thread_pool); + DBUG_ASSERT(pool->busy); + pool->busy= false; + mysql_cond_broadcast(&pool->COND_rpl_thread_pool); + mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool); +} + + +void +rpl_unpause_after_ftwrl(THD *thd) +{ + uint32 i; + rpl_parallel_thread_pool *pool= &global_rpl_thread_pool; + + DBUG_ASSERT(pool->busy); + + for (i= 0; i < pool->count; ++i) + { + rpl_parallel_entry *e; + rpl_parallel_thread *rpt= pool->threads[i]; + + mysql_mutex_lock(&rpt->LOCK_rpl_thread); + if (!rpt->current_owner) + { + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + continue; + } + e= rpt->current_entry; + mysql_mutex_lock(&e->LOCK_parallel_entry); + rpt->pause_for_ftwrl = false; + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + e->pause_sub_id= (uint64)ULONGLONG_MAX; + mysql_cond_broadcast(&e->COND_parallel_entry); + mysql_mutex_unlock(&e->LOCK_parallel_entry); + } + + pool_mark_not_busy(pool); +} + + +/* + . + + Note: in case of error return, rpl_unpause_after_ftwrl() must _not_ be called. +*/ +int +rpl_pause_for_ftwrl(THD *thd) +{ + uint32 i; + rpl_parallel_thread_pool *pool= &global_rpl_thread_pool; + int err; + + /* + While the count_pending_pause_for_ftwrl counter is non-zero, the pool + cannot be shutdown/resized, so threads are guaranteed to not disappear. + + This is required to safely be able to access the individual threads below. + (We cannot lock an individual thread while holding LOCK_rpl_thread_pool, + as this can deadlock against release_thread()). + */ + if ((err= pool_mark_busy(pool, thd))) + return err; + + for (i= 0; i < pool->count; ++i) + { + PSI_stage_info old_stage; + rpl_parallel_entry *e; + rpl_parallel_thread *rpt= pool->threads[i]; + + mysql_mutex_lock(&rpt->LOCK_rpl_thread); + if (!rpt->current_owner) + { + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + continue; + } + e= rpt->current_entry; + mysql_mutex_lock(&e->LOCK_parallel_entry); + /* + Setting the rpt->pause_for_ftwrl flag makes sure that the thread will not + de-allocate itself until signalled to do so by rpl_unpause_after_ftwrl(). + */ + rpt->pause_for_ftwrl = true; + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + ++e->need_sub_id_signal; + if (e->pause_sub_id == (uint64)ULONGLONG_MAX) + e->pause_sub_id= e->largest_started_sub_id; + thd->ENTER_COND(&e->COND_parallel_entry, &e->LOCK_parallel_entry, + &stage_waiting_for_ftwrl_threads_to_pause, &old_stage); + while (e->pause_sub_id < (uint64)ULONGLONG_MAX && + e->last_committed_sub_id < e->pause_sub_id && + !err) + { + if (thd->check_killed()) + { + thd->send_kill_message(); + err= 1; + break; + } + mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry); + }; + --e->need_sub_id_signal; + thd->EXIT_COND(&old_stage); + if (err) + break; + } + + if (err) + rpl_unpause_after_ftwrl(thd); + return err; +} + + #ifndef DBUG_OFF static int dbug_simulate_tmp_error(rpl_group_info *rgi, THD *thd) @@ -792,7 +1073,6 @@ handle_rpl_parallel_thread(void *arg) { bool did_enter_cond= false; PSI_stage_info old_stage; - uint64 wait_count; DBUG_EXECUTE_IF("rpl_parallel_scheduled_gtid_0_x_100", { if (rgi->current_gtid.domain_id == 0 && @@ -831,72 +1111,19 @@ handle_rpl_parallel_thread(void *arg) event_gtid_sub_id= rgi->gtid_sub_id; rgi->thd= thd; + mysql_mutex_lock(&entry->LOCK_parallel_entry); + skip_event_group= do_gco_wait(rgi, gco, &did_enter_cond, &old_stage); + + if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id)) + skip_event_group= true; + if (likely(!skip_event_group)) + do_ftwrl_wait(rgi, &did_enter_cond, &old_stage); + /* Register ourself to wait for the previous commit, if we need to do such registration _and_ that previous commit has not already occured. - - Also do not start parallel execution of this event group until all - prior groups have reached the commit phase that are not safe to run - in parallel with. */ - mysql_mutex_lock(&entry->LOCK_parallel_entry); - if (!gco->installed) - { - group_commit_orderer *prev_gco= gco->prev_gco; - if (prev_gco) - { - prev_gco->last_sub_id= gco->prior_sub_id; - prev_gco->next_gco= gco; - } - gco->installed= true; - } - wait_count= gco->wait_count; - if (wait_count > entry->count_committing_event_groups) - { - DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior"); - thd->ENTER_COND(&gco->COND_group_commit_orderer, - &entry->LOCK_parallel_entry, - &stage_waiting_for_prior_transaction_to_start_commit, - &old_stage); - did_enter_cond= true; - do - { - if (thd->check_killed() && !rgi->worker_error) - { - DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior_killed"); - thd->clear_error(); - thd->get_stmt_da()->reset_diagnostics_area(); - thd->send_kill_message(); - slave_output_error_info(rgi, thd); - signal_error_to_sql_driver_thread(thd, rgi, 1); - /* - Even though we were killed, we need to continue waiting for the - prior event groups to signal that we can continue. Otherwise we - mess up the accounting for ordering. However, now that we have - marked the error, events will just be skipped rather than - executed, and things will progress quickly towards stop. - */ - } - mysql_cond_wait(&gco->COND_group_commit_orderer, - &entry->LOCK_parallel_entry); - } while (wait_count > entry->count_committing_event_groups); - } - - if (entry->force_abort && wait_count > entry->stop_count) - { - /* - We are stopping (STOP SLAVE), and this event group is beyond the - point where we can safely stop. So set a flag that will cause us - to skip, rather than execute, the following events. - */ - skip_event_group= true; - } - else - skip_event_group= false; - - if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id)) - skip_event_group= true; register_wait_for_prior_event_group_commit(rgi, entry); unlock_or_exit_cond(thd, &entry->LOCK_parallel_entry, @@ -1060,17 +1287,40 @@ handle_rpl_parallel_thread(void *arg) */ rpt->batch_free(); - if ((events= rpt->event_queue) != NULL) + for (;;) { + if ((events= rpt->event_queue) != NULL) + { + /* + Take next group of events from the replication pool. + This is faster than having to wakeup the pool manager thread to give + us a new event. + */ + rpt->dequeue1(events); + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); + goto more_events; + } + if (!rpt->pause_for_ftwrl || + (in_event_group && !group_rgi->parallel_entry->force_abort)) + break; /* - Take next group of events from the replication pool. - This is faster than having to wakeup the pool manager thread to give us - a new event. + We are currently in the delicate process of pausing parallel + replication while FLUSH TABLES WITH READ LOCK is starting. We must + not de-allocate the thread (setting rpt->current_owner= NULL) until + rpl_unpause_after_ftwrl() has woken us up. */ - rpt->dequeue1(events); + mysql_mutex_lock(&rpt->current_entry->LOCK_parallel_entry); mysql_mutex_unlock(&rpt->LOCK_rpl_thread); - goto more_events; + mysql_cond_wait(&rpt->current_entry->COND_parallel_entry, + &rpt->current_entry->LOCK_parallel_entry); + mysql_mutex_unlock(&rpt->current_entry->LOCK_parallel_entry); + mysql_mutex_lock(&rpt->LOCK_rpl_thread); + /* + Now loop to check again for more events available, since we released + and re-aquired the LOCK_rpl_thread mutex. + */ } + rpt->inuse_relaylog_refcount_update(); if (in_event_group && group_rgi->parallel_entry->force_abort) @@ -1148,6 +1398,10 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, rpl_parallel_thread **new_list= NULL; rpl_parallel_thread *new_free_list= NULL; rpl_parallel_thread *rpt_array= NULL; + int res; + + if ((res= pool_mark_busy(pool, current_thd))) + return res; /* Allocate the new list of threads up-front. @@ -1196,7 +1450,14 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, */ for (i= 0; i < pool->count; ++i) { - rpl_parallel_thread *rpt= pool->get_thread(NULL, NULL); + rpl_parallel_thread *rpt; + + mysql_mutex_lock(&pool->LOCK_rpl_thread_pool); + while ((rpt= pool->free_list) == NULL) + mysql_cond_wait(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool); + pool->free_list= rpt->next; + mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool); + mysql_mutex_lock(&rpt->LOCK_rpl_thread); rpt->stop= true; mysql_cond_signal(&rpt->COND_rpl_thread); mysql_mutex_unlock(&rpt->LOCK_rpl_thread); @@ -1250,9 +1511,7 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, mysql_mutex_unlock(&pool->threads[i]->LOCK_rpl_thread); } - mysql_mutex_lock(&pool->LOCK_rpl_thread_pool); - mysql_cond_broadcast(&pool->COND_rpl_thread_pool); - mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool); + pool_mark_not_busy(pool); return 0; @@ -1276,6 +1535,7 @@ err: } my_free(new_list); } + pool_mark_not_busy(pool); return 1; } @@ -1538,7 +1798,7 @@ rpl_parallel_thread::loc_free_gco(group_commit_orderer *gco) rpl_parallel_thread_pool::rpl_parallel_thread_pool() - : count(0), threads(0), free_list(0), inited(false) + : threads(0), free_list(0), count(0), inited(false), busy(false) { } @@ -1546,9 +1806,10 @@ rpl_parallel_thread_pool::rpl_parallel_thread_pool() int rpl_parallel_thread_pool::init(uint32 size) { - count= 0; threads= NULL; free_list= NULL; + count= 0; + busy= false; mysql_mutex_init(key_LOCK_rpl_thread_pool, &LOCK_rpl_thread_pool, MY_MUTEX_INIT_SLOW); @@ -1589,7 +1850,7 @@ rpl_parallel_thread_pool::get_thread(rpl_parallel_thread **owner, rpl_parallel_thread *rpt; mysql_mutex_lock(&LOCK_rpl_thread_pool); - while ((rpt= free_list) == NULL) + while (unlikely(busy) || !(rpt= free_list)) mysql_cond_wait(&COND_rpl_thread_pool, &LOCK_rpl_thread_pool); free_list= rpt->next; mysql_mutex_unlock(&LOCK_rpl_thread_pool); @@ -1800,6 +2061,7 @@ rpl_parallel::find(uint32 domain_id) e->rpl_thread_max= count; e->domain_id= domain_id; e->stop_on_error_sub_id= (uint64)ULONGLONG_MAX; + e->pause_sub_id= (uint64)ULONGLONG_MAX; if (my_hash_insert(&domain_hash, (uchar *)e)) { my_free(e); @@ -2001,7 +2263,7 @@ rpl_parallel::wait_for_workers_idle(THD *thd) e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i); mysql_mutex_lock(&e->LOCK_parallel_entry); - e->need_sub_id_signal= true; + ++e->need_sub_id_signal; thd->ENTER_COND(&e->COND_parallel_entry, &e->LOCK_parallel_entry, &stage_waiting_for_workers_idle, &old_stage); while (e->current_sub_id > e->last_committed_sub_id) @@ -2014,7 +2276,7 @@ rpl_parallel::wait_for_workers_idle(THD *thd) } mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry); } - e->need_sub_id_signal= false; + --e->need_sub_id_signal; thd->EXIT_COND(&old_stage); if (err) return err; |