summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
Diffstat (limited to 'sql')
-rw-r--r--sql/CMakeLists.txt3
-rw-r--r--sql/events.cc2
-rw-r--r--sql/handler.cc12
-rw-r--r--sql/log.cc154
-rw-r--r--sql/log.h15
-rw-r--r--sql/log_event.cc57
-rw-r--r--sql/mdl.cc5
-rw-r--r--sql/mysqld.cc101
-rw-r--r--sql/mysqld.h8
-rw-r--r--sql/sp.cc10
-rw-r--r--sql/sql_base.cc9
-rw-r--r--sql/sql_class.cc47
-rw-r--r--sql/sql_class.h15
-rw-r--r--sql/sql_parse.cc886
-rw-r--r--sql/sql_trigger.cc4
-rw-r--r--sql/sys_vars.cc14
-rw-r--r--sql/transaction.cc3
-rw-r--r--sql/wsrep_applier.cc351
-rw-r--r--sql/wsrep_applier.h38
-rw-r--r--sql/wsrep_binlog.cc321
-rw-r--r--sql/wsrep_binlog.h49
-rw-r--r--sql/wsrep_hton.cc173
-rw-r--r--sql/wsrep_mysqld.cc368
-rw-r--r--sql/wsrep_mysqld.h195
-rw-r--r--sql/wsrep_notify.cc3
-rw-r--r--sql/wsrep_priv.h210
-rw-r--r--sql/wsrep_sst.cc68
-rw-r--r--sql/wsrep_sst.h40
-rw-r--r--sql/wsrep_thd.cc464
-rw-r--r--sql/wsrep_thd.h32
-rw-r--r--sql/wsrep_utils.cc87
-rw-r--r--sql/wsrep_utils.h208
-rw-r--r--sql/wsrep_var.cc34
-rw-r--r--sql/wsrep_var.h83
34 files changed, 2361 insertions, 1708 deletions
diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt
index e59ea142a11..2378ced6504 100644
--- a/sql/CMakeLists.txt
+++ b/sql/CMakeLists.txt
@@ -48,6 +48,9 @@ IF(WITH_WSREP)
wsrep_sst.cc
wsrep_utils.cc
wsrep_var.cc
+ wsrep_binlog.cc
+ wsrep_applier.cc
+ wsrep_thd.cc
)
SET(WSREP_LIB wsrep)
ENDIF()
diff --git a/sql/events.cc b/sql/events.cc
index 78226bbf7ef..a39f31f416c 100644
--- a/sql/events.cc
+++ b/sql/events.cc
@@ -1147,7 +1147,7 @@ end:
DBUG_RETURN(ret);
}
#ifdef WITH_WSREP
-int wsrep_create_event_query(THD *thd, uchar** buf, uint* buf_len)
+int wsrep_create_event_query(THD *thd, uchar** buf, size_t* buf_len)
{
String log_query;
diff --git a/sql/handler.cc b/sql/handler.cc
index 9cadfb2cf3f..58356116883 100644
--- a/sql/handler.cc
+++ b/sql/handler.cc
@@ -1248,7 +1248,7 @@ int ha_commit_trans(THD *thd, bool all)
#ifdef WITH_WSREP
if (!WSREP(thd) &&
- thd->mdl_context.acquire_lock(&mdl_request,
+ thd->mdl_context.acquire_lock(&mdl_request,
#else
if (thd->mdl_context.acquire_lock(&mdl_request,
#endif /* WITH_WSREP */
@@ -1313,8 +1313,9 @@ int ha_commit_trans(THD *thd, bool all)
else
{
/* not wsrep hton, bail to native mysql behavior */
-#endif
+#endif /* WITH_WSREP */
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
+ error= 1;
#ifdef WITH_WSREP
} /* End of else */
#endif
@@ -1426,7 +1427,7 @@ commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans)
#ifdef WSREP_PROC_INFO
char info[64]= { 0, };
snprintf (info, sizeof(info) - 1, "ha_commit_one_phase(%lld)",
- (long long)thd->wsrep_trx_seqno);
+ (long long)wsrep_thd_trx_seqno(thd));
#else
const char info[]="ha_commit_one_phase()";
#endif /* WSREP_PROC_INFO */
@@ -1462,7 +1463,7 @@ commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans)
}
/* Free resources and perform other cleanup even for 'empty' transactions. */
if (is_real_trans)
- thd->transaction.cleanup();
+ thd->transaction.cleanup();
#ifdef WITH_WSREP
if (WSREP(thd)) thd_proc_info(thd, tmp_info);
#endif /* WITH_WSREP */
@@ -1546,8 +1547,7 @@ int ha_rollback_trans(THD *thd, bool all)
/* Always cleanup. Even if nht==0. There may be savepoints. */
if (is_real_trans)
- thd->transaction.cleanup();
-
+ thd->transaction.cleanup();
if (all)
thd->transaction_rollback_request= FALSE;
diff --git a/sql/log.cc b/sql/log.cc
index 242c45a7ed7..7f6cff3cc59 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -490,24 +490,10 @@ private:
};
handlerton *binlog_hton;
-#ifdef WITH_WSREP
-extern handlerton *wsrep_hton;
-#endif
-
-bool LOGGER::is_log_table_enabled(uint log_table_type)
-{
- switch (log_table_type) {
- case QUERY_LOG_SLOW:
- return (table_log_handler != NULL) && opt_slow_log;
- case QUERY_LOG_GENERAL:
- return (table_log_handler != NULL) && opt_log ;
- default:
- DBUG_ASSERT(0);
- return FALSE; /* make compiler happy */
- }
-}
-#ifdef WITH_WSREP
+#if WITH_WSREP
+/* the functions below depend on the definition of binlog_cache_manager class,
+ * so have to stay in this unit. */
IO_CACHE * get_trans_log(THD * thd)
{
binlog_cache_mngr *cache_mngr = (binlog_cache_mngr*)
@@ -515,7 +501,7 @@ IO_CACHE * get_trans_log(THD * thd)
if (cache_mngr)
{
return cache_mngr->get_binlog_cache_log(true);
- }
+ }
else
{
WSREP_DEBUG("binlog cache not initialized, conn :%ld", thd->thread_id);
@@ -523,7 +509,6 @@ IO_CACHE * get_trans_log(THD * thd)
}
}
-
bool wsrep_trans_cache_is_empty(THD *thd)
{
binlog_cache_mngr *const cache_mngr=
@@ -535,6 +520,7 @@ void thd_binlog_flush_pending_rows_event(THD *thd, bool stmt_end)
{
thd->binlog_flush_pending_rows_event(stmt_end);
}
+
void thd_binlog_trx_reset(THD * thd)
{
/*
@@ -556,74 +542,21 @@ void thd_binlog_rollback_stmt(THD * thd)
(binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
if (cache_mngr) cache_mngr->trx_cache.set_prev_position(MY_OFF_T_UNDEF);
}
-/*
- Write the contents of a cache to memory buffer.
-
- This function quite the same as MYSQL_BIN_LOG::write_cache(),
- with the exception that here we write in buffer instead of log file.
- */
+#endif /* WITH_WSREP */
-int wsrep_write_cache(IO_CACHE *cache, uchar **buf, uint *buf_len)
+bool LOGGER::is_log_table_enabled(uint log_table_type)
{
-
- if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
- return ER_ERROR_ON_WRITE;
- uint length= my_b_bytes_in_cache(cache);
- long long total_length = 0;
- uchar *buf_ptr = NULL;
-
- do
- {
- /* bail out if buffer grows too large
- This is a temporary fix to avoid flooding replication
- TODO: remove this check for 0.7.4 release
- */
- if (total_length > wsrep_max_ws_size)
- {
- WSREP_WARN("transaction size limit (%lld) exceeded: %lld",
- wsrep_max_ws_size, total_length);
- if (reinit_io_cache(cache, WRITE_CACHE, 0, 0, 0))
- {
- WSREP_WARN("failed to initialize io-cache");
- }
- if (buf_ptr) my_free(*buf);
- *buf_len = 0;
- return ER_ERROR_ON_WRITE;
- }
- if (total_length > 0)
- {
- *buf_len += length;
- *buf = (uchar *)my_realloc(*buf, total_length+length, MYF(0));
- if (!*buf)
- {
- WSREP_ERROR("io cache write problem: %d %d", *buf_len, length);
- return ER_ERROR_ON_WRITE;
- }
- buf_ptr = *buf+total_length;
- }
- else
- {
- if (buf_ptr != NULL)
- {
- WSREP_ERROR("io cache alloc error: %d %d", *buf_len, length);
- my_free(*buf);
- }
- if (length > 0)
- {
- *buf = (uchar *) my_malloc(length, MYF(0));
- buf_ptr = *buf;
- *buf_len = length;
- }
- }
- total_length += length;
-
- memcpy(buf_ptr, cache->read_pos, length);
- cache->read_pos=cache->read_end;
- } while ((cache->file >= 0) && (length= my_b_fill(cache)));
-
- return 0;
+ switch (log_table_type) {
+ case QUERY_LOG_SLOW:
+ return (table_log_handler != NULL) && opt_slow_log;
+ case QUERY_LOG_GENERAL:
+ return (table_log_handler != NULL) && opt_log ;
+ default:
+ DBUG_ASSERT(0);
+ return FALSE; /* make compiler happy */
+ }
}
-#endif
+
/* Check if a given table is opened log table */
int check_if_log_table(size_t db_len, const char *db, size_t table_name_len,
@@ -1817,13 +1750,6 @@ static inline int
binlog_commit_flush_stmt_cache(THD *thd, bool all,
binlog_cache_mngr *cache_mngr)
{
-#ifdef WITH_WSREP
- if (thd->wsrep_mysql_replicated > 0)
- {
- WSREP_DEBUG("avoiding binlog_commit_flush_trx_cache: %d", thd->wsrep_mysql_replicated);
- return 0;
- }
-#endif
Query_log_event end_evt(thd, STRING_WITH_LEN("COMMIT"),
FALSE, TRUE, TRUE, 0);
return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, FALSE));
@@ -1841,6 +1767,13 @@ binlog_commit_flush_stmt_cache(THD *thd, bool all,
static inline int
binlog_commit_flush_trx_cache(THD *thd, bool all, binlog_cache_mngr *cache_mngr)
{
+#ifdef WITH_WSREP
+ if (thd->wsrep_mysql_replicated > 0)
+ {
+ WSREP_DEBUG("avoiding binlog_commit_flush_trx_cache: %d", thd->wsrep_mysql_replicated);
+ return 0;
+ }
+#endif
Query_log_event end_evt(thd, STRING_WITH_LEN("COMMIT"),
TRUE, TRUE, TRUE, 0);
return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, FALSE, TRUE));
@@ -2104,12 +2037,12 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
if (ending_trans(thd, all) &&
((thd->variables.option_bits & OPTION_KEEP_LOG) ||
(trans_has_updated_non_trans_table(thd) &&
- WSREP_FORMAT(thd->variables.binlog_format) == BINLOG_FORMAT_STMT) ||
+ WSREP_BINLOG_FORMAT(thd->variables.binlog_format) == BINLOG_FORMAT_STMT) ||
(cache_mngr->trx_cache.changes_to_non_trans_temp_table() &&
- WSREP_FORMAT(thd->variables.binlog_format) == BINLOG_FORMAT_MIXED) ||
+ WSREP_BINLOG_FORMAT(thd->variables.binlog_format) == BINLOG_FORMAT_MIXED) ||
(trans_has_updated_non_trans_table(thd) &&
ending_single_stmt_trans(thd,all) &&
- WSREP_FORMAT(thd->variables.binlog_format) == BINLOG_FORMAT_MIXED)))
+ WSREP_BINLOG_FORMAT(thd->variables.binlog_format) == BINLOG_FORMAT_MIXED)))
error= binlog_rollback_flush_trx_cache(thd, all, cache_mngr);
/*
Truncate the cache if:
@@ -2123,9 +2056,9 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
else if (ending_trans(thd, all) ||
(!(thd->variables.option_bits & OPTION_KEEP_LOG) &&
(!stmt_has_updated_non_trans_table(thd) ||
- WSREP_FORMAT(thd->variables.binlog_format) != BINLOG_FORMAT_STMT) &&
+ WSREP_BINLOG_FORMAT(thd->variables.binlog_format) != BINLOG_FORMAT_STMT) &&
(!cache_mngr->trx_cache.changes_to_non_trans_temp_table() ||
- WSREP_FORMAT(thd->variables.binlog_format) != BINLOG_FORMAT_MIXED)))
+ WSREP_BINLOG_FORMAT(thd->variables.binlog_format) != BINLOG_FORMAT_MIXED)))
error= binlog_truncate_trx_cache(thd, cache_mngr, all);
}
@@ -5439,35 +5372,6 @@ err:
}
}
-#ifdef WITH_WSREP
- if (WSREP(thd) && wsrep_incremental_data_collection &&
- (wsrep_emulate_bin_log || mysql_bin_log.is_open()))
- {
- DBUG_ASSERT(thd->wsrep_trx_handle.trx_id != (unsigned long)-1);
- if (!error)
- {
- IO_CACHE* cache= get_trans_log(thd);
- uchar* buf= NULL;
- uint buf_len= 0;
-
- if (wsrep_emulate_bin_log)
- thd->binlog_flush_pending_rows_event(false);
- error= wsrep_write_cache(cache, &buf, &buf_len);
- if (!error && buf_len > 0)
- {
- wsrep_status_t rc= wsrep->append_data(wsrep,
- &thd->wsrep_trx_handle,
- buf, buf_len);
- if (rc != WSREP_OK)
- {
- sql_print_warning("WSREP: append_data() returned %d", rc);
- error= 1;
- }
- }
- if (buf_len) my_free(buf);
- }
- }
-#endif /* WITH_WSREP */
DBUG_RETURN(error);
}
diff --git a/sql/log.h b/sql/log.h
index 0e189a789db..34b9df53c93 100644
--- a/sql/log.h
+++ b/sql/log.h
@@ -273,12 +273,6 @@ enum enum_log_state { LOG_OPENED, LOG_CLOSED, LOG_TO_BE_OPENED };
(mmap+fsync is two times faster than write+fsync)
*/
-#ifdef WITH_WSREP
-extern my_bool wsrep_emulate_bin_log;
-Log_event* wsrep_read_log_event(
- char **arg_buf, size_t *arg_buf_len,
- const Format_description_log_event *description_event);
-#endif
class MYSQL_LOG
{
public:
@@ -870,18 +864,17 @@ enum enum_binlog_format {
};
#ifdef WITH_WSREP
-IO_CACHE * get_trans_log(THD * thd);
+IO_CACHE* get_trans_log(THD * thd);
bool wsrep_trans_cache_is_empty(THD *thd);
void thd_binlog_flush_pending_rows_event(THD *thd, bool stmt_end);
void thd_binlog_trx_reset(THD * thd);
void thd_binlog_rollback_stmt(THD * thd);
-int wsrep_write_cache(IO_CACHE *cache, uchar **buf, uint *buf_len);
-#define WSREP_FORMAT(my_format) \
- ((wsrep_forced_binlog_format != BINLOG_FORMAT_UNSPEC) ? \
+#define WSREP_BINLOG_FORMAT(my_format) \
+ ((wsrep_forced_binlog_format != BINLOG_FORMAT_UNSPEC) ? \
wsrep_forced_binlog_format : my_format)
#else
-#define WSREP_FORMAT(my_format) my_format
+#define WSREP_BINLOG_FORMAT(my_format) my_format
#endif
int query_error_code(THD *thd, bool not_killed);
uint purge_log_get_error_code(int res);
diff --git a/sql/log_event.cc b/sql/log_event.cc
index b4f5f5ad8b2..dac39a63282 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -8406,7 +8406,7 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
thd->is_fatal_error,
thd->wsrep_exec_mode,
thd->wsrep_conflict_state,
- (long long)thd->wsrep_trx_seqno);
+ (long long)wsrep_thd_trx_seqno(thd));
}
#endif
if (thd->is_slave_error || thd->is_fatal_error)
@@ -10187,7 +10187,7 @@ Write_rows_log_event::do_exec_row(const Relay_log_info *const rli)
char info[64];
info[sizeof(info) - 1] = '\0';
snprintf(info, sizeof(info) - 1, "Write_rows_log_event::write_row(%lld)",
- (long long) thd->wsrep_trx_seqno);
+ (long long) wsrep_thd_trx_seqno(thd));
const char* tmp = (WSREP(thd)) ? thd_proc_info(thd, info) : NULL;
#else
const char* tmp = (WSREP(thd)) ?
@@ -10864,7 +10864,7 @@ int Delete_rows_log_event::do_exec_row(const Relay_log_info *const rli)
char info[64];
info[sizeof(info) - 1] = '\0';
snprintf(info, sizeof(info) - 1, "Delete_rows_log_event::find_row(%lld)",
- (long long) thd->wsrep_trx_seqno);
+ (long long) wsrep_thd_trx_seqno(thd));
const char* tmp = (WSREP(thd)) ? thd_proc_info(thd, info) : NULL;
#else
const char* tmp = (WSREP(thd)) ?
@@ -10880,7 +10880,7 @@ int Delete_rows_log_event::do_exec_row(const Relay_log_info *const rli)
#ifdef WSREP_PROC_INFO
snprintf(info, sizeof(info) - 1,
"Delete_rows_log_event::ha_delete_row(%lld)",
- (long long) thd->wsrep_trx_seqno);
+ (long long) wsrep_thd_trx_seqno(thd));
if (WSREP(thd)) thd_proc_info(thd, info);
#else
if (WSREP(thd)) thd_proc_info(thd,"Delete_rows_log_event::ha_delete_row()");
@@ -11016,7 +11016,7 @@ Update_rows_log_event::do_exec_row(const Relay_log_info *const rli)
char info[64];
info[sizeof(info) - 1] = '\0';
snprintf(info, sizeof(info) - 1, "Update_rows_log_event::find_row(%lld)",
- (long long) thd->wsrep_trx_seqno);
+ (long long) wsrep_thd_trx_seqno(thd));
const char* tmp = (WSREP(thd)) ? thd_proc_info(thd, info) : NULL;
#else
const char* tmp = (WSREP(thd)) ?
@@ -11053,7 +11053,7 @@ Update_rows_log_event::do_exec_row(const Relay_log_info *const rli)
#ifdef WSREP_PROC_INFO
snprintf(info, sizeof(info) - 1,
"Update_rows_log_event::unpack_current_row(%lld)",
- (long long) thd->wsrep_trx_seqno);
+ (long long) wsrep_thd_trx_seqno(thd));
if (WSREP(thd)) thd_proc_info(thd, info);
#else
if (WSREP(thd))
@@ -11082,7 +11082,7 @@ Update_rows_log_event::do_exec_row(const Relay_log_info *const rli)
#ifdef WSREP_PROC_INFO
snprintf(info, sizeof(info) - 1,
"Update_rows_log_event::ha_update_row(%lld)",
- (long long) thd->wsrep_trx_seqno);
+ (long long) wsrep_thd_trx_seqno(thd));
if (WSREP(thd)) thd_proc_info(thd, info);
#else
if (WSREP(thd)) thd_proc_info(thd,"Update_rows_log_event::ha_update_row()");
@@ -11179,48 +11179,6 @@ void Incident_log_event::pack_info(THD *thd, Protocol *protocol)
protocol->store(buf, bytes, &my_charset_bin);
}
#endif
-#if WITH_WSREP && !defined(MYSQL_CLIENT)
-Format_description_log_event *wsrep_format_desc; // TODO: free them at the end
-/*
- read the first event from (*buf). The size of the (*buf) is (*buf_len).
- At the end (*buf) is shitfed to point to the following event or NULL and
- (*buf_len) will be changed to account just being read bytes of the 1st event.
-*/
-#define WSREP_MAX_ALLOWED_PACKET 1024*1024*1024 // current protocol max
-
-Log_event* wsrep_read_log_event(
- char **arg_buf, size_t *arg_buf_len,
- const Format_description_log_event *description_event)
-{
- DBUG_ENTER("wsrep_read_log_event");
- char *head= (*arg_buf);
-
- uint data_len = uint4korr(head + EVENT_LEN_OFFSET);
- char *buf= (*arg_buf);
- const char *error= 0;
- Log_event *res= 0;
-
- if (data_len > WSREP_MAX_ALLOWED_PACKET)
- {
- error = "Event too big";
- goto err;
- }
-
- res= Log_event::read_log_event(buf, data_len, &error, description_event, FALSE);
-
-err:
- if (!res)
- {
- DBUG_ASSERT(error != 0);
- sql_print_error("Error in Log_event::read_log_event(): "
- "'%s', data_len: %d, event_type: %d",
- error,data_len,head[EVENT_TYPE_OFFSET]);
- }
- (*arg_buf)+= data_len;
- (*arg_buf_len)-= data_len;
- DBUG_RETURN(res);
-}
-#endif
#ifdef MYSQL_CLIENT
@@ -11310,6 +11268,7 @@ st_print_event_info::st_print_event_info()
}
#endif
+
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
Heartbeat_log_event::Heartbeat_log_event(const char* buf, uint event_len,
const Format_description_log_event* description_event)
diff --git a/sql/mdl.cc b/sql/mdl.cc
index fe8efd3e6cd..268a6621f65 100644
--- a/sql/mdl.cc
+++ b/sql/mdl.cc
@@ -22,6 +22,7 @@
#include <mysql/service_thd_wait.h>
#ifdef WITH_WSREP
#include "wsrep_mysqld.h"
+#include "wsrep_thd.h"
extern "C" my_thread_id wsrep_thd_thread_id(THD *thd);
extern "C" char *wsrep_thd_query(THD *thd);
void sql_print_information(const char *format, ...)
@@ -3029,7 +3030,7 @@ void MDL_ticket::wsrep_report(bool debug)
{
if (debug)
{
- WSREP_DEBUG("MDL ticket: type: %s space: %s db: %s name: %s (%s)",
+ WSREP_DEBUG("MDL ticket: type: %s space: %s db: %s name: %s",
(get_type() == MDL_INTENTION_EXCLUSIVE) ? "intention exclusive" :
((get_type() == MDL_SHARED) ? "shared" :
((get_type() == MDL_SHARED_HIGH_PRIO ? "shared high prio" :
@@ -3049,7 +3050,7 @@ void MDL_ticket::wsrep_report(bool debug)
((m_lock->key.mdl_namespace() == MDL_key::COMMIT) ? "COMMIT" :
(char *)"UNKNOWN"))))))),
m_lock->key.db_name(),
- m_lock->key.name(),
+ m_lock->key.name(),
m_lock->key.get_wait_state_name());
}
}
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index 849ceb1db39..953b59031d8 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -74,6 +74,9 @@
#include "debug_sync.h"
#ifdef WITH_WSREP
#include "wsrep_mysqld.h"
+#include "wsrep_var.h"
+#include "wsrep_thd.h"
+#include "wsrep_sst.h"
ulong wsrep_running_threads = 0; // # of currently running wsrep threads
#endif
#include "sql_callback.h"
@@ -354,7 +357,11 @@ static char *default_character_set_name;
static char *character_set_filesystem_name;
static char *lc_messages;
static char *lc_time_names_name;
+#ifndef WITH_WSREP
static char *my_bind_addr_str;
+#else
+char *my_bind_addr_str;
+#endif /* WITH_WSREP */
static char *default_collation_name;
char *default_storage_engine;
static char compiled_default_collation_name[]= MYSQL_DEFAULT_COLLATION_NAME;
@@ -716,7 +723,7 @@ mysql_mutex_t LOCK_wsrep_slave_threads;
mysql_mutex_t LOCK_wsrep_desync;
int wsrep_replaying= 0;
static void wsrep_close_threads(THD* thd);
-#endif
+#endif /* WITH_WSREP */
int mysqld_server_started= 0;
File_parser_dummy_hook file_parser_dummy_hook;
@@ -1863,6 +1870,7 @@ extern "C" void unireg_abort(int exit_code)
WSREP_INFO("Some threads may fail to exit.");
}
#endif // WITH_WSREP
+
clean_up(!opt_abort && (exit_code || !opt_bootstrap)); /* purecov: inspected */
DBUG_PRINT("quit",("done with cleanup in unireg_abort"));
mysqld_exit(exit_code);
@@ -2389,7 +2397,7 @@ static my_socket activate_tcp_port(uint port)
socket_errno);
unireg_abort(1);
}
-#if defined(WITH_WSREP) && defined(HAVE_FCNTL)
+#if defined(WITH_WSREP) && defined(HAVE_FCNTL) && defined(FD_CLOEXEC)
(void) fcntl(ip_sock, F_SETFD, FD_CLOEXEC);
#endif /* WITH_WSREP */
DBUG_RETURN(ip_sock);
@@ -2513,7 +2521,7 @@ static void network_init(void)
if (listen(unix_sock,(int) back_log) < 0)
sql_print_warning("listen() on Unix socket failed with error %d",
socket_errno);
-#if defined(WITH_WSREP) && defined(HAVE_FCNTL)
+#if defined(WITH_WSREP) && defined(HAVE_FCNTL) && defined(FD_CLOEXEC)
(void) fcntl(unix_sock, F_SETFD, FD_CLOEXEC);
#endif /* WITH_WSREP */
}
@@ -4141,25 +4149,25 @@ static int init_thread_environment()
return 1;
}
#ifdef WITH_WSREP
- mysql_mutex_init(key_LOCK_wsrep_ready,
- &LOCK_wsrep_ready, MY_MUTEX_INIT_FAST);
+ mysql_mutex_init(key_LOCK_wsrep_ready,
+ &LOCK_wsrep_ready, MY_MUTEX_INIT_FAST);
mysql_cond_init(key_COND_wsrep_ready, &COND_wsrep_ready, NULL);
- mysql_mutex_init(key_LOCK_wsrep_sst,
- &LOCK_wsrep_sst, MY_MUTEX_INIT_FAST);
+ mysql_mutex_init(key_LOCK_wsrep_sst,
+ &LOCK_wsrep_sst, MY_MUTEX_INIT_FAST);
mysql_cond_init(key_COND_wsrep_sst, &COND_wsrep_sst, NULL);
- mysql_mutex_init(key_LOCK_wsrep_sst_init,
- &LOCK_wsrep_sst_init, MY_MUTEX_INIT_FAST);
+ mysql_mutex_init(key_LOCK_wsrep_sst_init,
+ &LOCK_wsrep_sst_init, MY_MUTEX_INIT_FAST);
mysql_cond_init(key_COND_wsrep_sst_init, &COND_wsrep_sst_init, NULL);
- mysql_mutex_init(key_LOCK_wsrep_rollback,
- &LOCK_wsrep_rollback, MY_MUTEX_INIT_FAST);
+ mysql_mutex_init(key_LOCK_wsrep_rollback,
+ &LOCK_wsrep_rollback, MY_MUTEX_INIT_FAST);
mysql_cond_init(key_COND_wsrep_rollback, &COND_wsrep_rollback, NULL);
- mysql_mutex_init(key_LOCK_wsrep_replaying,
- &LOCK_wsrep_replaying, MY_MUTEX_INIT_FAST);
+ mysql_mutex_init(key_LOCK_wsrep_replaying,
+ &LOCK_wsrep_replaying, MY_MUTEX_INIT_FAST);
mysql_cond_init(key_COND_wsrep_replaying, &COND_wsrep_replaying, NULL);
- mysql_mutex_init(key_LOCK_wsrep_slave_threads,
- &LOCK_wsrep_slave_threads, MY_MUTEX_INIT_FAST);
- mysql_mutex_init(key_LOCK_wsrep_desync,
- &LOCK_wsrep_desync, MY_MUTEX_INIT_FAST);
+ mysql_mutex_init(key_LOCK_wsrep_slave_threads,
+ &LOCK_wsrep_slave_threads, MY_MUTEX_INIT_FAST);
+ mysql_mutex_init(key_LOCK_wsrep_desync,
+ &LOCK_wsrep_desync, MY_MUTEX_INIT_FAST);
#endif
return 0;
}
@@ -4789,7 +4797,7 @@ pthread_handler_t start_wsrep_THD(void *arg)
THD *thd;
wsrep_thd_processor_fun processor= (wsrep_thd_processor_fun)arg;
- if (my_thread_init())
+ if (my_thread_init())
{
WSREP_ERROR("Could not initialize thread");
return(NULL);
@@ -4852,7 +4860,7 @@ pthread_handler_t start_wsrep_THD(void *arg)
statistic_increment(aborted_connects,&LOCK_status);
MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 0));
delete thd;
-
+
return(NULL);
}
@@ -4910,42 +4918,6 @@ pthread_handler_t start_wsrep_THD(void *arg)
return(NULL);
}
-void wsrep_create_rollbacker()
-{
- if (WSREP_PROVIDER_EXISTS)
- {
- pthread_t hThread;
- /* create rollbacker */
- if (pthread_create( &hThread, &connection_attrib,
- start_wsrep_THD, (void*)wsrep_rollback_process))
- WSREP_WARN("Can't create thread to manage wsrep rollback");
- }
-}
-
-void wsrep_create_appliers(long threads)
-{
- if (!wsrep_connected)
- {
- /* see wsrep_replication_start() for the logic */
- if (wsrep_cluster_address && strlen(wsrep_cluster_address) &&
- WSREP_PROVIDER_EXISTS)
- {
- WSREP_ERROR("Trying to launch slave threads before creating "
- "connection at '%s'", wsrep_cluster_address);
- assert(0);
- }
- return;
- }
-
- long wsrep_threads=0;
- pthread_t hThread;
- while (wsrep_threads++ < threads) {
- if (pthread_create(
- &hThread, &connection_attrib,
- start_wsrep_THD, (void*)wsrep_replication_process))
- WSREP_WARN("Can't create thread to manage wsrep replication");
- }
-}
/**/
static bool abort_replicated(THD *thd)
{
@@ -4970,7 +4942,7 @@ WSREP_WARN("applier has wsrep_exec_mode = %d", thd->wsrep_exec_mode);
if ( thd->slave_thread || /* declared as mysql slave */
thd->system_thread || /* declared as system thread */
- !thd->vio_ok() || /* server internal thread */
+ !thd->vio_ok() || /* server internal thread */
thd->wsrep_exec_mode==REPL_RECV || /* applier or replaying thread */
thd->wsrep_applier || /* wsrep slave applier */
!thd->variables.wsrep_on) /* client, but fenced outside wsrep */
@@ -5452,6 +5424,9 @@ int mysqld_main(int argc, char **argv)
return 1;
}
#endif
+#ifdef WITH_WSREP
+ wsrep_filter_new_cluster (&argc, argv);
+#endif /* WITH_WSREP */
orig_argc= argc;
orig_argv= argv;
@@ -5755,6 +5730,13 @@ int mysqld_main(int argc, char **argv)
unireg_abort(1);
#ifdef WITH_WSREP /* WSREP AFTER SE */
+ if (wsrep_recovery)
+ {
+ select_thread_in_use= 0;
+ wsrep_recover();
+ unireg_abort(0);
+ }
+
if (opt_bootstrap)
{
/*! bootstrap wsrep init was taken care of above */
@@ -6484,7 +6466,7 @@ void handle_connections_sockets()
sleep(1); // Give other threads some time
continue;
}
-#if defined(WITH_WSREP) && defined(HAVE_FCNTL)
+#if defined(WITH_WSREP) && defined(HAVE_FCNTL) && defined(FD_CLOEXEC)
(void) fcntl(new_sock, F_SETFD, FD_CLOEXEC);
#endif /* WITH_WSREP */
@@ -7984,6 +7966,7 @@ SHOW_VAR status_vars[]= {
{"wsrep_cluster_status", (char*) &wsrep_cluster_status, SHOW_CHAR_PTR},
{"wsrep_cluster_size", (char*) &wsrep_cluster_size, SHOW_LONG_NOFLUSH},
{"wsrep_local_index", (char*) &wsrep_local_index, SHOW_LONG_NOFLUSH},
+ {"wsrep_local_bf_aborts", (char*) &wsrep_show_bf_aborts, SHOW_FUNC},
{"wsrep_provider_name", (char*) &wsrep_provider_name, SHOW_CHAR_PTR},
{"wsrep_provider_version", (char*) &wsrep_provider_version, SHOW_CHAR_PTR},
{"wsrep_provider_vendor", (char*) &wsrep_provider_vendor, SHOW_CHAR_PTR},
@@ -9220,6 +9203,9 @@ void refresh_status(THD *thd)
/* Reset some global variables */
reset_status_vars();
+#ifdef WITH_WSREP
+ wsrep->stats_reset(wsrep);
+#endif /* WITH_WSREP */
/* Reset the counters of all key caches (default and named). */
process_key_caches(reset_key_cache_counters, 0);
@@ -9256,3 +9242,4 @@ template class I_List<i_string_pair>;
template class I_List<Statement>;
template class I_List_iterator<Statement>;
#endif
+
diff --git a/sql/mysqld.h b/sql/mysqld.h
index 0075c81726c..f392452f56e 100644
--- a/sql/mysqld.h
+++ b/sql/mysqld.h
@@ -227,7 +227,7 @@ extern PSI_mutex_key key_PAGE_lock, key_LOCK_sync, key_LOCK_active,
#ifdef WITH_WSREP
extern PSI_mutex_key key_LOCK_wsrep_thd;
extern PSI_cond_key key_COND_wsrep_thd;
-#endif /* HAVE_MMAP */
+#endif /* HAVE_WSREP */
#ifdef HAVE_OPENSSL
extern PSI_mutex_key key_LOCK_des_key_file;
@@ -423,7 +423,7 @@ enum options_mysqld
OPT_WSREP_START_POSITION,
OPT_WSREP_SST_AUTH,
OPT_WSREP_RECOVER,
-#endif
+#endif /* WITH_WSREP */
OPT_which_is_always_the_last
};
#endif
@@ -572,5 +572,9 @@ extern uint internal_tmp_table_max_key_segments;
extern uint volatile global_disable_checkpoint;
extern my_bool opt_help;
+#ifdef WITH_WSREP
+#include "my_pthread.h"
+pthread_handler_t start_wsrep_THD(void*);
+#endif /* WITH_WSREP */
#endif /* MYSQLD_INCLUDED */
diff --git a/sql/sp.cc b/sql/sp.cc
index 269917d6863..c379dbd7fbf 100644
--- a/sql/sp.cc
+++ b/sql/sp.cc
@@ -2289,7 +2289,7 @@ sp_load_for_information_schema(THD *thd, TABLE *proc_table, String *db,
return sp;
}
#ifdef WITH_WSREP
-int wsrep_create_sp(THD *thd, uchar** buf, uint* buf_len)
+int wsrep_create_sp(THD *thd, uchar** buf, size_t* buf_len)
{
String log_query;
sp_head *sp = thd->lex->sphead;
@@ -2315,10 +2315,10 @@ int wsrep_create_sp(THD *thd, uchar** buf, uint* buf_len)
sp->m_chistics, &(thd->lex->definer->user),
&(thd->lex->definer->host),
saved_mode))
- {
- WSREP_WARN("SP create string failed: %s", thd->query());
- return 1;
- }
+ {
+ WSREP_WARN("SP create string failed: %s", thd->query());
+ return 1;
+ }
return wsrep_to_buf_helper(thd, log_query.ptr(), log_query.length(), buf, buf_len);
}
#endif /* WITH_WSREP */
diff --git a/sql/sql_base.cc b/sql/sql_base.cc
index 3acfa63d55e..5eb6607fb4d 100644
--- a/sql/sql_base.cc
+++ b/sql/sql_base.cc
@@ -59,9 +59,9 @@
#ifdef __WIN__
#include <io.h>
#endif
-
#ifdef WITH_WSREP
#include "wsrep_mysqld.h"
+#include "wsrep_thd.h"
#endif // WITH_WSREP
bool
@@ -4227,7 +4227,7 @@ thr_lock_type read_lock_type_for_table(THD *thd,
*/
bool log_on= mysql_bin_log.is_open() && thd->variables.sql_log_bin;
ulong binlog_format= thd->variables.binlog_format;
- if ((log_on == FALSE) || (WSREP_FORMAT(binlog_format) == BINLOG_FORMAT_ROW) ||
+ if ((log_on == FALSE) || (WSREP_BINLOG_FORMAT(binlog_format) == BINLOG_FORMAT_ROW) ||
(table_list->table->s->table_category == TABLE_CATEGORY_LOG) ||
(table_list->table->s->table_category == TABLE_CATEGORY_PERFORMANCE) ||
!(is_update_query(prelocking_ctx->sql_command) ||
@@ -5894,7 +5894,7 @@ bool lock_tables(THD *thd, TABLE_LIST *tables, uint count,
We can solve these problems in mixed mode by switching to binlogging
if at least one updated table is used by sub-statement
*/
- if (WSREP_FORMAT(thd->variables.binlog_format) != BINLOG_FORMAT_ROW && tables &&
+ if (WSREP_BINLOG_FORMAT(thd->variables.binlog_format) != BINLOG_FORMAT_ROW && tables &&
has_write_table_with_auto_increment(thd->lex->first_not_own_table()))
thd->lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_AUTOINC_COLUMNS);
}
@@ -9431,6 +9431,7 @@ void tdc_remove_table(THD *thd, enum_tdc_remove_table_type remove_type,
{
mysql_mutex_assert_owner(&LOCK_open);
}
+
#ifdef WITH_WSREP
/* if thd was BF aborted, exclusive locks were canceled */
#else
@@ -9438,6 +9439,7 @@ void tdc_remove_table(THD *thd, enum_tdc_remove_table_type remove_type,
thd->mdl_context.is_lock_owner(MDL_key::TABLE, db, table_name,
MDL_EXCLUSIVE));
#endif /* WITH_WSREP */
+
key_length= create_table_def_key(key, db, table_name);
if ((share= (TABLE_SHARE*) my_hash_search(&table_def_cache,(uchar*) key,
@@ -9461,7 +9463,6 @@ void tdc_remove_table(THD *thd, enum_tdc_remove_table_type remove_type,
thus others can use table */
if (table->in_use != thd &&
- table->in_use->wsrep_bf_thd != thd &&
table->in_use->wsrep_conflict_state != MUST_ABORT)
{
#endif
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index 2b9595a5080..334d6c5a619 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -848,9 +848,9 @@ extern "C" const char *wsrep_thd_conflict_state_str(THD *thd)
(thd->wsrep_conflict_state == CERT_FAILURE) ? "cert failure" : "void";
}
-extern "C" wsrep_trx_handle_t* wsrep_thd_trx_handle(THD *thd)
+extern "C" wsrep_ws_handle_t* wsrep_thd_ws_handle(THD *thd)
{
- return &thd->wsrep_trx_handle;
+ return &thd->wsrep_ws_handle;
}
extern "C"void wsrep_thd_LOCK(THD *thd)
@@ -875,7 +875,7 @@ extern "C" my_thread_id wsrep_thd_thread_id(THD *thd)
}
extern "C" wsrep_seqno_t wsrep_thd_trx_seqno(THD *thd)
{
- return (thd) ? thd->wsrep_trx_seqno : WSREP_SEQNO_UNDEFINED;
+ return (thd) ? thd->wsrep_trx_meta.gtid.seqno : WSREP_SEQNO_UNDEFINED;
}
extern "C" query_id_t wsrep_thd_query_id(THD *thd)
{
@@ -913,16 +913,16 @@ extern "C" void wsrep_thd_awake(THD* bf_thd, THD *thd, my_bool signal)
extern "C" int
wsrep_trx_order_before(void *thd1, void *thd2)
{
- if (((THD*)thd1)->wsrep_trx_seqno < ((THD*)thd2)->wsrep_trx_seqno) {
- WSREP_DEBUG("BF conflict, order: %lld %lld\n",
- (long long)((THD*)thd1)->wsrep_trx_seqno,
- (long long)((THD*)thd2)->wsrep_trx_seqno);
- return 1;
- }
- WSREP_DEBUG("waiting for BF, trx order: %lld %lld\n",
- (long long)((THD*)thd1)->wsrep_trx_seqno,
- (long long)((THD*)thd2)->wsrep_trx_seqno);
- return 0;
+ if (wsrep_thd_trx_seqno((THD*)thd1) < wsrep_thd_trx_seqno((THD*)thd2)) {
+ WSREP_DEBUG("BF conflict, order: %lld %lld\n",
+ (long long)wsrep_thd_trx_seqno((THD*)thd1),
+ (long long)wsrep_thd_trx_seqno((THD*)thd2));
+ return 1;
+ }
+ WSREP_DEBUG("waiting for BF, trx order: %lld %lld\n",
+ (long long)wsrep_thd_trx_seqno((THD*)thd1),
+ (long long)wsrep_thd_trx_seqno((THD*)thd2));
+ return 0;
}
extern "C" int
wsrep_trx_is_aborting(void *thd_ptr)
@@ -1000,7 +1000,7 @@ THD::THD()
wsrep_applier(is_applier),
wsrep_applier_closing(FALSE),
wsrep_client_thread(0),
- wsrep_trx_seqno(WSREP_SEQNO_UNDEFINED),
+ wsrep_apply_toi(false),
#endif
m_parser_state(NULL),
#if defined(ENABLED_DEBUG_SYNC)
@@ -1104,9 +1104,8 @@ THD::THD()
#ifdef WITH_WSREP
mysql_mutex_init(key_LOCK_wsrep_thd, &LOCK_wsrep_thd, MY_MUTEX_INIT_FAST);
mysql_cond_init(key_COND_wsrep_thd, &COND_wsrep_thd, NULL);
- wsrep_trx_handle.trx_id = WSREP_UNDEFINED_TRX_ID;
- wsrep_trx_handle.opaque = NULL;
- //wsrep_retry_autocommit= ::wsrep_retry_autocommit;
+ wsrep_ws_handle.trx_id = WSREP_UNDEFINED_TRX_ID;
+ wsrep_ws_handle.opaque = NULL;
wsrep_retry_counter = 0;
wsrep_PA_safe = true;
wsrep_retry_query = NULL;
@@ -1471,7 +1470,10 @@ void THD::init(void)
wsrep_conflict_state= NO_CONFLICT;
wsrep_query_state= QUERY_IDLE;
wsrep_last_query_id= 0;
+ wsrep_trx_meta.gtid= WSREP_GTID_UNDEFINED;
+ wsrep_trx_meta.depends_on= WSREP_SEQNO_UNDEFINED;
wsrep_converted_lock_session= false;
+ //wsrep_retry_autocommit= ::wsrep_retry_autocommit;
wsrep_retry_counter= 0;
wsrep_rli= NULL;
wsrep_PA_safe= true;
@@ -2116,6 +2118,7 @@ void THD::cleanup_after_query()
/* reset table map for multi-table update */
table_map_for_update= 0;
m_binlog_invoker= FALSE;
+ /* reset replication info structure */
#ifndef EMBEDDED_LIBRARY
if (rli_slave)
@@ -4279,7 +4282,7 @@ extern "C" int thd_binlog_format(const MYSQL_THD thd)
#else
if (mysql_bin_log.is_open() && (thd->variables.option_bits & OPTION_BIN_LOG))
#endif
- return (int) WSREP_FORMAT(thd->variables.binlog_format);
+ return (int) WSREP_BINLOG_FORMAT(thd->variables.binlog_format);
else
return BINLOG_FORMAT_UNSPEC;
}
@@ -4839,7 +4842,7 @@ int THD::decide_logging_format(TABLE_LIST *tables)
binlog by filtering rules.
*/
if (mysql_bin_log.is_open() && (variables.option_bits & OPTION_BIN_LOG) &&
- !(WSREP_FORMAT(variables.binlog_format) == BINLOG_FORMAT_STMT &&
+ !(WSREP_BINLOG_FORMAT(variables.binlog_format) == BINLOG_FORMAT_STMT &&
!binlog_filter->db_ok(db)))
{
/*
@@ -5003,7 +5006,7 @@ int THD::decide_logging_format(TABLE_LIST *tables)
*/
my_error((error= ER_BINLOG_ROW_INJECTION_AND_STMT_ENGINE), MYF(0));
}
- else if (WSREP_FORMAT(variables.binlog_format) == BINLOG_FORMAT_ROW &&
+ else if (WSREP_BINLOG_FORMAT(variables.binlog_format) == BINLOG_FORMAT_ROW &&
sqlcom_can_generate_row_events(this))
{
/*
@@ -5032,7 +5035,7 @@ int THD::decide_logging_format(TABLE_LIST *tables)
else
{
/* binlog_format = STATEMENT */
- if (WSREP_FORMAT(variables.binlog_format) == BINLOG_FORMAT_STMT)
+ if (WSREP_BINLOG_FORMAT(variables.binlog_format) == BINLOG_FORMAT_STMT)
{
if (lex->is_stmt_row_injection())
{
@@ -5144,7 +5147,7 @@ int THD::decide_logging_format(TABLE_LIST *tables)
"and binlog_filter->db_ok(db) = %d",
mysql_bin_log.is_open(),
(variables.option_bits & OPTION_BIN_LOG),
- WSREP_FORMAT(variables.binlog_format),
+ WSREP_BINLOG_FORMAT(variables.binlog_format),
binlog_filter->db_ok(db)));
#endif
diff --git a/sql/sql_class.h b/sql/sql_class.h
index f35e8914c42..d10769d10a8 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -49,6 +49,7 @@
#include "wsrep_mysqld.h"
struct wsrep_thd_shadow {
ulonglong options;
+ uint server_status;
enum wsrep_exec_mode wsrep_exec_mode;
Vio *vio;
ulong tx_isolation;
@@ -1807,7 +1808,8 @@ public:
int is_current_stmt_binlog_format_row() const {
DBUG_ASSERT(current_stmt_binlog_format == BINLOG_FORMAT_STMT ||
current_stmt_binlog_format == BINLOG_FORMAT_ROW);
- return (WSREP_FORMAT((ulong)current_stmt_binlog_format) == BINLOG_FORMAT_ROW);
+ return (WSREP_BINLOG_FORMAT((ulong)current_stmt_binlog_format) ==
+ BINLOG_FORMAT_ROW);
}
private:
@@ -2353,11 +2355,13 @@ public:
enum wsrep_conflict_state wsrep_conflict_state;
mysql_mutex_t LOCK_wsrep_thd;
mysql_cond_t COND_wsrep_thd;
- wsrep_seqno_t wsrep_trx_seqno;
+ // changed from wsrep_seqno_t to wsrep_trx_meta_t in wsrep API rev 75
+ // wsrep_seqno_t wsrep_trx_seqno;
+ wsrep_trx_meta_t wsrep_trx_meta;
uint32 wsrep_rand;
Relay_log_info* wsrep_rli;
bool wsrep_converted_lock_session;
- wsrep_trx_handle_t wsrep_trx_handle;
+ wsrep_ws_handle_t wsrep_ws_handle;
#ifdef WSREP_PROC_INFO
char wsrep_info[128]; /* string for dynamic proc info */
#endif /* WSREP_PROC_INFO */
@@ -2374,6 +2378,7 @@ public:
const char* wsrep_TOI_pre_query; /* a query to apply before
the actual TOI query */
size_t wsrep_TOI_pre_query_len;
+ bool wsrep_apply_toi; /* applier processing in TOI */
#endif /* WITH_WSREP */
/**
Internal parser state.
@@ -2819,7 +2824,7 @@ public:
tests fail and so force them to propagate the
lex->binlog_row_based_if_mixed upwards to the caller.
*/
- if ((WSREP_FORMAT(variables.binlog_format) == BINLOG_FORMAT_MIXED) &&
+ if ((WSREP_BINLOG_FORMAT(variables.binlog_format) == BINLOG_FORMAT_MIXED)&&
(in_sub_stmt == 0))
set_current_stmt_binlog_format_row();
@@ -2861,7 +2866,7 @@ public:
show_system_thread(system_thread)));
if (in_sub_stmt == 0)
{
- if (WSREP_FORMAT(variables.binlog_format) == BINLOG_FORMAT_ROW)
+ if (WSREP_BINLOG_FORMAT(variables.binlog_format) == BINLOG_FORMAT_ROW)
set_current_stmt_binlog_format_row();
else if (temporary_tables == NULL)
clear_current_stmt_binlog_format_row();
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index e8e5d79b370..d497b6d1263 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -104,13 +104,9 @@
#ifdef WITH_WSREP
#include "wsrep_mysqld.h"
-#include "rpl_rli.h"
-static void wsrep_client_rollback(THD *thd);
-
-extern Format_description_log_event *wsrep_format_desc;
-
+#include "wsrep_thd.h"
static void wsrep_mysql_parse(THD *thd, char *rawbuf, uint length,
- Parser_state *parser_state);
+ Parser_state *parser_state);
#endif /* WITH_WSREP */
/**
@defgroup Runtime_Environment Runtime Environment
@@ -445,13 +441,6 @@ bool is_log_table_write_query(enum enum_sql_command command)
return (sql_command_flags[command] & CF_WRITE_LOGS_COMMAND) != 0;
}
-#ifdef WITH_WSREP
-bool is_show_query(enum enum_sql_command command)
-{
- DBUG_ASSERT(command >= 0 && command <= SQLCOM_END);
- return (sql_command_flags[command] & CF_STATUS_COMMAND) != 0;
-}
-#endif
void execute_init_command(THD *thd, LEX_STRING *init_command,
mysql_rwlock_t *var_lock)
{
@@ -637,7 +626,7 @@ void do_handle_bootstrap(THD *thd)
close_connection(thd, ER_OUT_OF_RESOURCES, 1);
#else
close_connection(thd, ER_OUT_OF_RESOURCES);
-#endif
+#endif /* WITH_WSREP */
#endif
thd->fatal_error();
goto end;
@@ -717,13 +706,13 @@ bool do_command(THD *thd)
{
mysql_mutex_lock(&thd->LOCK_wsrep_thd);
thd->wsrep_query_state= QUERY_IDLE;
- if (thd->wsrep_conflict_state==MUST_ABORT)
+ if (thd->wsrep_conflict_state==MUST_ABORT)
{
wsrep_client_rollback(thd);
}
mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
}
-#endif
+#endif /* WITH_WSREP */
/*
indicator of uninitialized lex => normal flow of errors handling
(see my_message_sql)
@@ -776,12 +765,12 @@ bool do_command(THD *thd)
if (thd->wsrep_conflict_state == ABORTING)
{
while (thd->wsrep_conflict_state == ABORTING) {
- mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
- my_sleep(1000);
- mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+ my_sleep(1000);
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
}
thd->store_globals();
- }
+ }
else if (thd->wsrep_conflict_state == ABORTED)
{
thd->store_globals();
@@ -795,7 +784,7 @@ bool do_command(THD *thd)
(!WSREP(thd) && (packet_length= my_net_read(net)) == packet_error))
#else
if ((packet_length= my_net_read(net)) == packet_error)
-#endif
+#endif /* WITH_WSREP */
{
DBUG_PRINT("info",("Got error %d reading command from socket %s",
net->error,
@@ -805,12 +794,12 @@ bool do_command(THD *thd)
mysql_mutex_lock(&thd->LOCK_wsrep_thd);
if (thd->wsrep_conflict_state == MUST_ABORT)
{
- DBUG_PRINT("wsrep",("aborted for wsrep rollback: %lu", thd->real_id));
- wsrep_client_rollback(thd);
+ DBUG_PRINT("wsrep",("aborted for wsrep rollback: %lu", thd->real_id));
+ wsrep_client_rollback(thd);
}
mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
}
-#endif
+#endif /* WITH_WSREP */
/* Check if we can continue without closing the connection */
@@ -858,11 +847,11 @@ bool do_command(THD *thd)
#ifdef WITH_WSREP
if (WSREP(thd)) {
- /*
+ /*
* bail out if DB snapshot has not been installed. We however,
* allow queries "SET" and "SHOW", they are trapped later in execute_command
*/
- if (thd->variables.wsrep_on && !thd->wsrep_applier && !wsrep_ready &&
+ if (thd->variables.wsrep_on && !thd->wsrep_applier && !wsrep_ready &&
command != COM_QUERY &&
command != COM_PING &&
command != COM_QUIT &&
@@ -875,14 +864,14 @@ bool do_command(THD *thd)
command != COM_TIME &&
command != COM_END
) {
- my_error(ER_UNKNOWN_COM_ERROR, MYF(0),
+ my_error(ER_UNKNOWN_COM_ERROR, MYF(0),
"WSREP has not yet prepared node for application use");
thd->protocol->end_statement();
return_value= FALSE;
goto out;
}
}
-#endif
+#endif /* WITH_WSREP */
/* Restore read timeout value */
my_net_set_read_timeout(net, thd->variables.net_read_timeout);
@@ -892,17 +881,17 @@ bool do_command(THD *thd)
if (WSREP(thd)) {
while (thd->wsrep_conflict_state== RETRY_AUTOCOMMIT)
{
- CHARSET_INFO *current_charset = thd->variables.character_set_client;
- if (!is_supported_parser_charset(current_charset))
- {
- /* Do not use non-supported parser character sets */
- WSREP_WARN("Current client character set is non-supported parser character set: %s", current_charset->csname);
- thd->variables.character_set_client = &my_charset_latin1;
- WSREP_WARN("For retry temporally setting character set to : %s", my_charset_latin1.csname);
- }
- return_value= dispatch_command(command, thd, thd->wsrep_retry_query,
- thd->wsrep_retry_query_len);
- thd->variables.character_set_client = current_charset;
+ CHARSET_INFO *current_charset = thd->variables.character_set_client;
+ if (!is_supported_parser_charset(current_charset))
+ {
+ /* Do not use non-supported parser character sets */
+ WSREP_WARN("Current client character set is non-supported parser character set: %s", current_charset->csname);
+ thd->variables.character_set_client = &my_charset_latin1;
+ WSREP_WARN("For retry temporally setting character set to : %s", my_charset_latin1.csname);
+ }
+ return_value= dispatch_command(command, thd, thd->wsrep_retry_query,
+ thd->wsrep_retry_query_len);
+ thd->variables.character_set_client = current_charset;
}
}
if (thd->wsrep_retry_query && thd->wsrep_conflict_state != REPLAYING)
@@ -912,7 +901,7 @@ bool do_command(THD *thd)
thd->wsrep_retry_query_len = 0;
thd->wsrep_retry_command = COM_CONNECT;
}
-#endif
+#endif /* WITH_WSREP */
out:
DBUG_RETURN(return_value);
}
@@ -1059,7 +1048,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
{
wsrep_client_rollback(thd);
}
- if (thd->wsrep_conflict_state== ABORTED)
+ if (thd->wsrep_conflict_state== ABORTED)
{
my_error(ER_LOCK_DEADLOCK, MYF(0), "wsrep aborted transaction");
WSREP_DEBUG("Deadlock error for: %s", thd->query());
@@ -1264,7 +1253,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
wsrep_mysql_parse(thd, thd->query(), thd->query_length(), &parser_state);
#else
mysql_parse(thd, thd->query(), thd->query_length(), &parser_state);
-#endif
+#endif /* WITH_WSREP */
while (!thd->killed && (parser_state.m_lip.found_semicolon != NULL) &&
! thd->is_error())
@@ -1325,14 +1314,14 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
thd->set_time(); /* Reset the query start time. */
#else
thd->set_time(); /* Reset the query start time. */
-#endif
+#endif /* WITH_WSREP */
parser_state.reset(beginning_of_next_stmt, length);
/* TODO: set thd->lex->sql_command to SQLCOM_END here */
#ifdef WITH_WSREP
wsrep_mysql_parse(thd, beginning_of_next_stmt, length, &parser_state);
#else
mysql_parse(thd, beginning_of_next_stmt, length, &parser_state);
-#endif
+#endif /* WITH_WSREP */
}
DBUG_PRINT("info",("query ready"));
@@ -1666,14 +1655,17 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
/* wsrep BF abort in query exec phase */
mysql_mutex_lock(&thd->LOCK_wsrep_thd);
if ((thd->wsrep_conflict_state != REPLAYING) &&
- (thd->wsrep_conflict_state != RETRY_AUTOCOMMIT))
- {
+ (thd->wsrep_conflict_state != RETRY_AUTOCOMMIT)) {
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+
thd->update_server_status();
thd->protocol->end_statement();
query_cache_end_of_result(thd);
+ }
+ else
+ {
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
}
- mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
-
} else { /* if (WSREP(thd))... */
#endif /* WITH_WSREP */
DBUG_ASSERT(thd->derived_tables == NULL &&
@@ -2146,6 +2138,13 @@ err:
return TRUE;
}
+#ifdef WITH_WSREP
+static bool wsrep_is_show_query(enum enum_sql_command command)
+{
+ DBUG_ASSERT(command >= 0 && command <= SQLCOM_END);
+ return (sql_command_flags[command] & CF_STATUS_COMMAND) != 0;
+}
+#endif /* WITH_WSREP */
/**
Execute command saved in thd and lex->sql_command.
@@ -2381,7 +2380,7 @@ mysql_execute_command(THD *thd)
}
}
}
- if (lex->sql_command== SQLCOM_UNLOCK_TABLES &&
+ if (lex->sql_command== SQLCOM_UNLOCK_TABLES &&
thd->wsrep_converted_lock_session)
{
thd->wsrep_converted_lock_session= false;
@@ -2389,13 +2388,13 @@ mysql_execute_command(THD *thd)
lex->tx_release= TVL_NO;
}
- /*
+ /*
* bail out if DB snapshot has not been installed. We however,
* allow SET and SHOW queries
*/
if (thd->variables.wsrep_on && !thd->wsrep_applier && !wsrep_ready &&
lex->sql_command != SQLCOM_SET_OPTION &&
- !is_show_query(lex->sql_command))
+ !wsrep_is_show_query(lex->sql_command))
{
#if DIRTY_HACK
/* Dirty hack for lp:1002714 - trying to recognize mysqldump connection
@@ -2410,7 +2409,7 @@ mysql_execute_command(THD *thd)
strncmp(thd->query(), mysqldump_magic_str, mysqldump_magic_str_len))
{
#endif /* DIRTY_HACK */
- my_error(ER_UNKNOWN_COM_ERROR, MYF(0),
+ my_error(ER_UNKNOWN_COM_ERROR, MYF(0),
"WSREP has not yet prepared node for application use");
goto error;
#if DIRTY_HACK
@@ -2449,7 +2448,9 @@ mysql_execute_command(THD *thd)
if (trans_commit_implicit(thd))
{
thd->mdl_context.release_transactional_locks();
+#ifdef WITH_WSREP
WSREP_DEBUG("implicit commit failed, MDL released: %lu", thd->thread_id);
+#endif /* WITH_WSREP */
goto error;
}
/* Release metadata locks acquired in this transaction. */
@@ -2480,7 +2481,13 @@ mysql_execute_command(THD *thd)
break;
case SQLCOM_SHOW_STATUS:
{
+#ifdef WITH_WSREP
+ if (WSREP_CLIENT(thd) && wsrep_causal_wait(thd)) goto error;
+#endif /* WITH_WSREP */
execute_show_status(thd, all_tables);
+#ifdef WITH_WSREP
+ if (lex->sql_command == SQLCOM_SHOW_STATUS) wsrep_free_status(thd);
+#endif /* WITH_WSREP */
break;
}
case SQLCOM_SHOW_DATABASES:
@@ -2511,7 +2518,7 @@ mysql_execute_command(THD *thd)
case SQLCOM_SHOW_STORAGE_ENGINES:
case SQLCOM_SHOW_PROFILE:
#endif /* WITH_WSREP */
- {
+ {
thd->status_var.last_query_cost= 0.0;
/*
@@ -2821,7 +2828,7 @@ case SQLCOM_PREPARE:
*/
if (thd->query_name_consts &&
mysql_bin_log.is_open() &&
- WSREP_FORMAT(thd->variables.binlog_format) == BINLOG_FORMAT_STMT &&
+ WSREP_BINLOG_FORMAT(thd->variables.binlog_format) == BINLOG_FORMAT_STMT &&
!mysql_bin_log.is_query_in_union(thd, thd->query_id))
{
List_iterator_fast<Item> it(select_lex->item_list);
@@ -3503,7 +3510,7 @@ end_with_restore_list:
#ifdef WITH_WSREP
for (TABLE_LIST *table= all_tables; table; table= table->next_global)
{
- if (!lex->drop_temporary &&
+ if (!lex->drop_temporary &&
(!thd->is_current_stmt_binlog_format_row() ||
!find_temporary_table(thd, table)))
{
@@ -4169,7 +4176,7 @@ end_with_restore_list:
lex->insert_list, lex->ha_rkey_mode, select_lex->where,
unit->select_limit_cnt, unit->offset_limit_cnt);
#ifdef WITH_WSREP
- if (WSREP(thd)) thd_proc_info(thd, tmp_info);
+ if (WSREP(thd)) thd_proc_info(thd, tmp_info);
}
#endif /* WITH_WSREP */
break;
@@ -4178,7 +4185,9 @@ end_with_restore_list:
if (trans_begin(thd, lex->start_transaction_opt))
{
thd->mdl_context.release_transactional_locks();
+#ifdef WITH_WSREP
WSREP_DEBUG("BEGIN failed, MDL released: %lu", thd->thread_id);
+#endif /* WITH_WSREP */
goto error;
}
my_ok(thd);
@@ -4196,7 +4205,9 @@ end_with_restore_list:
if (trans_commit(thd))
{
thd->mdl_context.release_transactional_locks();
+#ifdef WITH_WSREP
WSREP_DEBUG("COMMIT failed, MDL released: %lu", thd->thread_id);
+#endif /* WITH_WSREP */
goto error;
}
thd->mdl_context.release_transactional_locks();
@@ -4217,7 +4228,20 @@ end_with_restore_list:
thd->killed= KILL_CONNECTION;
thd->print_aborted_warning(3, "RELEASE");
}
+#ifdef WITH_WSREP
+ if (WSREP(thd)) {
+
+ if (thd->wsrep_conflict_state == NO_CONFLICT ||
+ thd->wsrep_conflict_state == REPLAYING)
+ {
+ my_ok(thd);
+ }
+ } else {
+#endif /* WITH_WSREP */
my_ok(thd);
+#ifdef WITH_WSREP
+ }
+#endif /* WITH_WSREP */
break;
}
case SQLCOM_ROLLBACK:
@@ -4233,7 +4257,9 @@ end_with_restore_list:
if (trans_rollback(thd))
{
thd->mdl_context.release_transactional_locks();
+#ifdef WITH_WSREP
WSREP_DEBUG("rollback failed, MDL released: %lu", thd->thread_id);
+#endif /* WITH_WSREP */
goto error;
}
thd->mdl_context.release_transactional_locks();
@@ -4251,20 +4277,18 @@ end_with_restore_list:
/* Disconnect the current client connection. */
if (tx_release)
thd->killed= KILL_CONNECTION;
- #ifdef WITH_WSREP
+#ifdef WITH_WSREP
if (WSREP(thd)) {
- if (thd->wsrep_conflict_state == NO_CONFLICT ||
- thd->wsrep_conflict_state == REPLAYING)
- {
- my_ok(thd);
+ if (thd->wsrep_conflict_state == NO_CONFLICT) {
+ my_ok(thd);
}
} else {
#endif /* WITH_WSREP */
- my_ok(thd);
- #ifdef WITH_WSREP
+ my_ok(thd);
+#ifdef WITH_WSREP
}
#endif /* WITH_WSREP */
- break;
+ break;
}
case SQLCOM_RELEASE_SAVEPOINT:
if (trans_release_savepoint(thd, lex->ident))
@@ -4781,7 +4805,9 @@ create_sp_error:
if (trans_xa_commit(thd))
{
thd->mdl_context.release_transactional_locks();
+#ifdef WITH_WSREP
WSREP_DEBUG("XA commit failed, MDL released: %lu", thd->thread_id);
+#endif /* WITH_WSREP */
goto error;
}
thd->mdl_context.release_transactional_locks();
@@ -4796,7 +4822,9 @@ create_sp_error:
if (trans_xa_rollback(thd))
{
thd->mdl_context.release_transactional_locks();
+#ifdef WITH_WSREP
WSREP_DEBUG("XA rollback failed, MDL released: %lu", thd->thread_id);
+#endif /* WITH_WSREP */
goto error;
}
thd->mdl_context.release_transactional_locks();
@@ -5931,7 +5959,12 @@ void THD::reset_for_next_command()
thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt= 0;
#ifdef WITH_WSREP
- if (WSREP(thd)) {
+ /*
+ Autoinc variables should be adjusted only for locally executed
+ transactions. Appliers and replayers are either processing ROW
+ events or get autoinc variable values from Query_log_event.
+ */
+ if (WSREP(thd) && thd->wsrep_exec_mode == LOCAL_STATE) {
if (wsrep_auto_increment_control)
{
if (thd->variables.auto_increment_offset !=
@@ -6147,96 +6180,10 @@ void mysql_init_multi_delete(LEX *lex)
}
#ifdef WITH_WSREP
-void wsrep_replay_transaction(THD *thd)
-{
- /* checking if BF trx must be replayed */
- if (thd->wsrep_conflict_state== MUST_REPLAY)
- {
- if (thd->wsrep_exec_mode!= REPL_RECV)
- {
- if (thd->stmt_da->is_sent)
- {
- WSREP_ERROR("replay issue, thd has reported status already");
- }
- thd->stmt_da->reset_diagnostics_area();
-
- thd->wsrep_conflict_state= REPLAYING;
- mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
-
- mysql_reset_thd_for_next_command(thd);
- thd->killed= NOT_KILLED;
- close_thread_tables(thd);
- if (thd->locked_tables_mode && thd->lock)
- {
- WSREP_DEBUG("releasing table lock for replaying (%ld)",
- thd->thread_id);
- thd->locked_tables_list.unlock_locked_tables(thd);
- thd->variables.option_bits&= ~(OPTION_TABLE_LOCK);
- }
- thd->mdl_context.release_transactional_locks();
-
- thd_proc_info(thd, "wsrep replaying trx");
- WSREP_DEBUG("replay trx: %s %lld",
- thd->query() ? thd->query() : "void",
- (long long)thd->wsrep_trx_seqno);
- struct wsrep_thd_shadow shadow;
- wsrep_prepare_bf_thd(thd, &shadow);
- int rcode = wsrep->replay_trx(wsrep,
- &thd->wsrep_trx_handle,
- (void *)thd);
-
- wsrep_return_from_bf_mode(thd, &shadow);
- if (thd->wsrep_conflict_state!= REPLAYING)
- WSREP_WARN("lost replaying mode: %d", thd->wsrep_conflict_state );
-
- mysql_mutex_lock(&thd->LOCK_wsrep_thd);
-
- switch (rcode)
- {
- case WSREP_OK:
- thd->wsrep_conflict_state= NO_CONFLICT;
- wsrep->post_commit(wsrep, &thd->wsrep_trx_handle);
- WSREP_DEBUG("trx_replay successful for: %ld %llu",
- thd->thread_id, (long long)thd->real_id);
- break;
- case WSREP_TRX_FAIL:
- if (thd->stmt_da->is_sent)
- {
- WSREP_ERROR("replay failed, thd has reported status");
- }
- else
- {
- WSREP_DEBUG("replay failed, rolling back");
- my_error(ER_LOCK_DEADLOCK, MYF(0), "wsrep aborted transaction");
- }
- thd->wsrep_conflict_state= ABORTED;
- thd->wsrep_bf_thd = NULL;
- wsrep->post_rollback(wsrep, &thd->wsrep_trx_handle);
- break;
- default:
- WSREP_ERROR("trx_replay failed for: %d, query: %s",
- rcode, thd->query() ? thd->query() : "void");
- /* we're now in inconsistent state, must abort */
- unireg_abort(1);
- break;
- }
-
- wsrep_cleanup_transaction(thd);
-
- mysql_mutex_lock(&LOCK_wsrep_replaying);
- wsrep_replaying--;
- WSREP_DEBUG("replaying decreased: %d, thd: %lu",
- wsrep_replaying, thd->thread_id);
- mysql_cond_broadcast(&COND_wsrep_replaying);
- mysql_mutex_unlock(&LOCK_wsrep_replaying);
- }
- }
-}
-
static void wsrep_mysql_parse(THD *thd, char *rawbuf, uint length,
- Parser_state *parser_state)
+ Parser_state *parser_state)
{
- bool is_autocommit=
+ bool is_autocommit=
!thd->in_multi_stmt_transaction_mode() &&
thd->wsrep_conflict_state == NO_CONFLICT &&
!thd->wsrep_applier &&
@@ -6259,9 +6206,10 @@ static void wsrep_mysql_parse(THD *thd, char *rawbuf, uint length,
WSREP_DEBUG("abort in exec query state, avoiding autocommit");
}
- if (thd->wsrep_conflict_state== MUST_REPLAY)
+ /* checking if BF trx must be replayed */
+ if (thd->wsrep_conflict_state== MUST_REPLAY)
{
- wsrep_replay_transaction(thd);
+ wsrep_replay_transaction(thd);
}
/* setting error code for BF aborted trxs */
@@ -6274,7 +6222,7 @@ static void wsrep_mysql_parse(THD *thd, char *rawbuf, uint length,
thd->lex->sql_command != SQLCOM_SELECT &&
(thd->wsrep_retry_counter < thd->variables.wsrep_retry_autocommit))
{
- WSREP_DEBUG("wsrep retrying AC query: %s",
+ WSREP_DEBUG("wsrep retrying AC query: %s",
(thd->query()) ? thd->query() : "void");
close_thread_tables(thd);
@@ -6287,10 +6235,10 @@ static void wsrep_mysql_parse(THD *thd, char *rawbuf, uint length,
}
else
{
- WSREP_DEBUG("%s, thd: %lu is_AC: %d, retry: %lu - %lu SQL: %s",
- (thd->wsrep_conflict_state == ABORTED) ?
+ WSREP_DEBUG("%s, thd: %lu is_AC: %d, retry: %lu - %lu SQL: %s",
+ (thd->wsrep_conflict_state == ABORTED) ?
"BF Aborted" : "cert failure",
- thd->thread_id, is_autocommit, thd->wsrep_retry_counter,
+ thd->thread_id, is_autocommit, thd->wsrep_retry_counter,
thd->variables.wsrep_retry_autocommit, thd->query());
my_error(ER_LOCK_DEADLOCK, MYF(0), "wsrep aborted transaction");
thd->killed= NOT_KILLED;
@@ -6309,7 +6257,8 @@ static void wsrep_mysql_parse(THD *thd, char *rawbuf, uint length,
if (thd->wsrep_retry_query)
{
- WSREP_DEBUG("releasing retry_query: conf %d sent %d kill %d errno %d SQL %s",
+ WSREP_DEBUG("releasing retry_query: "
+ "conf %d sent %d kill %d errno %d SQL %s",
thd->wsrep_conflict_state,
thd->stmt_da->is_sent,
thd->killed,
@@ -7304,7 +7253,7 @@ uint kill_one_thread(THD *thd, ulong id, killed_state kill_signal)
#ifdef WITH_WSREP
if (((thd->security_ctx->master_access & SUPER_ACL) ||
thd->security_ctx->user_matches(tmp->security_ctx)) &&
- !wsrep_thd_is_brute_force((void *)tmp))
+ !wsrep_thd_is_brute_force((void *)tmp))
#else
if ((thd->security_ctx->master_access & SUPER_ACL) ||
thd->security_ctx->user_matches(tmp->security_ctx))
@@ -8083,621 +8032,6 @@ LEX_USER *create_definer(THD *thd, LEX_STRING *user_name, LEX_STRING *host_name)
return definer;
}
-#ifdef WITH_WSREP
-/* must have (&thd->LOCK_wsrep_thd) */
-static void wsrep_client_rollback(THD *thd)
-{
- WSREP_DEBUG("client rollback due to BF abort for (%ld), query: %s",
- thd->thread_id, thd->query());
-
- thd->wsrep_conflict_state= ABORTING;
- mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
- trans_rollback(thd);
-
- if (thd->locked_tables_mode && thd->lock)
- {
- WSREP_DEBUG("unlocking tables for BF abort (%ld)", thd->thread_id);
- thd->locked_tables_list.unlock_locked_tables(thd);
- thd->variables.option_bits&= ~(OPTION_TABLE_LOCK);
- }
-
- if (thd->global_read_lock.is_acquired())
- {
- WSREP_DEBUG("unlocking GRL for BF abort (%ld)", thd->thread_id);
- thd->global_read_lock.unlock_global_read_lock(thd);
- }
-
- /* Release transactional metadata locks. */
- thd->mdl_context.release_transactional_locks();
-
- /* release explicit MDL locks */
- thd->mdl_context.release_explicit_locks();
-
- if (thd->get_binlog_table_maps())
- {
- WSREP_DEBUG("clearing binlog table map for BF abort (%ld)", thd->thread_id);
- thd->clear_binlog_table_maps();
- }
- mysql_mutex_lock(&thd->LOCK_wsrep_thd);
- thd->wsrep_conflict_state= ABORTED;
- thd->wsrep_bf_thd = NULL;
-}
-
-static enum wsrep_status wsrep_apply_sql(
- THD *thd, const char *sql, size_t sql_len, time_t timeval, uint32 randseed)
-{
- int error;
- enum wsrep_status ret_code= WSREP_OK;
- CHARSET_INFO *current_charset = thd->variables.character_set_client;
-
- DBUG_ENTER("wsrep_bf_execute_cb");
- thd->wsrep_exec_mode= REPL_RECV;
- thd->net.vio= 0;
- thd->start_time= timeval;
- thd->wsrep_rand= randseed;
-
- thd->variables.option_bits |= OPTION_NOT_AUTOCOMMIT;
-
- DBUG_PRINT("wsrep", ("SQL: %s", sql));
-
- mysql_mutex_lock(&thd->LOCK_wsrep_thd);
- thd->wsrep_query_state= QUERY_EXEC;
- /* preserve replaying mode */
- if (thd->wsrep_conflict_state!= REPLAYING)
- thd->wsrep_conflict_state= NO_CONFLICT;
- mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
-
- if (!is_supported_parser_charset(current_charset))
- {
- /* Do not use non-supported parser character sets */
- WSREP_WARN("Current client character set is non-supported parser character set: %s", current_charset->csname);
- thd->variables.character_set_client = &my_charset_latin1;
- WSREP_WARN("For BF SQL apply temporally setting character set to : %s",
- my_charset_latin1.csname);
- }
-
- if ((error= dispatch_command(COM_QUERY, thd, (char*)sql, sql_len))) {
- WSREP_WARN("BF SQL apply failed: %d, %lld",
- thd->wsrep_conflict_state, (long long)thd->wsrep_trx_seqno);
- thd->variables.character_set_client = current_charset;
- DBUG_RETURN(WSREP_FATAL);
- }
- thd->variables.character_set_client = current_charset;
-
- mysql_mutex_lock(&thd->LOCK_wsrep_thd);
- if (thd->wsrep_conflict_state!= NO_CONFLICT &&
- thd->wsrep_conflict_state!= REPLAYING) {
- ret_code= WSREP_FATAL;
- WSREP_DEBUG("BF thd ending, with: %d, %lld",
- thd->wsrep_conflict_state, (long long)thd->wsrep_trx_seqno);
- }
- mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
-
- assert(thd->wsrep_exec_mode== REPL_RECV);
- DBUG_RETURN(ret_code);
-}
-
-void wsrep_write_rbr_buf(
- THD *thd, const void* rbr_buf, size_t buf_len)
-{
- char filename[PATH_MAX]= {0};
- int len= snprintf(filename, PATH_MAX, "%s/GRA_%ld_%lld.log",
- wsrep_data_home_dir, thd->thread_id,
- (long long)thd->wsrep_trx_seqno);
- if (len >= PATH_MAX)
- {
- WSREP_ERROR("RBR dump path too long: %d, skipping dump.", len);
- return;
- }
-
- FILE *of= fopen(filename, "wb");
- if (of)
- {
- (void) fwrite (rbr_buf, buf_len, 1, of);
- fclose(of);
- }
- else
- {
- WSREP_ERROR("Failed to open file '%s': %d (%s)",
- filename, errno, strerror(errno));
- }
-}
-
-static inline wsrep_status_t wsrep_apply_rbr(
- THD *thd, const uchar *rbr_buf, size_t buf_len)
-{
- char *buf= (char *)rbr_buf;
- int rcode= 0;
- int event= 1;
- Format_description_log_event *description_event = wsrep_format_desc;
- DBUG_ENTER("wsrep_apply_rbr");
-
- if (thd->killed == KILL_CONNECTION)
- {
- WSREP_INFO("applier has been aborted, skipping apply_rbr: %lld",
- (long long) thd->wsrep_trx_seqno);
- DBUG_RETURN(WSREP_FATAL);
- }
-
- mysql_mutex_lock(&thd->LOCK_wsrep_thd);
- thd->wsrep_query_state= QUERY_EXEC;
- if (thd->wsrep_conflict_state!= REPLAYING)
- thd->wsrep_conflict_state= NO_CONFLICT;
- mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
-
- if (!buf_len) WSREP_DEBUG("empty rbr buffer to apply: %lld",
- (long long) thd->wsrep_trx_seqno);
-
- if ((rcode= trans_begin(thd)))
- WSREP_WARN("begin for rbr apply failed: %lld, code: %d",
- (long long) thd->wsrep_trx_seqno, rcode);
-
- while(buf_len)
- {
- int exec_res;
- int error = 0;
- Log_event* ev= wsrep_read_log_event(&buf, &buf_len, description_event);
-
- if (!ev)
- {
- WSREP_ERROR("applier could not read binlog event, seqno: %lld, len: %ld",
- (long long)thd->wsrep_trx_seqno, (long int) buf_len);
- rcode= 1;
- goto error;
- }
- switch (ev->get_type_code()) {
- case WRITE_ROWS_EVENT:
- case UPDATE_ROWS_EVENT:
- case DELETE_ROWS_EVENT:
- DBUG_ASSERT(buf_len != 0 ||
- ((Rows_log_event *) ev)->get_flags(Rows_log_event::STMT_END_F));
- break;
- case FORMAT_DESCRIPTION_EVENT:
- description_event = (Format_description_log_event *)ev;
- break;
- default:
- break;
- }
-
- thd->server_id = ev->server_id; // use the original server id for logging
- thd->set_time(); // time the query
- wsrep_xid_init(&thd->transaction.xid_state.xid,
- wsrep_cluster_uuid(),
- thd->wsrep_trx_seqno);
- thd->lex->current_select= 0;
- if (!ev->when)
- ev->when = time(NULL);
- ev->thd = thd;
- exec_res = ev->apply_event(thd->wsrep_rli);
- DBUG_PRINT("info", ("exec_event result: %d", exec_res));
-
- if (exec_res)
- {
- WSREP_WARN("RBR event %d %s apply warning: %d, %lld",
- event, ev->get_type_str(), exec_res, (long long) thd->wsrep_trx_seqno);
- rcode= exec_res;
- /* stop processing for the first error */
- delete ev;
- goto error;
- }
- event++;
-
- if (thd->wsrep_conflict_state!= NO_CONFLICT &&
- thd->wsrep_conflict_state!= REPLAYING)
- WSREP_WARN("conflict state after RBR event applying: %d, %lld",
- thd->wsrep_query_state, (long long)thd->wsrep_trx_seqno);
-
- if (thd->wsrep_conflict_state == MUST_ABORT) {
- WSREP_WARN("RBR event apply failed, rolling back: %lld",
- (long long) thd->wsrep_trx_seqno);
- trans_rollback(thd);
- thd->locked_tables_list.unlock_locked_tables(thd);
- /* Release transactional metadata locks. */
- thd->mdl_context.release_transactional_locks();
- thd->wsrep_conflict_state= NO_CONFLICT;
- DBUG_RETURN(WSREP_FATAL);
- }
-
- if ((ev->get_type_code() == WRITE_ROWS_EVENT ||
- ev->get_type_code() == UPDATE_ROWS_EVENT ||
- ev->get_type_code() == DELETE_ROWS_EVENT) &&
- ((Rows_log_event *) ev)->get_flags(Rows_log_event::STMT_END_F))
- {
- thd->wsrep_rli->cleanup_context(thd, 0);
-
- if (error == 0)
- {
- thd->clear_error();
- }
- else
- WSREP_ERROR("Error in %s event: commit of row events failed: %lld",
- ev->get_type_str(), (long long)thd->wsrep_trx_seqno);
- }
-
- if (description_event != ev)
- delete ev;
- }
-
- error:
- mysql_mutex_lock(&thd->LOCK_wsrep_thd);
- thd->wsrep_query_state= QUERY_IDLE;
- mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
-
- assert(thd->wsrep_exec_mode== REPL_RECV);
-
- if (thd->killed == KILL_CONNECTION)
- WSREP_INFO("applier aborted: %lld", (long long)thd->wsrep_trx_seqno);
-
- if (rcode) DBUG_RETURN(WSREP_FATAL);
- DBUG_RETURN(WSREP_OK);
-}
-
-wsrep_status_t wsrep_apply_cb(void* const ctx,
- const void* const buf, size_t const buf_len,
- wsrep_seqno_t const global_seqno)
-{
- THD* const thd((THD*)ctx);
-
- thd->wsrep_trx_seqno= global_seqno;
-
-#ifdef WSREP_PROC_INFO
- snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
- "applying write set %lld: %p, %zu",
- (long long)thd->wsrep_trx_seqno, buf, buf_len);
- thd_proc_info(thd, thd->wsrep_info);
-#else
- thd_proc_info(thd, "applying write set");
-#endif /* WSREP_PROC_INFO */
-
- wsrep_status_t const rcode(wsrep_apply_rbr(thd, (const uchar*)buf, buf_len));
-
-#ifdef WSREP_PROC_INFO
- snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
- "applied write set %lld", (long long)thd->wsrep_trx_seqno);
- thd_proc_info(thd, thd->wsrep_info);
-#else
- thd_proc_info(thd, "applied write set");
-#endif /* WSREP_PROC_INFO */
-
- if (WSREP_OK != rcode) wsrep_write_rbr_buf(thd, buf, buf_len);
- TABLE *tmp;
- while ((tmp = thd->temporary_tables))
- {
- WSREP_DEBUG("Applier %lu, has temporary tables: %s.%s",
- thd->thread_id,
- (tmp->s) ? tmp->s->db.str : "void",
- (tmp->s) ? tmp->s->table_name.str : "void");
- close_temporary_table(thd, tmp, 1, 1);
- }
-
- return rcode;
-}
-
-#if DELETE // this does not work in 5.5
-/* a common wrapper for end_trans() function - to put all necessary stuff */
-static inline wsrep_status_t
-wsrep_end_trans (THD* const thd, enum enum_mysql_completiontype const end)
-{
- if (0 == end_trans(thd, end))
- {
- return WSREP_OK;
- }
- else
- {
- return WSREP_FATAL;
- }
-}
-#endif
-
-wsrep_status_t wsrep_commit(THD* const thd, wsrep_seqno_t const global_seqno)
-{
-#ifdef WSREP_PROC_INFO
- snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
- "committing %lld", (long long)thd->wsrep_trx_seqno);
- thd_proc_info(thd, thd->wsrep_info);
-#else
- thd_proc_info(thd, "committing");
-#endif /* WSREP_PROC_INFO */
-
- wsrep_status_t const rcode(wsrep_apply_sql(thd, "COMMIT", 6, 0, 0));
-// wsrep_status_t const rcode(wsrep_end_trans (thd, COMMIT));
-
-#ifdef WSREP_PROC_INFO
- snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
- "committed %lld", (long long)thd->wsrep_trx_seqno);
- thd_proc_info(thd, thd->wsrep_info);
-#else
- thd_proc_info(thd, "committed");
-#endif /* WSREP_PROC_INFO */
-
- if (WSREP_OK == rcode)
- {
- // TODO: mark snapshot with global_seqno.
- }
-
- return rcode;
-}
-
-wsrep_status_t wsrep_rollback(THD* const thd, wsrep_seqno_t const global_seqno)
-{
-#ifdef WSREP_PROC_INFO
- snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
- "rolling back %lld", (long long)thd->wsrep_trx_seqno);
- thd_proc_info(thd, thd->wsrep_info);
-#else
- thd_proc_info(thd, "rolling back");
-#endif /* WSREP_PROC_INFO */
-
- wsrep_status_t const rcode(wsrep_apply_sql(thd, "ROLLBACK", 8, 0, 0));
-// wsrep_status_t const rcode(wsrep_end_trans (thd, ROLLBACK));
-
-#ifdef WSREP_PROC_INFO
- snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
- "rolled back %lld", (long long)thd->wsrep_trx_seqno);
- thd_proc_info(thd, thd->wsrep_info);
-#else
- thd_proc_info(thd, "rolled back");
-#endif /* WSREP_PROC_INFO */
-
- return rcode;
-}
-
-wsrep_status_t wsrep_commit_cb(void* const ctx,
- wsrep_seqno_t const global_seqno,
- bool const commit)
-{
- THD* const thd((THD*)ctx);
-
- assert(global_seqno == thd->wsrep_trx_seqno);
-
- if (commit)
- return wsrep_commit(thd, global_seqno);
- else
- return wsrep_rollback(thd, global_seqno);
-}
-
-Relay_log_info* wsrep_relay_log_init(const char* log_fname)
-{
- Relay_log_info* rli= new Relay_log_info(false);
-
- rli->no_storage= true;
- if (!rli->relay_log.description_event_for_exec)
- {
- rli->relay_log.description_event_for_exec=
- new Format_description_log_event(4);
- }
-
- rli->sql_thd= current_thd;
- return rli;
-}
-
-void wsrep_prepare_bf_thd(THD *thd, struct wsrep_thd_shadow* shadow)
-{
- shadow->options = thd->variables.option_bits;
- shadow->wsrep_exec_mode = thd->wsrep_exec_mode;
- shadow->vio = thd->net.vio;
-
- if (opt_log_slave_updates)
- thd->variables.option_bits|= OPTION_BIN_LOG;
- else
- thd->variables.option_bits&= ~(OPTION_BIN_LOG);
-
- if (!thd->wsrep_rli) thd->wsrep_rli= wsrep_relay_log_init("wsrep_relay");
-
- thd->wsrep_exec_mode= REPL_RECV;
- thd->net.vio= 0;
- thd->clear_error();
-
- thd->variables.option_bits|= OPTION_NOT_AUTOCOMMIT;
-
- shadow->tx_isolation = thd->variables.tx_isolation;
- thd->variables.tx_isolation = ISO_READ_COMMITTED;
- thd->tx_isolation = ISO_READ_COMMITTED;
-
- shadow->db = thd->db;
- shadow->db_length = thd->db_length;
- thd->reset_db(NULL, 0);
-}
-
-void wsrep_return_from_bf_mode(THD *thd, struct wsrep_thd_shadow* shadow)
-{
- thd->variables.option_bits = shadow->options;
- thd->wsrep_exec_mode = shadow->wsrep_exec_mode;
- thd->net.vio = shadow->vio;
- thd->variables.tx_isolation = shadow->tx_isolation;
-
- thd->reset_db(shadow->db, shadow->db_length);
-}
-
-void wsrep_replication_process(THD *thd)
-{
- int rcode;
- DBUG_ENTER("wsrep_replication_process");
-
- struct wsrep_thd_shadow shadow;
- wsrep_prepare_bf_thd(thd, &shadow);
-
- rcode = wsrep->recv(wsrep, (void *)thd);
- DBUG_PRINT("wsrep",("wsrep_repl returned: %d", rcode));
-
- WSREP_INFO("applier thread exiting (code:%d)", rcode);
-
- switch (rcode) {
- case WSREP_OK:
- case WSREP_NOT_IMPLEMENTED:
- case WSREP_CONN_FAIL:
- /* provider does not support slave operations / disconnected from group,
- * just close applier thread */
- break;
- case WSREP_NODE_FAIL:
- /* data inconsistency => SST is needed */
- /* Note: we cannot just blindly restart replication here,
- * SST might require server restart if storage engines must be
- * initialized after SST */
- WSREP_ERROR("node consistency compromised, aborting");
- wsrep_kill_mysql(thd);
- break;
- case WSREP_WARNING:
- case WSREP_TRX_FAIL:
- case WSREP_TRX_MISSING:
- /* these suggests a bug in provider code */
- WSREP_WARN("bad return from recv() call: %d", rcode);
- /* fall through to node shutdown */
- case WSREP_FATAL:
- /* Cluster connectivity is lost.
- *
- * If applier was killed on purpose (KILL_CONNECTION), we
- * avoid mysql shutdown. This is because the killer will then handle
- * shutdown processing (or replication restarting)
- */
- if (thd->killed != KILL_CONNECTION)
- {
- wsrep_kill_mysql(thd);
- }
- break;
- }
-
- mysql_mutex_lock(&LOCK_thread_count);
- wsrep_close_applier(thd);
- mysql_cond_broadcast(&COND_thread_count);
- mysql_mutex_unlock(&LOCK_thread_count);
-
- if (thd->temporary_tables)
- {
- WSREP_DEBUG("Applier %lu, has temporary tables at exit", thd->thread_id);
- }
- wsrep_return_from_bf_mode(thd, &shadow);
- DBUG_VOID_RETURN;
-}
-
-void wsrep_rollback_process(THD *thd)
-{
- DBUG_ENTER("wsrep_rollback_process");
-
- mysql_mutex_lock(&LOCK_wsrep_rollback);
- wsrep_aborting_thd= NULL;
-
- while (thd->killed == NOT_KILLED) {
- thd_proc_info(thd, "wsrep aborter idle");
- thd->mysys_var->current_mutex= &LOCK_wsrep_rollback;
- thd->mysys_var->current_cond= &COND_wsrep_rollback;
-
- mysql_cond_wait(&COND_wsrep_rollback,&LOCK_wsrep_rollback);
-
- WSREP_DEBUG("WSREP rollback thread wakes for signal");
-
- mysql_mutex_lock(&thd->mysys_var->mutex);
- thd_proc_info(thd, "wsrep aborter active");
- thd->mysys_var->current_mutex= 0;
- thd->mysys_var->current_cond= 0;
- mysql_mutex_unlock(&thd->mysys_var->mutex);
-
- /* check for false alarms */
- if (!wsrep_aborting_thd)
- {
- WSREP_DEBUG("WSREP rollback thread has empty abort queue");
- }
- /* process all entries in the queue */
- while (wsrep_aborting_thd) {
- THD *aborting;
- wsrep_aborting_thd_t next = wsrep_aborting_thd->next;
- aborting = wsrep_aborting_thd->aborting_thd;
- my_free(wsrep_aborting_thd);
- wsrep_aborting_thd= next;
- /*
- * must release mutex, appliers my want to add more
- * aborting thds in our work queue, while we rollback
- */
- mysql_mutex_unlock(&LOCK_wsrep_rollback);
-
- mysql_mutex_lock(&aborting->LOCK_wsrep_thd);
- if (aborting->wsrep_conflict_state== ABORTED)
- {
- WSREP_DEBUG("WSREP, thd already aborted: %llu state: %d",
- (long long)aborting->real_id,
- aborting->wsrep_conflict_state);
-
- mysql_mutex_unlock(&aborting->LOCK_wsrep_thd);
- mysql_mutex_lock(&LOCK_wsrep_rollback);
- continue;
- }
- aborting->wsrep_conflict_state= ABORTING;
-
- mysql_mutex_unlock(&aborting->LOCK_wsrep_thd);
-
- aborting->store_globals();
-
- mysql_mutex_lock(&aborting->LOCK_wsrep_thd);
- wsrep_client_rollback(aborting);
- WSREP_DEBUG("WSREP rollbacker aborted thd: (%lu %llu)",
- aborting->thread_id, (long long)aborting->real_id);
- mysql_mutex_unlock(&aborting->LOCK_wsrep_thd);
-
- mysql_mutex_lock(&LOCK_wsrep_rollback);
- }
- }
-
- mysql_mutex_unlock(&LOCK_wsrep_rollback);
- sql_print_information("WSREP: rollbacker thread exiting");
-
- DBUG_PRINT("wsrep",("wsrep rollbacker thread exiting"));
- DBUG_VOID_RETURN;
-}
-extern "C"
-int wsrep_thd_is_brute_force(void *thd_ptr)
-{
- /*
- Brute force:
- Appliers and replaying are running in REPL_RECV mode. TOI statements
- in TOTAL_ORDER mode. Locally committing transaction that has got
- past wsrep->pre_commit() without error is running in LOCAL_COMMIT mode.
-
- Everything else is running in LOCAL_STATE and should not be considered
- brute force.
- */
- if (thd_ptr) {
- switch (((THD *)thd_ptr)->wsrep_exec_mode) {
- case LOCAL_STATE: return 0;
- case REPL_RECV: return 1;
- case TOTAL_ORDER: return 2;
- case LOCAL_COMMIT: return 3;
- }
- }
- DBUG_ASSERT(0);
- return 0;
-}
-extern "C"
-int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr, my_bool signal)
-{
- THD *victim_thd = (THD *) victim_thd_ptr;
- THD *bf_thd = (THD *) bf_thd_ptr;
- DBUG_ENTER("wsrep_abort_thd");
-
- if ( (WSREP(bf_thd) ||
- ( (WSREP_ON || wsrep_OSU_method_options == WSREP_OSU_RSU) &&
- bf_thd->wsrep_exec_mode == TOTAL_ORDER) ) &&
- victim_thd)
- {
- WSREP_DEBUG("wsrep_abort_thd, by: %llu, victim: %llu", (bf_thd) ?
- (long long)bf_thd->real_id : 0, (long long)victim_thd->real_id);
- ha_wsrep_abort_transaction(bf_thd, victim_thd, signal);
- }
- else
- {
- WSREP_DEBUG("wsrep_abort_thd not effective: %p %p", bf_thd, victim_thd);
- }
-
- DBUG_RETURN(1);
-}
-extern "C"
-int wsrep_thd_in_locking_session(void *thd_ptr)
-{
- if (thd_ptr && ((THD *)thd_ptr)->in_lock_tables) {
- return 1;
- }
- return 0;
-}
-#endif
/**
Retuns information about user or current user.
diff --git a/sql/sql_trigger.cc b/sql/sql_trigger.cc
index 46f8100d9d1..da03e5f7a03 100644
--- a/sql/sql_trigger.cc
+++ b/sql/sql_trigger.cc
@@ -2453,7 +2453,7 @@ bool load_table_name_for_trigger(THD *thd,
DBUG_RETURN(FALSE);
}
#ifdef WITH_WSREP
-int wsrep_create_trigger_query(THD *thd, uchar** buf, uint* buf_len)
+int wsrep_create_trigger_query(THD *thd, uchar** buf, size_t* buf_len)
{
LEX *lex= thd->lex;
String stmt_query;
@@ -2501,6 +2501,6 @@ int wsrep_create_trigger_query(THD *thd, uchar** buf, uint* buf_len)
stmt_query.append(stmt_definition.str, stmt_definition.length);
return wsrep_to_buf_helper(thd, stmt_query.c_ptr(), stmt_query.length(),
- buf, buf_len);
+ buf, buf_len);
}
#endif /* WITH_WSREP */
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
index ef70b1e77d0..6c3dd171706 100644
--- a/sql/sys_vars.cc
+++ b/sql/sys_vars.cc
@@ -2571,7 +2571,9 @@ static bool fix_autocommit(sys_var *self, THD *thd, enum_var_type type)
{
thd->variables.option_bits&= ~OPTION_AUTOCOMMIT;
thd->mdl_context.release_transactional_locks();
+#ifdef WITH_WSREP
WSREP_DEBUG("autocommit, MDL TRX lock released: %lu", thd->thread_id);
+#endif /* WITH_WSREP */
return true;
}
/*
@@ -3654,7 +3656,9 @@ static Sys_var_tz Sys_time_zone(
SESSION_VAR(time_zone), NO_CMD_LINE,
DEFAULT(&default_tz), NO_MUTEX_GUARD, IN_BINLOG);
#ifdef WITH_WSREP
-#include "wsrep_mysqld.h"
+#include "wsrep_var.h"
+#include "wsrep_sst.h"
+#include "wsrep_binlog.h"
static Sys_var_charptr Sys_wsrep_provider(
"wsrep_provider", "Path to replication provider library",
@@ -3812,10 +3816,11 @@ static Sys_var_charptr Sys_wsrep_start_position (
ON_CHECK(wsrep_start_position_check),
ON_UPDATE(wsrep_start_position_update));
-static Sys_var_ulonglong Sys_wsrep_max_ws_size (
+static Sys_var_ulong Sys_wsrep_max_ws_size (
"wsrep_max_ws_size", "Max write set size (bytes)",
GLOBAL_VAR(wsrep_max_ws_size), CMD_LINE(REQUIRED_ARG),
- VALID_RANGE(1024, 4294967296ULL), DEFAULT(1073741824ULL), BLOCK_SIZE(1));
+ /* Upper limit is 65K short of 4G to avoid overlows on 32-bit systems */
+ VALID_RANGE(1024, WSREP_MAX_WS_SIZE), DEFAULT(1073741824UL), BLOCK_SIZE(1));
static Sys_var_ulong Sys_wsrep_max_ws_rows (
"wsrep_max_ws_rows", "Max number of rows in write set",
@@ -3835,8 +3840,7 @@ static Sys_var_mybool Sys_wsrep_certify_nonPK(
static Sys_var_mybool Sys_wsrep_causal_reads(
"wsrep_causal_reads", "Enable \"strictly synchronous\" semantics for read operations",
SESSION_VAR(wsrep_causal_reads),
- CMD_LINE(OPT_ARG), DEFAULT(FALSE));
- // ON_UPDATE(wsrep_causal_reads_update));
+ CMD_LINE(OPT_ARG), DEFAULT(FALSE));
static const char *wsrep_OSU_method_names[]= { "TOI", "RSU", NullS };
static Sys_var_enum Sys_wsrep_OSU_method(
diff --git a/sql/transaction.cc b/sql/transaction.cc
index c66b86ff87f..36ed15bfe9c 100644
--- a/sql/transaction.cc
+++ b/sql/transaction.cc
@@ -298,6 +298,7 @@ bool trans_rollback(THD *thd)
DBUG_RETURN(test(res));
}
+
/**
Implicitly rollback the current transaction, typically
after deadlock was discovered.
@@ -329,7 +330,6 @@ bool trans_rollback_implicit(THD *thd)
#ifdef WITH_WSREP
wsrep_register_hton(thd, true);
#endif /* WITH_WSREP */
-
thd->server_status&= ~SERVER_STATUS_IN_TRANS;
DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS"));
res= ha_rollback_trans(thd, true);
@@ -433,7 +433,6 @@ bool trans_rollback_stmt(THD *thd)
wsrep_register_hton(thd, FALSE);
#endif /* WITH_WSREP */
ha_rollback_trans(thd, FALSE);
-
if (! thd->in_active_multi_stmt_transaction())
thd->tx_isolation= (enum_tx_isolation) thd->variables.tx_isolation;
}
diff --git a/sql/wsrep_applier.cc b/sql/wsrep_applier.cc
new file mode 100644
index 00000000000..f1016dff902
--- /dev/null
+++ b/sql/wsrep_applier.cc
@@ -0,0 +1,351 @@
+/* Copyright (C) 2013 Codership Oy <info@codership.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License along
+ with this program; if not, write to the Free Software Foundation, Inc.,
+ 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */
+
+#include "wsrep_priv.h"
+#include "wsrep_binlog.h" // wsrep_dump_rbr_buf()
+
+#include "log_event.h" // EVENT_LEN_OFFSET, etc.
+
+#include "wsrep_applier.h"
+
+/*
+ read the first event from (*buf). The size of the (*buf) is (*buf_len).
+ At the end (*buf) is shitfed to point to the following event or NULL and
+ (*buf_len) will be changed to account just being read bytes of the 1st event.
+*/
+
+static Log_event* wsrep_read_log_event(
+ char **arg_buf, size_t *arg_buf_len,
+ const Format_description_log_event *description_event)
+{
+ DBUG_ENTER("wsrep_read_log_event");
+ char *head= (*arg_buf);
+
+ uint data_len = uint4korr(head + EVENT_LEN_OFFSET);
+ char *buf= (*arg_buf);
+ const char *error= 0;
+ Log_event *res= 0;
+
+ if (data_len > wsrep_max_ws_size)
+ {
+ error = "Event too big";
+ goto err;
+ }
+
+ res= Log_event::read_log_event(buf, data_len, &error, description_event,
+ FALSE);
+
+err:
+ if (!res)
+ {
+ DBUG_ASSERT(error != 0);
+ sql_print_error("Error in Log_event::read_log_event(): "
+ "'%s', data_len: %d, event_type: %d",
+ error,data_len,head[EVENT_TYPE_OFFSET]);
+ }
+ (*arg_buf)+= data_len;
+ (*arg_buf_len)-= data_len;
+ DBUG_RETURN(res);
+}
+
+#include "transaction.h" // trans_commit(), trans_rollback()
+#include "rpl_rli.h" // class Relay_log_info;
+#include "sql_base.h" // close_temporary_table()
+
+extern const Format_description_log_event *wsrep_format_desc;
+
+static wsrep_cb_status_t wsrep_apply_events(THD* thd,
+ const void* events_buf,
+ size_t buf_len)
+{
+ char *buf= (char *)events_buf;
+ int rcode= 0;
+ int event= 1;
+
+ DBUG_ENTER("wsrep_apply_rbr");
+
+ if (thd->killed == KILL_CONNECTION)
+ {
+ WSREP_INFO("applier has been aborted, skipping apply_rbr: %lld",
+ (long long) wsrep_thd_trx_seqno(thd));
+ DBUG_RETURN(WSREP_CB_FAILURE);
+ }
+
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ thd->wsrep_query_state= QUERY_EXEC;
+ if (thd->wsrep_conflict_state!= REPLAYING)
+ thd->wsrep_conflict_state= NO_CONFLICT;
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+
+ if (!buf_len) WSREP_DEBUG("empty rbr buffer to apply: %lld",
+ (long long) wsrep_thd_trx_seqno(thd));
+
+ while(buf_len)
+ {
+ int exec_res;
+ int error = 0;
+ Log_event* ev= wsrep_read_log_event(&buf, &buf_len, wsrep_format_desc);
+
+ if (!ev)
+ {
+ WSREP_ERROR("applier could not read binlog event, seqno: %lld, len: %ld",
+ (long long)wsrep_thd_trx_seqno(thd), buf_len);
+ rcode= 1;
+ goto error;
+ }
+ switch (ev->get_type_code()) {
+ case WRITE_ROWS_EVENT:
+ case UPDATE_ROWS_EVENT:
+ case DELETE_ROWS_EVENT:
+ DBUG_ASSERT(buf_len != 0 ||
+ ((Rows_log_event*)ev)->get_flags(Rows_log_event::STMT_END_F));
+ break;
+ default:
+ break;
+ }
+
+ thd->server_id = ev->server_id; // use the original server id for logging
+ thd->set_time(); // time the query
+ wsrep_xid_init(&thd->transaction.xid_state.xid,
+ &thd->wsrep_trx_meta.gtid.uuid,
+ thd->wsrep_trx_meta.gtid.seqno);
+ thd->lex->current_select= 0;
+ if (!ev->when)
+ ev->when = time(NULL);
+ ev->thd = thd;
+ exec_res = ev->apply_event(thd->wsrep_rli);
+ DBUG_PRINT("info", ("exec_event result: %d", exec_res));
+
+ if (exec_res)
+ {
+ WSREP_WARN("RBR event %d %s apply warning: %d, %lld",
+ event, ev->get_type_str(), exec_res,
+ (long long) wsrep_thd_trx_seqno(thd));
+ rcode= exec_res;
+ /* stop processing for the first error */
+ delete ev;
+ goto error;
+ }
+ event++;
+
+ if (thd->wsrep_conflict_state!= NO_CONFLICT &&
+ thd->wsrep_conflict_state!= REPLAYING)
+ WSREP_WARN("conflict state after RBR event applying: %d, %lld",
+ thd->wsrep_query_state, (long long)wsrep_thd_trx_seqno(thd));
+
+ if (thd->wsrep_conflict_state == MUST_ABORT) {
+ WSREP_WARN("RBR event apply failed, rolling back: %lld",
+ (long long) wsrep_thd_trx_seqno(thd));
+ trans_rollback(thd);
+ thd->locked_tables_list.unlock_locked_tables(thd);
+ /* Release transactional metadata locks. */
+ thd->mdl_context.release_transactional_locks();
+ thd->wsrep_conflict_state= NO_CONFLICT;
+ DBUG_RETURN(WSREP_CB_FAILURE);
+ }
+
+ if ((ev->get_type_code() == WRITE_ROWS_EVENT ||
+ ev->get_type_code() == UPDATE_ROWS_EVENT ||
+ ev->get_type_code() == DELETE_ROWS_EVENT) &&
+ ((Rows_log_event *) ev)->get_flags(Rows_log_event::STMT_END_F))
+ {
+ thd->wsrep_rli->cleanup_context(thd, 0);
+
+ if (error == 0)
+ {
+ thd->clear_error();
+ }
+ else
+ WSREP_ERROR("Error in %s event: commit of row events failed: %lld",
+ ev->get_type_str(), (long long)wsrep_thd_trx_seqno(thd));
+ }
+ delete ev;
+ }
+
+ error:
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ thd->wsrep_query_state= QUERY_IDLE;
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+
+ assert(thd->wsrep_exec_mode== REPL_RECV);
+
+ if (thd->killed == KILL_CONNECTION)
+ WSREP_INFO("applier aborted: %lld", (long long)wsrep_thd_trx_seqno(thd));
+
+ if (rcode) DBUG_RETURN(WSREP_CB_FAILURE);
+ DBUG_RETURN(WSREP_CB_SUCCESS);
+}
+
+wsrep_cb_status_t wsrep_apply_cb(void* const ctx,
+ const void* const buf,
+ size_t const buf_len,
+ uint32_t const flags,
+ const wsrep_trx_meta_t* meta)
+{
+ THD* const thd((THD*)ctx);
+
+ thd->wsrep_trx_meta = *meta;
+
+#ifdef WSREP_PROC_INFO
+ snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
+ "applying write set %lld: %p, %zu",
+ (long long)wsrep_thd_trx_seqno(thd), buf, buf_len);
+ thd_proc_info(thd, thd->wsrep_info);
+#else
+ thd_proc_info(thd, "applying write set");
+#endif /* WSREP_PROC_INFO */
+
+ if (flags & WSREP_FLAG_ISOLATION)
+ {
+ thd->wsrep_apply_toi= true;
+ /*
+ Don't run in transaction mode with TOI actions.
+ */
+ thd->variables.option_bits&= ~OPTION_BEGIN;
+ thd->server_status&= ~SERVER_STATUS_IN_TRANS;
+ }
+ wsrep_cb_status_t rcode(wsrep_apply_events(thd, buf, buf_len));
+
+#ifdef WSREP_PROC_INFO
+ snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
+ "applied write set %lld", (long long)wsrep_thd_trx_seqno(thd));
+ thd_proc_info(thd, thd->wsrep_info);
+#else
+ thd_proc_info(thd, "applied write set");
+#endif /* WSREP_PROC_INFO */
+
+ if (WSREP_CB_SUCCESS != rcode)
+ {
+ wsrep_dump_rbr_buf(thd, buf, buf_len);
+ }
+
+ TABLE *tmp;
+ while ((tmp = thd->temporary_tables))
+ {
+ WSREP_DEBUG("Applier %lu, has temporary tables: %s.%s",
+ thd->thread_id,
+ (tmp->s) ? tmp->s->db.str : "void",
+ (tmp->s) ? tmp->s->table_name.str : "void");
+ close_temporary_table(thd, tmp, 1, 1);
+ }
+
+ return rcode;
+}
+
+static wsrep_cb_status_t wsrep_commit(THD* const thd,
+ wsrep_seqno_t const global_seqno)
+{
+#ifdef WSREP_PROC_INFO
+ snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
+ "committing %lld", (long long)wsrep_thd_trx_seqno(thd));
+ thd_proc_info(thd, thd->wsrep_info);
+#else
+ thd_proc_info(thd, "committing");
+#endif /* WSREP_PROC_INFO */
+
+ wsrep_cb_status_t const rcode(trans_commit(thd) ?
+ WSREP_CB_FAILURE : WSREP_CB_SUCCESS);
+
+#ifdef WSREP_PROC_INFO
+ snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
+ "committed %lld", (long long)wsrep_thd_trx_seqno(thd));
+ thd_proc_info(thd, thd->wsrep_info);
+#else
+ thd_proc_info(thd, "committed");
+#endif /* WSREP_PROC_INFO */
+
+ if (WSREP_CB_SUCCESS == rcode)
+ {
+ // TODO: mark snapshot with global_seqno.
+ }
+
+ return rcode;
+}
+
+static wsrep_cb_status_t wsrep_rollback(THD* const thd,
+ wsrep_seqno_t const global_seqno)
+{
+#ifdef WSREP_PROC_INFO
+ snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
+ "rolling back %lld", (long long)wsrep_thd_trx_seqno(thd));
+ thd_proc_info(thd, thd->wsrep_info);
+#else
+ thd_proc_info(thd, "rolling back");
+#endif /* WSREP_PROC_INFO */
+
+ wsrep_cb_status_t const rcode(trans_rollback(thd) ?
+ WSREP_CB_FAILURE : WSREP_CB_SUCCESS);
+
+#ifdef WSREP_PROC_INFO
+ snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
+ "rolled back %lld", (long long)wsrep_thd_trx_seqno(thd));
+ thd_proc_info(thd, thd->wsrep_info);
+#else
+ thd_proc_info(thd, "rolled back");
+#endif /* WSREP_PROC_INFO */
+
+ return rcode;
+}
+
+wsrep_cb_status_t wsrep_commit_cb(void* const ctx,
+ uint32_t const flags,
+ const wsrep_trx_meta_t* meta,
+ wsrep_bool_t* const exit,
+ bool const commit)
+{
+ THD* const thd((THD*)ctx);
+
+ assert(meta->gtid.seqno == wsrep_thd_trx_seqno(thd));
+
+ wsrep_cb_status_t rcode;
+
+ if (commit)
+ rcode = wsrep_commit(thd, meta->gtid.seqno);
+ else
+ rcode = wsrep_rollback(thd, meta->gtid.seqno);
+
+ thd->mdl_context.release_transactional_locks();
+ free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC));
+ thd->tx_isolation= (enum_tx_isolation) thd->variables.tx_isolation;
+
+ if (wsrep_slave_count_change < 0 && commit && WSREP_CB_SUCCESS == rcode)
+ {
+ mysql_mutex_lock(&LOCK_wsrep_slave_threads);
+ if (wsrep_slave_count_change < 0)
+ {
+ wsrep_slave_count_change++;
+ *exit = true;
+ }
+ mysql_mutex_unlock(&LOCK_wsrep_slave_threads);
+ }
+
+ if (*exit == false && thd->wsrep_applier)
+ {
+ /* From trans_begin() */
+ thd->variables.option_bits|= OPTION_BEGIN;
+ thd->server_status|= SERVER_STATUS_IN_TRANS;
+ thd->wsrep_apply_toi= false;
+ }
+
+ return rcode;
+}
+
+
+wsrep_cb_status_t wsrep_unordered_cb(void* const ctx,
+ const void* const data,
+ size_t const size)
+{
+ return WSREP_CB_SUCCESS;
+}
diff --git a/sql/wsrep_applier.h b/sql/wsrep_applier.h
new file mode 100644
index 00000000000..816970db67c
--- /dev/null
+++ b/sql/wsrep_applier.h
@@ -0,0 +1,38 @@
+/* Copyright 2013 Codership Oy <http://www.codership.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#ifndef WSREP_APPLIER_H
+#define WSREP_APPLIER_H
+
+#include <sys/types.h>
+
+/* wsrep callback prototypes */
+
+wsrep_cb_status_t wsrep_apply_cb(void *ctx,
+ const void* buf, size_t buf_len,
+ uint32_t flags,
+ const wsrep_trx_meta_t* meta);
+
+wsrep_cb_status_t wsrep_commit_cb(void *ctx,
+ uint32_t flags,
+ const wsrep_trx_meta_t* meta,
+ wsrep_bool_t* exit,
+ bool commit);
+
+wsrep_cb_status_t wsrep_unordered_cb(void* ctx,
+ const void* data,
+ size_t size);
+
+#endif /* WSREP_APPLIER_H */
diff --git a/sql/wsrep_binlog.cc b/sql/wsrep_binlog.cc
new file mode 100644
index 00000000000..a0c56ce9299
--- /dev/null
+++ b/sql/wsrep_binlog.cc
@@ -0,0 +1,321 @@
+/* Copyright (C) 2013 Codership Oy <info@codership.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License along
+ with this program; if not, write to the Free Software Foundation, Inc.,
+ 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */
+
+#include "wsrep_binlog.h"
+#include "wsrep_priv.h"
+
+/*
+ Write the contents of a cache to a memory buffer.
+
+ This function quite the same as MYSQL_BIN_LOG::write_cache(),
+ with the exception that here we write in buffer instead of log file.
+ */
+int wsrep_write_cache_buf(IO_CACHE *cache, uchar **buf, size_t *buf_len)
+{
+ *buf= NULL;
+ *buf_len= 0;
+
+ my_off_t const saved_pos(my_b_tell(cache));
+
+ if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
+ {
+ WSREP_ERROR("failed to initialize io-cache");
+ return ER_ERROR_ON_WRITE;
+ }
+
+ uint length = my_b_bytes_in_cache(cache);
+ if (unlikely(0 == length)) length = my_b_fill(cache);
+
+ size_t total_length = 0;
+
+ if (likely(length > 0)) do
+ {
+ total_length += length;
+ /*
+ Bail out if buffer grows too large.
+ A temporary fix to avoid allocating indefinitely large buffer,
+ not a real limit on a writeset size which includes other things
+ like header and keys.
+ */
+ if (total_length > wsrep_max_ws_size)
+ {
+ WSREP_WARN("transaction size limit (%lu) exceeded: %zu",
+ wsrep_max_ws_size, total_length);
+ goto error;
+ }
+
+ uchar* tmp = (uchar *)my_realloc(*buf, total_length, MYF(0));
+ if (!tmp)
+ {
+ WSREP_ERROR("could not (re)allocate buffer: %zu + %u",
+ *buf_len, length);
+ goto error;
+ }
+ *buf = tmp;
+
+ memcpy(*buf + *buf_len, cache->read_pos, length);
+ *buf_len = total_length;
+ cache->read_pos = cache->read_end;
+ } while ((cache->file >= 0) && (length = my_b_fill(cache)));
+
+ if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0))
+ {
+ WSREP_WARN("failed to initialize io-cache");
+ goto cleanup;
+ }
+
+ return 0;
+
+error:
+ if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0))
+ {
+ WSREP_ERROR("failed to initialize io-cache");
+ }
+cleanup:
+ my_free(*buf);
+ *buf= NULL;
+ *buf_len= 0;
+ return ER_ERROR_ON_WRITE;
+}
+
+#define STACK_SIZE 4096 /* 4K - for buffer preallocated on the stack:
+ * many transactions would fit in there
+ * so there is no need to reach for the heap */
+
+/* Returns minimum multiple of HEAP_PAGE_SIZE that is >= length */
+static inline size_t
+heap_size(size_t length)
+{
+ return (length + HEAP_PAGE_SIZE - 1)/HEAP_PAGE_SIZE*HEAP_PAGE_SIZE;
+}
+
+/* append data to writeset */
+static inline wsrep_status_t
+wsrep_append_data(wsrep_t* const wsrep,
+ wsrep_ws_handle_t* const ws,
+ const void* const data,
+ size_t const len)
+{
+ struct wsrep_buf const buff = { data, len };
+ wsrep_status_t const rc(wsrep->append_data(wsrep, ws, &buff, 1,
+ WSREP_DATA_ORDERED, true));
+ if (rc != WSREP_OK)
+ {
+ WSREP_WARN("append_data() returned %d", rc);
+ }
+
+ return rc;
+}
+
+/*
+ Write the contents of a cache to wsrep provider.
+
+ This function quite the same as MYSQL_BIN_LOG::write_cache(),
+ with the exception that here we write in buffer instead of log file.
+
+ This version reads all of cache into single buffer and then appends to a
+ writeset at once.
+ */
+static int wsrep_write_cache_once(wsrep_t* const wsrep,
+ THD* const thd,
+ IO_CACHE* const cache,
+ size_t* const len)
+{
+ my_off_t const saved_pos(my_b_tell(cache));
+
+ if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
+ {
+ WSREP_ERROR("failed to initialize io-cache");
+ return ER_ERROR_ON_WRITE;
+ }
+
+ int err(WSREP_OK);
+
+ size_t total_length(0);
+ uchar stack_buf[STACK_SIZE]; /* to avoid dynamic allocations for few data*/
+ uchar* heap_buf(NULL);
+ uchar* buf(stack_buf);
+ size_t allocated(sizeof(stack_buf));
+ size_t used(0);
+
+ uint length(my_b_bytes_in_cache(cache));
+ if (unlikely(0 == length)) length = my_b_fill(cache);
+
+ if (likely(length > 0)) do
+ {
+ total_length += length;
+ /*
+ Bail out if buffer grows too large.
+ A temporary fix to avoid allocating indefinitely large buffer,
+ not a real limit on a writeset size which includes other things
+ like header and keys.
+ */
+ if (unlikely(total_length > wsrep_max_ws_size))
+ {
+ WSREP_WARN("transaction size limit (%lu) exceeded: %zu",
+ wsrep_max_ws_size, total_length);
+ goto cleanup;
+ }
+
+ if (total_length > allocated)
+ {
+ size_t const new_size(heap_size(total_length));
+ uchar* tmp = (uchar *)my_realloc(heap_buf, new_size, MYF(0));
+ if (!tmp)
+ {
+ WSREP_ERROR("could not (re)allocate buffer: %zu + %u",
+ allocated, length);
+ err = WSREP_SIZE_EXCEEDED;
+ goto cleanup;
+ }
+
+ heap_buf = tmp;
+ buf = heap_buf;
+ allocated = new_size;
+
+ if (used <= STACK_SIZE && used > 0) // there's data in stack_buf
+ {
+ DBUG_ASSERT(buf == stack_buf);
+ memcpy(heap_buf, stack_buf, used);
+ }
+ }
+
+ memcpy(buf + used, cache->read_pos, length);
+ used = total_length;
+ cache->read_pos = cache->read_end;
+ } while ((cache->file >= 0) && (length = my_b_fill(cache)));
+
+ if (used > 0)
+ err = wsrep_append_data(wsrep, &thd->wsrep_ws_handle, buf, used);
+
+ if (WSREP_OK == err) *len = total_length;
+
+cleanup:
+ if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0))
+ {
+ WSREP_ERROR("failed to reinitialize io-cache");
+ }
+
+ if (unlikely(WSREP_OK != err)) wsrep_dump_rbr_buf(thd, buf, used);
+
+ my_free(heap_buf);
+ return err;
+}
+
+/*
+ Write the contents of a cache to wsrep provider.
+
+ This function quite the same as MYSQL_BIN_LOG::write_cache(),
+ with the exception that here we write in buffer instead of log file.
+
+ This version uses incremental data appending as it reads it from cache.
+ */
+static int wsrep_write_cache_inc(wsrep_t* const wsrep,
+ THD* const thd,
+ IO_CACHE* const cache,
+ size_t* const len)
+{
+ my_off_t const saved_pos(my_b_tell(cache));
+
+ if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
+ {
+ WSREP_ERROR("failed to initialize io-cache");
+ return WSREP_TRX_ROLLBACK;
+ }
+
+ int err(WSREP_OK);
+
+ size_t total_length(0);
+
+ uint length(my_b_bytes_in_cache(cache));
+ if (unlikely(0 == length)) length = my_b_fill(cache);
+
+ if (likely(length > 0)) do
+ {
+ total_length += length;
+ /* bail out if buffer grows too large
+ not a real limit on a writeset size which includes other things
+ like header and keys.
+ */
+ if (unlikely(total_length > wsrep_max_ws_size))
+ {
+ WSREP_WARN("transaction size limit (%lu) exceeded: %zu",
+ wsrep_max_ws_size, total_length);
+ err = WSREP_SIZE_EXCEEDED;
+ goto cleanup;
+ }
+
+ if(WSREP_OK != (err=wsrep_append_data(wsrep, &thd->wsrep_ws_handle,
+ cache->read_pos, length)))
+ goto cleanup;
+
+ cache->read_pos = cache->read_end;
+ } while ((cache->file >= 0) && (length = my_b_fill(cache)));
+
+ if (WSREP_OK == err) *len = total_length;
+
+cleanup:
+ if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0))
+ {
+ WSREP_ERROR("failed to reinitialize io-cache");
+ }
+
+ return err;
+}
+
+/*
+ Write the contents of a cache to wsrep provider.
+
+ This function quite the same as MYSQL_BIN_LOG::write_cache(),
+ with the exception that here we write in buffer instead of log file.
+ */
+int wsrep_write_cache(wsrep_t* const wsrep,
+ THD* const thd,
+ IO_CACHE* const cache,
+ size_t* const len)
+{
+ if (wsrep_incremental_data_collection) {
+ return wsrep_write_cache_inc(wsrep, thd, cache, len);
+ }
+ else {
+ return wsrep_write_cache_once(wsrep, thd, cache, len);
+ }
+}
+
+void wsrep_dump_rbr_buf(THD *thd, const void* rbr_buf, size_t buf_len)
+{
+ char filename[PATH_MAX]= {0};
+ int len= snprintf(filename, PATH_MAX, "%s/GRA_%ld_%lld.log",
+ wsrep_data_home_dir, thd->thread_id,
+ (long long)wsrep_thd_trx_seqno(thd));
+ if (len >= PATH_MAX)
+ {
+ WSREP_ERROR("RBR dump path too long: %d, skipping dump.", len);
+ return;
+ }
+
+ FILE *of= fopen(filename, "wb");
+ if (of)
+ {
+ fwrite (rbr_buf, buf_len, 1, of);
+ fclose(of);
+ }
+ else
+ {
+ WSREP_ERROR("Failed to open file '%s': %d (%s)",
+ filename, errno, strerror(errno));
+ }
+}
+
diff --git a/sql/wsrep_binlog.h b/sql/wsrep_binlog.h
new file mode 100644
index 00000000000..6de73b2f5ee
--- /dev/null
+++ b/sql/wsrep_binlog.h
@@ -0,0 +1,49 @@
+/* Copyright (C) 2013 Codership Oy <info@codership.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License along
+ with this program; if not, write to the Free Software Foundation, Inc.,
+ 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */
+
+#ifndef WSREP_BINLOG_H
+#define WSREP_BINLOG_H
+
+#include "sql_class.h" // THD, IO_CACHE
+
+#define HEAP_PAGE_SIZE 65536 /* 64K */
+#define WSREP_MAX_WS_SIZE (0xFFFFFFFFUL - HEAP_PAGE_SIZE)
+
+/*
+ Write the contents of a cache to a memory buffer.
+
+ This function quite the same as MYSQL_BIN_LOG::write_cache(),
+ with the exception that here we write in buffer instead of log file.
+ */
+int wsrep_write_cache_buf(IO_CACHE *cache, uchar **buf, size_t *buf_len);
+
+/*
+ Write the contents of a cache to wsrep provider.
+
+ This function quite the same as MYSQL_BIN_LOG::write_cache(),
+ with the exception that here we write in buffer instead of log file.
+
+ @param len total amount of data written
+ @return wsrep error status
+ */
+int wsrep_write_cache (wsrep_t* wsrep,
+ THD* thd,
+ IO_CACHE* cache,
+ size_t* len);
+
+/* Dump replication buffer to disk */
+void wsrep_dump_rbr_buf(THD *thd, const void* rbr_buf, size_t buf_len);
+
+#endif /* WSREP_BINLOG_H */
diff --git a/sql/wsrep_hton.cc b/sql/wsrep_hton.cc
index d4bb77c9e6f..8eb5340dd58 100644
--- a/sql/wsrep_hton.cc
+++ b/sql/wsrep_hton.cc
@@ -18,7 +18,7 @@
#include "rpl_filter.h"
#include <sql_class.h>
#include "wsrep_mysqld.h"
-#include "wsrep_priv.h"
+#include "wsrep_binlog.h"
#include <cstdio>
#include <cstdlib>
@@ -26,10 +26,11 @@ extern handlerton *binlog_hton;
extern int binlog_close_connection(handlerton *hton, THD *thd);
extern ulonglong thd_to_trx_id(THD *thd);
-extern "C" int thd_binlog_format(const MYSQL_THD thd);
-// todo: share interface with ha_innodb.c
+extern "C" int thd_binlog_format(const MYSQL_THD thd);
+// todo: share interface with ha_innodb.c
-enum wsrep_trx_status wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all);
+enum wsrep_trx_status wsrep_run_wsrep_commit(THD *thd, handlerton *hton,
+ bool all);
/*
Cleanup after local transaction commit/rollback, replay or TOI.
@@ -37,8 +38,9 @@ enum wsrep_trx_status wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool al
void wsrep_cleanup_transaction(THD *thd)
{
if (wsrep_emulate_bin_log) thd_binlog_trx_reset(thd);
- thd->wsrep_trx_handle.trx_id= WSREP_UNDEFINED_TRX_ID;
- thd->wsrep_trx_seqno= WSREP_SEQNO_UNDEFINED;
+ thd->wsrep_ws_handle.trx_id= WSREP_UNDEFINED_TRX_ID;
+ thd->wsrep_trx_meta.gtid= WSREP_GTID_UNDEFINED;
+ thd->wsrep_trx_meta.depends_on= WSREP_SEQNO_UNDEFINED;
thd->wsrep_exec_mode= LOCAL_STATE;
return;
}
@@ -66,7 +68,7 @@ handlerton *wsrep_hton;
*/
void wsrep_register_hton(THD* thd, bool all)
{
- if (thd->wsrep_exec_mode != TOTAL_ORDER)
+ if (thd->wsrep_exec_mode != TOTAL_ORDER && !thd->wsrep_apply_toi)
{
THD_TRANS *trans=all ? &thd->transaction.all : &thd->transaction.stmt;
for (Ha_trx_info *i= trans->ha_list; WSREP(thd) && i; i = i->next())
@@ -94,8 +96,8 @@ void wsrep_post_commit(THD* thd, bool all)
{
if (thd->wsrep_exec_mode == LOCAL_COMMIT)
{
- DBUG_ASSERT(thd->wsrep_trx_seqno != WSREP_SEQNO_UNDEFINED);
- if (wsrep->post_commit(wsrep, &thd->wsrep_trx_handle))
+ DBUG_ASSERT(thd->wsrep_trx_meta.gtid.seqno != WSREP_SEQNO_UNDEFINED);
+ if (wsrep->post_commit(wsrep, &thd->wsrep_ws_handle))
{
DBUG_PRINT("wsrep", ("set committed fail"));
WSREP_WARN("set committed fail: %llu %d",
@@ -106,7 +108,7 @@ void wsrep_post_commit(THD* thd, bool all)
}
/*
- wsrep exploits binlog's caches even if binlogging itself is not
+ wsrep exploits binlog's caches even if binlogging itself is not
activated. In such case connection close needs calling
actual binlog's method.
Todo: split binlog hton from its caches to use ones by wsrep
@@ -125,7 +127,7 @@ wsrep_close_connection(handlerton* hton, THD* thd)
if (wsrep_emulate_bin_log && thd_get_ha_data(thd, binlog_hton) != NULL)
binlog_hton->close_connection (binlog_hton, thd);
DBUG_RETURN(0);
-}
+}
/*
prepare/wsrep_run_wsrep_commit can fail in two ways
@@ -147,18 +149,15 @@ static int wsrep_prepare(handlerton *hton, THD *thd, bool all)
DBUG_ASSERT(thd->ha_data[wsrep_hton->slot].ha_info[all].is_trx_read_write());
DBUG_ASSERT(thd->wsrep_exec_mode == LOCAL_STATE);
- DBUG_ASSERT(thd->wsrep_trx_seqno == WSREP_SEQNO_UNDEFINED);
+ DBUG_ASSERT(thd->wsrep_trx_meta.gtid.seqno == WSREP_SEQNO_UNDEFINED);
- if ((all ||
+ if ((all ||
!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) &&
(thd->variables.wsrep_on && !wsrep_trans_cache_is_empty(thd)))
{
switch (wsrep_run_wsrep_commit(thd, hton, all))
{
case WSREP_TRX_OK:
- // DBUG_ASSERT(thd->wsrep_trx_seqno > old ||
- // thd->wsrep_exec_mode == REPL_RECV ||
- // thd->wsrep_exec_mode == TOTAL_ORDER);
break;
case WSREP_TRX_ROLLBACK:
case WSREP_TRX_ERROR:
@@ -208,10 +207,10 @@ static int wsrep_rollback(handlerton *hton, THD *thd, bool all)
if ((all || !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) &&
(thd->variables.wsrep_on && thd->wsrep_conflict_state != MUST_REPLAY))
{
- if (wsrep->post_rollback(wsrep, &thd->wsrep_trx_handle))
+ if (wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle))
{
DBUG_PRINT("wsrep", ("setting rollback fail"));
- WSREP_ERROR("settting rollback fail: thd: %llu SQL: %s",
+ WSREP_ERROR("settting rollback fail: thd: %llu SQL: %s",
(long long)thd->real_id, thd->query());
}
wsrep_cleanup_transaction(thd);
@@ -249,12 +248,12 @@ int wsrep_commit(handlerton *hton, THD *thd, bool all)
possible changes to clean state.
*/
if (WSREP_PROVIDER_EXISTS) {
- if (wsrep->post_rollback(wsrep, &thd->wsrep_trx_handle))
- {
- DBUG_PRINT("wsrep", ("setting rollback fail"));
- WSREP_ERROR("settting rollback fail: thd: %llu SQL: %s",
- (long long)thd->real_id, thd->query());
- }
+ if (wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle))
+ {
+ DBUG_PRINT("wsrep", ("setting rollback fail"));
+ WSREP_ERROR("settting rollback fail: thd: %llu SQL: %s",
+ (long long)thd->real_id, thd->query());
+ }
}
wsrep_cleanup_transaction(thd);
}
@@ -266,26 +265,24 @@ int wsrep_commit(handlerton *hton, THD *thd, bool all)
extern Rpl_filter* binlog_filter;
extern my_bool opt_log_slave_updates;
-extern void wsrep_write_rbr_buf(THD *thd, const void* rbr_buf, size_t buf_len);
+
enum wsrep_trx_status
-wsrep_run_wsrep_commit(
- THD *thd, handlerton *hton, bool all)
+wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all)
{
- int rcode = -1;
- uint data_len = 0;
- uchar *rbr_data = NULL;
+ int rcode= -1;
+ size_t data_len= 0;
IO_CACHE *cache;
int replay_round= 0;
if (thd->stmt_da->is_error()) {
- WSREP_ERROR("commit issue, error: %d %s",
+ WSREP_ERROR("commit issue, error: %d %s",
thd->stmt_da->sql_errno(), thd->stmt_da->message());
}
DBUG_ENTER("wsrep_run_wsrep_commit");
- if (thd->slave_thread && !opt_log_slave_updates) {
- DBUG_RETURN(WSREP_TRX_OK);
- }
+
+ if (thd->slave_thread && !opt_log_slave_updates) DBUG_RETURN(WSREP_TRX_OK);
+
if (thd->wsrep_exec_mode == REPL_RECV) {
mysql_mutex_lock(&thd->LOCK_wsrep_thd);
@@ -303,9 +300,9 @@ wsrep_run_wsrep_commit(
}
mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
}
- if (thd->wsrep_exec_mode != LOCAL_STATE) {
- DBUG_RETURN(WSREP_TRX_OK);
- }
+
+ if (thd->wsrep_exec_mode != LOCAL_STATE) DBUG_RETURN(WSREP_TRX_OK);
+
if (thd->wsrep_consistency_check == CONSISTENCY_CHECK_RUNNING) {
WSREP_DEBUG("commit for consistency check: %s", thd->query());
DBUG_RETURN(WSREP_TRX_OK);
@@ -327,10 +324,10 @@ wsrep_run_wsrep_commit(
mysql_mutex_lock(&LOCK_wsrep_replaying);
- while (wsrep_replaying > 0 &&
+ while (wsrep_replaying > 0 &&
thd->wsrep_conflict_state == NO_CONFLICT &&
thd->killed == NOT_KILLED &&
- !shutdown_in_progress)
+ !shutdown_in_progress)
{
mysql_mutex_unlock(&LOCK_wsrep_replaying);
@@ -348,9 +345,12 @@ wsrep_run_wsrep_commit(
struct timespec wtime = {0, 1000000};
mysql_cond_timedwait(&COND_wsrep_replaying, &LOCK_wsrep_replaying,
&wtime);
+
if (replay_round++ % 100000 == 0)
- WSREP_DEBUG("commit waiting for replaying: replayers %d, thd: (%lu) conflict: %d (round: %d)",
- wsrep_replaying, thd->thread_id, thd->wsrep_conflict_state, replay_round);
+ WSREP_DEBUG("commit waiting for replaying: replayers %d, thd: (%lu) "
+ "conflict: %d (round: %d)",
+ wsrep_replaying, thd->thread_id,
+ thd->wsrep_conflict_state, replay_round);
mysql_mutex_unlock(&LOCK_wsrep_replaying);
@@ -371,7 +371,8 @@ wsrep_run_wsrep_commit(
WSREP_DEBUG("innobase_commit abort after replaying wait %s",
(thd->query()) ? thd->query() : "void");
DBUG_RETURN(WSREP_TRX_ROLLBACK);
- }
+ }
+
thd->wsrep_query_state = QUERY_COMMITTING;
mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
@@ -379,28 +380,28 @@ wsrep_run_wsrep_commit(
rcode = 0;
if (cache) {
thd->binlog_flush_pending_rows_event(true);
- rcode = wsrep_write_cache(cache, &rbr_data, &data_len);
- if (rcode) {
- WSREP_ERROR("rbr write fail, data_len: %d, %d", data_len, rcode);
- if (data_len) my_free(rbr_data);
+ rcode = wsrep_write_cache(wsrep, thd, cache, &data_len);
+ if (WSREP_OK != rcode) {
+ WSREP_ERROR("rbr write fail, data_len: %zu, %d", data_len, rcode);
DBUG_RETURN(WSREP_TRX_ROLLBACK);
}
}
- if (data_len == 0)
+
+ if (data_len == 0)
{
- if (thd->stmt_da->is_ok() &&
+ if (thd->stmt_da->is_ok() &&
thd->stmt_da->affected_rows() > 0 &&
!binlog_filter->is_on())
{
WSREP_DEBUG("empty rbr buffer, query: %s, "
- "affected rows: %llu, "
- "changed tables: %d, "
+ "affected rows: %llu, "
+ "changed tables: %d, "
"sql_log_bin: %d, "
- "wsrep status (%d %d %d)",
+ "wsrep status (%d %d %d)",
thd->query(), thd->stmt_da->affected_rows(),
stmt_has_updated_trans_table(thd), thd->variables.sql_log_bin,
- thd->wsrep_exec_mode, thd->wsrep_query_state,
- thd->wsrep_conflict_state);
+ thd->wsrep_exec_mode, thd->wsrep_query_state,
+ thd->wsrep_conflict_state);
}
else
{
@@ -409,38 +410,33 @@ wsrep_run_wsrep_commit(
thd->wsrep_query_state= QUERY_EXEC;
DBUG_RETURN(WSREP_TRX_OK);
}
- if (WSREP_UNDEFINED_TRX_ID == thd->wsrep_trx_handle.trx_id)
+
+ if (WSREP_UNDEFINED_TRX_ID == thd->wsrep_ws_handle.trx_id)
{
- WSREP_WARN("SQL statement was ineffective, THD: %lu, buf: %d\n"
- "QUERY: %s\n"
- " => Skipping replication",
- thd->thread_id, data_len, thd->query());
- if (wsrep_debug)
- {
- wsrep_write_rbr_buf(thd, rbr_data, data_len);
- }
+ WSREP_WARN("SQL statement was ineffective, THD: %lu, buf: %zu\n"
+ "QUERY: %s\n"
+ " => Skipping replication",
+ thd->thread_id, data_len, thd->query());
rcode = WSREP_TRX_FAIL;
}
else if (!rcode)
{
- rcode = wsrep->pre_commit(
- wsrep,
- (wsrep_conn_id_t)thd->thread_id,
- &thd->wsrep_trx_handle,
- rbr_data,
- data_len,
- (thd->wsrep_PA_safe) ? WSREP_FLAG_PA_SAFE : 0ULL,
- &thd->wsrep_trx_seqno);
- switch (rcode) {
- case WSREP_TRX_MISSING:
+ if (WSREP_OK == rcode)
+ rcode = wsrep->pre_commit(wsrep,
+ (wsrep_conn_id_t)thd->thread_id,
+ &thd->wsrep_ws_handle,
+ WSREP_FLAG_COMMIT |
+ ((thd->wsrep_PA_safe) ?
+ 0ULL : WSREP_FLAG_PA_UNSAFE),
+ &thd->wsrep_trx_meta);
+
+ if (rcode == WSREP_TRX_MISSING) {
WSREP_WARN("Transaction missing in provider, thd: %ld, SQL: %s",
thd->thread_id, thd->query());
- wsrep_write_rbr_buf(thd, rbr_data, data_len);
rcode = WSREP_TRX_FAIL;
- break;
- case WSREP_BF_ABORT:
+ } else if (rcode == WSREP_BF_ABORT) {
WSREP_DEBUG("thd %lu seqno %lld BF aborted by provider, will replay",
- thd->thread_id, (long long)thd->wsrep_trx_seqno);
+ thd->thread_id, (long long)thd->wsrep_trx_meta.gtid.seqno);
mysql_mutex_lock(&thd->LOCK_wsrep_thd);
thd->wsrep_conflict_state = MUST_REPLAY;
mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
@@ -449,22 +445,14 @@ wsrep_run_wsrep_commit(
WSREP_DEBUG("replaying increased: %d, thd: %lu",
wsrep_replaying, thd->thread_id);
mysql_mutex_unlock(&LOCK_wsrep_replaying);
- break;
- default:
- break;
}
} else {
WSREP_ERROR("I/O error reading from thd's binlog iocache: "
"errno=%d, io cache code=%d", my_errno, cache->error);
- if (data_len) my_free(rbr_data);
DBUG_ASSERT(0); // failure like this can not normally happen
DBUG_RETURN(WSREP_TRX_ERROR);
}
- if (data_len) {
- my_free(rbr_data);
- }
-
mysql_mutex_lock(&thd->LOCK_wsrep_thd);
switch(rcode) {
case 0:
@@ -481,22 +469,22 @@ wsrep_run_wsrep_commit(
{
WSREP_WARN("thd %lu seqno %lld: conflict state %d after post commit",
thd->thread_id,
- (long long)thd->wsrep_trx_seqno,
+ (long long)thd->wsrep_trx_meta.gtid.seqno,
thd->wsrep_conflict_state);
}
thd->wsrep_exec_mode= LOCAL_COMMIT;
- DBUG_ASSERT(thd->wsrep_trx_seqno != WSREP_SEQNO_UNDEFINED);
+ DBUG_ASSERT(thd->wsrep_trx_meta.gtid.seqno != WSREP_SEQNO_UNDEFINED);
/* Override XID iff it was generated by mysql */
if (thd->transaction.xid_state.xid.get_my_xid())
{
wsrep_xid_init(&thd->transaction.xid_state.xid,
- wsrep_cluster_uuid(),
- thd->wsrep_trx_seqno);
+ &thd->wsrep_trx_meta.gtid.uuid,
+ thd->wsrep_trx_meta.gtid.seqno);
}
DBUG_PRINT("wsrep", ("replicating commit success"));
break;
case WSREP_BF_ABORT:
- DBUG_ASSERT(thd->wsrep_trx_seqno != WSREP_SEQNO_UNDEFINED);
+ DBUG_ASSERT(thd->wsrep_trx_meta.gtid.seqno != WSREP_SEQNO_UNDEFINED);
case WSREP_TRX_FAIL:
WSREP_DEBUG("commit failed for reason: %d", rcode);
DBUG_PRINT("wsrep", ("replicating commit fail"));
@@ -505,7 +493,7 @@ wsrep_run_wsrep_commit(
if (thd->wsrep_conflict_state == MUST_ABORT) {
thd->wsrep_conflict_state= ABORTED;
- }
+ }
else
{
WSREP_DEBUG("conflict state: %d", thd->wsrep_conflict_state);
@@ -563,14 +551,15 @@ mysql_declare_plugin(wsrep)
&wsrep_storage_engine,
"wsrep",
"Codership Oy",
- "A pseudo storage engine to represent transactions in multi-master synchornous replication",
+ "A pseudo storage engine to represent transactions in multi-master "
+ "synchornous replication",
PLUGIN_LICENSE_GPL,
wsrep_hton_init, /* Plugin Init */
NULL, /* Plugin Deinit */
0x0100 /* 1.0 */,
NULL, /* status variables */
NULL, /* system variables */
- NULL, /* config options */
+ NULL, /* config options */
0, /* flags */
}
mysql_declare_plugin_end;
diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc
index 2181054a34c..666952e6f52 100644
--- a/sql/wsrep_mysqld.cc
+++ b/sql/wsrep_mysqld.cc
@@ -1,4 +1,4 @@
-/* Copyright 2008 Codership Oy <http://www.codership.com>
+/* Copyright 2008-2013 Codership Oy <http://www.codership.com>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -17,11 +17,17 @@
#include <sql_class.h>
#include <sql_parse.h>
#include "wsrep_priv.h"
+#include "wsrep_thd.h"
+#include "wsrep_sst.h"
+#include "wsrep_utils.h"
+#include "wsrep_var.h"
+#include "wsrep_binlog.h"
+#include "wsrep_applier.h"
#include <cstdio>
#include <cstdlib>
#include "log_event.h"
-extern Format_description_log_event *wsrep_format_desc;
+Format_description_log_event *wsrep_format_desc = NULL;
wsrep_t *wsrep = NULL;
my_bool wsrep_emulate_bin_log = FALSE; // activating parts of binlog interface
@@ -33,24 +39,26 @@ const char* wsrep_data_home_dir = NULL;
const char* wsrep_dbug_option = "";
long wsrep_slave_threads = 1; // # of slave action appliers wanted
+int wsrep_slave_count_change = 0; // # of appliers to stop or start
my_bool wsrep_debug = 0; // enable debug level logging
my_bool wsrep_convert_LOCK_to_trx = 1; // convert locking sessions to trx
ulong wsrep_retry_autocommit = 5; // retry aborted autocommit trx
my_bool wsrep_auto_increment_control = 1; // control auto increment variables
my_bool wsrep_drupal_282555_workaround = 1; // retry autoinc insert after dupkey
my_bool wsrep_incremental_data_collection = 0; // incremental data collection
-long long wsrep_max_ws_size = 1073741824LL; //max ws (RBR buffer) size
-long wsrep_max_ws_rows = 65536; // max number of rows in ws
+ulong wsrep_max_ws_size = 1073741824UL;//max ws (RBR buffer) size
+ulong wsrep_max_ws_rows = 65536; // max number of rows in ws
int wsrep_to_isolation = 0; // # of active TO isolation threads
my_bool wsrep_certify_nonPK = 1; // certify, even when no primary key
long wsrep_max_protocol_version = 2; // maximum protocol version to use
ulong wsrep_forced_binlog_format = BINLOG_FORMAT_UNSPEC;
my_bool wsrep_recovery = 0; // recovery
my_bool wsrep_replicate_myisam = 0; // enable myisam replication
-my_bool wsrep_log_conflicts = 0; //
+my_bool wsrep_log_conflicts = 0;
ulong wsrep_mysql_replication_bundle = 0;
+my_bool wsrep_desync = 0; // desynchronize the node from the
+ // cluster
my_bool wsrep_load_data_splitting = 1; // commit load data every 10K intervals
-my_bool wsrep_desync = 0; // desynchronize the node from the cluster
/*
* End configuration options
@@ -88,12 +96,12 @@ long long wsrep_cluster_conf_id = WSREP_SEQNO_UNDEFINED;
const char* wsrep_cluster_status = cluster_status_str[WSREP_VIEW_DISCONNECTED];
long wsrep_cluster_size = 0;
long wsrep_local_index = -1;
+long long wsrep_local_bf_aborts = 0;
const char* wsrep_provider_name = provider_name;
const char* wsrep_provider_version = provider_version;
const char* wsrep_provider_vendor = provider_vendor;
/* End wsrep status variables */
-
wsrep_uuid_t local_uuid = WSREP_UUID_UNDEFINED;
wsrep_seqno_t local_seqno = WSREP_SEQNO_UNDEFINED;
wsp::node_status local_status;
@@ -104,14 +112,6 @@ long wsrep_protocol_version = 2;
// if there was no state gap on receiving first view event.
static my_bool wsrep_startup = TRUE;
-// action execute callback
-extern wsrep_status_t wsrep_apply_cb(void *ctx,
- const void* buf, size_t buf_len,
- wsrep_seqno_t global_seqno);
-
-extern wsrep_status_t wsrep_commit_cb (void *ctx,
- wsrep_seqno_t global_seqno,
- bool commit);
static void wsrep_log_cb(wsrep_log_level_t level, const char *msg) {
switch (level) {
@@ -195,19 +195,25 @@ void wsrep_get_SE_checkpoint(XID* xid)
plugin_foreach(NULL, get_SE_checkpoint, MYSQL_STORAGE_ENGINE_PLUGIN, xid);
}
-static void wsrep_view_handler_cb (void* app_ctx,
- void* recv_ctx,
- const wsrep_view_info_t* view,
- const char* state,
- size_t state_len,
- void** sst_req,
- ssize_t* sst_req_len)
+static wsrep_cb_status_t
+wsrep_view_handler_cb (void* app_ctx,
+ void* recv_ctx,
+ const wsrep_view_info_t* view,
+ const char* state,
+ size_t state_len,
+ void** sst_req,
+ size_t* sst_req_len)
{
+ *sst_req = NULL;
+ *sst_req_len = 0;
+
wsrep_member_status_t new_status= local_status.get();
- if (memcmp(&cluster_uuid, &view->uuid, sizeof(wsrep_uuid_t)))
+ if (memcmp(&cluster_uuid, &view->state_id.uuid, sizeof(wsrep_uuid_t)))
{
- memcpy((wsrep_uuid_t*)&cluster_uuid, &view->uuid, sizeof(cluster_uuid));
+ memcpy((wsrep_uuid_t*)&cluster_uuid, &view->state_id.uuid,
+ sizeof(cluster_uuid));
+
wsrep_uuid_print (&cluster_uuid, cluster_uuid_str,
sizeof(cluster_uuid_str));
}
@@ -219,7 +225,7 @@ static void wsrep_view_handler_cb (void* app_ctx,
WSREP_INFO("New cluster view: global state: %s:%lld, view# %lld: %s, "
"number of nodes: %ld, my index: %ld, protocol version %d",
- wsrep_cluster_state_uuid, (long long)view->seqno,
+ wsrep_cluster_state_uuid, (long long)view->state_id.seqno,
(long long)wsrep_cluster_conf_id, wsrep_cluster_status,
wsrep_cluster_size, wsrep_local_index, view->proto_ver);
@@ -274,16 +280,18 @@ static void wsrep_view_handler_cb (void* app_ctx,
WSREP_DEBUG("[debug]: closing client connections for PRIM");
wsrep_close_client_connections(TRUE);
- *sst_req_len= wsrep_sst_prepare (sst_req);
+ ssize_t const req_len= wsrep_sst_prepare (sst_req);
- if (*sst_req_len < 0)
+ if (req_len < 0)
{
- int err = *sst_req_len;
- WSREP_ERROR("SST preparation failed: %d (%s)", -err, strerror(-err));
+ WSREP_ERROR("SST preparation failed: %zd (%s)", -req_len,
+ strerror(-req_len));
new_status= WSREP_MEMBER_UNDEFINED;
}
else
{
+ assert(sst_req != NULL);
+ *sst_req_len= req_len;
new_status= WSREP_MEMBER_JOINER;
}
}
@@ -299,14 +307,14 @@ static void wsrep_view_handler_cb (void* app_ctx,
{
wsrep_SE_init_grab();
// Signal mysqld init thread to continue
- wsrep_sst_complete (&cluster_uuid, view->seqno, false);
+ wsrep_sst_complete (&cluster_uuid, view->state_id.seqno, false);
// and wait for SE initialization
wsrep_SE_init_wait();
}
else
{
local_uuid= cluster_uuid;
- local_seqno= view->seqno;
+ local_seqno= view->state_id.seqno;
}
/* Init storage engine XIDs from first view */
XID xid;
@@ -319,7 +327,7 @@ static void wsrep_view_handler_cb (void* app_ctx,
if (memcmp (&local_uuid, &cluster_uuid, sizeof (wsrep_uuid_t)))
{
WSREP_ERROR("Undetected state gap. Can't continue.");
- wsrep_log_states(WSREP_LOG_FATAL, &cluster_uuid, view->seqno,
+ wsrep_log_states(WSREP_LOG_FATAL, &cluster_uuid, view->state_id.seqno,
&local_uuid, -1);
unireg_abort(1);
}
@@ -331,9 +339,23 @@ static void wsrep_view_handler_cb (void* app_ctx,
global_system_variables.auto_increment_increment= view->memb_num;
}
+ { /* capabilities may be updated on new configuration */
+ uint64_t const caps(wsrep->capabilities (wsrep));
+
+ my_bool const idc((caps & WSREP_CAP_INCREMENTAL_WRITESET) != 0);
+ if (TRUE == wsrep_incremental_data_collection && FALSE == idc)
+ {
+ WSREP_WARN("Unsupported protocol downgrade: "
+ "incremental data collection disabled. Expect abort.");
+ }
+ wsrep_incremental_data_collection = idc;
+ }
+
out:
wsrep_startup= FALSE;
local_status.set(new_status, view);
+
+ return WSREP_CB_SUCCESS;
}
void wsrep_ready_set (my_bool x)
@@ -364,14 +386,26 @@ void wsrep_ready_wait ()
static void wsrep_synced_cb(void* app_ctx)
{
WSREP_INFO("Synchronized with group, ready for connections");
+ bool signal_main= false;
if (mysql_mutex_lock (&LOCK_wsrep_ready)) abort();
if (!wsrep_ready)
{
wsrep_ready= TRUE;
mysql_cond_signal (&COND_wsrep_ready);
+ signal_main= true;
+
}
local_status.set(WSREP_MEMBER_SYNCED);
mysql_mutex_unlock (&LOCK_wsrep_ready);
+
+ if (signal_main)
+ {
+ wsrep_SE_init_grab();
+ // Signal mysqld init thread to continue
+ wsrep_sst_complete (&local_uuid, local_seqno, false);
+ // and wait for SE initialization
+ wsrep_SE_init_wait();
+ }
}
static void wsrep_init_position()
@@ -416,6 +450,8 @@ static void wsrep_init_position()
}
}
+extern const char* my_bind_addr_str;
+
int wsrep_init()
{
int rcode= -1;
@@ -470,7 +506,7 @@ int wsrep_init()
size_t const node_addr_max= sizeof(node_addr) - 1;
if (!wsrep_node_address || !strcmp(wsrep_node_address, ""))
{
- size_t const ret= guess_ip(node_addr, node_addr_max);
+ size_t const ret= wsrep_guess_ip(node_addr, node_addr_max);
if (!(ret > 0 && ret < node_addr_max))
{
WSREP_WARN("Failed to guess base node address. Set it explicitly via "
@@ -488,38 +524,57 @@ int wsrep_init()
if ((!wsrep_node_incoming_address ||
!strcmp (wsrep_node_incoming_address, WSREP_NODE_INCOMING_AUTO)))
{
- size_t const node_addr_len= strlen(node_addr);
- if (node_addr_len > 0)
+ unsigned int my_bind_ip= INADDR_ANY; // default if not set
+ if (my_bind_addr_str && strlen(my_bind_addr_str))
+ {
+ my_bind_ip= wsrep_check_ip(my_bind_addr_str);
+ }
+
+ if (INADDR_ANY != my_bind_ip)
{
- const char* const colon= strrchr(node_addr, ':');
- if (strchr(node_addr, ':') == colon) // 1 or 0 ':'
+ if (INADDR_NONE != my_bind_ip && INADDR_LOOPBACK != my_bind_ip)
{
- size_t const ip_len= colon ? colon - node_addr : node_addr_len;
- if (ip_len + 7 /* :55555\0 */ < inc_addr_max)
+ snprintf(inc_addr, inc_addr_max, "%s:%u",
+ my_bind_addr_str, (int)mysqld_port);
+ } // else leave inc_addr an empty string - mysqld is not listening for
+ // client connections on network interfaces.
+ }
+ else // mysqld binds to 0.0.0.0, take IP from wsrep_node_address if possible
+ {
+ size_t const node_addr_len= strlen(node_addr);
+ if (node_addr_len > 0)
+ {
+ const char* const colon= strrchr(node_addr, ':');
+ if (strchr(node_addr, ':') == colon) // 1 or 0 ':'
{
- memcpy (inc_addr, node_addr, ip_len);
- snprintf(inc_addr + ip_len, inc_addr_max - ip_len, ":%u",mysqld_port);
+ size_t const ip_len= colon ? colon - node_addr : node_addr_len;
+ if (ip_len + 7 /* :55555\0 */ < inc_addr_max)
+ {
+ memcpy (inc_addr, node_addr, ip_len);
+ snprintf(inc_addr + ip_len, inc_addr_max - ip_len, ":%u",
+ (int)mysqld_port);
+ }
+ else
+ {
+ WSREP_WARN("Guessing address for incoming client connections: "
+ "address too long.");
+ inc_addr[0]= '\0';
+ }
}
else
{
WSREP_WARN("Guessing address for incoming client connections: "
- "address too long.");
+ "too many colons :) .");
inc_addr[0]= '\0';
}
}
- else
+
+ if (!strlen(inc_addr))
{
- WSREP_WARN("Guessing address for incoming client connections: "
- "too many colons :) .");
- inc_addr[0]= '\0';
+ WSREP_WARN("Guessing address for incoming client connections failed. "
+ "Try setting wsrep_node_incoming_address explicitly.");
}
}
-
- if (!strlen(inc_addr))
- {
- WSREP_WARN("Guessing address for incoming client connections failed. "
- "Try setting wsrep_node_incoming_address explicitly.");
- }
}
else if (!strchr(wsrep_node_incoming_address, ':')) // no port included
{
@@ -546,6 +601,8 @@ int wsrep_init()
struct wsrep_init_args wsrep_args;
+ struct wsrep_gtid const state_id = { local_uuid, local_seqno };
+
wsrep_args.data_dir = wsrep_data_home_dir;
wsrep_args.node_name = (wsrep_node_name) ? wsrep_node_name : "";
wsrep_args.node_address = node_addr;
@@ -554,13 +611,13 @@ int wsrep_init()
wsrep_provider_options : "";
wsrep_args.proto_ver = wsrep_max_protocol_version;
- wsrep_args.state_uuid = &local_uuid;
- wsrep_args.state_seqno = local_seqno;
+ wsrep_args.state_id = &state_id;
wsrep_args.logger_cb = wsrep_log_cb;
wsrep_args.view_handler_cb = wsrep_view_handler_cb;
wsrep_args.apply_cb = wsrep_apply_cb;
wsrep_args.commit_cb = wsrep_commit_cb;
+ wsrep_args.unordered_cb = wsrep_unordered_cb;
wsrep_args.sst_donate_cb = wsrep_sst_donate_cb;
wsrep_args.synced_cb = wsrep_synced_cb;
@@ -661,8 +718,36 @@ void wsrep_stop_replication(THD *thd)
return;
}
-
-extern my_bool wsrep_new_cluster;
+/* This one is set to true when --wsrep-new-cluster is found in the command
+ * line arguments */
+static my_bool wsrep_new_cluster= FALSE;
+#define WSREP_NEW_CLUSTER "--wsrep-new-cluster"
+/* Finds and hides --wsrep-new-cluster from the arguments list
+ * by moving it to the end of the list and decrementing argument count */
+void wsrep_filter_new_cluster (int* argc, char* argv[])
+{
+ int i;
+ for (i= *argc - 1; i > 0; i--)
+ {
+ /* make a copy of the argument to convert possible underscores to hyphens.
+ * the copy need not to be longer than WSREP_NEW_CLUSTER option */
+ char arg[sizeof(WSREP_NEW_CLUSTER) + 2]= { 0, };
+ strncpy(arg, argv[i], sizeof(arg) - 1);
+ char* underscore;
+ while (NULL != (underscore= strchr(arg, '_'))) *underscore= '-';
+
+ if (!strcmp(arg, WSREP_NEW_CLUSTER))
+ {
+ wsrep_new_cluster= TRUE;
+ *argc -= 1;
+ /* preserve the order of remaining arguments AND
+ * preserve the original argument pointers - just in case */
+ char* wnc= argv[i];
+ memmove(&argv[i], &argv[i + 1], (*argc - i)*sizeof(argv[i]));
+ argv[*argc]= wnc; /* this will be invisible to the rest of the program */
+ }
+ }
+}
bool wsrep_start_replication()
{
@@ -686,20 +771,16 @@ bool wsrep_start_replication()
return true;
}
- /* Note 'bootstrap' address is not officially supported in wsrep API #23
- but it can be back ported from #24 provider to get sneak preview of
- bootstrap command
- */
- const char* cluster_address =
- wsrep_new_cluster ? "bootstrap" : wsrep_cluster_address;
+ bool const bootstrap(TRUE == wsrep_new_cluster);
wsrep_new_cluster= FALSE;
WSREP_INFO("Start replication");
if ((rcode = wsrep->connect(wsrep,
wsrep_cluster_name,
- cluster_address,
- wsrep_sst_donor)))
+ wsrep_cluster_address,
+ wsrep_sst_donor,
+ bootstrap)))
{
if (-ESOCKTNOSUPPORT == rcode)
{
@@ -720,11 +801,6 @@ bool wsrep_start_replication()
{
wsrep_connected= TRUE;
- uint64_t caps = wsrep->capabilities (wsrep);
-
- wsrep_incremental_data_collection =
- !!(caps & WSREP_CAP_WRITE_SET_INCREMENTS);
-
char* opts= wsrep->options_get(wsrep);
if (opts)
{
@@ -749,8 +825,8 @@ wsrep_causal_wait (THD* thd)
{
// This allows autocommit SELECTs and a first SELECT after SET AUTOCOMMIT=0
// TODO: modify to check if thd has locked any rows.
- wsrep_seqno_t seqno;
- wsrep_status_t ret= wsrep->causal_read (wsrep, &seqno);
+ wsrep_gtid_t gtid;
+ wsrep_status_t ret= wsrep->causal_read (wsrep, &gtid);
if (unlikely(WSREP_OK != ret))
{
@@ -798,7 +874,7 @@ static void wsrep_keys_free(wsrep_key_arr_t* key_arr)
{
for (size_t i= 0; i < key_arr->keys_len; ++i)
{
- my_free((wsrep_key_part_t*)key_arr->keys[i].key_parts);
+ my_free((void*)key_arr->keys[i].key_parts);
}
my_free(key_arr->keys);
key_arr->keys= 0;
@@ -818,7 +894,7 @@ static void wsrep_keys_free(wsrep_key_arr_t* key_arr)
static bool wsrep_prepare_key_for_isolation(const char* db,
const char* table,
- wsrep_key_part_t* key,
+ wsrep_buf_t* key,
size_t* key_len)
{
if (*key_len < 2) return false;
@@ -837,13 +913,13 @@ static bool wsrep_prepare_key_for_isolation(const char* db,
// sql_print_information("%s.%s", db, table);
if (db)
{
- key[*key_len].buf= db;
- key[*key_len].buf_len= strlen(db);
+ key[*key_len].ptr= db;
+ key[*key_len].len= strlen(db);
++(*key_len);
if (table)
{
- key[*key_len].buf= table;
- key[*key_len].buf_len= strlen(table);
+ key[*key_len].ptr= table;
+ key[*key_len].len= strlen(table);
++(*key_len);
}
}
@@ -879,23 +955,23 @@ static bool wsrep_prepare_keys_for_isolation(THD* thd,
{
if (!(ka->keys= (wsrep_key_t*)my_malloc(sizeof(wsrep_key_t), MYF(0))))
{
- sql_print_error("Can't allocate memory for key_array");
+ WSREP_ERROR("Can't allocate memory for key_array");
goto err;
}
ka->keys_len= 1;
- if (!(ka->keys[0].key_parts= (wsrep_key_part_t*)
- my_malloc(sizeof(wsrep_key_part_t)*2, MYF(0))))
+ if (!(ka->keys[0].key_parts= (wsrep_buf_t*)
+ my_malloc(sizeof(wsrep_buf_t)*2, MYF(0))))
{
- sql_print_error("Can't allocate memory for key_parts");
+ WSREP_ERROR("Can't allocate memory for key_parts");
goto err;
}
- ka->keys[0].key_parts_len= 2;
+ ka->keys[0].key_parts_num= 2;
if (!wsrep_prepare_key_for_isolation(
db, table,
- (wsrep_key_part_t*)ka->keys[0].key_parts,
- &ka->keys[0].key_parts_len))
+ (wsrep_buf_t*)ka->keys[0].key_parts,
+ &ka->keys[0].key_parts_num))
{
- sql_print_error("Preparing keys for isolation failed");
+ WSREP_ERROR("Preparing keys for isolation failed");
goto err;
}
}
@@ -910,24 +986,24 @@ static bool wsrep_prepare_keys_for_isolation(THD* thd,
ka->keys, (ka->keys_len + 1) * sizeof(wsrep_key_t), MYF(0));
if (!tmp)
{
- sql_print_error("Can't allocate memory for key_array");
+ WSREP_ERROR("Can't allocate memory for key_array");
goto err;
}
ka->keys= tmp;
- if (!(ka->keys[ka->keys_len].key_parts= (wsrep_key_part_t*)
- my_malloc(sizeof(wsrep_key_part_t)*2, MYF(0))))
+ if (!(ka->keys[ka->keys_len].key_parts= (wsrep_buf_t*)
+ my_malloc(sizeof(wsrep_buf_t)*2, MYF(0))))
{
- sql_print_error("Can't allocate memory for key_parts");
+ WSREP_ERROR("Can't allocate memory for key_parts");
goto err;
}
- ka->keys[ka->keys_len].key_parts_len= 2;
+ ka->keys[ka->keys_len].key_parts_num= 2;
++ka->keys_len;
if (!wsrep_prepare_key_for_isolation(
table->db, table->table_name,
- (wsrep_key_part_t*)ka->keys[ka->keys_len - 1].key_parts,
- &ka->keys[ka->keys_len - 1].key_parts_len))
+ (wsrep_buf_t*)ka->keys[ka->keys_len - 1].key_parts,
+ &ka->keys[ka->keys_len - 1].key_parts_num))
{
- sql_print_error("Preparing keys for isolation failed");
+ WSREP_ERROR("Preparing keys for isolation failed");
goto err;
}
}
@@ -939,12 +1015,11 @@ err:
}
-
bool wsrep_prepare_key_for_innodb(const uchar* cache_key,
- size_t cache_key_len,
+ size_t cache_key_len,
const uchar* row_id,
size_t row_id_len,
- wsrep_key_part_t* key,
+ wsrep_buf_t* key,
size_t* key_len)
{
if (*key_len < 3) return false;
@@ -954,33 +1029,36 @@ bool wsrep_prepare_key_for_innodb(const uchar* cache_key,
{
case 0:
{
- key[*key_len].buf = cache_key;
- key[*key_len].buf_len = cache_key_len;
- ++(*key_len);
+ key[0].ptr = cache_key;
+ key[0].len = cache_key_len;
+
+ *key_len = 1;
break;
}
case 1:
case 2:
{
- key[*key_len].buf = cache_key;
- key[*key_len].buf_len = strlen( (char*)cache_key );
- ++(*key_len);
- key[*key_len].buf = cache_key + strlen( (char*)cache_key ) + 1;
- key[*key_len].buf_len = strlen( (char*)(key[*key_len].buf) );
- ++(*key_len);
+ key[0].ptr = cache_key;
+ key[0].len = strlen( (char*)cache_key );
+
+ key[1].ptr = cache_key + strlen( (char*)cache_key ) + 1;
+ key[1].len = strlen( (char*)(key[1].ptr) );
+
+ *key_len = 2;
break;
}
default:
return false;
}
- key[*key_len].buf = row_id;
- key[*key_len].buf_len = row_id_len;
+ key[*key_len].ptr = row_id;
+ key[*key_len].len = row_id_len;
++(*key_len);
return true;
}
+
/*
* Construct Query_log_Event from thd query and serialize it
* into buffer.
@@ -988,7 +1066,7 @@ bool wsrep_prepare_key_for_innodb(const uchar* cache_key,
* Return 0 in case of success, 1 in case of error.
*/
int wsrep_to_buf_helper(
- THD* thd, const char *query, uint query_len, uchar** buf, uint* buf_len)
+ THD* thd, const char *query, uint query_len, uchar** buf, size_t* buf_len)
{
IO_CACHE tmp_io_cache;
if (open_cached_file(&tmp_io_cache, mysql_tmpdir, TEMP_PREFIX,
@@ -999,9 +1077,9 @@ int wsrep_to_buf_helper(
/* if there is prepare query, add event for it */
if (thd->wsrep_TOI_pre_query)
{
- Query_log_event ev(thd, thd->wsrep_TOI_pre_query,
- thd->wsrep_TOI_pre_query_len,
- FALSE, FALSE, FALSE, 0);
+ Query_log_event ev(thd, thd->wsrep_TOI_pre_query,
+ thd->wsrep_TOI_pre_query_len,
+ FALSE, FALSE, FALSE, 0);
if (ev.write(&tmp_io_cache)) ret= 1;
}
@@ -1009,7 +1087,7 @@ int wsrep_to_buf_helper(
Query_log_event ev(thd, query, query_len, FALSE, FALSE, FALSE, 0);
if (ev.write(&tmp_io_cache)) ret= 1;
- if (!ret && wsrep_write_cache(&tmp_io_cache, buf, buf_len)) ret= 1;
+ if (!ret && wsrep_write_cache_buf(&tmp_io_cache, buf, buf_len)) ret= 1;
close_cached_file(&tmp_io_cache);
return ret;
@@ -1017,7 +1095,7 @@ int wsrep_to_buf_helper(
#include "sql_show.h"
static int
-create_view_query(THD *thd, uchar** buf, uint* buf_len)
+create_view_query(THD *thd, uchar** buf, size_t* buf_len)
{
LEX *lex= thd->lex;
SELECT_LEX *select_lex= &lex->select_lex;
@@ -1036,15 +1114,15 @@ create_view_query(THD *thd, uchar** buf, uint* buf_len)
if (!lex->definer)
{
/*
- DEFINER-clause is missing; we have to create default definer in
- persistent arena to be PS/SP friendly.
- If this is an ALTER VIEW then the current user should be set as
- the definer.
+ DEFINER-clause is missing; we have to create default definer in
+ persistent arena to be PS/SP friendly.
+ If this is an ALTER VIEW then the current user should be set as
+ the definer.
*/
if (!(lex->definer= create_default_definer(thd)))
{
- WSREP_WARN("view default definer issue");
+ WSREP_WARN("view default definer issue");
}
}
@@ -1071,7 +1149,7 @@ create_view_query(THD *thd, uchar** buf, uint* buf_len)
List_iterator_fast<LEX_STRING> names(lex->view_list);
LEX_STRING *name;
int i;
-
+
for (i= 0; (name= names++); i++)
{
buff.append(i ? ", " : "(");
@@ -1082,7 +1160,7 @@ create_view_query(THD *thd, uchar** buf, uint* buf_len)
buff.append(STRING_WITH_LEN(" AS "));
//buff.append(views->source.str, views->source.length);
buff.append(thd->lex->create_view_select.str,
- thd->lex->create_view_select.length);
+ thd->lex->create_view_select.length);
//int errcode= query_error_code(thd, TRUE);
//if (thd->binlog_query(THD::STMT_QUERY_TYPE,
// buff.ptr(), buff.length(), FALSE, FALSE, FALSE, errcod
@@ -1094,11 +1172,11 @@ static int wsrep_TOI_begin(THD *thd, char *db_, char *table_,
{
wsrep_status_t ret(WSREP_WARNING);
uchar* buf(0);
- uint buf_len(0);
+ size_t buf_len(0);
int buf_err;
- WSREP_DEBUG("TO BEGIN: %lld, %d : %s", (long long)thd->wsrep_trx_seqno,
- thd->wsrep_exec_mode, thd->query() );
+ WSREP_DEBUG("TO BEGIN: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd),
+ thd->wsrep_exec_mode, thd->query() );
switch (thd->lex->sql_command)
{
case SQLCOM_CREATE_VIEW:
@@ -1121,19 +1199,20 @@ static int wsrep_TOI_begin(THD *thd, char *db_, char *table_,
}
wsrep_key_arr_t key_arr= {0, 0};
+ struct wsrep_buf buff = { buf, buf_len };
if (!buf_err &&
wsrep_prepare_keys_for_isolation(thd, db_, table_, table_list, &key_arr)&&
WSREP_OK == (ret = wsrep->to_execute_start(wsrep, thd->thread_id,
key_arr.keys, key_arr.keys_len,
- buf, buf_len,
- &thd->wsrep_trx_seqno)))
+ &buff, 1,
+ &thd->wsrep_trx_meta)))
{
thd->wsrep_exec_mode= TOTAL_ORDER;
wsrep_to_isolation++;
if (buf) my_free(buf);
wsrep_keys_free(&key_arr);
- WSREP_DEBUG("TO BEGIN: %lld, %d",(long long)thd->wsrep_trx_seqno,
- thd->wsrep_exec_mode);
+ WSREP_DEBUG("TO BEGIN: %lld, %d",(long long)wsrep_thd_trx_seqno(thd),
+ thd->wsrep_exec_mode);
}
else {
/* jump to error handler in mysql_execute_command() */
@@ -1152,10 +1231,11 @@ static int wsrep_TOI_begin(THD *thd, char *db_, char *table_,
static void wsrep_TOI_end(THD *thd) {
wsrep_status_t ret;
wsrep_to_isolation--;
- WSREP_DEBUG("TO END: %lld, %d : %s", (long long)thd->wsrep_trx_seqno,
+
+ WSREP_DEBUG("TO END: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd),
thd->wsrep_exec_mode, (thd->query()) ? thd->query() : "void");
if (WSREP_OK == (ret = wsrep->to_execute_end(wsrep, thd->thread_id))) {
- WSREP_DEBUG("TO END: %lld", (long long)thd->wsrep_trx_seqno);
+ WSREP_DEBUG("TO END: %lld", (long long)wsrep_thd_trx_seqno(thd));
}
else {
WSREP_WARN("TO isolation end failed for: %d, sql: %s",
@@ -1166,7 +1246,7 @@ static void wsrep_TOI_end(THD *thd) {
static int wsrep_RSU_begin(THD *thd, char *db_, char *table_)
{
wsrep_status_t ret(WSREP_WARNING);
- WSREP_DEBUG("RSU BEGIN: %lld, %d : %s", (long long)thd->wsrep_trx_seqno,
+ WSREP_DEBUG("RSU BEGIN: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd),
thd->wsrep_exec_mode, thd->query() );
ret = wsrep->desync(wsrep);
@@ -1211,7 +1291,7 @@ static int wsrep_RSU_begin(THD *thd, char *db_, char *table_)
static void wsrep_RSU_end(THD *thd)
{
wsrep_status_t ret(WSREP_WARNING);
- WSREP_DEBUG("RSU END: %lld, %d : %s", (long long)thd->wsrep_trx_seqno,
+ WSREP_DEBUG("RSU END: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd),
thd->wsrep_exec_mode, thd->query() );
@@ -1255,13 +1335,27 @@ int wsrep_to_isolation_begin(THD *thd, char *db_, char *table_,
mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
DBUG_ASSERT(thd->wsrep_exec_mode == LOCAL_STATE);
- DBUG_ASSERT(thd->wsrep_trx_seqno == WSREP_SEQNO_UNDEFINED);
+ DBUG_ASSERT(thd->wsrep_trx_meta.gtid.seqno == WSREP_SEQNO_UNDEFINED);
if (wsrep_debug && thd->mdl_context.has_locks())
{
WSREP_DEBUG("thread holds MDL locks at TI begin: %s %lu",
thd->query(), thd->thread_id);
}
+
+ /*
+ It makes sense to set auto_increment_* to defaults in TOI operations.
+ Must be done before wsrep_TOI_begin() since Query_log_event encapsulating
+ TOI statement and auto inc variables for wsrep replication is constructed
+ there. Variables are reset back in THD::reset_for_next_command() before
+ processing of next command.
+ */
+ if (wsrep_auto_increment_control)
+ {
+ thd->variables.auto_increment_offset = 1;
+ thd->variables.auto_increment_increment = 1;
+ }
+
if (thd->variables.wsrep_on && thd->wsrep_exec_mode==LOCAL_STATE)
{
switch (wsrep_OSU_method_options) {
@@ -1272,12 +1366,6 @@ int wsrep_to_isolation_begin(THD *thd, char *db_, char *table_,
if (!ret)
{
thd->wsrep_exec_mode= TOTAL_ORDER;
- /* It makes sense to set auto_increment_* to defaults in TOI operations */
- if (wsrep_auto_increment_control)
- {
- thd->variables.auto_increment_offset = 1;
- thd->variables.auto_increment_increment = 1;
- }
}
}
return ret;
@@ -1302,10 +1390,10 @@ void wsrep_to_isolation_end(THD *thd)
"request: (%lu \tseqno %lld \twsrep (%d, %d, %d) cmd %d %d \t%s)\n" \
"granted: (%lu \tseqno %lld \twsrep (%d, %d, %d) cmd %d %d \t%s)", \
msg, \
- req->thread_id, (long long)req->wsrep_trx_seqno, \
+ req->thread_id, (long long)wsrep_thd_trx_seqno(req), \
req->wsrep_exec_mode, req->wsrep_query_state, req->wsrep_conflict_state, \
req->command, req->lex->sql_command, req->query(), \
- gra->thread_id, (long long)gra->wsrep_trx_seqno, \
+ gra->thread_id, (long long)wsrep_thd_trx_seqno(gra), \
gra->wsrep_exec_mode, gra->wsrep_query_state, gra->wsrep_conflict_state, \
gra->command, gra->lex->sql_command, gra->query());
diff --git a/sql/wsrep_mysqld.h b/sql/wsrep_mysqld.h
index 619711cc4b0..815990ba9d4 100644
--- a/sql/wsrep_mysqld.h
+++ b/sql/wsrep_mysqld.h
@@ -1,4 +1,4 @@
-/* Copyright 2008-2012 Codership Oy <http://www.codership.com>
+/* Copyright 2008-2013 Codership Oy <http://www.codership.com>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -26,23 +26,22 @@ typedef struct st_mysql_show_var SHOW_VAR;
class set_var;
class THD;
-#ifdef WITH_WSREP
-#include "../wsrep/wsrep_api.h"
-//#include "wsrep_mysqld.h"
- enum wsrep_exec_mode {
+enum wsrep_exec_mode {
LOCAL_STATE,
REPL_RECV,
TOTAL_ORDER,
LOCAL_COMMIT
- };
- enum wsrep_query_state {
+};
+
+enum wsrep_query_state {
QUERY_IDLE,
QUERY_EXEC,
QUERY_COMMITTING,
QUERY_EXITING,
QUERY_ROLLINGBACK,
- };
- enum wsrep_conflict_state {
+};
+
+enum wsrep_conflict_state {
NO_CONFLICT,
MUST_ABORT,
ABORTING,
@@ -51,13 +50,14 @@ class THD;
REPLAYING,
RETRY_AUTOCOMMIT,
CERT_FAILURE,
- };
- enum wsrep_consistency_check_mode {
+};
+
+enum wsrep_consistency_check_mode {
NO_CONSISTENCY_CHECK,
CONSISTENCY_CHECK_DECLARED,
CONSISTENCY_CHECK_RUNNING,
- };
-#endif
+};
+
// Global wsrep parameters
extern wsrep_t* wsrep;
@@ -73,20 +73,16 @@ extern const char* wsrep_node_incoming_address;
extern const char* wsrep_data_home_dir;
extern const char* wsrep_dbug_option;
extern long wsrep_slave_threads;
-extern my_bool wsrep_debug;
+extern int wsrep_slave_count_change;
+extern MYSQL_PLUGIN_IMPORT my_bool wsrep_debug;
extern my_bool wsrep_convert_LOCK_to_trx;
extern ulong wsrep_retry_autocommit;
extern my_bool wsrep_auto_increment_control;
extern my_bool wsrep_drupal_282555_workaround;
extern my_bool wsrep_incremental_data_collection;
-extern const char* wsrep_sst_method;
-extern const char* wsrep_sst_receive_address;
-extern char* wsrep_sst_auth;
-extern const char* wsrep_sst_donor;
-extern my_bool wsrep_sst_donor_rejects_queries;
extern const char* wsrep_start_position;
-extern long long wsrep_max_ws_size;
-extern long wsrep_max_ws_rows;
+extern ulong wsrep_max_ws_size;
+extern ulong wsrep_max_ws_rows;
extern const char* wsrep_notify_cmd;
extern my_bool wsrep_certify_nonPK;
extern long wsrep_max_protocol_version;
@@ -98,7 +94,6 @@ extern my_bool wsrep_recovery;
extern my_bool wsrep_replicate_myisam;
extern my_bool wsrep_log_conflicts;
extern ulong wsrep_mysql_replication_bundle;
-extern ulong wsrep_mysql_replication_bundle;
extern my_bool wsrep_load_data_splitting;
enum enum_wsrep_OSU_method { WSREP_OSU_TOI, WSREP_OSU_RSU };
@@ -111,81 +106,29 @@ extern long long wsrep_cluster_conf_id;
extern const char* wsrep_cluster_status;
extern long wsrep_cluster_size;
extern long wsrep_local_index;
+extern long long wsrep_local_bf_aborts;
extern const char* wsrep_provider_name;
extern const char* wsrep_provider_version;
extern const char* wsrep_provider_vendor;
-extern int wsrep_show_status(THD *thd, SHOW_VAR *var, char *buff);
-extern void wsrep_free_status(THD *thd);
// Other wsrep global variables
extern my_bool wsrep_inited; // whether wsrep is initialized ?
-#define WSREP_SST_ADDRESS_AUTO "AUTO"
-#define WSREP_NODE_INCOMING_AUTO "AUTO"
-
-// MySQL variables funcs
-
-#define CHECK_ARGS (sys_var *self, THD* thd, set_var *var)
-#define UPDATE_ARGS (sys_var *self, THD* thd, enum_var_type type)
-#define DEFAULT_ARGS (THD* thd, enum_var_type var_type)
-#define INIT_ARGS (const char* opt)
-
-extern int wsrep_init_vars();
-
-extern bool wsrep_on_update UPDATE_ARGS;
-extern void wsrep_causal_reads_update UPDATE_ARGS;
-extern bool wsrep_start_position_check CHECK_ARGS;
-extern bool wsrep_start_position_update UPDATE_ARGS;
-extern void wsrep_start_position_init INIT_ARGS;
-
-extern bool wsrep_provider_check CHECK_ARGS;
-extern bool wsrep_provider_update UPDATE_ARGS;
-extern void wsrep_provider_init INIT_ARGS;
-
-extern bool wsrep_provider_options_check CHECK_ARGS;
-extern bool wsrep_provider_options_update UPDATE_ARGS;
-extern void wsrep_provider_options_init INIT_ARGS;
-
-extern bool wsrep_cluster_address_check CHECK_ARGS;
-extern bool wsrep_cluster_address_update UPDATE_ARGS;
-extern void wsrep_cluster_address_init INIT_ARGS;
-
-extern bool wsrep_cluster_name_check CHECK_ARGS;
-extern bool wsrep_cluster_name_update UPDATE_ARGS;
-
-extern bool wsrep_node_name_check CHECK_ARGS;
-extern bool wsrep_node_name_update UPDATE_ARGS;
-
-extern bool wsrep_node_address_check CHECK_ARGS;
-extern bool wsrep_node_address_update UPDATE_ARGS;
-extern void wsrep_node_address_init INIT_ARGS;
+int wsrep_show_status(THD *thd, SHOW_VAR *var, char *buff);
+void wsrep_free_status(THD *thd);
-extern bool wsrep_sst_method_check CHECK_ARGS;
-extern bool wsrep_sst_method_update UPDATE_ARGS;
-extern void wsrep_sst_method_init INIT_ARGS;
-
-extern bool wsrep_sst_receive_address_check CHECK_ARGS;
-extern bool wsrep_sst_receive_address_update UPDATE_ARGS;
-
-extern bool wsrep_sst_auth_check CHECK_ARGS;
-extern bool wsrep_sst_auth_update UPDATE_ARGS;
-extern void wsrep_sst_auth_init INIT_ARGS;
-
-extern bool wsrep_sst_donor_check CHECK_ARGS;
-extern bool wsrep_sst_donor_update UPDATE_ARGS;
-
-extern bool wsrep_slave_threads_check CHECK_ARGS;
-extern bool wsrep_slave_threads_update UPDATE_ARGS;
-
-extern bool wsrep_desync_check CHECK_ARGS;
-extern bool wsrep_desync_update UPDATE_ARGS;
-
-extern bool wsrep_before_SE(); // initialize wsrep before storage
- // engines (true) or after (false)
-extern int wsrep_init();
-extern void wsrep_deinit();
-extern void wsrep_recover();
+/* Filters out --wsrep-new-cluster oprtion from argv[]
+ * should be called in the very beginning of main() */
+void wsrep_filter_new_cluster (int* argc, char* argv[]);
+int wsrep_init();
+void wsrep_deinit();
+void wsrep_recover();
+bool wsrep_before_SE(); // initialize wsrep before storage
+ // engines (true) or after (false)
+/* wsrep initialization sequence at startup
+ * @param before wsrep_before_SE() value */
+void wsrep_init_startup(bool before);
extern "C" enum wsrep_exec_mode wsrep_thd_exec_mode(THD *thd);
@@ -194,18 +137,18 @@ extern "C" enum wsrep_query_state wsrep_thd_query_state(THD *thd);
extern "C" const char * wsrep_thd_exec_mode_str(THD *thd);
extern "C" const char * wsrep_thd_conflict_state_str(THD *thd);
extern "C" const char * wsrep_thd_query_state_str(THD *thd);
-extern "C" wsrep_trx_handle_t* wsrep_thd_trx_handle(THD *thd);
+extern "C" wsrep_ws_handle_t* wsrep_thd_ws_handle(THD *thd);
extern "C" void wsrep_thd_set_exec_mode(THD *thd, enum wsrep_exec_mode mode);
extern "C" void wsrep_thd_set_query_state(
- THD *thd, enum wsrep_query_state state);
+ THD *thd, enum wsrep_query_state state);
extern "C" void wsrep_thd_set_conflict_state(
- THD *thd, enum wsrep_conflict_state state);
+ THD *thd, enum wsrep_conflict_state state);
extern "C" void wsrep_thd_set_trx_to_replay(THD *thd, uint64 trx_id);
-extern "C"void wsrep_thd_LOCK(THD *thd);
-extern "C"void wsrep_thd_UNLOCK(THD *thd);
+extern "C" void wsrep_thd_LOCK(THD *thd);
+extern "C" void wsrep_thd_UNLOCK(THD *thd);
extern "C" uint32 wsrep_thd_wsrep_rand(THD *thd);
extern "C" time_t wsrep_thd_query_start(THD *thd);
extern "C" my_thread_id wsrep_thd_thread_id(THD *thd);
@@ -217,18 +160,11 @@ extern "C" void wsrep_thd_set_wsrep_last_query_id(THD *thd, query_id_t id);
extern "C" void wsrep_thd_awake(THD* bf_thd, THD *thd, my_bool signal);
-
-/* wsrep initialization sequence at startup
- * @param first wsrep_before_SE() value */
-extern void wsrep_init_startup(bool before);
-
extern void wsrep_close_client_connections(my_bool wait_to_end);
extern int wsrep_wait_committing_connections_close(int wait_time);
extern void wsrep_close_applier(THD *thd);
extern void wsrep_wait_appliers_close(THD *thd);
extern void wsrep_close_applier_threads(int count);
-extern void wsrep_create_appliers(long threads = wsrep_slave_threads);
-extern void wsrep_create_rollbacker();
extern void wsrep_kill_mysql(THD *thd);
/* new defines */
@@ -285,32 +221,20 @@ extern wsrep_seqno_t wsrep_locked_seqno;
WSREP_LOG(sql_print_information, "cluster conflict due to %s for threads:",\
(bf_abort) ? "high priority abort" : "certification failure" \
); \
- if (bf_thd != NULL) WSREP_LOG_CONFLICT_THD(bf_thd, "Winning thread"); \
+ if (bf_thd) WSREP_LOG_CONFLICT_THD(bf_thd, "Winning thread"); \
if (victim_thd) WSREP_LOG_CONFLICT_THD(victim_thd, "Victim thread"); \
}
#define WSREP_PROVIDER_EXISTS \
(wsrep_provider && strncasecmp(wsrep_provider, WSREP_NONE, FN_REFLEN))
-/*! Synchronizes applier thread start with init thread */
-extern void wsrep_sst_grab();
-/*! Init thread waits for SST completion */
-extern bool wsrep_sst_wait();
-/*! Signals wsrep that initialization is complete, writesets can be applied */
-extern void wsrep_sst_continue();
-
-extern void wsrep_SE_init_grab(); /*! grab init critical section */
-extern void wsrep_SE_init_wait(); /*! wait for SE init to complete */
-extern void wsrep_SE_init_done(); /*! signal that SE init is complte */
-extern void wsrep_SE_initialized(); /*! mark SE initialization complete */
-
extern void wsrep_ready_wait();
enum wsrep_trx_status {
WSREP_TRX_OK,
WSREP_TRX_ROLLBACK,
WSREP_TRX_ERROR,
- };
+};
extern enum wsrep_trx_status
wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all);
@@ -318,17 +242,10 @@ class Ha_trx_info;
struct THD_TRANS;
void wsrep_register_hton(THD* thd, bool all);
void wsrep_post_commit(THD* thd, bool all);
-void wsrep_replication_process(THD *thd);
-void wsrep_rollback_process(THD *thd);
void wsrep_brute_force_killer(THD *thd);
int wsrep_hire_brute_force_killer(THD *thd, uint64_t trx_id);
+
extern "C" bool wsrep_consistency_check(void *thd_ptr);
-extern "C" int wsrep_thd_is_brute_force(void *thd_ptr);
-extern "C" int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr,
- my_bool signal);
-extern "C" int wsrep_thd_in_locking_session(void *thd_ptr);
-void *wsrep_prepare_bf_thd(THD *thd);
-void wsrep_return_from_bf_mode(void *shadow, THD *thd);
/* this is visible for client build so that innodb plugin gets this */
typedef struct wsrep_aborting_thd {
@@ -337,29 +254,21 @@ typedef struct wsrep_aborting_thd {
} *wsrep_aborting_thd_t;
extern mysql_mutex_t LOCK_wsrep_ready;
-extern mysql_cond_t COND_wsrep_ready;
+extern mysql_cond_t COND_wsrep_ready;
extern mysql_mutex_t LOCK_wsrep_sst;
-extern mysql_cond_t COND_wsrep_sst;
+extern mysql_cond_t COND_wsrep_sst;
extern mysql_mutex_t LOCK_wsrep_sst_init;
-extern mysql_cond_t COND_wsrep_sst_init;
+extern mysql_cond_t COND_wsrep_sst_init;
extern mysql_mutex_t LOCK_wsrep_rollback;
-extern mysql_cond_t COND_wsrep_rollback;
+extern mysql_cond_t COND_wsrep_rollback;
extern int wsrep_replaying;
extern mysql_mutex_t LOCK_wsrep_replaying;
-extern mysql_cond_t COND_wsrep_replaying;
-extern wsrep_aborting_thd_t wsrep_aborting_thd;
-extern MYSQL_PLUGIN_IMPORT my_bool wsrep_debug;
-extern my_bool wsrep_convert_LOCK_to_trx;
-extern ulong wsrep_retry_autocommit;
-extern my_bool wsrep_emulate_bin_log;
-extern my_bool wsrep_auto_increment_control;
-extern my_bool wsrep_drupal_282555_workaround;
-extern long long wsrep_max_ws_size;
-extern long wsrep_max_ws_rows;
-extern int wsrep_to_isolation;
-extern my_bool wsrep_certify_nonPK;
+extern mysql_cond_t COND_wsrep_replaying;
extern mysql_mutex_t LOCK_wsrep_slave_threads;
extern mysql_mutex_t LOCK_wsrep_desync;
+extern wsrep_aborting_thd_t wsrep_aborting_thd;
+extern my_bool wsrep_emulate_bin_log;
+extern int wsrep_to_isolation;
extern PSI_mutex_key key_LOCK_wsrep_ready;
extern PSI_mutex_key key_COND_wsrep_ready;
@@ -381,13 +290,11 @@ int wsrep_to_isolation_begin(THD *thd, char *db_, char *table_,
const TABLE_LIST* table_list);
void wsrep_to_isolation_end(THD *thd);
void wsrep_cleanup_transaction(THD *thd);
-void wsrep_prepare_bf_thd(THD *thd, struct wsrep_thd_shadow*);
-void wsrep_return_from_bf_mode(THD *thd, struct wsrep_thd_shadow*);
int wsrep_to_buf_helper(
- THD* thd, const char *query, uint query_len, uchar** buf, uint* buf_len);
-int wsrep_create_sp(THD *thd, uchar** buf, uint* buf_len);
-int wsrep_create_trigger_query(THD *thd, uchar** buf, uint* buf_len);
-int wsrep_create_event_query(THD *thd, uchar** buf, uint* buf_len);
+ THD* thd, const char *query, uint query_len, uchar** buf, size_t* buf_len);
+int wsrep_create_sp(THD *thd, uchar** buf, size_t* buf_len);
+int wsrep_create_trigger_query(THD *thd, uchar** buf, size_t* buf_len);
+int wsrep_create_event_query(THD *thd, uchar** buf, size_t* buf_len);
const wsrep_uuid_t* wsrep_cluster_uuid();
struct xid_t;
diff --git a/sql/wsrep_notify.cc b/sql/wsrep_notify.cc
index ff997d01183..291cdbb7c75 100644
--- a/sql/wsrep_notify.cc
+++ b/sql/wsrep_notify.cc
@@ -15,6 +15,7 @@
#include <mysqld.h>
#include "wsrep_priv.h"
+#include "wsrep_utils.h"
const char* wsrep_notify_cmd="";
@@ -64,7 +65,7 @@ void wsrep_notify_status (wsrep_member_status_t status,
{
char uuid_str[40];
- wsrep_uuid_print (&view->uuid, uuid_str, sizeof(uuid_str));
+ wsrep_uuid_print (&view->state_id.uuid, uuid_str, sizeof(uuid_str));
cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off,
" --uuid %s", uuid_str);
diff --git a/sql/wsrep_priv.h b/sql/wsrep_priv.h
index 700639ebcb1..93640fbcc03 100644
--- a/sql/wsrep_priv.h
+++ b/sql/wsrep_priv.h
@@ -26,208 +26,26 @@
#include <pthread.h>
#include <cstdio>
-extern void wsrep_ready_set (my_bool x);
+void wsrep_ready_set (my_bool x);
-extern ssize_t wsrep_sst_prepare (void** msg);
-extern int wsrep_sst_donate_cb (void* app_ctx,
- void* recv_ctx,
- const void* msg, size_t msg_len,
- const wsrep_uuid_t* current_uuid,
- wsrep_seqno_t current_seqno,
- const char* state, size_t state_len,
- bool bypass);
-
-extern size_t guess_ip (char* buf, size_t buf_len);
-extern size_t guess_address(char* buf, size_t buf_len);
+ssize_t wsrep_sst_prepare (void** msg);
+wsrep_cb_status wsrep_sst_donate_cb (void* app_ctx,
+ void* recv_ctx,
+ const void* msg, size_t msg_len,
+ const wsrep_gtid_t* state_id,
+ const char* state, size_t state_len,
+ bool bypass);
extern wsrep_uuid_t local_uuid;
extern wsrep_seqno_t local_seqno;
+// a helper function
+void wsrep_sst_received(wsrep_t*, const wsrep_uuid_t*, wsrep_seqno_t,
+ const void*, size_t);
/*! SST thread signals init thread about sst completion */
-extern void wsrep_sst_complete(const wsrep_uuid_t* uuid, wsrep_seqno_t, bool);
-
-extern void wsrep_notify_status (wsrep_member_status_t new_status,
- const wsrep_view_info_t* view = 0);
-
-namespace wsp {
-class node_status
-{
-public:
- node_status() : status(WSREP_MEMBER_UNDEFINED) {}
- void set(wsrep_member_status_t new_status,
- const wsrep_view_info_t* view = 0)
- {
- if (status != new_status || 0 != view)
- {
- wsrep_notify_status(new_status, view);
- status = new_status;
- }
- }
- wsrep_member_status_t get() const { return status; }
-private:
- wsrep_member_status_t status;
-};
-} /* namespace wsp */
-
-extern wsp::node_status local_status;
-
-namespace wsp {
-/* A small class to run external programs. */
-class process
-{
-private:
- const char* const str_;
- FILE* io_;
- int err_;
- pid_t pid_;
-
-public:
-/*! @arg type is a pointer to a null-terminated string which must contain
- either the letter 'r' for reading or the letter 'w' for writing.
- */
- process (const char* cmd, const char* type);
- ~process ();
-
- FILE* pipe () { return io_; }
- int error() { return err_; }
- int wait ();
- const char* cmd() { return str_; }
-};
-#ifdef REMOVED
-class lock
-{
- pthread_mutex_t* const mtx_;
-
-public:
-
- lock (pthread_mutex_t* mtx) : mtx_(mtx)
- {
- int err = pthread_mutex_lock (mtx_);
-
- if (err)
- {
- WSREP_ERROR("Mutex lock failed: %s", strerror(err));
- abort();
- }
- }
-
- virtual ~lock ()
- {
- int err = pthread_mutex_unlock (mtx_);
-
- if (err)
- {
- WSREP_ERROR("Mutex unlock failed: %s", strerror(err));
- abort();
- }
- }
-
- inline void wait (pthread_cond_t* cond)
- {
- pthread_cond_wait (cond, mtx_);
- }
-
-private:
-
- lock (const lock&);
- lock& operator=(const lock&);
-
-};
-
-class monitor
-{
- int mutable refcnt;
- pthread_mutex_t mutable mtx;
- pthread_cond_t mutable cond;
-
-public:
-
- monitor() : refcnt(0)
- {
- pthread_mutex_init (&mtx, NULL);
- pthread_cond_init (&cond, NULL);
- }
-
- ~monitor()
- {
- pthread_mutex_destroy (&mtx);
- pthread_cond_destroy (&cond);
- }
-
- void enter() const
- {
- lock l(&mtx);
-
- while (refcnt)
- {
- l.wait(&cond);
- }
- refcnt++;
- }
-
- void leave() const
- {
- lock l(&mtx);
-
- refcnt--;
- if (refcnt == 0)
- {
- pthread_cond_signal (&cond);
- }
- }
-
-private:
-
- monitor (const monitor&);
- monitor& operator= (const monitor&);
-};
-
-class critical
-{
- const monitor& mon;
-
-public:
-
- critical(const monitor& m) : mon(m) { mon.enter(); }
-
- ~critical() { mon.leave(); }
-
-private:
-
- critical (const critical&);
- critical& operator= (const critical&);
-};
-#endif
-
-class thd
-{
- class thd_init
- {
- public:
- thd_init() { my_thread_init(); }
- ~thd_init() { my_thread_end(); }
- }
- init;
-
- thd (const thd&);
- thd& operator= (const thd&);
-
-public:
-
- thd(my_bool wsrep_on);
- ~thd();
- THD* const ptr;
-};
+void wsrep_sst_complete(const wsrep_uuid_t*, wsrep_seqno_t, bool);
-class string
-{
-public:
- string() : string_(0) {}
- void set(char* str) { if (string_) free (string_); string_ = str; }
- ~string() { set (0); }
-private:
- char* string_;
-};
+void wsrep_notify_status (wsrep_member_status_t new_status,
+ const wsrep_view_info_t* view = 0);
-} // namespace wsrep
#endif /* WSREP_PRIV_H */
diff --git a/sql/wsrep_sst.cc b/sql/wsrep_sst.cc
index 7afdb4909e4..ec636a8b1ec 100644
--- a/sql/wsrep_sst.cc
+++ b/sql/wsrep_sst.cc
@@ -13,6 +13,8 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+#include "wsrep_sst.h"
+
#include <mysqld.h>
#include <m_ctype.h>
#include <my_sys.h>
@@ -23,6 +25,7 @@
#include <sql_reload.h>
#include <sql_parse.h>
#include "wsrep_priv.h"
+#include "wsrep_utils.h"
#include <cstdio>
#include <cstdlib>
@@ -61,7 +64,6 @@ const char* wsrep_sst_donor = "";
// container for real auth string
static const char* sst_auth_real = NULL;
-
my_bool wsrep_sst_donor_rejects_queries = FALSE;
bool wsrep_sst_method_check (sys_var *self, THD* thd, set_var* var)
@@ -214,8 +216,8 @@ bool wsrep_sst_wait ()
// Signal end of SST
void wsrep_sst_complete (const wsrep_uuid_t* sst_uuid,
- wsrep_seqno_t sst_seqno,
- bool needed)
+ wsrep_seqno_t sst_seqno,
+ bool needed)
{
if (mysql_mutex_lock (&LOCK_wsrep_sst)) abort();
if (!sst_complete)
@@ -228,18 +230,37 @@ void wsrep_sst_complete (const wsrep_uuid_t* sst_uuid,
}
else
{
- WSREP_WARN("Nobody is waiting for SST.");
+ /* This can happen when called from wsrep_synced_cb().
+ At the moment there is no way to check there
+ if main thread is still waiting for signal,
+ so wsrep_sst_complete() is called from there
+ each time wsrep_ready changes from FALSE -> TRUE.
+ */
+ WSREP_DEBUG("Nobody is waiting for SST.");
}
mysql_mutex_unlock (&LOCK_wsrep_sst);
}
+void wsrep_sst_received (wsrep_t* const wsrep,
+ const wsrep_uuid_t* const uuid,
+ wsrep_seqno_t const seqno,
+ const void* const state,
+ size_t const state_len)
+{
+ int const rcode(seqno < 0 ? seqno : 0);
+ wsrep_gtid_t const state_id = {
+ *uuid, (rcode ? WSREP_SEQNO_UNDEFINED : seqno)
+ };
+ wsrep->sst_received(wsrep, &state_id, state, state_len, rcode);
+}
+
// Let applier threads to continue
void wsrep_sst_continue ()
{
if (sst_needed)
{
WSREP_INFO("Signalling provider to continue.");
- wsrep->sst_received (wsrep, &local_uuid, local_seqno, NULL, 0);
+ wsrep_sst_received (wsrep, &local_uuid, local_seqno, NULL, 0);
}
}
@@ -519,7 +540,7 @@ ssize_t wsrep_sst_prepare (void** msg)
}
else
{
- ssize_t ret= guess_ip (ip_buf, ip_max);
+ ssize_t ret= wsrep_guess_ip (ip_buf, ip_max);
if (ret && ret < ip_max)
{
@@ -707,7 +728,9 @@ static int sst_donate_mysqldump (const char* addr,
ret= sst_run_shell (cmd_str, 3);
}
- wsrep->sst_sent (wsrep, uuid, ret ? ret : seqno);
+ wsrep_gtid_t const state_id = { *uuid, (ret ? WSREP_SEQNO_UNDEFINED : seqno)};
+
+ wsrep->sst_sent (wsrep, &state_id, ret);
return ret;
}
@@ -927,7 +950,10 @@ wait_signal:
}
// signal to donor that SST is over
- wsrep->sst_sent (wsrep, &ret_uuid, err ? -err : ret_seqno);
+ struct wsrep_gtid const state_id = {
+ ret_uuid, err ? WSREP_SEQNO_UNDEFINED : ret_seqno
+ };
+ wsrep->sst_sent (wsrep, &state_id, -err);
proc.wait();
return NULL;
@@ -981,12 +1007,11 @@ static int sst_donate_other (const char* method,
return arg.err;
}
-int wsrep_sst_donate_cb (void* app_ctx, void* recv_ctx,
- const void* msg, size_t msg_len,
- const wsrep_uuid_t* current_uuid,
- wsrep_seqno_t current_seqno,
- const char* state, size_t state_len,
- bool bypass)
+wsrep_cb_status_t wsrep_sst_donate_cb (void* app_ctx, void* recv_ctx,
+ const void* msg, size_t msg_len,
+ const wsrep_gtid_t* current_gtid,
+ const char* state, size_t state_len,
+ bool bypass)
{
/* This will be reset when sync callback is called.
* Should we set wsrep_ready to FALSE here too? */
@@ -998,20 +1023,20 @@ int wsrep_sst_donate_cb (void* app_ctx, void* recv_ctx,
const char* data = method + method_len + 1;
char uuid_str[37];
- wsrep_uuid_print (current_uuid, uuid_str, sizeof(uuid_str));
+ wsrep_uuid_print (&current_gtid->uuid, uuid_str, sizeof(uuid_str));
int ret;
if (!strcmp (WSREP_SST_MYSQLDUMP, method))
{
- ret = sst_donate_mysqldump (data, current_uuid, uuid_str, current_seqno,
- bypass);
+ ret = sst_donate_mysqldump(data, &current_gtid->uuid, uuid_str,
+ current_gtid->seqno, bypass);
}
else
{
- ret = sst_donate_other (method, data, uuid_str, current_seqno, bypass);
+ ret = sst_donate_other(method, data, uuid_str, current_gtid->seqno,bypass);
}
- return (ret > 0 ? 0 : ret);
+ return (ret > 0 ? WSREP_CB_SUCCESS : WSREP_CB_FAILURE);
}
void wsrep_SE_init_grab()
@@ -1021,7 +1046,10 @@ void wsrep_SE_init_grab()
void wsrep_SE_init_wait()
{
- mysql_cond_wait (&COND_wsrep_sst_init, &LOCK_wsrep_sst_init);
+ while (SE_initialized == false)
+ {
+ mysql_cond_wait (&COND_wsrep_sst_init, &LOCK_wsrep_sst_init);
+ }
mysql_mutex_unlock (&LOCK_wsrep_sst_init);
}
diff --git a/sql/wsrep_sst.h b/sql/wsrep_sst.h
new file mode 100644
index 00000000000..b7f0e26f226
--- /dev/null
+++ b/sql/wsrep_sst.h
@@ -0,0 +1,40 @@
+/* Copyright (C) 2013 Codership Oy <info@codership.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License along
+ with this program; if not, write to the Free Software Foundation, Inc.,
+ 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */
+
+#ifndef WSREP_SST_H
+#define WSREP_SST_H
+
+#include <mysql.h> // my_bool
+
+/* system variables */
+extern const char* wsrep_sst_method;
+extern const char* wsrep_sst_receive_address;
+extern const char* wsrep_sst_donor;
+extern char* wsrep_sst_auth;
+extern my_bool wsrep_sst_donor_rejects_queries;
+
+/*! Synchronizes applier thread start with init thread */
+extern void wsrep_sst_grab();
+/*! Init thread waits for SST completion */
+extern bool wsrep_sst_wait();
+/*! Signals wsrep that initialization is complete, writesets can be applied */
+extern void wsrep_sst_continue();
+
+extern void wsrep_SE_init_grab(); /*! grab init critical section */
+extern void wsrep_SE_init_wait(); /*! wait for SE init to complete */
+extern void wsrep_SE_init_done(); /*! signal that SE init is complte */
+extern void wsrep_SE_initialized(); /*! mark SE initialization complete */
+
+#endif /* WSREP_SST_H */
diff --git a/sql/wsrep_thd.cc b/sql/wsrep_thd.cc
new file mode 100644
index 00000000000..d9c30e501e6
--- /dev/null
+++ b/sql/wsrep_thd.cc
@@ -0,0 +1,464 @@
+/* Copyright (C) 2013 Codership Oy <info@codership.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License along
+ with this program; if not, write to the Free Software Foundation, Inc.,
+ 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */
+
+#include "wsrep_thd.h"
+
+#include "transaction.h"
+#include "rpl_rli.h"
+#include "log_event.h"
+#include "sql_parse.h"
+#include "slave.h" // opt_log_slave_updates
+#include "sql_base.h" // close_thread_tables()
+#include "mysqld.h" // start_wsrep_THD();
+
+static long long wsrep_bf_aborts_counter = 0;
+
+int wsrep_show_bf_aborts (THD *thd, SHOW_VAR *var, char *buff)
+{
+ wsrep_local_bf_aborts = my_atomic_load64(&wsrep_bf_aborts_counter);
+ var->type = SHOW_LONGLONG;
+ var->value = (char*)&wsrep_local_bf_aborts;
+ return 0;
+}
+
+/* must have (&thd->LOCK_wsrep_thd) */
+void wsrep_client_rollback(THD *thd)
+{
+ WSREP_DEBUG("client rollback due to BF abort for (%ld), query: %s",
+ thd->thread_id, thd->query());
+
+ my_atomic_add64(&wsrep_bf_aborts_counter, 1);
+
+ thd->wsrep_conflict_state= ABORTING;
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+ trans_rollback(thd);
+
+ if (thd->locked_tables_mode && thd->lock)
+ {
+ WSREP_DEBUG("unlocking tables for BF abort (%ld)", thd->thread_id);
+ thd->locked_tables_list.unlock_locked_tables(thd);
+ thd->variables.option_bits&= ~(OPTION_TABLE_LOCK);
+ }
+
+ if (thd->global_read_lock.is_acquired())
+ {
+ WSREP_DEBUG("unlocking GRL for BF abort (%ld)", thd->thread_id);
+ thd->global_read_lock.unlock_global_read_lock(thd);
+ }
+
+ /* Release transactional metadata locks. */
+ thd->mdl_context.release_transactional_locks();
+
+ /* release explicit MDL locks */
+ thd->mdl_context.release_explicit_locks();
+
+ if (thd->get_binlog_table_maps())
+ {
+ WSREP_DEBUG("clearing binlog table map for BF abort (%ld)", thd->thread_id);
+ thd->clear_binlog_table_maps();
+ }
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ thd->wsrep_conflict_state= ABORTED;
+}
+
+static Relay_log_info* wsrep_relay_log_init(const char* log_fname)
+{
+ Relay_log_info* rli= new Relay_log_info(false);
+
+ rli->no_storage= true;
+ if (!rli->relay_log.description_event_for_exec)
+ {
+ rli->relay_log.description_event_for_exec=
+ new Format_description_log_event(4);
+ }
+
+ rli->sql_thd= current_thd;
+ return rli;
+}
+
+static void wsrep_prepare_bf_thd(THD *thd, struct wsrep_thd_shadow* shadow)
+{
+ shadow->options = thd->variables.option_bits;
+ shadow->server_status = thd->server_status;
+ shadow->wsrep_exec_mode = thd->wsrep_exec_mode;
+ shadow->vio = thd->net.vio;
+
+ if (opt_log_slave_updates)
+ thd->variables.option_bits|= OPTION_BIN_LOG;
+ else
+ thd->variables.option_bits&= ~(OPTION_BIN_LOG);
+
+ if (!thd->wsrep_rli) thd->wsrep_rli= wsrep_relay_log_init("wsrep_relay");
+
+ thd->wsrep_exec_mode= REPL_RECV;
+ thd->net.vio= 0;
+ thd->clear_error();
+
+ shadow->tx_isolation = thd->variables.tx_isolation;
+ thd->variables.tx_isolation = ISO_READ_COMMITTED;
+ thd->tx_isolation = ISO_READ_COMMITTED;
+
+ shadow->db = thd->db;
+ shadow->db_length = thd->db_length;
+ thd->reset_db(NULL, 0);
+}
+
+static void wsrep_return_from_bf_mode(THD *thd, struct wsrep_thd_shadow* shadow)
+{
+ thd->variables.option_bits = shadow->options;
+ thd->server_status = shadow->server_status;
+ thd->wsrep_exec_mode = shadow->wsrep_exec_mode;
+ thd->net.vio = shadow->vio;
+ thd->variables.tx_isolation = shadow->tx_isolation;
+ thd->reset_db(shadow->db, shadow->db_length);
+}
+
+void wsrep_replay_transaction(THD *thd)
+{
+ /* checking if BF trx must be replayed */
+ if (thd->wsrep_conflict_state== MUST_REPLAY) {
+ if (thd->wsrep_exec_mode!= REPL_RECV) {
+ if (thd->stmt_da->is_sent)
+ {
+ WSREP_ERROR("replay issue, thd has reported status already");
+ }
+ thd->stmt_da->reset_diagnostics_area();
+
+ thd->wsrep_conflict_state= REPLAYING;
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+
+ mysql_reset_thd_for_next_command(thd);
+ thd->killed= NOT_KILLED;
+ close_thread_tables(thd);
+ if (thd->locked_tables_mode && thd->lock)
+ {
+ WSREP_DEBUG("releasing table lock for replaying (%ld)",
+ thd->thread_id);
+ thd->locked_tables_list.unlock_locked_tables(thd);
+ thd->variables.option_bits&= ~(OPTION_TABLE_LOCK);
+ }
+ thd->mdl_context.release_transactional_locks();
+
+ thd_proc_info(thd, "wsrep replaying trx");
+ WSREP_DEBUG("replay trx: %s %lld",
+ thd->query() ? thd->query() : "void",
+ (long long)wsrep_thd_trx_seqno(thd));
+ struct wsrep_thd_shadow shadow;
+ wsrep_prepare_bf_thd(thd, &shadow);
+
+ /* From trans_begin() */
+ thd->variables.option_bits|= OPTION_BEGIN;
+ thd->server_status|= SERVER_STATUS_IN_TRANS;
+
+ int rcode = wsrep->replay_trx(wsrep,
+ &thd->wsrep_ws_handle,
+ (void *)thd);
+
+ wsrep_return_from_bf_mode(thd, &shadow);
+ if (thd->wsrep_conflict_state!= REPLAYING)
+ WSREP_WARN("lost replaying mode: %d", thd->wsrep_conflict_state );
+
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+
+ switch (rcode)
+ {
+ case WSREP_OK:
+ thd->wsrep_conflict_state= NO_CONFLICT;
+ wsrep->post_commit(wsrep, &thd->wsrep_ws_handle);
+ WSREP_DEBUG("trx_replay successful for: %ld %llu",
+ thd->thread_id, (long long)thd->real_id);
+ if (thd->stmt_da->is_sent)
+ {
+ WSREP_WARN("replay ok, thd has reported status");
+ }
+ else if (thd->stmt_da->is_set())
+ {
+ if (thd->stmt_da->status() != Diagnostics_area::DA_OK)
+ {
+ WSREP_WARN("replay ok, thd has error status %d",
+ thd->stmt_da->status());
+ }
+ }
+ else
+ {
+ my_ok(thd);
+ }
+ break;
+ case WSREP_TRX_FAIL:
+ if (thd->stmt_da->is_sent)
+ {
+ WSREP_ERROR("replay failed, thd has reported status");
+ }
+ else
+ {
+ WSREP_DEBUG("replay failed, rolling back");
+ my_error(ER_LOCK_DEADLOCK, MYF(0), "wsrep aborted transaction");
+ }
+ thd->wsrep_conflict_state= ABORTED;
+ wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle);
+ break;
+ default:
+ WSREP_ERROR("trx_replay failed for: %d, query: %s",
+ rcode, thd->query() ? thd->query() : "void");
+ /* we're now in inconsistent state, must abort */
+ unireg_abort(1);
+ break;
+ }
+
+ wsrep_cleanup_transaction(thd);
+
+ mysql_mutex_lock(&LOCK_wsrep_replaying);
+ wsrep_replaying--;
+ WSREP_DEBUG("replaying decreased: %d, thd: %lu",
+ wsrep_replaying, thd->thread_id);
+ mysql_cond_broadcast(&COND_wsrep_replaying);
+ mysql_mutex_unlock(&LOCK_wsrep_replaying);
+ }
+ }
+}
+
+static void wsrep_replication_process(THD *thd)
+{
+ int rcode;
+ DBUG_ENTER("wsrep_replication_process");
+
+ struct wsrep_thd_shadow shadow;
+ wsrep_prepare_bf_thd(thd, &shadow);
+
+ /* From trans_begin() */
+ thd->variables.option_bits|= OPTION_BEGIN;
+ thd->server_status|= SERVER_STATUS_IN_TRANS;
+
+ rcode = wsrep->recv(wsrep, (void *)thd);
+ DBUG_PRINT("wsrep",("wsrep_repl returned: %d", rcode));
+
+ WSREP_INFO("applier thread exiting (code:%d)", rcode);
+
+ switch (rcode) {
+ case WSREP_OK:
+ case WSREP_NOT_IMPLEMENTED:
+ case WSREP_CONN_FAIL:
+ /* provider does not support slave operations / disconnected from group,
+ * just close applier thread */
+ break;
+ case WSREP_NODE_FAIL:
+ /* data inconsistency => SST is needed */
+ /* Note: we cannot just blindly restart replication here,
+ * SST might require server restart if storage engines must be
+ * initialized after SST */
+ WSREP_ERROR("node consistency compromised, aborting");
+ wsrep_kill_mysql(thd);
+ break;
+ case WSREP_WARNING:
+ case WSREP_TRX_FAIL:
+ case WSREP_TRX_MISSING:
+ /* these suggests a bug in provider code */
+ WSREP_WARN("bad return from recv() call: %d", rcode);
+ /* fall through to node shutdown */
+ case WSREP_FATAL:
+ /* Cluster connectivity is lost.
+ *
+ * If applier was killed on purpose (KILL_CONNECTION), we
+ * avoid mysql shutdown. This is because the killer will then handle
+ * shutdown processing (or replication restarting)
+ */
+ if (thd->killed != KILL_CONNECTION)
+ {
+ wsrep_kill_mysql(thd);
+ }
+ break;
+ }
+
+ mysql_mutex_lock(&LOCK_thread_count);
+ wsrep_close_applier(thd);
+ mysql_cond_broadcast(&COND_thread_count);
+ mysql_mutex_unlock(&LOCK_thread_count);
+
+ if (thd->temporary_tables)
+ {
+ WSREP_DEBUG("Applier %lu, has temporary tables at exit", thd->thread_id);
+ }
+ wsrep_return_from_bf_mode(thd, &shadow);
+ DBUG_VOID_RETURN;
+}
+
+void wsrep_create_appliers(long threads)
+{
+ if (!wsrep_connected)
+ {
+ /* see wsrep_replication_start() for the logic */
+ if (wsrep_cluster_address && strlen(wsrep_cluster_address) &&
+ wsrep_provider && strcasecmp(wsrep_provider, "none"))
+ {
+ WSREP_ERROR("Trying to launch slave threads before creating "
+ "connection at '%s'", wsrep_cluster_address);
+ assert(0);
+ }
+ return;
+ }
+
+ long wsrep_threads=0;
+ pthread_t hThread;
+ while (wsrep_threads++ < threads) {
+ if (pthread_create(
+ &hThread, &connection_attrib,
+ start_wsrep_THD, (void*)wsrep_replication_process))
+ WSREP_WARN("Can't create thread to manage wsrep replication");
+ }
+}
+
+static void wsrep_rollback_process(THD *thd)
+{
+ DBUG_ENTER("wsrep_rollback_process");
+
+ mysql_mutex_lock(&LOCK_wsrep_rollback);
+ wsrep_aborting_thd= NULL;
+
+ while (thd->killed == NOT_KILLED) {
+ thd_proc_info(thd, "wsrep aborter idle");
+ thd->mysys_var->current_mutex= &LOCK_wsrep_rollback;
+ thd->mysys_var->current_cond= &COND_wsrep_rollback;
+
+ mysql_cond_wait(&COND_wsrep_rollback,&LOCK_wsrep_rollback);
+
+ WSREP_DEBUG("WSREP rollback thread wakes for signal");
+
+ mysql_mutex_lock(&thd->mysys_var->mutex);
+ thd_proc_info(thd, "wsrep aborter active");
+ thd->mysys_var->current_mutex= 0;
+ thd->mysys_var->current_cond= 0;
+ mysql_mutex_unlock(&thd->mysys_var->mutex);
+
+ /* check for false alarms */
+ if (!wsrep_aborting_thd)
+ {
+ WSREP_DEBUG("WSREP rollback thread has empty abort queue");
+ }
+ /* process all entries in the queue */
+ while (wsrep_aborting_thd) {
+ THD *aborting;
+ wsrep_aborting_thd_t next = wsrep_aborting_thd->next;
+ aborting = wsrep_aborting_thd->aborting_thd;
+ my_free(wsrep_aborting_thd);
+ wsrep_aborting_thd= next;
+ /*
+ * must release mutex, appliers my want to add more
+ * aborting thds in our work queue, while we rollback
+ */
+ mysql_mutex_unlock(&LOCK_wsrep_rollback);
+
+ mysql_mutex_lock(&aborting->LOCK_wsrep_thd);
+ if (aborting->wsrep_conflict_state== ABORTED)
+ {
+ WSREP_DEBUG("WSREP, thd already aborted: %llu state: %d",
+ (long long)aborting->real_id,
+ aborting->wsrep_conflict_state);
+
+ mysql_mutex_unlock(&aborting->LOCK_wsrep_thd);
+ mysql_mutex_lock(&LOCK_wsrep_rollback);
+ continue;
+ }
+ aborting->wsrep_conflict_state= ABORTING;
+
+ mysql_mutex_unlock(&aborting->LOCK_wsrep_thd);
+
+ aborting->store_globals();
+
+ mysql_mutex_lock(&aborting->LOCK_wsrep_thd);
+ wsrep_client_rollback(aborting);
+ WSREP_DEBUG("WSREP rollbacker aborted thd: (%lu %llu)",
+ aborting->thread_id, (long long)aborting->real_id);
+ mysql_mutex_unlock(&aborting->LOCK_wsrep_thd);
+
+ mysql_mutex_lock(&LOCK_wsrep_rollback);
+ }
+ }
+
+ mysql_mutex_unlock(&LOCK_wsrep_rollback);
+ sql_print_information("WSREP: rollbacker thread exiting");
+
+ DBUG_PRINT("wsrep",("wsrep rollbacker thread exiting"));
+ DBUG_VOID_RETURN;
+}
+
+void wsrep_create_rollbacker()
+{
+ if (wsrep_provider && strcasecmp(wsrep_provider, "none"))
+ {
+ pthread_t hThread;
+ /* create rollbacker */
+ if (pthread_create( &hThread, &connection_attrib,
+ start_wsrep_THD, (void*)wsrep_rollback_process))
+ WSREP_WARN("Can't create thread to manage wsrep rollback");
+ }
+}
+
+extern "C"
+int wsrep_thd_is_brute_force(void *thd_ptr)
+{
+ /*
+ Brute force:
+ Appliers and replaying are running in REPL_RECV mode. TOI statements
+ in TOTAL_ORDER mode. Locally committing transaction that has got
+ past wsrep->pre_commit() without error is running in LOCAL_COMMIT mode.
+
+ Everything else is running in LOCAL_STATE and should not be considered
+ brute force.
+ */
+ if (thd_ptr) {
+ switch (((THD *)thd_ptr)->wsrep_exec_mode) {
+ case LOCAL_STATE: return 0;
+ case REPL_RECV: return 1;
+ case TOTAL_ORDER: return 2;
+ case LOCAL_COMMIT: return 3;
+ }
+ }
+ DBUG_ASSERT(0);
+ return 0;
+}
+
+extern "C"
+int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr, my_bool signal)
+{
+ THD *victim_thd = (THD *) victim_thd_ptr;
+ THD *bf_thd = (THD *) bf_thd_ptr;
+ DBUG_ENTER("wsrep_abort_thd");
+
+ if ( (WSREP(bf_thd) ||
+ ( (WSREP_ON || wsrep_OSU_method_options == WSREP_OSU_RSU) &&
+ bf_thd->wsrep_exec_mode == TOTAL_ORDER) ) &&
+ victim_thd)
+ {
+ WSREP_DEBUG("wsrep_abort_thd, by: %llu, victim: %llu", (bf_thd) ?
+ (long long)bf_thd->real_id : 0, (long long)victim_thd->real_id);
+ ha_wsrep_abort_transaction(bf_thd, victim_thd, signal);
+ }
+ else
+ {
+ WSREP_DEBUG("wsrep_abort_thd not effective: %p %p", bf_thd, victim_thd);
+ }
+
+ DBUG_RETURN(1);
+}
+
+extern "C"
+int wsrep_thd_in_locking_session(void *thd_ptr)
+{
+ if (thd_ptr && ((THD *)thd_ptr)->in_lock_tables) {
+ return 1;
+ }
+ return 0;
+}
+
diff --git a/sql/wsrep_thd.h b/sql/wsrep_thd.h
new file mode 100644
index 00000000000..bded13b5684
--- /dev/null
+++ b/sql/wsrep_thd.h
@@ -0,0 +1,32 @@
+/* Copyright (C) 2013 Codership Oy <info@codership.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License along
+ with this program; if not, write to the Free Software Foundation, Inc.,
+ 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */
+
+#ifndef WSREP_THD_H
+#define WSREP_THD_H
+
+#include "sql_class.h"
+
+int wsrep_show_bf_aborts (THD *thd, SHOW_VAR *var, char *buff);
+void wsrep_client_rollback(THD *thd);
+void wsrep_replay_transaction(THD *thd);
+void wsrep_create_appliers(long threads);
+void wsrep_create_rollbacker();
+
+extern "C" int wsrep_thd_is_brute_force(void *thd_ptr);
+extern "C" int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr,
+ my_bool signal);
+extern "C" int wsrep_thd_in_locking_session(void *thd_ptr);
+
+#endif /* WSREP_THD_H */
diff --git a/sql/wsrep_utils.cc b/sql/wsrep_utils.cc
index 53d0f7c449e..90af2fb8156 100644
--- a/sql/wsrep_utils.cc
+++ b/sql/wsrep_utils.cc
@@ -14,20 +14,25 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
-//! @file declares symbols private to wsrep integration layer
+//! @file some utility functions and classes not directly related to replication
#ifndef _GNU_SOURCE
#define _GNU_SOURCE // POSIX_SPAWN_USEVFORK flag
#endif
+#include "wsrep_utils.h"
+#include "wsrep_mysqld.h"
+
+#include <sql_class.h>
+
#include <spawn.h> // posix_spawn()
#include <unistd.h> // pipe()
#include <errno.h> // errno
#include <string.h> // strerror()
#include <sys/wait.h> // waitpid()
-
-#include <sql_class.h>
-#include "wsrep_priv.h"
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h> // getaddrinfo()
extern char** environ; // environment variables
@@ -313,23 +318,69 @@ thd::~thd ()
} // namespace wsp
-extern ulong my_bind_addr;
-extern uint mysqld_port;
+/* Returns INADDR_NONE, INADDR_ANY, INADDR_LOOPBACK or something else */
+unsigned int wsrep_check_ip (const char* const addr)
+{
+ unsigned int ret = INADDR_NONE;
+ struct addrinfo *res, hints;
+
+ memset (&hints, 0, sizeof(hints));
+ hints.ai_flags= AI_PASSIVE/*|AI_ADDRCONFIG*/;
+ hints.ai_socktype= SOCK_STREAM;
+ hints.ai_family= AF_UNSPEC;
-size_t guess_ip (char* buf, size_t buf_len)
+ int gai_ret = getaddrinfo(addr, NULL, &hints, &res);
+ if (0 == gai_ret)
+ {
+ if (AF_INET == res->ai_family) /* IPv4 */
+ {
+ struct sockaddr_in* a= (struct sockaddr_in*)res->ai_addr;
+ ret= htonl(a->sin_addr.s_addr);
+ }
+ else /* IPv6 */
+ {
+ struct sockaddr_in6* a= (struct sockaddr_in6*)res->ai_addr;
+ if (IN6_IS_ADDR_UNSPECIFIED(&a->sin6_addr))
+ ret= INADDR_ANY;
+ else if (IN6_IS_ADDR_LOOPBACK(&a->sin6_addr))
+ ret= INADDR_LOOPBACK;
+ else
+ ret= 0xdeadbeef;
+ }
+ freeaddrinfo (res);
+ }
+ else {
+ WSREP_ERROR ("getaddrinfo() failed on '%s': %d (%s)",
+ addr, gai_ret, gai_strerror(gai_ret));
+ }
+
+ // uint8_t* b= (uint8_t*)&ret;
+ // fprintf (stderr, "########## wsrep_check_ip returning: %hhu.%hhu.%hhu.%hhu\n",
+ // b[0], b[1], b[2], b[3]);
+
+ return ret;
+}
+
+extern const char* my_bind_addr_str;
+extern uint mysqld_port;
+
+size_t wsrep_guess_ip (char* buf, size_t buf_len)
{
size_t ip_len = 0;
- if (htonl(INADDR_NONE) == my_bind_addr) {
- WSREP_ERROR("Networking not configured, cannot receive state transfer.");
- return 0;
- }
+ if (my_bind_addr_str && strlen(my_bind_addr_str))
+ {
+ unsigned int const ip_type= wsrep_check_ip(my_bind_addr_str);
- if (htonl(INADDR_ANY) != my_bind_addr) {
- uint8_t* b = (uint8_t*)&my_bind_addr;
- ip_len = snprintf (buf, buf_len,
- "%hhu.%hhu.%hhu.%hhu", b[0],b[1],b[2],b[3]);
- return ip_len;
+ if (INADDR_NONE == ip_type) {
+ WSREP_ERROR("Networking not configured, cannot receive state transfer.");
+ return 0;
+ }
+
+ if (INADDR_ANY != ip_type) {;
+ strncpy (buf, my_bind_addr_str, buf_len);
+ return strlen(buf);
+ }
}
// mysqld binds to all interfaces - try IP from wsrep_node_address
@@ -400,9 +451,9 @@ size_t guess_ip (char* buf, size_t buf_len)
return ip_len;
}
-size_t guess_address(char* buf, size_t buf_len)
+size_t wsrep_guess_address(char* buf, size_t buf_len)
{
- size_t addr_len = guess_ip (buf, buf_len);
+ size_t addr_len = wsrep_guess_ip (buf, buf_len);
if (addr_len && addr_len < buf_len) {
addr_len += snprintf (buf + addr_len, buf_len - addr_len,
diff --git a/sql/wsrep_utils.h b/sql/wsrep_utils.h
new file mode 100644
index 00000000000..337678238f8
--- /dev/null
+++ b/sql/wsrep_utils.h
@@ -0,0 +1,208 @@
+/* Copyright (C) 2013 Codership Oy <info@codership.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License along
+ with this program; if not, write to the Free Software Foundation, Inc.,
+ 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */
+
+#ifndef WSREP_UTILS_H
+#define WSREP_UTILS_H
+
+#include "wsrep_priv.h"
+
+unsigned int wsrep_check_ip (const char* addr);
+size_t wsrep_guess_ip (char* buf, size_t buf_len);
+size_t wsrep_guess_address(char* buf, size_t buf_len);
+
+namespace wsp {
+class node_status
+{
+public:
+ node_status() : status(WSREP_MEMBER_UNDEFINED) {}
+ void set(wsrep_member_status_t new_status,
+ const wsrep_view_info_t* view = 0)
+ {
+ if (status != new_status || 0 != view)
+ {
+ wsrep_notify_status(new_status, view);
+ status = new_status;
+ }
+ }
+ wsrep_member_status_t get() const { return status; }
+private:
+ wsrep_member_status_t status;
+};
+} /* namespace wsp */
+
+extern wsp::node_status local_status;
+
+namespace wsp {
+/* A small class to run external programs. */
+class process
+{
+private:
+ const char* const str_;
+ FILE* io_;
+ int err_;
+ pid_t pid_;
+
+public:
+/*! @arg type is a pointer to a null-terminated string which must contain
+ either the letter 'r' for reading or the letter 'w' for writing.
+ */
+ process (const char* cmd, const char* type);
+ ~process ();
+
+ FILE* pipe () { return io_; }
+ int error() { return err_; }
+ int wait ();
+ const char* cmd() { return str_; }
+};
+
+class thd
+{
+ class thd_init
+ {
+ public:
+ thd_init() { my_thread_init(); }
+ ~thd_init() { my_thread_end(); }
+ }
+ init;
+
+ thd (const thd&);
+ thd& operator= (const thd&);
+
+public:
+
+ thd(my_bool wsrep_on);
+ ~thd();
+ THD* const ptr;
+};
+
+class string
+{
+public:
+ string() : string_(0) {}
+ void set(char* str) { if (string_) free (string_); string_ = str; }
+ ~string() { set (0); }
+private:
+ char* string_;
+};
+
+#ifdef REMOVED
+class lock
+{
+ pthread_mutex_t* const mtx_;
+
+public:
+
+ lock (pthread_mutex_t* mtx) : mtx_(mtx)
+ {
+ int err = pthread_mutex_lock (mtx_);
+
+ if (err)
+ {
+ WSREP_ERROR("Mutex lock failed: %s", strerror(err));
+ abort();
+ }
+ }
+
+ virtual ~lock ()
+ {
+ int err = pthread_mutex_unlock (mtx_);
+
+ if (err)
+ {
+ WSREP_ERROR("Mutex unlock failed: %s", strerror(err));
+ abort();
+ }
+ }
+
+ inline void wait (pthread_cond_t* cond)
+ {
+ pthread_cond_wait (cond, mtx_);
+ }
+
+private:
+
+ lock (const lock&);
+ lock& operator=(const lock&);
+
+};
+
+class monitor
+{
+ int mutable refcnt;
+ pthread_mutex_t mutable mtx;
+ pthread_cond_t mutable cond;
+
+public:
+
+ monitor() : refcnt(0)
+ {
+ pthread_mutex_init (&mtx, NULL);
+ pthread_cond_init (&cond, NULL);
+ }
+
+ ~monitor()
+ {
+ pthread_mutex_destroy (&mtx);
+ pthread_cond_destroy (&cond);
+ }
+
+ void enter() const
+ {
+ lock l(&mtx);
+
+ while (refcnt)
+ {
+ l.wait(&cond);
+ }
+ refcnt++;
+ }
+
+ void leave() const
+ {
+ lock l(&mtx);
+
+ refcnt--;
+ if (refcnt == 0)
+ {
+ pthread_cond_signal (&cond);
+ }
+ }
+
+private:
+
+ monitor (const monitor&);
+ monitor& operator= (const monitor&);
+};
+
+class critical
+{
+ const monitor& mon;
+
+public:
+
+ critical(const monitor& m) : mon(m) { mon.enter(); }
+
+ ~critical() { mon.leave(); }
+
+private:
+
+ critical (const critical&);
+ critical& operator= (const critical&);
+};
+#endif
+
+} // namespace wsrep
+
+#endif /* WSREP_UTILS_H */
diff --git a/sql/wsrep_var.cc b/sql/wsrep_var.cc
index 5dc9a475e0d..81d436f6116 100644
--- a/sql/wsrep_var.cc
+++ b/sql/wsrep_var.cc
@@ -13,12 +13,15 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+#include "wsrep_var.h"
+
#include <mysqld.h>
#include <sql_class.h>
#include <sql_plugin.h>
#include <set_var.h>
#include <sql_acl.h>
#include "wsrep_priv.h"
+#include "wsrep_thd.h"
#include <my_dir.h>
#include <cstdio>
#include <cstdlib>
@@ -34,8 +37,7 @@ const char* wsrep_node_name = 0;
const char* wsrep_node_address = 0;
const char* wsrep_node_incoming_address = 0;
const char* wsrep_start_position = 0;
-ulong wsrep_OSU_method_options;
-static int wsrep_thread_change = 0;
+ulong wsrep_OSU_method_options;
int wsrep_init_vars()
{
@@ -58,15 +60,6 @@ bool wsrep_on_update (sys_var *self, THD* thd, enum_var_type var_type)
// FIXME: this variable probably should be changed only per session
thd->variables.wsrep_on = global_system_variables.wsrep_on;
}
- else {
- }
-
-#ifdef REMOVED
- if (thd->variables.wsrep_on)
- thd->variables.option_bits |= (OPTION_BIN_LOG);
- else
- thd->variables.option_bits &= ~(OPTION_BIN_LOG);
-#endif
return false;
}
@@ -75,8 +68,6 @@ void wsrep_causal_reads_update (sys_var *self, THD* thd, enum_var_type var_type)
if (var_type == OPT_GLOBAL) {
thd->variables.wsrep_causal_reads = global_system_variables.wsrep_causal_reads;
}
- else {
- }
}
static int wsrep_start_position_verify (const char* start_str)
@@ -146,7 +137,7 @@ bool wsrep_start_position_update (sys_var *self, THD* thd, enum_var_type type)
wsrep_set_local_position (wsrep_start_position);
if (wsrep) {
- wsrep->sst_received (wsrep, &local_uuid, local_seqno, NULL, 0);
+ wsrep_sst_received (wsrep, &local_uuid, local_seqno, NULL, 0);
}
return 0;
@@ -157,7 +148,7 @@ void wsrep_start_position_init (const char* val)
if (NULL == val || wsrep_start_position_verify (val))
{
WSREP_ERROR("Bad initial value for wsrep_start_position: %s",
- (val ? val : ""));
+ (val ? val : ""));
return;
}
@@ -173,7 +164,7 @@ static bool refresh_provider_options()
{
if (wsrep_provider_options) my_free((void *)wsrep_provider_options);
wsrep_provider_options = (char*)my_memdup(opts, strlen(opts) + 1,
- MYF(MY_WME));
+ MYF(MY_WME));
}
else
{
@@ -453,7 +444,7 @@ void wsrep_node_address_init (const char* value)
bool wsrep_slave_threads_check (sys_var *self, THD* thd, set_var* var)
{
mysql_mutex_lock(&LOCK_wsrep_slave_threads);
- wsrep_thread_change = var->value->val_int() - wsrep_slave_threads;
+ wsrep_slave_count_change = var->value->val_int() - wsrep_slave_threads;
mysql_mutex_unlock(&LOCK_wsrep_slave_threads);
return 0;
@@ -461,13 +452,10 @@ bool wsrep_slave_threads_check (sys_var *self, THD* thd, set_var* var)
bool wsrep_slave_threads_update (sys_var *self, THD* thd, enum_var_type type)
{
- if (wsrep_thread_change > 0)
- {
- wsrep_create_appliers(wsrep_thread_change);
- }
- else if (wsrep_thread_change < 0)
+ if (wsrep_slave_count_change > 0)
{
- wsrep_close_applier_threads(-wsrep_thread_change);
+ wsrep_create_appliers(wsrep_slave_count_change);
+ wsrep_slave_count_change = 0;
}
return false;
}
diff --git a/sql/wsrep_var.h b/sql/wsrep_var.h
new file mode 100644
index 00000000000..b69f670a14b
--- /dev/null
+++ b/sql/wsrep_var.h
@@ -0,0 +1,83 @@
+/* Copyright (C) 2013 Codership Oy <info@codership.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License along
+ with this program; if not, write to the Free Software Foundation, Inc.,
+ 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */
+
+#ifndef WSREP_VAR_H
+#define WSREP_VAR_H
+
+#define WSREP_NODE_INCOMING_AUTO "AUTO"
+
+// MySQL variables funcs
+
+#include "sql_priv.h"
+class sys_var;
+class set_var;
+class THD;
+
+int wsrep_init_vars();
+
+#define CHECK_ARGS (sys_var *self, THD* thd, set_var *var)
+#define UPDATE_ARGS (sys_var *self, THD* thd, enum_var_type type)
+#define DEFAULT_ARGS (THD* thd, enum_var_type var_type)
+#define INIT_ARGS (const char* opt)
+
+extern bool wsrep_on_update UPDATE_ARGS;
+extern void wsrep_causal_reads_update UPDATE_ARGS;
+extern bool wsrep_start_position_check CHECK_ARGS;
+extern bool wsrep_start_position_update UPDATE_ARGS;
+extern void wsrep_start_position_init INIT_ARGS;
+
+extern bool wsrep_provider_check CHECK_ARGS;
+extern bool wsrep_provider_update UPDATE_ARGS;
+extern void wsrep_provider_init INIT_ARGS;
+
+extern bool wsrep_provider_options_check CHECK_ARGS;
+extern bool wsrep_provider_options_update UPDATE_ARGS;
+extern void wsrep_provider_options_init INIT_ARGS;
+
+extern bool wsrep_cluster_address_check CHECK_ARGS;
+extern bool wsrep_cluster_address_update UPDATE_ARGS;
+extern void wsrep_cluster_address_init INIT_ARGS;
+
+extern bool wsrep_cluster_name_check CHECK_ARGS;
+extern bool wsrep_cluster_name_update UPDATE_ARGS;
+
+extern bool wsrep_node_name_check CHECK_ARGS;
+extern bool wsrep_node_name_update UPDATE_ARGS;
+
+extern bool wsrep_node_address_check CHECK_ARGS;
+extern bool wsrep_node_address_update UPDATE_ARGS;
+extern void wsrep_node_address_init INIT_ARGS;
+
+extern bool wsrep_sst_method_check CHECK_ARGS;
+extern bool wsrep_sst_method_update UPDATE_ARGS;
+extern void wsrep_sst_method_init INIT_ARGS;
+
+extern bool wsrep_sst_receive_address_check CHECK_ARGS;
+extern bool wsrep_sst_receive_address_update UPDATE_ARGS;
+
+extern bool wsrep_sst_auth_check CHECK_ARGS;
+extern bool wsrep_sst_auth_update UPDATE_ARGS;
+extern void wsrep_sst_auth_init INIT_ARGS;
+
+extern bool wsrep_sst_donor_check CHECK_ARGS;
+extern bool wsrep_sst_donor_update UPDATE_ARGS;
+
+extern bool wsrep_slave_threads_check CHECK_ARGS;
+extern bool wsrep_slave_threads_update UPDATE_ARGS;
+
+extern bool wsrep_desync_check CHECK_ARGS;
+extern bool wsrep_desync_update UPDATE_ARGS;
+
+#endif /* WSREP_VAR_H */