diff options
author | unknown <knielsen@knielsen-hq.org> | 2013-09-23 14:46:57 +0200 |
---|---|---|
committer | unknown <knielsen@knielsen-hq.org> | 2013-09-23 14:46:57 +0200 |
commit | 976606d0318465e40670535f6353849d83d1c78f (patch) | |
tree | 3ed12f894d8f9fc8e41a8d5ce1d7a5d043e6e881 /sql/rpl_parallel.cc | |
parent | 5d1d20e40982a09a1f279dfd85b45583195cc5b8 (diff) | |
download | mariadb-git-976606d0318465e40670535f6353849d83d1c78f.tar.gz |
MDEV-4506: Parallel replication: After-review fixes.
Diffstat (limited to 'sql/rpl_parallel.cc')
-rw-r--r-- | sql/rpl_parallel.cc | 98 |
1 files changed, 69 insertions, 29 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index def0fe7c756..ce3170bb774 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -132,7 +132,6 @@ handle_rpl_parallel_thread(void *arg) while (!(events= rpt->event_queue) && !rpt->stop && !thd->killed) mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread); /* Mark that this thread is now executing */ - rpt->free= false; rpt->event_queue= rpt->last_in_queue= NULL; thd->exit_cond(old_msg); @@ -216,12 +215,24 @@ handle_rpl_parallel_thread(void *arg) { in_event_group= false; + /* + Remove any left-over registration to wait for a prior commit to + complete. Normally, such wait would already have been removed at + this point by wait_for_prior_commit(), but eg. in error case we + might have skipped waiting, so we would need to remove it explicitly. + */ rgi->commit_orderer.unregister_wait_for_prior_commit(); thd->wait_for_commit_ptr= NULL; /* - Record that we have finished, so other event groups will no - longer attempt to wait for us to commit. + Record that this event group has finished (eg. transaction is + committed, if transactional), so other event groups will no longer + attempt to wait for us to commit. Once we have increased + entry->last_committed_sub_id, no other threads will execute + register_wait_for_prior_commit() against us. Thus, by doing one + extra (usually redundant) wakeup_subsequent_commits() we can ensure + that no register_wait_for_prior_commit() can ever happen without a + subsequent wakeup_subsequent_commits() to wake it up. We can race here with the next transactions, but that is fine, as long as we check that we do not decrease last_committed_sub_id. If @@ -246,6 +257,11 @@ handle_rpl_parallel_thread(void *arg) mysql_mutex_lock(&rpt->LOCK_rpl_thread); if ((events= rpt->event_queue) != NULL) { + /* + Take next group of events from the replication pool. + This is faster than having to wakeup the pool manager thread to give us + a new event. + */ rpt->event_queue= rpt->last_in_queue= NULL; mysql_mutex_unlock(&rpt->LOCK_rpl_thread); goto more_events; @@ -254,7 +270,7 @@ handle_rpl_parallel_thread(void *arg) if (!in_event_group) { rpt->current_entry= NULL; - if (!rpt->stop && !rpt->free) + if (!rpt->stop) { mysql_mutex_lock(&rpt->pool->LOCK_rpl_thread_pool); list= rpt->pool->free_list; @@ -263,7 +279,6 @@ handle_rpl_parallel_thread(void *arg) if (!list) mysql_cond_broadcast(&rpt->pool->COND_rpl_thread_pool); mysql_mutex_unlock(&rpt->pool->LOCK_rpl_thread_pool); - rpt->free= true; } } } @@ -300,6 +315,7 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, uint32 i; rpl_parallel_thread **new_list= NULL; rpl_parallel_thread *new_free_list= NULL; + rpl_parallel_thread *rpt_array= NULL; /* Allocate the new list of threads up-front. @@ -307,10 +323,13 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, to allocate, and will not be left with a half-functional thread pool. */ if (new_count && - !(new_list= (rpl_parallel_thread **)my_malloc(new_count*sizeof(*new_list), - MYF(MY_WME)))) + !my_multi_malloc(MYF(MY_WME|MY_ZEROFILL), + &new_list, new_count*sizeof(*new_list), + &rpt_array, new_count*sizeof(*rpt_array), + NULL)) { - my_error(ER_OUTOFMEMORY, MYF(0), (int(new_count*sizeof(*new_list)))); + my_error(ER_OUTOFMEMORY, MYF(0), (int(new_count*sizeof(*new_list) + + new_count*sizeof(*rpt_array)))); goto err;; } @@ -318,28 +337,16 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, { pthread_t th; - if (!(new_list[i]= (rpl_parallel_thread *)my_malloc(sizeof(*(new_list[i])), - MYF(MY_WME)))) - { - my_error(ER_OUTOFMEMORY, MYF(0), sizeof(*(new_list[i]))); - goto err; - } + new_list[i]= &rpt_array[i]; new_list[i]->delay_start= true; - new_list[i]->running= false; - new_list[i]->stop= false; - new_list[i]->free= false; mysql_mutex_init(key_LOCK_rpl_thread, &new_list[i]->LOCK_rpl_thread, MY_MUTEX_INIT_SLOW); mysql_cond_init(key_COND_rpl_thread, &new_list[i]->COND_rpl_thread, NULL); new_list[i]->pool= pool; - new_list[i]->current_entry= NULL; - new_list[i]->event_queue= NULL; - new_list[i]->last_in_queue= NULL; if (mysql_thread_create(key_rpl_parallel_thread, &th, NULL, handle_rpl_parallel_thread, new_list[i])) { my_error(ER_OUT_OF_RESOURCES, MYF(0)); - my_free(new_list[i]); goto err; } new_list[i]->next= new_free_list; @@ -364,6 +371,13 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, mysql_mutex_unlock(&LOCK_active_mi); } + /* + Grab each old thread in turn, and signal it to stop. + + Note that since we require all replication threads to be stopped before + changing the parallel replication worker thread pool, all the threads will + be already idle and will terminate immediately. + */ for (i= 0; i < pool->count; ++i) { rpl_parallel_thread *rpt= pool->get_thread(NULL); @@ -381,7 +395,6 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, mysql_mutex_unlock(&rpt->LOCK_rpl_thread); mysql_mutex_destroy(&rpt->LOCK_rpl_thread); mysql_cond_destroy(&rpt->COND_rpl_thread); - my_free(rpt); } my_free(pool->threads); @@ -409,7 +422,6 @@ err: { while (new_free_list) { - rpl_parallel_thread *next= new_free_list->next; mysql_mutex_lock(&new_free_list->LOCK_rpl_thread); new_free_list->delay_start= false; new_free_list->stop= true; @@ -421,8 +433,7 @@ err: mysql_cond_wait(&new_free_list->COND_rpl_thread, &new_free_list->LOCK_rpl_thread); mysql_mutex_unlock(&new_free_list->LOCK_rpl_thread); - my_free(new_free_list); - new_free_list= next; + new_free_list= new_free_list->next; } my_free(new_list); } @@ -471,6 +482,12 @@ rpl_parallel_thread_pool::destroy() } +/* + Wait for a worker thread to become idle. When one does, grab the thread for + our use and return it. + + Note that we return with the worker threads's LOCK_rpl_thread mutex locked. +*/ struct rpl_parallel_thread * rpl_parallel_thread_pool::get_thread(rpl_parallel_entry *entry) { @@ -571,7 +588,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev) rpl_parallel_entry *e; rpl_parallel_thread *cur_thread; rpl_parallel_thread::queued_event *qev; - rpl_group_info *rgi; + rpl_group_info *rgi= NULL; Relay_log_info *rli= serial_rgi->rli; enum Log_event_type typ; @@ -596,6 +613,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev) event_group_new_gtid(rgi, gtid_ev)) { my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME)); + delete rgi; return true; } if ((rgi->deferred_events_collecting= rli->mi->rpl_filter->is_on())) @@ -622,14 +640,33 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev) } else { - /* Check if we already have a worker thread for this entry. */ + /* + Check if we already have a worker thread for this entry. + + We continue to queue more events up for the worker thread while it is + still executing the first ones, to be able to start executing a large + event group without having to wait for the end to be fetched from the + master. And we continue to queue up more events after the first group, + avoiding the overhead of worker threads constantly entering and + leaving the worker thread free list. + + But if the worker thread is idle at any point, it may return to the + idle list or be servicing a different request. So check this, and + allocate a new thread if the old one is no longer processing for us. + */ cur_thread= e->rpl_thread; if (cur_thread) { mysql_mutex_lock(&cur_thread->LOCK_rpl_thread); if (cur_thread->current_entry != e) { - /* Not ours anymore, we need to grab a new one. */ + /* + The worker thread became idle, and returned to the free list and + possibly was allocated to a different request. This also means + that everything previously queued has already been executed, else + the worker thread would not have become idle. So we should + allocate a new worker thread. + */ mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread); e->rpl_thread= cur_thread= NULL; } @@ -682,6 +719,9 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev) Events like ROTATE and FORMAT_DESCRIPTION. Do not run in worker thread. Same for events not preceeded by GTID (we should not see those normally, but they might be from an old master). + + The varuable `current' is NULL for the case where the master did not + have GTID, like a MariaDB 5.5 or MySQL master. */ qev->rgi= serial_rgi; rpt_handle_event(qev, NULL); @@ -719,8 +759,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev) else cur_thread->event_queue= qev; cur_thread->last_in_queue= qev; - mysql_cond_signal(&cur_thread->COND_rpl_thread); mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread); + mysql_cond_signal(&cur_thread->COND_rpl_thread); return false; } |