summaryrefslogtreecommitdiff
path: root/sql/rpl_parallel.cc
diff options
context:
space:
mode:
authorunknown <knielsen@knielsen-hq.org>2013-09-23 14:46:57 +0200
committerunknown <knielsen@knielsen-hq.org>2013-09-23 14:46:57 +0200
commit976606d0318465e40670535f6353849d83d1c78f (patch)
tree3ed12f894d8f9fc8e41a8d5ce1d7a5d043e6e881 /sql/rpl_parallel.cc
parent5d1d20e40982a09a1f279dfd85b45583195cc5b8 (diff)
downloadmariadb-git-976606d0318465e40670535f6353849d83d1c78f.tar.gz
MDEV-4506: Parallel replication: After-review fixes.
Diffstat (limited to 'sql/rpl_parallel.cc')
-rw-r--r--sql/rpl_parallel.cc98
1 files changed, 69 insertions, 29 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index def0fe7c756..ce3170bb774 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -132,7 +132,6 @@ handle_rpl_parallel_thread(void *arg)
while (!(events= rpt->event_queue) && !rpt->stop && !thd->killed)
mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread);
/* Mark that this thread is now executing */
- rpt->free= false;
rpt->event_queue= rpt->last_in_queue= NULL;
thd->exit_cond(old_msg);
@@ -216,12 +215,24 @@ handle_rpl_parallel_thread(void *arg)
{
in_event_group= false;
+ /*
+ Remove any left-over registration to wait for a prior commit to
+ complete. Normally, such wait would already have been removed at
+ this point by wait_for_prior_commit(), but eg. in error case we
+ might have skipped waiting, so we would need to remove it explicitly.
+ */
rgi->commit_orderer.unregister_wait_for_prior_commit();
thd->wait_for_commit_ptr= NULL;
/*
- Record that we have finished, so other event groups will no
- longer attempt to wait for us to commit.
+ Record that this event group has finished (eg. transaction is
+ committed, if transactional), so other event groups will no longer
+ attempt to wait for us to commit. Once we have increased
+ entry->last_committed_sub_id, no other threads will execute
+ register_wait_for_prior_commit() against us. Thus, by doing one
+ extra (usually redundant) wakeup_subsequent_commits() we can ensure
+ that no register_wait_for_prior_commit() can ever happen without a
+ subsequent wakeup_subsequent_commits() to wake it up.
We can race here with the next transactions, but that is fine, as
long as we check that we do not decrease last_committed_sub_id. If
@@ -246,6 +257,11 @@ handle_rpl_parallel_thread(void *arg)
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
if ((events= rpt->event_queue) != NULL)
{
+ /*
+ Take next group of events from the replication pool.
+ This is faster than having to wakeup the pool manager thread to give us
+ a new event.
+ */
rpt->event_queue= rpt->last_in_queue= NULL;
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
goto more_events;
@@ -254,7 +270,7 @@ handle_rpl_parallel_thread(void *arg)
if (!in_event_group)
{
rpt->current_entry= NULL;
- if (!rpt->stop && !rpt->free)
+ if (!rpt->stop)
{
mysql_mutex_lock(&rpt->pool->LOCK_rpl_thread_pool);
list= rpt->pool->free_list;
@@ -263,7 +279,6 @@ handle_rpl_parallel_thread(void *arg)
if (!list)
mysql_cond_broadcast(&rpt->pool->COND_rpl_thread_pool);
mysql_mutex_unlock(&rpt->pool->LOCK_rpl_thread_pool);
- rpt->free= true;
}
}
}
@@ -300,6 +315,7 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
uint32 i;
rpl_parallel_thread **new_list= NULL;
rpl_parallel_thread *new_free_list= NULL;
+ rpl_parallel_thread *rpt_array= NULL;
/*
Allocate the new list of threads up-front.
@@ -307,10 +323,13 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
to allocate, and will not be left with a half-functional thread pool.
*/
if (new_count &&
- !(new_list= (rpl_parallel_thread **)my_malloc(new_count*sizeof(*new_list),
- MYF(MY_WME))))
+ !my_multi_malloc(MYF(MY_WME|MY_ZEROFILL),
+ &new_list, new_count*sizeof(*new_list),
+ &rpt_array, new_count*sizeof(*rpt_array),
+ NULL))
{
- my_error(ER_OUTOFMEMORY, MYF(0), (int(new_count*sizeof(*new_list))));
+ my_error(ER_OUTOFMEMORY, MYF(0), (int(new_count*sizeof(*new_list) +
+ new_count*sizeof(*rpt_array))));
goto err;;
}
@@ -318,28 +337,16 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
{
pthread_t th;
- if (!(new_list[i]= (rpl_parallel_thread *)my_malloc(sizeof(*(new_list[i])),
- MYF(MY_WME))))
- {
- my_error(ER_OUTOFMEMORY, MYF(0), sizeof(*(new_list[i])));
- goto err;
- }
+ new_list[i]= &rpt_array[i];
new_list[i]->delay_start= true;
- new_list[i]->running= false;
- new_list[i]->stop= false;
- new_list[i]->free= false;
mysql_mutex_init(key_LOCK_rpl_thread, &new_list[i]->LOCK_rpl_thread,
MY_MUTEX_INIT_SLOW);
mysql_cond_init(key_COND_rpl_thread, &new_list[i]->COND_rpl_thread, NULL);
new_list[i]->pool= pool;
- new_list[i]->current_entry= NULL;
- new_list[i]->event_queue= NULL;
- new_list[i]->last_in_queue= NULL;
if (mysql_thread_create(key_rpl_parallel_thread, &th, NULL,
handle_rpl_parallel_thread, new_list[i]))
{
my_error(ER_OUT_OF_RESOURCES, MYF(0));
- my_free(new_list[i]);
goto err;
}
new_list[i]->next= new_free_list;
@@ -364,6 +371,13 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
mysql_mutex_unlock(&LOCK_active_mi);
}
+ /*
+ Grab each old thread in turn, and signal it to stop.
+
+ Note that since we require all replication threads to be stopped before
+ changing the parallel replication worker thread pool, all the threads will
+ be already idle and will terminate immediately.
+ */
for (i= 0; i < pool->count; ++i)
{
rpl_parallel_thread *rpt= pool->get_thread(NULL);
@@ -381,7 +395,6 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
mysql_mutex_destroy(&rpt->LOCK_rpl_thread);
mysql_cond_destroy(&rpt->COND_rpl_thread);
- my_free(rpt);
}
my_free(pool->threads);
@@ -409,7 +422,6 @@ err:
{
while (new_free_list)
{
- rpl_parallel_thread *next= new_free_list->next;
mysql_mutex_lock(&new_free_list->LOCK_rpl_thread);
new_free_list->delay_start= false;
new_free_list->stop= true;
@@ -421,8 +433,7 @@ err:
mysql_cond_wait(&new_free_list->COND_rpl_thread,
&new_free_list->LOCK_rpl_thread);
mysql_mutex_unlock(&new_free_list->LOCK_rpl_thread);
- my_free(new_free_list);
- new_free_list= next;
+ new_free_list= new_free_list->next;
}
my_free(new_list);
}
@@ -471,6 +482,12 @@ rpl_parallel_thread_pool::destroy()
}
+/*
+ Wait for a worker thread to become idle. When one does, grab the thread for
+ our use and return it.
+
+ Note that we return with the worker threads's LOCK_rpl_thread mutex locked.
+*/
struct rpl_parallel_thread *
rpl_parallel_thread_pool::get_thread(rpl_parallel_entry *entry)
{
@@ -571,7 +588,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev)
rpl_parallel_entry *e;
rpl_parallel_thread *cur_thread;
rpl_parallel_thread::queued_event *qev;
- rpl_group_info *rgi;
+ rpl_group_info *rgi= NULL;
Relay_log_info *rli= serial_rgi->rli;
enum Log_event_type typ;
@@ -596,6 +613,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev)
event_group_new_gtid(rgi, gtid_ev))
{
my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME));
+ delete rgi;
return true;
}
if ((rgi->deferred_events_collecting= rli->mi->rpl_filter->is_on()))
@@ -622,14 +640,33 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev)
}
else
{
- /* Check if we already have a worker thread for this entry. */
+ /*
+ Check if we already have a worker thread for this entry.
+
+ We continue to queue more events up for the worker thread while it is
+ still executing the first ones, to be able to start executing a large
+ event group without having to wait for the end to be fetched from the
+ master. And we continue to queue up more events after the first group,
+ avoiding the overhead of worker threads constantly entering and
+ leaving the worker thread free list.
+
+ But if the worker thread is idle at any point, it may return to the
+ idle list or be servicing a different request. So check this, and
+ allocate a new thread if the old one is no longer processing for us.
+ */
cur_thread= e->rpl_thread;
if (cur_thread)
{
mysql_mutex_lock(&cur_thread->LOCK_rpl_thread);
if (cur_thread->current_entry != e)
{
- /* Not ours anymore, we need to grab a new one. */
+ /*
+ The worker thread became idle, and returned to the free list and
+ possibly was allocated to a different request. This also means
+ that everything previously queued has already been executed, else
+ the worker thread would not have become idle. So we should
+ allocate a new worker thread.
+ */
mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
e->rpl_thread= cur_thread= NULL;
}
@@ -682,6 +719,9 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev)
Events like ROTATE and FORMAT_DESCRIPTION. Do not run in worker thread.
Same for events not preceeded by GTID (we should not see those normally,
but they might be from an old master).
+
+ The varuable `current' is NULL for the case where the master did not
+ have GTID, like a MariaDB 5.5 or MySQL master.
*/
qev->rgi= serial_rgi;
rpt_handle_event(qev, NULL);
@@ -719,8 +759,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev)
else
cur_thread->event_queue= qev;
cur_thread->last_in_queue= qev;
- mysql_cond_signal(&cur_thread->COND_rpl_thread);
mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
+ mysql_cond_signal(&cur_thread->COND_rpl_thread);
return false;
}