diff options
Diffstat (limited to 'sql/wsrep_thd.cc')
-rw-r--r-- | sql/wsrep_thd.cc | 115 |
1 files changed, 36 insertions, 79 deletions
diff --git a/sql/wsrep_thd.cc b/sql/wsrep_thd.cc index 9d70875c027..7f1818def73 100644 --- a/sql/wsrep_thd.cc +++ b/sql/wsrep_thd.cc @@ -136,99 +136,60 @@ void wsrep_create_appliers(long threads) } } -static void wsrep_rollback_streaming_aborted_by_toi(THD *thd) +static void wsrep_remove_streaming_fragments(THD* thd, const char* ctx) { - WSREP_INFO("wsrep_rollback_streaming_aborted_by_toi"); - /* Set thd->event_scheduler.data temporarily to NULL to avoid - callbacks to threadpool wait_begin() during rollback. */ - auto saved_esd= thd->event_scheduler.data; - thd->event_scheduler.data= 0; - if (thd->wsrep_cs().mode() == wsrep::client_state::m_high_priority) - { - DBUG_ASSERT(!saved_esd); - DBUG_ASSERT(thd->wsrep_applier_service); - thd->wsrep_applier_service->rollback(wsrep::ws_handle(), - wsrep::ws_meta()); - thd->wsrep_applier_service->after_apply(); - /* Will free THD */ - Wsrep_server_state::instance().server_service(). - release_high_priority_service(thd->wsrep_applier_service); - } - else - { - mysql_mutex_lock(&thd->LOCK_thd_data); - /* prepare THD for rollback processing */ - thd->reset_for_next_command(true); - thd->lex->sql_command= SQLCOM_ROLLBACK; - mysql_mutex_unlock(&thd->LOCK_thd_data); - /* Perform a client rollback, restore globals and signal - the victim only when all the resources have been - released */ - thd->wsrep_cs().client_service().bf_rollback(); - wsrep_reset_threadvars(thd); - /* Assign saved event_scheduler.data back before letting - client to continue. */ - thd->event_scheduler.data= saved_esd; - thd->wsrep_cs().sync_rollback_complete(); - } + wsrep::transaction_id transaction_id(thd->wsrep_trx().id()); + Wsrep_storage_service* storage_service= wsrep_create_storage_service(thd, ctx); + storage_service->store_globals(); + storage_service->adopt_transaction(thd->wsrep_trx()); + storage_service->remove_fragments(); + storage_service->commit(wsrep::ws_handle(transaction_id, 0), + wsrep::ws_meta()); + Wsrep_server_state::instance().server_service() + .release_storage_service(storage_service); + wsrep_store_threadvars(thd); } -static void wsrep_rollback_high_priority(THD *thd) +static void wsrep_rollback_high_priority(THD *thd, THD *rollbacker) { - WSREP_INFO("rollbacker aborting SR thd: (%lld %llu)", - thd->thread_id, (long long)thd->real_id); + WSREP_DEBUG("Rollbacker aborting SR applier thd (%llu %lu)", + thd->thread_id, thd->real_id); + char* orig_thread_stack= thd->thread_stack; + thd->thread_stack= rollbacker->thread_stack; DBUG_ASSERT(thd->wsrep_cs().mode() == Wsrep_client_state::m_high_priority); /* Must be streaming and must have been removed from the server state streaming appliers map. */ DBUG_ASSERT(thd->wsrep_trx().is_streaming()); DBUG_ASSERT(!Wsrep_server_state::instance().find_streaming_applier( - thd->wsrep_trx().server_id(), - thd->wsrep_trx().id())); + thd->wsrep_trx().server_id(), + thd->wsrep_trx().id())); DBUG_ASSERT(thd->wsrep_applier_service); /* Fragment removal should happen before rollback to make the transaction non-observable in SR table after the rollback completes. For correctness the order does not matter here, but currently it is mandated by checks in some MTR tests. */ - wsrep::transaction_id transaction_id(thd->wsrep_trx().id()); - Wsrep_storage_service* storage_service= - static_cast<Wsrep_storage_service*>( - Wsrep_server_state::instance().server_service().storage_service( - *thd->wsrep_applier_service)); - storage_service->store_globals(); - storage_service->adopt_transaction(thd->wsrep_trx()); - storage_service->remove_fragments(); - storage_service->commit(wsrep::ws_handle(transaction_id, 0), - wsrep::ws_meta()); - Wsrep_server_state::instance().server_service().release_storage_service(storage_service); - wsrep_store_threadvars(thd); + wsrep_remove_streaming_fragments(thd, "high priority"); thd->wsrep_applier_service->rollback(wsrep::ws_handle(), wsrep::ws_meta()); thd->wsrep_applier_service->after_apply(); + thd->thread_stack= orig_thread_stack; + WSREP_DEBUG("rollbacker aborted thd: (%llu %lu)", + thd->thread_id, thd->real_id); /* Will free THD */ Wsrep_server_state::instance().server_service() .release_high_priority_service(thd->wsrep_applier_service); } -static void wsrep_rollback_local(THD *thd) +static void wsrep_rollback_local(THD *thd, THD *rollbacker) { - WSREP_INFO("Wsrep_rollback_local"); + WSREP_DEBUG("Rollbacker aborting local thd (%llu %lu)", + thd->thread_id, thd->real_id); + char* orig_thread_stack= thd->thread_stack; + thd->thread_stack= rollbacker->thread_stack; if (thd->wsrep_trx().is_streaming()) { - wsrep::transaction_id transaction_id(thd->wsrep_trx().id()); - Wsrep_storage_service* storage_service= - static_cast<Wsrep_storage_service*>( - Wsrep_server_state::instance().server_service(). - storage_service(thd->wsrep_cs().client_service())); - - storage_service->store_globals(); - storage_service->adopt_transaction(thd->wsrep_trx()); - storage_service->remove_fragments(); - storage_service->commit(wsrep::ws_handle(transaction_id, 0), - wsrep::ws_meta()); - Wsrep_server_state::instance().server_service(). - release_storage_service(storage_service); - wsrep_store_threadvars(thd); + wsrep_remove_streaming_fragments(thd, "local"); } /* Set thd->event_scheduler.data temporarily to NULL to avoid callbacks to threadpool wait_begin() during rollback. */ @@ -247,9 +208,10 @@ static void wsrep_rollback_local(THD *thd) /* Assign saved event_scheduler.data back before letting client to continue. */ thd->event_scheduler.data= saved_esd; + thd->thread_stack= orig_thread_stack; thd->wsrep_cs().sync_rollback_complete(); - WSREP_DEBUG("rollbacker aborted thd: (%llu %llu)", - thd->thread_id, (long long)thd->real_id); + WSREP_DEBUG("rollbacker aborted thd: (%llu %lu)", + thd->thread_id, thd->real_id); } static void wsrep_rollback_process(THD *rollbacker, @@ -286,18 +248,13 @@ static void wsrep_rollback_process(THD *rollbacker, /* Rollback methods below may free thd pointer. Do not try to access it after method returns. */ - if (thd->wsrep_trx().is_streaming() && - thd->wsrep_trx().bf_aborted_in_total_order()) - { - wsrep_rollback_streaming_aborted_by_toi(thd); - } - else if (wsrep_thd_is_applying(thd)) + if (wsrep_thd_is_applying(thd)) { - wsrep_rollback_high_priority(thd); + wsrep_rollback_high_priority(thd, rollbacker); } else { - wsrep_rollback_local(thd); + wsrep_rollback_local(thd, rollbacker); } wsrep_store_threadvars(rollbacker); thd_proc_info(rollbacker, "wsrep aborter idle"); @@ -345,7 +302,7 @@ void wsrep_fire_rollbacker(THD *thd) } -int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr, my_bool signal) +int wsrep_abort_thd(THD *bf_thd_ptr, THD *victim_thd_ptr, my_bool signal) { DBUG_ENTER("wsrep_abort_thd"); THD *victim_thd= (THD *) victim_thd_ptr; @@ -373,7 +330,7 @@ int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr, my_bool signal) bool wsrep_bf_abort(const THD* bf_thd, THD* victim_thd) { - WSREP_LOG_THD((THD*)bf_thd, "BF aborter before"); + WSREP_LOG_THD(bf_thd, "BF aborter before"); WSREP_LOG_THD(victim_thd, "victim before"); wsrep::seqno bf_seqno(bf_thd->wsrep_trx().ws_meta().seqno()); |