diff options
-rw-r--r-- | mysql-test/suite/galera/r/galera_performance_schema.result | 1 | ||||
-rw-r--r-- | sql/service_wsrep.cc | 16 | ||||
-rw-r--r-- | sql/sql_connect.cc | 18 | ||||
-rw-r--r-- | sql/threadpool_common.cc | 9 | ||||
-rw-r--r-- | sql/wsrep_client_service.cc | 10 | ||||
-rw-r--r-- | sql/wsrep_high_priority_service.cc | 26 | ||||
-rw-r--r-- | sql/wsrep_mysqld.cc | 7 | ||||
-rw-r--r-- | sql/wsrep_schema.cc | 22 | ||||
-rw-r--r-- | sql/wsrep_server_service.cc | 51 | ||||
-rw-r--r-- | sql/wsrep_server_service.h | 9 | ||||
-rw-r--r-- | sql/wsrep_sst.cc | 7 | ||||
-rw-r--r-- | sql/wsrep_storage_service.cc | 12 | ||||
-rw-r--r-- | sql/wsrep_thd.cc | 346 | ||||
-rw-r--r-- | sql/wsrep_thd.h | 84 | ||||
-rw-r--r-- | sql/wsrep_trans_observer.h | 11 | ||||
-rw-r--r-- | sql/wsrep_utils.cc | 4 | ||||
m--------- | wsrep-lib | 0 |
17 files changed, 416 insertions, 217 deletions
diff --git a/mysql-test/suite/galera/r/galera_performance_schema.result b/mysql-test/suite/galera/r/galera_performance_schema.result index 86d773bb1ef..ea0b6ad9ef4 100644 --- a/mysql-test/suite/galera/r/galera_performance_schema.result +++ b/mysql-test/suite/galera/r/galera_performance_schema.result @@ -7,7 +7,6 @@ WHERE name LIKE 'thread/sql/wsrep%' ORDER BY name; name thread/sql/wsrep_applier_thread name thread/sql/wsrep_rollbacker_thread -name thread/sql/wsrep_rollbacker_thread use test; create table t1 (a int not null primary key) engine=innodb; insert into t1 values (1),(2); diff --git a/sql/service_wsrep.cc b/sql/service_wsrep.cc index 8583897e064..43944366665 100644 --- a/sql/service_wsrep.cc +++ b/sql/service_wsrep.cc @@ -146,11 +146,10 @@ extern "C" void wsrep_handle_SR_rollback(THD *bf_thd, victim_thd->wsrep_trx_id(), victim_thd->wsrep_sr().fragments_certified(), wsrep_thd_transaction_state_str(victim_thd)); - if (bf_thd && bf_thd != victim_thd) - { - victim_thd->store_globals(); - } - else + + /* Note: do not store/reset globals before wsrep_bf_abort() call + to avoid losing BF thd context. */ + if (!(bf_thd && bf_thd != victim_thd)) { DEBUG_SYNC(victim_thd, "wsrep_before_SR_rollback"); } @@ -162,21 +161,24 @@ extern "C" void wsrep_handle_SR_rollback(THD *bf_thd, { wsrep_thd_self_abort(victim_thd); } - if (bf_thd && bf_thd != victim_thd) + if (bf_thd) { - bf_thd->store_globals(); + wsrep_store_threadvars(bf_thd); } } extern "C" my_bool wsrep_thd_bf_abort(const THD *bf_thd, THD *victim_thd, my_bool signal) { + /* Note: do not store/reset globals before wsrep_bf_abort() call + to avoid losing BF thd context. */ if (WSREP(victim_thd) && !victim_thd->wsrep_trx().active()) { WSREP_DEBUG("BF abort for non active transaction"); wsrep_start_transaction(victim_thd, victim_thd->wsrep_next_trx_id()); } my_bool ret= wsrep_bf_abort(bf_thd, victim_thd); + wsrep_store_threadvars((THD*)bf_thd); /* Send awake signal if victim was BF aborted or does not have wsrep on. Note that this should never interrupt RSU diff --git a/sql/sql_connect.cc b/sql/sql_connect.cc index b8fff8c864a..6bf43d3df5e 100644 --- a/sql/sql_connect.cc +++ b/sql/sql_connect.cc @@ -1188,6 +1188,16 @@ void end_connection(THD *thd) { NET *net= &thd->net; +#ifdef WITH_WSREP + if (thd->wsrep_cs().state() == wsrep::client_state::s_exec) + { + /* Error happened after the thread acquired ownership to wsrep + client state, but before command was processed. Clean up the + state before wsrep_close(). */ + wsrep_after_command_ignore_result(thd); + } + wsrep_close(thd); +#endif /* WITH_WSREP */ if (thd->user_connect) { /* @@ -1321,6 +1331,7 @@ bool thd_prepare_connection(THD *thd) prepare_new_connection_state(thd); #ifdef WITH_WSREP thd->wsrep_client_thread= true; + wsrep_open(thd); #endif /* WITH_WSREP */ return FALSE; } @@ -1393,9 +1404,6 @@ void do_handle_one_connection(CONNECT *connect) create_user= FALSE; goto end_thread; } -#ifdef WITH_WSREP - wsrep_open(thd); -#endif /* WITH_WSREP */ while (thd_is_connection_alive(thd)) { @@ -1406,10 +1414,6 @@ void do_handle_one_connection(CONNECT *connect) } end_connection(thd); -#ifdef WITH_WSREP - wsrep_close(thd); -#endif /* WITH_WSREP */ - end_thread: close_connection(thd); diff --git a/sql/threadpool_common.cc b/sql/threadpool_common.cc index cfb831e2f55..6577472e20b 100644 --- a/sql/threadpool_common.cc +++ b/sql/threadpool_common.cc @@ -23,7 +23,9 @@ #include <sql_audit.h> #include <debug_sync.h> #include <threadpool.h> - +#ifdef WITH_WSREP +#include "wsrep_trans_observer.h" +#endif /* WITH_WSREP */ /* Threadpool parameters */ @@ -137,6 +139,11 @@ static inline void set_thd_idle(THD *thd) */ static void thread_attach(THD* thd) { +#ifdef WITH_WSREP + /* Wait until possible background rollback has finished before + attaching the thd. */ + wsrep_wait_rollback_complete_and_acquire_ownership(thd); +#endif /* WITH_WSREP */ pthread_setspecific(THR_KEY_mysys,thd->mysys_var); thd->thread_stack=(char*)&thd; thd->store_globals(); diff --git a/sql/wsrep_client_service.cc b/sql/wsrep_client_service.cc index b182691c593..5f73f9f714f 100644 --- a/sql/wsrep_client_service.cc +++ b/sql/wsrep_client_service.cc @@ -30,9 +30,9 @@ #include "slave.h" /* opt_log_slave_updates */ #include "transaction.h" /* trans_commit()... */ #include "log.h" /* stmt_has_updated_trans_table() */ -//#include "debug_sync.h" #include "mysql/service_debug_sync.h" #include "mysql/psi/mysql_thread.h" /* mysql_mutex_assert_owner() */ + namespace { @@ -57,16 +57,12 @@ Wsrep_client_service::Wsrep_client_service(THD* thd, void Wsrep_client_service::store_globals() { - DBUG_ENTER("Wsrep_client_service::store_globals"); - m_thd->store_globals(); - DBUG_VOID_RETURN; + wsrep_store_threadvars(m_thd); } void Wsrep_client_service::reset_globals() { - DBUG_ENTER("Wsrep_client_service::reset_globals"); - m_thd->reset_globals(); - DBUG_VOID_RETURN; + wsrep_reset_threadvars(m_thd); } bool Wsrep_client_service::interrupted( diff --git a/sql/wsrep_high_priority_service.cc b/sql/wsrep_high_priority_service.cc index 68cf0d1877b..73cdbd1c217 100644 --- a/sql/wsrep_high_priority_service.cc +++ b/sql/wsrep_high_priority_service.cc @@ -379,20 +379,13 @@ int Wsrep_high_priority_service::apply_toi(const wsrep::ws_meta& ws_meta, void Wsrep_high_priority_service::store_globals() { - DBUG_ENTER("Wsrep_high_priority_service::store_globals"); - /* In addition to calling THD::store_globals(), call - wsrep::client_state::store_globals() to gain ownership of - the client state */ - m_thd->store_globals(); - m_thd->wsrep_cs().store_globals(); - DBUG_VOID_RETURN; + wsrep_store_threadvars(m_thd); + m_thd->wsrep_cs().acquire_ownership(); } void Wsrep_high_priority_service::reset_globals() { - DBUG_ENTER("Wsrep_high_priority_service::reset_globals"); - m_thd->reset_globals(); - DBUG_VOID_RETURN; + wsrep_reset_threadvars(m_thd); } void Wsrep_high_priority_service::switch_execution_context(wsrep::high_priority_service& orig_high_priority_service) @@ -572,11 +565,14 @@ Wsrep_replayer_service::Wsrep_replayer_service(THD* replayer_thd, THD* orig_thd) thd_proc_info(orig_thd, "wsrep replaying trx"); /* - Swith execution context to replayer_thd and prepare it for + Switch execution context to replayer_thd and prepare it for replay execution. */ - orig_thd->reset_globals(); - replayer_thd->store_globals(); + /* Copy thd vars from orig_thd before reset, otherwise reset + for orig thd clears thread local storage before copy. */ + wsrep_assign_from_threadvars(replayer_thd); + wsrep_reset_threadvars(orig_thd); + wsrep_store_threadvars(replayer_thd); wsrep_open(replayer_thd); wsrep_before_command(replayer_thd); replayer_thd->wsrep_cs().clone_transaction_for_replay(orig_thd->wsrep_trx()); @@ -593,8 +589,8 @@ Wsrep_replayer_service::~Wsrep_replayer_service() wsrep_after_apply(replayer_thd); wsrep_after_command_ignore_result(replayer_thd); wsrep_close(replayer_thd); - replayer_thd->reset_globals(); - orig_thd->store_globals(); + wsrep_reset_threadvars(replayer_thd); + wsrep_store_threadvars(orig_thd); DBUG_ASSERT(!orig_thd->get_stmt_da()->is_sent()); DBUG_ASSERT(!orig_thd->get_stmt_da()->is_set()); diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc index 0a9adc5fa2c..fde29ae434d 100644 --- a/sql/wsrep_mysqld.cc +++ b/sql/wsrep_mysqld.cc @@ -2243,6 +2243,7 @@ static void wsrep_close_thread(THD *thd) { thd->set_killed(KILL_CONNECTION); MYSQL_CALLBACK(thread_scheduler, post_kill_notification, (thd)); + mysql_mutex_lock(&thd->LOCK_thd_kill); if (thd->mysys_var) { thd->mysys_var->abort=1; @@ -2255,6 +2256,7 @@ static void wsrep_close_thread(THD *thd) } mysql_mutex_unlock(&thd->mysys_var->mutex); } + mysql_mutex_unlock(&thd->LOCK_thd_kill); } static my_bool have_committing_connections(THD *thd, void *) @@ -2658,7 +2660,8 @@ void* start_wsrep_THD(void *arg) /* now that we've called my_thread_init(), it is safe to call DBUG_* */ thd->thread_stack= (char*) &thd; - if (thd->store_globals()) + wsrep_assign_from_threadvars(thd); + if (wsrep_store_threadvars(thd)) { close_connection(thd, ER_OUT_OF_RESOURCES); statistic_increment(aborted_connects,&LOCK_status); @@ -2703,7 +2706,7 @@ void* start_wsrep_THD(void *arg) /* Wsrep may reset globals during thread context switches, store globals before cleanup. */ - thd->store_globals(); + wsrep_store_threadvars(thd); close_connection(thd, 0); diff --git a/sql/wsrep_schema.cc b/sql/wsrep_schema.cc index 064b6cf9f46..066ea124fb7 100644 --- a/sql/wsrep_schema.cc +++ b/sql/wsrep_schema.cc @@ -29,6 +29,7 @@ #include "wsrep_binlog.h" #include "wsrep_high_priority_service.h" #include "wsrep_storage_service.h" +#include "wsrep_thd.h" #include <string> #include <sstream> @@ -145,13 +146,13 @@ public: : m_orig_thd(orig_thd) , m_cur_thd(cur_thd) { - m_orig_thd->reset_globals(); - m_cur_thd->store_globals(); + wsrep_reset_threadvars(m_orig_thd); + wsrep_store_threadvars(m_cur_thd); } ~thd_context_switch() { - m_cur_thd->reset_globals(); - m_orig_thd->store_globals(); + wsrep_reset_threadvars(m_cur_thd); + wsrep_store_threadvars(m_orig_thd); } private: THD *m_orig_thd; @@ -595,7 +596,8 @@ static void wsrep_init_thd_for_schema(THD *thd) thd->variables.option_bits |= OPTION_LOG_OFF; /* Read committed isolation to avoid gap locking */ thd->variables.tx_isolation= ISO_READ_COMMITTED; - thd->store_globals(); + wsrep_assign_from_threadvars(thd); + wsrep_store_threadvars(thd); } int Wsrep_schema::init() @@ -1123,6 +1125,7 @@ int Wsrep_schema::replay_transaction(THD* orig_thd, THD thd(next_thread_id(), true); thd.thread_stack= (orig_thd ? orig_thd->thread_stack : (char*) &thd); + wsrep_assign_from_threadvars(&thd); Wsrep_schema_impl::wsrep_off wsrep_off(&thd); Wsrep_schema_impl::binlog_off binlog_off(&thd); @@ -1228,6 +1231,7 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd) THD storage_thd(next_thread_id(), true); storage_thd.thread_stack= (orig_thd ? orig_thd->thread_stack : (char*) &storage_thd); + wsrep_assign_from_threadvars(&storage_thd); TABLE* frag_table= 0; TABLE* cluster_table= 0; Wsrep_storage_service storage_service(&storage_thd); @@ -1333,12 +1337,7 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd) transaction_id))) { DBUG_ASSERT(wsrep::starts_transaction(flags)); - THD* thd= new THD(next_thread_id(), true); - thd->thread_stack= (char*)&storage_thd; - - thd->real_id= pthread_self(); - - applier= new Wsrep_applier_service(thd); + applier = wsrep_create_streaming_applier(&storage_thd, "recovery"); server_state.start_streaming_applier(server_id, transaction_id, applier); applier->start_transaction(wsrep::ws_handle(transaction_id, 0), @@ -1364,6 +1363,7 @@ int Wsrep_schema::recover_sr_transactions(THD *orig_thd) Wsrep_schema_impl::end_scan(frag_table); Wsrep_schema_impl::finish_stmt(&storage_thd); trans_commit(&storage_thd); + storage_thd.set_mysys_var(0); out: DBUG_RETURN(ret); } diff --git a/sql/wsrep_server_service.cc b/sql/wsrep_server_service.cc index 42856862db3..bfb85e3d0ab 100644 --- a/sql/wsrep_server_service.cc +++ b/sql/wsrep_server_service.cc @@ -26,6 +26,7 @@ #include "wsrep_mysqld.h" #include "wsrep_schema.h" #include "wsrep_utils.h" +#include "wsrep_thd.h" #include "log.h" /* sql_print_xxx() */ #include "sql_class.h" /* system variables */ @@ -50,6 +51,10 @@ wsrep::storage_service* Wsrep_server_service::storage_service( init_service_thd(thd, cs.m_thd->thread_stack); WSREP_DEBUG("Created storage service with thread id %llu", thd->thread_id); + /* Use variables from the current thd attached to client_service. + This is because we need to be able to BF abort storage access + operations. */ + wsrep_assign_from_threadvars(thd); return new Wsrep_storage_service(thd); } @@ -62,6 +67,7 @@ wsrep::storage_service* Wsrep_server_service::storage_service( init_service_thd(thd, hps.m_thd->thread_stack); WSREP_DEBUG("Created high priority storage service with thread id %llu", thd->thread_id); + wsrep_assign_from_threadvars(thd); return new Wsrep_storage_service(thd); } @@ -71,21 +77,48 @@ void Wsrep_server_service::release_storage_service( Wsrep_storage_service* ss= static_cast<Wsrep_storage_service*>(storage_service); THD* thd= ss->m_thd; + wsrep_reset_threadvars(thd); delete ss; delete thd; } +Wsrep_applier_service* +wsrep_create_streaming_applier(THD *orig_thd, const char *ctx) +{ + /* Reset variables to allow creating new variables in thread local + storage for new THD if needed. Note that reset must be done for + current_thd, as orig_thd may not be in effect. This may be the case when + streaming transaction is BF aborted and streaming applier + is created from BF aborter context. */ + Wsrep_threadvars saved_threadvars(wsrep_save_threadvars()); + wsrep_reset_threadvars(saved_threadvars.cur_thd); + THD *thd= 0; + Wsrep_applier_service *ret= 0; + if (!wsrep_create_threadvars() && + (thd= new THD(next_thread_id(), true))) + { + init_service_thd(thd, orig_thd->thread_stack); + wsrep_assign_from_threadvars(thd); + WSREP_DEBUG("Created streaming applier service in %s context with " + "thread id %llu", ctx, thd->thread_id); + if (!(ret= new (std::nothrow) Wsrep_applier_service(thd))) + { + delete thd; + } + } + /* Restore original thread local storage state before returning. */ + wsrep_restore_threadvars(saved_threadvars); + wsrep_store_threadvars(saved_threadvars.cur_thd); + return ret; +} + wsrep::high_priority_service* Wsrep_server_service::streaming_applier_service( wsrep::client_service& orig_client_service) { Wsrep_client_service& orig_cs= static_cast<Wsrep_client_service&>(orig_client_service); - THD* thd= new THD(next_thread_id(), true); - init_service_thd(thd, orig_cs.m_thd->thread_stack); - WSREP_DEBUG("Created streaming applier service in local context with " - "thread id %llu", thd->thread_id); - return new Wsrep_applier_service(thd); + return wsrep_create_streaming_applier(orig_cs.m_thd, "local"); } wsrep::high_priority_service* @@ -94,11 +127,7 @@ Wsrep_server_service::streaming_applier_service( { Wsrep_high_priority_service& orig_hps(static_cast<Wsrep_high_priority_service&>(orig_high_priority_service)); - THD* thd= new THD(next_thread_id(), true); - init_service_thd(thd, orig_hps.m_thd->thread_stack); - WSREP_DEBUG("Created streaming applier service in high priority " - "context with thread id %llu", thd->thread_id); - return new Wsrep_applier_service(thd); + return wsrep_create_streaming_applier(orig_hps.m_thd, "high priority"); } void Wsrep_server_service::release_high_priority_service(wsrep::high_priority_service* high_priority_service) @@ -107,7 +136,9 @@ void Wsrep_server_service::release_high_priority_service(wsrep::high_priority_se static_cast<Wsrep_high_priority_service*>(high_priority_service); THD* thd= hps->m_thd; delete hps; + wsrep_store_threadvars(thd); delete thd; + wsrep_delete_threadvars(); } void Wsrep_server_service::background_rollback(wsrep::client_state& client_state) diff --git a/sql/wsrep_server_service.h b/sql/wsrep_server_service.h index b8f1f009cde..6336fe2c473 100644 --- a/sql/wsrep_server_service.h +++ b/sql/wsrep_server_service.h @@ -77,5 +77,14 @@ private: Wsrep_server_state& m_server_state; }; +/** + Helper method to create new streaming applier. + + @param orig_thd Original thd context to copy operation context from. + @param ctx Context string for debug logging. + */ +class Wsrep_applier_service; +Wsrep_applier_service* +wsrep_create_streaming_applier(THD *orig_thd, const char *ctx); #endif /* WSREP_SERVER_SERVICE */ diff --git a/sql/wsrep_sst.cc b/sql/wsrep_sst.cc index e2878211d7e..62b30c0d67d 100644 --- a/sql/wsrep_sst.cc +++ b/sql/wsrep_sst.cc @@ -27,6 +27,8 @@ #include "wsrep_priv.h" #include "wsrep_utils.h" #include "wsrep_xid.h" +#include "wsrep_thd.h" + #include <cstdio> #include <cstdlib> @@ -237,7 +239,7 @@ void wsrep_sst_received (THD* thd, wsrep thread pool. Restore original thd context before returning. */ if (thd) { - thd->store_globals(); + wsrep_store_threadvars(thd); } else { my_pthread_setspecific_ptr(THR_THD, NULL); @@ -509,7 +511,8 @@ err: thd->system_thread= SYSTEM_THREAD_GENERIC; thd->real_id= pthread_self(); - thd->store_globals(); + wsrep_assign_from_threadvars(thd); + wsrep_store_threadvars(thd); /* */ thd->variables.wsrep_on = 0; diff --git a/sql/wsrep_storage_service.cc b/sql/wsrep_storage_service.cc index e164114b733..6dfe3eee448 100644 --- a/sql/wsrep_storage_service.cc +++ b/sql/wsrep_storage_service.cc @@ -196,18 +196,10 @@ int Wsrep_storage_service::rollback(const wsrep::ws_handle& ws_handle, void Wsrep_storage_service::store_globals() { - DBUG_ENTER("Wsrep_storage_service::store_globals"); - DBUG_PRINT("info", ("Wsrep_storage_service::store_globals(%llu, %p)", - m_thd->thread_id, m_thd)); - m_thd->store_globals(); - DBUG_VOID_RETURN; + wsrep_store_threadvars(m_thd); } void Wsrep_storage_service::reset_globals() { - DBUG_ENTER("Wsrep_storage_service::reset_globals"); - DBUG_PRINT("info", ("Wsrep_storage_service::reset_globals(%llu, %p)", - m_thd->thread_id, m_thd)); - m_thd->reset_globals(); - DBUG_VOID_RETURN; + wsrep_reset_threadvars(m_thd); } diff --git a/sql/wsrep_thd.cc b/sql/wsrep_thd.cc index addf98561de..50f0376f674 100644 --- a/sql/wsrep_thd.cc +++ b/sql/wsrep_thd.cc @@ -31,8 +31,9 @@ #include "rpl_rli.h" #include "rpl_mi.h" +extern "C" pthread_key(struct st_my_thread_var*, THR_KEY_mysys); + static Wsrep_thd_queue* wsrep_rollback_queue= 0; -static Wsrep_thd_queue* wsrep_post_rollback_queue= 0; static Atomic_counter<uint64_t> wsrep_bf_aborts_counter; @@ -149,6 +150,122 @@ void wsrep_create_appliers(long threads) } } +static void wsrep_rollback_streaming_aborted_by_toi(THD *thd) +{ + WSREP_INFO("wsrep_rollback_streaming_aborted_by_toi"); + /* Set thd->event_scheduler.data temporarily to NULL to avoid + callbacks to threadpool wait_begin() during rollback. */ + auto saved_esd= thd->event_scheduler.data; + thd->event_scheduler.data= 0; + if (thd->wsrep_cs().mode() == wsrep::client_state::m_high_priority) + { + DBUG_ASSERT(!saved_esd); + DBUG_ASSERT(thd->wsrep_applier_service); + thd->wsrep_applier_service->rollback(wsrep::ws_handle(), + wsrep::ws_meta()); + thd->wsrep_applier_service->after_apply(); + /* Will free THD */ + Wsrep_server_state::instance().server_service(). + release_high_priority_service(thd->wsrep_applier_service); + } + else + { + mysql_mutex_lock(&thd->LOCK_thd_data); + /* prepare THD for rollback processing */ + thd->reset_for_next_command(true); + thd->lex->sql_command= SQLCOM_ROLLBACK; + mysql_mutex_unlock(&thd->LOCK_thd_data); + /* Perform a client rollback, restore globals and signal + the victim only when all the resources have been + released */ + thd->wsrep_cs().client_service().bf_rollback(); + wsrep_reset_threadvars(thd); + /* Assign saved event_scheduler.data back before letting + client to continue. */ + thd->event_scheduler.data= saved_esd; + thd->wsrep_cs().sync_rollback_complete(); + } +} + +static void wsrep_rollback_high_priority(THD *thd) +{ + WSREP_INFO("rollbacker aborting SR thd: (%lld %llu)", + thd->thread_id, (long long)thd->real_id); + DBUG_ASSERT(thd->wsrep_cs().mode() == Wsrep_client_state::m_high_priority); + /* Must be streaming and must have been removed from the + server state streaming appliers map. */ + DBUG_ASSERT(thd->wsrep_trx().is_streaming()); + DBUG_ASSERT(!Wsrep_server_state::instance().find_streaming_applier( + thd->wsrep_trx().server_id(), + thd->wsrep_trx().id())); + DBUG_ASSERT(thd->wsrep_applier_service); + + /* Fragment removal should happen before rollback to make + the transaction non-observable in SR table after the rollback + completes. For correctness the order does not matter here, + but currently it is mandated by checks in some MTR tests. */ + wsrep::transaction_id transaction_id(thd->wsrep_trx().id()); + Wsrep_storage_service* storage_service= + static_cast<Wsrep_storage_service*>( + Wsrep_server_state::instance().server_service().storage_service( + *thd->wsrep_applier_service)); + storage_service->store_globals(); + storage_service->adopt_transaction(thd->wsrep_trx()); + storage_service->remove_fragments(); + storage_service->commit(wsrep::ws_handle(transaction_id, 0), + wsrep::ws_meta()); + Wsrep_server_state::instance().server_service().release_storage_service(storage_service); + wsrep_store_threadvars(thd); + thd->wsrep_applier_service->rollback(wsrep::ws_handle(), + wsrep::ws_meta()); + thd->wsrep_applier_service->after_apply(); + /* Will free THD */ + Wsrep_server_state::instance().server_service() + .release_high_priority_service(thd->wsrep_applier_service); +} + +static void wsrep_rollback_local(THD *thd) +{ + WSREP_INFO("Wsrep_rollback_local"); + if (thd->wsrep_trx().is_streaming()) + { + wsrep::transaction_id transaction_id(thd->wsrep_trx().id()); + Wsrep_storage_service* storage_service= + static_cast<Wsrep_storage_service*>( + Wsrep_server_state::instance().server_service(). + storage_service(thd->wsrep_cs().client_service())); + + storage_service->store_globals(); + storage_service->adopt_transaction(thd->wsrep_trx()); + storage_service->remove_fragments(); + storage_service->commit(wsrep::ws_handle(transaction_id, 0), + wsrep::ws_meta()); + Wsrep_server_state::instance().server_service(). + release_storage_service(storage_service); + wsrep_store_threadvars(thd); + } + /* Set thd->event_scheduler.data temporarily to NULL to avoid + callbacks to threadpool wait_begin() during rollback. */ + auto saved_esd= thd->event_scheduler.data; + thd->event_scheduler.data= 0; + mysql_mutex_lock(&thd->LOCK_thd_data); + /* prepare THD for rollback processing */ + thd->reset_for_next_command(); + thd->lex->sql_command= SQLCOM_ROLLBACK; + mysql_mutex_unlock(&thd->LOCK_thd_data); + /* Perform a client rollback, restore globals and signal + the victim only when all the resources have been + released */ + thd->wsrep_cs().client_service().bf_rollback(); + wsrep_reset_threadvars(thd); + /* Assign saved event_scheduler.data back before letting + client to continue. */ + thd->event_scheduler.data= saved_esd; + thd->wsrep_cs().sync_rollback_complete(); + WSREP_DEBUG("rollbacker aborted thd: (%llu %llu)", + thd->thread_id, (long long)thd->real_id); +} + static void wsrep_rollback_process(THD *rollbacker, void *arg __attribute__((unused))) { @@ -170,119 +287,36 @@ static void wsrep_rollback_process(THD *rollbacker, WSREP_DEBUG("rollbacker thd already aborted: %llu state: %d", (long long)thd->real_id, tx.state()); - mysql_mutex_unlock(&thd->LOCK_thd_data); continue; } mysql_mutex_unlock(&thd->LOCK_thd_data); + wsrep_reset_threadvars(rollbacker); + wsrep_store_threadvars(thd); + thd->wsrep_cs().acquire_ownership(); + thd_proc_info(rollbacker, "wsrep aborter active"); - wsrep::transaction_id transaction_id(thd->wsrep_trx().id()); + /* Rollback methods below may free thd pointer. Do not try + to access it after method returns. */ if (thd->wsrep_trx().is_streaming() && thd->wsrep_trx().bf_aborted_in_total_order()) { - thd->store_globals(); - thd->wsrep_cs().store_globals(); - if (thd->wsrep_cs().mode() == wsrep::client_state::m_high_priority) - { - DBUG_ASSERT(thd->wsrep_applier_service); - thd->wsrep_applier_service->rollback(wsrep::ws_handle(), - wsrep::ws_meta()); - thd->wsrep_applier_service->after_apply(); - /* Will free THD */ - Wsrep_server_state::instance().server_service(). - release_high_priority_service(thd->wsrep_applier_service); - } - else - { - mysql_mutex_lock(&thd->LOCK_thd_data); - /* prepare THD for rollback processing */ - thd->reset_for_next_command(true); - thd->lex->sql_command= SQLCOM_ROLLBACK; - mysql_mutex_unlock(&thd->LOCK_thd_data); - /* Perform a client rollback, restore globals and signal - the victim only when all the resources have been - released */ - thd->wsrep_cs().client_service().bf_rollback(); - thd->reset_globals(); - thd->wsrep_cs().sync_rollback_complete(); - } + wsrep_rollback_streaming_aborted_by_toi(thd); } else if (wsrep_thd_is_applying(thd)) { - WSREP_DEBUG("rollbacker aborting SR thd: (%lld %llu)", - thd->thread_id, (long long)thd->real_id); - DBUG_ASSERT(thd->wsrep_cs().mode() == Wsrep_client_state::m_high_priority); - /* Must be streaming and must have been removed from the - server state streaming appliers map. */ - DBUG_ASSERT(thd->wsrep_trx().is_streaming()); - DBUG_ASSERT(!Wsrep_server_state::instance().find_streaming_applier( - thd->wsrep_trx().server_id(), - thd->wsrep_trx().id())); - DBUG_ASSERT(thd->wsrep_applier_service); - - /* Fragment removal should happen before rollback to make - the transaction non-observable in SR table after the rollback - completes. For correctness the order does not matter here, - but currently it is mandated by checks in some MTR tests. */ - Wsrep_storage_service* storage_service= - static_cast<Wsrep_storage_service*>( - Wsrep_server_state::instance().server_service().storage_service( - *thd->wsrep_applier_service)); - storage_service->store_globals(); - storage_service->adopt_transaction(thd->wsrep_trx()); - storage_service->remove_fragments(); - storage_service->commit(wsrep::ws_handle(transaction_id, 0), - wsrep::ws_meta()); - Wsrep_server_state::instance().server_service().release_storage_service(storage_service); - thd->store_globals(); - thd->wsrep_cs().store_globals(); - thd->wsrep_applier_service->rollback(wsrep::ws_handle(), - wsrep::ws_meta()); - thd->wsrep_applier_service->after_apply(); - /* Will free THD */ - Wsrep_server_state::instance().server_service() - .release_high_priority_service(thd->wsrep_applier_service); - + wsrep_rollback_high_priority(thd); } else { - if (thd->wsrep_trx().is_streaming()) - { - Wsrep_storage_service* storage_service= - static_cast<Wsrep_storage_service*>( - Wsrep_server_state::instance().server_service(). - storage_service(thd->wsrep_cs().client_service())); - - storage_service->store_globals(); - storage_service->adopt_transaction(thd->wsrep_trx()); - storage_service->remove_fragments(); - storage_service->commit(wsrep::ws_handle(transaction_id, 0), - wsrep::ws_meta()); - Wsrep_server_state::instance().server_service(). - release_storage_service(storage_service); - } - thd->store_globals(); - thd->wsrep_cs().store_globals(); - mysql_mutex_lock(&thd->LOCK_thd_data); - /* prepare THD for rollback processing */ - thd->reset_for_next_command(); - thd->lex->sql_command= SQLCOM_ROLLBACK; - mysql_mutex_unlock(&thd->LOCK_thd_data); - /* Perform a client rollback, restore globals and signal - the victim only when all the resources have been - released */ - thd->wsrep_cs().client_service().bf_rollback(); - thd->reset_globals(); - thd->wsrep_cs().sync_rollback_complete(); - WSREP_DEBUG("rollbacker aborted thd: (%llu %llu)", - thd->thread_id, (long long)thd->real_id); + wsrep_rollback_local(thd); } - + wsrep_store_threadvars(rollbacker); thd_proc_info(rollbacker, "wsrep aborter idle"); } - + delete wsrep_rollback_queue; wsrep_rollback_queue= NULL; @@ -293,39 +327,6 @@ static void wsrep_rollback_process(THD *rollbacker, DBUG_VOID_RETURN; } -static void wsrep_post_rollback_process(THD *post_rollbacker, - void *arg __attribute__((unused))) -{ - DBUG_ENTER("wsrep_post_rollback_process"); - THD* thd= NULL; - - WSREP_INFO("Starting post rollbacker thread %llu", post_rollbacker->thread_id); - DBUG_ASSERT(!wsrep_post_rollback_queue); - wsrep_post_rollback_queue= new Wsrep_thd_queue(post_rollbacker); - - while ((thd= wsrep_post_rollback_queue->pop_front()) != NULL) - { - thd->store_globals(); - wsrep::client_state& cs(thd->wsrep_cs()); - mysql_mutex_lock(&thd->LOCK_thd_data); - DBUG_ASSERT(thd->wsrep_trx().state() == wsrep::transaction::s_aborting); - WSREP_DEBUG("post rollbacker calling post rollback for thd %llu, conf %s", - thd->thread_id, wsrep_thd_transaction_state_str(thd)); - - cs.after_rollback(); - DBUG_ASSERT(thd->wsrep_trx().state() == wsrep::transaction::s_aborted); - mysql_mutex_unlock(&thd->LOCK_thd_data); - } - - delete wsrep_post_rollback_queue; - wsrep_post_rollback_queue= NULL; - - DBUG_ASSERT(post_rollbacker->killed != NOT_KILLED); - DBUG_PRINT("wsrep",("wsrep post rollbacker thread exiting")); - WSREP_INFO("post rollbacker thread exiting %llu", post_rollbacker->thread_id); - DBUG_VOID_RETURN; -} - void wsrep_create_rollbacker() { if (wsrep_cluster_address && wsrep_cluster_address[0] != 0) @@ -337,14 +338,6 @@ void wsrep_create_rollbacker() /* create rollbacker */ if (create_wsrep_THD(args)) WSREP_WARN("Can't create thread to manage wsrep rollback"); - - /* create post_rollbacker */ - args= new Wsrep_thd_args(wsrep_post_rollback_process, - WSREP_ROLLBACKER_THREAD, - pthread_self()); - - if (create_wsrep_THD(args)) - WSREP_WARN("Can't create thread to manage wsrep post rollback"); } } @@ -438,3 +431,84 @@ void wsrep_thd_auto_increment_variables(THD* thd, *offset= thd->variables.auto_increment_offset; *increment= thd->variables.auto_increment_increment; } + +int wsrep_create_threadvars() +{ + int ret= 0; + if (thread_handling == SCHEDULER_TYPES_COUNT) + { + /* Caller should have called wsrep_reset_threadvars() before this + method. */ + DBUG_ASSERT(!pthread_getspecific(THR_KEY_mysys)); + pthread_setspecific(THR_KEY_mysys, 0); + ret= my_thread_init(); + } + return ret; +} + +void wsrep_delete_threadvars() +{ + if (thread_handling == SCHEDULER_TYPES_COUNT) + { + /* The caller should have called wsrep_store_threadvars() before + this method. */ + DBUG_ASSERT(pthread_getspecific(THR_KEY_mysys)); + /* Reset psi state to avoid deallocating applier thread + psi_thread. */ + PSI_thread *psi_thread= PSI_CALL_get_thread(); +#ifdef HAVE_PSI_INTERFACE + if (PSI_server) + { + PSI_server->set_thread(0); + } +#endif /* HAVE_PSI_INTERFACE */ + my_thread_end(); + PSI_CALL_set_thread(psi_thread); + pthread_setspecific(THR_KEY_mysys, 0); + } +} + +void wsrep_assign_from_threadvars(THD *thd) +{ + if (thread_handling == SCHEDULER_TYPES_COUNT) + { + st_my_thread_var *mysys_var= (st_my_thread_var *)pthread_getspecific(THR_KEY_mysys); + DBUG_ASSERT(mysys_var); + thd->set_mysys_var(mysys_var); + } +} + +Wsrep_threadvars wsrep_save_threadvars() +{ + return Wsrep_threadvars{ + current_thd, + (st_my_thread_var*) pthread_getspecific(THR_KEY_mysys) + }; +} + +void wsrep_restore_threadvars(const Wsrep_threadvars& globals) +{ + set_current_thd(globals.cur_thd); + pthread_setspecific(THR_KEY_mysys, globals.mysys_var); +} + +int wsrep_store_threadvars(THD *thd) +{ + if (thread_handling == SCHEDULER_TYPES_COUNT) + { + pthread_setspecific(THR_KEY_mysys, thd->mysys_var); + } + return thd->store_globals(); +} + +void wsrep_reset_threadvars(THD *thd) +{ + if (thread_handling == SCHEDULER_TYPES_COUNT) + { + pthread_setspecific(THR_KEY_mysys, 0); + } + else + { + thd->reset_globals(); + } +} diff --git a/sql/wsrep_thd.h b/sql/wsrep_thd.h index 2eceb3223a8..872570cd028 100644 --- a/sql/wsrep_thd.h +++ b/sql/wsrep_thd.h @@ -82,13 +82,8 @@ private: mysql_cond_t COND_wsrep_thd_queue; }; -void wsrep_prepare_bf_thd(THD*, struct wsrep_thd_shadow*); -void wsrep_return_from_bf_mode(THD*, struct wsrep_thd_shadow*); - int wsrep_show_bf_aborts (THD *thd, SHOW_VAR *var, char *buff, enum enum_var_type scope); -void wsrep_client_rollback(THD *thd, bool rollbacker = false); -void wsrep_replay_transaction(THD *thd); void wsrep_create_appliers(long threads); void wsrep_create_rollbacker(); @@ -96,8 +91,83 @@ bool wsrep_bf_abort(const THD*, THD*); int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr, my_bool signal); extern void wsrep_thd_set_PA_safe(void *thd_ptr, my_bool safe); -THD* wsrep_start_SR_THD(char *thread_stack); -void wsrep_end_SR_THD(THD* thd); + +/* + Helper methods to deal with thread local storage. + The purpose of these methods is to hide the details of thread + local storage handling when operating with wsrep storage access + and streaming applier THDs + + With one-thread-per-connection thread handling thread specific + variables are allocated when the thread is started and deallocated + before thread exits (my_thread_init(), my_thread_end()). However, + with pool-of-threads thread handling new thread specific variables + are allocated for each THD separately (see threadpool_add_connection()), + and the variables in thread local storage are assigned from + currently active thread (see thread_attach()). This must be taken into + account when storing/resetting thread local storage and when creating + streaming applier THDs. +*/ + +/** + Create new variables for thread local storage. With + one-thread-per-connection thread handling this is a no op, + with pool-of-threads new variables are created via my_thread_init(). + It is assumed that the caller has called wsrep_reset_threadvars() to clear + the thread local storage before this call. + + @return Zero in case of success, non-zero otherwise. +*/ +int wsrep_create_threadvars(); + +/** + Delete variables which were created by wsrep_create_threadvars(). + The caller must store variables into thread local storage before + this call via wsrep_store_threadvars(). +*/ +void wsrep_delete_threadvars(); + +/** + Assign variables from current thread local storage into THD. + This should be called for THDs whose lifetime is limited to single + thread execution or which may share the operation context with some + parent THD (e.g. storage access) and thus don't require separately + allocated globals. + + With one-thread-per-connection thread handling this is a no-op, + with pool-of-threads the variables which are currently stored into + thread local storage are assigned to THD. +*/ +void wsrep_assign_from_threadvars(THD *); + +/** + Helper struct to save variables from thread local storage. + */ +struct Wsrep_threadvars +{ + THD* cur_thd; + st_my_thread_var* mysys_var; +}; + +/** + Save variables from thread local storage into Wsrep_threadvars struct. + */ +Wsrep_threadvars wsrep_save_threadvars(); + +/** + Restore variables into thread local storage from Wsrep_threadvars struct. +*/ +void wsrep_restore_threadvars(const Wsrep_threadvars&); + +/** + Store variables into thread local storage. +*/ +int wsrep_store_threadvars(THD *); + +/** + Reset thread local storage. +*/ +void wsrep_reset_threadvars(THD *); /** Helper functions to override error status diff --git a/sql/wsrep_trans_observer.h b/sql/wsrep_trans_observer.h index 64ac80783c9..6bb26c40064 100644 --- a/sql/wsrep_trans_observer.h +++ b/sql/wsrep_trans_observer.h @@ -422,6 +422,17 @@ static inline void wsrep_close(THD* thd) DBUG_VOID_RETURN; } +static inline void +wsrep_wait_rollback_complete_and_acquire_ownership(THD *thd) +{ + DBUG_ENTER("wsrep_wait_rollback_complete_and_acquire_ownership"); + if (thd->wsrep_cs().state() != wsrep::client_state::s_none) + { + thd->wsrep_cs().wait_rollback_complete_and_acquire_ownership(); + } + DBUG_VOID_RETURN; +} + static inline int wsrep_before_command(THD* thd) { return (thd->wsrep_cs().state() != wsrep::client_state::s_none ? diff --git a/sql/wsrep_utils.cc b/sql/wsrep_utils.cc index 52949a95e5d..49ea78a3872 100644 --- a/sql/wsrep_utils.cc +++ b/sql/wsrep_utils.cc @@ -25,6 +25,7 @@ #include "wsrep_api.h" #include "wsrep_utils.h" #include "wsrep_mysqld.h" +#include "wsrep_thd.h" #include <sql_class.h> @@ -421,7 +422,8 @@ thd::thd (my_bool won) : init(), ptr(new THD(0)) if (ptr) { ptr->thread_stack= (char*) &ptr; - ptr->store_globals(); + wsrep_assign_from_threadvars(ptr); + wsrep_store_threadvars(ptr); ptr->variables.option_bits&= ~OPTION_BIN_LOG; // disable binlog ptr->variables.wsrep_on= won; ptr->security_ctx->master_access= ~(ulong)0; diff --git a/wsrep-lib b/wsrep-lib -Subproject 0f676bd89378c7c823cff7ae7cdaef3cafcca23 +Subproject 58aa3e821f575532870c5f76f6f1cf833458eed |