summaryrefslogtreecommitdiff
path: root/sql/wsrep_hton.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/wsrep_hton.cc')
-rw-r--r--sql/wsrep_hton.cc116
1 files changed, 74 insertions, 42 deletions
diff --git a/sql/wsrep_hton.cc b/sql/wsrep_hton.cc
index 1676daab5fe..0a2264ac03c 100644
--- a/sql/wsrep_hton.cc
+++ b/sql/wsrep_hton.cc
@@ -1,4 +1,4 @@
-/* Copyright 2008 Codership Oy <http://www.codership.com>
+/* Copyright 2008-2015 Codership Oy <http://www.codership.com>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -19,11 +19,11 @@
#include <sql_class.h>
#include "wsrep_mysqld.h"
#include "wsrep_binlog.h"
+#include "wsrep_xid.h"
#include <cstdio>
#include <cstdlib>
+#include "debug_sync.h"
-extern handlerton *binlog_hton;
-extern int binlog_close_connection(handlerton *hton, THD *thd);
extern ulonglong thd_to_trx_id(THD *thd);
extern "C" int thd_binlog_format(const MYSQL_THD thd);
@@ -37,11 +37,15 @@ enum wsrep_trx_status wsrep_run_wsrep_commit(THD *thd, handlerton *hton,
*/
void wsrep_cleanup_transaction(THD *thd)
{
+ if (!WSREP(thd)) return;
+
if (wsrep_emulate_bin_log) thd_binlog_trx_reset(thd);
thd->wsrep_ws_handle.trx_id= WSREP_UNDEFINED_TRX_ID;
thd->wsrep_trx_meta.gtid= WSREP_GTID_UNDEFINED;
thd->wsrep_trx_meta.depends_on= WSREP_SEQNO_UNDEFINED;
thd->wsrep_exec_mode= LOCAL_STATE;
+ thd->wsrep_affected_rows= 0;
+ thd->wsrep_skip_wsrep_GTID= false;
return;
}
@@ -70,18 +74,30 @@ void wsrep_register_hton(THD* thd, bool all)
{
if (thd->wsrep_exec_mode != TOTAL_ORDER && !thd->wsrep_apply_toi)
{
+ if (thd->wsrep_exec_mode == LOCAL_STATE &&
+ (thd_sql_command(thd) == SQLCOM_OPTIMIZE ||
+ thd_sql_command(thd) == SQLCOM_ANALYZE ||
+ thd_sql_command(thd) == SQLCOM_REPAIR) &&
+ thd->lex->no_write_to_binlog == 1)
+ {
+ WSREP_DEBUG("Skipping wsrep_register_hton for LOCAL sql admin command : %s",
+ thd->query());
+ return;
+ }
+
THD_TRANS *trans=all ? &thd->transaction.all : &thd->transaction.stmt;
for (Ha_trx_info *i= trans->ha_list; WSREP(thd) && i; i = i->next())
{
- if (i->ht()->db_type == DB_TYPE_INNODB)
+ if ((i->ht()->db_type == DB_TYPE_INNODB) ||
+ (i->ht()->db_type == DB_TYPE_TOKUDB))
{
trans_register_ha(thd, all, wsrep_hton);
/* follow innodb read/write settting
- * but, as an exception: CTAS with empty result set will not be
+ * but, as an exception: CTAS with empty result set will not be
* replicated unless we declare wsrep hton as read/write here
*/
- if (i->is_trx_read_write() ||
+ if (i->is_trx_read_write() ||
(thd->lex->sql_command == SQLCOM_CREATE_TABLE &&
thd->wsrep_exec_mode == LOCAL_STATE))
{
@@ -110,7 +126,7 @@ void wsrep_post_commit(THD* thd, bool all)
{
DBUG_PRINT("wsrep", ("set committed fail"));
WSREP_WARN("set committed fail: %llu %d",
- (long long)thd->real_id, thd->stmt_da->status());
+ (long long)thd->real_id, thd->get_stmt_da()->status());
}
wsrep_cleanup_transaction(thd);
break;
@@ -132,7 +148,7 @@ void wsrep_post_commit(THD* thd, bool all)
wsrep && wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle))
{
WSREP_WARN("post_rollback fail: %llu %d",
- (long long)thd->thread_id, thd->stmt_da->status());
+ (long long)thd->thread_id, thd->get_stmt_da()->status());
}
wsrep_cleanup_transaction(thd);
break;
@@ -157,10 +173,7 @@ wsrep_close_connection(handlerton* hton, THD* thd)
{
DBUG_RETURN(0);
}
-
- if (wsrep_emulate_bin_log && thd_get_ha_data(thd, binlog_hton) != NULL)
- binlog_hton->close_connection (binlog_hton, thd);
- DBUG_RETURN(0);
+ DBUG_RETURN(wsrep_binlog_close_connection (thd));
}
/*
@@ -203,10 +216,11 @@ static int wsrep_savepoint_set(handlerton *hton, THD *thd, void *sv)
DBUG_RETURN(0);
}
- if (!wsrep_emulate_bin_log) return 0;
- int rcode = binlog_hton->savepoint_set(binlog_hton, thd, sv);
- return rcode;
+ if (!wsrep_emulate_bin_log) DBUG_RETURN(0);
+ int rcode = wsrep_binlog_savepoint_set(thd, sv);
+ DBUG_RETURN(rcode);
}
+
static int wsrep_savepoint_rollback(handlerton *hton, THD *thd, void *sv)
{
DBUG_ENTER("wsrep_savepoint_rollback");
@@ -216,9 +230,9 @@ static int wsrep_savepoint_rollback(handlerton *hton, THD *thd, void *sv)
DBUG_RETURN(0);
}
- if (!wsrep_emulate_bin_log) return 0;
- int rcode = binlog_hton->savepoint_rollback(binlog_hton, thd, sv);
- return rcode;
+ if (!wsrep_emulate_bin_log) DBUG_RETURN(0);
+ int rcode = wsrep_binlog_savepoint_rollback(thd, sv);
+ DBUG_RETURN(rcode);
}
static int wsrep_rollback(handlerton *hton, THD *thd, bool all)
@@ -231,14 +245,25 @@ static int wsrep_rollback(handlerton *hton, THD *thd, bool all)
}
mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ switch (thd->wsrep_exec_mode)
+ {
+ case TOTAL_ORDER:
+ case REPL_RECV:
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+ WSREP_DEBUG("Avoiding wsrep rollback for failed DDL: %s", thd->query());
+ DBUG_RETURN(0);
+ default: break;
+ }
+
if ((all || !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) &&
- (thd->variables.wsrep_on && thd->wsrep_conflict_state != MUST_REPLAY))
+ thd->wsrep_conflict_state != MUST_REPLAY)
{
- if (wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle))
+ if (WSREP(thd) && wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle))
{
DBUG_PRINT("wsrep", ("setting rollback fail"));
- WSREP_ERROR("settting rollback fail: thd: %llu SQL: %s",
- (long long)thd->real_id, thd->query());
+ WSREP_ERROR("settting rollback fail: thd: %llu, schema: %s, SQL: %s",
+ (long long)thd->real_id, (thd->db ? thd->db : "(null)"),
+ thd->query());
}
wsrep_cleanup_transaction(thd);
}
@@ -274,13 +299,12 @@ int wsrep_commit(handlerton *hton, THD *thd, bool all)
Transaction didn't go through wsrep->pre_commit() so just roll back
possible changes to clean state.
*/
- if (WSREP_PROVIDER_EXISTS) {
- if (wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle))
- {
- DBUG_PRINT("wsrep", ("setting rollback fail"));
- WSREP_ERROR("settting rollback fail: thd: %llu SQL: %s",
- (long long)thd->real_id, thd->query());
- }
+ if (WSREP(thd) && wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle))
+ {
+ DBUG_PRINT("wsrep", ("setting rollback fail"));
+ WSREP_ERROR("settting rollback fail: thd: %llu, schema: %s, SQL: %s",
+ (long long)thd->real_id, (thd->db ? thd->db : "(null)"),
+ thd->query());
}
wsrep_cleanup_transaction(thd);
}
@@ -301,13 +325,15 @@ wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all)
IO_CACHE *cache;
int replay_round= 0;
- if (thd->stmt_da->is_error()) {
- WSREP_ERROR("commit issue, error: %d %s",
- thd->stmt_da->sql_errno(), thd->stmt_da->message());
+ if (thd->get_stmt_da()->is_error()) {
+ WSREP_DEBUG("commit issue, error: %d %s",
+ thd->get_stmt_da()->sql_errno(), thd->get_stmt_da()->message());
}
DBUG_ENTER("wsrep_run_wsrep_commit");
+ DEBUG_SYNC(thd, "wsrep_before_replication");
+
if (thd->slave_thread && !opt_log_slave_updates) DBUG_RETURN(WSREP_TRX_OK);
if (thd->wsrep_exec_mode == REPL_RECV) {
@@ -353,7 +379,7 @@ wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all)
while (wsrep_replaying > 0 &&
thd->wsrep_conflict_state == NO_CONFLICT &&
- thd->killed == NOT_KILLED &&
+ thd->killed == NOT_KILLED &&
!shutdown_in_progress)
{
@@ -416,8 +442,8 @@ wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all)
if (data_len == 0)
{
- if (thd->stmt_da->is_ok() &&
- thd->stmt_da->affected_rows() > 0 &&
+ if (thd->get_stmt_da()->is_ok() &&
+ thd->get_stmt_da()->affected_rows() > 0 &&
!binlog_filter->is_on())
{
WSREP_DEBUG("empty rbr buffer, query: %s, "
@@ -425,7 +451,7 @@ wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all)
"changed tables: %d, "
"sql_log_bin: %d, "
"wsrep status (%d %d %d)",
- thd->query(), thd->stmt_da->affected_rows(),
+ thd->query(), thd->get_stmt_da()->affected_rows(),
stmt_has_updated_trans_table(thd), thd->variables.sql_log_bin,
thd->wsrep_exec_mode, thd->wsrep_query_state,
thd->wsrep_conflict_state);
@@ -441,9 +467,11 @@ wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all)
if (WSREP_UNDEFINED_TRX_ID == thd->wsrep_ws_handle.trx_id)
{
WSREP_WARN("SQL statement was ineffective, THD: %lu, buf: %zu\n"
- "QUERY: %s\n"
- " => Skipping replication",
- thd->thread_id, data_len, thd->query());
+ "schema: %s \n"
+ "QUERY: %s\n"
+ " => Skipping replication",
+ thd->thread_id, data_len,
+ (thd->db ? thd->db : "(null)"), thd->query());
rcode = WSREP_TRX_FAIL;
}
else if (!rcode)
@@ -458,14 +486,15 @@ wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all)
&thd->wsrep_trx_meta);
if (rcode == WSREP_TRX_MISSING) {
- WSREP_WARN("Transaction missing in provider, thd: %ld, SQL: %s",
- thd->thread_id, thd->query());
+ WSREP_WARN("Transaction missing in provider, thd: %ld, schema: %s, SQL: %s",
+ thd->thread_id, (thd->db ? thd->db : "(null)"), thd->query());
rcode = WSREP_TRX_FAIL;
} else if (rcode == WSREP_BF_ABORT) {
WSREP_DEBUG("thd %lu seqno %lld BF aborted by provider, will replay",
thd->thread_id, (long long)thd->wsrep_trx_meta.gtid.seqno);
mysql_mutex_lock(&thd->LOCK_wsrep_thd);
thd->wsrep_conflict_state = MUST_REPLAY;
+ DBUG_ASSERT(wsrep_thd_trx_seqno(thd) > 0);
mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
mysql_mutex_lock(&LOCK_wsrep_replaying);
wsrep_replaying++;
@@ -481,6 +510,9 @@ wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all)
}
mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+
+ DEBUG_SYNC(thd, "wsrep_after_replication");
+
switch(rcode) {
case 0:
/*
@@ -505,7 +537,7 @@ wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all)
if (thd->transaction.xid_state.xid.get_my_xid())
{
wsrep_xid_init(&thd->transaction.xid_state.xid,
- &thd->wsrep_trx_meta.gtid.uuid,
+ thd->wsrep_trx_meta.gtid.uuid,
thd->wsrep_trx_meta.gtid.seqno);
}
DBUG_PRINT("wsrep", ("replicating commit success"));