summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSeppo Jaakola <seppo.jaakola@codership.com>2013-11-06 00:29:37 +0200
committerSeppo Jaakola <seppo.jaakola@codership.com>2013-11-06 00:29:37 +0200
commit2b4183f10b54a5b3f8c848d897b3107859c23fa4 (patch)
tree5d48ff3f0a9814926ba59b7adae1d056c57b54c8
parent9129c8f1d3e1f8c9daeae559eaf6b9807b4331ec (diff)
downloadmariadb-git-2b4183f10b54a5b3f8c848d897b3107859c23fa4.tar.gz
bzr merge -r3890..3891 lp:codership-mysql/5.5
-rw-r--r--sql/CMakeLists.txt1
-rw-r--r--sql/mdl.cc1
-rw-r--r--sql/mysqld.cc76
-rw-r--r--sql/mysqld.h8
-rw-r--r--sql/sql_base.cc2
-rw-r--r--sql/sql_parse.cc691
-rw-r--r--sql/sys_vars.cc6
-rw-r--r--sql/wsrep_mysqld.cc14
-rw-r--r--sql/wsrep_mysqld.h102
-rw-r--r--sql/wsrep_notify.cc3
-rw-r--r--sql/wsrep_notify.cc.moved107
-rw-r--r--sql/wsrep_priv.h206
-rw-r--r--sql/wsrep_priv.h.moved233
-rw-r--r--sql/wsrep_sst.cc4
-rw-r--r--sql/wsrep_sst.h40
-rw-r--r--sql/wsrep_thd.cc424
-rw-r--r--sql/wsrep_thd.h31
-rw-r--r--sql/wsrep_utils.cc4
-rw-r--r--sql/wsrep_utils.h208
-rw-r--r--sql/wsrep_var.cc7
-rw-r--r--sql/wsrep_var.h81
21 files changed, 1202 insertions, 1047 deletions
diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt
index 0c17b851f5a..7766b82adff 100644
--- a/sql/CMakeLists.txt
+++ b/sql/CMakeLists.txt
@@ -50,6 +50,7 @@ IF(WITH_WSREP)
wsrep_sst.cc
wsrep_utils.cc
wsrep_var.cc
+ wsrep_thd.cc
)
SET(WSREP_LIB wsrep)
ENDIF()
diff --git a/sql/mdl.cc b/sql/mdl.cc
index 7fbbfa74537..a13aeb7904d 100644
--- a/sql/mdl.cc
+++ b/sql/mdl.cc
@@ -24,6 +24,7 @@
#include <mysql/psi/mysql_stage.h>
#ifdef WITH_WSREP
#include "wsrep_mysqld.h"
+#include "wsrep_thd.h"
extern "C" my_thread_id wsrep_thd_thread_id(THD *thd);
extern "C" char *wsrep_thd_query(THD *thd);
void sql_print_information(const char *format, ...)
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index c4f78ebd36f..1c9560ceb9f 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -73,6 +73,8 @@
#include "debug_sync.h"
#ifdef WITH_WSREP
#include "wsrep_mysqld.h"
+#include "wsrep_thd.h"
+#include "wsrep_sst.h"
ulong wsrep_running_threads = 0; // # of currently running wsrep threads
#endif
#include "sql_callback.h"
@@ -741,7 +743,7 @@ mysql_mutex_t LOCK_wsrep_slave_threads;
mysql_mutex_t LOCK_wsrep_desync;
int wsrep_replaying= 0;
static void wsrep_close_threads(THD* thd);
-#endif
+#endif /* WITH_WSREP */
/* replication parameters, if master_host is not NULL, we are a slave */
uint report_port= 0;
@@ -4467,26 +4469,26 @@ static int init_thread_environment()
rpl_init_gtid_slave_state();
#endif
-#ifdef WITH_WSREP
- mysql_mutex_init(key_LOCK_wsrep_ready,
- &LOCK_wsrep_ready, MY_MUTEX_INIT_FAST);
+#ifdef WITH_WSREP
+ mysql_mutex_init(key_LOCK_wsrep_ready,
+ &LOCK_wsrep_ready, MY_MUTEX_INIT_FAST);
mysql_cond_init(key_COND_wsrep_ready, &COND_wsrep_ready, NULL);
- mysql_mutex_init(key_LOCK_wsrep_sst,
- &LOCK_wsrep_sst, MY_MUTEX_INIT_FAST);
+ mysql_mutex_init(key_LOCK_wsrep_sst,
+ &LOCK_wsrep_sst, MY_MUTEX_INIT_FAST);
mysql_cond_init(key_COND_wsrep_sst, &COND_wsrep_sst, NULL);
- mysql_mutex_init(key_LOCK_wsrep_sst_init,
- &LOCK_wsrep_sst_init, MY_MUTEX_INIT_FAST);
+ mysql_mutex_init(key_LOCK_wsrep_sst_init,
+ &LOCK_wsrep_sst_init, MY_MUTEX_INIT_FAST);
mysql_cond_init(key_COND_wsrep_sst_init, &COND_wsrep_sst_init, NULL);
- mysql_mutex_init(key_LOCK_wsrep_rollback,
- &LOCK_wsrep_rollback, MY_MUTEX_INIT_FAST);
+ mysql_mutex_init(key_LOCK_wsrep_rollback,
+ &LOCK_wsrep_rollback, MY_MUTEX_INIT_FAST);
mysql_cond_init(key_COND_wsrep_rollback, &COND_wsrep_rollback, NULL);
- mysql_mutex_init(key_LOCK_wsrep_replaying,
- &LOCK_wsrep_replaying, MY_MUTEX_INIT_FAST);
+ mysql_mutex_init(key_LOCK_wsrep_replaying,
+ &LOCK_wsrep_replaying, MY_MUTEX_INIT_FAST);
mysql_cond_init(key_COND_wsrep_replaying, &COND_wsrep_replaying, NULL);
- mysql_mutex_init(key_LOCK_wsrep_slave_threads,
- &LOCK_wsrep_slave_threads, MY_MUTEX_INIT_FAST);
- mysql_mutex_init(key_LOCK_wsrep_desync,
- &LOCK_wsrep_desync, MY_MUTEX_INIT_FAST);
+ mysql_mutex_init(key_LOCK_wsrep_slave_threads,
+ &LOCK_wsrep_slave_threads, MY_MUTEX_INIT_FAST);
+ mysql_mutex_init(key_LOCK_wsrep_desync,
+ &LOCK_wsrep_desync, MY_MUTEX_INIT_FAST);
#endif
DBUG_RETURN(0);
@@ -5109,7 +5111,7 @@ pthread_handler_t start_wsrep_THD(void *arg)
THD *thd;
wsrep_thd_processor_fun processor= (wsrep_thd_processor_fun)arg;
- if (my_thread_init())
+ if (my_thread_init())
{
WSREP_ERROR("Could not initialize thread");
return(NULL);
@@ -5172,7 +5174,7 @@ pthread_handler_t start_wsrep_THD(void *arg)
statistic_increment(aborted_connects,&LOCK_status);
MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 0));
delete thd;
-
+
return(NULL);
}
@@ -5230,42 +5232,6 @@ pthread_handler_t start_wsrep_THD(void *arg)
return(NULL);
}
-void wsrep_create_rollbacker()
-{
- if (wsrep_provider && strcasecmp(wsrep_provider, "none"))
- {
- pthread_t hThread;
- /* create rollbacker */
- if (pthread_create( &hThread, &connection_attrib,
- start_wsrep_THD, (void*)wsrep_rollback_process))
- WSREP_WARN("Can't create thread to manage wsrep rollback");
- }
-}
-
-void wsrep_create_appliers(long threads)
-{
- if (!wsrep_connected)
- {
- /* see wsrep_replication_start() for the logic */
- if (wsrep_cluster_address && strlen(wsrep_cluster_address) &&
- wsrep_provider && strcasecmp(wsrep_provider, "none"))
- {
- WSREP_ERROR("Trying to launch slave threads before creating "
- "connection at '%s'", wsrep_cluster_address);
- assert(0);
- }
- return;
- }
-
- long wsrep_threads=0;
- pthread_t hThread;
- while (wsrep_threads++ < threads) {
- if (pthread_create(
- &hThread, &connection_attrib,
- start_wsrep_THD, (void*)wsrep_replication_process))
- WSREP_WARN("Can't create thread to manage wsrep replication");
- }
-}
/**/
static bool abort_replicated(THD *thd)
{
@@ -5290,7 +5256,7 @@ WSREP_WARN("applier has wsrep_exec_mode = %d", thd->wsrep_exec_mode);
if ( thd->slave_thread || /* declared as mysql slave */
thd->system_thread || /* declared as system thread */
- !thd->vio_ok() || /* server internal thread */
+ !thd->vio_ok() || /* server internal thread */
thd->wsrep_exec_mode==REPL_RECV || /* applier or replaying thread */
thd->wsrep_applier || /* wsrep slave applier */
!thd->variables.wsrep_on) /* client, but fenced outside wsrep */
diff --git a/sql/mysqld.h b/sql/mysqld.h
index 522a7820533..c9cfb8d1094 100644
--- a/sql/mysqld.h
+++ b/sql/mysqld.h
@@ -240,7 +240,7 @@ extern PSI_mutex_key key_PAGE_lock, key_LOCK_sync, key_LOCK_active,
#ifdef WITH_WSREP
extern PSI_mutex_key key_LOCK_wsrep_thd;
extern PSI_cond_key key_COND_wsrep_thd;
-#endif /* HAVE_MMAP */
+#endif /* HAVE_WSREP */
#ifdef HAVE_OPENSSL
extern PSI_mutex_key key_LOCK_des_key_file;
@@ -580,8 +580,8 @@ enum options_mysqld
OPT_WSREP_START_POSITION,
OPT_WSREP_SST_AUTH,
OPT_WSREP_RECOVER,
-#endif
OPT_which_is_always_the_last
+#endif /* WITH_WSREP */
};
#endif
@@ -724,5 +724,9 @@ extern uint internal_tmp_table_max_key_segments;
extern uint volatile global_disable_checkpoint;
extern my_bool opt_help;
+#ifdef WITH_WSREP
+#include "my_pthread.h"
+pthread_handler_t start_wsrep_THD(void*);
+#endif /* WITH_WSREP */
#endif /* MYSQLD_INCLUDED */
diff --git a/sql/sql_base.cc b/sql/sql_base.cc
index 325c79043a3..31a51fefc00 100644
--- a/sql/sql_base.cc
+++ b/sql/sql_base.cc
@@ -63,7 +63,7 @@
#ifdef WITH_WSREP
#include "wsrep_mysqld.h"
-
+#include "wsrep_thd.h"
#endif // WITH_WSREP
bool
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index 750613326d2..8f208ee66b1 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -105,7 +105,7 @@
#ifdef WITH_WSREP
#include "wsrep_mysqld.h"
-static void wsrep_client_rollback(THD *thd);
+#include "wsrep_thd.h"
static void wsrep_mysql_parse(THD *thd, char *rawbuf, uint length,
Parser_state *parser_state);
#endif /* WITH_WSREP */
@@ -6571,90 +6571,6 @@ void mysql_init_multi_delete(LEX *lex)
}
#ifdef WITH_WSREP
-static void wsrep_prepare_bf_thd(THD *thd, struct wsrep_thd_shadow*);
-static void wsrep_return_from_bf_mode(THD *thd, struct wsrep_thd_shadow*);
-void wsrep_replay_transaction(THD *thd)
-{
- /* checking if BF trx must be replayed */
- if (thd->wsrep_conflict_state== MUST_REPLAY)
- {
- if (thd->wsrep_exec_mode!= REPL_RECV)
- {
- if (thd->get_stmt_da()->is_sent())
- {
- WSREP_ERROR("replay issue, thd has reported status already");
- }
- thd->get_stmt_da()->reset_diagnostics_area();
-
- thd->wsrep_conflict_state= REPLAYING;
- mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
-
- mysql_reset_thd_for_next_command(thd, opt_userstat_running);
- thd->killed= NOT_KILLED;
- close_thread_tables(thd);
- if (thd->locked_tables_mode && thd->lock)
- {
- WSREP_DEBUG("releasing table lock for replaying (%ld)",
- thd->thread_id);
- thd->locked_tables_list.unlock_locked_tables(thd);
- thd->variables.option_bits&= ~(OPTION_TABLE_LOCK);
- }
- thd->mdl_context.release_transactional_locks();
-
- thd_proc_info(thd, "wsrep replaying trx");
- WSREP_DEBUG("replay trx: %s %lld",
- thd->query() ? thd->query() : "void",
- (long long)wsrep_thd_trx_seqno(thd));
- struct wsrep_thd_shadow shadow;
- wsrep_prepare_bf_thd(thd, &shadow);
- int rcode = wsrep->replay_trx(wsrep,
- &thd->wsrep_ws_handle,
- (void *)thd);
-
- wsrep_return_from_bf_mode(thd, &shadow);
- if (thd->wsrep_conflict_state!= REPLAYING)
- WSREP_WARN("lost replaying mode: %d", thd->wsrep_conflict_state );
-
- mysql_mutex_lock(&thd->LOCK_wsrep_thd);
-
- switch (rcode)
- {
- case WSREP_OK:
- thd->wsrep_conflict_state= NO_CONFLICT;
- wsrep->post_commit(wsrep, &thd->wsrep_ws_handle);
- WSREP_DEBUG("trx_replay successful for: %ld %llu",
- thd->thread_id, (long long)thd->real_id);
- break;
- case WSREP_TRX_FAIL:
- if (thd->stmt_da->is_sent)
- {
- WSREP_ERROR("replay failed, thd has reported status");
- }
- else
- {
- WSREP_DEBUG("replay failed, rolling back");
- my_error(ER_LOCK_DEADLOCK, MYF(0), "wsrep aborted transaction");
- }
- thd->wsrep_conflict_state= ABORTED;
- wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle);
- break;
- default:
- WSREP_ERROR("trx_replay failed for: %d, query: %s",
- rcode, thd->query() ? thd->query() : "void");
- /* we're now in inconsistent state, must abort */
- unireg_abort(1);
- break;
- }
- mysql_mutex_lock(&LOCK_wsrep_replaying);
- wsrep_replaying--;
- WSREP_DEBUG("replaying decreased: %d, thd: %lu",
- wsrep_replaying, thd->thread_id);
- mysql_cond_broadcast(&COND_wsrep_replaying);
- mysql_mutex_unlock(&LOCK_wsrep_replaying);
- }
- }
-}
-
static void wsrep_mysql_parse(THD *thd, char *rawbuf, uint length,
Parser_state *parser_state)
{
@@ -8597,611 +8513,6 @@ LEX_USER *create_definer(THD *thd, LEX_STRING *user_name, LEX_STRING *host_name)
return definer;
}
-#ifdef WITH_WSREP
-/* must have (&thd->LOCK_wsrep_thd) */
-static void wsrep_client_rollback(THD *thd)
-{
- WSREP_DEBUG("client rollback due to BF abort for (%ld), query: %s",
- thd->thread_id, thd->query());
-
- thd->wsrep_conflict_state= ABORTING;
- mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
- trans_rollback(thd);
-
- if (thd->locked_tables_mode && thd->lock)
- {
- WSREP_DEBUG("unlocking tables for BF abort (%ld)", thd->thread_id);
- thd->locked_tables_list.unlock_locked_tables(thd);
- thd->variables.option_bits&= ~(OPTION_TABLE_LOCK);
- }
-
- if (thd->global_read_lock.is_acquired())
- {
- WSREP_DEBUG("unlocking GRL for BF abort (%ld)", thd->thread_id);
- thd->global_read_lock.unlock_global_read_lock(thd);
- }
-
- /* Release transactional metadata locks. */
- thd->mdl_context.release_transactional_locks();
-
- /* release explicit MDL locks */
- thd->mdl_context.release_explicit_locks();
-
- if (thd->get_binlog_table_maps())
- {
- WSREP_DEBUG("clearing binlog table map for BF abort (%ld)", thd->thread_id);
- thd->clear_binlog_table_maps();
- }
- mysql_mutex_lock(&thd->LOCK_wsrep_thd);
- thd->wsrep_conflict_state= ABORTED;
- thd->wsrep_bf_thd = NULL;
-}
-
-static enum wsrep_status wsrep_apply_sql(
- THD *thd, const char *sql, size_t sql_len, time_t timeval, uint32 randseed)
-{
- int error;
- enum wsrep_status ret_code= WSREP_OK;
-
- DBUG_ENTER("wsrep_bf_execute_cb");
- thd->wsrep_exec_mode= REPL_RECV;
- thd->net.vio= 0;
- thd->start_time= timeval;
- thd->wsrep_rand= randseed;
-
- thd->variables.option_bits |= OPTION_NOT_AUTOCOMMIT;
-
- DBUG_PRINT("wsrep", ("SQL: %s", sql));
-
- mysql_mutex_lock(&thd->LOCK_wsrep_thd);
- thd->wsrep_query_state= QUERY_EXEC;
- /* preserve replaying mode */
- if (thd->wsrep_conflict_state!= REPLAYING)
- thd->wsrep_conflict_state= NO_CONFLICT;
- mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
-
- if ((error= dispatch_command(COM_QUERY, thd, (char*)sql, sql_len))) {
- WSREP_WARN("BF SQL apply failed: %d, %lld",
- thd->wsrep_conflict_state, (long long)thd->wsrep_trx_seqno);
- DBUG_RETURN(WSREP_FATAL);
- }
-
- mysql_mutex_lock(&thd->LOCK_wsrep_thd);
- if (thd->wsrep_conflict_state!= NO_CONFLICT &&
- thd->wsrep_conflict_state!= REPLAYING) {
- ret_code= WSREP_FATAL;
- WSREP_DEBUG("BF thd ending, with: %d, %lld",
- thd->wsrep_conflict_state, (long long)thd->wsrep_trx_seqno);
- }
- mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
-
- assert(thd->wsrep_exec_mode== REPL_RECV);
- DBUG_RETURN(ret_code);
-}
-
-void wsrep_write_rbr_buf(
- THD *thd, const void* rbr_buf, size_t buf_len)
-{
- char filename[PATH_MAX]= {0};
- int len= snprintf(filename, PATH_MAX, "%s/GRA_%ld_%lld.log",
- wsrep_data_home_dir, thd->thread_id,
- (long long)thd->wsrep_trx_seqno);
- if (len >= PATH_MAX)
- {
- WSREP_ERROR("RBR dump path too long: %d, skipping dump.", len);
- return;
- }
-
- FILE *of= fopen(filename, "wb");
- if (of)
- {
- fwrite (rbr_buf, buf_len, 1, of);
- fclose(of);
- }
- else
- {
- WSREP_ERROR("Failed to open file '%s': %d (%s)",
- filename, errno, strerror(errno));
- }
-}
-
-static inline wsrep_status_t wsrep_apply_rbr(
- THD *thd, const uchar *rbr_buf, size_t buf_len)
-{
- char *buf= (char *)rbr_buf;
- int rcode= 0;
- int event= 1;
- Format_description_log_event *description_event = wsrep_format_desc;
- DBUG_ENTER("wsrep_apply_rbr");
-
- if (thd->killed == KILL_CONNECTION)
- {
- WSREP_INFO("applier has been aborted, skipping apply_rbr: %lld",
- (long long) thd->wsrep_trx_seqno);
- DBUG_RETURN(WSREP_FATAL);
- }
-
- mysql_mutex_lock(&thd->LOCK_wsrep_thd);
- thd->wsrep_query_state= QUERY_EXEC;
- if (thd->wsrep_conflict_state!= REPLAYING)
- thd->wsrep_conflict_state= NO_CONFLICT;
- mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
-
- if (!buf_len) WSREP_DEBUG("empty rbr buffer to apply: %lld",
- (long long) thd->wsrep_trx_seqno);
-
- if ((rcode= trans_begin(thd)))
- WSREP_WARN("begin for rbr apply failed: %lld, code: %d",
- (long long) thd->wsrep_trx_seqno, rcode);
-
- while(buf_len)
- {
- int exec_res;
- int error = 0;
- Log_event* ev= wsrep_read_log_event(&buf, &buf_len, description_event);
-
- if (!ev)
- {
- WSREP_ERROR("applier could not read binlog event, seqno: %lld, len: %ld",
- (long long)thd->wsrep_trx_seqno, buf_len);
- rcode= 1;
- goto error;
- }
- switch (ev->get_type_code()) {
- case WRITE_ROWS_EVENT:
- case UPDATE_ROWS_EVENT:
- case DELETE_ROWS_EVENT:
- DBUG_ASSERT(buf_len != 0 ||
- ((Rows_log_event *) ev)->get_flags(Rows_log_event::STMT_END_F));
- break;
- case FORMAT_DESCRIPTION_EVENT:
- description_event = (Format_description_log_event *)ev;
- break;
- default:
- break;
- }
-
- thd->variables.server_id = ev->server_id; // use the original server id for logging
- thd->set_time(); // time the query
- wsrep_xid_init(&thd->transaction.xid_state.xid,
- wsrep_cluster_uuid(),
- thd->wsrep_trx_seqno);
- thd->lex->current_select= 0;
- if (!ev->when)
- ev->when = time(NULL);
- ev->thd = thd;
- exec_res = ev->apply_event(thd->wsrep_rli);
- DBUG_PRINT("info", ("exec_event result: %d", exec_res));
-
- if (exec_res)
- {
- WSREP_WARN("RBR event %d %s apply warning: %d, %lld",
- event, ev->get_type_str(), exec_res, (long long) thd->wsrep_trx_seqno);
- rcode= exec_res;
- /* stop processing for the first error */
- delete ev;
- goto error;
- }
- event++;
-
- if (thd->wsrep_conflict_state!= NO_CONFLICT &&
- thd->wsrep_conflict_state!= REPLAYING)
- WSREP_WARN("conflict state after RBR event applying: %d, %lld",
- thd->wsrep_query_state, (long long)thd->wsrep_trx_seqno);
-
- if (thd->wsrep_conflict_state == MUST_ABORT) {
- WSREP_WARN("RBR event apply failed, rolling back: %lld",
- (long long) thd->wsrep_trx_seqno);
- trans_rollback(thd);
- thd->locked_tables_list.unlock_locked_tables(thd);
- /* Release transactional metadata locks. */
- thd->mdl_context.release_transactional_locks();
- thd->wsrep_conflict_state= NO_CONFLICT;
- DBUG_RETURN(WSREP_FATAL);
- }
-
- if ((ev->get_type_code() == WRITE_ROWS_EVENT ||
- ev->get_type_code() == UPDATE_ROWS_EVENT ||
- ev->get_type_code() == DELETE_ROWS_EVENT) &&
- ((Rows_log_event *) ev)->get_flags(Rows_log_event::STMT_END_F))
- {
- thd->wsrep_rli->cleanup_context(thd, 0);
-
- if (error == 0)
- {
- thd->clear_error();
- }
- else
- WSREP_ERROR("Error in %s event: commit of row events failed: %lld",
- ev->get_type_str(), (long long)thd->wsrep_trx_seqno);
- }
-
- if (description_event != ev)
- delete ev;
- }
-
- error:
- mysql_mutex_lock(&thd->LOCK_wsrep_thd);
- thd->wsrep_query_state= QUERY_IDLE;
- mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
-
- assert(thd->wsrep_exec_mode== REPL_RECV);
-
- if (thd->killed == KILL_CONNECTION)
- WSREP_INFO("applier aborted: %lld", (long long)thd->wsrep_trx_seqno);
-
- if (rcode) DBUG_RETURN(WSREP_FATAL);
- DBUG_RETURN(WSREP_OK);
-}
-
-wsrep_status_t wsrep_apply_cb(void* const ctx,
- const void* const buf, size_t const buf_len,
- wsrep_seqno_t const global_seqno)
-{
- THD* const thd((THD*)ctx);
-
- thd->wsrep_trx_seqno= global_seqno;
-
-#ifdef WSREP_PROC_INFO
- snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
- "applying write set %lld: %p, %zu",
- (long long)thd->wsrep_trx_seqno, buf, buf_len);
- thd_proc_info(thd, thd->wsrep_info);
-#else
- thd_proc_info(thd, "applying write set");
-#endif /* WSREP_PROC_INFO */
-
- wsrep_status_t const rcode(wsrep_apply_rbr(thd, (const uchar*)buf, buf_len));
-
-#ifdef WSREP_PROC_INFO
- snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
- "applied write set %lld", (long long)thd->wsrep_trx_seqno);
- thd_proc_info(thd, thd->wsrep_info);
-#else
- thd_proc_info(thd, "applied write set");
-#endif /* WSREP_PROC_INFO */
-
- if (WSREP_OK != rcode) wsrep_write_rbr_buf(thd, buf, buf_len);
- TABLE *tmp;
- while ((tmp = thd->temporary_tables))
- {
- WSREP_DEBUG("Applier %lu, has temporary tables: %s.%s",
- thd->thread_id,
- (tmp->s) ? tmp->s->db.str : "void",
- (tmp->s) ? tmp->s->table_name.str : "void");
- close_temporary_table(thd, tmp, 1, 1);
- }
-
- return rcode;
-}
-
-#if DELETE // this does not work in 5.5
-/* a common wrapper for end_trans() function - to put all necessary stuff */
-static inline wsrep_status_t
-wsrep_end_trans (THD* const thd, enum enum_mysql_completiontype const end)
-{
- if (0 == end_trans(thd, end))
- {
- return WSREP_OK;
- }
- else
- {
- return WSREP_FATAL;
- }
-}
-#endif
-
-wsrep_status_t wsrep_commit(THD* const thd, wsrep_seqno_t const global_seqno)
-{
-#ifdef WSREP_PROC_INFO
- snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
- "committing %lld", (long long)thd->wsrep_trx_seqno);
- thd_proc_info(thd, thd->wsrep_info);
-#else
- thd_proc_info(thd, "committing");
-#endif /* WSREP_PROC_INFO */
-
- wsrep_status_t const rcode(wsrep_apply_sql(thd, "COMMIT", 6, 0, 0));
-// wsrep_status_t const rcode(wsrep_end_trans (thd, COMMIT));
-
-#ifdef WSREP_PROC_INFO
- snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
- "committed %lld", (long long)thd->wsrep_trx_seqno);
- thd_proc_info(thd, thd->wsrep_info);
-#else
- thd_proc_info(thd, "committed");
-#endif /* WSREP_PROC_INFO */
-
- if (WSREP_OK == rcode)
- {
- // TODO: mark snapshot with global_seqno.
- }
-
- return rcode;
-}
-
-wsrep_status_t wsrep_rollback(THD* const thd, wsrep_seqno_t const global_seqno)
-{
-#ifdef WSREP_PROC_INFO
- snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
- "rolling back %lld", (long long)thd->wsrep_trx_seqno);
- thd_proc_info(thd, thd->wsrep_info);
-#else
- thd_proc_info(thd, "rolling back");
-#endif /* WSREP_PROC_INFO */
-
- wsrep_status_t const rcode(wsrep_apply_sql(thd, "ROLLBACK", 8, 0, 0));
-// wsrep_status_t const rcode(wsrep_end_trans (thd, ROLLBACK));
-
-#ifdef WSREP_PROC_INFO
- snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
- "rolled back %lld", (long long)thd->wsrep_trx_seqno);
- thd_proc_info(thd, thd->wsrep_info);
-#else
- thd_proc_info(thd, "rolled back");
-#endif /* WSREP_PROC_INFO */
-
- return rcode;
-}
-
-wsrep_status_t wsrep_commit_cb(void* const ctx,
- wsrep_seqno_t const global_seqno,
- bool const commit)
-{
- THD* const thd((THD*)ctx);
-
- assert(global_seqno == thd->wsrep_trx_seqno);
-
- if (commit)
- return wsrep_commit(thd, global_seqno);
- else
- return wsrep_rollback(thd, global_seqno);
-}
-
-Relay_log_info* wsrep_relay_log_init(const char* log_fname)
-{
- Relay_log_info* rli= new Relay_log_info(false);
- LEX_STRING conn = {"wsrep",5};
-
- /*
- * problem is that mariaDB requires master info for rli, and wsrep replication
- * really should not have it. Allocating empty mi here just for the sake of
- * getting rpl_filter pointer initialized for mi, rpl_filter will be needed in
- * several places
- */
- rli->mi= new Master_info(&conn, false);
-
- rli->no_storage= true;
- if (!rli->relay_log.description_event_for_exec)
- {
- rli->relay_log.description_event_for_exec=
- new Format_description_log_event(4);
- }
-
- rli->sql_thd= current_thd;
- return rli;
-}
-
-static void wsrep_prepare_bf_thd(THD *thd, struct wsrep_thd_shadow* shadow)
-{
- shadow->options = thd->variables.option_bits;
- shadow->wsrep_exec_mode = thd->wsrep_exec_mode;
- shadow->vio = thd->net.vio;
-
- if (opt_log_slave_updates)
- thd->variables.option_bits|= OPTION_BIN_LOG;
- else
- thd->variables.option_bits&= ~(OPTION_BIN_LOG);
-
- if (!thd->wsrep_rli) thd->wsrep_rli= wsrep_relay_log_init("wsrep_relay");
-
- thd->wsrep_exec_mode= REPL_RECV;
- thd->net.vio= 0;
- thd->clear_error();
-
- thd->variables.option_bits|= OPTION_NOT_AUTOCOMMIT;
-
- shadow->tx_isolation = thd->variables.tx_isolation;
- thd->variables.tx_isolation = ISO_READ_COMMITTED;
- thd->tx_isolation = ISO_READ_COMMITTED;
-}
-
-static void wsrep_return_from_bf_mode(THD *thd, struct wsrep_thd_shadow* shadow)
-{
- thd->variables.option_bits = shadow->options;
- thd->wsrep_exec_mode = shadow->wsrep_exec_mode;
- thd->net.vio = shadow->vio;
- thd->variables.tx_isolation = shadow->tx_isolation;
-}
-
-void wsrep_replication_process(THD *thd)
-{
- int rcode;
- DBUG_ENTER("wsrep_replication_process");
-
- struct wsrep_thd_shadow shadow;
- wsrep_prepare_bf_thd(thd, &shadow);
-
- rcode = wsrep->recv(wsrep, (void *)thd);
- DBUG_PRINT("wsrep",("wsrep_repl returned: %d", rcode));
-
- WSREP_INFO("applier thread exiting (code:%d)", rcode);
-
- switch (rcode) {
- case WSREP_OK:
- case WSREP_NOT_IMPLEMENTED:
- case WSREP_CONN_FAIL:
- /* provider does not support slave operations / disconnected from group,
- * just close applier thread */
- break;
- case WSREP_NODE_FAIL:
- /* data inconsistency => SST is needed */
- /* Note: we cannot just blindly restart replication here,
- * SST might require server restart if storage engines must be
- * initialized after SST */
- WSREP_ERROR("node consistency compromised, aborting");
- wsrep_kill_mysql(thd);
- break;
- case WSREP_WARNING:
- case WSREP_TRX_FAIL:
- case WSREP_TRX_MISSING:
- /* these suggests a bug in provider code */
- WSREP_WARN("bad return from recv() call: %d", rcode);
- /* fall through to node shutdown */
- case WSREP_FATAL:
- /* Cluster connectivity is lost.
- *
- * If applier was killed on purpose (KILL_CONNECTION), we
- * avoid mysql shutdown. This is because the killer will then handle
- * shutdown processing (or replication restarting)
- */
- if (thd->killed != KILL_CONNECTION)
- {
- wsrep_kill_mysql(thd);
- }
- break;
- }
-
- mysql_mutex_lock(&LOCK_thread_count);
- wsrep_close_applier(thd);
- mysql_cond_broadcast(&COND_thread_count);
- mysql_mutex_unlock(&LOCK_thread_count);
-
- if (thd->temporary_tables)
- {
- WSREP_DEBUG("Applier %lu, has temporary tables at exit", thd->thread_id);
- }
- wsrep_return_from_bf_mode(thd, &shadow);
- DBUG_VOID_RETURN;
-}
-
-void wsrep_rollback_process(THD *thd)
-{
- DBUG_ENTER("wsrep_rollback_process");
-
- mysql_mutex_lock(&LOCK_wsrep_rollback);
- wsrep_aborting_thd= NULL;
-
- while (thd->killed == NOT_KILLED) {
- thd_proc_info(thd, "wsrep aborter idle");
- thd->mysys_var->current_mutex= &LOCK_wsrep_rollback;
- thd->mysys_var->current_cond= &COND_wsrep_rollback;
-
- mysql_cond_wait(&COND_wsrep_rollback,&LOCK_wsrep_rollback);
-
- WSREP_DEBUG("WSREP rollback thread wakes for signal");
-
- mysql_mutex_lock(&thd->mysys_var->mutex);
- thd_proc_info(thd, "wsrep aborter active");
- thd->mysys_var->current_mutex= 0;
- thd->mysys_var->current_cond= 0;
- mysql_mutex_unlock(&thd->mysys_var->mutex);
-
- /* check for false alarms */
- if (!wsrep_aborting_thd)
- {
- WSREP_DEBUG("WSREP rollback thread has empty abort queue");
- }
- /* process all entries in the queue */
- while (wsrep_aborting_thd) {
- THD *aborting;
- wsrep_aborting_thd_t next = wsrep_aborting_thd->next;
- aborting = wsrep_aborting_thd->aborting_thd;
- my_free(wsrep_aborting_thd);
- wsrep_aborting_thd= next;
- /*
- * must release mutex, appliers my want to add more
- * aborting thds in our work queue, while we rollback
- */
- mysql_mutex_unlock(&LOCK_wsrep_rollback);
-
- mysql_mutex_lock(&aborting->LOCK_wsrep_thd);
- if (aborting->wsrep_conflict_state== ABORTED)
- {
- WSREP_DEBUG("WSREP, thd already aborted: %llu state: %d",
- (long long)aborting->real_id,
- aborting->wsrep_conflict_state);
-
- mysql_mutex_unlock(&aborting->LOCK_wsrep_thd);
- mysql_mutex_lock(&LOCK_wsrep_rollback);
- continue;
- }
- aborting->wsrep_conflict_state= ABORTING;
-
- mysql_mutex_unlock(&aborting->LOCK_wsrep_thd);
-
- aborting->store_globals();
-
- mysql_mutex_lock(&aborting->LOCK_wsrep_thd);
- wsrep_client_rollback(aborting);
- WSREP_DEBUG("WSREP rollbacker aborted thd: (%lu %llu)",
- aborting->thread_id, (long long)aborting->real_id);
- mysql_mutex_unlock(&aborting->LOCK_wsrep_thd);
-
- mysql_mutex_lock(&LOCK_wsrep_rollback);
- }
- }
-
- mysql_mutex_unlock(&LOCK_wsrep_rollback);
- sql_print_information("WSREP: rollbacker thread exiting");
-
- DBUG_PRINT("wsrep",("wsrep rollbacker thread exiting"));
- DBUG_VOID_RETURN;
-}
-
-extern
-int wsrep_thd_is_brute_force(void *thd_ptr)
-{
- if (thd_ptr) {
- switch (((THD *)thd_ptr)->wsrep_exec_mode) {
- case LOCAL_STATE:
- {
- if (((THD *)thd_ptr)->wsrep_conflict_state== REPLAYING)
- {
- return 1;
- }
- return 0;
- }
- case REPL_RECV: return 1;
- case TOTAL_ORDER: return 2;
- case LOCAL_COMMIT: return 3;
- }
- }
- return 0;
-}
-extern "C"
-int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr, my_bool signal)
-{
- THD *victim_thd = (THD *) victim_thd_ptr;
- THD *bf_thd = (THD *) bf_thd_ptr;
- DBUG_ENTER("wsrep_abort_thd");
-
- if ( (WSREP(bf_thd) ||
- ( (WSREP_ON || wsrep_OSU_method_options == WSREP_OSU_RSU) &&
- bf_thd->wsrep_exec_mode == TOTAL_ORDER) ) &&
- victim_thd)
- {
- WSREP_DEBUG("wsrep_abort_thd, by: %llu, victim: %llu", (bf_thd) ?
- (long long)bf_thd->real_id : 0, (long long)victim_thd->real_id);
- ha_wsrep_abort_transaction(bf_thd, victim_thd, signal);
- }
- else
- {
- WSREP_DEBUG("wsrep_abort_thd not effective: %p %p", bf_thd, victim_thd);
- }
-
- DBUG_RETURN(1);
-}
-extern "C"
-int wsrep_thd_in_locking_session(void *thd_ptr)
-{
- if (thd_ptr && ((THD *)thd_ptr)->in_lock_tables) {
- return 1;
- }
- return 0;
-}
-#endif
-
/**
Retuns information about user or current user.
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
index 00be8bfbc21..e1b0dc50c65 100644
--- a/sql/sys_vars.cc
+++ b/sql/sys_vars.cc
@@ -4063,7 +4063,8 @@ static Sys_var_tz Sys_time_zone(
DEFAULT(&default_tz), NO_MUTEX_GUARD, IN_BINLOG);
#ifdef WITH_WSREP
-#include "wsrep_mysqld.h"
+#include "wsrep_var.h"
+#include "wsrep_sst.h"
static Sys_var_charptr Sys_wsrep_provider(
"wsrep_provider", "Path to replication provider library",
@@ -4244,8 +4245,7 @@ static Sys_var_mybool Sys_wsrep_certify_nonPK(
static Sys_var_mybool Sys_wsrep_causal_reads(
"wsrep_causal_reads", "Enable \"strictly synchronous\" semantics for read operations",
SESSION_VAR(wsrep_causal_reads),
- CMD_LINE(OPT_ARG), DEFAULT(FALSE));
- // ON_UPDATE(wsrep_causal_reads_update));
+ CMD_LINE(OPT_ARG), DEFAULT(FALSE));
static const char *wsrep_OSU_method_names[]= { "TOI", "RSU", NullS };
static Sys_var_enum Sys_wsrep_OSU_method(
diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc
index 8a04c5cfd79..af8b025cfd7 100644
--- a/sql/wsrep_mysqld.cc
+++ b/sql/wsrep_mysqld.cc
@@ -17,6 +17,10 @@
#include <sql_class.h>
#include <sql_parse.h>
#include "wsrep_priv.h"
+#include "wsrep_thd.h"
+#include "wsrep_sst.h"
+#include "wsrep_utils.h"
+#include "wsrep_var.h"
#include <cstdio>
#include <cstdlib>
#include "log_event.h"
@@ -88,7 +92,6 @@ const char* wsrep_provider_version = provider_version;
const char* wsrep_provider_vendor = provider_vendor;
/* End wsrep status variables */
-
wsrep_uuid_t local_uuid = WSREP_UUID_UNDEFINED;
wsrep_seqno_t local_seqno = WSREP_SEQNO_UNDEFINED;
wsp::node_status local_status;
@@ -99,14 +102,7 @@ long wsrep_protocol_version = 2;
// if there was no state gap on receiving first view event.
static my_bool wsrep_startup = TRUE;
-// action execute callback
-extern wsrep_status_t wsrep_apply_cb(void *ctx,
- const void* buf, size_t buf_len,
- wsrep_seqno_t global_seqno);
-
-extern wsrep_status_t wsrep_commit_cb (void *ctx,
- wsrep_seqno_t global_seqno,
- bool commit);
+/* wsrep callbacks */
static void wsrep_log_cb(wsrep_log_level_t level, const char *msg) {
switch (level) {
diff --git a/sql/wsrep_mysqld.h b/sql/wsrep_mysqld.h
index 0df368c562d..67bc9d0ea15 100644
--- a/sql/wsrep_mysqld.h
+++ b/sql/wsrep_mysqld.h
@@ -79,11 +79,6 @@ extern ulong wsrep_retry_autocommit;
extern my_bool wsrep_auto_increment_control;
extern my_bool wsrep_drupal_282555_workaround;
extern my_bool wsrep_incremental_data_collection;
-extern const char* wsrep_sst_method;
-extern const char* wsrep_sst_receive_address;
-extern char* wsrep_sst_auth;
-extern const char* wsrep_sst_donor;
-extern my_bool wsrep_sst_donor_rejects_queries;
extern const char* wsrep_start_position;
extern long long wsrep_max_ws_size;
extern long wsrep_max_ws_rows;
@@ -117,71 +112,21 @@ extern const char* wsrep_provider_vendor;
extern int wsrep_show_status(THD *thd, SHOW_VAR *var, char *buff);
extern void wsrep_free_status(THD *thd);
-#define WSREP_SST_ADDRESS_AUTO "AUTO"
-#define WSREP_NODE_INCOMING_AUTO "AUTO"
-// MySQL variables funcs
+extern int wsrep_init_vars();
+extern void wsrep_provider_init (const char* provider);
+extern void wsrep_start_position_init (const char* position);
+extern void wsrep_sst_auth_init (const char* auth);
-#define CHECK_ARGS (sys_var *self, THD* thd, set_var *var)
-#define UPDATE_ARGS (sys_var *self, THD* thd, enum_var_type type)
-#define DEFAULT_ARGS (THD* thd, enum_var_type var_type)
-#define INIT_ARGS (const char* opt)
-
-extern int wsrep_init_vars();
-
-extern bool wsrep_on_update UPDATE_ARGS;
-extern void wsrep_causal_reads_update UPDATE_ARGS;
-extern bool wsrep_start_position_check CHECK_ARGS;
-extern bool wsrep_start_position_update UPDATE_ARGS;
-extern void wsrep_start_position_init INIT_ARGS;
-
-extern bool wsrep_provider_check CHECK_ARGS;
-extern bool wsrep_provider_update UPDATE_ARGS;
-extern void wsrep_provider_init INIT_ARGS;
-
-extern bool wsrep_provider_options_check CHECK_ARGS;
-extern bool wsrep_provider_options_update UPDATE_ARGS;
-extern void wsrep_provider_options_init INIT_ARGS;
-
-extern bool wsrep_cluster_address_check CHECK_ARGS;
-extern bool wsrep_cluster_address_update UPDATE_ARGS;
-extern void wsrep_cluster_address_init INIT_ARGS;
-
-extern bool wsrep_cluster_name_check CHECK_ARGS;
-extern bool wsrep_cluster_name_update UPDATE_ARGS;
-
-extern bool wsrep_node_name_check CHECK_ARGS;
-extern bool wsrep_node_name_update UPDATE_ARGS;
-
-extern bool wsrep_node_address_check CHECK_ARGS;
-extern bool wsrep_node_address_update UPDATE_ARGS;
-extern void wsrep_node_address_init INIT_ARGS;
-
-extern bool wsrep_sst_method_check CHECK_ARGS;
-extern bool wsrep_sst_method_update UPDATE_ARGS;
-extern void wsrep_sst_method_init INIT_ARGS;
-
-extern bool wsrep_sst_receive_address_check CHECK_ARGS;
-extern bool wsrep_sst_receive_address_update UPDATE_ARGS;
-
-extern bool wsrep_sst_auth_check CHECK_ARGS;
-extern bool wsrep_sst_auth_update UPDATE_ARGS;
-extern void wsrep_sst_auth_init INIT_ARGS;
-
-extern bool wsrep_sst_donor_check CHECK_ARGS;
-extern bool wsrep_sst_donor_update UPDATE_ARGS;
-
-extern bool wsrep_slave_threads_check CHECK_ARGS;
-extern bool wsrep_slave_threads_update UPDATE_ARGS;
-
-extern bool wsrep_desync_check CHECK_ARGS;
-extern bool wsrep_desync_update UPDATE_ARGS;
-
-extern bool wsrep_before_SE(); // initialize wsrep before storage
- // engines (true) or after (false)
extern int wsrep_init();
extern void wsrep_deinit();
extern void wsrep_recover();
+extern bool wsrep_before_SE(); // initialize wsrep before storage
+ // engines (true) or after (false)
+/* wsrep initialization sequence at startup
+ * @param before wsrep_before_SE() value */
+extern void wsrep_init_startup(bool before);
+
@@ -215,17 +160,11 @@ extern "C" void wsrep_thd_awake(THD* bf_thd, THD *thd, my_bool signal);
-/* wsrep initialization sequence at startup
- * @param first wsrep_before_SE() value */
-extern void wsrep_init_startup(bool before);
-
extern void wsrep_close_client_connections(my_bool wait_to_end);
extern int wsrep_wait_committing_connections_close(int wait_time);
extern void wsrep_close_applier(THD *thd);
extern void wsrep_wait_appliers_close(THD *thd);
extern void wsrep_close_applier_threads(int count);
-extern void wsrep_create_appliers(long threads = wsrep_slave_threads);
-extern void wsrep_create_rollbacker();
extern void wsrep_kill_mysql(THD *thd);
/* new defines */
@@ -286,18 +225,6 @@ extern wsrep_seqno_t wsrep_locked_seqno;
if (victim_thd) WSREP_LOG_CONFLICT_THD(victim_thd, "Victim thread"); \
}
-/*! Synchronizes applier thread start with init thread */
-extern void wsrep_sst_grab();
-/*! Init thread waits for SST completion */
-extern bool wsrep_sst_wait();
-/*! Signals wsrep that initialization is complete, writesets can be applied */
-extern void wsrep_sst_continue();
-
-extern void wsrep_SE_init_grab(); /*! grab init critical section */
-extern void wsrep_SE_init_wait(); /*! wait for SE init to complete */
-extern void wsrep_SE_init_done(); /*! signal that SE init is complte */
-extern void wsrep_SE_initialized(); /*! mark SE initialization complete */
-
extern void wsrep_ready_wait();
enum wsrep_trx_status {
@@ -311,17 +238,10 @@ wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all);
class Ha_trx_info;
struct THD_TRANS;
void wsrep_register_hton(THD* thd, bool all);
-
-void wsrep_replication_process(THD *thd);
-void wsrep_rollback_process(THD *thd);
void wsrep_brute_force_killer(THD *thd);
int wsrep_hire_brute_force_killer(THD *thd, uint64_t trx_id);
+
extern "C" bool wsrep_consistency_check(void *thd_ptr);
-//extern "C" int wsrep_thd_is_brute_force(void *thd_ptr);
-extern int wsrep_thd_is_brute_force(void *thd_ptr);
-extern "C" int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr,
- my_bool signal);
-extern "C" int wsrep_thd_in_locking_session(void *thd_ptr);
/* this is visible for client build so that innodb plugin gets this */
typedef struct wsrep_aborting_thd {
diff --git a/sql/wsrep_notify.cc b/sql/wsrep_notify.cc
index ff997d01183..291cdbb7c75 100644
--- a/sql/wsrep_notify.cc
+++ b/sql/wsrep_notify.cc
@@ -15,6 +15,7 @@
#include <mysqld.h>
#include "wsrep_priv.h"
+#include "wsrep_utils.h"
const char* wsrep_notify_cmd="";
@@ -64,7 +65,7 @@ void wsrep_notify_status (wsrep_member_status_t status,
{
char uuid_str[40];
- wsrep_uuid_print (&view->uuid, uuid_str, sizeof(uuid_str));
+ wsrep_uuid_print (&view->state_id.uuid, uuid_str, sizeof(uuid_str));
cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off,
" --uuid %s", uuid_str);
diff --git a/sql/wsrep_notify.cc.moved b/sql/wsrep_notify.cc.moved
new file mode 100644
index 00000000000..ff997d01183
--- /dev/null
+++ b/sql/wsrep_notify.cc.moved
@@ -0,0 +1,107 @@
+/* Copyright 2010 Codership Oy <http://www.codership.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include <mysqld.h>
+#include "wsrep_priv.h"
+
+const char* wsrep_notify_cmd="";
+
+static const char* _status_str(wsrep_member_status_t status)
+{
+ switch (status)
+ {
+ case WSREP_MEMBER_UNDEFINED: return "Undefined";
+ case WSREP_MEMBER_JOINER: return "Joiner";
+ case WSREP_MEMBER_DONOR: return "Donor";
+ case WSREP_MEMBER_JOINED: return "Joined";
+ case WSREP_MEMBER_SYNCED: return "Synced";
+ default: return "Error(?)";
+ }
+}
+
+void wsrep_notify_status (wsrep_member_status_t status,
+ const wsrep_view_info_t* view)
+{
+ if (!wsrep_notify_cmd || 0 == strlen(wsrep_notify_cmd))
+ {
+ WSREP_INFO("wsrep_notify_cmd is not defined, skipping notification.");
+ return;
+ }
+
+ char cmd_buf[1 << 16]; // this can be long
+ long cmd_len = sizeof(cmd_buf) - 1;
+ char* cmd_ptr = cmd_buf;
+ long cmd_off = 0;
+
+ cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off, "%s",
+ wsrep_notify_cmd);
+
+ if (status >= WSREP_MEMBER_UNDEFINED && status < WSREP_MEMBER_ERROR)
+ {
+ cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off, " --status %s",
+ _status_str(status));
+ }
+ else
+ {
+ /* here we preserve provider error codes */
+ cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off,
+ " --status 'Error(%d)'", status);
+ }
+
+ if (0 != view)
+ {
+ char uuid_str[40];
+
+ wsrep_uuid_print (&view->uuid, uuid_str, sizeof(uuid_str));
+ cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off,
+ " --uuid %s", uuid_str);
+
+ cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off,
+ " --primary %s", view->view >= 0 ? "yes" : "no");
+
+ cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off,
+ " --index %d", view->my_idx);
+
+ cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off, " --members");
+
+ for (int i = 0; i < view->memb_num; i++)
+ {
+ wsrep_uuid_print (&view->members[i].id, uuid_str, sizeof(uuid_str));
+ cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off,
+ "%c%s/%s/%s", i > 0 ? ',' : ' ',
+ uuid_str, view->members[i].name,
+ view->members[i].incoming);
+ }
+ }
+
+ if (cmd_off == cmd_len)
+ {
+ WSREP_ERROR("Notification buffer too short (%ld). Aborting notification.",
+ cmd_len);
+ return;
+ }
+
+ wsp::process p(cmd_ptr, "r");
+
+ p.wait();
+ int err = p.error();
+
+ if (err)
+ {
+ WSREP_ERROR("Notification command failed: %d (%s): \"%s\"",
+ err, strerror(err), cmd_ptr);
+ }
+}
+
diff --git a/sql/wsrep_priv.h b/sql/wsrep_priv.h
index 700639ebcb1..291823d773e 100644
--- a/sql/wsrep_priv.h
+++ b/sql/wsrep_priv.h
@@ -26,208 +26,30 @@
#include <pthread.h>
#include <cstdio>
-extern void wsrep_ready_set (my_bool x);
+void wsrep_ready_set (my_bool x);
-extern ssize_t wsrep_sst_prepare (void** msg);
-extern int wsrep_sst_donate_cb (void* app_ctx,
- void* recv_ctx,
- const void* msg, size_t msg_len,
- const wsrep_uuid_t* current_uuid,
- wsrep_seqno_t current_seqno,
- const char* state, size_t state_len,
- bool bypass);
-
-extern size_t guess_ip (char* buf, size_t buf_len);
-extern size_t guess_address(char* buf, size_t buf_len);
+ssize_t wsrep_sst_prepare (void** msg);
+wsrep_cb_status wsrep_sst_donate_cb (void* app_ctx,
+ void* recv_ctx,
+ const void* msg, size_t msg_len,
+ const wsrep_gtid_t* state_id,
+ const char* state, size_t state_len,
+ bool bypass);
extern wsrep_uuid_t local_uuid;
extern wsrep_seqno_t local_seqno;
+// a helper function
+void wsrep_sst_received(wsrep_t*, const wsrep_uuid_t*, wsrep_seqno_t,
+ const void*, size_t);
/*! SST thread signals init thread about sst completion */
-extern void wsrep_sst_complete(const wsrep_uuid_t* uuid, wsrep_seqno_t, bool);
+void wsrep_sst_complete(const wsrep_uuid_t*, wsrep_seqno_t, bool);
extern void wsrep_notify_status (wsrep_member_status_t new_status,
const wsrep_view_info_t* view = 0);
-namespace wsp {
-class node_status
-{
-public:
- node_status() : status(WSREP_MEMBER_UNDEFINED) {}
- void set(wsrep_member_status_t new_status,
- const wsrep_view_info_t* view = 0)
- {
- if (status != new_status || 0 != view)
- {
- wsrep_notify_status(new_status, view);
- status = new_status;
- }
- }
- wsrep_member_status_t get() const { return status; }
-private:
- wsrep_member_status_t status;
-};
-} /* namespace wsp */
-
-extern wsp::node_status local_status;
-
-namespace wsp {
-/* A small class to run external programs. */
-class process
-{
-private:
- const char* const str_;
- FILE* io_;
- int err_;
- pid_t pid_;
-
-public:
-/*! @arg type is a pointer to a null-terminated string which must contain
- either the letter 'r' for reading or the letter 'w' for writing.
- */
- process (const char* cmd, const char* type);
- ~process ();
-
- FILE* pipe () { return io_; }
- int error() { return err_; }
- int wait ();
- const char* cmd() { return str_; }
-};
-#ifdef REMOVED
-class lock
-{
- pthread_mutex_t* const mtx_;
-
-public:
-
- lock (pthread_mutex_t* mtx) : mtx_(mtx)
- {
- int err = pthread_mutex_lock (mtx_);
-
- if (err)
- {
- WSREP_ERROR("Mutex lock failed: %s", strerror(err));
- abort();
- }
- }
-
- virtual ~lock ()
- {
- int err = pthread_mutex_unlock (mtx_);
-
- if (err)
- {
- WSREP_ERROR("Mutex unlock failed: %s", strerror(err));
- abort();
- }
- }
-
- inline void wait (pthread_cond_t* cond)
- {
- pthread_cond_wait (cond, mtx_);
- }
-
-private:
-
- lock (const lock&);
- lock& operator=(const lock&);
-
-};
-
-class monitor
-{
- int mutable refcnt;
- pthread_mutex_t mutable mtx;
- pthread_cond_t mutable cond;
-
-public:
-
- monitor() : refcnt(0)
- {
- pthread_mutex_init (&mtx, NULL);
- pthread_cond_init (&cond, NULL);
- }
-
- ~monitor()
- {
- pthread_mutex_destroy (&mtx);
- pthread_cond_destroy (&cond);
- }
-
- void enter() const
- {
- lock l(&mtx);
-
- while (refcnt)
- {
- l.wait(&cond);
- }
- refcnt++;
- }
-
- void leave() const
- {
- lock l(&mtx);
-
- refcnt--;
- if (refcnt == 0)
- {
- pthread_cond_signal (&cond);
- }
- }
-
-private:
-
- monitor (const monitor&);
- monitor& operator= (const monitor&);
-};
-
-class critical
-{
- const monitor& mon;
-
-public:
-
- critical(const monitor& m) : mon(m) { mon.enter(); }
-
- ~critical() { mon.leave(); }
-
-private:
-
- critical (const critical&);
- critical& operator= (const critical&);
-};
-#endif
-
-class thd
-{
- class thd_init
- {
- public:
- thd_init() { my_thread_init(); }
- ~thd_init() { my_thread_end(); }
- }
- init;
-
- thd (const thd&);
- thd& operator= (const thd&);
-
-public:
-
- thd(my_bool wsrep_on);
- ~thd();
- THD* const ptr;
-};
+/* binlog-related stuff */
+int wsrep_write_cache(IO_CACHE *cache, uchar **buf, size_t *buf_len);
-class string
-{
-public:
- string() : string_(0) {}
- void set(char* str) { if (string_) free (string_); string_ = str; }
- ~string() { set (0); }
-private:
- char* string_;
-};
-} // namespace wsrep
#endif /* WSREP_PRIV_H */
diff --git a/sql/wsrep_priv.h.moved b/sql/wsrep_priv.h.moved
new file mode 100644
index 00000000000..700639ebcb1
--- /dev/null
+++ b/sql/wsrep_priv.h.moved
@@ -0,0 +1,233 @@
+/* Copyright 2010 Codership Oy <http://www.codership.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+//! @file declares symbols private to wsrep integration layer
+
+#ifndef WSREP_PRIV_H
+#define WSREP_PRIV_H
+
+#include "wsrep_mysqld.h"
+#include "../wsrep/wsrep_api.h"
+
+#include <log.h>
+#include <pthread.h>
+#include <cstdio>
+
+extern void wsrep_ready_set (my_bool x);
+
+extern ssize_t wsrep_sst_prepare (void** msg);
+extern int wsrep_sst_donate_cb (void* app_ctx,
+ void* recv_ctx,
+ const void* msg, size_t msg_len,
+ const wsrep_uuid_t* current_uuid,
+ wsrep_seqno_t current_seqno,
+ const char* state, size_t state_len,
+ bool bypass);
+
+extern size_t guess_ip (char* buf, size_t buf_len);
+extern size_t guess_address(char* buf, size_t buf_len);
+
+extern wsrep_uuid_t local_uuid;
+extern wsrep_seqno_t local_seqno;
+
+/*! SST thread signals init thread about sst completion */
+extern void wsrep_sst_complete(const wsrep_uuid_t* uuid, wsrep_seqno_t, bool);
+
+extern void wsrep_notify_status (wsrep_member_status_t new_status,
+ const wsrep_view_info_t* view = 0);
+
+namespace wsp {
+class node_status
+{
+public:
+ node_status() : status(WSREP_MEMBER_UNDEFINED) {}
+ void set(wsrep_member_status_t new_status,
+ const wsrep_view_info_t* view = 0)
+ {
+ if (status != new_status || 0 != view)
+ {
+ wsrep_notify_status(new_status, view);
+ status = new_status;
+ }
+ }
+ wsrep_member_status_t get() const { return status; }
+private:
+ wsrep_member_status_t status;
+};
+} /* namespace wsp */
+
+extern wsp::node_status local_status;
+
+namespace wsp {
+/* A small class to run external programs. */
+class process
+{
+private:
+ const char* const str_;
+ FILE* io_;
+ int err_;
+ pid_t pid_;
+
+public:
+/*! @arg type is a pointer to a null-terminated string which must contain
+ either the letter 'r' for reading or the letter 'w' for writing.
+ */
+ process (const char* cmd, const char* type);
+ ~process ();
+
+ FILE* pipe () { return io_; }
+ int error() { return err_; }
+ int wait ();
+ const char* cmd() { return str_; }
+};
+#ifdef REMOVED
+class lock
+{
+ pthread_mutex_t* const mtx_;
+
+public:
+
+ lock (pthread_mutex_t* mtx) : mtx_(mtx)
+ {
+ int err = pthread_mutex_lock (mtx_);
+
+ if (err)
+ {
+ WSREP_ERROR("Mutex lock failed: %s", strerror(err));
+ abort();
+ }
+ }
+
+ virtual ~lock ()
+ {
+ int err = pthread_mutex_unlock (mtx_);
+
+ if (err)
+ {
+ WSREP_ERROR("Mutex unlock failed: %s", strerror(err));
+ abort();
+ }
+ }
+
+ inline void wait (pthread_cond_t* cond)
+ {
+ pthread_cond_wait (cond, mtx_);
+ }
+
+private:
+
+ lock (const lock&);
+ lock& operator=(const lock&);
+
+};
+
+class monitor
+{
+ int mutable refcnt;
+ pthread_mutex_t mutable mtx;
+ pthread_cond_t mutable cond;
+
+public:
+
+ monitor() : refcnt(0)
+ {
+ pthread_mutex_init (&mtx, NULL);
+ pthread_cond_init (&cond, NULL);
+ }
+
+ ~monitor()
+ {
+ pthread_mutex_destroy (&mtx);
+ pthread_cond_destroy (&cond);
+ }
+
+ void enter() const
+ {
+ lock l(&mtx);
+
+ while (refcnt)
+ {
+ l.wait(&cond);
+ }
+ refcnt++;
+ }
+
+ void leave() const
+ {
+ lock l(&mtx);
+
+ refcnt--;
+ if (refcnt == 0)
+ {
+ pthread_cond_signal (&cond);
+ }
+ }
+
+private:
+
+ monitor (const monitor&);
+ monitor& operator= (const monitor&);
+};
+
+class critical
+{
+ const monitor& mon;
+
+public:
+
+ critical(const monitor& m) : mon(m) { mon.enter(); }
+
+ ~critical() { mon.leave(); }
+
+private:
+
+ critical (const critical&);
+ critical& operator= (const critical&);
+};
+#endif
+
+class thd
+{
+ class thd_init
+ {
+ public:
+ thd_init() { my_thread_init(); }
+ ~thd_init() { my_thread_end(); }
+ }
+ init;
+
+ thd (const thd&);
+ thd& operator= (const thd&);
+
+public:
+
+ thd(my_bool wsrep_on);
+ ~thd();
+ THD* const ptr;
+};
+
+class string
+{
+public:
+ string() : string_(0) {}
+ void set(char* str) { if (string_) free (string_); string_ = str; }
+ ~string() { set (0); }
+private:
+ char* string_;
+};
+
+} // namespace wsrep
+#endif /* WSREP_PRIV_H */
diff --git a/sql/wsrep_sst.cc b/sql/wsrep_sst.cc
index 4afe269cfe2..d651e1ed0a9 100644
--- a/sql/wsrep_sst.cc
+++ b/sql/wsrep_sst.cc
@@ -13,6 +13,8 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+#include "wsrep_sst.h"
+
#include <mysqld.h>
#include <sql_class.h>
#include <set_var.h>
@@ -20,6 +22,7 @@
#include <sql_reload.h>
#include <sql_parse.h>
#include "wsrep_priv.h"
+#include "wsrep_utils.h"
#include <cstdio>
#include <cstdlib>
@@ -58,7 +61,6 @@ const char* wsrep_sst_donor = "";
// container for real auth string
static const char* sst_auth_real = NULL;
-
my_bool wsrep_sst_donor_rejects_queries = FALSE;
bool wsrep_sst_method_check (sys_var *self, THD* thd, set_var* var)
diff --git a/sql/wsrep_sst.h b/sql/wsrep_sst.h
new file mode 100644
index 00000000000..b7f0e26f226
--- /dev/null
+++ b/sql/wsrep_sst.h
@@ -0,0 +1,40 @@
+/* Copyright (C) 2013 Codership Oy <info@codership.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License along
+ with this program; if not, write to the Free Software Foundation, Inc.,
+ 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */
+
+#ifndef WSREP_SST_H
+#define WSREP_SST_H
+
+#include <mysql.h> // my_bool
+
+/* system variables */
+extern const char* wsrep_sst_method;
+extern const char* wsrep_sst_receive_address;
+extern const char* wsrep_sst_donor;
+extern char* wsrep_sst_auth;
+extern my_bool wsrep_sst_donor_rejects_queries;
+
+/*! Synchronizes applier thread start with init thread */
+extern void wsrep_sst_grab();
+/*! Init thread waits for SST completion */
+extern bool wsrep_sst_wait();
+/*! Signals wsrep that initialization is complete, writesets can be applied */
+extern void wsrep_sst_continue();
+
+extern void wsrep_SE_init_grab(); /*! grab init critical section */
+extern void wsrep_SE_init_wait(); /*! wait for SE init to complete */
+extern void wsrep_SE_init_done(); /*! signal that SE init is complte */
+extern void wsrep_SE_initialized(); /*! mark SE initialization complete */
+
+#endif /* WSREP_SST_H */
diff --git a/sql/wsrep_thd.cc b/sql/wsrep_thd.cc
new file mode 100644
index 00000000000..9cbd32cac73
--- /dev/null
+++ b/sql/wsrep_thd.cc
@@ -0,0 +1,424 @@
+/* Copyright (C) 2013 Codership Oy <info@codership.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License along
+ with this program; if not, write to the Free Software Foundation, Inc.,
+ 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */
+
+#include "wsrep_thd.h"
+
+#include "transaction.h"
+#include "rpl_rli.h"
+#include "log_event.h"
+#include "sql_parse.h"
+#include "slave.h" // opt_log_slave_updates
+#include "sql_base.h" // close_thread_tables()
+#include "mysqld.h" // start_wsrep_THD();
+
+/* must have (&thd->LOCK_wsrep_thd) */
+void wsrep_client_rollback(THD *thd)
+{
+ WSREP_DEBUG("client rollback due to BF abort for (%ld), query: %s",
+ thd->thread_id, thd->query());
+
+ thd->wsrep_conflict_state= ABORTING;
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+ trans_rollback(thd);
+
+ if (thd->locked_tables_mode && thd->lock)
+ {
+ WSREP_DEBUG("unlocking tables for BF abort (%ld)", thd->thread_id);
+ thd->locked_tables_list.unlock_locked_tables(thd);
+ thd->variables.option_bits&= ~(OPTION_TABLE_LOCK);
+ }
+
+ if (thd->global_read_lock.is_acquired())
+ {
+ WSREP_DEBUG("unlocking GRL for BF abort (%ld)", thd->thread_id);
+ thd->global_read_lock.unlock_global_read_lock(thd);
+ }
+
+ /* Release transactional metadata locks. */
+ thd->mdl_context.release_transactional_locks();
+
+ /* release explicit MDL locks */
+ thd->mdl_context.release_explicit_locks();
+
+ if (thd->get_binlog_table_maps())
+ {
+ WSREP_DEBUG("clearing binlog table map for BF abort (%ld)", thd->thread_id);
+ thd->clear_binlog_table_maps();
+ }
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ thd->wsrep_conflict_state= ABORTED;
+}
+
+static Relay_log_info* wsrep_relay_log_init(const char* log_fname)
+{
+ Relay_log_info* rli= new Relay_log_info(false);
+
+ rli->no_storage= true;
+ if (!rli->relay_log.description_event_for_exec)
+ {
+ rli->relay_log.description_event_for_exec=
+ new Format_description_log_event(4);
+ }
+
+ rli->sql_thd= current_thd;
+ return rli;
+}
+
+static void wsrep_prepare_bf_thd(THD *thd, struct wsrep_thd_shadow* shadow)
+{
+ shadow->options = thd->variables.option_bits;
+ shadow->wsrep_exec_mode = thd->wsrep_exec_mode;
+ shadow->vio = thd->net.vio;
+
+ if (opt_log_slave_updates)
+ thd->variables.option_bits|= OPTION_BIN_LOG;
+ else
+ thd->variables.option_bits&= ~(OPTION_BIN_LOG);
+
+ if (!thd->wsrep_rli) thd->wsrep_rli= wsrep_relay_log_init("wsrep_relay");
+
+ thd->wsrep_exec_mode= REPL_RECV;
+ thd->net.vio= 0;
+ thd->clear_error();
+
+ thd->variables.option_bits|= OPTION_NOT_AUTOCOMMIT;
+
+ shadow->tx_isolation = thd->variables.tx_isolation;
+ thd->variables.tx_isolation = ISO_READ_COMMITTED;
+ thd->tx_isolation = ISO_READ_COMMITTED;
+}
+
+static void wsrep_return_from_bf_mode(THD *thd, struct wsrep_thd_shadow* shadow)
+{
+ thd->variables.option_bits = shadow->options;
+ thd->wsrep_exec_mode = shadow->wsrep_exec_mode;
+ thd->net.vio = shadow->vio;
+ thd->variables.tx_isolation = shadow->tx_isolation;
+}
+
+void wsrep_replay_transaction(THD *thd)
+{
+ /* checking if BF trx must be replayed */
+ if (thd->wsrep_conflict_state== MUST_REPLAY) {
+ if (thd->wsrep_exec_mode!= REPL_RECV) {
+ if (thd->stmt_da->is_sent)
+ {
+ WSREP_ERROR("replay issue, thd has reported status already");
+ }
+ thd->stmt_da->reset_diagnostics_area();
+
+ thd->wsrep_conflict_state= REPLAYING;
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+
+ mysql_reset_thd_for_next_command(thd);
+ thd->killed= THD::NOT_KILLED;
+ close_thread_tables(thd);
+ if (thd->locked_tables_mode && thd->lock)
+ {
+ WSREP_DEBUG("releasing table lock for replaying (%ld)",
+ thd->thread_id);
+ thd->locked_tables_list.unlock_locked_tables(thd);
+ thd->variables.option_bits&= ~(OPTION_TABLE_LOCK);
+ }
+ thd->mdl_context.release_transactional_locks();
+
+ thd_proc_info(thd, "wsrep replaying trx");
+ WSREP_DEBUG("replay trx: %s %lld",
+ thd->query() ? thd->query() : "void",
+ (long long)wsrep_thd_trx_seqno(thd));
+ struct wsrep_thd_shadow shadow;
+ wsrep_prepare_bf_thd(thd, &shadow);
+ int rcode = wsrep->replay_trx(wsrep,
+ &thd->wsrep_ws_handle,
+ (void *)thd);
+
+ wsrep_return_from_bf_mode(thd, &shadow);
+ if (thd->wsrep_conflict_state!= REPLAYING)
+ WSREP_WARN("lost replaying mode: %d", thd->wsrep_conflict_state );
+
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+
+ switch (rcode)
+ {
+ case WSREP_OK:
+ thd->wsrep_conflict_state= NO_CONFLICT;
+ wsrep->post_commit(wsrep, &thd->wsrep_ws_handle);
+ WSREP_DEBUG("trx_replay successful for: %ld %llu",
+ thd->thread_id, (long long)thd->real_id);
+ if (thd->stmt_da->is_sent)
+ {
+ WSREP_WARN("replay ok, thd has reported status");
+ }
+ else
+ {
+ my_ok(thd);
+ }
+ break;
+ case WSREP_TRX_FAIL:
+ if (thd->stmt_da->is_sent)
+ {
+ WSREP_ERROR("replay failed, thd has reported status");
+ }
+ else
+ {
+ WSREP_DEBUG("replay failed, rolling back");
+ my_error(ER_LOCK_DEADLOCK, MYF(0), "wsrep aborted transaction");
+ }
+ thd->wsrep_conflict_state= ABORTED;
+ wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle);
+ break;
+ default:
+ WSREP_ERROR("trx_replay failed for: %d, query: %s",
+ rcode, thd->query() ? thd->query() : "void");
+ /* we're now in inconsistent state, must abort */
+ unireg_abort(1);
+ break;
+ }
+ mysql_mutex_lock(&LOCK_wsrep_replaying);
+ wsrep_replaying--;
+ WSREP_DEBUG("replaying decreased: %d, thd: %lu",
+ wsrep_replaying, thd->thread_id);
+ mysql_cond_broadcast(&COND_wsrep_replaying);
+ mysql_mutex_unlock(&LOCK_wsrep_replaying);
+ }
+ }
+}
+
+static void wsrep_replication_process(THD *thd)
+{
+ int rcode;
+ DBUG_ENTER("wsrep_replication_process");
+
+ struct wsrep_thd_shadow shadow;
+ wsrep_prepare_bf_thd(thd, &shadow);
+
+ rcode = wsrep->recv(wsrep, (void *)thd);
+ DBUG_PRINT("wsrep",("wsrep_repl returned: %d", rcode));
+
+ WSREP_INFO("applier thread exiting (code:%d)", rcode);
+
+ switch (rcode) {
+ case WSREP_OK:
+ case WSREP_NOT_IMPLEMENTED:
+ case WSREP_CONN_FAIL:
+ /* provider does not support slave operations / disconnected from group,
+ * just close applier thread */
+ break;
+ case WSREP_NODE_FAIL:
+ /* data inconsistency => SST is needed */
+ /* Note: we cannot just blindly restart replication here,
+ * SST might require server restart if storage engines must be
+ * initialized after SST */
+ WSREP_ERROR("node consistency compromised, aborting");
+ wsrep_kill_mysql(thd);
+ break;
+ case WSREP_WARNING:
+ case WSREP_TRX_FAIL:
+ case WSREP_TRX_MISSING:
+ /* these suggests a bug in provider code */
+ WSREP_WARN("bad return from recv() call: %d", rcode);
+ /* fall through to node shutdown */
+ case WSREP_FATAL:
+ /* Cluster connectivity is lost.
+ *
+ * If applier was killed on purpose (KILL_CONNECTION), we
+ * avoid mysql shutdown. This is because the killer will then handle
+ * shutdown processing (or replication restarting)
+ */
+ if (thd->killed != THD::KILL_CONNECTION)
+ {
+ wsrep_kill_mysql(thd);
+ }
+ break;
+ }
+
+ mysql_mutex_lock(&LOCK_thread_count);
+ wsrep_close_applier(thd);
+ mysql_cond_broadcast(&COND_thread_count);
+ mysql_mutex_unlock(&LOCK_thread_count);
+
+ if (thd->temporary_tables)
+ {
+ WSREP_DEBUG("Applier %lu, has temporary tables at exit", thd->thread_id);
+ }
+ wsrep_return_from_bf_mode(thd, &shadow);
+ DBUG_VOID_RETURN;
+}
+
+void wsrep_create_appliers(long threads)
+{
+ if (!wsrep_connected)
+ {
+ /* see wsrep_replication_start() for the logic */
+ if (wsrep_cluster_address && strlen(wsrep_cluster_address) &&
+ wsrep_provider && strcasecmp(wsrep_provider, "none"))
+ {
+ WSREP_ERROR("Trying to launch slave threads before creating "
+ "connection at '%s'", wsrep_cluster_address);
+ assert(0);
+ }
+ return;
+ }
+
+ long wsrep_threads=0;
+ pthread_t hThread;
+ while (wsrep_threads++ < threads) {
+ if (pthread_create(
+ &hThread, &connection_attrib,
+ start_wsrep_THD, (void*)wsrep_replication_process))
+ WSREP_WARN("Can't create thread to manage wsrep replication");
+ }
+}
+
+static void wsrep_rollback_process(THD *thd)
+{
+ DBUG_ENTER("wsrep_rollback_process");
+
+ mysql_mutex_lock(&LOCK_wsrep_rollback);
+ wsrep_aborting_thd= NULL;
+
+ while (thd->killed == THD::NOT_KILLED) {
+ thd_proc_info(thd, "wsrep aborter idle");
+ thd->mysys_var->current_mutex= &LOCK_wsrep_rollback;
+ thd->mysys_var->current_cond= &COND_wsrep_rollback;
+
+ mysql_cond_wait(&COND_wsrep_rollback,&LOCK_wsrep_rollback);
+
+ WSREP_DEBUG("WSREP rollback thread wakes for signal");
+
+ mysql_mutex_lock(&thd->mysys_var->mutex);
+ thd_proc_info(thd, "wsrep aborter active");
+ thd->mysys_var->current_mutex= 0;
+ thd->mysys_var->current_cond= 0;
+ mysql_mutex_unlock(&thd->mysys_var->mutex);
+
+ /* check for false alarms */
+ if (!wsrep_aborting_thd)
+ {
+ WSREP_DEBUG("WSREP rollback thread has empty abort queue");
+ }
+ /* process all entries in the queue */
+ while (wsrep_aborting_thd) {
+ THD *aborting;
+ wsrep_aborting_thd_t next = wsrep_aborting_thd->next;
+ aborting = wsrep_aborting_thd->aborting_thd;
+ my_free(wsrep_aborting_thd);
+ wsrep_aborting_thd= next;
+ /*
+ * must release mutex, appliers my want to add more
+ * aborting thds in our work queue, while we rollback
+ */
+ mysql_mutex_unlock(&LOCK_wsrep_rollback);
+
+ mysql_mutex_lock(&aborting->LOCK_wsrep_thd);
+ if (aborting->wsrep_conflict_state== ABORTED)
+ {
+ WSREP_DEBUG("WSREP, thd already aborted: %llu state: %d",
+ (long long)aborting->real_id,
+ aborting->wsrep_conflict_state);
+
+ mysql_mutex_unlock(&aborting->LOCK_wsrep_thd);
+ mysql_mutex_lock(&LOCK_wsrep_rollback);
+ continue;
+ }
+ aborting->wsrep_conflict_state= ABORTING;
+
+ mysql_mutex_unlock(&aborting->LOCK_wsrep_thd);
+
+ aborting->store_globals();
+
+ mysql_mutex_lock(&aborting->LOCK_wsrep_thd);
+ wsrep_client_rollback(aborting);
+ WSREP_DEBUG("WSREP rollbacker aborted thd: (%lu %llu)",
+ aborting->thread_id, (long long)aborting->real_id);
+ mysql_mutex_unlock(&aborting->LOCK_wsrep_thd);
+
+ mysql_mutex_lock(&LOCK_wsrep_rollback);
+ }
+ }
+
+ mysql_mutex_unlock(&LOCK_wsrep_rollback);
+ sql_print_information("WSREP: rollbacker thread exiting");
+
+ DBUG_PRINT("wsrep",("wsrep rollbacker thread exiting"));
+ DBUG_VOID_RETURN;
+}
+
+void wsrep_create_rollbacker()
+{
+ if (wsrep_provider && strcasecmp(wsrep_provider, "none"))
+ {
+ pthread_t hThread;
+ /* create rollbacker */
+ if (pthread_create( &hThread, &connection_attrib,
+ start_wsrep_THD, (void*)wsrep_rollback_process))
+ WSREP_WARN("Can't create thread to manage wsrep rollback");
+ }
+}
+
+extern "C"
+int wsrep_thd_is_brute_force(void *thd_ptr)
+{
+ if (thd_ptr) {
+ switch (((THD *)thd_ptr)->wsrep_exec_mode) {
+ case LOCAL_STATE:
+ {
+ if (((THD *)thd_ptr)->wsrep_conflict_state== REPLAYING)
+ {
+ return 1;
+ }
+ return 0;
+ }
+ case REPL_RECV: return 1;
+ case TOTAL_ORDER: return 2;
+ case LOCAL_COMMIT: return 3;
+ }
+ }
+ return 0;
+}
+
+extern "C"
+int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr, my_bool signal)
+{
+ THD *victim_thd = (THD *) victim_thd_ptr;
+ THD *bf_thd = (THD *) bf_thd_ptr;
+ DBUG_ENTER("wsrep_abort_thd");
+
+ if ( (WSREP(bf_thd) ||
+ ( (WSREP_ON || wsrep_OSU_method_options == WSREP_OSU_RSU) &&
+ bf_thd->wsrep_exec_mode == TOTAL_ORDER) ) &&
+ victim_thd)
+ {
+ WSREP_DEBUG("wsrep_abort_thd, by: %llu, victim: %llu", (bf_thd) ?
+ (long long)bf_thd->real_id : 0, (long long)victim_thd->real_id);
+ ha_wsrep_abort_transaction(bf_thd, victim_thd, signal);
+ }
+ else
+ {
+ WSREP_DEBUG("wsrep_abort_thd not effective: %p %p", bf_thd, victim_thd);
+ }
+
+ DBUG_RETURN(1);
+}
+
+extern "C"
+int wsrep_thd_in_locking_session(void *thd_ptr)
+{
+ if (thd_ptr && ((THD *)thd_ptr)->in_lock_tables) {
+ return 1;
+ }
+ return 0;
+}
+
diff --git a/sql/wsrep_thd.h b/sql/wsrep_thd.h
new file mode 100644
index 00000000000..2397230b0a2
--- /dev/null
+++ b/sql/wsrep_thd.h
@@ -0,0 +1,31 @@
+/* Copyright (C) 2013 Codership Oy <info@codership.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License along
+ with this program; if not, write to the Free Software Foundation, Inc.,
+ 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */
+
+#ifndef WSREP_THD_H
+#define WSREP_THD_H
+
+#include "sql_class.h"
+
+void wsrep_client_rollback(THD *thd);
+void wsrep_replay_transaction(THD *thd);
+void wsrep_create_appliers(long threads);
+void wsrep_create_rollbacker();
+
+extern "C" int wsrep_thd_is_brute_force(void *thd_ptr);
+extern "C" int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr,
+ my_bool signal);
+extern "C" int wsrep_thd_in_locking_session(void *thd_ptr);
+
+#endif /* WSREP_THD_H */
diff --git a/sql/wsrep_utils.cc b/sql/wsrep_utils.cc
index 53d0f7c449e..37e537c62e4 100644
--- a/sql/wsrep_utils.cc
+++ b/sql/wsrep_utils.cc
@@ -20,6 +20,10 @@
#define _GNU_SOURCE // POSIX_SPAWN_USEVFORK flag
#endif
+#include "wsrep_utils.h"
+#include "wsrep_mysqld.h"
+//#include "wsrep_api.h"
+//#include "wsrep_priv.h"
#include <spawn.h> // posix_spawn()
#include <unistd.h> // pipe()
#include <errno.h> // errno
diff --git a/sql/wsrep_utils.h b/sql/wsrep_utils.h
new file mode 100644
index 00000000000..337678238f8
--- /dev/null
+++ b/sql/wsrep_utils.h
@@ -0,0 +1,208 @@
+/* Copyright (C) 2013 Codership Oy <info@codership.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License along
+ with this program; if not, write to the Free Software Foundation, Inc.,
+ 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */
+
+#ifndef WSREP_UTILS_H
+#define WSREP_UTILS_H
+
+#include "wsrep_priv.h"
+
+unsigned int wsrep_check_ip (const char* addr);
+size_t wsrep_guess_ip (char* buf, size_t buf_len);
+size_t wsrep_guess_address(char* buf, size_t buf_len);
+
+namespace wsp {
+class node_status
+{
+public:
+ node_status() : status(WSREP_MEMBER_UNDEFINED) {}
+ void set(wsrep_member_status_t new_status,
+ const wsrep_view_info_t* view = 0)
+ {
+ if (status != new_status || 0 != view)
+ {
+ wsrep_notify_status(new_status, view);
+ status = new_status;
+ }
+ }
+ wsrep_member_status_t get() const { return status; }
+private:
+ wsrep_member_status_t status;
+};
+} /* namespace wsp */
+
+extern wsp::node_status local_status;
+
+namespace wsp {
+/* A small class to run external programs. */
+class process
+{
+private:
+ const char* const str_;
+ FILE* io_;
+ int err_;
+ pid_t pid_;
+
+public:
+/*! @arg type is a pointer to a null-terminated string which must contain
+ either the letter 'r' for reading or the letter 'w' for writing.
+ */
+ process (const char* cmd, const char* type);
+ ~process ();
+
+ FILE* pipe () { return io_; }
+ int error() { return err_; }
+ int wait ();
+ const char* cmd() { return str_; }
+};
+
+class thd
+{
+ class thd_init
+ {
+ public:
+ thd_init() { my_thread_init(); }
+ ~thd_init() { my_thread_end(); }
+ }
+ init;
+
+ thd (const thd&);
+ thd& operator= (const thd&);
+
+public:
+
+ thd(my_bool wsrep_on);
+ ~thd();
+ THD* const ptr;
+};
+
+class string
+{
+public:
+ string() : string_(0) {}
+ void set(char* str) { if (string_) free (string_); string_ = str; }
+ ~string() { set (0); }
+private:
+ char* string_;
+};
+
+#ifdef REMOVED
+class lock
+{
+ pthread_mutex_t* const mtx_;
+
+public:
+
+ lock (pthread_mutex_t* mtx) : mtx_(mtx)
+ {
+ int err = pthread_mutex_lock (mtx_);
+
+ if (err)
+ {
+ WSREP_ERROR("Mutex lock failed: %s", strerror(err));
+ abort();
+ }
+ }
+
+ virtual ~lock ()
+ {
+ int err = pthread_mutex_unlock (mtx_);
+
+ if (err)
+ {
+ WSREP_ERROR("Mutex unlock failed: %s", strerror(err));
+ abort();
+ }
+ }
+
+ inline void wait (pthread_cond_t* cond)
+ {
+ pthread_cond_wait (cond, mtx_);
+ }
+
+private:
+
+ lock (const lock&);
+ lock& operator=(const lock&);
+
+};
+
+class monitor
+{
+ int mutable refcnt;
+ pthread_mutex_t mutable mtx;
+ pthread_cond_t mutable cond;
+
+public:
+
+ monitor() : refcnt(0)
+ {
+ pthread_mutex_init (&mtx, NULL);
+ pthread_cond_init (&cond, NULL);
+ }
+
+ ~monitor()
+ {
+ pthread_mutex_destroy (&mtx);
+ pthread_cond_destroy (&cond);
+ }
+
+ void enter() const
+ {
+ lock l(&mtx);
+
+ while (refcnt)
+ {
+ l.wait(&cond);
+ }
+ refcnt++;
+ }
+
+ void leave() const
+ {
+ lock l(&mtx);
+
+ refcnt--;
+ if (refcnt == 0)
+ {
+ pthread_cond_signal (&cond);
+ }
+ }
+
+private:
+
+ monitor (const monitor&);
+ monitor& operator= (const monitor&);
+};
+
+class critical
+{
+ const monitor& mon;
+
+public:
+
+ critical(const monitor& m) : mon(m) { mon.enter(); }
+
+ ~critical() { mon.leave(); }
+
+private:
+
+ critical (const critical&);
+ critical& operator= (const critical&);
+};
+#endif
+
+} // namespace wsrep
+
+#endif /* WSREP_UTILS_H */
diff --git a/sql/wsrep_var.cc b/sql/wsrep_var.cc
index 17e8de9ba6b..bd041ed51ff 100644
--- a/sql/wsrep_var.cc
+++ b/sql/wsrep_var.cc
@@ -13,12 +13,15 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+#include "wsrep_var.h"
+
#include <mysqld.h>
#include <sql_class.h>
#include <sql_plugin.h>
#include <set_var.h>
#include <sql_acl.h>
#include "wsrep_priv.h"
+#include "wsrep_thd.h"
#include <my_dir.h>
#include <cstdio>
#include <cstdlib>
@@ -157,7 +160,7 @@ void wsrep_start_position_init (const char* val)
if (NULL == val || wsrep_start_position_verify (val))
{
WSREP_ERROR("Bad initial value for wsrep_start_position: %s",
- (val ? val : ""));
+ (val ? val : ""));
return;
}
@@ -173,7 +176,7 @@ static bool refresh_provider_options()
{
if (wsrep_provider_options) my_free((void *)wsrep_provider_options);
wsrep_provider_options = (char*)my_memdup(opts, strlen(opts) + 1,
- MYF(MY_WME));
+ MYF(MY_WME));
}
else
{
diff --git a/sql/wsrep_var.h b/sql/wsrep_var.h
new file mode 100644
index 00000000000..1dd0beac3e3
--- /dev/null
+++ b/sql/wsrep_var.h
@@ -0,0 +1,81 @@
+/* Copyright (C) 2013 Codership Oy <info@codership.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License along
+ with this program; if not, write to the Free Software Foundation, Inc.,
+ 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */
+
+#ifndef WSREP_VAR_H
+#define WSREP_VAR_H
+
+#define WSREP_NODE_INCOMING_AUTO "AUTO"
+
+// MySQL variables funcs
+
+#include "sql_priv.h"
+class sys_var;
+class set_var;
+class THD;
+
+#define CHECK_ARGS (sys_var *self, THD* thd, set_var *var)
+#define UPDATE_ARGS (sys_var *self, THD* thd, enum_var_type type)
+#define DEFAULT_ARGS (THD* thd, enum_var_type var_type)
+#define INIT_ARGS (const char* opt)
+
+extern bool wsrep_on_update UPDATE_ARGS;
+extern void wsrep_causal_reads_update UPDATE_ARGS;
+extern bool wsrep_start_position_check CHECK_ARGS;
+extern bool wsrep_start_position_update UPDATE_ARGS;
+extern void wsrep_start_position_init INIT_ARGS;
+
+extern bool wsrep_provider_check CHECK_ARGS;
+extern bool wsrep_provider_update UPDATE_ARGS;
+extern void wsrep_provider_init INIT_ARGS;
+
+extern bool wsrep_provider_options_check CHECK_ARGS;
+extern bool wsrep_provider_options_update UPDATE_ARGS;
+extern void wsrep_provider_options_init INIT_ARGS;
+
+extern bool wsrep_cluster_address_check CHECK_ARGS;
+extern bool wsrep_cluster_address_update UPDATE_ARGS;
+extern void wsrep_cluster_address_init INIT_ARGS;
+
+extern bool wsrep_cluster_name_check CHECK_ARGS;
+extern bool wsrep_cluster_name_update UPDATE_ARGS;
+
+extern bool wsrep_node_name_check CHECK_ARGS;
+extern bool wsrep_node_name_update UPDATE_ARGS;
+
+extern bool wsrep_node_address_check CHECK_ARGS;
+extern bool wsrep_node_address_update UPDATE_ARGS;
+extern void wsrep_node_address_init INIT_ARGS;
+
+extern bool wsrep_sst_method_check CHECK_ARGS;
+extern bool wsrep_sst_method_update UPDATE_ARGS;
+extern void wsrep_sst_method_init INIT_ARGS;
+
+extern bool wsrep_sst_receive_address_check CHECK_ARGS;
+extern bool wsrep_sst_receive_address_update UPDATE_ARGS;
+
+extern bool wsrep_sst_auth_check CHECK_ARGS;
+extern bool wsrep_sst_auth_update UPDATE_ARGS;
+extern void wsrep_sst_auth_init INIT_ARGS;
+
+extern bool wsrep_sst_donor_check CHECK_ARGS;
+extern bool wsrep_sst_donor_update UPDATE_ARGS;
+
+extern bool wsrep_slave_threads_check CHECK_ARGS;
+extern bool wsrep_slave_threads_update UPDATE_ARGS;
+
+extern bool wsrep_desync_check CHECK_ARGS;
+extern bool wsrep_desync_update UPDATE_ARGS;
+
+#endif /* WSREP_VAR_H */