diff options
Diffstat (limited to 'sql/wsrep_hton.cc')
-rw-r--r-- | sql/wsrep_hton.cc | 116 |
1 files changed, 74 insertions, 42 deletions
diff --git a/sql/wsrep_hton.cc b/sql/wsrep_hton.cc index 1676daab5fe..0a2264ac03c 100644 --- a/sql/wsrep_hton.cc +++ b/sql/wsrep_hton.cc @@ -1,4 +1,4 @@ -/* Copyright 2008 Codership Oy <http://www.codership.com> +/* Copyright 2008-2015 Codership Oy <http://www.codership.com> This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -19,11 +19,11 @@ #include <sql_class.h> #include "wsrep_mysqld.h" #include "wsrep_binlog.h" +#include "wsrep_xid.h" #include <cstdio> #include <cstdlib> +#include "debug_sync.h" -extern handlerton *binlog_hton; -extern int binlog_close_connection(handlerton *hton, THD *thd); extern ulonglong thd_to_trx_id(THD *thd); extern "C" int thd_binlog_format(const MYSQL_THD thd); @@ -37,11 +37,15 @@ enum wsrep_trx_status wsrep_run_wsrep_commit(THD *thd, handlerton *hton, */ void wsrep_cleanup_transaction(THD *thd) { + if (!WSREP(thd)) return; + if (wsrep_emulate_bin_log) thd_binlog_trx_reset(thd); thd->wsrep_ws_handle.trx_id= WSREP_UNDEFINED_TRX_ID; thd->wsrep_trx_meta.gtid= WSREP_GTID_UNDEFINED; thd->wsrep_trx_meta.depends_on= WSREP_SEQNO_UNDEFINED; thd->wsrep_exec_mode= LOCAL_STATE; + thd->wsrep_affected_rows= 0; + thd->wsrep_skip_wsrep_GTID= false; return; } @@ -70,18 +74,30 @@ void wsrep_register_hton(THD* thd, bool all) { if (thd->wsrep_exec_mode != TOTAL_ORDER && !thd->wsrep_apply_toi) { + if (thd->wsrep_exec_mode == LOCAL_STATE && + (thd_sql_command(thd) == SQLCOM_OPTIMIZE || + thd_sql_command(thd) == SQLCOM_ANALYZE || + thd_sql_command(thd) == SQLCOM_REPAIR) && + thd->lex->no_write_to_binlog == 1) + { + WSREP_DEBUG("Skipping wsrep_register_hton for LOCAL sql admin command : %s", + thd->query()); + return; + } + THD_TRANS *trans=all ? &thd->transaction.all : &thd->transaction.stmt; for (Ha_trx_info *i= trans->ha_list; WSREP(thd) && i; i = i->next()) { - if (i->ht()->db_type == DB_TYPE_INNODB) + if ((i->ht()->db_type == DB_TYPE_INNODB) || + (i->ht()->db_type == DB_TYPE_TOKUDB)) { trans_register_ha(thd, all, wsrep_hton); /* follow innodb read/write settting - * but, as an exception: CTAS with empty result set will not be + * but, as an exception: CTAS with empty result set will not be * replicated unless we declare wsrep hton as read/write here */ - if (i->is_trx_read_write() || + if (i->is_trx_read_write() || (thd->lex->sql_command == SQLCOM_CREATE_TABLE && thd->wsrep_exec_mode == LOCAL_STATE)) { @@ -110,7 +126,7 @@ void wsrep_post_commit(THD* thd, bool all) { DBUG_PRINT("wsrep", ("set committed fail")); WSREP_WARN("set committed fail: %llu %d", - (long long)thd->real_id, thd->stmt_da->status()); + (long long)thd->real_id, thd->get_stmt_da()->status()); } wsrep_cleanup_transaction(thd); break; @@ -132,7 +148,7 @@ void wsrep_post_commit(THD* thd, bool all) wsrep && wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle)) { WSREP_WARN("post_rollback fail: %llu %d", - (long long)thd->thread_id, thd->stmt_da->status()); + (long long)thd->thread_id, thd->get_stmt_da()->status()); } wsrep_cleanup_transaction(thd); break; @@ -157,10 +173,7 @@ wsrep_close_connection(handlerton* hton, THD* thd) { DBUG_RETURN(0); } - - if (wsrep_emulate_bin_log && thd_get_ha_data(thd, binlog_hton) != NULL) - binlog_hton->close_connection (binlog_hton, thd); - DBUG_RETURN(0); + DBUG_RETURN(wsrep_binlog_close_connection (thd)); } /* @@ -203,10 +216,11 @@ static int wsrep_savepoint_set(handlerton *hton, THD *thd, void *sv) DBUG_RETURN(0); } - if (!wsrep_emulate_bin_log) return 0; - int rcode = binlog_hton->savepoint_set(binlog_hton, thd, sv); - return rcode; + if (!wsrep_emulate_bin_log) DBUG_RETURN(0); + int rcode = wsrep_binlog_savepoint_set(thd, sv); + DBUG_RETURN(rcode); } + static int wsrep_savepoint_rollback(handlerton *hton, THD *thd, void *sv) { DBUG_ENTER("wsrep_savepoint_rollback"); @@ -216,9 +230,9 @@ static int wsrep_savepoint_rollback(handlerton *hton, THD *thd, void *sv) DBUG_RETURN(0); } - if (!wsrep_emulate_bin_log) return 0; - int rcode = binlog_hton->savepoint_rollback(binlog_hton, thd, sv); - return rcode; + if (!wsrep_emulate_bin_log) DBUG_RETURN(0); + int rcode = wsrep_binlog_savepoint_rollback(thd, sv); + DBUG_RETURN(rcode); } static int wsrep_rollback(handlerton *hton, THD *thd, bool all) @@ -231,14 +245,25 @@ static int wsrep_rollback(handlerton *hton, THD *thd, bool all) } mysql_mutex_lock(&thd->LOCK_wsrep_thd); + switch (thd->wsrep_exec_mode) + { + case TOTAL_ORDER: + case REPL_RECV: + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + WSREP_DEBUG("Avoiding wsrep rollback for failed DDL: %s", thd->query()); + DBUG_RETURN(0); + default: break; + } + if ((all || !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) && - (thd->variables.wsrep_on && thd->wsrep_conflict_state != MUST_REPLAY)) + thd->wsrep_conflict_state != MUST_REPLAY) { - if (wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle)) + if (WSREP(thd) && wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle)) { DBUG_PRINT("wsrep", ("setting rollback fail")); - WSREP_ERROR("settting rollback fail: thd: %llu SQL: %s", - (long long)thd->real_id, thd->query()); + WSREP_ERROR("settting rollback fail: thd: %llu, schema: %s, SQL: %s", + (long long)thd->real_id, (thd->db ? thd->db : "(null)"), + thd->query()); } wsrep_cleanup_transaction(thd); } @@ -274,13 +299,12 @@ int wsrep_commit(handlerton *hton, THD *thd, bool all) Transaction didn't go through wsrep->pre_commit() so just roll back possible changes to clean state. */ - if (WSREP_PROVIDER_EXISTS) { - if (wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle)) - { - DBUG_PRINT("wsrep", ("setting rollback fail")); - WSREP_ERROR("settting rollback fail: thd: %llu SQL: %s", - (long long)thd->real_id, thd->query()); - } + if (WSREP(thd) && wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle)) + { + DBUG_PRINT("wsrep", ("setting rollback fail")); + WSREP_ERROR("settting rollback fail: thd: %llu, schema: %s, SQL: %s", + (long long)thd->real_id, (thd->db ? thd->db : "(null)"), + thd->query()); } wsrep_cleanup_transaction(thd); } @@ -301,13 +325,15 @@ wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all) IO_CACHE *cache; int replay_round= 0; - if (thd->stmt_da->is_error()) { - WSREP_ERROR("commit issue, error: %d %s", - thd->stmt_da->sql_errno(), thd->stmt_da->message()); + if (thd->get_stmt_da()->is_error()) { + WSREP_DEBUG("commit issue, error: %d %s", + thd->get_stmt_da()->sql_errno(), thd->get_stmt_da()->message()); } DBUG_ENTER("wsrep_run_wsrep_commit"); + DEBUG_SYNC(thd, "wsrep_before_replication"); + if (thd->slave_thread && !opt_log_slave_updates) DBUG_RETURN(WSREP_TRX_OK); if (thd->wsrep_exec_mode == REPL_RECV) { @@ -353,7 +379,7 @@ wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all) while (wsrep_replaying > 0 && thd->wsrep_conflict_state == NO_CONFLICT && - thd->killed == NOT_KILLED && + thd->killed == NOT_KILLED && !shutdown_in_progress) { @@ -416,8 +442,8 @@ wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all) if (data_len == 0) { - if (thd->stmt_da->is_ok() && - thd->stmt_da->affected_rows() > 0 && + if (thd->get_stmt_da()->is_ok() && + thd->get_stmt_da()->affected_rows() > 0 && !binlog_filter->is_on()) { WSREP_DEBUG("empty rbr buffer, query: %s, " @@ -425,7 +451,7 @@ wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all) "changed tables: %d, " "sql_log_bin: %d, " "wsrep status (%d %d %d)", - thd->query(), thd->stmt_da->affected_rows(), + thd->query(), thd->get_stmt_da()->affected_rows(), stmt_has_updated_trans_table(thd), thd->variables.sql_log_bin, thd->wsrep_exec_mode, thd->wsrep_query_state, thd->wsrep_conflict_state); @@ -441,9 +467,11 @@ wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all) if (WSREP_UNDEFINED_TRX_ID == thd->wsrep_ws_handle.trx_id) { WSREP_WARN("SQL statement was ineffective, THD: %lu, buf: %zu\n" - "QUERY: %s\n" - " => Skipping replication", - thd->thread_id, data_len, thd->query()); + "schema: %s \n" + "QUERY: %s\n" + " => Skipping replication", + thd->thread_id, data_len, + (thd->db ? thd->db : "(null)"), thd->query()); rcode = WSREP_TRX_FAIL; } else if (!rcode) @@ -458,14 +486,15 @@ wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all) &thd->wsrep_trx_meta); if (rcode == WSREP_TRX_MISSING) { - WSREP_WARN("Transaction missing in provider, thd: %ld, SQL: %s", - thd->thread_id, thd->query()); + WSREP_WARN("Transaction missing in provider, thd: %ld, schema: %s, SQL: %s", + thd->thread_id, (thd->db ? thd->db : "(null)"), thd->query()); rcode = WSREP_TRX_FAIL; } else if (rcode == WSREP_BF_ABORT) { WSREP_DEBUG("thd %lu seqno %lld BF aborted by provider, will replay", thd->thread_id, (long long)thd->wsrep_trx_meta.gtid.seqno); mysql_mutex_lock(&thd->LOCK_wsrep_thd); thd->wsrep_conflict_state = MUST_REPLAY; + DBUG_ASSERT(wsrep_thd_trx_seqno(thd) > 0); mysql_mutex_unlock(&thd->LOCK_wsrep_thd); mysql_mutex_lock(&LOCK_wsrep_replaying); wsrep_replaying++; @@ -481,6 +510,9 @@ wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all) } mysql_mutex_lock(&thd->LOCK_wsrep_thd); + + DEBUG_SYNC(thd, "wsrep_after_replication"); + switch(rcode) { case 0: /* @@ -505,7 +537,7 @@ wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all) if (thd->transaction.xid_state.xid.get_my_xid()) { wsrep_xid_init(&thd->transaction.xid_state.xid, - &thd->wsrep_trx_meta.gtid.uuid, + thd->wsrep_trx_meta.gtid.uuid, thd->wsrep_trx_meta.gtid.seqno); } DBUG_PRINT("wsrep", ("replicating commit success")); |