summaryrefslogtreecommitdiff
path: root/sql/slave.cc
diff options
context:
space:
mode:
authorBrave Galera Crew <devel@codership.com>2019-01-23 15:30:00 +0400
committerSergey Vojtovich <svoj@mariadb.org>2019-01-23 15:30:00 +0400
commit36a2a185fe18d31a644da46cfabd9757a379280c (patch)
tree00ca186ce2cfdc3ab7e4979336a384e2b51c5aa9 /sql/slave.cc
parent382115b99297ceaa4c3067f79efb5c2515013be5 (diff)
downloadmariadb-git-36a2a185fe18d31a644da46cfabd9757a379280c.tar.gz
Galera4
Diffstat (limited to 'sql/slave.cc')
-rw-r--r--sql/slave.cc115
1 files changed, 83 insertions, 32 deletions
diff --git a/sql/slave.cc b/sql/slave.cc
index 5fbe0c77661..f31021bc71e 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -53,6 +53,9 @@
// Create_file_log_event,
// Format_description_log_event
#include "wsrep_mysqld.h"
+#ifdef WITH_WSREP
+#include "wsrep_trans_observer.h"
+#endif
#ifdef HAVE_REPLICATION
@@ -1201,6 +1204,11 @@ terminate_slave_thread(THD *thd,
int error __attribute__((unused));
DBUG_PRINT("loop", ("killing slave thread"));
+#ifdef WITH_WSREP
+ /* awake_no_mutex() requires LOCK_thd_data to be locked if wsrep
+ is enabled */
+ if (WSREP(thd)) mysql_mutex_lock(&thd->LOCK_thd_data);
+#endif /* WITH_WSREP */
mysql_mutex_lock(&thd->LOCK_thd_kill);
#ifndef DONT_USE_THR_ALARM
/*
@@ -1214,6 +1222,9 @@ terminate_slave_thread(THD *thd,
thd->awake_no_mutex(NOT_KILLED);
mysql_mutex_unlock(&thd->LOCK_thd_kill);
+#ifdef WITH_WSREP
+ if (WSREP(thd)) mysql_mutex_unlock(&thd->LOCK_thd_data);
+#endif /* WITH_WSREP */
/*
There is a small chance that slave thread might miss the first
@@ -3943,14 +3954,20 @@ apply_event_and_update_pos_apply(Log_event* ev, THD* thd, rpl_group_info *rgi,
exec_res= ev->apply_event(rgi);
#ifdef WITH_WSREP
- if (exec_res && thd->wsrep_conflict_state != NO_CONFLICT)
+ if (WSREP_ON)
+ {
+ mysql_mutex_lock(&thd->LOCK_thd_data);
+ if (exec_res &&
+ thd->wsrep_trx().state() != wsrep::transaction::s_executing)
{
- WSREP_DEBUG("SQL apply failed, res %d conflict state: %d",
- exec_res, thd->wsrep_conflict_state);
+ WSREP_DEBUG("SQL apply failed, res %d conflict state: %s",
+ exec_res, wsrep_thd_transaction_state_str(thd));
rli->abort_slave= 1;
rli->report(ERROR_LEVEL, ER_UNKNOWN_COM_ERROR, rgi->gtid_info(),
"Node has dropped from cluster");
}
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
+ }
#endif
#ifndef DBUG_OFF
@@ -4243,6 +4260,13 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
}
if (ev)
{
+#ifdef WITH_WSREP
+ if (wsrep_before_statement(thd))
+ {
+ WSREP_INFO("Wsrep before statement error");
+ DBUG_RETURN(1);
+ }
+#endif /* WITH_WSREP */
int exec_res;
Log_event_type typ= ev->get_type_code();
@@ -4274,9 +4298,9 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
rli->until_condition == Relay_log_info::UNTIL_RELAY_POS) &&
(ev->server_id != global_system_variables.server_id ||
rli->replicate_same_server_id) &&
- rli->is_until_satisfied((rli->get_flag(Relay_log_info::IN_TRANSACTION) || !ev->log_pos)
- ? rli->group_master_log_pos
- : ev->log_pos - ev->data_written))
+ rli->is_until_satisfied((rli->get_flag(Relay_log_info::IN_TRANSACTION) || !ev->log_pos)
+ ? rli->group_master_log_pos
+ : ev->log_pos - ev->data_written))
{
sql_print_information("Slave SQL thread stopped because it reached its"
" UNTIL position %llu", rli->until_pos());
@@ -4287,6 +4311,9 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
rli->abort_slave= 1;
rli->stop_for_until= true;
mysql_mutex_unlock(&rli->data_lock);
+#ifdef WITH_WSREP
+ wsrep_after_statement(thd);
+#endif /* WITH_WSREP */
delete ev;
DBUG_RETURN(1);
}
@@ -4324,7 +4351,12 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
if (res == 0)
rli->event_relay_log_pos= rli->future_event_relay_log_pos;
if (res >= 0)
+ {
+#ifdef WITH_WSREP
+ wsrep_after_statement(thd);
+#endif /* WITH_WSREP */
DBUG_RETURN(res);
+ }
/*
Else we proceed to execute the event non-parallel.
This is the case for pre-10.0 events without GTID, and for handling
@@ -4359,6 +4391,9 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
"aborted because of out-of-memory error");
mysql_mutex_unlock(&rli->data_lock);
delete ev;
+#ifdef WITH_WSREP
+ wsrep_after_statement(thd);
+#endif /* WITH_WSREP */
DBUG_RETURN(1);
}
@@ -4373,6 +4408,9 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
"thread aborted because of out-of-memory error");
mysql_mutex_unlock(&rli->data_lock);
delete ev;
+#ifdef WITH_WSREP
+ wsrep_after_statement(thd);
+#endif /* WITH_WSREP */
DBUG_RETURN(1);
}
/*
@@ -4401,13 +4439,17 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
retry.
*/
if (unlikely(exec_res == 2))
+ {
+#ifdef WITH_WSREP
+ wsrep_after_statement(thd);
+#endif /* WITH_WSREP */
DBUG_RETURN(1);
-
+ }
#ifdef WITH_WSREP
mysql_mutex_lock(&thd->LOCK_thd_data);
- if (thd->wsrep_conflict_state == NO_CONFLICT)
- {
- mysql_mutex_unlock(&thd->LOCK_thd_data);
+ enum wsrep::client_error wsrep_error= thd->wsrep_cs().current_error();
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
+ if (wsrep_error == wsrep::e_success)
#endif /* WITH_WSREP */
if (slave_trans_retries)
{
@@ -4420,8 +4462,8 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
We were in a transaction which has been rolled back because of a
temporary error;
let's seek back to BEGIN log event and retry it all again.
- Note, if lock wait timeout (innodb_lock_wait_timeout exceeded)
- there is no rollback since 5.0.13 (ref: manual).
+ Note, if lock wait timeout (innodb_lock_wait_timeout exceeded)
+ there is no rollback since 5.0.13 (ref: manual).
We have to not only seek but also
a) init_master_info(), to seek back to hot relay log's start
@@ -4482,13 +4524,11 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
serial_rgi->trans_retries));
}
}
-#ifdef WITH_WSREP
- }
- else
- mysql_mutex_unlock(&thd->LOCK_thd_data);
-#endif /* WITH_WSREP */
thread_safe_increment64(&rli->executed_entries);
+#ifdef WITH_WSREP
+ wsrep_after_statement(thd);
+#endif /* WITH_WSREP */
DBUG_RETURN(exec_res);
}
mysql_mutex_unlock(&rli->data_lock);
@@ -5415,12 +5455,6 @@ pthread_handler_t handle_slave_sql(void *arg)
}
#endif
-#ifdef WITH_WSREP
- thd->wsrep_exec_mode= LOCAL_STATE;
- /* synchronize with wsrep replication */
- if (WSREP_ON)
- wsrep_ready_wait();
-#endif
DBUG_PRINT("master_info",("log_file_name: %s position: %llu",
rli->group_master_log_name,
rli->group_master_log_pos));
@@ -5517,7 +5551,14 @@ pthread_handler_t handle_slave_sql(void *arg)
goto err;
}
mysql_mutex_unlock(&rli->data_lock);
-
+#ifdef WITH_WSREP
+ wsrep_open(thd);
+ if (wsrep_before_command(thd))
+ {
+ WSREP_WARN("Slave SQL wsrep_before_command() failed");
+ goto err;
+ }
+#endif /* WITH_WSREP */
/* Read queries from the IO/THREAD until this thread is killed */
thd->set_command(COM_SLAVE_SQL);
@@ -5554,10 +5595,16 @@ pthread_handler_t handle_slave_sql(void *arg)
if (exec_relay_log_event(thd, rli, serial_rgi))
{
#ifdef WITH_WSREP
- if (thd->wsrep_conflict_state != NO_CONFLICT)
+ if (WSREP_ON)
{
- wsrep_node_dropped= TRUE;
- rli->abort_slave= TRUE;
+ mysql_mutex_lock(&thd->LOCK_thd_data);
+
+ if (thd->wsrep_cs().current_error())
+ {
+ wsrep_node_dropped = TRUE;
+ rli->abort_slave = TRUE;
+ }
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
}
#endif /* WITH_WSREP */
@@ -5590,6 +5637,10 @@ pthread_handler_t handle_slave_sql(void *arg)
"log '%s' at position %llu%s", RPL_LOG_NAME,
rli->group_master_log_pos, tmp.c_ptr_safe());
}
+#ifdef WITH_WSREP
+ wsrep_after_command_before_result(thd);
+ wsrep_after_command_after_result(thd);
+#endif /* WITH_WSREP */
err_before_start:
@@ -5708,17 +5759,17 @@ err_during_init:
"SQL slave will continue");
wsrep_node_dropped= FALSE;
mysql_mutex_unlock(&rli->run_lock);
- WSREP_DEBUG("wsrep_conflict_state now: %d", thd->wsrep_conflict_state);
- WSREP_INFO("slave restart: %d", thd->wsrep_conflict_state);
- thd->wsrep_conflict_state= NO_CONFLICT;
goto wsrep_restart_point;
- } else {
+ }
+ else
+ {
WSREP_INFO("Slave error due to node going non-primary");
WSREP_INFO("wsrep_restart_slave was set and therefore slave will be "
- "automatically restarted when node joins back to cluster.");
+ "automatically restarted when node joins back to cluster");
wsrep_restart_slave_activated= TRUE;
}
}
+ wsrep_close(thd);
#endif /* WITH_WSREP */
/*