summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorSeppo Jaakola <seppo.jaakola@codership.com>2013-11-26 16:48:30 +0200
committerSeppo Jaakola <seppo.jaakola@codership.com>2013-11-26 16:48:30 +0200
commita2594e96f7c7fe762a8165916551ae96bcbb869f (patch)
treefa1e4eb2a6d6ef1ca8a039a2afe48a65bca6ab33 /sql
parent2b4183f10b54a5b3f8c848d897b3107859c23fa4 (diff)
downloadmariadb-git-a2594e96f7c7fe762a8165916551ae96bcbb869f.tar.gz
Merges from lp:codership-mysql/5.5 up to rev #3893, this changes to wsrep API #24
Diffstat (limited to 'sql')
-rw-r--r--sql/CMakeLists.txt2
-rw-r--r--sql/events.cc2
-rw-r--r--sql/log.cc20
-rw-r--r--sql/log.h7
-rw-r--r--sql/log_event.cc14
-rw-r--r--sql/mysqld.cc4
-rw-r--r--sql/sp.cc2
-rw-r--r--sql/sql_class.cc40
-rw-r--r--sql/sql_class.h11
-rw-r--r--sql/sql_parse.cc2
-rw-r--r--sql/sql_trigger.cc4
-rw-r--r--sql/sys_vars.cc6
-rw-r--r--sql/wsrep_binlog.h49
-rw-r--r--sql/wsrep_hton.cc134
-rw-r--r--sql/wsrep_mysqld.cc270
-rw-r--r--sql/wsrep_mysqld.h39
-rw-r--r--sql/wsrep_priv.h18
-rw-r--r--sql/wsrep_sst.cc47
-rw-r--r--sql/wsrep_thd.cc16
-rw-r--r--sql/wsrep_utils.cc85
-rw-r--r--sql/wsrep_var.cc25
21 files changed, 466 insertions, 331 deletions
diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt
index 7766b82adff..fd654d01b1d 100644
--- a/sql/CMakeLists.txt
+++ b/sql/CMakeLists.txt
@@ -50,6 +50,8 @@ IF(WITH_WSREP)
wsrep_sst.cc
wsrep_utils.cc
wsrep_var.cc
+ wsrep_binlog.cc
+ wsrep_applier.cc
wsrep_thd.cc
)
SET(WSREP_LIB wsrep)
diff --git a/sql/events.cc b/sql/events.cc
index 7df8d19644d..73ce894095c 100644
--- a/sql/events.cc
+++ b/sql/events.cc
@@ -1169,7 +1169,7 @@ end:
}
#ifdef WITH_WSREP
-int wsrep_create_event_query(THD *thd, uchar** buf, uint* buf_len)
+int wsrep_create_event_query(THD *thd, uchar** buf, int* buf_len)
{
String log_query;
diff --git a/sql/log.cc b/sql/log.cc
index 1c51d50a2ca..6dd4aa7038b 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -596,7 +596,7 @@ void thd_binlog_rollback_stmt(THD * thd)
with the exception that here we write in buffer instead of log file.
*/
-int wsrep_write_cache(IO_CACHE *cache, uchar **buf, uint *buf_len)
+int wsrep_write_cache(IO_CACHE *cache, uchar **buf, int *buf_len)
{
if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
@@ -604,7 +604,7 @@ int wsrep_write_cache(IO_CACHE *cache, uchar **buf, uint *buf_len)
uint length= my_b_bytes_in_cache(cache);
long long total_length = 0;
uchar *buf_ptr = NULL;
-
+
do
{
/* bail out if buffer grows too large
@@ -614,7 +614,7 @@ int wsrep_write_cache(IO_CACHE *cache, uchar **buf, uint *buf_len)
if (total_length > wsrep_max_ws_size)
{
WSREP_WARN("transaction size limit (%lld) exceeded: %lld",
- wsrep_max_ws_size, total_length);
+ wsrep_max_ws_size, total_length);
if (reinit_io_cache(cache, WRITE_CACHE, 0, 0, 0))
{
WSREP_WARN("failed to initialize io-cache");
@@ -6017,21 +6017,27 @@ err:
if (WSREP(thd) && wsrep_incremental_data_collection &&
(wsrep_emulate_bin_log || mysql_bin_log.is_open()))
{
- DBUG_ASSERT(thd->wsrep_trx_handle.trx_id != (unsigned long)-1);
+ DBUG_ASSERT(thd->wsrep_ws_handle.trx_id != (unsigned long)-1);
if (!error)
{
IO_CACHE* cache= get_trans_log(thd);
uchar* buf= NULL;
- uint buf_len= 0;
+ int buf_len= 0;
if (wsrep_emulate_bin_log)
thd->binlog_flush_pending_rows_event(false);
error= wsrep_write_cache(cache, &buf, &buf_len);
if (!error && buf_len > 0)
{
+ const struct wsrep_buf buff = { buf, buf_len };
+
+ const bool nocopy(false);
+ const bool unordered(false);
+
wsrep_status_t rc= wsrep->append_data(wsrep,
- &thd->wsrep_trx_handle,
- buf, buf_len);
+ &thd->wsrep_ws_handle,
+ &buff, 1, WSREP_DATA_ORDERED,
+ true);
if (rc != WSREP_OK)
{
sql_print_warning("WSREP: append_data() returned %d", rc);
diff --git a/sql/log.h b/sql/log.h
index 964d75916a9..487005913c9 100644
--- a/sql/log.h
+++ b/sql/log.h
@@ -287,12 +287,6 @@ enum enum_log_state { LOG_OPENED, LOG_CLOSED, LOG_TO_BE_OPENED };
(mmap+fsync is two times faster than write+fsync)
*/
-#ifdef WITH_WSREP
-extern my_bool wsrep_emulate_bin_log;
-Log_event* wsrep_read_log_event(
- char **arg_buf, size_t *arg_buf_len,
- const Format_description_log_event *description_event);
-#endif
class MYSQL_LOG
{
public:
@@ -974,7 +968,6 @@ bool wsrep_trans_cache_is_empty(THD *thd);
void thd_binlog_flush_pending_rows_event(THD *thd, bool stmt_end);
void thd_binlog_trx_reset(THD * thd);
void thd_binlog_rollback_stmt(THD * thd);
-int wsrep_write_cache(IO_CACHE *cache, uchar **buf, uint *buf_len);
#define WSREP_FORMAT(my_format) \
((wsrep_forced_binlog_format != BINLOG_FORMAT_UNSPEC) ? \
diff --git a/sql/log_event.cc b/sql/log_event.cc
index a6b27caf631..450a80bc9b2 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -9211,7 +9211,7 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
thd->is_fatal_error,
thd->wsrep_exec_mode,
thd->wsrep_conflict_state,
- (long long)thd->wsrep_trx_seqno);
+ (long long)wsrep_thd_trx_seqno(thd));
}
#endif
if (thd->is_slave_error || thd->is_fatal_error)
@@ -10976,7 +10976,7 @@ Write_rows_log_event::do_exec_row(const Relay_log_info *const rli)
char info[64];
info[sizeof(info) - 1] = '\0';
snprintf(info, sizeof(info) - 1, "Write_rows_log_event::write_row(%lld)",
- (long long) thd->wsrep_trx_seqno);
+ (long long) wsrep_thd_trx_seqno(thd));
const char* tmp = (WSREP(thd)) ? thd_proc_info(thd, info) : NULL;
#else
const char* tmp = (WSREP(thd)) ?
@@ -11659,7 +11659,7 @@ int Delete_rows_log_event::do_exec_row(const Relay_log_info *const rli)
char info[64];
info[sizeof(info) - 1] = '\0';
snprintf(info, sizeof(info) - 1, "Delete_rows_log_event::find_row(%lld)",
- (long long) thd->wsrep_trx_seqno);
+ (long long) wsrep_thd_trx_seqno(thd));
const char* tmp = (WSREP(thd)) ? thd_proc_info(thd, info) : NULL;
#else
const char* tmp = (WSREP(thd)) ?
@@ -11675,7 +11675,7 @@ int Delete_rows_log_event::do_exec_row(const Relay_log_info *const rli)
#ifdef WSREP_PROC_INFO
snprintf(info, sizeof(info) - 1,
"Delete_rows_log_event::ha_delete_row(%lld)",
- (long long) thd->wsrep_trx_seqno);
+ (long long) wsrep_thd_trx_seqno(thd));
if (WSREP(thd)) thd_proc_info(thd, info);
#else
if (WSREP(thd)) thd_proc_info(thd,"Delete_rows_log_event::ha_delete_row()");
@@ -11809,7 +11809,7 @@ Update_rows_log_event::do_exec_row(const Relay_log_info *const rli)
char info[64];
info[sizeof(info) - 1] = '\0';
snprintf(info, sizeof(info) - 1, "Update_rows_log_event::find_row(%lld)",
- (long long) thd->wsrep_trx_seqno);
+ (long long) wsrep_thd_trx_seqno(thd));
const char* tmp = (WSREP(thd)) ? thd_proc_info(thd, info) : NULL;
#else
const char* tmp = (WSREP(thd)) ?
@@ -11846,7 +11846,7 @@ Update_rows_log_event::do_exec_row(const Relay_log_info *const rli)
#ifdef WSREP_PROC_INFO
snprintf(info, sizeof(info) - 1,
"Update_rows_log_event::unpack_current_row(%lld)",
- (long long) thd->wsrep_trx_seqno);
+ (long long) wsrep_thd_trx_seqno(thd));
if (WSREP(thd)) thd_proc_info(thd, info);
#else
if (WSREP(thd))
@@ -11875,7 +11875,7 @@ Update_rows_log_event::do_exec_row(const Relay_log_info *const rli)
#ifdef WSREP_PROC_INFO
snprintf(info, sizeof(info) - 1,
"Update_rows_log_event::ha_update_row(%lld)",
- (long long) thd->wsrep_trx_seqno);
+ (long long) wsrep_thd_trx_seqno(thd));
if (WSREP(thd)) thd_proc_info(thd, info);
#else
if (WSREP(thd)) thd_proc_info(thd,"Update_rows_log_event::ha_update_row()");
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index 1c9560ceb9f..a10dfa1015c 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -359,7 +359,11 @@ static char *default_character_set_name;
static char *character_set_filesystem_name;
static char *lc_messages;
static char *lc_time_names_name;
+#ifndef WITH_WSREP
static char *my_bind_addr_str;
+#else
+char *my_bind_addr_str;
+#endif /* WITH_WSREP */
static char *default_collation_name;
char *default_storage_engine;
static char compiled_default_collation_name[]= MYSQL_DEFAULT_COLLATION_NAME;
diff --git a/sql/sp.cc b/sql/sp.cc
index 73f59277cf0..ca8e41282cf 100644
--- a/sql/sp.cc
+++ b/sql/sp.cc
@@ -2255,7 +2255,7 @@ sp_load_for_information_schema(THD *thd, TABLE *proc_table, String *db,
return sp;
}
#ifdef WITH_WSREP
-int wsrep_create_sp(THD *thd, uchar** buf, uint* buf_len)
+int wsrep_create_sp(THD *thd, uchar** buf, int* buf_len)
{
String log_query;
sp_head *sp = thd->lex->sphead;
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index a0c24ffb3ca..49fb88f5866 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -869,9 +869,9 @@ extern "C" const char *wsrep_thd_conflict_state_str(THD *thd)
(thd->wsrep_conflict_state == CERT_FAILURE) ? "cert failure" : "void";
}
-extern "C" wsrep_trx_handle_t* wsrep_thd_trx_handle(THD *thd)
+extern "C" wsrep_ws_handle_t* wsrep_thd_ws_handle(THD *thd)
{
- return &thd->wsrep_trx_handle;
+ return &thd->wsrep_ws_handle;
}
extern "C" void wsrep_thd_LOCK(THD *thd)
@@ -896,7 +896,7 @@ extern "C" my_thread_id wsrep_thd_thread_id(THD *thd)
}
extern "C" wsrep_seqno_t wsrep_thd_trx_seqno(THD *thd)
{
- return (thd) ? thd->wsrep_trx_seqno : -1;
+ return (thd) ? thd->wsrep_trx_meta.gtid.seqno : -1;
}
extern "C" query_id_t wsrep_thd_query_id(THD *thd)
{
@@ -918,7 +918,6 @@ extern "C" void wsrep_thd_awake(THD* bf_thd, THD *thd, my_bool signal)
{
if (signal)
{
- thd->wsrep_bf_thd = bf_thd;
mysql_mutex_lock(&thd->LOCK_thd_data);
thd->awake(KILL_QUERY);
mysql_mutex_unlock(&thd->LOCK_thd_data);
@@ -934,16 +933,16 @@ extern "C" void wsrep_thd_awake(THD* bf_thd, THD *thd, my_bool signal)
extern int
wsrep_trx_order_before(void *thd1, void *thd2)
{
- if (((THD*)thd1)->wsrep_trx_seqno < ((THD*)thd2)->wsrep_trx_seqno) {
- WSREP_DEBUG("BF conflict, order: %lld %lld\n",
- (long long)((THD*)thd1)->wsrep_trx_seqno,
- (long long)((THD*)thd2)->wsrep_trx_seqno);
- return 1;
- }
- WSREP_DEBUG("waiting for BF, trx order: %lld %lld\n",
- (long long)((THD*)thd1)->wsrep_trx_seqno,
- (long long)((THD*)thd2)->wsrep_trx_seqno);
- return 0;
+ if (wsrep_thd_trx_seqno((THD*)thd1) < wsrep_thd_trx_seqno((THD*)thd2)) {
+ WSREP_DEBUG("BF conflict, order: %lld %lld\n",
+ (long long)wsrep_thd_trx_seqno((THD*)thd1),
+ (long long)wsrep_thd_trx_seqno((THD*)thd2));
+ return 1;
+ }
+ WSREP_DEBUG("waiting for BF, trx order: %lld %lld\n",
+ (long long)wsrep_thd_trx_seqno((THD*)thd1),
+ (long long)wsrep_thd_trx_seqno((THD*)thd2));
+ return 0;
}
extern "C" int
wsrep_trx_is_aborting(void *thd_ptr)
@@ -1142,9 +1141,8 @@ THD::THD()
#ifdef WITH_WSREP
mysql_mutex_init(key_LOCK_wsrep_thd, &LOCK_wsrep_thd, MY_MUTEX_INIT_FAST);
mysql_cond_init(key_COND_wsrep_thd, &COND_wsrep_thd, NULL);
- wsrep_trx_handle.trx_id = WSREP_UNDEFINED_TRX_ID;
- wsrep_trx_handle.opaque = NULL;
- //wsrep_retry_autocommit= ::wsrep_retry_autocommit;
+ wsrep_ws_handle.trx_id = WSREP_UNDEFINED_TRX_ID;
+ wsrep_ws_handle.opaque = NULL;
wsrep_retry_counter = 0;
wsrep_PA_safe = true;
wsrep_seqno_changed = false;
@@ -1154,7 +1152,6 @@ THD::THD()
wsrep_consistency_check = NO_CONSISTENCY_CHECK;
wsrep_status_vars = 0;
wsrep_mysql_replicated = 0;
- wsrep_bf_thd = NULL;
wsrep_TOI_pre_query = NULL;
wsrep_TOI_pre_query_len = 0;
#endif
@@ -1549,7 +1546,8 @@ void THD::init(void)
wsrep_conflict_state= NO_CONFLICT;
wsrep_query_state= QUERY_IDLE;
wsrep_last_query_id= 0;
- wsrep_trx_seqno= 0;
+ wsrep_trx_meta.gtid= WSREP_GTID_UNDEFINED;
+ wsrep_trx_meta.depends_on= WSREP_SEQNO_UNDEFINED;
wsrep_converted_lock_session= false;
wsrep_retry_counter= 0;
wsrep_rli= NULL;
@@ -1557,7 +1555,7 @@ void THD::init(void)
wsrep_seqno_changed= false;
wsrep_consistency_check = NO_CONSISTENCY_CHECK;
wsrep_mysql_replicated = 0;
- wsrep_bf_thd = NULL;
+
wsrep_TOI_pre_query = NULL;
wsrep_TOI_pre_query_len = 0;
#endif
@@ -1941,7 +1939,7 @@ void THD::awake(killed_state state_to_set)
/* Interrupt target waiting inside a storage engine. */
if (state_to_set != NOT_KILLED)
#ifdef WITH_WSREP
- if (!wsrep_bf_thd || wsrep_bf_thd->wsrep_exec_mode == LOCAL_STATE)
+ /* TODO: prevent applier close here */
#endif /* WITH_WSREP */
ha_kill_query(this, thd_kill_level(this));
diff --git a/sql/sql_class.h b/sql/sql_class.h
index a68b936307a..81d1a3d4a83 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -2553,11 +2553,13 @@ public:
enum wsrep_conflict_state wsrep_conflict_state;
mysql_mutex_t LOCK_wsrep_thd;
mysql_cond_t COND_wsrep_thd;
- wsrep_seqno_t wsrep_trx_seqno;
+ // changed from wsrep_seqno_t to wsrep_trx_meta_t in wsrep API rev 75
+ // wsrep_seqno_t wsrep_trx_seqno;
+ wsrep_trx_meta_t wsrep_trx_meta;
uint32 wsrep_rand;
Relay_log_info* wsrep_rli;
bool wsrep_converted_lock_session;
- wsrep_trx_handle_t wsrep_trx_handle;
+ wsrep_ws_handle_t wsrep_ws_handle;
bool wsrep_seqno_changed;
#ifdef WSREP_PROC_INFO
char wsrep_info[128]; /* string for dynamic proc info */
@@ -2571,9 +2573,8 @@ public:
wsrep_consistency_check;
wsrep_stats_var* wsrep_status_vars;
int wsrep_mysql_replicated;
- THD* wsrep_bf_thd;
- const char* wsrep_TOI_pre_query; /* a query to apply before
- the actual TOI query */
+ const char* wsrep_TOI_pre_query; /* a query to apply before
+ the actual TOI query */
size_t wsrep_TOI_pre_query_len;
#endif /* WITH_WSREP */
/**
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index 8f208ee66b1..a9687432849 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -953,7 +953,6 @@ bool do_command(THD *thd)
else if (thd->wsrep_conflict_state == ABORTED)
{
thd->store_globals();
- thd->wsrep_bf_thd = NULL;
}
thd->wsrep_query_state= QUERY_EXEC;
@@ -1241,7 +1240,6 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
thd->mysys_var->abort = 0;
thd->wsrep_conflict_state = NO_CONFLICT;
thd->wsrep_retry_counter = 0;
- thd->wsrep_bf_thd = NULL;
/*
Increment threads running to compensate dec_thread_running() called
after dispatch_end label.
diff --git a/sql/sql_trigger.cc b/sql/sql_trigger.cc
index 135207c1681..939541f913e 100644
--- a/sql/sql_trigger.cc
+++ b/sql/sql_trigger.cc
@@ -2484,7 +2484,7 @@ bool load_table_name_for_trigger(THD *thd,
DBUG_RETURN(FALSE);
}
#ifdef WITH_WSREP
-int wsrep_create_trigger_query(THD *thd, uchar** buf, uint* buf_len)
+int wsrep_create_trigger_query(THD *thd, uchar** buf, int* buf_len)
{
LEX *lex= thd->lex;
String stmt_query;
@@ -2532,6 +2532,6 @@ int wsrep_create_trigger_query(THD *thd, uchar** buf, uint* buf_len)
stmt_query.append(stmt_definition.str, stmt_definition.length);
return wsrep_to_buf_helper(thd, stmt_query.c_ptr(), stmt_query.length(),
- buf, buf_len);
+ buf, buf_len);
}
#endif /* WITH_WSREP */
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
index e1b0dc50c65..37e5f3315e3 100644
--- a/sql/sys_vars.cc
+++ b/sql/sys_vars.cc
@@ -4065,6 +4065,7 @@ static Sys_var_tz Sys_time_zone(
#ifdef WITH_WSREP
#include "wsrep_var.h"
#include "wsrep_sst.h"
+#include "wsrep_binlog.h"
static Sys_var_charptr Sys_wsrep_provider(
"wsrep_provider", "Path to replication provider library",
@@ -4222,10 +4223,11 @@ static Sys_var_charptr Sys_wsrep_start_position (
ON_CHECK(wsrep_start_position_check),
ON_UPDATE(wsrep_start_position_update));
-static Sys_var_ulonglong Sys_wsrep_max_ws_size (
+static Sys_var_ulong Sys_wsrep_max_ws_size (
"wsrep_max_ws_size", "Max write set size (bytes)",
GLOBAL_VAR(wsrep_max_ws_size), CMD_LINE(REQUIRED_ARG),
- VALID_RANGE(1024, 4294967296ULL), DEFAULT(1073741824ULL), BLOCK_SIZE(1));
+ /* Upper limit is 65K short of 4G to avoid overlows on 32-bit systems */
+ VALID_RANGE(1024, WSREP_MAX_WS_SIZE), DEFAULT(1073741824UL), BLOCK_SIZE(1));
static Sys_var_ulong Sys_wsrep_max_ws_rows (
"wsrep_max_ws_rows", "Max number of rows in write set",
diff --git a/sql/wsrep_binlog.h b/sql/wsrep_binlog.h
new file mode 100644
index 00000000000..6de73b2f5ee
--- /dev/null
+++ b/sql/wsrep_binlog.h
@@ -0,0 +1,49 @@
+/* Copyright (C) 2013 Codership Oy <info@codership.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License along
+ with this program; if not, write to the Free Software Foundation, Inc.,
+ 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */
+
+#ifndef WSREP_BINLOG_H
+#define WSREP_BINLOG_H
+
+#include "sql_class.h" // THD, IO_CACHE
+
+#define HEAP_PAGE_SIZE 65536 /* 64K */
+#define WSREP_MAX_WS_SIZE (0xFFFFFFFFUL - HEAP_PAGE_SIZE)
+
+/*
+ Write the contents of a cache to a memory buffer.
+
+ This function quite the same as MYSQL_BIN_LOG::write_cache(),
+ with the exception that here we write in buffer instead of log file.
+ */
+int wsrep_write_cache_buf(IO_CACHE *cache, uchar **buf, size_t *buf_len);
+
+/*
+ Write the contents of a cache to wsrep provider.
+
+ This function quite the same as MYSQL_BIN_LOG::write_cache(),
+ with the exception that here we write in buffer instead of log file.
+
+ @param len total amount of data written
+ @return wsrep error status
+ */
+int wsrep_write_cache (wsrep_t* wsrep,
+ THD* thd,
+ IO_CACHE* cache,
+ size_t* len);
+
+/* Dump replication buffer to disk */
+void wsrep_dump_rbr_buf(THD *thd, const void* rbr_buf, size_t buf_len);
+
+#endif /* WSREP_BINLOG_H */
diff --git a/sql/wsrep_hton.cc b/sql/wsrep_hton.cc
index 5ac4a5fad12..d43e5408f61 100644
--- a/sql/wsrep_hton.cc
+++ b/sql/wsrep_hton.cc
@@ -18,7 +18,7 @@
#include "rpl_filter.h"
#include <sql_class.h>
#include "wsrep_mysqld.h"
-#include "wsrep_priv.h"
+#include "wsrep_binlog.h"
#include <cstdio>
#include <cstdlib>
@@ -26,10 +26,11 @@ extern handlerton *binlog_hton;
extern int binlog_close_connection(handlerton *hton, THD *thd);
extern ulonglong thd_to_trx_id(THD *thd);
-extern "C" int thd_binlog_format(const MYSQL_THD thd);
-// todo: share interface with ha_innodb.c
+extern "C" int thd_binlog_format(const MYSQL_THD thd);
+// todo: share interface with ha_innodb.c
-enum wsrep_trx_status wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all);
+enum wsrep_trx_status wsrep_run_wsrep_commit(THD *thd, handlerton *hton,
+ bool all);
/*
a post-commit cleanup on behalf of wsrep. Can't be a part of hton struct.
@@ -45,7 +46,7 @@ void wsrep_cleanup_transaction(THD *thd)
{
if (thd->wsrep_seqno_changed)
{
- if (wsrep->post_commit(wsrep, &thd->wsrep_trx_handle))
+ if (wsrep->post_commit(wsrep, &thd->wsrep_ws_handle))
{
DBUG_PRINT("wsrep", ("set committed fail"));
WSREP_WARN("set committed fail: %llu %d",
@@ -59,7 +60,7 @@ void wsrep_cleanup_transaction(THD *thd)
}
thd->wsrep_exec_mode= LOCAL_STATE;
}
- thd->wsrep_trx_handle.trx_id = WSREP_UNDEFINED_TRX_ID;
+ thd->wsrep_ws_handle.trx_id = WSREP_UNDEFINED_TRX_ID;
}
/*
@@ -87,7 +88,7 @@ void wsrep_register_hton(THD* thd, bool all)
}
/*
- wsrep exploits binlog's caches even if binlogging itself is not
+ wsrep exploits binlog's caches even if binlogging itself is not
activated. In such case connection close needs calling
actual binlog's method.
Todo: split binlog hton from its caches to use ones by wsrep
@@ -100,7 +101,7 @@ wsrep_close_connection(handlerton* hton, THD* thd)
if (thd_get_ha_data(thd, binlog_hton) != NULL)
binlog_hton->close_connection (binlog_hton, thd);
DBUG_RETURN(0);
-}
+}
/*
prepare/wsrep_run_wsrep_commit can fail in two ways
@@ -117,7 +118,7 @@ static int wsrep_prepare(handlerton *hton, THD *thd, bool all)
//wsrep_seqno_t old = thd->wsrep_trx_seqno;
#endif
DBUG_ENTER("wsrep_prepare");
- if ((all ||
+ if ((all ||
!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) &&
(thd->variables.wsrep_on && !wsrep_trans_cache_is_empty(thd)))
{
@@ -156,16 +157,16 @@ static int wsrep_rollback(handlerton *hton, THD *thd, bool all)
if ((all || !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) &&
(thd->variables.wsrep_on && thd->wsrep_conflict_state != MUST_REPLAY))
{
- if (wsrep->post_rollback(wsrep, &thd->wsrep_trx_handle))
+ if (wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle))
{
DBUG_PRINT("wsrep", ("setting rollback fail"));
- WSREP_ERROR("settting rollback fail: thd: %llu SQL: %s",
+ WSREP_ERROR("settting rollback fail: thd: %llu SQL: %s",
(long long)thd->real_id, thd->query());
}
}
int rcode = 0;
- if (!wsrep_emulate_bin_log)
+ if (!wsrep_emulate_bin_log)
{
if (all) thd_binlog_trx_reset(thd);
}
@@ -183,14 +184,12 @@ int wsrep_commit(handlerton *hton, THD *thd, bool all)
extern Rpl_filter* binlog_filter;
extern my_bool opt_log_slave_updates;
-extern void wsrep_write_rbr_buf(THD *thd, const void* rbr_buf, size_t buf_len);
+
enum wsrep_trx_status
-wsrep_run_wsrep_commit(
- THD *thd, handlerton *hton, bool all)
+wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all)
{
- int rcode = -1;
- uint data_len = 0;
- uchar *rbr_data = NULL;
+ int rcode= -1;
+ int data_len = 0;
IO_CACHE *cache;
int replay_round= 0;
@@ -200,9 +199,9 @@ wsrep_run_wsrep_commit(
}
DBUG_ENTER("wsrep_run_wsrep_commit");
- if (thd->slave_thread && !opt_log_slave_updates) {
- DBUG_RETURN(WSREP_TRX_OK);
- }
+
+ if (thd->slave_thread && !opt_log_slave_updates) DBUG_RETURN(WSREP_TRX_OK);
+
if (thd->wsrep_exec_mode == REPL_RECV) {
mysql_mutex_lock(&thd->LOCK_wsrep_thd);
@@ -220,9 +219,9 @@ wsrep_run_wsrep_commit(
}
mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
}
- if (thd->wsrep_exec_mode != LOCAL_STATE) {
- DBUG_RETURN(WSREP_TRX_OK);
- }
+
+ if (thd->wsrep_exec_mode != LOCAL_STATE) DBUG_RETURN(WSREP_TRX_OK);
+
if (thd->wsrep_consistency_check == CONSISTENCY_CHECK_RUNNING) {
WSREP_DEBUG("commit for consistency check: %s", thd->query());
DBUG_RETURN(WSREP_TRX_OK);
@@ -244,10 +243,9 @@ wsrep_run_wsrep_commit(
mysql_mutex_lock(&LOCK_wsrep_replaying);
- while (wsrep_replaying > 0 &&
+ while (wsrep_replaying > 0 &&
thd->wsrep_conflict_state == NO_CONFLICT &&
- thd->killed == NOT_KILLED &&
- !shutdown_in_progress)
+ !shutdown_in_progress)
{
mysql_mutex_unlock(&LOCK_wsrep_replaying);
@@ -265,9 +263,12 @@ wsrep_run_wsrep_commit(
struct timespec wtime = {0, 1000000};
mysql_cond_timedwait(&COND_wsrep_replaying, &LOCK_wsrep_replaying,
&wtime);
+
if (replay_round++ % 100000 == 0)
- WSREP_DEBUG("commit waiting for replaying: replayers %d, thd: (%lu) conflict: %d (round: %d)",
- wsrep_replaying, thd->thread_id, thd->wsrep_conflict_state, replay_round);
+ WSREP_DEBUG("commit waiting for replaying: replayers %d, thd: (%lu) "
+ "conflict: %d (round: %d)",
+ wsrep_replaying, thd->thread_id,
+ thd->wsrep_conflict_state, replay_round);
mysql_mutex_unlock(&LOCK_wsrep_replaying);
@@ -288,7 +289,8 @@ wsrep_run_wsrep_commit(
WSREP_DEBUG("innobase_commit abort after replaying wait %s",
(thd->query()) ? thd->query() : "void");
DBUG_RETURN(WSREP_TRX_ROLLBACK);
- }
+ }
+
thd->wsrep_query_state = QUERY_COMMITTING;
mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
@@ -296,31 +298,31 @@ wsrep_run_wsrep_commit(
rcode = 0;
if (cache) {
thd->binlog_flush_pending_rows_event(true);
- rcode = wsrep_write_cache(cache, &rbr_data, &data_len);
- if (rcode) {
- WSREP_ERROR("rbr write fail, data_len: %d, %d", data_len, rcode);
- if (data_len) my_free(rbr_data);
+ rcode = wsrep_write_cache(wsrep, thd, cache, (size_t*)&data_len);
+ if (WSREP_OK != rcode) {
+ WSREP_ERROR("rbr write fail, data_len: %zu, %d", data_len, rcode);
DBUG_RETURN(WSREP_TRX_ROLLBACK);
}
}
- if (data_len == 0)
+
+ if (data_len == 0)
{
mysql_mutex_lock(&thd->LOCK_wsrep_thd);
thd->wsrep_exec_mode = LOCAL_COMMIT;
mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
- if (thd->get_stmt_da()->is_ok() &&
+ if (thd->get_stmt_da()->is_ok() &&
thd->get_stmt_da()->affected_rows() > 0 &&
!binlog_filter->is_on())
{
WSREP_DEBUG("empty rbr buffer, query: %s, "
- "affected rows: %llu, "
- "changed tables: %d, "
+ "affected rows: %llu, "
+ "changed tables: %d, "
"sql_log_bin: %d, "
"wsrep status (%d %d %d)",
thd->query(), thd->get_stmt_da()->affected_rows(),
stmt_has_updated_trans_table(thd), thd->variables.sql_log_bin,
- thd->wsrep_exec_mode, thd->wsrep_query_state,
- thd->wsrep_conflict_state);
+ thd->wsrep_exec_mode, thd->wsrep_query_state,
+ thd->wsrep_conflict_state);
}
else
{
@@ -329,33 +331,31 @@ wsrep_run_wsrep_commit(
thd->wsrep_query_state= QUERY_EXEC;
DBUG_RETURN(WSREP_TRX_OK);
}
- if (WSREP_UNDEFINED_TRX_ID == thd->wsrep_trx_handle.trx_id)
+
+ if (WSREP_UNDEFINED_TRX_ID == thd->wsrep_ws_handle.trx_id)
{
- WSREP_WARN("SQL statement was ineffective, THD: %lu, buf: %d\n"
- "QUERY: %s\n"
- " => Skipping replication",
- thd->thread_id, data_len, thd->query());
- if (wsrep_debug)
- {
- wsrep_write_rbr_buf(thd, rbr_data, data_len);
- }
+ WSREP_WARN("SQL statement was ineffective, THD: %lu, buf: %zu\n"
+ "QUERY: %s\n"
+ " => Skipping replication",
+ thd->thread_id, data_len, thd->query());
rcode = WSREP_TRX_FAIL;
}
else if (!rcode)
{
- rcode = wsrep->pre_commit(
- wsrep,
- (wsrep_conn_id_t)thd->thread_id,
- &thd->wsrep_trx_handle,
- rbr_data,
- data_len,
- (thd->wsrep_PA_safe) ? WSREP_FLAG_PA_SAFE : 0ULL,
- &thd->wsrep_trx_seqno);
- switch (rcode) {
+ if (WSREP_OK == rcode)
+ rcode = wsrep->pre_commit(wsrep,
+ (wsrep_conn_id_t)thd->thread_id,
+ &thd->wsrep_ws_handle,
+ WSREP_FLAG_COMMIT |
+ ((thd->wsrep_PA_safe) ?
+ 0ULL : WSREP_FLAG_PA_UNSAFE),
+ &thd->wsrep_trx_meta);
+
+ switch (rcode)
+ {
case WSREP_TRX_MISSING:
WSREP_WARN("Transaction missing in provider, thd: %ld, SQL: %s",
thd->thread_id, thd->query());
- wsrep_write_rbr_buf(thd, rbr_data, data_len);
rcode = WSREP_OK;
break;
case WSREP_BF_ABORT:
@@ -375,15 +375,10 @@ wsrep_run_wsrep_commit(
} else {
WSREP_ERROR("I/O error reading from thd's binlog iocache: "
"errno=%d, io cache code=%d", my_errno, cache->error);
- if (data_len) my_free(rbr_data);
DBUG_ASSERT(0); // failure like this can not normally happen
DBUG_RETURN(WSREP_TRX_ERROR);
}
- if (data_len) {
- my_free(rbr_data);
- }
-
mysql_mutex_lock(&thd->LOCK_wsrep_thd);
switch(rcode) {
case 0:
@@ -392,8 +387,8 @@ wsrep_run_wsrep_commit(
if (thd->transaction.xid_state.xid.get_my_xid())
{
wsrep_xid_init(&thd->transaction.xid_state.xid,
- wsrep_cluster_uuid(),
- thd->wsrep_trx_seqno);
+ &thd->wsrep_trx_meta.gtid.uuid,
+ thd->wsrep_trx_meta.gtid.seqno);
}
DBUG_PRINT("wsrep", ("replicating commit success"));
@@ -407,7 +402,7 @@ wsrep_run_wsrep_commit(
if (thd->wsrep_conflict_state == MUST_ABORT) {
thd->wsrep_conflict_state= ABORTED;
- }
+ }
else
{
WSREP_DEBUG("conflict state: %d", thd->wsrep_conflict_state);
@@ -465,14 +460,15 @@ mysql_declare_plugin(wsrep)
&wsrep_storage_engine,
"wsrep",
"Codership Oy",
- "A pseudo storage engine to represent transactions in multi-master synchornous replication",
+ "A pseudo storage engine to represent transactions in multi-master "
+ "synchornous replication",
PLUGIN_LICENSE_GPL,
wsrep_hton_init, /* Plugin Init */
NULL, /* Plugin Deinit */
0x0100 /* 1.0 */,
NULL, /* status variables */
NULL, /* system variables */
- NULL, /* config options */
+ NULL, /* config options */
0, /* flags */
}
mysql_declare_plugin_end;
diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc
index af8b025cfd7..6cbbfc47ad3 100644
--- a/sql/wsrep_mysqld.cc
+++ b/sql/wsrep_mysqld.cc
@@ -21,6 +21,7 @@
#include "wsrep_sst.h"
#include "wsrep_utils.h"
#include "wsrep_var.h"
+#include "wsrep_binlog.h"
#include <cstdio>
#include <cstdlib>
#include "log_event.h"
@@ -37,34 +38,32 @@ const char* wsrep_data_home_dir = NULL;
const char* wsrep_dbug_option = "";
long wsrep_slave_threads = 1; // # of slave action appliers wanted
+int wsrep_slave_count_change = 0; // # of appliers to stop or start
my_bool wsrep_debug = 0; // enable debug level logging
my_bool wsrep_convert_LOCK_to_trx = 1; // convert locking sessions to trx
ulong wsrep_retry_autocommit = 5; // retry aborted autocommit trx
my_bool wsrep_auto_increment_control = 1; // control auto increment variables
my_bool wsrep_drupal_282555_workaround = 1; // retry autoinc insert after dupkey
my_bool wsrep_incremental_data_collection = 0; // incremental data collection
-long long wsrep_max_ws_size = 1073741824LL; //max ws (RBR buffer) size
-long wsrep_max_ws_rows = 65536; // max number of rows in ws
+ulong wsrep_max_ws_size = 1073741824UL;//max ws (RBR buffer) size
+ulong wsrep_max_ws_rows = 65536; // max number of rows in ws
int wsrep_to_isolation = 0; // # of active TO isolation threads
my_bool wsrep_certify_nonPK = 1; // certify, even when no primary key
long wsrep_max_protocol_version = 2; // maximum protocol version to use
ulong wsrep_forced_binlog_format = BINLOG_FORMAT_UNSPEC;
my_bool wsrep_recovery = 0; // recovery
my_bool wsrep_replicate_myisam = 0; // enable myisam replication
-my_bool wsrep_log_conflicts = 0; //
+my_bool wsrep_log_conflicts = 0;
ulong wsrep_mysql_replication_bundle = 0;
+my_bool wsrep_desync = 0; // desynchronize the node from the
+ // cluster
my_bool wsrep_load_data_splitting = 1; // commit load data every 10K intervals
-my_bool wsrep_desync = 0; // desynchronize the node from the cluster
/*
* End configuration options
*/
static const wsrep_uuid_t cluster_uuid = WSREP_UUID_UNDEFINED;
-const wsrep_uuid_t* wsrep_cluster_uuid()
-{
- return &cluster_uuid;
-}
static char cluster_uuid_str[40]= { 0, };
static const char* cluster_status_str[WSREP_VIEW_MAX] =
{
@@ -102,7 +101,18 @@ long wsrep_protocol_version = 2;
// if there was no state gap on receiving first view event.
static my_bool wsrep_startup = TRUE;
-/* wsrep callbacks */
+extern wsrep_cb_status_t wsrep_apply_cb(void *ctx,
+ const void* buf, size_t buf_len,
+ const wsrep_trx_meta_t* meta);
+
+extern wsrep_cb_status_t wsrep_commit_cb(void *ctx,
+ const wsrep_trx_meta_t* meta,
+ wsrep_bool_t *exit,
+ wsrep_bool_t commit);
+
+extern wsrep_cb_status_t wsrep_unordered_cb(void* ctx,
+ const void* data,
+ size_t size);
static void wsrep_log_cb(wsrep_log_level_t level, const char *msg) {
switch (level) {
@@ -186,19 +196,22 @@ void wsrep_get_SE_checkpoint(XID* xid)
plugin_foreach(NULL, get_SE_checkpoint, MYSQL_STORAGE_ENGINE_PLUGIN, xid);
}
-static void wsrep_view_handler_cb (void* app_ctx,
- void* recv_ctx,
- const wsrep_view_info_t* view,
- const char* state,
- size_t state_len,
- void** sst_req,
- ssize_t* sst_req_len)
+static wsrep_cb_status_t
+wsrep_view_handler_cb (void* app_ctx,
+ void* recv_ctx,
+ const wsrep_view_info_t* view,
+ const char* state,
+ size_t state_len,
+ void** sst_req,
+ size_t* sst_req_len)
{
wsrep_member_status_t new_status= local_status.get();
- if (memcmp(&cluster_uuid, &view->uuid, sizeof(wsrep_uuid_t)))
+ if (memcmp(&cluster_uuid, &view->state_id.uuid, sizeof(wsrep_uuid_t)))
{
- memcpy((wsrep_uuid_t*)&cluster_uuid, &view->uuid, sizeof(cluster_uuid));
+ memcpy((wsrep_uuid_t*)&cluster_uuid, &view->state_id.uuid,
+ sizeof(cluster_uuid));
+
wsrep_uuid_print (&cluster_uuid, cluster_uuid_str,
sizeof(cluster_uuid_str));
}
@@ -210,7 +223,7 @@ static void wsrep_view_handler_cb (void* app_ctx,
WSREP_INFO("New cluster view: global state: %s:%lld, view# %lld: %s, "
"number of nodes: %ld, my index: %ld, protocol version %d",
- wsrep_cluster_state_uuid, (long long)view->seqno,
+ wsrep_cluster_state_uuid, (long long)view->state_id.seqno,
(long long)wsrep_cluster_conf_id, wsrep_cluster_status,
wsrep_cluster_size, wsrep_local_index, view->proto_ver);
@@ -290,14 +303,14 @@ static void wsrep_view_handler_cb (void* app_ctx,
{
wsrep_SE_init_grab();
// Signal mysqld init thread to continue
- wsrep_sst_complete (&cluster_uuid, view->seqno, false);
+ wsrep_sst_complete (&cluster_uuid, view->state_id.seqno, false);
// and wait for SE initialization
wsrep_SE_init_wait();
}
else
{
local_uuid= cluster_uuid;
- local_seqno= view->seqno;
+ local_seqno= view->state_id.seqno;
}
/* Init storage engine XIDs from first view */
XID xid;
@@ -310,7 +323,7 @@ static void wsrep_view_handler_cb (void* app_ctx,
if (memcmp (&local_uuid, &cluster_uuid, sizeof (wsrep_uuid_t)))
{
WSREP_ERROR("Undetected state gap. Can't continue.");
- wsrep_log_states(WSREP_LOG_FATAL, &cluster_uuid, view->seqno,
+ wsrep_log_states(WSREP_LOG_FATAL, &cluster_uuid, view->state_id.seqno,
&local_uuid, -1);
unireg_abort(1);
}
@@ -322,9 +335,23 @@ static void wsrep_view_handler_cb (void* app_ctx,
global_system_variables.auto_increment_increment= view->memb_num;
}
+ { /* capabilities may be updated on new configuration */
+ uint64_t const caps(wsrep->capabilities (wsrep));
+
+ my_bool const idc((caps & WSREP_CAP_INCREMENTAL_WRITESET) != 0);
+ if (TRUE == wsrep_incremental_data_collection && FALSE == idc)
+ {
+ WSREP_WARN("Unsupported protocol downgrade: "
+ "incremental data collection disabled. Expect abort.");
+ }
+ wsrep_incremental_data_collection = idc;
+ }
+
out:
wsrep_startup= FALSE;
local_status.set(new_status, view);
+
+ return WSREP_CB_SUCCESS;
}
void wsrep_ready_set (my_bool x)
@@ -407,6 +434,8 @@ static void wsrep_init_position()
}
}
+extern const char* my_bind_addr_str;
+
int wsrep_init()
{
int rcode= -1;
@@ -460,7 +489,7 @@ int wsrep_init()
size_t const node_addr_max= sizeof(node_addr) - 1;
if (!wsrep_node_address || !strcmp(wsrep_node_address, ""))
{
- size_t const ret= guess_ip(node_addr, node_addr_max);
+ size_t const ret= wsrep_guess_ip(node_addr, node_addr_max);
if (!(ret > 0 && ret < node_addr_max))
{
WSREP_WARN("Failed to guess base node address. Set it explicitly via "
@@ -478,38 +507,57 @@ int wsrep_init()
if ((!wsrep_node_incoming_address ||
!strcmp (wsrep_node_incoming_address, WSREP_NODE_INCOMING_AUTO)))
{
- size_t const node_addr_len= strlen(node_addr);
- if (node_addr_len > 0)
+ unsigned int my_bind_ip= INADDR_ANY; // default if not set
+ if (my_bind_addr_str && strlen(my_bind_addr_str))
{
- const char* const colon= strrchr(node_addr, ':');
- if (strchr(node_addr, ':') == colon) // 1 or 0 ':'
+ my_bind_ip= wsrep_check_ip(my_bind_addr_str);
+ }
+
+ if (INADDR_ANY != my_bind_ip)
+ {
+ if (INADDR_NONE != my_bind_ip && INADDR_LOOPBACK != my_bind_ip)
{
- size_t const ip_len= colon ? colon - node_addr : node_addr_len;
- if (ip_len + 7 /* :55555\0 */ < inc_addr_max)
+ snprintf(inc_addr, inc_addr_max, "%s:%u",
+ my_bind_addr_str, (int)mysqld_port);
+ } // else leave inc_addr an empty string - mysqld is not listening for
+ // client connections on network interfaces.
+ }
+ else // mysqld binds to 0.0.0.0, take IP from wsrep_node_address if possible
+ {
+ size_t const node_addr_len= strlen(node_addr);
+ if (node_addr_len > 0)
+ {
+ const char* const colon= strrchr(node_addr, ':');
+ if (strchr(node_addr, ':') == colon) // 1 or 0 ':'
{
- memcpy (inc_addr, node_addr, ip_len);
- snprintf(inc_addr + ip_len, inc_addr_max - ip_len, ":%u",mysqld_port);
+ size_t const ip_len= colon ? colon - node_addr : node_addr_len;
+ if (ip_len + 7 /* :55555\0 */ < inc_addr_max)
+ {
+ memcpy (inc_addr, node_addr, ip_len);
+ snprintf(inc_addr + ip_len, inc_addr_max - ip_len, ":%u",
+ (int)mysqld_port);
+ }
+ else
+ {
+ WSREP_WARN("Guessing address for incoming client connections: "
+ "address too long.");
+ inc_addr[0]= '\0';
+ }
}
else
{
WSREP_WARN("Guessing address for incoming client connections: "
- "address too long.");
+ "too many colons :) .");
inc_addr[0]= '\0';
}
}
- else
+
+ if (!strlen(inc_addr))
{
- WSREP_WARN("Guessing address for incoming client connections: "
- "too many colons :) .");
- inc_addr[0]= '\0';
+ WSREP_WARN("Guessing address for incoming client connections failed. "
+ "Try setting wsrep_node_incoming_address explicitly.");
}
}
-
- if (!strlen(inc_addr))
- {
- WSREP_WARN("Guessing address for incoming client connections failed. "
- "Try setting wsrep_node_incoming_address explicitly.");
- }
}
else if (!strchr(wsrep_node_incoming_address, ':')) // no port included
{
@@ -536,6 +584,8 @@ int wsrep_init()
struct wsrep_init_args wsrep_args;
+ struct wsrep_gtid const state_id = { local_uuid, local_seqno };
+
wsrep_args.data_dir = wsrep_data_home_dir;
wsrep_args.node_name = (wsrep_node_name) ? wsrep_node_name : "";
wsrep_args.node_address = node_addr;
@@ -544,13 +594,13 @@ int wsrep_init()
wsrep_provider_options : "";
wsrep_args.proto_ver = wsrep_max_protocol_version;
- wsrep_args.state_uuid = &local_uuid;
- wsrep_args.state_seqno = local_seqno;
+ wsrep_args.state_id = &state_id;
wsrep_args.logger_cb = wsrep_log_cb;
wsrep_args.view_handler_cb = wsrep_view_handler_cb;
wsrep_args.apply_cb = wsrep_apply_cb;
wsrep_args.commit_cb = wsrep_commit_cb;
+ wsrep_args.unordered_cb = wsrep_unordered_cb;
wsrep_args.sst_donate_cb = wsrep_sst_donate_cb;
wsrep_args.synced_cb = wsrep_synced_cb;
@@ -679,6 +729,7 @@ bool wsrep_start_replication()
*/
const char* cluster_address =
wsrep_new_cluster ? "bootstrap" : wsrep_cluster_address;
+ bool const bootstrap(TRUE == wsrep_new_cluster);
wsrep_new_cluster= FALSE;
WSREP_INFO("Start replication");
@@ -686,7 +737,8 @@ bool wsrep_start_replication()
if ((rcode = wsrep->connect(wsrep,
wsrep_cluster_name,
cluster_address,
- wsrep_sst_donor)))
+ wsrep_sst_donor,
+ bootstrap)))
{
if (-ESOCKTNOSUPPORT == rcode)
{
@@ -707,11 +759,6 @@ bool wsrep_start_replication()
{
wsrep_connected= TRUE;
- uint64_t caps = wsrep->capabilities (wsrep);
-
- wsrep_incremental_data_collection =
- !!(caps & WSREP_CAP_WRITE_SET_INCREMENTS);
-
char* opts= wsrep->options_get(wsrep);
if (opts)
{
@@ -736,8 +783,8 @@ wsrep_causal_wait (THD* thd)
{
// This allows autocommit SELECTs and a first SELECT after SET AUTOCOMMIT=0
// TODO: modify to check if thd has locked any rows.
- wsrep_seqno_t seqno;
- wsrep_status_t ret= wsrep->causal_read (wsrep, &seqno);
+ wsrep_gtid_t gtid;
+ wsrep_status_t ret= wsrep->causal_read (wsrep, &gtid);
if (unlikely(WSREP_OK != ret))
{
@@ -785,7 +832,7 @@ static void wsrep_keys_free(wsrep_key_arr_t* key_arr)
{
for (size_t i= 0; i < key_arr->keys_len; ++i)
{
- my_free((wsrep_key_part_t*)key_arr->keys[i].key_parts);
+ my_free((void*)key_arr->keys[i].key_parts);
}
my_free(key_arr->keys);
key_arr->keys= 0;
@@ -805,7 +852,7 @@ static void wsrep_keys_free(wsrep_key_arr_t* key_arr)
static bool wsrep_prepare_key_for_isolation(const char* db,
const char* table,
- wsrep_key_part_t* key,
+ wsrep_buf_t* key,
size_t* key_len)
{
if (*key_len < 2) return false;
@@ -824,13 +871,13 @@ static bool wsrep_prepare_key_for_isolation(const char* db,
// sql_print_information("%s.%s", db, table);
if (db)
{
- key[*key_len].buf= db;
- key[*key_len].buf_len= strlen(db);
+ key[*key_len].ptr= db;
+ key[*key_len].len= strlen(db);
++(*key_len);
if (table)
{
- key[*key_len].buf= table;
- key[*key_len].buf_len= strlen(table);
+ key[*key_len].ptr= table;
+ key[*key_len].len= strlen(table);
++(*key_len);
}
}
@@ -863,23 +910,23 @@ static bool wsrep_prepare_keys_for_isolation(THD* thd,
{
if (!(ka->keys= (wsrep_key_t*)my_malloc(sizeof(wsrep_key_t), MYF(0))))
{
- sql_print_error("Can't allocate memory for key_array");
+ WSREP_ERROR("Can't allocate memory for key_array");
goto err;
}
ka->keys_len= 1;
- if (!(ka->keys[0].key_parts= (wsrep_key_part_t*)
- my_malloc(sizeof(wsrep_key_part_t)*2, MYF(0))))
+ if (!(ka->keys[0].key_parts= (wsrep_buf_t*)
+ my_malloc(sizeof(wsrep_buf_t)*2, MYF(0))))
{
- sql_print_error("Can't allocate memory for key_parts");
+ WSREP_ERROR("Can't allocate memory for key_parts");
goto err;
}
- ka->keys[0].key_parts_len= 2;
+ ka->keys[0].key_parts_num= 2;
if (!wsrep_prepare_key_for_isolation(
db, table,
- (wsrep_key_part_t*)ka->keys[0].key_parts,
- &ka->keys[0].key_parts_len))
+ (wsrep_buf_t*)ka->keys[0].key_parts,
+ &ka->keys[0].key_parts_num))
{
- sql_print_error("Preparing keys for isolation failed");
+ WSREP_ERROR("Preparing keys for isolation failed");
goto err;
}
}
@@ -895,24 +942,24 @@ static bool wsrep_prepare_keys_for_isolation(THD* thd,
MYF(MY_ALLOW_ZERO_PTR));
if (!tmp)
{
- sql_print_error("Can't allocate memory for key_array");
+ WSREP_ERROR("Can't allocate memory for key_array");
goto err;
}
ka->keys= tmp;
- if (!(ka->keys[ka->keys_len].key_parts= (wsrep_key_part_t*)
- my_malloc(sizeof(wsrep_key_part_t)*2, MYF(0))))
+ if (!(ka->keys[ka->keys_len].key_parts= (wsrep_buf_t*)
+ my_malloc(sizeof(wsrep_buf_t)*2, MYF(0))))
{
- sql_print_error("Can't allocate memory for key_parts");
+ WSREP_ERROR("Can't allocate memory for key_parts");
goto err;
}
- ka->keys[ka->keys_len].key_parts_len= 2;
+ ka->keys[ka->keys_len].key_parts_num= 2;
++ka->keys_len;
if (!wsrep_prepare_key_for_isolation(
table->db, table->table_name,
- (wsrep_key_part_t*)ka->keys[ka->keys_len - 1].key_parts,
- &ka->keys[ka->keys_len - 1].key_parts_len))
+ (wsrep_buf_t*)ka->keys[ka->keys_len - 1].key_parts,
+ &ka->keys[ka->keys_len - 1].key_parts_num))
{
- sql_print_error("Preparing keys for isolation failed");
+ WSREP_ERROR("Preparing keys for isolation failed");
goto err;
}
}
@@ -929,7 +976,7 @@ bool wsrep_prepare_key_for_innodb(const uchar* cache_key,
size_t cache_key_len,
const uchar* row_id,
size_t row_id_len,
- wsrep_key_part_t* key,
+ wsrep_buf_t* key,
size_t* key_len)
{
if (*key_len < 3) return false;
@@ -939,28 +986,30 @@ bool wsrep_prepare_key_for_innodb(const uchar* cache_key,
{
case 0:
{
- key[*key_len].buf = cache_key;
- key[*key_len].buf_len = cache_key_len;
- ++(*key_len);
+ key[0].ptr = cache_key;
+ key[0].len = cache_key_len;
+
+ *key_len = 1;
break;
}
case 1:
case 2:
{
- key[*key_len].buf = cache_key;
- key[*key_len].buf_len = strlen( (char*)cache_key );
- ++(*key_len);
- key[*key_len].buf = cache_key + strlen( (char*)cache_key ) + 1;
- key[*key_len].buf_len = strlen( (char*)(key[*key_len].buf) );
- ++(*key_len);
+ key[0].ptr = cache_key;
+ key[0].len = strlen( (char*)cache_key );
+
+ key[1].ptr = cache_key + strlen( (char*)cache_key ) + 1;
+ key[1].len = strlen( (char*)(key[1].ptr) );
+
+ *key_len = 2;
break;
}
default:
return false;
}
- key[*key_len].buf = row_id;
- key[*key_len].buf_len = row_id_len;
+ key[*key_len].ptr = row_id;
+ key[*key_len].len = row_id_len;
++(*key_len);
return true;
@@ -973,7 +1022,7 @@ bool wsrep_prepare_key_for_innodb(const uchar* cache_key,
* Return 0 in case of success, 1 in case of error.
*/
int wsrep_to_buf_helper(
- THD* thd, const char *query, uint query_len, uchar** buf, uint* buf_len)
+ THD* thd, const char *query, uint query_len, uchar** buf, int* buf_len)
{
IO_CACHE tmp_io_cache;
if (open_cached_file(&tmp_io_cache, mysql_tmpdir, TEMP_PREFIX,
@@ -985,8 +1034,8 @@ int wsrep_to_buf_helper(
if (thd->wsrep_TOI_pre_query)
{
Query_log_event ev(thd, thd->wsrep_TOI_pre_query,
- thd->wsrep_TOI_pre_query_len,
- FALSE, FALSE, FALSE, 0);
+ thd->wsrep_TOI_pre_query_len,
+ FALSE, FALSE, FALSE, 0);
if (ev.write(&tmp_io_cache)) ret= 1;
}
@@ -994,7 +1043,7 @@ int wsrep_to_buf_helper(
Query_log_event ev(thd, query, query_len, FALSE, FALSE, FALSE, 0);
if (ev.write(&tmp_io_cache)) ret= 1;
- if (!ret && wsrep_write_cache(&tmp_io_cache, buf, buf_len)) ret= 1;
+ if (!ret && wsrep_write_cache_buf(&tmp_io_cache, buf, (size_t*)buf_len)) ret= 1;
close_cached_file(&tmp_io_cache);
return ret;
@@ -1002,7 +1051,7 @@ int wsrep_to_buf_helper(
#include "sql_show.h"
static int
-create_view_query(THD *thd, uchar** buf, uint* buf_len)
+create_view_query(THD *thd, uchar** buf, int* buf_len)
{
LEX *lex= thd->lex;
SELECT_LEX *select_lex= &lex->select_lex;
@@ -1021,15 +1070,15 @@ create_view_query(THD *thd, uchar** buf, uint* buf_len)
if (!lex->definer)
{
/*
- DEFINER-clause is missing; we have to create default definer in
- persistent arena to be PS/SP friendly.
- If this is an ALTER VIEW then the current user should be set as
- the definer.
+ DEFINER-clause is missing; we have to create default definer in
+ persistent arena to be PS/SP friendly.
+ If this is an ALTER VIEW then the current user should be set as
+ the definer.
*/
if (!(lex->definer= create_default_definer(thd)))
{
- WSREP_WARN("view default definer issue");
+ WSREP_WARN("view default definer issue");
}
}
@@ -1056,7 +1105,7 @@ create_view_query(THD *thd, uchar** buf, uint* buf_len)
List_iterator_fast<LEX_STRING> names(lex->view_list);
LEX_STRING *name;
int i;
-
+
for (i= 0; (name= names++); i++)
{
buff.append(i ? ", " : "(");
@@ -1079,11 +1128,11 @@ static int wsrep_TOI_begin(THD *thd, char *db_, char *table_,
{
wsrep_status_t ret(WSREP_WARNING);
uchar* buf(0);
- uint buf_len(0);
+ int buf_len(0);
int buf_err;
- WSREP_DEBUG("TO BEGIN: %lld, %d : %s", (long long)thd->wsrep_trx_seqno,
- thd->wsrep_exec_mode, thd->query() );
+ WSREP_DEBUG("TO BEGIN: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd),
+ thd->wsrep_exec_mode, thd->query() );
switch (thd->lex->sql_command)
{
case SQLCOM_CREATE_VIEW:
@@ -1106,19 +1155,20 @@ static int wsrep_TOI_begin(THD *thd, char *db_, char *table_,
}
wsrep_key_arr_t key_arr= {0, 0};
+ struct wsrep_buf buff = { buf, buf_len };
if (!buf_err &&
wsrep_prepare_keys_for_isolation(thd, db_, table_, table_list, &key_arr)&&
WSREP_OK == (ret = wsrep->to_execute_start(wsrep, thd->thread_id,
key_arr.keys, key_arr.keys_len,
- buf, buf_len,
- &thd->wsrep_trx_seqno)))
+ &buff, 1,
+ &thd->wsrep_trx_meta)))
{
thd->wsrep_exec_mode= TOTAL_ORDER;
wsrep_to_isolation++;
if (buf) my_free(buf);
wsrep_keys_free(&key_arr);
- WSREP_DEBUG("TO BEGIN: %lld, %d",(long long)thd->wsrep_trx_seqno,
- thd->wsrep_exec_mode);
+ WSREP_DEBUG("TO BEGIN: %lld, %d",(long long)wsrep_thd_trx_seqno(thd),
+ thd->wsrep_exec_mode);
}
else {
/* jump to error handler in mysql_execute_command() */
@@ -1137,10 +1187,10 @@ static int wsrep_TOI_begin(THD *thd, char *db_, char *table_,
static void wsrep_TOI_end(THD *thd) {
wsrep_status_t ret;
wsrep_to_isolation--;
- WSREP_DEBUG("TO END: %lld, %d : %s", (long long)thd->wsrep_trx_seqno,
+ WSREP_DEBUG("TO END: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd),
thd->wsrep_exec_mode, (thd->query()) ? thd->query() : "void")
if (WSREP_OK == (ret = wsrep->to_execute_end(wsrep, thd->thread_id))) {
- WSREP_DEBUG("TO END: %lld", (long long)thd->wsrep_trx_seqno);
+ WSREP_DEBUG("TO END: %lld", (long long)wsrep_thd_trx_seqno(thd));
}
else {
WSREP_WARN("TO isolation end failed for: %d, sql: %s",
@@ -1151,7 +1201,7 @@ static void wsrep_TOI_end(THD *thd) {
static int wsrep_RSU_begin(THD *thd, char *db_, char *table_)
{
wsrep_status_t ret(WSREP_WARNING);
- WSREP_DEBUG("RSU BEGIN: %lld, %d : %s", (long long)thd->wsrep_trx_seqno,
+ WSREP_DEBUG("RSU BEGIN: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd),
thd->wsrep_exec_mode, thd->query() );
ret = wsrep->desync(wsrep);
@@ -1196,7 +1246,7 @@ static int wsrep_RSU_begin(THD *thd, char *db_, char *table_)
static void wsrep_RSU_end(THD *thd)
{
wsrep_status_t ret(WSREP_WARNING);
- WSREP_DEBUG("RSU END: %lld, %d : %s", (long long)thd->wsrep_trx_seqno,
+ WSREP_DEBUG("RSU END: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd),
thd->wsrep_exec_mode, thd->query() );
@@ -1276,10 +1326,10 @@ void wsrep_to_isolation_end(THD *thd) {
"request: (%lu \tseqno %lld \twsrep (%d, %d, %d) cmd %d %d \t%s)\n" \
"granted: (%lu \tseqno %lld \twsrep (%d, %d, %d) cmd %d %d \t%s)", \
msg, \
- req->thread_id, (long long)req->wsrep_trx_seqno, \
+ req->thread_id, (long long)wsrep_thd_trx_seqno(req), \
req->wsrep_exec_mode, req->wsrep_query_state, req->wsrep_conflict_state, \
req->get_command(), req->lex->sql_command, req->query(), \
- gra->thread_id, (long long)gra->wsrep_trx_seqno, \
+ gra->thread_id, (long long)wsrep_thd_trx_seqno(gra), \
gra->wsrep_exec_mode, gra->wsrep_query_state, gra->wsrep_conflict_state, \
gra->get_command(), gra->lex->sql_command, gra->query());
diff --git a/sql/wsrep_mysqld.h b/sql/wsrep_mysqld.h
index 67bc9d0ea15..276f2a7b580 100644
--- a/sql/wsrep_mysqld.h
+++ b/sql/wsrep_mysqld.h
@@ -28,7 +28,6 @@ class THD;
#ifdef WITH_WSREP
#include "../wsrep/wsrep_api.h"
-//#include "wsrep_mysqld.h"
enum wsrep_exec_mode {
LOCAL_STATE,
REPL_RECV,
@@ -73,15 +72,16 @@ extern const char* wsrep_node_incoming_address;
extern const char* wsrep_data_home_dir;
extern const char* wsrep_dbug_option;
extern long wsrep_slave_threads;
-extern my_bool wsrep_debug;
+extern int wsrep_slave_count_change;
+extern MYSQL_PLUGIN_IMPORT my_bool wsrep_debug;
extern my_bool wsrep_convert_LOCK_to_trx;
extern ulong wsrep_retry_autocommit;
extern my_bool wsrep_auto_increment_control;
extern my_bool wsrep_drupal_282555_workaround;
extern my_bool wsrep_incremental_data_collection;
extern const char* wsrep_start_position;
-extern long long wsrep_max_ws_size;
-extern long wsrep_max_ws_rows;
+extern ulong wsrep_max_ws_size;
+extern ulong wsrep_max_ws_rows;
extern const char* wsrep_notify_cmd;
extern my_bool wsrep_certify_nonPK;
extern long wsrep_max_protocol_version;
@@ -136,13 +136,13 @@ extern "C" enum wsrep_query_state wsrep_thd_query_state(THD *thd);
extern "C" const char * wsrep_thd_exec_mode_str(THD *thd);
extern "C" const char * wsrep_thd_conflict_state_str(THD *thd);
extern "C" const char * wsrep_thd_query_state_str(THD *thd);
-extern "C" wsrep_trx_handle_t* wsrep_thd_trx_handle(THD *thd);
+extern "C" wsrep_ws_handle_t* wsrep_thd_ws_handle(THD *thd);
extern "C" void wsrep_thd_set_exec_mode(THD *thd, enum wsrep_exec_mode mode);
extern "C" void wsrep_thd_set_query_state(
- THD *thd, enum wsrep_query_state state);
+ THD *thd, enum wsrep_query_state state);
extern "C" void wsrep_thd_set_conflict_state(
- THD *thd, enum wsrep_conflict_state state);
+ THD *thd, enum wsrep_conflict_state state);
extern "C" void wsrep_thd_set_trx_to_replay(THD *thd, uint64 trx_id);
@@ -259,20 +259,12 @@ extern mysql_mutex_t LOCK_wsrep_rollback;
extern mysql_cond_t COND_wsrep_rollback;
extern int wsrep_replaying;
extern mysql_mutex_t LOCK_wsrep_replaying;
-extern mysql_cond_t COND_wsrep_replaying;
-extern wsrep_aborting_thd_t wsrep_aborting_thd;
-extern MYSQL_PLUGIN_IMPORT my_bool wsrep_debug;
-extern my_bool wsrep_convert_LOCK_to_trx;
-extern ulong wsrep_retry_autocommit;
-extern my_bool wsrep_emulate_bin_log;
-extern my_bool wsrep_auto_increment_control;
-extern my_bool wsrep_drupal_282555_workaround;
-extern long long wsrep_max_ws_size;
-extern long wsrep_max_ws_rows;
-extern int wsrep_to_isolation;
-extern my_bool wsrep_certify_nonPK;
+extern mysql_cond_t COND_wsrep_replaying;
extern mysql_mutex_t LOCK_wsrep_slave_threads;
extern mysql_mutex_t LOCK_wsrep_desync;
+extern wsrep_aborting_thd_t wsrep_aborting_thd;
+extern my_bool wsrep_emulate_bin_log;
+extern int wsrep_to_isolation;
extern PSI_mutex_key key_LOCK_wsrep_ready;
extern PSI_mutex_key key_COND_wsrep_ready;
@@ -295,12 +287,11 @@ int wsrep_to_isolation_begin(THD *thd, char *db_, char *table_,
void wsrep_to_isolation_end(THD *thd);
int wsrep_to_buf_helper(
- THD* thd, const char *query, uint query_len, uchar** buf, uint* buf_len);
-int wsrep_create_sp(THD *thd, uchar** buf, uint* buf_len);
-int wsrep_create_trigger_query(THD *thd, uchar** buf, uint* buf_len);
-int wsrep_create_event_query(THD *thd, uchar** buf, uint* buf_len);
+ THD* thd, const char *query, uint query_len, uchar** buf, int* buf_len);
+int wsrep_create_sp(THD *thd, uchar** buf, int* buf_len);
+int wsrep_create_trigger_query(THD *thd, uchar** buf, int* buf_len);
+int wsrep_create_event_query(THD *thd, uchar** buf, int* buf_len);
-const wsrep_uuid_t* wsrep_cluster_uuid();
struct xid_t;
void wsrep_set_SE_checkpoint(xid_t*);
diff --git a/sql/wsrep_priv.h b/sql/wsrep_priv.h
index 291823d773e..5c66587d757 100644
--- a/sql/wsrep_priv.h
+++ b/sql/wsrep_priv.h
@@ -32,24 +32,22 @@ ssize_t wsrep_sst_prepare (void** msg);
wsrep_cb_status wsrep_sst_donate_cb (void* app_ctx,
void* recv_ctx,
const void* msg, size_t msg_len,
- const wsrep_gtid_t* state_id,
+ const wsrep_gtid_t* current_id,
const char* state, size_t state_len,
bool bypass);
+extern unsigned int wsrep_check_ip (const char* addr);
+extern size_t wsrep_guess_ip (char* buf, size_t buf_len);
+extern size_t wsrep_guess_address(char* buf, size_t buf_len);
extern wsrep_uuid_t local_uuid;
extern wsrep_seqno_t local_seqno;
// a helper function
-void wsrep_sst_received(wsrep_t*, const wsrep_uuid_t*, wsrep_seqno_t,
+extern void wsrep_sst_received(wsrep_t*, const wsrep_uuid_t*, wsrep_seqno_t,
const void*, size_t);
/*! SST thread signals init thread about sst completion */
-void wsrep_sst_complete(const wsrep_uuid_t*, wsrep_seqno_t, bool);
-
-extern void wsrep_notify_status (wsrep_member_status_t new_status,
- const wsrep_view_info_t* view = 0);
-
-/* binlog-related stuff */
-int wsrep_write_cache(IO_CACHE *cache, uchar **buf, size_t *buf_len);
-
+extern void wsrep_sst_complete(const wsrep_uuid_t*, wsrep_seqno_t, bool);
+void wsrep_notify_status (wsrep_member_status_t new_status,
+ const wsrep_view_info_t* view = 0);
#endif /* WSREP_PRIV_H */
diff --git a/sql/wsrep_sst.cc b/sql/wsrep_sst.cc
index d651e1ed0a9..6a97b29ff6d 100644
--- a/sql/wsrep_sst.cc
+++ b/sql/wsrep_sst.cc
@@ -213,8 +213,8 @@ bool wsrep_sst_wait ()
// Signal end of SST
void wsrep_sst_complete (const wsrep_uuid_t* sst_uuid,
- wsrep_seqno_t sst_seqno,
- bool needed)
+ wsrep_seqno_t sst_seqno,
+ bool needed)
{
if (mysql_mutex_lock (&LOCK_wsrep_sst)) abort();
if (!sst_complete)
@@ -232,13 +232,26 @@ void wsrep_sst_complete (const wsrep_uuid_t* sst_uuid,
mysql_mutex_unlock (&LOCK_wsrep_sst);
}
+void wsrep_sst_received (wsrep_t* const wsrep,
+ const wsrep_uuid_t* const uuid,
+ wsrep_seqno_t const seqno,
+ const void* const state,
+ size_t const state_len)
+{
+ int const rcode(seqno < 0 ? seqno : 0);
+ wsrep_gtid_t const state_id = {
+ *uuid, (rcode ? WSREP_SEQNO_UNDEFINED : seqno)
+ };
+ wsrep->sst_received(wsrep, &state_id, state, state_len, rcode);
+}
+
// Let applier threads to continue
void wsrep_sst_continue ()
{
if (sst_needed)
{
WSREP_INFO("Signalling provider to continue.");
- wsrep->sst_received (wsrep, &local_uuid, local_seqno, NULL, 0);
+ wsrep_sst_received (wsrep, &local_uuid, local_seqno, NULL, 0);
}
}
@@ -434,7 +447,6 @@ static ssize_t sst_prepare_other (const char* method,
return ret;
}
-//extern ulong my_bind_addr;
extern uint mysqld_port;
/*! Just tells donor where to send mysqldump */
@@ -518,7 +530,7 @@ ssize_t wsrep_sst_prepare (void** msg)
}
else
{
- ssize_t ret= guess_ip (ip_buf, ip_max);
+ ssize_t ret= wsrep_guess_ip (ip_buf, ip_max);
if (ret && ret < ip_max)
{
@@ -706,7 +718,9 @@ static int sst_donate_mysqldump (const char* addr,
ret= sst_run_shell (cmd_str, 3);
}
- wsrep->sst_sent (wsrep, uuid, ret ? ret : seqno);
+ wsrep_gtid_t const state_id = { *uuid, (ret ? WSREP_SEQNO_UNDEFINED : seqno)};
+
+ wsrep->sst_sent (wsrep, &state_id, ret);
return ret;
}
@@ -896,7 +910,10 @@ wait_signal:
}
// signal to donor that SST is over
- wsrep->sst_sent (wsrep, &ret_uuid, err ? -err : ret_seqno);
+ struct wsrep_gtid const state_id = {
+ ret_uuid, err ? WSREP_SEQNO_UNDEFINED : ret_seqno
+ };
+ wsrep->sst_sent (wsrep, &state_id, -err);
proc.wait();
return NULL;
@@ -950,10 +967,9 @@ static int sst_donate_other (const char* method,
return arg.err;
}
-int wsrep_sst_donate_cb (void* app_ctx, void* recv_ctx,
+wsrep_cb_status wsrep_sst_donate_cb (void* app_ctx, void* recv_ctx,
const void* msg, size_t msg_len,
- const wsrep_uuid_t* current_uuid,
- wsrep_seqno_t current_seqno,
+ const wsrep_gtid_t* current_gtid,
const char* state, size_t state_len,
bool bypass)
{
@@ -967,20 +983,19 @@ int wsrep_sst_donate_cb (void* app_ctx, void* recv_ctx,
const char* data = method + method_len + 1;
char uuid_str[37];
- wsrep_uuid_print (current_uuid, uuid_str, sizeof(uuid_str));
+ wsrep_uuid_print (&current_gtid->uuid, uuid_str, sizeof(uuid_str));
int ret;
if (!strcmp (WSREP_SST_MYSQLDUMP, method))
{
- ret = sst_donate_mysqldump (data, current_uuid, uuid_str, current_seqno,
- bypass);
+ ret = sst_donate_mysqldump(data, &current_gtid->uuid, uuid_str,
+ current_gtid->seqno, bypass);
}
else
{
- ret = sst_donate_other (method, data, uuid_str, current_seqno, bypass);
+ ret = sst_donate_other(method, data, uuid_str, current_gtid->seqno,bypass);
}
-
- return (ret > 0 ? 0 : ret);
+ return (ret > 0 ? WSREP_CB_SUCCESS : WSREP_CB_FAILURE);
}
void wsrep_SE_init_grab()
diff --git a/sql/wsrep_thd.cc b/sql/wsrep_thd.cc
index 9cbd32cac73..35f5a9bca6a 100644
--- a/sql/wsrep_thd.cc
+++ b/sql/wsrep_thd.cc
@@ -113,17 +113,17 @@ void wsrep_replay_transaction(THD *thd)
/* checking if BF trx must be replayed */
if (thd->wsrep_conflict_state== MUST_REPLAY) {
if (thd->wsrep_exec_mode!= REPL_RECV) {
- if (thd->stmt_da->is_sent)
+ if (thd->get_stmt_da()->is_sent())
{
WSREP_ERROR("replay issue, thd has reported status already");
}
- thd->stmt_da->reset_diagnostics_area();
+ thd->get_stmt_da()->reset_diagnostics_area();
thd->wsrep_conflict_state= REPLAYING;
mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
- mysql_reset_thd_for_next_command(thd);
- thd->killed= THD::NOT_KILLED;
+ mysql_reset_thd_for_next_command(thd, opt_userstat_running);
+ thd->killed= NOT_KILLED;
close_thread_tables(thd);
if (thd->locked_tables_mode && thd->lock)
{
@@ -157,7 +157,7 @@ void wsrep_replay_transaction(THD *thd)
wsrep->post_commit(wsrep, &thd->wsrep_ws_handle);
WSREP_DEBUG("trx_replay successful for: %ld %llu",
thd->thread_id, (long long)thd->real_id);
- if (thd->stmt_da->is_sent)
+ if (thd->get_stmt_da()->is_sent())
{
WSREP_WARN("replay ok, thd has reported status");
}
@@ -167,7 +167,7 @@ void wsrep_replay_transaction(THD *thd)
}
break;
case WSREP_TRX_FAIL:
- if (thd->stmt_da->is_sent)
+ if (thd->get_stmt_da()->is_sent())
{
WSREP_ERROR("replay failed, thd has reported status");
}
@@ -237,7 +237,7 @@ static void wsrep_replication_process(THD *thd)
* avoid mysql shutdown. This is because the killer will then handle
* shutdown processing (or replication restarting)
*/
- if (thd->killed != THD::KILL_CONNECTION)
+ if (thd->killed != KILL_CONNECTION)
{
wsrep_kill_mysql(thd);
}
@@ -289,7 +289,7 @@ static void wsrep_rollback_process(THD *thd)
mysql_mutex_lock(&LOCK_wsrep_rollback);
wsrep_aborting_thd= NULL;
- while (thd->killed == THD::NOT_KILLED) {
+ while (thd->killed == NOT_KILLED) {
thd_proc_info(thd, "wsrep aborter idle");
thd->mysys_var->current_mutex= &LOCK_wsrep_rollback;
thd->mysys_var->current_cond= &COND_wsrep_rollback;
diff --git a/sql/wsrep_utils.cc b/sql/wsrep_utils.cc
index 37e537c62e4..b01bdaaa15e 100644
--- a/sql/wsrep_utils.cc
+++ b/sql/wsrep_utils.cc
@@ -22,16 +22,17 @@
#include "wsrep_utils.h"
#include "wsrep_mysqld.h"
-//#include "wsrep_api.h"
-//#include "wsrep_priv.h"
+
+#include <sql_class.h>
+
#include <spawn.h> // posix_spawn()
#include <unistd.h> // pipe()
#include <errno.h> // errno
#include <string.h> // strerror()
#include <sys/wait.h> // waitpid()
-
-#include <sql_class.h>
-#include "wsrep_priv.h"
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h> // getaddrinfo()
extern char** environ; // environment variables
@@ -317,23 +318,69 @@ thd::~thd ()
} // namespace wsp
-extern ulong my_bind_addr;
-extern uint mysqld_port;
+/* Returns INADDR_NONE, INADDR_ANY, INADDR_LOOPBACK or something else */
+unsigned int wsrep_check_ip (const char* const addr)
+{
+ unsigned int ret = INADDR_NONE;
+ struct addrinfo *res, hints;
+
+ memset (&hints, 0, sizeof(hints));
+ hints.ai_flags= AI_PASSIVE/*|AI_ADDRCONFIG*/;
+ hints.ai_socktype= SOCK_STREAM;
+ hints.ai_family= AF_UNSPEC;
-size_t guess_ip (char* buf, size_t buf_len)
+ int gai_ret = getaddrinfo(addr, NULL, &hints, &res);
+ if (0 == gai_ret)
+ {
+ if (AF_INET == res->ai_family) /* IPv4 */
+ {
+ struct sockaddr_in* a= (struct sockaddr_in*)res->ai_addr;
+ ret= htonl(a->sin_addr.s_addr);
+ }
+ else /* IPv6 */
+ {
+ struct sockaddr_in6* a= (struct sockaddr_in6*)res->ai_addr;
+ if (IN6_IS_ADDR_UNSPECIFIED(&a->sin6_addr))
+ ret= INADDR_ANY;
+ else if (IN6_IS_ADDR_LOOPBACK(&a->sin6_addr))
+ ret= INADDR_LOOPBACK;
+ else
+ ret= 0xdeadbeef;
+ }
+ freeaddrinfo (res);
+ }
+ else {
+ WSREP_ERROR ("getaddrinfo() failed on '%s': %d (%s)",
+ addr, gai_ret, gai_strerror(gai_ret));
+ }
+
+ // uint8_t* b= (uint8_t*)&ret;
+ // fprintf (stderr, "########## wsrep_check_ip returning: %hhu.%hhu.%hhu.%hhu\n",
+ // b[0], b[1], b[2], b[3]);
+
+ return ret;
+}
+
+extern const char* my_bind_addr_str;
+extern uint mysqld_port;
+
+size_t wsrep_guess_ip (char* buf, size_t buf_len)
{
size_t ip_len = 0;
- if (htonl(INADDR_NONE) == my_bind_addr) {
- WSREP_ERROR("Networking not configured, cannot receive state transfer.");
- return 0;
- }
+ if (my_bind_addr_str && strlen(my_bind_addr_str))
+ {
+ unsigned int const ip_type= wsrep_check_ip(my_bind_addr_str);
- if (htonl(INADDR_ANY) != my_bind_addr) {
- uint8_t* b = (uint8_t*)&my_bind_addr;
- ip_len = snprintf (buf, buf_len,
- "%hhu.%hhu.%hhu.%hhu", b[0],b[1],b[2],b[3]);
- return ip_len;
+ if (INADDR_NONE == ip_type) {
+ WSREP_ERROR("Networking not configured, cannot receive state transfer.");
+ return 0;
+ }
+
+ if (INADDR_ANY != ip_type) {;
+ strncpy (buf, my_bind_addr_str, buf_len);
+ return strlen(buf);
+ }
}
// mysqld binds to all interfaces - try IP from wsrep_node_address
@@ -404,9 +451,9 @@ size_t guess_ip (char* buf, size_t buf_len)
return ip_len;
}
-size_t guess_address(char* buf, size_t buf_len)
+size_t wsrep_guess_address(char* buf, size_t buf_len)
{
- size_t addr_len = guess_ip (buf, buf_len);
+ size_t addr_len = wsrep_guess_ip (buf, buf_len);
if (addr_len && addr_len < buf_len) {
addr_len += snprintf (buf + addr_len, buf_len - addr_len,
diff --git a/sql/wsrep_var.cc b/sql/wsrep_var.cc
index bd041ed51ff..c3c795a291d 100644
--- a/sql/wsrep_var.cc
+++ b/sql/wsrep_var.cc
@@ -38,7 +38,6 @@ const char* wsrep_node_address = 0;
const char* wsrep_node_incoming_address = 0;
const char* wsrep_start_position = 0;
ulong wsrep_OSU_method_options;
-static int wsrep_thread_change = 0;
int wsrep_init_vars()
{
@@ -61,15 +60,6 @@ bool wsrep_on_update (sys_var *self, THD* thd, enum_var_type var_type)
// FIXME: this variable probably should be changed only per session
thd->variables.wsrep_on = global_system_variables.wsrep_on;
}
- else {
- }
-
-#ifdef REMOVED
- if (thd->variables.wsrep_on)
- thd->variables.option_bits |= (OPTION_BIN_LOG);
- else
- thd->variables.option_bits &= ~(OPTION_BIN_LOG);
-#endif
return false;
}
@@ -78,8 +68,6 @@ void wsrep_causal_reads_update (sys_var *self, THD* thd, enum_var_type var_type)
if (var_type == OPT_GLOBAL) {
thd->variables.wsrep_causal_reads = global_system_variables.wsrep_causal_reads;
}
- else {
- }
}
static int wsrep_start_position_verify (const char* start_str)
@@ -149,7 +137,7 @@ bool wsrep_start_position_update (sys_var *self, THD* thd, enum_var_type type)
wsrep_set_local_position (wsrep_start_position);
if (wsrep) {
- wsrep->sst_received (wsrep, &local_uuid, local_seqno, NULL, 0);
+ wsrep_sst_received (wsrep, &local_uuid, local_seqno, NULL, 0);
}
return 0;
@@ -455,7 +443,7 @@ void wsrep_node_address_init (const char* value)
bool wsrep_slave_threads_check (sys_var *self, THD* thd, set_var* var)
{
mysql_mutex_lock(&LOCK_wsrep_slave_threads);
- wsrep_thread_change = var->value->val_int() - wsrep_slave_threads;
+ wsrep_slave_count_change = var->value->val_int() - wsrep_slave_threads;
mysql_mutex_unlock(&LOCK_wsrep_slave_threads);
return 0;
@@ -463,13 +451,10 @@ bool wsrep_slave_threads_check (sys_var *self, THD* thd, set_var* var)
bool wsrep_slave_threads_update (sys_var *self, THD* thd, enum_var_type type)
{
- if (wsrep_thread_change > 0)
- {
- wsrep_create_appliers(wsrep_thread_change);
- }
- else if (wsrep_thread_change < 0)
+ if (wsrep_slave_count_change > 0)
{
- wsrep_close_applier_threads(-wsrep_thread_change);
+ wsrep_create_appliers(wsrep_slave_count_change);
+ wsrep_slave_count_change = 0;
}
return false;
}