summaryrefslogtreecommitdiff
path: root/sql/sql_class.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/sql_class.cc')
-rw-r--r--sql/sql_class.cc161
1 files changed, 112 insertions, 49 deletions
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index fa2f866a3f6..87e377c1819 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -66,8 +66,11 @@
#include "sql_callback.h"
#include "lock.h"
#include "wsrep_mysqld.h"
-#include "wsrep_thd.h"
#include "sql_connect.h"
+#ifdef WITH_WSREP
+#include "wsrep_thd.h"
+#include "wsrep_trans_observer.h"
+#endif /* WITH_WSREP */
#ifdef HAVE_SYS_SYSCALL_H
#include <sys/syscall.h>
@@ -639,16 +642,42 @@ THD::THD(my_thread_id id, bool is_wsrep_applier, bool skip_global_sys_var_lock)
xid_hash_pins(0),
m_tmp_tables_locked(false)
#ifdef WITH_WSREP
- ,
+ ,
wsrep_applier(is_wsrep_applier),
wsrep_applier_closing(false),
wsrep_client_thread(false),
- wsrep_apply_toi(false),
+ wsrep_retry_counter(0),
+ wsrep_PA_safe(true),
+ wsrep_retry_query(NULL),
+ wsrep_retry_query_len(0),
+ wsrep_retry_command(COM_CONNECT),
+ wsrep_consistency_check(NO_CONSISTENCY_CHECK),
+ wsrep_mysql_replicated(0),
+ wsrep_TOI_pre_query(NULL),
+ wsrep_TOI_pre_query_len(0),
wsrep_po_handle(WSREP_PO_INITIALIZER),
wsrep_po_cnt(0),
wsrep_apply_format(0),
- wsrep_ignore_table(false)
-#endif
+ wsrep_apply_toi(false),
+ wsrep_rbr_buf(NULL),
+ wsrep_sync_wait_gtid(WSREP_GTID_UNDEFINED),
+ wsrep_affected_rows(0),
+ wsrep_has_ignored_error(false),
+ wsrep_replicate_GTID(false),
+ wsrep_ignore_table(false),
+
+/* wsrep-lib */
+ m_wsrep_next_trx_id(WSREP_UNDEFINED_TRX_ID),
+ m_wsrep_mutex(LOCK_thd_data),
+ m_wsrep_cond(COND_wsrep_thd),
+ m_wsrep_client_service(this, m_wsrep_client_state),
+ m_wsrep_client_state(this,
+ m_wsrep_mutex,
+ m_wsrep_cond,
+ Wsrep_server_state::instance(),
+ m_wsrep_client_service,
+ wsrep::client_id(thread_id))
+#endif /*WITH_WSREP */
{
ulong tmp;
bzero(&variables, sizeof(variables));
@@ -771,22 +800,8 @@ THD::THD(my_thread_id id, bool is_wsrep_applier, bool skip_global_sys_var_lock)
*scramble= '\0';
#ifdef WITH_WSREP
- wsrep_ws_handle.trx_id = WSREP_UNDEFINED_TRX_ID;
- wsrep_ws_handle.opaque = NULL;
- wsrep_retry_counter = 0;
- wsrep_PA_safe = true;
- wsrep_retry_query = NULL;
- wsrep_retry_query_len = 0;
- wsrep_retry_command = COM_CONNECT;
- wsrep_consistency_check = NO_CONSISTENCY_CHECK;
- wsrep_mysql_replicated = 0;
- wsrep_TOI_pre_query = NULL;
- wsrep_TOI_pre_query_len = 0;
+ mysql_cond_init(key_COND_wsrep_thd, &COND_wsrep_thd, NULL);
wsrep_info[sizeof(wsrep_info) - 1] = '\0'; /* make sure it is 0-terminated */
- wsrep_sync_wait_gtid = WSREP_GTID_UNDEFINED;
- wsrep_affected_rows = 0;
- wsrep_replicate_GTID = false;
- wsrep_skip_wsrep_GTID = false;
#endif
/* Call to init() below requires fully initialized Open_tables_state. */
reset_open_tables_state(this);
@@ -1049,10 +1064,25 @@ Sql_condition* THD::raise_condition(uint sql_errno,
is_slave_error= 1; // needed to catch query errors during replication
- if (!da->is_error())
+#ifdef WITH_WSREP
+ /*
+ With wsrep we allow converting BF abort error to warning if
+ errors are ignored.
+ */
+ if (!is_fatal_error &&
+ no_errors &&
+ (wsrep_trx().bf_aborted() || wsrep_retry_counter))
{
- set_row_count_func(-1);
- da->set_error_status(sql_errno, msg, sqlstate, ucid, cond);
+ WSREP_DEBUG("BF abort error converted to warning");
+ }
+ else
+#endif /* WITH_WSREP */
+ {
+ if (!da->is_error())
+ {
+ set_row_count_func(-1);
+ da->set_error_status(sql_errno, msg, sqlstate, ucid, cond);
+ }
}
}
@@ -1113,6 +1143,13 @@ void *thd_memdup(MYSQL_THD thd, const void* str, size_t size)
extern "C"
void thd_get_xid(const MYSQL_THD thd, MYSQL_XID *xid)
{
+#ifdef WITH_WSREP
+ if (!thd->wsrep_xid.is_null())
+ {
+ *xid = *(MYSQL_XID *) &thd->wsrep_xid;
+ }
+ else
+#endif /* WITH_WSREP */
*xid = *(MYSQL_XID *) &thd->transaction.xid_state.xid;
}
@@ -1221,12 +1258,9 @@ void THD::init(bool skip_lock)
first_successful_insert_id_in_cur_stmt= 0;
current_backup_stage= BACKUP_FINISHED;
#ifdef WITH_WSREP
- wsrep_exec_mode= wsrep_applier ? REPL_RECV : LOCAL_STATE;
- wsrep_conflict_state= NO_CONFLICT;
- wsrep_query_state= QUERY_IDLE;
wsrep_last_query_id= 0;
- wsrep_trx_meta.gtid= WSREP_GTID_UNDEFINED;
- wsrep_trx_meta.depends_on= WSREP_SEQNO_UNDEFINED;
+ wsrep_xid.null();
+ wsrep_skip_locking= FALSE;
wsrep_converted_lock_session= false;
wsrep_retry_counter= 0;
wsrep_rgi= NULL;
@@ -1235,10 +1269,10 @@ void THD::init(bool skip_lock)
wsrep_mysql_replicated = 0;
wsrep_TOI_pre_query = NULL;
wsrep_TOI_pre_query_len = 0;
- wsrep_sync_wait_gtid = WSREP_GTID_UNDEFINED;
+ wsrep_rbr_buf = NULL;
wsrep_affected_rows = 0;
+ m_wsrep_next_trx_id = WSREP_UNDEFINED_TRX_ID;
wsrep_replicate_GTID = false;
- wsrep_skip_wsrep_GTID = false;
#endif /* WITH_WSREP */
if (variables.sql_log_bin)
@@ -1467,6 +1501,13 @@ void THD::cleanup(void)
#error xid_state in the cache should be replaced by the allocated value
}
#endif
+#ifdef WITH_WSREP
+ if (wsrep_cs().state() != wsrep::client_state::s_none)
+ {
+ wsrep_cs().cleanup();
+ }
+ wsrep_client_thread= false;
+#endif /* WITH_WSREP */
mysql_ha_cleanup(this);
locked_tables_list.unlock_locked_tables(this);
@@ -1587,6 +1628,9 @@ void THD::reset_for_reuse()
#ifdef SIGNAL_WITH_VIO_CLOSE
active_vio = 0;
#endif
+#ifdef WITH_WSREP
+ wsrep_free_status(this);
+#endif /* WITH_WSREP */
}
@@ -1613,15 +1657,21 @@ THD::~THD()
THD is not deleted while they access it. The following mutex_lock
ensures that no one else is using this THD and it's now safe to delete
*/
+ if (WSREP(this)) mysql_mutex_lock(&LOCK_thd_data);
mysql_mutex_lock(&LOCK_thd_kill);
mysql_mutex_unlock(&LOCK_thd_kill);
+ if (WSREP(this)) mysql_mutex_unlock(&LOCK_thd_data);
-#ifdef WITH_WSREP
- delete wsrep_rgi;
-#endif
if (!free_connection_done)
free_connection();
+#ifdef WITH_WSREP
+ if (wsrep_rgi != NULL) {
+ delete wsrep_rgi;
+ wsrep_rgi = NULL;
+ }
+ mysql_cond_destroy(&COND_wsrep_thd);
+#endif
mdl_context.destroy();
free_root(&transaction.mem_root,MYF(0));
@@ -1803,6 +1853,7 @@ void THD::awake_no_mutex(killed_state state_to_set)
DBUG_PRINT("enter", ("this: %p current_thd: %p state: %d",
this, current_thd, (int) state_to_set));
THD_CHECK_SENTRY(this);
+ if (WSREP(this)) mysql_mutex_assert_owner(&LOCK_thd_data);
mysql_mutex_assert_owner(&LOCK_thd_kill);
print_aborted_warning(3, "KILLED");
@@ -1835,7 +1886,8 @@ void THD::awake_no_mutex(killed_state state_to_set)
}
/* Interrupt target waiting inside a storage engine. */
- if (state_to_set != NOT_KILLED)
+ if (IF_WSREP(state_to_set != NOT_KILLED && !wsrep_is_bf_aborted(this),
+ state_to_set != NOT_KILLED))
ha_kill_query(this, thd_kill_level(this));
/* Broadcast a condition to kick the target if it is waiting on it. */
@@ -1988,12 +2040,6 @@ bool THD::notify_shared_lock(MDL_context_owner *ctx_in_use,
if (!thd_table->needs_reopen())
{
signalled|= mysql_lock_abort_for_thread(this, thd_table);
- if (WSREP(this) && wsrep_thd_is_BF(this, FALSE))
- {
- WSREP_DEBUG("remove_table_from_cache: %llu",
- (unsigned long long) this->real_id);
- wsrep_abort_thd((void *)this, (void *)in_use, FALSE);
- }
}
}
}
@@ -2225,12 +2271,6 @@ void THD::cleanup_after_query()
/* reset table map for multi-table update */
table_map_for_update= 0;
m_binlog_invoker= INVOKER_NONE;
-#ifdef WITH_WSREP
- if (TOTAL_ORDER == wsrep_exec_mode)
- {
- wsrep_exec_mode = LOCAL_STATE;
- }
-#endif /* WITH_WSREP */
#ifndef EMBEDDED_LIBRARY
if (rgi_slave)
@@ -2238,7 +2278,6 @@ void THD::cleanup_after_query()
#endif
#ifdef WITH_WSREP
- wsrep_sync_wait_gtid= WSREP_GTID_UNDEFINED;
if (!in_active_multi_stmt_transaction())
wsrep_affected_rows= 0;
#endif /* WITH_WSREP */
@@ -5007,8 +5046,9 @@ extern "C" int thd_binlog_format(const MYSQL_THD thd)
if (WSREP(thd))
{
/* for wsrep binlog format is meaningful also when binlogging is off */
- return (int) thd->wsrep_binlog_format();
+ return (int) WSREP_BINLOG_FORMAT(thd->variables.binlog_format);
}
+
if (mysql_bin_log.is_open() && (thd->variables.option_bits & OPTION_BIN_LOG))
return (int) thd->variables.binlog_format;
return BINLOG_FORMAT_UNSPEC;
@@ -5491,6 +5531,10 @@ void THD::set_query_and_id(char *query_arg, uint32 query_length_arg,
set_query_inner(query_arg, query_length_arg, cs);
mysql_mutex_unlock(&LOCK_thd_data);
query_id= new_query_id;
+#ifdef WITH_WSREP
+ set_wsrep_next_trx_id(query_id);
+ WSREP_DEBUG("assigned new next query and trx id: %lu", wsrep_next_trx_id());
+#endif /* WITH_WSREP */
}
/** Assign a new value to thd->mysys_var. */
@@ -5936,9 +5980,27 @@ int THD::decide_logging_format(TABLE_LIST *tables)
binlogging is off, or if the statement is filtered out from the
binlog by filtering rules.
*/
+#ifdef WITH_WSREP
+ if (WSREP_CLIENT_NNULL(this) && variables.wsrep_trx_fragment_size > 0)
+ {
+ if (!is_current_stmt_binlog_format_row())
+ {
+ my_message(ER_NOT_SUPPORTED_YET,
+ "Streaming replication not supported with "
+ "binlog_format=STATEMENT", MYF(0));
+ DBUG_RETURN(-1);
+ }
+ }
+
+ if ((WSREP_EMULATE_BINLOG_NNULL(this) ||
+ (mysql_bin_log.is_open() && (variables.option_bits & OPTION_BIN_LOG))) &&
+ !(wsrep_binlog_format() == BINLOG_FORMAT_STMT &&
+ !binlog_filter->db_ok(db.str)))
+#else
if (mysql_bin_log.is_open() && (variables.option_bits & OPTION_BIN_LOG) &&
!(wsrep_binlog_format() == BINLOG_FORMAT_STMT &&
!binlog_filter->db_ok(db.str)))
+#endif /* WITH_WSREP */
{
if (is_bulk_op())
@@ -6260,7 +6322,8 @@ int THD::decide_logging_format(TABLE_LIST *tables)
5. Error: Cannot modify table that uses a storage engine
limited to row-logging when binlog_format = STATEMENT
*/
- if (IF_WSREP((!WSREP(this) || wsrep_exec_mode == LOCAL_STATE),1))
+ if (IF_WSREP((!WSREP(this) ||
+ wsrep_cs().mode() == wsrep::client_state::m_local),1))
{
my_error((error= ER_BINLOG_STMT_MODE_AND_ROW_ENGINE), MYF(0), "");
}