diff options
Diffstat (limited to 'sql')
-rw-r--r-- | sql/CMakeLists.txt | 3 | ||||
-rw-r--r-- | sql/events.cc | 2 | ||||
-rw-r--r-- | sql/handler.cc | 12 | ||||
-rw-r--r-- | sql/log.cc | 154 | ||||
-rw-r--r-- | sql/log.h | 15 | ||||
-rw-r--r-- | sql/log_event.cc | 57 | ||||
-rw-r--r-- | sql/mdl.cc | 5 | ||||
-rw-r--r-- | sql/mysqld.cc | 101 | ||||
-rw-r--r-- | sql/mysqld.h | 8 | ||||
-rw-r--r-- | sql/sp.cc | 10 | ||||
-rw-r--r-- | sql/sql_base.cc | 9 | ||||
-rw-r--r-- | sql/sql_class.cc | 47 | ||||
-rw-r--r-- | sql/sql_class.h | 15 | ||||
-rw-r--r-- | sql/sql_parse.cc | 886 | ||||
-rw-r--r-- | sql/sql_trigger.cc | 4 | ||||
-rw-r--r-- | sql/sys_vars.cc | 14 | ||||
-rw-r--r-- | sql/transaction.cc | 3 | ||||
-rw-r--r-- | sql/wsrep_applier.cc | 351 | ||||
-rw-r--r-- | sql/wsrep_applier.h | 38 | ||||
-rw-r--r-- | sql/wsrep_binlog.cc | 321 | ||||
-rw-r--r-- | sql/wsrep_binlog.h | 49 | ||||
-rw-r--r-- | sql/wsrep_hton.cc | 173 | ||||
-rw-r--r-- | sql/wsrep_mysqld.cc | 368 | ||||
-rw-r--r-- | sql/wsrep_mysqld.h | 195 | ||||
-rw-r--r-- | sql/wsrep_notify.cc | 3 | ||||
-rw-r--r-- | sql/wsrep_priv.h | 210 | ||||
-rw-r--r-- | sql/wsrep_sst.cc | 68 | ||||
-rw-r--r-- | sql/wsrep_sst.h | 40 | ||||
-rw-r--r-- | sql/wsrep_thd.cc | 464 | ||||
-rw-r--r-- | sql/wsrep_thd.h | 32 | ||||
-rw-r--r-- | sql/wsrep_utils.cc | 87 | ||||
-rw-r--r-- | sql/wsrep_utils.h | 208 | ||||
-rw-r--r-- | sql/wsrep_var.cc | 34 | ||||
-rw-r--r-- | sql/wsrep_var.h | 83 |
34 files changed, 2361 insertions, 1708 deletions
diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt index e59ea142a11..2378ced6504 100644 --- a/sql/CMakeLists.txt +++ b/sql/CMakeLists.txt @@ -48,6 +48,9 @@ IF(WITH_WSREP) wsrep_sst.cc wsrep_utils.cc wsrep_var.cc + wsrep_binlog.cc + wsrep_applier.cc + wsrep_thd.cc ) SET(WSREP_LIB wsrep) ENDIF() diff --git a/sql/events.cc b/sql/events.cc index 78226bbf7ef..a39f31f416c 100644 --- a/sql/events.cc +++ b/sql/events.cc @@ -1147,7 +1147,7 @@ end: DBUG_RETURN(ret); } #ifdef WITH_WSREP -int wsrep_create_event_query(THD *thd, uchar** buf, uint* buf_len) +int wsrep_create_event_query(THD *thd, uchar** buf, size_t* buf_len) { String log_query; diff --git a/sql/handler.cc b/sql/handler.cc index 9cadfb2cf3f..58356116883 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -1248,7 +1248,7 @@ int ha_commit_trans(THD *thd, bool all) #ifdef WITH_WSREP if (!WSREP(thd) && - thd->mdl_context.acquire_lock(&mdl_request, + thd->mdl_context.acquire_lock(&mdl_request, #else if (thd->mdl_context.acquire_lock(&mdl_request, #endif /* WITH_WSREP */ @@ -1313,8 +1313,9 @@ int ha_commit_trans(THD *thd, bool all) else { /* not wsrep hton, bail to native mysql behavior */ -#endif +#endif /* WITH_WSREP */ my_error(ER_ERROR_DURING_COMMIT, MYF(0), err); + error= 1; #ifdef WITH_WSREP } /* End of else */ #endif @@ -1426,7 +1427,7 @@ commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans) #ifdef WSREP_PROC_INFO char info[64]= { 0, }; snprintf (info, sizeof(info) - 1, "ha_commit_one_phase(%lld)", - (long long)thd->wsrep_trx_seqno); + (long long)wsrep_thd_trx_seqno(thd)); #else const char info[]="ha_commit_one_phase()"; #endif /* WSREP_PROC_INFO */ @@ -1462,7 +1463,7 @@ commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans) } /* Free resources and perform other cleanup even for 'empty' transactions. 
*/ if (is_real_trans) - thd->transaction.cleanup(); + thd->transaction.cleanup(); #ifdef WITH_WSREP if (WSREP(thd)) thd_proc_info(thd, tmp_info); #endif /* WITH_WSREP */ @@ -1546,8 +1547,7 @@ int ha_rollback_trans(THD *thd, bool all) /* Always cleanup. Even if nht==0. There may be savepoints. */ if (is_real_trans) - thd->transaction.cleanup(); - + thd->transaction.cleanup(); if (all) thd->transaction_rollback_request= FALSE; diff --git a/sql/log.cc b/sql/log.cc index 242c45a7ed7..7f6cff3cc59 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -490,24 +490,10 @@ private: }; handlerton *binlog_hton; -#ifdef WITH_WSREP -extern handlerton *wsrep_hton; -#endif - -bool LOGGER::is_log_table_enabled(uint log_table_type) -{ - switch (log_table_type) { - case QUERY_LOG_SLOW: - return (table_log_handler != NULL) && opt_slow_log; - case QUERY_LOG_GENERAL: - return (table_log_handler != NULL) && opt_log ; - default: - DBUG_ASSERT(0); - return FALSE; /* make compiler happy */ - } -} -#ifdef WITH_WSREP +#if WITH_WSREP +/* the functions below depend on the definition of binlog_cache_manager class, + * so have to stay in this unit. */ IO_CACHE * get_trans_log(THD * thd) { binlog_cache_mngr *cache_mngr = (binlog_cache_mngr*) @@ -515,7 +501,7 @@ IO_CACHE * get_trans_log(THD * thd) if (cache_mngr) { return cache_mngr->get_binlog_cache_log(true); - } + } else { WSREP_DEBUG("binlog cache not initialized, conn :%ld", thd->thread_id); @@ -523,7 +509,6 @@ IO_CACHE * get_trans_log(THD * thd) } } - bool wsrep_trans_cache_is_empty(THD *thd) { binlog_cache_mngr *const cache_mngr= @@ -535,6 +520,7 @@ void thd_binlog_flush_pending_rows_event(THD *thd, bool stmt_end) { thd->binlog_flush_pending_rows_event(stmt_end); } + void thd_binlog_trx_reset(THD * thd) { /* @@ -556,74 +542,21 @@ void thd_binlog_rollback_stmt(THD * thd) (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); if (cache_mngr) cache_mngr->trx_cache.set_prev_position(MY_OFF_T_UNDEF); } -/* - Write the contents of a cache to memory buffer. 
- - This function quite the same as MYSQL_BIN_LOG::write_cache(), - with the exception that here we write in buffer instead of log file. - */ +#endif /* WITH_WSREP */ -int wsrep_write_cache(IO_CACHE *cache, uchar **buf, uint *buf_len) +bool LOGGER::is_log_table_enabled(uint log_table_type) { - - if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0)) - return ER_ERROR_ON_WRITE; - uint length= my_b_bytes_in_cache(cache); - long long total_length = 0; - uchar *buf_ptr = NULL; - - do - { - /* bail out if buffer grows too large - This is a temporary fix to avoid flooding replication - TODO: remove this check for 0.7.4 release - */ - if (total_length > wsrep_max_ws_size) - { - WSREP_WARN("transaction size limit (%lld) exceeded: %lld", - wsrep_max_ws_size, total_length); - if (reinit_io_cache(cache, WRITE_CACHE, 0, 0, 0)) - { - WSREP_WARN("failed to initialize io-cache"); - } - if (buf_ptr) my_free(*buf); - *buf_len = 0; - return ER_ERROR_ON_WRITE; - } - if (total_length > 0) - { - *buf_len += length; - *buf = (uchar *)my_realloc(*buf, total_length+length, MYF(0)); - if (!*buf) - { - WSREP_ERROR("io cache write problem: %d %d", *buf_len, length); - return ER_ERROR_ON_WRITE; - } - buf_ptr = *buf+total_length; - } - else - { - if (buf_ptr != NULL) - { - WSREP_ERROR("io cache alloc error: %d %d", *buf_len, length); - my_free(*buf); - } - if (length > 0) - { - *buf = (uchar *) my_malloc(length, MYF(0)); - buf_ptr = *buf; - *buf_len = length; - } - } - total_length += length; - - memcpy(buf_ptr, cache->read_pos, length); - cache->read_pos=cache->read_end; - } while ((cache->file >= 0) && (length= my_b_fill(cache))); - - return 0; + switch (log_table_type) { + case QUERY_LOG_SLOW: + return (table_log_handler != NULL) && opt_slow_log; + case QUERY_LOG_GENERAL: + return (table_log_handler != NULL) && opt_log ; + default: + DBUG_ASSERT(0); + return FALSE; /* make compiler happy */ + } } -#endif + /* Check if a given table is opened log table */ int check_if_log_table(size_t db_len, 
const char *db, size_t table_name_len, @@ -1817,13 +1750,6 @@ static inline int binlog_commit_flush_stmt_cache(THD *thd, bool all, binlog_cache_mngr *cache_mngr) { -#ifdef WITH_WSREP - if (thd->wsrep_mysql_replicated > 0) - { - WSREP_DEBUG("avoiding binlog_commit_flush_trx_cache: %d", thd->wsrep_mysql_replicated); - return 0; - } -#endif Query_log_event end_evt(thd, STRING_WITH_LEN("COMMIT"), FALSE, TRUE, TRUE, 0); return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, FALSE)); @@ -1841,6 +1767,13 @@ binlog_commit_flush_stmt_cache(THD *thd, bool all, static inline int binlog_commit_flush_trx_cache(THD *thd, bool all, binlog_cache_mngr *cache_mngr) { +#ifdef WITH_WSREP + if (thd->wsrep_mysql_replicated > 0) + { + WSREP_DEBUG("avoiding binlog_commit_flush_trx_cache: %d", thd->wsrep_mysql_replicated); + return 0; + } +#endif Query_log_event end_evt(thd, STRING_WITH_LEN("COMMIT"), TRUE, TRUE, TRUE, 0); return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, FALSE, TRUE)); @@ -2104,12 +2037,12 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) if (ending_trans(thd, all) && ((thd->variables.option_bits & OPTION_KEEP_LOG) || (trans_has_updated_non_trans_table(thd) && - WSREP_FORMAT(thd->variables.binlog_format) == BINLOG_FORMAT_STMT) || + WSREP_BINLOG_FORMAT(thd->variables.binlog_format) == BINLOG_FORMAT_STMT) || (cache_mngr->trx_cache.changes_to_non_trans_temp_table() && - WSREP_FORMAT(thd->variables.binlog_format) == BINLOG_FORMAT_MIXED) || + WSREP_BINLOG_FORMAT(thd->variables.binlog_format) == BINLOG_FORMAT_MIXED) || (trans_has_updated_non_trans_table(thd) && ending_single_stmt_trans(thd,all) && - WSREP_FORMAT(thd->variables.binlog_format) == BINLOG_FORMAT_MIXED))) + WSREP_BINLOG_FORMAT(thd->variables.binlog_format) == BINLOG_FORMAT_MIXED))) error= binlog_rollback_flush_trx_cache(thd, all, cache_mngr); /* Truncate the cache if: @@ -2123,9 +2056,9 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) else if (ending_trans(thd, 
all) || (!(thd->variables.option_bits & OPTION_KEEP_LOG) && (!stmt_has_updated_non_trans_table(thd) || - WSREP_FORMAT(thd->variables.binlog_format) != BINLOG_FORMAT_STMT) && + WSREP_BINLOG_FORMAT(thd->variables.binlog_format) != BINLOG_FORMAT_STMT) && (!cache_mngr->trx_cache.changes_to_non_trans_temp_table() || - WSREP_FORMAT(thd->variables.binlog_format) != BINLOG_FORMAT_MIXED))) + WSREP_BINLOG_FORMAT(thd->variables.binlog_format) != BINLOG_FORMAT_MIXED))) error= binlog_truncate_trx_cache(thd, cache_mngr, all); } @@ -5439,35 +5372,6 @@ err: } } -#ifdef WITH_WSREP - if (WSREP(thd) && wsrep_incremental_data_collection && - (wsrep_emulate_bin_log || mysql_bin_log.is_open())) - { - DBUG_ASSERT(thd->wsrep_trx_handle.trx_id != (unsigned long)-1); - if (!error) - { - IO_CACHE* cache= get_trans_log(thd); - uchar* buf= NULL; - uint buf_len= 0; - - if (wsrep_emulate_bin_log) - thd->binlog_flush_pending_rows_event(false); - error= wsrep_write_cache(cache, &buf, &buf_len); - if (!error && buf_len > 0) - { - wsrep_status_t rc= wsrep->append_data(wsrep, - &thd->wsrep_trx_handle, - buf, buf_len); - if (rc != WSREP_OK) - { - sql_print_warning("WSREP: append_data() returned %d", rc); - error= 1; - } - } - if (buf_len) my_free(buf); - } - } -#endif /* WITH_WSREP */ DBUG_RETURN(error); } diff --git a/sql/log.h b/sql/log.h index 0e189a789db..34b9df53c93 100644 --- a/sql/log.h +++ b/sql/log.h @@ -273,12 +273,6 @@ enum enum_log_state { LOG_OPENED, LOG_CLOSED, LOG_TO_BE_OPENED }; (mmap+fsync is two times faster than write+fsync) */ -#ifdef WITH_WSREP -extern my_bool wsrep_emulate_bin_log; -Log_event* wsrep_read_log_event( - char **arg_buf, size_t *arg_buf_len, - const Format_description_log_event *description_event); -#endif class MYSQL_LOG { public: @@ -870,18 +864,17 @@ enum enum_binlog_format { }; #ifdef WITH_WSREP -IO_CACHE * get_trans_log(THD * thd); +IO_CACHE* get_trans_log(THD * thd); bool wsrep_trans_cache_is_empty(THD *thd); void thd_binlog_flush_pending_rows_event(THD *thd, 
bool stmt_end); void thd_binlog_trx_reset(THD * thd); void thd_binlog_rollback_stmt(THD * thd); -int wsrep_write_cache(IO_CACHE *cache, uchar **buf, uint *buf_len); -#define WSREP_FORMAT(my_format) \ - ((wsrep_forced_binlog_format != BINLOG_FORMAT_UNSPEC) ? \ +#define WSREP_BINLOG_FORMAT(my_format) \ + ((wsrep_forced_binlog_format != BINLOG_FORMAT_UNSPEC) ? \ wsrep_forced_binlog_format : my_format) #else -#define WSREP_FORMAT(my_format) my_format +#define WSREP_BINLOG_FORMAT(my_format) my_format #endif int query_error_code(THD *thd, bool not_killed); uint purge_log_get_error_code(int res); diff --git a/sql/log_event.cc b/sql/log_event.cc index b4f5f5ad8b2..dac39a63282 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -8406,7 +8406,7 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) thd->is_fatal_error, thd->wsrep_exec_mode, thd->wsrep_conflict_state, - (long long)thd->wsrep_trx_seqno); + (long long)wsrep_thd_trx_seqno(thd)); } #endif if (thd->is_slave_error || thd->is_fatal_error) @@ -10187,7 +10187,7 @@ Write_rows_log_event::do_exec_row(const Relay_log_info *const rli) char info[64]; info[sizeof(info) - 1] = '\0'; snprintf(info, sizeof(info) - 1, "Write_rows_log_event::write_row(%lld)", - (long long) thd->wsrep_trx_seqno); + (long long) wsrep_thd_trx_seqno(thd)); const char* tmp = (WSREP(thd)) ? thd_proc_info(thd, info) : NULL; #else const char* tmp = (WSREP(thd)) ? @@ -10864,7 +10864,7 @@ int Delete_rows_log_event::do_exec_row(const Relay_log_info *const rli) char info[64]; info[sizeof(info) - 1] = '\0'; snprintf(info, sizeof(info) - 1, "Delete_rows_log_event::find_row(%lld)", - (long long) thd->wsrep_trx_seqno); + (long long) wsrep_thd_trx_seqno(thd)); const char* tmp = (WSREP(thd)) ? thd_proc_info(thd, info) : NULL; #else const char* tmp = (WSREP(thd)) ? 
@@ -10880,7 +10880,7 @@ int Delete_rows_log_event::do_exec_row(const Relay_log_info *const rli) #ifdef WSREP_PROC_INFO snprintf(info, sizeof(info) - 1, "Delete_rows_log_event::ha_delete_row(%lld)", - (long long) thd->wsrep_trx_seqno); + (long long) wsrep_thd_trx_seqno(thd)); if (WSREP(thd)) thd_proc_info(thd, info); #else if (WSREP(thd)) thd_proc_info(thd,"Delete_rows_log_event::ha_delete_row()"); @@ -11016,7 +11016,7 @@ Update_rows_log_event::do_exec_row(const Relay_log_info *const rli) char info[64]; info[sizeof(info) - 1] = '\0'; snprintf(info, sizeof(info) - 1, "Update_rows_log_event::find_row(%lld)", - (long long) thd->wsrep_trx_seqno); + (long long) wsrep_thd_trx_seqno(thd)); const char* tmp = (WSREP(thd)) ? thd_proc_info(thd, info) : NULL; #else const char* tmp = (WSREP(thd)) ? @@ -11053,7 +11053,7 @@ Update_rows_log_event::do_exec_row(const Relay_log_info *const rli) #ifdef WSREP_PROC_INFO snprintf(info, sizeof(info) - 1, "Update_rows_log_event::unpack_current_row(%lld)", - (long long) thd->wsrep_trx_seqno); + (long long) wsrep_thd_trx_seqno(thd)); if (WSREP(thd)) thd_proc_info(thd, info); #else if (WSREP(thd)) @@ -11082,7 +11082,7 @@ Update_rows_log_event::do_exec_row(const Relay_log_info *const rli) #ifdef WSREP_PROC_INFO snprintf(info, sizeof(info) - 1, "Update_rows_log_event::ha_update_row(%lld)", - (long long) thd->wsrep_trx_seqno); + (long long) wsrep_thd_trx_seqno(thd)); if (WSREP(thd)) thd_proc_info(thd, info); #else if (WSREP(thd)) thd_proc_info(thd,"Update_rows_log_event::ha_update_row()"); @@ -11179,48 +11179,6 @@ void Incident_log_event::pack_info(THD *thd, Protocol *protocol) protocol->store(buf, bytes, &my_charset_bin); } #endif -#if WITH_WSREP && !defined(MYSQL_CLIENT) -Format_description_log_event *wsrep_format_desc; // TODO: free them at the end -/* - read the first event from (*buf). The size of the (*buf) is (*buf_len). 
- At the end (*buf) is shitfed to point to the following event or NULL and - (*buf_len) will be changed to account just being read bytes of the 1st event. -*/ -#define WSREP_MAX_ALLOWED_PACKET 1024*1024*1024 // current protocol max - -Log_event* wsrep_read_log_event( - char **arg_buf, size_t *arg_buf_len, - const Format_description_log_event *description_event) -{ - DBUG_ENTER("wsrep_read_log_event"); - char *head= (*arg_buf); - - uint data_len = uint4korr(head + EVENT_LEN_OFFSET); - char *buf= (*arg_buf); - const char *error= 0; - Log_event *res= 0; - - if (data_len > WSREP_MAX_ALLOWED_PACKET) - { - error = "Event too big"; - goto err; - } - - res= Log_event::read_log_event(buf, data_len, &error, description_event, FALSE); - -err: - if (!res) - { - DBUG_ASSERT(error != 0); - sql_print_error("Error in Log_event::read_log_event(): " - "'%s', data_len: %d, event_type: %d", - error,data_len,head[EVENT_TYPE_OFFSET]); - } - (*arg_buf)+= data_len; - (*arg_buf_len)-= data_len; - DBUG_RETURN(res); -} -#endif #ifdef MYSQL_CLIENT @@ -11310,6 +11268,7 @@ st_print_event_info::st_print_event_info() } #endif + #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) Heartbeat_log_event::Heartbeat_log_event(const char* buf, uint event_len, const Format_description_log_event* description_event) diff --git a/sql/mdl.cc b/sql/mdl.cc index fe8efd3e6cd..268a6621f65 100644 --- a/sql/mdl.cc +++ b/sql/mdl.cc @@ -22,6 +22,7 @@ #include <mysql/service_thd_wait.h> #ifdef WITH_WSREP #include "wsrep_mysqld.h" +#include "wsrep_thd.h" extern "C" my_thread_id wsrep_thd_thread_id(THD *thd); extern "C" char *wsrep_thd_query(THD *thd); void sql_print_information(const char *format, ...) @@ -3029,7 +3030,7 @@ void MDL_ticket::wsrep_report(bool debug) { if (debug) { - WSREP_DEBUG("MDL ticket: type: %s space: %s db: %s name: %s (%s)", + WSREP_DEBUG("MDL ticket: type: %s space: %s db: %s name: %s", (get_type() == MDL_INTENTION_EXCLUSIVE) ? "intention exclusive" : ((get_type() == MDL_SHARED) ? 
"shared" : ((get_type() == MDL_SHARED_HIGH_PRIO ? "shared high prio" : @@ -3049,7 +3050,7 @@ void MDL_ticket::wsrep_report(bool debug) ((m_lock->key.mdl_namespace() == MDL_key::COMMIT) ? "COMMIT" : (char *)"UNKNOWN"))))))), m_lock->key.db_name(), - m_lock->key.name(), + m_lock->key.name(), m_lock->key.get_wait_state_name()); } } diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 849ceb1db39..953b59031d8 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -74,6 +74,9 @@ #include "debug_sync.h" #ifdef WITH_WSREP #include "wsrep_mysqld.h" +#include "wsrep_var.h" +#include "wsrep_thd.h" +#include "wsrep_sst.h" ulong wsrep_running_threads = 0; // # of currently running wsrep threads #endif #include "sql_callback.h" @@ -354,7 +357,11 @@ static char *default_character_set_name; static char *character_set_filesystem_name; static char *lc_messages; static char *lc_time_names_name; +#ifndef WITH_WSREP static char *my_bind_addr_str; +#else +char *my_bind_addr_str; +#endif /* WITH_WSREP */ static char *default_collation_name; char *default_storage_engine; static char compiled_default_collation_name[]= MYSQL_DEFAULT_COLLATION_NAME; @@ -716,7 +723,7 @@ mysql_mutex_t LOCK_wsrep_slave_threads; mysql_mutex_t LOCK_wsrep_desync; int wsrep_replaying= 0; static void wsrep_close_threads(THD* thd); -#endif +#endif /* WITH_WSREP */ int mysqld_server_started= 0; File_parser_dummy_hook file_parser_dummy_hook; @@ -1863,6 +1870,7 @@ extern "C" void unireg_abort(int exit_code) WSREP_INFO("Some threads may fail to exit."); } #endif // WITH_WSREP + clean_up(!opt_abort && (exit_code || !opt_bootstrap)); /* purecov: inspected */ DBUG_PRINT("quit",("done with cleanup in unireg_abort")); mysqld_exit(exit_code); @@ -2389,7 +2397,7 @@ static my_socket activate_tcp_port(uint port) socket_errno); unireg_abort(1); } -#if defined(WITH_WSREP) && defined(HAVE_FCNTL) +#if defined(WITH_WSREP) && defined(HAVE_FCNTL) && defined(FD_CLOEXEC) (void) fcntl(ip_sock, F_SETFD, FD_CLOEXEC); #endif /* WITH_WSREP */ 
DBUG_RETURN(ip_sock); @@ -2513,7 +2521,7 @@ static void network_init(void) if (listen(unix_sock,(int) back_log) < 0) sql_print_warning("listen() on Unix socket failed with error %d", socket_errno); -#if defined(WITH_WSREP) && defined(HAVE_FCNTL) +#if defined(WITH_WSREP) && defined(HAVE_FCNTL) && defined(FD_CLOEXEC) (void) fcntl(unix_sock, F_SETFD, FD_CLOEXEC); #endif /* WITH_WSREP */ } @@ -4141,25 +4149,25 @@ static int init_thread_environment() return 1; } #ifdef WITH_WSREP - mysql_mutex_init(key_LOCK_wsrep_ready, - &LOCK_wsrep_ready, MY_MUTEX_INIT_FAST); + mysql_mutex_init(key_LOCK_wsrep_ready, + &LOCK_wsrep_ready, MY_MUTEX_INIT_FAST); mysql_cond_init(key_COND_wsrep_ready, &COND_wsrep_ready, NULL); - mysql_mutex_init(key_LOCK_wsrep_sst, - &LOCK_wsrep_sst, MY_MUTEX_INIT_FAST); + mysql_mutex_init(key_LOCK_wsrep_sst, + &LOCK_wsrep_sst, MY_MUTEX_INIT_FAST); mysql_cond_init(key_COND_wsrep_sst, &COND_wsrep_sst, NULL); - mysql_mutex_init(key_LOCK_wsrep_sst_init, - &LOCK_wsrep_sst_init, MY_MUTEX_INIT_FAST); + mysql_mutex_init(key_LOCK_wsrep_sst_init, + &LOCK_wsrep_sst_init, MY_MUTEX_INIT_FAST); mysql_cond_init(key_COND_wsrep_sst_init, &COND_wsrep_sst_init, NULL); - mysql_mutex_init(key_LOCK_wsrep_rollback, - &LOCK_wsrep_rollback, MY_MUTEX_INIT_FAST); + mysql_mutex_init(key_LOCK_wsrep_rollback, + &LOCK_wsrep_rollback, MY_MUTEX_INIT_FAST); mysql_cond_init(key_COND_wsrep_rollback, &COND_wsrep_rollback, NULL); - mysql_mutex_init(key_LOCK_wsrep_replaying, - &LOCK_wsrep_replaying, MY_MUTEX_INIT_FAST); + mysql_mutex_init(key_LOCK_wsrep_replaying, + &LOCK_wsrep_replaying, MY_MUTEX_INIT_FAST); mysql_cond_init(key_COND_wsrep_replaying, &COND_wsrep_replaying, NULL); - mysql_mutex_init(key_LOCK_wsrep_slave_threads, - &LOCK_wsrep_slave_threads, MY_MUTEX_INIT_FAST); - mysql_mutex_init(key_LOCK_wsrep_desync, - &LOCK_wsrep_desync, MY_MUTEX_INIT_FAST); + mysql_mutex_init(key_LOCK_wsrep_slave_threads, + &LOCK_wsrep_slave_threads, MY_MUTEX_INIT_FAST); + 
mysql_mutex_init(key_LOCK_wsrep_desync, + &LOCK_wsrep_desync, MY_MUTEX_INIT_FAST); #endif return 0; } @@ -4789,7 +4797,7 @@ pthread_handler_t start_wsrep_THD(void *arg) THD *thd; wsrep_thd_processor_fun processor= (wsrep_thd_processor_fun)arg; - if (my_thread_init()) + if (my_thread_init()) { WSREP_ERROR("Could not initialize thread"); return(NULL); @@ -4852,7 +4860,7 @@ pthread_handler_t start_wsrep_THD(void *arg) statistic_increment(aborted_connects,&LOCK_status); MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 0)); delete thd; - + return(NULL); } @@ -4910,42 +4918,6 @@ pthread_handler_t start_wsrep_THD(void *arg) return(NULL); } -void wsrep_create_rollbacker() -{ - if (WSREP_PROVIDER_EXISTS) - { - pthread_t hThread; - /* create rollbacker */ - if (pthread_create( &hThread, &connection_attrib, - start_wsrep_THD, (void*)wsrep_rollback_process)) - WSREP_WARN("Can't create thread to manage wsrep rollback"); - } -} - -void wsrep_create_appliers(long threads) -{ - if (!wsrep_connected) - { - /* see wsrep_replication_start() for the logic */ - if (wsrep_cluster_address && strlen(wsrep_cluster_address) && - WSREP_PROVIDER_EXISTS) - { - WSREP_ERROR("Trying to launch slave threads before creating " - "connection at '%s'", wsrep_cluster_address); - assert(0); - } - return; - } - - long wsrep_threads=0; - pthread_t hThread; - while (wsrep_threads++ < threads) { - if (pthread_create( - &hThread, &connection_attrib, - start_wsrep_THD, (void*)wsrep_replication_process)) - WSREP_WARN("Can't create thread to manage wsrep replication"); - } -} /**/ static bool abort_replicated(THD *thd) { @@ -4970,7 +4942,7 @@ WSREP_WARN("applier has wsrep_exec_mode = %d", thd->wsrep_exec_mode); if ( thd->slave_thread || /* declared as mysql slave */ thd->system_thread || /* declared as system thread */ - !thd->vio_ok() || /* server internal thread */ + !thd->vio_ok() || /* server internal thread */ thd->wsrep_exec_mode==REPL_RECV || /* applier or replaying thread */ thd->wsrep_applier || /* 
wsrep slave applier */ !thd->variables.wsrep_on) /* client, but fenced outside wsrep */ @@ -5452,6 +5424,9 @@ int mysqld_main(int argc, char **argv) return 1; } #endif +#ifdef WITH_WSREP + wsrep_filter_new_cluster (&argc, argv); +#endif /* WITH_WSREP */ orig_argc= argc; orig_argv= argv; @@ -5755,6 +5730,13 @@ int mysqld_main(int argc, char **argv) unireg_abort(1); #ifdef WITH_WSREP /* WSREP AFTER SE */ + if (wsrep_recovery) + { + select_thread_in_use= 0; + wsrep_recover(); + unireg_abort(0); + } + if (opt_bootstrap) { /*! bootstrap wsrep init was taken care of above */ @@ -6484,7 +6466,7 @@ void handle_connections_sockets() sleep(1); // Give other threads some time continue; } -#if defined(WITH_WSREP) && defined(HAVE_FCNTL) +#if defined(WITH_WSREP) && defined(HAVE_FCNTL) && defined(FD_CLOEXEC) (void) fcntl(new_sock, F_SETFD, FD_CLOEXEC); #endif /* WITH_WSREP */ @@ -7984,6 +7966,7 @@ SHOW_VAR status_vars[]= { {"wsrep_cluster_status", (char*) &wsrep_cluster_status, SHOW_CHAR_PTR}, {"wsrep_cluster_size", (char*) &wsrep_cluster_size, SHOW_LONG_NOFLUSH}, {"wsrep_local_index", (char*) &wsrep_local_index, SHOW_LONG_NOFLUSH}, + {"wsrep_local_bf_aborts", (char*) &wsrep_show_bf_aborts, SHOW_FUNC}, {"wsrep_provider_name", (char*) &wsrep_provider_name, SHOW_CHAR_PTR}, {"wsrep_provider_version", (char*) &wsrep_provider_version, SHOW_CHAR_PTR}, {"wsrep_provider_vendor", (char*) &wsrep_provider_vendor, SHOW_CHAR_PTR}, @@ -9220,6 +9203,9 @@ void refresh_status(THD *thd) /* Reset some global variables */ reset_status_vars(); +#ifdef WITH_WSREP + wsrep->stats_reset(wsrep); +#endif /* WITH_WSREP */ /* Reset the counters of all key caches (default and named). 
*/ process_key_caches(reset_key_cache_counters, 0); @@ -9256,3 +9242,4 @@ template class I_List<i_string_pair>; template class I_List<Statement>; template class I_List_iterator<Statement>; #endif + diff --git a/sql/mysqld.h b/sql/mysqld.h index 0075c81726c..f392452f56e 100644 --- a/sql/mysqld.h +++ b/sql/mysqld.h @@ -227,7 +227,7 @@ extern PSI_mutex_key key_PAGE_lock, key_LOCK_sync, key_LOCK_active, #ifdef WITH_WSREP extern PSI_mutex_key key_LOCK_wsrep_thd; extern PSI_cond_key key_COND_wsrep_thd; -#endif /* HAVE_MMAP */ +#endif /* HAVE_WSREP */ #ifdef HAVE_OPENSSL extern PSI_mutex_key key_LOCK_des_key_file; @@ -423,7 +423,7 @@ enum options_mysqld OPT_WSREP_START_POSITION, OPT_WSREP_SST_AUTH, OPT_WSREP_RECOVER, -#endif +#endif /* WITH_WSREP */ OPT_which_is_always_the_last }; #endif @@ -572,5 +572,9 @@ extern uint internal_tmp_table_max_key_segments; extern uint volatile global_disable_checkpoint; extern my_bool opt_help; +#ifdef WITH_WSREP +#include "my_pthread.h" +pthread_handler_t start_wsrep_THD(void*); +#endif /* WITH_WSREP */ #endif /* MYSQLD_INCLUDED */ diff --git a/sql/sp.cc b/sql/sp.cc index 269917d6863..c379dbd7fbf 100644 --- a/sql/sp.cc +++ b/sql/sp.cc @@ -2289,7 +2289,7 @@ sp_load_for_information_schema(THD *thd, TABLE *proc_table, String *db, return sp; } #ifdef WITH_WSREP -int wsrep_create_sp(THD *thd, uchar** buf, uint* buf_len) +int wsrep_create_sp(THD *thd, uchar** buf, size_t* buf_len) { String log_query; sp_head *sp = thd->lex->sphead; @@ -2315,10 +2315,10 @@ int wsrep_create_sp(THD *thd, uchar** buf, uint* buf_len) sp->m_chistics, &(thd->lex->definer->user), &(thd->lex->definer->host), saved_mode)) - { - WSREP_WARN("SP create string failed: %s", thd->query()); - return 1; - } + { + WSREP_WARN("SP create string failed: %s", thd->query()); + return 1; + } return wsrep_to_buf_helper(thd, log_query.ptr(), log_query.length(), buf, buf_len); } #endif /* WITH_WSREP */ diff --git a/sql/sql_base.cc b/sql/sql_base.cc index 3acfa63d55e..5eb6607fb4d 100644 
--- a/sql/sql_base.cc +++ b/sql/sql_base.cc @@ -59,9 +59,9 @@ #ifdef __WIN__ #include <io.h> #endif - #ifdef WITH_WSREP #include "wsrep_mysqld.h" +#include "wsrep_thd.h" #endif // WITH_WSREP bool @@ -4227,7 +4227,7 @@ thr_lock_type read_lock_type_for_table(THD *thd, */ bool log_on= mysql_bin_log.is_open() && thd->variables.sql_log_bin; ulong binlog_format= thd->variables.binlog_format; - if ((log_on == FALSE) || (WSREP_FORMAT(binlog_format) == BINLOG_FORMAT_ROW) || + if ((log_on == FALSE) || (WSREP_BINLOG_FORMAT(binlog_format) == BINLOG_FORMAT_ROW) || (table_list->table->s->table_category == TABLE_CATEGORY_LOG) || (table_list->table->s->table_category == TABLE_CATEGORY_PERFORMANCE) || !(is_update_query(prelocking_ctx->sql_command) || @@ -5894,7 +5894,7 @@ bool lock_tables(THD *thd, TABLE_LIST *tables, uint count, We can solve these problems in mixed mode by switching to binlogging if at least one updated table is used by sub-statement */ - if (WSREP_FORMAT(thd->variables.binlog_format) != BINLOG_FORMAT_ROW && tables && + if (WSREP_BINLOG_FORMAT(thd->variables.binlog_format) != BINLOG_FORMAT_ROW && tables && has_write_table_with_auto_increment(thd->lex->first_not_own_table())) thd->lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_AUTOINC_COLUMNS); } @@ -9431,6 +9431,7 @@ void tdc_remove_table(THD *thd, enum_tdc_remove_table_type remove_type, { mysql_mutex_assert_owner(&LOCK_open); } + #ifdef WITH_WSREP /* if thd was BF aborted, exclusive locks were canceled */ #else @@ -9438,6 +9439,7 @@ void tdc_remove_table(THD *thd, enum_tdc_remove_table_type remove_type, thd->mdl_context.is_lock_owner(MDL_key::TABLE, db, table_name, MDL_EXCLUSIVE)); #endif /* WITH_WSREP */ + key_length= create_table_def_key(key, db, table_name); if ((share= (TABLE_SHARE*) my_hash_search(&table_def_cache,(uchar*) key, @@ -9461,7 +9463,6 @@ void tdc_remove_table(THD *thd, enum_tdc_remove_table_type remove_type, thus others can use table */ if (table->in_use != thd && - table->in_use->wsrep_bf_thd != 
thd && table->in_use->wsrep_conflict_state != MUST_ABORT) { #endif diff --git a/sql/sql_class.cc b/sql/sql_class.cc index 2b9595a5080..334d6c5a619 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -848,9 +848,9 @@ extern "C" const char *wsrep_thd_conflict_state_str(THD *thd) (thd->wsrep_conflict_state == CERT_FAILURE) ? "cert failure" : "void"; } -extern "C" wsrep_trx_handle_t* wsrep_thd_trx_handle(THD *thd) +extern "C" wsrep_ws_handle_t* wsrep_thd_ws_handle(THD *thd) { - return &thd->wsrep_trx_handle; + return &thd->wsrep_ws_handle; } extern "C"void wsrep_thd_LOCK(THD *thd) @@ -875,7 +875,7 @@ extern "C" my_thread_id wsrep_thd_thread_id(THD *thd) } extern "C" wsrep_seqno_t wsrep_thd_trx_seqno(THD *thd) { - return (thd) ? thd->wsrep_trx_seqno : WSREP_SEQNO_UNDEFINED; + return (thd) ? thd->wsrep_trx_meta.gtid.seqno : WSREP_SEQNO_UNDEFINED; } extern "C" query_id_t wsrep_thd_query_id(THD *thd) { @@ -913,16 +913,16 @@ extern "C" void wsrep_thd_awake(THD* bf_thd, THD *thd, my_bool signal) extern "C" int wsrep_trx_order_before(void *thd1, void *thd2) { - if (((THD*)thd1)->wsrep_trx_seqno < ((THD*)thd2)->wsrep_trx_seqno) { - WSREP_DEBUG("BF conflict, order: %lld %lld\n", - (long long)((THD*)thd1)->wsrep_trx_seqno, - (long long)((THD*)thd2)->wsrep_trx_seqno); - return 1; - } - WSREP_DEBUG("waiting for BF, trx order: %lld %lld\n", - (long long)((THD*)thd1)->wsrep_trx_seqno, - (long long)((THD*)thd2)->wsrep_trx_seqno); - return 0; + if (wsrep_thd_trx_seqno((THD*)thd1) < wsrep_thd_trx_seqno((THD*)thd2)) { + WSREP_DEBUG("BF conflict, order: %lld %lld\n", + (long long)wsrep_thd_trx_seqno((THD*)thd1), + (long long)wsrep_thd_trx_seqno((THD*)thd2)); + return 1; + } + WSREP_DEBUG("waiting for BF, trx order: %lld %lld\n", + (long long)wsrep_thd_trx_seqno((THD*)thd1), + (long long)wsrep_thd_trx_seqno((THD*)thd2)); + return 0; } extern "C" int wsrep_trx_is_aborting(void *thd_ptr) @@ -1000,7 +1000,7 @@ THD::THD() wsrep_applier(is_applier), wsrep_applier_closing(FALSE), 
wsrep_client_thread(0), - wsrep_trx_seqno(WSREP_SEQNO_UNDEFINED), + wsrep_apply_toi(false), #endif m_parser_state(NULL), #if defined(ENABLED_DEBUG_SYNC) @@ -1104,9 +1104,8 @@ THD::THD() #ifdef WITH_WSREP mysql_mutex_init(key_LOCK_wsrep_thd, &LOCK_wsrep_thd, MY_MUTEX_INIT_FAST); mysql_cond_init(key_COND_wsrep_thd, &COND_wsrep_thd, NULL); - wsrep_trx_handle.trx_id = WSREP_UNDEFINED_TRX_ID; - wsrep_trx_handle.opaque = NULL; - //wsrep_retry_autocommit= ::wsrep_retry_autocommit; + wsrep_ws_handle.trx_id = WSREP_UNDEFINED_TRX_ID; + wsrep_ws_handle.opaque = NULL; wsrep_retry_counter = 0; wsrep_PA_safe = true; wsrep_retry_query = NULL; @@ -1471,7 +1470,10 @@ void THD::init(void) wsrep_conflict_state= NO_CONFLICT; wsrep_query_state= QUERY_IDLE; wsrep_last_query_id= 0; + wsrep_trx_meta.gtid= WSREP_GTID_UNDEFINED; + wsrep_trx_meta.depends_on= WSREP_SEQNO_UNDEFINED; wsrep_converted_lock_session= false; + //wsrep_retry_autocommit= ::wsrep_retry_autocommit; wsrep_retry_counter= 0; wsrep_rli= NULL; wsrep_PA_safe= true; @@ -2116,6 +2118,7 @@ void THD::cleanup_after_query() /* reset table map for multi-table update */ table_map_for_update= 0; m_binlog_invoker= FALSE; + /* reset replication info structure */ #ifndef EMBEDDED_LIBRARY if (rli_slave) @@ -4279,7 +4282,7 @@ extern "C" int thd_binlog_format(const MYSQL_THD thd) #else if (mysql_bin_log.is_open() && (thd->variables.option_bits & OPTION_BIN_LOG)) #endif - return (int) WSREP_FORMAT(thd->variables.binlog_format); + return (int) WSREP_BINLOG_FORMAT(thd->variables.binlog_format); else return BINLOG_FORMAT_UNSPEC; } @@ -4839,7 +4842,7 @@ int THD::decide_logging_format(TABLE_LIST *tables) binlog by filtering rules. 
*/ if (mysql_bin_log.is_open() && (variables.option_bits & OPTION_BIN_LOG) && - !(WSREP_FORMAT(variables.binlog_format) == BINLOG_FORMAT_STMT && + !(WSREP_BINLOG_FORMAT(variables.binlog_format) == BINLOG_FORMAT_STMT && !binlog_filter->db_ok(db))) { /* @@ -5003,7 +5006,7 @@ int THD::decide_logging_format(TABLE_LIST *tables) */ my_error((error= ER_BINLOG_ROW_INJECTION_AND_STMT_ENGINE), MYF(0)); } - else if (WSREP_FORMAT(variables.binlog_format) == BINLOG_FORMAT_ROW && + else if (WSREP_BINLOG_FORMAT(variables.binlog_format) == BINLOG_FORMAT_ROW && sqlcom_can_generate_row_events(this)) { /* @@ -5032,7 +5035,7 @@ int THD::decide_logging_format(TABLE_LIST *tables) else { /* binlog_format = STATEMENT */ - if (WSREP_FORMAT(variables.binlog_format) == BINLOG_FORMAT_STMT) + if (WSREP_BINLOG_FORMAT(variables.binlog_format) == BINLOG_FORMAT_STMT) { if (lex->is_stmt_row_injection()) { @@ -5144,7 +5147,7 @@ int THD::decide_logging_format(TABLE_LIST *tables) "and binlog_filter->db_ok(db) = %d", mysql_bin_log.is_open(), (variables.option_bits & OPTION_BIN_LOG), - WSREP_FORMAT(variables.binlog_format), + WSREP_BINLOG_FORMAT(variables.binlog_format), binlog_filter->db_ok(db))); #endif diff --git a/sql/sql_class.h b/sql/sql_class.h index f35e8914c42..d10769d10a8 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -49,6 +49,7 @@ #include "wsrep_mysqld.h" struct wsrep_thd_shadow { ulonglong options; + uint server_status; enum wsrep_exec_mode wsrep_exec_mode; Vio *vio; ulong tx_isolation; @@ -1807,7 +1808,8 @@ public: int is_current_stmt_binlog_format_row() const { DBUG_ASSERT(current_stmt_binlog_format == BINLOG_FORMAT_STMT || current_stmt_binlog_format == BINLOG_FORMAT_ROW); - return (WSREP_FORMAT((ulong)current_stmt_binlog_format) == BINLOG_FORMAT_ROW); + return (WSREP_BINLOG_FORMAT((ulong)current_stmt_binlog_format) == + BINLOG_FORMAT_ROW); } private: @@ -2353,11 +2355,13 @@ public: enum wsrep_conflict_state wsrep_conflict_state; mysql_mutex_t LOCK_wsrep_thd; mysql_cond_t 
COND_wsrep_thd; - wsrep_seqno_t wsrep_trx_seqno; + // changed from wsrep_seqno_t to wsrep_trx_meta_t in wsrep API rev 75 + // wsrep_seqno_t wsrep_trx_seqno; + wsrep_trx_meta_t wsrep_trx_meta; uint32 wsrep_rand; Relay_log_info* wsrep_rli; bool wsrep_converted_lock_session; - wsrep_trx_handle_t wsrep_trx_handle; + wsrep_ws_handle_t wsrep_ws_handle; #ifdef WSREP_PROC_INFO char wsrep_info[128]; /* string for dynamic proc info */ #endif /* WSREP_PROC_INFO */ @@ -2374,6 +2378,7 @@ public: const char* wsrep_TOI_pre_query; /* a query to apply before the actual TOI query */ size_t wsrep_TOI_pre_query_len; + bool wsrep_apply_toi; /* applier processing in TOI */ #endif /* WITH_WSREP */ /** Internal parser state. @@ -2819,7 +2824,7 @@ public: tests fail and so force them to propagate the lex->binlog_row_based_if_mixed upwards to the caller. */ - if ((WSREP_FORMAT(variables.binlog_format) == BINLOG_FORMAT_MIXED) && + if ((WSREP_BINLOG_FORMAT(variables.binlog_format) == BINLOG_FORMAT_MIXED)&& (in_sub_stmt == 0)) set_current_stmt_binlog_format_row(); @@ -2861,7 +2866,7 @@ public: show_system_thread(system_thread))); if (in_sub_stmt == 0) { - if (WSREP_FORMAT(variables.binlog_format) == BINLOG_FORMAT_ROW) + if (WSREP_BINLOG_FORMAT(variables.binlog_format) == BINLOG_FORMAT_ROW) set_current_stmt_binlog_format_row(); else if (temporary_tables == NULL) clear_current_stmt_binlog_format_row(); diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index e8e5d79b370..d497b6d1263 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -104,13 +104,9 @@ #ifdef WITH_WSREP #include "wsrep_mysqld.h" -#include "rpl_rli.h" -static void wsrep_client_rollback(THD *thd); - -extern Format_description_log_event *wsrep_format_desc; - +#include "wsrep_thd.h" static void wsrep_mysql_parse(THD *thd, char *rawbuf, uint length, - Parser_state *parser_state); + Parser_state *parser_state); #endif /* WITH_WSREP */ /** @defgroup Runtime_Environment Runtime Environment @@ -445,13 +441,6 @@ bool 
is_log_table_write_query(enum enum_sql_command command) return (sql_command_flags[command] & CF_WRITE_LOGS_COMMAND) != 0; } -#ifdef WITH_WSREP -bool is_show_query(enum enum_sql_command command) -{ - DBUG_ASSERT(command >= 0 && command <= SQLCOM_END); - return (sql_command_flags[command] & CF_STATUS_COMMAND) != 0; -} -#endif void execute_init_command(THD *thd, LEX_STRING *init_command, mysql_rwlock_t *var_lock) { @@ -637,7 +626,7 @@ void do_handle_bootstrap(THD *thd) close_connection(thd, ER_OUT_OF_RESOURCES, 1); #else close_connection(thd, ER_OUT_OF_RESOURCES); -#endif +#endif /* WITH_WSREP */ #endif thd->fatal_error(); goto end; @@ -717,13 +706,13 @@ bool do_command(THD *thd) { mysql_mutex_lock(&thd->LOCK_wsrep_thd); thd->wsrep_query_state= QUERY_IDLE; - if (thd->wsrep_conflict_state==MUST_ABORT) + if (thd->wsrep_conflict_state==MUST_ABORT) { wsrep_client_rollback(thd); } mysql_mutex_unlock(&thd->LOCK_wsrep_thd); } -#endif +#endif /* WITH_WSREP */ /* indicator of uninitialized lex => normal flow of errors handling (see my_message_sql) @@ -776,12 +765,12 @@ bool do_command(THD *thd) if (thd->wsrep_conflict_state == ABORTING) { while (thd->wsrep_conflict_state == ABORTING) { - mysql_mutex_unlock(&thd->LOCK_wsrep_thd); - my_sleep(1000); - mysql_mutex_lock(&thd->LOCK_wsrep_thd); + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + my_sleep(1000); + mysql_mutex_lock(&thd->LOCK_wsrep_thd); } thd->store_globals(); - } + } else if (thd->wsrep_conflict_state == ABORTED) { thd->store_globals(); @@ -795,7 +784,7 @@ bool do_command(THD *thd) (!WSREP(thd) && (packet_length= my_net_read(net)) == packet_error)) #else if ((packet_length= my_net_read(net)) == packet_error) -#endif +#endif /* WITH_WSREP */ { DBUG_PRINT("info",("Got error %d reading command from socket %s", net->error, @@ -805,12 +794,12 @@ bool do_command(THD *thd) mysql_mutex_lock(&thd->LOCK_wsrep_thd); if (thd->wsrep_conflict_state == MUST_ABORT) { - DBUG_PRINT("wsrep",("aborted for wsrep rollback: %lu", thd->real_id)); 
- wsrep_client_rollback(thd); + DBUG_PRINT("wsrep",("aborted for wsrep rollback: %lu", thd->real_id)); + wsrep_client_rollback(thd); } mysql_mutex_unlock(&thd->LOCK_wsrep_thd); } -#endif +#endif /* WITH_WSREP */ /* Check if we can continue without closing the connection */ @@ -858,11 +847,11 @@ bool do_command(THD *thd) #ifdef WITH_WSREP if (WSREP(thd)) { - /* + /* * bail out if DB snapshot has not been installed. We however, * allow queries "SET" and "SHOW", they are trapped later in execute_command */ - if (thd->variables.wsrep_on && !thd->wsrep_applier && !wsrep_ready && + if (thd->variables.wsrep_on && !thd->wsrep_applier && !wsrep_ready && command != COM_QUERY && command != COM_PING && command != COM_QUIT && @@ -875,14 +864,14 @@ bool do_command(THD *thd) command != COM_TIME && command != COM_END ) { - my_error(ER_UNKNOWN_COM_ERROR, MYF(0), + my_error(ER_UNKNOWN_COM_ERROR, MYF(0), "WSREP has not yet prepared node for application use"); thd->protocol->end_statement(); return_value= FALSE; goto out; } } -#endif +#endif /* WITH_WSREP */ /* Restore read timeout value */ my_net_set_read_timeout(net, thd->variables.net_read_timeout); @@ -892,17 +881,17 @@ bool do_command(THD *thd) if (WSREP(thd)) { while (thd->wsrep_conflict_state== RETRY_AUTOCOMMIT) { - CHARSET_INFO *current_charset = thd->variables.character_set_client; - if (!is_supported_parser_charset(current_charset)) - { - /* Do not use non-supported parser character sets */ - WSREP_WARN("Current client character set is non-supported parser character set: %s", current_charset->csname); - thd->variables.character_set_client = &my_charset_latin1; - WSREP_WARN("For retry temporally setting character set to : %s", my_charset_latin1.csname); - } - return_value= dispatch_command(command, thd, thd->wsrep_retry_query, - thd->wsrep_retry_query_len); - thd->variables.character_set_client = current_charset; + CHARSET_INFO *current_charset = thd->variables.character_set_client; + if 
(!is_supported_parser_charset(current_charset)) + { + /* Do not use non-supported parser character sets */ + WSREP_WARN("Current client character set is non-supported parser character set: %s", current_charset->csname); + thd->variables.character_set_client = &my_charset_latin1; + WSREP_WARN("For retry temporally setting character set to : %s", my_charset_latin1.csname); + } + return_value= dispatch_command(command, thd, thd->wsrep_retry_query, + thd->wsrep_retry_query_len); + thd->variables.character_set_client = current_charset; } } if (thd->wsrep_retry_query && thd->wsrep_conflict_state != REPLAYING) @@ -912,7 +901,7 @@ bool do_command(THD *thd) thd->wsrep_retry_query_len = 0; thd->wsrep_retry_command = COM_CONNECT; } -#endif +#endif /* WITH_WSREP */ out: DBUG_RETURN(return_value); } @@ -1059,7 +1048,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd, { wsrep_client_rollback(thd); } - if (thd->wsrep_conflict_state== ABORTED) + if (thd->wsrep_conflict_state== ABORTED) { my_error(ER_LOCK_DEADLOCK, MYF(0), "wsrep aborted transaction"); WSREP_DEBUG("Deadlock error for: %s", thd->query()); @@ -1264,7 +1253,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd, wsrep_mysql_parse(thd, thd->query(), thd->query_length(), &parser_state); #else mysql_parse(thd, thd->query(), thd->query_length(), &parser_state); -#endif +#endif /* WITH_WSREP */ while (!thd->killed && (parser_state.m_lip.found_semicolon != NULL) && ! thd->is_error()) @@ -1325,14 +1314,14 @@ bool dispatch_command(enum enum_server_command command, THD *thd, thd->set_time(); /* Reset the query start time. */ #else thd->set_time(); /* Reset the query start time. 
*/ -#endif +#endif /* WITH_WSREP */ parser_state.reset(beginning_of_next_stmt, length); /* TODO: set thd->lex->sql_command to SQLCOM_END here */ #ifdef WITH_WSREP wsrep_mysql_parse(thd, beginning_of_next_stmt, length, &parser_state); #else mysql_parse(thd, beginning_of_next_stmt, length, &parser_state); -#endif +#endif /* WITH_WSREP */ } DBUG_PRINT("info",("query ready")); @@ -1666,14 +1655,17 @@ bool dispatch_command(enum enum_server_command command, THD *thd, /* wsrep BF abort in query exec phase */ mysql_mutex_lock(&thd->LOCK_wsrep_thd); if ((thd->wsrep_conflict_state != REPLAYING) && - (thd->wsrep_conflict_state != RETRY_AUTOCOMMIT)) - { + (thd->wsrep_conflict_state != RETRY_AUTOCOMMIT)) { + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + thd->update_server_status(); thd->protocol->end_statement(); query_cache_end_of_result(thd); + } + else + { + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); } - mysql_mutex_unlock(&thd->LOCK_wsrep_thd); - } else { /* if (WSREP(thd))... */ #endif /* WITH_WSREP */ DBUG_ASSERT(thd->derived_tables == NULL && @@ -2146,6 +2138,13 @@ err: return TRUE; } +#ifdef WITH_WSREP +static bool wsrep_is_show_query(enum enum_sql_command command) +{ + DBUG_ASSERT(command >= 0 && command <= SQLCOM_END); + return (sql_command_flags[command] & CF_STATUS_COMMAND) != 0; +} +#endif /* WITH_WSREP */ /** Execute command saved in thd and lex->sql_command. @@ -2381,7 +2380,7 @@ mysql_execute_command(THD *thd) } } } - if (lex->sql_command== SQLCOM_UNLOCK_TABLES && + if (lex->sql_command== SQLCOM_UNLOCK_TABLES && thd->wsrep_converted_lock_session) { thd->wsrep_converted_lock_session= false; @@ -2389,13 +2388,13 @@ mysql_execute_command(THD *thd) lex->tx_release= TVL_NO; } - /* + /* * bail out if DB snapshot has not been installed. 
We however, * allow SET and SHOW queries */ if (thd->variables.wsrep_on && !thd->wsrep_applier && !wsrep_ready && lex->sql_command != SQLCOM_SET_OPTION && - !is_show_query(lex->sql_command)) + !wsrep_is_show_query(lex->sql_command)) { #if DIRTY_HACK /* Dirty hack for lp:1002714 - trying to recognize mysqldump connection @@ -2410,7 +2409,7 @@ mysql_execute_command(THD *thd) strncmp(thd->query(), mysqldump_magic_str, mysqldump_magic_str_len)) { #endif /* DIRTY_HACK */ - my_error(ER_UNKNOWN_COM_ERROR, MYF(0), + my_error(ER_UNKNOWN_COM_ERROR, MYF(0), "WSREP has not yet prepared node for application use"); goto error; #if DIRTY_HACK @@ -2449,7 +2448,9 @@ mysql_execute_command(THD *thd) if (trans_commit_implicit(thd)) { thd->mdl_context.release_transactional_locks(); +#ifdef WITH_WSREP WSREP_DEBUG("implicit commit failed, MDL released: %lu", thd->thread_id); +#endif /* WITH_WSREP */ goto error; } /* Release metadata locks acquired in this transaction. */ @@ -2480,7 +2481,13 @@ mysql_execute_command(THD *thd) break; case SQLCOM_SHOW_STATUS: { +#ifdef WITH_WSREP + if (WSREP_CLIENT(thd) && wsrep_causal_wait(thd)) goto error; +#endif /* WITH_WSREP */ execute_show_status(thd, all_tables); +#ifdef WITH_WSREP + if (lex->sql_command == SQLCOM_SHOW_STATUS) wsrep_free_status(thd); +#endif /* WITH_WSREP */ break; } case SQLCOM_SHOW_DATABASES: @@ -2511,7 +2518,7 @@ mysql_execute_command(THD *thd) case SQLCOM_SHOW_STORAGE_ENGINES: case SQLCOM_SHOW_PROFILE: #endif /* WITH_WSREP */ - { + { thd->status_var.last_query_cost= 0.0; /* @@ -2821,7 +2828,7 @@ case SQLCOM_PREPARE: */ if (thd->query_name_consts && mysql_bin_log.is_open() && - WSREP_FORMAT(thd->variables.binlog_format) == BINLOG_FORMAT_STMT && + WSREP_BINLOG_FORMAT(thd->variables.binlog_format) == BINLOG_FORMAT_STMT && !mysql_bin_log.is_query_in_union(thd, thd->query_id)) { List_iterator_fast<Item> it(select_lex->item_list); @@ -3503,7 +3510,7 @@ end_with_restore_list: #ifdef WITH_WSREP for (TABLE_LIST *table= all_tables; table; 
table= table->next_global) { - if (!lex->drop_temporary && + if (!lex->drop_temporary && (!thd->is_current_stmt_binlog_format_row() || !find_temporary_table(thd, table))) { @@ -4169,7 +4176,7 @@ end_with_restore_list: lex->insert_list, lex->ha_rkey_mode, select_lex->where, unit->select_limit_cnt, unit->offset_limit_cnt); #ifdef WITH_WSREP - if (WSREP(thd)) thd_proc_info(thd, tmp_info); + if (WSREP(thd)) thd_proc_info(thd, tmp_info); } #endif /* WITH_WSREP */ break; @@ -4178,7 +4185,9 @@ end_with_restore_list: if (trans_begin(thd, lex->start_transaction_opt)) { thd->mdl_context.release_transactional_locks(); +#ifdef WITH_WSREP WSREP_DEBUG("BEGIN failed, MDL released: %lu", thd->thread_id); +#endif /* WITH_WSREP */ goto error; } my_ok(thd); @@ -4196,7 +4205,9 @@ end_with_restore_list: if (trans_commit(thd)) { thd->mdl_context.release_transactional_locks(); +#ifdef WITH_WSREP WSREP_DEBUG("COMMIT failed, MDL released: %lu", thd->thread_id); +#endif /* WITH_WSREP */ goto error; } thd->mdl_context.release_transactional_locks(); @@ -4217,7 +4228,20 @@ end_with_restore_list: thd->killed= KILL_CONNECTION; thd->print_aborted_warning(3, "RELEASE"); } +#ifdef WITH_WSREP + if (WSREP(thd)) { + + if (thd->wsrep_conflict_state == NO_CONFLICT || + thd->wsrep_conflict_state == REPLAYING) + { + my_ok(thd); + } + } else { +#endif /* WITH_WSREP */ my_ok(thd); +#ifdef WITH_WSREP + } +#endif /* WITH_WSREP */ break; } case SQLCOM_ROLLBACK: @@ -4233,7 +4257,9 @@ end_with_restore_list: if (trans_rollback(thd)) { thd->mdl_context.release_transactional_locks(); +#ifdef WITH_WSREP WSREP_DEBUG("rollback failed, MDL released: %lu", thd->thread_id); +#endif /* WITH_WSREP */ goto error; } thd->mdl_context.release_transactional_locks(); @@ -4251,20 +4277,18 @@ end_with_restore_list: /* Disconnect the current client connection. 
*/ if (tx_release) thd->killed= KILL_CONNECTION; - #ifdef WITH_WSREP +#ifdef WITH_WSREP if (WSREP(thd)) { - if (thd->wsrep_conflict_state == NO_CONFLICT || - thd->wsrep_conflict_state == REPLAYING) - { - my_ok(thd); + if (thd->wsrep_conflict_state == NO_CONFLICT) { + my_ok(thd); } } else { #endif /* WITH_WSREP */ - my_ok(thd); - #ifdef WITH_WSREP + my_ok(thd); +#ifdef WITH_WSREP } #endif /* WITH_WSREP */ - break; + break; } case SQLCOM_RELEASE_SAVEPOINT: if (trans_release_savepoint(thd, lex->ident)) @@ -4781,7 +4805,9 @@ create_sp_error: if (trans_xa_commit(thd)) { thd->mdl_context.release_transactional_locks(); +#ifdef WITH_WSREP WSREP_DEBUG("XA commit failed, MDL released: %lu", thd->thread_id); +#endif /* WITH_WSREP */ goto error; } thd->mdl_context.release_transactional_locks(); @@ -4796,7 +4822,9 @@ create_sp_error: if (trans_xa_rollback(thd)) { thd->mdl_context.release_transactional_locks(); +#ifdef WITH_WSREP WSREP_DEBUG("XA rollback failed, MDL released: %lu", thd->thread_id); +#endif /* WITH_WSREP */ goto error; } thd->mdl_context.release_transactional_locks(); @@ -5931,7 +5959,12 @@ void THD::reset_for_next_command() thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt= 0; #ifdef WITH_WSREP - if (WSREP(thd)) { + /* + Autoinc variables should be adjusted only for locally executed + transactions. Appliers and replayers are either processing ROW + events or get autoinc variable values from Query_log_event. 
+ */ + if (WSREP(thd) && thd->wsrep_exec_mode == LOCAL_STATE) { if (wsrep_auto_increment_control) { if (thd->variables.auto_increment_offset != @@ -6147,96 +6180,10 @@ void mysql_init_multi_delete(LEX *lex) } #ifdef WITH_WSREP -void wsrep_replay_transaction(THD *thd) -{ - /* checking if BF trx must be replayed */ - if (thd->wsrep_conflict_state== MUST_REPLAY) - { - if (thd->wsrep_exec_mode!= REPL_RECV) - { - if (thd->stmt_da->is_sent) - { - WSREP_ERROR("replay issue, thd has reported status already"); - } - thd->stmt_da->reset_diagnostics_area(); - - thd->wsrep_conflict_state= REPLAYING; - mysql_mutex_unlock(&thd->LOCK_wsrep_thd); - - mysql_reset_thd_for_next_command(thd); - thd->killed= NOT_KILLED; - close_thread_tables(thd); - if (thd->locked_tables_mode && thd->lock) - { - WSREP_DEBUG("releasing table lock for replaying (%ld)", - thd->thread_id); - thd->locked_tables_list.unlock_locked_tables(thd); - thd->variables.option_bits&= ~(OPTION_TABLE_LOCK); - } - thd->mdl_context.release_transactional_locks(); - - thd_proc_info(thd, "wsrep replaying trx"); - WSREP_DEBUG("replay trx: %s %lld", - thd->query() ? 
thd->query() : "void", - (long long)thd->wsrep_trx_seqno); - struct wsrep_thd_shadow shadow; - wsrep_prepare_bf_thd(thd, &shadow); - int rcode = wsrep->replay_trx(wsrep, - &thd->wsrep_trx_handle, - (void *)thd); - - wsrep_return_from_bf_mode(thd, &shadow); - if (thd->wsrep_conflict_state!= REPLAYING) - WSREP_WARN("lost replaying mode: %d", thd->wsrep_conflict_state ); - - mysql_mutex_lock(&thd->LOCK_wsrep_thd); - - switch (rcode) - { - case WSREP_OK: - thd->wsrep_conflict_state= NO_CONFLICT; - wsrep->post_commit(wsrep, &thd->wsrep_trx_handle); - WSREP_DEBUG("trx_replay successful for: %ld %llu", - thd->thread_id, (long long)thd->real_id); - break; - case WSREP_TRX_FAIL: - if (thd->stmt_da->is_sent) - { - WSREP_ERROR("replay failed, thd has reported status"); - } - else - { - WSREP_DEBUG("replay failed, rolling back"); - my_error(ER_LOCK_DEADLOCK, MYF(0), "wsrep aborted transaction"); - } - thd->wsrep_conflict_state= ABORTED; - thd->wsrep_bf_thd = NULL; - wsrep->post_rollback(wsrep, &thd->wsrep_trx_handle); - break; - default: - WSREP_ERROR("trx_replay failed for: %d, query: %s", - rcode, thd->query() ? 
thd->query() : "void"); - /* we're now in inconsistent state, must abort */ - unireg_abort(1); - break; - } - - wsrep_cleanup_transaction(thd); - - mysql_mutex_lock(&LOCK_wsrep_replaying); - wsrep_replaying--; - WSREP_DEBUG("replaying decreased: %d, thd: %lu", - wsrep_replaying, thd->thread_id); - mysql_cond_broadcast(&COND_wsrep_replaying); - mysql_mutex_unlock(&LOCK_wsrep_replaying); - } - } -} - static void wsrep_mysql_parse(THD *thd, char *rawbuf, uint length, - Parser_state *parser_state) + Parser_state *parser_state) { - bool is_autocommit= + bool is_autocommit= !thd->in_multi_stmt_transaction_mode() && thd->wsrep_conflict_state == NO_CONFLICT && !thd->wsrep_applier && @@ -6259,9 +6206,10 @@ static void wsrep_mysql_parse(THD *thd, char *rawbuf, uint length, WSREP_DEBUG("abort in exec query state, avoiding autocommit"); } - if (thd->wsrep_conflict_state== MUST_REPLAY) + /* checking if BF trx must be replayed */ + if (thd->wsrep_conflict_state== MUST_REPLAY) { - wsrep_replay_transaction(thd); + wsrep_replay_transaction(thd); } /* setting error code for BF aborted trxs */ @@ -6274,7 +6222,7 @@ static void wsrep_mysql_parse(THD *thd, char *rawbuf, uint length, thd->lex->sql_command != SQLCOM_SELECT && (thd->wsrep_retry_counter < thd->variables.wsrep_retry_autocommit)) { - WSREP_DEBUG("wsrep retrying AC query: %s", + WSREP_DEBUG("wsrep retrying AC query: %s", (thd->query()) ? thd->query() : "void"); close_thread_tables(thd); @@ -6287,10 +6235,10 @@ static void wsrep_mysql_parse(THD *thd, char *rawbuf, uint length, } else { - WSREP_DEBUG("%s, thd: %lu is_AC: %d, retry: %lu - %lu SQL: %s", - (thd->wsrep_conflict_state == ABORTED) ? + WSREP_DEBUG("%s, thd: %lu is_AC: %d, retry: %lu - %lu SQL: %s", + (thd->wsrep_conflict_state == ABORTED) ? 
"BF Aborted" : "cert failure", - thd->thread_id, is_autocommit, thd->wsrep_retry_counter, + thd->thread_id, is_autocommit, thd->wsrep_retry_counter, thd->variables.wsrep_retry_autocommit, thd->query()); my_error(ER_LOCK_DEADLOCK, MYF(0), "wsrep aborted transaction"); thd->killed= NOT_KILLED; @@ -6309,7 +6257,8 @@ static void wsrep_mysql_parse(THD *thd, char *rawbuf, uint length, if (thd->wsrep_retry_query) { - WSREP_DEBUG("releasing retry_query: conf %d sent %d kill %d errno %d SQL %s", + WSREP_DEBUG("releasing retry_query: " + "conf %d sent %d kill %d errno %d SQL %s", thd->wsrep_conflict_state, thd->stmt_da->is_sent, thd->killed, @@ -7304,7 +7253,7 @@ uint kill_one_thread(THD *thd, ulong id, killed_state kill_signal) #ifdef WITH_WSREP if (((thd->security_ctx->master_access & SUPER_ACL) || thd->security_ctx->user_matches(tmp->security_ctx)) && - !wsrep_thd_is_brute_force((void *)tmp)) + !wsrep_thd_is_brute_force((void *)tmp)) #else if ((thd->security_ctx->master_access & SUPER_ACL) || thd->security_ctx->user_matches(tmp->security_ctx)) @@ -8083,621 +8032,6 @@ LEX_USER *create_definer(THD *thd, LEX_STRING *user_name, LEX_STRING *host_name) return definer; } -#ifdef WITH_WSREP -/* must have (&thd->LOCK_wsrep_thd) */ -static void wsrep_client_rollback(THD *thd) -{ - WSREP_DEBUG("client rollback due to BF abort for (%ld), query: %s", - thd->thread_id, thd->query()); - - thd->wsrep_conflict_state= ABORTING; - mysql_mutex_unlock(&thd->LOCK_wsrep_thd); - trans_rollback(thd); - - if (thd->locked_tables_mode && thd->lock) - { - WSREP_DEBUG("unlocking tables for BF abort (%ld)", thd->thread_id); - thd->locked_tables_list.unlock_locked_tables(thd); - thd->variables.option_bits&= ~(OPTION_TABLE_LOCK); - } - - if (thd->global_read_lock.is_acquired()) - { - WSREP_DEBUG("unlocking GRL for BF abort (%ld)", thd->thread_id); - thd->global_read_lock.unlock_global_read_lock(thd); - } - - /* Release transactional metadata locks. 
*/ - thd->mdl_context.release_transactional_locks(); - - /* release explicit MDL locks */ - thd->mdl_context.release_explicit_locks(); - - if (thd->get_binlog_table_maps()) - { - WSREP_DEBUG("clearing binlog table map for BF abort (%ld)", thd->thread_id); - thd->clear_binlog_table_maps(); - } - mysql_mutex_lock(&thd->LOCK_wsrep_thd); - thd->wsrep_conflict_state= ABORTED; - thd->wsrep_bf_thd = NULL; -} - -static enum wsrep_status wsrep_apply_sql( - THD *thd, const char *sql, size_t sql_len, time_t timeval, uint32 randseed) -{ - int error; - enum wsrep_status ret_code= WSREP_OK; - CHARSET_INFO *current_charset = thd->variables.character_set_client; - - DBUG_ENTER("wsrep_bf_execute_cb"); - thd->wsrep_exec_mode= REPL_RECV; - thd->net.vio= 0; - thd->start_time= timeval; - thd->wsrep_rand= randseed; - - thd->variables.option_bits |= OPTION_NOT_AUTOCOMMIT; - - DBUG_PRINT("wsrep", ("SQL: %s", sql)); - - mysql_mutex_lock(&thd->LOCK_wsrep_thd); - thd->wsrep_query_state= QUERY_EXEC; - /* preserve replaying mode */ - if (thd->wsrep_conflict_state!= REPLAYING) - thd->wsrep_conflict_state= NO_CONFLICT; - mysql_mutex_unlock(&thd->LOCK_wsrep_thd); - - if (!is_supported_parser_charset(current_charset)) - { - /* Do not use non-supported parser character sets */ - WSREP_WARN("Current client character set is non-supported parser character set: %s", current_charset->csname); - thd->variables.character_set_client = &my_charset_latin1; - WSREP_WARN("For BF SQL apply temporally setting character set to : %s", - my_charset_latin1.csname); - } - - if ((error= dispatch_command(COM_QUERY, thd, (char*)sql, sql_len))) { - WSREP_WARN("BF SQL apply failed: %d, %lld", - thd->wsrep_conflict_state, (long long)thd->wsrep_trx_seqno); - thd->variables.character_set_client = current_charset; - DBUG_RETURN(WSREP_FATAL); - } - thd->variables.character_set_client = current_charset; - - mysql_mutex_lock(&thd->LOCK_wsrep_thd); - if (thd->wsrep_conflict_state!= NO_CONFLICT && - thd->wsrep_conflict_state!= 
REPLAYING) { - ret_code= WSREP_FATAL; - WSREP_DEBUG("BF thd ending, with: %d, %lld", - thd->wsrep_conflict_state, (long long)thd->wsrep_trx_seqno); - } - mysql_mutex_unlock(&thd->LOCK_wsrep_thd); - - assert(thd->wsrep_exec_mode== REPL_RECV); - DBUG_RETURN(ret_code); -} - -void wsrep_write_rbr_buf( - THD *thd, const void* rbr_buf, size_t buf_len) -{ - char filename[PATH_MAX]= {0}; - int len= snprintf(filename, PATH_MAX, "%s/GRA_%ld_%lld.log", - wsrep_data_home_dir, thd->thread_id, - (long long)thd->wsrep_trx_seqno); - if (len >= PATH_MAX) - { - WSREP_ERROR("RBR dump path too long: %d, skipping dump.", len); - return; - } - - FILE *of= fopen(filename, "wb"); - if (of) - { - (void) fwrite (rbr_buf, buf_len, 1, of); - fclose(of); - } - else - { - WSREP_ERROR("Failed to open file '%s': %d (%s)", - filename, errno, strerror(errno)); - } -} - -static inline wsrep_status_t wsrep_apply_rbr( - THD *thd, const uchar *rbr_buf, size_t buf_len) -{ - char *buf= (char *)rbr_buf; - int rcode= 0; - int event= 1; - Format_description_log_event *description_event = wsrep_format_desc; - DBUG_ENTER("wsrep_apply_rbr"); - - if (thd->killed == KILL_CONNECTION) - { - WSREP_INFO("applier has been aborted, skipping apply_rbr: %lld", - (long long) thd->wsrep_trx_seqno); - DBUG_RETURN(WSREP_FATAL); - } - - mysql_mutex_lock(&thd->LOCK_wsrep_thd); - thd->wsrep_query_state= QUERY_EXEC; - if (thd->wsrep_conflict_state!= REPLAYING) - thd->wsrep_conflict_state= NO_CONFLICT; - mysql_mutex_unlock(&thd->LOCK_wsrep_thd); - - if (!buf_len) WSREP_DEBUG("empty rbr buffer to apply: %lld", - (long long) thd->wsrep_trx_seqno); - - if ((rcode= trans_begin(thd))) - WSREP_WARN("begin for rbr apply failed: %lld, code: %d", - (long long) thd->wsrep_trx_seqno, rcode); - - while(buf_len) - { - int exec_res; - int error = 0; - Log_event* ev= wsrep_read_log_event(&buf, &buf_len, description_event); - - if (!ev) - { - WSREP_ERROR("applier could not read binlog event, seqno: %lld, len: %ld", - (long 
long)thd->wsrep_trx_seqno, (long int) buf_len); - rcode= 1; - goto error; - } - switch (ev->get_type_code()) { - case WRITE_ROWS_EVENT: - case UPDATE_ROWS_EVENT: - case DELETE_ROWS_EVENT: - DBUG_ASSERT(buf_len != 0 || - ((Rows_log_event *) ev)->get_flags(Rows_log_event::STMT_END_F)); - break; - case FORMAT_DESCRIPTION_EVENT: - description_event = (Format_description_log_event *)ev; - break; - default: - break; - } - - thd->server_id = ev->server_id; // use the original server id for logging - thd->set_time(); // time the query - wsrep_xid_init(&thd->transaction.xid_state.xid, - wsrep_cluster_uuid(), - thd->wsrep_trx_seqno); - thd->lex->current_select= 0; - if (!ev->when) - ev->when = time(NULL); - ev->thd = thd; - exec_res = ev->apply_event(thd->wsrep_rli); - DBUG_PRINT("info", ("exec_event result: %d", exec_res)); - - if (exec_res) - { - WSREP_WARN("RBR event %d %s apply warning: %d, %lld", - event, ev->get_type_str(), exec_res, (long long) thd->wsrep_trx_seqno); - rcode= exec_res; - /* stop processing for the first error */ - delete ev; - goto error; - } - event++; - - if (thd->wsrep_conflict_state!= NO_CONFLICT && - thd->wsrep_conflict_state!= REPLAYING) - WSREP_WARN("conflict state after RBR event applying: %d, %lld", - thd->wsrep_query_state, (long long)thd->wsrep_trx_seqno); - - if (thd->wsrep_conflict_state == MUST_ABORT) { - WSREP_WARN("RBR event apply failed, rolling back: %lld", - (long long) thd->wsrep_trx_seqno); - trans_rollback(thd); - thd->locked_tables_list.unlock_locked_tables(thd); - /* Release transactional metadata locks. 
*/ - thd->mdl_context.release_transactional_locks(); - thd->wsrep_conflict_state= NO_CONFLICT; - DBUG_RETURN(WSREP_FATAL); - } - - if ((ev->get_type_code() == WRITE_ROWS_EVENT || - ev->get_type_code() == UPDATE_ROWS_EVENT || - ev->get_type_code() == DELETE_ROWS_EVENT) && - ((Rows_log_event *) ev)->get_flags(Rows_log_event::STMT_END_F)) - { - thd->wsrep_rli->cleanup_context(thd, 0); - - if (error == 0) - { - thd->clear_error(); - } - else - WSREP_ERROR("Error in %s event: commit of row events failed: %lld", - ev->get_type_str(), (long long)thd->wsrep_trx_seqno); - } - - if (description_event != ev) - delete ev; - } - - error: - mysql_mutex_lock(&thd->LOCK_wsrep_thd); - thd->wsrep_query_state= QUERY_IDLE; - mysql_mutex_unlock(&thd->LOCK_wsrep_thd); - - assert(thd->wsrep_exec_mode== REPL_RECV); - - if (thd->killed == KILL_CONNECTION) - WSREP_INFO("applier aborted: %lld", (long long)thd->wsrep_trx_seqno); - - if (rcode) DBUG_RETURN(WSREP_FATAL); - DBUG_RETURN(WSREP_OK); -} - -wsrep_status_t wsrep_apply_cb(void* const ctx, - const void* const buf, size_t const buf_len, - wsrep_seqno_t const global_seqno) -{ - THD* const thd((THD*)ctx); - - thd->wsrep_trx_seqno= global_seqno; - -#ifdef WSREP_PROC_INFO - snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, - "applying write set %lld: %p, %zu", - (long long)thd->wsrep_trx_seqno, buf, buf_len); - thd_proc_info(thd, thd->wsrep_info); -#else - thd_proc_info(thd, "applying write set"); -#endif /* WSREP_PROC_INFO */ - - wsrep_status_t const rcode(wsrep_apply_rbr(thd, (const uchar*)buf, buf_len)); - -#ifdef WSREP_PROC_INFO - snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, - "applied write set %lld", (long long)thd->wsrep_trx_seqno); - thd_proc_info(thd, thd->wsrep_info); -#else - thd_proc_info(thd, "applied write set"); -#endif /* WSREP_PROC_INFO */ - - if (WSREP_OK != rcode) wsrep_write_rbr_buf(thd, buf, buf_len); - TABLE *tmp; - while ((tmp = thd->temporary_tables)) - { - WSREP_DEBUG("Applier %lu, has temporary 
tables: %s.%s", - thd->thread_id, - (tmp->s) ? tmp->s->db.str : "void", - (tmp->s) ? tmp->s->table_name.str : "void"); - close_temporary_table(thd, tmp, 1, 1); - } - - return rcode; -} - -#if DELETE // this does not work in 5.5 -/* a common wrapper for end_trans() function - to put all necessary stuff */ -static inline wsrep_status_t -wsrep_end_trans (THD* const thd, enum enum_mysql_completiontype const end) -{ - if (0 == end_trans(thd, end)) - { - return WSREP_OK; - } - else - { - return WSREP_FATAL; - } -} -#endif - -wsrep_status_t wsrep_commit(THD* const thd, wsrep_seqno_t const global_seqno) -{ -#ifdef WSREP_PROC_INFO - snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, - "committing %lld", (long long)thd->wsrep_trx_seqno); - thd_proc_info(thd, thd->wsrep_info); -#else - thd_proc_info(thd, "committing"); -#endif /* WSREP_PROC_INFO */ - - wsrep_status_t const rcode(wsrep_apply_sql(thd, "COMMIT", 6, 0, 0)); -// wsrep_status_t const rcode(wsrep_end_trans (thd, COMMIT)); - -#ifdef WSREP_PROC_INFO - snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, - "committed %lld", (long long)thd->wsrep_trx_seqno); - thd_proc_info(thd, thd->wsrep_info); -#else - thd_proc_info(thd, "committed"); -#endif /* WSREP_PROC_INFO */ - - if (WSREP_OK == rcode) - { - // TODO: mark snapshot with global_seqno. 
- } - - return rcode; -} - -wsrep_status_t wsrep_rollback(THD* const thd, wsrep_seqno_t const global_seqno) -{ -#ifdef WSREP_PROC_INFO - snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, - "rolling back %lld", (long long)thd->wsrep_trx_seqno); - thd_proc_info(thd, thd->wsrep_info); -#else - thd_proc_info(thd, "rolling back"); -#endif /* WSREP_PROC_INFO */ - - wsrep_status_t const rcode(wsrep_apply_sql(thd, "ROLLBACK", 8, 0, 0)); -// wsrep_status_t const rcode(wsrep_end_trans (thd, ROLLBACK)); - -#ifdef WSREP_PROC_INFO - snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, - "rolled back %lld", (long long)thd->wsrep_trx_seqno); - thd_proc_info(thd, thd->wsrep_info); -#else - thd_proc_info(thd, "rolled back"); -#endif /* WSREP_PROC_INFO */ - - return rcode; -} - -wsrep_status_t wsrep_commit_cb(void* const ctx, - wsrep_seqno_t const global_seqno, - bool const commit) -{ - THD* const thd((THD*)ctx); - - assert(global_seqno == thd->wsrep_trx_seqno); - - if (commit) - return wsrep_commit(thd, global_seqno); - else - return wsrep_rollback(thd, global_seqno); -} - -Relay_log_info* wsrep_relay_log_init(const char* log_fname) -{ - Relay_log_info* rli= new Relay_log_info(false); - - rli->no_storage= true; - if (!rli->relay_log.description_event_for_exec) - { - rli->relay_log.description_event_for_exec= - new Format_description_log_event(4); - } - - rli->sql_thd= current_thd; - return rli; -} - -void wsrep_prepare_bf_thd(THD *thd, struct wsrep_thd_shadow* shadow) -{ - shadow->options = thd->variables.option_bits; - shadow->wsrep_exec_mode = thd->wsrep_exec_mode; - shadow->vio = thd->net.vio; - - if (opt_log_slave_updates) - thd->variables.option_bits|= OPTION_BIN_LOG; - else - thd->variables.option_bits&= ~(OPTION_BIN_LOG); - - if (!thd->wsrep_rli) thd->wsrep_rli= wsrep_relay_log_init("wsrep_relay"); - - thd->wsrep_exec_mode= REPL_RECV; - thd->net.vio= 0; - thd->clear_error(); - - thd->variables.option_bits|= OPTION_NOT_AUTOCOMMIT; - - shadow->tx_isolation = 
thd->variables.tx_isolation; - thd->variables.tx_isolation = ISO_READ_COMMITTED; - thd->tx_isolation = ISO_READ_COMMITTED; - - shadow->db = thd->db; - shadow->db_length = thd->db_length; - thd->reset_db(NULL, 0); -} - -void wsrep_return_from_bf_mode(THD *thd, struct wsrep_thd_shadow* shadow) -{ - thd->variables.option_bits = shadow->options; - thd->wsrep_exec_mode = shadow->wsrep_exec_mode; - thd->net.vio = shadow->vio; - thd->variables.tx_isolation = shadow->tx_isolation; - - thd->reset_db(shadow->db, shadow->db_length); -} - -void wsrep_replication_process(THD *thd) -{ - int rcode; - DBUG_ENTER("wsrep_replication_process"); - - struct wsrep_thd_shadow shadow; - wsrep_prepare_bf_thd(thd, &shadow); - - rcode = wsrep->recv(wsrep, (void *)thd); - DBUG_PRINT("wsrep",("wsrep_repl returned: %d", rcode)); - - WSREP_INFO("applier thread exiting (code:%d)", rcode); - - switch (rcode) { - case WSREP_OK: - case WSREP_NOT_IMPLEMENTED: - case WSREP_CONN_FAIL: - /* provider does not support slave operations / disconnected from group, - * just close applier thread */ - break; - case WSREP_NODE_FAIL: - /* data inconsistency => SST is needed */ - /* Note: we cannot just blindly restart replication here, - * SST might require server restart if storage engines must be - * initialized after SST */ - WSREP_ERROR("node consistency compromised, aborting"); - wsrep_kill_mysql(thd); - break; - case WSREP_WARNING: - case WSREP_TRX_FAIL: - case WSREP_TRX_MISSING: - /* these suggests a bug in provider code */ - WSREP_WARN("bad return from recv() call: %d", rcode); - /* fall through to node shutdown */ - case WSREP_FATAL: - /* Cluster connectivity is lost. - * - * If applier was killed on purpose (KILL_CONNECTION), we - * avoid mysql shutdown. 
This is because the killer will then handle - * shutdown processing (or replication restarting) - */ - if (thd->killed != KILL_CONNECTION) - { - wsrep_kill_mysql(thd); - } - break; - } - - mysql_mutex_lock(&LOCK_thread_count); - wsrep_close_applier(thd); - mysql_cond_broadcast(&COND_thread_count); - mysql_mutex_unlock(&LOCK_thread_count); - - if (thd->temporary_tables) - { - WSREP_DEBUG("Applier %lu, has temporary tables at exit", thd->thread_id); - } - wsrep_return_from_bf_mode(thd, &shadow); - DBUG_VOID_RETURN; -} - -void wsrep_rollback_process(THD *thd) -{ - DBUG_ENTER("wsrep_rollback_process"); - - mysql_mutex_lock(&LOCK_wsrep_rollback); - wsrep_aborting_thd= NULL; - - while (thd->killed == NOT_KILLED) { - thd_proc_info(thd, "wsrep aborter idle"); - thd->mysys_var->current_mutex= &LOCK_wsrep_rollback; - thd->mysys_var->current_cond= &COND_wsrep_rollback; - - mysql_cond_wait(&COND_wsrep_rollback,&LOCK_wsrep_rollback); - - WSREP_DEBUG("WSREP rollback thread wakes for signal"); - - mysql_mutex_lock(&thd->mysys_var->mutex); - thd_proc_info(thd, "wsrep aborter active"); - thd->mysys_var->current_mutex= 0; - thd->mysys_var->current_cond= 0; - mysql_mutex_unlock(&thd->mysys_var->mutex); - - /* check for false alarms */ - if (!wsrep_aborting_thd) - { - WSREP_DEBUG("WSREP rollback thread has empty abort queue"); - } - /* process all entries in the queue */ - while (wsrep_aborting_thd) { - THD *aborting; - wsrep_aborting_thd_t next = wsrep_aborting_thd->next; - aborting = wsrep_aborting_thd->aborting_thd; - my_free(wsrep_aborting_thd); - wsrep_aborting_thd= next; - /* - * must release mutex, appliers my want to add more - * aborting thds in our work queue, while we rollback - */ - mysql_mutex_unlock(&LOCK_wsrep_rollback); - - mysql_mutex_lock(&aborting->LOCK_wsrep_thd); - if (aborting->wsrep_conflict_state== ABORTED) - { - WSREP_DEBUG("WSREP, thd already aborted: %llu state: %d", - (long long)aborting->real_id, - aborting->wsrep_conflict_state); - - 
mysql_mutex_unlock(&aborting->LOCK_wsrep_thd); - mysql_mutex_lock(&LOCK_wsrep_rollback); - continue; - } - aborting->wsrep_conflict_state= ABORTING; - - mysql_mutex_unlock(&aborting->LOCK_wsrep_thd); - - aborting->store_globals(); - - mysql_mutex_lock(&aborting->LOCK_wsrep_thd); - wsrep_client_rollback(aborting); - WSREP_DEBUG("WSREP rollbacker aborted thd: (%lu %llu)", - aborting->thread_id, (long long)aborting->real_id); - mysql_mutex_unlock(&aborting->LOCK_wsrep_thd); - - mysql_mutex_lock(&LOCK_wsrep_rollback); - } - } - - mysql_mutex_unlock(&LOCK_wsrep_rollback); - sql_print_information("WSREP: rollbacker thread exiting"); - - DBUG_PRINT("wsrep",("wsrep rollbacker thread exiting")); - DBUG_VOID_RETURN; -} -extern "C" -int wsrep_thd_is_brute_force(void *thd_ptr) -{ - /* - Brute force: - Appliers and replaying are running in REPL_RECV mode. TOI statements - in TOTAL_ORDER mode. Locally committing transaction that has got - past wsrep->pre_commit() without error is running in LOCAL_COMMIT mode. - - Everything else is running in LOCAL_STATE and should not be considered - brute force. - */ - if (thd_ptr) { - switch (((THD *)thd_ptr)->wsrep_exec_mode) { - case LOCAL_STATE: return 0; - case REPL_RECV: return 1; - case TOTAL_ORDER: return 2; - case LOCAL_COMMIT: return 3; - } - } - DBUG_ASSERT(0); - return 0; -} -extern "C" -int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr, my_bool signal) -{ - THD *victim_thd = (THD *) victim_thd_ptr; - THD *bf_thd = (THD *) bf_thd_ptr; - DBUG_ENTER("wsrep_abort_thd"); - - if ( (WSREP(bf_thd) || - ( (WSREP_ON || wsrep_OSU_method_options == WSREP_OSU_RSU) && - bf_thd->wsrep_exec_mode == TOTAL_ORDER) ) && - victim_thd) - { - WSREP_DEBUG("wsrep_abort_thd, by: %llu, victim: %llu", (bf_thd) ? 
- (long long)bf_thd->real_id : 0, (long long)victim_thd->real_id); - ha_wsrep_abort_transaction(bf_thd, victim_thd, signal); - } - else - { - WSREP_DEBUG("wsrep_abort_thd not effective: %p %p", bf_thd, victim_thd); - } - - DBUG_RETURN(1); -} -extern "C" -int wsrep_thd_in_locking_session(void *thd_ptr) -{ - if (thd_ptr && ((THD *)thd_ptr)->in_lock_tables) { - return 1; - } - return 0; -} -#endif /** Retuns information about user or current user. diff --git a/sql/sql_trigger.cc b/sql/sql_trigger.cc index 46f8100d9d1..da03e5f7a03 100644 --- a/sql/sql_trigger.cc +++ b/sql/sql_trigger.cc @@ -2453,7 +2453,7 @@ bool load_table_name_for_trigger(THD *thd, DBUG_RETURN(FALSE); } #ifdef WITH_WSREP -int wsrep_create_trigger_query(THD *thd, uchar** buf, uint* buf_len) +int wsrep_create_trigger_query(THD *thd, uchar** buf, size_t* buf_len) { LEX *lex= thd->lex; String stmt_query; @@ -2501,6 +2501,6 @@ int wsrep_create_trigger_query(THD *thd, uchar** buf, uint* buf_len) stmt_query.append(stmt_definition.str, stmt_definition.length); return wsrep_to_buf_helper(thd, stmt_query.c_ptr(), stmt_query.length(), - buf, buf_len); + buf, buf_len); } #endif /* WITH_WSREP */ diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index ef70b1e77d0..6c3dd171706 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -2571,7 +2571,9 @@ static bool fix_autocommit(sys_var *self, THD *thd, enum_var_type type) { thd->variables.option_bits&= ~OPTION_AUTOCOMMIT; thd->mdl_context.release_transactional_locks(); +#ifdef WITH_WSREP WSREP_DEBUG("autocommit, MDL TRX lock released: %lu", thd->thread_id); +#endif /* WITH_WSREP */ return true; } /* @@ -3654,7 +3656,9 @@ static Sys_var_tz Sys_time_zone( SESSION_VAR(time_zone), NO_CMD_LINE, DEFAULT(&default_tz), NO_MUTEX_GUARD, IN_BINLOG); #ifdef WITH_WSREP -#include "wsrep_mysqld.h" +#include "wsrep_var.h" +#include "wsrep_sst.h" +#include "wsrep_binlog.h" static Sys_var_charptr Sys_wsrep_provider( "wsrep_provider", "Path to replication provider library", @@ -3812,10 
+3816,11 @@ static Sys_var_charptr Sys_wsrep_start_position ( ON_CHECK(wsrep_start_position_check), ON_UPDATE(wsrep_start_position_update)); -static Sys_var_ulonglong Sys_wsrep_max_ws_size ( +static Sys_var_ulong Sys_wsrep_max_ws_size ( "wsrep_max_ws_size", "Max write set size (bytes)", GLOBAL_VAR(wsrep_max_ws_size), CMD_LINE(REQUIRED_ARG), - VALID_RANGE(1024, 4294967296ULL), DEFAULT(1073741824ULL), BLOCK_SIZE(1)); + /* Upper limit is 65K short of 4G to avoid overlows on 32-bit systems */ + VALID_RANGE(1024, WSREP_MAX_WS_SIZE), DEFAULT(1073741824UL), BLOCK_SIZE(1)); static Sys_var_ulong Sys_wsrep_max_ws_rows ( "wsrep_max_ws_rows", "Max number of rows in write set", @@ -3835,8 +3840,7 @@ static Sys_var_mybool Sys_wsrep_certify_nonPK( static Sys_var_mybool Sys_wsrep_causal_reads( "wsrep_causal_reads", "Enable \"strictly synchronous\" semantics for read operations", SESSION_VAR(wsrep_causal_reads), - CMD_LINE(OPT_ARG), DEFAULT(FALSE)); - // ON_UPDATE(wsrep_causal_reads_update)); + CMD_LINE(OPT_ARG), DEFAULT(FALSE)); static const char *wsrep_OSU_method_names[]= { "TOI", "RSU", NullS }; static Sys_var_enum Sys_wsrep_OSU_method( diff --git a/sql/transaction.cc b/sql/transaction.cc index c66b86ff87f..36ed15bfe9c 100644 --- a/sql/transaction.cc +++ b/sql/transaction.cc @@ -298,6 +298,7 @@ bool trans_rollback(THD *thd) DBUG_RETURN(test(res)); } + /** Implicitly rollback the current transaction, typically after deadlock was discovered. @@ -329,7 +330,6 @@ bool trans_rollback_implicit(THD *thd) #ifdef WITH_WSREP wsrep_register_hton(thd, true); #endif /* WITH_WSREP */ - thd->server_status&= ~SERVER_STATUS_IN_TRANS; DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS")); res= ha_rollback_trans(thd, true); @@ -433,7 +433,6 @@ bool trans_rollback_stmt(THD *thd) wsrep_register_hton(thd, FALSE); #endif /* WITH_WSREP */ ha_rollback_trans(thd, FALSE); - if (! 
thd->in_active_multi_stmt_transaction()) thd->tx_isolation= (enum_tx_isolation) thd->variables.tx_isolation; } diff --git a/sql/wsrep_applier.cc b/sql/wsrep_applier.cc new file mode 100644 index 00000000000..f1016dff902 --- /dev/null +++ b/sql/wsrep_applier.cc @@ -0,0 +1,351 @@ +/* Copyright (C) 2013 Codership Oy <info@codership.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along + with this program; if not, write to the Free Software Foundation, Inc., + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ + +#include "wsrep_priv.h" +#include "wsrep_binlog.h" // wsrep_dump_rbr_buf() + +#include "log_event.h" // EVENT_LEN_OFFSET, etc. + +#include "wsrep_applier.h" + +/* + read the first event from (*buf). The size of the (*buf) is (*buf_len). + At the end (*buf) is shitfed to point to the following event or NULL and + (*buf_len) will be changed to account just being read bytes of the 1st event. 
+*/ + +static Log_event* wsrep_read_log_event( + char **arg_buf, size_t *arg_buf_len, + const Format_description_log_event *description_event) +{ + DBUG_ENTER("wsrep_read_log_event"); + char *head= (*arg_buf); + + uint data_len = uint4korr(head + EVENT_LEN_OFFSET); + char *buf= (*arg_buf); + const char *error= 0; + Log_event *res= 0; + + if (data_len > wsrep_max_ws_size) + { + error = "Event too big"; + goto err; + } + + res= Log_event::read_log_event(buf, data_len, &error, description_event, + FALSE); + +err: + if (!res) + { + DBUG_ASSERT(error != 0); + sql_print_error("Error in Log_event::read_log_event(): " + "'%s', data_len: %d, event_type: %d", + error,data_len,head[EVENT_TYPE_OFFSET]); + } + (*arg_buf)+= data_len; + (*arg_buf_len)-= data_len; + DBUG_RETURN(res); +} + +#include "transaction.h" // trans_commit(), trans_rollback() +#include "rpl_rli.h" // class Relay_log_info; +#include "sql_base.h" // close_temporary_table() + +extern const Format_description_log_event *wsrep_format_desc; + +static wsrep_cb_status_t wsrep_apply_events(THD* thd, + const void* events_buf, + size_t buf_len) +{ + char *buf= (char *)events_buf; + int rcode= 0; + int event= 1; + + DBUG_ENTER("wsrep_apply_rbr"); + + if (thd->killed == KILL_CONNECTION) + { + WSREP_INFO("applier has been aborted, skipping apply_rbr: %lld", + (long long) wsrep_thd_trx_seqno(thd)); + DBUG_RETURN(WSREP_CB_FAILURE); + } + + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + thd->wsrep_query_state= QUERY_EXEC; + if (thd->wsrep_conflict_state!= REPLAYING) + thd->wsrep_conflict_state= NO_CONFLICT; + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + + if (!buf_len) WSREP_DEBUG("empty rbr buffer to apply: %lld", + (long long) wsrep_thd_trx_seqno(thd)); + + while(buf_len) + { + int exec_res; + int error = 0; + Log_event* ev= wsrep_read_log_event(&buf, &buf_len, wsrep_format_desc); + + if (!ev) + { + WSREP_ERROR("applier could not read binlog event, seqno: %lld, len: %ld", + (long long)wsrep_thd_trx_seqno(thd), buf_len); + 
rcode= 1; + goto error; + } + switch (ev->get_type_code()) { + case WRITE_ROWS_EVENT: + case UPDATE_ROWS_EVENT: + case DELETE_ROWS_EVENT: + DBUG_ASSERT(buf_len != 0 || + ((Rows_log_event*)ev)->get_flags(Rows_log_event::STMT_END_F)); + break; + default: + break; + } + + thd->server_id = ev->server_id; // use the original server id for logging + thd->set_time(); // time the query + wsrep_xid_init(&thd->transaction.xid_state.xid, + &thd->wsrep_trx_meta.gtid.uuid, + thd->wsrep_trx_meta.gtid.seqno); + thd->lex->current_select= 0; + if (!ev->when) + ev->when = time(NULL); + ev->thd = thd; + exec_res = ev->apply_event(thd->wsrep_rli); + DBUG_PRINT("info", ("exec_event result: %d", exec_res)); + + if (exec_res) + { + WSREP_WARN("RBR event %d %s apply warning: %d, %lld", + event, ev->get_type_str(), exec_res, + (long long) wsrep_thd_trx_seqno(thd)); + rcode= exec_res; + /* stop processing for the first error */ + delete ev; + goto error; + } + event++; + + if (thd->wsrep_conflict_state!= NO_CONFLICT && + thd->wsrep_conflict_state!= REPLAYING) + WSREP_WARN("conflict state after RBR event applying: %d, %lld", + thd->wsrep_query_state, (long long)wsrep_thd_trx_seqno(thd)); + + if (thd->wsrep_conflict_state == MUST_ABORT) { + WSREP_WARN("RBR event apply failed, rolling back: %lld", + (long long) wsrep_thd_trx_seqno(thd)); + trans_rollback(thd); + thd->locked_tables_list.unlock_locked_tables(thd); + /* Release transactional metadata locks. 
*/ + thd->mdl_context.release_transactional_locks(); + thd->wsrep_conflict_state= NO_CONFLICT; + DBUG_RETURN(WSREP_CB_FAILURE); + } + + if ((ev->get_type_code() == WRITE_ROWS_EVENT || + ev->get_type_code() == UPDATE_ROWS_EVENT || + ev->get_type_code() == DELETE_ROWS_EVENT) && + ((Rows_log_event *) ev)->get_flags(Rows_log_event::STMT_END_F)) + { + thd->wsrep_rli->cleanup_context(thd, 0); + + if (error == 0) + { + thd->clear_error(); + } + else + WSREP_ERROR("Error in %s event: commit of row events failed: %lld", + ev->get_type_str(), (long long)wsrep_thd_trx_seqno(thd)); + } + delete ev; + } + + error: + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + thd->wsrep_query_state= QUERY_IDLE; + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + + assert(thd->wsrep_exec_mode== REPL_RECV); + + if (thd->killed == KILL_CONNECTION) + WSREP_INFO("applier aborted: %lld", (long long)wsrep_thd_trx_seqno(thd)); + + if (rcode) DBUG_RETURN(WSREP_CB_FAILURE); + DBUG_RETURN(WSREP_CB_SUCCESS); +} + +wsrep_cb_status_t wsrep_apply_cb(void* const ctx, + const void* const buf, + size_t const buf_len, + uint32_t const flags, + const wsrep_trx_meta_t* meta) +{ + THD* const thd((THD*)ctx); + + thd->wsrep_trx_meta = *meta; + +#ifdef WSREP_PROC_INFO + snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, + "applying write set %lld: %p, %zu", + (long long)wsrep_thd_trx_seqno(thd), buf, buf_len); + thd_proc_info(thd, thd->wsrep_info); +#else + thd_proc_info(thd, "applying write set"); +#endif /* WSREP_PROC_INFO */ + + if (flags & WSREP_FLAG_ISOLATION) + { + thd->wsrep_apply_toi= true; + /* + Don't run in transaction mode with TOI actions. 
+ */ + thd->variables.option_bits&= ~OPTION_BEGIN; + thd->server_status&= ~SERVER_STATUS_IN_TRANS; + } + wsrep_cb_status_t rcode(wsrep_apply_events(thd, buf, buf_len)); + +#ifdef WSREP_PROC_INFO + snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, + "applied write set %lld", (long long)wsrep_thd_trx_seqno(thd)); + thd_proc_info(thd, thd->wsrep_info); +#else + thd_proc_info(thd, "applied write set"); +#endif /* WSREP_PROC_INFO */ + + if (WSREP_CB_SUCCESS != rcode) + { + wsrep_dump_rbr_buf(thd, buf, buf_len); + } + + TABLE *tmp; + while ((tmp = thd->temporary_tables)) + { + WSREP_DEBUG("Applier %lu, has temporary tables: %s.%s", + thd->thread_id, + (tmp->s) ? tmp->s->db.str : "void", + (tmp->s) ? tmp->s->table_name.str : "void"); + close_temporary_table(thd, tmp, 1, 1); + } + + return rcode; +} + +static wsrep_cb_status_t wsrep_commit(THD* const thd, + wsrep_seqno_t const global_seqno) +{ +#ifdef WSREP_PROC_INFO + snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, + "committing %lld", (long long)wsrep_thd_trx_seqno(thd)); + thd_proc_info(thd, thd->wsrep_info); +#else + thd_proc_info(thd, "committing"); +#endif /* WSREP_PROC_INFO */ + + wsrep_cb_status_t const rcode(trans_commit(thd) ? + WSREP_CB_FAILURE : WSREP_CB_SUCCESS); + +#ifdef WSREP_PROC_INFO + snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, + "committed %lld", (long long)wsrep_thd_trx_seqno(thd)); + thd_proc_info(thd, thd->wsrep_info); +#else + thd_proc_info(thd, "committed"); +#endif /* WSREP_PROC_INFO */ + + if (WSREP_CB_SUCCESS == rcode) + { + // TODO: mark snapshot with global_seqno. 
+ } + + return rcode; +} + +static wsrep_cb_status_t wsrep_rollback(THD* const thd, + wsrep_seqno_t const global_seqno) +{ +#ifdef WSREP_PROC_INFO + snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, + "rolling back %lld", (long long)wsrep_thd_trx_seqno(thd)); + thd_proc_info(thd, thd->wsrep_info); +#else + thd_proc_info(thd, "rolling back"); +#endif /* WSREP_PROC_INFO */ + + wsrep_cb_status_t const rcode(trans_rollback(thd) ? + WSREP_CB_FAILURE : WSREP_CB_SUCCESS); + +#ifdef WSREP_PROC_INFO + snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, + "rolled back %lld", (long long)wsrep_thd_trx_seqno(thd)); + thd_proc_info(thd, thd->wsrep_info); +#else + thd_proc_info(thd, "rolled back"); +#endif /* WSREP_PROC_INFO */ + + return rcode; +} + +wsrep_cb_status_t wsrep_commit_cb(void* const ctx, + uint32_t const flags, + const wsrep_trx_meta_t* meta, + wsrep_bool_t* const exit, + bool const commit) +{ + THD* const thd((THD*)ctx); + + assert(meta->gtid.seqno == wsrep_thd_trx_seqno(thd)); + + wsrep_cb_status_t rcode; + + if (commit) + rcode = wsrep_commit(thd, meta->gtid.seqno); + else + rcode = wsrep_rollback(thd, meta->gtid.seqno); + + thd->mdl_context.release_transactional_locks(); + free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC)); + thd->tx_isolation= (enum_tx_isolation) thd->variables.tx_isolation; + + if (wsrep_slave_count_change < 0 && commit && WSREP_CB_SUCCESS == rcode) + { + mysql_mutex_lock(&LOCK_wsrep_slave_threads); + if (wsrep_slave_count_change < 0) + { + wsrep_slave_count_change++; + *exit = true; + } + mysql_mutex_unlock(&LOCK_wsrep_slave_threads); + } + + if (*exit == false && thd->wsrep_applier) + { + /* From trans_begin() */ + thd->variables.option_bits|= OPTION_BEGIN; + thd->server_status|= SERVER_STATUS_IN_TRANS; + thd->wsrep_apply_toi= false; + } + + return rcode; +} + + +wsrep_cb_status_t wsrep_unordered_cb(void* const ctx, + const void* const data, + size_t const size) +{ + return WSREP_CB_SUCCESS; +} diff --git a/sql/wsrep_applier.h 
b/sql/wsrep_applier.h new file mode 100644 index 00000000000..816970db67c --- /dev/null +++ b/sql/wsrep_applier.h @@ -0,0 +1,38 @@ +/* Copyright 2013 Codership Oy <http://www.codership.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#ifndef WSREP_APPLIER_H +#define WSREP_APPLIER_H + +#include <sys/types.h> + +/* wsrep callback prototypes */ + +wsrep_cb_status_t wsrep_apply_cb(void *ctx, + const void* buf, size_t buf_len, + uint32_t flags, + const wsrep_trx_meta_t* meta); + +wsrep_cb_status_t wsrep_commit_cb(void *ctx, + uint32_t flags, + const wsrep_trx_meta_t* meta, + wsrep_bool_t* exit, + bool commit); + +wsrep_cb_status_t wsrep_unordered_cb(void* ctx, + const void* data, + size_t size); + +#endif /* WSREP_APPLIER_H */ diff --git a/sql/wsrep_binlog.cc b/sql/wsrep_binlog.cc new file mode 100644 index 00000000000..a0c56ce9299 --- /dev/null +++ b/sql/wsrep_binlog.cc @@ -0,0 +1,321 @@ +/* Copyright (C) 2013 Codership Oy <info@codership.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along + with this program; if not, write to the Free Software Foundation, Inc., + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ + +#include "wsrep_binlog.h" +#include "wsrep_priv.h" + +/* + Write the contents of a cache to a memory buffer. + + This function quite the same as MYSQL_BIN_LOG::write_cache(), + with the exception that here we write in buffer instead of log file. + */ +int wsrep_write_cache_buf(IO_CACHE *cache, uchar **buf, size_t *buf_len) +{ + *buf= NULL; + *buf_len= 0; + + my_off_t const saved_pos(my_b_tell(cache)); + + if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0)) + { + WSREP_ERROR("failed to initialize io-cache"); + return ER_ERROR_ON_WRITE; + } + + uint length = my_b_bytes_in_cache(cache); + if (unlikely(0 == length)) length = my_b_fill(cache); + + size_t total_length = 0; + + if (likely(length > 0)) do + { + total_length += length; + /* + Bail out if buffer grows too large. + A temporary fix to avoid allocating indefinitely large buffer, + not a real limit on a writeset size which includes other things + like header and keys. 
+ */ + if (total_length > wsrep_max_ws_size) + { + WSREP_WARN("transaction size limit (%lu) exceeded: %zu", + wsrep_max_ws_size, total_length); + goto error; + } + + uchar* tmp = (uchar *)my_realloc(*buf, total_length, MYF(0)); + if (!tmp) + { + WSREP_ERROR("could not (re)allocate buffer: %zu + %u", + *buf_len, length); + goto error; + } + *buf = tmp; + + memcpy(*buf + *buf_len, cache->read_pos, length); + *buf_len = total_length; + cache->read_pos = cache->read_end; + } while ((cache->file >= 0) && (length = my_b_fill(cache))); + + if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0)) + { + WSREP_WARN("failed to initialize io-cache"); + goto cleanup; + } + + return 0; + +error: + if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0)) + { + WSREP_ERROR("failed to initialize io-cache"); + } +cleanup: + my_free(*buf); + *buf= NULL; + *buf_len= 0; + return ER_ERROR_ON_WRITE; +} + +#define STACK_SIZE 4096 /* 4K - for buffer preallocated on the stack: + * many transactions would fit in there + * so there is no need to reach for the heap */ + +/* Returns minimum multiple of HEAP_PAGE_SIZE that is >= length */ +static inline size_t +heap_size(size_t length) +{ + return (length + HEAP_PAGE_SIZE - 1)/HEAP_PAGE_SIZE*HEAP_PAGE_SIZE; +} + +/* append data to writeset */ +static inline wsrep_status_t +wsrep_append_data(wsrep_t* const wsrep, + wsrep_ws_handle_t* const ws, + const void* const data, + size_t const len) +{ + struct wsrep_buf const buff = { data, len }; + wsrep_status_t const rc(wsrep->append_data(wsrep, ws, &buff, 1, + WSREP_DATA_ORDERED, true)); + if (rc != WSREP_OK) + { + WSREP_WARN("append_data() returned %d", rc); + } + + return rc; +} + +/* + Write the contents of a cache to wsrep provider. + + This function quite the same as MYSQL_BIN_LOG::write_cache(), + with the exception that here we write in buffer instead of log file. + + This version reads all of cache into single buffer and then appends to a + writeset at once. 
+ */ +static int wsrep_write_cache_once(wsrep_t* const wsrep, + THD* const thd, + IO_CACHE* const cache, + size_t* const len) +{ + my_off_t const saved_pos(my_b_tell(cache)); + + if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0)) + { + WSREP_ERROR("failed to initialize io-cache"); + return ER_ERROR_ON_WRITE; + } + + int err(WSREP_OK); + + size_t total_length(0); + uchar stack_buf[STACK_SIZE]; /* to avoid dynamic allocations for few data*/ + uchar* heap_buf(NULL); + uchar* buf(stack_buf); + size_t allocated(sizeof(stack_buf)); + size_t used(0); + + uint length(my_b_bytes_in_cache(cache)); + if (unlikely(0 == length)) length = my_b_fill(cache); + + if (likely(length > 0)) do + { + total_length += length; + /* + Bail out if buffer grows too large. + A temporary fix to avoid allocating indefinitely large buffer, + not a real limit on a writeset size which includes other things + like header and keys. + */ + if (unlikely(total_length > wsrep_max_ws_size)) + { + WSREP_WARN("transaction size limit (%lu) exceeded: %zu", + wsrep_max_ws_size, total_length); + goto cleanup; + } + + if (total_length > allocated) + { + size_t const new_size(heap_size(total_length)); + uchar* tmp = (uchar *)my_realloc(heap_buf, new_size, MYF(0)); + if (!tmp) + { + WSREP_ERROR("could not (re)allocate buffer: %zu + %u", + allocated, length); + err = WSREP_SIZE_EXCEEDED; + goto cleanup; + } + + heap_buf = tmp; + buf = heap_buf; + allocated = new_size; + + if (used <= STACK_SIZE && used > 0) // there's data in stack_buf + { + DBUG_ASSERT(buf == stack_buf); + memcpy(heap_buf, stack_buf, used); + } + } + + memcpy(buf + used, cache->read_pos, length); + used = total_length; + cache->read_pos = cache->read_end; + } while ((cache->file >= 0) && (length = my_b_fill(cache))); + + if (used > 0) + err = wsrep_append_data(wsrep, &thd->wsrep_ws_handle, buf, used); + + if (WSREP_OK == err) *len = total_length; + +cleanup: + if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0)) + { + WSREP_ERROR("failed to 
reinitialize io-cache"); + } + + if (unlikely(WSREP_OK != err)) wsrep_dump_rbr_buf(thd, buf, used); + + my_free(heap_buf); + return err; +} + +/* + Write the contents of a cache to wsrep provider. + + This function quite the same as MYSQL_BIN_LOG::write_cache(), + with the exception that here we write in buffer instead of log file. + + This version uses incremental data appending as it reads it from cache. + */ +static int wsrep_write_cache_inc(wsrep_t* const wsrep, + THD* const thd, + IO_CACHE* const cache, + size_t* const len) +{ + my_off_t const saved_pos(my_b_tell(cache)); + + if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0)) + { + WSREP_ERROR("failed to initialize io-cache"); + return WSREP_TRX_ROLLBACK; + } + + int err(WSREP_OK); + + size_t total_length(0); + + uint length(my_b_bytes_in_cache(cache)); + if (unlikely(0 == length)) length = my_b_fill(cache); + + if (likely(length > 0)) do + { + total_length += length; + /* bail out if buffer grows too large + not a real limit on a writeset size which includes other things + like header and keys. + */ + if (unlikely(total_length > wsrep_max_ws_size)) + { + WSREP_WARN("transaction size limit (%lu) exceeded: %zu", + wsrep_max_ws_size, total_length); + err = WSREP_SIZE_EXCEEDED; + goto cleanup; + } + + if(WSREP_OK != (err=wsrep_append_data(wsrep, &thd->wsrep_ws_handle, + cache->read_pos, length))) + goto cleanup; + + cache->read_pos = cache->read_end; + } while ((cache->file >= 0) && (length = my_b_fill(cache))); + + if (WSREP_OK == err) *len = total_length; + +cleanup: + if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0)) + { + WSREP_ERROR("failed to reinitialize io-cache"); + } + + return err; +} + +/* + Write the contents of a cache to wsrep provider. + + This function quite the same as MYSQL_BIN_LOG::write_cache(), + with the exception that here we write in buffer instead of log file. 
+ */ +int wsrep_write_cache(wsrep_t* const wsrep, + THD* const thd, + IO_CACHE* const cache, + size_t* const len) +{ + if (wsrep_incremental_data_collection) { + return wsrep_write_cache_inc(wsrep, thd, cache, len); + } + else { + return wsrep_write_cache_once(wsrep, thd, cache, len); + } +} + +void wsrep_dump_rbr_buf(THD *thd, const void* rbr_buf, size_t buf_len) +{ + char filename[PATH_MAX]= {0}; + int len= snprintf(filename, PATH_MAX, "%s/GRA_%ld_%lld.log", + wsrep_data_home_dir, thd->thread_id, + (long long)wsrep_thd_trx_seqno(thd)); + if (len >= PATH_MAX) + { + WSREP_ERROR("RBR dump path too long: %d, skipping dump.", len); + return; + } + + FILE *of= fopen(filename, "wb"); + if (of) + { + fwrite (rbr_buf, buf_len, 1, of); + fclose(of); + } + else + { + WSREP_ERROR("Failed to open file '%s': %d (%s)", + filename, errno, strerror(errno)); + } +} + diff --git a/sql/wsrep_binlog.h b/sql/wsrep_binlog.h new file mode 100644 index 00000000000..6de73b2f5ee --- /dev/null +++ b/sql/wsrep_binlog.h @@ -0,0 +1,49 @@ +/* Copyright (C) 2013 Codership Oy <info@codership.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along + with this program; if not, write to the Free Software Foundation, Inc., + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ + +#ifndef WSREP_BINLOG_H +#define WSREP_BINLOG_H + +#include "sql_class.h" // THD, IO_CACHE + +#define HEAP_PAGE_SIZE 65536 /* 64K */ +#define WSREP_MAX_WS_SIZE (0xFFFFFFFFUL - HEAP_PAGE_SIZE) + +/* + Write the contents of a cache to a memory buffer. 
+ + This function quite the same as MYSQL_BIN_LOG::write_cache(), + with the exception that here we write in buffer instead of log file. + */ +int wsrep_write_cache_buf(IO_CACHE *cache, uchar **buf, size_t *buf_len); + +/* + Write the contents of a cache to wsrep provider. + + This function quite the same as MYSQL_BIN_LOG::write_cache(), + with the exception that here we write in buffer instead of log file. + + @param len total amount of data written + @return wsrep error status + */ +int wsrep_write_cache (wsrep_t* wsrep, + THD* thd, + IO_CACHE* cache, + size_t* len); + +/* Dump replication buffer to disk */ +void wsrep_dump_rbr_buf(THD *thd, const void* rbr_buf, size_t buf_len); + +#endif /* WSREP_BINLOG_H */ diff --git a/sql/wsrep_hton.cc b/sql/wsrep_hton.cc index d4bb77c9e6f..8eb5340dd58 100644 --- a/sql/wsrep_hton.cc +++ b/sql/wsrep_hton.cc @@ -18,7 +18,7 @@ #include "rpl_filter.h" #include <sql_class.h> #include "wsrep_mysqld.h" -#include "wsrep_priv.h" +#include "wsrep_binlog.h" #include <cstdio> #include <cstdlib> @@ -26,10 +26,11 @@ extern handlerton *binlog_hton; extern int binlog_close_connection(handlerton *hton, THD *thd); extern ulonglong thd_to_trx_id(THD *thd); -extern "C" int thd_binlog_format(const MYSQL_THD thd); -// todo: share interface with ha_innodb.c +extern "C" int thd_binlog_format(const MYSQL_THD thd); +// todo: share interface with ha_innodb.c -enum wsrep_trx_status wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all); +enum wsrep_trx_status wsrep_run_wsrep_commit(THD *thd, handlerton *hton, + bool all); /* Cleanup after local transaction commit/rollback, replay or TOI. 
@@ -37,8 +38,9 @@ enum wsrep_trx_status wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool al void wsrep_cleanup_transaction(THD *thd) { if (wsrep_emulate_bin_log) thd_binlog_trx_reset(thd); - thd->wsrep_trx_handle.trx_id= WSREP_UNDEFINED_TRX_ID; - thd->wsrep_trx_seqno= WSREP_SEQNO_UNDEFINED; + thd->wsrep_ws_handle.trx_id= WSREP_UNDEFINED_TRX_ID; + thd->wsrep_trx_meta.gtid= WSREP_GTID_UNDEFINED; + thd->wsrep_trx_meta.depends_on= WSREP_SEQNO_UNDEFINED; thd->wsrep_exec_mode= LOCAL_STATE; return; } @@ -66,7 +68,7 @@ handlerton *wsrep_hton; */ void wsrep_register_hton(THD* thd, bool all) { - if (thd->wsrep_exec_mode != TOTAL_ORDER) + if (thd->wsrep_exec_mode != TOTAL_ORDER && !thd->wsrep_apply_toi) { THD_TRANS *trans=all ? &thd->transaction.all : &thd->transaction.stmt; for (Ha_trx_info *i= trans->ha_list; WSREP(thd) && i; i = i->next()) @@ -94,8 +96,8 @@ void wsrep_post_commit(THD* thd, bool all) { if (thd->wsrep_exec_mode == LOCAL_COMMIT) { - DBUG_ASSERT(thd->wsrep_trx_seqno != WSREP_SEQNO_UNDEFINED); - if (wsrep->post_commit(wsrep, &thd->wsrep_trx_handle)) + DBUG_ASSERT(thd->wsrep_trx_meta.gtid.seqno != WSREP_SEQNO_UNDEFINED); + if (wsrep->post_commit(wsrep, &thd->wsrep_ws_handle)) { DBUG_PRINT("wsrep", ("set committed fail")); WSREP_WARN("set committed fail: %llu %d", @@ -106,7 +108,7 @@ void wsrep_post_commit(THD* thd, bool all) } /* - wsrep exploits binlog's caches even if binlogging itself is not + wsrep exploits binlog's caches even if binlogging itself is not activated. In such case connection close needs calling actual binlog's method. 
Todo: split binlog hton from its caches to use ones by wsrep @@ -125,7 +127,7 @@ wsrep_close_connection(handlerton* hton, THD* thd) if (wsrep_emulate_bin_log && thd_get_ha_data(thd, binlog_hton) != NULL) binlog_hton->close_connection (binlog_hton, thd); DBUG_RETURN(0); -} +} /* prepare/wsrep_run_wsrep_commit can fail in two ways @@ -147,18 +149,15 @@ static int wsrep_prepare(handlerton *hton, THD *thd, bool all) DBUG_ASSERT(thd->ha_data[wsrep_hton->slot].ha_info[all].is_trx_read_write()); DBUG_ASSERT(thd->wsrep_exec_mode == LOCAL_STATE); - DBUG_ASSERT(thd->wsrep_trx_seqno == WSREP_SEQNO_UNDEFINED); + DBUG_ASSERT(thd->wsrep_trx_meta.gtid.seqno == WSREP_SEQNO_UNDEFINED); - if ((all || + if ((all || !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) && (thd->variables.wsrep_on && !wsrep_trans_cache_is_empty(thd))) { switch (wsrep_run_wsrep_commit(thd, hton, all)) { case WSREP_TRX_OK: - // DBUG_ASSERT(thd->wsrep_trx_seqno > old || - // thd->wsrep_exec_mode == REPL_RECV || - // thd->wsrep_exec_mode == TOTAL_ORDER); break; case WSREP_TRX_ROLLBACK: case WSREP_TRX_ERROR: @@ -208,10 +207,10 @@ static int wsrep_rollback(handlerton *hton, THD *thd, bool all) if ((all || !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) && (thd->variables.wsrep_on && thd->wsrep_conflict_state != MUST_REPLAY)) { - if (wsrep->post_rollback(wsrep, &thd->wsrep_trx_handle)) + if (wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle)) { DBUG_PRINT("wsrep", ("setting rollback fail")); - WSREP_ERROR("settting rollback fail: thd: %llu SQL: %s", + WSREP_ERROR("settting rollback fail: thd: %llu SQL: %s", (long long)thd->real_id, thd->query()); } wsrep_cleanup_transaction(thd); @@ -249,12 +248,12 @@ int wsrep_commit(handlerton *hton, THD *thd, bool all) possible changes to clean state. 
*/ if (WSREP_PROVIDER_EXISTS) { - if (wsrep->post_rollback(wsrep, &thd->wsrep_trx_handle)) - { - DBUG_PRINT("wsrep", ("setting rollback fail")); - WSREP_ERROR("settting rollback fail: thd: %llu SQL: %s", - (long long)thd->real_id, thd->query()); - } + if (wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle)) + { + DBUG_PRINT("wsrep", ("setting rollback fail")); + WSREP_ERROR("settting rollback fail: thd: %llu SQL: %s", + (long long)thd->real_id, thd->query()); + } } wsrep_cleanup_transaction(thd); } @@ -266,26 +265,24 @@ int wsrep_commit(handlerton *hton, THD *thd, bool all) extern Rpl_filter* binlog_filter; extern my_bool opt_log_slave_updates; -extern void wsrep_write_rbr_buf(THD *thd, const void* rbr_buf, size_t buf_len); + enum wsrep_trx_status -wsrep_run_wsrep_commit( - THD *thd, handlerton *hton, bool all) +wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all) { - int rcode = -1; - uint data_len = 0; - uchar *rbr_data = NULL; + int rcode= -1; + size_t data_len= 0; IO_CACHE *cache; int replay_round= 0; if (thd->stmt_da->is_error()) { - WSREP_ERROR("commit issue, error: %d %s", + WSREP_ERROR("commit issue, error: %d %s", thd->stmt_da->sql_errno(), thd->stmt_da->message()); } DBUG_ENTER("wsrep_run_wsrep_commit"); - if (thd->slave_thread && !opt_log_slave_updates) { - DBUG_RETURN(WSREP_TRX_OK); - } + + if (thd->slave_thread && !opt_log_slave_updates) DBUG_RETURN(WSREP_TRX_OK); + if (thd->wsrep_exec_mode == REPL_RECV) { mysql_mutex_lock(&thd->LOCK_wsrep_thd); @@ -303,9 +300,9 @@ wsrep_run_wsrep_commit( } mysql_mutex_unlock(&thd->LOCK_wsrep_thd); } - if (thd->wsrep_exec_mode != LOCAL_STATE) { - DBUG_RETURN(WSREP_TRX_OK); - } + + if (thd->wsrep_exec_mode != LOCAL_STATE) DBUG_RETURN(WSREP_TRX_OK); + if (thd->wsrep_consistency_check == CONSISTENCY_CHECK_RUNNING) { WSREP_DEBUG("commit for consistency check: %s", thd->query()); DBUG_RETURN(WSREP_TRX_OK); @@ -327,10 +324,10 @@ wsrep_run_wsrep_commit( mysql_mutex_lock(&LOCK_wsrep_replaying); - while (wsrep_replaying 
> 0 && + while (wsrep_replaying > 0 && thd->wsrep_conflict_state == NO_CONFLICT && thd->killed == NOT_KILLED && - !shutdown_in_progress) + !shutdown_in_progress) { mysql_mutex_unlock(&LOCK_wsrep_replaying); @@ -348,9 +345,12 @@ wsrep_run_wsrep_commit( struct timespec wtime = {0, 1000000}; mysql_cond_timedwait(&COND_wsrep_replaying, &LOCK_wsrep_replaying, &wtime); + if (replay_round++ % 100000 == 0) - WSREP_DEBUG("commit waiting for replaying: replayers %d, thd: (%lu) conflict: %d (round: %d)", - wsrep_replaying, thd->thread_id, thd->wsrep_conflict_state, replay_round); + WSREP_DEBUG("commit waiting for replaying: replayers %d, thd: (%lu) " + "conflict: %d (round: %d)", + wsrep_replaying, thd->thread_id, + thd->wsrep_conflict_state, replay_round); mysql_mutex_unlock(&LOCK_wsrep_replaying); @@ -371,7 +371,8 @@ wsrep_run_wsrep_commit( WSREP_DEBUG("innobase_commit abort after replaying wait %s", (thd->query()) ? thd->query() : "void"); DBUG_RETURN(WSREP_TRX_ROLLBACK); - } + } + thd->wsrep_query_state = QUERY_COMMITTING; mysql_mutex_unlock(&thd->LOCK_wsrep_thd); @@ -379,28 +380,28 @@ wsrep_run_wsrep_commit( rcode = 0; if (cache) { thd->binlog_flush_pending_rows_event(true); - rcode = wsrep_write_cache(cache, &rbr_data, &data_len); - if (rcode) { - WSREP_ERROR("rbr write fail, data_len: %d, %d", data_len, rcode); - if (data_len) my_free(rbr_data); + rcode = wsrep_write_cache(wsrep, thd, cache, &data_len); + if (WSREP_OK != rcode) { + WSREP_ERROR("rbr write fail, data_len: %zu, %d", data_len, rcode); DBUG_RETURN(WSREP_TRX_ROLLBACK); } } - if (data_len == 0) + + if (data_len == 0) { - if (thd->stmt_da->is_ok() && + if (thd->stmt_da->is_ok() && thd->stmt_da->affected_rows() > 0 && !binlog_filter->is_on()) { WSREP_DEBUG("empty rbr buffer, query: %s, " - "affected rows: %llu, " - "changed tables: %d, " + "affected rows: %llu, " + "changed tables: %d, " "sql_log_bin: %d, " - "wsrep status (%d %d %d)", + "wsrep status (%d %d %d)", thd->query(), thd->stmt_da->affected_rows(), 
stmt_has_updated_trans_table(thd), thd->variables.sql_log_bin, - thd->wsrep_exec_mode, thd->wsrep_query_state, - thd->wsrep_conflict_state); + thd->wsrep_exec_mode, thd->wsrep_query_state, + thd->wsrep_conflict_state); } else { @@ -409,38 +410,33 @@ wsrep_run_wsrep_commit( thd->wsrep_query_state= QUERY_EXEC; DBUG_RETURN(WSREP_TRX_OK); } - if (WSREP_UNDEFINED_TRX_ID == thd->wsrep_trx_handle.trx_id) + + if (WSREP_UNDEFINED_TRX_ID == thd->wsrep_ws_handle.trx_id) { - WSREP_WARN("SQL statement was ineffective, THD: %lu, buf: %d\n" - "QUERY: %s\n" - " => Skipping replication", - thd->thread_id, data_len, thd->query()); - if (wsrep_debug) - { - wsrep_write_rbr_buf(thd, rbr_data, data_len); - } + WSREP_WARN("SQL statement was ineffective, THD: %lu, buf: %zu\n" + "QUERY: %s\n" + " => Skipping replication", + thd->thread_id, data_len, thd->query()); rcode = WSREP_TRX_FAIL; } else if (!rcode) { - rcode = wsrep->pre_commit( - wsrep, - (wsrep_conn_id_t)thd->thread_id, - &thd->wsrep_trx_handle, - rbr_data, - data_len, - (thd->wsrep_PA_safe) ? WSREP_FLAG_PA_SAFE : 0ULL, - &thd->wsrep_trx_seqno); - switch (rcode) { - case WSREP_TRX_MISSING: + if (WSREP_OK == rcode) + rcode = wsrep->pre_commit(wsrep, + (wsrep_conn_id_t)thd->thread_id, + &thd->wsrep_ws_handle, + WSREP_FLAG_COMMIT | + ((thd->wsrep_PA_safe) ? 
+ 0ULL : WSREP_FLAG_PA_UNSAFE), + &thd->wsrep_trx_meta); + + if (rcode == WSREP_TRX_MISSING) { WSREP_WARN("Transaction missing in provider, thd: %ld, SQL: %s", thd->thread_id, thd->query()); - wsrep_write_rbr_buf(thd, rbr_data, data_len); rcode = WSREP_TRX_FAIL; - break; - case WSREP_BF_ABORT: + } else if (rcode == WSREP_BF_ABORT) { WSREP_DEBUG("thd %lu seqno %lld BF aborted by provider, will replay", - thd->thread_id, (long long)thd->wsrep_trx_seqno); + thd->thread_id, (long long)thd->wsrep_trx_meta.gtid.seqno); mysql_mutex_lock(&thd->LOCK_wsrep_thd); thd->wsrep_conflict_state = MUST_REPLAY; mysql_mutex_unlock(&thd->LOCK_wsrep_thd); @@ -449,22 +445,14 @@ wsrep_run_wsrep_commit( WSREP_DEBUG("replaying increased: %d, thd: %lu", wsrep_replaying, thd->thread_id); mysql_mutex_unlock(&LOCK_wsrep_replaying); - break; - default: - break; } } else { WSREP_ERROR("I/O error reading from thd's binlog iocache: " "errno=%d, io cache code=%d", my_errno, cache->error); - if (data_len) my_free(rbr_data); DBUG_ASSERT(0); // failure like this can not normally happen DBUG_RETURN(WSREP_TRX_ERROR); } - if (data_len) { - my_free(rbr_data); - } - mysql_mutex_lock(&thd->LOCK_wsrep_thd); switch(rcode) { case 0: @@ -481,22 +469,22 @@ wsrep_run_wsrep_commit( { WSREP_WARN("thd %lu seqno %lld: conflict state %d after post commit", thd->thread_id, - (long long)thd->wsrep_trx_seqno, + (long long)thd->wsrep_trx_meta.gtid.seqno, thd->wsrep_conflict_state); } thd->wsrep_exec_mode= LOCAL_COMMIT; - DBUG_ASSERT(thd->wsrep_trx_seqno != WSREP_SEQNO_UNDEFINED); + DBUG_ASSERT(thd->wsrep_trx_meta.gtid.seqno != WSREP_SEQNO_UNDEFINED); /* Override XID iff it was generated by mysql */ if (thd->transaction.xid_state.xid.get_my_xid()) { wsrep_xid_init(&thd->transaction.xid_state.xid, - wsrep_cluster_uuid(), - thd->wsrep_trx_seqno); + &thd->wsrep_trx_meta.gtid.uuid, + thd->wsrep_trx_meta.gtid.seqno); } DBUG_PRINT("wsrep", ("replicating commit success")); break; case WSREP_BF_ABORT: - 
DBUG_ASSERT(thd->wsrep_trx_seqno != WSREP_SEQNO_UNDEFINED); + DBUG_ASSERT(thd->wsrep_trx_meta.gtid.seqno != WSREP_SEQNO_UNDEFINED); case WSREP_TRX_FAIL: WSREP_DEBUG("commit failed for reason: %d", rcode); DBUG_PRINT("wsrep", ("replicating commit fail")); @@ -505,7 +493,7 @@ wsrep_run_wsrep_commit( if (thd->wsrep_conflict_state == MUST_ABORT) { thd->wsrep_conflict_state= ABORTED; - } + } else { WSREP_DEBUG("conflict state: %d", thd->wsrep_conflict_state); @@ -563,14 +551,15 @@ mysql_declare_plugin(wsrep) &wsrep_storage_engine, "wsrep", "Codership Oy", - "A pseudo storage engine to represent transactions in multi-master synchornous replication", + "A pseudo storage engine to represent transactions in multi-master " + "synchornous replication", PLUGIN_LICENSE_GPL, wsrep_hton_init, /* Plugin Init */ NULL, /* Plugin Deinit */ 0x0100 /* 1.0 */, NULL, /* status variables */ NULL, /* system variables */ - NULL, /* config options */ + NULL, /* config options */ 0, /* flags */ } mysql_declare_plugin_end; diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc index 2181054a34c..666952e6f52 100644 --- a/sql/wsrep_mysqld.cc +++ b/sql/wsrep_mysqld.cc @@ -1,4 +1,4 @@ -/* Copyright 2008 Codership Oy <http://www.codership.com> +/* Copyright 2008-2013 Codership Oy <http://www.codership.com> This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -17,11 +17,17 @@ #include <sql_class.h> #include <sql_parse.h> #include "wsrep_priv.h" +#include "wsrep_thd.h" +#include "wsrep_sst.h" +#include "wsrep_utils.h" +#include "wsrep_var.h" +#include "wsrep_binlog.h" +#include "wsrep_applier.h" #include <cstdio> #include <cstdlib> #include "log_event.h" -extern Format_description_log_event *wsrep_format_desc; +Format_description_log_event *wsrep_format_desc = NULL; wsrep_t *wsrep = NULL; my_bool wsrep_emulate_bin_log = FALSE; // activating parts of binlog interface @@ -33,24 +39,26 @@ const char* 
wsrep_data_home_dir = NULL; const char* wsrep_dbug_option = ""; long wsrep_slave_threads = 1; // # of slave action appliers wanted +int wsrep_slave_count_change = 0; // # of appliers to stop or start my_bool wsrep_debug = 0; // enable debug level logging my_bool wsrep_convert_LOCK_to_trx = 1; // convert locking sessions to trx ulong wsrep_retry_autocommit = 5; // retry aborted autocommit trx my_bool wsrep_auto_increment_control = 1; // control auto increment variables my_bool wsrep_drupal_282555_workaround = 1; // retry autoinc insert after dupkey my_bool wsrep_incremental_data_collection = 0; // incremental data collection -long long wsrep_max_ws_size = 1073741824LL; //max ws (RBR buffer) size -long wsrep_max_ws_rows = 65536; // max number of rows in ws +ulong wsrep_max_ws_size = 1073741824UL;//max ws (RBR buffer) size +ulong wsrep_max_ws_rows = 65536; // max number of rows in ws int wsrep_to_isolation = 0; // # of active TO isolation threads my_bool wsrep_certify_nonPK = 1; // certify, even when no primary key long wsrep_max_protocol_version = 2; // maximum protocol version to use ulong wsrep_forced_binlog_format = BINLOG_FORMAT_UNSPEC; my_bool wsrep_recovery = 0; // recovery my_bool wsrep_replicate_myisam = 0; // enable myisam replication -my_bool wsrep_log_conflicts = 0; // +my_bool wsrep_log_conflicts = 0; ulong wsrep_mysql_replication_bundle = 0; +my_bool wsrep_desync = 0; // desynchronize the node from the + // cluster my_bool wsrep_load_data_splitting = 1; // commit load data every 10K intervals -my_bool wsrep_desync = 0; // desynchronize the node from the cluster /* * End configuration options @@ -88,12 +96,12 @@ long long wsrep_cluster_conf_id = WSREP_SEQNO_UNDEFINED; const char* wsrep_cluster_status = cluster_status_str[WSREP_VIEW_DISCONNECTED]; long wsrep_cluster_size = 0; long wsrep_local_index = -1; +long long wsrep_local_bf_aborts = 0; const char* wsrep_provider_name = provider_name; const char* wsrep_provider_version = provider_version; const char* 
wsrep_provider_vendor = provider_vendor; /* End wsrep status variables */ - wsrep_uuid_t local_uuid = WSREP_UUID_UNDEFINED; wsrep_seqno_t local_seqno = WSREP_SEQNO_UNDEFINED; wsp::node_status local_status; @@ -104,14 +112,6 @@ long wsrep_protocol_version = 2; // if there was no state gap on receiving first view event. static my_bool wsrep_startup = TRUE; -// action execute callback -extern wsrep_status_t wsrep_apply_cb(void *ctx, - const void* buf, size_t buf_len, - wsrep_seqno_t global_seqno); - -extern wsrep_status_t wsrep_commit_cb (void *ctx, - wsrep_seqno_t global_seqno, - bool commit); static void wsrep_log_cb(wsrep_log_level_t level, const char *msg) { switch (level) { @@ -195,19 +195,25 @@ void wsrep_get_SE_checkpoint(XID* xid) plugin_foreach(NULL, get_SE_checkpoint, MYSQL_STORAGE_ENGINE_PLUGIN, xid); } -static void wsrep_view_handler_cb (void* app_ctx, - void* recv_ctx, - const wsrep_view_info_t* view, - const char* state, - size_t state_len, - void** sst_req, - ssize_t* sst_req_len) +static wsrep_cb_status_t +wsrep_view_handler_cb (void* app_ctx, + void* recv_ctx, + const wsrep_view_info_t* view, + const char* state, + size_t state_len, + void** sst_req, + size_t* sst_req_len) { + *sst_req = NULL; + *sst_req_len = 0; + wsrep_member_status_t new_status= local_status.get(); - if (memcmp(&cluster_uuid, &view->uuid, sizeof(wsrep_uuid_t))) + if (memcmp(&cluster_uuid, &view->state_id.uuid, sizeof(wsrep_uuid_t))) { - memcpy((wsrep_uuid_t*)&cluster_uuid, &view->uuid, sizeof(cluster_uuid)); + memcpy((wsrep_uuid_t*)&cluster_uuid, &view->state_id.uuid, + sizeof(cluster_uuid)); + wsrep_uuid_print (&cluster_uuid, cluster_uuid_str, sizeof(cluster_uuid_str)); } @@ -219,7 +225,7 @@ static void wsrep_view_handler_cb (void* app_ctx, WSREP_INFO("New cluster view: global state: %s:%lld, view# %lld: %s, " "number of nodes: %ld, my index: %ld, protocol version %d", - wsrep_cluster_state_uuid, (long long)view->seqno, + wsrep_cluster_state_uuid, (long long)view->state_id.seqno, 
(long long)wsrep_cluster_conf_id, wsrep_cluster_status, wsrep_cluster_size, wsrep_local_index, view->proto_ver); @@ -274,16 +280,18 @@ static void wsrep_view_handler_cb (void* app_ctx, WSREP_DEBUG("[debug]: closing client connections for PRIM"); wsrep_close_client_connections(TRUE); - *sst_req_len= wsrep_sst_prepare (sst_req); + ssize_t const req_len= wsrep_sst_prepare (sst_req); - if (*sst_req_len < 0) + if (req_len < 0) { - int err = *sst_req_len; - WSREP_ERROR("SST preparation failed: %d (%s)", -err, strerror(-err)); + WSREP_ERROR("SST preparation failed: %zd (%s)", -req_len, + strerror(-req_len)); new_status= WSREP_MEMBER_UNDEFINED; } else { + assert(sst_req != NULL); + *sst_req_len= req_len; new_status= WSREP_MEMBER_JOINER; } } @@ -299,14 +307,14 @@ static void wsrep_view_handler_cb (void* app_ctx, { wsrep_SE_init_grab(); // Signal mysqld init thread to continue - wsrep_sst_complete (&cluster_uuid, view->seqno, false); + wsrep_sst_complete (&cluster_uuid, view->state_id.seqno, false); // and wait for SE initialization wsrep_SE_init_wait(); } else { local_uuid= cluster_uuid; - local_seqno= view->seqno; + local_seqno= view->state_id.seqno; } /* Init storage engine XIDs from first view */ XID xid; @@ -319,7 +327,7 @@ static void wsrep_view_handler_cb (void* app_ctx, if (memcmp (&local_uuid, &cluster_uuid, sizeof (wsrep_uuid_t))) { WSREP_ERROR("Undetected state gap. 
Can't continue."); - wsrep_log_states(WSREP_LOG_FATAL, &cluster_uuid, view->seqno, + wsrep_log_states(WSREP_LOG_FATAL, &cluster_uuid, view->state_id.seqno, &local_uuid, -1); unireg_abort(1); } @@ -331,9 +339,23 @@ static void wsrep_view_handler_cb (void* app_ctx, global_system_variables.auto_increment_increment= view->memb_num; } + { /* capabilities may be updated on new configuration */ + uint64_t const caps(wsrep->capabilities (wsrep)); + + my_bool const idc((caps & WSREP_CAP_INCREMENTAL_WRITESET) != 0); + if (TRUE == wsrep_incremental_data_collection && FALSE == idc) + { + WSREP_WARN("Unsupported protocol downgrade: " + "incremental data collection disabled. Expect abort."); + } + wsrep_incremental_data_collection = idc; + } + out: wsrep_startup= FALSE; local_status.set(new_status, view); + + return WSREP_CB_SUCCESS; } void wsrep_ready_set (my_bool x) @@ -364,14 +386,26 @@ void wsrep_ready_wait () static void wsrep_synced_cb(void* app_ctx) { WSREP_INFO("Synchronized with group, ready for connections"); + bool signal_main= false; if (mysql_mutex_lock (&LOCK_wsrep_ready)) abort(); if (!wsrep_ready) { wsrep_ready= TRUE; mysql_cond_signal (&COND_wsrep_ready); + signal_main= true; + } local_status.set(WSREP_MEMBER_SYNCED); mysql_mutex_unlock (&LOCK_wsrep_ready); + + if (signal_main) + { + wsrep_SE_init_grab(); + // Signal mysqld init thread to continue + wsrep_sst_complete (&local_uuid, local_seqno, false); + // and wait for SE initialization + wsrep_SE_init_wait(); + } } static void wsrep_init_position() @@ -416,6 +450,8 @@ static void wsrep_init_position() } } +extern const char* my_bind_addr_str; + int wsrep_init() { int rcode= -1; @@ -470,7 +506,7 @@ int wsrep_init() size_t const node_addr_max= sizeof(node_addr) - 1; if (!wsrep_node_address || !strcmp(wsrep_node_address, "")) { - size_t const ret= guess_ip(node_addr, node_addr_max); + size_t const ret= wsrep_guess_ip(node_addr, node_addr_max); if (!(ret > 0 && ret < node_addr_max)) { WSREP_WARN("Failed to guess 
base node address. Set it explicitly via " @@ -488,38 +524,57 @@ int wsrep_init() if ((!wsrep_node_incoming_address || !strcmp (wsrep_node_incoming_address, WSREP_NODE_INCOMING_AUTO))) { - size_t const node_addr_len= strlen(node_addr); - if (node_addr_len > 0) + unsigned int my_bind_ip= INADDR_ANY; // default if not set + if (my_bind_addr_str && strlen(my_bind_addr_str)) + { + my_bind_ip= wsrep_check_ip(my_bind_addr_str); + } + + if (INADDR_ANY != my_bind_ip) { - const char* const colon= strrchr(node_addr, ':'); - if (strchr(node_addr, ':') == colon) // 1 or 0 ':' + if (INADDR_NONE != my_bind_ip && INADDR_LOOPBACK != my_bind_ip) { - size_t const ip_len= colon ? colon - node_addr : node_addr_len; - if (ip_len + 7 /* :55555\0 */ < inc_addr_max) + snprintf(inc_addr, inc_addr_max, "%s:%u", + my_bind_addr_str, (int)mysqld_port); + } // else leave inc_addr an empty string - mysqld is not listening for + // client connections on network interfaces. + } + else // mysqld binds to 0.0.0.0, take IP from wsrep_node_address if possible + { + size_t const node_addr_len= strlen(node_addr); + if (node_addr_len > 0) + { + const char* const colon= strrchr(node_addr, ':'); + if (strchr(node_addr, ':') == colon) // 1 or 0 ':' { - memcpy (inc_addr, node_addr, ip_len); - snprintf(inc_addr + ip_len, inc_addr_max - ip_len, ":%u",mysqld_port); + size_t const ip_len= colon ? 
colon - node_addr : node_addr_len; + if (ip_len + 7 /* :55555\0 */ < inc_addr_max) + { + memcpy (inc_addr, node_addr, ip_len); + snprintf(inc_addr + ip_len, inc_addr_max - ip_len, ":%u", + (int)mysqld_port); + } + else + { + WSREP_WARN("Guessing address for incoming client connections: " + "address too long."); + inc_addr[0]= '\0'; + } } else { WSREP_WARN("Guessing address for incoming client connections: " - "address too long."); + "too many colons :) ."); inc_addr[0]= '\0'; } } - else + + if (!strlen(inc_addr)) { - WSREP_WARN("Guessing address for incoming client connections: " - "too many colons :) ."); - inc_addr[0]= '\0'; + WSREP_WARN("Guessing address for incoming client connections failed. " + "Try setting wsrep_node_incoming_address explicitly."); } } - - if (!strlen(inc_addr)) - { - WSREP_WARN("Guessing address for incoming client connections failed. " - "Try setting wsrep_node_incoming_address explicitly."); - } } else if (!strchr(wsrep_node_incoming_address, ':')) // no port included { @@ -546,6 +601,8 @@ int wsrep_init() struct wsrep_init_args wsrep_args; + struct wsrep_gtid const state_id = { local_uuid, local_seqno }; + wsrep_args.data_dir = wsrep_data_home_dir; wsrep_args.node_name = (wsrep_node_name) ? 
wsrep_node_name : ""; wsrep_args.node_address = node_addr; @@ -554,13 +611,13 @@ int wsrep_init() wsrep_provider_options : ""; wsrep_args.proto_ver = wsrep_max_protocol_version; - wsrep_args.state_uuid = &local_uuid; - wsrep_args.state_seqno = local_seqno; + wsrep_args.state_id = &state_id; wsrep_args.logger_cb = wsrep_log_cb; wsrep_args.view_handler_cb = wsrep_view_handler_cb; wsrep_args.apply_cb = wsrep_apply_cb; wsrep_args.commit_cb = wsrep_commit_cb; + wsrep_args.unordered_cb = wsrep_unordered_cb; wsrep_args.sst_donate_cb = wsrep_sst_donate_cb; wsrep_args.synced_cb = wsrep_synced_cb; @@ -661,8 +718,36 @@ void wsrep_stop_replication(THD *thd) return; } - -extern my_bool wsrep_new_cluster; +/* This one is set to true when --wsrep-new-cluster is found in the command + * line arguments */ +static my_bool wsrep_new_cluster= FALSE; +#define WSREP_NEW_CLUSTER "--wsrep-new-cluster" +/* Finds and hides --wsrep-new-cluster from the arguments list + * by moving it to the end of the list and decrementing argument count */ +void wsrep_filter_new_cluster (int* argc, char* argv[]) +{ + int i; + for (i= *argc - 1; i > 0; i--) + { + /* make a copy of the argument to convert possible underscores to hyphens. 
+ * the copy need not to be longer than WSREP_NEW_CLUSTER option */ + char arg[sizeof(WSREP_NEW_CLUSTER) + 2]= { 0, }; + strncpy(arg, argv[i], sizeof(arg) - 1); + char* underscore; + while (NULL != (underscore= strchr(arg, '_'))) *underscore= '-'; + + if (!strcmp(arg, WSREP_NEW_CLUSTER)) + { + wsrep_new_cluster= TRUE; + *argc -= 1; + /* preserve the order of remaining arguments AND + * preserve the original argument pointers - just in case */ + char* wnc= argv[i]; + memmove(&argv[i], &argv[i + 1], (*argc - i)*sizeof(argv[i])); + argv[*argc]= wnc; /* this will be invisible to the rest of the program */ + } + } +} bool wsrep_start_replication() { @@ -686,20 +771,16 @@ bool wsrep_start_replication() return true; } - /* Note 'bootstrap' address is not officially supported in wsrep API #23 - but it can be back ported from #24 provider to get sneak preview of - bootstrap command - */ - const char* cluster_address = - wsrep_new_cluster ? "bootstrap" : wsrep_cluster_address; + bool const bootstrap(TRUE == wsrep_new_cluster); wsrep_new_cluster= FALSE; WSREP_INFO("Start replication"); if ((rcode = wsrep->connect(wsrep, wsrep_cluster_name, - cluster_address, - wsrep_sst_donor))) + wsrep_cluster_address, + wsrep_sst_donor, + bootstrap))) { if (-ESOCKTNOSUPPORT == rcode) { @@ -720,11 +801,6 @@ bool wsrep_start_replication() { wsrep_connected= TRUE; - uint64_t caps = wsrep->capabilities (wsrep); - - wsrep_incremental_data_collection = - !!(caps & WSREP_CAP_WRITE_SET_INCREMENTS); - char* opts= wsrep->options_get(wsrep); if (opts) { @@ -749,8 +825,8 @@ wsrep_causal_wait (THD* thd) { // This allows autocommit SELECTs and a first SELECT after SET AUTOCOMMIT=0 // TODO: modify to check if thd has locked any rows. 
- wsrep_seqno_t seqno; - wsrep_status_t ret= wsrep->causal_read (wsrep, &seqno); + wsrep_gtid_t gtid; + wsrep_status_t ret= wsrep->causal_read (wsrep, >id); if (unlikely(WSREP_OK != ret)) { @@ -798,7 +874,7 @@ static void wsrep_keys_free(wsrep_key_arr_t* key_arr) { for (size_t i= 0; i < key_arr->keys_len; ++i) { - my_free((wsrep_key_part_t*)key_arr->keys[i].key_parts); + my_free((void*)key_arr->keys[i].key_parts); } my_free(key_arr->keys); key_arr->keys= 0; @@ -818,7 +894,7 @@ static void wsrep_keys_free(wsrep_key_arr_t* key_arr) static bool wsrep_prepare_key_for_isolation(const char* db, const char* table, - wsrep_key_part_t* key, + wsrep_buf_t* key, size_t* key_len) { if (*key_len < 2) return false; @@ -837,13 +913,13 @@ static bool wsrep_prepare_key_for_isolation(const char* db, // sql_print_information("%s.%s", db, table); if (db) { - key[*key_len].buf= db; - key[*key_len].buf_len= strlen(db); + key[*key_len].ptr= db; + key[*key_len].len= strlen(db); ++(*key_len); if (table) { - key[*key_len].buf= table; - key[*key_len].buf_len= strlen(table); + key[*key_len].ptr= table; + key[*key_len].len= strlen(table); ++(*key_len); } } @@ -879,23 +955,23 @@ static bool wsrep_prepare_keys_for_isolation(THD* thd, { if (!(ka->keys= (wsrep_key_t*)my_malloc(sizeof(wsrep_key_t), MYF(0)))) { - sql_print_error("Can't allocate memory for key_array"); + WSREP_ERROR("Can't allocate memory for key_array"); goto err; } ka->keys_len= 1; - if (!(ka->keys[0].key_parts= (wsrep_key_part_t*) - my_malloc(sizeof(wsrep_key_part_t)*2, MYF(0)))) + if (!(ka->keys[0].key_parts= (wsrep_buf_t*) + my_malloc(sizeof(wsrep_buf_t)*2, MYF(0)))) { - sql_print_error("Can't allocate memory for key_parts"); + WSREP_ERROR("Can't allocate memory for key_parts"); goto err; } - ka->keys[0].key_parts_len= 2; + ka->keys[0].key_parts_num= 2; if (!wsrep_prepare_key_for_isolation( db, table, - (wsrep_key_part_t*)ka->keys[0].key_parts, - &ka->keys[0].key_parts_len)) + (wsrep_buf_t*)ka->keys[0].key_parts, + 
&ka->keys[0].key_parts_num)) { - sql_print_error("Preparing keys for isolation failed"); + WSREP_ERROR("Preparing keys for isolation failed"); goto err; } } @@ -910,24 +986,24 @@ static bool wsrep_prepare_keys_for_isolation(THD* thd, ka->keys, (ka->keys_len + 1) * sizeof(wsrep_key_t), MYF(0)); if (!tmp) { - sql_print_error("Can't allocate memory for key_array"); + WSREP_ERROR("Can't allocate memory for key_array"); goto err; } ka->keys= tmp; - if (!(ka->keys[ka->keys_len].key_parts= (wsrep_key_part_t*) - my_malloc(sizeof(wsrep_key_part_t)*2, MYF(0)))) + if (!(ka->keys[ka->keys_len].key_parts= (wsrep_buf_t*) + my_malloc(sizeof(wsrep_buf_t)*2, MYF(0)))) { - sql_print_error("Can't allocate memory for key_parts"); + WSREP_ERROR("Can't allocate memory for key_parts"); goto err; } - ka->keys[ka->keys_len].key_parts_len= 2; + ka->keys[ka->keys_len].key_parts_num= 2; ++ka->keys_len; if (!wsrep_prepare_key_for_isolation( table->db, table->table_name, - (wsrep_key_part_t*)ka->keys[ka->keys_len - 1].key_parts, - &ka->keys[ka->keys_len - 1].key_parts_len)) + (wsrep_buf_t*)ka->keys[ka->keys_len - 1].key_parts, + &ka->keys[ka->keys_len - 1].key_parts_num)) { - sql_print_error("Preparing keys for isolation failed"); + WSREP_ERROR("Preparing keys for isolation failed"); goto err; } } @@ -939,12 +1015,11 @@ err: } - bool wsrep_prepare_key_for_innodb(const uchar* cache_key, - size_t cache_key_len, + size_t cache_key_len, const uchar* row_id, size_t row_id_len, - wsrep_key_part_t* key, + wsrep_buf_t* key, size_t* key_len) { if (*key_len < 3) return false; @@ -954,33 +1029,36 @@ bool wsrep_prepare_key_for_innodb(const uchar* cache_key, { case 0: { - key[*key_len].buf = cache_key; - key[*key_len].buf_len = cache_key_len; - ++(*key_len); + key[0].ptr = cache_key; + key[0].len = cache_key_len; + + *key_len = 1; break; } case 1: case 2: { - key[*key_len].buf = cache_key; - key[*key_len].buf_len = strlen( (char*)cache_key ); - ++(*key_len); - key[*key_len].buf = cache_key + strlen( 
(char*)cache_key ) + 1; - key[*key_len].buf_len = strlen( (char*)(key[*key_len].buf) ); - ++(*key_len); + key[0].ptr = cache_key; + key[0].len = strlen( (char*)cache_key ); + + key[1].ptr = cache_key + strlen( (char*)cache_key ) + 1; + key[1].len = strlen( (char*)(key[1].ptr) ); + + *key_len = 2; break; } default: return false; } - key[*key_len].buf = row_id; - key[*key_len].buf_len = row_id_len; + key[*key_len].ptr = row_id; + key[*key_len].len = row_id_len; ++(*key_len); return true; } + /* * Construct Query_log_Event from thd query and serialize it * into buffer. @@ -988,7 +1066,7 @@ bool wsrep_prepare_key_for_innodb(const uchar* cache_key, * Return 0 in case of success, 1 in case of error. */ int wsrep_to_buf_helper( - THD* thd, const char *query, uint query_len, uchar** buf, uint* buf_len) + THD* thd, const char *query, uint query_len, uchar** buf, size_t* buf_len) { IO_CACHE tmp_io_cache; if (open_cached_file(&tmp_io_cache, mysql_tmpdir, TEMP_PREFIX, @@ -999,9 +1077,9 @@ int wsrep_to_buf_helper( /* if there is prepare query, add event for it */ if (thd->wsrep_TOI_pre_query) { - Query_log_event ev(thd, thd->wsrep_TOI_pre_query, - thd->wsrep_TOI_pre_query_len, - FALSE, FALSE, FALSE, 0); + Query_log_event ev(thd, thd->wsrep_TOI_pre_query, + thd->wsrep_TOI_pre_query_len, + FALSE, FALSE, FALSE, 0); if (ev.write(&tmp_io_cache)) ret= 1; } @@ -1009,7 +1087,7 @@ int wsrep_to_buf_helper( Query_log_event ev(thd, query, query_len, FALSE, FALSE, FALSE, 0); if (ev.write(&tmp_io_cache)) ret= 1; - if (!ret && wsrep_write_cache(&tmp_io_cache, buf, buf_len)) ret= 1; + if (!ret && wsrep_write_cache_buf(&tmp_io_cache, buf, buf_len)) ret= 1; close_cached_file(&tmp_io_cache); return ret; @@ -1017,7 +1095,7 @@ int wsrep_to_buf_helper( #include "sql_show.h" static int -create_view_query(THD *thd, uchar** buf, uint* buf_len) +create_view_query(THD *thd, uchar** buf, size_t* buf_len) { LEX *lex= thd->lex; SELECT_LEX *select_lex= &lex->select_lex; @@ -1036,15 +1114,15 @@ 
create_view_query(THD *thd, uchar** buf, uint* buf_len) if (!lex->definer) { /* - DEFINER-clause is missing; we have to create default definer in - persistent arena to be PS/SP friendly. - If this is an ALTER VIEW then the current user should be set as - the definer. + DEFINER-clause is missing; we have to create default definer in + persistent arena to be PS/SP friendly. + If this is an ALTER VIEW then the current user should be set as + the definer. */ if (!(lex->definer= create_default_definer(thd))) { - WSREP_WARN("view default definer issue"); + WSREP_WARN("view default definer issue"); } } @@ -1071,7 +1149,7 @@ create_view_query(THD *thd, uchar** buf, uint* buf_len) List_iterator_fast<LEX_STRING> names(lex->view_list); LEX_STRING *name; int i; - + for (i= 0; (name= names++); i++) { buff.append(i ? ", " : "("); @@ -1082,7 +1160,7 @@ create_view_query(THD *thd, uchar** buf, uint* buf_len) buff.append(STRING_WITH_LEN(" AS ")); //buff.append(views->source.str, views->source.length); buff.append(thd->lex->create_view_select.str, - thd->lex->create_view_select.length); + thd->lex->create_view_select.length); //int errcode= query_error_code(thd, TRUE); //if (thd->binlog_query(THD::STMT_QUERY_TYPE, // buff.ptr(), buff.length(), FALSE, FALSE, FALSE, errcod @@ -1094,11 +1172,11 @@ static int wsrep_TOI_begin(THD *thd, char *db_, char *table_, { wsrep_status_t ret(WSREP_WARNING); uchar* buf(0); - uint buf_len(0); + size_t buf_len(0); int buf_err; - WSREP_DEBUG("TO BEGIN: %lld, %d : %s", (long long)thd->wsrep_trx_seqno, - thd->wsrep_exec_mode, thd->query() ); + WSREP_DEBUG("TO BEGIN: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd), + thd->wsrep_exec_mode, thd->query() ); switch (thd->lex->sql_command) { case SQLCOM_CREATE_VIEW: @@ -1121,19 +1199,20 @@ static int wsrep_TOI_begin(THD *thd, char *db_, char *table_, } wsrep_key_arr_t key_arr= {0, 0}; + struct wsrep_buf buff = { buf, buf_len }; if (!buf_err && wsrep_prepare_keys_for_isolation(thd, db_, table_, table_list, 
&key_arr)&& WSREP_OK == (ret = wsrep->to_execute_start(wsrep, thd->thread_id, key_arr.keys, key_arr.keys_len, - buf, buf_len, - &thd->wsrep_trx_seqno))) + &buff, 1, + &thd->wsrep_trx_meta))) { thd->wsrep_exec_mode= TOTAL_ORDER; wsrep_to_isolation++; if (buf) my_free(buf); wsrep_keys_free(&key_arr); - WSREP_DEBUG("TO BEGIN: %lld, %d",(long long)thd->wsrep_trx_seqno, - thd->wsrep_exec_mode); + WSREP_DEBUG("TO BEGIN: %lld, %d",(long long)wsrep_thd_trx_seqno(thd), + thd->wsrep_exec_mode); } else { /* jump to error handler in mysql_execute_command() */ @@ -1152,10 +1231,11 @@ static int wsrep_TOI_begin(THD *thd, char *db_, char *table_, static void wsrep_TOI_end(THD *thd) { wsrep_status_t ret; wsrep_to_isolation--; - WSREP_DEBUG("TO END: %lld, %d : %s", (long long)thd->wsrep_trx_seqno, + + WSREP_DEBUG("TO END: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd), thd->wsrep_exec_mode, (thd->query()) ? thd->query() : "void"); if (WSREP_OK == (ret = wsrep->to_execute_end(wsrep, thd->thread_id))) { - WSREP_DEBUG("TO END: %lld", (long long)thd->wsrep_trx_seqno); + WSREP_DEBUG("TO END: %lld", (long long)wsrep_thd_trx_seqno(thd)); } else { WSREP_WARN("TO isolation end failed for: %d, sql: %s", @@ -1166,7 +1246,7 @@ static void wsrep_TOI_end(THD *thd) { static int wsrep_RSU_begin(THD *thd, char *db_, char *table_) { wsrep_status_t ret(WSREP_WARNING); - WSREP_DEBUG("RSU BEGIN: %lld, %d : %s", (long long)thd->wsrep_trx_seqno, + WSREP_DEBUG("RSU BEGIN: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd), thd->wsrep_exec_mode, thd->query() ); ret = wsrep->desync(wsrep); @@ -1211,7 +1291,7 @@ static int wsrep_RSU_begin(THD *thd, char *db_, char *table_) static void wsrep_RSU_end(THD *thd) { wsrep_status_t ret(WSREP_WARNING); - WSREP_DEBUG("RSU END: %lld, %d : %s", (long long)thd->wsrep_trx_seqno, + WSREP_DEBUG("RSU END: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd), thd->wsrep_exec_mode, thd->query() ); @@ -1255,13 +1335,27 @@ int wsrep_to_isolation_begin(THD *thd, char *db_, 
char *table_, mysql_mutex_unlock(&thd->LOCK_wsrep_thd); DBUG_ASSERT(thd->wsrep_exec_mode == LOCAL_STATE); - DBUG_ASSERT(thd->wsrep_trx_seqno == WSREP_SEQNO_UNDEFINED); + DBUG_ASSERT(thd->wsrep_trx_meta.gtid.seqno == WSREP_SEQNO_UNDEFINED); if (wsrep_debug && thd->mdl_context.has_locks()) { WSREP_DEBUG("thread holds MDL locks at TI begin: %s %lu", thd->query(), thd->thread_id); } + + /* + It makes sense to set auto_increment_* to defaults in TOI operations. + Must be done before wsrep_TOI_begin() since Query_log_event encapsulating + TOI statement and auto inc variables for wsrep replication is constructed + there. Variables are reset back in THD::reset_for_next_command() before + processing of next command. + */ + if (wsrep_auto_increment_control) + { + thd->variables.auto_increment_offset = 1; + thd->variables.auto_increment_increment = 1; + } + if (thd->variables.wsrep_on && thd->wsrep_exec_mode==LOCAL_STATE) { switch (wsrep_OSU_method_options) { @@ -1272,12 +1366,6 @@ int wsrep_to_isolation_begin(THD *thd, char *db_, char *table_, if (!ret) { thd->wsrep_exec_mode= TOTAL_ORDER; - /* It makes sense to set auto_increment_* to defaults in TOI operations */ - if (wsrep_auto_increment_control) - { - thd->variables.auto_increment_offset = 1; - thd->variables.auto_increment_increment = 1; - } } } return ret; @@ -1302,10 +1390,10 @@ void wsrep_to_isolation_end(THD *thd) "request: (%lu \tseqno %lld \twsrep (%d, %d, %d) cmd %d %d \t%s)\n" \ "granted: (%lu \tseqno %lld \twsrep (%d, %d, %d) cmd %d %d \t%s)", \ msg, \ - req->thread_id, (long long)req->wsrep_trx_seqno, \ + req->thread_id, (long long)wsrep_thd_trx_seqno(req), \ req->wsrep_exec_mode, req->wsrep_query_state, req->wsrep_conflict_state, \ req->command, req->lex->sql_command, req->query(), \ - gra->thread_id, (long long)gra->wsrep_trx_seqno, \ + gra->thread_id, (long long)wsrep_thd_trx_seqno(gra), \ gra->wsrep_exec_mode, gra->wsrep_query_state, gra->wsrep_conflict_state, \ gra->command, gra->lex->sql_command, 
gra->query()); diff --git a/sql/wsrep_mysqld.h b/sql/wsrep_mysqld.h index 619711cc4b0..815990ba9d4 100644 --- a/sql/wsrep_mysqld.h +++ b/sql/wsrep_mysqld.h @@ -1,4 +1,4 @@ -/* Copyright 2008-2012 Codership Oy <http://www.codership.com> +/* Copyright 2008-2013 Codership Oy <http://www.codership.com> This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -26,23 +26,22 @@ typedef struct st_mysql_show_var SHOW_VAR; class set_var; class THD; -#ifdef WITH_WSREP -#include "../wsrep/wsrep_api.h" -//#include "wsrep_mysqld.h" - enum wsrep_exec_mode { +enum wsrep_exec_mode { LOCAL_STATE, REPL_RECV, TOTAL_ORDER, LOCAL_COMMIT - }; - enum wsrep_query_state { +}; + +enum wsrep_query_state { QUERY_IDLE, QUERY_EXEC, QUERY_COMMITTING, QUERY_EXITING, QUERY_ROLLINGBACK, - }; - enum wsrep_conflict_state { +}; + +enum wsrep_conflict_state { NO_CONFLICT, MUST_ABORT, ABORTING, @@ -51,13 +50,14 @@ class THD; REPLAYING, RETRY_AUTOCOMMIT, CERT_FAILURE, - }; - enum wsrep_consistency_check_mode { +}; + +enum wsrep_consistency_check_mode { NO_CONSISTENCY_CHECK, CONSISTENCY_CHECK_DECLARED, CONSISTENCY_CHECK_RUNNING, - }; -#endif +}; + // Global wsrep parameters extern wsrep_t* wsrep; @@ -73,20 +73,16 @@ extern const char* wsrep_node_incoming_address; extern const char* wsrep_data_home_dir; extern const char* wsrep_dbug_option; extern long wsrep_slave_threads; -extern my_bool wsrep_debug; +extern int wsrep_slave_count_change; +extern MYSQL_PLUGIN_IMPORT my_bool wsrep_debug; extern my_bool wsrep_convert_LOCK_to_trx; extern ulong wsrep_retry_autocommit; extern my_bool wsrep_auto_increment_control; extern my_bool wsrep_drupal_282555_workaround; extern my_bool wsrep_incremental_data_collection; -extern const char* wsrep_sst_method; -extern const char* wsrep_sst_receive_address; -extern char* wsrep_sst_auth; -extern const char* wsrep_sst_donor; -extern my_bool wsrep_sst_donor_rejects_queries; extern const char* 
wsrep_start_position; -extern long long wsrep_max_ws_size; -extern long wsrep_max_ws_rows; +extern ulong wsrep_max_ws_size; +extern ulong wsrep_max_ws_rows; extern const char* wsrep_notify_cmd; extern my_bool wsrep_certify_nonPK; extern long wsrep_max_protocol_version; @@ -98,7 +94,6 @@ extern my_bool wsrep_recovery; extern my_bool wsrep_replicate_myisam; extern my_bool wsrep_log_conflicts; extern ulong wsrep_mysql_replication_bundle; -extern ulong wsrep_mysql_replication_bundle; extern my_bool wsrep_load_data_splitting; enum enum_wsrep_OSU_method { WSREP_OSU_TOI, WSREP_OSU_RSU }; @@ -111,81 +106,29 @@ extern long long wsrep_cluster_conf_id; extern const char* wsrep_cluster_status; extern long wsrep_cluster_size; extern long wsrep_local_index; +extern long long wsrep_local_bf_aborts; extern const char* wsrep_provider_name; extern const char* wsrep_provider_version; extern const char* wsrep_provider_vendor; -extern int wsrep_show_status(THD *thd, SHOW_VAR *var, char *buff); -extern void wsrep_free_status(THD *thd); // Other wsrep global variables extern my_bool wsrep_inited; // whether wsrep is initialized ? 
-#define WSREP_SST_ADDRESS_AUTO "AUTO" -#define WSREP_NODE_INCOMING_AUTO "AUTO" - -// MySQL variables funcs - -#define CHECK_ARGS (sys_var *self, THD* thd, set_var *var) -#define UPDATE_ARGS (sys_var *self, THD* thd, enum_var_type type) -#define DEFAULT_ARGS (THD* thd, enum_var_type var_type) -#define INIT_ARGS (const char* opt) - -extern int wsrep_init_vars(); - -extern bool wsrep_on_update UPDATE_ARGS; -extern void wsrep_causal_reads_update UPDATE_ARGS; -extern bool wsrep_start_position_check CHECK_ARGS; -extern bool wsrep_start_position_update UPDATE_ARGS; -extern void wsrep_start_position_init INIT_ARGS; - -extern bool wsrep_provider_check CHECK_ARGS; -extern bool wsrep_provider_update UPDATE_ARGS; -extern void wsrep_provider_init INIT_ARGS; - -extern bool wsrep_provider_options_check CHECK_ARGS; -extern bool wsrep_provider_options_update UPDATE_ARGS; -extern void wsrep_provider_options_init INIT_ARGS; - -extern bool wsrep_cluster_address_check CHECK_ARGS; -extern bool wsrep_cluster_address_update UPDATE_ARGS; -extern void wsrep_cluster_address_init INIT_ARGS; - -extern bool wsrep_cluster_name_check CHECK_ARGS; -extern bool wsrep_cluster_name_update UPDATE_ARGS; - -extern bool wsrep_node_name_check CHECK_ARGS; -extern bool wsrep_node_name_update UPDATE_ARGS; - -extern bool wsrep_node_address_check CHECK_ARGS; -extern bool wsrep_node_address_update UPDATE_ARGS; -extern void wsrep_node_address_init INIT_ARGS; +int wsrep_show_status(THD *thd, SHOW_VAR *var, char *buff); +void wsrep_free_status(THD *thd); -extern bool wsrep_sst_method_check CHECK_ARGS; -extern bool wsrep_sst_method_update UPDATE_ARGS; -extern void wsrep_sst_method_init INIT_ARGS; - -extern bool wsrep_sst_receive_address_check CHECK_ARGS; -extern bool wsrep_sst_receive_address_update UPDATE_ARGS; - -extern bool wsrep_sst_auth_check CHECK_ARGS; -extern bool wsrep_sst_auth_update UPDATE_ARGS; -extern void wsrep_sst_auth_init INIT_ARGS; - -extern bool wsrep_sst_donor_check CHECK_ARGS; -extern bool 
wsrep_sst_donor_update UPDATE_ARGS; - -extern bool wsrep_slave_threads_check CHECK_ARGS; -extern bool wsrep_slave_threads_update UPDATE_ARGS; - -extern bool wsrep_desync_check CHECK_ARGS; -extern bool wsrep_desync_update UPDATE_ARGS; - -extern bool wsrep_before_SE(); // initialize wsrep before storage - // engines (true) or after (false) -extern int wsrep_init(); -extern void wsrep_deinit(); -extern void wsrep_recover(); +/* Filters out --wsrep-new-cluster oprtion from argv[] + * should be called in the very beginning of main() */ +void wsrep_filter_new_cluster (int* argc, char* argv[]); +int wsrep_init(); +void wsrep_deinit(); +void wsrep_recover(); +bool wsrep_before_SE(); // initialize wsrep before storage + // engines (true) or after (false) +/* wsrep initialization sequence at startup + * @param before wsrep_before_SE() value */ +void wsrep_init_startup(bool before); extern "C" enum wsrep_exec_mode wsrep_thd_exec_mode(THD *thd); @@ -194,18 +137,18 @@ extern "C" enum wsrep_query_state wsrep_thd_query_state(THD *thd); extern "C" const char * wsrep_thd_exec_mode_str(THD *thd); extern "C" const char * wsrep_thd_conflict_state_str(THD *thd); extern "C" const char * wsrep_thd_query_state_str(THD *thd); -extern "C" wsrep_trx_handle_t* wsrep_thd_trx_handle(THD *thd); +extern "C" wsrep_ws_handle_t* wsrep_thd_ws_handle(THD *thd); extern "C" void wsrep_thd_set_exec_mode(THD *thd, enum wsrep_exec_mode mode); extern "C" void wsrep_thd_set_query_state( - THD *thd, enum wsrep_query_state state); + THD *thd, enum wsrep_query_state state); extern "C" void wsrep_thd_set_conflict_state( - THD *thd, enum wsrep_conflict_state state); + THD *thd, enum wsrep_conflict_state state); extern "C" void wsrep_thd_set_trx_to_replay(THD *thd, uint64 trx_id); -extern "C"void wsrep_thd_LOCK(THD *thd); -extern "C"void wsrep_thd_UNLOCK(THD *thd); +extern "C" void wsrep_thd_LOCK(THD *thd); +extern "C" void wsrep_thd_UNLOCK(THD *thd); extern "C" uint32 wsrep_thd_wsrep_rand(THD *thd); extern "C" 
time_t wsrep_thd_query_start(THD *thd); extern "C" my_thread_id wsrep_thd_thread_id(THD *thd); @@ -217,18 +160,11 @@ extern "C" void wsrep_thd_set_wsrep_last_query_id(THD *thd, query_id_t id); extern "C" void wsrep_thd_awake(THD* bf_thd, THD *thd, my_bool signal); - -/* wsrep initialization sequence at startup - * @param first wsrep_before_SE() value */ -extern void wsrep_init_startup(bool before); - extern void wsrep_close_client_connections(my_bool wait_to_end); extern int wsrep_wait_committing_connections_close(int wait_time); extern void wsrep_close_applier(THD *thd); extern void wsrep_wait_appliers_close(THD *thd); extern void wsrep_close_applier_threads(int count); -extern void wsrep_create_appliers(long threads = wsrep_slave_threads); -extern void wsrep_create_rollbacker(); extern void wsrep_kill_mysql(THD *thd); /* new defines */ @@ -285,32 +221,20 @@ extern wsrep_seqno_t wsrep_locked_seqno; WSREP_LOG(sql_print_information, "cluster conflict due to %s for threads:",\ (bf_abort) ? "high priority abort" : "certification failure" \ ); \ - if (bf_thd != NULL) WSREP_LOG_CONFLICT_THD(bf_thd, "Winning thread"); \ + if (bf_thd) WSREP_LOG_CONFLICT_THD(bf_thd, "Winning thread"); \ if (victim_thd) WSREP_LOG_CONFLICT_THD(victim_thd, "Victim thread"); \ } #define WSREP_PROVIDER_EXISTS \ (wsrep_provider && strncasecmp(wsrep_provider, WSREP_NONE, FN_REFLEN)) -/*! Synchronizes applier thread start with init thread */ -extern void wsrep_sst_grab(); -/*! Init thread waits for SST completion */ -extern bool wsrep_sst_wait(); -/*! Signals wsrep that initialization is complete, writesets can be applied */ -extern void wsrep_sst_continue(); - -extern void wsrep_SE_init_grab(); /*! grab init critical section */ -extern void wsrep_SE_init_wait(); /*! wait for SE init to complete */ -extern void wsrep_SE_init_done(); /*! signal that SE init is complte */ -extern void wsrep_SE_initialized(); /*! 
mark SE initialization complete */ - extern void wsrep_ready_wait(); enum wsrep_trx_status { WSREP_TRX_OK, WSREP_TRX_ROLLBACK, WSREP_TRX_ERROR, - }; +}; extern enum wsrep_trx_status wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all); @@ -318,17 +242,10 @@ class Ha_trx_info; struct THD_TRANS; void wsrep_register_hton(THD* thd, bool all); void wsrep_post_commit(THD* thd, bool all); -void wsrep_replication_process(THD *thd); -void wsrep_rollback_process(THD *thd); void wsrep_brute_force_killer(THD *thd); int wsrep_hire_brute_force_killer(THD *thd, uint64_t trx_id); + extern "C" bool wsrep_consistency_check(void *thd_ptr); -extern "C" int wsrep_thd_is_brute_force(void *thd_ptr); -extern "C" int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr, - my_bool signal); -extern "C" int wsrep_thd_in_locking_session(void *thd_ptr); -void *wsrep_prepare_bf_thd(THD *thd); -void wsrep_return_from_bf_mode(void *shadow, THD *thd); /* this is visible for client build so that innodb plugin gets this */ typedef struct wsrep_aborting_thd { @@ -337,29 +254,21 @@ typedef struct wsrep_aborting_thd { } *wsrep_aborting_thd_t; extern mysql_mutex_t LOCK_wsrep_ready; -extern mysql_cond_t COND_wsrep_ready; +extern mysql_cond_t COND_wsrep_ready; extern mysql_mutex_t LOCK_wsrep_sst; -extern mysql_cond_t COND_wsrep_sst; +extern mysql_cond_t COND_wsrep_sst; extern mysql_mutex_t LOCK_wsrep_sst_init; -extern mysql_cond_t COND_wsrep_sst_init; +extern mysql_cond_t COND_wsrep_sst_init; extern mysql_mutex_t LOCK_wsrep_rollback; -extern mysql_cond_t COND_wsrep_rollback; +extern mysql_cond_t COND_wsrep_rollback; extern int wsrep_replaying; extern mysql_mutex_t LOCK_wsrep_replaying; -extern mysql_cond_t COND_wsrep_replaying; -extern wsrep_aborting_thd_t wsrep_aborting_thd; -extern MYSQL_PLUGIN_IMPORT my_bool wsrep_debug; -extern my_bool wsrep_convert_LOCK_to_trx; -extern ulong wsrep_retry_autocommit; -extern my_bool wsrep_emulate_bin_log; -extern my_bool wsrep_auto_increment_control; -extern 
my_bool wsrep_drupal_282555_workaround; -extern long long wsrep_max_ws_size; -extern long wsrep_max_ws_rows; -extern int wsrep_to_isolation; -extern my_bool wsrep_certify_nonPK; +extern mysql_cond_t COND_wsrep_replaying; extern mysql_mutex_t LOCK_wsrep_slave_threads; extern mysql_mutex_t LOCK_wsrep_desync; +extern wsrep_aborting_thd_t wsrep_aborting_thd; +extern my_bool wsrep_emulate_bin_log; +extern int wsrep_to_isolation; extern PSI_mutex_key key_LOCK_wsrep_ready; extern PSI_mutex_key key_COND_wsrep_ready; @@ -381,13 +290,11 @@ int wsrep_to_isolation_begin(THD *thd, char *db_, char *table_, const TABLE_LIST* table_list); void wsrep_to_isolation_end(THD *thd); void wsrep_cleanup_transaction(THD *thd); -void wsrep_prepare_bf_thd(THD *thd, struct wsrep_thd_shadow*); -void wsrep_return_from_bf_mode(THD *thd, struct wsrep_thd_shadow*); int wsrep_to_buf_helper( - THD* thd, const char *query, uint query_len, uchar** buf, uint* buf_len); -int wsrep_create_sp(THD *thd, uchar** buf, uint* buf_len); -int wsrep_create_trigger_query(THD *thd, uchar** buf, uint* buf_len); -int wsrep_create_event_query(THD *thd, uchar** buf, uint* buf_len); + THD* thd, const char *query, uint query_len, uchar** buf, size_t* buf_len); +int wsrep_create_sp(THD *thd, uchar** buf, size_t* buf_len); +int wsrep_create_trigger_query(THD *thd, uchar** buf, size_t* buf_len); +int wsrep_create_event_query(THD *thd, uchar** buf, size_t* buf_len); const wsrep_uuid_t* wsrep_cluster_uuid(); struct xid_t; diff --git a/sql/wsrep_notify.cc b/sql/wsrep_notify.cc index ff997d01183..291cdbb7c75 100644 --- a/sql/wsrep_notify.cc +++ b/sql/wsrep_notify.cc @@ -15,6 +15,7 @@ #include <mysqld.h> #include "wsrep_priv.h" +#include "wsrep_utils.h" const char* wsrep_notify_cmd=""; @@ -64,7 +65,7 @@ void wsrep_notify_status (wsrep_member_status_t status, { char uuid_str[40]; - wsrep_uuid_print (&view->uuid, uuid_str, sizeof(uuid_str)); + wsrep_uuid_print (&view->state_id.uuid, uuid_str, sizeof(uuid_str)); cmd_off += snprintf 
(cmd_ptr + cmd_off, cmd_len - cmd_off, " --uuid %s", uuid_str); diff --git a/sql/wsrep_priv.h b/sql/wsrep_priv.h index 700639ebcb1..93640fbcc03 100644 --- a/sql/wsrep_priv.h +++ b/sql/wsrep_priv.h @@ -26,208 +26,26 @@ #include <pthread.h> #include <cstdio> -extern void wsrep_ready_set (my_bool x); +void wsrep_ready_set (my_bool x); -extern ssize_t wsrep_sst_prepare (void** msg); -extern int wsrep_sst_donate_cb (void* app_ctx, - void* recv_ctx, - const void* msg, size_t msg_len, - const wsrep_uuid_t* current_uuid, - wsrep_seqno_t current_seqno, - const char* state, size_t state_len, - bool bypass); - -extern size_t guess_ip (char* buf, size_t buf_len); -extern size_t guess_address(char* buf, size_t buf_len); +ssize_t wsrep_sst_prepare (void** msg); +wsrep_cb_status wsrep_sst_donate_cb (void* app_ctx, + void* recv_ctx, + const void* msg, size_t msg_len, + const wsrep_gtid_t* state_id, + const char* state, size_t state_len, + bool bypass); extern wsrep_uuid_t local_uuid; extern wsrep_seqno_t local_seqno; +// a helper function +void wsrep_sst_received(wsrep_t*, const wsrep_uuid_t*, wsrep_seqno_t, + const void*, size_t); /*! SST thread signals init thread about sst completion */ -extern void wsrep_sst_complete(const wsrep_uuid_t* uuid, wsrep_seqno_t, bool); - -extern void wsrep_notify_status (wsrep_member_status_t new_status, - const wsrep_view_info_t* view = 0); - -namespace wsp { -class node_status -{ -public: - node_status() : status(WSREP_MEMBER_UNDEFINED) {} - void set(wsrep_member_status_t new_status, - const wsrep_view_info_t* view = 0) - { - if (status != new_status || 0 != view) - { - wsrep_notify_status(new_status, view); - status = new_status; - } - } - wsrep_member_status_t get() const { return status; } -private: - wsrep_member_status_t status; -}; -} /* namespace wsp */ - -extern wsp::node_status local_status; - -namespace wsp { -/* A small class to run external programs. 
*/ -class process -{ -private: - const char* const str_; - FILE* io_; - int err_; - pid_t pid_; - -public: -/*! @arg type is a pointer to a null-terminated string which must contain - either the letter 'r' for reading or the letter 'w' for writing. - */ - process (const char* cmd, const char* type); - ~process (); - - FILE* pipe () { return io_; } - int error() { return err_; } - int wait (); - const char* cmd() { return str_; } -}; -#ifdef REMOVED -class lock -{ - pthread_mutex_t* const mtx_; - -public: - - lock (pthread_mutex_t* mtx) : mtx_(mtx) - { - int err = pthread_mutex_lock (mtx_); - - if (err) - { - WSREP_ERROR("Mutex lock failed: %s", strerror(err)); - abort(); - } - } - - virtual ~lock () - { - int err = pthread_mutex_unlock (mtx_); - - if (err) - { - WSREP_ERROR("Mutex unlock failed: %s", strerror(err)); - abort(); - } - } - - inline void wait (pthread_cond_t* cond) - { - pthread_cond_wait (cond, mtx_); - } - -private: - - lock (const lock&); - lock& operator=(const lock&); - -}; - -class monitor -{ - int mutable refcnt; - pthread_mutex_t mutable mtx; - pthread_cond_t mutable cond; - -public: - - monitor() : refcnt(0) - { - pthread_mutex_init (&mtx, NULL); - pthread_cond_init (&cond, NULL); - } - - ~monitor() - { - pthread_mutex_destroy (&mtx); - pthread_cond_destroy (&cond); - } - - void enter() const - { - lock l(&mtx); - - while (refcnt) - { - l.wait(&cond); - } - refcnt++; - } - - void leave() const - { - lock l(&mtx); - - refcnt--; - if (refcnt == 0) - { - pthread_cond_signal (&cond); - } - } - -private: - - monitor (const monitor&); - monitor& operator= (const monitor&); -}; - -class critical -{ - const monitor& mon; - -public: - - critical(const monitor& m) : mon(m) { mon.enter(); } - - ~critical() { mon.leave(); } - -private: - - critical (const critical&); - critical& operator= (const critical&); -}; -#endif - -class thd -{ - class thd_init - { - public: - thd_init() { my_thread_init(); } - ~thd_init() { my_thread_end(); } - } - init; - - thd 
(const thd&); - thd& operator= (const thd&); - -public: - - thd(my_bool wsrep_on); - ~thd(); - THD* const ptr; -}; +void wsrep_sst_complete(const wsrep_uuid_t*, wsrep_seqno_t, bool); -class string -{ -public: - string() : string_(0) {} - void set(char* str) { if (string_) free (string_); string_ = str; } - ~string() { set (0); } -private: - char* string_; -}; +void wsrep_notify_status (wsrep_member_status_t new_status, + const wsrep_view_info_t* view = 0); -} // namespace wsrep #endif /* WSREP_PRIV_H */ diff --git a/sql/wsrep_sst.cc b/sql/wsrep_sst.cc index 7afdb4909e4..ec636a8b1ec 100644 --- a/sql/wsrep_sst.cc +++ b/sql/wsrep_sst.cc @@ -13,6 +13,8 @@ along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ +#include "wsrep_sst.h" + #include <mysqld.h> #include <m_ctype.h> #include <my_sys.h> @@ -23,6 +25,7 @@ #include <sql_reload.h> #include <sql_parse.h> #include "wsrep_priv.h" +#include "wsrep_utils.h" #include <cstdio> #include <cstdlib> @@ -61,7 +64,6 @@ const char* wsrep_sst_donor = ""; // container for real auth string static const char* sst_auth_real = NULL; - my_bool wsrep_sst_donor_rejects_queries = FALSE; bool wsrep_sst_method_check (sys_var *self, THD* thd, set_var* var) @@ -214,8 +216,8 @@ bool wsrep_sst_wait () // Signal end of SST void wsrep_sst_complete (const wsrep_uuid_t* sst_uuid, - wsrep_seqno_t sst_seqno, - bool needed) + wsrep_seqno_t sst_seqno, + bool needed) { if (mysql_mutex_lock (&LOCK_wsrep_sst)) abort(); if (!sst_complete) @@ -228,18 +230,37 @@ void wsrep_sst_complete (const wsrep_uuid_t* sst_uuid, } else { - WSREP_WARN("Nobody is waiting for SST."); + /* This can happen when called from wsrep_synced_cb(). + At the moment there is no way to check there + if main thread is still waiting for signal, + so wsrep_sst_complete() is called from there + each time wsrep_ready changes from FALSE -> TRUE. 
+ */ + WSREP_DEBUG("Nobody is waiting for SST."); } mysql_mutex_unlock (&LOCK_wsrep_sst); } +void wsrep_sst_received (wsrep_t* const wsrep, + const wsrep_uuid_t* const uuid, + wsrep_seqno_t const seqno, + const void* const state, + size_t const state_len) +{ + int const rcode(seqno < 0 ? seqno : 0); + wsrep_gtid_t const state_id = { + *uuid, (rcode ? WSREP_SEQNO_UNDEFINED : seqno) + }; + wsrep->sst_received(wsrep, &state_id, state, state_len, rcode); +} + // Let applier threads to continue void wsrep_sst_continue () { if (sst_needed) { WSREP_INFO("Signalling provider to continue."); - wsrep->sst_received (wsrep, &local_uuid, local_seqno, NULL, 0); + wsrep_sst_received (wsrep, &local_uuid, local_seqno, NULL, 0); } } @@ -519,7 +540,7 @@ ssize_t wsrep_sst_prepare (void** msg) } else { - ssize_t ret= guess_ip (ip_buf, ip_max); + ssize_t ret= wsrep_guess_ip (ip_buf, ip_max); if (ret && ret < ip_max) { @@ -707,7 +728,9 @@ static int sst_donate_mysqldump (const char* addr, ret= sst_run_shell (cmd_str, 3); } - wsrep->sst_sent (wsrep, uuid, ret ? ret : seqno); + wsrep_gtid_t const state_id = { *uuid, (ret ? WSREP_SEQNO_UNDEFINED : seqno)}; + + wsrep->sst_sent (wsrep, &state_id, ret); return ret; } @@ -927,7 +950,10 @@ wait_signal: } // signal to donor that SST is over - wsrep->sst_sent (wsrep, &ret_uuid, err ? -err : ret_seqno); + struct wsrep_gtid const state_id = { + ret_uuid, err ? 
WSREP_SEQNO_UNDEFINED : ret_seqno + }; + wsrep->sst_sent (wsrep, &state_id, -err); proc.wait(); return NULL; @@ -981,12 +1007,11 @@ static int sst_donate_other (const char* method, return arg.err; } -int wsrep_sst_donate_cb (void* app_ctx, void* recv_ctx, - const void* msg, size_t msg_len, - const wsrep_uuid_t* current_uuid, - wsrep_seqno_t current_seqno, - const char* state, size_t state_len, - bool bypass) +wsrep_cb_status_t wsrep_sst_donate_cb (void* app_ctx, void* recv_ctx, + const void* msg, size_t msg_len, + const wsrep_gtid_t* current_gtid, + const char* state, size_t state_len, + bool bypass) { /* This will be reset when sync callback is called. * Should we set wsrep_ready to FALSE here too? */ @@ -998,20 +1023,20 @@ int wsrep_sst_donate_cb (void* app_ctx, void* recv_ctx, const char* data = method + method_len + 1; char uuid_str[37]; - wsrep_uuid_print (current_uuid, uuid_str, sizeof(uuid_str)); + wsrep_uuid_print (&current_gtid->uuid, uuid_str, sizeof(uuid_str)); int ret; if (!strcmp (WSREP_SST_MYSQLDUMP, method)) { - ret = sst_donate_mysqldump (data, current_uuid, uuid_str, current_seqno, - bypass); + ret = sst_donate_mysqldump(data, &current_gtid->uuid, uuid_str, + current_gtid->seqno, bypass); } else { - ret = sst_donate_other (method, data, uuid_str, current_seqno, bypass); + ret = sst_donate_other(method, data, uuid_str, current_gtid->seqno,bypass); } - return (ret > 0 ? 0 : ret); + return (ret > 0 ?
WSREP_CB_SUCCESS : WSREP_CB_FAILURE); } void wsrep_SE_init_grab() @@ -1021,7 +1046,10 @@ void wsrep_SE_init_grab() void wsrep_SE_init_wait() { - mysql_cond_wait (&COND_wsrep_sst_init, &LOCK_wsrep_sst_init); + while (SE_initialized == false) + { + mysql_cond_wait (&COND_wsrep_sst_init, &LOCK_wsrep_sst_init); + } mysql_mutex_unlock (&LOCK_wsrep_sst_init); } diff --git a/sql/wsrep_sst.h b/sql/wsrep_sst.h new file mode 100644 index 00000000000..b7f0e26f226 --- /dev/null +++ b/sql/wsrep_sst.h @@ -0,0 +1,40 @@ +/* Copyright (C) 2013 Codership Oy <info@codership.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along + with this program; if not, write to the Free Software Foundation, Inc., + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ + +#ifndef WSREP_SST_H +#define WSREP_SST_H + +#include <mysql.h> // my_bool + +/* system variables */ +extern const char* wsrep_sst_method; +extern const char* wsrep_sst_receive_address; +extern const char* wsrep_sst_donor; +extern char* wsrep_sst_auth; +extern my_bool wsrep_sst_donor_rejects_queries; + +/*! Synchronizes applier thread start with init thread */ +extern void wsrep_sst_grab(); +/*! Init thread waits for SST completion */ +extern bool wsrep_sst_wait(); +/*! Signals wsrep that initialization is complete, writesets can be applied */ +extern void wsrep_sst_continue(); + +extern void wsrep_SE_init_grab(); /*! grab init critical section */ +extern void wsrep_SE_init_wait(); /*! wait for SE init to complete */ +extern void wsrep_SE_init_done(); /*! 
signal that SE init is complte */ +extern void wsrep_SE_initialized(); /*! mark SE initialization complete */ + +#endif /* WSREP_SST_H */ diff --git a/sql/wsrep_thd.cc b/sql/wsrep_thd.cc new file mode 100644 index 00000000000..d9c30e501e6 --- /dev/null +++ b/sql/wsrep_thd.cc @@ -0,0 +1,464 @@ +/* Copyright (C) 2013 Codership Oy <info@codership.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along + with this program; if not, write to the Free Software Foundation, Inc., + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 
*/ + +#include "wsrep_thd.h" + +#include "transaction.h" +#include "rpl_rli.h" +#include "log_event.h" +#include "sql_parse.h" +#include "slave.h" // opt_log_slave_updates +#include "sql_base.h" // close_thread_tables() +#include "mysqld.h" // start_wsrep_THD(); + +static long long wsrep_bf_aborts_counter = 0; + +int wsrep_show_bf_aborts (THD *thd, SHOW_VAR *var, char *buff) +{ + wsrep_local_bf_aborts = my_atomic_load64(&wsrep_bf_aborts_counter); + var->type = SHOW_LONGLONG; + var->value = (char*)&wsrep_local_bf_aborts; + return 0; +} + +/* must have (&thd->LOCK_wsrep_thd) */ +void wsrep_client_rollback(THD *thd) +{ + WSREP_DEBUG("client rollback due to BF abort for (%ld), query: %s", + thd->thread_id, thd->query()); + + my_atomic_add64(&wsrep_bf_aborts_counter, 1); + + thd->wsrep_conflict_state= ABORTING; + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + trans_rollback(thd); + + if (thd->locked_tables_mode && thd->lock) + { + WSREP_DEBUG("unlocking tables for BF abort (%ld)", thd->thread_id); + thd->locked_tables_list.unlock_locked_tables(thd); + thd->variables.option_bits&= ~(OPTION_TABLE_LOCK); + } + + if (thd->global_read_lock.is_acquired()) + { + WSREP_DEBUG("unlocking GRL for BF abort (%ld)", thd->thread_id); + thd->global_read_lock.unlock_global_read_lock(thd); + } + + /* Release transactional metadata locks. 
*/ + thd->mdl_context.release_transactional_locks(); + + /* release explicit MDL locks */ + thd->mdl_context.release_explicit_locks(); + + if (thd->get_binlog_table_maps()) + { + WSREP_DEBUG("clearing binlog table map for BF abort (%ld)", thd->thread_id); + thd->clear_binlog_table_maps(); + } + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + thd->wsrep_conflict_state= ABORTED; +} + +static Relay_log_info* wsrep_relay_log_init(const char* log_fname) +{ + Relay_log_info* rli= new Relay_log_info(false); + + rli->no_storage= true; + if (!rli->relay_log.description_event_for_exec) + { + rli->relay_log.description_event_for_exec= + new Format_description_log_event(4); + } + + rli->sql_thd= current_thd; + return rli; +} + +static void wsrep_prepare_bf_thd(THD *thd, struct wsrep_thd_shadow* shadow) +{ + shadow->options = thd->variables.option_bits; + shadow->server_status = thd->server_status; + shadow->wsrep_exec_mode = thd->wsrep_exec_mode; + shadow->vio = thd->net.vio; + + if (opt_log_slave_updates) + thd->variables.option_bits|= OPTION_BIN_LOG; + else + thd->variables.option_bits&= ~(OPTION_BIN_LOG); + + if (!thd->wsrep_rli) thd->wsrep_rli= wsrep_relay_log_init("wsrep_relay"); + + thd->wsrep_exec_mode= REPL_RECV; + thd->net.vio= 0; + thd->clear_error(); + + shadow->tx_isolation = thd->variables.tx_isolation; + thd->variables.tx_isolation = ISO_READ_COMMITTED; + thd->tx_isolation = ISO_READ_COMMITTED; + + shadow->db = thd->db; + shadow->db_length = thd->db_length; + thd->reset_db(NULL, 0); +} + +static void wsrep_return_from_bf_mode(THD *thd, struct wsrep_thd_shadow* shadow) +{ + thd->variables.option_bits = shadow->options; + thd->server_status = shadow->server_status; + thd->wsrep_exec_mode = shadow->wsrep_exec_mode; + thd->net.vio = shadow->vio; + thd->variables.tx_isolation = shadow->tx_isolation; + thd->reset_db(shadow->db, shadow->db_length); +} + +void wsrep_replay_transaction(THD *thd) +{ + /* checking if BF trx must be replayed */ + if (thd->wsrep_conflict_state== 
MUST_REPLAY) { + if (thd->wsrep_exec_mode!= REPL_RECV) { + if (thd->stmt_da->is_sent) + { + WSREP_ERROR("replay issue, thd has reported status already"); + } + thd->stmt_da->reset_diagnostics_area(); + + thd->wsrep_conflict_state= REPLAYING; + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + + mysql_reset_thd_for_next_command(thd); + thd->killed= NOT_KILLED; + close_thread_tables(thd); + if (thd->locked_tables_mode && thd->lock) + { + WSREP_DEBUG("releasing table lock for replaying (%ld)", + thd->thread_id); + thd->locked_tables_list.unlock_locked_tables(thd); + thd->variables.option_bits&= ~(OPTION_TABLE_LOCK); + } + thd->mdl_context.release_transactional_locks(); + + thd_proc_info(thd, "wsrep replaying trx"); + WSREP_DEBUG("replay trx: %s %lld", + thd->query() ? thd->query() : "void", + (long long)wsrep_thd_trx_seqno(thd)); + struct wsrep_thd_shadow shadow; + wsrep_prepare_bf_thd(thd, &shadow); + + /* From trans_begin() */ + thd->variables.option_bits|= OPTION_BEGIN; + thd->server_status|= SERVER_STATUS_IN_TRANS; + + int rcode = wsrep->replay_trx(wsrep, + &thd->wsrep_ws_handle, + (void *)thd); + + wsrep_return_from_bf_mode(thd, &shadow); + if (thd->wsrep_conflict_state!= REPLAYING) + WSREP_WARN("lost replaying mode: %d", thd->wsrep_conflict_state ); + + mysql_mutex_lock(&thd->LOCK_wsrep_thd); + + switch (rcode) + { + case WSREP_OK: + thd->wsrep_conflict_state= NO_CONFLICT; + wsrep->post_commit(wsrep, &thd->wsrep_ws_handle); + WSREP_DEBUG("trx_replay successful for: %ld %llu", + thd->thread_id, (long long)thd->real_id); + if (thd->stmt_da->is_sent) + { + WSREP_WARN("replay ok, thd has reported status"); + } + else if (thd->stmt_da->is_set()) + { + if (thd->stmt_da->status() != Diagnostics_area::DA_OK) + { + WSREP_WARN("replay ok, thd has error status %d", + thd->stmt_da->status()); + } + } + else + { + my_ok(thd); + } + break; + case WSREP_TRX_FAIL: + if (thd->stmt_da->is_sent) + { + WSREP_ERROR("replay failed, thd has reported status"); + } + else + { + 
WSREP_DEBUG("replay failed, rolling back"); + my_error(ER_LOCK_DEADLOCK, MYF(0), "wsrep aborted transaction"); + } + thd->wsrep_conflict_state= ABORTED; + wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle); + break; + default: + WSREP_ERROR("trx_replay failed for: %d, query: %s", + rcode, thd->query() ? thd->query() : "void"); + /* we're now in inconsistent state, must abort */ + unireg_abort(1); + break; + } + + wsrep_cleanup_transaction(thd); + + mysql_mutex_lock(&LOCK_wsrep_replaying); + wsrep_replaying--; + WSREP_DEBUG("replaying decreased: %d, thd: %lu", + wsrep_replaying, thd->thread_id); + mysql_cond_broadcast(&COND_wsrep_replaying); + mysql_mutex_unlock(&LOCK_wsrep_replaying); + } + } +} + +static void wsrep_replication_process(THD *thd) +{ + int rcode; + DBUG_ENTER("wsrep_replication_process"); + + struct wsrep_thd_shadow shadow; + wsrep_prepare_bf_thd(thd, &shadow); + + /* From trans_begin() */ + thd->variables.option_bits|= OPTION_BEGIN; + thd->server_status|= SERVER_STATUS_IN_TRANS; + + rcode = wsrep->recv(wsrep, (void *)thd); + DBUG_PRINT("wsrep",("wsrep_repl returned: %d", rcode)); + + WSREP_INFO("applier thread exiting (code:%d)", rcode); + + switch (rcode) { + case WSREP_OK: + case WSREP_NOT_IMPLEMENTED: + case WSREP_CONN_FAIL: + /* provider does not support slave operations / disconnected from group, + * just close applier thread */ + break; + case WSREP_NODE_FAIL: + /* data inconsistency => SST is needed */ + /* Note: we cannot just blindly restart replication here, + * SST might require server restart if storage engines must be + * initialized after SST */ + WSREP_ERROR("node consistency compromised, aborting"); + wsrep_kill_mysql(thd); + break; + case WSREP_WARNING: + case WSREP_TRX_FAIL: + case WSREP_TRX_MISSING: + /* these suggest a bug in provider code */ + WSREP_WARN("bad return from recv() call: %d", rcode); + /* fall through to node shutdown */ + case WSREP_FATAL: + /* Cluster connectivity is lost. 
+ * + * If applier was killed on purpose (KILL_CONNECTION), we + * avoid mysql shutdown. This is because the killer will then handle + * shutdown processing (or replication restarting) + */ + if (thd->killed != KILL_CONNECTION) + { + wsrep_kill_mysql(thd); + } + break; + } + + mysql_mutex_lock(&LOCK_thread_count); + wsrep_close_applier(thd); + mysql_cond_broadcast(&COND_thread_count); + mysql_mutex_unlock(&LOCK_thread_count); + + if (thd->temporary_tables) + { + WSREP_DEBUG("Applier %lu, has temporary tables at exit", thd->thread_id); + } + wsrep_return_from_bf_mode(thd, &shadow); + DBUG_VOID_RETURN; +} + +void wsrep_create_appliers(long threads) +{ + if (!wsrep_connected) + { + /* see wsrep_replication_start() for the logic */ + if (wsrep_cluster_address && strlen(wsrep_cluster_address) && + wsrep_provider && strcasecmp(wsrep_provider, "none")) + { + WSREP_ERROR("Trying to launch slave threads before creating " + "connection at '%s'", wsrep_cluster_address); + assert(0); + } + return; + } + + long wsrep_threads=0; + pthread_t hThread; + while (wsrep_threads++ < threads) { + if (pthread_create( + &hThread, &connection_attrib, + start_wsrep_THD, (void*)wsrep_replication_process)) + WSREP_WARN("Can't create thread to manage wsrep replication"); + } +} + +static void wsrep_rollback_process(THD *thd) +{ + DBUG_ENTER("wsrep_rollback_process"); + + mysql_mutex_lock(&LOCK_wsrep_rollback); + wsrep_aborting_thd= NULL; + + while (thd->killed == NOT_KILLED) { + thd_proc_info(thd, "wsrep aborter idle"); + thd->mysys_var->current_mutex= &LOCK_wsrep_rollback; + thd->mysys_var->current_cond= &COND_wsrep_rollback; + + mysql_cond_wait(&COND_wsrep_rollback,&LOCK_wsrep_rollback); + + WSREP_DEBUG("WSREP rollback thread wakes for signal"); + + mysql_mutex_lock(&thd->mysys_var->mutex); + thd_proc_info(thd, "wsrep aborter active"); + thd->mysys_var->current_mutex= 0; + thd->mysys_var->current_cond= 0; + mysql_mutex_unlock(&thd->mysys_var->mutex); + + /* check for false alarms */ + if 
(!wsrep_aborting_thd) + { + WSREP_DEBUG("WSREP rollback thread has empty abort queue"); + } + /* process all entries in the queue */ + while (wsrep_aborting_thd) { + THD *aborting; + wsrep_aborting_thd_t next = wsrep_aborting_thd->next; + aborting = wsrep_aborting_thd->aborting_thd; + my_free(wsrep_aborting_thd); + wsrep_aborting_thd= next; + /* + * must release mutex, appliers may want to add more + * aborting thds in our work queue, while we rollback + */ + mysql_mutex_unlock(&LOCK_wsrep_rollback); + + mysql_mutex_lock(&aborting->LOCK_wsrep_thd); + if (aborting->wsrep_conflict_state== ABORTED) + { + WSREP_DEBUG("WSREP, thd already aborted: %llu state: %d", + (long long)aborting->real_id, + aborting->wsrep_conflict_state); + + mysql_mutex_unlock(&aborting->LOCK_wsrep_thd); + mysql_mutex_lock(&LOCK_wsrep_rollback); + continue; + } + aborting->wsrep_conflict_state= ABORTING; + + mysql_mutex_unlock(&aborting->LOCK_wsrep_thd); + + aborting->store_globals(); + + mysql_mutex_lock(&aborting->LOCK_wsrep_thd); + wsrep_client_rollback(aborting); + WSREP_DEBUG("WSREP rollbacker aborted thd: (%lu %llu)", + aborting->thread_id, (long long)aborting->real_id); + mysql_mutex_unlock(&aborting->LOCK_wsrep_thd); + + mysql_mutex_lock(&LOCK_wsrep_rollback); + } + } + + mysql_mutex_unlock(&LOCK_wsrep_rollback); + sql_print_information("WSREP: rollbacker thread exiting"); + + DBUG_PRINT("wsrep",("wsrep rollbacker thread exiting")); + DBUG_VOID_RETURN; +} + +void wsrep_create_rollbacker() +{ + if (wsrep_provider && strcasecmp(wsrep_provider, "none")) + { + pthread_t hThread; + /* create rollbacker */ + if (pthread_create( &hThread, &connection_attrib, + start_wsrep_THD, (void*)wsrep_rollback_process)) + WSREP_WARN("Can't create thread to manage wsrep rollback"); + } +} + +extern "C" +int wsrep_thd_is_brute_force(void *thd_ptr) +{ + /* + Brute force: + Appliers and replaying are running in REPL_RECV mode. TOI statements + in TOTAL_ORDER mode. 
Locally committing transaction that has got + past wsrep->pre_commit() without error is running in LOCAL_COMMIT mode. + + Everything else is running in LOCAL_STATE and should not be considered + brute force. + */ + if (thd_ptr) { + switch (((THD *)thd_ptr)->wsrep_exec_mode) { + case LOCAL_STATE: return 0; + case REPL_RECV: return 1; + case TOTAL_ORDER: return 2; + case LOCAL_COMMIT: return 3; + } + } + DBUG_ASSERT(0); + return 0; +} + +extern "C" +int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr, my_bool signal) +{ + THD *victim_thd = (THD *) victim_thd_ptr; + THD *bf_thd = (THD *) bf_thd_ptr; + DBUG_ENTER("wsrep_abort_thd"); + + if ( (WSREP(bf_thd) || + ( (WSREP_ON || wsrep_OSU_method_options == WSREP_OSU_RSU) && + bf_thd->wsrep_exec_mode == TOTAL_ORDER) ) && + victim_thd) + { + WSREP_DEBUG("wsrep_abort_thd, by: %llu, victim: %llu", (bf_thd) ? + (long long)bf_thd->real_id : 0, (long long)victim_thd->real_id); + ha_wsrep_abort_transaction(bf_thd, victim_thd, signal); + } + else + { + WSREP_DEBUG("wsrep_abort_thd not effective: %p %p", bf_thd, victim_thd); + } + + DBUG_RETURN(1); +} + +extern "C" +int wsrep_thd_in_locking_session(void *thd_ptr) +{ + if (thd_ptr && ((THD *)thd_ptr)->in_lock_tables) { + return 1; + } + return 0; +} + diff --git a/sql/wsrep_thd.h b/sql/wsrep_thd.h new file mode 100644 index 00000000000..bded13b5684 --- /dev/null +++ b/sql/wsrep_thd.h @@ -0,0 +1,32 @@ +/* Copyright (C) 2013 Codership Oy <info@codership.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. 
+ + You should have received a copy of the GNU General Public License along + with this program; if not, write to the Free Software Foundation, Inc., + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ + +#ifndef WSREP_THD_H +#define WSREP_THD_H + +#include "sql_class.h" + +int wsrep_show_bf_aborts (THD *thd, SHOW_VAR *var, char *buff); +void wsrep_client_rollback(THD *thd); +void wsrep_replay_transaction(THD *thd); +void wsrep_create_appliers(long threads); +void wsrep_create_rollbacker(); + +extern "C" int wsrep_thd_is_brute_force(void *thd_ptr); +extern "C" int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr, + my_bool signal); +extern "C" int wsrep_thd_in_locking_session(void *thd_ptr); + +#endif /* WSREP_THD_H */ diff --git a/sql/wsrep_utils.cc b/sql/wsrep_utils.cc index 53d0f7c449e..90af2fb8156 100644 --- a/sql/wsrep_utils.cc +++ b/sql/wsrep_utils.cc @@ -14,20 +14,25 @@ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ -//! @file declares symbols private to wsrep integration layer +//! 
@file some utility functions and classes not directly related to replication #ifndef _GNU_SOURCE #define _GNU_SOURCE // POSIX_SPAWN_USEVFORK flag #endif +#include "wsrep_utils.h" +#include "wsrep_mysqld.h" + +#include <sql_class.h> + #include <spawn.h> // posix_spawn() #include <unistd.h> // pipe() #include <errno.h> // errno #include <string.h> // strerror() #include <sys/wait.h> // waitpid() - -#include <sql_class.h> -#include "wsrep_priv.h" +#include <sys/types.h> +#include <sys/socket.h> +#include <netdb.h> // getaddrinfo() extern char** environ; // environment variables @@ -313,23 +318,69 @@ thd::~thd () } // namespace wsp -extern ulong my_bind_addr; -extern uint mysqld_port; +/* Returns INADDR_NONE, INADDR_ANY, INADDR_LOOPBACK or something else */ +unsigned int wsrep_check_ip (const char* const addr) +{ + unsigned int ret = INADDR_NONE; + struct addrinfo *res, hints; + + memset (&hints, 0, sizeof(hints)); + hints.ai_flags= AI_PASSIVE/*|AI_ADDRCONFIG*/; + hints.ai_socktype= SOCK_STREAM; + hints.ai_family= AF_UNSPEC; -size_t guess_ip (char* buf, size_t buf_len) + int gai_ret = getaddrinfo(addr, NULL, &hints, &res); + if (0 == gai_ret) + { + if (AF_INET == res->ai_family) /* IPv4 */ + { + struct sockaddr_in* a= (struct sockaddr_in*)res->ai_addr; + ret= htonl(a->sin_addr.s_addr); + } + else /* IPv6 */ + { + struct sockaddr_in6* a= (struct sockaddr_in6*)res->ai_addr; + if (IN6_IS_ADDR_UNSPECIFIED(&a->sin6_addr)) + ret= INADDR_ANY; + else if (IN6_IS_ADDR_LOOPBACK(&a->sin6_addr)) + ret= INADDR_LOOPBACK; + else + ret= 0xdeadbeef; + } + freeaddrinfo (res); + } + else { + WSREP_ERROR ("getaddrinfo() failed on '%s': %d (%s)", + addr, gai_ret, gai_strerror(gai_ret)); + } + + // uint8_t* b= (uint8_t*)&ret; + // fprintf (stderr, "########## wsrep_check_ip returning: %hhu.%hhu.%hhu.%hhu\n", + // b[0], b[1], b[2], b[3]); + + return ret; +} + +extern const char* my_bind_addr_str; +extern uint mysqld_port; + +size_t wsrep_guess_ip (char* buf, size_t buf_len) { size_t ip_len = 
0; - if (htonl(INADDR_NONE) == my_bind_addr) { - WSREP_ERROR("Networking not configured, cannot receive state transfer."); - return 0; - } + if (my_bind_addr_str && strlen(my_bind_addr_str)) + { + unsigned int const ip_type= wsrep_check_ip(my_bind_addr_str); - if (htonl(INADDR_ANY) != my_bind_addr) { - uint8_t* b = (uint8_t*)&my_bind_addr; - ip_len = snprintf (buf, buf_len, - "%hhu.%hhu.%hhu.%hhu", b[0],b[1],b[2],b[3]); - return ip_len; + if (INADDR_NONE == ip_type) { + WSREP_ERROR("Networking not configured, cannot receive state transfer."); + return 0; + } + + if (INADDR_ANY != ip_type) {; + strncpy (buf, my_bind_addr_str, buf_len); + return strlen(buf); + } } // mysqld binds to all interfaces - try IP from wsrep_node_address @@ -400,9 +451,9 @@ size_t guess_ip (char* buf, size_t buf_len) return ip_len; } -size_t guess_address(char* buf, size_t buf_len) +size_t wsrep_guess_address(char* buf, size_t buf_len) { - size_t addr_len = guess_ip (buf, buf_len); + size_t addr_len = wsrep_guess_ip (buf, buf_len); if (addr_len && addr_len < buf_len) { addr_len += snprintf (buf + addr_len, buf_len - addr_len, diff --git a/sql/wsrep_utils.h b/sql/wsrep_utils.h new file mode 100644 index 00000000000..337678238f8 --- /dev/null +++ b/sql/wsrep_utils.h @@ -0,0 +1,208 @@ +/* Copyright (C) 2013 Codership Oy <info@codership.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along + with this program; if not, write to the Free Software Foundation, Inc., + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 
*/ + +#ifndef WSREP_UTILS_H +#define WSREP_UTILS_H + +#include "wsrep_priv.h" + +unsigned int wsrep_check_ip (const char* addr); +size_t wsrep_guess_ip (char* buf, size_t buf_len); +size_t wsrep_guess_address(char* buf, size_t buf_len); + +namespace wsp { +class node_status +{ +public: + node_status() : status(WSREP_MEMBER_UNDEFINED) {} + void set(wsrep_member_status_t new_status, + const wsrep_view_info_t* view = 0) + { + if (status != new_status || 0 != view) + { + wsrep_notify_status(new_status, view); + status = new_status; + } + } + wsrep_member_status_t get() const { return status; } +private: + wsrep_member_status_t status; +}; +} /* namespace wsp */ + +extern wsp::node_status local_status; + +namespace wsp { +/* A small class to run external programs. */ +class process +{ +private: + const char* const str_; + FILE* io_; + int err_; + pid_t pid_; + +public: +/*! @arg type is a pointer to a null-terminated string which must contain + either the letter 'r' for reading or the letter 'w' for writing. 
+ */ + process (const char* cmd, const char* type); + ~process (); + + FILE* pipe () { return io_; } + int error() { return err_; } + int wait (); + const char* cmd() { return str_; } +}; + +class thd +{ + class thd_init + { + public: + thd_init() { my_thread_init(); } + ~thd_init() { my_thread_end(); } + } + init; + + thd (const thd&); + thd& operator= (const thd&); + +public: + + thd(my_bool wsrep_on); + ~thd(); + THD* const ptr; +}; + +class string +{ +public: + string() : string_(0) {} + void set(char* str) { if (string_) free (string_); string_ = str; } + ~string() { set (0); } +private: + char* string_; +}; + +#ifdef REMOVED +class lock +{ + pthread_mutex_t* const mtx_; + +public: + + lock (pthread_mutex_t* mtx) : mtx_(mtx) + { + int err = pthread_mutex_lock (mtx_); + + if (err) + { + WSREP_ERROR("Mutex lock failed: %s", strerror(err)); + abort(); + } + } + + virtual ~lock () + { + int err = pthread_mutex_unlock (mtx_); + + if (err) + { + WSREP_ERROR("Mutex unlock failed: %s", strerror(err)); + abort(); + } + } + + inline void wait (pthread_cond_t* cond) + { + pthread_cond_wait (cond, mtx_); + } + +private: + + lock (const lock&); + lock& operator=(const lock&); + +}; + +class monitor +{ + int mutable refcnt; + pthread_mutex_t mutable mtx; + pthread_cond_t mutable cond; + +public: + + monitor() : refcnt(0) + { + pthread_mutex_init (&mtx, NULL); + pthread_cond_init (&cond, NULL); + } + + ~monitor() + { + pthread_mutex_destroy (&mtx); + pthread_cond_destroy (&cond); + } + + void enter() const + { + lock l(&mtx); + + while (refcnt) + { + l.wait(&cond); + } + refcnt++; + } + + void leave() const + { + lock l(&mtx); + + refcnt--; + if (refcnt == 0) + { + pthread_cond_signal (&cond); + } + } + +private: + + monitor (const monitor&); + monitor& operator= (const monitor&); +}; + +class critical +{ + const monitor& mon; + +public: + + critical(const monitor& m) : mon(m) { mon.enter(); } + + ~critical() { mon.leave(); } + +private: + + critical (const critical&); + 
critical& operator= (const critical&); +}; +#endif + +} // namespace wsp + +#endif /* WSREP_UTILS_H */ diff --git a/sql/wsrep_var.cc b/sql/wsrep_var.cc index 5dc9a475e0d..81d436f6116 100644 --- a/sql/wsrep_var.cc +++ b/sql/wsrep_var.cc @@ -13,12 +13,15 @@ along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ +#include "wsrep_var.h" + #include <mysqld.h> #include <sql_class.h> #include <sql_plugin.h> #include <set_var.h> #include <sql_acl.h> #include "wsrep_priv.h" +#include "wsrep_thd.h" #include <my_dir.h> #include <cstdio> #include <cstdlib> @@ -34,8 +37,7 @@ const char* wsrep_node_name = 0; const char* wsrep_node_address = 0; const char* wsrep_node_incoming_address = 0; const char* wsrep_start_position = 0; -ulong wsrep_OSU_method_options; -static int wsrep_thread_change = 0; +ulong wsrep_OSU_method_options; int wsrep_init_vars() { @@ -58,15 +60,6 @@ bool wsrep_on_update (sys_var *self, THD* thd, enum_var_type var_type) // FIXME: this variable probably should be changed only per session thd->variables.wsrep_on = global_system_variables.wsrep_on; } - else { - } - -#ifdef REMOVED - if (thd->variables.wsrep_on) - thd->variables.option_bits |= (OPTION_BIN_LOG); - else - thd->variables.option_bits &= ~(OPTION_BIN_LOG); -#endif return false; } @@ -75,8 +68,6 @@ void wsrep_causal_reads_update (sys_var *self, THD* thd, enum_var_type var_type) if (var_type == OPT_GLOBAL) { thd->variables.wsrep_causal_reads = global_system_variables.wsrep_causal_reads; } - else { - } } static int wsrep_start_position_verify (const char* start_str) @@ -146,7 +137,7 @@ bool wsrep_start_position_update (sys_var *self, THD* thd, enum_var_type type) wsrep_set_local_position (wsrep_start_position); if (wsrep) { - wsrep->sst_received (wsrep, &local_uuid, local_seqno, NULL, 0); + wsrep_sst_received (wsrep, &local_uuid, local_seqno, NULL, 0); } return 0; @@ -157,7 +148,7 @@ void wsrep_start_position_init (const 
char* val) if (NULL == val || wsrep_start_position_verify (val)) { WSREP_ERROR("Bad initial value for wsrep_start_position: %s", - (val ? val : "")); + (val ? val : "")); return; } @@ -173,7 +164,7 @@ static bool refresh_provider_options() { if (wsrep_provider_options) my_free((void *)wsrep_provider_options); wsrep_provider_options = (char*)my_memdup(opts, strlen(opts) + 1, - MYF(MY_WME)); + MYF(MY_WME)); } else { @@ -453,7 +444,7 @@ void wsrep_node_address_init (const char* value) bool wsrep_slave_threads_check (sys_var *self, THD* thd, set_var* var) { mysql_mutex_lock(&LOCK_wsrep_slave_threads); - wsrep_thread_change = var->value->val_int() - wsrep_slave_threads; + wsrep_slave_count_change = var->value->val_int() - wsrep_slave_threads; mysql_mutex_unlock(&LOCK_wsrep_slave_threads); return 0; @@ -461,13 +452,10 @@ bool wsrep_slave_threads_check (sys_var *self, THD* thd, set_var* var) bool wsrep_slave_threads_update (sys_var *self, THD* thd, enum_var_type type) { - if (wsrep_thread_change > 0) - { - wsrep_create_appliers(wsrep_thread_change); - } - else if (wsrep_thread_change < 0) + if (wsrep_slave_count_change > 0) { - wsrep_close_applier_threads(-wsrep_thread_change); + wsrep_create_appliers(wsrep_slave_count_change); + wsrep_slave_count_change = 0; } return false; } diff --git a/sql/wsrep_var.h b/sql/wsrep_var.h new file mode 100644 index 00000000000..b69f670a14b --- /dev/null +++ b/sql/wsrep_var.h @@ -0,0 +1,83 @@ +/* Copyright (C) 2013 Codership Oy <info@codership.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. 
+ + You should have received a copy of the GNU General Public License along + with this program; if not, write to the Free Software Foundation, Inc., + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ + +#ifndef WSREP_VAR_H +#define WSREP_VAR_H + +#define WSREP_NODE_INCOMING_AUTO "AUTO" + +// MySQL variables funcs + +#include "sql_priv.h" +class sys_var; +class set_var; +class THD; + +int wsrep_init_vars(); + +#define CHECK_ARGS (sys_var *self, THD* thd, set_var *var) +#define UPDATE_ARGS (sys_var *self, THD* thd, enum_var_type type) +#define DEFAULT_ARGS (THD* thd, enum_var_type var_type) +#define INIT_ARGS (const char* opt) + +extern bool wsrep_on_update UPDATE_ARGS; +extern void wsrep_causal_reads_update UPDATE_ARGS; +extern bool wsrep_start_position_check CHECK_ARGS; +extern bool wsrep_start_position_update UPDATE_ARGS; +extern void wsrep_start_position_init INIT_ARGS; + +extern bool wsrep_provider_check CHECK_ARGS; +extern bool wsrep_provider_update UPDATE_ARGS; +extern void wsrep_provider_init INIT_ARGS; + +extern bool wsrep_provider_options_check CHECK_ARGS; +extern bool wsrep_provider_options_update UPDATE_ARGS; +extern void wsrep_provider_options_init INIT_ARGS; + +extern bool wsrep_cluster_address_check CHECK_ARGS; +extern bool wsrep_cluster_address_update UPDATE_ARGS; +extern void wsrep_cluster_address_init INIT_ARGS; + +extern bool wsrep_cluster_name_check CHECK_ARGS; +extern bool wsrep_cluster_name_update UPDATE_ARGS; + +extern bool wsrep_node_name_check CHECK_ARGS; +extern bool wsrep_node_name_update UPDATE_ARGS; + +extern bool wsrep_node_address_check CHECK_ARGS; +extern bool wsrep_node_address_update UPDATE_ARGS; +extern void wsrep_node_address_init INIT_ARGS; + +extern bool wsrep_sst_method_check CHECK_ARGS; +extern bool wsrep_sst_method_update UPDATE_ARGS; +extern void wsrep_sst_method_init INIT_ARGS; + +extern bool wsrep_sst_receive_address_check CHECK_ARGS; +extern bool wsrep_sst_receive_address_update UPDATE_ARGS; + +extern bool 
wsrep_sst_auth_check CHECK_ARGS; +extern bool wsrep_sst_auth_update UPDATE_ARGS; +extern void wsrep_sst_auth_init INIT_ARGS; + +extern bool wsrep_sst_donor_check CHECK_ARGS; +extern bool wsrep_sst_donor_update UPDATE_ARGS; + +extern bool wsrep_slave_threads_check CHECK_ARGS; +extern bool wsrep_slave_threads_update UPDATE_ARGS; + +extern bool wsrep_desync_check CHECK_ARGS; +extern bool wsrep_desync_update UPDATE_ARGS; + +#endif /* WSREP_VAR_H */ |