diff options
author | Daniele Sciascia <daniele.sciascia@galeracluster.com> | 2019-12-11 13:08:06 +0100 |
---|---|---|
committer | Jan Lindström <jan.lindstrom@mariadb.com> | 2019-12-11 14:08:06 +0200 |
commit | 72a5a4f1d51aca36a8c87b845a7a8fb615a6ca74 (patch) | |
tree | 93a4e52bd9915da49cdb4993e773c0220501c7a2 /sql | |
parent | 7c2c420b70b19cc02b5281127205e876f3919dad (diff) | |
download | mariadb-git-72a5a4f1d51aca36a8c87b845a7a8fb615a6ca74.tar.gz |
MDEV-20780 Fixes for failures on galera_sr_ddl_master (#1425)
Test galera_sr_ddl_master would sometimes fail due to leftover
streaming replication fragments. Rollbacker thread would attempt to
open streaming_log table to remove the fragments, but would fail in
check_stack_overrun(). Ultimately the check_stack_overrun() failure
was caused by rollbacker missing to switch the victim's THD thread
stack to rollbacker's thread stack.
Also in this patch:
- Remove duplicate functionality in rollbacker helper functions,
and extract rollbacker fragment removal into function
wsrep_remove_streaming_fragments()
- Reuse open_for_write() in wsrep_schema::remove_fragments
- Partially revert changes to galera_sr_ddl_master test from
commit 44a11a7c085f4f5a4042100df0828d54d596103d. Removed unnecessary
wait condition and isolation level setting
Diffstat (limited to 'sql')
-rw-r--r-- | sql/wsrep_schema.cc | 20 | ||||
-rw-r--r-- | sql/wsrep_server_service.cc | 29 | ||||
-rw-r--r-- | sql/wsrep_server_service.h | 10 | ||||
-rw-r--r-- | sql/wsrep_thd.cc | 111 |
4 files changed, 62 insertions, 108 deletions
diff --git a/sql/wsrep_schema.cc b/sql/wsrep_schema.cc index 066ea124fb7..619a535f916 100644 --- a/sql/wsrep_schema.cc +++ b/sql/wsrep_schema.cc @@ -1049,37 +1049,23 @@ int Wsrep_schema::remove_fragments(THD* thd, Wsrep_schema_impl::wsrep_off wsrep_off(thd); Wsrep_schema_impl::binlog_off binlog_off(thd); - /* - Open SR table for write. - Adopted from Rpl_info_table_access::open_table() - */ - uint flags= (MYSQL_OPEN_IGNORE_GLOBAL_READ_LOCK | - MYSQL_LOCK_IGNORE_GLOBAL_READ_ONLY | - MYSQL_OPEN_IGNORE_FLUSH | - MYSQL_LOCK_IGNORE_TIMEOUT); Query_tables_list query_tables_list_backup; Open_tables_backup open_tables_backup; thd->lex->reset_n_backup_query_tables_list(&query_tables_list_backup); thd->reset_n_backup_open_tables_state(&open_tables_backup); - TABLE_LIST tables; - LEX_CSTRING schema_str= { wsrep_schema_str.c_str(), wsrep_schema_str.length() }; - LEX_CSTRING table_str= { sr_table_str.c_str(), sr_table_str.length() }; - tables.init_one_table(&schema_str, - &table_str, 0, TL_WRITE); - if (!open_n_lock_single_table(thd, &tables, tables.lock_type, flags)) + TABLE* frag_table= 0; + if (Wsrep_schema_impl::open_for_write(thd, sr_table_str.c_str(), &frag_table)) { - WSREP_DEBUG("Failed to open SR table for access"); ret= 1; } else { - tables.table->use_all_columns(); for (std::vector<wsrep::seqno>::const_iterator i= fragments.begin(); i != fragments.end(); ++i) { if (remove_fragment(thd, - tables.table, + frag_table, server_id, transaction_id, *i)) { diff --git a/sql/wsrep_server_service.cc b/sql/wsrep_server_service.cc index bfb85e3d0ab..aa288e67420 100644 --- a/sql/wsrep_server_service.cc +++ b/sql/wsrep_server_service.cc @@ -42,15 +42,13 @@ static void init_service_thd(THD* thd, char* thread_stack) thd->reset_for_next_command(true); } -wsrep::storage_service* Wsrep_server_service::storage_service( - wsrep::client_service& client_service) +Wsrep_storage_service* +wsrep_create_storage_service(THD* orig_THD, const char* ctx) { - Wsrep_client_service& cs= - static_cast<Wsrep_client_service&>(client_service); - THD* thd= new THD(next_thread_id(), true); - init_service_thd(thd, cs.m_thd->thread_stack); - WSREP_DEBUG("Created storage service with thread id %llu", - thd->thread_id); + THD* thd= new THD(true, true); + init_service_thd(thd, orig_THD->thread_stack); + WSREP_DEBUG("Created storage service in %s context with thread id %llu", + ctx, thd->thread_id); /* Use variables from the current thd attached to client_service. This is because we need to be able to BF abort storage access operations. */ @@ -59,16 +57,19 @@ wsrep::storage_service* Wsrep_server_service::storage_service( } wsrep::storage_service* Wsrep_server_service::storage_service( + wsrep::client_service& client_service) +{ + Wsrep_client_service& cs= + static_cast<Wsrep_client_service&>(client_service); + return wsrep_create_storage_service(cs.m_thd, "local"); +} + +wsrep::storage_service* Wsrep_server_service::storage_service( wsrep::high_priority_service& high_priority_service) { Wsrep_high_priority_service& hps= static_cast<Wsrep_high_priority_service&>(high_priority_service); - THD* thd= new THD(next_thread_id(), true); - init_service_thd(thd, hps.m_thd->thread_stack); - WSREP_DEBUG("Created high priority storage service with thread id %llu", - thd->thread_id); - wsrep_assign_from_threadvars(thd); - return new Wsrep_storage_service(thd); + return wsrep_create_storage_service(hps.m_thd, "high priority"); } void Wsrep_server_service::release_storage_service( diff --git a/sql/wsrep_server_service.h b/sql/wsrep_server_service.h index 6336fe2c473..4017c9b2d58 100644 --- a/sql/wsrep_server_service.h +++ b/sql/wsrep_server_service.h @@ -87,4 +87,14 @@ class Wsrep_applier_service; Wsrep_applier_service* wsrep_create_streaming_applier(THD *orig_thd, const char *ctx); +/** + Helper method to create new storage service. + + @param orig_thd Original thd context to copy operation context from. + @param ctx Context string for debug logging. +*/ +class Wsrep_storage_service; +Wsrep_storage_service* +wsrep_create_storage_service(THD *orig_thd, const char *ctx); + #endif /* WSREP_SERVER_SERVICE */ diff --git a/sql/wsrep_thd.cc b/sql/wsrep_thd.cc index 8b535b41a43..7f1818def73 100644 --- a/sql/wsrep_thd.cc +++ b/sql/wsrep_thd.cc @@ -136,99 +136,60 @@ void wsrep_create_appliers(long threads) } } -static void wsrep_rollback_streaming_aborted_by_toi(THD *thd) +static void wsrep_remove_streaming_fragments(THD* thd, const char* ctx) { - WSREP_INFO("wsrep_rollback_streaming_aborted_by_toi"); - /* Set thd->event_scheduler.data temporarily to NULL to avoid - callbacks to threadpool wait_begin() during rollback. */ - auto saved_esd= thd->event_scheduler.data; - thd->event_scheduler.data= 0; - if (thd->wsrep_cs().mode() == wsrep::client_state::m_high_priority) - { - DBUG_ASSERT(!saved_esd); - DBUG_ASSERT(thd->wsrep_applier_service); - thd->wsrep_applier_service->rollback(wsrep::ws_handle(), - wsrep::ws_meta()); - thd->wsrep_applier_service->after_apply(); - /* Will free THD */ - Wsrep_server_state::instance().server_service(). - release_high_priority_service(thd->wsrep_applier_service); - } - else - { - mysql_mutex_lock(&thd->LOCK_thd_data); - /* prepare THD for rollback processing */ - thd->reset_for_next_command(true); - thd->lex->sql_command= SQLCOM_ROLLBACK; - mysql_mutex_unlock(&thd->LOCK_thd_data); - /* Perform a client rollback, restore globals and signal - the victim only when all the resources have been - released */ - thd->wsrep_cs().client_service().bf_rollback(); - wsrep_reset_threadvars(thd); - /* Assign saved event_scheduler.data back before letting - client to continue. */ - thd->event_scheduler.data= saved_esd; - thd->wsrep_cs().sync_rollback_complete(); - } + wsrep::transaction_id transaction_id(thd->wsrep_trx().id()); + Wsrep_storage_service* storage_service= wsrep_create_storage_service(thd, ctx); + storage_service->store_globals(); + storage_service->adopt_transaction(thd->wsrep_trx()); + storage_service->remove_fragments(); + storage_service->commit(wsrep::ws_handle(transaction_id, 0), + wsrep::ws_meta()); + Wsrep_server_state::instance().server_service() + .release_storage_service(storage_service); + wsrep_store_threadvars(thd); } -static void wsrep_rollback_high_priority(THD *thd) +static void wsrep_rollback_high_priority(THD *thd, THD *rollbacker) { - WSREP_INFO("rollbacker aborting SR thd: (%lld %llu)", - thd->thread_id, (long long)thd->real_id); + WSREP_DEBUG("Rollbacker aborting SR applier thd (%llu %lu)", + thd->thread_id, thd->real_id); + char* orig_thread_stack= thd->thread_stack; + thd->thread_stack= rollbacker->thread_stack; DBUG_ASSERT(thd->wsrep_cs().mode() == Wsrep_client_state::m_high_priority); /* Must be streaming and must have been removed from the server state streaming appliers map. */ DBUG_ASSERT(thd->wsrep_trx().is_streaming()); DBUG_ASSERT(!Wsrep_server_state::instance().find_streaming_applier( - thd->wsrep_trx().server_id(), - thd->wsrep_trx().id())); + thd->wsrep_trx().server_id(), + thd->wsrep_trx().id())); DBUG_ASSERT(thd->wsrep_applier_service); /* Fragment removal should happen before rollback to make the transaction non-observable in SR table after the rollback completes. For correctness the order does not matter here, but currently it is mandated by checks in some MTR tests. */ - wsrep::transaction_id transaction_id(thd->wsrep_trx().id()); - Wsrep_storage_service* storage_service= - static_cast<Wsrep_storage_service*>( - Wsrep_server_state::instance().server_service().storage_service( - *thd->wsrep_applier_service)); - storage_service->store_globals(); - storage_service->adopt_transaction(thd->wsrep_trx()); - storage_service->remove_fragments(); - storage_service->commit(wsrep::ws_handle(transaction_id, 0), - wsrep::ws_meta()); - Wsrep_server_state::instance().server_service().release_storage_service(storage_service); - wsrep_store_threadvars(thd); + wsrep_remove_streaming_fragments(thd, "high priority"); thd->wsrep_applier_service->rollback(wsrep::ws_handle(), wsrep::ws_meta()); thd->wsrep_applier_service->after_apply(); + thd->thread_stack= orig_thread_stack; + WSREP_DEBUG("rollbacker aborted thd: (%llu %lu)", + thd->thread_id, thd->real_id); /* Will free THD */ Wsrep_server_state::instance().server_service() .release_high_priority_service(thd->wsrep_applier_service); } -static void wsrep_rollback_local(THD *thd) +static void wsrep_rollback_local(THD *thd, THD *rollbacker) { - WSREP_INFO("Wsrep_rollback_local"); + WSREP_DEBUG("Rollbacker aborting local thd (%llu %lu)", + thd->thread_id, thd->real_id); + char* orig_thread_stack= thd->thread_stack; + thd->thread_stack= rollbacker->thread_stack; if (thd->wsrep_trx().is_streaming()) { - wsrep::transaction_id transaction_id(thd->wsrep_trx().id()); - Wsrep_storage_service* storage_service= - static_cast<Wsrep_storage_service*>( - Wsrep_server_state::instance().server_service(). - storage_service(thd->wsrep_cs().client_service())); - - storage_service->store_globals(); - storage_service->adopt_transaction(thd->wsrep_trx()); - storage_service->remove_fragments(); - storage_service->commit(wsrep::ws_handle(transaction_id, 0), - wsrep::ws_meta()); - Wsrep_server_state::instance().server_service(). - release_storage_service(storage_service); - wsrep_store_threadvars(thd); + wsrep_remove_streaming_fragments(thd, "local"); } /* Set thd->event_scheduler.data temporarily to NULL to avoid callbacks to threadpool wait_begin() during rollback. */ @@ -247,9 +208,10 @@ static void wsrep_rollback_local(THD *thd) /* Assign saved event_scheduler.data back before letting client to continue. */ thd->event_scheduler.data= saved_esd; + thd->thread_stack= orig_thread_stack; thd->wsrep_cs().sync_rollback_complete(); - WSREP_DEBUG("rollbacker aborted thd: (%llu %llu)", - thd->thread_id, (long long)thd->real_id); + WSREP_DEBUG("rollbacker aborted thd: (%llu %lu)", + thd->thread_id, thd->real_id); } static void wsrep_rollback_process(THD *rollbacker, @@ -286,18 +248,13 @@ static void wsrep_rollback_process(THD *rollbacker, /* Rollback methods below may free thd pointer. Do not try to access it after method returns. */ - if (thd->wsrep_trx().is_streaming() && - thd->wsrep_trx().bf_aborted_in_total_order()) - { - wsrep_rollback_streaming_aborted_by_toi(thd); - } - else if (wsrep_thd_is_applying(thd)) + if (wsrep_thd_is_applying(thd)) { - wsrep_rollback_high_priority(thd); + wsrep_rollback_high_priority(thd, rollbacker); } else { - wsrep_rollback_local(thd); + wsrep_rollback_local(thd, rollbacker); } wsrep_store_threadvars(rollbacker); thd_proc_info(rollbacker, "wsrep aborter idle"); |