summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorunknown <knielsen@knielsen-hq.org>2014-06-03 10:31:11 +0200
committerKristian Nielsen <knielsen@knielsen-hq.org>2014-06-03 10:31:11 +0200
commit629b822913348cec56ec7a80a236f0ba2e613585 (patch)
tree8c713a0ad975deb8b6764af03b1cca8e8cd195db /sql
parent787c470cef54574e744eb5dfd9153d837fe67e45 (diff)
downloadmariadb-git-629b822913348cec56ec7a80a236f0ba2e613585.tar.gz
MDEV-5262, MDEV-5914, MDEV-5941, MDEV-6020: Deadlocks during parallel
replication causing replication to fail. In parallel replication, we run transactions from the master in parallel, but force them to commit in the same order they did on the master. If we force T1 to commit before T2, but T2 holds eg. a row lock that is needed by T1, we get a deadlock when T2 waits until T1 has committed. Usually, we do not run T1 and T2 in parallel if there is a chance that they can have conflicting locks like this, but there are certain edge cases where it can occasionally happen (eg. MDEV-5914, MDEV-5941, MDEV-6020). The bug was that this would cause replication to hang, eventually getting a lock timeout and causing the slave to stop with error. With this patch, InnoDB will report back to the upper layer whenever a transactions T1 is about to do a lock wait on T2. If T1 and T2 are parallel replication transactions, and T2 needs to commit later than T1, we can thus detect the deadlock; we then kill T2, setting a flag that causes it to catch the kill and convert it to a deadlock error; this error will then cause T2 to roll back and release its locks (so that T1 can commit), and later T2 will be re-tried and eventually also committed. The kill happens asynchroneously in a slave background thread; this is necessary, as the reporting from InnoDB about lock waits happen deep inside the locking code, at a point where it is not possible to directly call THD::awake() due to mutexes held. Deadlock is assumed to be (very) rarely occuring, so this patch tries to minimise the performance impact on the normal case where no deadlocks occur, rather than optimise the handling of the occasional deadlock. Also fix transaction retry due to deadlock when it happens after a transaction already signalled to later transactions that it started to commit. In this case we need to undo this signalling (and later redo it when we commit again during retry), so following transactions will not start too early. Also add a missing thd->send_kill_message() that got triggered during testing (this corrects an incorrect fix for MySQL Bug#58933).
Diffstat (limited to 'sql')
-rw-r--r--sql/log.cc14
-rw-r--r--sql/log_event.cc21
-rw-r--r--sql/mysqld.cc25
-rw-r--r--sql/mysqld.h10
-rw-r--r--sql/rpl_parallel.cc135
-rw-r--r--sql/rpl_rli.cc31
-rw-r--r--sql/rpl_rli.h2
-rw-r--r--sql/slave.cc106
-rw-r--r--sql/slave.h1
-rw-r--r--sql/sql_admin.cc2
-rw-r--r--sql/sql_base.cc3
-rw-r--r--sql/sql_class.cc64
-rw-r--r--sql/sql_class.h3
13 files changed, 370 insertions, 47 deletions
diff --git a/sql/log.cc b/sql/log.cc
index 116ac6aed52..66be81562a5 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -6847,12 +6847,6 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
else
mysql_mutex_unlock(&wfc->LOCK_wait_commit);
}
- if (wfc && wfc->wakeup_error)
- {
- my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
- DBUG_RETURN(-1);
- }
-
/*
If the transaction we were waiting for has already put us into the group
commit queue (and possibly already done the entire binlog commit for us),
@@ -6861,6 +6855,12 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
if (orig_entry->queued_by_other)
DBUG_RETURN(0);
+ if (wfc && wfc->wakeup_error)
+ {
+ my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
+ DBUG_RETURN(-1);
+ }
+
/* Now enqueue ourselves in the group commit queue. */
DEBUG_SYNC(orig_entry->thd, "commit_before_enqueue");
orig_entry->thd->clear_wakeup_ready();
@@ -9064,6 +9064,8 @@ binlog_background_thread(void *arg __attribute__((unused)))
thd->thread_id= thread_id++;
mysql_mutex_unlock(&LOCK_thread_count);
thd->store_globals();
+ thd->security_ctx->skip_grants();
+ thd->set_command(COM_DAEMON);
/*
Load the slave replication GTID state from the mysql.gtid_slave_pos
diff --git a/sql/log_event.cc b/sql/log_event.cc
index e9cd0ee3179..cf9f4242280 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -216,8 +216,19 @@ static void inline slave_rows_error_report(enum loglevel level, int ha_error,
thd->get_stmt_da()->sql_conditions();
Relay_log_info const *rli= rgi->rli;
const Sql_condition *err;
+ Relay_log_info const *rli= rgi->rli;
buff[0]= 0;
+ /*
+ In parallel replication, deadlocks or other temporary errors can happen
+ occasionally in normal operation, they will be handled correctly and
+ automatically by re-trying the transactions. So do not pollute the error
+ log with messages about them.
+ */
+ if (rgi->is_parallel_exec &&
+ (rgi->killed_for_retry || has_temporary_error(thd)))
+ return;
+
for (err= it++, slider= buff; err && slider < buff_end - 1;
slider += len, err= it++)
{
@@ -7306,6 +7317,13 @@ int Xid_log_event::do_apply_event(rpl_group_info *rgi)
err= rpl_global_gtid_slave_state.record_gtid(thd, &gtid, sub_id, true, false);
if (err)
{
+ /*
+ Do not report an error if this is really a kill due to a deadlock.
+ In this case, the transaction will be re-tried instead.
+ */
+ if (rgi->killed_for_retry &&
+ thd->get_stmt_da()->sql_errno() == ER_QUERY_INTERRUPTED)
+ return err;
rli->report(ERROR_LEVEL, ER_CANNOT_UPDATE_GTID_STATE, rgi->gtid_info(),
"Error during XID COMMIT: failed to update GTID state in "
"%s.%s: %d: %s",
@@ -9631,7 +9649,8 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
if (open_and_lock_tables(thd, rgi->tables_to_lock, FALSE, 0))
{
uint actual_error= thd->get_stmt_da()->sql_errno();
- if (thd->is_slave_error || thd->is_fatal_error)
+ if ((thd->is_slave_error || thd->is_fatal_error) &&
+ !(rgi->killed_for_retry && actual_error == ER_QUERY_INTERRUPTED))
{
/*
Error reporting borrowed from Query_log_event with many excessive
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index 63f392f438d..d3d262c736b 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -368,6 +368,7 @@ static I_List<THD> thread_cache;
static bool binlog_format_used= false;
LEX_STRING opt_init_connect, opt_init_slave;
static mysql_cond_t COND_thread_cache, COND_flush_thread_cache;
+mysql_cond_t COND_slave_background;
static DYNAMIC_ARRAY all_options;
/* Global variables */
@@ -706,7 +707,7 @@ mysql_mutex_t
LOCK_crypt,
LOCK_global_system_variables,
LOCK_user_conn, LOCK_slave_list, LOCK_active_mi,
- LOCK_connection_count, LOCK_error_messages;
+ LOCK_connection_count, LOCK_error_messages, LOCK_slave_background;
mysql_mutex_t LOCK_stats, LOCK_global_user_client_stats,
LOCK_global_table_stats, LOCK_global_index_stats;
@@ -880,7 +881,8 @@ PSI_mutex_key key_LOCK_stats,
key_LOCK_wakeup_ready, key_LOCK_wait_commit;
PSI_mutex_key key_LOCK_gtid_waiting;
-PSI_mutex_key key_LOCK_prepare_ordered, key_LOCK_commit_ordered;
+PSI_mutex_key key_LOCK_prepare_ordered, key_LOCK_commit_ordered,
+ key_LOCK_slave_background;
PSI_mutex_key key_TABLE_SHARE_LOCK_share;
static PSI_mutex_info all_server_mutexes[]=
@@ -943,6 +945,7 @@ static PSI_mutex_info all_server_mutexes[]=
{ &key_LOCK_error_messages, "LOCK_error_messages", PSI_FLAG_GLOBAL},
{ &key_LOCK_prepare_ordered, "LOCK_prepare_ordered", PSI_FLAG_GLOBAL},
{ &key_LOCK_commit_ordered, "LOCK_commit_ordered", PSI_FLAG_GLOBAL},
+ { &key_LOCK_slave_background, "LOCK_slave_background", PSI_FLAG_GLOBAL},
{ &key_LOG_INFO_lock, "LOG_INFO::lock", 0},
{ &key_LOCK_thread_count, "LOCK_thread_count", PSI_FLAG_GLOBAL},
{ &key_LOCK_thread_cache, "LOCK_thread_cache", PSI_FLAG_GLOBAL},
@@ -997,7 +1000,7 @@ PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
PSI_cond_key key_COND_rpl_thread_queue, key_COND_rpl_thread,
key_COND_rpl_thread_pool,
key_COND_parallel_entry, key_COND_group_commit_orderer,
- key_COND_prepare_ordered;
+ key_COND_prepare_ordered, key_COND_slave_background;
PSI_cond_key key_COND_wait_gtid, key_COND_gtid_ignore_duplicates;
static PSI_cond_info all_server_conds[]=
@@ -1046,6 +1049,7 @@ static PSI_cond_info all_server_conds[]=
{ &key_COND_parallel_entry, "COND_parallel_entry", 0},
{ &key_COND_group_commit_orderer, "COND_group_commit_orderer", 0},
{ &key_COND_prepare_ordered, "COND_prepare_ordered", 0},
+ { &key_COND_slave_background, "COND_slave_background", 0},
{ &key_COND_wait_gtid, "COND_wait_gtid", 0},
{ &key_COND_gtid_ignore_duplicates, "COND_gtid_ignore_duplicates", 0}
};
@@ -1053,7 +1057,7 @@ static PSI_cond_info all_server_conds[]=
PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert,
key_thread_handle_manager, key_thread_main,
key_thread_one_connection, key_thread_signal_hand,
- key_thread_slave_init, key_rpl_parallel_thread;
+ key_thread_slave_background, key_rpl_parallel_thread;
static PSI_thread_info all_server_threads[]=
{
@@ -1079,7 +1083,7 @@ static PSI_thread_info all_server_threads[]=
{ &key_thread_main, "main", PSI_FLAG_GLOBAL},
{ &key_thread_one_connection, "one_connection", 0},
{ &key_thread_signal_hand, "signal_handler", PSI_FLAG_GLOBAL},
- { &key_thread_slave_init, "slave_init", PSI_FLAG_GLOBAL},
+ { &key_thread_slave_background, "slave_background", PSI_FLAG_GLOBAL},
{ &key_rpl_parallel_thread, "rpl_parallel_thread", 0}
};
@@ -2173,6 +2177,8 @@ static void clean_up_mutexes()
mysql_mutex_destroy(&LOCK_prepare_ordered);
mysql_cond_destroy(&COND_prepare_ordered);
mysql_mutex_destroy(&LOCK_commit_ordered);
+ mysql_mutex_destroy(&LOCK_slave_background);
+ mysql_cond_destroy(&COND_slave_background);
DBUG_VOID_RETURN;
}
@@ -4387,6 +4393,9 @@ static int init_thread_environment()
mysql_cond_init(key_COND_prepare_ordered, &COND_prepare_ordered, NULL);
mysql_mutex_init(key_LOCK_commit_ordered, &LOCK_commit_ordered,
MY_MUTEX_INIT_SLOW);
+ mysql_mutex_init(key_LOCK_slave_background, &LOCK_slave_background,
+ MY_MUTEX_INIT_SLOW);
+ mysql_cond_init(key_COND_slave_background, &COND_slave_background, NULL);
#ifdef HAVE_OPENSSL
mysql_mutex_init(key_LOCK_des_key_file,
@@ -9468,6 +9477,8 @@ PSI_stage_info stage_waiting_for_room_in_worker_thread= { 0, "Waiting for room i
PSI_stage_info stage_master_gtid_wait_primary= { 0, "Waiting in MASTER_GTID_WAIT() (primary waiter)", 0};
PSI_stage_info stage_master_gtid_wait= { 0, "Waiting in MASTER_GTID_WAIT()", 0};
PSI_stage_info stage_gtid_wait_other_connection= { 0, "Waiting for other master connection to process GTID received on multiple master connections", 0};
+PSI_stage_info stage_slave_background_process_request= { 0, "Processing requests", 0};
+PSI_stage_info stage_slave_background_wait_request= { 0, "Waiting for requests", 0};
#ifdef HAVE_PSI_INTERFACE
@@ -9591,7 +9602,9 @@ PSI_stage_info *all_server_stages[]=
& stage_waiting_to_get_readlock,
& stage_master_gtid_wait_primary,
& stage_master_gtid_wait,
- & stage_gtid_wait_other_connection
+ & stage_gtid_wait_other_connection,
+ & stage_slave_background_process_request,
+ & stage_slave_background_wait_request
};
PSI_socket_key key_socket_tcpip, key_socket_unix, key_socket_client_connection;
diff --git a/sql/mysqld.h b/sql/mysqld.h
index e7eea3dfa1a..5b28fefb082 100644
--- a/sql/mysqld.h
+++ b/sql/mysqld.h
@@ -309,8 +309,8 @@ extern PSI_cond_key key_COND_wait_gtid, key_COND_gtid_ignore_duplicates;
extern PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert,
key_thread_handle_manager, key_thread_kill_server, key_thread_main,
- key_thread_one_connection, key_thread_signal_hand, key_thread_slave_init,
- key_rpl_parallel_thread;
+ key_thread_one_connection, key_thread_signal_hand,
+ key_thread_slave_background, key_rpl_parallel_thread;
extern PSI_file_key key_file_binlog, key_file_binlog_index, key_file_casetest,
key_file_dbopt, key_file_des_key_file, key_file_ERRMSG, key_select_to_file,
@@ -451,6 +451,8 @@ extern PSI_stage_info stage_waiting_for_room_in_worker_thread;
extern PSI_stage_info stage_master_gtid_wait_primary;
extern PSI_stage_info stage_master_gtid_wait;
extern PSI_stage_info stage_gtid_wait_other_connection;
+extern PSI_stage_info stage_slave_background_process_request;
+extern PSI_stage_info stage_slave_background_wait_request;
#ifdef HAVE_PSI_STATEMENT_INTERFACE
/**
@@ -518,7 +520,8 @@ extern mysql_mutex_t
LOCK_delayed_status, LOCK_delayed_create, LOCK_crypt, LOCK_timezone,
LOCK_slave_list, LOCK_active_mi, LOCK_manager,
LOCK_global_system_variables, LOCK_user_conn,
- LOCK_prepared_stmt_count, LOCK_error_messages, LOCK_connection_count;
+ LOCK_prepared_stmt_count, LOCK_error_messages, LOCK_connection_count,
+ LOCK_slave_background;
extern MYSQL_PLUGIN_IMPORT mysql_mutex_t LOCK_thread_count;
#ifdef HAVE_OPENSSL
extern mysql_mutex_t LOCK_des_key_file;
@@ -529,6 +532,7 @@ extern mysql_rwlock_t LOCK_grant, LOCK_sys_init_connect, LOCK_sys_init_slave;
extern mysql_rwlock_t LOCK_system_variables_hash;
extern mysql_cond_t COND_thread_count;
extern mysql_cond_t COND_manager;
+extern mysql_cond_t COND_slave_background;
extern int32 thread_running;
extern int32 thread_count;
extern my_atomic_rwlock_t thread_running_lock, thread_count_lock;
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 67d61b7cf11..65461b3f990 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -156,6 +156,7 @@ finish_event_group(THD *thd, uint64 sub_id, rpl_parallel_entry *entry,
mysql_mutex_unlock(&entry->LOCK_parallel_entry);
thd->clear_error();
+ thd->reset_killed();
thd->get_stmt_da()->reset_diagnostics_area();
wfc->wakeup_subsequent_commits(rgi->worker_error);
}
@@ -188,6 +189,25 @@ unlock_or_exit_cond(THD *thd, mysql_mutex_t *lock, bool *did_enter_cond,
}
+static void
+register_wait_for_prior_event_group_commit(rpl_group_info *rgi,
+ rpl_parallel_entry *entry)
+{
+ mysql_mutex_assert_owner(&entry->LOCK_parallel_entry);
+ if (rgi->wait_commit_sub_id > entry->last_committed_sub_id)
+ {
+ /*
+ Register that the commit of this event group must wait for the
+ commit of the previous event group to complete before it may
+ complete itself, so that we preserve commit order.
+ */
+ wait_for_commit *waitee=
+ &rgi->wait_commit_group_info->commit_orderer;
+ rgi->commit_orderer.register_wait_for_prior_commit(waitee);
+ }
+}
+
+
#ifndef DBUG_OFF
static int
dbug_simulate_tmp_error(rpl_group_info *rgi, THD *thd)
@@ -205,6 +225,40 @@ dbug_simulate_tmp_error(rpl_group_info *rgi, THD *thd)
#endif
+/*
+ If we detect a deadlock due to eg. storage engine locks that conflict with
+ the fixed commit order, then the later transaction will be killed
+ asynchroneously to allow the former to complete its commit.
+
+ In this case, we convert the 'killed' error into a deadlock error, and retry
+ the later transaction. */
+static void
+convert_kill_to_deadlock_error(rpl_group_info *rgi)
+{
+ THD *thd= rgi->thd;
+
+ if (thd->get_stmt_da()->sql_errno() == ER_QUERY_INTERRUPTED &&
+ rgi->killed_for_retry)
+ {
+ thd->clear_error();
+ thd->get_stmt_da()->reset_diagnostics_area();
+ my_error(ER_LOCK_DEADLOCK, MYF(0));
+ rgi->killed_for_retry= false;
+ thd->reset_killed();
+ }
+}
+
+
+static bool
+is_group_ending(Log_event *ev, Log_event_type event_type)
+{
+ return event_type == XID_EVENT ||
+ (event_type == QUERY_EVENT &&
+ (((Query_log_event *)ev)->is_commit() ||
+ ((Query_log_event *)ev)->is_rollback()));
+}
+
+
static int
retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt,
rpl_parallel_thread::queued_event *orig_qev)
@@ -221,11 +275,46 @@ retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt,
ulonglong cur_offset, old_offset;
char log_name[FN_REFLEN];
THD *thd= rgi->thd;
+ rpl_parallel_entry *entry= rgi->parallel_entry;
ulong retries= 0;
do_retry:
event_count= 0;
err= 0;
+
+ /*
+ If we already started committing before getting the deadlock (or other
+ error) that caused us to need to retry, we have already signalled
+ subsequent transactions that we have started committing. This is
+ potentially a problem, as now we will rollback, and if subsequent
+ transactions would start to execute now, they could see an unexpected
+ state of the database and get eg. key not found or duplicate key error.
+
+ However, to get a deadlock in the first place, there must have been
+ another earlier transaction that is waiting for us. Thus that other
+ transaction has _not_ yet started to commit, and any subsequent
+ transactions will still be waiting at this point.
+
+ So here, we decrement back the count of transactions that started
+ committing (if we already incremented it), undoing the effect of an
+ earlier mark_start_commit(). Then later, when the retry succeeds and we
+ commit again, we can do a new mark_start_commit() and eventually wake up
+ subsequent transactions at the proper time.
+
+ We need to do the unmark before the rollback, to be sure that the
+ transaction we deadlocked with will not signal that it started to commit
+ until after the unmark.
+ */
+ rgi->unmark_start_commit();
+
+ /*
+ We might get the deadlock error that causes the retry during commit, while
+ sitting in wait_for_prior_commit(). If this happens, we will have a
+ pending error in the wait_for_commit object. So clear this by
+ unregistering (and later re-registering) the wait.
+ */
+ if(thd->wait_for_commit_ptr)
+ thd->wait_for_commit_ptr->unregister_wait_for_prior_commit();
rgi->cleanup_context(thd, 1);
mysql_mutex_lock(&rli->data_lock);
@@ -233,6 +322,10 @@ do_retry:
statistic_increment(slave_retried_transactions, LOCK_status);
mysql_mutex_unlock(&rli->data_lock);
+ mysql_mutex_lock(&entry->LOCK_parallel_entry);
+ register_wait_for_prior_event_group_commit(rgi, entry);
+ mysql_mutex_unlock(&entry->LOCK_parallel_entry);
+
strcpy(log_name, ir->name);
if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0)
{
@@ -319,6 +412,9 @@ do_retry:
err= 1;
goto err;
}
+ if (is_group_ending(ev, event_type))
+ rgi->mark_start_commit();
+
err= rpt_handle_event(qev, rpt);
++event_count;
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
@@ -332,6 +428,7 @@ do_retry:
err= dbug_simulate_tmp_error(rgi, thd););
if (err)
{
+ convert_kill_to_deadlock_error(rgi);
if (has_temporary_error(thd))
{
++retries;
@@ -599,17 +696,9 @@ handle_rpl_parallel_thread(void *arg)
if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id))
skip_event_group= true;
- else if (rgi->wait_commit_sub_id > entry->last_committed_sub_id)
- {
- /*
- Register that the commit of this event group must wait for the
- commit of the previous event group to complete before it may
- complete itself, so that we preserve commit order.
- */
- wait_for_commit *waitee=
- &rgi->wait_commit_group_info->commit_orderer;
- rgi->commit_orderer.register_wait_for_prior_commit(waitee);
- }
+ else
+ register_wait_for_prior_event_group_commit(rgi, entry);
+
unlock_or_exit_cond(thd, &entry->LOCK_parallel_entry,
&did_enter_cond, &old_stage);
@@ -651,10 +740,7 @@ handle_rpl_parallel_thread(void *arg)
}
}
- group_ending= event_type == XID_EVENT ||
- (event_type == QUERY_EVENT &&
- (((Query_log_event *)events->ev)->is_commit() ||
- ((Query_log_event *)events->ev)->is_rollback()));
+ group_ending= is_group_ending(events->ev, event_type);
if (group_ending && likely(!rgi->worker_error))
{
DEBUG_SYNC(thd, "rpl_parallel_before_mark_start_commit");
@@ -674,8 +760,12 @@ handle_rpl_parallel_thread(void *arg)
delete_or_keep_event_post_apply(rgi, event_type, events->ev);
DBUG_EXECUTE_IF("rpl_parallel_simulate_temp_err_gtid_0_x_100",
err= dbug_simulate_tmp_error(rgi, thd););
- if (err && has_temporary_error(thd))
- err= retry_event_group(rgi, rpt, events);
+ if (err)
+ {
+ convert_kill_to_deadlock_error(rgi);
+ if (has_temporary_error(thd) && slave_trans_retries > 0)
+ err= retry_event_group(rgi, rpt, events);
+ }
}
else
{
@@ -691,10 +781,14 @@ handle_rpl_parallel_thread(void *arg)
events->next= qevs_to_free;
qevs_to_free= events;
- if (unlikely(err) && !rgi->worker_error)
+ if (unlikely(err))
{
- slave_output_error_info(rgi, thd);
- signal_error_to_sql_driver_thread(thd, rgi, err);
+ if (!rgi->worker_error)
+ {
+ slave_output_error_info(rgi, thd);
+ signal_error_to_sql_driver_thread(thd, rgi, err);
+ }
+ thd->reset_killed();
}
if (end_of_group)
{
@@ -1096,6 +1190,7 @@ rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
rgi->relay_log= rli->last_inuse_relaylog;
rgi->retry_start_offset= rli->future_event_relay_log_pos-event_size;
rgi->retry_event_count= 0;
+ rgi->killed_for_retry= false;
return rgi;
}
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index 688068b850f..9c315271387 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -1843,6 +1843,7 @@ rpl_group_info::mark_start_commit()
}
+<<<<<<< TREE
/*
Format the current GTID as a string suitable for printing in error messages.
@@ -1863,6 +1864,36 @@ rpl_group_info::gtid_info()
}
+=======
+/*
+ Undo the effect of a prior mark_start_commit().
+
+ This is only used for retrying a transaction in parallel replication, after
+ we have encountered a deadlock or other temporary error.
+
+ When we get such a deadlock, it means that the current group of transactions
+ did not yet all start committing (else they would not have deadlocked). So
+ we will not yet have woken up anything in the next group, our rgi->gco is
+ still live, and we can simply decrement the counter (to be incremented again
+ later, when the retry succeeds and reaches the commit step).
+*/
+void
+rpl_group_info::unmark_start_commit()
+{
+ rpl_parallel_entry *e;
+
+ if (!did_mark_start_commit)
+ return;
+
+ e= this->parallel_entry;
+ mysql_mutex_lock(&e->LOCK_parallel_entry);
+ --e->count_committing_event_groups;
+ mysql_mutex_unlock(&e->LOCK_parallel_entry);
+ did_mark_start_commit= false;
+}
+
+
+>>>>>>> MERGE-SOURCE
rpl_sql_thread_info::rpl_sql_thread_info(Rpl_filter *filter)
: rpl_filter(filter)
{
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index 932db0a0b7d..b44e794a795 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -646,6 +646,7 @@ struct rpl_group_info
inuse_relaylog *relay_log;
uint64 retry_start_offset;
uint64 retry_event_count;
+ bool killed_for_retry;
rpl_group_info(Relay_log_info *rli_);
~rpl_group_info();
@@ -735,6 +736,7 @@ struct rpl_group_info
void mark_start_commit_no_lock();
void mark_start_commit();
char *gtid_info();
+ void unmark_start_commit();
time_t get_row_stmt_start_timestamp()
{
diff --git a/sql/slave.cc b/sql/slave.cc
index 59375297448..3d84dfe36ef 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -287,13 +287,22 @@ static void init_slave_psi_keys(void)
#endif /* HAVE_PSI_INTERFACE */
-static bool slave_init_thread_running;
+static bool slave_background_thread_running;
+static bool slave_background_thread_gtid_loaded;
+
+struct slave_background_kill_t {
+ slave_background_kill_t *next;
+ THD *to_kill;
+ int errcode;
+} *slave_background_kill_list;
pthread_handler_t
-handle_slave_init(void *arg __attribute__((unused)))
+handle_slave_background(void *arg __attribute__((unused)))
{
THD *thd;
+ PSI_stage_info old_stage;
+ bool stop;
my_thread_init();
thd= new THD;
@@ -301,7 +310,10 @@ handle_slave_init(void *arg __attribute__((unused)))
mysql_mutex_lock(&LOCK_thread_count);
thd->thread_id= thread_id++;
mysql_mutex_unlock(&LOCK_thread_count);
+ thd->system_thread = SYSTEM_THREAD_SLAVE_BACKGROUND;
thd->store_globals();
+ thd->security_ctx->skip_grants();
+ thd->set_command(COM_DAEMON);
thd_proc_info(thd, "Loading slave GTID position from table");
if (rpl_load_gtid_slave_state(thd))
@@ -312,12 +324,52 @@ handle_slave_init(void *arg __attribute__((unused)))
thd->get_stmt_da()->message());
mysql_mutex_lock(&LOCK_thread_count);
+ threads.append(thd);
+ slave_background_thread_gtid_loaded= true;
+ mysql_cond_broadcast(&COND_thread_count);
+ mysql_mutex_unlock(&LOCK_thread_count);
+
+ THD_STAGE_INFO(thd, stage_slave_background_process_request);
+ do
+ {
+ slave_background_kill_t *kill_list;
+
+ mysql_mutex_lock(&LOCK_slave_background);
+ thd->ENTER_COND(&COND_slave_background, &LOCK_slave_background,
+ &stage_slave_background_wait_request,
+ &old_stage);
+ for (;;)
+ {
+ stop= abort_loop || thd->killed;
+ kill_list= slave_background_kill_list;
+ if (stop || kill_list)
+ break;
+ mysql_cond_wait(&COND_slave_background, &LOCK_slave_background);
+ }
+
+ slave_background_kill_list= NULL;
+ thd->EXIT_COND(&old_stage);
+
+ while (kill_list)
+ {
+ slave_background_kill_t *p = kill_list;
+ kill_list= p->next;
+
+ mysql_mutex_lock(&p->to_kill->LOCK_thd_data);
+ /* ToDo: mark the p->errcode error code somehow ... ? */
+ p->to_kill->awake(KILL_QUERY);
+ mysql_mutex_unlock(&p->to_kill->LOCK_thd_data);
+ my_free(p);
+ }
+ } while (!stop);
+
+ mysql_mutex_lock(&LOCK_thread_count);
delete thd;
mysql_mutex_unlock(&LOCK_thread_count);
my_thread_end();
mysql_mutex_lock(&LOCK_thread_count);
- slave_init_thread_running= false;
+ slave_background_thread_running= false;
mysql_cond_broadcast(&COND_thread_count);
mysql_mutex_unlock(&LOCK_thread_count);
@@ -325,21 +377,57 @@ handle_slave_init(void *arg __attribute__((unused)))
}
+void
+slave_background_kill_request(THD *to_kill, int errcode)
+{
+ slave_background_kill_t *p=
+ (slave_background_kill_t *)my_malloc(sizeof(*p), MYF(MY_WME));
+ if (p)
+ {
+ p->to_kill= to_kill;
+ p->errcode= errcode;
+ to_kill->rgi_slave->killed_for_retry= true;
+ mysql_mutex_lock(&LOCK_slave_background);
+ p->next= slave_background_kill_list;
+ slave_background_kill_list= p;
+ mysql_mutex_unlock(&LOCK_slave_background);
+ mysql_cond_signal(&COND_slave_background);
+ }
+}
+
+
+/*
+ Start the slave background thread.
+
+ This thread is currently used for two purposes:
+
+ 1. To load the GTID state from mysql.gtid_slave_pos at server start; reading
+ from table requires valid THD, which is otherwise not available during
+ server init.
+
+ 2. To kill worker thread transactions during parallel replication, when a
+ storage engine attempts to take an errorneous conflicting lock that would
+ cause a deadlock. Killing is done asynchroneously, as the kill may not
+ be safe within the context of a callback from inside storage engine
+ locking code.
+*/
static int
-run_slave_init_thread()
+start_slave_background_thread()
{
pthread_t th;
- slave_init_thread_running= true;
- if (mysql_thread_create(key_thread_slave_init, &th, &connection_attrib,
- handle_slave_init, NULL))
+ slave_background_thread_running= true;
+ slave_background_thread_gtid_loaded= false;
+ if (mysql_thread_create(key_thread_slave_background,
+ &th, &connection_attrib, handle_slave_background,
+ NULL))
{
sql_print_error("Failed to create thread while initialising slave");
return 1;
}
mysql_mutex_lock(&LOCK_thread_count);
- while (slave_init_thread_running)
+ while (!slave_background_thread_gtid_loaded)
mysql_cond_wait(&COND_thread_count, &LOCK_thread_count);
mysql_mutex_unlock(&LOCK_thread_count);
@@ -358,7 +446,7 @@ int init_slave()
init_slave_psi_keys();
#endif
- if (run_slave_init_thread())
+ if (start_slave_background_thread())
return 1;
/*
diff --git a/sql/slave.h b/sql/slave.h
index 4b5bc1686fb..467e6fcc949 100644
--- a/sql/slave.h
+++ b/sql/slave.h
@@ -238,6 +238,7 @@ pthread_handler_t handle_slave_io(void *arg);
void slave_output_error_info(rpl_group_info *rgi, THD *thd);
pthread_handler_t handle_slave_sql(void *arg);
bool net_request_file(NET* net, const char* fname);
+void slave_background_kill_request(THD *to_kill, int errcode);
extern bool volatile abort_loop;
extern Master_info main_mi, *active_mi; /* active_mi for multi-master */
diff --git a/sql/sql_admin.cc b/sql/sql_admin.cc
index 34a076cc327..0d42294430d 100644
--- a/sql/sql_admin.cc
+++ b/sql/sql_admin.cc
@@ -914,7 +914,7 @@ send_result_message:
protocol->store(operator_name, system_charset_info);
if (result_code) // either mysql_recreate_table or analyze failed
{
- DBUG_ASSERT(thd->is_error() || thd->killed);
+ DBUG_ASSERT(thd->is_error());
if (thd->is_error())
{
const char *err_msg= thd->get_stmt_da()->message();
diff --git a/sql/sql_base.cc b/sql/sql_base.cc
index 055806609e7..8d5d5058ed1 100644
--- a/sql/sql_base.cc
+++ b/sql/sql_base.cc
@@ -2084,7 +2084,10 @@ bool open_table(THD *thd, TABLE_LIST *table_list, MEM_ROOT *mem_root,
DBUG_RETURN(TRUE);
if (!(flags & MYSQL_OPEN_IGNORE_KILLED) && thd->killed)
+ {
+ thd->send_kill_message();
DBUG_RETURN(TRUE);
+ }
/*
Check if we're trying to take a write lock in a read only transaction.
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index a48ebd450bb..4449a77a715 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -4217,6 +4217,70 @@ extern "C" int thd_rpl_is_parallel(const MYSQL_THD thd)
return thd->rgi_slave && thd->rgi_slave->is_parallel_exec;
}
+extern "C" int
+thd_need_wait_for(const MYSQL_THD thd)
+{
+ return thd && thd->rgi_slave && thd->rgi_slave->is_parallel_exec;
+}
+
+extern "C" void
+thd_report_wait_for(const MYSQL_THD thd, MYSQL_THD other_thd)
+{
+ rpl_group_info *rgi;
+ rpl_group_info *other_rgi;
+
+ if (!thd || !other_thd)
+ return;
+ rgi= thd->rgi_slave;
+ other_rgi= other_thd->rgi_slave;
+ if (!rgi || !other_rgi)
+ return;
+ if (!rgi->is_parallel_exec)
+ return;
+ if (rgi->rli != other_rgi->rli)
+ return;
+ if (!rgi->gtid_sub_id)
+ return;
+ if (rgi->current_gtid.domain_id != other_rgi->current_gtid.domain_id)
+ return;
+ if (rgi->gtid_sub_id > other_rgi->gtid_sub_id)
+ return;
+ /*
+ This transaction is about to wait for another transaction that is required
+ by replication binlog order to commit after. This would cause a deadlock.
+
+ So send a kill to the other transaction, with a temporary error; this will
+ cause replication to rollback (and later re-try) the other transaction,
+ releasing the lock for this transaction so replication can proceed.
+ */
+
+#ifdef HAVE_REPLICATION
+ slave_background_kill_request(other_thd, ER_LOCK_DEADLOCK);
+#endif
+}
+
+extern "C" int
+thd_need_ordering_with(const MYSQL_THD thd, const MYSQL_THD other_thd)
+{
+ rpl_group_info *rgi= thd->rgi_slave;
+ rpl_group_info *other_rgi= other_thd->rgi_slave;
+ if (!rgi || !other_rgi)
+ return 1;
+ if (!rgi->is_parallel_exec)
+ return 1;
+ if (rgi->rli != other_rgi->rli)
+ return 1;
+ if (rgi->current_gtid.domain_id != other_rgi->current_gtid.domain_id)
+ return 1;
+ /*
+ These two threads are doing parallel replication within the same
+ replication domain. Their commit order is already fixed, so we do not need
+ gap locks or similar to otherwise enforce ordering (and in fact such locks
+ could lead to unnecessary deadlocks and transaction retry).
+ */
+ return 0;
+}
+
extern "C" int thd_non_transactional_update(const MYSQL_THD thd)
{
return(thd->transaction.all.modified_non_trans_table);
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 5898d9e2cf8..986e371bb4e 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -1357,7 +1357,8 @@ enum enum_thread_type
SYSTEM_THREAD_NDBCLUSTER_BINLOG= 8,
SYSTEM_THREAD_EVENT_SCHEDULER= 16,
SYSTEM_THREAD_EVENT_WORKER= 32,
- SYSTEM_THREAD_BINLOG_BACKGROUND= 64
+ SYSTEM_THREAD_BINLOG_BACKGROUND= 64,
+ SYSTEM_THREAD_SLAVE_BACKGROUND= 128,
};
inline char const *