summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorSeppo Jaakola <seppo.jaakola@codership.com>2013-07-13 13:01:13 +0300
committerSeppo Jaakola <seppo.jaakola@codership.com>2013-07-13 13:01:13 +0300
commit0a9216835f406947fb4d492616da4cda75e5e113 (patch)
tree7fbf5059c59fa86ca255452f3ce312aece492652 /sql
parent58926b5e1990d3245b55081ba511fbabe2604e17 (diff)
downloadmariadb-git-0a9216835f406947fb4d492616da4cda75e5e113.tar.gz
Initial merge result with mariaDB 10: lp:maria
Diffstat (limited to 'sql')
-rw-r--r--sql/CMakeLists.txt19
-rw-r--r--sql/events.cc12
-rw-r--r--sql/handler.cc130
-rw-r--r--sql/handler.h18
-rw-r--r--sql/item_func.cc12
-rw-r--r--sql/lock.cc46
-rw-r--r--sql/log.cc245
-rw-r--r--sql/log.h26
-rw-r--r--sql/log_event.cc143
-rw-r--r--sql/mdl.cc135
-rw-r--r--sql/mdl.h3
-rw-r--r--sql/mysqld.cc794
-rw-r--r--sql/mysqld.h17
-rw-r--r--sql/protocol.cc8
-rw-r--r--sql/set_var.h6
-rw-r--r--sql/slave.cc25
-rw-r--r--sql/sp.cc34
-rw-r--r--sql/sql_acl.cc37
-rw-r--r--sql/sql_admin.cc3
-rw-r--r--sql/sql_alter.cc20
-rw-r--r--sql/sql_base.cc36
-rw-r--r--sql/sql_builtin.cc.in9
-rw-r--r--sql/sql_class.cc297
-rw-r--r--sql/sql_class.h66
-rw-r--r--sql/sql_connect.cc39
-rw-r--r--sql/sql_delete.cc12
-rw-r--r--sql/sql_insert.cc39
-rw-r--r--sql/sql_lex.cc11
-rw-r--r--sql/sql_parse.cc1152
-rw-r--r--sql/sql_parse.h16
-rw-r--r--sql/sql_plugin.cc6
-rw-r--r--sql/sql_prepare.cc20
-rw-r--r--sql/sql_reload.cc13
-rw-r--r--sql/sql_show.cc3
-rw-r--r--sql/sql_table.cc6
-rw-r--r--sql/sql_trigger.cc52
-rw-r--r--sql/sql_truncate.cc10
-rw-r--r--sql/sql_update.cc12
-rw-r--r--sql/sys_vars.cc220
-rw-r--r--sql/transaction.cc43
40 files changed, 3753 insertions, 42 deletions
diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt
index 0b4943851f0..ffd8fab16fd 100644
--- a/sql/CMakeLists.txt
+++ b/sql/CMakeLists.txt
@@ -13,6 +13,10 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+IF(WITH_WSREP)
+ SET(WSREP_INCLUDES ${CMAKE_SOURCE_DIR}/wsrep)
+ENDIF()
+
INCLUDE_DIRECTORIES(
${CMAKE_SOURCE_DIR}/include
${CMAKE_SOURCE_DIR}/sql
@@ -20,6 +24,7 @@ ${CMAKE_SOURCE_DIR}/regex
${ZLIB_INCLUDE_DIR}
${SSL_INCLUDE_DIRS}
${CMAKE_BINARY_DIR}/sql
+${WSREP_INCLUDES}
)
SET(GEN_SOURCES
@@ -35,6 +40,18 @@ ADD_DEFINITIONS(-DMYSQL_SERVER -DHAVE_EVENT_SCHEDULER -DHAVE_POOL_OF_THREADS)
IF(SSL_DEFINES)
ADD_DEFINITIONS(${SSL_DEFINES})
ENDIF()
+IF(WITH_WSREP)
+ SET(WSREP_SOURCES
+ wsrep_check_opts.cc
+ wsrep_hton.cc
+ wsrep_mysqld.cc
+ wsrep_notify.cc
+ wsrep_sst.cc
+ wsrep_utils.cc
+ wsrep_var.cc
+ )
+ SET(WSREP_LIB wsrep)
+ENDIF()
SET (SQL_SOURCE
../sql-common/client.c derror.cc des_key_file.cc
@@ -90,6 +107,7 @@ SET (SQL_SOURCE
../sql-common/mysql_async.c
my_apc.cc my_apc.h
rpl_gtid.cc
+ ${WSREP_SOURCES}
${GEN_SOURCES}
${MYSYS_LIBWRAP_SOURCE}
)
@@ -109,6 +127,7 @@ DTRACE_INSTRUMENT(sql)
TARGET_LINK_LIBRARIES(sql ${MYSQLD_STATIC_PLUGIN_LIBS}
mysys dbug strings vio regex
${LIBWRAP} ${LIBCRYPT} ${LIBDL}
+ ${WSREP_LIB}
${SSL_LIBRARIES})
IF(WIN32)
diff --git a/sql/events.cc b/sql/events.cc
index b9c51b77f05..8645f64c4cd 100644
--- a/sql/events.cc
+++ b/sql/events.cc
@@ -1156,7 +1156,19 @@ end:
close_mysql_tables(thd);
DBUG_RETURN(ret);
}
+#ifdef WITH_WSREP
+int wsrep_create_event_query(THD *thd, uchar** buf, uint* buf_len)
+{
+ String log_query;
+ if (create_query_string(thd, &log_query))
+ {
+ WSREP_WARN("events create string failed: %s", thd->query());
+ return 1;
+ }
+ return wsrep_to_buf_helper(thd, log_query.ptr(), log_query.length(), buf, buf_len);
+}
+#endif /* WITH_WSREP */
/**
@} (End of group Event_Scheduler)
*/
diff --git a/sql/handler.cc b/sql/handler.cc
index 7d97c197da5..8c8089a61fb 100644
--- a/sql/handler.cc
+++ b/sql/handler.cc
@@ -49,6 +49,9 @@
#include "../storage/maria/ha_maria.h"
#endif
+#ifdef WITH_WSREP
+#include "wsrep_mysqld.h"
+#endif
/*
While we have legacy_db_type, we have this array to
check for dups and to find handlerton from legacy_db_type.
@@ -1296,7 +1299,11 @@ int ha_commit_trans(THD *thd, bool all)
Free resources and perform other cleanup even for 'empty' transactions.
*/
if (is_real_trans)
- thd->transaction.cleanup();
+#ifdef WITH_WSREP
+ thd->transaction.cleanup(thd);
+#else
+ thd->transaction.cleanup();
+#endif /* WITH_WSREP */
DBUG_RETURN(0);
}
@@ -1324,7 +1331,12 @@ int ha_commit_trans(THD *thd, bool all)
mdl_request.init(MDL_key::COMMIT, "", "", MDL_INTENTION_EXCLUSIVE,
MDL_EXPLICIT);
+#ifdef WITH_WSREP
+ if (!WSREP(thd) &&
+ thd->mdl_context.acquire_lock(&mdl_request,
+#else
if (thd->mdl_context.acquire_lock(&mdl_request,
+#endif /* WITH_WSREP */
thd->variables.lock_wait_timeout))
{
ha_rollback_trans(thd, all);
@@ -1371,6 +1383,19 @@ int ha_commit_trans(THD *thd, bool all)
err= ht->prepare(ht, thd, all);
status_var_increment(thd->status_var.ha_prepare_count);
if (err)
+#ifdef WITH_WSREP
+ if (WSREP(thd) && ht->db_type== DB_TYPE_WSREP)
+ {
+ error= 1;
+ /* avoid sending error, if we need to replay */
+ if (thd->wsrep_conflict_state!= MUST_REPLAY)
+ {
+ my_error(ER_LOCK_DEADLOCK, MYF(0), err);
+ }
+ }
+ else
+ /* not wsrep hton, bail to native mysql behavior */
+#endif
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
if (err)
@@ -1382,6 +1407,13 @@ int ha_commit_trans(THD *thd, bool all)
DEBUG_SYNC(thd, "ha_commit_trans_after_prepare");
DBUG_EXECUTE_IF("crash_commit_after_prepare", DBUG_SUICIDE(););
+#ifdef WITH_WSREP
+ if (!error && wsrep_is_wsrep_xid(&thd->transaction.xid_state.xid))
+ {
+ // xid was rewritten by wsrep
+ xid= wsrep_xid_seqno(&thd->transaction.xid_state.xid);
+ }
+#endif // WITH_WSREP
if (!is_real_trans)
{
error= commit_one_phase_2(thd, all, trans, is_real_trans);
@@ -1470,6 +1502,18 @@ commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans)
int error= 0;
Ha_trx_info *ha_info= trans->ha_list, *ha_info_next;
DBUG_ENTER("commit_one_phase_2");
+#ifdef WITH_WSREP
+#ifdef WSREP_PROC_INFO
+ char info[64]= { 0, };
+ snprintf (info, sizeof(info) - 1, "ha_commit_one_phase(%lld)",
+ (long long)thd->wsrep_trx_seqno);
+#else
+ const char info[]="ha_commit_one_phase()";
+#endif /* WSREP_PROC_INFO */
+ char* tmp_info= NULL;
+ if (WSREP(thd)) tmp_info= (char *)thd_proc_info(thd, info);
+#endif /* WITH_WSREP */
+
if (ha_info)
{
for (; ha_info; ha_info= ha_info_next)
@@ -1498,7 +1542,14 @@ commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans)
}
/* Free resources and perform other cleanup even for 'empty' transactions. */
if (is_real_trans)
+#ifdef WITH_WSREP
+ thd->transaction.cleanup(thd);
+#else
thd->transaction.cleanup();
+#endif /* WITH_WSREP */
+#ifdef WITH_WSREP
+ if (WSREP(thd)) thd_proc_info(thd, tmp_info);
+#endif /* WITH_WSREP */
DBUG_RETURN(error);
}
@@ -1573,7 +1624,11 @@ int ha_rollback_trans(THD *thd, bool all)
}
/* Always cleanup. Even if nht==0. There may be savepoints. */
if (is_real_trans)
+#ifdef WITH_WSREP
+ thd->transaction.cleanup(thd);
+#else
thd->transaction.cleanup();
+#endif /* WITH_WSREP */
if (all)
thd->transaction_rollback_request= FALSE;
@@ -1738,7 +1793,13 @@ static my_bool xarecover_handlerton(THD *unused, plugin_ref plugin,
got, hton_name(hton)->str);
for (int i=0; i < got; i ++)
{
+#ifdef WITH_WSREP
+ my_xid x=(wsrep_is_wsrep_xid(&info->list[i]) ?
+ wsrep_xid_seqno(&info->list[i]) :
+ info->list[i].get_my_xid());
+#else
my_xid x=info->list[i].get_my_xid();
+#endif /* WITH_WSREP */
if (!x) // not "mine" - that is generated by external TM
{
#ifndef DBUG_OFF
@@ -2943,7 +3004,12 @@ int handler::update_auto_increment()
variables->auto_increment_increment);
auto_inc_intervals_count++;
/* Row-based replication does not need to store intervals in binlog */
+#ifdef WITH_WSREP
+ if (((WSREP(thd) && wsrep_emulate_bin_log) || mysql_bin_log.is_open()) &&
+ !thd->is_current_stmt_binlog_format_row())
+#else
if (mysql_bin_log.is_open() && !thd->is_current_stmt_binlog_format_row())
+#endif /* WITH_WSREP */
thd->auto_inc_intervals_in_cur_stmt_for_binlog.append(auto_inc_interval_for_cur_row.minimum(),
auto_inc_interval_for_cur_row.values(),
variables->auto_increment_increment);
@@ -5320,7 +5386,11 @@ static bool check_table_binlog_row_based(THD *thd, TABLE *table)
return (thd->is_current_stmt_binlog_format_row() &&
table->s->cached_row_logging_check &&
(thd->variables.option_bits & OPTION_BIN_LOG) &&
+#ifdef WITH_WSREP
+ ((WSREP(thd) && wsrep_emulate_bin_log) || mysql_bin_log.is_open()));
+#else
mysql_bin_log.is_open());
+#endif
}
@@ -5653,6 +5723,64 @@ void signal_log_not_needed(struct handlerton, char *log_file)
}
+#ifdef WITH_WSREP
+/**
+ @details
+ This function makes the storage engine to force the victim transaction
+ to abort. Currently, only innodb has this functionality, but any SE
+ implementing the wsrep API should provide this service to support
+ multi-master operation.
+
+ @param bf_thd brute force THD asking for the abort
+ @param victim_thd victim THD to be aborted
+
+ @return
+ always 0
+*/
+
+int ha_wsrep_abort_transaction(THD *bf_thd, THD *victim_thd, my_bool signal)
+{
+ DBUG_ENTER("ha_wsrep_abort_transaction");
+ if (!WSREP(bf_thd) &&
+ !(wsrep_OSU_method_options == WSREP_OSU_RSU &&
+ bf_thd->wsrep_exec_mode == TOTAL_ORDER)) {
+ DBUG_RETURN(0);
+ }
+
+ handlerton *hton= installed_htons[DB_TYPE_INNODB];
+ if (hton && hton->wsrep_abort_transaction)
+ {
+ hton->wsrep_abort_transaction(hton, bf_thd, victim_thd, signal);
+ }
+ else
+ {
+ WSREP_WARN("cannot abort InnoDB transaction");
+ }
+
+ DBUG_RETURN(0);
+}
+
+void ha_wsrep_fake_trx_id(THD *thd)
+{
+ DBUG_ENTER("ha_wsrep_fake_trx_id");
+ if (!WSREP(thd))
+ {
+ DBUG_VOID_RETURN;
+ }
+
+ handlerton *hton= installed_htons[DB_TYPE_INNODB];
+ if (hton && hton->wsrep_fake_trx_id)
+ {
+ hton->wsrep_fake_trx_id(hton, thd);
+ }
+ else
+ {
+ WSREP_WARN("cannot get get fake InnoDB transaction ID");
+ }
+
+ DBUG_VOID_RETURN;
+}
+#endif /* WITH_WSREP */
#ifdef TRANS_LOG_MGM_EXAMPLE_CODE
/*
Example of transaction log management functions based on assumption that logs
diff --git a/sql/handler.h b/sql/handler.h
index a5fb0d8fd9d..c5639053fe2 100644
--- a/sql/handler.h
+++ b/sql/handler.h
@@ -372,6 +372,7 @@ enum legacy_db_type
DB_TYPE_PBXT=23,
DB_TYPE_MARIA=27,
DB_TYPE_PERFORMANCE_SCHEMA=28,
+ DB_TYPE_WSREP=41,
DB_TYPE_FIRST_DYNAMIC=42,
DB_TYPE_DEFAULT=127 // Must be last
};
@@ -1107,6 +1108,13 @@ struct handlerton
enum handler_create_iterator_result
(*create_iterator)(handlerton *hton, enum handler_iterator_type type,
struct handler_iterator *fill_this_in);
+#ifdef WITH_WSREP
+ int (*wsrep_abort_transaction)(handlerton *hton, THD *bf_thd,
+ THD *victim_thd, my_bool signal);
+ int (*wsrep_set_checkpoint)(handlerton *hton, const XID* xid);
+ int (*wsrep_get_checkpoint)(handlerton *hton, XID* xid);
+ void (*wsrep_fake_trx_id)(handlerton *hton, THD *thd);
+#endif /* WITH_WSREP */
/*
Optional clauses in the CREATE/ALTER TABLE
*/
@@ -3154,6 +3162,9 @@ bool key_uses_partial_cols(TABLE *table, uint keyno);
extern const char *ha_row_type[];
extern MYSQL_PLUGIN_IMPORT const char *tx_isolation_names[];
extern MYSQL_PLUGIN_IMPORT const char *binlog_format_names[];
+#ifdef WITH_WSREP
+extern MYSQL_PLUGIN_IMPORT const char *wsrep_binlog_format_names[];
+#endif /* WITH_WSREP */
extern TYPELIB tx_isolation_typelib;
extern const char *myisam_stats_method_names[];
extern ulong total_ha, total_ha_2pc;
@@ -3269,6 +3280,10 @@ int ha_enable_transaction(THD *thd, bool on);
int ha_rollback_to_savepoint(THD *thd, SAVEPOINT *sv);
int ha_savepoint(THD *thd, SAVEPOINT *sv);
int ha_release_savepoint(THD *thd, SAVEPOINT *sv);
+#ifdef WITH_WSREP
+int ha_wsrep_abort_transaction(THD *bf_thd, THD *victim_thd, my_bool signal);
+void ha_wsrep_fake_trx_id(THD *thd);
+#endif /* WITH_WSREP */
/* these are called by storage engines */
void trans_register_ha(THD *thd, bool all, handlerton *ht);
@@ -3299,6 +3314,9 @@ int ha_binlog_end(THD *thd);
#define ha_binlog_wait(a) do {} while (0)
#define ha_binlog_end(a) do {} while (0)
#endif
+#ifdef WITH_WSREP
+void wsrep_brute_force_aborts();
+#endif
const char *get_canonical_filename(handler *file, const char *path,
char *tmp_path);
diff --git a/sql/item_func.cc b/sql/item_func.cc
index d0fca63688a..a231b245b53 100644
--- a/sql/item_func.cc
+++ b/sql/item_func.cc
@@ -2596,7 +2596,19 @@ void Item_func_rand::seed_random(Item *arg)
TODO: do not do reinit 'rand' for every execute of PS/SP if
args[0] is a constant.
*/
+#ifdef WITH_WSREP
+ uint32 tmp;
+ if (WSREP(current_thd))
+ {
+ if (current_thd->wsrep_exec_mode==REPL_RECV)
+ tmp= current_thd->wsrep_rand;
+ else
+ tmp= current_thd->wsrep_rand= (uint32) arg->val_int();
+ } else
+ tmp= (uint32) arg->val_int();
+#else
uint32 tmp= (uint32) arg->val_int();
+#endif /* WITH_WSREP */
my_rnd_init(rand, (uint32) (tmp*0x10001L+55555555L),
(uint32) (tmp*0x10000001L));
}
diff --git a/sql/lock.cc b/sql/lock.cc
index 67c8b240c6f..b5605a0124a 100644
--- a/sql/lock.cc
+++ b/sql/lock.cc
@@ -84,6 +84,10 @@
#include <hash.h>
#include <assert.h>
+#ifdef WITH_WSREP
+#include "wsrep_mysqld.h"
+#endif /* WITH_WSREP */
+
/**
@defgroup Locking Locking
@{
@@ -314,6 +318,9 @@ bool mysql_lock_tables(THD *thd, MYSQL_LOCK *sql_lock, uint flags)
/* Copy the lock data array. thr_multi_lock() reorders its contents. */
memcpy(sql_lock->locks + sql_lock->lock_count, sql_lock->locks,
sql_lock->lock_count * sizeof(*sql_lock->locks));
+#ifdef WITH_WSREP
+ thd->lock_info.in_lock_tables= thd->in_lock_tables;
+#endif /* Lock on the copied half of the lock data array. */
/* Lock on the copied half of the lock data array. */
rc= thr_lock_errno_to_mysql[(int) thr_multi_lock(sql_lock->locks +
sql_lock->lock_count,
@@ -334,6 +341,9 @@ end:
my_error(rc, MYF(0));
thd->set_time_after_lock();
+#ifdef WITH_WSREP
+ thd_proc_info(thd, "exit mysqld_lock_tables()");
+#endif /* WITH_WSREP */
DBUG_RETURN(rc);
}
@@ -1051,11 +1061,15 @@ void Global_read_lock::unlock_global_read_lock(THD *thd)
{
thd->mdl_context.release_lock(m_mdl_blocks_commits_lock);
m_mdl_blocks_commits_lock= NULL;
+#ifdef WITH_WSREP
+ wsrep_locked_seqno= WSREP_SEQNO_UNDEFINED;
+ wsrep->resume(wsrep);
+#endif /* WITH_WSREP */
}
thd->mdl_context.release_lock(m_mdl_global_shared_lock);
m_mdl_global_shared_lock= NULL;
m_state= GRL_NONE;
-
+
DBUG_VOID_RETURN;
}
@@ -1083,6 +1097,20 @@ bool Global_read_lock::make_global_read_lock_block_commit(THD *thd)
If we didn't succeed lock_global_read_lock(), or if we already suceeded
make_global_read_lock_block_commit(), do nothing.
*/
+
+#ifdef WITH_WSREP
+ if (m_mdl_blocks_commits_lock)
+ {
+ WSREP_DEBUG("GRL was in block commit mode when entering "
+ "make_global_read_lock_block_commit");
+ thd->mdl_context.release_lock(m_mdl_blocks_commits_lock);
+ m_mdl_blocks_commits_lock= NULL;
+ wsrep_locked_seqno= WSREP_SEQNO_UNDEFINED;
+ wsrep->resume(wsrep);
+ m_state= GRL_ACQUIRED;
+ }
+#endif /* WITH_WSREP */
+
if (m_state != GRL_ACQUIRED)
DBUG_RETURN(0);
@@ -1095,6 +1123,22 @@ bool Global_read_lock::make_global_read_lock_block_commit(THD *thd)
m_mdl_blocks_commits_lock= mdl_request.ticket;
m_state= GRL_ACQUIRED_AND_BLOCKS_COMMIT;
+#ifdef WITH_WSREP
+ long long ret = wsrep->pause(wsrep);
+ if (ret >= 0)
+ {
+ wsrep_locked_seqno= ret;
+ }
+ else if (ret != -ENOSYS) /* -ENOSYS - no provider */
+ {
+ WSREP_ERROR("Failed to pause provider: %lld (%s)", -ret, strerror(-ret));
+
+ /* m_mdl_blocks_commits_lock is always NULL here */
+ wsrep_locked_seqno= WSREP_SEQNO_UNDEFINED;
+ my_error(ER_LOCK_DEADLOCK, MYF(0));
+ DBUG_RETURN(TRUE);
+ }
+#endif /* WITH_WSREP */
DBUG_RETURN(FALSE);
}
diff --git a/sql/log.cc b/sql/log.cc
index 6726be36e74..aba3c67583b 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -52,6 +52,9 @@
#include "sql_plugin.h"
#include "rpl_handler.h"
+#ifdef WITH_WSREP
+#include "wsrep_mysqld.h"
+#endif /* WITH_WSREP */
#include "debug_sync.h"
#include "sql_show.h"
#include "my_pthread.h"
@@ -509,6 +512,9 @@ private:
};
handlerton *binlog_hton;
+#ifdef WITH_WSREP
+extern handlerton *wsrep_hton;
+#endif
bool LOGGER::is_log_table_enabled(uint log_table_type)
{
@@ -523,6 +529,134 @@ bool LOGGER::is_log_table_enabled(uint log_table_type)
}
}
+#ifdef WITH_WSREP
+IO_CACHE * get_trans_log(THD * thd)
+{
+ binlog_cache_mngr *cache_mngr = (binlog_cache_mngr*)
+ thd_get_ha_data(thd, binlog_hton);
+ if (cache_mngr)
+ {
+ return cache_mngr->get_binlog_cache_log(true);
+ }
+ else
+ {
+ WSREP_DEBUG("binlog cache not initialized, conn :%ld", thd->thread_id);
+ return NULL;
+ }
+}
+
+
+bool wsrep_trans_cache_is_empty(THD *thd)
+{
+ bool res= TRUE;
+
+ if (thd_sql_command((const THD*) thd) != SQLCOM_SELECT)
+ res= FALSE;
+ else
+ {
+ binlog_cache_mngr *const cache_mngr=
+ (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
+ if (cache_mngr)
+ {
+ res= cache_mngr->trx_cache.empty();
+ }
+ }
+ return res;
+}
+
+void thd_binlog_flush_pending_rows_event(THD *thd, bool stmt_end)
+{
+ thd->binlog_flush_pending_rows_event(stmt_end);
+}
+void thd_binlog_trx_reset(THD * thd)
+{
+ /*
+ todo: fix autocommit select to not call the caller
+ */
+ if (thd_get_ha_data(thd, binlog_hton) != NULL)
+ {
+ binlog_cache_mngr *const cache_mngr=
+ (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
+ if (cache_mngr) cache_mngr->reset(TRUE, TRUE);
+ }
+ thd->clear_binlog_table_maps();
+}
+
+void thd_binlog_rollback_stmt(THD * thd)
+{
+ WSREP_DEBUG("thd_binlog_rollback_stmt :%ld", thd->thread_id);
+ binlog_cache_mngr *const cache_mngr=
+ (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
+ if (cache_mngr) cache_mngr->trx_cache.set_prev_position(MY_OFF_T_UNDEF);
+}
+/*
+ Write the contents of a cache to memory buffer.
+
+ This function quite the same as MYSQL_BIN_LOG::write_cache(),
+ with the exception that here we write in buffer instead of log file.
+ */
+
+int wsrep_write_cache(IO_CACHE *cache, uchar **buf, uint *buf_len)
+{
+
+ if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
+ return ER_ERROR_ON_WRITE;
+ uint length= my_b_bytes_in_cache(cache);
+ long long total_length = 0;
+ uchar *buf_ptr = NULL;
+
+ do
+ {
+ /* bail out if buffer grows too large
+ This is a temporary fix to avoid flooding replication
+ TODO: remove this check for 0.7.4 release
+ */
+ if (total_length > wsrep_max_ws_size)
+ {
+ WSREP_WARN("transaction size limit (%lld) exceeded: %lld",
+ wsrep_max_ws_size, total_length);
+ if (reinit_io_cache(cache, WRITE_CACHE, 0, 0, 0))
+ {
+ WSREP_WARN("failed to initialize io-cache");
+ }
+ if (buf_ptr) my_free(*buf);
+ *buf_len = 0;
+ return ER_ERROR_ON_WRITE;
+ }
+ if (total_length > 0)
+ {
+ *buf_len += length;
+ *buf = (uchar *)my_realloc(*buf, total_length+length, MYF(0));
+ if (!*buf)
+ {
+ WSREP_ERROR("io cache write problem: %d %d", *buf_len, length);
+ return ER_ERROR_ON_WRITE;
+ }
+ buf_ptr = *buf+total_length;
+ }
+ else
+ {
+ if (buf_ptr != NULL)
+ {
+ WSREP_ERROR("io cache alloc error: %d %d", *buf_len, length);
+ my_free(*buf);
+ }
+ if (length > 0)
+ {
+ *buf = (uchar *) my_malloc(length, MYF(0));
+ buf_ptr = *buf;
+ *buf_len = length;
+ }
+ }
+ total_length += length;
+
+ memcpy(buf_ptr, cache->read_pos, length);
+ cache->read_pos=cache->read_end;
+ } while ((cache->file >= 0) && (length= my_b_fill(cache)));
+
+ return 0;
+}
+#endif
/* Check if a given table is opened log table */
int check_if_log_table(size_t db_len, const char *db, size_t table_name_len,
@@ -1555,7 +1689,11 @@ binlog_trans_log_savepos(THD *thd, my_off_t *pos)
DBUG_ENTER("binlog_trans_log_savepos");
DBUG_ASSERT(pos != NULL);
binlog_cache_mngr *const cache_mngr= thd->binlog_setup_trx_data();
+#ifdef WITH_WSREP
+ DBUG_ASSERT((WSREP(thd) && wsrep_emulate_bin_log) || mysql_bin_log.is_open());
+#else
DBUG_ASSERT(mysql_bin_log.is_open());
+#endif
*pos= cache_mngr->trx_cache.get_byte_position();
DBUG_PRINT("return", ("*pos: %lu", (ulong) *pos));
DBUG_VOID_RETURN;
@@ -1603,7 +1741,16 @@ binlog_trans_log_truncate(THD *thd, my_off_t pos)
int binlog_init(void *p)
{
binlog_hton= (handlerton *)p;
+#ifdef WITH_WSREP
+ if (WSREP_ON)
+ binlog_hton->state= SHOW_OPTION_YES;
+ else
+ {
+#endif /* WITH_WSREP */
binlog_hton->state=opt_bin_log ? SHOW_OPTION_YES : SHOW_OPTION_NO;
+#ifdef WITH_WSREP
+ }
+#endif /* WITH_WSREP */
binlog_hton->db_type=DB_TYPE_BINLOG;
binlog_hton->savepoint_offset= sizeof(my_off_t);
binlog_hton->close_connection= binlog_close_connection;
@@ -1718,6 +1865,13 @@ static inline int
binlog_commit_flush_stmt_cache(THD *thd, bool all,
binlog_cache_mngr *cache_mngr)
{
+#ifdef WITH_WSREP
+ if (thd->wsrep_mysql_replicated > 0)
+ {
+ WSREP_DEBUG("avoiding binlog_commit_flush_trx_cache: %d", thd->wsrep_mysql_replicated);
+ return 0;
+ }
+#endif
Query_log_event end_evt(thd, STRING_WITH_LEN("COMMIT"),
FALSE, TRUE, TRUE, 0);
return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, FALSE));
@@ -1873,6 +2027,9 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all)
DBUG_ENTER("binlog_commit");
binlog_cache_mngr *const cache_mngr=
(binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
+#ifdef WITH_WSREP
+ if (!cache_mngr) DBUG_RETURN(0);
+#endif /* WITH_WSREP */
DBUG_PRINT("debug",
("all: %d, in_transaction: %s, all.modified_non_trans_table: %s, stmt.modified_non_trans_table: %s",
@@ -1929,6 +2086,9 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
int error= 0;
binlog_cache_mngr *const cache_mngr=
(binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
+#ifdef WITH_WSREP
+ if (!cache_mngr) DBUG_RETURN(0);
+#endif /* WITH_WSREP */
DBUG_PRINT("debug", ("all: %s, all.modified_non_trans_table: %s, stmt.modified_non_trans_table: %s",
YESNO(all),
@@ -1957,8 +2117,12 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
cache_mngr->reset(false, true);
DBUG_RETURN(error);
}
-
+#ifdef WITH_WSREP
+ if (!wsrep_emulate_bin_log &&
+ mysql_bin_log.check_write_error(thd))
+#else
if (mysql_bin_log.check_write_error(thd))
+#endif
{
/*
"all == true" means that a "rollback statement" triggered the error and
@@ -1988,12 +2152,12 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
if (ending_trans(thd, all) &&
((thd->variables.option_bits & OPTION_KEEP_LOG) ||
(trans_has_updated_non_trans_table(thd) &&
- thd->variables.binlog_format == BINLOG_FORMAT_STMT) ||
+ WSREP_FORMAT(thd->variables.binlog_format) == BINLOG_FORMAT_STMT) ||
(cache_mngr->trx_cache.changes_to_non_trans_temp_table() &&
- thd->variables.binlog_format == BINLOG_FORMAT_MIXED) ||
+ WSREP_FORMAT(thd->variables.binlog_format) == BINLOG_FORMAT_MIXED) ||
(trans_has_updated_non_trans_table(thd) &&
ending_single_stmt_trans(thd,all) &&
- thd->variables.binlog_format == BINLOG_FORMAT_MIXED)))
+ WSREP_FORMAT(thd->variables.binlog_format) == BINLOG_FORMAT_MIXED)))
error= binlog_rollback_flush_trx_cache(thd, all, cache_mngr);
/*
Truncate the cache if:
@@ -2007,9 +2171,9 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
else if (ending_trans(thd, all) ||
(!(thd->variables.option_bits & OPTION_KEEP_LOG) &&
(!stmt_has_updated_non_trans_table(thd) ||
- thd->variables.binlog_format != BINLOG_FORMAT_STMT) &&
+ WSREP_FORMAT(thd->variables.binlog_format) != BINLOG_FORMAT_STMT) &&
(!cache_mngr->trx_cache.changes_to_non_trans_temp_table() ||
- thd->variables.binlog_format != BINLOG_FORMAT_MIXED)))
+ WSREP_FORMAT(thd->variables.binlog_format) != BINLOG_FORMAT_MIXED)))
error= binlog_truncate_trx_cache(thd, cache_mngr, all);
}
@@ -2103,6 +2267,9 @@ static int binlog_savepoint_set(handlerton *hton, THD *thd, void *sv)
int error= 1;
char buf[1024];
+#ifdef WITH_WSREP
+ if (wsrep_emulate_bin_log) DBUG_RETURN(0);
+#endif /* WITH_WSREP */
String log_query(buf, sizeof(buf), &my_charset_bin);
if (log_query.copy(STRING_WITH_LEN("SAVEPOINT "), &my_charset_bin) ||
append_identifier(thd, &log_query,
@@ -2139,7 +2306,12 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv)
non-transactional table. Otherwise, truncate the binlog cache starting
from the SAVEPOINT command.
*/
+#ifdef WITH_WSREP
+ if (!wsrep_emulate_bin_log &&
+ unlikely(trans_has_updated_non_trans_table(thd) ||
+#else
if (unlikely(trans_has_updated_non_trans_table(thd) ||
+#endif
(thd->variables.option_bits & OPTION_KEEP_LOG)))
{
char buf[1024];
@@ -5076,6 +5248,7 @@ binlog_cache_mngr *THD::binlog_setup_trx_data()
DBUG_RETURN(cache_mngr);
}
+
/*
Function to start a statement and optionally a transaction for the
binary log.
@@ -5193,7 +5366,12 @@ int THD::binlog_write_table_map(TABLE *table, bool is_transactional,
table->s->table_map_id));
/* Pre-conditions */
+#ifdef WITH_WSREP
+ DBUG_ASSERT(is_current_stmt_binlog_format_row() &&
+ (WSREP_EMULATE_BINLOG(this) || mysql_bin_log.is_open()));
+#else
DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open());
+#endif
DBUG_ASSERT(table->s->table_map_id != ULONG_MAX);
Table_map_log_event
@@ -5326,7 +5504,11 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd,
bool is_transactional)
{
DBUG_ENTER("MYSQL_BIN_LOG::flush_and_set_pending_rows_event(event)");
+#ifdef WITH_WSREP
+ DBUG_ASSERT(WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open());
+#else
DBUG_ASSERT(mysql_bin_log.is_open());
+#endif
DBUG_PRINT("enter", ("event: 0x%lx", (long) event));
int error= 0;
@@ -5645,7 +5827,11 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
mostly called if is_open() *was* true a few instructions before, but it
could have changed since.
*/
+#ifdef WITH_WSREP
+ if ((WSREP(thd) && wsrep_emulate_bin_log) || is_open())
+#else
if (likely(is_open()))
+#endif
{
my_off_t UNINIT_VAR(my_org_b_tell);
#ifdef HAVE_REPLICATION
@@ -5827,6 +6013,35 @@ err:
}
}
+#ifdef WITH_WSREP
+ if (WSREP(thd) && wsrep_incremental_data_collection &&
+ (wsrep_emulate_bin_log || mysql_bin_log.is_open()))
+ {
+ DBUG_ASSERT(thd->wsrep_trx_handle.trx_id != (unsigned long)-1);
+ if (!error)
+ {
+ IO_CACHE* cache= get_trans_log(thd);
+ uchar* buf= NULL;
+ uint buf_len= 0;
+
+ if (wsrep_emulate_bin_log)
+ thd->binlog_flush_pending_rows_event(false);
+ error= wsrep_write_cache(cache, &buf, &buf_len);
+ if (!error && buf_len > 0)
+ {
+ wsrep_status_t rc= wsrep->append_data(wsrep,
+ &thd->wsrep_trx_handle,
+ buf, buf_len);
+ if (rc != WSREP_OK)
+ {
+ sql_print_warning("WSREP: append_data() returned %d", rc);
+ error= 1;
+ }
+ }
+ if (buf_len) my_free(buf);
+ }
+ }
+#endif /* WITH_WSREP */
DBUG_RETURN(error);
}
@@ -5982,6 +6197,14 @@ int MYSQL_BIN_LOG::rotate(bool force_rotate, bool* check_purge)
{
int error= 0;
DBUG_ENTER("MYSQL_BIN_LOG::rotate");
+#ifdef WITH_WSREP
+ if (WSREP_ON && wsrep_to_isolation)
+ {
+ WSREP_DEBUG("avoiding binlog rotate due to TO isolation: %d",
+ wsrep_to_isolation);
+ DBUG_RETURN(0);
+ }
+#endif
//todo: fix the macro def and restore safe_mutex_assert_owner(&LOCK_log);
*check_purge= false;
@@ -6528,6 +6751,9 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd,
Ha_trx_info *ha_info;
DBUG_ENTER("MYSQL_BIN_LOG::write_transaction_to_binlog");
+#ifdef WITH_WSREP
+ if (wsrep_emulate_bin_log) DBUG_RETURN(0);
+#endif /* WITH_WSREP */
entry.thd= thd;
entry.cache_mngr= cache_mngr;
entry.error= 0;
@@ -8321,7 +8547,14 @@ TC_LOG_BINLOG::log_and_order(THD *thd, my_xid xid, bool all,
binlog_cache_mngr *cache_mngr= thd->binlog_setup_trx_data();
if (!cache_mngr)
+#ifdef WITH_WSREP
+ {
+ WSREP_DEBUG("Skipping empty log_xid: %s", thd->query());
+ DBUG_RETURN(0);
+ }
+#else
DBUG_RETURN(0);
+#endif /* WITH_WSREP */
cache_mngr->using_xa= TRUE;
cache_mngr->xa_xid= xid;
diff --git a/sql/log.h b/sql/log.h
index 018ac64eff7..964d75916a9 100644
--- a/sql/log.h
+++ b/sql/log.h
@@ -95,7 +95,7 @@ public:
int log_and_order(THD *thd, my_xid xid, bool all,
bool need_prepare_ordered, bool need_commit_ordered)
{
- DBUG_ASSERT(0 /* Internal error - TC_LOG_DUMMY::log_and_order() called */);
+ //DBUG_ASSERT(0 /* Internal error - TC_LOG_DUMMY::log_and_order() called */);
return 1;
}
int unlog(ulong cookie, my_xid xid) { return 0; }
@@ -287,6 +287,12 @@ enum enum_log_state { LOG_OPENED, LOG_CLOSED, LOG_TO_BE_OPENED };
(mmap+fsync is two times faster than write+fsync)
*/
+#ifdef WITH_WSREP
+extern my_bool wsrep_emulate_bin_log;
+Log_event* wsrep_read_log_event(
+ char **arg_buf, size_t *arg_buf_len,
+ const Format_description_log_event *description_event);
+#endif
class MYSQL_LOG
{
public:
@@ -952,12 +958,30 @@ public:
};
enum enum_binlog_format {
+ /*
+ statement-based except for cases where only row-based can work (UUID()
+ etc):
+ */
BINLOG_FORMAT_MIXED= 0, ///< statement if safe, otherwise row - autodetected
BINLOG_FORMAT_STMT= 1, ///< statement-based
BINLOG_FORMAT_ROW= 2, ///< row-based
BINLOG_FORMAT_UNSPEC=3 ///< thd_binlog_format() returns it when binlog is closed
};
+#ifdef WITH_WSREP
+IO_CACHE * get_trans_log(THD * thd);
+bool wsrep_trans_cache_is_empty(THD *thd);
+void thd_binlog_flush_pending_rows_event(THD *thd, bool stmt_end);
+void thd_binlog_trx_reset(THD * thd);
+void thd_binlog_rollback_stmt(THD * thd);
+int wsrep_write_cache(IO_CACHE *cache, uchar **buf, uint *buf_len);
+
+#define WSREP_FORMAT(my_format) \
+ ((wsrep_forced_binlog_format != BINLOG_FORMAT_UNSPEC) ? \
+ wsrep_forced_binlog_format : my_format)
+#else
+#define WSREP_FORMAT(my_format) my_format
+#endif
int query_error_code(THD *thd, bool not_killed);
uint purge_log_get_error_code(int res);
diff --git a/sql/log_event.cc b/sql/log_event.cc
index fce0130e8dd..2968764920e 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -49,6 +49,9 @@
#include "sql_show.h" // append_identifier
#include <strfunc.h>
+#if WITH_WSREP
+#include "wsrep_mysqld.h"
+#endif
#endif /* MYSQL_CLIENT */
#include <base64.h>
@@ -2873,7 +2876,9 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
master_data_written(0)
{
time_t end_time;
-
+#ifdef WITH_WSREP
+ thd->wsrep_PA_safe= false;
+#endif /* WITH_WSREP */
memset(&user, 0, sizeof(user));
memset(&host, 0, sizeof(host));
@@ -9078,10 +9083,21 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS;
/* A small test to verify that objects have consistent types */
DBUG_ASSERT(sizeof(thd->variables.option_bits) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
-
if (open_and_lock_tables(thd, rli->tables_to_lock, FALSE, 0))
{
uint actual_error= thd->stmt_da->sql_errno();
+#ifdef WITH_WSREP
+ if (WSREP(thd))
+ {
+ WSREP_WARN("BF applier failed to open_and_lock_tables: %u, fatal: %d "
+ "wsrep = (exec_mode: %d conflict_state: %d seqno: %lld)",
+ thd->stmt_da->sql_errno(),
+ thd->is_fatal_error,
+ thd->wsrep_exec_mode,
+ thd->wsrep_conflict_state,
+ (long long)thd->wsrep_trx_seqno);
+ }
+#endif
if (thd->is_slave_error || thd->is_fatal_error)
{
/*
@@ -10190,7 +10206,12 @@ check_table_map(Relay_log_info const *rli, RPL_TABLE_LIST *table_list)
DBUG_ENTER("check_table_map");
enum_tbl_map_status res= OK_TO_PROCESS;
+#ifdef WITH_WSREP
+ if ((rli->sql_thd->slave_thread /* filtering is for slave only */ ||
+ (WSREP(rli->sql_thd) && rli->sql_thd->wsrep_applier)) &&
+#else
if (rli->sql_thd->slave_thread /* filtering is for slave only */ &&
+#endif /* WITH_WSREP */
(!rli->mi->rpl_filter->db_ok(table_list->db) ||
(rli->mi->rpl_filter->is_on() && !rli->mi->rpl_filter->tables_ok("", table_list))))
res= FILTERED_OUT;
@@ -10833,8 +10854,23 @@ int
Write_rows_log_event::do_exec_row(const Relay_log_info *const rli)
{
DBUG_ASSERT(m_table != NULL);
+#ifdef WITH_WSREP
+#ifdef WSREP_PROC_INFO
+ char info[64];
+ info[sizeof(info) - 1] = '\0';
+ snprintf(info, sizeof(info) - 1, "Write_rows_log_event::write_row(%lld)",
+ (long long) thd->wsrep_trx_seqno);
+ const char* tmp = (WSREP(thd)) ? thd_proc_info(thd, info) : NULL;
+#else
+ const char* tmp = (WSREP(thd)) ?
+ thd_proc_info(thd,"Write_rows_log_event::write_row()") : NULL;
+#endif /* WSREP_PROC_INFO */
+#endif /* WITH_WSREP */
int error= write_row(rli, slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT);
+#ifdef WITH_WSREP
+ if (WSREP(thd)) thd_proc_info(thd, tmp);
+#endif /* WITH_WSREP */
if (error && !thd->is_error())
{
DBUG_ASSERT(0);
@@ -11499,14 +11535,39 @@ int Delete_rows_log_event::do_exec_row(const Relay_log_info *const rli)
int error;
DBUG_ASSERT(m_table != NULL);
+#ifdef WITH_WSREP
+#ifdef WSREP_PROC_INFO
+ char info[64];
+ info[sizeof(info) - 1] = '\0';
+ snprintf(info, sizeof(info) - 1, "Delete_rows_log_event::find_row(%lld)",
+ (long long) thd->wsrep_trx_seqno);
+ const char* tmp = (WSREP(thd)) ? thd_proc_info(thd, info) : NULL;
+#else
+ const char* tmp = (WSREP(thd)) ?
+ thd_proc_info(thd,"Delete_rows_log_event::find_row()") : NULL;
+#endif /* WSREP_PROC_INFO */
+#endif /* WITH_WSREP */
if (!(error= find_row(rli)))
{
/*
Delete the record found, located in record[0]
*/
+#ifdef WITH_WSREP
+#ifdef WSREP_PROC_INFO
+ snprintf(info, sizeof(info) - 1,
+ "Delete_rows_log_event::ha_delete_row(%lld)",
+ (long long) thd->wsrep_trx_seqno);
+ if (WSREP(thd)) thd_proc_info(thd, info);
+#else
+ if (WSREP(thd)) thd_proc_info(thd,"Delete_rows_log_event::ha_delete_row()");
+#endif /* WSREP_PROC_INFO */
+#endif /* WITH_WSREP */
error= m_table->file->ha_delete_row(m_table->record[0]);
m_table->file->ha_index_or_rnd_end();
}
+#ifdef WITH_WSREP
+ if (WSREP(thd)) thd_proc_info(thd, tmp);
+#endif /* WITH_WSREP */
return error;
}
@@ -11624,6 +11685,18 @@ Update_rows_log_event::do_exec_row(const Relay_log_info *const rli)
{
DBUG_ASSERT(m_table != NULL);
+#ifdef WITH_WSREP
+#ifdef WSREP_PROC_INFO
+ char info[64];
+ info[sizeof(info) - 1] = '\0';
+ snprintf(info, sizeof(info) - 1, "Update_rows_log_event::find_row(%lld)",
+ (long long) thd->wsrep_trx_seqno);
+ const char* tmp = (WSREP(thd)) ? thd_proc_info(thd, info) : NULL;
+#else
+ const char* tmp = (WSREP(thd)) ?
+ thd_proc_info(thd,"Update_rows_log_event::find_row()") : NULL;
+#endif /* WSREP_PROC_INFO */
+#endif /* WITH_WSREP */
int error= find_row(rli);
if (error)
{
@@ -11650,6 +11723,17 @@ Update_rows_log_event::do_exec_row(const Relay_log_info *const rli)
store_record(m_table,record[1]);
m_curr_row= m_curr_row_end;
+#ifdef WITH_WSREP
+#ifdef WSREP_PROC_INFO
+ snprintf(info, sizeof(info) - 1,
+ "Update_rows_log_event::unpack_current_row(%lld)",
+ (long long) thd->wsrep_trx_seqno);
+ if (WSREP(thd)) thd_proc_info(thd, info);
+#else
+ if (WSREP(thd))
+ thd_proc_info(thd,"Update_rows_log_event::unpack_current_row()");
+#endif /* WSREP_PROC_INFO */
+#endif /* WITH_WSREP */
/* this also updates m_curr_row_end */
if ((error= unpack_current_row(rli)))
goto err;
@@ -11668,10 +11752,23 @@ Update_rows_log_event::do_exec_row(const Relay_log_info *const rli)
DBUG_DUMP("new values", m_table->record[0], m_table->s->reclength);
#endif
+#ifdef WITH_WSREP
+#ifdef WSREP_PROC_INFO
+ snprintf(info, sizeof(info) - 1,
+ "Update_rows_log_event::ha_update_row(%lld)",
+ (long long) thd->wsrep_trx_seqno);
+ if (WSREP(thd)) thd_proc_info(thd, info);
+#else
+ if (WSREP(thd)) thd_proc_info(thd,"Update_rows_log_event::ha_update_row()");
+#endif /* WSREP_PROC_INFO */
+#endif /* WITH_WSREP */
error= m_table->file->ha_update_row(m_table->record[1], m_table->record[0]);
if (error == HA_ERR_RECORD_IS_THE_SAME)
error= 0;
+#ifdef WITH_WSREP
+ if (WSREP(thd)) thd_proc_info(thd, tmp);
+#endif /* WITH_WSREP */
err:
m_table->file->ha_index_or_rnd_end();
return error;
@@ -11756,6 +11853,48 @@ void Incident_log_event::pack_info(THD *thd, Protocol *protocol)
protocol->store(buf, bytes, &my_charset_bin);
}
#endif
+#if WITH_WSREP && !defined(MYSQL_CLIENT)
+Format_description_log_event *wsrep_format_desc; // TODO: free them at the end
+/*
+ read the first event from (*buf). The size of the (*buf) is (*buf_len).
+ At the end (*buf) is shitfed to point to the following event or NULL and
+ (*buf_len) will be changed to account just being read bytes of the 1st event.
+*/
+#define WSREP_MAX_ALLOWED_PACKET 1024*1024*1024 // current protocol max
+
+Log_event* wsrep_read_log_event(
+ char **arg_buf, size_t *arg_buf_len,
+ const Format_description_log_event *description_event)
+{
+ DBUG_ENTER("wsrep_read_log_event");
+ char *head= (*arg_buf);
+
+ uint data_len = uint4korr(head + EVENT_LEN_OFFSET);
+ char *buf= (*arg_buf);
+ const char *error= 0;
+ Log_event *res= 0;
+
+ if (data_len > WSREP_MAX_ALLOWED_PACKET)
+ {
+ error = "Event too big";
+ goto err;
+ }
+
+ res= Log_event::read_log_event(buf, data_len, &error, description_event, FALSE);
+
+err:
+ if (!res)
+ {
+ DBUG_ASSERT(error != 0);
+ sql_print_error("Error in Log_event::read_log_event(): "
+ "'%s', data_len: %d, event_type: %d",
+ error,data_len,head[EVENT_TYPE_OFFSET]);
+ }
+ (*arg_buf)+= data_len;
+ (*arg_buf_len)-= data_len;
+ DBUG_RETURN(res);
+}
+#endif
#ifdef MYSQL_CLIENT
diff --git a/sql/mdl.cc b/sql/mdl.cc
index 9402de02c36..e7321d35e77 100644
--- a/sql/mdl.cc
+++ b/sql/mdl.cc
@@ -22,6 +22,17 @@
#include <mysql/plugin.h>
#include <mysql/service_thd_wait.h>
#include <mysql/psi/mysql_stage.h>
+#ifdef WITH_WSREP
+#include "wsrep_mysqld.h"
+extern "C" my_thread_id wsrep_thd_thread_id(THD *thd);
+extern "C" char *wsrep_thd_query(THD *thd);
+void sql_print_information(const char *format, ...)
+ ATTRIBUTE_FORMAT(printf, 1, 2);
+
+extern bool
+wsrep_grant_mdl_exception(MDL_context *requestor_ctx,
+ MDL_ticket *ticket);
+#endif /* WITH_WSREP */
#ifdef HAVE_PSI_INTERFACE
static PSI_mutex_key key_MDL_map_mutex;
@@ -1311,11 +1322,54 @@ void MDL_lock::Ticket_list::add_ticket(MDL_ticket *ticket)
called by other threads.
*/
DBUG_ASSERT(ticket->get_lock());
+#ifdef WITH_WSREP
+ if ((this == &(ticket->get_lock()->m_waiting)) &&
+ wsrep_thd_is_brute_force((void *)(ticket->get_ctx()->get_thd())))
+ {
+ Ticket_iterator itw(ticket->get_lock()->m_waiting);
+ Ticket_iterator itg(ticket->get_lock()->m_granted);
+
+ MDL_ticket *waiting, *granted;
+ MDL_ticket *prev=NULL;
+ bool added= false;
+
+ while ((waiting= itw++) && !added)
+ {
+ if (!wsrep_thd_is_brute_force((void *)(waiting->get_ctx()->get_thd())))
+ {
+ WSREP_DEBUG("MDL add_ticket inserted before: %lu %s",
+ wsrep_thd_thread_id(waiting->get_ctx()->get_thd()),
+ wsrep_thd_query(waiting->get_ctx()->get_thd()));
+ m_list.insert_after(prev, ticket);
+ added= true;
+ }
+ prev= waiting;
+ }
+ if (!added) m_list.push_back(ticket);
+
+ while ((granted= itg++))
+ {
+ if (granted->get_ctx() != ticket->get_ctx() &&
+ granted->is_incompatible_when_granted(ticket->get_type()))
+ {
+ if (!wsrep_grant_mdl_exception(ticket->get_ctx(), granted))
+ {
+ WSREP_DEBUG("MDL victim killed at add_ticket");
+ }
+ }
+ }
+ }
+ else
+ {
+#endif /* WITH_WSREP */
/*
Add ticket to the *back* of the queue to ensure fairness
among requests with the same priority.
*/
m_list.push_back(ticket);
+#ifdef WITH_WSREP
+ }
+#endif /* WITH_WSREP */
m_bitmap|= MDL_BIT(ticket->get_type());
}
@@ -1621,7 +1675,6 @@ MDL_object_lock::m_waiting_incompatible[MDL_TYPE_END] =
0
};
-
/**
Check if request for the metadata lock can be satisfied given its
current state.
@@ -1646,6 +1699,9 @@ MDL_lock::can_grant_lock(enum_mdl_type type_arg,
bool can_grant= FALSE;
bitmap_t waiting_incompat_map= incompatible_waiting_types_bitmap()[type_arg];
bitmap_t granted_incompat_map= incompatible_granted_types_bitmap()[type_arg];
+#ifdef WITH_WSREP
+ bool wsrep_can_grant= TRUE;
+#endif /* WITH_WSREP */
/*
New lock request can be satisfied iff:
@@ -1668,12 +1724,59 @@ MDL_lock::can_grant_lock(enum_mdl_type type_arg,
{
if (ticket->get_ctx() != requestor_ctx &&
ticket->is_incompatible_when_granted(type_arg))
+#ifdef WITH_WSREP
+ {
+ if (wsrep_thd_is_brute_force((void *)(requestor_ctx->get_thd())) &&
+ key.mdl_namespace() == MDL_key::GLOBAL)
+ {
+ WSREP_DEBUG("global lock granted for BF: %lu %s",
+ wsrep_thd_thread_id(requestor_ctx->get_thd()),
+ wsrep_thd_query(requestor_ctx->get_thd()));
+ can_grant = true;
+ }
+ else if (!wsrep_grant_mdl_exception(requestor_ctx, ticket))
+ {
+ wsrep_can_grant= FALSE;
+ if (wsrep_log_conflicts)
+ {
+ MDL_lock * lock = ticket->get_lock();
+ WSREP_INFO(
+ "MDL conflict db=%s table=%s ticket=%d solved by %s",
+ lock->key.db_name(), lock->key.name(), ticket->get_type(), "abort"
+ );
+ }
+ }
+ else
+ {
+ can_grant= TRUE;
+ }
+ }
+#else
break;
+#endif /* WITH_WSREP */
}
+#ifdef WITH_WSREP
+ if ((ticket == NULL) && wsrep_can_grant)
+#else
if (ticket == NULL) /* Incompatible locks are our own. */
+#endif /* WITH_WSREP */
+
can_grant= TRUE;
}
}
+#ifdef WITH_WSREP
+ else
+ {
+ if (wsrep_thd_is_brute_force((void *)(requestor_ctx->get_thd())) &&
+ key.mdl_namespace() == MDL_key::GLOBAL)
+ {
+ WSREP_DEBUG("global lock granted for BF (waiting queue): %lu %s",
+ wsrep_thd_thread_id(requestor_ctx->get_thd()),
+ wsrep_thd_query(requestor_ctx->get_thd()));
+ can_grant = true;
+ }
+ }
+#endif /* WITH_WSREP */
return can_grant;
}
@@ -3019,3 +3122,33 @@ void MDL_context::set_transaction_duration_for_all_locks()
ticket->m_duration= MDL_TRANSACTION;
#endif
}
+#ifdef WITH_WSREP
+void MDL_ticket::wsrep_report(bool debug)
+{
+ if (debug)
+ {
+ WSREP_DEBUG("MDL ticket: type: %s space: %s db: %s name: %s (%s)",
+ (get_type() == MDL_INTENTION_EXCLUSIVE) ? "intention exclusive" :
+ ((get_type() == MDL_SHARED) ? "shared" :
+ ((get_type() == MDL_SHARED_HIGH_PRIO ? "shared high prio" :
+ ((get_type() == MDL_SHARED_READ) ? "shared read" :
+ ((get_type() == MDL_SHARED_WRITE) ? "shared write" :
+ ((get_type() == MDL_SHARED_NO_WRITE) ? "shared no write" :
+ ((get_type() == MDL_SHARED_NO_READ_WRITE) ? "shared no read write" :
+ ((get_type() == MDL_EXCLUSIVE) ? "exclusive" :
+ "UNKNOWN")))))))),
+ (m_lock->key.mdl_namespace() == MDL_key::GLOBAL) ? "GLOBAL" :
+ ((m_lock->key.mdl_namespace() == MDL_key::SCHEMA) ? "SCHEMA" :
+ ((m_lock->key.mdl_namespace() == MDL_key::TABLE) ? "TABLE" :
+ ((m_lock->key.mdl_namespace() == MDL_key::TABLE) ? "FUNCTION" :
+ ((m_lock->key.mdl_namespace() == MDL_key::TABLE) ? "PROCEDURE" :
+ ((m_lock->key.mdl_namespace() == MDL_key::TABLE) ? "TRIGGER" :
+ ((m_lock->key.mdl_namespace() == MDL_key::TABLE) ? "EVENT" :
+ ((m_lock->key.mdl_namespace() == MDL_key::COMMIT) ? "COMMIT" :
+ (char *)"UNKNOWN"))))))),
+ m_lock->key.db_name(),
+ m_lock->key.name(),
+ m_lock->key.get_wait_state_name());
+ }
+}
+#endif /* WITH_WSREP */
diff --git a/sql/mdl.h b/sql/mdl.h
index 944c6bb6349..e4986aaba95 100644
--- a/sql/mdl.h
+++ b/sql/mdl.h
@@ -504,6 +504,9 @@ public:
MDL_ticket *next_in_lock;
MDL_ticket **prev_in_lock;
public:
+#ifdef WITH_WSREP
+ void wsrep_report(bool debug);
+#endif /* WITH_WSREP */
bool has_pending_conflicting_lock() const;
MDL_context *get_ctx() const { return m_ctx; }
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index 0a49eb0c7ee..1b61167cf3b 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -73,6 +73,10 @@
#include "scheduler.h"
#include <waiting_threads.h>
#include "debug_sync.h"
+#ifdef WITH_WSREP
+#include "wsrep_mysqld.h"
+ulong wsrep_running_threads = 0; // # of currently running wsrep threads
+#endif
#include "sql_callback.h"
#include "threadpool.h"
@@ -367,6 +371,9 @@ static DYNAMIC_ARRAY all_options;
/* Global variables */
+#ifdef WITH_WSREP
+ulong my_bind_addr;
+#endif /* WITH_WSREP */
bool opt_bin_log, opt_bin_log_used=0, opt_ignore_builtin_innodb= 0;
my_bool opt_log, opt_slow_log, debug_assert_if_crashed_table= 0, opt_help= 0, opt_abort;
ulonglong log_output_options;
@@ -459,6 +466,10 @@ ulong opt_binlog_rows_event_max_size;
my_bool opt_master_verify_checksum= 0;
my_bool opt_slave_sql_verify_checksum= 1;
const char *binlog_format_names[]= {"MIXED", "STATEMENT", "ROW", NullS};
+#ifdef WITH_WSREP
+const char *wsrep_binlog_format_names[]=
+ {"MIXED", "STATEMENT", "ROW", "NONE", NullS};
+#endif /*WITH_WSREP */
#ifdef HAVE_INITGROUPS
volatile sig_atomic_t calling_initgroups= 0; /**< Used in SIGSEGV handler. */
#endif
@@ -704,6 +715,22 @@ mysql_cond_t COND_server_started;
int mysqld_server_started=0, mysqld_server_initialized= 0;
File_parser_dummy_hook file_parser_dummy_hook;
+#ifdef WITH_WSREP
+mysql_mutex_t LOCK_wsrep_ready;
+mysql_cond_t COND_wsrep_ready;
+mysql_mutex_t LOCK_wsrep_sst;
+mysql_cond_t COND_wsrep_sst;
+mysql_mutex_t LOCK_wsrep_sst_init;
+mysql_cond_t COND_wsrep_sst_init;
+mysql_mutex_t LOCK_wsrep_rollback;
+mysql_cond_t COND_wsrep_rollback;
+wsrep_aborting_thd_t wsrep_aborting_thd= NULL;
+mysql_mutex_t LOCK_wsrep_replaying;
+mysql_cond_t COND_wsrep_replaying;
+mysql_mutex_t LOCK_wsrep_slave_threads;
+int wsrep_replaying= 0;
+static void wsrep_close_threads(THD* thd);
+#endif
/* replication parameters, if master_host is not NULL, we are a slave */
uint report_port= 0;
@@ -842,6 +869,12 @@ PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list,
key_LOCK_error_messages, key_LOG_INFO_lock,
key_LOCK_thread_count, key_LOCK_thread_cache,
key_PARTITION_LOCK_auto_inc;
+#ifdef WITH_WSREP
+PSI_mutex_key key_LOCK_wsrep_rollback, key_LOCK_wsrep_thd,
+ key_LOCK_wsrep_replaying, key_LOCK_wsrep_ready, key_LOCK_wsrep_sst,
+ key_LOCK_wsrep_sst_thread, key_LOCK_wsrep_sst_init,
+ key_LOCK_wsrep_slave_threads;
+#endif
PSI_mutex_key key_RELAYLOG_LOCK_index;
PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state;
@@ -914,6 +947,17 @@ static PSI_mutex_info all_server_mutexes[]=
{ &key_LOCK_prepare_ordered, "LOCK_prepare_ordered", PSI_FLAG_GLOBAL},
{ &key_LOCK_commit_ordered, "LOCK_commit_ordered", PSI_FLAG_GLOBAL},
{ &key_LOG_INFO_lock, "LOG_INFO::lock", 0},
+#ifdef WITH_WSREP
+ { &key_LOCK_wsrep_ready, "LOCK_wsrep_ready", PSI_FLAG_GLOBAL},
+ { &key_LOCK_wsrep_sst, "LOCK_wsrep_sst", PSI_FLAG_GLOBAL},
+ { &key_LOCK_wsrep_sst_thread, "wsrep_sst_thread", 0},
+ { &key_LOCK_wsrep_sst_init, "LOCK_wsrep_sst_init", PSI_FLAG_GLOBAL},
+ { &key_LOCK_wsrep_sst, "LOCK_wsrep_sst", PSI_FLAG_GLOBAL},
+ { &key_LOCK_wsrep_rollback, "LOCK_wsrep_rollback", PSI_FLAG_GLOBAL},
+ { &key_LOCK_wsrep_thd, "THD::LOCK_wsrep_thd", 0},
+ { &key_LOCK_wsrep_replaying, "LOCK_wsrep_replaying", PSI_FLAG_GLOBAL},
+ { &key_LOCK_wsrep_slave_threads, "LOCK_wsrep_slave_threads", PSI_FLAG_GLOBAL},
+#endif
{ &key_LOCK_thread_count, "LOCK_thread_count", PSI_FLAG_GLOBAL},
{ &key_LOCK_thread_cache, "LOCK_thread_cache", PSI_FLAG_GLOBAL},
{ &key_PARTITION_LOCK_auto_inc, "HA_DATA_PARTITION::LOCK_auto_inc", 0},
@@ -957,6 +1001,11 @@ PSI_cond_key key_BINLOG_COND_xid_list, key_BINLOG_update_cond,
key_TABLE_SHARE_cond, key_user_level_lock_cond,
key_COND_thread_count, key_COND_thread_cache, key_COND_flush_thread_cache,
key_BINLOG_COND_queue_busy;
+#ifdef WITH_WSREP
+PSI_cond_key key_COND_wsrep_rollback, key_COND_wsrep_thd,
+ key_COND_wsrep_replaying, key_COND_wsrep_ready, key_COND_wsrep_sst,
+ key_COND_wsrep_sst_init, key_COND_wsrep_sst_thread;
+#endif /* WITH_WSREP */
PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready;
PSI_cond_key key_RELAYLOG_COND_queue_busy;
PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
@@ -999,6 +1048,15 @@ static PSI_cond_info all_server_conds[]=
{ &key_user_level_lock_cond, "User_level_lock::cond", 0},
{ &key_COND_thread_count, "COND_thread_count", PSI_FLAG_GLOBAL},
{ &key_COND_thread_cache, "COND_thread_cache", PSI_FLAG_GLOBAL},
+#ifdef WITH_WSREP
+ { &key_COND_wsrep_ready, "COND_wsrep_ready", PSI_FLAG_GLOBAL},
+ { &key_COND_wsrep_sst, "COND_wsrep_sst", PSI_FLAG_GLOBAL},
+ { &key_COND_wsrep_sst_init, "COND_wsrep_sst_init", PSI_FLAG_GLOBAL},
+ { &key_COND_wsrep_sst_thread, "wsrep_sst_thread", 0},
+ { &key_COND_wsrep_rollback, "COND_wsrep_rollback", PSI_FLAG_GLOBAL},
+ { &key_COND_wsrep_thd, "THD::COND_wsrep_thd", 0},
+ { &key_COND_wsrep_replaying, "COND_wsrep_replaying", PSI_FLAG_GLOBAL},
+#endif
{ &key_COND_flush_thread_cache, "COND_flush_thread_cache", PSI_FLAG_GLOBAL}
};
@@ -1559,6 +1617,11 @@ static void close_connections(void)
if (tmp->slave_thread)
continue;
+#ifdef WITH_WSREP
+ /* skip wsrep system threads as well */
+ if (WSREP(tmp) && (tmp->wsrep_exec_mode==REPL_RECV || tmp->wsrep_applier))
+ continue;
+#endif
tmp->killed= KILL_SERVER_HARD;
MYSQL_CALLBACK(thread_scheduler, post_kill_notification, (tmp));
mysql_mutex_lock(&tmp->LOCK_thd_data);
@@ -1635,6 +1698,33 @@ static void close_connections(void)
close_connection(tmp,ER_SERVER_SHUTDOWN);
}
#endif
+#ifdef WITH_WSREP
+ /*
+ * TODO: this code block may turn out redundant. wsrep->disconnect()
+ * should terminate slave threads gracefully, and we don't need
+ * to signal them here.
+ * The code here makes sure mysqld will not hang during shutdown
+ * even if wsrep provider has problems in shutting down.
+ */
+ if (WSREP(tmp) && tmp->wsrep_exec_mode==REPL_RECV)
+ {
+ sql_print_information("closing wsrep system thread");
+ tmp->killed= KILL_CONNECTION;
+ MYSQL_CALLBACK(thread_scheduler, post_kill_notification, (tmp));
+ if (tmp->mysys_var)
+ {
+ tmp->mysys_var->abort=1;
+ mysql_mutex_lock(&tmp->mysys_var->mutex);
+ if (tmp->mysys_var->current_cond)
+ {
+ mysql_mutex_lock(tmp->mysys_var->current_mutex);
+ mysql_cond_broadcast(tmp->mysys_var->current_cond);
+ mysql_mutex_unlock(tmp->mysys_var->current_mutex);
+ }
+ mysql_mutex_unlock(&tmp->mysys_var->mutex);
+ }
+ }
+#endif
DBUG_PRINT("quit",("Unlocking LOCK_thread_count"));
mysql_mutex_unlock(&LOCK_thread_count);
}
@@ -1787,8 +1877,14 @@ static void __cdecl kill_server(int sig_ptr)
}
}
#endif
+#ifdef WITH_WSREP
+ if (WSREP_ON) wsrep_stop_replication(NULL);
+#endif
close_connections();
+#ifdef WITH_WSREP
+ if (WSREP_ON) wsrep_deinit();
+#endif
if (sig != MYSQL_KILL_SIGNAL &&
sig != 0)
unireg_abort(1); /* purecov: inspected */
@@ -1883,6 +1979,21 @@ extern "C" void unireg_abort(int exit_code)
usage();
if (exit_code)
sql_print_error("Aborting\n");
+#ifdef WITH_WSREP
+ if (wsrep)
+ {
+ /* This is an abort situation, we cannot expect to gracefully close all
+ * wsrep threads here, we can only diconnect from service */
+ wsrep_close_client_connections(FALSE);
+ shutdown_in_progress= 1;
+ THD* thd(0);
+ wsrep->disconnect(wsrep);
+ WSREP_INFO("Service disconnected.");
+ wsrep_close_threads(thd); /* this won't close all threads */
+ sleep(1); /* so give some time to exit for those which can */
+ WSREP_INFO("Some threads may fail to exit.");
+ }
+#endif // WITH_WSREP
clean_up(!opt_abort && (exit_code || !opt_bootstrap)); /* purecov: inspected */
DBUG_PRINT("quit",("done with cleanup in unireg_abort"));
mysqld_exit(exit_code);
@@ -2092,6 +2203,19 @@ static void clean_up_mutexes()
mysql_cond_destroy(&COND_thread_count);
mysql_cond_destroy(&COND_thread_cache);
mysql_cond_destroy(&COND_flush_thread_cache);
+#ifdef WITH_WSREP
+ (void) mysql_mutex_destroy(&LOCK_wsrep_ready);
+ (void) mysql_cond_destroy(&COND_wsrep_ready);
+ (void) mysql_mutex_destroy(&LOCK_wsrep_sst);
+ (void) mysql_cond_destroy(&COND_wsrep_sst);
+ (void) mysql_mutex_destroy(&LOCK_wsrep_sst_init);
+ (void) mysql_cond_destroy(&COND_wsrep_sst_init);
+ (void) mysql_mutex_destroy(&LOCK_wsrep_rollback);
+ (void) mysql_cond_destroy(&COND_wsrep_rollback);
+ (void) mysql_mutex_destroy(&LOCK_wsrep_replaying);
+ (void) mysql_cond_destroy(&COND_wsrep_replaying);
+ (void) mysql_mutex_destroy(&LOCK_wsrep_slave_threads);
+#endif
mysql_mutex_destroy(&LOCK_server_started);
mysql_cond_destroy(&COND_server_started);
mysql_mutex_destroy(&LOCK_prepare_ordered);
@@ -2406,6 +2530,10 @@ static MYSQL_SOCKET activate_tcp_port(uint port)
socket_errno);
unireg_abort(1);
}
+#if defined(WITH_WSREP) && defined(HAVE_FCNTL) && defined(FD_CLOEXEC)
+ (void) fcntl(mysql_socket_getfd(ip_sock), F_SETFD, FD_CLOEXEC);
+#endif /* WITH_WSREP */
+
DBUG_RETURN(ip_sock);
}
@@ -2532,6 +2660,9 @@ static void network_init(void)
if (mysql_socket_listen(unix_sock,(int) back_log) < 0)
sql_print_warning("listen() on Unix socket failed with error %d",
socket_errno);
+#if defined(WITH_WSREP) && defined(HAVE_FCNTL)
+ (void) fcntl(mysql_socket_getfd(unix_sock), F_SETFD, FD_CLOEXEC);
+#endif /* WITH_WSREP */
}
#endif
DBUG_PRINT("info",("server started"));
@@ -2548,7 +2679,11 @@ static void network_init(void)
@note
For the connection that is doing shutdown, this is called twice
*/
+#ifdef WITH_WSREP
+void close_connection(THD *thd, uint sql_errno, bool lock)
+#else
void close_connection(THD *thd, uint sql_errno)
+#endif
{
DBUG_ENTER("close_connection");
@@ -3835,7 +3970,13 @@ static int init_common_variables()
}
else
opt_log_basename= glob_hostname;
-
+#ifdef WITH_WSREP
+ if (0 == wsrep_node_name || 0 == wsrep_node_name[0])
+ {
+ my_free((void *)wsrep_node_name);
+ wsrep_node_name= my_strdup(glob_hostname, MYF(MY_WME));
+ }
+#endif /* WITH_WSREP */
if (!*pidfile_name)
{
strmake(pidfile_name, opt_log_basename, sizeof(pidfile_name)-5);
@@ -3894,7 +4035,11 @@ static int init_common_variables()
compile_time_assert(sizeof(com_status_vars)/sizeof(com_status_vars[0]) - 1 ==
SQLCOM_END + 8);
#endif
-
+#ifdef WITH_WSREP
+ /* This is a protection against mutually incompatible option values. */
+ if (WSREP_ON && wsrep_check_opts (remaining_argc, remaining_argv))
+ global_system_variables.wsrep_on= 0;
+#endif /* WITH_WSREP */
if (get_options(&remaining_argc, &remaining_argv))
return 1;
set_server_version();
@@ -4534,10 +4679,18 @@ static int init_server_components()
/* need to configure logging before initializing storage engines */
if (!opt_bin_log_used)
{
+#ifdef WITH_WSREP
+ if (!WSREP_ON && opt_log_slave_updates)
+#else
if (opt_log_slave_updates)
+#endif
sql_print_warning("You need to use --log-bin to make "
"--log-slave-updates work.");
+#ifdef WITH_WSREP
+ if (!WSREP_ON && binlog_format_used)
+#else
if (binlog_format_used)
+#endif
sql_print_warning("You need to use --log-bin to make "
"--binlog-format work.");
}
@@ -4563,6 +4716,39 @@ will be ignored as the --log-bin option is not defined.");
}
#endif
+#ifdef WITH_WSREP /* WSREP BEFORE SE */
+ if (!wsrep_recovery)
+ {
+ if (opt_bootstrap) // bootsrap option given - disable wsrep functionality
+ {
+ wsrep_provider_init(WSREP_NONE);
+ if (wsrep_init()) unireg_abort(1);
+ }
+ else // full wsrep initialization
+ {
+ // add basedir/bin to PATH to resolve wsrep script names
+ char* const tmp_path((char*)alloca(strlen(mysql_home) +
+ strlen("/bin") + 1));
+ if (tmp_path)
+ {
+ strcpy(tmp_path, mysql_home);
+ strcat(tmp_path, "/bin");
+ wsrep_prepend_PATH(tmp_path);
+ }
+ else
+ {
+ WSREP_ERROR("Could not append %s/bin to PATH", mysql_home);
+ }
+
+ if (wsrep_before_SE())
+ {
+ set_ports(); // this is also called in network_init() later but we need
+ // to know mysqld_port now - lp:1071882
+ wsrep_init_startup(true);
+ }
+ }
+ }
+#endif /* WITH_WSREP */
if (opt_bin_log)
{
/* Reports an error and aborts, if the --log-bin's path
@@ -4768,10 +4954,29 @@ a file name for --log-bin-index option", opt_binlog_index_name);
internal_tmp_table_max_key_segments= myisam_max_key_segments();
#endif
+#ifdef WITH_WSREP
+ if (!opt_bin_log)
+ {
+ wsrep_emulate_bin_log= 1;
+ }
+#endif
tc_log= (total_ha_2pc > 1 ? (opt_bin_log ?
(TC_LOG *) &mysql_bin_log :
+#ifdef WITH_WSREP
+ (WSREP_ON ?
+ (TC_LOG *) &tc_log_dummy :
+ (TC_LOG *) &tc_log_mmap)) :
+#else
(TC_LOG *) &tc_log_mmap) :
+#endif
(TC_LOG *) &tc_log_dummy);
+#ifdef WITH_WSREP
+ WSREP_DEBUG("Initial TC log open: %s",
+ (tc_log == &mysql_bin_log) ? "binlog" :
+ (tc_log == &tc_log_mmap) ? "mmap" :
+ (tc_log == &tc_log_dummy) ? "dummy" : "unknown"
+ );
+#endif
if (tc_log->open(opt_bin_log ? opt_bin_logname : opt_tc_log_file))
{
@@ -5286,8 +5491,33 @@ int mysqld_main(int argc, char **argv)
if (Events::init(opt_noacl || opt_bootstrap))
unireg_abort(1);
+#ifdef WITH_WSREP /* WSREP AFTER SE */
if (opt_bootstrap)
{
+ /*! bootstrap wsrep init was taken care of above */
+ }
+ else
+ {
+ wsrep_SE_initialized();
+
+ if (wsrep_before_SE())
+ {
+ /*! in case of no SST wsrep waits in view handler callback */
+ wsrep_SE_init_grab();
+ wsrep_SE_init_done();
+ /*! in case of SST wsrep waits for wsrep->sst_received */
+ wsrep_sst_continue();
+ }
+ else
+ {
+ wsrep_init_startup (false);
+ }
+
+ wsrep_create_appliers(wsrep_slave_threads - 1);
+ }
+#endif /* WITH_WSREP */
+ if (opt_bootstrap)
+ {
select_thread_in_use= 0; // Allow 'kill' to work
bootstrap(mysql_stdin);
if (!kill_in_progress)
@@ -5354,6 +5584,9 @@ int mysqld_main(int argc, char **argv)
#ifdef EXTRA_DEBUG2
sql_print_error("Before Lock_thread_count");
#endif
+#ifdef WITH_WSREP
+ WSREP_DEBUG("Before Lock_thread_count");
+#endif
mysql_mutex_lock(&LOCK_thread_count);
DBUG_PRINT("quit", ("Got thread_count mutex"));
select_thread_in_use=0; // For close_connections
@@ -5619,6 +5852,9 @@ static void bootstrap(MYSQL_FILE *file)
DBUG_ENTER("bootstrap");
THD *thd= new THD;
+#ifdef WITH_WSREP
+ thd->variables.wsrep_on= 0;
+#endif
thd->bootstrap=1;
my_net_init(&thd->net,(st_vio*) 0, MYF(0));
thd->max_client_packet_length= thd->net.max_packet;
@@ -5755,7 +5991,11 @@ void create_thread_to_handle_connection(THD *thd)
my_snprintf(error_message_buff, sizeof(error_message_buff),
ER_THD(thd, ER_CANT_CREATE_THREAD), error);
net_send_error(thd, ER_CANT_CREATE_THREAD, error_message_buff, NULL);
+#ifdef WITH_WSREP
+ close_connection(thd, ER_OUT_OF_RESOURCES ,0);
+#else
close_connection(thd, ER_OUT_OF_RESOURCES);
+#endif /* WITH_WSREP */
mysql_mutex_lock(&LOCK_thread_count);
thd->unlink();
@@ -5801,7 +6041,11 @@ static void create_new_thread(THD *thd)
mysql_mutex_unlock(&LOCK_connection_count);
DBUG_PRINT("error",("Too many connections"));
+#ifdef WITH_WSREP
+ close_connection(thd, ER_CON_COUNT_ERROR, 1);
+#else
close_connection(thd, ER_CON_COUNT_ERROR);
+#endif /* WITH_WSREP */
statistic_increment(denied_connections, &LOCK_status);
delete thd;
DBUG_VOID_RETURN;
@@ -6003,6 +6247,9 @@ void handle_connections_sockets()
sleep(1); // Give other threads some time
continue;
}
+#if defined(WITH_WSREP) && defined(HAVE_FCNTL) && defined(FD_CLOEXEC)
+ (void) fcntl(mysql_socket_getfd(new_sock), F_SETFD, FD_CLOEXEC);
+#endif /* WITH_WSREP */
#ifdef HAVE_LIBWRAP
{
@@ -6189,7 +6436,11 @@ pthread_handler_t handle_connections_namedpipes(void *arg)
if (!(thd->net.vio= vio_new_win32pipe(hConnectedPipe)) ||
my_net_init(&thd->net, thd->net.vio, MYF(MY_THREAD_SPECIFIC)))
{
+#ifdef WITH_WSREP
+ close_connection(thd, ER_OUT_OF_RESOURCES, 1);
+#else
close_connection(thd, ER_OUT_OF_RESOURCES);
+#endif
delete thd;
set_current_thd(0);
continue;
@@ -6387,7 +6638,11 @@ pthread_handler_t handle_connections_shared_memory(void *arg)
event_conn_closed)) ||
my_net_init(&thd->net, thd->net.vio, MYF(MY_THREAD_SPECIFIC)))
{
+#ifdef WITH_WSREP
+ close_connection(thd, ER_OUT_OF_RESOURCES, 1);
+#else
close_connection(thd, ER_OUT_OF_RESOURCES);
+#endif
errmsg= 0;
goto errorconn;
}
@@ -7615,6 +7870,19 @@ SHOW_VAR status_vars[]= {
#ifdef ENABLED_PROFILING
{"Uptime_since_flush_status",(char*) &show_flushstatustime, SHOW_SIMPLE_FUNC},
#endif
+#ifdef WITH_WSREP
+ {"wsrep_connected", (char*) &wsrep_connected, SHOW_BOOL},
+ {"wsrep_ready", (char*) &wsrep_ready, SHOW_BOOL},
+ {"wsrep_cluster_state_uuid", (char*) &wsrep_cluster_state_uuid,SHOW_CHAR_PTR},
+ {"wsrep_cluster_conf_id", (char*) &wsrep_cluster_conf_id, SHOW_LONGLONG},
+ {"wsrep_cluster_status", (char*) &wsrep_cluster_status, SHOW_CHAR_PTR},
+ {"wsrep_cluster_size", (char*) &wsrep_cluster_size, SHOW_LONG},
+ {"wsrep_local_index", (char*) &wsrep_local_index, SHOW_LONG},
+ {"wsrep_provider_name", (char*) &wsrep_provider_name, SHOW_CHAR_PTR},
+ {"wsrep_provider_version", (char*) &wsrep_provider_version, SHOW_CHAR_PTR},
+ {"wsrep_provider_vendor", (char*) &wsrep_provider_vendor, SHOW_CHAR_PTR},
+ {"wsrep", (char*) &wsrep_show_status, SHOW_FUNC},
+#endif
{NullS, NullS, SHOW_LONG}
};
@@ -7952,6 +8220,10 @@ static int mysql_init_variables(void)
tmpenv = DEFAULT_MYSQL_HOME;
(void) strmake(mysql_home, tmpenv, sizeof(mysql_home)-1);
#endif
+#ifdef WITH_WSREP
+ if (WSREP_ON && wsrep_init_vars())
+ return 1;
+#endif
return 0;
}
@@ -8208,6 +8480,14 @@ mysqld_get_one_option(int optid,
case OPT_LOWER_CASE_TABLE_NAMES:
lower_case_table_names_used= 1;
break;
+#ifdef WITH_WSREP
+ case OPT_WSREP_START_POSITION:
+ wsrep_start_position_init (argument);
+ break;
+ case OPT_WSREP_SST_AUTH:
+ wsrep_sst_auth_init (argument);
+ break;
+#endif
#if defined(ENABLED_DEBUG_SYNC)
case OPT_DEBUG_SYNC_TIMEOUT:
/*
@@ -8336,6 +8616,25 @@ mysqld_get_one_option(int optid,
#endif
break;
}
+#ifdef WITH_WSREP
+ mysql_mutex_init(key_LOCK_wsrep_ready,
+ &LOCK_wsrep_ready, MY_MUTEX_INIT_FAST);
+ mysql_cond_init(key_COND_wsrep_ready, &COND_wsrep_ready, NULL);
+ mysql_mutex_init(key_LOCK_wsrep_sst,
+ &LOCK_wsrep_sst, MY_MUTEX_INIT_FAST);
+ mysql_cond_init(key_COND_wsrep_sst, &COND_wsrep_sst, NULL);
+ mysql_mutex_init(key_LOCK_wsrep_sst_init,
+ &LOCK_wsrep_sst_init, MY_MUTEX_INIT_FAST);
+ mysql_cond_init(key_COND_wsrep_sst_init, &COND_wsrep_sst_init, NULL);
+ mysql_mutex_init(key_LOCK_wsrep_rollback,
+ &LOCK_wsrep_rollback, MY_MUTEX_INIT_FAST);
+ mysql_cond_init(key_COND_wsrep_rollback, &COND_wsrep_rollback, NULL);
+ mysql_mutex_init(key_LOCK_wsrep_replaying,
+ &LOCK_wsrep_replaying, MY_MUTEX_INIT_FAST);
+ mysql_cond_init(key_COND_wsrep_replaying, &COND_wsrep_replaying, NULL);
+ mysql_mutex_init(key_LOCK_wsrep_slave_threads,
+ &LOCK_wsrep_slave_threads, MY_MUTEX_INIT_FAST);
+#endif
return 0;
}
@@ -8894,8 +9193,6 @@ static int test_if_case_insensitive(const char *dir_name)
DBUG_PRINT("exit", ("result: %d", result));
DBUG_RETURN(result);
}
-
-
#ifndef EMBEDDED_LIBRARY
/**
@@ -8924,6 +9221,487 @@ static void create_pid_file()
}
#endif /* EMBEDDED_LIBRARY */
+#ifdef WITH_WSREP
+typedef void (*wsrep_thd_processor_fun)(THD *);
+
+pthread_handler_t start_wsrep_THD(void *arg)
+{
+ THD *thd;
+ wsrep_thd_processor_fun processor= (wsrep_thd_processor_fun)arg;
+
+ if (my_thread_init())
+ {
+ WSREP_ERROR("Could not initialize thread");
+ return(NULL);
+ }
+
+ if (!(thd= new THD(true)))
+ {
+ return(NULL);
+ }
+ mysql_mutex_lock(&LOCK_thread_count);
+ thd->thread_id=thread_id++;
+
+ thd->real_id=pthread_self(); // Keep purify happy
+ thread_count++;
+ thread_created++;
+ threads.append(thd);
+
+ my_net_init(&thd->net,(st_vio*) 0, MYF(0));
+
+ DBUG_PRINT("wsrep",(("creating thread %lld"), (long long)thd->thread_id));
+ thd->prior_thr_create_utime= thd->start_utime= microsecond_interval_timer();
+ (void) mysql_mutex_unlock(&LOCK_thread_count);
+
+ /* from bootstrap()... */
+ thd->bootstrap=1;
+ thd->max_client_packet_length= thd->net.max_packet;
+ thd->security_ctx->master_access= ~(ulong)0;
+
+ /* from handle_one_connection... */
+ pthread_detach_this_thread();
+
+ mysql_thread_set_psi_id(thd->thread_id);
+ thd->thr_create_utime= microsecond_interval_timer();
+ if (MYSQL_CALLBACK_ELSE(thread_scheduler, init_new_connection_thread, (), 0))
+ {
+ close_connection(thd, ER_OUT_OF_RESOURCES, 1);
+ statistic_increment(aborted_connects,&LOCK_status);
+ MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 0));
+
+ return(NULL);
+ }
+
+// </5.1.17>
+ /*
+ handle_one_connection() is normally the only way a thread would
+ start and would always be on the very high end of the stack ,
+ therefore, the thread stack always starts at the address of the
+ first local variable of handle_one_connection, which is thd. We
+ need to know the start of the stack so that we could check for
+ stack overruns.
+ */
+ DBUG_PRINT("wsrep", ("handle_one_connection called by thread %lld\n",
+ (long long)thd->thread_id));
+ /* now that we've called my_thread_init(), it is safe to call DBUG_* */
+
+ thd->thread_stack= (char*) &thd;
+ if (thd->store_globals())
+ {
+ close_connection(thd, ER_OUT_OF_RESOURCES, 1);
+ statistic_increment(aborted_connects,&LOCK_status);
+ MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 0));
+ delete thd;
+
+ return(NULL);
+ }
+
+ thd->system_thread= SYSTEM_THREAD_SLAVE_SQL;
+ thd->security_ctx->skip_grants();
+
+ /* handle_one_connection() again... */
+ //thd->version= refresh_version;
+ thd->proc_info= 0;
+ thd->set_command(COM_SLEEP);
+ thd->set_time();
+ thd->init_for_queries();
+
+ mysql_mutex_lock(&LOCK_connection_count);
+ ++connection_count;
+ mysql_mutex_unlock(&LOCK_connection_count);
+
+ mysql_mutex_lock(&LOCK_thread_count);
+ wsrep_running_threads++;
+ mysql_cond_signal(&COND_thread_count);
+ mysql_mutex_unlock(&LOCK_thread_count);
+
+ processor(thd);
+
+ close_connection(thd, 0, 1);
+
+ mysql_mutex_lock(&LOCK_thread_count);
+ wsrep_running_threads--;
+ mysql_cond_signal(&COND_thread_count);
+ mysql_mutex_unlock(&LOCK_thread_count);
+
+ // Note: We can't call THD destructor without crashing
+ // if plugins have not been initialized. However, in most of the
+ // cases this means that pre SE initialization SST failed and
+ // we are going to exit anyway.
+ if (plugins_are_initialized)
+ {
+ net_end(&thd->net);
+ MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 1));
+ }
+ else
+ {
+ // TODO: lightweight cleanup to get rid of:
+ // 'Error in my_thread_global_end(): 2 threads didn't exit'
+ // at server shutdown
+ }
+
+ if (thread_handling > SCHEDULER_ONE_THREAD_PER_CONNECTION)
+ {
+ mysql_mutex_lock(&LOCK_thread_count);
+ delete thd;
+ thread_count--;
+ mysql_mutex_unlock(&LOCK_thread_count);
+ }
+ return(NULL);
+}
+
+void wsrep_create_rollbacker()
+{
+ if (wsrep_provider && strcasecmp(wsrep_provider, "none"))
+ {
+ pthread_t hThread;
+ /* create rollbacker */
+ if (pthread_create( &hThread, &connection_attrib,
+ start_wsrep_THD, (void*)wsrep_rollback_process))
+ WSREP_WARN("Can't create thread to manage wsrep rollback");
+ }
+}
+
+void wsrep_create_appliers(long threads)
+{
+ if (!wsrep_connected)
+ {
+ /* see wsrep_replication_start() for the logic */
+ if (wsrep_cluster_address && strlen(wsrep_cluster_address) &&
+ wsrep_provider && strcasecmp(wsrep_provider, "none"))
+ {
+ WSREP_ERROR("Trying to launch slave threads before creating "
+ "connection at '%s'", wsrep_cluster_address);
+ assert(0);
+ }
+ return;
+ }
+
+ long wsrep_threads=0;
+ pthread_t hThread;
+ while (wsrep_threads++ < threads) {
+ if (pthread_create(
+ &hThread, &connection_attrib,
+ start_wsrep_THD, (void*)wsrep_replication_process))
+ WSREP_WARN("Can't create thread to manage wsrep replication");
+ }
+}
+/**/
+static bool abort_replicated(THD *thd)
+{
+ bool ret_code= false;
+ if (thd->wsrep_query_state== QUERY_COMMITTING)
+ {
+ if (wsrep_debug) WSREP_INFO("aborting replicated trx: %lu", thd->real_id);
+
+ (void)wsrep_abort_thd(thd, thd, TRUE);
+ ret_code= true;
+ }
+ return ret_code;
+}
+/**/
+static inline bool is_client_connection(THD *thd)
+{
+ return (thd->wsrep_client_thread && thd->variables.wsrep_on);
+}
+
+static inline bool is_replaying_connection(THD *thd)
+{
+ bool ret;
+
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ ret= (thd->wsrep_conflict_state == REPLAYING) ? true : false;
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+
+ return ret;
+}
+
+static inline bool is_committing_connection(THD *thd)
+{
+ bool ret;
+
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ ret= (thd->wsrep_query_state == QUERY_COMMITTING) ? true : false;
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+
+ return ret;
+}
+
+static bool have_client_connections()
+{
+ THD *tmp;
+
+ I_List_iterator<THD> it(threads);
+ while ((tmp=it++))
+ {
+ DBUG_PRINT("quit",("Informing thread %ld that it's time to die",
+ tmp->thread_id));
+ if (is_client_connection(tmp) && tmp->killed == KILL_CONNECTION)
+ {
+ (void)abort_replicated(tmp);
+ return true;
+ }
+ }
+ return false;
+}
+
+/*
+ returns the number of wsrep appliers running.
+ However, the caller (thd parameter) is not taken in account
+ */
+static int have_wsrep_appliers(THD *thd)
+{
+ int ret= 0;
+ THD *tmp;
+
+ I_List_iterator<THD> it(threads);
+ while ((tmp=it++))
+ {
+ ret+= (tmp != thd && tmp->wsrep_applier);
+ }
+ return ret;
+}
+
+static void wsrep_close_thread(THD *thd)
+{
+ thd->killed= KILL_CONNECTION;
+ MYSQL_CALLBACK(thread_scheduler, post_kill_notification, (thd));
+ if (thd->mysys_var)
+ {
+ thd->mysys_var->abort=1;
+ mysql_mutex_lock(&thd->mysys_var->mutex);
+ if (thd->mysys_var->current_cond)
+ {
+ mysql_mutex_lock(thd->mysys_var->current_mutex);
+ mysql_cond_broadcast(thd->mysys_var->current_cond);
+ mysql_mutex_unlock(thd->mysys_var->current_mutex);
+ }
+ mysql_mutex_unlock(&thd->mysys_var->mutex);
+ }
+}
+
+static my_bool have_committing_connections()
+{
+ THD *tmp;
+ mysql_mutex_lock(&LOCK_thread_count); // For unlink from list
+
+ I_List_iterator<THD> it(threads);
+ while ((tmp=it++))
+ {
+ if (!is_client_connection(tmp))
+ continue;
+
+ if (is_committing_connection(tmp))
+ {
+ mysql_mutex_unlock(&LOCK_thread_count);
+ return TRUE;
+ }
+ }
+ mysql_mutex_unlock(&LOCK_thread_count);
+ return FALSE;
+}
+
+int wsrep_wait_committing_connections_close(int wait_time)
+{
+ int sleep_time= 100;
+
+ while (have_committing_connections() && wait_time > 0)
+ {
+ WSREP_DEBUG("wait for committing transaction to close: %d", wait_time);
+ my_sleep(sleep_time);
+ wait_time -= sleep_time;
+ }
+ if (have_committing_connections())
+ {
+ return 1;
+ }
+ return 0;
+}
+
+void wsrep_close_client_connections(my_bool wait_to_end)
+{
+ /*
+ First signal all threads that it's time to die
+ */
+
+ THD *tmp;
+ mysql_mutex_lock(&LOCK_thread_count); // For unlink from list
+
+ bool kill_cached_threads_saved= kill_cached_threads;
+ kill_cached_threads= true; // prevent future threads caching
+ mysql_cond_broadcast(&COND_thread_cache); // tell cached threads to die
+
+ I_List_iterator<THD> it(threads);
+ while ((tmp=it++))
+ {
+ DBUG_PRINT("quit",("Informing thread %ld that it's time to die",
+ tmp->thread_id));
+ /* We skip slave threads & scheduler on this first loop through. */
+ if (!is_client_connection(tmp))
+ continue;
+
+ if (is_replaying_connection(tmp))
+ {
+ tmp->killed= KILL_CONNECTION;
+ continue;
+ }
+
+ /* replicated transactions must be skipped */
+ if (abort_replicated(tmp))
+ continue;
+
+ WSREP_DEBUG("closing connection %ld", tmp->thread_id);
+ wsrep_close_thread(tmp);
+ }
+ mysql_mutex_unlock(&LOCK_thread_count);
+
+ if (thread_count)
+ sleep(2); // Give threads time to die
+
+ mysql_mutex_lock(&LOCK_thread_count);
+ /*
+ Force remaining threads to die by closing the connection to the client
+ */
+
+ I_List_iterator<THD> it2(threads);
+ while ((tmp=it2++))
+ {
+#ifndef __bsdi__ // Bug in BSDI kernel
+ if (is_client_connection(tmp) &&
+ !abort_replicated(tmp) &&
+ !is_replaying_connection(tmp))
+ {
+ WSREP_INFO("killing local connection: %ld",tmp->thread_id);
+ close_connection(tmp,0,0);
+ }
+#endif
+ }
+
+ DBUG_PRINT("quit",("Waiting for threads to die (count=%u)",thread_count));
+ if (wsrep_debug)
+ WSREP_INFO("waiting for client connections to close: %u", thread_count);
+
+ while (wait_to_end && have_client_connections())
+ {
+ mysql_cond_wait(&COND_thread_count, &LOCK_thread_count);
+ DBUG_PRINT("quit",("One thread died (count=%u)", thread_count));
+ }
+
+ kill_cached_threads= kill_cached_threads_saved;
+
+ mysql_mutex_unlock(&LOCK_thread_count);
+
+ /* All client connection threads have now been aborted */
+}
+
+void wsrep_close_applier(THD *thd)
+{
+ WSREP_DEBUG("closing applier %ld", thd->thread_id);
+ wsrep_close_thread(thd);
+}
+
+static void wsrep_close_threads(THD *thd)
+{
+ THD *tmp;
+ mysql_mutex_lock(&LOCK_thread_count); // For unlink from list
+
+ I_List_iterator<THD> it(threads);
+ while ((tmp=it++))
+ {
+ DBUG_PRINT("quit",("Informing thread %ld that it's time to die",
+ tmp->thread_id));
+ /* We skip slave threads & scheduler on this first loop through. */
+ if (tmp->wsrep_applier && tmp != thd)
+ {
+ WSREP_DEBUG("closing wsrep thread %ld", tmp->thread_id);
+ wsrep_close_thread (tmp);
+ }
+ }
+
+ mysql_mutex_unlock(&LOCK_thread_count);
+}
+
+void wsrep_close_applier_threads(int count)
+{
+ THD *tmp;
+ mysql_mutex_lock(&LOCK_thread_count); // For unlink from list
+
+ I_List_iterator<THD> it(threads);
+ while ((tmp=it++) && count)
+ {
+ DBUG_PRINT("quit",("Informing thread %ld that it's time to die",
+ tmp->thread_id));
+ /* We skip slave threads & scheduler on this first loop through. */
+ if (tmp->wsrep_applier)
+ {
+ WSREP_DEBUG("closing wsrep applier thread %ld", tmp->thread_id);
+ tmp->wsrep_applier_closing= TRUE;
+ count--;
+ }
+ }
+
+ mysql_mutex_unlock(&LOCK_thread_count);
+}
+
+void wsrep_wait_appliers_close(THD *thd)
+{
+ /* Wait for wsrep appliers to gracefully exit */
+ mysql_mutex_lock(&LOCK_thread_count);
+ while (have_wsrep_appliers(thd) > 1)
+ // 1 is for rollbacker thread which needs to be killed explicitly.
+ // This gotta be fixed in a more elegant manner if we gonna have arbitrary
+ // number of non-applier wsrep threads.
+ {
+ if (thread_handling > SCHEDULER_ONE_THREAD_PER_CONNECTION)
+ {
+ mysql_mutex_unlock(&LOCK_thread_count);
+ my_sleep(100);
+ mysql_mutex_lock(&LOCK_thread_count);
+ }
+ else
+ mysql_cond_wait(&COND_thread_count,&LOCK_thread_count);
+ DBUG_PRINT("quit",("One applier died (count=%u)",thread_count));
+ }
+ mysql_mutex_unlock(&LOCK_thread_count);
+ /* Now kill remaining wsrep threads: rollbacker */
+ wsrep_close_threads (thd);
+ /* and wait for them to die */
+ mysql_mutex_lock(&LOCK_thread_count);
+ while (have_wsrep_appliers(thd) > 0)
+ {
+ if (thread_handling > SCHEDULER_ONE_THREAD_PER_CONNECTION)
+ {
+ mysql_mutex_unlock(&LOCK_thread_count);
+ my_sleep(100);
+ mysql_mutex_lock(&LOCK_thread_count);
+ }
+ else
+ mysql_cond_wait(&COND_thread_count,&LOCK_thread_count);
+ DBUG_PRINT("quit",("One thread died (count=%u)",thread_count));
+ }
+ mysql_mutex_unlock(&LOCK_thread_count);
+
+ /* All wsrep applier threads have now been aborted. However, if this thread
+ is also applier, we are still running...
+ */
+}
+
+void wsrep_kill_mysql(THD *thd)
+{
+ if (mysqld_server_started)
+ {
+ if (!shutdown_in_progress)
+ {
+ WSREP_INFO("starting shutdown");
+ kill_mysql();
+ }
+ }
+ else
+ {
+ unireg_abort(1);
+ }
+}
+#endif /* WITH_WSREP */
/**
Remove the process' pid file.
@@ -9268,6 +10046,14 @@ void init_server_psi_keys(void)
count= array_elements(com_statement_info);
mysql_statement_register(category, com_statement_info, count);
+#ifdef WITH_WSREP /* WSREP AFTER SE */
+ if (wsrep_recovery)
+ {
+ select_thread_in_use= 0;
+ wsrep_recover();
+ unireg_abort(0);
+ }
+#endif /* WITH_WSREP */
/*
When a new packet is received,
it is instrumented as "statement/com/".
diff --git a/sql/mysqld.h b/sql/mysqld.h
index e07b5d5c41c..55a631c3ccf 100644
--- a/sql/mysqld.h
+++ b/sql/mysqld.h
@@ -54,7 +54,11 @@ typedef Bitmap<((MAX_INDEXES+7)/8*8)> key_map; /* Used for finding keys */
some places */
/* Function prototypes */
void kill_mysql(void);
+#ifdef WITH_WSREP
+void close_connection(THD *thd, uint sql_errno= 0, bool lock=1);
+#else
void close_connection(THD *thd, uint sql_errno= 0);
+#endif
void handle_connection_in_main_thread(THD *thd);
void create_thread_to_handle_connection(THD *thd);
void delete_running_thd(THD *thd);
@@ -225,6 +229,10 @@ extern pthread_key(MEM_ROOT**,THR_MALLOC);
extern PSI_mutex_key key_PAGE_lock, key_LOCK_sync, key_LOCK_active,
key_LOCK_pool, key_LOCK_pending_checkpoint;
#endif /* HAVE_MMAP */
+#ifdef WITH_WSREP
+extern PSI_mutex_key key_LOCK_wsrep_thd;
+extern PSI_cond_key key_COND_wsrep_thd;
+#endif /* HAVE_MMAP */
#ifdef HAVE_OPENSSL
extern PSI_mutex_key key_LOCK_des_key_file;
@@ -557,6 +565,14 @@ enum options_mysqld
OPT_SSL_KEY,
OPT_UPDATE_LOG,
OPT_WANT_CORE,
+#ifdef WITH_WSREP
+ OPT_WSREP_PROVIDER,
+ OPT_WSREP_PROVIDER_OPTIONS,
+ OPT_WSREP_CLUSTER_ADDRESS,
+ OPT_WSREP_START_POSITION,
+ OPT_WSREP_SST_AUTH,
+ OPT_WSREP_RECOVER,
+#endif
OPT_which_is_always_the_last
};
#endif
@@ -700,4 +716,5 @@ extern uint internal_tmp_table_max_key_segments;
extern uint volatile global_disable_checkpoint;
extern my_bool opt_help;
+
#endif /* MYSQLD_INCLUDED */
diff --git a/sql/protocol.cc b/sql/protocol.cc
index f6e9e9e62e1..a86da94af61 100644
--- a/sql/protocol.cc
+++ b/sql/protocol.cc
@@ -485,6 +485,14 @@ static uchar *net_store_length_fast(uchar *packet, uint length)
void Protocol::end_statement()
{
+#ifdef WITH_WSREP
+ /*sanity check, can be removed before 1.0 release */
+ if (WSREP(thd) && thd->wsrep_conflict_state== REPLAYING)
+ {
+ WSREP_ERROR("attempting net_end_statement while replaying");
+ return;
+ }
+#endif
DBUG_ENTER("Protocol::end_statement");
DBUG_ASSERT(! thd->stmt_da->is_sent);
bool error= FALSE;
diff --git a/sql/set_var.h b/sql/set_var.h
index 6cb0cd33f87..52964a1f29d 100644
--- a/sql/set_var.h
+++ b/sql/set_var.h
@@ -240,6 +240,9 @@ public:
int check(THD *thd);
int update(THD *thd);
int light_check(THD *thd);
+#ifdef WITH_WSREP
+ int wsrep_store_variable(THD *thd);
+#endif
};
@@ -321,6 +324,9 @@ extern sys_var *Sys_autocommit_ptr;
CHARSET_INFO *get_old_charset_by_name(const char *old_name);
+#ifdef WITH_WSREP
+int sql_set_wsrep_variables(THD *thd, List<set_var_base> *var_list);
+#endif
int sys_var_init();
int sys_var_add_options(DYNAMIC_ARRAY *long_options, int parse_flags);
void sys_var_end(void);
diff --git a/sql/slave.cc b/sql/slave.cc
index c7f4dc08096..327f1b763a3 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -53,6 +53,9 @@
// Create_file_log_event,
// Format_description_log_event
+#ifdef WITH_WSREP
+#include "wsrep_mysqld.h"
+#endif
#ifdef HAVE_REPLICATION
#include "rpl_tblmap.h"
@@ -3026,6 +3029,23 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli)
ev->thd = thd; // because up to this point, ev->thd == 0
int reason= ev->shall_skip(rli);
+#ifdef WITH_WSREP
+ if (WSREP_ON && (ev->get_type_code() == XID_EVENT ||
+ (ev->get_type_code() == QUERY_EVENT && thd->wsrep_mysql_replicated > 0 &&
+ (!strncasecmp(((Query_log_event*)ev)->query , "BEGIN", 5) ||
+ !strncasecmp(((Query_log_event*)ev)->query , "COMMIT", 6) ))))
+ {
+ if (++thd->wsrep_mysql_replicated < (int)wsrep_mysql_replication_bundle)
+ {
+ WSREP_DEBUG("skipping wsrep commit %d", thd->wsrep_mysql_replicated);
+ reason = Log_event::EVENT_SKIP_IGNORE;
+ }
+ else
+ {
+ thd->wsrep_mysql_replicated = 0;
+ }
+ }
+#endif
if (reason == Log_event::EVENT_SKIP_COUNT)
{
DBUG_ASSERT(rli->slave_skip_counter > 0);
@@ -4004,6 +4024,11 @@ pthread_handler_t handle_slave_sql(void *arg)
#endif
DBUG_ASSERT(rli->sql_thd == thd);
+#ifdef WITH_WSREP
+ thd->wsrep_exec_mode= LOCAL_STATE;
+ /* synchronize with wsrep replication */
+ if (WSREP_ON) wsrep_ready_wait();
+#endif
DBUG_PRINT("master_info",("log_file_name: %s position: %s",
rli->group_master_log_name,
llstr(rli->group_master_log_pos,llbuff)));
diff --git a/sql/sp.cc b/sql/sp.cc
index 41a1ae583fc..b60411a9121 100644
--- a/sql/sp.cc
+++ b/sql/sp.cc
@@ -2254,3 +2254,37 @@ sp_load_for_information_schema(THD *thd, TABLE *proc_table, String *db,
thd->lex= old_lex;
return sp;
}
+#ifdef WITH_WSREP
+int wsrep_create_sp(THD *thd, uchar** buf, uint* buf_len)
+{
+ String log_query;
+ sp_head *sp = thd->lex->sphead;
+ ulong saved_mode= thd->variables.sql_mode;
+ String retstr(64);
+ retstr.set_charset(system_charset_info);
+
+ log_query.set_charset(system_charset_info);
+
+ if (sp->m_type == TYPE_ENUM_FUNCTION)
+ {
+ sp_returns_type(thd, retstr, sp);
+ }
+
+ if (!create_string(thd, &log_query,
+ sp->m_type,
+ (sp->m_explicit_name ? sp->m_db.str : NULL),
+ (sp->m_explicit_name ? sp->m_db.length : 0),
+ sp->m_name.str, sp->m_name.length,
+ sp->m_params.str, sp->m_params.length,
+ retstr.c_ptr(), retstr.length(),
+ sp->m_body.str, sp->m_body.length,
+ sp->m_chistics, &(thd->lex->definer->user),
+ &(thd->lex->definer->host),
+ saved_mode))
+ {
+ WSREP_WARN("SP create string failed: %s", thd->query());
+ return 1;
+ }
+ return wsrep_to_buf_helper(thd, log_query.ptr(), log_query.length(), buf, buf_len);
+}
+#endif /* WITH_WSREP */
diff --git a/sql/sql_acl.cc b/sql/sql_acl.cc
index e776968792c..d056727c2fd 100644
--- a/sql/sql_acl.cc
+++ b/sql/sql_acl.cc
@@ -1862,6 +1862,9 @@ int check_change_password(THD *thd, const char *host, const char *user,
return(1);
}
if (!thd->slave_thread &&
+#ifdef WITH_WSREP
+ (!WSREP(thd) || !thd->wsrep_applier) &&
+#endif /* WITH_WSREP */
(strcmp(thd->security_ctx->user, user) ||
my_strcasecmp(system_charset_info, host,
thd->security_ctx->priv_host)))
@@ -1869,7 +1872,12 @@ int check_change_password(THD *thd, const char *host, const char *user,
if (check_access(thd, UPDATE_ACL, "mysql", NULL, NULL, 1, 0))
return(1);
}
+#ifdef WITH_WSREP
+ if ((!WSREP(thd) || !thd->wsrep_applier) &&
+ !thd->slave_thread && !thd->security_ctx->user[0])
+#else
if (!thd->slave_thread && !thd->security_ctx->user[0])
+#endif /* WITH_WSREP */
{
my_message(ER_PASSWORD_ANONYMOUS_USER, ER(ER_PASSWORD_ANONYMOUS_USER),
MYF(0));
@@ -1913,6 +1921,9 @@ bool change_password(THD *thd, const char *host, const char *user,
enum_binlog_format save_binlog_format;
uint new_password_len= (uint) strlen(new_password);
bool result= 1;
+#ifdef WITH_WSREP
+ const CSET_STRING query_save = thd->query_string;
+#endif /* WITH_WSREP */
DBUG_ENTER("change_password");
DBUG_PRINT("enter",("host: '%s' user: '%s' new_password: '%s'",
host,user,new_password));
@@ -1920,6 +1931,18 @@ bool change_password(THD *thd, const char *host, const char *user,
if (check_change_password(thd, host, user, new_password, new_password_len))
DBUG_RETURN(1);
+#ifdef WITH_WSREP
+ if (WSREP(thd) && !thd->wsrep_applier)
+ {
+ query_length= sprintf(buff, "SET PASSWORD FOR '%-.120s'@'%-.120s'='%-.120s'",
+ user ? user : "",
+ host ? host : "",
+ new_password);
+ thd->set_query_inner(buff, query_length, system_charset_info);
+
+ WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, (char*)"user", NULL);
+ }
+#endif /* WITH_WSREP */
tables.init_one_table("mysql", 5, "user", 4, "user", TL_WRITE);
@@ -1999,9 +2022,23 @@ bool change_password(THD *thd, const char *host, const char *user,
}
end:
close_mysql_tables(thd);
+#ifdef WITH_WSREP
+ if (WSREP(thd) && !thd->wsrep_applier)
+ {
+ WSREP_TO_ISOLATION_END;
+
+ thd->query_string = query_save;
+ thd->wsrep_exec_mode = LOCAL_STATE;
+ }
+#endif /* WITH_WSREP */
thd->restore_stmt_binlog_format(save_binlog_format);
DBUG_RETURN(result);
+#ifdef WITH_WSREP
+ error:
+ WSREP_ERROR("Replication of SET PASSWORD failed: %s", buff);
+ DBUG_RETURN(result);
+#endif /* WITH_WSREP */
}
diff --git a/sql/sql_admin.cc b/sql/sql_admin.cc
index 9e3ea46f526..a9b7624af3c 100644
--- a/sql/sql_admin.cc
+++ b/sql/sql_admin.cc
@@ -1153,6 +1153,8 @@ bool Optimize_table_statement::execute(THD *thd)
FALSE, UINT_MAX, FALSE))
goto error; /* purecov: inspected */
thd->enable_slow_log= opt_log_slow_admin_statements;
+
+ WSREP_TO_ISOLATION_BEGIN(first_table->db, first_table->table_name, NULL)
res= (specialflag & (SPECIAL_SAFE_MODE | SPECIAL_NO_NEW_FUNC)) ?
mysql_recreate_table(thd, first_table) :
mysql_admin_table(thd, first_table, &m_lex->check_opt,
@@ -1184,6 +1186,7 @@ bool Repair_table_statement::execute(THD *thd)
FALSE, UINT_MAX, FALSE))
goto error; /* purecov: inspected */
thd->enable_slow_log= opt_log_slow_admin_statements;
+ WSREP_TO_ISOLATION_BEGIN(first_table->db, first_table->table_name, NULL)
res= mysql_admin_table(thd, first_table, &m_lex->check_opt, "repair",
TL_WRITE, 1,
test(m_lex->check_opt.sql_flags & TT_USEFRM),
diff --git a/sql/sql_alter.cc b/sql/sql_alter.cc
index c6c02773286..00691633aa8 100644
--- a/sql/sql_alter.cc
+++ b/sql/sql_alter.cc
@@ -17,7 +17,9 @@
#include "sql_table.h" // mysql_alter_table,
// mysql_exchange_partition
#include "sql_alter.h"
-
+#ifdef WITH_WSREP
+#include "wsrep_mysqld.h"
+#endif /* WITH_WSREP */
bool Alter_table_statement::execute(THD *thd)
{
LEX *lex= thd->lex;
@@ -97,6 +99,20 @@ bool Alter_table_statement::execute(THD *thd)
thd->enable_slow_log= opt_log_slow_admin_statements;
+#ifdef WITH_WSREP
+ TABLE *find_temporary_table(THD *thd, const TABLE_LIST *tl);
+
+ if ((!thd->is_current_stmt_binlog_format_row() ||
+ !find_temporary_table(thd, first_table)) &&
+ wsrep_to_isolation_begin(thd,
+ lex->name.str ? select_lex->db : NULL,
+ lex->name.str ? lex->name.str : NULL,
+ first_table))
+ {
+ WSREP_WARN("ALTER TABLE isolation failure");
+ DBUG_RETURN(TRUE);
+ }
+#endif /* WITH_WSREP */
result= mysql_alter_table(thd, select_lex->db, lex->name.str,
&create_info,
first_table,
@@ -105,5 +121,7 @@ bool Alter_table_statement::execute(THD *thd)
select_lex->order_list.first,
lex->ignore, lex->online);
+#ifdef WITH_WSREP
+#endif /* WITH_WSREP */
DBUG_RETURN(result);
}
diff --git a/sql/sql_base.cc b/sql/sql_base.cc
index b4c163f61ee..c9630aef126 100644
--- a/sql/sql_base.cc
+++ b/sql/sql_base.cc
@@ -61,6 +61,10 @@
#include <io.h>
#endif
+#ifdef WITH_WSREP
+#include "wsrep_mysqld.h"
+
+#endif // WITH_WSREP
bool
No_such_table_error_handler::handle_condition(THD *,
@@ -4115,7 +4119,7 @@ thr_lock_type read_lock_type_for_table(THD *thd,
*/
bool log_on= mysql_bin_log.is_open() && thd->variables.sql_log_bin;
ulong binlog_format= thd->variables.binlog_format;
- if ((log_on == FALSE) || (binlog_format == BINLOG_FORMAT_ROW) ||
+ if ((log_on == FALSE) || (WSREP_FORMAT(binlog_format) == BINLOG_FORMAT_ROW) ||
(table_list->table->s->table_category == TABLE_CATEGORY_LOG) ||
(table_list->table->s->table_category == TABLE_CATEGORY_PERFORMANCE) ||
!(is_update_query(prelocking_ctx->sql_command) ||
@@ -5066,6 +5070,22 @@ restart:
}
}
}
+#ifdef WITH_WSREP
+ if ((thd->lex->sql_command== SQLCOM_INSERT ||
+ thd->lex->sql_command== SQLCOM_INSERT_SELECT ||
+ thd->lex->sql_command== SQLCOM_REPLACE ||
+ thd->lex->sql_command== SQLCOM_REPLACE_SELECT ||
+ thd->lex->sql_command== SQLCOM_UPDATE ||
+ thd->lex->sql_command== SQLCOM_UPDATE_MULTI ||
+ thd->lex->sql_command== SQLCOM_LOAD ||
+ thd->lex->sql_command== SQLCOM_DELETE) &&
+ wsrep_replicate_myisam &&
+ (*start)->table && (*start)->table->file->ht->db_type == DB_TYPE_MYISAM)
+ {
+ WSREP_TO_ISOLATION_BEGIN(NULL, NULL, (*start));
+ }
+ error:
+#endif
err:
free_root(&new_frm_mem, MYF(0)); // Free pre-alloced block
@@ -5780,7 +5800,7 @@ bool lock_tables(THD *thd, TABLE_LIST *tables, uint count,
We can solve these problems in mixed mode by switching to binlogging
if at least one updated table is used by sub-statement
*/
- if (thd->variables.binlog_format != BINLOG_FORMAT_ROW && tables &&
+ if (WSREP_FORMAT(thd->variables.binlog_format) != BINLOG_FORMAT_ROW && tables &&
has_write_table_with_auto_increment(thd->lex->first_not_own_table()))
thd->lex->set_stmt_unsafe(LEX::BINLOG_STMT_UNSAFE_AUTOINC_COLUMNS);
}
@@ -9253,7 +9273,19 @@ bool mysql_notify_thread_having_shared_lock(THD *thd, THD *in_use,
(e.g. see partitioning code).
*/
if (!thd_table->needs_reopen())
+#ifdef WITH_WSREP
+ {
+ signalled|= mysql_lock_abort_for_thread(thd, thd_table);
+ if (thd && WSREP(thd) && wsrep_thd_is_brute_force((void *)thd))
+ {
+ WSREP_DEBUG("remove_table_from_cache: %llu",
+ (unsigned long long) thd->real_id);
+ wsrep_abort_thd((void *)thd, (void *)in_use, FALSE);
+ }
+ }
+#else
signalled|= mysql_lock_abort_for_thread(thd, thd_table);
+#endif
}
mysql_mutex_unlock(&in_use->LOCK_thd_data);
}
diff --git a/sql/sql_builtin.cc.in b/sql/sql_builtin.cc.in
index 63850650ac9..2de475b0a76 100644
--- a/sql/sql_builtin.cc.in
+++ b/sql/sql_builtin.cc.in
@@ -25,7 +25,11 @@ extern
#endif
builtin_maria_plugin
@mysql_mandatory_plugins@ @mysql_optional_plugins@
- builtin_maria_binlog_plugin, builtin_maria_mysql_password_plugin;
+ builtin_maria_binlog_plugin,
+#ifdef WITH_WSREP
+ builtin_wsrep_plugin@mysql_plugin_defs@,
+#endif /* WITH_WSREP */
+ builtin_maria_mysql_password_plugin;
struct st_maria_plugin *mysql_optional_plugins[]=
{
@@ -35,5 +39,8 @@ struct st_maria_plugin *mysql_optional_plugins[]=
struct st_maria_plugin *mysql_mandatory_plugins[]=
{
builtin_maria_binlog_plugin, builtin_maria_mysql_password_plugin,
+#ifdef WITH_WSREP
+ builtin_wsrep_plugin@mysql_plugin_defs@,
+#endif /* WITH_WSREP */
@mysql_mandatory_plugins@ 0
};
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index 238bf46e528..71df97dd964 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -62,6 +62,9 @@
#include "debug_sync.h"
#include "sql_parse.h" // is_update_query
#include "sql_callback.h"
+#ifdef WITH_WSREP
+#include "wsrep_mysqld.h"
+#endif
#include "sql_connect.h"
/*
@@ -782,6 +785,173 @@ char *thd_security_context(THD *thd, char *buffer, unsigned int length,
return buffer;
}
+#ifdef WITH_WSREP
+extern int wsrep_on(void *thd)
+{
+ return (int)(WSREP(((THD*)thd)));
+}
+extern "C" bool wsrep_thd_is_wsrep_on(THD *thd)
+{
+ return thd->variables.wsrep_on;
+}
+
+extern "C" bool wsrep_consistency_check(void *thd)
+{
+ return ((THD*)thd)->wsrep_consistency_check == CONSISTENCY_CHECK_RUNNING;
+}
+
+extern "C" void wsrep_thd_set_exec_mode(THD *thd, enum wsrep_exec_mode mode)
+{
+ thd->wsrep_exec_mode= mode;
+}
+extern "C" void wsrep_thd_set_query_state(
+ THD *thd, enum wsrep_query_state state)
+{
+ thd->wsrep_query_state= state;
+}
+extern "C" void wsrep_thd_set_conflict_state(
+ THD *thd, enum wsrep_conflict_state state)
+{
+ thd->wsrep_conflict_state= state;
+}
+
+
+extern "C" enum wsrep_exec_mode wsrep_thd_exec_mode(THD *thd)
+{
+ return thd->wsrep_exec_mode;
+}
+
+extern "C" const char *wsrep_thd_exec_mode_str(THD *thd)
+{
+ return
+ (!thd) ? "void" :
+ (thd->wsrep_exec_mode == LOCAL_STATE) ? "local" :
+ (thd->wsrep_exec_mode == REPL_RECV) ? "applier" :
+ (thd->wsrep_exec_mode == TOTAL_ORDER) ? "total order" :
+ (thd->wsrep_exec_mode == LOCAL_COMMIT) ? "local commit" : "void";
+}
+
+extern "C" enum wsrep_query_state wsrep_thd_query_state(THD *thd)
+{
+ return thd->wsrep_query_state;
+}
+
+extern "C" const char *wsrep_thd_query_state_str(THD *thd)
+{
+ return
+ (!thd) ? "void" :
+ (thd->wsrep_query_state == QUERY_IDLE) ? "idle" :
+ (thd->wsrep_query_state == QUERY_EXEC) ? "executing" :
+ (thd->wsrep_query_state == QUERY_COMMITTING) ? "committing" :
+ (thd->wsrep_query_state == QUERY_EXITING) ? "exiting" :
+ (thd->wsrep_query_state == QUERY_ROLLINGBACK) ? "rolling back" : "void";
+}
+
+extern "C" enum wsrep_conflict_state wsrep_thd_conflict_state(THD *thd)
+{
+ return thd->wsrep_conflict_state;
+}
+extern "C" const char *wsrep_thd_conflict_state_str(THD *thd)
+{
+ return
+ (!thd) ? "void" :
+ (thd->wsrep_conflict_state == NO_CONFLICT) ? "no conflict" :
+ (thd->wsrep_conflict_state == MUST_ABORT) ? "must abort" :
+ (thd->wsrep_conflict_state == ABORTING) ? "aborting" :
+ (thd->wsrep_conflict_state == MUST_REPLAY) ? "must replay" :
+ (thd->wsrep_conflict_state == REPLAYING) ? "replaying" :
+ (thd->wsrep_conflict_state == RETRY_AUTOCOMMIT) ? "retrying" :
+ (thd->wsrep_conflict_state == CERT_FAILURE) ? "cert failure" : "void";
+}
+
+extern "C" wsrep_trx_handle_t* wsrep_thd_trx_handle(THD *thd)
+{
+ return &thd->wsrep_trx_handle;
+}
+
+extern "C"void wsrep_thd_LOCK(THD *thd)
+{
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+}
+extern "C"void wsrep_thd_UNLOCK(THD *thd)
+{
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+}
+extern "C" time_t wsrep_thd_query_start(THD *thd)
+{
+ return thd->query_start();
+}
+extern "C" uint32 wsrep_thd_wsrep_rand(THD *thd)
+{
+ return thd->wsrep_rand;
+}
+extern "C" my_thread_id wsrep_thd_thread_id(THD *thd)
+{
+ return thd->thread_id;
+}
+extern "C" wsrep_seqno_t wsrep_thd_trx_seqno(THD *thd)
+{
+ return (thd) ? thd->wsrep_trx_seqno : -1;
+}
+extern "C" query_id_t wsrep_thd_query_id(THD *thd)
+{
+ return thd->query_id;
+}
+extern "C" char *wsrep_thd_query(THD *thd)
+{
+ return (thd) ? thd->query() : NULL;
+}
+extern "C" query_id_t wsrep_thd_wsrep_last_query_id(THD *thd)
+{
+ return thd->wsrep_last_query_id;
+}
+extern "C" void wsrep_thd_set_wsrep_last_query_id(THD *thd, query_id_t id)
+{
+ thd->wsrep_last_query_id= id;
+}
+extern "C" void wsrep_thd_awake(THD* bf_thd, THD *thd, my_bool signal)
+{
+ if (signal)
+ {
+ thd->wsrep_bf_thd = bf_thd;
+ mysql_mutex_lock(&thd->LOCK_thd_data);
+ thd->awake(KILL_QUERY);
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
+ }
+ else
+ {
+ mysql_mutex_lock(&LOCK_wsrep_replaying);
+ mysql_cond_broadcast(&COND_wsrep_replaying);
+ mysql_mutex_unlock(&LOCK_wsrep_replaying);
+ }
+}
+
+extern int
+wsrep_trx_order_before(void *thd1, void *thd2)
+{
+ if (((THD*)thd1)->wsrep_trx_seqno < ((THD*)thd2)->wsrep_trx_seqno) {
+ WSREP_DEBUG("BF conflict, order: %lld %lld\n",
+ (long long)((THD*)thd1)->wsrep_trx_seqno,
+ (long long)((THD*)thd2)->wsrep_trx_seqno);
+ return 1;
+ }
+ WSREP_DEBUG("waiting for BF, trx order: %lld %lld\n",
+ (long long)((THD*)thd1)->wsrep_trx_seqno,
+ (long long)((THD*)thd2)->wsrep_trx_seqno);
+ return 0;
+}
+extern "C" int
+wsrep_trx_is_aborting(void *thd_ptr)
+{
+ if (thd_ptr) {
+ if ((((THD *)thd_ptr)->wsrep_conflict_state == MUST_ABORT) ||
+ (((THD *)thd_ptr)->wsrep_conflict_state == ABORTING)) {
+ return 1;
+ }
+ }
+ return 0;
+}
+#endif
/**
Implementation of Drop_table_error_handler::handle_condition().
@@ -810,7 +980,11 @@ bool Drop_table_error_handler::handle_condition(THD *thd,
}
+#ifdef WITH_WSREP
+THD::THD(bool is_applier)
+#else
THD::THD()
+#endif
:Statement(&main_lex, &main_mem_root, STMT_CONVENTIONAL_EXECUTION,
/* statement id */ 0),
rli_fake(0), rli_slave(NULL),
@@ -842,6 +1016,11 @@ THD::THD()
bootstrap(0),
derived_tables_processing(FALSE),
spcont(NULL),
+#ifdef WITH_WSREP
+ wsrep_applier(is_applier),
+ wsrep_applier_closing(FALSE),
+ wsrep_client_thread(0),
+#endif
m_parser_state(NULL),
#if defined(ENABLED_DEBUG_SYNC)
debug_sync_control(0),
@@ -955,6 +1134,23 @@ THD::THD()
m_command=COM_CONNECT;
*scramble= '\0';
+#ifdef WITH_WSREP
+ mysql_mutex_init(key_LOCK_wsrep_thd, &LOCK_wsrep_thd, MY_MUTEX_INIT_FAST);
+ mysql_cond_init(key_COND_wsrep_thd, &COND_wsrep_thd, NULL);
+ wsrep_trx_handle.trx_id = WSREP_UNDEFINED_TRX_ID;
+ wsrep_trx_handle.opaque = NULL;
+ //wsrep_retry_autocommit= ::wsrep_retry_autocommit;
+ wsrep_retry_counter = 0;
+ wsrep_PA_safe = true;
+ wsrep_seqno_changed = false;
+ wsrep_retry_query = NULL;
+ wsrep_retry_query_len = 0;
+ wsrep_retry_command = COM_CONNECT;
+ wsrep_consistency_check = NO_CONSISTENCY_CHECK;
+ wsrep_status_vars = 0;
+ wsrep_mysql_replicated = 0;
+ wsrep_bf_thd = NULL;
+#endif
/* Call to init() below requires fully initialized Open_tables_state. */
reset_open_tables_state(this);
@@ -994,6 +1190,13 @@ THD::THD()
my_rnd_init(&rand, tmp + (ulong) &rand, tmp + (ulong) ::global_query_id);
substitute_null_with_insert_id = FALSE;
thr_lock_info_init(&lock_info); /* safety: will be reset after start */
+#ifdef WITH_WSREP
+ lock_info.mysql_thd= (void *)this;
+ lock_info.in_lock_tables= false;
+#ifdef WSREP_PROC_INFO
+ wsrep_info[sizeof(wsrep_info) - 1] = '\0'; /* make sure it is 0-terminated */
+#endif /* WSREP_PROC_INFO */
+#endif /* WITH_WSREP */
m_internal_handler= NULL;
m_binlog_invoker= FALSE;
@@ -1306,7 +1509,21 @@ void THD::init(void)
reset_current_stmt_binlog_format_row();
set_status_var_init();
bzero((char *) &org_status_var, sizeof(org_status_var));
-
+#ifdef WITH_WSREP
+ wsrep_exec_mode= wsrep_applier ? REPL_RECV : LOCAL_STATE;
+ wsrep_conflict_state= NO_CONFLICT;
+ wsrep_query_state= QUERY_IDLE;
+ wsrep_last_query_id= 0;
+ wsrep_trx_seqno= 0;
+ wsrep_converted_lock_session= false;
+ wsrep_retry_counter= 0;
+ wsrep_rli= NULL;
+ wsrep_PA_safe= true;
+ wsrep_seqno_changed= false;
+ wsrep_consistency_check = NO_CONSISTENCY_CHECK;
+ wsrep_mysql_replicated = 0;
+ wsrep_bf_thd = NULL;
+#endif
if (variables.sql_log_bin)
variables.option_bits|= OPTION_BIN_LOG;
else
@@ -1504,6 +1721,13 @@ THD::~THD()
mysys_var=0; // Safety (shouldn't be needed)
mysql_mutex_unlock(&LOCK_thd_data);
+#ifdef WITH_WSREP
+ mysql_mutex_lock(&LOCK_wsrep_thd);
+ mysql_mutex_unlock(&LOCK_wsrep_thd);
+ mysql_mutex_destroy(&LOCK_wsrep_thd);
+ if (wsrep_rli) delete wsrep_rli;
+ if (wsrep_status_vars) wsrep->stats_free(wsrep, wsrep_status_vars);
+#endif
/* Close connection */
#ifndef EMBEDDED_LIBRARY
if (net.vio)
@@ -1678,6 +1902,9 @@ void THD::awake(killed_state state_to_set)
/* Interrupt target waiting inside a storage engine. */
if (state_to_set != NOT_KILLED)
+#ifdef WITH_WSREP
+ if (!wsrep_bf_thd || wsrep_bf_thd->wsrep_exec_mode == LOCAL_STATE)
+#endif /* WITH_WSREP */
ha_kill_query(this, thd_kill_level(this));
/* Broadcast a condition to kick the target if it is waiting on it. */
@@ -1944,6 +2171,13 @@ void THD::cleanup_after_query()
/* reset table map for multi-table update */
table_map_for_update= 0;
m_binlog_invoker= FALSE;
+#ifdef WITH_WSREP
+ if (TOTAL_ORDER == wsrep_exec_mode)
+ {
+ wsrep_exec_mode = LOCAL_STATE;
+ }
+ //wsrep_trx_seqno = 0;
+#endif /* WITH_WSREP */
#ifndef EMBEDDED_LIBRARY
if (rli_slave)
@@ -2353,6 +2587,13 @@ bool sql_exchange::escaped_given(void)
bool select_send::send_result_set_metadata(List<Item> &list, uint flags)
{
bool res;
+#ifdef WITH_WSREP
+ if (WSREP(thd) && thd->wsrep_retry_query)
+ {
+ WSREP_DEBUG("skipping select metadata");
+ return FALSE;
+ }
+#endif /* WITH_WSREP */
if (!(res= thd->protocol->send_result_set_metadata(&list, flags)))
is_result_set_started= 1;
return res;
@@ -4103,8 +4344,13 @@ extern "C" int thd_non_transactional_update(const MYSQL_THD thd)
extern "C" int thd_binlog_format(const MYSQL_THD thd)
{
+#ifdef WITH_WSREP
+ if (((WSREP(thd) && wsrep_emulate_bin_log) || mysql_bin_log.is_open()) &&
+ (thd->variables.option_bits & OPTION_BIN_LOG))
+#else
if (mysql_bin_log.is_open() && (thd->variables.option_bits & OPTION_BIN_LOG))
- return (int) thd->variables.binlog_format;
+#endif
+ return (int) WSREP_FORMAT(thd->variables.binlog_format);
else
return BINLOG_FORMAT_UNSPEC;
}
@@ -4796,7 +5042,7 @@ int THD::decide_logging_format(TABLE_LIST *tables)
binlog by filtering rules.
*/
if (mysql_bin_log.is_open() && (variables.option_bits & OPTION_BIN_LOG) &&
- !(variables.binlog_format == BINLOG_FORMAT_STMT &&
+ !(WSREP_FORMAT(variables.binlog_format) == BINLOG_FORMAT_STMT &&
!binlog_filter->db_ok(db)))
{
/*
@@ -4960,7 +5206,7 @@ int THD::decide_logging_format(TABLE_LIST *tables)
*/
my_error((error= ER_BINLOG_ROW_INJECTION_AND_STMT_ENGINE), MYF(0));
}
- else if (variables.binlog_format == BINLOG_FORMAT_ROW &&
+ else if (WSREP_FORMAT(variables.binlog_format) == BINLOG_FORMAT_ROW &&
sqlcom_can_generate_row_events(this))
{
/*
@@ -4989,7 +5235,7 @@ int THD::decide_logging_format(TABLE_LIST *tables)
else
{
/* binlog_format = STATEMENT */
- if (variables.binlog_format == BINLOG_FORMAT_STMT)
+ if (WSREP_FORMAT(variables.binlog_format) == BINLOG_FORMAT_STMT)
{
if (lex->is_stmt_row_injection())
{
@@ -5006,7 +5252,14 @@ int THD::decide_logging_format(TABLE_LIST *tables)
5. Error: Cannot modify table that uses a storage engine
limited to row-logging when binlog_format = STATEMENT
*/
+#ifdef WITH_WSREP
+ if (!WSREP(this) || wsrep_exec_mode == LOCAL_STATE)
+ {
+#endif /* WITH_WSREP */
my_error((error= ER_BINLOG_STMT_MODE_AND_ROW_ENGINE), MYF(0), "");
+#ifdef WITH_WSREP
+ }
+#endif /* WITH_WSREP */
}
else if (is_write && (unsafe_flags= lex->get_stmt_unsafe_flags()) != 0)
{
@@ -5054,7 +5307,7 @@ int THD::decide_logging_format(TABLE_LIST *tables)
"and binlog_filter->db_ok(db) = %d",
mysql_bin_log.is_open(),
(variables.option_bits & OPTION_BIN_LOG),
- variables.binlog_format,
+ WSREP_FORMAT(variables.binlog_format),
binlog_filter->db_ok(db)));
#endif
@@ -5287,7 +5540,13 @@ int THD::binlog_write_row(TABLE* table, bool is_trans,
MY_BITMAP const* cols, size_t colcnt,
uchar const *record)
{
+#ifdef WITH_WSREP
+ DBUG_ASSERT(is_current_stmt_binlog_format_row() &&
+ ((WSREP(this) && wsrep_emulate_bin_log) ||
+ mysql_bin_log.is_open()));
+#else
DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open());
+#endif
/*
Pack records into format for transfer. We are allocating more
@@ -5317,7 +5576,13 @@ int THD::binlog_update_row(TABLE* table, bool is_trans,
const uchar *before_record,
const uchar *after_record)
{
+#ifdef WITH_WSREP
+ DBUG_ASSERT(is_current_stmt_binlog_format_row() &&
+ ((WSREP(this) && wsrep_emulate_bin_log)
+ || mysql_bin_log.is_open()));
+#else
DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open());
+#endif
size_t const before_maxlen = max_row_length(table, before_record);
size_t const after_maxlen = max_row_length(table, after_record);
@@ -5362,7 +5627,13 @@ int THD::binlog_delete_row(TABLE* table, bool is_trans,
MY_BITMAP const* cols, size_t colcnt,
uchar const *record)
{
+#ifdef WITH_WSREP
+ DBUG_ASSERT(is_current_stmt_binlog_format_row() &&
+ ((WSREP(this) && wsrep_emulate_bin_log)
+ || mysql_bin_log.is_open()));
+#else
DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open());
+#endif
/*
Pack records into format for transfer. We are allocating more
@@ -5393,7 +5664,11 @@ int THD::binlog_remove_pending_rows_event(bool clear_maps,
{
DBUG_ENTER("THD::binlog_remove_pending_rows_event");
+#ifdef WITH_WSREP
+ if (!(WSREP_EMULATE_BINLOG(this) || mysql_bin_log.is_open()))
+#else
if (!mysql_bin_log.is_open())
+#endif
DBUG_RETURN(0);
mysql_bin_log.remove_pending_rows_event(this, is_transactional);
@@ -5412,7 +5687,11 @@ int THD::binlog_flush_pending_rows_event(bool stmt_end, bool is_transactional)
mode: it might be the case that we left row-based mode before
flushing anything (e.g., if we have explicitly locked tables).
*/
+#ifdef WITH_WSREP
+ if (!(WSREP_EMULATE_BINLOG(this) || mysql_bin_log.is_open()))
+#else
if (!mysql_bin_log.is_open())
+#endif
DBUG_RETURN(0);
/*
@@ -5660,8 +5939,12 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg,
DBUG_ENTER("THD::binlog_query");
DBUG_PRINT("enter", ("qtype: %s query: '%-.*s'",
show_query_type(qtype), (int) query_len, query_arg));
+#ifdef WITH_WSREP
+ DBUG_ASSERT(query_arg && (WSREP_EMULATE_BINLOG(this)
+ || mysql_bin_log.is_open()));
+#else
DBUG_ASSERT(query_arg && mysql_bin_log.is_open());
-
+#endif
/*
If we are not in prelocked mode, mysql_unlock_tables() will be
called after this binlog_query(), so we have to flush the pending
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 5ae777fe817..3427c7ea00d 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -57,6 +57,15 @@ void set_thd_stage_info(void *thd,
#include "my_apc.h"
+#ifdef WITH_WSREP
+#include "wsrep_mysqld.h"
+struct wsrep_thd_shadow {
+ ulonglong options;
+ enum wsrep_exec_mode wsrep_exec_mode;
+ Vio *vio;
+ ulong tx_isolation;
+};
+#endif
class Reprepare_observer;
class Relay_log_info;
class Rpl_filter;
@@ -613,6 +622,11 @@ typedef struct system_variables
ulong wt_timeout_short, wt_deadlock_search_depth_short;
ulong wt_timeout_long, wt_deadlock_search_depth_long;
+#ifdef WITH_WSREP
+ my_bool wsrep_on;
+ my_bool wsrep_causal_reads;
+ ulong wsrep_retry_autocommit;
+#endif
double long_query_time_double;
my_bool pseudo_slave_mode;
@@ -1015,6 +1029,9 @@ struct st_savepoint {
/** State of metadata locks before this savepoint was set. */
MDL_savepoint mdl_savepoint;
};
+#ifdef WITH_WSREP
+void wsrep_cleanup_transaction(THD *thd); // THD.transactions.cleanup calls it
+#endif
enum xa_states {XA_NOTR=0, XA_ACTIVE, XA_IDLE, XA_PREPARED, XA_ROLLBACK_ONLY};
extern const char *xa_state_names[];
@@ -1860,7 +1877,7 @@ public:
int is_current_stmt_binlog_format_row() const {
DBUG_ASSERT(current_stmt_binlog_format == BINLOG_FORMAT_STMT ||
current_stmt_binlog_format == BINLOG_FORMAT_ROW);
- return current_stmt_binlog_format == BINLOG_FORMAT_ROW;
+ return (WSREP_FORMAT((ulong)current_stmt_binlog_format) == BINLOG_FORMAT_ROW);
}
private:
@@ -1918,7 +1935,11 @@ public:
*/
CHANGED_TABLE_LIST* changed_tables;
MEM_ROOT mem_root; // Transaction-life memory allocation pool
+#ifdef WITH_WSREP
+ void cleanup(THD *thd)
+#else
void cleanup()
+#endif
{
DBUG_ENTER("thd::cleanup");
changed_tables= 0;
@@ -1932,6 +1953,11 @@ public:
if (!xid_state.rm_error)
xid_state.xid.null();
free_root(&mem_root,MYF(MY_KEEP_PREALLOC));
+#ifdef WITH_WSREP
+ // Todo: convert into a plugin method
+ // wsrep's post-commit. LOCAL_COMMIT designates wsrep's commit was ok
+ if (WSREP(thd)) wsrep_cleanup_transaction(thd);
+#endif /* WITH_WSREP */
DBUG_VOID_RETURN;
}
my_bool is_active()
@@ -2464,6 +2490,36 @@ public:
query_id_t first_query_id;
} binlog_evt_union;
+#ifdef WITH_WSREP
+ const bool wsrep_applier; /* dedicated slave applier thread */
+ bool wsrep_applier_closing; /* applier marked to close */
+ bool wsrep_client_thread; /* to identify client threads*/
+ enum wsrep_exec_mode wsrep_exec_mode;
+ query_id_t wsrep_last_query_id;
+ enum wsrep_query_state wsrep_query_state;
+ enum wsrep_conflict_state wsrep_conflict_state;
+ mysql_mutex_t LOCK_wsrep_thd;
+ mysql_cond_t COND_wsrep_thd;
+ wsrep_seqno_t wsrep_trx_seqno;
+ uint32 wsrep_rand;
+ Relay_log_info* wsrep_rli;
+ bool wsrep_converted_lock_session;
+ wsrep_trx_handle_t wsrep_trx_handle;
+ bool wsrep_seqno_changed;
+#ifdef WSREP_PROC_INFO
+ char wsrep_info[128]; /* string for dynamic proc info */
+#endif /* WSREP_PROC_INFO */
+ ulong wsrep_retry_counter; // of autocommit
+ bool wsrep_PA_safe;
+ char* wsrep_retry_query;
+ size_t wsrep_retry_query_len;
+ enum enum_server_command wsrep_retry_command;
+ enum wsrep_consistency_check_mode
+ wsrep_consistency_check;
+ wsrep_stats_var* wsrep_status_vars;
+ int wsrep_mysql_replicated;
+ THD* wsrep_bf_thd;
+#endif /* WITH_WSREP */
/**
Internal parser state.
Note that since the parser is not re-entrant, we keep only one parser
@@ -2495,7 +2551,11 @@ public:
/* Debug Sync facility. See debug_sync.cc. */
struct st_debug_sync_control *debug_sync_control;
#endif /* defined(ENABLED_DEBUG_SYNC) */
+#ifdef WITH_WSREP
+ THD(bool is_applier = false);
+#else
THD();
+#endif
~THD();
void init(void);
@@ -2967,7 +3027,7 @@ public:
tests fail and so force them to propagate the
lex->binlog_row_based_if_mixed upwards to the caller.
*/
- if ((variables.binlog_format == BINLOG_FORMAT_MIXED) &&
+ if ((WSREP_FORMAT(variables.binlog_format) == BINLOG_FORMAT_MIXED) &&
(in_sub_stmt == 0))
set_current_stmt_binlog_format_row();
@@ -3019,7 +3079,7 @@ public:
show_system_thread(system_thread)));
if (in_sub_stmt == 0)
{
- if (variables.binlog_format == BINLOG_FORMAT_ROW)
+ if (WSREP_FORMAT(variables.binlog_format) == BINLOG_FORMAT_ROW)
set_current_stmt_binlog_format_row();
else if (temporary_tables == NULL)
set_current_stmt_binlog_format_stmt();
diff --git a/sql/sql_connect.cc b/sql/sql_connect.cc
index c3f2bb0ca11..b3af783c566 100644
--- a/sql/sql_connect.cc
+++ b/sql/sql_connect.cc
@@ -44,6 +44,9 @@ HASH global_index_stats;
extern mysql_mutex_t LOCK_global_user_client_stats;
extern mysql_mutex_t LOCK_global_table_stats;
extern mysql_mutex_t LOCK_global_index_stats;
+#ifdef WITH_WSREP
+#include "wsrep_mysqld.h"
+#endif
/*
Get structure for logging connection data for the current user
@@ -990,7 +993,11 @@ bool setup_connection_thread_globals(THD *thd)
{
if (thd->store_globals())
{
+#ifdef WITH_WSREP
+ close_connection(thd, ER_OUT_OF_RESOURCES, 1);
+#else
close_connection(thd, ER_OUT_OF_RESOURCES);
+#endif
statistic_increment(aborted_connects,&LOCK_status);
MYSQL_CALLBACK(thd->scheduler, end_thread, (thd, 0));
return 1; // Error
@@ -1063,6 +1070,17 @@ bool login_connection(THD *thd)
void end_connection(THD *thd)
{
NET *net= &thd->net;
+#ifdef WITH_WSREP
+ if (WSREP(thd))
+ {
+ wsrep_status_t rcode= wsrep->free_connection(wsrep, thd->thread_id);
+ if (rcode) {
+ WSREP_WARN("wsrep failed to free connection context: %lu, code: %d",
+ thd->thread_id, rcode);
+ }
+ }
+ thd->wsrep_client_thread= 0;
+#endif
plugin_thdvar_cleanup(thd);
if (thd->user_connect)
@@ -1195,6 +1213,9 @@ bool thd_prepare_connection(THD *thd)
(char *) thd->security_ctx->host_or_ip);
prepare_new_connection_state(thd);
+#ifdef WITH_WSREP
+ thd->wsrep_client_thread= 1;
+#endif /* WITH_WSREP */
return FALSE;
}
@@ -1218,7 +1239,11 @@ void do_handle_one_connection(THD *thd_arg)
if (MYSQL_CALLBACK_ELSE(thd->scheduler, init_new_connection_thread, (), 0))
{
+#ifdef WITH_WSREP
+ close_connection(thd, ER_OUT_OF_RESOURCES, 1);
+#else
close_connection(thd, ER_OUT_OF_RESOURCES);
+#endif
statistic_increment(aborted_connects,&LOCK_status);
MYSQL_CALLBACK(thd->scheduler, end_thread, (thd, 0));
return;
@@ -1268,9 +1293,21 @@ void do_handle_one_connection(THD *thd_arg)
break;
}
end_connection(thd);
-
+
+#ifdef WITH_WSREP
+ if (WSREP(thd))
+ {
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ thd->wsrep_query_state= QUERY_EXITING;
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+ }
+#endif
end_thread:
+#ifdef WITH_WSREP
+ close_connection(thd, 0, 1);
+#else
close_connection(thd);
+#endif
if (thd->userstat_running)
update_global_user_stats(thd, create_user, time(NULL));
diff --git a/sql/sql_delete.cc b/sql/sql_delete.cc
index d0a83eac189..1fed2a37e15 100644
--- a/sql/sql_delete.cc
+++ b/sql/sql_delete.cc
@@ -424,7 +424,11 @@ cleanup:
/* See similar binlogging code in sql_update.cc, for comments */
if ((error < 0) || thd->transaction.stmt.modified_non_trans_table)
{
+#ifdef WITH_WSREP
+ if ((WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()))
+#else
if (mysql_bin_log.is_open())
+#endif
{
int errcode= 0;
if (error < 0)
@@ -875,7 +879,11 @@ void multi_delete::abort_result_set()
/*
there is only side effects; to binlog with the error
*/
+#ifdef WITH_WSREP
+ if (WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open())
+#else
if (mysql_bin_log.is_open())
+#endif
{
int errcode= query_error_code(thd, thd->killed == NOT_KILLED);
/* possible error of writing binary log is ignored deliberately */
@@ -1051,7 +1059,11 @@ bool multi_delete::send_eof()
}
if ((local_error == 0) || thd->transaction.stmt.modified_non_trans_table)
{
+#ifdef WITH_WSREP
+ if (WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open())
+#else
if (mysql_bin_log.is_open())
+#endif
{
int errcode= 0;
if (local_error == 0)
diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc
index 4f025c7c335..34b4385ce94 100644
--- a/sql/sql_insert.cc
+++ b/sql/sql_insert.cc
@@ -1017,7 +1017,11 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
thd->transaction.stmt.modified_non_trans_table ||
was_insert_delayed)
{
+#ifdef WITH_WSREP
+ if (WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open())
+#else
if (mysql_bin_log.is_open())
+#endif
{
int errcode= 0;
if (error <= 0)
@@ -3641,8 +3645,15 @@ bool select_insert::send_eof()
DBUG_PRINT("enter", ("trans_table=%d, table_type='%s'",
trans_table, table->file->table_type()));
+#ifdef WITH_WSREP
+ error= (thd->wsrep_conflict_state == MUST_ABORT ||
+ thd->wsrep_conflict_state == CERT_FAILURE) ? -1 :
+ (thd->locked_tables_mode <= LTM_LOCK_TABLES ?
+ table->file->ha_end_bulk_insert() : 0);
+#else
error= (thd->locked_tables_mode <= LTM_LOCK_TABLES ?
table->file->ha_end_bulk_insert() : 0);
+#endif /* WITH_WSREP */
if (!error && thd->is_error())
error= thd->stmt_da->sql_errno();
@@ -3670,8 +3681,13 @@ bool select_insert::send_eof()
events are in the transaction cache and will be written when
ha_autocommit_or_rollback() is issued below.
*/
+#ifdef WITH_WSREP
+ if ((WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()) &&
+ (!error || thd->transaction.stmt.modified_non_trans_table))
+#else
if (mysql_bin_log.is_open() &&
(!error || thd->transaction.stmt.modified_non_trans_table))
+#endif
{
int errcode= 0;
if (!error)
@@ -3754,7 +3770,11 @@ void select_insert::abort_result_set() {
if (!can_rollback_data())
thd->transaction.all.modified_non_trans_table= TRUE;
+#ifdef WITH_WSREP
+ if (WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open())
+#else
if (mysql_bin_log.is_open())
+#endif
{
int errcode= query_error_code(thd, thd->killed == NOT_KILLED);
/* error of writing binary log is ignored */
@@ -4141,7 +4161,11 @@ select_create::binlog_show_create_table(TABLE **tables, uint count)
/* show_database */ TRUE);
DBUG_ASSERT(result == 0); /* store_create_info() always return 0 */
+#ifdef WITH_WSREP
+ if (WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open())
+#else
if (mysql_bin_log.is_open())
+#endif /* WITH_WSREP */
{
int errcode= query_error_code(thd, thd->killed == NOT_KILLED);
result= thd->binlog_query(THD::STMT_QUERY_TYPE,
@@ -4151,6 +4175,9 @@ select_create::binlog_show_create_table(TABLE **tables, uint count)
/* suppress_use */ FALSE,
errcode);
}
+#ifdef WITH_WSREP
+ ha_wsrep_fake_trx_id(thd);
+#endif
return result;
}
@@ -4207,6 +4234,18 @@ bool select_create::send_eof()
{
trans_commit_stmt(thd);
trans_commit_implicit(thd);
+#ifdef WITH_WSREP
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ if (thd->wsrep_conflict_state != NO_CONFLICT)
+ {
+ WSREP_DEBUG("select_create commit failed, thd: %lu err: %d %s",
+ thd->thread_id, thd->wsrep_conflict_state, thd->query());
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+ abort_result_set();
+ return TRUE;
+ }
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+#endif /* WITH_WSREP */
}
table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
diff --git a/sql/sql_lex.cc b/sql/sql_lex.cc
index fbe5bcd57f2..1ba132b534b 100644
--- a/sql/sql_lex.cc
+++ b/sql/sql_lex.cc
@@ -1574,6 +1574,17 @@ int lex_one_token(void *arg, void *yythd)
}
else
{
+#ifdef WITH_WSREP
+ if (version == 99997 && thd->wsrep_exec_mode == LOCAL_STATE)
+ {
+ WSREP_DEBUG("consistency check: %s", thd->query());
+ thd->wsrep_consistency_check= CONSISTENCY_CHECK_DECLARED;
+ lip->yySkipn(5);
+ lip->set_echo(TRUE);
+ state=MY_LEX_START;
+ break; /* Do not treat contents as a comment. */
+ }
+#endif /* WITH_WSREP */
/*
Patch and skip the conditional comment to avoid it
being propagated infinitely (eg. to a slave).
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index ea0a19fc5f3..888aebd52b2 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -104,6 +104,16 @@
#include "../storage/maria/ha_maria.h"
#endif
+#ifdef WITH_WSREP
+#include "wsrep_mysqld.h"
+#include "rpl_rli.h"
+static void wsrep_client_rollback(THD *thd);
+
+extern Format_description_log_event *wsrep_format_desc;
+
+static void wsrep_mysql_parse(THD *thd, char *rawbuf, uint length,
+ Parser_state *parser_state);
+#endif /* WITH_WSREP */
/**
@defgroup Runtime_Environment Runtime Environment
@{
@@ -596,6 +606,13 @@ bool is_log_table_write_query(enum enum_sql_command command)
return (sql_command_flags[command] & CF_WRITE_LOGS_COMMAND) != 0;
}
+#ifdef WITH_WSREP
+bool is_show_query(enum enum_sql_command command)
+{
+ DBUG_ASSERT(command >= 0 && command <= SQLCOM_END);
+ return (sql_command_flags[command] & CF_STATUS_COMMAND) != 0;
+}
+#endif
void execute_init_command(THD *thd, LEX_STRING *init_command,
mysql_rwlock_t *var_lock)
{
@@ -792,8 +809,12 @@ void do_handle_bootstrap(THD *thd)
if (my_thread_init() || thd->store_globals())
{
#ifndef EMBEDDED_LIBRARY
+#ifdef WITH_WSREP
+ close_connection(thd, ER_OUT_OF_RESOURCES, 1);
+#else
close_connection(thd, ER_OUT_OF_RESOURCES);
#endif
+#endif
thd->fatal_error();
goto end;
}
@@ -866,7 +887,18 @@ bool do_command(THD *thd)
NET *net= &thd->net;
enum enum_server_command command;
DBUG_ENTER("do_command");
-
+#ifdef WITH_WSREP
+ if (WSREP(thd))
+ {
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ thd->wsrep_query_state= QUERY_IDLE;
+ if (thd->wsrep_conflict_state==MUST_ABORT)
+ {
+ wsrep_client_rollback(thd);
+ }
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+ }
+#endif
/*
indicator of uninitialized lex => normal flow of errors handling
(see my_message_sql)
@@ -915,6 +947,30 @@ bool do_command(THD *thd)
thd->m_server_idle= TRUE;
packet_length= my_net_read(net);
thd->m_server_idle= FALSE;
+#ifdef WITH_WSREP
+ if (WSREP(thd)) {
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+
+ /* these THD's are aborted or are aborting during being idle */
+ if (thd->wsrep_conflict_state == ABORTING)
+ {
+ while (thd->wsrep_conflict_state == ABORTING) {
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+ my_sleep(1000);
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ }
+ thd->store_globals();
+ }
+ else if (thd->wsrep_conflict_state == ABORTED)
+ {
+ thd->store_globals();
+ thd->wsrep_bf_thd = NULL;
+ }
+
+ thd->wsrep_query_state= QUERY_EXEC;
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+ }
+#endif /* WITH_WSREP */
if (packet_length == packet_error)
{
@@ -922,6 +978,17 @@ bool do_command(THD *thd)
net->error,
vio_description(net->vio)));
+#ifdef WITH_WSREP
+ if (WSREP(thd)) {
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ if (thd->wsrep_conflict_state == MUST_ABORT)
+ {
+ DBUG_PRINT("wsrep",("aborted for wsrep rollback: %lu", thd->real_id));
+ wsrep_client_rollback(thd);
+ }
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+ }
+#endif
/* Instrument this broken statement as "statement/com/error" */
thd->m_statement_psi= MYSQL_REFINE_STATEMENT(thd->m_statement_psi,
com_statement_info[COM_END].
@@ -976,12 +1043,54 @@ bool do_command(THD *thd)
vio_description(net->vio), command,
command_name[command].str));
+#ifdef WITH_WSREP
+ if (WSREP(thd)) {
+ /*
+ * bail out if DB snapshot has not been installed. We however,
+ * allow queries "SET" and "SHOW", they are trapped later in execute_command
+ */
+ if (thd->variables.wsrep_on && !thd->wsrep_applier && !wsrep_ready &&
+ command != COM_QUERY &&
+ command != COM_PING &&
+ command != COM_QUIT &&
+ command != COM_PROCESS_INFO &&
+ command != COM_PROCESS_KILL &&
+ command != COM_SET_OPTION &&
+ command != COM_SHUTDOWN &&
+ command != COM_SLEEP &&
+ command != COM_STATISTICS &&
+ command != COM_TIME &&
+ command != COM_END
+ ) {
+ my_error(ER_UNKNOWN_COM_ERROR, MYF(0),
+ "WSREP has not yet prepared node for application use");
+ thd->protocol->end_statement();
+ return_value= FALSE;
+ goto out;
+ }
+ }
+#endif
/* Restore read timeout value */
my_net_set_read_timeout(net, thd->variables.net_read_timeout);
DBUG_ASSERT(packet_length);
return_value= dispatch_command(command, thd, packet+1, (uint) (packet_length-1));
-
+#ifdef WITH_WSREP
+ if (WSREP(thd)) {
+ while (thd->wsrep_conflict_state== RETRY_AUTOCOMMIT)
+ {
+ return_value= dispatch_command(command, thd, thd->wsrep_retry_query,
+ thd->wsrep_retry_query_len);
+ }
+ }
+ if (thd->wsrep_retry_query && thd->wsrep_conflict_state != REPLAYING)
+ {
+ my_free(thd->wsrep_retry_query);
+ thd->wsrep_retry_query = NULL;
+ thd->wsrep_retry_query_len = 0;
+ thd->wsrep_retry_command = COM_CONNECT;
+ }
+#endif
out:
/* The statement instrumentation must be closed in all cases. */
DBUG_ASSERT(thd->m_statement_psi == NULL);
@@ -1057,6 +1166,33 @@ static my_bool deny_updates_if_read_only_option(THD *thd,
DBUG_RETURN(FALSE);
}
+#ifdef WITH_WSREP
+static my_bool wsrep_read_only_option(THD *thd, TABLE_LIST *all_tables)
+{
+ int opt_readonly_saved = opt_readonly;
+ ulong flag_saved = (ulong)(thd->security_ctx->master_access & SUPER_ACL);
+
+ opt_readonly = 0;
+ thd->security_ctx->master_access &= ~SUPER_ACL;
+
+ my_bool ret = !deny_updates_if_read_only_option(thd, all_tables);
+
+ opt_readonly = opt_readonly_saved;
+ thd->security_ctx->master_access |= flag_saved;
+
+ return ret;
+}
+
+static void wsrep_copy_query(THD *thd)
+{
+ thd->wsrep_retry_command = thd->get_command();
+ thd->wsrep_retry_query_len = thd->query_length();
+ thd->wsrep_retry_query = (char *)my_malloc(
+ thd->wsrep_retry_query_len + 1, MYF(0));
+ strncpy(thd->wsrep_retry_query, thd->query(), thd->wsrep_retry_query_len);
+ thd->wsrep_retry_query[thd->wsrep_retry_query_len] = '\0';
+}
+#endif /* WITH_WSREP */
/**
Perform one connection-level (COM_XXXX) command.
@@ -1086,6 +1222,43 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
DBUG_ENTER("dispatch_command");
DBUG_PRINT("info", ("command: %d", command));
+#ifdef WITH_WSREP
+ if (WSREP(thd)) {
+ if (!thd->in_multi_stmt_transaction_mode())
+ {
+ thd->wsrep_PA_safe= true;
+ }
+
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ thd->wsrep_query_state= QUERY_EXEC;
+ if (thd->wsrep_conflict_state== RETRY_AUTOCOMMIT)
+ {
+ thd->wsrep_conflict_state= NO_CONFLICT;
+ }
+ if (thd->wsrep_conflict_state== MUST_ABORT)
+ {
+ wsrep_client_rollback(thd);
+ }
+ if (thd->wsrep_conflict_state== ABORTED)
+ {
+ my_error(ER_LOCK_DEADLOCK, MYF(0), "wsrep aborted transaction");
+ WSREP_DEBUG("Deadlock error for: %s", thd->query());
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+ thd->killed = NOT_KILLED;
+ thd->mysys_var->abort = 0;
+ thd->wsrep_conflict_state = NO_CONFLICT;
+ thd->wsrep_retry_counter = 0;
+ thd->wsrep_bf_thd = NULL;
+ /*
+ Increment threads running to compensate dec_thread_running() called
+ after dispatch_end label.
+ */
+ inc_thread_running();
+ goto dispatch_end;
+ }
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+ }
+#endif /* WITH_WSREP */
#if defined(ENABLED_PROFILING)
thd->profiling.start_new_query();
#endif
@@ -1271,7 +1444,11 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
if (parser_state.init(thd, thd->query(), thd->query_length()))
break;
+#ifdef WITH_WSREP
+ wsrep_mysql_parse(thd, thd->query(), thd->query_length(), &parser_state);
+#else
mysql_parse(thd, thd->query(), thd->query_length(), &parser_state);
+#endif
while (!thd->killed && (parser_state.m_lip.found_semicolon != NULL) &&
! thd->is_error())
@@ -1337,10 +1514,19 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
Count each statement from the client.
*/
statistic_increment(thd->status_var.questions, &LOCK_status);
+#ifdef WITH_WSREP
+ if (!WSREP(thd))
+ thd->set_time(); /* Reset the query start time. */
+#else
thd->set_time(); /* Reset the query start time. */
+#endif
parser_state.reset(beginning_of_next_stmt, length);
/* TODO: set thd->lex->sql_command to SQLCOM_END here */
+#ifdef WITH_WSREP
+ wsrep_mysql_parse(thd, beginning_of_next_stmt, length, &parser_state);
+#else
mysql_parse(thd, beginning_of_next_stmt, length, &parser_state);
+#endif
}
DBUG_PRINT("info",("query ready"));
@@ -1659,6 +1845,23 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
my_message(ER_UNKNOWN_COM_ERROR, ER(ER_UNKNOWN_COM_ERROR), MYF(0));
break;
}
+#ifdef WITH_WSREP
+ dispatch_end:
+
+ if (WSREP(thd)) {
+ /* wsrep BF abort in query exec phase */
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ if ((thd->wsrep_conflict_state != REPLAYING) &&
+ (thd->wsrep_conflict_state != RETRY_AUTOCOMMIT))
+ {
+ thd->update_server_status();
+ thd->protocol->end_statement();
+ query_cache_end_of_result(thd);
+ }
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+
+ } else { /* if (WSREP(thd))... */
+#endif /* WITH_WSREP */
DBUG_ASSERT(thd->derived_tables == NULL &&
(thd->open_tables == NULL ||
(thd->locked_tables_mode == LTM_LOCK_TABLES)));
@@ -1668,6 +1871,9 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
thd->update_server_status();
thd->protocol->end_statement();
query_cache_end_of_result(thd);
+#ifdef WITH_WSREP
+ }
+#endif /* WITH_WSREP */
if (!thd->is_error() && !thd->killed_errno())
mysql_audit_general(thd, MYSQL_AUDIT_GENERAL_RESULT, 0, 0);
@@ -2332,7 +2538,66 @@ mysql_execute_command(THD *thd)
#ifdef HAVE_REPLICATION
} /* endif unlikely slave */
#endif
+#ifdef WITH_WSREP
+ if (WSREP(thd)) {
+ /*
+ change LOCK TABLE WRITE to transaction
+ */
+ if (lex->sql_command== SQLCOM_LOCK_TABLES && wsrep_convert_LOCK_to_trx)
+ {
+ for (TABLE_LIST *table= all_tables; table; table= table->next_global)
+ {
+ if (table->lock_type >= TL_WRITE_ALLOW_WRITE)
+ {
+ lex->sql_command= SQLCOM_BEGIN;
+ thd->wsrep_converted_lock_session= true;
+ break;
+ }
+ }
+ }
+ if (lex->sql_command== SQLCOM_UNLOCK_TABLES &&
+ thd->wsrep_converted_lock_session)
+ {
+ thd->wsrep_converted_lock_session= false;
+ lex->sql_command= SQLCOM_COMMIT;
+ lex->tx_release= TVL_NO;
+ }
+ /*
+ * bail out if DB snapshot has not been installed. We however,
+ * allow SET and SHOW queries
+ */
+ if (thd->variables.wsrep_on && !thd->wsrep_applier && !wsrep_ready &&
+ lex->sql_command != SQLCOM_SET_OPTION &&
+ !is_show_query(lex->sql_command))
+ {
+#if DIRTY_HACK
+ /* Dirty hack for lp:1002714 - trying to recognize mysqldump connection
+ * and allow it to continue. Actuall mysqldump_magic_str may be longer
+ * and is obviously version dependent and may be issued by any client
+ * connection after which connection becomes non-replicating. */
+ static char const mysqldump_magic_str[]=
+"SELECT LOGFILE_GROUP_NAME, FILE_NAME, TOTAL_EXTENTS, INITIAL_SIZE, ENGINE, EXTRA FROM INFORMATION_SCHEMA.FILES WHERE FILE_TYPE = 'UNDO LOG' AND FILE_NAME IS NOT NULL";
+ static const size_t mysqldump_magic_str_len= sizeof(mysqldump_magic_str) -1;
+ if (SQLCOM_SELECT != lex->sql_command ||
+ thd->query_length() < mysqldump_magic_str_len ||
+ strncmp(thd->query(), mysqldump_magic_str, mysqldump_magic_str_len))
+ {
+#endif /* DIRTY_HACK */
+ my_error(ER_UNKNOWN_COM_ERROR, MYF(0),
+ "WSREP has not yet prepared node for application use");
+ goto error;
+#if DIRTY_HACK
+ }
+ else
+ {
+ /* mysqldump connection, allow all further queries to pass */
+ thd->variables.wsrep_on= FALSE;
+ }
+#endif /* DIRTY_HACK */
+ }
+ }
+#endif /* WITH_WSREP */
status_var_increment(thd->status_var.com_stat[lex->sql_command]);
thd->progress.report_to_client= test(sql_command_flags[lex->sql_command] &
CF_REPORT_PROGRESS);
@@ -2404,6 +2669,9 @@ mysql_execute_command(THD *thd)
#endif
case SQLCOM_SHOW_STATUS_PROC:
case SQLCOM_SHOW_STATUS_FUNC:
+#ifdef WITH_WSREP
+ if (WSREP_CLIENT(thd) && wsrep_causal_wait(thd)) goto error;
+#endif /* WITH_WSREP */
if ((res= check_table_access(thd, SELECT_ACL, all_tables, FALSE,
UINT_MAX, FALSE)))
goto error;
@@ -2449,17 +2717,27 @@ mysql_execute_command(THD *thd)
case SQLCOM_SHOW_PLUGINS:
case SQLCOM_SHOW_FIELDS:
case SQLCOM_SHOW_KEYS:
+#ifndef WITH_WSREP
case SQLCOM_SHOW_VARIABLES:
case SQLCOM_SHOW_CHARSETS:
case SQLCOM_SHOW_COLLATIONS:
case SQLCOM_SHOW_STORAGE_ENGINES:
case SQLCOM_SHOW_PROFILE:
+#endif /* WITH_WSREP */
case SQLCOM_SHOW_CLIENT_STATS:
case SQLCOM_SHOW_USER_STATS:
case SQLCOM_SHOW_TABLE_STATS:
case SQLCOM_SHOW_INDEX_STATS:
case SQLCOM_SELECT:
- {
+#ifdef WITH_WSREP
+ if (WSREP_CLIENT(thd) && wsrep_causal_wait(thd)) goto error;
+ case SQLCOM_SHOW_VARIABLES:
+ case SQLCOM_SHOW_CHARSETS:
+ case SQLCOM_SHOW_COLLATIONS:
+ case SQLCOM_SHOW_STORAGE_ENGINES:
+ case SQLCOM_SHOW_PROFILE:
+#endif /* WITH_WSREP */
+ {
thd->status_var.last_query_cost= 0.0;
/*
@@ -2815,7 +3093,7 @@ case SQLCOM_PREPARE:
*/
if (thd->query_name_consts &&
mysql_bin_log.is_open() &&
- thd->variables.binlog_format == BINLOG_FORMAT_STMT &&
+ WSREP_FORMAT(thd->variables.binlog_format) == BINLOG_FORMAT_STMT &&
!mysql_bin_log.is_query_in_union(thd, thd->query_id))
{
List_iterator_fast<Item> it(select_lex->item_list);
@@ -2926,6 +3204,12 @@ case SQLCOM_PREPARE:
if (create_info.options & HA_LEX_CREATE_TMP_TABLE)
thd->variables.option_bits|= OPTION_KEEP_LOG;
/* regular create */
+#ifdef WITH_WSREP
+ if (!thd->is_current_stmt_binlog_format_row() ||
+ !(create_info.options & HA_LEX_CREATE_TMP_TABLE))
+ WSREP_TO_ISOLATION_BEGIN(create_table->db, create_table->table_name,
+ NULL)
+#endif /* WITH_WSREP */
if (create_info.options & HA_LEX_CREATE_TABLE_LIKE)
{
/* CREATE TABLE ... LIKE ... */
@@ -2967,6 +3251,7 @@ end_with_restore_list:
DBUG_ASSERT(first_table == all_tables && first_table != 0);
if (check_one_table_access(thd, INDEX_ACL, all_tables))
goto error; /* purecov: inspected */
+ WSREP_TO_ISOLATION_BEGIN(first_table->db, first_table->table_name, NULL)
/*
Currently CREATE INDEX or DROP INDEX cause a full table rebuild
and thus classify as slow administrative statements just like
@@ -3063,6 +3348,7 @@ end_with_restore_list:
#endif /* HAVE_REPLICATION */
case SQLCOM_RENAME_TABLE:
{
+ WSREP_TO_ISOLATION_BEGIN(0, 0, first_table)
if (execute_rename_table(thd, first_table, all_tables))
goto error;
break;
@@ -3090,6 +3376,10 @@ end_with_restore_list:
goto error;
#else
{
+#ifdef WITH_WSREP
+ if (WSREP_CLIENT(thd) && wsrep_causal_wait(thd)) goto error;
+#endif /* WITH_WSREP */
+
/*
Access check:
SHOW CREATE TABLE require any privileges on the table level (ie
@@ -3145,6 +3435,10 @@ end_with_restore_list:
case SQLCOM_CHECKSUM:
{
DBUG_ASSERT(first_table == all_tables && first_table != 0);
+#ifdef WITH_WSREP
+ if (WSREP_CLIENT(thd) && wsrep_causal_wait(thd)) goto error;
+#endif /* WITH_WSREP */
+
if (check_table_access(thd, SELECT_ACL, all_tables,
FALSE, UINT_MAX, FALSE))
goto error; /* purecov: inspected */
@@ -3342,6 +3636,15 @@ end_with_restore_list:
DBUG_ASSERT(first_table == all_tables && first_table != 0);
if ((res= insert_precheck(thd, all_tables)))
break;
+#ifdef WITH_WSREP
+ if (lex->sql_command == SQLCOM_INSERT_SELECT &&
+ thd->wsrep_consistency_check == CONSISTENCY_CHECK_DECLARED)
+ {
+ thd->wsrep_consistency_check = CONSISTENCY_CHECK_RUNNING;
+ WSREP_TO_ISOLATION_BEGIN(first_table->db, first_table->table_name, NULL);
+ }
+
+#endif
/*
INSERT...SELECT...ON DUPLICATE KEY UPDATE/REPLACE SELECT/
INSERT...IGNORE...SELECT can be unsafe, unless ORDER BY PRIMARY KEY
@@ -3505,6 +3808,18 @@ end_with_restore_list:
/* So that DROP TEMPORARY TABLE gets to binlog at commit/rollback */
thd->variables.option_bits|= OPTION_KEEP_LOG;
}
+#ifdef WITH_WSREP
+ for (TABLE_LIST *table= all_tables; table; table= table->next_global)
+ {
+ if (!lex->drop_temporary &&
+ (!thd->is_current_stmt_binlog_format_row() ||
+ !find_temporary_table(thd, table)))
+ {
+ WSREP_TO_ISOLATION_BEGIN(NULL, NULL, all_tables);
+ break;
+ }
+ }
+#endif /* WITH_WSREP */
/* DDL and binlog write order are protected by metadata locks. */
res= mysql_rm_table(thd, first_table, lex->check_exists,
lex->drop_temporary);
@@ -3548,7 +3863,6 @@ end_with_restore_list:
if (!mysql_change_db(thd, &db_str, FALSE))
my_ok(thd);
-
break;
}
@@ -3691,6 +4005,7 @@ end_with_restore_list:
#endif
if (check_access(thd, CREATE_ACL, lex->name.str, NULL, NULL, 1, 0))
break;
+ WSREP_TO_ISOLATION_BEGIN(lex->name.str, NULL, NULL)
res= mysql_create_db(thd,(lower_case_table_names == 2 ? alias :
lex->name.str), &create_info, 0);
break;
@@ -3720,6 +4035,7 @@ end_with_restore_list:
#endif
if (check_access(thd, DROP_ACL, lex->name.str, NULL, NULL, 1, 0))
break;
+ WSREP_TO_ISOLATION_BEGIN(lex->name.str, NULL, NULL)
res= mysql_rm_db(thd, lex->name.str, lex->check_exists, 0);
break;
}
@@ -3748,6 +4064,7 @@ end_with_restore_list:
res= 1;
break;
}
+ WSREP_TO_ISOLATION_BEGIN(db->str, NULL, NULL)
res= mysql_upgrade_db(thd, db);
if (!res)
my_ok(thd);
@@ -3780,6 +4097,7 @@ end_with_restore_list:
#endif
if (check_access(thd, ALTER_ACL, db->str, NULL, NULL, 1, 0))
break;
+ WSREP_TO_ISOLATION_BEGIN(db->str, NULL, NULL)
res= mysql_alter_db(thd, db->str, &create_info);
break;
}
@@ -3812,6 +4130,7 @@ end_with_restore_list:
if (res)
break;
+ WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL)
switch (lex->sql_command) {
case SQLCOM_CREATE_EVENT:
{
@@ -3846,6 +4165,7 @@ end_with_restore_list:
lex->spname->m_name);
break;
case SQLCOM_DROP_EVENT:
+ WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL)
if (!(res= Events::drop_event(thd,
lex->spname->m_db, lex->spname->m_name,
lex->check_exists)))
@@ -3860,6 +4180,7 @@ end_with_restore_list:
if (check_access(thd, INSERT_ACL, "mysql", NULL, NULL, 1, 0))
break;
#ifdef HAVE_DLOPEN
+ WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL)
if (!(res = mysql_create_function(thd, &lex->udf)))
my_ok(thd);
#else
@@ -3874,6 +4195,7 @@ end_with_restore_list:
if (check_access(thd, INSERT_ACL, "mysql", NULL, NULL, 1, 1) &&
check_global_access(thd,CREATE_USER_ACL))
break;
+ WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL)
/* Conditionally writes to binlog */
if (!(res= mysql_create_user(thd, lex->users_list)))
my_ok(thd);
@@ -3885,6 +4207,7 @@ end_with_restore_list:
check_global_access(thd,CREATE_USER_ACL))
break;
/* Conditionally writes to binlog */
+ WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL)
if (!(res= mysql_drop_user(thd, lex->users_list)))
my_ok(thd);
break;
@@ -3895,6 +4218,7 @@ end_with_restore_list:
check_global_access(thd,CREATE_USER_ACL))
break;
/* Conditionally writes to binlog */
+ WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL)
if (!(res= mysql_rename_user(thd, lex->users_list)))
my_ok(thd);
break;
@@ -3909,6 +4233,7 @@ end_with_restore_list:
thd->binlog_invoker();
/* Conditionally writes to binlog */
+ WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL)
if (!(res = mysql_revoke_all(thd, lex->users_list)))
my_ok(thd);
break;
@@ -3975,6 +4300,7 @@ end_with_restore_list:
lex->type == TYPE_ENUM_PROCEDURE, 0))
goto error;
/* Conditionally writes to binlog */
+ WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL)
res= mysql_routine_grant(thd, all_tables,
lex->type == TYPE_ENUM_PROCEDURE,
lex->users_list, grants,
@@ -3988,6 +4314,7 @@ end_with_restore_list:
all_tables, FALSE, UINT_MAX, FALSE))
goto error;
/* Conditionally writes to binlog */
+ WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL)
res= mysql_table_grant(thd, all_tables, lex->users_list,
lex->columns, lex->grant,
lex->sql_command == SQLCOM_REVOKE);
@@ -4003,6 +4330,7 @@ end_with_restore_list:
}
else
{
+ WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL)
/* Conditionally writes to binlog */
res = mysql_grant(thd, select_lex->db, lex->users_list, lex->grant,
lex->sql_command == SQLCOM_REVOKE,
@@ -4141,9 +4469,17 @@ end_with_restore_list:
able to open it (with SQLCOM_HA_OPEN) in the first place.
*/
unit->set_limit(select_lex);
+#ifdef WITH_WSREP
+ { char* tmp_info= NULL;
+ if (WSREP(thd)) tmp_info = (char *)thd_proc_info(thd, "mysql_ha_read()");
+#endif /* WITH_WSREP */
res= mysql_ha_read(thd, first_table, lex->ha_read_mode, lex->ident.str,
lex->insert_list, lex->ha_rkey_mode, select_lex->where,
unit->select_limit_cnt, unit->offset_limit_cnt);
+#ifdef WITH_WSREP
+ if (WSREP(thd)) thd_proc_info(thd, tmp_info);
+ }
+#endif /* WITH_WSREP */
break;
case SQLCOM_BEGIN:
@@ -4213,8 +4549,20 @@ end_with_restore_list:
/* Disconnect the current client connection. */
if (tx_release)
thd->killed= KILL_CONNECTION;
- my_ok(thd);
- break;
+ #ifdef WITH_WSREP
+ if (WSREP(thd)) {
+ if (thd->wsrep_conflict_state == NO_CONFLICT ||
+ thd->wsrep_conflict_state == REPLAYING)
+ {
+ my_ok(thd);
+ }
+ } else {
+#endif /* WITH_WSREP */
+ my_ok(thd);
+ #ifdef WITH_WSREP
+ }
+#endif /* WITH_WSREP */
+ break;
}
case SQLCOM_RELEASE_SAVEPOINT:
if (trans_release_savepoint(thd, lex->ident))
@@ -4282,6 +4630,7 @@ end_with_restore_list:
if (sp_process_definer(thd))
goto create_sp_error;
+ WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL)
res= (sp_result= sp_create_routine(thd, lex->sphead->m_type, lex->sphead));
switch (sp_result) {
case SP_OK: {
@@ -4493,6 +4842,7 @@ create_sp_error:
already puts on CREATE FUNCTION.
*/
/* Conditionally writes to binlog */
+ WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL)
sp_result= sp_update_routine(thd, type, lex->spname, &lex->sp_chistics);
switch (sp_result)
{
@@ -4564,6 +4914,7 @@ create_sp_error:
if (check_routine_access(thd, ALTER_PROC_ACL, db, name,
lex->sql_command == SQLCOM_DROP_PROCEDURE, 0))
goto error;
+ WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL)
/* Conditionally writes to binlog */
sp_result= sp_drop_routine(thd, type, lex->spname);
@@ -4681,6 +5032,7 @@ create_sp_error:
Note: SQLCOM_CREATE_VIEW also handles 'ALTER VIEW' commands
as specified through the thd->lex->create_view_mode flag.
*/
+ WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL)
res= mysql_create_view(thd, first_table, thd->lex->create_view_mode);
break;
}
@@ -4689,12 +5041,14 @@ create_sp_error:
if (check_table_access(thd, DROP_ACL, all_tables, FALSE, UINT_MAX, FALSE))
goto error;
/* Conditionally writes to binlog. */
+ WSREP_TO_ISOLATION_BEGIN(NULL, NULL, NULL)
res= mysql_drop_view(thd, first_table, thd->lex->drop_mode);
break;
}
case SQLCOM_CREATE_TRIGGER:
{
/* Conditionally writes to binlog. */
+ WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL)
res= mysql_create_or_drop_trigger(thd, all_tables, 1);
break;
@@ -4702,6 +5056,7 @@ create_sp_error:
case SQLCOM_DROP_TRIGGER:
{
/* Conditionally writes to binlog. */
+ WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL)
res= mysql_create_or_drop_trigger(thd, all_tables, 0);
break;
}
@@ -4754,11 +5109,13 @@ create_sp_error:
my_ok(thd);
break;
case SQLCOM_INSTALL_PLUGIN:
+ WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL)
if (! (res= mysql_install_plugin(thd, &thd->lex->comment,
&thd->lex->ident)))
my_ok(thd);
break;
case SQLCOM_UNINSTALL_PLUGIN:
+ WSREP_TO_ISOLATION_BEGIN(WSREP_MYSQL_DB, NULL, NULL)
if (! (res= mysql_uninstall_plugin(thd, &thd->lex->comment,
&thd->lex->ident)))
my_ok(thd);
@@ -4919,6 +5276,9 @@ finish:
/* Free tables */
THD_STAGE_INFO(thd, stage_closing_tables);
close_thread_tables(thd);
+#ifdef WITH_WSREP
+ thd->wsrep_consistency_check= NO_CONSISTENCY_CHECK;
+#endif /* WITH_WSREP */
#ifndef DBUG_OFF
if (lex->sql_command != SQLCOM_SET_OPTION && ! thd->in_sub_stmt)
@@ -4959,6 +5319,7 @@ finish:
{
thd->mdl_context.release_statement_locks();
}
+ WSREP_TO_ISOLATION_END
DBUG_RETURN(res || thd->is_error());
}
@@ -5027,6 +5388,9 @@ static bool execute_sqlcom_select(THD *thd, TABLE_LIST *all_tables)
if (!thd->get_sent_row_count())
status_var_increment(thd->status_var.empty_queries);
status_var_add(thd->status_var.rows_sent, thd->get_sent_row_count());
+#ifdef WITH_WSREP
+ if (lex->sql_command == SQLCOM_SHOW_STATUS) wsrep_free_status(thd);
+#endif /* WITH_WSREP */
return res;
}
@@ -5847,6 +6211,21 @@ void THD::reset_for_next_command(bool calculate_userstat)
thd->auto_inc_intervals_in_cur_stmt_for_binlog.empty();
thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt= 0;
+#ifdef WITH_WSREP
+ if (WSREP(thd)) {
+ if (wsrep_auto_increment_control)
+ {
+ if (thd->variables.auto_increment_offset !=
+ global_system_variables.auto_increment_offset)
+ thd->variables.auto_increment_offset=
+ global_system_variables.auto_increment_offset;
+ if (thd->variables.auto_increment_increment !=
+ global_system_variables.auto_increment_increment)
+ thd->variables.auto_increment_increment=
+ global_system_variables.auto_increment_increment;
+ }
+ }
+#endif /* WITH_WSREP */
thd->query_start_used= 0;
thd->query_start_sec_part_used= 0;
thd->is_fatal_error= thd->time_zone_used= 0;
@@ -6056,6 +6435,179 @@ void mysql_init_multi_delete(LEX *lex)
lex->query_tables_last= &lex->query_tables;
}
+#ifdef WITH_WSREP
+void wsrep_replay_transaction(THD *thd)
+{
+ /* checking if BF trx must be replayed */
+ if (thd->wsrep_conflict_state== MUST_REPLAY)
+ {
+ if (thd->wsrep_exec_mode!= REPL_RECV)
+ {
+ if (thd->stmt_da->is_sent)
+ {
+ WSREP_ERROR("replay issue, thd has reported status already");
+ }
+ thd->stmt_da->reset_diagnostics_area();
+
+ thd->wsrep_conflict_state= REPLAYING;
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+
+ mysql_reset_thd_for_next_command(thd, opt_userstat_running);
+ thd->killed= NOT_KILLED;
+ close_thread_tables(thd);
+ if (thd->locked_tables_mode && thd->lock)
+ {
+ WSREP_DEBUG("releasing table lock for replaying (%ld)",
+ thd->thread_id);
+ thd->locked_tables_list.unlock_locked_tables(thd);
+ thd->variables.option_bits&= ~(OPTION_TABLE_LOCK);
+ }
+ thd->mdl_context.release_transactional_locks();
+
+ thd_proc_info(thd, "wsrep replaying trx");
+ WSREP_DEBUG("replay trx: %s %lld",
+ thd->query() ? thd->query() : "void",
+ (long long)thd->wsrep_trx_seqno);
+ struct wsrep_thd_shadow shadow;
+ wsrep_prepare_bf_thd(thd, &shadow);
+ int rcode = wsrep->replay_trx(wsrep,
+ &thd->wsrep_trx_handle,
+ (void *)thd);
+
+ wsrep_return_from_bf_mode(thd, &shadow);
+ if (thd->wsrep_conflict_state!= REPLAYING)
+ WSREP_WARN("lost replaying mode: %d", thd->wsrep_conflict_state );
+
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+
+ switch (rcode)
+ {
+ case WSREP_OK:
+ thd->wsrep_conflict_state= NO_CONFLICT;
+ wsrep->post_commit(wsrep, &thd->wsrep_trx_handle);
+ WSREP_DEBUG("trx_replay successful for: %ld %llu",
+ thd->thread_id, (long long)thd->real_id);
+ break;
+ case WSREP_TRX_FAIL:
+ if (thd->stmt_da->is_sent)
+ {
+ WSREP_ERROR("replay failed, thd has reported status");
+ }
+ else
+ {
+ WSREP_DEBUG("replay failed, rolling back");
+ my_error(ER_LOCK_DEADLOCK, MYF(0), "wsrep aborted transaction");
+ }
+ thd->wsrep_conflict_state= ABORTED;
+ thd->wsrep_bf_thd = NULL;
+ wsrep->post_rollback(wsrep, &thd->wsrep_trx_handle);
+ break;
+ default:
+ WSREP_ERROR("trx_replay failed for: %d, query: %s",
+ rcode, thd->query() ? thd->query() : "void");
+ /* we're now in inconsistent state, must abort */
+ unireg_abort(1);
+ break;
+ }
+ mysql_mutex_lock(&LOCK_wsrep_replaying);
+ wsrep_replaying--;
+ WSREP_DEBUG("replaying decreased: %d, thd: %lu",
+ wsrep_replaying, thd->thread_id);
+ mysql_cond_broadcast(&COND_wsrep_replaying);
+ mysql_mutex_unlock(&LOCK_wsrep_replaying);
+ }
+ }
+}
+
+static void wsrep_mysql_parse(THD *thd, char *rawbuf, uint length,
+ Parser_state *parser_state)
+{
+ bool is_autocommit=
+ !thd->in_multi_stmt_transaction_mode() &&
+ thd->wsrep_conflict_state == NO_CONFLICT &&
+ !thd->wsrep_applier &&
+ wsrep_read_only_option(thd, thd->lex->query_tables);
+
+ do
+ {
+ if (thd->wsrep_conflict_state== RETRY_AUTOCOMMIT)
+ {
+ thd->wsrep_conflict_state= NO_CONFLICT;
+ }
+ mysql_parse(thd, rawbuf, length, parser_state);
+
+ if (WSREP(thd)) {
+ /* wsrep BF abort in query exec phase */
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ if (thd->wsrep_conflict_state == MUST_ABORT) {
+ wsrep_client_rollback(thd);
+
+ WSREP_DEBUG("abort in exec query state, avoiding autocommit");
+ }
+
+ if (thd->wsrep_conflict_state== MUST_REPLAY)
+ {
+ wsrep_replay_transaction(thd);
+ }
+
+ /* setting error code for BF aborted trxs */
+ if (thd->wsrep_conflict_state == ABORTED ||
+ thd->wsrep_conflict_state == CERT_FAILURE)
+ {
+ mysql_reset_thd_for_next_command(thd, opt_userstat_running);
+ thd->killed= NOT_KILLED;
+ if (is_autocommit &&
+ thd->lex->sql_command != SQLCOM_SELECT &&
+ (thd->wsrep_retry_counter < thd->variables.wsrep_retry_autocommit))
+ {
+ WSREP_DEBUG("wsrep retrying AC query: %s",
+ (thd->query()) ? thd->query() : "void");
+
+ close_thread_tables(thd);
+
+ thd->wsrep_conflict_state= RETRY_AUTOCOMMIT;
+ thd->wsrep_retry_counter++; // grow
+ wsrep_copy_query(thd);
+ thd->set_time();
+ parser_state->reset(rawbuf, length);
+ }
+ else
+ {
+ WSREP_DEBUG("%s, thd: %lu is_AC: %d, retry: %lu - %lu SQL: %s",
+ (thd->wsrep_conflict_state == ABORTED) ?
+ "BF Aborted" : "cert failure",
+ thd->thread_id, is_autocommit, thd->wsrep_retry_counter,
+ thd->variables.wsrep_retry_autocommit, thd->query());
+ my_error(ER_LOCK_DEADLOCK, MYF(0), "wsrep aborted transaction");
+ thd->killed= NOT_KILLED;
+ thd->wsrep_conflict_state= NO_CONFLICT;
+ if (thd->wsrep_conflict_state != REPLAYING)
+ thd->wsrep_retry_counter= 0; // reset
+ }
+ }
+ else
+ {
+ set_if_smaller(thd->wsrep_retry_counter, 0); // reset; eventually ok
+ }
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+ }
+ } while (thd->wsrep_conflict_state== RETRY_AUTOCOMMIT);
+
+ if (thd->wsrep_retry_query)
+ {
+ WSREP_DEBUG("releasing retry_query: conf %d sent %d kill %d errno %d SQL %s",
+ thd->wsrep_conflict_state,
+ thd->stmt_da->is_sent,
+ thd->killed,
+ thd->stmt_da->is_error() ? thd->stmt_da->sql_errno() : 0,
+ thd->wsrep_retry_query);
+ my_free(thd->wsrep_retry_query);
+ thd->wsrep_retry_query = NULL;
+ thd->wsrep_retry_query_len = 0;
+ thd->wsrep_retry_command = COM_CONNECT;
+ }
+}
+#endif /* WITH_WSREP */
/*
When you modify mysql_parse(), you may need to mofify
@@ -7063,8 +7615,14 @@ uint kill_one_thread(THD *thd, ulong id, killed_state kill_signal)
faster and do a harder kill than KILL_SYSTEM_THREAD;
*/
+#ifdef WITH_WSREP
+ if (((thd->security_ctx->master_access & SUPER_ACL) ||
+ thd->security_ctx->user_matches(tmp->security_ctx)) &&
+ !wsrep_thd_is_brute_force((void *)tmp))
+#else
if ((thd->security_ctx->master_access & SUPER_ACL) ||
thd->security_ctx->user_matches(tmp->security_ctx))
+#endif /* WITH_WSREP */
{
tmp->awake(kill_signal);
error=0;
@@ -7839,6 +8397,586 @@ LEX_USER *create_definer(THD *thd, LEX_STRING *user_name, LEX_STRING *host_name)
return definer;
}
+#ifdef WITH_WSREP
+/* must have (&thd->LOCK_wsrep_thd) */
+static void wsrep_client_rollback(THD *thd)
+{
+ WSREP_DEBUG("client rollback due to BF abort for (%ld), query: %s",
+ thd->thread_id, thd->query());
+
+ thd->wsrep_conflict_state= ABORTING;
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+ trans_rollback(thd);
+
+ if (thd->locked_tables_mode && thd->lock)
+ {
+ WSREP_DEBUG("unlocking tables for BF abort (%ld)", thd->thread_id);
+ thd->locked_tables_list.unlock_locked_tables(thd);
+ thd->variables.option_bits&= ~(OPTION_TABLE_LOCK);
+ }
+
+ if (thd->global_read_lock.is_acquired())
+ {
+ WSREP_DEBUG("unlocking GRL for BF abort (%ld)", thd->thread_id);
+ thd->global_read_lock.unlock_global_read_lock(thd);
+ }
+
+ /* Release transactional metadata locks. */
+ thd->mdl_context.release_transactional_locks();
+
+ if (thd->get_binlog_table_maps())
+ {
+ WSREP_DEBUG("clearing binlog table map for BF abort (%ld)", thd->thread_id);
+ thd->clear_binlog_table_maps();
+ }
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ thd->wsrep_conflict_state= ABORTED;
+ thd->wsrep_bf_thd = NULL;
+}
+
+static enum wsrep_status wsrep_apply_sql(
+ THD *thd, const char *sql, size_t sql_len, time_t timeval, uint32 randseed)
+{
+ int error;
+ enum wsrep_status ret_code= WSREP_OK;
+
+ DBUG_ENTER("wsrep_bf_execute_cb");
+ thd->wsrep_exec_mode= REPL_RECV;
+ thd->net.vio= 0;
+ thd->start_time= timeval;
+ thd->wsrep_rand= randseed;
+
+ thd->variables.option_bits |= OPTION_NOT_AUTOCOMMIT;
+
+ DBUG_PRINT("wsrep", ("SQL: %s", sql));
+
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ thd->wsrep_query_state= QUERY_EXEC;
+ /* preserve replaying mode */
+ if (thd->wsrep_conflict_state!= REPLAYING)
+ thd->wsrep_conflict_state= NO_CONFLICT;
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+
+ if ((error= dispatch_command(COM_QUERY, thd, (char*)sql, sql_len))) {
+ WSREP_WARN("BF SQL apply failed: %d, %lld",
+ thd->wsrep_conflict_state, (long long)thd->wsrep_trx_seqno);
+ DBUG_RETURN(WSREP_FATAL);
+ }
+
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ if (thd->wsrep_conflict_state!= NO_CONFLICT &&
+ thd->wsrep_conflict_state!= REPLAYING) {
+ ret_code= WSREP_FATAL;
+ WSREP_DEBUG("BF thd ending, with: %d, %lld",
+ thd->wsrep_conflict_state, (long long)thd->wsrep_trx_seqno);
+ }
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+
+ assert(thd->wsrep_exec_mode== REPL_RECV);
+ DBUG_RETURN(ret_code);
+}
+
+void wsrep_write_rbr_buf(
+ THD *thd, const void* rbr_buf, size_t buf_len)
+{
+ char filename[PATH_MAX]= {0};
+ int len= snprintf(filename, PATH_MAX, "%s/GRA_%ld_%lld.log",
+ wsrep_data_home_dir, thd->thread_id,
+ (long long)thd->wsrep_trx_seqno);
+ if (len >= PATH_MAX)
+ {
+ WSREP_ERROR("RBR dump path too long: %d, skipping dump.", len);
+ return;
+ }
+
+ FILE *of= fopen(filename, "wb");
+ if (of)
+ {
+ fwrite (rbr_buf, buf_len, 1, of);
+ fclose(of);
+ }
+ else
+ {
+ WSREP_ERROR("Failed to open file '%s': %d (%s)",
+ filename, errno, strerror(errno));
+ }
+}
+
+static inline wsrep_status_t wsrep_apply_rbr(
+ THD *thd, const uchar *rbr_buf, size_t buf_len)
+{
+ char *buf= (char *)rbr_buf;
+ int rcode= 0;
+ int event= 1;
+ Format_description_log_event *description_event = wsrep_format_desc;
+ DBUG_ENTER("wsrep_apply_rbr");
+
+ if (thd->killed == KILL_CONNECTION)
+ {
+ WSREP_INFO("applier has been aborted, skipping apply_rbr: %lld",
+ (long long) thd->wsrep_trx_seqno);
+ DBUG_RETURN(WSREP_FATAL);
+ }
+
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ thd->wsrep_query_state= QUERY_EXEC;
+ if (thd->wsrep_conflict_state!= REPLAYING)
+ thd->wsrep_conflict_state= NO_CONFLICT;
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+
+ if (!buf_len) WSREP_DEBUG("empty rbr buffer to apply: %lld",
+ (long long) thd->wsrep_trx_seqno);
+
+ if ((rcode= trans_begin(thd)))
+ WSREP_WARN("begin for rbr apply failed: %lld, code: %d",
+ (long long) thd->wsrep_trx_seqno, rcode);
+
+ while(buf_len)
+ {
+ int exec_res;
+ int error = 0;
+ Log_event* ev= wsrep_read_log_event(&buf, &buf_len, description_event);
+
+ if (!ev)
+ {
+ WSREP_ERROR("applier could not read binlog event, seqno: %lld, len: %ld",
+ (long long)thd->wsrep_trx_seqno, buf_len);
+ rcode= 1;
+ goto error;
+ }
+ switch (ev->get_type_code()) {
+ case WRITE_ROWS_EVENT:
+ case UPDATE_ROWS_EVENT:
+ case DELETE_ROWS_EVENT:
+ DBUG_ASSERT(buf_len != 0 ||
+ ((Rows_log_event *) ev)->get_flags(Rows_log_event::STMT_END_F));
+ break;
+ case FORMAT_DESCRIPTION_EVENT:
+ description_event = (Format_description_log_event *)ev;
+ break;
+ default:
+ break;
+ }
+
+ thd->variables.server_id = ev->server_id; // use the original server id for logging
+ thd->set_time(); // time the query
+ wsrep_xid_init(&thd->transaction.xid_state.xid,
+ wsrep_cluster_uuid(),
+ thd->wsrep_trx_seqno);
+ thd->lex->current_select= 0;
+ if (!ev->when)
+ ev->when = time(NULL);
+ ev->thd = thd;
+ exec_res = ev->apply_event(thd->wsrep_rli);
+ DBUG_PRINT("info", ("exec_event result: %d", exec_res));
+
+ if (exec_res)
+ {
+ WSREP_WARN("RBR event %d %s apply warning: %d, %lld",
+ event, ev->get_type_str(), exec_res, (long long) thd->wsrep_trx_seqno);
+ rcode= exec_res;
+ /* stop processing for the first error */
+ delete ev;
+ goto error;
+ }
+ event++;
+
+ if (thd->wsrep_conflict_state!= NO_CONFLICT &&
+ thd->wsrep_conflict_state!= REPLAYING)
+ WSREP_WARN("conflict state after RBR event applying: %d, %lld",
+ thd->wsrep_query_state, (long long)thd->wsrep_trx_seqno);
+
+ if (thd->wsrep_conflict_state == MUST_ABORT) {
+ WSREP_WARN("RBR event apply failed, rolling back: %lld",
+ (long long) thd->wsrep_trx_seqno);
+ trans_rollback(thd);
+ thd->locked_tables_list.unlock_locked_tables(thd);
+ /* Release transactional metadata locks. */
+ thd->mdl_context.release_transactional_locks();
+ thd->wsrep_conflict_state= NO_CONFLICT;
+ DBUG_RETURN(WSREP_FATAL);
+ }
+
+ if (ev->get_type_code() != TABLE_MAP_EVENT &&
+ ((Rows_log_event *) ev)->get_flags(Rows_log_event::STMT_END_F))
+ {
+ // TODO: combine with commit on higher level common for the query ws
+
+ thd->wsrep_rli->cleanup_context(thd, 0);
+
+ if (error == 0)
+ {
+ thd->clear_error();
+ }
+ else
+ WSREP_ERROR("Error in %s event: commit of row events failed: %lld",
+ ev->get_type_str(), (long long)thd->wsrep_trx_seqno);
+ }
+
+ if (description_event != ev)
+ delete ev;
+ }
+
+ error:
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ thd->wsrep_query_state= QUERY_IDLE;
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+
+ assert(thd->wsrep_exec_mode== REPL_RECV);
+
+ if (thd->killed == KILL_CONNECTION)
+ WSREP_INFO("applier aborted: %lld", (long long)thd->wsrep_trx_seqno);
+
+ if (rcode) DBUG_RETURN(WSREP_FATAL);
+ DBUG_RETURN(WSREP_OK);
+}
+
+wsrep_status_t wsrep_apply_cb(void* const ctx,
+ const void* const buf, size_t const buf_len,
+ wsrep_seqno_t const global_seqno)
+{
+ THD* const thd((THD*)ctx);
+
+ thd->wsrep_trx_seqno= global_seqno;
+
+#ifdef WSREP_PROC_INFO
+ snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
+ "applying write set %lld: %p, %zu",
+ (long long)thd->wsrep_trx_seqno, buf, buf_len);
+ thd_proc_info(thd, thd->wsrep_info);
+#else
+ thd_proc_info(thd, "applying write set");
+#endif /* WSREP_PROC_INFO */
+
+ wsrep_status_t const rcode(wsrep_apply_rbr(thd, (const uchar*)buf, buf_len));
+
+#ifdef WSREP_PROC_INFO
+ snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
+ "applied write set %lld", (long long)thd->wsrep_trx_seqno);
+ thd_proc_info(thd, thd->wsrep_info);
+#else
+ thd_proc_info(thd, "applied write set");
+#endif /* WSREP_PROC_INFO */
+
+ if (WSREP_OK != rcode) wsrep_write_rbr_buf(thd, buf, buf_len);
+
+ return rcode;
+}
+
+#if DELETE // this does not work in 5.5
+/* a common wrapper for end_trans() function - to put all necessary stuff */
+static inline wsrep_status_t
+wsrep_end_trans (THD* const thd, enum enum_mysql_completiontype const end)
+{
+ if (0 == end_trans(thd, end))
+ {
+ return WSREP_OK;
+ }
+ else
+ {
+ return WSREP_FATAL;
+ }
+}
+#endif
+
+wsrep_status_t wsrep_commit(THD* const thd, wsrep_seqno_t const global_seqno)
+{
+#ifdef WSREP_PROC_INFO
+ snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
+ "committing %lld", (long long)thd->wsrep_trx_seqno);
+ thd_proc_info(thd, thd->wsrep_info);
+#else
+ thd_proc_info(thd, "committing");
+#endif /* WSREP_PROC_INFO */
+
+ wsrep_status_t const rcode(wsrep_apply_sql(thd, "COMMIT", 6, 0, 0));
+// wsrep_status_t const rcode(wsrep_end_trans (thd, COMMIT));
+
+#ifdef WSREP_PROC_INFO
+ snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
+ "committed %lld", (long long)thd->wsrep_trx_seqno);
+ thd_proc_info(thd, thd->wsrep_info);
+#else
+ thd_proc_info(thd, "committed");
+#endif /* WSREP_PROC_INFO */
+
+ if (WSREP_OK == rcode)
+ {
+ // TODO: mark snapshot with global_seqno.
+ }
+
+ return rcode;
+}
+
+wsrep_status_t wsrep_rollback(THD* const thd, wsrep_seqno_t const global_seqno)
+{
+#ifdef WSREP_PROC_INFO
+ snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
+ "rolling back %lld", (long long)thd->wsrep_trx_seqno);
+ thd_proc_info(thd, thd->wsrep_info);
+#else
+ thd_proc_info(thd, "rolling back");
+#endif /* WSREP_PROC_INFO */
+
+ wsrep_status_t const rcode(wsrep_apply_sql(thd, "ROLLBACK", 8, 0, 0));
+// wsrep_status_t const rcode(wsrep_end_trans (thd, ROLLBACK));
+
+#ifdef WSREP_PROC_INFO
+ snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
+ "rolled back %lld", (long long)thd->wsrep_trx_seqno);
+ thd_proc_info(thd, thd->wsrep_info);
+#else
+ thd_proc_info(thd, "rolled back");
+#endif /* WSREP_PROC_INFO */
+
+ return rcode;
+}
+
+wsrep_status_t wsrep_commit_cb(void* const ctx,
+ wsrep_seqno_t const global_seqno,
+ bool const commit)
+{
+ THD* const thd((THD*)ctx);
+
+ assert(global_seqno == thd->wsrep_trx_seqno);
+
+ if (commit)
+ return wsrep_commit(thd, global_seqno);
+ else
+ return wsrep_rollback(thd, global_seqno);
+}
+
+Relay_log_info* wsrep_relay_log_init(const char* log_fname)
+{
+ Relay_log_info* rli= new Relay_log_info(false);
+
+ rli->no_storage= true;
+ if (!rli->relay_log.description_event_for_exec)
+ {
+ rli->relay_log.description_event_for_exec=
+ new Format_description_log_event(4);
+ }
+
+ rli->sql_thd= current_thd;
+ return rli;
+}
+
+void wsrep_prepare_bf_thd(THD *thd, struct wsrep_thd_shadow* shadow)
+{
+ shadow->options = thd->variables.option_bits;
+ shadow->wsrep_exec_mode = thd->wsrep_exec_mode;
+ shadow->vio = thd->net.vio;
+
+ if (opt_log_slave_updates)
+ thd->variables.option_bits|= OPTION_BIN_LOG;
+ else
+ thd->variables.option_bits&= ~(OPTION_BIN_LOG);
+
+ if (!thd->wsrep_rli) thd->wsrep_rli= wsrep_relay_log_init("wsrep_relay");
+
+ thd->wsrep_exec_mode= REPL_RECV;
+ thd->net.vio= 0;
+ thd->clear_error();
+
+ thd->variables.option_bits|= OPTION_NOT_AUTOCOMMIT;
+
+ shadow->tx_isolation = thd->variables.tx_isolation;
+ thd->variables.tx_isolation = ISO_READ_COMMITTED;
+ thd->tx_isolation = ISO_READ_COMMITTED;
+}
+
+void wsrep_return_from_bf_mode(THD *thd, struct wsrep_thd_shadow* shadow)
+{
+ thd->variables.option_bits = shadow->options;
+ thd->wsrep_exec_mode = shadow->wsrep_exec_mode;
+ thd->net.vio = shadow->vio;
+ thd->variables.tx_isolation = shadow->tx_isolation;
+}
+
+void wsrep_replication_process(THD *thd)
+{
+ int rcode;
+ DBUG_ENTER("wsrep_replication_process");
+
+ struct wsrep_thd_shadow shadow;
+ wsrep_prepare_bf_thd(thd, &shadow);
+
+ rcode = wsrep->recv(wsrep, (void *)thd);
+ DBUG_PRINT("wsrep",("wsrep_repl returned: %d", rcode));
+
+ WSREP_INFO("applier thread exiting (code:%d)", rcode);
+
+ switch (rcode) {
+ case WSREP_OK:
+ case WSREP_NOT_IMPLEMENTED:
+ case WSREP_CONN_FAIL:
+ /* provider does not support slave operations / disconnected from group,
+ * just close applier thread */
+ break;
+ case WSREP_NODE_FAIL:
+ /* data inconsistency => SST is needed */
+ /* Note: we cannot just blindly restart replication here,
+ * SST might require server restart if storage engines must be
+ * initialized after SST */
+ WSREP_ERROR("node consistency compromised, aborting");
+ wsrep_kill_mysql(thd);
+ break;
+ case WSREP_WARNING:
+ case WSREP_TRX_FAIL:
+ case WSREP_TRX_MISSING:
+ /* these suggests a bug in provider code */
+ WSREP_WARN("bad return from recv() call: %d", rcode);
+ /* fall through to node shutdown */
+ case WSREP_FATAL:
+ /* Cluster connectivity is lost.
+ *
+ * If applier was killed on purpose (KILL_CONNECTION), we
+ * avoid mysql shutdown. This is because the killer will then handle
+ * shutdown processing (or replication restarting)
+ */
+ if (thd->killed != KILL_CONNECTION)
+ {
+ wsrep_kill_mysql(thd);
+ }
+ break;
+ }
+
+ if (thd->killed != KILL_CONNECTION)
+ {
+ mysql_mutex_lock(&LOCK_thread_count);
+ wsrep_close_applier(thd);
+ mysql_cond_broadcast(&COND_thread_count);
+ mysql_mutex_unlock(&LOCK_thread_count);
+ }
+ wsrep_return_from_bf_mode(thd, &shadow);
+ DBUG_VOID_RETURN;
+}
+
+void wsrep_rollback_process(THD *thd)
+{
+ DBUG_ENTER("wsrep_rollback_process");
+
+ mysql_mutex_lock(&LOCK_wsrep_rollback);
+ wsrep_aborting_thd= NULL;
+
+ while (thd->killed == NOT_KILLED) {
+ thd_proc_info(thd, "wsrep aborter idle");
+ thd->mysys_var->current_mutex= &LOCK_wsrep_rollback;
+ thd->mysys_var->current_cond= &COND_wsrep_rollback;
+
+ mysql_cond_wait(&COND_wsrep_rollback,&LOCK_wsrep_rollback);
+
+ WSREP_DEBUG("WSREP rollback thread wakes for signal");
+
+ mysql_mutex_lock(&thd->mysys_var->mutex);
+ thd_proc_info(thd, "wsrep aborter active");
+ thd->mysys_var->current_mutex= 0;
+ thd->mysys_var->current_cond= 0;
+ mysql_mutex_unlock(&thd->mysys_var->mutex);
+
+ /* check for false alarms */
+ if (!wsrep_aborting_thd)
+ {
+ WSREP_DEBUG("WSREP rollback thread has empty abort queue");
+ }
+ /* process all entries in the queue */
+ while (wsrep_aborting_thd) {
+ THD *aborting;
+ wsrep_aborting_thd_t next = wsrep_aborting_thd->next;
+ aborting = wsrep_aborting_thd->aborting_thd;
+ my_free(wsrep_aborting_thd);
+ wsrep_aborting_thd= next;
+ /*
+ * must release mutex, appliers my want to add more
+ * aborting thds in our work queue, while we rollback
+ */
+ mysql_mutex_unlock(&LOCK_wsrep_rollback);
+
+ mysql_mutex_lock(&aborting->LOCK_wsrep_thd);
+ if (aborting->wsrep_conflict_state== ABORTED)
+ {
+ WSREP_DEBUG("WSREP, thd already aborted: %llu state: %d",
+ (long long)aborting->real_id,
+ aborting->wsrep_conflict_state);
+
+ mysql_mutex_unlock(&aborting->LOCK_wsrep_thd);
+ mysql_mutex_lock(&LOCK_wsrep_rollback);
+ continue;
+ }
+ aborting->wsrep_conflict_state= ABORTING;
+
+ mysql_mutex_unlock(&aborting->LOCK_wsrep_thd);
+
+ aborting->store_globals();
+
+ mysql_mutex_lock(&aborting->LOCK_wsrep_thd);
+ wsrep_client_rollback(aborting);
+ WSREP_DEBUG("WSREP rollbacker aborted thd: (%lu %llu)",
+ aborting->thread_id, (long long)aborting->real_id);
+ mysql_mutex_unlock(&aborting->LOCK_wsrep_thd);
+
+ mysql_mutex_lock(&LOCK_wsrep_rollback);
+ }
+ }
+
+ mysql_mutex_unlock(&LOCK_wsrep_rollback);
+ sql_print_information("WSREP: rollbacker thread exiting");
+
+ DBUG_PRINT("wsrep",("wsrep rollbacker thread exiting"));
+ DBUG_VOID_RETURN;
+}
+extern
+int wsrep_thd_is_brute_force(void *thd_ptr)
+{
+ if (thd_ptr) {
+ switch (((THD *)thd_ptr)->wsrep_exec_mode) {
+ case LOCAL_STATE:
+ {
+ if (((THD *)thd_ptr)->wsrep_conflict_state== REPLAYING)
+ {
+ return 1;
+ }
+ return 0;
+ }
+ case REPL_RECV: return 1;
+ case TOTAL_ORDER: return 2;
+ case LOCAL_COMMIT: return 3;
+ }
+ }
+ return 0;
+}
+extern "C"
+int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr, my_bool signal)
+{
+ THD *victim_thd = (THD *) victim_thd_ptr;
+ THD *bf_thd = (THD *) bf_thd_ptr;
+ DBUG_ENTER("wsrep_abort_thd");
+
+ if ( (WSREP(bf_thd) ||
+ ( (WSREP_ON || wsrep_OSU_method_options == WSREP_OSU_RSU) &&
+ bf_thd->wsrep_exec_mode == TOTAL_ORDER) ) &&
+ victim_thd)
+ {
+ WSREP_DEBUG("wsrep_abort_thd, by: %llu, victim: %llu", (bf_thd) ?
+ (long long)bf_thd->real_id : 0, (long long)victim_thd->real_id);
+ ha_wsrep_abort_transaction(bf_thd, victim_thd, signal);
+ }
+ else
+ {
+ WSREP_DEBUG("wsrep_abort_thd not effective: %p %p", bf_thd, victim_thd);
+ }
+
+ DBUG_RETURN(1);
+}
+extern "C"
+int wsrep_thd_in_locking_session(void *thd_ptr)
+{
+ if (thd_ptr && ((THD *)thd_ptr)->in_lock_tables) {
+ return 1;
+ }
+ return 0;
+}
+#endif
/**
Retuns information about user or current user.
diff --git a/sql/sql_parse.h b/sql/sql_parse.h
index 346a3c8899b..7fc25b838ca 100644
--- a/sql/sql_parse.h
+++ b/sql/sql_parse.h
@@ -202,6 +202,22 @@ inline bool is_supported_parser_charset(CHARSET_INFO *cs)
{
return test(cs->mbminlen == 1);
}
+#ifdef WITH_WSREP
+
+#define WSREP_MYSQL_DB (char *)"mysql"
+#define WSREP_TO_ISOLATION_BEGIN(db_, table_, table_list_) \
+ if (WSREP(thd) && wsrep_to_isolation_begin(thd, db_, table_, table_list_)) goto error;
+
+#define WSREP_TO_ISOLATION_END \
+ if (WSREP(thd) || (thd && thd->wsrep_exec_mode==TOTAL_ORDER)) \
+ wsrep_to_isolation_end(thd);
+
+#else
+
+#define WSREP_TO_ISOLATION_BEGIN(db_, table_, table_list_)
+#define WSREP_TO_ISOLATION_END
+
+#endif /* WITH_WSREP */
#endif /* SQL_PARSE_INCLUDED */
diff --git a/sql/sql_plugin.cc b/sql/sql_plugin.cc
index 72f16ba7837..c0a547c46d8 100644
--- a/sql/sql_plugin.cc
+++ b/sql/sql_plugin.cc
@@ -3049,11 +3049,17 @@ void plugin_thdvar_init(THD *thd)
thd->variables.dynamic_variables_size= 0;
thd->variables.dynamic_variables_ptr= 0;
+#ifdef WITH_WSREP
+ if (!WSREP(thd) || !thd->wsrep_applier) {
+#endif
mysql_mutex_lock(&LOCK_plugin);
thd->variables.table_plugin=
intern_plugin_lock(NULL, global_system_variables.table_plugin);
intern_plugin_unlock(NULL, old_table_plugin);
mysql_mutex_unlock(&LOCK_plugin);
+#ifdef WITH_WSREP
+ }
+#endif
DBUG_VOID_RETURN;
}
diff --git a/sql/sql_prepare.cc b/sql/sql_prepare.cc
index 025ff8820e6..7ccc6aaad39 100644
--- a/sql/sql_prepare.cc
+++ b/sql/sql_prepare.cc
@@ -3490,7 +3490,9 @@ Prepared_statement::set_parameters(String *expanded_query,
return res;
}
-
+#ifdef WITH_WSREP
+void wsrep_replay_transaction(THD *thd);
+#endif /* WITH_WSREP */
/**
Execute a prepared statement. Re-prepare it a limited number
of times if necessary.
@@ -3570,6 +3572,22 @@ reexecute:
error= execute(expanded_query, open_cursor) || thd->is_error();
thd->m_reprepare_observer= NULL;
+#ifdef WITH_WSREP
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ switch (thd->wsrep_conflict_state)
+ {
+ case CERT_FAILURE:
+ WSREP_DEBUG("PS execute fail for CERT_FAILURE: thd: %ld err: %d",
+ thd->thread_id, thd->stmt_da->sql_errno() );
+ thd->wsrep_conflict_state = NO_CONFLICT;
+ break;
+
+ case MUST_REPLAY:
+ (void)wsrep_replay_transaction(thd);
+ default: break;
+ }
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+#endif /* WITH_WSREP */
if (error && !thd->is_fatal_error && !thd->killed &&
reprepare_observer.is_invalidated() &&
diff --git a/sql/sql_reload.cc b/sql/sql_reload.cc
index e9c9dc86e41..27156e2236a 100644
--- a/sql/sql_reload.cc
+++ b/sql/sql_reload.cc
@@ -253,7 +253,18 @@ bool reload_acl_and_cache(THD *thd, unsigned long options,
}
if (options & REFRESH_CHECKPOINT)
disable_checkpoints(thd);
- }
+#ifdef WITH_WSREP
+ /*
+ We need to do it second time after wsrep appliers were blocked in
+ make_global_read_lock_block_commit(thd) above since they could have
+ modified the tables too.
+ */
+ if (WSREP(thd) &&
+ close_cached_tables(thd, tables, (options & REFRESH_FAST) ?
+ FALSE : TRUE, TRUE))
+ result= 1;
+#endif /* WITH_WSREP */
+ }
else
{
if (thd && thd->locked_tables_mode)
diff --git a/sql/sql_show.cc b/sql/sql_show.cc
index ba36c0fc63f..feba387d3b4 100644
--- a/sql/sql_show.cc
+++ b/sql/sql_show.cc
@@ -8614,7 +8614,8 @@ ST_FIELD_INFO variables_fields_info[]=
{
{"VARIABLE_NAME", 64, MYSQL_TYPE_STRING, 0, 0, "Variable_name",
SKIP_OPEN_TABLE},
- {"VARIABLE_VALUE", 1024, MYSQL_TYPE_STRING, 0, 1, "Value", SKIP_OPEN_TABLE},
+ {"VARIABLE_VALUE", MYSQL_MAX_VARIABLE_VALUE_LEN, MYSQL_TYPE_STRING, 0, 1,
+ "Value", SKIP_OPEN_TABLE},
{0, 0, MYSQL_TYPE_STRING, 0, 0, 0, SKIP_OPEN_TABLE}
};
diff --git a/sql/sql_table.cc b/sql/sql_table.cc
index 60fd630001a..6bb650f55db 100644
--- a/sql/sql_table.cc
+++ b/sql/sql_table.cc
@@ -6621,6 +6621,9 @@ bool mysql_alter_table(THD *thd,char *new_db, char *new_name,
}
}
+#ifdef WITH_WSREP
+ bool do_log_write(true);
+#endif /* WITH_WSREP */
if (error == HA_ERR_WRONG_COMMAND)
{
error= 0;
@@ -6628,6 +6631,9 @@ bool mysql_alter_table(THD *thd,char *new_db, char *new_name,
ER_ILLEGAL_HA, ER(ER_ILLEGAL_HA),
table->file->table_type(),
table->s->db.str, table->s->table_name.str);
+#ifdef WITH_WSREP
+ WSREP_DEBUG("ignoring DDL failure: %d %s", error, thd->query());
+#endif /* WITH_WSREP */
}
if (!error)
diff --git a/sql/sql_trigger.cc b/sql/sql_trigger.cc
index 35a4464b9e2..c23543bd9f3 100644
--- a/sql/sql_trigger.cc
+++ b/sql/sql_trigger.cc
@@ -2452,3 +2452,55 @@ bool load_table_name_for_trigger(THD *thd,
DBUG_RETURN(FALSE);
}
+#ifdef WITH_WSREP
+int wsrep_create_trigger_query(THD *thd, uchar** buf, uint* buf_len)
+{
+ LEX *lex= thd->lex;
+ String stmt_query;
+
+ LEX_STRING definer_user;
+ LEX_STRING definer_host;
+
+ if (!lex->definer)
+ {
+ if (!thd->slave_thread)
+ {
+ if (!(lex->definer= create_default_definer(thd)))
+ return 1;
+ }
+ }
+
+ if (lex->definer)
+ {
+ /* SUID trigger. */
+
+ definer_user= lex->definer->user;
+ definer_host= lex->definer->host;
+ }
+ else
+ {
+ /* non-SUID trigger. */
+
+ definer_user.str= 0;
+ definer_user.length= 0;
+
+ definer_host.str= 0;
+ definer_host.length= 0;
+ }
+
+ stmt_query.append(STRING_WITH_LEN("CREATE "));
+
+ append_definer(thd, &stmt_query, &definer_user, &definer_host);
+
+ LEX_STRING stmt_definition;
+ stmt_definition.str= (char*) thd->lex->stmt_definition_begin;
+ stmt_definition.length= thd->lex->stmt_definition_end
+ - thd->lex->stmt_definition_begin;
+ trim_whitespace(thd->charset(), & stmt_definition);
+
+ stmt_query.append(stmt_definition.str, stmt_definition.length);
+
+ return wsrep_to_buf_helper(thd, stmt_query.c_ptr(), stmt_query.length(),
+ buf, buf_len);
+}
+#endif /* WITH_WSREP */
diff --git a/sql/sql_truncate.cc b/sql/sql_truncate.cc
index 19ce553f5ce..83cc0ed2cea 100644
--- a/sql/sql_truncate.cc
+++ b/sql/sql_truncate.cc
@@ -24,6 +24,9 @@
#include "sql_acl.h" // DROP_ACL
#include "sql_parse.h" // check_one_table_access()
#include "sql_truncate.h"
+#ifdef WITH_WSREP
+#include "wsrep_mysqld.h"
+#endif /* WITH_WSREP */
#include "sql_show.h" //append_identifier()
@@ -531,9 +534,14 @@ bool Truncate_statement::execute(THD *thd)
if (check_one_table_access(thd, DROP_ACL, first_table))
DBUG_RETURN(res);
+#ifdef WITH_WSREP
+ if (WSREP(thd) && wsrep_to_isolation_begin(thd,
+ first_table->db,
+ first_table->table_name, NULL))
+ DBUG_RETURN(TRUE);
+#endif /* WITH_WSREP */
if (! (res= truncate_table(thd, first_table)))
my_ok(thd);
-
DBUG_RETURN(res);
}
diff --git a/sql/sql_update.cc b/sql/sql_update.cc
index 8413f612111..137acc6b52b 100644
--- a/sql/sql_update.cc
+++ b/sql/sql_update.cc
@@ -904,7 +904,11 @@ int mysql_update(THD *thd,
*/
if ((error < 0) || thd->transaction.stmt.modified_non_trans_table)
{
+#ifdef WITH_WSREP
+ if (WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open())
+#else
if (mysql_bin_log.is_open())
+#endif
{
int errcode= 0;
if (error < 0)
@@ -2044,7 +2048,11 @@ void multi_update::abort_result_set()
The query has to binlog because there's a modified non-transactional table
either from the query's list or via a stored routine: bug#13270,23333
*/
+#ifdef WITH_WSREP
+ if (WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open())
+#else
if (mysql_bin_log.is_open())
+#endif
{
/*
THD::killed status might not have been set ON at time of an error
@@ -2313,7 +2321,11 @@ bool multi_update::send_eof()
if (local_error == 0 || thd->transaction.stmt.modified_non_trans_table)
{
+#ifdef WITH_WSREP
+ if (WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open())
+#else
if (mysql_bin_log.is_open())
+#endif
{
int errcode= 0;
if (local_error == 0)
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
index 0b543b8218a..9ca504fd275 100644
--- a/sql/sys_vars.cc
+++ b/sql/sys_vars.cc
@@ -4042,6 +4042,226 @@ static Sys_var_tz Sys_time_zone(
"time_zone", "time_zone",
SESSION_VAR(time_zone), NO_CMD_LINE,
DEFAULT(&default_tz), NO_MUTEX_GUARD, IN_BINLOG);
+#ifdef WITH_WSREP
+#include "wsrep_mysqld.h"
+
+static Sys_var_charptr Sys_wsrep_provider(
+ "wsrep_provider", "Path to replication provider library",
+ PREALLOCATED GLOBAL_VAR(wsrep_provider), CMD_LINE(REQUIRED_ARG, OPT_WSREP_PROVIDER),
+ IN_FS_CHARSET, DEFAULT(wsrep_provider),
+ // IN_FS_CHARSET, DEFAULT(wsrep_provider_default),
+ NO_MUTEX_GUARD, NOT_IN_BINLOG,
+ ON_CHECK(wsrep_provider_check), ON_UPDATE(wsrep_provider_update));
+
+static Sys_var_charptr Sys_wsrep_provider_options(
+ "wsrep_provider_options", "provider specific options",
+ PREALLOCATED GLOBAL_VAR(wsrep_provider_options),
+ CMD_LINE(REQUIRED_ARG, OPT_WSREP_PROVIDER_OPTIONS),
+ IN_FS_CHARSET, DEFAULT(wsrep_provider_options),
+ NO_MUTEX_GUARD, NOT_IN_BINLOG,
+ ON_CHECK(wsrep_provider_options_check),
+ ON_UPDATE(wsrep_provider_options_update));
+
+static Sys_var_charptr Sys_wsrep_data_home_dir(
+ "wsrep_data_home_dir", "home directory for wsrep provider",
+ READ_ONLY GLOBAL_VAR(wsrep_data_home_dir), CMD_LINE(REQUIRED_ARG),
+ IN_FS_CHARSET, DEFAULT(""),
+ NO_MUTEX_GUARD, NOT_IN_BINLOG);
+
+static Sys_var_charptr Sys_wsrep_cluster_name(
+ "wsrep_cluster_name", "Name for the cluster",
+ GLOBAL_VAR(wsrep_cluster_name), CMD_LINE(REQUIRED_ARG),
+ IN_FS_CHARSET, DEFAULT(wsrep_cluster_name),
+ NO_MUTEX_GUARD, NOT_IN_BINLOG,
+ ON_CHECK(wsrep_cluster_name_check),
+ ON_UPDATE(wsrep_cluster_name_update));
+
+static PolyLock_mutex PLock_wsrep_slave_threads(&LOCK_wsrep_slave_threads);
+static Sys_var_charptr Sys_wsrep_cluster_address (
+ "wsrep_cluster_address", "Address to initially connect to cluster",
+ PREALLOCATED GLOBAL_VAR(wsrep_cluster_address),
+ CMD_LINE(REQUIRED_ARG, OPT_WSREP_CLUSTER_ADDRESS),
+ IN_FS_CHARSET, DEFAULT(wsrep_cluster_address),
+ &PLock_wsrep_slave_threads, NOT_IN_BINLOG,
+ ON_CHECK(wsrep_cluster_address_check),
+ ON_UPDATE(wsrep_cluster_address_update));
+
+static Sys_var_charptr Sys_wsrep_node_name (
+ "wsrep_node_name", "Node name",
+ GLOBAL_VAR(wsrep_node_name), CMD_LINE(REQUIRED_ARG),
+ IN_FS_CHARSET, DEFAULT(wsrep_node_name),
+ NO_MUTEX_GUARD, NOT_IN_BINLOG);
+
+static Sys_var_charptr Sys_wsrep_node_address (
+ "wsrep_node_address", "Node address",
+ GLOBAL_VAR(wsrep_node_address), CMD_LINE(REQUIRED_ARG),
+ IN_FS_CHARSET, DEFAULT(wsrep_node_address),
+ NO_MUTEX_GUARD, NOT_IN_BINLOG,
+ ON_CHECK(wsrep_node_address_check),
+ ON_UPDATE(wsrep_node_address_update));
+
+static Sys_var_charptr Sys_wsrep_node_incoming_address(
+ "wsrep_node_incoming_address", "Client connection address",
+ GLOBAL_VAR(wsrep_node_incoming_address),CMD_LINE(REQUIRED_ARG),
+ IN_FS_CHARSET, DEFAULT(wsrep_node_incoming_address),
+ NO_MUTEX_GUARD, NOT_IN_BINLOG);
+
+static Sys_var_ulong Sys_wsrep_slave_threads(
+ "wsrep_slave_threads", "Number of slave appliers to launch",
+ GLOBAL_VAR(wsrep_slave_threads), CMD_LINE(REQUIRED_ARG),
+ VALID_RANGE(1, 512), DEFAULT(1), BLOCK_SIZE(1),
+ &PLock_wsrep_slave_threads, NOT_IN_BINLOG,
+ ON_CHECK(wsrep_slave_threads_check),
+ ON_UPDATE(wsrep_slave_threads_update));
+
+static Sys_var_charptr Sys_wsrep_dbug_option(
+ "wsrep_dbug_option", "DBUG options to provider library",
+ GLOBAL_VAR(wsrep_dbug_option),CMD_LINE(REQUIRED_ARG),
+ IN_FS_CHARSET, DEFAULT(""),
+ NO_MUTEX_GUARD, NOT_IN_BINLOG);
+
+static Sys_var_mybool Sys_wsrep_debug(
+ "wsrep_debug", "To enable debug level logging",
+ GLOBAL_VAR(wsrep_debug), CMD_LINE(OPT_ARG), DEFAULT(FALSE));
+
+static Sys_var_mybool Sys_wsrep_convert_LOCK_to_trx(
+ "wsrep_convert_LOCK_to_trx", "To convert locking sessions "
+ "into transactions",
+ GLOBAL_VAR(wsrep_convert_LOCK_to_trx),
+ CMD_LINE(OPT_ARG), DEFAULT(FALSE));
+
+static Sys_var_ulong Sys_wsrep_retry_autocommit(
+ "wsrep_retry_autocommit", "Max number of times to retry "
+ "a failed autocommit statement",
+ SESSION_VAR(wsrep_retry_autocommit), CMD_LINE(REQUIRED_ARG),
+ VALID_RANGE(0, 10000), DEFAULT(1), BLOCK_SIZE(1));
+
+static Sys_var_mybool Sys_wsrep_auto_increment_control(
+ "wsrep_auto_increment_control", "To automatically control the "
+ "assignment of autoincrement variables",
+ GLOBAL_VAR(wsrep_auto_increment_control),
+ CMD_LINE(OPT_ARG), DEFAULT(TRUE));
+
+static Sys_var_mybool Sys_wsrep_drupal_282555_workaround(
+ "wsrep_drupal_282555_workaround", "To use a workaround for"
+ "bad autoincrement value",
+ GLOBAL_VAR(wsrep_drupal_282555_workaround),
+ CMD_LINE(OPT_ARG), DEFAULT(FALSE));
+
+static Sys_var_charptr sys_wsrep_sst_method(
+ "wsrep_sst_method", "State snapshot transfer method",
+ GLOBAL_VAR(wsrep_sst_method),CMD_LINE(REQUIRED_ARG),
+ IN_FS_CHARSET, DEFAULT(wsrep_sst_method), NO_MUTEX_GUARD, NOT_IN_BINLOG,
+ ON_CHECK(wsrep_sst_method_check),
+ ON_UPDATE(wsrep_sst_method_update));
+
+static Sys_var_charptr Sys_wsrep_sst_receive_address(
+ "wsrep_sst_receive_address", "Address where node is waiting for "
+ "SST contact",
+ GLOBAL_VAR(wsrep_sst_receive_address),CMD_LINE(REQUIRED_ARG),
+ IN_FS_CHARSET, DEFAULT(wsrep_sst_receive_address), NO_MUTEX_GUARD,
+ NOT_IN_BINLOG,
+ ON_CHECK(wsrep_sst_receive_address_check),
+ ON_UPDATE(wsrep_sst_receive_address_update));
+
+static Sys_var_charptr Sys_wsrep_sst_auth(
+ "wsrep_sst_auth", "Authentication for SST connection",
+ PREALLOCATED GLOBAL_VAR(wsrep_sst_auth), CMD_LINE(REQUIRED_ARG, OPT_WSREP_SST_AUTH),
+ IN_FS_CHARSET, DEFAULT(wsrep_sst_auth), NO_MUTEX_GUARD,
+ NOT_IN_BINLOG,
+ ON_CHECK(wsrep_sst_auth_check),
+ ON_UPDATE(wsrep_sst_auth_update));
+
+static Sys_var_charptr Sys_wsrep_sst_donor(
+ "wsrep_sst_donor", "preferred donor node for the SST",
+ GLOBAL_VAR(wsrep_sst_donor),CMD_LINE(REQUIRED_ARG),
+ IN_FS_CHARSET, DEFAULT(""), NO_MUTEX_GUARD, NOT_IN_BINLOG,
+ ON_CHECK(wsrep_sst_donor_check),
+ ON_UPDATE(wsrep_sst_donor_update));
+
+static Sys_var_mybool Sys_wsrep_sst_donor_rejects_queries(
+ "wsrep_sst_donor_rejects_queries", "Reject client queries "
+ "when donating state snapshot transfer",
+ GLOBAL_VAR(wsrep_sst_donor_rejects_queries),
+ CMD_LINE(OPT_ARG), DEFAULT(FALSE));
+
+static Sys_var_mybool Sys_wsrep_on (
+ "wsrep_on", "To enable wsrep replication ",
+ SESSION_VAR(wsrep_on),
+ CMD_LINE(OPT_ARG), DEFAULT(TRUE),
+ NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),
+ ON_UPDATE(wsrep_on_update));
+
+static Sys_var_charptr Sys_wsrep_start_position (
+ "wsrep_start_position", "global transaction position to start from ",
+ GLOBAL_VAR(wsrep_start_position),
+ CMD_LINE(REQUIRED_ARG, OPT_WSREP_START_POSITION),
+ IN_FS_CHARSET, DEFAULT(wsrep_start_position),
+ NO_MUTEX_GUARD, NOT_IN_BINLOG,
+ ON_CHECK(wsrep_start_position_check),
+ ON_UPDATE(wsrep_start_position_update));
+
+static Sys_var_ulonglong Sys_wsrep_max_ws_size (
+ "wsrep_max_ws_size", "Max write set size (bytes)",
+ GLOBAL_VAR(wsrep_max_ws_size), CMD_LINE(REQUIRED_ARG),
+ VALID_RANGE(1024, 4294967296ULL), DEFAULT(1073741824ULL), BLOCK_SIZE(1));
+
+static Sys_var_ulong Sys_wsrep_max_ws_rows (
+ "wsrep_max_ws_rows", "Max number of rows in write set",
+ GLOBAL_VAR(wsrep_max_ws_rows), CMD_LINE(REQUIRED_ARG),
+ VALID_RANGE(1, 1048576), DEFAULT(131072), BLOCK_SIZE(1));
+
+static Sys_var_charptr Sys_wsrep_notify_cmd(
+ "wsrep_notify_cmd", "",
+ GLOBAL_VAR(wsrep_notify_cmd),CMD_LINE(REQUIRED_ARG),
+ IN_FS_CHARSET, DEFAULT(""), NO_MUTEX_GUARD, NOT_IN_BINLOG);
+
+static Sys_var_mybool Sys_wsrep_certify_nonPK(
+ "wsrep_certify_nonPK", "Certify tables with no primary key",
+ GLOBAL_VAR(wsrep_certify_nonPK),
+ CMD_LINE(OPT_ARG), DEFAULT(TRUE));
+
+static Sys_var_mybool Sys_wsrep_causal_reads(
+ "wsrep_causal_reads", "Enable \"strictly synchronous\" semantics for read operations",
+ SESSION_VAR(wsrep_causal_reads),
+ CMD_LINE(OPT_ARG), DEFAULT(FALSE));
+ // ON_UPDATE(wsrep_causal_reads_update));
+
+static const char *wsrep_OSU_method_names[]= { "TOI", "RSU", NullS };
+static Sys_var_enum Sys_wsrep_OSU_method(
+ "wsrep_OSU_method", "Method for Online Schema Upgrade",
+ GLOBAL_VAR(wsrep_OSU_method_options), CMD_LINE(OPT_ARG),
+ wsrep_OSU_method_names, DEFAULT(WSREP_OSU_TOI),
+ NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),
+ ON_UPDATE(0));
+
+static Sys_var_enum Sys_wsrep_forced_binlog_format(
+ "wsrep_forced_binlog_format", "binlog format to take effect over user's choice",
+ GLOBAL_VAR(wsrep_forced_binlog_format),
+ CMD_LINE(REQUIRED_ARG, OPT_BINLOG_FORMAT),
+ wsrep_binlog_format_names, DEFAULT(BINLOG_FORMAT_UNSPEC),
+ NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),
+ ON_UPDATE(0));
+
+static Sys_var_mybool Sys_wsrep_recover_datadir(
+ "wsrep_recover", "Recover database state after crash and exit",
+ READ_ONLY GLOBAL_VAR(wsrep_recovery),
+ CMD_LINE(OPT_ARG, OPT_WSREP_RECOVER), DEFAULT(FALSE));
+
+static Sys_var_mybool Sys_wsrep_replicate_myisam(
+ "wsrep_replicate_myisam", "To enable myisam replication",
+ GLOBAL_VAR(wsrep_replicate_myisam), CMD_LINE(OPT_ARG), DEFAULT(FALSE));
+
+static Sys_var_mybool Sys_wsrep_log_conflicts(
+ "wsrep_log_conflicts", "To log multi-master conflicts",
+ GLOBAL_VAR(wsrep_log_conflicts), CMD_LINE(OPT_ARG), DEFAULT(FALSE));
+
+static Sys_var_ulong Sys_wsrep_mysql_replication_bundle(
+ "wsrep_mysql_replication_bundle", "mysql replication group commit ",
+ GLOBAL_VAR(wsrep_mysql_replication_bundle), CMD_LINE(REQUIRED_ARG),
+ VALID_RANGE(0, 1000), DEFAULT(0), BLOCK_SIZE(1));
+
+#endif /* WITH_WSREP */
static Sys_var_charptr Sys_ignore_db_dirs(
"ignore_db_dirs",
diff --git a/sql/transaction.cc b/sql/transaction.cc
index 9a1952427d8..a381b5d21ce 100644
--- a/sql/transaction.cc
+++ b/sql/transaction.cc
@@ -97,6 +97,9 @@ static bool xa_trans_force_rollback(THD *thd)
by ha_rollback()/THD::transaction::cleanup().
*/
thd->transaction.xid_state.rm_error= 0;
+#ifdef WITH_WSREP
+ wsrep_register_hton(thd, TRUE);
+#endif /* WITH_WSREP */
if (ha_rollback_trans(thd, true))
{
my_error(ER_XAER_RMERR, MYF(0));
@@ -135,6 +138,9 @@ bool trans_begin(THD *thd, uint flags)
(thd->variables.option_bits & OPTION_TABLE_LOCK))
{
thd->variables.option_bits&= ~OPTION_TABLE_LOCK;
+#ifdef WITH_WSREP
+ wsrep_register_hton(thd, TRUE);
+#endif /* WITH_WSREP */
thd->server_status&=
~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY);
DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS"));
@@ -176,6 +182,12 @@ bool trans_begin(THD *thd, uint flags)
thd->tx_read_only= false;
}
+#ifdef WITH_WSREP
+ thd->wsrep_PA_safe= true;
+ if (WSREP_CLIENT(thd) && wsrep_causal_wait(thd))
+ DBUG_RETURN(TRUE);
+#endif /* WITH_WSREP */
+
thd->variables.option_bits|= OPTION_BEGIN;
thd->server_status|= SERVER_STATUS_IN_TRANS;
if (thd->tx_read_only)
@@ -207,6 +219,9 @@ bool trans_commit(THD *thd)
if (trans_check(thd))
DBUG_RETURN(TRUE);
+#ifdef WITH_WSREP
+ wsrep_register_hton(thd, TRUE);
+#endif /* WITH_WSREP */
thd->server_status&=
~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY);
DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS"));
@@ -252,6 +267,9 @@ bool trans_commit_implicit(THD *thd)
/* Safety if one did "drop table" on locked tables */
if (!thd->locked_tables_mode)
thd->variables.option_bits&= ~OPTION_TABLE_LOCK;
+#ifdef WITH_WSREP
+ wsrep_register_hton(thd, TRUE);
+#endif /* WITH_WSREP */
thd->server_status&=
~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY);
DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS"));
@@ -288,9 +306,15 @@ bool trans_rollback(THD *thd)
int res;
DBUG_ENTER("trans_rollback");
+#ifdef WITH_WSREP
+ thd->wsrep_PA_safe= true;
+#endif /* WITH_WSREP */
if (trans_check(thd))
DBUG_RETURN(TRUE);
+#ifdef WITH_WSREP
+ wsrep_register_hton(thd, TRUE);
+#endif /* WITH_WSREP */
thd->server_status&=
~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY);
DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS"));
@@ -333,6 +357,9 @@ bool trans_commit_stmt(THD *thd)
if (thd->transaction.stmt.ha_list)
{
+#ifdef WITH_WSREP
+ wsrep_register_hton(thd, FALSE);
+#endif /* WITH_WSREP */
res= ha_commit_trans(thd, FALSE);
if (! thd->in_active_multi_stmt_transaction())
{
@@ -378,9 +405,19 @@ bool trans_rollback_stmt(THD *thd)
if (thd->transaction.stmt.ha_list)
{
+#ifdef WITH_WSREP
+ wsrep_register_hton(thd, FALSE);
+#endif /* WITH_WSREP */
ha_rollback_trans(thd, FALSE);
if (thd->transaction_rollback_request && !thd->in_sub_stmt)
+#ifdef WITH_WSREP
+ {
+ wsrep_register_hton(thd, TRUE);
+#endif /* WITH_WSREP */
ha_rollback_trans(thd, TRUE);
+#ifdef WITH_WSREP
+ }
+#endif /* WITH_WSREP */
if (! thd->in_active_multi_stmt_transaction())
{
thd->tx_isolation= (enum_tx_isolation) thd->variables.tx_isolation;
@@ -738,6 +775,9 @@ bool trans_xa_commit(THD *thd)
}
else if (xa_state == XA_IDLE && thd->lex->xa_opt == XA_ONE_PHASE)
{
+#ifdef WITH_WSREP
+ wsrep_register_hton(thd, TRUE);
+#endif /* WITH_WSREP */
int r= ha_commit_trans(thd, TRUE);
if ((res= test(r)))
my_error(r == 1 ? ER_XA_RBROLLBACK : ER_XAER_RMERR, MYF(0));
@@ -759,6 +799,9 @@ bool trans_xa_commit(THD *thd)
if (thd->mdl_context.acquire_lock(&mdl_request,
thd->variables.lock_wait_timeout))
{
+#ifdef WITH_WSREP
+ wsrep_register_hton(thd, TRUE);
+#endif /* WITH_WSREP */
ha_rollback_trans(thd, TRUE);
my_error(ER_XAER_RMERR, MYF(0));
}