diff options
author | Julius Goryavsky <julius.goryavsky@mariadb.com> | 2020-06-05 18:32:37 +0200 |
---|---|---|
committer | Julius Goryavsky <julius.goryavsky@mariadb.com> | 2020-06-05 18:32:37 +0200 |
commit | 5f55f69e4a237deb98cd4fc7d6ea9a17a8fe63bb (patch) | |
tree | d9031bb02e1807d0324b9f802dcf72eb8e3768a6 /sql/wsrep_thd.cc | |
parent | efc70da5fd0459ff44153529d13651741cc32bc4 (diff) | |
parent | 3f019d1771d4e6e21f941b72a83e0663f15b376f (diff) | |
download | mariadb-git-5f55f69e4a237deb98cd4fc7d6ea9a17a8fe63bb.tar.gz |
Merge 10.1 into 10.2
Diffstat (limited to 'sql/wsrep_thd.cc')
-rw-r--r-- | sql/wsrep_thd.cc | 111 |
1 files changed, 108 insertions, 3 deletions
diff --git a/sql/wsrep_thd.cc b/sql/wsrep_thd.cc index a7b6e8ff1b1..1e60088c5f1 100644 --- a/sql/wsrep_thd.cc +++ b/sql/wsrep_thd.cc @@ -93,10 +93,12 @@ void wsrep_client_rollback(THD *thd) #define NUMBER_OF_FIELDS_TO_IDENTIFY_COORDINATOR 1 #define NUMBER_OF_FIELDS_TO_IDENTIFY_WORKER 2 -static rpl_group_info* wsrep_relay_group_init(const char* log_fname) +static rpl_group_info* wsrep_relay_group_init(THD *thd, const char* log_fname) { Relay_log_info* rli= new Relay_log_info(false); + WSREP_DEBUG("wsrep_relay_group_init %s", log_fname); + if (!rli->relay_log.description_event_for_exec) { rli->relay_log.description_event_for_exec= @@ -125,7 +127,7 @@ static rpl_group_info* wsrep_relay_group_init(const char* log_fname) rli->mi = new Master_info(&connection_name, false); struct rpl_group_info *rgi= new rpl_group_info(rli); - rgi->thd= rli->sql_driver_thd= current_thd; + rgi->thd= rli->sql_driver_thd= thd; if ((rgi->deferred_events_collecting= rli->mi->rpl_filter->is_on())) { @@ -150,7 +152,7 @@ static void wsrep_prepare_bf_thd(THD *thd, struct wsrep_thd_shadow* shadow) else thd->variables.option_bits&= ~(OPTION_BIN_LOG); - if (!thd->wsrep_rgi) thd->wsrep_rgi= wsrep_relay_group_init("wsrep_relay"); + if (!thd->wsrep_rgi) thd->wsrep_rgi= wsrep_relay_group_init(thd, "wsrep_relay"); /* thd->system_thread_info.rpl_sql_info isn't initialized. */ if (!thd->slave_thread) @@ -193,6 +195,109 @@ static void wsrep_return_from_bf_mode(THD *thd, struct wsrep_thd_shadow* shadow) thd->set_row_count_func(shadow->row_count_func); } +void wsrep_replay_sp_transaction(THD* thd) +{ + DBUG_ENTER("wsrep_replay_sp_transaction"); + mysql_mutex_assert_owner(&thd->LOCK_thd_data); + DBUG_ASSERT(thd->wsrep_conflict_state == MUST_REPLAY); + DBUG_ASSERT(wsrep_thd_trx_seqno(thd) > 0); + + WSREP_DEBUG("replaying SP transaction %llu", thd->thread_id); + close_thread_tables(thd); + if (thd->locked_tables_mode && thd->lock) + { + WSREP_DEBUG("releasing table lock for replaying (%u)", + thd->thread_id); + thd->locked_tables_list.unlock_locked_tables(thd); + thd->variables.option_bits&= ~(OPTION_TABLE_LOCK); + } + thd->mdl_context.release_transactional_locks(); + + mysql_mutex_unlock(&thd->LOCK_thd_data); + THD *replay_thd= new THD(true); + replay_thd->thread_stack= thd->thread_stack; + + struct wsrep_thd_shadow shadow; + wsrep_prepare_bf_thd(replay_thd, &shadow); + WSREP_DEBUG("replaying set for %p rgi %p", replay_thd, replay_thd->wsrep_rgi); replay_thd->wsrep_trx_meta= thd->wsrep_trx_meta; + replay_thd->wsrep_ws_handle= thd->wsrep_ws_handle; + replay_thd->wsrep_ws_handle.trx_id= WSREP_UNDEFINED_TRX_ID; + replay_thd->wsrep_conflict_state= REPLAYING; + + replay_thd->variables.option_bits|= OPTION_BEGIN; + replay_thd->server_status|= SERVER_STATUS_IN_TRANS; + + thd->reset_globals(); + replay_thd->store_globals(); + wsrep_status_t rcode= wsrep->replay_trx(wsrep, + &replay_thd->wsrep_ws_handle, + (void*) replay_thd); + + wsrep_return_from_bf_mode(replay_thd, &shadow); + replay_thd->reset_globals(); + delete replay_thd; + + mysql_mutex_lock(&thd->LOCK_thd_data); + + thd->store_globals(); + + switch (rcode) + { + case WSREP_OK: + { + thd->wsrep_conflict_state= NO_CONFLICT; + thd->killed= NOT_KILLED; + wsrep_status_t rcode= wsrep->post_commit(wsrep, &thd->wsrep_ws_handle); + if (rcode != WSREP_OK) + { + WSREP_WARN("Post commit failed for SP replay: thd: %u error: %d", + thd->thread_id, rcode); + } + /* As replaying the transaction was successful, an error must not + be returned to client, so we need to reset the error state of + the diagnostics area */ + thd->get_stmt_da()->reset_diagnostics_area(); + break; + } + case WSREP_TRX_FAIL: + { + thd->wsrep_conflict_state= ABORTED; + wsrep_status_t rcode= wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle); + if (rcode != WSREP_OK) + { + WSREP_WARN("Post rollback failed for SP replay: thd: %u error: %d", + thd->thread_id, rcode); + } + if (thd->get_stmt_da()->is_set()) + { + thd->get_stmt_da()->reset_diagnostics_area(); + } + my_error(ER_LOCK_DEADLOCK, MYF(0)); + break; + } + default: + WSREP_ERROR("trx_replay failed for: %d, schema: %s, query: %s", + rcode, + (thd->db ? thd->db : "(null)"), + WSREP_QUERY(thd)); + /* we're now in inconsistent state, must abort */ + mysql_mutex_unlock(&thd->LOCK_thd_data); + unireg_abort(1); + break; + } + + wsrep_cleanup_transaction(thd); + + mysql_mutex_lock(&LOCK_wsrep_replaying); + wsrep_replaying--; + WSREP_DEBUG("replaying decreased: %d, thd: %u", + wsrep_replaying, thd->thread_id); + mysql_cond_broadcast(&COND_wsrep_replaying); + mysql_mutex_unlock(&LOCK_wsrep_replaying); + + DBUG_VOID_RETURN; +} + void wsrep_replay_transaction(THD *thd) { DBUG_ENTER("wsrep_replay_transaction"); |