summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorunknown <knielsen@knielsen-hq.org>2013-12-05 14:36:09 +0100
committerunknown <knielsen@knielsen-hq.org>2013-12-05 14:36:09 +0100
commit4d6ee2d1197c0e5ebecd019d6625d7d05bf1cab8 (patch)
treeb22b03c739c9d47e0c64ff489ae97c9041f1a5f2 /sql
parent4bce09c104f5c9ddc3600a77c9db44dfa7fd283c (diff)
downloadmariadb-git-4d6ee2d1197c0e5ebecd019d6625d7d05bf1cab8.tar.gz
MDEV-5363: Make parallel replication waits killable
Make wait_for_prior_commit killable, and handle the error if killed.
Diffstat (limited to 'sql')
-rw-r--r--sql/log.cc46
-rw-r--r--sql/log.h2
-rw-r--r--sql/rpl_parallel.cc2
-rw-r--r--sql/sql_class.cc21
-rw-r--r--sql/sql_class.h13
5 files changed, 59 insertions, 25 deletions
diff --git a/sql/log.cc b/sql/log.cc
index ee7e22cbe0c..9cddb5a8e75 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -6612,12 +6612,13 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd,
to commit. If so, we add those to the queue as well, transitively for all
waiters.
- @retval TRUE If queued as the first entry in the queue (meaning this
- is the leader)
- @retval FALSE Otherwise
+ @retval < 0 Error
+ @retval > 0 If queued as the first entry in the queue (meaning this
+ is the leader)
+ @retval 0 Otherwise (queued as participant, leader handles the commit)
*/
-bool
+int
MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
{
group_commit_entry *entry, *orig_queue;
@@ -6641,6 +6642,7 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
/* Do an extra check here, this time safely under lock. */
if (wfc->waiting_for_commit)
{
+ const char *old_msg;
/*
By setting wfc->opaque_pointer to our own entry, we mark that we are
ready to commit, but waiting for another transaction to commit before
@@ -6651,16 +6653,36 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
queued_by_other flag is set.
*/
wfc->opaque_pointer= orig_entry;
+ old_msg=
+ orig_entry->thd->enter_cond(&wfc->COND_wait_commit,
+ &wfc->LOCK_wait_commit,
+ "Waiting for prior transaction to commit");
DEBUG_SYNC(orig_entry->thd, "group_commit_waiting_for_prior");
- do
- {
+ while (wfc->waiting_for_commit && !orig_entry->thd->check_killed())
mysql_cond_wait(&wfc->COND_wait_commit, &wfc->LOCK_wait_commit);
- } while (wfc->waiting_for_commit);
wfc->opaque_pointer= NULL;
DBUG_PRINT("info", ("After waiting for prior commit, queued_by_other=%d",
orig_entry->queued_by_other));
+ orig_entry->thd->exit_cond(old_msg);
+
+ if (wfc->waiting_for_commit)
+ {
+ /* Interrupted by kill. */
+ wfc->wakeup_error= orig_entry->thd->killed_errno();
+ if (wfc->wakeup_error)
+ wfc->wakeup_error= ER_QUERY_INTERRUPTED;
+ my_message(wfc->wakeup_error, ER(wfc->wakeup_error), MYF(0));
+ DBUG_RETURN(-1);
+ }
+ }
+ else
+ mysql_mutex_unlock(&wfc->LOCK_wait_commit);
+
+ if (wfc->wakeup_error)
+ {
+ my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
+ DBUG_RETURN(-1);
}
- mysql_mutex_unlock(&wfc->LOCK_wait_commit);
}
/*
@@ -6669,7 +6691,7 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
then there is nothing else to do.
*/
if (orig_entry->queued_by_other)
- DBUG_RETURN(false);
+ DBUG_RETURN(0);
/* Now enqueue ourselves in the group commit queue. */
DEBUG_SYNC(orig_entry->thd, "commit_before_enqueue");
@@ -6841,13 +6863,15 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
bool
MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
{
- bool is_leader= queue_for_group_commit(entry);
+ int is_leader= queue_for_group_commit(entry);
/*
The first in the queue handles group commit for all; the others just wait
to be signalled when group commit is done.
*/
- if (is_leader)
+ if (is_leader < 0)
+ return true; /* Error */
+ else if (is_leader)
trx_group_commit_leader(entry);
else if (!entry->queued_by_other)
entry->thd->wait_for_wakeup_ready();
diff --git a/sql/log.h b/sql/log.h
index 73518d2594f..45381152d97 100644
--- a/sql/log.h
+++ b/sql/log.h
@@ -540,7 +540,7 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
void do_checkpoint_request(ulong binlog_id);
void purge();
int write_transaction_or_stmt(group_commit_entry *entry, uint64 commit_id);
- bool queue_for_group_commit(group_commit_entry *entry);
+ int queue_for_group_commit(group_commit_entry *entry);
bool write_transaction_to_binlog_events(group_commit_entry *entry);
void trx_group_commit_leader(group_commit_entry *leader);
bool is_xidlist_idle_nolock();
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 109db6ef681..0de164483d4 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -142,7 +142,7 @@ finish_event_group(THD *thd, int err, uint64 sub_id,
if (err)
wfc->unregister_wait_for_prior_commit();
else
- wfc->wait_for_prior_commit();
+ err= wfc->wait_for_prior_commit(thd);
thd->wait_for_commit_ptr= NULL;
/*
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index d1edec140b6..1898bae1499 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -5783,13 +5783,28 @@ wait_for_commit::register_wait_for_prior_commit(wait_for_commit *waitee)
returns immediately.
*/
int
-wait_for_commit::wait_for_prior_commit2()
+wait_for_commit::wait_for_prior_commit2(THD *thd)
{
+ const char *old_msg;
+
mysql_mutex_lock(&LOCK_wait_commit);
- while (waiting_for_commit)
+ old_msg= thd->enter_cond(&COND_wait_commit, &LOCK_wait_commit,
+ "Waiting for prior transaction to commit");
+ while (waiting_for_commit && !thd->check_killed())
mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit);
- mysql_mutex_unlock(&LOCK_wait_commit);
+ thd->exit_cond(old_msg);
waitee= NULL;
+ if (!waiting_for_commit)
+ {
+ if (wakeup_error)
+ my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
+ return wakeup_error;
+ }
+ /* Wait was interrupted by kill, so give the error. */
+ wakeup_error= thd->killed_errno();
+ if (!wakeup_error)
+ wakeup_error= ER_QUERY_INTERRUPTED;
+ my_message(wakeup_error, ER(wakeup_error), MYF(0));
return wakeup_error;
}
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 9f091ac2c2b..083cf0b8c04 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -1627,14 +1627,14 @@ struct wait_for_commit
bool wakeup_subsequent_commits_running;
void register_wait_for_prior_commit(wait_for_commit *waitee);
- int wait_for_prior_commit()
+ int wait_for_prior_commit(THD *thd)
{
/*
Quick inline check, to avoid function call and locking in the common case
where no wakeup is registered, or a registered wait was already signalled.
*/
if (waiting_for_commit)
- return wait_for_prior_commit2();
+ return wait_for_prior_commit2(thd);
else
return wakeup_error;
}
@@ -1663,7 +1663,7 @@ struct wait_for_commit
void wakeup(int wakeup_error);
- int wait_for_prior_commit2();
+ int wait_for_prior_commit2(THD *thd);
void wakeup_subsequent_commits2(int wakeup_error);
void unregister_wait_for_prior_commit2();
@@ -3334,12 +3334,7 @@ public:
int wait_for_prior_commit()
{
if (wait_for_commit_ptr)
- {
- int err= wait_for_commit_ptr->wait_for_prior_commit();
- if (err)
- my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
- return err;
- }
+ return wait_for_commit_ptr->wait_for_prior_commit(this);
return 0;
}
void wakeup_subsequent_commits(int wakeup_error)