diff options
author | unknown <knielsen@knielsen-hq.org> | 2013-10-14 15:28:16 +0200 |
---|---|---|
committer | unknown <knielsen@knielsen-hq.org> | 2013-10-14 15:28:16 +0200 |
commit | 2842f6b5dc254c82aa3dc976cd5bd3645dc82a60 (patch) | |
tree | adefb3cd2b2e5e1f41652d12cc597929f55f80d0 | |
parent | 2e100cc5a493b6a0f6f907e0483a734c7fee2087 (diff) | |
download | mariadb-git-2842f6b5dc254c82aa3dc976cd5bd3645dc82a60.tar.gz |
MDEV-4506: Parallel replication: error handling.
Add an error code to the wait_for_commit facility.
Now, when a transaction fails, it can signal the error to
any subsequent transaction that is waiting for it to commit.
The waiting transactions then receive the error code back from
wait_for_prior_commit() and can handle the error appropriately.
Also fix one race that could cause crash if @@slave_parallel_threads
were changed several times quickly in succession.
-rw-r--r-- | include/mysql/plugin.h | 2 | ||||
-rw-r--r-- | include/mysql/plugin_audit.h.pp | 2 | ||||
-rw-r--r-- | include/mysql/plugin_auth.h.pp | 2 | ||||
-rw-r--r-- | include/mysql/plugin_ftparser.h.pp | 2 | ||||
-rw-r--r-- | sql/handler.cc | 11 | ||||
-rw-r--r-- | sql/log.cc | 9 | ||||
-rw-r--r-- | sql/rpl_parallel.cc | 15 | ||||
-rw-r--r-- | sql/share/errmsg-utf8.txt | 2 | ||||
-rw-r--r-- | sql/sql_class.cc | 18 | ||||
-rw-r--r-- | sql/sql_class.h | 32 | ||||
-rw-r--r-- | storage/innobase/handler/ha_innodb.cc | 2 | ||||
-rw-r--r-- | storage/xtradb/handler/ha_innodb.cc | 2 |
12 files changed, 62 insertions, 37 deletions
diff --git a/include/mysql/plugin.h b/include/mysql/plugin.h index ab72a9d106b..9ac63f08f73 100644 --- a/include/mysql/plugin.h +++ b/include/mysql/plugin.h @@ -716,7 +716,7 @@ void thd_set_ha_data(MYSQL_THD thd, const struct handlerton *hton, thd_wakeup_subsequent_commits() is only needed when no transaction coordinator is used, meaning a single storage engine and no binary log. */ -void thd_wakeup_subsequent_commits(MYSQL_THD thd); +void thd_wakeup_subsequent_commits(MYSQL_THD thd, int wakeup_error); #ifdef __cplusplus } diff --git a/include/mysql/plugin_audit.h.pp b/include/mysql/plugin_audit.h.pp index 564dd6272f5..17e5c191672 100644 --- a/include/mysql/plugin_audit.h.pp +++ b/include/mysql/plugin_audit.h.pp @@ -236,7 +236,7 @@ void mysql_query_cache_invalidate4(void* thd, void *thd_get_ha_data(const void* thd, const struct handlerton *hton); void thd_set_ha_data(void* thd, const struct handlerton *hton, const void *ha_data); -void thd_wakeup_subsequent_commits(void* thd); +void thd_wakeup_subsequent_commits(void* thd, int wakeup_error); struct mysql_event_general { unsigned int event_subclass; diff --git a/include/mysql/plugin_auth.h.pp b/include/mysql/plugin_auth.h.pp index edfd7095203..33b552ec75b 100644 --- a/include/mysql/plugin_auth.h.pp +++ b/include/mysql/plugin_auth.h.pp @@ -236,7 +236,7 @@ void mysql_query_cache_invalidate4(void* thd, void *thd_get_ha_data(const void* thd, const struct handlerton *hton); void thd_set_ha_data(void* thd, const struct handlerton *hton, const void *ha_data); -void thd_wakeup_subsequent_commits(void* thd); +void thd_wakeup_subsequent_commits(void* thd, int wakeup_error); #include <mysql/plugin_auth_common.h> typedef struct st_plugin_vio_info { diff --git a/include/mysql/plugin_ftparser.h.pp b/include/mysql/plugin_ftparser.h.pp index 0cc51e259dc..b4aa962c51c 100644 --- a/include/mysql/plugin_ftparser.h.pp +++ b/include/mysql/plugin_ftparser.h.pp @@ -189,7 +189,7 @@ void mysql_query_cache_invalidate4(void* thd, void *thd_get_ha_data(const void* thd, const struct handlerton *hton); void thd_set_ha_data(void* thd, const struct handlerton *hton, const void *ha_data); -void thd_wakeup_subsequent_commits(void* thd); +void thd_wakeup_subsequent_commits(void* thd, int wakeup_error); enum enum_ftparser_mode { MYSQL_FTPARSER_SIMPLE_MODE= 0, diff --git a/sql/handler.cc b/sql/handler.cc index c42204b27d1..672e1cb4e42 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -1458,10 +1458,11 @@ int ha_commit_one_phase(THD *thd, bool all) transaction.all.ha_list, see why in trans_register_ha()). */ bool is_real_trans=all || thd->transaction.all.ha_list == 0; + int res; DBUG_ENTER("ha_commit_one_phase"); - if (is_real_trans) - thd->wait_for_prior_commit(); - int res= commit_one_phase_2(thd, all, trans, is_real_trans); + if (is_real_trans && (res= thd->wait_for_prior_commit())) + DBUG_RETURN(res); + res= commit_one_phase_2(thd, all, trans, is_real_trans); DBUG_RETURN(res); } @@ -1501,7 +1502,7 @@ commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans) /* Free resources and perform other cleanup even for 'empty' transactions. */ if (is_real_trans) { - thd->wakeup_subsequent_commits(); + thd->wakeup_subsequent_commits(error); thd->transaction.cleanup(); } @@ -1579,7 +1580,7 @@ int ha_rollback_trans(THD *thd, bool all) /* Always cleanup. Even if nht==0. There may be savepoints. */ if (is_real_trans) { - thd->wakeup_subsequent_commits(); + thd->wakeup_subsequent_commits(error); thd->transaction.cleanup(); } if (all) diff --git a/sql/log.cc b/sql/log.cc index dd6eeb3678c..95091875d83 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -6743,7 +6743,7 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) cur->wakeup_subsequent_commits_running= true; mysql_mutex_unlock(&cur->LOCK_wait_commit); } - waiter->wakeup(); + waiter->wakeup(0); } waiter= next; } @@ -6849,7 +6849,7 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry) field. */ if (next->queued_by_other) - next->thd->wait_for_commit_ptr->wakeup(); + next->thd->wait_for_commit_ptr->wakeup(entry->error); else next->thd->signal_wakeup_ready(); } @@ -7145,7 +7145,7 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) if (current != leader) // Don't wake up ourself { if (current->queued_by_other) - current->thd->wait_for_commit_ptr->wakeup(); + current->thd->wait_for_commit_ptr->wakeup(current->error); else current->thd->signal_wakeup_ready(); } @@ -7844,7 +7844,8 @@ int TC_LOG_MMAP::log_and_order(THD *thd, my_xid xid, bool all, mysql_mutex_unlock(&LOCK_prepare_ordered); } - thd->wait_for_prior_commit(); + if (thd->wait_for_prior_commit()) + return 0; cookie= 0; if (xid) diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index c10a035c599..c6411b01e60 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -52,7 +52,7 @@ struct rpl_parallel_thread_pool global_rpl_thread_pool; -static void +static int rpt_handle_event(rpl_parallel_thread::queued_event *qev, struct rpl_parallel_thread *rpt) { @@ -70,6 +70,7 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev, err= apply_event_and_update_pos(qev->ev, thd, rgi, rpt); thd->rgi_slave= NULL; /* ToDo: error handling. */ + return err; } @@ -104,6 +105,7 @@ handle_rpl_parallel_thread(void *arg) bool group_standalone= true; bool in_event_group= false; uint64 event_gtid_sub_id= 0; + int err; struct rpl_parallel_thread *rpt= (struct rpl_parallel_thread *)arg; @@ -139,6 +141,7 @@ handle_rpl_parallel_thread(void *arg) mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread); rpt->running= true; + mysql_cond_signal(&rpt->COND_rpl_thread); while (!rpt->stop && !thd->killed) { @@ -163,6 +166,7 @@ handle_rpl_parallel_thread(void *arg) uint64 wait_start_sub_id; bool end_of_group; + err= 0; /* Handle a new event group, which will be initiated by a GTID event. */ if (event_type == GTID_EVENT) { @@ -221,9 +225,9 @@ handle_rpl_parallel_thread(void *arg) everything is stopped and cleaned up correctly. */ if (!sql_worker_killed(thd, rgi, in_event_group)) - rpt_handle_event(events, rpt); + err= rpt_handle_event(events, rpt); else - thd->wait_for_prior_commit(); + err= thd->wait_for_prior_commit(); end_of_group= in_event_group && @@ -272,7 +276,7 @@ handle_rpl_parallel_thread(void *arg) } mysql_mutex_unlock(&entry->LOCK_parallel_entry); - rgi->commit_orderer.wakeup_subsequent_commits(); + rgi->commit_orderer.wakeup_subsequent_commits(err); delete rgi; } @@ -431,6 +435,9 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, mysql_mutex_lock(&pool->threads[i]->LOCK_rpl_thread); pool->threads[i]->delay_start= false; mysql_cond_signal(&pool->threads[i]->COND_rpl_thread); + while (!pool->threads[i]->running) + mysql_cond_wait(&pool->threads[i]->COND_rpl_thread, + &pool->threads[i]->LOCK_rpl_thread); mysql_mutex_unlock(&pool->threads[i]->LOCK_rpl_thread); } diff --git a/sql/share/errmsg-utf8.txt b/sql/share/errmsg-utf8.txt index 85baddd3c49..51bc7eaf93d 100644 --- a/sql/share/errmsg-utf8.txt +++ b/sql/share/errmsg-utf8.txt @@ -6557,3 +6557,5 @@ ER_STORED_FUNCTION_PREVENTS_SWITCH_GTID_DOMAIN_ID_SEQ_NO eng "Cannot modify @@session.gtid_domain_id or @@session.gtid_seq_no inside a stored function or trigger" ER_CHANGE_SLAVE_PARALLEL_THREADS_ACTIVE eng "Cannot change @@slave_parallel_threads while another change is in progress" +ER_PRIOR_COMMIT_FAILED + eng "Commit failed due to failure of an earlier commit on which this one depends" diff --git a/sql/sql_class.cc b/sql/sql_class.cc index f424e34969d..65fed2e8f98 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -610,9 +610,9 @@ void thd_set_ha_data(THD *thd, const struct handlerton *hton, @see thd_wakeup_subsequent_commits() definition in plugin.h */ extern "C" -void thd_wakeup_subsequent_commits(THD *thd) +void thd_wakeup_subsequent_commits(THD *thd, int wakeup_error) { - thd->wakeup_subsequent_commits(); + thd->wakeup_subsequent_commits(wakeup_error); } @@ -5618,7 +5618,8 @@ bool THD::rgi_have_temporary_tables() wait_for_commit::wait_for_commit() : subsequent_commits_list(0), next_subsequent_commit(0), waitee(0), opaque_pointer(0), - waiting_for_commit(false), wakeup_subsequent_commits_running(false) + waiting_for_commit(false), wakeup_error(0), + wakeup_subsequent_commits_running(false) { mysql_mutex_init(key_LOCK_wait_commit, &LOCK_wait_commit, MY_MUTEX_INIT_FAST); mysql_cond_init(key_COND_wait_commit, &COND_wait_commit, 0); @@ -5633,7 +5634,7 @@ wait_for_commit::~wait_for_commit() void -wait_for_commit::wakeup() +wait_for_commit::wakeup(int wakeup_error) { /* We signal each waiter on their own condition and mutex (rather than using @@ -5649,6 +5650,7 @@ wait_for_commit::wakeup() */ mysql_mutex_lock(&LOCK_wait_commit); waiting_for_commit= false; + this->wakeup_error= wakeup_error; mysql_mutex_unlock(&LOCK_wait_commit); mysql_cond_signal(&COND_wait_commit); } @@ -5675,6 +5677,7 @@ void wait_for_commit::register_wait_for_prior_commit(wait_for_commit *waitee) { waiting_for_commit= true; + wakeup_error= 0; DBUG_ASSERT(!this->waitee /* No prior registration allowed */); this->waitee= waitee; @@ -5704,7 +5707,7 @@ wait_for_commit::register_wait_for_prior_commit(wait_for_commit *waitee) with register_wait_for_prior_commit(). If the commit already completed, returns immediately. */ -void +int wait_for_commit::wait_for_prior_commit2() { mysql_mutex_lock(&LOCK_wait_commit); @@ -5712,6 +5715,7 @@ wait_for_commit::wait_for_prior_commit2() mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit); mysql_mutex_unlock(&LOCK_wait_commit); waitee= NULL; + return wakeup_error; } @@ -5755,7 +5759,7 @@ wait_for_commit::wait_for_prior_commit2() */ void -wait_for_commit::wakeup_subsequent_commits2() +wait_for_commit::wakeup_subsequent_commits2(int wakeup_error) { wait_for_commit *waiter; @@ -5772,7 +5776,7 @@ wait_for_commit::wakeup_subsequent_commits2() once the wakeup is done, the field could be invalidated at any time. */ wait_for_commit *next= waiter->next_subsequent_commit; - waiter->wakeup(); + waiter->wakeup(wakeup_error); waiter= next; } diff --git a/sql/sql_class.h b/sql/sql_class.h index 01121fd5b35..567eaf5c351 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -1614,6 +1614,8 @@ struct wait_for_commit cleared. */ bool waiting_for_commit; + /* The wakeup error code from the waitee. 0 means no error. */ + int wakeup_error; /* Flag set when wakeup_subsequent_commits_running() is active, see comments on that function for details. @@ -1621,16 +1623,18 @@ struct wait_for_commit bool wakeup_subsequent_commits_running; void register_wait_for_prior_commit(wait_for_commit *waitee); - void wait_for_prior_commit() + int wait_for_prior_commit() { /* Quick inline check, to avoid function call and locking in the common case where no wakeup is registered, or a registered wait was already signalled. */ if (waiting_for_commit) - wait_for_prior_commit2(); + return wait_for_prior_commit2(); + else + return wakeup_error; } - void wakeup_subsequent_commits() + void wakeup_subsequent_commits(int wakeup_error) { /* Do the check inline, so only the wakeup case takes the cost of a function @@ -1645,7 +1649,7 @@ struct wait_for_commit prevent a waiter from arriving just after releasing the lock. */ if (subsequent_commits_list) - wakeup_subsequent_commits2(); + wakeup_subsequent_commits2(wakeup_error); } void unregister_wait_for_prior_commit() { @@ -1653,10 +1657,10 @@ struct wait_for_commit unregister_wait_for_prior_commit2(); } - void wakeup(); + void wakeup(int wakeup_error); - void wait_for_prior_commit2(); - void wakeup_subsequent_commits2(); + int wait_for_prior_commit2(); + void wakeup_subsequent_commits2(int wakeup_error); void unregister_wait_for_prior_commit2(); wait_for_commit(); @@ -3308,15 +3312,21 @@ public: void signal_wakeup_ready(); wait_for_commit *wait_for_commit_ptr; - void wait_for_prior_commit() + int wait_for_prior_commit() { if (wait_for_commit_ptr) - wait_for_commit_ptr->wait_for_prior_commit(); + { + int err= wait_for_commit_ptr->wait_for_prior_commit(); + if (err) + my_error(ER_PRIOR_COMMIT_FAILED, MYF(0)); + return err; + } + return 0; } - void wakeup_subsequent_commits() + void wakeup_subsequent_commits(int wakeup_error) { if (wait_for_commit_ptr) - wait_for_commit_ptr->wakeup_subsequent_commits(); + wait_for_commit_ptr->wakeup_subsequent_commits(wakeup_error); } private: diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc index 4d4bb7bd1f3..f32037244c5 100644 --- a/storage/innobase/handler/ha_innodb.cc +++ b/storage/innobase/handler/ha_innodb.cc @@ -2927,7 +2927,7 @@ innobase_commit( /* At this point commit order is fixed and transaction is visible to others. So we can wakeup other commits waiting for this one, to allow then to group commit with us. */ - thd_wakeup_subsequent_commits(thd); + thd_wakeup_subsequent_commits(thd, 0); /* We did the first part already in innobase_commit_ordered(), Now finish by doing a write + flush of logs. */ diff --git a/storage/xtradb/handler/ha_innodb.cc b/storage/xtradb/handler/ha_innodb.cc index e80810d3948..2efcb15cba6 100644 --- a/storage/xtradb/handler/ha_innodb.cc +++ b/storage/xtradb/handler/ha_innodb.cc @@ -3588,7 +3588,7 @@ innobase_commit( /* At this point commit order is fixed and transaction is visible to others. So we can wakeup other commits waiting for this one, to allow then to group commit with us. */ - thd_wakeup_subsequent_commits(thd); + thd_wakeup_subsequent_commits(thd, 0); /* We did the first part already in innobase_commit_ordered(), Now finish by doing a write + flush of logs. */ |