summaryrefslogtreecommitdiff
path: root/sql/wsrep_high_priority_service.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/wsrep_high_priority_service.cc')
-rw-r--r--sql/wsrep_high_priority_service.cc76
1 files changed, 44 insertions, 32 deletions
diff --git a/sql/wsrep_high_priority_service.cc b/sql/wsrep_high_priority_service.cc
index ef9a46f1a8e..d73b9cb09ce 100644
--- a/sql/wsrep_high_priority_service.cc
+++ b/sql/wsrep_high_priority_service.cc
@@ -119,6 +119,23 @@ static void wsrep_setup_uk_and_fk_checks(THD* thd)
thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
}
+static int apply_events(THD* thd,
+ Relay_log_info* rli,
+ const wsrep::const_buffer& data,
+ wsrep::mutable_buffer& err)
+{
+ int const ret= wsrep_apply_events(thd, rli, data.data(), data.size());
+ if (ret || wsrep_thd_has_ignored_error(thd))
+ {
+ if (ret)
+ {
+ wsrep_store_error(thd, err);
+ }
+ wsrep_dump_rbr_buf_with_header(thd, data.data(), data.size());
+ }
+ return ret;
+}
+
/****************************************************************************
High priority service
*****************************************************************************/
@@ -254,8 +271,8 @@ int Wsrep_high_priority_service::append_fragment_and_commit(
common utility function to deal with commit.
*/
const bool do_binlog_commit= (opt_log_slave_updates &&
- wsrep_gtid_mode &&
- m_thd->variables.gtid_seq_no);
+ wsrep_gtid_mode &&
+ m_thd->variables.gtid_seq_no);
/*
Write skip event into binlog if gtid_mode is on. This is to
maintain gtid continuity.
@@ -272,8 +289,7 @@ int Wsrep_high_priority_service::append_fragment_and_commit(
}
ret= ret || trans_commit(m_thd);
-
- m_thd->wsrep_cs().after_applying();
+ ret= ret || (m_thd->wsrep_cs().after_applying(), 0);
m_thd->mdl_context.release_transactional_locks();
thd_proc_info(m_thd, "wsrep applier committed");
@@ -342,7 +358,15 @@ int Wsrep_high_priority_service::rollback(const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta)
{
DBUG_ENTER("Wsrep_high_priority_service::rollback");
- m_thd->wsrep_cs().prepare_for_ordering(ws_handle, ws_meta, false);
+ if (ws_meta.ordered())
+ {
+ m_thd->wsrep_cs().prepare_for_ordering(ws_handle, ws_meta, false);
+ }
+ else
+ {
+ assert(ws_meta == wsrep::ws_meta());
+ assert(ws_handle == wsrep::ws_handle());
+ }
int ret= (trans_rollback_stmt(m_thd) || trans_rollback(m_thd));
m_thd->mdl_context.release_transactional_locks();
m_thd->mdl_context.release_explicit_locks();
@@ -351,7 +375,7 @@ int Wsrep_high_priority_service::rollback(const wsrep::ws_handle& ws_handle,
int Wsrep_high_priority_service::apply_toi(const wsrep::ws_meta& ws_meta,
const wsrep::const_buffer& data,
- wsrep::mutable_buffer&)
+ wsrep::mutable_buffer& err)
{
DBUG_ENTER("Wsrep_high_priority_service::apply_toi");
THD* thd= m_thd;
@@ -365,19 +389,15 @@ int Wsrep_high_priority_service::apply_toi(const wsrep::ws_meta& ws_meta,
WSREP_DEBUG("Wsrep_high_priority_service::apply_toi: %lld",
client_state.toi_meta().seqno().get());
- int ret= wsrep_apply_events(thd, m_rli, data.data(), data.size());
- if (ret != 0 || thd->wsrep_has_ignored_error)
- {
- wsrep_dump_rbr_buf_with_header(thd, data.data(), data.size());
- thd->wsrep_has_ignored_error= false;
- /* todo: error voting */
- }
+ int ret= apply_events(thd, m_rli, data, err);
+ wsrep_thd_set_ignored_error(thd, false);
trans_commit(thd);
thd->close_temporary_tables();
thd->lex->sql_command= SQLCOM_END;
- wsrep_set_SE_checkpoint(client_state.toi_meta().gtid());
+ wsrep_gtid_server.signal_waiters(thd->wsrep_current_gtid_seqno, false);
+ wsrep_set_SE_checkpoint(client_state.toi_meta().gtid(), wsrep_gtid_server.gtid());
must_exit_= check_exit_status();
@@ -429,13 +449,18 @@ int Wsrep_high_priority_service::log_dummy_write_set(const wsrep::ws_handle& ws_
cs.before_rollback();
cs.after_rollback();
}
- wsrep_set_SE_checkpoint(ws_meta.gtid());
+ wsrep_set_SE_checkpoint(ws_meta.gtid(), wsrep_gtid_server.gtid());
ret= ret || cs.provider().commit_order_leave(ws_handle, ws_meta, err);
cs.after_applying();
}
DBUG_RETURN(ret);
}
+void Wsrep_high_priority_service::adopt_apply_error(wsrep::mutable_buffer& err)
+{
+ m_thd->wsrep_cs().adopt_apply_error(err);
+}
+
void Wsrep_high_priority_service::debug_crash(const char* crash_point)
{
DBUG_ASSERT(m_thd == current_thd);
@@ -467,7 +492,7 @@ Wsrep_applier_service::~Wsrep_applier_service()
int Wsrep_applier_service::apply_write_set(const wsrep::ws_meta& ws_meta,
const wsrep::const_buffer& data,
- wsrep::mutable_buffer&)
+ wsrep::mutable_buffer& err)
{
DBUG_ENTER("Wsrep_applier_service::apply_write_set");
THD* thd= m_thd;
@@ -493,13 +518,7 @@ int Wsrep_applier_service::apply_write_set(const wsrep::ws_meta& ws_meta,
};);
wsrep_setup_uk_and_fk_checks(thd);
-
- int ret= wsrep_apply_events(thd, m_rli, data.data(), data.size());
-
- if (ret || thd->wsrep_has_ignored_error)
- {
- wsrep_dump_rbr_buf_with_header(thd, data.data(), data.size());
- }
+ int ret= apply_events(thd, m_rli, data, err);
thd->close_temporary_tables();
if (!ret && !(ws_meta.flags() & wsrep::provider::flag::commit))
@@ -631,7 +650,7 @@ Wsrep_replayer_service::~Wsrep_replayer_service()
int Wsrep_replayer_service::apply_write_set(const wsrep::ws_meta& ws_meta,
const wsrep::const_buffer& data,
- wsrep::mutable_buffer&)
+ wsrep::mutable_buffer& err)
{
DBUG_ENTER("Wsrep_replayer_service::apply_write_set");
THD* thd= m_thd;
@@ -650,14 +669,7 @@ int Wsrep_replayer_service::apply_write_set(const wsrep::ws_meta& ws_meta,
ws_meta,
thd->wsrep_sr().fragments());
}
-
- ret= ret || wsrep_apply_events(thd, m_rli, data.data(), data.size());
-
- if (ret || thd->wsrep_has_ignored_error)
- {
- wsrep_dump_rbr_buf_with_header(thd, data.data(), data.size());
- }
-
+ ret= ret || apply_events(thd, m_rli, data, err);
thd->close_temporary_tables();
if (!ret && !(ws_meta.flags() & wsrep::provider::flag::commit))
{