diff options
author | Seppo Jaakola <seppo.jaakola@codership.com> | 2013-11-26 16:48:30 +0200 |
---|---|---|
committer | Seppo Jaakola <seppo.jaakola@codership.com> | 2013-11-26 16:48:30 +0200 |
commit | a2594e96f7c7fe762a8165916551ae96bcbb869f (patch) | |
tree | fa1e4eb2a6d6ef1ca8a039a2afe48a65bca6ab33 /sql | |
parent | 2b4183f10b54a5b3f8c848d897b3107859c23fa4 (diff) | |
download | mariadb-git-a2594e96f7c7fe762a8165916551ae96bcbb869f.tar.gz |
Merges from lp:codership-mysql/5.5 up to rev #3893, this changes to wsrep API #24
Diffstat (limited to 'sql')
-rw-r--r-- | sql/CMakeLists.txt | 2 | ||||
-rw-r--r-- | sql/events.cc | 2 | ||||
-rw-r--r-- | sql/log.cc | 20 | ||||
-rw-r--r-- | sql/log.h | 7 | ||||
-rw-r--r-- | sql/log_event.cc | 14 | ||||
-rw-r--r-- | sql/mysqld.cc | 4 | ||||
-rw-r--r-- | sql/sp.cc | 2 | ||||
-rw-r--r-- | sql/sql_class.cc | 40 | ||||
-rw-r--r-- | sql/sql_class.h | 11 | ||||
-rw-r--r-- | sql/sql_parse.cc | 2 | ||||
-rw-r--r-- | sql/sql_trigger.cc | 4 | ||||
-rw-r--r-- | sql/sys_vars.cc | 6 | ||||
-rw-r--r-- | sql/wsrep_binlog.h | 49 | ||||
-rw-r--r-- | sql/wsrep_hton.cc | 134 | ||||
-rw-r--r-- | sql/wsrep_mysqld.cc | 270 | ||||
-rw-r--r-- | sql/wsrep_mysqld.h | 39 | ||||
-rw-r--r-- | sql/wsrep_priv.h | 18 | ||||
-rw-r--r-- | sql/wsrep_sst.cc | 47 | ||||
-rw-r--r-- | sql/wsrep_thd.cc | 16 | ||||
-rw-r--r-- | sql/wsrep_utils.cc | 85 | ||||
-rw-r--r-- | sql/wsrep_var.cc | 25 |
21 files changed, 466 insertions, 331 deletions
diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt index 7766b82adff..fd654d01b1d 100644 --- a/sql/CMakeLists.txt +++ b/sql/CMakeLists.txt @@ -50,6 +50,8 @@ IF(WITH_WSREP) wsrep_sst.cc wsrep_utils.cc wsrep_var.cc + wsrep_binlog.cc + wsrep_applier.cc wsrep_thd.cc ) SET(WSREP_LIB wsrep) diff --git a/sql/events.cc b/sql/events.cc index 7df8d19644d..73ce894095c 100644 --- a/sql/events.cc +++ b/sql/events.cc @@ -1169,7 +1169,7 @@ end: } #ifdef WITH_WSREP -int wsrep_create_event_query(THD *thd, uchar** buf, uint* buf_len) +int wsrep_create_event_query(THD *thd, uchar** buf, int* buf_len) { String log_query; diff --git a/sql/log.cc b/sql/log.cc index 1c51d50a2ca..6dd4aa7038b 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -596,7 +596,7 @@ void thd_binlog_rollback_stmt(THD * thd) with the exception that here we write in buffer instead of log file. */ -int wsrep_write_cache(IO_CACHE *cache, uchar **buf, uint *buf_len) +int wsrep_write_cache(IO_CACHE *cache, uchar **buf, int *buf_len) { if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0)) @@ -604,7 +604,7 @@ int wsrep_write_cache(IO_CACHE *cache, uchar **buf, uint *buf_len) uint length= my_b_bytes_in_cache(cache); long long total_length = 0; uchar *buf_ptr = NULL; - + do { /* bail out if buffer grows too large @@ -614,7 +614,7 @@ int wsrep_write_cache(IO_CACHE *cache, uchar **buf, uint *buf_len) if (total_length > wsrep_max_ws_size) { WSREP_WARN("transaction size limit (%lld) exceeded: %lld", - wsrep_max_ws_size, total_length); + wsrep_max_ws_size, total_length); if (reinit_io_cache(cache, WRITE_CACHE, 0, 0, 0)) { WSREP_WARN("failed to initialize io-cache"); @@ -6017,21 +6017,27 @@ err: if (WSREP(thd) && wsrep_incremental_data_collection && (wsrep_emulate_bin_log || mysql_bin_log.is_open())) { - DBUG_ASSERT(thd->wsrep_trx_handle.trx_id != (unsigned long)-1); + DBUG_ASSERT(thd->wsrep_ws_handle.trx_id != (unsigned long)-1); if (!error) { IO_CACHE* cache= get_trans_log(thd); uchar* buf= NULL; - uint buf_len= 0; + int buf_len= 0; if (wsrep_emulate_bin_log) thd->binlog_flush_pending_rows_event(false); error= wsrep_write_cache(cache, &buf, &buf_len); if (!error && buf_len > 0) { + const struct wsrep_buf buff = { buf, buf_len }; + + const bool nocopy(false); + const bool unordered(false); + wsrep_status_t rc= wsrep->append_data(wsrep, - &thd->wsrep_trx_handle, - buf, buf_len); + &thd->wsrep_ws_handle, + &buff, 1, WSREP_DATA_ORDERED, + true); if (rc != WSREP_OK) { sql_print_warning("WSREP: append_data() returned %d", rc); diff --git a/sql/log.h b/sql/log.h index 964d75916a9..487005913c9 100644 --- a/sql/log.h +++ b/sql/log.h @@ -287,12 +287,6 @@ enum enum_log_state { LOG_OPENED, LOG_CLOSED, LOG_TO_BE_OPENED }; (mmap+fsync is two times faster than write+fsync) */ -#ifdef WITH_WSREP -extern my_bool wsrep_emulate_bin_log; -Log_event* wsrep_read_log_event( - char **arg_buf, size_t *arg_buf_len, - const Format_description_log_event *description_event); -#endif class MYSQL_LOG { public: @@ -974,7 +968,6 @@ bool wsrep_trans_cache_is_empty(THD *thd); void thd_binlog_flush_pending_rows_event(THD *thd, bool stmt_end); void thd_binlog_trx_reset(THD * thd); void thd_binlog_rollback_stmt(THD * thd); -int wsrep_write_cache(IO_CACHE *cache, uchar **buf, uint *buf_len); #define WSREP_FORMAT(my_format) \ ((wsrep_forced_binlog_format != BINLOG_FORMAT_UNSPEC) ? \ diff --git a/sql/log_event.cc b/sql/log_event.cc index a6b27caf631..450a80bc9b2 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -9211,7 +9211,7 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) thd->is_fatal_error, thd->wsrep_exec_mode, thd->wsrep_conflict_state, - (long long)thd->wsrep_trx_seqno); + (long long)wsrep_thd_trx_seqno(thd)); } #endif if (thd->is_slave_error || thd->is_fatal_error) @@ -10976,7 +10976,7 @@ Write_rows_log_event::do_exec_row(const Relay_log_info *const rli) char info[64]; info[sizeof(info) - 1] = '\0'; snprintf(info, sizeof(info) - 1, "Write_rows_log_event::write_row(%lld)", - (long long) thd->wsrep_trx_seqno); + (long long) wsrep_thd_trx_seqno(thd)); const char* tmp = (WSREP(thd)) ? thd_proc_info(thd, info) : NULL; #else const char* tmp = (WSREP(thd)) ? @@ -11659,7 +11659,7 @@ int Delete_rows_log_event::do_exec_row(const Relay_log_info *const rli) char info[64]; info[sizeof(info) - 1] = '\0'; snprintf(info, sizeof(info) - 1, "Delete_rows_log_event::find_row(%lld)", - (long long) thd->wsrep_trx_seqno); + (long long) wsrep_thd_trx_seqno(thd)); const char* tmp = (WSREP(thd)) ? thd_proc_info(thd, info) : NULL; #else const char* tmp = (WSREP(thd)) ? @@ -11675,7 +11675,7 @@ int Delete_rows_log_event::do_exec_row(const Relay_log_info *const rli) #ifdef WSREP_PROC_INFO snprintf(info, sizeof(info) - 1, "Delete_rows_log_event::ha_delete_row(%lld)", - (long long) thd->wsrep_trx_seqno); + (long long) wsrep_thd_trx_seqno(thd)); if (WSREP(thd)) thd_proc_info(thd, info); #else if (WSREP(thd)) thd_proc_info(thd,"Delete_rows_log_event::ha_delete_row()"); @@ -11809,7 +11809,7 @@ Update_rows_log_event::do_exec_row(const Relay_log_info *const rli) char info[64]; info[sizeof(info) - 1] = '\0'; snprintf(info, sizeof(info) - 1, "Update_rows_log_event::find_row(%lld)", - (long long) thd->wsrep_trx_seqno); + (long long) wsrep_thd_trx_seqno(thd)); const char* tmp = (WSREP(thd)) ? thd_proc_info(thd, info) : NULL; #else const char* tmp = (WSREP(thd)) ? @@ -11846,7 +11846,7 @@ Update_rows_log_event::do_exec_row(const Relay_log_info *const rli) #ifdef WSREP_PROC_INFO snprintf(info, sizeof(info) - 1, "Update_rows_log_event::unpack_current_row(%lld)", - (long long) thd->wsrep_trx_seqno); + (long long) wsrep_thd_trx_seqno(thd)); if (WSREP(thd)) thd_proc_info(thd, info); #else if (WSREP(thd)) @@ -11875,7 +11875,7 @@ Update_rows_log_event::do_exec_row(const Relay_log_info *const rli) #ifdef WSREP_PROC_INFO snprintf(info, sizeof(info) - 1, "Update_rows_log_event::ha_update_row(%lld)", - (long long) thd->wsrep_trx_seqno); + (long long) wsrep_thd_trx_seqno(thd)); if (WSREP(thd)) thd_proc_info(thd, info); #else if (WSREP(thd)) thd_proc_info(thd,"Update_rows_log_event::ha_update_row()"); diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 1c9560ceb9f..a10dfa1015c 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -359,7 +359,11 @@ static char *default_character_set_name; static char *character_set_filesystem_name; static char *lc_messages; static char *lc_time_names_name; +#ifndef WITH_WSREP static char *my_bind_addr_str; +#else +char *my_bind_addr_str; +#endif /* WITH_WSREP */ static char *default_collation_name; char *default_storage_engine; static char compiled_default_collation_name[]= MYSQL_DEFAULT_COLLATION_NAME; diff --git a/sql/sp.cc b/sql/sp.cc index 73f59277cf0..ca8e41282cf 100644 --- a/sql/sp.cc +++ b/sql/sp.cc @@ -2255,7 +2255,7 @@ sp_load_for_information_schema(THD *thd, TABLE *proc_table, String *db, return sp; } #ifdef WITH_WSREP -int wsrep_create_sp(THD *thd, uchar** buf, uint* buf_len) +int wsrep_create_sp(THD *thd, uchar** buf, int* buf_len) { String log_query; sp_head *sp = thd->lex->sphead; diff --git a/sql/sql_class.cc b/sql/sql_class.cc index a0c24ffb3ca..49fb88f5866 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -869,9 +869,9 @@ extern "C" const char *wsrep_thd_conflict_state_str(THD *thd) (thd->wsrep_conflict_state == CERT_FAILURE) ? "cert failure" : "void"; } -extern "C" wsrep_trx_handle_t* wsrep_thd_trx_handle(THD *thd) +extern "C" wsrep_ws_handle_t* wsrep_thd_ws_handle(THD *thd) { - return &thd->wsrep_trx_handle; + return &thd->wsrep_ws_handle; } extern "C" void wsrep_thd_LOCK(THD *thd) @@ -896,7 +896,7 @@ extern "C" my_thread_id wsrep_thd_thread_id(THD *thd) } extern "C" wsrep_seqno_t wsrep_thd_trx_seqno(THD *thd) { - return (thd) ? thd->wsrep_trx_seqno : -1; + return (thd) ? thd->wsrep_trx_meta.gtid.seqno : -1; } extern "C" query_id_t wsrep_thd_query_id(THD *thd) { @@ -918,7 +918,6 @@ extern "C" void wsrep_thd_awake(THD* bf_thd, THD *thd, my_bool signal) { if (signal) { - thd->wsrep_bf_thd = bf_thd; mysql_mutex_lock(&thd->LOCK_thd_data); thd->awake(KILL_QUERY); mysql_mutex_unlock(&thd->LOCK_thd_data); @@ -934,16 +933,16 @@ extern "C" void wsrep_thd_awake(THD* bf_thd, THD *thd, my_bool signal) extern int wsrep_trx_order_before(void *thd1, void *thd2) { - if (((THD*)thd1)->wsrep_trx_seqno < ((THD*)thd2)->wsrep_trx_seqno) { - WSREP_DEBUG("BF conflict, order: %lld %lld\n", - (long long)((THD*)thd1)->wsrep_trx_seqno, - (long long)((THD*)thd2)->wsrep_trx_seqno); - return 1; - } - WSREP_DEBUG("waiting for BF, trx order: %lld %lld\n", - (long long)((THD*)thd1)->wsrep_trx_seqno, - (long long)((THD*)thd2)->wsrep_trx_seqno); - return 0; + if (wsrep_thd_trx_seqno((THD*)thd1) < wsrep_thd_trx_seqno((THD*)thd2)) { + WSREP_DEBUG("BF conflict, order: %lld %lld\n", + (long long)wsrep_thd_trx_seqno((THD*)thd1), + (long long)wsrep_thd_trx_seqno((THD*)thd2)); + return 1; + } + WSREP_DEBUG("waiting for BF, trx order: %lld %lld\n", + (long long)wsrep_thd_trx_seqno((THD*)thd1), + (long long)wsrep_thd_trx_seqno((THD*)thd2)); + return 0; } extern "C" int wsrep_trx_is_aborting(void *thd_ptr) @@ -1142,9 +1141,8 @@ THD::THD() #ifdef WITH_WSREP mysql_mutex_init(key_LOCK_wsrep_thd, &LOCK_wsrep_thd, MY_MUTEX_INIT_FAST); mysql_cond_init(key_COND_wsrep_thd, &COND_wsrep_thd, NULL); - wsrep_trx_handle.trx_id = WSREP_UNDEFINED_TRX_ID; - wsrep_trx_handle.opaque = NULL; - //wsrep_retry_autocommit= ::wsrep_retry_autocommit; + wsrep_ws_handle.trx_id = WSREP_UNDEFINED_TRX_ID; + wsrep_ws_handle.opaque = NULL; wsrep_retry_counter = 0; wsrep_PA_safe = true; wsrep_seqno_changed = false; @@ -1154,7 +1152,6 @@ THD::THD() wsrep_consistency_check = NO_CONSISTENCY_CHECK; wsrep_status_vars = 0; wsrep_mysql_replicated = 0; - wsrep_bf_thd = NULL; wsrep_TOI_pre_query = NULL; wsrep_TOI_pre_query_len = 0; #endif @@ -1549,7 +1546,8 @@ void THD::init(void) wsrep_conflict_state= NO_CONFLICT; wsrep_query_state= QUERY_IDLE; wsrep_last_query_id= 0; - wsrep_trx_seqno= 0; + wsrep_trx_meta.gtid= WSREP_GTID_UNDEFINED; + wsrep_trx_meta.depends_on= WSREP_SEQNO_UNDEFINED; wsrep_converted_lock_session= false; wsrep_retry_counter= 0; wsrep_rli= NULL; @@ -1557,7 +1555,7 @@ void THD::init(void) wsrep_seqno_changed= false; wsrep_consistency_check = NO_CONSISTENCY_CHECK; wsrep_mysql_replicated = 0; - wsrep_bf_thd = NULL; + wsrep_TOI_pre_query = NULL; wsrep_TOI_pre_query_len = 0; #endif @@ -1941,7 +1939,7 @@ void THD::awake(killed_state state_to_set) /* Interrupt target waiting inside a storage engine. */ if (state_to_set != NOT_KILLED) #ifdef WITH_WSREP - if (!wsrep_bf_thd || wsrep_bf_thd->wsrep_exec_mode == LOCAL_STATE) + /* TODO: prevent applier close here */ #endif /* WITH_WSREP */ ha_kill_query(this, thd_kill_level(this)); diff --git a/sql/sql_class.h b/sql/sql_class.h index a68b936307a..81d1a3d4a83 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -2553,11 +2553,13 @@ public: enum wsrep_conflict_state wsrep_conflict_state; mysql_mutex_t LOCK_wsrep_thd; mysql_cond_t COND_wsrep_thd; - wsrep_seqno_t wsrep_trx_seqno; + // changed from wsrep_seqno_t to wsrep_trx_meta_t in wsrep API rev 75 + // wsrep_seqno_t wsrep_trx_seqno; + wsrep_trx_meta_t wsrep_trx_meta; uint32 wsrep_rand; Relay_log_info* wsrep_rli; bool wsrep_converted_lock_session; - wsrep_trx_handle_t wsrep_trx_handle; + wsrep_ws_handle_t wsrep_ws_handle; bool wsrep_seqno_changed; #ifdef WSREP_PROC_INFO char wsrep_info[128]; /* string for dynamic proc info */ @@ -2571,9 +2573,8 @@ public: wsrep_consistency_check; wsrep_stats_var* wsrep_status_vars; int wsrep_mysql_replicated; - THD* wsrep_bf_thd; - const char* wsrep_TOI_pre_query; /* a query to apply before - the actual TOI query */ + const char* wsrep_TOI_pre_query; /* a query to apply before + the actual TOI query */ size_t wsrep_TOI_pre_query_len; #endif /* WITH_WSREP */ /** diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index 8f208ee66b1..a9687432849 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -953,7 +953,6 @@ bool do_command(THD *thd) else if (thd->wsrep_conflict_state == ABORTED) { thd->store_globals(); - thd->wsrep_bf_thd = NULL; } thd->wsrep_query_state= QUERY_EXEC; @@ -1241,7 +1240,6 @@ bool dispatch_command(enum enum_server_command command, THD *thd, thd->mysys_var->abort = 0; thd->wsrep_conflict_state = NO_CONFLICT; thd->wsrep_retry_counter = 0; - thd->wsrep_bf_thd = NULL; /* Increment threads running to compensate dec_thread_running() called after dispatch_end label. diff --git a/sql/sql_trigger.cc b/sql/sql_trigger.cc index 135207c1681..939541f913e 100644 --- a/sql/sql_trigger.cc +++ b/sql/sql_trigger.cc @@ -2484,7 +2484,7 @@ bool load_table_name_for_trigger(THD *thd, DBUG_RETURN(FALSE); } #ifdef WITH_WSREP -int wsrep_create_trigger_query(THD *thd, uchar** buf, uint* buf_len) +int wsrep_create_trigger_query(THD *thd, uchar** buf, int* buf_len) { LEX *lex= thd->lex; String stmt_query; @@ -2532,6 +2532,6 @@ int wsrep_create_trigger_query(THD *thd, uchar** buf, uint* buf_len) stmt_query.append(stmt_definition.str, stmt_definition.length); return wsrep_to_buf_helper(thd, stmt_query.c_ptr(), stmt_query.length(), - buf, buf_len); + buf, buf_len); } #endif /* WITH_WSREP */ diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index e1b0dc50c65..37e5f3315e3 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -4065,6 +4065,7 @@ static Sys_var_tz Sys_time_zone( #ifdef WITH_WSREP #include "wsrep_var.h" #include "wsrep_sst.h" +#include "wsrep_binlog.h" static Sys_var_charptr Sys_wsrep_provider( "wsrep_provider", "Path to replication provider library", @@ -4222,10 +4223,11 @@ static Sys_var_charptr Sys_wsrep_start_position ( ON_CHECK(wsrep_start_position_check), ON_UPDATE(wsrep_start_position_update)); -static Sys_var_ulonglong Sys_wsrep_max_ws_size ( +static Sys_var_ulong Sys_wsrep_max_ws_size ( "wsrep_max_ws_size", "Max write set size (bytes)", GLOBAL_VAR(wsrep_max_ws_size), CMD_LINE(REQUIRED_ARG), - VALID_RANGE(1024, 4294967296ULL), DEFAULT(1073741824ULL), BLOCK_SIZE(1)); + /* Upper limit is 65K short of 4G to avoid overlows on 32-bit systems */ + VALID_RANGE(1024, WSREP_MAX_WS_SIZE), DEFAULT(1073741824UL), BLOCK_SIZE(1)); static Sys_var_ulong Sys_wsrep_max_ws_rows ( "wsrep_max_ws_rows", "Max number of rows in write set", diff --git a/sql/wsrep_binlog.h b/sql/wsrep_binlog.h new file mode 100644 index 00000000000..6de73b2f5ee --- /dev/null +++ b/sql/wsrep_binlog.h @@ -0,0 +1,49 @@ +/* Copyright (C) 2013 Codership Oy <info@codership.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along + with this program; if not, write to the Free Software Foundation, Inc., + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ + +#ifndef WSREP_BINLOG_H +#define WSREP_BINLOG_H + +#include "sql_class.h" // THD, IO_CACHE + +#define HEAP_PAGE_SIZE 65536 /* 64K */ +#define WSREP_MAX_WS_SIZE (0xFFFFFFFFUL - HEAP_PAGE_SIZE) + +/* + Write the contents of a cache to a memory buffer. + + This function quite the same as MYSQL_BIN_LOG::write_cache(), + with the exception that here we write in buffer instead of log file. + */ +int wsrep_write_cache_buf(IO_CACHE *cache, uchar **buf, size_t *buf_len); + +/* + Write the contents of a cache to wsrep provider. + + This function quite the same as MYSQL_BIN_LOG::write_cache(), + with the exception that here we write in buffer instead of log file. + + @param len total amount of data written + @return wsrep error status + */ +int wsrep_write_cache (wsrep_t* wsrep, + THD* thd, + IO_CACHE* cache, + size_t* len); + +/* Dump replication buffer to disk */ +void wsrep_dump_rbr_buf(THD *thd, const void* rbr_buf, size_t buf_len); + +#endif /* WSREP_BINLOG_H */ diff --git a/sql/wsrep_hton.cc b/sql/wsrep_hton.cc index 5ac4a5fad12..d43e5408f61 100644 --- a/sql/wsrep_hton.cc +++ b/sql/wsrep_hton.cc @@ -18,7 +18,7 @@ #include "rpl_filter.h" #include <sql_class.h> #include "wsrep_mysqld.h" -#include "wsrep_priv.h" +#include "wsrep_binlog.h" #include <cstdio> #include <cstdlib> @@ -26,10 +26,11 @@ extern handlerton *binlog_hton; extern int binlog_close_connection(handlerton *hton, THD *thd); extern ulonglong thd_to_trx_id(THD *thd); -extern "C" int thd_binlog_format(const MYSQL_THD thd); -// todo: share interface with ha_innodb.c +extern "C" int thd_binlog_format(const MYSQL_THD thd); +// todo: share interface with ha_innodb.c -enum wsrep_trx_status wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all); +enum wsrep_trx_status wsrep_run_wsrep_commit(THD *thd, handlerton *hton, + bool all); /* a post-commit cleanup on behalf of wsrep. Can't be a part of hton struct. @@ -45,7 +46,7 @@ void wsrep_cleanup_transaction(THD *thd) { if (thd->wsrep_seqno_changed) { - if (wsrep->post_commit(wsrep, &thd->wsrep_trx_handle)) + if (wsrep->post_commit(wsrep, &thd->wsrep_ws_handle)) { DBUG_PRINT("wsrep", ("set committed fail")); WSREP_WARN("set committed fail: %llu %d", @@ -59,7 +60,7 @@ void wsrep_cleanup_transaction(THD *thd) } thd->wsrep_exec_mode= LOCAL_STATE; } - thd->wsrep_trx_handle.trx_id = WSREP_UNDEFINED_TRX_ID; + thd->wsrep_ws_handle.trx_id = WSREP_UNDEFINED_TRX_ID; } /* @@ -87,7 +88,7 @@ void wsrep_register_hton(THD* thd, bool all) } /* - wsrep exploits binlog's caches even if binlogging itself is not + wsrep exploits binlog's caches even if binlogging itself is not activated. In such case connection close needs calling actual binlog's method. Todo: split binlog hton from its caches to use ones by wsrep @@ -100,7 +101,7 @@ wsrep_close_connection(handlerton* hton, THD* thd) if (thd_get_ha_data(thd, binlog_hton) != NULL) binlog_hton->close_connection (binlog_hton, thd); DBUG_RETURN(0); -} +} /* prepare/wsrep_run_wsrep_commit can fail in two ways @@ -117,7 +118,7 @@ static int wsrep_prepare(handlerton *hton, THD *thd, bool all) //wsrep_seqno_t old = thd->wsrep_trx_seqno; #endif DBUG_ENTER("wsrep_prepare"); - if ((all || + if ((all || !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) && (thd->variables.wsrep_on && !wsrep_trans_cache_is_empty(thd))) { @@ -156,16 +157,16 @@ static int wsrep_rollback(handlerton *hton, THD *thd, bool all) if ((all || !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) && (thd->variables.wsrep_on && thd->wsrep_conflict_state != MUST_REPLAY)) { - if (wsrep->post_rollback(wsrep, &thd->wsrep_trx_handle)) + if (wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle)) { DBUG_PRINT("wsrep", ("setting rollback fail")); - WSREP_ERROR("settting rollback fail: thd: %llu SQL: %s", + WSREP_ERROR("settting rollback fail: thd: %llu SQL: %s", (long long)thd->real_id, thd->query()); } } int rcode = 0; - if (!wsrep_emulate_bin_log) + if (!wsrep_emulate_bin_log) { if (all) thd_binlog_trx_reset(thd); } @@ -183,14 +184,12 @@ int wsrep_commit(handlerton *hton, THD *thd, bool all) extern Rpl_filter* binlog_filter; extern my_bool opt_log_slave_updates; -extern void wsrep_write_rbr_buf(THD *thd, const void* rbr_buf, size_t buf_len); + enum wsrep_trx_status -wsrep_run_wsrep_commit( - THD *thd, handlerton *hton, bool all) +wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all) { - int rcode = -1; - uint data_len = 0; - uchar *rbr_data = NULL; + int rcode= -1; + int data_len = 0; IO_CACHE *cache; int replay_round= 0; @@ -200,9 +199,9 @@ wsrep_run_wsrep_commit( } DBUG_ENTER("wsrep_run_wsrep_commit"); - if (thd->slave_thread && !opt_log_slave_updates) { - DBUG_RETURN(WSREP_TRX_OK); - } + + if (thd->slave_thread && !opt_log_slave_updates) DBUG_RETURN(WSREP_TRX_OK); + if (thd->wsrep_exec_mode == REPL_RECV) { mysql_mutex_lock(&thd->LOCK_wsrep_thd); @@ -220,9 +219,9 @@ wsrep_run_wsrep_commit( } mysql_mutex_unlock(&thd->LOCK_wsrep_thd); } - if (thd->wsrep_exec_mode != LOCAL_STATE) { - DBUG_RETURN(WSREP_TRX_OK); - } + + if (thd->wsrep_exec_mode != LOCAL_STATE) DBUG_RETURN(WSREP_TRX_OK); + if (thd->wsrep_consistency_check == CONSISTENCY_CHECK_RUNNING) { WSREP_DEBUG("commit for consistency check: %s", thd->query()); DBUG_RETURN(WSREP_TRX_OK); @@ -244,10 +243,9 @@ wsrep_run_wsrep_commit( mysql_mutex_lock(&LOCK_wsrep_replaying); - while (wsrep_replaying > 0 && + while (wsrep_replaying > 0 && thd->wsrep_conflict_state == NO_CONFLICT && - thd->killed == NOT_KILLED && - !shutdown_in_progress) + !shutdown_in_progress) { mysql_mutex_unlock(&LOCK_wsrep_replaying); @@ -265,9 +263,12 @@ wsrep_run_wsrep_commit( struct timespec wtime = {0, 1000000}; mysql_cond_timedwait(&COND_wsrep_replaying, &LOCK_wsrep_replaying, &wtime); + if (replay_round++ % 100000 == 0) - WSREP_DEBUG("commit waiting for replaying: replayers %d, thd: (%lu) conflict: %d (round: %d)", - wsrep_replaying, thd->thread_id, thd->wsrep_conflict_state, replay_round); + WSREP_DEBUG("commit waiting for replaying: replayers %d, thd: (%lu) " + "conflict: %d (round: %d)", + wsrep_replaying, thd->thread_id, + thd->wsrep_conflict_state, replay_round); mysql_mutex_unlock(&LOCK_wsrep_replaying); @@ -288,7 +289,8 @@ wsrep_run_wsrep_commit( WSREP_DEBUG("innobase_commit abort after replaying wait %s", (thd->query()) ? thd->query() : "void"); DBUG_RETURN(WSREP_TRX_ROLLBACK); - } + } + thd->wsrep_query_state = QUERY_COMMITTING; mysql_mutex_unlock(&thd->LOCK_wsrep_thd); @@ -296,31 +298,31 @@ wsrep_run_wsrep_commit( rcode = 0; if (cache) { thd->binlog_flush_pending_rows_event(true); - rcode = wsrep_write_cache(cache, &rbr_data, &data_len); - if (rcode) { - WSREP_ERROR("rbr write fail, data_len: %d, %d", data_len, rcode); - if (data_len) my_free(rbr_data); + rcode = wsrep_write_cache(wsrep, thd, cache, (size_t*)&data_len); + if (WSREP_OK != rcode) { + WSREP_ERROR("rbr write fail, data_len: %zu, %d", data_len, rcode); DBUG_RETURN(WSREP_TRX_ROLLBACK); } } - if (data_len == 0) + + if (data_len == 0) { mysql_mutex_lock(&thd->LOCK_wsrep_thd); thd->wsrep_exec_mode = LOCAL_COMMIT; mysql_mutex_unlock(&thd->LOCK_wsrep_thd); - if (thd->get_stmt_da()->is_ok() && + if (thd->get_stmt_da()->is_ok() && thd->get_stmt_da()->affected_rows() > 0 && !binlog_filter->is_on()) { WSREP_DEBUG("empty rbr buffer, query: %s, " - "affected rows: %llu, " - "changed tables: %d, " + "affected rows: %llu, " + "changed tables: %d, " "sql_log_bin: %d, " "wsrep status (%d %d %d)", thd->query(), thd->get_stmt_da()->affected_rows(), stmt_has_updated_trans_table(thd), thd->variables.sql_log_bin, - thd->wsrep_exec_mode, thd->wsrep_query_state, - thd->wsrep_conflict_state); + thd->wsrep_exec_mode, thd->wsrep_query_state, + thd->wsrep_conflict_state); } else { @@ -329,33 +331,31 @@ wsrep_run_wsrep_commit( thd->wsrep_query_state= QUERY_EXEC; DBUG_RETURN(WSREP_TRX_OK); } - if (WSREP_UNDEFINED_TRX_ID == thd->wsrep_trx_handle.trx_id) + + if (WSREP_UNDEFINED_TRX_ID == thd->wsrep_ws_handle.trx_id) { - WSREP_WARN("SQL statement was ineffective, THD: %lu, buf: %d\n" - "QUERY: %s\n" - " => Skipping replication", - thd->thread_id, data_len, thd->query()); - if (wsrep_debug) - { - wsrep_write_rbr_buf(thd, rbr_data, data_len); - } + WSREP_WARN("SQL statement was ineffective, THD: %lu, buf: %zu\n" + "QUERY: %s\n" + " => Skipping replication", + thd->thread_id, data_len, thd->query()); rcode = WSREP_TRX_FAIL; } else if (!rcode) { - rcode = wsrep->pre_commit( - wsrep, - (wsrep_conn_id_t)thd->thread_id, - &thd->wsrep_trx_handle, - rbr_data, - data_len, - (thd->wsrep_PA_safe) ? WSREP_FLAG_PA_SAFE : 0ULL, - &thd->wsrep_trx_seqno); - switch (rcode) { + if (WSREP_OK == rcode) + rcode = wsrep->pre_commit(wsrep, + (wsrep_conn_id_t)thd->thread_id, + &thd->wsrep_ws_handle, + WSREP_FLAG_COMMIT | + ((thd->wsrep_PA_safe) ? + 0ULL : WSREP_FLAG_PA_UNSAFE), + &thd->wsrep_trx_meta); + + switch (rcode) + { case WSREP_TRX_MISSING: WSREP_WARN("Transaction missing in provider, thd: %ld, SQL: %s", thd->thread_id, thd->query()); - wsrep_write_rbr_buf(thd, rbr_data, data_len); rcode = WSREP_OK; break; case WSREP_BF_ABORT: @@ -375,15 +375,10 @@ wsrep_run_wsrep_commit( } else { WSREP_ERROR("I/O error reading from thd's binlog iocache: " "errno=%d, io cache code=%d", my_errno, cache->error); - if (data_len) my_free(rbr_data); DBUG_ASSERT(0); // failure like this can not normally happen DBUG_RETURN(WSREP_TRX_ERROR); } - if (data_len) { - my_free(rbr_data); - } - mysql_mutex_lock(&thd->LOCK_wsrep_thd); switch(rcode) { case 0: @@ -392,8 +387,8 @@ wsrep_run_wsrep_commit( if (thd->transaction.xid_state.xid.get_my_xid()) { wsrep_xid_init(&thd->transaction.xid_state.xid, - wsrep_cluster_uuid(), - thd->wsrep_trx_seqno); + &thd->wsrep_trx_meta.gtid.uuid, + thd->wsrep_trx_meta.gtid.seqno); } DBUG_PRINT("wsrep", ("replicating commit success")); @@ -407,7 +402,7 @@ wsrep_run_wsrep_commit( if (thd->wsrep_conflict_state == MUST_ABORT) { thd->wsrep_conflict_state= ABORTED; - } + } else { WSREP_DEBUG("conflict state: %d", thd->wsrep_conflict_state); @@ -465,14 +460,15 @@ mysql_declare_plugin(wsrep) &wsrep_storage_engine, "wsrep", "Codership Oy", - "A pseudo storage engine to represent transactions in multi-master synchornous replication", + "A pseudo storage engine to represent transactions in multi-master " + "synchornous replication", PLUGIN_LICENSE_GPL, wsrep_hton_init, /* Plugin Init */ NULL, /* Plugin Deinit */ 0x0100 /* 1.0 */, NULL, /* status variables */ NULL, /* system variables */ - NULL, /* config options */ + NULL, /* config options */ 0, /* flags */ } mysql_declare_plugin_end; diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc index af8b025cfd7..6cbbfc47ad3 100644 --- a/sql/wsrep_mysqld.cc +++ b/sql/wsrep_mysqld.cc @@ -21,6 +21,7 @@ #include "wsrep_sst.h" #include "wsrep_utils.h" #include "wsrep_var.h" +#include "wsrep_binlog.h" #include <cstdio> #include <cstdlib> #include "log_event.h" @@ -37,34 +38,32 @@ const char* wsrep_data_home_dir = NULL; const char* wsrep_dbug_option = ""; long wsrep_slave_threads = 1; // # of slave action appliers wanted +int wsrep_slave_count_change = 0; // # of appliers to stop or start my_bool wsrep_debug = 0; // enable debug level logging my_bool wsrep_convert_LOCK_to_trx = 1; // convert locking sessions to trx ulong wsrep_retry_autocommit = 5; // retry aborted autocommit trx my_bool wsrep_auto_increment_control = 1; // control auto increment variables my_bool wsrep_drupal_282555_workaround = 1; // retry autoinc insert after dupkey my_bool wsrep_incremental_data_collection = 0; // incremental data collection -long long wsrep_max_ws_size = 1073741824LL; //max ws (RBR buffer) size -long wsrep_max_ws_rows = 65536; // max number of rows in ws +ulong wsrep_max_ws_size = 1073741824UL;//max ws (RBR buffer) size +ulong wsrep_max_ws_rows = 65536; // max number of rows in ws int wsrep_to_isolation = 0; // # of active TO isolation threads my_bool wsrep_certify_nonPK = 1; // certify, even when no primary key long wsrep_max_protocol_version = 2; // maximum protocol version to use ulong wsrep_forced_binlog_format = BINLOG_FORMAT_UNSPEC; my_bool wsrep_recovery = 0; // recovery my_bool wsrep_replicate_myisam = 0; // enable myisam replication -my_bool wsrep_log_conflicts = 0; // +my_bool wsrep_log_conflicts = 0; ulong wsrep_mysql_replication_bundle = 0; +my_bool wsrep_desync = 0; // desynchronize the node from the + // cluster my_bool wsrep_load_data_splitting = 1; // commit load data every 10K intervals -my_bool wsrep_desync = 0; // desynchronize the node from the cluster /* * End configuration options */ static const wsrep_uuid_t cluster_uuid = WSREP_UUID_UNDEFINED; -const wsrep_uuid_t* wsrep_cluster_uuid() -{ - return &cluster_uuid; -} static char cluster_uuid_str[40]= { 0, }; static const char* cluster_status_str[WSREP_VIEW_MAX] = { @@ -102,7 +101,18 @@ long wsrep_protocol_version = 2; // if there was no state gap on receiving first view event. static my_bool wsrep_startup = TRUE; -/* wsrep callbacks */ +extern wsrep_cb_status_t wsrep_apply_cb(void *ctx, + const void* buf, size_t buf_len, + const wsrep_trx_meta_t* meta); + +extern wsrep_cb_status_t wsrep_commit_cb(void *ctx, + const wsrep_trx_meta_t* meta, + wsrep_bool_t *exit, + wsrep_bool_t commit); + +extern wsrep_cb_status_t wsrep_unordered_cb(void* ctx, + const void* data, + size_t size); static void wsrep_log_cb(wsrep_log_level_t level, const char *msg) { switch (level) { @@ -186,19 +196,22 @@ void wsrep_get_SE_checkpoint(XID* xid) plugin_foreach(NULL, get_SE_checkpoint, MYSQL_STORAGE_ENGINE_PLUGIN, xid); } -static void wsrep_view_handler_cb (void* app_ctx, - void* recv_ctx, - const wsrep_view_info_t* view, - const char* state, - size_t state_len, - void** sst_req, - ssize_t* sst_req_len) +static wsrep_cb_status_t +wsrep_view_handler_cb (void* app_ctx, + void* recv_ctx, + const wsrep_view_info_t* view, + const char* state, + size_t state_len, + void** sst_req, + size_t* sst_req_len) { wsrep_member_status_t new_status= local_status.get(); - if (memcmp(&cluster_uuid, &view->uuid, sizeof(wsrep_uuid_t))) + if (memcmp(&cluster_uuid, &view->state_id.uuid, sizeof(wsrep_uuid_t))) { - memcpy((wsrep_uuid_t*)&cluster_uuid, &view->uuid, sizeof(cluster_uuid)); + memcpy((wsrep_uuid_t*)&cluster_uuid, &view->state_id.uuid, + sizeof(cluster_uuid)); + wsrep_uuid_print (&cluster_uuid, cluster_uuid_str, sizeof(cluster_uuid_str)); } @@ -210,7 +223,7 @@ static void wsrep_view_handler_cb (void* app_ctx, WSREP_INFO("New cluster view: global state: %s:%lld, view# %lld: %s, " "number of nodes: %ld, my index: %ld, protocol version %d", - wsrep_cluster_state_uuid, (long long)view->seqno, + wsrep_cluster_state_uuid, (long long)view->state_id.seqno, (long long)wsrep_cluster_conf_id, wsrep_cluster_status, wsrep_cluster_size, wsrep_local_index, view->proto_ver); @@ -290,14 +303,14 @@ static void wsrep_view_handler_cb (void* app_ctx, { wsrep_SE_init_grab(); // Signal mysqld init thread to continue - wsrep_sst_complete (&cluster_uuid, view->seqno, false); + wsrep_sst_complete (&cluster_uuid, view->state_id.seqno, false); // and wait for SE initialization wsrep_SE_init_wait(); } else { local_uuid= cluster_uuid; - local_seqno= view->seqno; + local_seqno= view->state_id.seqno; } /* Init storage engine XIDs from first view */ XID xid; @@ -310,7 +323,7 @@ static void wsrep_view_handler_cb (void* app_ctx, if (memcmp (&local_uuid, &cluster_uuid, sizeof (wsrep_uuid_t))) { WSREP_ERROR("Undetected state gap. Can't continue."); - wsrep_log_states(WSREP_LOG_FATAL, &cluster_uuid, view->seqno, + wsrep_log_states(WSREP_LOG_FATAL, &cluster_uuid, view->state_id.seqno, &local_uuid, -1); unireg_abort(1); } @@ -322,9 +335,23 @@ static void wsrep_view_handler_cb (void* app_ctx, global_system_variables.auto_increment_increment= view->memb_num; } + { /* capabilities may be updated on new configuration */ + uint64_t const caps(wsrep->capabilities (wsrep)); + + my_bool const idc((caps & WSREP_CAP_INCREMENTAL_WRITESET) != 0); + if (TRUE == wsrep_incremental_data_collection && FALSE == idc) + { + WSREP_WARN("Unsupported protocol downgrade: " + "incremental data collection disabled. Expect abort."); + } + wsrep_incremental_data_collection = idc; + } + out: wsrep_startup= FALSE; local_status.set(new_status, view); + + return WSREP_CB_SUCCESS; } void wsrep_ready_set (my_bool x) @@ -407,6 +434,8 @@ static void wsrep_init_position() } } +extern const char* my_bind_addr_str; + int wsrep_init() { int rcode= -1; @@ -460,7 +489,7 @@ int wsrep_init() size_t const node_addr_max= sizeof(node_addr) - 1; if (!wsrep_node_address || !strcmp(wsrep_node_address, "")) { - size_t const ret= guess_ip(node_addr, node_addr_max); + size_t const ret= wsrep_guess_ip(node_addr, node_addr_max); if (!(ret > 0 && ret < node_addr_max)) { WSREP_WARN("Failed to guess base node address. Set it explicitly via " @@ -478,38 +507,57 @@ int wsrep_init() if ((!wsrep_node_incoming_address || !strcmp (wsrep_node_incoming_address, WSREP_NODE_INCOMING_AUTO))) { - size_t const node_addr_len= strlen(node_addr); - if (node_addr_len > 0) + unsigned int my_bind_ip= INADDR_ANY; // default if not set + if (my_bind_addr_str && strlen(my_bind_addr_str)) { - const char* const colon= strrchr(node_addr, ':'); - if (strchr(node_addr, ':') == colon) // 1 or 0 ':' + my_bind_ip= wsrep_check_ip(my_bind_addr_str); + } + + if (INADDR_ANY != my_bind_ip) + { + if (INADDR_NONE != my_bind_ip && INADDR_LOOPBACK != my_bind_ip) { - size_t const ip_len= colon ? colon - node_addr : node_addr_len; - if (ip_len + 7 /* :55555\0 */ < inc_addr_max) + snprintf(inc_addr, inc_addr_max, "%s:%u", + my_bind_addr_str, (int)mysqld_port); + } // else leave inc_addr an empty string - mysqld is not listening for + // client connections on network interfaces. + } + else // mysqld binds to 0.0.0.0, take IP from wsrep_node_address if possible + { + size_t const node_addr_len= strlen(node_addr); + if (node_addr_len > 0) + { + const char* const colon= strrchr(node_addr, ':'); + if (strchr(node_addr, ':') == colon) // 1 or 0 ':' { - memcpy (inc_addr, node_addr, ip_len); - snprintf(inc_addr + ip_len, inc_addr_max - ip_len, ":%u",mysqld_port); + size_t const ip_len= colon ? colon - node_addr : node_addr_len; + if (ip_len + 7 /* :55555\0 */ < inc_addr_max) + { + memcpy (inc_addr, node_addr, ip_len); + snprintf(inc_addr + ip_len, inc_addr_max - ip_len, ":%u", + (int)mysqld_port); + } + else + { + WSREP_WARN("Guessing address for incoming client connections: " + "address too long."); + inc_addr[0]= '\0'; + } } else { WSREP_WARN("Guessing address for incoming client connections: " - "address too long."); + "too many colons :) ."); inc_addr[0]= '\0'; } } - else + + if (!strlen(inc_addr)) { - WSREP_WARN("Guessing address for incoming client connections: " - "too many colons :) ."); - inc_addr[0]= '\0'; + WSREP_WARN("Guessing address for incoming client connections failed. " + "Try setting wsrep_node_incoming_address explicitly."); } } - - if (!strlen(inc_addr)) - { - WSREP_WARN("Guessing address for incoming client connections failed. " - "Try setting wsrep_node_incoming_address explicitly."); - } } else if (!strchr(wsrep_node_incoming_address, ':')) // no port included { @@ -536,6 +584,8 @@ int wsrep_init() struct wsrep_init_args wsrep_args; + struct wsrep_gtid const state_id = { local_uuid, local_seqno }; + wsrep_args.data_dir = wsrep_data_home_dir; wsrep_args.node_name = (wsrep_node_name) ? wsrep_node_name : ""; wsrep_args.node_address = node_addr; @@ -544,13 +594,13 @@ int wsrep_init() wsrep_provider_options : ""; wsrep_args.proto_ver = wsrep_max_protocol_version; - wsrep_args.state_uuid = &local_uuid; - wsrep_args.state_seqno = local_seqno; + wsrep_args.state_id = &state_id; wsrep_args.logger_cb = wsrep_log_cb; wsrep_args.view_handler_cb = wsrep_view_handler_cb; wsrep_args.apply_cb = wsrep_apply_cb; wsrep_args.commit_cb = wsrep_commit_cb; + wsrep_args.unordered_cb = wsrep_unordered_cb; wsrep_args.sst_donate_cb = wsrep_sst_donate_cb; wsrep_args.synced_cb = wsrep_synced_cb; @@ -679,6 +729,7 @@ bool wsrep_start_replication() */ const char* cluster_address = wsrep_new_cluster ? "bootstrap" : wsrep_cluster_address; + bool const bootstrap(TRUE == wsrep_new_cluster); wsrep_new_cluster= FALSE; WSREP_INFO("Start replication"); @@ -686,7 +737,8 @@ bool wsrep_start_replication() if ((rcode = wsrep->connect(wsrep, wsrep_cluster_name, cluster_address, - wsrep_sst_donor))) + wsrep_sst_donor, + bootstrap))) { if (-ESOCKTNOSUPPORT == rcode) { @@ -707,11 +759,6 @@ bool wsrep_start_replication() { wsrep_connected= TRUE; - uint64_t caps = wsrep->capabilities (wsrep); - - wsrep_incremental_data_collection = - !!(caps & WSREP_CAP_WRITE_SET_INCREMENTS); - char* opts= wsrep->options_get(wsrep); if (opts) { @@ -736,8 +783,8 @@ wsrep_causal_wait (THD* thd) { // This allows autocommit SELECTs and a first SELECT after SET AUTOCOMMIT=0 // TODO: modify to check if thd has locked any rows. - wsrep_seqno_t seqno; - wsrep_status_t ret= wsrep->causal_read (wsrep, &seqno); + wsrep_gtid_t gtid; + wsrep_status_t ret= wsrep->causal_read (wsrep, >id); if (unlikely(WSREP_OK != ret)) { @@ -785,7 +832,7 @@ static void wsrep_keys_free(wsrep_key_arr_t* key_arr) { for (size_t i= 0; i < key_arr->keys_len; ++i) { - my_free((wsrep_key_part_t*)key_arr->keys[i].key_parts); + my_free((void*)key_arr->keys[i].key_parts); } my_free(key_arr->keys); key_arr->keys= 0; @@ -805,7 +852,7 @@ static void wsrep_keys_free(wsrep_key_arr_t* key_arr) static bool wsrep_prepare_key_for_isolation(const char* db, const char* table, - wsrep_key_part_t* key, + wsrep_buf_t* key, size_t* key_len) { if (*key_len < 2) return false; @@ -824,13 +871,13 @@ static bool wsrep_prepare_key_for_isolation(const char* db, // sql_print_information("%s.%s", db, table); if (db) { - key[*key_len].buf= db; - key[*key_len].buf_len= strlen(db); + key[*key_len].ptr= db; + key[*key_len].len= strlen(db); ++(*key_len); if (table) { - key[*key_len].buf= table; - key[*key_len].buf_len= strlen(table); + key[*key_len].ptr= table; + key[*key_len].len= strlen(table); ++(*key_len); } } @@ -863,23 +910,23 @@ static bool wsrep_prepare_keys_for_isolation(THD* thd, { if (!(ka->keys= (wsrep_key_t*)my_malloc(sizeof(wsrep_key_t), MYF(0)))) { - sql_print_error("Can't allocate memory for key_array"); + WSREP_ERROR("Can't allocate memory for key_array"); goto err; } ka->keys_len= 1; - if (!(ka->keys[0].key_parts= (wsrep_key_part_t*) - my_malloc(sizeof(wsrep_key_part_t)*2, MYF(0)))) + if (!(ka->keys[0].key_parts= (wsrep_buf_t*) + my_malloc(sizeof(wsrep_buf_t)*2, MYF(0)))) { - sql_print_error("Can't allocate memory for key_parts"); + WSREP_ERROR("Can't allocate memory for key_parts"); goto err; } - ka->keys[0].key_parts_len= 2; + ka->keys[0].key_parts_num= 2; if (!wsrep_prepare_key_for_isolation( db, table, - (wsrep_key_part_t*)ka->keys[0].key_parts, - &ka->keys[0].key_parts_len)) + (wsrep_buf_t*)ka->keys[0].key_parts, + &ka->keys[0].key_parts_num)) { - sql_print_error("Preparing keys for isolation failed"); + WSREP_ERROR("Preparing keys for isolation failed"); goto err; } } @@ -895,24 +942,24 @@ static bool wsrep_prepare_keys_for_isolation(THD* thd, MYF(MY_ALLOW_ZERO_PTR)); if (!tmp) { - sql_print_error("Can't allocate memory for key_array"); + WSREP_ERROR("Can't allocate memory for key_array"); goto err; } ka->keys= tmp; - if (!(ka->keys[ka->keys_len].key_parts= (wsrep_key_part_t*) - my_malloc(sizeof(wsrep_key_part_t)*2, MYF(0)))) + if (!(ka->keys[ka->keys_len].key_parts= (wsrep_buf_t*) + my_malloc(sizeof(wsrep_buf_t)*2, MYF(0)))) { - sql_print_error("Can't allocate memory for key_parts"); + WSREP_ERROR("Can't allocate memory for key_parts"); goto err; } - ka->keys[ka->keys_len].key_parts_len= 2; + ka->keys[ka->keys_len].key_parts_num= 2; ++ka->keys_len; if (!wsrep_prepare_key_for_isolation( table->db, table->table_name, - (wsrep_key_part_t*)ka->keys[ka->keys_len - 1].key_parts, - &ka->keys[ka->keys_len - 1].key_parts_len)) + (wsrep_buf_t*)ka->keys[ka->keys_len - 1].key_parts, + &ka->keys[ka->keys_len - 1].key_parts_num)) { - sql_print_error("Preparing keys for isolation failed"); + WSREP_ERROR("Preparing keys for isolation failed"); goto err; } } @@ -929,7 +976,7 @@ bool wsrep_prepare_key_for_innodb(const uchar* cache_key, size_t cache_key_len, const uchar* row_id, size_t row_id_len, - wsrep_key_part_t* key, + wsrep_buf_t* key, size_t* key_len) { if (*key_len < 3) return false; @@ -939,28 +986,30 @@ bool wsrep_prepare_key_for_innodb(const uchar* cache_key, { case 0: { - key[*key_len].buf = cache_key; - key[*key_len].buf_len = cache_key_len; - ++(*key_len); + key[0].ptr = cache_key; + key[0].len = cache_key_len; + + *key_len = 1; break; } case 1: case 2: { - key[*key_len].buf = cache_key; - key[*key_len].buf_len = strlen( (char*)cache_key ); - ++(*key_len); - key[*key_len].buf = cache_key + strlen( (char*)cache_key ) + 1; - key[*key_len].buf_len = strlen( (char*)(key[*key_len].buf) ); - ++(*key_len); + key[0].ptr = cache_key; + key[0].len = strlen( (char*)cache_key ); + + key[1].ptr = cache_key + strlen( (char*)cache_key ) + 1; + key[1].len = strlen( (char*)(key[1].ptr) ); + + *key_len = 2; break; } default: return false; } - key[*key_len].buf = row_id; - key[*key_len].buf_len = row_id_len; + key[*key_len].ptr = row_id; + key[*key_len].len = row_id_len; ++(*key_len); return true; @@ -973,7 +1022,7 @@ bool wsrep_prepare_key_for_innodb(const uchar* cache_key, * Return 0 in case of success, 1 in case of error. */ int wsrep_to_buf_helper( - THD* thd, const char *query, uint query_len, uchar** buf, uint* buf_len) + THD* thd, const char *query, uint query_len, uchar** buf, int* buf_len) { IO_CACHE tmp_io_cache; if (open_cached_file(&tmp_io_cache, mysql_tmpdir, TEMP_PREFIX, @@ -985,8 +1034,8 @@ int wsrep_to_buf_helper( if (thd->wsrep_TOI_pre_query) { Query_log_event ev(thd, thd->wsrep_TOI_pre_query, - thd->wsrep_TOI_pre_query_len, - FALSE, FALSE, FALSE, 0); + thd->wsrep_TOI_pre_query_len, + FALSE, FALSE, FALSE, 0); if (ev.write(&tmp_io_cache)) ret= 1; } @@ -994,7 +1043,7 @@ int wsrep_to_buf_helper( Query_log_event ev(thd, query, query_len, FALSE, FALSE, FALSE, 0); if (ev.write(&tmp_io_cache)) ret= 1; - if (!ret && wsrep_write_cache(&tmp_io_cache, buf, buf_len)) ret= 1; + if (!ret && wsrep_write_cache_buf(&tmp_io_cache, buf, (size_t*)buf_len)) ret= 1; close_cached_file(&tmp_io_cache); return ret; @@ -1002,7 +1051,7 @@ int wsrep_to_buf_helper( #include "sql_show.h" static int -create_view_query(THD *thd, uchar** buf, uint* buf_len) +create_view_query(THD *thd, uchar** buf, int* buf_len) { LEX *lex= thd->lex; SELECT_LEX *select_lex= &lex->select_lex; @@ -1021,15 +1070,15 @@ create_view_query(THD *thd, uchar** buf, uint* buf_len) if (!lex->definer) { /* - DEFINER-clause is missing; we have to create default definer in - persistent arena to be PS/SP friendly. - If this is an ALTER VIEW then the current user should be set as - the definer. + DEFINER-clause is missing; we have to create default definer in + persistent arena to be PS/SP friendly. + If this is an ALTER VIEW then the current user should be set as + the definer. */ if (!(lex->definer= create_default_definer(thd))) { - WSREP_WARN("view default definer issue"); + WSREP_WARN("view default definer issue"); } } @@ -1056,7 +1105,7 @@ create_view_query(THD *thd, uchar** buf, uint* buf_len) List_iterator_fast<LEX_STRING> names(lex->view_list); LEX_STRING *name; int i; - + for (i= 0; (name= names++); i++) { buff.append(i ? ", " : "("); @@ -1079,11 +1128,11 @@ static int wsrep_TOI_begin(THD *thd, char *db_, char *table_, { wsrep_status_t ret(WSREP_WARNING); uchar* buf(0); - uint buf_len(0); + int buf_len(0); int buf_err; - WSREP_DEBUG("TO BEGIN: %lld, %d : %s", (long long)thd->wsrep_trx_seqno, - thd->wsrep_exec_mode, thd->query() ); + WSREP_DEBUG("TO BEGIN: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd), + thd->wsrep_exec_mode, thd->query() ); switch (thd->lex->sql_command) { case SQLCOM_CREATE_VIEW: @@ -1106,19 +1155,20 @@ static int wsrep_TOI_begin(THD *thd, char *db_, char *table_, } wsrep_key_arr_t key_arr= {0, 0}; + struct wsrep_buf buff = { buf, buf_len }; if (!buf_err && wsrep_prepare_keys_for_isolation(thd, db_, table_, table_list, &key_arr)&& WSREP_OK == (ret = wsrep->to_execute_start(wsrep, thd->thread_id, key_arr.keys, key_arr.keys_len, - buf, buf_len, - &thd->wsrep_trx_seqno))) + &buff, 1, + &thd->wsrep_trx_meta))) { thd->wsrep_exec_mode= TOTAL_ORDER; wsrep_to_isolation++; if (buf) my_free(buf); wsrep_keys_free(&key_arr); - WSREP_DEBUG("TO BEGIN: %lld, %d",(long long)thd->wsrep_trx_seqno, - thd->wsrep_exec_mode); + WSREP_DEBUG("TO BEGIN: %lld, %d",(long long)wsrep_thd_trx_seqno(thd), + thd->wsrep_exec_mode); } else { /* jump to error handler in mysql_execute_command() */ @@ -1137,10 +1187,10 @@ static int wsrep_TOI_begin(THD *thd, char *db_, char *table_, static void wsrep_TOI_end(THD *thd) { wsrep_status_t ret; wsrep_to_isolation--; - WSREP_DEBUG("TO END: %lld, %d : %s", (long long)thd->wsrep_trx_seqno, + WSREP_DEBUG("TO END: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd), thd->wsrep_exec_mode, (thd->query()) ? thd->query() : "void") if (WSREP_OK == (ret = wsrep->to_execute_end(wsrep, thd->thread_id))) { - WSREP_DEBUG("TO END: %lld", (long long)thd->wsrep_trx_seqno); + WSREP_DEBUG("TO END: %lld", (long long)wsrep_thd_trx_seqno(thd)); } else { WSREP_WARN("TO isolation end failed for: %d, sql: %s", @@ -1151,7 +1201,7 @@ static void wsrep_TOI_end(THD *thd) { static int wsrep_RSU_begin(THD *thd, char *db_, char *table_) { wsrep_status_t ret(WSREP_WARNING); - WSREP_DEBUG("RSU BEGIN: %lld, %d : %s", (long long)thd->wsrep_trx_seqno, + WSREP_DEBUG("RSU BEGIN: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd), thd->wsrep_exec_mode, thd->query() ); ret = wsrep->desync(wsrep); @@ -1196,7 +1246,7 @@ static int wsrep_RSU_begin(THD *thd, char *db_, char *table_) static void wsrep_RSU_end(THD *thd) { wsrep_status_t ret(WSREP_WARNING); - WSREP_DEBUG("RSU END: %lld, %d : %s", (long long)thd->wsrep_trx_seqno, + WSREP_DEBUG("RSU END: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd), thd->wsrep_exec_mode, thd->query() ); @@ -1276,10 +1326,10 @@ void wsrep_to_isolation_end(THD *thd) { "request: (%lu \tseqno %lld \twsrep (%d, %d, %d) cmd %d %d \t%s)\n" \ "granted: (%lu \tseqno %lld \twsrep (%d, %d, %d) cmd %d %d \t%s)", \ msg, \ - req->thread_id, (long long)req->wsrep_trx_seqno, \ + req->thread_id, (long long)wsrep_thd_trx_seqno(req), \ req->wsrep_exec_mode, req->wsrep_query_state, req->wsrep_conflict_state, \ req->get_command(), req->lex->sql_command, req->query(), \ - gra->thread_id, (long long)gra->wsrep_trx_seqno, \ + gra->thread_id, (long long)wsrep_thd_trx_seqno(gra), \ gra->wsrep_exec_mode, gra->wsrep_query_state, gra->wsrep_conflict_state, \ gra->get_command(), gra->lex->sql_command, gra->query()); diff --git a/sql/wsrep_mysqld.h b/sql/wsrep_mysqld.h index 67bc9d0ea15..276f2a7b580 100644 --- a/sql/wsrep_mysqld.h +++ b/sql/wsrep_mysqld.h @@ -28,7 +28,6 @@ class THD; #ifdef WITH_WSREP #include "../wsrep/wsrep_api.h" -//#include "wsrep_mysqld.h" enum wsrep_exec_mode { LOCAL_STATE, REPL_RECV, @@ -73,15 +72,16 @@ extern const char* wsrep_node_incoming_address; extern const char* wsrep_data_home_dir; extern const char* wsrep_dbug_option; extern long wsrep_slave_threads; -extern my_bool wsrep_debug; +extern int wsrep_slave_count_change; +extern MYSQL_PLUGIN_IMPORT my_bool wsrep_debug; extern my_bool wsrep_convert_LOCK_to_trx; extern ulong wsrep_retry_autocommit; extern my_bool wsrep_auto_increment_control; extern my_bool wsrep_drupal_282555_workaround; extern my_bool wsrep_incremental_data_collection; extern const char* wsrep_start_position; -extern long long wsrep_max_ws_size; -extern long wsrep_max_ws_rows; +extern ulong wsrep_max_ws_size; +extern ulong wsrep_max_ws_rows; extern const char* wsrep_notify_cmd; extern my_bool wsrep_certify_nonPK; extern long wsrep_max_protocol_version; @@ -136,13 +136,13 @@ extern "C" enum wsrep_query_state wsrep_thd_query_state(THD *thd); extern "C" const char * wsrep_thd_exec_mode_str(THD *thd); extern "C" const char * wsrep_thd_conflict_state_str(THD *thd); extern "C" const char * wsrep_thd_query_state_str(THD *thd); -extern "C" wsrep_trx_handle_t* wsrep_thd_trx_handle(THD *thd); +extern "C" wsrep_ws_handle_t* wsrep_thd_ws_handle(THD *thd); extern "C" void wsrep_thd_set_exec_mode(THD *thd, enum wsrep_exec_mode mode); extern "C" void wsrep_thd_set_query_state( - THD *thd, enum wsrep_query_state state); + THD *thd, enum wsrep_query_state state); extern "C" void wsrep_thd_set_conflict_state( - THD *thd, enum wsrep_conflict_state state); + THD *thd, enum wsrep_conflict_state state); extern "C" void wsrep_thd_set_trx_to_replay(THD *thd, uint64 trx_id); @@ -259,20 +259,12 @@ extern mysql_mutex_t LOCK_wsrep_rollback; extern mysql_cond_t COND_wsrep_rollback; extern int wsrep_replaying; extern mysql_mutex_t LOCK_wsrep_replaying; -extern mysql_cond_t COND_wsrep_replaying; -extern wsrep_aborting_thd_t wsrep_aborting_thd; -extern MYSQL_PLUGIN_IMPORT my_bool wsrep_debug; -extern my_bool wsrep_convert_LOCK_to_trx; -extern ulong wsrep_retry_autocommit; -extern my_bool wsrep_emulate_bin_log; -extern my_bool wsrep_auto_increment_control; -extern my_bool wsrep_drupal_282555_workaround; -extern long long wsrep_max_ws_size; -extern long wsrep_max_ws_rows; -extern int wsrep_to_isolation; -extern my_bool wsrep_certify_nonPK; +extern mysql_cond_t COND_wsrep_replaying; extern mysql_mutex_t LOCK_wsrep_slave_threads; extern mysql_mutex_t LOCK_wsrep_desync; +extern wsrep_aborting_thd_t wsrep_aborting_thd; +extern my_bool wsrep_emulate_bin_log; +extern int wsrep_to_isolation; extern PSI_mutex_key key_LOCK_wsrep_ready; extern PSI_mutex_key key_COND_wsrep_ready; @@ -295,12 +287,11 @@ int wsrep_to_isolation_begin(THD *thd, char *db_, char *table_, void wsrep_to_isolation_end(THD *thd); int wsrep_to_buf_helper( - THD* thd, const char *query, uint query_len, uchar** buf, uint* buf_len); -int wsrep_create_sp(THD *thd, uchar** buf, uint* buf_len); -int wsrep_create_trigger_query(THD *thd, uchar** buf, uint* buf_len); -int wsrep_create_event_query(THD *thd, uchar** buf, uint* buf_len); + THD* thd, const char *query, uint query_len, uchar** buf, int* buf_len); +int wsrep_create_sp(THD *thd, uchar** buf, int* buf_len); +int wsrep_create_trigger_query(THD *thd, uchar** buf, int* buf_len); +int wsrep_create_event_query(THD *thd, uchar** buf, int* buf_len); -const wsrep_uuid_t* wsrep_cluster_uuid(); struct xid_t; void wsrep_set_SE_checkpoint(xid_t*); diff --git a/sql/wsrep_priv.h b/sql/wsrep_priv.h index 291823d773e..5c66587d757 100644 --- a/sql/wsrep_priv.h +++ b/sql/wsrep_priv.h @@ -32,24 +32,22 @@ ssize_t wsrep_sst_prepare (void** msg); wsrep_cb_status wsrep_sst_donate_cb (void* app_ctx, void* recv_ctx, const void* msg, size_t msg_len, - const wsrep_gtid_t* state_id, + const wsrep_gtid_t* current_id, const char* state, size_t state_len, bool bypass); +extern unsigned int wsrep_check_ip (const char* addr); +extern size_t wsrep_guess_ip (char* buf, size_t buf_len); +extern size_t wsrep_guess_address(char* buf, size_t buf_len); extern wsrep_uuid_t local_uuid; extern wsrep_seqno_t local_seqno; // a helper function -void wsrep_sst_received(wsrep_t*, const wsrep_uuid_t*, wsrep_seqno_t, +extern void wsrep_sst_received(wsrep_t*, const wsrep_uuid_t*, wsrep_seqno_t, const void*, size_t); /*! SST thread signals init thread about sst completion */ -void wsrep_sst_complete(const wsrep_uuid_t*, wsrep_seqno_t, bool); - -extern void wsrep_notify_status (wsrep_member_status_t new_status, - const wsrep_view_info_t* view = 0); - -/* binlog-related stuff */ -int wsrep_write_cache(IO_CACHE *cache, uchar **buf, size_t *buf_len); - +extern void wsrep_sst_complete(const wsrep_uuid_t*, wsrep_seqno_t, bool); +void wsrep_notify_status (wsrep_member_status_t new_status, + const wsrep_view_info_t* view = 0); #endif /* WSREP_PRIV_H */ diff --git a/sql/wsrep_sst.cc b/sql/wsrep_sst.cc index d651e1ed0a9..6a97b29ff6d 100644 --- a/sql/wsrep_sst.cc +++ b/sql/wsrep_sst.cc @@ -213,8 +213,8 @@ bool wsrep_sst_wait () // Signal end of SST void wsrep_sst_complete (const wsrep_uuid_t* sst_uuid, - wsrep_seqno_t sst_seqno, - bool needed) + wsrep_seqno_t sst_seqno, + bool needed) { if (mysql_mutex_lock (&LOCK_wsrep_sst)) abort(); if (!sst_complete) @@ -232,13 +232,26 @@ void wsrep_sst_complete (const wsrep_uuid_t* sst_uuid, mysql_mutex_unlock (&LOCK_wsrep_sst); } +void wsrep_sst_received (wsrep_t* const wsrep, + const wsrep_uuid_t* const uuid, + wsrep_seqno_t const seqno, + const void* const state, + size_t const state_len) +{ + int const rcode(seqno < 0 ? seqno : 0); + wsrep_gtid_t const state_id = { + *uuid, (rcode ? WSREP_SEQNO_UNDEFINED : seqno) + }; + wsrep->sst_received(wsrep, &state_id, state, state_len, rcode); +} + // Let applier threads to continue void wsrep_sst_continue () { if (sst_needed) { WSREP_INFO("Signalling provider to continue."); - wsrep->sst_received (wsrep, &local_uuid, local_seqno, NULL, 0); + wsrep_sst_received (wsrep, &local_uuid, local_seqno, NULL, 0); } } @@ -434,7 +447,6 @@ static ssize_t sst_prepare_other (const char* method, return ret; } -//extern ulong my_bind_addr; extern uint mysqld_port; /*! Just tells donor where to send mysqldump */ @@ -518,7 +530,7 @@ ssize_t wsrep_sst_prepare (void** msg) } else { - ssize_t ret= guess_ip (ip_buf, ip_max); + ssize_t ret= wsrep_guess_ip (ip_buf, ip_max); if (ret && ret < ip_max) { @@ -706,7 +718,9 @@ static int sst_donate_mysqldump (const char* addr, ret= sst_run_shell (cmd_str, 3); } - wsrep->sst_sent (wsrep, uuid, ret ? ret : seqno); + wsrep_gtid_t const state_id = { *uuid, (ret ? WSREP_SEQNO_UNDEFINED : seqno)}; + + wsrep->sst_sent (wsrep, &state_id, ret); return ret; } @@ -896,7 +910,10 @@ wait_signal: } // signal to donor that SST is over - wsrep->sst_sent (wsrep, &ret_uuid, err ? -err : ret_seqno); + struct wsrep_gtid const state_id = { + ret_uuid, err ? WSREP_SEQNO_UNDEFINED : ret_seqno + }; + wsrep->sst_sent (wsrep, &state_id, -err); proc.wait(); return NULL; @@ -950,10 +967,9 @@ static int sst_donate_other (const char* method, return arg.err; } -int wsrep_sst_donate_cb (void* app_ctx, void* recv_ctx, +wsrep_cb_status wsrep_sst_donate_cb (void* app_ctx, void* recv_ctx, const void* msg, size_t msg_len, - const wsrep_uuid_t* current_uuid, - wsrep_seqno_t current_seqno, + const wsrep_gtid_t* current_gtid, const char* state, size_t state_len, bool bypass) { @@ -967,20 +983,19 @@ int wsrep_sst_donate_cb (void* app_ctx, void* recv_ctx, const char* data = method + method_len + 1; char uuid_str[37]; - wsrep_uuid_print (current_uuid, uuid_str, sizeof(uuid_str)); + wsrep_uuid_print (¤t_gtid->uuid, uuid_str, sizeof(uuid_str)); int ret; if (!strcmp (WSREP_SST_MYSQLDUMP, method)) { - ret = sst_donate_mysqldump (data, current_uuid, uuid_str, current_seqno, - bypass); + ret = sst_donate_mysqldump(data, ¤t_gtid->uuid, uuid_str, + current_gtid->seqno, bypass); } else { - ret = sst_donate_other (method, data, uuid_str, current_seqno, bypass); + ret = sst_donate_other(method, data, uuid_str, current_gtid->seqno,bypass); } - - return (ret > 0 ? 0 : ret); + return (ret > 0 ? WSREP_CB_SUCCESS : WSREP_CB_FAILURE); } void wsrep_SE_init_grab() diff --git a/sql/wsrep_thd.cc b/sql/wsrep_thd.cc index 9cbd32cac73..35f5a9bca6a 100644 --- a/sql/wsrep_thd.cc +++ b/sql/wsrep_thd.cc @@ -113,17 +113,17 @@ void wsrep_replay_transaction(THD *thd) /* checking if BF trx must be replayed */ if (thd->wsrep_conflict_state== MUST_REPLAY) { if (thd->wsrep_exec_mode!= REPL_RECV) { - if (thd->stmt_da->is_sent) + if (thd->get_stmt_da()->is_sent()) { WSREP_ERROR("replay issue, thd has reported status already"); } - thd->stmt_da->reset_diagnostics_area(); + thd->get_stmt_da()->reset_diagnostics_area(); thd->wsrep_conflict_state= REPLAYING; mysql_mutex_unlock(&thd->LOCK_wsrep_thd); - mysql_reset_thd_for_next_command(thd); - thd->killed= THD::NOT_KILLED; + mysql_reset_thd_for_next_command(thd, opt_userstat_running); + thd->killed= NOT_KILLED; close_thread_tables(thd); if (thd->locked_tables_mode && thd->lock) { @@ -157,7 +157,7 @@ void wsrep_replay_transaction(THD *thd) wsrep->post_commit(wsrep, &thd->wsrep_ws_handle); WSREP_DEBUG("trx_replay successful for: %ld %llu", thd->thread_id, (long long)thd->real_id); - if (thd->stmt_da->is_sent) + if (thd->get_stmt_da()->is_sent()) { WSREP_WARN("replay ok, thd has reported status"); } @@ -167,7 +167,7 @@ void wsrep_replay_transaction(THD *thd) } break; case WSREP_TRX_FAIL: - if (thd->stmt_da->is_sent) + if (thd->get_stmt_da()->is_sent()) { WSREP_ERROR("replay failed, thd has reported status"); } @@ -237,7 +237,7 @@ static void wsrep_replication_process(THD *thd) * avoid mysql shutdown. This is because the killer will then handle * shutdown processing (or replication restarting) */ - if (thd->killed != THD::KILL_CONNECTION) + if (thd->killed != KILL_CONNECTION) { wsrep_kill_mysql(thd); } @@ -289,7 +289,7 @@ static void wsrep_rollback_process(THD *thd) mysql_mutex_lock(&LOCK_wsrep_rollback); wsrep_aborting_thd= NULL; - while (thd->killed == THD::NOT_KILLED) { + while (thd->killed == NOT_KILLED) { thd_proc_info(thd, "wsrep aborter idle"); thd->mysys_var->current_mutex= &LOCK_wsrep_rollback; thd->mysys_var->current_cond= &COND_wsrep_rollback; diff --git a/sql/wsrep_utils.cc b/sql/wsrep_utils.cc index 37e537c62e4..b01bdaaa15e 100644 --- a/sql/wsrep_utils.cc +++ b/sql/wsrep_utils.cc @@ -22,16 +22,17 @@ #include "wsrep_utils.h" #include "wsrep_mysqld.h" -//#include "wsrep_api.h" -//#include "wsrep_priv.h" + +#include <sql_class.h> + #include <spawn.h> // posix_spawn() #include <unistd.h> // pipe() #include <errno.h> // errno #include <string.h> // strerror() #include <sys/wait.h> // waitpid() - -#include <sql_class.h> -#include "wsrep_priv.h" +#include <sys/types.h> +#include <sys/socket.h> +#include <netdb.h> // getaddrinfo() extern char** environ; // environment variables @@ -317,23 +318,69 @@ thd::~thd () } // namespace wsp -extern ulong my_bind_addr; -extern uint mysqld_port; +/* Returns INADDR_NONE, INADDR_ANY, INADDR_LOOPBACK or something else */ +unsigned int wsrep_check_ip (const char* const addr) +{ + unsigned int ret = INADDR_NONE; + struct addrinfo *res, hints; + + memset (&hints, 0, sizeof(hints)); + hints.ai_flags= AI_PASSIVE/*|AI_ADDRCONFIG*/; + hints.ai_socktype= SOCK_STREAM; + hints.ai_family= AF_UNSPEC; -size_t guess_ip (char* buf, size_t buf_len) + int gai_ret = getaddrinfo(addr, NULL, &hints, &res); + if (0 == gai_ret) + { + if (AF_INET == res->ai_family) /* IPv4 */ + { + struct sockaddr_in* a= (struct sockaddr_in*)res->ai_addr; + ret= htonl(a->sin_addr.s_addr); + } + else /* IPv6 */ + { + struct sockaddr_in6* a= (struct sockaddr_in6*)res->ai_addr; + if (IN6_IS_ADDR_UNSPECIFIED(&a->sin6_addr)) + ret= INADDR_ANY; + else if (IN6_IS_ADDR_LOOPBACK(&a->sin6_addr)) + ret= INADDR_LOOPBACK; + else + ret= 0xdeadbeef; + } + freeaddrinfo (res); + } + else { + WSREP_ERROR ("getaddrinfo() failed on '%s': %d (%s)", + addr, gai_ret, gai_strerror(gai_ret)); + } + + // uint8_t* b= (uint8_t*)&ret; + // fprintf (stderr, "########## wsrep_check_ip returning: %hhu.%hhu.%hhu.%hhu\n", + // b[0], b[1], b[2], b[3]); + + return ret; +} + +extern const char* my_bind_addr_str; +extern uint mysqld_port; + +size_t wsrep_guess_ip (char* buf, size_t buf_len) { size_t ip_len = 0; - if (htonl(INADDR_NONE) == my_bind_addr) { - WSREP_ERROR("Networking not configured, cannot receive state transfer."); - return 0; - } + if (my_bind_addr_str && strlen(my_bind_addr_str)) + { + unsigned int const ip_type= wsrep_check_ip(my_bind_addr_str); - if (htonl(INADDR_ANY) != my_bind_addr) { - uint8_t* b = (uint8_t*)&my_bind_addr; - ip_len = snprintf (buf, buf_len, - "%hhu.%hhu.%hhu.%hhu", b[0],b[1],b[2],b[3]); - return ip_len; + if (INADDR_NONE == ip_type) { + WSREP_ERROR("Networking not configured, cannot receive state transfer."); + return 0; + } + + if (INADDR_ANY != ip_type) {; + strncpy (buf, my_bind_addr_str, buf_len); + return strlen(buf); + } } // mysqld binds to all interfaces - try IP from wsrep_node_address @@ -404,9 +451,9 @@ size_t guess_ip (char* buf, size_t buf_len) return ip_len; } -size_t guess_address(char* buf, size_t buf_len) +size_t wsrep_guess_address(char* buf, size_t buf_len) { - size_t addr_len = guess_ip (buf, buf_len); + size_t addr_len = wsrep_guess_ip (buf, buf_len); if (addr_len && addr_len < buf_len) { addr_len += snprintf (buf + addr_len, buf_len - addr_len, diff --git a/sql/wsrep_var.cc b/sql/wsrep_var.cc index bd041ed51ff..c3c795a291d 100644 --- a/sql/wsrep_var.cc +++ b/sql/wsrep_var.cc @@ -38,7 +38,6 @@ const char* wsrep_node_address = 0; const char* wsrep_node_incoming_address = 0; const char* wsrep_start_position = 0; ulong wsrep_OSU_method_options; -static int wsrep_thread_change = 0; int wsrep_init_vars() { @@ -61,15 +60,6 @@ bool wsrep_on_update (sys_var *self, THD* thd, enum_var_type var_type) // FIXME: this variable probably should be changed only per session thd->variables.wsrep_on = global_system_variables.wsrep_on; } - else { - } - -#ifdef REMOVED - if (thd->variables.wsrep_on) - thd->variables.option_bits |= (OPTION_BIN_LOG); - else - thd->variables.option_bits &= ~(OPTION_BIN_LOG); -#endif return false; } @@ -78,8 +68,6 @@ void wsrep_causal_reads_update (sys_var *self, THD* thd, enum_var_type var_type) if (var_type == OPT_GLOBAL) { thd->variables.wsrep_causal_reads = global_system_variables.wsrep_causal_reads; } - else { - } } static int wsrep_start_position_verify (const char* start_str) @@ -149,7 +137,7 @@ bool wsrep_start_position_update (sys_var *self, THD* thd, enum_var_type type) wsrep_set_local_position (wsrep_start_position); if (wsrep) { - wsrep->sst_received (wsrep, &local_uuid, local_seqno, NULL, 0); + wsrep_sst_received (wsrep, &local_uuid, local_seqno, NULL, 0); } return 0; @@ -455,7 +443,7 @@ void wsrep_node_address_init (const char* value) bool wsrep_slave_threads_check (sys_var *self, THD* thd, set_var* var) { mysql_mutex_lock(&LOCK_wsrep_slave_threads); - wsrep_thread_change = var->value->val_int() - wsrep_slave_threads; + wsrep_slave_count_change = var->value->val_int() - wsrep_slave_threads; mysql_mutex_unlock(&LOCK_wsrep_slave_threads); return 0; @@ -463,13 +451,10 @@ bool wsrep_slave_threads_check (sys_var *self, THD* thd, set_var* var) bool wsrep_slave_threads_update (sys_var *self, THD* thd, enum_var_type type) { - if (wsrep_thread_change > 0) - { - wsrep_create_appliers(wsrep_thread_change); - } - else if (wsrep_thread_change < 0) + if (wsrep_slave_count_change > 0) { - wsrep_close_applier_threads(-wsrep_thread_change); + wsrep_create_appliers(wsrep_slave_count_change); + wsrep_slave_count_change = 0; } return false; } |