summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorunknown <knielsen@knielsen-hq.org>2013-10-14 15:28:16 +0200
committerunknown <knielsen@knielsen-hq.org>2013-10-14 15:28:16 +0200
commit2842f6b5dc254c82aa3dc976cd5bd3645dc82a60 (patch)
treeadefb3cd2b2e5e1f41652d12cc597929f55f80d0
parent2e100cc5a493b6a0f6f907e0483a734c7fee2087 (diff)
downloadmariadb-git-2842f6b5dc254c82aa3dc976cd5bd3645dc82a60.tar.gz
MDEV-4506: Parallel replication: error handling.
Add an error code to the wait_for_commit facility. Now, when a transaction fails, it can signal the error to any subsequent transaction that is waiting for it to commit. The waiting transactions then receive the error code back from wait_for_prior_commit() and can handle the error appropriately. Also fix one race that could cause crash if @@slave_parallel_threads were changed several times quickly in succession.
-rw-r--r--include/mysql/plugin.h2
-rw-r--r--include/mysql/plugin_audit.h.pp2
-rw-r--r--include/mysql/plugin_auth.h.pp2
-rw-r--r--include/mysql/plugin_ftparser.h.pp2
-rw-r--r--sql/handler.cc11
-rw-r--r--sql/log.cc9
-rw-r--r--sql/rpl_parallel.cc15
-rw-r--r--sql/share/errmsg-utf8.txt2
-rw-r--r--sql/sql_class.cc18
-rw-r--r--sql/sql_class.h32
-rw-r--r--storage/innobase/handler/ha_innodb.cc2
-rw-r--r--storage/xtradb/handler/ha_innodb.cc2
12 files changed, 62 insertions, 37 deletions
diff --git a/include/mysql/plugin.h b/include/mysql/plugin.h
index ab72a9d106b..9ac63f08f73 100644
--- a/include/mysql/plugin.h
+++ b/include/mysql/plugin.h
@@ -716,7 +716,7 @@ void thd_set_ha_data(MYSQL_THD thd, const struct handlerton *hton,
thd_wakeup_subsequent_commits() is only needed when no transaction
coordinator is used, meaning a single storage engine and no binary log.
*/
-void thd_wakeup_subsequent_commits(MYSQL_THD thd);
+void thd_wakeup_subsequent_commits(MYSQL_THD thd, int wakeup_error);
#ifdef __cplusplus
}
diff --git a/include/mysql/plugin_audit.h.pp b/include/mysql/plugin_audit.h.pp
index 564dd6272f5..17e5c191672 100644
--- a/include/mysql/plugin_audit.h.pp
+++ b/include/mysql/plugin_audit.h.pp
@@ -236,7 +236,7 @@ void mysql_query_cache_invalidate4(void* thd,
void *thd_get_ha_data(const void* thd, const struct handlerton *hton);
void thd_set_ha_data(void* thd, const struct handlerton *hton,
const void *ha_data);
-void thd_wakeup_subsequent_commits(void* thd);
+void thd_wakeup_subsequent_commits(void* thd, int wakeup_error);
struct mysql_event_general
{
unsigned int event_subclass;
diff --git a/include/mysql/plugin_auth.h.pp b/include/mysql/plugin_auth.h.pp
index edfd7095203..33b552ec75b 100644
--- a/include/mysql/plugin_auth.h.pp
+++ b/include/mysql/plugin_auth.h.pp
@@ -236,7 +236,7 @@ void mysql_query_cache_invalidate4(void* thd,
void *thd_get_ha_data(const void* thd, const struct handlerton *hton);
void thd_set_ha_data(void* thd, const struct handlerton *hton,
const void *ha_data);
-void thd_wakeup_subsequent_commits(void* thd);
+void thd_wakeup_subsequent_commits(void* thd, int wakeup_error);
#include <mysql/plugin_auth_common.h>
typedef struct st_plugin_vio_info
{
diff --git a/include/mysql/plugin_ftparser.h.pp b/include/mysql/plugin_ftparser.h.pp
index 0cc51e259dc..b4aa962c51c 100644
--- a/include/mysql/plugin_ftparser.h.pp
+++ b/include/mysql/plugin_ftparser.h.pp
@@ -189,7 +189,7 @@ void mysql_query_cache_invalidate4(void* thd,
void *thd_get_ha_data(const void* thd, const struct handlerton *hton);
void thd_set_ha_data(void* thd, const struct handlerton *hton,
const void *ha_data);
-void thd_wakeup_subsequent_commits(void* thd);
+void thd_wakeup_subsequent_commits(void* thd, int wakeup_error);
enum enum_ftparser_mode
{
MYSQL_FTPARSER_SIMPLE_MODE= 0,
diff --git a/sql/handler.cc b/sql/handler.cc
index c42204b27d1..672e1cb4e42 100644
--- a/sql/handler.cc
+++ b/sql/handler.cc
@@ -1458,10 +1458,11 @@ int ha_commit_one_phase(THD *thd, bool all)
transaction.all.ha_list, see why in trans_register_ha()).
*/
bool is_real_trans=all || thd->transaction.all.ha_list == 0;
+ int res;
DBUG_ENTER("ha_commit_one_phase");
- if (is_real_trans)
- thd->wait_for_prior_commit();
- int res= commit_one_phase_2(thd, all, trans, is_real_trans);
+ if (is_real_trans && (res= thd->wait_for_prior_commit()))
+ DBUG_RETURN(res);
+ res= commit_one_phase_2(thd, all, trans, is_real_trans);
DBUG_RETURN(res);
}
@@ -1501,7 +1502,7 @@ commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans)
/* Free resources and perform other cleanup even for 'empty' transactions. */
if (is_real_trans)
{
- thd->wakeup_subsequent_commits();
+ thd->wakeup_subsequent_commits(error);
thd->transaction.cleanup();
}
@@ -1579,7 +1580,7 @@ int ha_rollback_trans(THD *thd, bool all)
/* Always cleanup. Even if nht==0. There may be savepoints. */
if (is_real_trans)
{
- thd->wakeup_subsequent_commits();
+ thd->wakeup_subsequent_commits(error);
thd->transaction.cleanup();
}
if (all)
diff --git a/sql/log.cc b/sql/log.cc
index dd6eeb3678c..95091875d83 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -6743,7 +6743,7 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
cur->wakeup_subsequent_commits_running= true;
mysql_mutex_unlock(&cur->LOCK_wait_commit);
}
- waiter->wakeup();
+ waiter->wakeup(0);
}
waiter= next;
}
@@ -6849,7 +6849,7 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
field.
*/
if (next->queued_by_other)
- next->thd->wait_for_commit_ptr->wakeup();
+ next->thd->wait_for_commit_ptr->wakeup(entry->error);
else
next->thd->signal_wakeup_ready();
}
@@ -7145,7 +7145,7 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
if (current != leader) // Don't wake up ourself
{
if (current->queued_by_other)
- current->thd->wait_for_commit_ptr->wakeup();
+ current->thd->wait_for_commit_ptr->wakeup(current->error);
else
current->thd->signal_wakeup_ready();
}
@@ -7844,7 +7844,8 @@ int TC_LOG_MMAP::log_and_order(THD *thd, my_xid xid, bool all,
mysql_mutex_unlock(&LOCK_prepare_ordered);
}
- thd->wait_for_prior_commit();
+ if (thd->wait_for_prior_commit())
+ return 0;
cookie= 0;
if (xid)
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index c10a035c599..c6411b01e60 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -52,7 +52,7 @@
struct rpl_parallel_thread_pool global_rpl_thread_pool;
-static void
+static int
rpt_handle_event(rpl_parallel_thread::queued_event *qev,
struct rpl_parallel_thread *rpt)
{
@@ -70,6 +70,7 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev,
err= apply_event_and_update_pos(qev->ev, thd, rgi, rpt);
thd->rgi_slave= NULL;
/* ToDo: error handling. */
+ return err;
}
@@ -104,6 +105,7 @@ handle_rpl_parallel_thread(void *arg)
bool group_standalone= true;
bool in_event_group= false;
uint64 event_gtid_sub_id= 0;
+ int err;
struct rpl_parallel_thread *rpt= (struct rpl_parallel_thread *)arg;
@@ -139,6 +141,7 @@ handle_rpl_parallel_thread(void *arg)
mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread);
rpt->running= true;
+ mysql_cond_signal(&rpt->COND_rpl_thread);
while (!rpt->stop && !thd->killed)
{
@@ -163,6 +166,7 @@ handle_rpl_parallel_thread(void *arg)
uint64 wait_start_sub_id;
bool end_of_group;
+ err= 0;
/* Handle a new event group, which will be initiated by a GTID event. */
if (event_type == GTID_EVENT)
{
@@ -221,9 +225,9 @@ handle_rpl_parallel_thread(void *arg)
everything is stopped and cleaned up correctly.
*/
if (!sql_worker_killed(thd, rgi, in_event_group))
- rpt_handle_event(events, rpt);
+ err= rpt_handle_event(events, rpt);
else
- thd->wait_for_prior_commit();
+ err= thd->wait_for_prior_commit();
end_of_group=
in_event_group &&
@@ -272,7 +276,7 @@ handle_rpl_parallel_thread(void *arg)
}
mysql_mutex_unlock(&entry->LOCK_parallel_entry);
- rgi->commit_orderer.wakeup_subsequent_commits();
+ rgi->commit_orderer.wakeup_subsequent_commits(err);
delete rgi;
}
@@ -431,6 +435,9 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
mysql_mutex_lock(&pool->threads[i]->LOCK_rpl_thread);
pool->threads[i]->delay_start= false;
mysql_cond_signal(&pool->threads[i]->COND_rpl_thread);
+ while (!pool->threads[i]->running)
+ mysql_cond_wait(&pool->threads[i]->COND_rpl_thread,
+ &pool->threads[i]->LOCK_rpl_thread);
mysql_mutex_unlock(&pool->threads[i]->LOCK_rpl_thread);
}
diff --git a/sql/share/errmsg-utf8.txt b/sql/share/errmsg-utf8.txt
index 85baddd3c49..51bc7eaf93d 100644
--- a/sql/share/errmsg-utf8.txt
+++ b/sql/share/errmsg-utf8.txt
@@ -6557,3 +6557,5 @@ ER_STORED_FUNCTION_PREVENTS_SWITCH_GTID_DOMAIN_ID_SEQ_NO
eng "Cannot modify @@session.gtid_domain_id or @@session.gtid_seq_no inside a stored function or trigger"
ER_CHANGE_SLAVE_PARALLEL_THREADS_ACTIVE
eng "Cannot change @@slave_parallel_threads while another change is in progress"
+ER_PRIOR_COMMIT_FAILED
+ eng "Commit failed due to failure of an earlier commit on which this one depends"
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index f424e34969d..65fed2e8f98 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -610,9 +610,9 @@ void thd_set_ha_data(THD *thd, const struct handlerton *hton,
@see thd_wakeup_subsequent_commits() definition in plugin.h
*/
extern "C"
-void thd_wakeup_subsequent_commits(THD *thd)
+void thd_wakeup_subsequent_commits(THD *thd, int wakeup_error)
{
- thd->wakeup_subsequent_commits();
+ thd->wakeup_subsequent_commits(wakeup_error);
}
@@ -5618,7 +5618,8 @@ bool THD::rgi_have_temporary_tables()
wait_for_commit::wait_for_commit()
: subsequent_commits_list(0), next_subsequent_commit(0), waitee(0),
opaque_pointer(0),
- waiting_for_commit(false), wakeup_subsequent_commits_running(false)
+ waiting_for_commit(false), wakeup_error(0),
+ wakeup_subsequent_commits_running(false)
{
mysql_mutex_init(key_LOCK_wait_commit, &LOCK_wait_commit, MY_MUTEX_INIT_FAST);
mysql_cond_init(key_COND_wait_commit, &COND_wait_commit, 0);
@@ -5633,7 +5634,7 @@ wait_for_commit::~wait_for_commit()
void
-wait_for_commit::wakeup()
+wait_for_commit::wakeup(int wakeup_error)
{
/*
We signal each waiter on their own condition and mutex (rather than using
@@ -5649,6 +5650,7 @@ wait_for_commit::wakeup()
*/
mysql_mutex_lock(&LOCK_wait_commit);
waiting_for_commit= false;
+ this->wakeup_error= wakeup_error;
mysql_mutex_unlock(&LOCK_wait_commit);
mysql_cond_signal(&COND_wait_commit);
}
@@ -5675,6 +5677,7 @@ void
wait_for_commit::register_wait_for_prior_commit(wait_for_commit *waitee)
{
waiting_for_commit= true;
+ wakeup_error= 0;
DBUG_ASSERT(!this->waitee /* No prior registration allowed */);
this->waitee= waitee;
@@ -5704,7 +5707,7 @@ wait_for_commit::register_wait_for_prior_commit(wait_for_commit *waitee)
with register_wait_for_prior_commit(). If the commit already completed,
returns immediately.
*/
-void
+int
wait_for_commit::wait_for_prior_commit2()
{
mysql_mutex_lock(&LOCK_wait_commit);
@@ -5712,6 +5715,7 @@ wait_for_commit::wait_for_prior_commit2()
mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit);
mysql_mutex_unlock(&LOCK_wait_commit);
waitee= NULL;
+ return wakeup_error;
}
@@ -5755,7 +5759,7 @@ wait_for_commit::wait_for_prior_commit2()
*/
void
-wait_for_commit::wakeup_subsequent_commits2()
+wait_for_commit::wakeup_subsequent_commits2(int wakeup_error)
{
wait_for_commit *waiter;
@@ -5772,7 +5776,7 @@ wait_for_commit::wakeup_subsequent_commits2()
once the wakeup is done, the field could be invalidated at any time.
*/
wait_for_commit *next= waiter->next_subsequent_commit;
- waiter->wakeup();
+ waiter->wakeup(wakeup_error);
waiter= next;
}
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 01121fd5b35..567eaf5c351 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -1614,6 +1614,8 @@ struct wait_for_commit
cleared.
*/
bool waiting_for_commit;
+ /* The wakeup error code from the waitee. 0 means no error. */
+ int wakeup_error;
/*
Flag set when wakeup_subsequent_commits_running() is active, see comments
on that function for details.
@@ -1621,16 +1623,18 @@ struct wait_for_commit
bool wakeup_subsequent_commits_running;
void register_wait_for_prior_commit(wait_for_commit *waitee);
- void wait_for_prior_commit()
+ int wait_for_prior_commit()
{
/*
Quick inline check, to avoid function call and locking in the common case
where no wakeup is registered, or a registered wait was already signalled.
*/
if (waiting_for_commit)
- wait_for_prior_commit2();
+ return wait_for_prior_commit2();
+ else
+ return wakeup_error;
}
- void wakeup_subsequent_commits()
+ void wakeup_subsequent_commits(int wakeup_error)
{
/*
Do the check inline, so only the wakeup case takes the cost of a function
@@ -1645,7 +1649,7 @@ struct wait_for_commit
prevent a waiter from arriving just after releasing the lock.
*/
if (subsequent_commits_list)
- wakeup_subsequent_commits2();
+ wakeup_subsequent_commits2(wakeup_error);
}
void unregister_wait_for_prior_commit()
{
@@ -1653,10 +1657,10 @@ struct wait_for_commit
unregister_wait_for_prior_commit2();
}
- void wakeup();
+ void wakeup(int wakeup_error);
- void wait_for_prior_commit2();
- void wakeup_subsequent_commits2();
+ int wait_for_prior_commit2();
+ void wakeup_subsequent_commits2(int wakeup_error);
void unregister_wait_for_prior_commit2();
wait_for_commit();
@@ -3308,15 +3312,21 @@ public:
void signal_wakeup_ready();
wait_for_commit *wait_for_commit_ptr;
- void wait_for_prior_commit()
+ int wait_for_prior_commit()
{
if (wait_for_commit_ptr)
- wait_for_commit_ptr->wait_for_prior_commit();
+ {
+ int err= wait_for_commit_ptr->wait_for_prior_commit();
+ if (err)
+ my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
+ return err;
+ }
+ return 0;
}
- void wakeup_subsequent_commits()
+ void wakeup_subsequent_commits(int wakeup_error)
{
if (wait_for_commit_ptr)
- wait_for_commit_ptr->wakeup_subsequent_commits();
+ wait_for_commit_ptr->wakeup_subsequent_commits(wakeup_error);
}
private:
diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc
index 4d4bb7bd1f3..f32037244c5 100644
--- a/storage/innobase/handler/ha_innodb.cc
+++ b/storage/innobase/handler/ha_innodb.cc
@@ -2927,7 +2927,7 @@ innobase_commit(
/* At this point commit order is fixed and transaction is
visible to others. So we can wakeup other commits waiting for
this one, to allow then to group commit with us. */
- thd_wakeup_subsequent_commits(thd);
+ thd_wakeup_subsequent_commits(thd, 0);
/* We did the first part already in innobase_commit_ordered(),
Now finish by doing a write + flush of logs. */
diff --git a/storage/xtradb/handler/ha_innodb.cc b/storage/xtradb/handler/ha_innodb.cc
index e80810d3948..2efcb15cba6 100644
--- a/storage/xtradb/handler/ha_innodb.cc
+++ b/storage/xtradb/handler/ha_innodb.cc
@@ -3588,7 +3588,7 @@ innobase_commit(
/* At this point commit order is fixed and transaction is
visible to others. So we can wakeup other commits waiting for
this one, to allow then to group commit with us. */
- thd_wakeup_subsequent_commits(thd);
+ thd_wakeup_subsequent_commits(thd, 0);
/* We did the first part already in innobase_commit_ordered(),
Now finish by doing a write + flush of logs. */