diff options
author | unknown <knielsen@knielsen-hq.org> | 2013-10-31 14:11:41 +0100 |
---|---|---|
committer | unknown <knielsen@knielsen-hq.org> | 2013-10-31 14:11:41 +0100 |
commit | 39df665a3332bd9bfb2529419f534a49cfac388c (patch) | |
tree | 0b431c7e00a4ef8344a5fbde2cdac4935ec204b1 /sql/rpl_parallel.cc | |
parent | 9c8da4ed762a4ad092e23cc07c34212320341ac1 (diff) | |
download | mariadb-git-39df665a3332bd9bfb2529419f534a49cfac388c.tar.gz |
MDEV-5206: Incorrect slave old-style position in MDEV-4506, parallel replication.
In parallel replication, there are two kinds of events which are
executed in different ways.
Normal events that are part of event groups/transactions are executed
asynchroneously by being queued for a worker thread.
Other events like format description and rotate and such are executed
directly in the driver SQL thread.
If the direct execution of the other events were to update the old-style
position, then the position gets updated too far ahead, before the normal
events that have been queued for a worker thread have been executed. So
this patch adds some special cases to prevent such position updates ahead
of time, and instead queues dummy events for the worker threads, so that
they will at an appropriate time do the position updates instead.
(Also fix a race in a test case that happened to trigger while running
tests for this patch).
Diffstat (limited to 'sql/rpl_parallel.cc')
-rw-r--r-- | sql/rpl_parallel.cc | 105 |
1 files changed, 99 insertions, 6 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index d62bec6e605..8328dd24128 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -56,6 +56,48 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev, } +static void +handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev) +{ + int cmp; + Relay_log_info *rli; + /* + Events that are not part of an event group, such as Format Description, + Stop, GTID List and such, are executed directly in the driver SQL thread, + to keep the relay log state up-to-date. But the associated position update + is done here, in sync with other normal events as they are queued to + worker threads. + */ + if ((thd->variables.option_bits & OPTION_BEGIN) && + opt_using_transactions) + return; + rli= qev->rgi->rli; + mysql_mutex_lock(&rli->data_lock); + cmp= strcmp(rli->group_relay_log_name, qev->event_relay_log_name); + if (cmp < 0) + { + rli->group_relay_log_pos= qev->future_event_relay_log_pos; + strmake_buf(rli->group_relay_log_name, qev->event_relay_log_name); + rli->notify_group_relay_log_name_update(); + } else if (cmp == 0 && + rli->group_relay_log_pos < qev->future_event_relay_log_pos) + rli->group_relay_log_pos= qev->future_event_relay_log_pos; + + cmp= strcmp(rli->group_master_log_name, qev->future_event_master_log_name); + if (cmp < 0) + { + strcpy(rli->group_master_log_name, qev->future_event_master_log_name); + rli->notify_group_master_log_name_update(); + rli->group_master_log_pos= qev->future_event_master_log_pos; + } + else if (cmp == 0 + && rli->group_master_log_pos < qev->future_event_master_log_pos) + rli->group_master_log_pos= qev->future_event_master_log_pos; + mysql_mutex_unlock(&rli->data_lock); + mysql_cond_broadcast(&rli->data_cond); +} + + static bool sql_worker_killed(THD *thd, rpl_group_info *rgi, bool in_event_group) { @@ -142,16 +184,24 @@ handle_rpl_parallel_thread(void *arg) while (events) { struct rpl_parallel_thread::queued_event *next= events->next; - Log_event_type event_type= events->ev->get_type_code(); + Log_event_type event_type; rpl_group_info *rgi= events->rgi; rpl_parallel_entry *entry= rgi->parallel_entry; uint64 wait_for_sub_id; uint64 wait_start_sub_id; bool end_of_group; + if (!events->ev) + { + handle_queued_pos_update(thd, events); + my_free(events); + events= next; + continue; + } + err= 0; /* Handle a new event group, which will be initiated by a GTID event. */ - if (event_type == GTID_EVENT) + if ((event_type= events->ev->get_type_code()) == GTID_EVENT) { in_event_group= true; /* @@ -794,13 +844,15 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, e->last_commit_id= 0; } - e->current_group_info= rgi; + qev->rgi= e->current_group_info= rgi; e->current_sub_id= rgi->gtid_sub_id; current= rgi->parallel_entry= e; } else if (!is_group_event || !current) { + my_off_t log_pos; int err; + bool tmp; /* Events like ROTATE and FORMAT_DESCRIPTION. Do not run in worker thread. Same for events not preceeded by GTID (we should not see those normally, @@ -824,11 +876,52 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, } } + tmp= serial_rgi->is_parallel_exec; + serial_rgi->is_parallel_exec= true; err= rpt_handle_event(qev, NULL); + serial_rgi->is_parallel_exec= tmp; + log_pos= qev->ev->log_pos; delete_or_keep_event_post_apply(serial_rgi, typ, qev->ev); - my_free(qev); - return (err != 0); + if (err) + { + my_free(qev); + return true; + } + qev->ev= NULL; + qev->future_event_master_log_pos= log_pos; + if (!current) + { + handle_queued_pos_update(rli->sql_driver_thd, qev); + my_free(qev); + return false; + } + /* + Queue an empty event, so that the position will be updated in a + reasonable way relative to other events: + + - If the currently executing events are queued serially for a single + thread, the position will only be updated when everything before has + completed. + + - If we are executing multiple independent events in parallel, then at + least the position will not be updated until one of them has reached + the current point. + */ + cur_thread= current->rpl_thread; + if (cur_thread) + { + mysql_mutex_lock(&cur_thread->LOCK_rpl_thread); + if (cur_thread->current_entry != current) + { + /* Not ours anymore, we need to grab a new one. */ + mysql_mutex_unlock(&cur_thread->LOCK_rpl_thread); + cur_thread= NULL; + } + } + if (!cur_thread) + cur_thread= current->rpl_thread= + global_rpl_thread_pool.get_thread(current); } else { @@ -848,8 +941,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, cur_thread= current->rpl_thread= global_rpl_thread_pool.get_thread(current); } + qev->rgi= current->current_group_info; } - qev->rgi= current->current_group_info; /* Queue the event for processing. |