diff options
Diffstat (limited to 'sql/sql_class.cc')
-rw-r--r-- | sql/sql_class.cc | 161 |
1 files changed, 112 insertions, 49 deletions
diff --git a/sql/sql_class.cc b/sql/sql_class.cc index fa2f866a3f6..87e377c1819 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -66,8 +66,11 @@ #include "sql_callback.h" #include "lock.h" #include "wsrep_mysqld.h" -#include "wsrep_thd.h" #include "sql_connect.h" +#ifdef WITH_WSREP +#include "wsrep_thd.h" +#include "wsrep_trans_observer.h" +#endif /* WITH_WSREP */ #ifdef HAVE_SYS_SYSCALL_H #include <sys/syscall.h> @@ -639,16 +642,42 @@ THD::THD(my_thread_id id, bool is_wsrep_applier, bool skip_global_sys_var_lock) xid_hash_pins(0), m_tmp_tables_locked(false) #ifdef WITH_WSREP - , + , wsrep_applier(is_wsrep_applier), wsrep_applier_closing(false), wsrep_client_thread(false), - wsrep_apply_toi(false), + wsrep_retry_counter(0), + wsrep_PA_safe(true), + wsrep_retry_query(NULL), + wsrep_retry_query_len(0), + wsrep_retry_command(COM_CONNECT), + wsrep_consistency_check(NO_CONSISTENCY_CHECK), + wsrep_mysql_replicated(0), + wsrep_TOI_pre_query(NULL), + wsrep_TOI_pre_query_len(0), wsrep_po_handle(WSREP_PO_INITIALIZER), wsrep_po_cnt(0), wsrep_apply_format(0), - wsrep_ignore_table(false) -#endif + wsrep_apply_toi(false), + wsrep_rbr_buf(NULL), + wsrep_sync_wait_gtid(WSREP_GTID_UNDEFINED), + wsrep_affected_rows(0), + wsrep_has_ignored_error(false), + wsrep_replicate_GTID(false), + wsrep_ignore_table(false), + +/* wsrep-lib */ + m_wsrep_next_trx_id(WSREP_UNDEFINED_TRX_ID), + m_wsrep_mutex(LOCK_thd_data), + m_wsrep_cond(COND_wsrep_thd), + m_wsrep_client_service(this, m_wsrep_client_state), + m_wsrep_client_state(this, + m_wsrep_mutex, + m_wsrep_cond, + Wsrep_server_state::instance(), + m_wsrep_client_service, + wsrep::client_id(thread_id)) +#endif /*WITH_WSREP */ { ulong tmp; bzero(&variables, sizeof(variables)); @@ -771,22 +800,8 @@ THD::THD(my_thread_id id, bool is_wsrep_applier, bool skip_global_sys_var_lock) *scramble= '\0'; #ifdef WITH_WSREP - wsrep_ws_handle.trx_id = WSREP_UNDEFINED_TRX_ID; - wsrep_ws_handle.opaque = NULL; - wsrep_retry_counter = 0; - wsrep_PA_safe = true; - wsrep_retry_query = NULL; - wsrep_retry_query_len = 0; - wsrep_retry_command = COM_CONNECT; - wsrep_consistency_check = NO_CONSISTENCY_CHECK; - wsrep_mysql_replicated = 0; - wsrep_TOI_pre_query = NULL; - wsrep_TOI_pre_query_len = 0; + mysql_cond_init(key_COND_wsrep_thd, &COND_wsrep_thd, NULL); wsrep_info[sizeof(wsrep_info) - 1] = '\0'; /* make sure it is 0-terminated */ - wsrep_sync_wait_gtid = WSREP_GTID_UNDEFINED; - wsrep_affected_rows = 0; - wsrep_replicate_GTID = false; - wsrep_skip_wsrep_GTID = false; #endif /* Call to init() below requires fully initialized Open_tables_state. */ reset_open_tables_state(this); @@ -1049,10 +1064,25 @@ Sql_condition* THD::raise_condition(uint sql_errno, is_slave_error= 1; // needed to catch query errors during replication - if (!da->is_error()) +#ifdef WITH_WSREP + /* + With wsrep we allow converting BF abort error to warning if + errors are ignored. + */ + if (!is_fatal_error && + no_errors && + (wsrep_trx().bf_aborted() || wsrep_retry_counter)) { - set_row_count_func(-1); - da->set_error_status(sql_errno, msg, sqlstate, ucid, cond); + WSREP_DEBUG("BF abort error converted to warning"); + } + else +#endif /* WITH_WSREP */ + { + if (!da->is_error()) + { + set_row_count_func(-1); + da->set_error_status(sql_errno, msg, sqlstate, ucid, cond); + } } } @@ -1113,6 +1143,13 @@ void *thd_memdup(MYSQL_THD thd, const void* str, size_t size) extern "C" void thd_get_xid(const MYSQL_THD thd, MYSQL_XID *xid) { +#ifdef WITH_WSREP + if (!thd->wsrep_xid.is_null()) + { + *xid = *(MYSQL_XID *) &thd->wsrep_xid; + } + else +#endif /* WITH_WSREP */ *xid = *(MYSQL_XID *) &thd->transaction.xid_state.xid; } @@ -1221,12 +1258,9 @@ void THD::init(bool skip_lock) first_successful_insert_id_in_cur_stmt= 0; current_backup_stage= BACKUP_FINISHED; #ifdef WITH_WSREP - wsrep_exec_mode= wsrep_applier ? REPL_RECV : LOCAL_STATE; - wsrep_conflict_state= NO_CONFLICT; - wsrep_query_state= QUERY_IDLE; wsrep_last_query_id= 0; - wsrep_trx_meta.gtid= WSREP_GTID_UNDEFINED; - wsrep_trx_meta.depends_on= WSREP_SEQNO_UNDEFINED; + wsrep_xid.null(); + wsrep_skip_locking= FALSE; wsrep_converted_lock_session= false; wsrep_retry_counter= 0; wsrep_rgi= NULL; @@ -1235,10 +1269,10 @@ void THD::init(bool skip_lock) wsrep_mysql_replicated = 0; wsrep_TOI_pre_query = NULL; wsrep_TOI_pre_query_len = 0; - wsrep_sync_wait_gtid = WSREP_GTID_UNDEFINED; + wsrep_rbr_buf = NULL; wsrep_affected_rows = 0; + m_wsrep_next_trx_id = WSREP_UNDEFINED_TRX_ID; wsrep_replicate_GTID = false; - wsrep_skip_wsrep_GTID = false; #endif /* WITH_WSREP */ if (variables.sql_log_bin) @@ -1467,6 +1501,13 @@ void THD::cleanup(void) #error xid_state in the cache should be replaced by the allocated value } #endif +#ifdef WITH_WSREP + if (wsrep_cs().state() != wsrep::client_state::s_none) + { + wsrep_cs().cleanup(); + } + wsrep_client_thread= false; +#endif /* WITH_WSREP */ mysql_ha_cleanup(this); locked_tables_list.unlock_locked_tables(this); @@ -1587,6 +1628,9 @@ void THD::reset_for_reuse() #ifdef SIGNAL_WITH_VIO_CLOSE active_vio = 0; #endif +#ifdef WITH_WSREP + wsrep_free_status(this); +#endif /* WITH_WSREP */ } @@ -1613,15 +1657,21 @@ THD::~THD() THD is not deleted while they access it. The following mutex_lock ensures that no one else is using this THD and it's now safe to delete */ + if (WSREP(this)) mysql_mutex_lock(&LOCK_thd_data); mysql_mutex_lock(&LOCK_thd_kill); mysql_mutex_unlock(&LOCK_thd_kill); + if (WSREP(this)) mysql_mutex_unlock(&LOCK_thd_data); -#ifdef WITH_WSREP - delete wsrep_rgi; -#endif if (!free_connection_done) free_connection(); +#ifdef WITH_WSREP + if (wsrep_rgi != NULL) { + delete wsrep_rgi; + wsrep_rgi = NULL; + } + mysql_cond_destroy(&COND_wsrep_thd); +#endif mdl_context.destroy(); free_root(&transaction.mem_root,MYF(0)); @@ -1803,6 +1853,7 @@ void THD::awake_no_mutex(killed_state state_to_set) DBUG_PRINT("enter", ("this: %p current_thd: %p state: %d", this, current_thd, (int) state_to_set)); THD_CHECK_SENTRY(this); + if (WSREP(this)) mysql_mutex_assert_owner(&LOCK_thd_data); mysql_mutex_assert_owner(&LOCK_thd_kill); print_aborted_warning(3, "KILLED"); @@ -1835,7 +1886,8 @@ void THD::awake_no_mutex(killed_state state_to_set) } /* Interrupt target waiting inside a storage engine. */ - if (state_to_set != NOT_KILLED) + if (IF_WSREP(state_to_set != NOT_KILLED && !wsrep_is_bf_aborted(this), + state_to_set != NOT_KILLED)) ha_kill_query(this, thd_kill_level(this)); /* Broadcast a condition to kick the target if it is waiting on it. */ @@ -1988,12 +2040,6 @@ bool THD::notify_shared_lock(MDL_context_owner *ctx_in_use, if (!thd_table->needs_reopen()) { signalled|= mysql_lock_abort_for_thread(this, thd_table); - if (WSREP(this) && wsrep_thd_is_BF(this, FALSE)) - { - WSREP_DEBUG("remove_table_from_cache: %llu", - (unsigned long long) this->real_id); - wsrep_abort_thd((void *)this, (void *)in_use, FALSE); - } } } } @@ -2225,12 +2271,6 @@ void THD::cleanup_after_query() /* reset table map for multi-table update */ table_map_for_update= 0; m_binlog_invoker= INVOKER_NONE; -#ifdef WITH_WSREP - if (TOTAL_ORDER == wsrep_exec_mode) - { - wsrep_exec_mode = LOCAL_STATE; - } -#endif /* WITH_WSREP */ #ifndef EMBEDDED_LIBRARY if (rgi_slave) @@ -2238,7 +2278,6 @@ void THD::cleanup_after_query() #endif #ifdef WITH_WSREP - wsrep_sync_wait_gtid= WSREP_GTID_UNDEFINED; if (!in_active_multi_stmt_transaction()) wsrep_affected_rows= 0; #endif /* WITH_WSREP */ @@ -5007,8 +5046,9 @@ extern "C" int thd_binlog_format(const MYSQL_THD thd) if (WSREP(thd)) { /* for wsrep binlog format is meaningful also when binlogging is off */ - return (int) thd->wsrep_binlog_format(); + return (int) WSREP_BINLOG_FORMAT(thd->variables.binlog_format); } + if (mysql_bin_log.is_open() && (thd->variables.option_bits & OPTION_BIN_LOG)) return (int) thd->variables.binlog_format; return BINLOG_FORMAT_UNSPEC; @@ -5491,6 +5531,10 @@ void THD::set_query_and_id(char *query_arg, uint32 query_length_arg, set_query_inner(query_arg, query_length_arg, cs); mysql_mutex_unlock(&LOCK_thd_data); query_id= new_query_id; +#ifdef WITH_WSREP + set_wsrep_next_trx_id(query_id); + WSREP_DEBUG("assigned new next query and trx id: %lu", wsrep_next_trx_id()); +#endif /* WITH_WSREP */ } /** Assign a new value to thd->mysys_var. */ @@ -5936,9 +5980,27 @@ int THD::decide_logging_format(TABLE_LIST *tables) binlogging is off, or if the statement is filtered out from the binlog by filtering rules. */ +#ifdef WITH_WSREP + if (WSREP_CLIENT_NNULL(this) && variables.wsrep_trx_fragment_size > 0) + { + if (!is_current_stmt_binlog_format_row()) + { + my_message(ER_NOT_SUPPORTED_YET, + "Streaming replication not supported with " + "binlog_format=STATEMENT", MYF(0)); + DBUG_RETURN(-1); + } + } + + if ((WSREP_EMULATE_BINLOG_NNULL(this) || + (mysql_bin_log.is_open() && (variables.option_bits & OPTION_BIN_LOG))) && + !(wsrep_binlog_format() == BINLOG_FORMAT_STMT && + !binlog_filter->db_ok(db.str))) +#else if (mysql_bin_log.is_open() && (variables.option_bits & OPTION_BIN_LOG) && !(wsrep_binlog_format() == BINLOG_FORMAT_STMT && !binlog_filter->db_ok(db.str))) +#endif /* WITH_WSREP */ { if (is_bulk_op()) @@ -6260,7 +6322,8 @@ int THD::decide_logging_format(TABLE_LIST *tables) 5. Error: Cannot modify table that uses a storage engine limited to row-logging when binlog_format = STATEMENT */ - if (IF_WSREP((!WSREP(this) || wsrep_exec_mode == LOCAL_STATE),1)) + if (IF_WSREP((!WSREP(this) || + wsrep_cs().mode() == wsrep::client_state::m_local),1)) { my_error((error= ER_BINLOG_STMT_MODE_AND_ROW_ENGINE), MYF(0), ""); } |