summaryrefslogtreecommitdiff
path: root/sql/rpl_parallel.cc
diff options
context:
space:
mode:
authorunknown <knielsen@knielsen-hq.org>2014-02-26 15:02:09 +0100
committerunknown <knielsen@knielsen-hq.org>2014-02-26 15:02:09 +0100
commite90f68c0ba1802d58b06bc7178513f77b27b662b (patch)
treede9f14296cb3a637b036b9358471215f7bd60dca /sql/rpl_parallel.cc
parent50323f12630fbc5b19b40f1a9affdc550cc7cfd9 (diff)
downloadmariadb-git-e90f68c0ba1802d58b06bc7178513f77b27b662b.tar.gz
MDEV-5657: Parallel replication.
Clean up and improve the parallel implementation code, mainly related to scheduling of work to threads and handling of stop and errors. Fix a lot of bugs in various corner cases that could lead to crashes or corruption. Fix that a single replication domain could easily grab all worker threads and stall all other domains; now a configuration variable --slave-domain-parallel-threads allows to limit the number of workers. Allow next event group to start as soon as previous group begins the commit phase (as opposed to when it ends it); this allows multiple event groups on the slave to participate in group commit, even when no other opportunities for parallelism are available. Various fixes: - Fix some races in the rpl.rpl_parallel test case. - Fix an old incorrect assertion in Log_event iocache read. - Fix repeated malloc/free of wait_for_commit and rpl_group_info objects. - Simplify wait_for_commit wakeup logic. - Fix one case in queue_for_group_commit() where killing one thread would fail to correctly signal the error to the next, causing loss of the transaction after slave restart. - Fix leaking of pthreads (and their allocated stack) due to missing PTHREAD_CREATE_DETACHED attribute. - Fix how one batch of group-committed transactions wait for the previous batch before starting to execute themselves. The old code had a very complex scheduling where the first transaction was handled differently, with subtle bugs in corner cases. Now each event group is always scheduled for a new worker (in a round-robin fashion amongst available workers). Keep a count of how many transactions have started to commit, and wait for that counter to reach the appropriate value. - Fix slave stop to wait for all workers to actually complete processing; before, the wait was for update of last_committed_sub_id, which happens a bit earlier, and could leave worker threads potentially accessing bits of the replication state that is no longer valid after slave stop. - Fix a couple of places where the test suite would kill a thread waiting inside enter_cond() in connection with debug_sync; debug_sync + kill can crash in rare cases due to a race with mysys_var_current_mutex in this case. - Fix some corner cases where we had enter_cond() but no exit_cond(). - Fix that we could get failure in wait_for_prior_commit() but forget to flag the error with my_error(). - Fix slave stop (both for normal stop and stop due to error). Now, at stop we pick a specific safe point (in terms of event groups executed) and make sure that all event groups before that point are executed to completion, and that no event group after start executing; this ensures a safe place to restart replication, even for non-transactional stuff/DDL. In error stop, make sure that all prior event groups are allowed to execute to completion, and that any later event groups that have started are rolled back, if possible. The old code could leave eg. T1 and T3 committed but T2 not, or it could even leave half a transaction not rolled back in some random worker, which would cause big problems when that worker was later reused after slave restart. - Fix the accounting of amount of events queued for one worker. Before, the amount was reduced immediately as soon as the events were dequeued (which happens all at once); this allowed twice the amount of events to be queued in memory for each single worker, which is not what users would expect. - Fix that an error set during execution of one event was sometimes not cleared before executing the next, causing problems with the error reporting. - Fix incorrect handling of thd->killed in worker threads.
Diffstat (limited to 'sql/rpl_parallel.cc')
-rw-r--r--sql/rpl_parallel.cc951
1 files changed, 645 insertions, 306 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index ef282611f70..eab5b980c02 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -93,32 +93,12 @@ handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev)
}
-static bool
-sql_worker_killed(THD *thd, rpl_group_info *rgi, bool in_event_group)
-{
- if (!rgi->rli->abort_slave && !abort_loop)
- return false;
-
- /*
- Do not abort in the middle of an event group that cannot be rolled back.
- */
- if ((thd->transaction.all.modified_non_trans_table ||
- (thd->variables.option_bits & OPTION_KEEP_LOG))
- && in_event_group)
- return false;
- /* ToDo: should we add some timeout like in sql_slave_killed?
- if (rgi->last_event_start_time == 0)
- rgi->last_event_start_time= my_time(0);
- */
-
- return true;
-}
-
-
static void
finish_event_group(THD *thd, int err, uint64 sub_id,
- rpl_parallel_entry *entry, wait_for_commit *wfc)
+ rpl_parallel_entry *entry, rpl_group_info *rgi)
{
+ wait_for_commit *wfc= &rgi->commit_orderer;
+
/*
Remove any left-over registration to wait for a prior commit to
complete. Normally, such wait would already have been removed at
@@ -163,12 +143,26 @@ finish_event_group(THD *thd, int err, uint64 sub_id,
*/
mysql_mutex_lock(&entry->LOCK_parallel_entry);
if (entry->last_committed_sub_id < sub_id)
- {
entry->last_committed_sub_id= sub_id;
- mysql_cond_broadcast(&entry->COND_parallel_entry);
- }
+
+ /*
+ If this event group got error, then any following event groups that have
+ not yet started should just skip their group, preparing for stop of the
+ SQL driver thread.
+ */
+ if (unlikely(rgi->is_error) &&
+ entry->stop_on_error_sub_id == (uint64)ULONGLONG_MAX)
+ entry->stop_on_error_sub_id= sub_id;
+ /*
+ We need to mark that this event group started its commit phase, in case we
+ missed it before (otherwise we would deadlock the next event group that is
+ waiting for this). In most cases (normal DML), it will be a no-op.
+ */
+ rgi->mark_start_commit_no_lock();
mysql_mutex_unlock(&entry->LOCK_parallel_entry);
+ thd->clear_error();
+ thd->stmt_da->reset_diagnostics_area();
wfc->wakeup_subsequent_commits(err);
}
@@ -185,6 +179,20 @@ signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi)
}
+static void
+unlock_or_exit_cond(THD *thd, mysql_mutex_t *lock, bool *did_enter_cond,
+ const char *old_msg)
+{
+ if (*did_enter_cond)
+ {
+ thd->exit_cond(old_msg);
+ *did_enter_cond= false;
+ }
+ else
+ mysql_mutex_unlock(lock);
+}
+
+
pthread_handler_t
handle_rpl_parallel_thread(void *arg)
{
@@ -193,8 +201,14 @@ handle_rpl_parallel_thread(void *arg)
struct rpl_parallel_thread::queued_event *events;
bool group_standalone= true;
bool in_event_group= false;
+ bool group_skip_for_stop= false;
rpl_group_info *group_rgi= NULL;
+ group_commit_orderer *gco, *tmp_gco;
uint64 event_gtid_sub_id= 0;
+ rpl_parallel_thread::queued_event *qevs_to_free;
+ rpl_group_info *rgis_to_free;
+ group_commit_orderer *gcos_to_free;
+ size_t total_event_size;
int err;
struct rpl_parallel_thread *rpt= (struct rpl_parallel_thread *)arg;
@@ -234,44 +248,62 @@ handle_rpl_parallel_thread(void *arg)
rpt->running= true;
mysql_cond_signal(&rpt->COND_rpl_thread);
- while (!rpt->stop && !thd->killed)
+ while (!rpt->stop)
{
- rpl_parallel_thread *list;
-
old_msg= thd->proc_info;
thd->enter_cond(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread,
"Waiting for work from SQL thread");
- while (!(events= rpt->event_queue) && !rpt->stop && !thd->killed &&
- !(rpt->current_entry && rpt->current_entry->force_abort))
+ /*
+ There are 4 cases that should cause us to wake up:
+ - Events have been queued for us to handle.
+ - We have an owner, but no events and not inside event group -> we need
+ to release ourself to the thread pool
+ - SQL thread is stopping, and we have an owner but no events, and we are
+ inside an event group; no more events will be queued to us, so we need
+ to abort the group (force_abort==1).
+ - Thread pool shutdown (rpt->stop==1).
+ */
+ while (!( (events= rpt->event_queue) ||
+ (rpt->current_owner && !in_event_group) ||
+ (rpt->current_owner && group_rgi->parallel_entry->force_abort) ||
+ rpt->stop))
mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread);
- rpt->dequeue(events);
+ rpt->dequeue1(events);
thd->exit_cond(old_msg);
- mysql_cond_signal(&rpt->COND_rpl_thread);
more_events:
+ qevs_to_free= NULL;
+ rgis_to_free= NULL;
+ gcos_to_free= NULL;
+ total_event_size= 0;
while (events)
{
struct rpl_parallel_thread::queued_event *next= events->next;
Log_event_type event_type;
rpl_group_info *rgi= events->rgi;
rpl_parallel_entry *entry= rgi->parallel_entry;
- uint64 wait_for_sub_id;
- uint64 wait_start_sub_id;
- bool end_of_group;
+ bool end_of_group, group_ending;
+ total_event_size+= events->event_size;
if (!events->ev)
{
handle_queued_pos_update(thd, events);
- my_free(events);
+ events->next= qevs_to_free;
+ qevs_to_free= events;
events= next;
continue;
}
err= 0;
group_rgi= rgi;
+ gco= rgi->gco;
/* Handle a new event group, which will be initiated by a GTID event. */
if ((event_type= events->ev->get_type_code()) == GTID_EVENT)
{
+ bool did_enter_cond= false;
+ const char *old_msg= NULL;
+ uint64 wait_count;
+
in_event_group= true;
/*
If the standalone flag is set, then this event group consists of a
@@ -293,50 +325,87 @@ handle_rpl_parallel_thread(void *arg)
occured.
Also do not start parallel execution of this event group until all
- prior groups have committed that are not safe to run in parallel with.
+ prior groups have reached the commit phase that are not safe to run
+ in parallel with.
*/
- wait_for_sub_id= rgi->wait_commit_sub_id;
- wait_start_sub_id= rgi->wait_start_sub_id;
- if (wait_for_sub_id || wait_start_sub_id)
+ mysql_mutex_lock(&entry->LOCK_parallel_entry);
+ if (!gco->installed)
{
- bool did_enter_cond= false;
- const char *old_msg= NULL;
-
- mysql_mutex_lock(&entry->LOCK_parallel_entry);
- if (wait_start_sub_id)
+ if (gco->prev_gco)
+ gco->prev_gco->next_gco= gco;
+ gco->installed= true;
+ }
+ wait_count= gco->wait_count;
+ if (wait_count > entry->count_committing_event_groups)
+ {
+ DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior");
+ old_msg= thd->enter_cond(&gco->COND_group_commit_orderer,
+ &entry->LOCK_parallel_entry,
+ "Waiting for prior transaction to start "
+ "commit before starting next transaction");
+ did_enter_cond= true;
+ do
{
- old_msg= thd->enter_cond(&entry->COND_parallel_entry,
- &entry->LOCK_parallel_entry,
- "Waiting for prior transaction to commit "
- "before starting next transaction");
- did_enter_cond= true;
- DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior");
- while (wait_start_sub_id > entry->last_committed_sub_id &&
- !thd->check_killed())
- mysql_cond_wait(&entry->COND_parallel_entry,
- &entry->LOCK_parallel_entry);
- if (wait_start_sub_id > entry->last_committed_sub_id)
+ if (thd->check_killed() && !rgi->is_error)
{
- /* The thread got a kill signal. */
DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior_killed");
thd->send_kill_message();
slave_output_error_info(rgi->rli, thd);
signal_error_to_sql_driver_thread(thd, rgi);
+ /*
+ Even though we were killed, we need to continue waiting for the
+ prior event groups to signal that we can continue. Otherwise we
+ mess up the accounting for ordering. However, now that we have
+ marked the error, events will just be skipped rather than
+ executed, and things will progress quickly towards stop.
+ */
}
- rgi->wait_start_sub_id= 0; /* No need to check again. */
- }
- if (wait_for_sub_id > entry->last_committed_sub_id)
- {
- wait_for_commit *waitee=
- &rgi->wait_commit_group_info->commit_orderer;
- rgi->commit_orderer.register_wait_for_prior_commit(waitee);
- }
- if (did_enter_cond)
- thd->exit_cond(old_msg);
- else
- mysql_mutex_unlock(&entry->LOCK_parallel_entry);
+ mysql_cond_wait(&gco->COND_group_commit_orderer,
+ &entry->LOCK_parallel_entry);
+ } while (wait_count > entry->count_committing_event_groups);
+ }
+
+ if ((tmp_gco= gco->prev_gco))
+ {
+ /*
+ Now all the event groups in the previous batch have entered their
+ commit phase, and will no longer access their gco. So we can free
+ it here.
+ */
+ DBUG_ASSERT(!tmp_gco->prev_gco);
+ gco->prev_gco= NULL;
+ tmp_gco->next_gco= gcos_to_free;
+ gcos_to_free= tmp_gco;
}
+ if (entry->force_abort && wait_count > entry->stop_count)
+ {
+ /*
+ We are stopping (STOP SLAVE), and this event group is beyond the
+ point where we can safely stop. So set a flag that will cause us
+ to skip, rather than execute, the following events.
+ */
+ group_skip_for_stop= true;
+ }
+ else
+ group_skip_for_stop= false;
+
+ if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id))
+ group_skip_for_stop= true;
+ else if (rgi->wait_commit_sub_id > entry->last_committed_sub_id)
+ {
+ /*
+ Register that the commit of this event group must wait for the
+ commit of the previous event group to complete before it may
+ complete itself, so that we preserve commit order.
+ */
+ wait_for_commit *waitee=
+ &rgi->wait_commit_group_info->commit_orderer;
+ rgi->commit_orderer.register_wait_for_prior_commit(waitee);
+ }
+ unlock_or_exit_cond(thd, &entry->LOCK_parallel_entry,
+ &did_enter_cond, old_msg);
+
if(thd->wait_for_commit_ptr)
{
/*
@@ -353,13 +422,23 @@ handle_rpl_parallel_thread(void *arg)
thd->wait_for_commit_ptr= &rgi->commit_orderer;
}
+ group_ending= event_type == XID_EVENT ||
+ (event_type == QUERY_EVENT &&
+ (((Query_log_event *)events->ev)->is_commit() ||
+ ((Query_log_event *)events->ev)->is_rollback()));
+ if (group_ending)
+ {
+ DEBUG_SYNC(thd, "rpl_parallel_before_mark_start_commit");
+ rgi->mark_start_commit();
+ }
+
/*
If the SQL thread is stopping, we just skip execution of all the
following event groups. We still do all the normal waiting and wakeup
processing between the event groups as a simple way to ensure that
everything is stopped and cleaned up correctly.
*/
- if (!rgi->is_error && !sql_worker_killed(thd, rgi, in_event_group))
+ if (!rgi->is_error && !group_skip_for_stop)
err= rpt_handle_event(events, rpt);
else
err= thd->wait_for_prior_commit();
@@ -367,13 +446,11 @@ handle_rpl_parallel_thread(void *arg)
end_of_group=
in_event_group &&
((group_standalone && !Log_event::is_part_of_group(event_type)) ||
- event_type == XID_EVENT ||
- (event_type == QUERY_EVENT &&
- (((Query_log_event *)events->ev)->is_commit() ||
- ((Query_log_event *)events->ev)->is_rollback())));
+ group_ending);
delete_or_keep_event_post_apply(rgi, event_type, events->ev);
- my_free(events);
+ events->next= qevs_to_free;
+ qevs_to_free= events;
if (err)
{
@@ -383,10 +460,11 @@ handle_rpl_parallel_thread(void *arg)
if (end_of_group)
{
in_event_group= false;
- finish_event_group(thd, err, event_gtid_sub_id, entry,
- &rgi->commit_orderer);
- delete rgi;
+ finish_event_group(thd, err, event_gtid_sub_id, entry, rgi);
+ rgi->next= rgis_to_free;
+ rgis_to_free= rgi;
group_rgi= rgi= NULL;
+ group_skip_for_stop= false;
DEBUG_SYNC(thd, "rpl_parallel_end_of_group");
}
@@ -394,6 +472,29 @@ handle_rpl_parallel_thread(void *arg)
}
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+ /* Signal that our queue can now accept more events. */
+ rpt->dequeue2(total_event_size);
+ mysql_cond_signal(&rpt->COND_rpl_thread_queue);
+ /* We need to delay the free here, to when we have the lock. */
+ while (gcos_to_free)
+ {
+ group_commit_orderer *next= gcos_to_free->next_gco;
+ rpt->free_gco(gcos_to_free);
+ gcos_to_free= next;
+ }
+ while (rgis_to_free)
+ {
+ rpl_group_info *next= rgis_to_free->next;
+ rpt->free_rgi(rgis_to_free);
+ rgis_to_free= next;
+ }
+ while (qevs_to_free)
+ {
+ rpl_parallel_thread::queued_event *next= qevs_to_free->next;
+ rpt->free_qev(qevs_to_free);
+ qevs_to_free= next;
+ }
+
if ((events= rpt->event_queue) != NULL)
{
/*
@@ -401,9 +502,8 @@ handle_rpl_parallel_thread(void *arg)
This is faster than having to wakeup the pool manager thread to give us
a new event.
*/
- rpt->dequeue(events);
+ rpt->dequeue1(events);
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
- mysql_cond_signal(&rpt->COND_rpl_thread);
goto more_events;
}
@@ -418,27 +518,26 @@ handle_rpl_parallel_thread(void *arg)
half-processed event group.
*/
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+ thd->wait_for_prior_commit();
finish_event_group(thd, 1, group_rgi->gtid_sub_id,
- group_rgi->parallel_entry, &group_rgi->commit_orderer);
+ group_rgi->parallel_entry, group_rgi);
signal_error_to_sql_driver_thread(thd, group_rgi);
in_event_group= false;
- delete group_rgi;
- group_rgi= NULL;
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+ rpt->free_rgi(group_rgi);
+ group_rgi= NULL;
+ group_skip_for_stop= false;
}
if (!in_event_group)
{
+ rpt->current_owner= NULL;
+ /* Tell wait_for_done() that we are done, if it is waiting. */
+ if (likely(rpt->current_entry) &&
+ unlikely(rpt->current_entry->force_abort))
+ mysql_cond_broadcast(&rpt->current_entry->COND_parallel_entry);
rpt->current_entry= NULL;
if (!rpt->stop)
- {
- mysql_mutex_lock(&rpt->pool->LOCK_rpl_thread_pool);
- list= rpt->pool->free_list;
- rpt->next= list;
- rpt->pool->free_list= rpt;
- if (!list)
- mysql_cond_broadcast(&rpt->pool->COND_rpl_thread_pool);
- mysql_mutex_unlock(&rpt->pool->LOCK_rpl_thread_pool);
- }
+ rpt->pool->release_thread(rpt);
}
}
@@ -467,6 +566,15 @@ handle_rpl_parallel_thread(void *arg)
}
+static void
+dealloc_gco(group_commit_orderer *gco)
+{
+ DBUG_ASSERT(!gco->prev_gco /* Must only free after dealloc previous */);
+ mysql_cond_destroy(&gco->COND_group_commit_orderer);
+ my_free(gco);
+}
+
+
int
rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
uint32 new_count, bool skip_check)
@@ -501,8 +609,10 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
mysql_mutex_init(key_LOCK_rpl_thread, &new_list[i]->LOCK_rpl_thread,
MY_MUTEX_INIT_SLOW);
mysql_cond_init(key_COND_rpl_thread, &new_list[i]->COND_rpl_thread, NULL);
+ mysql_cond_init(key_COND_rpl_thread_queue,
+ &new_list[i]->COND_rpl_thread_queue, NULL);
new_list[i]->pool= pool;
- if (mysql_thread_create(key_rpl_parallel_thread, &th, NULL,
+ if (mysql_thread_create(key_rpl_parallel_thread, &th, &connection_attrib,
handle_rpl_parallel_thread, new_list[i]))
{
my_error(ER_OUT_OF_RESOURCES, MYF(0));
@@ -539,7 +649,7 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
*/
for (i= 0; i < pool->count; ++i)
{
- rpl_parallel_thread *rpt= pool->get_thread(NULL);
+ rpl_parallel_thread *rpt= pool->get_thread(NULL, NULL);
rpt->stop= true;
mysql_cond_signal(&rpt->COND_rpl_thread);
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
@@ -554,6 +664,24 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
mysql_mutex_destroy(&rpt->LOCK_rpl_thread);
mysql_cond_destroy(&rpt->COND_rpl_thread);
+ while (rpt->qev_free_list)
+ {
+ rpl_parallel_thread::queued_event *next= rpt->qev_free_list->next;
+ my_free(rpt->qev_free_list);
+ rpt->qev_free_list= next;
+ }
+ while (rpt->rgi_free_list)
+ {
+ rpl_group_info *next= rpt->rgi_free_list->next;
+ delete rpt->rgi_free_list;
+ rpt->rgi_free_list= next;
+ }
+ while (rpt->gco_free_list)
+ {
+ group_commit_orderer *next= rpt->gco_free_list->next_gco;
+ dealloc_gco(rpt->gco_free_list);
+ rpt->gco_free_list= next;
+ }
}
my_free(pool->threads);
@@ -609,6 +737,121 @@ err:
}
+rpl_parallel_thread::queued_event *
+rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size,
+ Relay_log_info *rli)
+{
+ queued_event *qev;
+ mysql_mutex_assert_owner(&LOCK_rpl_thread);
+ if ((qev= qev_free_list))
+ qev_free_list= qev->next;
+ else if(!(qev= (queued_event *)my_malloc(sizeof(*qev), MYF(0))))
+ {
+ my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*qev));
+ return NULL;
+ }
+ qev->ev= ev;
+ qev->event_size= event_size;
+ qev->next= NULL;
+ strcpy(qev->event_relay_log_name, rli->event_relay_log_name);
+ qev->event_relay_log_pos= rli->event_relay_log_pos;
+ qev->future_event_relay_log_pos= rli->future_event_relay_log_pos;
+ strcpy(qev->future_event_master_log_name, rli->future_event_master_log_name);
+ return qev;
+}
+
+
+void
+rpl_parallel_thread::free_qev(rpl_parallel_thread::queued_event *qev)
+{
+ mysql_mutex_assert_owner(&LOCK_rpl_thread);
+ qev->next= qev_free_list;
+ qev_free_list= qev;
+}
+
+
+rpl_group_info*
+rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
+ rpl_parallel_entry *e)
+{
+ rpl_group_info *rgi;
+ mysql_mutex_assert_owner(&LOCK_rpl_thread);
+ if ((rgi= rgi_free_list))
+ {
+ rgi_free_list= rgi->next;
+ rgi->reinit(rli);
+ }
+ else
+ {
+ if(!(rgi= new rpl_group_info(rli)))
+ {
+ my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*rgi));
+ return NULL;
+ }
+ rgi->is_parallel_exec = true;
+ if ((rgi->deferred_events_collecting= rli->mi->rpl_filter->is_on()))
+ rgi->deferred_events= new Deferred_log_events(rli);
+ }
+ if (event_group_new_gtid(rgi, gtid_ev))
+ {
+ free_rgi(rgi);
+ my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME));
+ return NULL;
+ }
+ rgi->parallel_entry= e;
+
+ return rgi;
+}
+
+
+void
+rpl_parallel_thread::free_rgi(rpl_group_info *rgi)
+{
+ mysql_mutex_assert_owner(&LOCK_rpl_thread);
+ DBUG_ASSERT(rgi->commit_orderer.waitee == NULL);
+ rgi->free_annotate_event();
+ if (rgi->deferred_events)
+ {
+ delete rgi->deferred_events;
+ rgi->deferred_events= NULL;
+ }
+ rgi->next= rgi_free_list;
+ rgi_free_list= rgi;
+}
+
+
+group_commit_orderer *
+rpl_parallel_thread::get_gco(uint64 wait_count, group_commit_orderer *prev)
+{
+ group_commit_orderer *gco;
+ mysql_mutex_assert_owner(&LOCK_rpl_thread);
+ if ((gco= gco_free_list))
+ gco_free_list= gco->next_gco;
+ else if(!(gco= (group_commit_orderer *)my_malloc(sizeof(*gco), MYF(0))))
+ {
+ my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*gco));
+ return NULL;
+ }
+ mysql_cond_init(key_COND_group_commit_orderer,
+ &gco->COND_group_commit_orderer, NULL);
+ gco->wait_count= wait_count;
+ gco->prev_gco= prev;
+ gco->next_gco= NULL;
+ gco->installed= false;
+ return gco;
+}
+
+
+void
+rpl_parallel_thread::free_gco(group_commit_orderer *gco)
+{
+ mysql_mutex_assert_owner(&LOCK_rpl_thread);
+ DBUG_ASSERT(!gco->prev_gco /* Must not free until wait has completed. */);
+ gco->next_gco= gco_free_list;
+ gco_free_list= gco;
+}
+
+
rpl_parallel_thread_pool::rpl_parallel_thread_pool()
: count(0), threads(0), free_list(0), changing(false), inited(false)
{
@@ -651,7 +894,8 @@ rpl_parallel_thread_pool::destroy()
Note that we return with the worker threads's LOCK_rpl_thread mutex locked.
*/
struct rpl_parallel_thread *
-rpl_parallel_thread_pool::get_thread(rpl_parallel_entry *entry)
+rpl_parallel_thread_pool::get_thread(rpl_parallel_thread **owner,
+ rpl_parallel_entry *entry)
{
rpl_parallel_thread *rpt;
@@ -661,16 +905,151 @@ rpl_parallel_thread_pool::get_thread(rpl_parallel_entry *entry)
free_list= rpt->next;
mysql_mutex_unlock(&LOCK_rpl_thread_pool);
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+ rpt->current_owner= owner;
rpt->current_entry= entry;
return rpt;
}
+/*
+ Release a thread to the thread pool.
+ The thread should be locked, and should not have any work queued for it.
+*/
+void
+rpl_parallel_thread_pool::release_thread(rpl_parallel_thread *rpt)
+{
+ rpl_parallel_thread *list;
+
+ mysql_mutex_assert_owner(&rpt->LOCK_rpl_thread);
+ DBUG_ASSERT(rpt->current_owner == NULL);
+ mysql_mutex_lock(&LOCK_rpl_thread_pool);
+ list= free_list;
+ rpt->next= list;
+ free_list= rpt;
+ if (!list)
+ mysql_cond_broadcast(&COND_rpl_thread_pool);
+ mysql_mutex_unlock(&LOCK_rpl_thread_pool);
+}
+
+
+/*
+ Obtain a worker thread that we can queue an event to.
+
+ Each invocation allocates a new worker thread, to maximise
+ parallelism. However, only up to a maximum of
+ --slave-domain-parallel-threads workers can be occupied by a single
+ replication domain; after that point, we start re-using worker threads that
+ are still executing events that were queued earlier for this thread.
+
+ We never queue more than --rpl-parallel-wait-queue_max amount of events
+ for one worker, to avoid the SQL driver thread using up all memory with
+ queued events while worker threads are stalling.
+
+ Note that this function returns with rpl_parallel_thread::LOCK_rpl_thread
+ locked. Exception is if we were killed, in which case NULL is returned.
+
+ The *did_enter_cond flag is set true if we had to wait for a worker thread
+ to become free (with mysql_cond_wait()). If so, *old_msg will also be set,
+ and the LOCK_rpl_thread must be released with THD::EXIT_COND() instead
+ of mysql_mutex_unlock.
+
+ If the flag `reuse' is set, the last worker thread will be returned again,
+ if it is still available. Otherwise a new worker thread is allocated.
+*/
+rpl_parallel_thread *
+rpl_parallel_entry::choose_thread(Relay_log_info *rli, bool *did_enter_cond,
+ const char **old_msg, bool reuse)
+{
+ uint32 idx;
+ rpl_parallel_thread *thr;
+
+ idx= rpl_thread_idx;
+ if (!reuse)
+ {
+ ++idx;
+ if (idx >= rpl_thread_max)
+ idx= 0;
+ rpl_thread_idx= idx;
+ }
+ thr= rpl_threads[idx];
+ if (thr)
+ {
+ *did_enter_cond= false;
+ mysql_mutex_lock(&thr->LOCK_rpl_thread);
+ for (;;)
+ {
+ if (thr->current_owner != &rpl_threads[idx])
+ {
+ /*
+ The worker thread became idle, and returned to the free list and
+ possibly was allocated to a different request. So we should allocate
+ a new worker thread.
+ */
+ unlock_or_exit_cond(rli->sql_driver_thd, &thr->LOCK_rpl_thread,
+ did_enter_cond, *old_msg);
+ thr= NULL;
+ break;
+ }
+ else if (thr->queued_size <= opt_slave_parallel_max_queued)
+ {
+ /* The thread is ready to queue into. */
+ break;
+ }
+ else if (rli->sql_driver_thd->check_killed())
+ {
+ unlock_or_exit_cond(rli->sql_driver_thd, &thr->LOCK_rpl_thread,
+ did_enter_cond, *old_msg);
+ my_error(ER_CONNECTION_KILLED, MYF(0));
+ DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max",
+ {
+ debug_sync_set_action(rli->sql_driver_thd,
+ STRING_WITH_LEN("now SIGNAL wait_queue_killed"));
+ };);
+ slave_output_error_info(rli, rli->sql_driver_thd);
+ return NULL;
+ }
+ else
+ {
+ /*
+ We have reached the limit of how much memory we are allowed to use
+ for queuing events, so wait for the thread to consume some of its
+ queue.
+ */
+ if (!*did_enter_cond)
+ {
+ /*
+ We need to do the debug_sync before enter_cond().
+ Because debug_sync changes the thd->mysys_var->current_mutex,
+ and this can cause THD::awake to use the wrong mutex.
+ */
+ DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max",
+ {
+ debug_sync_set_action(rli->sql_driver_thd,
+ STRING_WITH_LEN("now SIGNAL wait_queue_ready"));
+ };);
+ *old_msg= rli->sql_driver_thd->enter_cond
+ (&thr->COND_rpl_thread_queue, &thr->LOCK_rpl_thread,
+ "Waiting for room in worker thread event queue");
+ *did_enter_cond= true;
+ }
+ mysql_cond_wait(&thr->COND_rpl_thread_queue, &thr->LOCK_rpl_thread);
+ }
+ }
+ }
+ if (!thr)
+ rpl_threads[idx]= thr= global_rpl_thread_pool.get_thread(&rpl_threads[idx],
+ this);
+
+ return thr;
+}
+
static void
free_rpl_parallel_entry(void *element)
{
rpl_parallel_entry *e= (rpl_parallel_entry *)element;
+ if (e->current_gco)
+ dealloc_gco(e->current_gco);
mysql_cond_destroy(&e->COND_parallel_entry);
mysql_mutex_destroy(&e->LOCK_parallel_entry);
my_free(e);
@@ -710,10 +1089,22 @@ rpl_parallel::find(uint32 domain_id)
(const uchar *)&domain_id, 0)))
{
/* Allocate a new, empty one. */
- if (!(e= (struct rpl_parallel_entry *)my_malloc(sizeof(*e),
- MYF(MY_ZEROFILL))))
+ ulong count= opt_slave_domain_parallel_threads;
+ if (count == 0 || count > opt_slave_parallel_threads)
+ count= opt_slave_parallel_threads;
+ rpl_parallel_thread **p;
+ if (!my_multi_malloc(MYF(MY_WME|MY_ZEROFILL),
+ &e, sizeof(*e),
+ &p, count*sizeof(*p),
+ NULL))
+ {
+ my_error(ER_OUTOFMEMORY, MYF(0), (int)(sizeof(*e)+count*sizeof(*p)));
return NULL;
+ }
+ e->rpl_threads= p;
+ e->rpl_thread_max= count;
e->domain_id= domain_id;
+ e->stop_on_error_sub_id= (uint64)ULONGLONG_MAX;
if (my_hash_insert(&domain_hash, (uchar *)e))
{
my_free(e);
@@ -731,10 +1122,11 @@ rpl_parallel::find(uint32 domain_id)
void
-rpl_parallel::wait_for_done()
+rpl_parallel::wait_for_done(THD *thd)
{
struct rpl_parallel_entry *e;
- uint32 i;
+ rpl_parallel_thread *rpt;
+ uint32 i, j;
/*
First signal all workers that they must force quit; no more events will
@@ -742,26 +1134,58 @@ rpl_parallel::wait_for_done()
*/
for (i= 0; i < domain_hash.records; ++i)
{
- rpl_parallel_thread *rpt;
-
e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
+ mysql_mutex_lock(&e->LOCK_parallel_entry);
+ /*
+ We want the worker threads to stop as quickly as is safe. If the slave
+ SQL threads are behind, we could have significant amount of events
+ queued for the workers, and we want to stop without waiting for them
+ all to be applied first. But if any event group has already started
+ executing in a worker, we want to be sure that all prior event groups
+ are also executed, so that we stop at a consistent point in the binlog
+ stream (per replication domain).
+
+ All event groups wait for e->count_committing_event_groups to reach
+ the value of group_commit_orderer::wait_count before starting to
+ execute. Thus, at this point we know that any event group with a
+ strictly larger wait_count are safe to skip, none of them can have
+ started executing yet. So we set e->stop_count here and use it to
+ decide in the worker threads whether to continue executing an event
+ group or whether to skip it, when force_abort is set.
+ */
e->force_abort= true;
- if ((rpt= e->rpl_thread))
+ e->stop_count= e->count_committing_event_groups;
+ mysql_mutex_unlock(&e->LOCK_parallel_entry);
+ for (j= 0; j < e->rpl_thread_max; ++j)
{
- mysql_mutex_lock(&rpt->LOCK_rpl_thread);
- if (rpt->current_entry == e)
- mysql_cond_signal(&rpt->COND_rpl_thread);
- mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+ if ((rpt= e->rpl_threads[j]))
+ {
+ mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+ if (rpt->current_owner == &e->rpl_threads[j])
+ mysql_cond_signal(&rpt->COND_rpl_thread);
+ mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+ }
}
}
+ DBUG_EXECUTE_IF("rpl_parallel_wait_for_done_trigger",
+ {
+ debug_sync_set_action(thd,
+ STRING_WITH_LEN("now SIGNAL wait_for_done_waiting"));
+ };);
for (i= 0; i < domain_hash.records; ++i)
{
e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
- mysql_mutex_lock(&e->LOCK_parallel_entry);
- while (e->current_sub_id > e->last_committed_sub_id)
- mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry);
- mysql_mutex_unlock(&e->LOCK_parallel_entry);
+ for (j= 0; j < e->rpl_thread_max; ++j)
+ {
+ if ((rpt= e->rpl_threads[j]))
+ {
+ mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+ while (rpt->current_owner == &e->rpl_threads[j])
+ mysql_cond_wait(&e->COND_parallel_entry, &rpt->LOCK_rpl_thread);
+ mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+ }
+ }
}
}
@@ -788,6 +1212,21 @@ rpl_parallel::workers_idle()
/*
+ This is used when we get an error during processing in do_event();
+ We will not queue any event to the thread, but we still need to wake it up
+ to be sure that it will be returned to the pool.
+*/
+static void
+abandon_worker_thread(THD *thd, rpl_parallel_thread *cur_thread,
+ bool *did_enter_cond, const char *old_msg)
+{
+ unlock_or_exit_cond(thd, &cur_thread->LOCK_rpl_thread,
+ did_enter_cond, old_msg);
+ mysql_cond_signal(&cur_thread->COND_rpl_thread);
+}
+
+
+/*
do_event() is executed by the sql_driver_thd thread.
It's main purpose is to find a thread that can execute the query.
@@ -815,6 +1254,10 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
/*
Stop queueing additional event groups once the SQL thread is requested to
stop.
+
+ We have to queue any remaining events of any event group that has already
+ been partially queued, but after that we will just ignore any further
+ events the SQL driver thread may try to queue, and eventually it will stop.
*/
if (((typ= ev->get_type_code()) == GTID_EVENT ||
!(is_group_event= Log_event::is_group_event(typ))) &&
@@ -823,188 +1266,121 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
if (sql_thread_stopping)
{
delete ev;
- /* QQ: Need a better comment why we return false here */
+ /*
+ Return false ("no error"); normal stop is not an error, and otherwise the
+ error has already been recorded.
+ */
return false;
}
- if (!(qev= (rpl_parallel_thread::queued_event *)my_malloc(sizeof(*qev),
- MYF(0))))
+ if (typ == GTID_EVENT || unlikely(!current))
{
- my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ uint32 domain_id;
+ if (likely(typ == GTID_EVENT))
+ {
+ Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
+ domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ?
+ 0 : gtid_ev->domain_id);
+ }
+ else
+ domain_id= 0;
+ if (!(e= find(domain_id)))
+ {
+ my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME));
+ delete ev;
+ return true;
+ }
+ current= e;
+ }
+ else
+ e= current;
+
+ /*
+ Find a worker thread to queue the event for.
+ Prefer a new thread, so we maximise parallelism (at least for the group
+ commit). But do not exceed a limit of --slave-domain-parallel-threads;
+ instead re-use a thread that we queued for previously.
+ */
+ cur_thread=
+ e->choose_thread(rli, &did_enter_cond, &old_msg, typ != GTID_EVENT);
+ if (!cur_thread)
+ {
+ /* This means we were killed. The error is already signalled. */
+ delete ev;
+ return true;
+ }
+
+ if (!(qev= cur_thread->get_qev(ev, event_size, rli)))
+ {
+ abandon_worker_thread(rli->sql_driver_thd, cur_thread,
+ &did_enter_cond, old_msg);
delete ev;
return true;
}
- qev->ev= ev;
- qev->event_size= event_size;
- qev->next= NULL;
- strcpy(qev->event_relay_log_name, rli->event_relay_log_name);
- qev->event_relay_log_pos= rli->event_relay_log_pos;
- qev->future_event_relay_log_pos= rli->future_event_relay_log_pos;
- strcpy(qev->future_event_master_log_name, rli->future_event_master_log_name);
if (typ == GTID_EVENT)
{
Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
- uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ?
- 0 : gtid_ev->domain_id);
- if (!(e= find(domain_id)) ||
- !(rgi= new rpl_group_info(rli)) ||
- event_group_new_gtid(rgi, gtid_ev))
+ if (!(rgi= cur_thread->get_rgi(rli, gtid_ev, e)))
{
- my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME));
- delete rgi;
- my_free(qev);
+ cur_thread->free_qev(qev);
+ abandon_worker_thread(rli->sql_driver_thd, cur_thread,
+ &did_enter_cond, old_msg);
delete ev;
return true;
}
- rgi->is_parallel_exec = true;
- if ((rgi->deferred_events_collecting= rli->mi->rpl_filter->is_on()))
- rgi->deferred_events= new Deferred_log_events(rli);
- if ((gtid_ev->flags2 & Gtid_log_event::FL_GROUP_COMMIT_ID) &&
- e->last_commit_id == gtid_ev->commit_id)
- {
- /*
- We are already executing something else in this domain. But the two
- event groups were committed together in the same group commit on the
- master, so we can still do them in parallel here on the slave.
+ /*
+ We queue the event group in a new worker thread, to run in parallel
+ with previous groups.
- However, the commit of this event must wait for the commit of the prior
- event, to preserve binlog commit order and visibility across all
- servers in the replication hierarchy.
+ To preserve commit order within the replication domain, we set up
+ rgi->wait_commit_sub_id to make the new group commit only after the
+ previous group has committed.
- In addition, we must not start executing this event until we have
- finished the previous collection of event groups that group-committed
- together; we use rgi->wait_start_sub_id to control this.
- */
- rpl_parallel_thread *rpt= global_rpl_thread_pool.get_thread(e);
- rgi->wait_commit_sub_id= e->current_sub_id;
- rgi->wait_commit_group_info= e->current_group_info;
- rgi->wait_start_sub_id= e->prev_groupcommit_sub_id;
- e->rpl_thread= cur_thread= rpt;
- /* get_thread() returns with the LOCK_rpl_thread locked. */
- }
- else
+ Event groups that group-committed together on the master can be run
+ in parallel with each other without restrictions. But one batch of
+ group-commits may not start before all groups in the previous batch
+ have initiated their commit phase; we set up rgi->gco to ensure that.
+ */
+ rgi->wait_commit_sub_id= e->current_sub_id;
+ rgi->wait_commit_group_info= e->current_group_info;
+
+ if (!((gtid_ev->flags2 & Gtid_log_event::FL_GROUP_COMMIT_ID) &&
+ e->last_commit_id == gtid_ev->commit_id))
{
/*
- Check if we already have a worker thread for this entry.
-
- We continue to queue more events up for the worker thread while it is
- still executing the first ones, to be able to start executing a large
- event group without having to wait for the end to be fetched from the
- master. And we continue to queue up more events after the first group,
- so that we can continue to process subsequent parts of the relay log in
- parallel without having to wait for previous long-running events to
- complete.
-
- But if the worker thread is idle at any point, it may return to the
- idle list or start servicing a different request. So check this, and
- allocate a new thread if the old one is no longer processing for us.
+ A new batch of transactions that group-committed together on the master.
+
+ Remember the count that marks the end of the previous group committed
+ batch, and allocate a new gco.
*/
- cur_thread= e->rpl_thread;
- if (cur_thread)
- {
- mysql_mutex_lock(&cur_thread->LOCK_rpl_thread);
- for (;;)
- {
- if (cur_thread->current_entry != e)
- {
- /*
- The worker thread became idle, and returned to the free list and
- possibly was allocated to a different request. This also means
- that everything previously queued has already been executed,
- else the worker thread would not have become idle. So we should
- allocate a new worker thread.
- */
- mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
- e->rpl_thread= cur_thread= NULL;
- break;
- }
- else if (cur_thread->queued_size <= opt_slave_parallel_max_queued)
- break; // The thread is ready to queue into
- else if (rli->sql_driver_thd->check_killed())
- {
- mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
- my_error(ER_CONNECTION_KILLED, MYF(0));
- delete rgi;
- my_free(qev);
- delete ev;
- DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max",
- {
- debug_sync_set_action(rli->sql_driver_thd,
- STRING_WITH_LEN("now SIGNAL wait_queue_killed"));
- };);
- slave_output_error_info(rli, rli->sql_driver_thd);
- return true;
- }
- else
- {
- /*
- We have reached the limit of how much memory we are allowed to
- use for queuing events, so wait for the thread to consume some
- of its queue.
- */
- if (!did_enter_cond)
- {
- old_msg= rli->sql_driver_thd->enter_cond
- (&cur_thread->COND_rpl_thread, &cur_thread->LOCK_rpl_thread,
- "Waiting for room in worker thread event queue");
- did_enter_cond= true;
- DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max",
- {
- debug_sync_set_action(rli->sql_driver_thd,
- STRING_WITH_LEN("now SIGNAL wait_queue_ready"));
- };);
- }
- mysql_cond_wait(&cur_thread->COND_rpl_thread,
- &cur_thread->LOCK_rpl_thread);
- }
- }
- }
+ uint64 count= e->count_queued_event_groups;
+ group_commit_orderer *gco;
- if (!cur_thread)
- {
- /*
- Nothing else is currently running in this domain. We can
- spawn a new thread to do this event group in parallel with
- anything else that might be running in other domains.
- */
- cur_thread= e->rpl_thread= global_rpl_thread_pool.get_thread(e);
- /* get_thread() returns with the LOCK_rpl_thread locked. */
- }
- else
+ if (!(gco= cur_thread->get_gco(count, e->current_gco)))
{
- /*
- We are still executing the previous event group for this replication
- domain, and we have to wait for that to finish before we can start on
- the next one. So just re-use the thread.
- */
+ cur_thread->free_rgi(rgi);
+ cur_thread->free_qev(qev);
+ abandon_worker_thread(rli->sql_driver_thd, cur_thread,
+ &did_enter_cond, old_msg);
+ delete ev;
+ return true;
}
-
- rgi->wait_commit_sub_id= 0;
- rgi->wait_start_sub_id= 0;
- e->prev_groupcommit_sub_id= e->current_sub_id;
+ e->current_gco= rgi->gco= gco;
}
-
+ else
+ rgi->gco= e->current_gco;
if (gtid_ev->flags2 & Gtid_log_event::FL_GROUP_COMMIT_ID)
- {
- e->last_server_id= gtid_ev->server_id;
- e->last_seq_no= gtid_ev->seq_no;
e->last_commit_id= gtid_ev->commit_id;
- }
else
- {
- e->last_server_id= 0;
- e->last_seq_no= 0;
e->last_commit_id= 0;
- }
-
qev->rgi= e->current_group_info= rgi;
e->current_sub_id= rgi->gtid_sub_id;
- current= rgi->parallel_entry= e;
+ ++e->count_queued_event_groups;
}
- else if (!is_group_event || !current)
+ else if (!is_group_event || !e)
{
my_off_t log_pos;
int err;
@@ -1014,7 +1390,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
Same for events not preceeded by GTID (we should not see those normally,
but they might be from an old master).
- The varuable `current' is NULL for the case where the master did not
+ The variable `e' is NULL for the case where the master did not
have GTID, like a MariaDB 5.5 or MySQL master.
*/
qev->rgi= serial_rgi;
@@ -1041,18 +1417,11 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
if (err)
{
- my_free(qev);
+ cur_thread->free_qev(qev);
+ abandon_worker_thread(rli->sql_driver_thd, cur_thread,
+ &did_enter_cond, old_msg);
return true;
}
- qev->ev= NULL;
- qev->future_event_master_log_pos= log_pos;
- if (!current)
- {
- rli->event_relay_log_pos= rli->future_event_relay_log_pos;
- handle_queued_pos_update(rli->sql_driver_thd, qev);
- my_free(qev);
- return false;
- }
/*
Queue an empty event, so that the position will be updated in a
reasonable way relative to other events:
@@ -1065,40 +1434,12 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
least the position will not be updated until one of them has reached
the current point.
*/
- cur_thread= current->rpl_thread;
- if (cur_thread)
- {
- mysql_mutex_lock(&cur_thread->LOCK_rpl_thread);
- if (cur_thread->current_entry != current)
- {
- /* Not ours anymore, we need to grab a new one. */
- mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
- cur_thread= NULL;
- }
- }
- if (!cur_thread)
- cur_thread= current->rpl_thread=
- global_rpl_thread_pool.get_thread(current);
+ qev->ev= NULL;
+ qev->future_event_master_log_pos= log_pos;
}
else
{
- cur_thread= current->rpl_thread;
- if (cur_thread)
- {
- mysql_mutex_lock(&cur_thread->LOCK_rpl_thread);
- if (cur_thread->current_entry != current)
- {
- /* Not ours anymore, we need to grab a new one. */
- mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
- cur_thread= NULL;
- }
- }
- if (!cur_thread)
- {
- cur_thread= current->rpl_thread=
- global_rpl_thread_pool.get_thread(current);
- }
- qev->rgi= current->current_group_info;
+ qev->rgi= e->current_group_info;
}
/*
@@ -1106,10 +1447,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
*/
rli->event_relay_log_pos= rli->future_event_relay_log_pos;
cur_thread->enqueue(qev);
- if (did_enter_cond)
- rli->sql_driver_thd->exit_cond(old_msg);
- else
- mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread);
+ unlock_or_exit_cond(rli->sql_driver_thd, &cur_thread->LOCK_rpl_thread,
+ &did_enter_cond, old_msg);
mysql_cond_signal(&cur_thread->COND_rpl_thread);
return false;