diff options
Diffstat (limited to 'sql/sql_class.cc')
-rw-r--r-- | sql/sql_class.cc | 251 |
1 files changed, 244 insertions, 7 deletions
diff --git a/sql/sql_class.cc b/sql/sql_class.cc index d7d0c8d3f68..5b63201a910 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -62,6 +62,9 @@ #include "debug_sync.h" #include "sql_parse.h" // is_update_query #include "sql_callback.h" +#ifdef WITH_WSREP +#include "wsrep_mysqld.h" +#endif #include "sql_connect.h" /* @@ -695,6 +698,137 @@ char *thd_security_context(THD *thd, char *buffer, unsigned int length, return buffer; } +#ifdef WITH_WSREP +extern "C" int wsrep_on(void *thd) +{ + return (int)(WSREP(((THD*)thd))); +} +extern "C" bool wsrep_thd_is_wsrep_on(THD *thd) +{ + return thd->variables.wsrep_on; +} + +extern "C" bool wsrep_consistency_check(void *thd) +{ + return ((THD*)thd)->wsrep_consistency_check; +} + +extern "C" void wsrep_thd_set_exec_mode(THD *thd, enum wsrep_exec_mode mode) +{ + thd->wsrep_exec_mode= mode; +} +extern "C" void wsrep_thd_set_query_state( + THD *thd, enum wsrep_query_state state) +{ + thd->wsrep_query_state= state; +} +extern "C" void wsrep_thd_set_conflict_state( + THD *thd, enum wsrep_conflict_state state) +{ + thd->wsrep_conflict_state= state; +} + + +extern "C" enum wsrep_exec_mode wsrep_thd_exec_mode(THD *thd) +{ + return thd->wsrep_exec_mode; +} +extern "C" enum wsrep_query_state wsrep_thd_query_state(THD *thd) +{ + return thd->wsrep_query_state; +} +extern "C" enum wsrep_conflict_state wsrep_thd_conflict_state(THD *thd) +{ + return thd->wsrep_conflict_state; +} + +extern "C" wsrep_trx_handle_t* wsrep_thd_trx_handle(THD *thd) +{ + return &thd->wsrep_trx_handle; +} + +extern "C"void wsrep_thd_LOCK(THD *thd) +{ + mysql_mutex_lock(&thd->LOCK_wsrep_thd); +} +extern "C"void wsrep_thd_UNLOCK(THD *thd) +{ + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); +} +extern "C" time_t wsrep_thd_query_start(THD *thd) +{ + return thd->query_start(); +} +extern "C" uint32 wsrep_thd_wsrep_rand(THD *thd) +{ + return thd->wsrep_rand; +} +extern "C" my_thread_id wsrep_thd_thread_id(THD *thd) +{ + return thd->thread_id; +} +extern "C" wsrep_seqno_t wsrep_thd_trx_seqno(THD *thd) +{ + return thd->wsrep_trx_seqno; +} +extern "C" query_id_t wsrep_thd_query_id(THD *thd) +{ + return thd->query_id; +} +extern "C" char *wsrep_thd_query(THD *thd) +{ + return thd->query(); +} +extern "C" query_id_t wsrep_thd_wsrep_last_query_id(THD *thd) +{ + return thd->wsrep_last_query_id; +} +extern "C" void wsrep_thd_set_wsrep_last_query_id(THD *thd, query_id_t id) +{ + thd->wsrep_last_query_id= id; +} +extern "C" void wsrep_thd_awake(THD *thd, my_bool signal) +{ + if (signal) + { + mysql_mutex_lock(&thd->LOCK_thd_data); + thd->awake(KILL_QUERY); + mysql_mutex_unlock(&thd->LOCK_thd_data); + } + else + { + mysql_mutex_lock(&LOCK_wsrep_replaying); + mysql_cond_broadcast(&COND_wsrep_replaying); + mysql_mutex_unlock(&LOCK_wsrep_replaying); + } +} + +extern "C" int +wsrep_trx_order_before(void *thd1, void *thd2) +{ + if (((THD*)thd1)->wsrep_trx_seqno < ((THD*)thd2)->wsrep_trx_seqno) { + WSREP_DEBUG("BF conflict, order: %lld %lld\n", + (long long)((THD*)thd1)->wsrep_trx_seqno, + (long long)((THD*)thd2)->wsrep_trx_seqno); + return 1; + } + WSREP_DEBUG("waiting for BF, trx order: %lld %lld\n", + (long long)((THD*)thd1)->wsrep_trx_seqno, + (long long)((THD*)thd2)->wsrep_trx_seqno); + return 0; +} +extern "C" int +wsrep_trx_is_aborting(void *thd_ptr) +{ + if (thd_ptr) { + if ((((THD *)thd_ptr)->wsrep_conflict_state == MUST_ABORT) || + (((THD *)thd_ptr)->wsrep_conflict_state == ABORTING)) { + return 1; + } + } + return 0; +} +#endif /** Implementation of Drop_table_error_handler::handle_condition(). @@ -723,7 +857,11 @@ bool Drop_table_error_handler::handle_condition(THD *thd, } +#ifdef WITH_WSREP +THD::THD(bool is_applier) +#else THD::THD() +#endif :Statement(&main_lex, &main_mem_root, STMT_CONVENTIONAL_EXECUTION, /* statement id */ 0), rli_fake(0), @@ -750,6 +888,10 @@ THD::THD() bootstrap(0), derived_tables_processing(FALSE), spcont(NULL), +#ifdef WITH_WSREP + wsrep_applier(is_applier), + wsrep_client_thread(0), +#endif m_parser_state(NULL), #if defined(ENABLED_DEBUG_SYNC) debug_sync_control(0), @@ -849,6 +991,20 @@ THD::THD() command=COM_CONNECT; *scramble= '\0'; +#ifdef WITH_WSREP + mysql_mutex_init(key_LOCK_wsrep_thd, &LOCK_wsrep_thd, MY_MUTEX_INIT_FAST); + mysql_cond_init(key_COND_wsrep_thd, &COND_wsrep_thd, NULL); + wsrep_trx_handle.trx_id= -1; + wsrep_trx_handle.opaque= NULL; + //wsrep_retry_autocommit= ::wsrep_retry_autocommit; + wsrep_retry_counter= 0; + wsrep_PA_safe = true; + wsrep_seqno_changed= false; + wsrep_retry_query = NULL; + wsrep_retry_query_len = 0; + wsrep_retry_command = COM_CONNECT; + wsrep_consistency_check = false; +#endif /* Call to init() below requires fully initialized Open_tables_state. */ reset_open_tables_state(this); @@ -881,6 +1037,13 @@ THD::THD() my_rnd_init(&rand, tmp + (ulong) &rand, tmp + (ulong) ::global_query_id); substitute_null_with_insert_id = FALSE; thr_lock_info_init(&lock_info); /* safety: will be reset after start */ +#ifdef WITH_WSREP + lock_info.mysql_thd= (void *)this; + lock_info.in_lock_tables= false; +#ifdef WSREP_PROC_INFO + wsrep_info[sizeof(wsrep_info) - 1] = '\0'; /* make sure it is 0-terminated */ +#endif /* WSREP_PROC_INFO */ +#endif /* WITH_WSREP */ m_internal_handler= NULL; m_binlog_invoker= FALSE; @@ -1182,7 +1345,19 @@ void THD::init(void) reset_current_stmt_binlog_format_row(); bzero((char *) &status_var, sizeof(status_var)); bzero((char *) &org_status_var, sizeof(org_status_var)); - +#ifdef WITH_WSREP + wsrep_exec_mode= wsrep_applier ? REPL_RECV : LOCAL_STATE; + wsrep_conflict_state= NO_CONFLICT; + wsrep_query_state= QUERY_IDLE; + wsrep_last_query_id= 0; + wsrep_trx_seqno= 0; + wsrep_converted_lock_session= false; + wsrep_retry_counter= 0; + wsrep_rli= NULL; + wsrep_PA_safe= true; + wsrep_seqno_changed= false; + wsrep_consistency_check = false; +#endif if (variables.sql_log_bin) variables.option_bits|= OPTION_BIN_LOG; else @@ -1376,6 +1551,12 @@ THD::~THD() mysql_mutex_unlock(&LOCK_thd_data); add_to_status(&global_status_var, &status_var); +#ifdef WITH_WSREP + mysql_mutex_lock(&LOCK_wsrep_thd); + mysql_mutex_unlock(&LOCK_wsrep_thd); + mysql_mutex_destroy(&LOCK_wsrep_thd); + if (wsrep_rli) delete wsrep_rli; +#endif /* Close connection */ #ifndef EMBEDDED_LIBRARY if (net.vio) @@ -1790,6 +1971,13 @@ void THD::cleanup_after_query() /* reset table map for multi-table update */ table_map_for_update= 0; m_binlog_invoker= FALSE; +#ifdef WITH_WSREP + if (TOTAL_ORDER == wsrep_exec_mode) + { + wsrep_exec_mode = LOCAL_STATE; + } + //wsrep_trx_seqno = 0; +#endif /* WITH_WSREP */ DBUG_VOID_RETURN; } @@ -2205,6 +2393,13 @@ bool sql_exchange::escaped_given(void) bool select_send::send_result_set_metadata(List<Item> &list, uint flags) { bool res; +#ifdef WITH_WSREP + if (WSREP(thd) && thd->wsrep_retry_query) + { + WSREP_DEBUG("skipping select metadata"); + return FALSE; + } +#endif /* WITH_WSREP */ if (!(res= thd->protocol->send_result_set_metadata(&list, flags))) is_result_set_started= 1; return res; @@ -3911,8 +4106,13 @@ extern "C" int thd_non_transactional_update(const MYSQL_THD thd) extern "C" int thd_binlog_format(const MYSQL_THD thd) { +#ifdef WITH_WSREP + if (((WSREP(thd) && wsrep_emulate_bin_log) || mysql_bin_log.is_open()) && + (thd->variables.option_bits & OPTION_BIN_LOG)) +#else if (mysql_bin_log.is_open() && (thd->variables.option_bits & OPTION_BIN_LOG)) - return (int) thd->variables.binlog_format; +#endif + return (int) WSREP_FORMAT(thd->variables.binlog_format); else return BINLOG_FORMAT_UNSPEC; } @@ -4467,7 +4667,7 @@ int THD::decide_logging_format(TABLE_LIST *tables) binlog by filtering rules. */ if (mysql_bin_log.is_open() && (variables.option_bits & OPTION_BIN_LOG) && - !(variables.binlog_format == BINLOG_FORMAT_STMT && + !(WSREP_FORMAT(variables.binlog_format) == BINLOG_FORMAT_STMT && !binlog_filter->db_ok(db))) { /* @@ -4631,7 +4831,7 @@ int THD::decide_logging_format(TABLE_LIST *tables) */ my_error((error= ER_BINLOG_ROW_INJECTION_AND_STMT_ENGINE), MYF(0)); } - else if (variables.binlog_format == BINLOG_FORMAT_ROW && + else if (WSREP_FORMAT(variables.binlog_format) == BINLOG_FORMAT_ROW && sqlcom_can_generate_row_events(this)) { /* @@ -4660,7 +4860,7 @@ int THD::decide_logging_format(TABLE_LIST *tables) else { /* binlog_format = STATEMENT */ - if (variables.binlog_format == BINLOG_FORMAT_STMT) + if (WSREP_FORMAT(variables.binlog_format) == BINLOG_FORMAT_STMT) { if (lex->is_stmt_row_injection()) { @@ -4677,7 +4877,14 @@ int THD::decide_logging_format(TABLE_LIST *tables) 5. Error: Cannot modify table that uses a storage engine limited to row-logging when binlog_format = STATEMENT */ +#ifdef WITH_WSREP + if (!WSREP(this) || wsrep_exec_mode == LOCAL_STATE) + { +#endif /* WITH_WSREP */ my_error((error= ER_BINLOG_STMT_MODE_AND_ROW_ENGINE), MYF(0), ""); +#ifdef WITH_WSREP + } +#endif /* WITH_WSREP */ } else if (is_write && (unsafe_flags= lex->get_stmt_unsafe_flags()) != 0) { @@ -4725,7 +4932,7 @@ int THD::decide_logging_format(TABLE_LIST *tables) "and binlog_filter->db_ok(db) = %d", mysql_bin_log.is_open(), (variables.option_bits & OPTION_BIN_LOG), - variables.binlog_format, + WSREP_FORMAT(variables.binlog_format), binlog_filter->db_ok(db))); #endif @@ -4979,7 +5186,13 @@ int THD::binlog_write_row(TABLE* table, bool is_trans, MY_BITMAP const* cols, size_t colcnt, uchar const *record) { +#ifdef WITH_WSREP + DBUG_ASSERT(is_current_stmt_binlog_format_row() && + ((WSREP(this) && wsrep_emulate_bin_log) || + mysql_bin_log.is_open())); +#else DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open()); +#endif /* Pack records into format for transfer. We are allocating more @@ -5009,7 +5222,13 @@ int THD::binlog_update_row(TABLE* table, bool is_trans, const uchar *before_record, const uchar *after_record) { +#ifdef WITH_WSREP + DBUG_ASSERT(is_current_stmt_binlog_format_row() && + ((WSREP(this) && wsrep_emulate_bin_log) + || mysql_bin_log.is_open())); +#else DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open()); +#endif size_t const before_maxlen = max_row_length(table, before_record); size_t const after_maxlen = max_row_length(table, after_record); @@ -5054,7 +5273,13 @@ int THD::binlog_delete_row(TABLE* table, bool is_trans, MY_BITMAP const* cols, size_t colcnt, uchar const *record) { +#ifdef WITH_WSREP + DBUG_ASSERT(is_current_stmt_binlog_format_row() && + ((WSREP(this) && wsrep_emulate_bin_log) + || mysql_bin_log.is_open())); +#else DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open()); +#endif /* Pack records into format for transfer. We are allocating more @@ -5085,7 +5310,11 @@ int THD::binlog_remove_pending_rows_event(bool clear_maps, { DBUG_ENTER("THD::binlog_remove_pending_rows_event"); +#ifdef WITH_WSREP + if (!(WSREP_EMULATE_BINLOG(this) || mysql_bin_log.is_open())) +#else if (!mysql_bin_log.is_open()) +#endif DBUG_RETURN(0); mysql_bin_log.remove_pending_rows_event(this, is_transactional); @@ -5104,7 +5333,11 @@ int THD::binlog_flush_pending_rows_event(bool stmt_end, bool is_transactional) mode: it might be the case that we left row-based mode before flushing anything (e.g., if we have explicitly locked tables). */ +#ifdef WITH_WSREP + if (!(WSREP_EMULATE_BINLOG(this) || mysql_bin_log.is_open())) +#else if (!mysql_bin_log.is_open()) +#endif DBUG_RETURN(0); /* @@ -5224,8 +5457,12 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg, DBUG_ENTER("THD::binlog_query"); DBUG_PRINT("enter", ("qtype: %s query: '%-.*s'", show_query_type(qtype), (int) query_len, query_arg)); +#ifdef WITH_WSREP + DBUG_ASSERT(query_arg && (WSREP_EMULATE_BINLOG(this) + || mysql_bin_log.is_open())); +#else DBUG_ASSERT(query_arg && mysql_bin_log.is_open()); - +#endif /* If we are not in prelocked mode, mysql_unlock_tables() will be called after this binlog_query(), so we have to flush the pending |