diff options
Diffstat (limited to 'sql/sql_class.cc')
-rw-r--r-- | sql/sql_class.cc | 131 |
1 files changed, 113 insertions, 18 deletions
diff --git a/sql/sql_class.cc b/sql/sql_class.cc index 8d6ddc0bb08..443da8557ad 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -64,6 +64,8 @@ #include "sql_parse.h" // is_update_query #include "sql_callback.h" #include "lock.h" +#include "wsrep_mysqld.h" +#include "wsrep_thd.h" #include "sql_connect.h" /* @@ -859,7 +861,7 @@ bool Drop_table_error_handler::handle_condition(THD *thd, } -THD::THD() +THD::THD(bool is_applier) :Statement(&main_lex, &main_mem_root, STMT_CONVENTIONAL_EXECUTION, /* statement id */ 0), rli_fake(0), rgi_fake(0), rgi_slave(NULL), @@ -889,6 +891,16 @@ THD::THD() bootstrap(0), derived_tables_processing(FALSE), spcont(NULL), +#ifdef WITH_WSREP + wsrep_applier(is_applier), + wsrep_applier_closing(false), + wsrep_client_thread(false), + wsrep_po_handle(WSREP_PO_INITIALIZER), + wsrep_po_cnt(0), +// wsrep_po_in_trans(false), + wsrep_apply_format(0), + wsrep_apply_toi(false), +#endif m_parser_state(NULL), #if defined(ENABLED_DEBUG_SYNC) debug_sync_control(0), @@ -1004,6 +1016,22 @@ THD::THD() m_command=COM_CONNECT; *scramble= '\0'; +#ifdef WITH_WSREP + mysql_mutex_init(key_LOCK_wsrep_thd, &LOCK_wsrep_thd, MY_MUTEX_INIT_FAST); + mysql_cond_init(key_COND_wsrep_thd, &COND_wsrep_thd, NULL); + wsrep_ws_handle.trx_id = WSREP_UNDEFINED_TRX_ID; + wsrep_ws_handle.opaque = NULL; + wsrep_retry_counter = 0; + wsrep_PA_safe = true; + wsrep_retry_query = NULL; + wsrep_retry_query_len = 0; + wsrep_retry_command = COM_CONNECT; + wsrep_consistency_check = NO_CONSISTENCY_CHECK; + wsrep_status_vars = 0; + wsrep_mysql_replicated = 0; + wsrep_TOI_pre_query = NULL; + wsrep_TOI_pre_query_len = 0; +#endif /* Call to init() below requires fully initialized Open_tables_state. */ reset_open_tables_state(this); @@ -1043,6 +1071,9 @@ THD::THD() my_rnd_init(&rand, tmp + (ulong) &rand, tmp + (ulong) ::global_query_id); substitute_null_with_insert_id = FALSE; thr_lock_info_init(&lock_info); /* safety: will be reset after start */ + lock_info.mysql_thd= (void *)this; + + wsrep_info[sizeof(wsrep_info) - 1] = '\0'; /* make sure it is 0-terminated */ m_internal_handler= NULL; m_binlog_invoker= INVOKER_NONE; @@ -1387,7 +1418,24 @@ void THD::init(void) bzero((char *) &org_status_var, sizeof(org_status_var)); start_bytes_received= 0; last_commit_gtid.seq_no= 0; - +#ifdef WITH_WSREP + wsrep_exec_mode= wsrep_applier ? REPL_RECV : LOCAL_STATE; + wsrep_conflict_state= NO_CONFLICT; + wsrep_query_state= QUERY_IDLE; + wsrep_last_query_id= 0; + wsrep_trx_meta.gtid= WSREP_GTID_UNDEFINED; + wsrep_trx_meta.depends_on= WSREP_SEQNO_UNDEFINED; + wsrep_converted_lock_session= false; + wsrep_retry_counter= 0; + wsrep_rli= NULL; + wsrep_rgi= NULL; + wsrep_PA_safe= true; + wsrep_consistency_check = NO_CONSISTENCY_CHECK; + wsrep_mysql_replicated = 0; + + wsrep_TOI_pre_query = NULL; + wsrep_TOI_pre_query_len = 0; +#endif if (variables.sql_log_bin) variables.option_bits|= OPTION_BIN_LOG; else @@ -1582,6 +1630,14 @@ THD::~THD() mysql_mutex_lock(&LOCK_thd_data); mysql_mutex_unlock(&LOCK_thd_data); +#ifdef WITH_WSREP + mysql_mutex_lock(&LOCK_wsrep_thd); + mysql_mutex_unlock(&LOCK_wsrep_thd); + mysql_mutex_destroy(&LOCK_wsrep_thd); + if (wsrep_rli) delete wsrep_rli; + if (wsrep_rgi) delete wsrep_rgi; + if (wsrep_status_vars) wsrep->stats_free(wsrep, wsrep_status_vars); +#endif /* Close connection */ #ifndef EMBEDDED_LIBRARY if (net.vio) @@ -1892,7 +1948,17 @@ bool THD::notify_shared_lock(MDL_context_owner *ctx_in_use, (e.g. see partitioning code). */ if (!thd_table->needs_reopen()) + { signalled|= mysql_lock_abort_for_thread(this, thd_table); +#if WITH_WSREP + if (this && WSREP(this) && wsrep_thd_is_BF((void *)this, FALSE)) + { + WSREP_DEBUG("remove_table_from_cache: %llu", + (unsigned long long) this->real_id); + wsrep_abort_thd((void *)this, (void *)in_use, FALSE); + } +#endif /* WITH_WSREP */ + } } mysql_mutex_unlock(&in_use->LOCK_thd_data); } @@ -2074,6 +2140,12 @@ void THD::cleanup_after_query() /* reset table map for multi-table update */ table_map_for_update= 0; m_binlog_invoker= INVOKER_NONE; +#ifdef WITH_WSREP + if (TOTAL_ORDER == wsrep_exec_mode) + { + wsrep_exec_mode = LOCAL_STATE; + } +#endif /* WITH_WSREP */ #ifndef EMBEDDED_LIBRARY if (rgi_slave) @@ -2496,6 +2568,13 @@ bool sql_exchange::escaped_given(void) bool select_send::send_result_set_metadata(List<Item> &list, uint flags) { bool res; +#ifdef WITH_WSREP + if (WSREP(thd) && thd->wsrep_retry_query) + { + WSREP_DEBUG("skipping select metadata"); + return FALSE; + } +#endif /* WITH_WSREP */ if (!(res= thd->protocol->send_result_set_metadata(&list, flags))) is_result_set_started= 1; return res; @@ -4241,8 +4320,10 @@ extern "C" int thd_non_transactional_update(const MYSQL_THD thd) extern "C" int thd_binlog_format(const MYSQL_THD thd) { - if (mysql_bin_log.is_open() && (thd->variables.option_bits & OPTION_BIN_LOG)) - return (int) thd->variables.binlog_format; + if (IF_WSREP(((WSREP(thd) && wsrep_emulate_bin_log) || mysql_bin_log.is_open()), + mysql_bin_log.is_open()) && + thd->variables.option_bits & OPTION_BIN_LOG) + return (int) WSREP_FORMAT(thd->variables.binlog_format); else return BINLOG_FORMAT_UNSPEC; } @@ -4972,7 +5053,7 @@ int THD::decide_logging_format(TABLE_LIST *tables) binlog by filtering rules. */ if (mysql_bin_log.is_open() && (variables.option_bits & OPTION_BIN_LOG) && - !(variables.binlog_format == BINLOG_FORMAT_STMT && + !(WSREP_FORMAT(variables.binlog_format) == BINLOG_FORMAT_STMT && !binlog_filter->db_ok(db))) { /* @@ -5182,7 +5263,7 @@ int THD::decide_logging_format(TABLE_LIST *tables) */ my_error((error= ER_BINLOG_ROW_INJECTION_AND_STMT_ENGINE), MYF(0)); } - else if (variables.binlog_format == BINLOG_FORMAT_ROW && + else if (WSREP_FORMAT(variables.binlog_format) == BINLOG_FORMAT_ROW && sqlcom_can_generate_row_events(this)) { /* @@ -5211,7 +5292,7 @@ int THD::decide_logging_format(TABLE_LIST *tables) else { /* binlog_format = STATEMENT */ - if (variables.binlog_format == BINLOG_FORMAT_STMT) + if (WSREP_FORMAT(variables.binlog_format) == BINLOG_FORMAT_STMT) { if (lex->is_stmt_row_injection()) { @@ -5228,7 +5309,10 @@ int THD::decide_logging_format(TABLE_LIST *tables) 5. Error: Cannot modify table that uses a storage engine limited to row-logging when binlog_format = STATEMENT */ - my_error((error= ER_BINLOG_STMT_MODE_AND_ROW_ENGINE), MYF(0), ""); + if (IF_WSREP((!WSREP(this) || wsrep_exec_mode == LOCAL_STATE),1)) + { + my_error((error= ER_BINLOG_STMT_MODE_AND_ROW_ENGINE), MYF(0), ""); + } } else if (is_write && (unsafe_flags= lex->get_stmt_unsafe_flags()) != 0) { @@ -5340,7 +5424,7 @@ int THD::decide_logging_format(TABLE_LIST *tables) "and binlog_filter->db_ok(db) = %d", mysql_bin_log.is_open(), (variables.option_bits & OPTION_BIN_LOG), - variables.binlog_format, + WSREP_FORMAT(variables.binlog_format), binlog_filter->db_ok(db))); #endif @@ -5576,9 +5660,11 @@ CPP_UNNAMED_NS_END int THD::binlog_write_row(TABLE* table, bool is_trans, MY_BITMAP const* cols, size_t colcnt, uchar const *record) -{ - DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open()); +{ + DBUG_ASSERT(is_current_stmt_binlog_format_row() && + IF_WSREP(((WSREP(this) && wsrep_emulate_bin_log) || mysql_bin_log.is_open()), + mysql_bin_log.is_open())); /* Pack records into format for transfer. We are allocating more memory than needed, but that doesn't matter. @@ -5610,8 +5696,10 @@ int THD::binlog_update_row(TABLE* table, bool is_trans, MY_BITMAP const* cols, size_t colcnt, const uchar *before_record, const uchar *after_record) -{ - DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open()); +{ + DBUG_ASSERT(is_current_stmt_binlog_format_row() && + IF_WSREP(((WSREP(this) && wsrep_emulate_bin_log) || mysql_bin_log.is_open()), + mysql_bin_log.is_open())); size_t const before_maxlen = max_row_length(table, before_record); size_t const after_maxlen = max_row_length(table, after_record); @@ -5659,8 +5747,10 @@ int THD::binlog_update_row(TABLE* table, bool is_trans, int THD::binlog_delete_row(TABLE* table, bool is_trans, MY_BITMAP const* cols, size_t colcnt, uchar const *record) -{ - DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open()); +{ + DBUG_ASSERT(is_current_stmt_binlog_format_row() && + IF_WSREP(((WSREP(this) && wsrep_emulate_bin_log) || mysql_bin_log.is_open()), + mysql_bin_log.is_open())); /* Pack records into format for transfer. We are allocating more @@ -5695,7 +5785,8 @@ int THD::binlog_remove_pending_rows_event(bool clear_maps, { DBUG_ENTER("THD::binlog_remove_pending_rows_event"); - if (!mysql_bin_log.is_open()) + IF_WSREP(!(WSREP_EMULATE_BINLOG(this) || mysql_bin_log.is_open()), + !mysql_bin_log.is_open()); DBUG_RETURN(0); /* Ensure that all events in a GTID group are in the same cache */ @@ -5718,7 +5809,8 @@ int THD::binlog_flush_pending_rows_event(bool stmt_end, bool is_transactional) mode: it might be the case that we left row-based mode before flushing anything (e.g., if we have explicitly locked tables). */ - if (!mysql_bin_log.is_open()) + if(IF_WSREP(!(WSREP_EMULATE_BINLOG(this) || mysql_bin_log.is_open()), + !mysql_bin_log.is_open())) DBUG_RETURN(0); /* Ensure that all events in a GTID group are in the same cache */ @@ -5970,7 +6062,10 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg, DBUG_ENTER("THD::binlog_query"); DBUG_PRINT("enter", ("qtype: %s query: '%-.*s'", show_query_type(qtype), (int) query_len, query_arg)); - DBUG_ASSERT(query_arg && mysql_bin_log.is_open()); + + DBUG_ASSERT(query_arg && + IF_WSREP((WSREP_EMULATE_BINLOG(this) || mysql_bin_log.is_open()), + mysql_bin_log.is_open())); /* If this is withing a BEGIN ... COMMIT group, don't log it */ if (variables.option_bits & OPTION_GTID_BEGIN) |