summaryrefslogtreecommitdiff
path: root/sql/wsrep_thd.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/wsrep_thd.cc')
-rw-r--r--sql/wsrep_thd.cc188
1 files changed, 169 insertions, 19 deletions
diff --git a/sql/wsrep_thd.cc b/sql/wsrep_thd.cc
index 413d7f97214..4dddb399bd1 100644
--- a/sql/wsrep_thd.cc
+++ b/sql/wsrep_thd.cc
@@ -93,10 +93,12 @@ void wsrep_client_rollback(THD *thd)
#define NUMBER_OF_FIELDS_TO_IDENTIFY_COORDINATOR 1
#define NUMBER_OF_FIELDS_TO_IDENTIFY_WORKER 2
-static rpl_group_info* wsrep_relay_group_init(const char* log_fname)
+static rpl_group_info* wsrep_relay_group_init(THD *thd, const char* log_fname)
{
Relay_log_info* rli= new Relay_log_info(false);
+ WSREP_DEBUG("wsrep_relay_group_init %s", log_fname);
+
if (!rli->relay_log.description_event_for_exec)
{
rli->relay_log.description_event_for_exec=
@@ -125,7 +127,7 @@ static rpl_group_info* wsrep_relay_group_init(const char* log_fname)
rli->mi = new Master_info(&connection_name, false);
struct rpl_group_info *rgi= new rpl_group_info(rli);
- rgi->thd= rli->sql_driver_thd= current_thd;
+ rgi->thd= rli->sql_driver_thd= thd;
if ((rgi->deferred_events_collecting= rli->mi->rpl_filter->is_on()))
{
@@ -150,11 +152,12 @@ static void wsrep_prepare_bf_thd(THD *thd, struct wsrep_thd_shadow* shadow)
else
thd->variables.option_bits&= ~(OPTION_BIN_LOG);
- if (!thd->wsrep_rgi) thd->wsrep_rgi= wsrep_relay_group_init("wsrep_relay");
+ if (!thd->wsrep_rgi) thd->wsrep_rgi= wsrep_relay_group_init(thd, "wsrep_relay");
/* thd->system_thread_info.rpl_sql_info isn't initialized. */
- thd->system_thread_info.rpl_sql_info=
- new rpl_sql_thread_info(thd->wsrep_rgi->rli->mi->rpl_filter);
+ if (!thd->slave_thread)
+ thd->system_thread_info.rpl_sql_info=
+ new rpl_sql_thread_info(thd->wsrep_rgi->rli->mi->rpl_filter);
thd->wsrep_exec_mode= REPL_RECV;
thd->net.vio= 0;
@@ -181,7 +184,8 @@ static void wsrep_return_from_bf_mode(THD *thd, struct wsrep_thd_shadow* shadow)
thd->user_time = shadow->user_time;
thd->reset_db(shadow->db, shadow->db_length);
- delete thd->system_thread_info.rpl_sql_info;
+ if (!thd->slave_thread)
+ delete thd->system_thread_info.rpl_sql_info;
delete thd->wsrep_rgi->rli->mi;
delete thd->wsrep_rgi->rli;
@@ -191,6 +195,109 @@ static void wsrep_return_from_bf_mode(THD *thd, struct wsrep_thd_shadow* shadow)
thd->set_row_count_func(shadow->row_count_func);
}
+void wsrep_replay_sp_transaction(THD* thd)
+{
+ DBUG_ENTER("wsrep_replay_sp_transaction");
+ mysql_mutex_assert_owner(&thd->LOCK_thd_data);
+ DBUG_ASSERT(thd->wsrep_conflict_state == MUST_REPLAY);
+ DBUG_ASSERT(wsrep_thd_trx_seqno(thd) > 0);
+
+ WSREP_DEBUG("replaying SP transaction %llu", thd->thread_id);
+ close_thread_tables(thd);
+ if (thd->locked_tables_mode && thd->lock)
+ {
+ WSREP_DEBUG("releasing table lock for replaying (%u)",
+ thd->thread_id);
+ thd->locked_tables_list.unlock_locked_tables(thd);
+ thd->variables.option_bits&= ~(OPTION_TABLE_LOCK);
+ }
+ thd->mdl_context.release_transactional_locks();
+
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
+ THD *replay_thd= new THD(true);
+ replay_thd->thread_stack= thd->thread_stack;
+
+ struct wsrep_thd_shadow shadow;
+ wsrep_prepare_bf_thd(replay_thd, &shadow);
+ WSREP_DEBUG("replaying set for %p rgi %p", replay_thd, replay_thd->wsrep_rgi); replay_thd->wsrep_trx_meta= thd->wsrep_trx_meta;
+ replay_thd->wsrep_ws_handle= thd->wsrep_ws_handle;
+ replay_thd->wsrep_ws_handle.trx_id= WSREP_UNDEFINED_TRX_ID;
+ replay_thd->wsrep_conflict_state= REPLAYING;
+
+ replay_thd->variables.option_bits|= OPTION_BEGIN;
+ replay_thd->server_status|= SERVER_STATUS_IN_TRANS;
+
+ thd->reset_globals();
+ replay_thd->store_globals();
+ wsrep_status_t rcode= wsrep->replay_trx(wsrep,
+ &replay_thd->wsrep_ws_handle,
+ (void*) replay_thd);
+
+ wsrep_return_from_bf_mode(replay_thd, &shadow);
+ replay_thd->reset_globals();
+ delete replay_thd;
+
+ mysql_mutex_lock(&thd->LOCK_thd_data);
+
+ thd->store_globals();
+
+ switch (rcode)
+ {
+ case WSREP_OK:
+ {
+ thd->wsrep_conflict_state= NO_CONFLICT;
+ thd->killed= NOT_KILLED;
+ wsrep_status_t rcode= wsrep->post_commit(wsrep, &thd->wsrep_ws_handle);
+ if (rcode != WSREP_OK)
+ {
+ WSREP_WARN("Post commit failed for SP replay: thd: %u error: %d",
+ thd->thread_id, rcode);
+ }
+ /* As replaying the transaction was successful, an error must not
+ be returned to client, so we need to reset the error state of
+ the diagnostics area */
+ thd->get_stmt_da()->reset_diagnostics_area();
+ break;
+ }
+ case WSREP_TRX_FAIL:
+ {
+ thd->wsrep_conflict_state= ABORTED;
+ wsrep_status_t rcode= wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle);
+ if (rcode != WSREP_OK)
+ {
+ WSREP_WARN("Post rollback failed for SP replay: thd: %u error: %d",
+ thd->thread_id, rcode);
+ }
+ if (thd->get_stmt_da()->is_set())
+ {
+ thd->get_stmt_da()->reset_diagnostics_area();
+ }
+ my_error(ER_LOCK_DEADLOCK, MYF(0));
+ break;
+ }
+ default:
+ WSREP_ERROR("trx_replay failed for: %d, schema: %s, query: %s",
+ rcode,
+ (thd->db ? thd->db : "(null)"),
+ WSREP_QUERY(thd));
+ /* we're now in inconsistent state, must abort */
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
+ unireg_abort(1);
+ break;
+ }
+
+ wsrep_cleanup_transaction(thd);
+
+ mysql_mutex_lock(&LOCK_wsrep_replaying);
+ wsrep_replaying--;
+ WSREP_DEBUG("replaying decreased: %d, thd: %u",
+ wsrep_replaying, thd->thread_id);
+ mysql_cond_broadcast(&COND_wsrep_replaying);
+ mysql_mutex_unlock(&LOCK_wsrep_replaying);
+
+ DBUG_VOID_RETURN;
+}
+
void wsrep_replay_transaction(THD *thd)
{
DBUG_ENTER("wsrep_replay_transaction");
@@ -416,29 +523,46 @@ static void wsrep_replication_process(THD *thd)
DBUG_VOID_RETURN;
}
-static bool create_wsrep_THD(wsrep_thread_args* args)
+static bool create_wsrep_THD(wsrep_thread_args* args, bool thread_count_lock)
{
- mysql_mutex_lock(&LOCK_thread_count);
+ if (!thread_count_lock)
+ mysql_mutex_lock(&LOCK_thread_count);
+
ulong old_wsrep_running_threads= wsrep_running_threads;
+
DBUG_ASSERT(args->thread_type == WSREP_APPLIER_THREAD ||
args->thread_type == WSREP_ROLLBACKER_THREAD);
+
bool res= mysql_thread_create(args->thread_type == WSREP_APPLIER_THREAD
? key_wsrep_applier : key_wsrep_rollbacker,
&args->thread_id, &connection_attrib,
start_wsrep_THD, (void*)args);
+
+ if (res)
+ {
+ WSREP_ERROR("Can't create wsrep thread");
+ }
+
/*
if starting a thread on server startup, wait until the this thread's THD
is fully initialized (otherwise a THD initialization code might
try to access a partially initialized server data structure - MDEV-8208).
*/
if (!mysqld_server_initialized)
+ {
while (old_wsrep_running_threads == wsrep_running_threads)
+ {
mysql_cond_wait(&COND_thread_count, &LOCK_thread_count);
- mysql_mutex_unlock(&LOCK_thread_count);
+ }
+ }
+
+ if (!thread_count_lock)
+ mysql_mutex_unlock(&LOCK_thread_count);
+
return res;
}
-void wsrep_create_appliers(long threads)
+bool wsrep_create_appliers(long threads, bool thread_count_lock)
{
if (!wsrep_connected)
{
@@ -450,26 +574,32 @@ void wsrep_create_appliers(long threads)
"connection at '%s'", wsrep_cluster_address);
assert(0);
}
- return;
+ return false;
}
- long wsrep_threads=0;
+ long wsrep_threads= 0;
+
while (wsrep_threads++ < threads) {
wsrep_thread_args* arg;
- if((arg = (wsrep_thread_args*)my_malloc(sizeof(wsrep_thread_args), MYF(0))) == NULL) {
+
+ if((arg= (wsrep_thread_args*)my_malloc(sizeof(wsrep_thread_args), MYF(0))) == NULL)
+ {
WSREP_ERROR("Can't allocate memory for wsrep replication thread %ld\n", wsrep_threads);
assert(0);
}
- arg->thread_type = WSREP_APPLIER_THREAD;
- arg->processor = wsrep_replication_process;
+ arg->thread_type= WSREP_APPLIER_THREAD;
+ arg->processor= wsrep_replication_process;
- if (create_wsrep_THD(arg)) {
- WSREP_WARN("Can't create thread to manage wsrep replication");
+ if (create_wsrep_THD(arg, thread_count_lock))
+ {
+ WSREP_ERROR("Can't create thread to manage wsrep replication");
my_free(arg);
- return;
+ return true;
}
}
+
+ return false;
}
static void wsrep_rollback_process(THD *thd)
@@ -565,7 +695,7 @@ void wsrep_create_rollbacker()
arg->processor = wsrep_rollback_process;
/* create rollbacker */
- if (create_wsrep_THD(arg)) {
+ if (create_wsrep_THD(arg, false)) {
WSREP_WARN("Can't create thread to manage wsrep rollback");
my_free(arg);
return;
@@ -746,3 +876,23 @@ bool wsrep_is_load_multi_commit(THD *thd)
{
return thd->wsrep_split_flag;
}
+
+void wsrep_report_bf_lock_wait(THD *thd,
+ unsigned long long trx_id)
+{
+ if (thd)
+ {
+ WSREP_ERROR("Thread %s trx_id: %llu thread: %ld "
+ "seqno: %lld query_state: %s conf_state: %s exec_mode: %s "
+ "applier: %d query: %s",
+ wsrep_thd_is_BF(thd, false) ? "BF" : "normal",
+ trx_id,
+ thd_get_thread_id(thd),
+ wsrep_thd_trx_seqno(thd),
+ wsrep_thd_query_state_str(thd),
+ wsrep_thd_conflict_state_str(thd),
+ wsrep_thd_exec_mode_str(thd),
+ thd->wsrep_applier,
+ wsrep_thd_query(thd));
+ }
+}