diff options
Diffstat (limited to 'sql/wsrep_high_priority_service.cc')
-rw-r--r-- | sql/wsrep_high_priority_service.cc | 76 |
1 files changed, 44 insertions, 32 deletions
diff --git a/sql/wsrep_high_priority_service.cc b/sql/wsrep_high_priority_service.cc index ef9a46f1a8e..d73b9cb09ce 100644 --- a/sql/wsrep_high_priority_service.cc +++ b/sql/wsrep_high_priority_service.cc @@ -119,6 +119,23 @@ static void wsrep_setup_uk_and_fk_checks(THD* thd) thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS; } +static int apply_events(THD* thd, + Relay_log_info* rli, + const wsrep::const_buffer& data, + wsrep::mutable_buffer& err) +{ + int const ret= wsrep_apply_events(thd, rli, data.data(), data.size()); + if (ret || wsrep_thd_has_ignored_error(thd)) + { + if (ret) + { + wsrep_store_error(thd, err); + } + wsrep_dump_rbr_buf_with_header(thd, data.data(), data.size()); + } + return ret; +} + /**************************************************************************** High priority service *****************************************************************************/ @@ -254,8 +271,8 @@ int Wsrep_high_priority_service::append_fragment_and_commit( common utility function to deal with commit. */ const bool do_binlog_commit= (opt_log_slave_updates && - wsrep_gtid_mode && - m_thd->variables.gtid_seq_no); + wsrep_gtid_mode && + m_thd->variables.gtid_seq_no); /* Write skip event into binlog if gtid_mode is on. This is to maintain gtid continuity. @@ -272,8 +289,7 @@ int Wsrep_high_priority_service::append_fragment_and_commit( } ret= ret || trans_commit(m_thd); - - m_thd->wsrep_cs().after_applying(); + ret= ret || (m_thd->wsrep_cs().after_applying(), 0); m_thd->mdl_context.release_transactional_locks(); thd_proc_info(m_thd, "wsrep applier committed"); @@ -342,7 +358,15 @@ int Wsrep_high_priority_service::rollback(const wsrep::ws_handle& ws_handle, const wsrep::ws_meta& ws_meta) { DBUG_ENTER("Wsrep_high_priority_service::rollback"); - m_thd->wsrep_cs().prepare_for_ordering(ws_handle, ws_meta, false); + if (ws_meta.ordered()) + { + m_thd->wsrep_cs().prepare_for_ordering(ws_handle, ws_meta, false); + } + else + { + assert(ws_meta == wsrep::ws_meta()); + assert(ws_handle == wsrep::ws_handle()); + } int ret= (trans_rollback_stmt(m_thd) || trans_rollback(m_thd)); m_thd->mdl_context.release_transactional_locks(); m_thd->mdl_context.release_explicit_locks(); @@ -351,7 +375,7 @@ int Wsrep_high_priority_service::rollback(const wsrep::ws_handle& ws_handle, int Wsrep_high_priority_service::apply_toi(const wsrep::ws_meta& ws_meta, const wsrep::const_buffer& data, - wsrep::mutable_buffer&) + wsrep::mutable_buffer& err) { DBUG_ENTER("Wsrep_high_priority_service::apply_toi"); THD* thd= m_thd; @@ -365,19 +389,15 @@ int Wsrep_high_priority_service::apply_toi(const wsrep::ws_meta& ws_meta, WSREP_DEBUG("Wsrep_high_priority_service::apply_toi: %lld", client_state.toi_meta().seqno().get()); - int ret= wsrep_apply_events(thd, m_rli, data.data(), data.size()); - if (ret != 0 || thd->wsrep_has_ignored_error) - { - wsrep_dump_rbr_buf_with_header(thd, data.data(), data.size()); - thd->wsrep_has_ignored_error= false; - /* todo: error voting */ - } + int ret= apply_events(thd, m_rli, data, err); + wsrep_thd_set_ignored_error(thd, false); trans_commit(thd); thd->close_temporary_tables(); thd->lex->sql_command= SQLCOM_END; - wsrep_set_SE_checkpoint(client_state.toi_meta().gtid()); + wsrep_gtid_server.signal_waiters(thd->wsrep_current_gtid_seqno, false); + wsrep_set_SE_checkpoint(client_state.toi_meta().gtid(), wsrep_gtid_server.gtid()); must_exit_= check_exit_status(); @@ -429,13 +449,18 @@ int Wsrep_high_priority_service::log_dummy_write_set(const wsrep::ws_handle& ws_ cs.before_rollback(); cs.after_rollback(); } - wsrep_set_SE_checkpoint(ws_meta.gtid()); + wsrep_set_SE_checkpoint(ws_meta.gtid(), wsrep_gtid_server.gtid()); ret= ret || cs.provider().commit_order_leave(ws_handle, ws_meta, err); cs.after_applying(); } DBUG_RETURN(ret); } +void Wsrep_high_priority_service::adopt_apply_error(wsrep::mutable_buffer& err) +{ + m_thd->wsrep_cs().adopt_apply_error(err); +} + void Wsrep_high_priority_service::debug_crash(const char* crash_point) { DBUG_ASSERT(m_thd == current_thd); @@ -467,7 +492,7 @@ Wsrep_applier_service::~Wsrep_applier_service() int Wsrep_applier_service::apply_write_set(const wsrep::ws_meta& ws_meta, const wsrep::const_buffer& data, - wsrep::mutable_buffer&) + wsrep::mutable_buffer& err) { DBUG_ENTER("Wsrep_applier_service::apply_write_set"); THD* thd= m_thd; @@ -493,13 +518,7 @@ int Wsrep_applier_service::apply_write_set(const wsrep::ws_meta& ws_meta, };); wsrep_setup_uk_and_fk_checks(thd); - - int ret= wsrep_apply_events(thd, m_rli, data.data(), data.size()); - - if (ret || thd->wsrep_has_ignored_error) - { - wsrep_dump_rbr_buf_with_header(thd, data.data(), data.size()); - } + int ret= apply_events(thd, m_rli, data, err); thd->close_temporary_tables(); if (!ret && !(ws_meta.flags() & wsrep::provider::flag::commit)) @@ -631,7 +650,7 @@ Wsrep_replayer_service::~Wsrep_replayer_service() int Wsrep_replayer_service::apply_write_set(const wsrep::ws_meta& ws_meta, const wsrep::const_buffer& data, - wsrep::mutable_buffer&) + wsrep::mutable_buffer& err) { DBUG_ENTER("Wsrep_replayer_service::apply_write_set"); THD* thd= m_thd; @@ -650,14 +669,7 @@ int Wsrep_replayer_service::apply_write_set(const wsrep::ws_meta& ws_meta, ws_meta, thd->wsrep_sr().fragments()); } - - ret= ret || wsrep_apply_events(thd, m_rli, data.data(), data.size()); - - if (ret || thd->wsrep_has_ignored_error) - { - wsrep_dump_rbr_buf_with_header(thd, data.data(), data.size()); - } - + ret= ret || apply_events(thd, m_rli, data, err); thd->close_temporary_tables(); if (!ret && !(ws_meta.flags() & wsrep::provider::flag::commit)) { |