diff options
-rw-r--r-- | sql/CMakeLists.txt | 2 | ||||
-rw-r--r-- | sql/events.cc | 2 | ||||
-rw-r--r-- | sql/log.cc | 20 | ||||
-rw-r--r-- | sql/log.h | 7 | ||||
-rw-r--r-- | sql/log_event.cc | 14 | ||||
-rw-r--r-- | sql/mysqld.cc | 4 | ||||
-rw-r--r-- | sql/sp.cc | 2 | ||||
-rw-r--r-- | sql/sql_class.cc | 40 | ||||
-rw-r--r-- | sql/sql_class.h | 11 | ||||
-rw-r--r-- | sql/sql_parse.cc | 2 | ||||
-rw-r--r-- | sql/sql_trigger.cc | 4 | ||||
-rw-r--r-- | sql/sys_vars.cc | 6 | ||||
-rw-r--r-- | sql/wsrep_binlog.h | 49 | ||||
-rw-r--r-- | sql/wsrep_hton.cc | 134 | ||||
-rw-r--r-- | sql/wsrep_mysqld.cc | 270 | ||||
-rw-r--r-- | sql/wsrep_mysqld.h | 39 | ||||
-rw-r--r-- | sql/wsrep_priv.h | 18 | ||||
-rw-r--r-- | sql/wsrep_sst.cc | 47 | ||||
-rw-r--r-- | sql/wsrep_thd.cc | 16 | ||||
-rw-r--r-- | sql/wsrep_utils.cc | 85 | ||||
-rw-r--r-- | sql/wsrep_var.cc | 25 | ||||
-rw-r--r-- | storage/innobase/handler/ha_innodb.cc | 120 | ||||
-rw-r--r-- | storage/innobase/handler/ha_innodb.h | 2 | ||||
-rw-r--r-- | storage/innobase/include/ha_prototypes.h | 2 | ||||
-rw-r--r-- | wsrep/wsrep_api.h | 703 | ||||
-rw-r--r-- | wsrep/wsrep_dummy.c | 107 | ||||
-rw-r--r-- | wsrep/wsrep_loader.c | 8 | ||||
-rw-r--r-- | wsrep/wsrep_uuid.c | 27 |
28 files changed, 1082 insertions, 684 deletions
diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt index 7766b82adff..fd654d01b1d 100644 --- a/sql/CMakeLists.txt +++ b/sql/CMakeLists.txt @@ -50,6 +50,8 @@ IF(WITH_WSREP) wsrep_sst.cc wsrep_utils.cc wsrep_var.cc + wsrep_binlog.cc + wsrep_applier.cc wsrep_thd.cc ) SET(WSREP_LIB wsrep) diff --git a/sql/events.cc b/sql/events.cc index 7df8d19644d..73ce894095c 100644 --- a/sql/events.cc +++ b/sql/events.cc @@ -1169,7 +1169,7 @@ end: } #ifdef WITH_WSREP -int wsrep_create_event_query(THD *thd, uchar** buf, uint* buf_len) +int wsrep_create_event_query(THD *thd, uchar** buf, int* buf_len) { String log_query; diff --git a/sql/log.cc b/sql/log.cc index 1c51d50a2ca..6dd4aa7038b 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -596,7 +596,7 @@ void thd_binlog_rollback_stmt(THD * thd) with the exception that here we write in buffer instead of log file. */ -int wsrep_write_cache(IO_CACHE *cache, uchar **buf, uint *buf_len) +int wsrep_write_cache(IO_CACHE *cache, uchar **buf, int *buf_len) { if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0)) @@ -604,7 +604,7 @@ int wsrep_write_cache(IO_CACHE *cache, uchar **buf, uint *buf_len) uint length= my_b_bytes_in_cache(cache); long long total_length = 0; uchar *buf_ptr = NULL; - + do { /* bail out if buffer grows too large @@ -614,7 +614,7 @@ int wsrep_write_cache(IO_CACHE *cache, uchar **buf, uint *buf_len) if (total_length > wsrep_max_ws_size) { WSREP_WARN("transaction size limit (%lld) exceeded: %lld", - wsrep_max_ws_size, total_length); + wsrep_max_ws_size, total_length); if (reinit_io_cache(cache, WRITE_CACHE, 0, 0, 0)) { WSREP_WARN("failed to initialize io-cache"); @@ -6017,21 +6017,27 @@ err: if (WSREP(thd) && wsrep_incremental_data_collection && (wsrep_emulate_bin_log || mysql_bin_log.is_open())) { - DBUG_ASSERT(thd->wsrep_trx_handle.trx_id != (unsigned long)-1); + DBUG_ASSERT(thd->wsrep_ws_handle.trx_id != (unsigned long)-1); if (!error) { IO_CACHE* cache= get_trans_log(thd); uchar* buf= NULL; - uint buf_len= 0; + int buf_len= 0; if (wsrep_emulate_bin_log) thd->binlog_flush_pending_rows_event(false); error= wsrep_write_cache(cache, &buf, &buf_len); if (!error && buf_len > 0) { + const struct wsrep_buf buff = { buf, buf_len }; + + const bool nocopy(false); + const bool unordered(false); + wsrep_status_t rc= wsrep->append_data(wsrep, - &thd->wsrep_trx_handle, - buf, buf_len); + &thd->wsrep_ws_handle, + &buff, 1, WSREP_DATA_ORDERED, + true); if (rc != WSREP_OK) { sql_print_warning("WSREP: append_data() returned %d", rc); diff --git a/sql/log.h b/sql/log.h index 964d75916a9..487005913c9 100644 --- a/sql/log.h +++ b/sql/log.h @@ -287,12 +287,6 @@ enum enum_log_state { LOG_OPENED, LOG_CLOSED, LOG_TO_BE_OPENED }; (mmap+fsync is two times faster than write+fsync) */ -#ifdef WITH_WSREP -extern my_bool wsrep_emulate_bin_log; -Log_event* wsrep_read_log_event( - char **arg_buf, size_t *arg_buf_len, - const Format_description_log_event *description_event); -#endif class MYSQL_LOG { public: @@ -974,7 +968,6 @@ bool wsrep_trans_cache_is_empty(THD *thd); void thd_binlog_flush_pending_rows_event(THD *thd, bool stmt_end); void thd_binlog_trx_reset(THD * thd); void thd_binlog_rollback_stmt(THD * thd); -int wsrep_write_cache(IO_CACHE *cache, uchar **buf, uint *buf_len); #define WSREP_FORMAT(my_format) \ ((wsrep_forced_binlog_format != BINLOG_FORMAT_UNSPEC) ? \ diff --git a/sql/log_event.cc b/sql/log_event.cc index a6b27caf631..450a80bc9b2 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -9211,7 +9211,7 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli) thd->is_fatal_error, thd->wsrep_exec_mode, thd->wsrep_conflict_state, - (long long)thd->wsrep_trx_seqno); + (long long)wsrep_thd_trx_seqno(thd)); } #endif if (thd->is_slave_error || thd->is_fatal_error) @@ -10976,7 +10976,7 @@ Write_rows_log_event::do_exec_row(const Relay_log_info *const rli) char info[64]; info[sizeof(info) - 1] = '\0'; snprintf(info, sizeof(info) - 1, "Write_rows_log_event::write_row(%lld)", - (long long) thd->wsrep_trx_seqno); + (long long) wsrep_thd_trx_seqno(thd)); const char* tmp = (WSREP(thd)) ? thd_proc_info(thd, info) : NULL; #else const char* tmp = (WSREP(thd)) ? @@ -11659,7 +11659,7 @@ int Delete_rows_log_event::do_exec_row(const Relay_log_info *const rli) char info[64]; info[sizeof(info) - 1] = '\0'; snprintf(info, sizeof(info) - 1, "Delete_rows_log_event::find_row(%lld)", - (long long) thd->wsrep_trx_seqno); + (long long) wsrep_thd_trx_seqno(thd)); const char* tmp = (WSREP(thd)) ? thd_proc_info(thd, info) : NULL; #else const char* tmp = (WSREP(thd)) ? @@ -11675,7 +11675,7 @@ int Delete_rows_log_event::do_exec_row(const Relay_log_info *const rli) #ifdef WSREP_PROC_INFO snprintf(info, sizeof(info) - 1, "Delete_rows_log_event::ha_delete_row(%lld)", - (long long) thd->wsrep_trx_seqno); + (long long) wsrep_thd_trx_seqno(thd)); if (WSREP(thd)) thd_proc_info(thd, info); #else if (WSREP(thd)) thd_proc_info(thd,"Delete_rows_log_event::ha_delete_row()"); @@ -11809,7 +11809,7 @@ Update_rows_log_event::do_exec_row(const Relay_log_info *const rli) char info[64]; info[sizeof(info) - 1] = '\0'; snprintf(info, sizeof(info) - 1, "Update_rows_log_event::find_row(%lld)", - (long long) thd->wsrep_trx_seqno); + (long long) wsrep_thd_trx_seqno(thd)); const char* tmp = (WSREP(thd)) ? thd_proc_info(thd, info) : NULL; #else const char* tmp = (WSREP(thd)) ? @@ -11846,7 +11846,7 @@ Update_rows_log_event::do_exec_row(const Relay_log_info *const rli) #ifdef WSREP_PROC_INFO snprintf(info, sizeof(info) - 1, "Update_rows_log_event::unpack_current_row(%lld)", - (long long) thd->wsrep_trx_seqno); + (long long) wsrep_thd_trx_seqno(thd)); if (WSREP(thd)) thd_proc_info(thd, info); #else if (WSREP(thd)) @@ -11875,7 +11875,7 @@ Update_rows_log_event::do_exec_row(const Relay_log_info *const rli) #ifdef WSREP_PROC_INFO snprintf(info, sizeof(info) - 1, "Update_rows_log_event::ha_update_row(%lld)", - (long long) thd->wsrep_trx_seqno); + (long long) wsrep_thd_trx_seqno(thd)); if (WSREP(thd)) thd_proc_info(thd, info); #else if (WSREP(thd)) thd_proc_info(thd,"Update_rows_log_event::ha_update_row()"); diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 1c9560ceb9f..a10dfa1015c 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -359,7 +359,11 @@ static char *default_character_set_name; static char *character_set_filesystem_name; static char *lc_messages; static char *lc_time_names_name; +#ifndef WITH_WSREP static char *my_bind_addr_str; +#else +char *my_bind_addr_str; +#endif /* WITH_WSREP */ static char *default_collation_name; char *default_storage_engine; static char compiled_default_collation_name[]= MYSQL_DEFAULT_COLLATION_NAME; diff --git a/sql/sp.cc b/sql/sp.cc index 73f59277cf0..ca8e41282cf 100644 --- a/sql/sp.cc +++ b/sql/sp.cc @@ -2255,7 +2255,7 @@ sp_load_for_information_schema(THD *thd, TABLE *proc_table, String *db, return sp; } #ifdef WITH_WSREP -int wsrep_create_sp(THD *thd, uchar** buf, uint* buf_len) +int wsrep_create_sp(THD *thd, uchar** buf, int* buf_len) { String log_query; sp_head *sp = thd->lex->sphead; diff --git a/sql/sql_class.cc b/sql/sql_class.cc index a0c24ffb3ca..49fb88f5866 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -869,9 +869,9 @@ extern "C" const char *wsrep_thd_conflict_state_str(THD *thd) (thd->wsrep_conflict_state == CERT_FAILURE) ? "cert failure" : "void"; } -extern "C" wsrep_trx_handle_t* wsrep_thd_trx_handle(THD *thd) +extern "C" wsrep_ws_handle_t* wsrep_thd_ws_handle(THD *thd) { - return &thd->wsrep_trx_handle; + return &thd->wsrep_ws_handle; } extern "C" void wsrep_thd_LOCK(THD *thd) @@ -896,7 +896,7 @@ extern "C" my_thread_id wsrep_thd_thread_id(THD *thd) } extern "C" wsrep_seqno_t wsrep_thd_trx_seqno(THD *thd) { - return (thd) ? thd->wsrep_trx_seqno : -1; + return (thd) ? thd->wsrep_trx_meta.gtid.seqno : -1; } extern "C" query_id_t wsrep_thd_query_id(THD *thd) { @@ -918,7 +918,6 @@ extern "C" void wsrep_thd_awake(THD* bf_thd, THD *thd, my_bool signal) { if (signal) { - thd->wsrep_bf_thd = bf_thd; mysql_mutex_lock(&thd->LOCK_thd_data); thd->awake(KILL_QUERY); mysql_mutex_unlock(&thd->LOCK_thd_data); @@ -934,16 +933,16 @@ extern "C" void wsrep_thd_awake(THD* bf_thd, THD *thd, my_bool signal) extern int wsrep_trx_order_before(void *thd1, void *thd2) { - if (((THD*)thd1)->wsrep_trx_seqno < ((THD*)thd2)->wsrep_trx_seqno) { - WSREP_DEBUG("BF conflict, order: %lld %lld\n", - (long long)((THD*)thd1)->wsrep_trx_seqno, - (long long)((THD*)thd2)->wsrep_trx_seqno); - return 1; - } - WSREP_DEBUG("waiting for BF, trx order: %lld %lld\n", - (long long)((THD*)thd1)->wsrep_trx_seqno, - (long long)((THD*)thd2)->wsrep_trx_seqno); - return 0; + if (wsrep_thd_trx_seqno((THD*)thd1) < wsrep_thd_trx_seqno((THD*)thd2)) { + WSREP_DEBUG("BF conflict, order: %lld %lld\n", + (long long)wsrep_thd_trx_seqno((THD*)thd1), + (long long)wsrep_thd_trx_seqno((THD*)thd2)); + return 1; + } + WSREP_DEBUG("waiting for BF, trx order: %lld %lld\n", + (long long)wsrep_thd_trx_seqno((THD*)thd1), + (long long)wsrep_thd_trx_seqno((THD*)thd2)); + return 0; } extern "C" int wsrep_trx_is_aborting(void *thd_ptr) @@ -1142,9 +1141,8 @@ THD::THD() #ifdef WITH_WSREP mysql_mutex_init(key_LOCK_wsrep_thd, &LOCK_wsrep_thd, MY_MUTEX_INIT_FAST); mysql_cond_init(key_COND_wsrep_thd, &COND_wsrep_thd, NULL); - wsrep_trx_handle.trx_id = WSREP_UNDEFINED_TRX_ID; - wsrep_trx_handle.opaque = NULL; - //wsrep_retry_autocommit= ::wsrep_retry_autocommit; + wsrep_ws_handle.trx_id = WSREP_UNDEFINED_TRX_ID; + wsrep_ws_handle.opaque = NULL; wsrep_retry_counter = 0; wsrep_PA_safe = true; wsrep_seqno_changed = false; @@ -1154,7 +1152,6 @@ THD::THD() wsrep_consistency_check = NO_CONSISTENCY_CHECK; wsrep_status_vars = 0; wsrep_mysql_replicated = 0; - wsrep_bf_thd = NULL; wsrep_TOI_pre_query = NULL; wsrep_TOI_pre_query_len = 0; #endif @@ -1549,7 +1546,8 @@ void THD::init(void) wsrep_conflict_state= NO_CONFLICT; wsrep_query_state= QUERY_IDLE; wsrep_last_query_id= 0; - wsrep_trx_seqno= 0; + wsrep_trx_meta.gtid= WSREP_GTID_UNDEFINED; + wsrep_trx_meta.depends_on= WSREP_SEQNO_UNDEFINED; wsrep_converted_lock_session= false; wsrep_retry_counter= 0; wsrep_rli= NULL; @@ -1557,7 +1555,7 @@ void THD::init(void) wsrep_seqno_changed= false; wsrep_consistency_check = NO_CONSISTENCY_CHECK; wsrep_mysql_replicated = 0; - wsrep_bf_thd = NULL; + wsrep_TOI_pre_query = NULL; wsrep_TOI_pre_query_len = 0; #endif @@ -1941,7 +1939,7 @@ void THD::awake(killed_state state_to_set) /* Interrupt target waiting inside a storage engine. */ if (state_to_set != NOT_KILLED) #ifdef WITH_WSREP - if (!wsrep_bf_thd || wsrep_bf_thd->wsrep_exec_mode == LOCAL_STATE) + /* TODO: prevent applier close here */ #endif /* WITH_WSREP */ ha_kill_query(this, thd_kill_level(this)); diff --git a/sql/sql_class.h b/sql/sql_class.h index a68b936307a..81d1a3d4a83 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -2553,11 +2553,13 @@ public: enum wsrep_conflict_state wsrep_conflict_state; mysql_mutex_t LOCK_wsrep_thd; mysql_cond_t COND_wsrep_thd; - wsrep_seqno_t wsrep_trx_seqno; + // changed from wsrep_seqno_t to wsrep_trx_meta_t in wsrep API rev 75 + // wsrep_seqno_t wsrep_trx_seqno; + wsrep_trx_meta_t wsrep_trx_meta; uint32 wsrep_rand; Relay_log_info* wsrep_rli; bool wsrep_converted_lock_session; - wsrep_trx_handle_t wsrep_trx_handle; + wsrep_ws_handle_t wsrep_ws_handle; bool wsrep_seqno_changed; #ifdef WSREP_PROC_INFO char wsrep_info[128]; /* string for dynamic proc info */ @@ -2571,9 +2573,8 @@ public: wsrep_consistency_check; wsrep_stats_var* wsrep_status_vars; int wsrep_mysql_replicated; - THD* wsrep_bf_thd; - const char* wsrep_TOI_pre_query; /* a query to apply before - the actual TOI query */ + const char* wsrep_TOI_pre_query; /* a query to apply before + the actual TOI query */ size_t wsrep_TOI_pre_query_len; #endif /* WITH_WSREP */ /** diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index 8f208ee66b1..a9687432849 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -953,7 +953,6 @@ bool do_command(THD *thd) else if (thd->wsrep_conflict_state == ABORTED) { thd->store_globals(); - thd->wsrep_bf_thd = NULL; } thd->wsrep_query_state= QUERY_EXEC; @@ -1241,7 +1240,6 @@ bool dispatch_command(enum enum_server_command command, THD *thd, thd->mysys_var->abort = 0; thd->wsrep_conflict_state = NO_CONFLICT; thd->wsrep_retry_counter = 0; - thd->wsrep_bf_thd = NULL; /* Increment threads running to compensate dec_thread_running() called after dispatch_end label. diff --git a/sql/sql_trigger.cc b/sql/sql_trigger.cc index 135207c1681..939541f913e 100644 --- a/sql/sql_trigger.cc +++ b/sql/sql_trigger.cc @@ -2484,7 +2484,7 @@ bool load_table_name_for_trigger(THD *thd, DBUG_RETURN(FALSE); } #ifdef WITH_WSREP -int wsrep_create_trigger_query(THD *thd, uchar** buf, uint* buf_len) +int wsrep_create_trigger_query(THD *thd, uchar** buf, int* buf_len) { LEX *lex= thd->lex; String stmt_query; @@ -2532,6 +2532,6 @@ int wsrep_create_trigger_query(THD *thd, uchar** buf, uint* buf_len) stmt_query.append(stmt_definition.str, stmt_definition.length); return wsrep_to_buf_helper(thd, stmt_query.c_ptr(), stmt_query.length(), - buf, buf_len); + buf, buf_len); } #endif /* WITH_WSREP */ diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index e1b0dc50c65..37e5f3315e3 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -4065,6 +4065,7 @@ static Sys_var_tz Sys_time_zone( #ifdef WITH_WSREP #include "wsrep_var.h" #include "wsrep_sst.h" +#include "wsrep_binlog.h" static Sys_var_charptr Sys_wsrep_provider( "wsrep_provider", "Path to replication provider library", @@ -4222,10 +4223,11 @@ static Sys_var_charptr Sys_wsrep_start_position ( ON_CHECK(wsrep_start_position_check), ON_UPDATE(wsrep_start_position_update)); -static Sys_var_ulonglong Sys_wsrep_max_ws_size ( +static Sys_var_ulong Sys_wsrep_max_ws_size ( "wsrep_max_ws_size", "Max write set size (bytes)", GLOBAL_VAR(wsrep_max_ws_size), CMD_LINE(REQUIRED_ARG), - VALID_RANGE(1024, 4294967296ULL), DEFAULT(1073741824ULL), BLOCK_SIZE(1)); + /* Upper limit is 65K short of 4G to avoid overlows on 32-bit systems */ + VALID_RANGE(1024, WSREP_MAX_WS_SIZE), DEFAULT(1073741824UL), BLOCK_SIZE(1)); static Sys_var_ulong Sys_wsrep_max_ws_rows ( "wsrep_max_ws_rows", "Max number of rows in write set", diff --git a/sql/wsrep_binlog.h b/sql/wsrep_binlog.h new file mode 100644 index 00000000000..6de73b2f5ee --- /dev/null +++ b/sql/wsrep_binlog.h @@ -0,0 +1,49 @@ +/* Copyright (C) 2013 Codership Oy <info@codership.com> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along + with this program; if not, write to the Free Software Foundation, Inc., + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ + +#ifndef WSREP_BINLOG_H +#define WSREP_BINLOG_H + +#include "sql_class.h" // THD, IO_CACHE + +#define HEAP_PAGE_SIZE 65536 /* 64K */ +#define WSREP_MAX_WS_SIZE (0xFFFFFFFFUL - HEAP_PAGE_SIZE) + +/* + Write the contents of a cache to a memory buffer. + + This function quite the same as MYSQL_BIN_LOG::write_cache(), + with the exception that here we write in buffer instead of log file. + */ +int wsrep_write_cache_buf(IO_CACHE *cache, uchar **buf, size_t *buf_len); + +/* + Write the contents of a cache to wsrep provider. + + This function quite the same as MYSQL_BIN_LOG::write_cache(), + with the exception that here we write in buffer instead of log file. + + @param len total amount of data written + @return wsrep error status + */ +int wsrep_write_cache (wsrep_t* wsrep, + THD* thd, + IO_CACHE* cache, + size_t* len); + +/* Dump replication buffer to disk */ +void wsrep_dump_rbr_buf(THD *thd, const void* rbr_buf, size_t buf_len); + +#endif /* WSREP_BINLOG_H */ diff --git a/sql/wsrep_hton.cc b/sql/wsrep_hton.cc index 5ac4a5fad12..d43e5408f61 100644 --- a/sql/wsrep_hton.cc +++ b/sql/wsrep_hton.cc @@ -18,7 +18,7 @@ #include "rpl_filter.h" #include <sql_class.h> #include "wsrep_mysqld.h" -#include "wsrep_priv.h" +#include "wsrep_binlog.h" #include <cstdio> #include <cstdlib> @@ -26,10 +26,11 @@ extern handlerton *binlog_hton; extern int binlog_close_connection(handlerton *hton, THD *thd); extern ulonglong thd_to_trx_id(THD *thd); -extern "C" int thd_binlog_format(const MYSQL_THD thd); -// todo: share interface with ha_innodb.c +extern "C" int thd_binlog_format(const MYSQL_THD thd); +// todo: share interface with ha_innodb.c -enum wsrep_trx_status wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all); +enum wsrep_trx_status wsrep_run_wsrep_commit(THD *thd, handlerton *hton, + bool all); /* a post-commit cleanup on behalf of wsrep. Can't be a part of hton struct. @@ -45,7 +46,7 @@ void wsrep_cleanup_transaction(THD *thd) { if (thd->wsrep_seqno_changed) { - if (wsrep->post_commit(wsrep, &thd->wsrep_trx_handle)) + if (wsrep->post_commit(wsrep, &thd->wsrep_ws_handle)) { DBUG_PRINT("wsrep", ("set committed fail")); WSREP_WARN("set committed fail: %llu %d", @@ -59,7 +60,7 @@ void wsrep_cleanup_transaction(THD *thd) } thd->wsrep_exec_mode= LOCAL_STATE; } - thd->wsrep_trx_handle.trx_id = WSREP_UNDEFINED_TRX_ID; + thd->wsrep_ws_handle.trx_id = WSREP_UNDEFINED_TRX_ID; } /* @@ -87,7 +88,7 @@ void wsrep_register_hton(THD* thd, bool all) } /* - wsrep exploits binlog's caches even if binlogging itself is not + wsrep exploits binlog's caches even if binlogging itself is not activated. In such case connection close needs calling actual binlog's method. Todo: split binlog hton from its caches to use ones by wsrep @@ -100,7 +101,7 @@ wsrep_close_connection(handlerton* hton, THD* thd) if (thd_get_ha_data(thd, binlog_hton) != NULL) binlog_hton->close_connection (binlog_hton, thd); DBUG_RETURN(0); -} +} /* prepare/wsrep_run_wsrep_commit can fail in two ways @@ -117,7 +118,7 @@ static int wsrep_prepare(handlerton *hton, THD *thd, bool all) //wsrep_seqno_t old = thd->wsrep_trx_seqno; #endif DBUG_ENTER("wsrep_prepare"); - if ((all || + if ((all || !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) && (thd->variables.wsrep_on && !wsrep_trans_cache_is_empty(thd))) { @@ -156,16 +157,16 @@ static int wsrep_rollback(handlerton *hton, THD *thd, bool all) if ((all || !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) && (thd->variables.wsrep_on && thd->wsrep_conflict_state != MUST_REPLAY)) { - if (wsrep->post_rollback(wsrep, &thd->wsrep_trx_handle)) + if (wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle)) { DBUG_PRINT("wsrep", ("setting rollback fail")); - WSREP_ERROR("settting rollback fail: thd: %llu SQL: %s", + WSREP_ERROR("settting rollback fail: thd: %llu SQL: %s", (long long)thd->real_id, thd->query()); } } int rcode = 0; - if (!wsrep_emulate_bin_log) + if (!wsrep_emulate_bin_log) { if (all) thd_binlog_trx_reset(thd); } @@ -183,14 +184,12 @@ int wsrep_commit(handlerton *hton, THD *thd, bool all) extern Rpl_filter* binlog_filter; extern my_bool opt_log_slave_updates; -extern void wsrep_write_rbr_buf(THD *thd, const void* rbr_buf, size_t buf_len); + enum wsrep_trx_status -wsrep_run_wsrep_commit( - THD *thd, handlerton *hton, bool all) +wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all) { - int rcode = -1; - uint data_len = 0; - uchar *rbr_data = NULL; + int rcode= -1; + int data_len = 0; IO_CACHE *cache; int replay_round= 0; @@ -200,9 +199,9 @@ wsrep_run_wsrep_commit( } DBUG_ENTER("wsrep_run_wsrep_commit"); - if (thd->slave_thread && !opt_log_slave_updates) { - DBUG_RETURN(WSREP_TRX_OK); - } + + if (thd->slave_thread && !opt_log_slave_updates) DBUG_RETURN(WSREP_TRX_OK); + if (thd->wsrep_exec_mode == REPL_RECV) { mysql_mutex_lock(&thd->LOCK_wsrep_thd); @@ -220,9 +219,9 @@ wsrep_run_wsrep_commit( } mysql_mutex_unlock(&thd->LOCK_wsrep_thd); } - if (thd->wsrep_exec_mode != LOCAL_STATE) { - DBUG_RETURN(WSREP_TRX_OK); - } + + if (thd->wsrep_exec_mode != LOCAL_STATE) DBUG_RETURN(WSREP_TRX_OK); + if (thd->wsrep_consistency_check == CONSISTENCY_CHECK_RUNNING) { WSREP_DEBUG("commit for consistency check: %s", thd->query()); DBUG_RETURN(WSREP_TRX_OK); @@ -244,10 +243,9 @@ wsrep_run_wsrep_commit( mysql_mutex_lock(&LOCK_wsrep_replaying); - while (wsrep_replaying > 0 && + while (wsrep_replaying > 0 && thd->wsrep_conflict_state == NO_CONFLICT && - thd->killed == NOT_KILLED && - !shutdown_in_progress) + !shutdown_in_progress) { mysql_mutex_unlock(&LOCK_wsrep_replaying); @@ -265,9 +263,12 @@ wsrep_run_wsrep_commit( struct timespec wtime = {0, 1000000}; mysql_cond_timedwait(&COND_wsrep_replaying, &LOCK_wsrep_replaying, &wtime); + if (replay_round++ % 100000 == 0) - WSREP_DEBUG("commit waiting for replaying: replayers %d, thd: (%lu) conflict: %d (round: %d)", - wsrep_replaying, thd->thread_id, thd->wsrep_conflict_state, replay_round); + WSREP_DEBUG("commit waiting for replaying: replayers %d, thd: (%lu) " + "conflict: %d (round: %d)", + wsrep_replaying, thd->thread_id, + thd->wsrep_conflict_state, replay_round); mysql_mutex_unlock(&LOCK_wsrep_replaying); @@ -288,7 +289,8 @@ wsrep_run_wsrep_commit( WSREP_DEBUG("innobase_commit abort after replaying wait %s", (thd->query()) ? thd->query() : "void"); DBUG_RETURN(WSREP_TRX_ROLLBACK); - } + } + thd->wsrep_query_state = QUERY_COMMITTING; mysql_mutex_unlock(&thd->LOCK_wsrep_thd); @@ -296,31 +298,31 @@ wsrep_run_wsrep_commit( rcode = 0; if (cache) { thd->binlog_flush_pending_rows_event(true); - rcode = wsrep_write_cache(cache, &rbr_data, &data_len); - if (rcode) { - WSREP_ERROR("rbr write fail, data_len: %d, %d", data_len, rcode); - if (data_len) my_free(rbr_data); + rcode = wsrep_write_cache(wsrep, thd, cache, (size_t*)&data_len); + if (WSREP_OK != rcode) { + WSREP_ERROR("rbr write fail, data_len: %zu, %d", data_len, rcode); DBUG_RETURN(WSREP_TRX_ROLLBACK); } } - if (data_len == 0) + + if (data_len == 0) { mysql_mutex_lock(&thd->LOCK_wsrep_thd); thd->wsrep_exec_mode = LOCAL_COMMIT; mysql_mutex_unlock(&thd->LOCK_wsrep_thd); - if (thd->get_stmt_da()->is_ok() && + if (thd->get_stmt_da()->is_ok() && thd->get_stmt_da()->affected_rows() > 0 && !binlog_filter->is_on()) { WSREP_DEBUG("empty rbr buffer, query: %s, " - "affected rows: %llu, " - "changed tables: %d, " + "affected rows: %llu, " + "changed tables: %d, " "sql_log_bin: %d, " "wsrep status (%d %d %d)", thd->query(), thd->get_stmt_da()->affected_rows(), stmt_has_updated_trans_table(thd), thd->variables.sql_log_bin, - thd->wsrep_exec_mode, thd->wsrep_query_state, - thd->wsrep_conflict_state); + thd->wsrep_exec_mode, thd->wsrep_query_state, + thd->wsrep_conflict_state); } else { @@ -329,33 +331,31 @@ wsrep_run_wsrep_commit( thd->wsrep_query_state= QUERY_EXEC; DBUG_RETURN(WSREP_TRX_OK); } - if (WSREP_UNDEFINED_TRX_ID == thd->wsrep_trx_handle.trx_id) + + if (WSREP_UNDEFINED_TRX_ID == thd->wsrep_ws_handle.trx_id) { - WSREP_WARN("SQL statement was ineffective, THD: %lu, buf: %d\n" - "QUERY: %s\n" - " => Skipping replication", - thd->thread_id, data_len, thd->query()); - if (wsrep_debug) - { - wsrep_write_rbr_buf(thd, rbr_data, data_len); - } + WSREP_WARN("SQL statement was ineffective, THD: %lu, buf: %zu\n" + "QUERY: %s\n" + " => Skipping replication", + thd->thread_id, data_len, thd->query()); rcode = WSREP_TRX_FAIL; } else if (!rcode) { - rcode = wsrep->pre_commit( - wsrep, - (wsrep_conn_id_t)thd->thread_id, - &thd->wsrep_trx_handle, - rbr_data, - data_len, - (thd->wsrep_PA_safe) ? WSREP_FLAG_PA_SAFE : 0ULL, - &thd->wsrep_trx_seqno); - switch (rcode) { + if (WSREP_OK == rcode) + rcode = wsrep->pre_commit(wsrep, + (wsrep_conn_id_t)thd->thread_id, + &thd->wsrep_ws_handle, + WSREP_FLAG_COMMIT | + ((thd->wsrep_PA_safe) ? + 0ULL : WSREP_FLAG_PA_UNSAFE), + &thd->wsrep_trx_meta); + + switch (rcode) + { case WSREP_TRX_MISSING: WSREP_WARN("Transaction missing in provider, thd: %ld, SQL: %s", thd->thread_id, thd->query()); - wsrep_write_rbr_buf(thd, rbr_data, data_len); rcode = WSREP_OK; break; case WSREP_BF_ABORT: @@ -375,15 +375,10 @@ wsrep_run_wsrep_commit( } else { WSREP_ERROR("I/O error reading from thd's binlog iocache: " "errno=%d, io cache code=%d", my_errno, cache->error); - if (data_len) my_free(rbr_data); DBUG_ASSERT(0); // failure like this can not normally happen DBUG_RETURN(WSREP_TRX_ERROR); } - if (data_len) { - my_free(rbr_data); - } - mysql_mutex_lock(&thd->LOCK_wsrep_thd); switch(rcode) { case 0: @@ -392,8 +387,8 @@ wsrep_run_wsrep_commit( if (thd->transaction.xid_state.xid.get_my_xid()) { wsrep_xid_init(&thd->transaction.xid_state.xid, - wsrep_cluster_uuid(), - thd->wsrep_trx_seqno); + &thd->wsrep_trx_meta.gtid.uuid, + thd->wsrep_trx_meta.gtid.seqno); } DBUG_PRINT("wsrep", ("replicating commit success")); @@ -407,7 +402,7 @@ wsrep_run_wsrep_commit( if (thd->wsrep_conflict_state == MUST_ABORT) { thd->wsrep_conflict_state= ABORTED; - } + } else { WSREP_DEBUG("conflict state: %d", thd->wsrep_conflict_state); @@ -465,14 +460,15 @@ mysql_declare_plugin(wsrep) &wsrep_storage_engine, "wsrep", "Codership Oy", - "A pseudo storage engine to represent transactions in multi-master synchornous replication", + "A pseudo storage engine to represent transactions in multi-master " + "synchornous replication", PLUGIN_LICENSE_GPL, wsrep_hton_init, /* Plugin Init */ NULL, /* Plugin Deinit */ 0x0100 /* 1.0 */, NULL, /* status variables */ NULL, /* system variables */ - NULL, /* config options */ + NULL, /* config options */ 0, /* flags */ } mysql_declare_plugin_end; diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc index af8b025cfd7..6cbbfc47ad3 100644 --- a/sql/wsrep_mysqld.cc +++ b/sql/wsrep_mysqld.cc @@ -21,6 +21,7 @@ #include "wsrep_sst.h" #include "wsrep_utils.h" #include "wsrep_var.h" +#include "wsrep_binlog.h" #include <cstdio> #include <cstdlib> #include "log_event.h" @@ -37,34 +38,32 @@ const char* wsrep_data_home_dir = NULL; const char* wsrep_dbug_option = ""; long wsrep_slave_threads = 1; // # of slave action appliers wanted +int wsrep_slave_count_change = 0; // # of appliers to stop or start my_bool wsrep_debug = 0; // enable debug level logging my_bool wsrep_convert_LOCK_to_trx = 1; // convert locking sessions to trx ulong wsrep_retry_autocommit = 5; // retry aborted autocommit trx my_bool wsrep_auto_increment_control = 1; // control auto increment variables my_bool wsrep_drupal_282555_workaround = 1; // retry autoinc insert after dupkey my_bool wsrep_incremental_data_collection = 0; // incremental data collection -long long wsrep_max_ws_size = 1073741824LL; //max ws (RBR buffer) size -long wsrep_max_ws_rows = 65536; // max number of rows in ws +ulong wsrep_max_ws_size = 1073741824UL;//max ws (RBR buffer) size +ulong wsrep_max_ws_rows = 65536; // max number of rows in ws int wsrep_to_isolation = 0; // # of active TO isolation threads my_bool wsrep_certify_nonPK = 1; // certify, even when no primary key long wsrep_max_protocol_version = 2; // maximum protocol version to use ulong wsrep_forced_binlog_format = BINLOG_FORMAT_UNSPEC; my_bool wsrep_recovery = 0; // recovery my_bool wsrep_replicate_myisam = 0; // enable myisam replication -my_bool wsrep_log_conflicts = 0; // +my_bool wsrep_log_conflicts = 0; ulong wsrep_mysql_replication_bundle = 0; +my_bool wsrep_desync = 0; // desynchronize the node from the + // cluster my_bool wsrep_load_data_splitting = 1; // commit load data every 10K intervals -my_bool wsrep_desync = 0; // desynchronize the node from the cluster /* * End configuration options */ static const wsrep_uuid_t cluster_uuid = WSREP_UUID_UNDEFINED; -const wsrep_uuid_t* wsrep_cluster_uuid() -{ - return &cluster_uuid; -} static char cluster_uuid_str[40]= { 0, }; static const char* cluster_status_str[WSREP_VIEW_MAX] = { @@ -102,7 +101,18 @@ long wsrep_protocol_version = 2; // if there was no state gap on receiving first view event. static my_bool wsrep_startup = TRUE; -/* wsrep callbacks */ +extern wsrep_cb_status_t wsrep_apply_cb(void *ctx, + const void* buf, size_t buf_len, + const wsrep_trx_meta_t* meta); + +extern wsrep_cb_status_t wsrep_commit_cb(void *ctx, + const wsrep_trx_meta_t* meta, + wsrep_bool_t *exit, + wsrep_bool_t commit); + +extern wsrep_cb_status_t wsrep_unordered_cb(void* ctx, + const void* data, + size_t size); static void wsrep_log_cb(wsrep_log_level_t level, const char *msg) { switch (level) { @@ -186,19 +196,22 @@ void wsrep_get_SE_checkpoint(XID* xid) plugin_foreach(NULL, get_SE_checkpoint, MYSQL_STORAGE_ENGINE_PLUGIN, xid); } -static void wsrep_view_handler_cb (void* app_ctx, - void* recv_ctx, - const wsrep_view_info_t* view, - const char* state, - size_t state_len, - void** sst_req, - ssize_t* sst_req_len) +static wsrep_cb_status_t +wsrep_view_handler_cb (void* app_ctx, + void* recv_ctx, + const wsrep_view_info_t* view, + const char* state, + size_t state_len, + void** sst_req, + size_t* sst_req_len) { wsrep_member_status_t new_status= local_status.get(); - if (memcmp(&cluster_uuid, &view->uuid, sizeof(wsrep_uuid_t))) + if (memcmp(&cluster_uuid, &view->state_id.uuid, sizeof(wsrep_uuid_t))) { - memcpy((wsrep_uuid_t*)&cluster_uuid, &view->uuid, sizeof(cluster_uuid)); + memcpy((wsrep_uuid_t*)&cluster_uuid, &view->state_id.uuid, + sizeof(cluster_uuid)); + wsrep_uuid_print (&cluster_uuid, cluster_uuid_str, sizeof(cluster_uuid_str)); } @@ -210,7 +223,7 @@ static void wsrep_view_handler_cb (void* app_ctx, WSREP_INFO("New cluster view: global state: %s:%lld, view# %lld: %s, " "number of nodes: %ld, my index: %ld, protocol version %d", - wsrep_cluster_state_uuid, (long long)view->seqno, + wsrep_cluster_state_uuid, (long long)view->state_id.seqno, (long long)wsrep_cluster_conf_id, wsrep_cluster_status, wsrep_cluster_size, wsrep_local_index, view->proto_ver); @@ -290,14 +303,14 @@ static void wsrep_view_handler_cb (void* app_ctx, { wsrep_SE_init_grab(); // Signal mysqld init thread to continue - wsrep_sst_complete (&cluster_uuid, view->seqno, false); + wsrep_sst_complete (&cluster_uuid, view->state_id.seqno, false); // and wait for SE initialization wsrep_SE_init_wait(); } else { local_uuid= cluster_uuid; - local_seqno= view->seqno; + local_seqno= view->state_id.seqno; } /* Init storage engine XIDs from first view */ XID xid; @@ -310,7 +323,7 @@ static void wsrep_view_handler_cb (void* app_ctx, if (memcmp (&local_uuid, &cluster_uuid, sizeof (wsrep_uuid_t))) { WSREP_ERROR("Undetected state gap. Can't continue."); - wsrep_log_states(WSREP_LOG_FATAL, &cluster_uuid, view->seqno, + wsrep_log_states(WSREP_LOG_FATAL, &cluster_uuid, view->state_id.seqno, &local_uuid, -1); unireg_abort(1); } @@ -322,9 +335,23 @@ static void wsrep_view_handler_cb (void* app_ctx, global_system_variables.auto_increment_increment= view->memb_num; } + { /* capabilities may be updated on new configuration */ + uint64_t const caps(wsrep->capabilities (wsrep)); + + my_bool const idc((caps & WSREP_CAP_INCREMENTAL_WRITESET) != 0); + if (TRUE == wsrep_incremental_data_collection && FALSE == idc) + { + WSREP_WARN("Unsupported protocol downgrade: " + "incremental data collection disabled. Expect abort."); + } + wsrep_incremental_data_collection = idc; + } + out: wsrep_startup= FALSE; local_status.set(new_status, view); + + return WSREP_CB_SUCCESS; } void wsrep_ready_set (my_bool x) @@ -407,6 +434,8 @@ static void wsrep_init_position() } } +extern const char* my_bind_addr_str; + int wsrep_init() { int rcode= -1; @@ -460,7 +489,7 @@ int wsrep_init() size_t const node_addr_max= sizeof(node_addr) - 1; if (!wsrep_node_address || !strcmp(wsrep_node_address, "")) { - size_t const ret= guess_ip(node_addr, node_addr_max); + size_t const ret= wsrep_guess_ip(node_addr, node_addr_max); if (!(ret > 0 && ret < node_addr_max)) { WSREP_WARN("Failed to guess base node address. Set it explicitly via " @@ -478,38 +507,57 @@ int wsrep_init() if ((!wsrep_node_incoming_address || !strcmp (wsrep_node_incoming_address, WSREP_NODE_INCOMING_AUTO))) { - size_t const node_addr_len= strlen(node_addr); - if (node_addr_len > 0) + unsigned int my_bind_ip= INADDR_ANY; // default if not set + if (my_bind_addr_str && strlen(my_bind_addr_str)) { - const char* const colon= strrchr(node_addr, ':'); - if (strchr(node_addr, ':') == colon) // 1 or 0 ':' + my_bind_ip= wsrep_check_ip(my_bind_addr_str); + } + + if (INADDR_ANY != my_bind_ip) + { + if (INADDR_NONE != my_bind_ip && INADDR_LOOPBACK != my_bind_ip) { - size_t const ip_len= colon ? colon - node_addr : node_addr_len; - if (ip_len + 7 /* :55555\0 */ < inc_addr_max) + snprintf(inc_addr, inc_addr_max, "%s:%u", + my_bind_addr_str, (int)mysqld_port); + } // else leave inc_addr an empty string - mysqld is not listening for + // client connections on network interfaces. + } + else // mysqld binds to 0.0.0.0, take IP from wsrep_node_address if possible + { + size_t const node_addr_len= strlen(node_addr); + if (node_addr_len > 0) + { + const char* const colon= strrchr(node_addr, ':'); + if (strchr(node_addr, ':') == colon) // 1 or 0 ':' { - memcpy (inc_addr, node_addr, ip_len); - snprintf(inc_addr + ip_len, inc_addr_max - ip_len, ":%u",mysqld_port); + size_t const ip_len= colon ? colon - node_addr : node_addr_len; + if (ip_len + 7 /* :55555\0 */ < inc_addr_max) + { + memcpy (inc_addr, node_addr, ip_len); + snprintf(inc_addr + ip_len, inc_addr_max - ip_len, ":%u", + (int)mysqld_port); + } + else + { + WSREP_WARN("Guessing address for incoming client connections: " + "address too long."); + inc_addr[0]= '\0'; + } } else { WSREP_WARN("Guessing address for incoming client connections: " - "address too long."); + "too many colons :) ."); inc_addr[0]= '\0'; } } - else + + if (!strlen(inc_addr)) { - WSREP_WARN("Guessing address for incoming client connections: " - "too many colons :) ."); - inc_addr[0]= '\0'; + WSREP_WARN("Guessing address for incoming client connections failed. " + "Try setting wsrep_node_incoming_address explicitly."); } } - - if (!strlen(inc_addr)) - { - WSREP_WARN("Guessing address for incoming client connections failed. " - "Try setting wsrep_node_incoming_address explicitly."); - } } else if (!strchr(wsrep_node_incoming_address, ':')) // no port included { @@ -536,6 +584,8 @@ int wsrep_init() struct wsrep_init_args wsrep_args; + struct wsrep_gtid const state_id = { local_uuid, local_seqno }; + wsrep_args.data_dir = wsrep_data_home_dir; wsrep_args.node_name = (wsrep_node_name) ? wsrep_node_name : ""; wsrep_args.node_address = node_addr; @@ -544,13 +594,13 @@ int wsrep_init() wsrep_provider_options : ""; wsrep_args.proto_ver = wsrep_max_protocol_version; - wsrep_args.state_uuid = &local_uuid; - wsrep_args.state_seqno = local_seqno; + wsrep_args.state_id = &state_id; wsrep_args.logger_cb = wsrep_log_cb; wsrep_args.view_handler_cb = wsrep_view_handler_cb; wsrep_args.apply_cb = wsrep_apply_cb; wsrep_args.commit_cb = wsrep_commit_cb; + wsrep_args.unordered_cb = wsrep_unordered_cb; wsrep_args.sst_donate_cb = wsrep_sst_donate_cb; wsrep_args.synced_cb = wsrep_synced_cb; @@ -679,6 +729,7 @@ bool wsrep_start_replication() */ const char* cluster_address = wsrep_new_cluster ? "bootstrap" : wsrep_cluster_address; + bool const bootstrap(TRUE == wsrep_new_cluster); wsrep_new_cluster= FALSE; WSREP_INFO("Start replication"); @@ -686,7 +737,8 @@ bool wsrep_start_replication() if ((rcode = wsrep->connect(wsrep, wsrep_cluster_name, cluster_address, - wsrep_sst_donor))) + wsrep_sst_donor, + bootstrap))) { if (-ESOCKTNOSUPPORT == rcode) { @@ -707,11 +759,6 @@ bool wsrep_start_replication() { wsrep_connected= TRUE; - uint64_t caps = wsrep->capabilities (wsrep); - - wsrep_incremental_data_collection = - !!(caps & WSREP_CAP_WRITE_SET_INCREMENTS); - char* opts= wsrep->options_get(wsrep); if (opts) { @@ -736,8 +783,8 @@ wsrep_causal_wait (THD* thd) { // This allows autocommit SELECTs and a first SELECT after SET AUTOCOMMIT=0 // TODO: modify to check if thd has locked any rows. - wsrep_seqno_t seqno; - wsrep_status_t ret= wsrep->causal_read (wsrep, &seqno); + wsrep_gtid_t gtid; + wsrep_status_t ret= wsrep->causal_read (wsrep, >id); if (unlikely(WSREP_OK != ret)) { @@ -785,7 +832,7 @@ static void wsrep_keys_free(wsrep_key_arr_t* key_arr) { for (size_t i= 0; i < key_arr->keys_len; ++i) { - my_free((wsrep_key_part_t*)key_arr->keys[i].key_parts); + my_free((void*)key_arr->keys[i].key_parts); } my_free(key_arr->keys); key_arr->keys= 0; @@ -805,7 +852,7 @@ static void wsrep_keys_free(wsrep_key_arr_t* key_arr) static bool wsrep_prepare_key_for_isolation(const char* db, const char* table, - wsrep_key_part_t* key, + wsrep_buf_t* key, size_t* key_len) { if (*key_len < 2) return false; @@ -824,13 +871,13 @@ static bool wsrep_prepare_key_for_isolation(const char* db, // sql_print_information("%s.%s", db, table); if (db) { - key[*key_len].buf= db; - key[*key_len].buf_len= strlen(db); + key[*key_len].ptr= db; + key[*key_len].len= strlen(db); ++(*key_len); if (table) { - key[*key_len].buf= table; - key[*key_len].buf_len= strlen(table); + key[*key_len].ptr= table; + key[*key_len].len= strlen(table); ++(*key_len); } } @@ -863,23 +910,23 @@ static bool wsrep_prepare_keys_for_isolation(THD* thd, { if (!(ka->keys= (wsrep_key_t*)my_malloc(sizeof(wsrep_key_t), MYF(0)))) { - sql_print_error("Can't allocate memory for key_array"); + WSREP_ERROR("Can't allocate memory for key_array"); goto err; } ka->keys_len= 1; - if (!(ka->keys[0].key_parts= (wsrep_key_part_t*) - my_malloc(sizeof(wsrep_key_part_t)*2, MYF(0)))) + if (!(ka->keys[0].key_parts= (wsrep_buf_t*) + my_malloc(sizeof(wsrep_buf_t)*2, MYF(0)))) { - sql_print_error("Can't allocate memory for key_parts"); + WSREP_ERROR("Can't allocate memory for key_parts"); goto err; } - ka->keys[0].key_parts_len= 2; + ka->keys[0].key_parts_num= 2; if (!wsrep_prepare_key_for_isolation( db, table, - (wsrep_key_part_t*)ka->keys[0].key_parts, - &ka->keys[0].key_parts_len)) + (wsrep_buf_t*)ka->keys[0].key_parts, + &ka->keys[0].key_parts_num)) { - sql_print_error("Preparing keys for isolation failed"); + WSREP_ERROR("Preparing keys for isolation failed"); goto err; } } @@ -895,24 +942,24 @@ static bool wsrep_prepare_keys_for_isolation(THD* thd, MYF(MY_ALLOW_ZERO_PTR)); if (!tmp) { - sql_print_error("Can't allocate memory for key_array"); + WSREP_ERROR("Can't allocate memory for key_array"); goto err; } ka->keys= tmp; - if (!(ka->keys[ka->keys_len].key_parts= (wsrep_key_part_t*) - my_malloc(sizeof(wsrep_key_part_t)*2, MYF(0)))) + if (!(ka->keys[ka->keys_len].key_parts= (wsrep_buf_t*) + my_malloc(sizeof(wsrep_buf_t)*2, MYF(0)))) { - sql_print_error("Can't allocate memory for key_parts"); + WSREP_ERROR("Can't allocate memory for key_parts"); goto err; } - ka->keys[ka->keys_len].key_parts_len= 2; + ka->keys[ka->keys_len].key_parts_num= 2; ++ka->keys_len; if (!wsrep_prepare_key_for_isolation( table->db, table->table_name, - (wsrep_key_part_t*)ka->keys[ka->keys_len - 1].key_parts, - &ka->keys[ka->keys_len - 1].key_parts_len)) + (wsrep_buf_t*)ka->keys[ka->keys_len - 1].key_parts, + &ka->keys[ka->keys_len - 1].key_parts_num)) { - sql_print_error("Preparing keys for isolation failed"); + WSREP_ERROR("Preparing keys for isolation failed"); goto err; } } @@ -929,7 +976,7 @@ bool wsrep_prepare_key_for_innodb(const uchar* cache_key, size_t cache_key_len, const uchar* row_id, size_t row_id_len, - wsrep_key_part_t* key, + wsrep_buf_t* key, size_t* key_len) { if (*key_len < 3) return false; @@ -939,28 +986,30 @@ bool wsrep_prepare_key_for_innodb(const uchar* cache_key, { case 0: { - key[*key_len].buf = cache_key; - key[*key_len].buf_len = cache_key_len; - ++(*key_len); + key[0].ptr = cache_key; + key[0].len = cache_key_len; + + *key_len = 1; break; } case 1: case 2: { - key[*key_len].buf = cache_key; - key[*key_len].buf_len = strlen( (char*)cache_key ); - ++(*key_len); - key[*key_len].buf = cache_key + strlen( (char*)cache_key ) + 1; - key[*key_len].buf_len = strlen( (char*)(key[*key_len].buf) ); - ++(*key_len); + key[0].ptr = cache_key; + key[0].len = strlen( (char*)cache_key ); + + key[1].ptr = cache_key + strlen( (char*)cache_key ) + 1; + key[1].len = strlen( (char*)(key[1].ptr) ); + + *key_len = 2; break; } default: return false; } - key[*key_len].buf = row_id; - key[*key_len].buf_len = row_id_len; + key[*key_len].ptr = row_id; + key[*key_len].len = row_id_len; ++(*key_len); return true; @@ -973,7 +1022,7 @@ bool wsrep_prepare_key_for_innodb(const uchar* cache_key, * Return 0 in case of success, 1 in case of error. */ int wsrep_to_buf_helper( - THD* thd, const char *query, uint query_len, uchar** buf, uint* buf_len) + THD* thd, const char *query, uint query_len, uchar** buf, int* buf_len) { IO_CACHE tmp_io_cache; if (open_cached_file(&tmp_io_cache, mysql_tmpdir, TEMP_PREFIX, @@ -985,8 +1034,8 @@ int wsrep_to_buf_helper( if (thd->wsrep_TOI_pre_query) { Query_log_event ev(thd, thd->wsrep_TOI_pre_query, - thd->wsrep_TOI_pre_query_len, - FALSE, FALSE, FALSE, 0); + thd->wsrep_TOI_pre_query_len, + FALSE, FALSE, FALSE, 0); if (ev.write(&tmp_io_cache)) ret= 1; } @@ -994,7 +1043,7 @@ int wsrep_to_buf_helper( Query_log_event ev(thd, query, query_len, FALSE, FALSE, FALSE, 0); if (ev.write(&tmp_io_cache)) ret= 1; - if (!ret && wsrep_write_cache(&tmp_io_cache, buf, buf_len)) ret= 1; + if (!ret && wsrep_write_cache_buf(&tmp_io_cache, buf, (size_t*)buf_len)) ret= 1; close_cached_file(&tmp_io_cache); return ret; @@ -1002,7 +1051,7 @@ int wsrep_to_buf_helper( #include "sql_show.h" static int -create_view_query(THD *thd, uchar** buf, uint* buf_len) +create_view_query(THD *thd, uchar** buf, int* buf_len) { LEX *lex= thd->lex; SELECT_LEX *select_lex= &lex->select_lex; @@ -1021,15 +1070,15 @@ create_view_query(THD *thd, uchar** buf, uint* buf_len) if (!lex->definer) { /* - DEFINER-clause is missing; we have to create default definer in - persistent arena to be PS/SP friendly. - If this is an ALTER VIEW then the current user should be set as - the definer. + DEFINER-clause is missing; we have to create default definer in + persistent arena to be PS/SP friendly. + If this is an ALTER VIEW then the current user should be set as + the definer. */ if (!(lex->definer= create_default_definer(thd))) { - WSREP_WARN("view default definer issue"); + WSREP_WARN("view default definer issue"); } } @@ -1056,7 +1105,7 @@ create_view_query(THD *thd, uchar** buf, uint* buf_len) List_iterator_fast<LEX_STRING> names(lex->view_list); LEX_STRING *name; int i; - + for (i= 0; (name= names++); i++) { buff.append(i ? ", " : "("); @@ -1079,11 +1128,11 @@ static int wsrep_TOI_begin(THD *thd, char *db_, char *table_, { wsrep_status_t ret(WSREP_WARNING); uchar* buf(0); - uint buf_len(0); + int buf_len(0); int buf_err; - WSREP_DEBUG("TO BEGIN: %lld, %d : %s", (long long)thd->wsrep_trx_seqno, - thd->wsrep_exec_mode, thd->query() ); + WSREP_DEBUG("TO BEGIN: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd), + thd->wsrep_exec_mode, thd->query() ); switch (thd->lex->sql_command) { case SQLCOM_CREATE_VIEW: @@ -1106,19 +1155,20 @@ static int wsrep_TOI_begin(THD *thd, char *db_, char *table_, } wsrep_key_arr_t key_arr= {0, 0}; + struct wsrep_buf buff = { buf, buf_len }; if (!buf_err && wsrep_prepare_keys_for_isolation(thd, db_, table_, table_list, &key_arr)&& WSREP_OK == (ret = wsrep->to_execute_start(wsrep, thd->thread_id, key_arr.keys, key_arr.keys_len, - buf, buf_len, - &thd->wsrep_trx_seqno))) + &buff, 1, + &thd->wsrep_trx_meta))) { thd->wsrep_exec_mode= TOTAL_ORDER; wsrep_to_isolation++; if (buf) my_free(buf); wsrep_keys_free(&key_arr); - WSREP_DEBUG("TO BEGIN: %lld, %d",(long long)thd->wsrep_trx_seqno, - thd->wsrep_exec_mode); + WSREP_DEBUG("TO BEGIN: %lld, %d",(long long)wsrep_thd_trx_seqno(thd), + thd->wsrep_exec_mode); } else { /* jump to error handler in mysql_execute_command() */ @@ -1137,10 +1187,10 @@ static int wsrep_TOI_begin(THD *thd, char *db_, char *table_, static void wsrep_TOI_end(THD *thd) { wsrep_status_t ret; wsrep_to_isolation--; - WSREP_DEBUG("TO END: %lld, %d : %s", (long long)thd->wsrep_trx_seqno, + WSREP_DEBUG("TO END: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd), thd->wsrep_exec_mode, (thd->query()) ? thd->query() : "void") if (WSREP_OK == (ret = wsrep->to_execute_end(wsrep, thd->thread_id))) { - WSREP_DEBUG("TO END: %lld", (long long)thd->wsrep_trx_seqno); + WSREP_DEBUG("TO END: %lld", (long long)wsrep_thd_trx_seqno(thd)); } else { WSREP_WARN("TO isolation end failed for: %d, sql: %s", @@ -1151,7 +1201,7 @@ static void wsrep_TOI_end(THD *thd) { static int wsrep_RSU_begin(THD *thd, char *db_, char *table_) { wsrep_status_t ret(WSREP_WARNING); - WSREP_DEBUG("RSU BEGIN: %lld, %d : %s", (long long)thd->wsrep_trx_seqno, + WSREP_DEBUG("RSU BEGIN: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd), thd->wsrep_exec_mode, thd->query() ); ret = wsrep->desync(wsrep); @@ -1196,7 +1246,7 @@ static int wsrep_RSU_begin(THD *thd, char *db_, char *table_) static void wsrep_RSU_end(THD *thd) { wsrep_status_t ret(WSREP_WARNING); - WSREP_DEBUG("RSU END: %lld, %d : %s", (long long)thd->wsrep_trx_seqno, + WSREP_DEBUG("RSU END: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd), thd->wsrep_exec_mode, thd->query() ); @@ -1276,10 +1326,10 @@ void wsrep_to_isolation_end(THD *thd) { "request: (%lu \tseqno %lld \twsrep (%d, %d, %d) cmd %d %d \t%s)\n" \ "granted: (%lu \tseqno %lld \twsrep (%d, %d, %d) cmd %d %d \t%s)", \ msg, \ - req->thread_id, (long long)req->wsrep_trx_seqno, \ + req->thread_id, (long long)wsrep_thd_trx_seqno(req), \ req->wsrep_exec_mode, req->wsrep_query_state, req->wsrep_conflict_state, \ req->get_command(), req->lex->sql_command, req->query(), \ - gra->thread_id, (long long)gra->wsrep_trx_seqno, \ + gra->thread_id, (long long)wsrep_thd_trx_seqno(gra), \ gra->wsrep_exec_mode, gra->wsrep_query_state, gra->wsrep_conflict_state, \ gra->get_command(), gra->lex->sql_command, gra->query()); diff --git a/sql/wsrep_mysqld.h b/sql/wsrep_mysqld.h index 67bc9d0ea15..276f2a7b580 100644 --- a/sql/wsrep_mysqld.h +++ b/sql/wsrep_mysqld.h @@ -28,7 +28,6 @@ class THD; #ifdef WITH_WSREP #include "../wsrep/wsrep_api.h" -//#include "wsrep_mysqld.h" enum wsrep_exec_mode { LOCAL_STATE, REPL_RECV, @@ -73,15 +72,16 @@ extern const char* wsrep_node_incoming_address; extern const char* wsrep_data_home_dir; extern const char* wsrep_dbug_option; extern long wsrep_slave_threads; -extern my_bool wsrep_debug; +extern int wsrep_slave_count_change; +extern MYSQL_PLUGIN_IMPORT my_bool wsrep_debug; extern my_bool wsrep_convert_LOCK_to_trx; extern ulong wsrep_retry_autocommit; extern my_bool wsrep_auto_increment_control; extern my_bool wsrep_drupal_282555_workaround; extern my_bool wsrep_incremental_data_collection; extern const char* wsrep_start_position; -extern long long wsrep_max_ws_size; -extern long wsrep_max_ws_rows; +extern ulong wsrep_max_ws_size; +extern ulong wsrep_max_ws_rows; extern const char* wsrep_notify_cmd; extern my_bool wsrep_certify_nonPK; extern long wsrep_max_protocol_version; @@ -136,13 +136,13 @@ extern "C" enum wsrep_query_state wsrep_thd_query_state(THD *thd); extern "C" const char * wsrep_thd_exec_mode_str(THD *thd); extern "C" const char * wsrep_thd_conflict_state_str(THD *thd); extern "C" const char * wsrep_thd_query_state_str(THD *thd); -extern "C" wsrep_trx_handle_t* wsrep_thd_trx_handle(THD *thd); +extern "C" wsrep_ws_handle_t* wsrep_thd_ws_handle(THD *thd); extern "C" void wsrep_thd_set_exec_mode(THD *thd, enum wsrep_exec_mode mode); extern "C" void wsrep_thd_set_query_state( - THD *thd, enum wsrep_query_state state); + THD *thd, enum wsrep_query_state state); extern "C" void wsrep_thd_set_conflict_state( - THD *thd, enum wsrep_conflict_state state); + THD *thd, enum wsrep_conflict_state state); extern "C" void wsrep_thd_set_trx_to_replay(THD *thd, uint64 trx_id); @@ -259,20 +259,12 @@ extern mysql_mutex_t LOCK_wsrep_rollback; extern mysql_cond_t COND_wsrep_rollback; extern int wsrep_replaying; extern mysql_mutex_t LOCK_wsrep_replaying; -extern mysql_cond_t COND_wsrep_replaying; -extern wsrep_aborting_thd_t wsrep_aborting_thd; -extern MYSQL_PLUGIN_IMPORT my_bool wsrep_debug; -extern my_bool wsrep_convert_LOCK_to_trx; -extern ulong wsrep_retry_autocommit; -extern my_bool wsrep_emulate_bin_log; -extern my_bool wsrep_auto_increment_control; -extern my_bool wsrep_drupal_282555_workaround; -extern long long wsrep_max_ws_size; -extern long wsrep_max_ws_rows; -extern int wsrep_to_isolation; -extern my_bool wsrep_certify_nonPK; +extern mysql_cond_t COND_wsrep_replaying; extern mysql_mutex_t LOCK_wsrep_slave_threads; extern mysql_mutex_t LOCK_wsrep_desync; +extern wsrep_aborting_thd_t wsrep_aborting_thd; +extern my_bool wsrep_emulate_bin_log; +extern int wsrep_to_isolation; extern PSI_mutex_key key_LOCK_wsrep_ready; extern PSI_mutex_key key_COND_wsrep_ready; @@ -295,12 +287,11 @@ int wsrep_to_isolation_begin(THD *thd, char *db_, char *table_, void wsrep_to_isolation_end(THD *thd); int wsrep_to_buf_helper( - THD* thd, const char *query, uint query_len, uchar** buf, uint* buf_len); -int wsrep_create_sp(THD *thd, uchar** buf, uint* buf_len); -int wsrep_create_trigger_query(THD *thd, uchar** buf, uint* buf_len); -int wsrep_create_event_query(THD *thd, uchar** buf, uint* buf_len); + THD* thd, const char *query, uint query_len, uchar** buf, int* buf_len); +int wsrep_create_sp(THD *thd, uchar** buf, int* buf_len); +int wsrep_create_trigger_query(THD *thd, uchar** buf, int* buf_len); +int wsrep_create_event_query(THD *thd, uchar** buf, int* buf_len); -const wsrep_uuid_t* wsrep_cluster_uuid(); struct xid_t; void wsrep_set_SE_checkpoint(xid_t*); diff --git a/sql/wsrep_priv.h b/sql/wsrep_priv.h index 291823d773e..5c66587d757 100644 --- a/sql/wsrep_priv.h +++ b/sql/wsrep_priv.h @@ -32,24 +32,22 @@ ssize_t wsrep_sst_prepare (void** msg); wsrep_cb_status wsrep_sst_donate_cb (void* app_ctx, void* recv_ctx, const void* msg, size_t msg_len, - const wsrep_gtid_t* state_id, + const wsrep_gtid_t* current_id, const char* state, size_t state_len, bool bypass); +extern unsigned int wsrep_check_ip (const char* addr); +extern size_t wsrep_guess_ip (char* buf, size_t buf_len); +extern size_t wsrep_guess_address(char* buf, size_t buf_len); extern wsrep_uuid_t local_uuid; extern wsrep_seqno_t local_seqno; // a helper function -void wsrep_sst_received(wsrep_t*, const wsrep_uuid_t*, wsrep_seqno_t, +extern void wsrep_sst_received(wsrep_t*, const wsrep_uuid_t*, wsrep_seqno_t, const void*, size_t); /*! SST thread signals init thread about sst completion */ -void wsrep_sst_complete(const wsrep_uuid_t*, wsrep_seqno_t, bool); - -extern void wsrep_notify_status (wsrep_member_status_t new_status, - const wsrep_view_info_t* view = 0); - -/* binlog-related stuff */ -int wsrep_write_cache(IO_CACHE *cache, uchar **buf, size_t *buf_len); - +extern void wsrep_sst_complete(const wsrep_uuid_t*, wsrep_seqno_t, bool); +void wsrep_notify_status (wsrep_member_status_t new_status, + const wsrep_view_info_t* view = 0); #endif /* WSREP_PRIV_H */ diff --git a/sql/wsrep_sst.cc b/sql/wsrep_sst.cc index d651e1ed0a9..6a97b29ff6d 100644 --- a/sql/wsrep_sst.cc +++ b/sql/wsrep_sst.cc @@ -213,8 +213,8 @@ bool wsrep_sst_wait () // Signal end of SST void wsrep_sst_complete (const wsrep_uuid_t* sst_uuid, - wsrep_seqno_t sst_seqno, - bool needed) + wsrep_seqno_t sst_seqno, + bool needed) { if (mysql_mutex_lock (&LOCK_wsrep_sst)) abort(); if (!sst_complete) @@ -232,13 +232,26 @@ void wsrep_sst_complete (const wsrep_uuid_t* sst_uuid, mysql_mutex_unlock (&LOCK_wsrep_sst); } +void wsrep_sst_received (wsrep_t* const wsrep, + const wsrep_uuid_t* const uuid, + wsrep_seqno_t const seqno, + const void* const state, + size_t const state_len) +{ + int const rcode(seqno < 0 ? seqno : 0); + wsrep_gtid_t const state_id = { + *uuid, (rcode ? WSREP_SEQNO_UNDEFINED : seqno) + }; + wsrep->sst_received(wsrep, &state_id, state, state_len, rcode); +} + // Let applier threads to continue void wsrep_sst_continue () { if (sst_needed) { WSREP_INFO("Signalling provider to continue."); - wsrep->sst_received (wsrep, &local_uuid, local_seqno, NULL, 0); + wsrep_sst_received (wsrep, &local_uuid, local_seqno, NULL, 0); } } @@ -434,7 +447,6 @@ static ssize_t sst_prepare_other (const char* method, return ret; } -//extern ulong my_bind_addr; extern uint mysqld_port; /*! Just tells donor where to send mysqldump */ @@ -518,7 +530,7 @@ ssize_t wsrep_sst_prepare (void** msg) } else { - ssize_t ret= guess_ip (ip_buf, ip_max); + ssize_t ret= wsrep_guess_ip (ip_buf, ip_max); if (ret && ret < ip_max) { @@ -706,7 +718,9 @@ static int sst_donate_mysqldump (const char* addr, ret= sst_run_shell (cmd_str, 3); } - wsrep->sst_sent (wsrep, uuid, ret ? ret : seqno); + wsrep_gtid_t const state_id = { *uuid, (ret ? WSREP_SEQNO_UNDEFINED : seqno)}; + + wsrep->sst_sent (wsrep, &state_id, ret); return ret; } @@ -896,7 +910,10 @@ wait_signal: } // signal to donor that SST is over - wsrep->sst_sent (wsrep, &ret_uuid, err ? -err : ret_seqno); + struct wsrep_gtid const state_id = { + ret_uuid, err ? WSREP_SEQNO_UNDEFINED : ret_seqno + }; + wsrep->sst_sent (wsrep, &state_id, -err); proc.wait(); return NULL; @@ -950,10 +967,9 @@ static int sst_donate_other (const char* method, return arg.err; } -int wsrep_sst_donate_cb (void* app_ctx, void* recv_ctx, +wsrep_cb_status wsrep_sst_donate_cb (void* app_ctx, void* recv_ctx, const void* msg, size_t msg_len, - const wsrep_uuid_t* current_uuid, - wsrep_seqno_t current_seqno, + const wsrep_gtid_t* current_gtid, const char* state, size_t state_len, bool bypass) { @@ -967,20 +983,19 @@ int wsrep_sst_donate_cb (void* app_ctx, void* recv_ctx, const char* data = method + method_len + 1; char uuid_str[37]; - wsrep_uuid_print (current_uuid, uuid_str, sizeof(uuid_str)); + wsrep_uuid_print (¤t_gtid->uuid, uuid_str, sizeof(uuid_str)); int ret; if (!strcmp (WSREP_SST_MYSQLDUMP, method)) { - ret = sst_donate_mysqldump (data, current_uuid, uuid_str, current_seqno, - bypass); + ret = sst_donate_mysqldump(data, ¤t_gtid->uuid, uuid_str, + current_gtid->seqno, bypass); } else { - ret = sst_donate_other (method, data, uuid_str, current_seqno, bypass); + ret = sst_donate_other(method, data, uuid_str, current_gtid->seqno,bypass); } - - return (ret > 0 ? 0 : ret); + return (ret > 0 ? WSREP_CB_SUCCESS : WSREP_CB_FAILURE); } void wsrep_SE_init_grab() diff --git a/sql/wsrep_thd.cc b/sql/wsrep_thd.cc index 9cbd32cac73..35f5a9bca6a 100644 --- a/sql/wsrep_thd.cc +++ b/sql/wsrep_thd.cc @@ -113,17 +113,17 @@ void wsrep_replay_transaction(THD *thd) /* checking if BF trx must be replayed */ if (thd->wsrep_conflict_state== MUST_REPLAY) { if (thd->wsrep_exec_mode!= REPL_RECV) { - if (thd->stmt_da->is_sent) + if (thd->get_stmt_da()->is_sent()) { WSREP_ERROR("replay issue, thd has reported status already"); } - thd->stmt_da->reset_diagnostics_area(); + thd->get_stmt_da()->reset_diagnostics_area(); thd->wsrep_conflict_state= REPLAYING; mysql_mutex_unlock(&thd->LOCK_wsrep_thd); - mysql_reset_thd_for_next_command(thd); - thd->killed= THD::NOT_KILLED; + mysql_reset_thd_for_next_command(thd, opt_userstat_running); + thd->killed= NOT_KILLED; close_thread_tables(thd); if (thd->locked_tables_mode && thd->lock) { @@ -157,7 +157,7 @@ void wsrep_replay_transaction(THD *thd) wsrep->post_commit(wsrep, &thd->wsrep_ws_handle); WSREP_DEBUG("trx_replay successful for: %ld %llu", thd->thread_id, (long long)thd->real_id); - if (thd->stmt_da->is_sent) + if (thd->get_stmt_da()->is_sent()) { WSREP_WARN("replay ok, thd has reported status"); } @@ -167,7 +167,7 @@ void wsrep_replay_transaction(THD *thd) } break; case WSREP_TRX_FAIL: - if (thd->stmt_da->is_sent) + if (thd->get_stmt_da()->is_sent()) { WSREP_ERROR("replay failed, thd has reported status"); } @@ -237,7 +237,7 @@ static void wsrep_replication_process(THD *thd) * avoid mysql shutdown. This is because the killer will then handle * shutdown processing (or replication restarting) */ - if (thd->killed != THD::KILL_CONNECTION) + if (thd->killed != KILL_CONNECTION) { wsrep_kill_mysql(thd); } @@ -289,7 +289,7 @@ static void wsrep_rollback_process(THD *thd) mysql_mutex_lock(&LOCK_wsrep_rollback); wsrep_aborting_thd= NULL; - while (thd->killed == THD::NOT_KILLED) { + while (thd->killed == NOT_KILLED) { thd_proc_info(thd, "wsrep aborter idle"); thd->mysys_var->current_mutex= &LOCK_wsrep_rollback; thd->mysys_var->current_cond= &COND_wsrep_rollback; diff --git a/sql/wsrep_utils.cc b/sql/wsrep_utils.cc index 37e537c62e4..b01bdaaa15e 100644 --- a/sql/wsrep_utils.cc +++ b/sql/wsrep_utils.cc @@ -22,16 +22,17 @@ #include "wsrep_utils.h" #include "wsrep_mysqld.h" -//#include "wsrep_api.h" -//#include "wsrep_priv.h" + +#include <sql_class.h> + #include <spawn.h> // posix_spawn() #include <unistd.h> // pipe() #include <errno.h> // errno #include <string.h> // strerror() #include <sys/wait.h> // waitpid() - -#include <sql_class.h> -#include "wsrep_priv.h" +#include <sys/types.h> +#include <sys/socket.h> +#include <netdb.h> // getaddrinfo() extern char** environ; // environment variables @@ -317,23 +318,69 @@ thd::~thd () } // namespace wsp -extern ulong my_bind_addr; -extern uint mysqld_port; +/* Returns INADDR_NONE, INADDR_ANY, INADDR_LOOPBACK or something else */ +unsigned int wsrep_check_ip (const char* const addr) +{ + unsigned int ret = INADDR_NONE; + struct addrinfo *res, hints; + + memset (&hints, 0, sizeof(hints)); + hints.ai_flags= AI_PASSIVE/*|AI_ADDRCONFIG*/; + hints.ai_socktype= SOCK_STREAM; + hints.ai_family= AF_UNSPEC; -size_t guess_ip (char* buf, size_t buf_len) + int gai_ret = getaddrinfo(addr, NULL, &hints, &res); + if (0 == gai_ret) + { + if (AF_INET == res->ai_family) /* IPv4 */ + { + struct sockaddr_in* a= (struct sockaddr_in*)res->ai_addr; + ret= htonl(a->sin_addr.s_addr); + } + else /* IPv6 */ + { + struct sockaddr_in6* a= (struct sockaddr_in6*)res->ai_addr; + if (IN6_IS_ADDR_UNSPECIFIED(&a->sin6_addr)) + ret= INADDR_ANY; + else if (IN6_IS_ADDR_LOOPBACK(&a->sin6_addr)) + ret= INADDR_LOOPBACK; + else + ret= 0xdeadbeef; + } + freeaddrinfo (res); + } + else { + WSREP_ERROR ("getaddrinfo() failed on '%s': %d (%s)", + addr, gai_ret, gai_strerror(gai_ret)); + } + + // uint8_t* b= (uint8_t*)&ret; + // fprintf (stderr, "########## wsrep_check_ip returning: %hhu.%hhu.%hhu.%hhu\n", + // b[0], b[1], b[2], b[3]); + + return ret; +} + +extern const char* my_bind_addr_str; +extern uint mysqld_port; + +size_t wsrep_guess_ip (char* buf, size_t buf_len) { size_t ip_len = 0; - if (htonl(INADDR_NONE) == my_bind_addr) { - WSREP_ERROR("Networking not configured, cannot receive state transfer."); - return 0; - } + if (my_bind_addr_str && strlen(my_bind_addr_str)) + { + unsigned int const ip_type= wsrep_check_ip(my_bind_addr_str); - if (htonl(INADDR_ANY) != my_bind_addr) { - uint8_t* b = (uint8_t*)&my_bind_addr; - ip_len = snprintf (buf, buf_len, - "%hhu.%hhu.%hhu.%hhu", b[0],b[1],b[2],b[3]); - return ip_len; + if (INADDR_NONE == ip_type) { + WSREP_ERROR("Networking not configured, cannot receive state transfer."); + return 0; + } + + if (INADDR_ANY != ip_type) {; + strncpy (buf, my_bind_addr_str, buf_len); + return strlen(buf); + } } // mysqld binds to all interfaces - try IP from wsrep_node_address @@ -404,9 +451,9 @@ size_t guess_ip (char* buf, size_t buf_len) return ip_len; } -size_t guess_address(char* buf, size_t buf_len) +size_t wsrep_guess_address(char* buf, size_t buf_len) { - size_t addr_len = guess_ip (buf, buf_len); + size_t addr_len = wsrep_guess_ip (buf, buf_len); if (addr_len && addr_len < buf_len) { addr_len += snprintf (buf + addr_len, buf_len - addr_len, diff --git a/sql/wsrep_var.cc b/sql/wsrep_var.cc index bd041ed51ff..c3c795a291d 100644 --- a/sql/wsrep_var.cc +++ b/sql/wsrep_var.cc @@ -38,7 +38,6 @@ const char* wsrep_node_address = 0; const char* wsrep_node_incoming_address = 0; const char* wsrep_start_position = 0; ulong wsrep_OSU_method_options; -static int wsrep_thread_change = 0; int wsrep_init_vars() { @@ -61,15 +60,6 @@ bool wsrep_on_update (sys_var *self, THD* thd, enum_var_type var_type) // FIXME: this variable probably should be changed only per session thd->variables.wsrep_on = global_system_variables.wsrep_on; } - else { - } - -#ifdef REMOVED - if (thd->variables.wsrep_on) - thd->variables.option_bits |= (OPTION_BIN_LOG); - else - thd->variables.option_bits &= ~(OPTION_BIN_LOG); -#endif return false; } @@ -78,8 +68,6 @@ void wsrep_causal_reads_update (sys_var *self, THD* thd, enum_var_type var_type) if (var_type == OPT_GLOBAL) { thd->variables.wsrep_causal_reads = global_system_variables.wsrep_causal_reads; } - else { - } } static int wsrep_start_position_verify (const char* start_str) @@ -149,7 +137,7 @@ bool wsrep_start_position_update (sys_var *self, THD* thd, enum_var_type type) wsrep_set_local_position (wsrep_start_position); if (wsrep) { - wsrep->sst_received (wsrep, &local_uuid, local_seqno, NULL, 0); + wsrep_sst_received (wsrep, &local_uuid, local_seqno, NULL, 0); } return 0; @@ -455,7 +443,7 @@ void wsrep_node_address_init (const char* value) bool wsrep_slave_threads_check (sys_var *self, THD* thd, set_var* var) { mysql_mutex_lock(&LOCK_wsrep_slave_threads); - wsrep_thread_change = var->value->val_int() - wsrep_slave_threads; + wsrep_slave_count_change = var->value->val_int() - wsrep_slave_threads; mysql_mutex_unlock(&LOCK_wsrep_slave_threads); return 0; @@ -463,13 +451,10 @@ bool wsrep_slave_threads_check (sys_var *self, THD* thd, set_var* var) bool wsrep_slave_threads_update (sys_var *self, THD* thd, enum_var_type type) { - if (wsrep_thread_change > 0) - { - wsrep_create_appliers(wsrep_thread_change); - } - else if (wsrep_thread_change < 0) + if (wsrep_slave_count_change > 0) { - wsrep_close_applier_threads(-wsrep_thread_change); + wsrep_create_appliers(wsrep_slave_count_change); + wsrep_slave_count_change = 0; } return false; } diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc index 7cc48e261c7..554acb1dcd1 100644 --- a/storage/innobase/handler/ha_innodb.cc +++ b/storage/innobase/handler/ha_innodb.cc @@ -127,9 +127,9 @@ extern MYSQL_PLUGIN_IMPORT mysql_mutex_t LOCK_wsrep_rollback; extern MYSQL_PLUGIN_IMPORT mysql_cond_t COND_wsrep_rollback; extern MYSQL_PLUGIN_IMPORT wsrep_aborting_thd_t wsrep_aborting_thd; -static inline wsrep_trx_handle_t* -wsrep_trx_handle(THD* thd, const trx_t* trx) { - return wsrep_trx_handle_for_id(wsrep_thd_trx_handle(thd), +static inline wsrep_ws_handle_t* +wsrep_ws_handle(THD* thd, const trx_t* trx) { + return wsrep_ws_handle_for_trx(wsrep_thd_ws_handle(thd), (wsrep_trx_id_t)trx->id); } @@ -137,7 +137,7 @@ extern bool wsrep_prepare_key_for_innodb(const uchar *cache_key, size_t cache_key_len, const uchar* row_id, size_t row_id_len, - wsrep_key_part_t* key, + wsrep_buf_t* key, size_t* key_len); extern handlerton * wsrep_hton; @@ -9170,7 +9170,7 @@ wsrep_dict_foreign_find_index( dict_table_t* table, const char** columns, ulint n_cols, - dict_index_t* types_idx, + dict_index_t* types_idx, ibool check_charsets, ulint check_null); @@ -9190,28 +9190,29 @@ wsrep_append_foreign_key( ulint rcode = DB_SUCCESS; char cache_key[513] = {'\0'}; int cache_key_len; + bool const copy = true; - if (!wsrep_on(trx->mysql_thd) || - wsrep_thd_exec_mode(thd) != LOCAL_STATE) + if (!wsrep_on(trx->mysql_thd) || + wsrep_thd_exec_mode(thd) != LOCAL_STATE) return DB_SUCCESS; if (!thd || !foreign || (!foreign->referenced_table && !foreign->foreign_table)) { - WSREP_INFO("FK: %s missing in: %s", - (!thd) ? "thread" : - ((!foreign) ? "constraint" : - ((!foreign->referenced_table) ? - "referenced table" : "foreign table")), - (thd && wsrep_thd_query(thd)) ? + WSREP_INFO("FK: %s missing in: %s", + (!thd) ? "thread" : + ((!foreign) ? "constraint" : + ((!foreign->referenced_table) ? + "referenced table" : "foreign table")), + (thd && wsrep_thd_query(thd)) ? wsrep_thd_query(thd) : "void"); return DB_ERROR; } - if ( !((referenced) ? + if ( !((referenced) ? foreign->referenced_table : foreign->foreign_table)) { - WSREP_DEBUG("pulling %s table into cache", + WSREP_DEBUG("pulling %s table into cache", (referenced) ? "referenced" : "foreign"); mutex_enter(&(dict_sys->mutex)); if (referenced) @@ -9249,22 +9250,22 @@ wsrep_append_foreign_key( mutex_exit(&(dict_sys->mutex)); } - if ( !((referenced) ? + if ( !((referenced) ? foreign->referenced_table : foreign->foreign_table)) { - WSREP_WARN("FK: %s missing in query: %s", - (!foreign->referenced_table) ? - "referenced table" : "foreign table", - (wsrep_thd_query(thd)) ? + WSREP_WARN("FK: %s missing in query: %s", + (!foreign->referenced_table) ? + "referenced table" : "foreign table", + (wsrep_thd_query(thd)) ? wsrep_thd_query(thd) : "void"); return DB_ERROR; } byte key[WSREP_MAX_SUPPORTED_KEY_LENGTH+1]; ulint len = WSREP_MAX_SUPPORTED_KEY_LENGTH; - dict_index_t *idx_target = (referenced) ? + dict_index_t *idx_target = (referenced) ? foreign->referenced_index : index; - dict_index_t *idx = (referenced) ? + dict_index_t *idx = (referenced) ? UT_LIST_GET_FIRST(foreign->referenced_table->indexes) : UT_LIST_GET_FIRST(foreign->foreign_table->indexes); int i = 0; @@ -9278,29 +9279,29 @@ wsrep_append_foreign_key( key[0] = (char)i; rcode = wsrep_rec_get_foreign_key( - &key[1], &len, rec, index, idx, + &key[1], &len, rec, index, idx, wsrep_protocol_version > 1); if (rcode != DB_SUCCESS) { WSREP_ERROR( - "FK key set failed: %lu (%lu %lu), index: %s %s, %s", - rcode, referenced, shared, + "FK key set failed: %lu (%lu %lu), index: %s %s, %s", + rcode, referenced, shared, (index && index->name) ? index->name : - "void index", - (index && index->table_name) ? index->table_name : - "void table", + "void index", + (index && index->table_name) ? index->table_name : + "void table", wsrep_thd_query(thd)); return rcode; } strncpy(cache_key, - (wsrep_protocol_version > 1) ? - ((referenced) ? - foreign->referenced_table->name : + (wsrep_protocol_version > 1) ? + ((referenced) ? + foreign->referenced_table->name : foreign->foreign_table->name) : foreign->foreign_table->name, sizeof(cache_key) - 1); cache_key_len = strlen(cache_key); #ifdef WSREP_DEBUG_PRINT ulint j; - fprintf(stderr, "FK parent key, table: %s %s len: %lu ", + fprintf(stderr, "FK parent key, table: %s %s len: %lu ", cache_key, (shared) ? "shared" : "exclusive", len+1); for (j=0; j<len+1; j++) { fprintf(stderr, " %hhX, ", key[j]); @@ -9311,34 +9312,35 @@ wsrep_append_foreign_key( if (p) { *p = '\0'; } else { - WSREP_WARN("unexpected foreign key table %s %s", - foreign->referenced_table->name, + WSREP_WARN("unexpected foreign key table %s %s", + foreign->referenced_table->name, foreign->foreign_table->name); } - wsrep_key_part_t wkey_part[3]; + wsrep_buf_t wkey_part[3]; wsrep_key_t wkey = {wkey_part, 3}; if (!wsrep_prepare_key_for_innodb( - (const uchar*)cache_key, + (const uchar*)cache_key, cache_key_len + 1, (const uchar*)key, len+1, wkey_part, - &wkey.key_parts_len)) { - WSREP_WARN("key prepare failed for cascaded FK: %s", - (wsrep_thd_query(thd)) ? - wsrep_thd_query(thd) : "void"); + (size_t*)&wkey.key_parts_num)) { + WSREP_WARN("key prepare failed for cascaded FK: %s", + (wsrep_thd_query(thd)) ? + wsrep_thd_query(thd) : "void"); return DB_ERROR; } rcode = (int)wsrep->append_key( wsrep, - wsrep_trx_handle(thd, trx), + wsrep_ws_handle(thd, trx), &wkey, - 1, - shared); + 1, + shared ? WSREP_KEY_SHARED : WSREP_KEY_EXCLUSIVE, + copy); if (rcode) { DBUG_PRINT("wsrep", ("row key failed: %lu", rcode)); - WSREP_ERROR("Appending cascaded fk row key failed: %s, %lu", - (wsrep_thd_query(thd)) ? + WSREP_ERROR("Appending cascaded fk row key failed: %s, %lu", + (wsrep_thd_query(thd)) ? wsrep_thd_query(thd) : "void", rcode); return DB_ERROR; } @@ -9359,40 +9361,42 @@ wsrep_append_key( ) { DBUG_ENTER("wsrep_append_key"); + bool const copy = true; #ifdef WSREP_DEBUG_PRINT - fprintf(stderr, "%s conn %ld, trx %llu, keylen %d, table %s ", + fprintf(stderr, "%s conn %ld, trx %llu, keylen %d, table %s ", (shared) ? "Shared" : "Exclusive", - wsrep_thd_thread_id(thd), trx->id, key_len, + wsrep_thd_thread_id(thd), trx->id, key_len, table_share->table_name.str); for (int i=0; i<key_len; i++) { fprintf(stderr, "%hhX, ", key[i]); } fprintf(stderr, "\n"); #endif - wsrep_key_part_t wkey_part[3]; + wsrep_buf_t wkey_part[3]; wsrep_key_t wkey = {wkey_part, 3}; if (!wsrep_prepare_key_for_innodb( (const uchar*)table_share->table_cache_key.str, table_share->table_cache_key.length, (const uchar*)key, key_len, wkey_part, - &wkey.key_parts_len)) { - WSREP_WARN("key prepare failed for: %s", - (wsrep_thd_query(thd)) ? + (size_t*)&wkey.key_parts_num)) { + WSREP_WARN("key prepare failed for: %s", + (wsrep_thd_query(thd)) ? wsrep_thd_query(thd) : "void"); DBUG_RETURN(HA_ERR_INTERNAL_ERROR); } int rcode = (int)wsrep->append_key( wsrep, - wsrep_trx_handle(thd, trx), + wsrep_ws_handle(thd, trx), &wkey, 1, - shared); + shared ? WSREP_KEY_SHARED : WSREP_KEY_EXCLUSIVE, + copy); if (rcode) { DBUG_PRINT("wsrep", ("row key failed: %d", rcode)); - WSREP_WARN("Appending row key failed: %s, %d", - (wsrep_thd_query(thd)) ? + WSREP_WARN("Appending row key failed: %s, %d", + (wsrep_thd_query(thd)) ? wsrep_thd_query(thd) : "void", rcode); DBUG_RETURN(rcode); } @@ -9403,10 +9407,10 @@ ibool wsrep_is_cascding_foreign_key_parent( dict_table_t* table, /*!< in: InnoDB table */ dict_index_t* index /*!< in: InnoDB index */ -) { +) { // return referenced_by_foreign_key(); dict_foreign_t* fk = dict_table_get_referenced_constraint(table, index); - if (fk && + if (fk && (fk->type & DICT_FOREIGN_ON_UPDATE_CASCADE || fk->type & DICT_FOREIGN_ON_UPDATE_SET_NULL) ) { @@ -16915,7 +16919,7 @@ wsrep_fake_trx_id( trx_id_t trx_id = trx_sys_get_new_trx_id(); mutex_exit(&trx_sys->mutex); - (void *)wsrep_trx_handle_for_id(wsrep_thd_trx_handle(thd), trx_id); + (void *)wsrep_ws_handle_for_trx(wsrep_thd_ws_handle(thd), trx_id); } #endif /* WITH_WSREP */ diff --git a/storage/innobase/handler/ha_innodb.h b/storage/innobase/handler/ha_innodb.h index d98195145da..3dd16c3e7ff 100644 --- a/storage/innobase/handler/ha_innodb.h +++ b/storage/innobase/handler/ha_innodb.h @@ -470,7 +470,7 @@ extern "C" enum wsrep_query_state wsrep_thd_query_state(THD *thd); extern "C" const char * wsrep_thd_exec_mode_str(THD *thd); extern "C" const char * wsrep_thd_conflict_state_str(THD *thd); extern "C" const char * wsrep_thd_query_state_str(THD *thd); -extern "C" wsrep_trx_handle_t* wsrep_thd_trx_handle(THD *thd); +extern "C" wsrep_ws_handle_t* wsrep_thd_ws_handle(THD *thd); extern "C" void wsrep_thd_set_exec_mode(THD *thd, enum wsrep_exec_mode mode); extern "C" void wsrep_thd_set_query_state( diff --git a/storage/innobase/include/ha_prototypes.h b/storage/innobase/include/ha_prototypes.h index ff197e6ba8e..fa36c7d16aa 100644 --- a/storage/innobase/include/ha_prototypes.h +++ b/storage/innobase/include/ha_prototypes.h @@ -290,7 +290,7 @@ innobase_casedn_str( UNIV_INTERN int wsrep_innobase_kill_one_trx(const trx_t *bf_trx, trx_t *victim_trx, ibool signal); -int wsrep_thd_is_brute_force(void *thd_ptr); +extern "C" int wsrep_thd_is_brute_force(void *thd_ptr); int wsrep_trx_order_before(void *thd1, void *thd2); void wsrep_innobase_mysql_sort(int mysql_type, uint charset_number, unsigned char* str, unsigned int str_length); diff --git a/wsrep/wsrep_api.h b/wsrep/wsrep_api.h index 2cd10afc7ff..987a47db578 100644 --- a/wsrep/wsrep_api.h +++ b/wsrep/wsrep_api.h @@ -1,4 +1,4 @@ -/* Copyright (C) 2009-2011 Codership Oy <info@codership.com> +/* Copyright (C) 2009-2013 Codership Oy <info@codership.com> This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -9,11 +9,41 @@ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + You should have received a copy of the GNU General Public License along + with this program; if not, write to the Free Software Foundation, Inc., + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ +/*! + @file wsrep API declaration. + + HOW TO READ THIS FILE. + + Due to C language rules this header layout doesn't lend itself to intuitive + reading. So here's the scoop: in the end this header declares two main types: + + * struct wsrep_init_args + + and + + * struct wsrep + + wsrep_init_args contains initialization parameters for wsrep provider like + names, addresses, etc. and pointers to callbacks. The callbacks will be called + by provider when it needs to do something application-specific, like log a + message or apply a writeset. It should be passed to init() call from + wsrep API. It is an application part of wsrep API contract. + + struct wsrep is the interface to wsrep provider. It contains all wsrep API + calls. It is a provider part of wsrep API contract. + + Finally, wsrep_load() method loads (dlopens) wsrep provider library. It is + defined in wsrep_loader.c unit and is part of libwsrep.a (which is not a + wsrep provider, but a convenience library). + + wsrep_unload() does the reverse. + +*/ #ifndef WSREP_H #define WSREP_H @@ -27,14 +57,44 @@ extern "C" { #endif +/************************************************************************** + * * + * wsrep replication API * + * * + **************************************************************************/ + +#define WSREP_INTERFACE_VERSION "24" + +/*! Empty backend spec */ +#define WSREP_NONE "none" + + /*! - * wsrep replication API + * @brief log severity levels, passed as first argument to log handler */ +typedef enum wsrep_log_level +{ + WSREP_LOG_FATAL, //!< Unrecoverable error, application must quit. + WSREP_LOG_ERROR, //!< Operation failed, must be repeated. + WSREP_LOG_WARN, //!< Unexpected condition, but no operational failure. + WSREP_LOG_INFO, //!< Informational message. + WSREP_LOG_DEBUG //!< Debug message. Shows only of compiled with debug. +} wsrep_log_level_t; + +/*! + * @brief error log handler + * + * All messages from wsrep provider are directed to this + * handler, if present. + * + * @param level log level + * @param message log message + */ +typedef void (*wsrep_log_cb_t)(wsrep_log_level_t, const char *); -#define WSREP_INTERFACE_VERSION "23" /*! - * Certain provider capabilities application may need to know + * Certain provider capabilities application may want to know about */ #define WSREP_CAP_MULTI_MASTER ( 1ULL << 0 ) #define WSREP_CAP_CERTIFICATION ( 1ULL << 1 ) @@ -44,89 +104,148 @@ extern "C" { #define WSREP_CAP_PAUSE ( 1ULL << 5 ) #define WSREP_CAP_CAUSAL_READS ( 1ULL << 6 ) #define WSREP_CAP_CAUSAL_TRX ( 1ULL << 7 ) -#define WSREP_CAP_WRITE_SET_INCREMENTS ( 1ULL << 8 ) +#define WSREP_CAP_INCREMENTAL_WRITESET ( 1ULL << 8 ) #define WSREP_CAP_SESSION_LOCKS ( 1ULL << 9 ) #define WSREP_CAP_DISTRIBUTED_LOCKS ( 1ULL << 10 ) #define WSREP_CAP_CONSISTENCY_CHECK ( 1ULL << 11 ) +#define WSREP_CAP_UNORDERED ( 1ULL << 12 ) +#define WSREP_CAP_ANNOTATION ( 1ULL << 13 ) +#define WSREP_CAP_PREORDERED ( 1ULL << 14 ) + /*! - * Write set replication flags + * Writeset flags + * + * COMMIT the writeset and all preceding writesets must be committed + * ROLLBACK all preceding writesets in a transaction must be rolled back + * PA_UNSAFE the writeset cannot be applied in parallel + * ISOLATION the writeset must be applied AND committed in isolation + * COMMUTATIVE the order in which the writeset is applied does not matter + * NATIVE the writeset contains another writeset in this provider format + * + * Note that some of the flags are mutually exclusive (e.g. COMMIT and + * ROLLBACK). */ -#define WSREP_FLAG_PA_SAFE ( 1ULL << 0 ) +#define WSREP_FLAG_COMMIT ( 1ULL << 0 ) +#define WSREP_FLAG_ROLLBACK ( 1ULL << 1 ) +#define WSREP_FLAG_PA_UNSAFE ( 1ULL << 3 ) +#define WSREP_FLAG_ISOLATION ( 1ULL << 2 ) +#define WSREP_FLAG_COMMUTATIVE ( 1ULL << 4 ) +#define WSREP_FLAG_NATIVE ( 1ULL << 5 ) -/* Empty backend spec */ -#define WSREP_NONE "none" typedef uint64_t wsrep_trx_id_t; //!< application transaction ID typedef uint64_t wsrep_conn_id_t; //!< application connection ID typedef int64_t wsrep_seqno_t; //!< sequence number of a writeset, etc. +typedef _Bool wsrep_bool_t; //!< should be the same as standard (C99) bool /*! undefined seqno */ #define WSREP_SEQNO_UNDEFINED (-1) -/*! wsrep status codes */ -typedef enum wsrep_status { - WSREP_OK = 0, //!< success + +/*! wsrep provider status codes */ +typedef enum wsrep_status +{ + WSREP_OK = 0, //!< success WSREP_WARNING, //!< minor warning, error logged WSREP_TRX_MISSING, //!< transaction is not known by wsrep WSREP_TRX_FAIL, //!< transaction aborted, server can continue WSREP_BF_ABORT, //!< trx was victim of brute force abort + WSREP_SIZE_EXCEEDED, //!< data exceeded maximum supported size WSREP_CONN_FAIL, //!< error in client connection, must abort WSREP_NODE_FAIL, //!< error in node state, wsrep must reinit WSREP_FATAL, //!< fatal error, server must abort WSREP_NOT_IMPLEMENTED //!< feature not implemented } wsrep_status_t; -/*! - * @brief log severity levels, passed as first argument to log handler - */ -typedef enum wsrep_log_level + +/*! wsrep callbacks status codes */ +typedef enum wsrep_cb_status { - WSREP_LOG_FATAL, //!< Unrecoverable error, application must quit. - WSREP_LOG_ERROR, //!< Operation failed, must be repeated. - WSREP_LOG_WARN, //!< Unexpected condition, but no operational failure. - WSREP_LOG_INFO, //!< Informational message. - WSREP_LOG_DEBUG //!< Debug message. Shows only of compiled with debug. -} wsrep_log_level_t; + WSREP_CB_SUCCESS = 0, //!< success (as in "not critical failure") + WSREP_CB_FAILURE //!< critical failure (consistency violation) + /* Technically, wsrep provider has no use for specific failure codes since + * there is nothing it can do about it but abort execution. Therefore any + * positive number shall indicate a critical failure. Optionally that value + * may be used by provider to come to a consensus about state consistency + * in a group of nodes. */ +} wsrep_cb_status_t; -/*! - * @brief error log handler - * - * All messages from wsrep library are directed to this - * handler, if present. - * - * @param level log level - * @param message log message - */ -typedef void (*wsrep_log_cb_t)(wsrep_log_level_t, const char *); /*! * UUID type - for all unique IDs */ typedef struct wsrep_uuid { - uint8_t uuid[16]; + uint8_t data[16]; } wsrep_uuid_t; /*! Undefined UUID */ static const wsrep_uuid_t WSREP_UUID_UNDEFINED = {{0,}}; +/*! UUID string representation length, terminating '\0' not included */ +#define WSREP_UUID_STR_LEN 36 + /*! * Scan UUID from string * @return length of UUID string representation or negative error code */ -extern ssize_t +extern int wsrep_uuid_scan (const char* str, size_t str_len, wsrep_uuid_t* uuid); /*! * Print UUID to string * @return length of UUID string representation or negative error code */ -extern ssize_t +extern int wsrep_uuid_print (const wsrep_uuid_t* uuid, char* str, size_t str_len); #define WSREP_MEMBER_NAME_LEN 32 //!< maximum logical member name length #define WSREP_INCOMING_LEN 256 //!< max Domain Name length + 0x00 + +/*! + * Global transaction identifier + */ +typedef struct wsrep_gtid +{ + wsrep_uuid_t uuid; /*!< History UUID */ + wsrep_seqno_t seqno; /*!< Sequence number */ +} wsrep_gtid_t; + +/*! Undefined GTID */ +static const wsrep_gtid_t WSREP_GTID_UNDEFINED = {{{0, }}, -1}; + +/*! Minimum number of bytes guaranteed to store GTID string representation, + * terminating '\0' not included (36 + 1 + 20) */ +#define WSREP_GTID_STR_LEN 57 + + +/*! + * Scan GTID from string + * @return length of GTID string representation or negative error code + */ +extern int +wsrep_gtid_scan(const char* str, size_t str_len, wsrep_gtid_t* gtid); + +/*! + * Print GTID to string + * @return length of GTID string representation or negative error code + */ +extern int +wsrep_gtid_print(const wsrep_gtid_t* gtid, char* str, size_t str_len); + + +/*! + * Transaction meta data + */ +typedef struct wsrep_trx_meta +{ + wsrep_gtid_t gtid; /*!< Global transaction identifier */ + wsrep_seqno_t depends_on; /*!< Sequence number part of the last transaction + this transaction depends on */ +} wsrep_trx_meta_t; + + /*! * member status */ @@ -163,15 +282,14 @@ typedef enum wsrep_view_status { * view of the group */ typedef struct wsrep_view_info { - wsrep_uuid_t uuid; //!< global state UUID - wsrep_seqno_t seqno; //!< global state seqno + wsrep_gtid_t state_id; //!< global state ID wsrep_seqno_t view; //!< global view number wsrep_view_status_t status; //!< view status - bool state_gap; //!< gap between global and local states + wsrep_bool_t state_gap; //!< gap between global and local states int my_idx; //!< index of this member in the view int memb_num; //!< number of members in the view - int proto_ver; //!< application protocol agreed on in the view - wsrep_member_info_t members[1]; //!< array of member information + int proto_ver; //!< application protocol agreed on the view + wsrep_member_info_t members[1];//!< array of member information } wsrep_view_info_t; /*! @@ -208,37 +326,43 @@ typedef struct wsrep_view_info { * @param state current state * @param state_len lenght of current state * @param sst_req location to store SST request - * @param sst_req_len location to store SST request length or error code + * @param sst_req_len location to store SST request length or error code, * value of 0 means no SST. */ -typedef void (*wsrep_view_cb_t) (void* app_ctx, - void* recv_ctx, - const wsrep_view_info_t* view, - const char* state, - size_t state_len, - void** sst_req, - ssize_t* sst_req_len); +typedef enum wsrep_cb_status (*wsrep_view_cb_t) ( + void* app_ctx, + void* recv_ctx, + const wsrep_view_info_t* view, + const char* state, + size_t state_len, + void** sst_req, + size_t* sst_req_len +); + /*! * @brief apply callback * - * This handler is called from wsrep library to apply replicated write set + * This handler is called from wsrep library to apply replicated writeset * Must support brute force applying for multi-master operation * * @param recv_ctx receiver context pointer provided by the application - * @param data data buffer containing the write set + * @param data data buffer containing the writeset * @param size data buffer size - * @param seqno global seqno part of the write set to be applied + * @param meta transaction meta data of the writeset to be applied * * @return success code: * @retval WSREP_OK - * @retval WSREP_NOT_IMPLEMENTED appl. does not support the write set format - * @retval WSREP_ERROR failed to apply the write set + * @retval WSREP_NOT_IMPLEMENTED appl. does not support the writeset format + * @retval WSREP_ERROR failed to apply the writeset */ -typedef enum wsrep_status (*wsrep_apply_cb_t) (void* recv_ctx, - const void* data, - size_t size, - wsrep_seqno_t seqno); +typedef enum wsrep_cb_status (*wsrep_apply_cb_t) ( + void* recv_ctx, + const void* data, + size_t size, + const wsrep_trx_meta_t* meta +); + /*! * @brief commit callback @@ -246,16 +370,38 @@ typedef enum wsrep_status (*wsrep_apply_cb_t) (void* recv_ctx, * This handler is called to commit the changes made by apply callback. * * @param recv_ctx receiver context pointer provided by the application - * @param seqno global seqno part of the write set to be committed + * @param meta transaction meta data of the writeset to be committed + * @param exit set to true to exit recv loop * @param commit true - commit writeset, false - rollback writeset * * @return success code: * @retval WSREP_OK * @retval WSREP_ERROR call failed */ -typedef enum wsrep_status (*wsrep_commit_cb_t) (void* recv_ctx, - wsrep_seqno_t seqno, - bool commit); +typedef enum wsrep_cb_status (*wsrep_commit_cb_t) ( + void* recv_ctx, + const wsrep_trx_meta_t* meta, + wsrep_bool_t* exit, + wsrep_bool_t commit +); + + +/*! + * @brief unordered callback + * + * This handler is called to execute unordered actions (actions that need not + * to be executed in any particular order) attached to writeset. + * + * @param recv_ctx receiver context pointer provided by the application + * @param data data buffer containing the writeset + * @param size data buffer size + */ +typedef enum wsrep_cb_status (*wsrep_unordered_cb_t) ( + void* recv_ctx, + const void* data, + size_t size +); + /*! * @brief a callback to donate state snapshot @@ -270,22 +416,22 @@ typedef enum wsrep_status (*wsrep_commit_cb_t) (void* recv_ctx, * @param recv_ctx receiver context * @param msg state transfer request message * @param msg_len state transfer request message length - * @param uuid current state uuid on this node - * @param seqno current state seqno on this node + * @param gtid current state ID on this node * @param state current wsrep internal state buffer * @param state_len current wsrep internal state buffer len * @param bypass bypass snapshot transfer, only transfer uuid:seqno pair - * @return 0 for success or negative error code */ -typedef int (*wsrep_sst_donate_cb_t) (void* app_ctx, - void* recv_ctx, - const void* msg, - size_t msg_len, - const wsrep_uuid_t* uuid, - wsrep_seqno_t seqno, - const char* state, - size_t state_len, - bool bypass); +typedef enum wsrep_cb_status (*wsrep_sst_donate_cb_t) ( + void* app_ctx, + void* recv_ctx, + const void* msg, + size_t msg_len, + const wsrep_gtid_t* state_id, + const char* state, + size_t state_len, + wsrep_bool_t bypass +); + /*! * @brief a callback to signal application that wsrep state is synced @@ -296,11 +442,11 @@ typedef int (*wsrep_sst_donate_cb_t) (void* app_ctx, * * @param app_ctx application context */ -typedef void (*wsrep_synced_cb_t)(void* app_ctx); +typedef void (*wsrep_synced_cb_t) (void* app_ctx); /*! - * Initialization parameters for wsrep, used as arguments for wsrep_init() + * Initialization parameters for wsrep provider. */ struct wsrep_init_args { @@ -315,24 +461,25 @@ struct wsrep_init_args int proto_ver; //!< Max supported application protocol version /* Application initial state information. */ - const wsrep_uuid_t* state_uuid; //!< Application state sequence UUID - wsrep_seqno_t state_seqno; //!< Applicaiton state sequence number - const char* state; //!< Initial state for wsrep implementation + const wsrep_gtid_t* state_id; //!< Application state GTID + const char* state; //!< Initial state for wsrep provider size_t state_len; //!< Length of state buffer /* Application callbacks */ wsrep_log_cb_t logger_cb; //!< logging handler wsrep_view_cb_t view_handler_cb; //!< group view change handler - /* applier callbacks */ + /* Applier callbacks */ wsrep_apply_cb_t apply_cb; //!< apply callback wsrep_commit_cb_t commit_cb; //!< commit callback + wsrep_unordered_cb_t unordered_cb; //!< callback for unordered actions - /* state snapshot transfer callbacks */ + /* State Snapshot Transfer callbacks */ wsrep_sst_donate_cb_t sst_donate_cb; //!< starting to donate wsrep_synced_cb_t synced_cb; //!< synced with group }; + /*! Type of the stats variable value in struct wsrep_status_var */ typedef enum wsrep_var_type { @@ -355,60 +502,94 @@ struct wsrep_stats_var }; -/*! Key part structure */ -typedef struct wsrep_key_part_ +/*! Abstract data buffer structure */ +typedef struct wsrep_buf { - const void* buf; /*!< Buffer containing key part data */ - size_t buf_len; /*!< Length of buffer */ -} wsrep_key_part_t; + const void* ptr; /*!< Pointer to data buffer */ + size_t len; /*!< Length of buffer */ +} wsrep_buf_t; /*! Key struct used to pass certification keys for transaction handling calls. * A key consists of zero or more key parts. */ -typedef struct wsrep_key_ +typedef struct wsrep_key { - const wsrep_key_part_t* key_parts; /*!< Array of key parts */ - size_t key_parts_len; /*!< Length of key parts array */ + const wsrep_buf_t* key_parts; /*!< Array of key parts */ + size_t key_parts_num; /*!< Number of key parts */ } wsrep_key_t; +/*! Key type: + * EXCLUSIVE conflicts with any key type + * SEMI reserved. If not supported, should be interpeted as EXCLUSIVE + * SHARED conflicts only with EXCLUSIVE keys */ +typedef enum wsrep_key_type +{ + WSREP_KEY_SHARED = 0, + WSREP_KEY_SEMI, + WSREP_KEY_EXCLUSIVE +} wsrep_key_type_t; + +/*! Data type: + * ORDERED state modification event that should be applied and committed + * in order. + * UNORDERED some action that does not modify state and execution of which is + * optional and does not need to happen in order. + * ANNOTATION (human readable) writeset annotation. */ +typedef enum wsrep_data_type +{ + WSREP_DATA_ORDERED = 0, + WSREP_DATA_UNORDERED, + WSREP_DATA_ANNOTATION +} wsrep_data_type_t; + + /*! Transaction handle struct passed for wsrep transaction handling calls */ -typedef struct wsrep_trx_handle_ +typedef struct wsrep_ws_handle { wsrep_trx_id_t trx_id; //!< transaction ID void* opaque; //!< opaque provider transaction context data -} wsrep_trx_handle_t; +} wsrep_ws_handle_t; /*! - * @brief Helper method to reset trx handle state when trx id changes + * @brief Helper method to reset trx writeset handle state when trx id changes * - * Instead of passing wsrep_trx_handle_t directly for wsrep calls, + * Instead of passing wsrep_ws_handle_t directly to wsrep calls, * wrapping handle with this call offloads bookkeeping from * application. */ -static inline wsrep_trx_handle_t* wsrep_trx_handle_for_id( - wsrep_trx_handle_t* trx_handle, - wsrep_trx_id_t trx_id) +static inline wsrep_ws_handle_t* wsrep_ws_handle_for_trx( + wsrep_ws_handle_t* ws_handle, + wsrep_trx_id_t trx_id) { - if (trx_handle->trx_id != trx_id) + if (ws_handle->trx_id != trx_id) { - trx_handle->trx_id = trx_id; - trx_handle->opaque = NULL; + ws_handle->trx_id = trx_id; + ws_handle->opaque = NULL; } - return trx_handle; + return ws_handle; } -typedef struct wsrep_ wsrep_t; +/*! + * A handle for processing preordered actions. + * Must be initialized to WSREP_PO_INITIALIZER before use. + */ +typedef struct wsrep_po_handle { void* opaque; } wsrep_po_handle_t; + +static const wsrep_po_handle_t WSREP_PO_INITIALIZER = { NULL }; + + +typedef struct wsrep wsrep_t; /*! * wsrep interface for dynamically loadable libraries */ -struct wsrep_ { +struct wsrep { const char *version; //!< interface version string /*! * @brief Initializes wsrep provider * - * @param wsrep this wsrep handle + * @param wsrep provider handle * @param args wsrep initialization parameters */ wsrep_status_t (*init) (wsrep_t* wsrep, @@ -417,14 +598,14 @@ struct wsrep_ { /*! * @brief Returns provider capabilities flag bitmap * - * @param wsrep this wsrep handle + * @param wsrep provider handle */ uint64_t (*capabilities) (wsrep_t* wsrep); /*! * @brief Passes provider-specific configuration string to provider. * - * @param wsrep this wsrep handle + * @param wsrep provider handle * @param conf configuration string * * @retval WSREP_OK configuration string was parsed successfully @@ -435,7 +616,7 @@ struct wsrep_ { /*! * @brief Returns provider-specific string with current configuration values. * - * @param wsrep this wsrep handle + * @param wsrep provider handle * * @return a dynamically allocated string with current configuration * parameter values @@ -448,15 +629,20 @@ struct wsrep_ { * Returns when either node is ready to operate as a part of the clsuter * or fails to reach operating status. * - * @param wsrep this wsrep handle + * @param wsrep provider handle * @param cluster_name unique symbolic cluster name * @param cluster_url URL-like cluster address (backend://address) * @param state_donor name of the node to be asked for state transfer. + * @param bootstrap a flag to request initialization of a new wsrep + * service rather then a connection to the existing one. + * clister_url may still carry important initialization + * parameters, like backend spec and/or listen address. */ - wsrep_status_t (*connect) (wsrep_t* wsrep, - const char* cluster_name, - const char* cluster_url, - const char* state_donor); + wsrep_status_t (*connect) (wsrep_t* wsrep, + const char* cluster_name, + const char* cluster_url, + const char* state_donor, + wsrep_bool_t bootstrap); /*! * @brief Closes connection to cluster. @@ -473,7 +659,7 @@ struct wsrep_ { * * This function never returns * - * @param wsrep this wsrep handle + * @param wsrep provider handle * @param recv_ctx receiver context */ wsrep_status_t (*recv)(wsrep_t* wsrep, void* recv_ctx); @@ -487,59 +673,55 @@ struct wsrep_ { * In case of WSREP_OK, starts commit critical section, transaction can * commit. Otherwise transaction must rollback. * - * @param wsrep this wsrep handle - * @param trx_handle transaction which is committing + * @param wsrep provider handle + * @param ws_handle writeset of committing transaction * @param conn_id connection ID - * @param app_data application specific applying data - * @param data_len the size of the applying data * @param flags fine tuning the replication WSREP_FLAG_* - * @param seqno seqno part of the global transaction ID + * @param meta transaction meta data * * @retval WSREP_OK cluster-wide commit succeeded * @retval WSREP_TRX_FAIL must rollback transaction * @retval WSREP_CONN_FAIL must close client connection * @retval WSREP_NODE_FAIL must close all connections and reinit */ - wsrep_status_t (*pre_commit)(wsrep_t* wsrep, - wsrep_conn_id_t conn_id, - wsrep_trx_handle_t* trx_handle, - const void* app_data, - size_t data_len, - uint64_t flags, - wsrep_seqno_t* seqno); + wsrep_status_t (*pre_commit)(wsrep_t* wsrep, + wsrep_conn_id_t conn_id, + wsrep_ws_handle_t* ws_handle, + uint64_t flags, + wsrep_trx_meta_t* meta); /*! * @brief Releases resources after transaction commit. * * Ends commit critical section. * - * @param wsrep this wsrep handle - * @param trx_handle transaction which is committing + * @param wsrep provider handle + * @param ws_handle writeset of committing transaction * @retval WSREP_OK post_commit succeeded */ wsrep_status_t (*post_commit) (wsrep_t* wsrep, - wsrep_trx_handle_t* trx_handle); + wsrep_ws_handle_t* ws_handle); /*! * @brief Releases resources after transaction rollback. * - * @param wsrep this wsrep handle - * @param trx_handle transaction which is committing + * @param wsrep provider handle + * @param ws_handle writeset of committing transaction * @retval WSREP_OK post_rollback succeeded */ wsrep_status_t (*post_rollback)(wsrep_t* wsrep, - wsrep_trx_handle_t* trx_handle); + wsrep_ws_handle_t* ws_handle); /*! - * @brief Replay trx as a slave write set + * @brief Replay trx as a slave writeset * * If local trx has been aborted by brute force, and it has already * replicated before this abort, we must try if we can apply it as - * slave trx. Note that slave nodes see only trx write sets and certification + * slave trx. Note that slave nodes see only trx writesets and certification * test based on write set content can be different to DBMS lock conflicts. * - * @param wsrep this wsrep handle - * @param trx_handle transaction which is committing + * @param wsrep provider handle + * @param ws_handle writeset of committing transaction * @param trx_ctx transaction context * * @retval WSREP_OK cluster commit succeeded @@ -550,7 +732,7 @@ struct wsrep_ { * @retval WSREP_NODE_FAIL must close all connections and reinit */ wsrep_status_t (*replay_trx)(wsrep_t* wsrep, - wsrep_trx_handle_t* trx_handle, + wsrep_ws_handle_t* ws_handle, void* trx_ctx); /*! @@ -562,76 +744,72 @@ struct wsrep_ { * The kill routine checks that abort is not attmpted against a transaction * which is front of the caller (in total order). * - * @param wsrep this wsrep handle + * @param wsrep provider handle * @param bf_seqno seqno of brute force trx, running this cancel * @param victim_trx transaction to be aborted, and which is committing * - * @retval WSREP_OK abort secceded - * @retval WSREP_WARNING abort failed + * @retval WSREP_OK abort secceded + * @retval WSREP_WARNING abort failed */ wsrep_status_t (*abort_pre_commit)(wsrep_t* wsrep, wsrep_seqno_t bf_seqno, wsrep_trx_id_t victim_trx); /*! - * @brief Appends a query in transaction's write set + * @brief Appends a row reference to transaction writeset + * + * Both copy flag and key_type can be ignored by provider (key type + * interpreted as WSREP_KEY_EXCLUSIVE). * - * @param wsrep this wsrep handle - * @param trx_handle transaction handle - * @param query SQL statement string - * @param timeval time to use for time functions - * @param randseed seed for rand + * @param wsrep provider handle + * @param ws_handle writeset handle + * @param keys array of keys + * @param count length of the array of keys + * @param type type ot the key + * @param copy can be set to FALSE if keys persist through commit. */ - wsrep_status_t (*append_query)(wsrep_t* wsrep, - wsrep_trx_handle_t* trx_handle, - const char* query, - time_t timeval, - uint32_t randseed); + wsrep_status_t (*append_key)(wsrep_t* wsrep, + wsrep_ws_handle_t* ws_handle, + const wsrep_key_t* keys, + size_t count, + enum wsrep_key_type type, + wsrep_bool_t copy); /*! - * @brief Appends a row reference in transaction's write set + * @brief Appends data to transaction writeset + * + * This method can be called any time before commit and it + * appends a number of data buffers to transaction writeset. * - * @param wsrep this wsrep handle - * @param trx_handle transaction handle - * @param key array of keys - * @param key_len length of the array of keys - * @param shared boolean denoting if key corresponds to shared resource + * Both copy and unordered flags can be ignored by provider. + * + * @param wsrep provider handle + * @param ws_handle writeset handle + * @param data array of data buffers + * @param count buffer count + * @param type type of data + * @param copy can be set to FALSE if data persists through commit. */ - wsrep_status_t (*append_key)(wsrep_t* wsrep, - wsrep_trx_handle_t* trx_handle, - const wsrep_key_t* key, - size_t key_len, - bool shared); - /*! - * @brief Appends data in transaction's write set - * - * This method can be called any time before commit and it - * appends data block into transaction's write set. - * - * @param wsrep this wsrep handle - * @param trx_handle transaction handle - * @param data data buffer - * @param data_len data buffer length - */ - wsrep_status_t (*append_data)(wsrep_t* wsrep, - wsrep_trx_handle_t* trx_handle, - const void* data, - size_t data_len); - + wsrep_status_t (*append_data)(wsrep_t* wsrep, + wsrep_ws_handle_t* ws_handle, + const struct wsrep_buf* data, + size_t count, + enum wsrep_data_type type, + wsrep_bool_t copy); /*! * @brief Get causal ordering for read operation * * This call will block until causal ordering with all possible * preceding writes in the cluster is guaranteed. If pointer to - * seqno is non-null, the call stores the global transaction ID + * gtid is non-null, the call stores the global transaction ID * of the last transaction which is guaranteed to be ordered * causally before this call. * - * @param wsrep this wsrep handle - * @param seqno location to store global transaction ID + * @param wsrep provider handle + * @param gtid location to store GTID */ - wsrep_status_t (*causal_read)(wsrep_t* wsrep, wsrep_seqno_t* seqno); + wsrep_status_t (*causal_read)(wsrep_t* wsrep, wsrep_gtid_t* gtid); /*! * @brief Clears allocated connection context. @@ -641,7 +819,7 @@ struct wsrep_ { * connection. This call is to explicitly notify provider fo connection * closing. * - * @param wsrep this wsrep handle + * @param wsrep provider handle * @param conn_id connection ID * @param query the 'set database' query * @param query_len length of query (does not end with 0) @@ -652,29 +830,28 @@ struct wsrep_ { /*! * @brief Replicates a query and starts "total order isolation" section. * - * Replicates the query and returns success code, which - * caller must check. Total order isolation continues - * until to_execute_end() is called. + * Replicates the action spec and returns success code, which caller must + * check. Total order isolation continues until to_execute_end() is called. * - * @param wsrep this wsrep handle + * @param wsrep provider handle * @param conn_id connection ID - * @param key array of keys - * @param key_len lenght of the array of keys - * @param query query to be executed - * @param query_len length of the query string - * @param seqno seqno part of the action ID + * @param keys array of keys + * @param keys_num lenght of the array of keys + * @param action action buffer array to be executed + * @param count action buffer count + * @param meta transaction meta data * * @retval WSREP_OK cluster commit succeeded * @retval WSREP_CONN_FAIL must close client connection * @retval WSREP_NODE_FAIL must close all connections and reinit */ - wsrep_status_t (*to_execute_start)(wsrep_t* wsrep, - wsrep_conn_id_t conn_id, - const wsrep_key_t* key, - size_t key_len, - const void* query, - size_t query_len, - wsrep_seqno_t* seqno); + wsrep_status_t (*to_execute_start)(wsrep_t* wsrep, + wsrep_conn_id_t conn_id, + const wsrep_key_t* keys, + size_t keys_num, + const struct wsrep_buf* action, + size_t count, + wsrep_trx_meta_t* meta); /*! * @brief Ends the total order isolation section. @@ -682,7 +859,7 @@ struct wsrep_ { * Marks the end of total order isolation. TO locks are freed * and other transactions are free to commit from this point on. * - * @param wsrep this wsrep handle + * @param wsrep provider handle * @param conn_id connection ID * * @retval WSREP_OK cluster commit succeeded @@ -692,32 +869,81 @@ struct wsrep_ { wsrep_status_t (*to_execute_end)(wsrep_t* wsrep, wsrep_conn_id_t conn_id); /*! + * @brief Collects preordered replication events into a writeset. + * + * @param wsrep wsrep provider handle + * @param handle a handle associated with a given writeset + * @param data an array of data buffers. + * @param count length of data buffer array. + * @param copy whether provider needs to make a copy of events. + * + * @retval WSREP_OK cluster-wide commit succeeded + * @retval WSREP_TRX_FAIL operation failed (e.g. trx size exceeded limit) + * @retval WSREP_NODE_FAIL must close all connections and reinit + */ + wsrep_status_t (*preordered_collect) (wsrep_t* wsrep, + wsrep_po_handle_t* handle, + const struct wsrep_buf* data, + size_t count, + wsrep_bool_t copy); + + /*! + * @brief "Commits" preordered writeset to cluster. + * + * The contract is that the writeset will be committed in the same (partial) + * order this method was called. Frees resources associated with the writeset + * handle and reinitializes the handle. + * + * @param wsrep wsrep provider handle + * @param po_handle a handle associated with a given writeset + * @param source_id ID of the event producer, also serves as the partial order + * or stream ID - events with different source_ids won't be + * ordered with respect to each other. + * @param flags WSREP_FLAG_... flags + * @param pa_range the number of preceding events this event can be processed + * in parallel with. A value of 0 means strict serial + * processing. Note: commits always happen in wsrep order. + * @param commit 'true' to commit writeset to cluster (replicate) or + * 'false' to rollback (cancel) the writeset. + * + * @retval WSREP_OK cluster-wide commit succeeded + * @retval WSREP_TRX_FAIL operation failed (e.g. NON-PRIMARY component) + * @retval WSREP_NODE_FAIL must close all connections and reinit + */ + wsrep_status_t (*preordered_commit) (wsrep_t* wsrep, + wsrep_po_handle_t* handle, + const wsrep_uuid_t* source_id, + uint64_t flags, + int pa_range, + wsrep_bool_t commit); + + /*! * @brief Signals to wsrep provider that state snapshot has been sent to * joiner. * - * @param wsrep this wsrep handle - * @param uuid sequence UUID (group UUID) - * @param seqno sequence number or negative error code of the operation + * @param wsrep provider handle + * @param state_id state ID + * @param rcode 0 or negative error code of the operation. */ wsrep_status_t (*sst_sent)(wsrep_t* wsrep, - const wsrep_uuid_t* uuid, - wsrep_seqno_t seqno); + const wsrep_gtid_t* state_id, + int rcode); /*! * @brief Signals to wsrep provider that new state snapshot has been received. * May deadlock if called from sst_prepare_cb. * - * @param wsrep this wsrep handle - * @param uuid sequence UUID (group UUID) - * @param seqno sequence number or negative error code of the operation + * @param wsrep provider handle + * @param state_id state ID * @param state initial state provided by SST donor * @param state_len length of state buffer + * @param rcode 0 or negative error code of the operation. */ wsrep_status_t (*sst_received)(wsrep_t* wsrep, - const wsrep_uuid_t* uuid, - wsrep_seqno_t seqno, - const char* state, - size_t state_len); + const wsrep_gtid_t* state_id, + const void* state, + size_t state_len, + int rcode); /*! @@ -729,9 +955,9 @@ struct wsrep_ { * called only locally. This call will block until sst_sent is called * from callback. * - * @param wsrep this wsrep handle - * @param msg context message for SST donate callback - * @param msg_len length of context message + * @param wsrep provider handle + * @param msg context message for SST donate callback + * @param msg_len length of context message * @param donor_spec list of snapshot donors */ wsrep_status_t (*snapshot)(wsrep_t* wsrep, @@ -743,7 +969,7 @@ struct wsrep_ { * @brief Returns an array fo status variables. * Array is terminated by Null variable name. * - * @param wsrep this wsrep handle + * @param wsrep provider handle * @return array of struct wsrep_status_var. */ struct wsrep_stats_var* (*stats_get) (wsrep_t* wsrep); @@ -751,11 +977,19 @@ struct wsrep_ { /*! * @brief Release resources that might be associated with the array. * - * @param wsrep this wsrep handle. + * @param wsrep provider handle. + * @param var_array array returned by stats_get(). */ void (*stats_free) (wsrep_t* wsrep, struct wsrep_stats_var* var_array); /*! + * @brief Reset some stats variables to inital value, provider-dependent. + * + * @param wsrep provider handle. + */ + void (*stats_reset) (wsrep_t* wsrep); + + /*! * @brief Pauses writeset applying/committing. * * @return global sequence number of the paused state or negative error code. @@ -786,28 +1020,30 @@ struct wsrep_ { /*! * @brief Acquire global named lock * - * @param wsrep wsrep provider handle - * @param name lock name - * @param owner 64-bit owner ID - * @param tout timeout in nanoseconds. - * 0 - return immediately, -1 wait forever. - * @return wsrep status or negative error code + * @param wsrep wsrep provider handle + * @param name lock name + * @param shared shared or exclusive lock + * @param owner 64-bit owner ID + * @param tout timeout in nanoseconds. + * 0 - return immediately, -1 wait forever. + * @return wsrep status or negative error code * @retval -EDEADLK lock was already acquired by this thread * @retval -EBUSY lock was busy */ - wsrep_status_t (*lock) (wsrep_t* wsrep, const char* name, int64_t owner, - int64_t tout); + wsrep_status_t (*lock) (wsrep_t* wsrep, + const char* name, wsrep_bool_t shared, + uint64_t owner, int64_t tout); /*! * @brief Release global named lock * - * @param wsrep wsrep provider handle - * @param name lock name - * @param owner 64-bit owner ID - * @return wsrep status or negative error code + * @param wsrep wsrep provider handle + * @param name lock name + * @param owner 64-bit owner ID + * @return wsrep status or negative error code * @retval -EPERM lock does not belong to this owner */ - wsrep_status_t (*unlock) (wsrep_t* wsrep, const char* name, int64_t owner); + wsrep_status_t (*unlock) (wsrep_t* wsrep, const char* name, uint64_t owner); /*! * @brief Check if global named lock is locked @@ -818,8 +1054,8 @@ struct wsrep_ { * @param node if not NULL will contain owner's node UUID * @return true if lock is locked */ - bool (*is_locked) (wsrep_t* wsrep, const char* name, int64_t* conn, - wsrep_uuid_t* node); + wsrep_bool_t (*is_locked) (wsrep_t* wsrep, const char* name, uint64_t* conn, + wsrep_uuid_t* node); /*! * wsrep provider name @@ -838,7 +1074,7 @@ struct wsrep_ { /*! * @brief Frees allocated resources before unloading the library. - * @param wsrep this wsrep handle + * @param wsrep provider handle */ void (*free)(wsrep_t* wsrep); @@ -846,7 +1082,6 @@ struct wsrep_ { void *ctx; //!< reserved for implemetation private context }; -typedef int (*wsrep_loader_fun)(wsrep_t*); /*! * diff --git a/wsrep/wsrep_dummy.c b/wsrep/wsrep_dummy.c index 6d01ce14b4e..33b61e6821f 100644 --- a/wsrep/wsrep_dummy.c +++ b/wsrep/wsrep_dummy.c @@ -16,10 +16,11 @@ /*! @file Dummy wsrep API implementation. */ -#include <errno.h> - #include "wsrep_api.h" +#include <errno.h> +#include <stdbool.h> + /*! Dummy backend context. */ typedef struct wsrep_dummy { @@ -74,9 +75,10 @@ static char* dummy_options_get (wsrep_t* w) static wsrep_status_t dummy_connect( wsrep_t* w, - const char* name __attribute__((unused)), - const char* url __attribute__((unused)), - const char* donor __attribute__((unused))) + const char* name __attribute__((unused)), + const char* url __attribute__((unused)), + const char* donor __attribute__((unused)), + wsrep_bool_t bootstrap __attribute__((unused))) { WSREP_DBUG_ENTER(w); return WSREP_OK; @@ -97,12 +99,12 @@ static wsrep_status_t dummy_recv(wsrep_t* w, static wsrep_status_t dummy_pre_commit( wsrep_t* w, - const wsrep_conn_id_t conn_id __attribute__((unused)), - wsrep_trx_handle_t* trx_handle __attribute__((unused)), - const void* query __attribute__((unused)), - const size_t query_len __attribute__((unused)), - uint64_t flags __attribute__((unused)), - wsrep_seqno_t* seqno __attribute__((unused))) + const wsrep_conn_id_t conn_id __attribute__((unused)), + wsrep_ws_handle_t* ws_handle __attribute__((unused)), +// const struct wsrep_buf* data __attribute__((unused)), +// const long count __attribute__((unused)), + uint64_t flags __attribute__((unused)), + wsrep_trx_meta_t* meta __attribute__((unused))) { WSREP_DBUG_ENTER(w); return WSREP_OK; @@ -110,7 +112,7 @@ static wsrep_status_t dummy_pre_commit( static wsrep_status_t dummy_post_commit( wsrep_t* w, - wsrep_trx_handle_t* trx_handle __attribute__((unused))) + wsrep_ws_handle_t* ws_handle __attribute__((unused))) { WSREP_DBUG_ENTER(w); return WSREP_OK; @@ -118,7 +120,7 @@ static wsrep_status_t dummy_post_commit( static wsrep_status_t dummy_post_rollback( wsrep_t* w, - wsrep_trx_handle_t* trx_handle __attribute__((unused))) + wsrep_ws_handle_t* ws_handle __attribute__((unused))) { WSREP_DBUG_ENTER(w); return WSREP_OK; @@ -126,7 +128,7 @@ static wsrep_status_t dummy_post_rollback( static wsrep_status_t dummy_replay_trx( wsrep_t* w, - wsrep_trx_handle_t* trx_handle __attribute__((unused)), + wsrep_ws_handle_t* ws_handle __attribute__((unused)), void* trx_ctx __attribute__((unused))) { WSREP_DBUG_ENTER(w); @@ -142,23 +144,13 @@ static wsrep_status_t dummy_abort_pre_commit( return WSREP_OK; } -static wsrep_status_t dummy_append_query( - wsrep_t* w, - wsrep_trx_handle_t* trx_handle __attribute__((unused)), - const char* query __attribute__((unused)), - const time_t timeval __attribute__((unused)), - const uint32_t randseed __attribute__((unused))) -{ - WSREP_DBUG_ENTER(w); - return WSREP_OK; -} - -static wsrep_status_t dummy_append_row_key( +static wsrep_status_t dummy_append_key( wsrep_t* w, - wsrep_trx_handle_t* trx_handle __attribute__((unused)), - const wsrep_key_t* key __attribute__((unused)), - const size_t key_len __attribute__((unused)), - const bool shared __attribute__((unused))) + wsrep_ws_handle_t* ws_handle __attribute__((unused)), + const wsrep_key_t* key __attribute__((unused)), + const int key_num __attribute__((unused)), + const wsrep_key_type_t key_type __attribute__((unused)), + const bool copy __attribute__((unused))) { WSREP_DBUG_ENTER(w); return WSREP_OK; @@ -166,9 +158,11 @@ static wsrep_status_t dummy_append_row_key( static wsrep_status_t dummy_append_data( wsrep_t* w, - wsrep_trx_handle_t* trx_handle __attribute__((unused)), - const void* data __attribute__((unused)), - size_t data_len __attribute__((unused))) + wsrep_ws_handle_t* ws_handle __attribute__((unused)), + const struct wsrep_buf* data __attribute__((unused)), + const int count __attribute__((unused)), + const wsrep_data_type_t type __attribute__((unused)), + const bool copy __attribute__((unused))) { WSREP_DBUG_ENTER(w); return WSREP_OK; @@ -176,7 +170,7 @@ static wsrep_status_t dummy_append_data( static wsrep_status_t dummy_causal_read( wsrep_t* w, - wsrep_seqno_t* seqno __attribute__((unused))) + wsrep_gtid_t* gtid __attribute__((unused))) { WSREP_DBUG_ENTER(w); return WSREP_OK; @@ -192,12 +186,12 @@ static wsrep_status_t dummy_free_connection( static wsrep_status_t dummy_to_execute_start( wsrep_t* w, - const wsrep_conn_id_t conn_id __attribute__((unused)), - const wsrep_key_t* key __attribute__((unused)), - const size_t key_len __attribute__((unused)), - const void* query __attribute__((unused)), - const size_t query_len __attribute__((unused)), - wsrep_seqno_t* seqno __attribute__((unused))) + const wsrep_conn_id_t conn_id __attribute__((unused)), + const wsrep_key_t* key __attribute__((unused)), + const int key_num __attribute__((unused)), + const struct wsrep_buf* data __attribute__((unused)), + const int count __attribute__((unused)), + wsrep_trx_meta_t* meta __attribute__((unused))) { WSREP_DBUG_ENTER(w); return WSREP_OK; @@ -211,6 +205,19 @@ static wsrep_status_t dummy_to_execute_end( return WSREP_OK; } +static wsrep_status_t dummy_preordered( + wsrep_t* w, + const wsrep_uuid_t* source_id __attribute__((unused)), + int pa_range __attribute__((unused)), + const struct wsrep_buf* data __attribute__((unused)), + int count __attribute__((unused)), + uint64_t flags __attribute__((unused)), + wsrep_bool_t copy __attribute__((unused))) +{ + WSREP_DBUG_ENTER(w); + return WSREP_OK; +} + static wsrep_status_t dummy_sst_sent( wsrep_t* w, const wsrep_uuid_t* uuid __attribute__((unused)), @@ -234,7 +241,7 @@ static wsrep_status_t dummy_sst_received( static wsrep_status_t dummy_snapshot( wsrep_t* w, const void* msg __attribute__((unused)), - const size_t msg_len __attribute__((unused)), + const int msg_len __attribute__((unused)), const char* donor_spec __attribute__((unused))) { WSREP_DBUG_ENTER(w); @@ -258,6 +265,11 @@ static void dummy_stats_free ( WSREP_DBUG_ENTER(w); } +static void dummy_stats_reset (wsrep_t* w) +{ + WSREP_DBUG_ENTER(w); +} + static wsrep_seqno_t dummy_pause (wsrep_t* w) { WSREP_DBUG_ENTER(w); @@ -284,7 +296,8 @@ static wsrep_status_t dummy_resync (wsrep_t* w) static wsrep_status_t dummy_lock (wsrep_t* w, const char* s __attribute__((unused)), - int64_t o __attribute__((unused)), + bool r __attribute__((unused)), + uint64_t o __attribute__((unused)), int64_t t __attribute__((unused))) { WSREP_DBUG_ENTER(w); @@ -293,7 +306,7 @@ static wsrep_status_t dummy_lock (wsrep_t* w, static wsrep_status_t dummy_unlock (wsrep_t* w, const char* s __attribute__((unused)), - int64_t o __attribute__((unused))) + uint64_t o __attribute__((unused))) { WSREP_DBUG_ENTER(w); return WSREP_OK; @@ -301,7 +314,7 @@ static wsrep_status_t dummy_unlock (wsrep_t* w, static bool dummy_is_locked (wsrep_t* w, const char* s __attribute__((unused)), - int64_t* o __attribute__((unused)), + uint64_t* o __attribute__((unused)), wsrep_uuid_t* t __attribute__((unused))) { WSREP_DBUG_ENTER(w); @@ -322,18 +335,19 @@ static wsrep_t dummy_iface = { &dummy_post_rollback, &dummy_replay_trx, &dummy_abort_pre_commit, - &dummy_append_query, - &dummy_append_row_key, + &dummy_append_key, &dummy_append_data, &dummy_causal_read, &dummy_free_connection, &dummy_to_execute_start, &dummy_to_execute_end, + &dummy_preordered, &dummy_sst_sent, &dummy_sst_received, &dummy_snapshot, &dummy_stats_get, &dummy_stats_free, + &dummy_stats_reset, &dummy_pause, &dummy_resume, &dummy_desync, @@ -344,6 +358,7 @@ static wsrep_t dummy_iface = { WSREP_NONE, WSREP_INTERFACE_VERSION, "Codership Oy <info@codership.com>", + 0xdeadbeef, &dummy_free, NULL, NULL diff --git a/wsrep/wsrep_loader.c b/wsrep/wsrep_loader.c index b4460658f80..8ae6ea962ec 100644 --- a/wsrep/wsrep_loader.c +++ b/wsrep/wsrep_loader.c @@ -70,15 +70,18 @@ static int verify(const wsrep_t *wh, const char *iface_ver) VERIFY(wh->post_rollback); VERIFY(wh->replay_trx); VERIFY(wh->abort_pre_commit); - VERIFY(wh->append_query); VERIFY(wh->append_key); + VERIFY(wh->append_data); VERIFY(wh->free_connection); VERIFY(wh->to_execute_start); VERIFY(wh->to_execute_end); + VERIFY(wh->preordered_collect); + VERIFY(wh->preordered_commit); VERIFY(wh->sst_sent); VERIFY(wh->sst_received); VERIFY(wh->stats_get); VERIFY(wh->stats_free); + VERIFY(wh->stats_reset); VERIFY(wh->pause); VERIFY(wh->resume); VERIFY(wh->desync); @@ -93,6 +96,7 @@ static int verify(const wsrep_t *wh, const char *iface_ver) return 0; } +typedef int (*wsrep_loader_fun)(wsrep_t*); static wsrep_loader_fun wsrep_dlf(void *dlh, const char *sym) { @@ -175,7 +179,7 @@ out: *hptr = NULL; } else { snprintf (msg, msg_len, - "wsrep_load(): %s %s by %s loaded succesfully.", + "wsrep_load(): %s %s by %s loaded successfully.", (*hptr)->provider_name, (*hptr)->provider_version, (*hptr)->provider_vendor); logger (WSREP_LOG_INFO, msg); diff --git a/wsrep/wsrep_uuid.c b/wsrep/wsrep_uuid.c index c99240cc071..baa95b2578a 100644 --- a/wsrep/wsrep_uuid.c +++ b/wsrep/wsrep_uuid.c @@ -26,25 +26,31 @@ * Read UUID from string * @return length of UUID string representation or -EINVAL in case of error */ -ssize_t +int wsrep_uuid_scan (const char* str, size_t str_len, wsrep_uuid_t* uuid) { - size_t uuid_len = 0; - size_t uuid_offt = 0; + unsigned int uuid_len = 0; + unsigned int uuid_offt = 0; while (uuid_len + 1 < str_len) { - if ((4 == uuid_offt || 6 == uuid_offt || 8 == uuid_offt || - 10 == uuid_offt) && str[uuid_len] == '-') { + /* We are skipping potential '-' after uuid_offt == 4, 6, 8, 10 + * which means + * (uuid_offt >> 1) == 2, 3, 4, 5, + * which in turn means + * (uuid_offt >> 1) - 2 <= 3 + * since it is always >= 0, because uuid_offt is unsigned */ + if (((uuid_offt >> 1) - 2) <= 3 && str[uuid_len] == '-') { // skip dashes after 4th, 6th, 8th and 10th positions uuid_len += 1; continue; } + if (isxdigit(str[uuid_len]) && isxdigit(str[uuid_len + 1])) { - // got hex digit - sscanf (str + uuid_len, "%2hhx", uuid->uuid + uuid_offt); + // got hex digit, scan another byte to uuid, increment uuid_offt + sscanf (str + uuid_len, "%2hhx", uuid->data + uuid_offt); uuid_len += 2; uuid_offt += 1; - if (sizeof (uuid->uuid) == uuid_offt) + if (sizeof (uuid->data) == uuid_offt) return uuid_len; } else { @@ -61,11 +67,11 @@ wsrep_uuid_scan (const char* str, size_t str_len, wsrep_uuid_t* uuid) * @return length of UUID string representation or -EMSGSIZE if string is too * short */ -ssize_t +int wsrep_uuid_print (const wsrep_uuid_t* uuid, char* str, size_t str_len) { if (str_len > 36) { - const unsigned char* u = uuid->uuid; + const unsigned char* u = uuid->data; return snprintf(str, str_len, "%02x%02x%02x%02x-%02x%02x-%02x%02x-" "%02x%02x-%02x%02x%02x%02x%02x%02x", u[ 0], u[ 1], u[ 2], u[ 3], u[ 4], u[ 5], u[ 6], u[ 7], @@ -75,4 +81,3 @@ wsrep_uuid_print (const wsrep_uuid_t* uuid, char* str, size_t str_len) return -EMSGSIZE; } } - |