summaryrefslogtreecommitdiff
path: root/sql/sql_class.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/sql_class.cc')
-rw-r--r--sql/sql_class.cc251
1 files changed, 244 insertions, 7 deletions
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index d7d0c8d3f68..5b63201a910 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -62,6 +62,9 @@
#include "debug_sync.h"
#include "sql_parse.h" // is_update_query
#include "sql_callback.h"
+#ifdef WITH_WSREP
+#include "wsrep_mysqld.h"
+#endif
#include "sql_connect.h"
/*
@@ -695,6 +698,137 @@ char *thd_security_context(THD *thd, char *buffer, unsigned int length,
return buffer;
}
+#ifdef WITH_WSREP
+extern "C" int wsrep_on(void *thd)
+{
+ return (int)(WSREP(((THD*)thd)));
+}
+extern "C" bool wsrep_thd_is_wsrep_on(THD *thd)
+{
+ return thd->variables.wsrep_on;
+}
+
+extern "C" bool wsrep_consistency_check(void *thd)
+{
+ return ((THD*)thd)->wsrep_consistency_check;
+}
+
+extern "C" void wsrep_thd_set_exec_mode(THD *thd, enum wsrep_exec_mode mode)
+{
+ thd->wsrep_exec_mode= mode;
+}
+extern "C" void wsrep_thd_set_query_state(
+ THD *thd, enum wsrep_query_state state)
+{
+ thd->wsrep_query_state= state;
+}
+extern "C" void wsrep_thd_set_conflict_state(
+ THD *thd, enum wsrep_conflict_state state)
+{
+ thd->wsrep_conflict_state= state;
+}
+
+
+extern "C" enum wsrep_exec_mode wsrep_thd_exec_mode(THD *thd)
+{
+ return thd->wsrep_exec_mode;
+}
+extern "C" enum wsrep_query_state wsrep_thd_query_state(THD *thd)
+{
+ return thd->wsrep_query_state;
+}
+extern "C" enum wsrep_conflict_state wsrep_thd_conflict_state(THD *thd)
+{
+ return thd->wsrep_conflict_state;
+}
+
+extern "C" wsrep_trx_handle_t* wsrep_thd_trx_handle(THD *thd)
+{
+ return &thd->wsrep_trx_handle;
+}
+
+extern "C"void wsrep_thd_LOCK(THD *thd)
+{
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+}
+extern "C"void wsrep_thd_UNLOCK(THD *thd)
+{
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+}
+extern "C" time_t wsrep_thd_query_start(THD *thd)
+{
+ return thd->query_start();
+}
+extern "C" uint32 wsrep_thd_wsrep_rand(THD *thd)
+{
+ return thd->wsrep_rand;
+}
+extern "C" my_thread_id wsrep_thd_thread_id(THD *thd)
+{
+ return thd->thread_id;
+}
+extern "C" wsrep_seqno_t wsrep_thd_trx_seqno(THD *thd)
+{
+ return thd->wsrep_trx_seqno;
+}
+extern "C" query_id_t wsrep_thd_query_id(THD *thd)
+{
+ return thd->query_id;
+}
+extern "C" char *wsrep_thd_query(THD *thd)
+{
+ return thd->query();
+}
+extern "C" query_id_t wsrep_thd_wsrep_last_query_id(THD *thd)
+{
+ return thd->wsrep_last_query_id;
+}
+extern "C" void wsrep_thd_set_wsrep_last_query_id(THD *thd, query_id_t id)
+{
+ thd->wsrep_last_query_id= id;
+}
+extern "C" void wsrep_thd_awake(THD *thd, my_bool signal)
+{
+ if (signal)
+ {
+ mysql_mutex_lock(&thd->LOCK_thd_data);
+ thd->awake(KILL_QUERY);
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
+ }
+ else
+ {
+ mysql_mutex_lock(&LOCK_wsrep_replaying);
+ mysql_cond_broadcast(&COND_wsrep_replaying);
+ mysql_mutex_unlock(&LOCK_wsrep_replaying);
+ }
+}
+
+extern "C" int
+wsrep_trx_order_before(void *thd1, void *thd2)
+{
+ if (((THD*)thd1)->wsrep_trx_seqno < ((THD*)thd2)->wsrep_trx_seqno) {
+ WSREP_DEBUG("BF conflict, order: %lld %lld\n",
+ (long long)((THD*)thd1)->wsrep_trx_seqno,
+ (long long)((THD*)thd2)->wsrep_trx_seqno);
+ return 1;
+ }
+ WSREP_DEBUG("waiting for BF, trx order: %lld %lld\n",
+ (long long)((THD*)thd1)->wsrep_trx_seqno,
+ (long long)((THD*)thd2)->wsrep_trx_seqno);
+ return 0;
+}
+extern "C" int
+wsrep_trx_is_aborting(void *thd_ptr)
+{
+ if (thd_ptr) {
+ if ((((THD *)thd_ptr)->wsrep_conflict_state == MUST_ABORT) ||
+ (((THD *)thd_ptr)->wsrep_conflict_state == ABORTING)) {
+ return 1;
+ }
+ }
+ return 0;
+}
+#endif
/**
Implementation of Drop_table_error_handler::handle_condition().
@@ -723,7 +857,11 @@ bool Drop_table_error_handler::handle_condition(THD *thd,
}
+#ifdef WITH_WSREP
+THD::THD(bool is_applier)
+#else
THD::THD()
+#endif
:Statement(&main_lex, &main_mem_root, STMT_CONVENTIONAL_EXECUTION,
/* statement id */ 0),
rli_fake(0),
@@ -750,6 +888,10 @@ THD::THD()
bootstrap(0),
derived_tables_processing(FALSE),
spcont(NULL),
+#ifdef WITH_WSREP
+ wsrep_applier(is_applier),
+ wsrep_client_thread(0),
+#endif
m_parser_state(NULL),
#if defined(ENABLED_DEBUG_SYNC)
debug_sync_control(0),
@@ -849,6 +991,20 @@ THD::THD()
command=COM_CONNECT;
*scramble= '\0';
+#ifdef WITH_WSREP
+ mysql_mutex_init(key_LOCK_wsrep_thd, &LOCK_wsrep_thd, MY_MUTEX_INIT_FAST);
+ mysql_cond_init(key_COND_wsrep_thd, &COND_wsrep_thd, NULL);
+ wsrep_trx_handle.trx_id= -1;
+ wsrep_trx_handle.opaque= NULL;
+ //wsrep_retry_autocommit= ::wsrep_retry_autocommit;
+ wsrep_retry_counter= 0;
+ wsrep_PA_safe = true;
+ wsrep_seqno_changed= false;
+ wsrep_retry_query = NULL;
+ wsrep_retry_query_len = 0;
+ wsrep_retry_command = COM_CONNECT;
+ wsrep_consistency_check = false;
+#endif
/* Call to init() below requires fully initialized Open_tables_state. */
reset_open_tables_state(this);
@@ -881,6 +1037,13 @@ THD::THD()
my_rnd_init(&rand, tmp + (ulong) &rand, tmp + (ulong) ::global_query_id);
substitute_null_with_insert_id = FALSE;
thr_lock_info_init(&lock_info); /* safety: will be reset after start */
+#ifdef WITH_WSREP
+ lock_info.mysql_thd= (void *)this;
+ lock_info.in_lock_tables= false;
+#ifdef WSREP_PROC_INFO
+ wsrep_info[sizeof(wsrep_info) - 1] = '\0'; /* make sure it is 0-terminated */
+#endif /* WSREP_PROC_INFO */
+#endif /* WITH_WSREP */
m_internal_handler= NULL;
m_binlog_invoker= FALSE;
@@ -1182,7 +1345,19 @@ void THD::init(void)
reset_current_stmt_binlog_format_row();
bzero((char *) &status_var, sizeof(status_var));
bzero((char *) &org_status_var, sizeof(org_status_var));
-
+#ifdef WITH_WSREP
+ wsrep_exec_mode= wsrep_applier ? REPL_RECV : LOCAL_STATE;
+ wsrep_conflict_state= NO_CONFLICT;
+ wsrep_query_state= QUERY_IDLE;
+ wsrep_last_query_id= 0;
+ wsrep_trx_seqno= 0;
+ wsrep_converted_lock_session= false;
+ wsrep_retry_counter= 0;
+ wsrep_rli= NULL;
+ wsrep_PA_safe= true;
+ wsrep_seqno_changed= false;
+ wsrep_consistency_check = false;
+#endif
if (variables.sql_log_bin)
variables.option_bits|= OPTION_BIN_LOG;
else
@@ -1376,6 +1551,12 @@ THD::~THD()
mysql_mutex_unlock(&LOCK_thd_data);
add_to_status(&global_status_var, &status_var);
+#ifdef WITH_WSREP
+ mysql_mutex_lock(&LOCK_wsrep_thd);
+ mysql_mutex_unlock(&LOCK_wsrep_thd);
+ mysql_mutex_destroy(&LOCK_wsrep_thd);
+ if (wsrep_rli) delete wsrep_rli;
+#endif
/* Close connection */
#ifndef EMBEDDED_LIBRARY
if (net.vio)
@@ -1790,6 +1971,13 @@ void THD::cleanup_after_query()
/* reset table map for multi-table update */
table_map_for_update= 0;
m_binlog_invoker= FALSE;
+#ifdef WITH_WSREP
+ if (TOTAL_ORDER == wsrep_exec_mode)
+ {
+ wsrep_exec_mode = LOCAL_STATE;
+ }
+ //wsrep_trx_seqno = 0;
+#endif /* WITH_WSREP */
DBUG_VOID_RETURN;
}
@@ -2205,6 +2393,13 @@ bool sql_exchange::escaped_given(void)
bool select_send::send_result_set_metadata(List<Item> &list, uint flags)
{
bool res;
+#ifdef WITH_WSREP
+ if (WSREP(thd) && thd->wsrep_retry_query)
+ {
+ WSREP_DEBUG("skipping select metadata");
+ return FALSE;
+ }
+#endif /* WITH_WSREP */
if (!(res= thd->protocol->send_result_set_metadata(&list, flags)))
is_result_set_started= 1;
return res;
@@ -3911,8 +4106,13 @@ extern "C" int thd_non_transactional_update(const MYSQL_THD thd)
extern "C" int thd_binlog_format(const MYSQL_THD thd)
{
+#ifdef WITH_WSREP
+ if (((WSREP(thd) && wsrep_emulate_bin_log) || mysql_bin_log.is_open()) &&
+ (thd->variables.option_bits & OPTION_BIN_LOG))
+#else
if (mysql_bin_log.is_open() && (thd->variables.option_bits & OPTION_BIN_LOG))
- return (int) thd->variables.binlog_format;
+#endif
+ return (int) WSREP_FORMAT(thd->variables.binlog_format);
else
return BINLOG_FORMAT_UNSPEC;
}
@@ -4467,7 +4667,7 @@ int THD::decide_logging_format(TABLE_LIST *tables)
binlog by filtering rules.
*/
if (mysql_bin_log.is_open() && (variables.option_bits & OPTION_BIN_LOG) &&
- !(variables.binlog_format == BINLOG_FORMAT_STMT &&
+ !(WSREP_FORMAT(variables.binlog_format) == BINLOG_FORMAT_STMT &&
!binlog_filter->db_ok(db)))
{
/*
@@ -4631,7 +4831,7 @@ int THD::decide_logging_format(TABLE_LIST *tables)
*/
my_error((error= ER_BINLOG_ROW_INJECTION_AND_STMT_ENGINE), MYF(0));
}
- else if (variables.binlog_format == BINLOG_FORMAT_ROW &&
+ else if (WSREP_FORMAT(variables.binlog_format) == BINLOG_FORMAT_ROW &&
sqlcom_can_generate_row_events(this))
{
/*
@@ -4660,7 +4860,7 @@ int THD::decide_logging_format(TABLE_LIST *tables)
else
{
/* binlog_format = STATEMENT */
- if (variables.binlog_format == BINLOG_FORMAT_STMT)
+ if (WSREP_FORMAT(variables.binlog_format) == BINLOG_FORMAT_STMT)
{
if (lex->is_stmt_row_injection())
{
@@ -4677,7 +4877,14 @@ int THD::decide_logging_format(TABLE_LIST *tables)
5. Error: Cannot modify table that uses a storage engine
limited to row-logging when binlog_format = STATEMENT
*/
+#ifdef WITH_WSREP
+ if (!WSREP(this) || wsrep_exec_mode == LOCAL_STATE)
+ {
+#endif /* WITH_WSREP */
my_error((error= ER_BINLOG_STMT_MODE_AND_ROW_ENGINE), MYF(0), "");
+#ifdef WITH_WSREP
+ }
+#endif /* WITH_WSREP */
}
else if (is_write && (unsafe_flags= lex->get_stmt_unsafe_flags()) != 0)
{
@@ -4725,7 +4932,7 @@ int THD::decide_logging_format(TABLE_LIST *tables)
"and binlog_filter->db_ok(db) = %d",
mysql_bin_log.is_open(),
(variables.option_bits & OPTION_BIN_LOG),
- variables.binlog_format,
+ WSREP_FORMAT(variables.binlog_format),
binlog_filter->db_ok(db)));
#endif
@@ -4979,7 +5186,13 @@ int THD::binlog_write_row(TABLE* table, bool is_trans,
MY_BITMAP const* cols, size_t colcnt,
uchar const *record)
{
+#ifdef WITH_WSREP
+ DBUG_ASSERT(is_current_stmt_binlog_format_row() &&
+ ((WSREP(this) && wsrep_emulate_bin_log) ||
+ mysql_bin_log.is_open()));
+#else
DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open());
+#endif
/*
Pack records into format for transfer. We are allocating more
@@ -5009,7 +5222,13 @@ int THD::binlog_update_row(TABLE* table, bool is_trans,
const uchar *before_record,
const uchar *after_record)
{
+#ifdef WITH_WSREP
+ DBUG_ASSERT(is_current_stmt_binlog_format_row() &&
+ ((WSREP(this) && wsrep_emulate_bin_log)
+ || mysql_bin_log.is_open()));
+#else
DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open());
+#endif
size_t const before_maxlen = max_row_length(table, before_record);
size_t const after_maxlen = max_row_length(table, after_record);
@@ -5054,7 +5273,13 @@ int THD::binlog_delete_row(TABLE* table, bool is_trans,
MY_BITMAP const* cols, size_t colcnt,
uchar const *record)
{
+#ifdef WITH_WSREP
+ DBUG_ASSERT(is_current_stmt_binlog_format_row() &&
+ ((WSREP(this) && wsrep_emulate_bin_log)
+ || mysql_bin_log.is_open()));
+#else
DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open());
+#endif
/*
Pack records into format for transfer. We are allocating more
@@ -5085,7 +5310,11 @@ int THD::binlog_remove_pending_rows_event(bool clear_maps,
{
DBUG_ENTER("THD::binlog_remove_pending_rows_event");
+#ifdef WITH_WSREP
+ if (!(WSREP_EMULATE_BINLOG(this) || mysql_bin_log.is_open()))
+#else
if (!mysql_bin_log.is_open())
+#endif
DBUG_RETURN(0);
mysql_bin_log.remove_pending_rows_event(this, is_transactional);
@@ -5104,7 +5333,11 @@ int THD::binlog_flush_pending_rows_event(bool stmt_end, bool is_transactional)
mode: it might be the case that we left row-based mode before
flushing anything (e.g., if we have explicitly locked tables).
*/
+#ifdef WITH_WSREP
+ if (!(WSREP_EMULATE_BINLOG(this) || mysql_bin_log.is_open()))
+#else
if (!mysql_bin_log.is_open())
+#endif
DBUG_RETURN(0);
/*
@@ -5224,8 +5457,12 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg,
DBUG_ENTER("THD::binlog_query");
DBUG_PRINT("enter", ("qtype: %s query: '%-.*s'",
show_query_type(qtype), (int) query_len, query_arg));
+#ifdef WITH_WSREP
+ DBUG_ASSERT(query_arg && (WSREP_EMULATE_BINLOG(this)
+ || mysql_bin_log.is_open()));
+#else
DBUG_ASSERT(query_arg && mysql_bin_log.is_open());
-
+#endif
/*
If we are not in prelocked mode, mysql_unlock_tables() will be
called after this binlog_query(), so we have to flush the pending