summaryrefslogtreecommitdiff
path: root/sql/rpl_parallel.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/rpl_parallel.cc')
-rw-r--r--sql/rpl_parallel.cc644
1 files changed, 547 insertions, 97 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 90ee2360eb7..9b91206ca75 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -4,18 +4,8 @@
#include "rpl_mi.h"
#include "debug_sync.h"
-
/*
Code for optional parallel execution of replicated events on the slave.
-
- ToDo list:
-
- - Retry of failed transactions is not yet implemented for the parallel case.
-
- - All the waits (eg. in struct wait_for_commit and in
- rpl_parallel_thread_pool::get_thread()) need to be killable. And on kill,
- everything needs to be correctly rolled back and stopped in all threads,
- to ensure a consistent slave replication state.
*/
struct rpl_parallel_thread_pool global_rpl_thread_pool;
@@ -31,20 +21,22 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev,
rpl_group_info *rgi= qev->rgi;
Relay_log_info *rli= rgi->rli;
THD *thd= rgi->thd;
+ Log_event *ev;
+
+ DBUG_ASSERT(qev->typ == rpl_parallel_thread::queued_event::QUEUED_EVENT);
+ ev= qev->ev;
- thd->rgi_slave= rgi;
thd->system_thread_info.rpl_sql_info->rpl_filter = rli->mi->rpl_filter;
+ ev->thd= thd;
- /* ToDo: Access to thd, and what about rli, split out a parallel part? */
- mysql_mutex_lock(&rli->data_lock);
- qev->ev->thd= thd;
strcpy(rgi->event_relay_log_name_buf, qev->event_relay_log_name);
rgi->event_relay_log_name= rgi->event_relay_log_name_buf;
rgi->event_relay_log_pos= qev->event_relay_log_pos;
rgi->future_event_relay_log_pos= qev->future_event_relay_log_pos;
strcpy(rgi->future_event_master_log_name, qev->future_event_master_log_name);
- err= apply_event_and_update_pos(qev->ev, thd, rgi, rpt);
- thd->rgi_slave= NULL;
+ mysql_mutex_lock(&rli->data_lock);
+ /* Mutex will be released in apply_event_and_update_pos(). */
+ err= apply_event_and_update_pos(ev, thd, rgi, rpt);
thread_safe_increment64(&rli->executed_entries,
&slave_executed_entries_lock);
@@ -58,6 +50,8 @@ handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev)
{
int cmp;
Relay_log_info *rli;
+ rpl_parallel_entry *e;
+
/*
Events that are not part of an event group, such as Format Description,
Stop, GTID List and such, are executed directly in the driver SQL thread,
@@ -68,6 +62,13 @@ handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev)
if ((thd->variables.option_bits & OPTION_BEGIN) &&
opt_using_transactions)
return;
+
+ /* Do not update position if an earlier event group caused an error abort. */
+ DBUG_ASSERT(qev->typ == rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE);
+ e= qev->entry_for_queued;
+ if (e->stop_on_error_sub_id < (uint64)ULONGLONG_MAX || e->force_abort)
+ return;
+
rli= qev->rgi->rli;
mysql_mutex_lock(&rli->data_lock);
cmp= strcmp(rli->group_relay_log_name, qev->event_relay_log_name);
@@ -165,6 +166,7 @@ finish_event_group(THD *thd, uint64 sub_id, rpl_parallel_entry *entry,
mysql_mutex_unlock(&entry->LOCK_parallel_entry);
thd->clear_error();
+ thd->reset_killed();
thd->get_stmt_da()->reset_diagnostics_area();
wfc->wakeup_subsequent_commits(rgi->worker_error);
}
@@ -197,6 +199,290 @@ unlock_or_exit_cond(THD *thd, mysql_mutex_t *lock, bool *did_enter_cond,
}
+static void
+register_wait_for_prior_event_group_commit(rpl_group_info *rgi,
+ rpl_parallel_entry *entry)
+{
+ mysql_mutex_assert_owner(&entry->LOCK_parallel_entry);
+ if (rgi->wait_commit_sub_id > entry->last_committed_sub_id)
+ {
+ /*
+ Register that the commit of this event group must wait for the
+ commit of the previous event group to complete before it may
+ complete itself, so that we preserve commit order.
+ */
+ wait_for_commit *waitee=
+ &rgi->wait_commit_group_info->commit_orderer;
+ rgi->commit_orderer.register_wait_for_prior_commit(waitee);
+ }
+}
+
+
+#ifndef DBUG_OFF
+static int
+dbug_simulate_tmp_error(rpl_group_info *rgi, THD *thd)
+{
+ if (rgi->current_gtid.domain_id == 0 && rgi->current_gtid.seq_no == 100 &&
+ rgi->retry_event_count == 4)
+ {
+ thd->clear_error();
+ thd->get_stmt_da()->reset_diagnostics_area();
+ my_error(ER_LOCK_DEADLOCK, MYF(0));
+ return 1;
+ }
+ return 0;
+}
+#endif
+
+
+/*
+ If we detect a deadlock due to eg. storage engine locks that conflict with
+ the fixed commit order, then the later transaction will be killed
+ asynchroneously to allow the former to complete its commit.
+
+ In this case, we convert the 'killed' error into a deadlock error, and retry
+ the later transaction. */
+static void
+convert_kill_to_deadlock_error(rpl_group_info *rgi)
+{
+ THD *thd= rgi->thd;
+ int err_code;
+
+ if (!thd->get_stmt_da()->is_error())
+ return;
+ err_code= thd->get_stmt_da()->sql_errno();
+ if ((err_code == ER_QUERY_INTERRUPTED || err_code == ER_CONNECTION_KILLED) &&
+ rgi->killed_for_retry)
+ {
+ thd->clear_error();
+ my_error(ER_LOCK_DEADLOCK, MYF(0));
+ rgi->killed_for_retry= false;
+ thd->reset_killed();
+ }
+}
+
+
+static bool
+is_group_ending(Log_event *ev, Log_event_type event_type)
+{
+ return event_type == XID_EVENT ||
+ (event_type == QUERY_EVENT &&
+ (((Query_log_event *)ev)->is_commit() ||
+ ((Query_log_event *)ev)->is_rollback()));
+}
+
+
+static int
+retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt,
+ rpl_parallel_thread::queued_event *orig_qev)
+{
+ IO_CACHE rlog;
+ LOG_INFO linfo;
+ File fd= (File)-1;
+ const char *errmsg= NULL;
+ inuse_relaylog *ir= rgi->relay_log;
+ uint64 event_count;
+ uint64 events_to_execute= rgi->retry_event_count;
+ Relay_log_info *rli= rgi->rli;
+ int err;
+ ulonglong cur_offset, old_offset;
+ char log_name[FN_REFLEN];
+ THD *thd= rgi->thd;
+ rpl_parallel_entry *entry= rgi->parallel_entry;
+ ulong retries= 0;
+
+do_retry:
+ event_count= 0;
+ err= 0;
+
+ /*
+ If we already started committing before getting the deadlock (or other
+ error) that caused us to need to retry, we have already signalled
+ subsequent transactions that we have started committing. This is
+ potentially a problem, as now we will rollback, and if subsequent
+ transactions would start to execute now, they could see an unexpected
+ state of the database and get eg. key not found or duplicate key error.
+
+ However, to get a deadlock in the first place, there must have been
+ another earlier transaction that is waiting for us. Thus that other
+ transaction has _not_ yet started to commit, and any subsequent
+ transactions will still be waiting at this point.
+
+ So here, we decrement back the count of transactions that started
+ committing (if we already incremented it), undoing the effect of an
+ earlier mark_start_commit(). Then later, when the retry succeeds and we
+ commit again, we can do a new mark_start_commit() and eventually wake up
+ subsequent transactions at the proper time.
+
+ We need to do the unmark before the rollback, to be sure that the
+ transaction we deadlocked with will not signal that it started to commit
+ until after the unmark.
+ */
+ rgi->unmark_start_commit();
+
+ /*
+ We might get the deadlock error that causes the retry during commit, while
+ sitting in wait_for_prior_commit(). If this happens, we will have a
+ pending error in the wait_for_commit object. So clear this by
+ unregistering (and later re-registering) the wait.
+ */
+ if(thd->wait_for_commit_ptr)
+ thd->wait_for_commit_ptr->unregister_wait_for_prior_commit();
+ rgi->cleanup_context(thd, 1);
+
+ /*
+ If we retry due to a deadlock kill that occured during the commit step, we
+ might have already updated (but not committed) an update of table
+ mysql.gtid_slave_pos, and cleared the gtid_pending flag. Now we have
+ rolled back any such update, so we must set the gtid_pending flag back to
+ true so that we will do a new update when/if we succeed with the retry.
+ */
+ rgi->gtid_pending= true;
+
+ mysql_mutex_lock(&rli->data_lock);
+ ++rli->retried_trans;
+ statistic_increment(slave_retried_transactions, LOCK_status);
+ mysql_mutex_unlock(&rli->data_lock);
+
+ mysql_mutex_lock(&entry->LOCK_parallel_entry);
+ register_wait_for_prior_event_group_commit(rgi, entry);
+ mysql_mutex_unlock(&entry->LOCK_parallel_entry);
+
+ strmake_buf(log_name, ir->name);
+ if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0)
+ {
+ err= 1;
+ goto err;
+ }
+ cur_offset= rgi->retry_start_offset;
+ my_b_seek(&rlog, cur_offset);
+
+ do
+ {
+ Log_event_type event_type;
+ Log_event *ev;
+ rpl_parallel_thread::queued_event *qev;
+
+ /* The loop is here so we can try again the next relay log file on EOF. */
+ for (;;)
+ {
+ old_offset= cur_offset;
+ ev= Log_event::read_log_event(&rlog, 0,
+ rli->relay_log.description_event_for_exec /* ToDo: this needs fixing */,
+ opt_slave_sql_verify_checksum);
+ cur_offset= my_b_tell(&rlog);
+
+ if (ev)
+ break;
+ if (rlog.error < 0)
+ {
+ errmsg= "slave SQL thread aborted because of I/O error";
+ err= 1;
+ goto err;
+ }
+ if (rlog.error > 0)
+ {
+ sql_print_error("Slave SQL thread: I/O error reading "
+ "event(errno: %d cur_log->error: %d)",
+ my_errno, rlog.error);
+ errmsg= "Aborting slave SQL thread because of partial event read";
+ err= 1;
+ goto err;
+ }
+ /* EOF. Move to the next relay log. */
+ end_io_cache(&rlog);
+ mysql_file_close(fd, MYF(MY_WME));
+ fd= (File)-1;
+
+ /* Find the next relay log file. */
+ if((err= rli->relay_log.find_log_pos(&linfo, log_name, 1)) ||
+ (err= rli->relay_log.find_next_log(&linfo, 1)))
+ {
+ char buff[22];
+ sql_print_error("next log error: %d offset: %s log: %s",
+ err,
+ llstr(linfo.index_file_offset, buff),
+ log_name);
+ goto err;
+ }
+ strmake_buf(log_name ,linfo.log_file_name);
+
+ if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0)
+ {
+ err= 1;
+ goto err;
+ }
+ /* Loop to try again on the new log file. */
+ }
+
+ event_type= ev->get_type_code();
+ if (!Log_event::is_group_event(event_type))
+ {
+ delete ev;
+ continue;
+ }
+ ev->thd= thd;
+
+ mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+ qev= rpt->retry_get_qev(ev, orig_qev, log_name, cur_offset,
+ cur_offset - old_offset);
+ mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+ if (!qev)
+ {
+ delete ev;
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ err= 1;
+ goto err;
+ }
+ if (is_group_ending(ev, event_type))
+ rgi->mark_start_commit();
+
+ err= rpt_handle_event(qev, rpt);
+ ++event_count;
+ mysql_mutex_lock(&rpt->LOCK_rpl_thread);
+ rpt->free_qev(qev);
+ mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
+
+ delete_or_keep_event_post_apply(rgi, event_type, ev);
+ DBUG_EXECUTE_IF("rpl_parallel_simulate_double_temp_err_gtid_0_x_100",
+ if (retries == 0) err= dbug_simulate_tmp_error(rgi, thd););
+ DBUG_EXECUTE_IF("rpl_parallel_simulate_infinite_temp_err_gtid_0_x_100",
+ err= dbug_simulate_tmp_error(rgi, thd););
+ if (err)
+ {
+ convert_kill_to_deadlock_error(rgi);
+ if (has_temporary_error(thd))
+ {
+ ++retries;
+ if (retries < slave_trans_retries)
+ {
+ end_io_cache(&rlog);
+ mysql_file_close(fd, MYF(MY_WME));
+ fd= (File)-1;
+ goto do_retry;
+ }
+ sql_print_error("Slave worker thread retried transaction %lu time(s) "
+ "in vain, giving up. Consider raising the value of "
+ "the slave_transaction_retries variable.",
+ slave_trans_retries);
+ }
+ goto err;
+ }
+ } while (event_count < events_to_execute);
+
+err:
+
+ if (fd >= 0)
+ {
+ end_io_cache(&rlog);
+ mysql_file_close(fd, MYF(MY_WME));
+ }
+ if (errmsg)
+ sql_print_error("Error reading relay log event: %s", errmsg);
+ return err;
+}
+
+
pthread_handler_t
handle_rpl_parallel_thread(void *arg)
{
@@ -215,6 +501,8 @@ handle_rpl_parallel_thread(void *arg)
rpl_sql_thread_info sql_info(NULL);
size_t total_event_size;
int err;
+ inuse_relaylog *last_ir;
+ uint64 accumulated_ir_count;
struct rpl_parallel_thread *rpt= (struct rpl_parallel_thread *)arg;
@@ -244,39 +532,6 @@ handle_rpl_parallel_thread(void *arg)
thd->set_time();
thd->variables.lock_wait_timeout= LONG_TIMEOUT;
thd->system_thread_info.rpl_sql_info= &sql_info;
- /*
- For now, we need to run the replication parallel worker threads in
- READ COMMITTED. This is needed because gap locks are not symmetric.
- For example, a gap lock from a DELETE blocks an insert intention lock,
- but not vice versa. So an INSERT followed by DELETE can group commit
- on the master, but if we are unlucky with thread scheduling we can
- then deadlock on the slave because the INSERT ends up waiting for a
- gap lock from the DELETE (and the DELETE in turn waits for the INSERT
- in wait_for_prior_commit()). See also MDEV-5914.
-
- It should be mostly safe to run in READ COMMITTED in the slave anyway.
- The commit order is already fixed from on the master, so we do not
- risk logging into the binlog in an incorrect order between worker
- threads (one that would cause different results if executed on a
- lower-level slave that uses this slave as a master). The only
- potential problem is with transactions run in a different master
- connection (using multi-source replication), or run directly on the
- slave by an application; when using READ COMMITTED we are not
- guaranteed serialisability of binlogged statements.
-
- In practice, this is unlikely to be an issue. In GTID mode, such
- parallel transactions from multi-source or application must in any
- case use a different replication domain, in which case binlog order
- by definition must be independent between the different domain. Even
- in non-GTID mode, normally one will assume that the external
- transactions are not conflicting with those applied by the slave, so
- that isolation level should make no difference. It would be rather
- strange if the result of applying query events from one master would
- depend on the timing and nature of other queries executed from
- different multi-source connections or done directly on the slave by
- an application. Still, something to be aware of.
- */
- thd->variables.tx_isolation= ISO_READ_COMMITTED;
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
rpt->thd= thd;
@@ -323,7 +578,7 @@ handle_rpl_parallel_thread(void *arg)
bool end_of_group, group_ending;
total_event_size+= events->event_size;
- if (!events->ev)
+ if (events->typ == rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE)
{
handle_queued_pos_update(thd, events);
events->next= qevs_to_free;
@@ -331,8 +586,33 @@ handle_rpl_parallel_thread(void *arg)
events= next;
continue;
}
+ else if (events->typ ==
+ rpl_parallel_thread::queued_event::QUEUED_MASTER_RESTART)
+ {
+ if (in_event_group)
+ {
+ /*
+ Master restarted (crashed) in the middle of an event group.
+ So we need to roll back and discard that event group.
+ */
+ group_rgi->cleanup_context(thd, 1);
+ in_event_group= false;
+ finish_event_group(thd, group_rgi->gtid_sub_id,
+ events->entry_for_queued, group_rgi);
+
+ group_rgi->next= rgis_to_free;
+ rgis_to_free= group_rgi;
+ thd->rgi_slave= group_rgi= NULL;
+ }
+
+ events->next= qevs_to_free;
+ qevs_to_free= events;
+ events= next;
+ continue;
+ }
+ DBUG_ASSERT(events->typ==rpl_parallel_thread::queued_event::QUEUED_EVENT);
- group_rgi= rgi;
+ thd->rgi_slave= group_rgi= rgi;
gco= rgi->gco;
/* Handle a new event group, which will be initiated by a GTID event. */
if ((event_type= events->ev->get_type_code()) == GTID_EVENT)
@@ -341,7 +621,6 @@ handle_rpl_parallel_thread(void *arg)
PSI_stage_info old_stage;
uint64 wait_count;
- thd->tx_isolation= (enum_tx_isolation)thd->variables.tx_isolation;
in_event_group= true;
/*
If the standalone flag is set, then this event group consists of a
@@ -352,9 +631,7 @@ handle_rpl_parallel_thread(void *arg)
(0 != (static_cast<Gtid_log_event *>(events->ev)->flags2 &
Gtid_log_event::FL_STANDALONE));
- /* Save this, as it gets cleared when the event group commits. */
event_gtid_sub_id= rgi->gtid_sub_id;
-
rgi->thd= thd;
/*
@@ -388,7 +665,7 @@ handle_rpl_parallel_thread(void *arg)
{
DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior_killed");
thd->send_kill_message();
- slave_output_error_info(rgi->rli, thd);
+ slave_output_error_info(rgi, thd);
signal_error_to_sql_driver_thread(thd, rgi, 1);
/*
Even though we were killed, we need to continue waiting for the
@@ -430,17 +707,9 @@ handle_rpl_parallel_thread(void *arg)
if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id))
skip_event_group= true;
- else if (rgi->wait_commit_sub_id > entry->last_committed_sub_id)
- {
- /*
- Register that the commit of this event group must wait for the
- commit of the previous event group to complete before it may
- complete itself, so that we preserve commit order.
- */
- wait_for_commit *waitee=
- &rgi->wait_commit_group_info->commit_orderer;
- rgi->commit_orderer.register_wait_for_prior_commit(waitee);
- }
+ else
+ register_wait_for_prior_event_group_commit(rgi, entry);
+
unlock_or_exit_cond(thd, &entry->LOCK_parallel_entry,
&did_enter_cond, &old_stage);
@@ -467,7 +736,7 @@ handle_rpl_parallel_thread(void *arg)
if (res < 0)
{
/* Error. */
- slave_output_error_info(rgi->rli, thd);
+ slave_output_error_info(rgi, thd);
signal_error_to_sql_driver_thread(thd, rgi, 1);
}
else if (!res)
@@ -482,11 +751,8 @@ handle_rpl_parallel_thread(void *arg)
}
}
- group_ending= event_type == XID_EVENT ||
- (event_type == QUERY_EVENT &&
- (((Query_log_event *)events->ev)->is_commit() ||
- ((Query_log_event *)events->ev)->is_rollback()));
- if (group_ending)
+ group_ending= is_group_ending(events->ev, event_type);
+ if (group_ending && likely(!rgi->worker_error))
{
DEBUG_SYNC(thd, "rpl_parallel_before_mark_start_commit");
rgi->mark_start_commit();
@@ -498,24 +764,42 @@ handle_rpl_parallel_thread(void *arg)
processing between the event groups as a simple way to ensure that
everything is stopped and cleaned up correctly.
*/
- if (!rgi->worker_error && !skip_event_group)
+ if (likely(!rgi->worker_error) && !skip_event_group)
+ {
+ ++rgi->retry_event_count;
err= rpt_handle_event(events, rpt);
+ delete_or_keep_event_post_apply(rgi, event_type, events->ev);
+ DBUG_EXECUTE_IF("rpl_parallel_simulate_temp_err_gtid_0_x_100",
+ err= dbug_simulate_tmp_error(rgi, thd););
+ if (err)
+ {
+ convert_kill_to_deadlock_error(rgi);
+ if (has_temporary_error(thd) && slave_trans_retries > 0)
+ err= retry_event_group(rgi, rpt, events);
+ }
+ }
else
+ {
+ delete events->ev;
err= thd->wait_for_prior_commit();
+ }
end_of_group=
in_event_group &&
((group_standalone && !Log_event::is_part_of_group(event_type)) ||
group_ending);
- delete_or_keep_event_post_apply(rgi, event_type, events->ev);
events->next= qevs_to_free;
qevs_to_free= events;
- if (unlikely(err) && !rgi->worker_error)
+ if (unlikely(err))
{
- slave_output_error_info(rgi->rli, thd);
- signal_error_to_sql_driver_thread(thd, rgi, err);
+ if (!rgi->worker_error)
+ {
+ slave_output_error_info(rgi, thd);
+ signal_error_to_sql_driver_thread(thd, rgi, err);
+ }
+ thd->reset_killed();
}
if (end_of_group)
{
@@ -523,7 +807,7 @@ handle_rpl_parallel_thread(void *arg)
finish_event_group(thd, event_gtid_sub_id, entry, rgi);
rgi->next= rgis_to_free;
rgis_to_free= rgi;
- group_rgi= rgi= NULL;
+ thd->rgi_slave= group_rgi= rgi= NULL;
skip_event_group= false;
DEBUG_SYNC(thd, "rpl_parallel_end_of_group");
}
@@ -548,12 +832,34 @@ handle_rpl_parallel_thread(void *arg)
rpt->free_rgi(rgis_to_free);
rgis_to_free= next;
}
+ last_ir= NULL;
+ accumulated_ir_count= 0;
while (qevs_to_free)
{
rpl_parallel_thread::queued_event *next= qevs_to_free->next;
+ inuse_relaylog *ir= qevs_to_free->ir;
+ /* Batch up refcount update to reduce use of synchronised operations. */
+ if (last_ir != ir)
+ {
+ if (last_ir)
+ {
+ my_atomic_rwlock_wrlock(&last_ir->inuse_relaylog_atomic_lock);
+ my_atomic_add64(&last_ir->dequeued_count, accumulated_ir_count);
+ my_atomic_rwlock_wrunlock(&last_ir->inuse_relaylog_atomic_lock);
+ accumulated_ir_count= 0;
+ }
+ last_ir= ir;
+ }
+ ++accumulated_ir_count;
rpt->free_qev(qevs_to_free);
qevs_to_free= next;
}
+ if (last_ir)
+ {
+ my_atomic_rwlock_wrlock(&last_ir->inuse_relaylog_atomic_lock);
+ my_atomic_add64(&last_ir->dequeued_count, accumulated_ir_count);
+ my_atomic_rwlock_wrunlock(&last_ir->inuse_relaylog_atomic_lock);
+ }
if ((events= rpt->event_queue) != NULL)
{
@@ -584,7 +890,7 @@ handle_rpl_parallel_thread(void *arg)
in_event_group= false;
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
rpt->free_rgi(group_rgi);
- group_rgi= NULL;
+ thd->rgi_slave= group_rgi= NULL;
skip_event_group= false;
}
if (!in_event_group)
@@ -802,8 +1108,7 @@ err:
rpl_parallel_thread::queued_event *
-rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size,
- Relay_log_info *rli)
+rpl_parallel_thread::get_qev_common(Log_event *ev, ulonglong event_size)
{
queued_event *qev;
mysql_mutex_assert_owner(&LOCK_rpl_thread);
@@ -814,9 +1119,21 @@ rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size,
my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*qev));
return NULL;
}
+ qev->typ= rpl_parallel_thread::queued_event::QUEUED_EVENT;
qev->ev= ev;
qev->event_size= event_size;
qev->next= NULL;
+ return qev;
+}
+
+
+rpl_parallel_thread::queued_event *
+rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size,
+ Relay_log_info *rli)
+{
+ queued_event *qev= get_qev_common(ev, event_size);
+ if (!qev)
+ return NULL;
strcpy(qev->event_relay_log_name, rli->event_relay_log_name);
qev->event_relay_log_pos= rli->event_relay_log_pos;
qev->future_event_relay_log_pos= rli->future_event_relay_log_pos;
@@ -825,6 +1142,24 @@ rpl_parallel_thread::get_qev(Log_event *ev, ulonglong event_size,
}
+rpl_parallel_thread::queued_event *
+rpl_parallel_thread::retry_get_qev(Log_event *ev, queued_event *orig_qev,
+ const char *relay_log_name,
+ ulonglong event_pos, ulonglong event_size)
+{
+ queued_event *qev= get_qev_common(ev, event_size);
+ if (!qev)
+ return NULL;
+ qev->rgi= orig_qev->rgi;
+ strcpy(qev->event_relay_log_name, relay_log_name);
+ qev->event_relay_log_pos= event_pos;
+ qev->future_event_relay_log_pos= event_pos+event_size;
+ strcpy(qev->future_event_master_log_name,
+ orig_qev->future_event_master_log_name);
+ return qev;
+}
+
+
void
rpl_parallel_thread::free_qev(rpl_parallel_thread::queued_event *qev)
{
@@ -836,7 +1171,7 @@ rpl_parallel_thread::free_qev(rpl_parallel_thread::queued_event *qev)
rpl_group_info*
rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
- rpl_parallel_entry *e)
+ rpl_parallel_entry *e, ulonglong event_size)
{
rpl_group_info *rgi;
mysql_mutex_assert_owner(&LOCK_rpl_thread);
@@ -864,6 +1199,10 @@ rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
return NULL;
}
rgi->parallel_entry= e;
+ rgi->relay_log= rli->last_inuse_relaylog;
+ rgi->retry_start_offset= rli->future_event_relay_log_pos-event_size;
+ rgi->retry_event_count= 0;
+ rgi->killed_for_retry= false;
return rgi;
}
@@ -1018,10 +1357,11 @@ rpl_parallel_thread_pool::release_thread(rpl_parallel_thread *rpt)
if it is still available. Otherwise a new worker thread is allocated.
*/
rpl_parallel_thread *
-rpl_parallel_entry::choose_thread(Relay_log_info *rli, bool *did_enter_cond,
+rpl_parallel_entry::choose_thread(rpl_group_info *rgi, bool *did_enter_cond,
PSI_stage_info *old_stage, bool reuse)
{
uint32 idx;
+ Relay_log_info *rli= rgi->rli;
rpl_parallel_thread *thr;
idx= rpl_thread_idx;
@@ -1066,7 +1406,7 @@ rpl_parallel_entry::choose_thread(Relay_log_info *rli, bool *did_enter_cond,
debug_sync_set_action(rli->sql_driver_thd,
STRING_WITH_LEN("now SIGNAL wait_queue_killed"));
};);
- slave_output_error_info(rli, rli->sql_driver_thd);
+ slave_output_error_info(rgi, rli->sql_driver_thd);
return NULL;
}
else
@@ -1300,6 +1640,91 @@ rpl_parallel::workers_idle()
}
+int
+rpl_parallel_entry::queue_master_restart(rpl_group_info *rgi,
+ Format_description_log_event *fdev)
+{
+ uint32 idx;
+ rpl_parallel_thread *thr;
+ rpl_parallel_thread::queued_event *qev;
+ Relay_log_info *rli= rgi->rli;
+
+ /*
+ We only need to queue the server restart if we still have a thread working
+ on a (potentially partial) event group.
+
+ If the last thread we queued for has finished, then it cannot have any
+ partial event group that needs aborting.
+
+ Thus there is no need for the full complexity of choose_thread(). We only
+ need to check if we have a current worker thread, and queue for it if so.
+ */
+ idx= rpl_thread_idx;
+ thr= rpl_threads[idx];
+ if (!thr)
+ return 0;
+ mysql_mutex_lock(&thr->LOCK_rpl_thread);
+ if (thr->current_owner != &rpl_threads[idx])
+ {
+ /* No active worker thread, so no need to queue the master restart. */
+ mysql_mutex_unlock(&thr->LOCK_rpl_thread);
+ return 0;
+ }
+
+ if (!(qev= thr->get_qev(fdev, 0, rli)))
+ {
+ mysql_mutex_unlock(&thr->LOCK_rpl_thread);
+ return 1;
+ }
+
+ qev->rgi= rgi;
+ qev->typ= rpl_parallel_thread::queued_event::QUEUED_MASTER_RESTART;
+ qev->entry_for_queued= this;
+ qev->ir= rli->last_inuse_relaylog;
+ ++qev->ir->queued_count;
+ thr->enqueue(qev);
+ mysql_mutex_unlock(&thr->LOCK_rpl_thread);
+ return 0;
+}
+
+
+int
+rpl_parallel::wait_for_workers_idle(THD *thd)
+{
+ uint32 i, max_i;
+
+ /*
+ The domain_hash is only accessed by the SQL driver thread, so it is safe
+ to iterate over without a lock.
+ */
+ max_i= domain_hash.records;
+ for (i= 0; i < max_i; ++i)
+ {
+ bool active;
+ wait_for_commit my_orderer;
+ struct rpl_parallel_entry *e;
+
+ e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
+ mysql_mutex_lock(&e->LOCK_parallel_entry);
+ if ((active= (e->current_sub_id > e->last_committed_sub_id)))
+ {
+ wait_for_commit *waitee= &e->current_group_info->commit_orderer;
+ my_orderer.register_wait_for_prior_commit(waitee);
+ thd->wait_for_commit_ptr= &my_orderer;
+ }
+ mysql_mutex_unlock(&e->LOCK_parallel_entry);
+ if (active)
+ {
+ int err= my_orderer.wait_for_prior_commit(thd);
+ thd->wait_for_commit_ptr= NULL;
+ if (err)
+ return err;
+ }
+ }
+ return 0;
+}
+
+
/*
This is used when we get an error during processing in do_event();
We will not queue any event to the thread, but we still need to wake it up
@@ -1367,6 +1792,33 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
/* ToDo: what to do with this lock?!? */
mysql_mutex_unlock(&rli->data_lock);
+ if (typ == FORMAT_DESCRIPTION_EVENT)
+ {
+ Format_description_log_event *fdev=
+ static_cast<Format_description_log_event *>(ev);
+ if (fdev->created)
+ {
+ /*
+ This format description event marks a new binlog after a master server
+ restart. We are going to close all temporary tables to clean up any
+ possible left-overs after a prior master crash.
+
+ Thus we need to wait for all prior events to execute to completion,
+ in case they need access to any of the temporary tables.
+
+ We also need to notify the worker thread running the prior incomplete
+ event group (if any), as such event group signifies an incompletely
+ written group cut short by a master crash, and must be rolled back.
+ */
+ if (current->queue_master_restart(serial_rgi, fdev) ||
+ wait_for_workers_idle(rli->sql_driver_thd))
+ {
+ delete ev;
+ return 1;
+ }
+ }
+ }
+
/*
Stop queueing additional event groups once the SQL thread is requested to
stop.
@@ -1390,15 +1842,9 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
if (typ == GTID_EVENT)
{
- uint32 domain_id;
- if (likely(typ == GTID_EVENT))
- {
- Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
- domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ?
- 0 : gtid_ev->domain_id);
- }
- else
- domain_id= 0;
+ Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
+ uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ?
+ 0 : gtid_ev->domain_id);
if (!(e= find(domain_id)))
{
my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME));
@@ -1417,7 +1863,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
instead re-use a thread that we queued for previously.
*/
cur_thread=
- e->choose_thread(rli, &did_enter_cond, &old_stage, typ != GTID_EVENT);
+ e->choose_thread(serial_rgi, &did_enter_cond, &old_stage,
+ typ != GTID_EVENT);
if (!cur_thread)
{
/* This means we were killed. The error is already signalled. */
@@ -1437,7 +1884,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
{
Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
- if (!(rgi= cur_thread->get_rgi(rli, gtid_ev, e)))
+ if (!(rgi= cur_thread->get_rgi(rli, gtid_ev, e, event_size)))
{
cur_thread->free_qev(qev);
abandon_worker_thread(rli->sql_driver_thd, cur_thread,
@@ -1527,7 +1974,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
return 1;
}
/*
- Queue an empty event, so that the position will be updated in a
+ Queue a position update, so that the position will be updated in a
reasonable way relative to other events:
- If the currently executing events are queued serially for a single
@@ -1538,7 +1985,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
least the position will not be updated until one of them has reached
the current point.
*/
- qev->ev= NULL;
+ qev->typ= rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE;
+ qev->entry_for_queued= e;
}
else
{
@@ -1549,6 +1997,8 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
Queue the event for processing.
*/
rli->event_relay_log_pos= rli->future_event_relay_log_pos;
+ qev->ir= rli->last_inuse_relaylog;
+ ++qev->ir->queued_count;
cur_thread->enqueue(qev);
unlock_or_exit_cond(rli->sql_driver_thd, &cur_thread->LOCK_rpl_thread,
&did_enter_cond, &old_stage);