diff options
author | unknown <knielsen@knielsen-hq.org> | 2013-12-05 14:36:09 +0100 |
---|---|---|
committer | unknown <knielsen@knielsen-hq.org> | 2013-12-05 14:36:09 +0100 |
commit | 4d6ee2d1197c0e5ebecd019d6625d7d05bf1cab8 (patch) | |
tree | b22b03c739c9d47e0c64ff489ae97c9041f1a5f2 /sql | |
parent | 4bce09c104f5c9ddc3600a77c9db44dfa7fd283c (diff) | |
download | mariadb-git-4d6ee2d1197c0e5ebecd019d6625d7d05bf1cab8.tar.gz |
MDEV-5363: Make parallel replication waits killable
Make wait_for_prior_commit killable, and handle the error if
killed.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/log.cc | 46 | ||||
-rw-r--r-- | sql/log.h | 2 | ||||
-rw-r--r-- | sql/rpl_parallel.cc | 2 | ||||
-rw-r--r-- | sql/sql_class.cc | 21 | ||||
-rw-r--r-- | sql/sql_class.h | 13 |
5 files changed, 59 insertions, 25 deletions
diff --git a/sql/log.cc b/sql/log.cc index ee7e22cbe0c..9cddb5a8e75 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -6612,12 +6612,13 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, to commit. If so, we add those to the queue as well, transitively for all waiters. - @retval TRUE If queued as the first entry in the queue (meaning this - is the leader) - @retval FALSE Otherwise + @retval < 0 Error + @retval > 0 If queued as the first entry in the queue (meaning this + is the leader) + @retval 0 Otherwise (queued as participant, leader handles the commit) */ -bool +int MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) { group_commit_entry *entry, *orig_queue; @@ -6641,6 +6642,7 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) /* Do an extra check here, this time safely under lock. */ if (wfc->waiting_for_commit) { + const char *old_msg; /* By setting wfc->opaque_pointer to our own entry, we mark that we are ready to commit, but waiting for another transaction to commit before @@ -6651,16 +6653,36 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) queued_by_other flag is set. */ wfc->opaque_pointer= orig_entry; + old_msg= + orig_entry->thd->enter_cond(&wfc->COND_wait_commit, + &wfc->LOCK_wait_commit, + "Waiting for prior transaction to commit"); DEBUG_SYNC(orig_entry->thd, "group_commit_waiting_for_prior"); - do - { + while (wfc->waiting_for_commit && !orig_entry->thd->check_killed()) mysql_cond_wait(&wfc->COND_wait_commit, &wfc->LOCK_wait_commit); - } while (wfc->waiting_for_commit); wfc->opaque_pointer= NULL; DBUG_PRINT("info", ("After waiting for prior commit, queued_by_other=%d", orig_entry->queued_by_other)); + orig_entry->thd->exit_cond(old_msg); + + if (wfc->waiting_for_commit) + { + /* Interrupted by kill. */ + wfc->wakeup_error= orig_entry->thd->killed_errno(); + if (wfc->wakeup_error) + wfc->wakeup_error= ER_QUERY_INTERRUPTED; + my_message(wfc->wakeup_error, ER(wfc->wakeup_error), MYF(0)); + DBUG_RETURN(-1); + } + } + else + mysql_mutex_unlock(&wfc->LOCK_wait_commit); + + if (wfc->wakeup_error) + { + my_error(ER_PRIOR_COMMIT_FAILED, MYF(0)); + DBUG_RETURN(-1); } - mysql_mutex_unlock(&wfc->LOCK_wait_commit); } /* @@ -6669,7 +6691,7 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) then there is nothing else to do. */ if (orig_entry->queued_by_other) - DBUG_RETURN(false); + DBUG_RETURN(0); /* Now enqueue ourselves in the group commit queue. */ DEBUG_SYNC(orig_entry->thd, "commit_before_enqueue"); @@ -6841,13 +6863,15 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) bool MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry) { - bool is_leader= queue_for_group_commit(entry); + int is_leader= queue_for_group_commit(entry); /* The first in the queue handles group commit for all; the others just wait to be signalled when group commit is done. */ - if (is_leader) + if (is_leader < 0) + return true; /* Error */ + else if (is_leader) trx_group_commit_leader(entry); else if (!entry->queued_by_other) entry->thd->wait_for_wakeup_ready(); diff --git a/sql/log.h b/sql/log.h index 73518d2594f..45381152d97 100644 --- a/sql/log.h +++ b/sql/log.h @@ -540,7 +540,7 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG void do_checkpoint_request(ulong binlog_id); void purge(); int write_transaction_or_stmt(group_commit_entry *entry, uint64 commit_id); - bool queue_for_group_commit(group_commit_entry *entry); + int queue_for_group_commit(group_commit_entry *entry); bool write_transaction_to_binlog_events(group_commit_entry *entry); void trx_group_commit_leader(group_commit_entry *leader); bool is_xidlist_idle_nolock(); diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 109db6ef681..0de164483d4 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -142,7 +142,7 @@ finish_event_group(THD *thd, int err, uint64 sub_id, if (err) wfc->unregister_wait_for_prior_commit(); else - wfc->wait_for_prior_commit(); + err= wfc->wait_for_prior_commit(thd); thd->wait_for_commit_ptr= NULL; /* diff --git a/sql/sql_class.cc b/sql/sql_class.cc index d1edec140b6..1898bae1499 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -5783,13 +5783,28 @@ wait_for_commit::register_wait_for_prior_commit(wait_for_commit *waitee) returns immediately. */ int -wait_for_commit::wait_for_prior_commit2() +wait_for_commit::wait_for_prior_commit2(THD *thd) { + const char *old_msg; + mysql_mutex_lock(&LOCK_wait_commit); - while (waiting_for_commit) + old_msg= thd->enter_cond(&COND_wait_commit, &LOCK_wait_commit, + "Waiting for prior transaction to commit"); + while (waiting_for_commit && !thd->check_killed()) mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit); - mysql_mutex_unlock(&LOCK_wait_commit); + thd->exit_cond(old_msg); waitee= NULL; + if (!waiting_for_commit) + { + if (wakeup_error) + my_error(ER_PRIOR_COMMIT_FAILED, MYF(0)); + return wakeup_error; + } + /* Wait was interrupted by kill, so give the error. */ + wakeup_error= thd->killed_errno(); + if (!wakeup_error) + wakeup_error= ER_QUERY_INTERRUPTED; + my_message(wakeup_error, ER(wakeup_error), MYF(0)); return wakeup_error; } diff --git a/sql/sql_class.h b/sql/sql_class.h index 9f091ac2c2b..083cf0b8c04 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -1627,14 +1627,14 @@ struct wait_for_commit bool wakeup_subsequent_commits_running; void register_wait_for_prior_commit(wait_for_commit *waitee); - int wait_for_prior_commit() + int wait_for_prior_commit(THD *thd) { /* Quick inline check, to avoid function call and locking in the common case where no wakeup is registered, or a registered wait was already signalled. */ if (waiting_for_commit) - return wait_for_prior_commit2(); + return wait_for_prior_commit2(thd); else return wakeup_error; } @@ -1663,7 +1663,7 @@ struct wait_for_commit void wakeup(int wakeup_error); - int wait_for_prior_commit2(); + int wait_for_prior_commit2(THD *thd); void wakeup_subsequent_commits2(int wakeup_error); void unregister_wait_for_prior_commit2(); @@ -3334,12 +3334,7 @@ public: int wait_for_prior_commit() { if (wait_for_commit_ptr) - { - int err= wait_for_commit_ptr->wait_for_prior_commit(); - if (err) - my_error(ER_PRIOR_COMMIT_FAILED, MYF(0)); - return err; - } + return wait_for_commit_ptr->wait_for_prior_commit(this); return 0; } void wakeup_subsequent_commits(int wakeup_error) |