summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--mysql-test/suite/galera/r/galera_performance_schema.result1
-rw-r--r--sql/service_wsrep.cc16
-rw-r--r--sql/sql_connect.cc18
-rw-r--r--sql/threadpool_common.cc9
-rw-r--r--sql/wsrep_client_service.cc10
-rw-r--r--sql/wsrep_high_priority_service.cc26
-rw-r--r--sql/wsrep_mysqld.cc7
-rw-r--r--sql/wsrep_schema.cc22
-rw-r--r--sql/wsrep_server_service.cc51
-rw-r--r--sql/wsrep_server_service.h9
-rw-r--r--sql/wsrep_sst.cc7
-rw-r--r--sql/wsrep_storage_service.cc12
-rw-r--r--sql/wsrep_thd.cc346
-rw-r--r--sql/wsrep_thd.h84
-rw-r--r--sql/wsrep_trans_observer.h11
-rw-r--r--sql/wsrep_utils.cc4
m---------wsrep-lib0
17 files changed, 416 insertions, 217 deletions
diff --git a/mysql-test/suite/galera/r/galera_performance_schema.result b/mysql-test/suite/galera/r/galera_performance_schema.result
index 86d773bb1ef..ea0b6ad9ef4 100644
--- a/mysql-test/suite/galera/r/galera_performance_schema.result
+++ b/mysql-test/suite/galera/r/galera_performance_schema.result
@@ -7,7 +7,6 @@ WHERE name LIKE 'thread/sql/wsrep%'
ORDER BY name;
name thread/sql/wsrep_applier_thread
name thread/sql/wsrep_rollbacker_thread
-name thread/sql/wsrep_rollbacker_thread
use test;
create table t1 (a int not null primary key) engine=innodb;
insert into t1 values (1),(2);
diff --git a/sql/service_wsrep.cc b/sql/service_wsrep.cc
index 8583897e064..43944366665 100644
--- a/sql/service_wsrep.cc
+++ b/sql/service_wsrep.cc
@@ -146,11 +146,10 @@ extern "C" void wsrep_handle_SR_rollback(THD *bf_thd,
victim_thd->wsrep_trx_id(),
victim_thd->wsrep_sr().fragments_certified(),
wsrep_thd_transaction_state_str(victim_thd));
- if (bf_thd && bf_thd != victim_thd)
- {
- victim_thd->store_globals();
- }
- else
+
+ /* Note: do not store/reset globals before wsrep_bf_abort() call
+ to avoid losing BF thd context. */
+ if (!(bf_thd && bf_thd != victim_thd))
{
DEBUG_SYNC(victim_thd, "wsrep_before_SR_rollback");
}
@@ -162,21 +161,24 @@ extern "C" void wsrep_handle_SR_rollback(THD *bf_thd,
{
wsrep_thd_self_abort(victim_thd);
}
- if (bf_thd && bf_thd != victim_thd)
+ if (bf_thd)
{
- bf_thd->store_globals();
+ wsrep_store_threadvars(bf_thd);
}
}
extern "C" my_bool wsrep_thd_bf_abort(const THD *bf_thd, THD *victim_thd,
my_bool signal)
{
+ /* Note: do not store/reset globals before wsrep_bf_abort() call
+ to avoid losing BF thd context. */
if (WSREP(victim_thd) && !victim_thd->wsrep_trx().active())
{
WSREP_DEBUG("BF abort for non active transaction");
wsrep_start_transaction(victim_thd, victim_thd->wsrep_next_trx_id());
}
my_bool ret= wsrep_bf_abort(bf_thd, victim_thd);
+ wsrep_store_threadvars((THD*)bf_thd);
/*
Send awake signal if victim was BF aborted or does not
have wsrep on. Note that this should never interrupt RSU
diff --git a/sql/sql_connect.cc b/sql/sql_connect.cc
index b8fff8c864a..6bf43d3df5e 100644
--- a/sql/sql_connect.cc
+++ b/sql/sql_connect.cc
@@ -1188,6 +1188,16 @@ void end_connection(THD *thd)
{
NET *net= &thd->net;
+#ifdef WITH_WSREP
+ if (thd->wsrep_cs().state() == wsrep::client_state::s_exec)
+ {
+ /* Error happened after the thread acquired ownership to wsrep
+ client state, but before command was processed. Clean up the
+ state before wsrep_close(). */
+ wsrep_after_command_ignore_result(thd);
+ }
+ wsrep_close(thd);
+#endif /* WITH_WSREP */
if (thd->user_connect)
{
/*
@@ -1321,6 +1331,7 @@ bool thd_prepare_connection(THD *thd)
prepare_new_connection_state(thd);
#ifdef WITH_WSREP
thd->wsrep_client_thread= true;
+ wsrep_open(thd);
#endif /* WITH_WSREP */
return FALSE;
}
@@ -1393,9 +1404,6 @@ void do_handle_one_connection(CONNECT *connect)
create_user= FALSE;
goto end_thread;
}
-#ifdef WITH_WSREP
- wsrep_open(thd);
-#endif /* WITH_WSREP */
while (thd_is_connection_alive(thd))
{
@@ -1406,10 +1414,6 @@ void do_handle_one_connection(CONNECT *connect)
}
end_connection(thd);
-#ifdef WITH_WSREP
- wsrep_close(thd);
-#endif /* WITH_WSREP */
-
end_thread:
close_connection(thd);
diff --git a/sql/threadpool_common.cc b/sql/threadpool_common.cc
index cfb831e2f55..6577472e20b 100644
--- a/sql/threadpool_common.cc
+++ b/sql/threadpool_common.cc
@@ -23,7 +23,9 @@
#include <sql_audit.h>
#include <debug_sync.h>
#include <threadpool.h>
-
+#ifdef WITH_WSREP
+#include "wsrep_trans_observer.h"
+#endif /* WITH_WSREP */
/* Threadpool parameters */
@@ -137,6 +139,11 @@ static inline void set_thd_idle(THD *thd)
*/
static void thread_attach(THD* thd)
{
+#ifdef WITH_WSREP
+ /* Wait until possible background rollback has finished before
+ attaching the thd. */
+ wsrep_wait_rollback_complete_and_acquire_ownership(thd);
+#endif /* WITH_WSREP */
pthread_setspecific(THR_KEY_mysys,thd->mysys_var);
thd->thread_stack=(char*)&thd;
thd->store_globals();
diff --git a/sql/wsrep_client_service.cc b/sql/wsrep_client_service.cc
index b182691c593..5f73f9f714f 100644
--- a/sql/wsrep_client_service.cc
+++ b/sql/wsrep_client_service.cc
@@ -30,9 +30,9 @@
#include "slave.h" /* opt_log_slave_updates */
#include "transaction.h" /* trans_commit()... */
#include "log.h" /* stmt_has_updated_trans_table() */
-//#include "debug_sync.h"
#include "mysql/service_debug_sync.h"
#include "mysql/psi/mysql_thread.h" /* mysql_mutex_assert_owner() */
+
namespace
{
@@ -57,16 +57,12 @@ Wsrep_client_service::Wsrep_client_service(THD* thd,
void Wsrep_client_service::store_globals()
{
- DBUG_ENTER("Wsrep_client_service::store_globals");
- m_thd->store_globals();
- DBUG_VOID_RETURN;
+ wsrep_store_threadvars(m_thd);
}
void Wsrep_client_service::reset_globals()
{
- DBUG_ENTER("Wsrep_client_service::reset_globals");
- m_thd->reset_globals();
- DBUG_VOID_RETURN;
+ wsrep_reset_threadvars(m_thd);
}
bool Wsrep_client_service::interrupted(
diff --git a/sql/wsrep_high_priority_service.cc b/sql/wsrep_high_priority_service.cc
index 68cf0d1877b..73cdbd1c217 100644
--- a/sql/wsrep_high_priority_service.cc
+++ b/sql/wsrep_high_priority_service.cc
@@ -379,20 +379,13 @@ int Wsrep_high_priority_service::apply_toi(const wsrep::ws_meta& ws_meta,
void Wsrep_high_priority_service::store_globals()
{
- DBUG_ENTER("Wsrep_high_priority_service::store_globals");
- /* In addition to calling THD::store_globals(), call
- wsrep::client_state::store_globals() to gain ownership of
- the client state */
- m_thd->store_globals();
- m_thd->wsrep_cs().store_globals();
- DBUG_VOID_RETURN;
+ wsrep_store_threadvars(m_thd);
+ m_thd->wsrep_cs().acquire_ownership();
}
void Wsrep_high_priority_service::reset_globals()
{
- DBUG_ENTER("Wsrep_high_priority_service::reset_globals");
- m_thd->reset_globals();
- DBUG_VOID_RETURN;
+ wsrep_reset_threadvars(m_thd);
}
void Wsrep_high_priority_service::switch_execution_context(wsrep::high_priority_service& orig_high_priority_service)
@@ -572,11 +565,14 @@ Wsrep_replayer_service::Wsrep_replayer_service(THD* replayer_thd, THD* orig_thd)
thd_proc_info(orig_thd, "wsrep replaying trx");
/*
- Swith execution context to replayer_thd and prepare it for
+ Switch execution context to replayer_thd and prepare it for
replay execution.
*/
- orig_thd->reset_globals();
- replayer_thd->store_globals();
+ /* Copy thd vars from orig_thd before reset, otherwise reset
+ for orig thd clears thread local storage before copy. */
+ wsrep_assign_from_threadvars(replayer_thd);
+ wsrep_reset_threadvars(orig_thd);
+ wsrep_store_threadvars(replayer_thd);
wsrep_open(replayer_thd);
wsrep_before_command(replayer_thd);
replayer_thd->wsrep_cs().clone_transaction_for_replay(orig_thd->wsrep_trx());
@@ -593,8 +589,8 @@ Wsrep_replayer_service::~Wsrep_replayer_service()
wsrep_after_apply(replayer_thd);
wsrep_after_command_ignore_result(replayer_thd);
wsrep_close(replayer_thd);
- replayer_thd->reset_globals();
- orig_thd->store_globals();
+ wsrep_reset_threadvars(replayer_thd);
+ wsrep_store_threadvars(orig_thd);
DBUG_ASSERT(!orig_thd->get_stmt_da()->is_sent());
DBUG_ASSERT(!orig_thd->get_stmt_da()->is_set());
diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc
index 0a9adc5fa2c..fde29ae434d 100644
--- a/sql/wsrep_mysqld.cc
+++ b/sql/wsrep_mysqld.cc
@@ -2243,6 +2243,7 @@ static void wsrep_close_thread(THD *thd)
{
thd->set_killed(KILL_CONNECTION);
MYSQL_CALLBACK(thread_scheduler, post_kill_notification, (thd));
+ mysql_mutex_lock(&thd->LOCK_thd_kill);
if (thd->mysys_var)
{
thd->mysys_var->abort=1;
@@ -2255,6 +2256,7 @@ static void wsrep_close_thread(THD *thd)
}
mysql_mutex_unlock(&thd->mysys_var->mutex);
}
+ mysql_mutex_unlock(&thd->LOCK_thd_kill);
}
static my_bool have_committing_connections(THD *thd, void *)
@@ -2658,7 +2660,8 @@ void* start_wsrep_THD(void *arg)
/* now that we've called my_thread_init(), it is safe to call DBUG_* */
thd->thread_stack= (char*) &thd;
- if (thd->store_globals())
+ wsrep_assign_from_threadvars(thd);
+ if (wsrep_store_threadvars(thd))
{
close_connection(thd, ER_OUT_OF_RESOURCES);
statistic_increment(aborted_connects,&LOCK_status);
@@ -2703,7 +2706,7 @@ void* start_wsrep_THD(void *arg)
/* Wsrep may reset globals during thread context switches, store globals
before cleanup. */
- thd->store_globals();
+ wsrep_store_threadvars(thd);
close_connection(thd, 0);
diff --git a/sql/wsrep_schema.cc b/sql/wsrep_schema.cc
index 064b6cf9f46..066ea124fb7 100644
--- a/sql/wsrep_schema.cc
+++ b/sql/wsrep_schema.cc
@@ -29,6 +29,7 @@
#include "wsrep_binlog.h"
#include "wsrep_high_priority_service.h"
#include "wsrep_storage_service.h"
+#include "wsrep_thd.h"
#include <string>
#include <sstream>
@@ -145,13 +146,13 @@ public:
: m_orig_thd(orig_thd)
, m_cur_thd(cur_thd)
{
- m_orig_thd->reset_globals();
- m_cur_thd->store_globals();
+ wsrep_reset_threadvars(m_orig_thd);
+ wsrep_store_threadvars(m_cur_thd);
}
~thd_context_switch()
{
- m_cur_thd->reset_globals();
- m_orig_thd->store_globals();
+ wsrep_reset_threadvars(m_cur_thd);
+ wsrep_store_threadvars(m_orig_thd);
}
private:
THD *m_orig_thd;
@@ -595,7 +596,8 @@ static void wsrep_init_thd_for_schema(THD *thd)
thd->variables.option_bits |= OPTION_LOG_OFF;
/* Read committed isolation to avoid gap locking */
thd->variables.tx_isolation= ISO_READ_COMMITTED;
- thd->store_globals();
+ wsrep_assign_from_threadvars(thd);
+ wsrep_store_threadvars(thd);
}
int Wsrep_schema::init()
@@ -1123,6 +1125,7 @@ int Wsrep_schema::replay_transaction(THD* orig_thd,
THD thd(next_thread_id(), true);
thd.thread_stack= (orig_thd ? orig_thd->thread_stack :
(char*) &thd);
+ wsrep_assign_from_threadvars(&thd);
Wsrep_schema_impl::wsrep_off wsrep_off(&thd);
Wsrep_schema_impl::binlog_off binlog_off(&thd);
@@ -1228,6 +1231,7 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd)
THD storage_thd(next_thread_id(), true);
storage_thd.thread_stack= (orig_thd ? orig_thd->thread_stack :
(char*) &storage_thd);
+ wsrep_assign_from_threadvars(&storage_thd);
TABLE* frag_table= 0;
TABLE* cluster_table= 0;
Wsrep_storage_service storage_service(&storage_thd);
@@ -1333,12 +1337,7 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd)
transaction_id)))
{
DBUG_ASSERT(wsrep::starts_transaction(flags));
- THD* thd= new THD(next_thread_id(), true);
- thd->thread_stack= (char*)&storage_thd;
-
- thd->real_id= pthread_self();
-
- applier= new Wsrep_applier_service(thd);
+ applier = wsrep_create_streaming_applier(&storage_thd, "recovery");
server_state.start_streaming_applier(server_id, transaction_id,
applier);
applier->start_transaction(wsrep::ws_handle(transaction_id, 0),
@@ -1364,6 +1363,7 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd)
Wsrep_schema_impl::end_scan(frag_table);
Wsrep_schema_impl::finish_stmt(&storage_thd);
trans_commit(&storage_thd);
+ storage_thd.set_mysys_var(0);
out:
DBUG_RETURN(ret);
}
diff --git a/sql/wsrep_server_service.cc b/sql/wsrep_server_service.cc
index 42856862db3..bfb85e3d0ab 100644
--- a/sql/wsrep_server_service.cc
+++ b/sql/wsrep_server_service.cc
@@ -26,6 +26,7 @@
#include "wsrep_mysqld.h"
#include "wsrep_schema.h"
#include "wsrep_utils.h"
+#include "wsrep_thd.h"
#include "log.h" /* sql_print_xxx() */
#include "sql_class.h" /* system variables */
@@ -50,6 +51,10 @@ wsrep::storage_service* Wsrep_server_service::storage_service(
init_service_thd(thd, cs.m_thd->thread_stack);
WSREP_DEBUG("Created storage service with thread id %llu",
thd->thread_id);
+ /* Use variables from the current thd attached to client_service.
+ This is because we need to be able to BF abort storage access
+ operations. */
+ wsrep_assign_from_threadvars(thd);
return new Wsrep_storage_service(thd);
}
@@ -62,6 +67,7 @@ wsrep::storage_service* Wsrep_server_service::storage_service(
init_service_thd(thd, hps.m_thd->thread_stack);
WSREP_DEBUG("Created high priority storage service with thread id %llu",
thd->thread_id);
+ wsrep_assign_from_threadvars(thd);
return new Wsrep_storage_service(thd);
}
@@ -71,21 +77,48 @@ void Wsrep_server_service::release_storage_service(
Wsrep_storage_service* ss=
static_cast<Wsrep_storage_service*>(storage_service);
THD* thd= ss->m_thd;
+ wsrep_reset_threadvars(thd);
delete ss;
delete thd;
}
+Wsrep_applier_service*
+wsrep_create_streaming_applier(THD *orig_thd, const char *ctx)
+{
+ /* Reset variables to allow creating new variables in thread local
+ storage for new THD if needed. Note that reset must be done for
+ current_thd, as orig_thd may not be in effect. This may be the case when
+ streaming transaction is BF aborted and streaming applier
+ is created from BF aborter context. */
+ Wsrep_threadvars saved_threadvars(wsrep_save_threadvars());
+ wsrep_reset_threadvars(saved_threadvars.cur_thd);
+ THD *thd= 0;
+ Wsrep_applier_service *ret= 0;
+ if (!wsrep_create_threadvars() &&
+ (thd= new THD(next_thread_id(), true)))
+ {
+ init_service_thd(thd, orig_thd->thread_stack);
+ wsrep_assign_from_threadvars(thd);
+ WSREP_DEBUG("Created streaming applier service in %s context with "
+ "thread id %llu", ctx, thd->thread_id);
+ if (!(ret= new (std::nothrow) Wsrep_applier_service(thd)))
+ {
+ delete thd;
+ }
+ }
+ /* Restore original thread local storage state before returning. */
+ wsrep_restore_threadvars(saved_threadvars);
+ wsrep_store_threadvars(saved_threadvars.cur_thd);
+ return ret;
+}
+
wsrep::high_priority_service*
Wsrep_server_service::streaming_applier_service(
wsrep::client_service& orig_client_service)
{
Wsrep_client_service& orig_cs=
static_cast<Wsrep_client_service&>(orig_client_service);
- THD* thd= new THD(next_thread_id(), true);
- init_service_thd(thd, orig_cs.m_thd->thread_stack);
- WSREP_DEBUG("Created streaming applier service in local context with "
- "thread id %llu", thd->thread_id);
- return new Wsrep_applier_service(thd);
+ return wsrep_create_streaming_applier(orig_cs.m_thd, "local");
}
wsrep::high_priority_service*
@@ -94,11 +127,7 @@ Wsrep_server_service::streaming_applier_service(
{
Wsrep_high_priority_service&
orig_hps(static_cast<Wsrep_high_priority_service&>(orig_high_priority_service));
- THD* thd= new THD(next_thread_id(), true);
- init_service_thd(thd, orig_hps.m_thd->thread_stack);
- WSREP_DEBUG("Created streaming applier service in high priority "
- "context with thread id %llu", thd->thread_id);
- return new Wsrep_applier_service(thd);
+ return wsrep_create_streaming_applier(orig_hps.m_thd, "high priority");
}
void Wsrep_server_service::release_high_priority_service(wsrep::high_priority_service* high_priority_service)
@@ -107,7 +136,9 @@ void Wsrep_server_service::release_high_priority_service(wsrep::high_priority_se
static_cast<Wsrep_high_priority_service*>(high_priority_service);
THD* thd= hps->m_thd;
delete hps;
+ wsrep_store_threadvars(thd);
delete thd;
+ wsrep_delete_threadvars();
}
void Wsrep_server_service::background_rollback(wsrep::client_state& client_state)
diff --git a/sql/wsrep_server_service.h b/sql/wsrep_server_service.h
index b8f1f009cde..6336fe2c473 100644
--- a/sql/wsrep_server_service.h
+++ b/sql/wsrep_server_service.h
@@ -77,5 +77,14 @@ private:
Wsrep_server_state& m_server_state;
};
+/**
+ Helper method to create new streaming applier.
+
+ @param orig_thd Original thd context to copy operation context from.
+ @param ctx Context string for debug logging.
+ */
+class Wsrep_applier_service;
+Wsrep_applier_service*
+wsrep_create_streaming_applier(THD *orig_thd, const char *ctx);
#endif /* WSREP_SERVER_SERVICE */
diff --git a/sql/wsrep_sst.cc b/sql/wsrep_sst.cc
index e2878211d7e..62b30c0d67d 100644
--- a/sql/wsrep_sst.cc
+++ b/sql/wsrep_sst.cc
@@ -27,6 +27,8 @@
#include "wsrep_priv.h"
#include "wsrep_utils.h"
#include "wsrep_xid.h"
+#include "wsrep_thd.h"
+
#include <cstdio>
#include <cstdlib>
@@ -237,7 +239,7 @@ void wsrep_sst_received (THD* thd,
wsrep thread pool. Restore original thd context before returning.
*/
if (thd) {
- thd->store_globals();
+ wsrep_store_threadvars(thd);
}
else {
my_pthread_setspecific_ptr(THR_THD, NULL);
@@ -509,7 +511,8 @@ err:
thd->system_thread= SYSTEM_THREAD_GENERIC;
thd->real_id= pthread_self();
- thd->store_globals();
+ wsrep_assign_from_threadvars(thd);
+ wsrep_store_threadvars(thd);
/* */
thd->variables.wsrep_on = 0;
diff --git a/sql/wsrep_storage_service.cc b/sql/wsrep_storage_service.cc
index e164114b733..6dfe3eee448 100644
--- a/sql/wsrep_storage_service.cc
+++ b/sql/wsrep_storage_service.cc
@@ -196,18 +196,10 @@ int Wsrep_storage_service::rollback(const wsrep::ws_handle& ws_handle,
void Wsrep_storage_service::store_globals()
{
- DBUG_ENTER("Wsrep_storage_service::store_globals");
- DBUG_PRINT("info", ("Wsrep_storage_service::store_globals(%llu, %p)",
- m_thd->thread_id, m_thd));
- m_thd->store_globals();
- DBUG_VOID_RETURN;
+ wsrep_store_threadvars(m_thd);
}
void Wsrep_storage_service::reset_globals()
{
- DBUG_ENTER("Wsrep_storage_service::reset_globals");
- DBUG_PRINT("info", ("Wsrep_storage_service::reset_globals(%llu, %p)",
- m_thd->thread_id, m_thd));
- m_thd->reset_globals();
- DBUG_VOID_RETURN;
+ wsrep_reset_threadvars(m_thd);
}
diff --git a/sql/wsrep_thd.cc b/sql/wsrep_thd.cc
index addf98561de..50f0376f674 100644
--- a/sql/wsrep_thd.cc
+++ b/sql/wsrep_thd.cc
@@ -31,8 +31,9 @@
#include "rpl_rli.h"
#include "rpl_mi.h"
+extern "C" pthread_key(struct st_my_thread_var*, THR_KEY_mysys);
+
static Wsrep_thd_queue* wsrep_rollback_queue= 0;
-static Wsrep_thd_queue* wsrep_post_rollback_queue= 0;
static Atomic_counter<uint64_t> wsrep_bf_aborts_counter;
@@ -149,6 +150,122 @@ void wsrep_create_appliers(long threads)
}
}
+static void wsrep_rollback_streaming_aborted_by_toi(THD *thd)
+{
+ WSREP_INFO("wsrep_rollback_streaming_aborted_by_toi");
+ /* Set thd->event_scheduler.data temporarily to NULL to avoid
+ callbacks to threadpool wait_begin() during rollback. */
+ auto saved_esd= thd->event_scheduler.data;
+ thd->event_scheduler.data= 0;
+ if (thd->wsrep_cs().mode() == wsrep::client_state::m_high_priority)
+ {
+ DBUG_ASSERT(!saved_esd);
+ DBUG_ASSERT(thd->wsrep_applier_service);
+ thd->wsrep_applier_service->rollback(wsrep::ws_handle(),
+ wsrep::ws_meta());
+ thd->wsrep_applier_service->after_apply();
+ /* Will free THD */
+ Wsrep_server_state::instance().server_service().
+ release_high_priority_service(thd->wsrep_applier_service);
+ }
+ else
+ {
+ mysql_mutex_lock(&thd->LOCK_thd_data);
+ /* prepare THD for rollback processing */
+ thd->reset_for_next_command(true);
+ thd->lex->sql_command= SQLCOM_ROLLBACK;
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
+ /* Perform a client rollback, restore globals and signal
+ the victim only when all the resources have been
+ released */
+ thd->wsrep_cs().client_service().bf_rollback();
+ wsrep_reset_threadvars(thd);
+ /* Assign saved event_scheduler.data back before letting
+ client to continue. */
+ thd->event_scheduler.data= saved_esd;
+ thd->wsrep_cs().sync_rollback_complete();
+ }
+}
+
+static void wsrep_rollback_high_priority(THD *thd)
+{
+ WSREP_INFO("rollbacker aborting SR thd: (%lld %llu)",
+ thd->thread_id, (long long)thd->real_id);
+ DBUG_ASSERT(thd->wsrep_cs().mode() == Wsrep_client_state::m_high_priority);
+ /* Must be streaming and must have been removed from the
+ server state streaming appliers map. */
+ DBUG_ASSERT(thd->wsrep_trx().is_streaming());
+ DBUG_ASSERT(!Wsrep_server_state::instance().find_streaming_applier(
+ thd->wsrep_trx().server_id(),
+ thd->wsrep_trx().id()));
+ DBUG_ASSERT(thd->wsrep_applier_service);
+
+ /* Fragment removal should happen before rollback to make
+ the transaction non-observable in SR table after the rollback
+ completes. For correctness the order does not matter here,
+ but currently it is mandated by checks in some MTR tests. */
+ wsrep::transaction_id transaction_id(thd->wsrep_trx().id());
+ Wsrep_storage_service* storage_service=
+ static_cast<Wsrep_storage_service*>(
+ Wsrep_server_state::instance().server_service().storage_service(
+ *thd->wsrep_applier_service));
+ storage_service->store_globals();
+ storage_service->adopt_transaction(thd->wsrep_trx());
+ storage_service->remove_fragments();
+ storage_service->commit(wsrep::ws_handle(transaction_id, 0),
+ wsrep::ws_meta());
+ Wsrep_server_state::instance().server_service().release_storage_service(storage_service);
+ wsrep_store_threadvars(thd);
+ thd->wsrep_applier_service->rollback(wsrep::ws_handle(),
+ wsrep::ws_meta());
+ thd->wsrep_applier_service->after_apply();
+ /* Will free THD */
+ Wsrep_server_state::instance().server_service()
+ .release_high_priority_service(thd->wsrep_applier_service);
+}
+
+static void wsrep_rollback_local(THD *thd)
+{
+ WSREP_INFO("Wsrep_rollback_local");
+ if (thd->wsrep_trx().is_streaming())
+ {
+ wsrep::transaction_id transaction_id(thd->wsrep_trx().id());
+ Wsrep_storage_service* storage_service=
+ static_cast<Wsrep_storage_service*>(
+ Wsrep_server_state::instance().server_service().
+ storage_service(thd->wsrep_cs().client_service()));
+
+ storage_service->store_globals();
+ storage_service->adopt_transaction(thd->wsrep_trx());
+ storage_service->remove_fragments();
+ storage_service->commit(wsrep::ws_handle(transaction_id, 0),
+ wsrep::ws_meta());
+ Wsrep_server_state::instance().server_service().
+ release_storage_service(storage_service);
+ wsrep_store_threadvars(thd);
+ }
+ /* Set thd->event_scheduler.data temporarily to NULL to avoid
+ callbacks to threadpool wait_begin() during rollback. */
+ auto saved_esd= thd->event_scheduler.data;
+ thd->event_scheduler.data= 0;
+ mysql_mutex_lock(&thd->LOCK_thd_data);
+ /* prepare THD for rollback processing */
+ thd->reset_for_next_command();
+ thd->lex->sql_command= SQLCOM_ROLLBACK;
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
+ /* Perform a client rollback, restore globals and signal
+ the victim only when all the resources have been
+ released */
+ thd->wsrep_cs().client_service().bf_rollback();
+ wsrep_reset_threadvars(thd);
+ /* Assign saved event_scheduler.data back before letting
+ client to continue. */
+ thd->event_scheduler.data= saved_esd;
+ thd->wsrep_cs().sync_rollback_complete();
+ WSREP_DEBUG("rollbacker aborted thd: (%llu %llu)",
+ thd->thread_id, (long long)thd->real_id);
+}
+
static void wsrep_rollback_process(THD *rollbacker,
void *arg __attribute__((unused)))
{
@@ -170,119 +287,36 @@ static void wsrep_rollback_process(THD *rollbacker,
WSREP_DEBUG("rollbacker thd already aborted: %llu state: %d",
(long long)thd->real_id,
tx.state());
-
mysql_mutex_unlock(&thd->LOCK_thd_data);
continue;
}
mysql_mutex_unlock(&thd->LOCK_thd_data);
+ wsrep_reset_threadvars(rollbacker);
+ wsrep_store_threadvars(thd);
+ thd->wsrep_cs().acquire_ownership();
+
thd_proc_info(rollbacker, "wsrep aborter active");
- wsrep::transaction_id transaction_id(thd->wsrep_trx().id());
+ /* Rollback methods below may free thd pointer. Do not try
+ to access it after method returns. */
if (thd->wsrep_trx().is_streaming() &&
thd->wsrep_trx().bf_aborted_in_total_order())
{
- thd->store_globals();
- thd->wsrep_cs().store_globals();
- if (thd->wsrep_cs().mode() == wsrep::client_state::m_high_priority)
- {
- DBUG_ASSERT(thd->wsrep_applier_service);
- thd->wsrep_applier_service->rollback(wsrep::ws_handle(),
- wsrep::ws_meta());
- thd->wsrep_applier_service->after_apply();
- /* Will free THD */
- Wsrep_server_state::instance().server_service().
- release_high_priority_service(thd->wsrep_applier_service);
- }
- else
- {
- mysql_mutex_lock(&thd->LOCK_thd_data);
- /* prepare THD for rollback processing */
- thd->reset_for_next_command(true);
- thd->lex->sql_command= SQLCOM_ROLLBACK;
- mysql_mutex_unlock(&thd->LOCK_thd_data);
- /* Perform a client rollback, restore globals and signal
- the victim only when all the resources have been
- released */
- thd->wsrep_cs().client_service().bf_rollback();
- thd->reset_globals();
- thd->wsrep_cs().sync_rollback_complete();
- }
+ wsrep_rollback_streaming_aborted_by_toi(thd);
}
else if (wsrep_thd_is_applying(thd))
{
- WSREP_DEBUG("rollbacker aborting SR thd: (%lld %llu)",
- thd->thread_id, (long long)thd->real_id);
- DBUG_ASSERT(thd->wsrep_cs().mode() == Wsrep_client_state::m_high_priority);
- /* Must be streaming and must have been removed from the
- server state streaming appliers map. */
- DBUG_ASSERT(thd->wsrep_trx().is_streaming());
- DBUG_ASSERT(!Wsrep_server_state::instance().find_streaming_applier(
- thd->wsrep_trx().server_id(),
- thd->wsrep_trx().id()));
- DBUG_ASSERT(thd->wsrep_applier_service);
-
- /* Fragment removal should happen before rollback to make
- the transaction non-observable in SR table after the rollback
- completes. For correctness the order does not matter here,
- but currently it is mandated by checks in some MTR tests. */
- Wsrep_storage_service* storage_service=
- static_cast<Wsrep_storage_service*>(
- Wsrep_server_state::instance().server_service().storage_service(
- *thd->wsrep_applier_service));
- storage_service->store_globals();
- storage_service->adopt_transaction(thd->wsrep_trx());
- storage_service->remove_fragments();
- storage_service->commit(wsrep::ws_handle(transaction_id, 0),
- wsrep::ws_meta());
- Wsrep_server_state::instance().server_service().release_storage_service(storage_service);
- thd->store_globals();
- thd->wsrep_cs().store_globals();
- thd->wsrep_applier_service->rollback(wsrep::ws_handle(),
- wsrep::ws_meta());
- thd->wsrep_applier_service->after_apply();
- /* Will free THD */
- Wsrep_server_state::instance().server_service()
- .release_high_priority_service(thd->wsrep_applier_service);
-
+ wsrep_rollback_high_priority(thd);
}
else
{
- if (thd->wsrep_trx().is_streaming())
- {
- Wsrep_storage_service* storage_service=
- static_cast<Wsrep_storage_service*>(
- Wsrep_server_state::instance().server_service().
- storage_service(thd->wsrep_cs().client_service()));
-
- storage_service->store_globals();
- storage_service->adopt_transaction(thd->wsrep_trx());
- storage_service->remove_fragments();
- storage_service->commit(wsrep::ws_handle(transaction_id, 0),
- wsrep::ws_meta());
- Wsrep_server_state::instance().server_service().
- release_storage_service(storage_service);
- }
- thd->store_globals();
- thd->wsrep_cs().store_globals();
- mysql_mutex_lock(&thd->LOCK_thd_data);
- /* prepare THD for rollback processing */
- thd->reset_for_next_command();
- thd->lex->sql_command= SQLCOM_ROLLBACK;
- mysql_mutex_unlock(&thd->LOCK_thd_data);
- /* Perform a client rollback, restore globals and signal
- the victim only when all the resources have been
- released */
- thd->wsrep_cs().client_service().bf_rollback();
- thd->reset_globals();
- thd->wsrep_cs().sync_rollback_complete();
- WSREP_DEBUG("rollbacker aborted thd: (%llu %llu)",
- thd->thread_id, (long long)thd->real_id);
+ wsrep_rollback_local(thd);
}
-
+ wsrep_store_threadvars(rollbacker);
thd_proc_info(rollbacker, "wsrep aborter idle");
}
-
+
delete wsrep_rollback_queue;
wsrep_rollback_queue= NULL;
@@ -293,39 +327,6 @@ static void wsrep_rollback_process(THD *rollbacker,
DBUG_VOID_RETURN;
}
-static void wsrep_post_rollback_process(THD *post_rollbacker,
- void *arg __attribute__((unused)))
-{
- DBUG_ENTER("wsrep_post_rollback_process");
- THD* thd= NULL;
-
- WSREP_INFO("Starting post rollbacker thread %llu", post_rollbacker->thread_id);
- DBUG_ASSERT(!wsrep_post_rollback_queue);
- wsrep_post_rollback_queue= new Wsrep_thd_queue(post_rollbacker);
-
- while ((thd= wsrep_post_rollback_queue->pop_front()) != NULL)
- {
- thd->store_globals();
- wsrep::client_state& cs(thd->wsrep_cs());
- mysql_mutex_lock(&thd->LOCK_thd_data);
- DBUG_ASSERT(thd->wsrep_trx().state() == wsrep::transaction::s_aborting);
- WSREP_DEBUG("post rollbacker calling post rollback for thd %llu, conf %s",
- thd->thread_id, wsrep_thd_transaction_state_str(thd));
-
- cs.after_rollback();
- DBUG_ASSERT(thd->wsrep_trx().state() == wsrep::transaction::s_aborted);
- mysql_mutex_unlock(&thd->LOCK_thd_data);
- }
-
- delete wsrep_post_rollback_queue;
- wsrep_post_rollback_queue= NULL;
-
- DBUG_ASSERT(post_rollbacker->killed != NOT_KILLED);
- DBUG_PRINT("wsrep",("wsrep post rollbacker thread exiting"));
- WSREP_INFO("post rollbacker thread exiting %llu", post_rollbacker->thread_id);
- DBUG_VOID_RETURN;
-}
-
void wsrep_create_rollbacker()
{
if (wsrep_cluster_address && wsrep_cluster_address[0] != 0)
@@ -337,14 +338,6 @@ void wsrep_create_rollbacker()
/* create rollbacker */
if (create_wsrep_THD(args))
WSREP_WARN("Can't create thread to manage wsrep rollback");
-
- /* create post_rollbacker */
- args= new Wsrep_thd_args(wsrep_post_rollback_process,
- WSREP_ROLLBACKER_THREAD,
- pthread_self());
-
- if (create_wsrep_THD(args))
- WSREP_WARN("Can't create thread to manage wsrep post rollback");
}
}
@@ -438,3 +431,84 @@ void wsrep_thd_auto_increment_variables(THD* thd,
*offset= thd->variables.auto_increment_offset;
*increment= thd->variables.auto_increment_increment;
}
+
+int wsrep_create_threadvars()
+{
+ int ret= 0;
+ if (thread_handling == SCHEDULER_TYPES_COUNT)
+ {
+ /* Caller should have called wsrep_reset_threadvars() before this
+ method. */
+ DBUG_ASSERT(!pthread_getspecific(THR_KEY_mysys));
+ pthread_setspecific(THR_KEY_mysys, 0);
+ ret= my_thread_init();
+ }
+ return ret;
+}
+
+void wsrep_delete_threadvars()
+{
+ if (thread_handling == SCHEDULER_TYPES_COUNT)
+ {
+ /* The caller should have called wsrep_store_threadvars() before
+ this method. */
+ DBUG_ASSERT(pthread_getspecific(THR_KEY_mysys));
+ /* Reset psi state to avoid deallocating applier thread
+ psi_thread. */
+ PSI_thread *psi_thread= PSI_CALL_get_thread();
+#ifdef HAVE_PSI_INTERFACE
+ if (PSI_server)
+ {
+ PSI_server->set_thread(0);
+ }
+#endif /* HAVE_PSI_INTERFACE */
+ my_thread_end();
+ PSI_CALL_set_thread(psi_thread);
+ pthread_setspecific(THR_KEY_mysys, 0);
+ }
+}
+
+void wsrep_assign_from_threadvars(THD *thd)
+{
+ if (thread_handling == SCHEDULER_TYPES_COUNT)
+ {
+ st_my_thread_var *mysys_var= (st_my_thread_var *)pthread_getspecific(THR_KEY_mysys);
+ DBUG_ASSERT(mysys_var);
+ thd->set_mysys_var(mysys_var);
+ }
+}
+
+Wsrep_threadvars wsrep_save_threadvars()
+{
+ return Wsrep_threadvars{
+ current_thd,
+ (st_my_thread_var*) pthread_getspecific(THR_KEY_mysys)
+ };
+}
+
+void wsrep_restore_threadvars(const Wsrep_threadvars& globals)
+{
+ set_current_thd(globals.cur_thd);
+ pthread_setspecific(THR_KEY_mysys, globals.mysys_var);
+}
+
+int wsrep_store_threadvars(THD *thd)
+{
+ if (thread_handling == SCHEDULER_TYPES_COUNT)
+ {
+ pthread_setspecific(THR_KEY_mysys, thd->mysys_var);
+ }
+ return thd->store_globals();
+}
+
+void wsrep_reset_threadvars(THD *thd)
+{
+ if (thread_handling == SCHEDULER_TYPES_COUNT)
+ {
+ pthread_setspecific(THR_KEY_mysys, 0);
+ }
+ else
+ {
+ thd->reset_globals();
+ }
+}
diff --git a/sql/wsrep_thd.h b/sql/wsrep_thd.h
index 2eceb3223a8..872570cd028 100644
--- a/sql/wsrep_thd.h
+++ b/sql/wsrep_thd.h
@@ -82,13 +82,8 @@ private:
mysql_cond_t COND_wsrep_thd_queue;
};
-void wsrep_prepare_bf_thd(THD*, struct wsrep_thd_shadow*);
-void wsrep_return_from_bf_mode(THD*, struct wsrep_thd_shadow*);
-
int wsrep_show_bf_aborts (THD *thd, SHOW_VAR *var, char *buff,
enum enum_var_type scope);
-void wsrep_client_rollback(THD *thd, bool rollbacker = false);
-void wsrep_replay_transaction(THD *thd);
void wsrep_create_appliers(long threads);
void wsrep_create_rollbacker();
@@ -96,8 +91,83 @@ bool wsrep_bf_abort(const THD*, THD*);
int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr,
my_bool signal);
extern void wsrep_thd_set_PA_safe(void *thd_ptr, my_bool safe);
-THD* wsrep_start_SR_THD(char *thread_stack);
-void wsrep_end_SR_THD(THD* thd);
+
+/*
+ Helper methods to deal with thread local storage.
+ The purpose of these methods is to hide the details of thread
+ local storage handling when operating with wsrep storage access
+ and streaming applier THDs
+
+ With one-thread-per-connection thread handling thread specific
+ variables are allocated when the thread is started and deallocated
+ before thread exits (my_thread_init(), my_thread_end()). However,
+ with pool-of-threads thread handling new thread specific variables
+ are allocated for each THD separately (see threadpool_add_connection()),
+ and the variables in thread local storage are assigned from
+ currently active thread (see thread_attach()). This must be taken into
+ account when storing/resetting thread local storage and when creating
+ streaming applier THDs.
+*/
+
+/**
+ Create new variables for thread local storage. With
+ one-thread-per-connection thread handling this is a no op,
+ with pool-of-threads new variables are created via my_thread_init().
+ It is assumed that the caller has called wsrep_reset_threadvars() to clear
+ the thread local storage before this call.
+
+ @return Zero in case of success, non-zero otherwise.
+*/
+int wsrep_create_threadvars();
+
+/**
+ Delete variables which were created by wsrep_create_threadvars().
+ The caller must store variables into thread local storage before
+ this call via wsrep_store_threadvars().
+*/
+void wsrep_delete_threadvars();
+
+/**
+ Assign variables from current thread local storage into THD.
+ This should be called for THDs whose lifetime is limited to single
+ thread execution or which may share the operation context with some
+ parent THD (e.g. storage access) and thus don't require separately
+ allocated globals.
+
+ With one-thread-per-connection thread handling this is a no-op,
+ with pool-of-threads the variables which are currently stored into
+ thread local storage are assigned to THD.
+*/
+void wsrep_assign_from_threadvars(THD *);
+
+/**
+ Helper struct to save variables from thread local storage.
+ */
+struct Wsrep_threadvars
+{
+ THD* cur_thd;
+ st_my_thread_var* mysys_var;
+};
+
+/**
+ Save variables from thread local storage into Wsrep_threadvars struct.
+ */
+Wsrep_threadvars wsrep_save_threadvars();
+
+/**
+ Restore variables into thread local storage from Wsrep_threadvars struct.
+*/
+void wsrep_restore_threadvars(const Wsrep_threadvars&);
+
+/**
+ Store variables into thread local storage.
+*/
+int wsrep_store_threadvars(THD *);
+
+/**
+ Reset thread local storage.
+*/
+void wsrep_reset_threadvars(THD *);
/**
Helper functions to override error status
diff --git a/sql/wsrep_trans_observer.h b/sql/wsrep_trans_observer.h
index 64ac80783c9..6bb26c40064 100644
--- a/sql/wsrep_trans_observer.h
+++ b/sql/wsrep_trans_observer.h
@@ -422,6 +422,17 @@ static inline void wsrep_close(THD* thd)
DBUG_VOID_RETURN;
}
+static inline void
+wsrep_wait_rollback_complete_and_acquire_ownership(THD *thd)
+{
+ DBUG_ENTER("wsrep_wait_rollback_complete_and_acquire_ownership");
+ if (thd->wsrep_cs().state() != wsrep::client_state::s_none)
+ {
+ thd->wsrep_cs().wait_rollback_complete_and_acquire_ownership();
+ }
+ DBUG_VOID_RETURN;
+}
+
static inline int wsrep_before_command(THD* thd)
{
return (thd->wsrep_cs().state() != wsrep::client_state::s_none ?
diff --git a/sql/wsrep_utils.cc b/sql/wsrep_utils.cc
index 52949a95e5d..49ea78a3872 100644
--- a/sql/wsrep_utils.cc
+++ b/sql/wsrep_utils.cc
@@ -25,6 +25,7 @@
#include "wsrep_api.h"
#include "wsrep_utils.h"
#include "wsrep_mysqld.h"
+#include "wsrep_thd.h"
#include <sql_class.h>
@@ -421,7 +422,8 @@ thd::thd (my_bool won) : init(), ptr(new THD(0))
if (ptr)
{
ptr->thread_stack= (char*) &ptr;
- ptr->store_globals();
+ wsrep_assign_from_threadvars(ptr);
+ wsrep_store_threadvars(ptr);
ptr->variables.option_bits&= ~OPTION_BIN_LOG; // disable binlog
ptr->variables.wsrep_on= won;
ptr->security_ctx->master_access= ~(ulong)0;
diff --git a/wsrep-lib b/wsrep-lib
-Subproject 0f676bd89378c7c823cff7ae7cdaef3cafcca23
+Subproject 58aa3e821f575532870c5f76f6f1cf833458eed