summaryrefslogtreecommitdiff
path: root/sql/wsrep_mysqld.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/wsrep_mysqld.cc')
-rw-r--r--sql/wsrep_mysqld.cc2438
1 files changed, 1213 insertions, 1225 deletions
diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc
index 58b30a1e77f..73f201f12ca 100644
--- a/sql/wsrep_mysqld.cc
+++ b/sql/wsrep_mysqld.cc
@@ -14,7 +14,12 @@
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */
#include "sql_plugin.h" /* wsrep_plugins_pre_init() */
+#include "my_global.h"
+#include "wsrep_server_state.h"
+
+#include "mariadb.h"
#include <mysqld.h>
+#include <transaction.h>
#include <sql_class.h>
#include <sql_parse.h>
#include <sql_base.h> /* find_temporary_table() */
@@ -33,26 +38,32 @@
#include "wsrep_var.h"
#include "wsrep_binlog.h"
#include "wsrep_applier.h"
+#include "wsrep_schema.h"
#include "wsrep_xid.h"
+#include "wsrep_trans_observer.h"
+#include "mysql/service_wsrep.h"
#include <cstdio>
#include <cstdlib>
+#include <string>
#include "log_event.h"
#include <slave.h>
-wsrep_t *wsrep = NULL;
-/*
- wsrep_emulate_bin_log is a flag to tell that binlog has not been configured.
- wsrep needs to get binlog events from transaction cache even when binlog is
- not enabled, wsrep_emulate_bin_log opens needed code paths to make this
- possible
-*/
-my_bool wsrep_emulate_bin_log = FALSE; // activating parts of binlog interface
+#include <sstream>
+
+/* wsrep-lib */
+Wsrep_server_state* Wsrep_server_state::m_instance;
+
+my_bool wsrep_emulate_bin_log = FALSE; // activating parts of binlog interface
#ifdef GTID_SUPPORT
/* Sidno in global_sid_map corresponding to group uuid */
rpl_sidno wsrep_sidno= -1;
#endif /* GTID_SUPPORT */
my_bool wsrep_preordered_opt= FALSE;
+/* Streaming Replication */
+const char *wsrep_fragment_units[]= { "bytes", "rows", "statements", NullS };
+const char *wsrep_SR_store_types[]= { "none", "table", NullS };
+
/*
* Begin configuration options
*/
@@ -82,7 +93,7 @@ my_bool wsrep_certify_nonPK; // Certify, even when no primary
my_bool wsrep_recovery; // Recovery
my_bool wsrep_replicate_myisam; // Enable MyISAM replication
my_bool wsrep_log_conflicts;
-my_bool wsrep_load_data_splitting; // Commit load data every 10K intervals
+my_bool wsrep_load_data_splitting= 0; // Commit load data every 10K intervals
my_bool wsrep_slave_UK_checks; // Slave thread does UK checks
my_bool wsrep_slave_FK_checks; // Slave thread does FK checks
my_bool wsrep_restart_slave; // Should mysql slave thread be
@@ -107,7 +118,13 @@ my_bool wsrep_restart_slave_activated= 0; // Node has dropped, and slave
bool wsrep_new_cluster= false; // Bootstrap the cluster?
int wsrep_slave_count_change= 0; // No. of appliers to stop/start
int wsrep_to_isolation= 0; // No. of active TO isolation threads
-long wsrep_max_protocol_version= 3; // Maximum protocol version to use
+long wsrep_max_protocol_version= 4; // Maximum protocol version to use
+long int wsrep_protocol_version= wsrep_max_protocol_version;
+ulong wsrep_trx_fragment_unit= WSREP_FRAG_BYTES;
+ // unit for fragment size
+ulong wsrep_SR_store_type= WSREP_SR_STORE_TABLE;
+uint wsrep_ignore_apply_errors= 0;
+
/*
* End configuration options
@@ -123,29 +140,33 @@ mysql_mutex_t LOCK_wsrep_sst;
mysql_cond_t COND_wsrep_sst;
mysql_mutex_t LOCK_wsrep_sst_init;
mysql_cond_t COND_wsrep_sst_init;
-mysql_mutex_t LOCK_wsrep_rollback;
-mysql_cond_t COND_wsrep_rollback;
-wsrep_aborting_thd_t wsrep_aborting_thd= NULL;
mysql_mutex_t LOCK_wsrep_replaying;
mysql_cond_t COND_wsrep_replaying;
mysql_mutex_t LOCK_wsrep_slave_threads;
mysql_mutex_t LOCK_wsrep_desync;
mysql_mutex_t LOCK_wsrep_config_state;
+mysql_mutex_t LOCK_wsrep_SR_pool;
+mysql_mutex_t LOCK_wsrep_SR_store;
int wsrep_replaying= 0;
-ulong wsrep_running_threads = 0; // # of currently running wsrep threads
+ulong wsrep_running_threads= 0; // # of currently running wsrep threads
ulong my_bind_addr;
#ifdef HAVE_PSI_INTERFACE
-PSI_mutex_key key_LOCK_wsrep_rollback,
+PSI_mutex_key
key_LOCK_wsrep_replaying, key_LOCK_wsrep_ready, key_LOCK_wsrep_sst,
key_LOCK_wsrep_sst_thread, key_LOCK_wsrep_sst_init,
key_LOCK_wsrep_slave_threads, key_LOCK_wsrep_desync,
- key_LOCK_wsrep_config_state;
+ key_LOCK_wsrep_config_state,
+ key_LOCK_wsrep_SR_pool,
+ key_LOCK_wsrep_SR_store,
+ key_LOCK_wsrep_thd_queue;
-PSI_cond_key key_COND_wsrep_rollback,
+PSI_cond_key key_COND_wsrep_thd,
key_COND_wsrep_replaying, key_COND_wsrep_ready, key_COND_wsrep_sst,
- key_COND_wsrep_sst_init, key_COND_wsrep_sst_thread;
+ key_COND_wsrep_sst_init, key_COND_wsrep_sst_thread,
+ key_COND_wsrep_thd_queue;
+
PSI_file_key key_file_wsrep_gra_log;
@@ -156,11 +177,12 @@ static PSI_mutex_info wsrep_mutexes[]=
{ &key_LOCK_wsrep_sst_thread, "wsrep_sst_thread", 0},
{ &key_LOCK_wsrep_sst_init, "LOCK_wsrep_sst_init", PSI_FLAG_GLOBAL},
{ &key_LOCK_wsrep_sst, "LOCK_wsrep_sst", PSI_FLAG_GLOBAL},
- { &key_LOCK_wsrep_rollback, "LOCK_wsrep_rollback", PSI_FLAG_GLOBAL},
{ &key_LOCK_wsrep_replaying, "LOCK_wsrep_replaying", PSI_FLAG_GLOBAL},
{ &key_LOCK_wsrep_slave_threads, "LOCK_wsrep_slave_threads", PSI_FLAG_GLOBAL},
{ &key_LOCK_wsrep_desync, "LOCK_wsrep_desync", PSI_FLAG_GLOBAL},
- { &key_LOCK_wsrep_config_state, "LOCK_wsrep_config_state", PSI_FLAG_GLOBAL}
+ { &key_LOCK_wsrep_config_state, "LOCK_wsrep_config_state", PSI_FLAG_GLOBAL},
+ { &key_LOCK_wsrep_SR_pool, "LOCK_wsrep_SR_pool", PSI_FLAG_GLOBAL},
+ { &key_LOCK_wsrep_SR_store, "LOCK_wsrep_SR_store", PSI_FLAG_GLOBAL}
};
static PSI_cond_info wsrep_conds[]=
@@ -169,7 +191,7 @@ static PSI_cond_info wsrep_conds[]=
{ &key_COND_wsrep_sst, "COND_wsrep_sst", PSI_FLAG_GLOBAL},
{ &key_COND_wsrep_sst_init, "COND_wsrep_sst_init", PSI_FLAG_GLOBAL},
{ &key_COND_wsrep_sst_thread, "wsrep_sst_thread", 0},
- { &key_COND_wsrep_rollback, "COND_wsrep_rollback", PSI_FLAG_GLOBAL},
+ { &key_COND_wsrep_thd, "THD::COND_wsrep_thd", 0},
{ &key_COND_wsrep_replaying, "COND_wsrep_replaying", PSI_FLAG_GLOBAL}
};
@@ -179,310 +201,219 @@ static PSI_file_info wsrep_files[]=
};
#endif
-my_bool wsrep_inited = 0; // initialized ?
+my_bool wsrep_inited= 0; // initialized ?
-static wsrep_uuid_t cluster_uuid = WSREP_UUID_UNDEFINED;
+static wsrep_uuid_t node_uuid= WSREP_UUID_UNDEFINED;
+static wsrep_uuid_t cluster_uuid= WSREP_UUID_UNDEFINED;
static char cluster_uuid_str[40]= { 0, };
-static const char* cluster_status_str[WSREP_VIEW_MAX] =
-{
- "Primary",
- "non-Primary",
- "Disconnected"
-};
static char provider_name[256]= { 0, };
static char provider_version[256]= { 0, };
static char provider_vendor[256]= { 0, };
/*
- * wsrep status variables
+ * Wsrep status variables. LOCK_status must be locked When modifying
+ * these variables,
*/
-my_bool wsrep_connected = FALSE;
-my_bool wsrep_ready = FALSE; // node can accept queries
-const char* wsrep_cluster_state_uuid = cluster_uuid_str;
-long long wsrep_cluster_conf_id = WSREP_SEQNO_UNDEFINED;
-const char* wsrep_cluster_status = cluster_status_str[WSREP_VIEW_DISCONNECTED];
-long wsrep_cluster_size = 0;
-long wsrep_local_index = -1;
-long long wsrep_local_bf_aborts = 0;
-const char* wsrep_provider_name = provider_name;
-const char* wsrep_provider_version = provider_version;
-const char* wsrep_provider_vendor = provider_vendor;
+my_bool wsrep_connected = FALSE;
+my_bool wsrep_ready = FALSE;
+const char* wsrep_cluster_state_uuid= cluster_uuid_str;
+long long wsrep_cluster_conf_id = WSREP_SEQNO_UNDEFINED;
+const char* wsrep_cluster_status = "Disconnected";
+long wsrep_cluster_size = 0;
+long wsrep_local_index = -1;
+long long wsrep_local_bf_aborts = 0;
+const char* wsrep_provider_name = provider_name;
+const char* wsrep_provider_version = provider_version;
+const char* wsrep_provider_vendor = provider_vendor;
+char* wsrep_provider_capabilities = NULL;
+char* wsrep_cluster_capabilities = NULL;
/* End wsrep status variables */
-wsrep_uuid_t local_uuid = WSREP_UUID_UNDEFINED;
-wsrep_seqno_t local_seqno = WSREP_SEQNO_UNDEFINED;
-long wsrep_protocol_version = 3;
-
wsp::Config_state *wsrep_config_state;
-// Boolean denoting if server is in initial startup phase. This is needed
-// to make sure that main thread waiting in wsrep_sst_wait() is signaled
-// if there was no state gap on receiving first view event.
-static my_bool wsrep_startup = TRUE;
+wsrep_uuid_t local_uuid = WSREP_UUID_UNDEFINED;
+wsrep_seqno_t local_seqno = WSREP_SEQNO_UNDEFINED;
+wsp::node_status local_status;
-static void wsrep_log_cb(wsrep_log_level_t level, const char *msg) {
- switch (level) {
- case WSREP_LOG_INFO:
- sql_print_information("WSREP: %s", msg);
- break;
- case WSREP_LOG_WARN:
- sql_print_warning("WSREP: %s", msg);
- break;
- case WSREP_LOG_ERROR:
- case WSREP_LOG_FATAL:
+/*
+ */
+Wsrep_schema *wsrep_schema= 0;
+
+static void wsrep_log_cb(wsrep::log::level level, const char *msg)
+{
+ /*
+ Silence all wsrep related logging from lib and provider if
+ wsrep is not enabled.
+ */
+ if (WSREP_ON)
+ {
+ switch (level) {
+ case wsrep::log::info:
+ sql_print_information("WSREP: %s", msg);
+ break;
+ case wsrep::log::warning:
+ sql_print_warning("WSREP: %s", msg);
+ break;
+ case wsrep::log::error:
sql_print_error("WSREP: %s", msg);
break;
- case WSREP_LOG_DEBUG:
- if (wsrep_debug) sql_print_information ("[Debug] WSREP: %s", msg);
- default:
- break;
+ case wsrep::log::debug:
+ if (wsrep_debug) sql_print_information ("[Debug] WSREP: %s", msg);
+ default:
+ break;
+ }
}
}
-void wsrep_log(void (*fun)(const char *, ...), const char *format, ...)
+void wsrep_init_sidno(const wsrep::id& uuid)
{
- va_list args;
- char msg[1024];
- va_start(args, format);
- vsnprintf(msg, sizeof(msg) - 1, format, args);
- va_end(args);
- (fun)("WSREP: %s", msg);
-}
-
-
-static void wsrep_log_states (wsrep_log_level_t const level,
- const wsrep_uuid_t* const group_uuid,
- wsrep_seqno_t const group_seqno,
- const wsrep_uuid_t* const node_uuid,
- wsrep_seqno_t const node_seqno)
-{
- char uuid_str[37];
- char msg[256];
-
- wsrep_uuid_print (group_uuid, uuid_str, sizeof(uuid_str));
- snprintf (msg, 255, "WSREP: Group state: %s:%lld",
- uuid_str, (long long)group_seqno);
- wsrep_log_cb (level, msg);
-
- wsrep_uuid_print (node_uuid, uuid_str, sizeof(uuid_str));
- snprintf (msg, 255, "WSREP: Local state: %s:%lld",
- uuid_str, (long long)node_seqno);
- wsrep_log_cb (level, msg);
-}
-
-#ifdef GTID_SUPPORT
-void wsrep_init_sidno(const wsrep_uuid_t& wsrep_uuid)
-{
- /* generate new Sid map entry from inverted uuid */
- rpl_sid sid;
- wsrep_uuid_t ltid_uuid;
-
- for (size_t i= 0; i < sizeof(ltid_uuid.data); ++i)
+ /*
+ Protocol versions starting from 4 use group gtid as it is.
+ For lesser protocol versions generate new Sid map entry from inverted
+ uuid.
+ */
+ rpl_gtid sid;
+ if (wsrep_protocol_version >= 4)
{
- ltid_uuid.data[i] = ~wsrep_uuid.data[i];
+ memcpy((void*)&sid, (const uchar*)uuid.data(),16);
}
-
- sid.copy_from(ltid_uuid.data);
+ else
+ {
+ wsrep_uuid_t ltid_uuid;
+ for (size_t i= 0; i < sizeof(ltid_uuid.data); ++i)
+ {
+ ltid_uuid.data[i]= ~((const uchar*)uuid.data())[i];
+ }
+ memcpy((void*)&sid, (const uchar*)ltid_uuid.data,16);
+ }
+#ifdef GTID_SUPPORT
global_sid_lock->wrlock();
wsrep_sidno= global_sid_map->add_sid(sid);
WSREP_INFO("Initialized wsrep sidno %d", wsrep_sidno);
global_sid_lock->unlock();
+#endif
}
-#endif /* GTID_SUPPORT */
-static wsrep_cb_status_t
-wsrep_view_handler_cb (void* app_ctx,
- void* recv_ctx,
- const wsrep_view_info_t* view,
- const char* state,
- size_t state_len,
- void** sst_req,
- size_t* sst_req_len)
+void wsrep_init_schema()
{
- *sst_req = NULL;
- *sst_req_len = 0;
-
- wsrep_member_status_t memb_status= wsrep_config_state->get_status();
-
- if (memcmp(&cluster_uuid, &view->state_id.uuid, sizeof(wsrep_uuid_t)))
- {
- memcpy(&cluster_uuid, &view->state_id.uuid, sizeof(cluster_uuid));
-
- wsrep_uuid_print (&cluster_uuid, cluster_uuid_str,
- sizeof(cluster_uuid_str));
- }
-
- wsrep_cluster_conf_id= view->view;
- wsrep_cluster_status= cluster_status_str[view->status];
- wsrep_cluster_size= view->memb_num;
- wsrep_local_index= view->my_idx;
-
- WSREP_INFO("New cluster view: global state: %s:%lld, view# %lld: %s, "
- "number of nodes: %ld, my index: %ld, protocol version %d",
- wsrep_cluster_state_uuid, (long long)view->state_id.seqno,
- (long long)wsrep_cluster_conf_id, wsrep_cluster_status,
- wsrep_cluster_size, wsrep_local_index, view->proto_ver);
-
- /* Proceed further only if view is PRIMARY */
- if (WSREP_VIEW_PRIMARY != view->status)
- {
-#ifdef HAVE_QUERY_CACHE
- // query cache must be initialised by now
- query_cache.flush();
-#endif /* HAVE_QUERY_CACHE */
-
- wsrep_ready_set(FALSE);
- memb_status= WSREP_MEMBER_UNDEFINED;
- /* Always record local_uuid and local_seqno in non-prim since this
- * may lead to re-initializing provider and start position is
- * determined according to these variables */
- // WRONG! local_uuid should be the last primary configuration uuid we were
- // a member of. local_seqno should be updated in commit calls.
- // local_uuid= cluster_uuid;
- // local_seqno= view->first - 1;
- goto out;
- }
+ DBUG_ASSERT(!wsrep_schema);
- switch (view->proto_ver)
+ WSREP_INFO("wsrep_init_schema_and_SR %p", wsrep_schema);
+ if (!wsrep_schema)
{
- case 0:
- case 1:
- case 2:
- case 3:
- // version change
- if (view->proto_ver != wsrep_protocol_version)
- {
- my_bool wsrep_ready_saved= wsrep_ready_get();
- wsrep_ready_set(FALSE);
- WSREP_INFO("closing client connections for "
- "protocol change %ld -> %d",
- wsrep_protocol_version, view->proto_ver);
- wsrep_close_client_connections(TRUE);
- wsrep_protocol_version= view->proto_ver;
- wsrep_ready_set(wsrep_ready_saved);
- }
- break;
- default:
- WSREP_ERROR("Unsupported application protocol version: %d",
- view->proto_ver);
- unireg_abort(1);
- }
-
- if (view->state_gap)
- {
- WSREP_WARN("Gap in state sequence. Need state transfer.");
-
- /* After that wsrep will call wsrep_sst_prepare. */
- /* keep ready flag 0 until we receive the snapshot */
- wsrep_ready_set(FALSE);
-
- /* Close client connections to ensure that they don't interfere
- * with SST. Necessary only if storage engines are initialized
- * before SST.
- * TODO: Just killing all ongoing transactions should be enough
- * since wsrep_ready is OFF and no new transactions can start.
- */
- if (!wsrep_before_SE())
+ wsrep_schema= new Wsrep_schema();
+ if (wsrep_schema->init())
{
- WSREP_DEBUG("[debug]: closing client connections for PRIM");
- wsrep_close_client_connections(FALSE);
+ WSREP_ERROR("Failed to init wsrep schema");
+ unireg_abort(1);
}
+ }
+}
- ssize_t const req_len= wsrep_sst_prepare (sst_req);
+void wsrep_deinit_schema()
+{
+ delete wsrep_schema;
+ wsrep_schema= 0;
+}
- if (req_len < 0)
+void wsrep_recover_sr_from_storage(THD *orig_thd)
+{
+ switch (wsrep_SR_store_type)
+ {
+ case WSREP_SR_STORE_TABLE:
+ if (!wsrep_schema)
{
- WSREP_ERROR("SST preparation failed: %zd (%s)", -req_len,
- strerror(-req_len));
- memb_status= WSREP_MEMBER_UNDEFINED;
+ WSREP_ERROR("Wsrep schema not initialized when trying to recover "
+ "streaming transactions");
+ unireg_abort(1);
}
- else
+ if (wsrep_schema->recover_sr_transactions(orig_thd))
{
- assert(sst_req != NULL);
- *sst_req_len= req_len;
- memb_status= WSREP_MEMBER_JOINER;
+ WSREP_ERROR("Failed to recover SR transactions from schema");
+ unireg_abort(1);
}
+ break;
+ default:
+ /* */
+ WSREP_ERROR("Unsupported wsrep SR store type: %lu", wsrep_SR_store_type);
+ unireg_abort(1);
+ break;
}
- else
- {
- /*
- * NOTE: Initialize wsrep_group_uuid here only if it wasn't initialized
- * before - OR - it was reinitilized on startup (lp:992840)
- */
- if (wsrep_startup)
+}
+
+/** Export the WSREP provider's capabilities as a human readable string.
+ * The result is saved in a dynamically allocated string of the form:
+ * :cap1:cap2:cap3:
+ */
+static void wsrep_capabilities_export(wsrep_cap_t const cap, char** str)
+{
+ static const char* names[] =
+ {
+ /* Keep in sync with wsrep/wsrep_api.h WSREP_CAP_* macros. */
+ "MULTI_MASTER",
+ "CERTIFICATION",
+ "PARALLEL_APPLYING",
+ "TRX_REPLAY",
+ "ISOLATION",
+ "PAUSE",
+ "CAUSAL_READS",
+ "CAUSAL_TRX",
+ "INCREMENTAL_WRITESET",
+ "SESSION_LOCKS",
+ "DISTRIBUTED_LOCKS",
+ "CONSISTENCY_CHECK",
+ "UNORDERED",
+ "ANNOTATION",
+ "PREORDERED",
+ "STREAMING",
+ "SNAPSHOT",
+ "NBO",
+ };
+
+ std::string s;
+ for (size_t i= 0; i < sizeof(names) / sizeof(names[0]); ++i)
+ {
+ if (cap & (1ULL << i))
{
- if (wsrep_before_SE())
+ if (s.empty())
{
- wsrep_SE_init_grab();
- // Signal mysqld init thread to continue
- wsrep_sst_complete (&cluster_uuid, view->state_id.seqno, false);
- // and wait for SE initialization
- wsrep_SE_init_wait();
+ s= ":";
}
- else
- {
- local_uuid= cluster_uuid;
- local_seqno= view->state_id.seqno;
- }
- /* Init storage engine XIDs from first view */
- wsrep_set_SE_checkpoint(local_uuid, local_seqno);
-#ifdef GTID_SUPPORT
- wsrep_init_sidno(local_uuid);
-#endif /* GTID_SUPPORT */
- memb_status= WSREP_MEMBER_JOINED;
- }
-
- // just some sanity check
- if (memcmp (&local_uuid, &cluster_uuid, sizeof (wsrep_uuid_t)))
- {
- WSREP_ERROR("Undetected state gap. Can't continue.");
- wsrep_log_states(WSREP_LOG_FATAL, &cluster_uuid, view->state_id.seqno,
- &local_uuid, -1);
- unireg_abort(1);
+ s += names[i];
+ s += ":";
}
}
- if (wsrep_auto_increment_control)
- {
- global_system_variables.auto_increment_offset= view->my_idx + 1;
- global_system_variables.auto_increment_increment= view->memb_num;
- }
+ /* A read from the string pointed to by *str may be started at any time,
+ * so it must never point to free(3)d memory or non '\0' terminated string. */
- { /* capabilities may be updated on new configuration */
- uint64_t const caps(wsrep->capabilities (wsrep));
-
- my_bool const idc((caps & WSREP_CAP_INCREMENTAL_WRITESET) != 0);
- if (TRUE == wsrep_incremental_data_collection && FALSE == idc)
- {
- WSREP_WARN("Unsupported protocol downgrade: "
- "incremental data collection disabled. Expect abort.");
- }
- wsrep_incremental_data_collection = idc;
- }
+ char* const previous= *str;
-out:
- if (view->status == WSREP_VIEW_PRIMARY) wsrep_startup= FALSE;
- wsrep_config_state->set(memb_status, view);
+ *str= strdup(s.c_str());
- return WSREP_CB_SUCCESS;
+ if (previous != NULL)
+ {
+ free(previous);
+ }
}
-my_bool wsrep_ready_set (my_bool x)
+/* Verifies that SE position is consistent with the group position
+ * and initializes other variables */
+void wsrep_verify_SE_checkpoint(const wsrep_uuid_t& uuid,
+ wsrep_seqno_t const seqno)
{
- WSREP_DEBUG("Setting wsrep_ready to %d", x);
- if (mysql_mutex_lock (&LOCK_wsrep_ready)) abort();
- my_bool ret= (wsrep_ready != x);
- if (ret)
- {
- wsrep_ready= x;
- mysql_cond_signal (&COND_wsrep_ready);
- }
- mysql_mutex_unlock (&LOCK_wsrep_ready);
- return ret;
}
+/*
+ Wsrep is considered ready if
+ 1) Provider is not loaded (native mode)
+ 2) Server has reached synced state
+ 3) Server is in joiner mode and mysqldump SST method has been
+ specified
+ See Wsrep_server_service::log_state_change() for further details.
+ */
my_bool wsrep_ready_get (void)
{
if (mysql_mutex_lock (&LOCK_wsrep_ready)) abort();
@@ -499,178 +430,67 @@ int wsrep_show_ready(THD *thd, SHOW_VAR *var, char *buff)
return 0;
}
-// Wait until wsrep has reached ready state
-void wsrep_ready_wait ()
+void wsrep_update_cluster_state_uuid(const char* uuid)
{
- if (mysql_mutex_lock (&LOCK_wsrep_ready)) abort();
- while (!wsrep_ready)
- {
- WSREP_INFO("Waiting to reach ready state");
- mysql_cond_wait (&COND_wsrep_ready, &LOCK_wsrep_ready);
- }
- WSREP_INFO("ready state reached");
- mysql_mutex_unlock (&LOCK_wsrep_ready);
+ strncpy(cluster_uuid_str, uuid, sizeof(cluster_uuid_str) - 1);
}
-static void wsrep_synced_cb(void* app_ctx)
+static void wsrep_init_position()
{
- WSREP_INFO("Synchronized with group, ready for connections");
- my_bool signal_main= wsrep_ready_set(TRUE);
- wsrep_config_state->set(WSREP_MEMBER_SYNCED);
-
- if (signal_main)
- {
- wsrep_SE_init_grab();
- // Signal mysqld init thread to continue
- wsrep_sst_complete (&local_uuid, local_seqno, false);
- // and wait for SE initialization
- wsrep_SE_init_wait();
- }
- if (wsrep_restart_slave_activated)
- {
- int rcode;
- WSREP_INFO("MariaDB slave restart");
- wsrep_restart_slave_activated= FALSE;
-
- mysql_mutex_lock(&LOCK_active_mi);
- if ((rcode = start_slave_threads(0,
- 1 /* need mutex */,
- 0 /* no wait for start*/,
- active_mi,
- master_info_file,
- relay_log_info_file,
- SLAVE_SQL)))
- {
- WSREP_WARN("Failed to create slave threads: %d", rcode);
- }
- mysql_mutex_unlock(&LOCK_active_mi);
-
- }
}
-static void wsrep_init_position()
+/****************************************************************************
+ Helpers for wsrep_init()
+ ****************************************************************************/
+static std::string wsrep_server_name()
{
- /* read XIDs from storage engines */
- wsrep_uuid_t uuid;
- wsrep_seqno_t seqno;
- wsrep_get_SE_checkpoint(uuid, seqno);
-
- if (!memcmp(&uuid, &WSREP_UUID_UNDEFINED, sizeof(wsrep_uuid_t)))
- {
- WSREP_INFO("Read nil XID from storage engines, skipping position init");
- return;
- }
-
- char uuid_str[40] = {0, };
- wsrep_uuid_print(&uuid, uuid_str, sizeof(uuid_str));
- WSREP_INFO("Initial position: %s:%lld", uuid_str, (long long)seqno);
-
- if (!memcmp(&local_uuid, &WSREP_UUID_UNDEFINED, sizeof(local_uuid)) &&
- local_seqno == WSREP_SEQNO_UNDEFINED)
- {
- // Initial state
- local_uuid= uuid;
- local_seqno= seqno;
- }
- else if (memcmp(&local_uuid, &uuid, sizeof(local_uuid)) ||
- local_seqno != seqno)
- {
- WSREP_WARN("Initial position was provided by configuration or SST, "
- "avoiding override");
- }
+ std::string ret(wsrep_node_name ? wsrep_node_name : "");
+ return ret;
}
-extern char* my_bind_addr_str;
-
-int wsrep_init()
+static std::string wsrep_server_id()
{
- int rcode= -1;
- DBUG_ASSERT(wsrep_inited == 0);
-
- if (strcmp(wsrep_start_position, WSREP_START_POSITION_ZERO) &&
- wsrep_start_position_init(wsrep_start_position))
- {
- return 1;
- }
-
- wsrep_sst_auth_init();
-
- wsrep_ready_set(FALSE);
- assert(wsrep_provider);
-
- wsrep_init_position();
-
- if ((rcode= wsrep_load(wsrep_provider, &wsrep, wsrep_log_cb)) != WSREP_OK)
- {
- if (strcasecmp(wsrep_provider, WSREP_NONE))
- {
- WSREP_ERROR("wsrep_load(%s) failed: %s (%d). Reverting to no provider.",
- wsrep_provider, strerror(rcode), rcode);
- strcpy((char*)wsrep_provider, WSREP_NONE); // damn it's a dirty hack
- return wsrep_init();
- }
- else /* this is for recursive call above */
- {
- WSREP_ERROR("Could not revert to no provider: %s (%d). Need to abort.",
- strerror(rcode), rcode);
- unireg_abort(1);
- }
- }
+ /* using empty server_id, which enables view change handler to
+ set final server_id later on
+ */
+ std::string ret("");
+ return ret;
+}
- if (!WSREP_PROVIDER_EXISTS)
- {
- // enable normal operation in case no provider is specified
- wsrep_ready_set(TRUE);
- wsrep_inited= 1;
- global_system_variables.wsrep_on = 0;
- wsrep_init_args args;
- args.logger_cb = wsrep_log_cb;
- args.options = (wsrep_provider_options) ?
- wsrep_provider_options : "";
- rcode = wsrep->init(wsrep, &args);
- if (rcode)
- {
- DBUG_PRINT("wsrep",("wsrep::init() failed: %d", rcode));
- WSREP_ERROR("wsrep::init() failed: %d, must shutdown", rcode);
- wsrep->free(wsrep);
- free(wsrep);
- wsrep = NULL;
- }
- return rcode;
- }
- else
- {
- global_system_variables.wsrep_on = 1;
- strncpy(provider_name,
- wsrep->provider_name, sizeof(provider_name) - 1);
- strncpy(provider_version,
- wsrep->provider_version, sizeof(provider_version) - 1);
- strncpy(provider_vendor,
- wsrep->provider_vendor, sizeof(provider_vendor) - 1);
- }
+static std::string wsrep_server_node_address()
+{
+ std::string ret;
if (!wsrep_data_home_dir || strlen(wsrep_data_home_dir) == 0)
- wsrep_data_home_dir = mysql_real_data_home;
+ wsrep_data_home_dir= mysql_real_data_home;
/* Initialize node address */
- char node_addr[512]= { 0, };
- size_t const node_addr_max= sizeof(node_addr) - 1;
if (!wsrep_node_address || !strcmp(wsrep_node_address, ""))
{
- size_t const ret= wsrep_guess_ip(node_addr, node_addr_max);
- if (!(ret > 0 && ret < node_addr_max))
+ char node_addr[512]= {0, };
+ const size_t node_addr_max= sizeof(node_addr) - 1;
+ size_t guess_ip_ret= wsrep_guess_ip(node_addr, node_addr_max);
+ if (!(guess_ip_ret > 0 && guess_ip_ret < node_addr_max))
{
WSREP_WARN("Failed to guess base node address. Set it explicitly via "
"wsrep_node_address.");
- node_addr[0]= '\0';
+ }
+ else
+ {
+ ret= node_addr;
}
}
else
{
- strncpy(node_addr, wsrep_node_address, node_addr_max);
+ ret= wsrep_node_address;
}
+ return ret;
+}
- /* Initialize node's incoming address */
+static std::string wsrep_server_incoming_address()
+{
+ std::string ret;
+ const std::string node_addr(wsrep_server_node_address());
char inc_addr[512]= { 0, };
size_t const inc_addr_max= sizeof (inc_addr);
@@ -685,7 +505,8 @@ int wsrep_init()
bool is_ipv6= false;
unsigned int my_bind_ip= INADDR_ANY; // default if not set
- if (my_bind_addr_str && strlen(my_bind_addr_str))
+ if (my_bind_addr_str && strlen(my_bind_addr_str) &&
+ strcmp(my_bind_addr_str, "*") != 0)
{
my_bind_ip= wsrep_check_ip(my_bind_addr_str, &is_ipv6);
}
@@ -704,22 +525,28 @@ int wsrep_init()
}
else /* mysqld binds to 0.0.0.0, try taking IP from wsrep_node_address. */
{
- size_t const node_addr_len= strlen(node_addr);
- if (node_addr_len > 0)
+ if (node_addr.size())
{
- wsp::Address addr(node_addr);
-
- if (!addr.is_valid())
+ size_t const ip_len= wsrep_host_len(node_addr.c_str(), node_addr.size());
+ if (ip_len + 7 /* :55555\0 */ < inc_addr_max)
{
- WSREP_DEBUG("Could not parse node address : %s", node_addr);
- WSREP_WARN("Guessing address for incoming client connections failed. "
- "Try setting wsrep_node_incoming_address explicitly.");
- goto done;
+ memcpy (inc_addr, node_addr.c_str(), ip_len);
+ snprintf(inc_addr + ip_len, inc_addr_max - ip_len, ":%u",
+ (int)mysqld_port);
}
+ else
+ {
+ WSREP_WARN("Guessing address for incoming client connections: "
+ "address too long.");
+ inc_addr[0]= '\0';
+ }
+ }
- const char *fmt= (addr.is_ipv6()) ? "[%s]:%u" : "%s:%u";
- snprintf(inc_addr, inc_addr_max, fmt, addr.get_address(),
- (int) mysqld_port);
+ if (!strlen(inc_addr))
+ {
+ WSREP_WARN("Guessing address for incoming client connections failed. "
+ "Try setting wsrep_node_incoming_address explicitly.");
+ WSREP_INFO("Node addr: %s", node_addr.c_str());
}
}
}
@@ -743,52 +570,178 @@ int wsrep_init()
snprintf(inc_addr, inc_addr_max, fmt, addr.get_address(), port);
}
+
+ done:
+ ret= wsrep_node_incoming_address;
+ return ret;
+}
-done:
- struct wsrep_init_args wsrep_args;
+static std::string wsrep_server_working_dir()
+{
+ std::string ret;
+ if (!wsrep_data_home_dir || strlen(wsrep_data_home_dir) == 0)
+ {
+ ret= mysql_real_data_home;
+ }
+ else
+ {
+ ret= wsrep_data_home_dir;
+ }
+ return ret;
+}
- struct wsrep_gtid const state_id = { local_uuid, local_seqno };
+static wsrep::gtid wsrep_server_initial_position()
+{
+ wsrep::gtid ret;
+ WSREP_INFO("Server initial position: %s", wsrep_start_position);
+ std::istringstream is(wsrep_start_position);
+ is >> ret;
+ return ret;
+}
- wsrep_args.data_dir = wsrep_data_home_dir;
- wsrep_args.node_name = (wsrep_node_name) ? wsrep_node_name : "";
- wsrep_args.node_address = node_addr;
- wsrep_args.node_incoming = inc_addr;
- wsrep_args.options = (wsrep_provider_options) ?
- wsrep_provider_options : "";
- wsrep_args.proto_ver = wsrep_max_protocol_version;
+/*
+ Intitialize provider specific status variables
+ */
+static void wsrep_init_provider_status_variables()
+{
+ const wsrep::provider& provider=
+ Wsrep_server_state::instance().provider();
+ strncpy(provider_name,
+ provider.name().c_str(), sizeof(provider_name) - 1);
+ strncpy(provider_version,
+ provider.version().c_str(), sizeof(provider_version) - 1);
+ strncpy(provider_vendor,
+ provider.vendor().c_str(), sizeof(provider_vendor) - 1);
+}
+
+int wsrep_init_server()
+{
+ wsrep::log::logger_fn(wsrep_log_cb);
+ try
+ {
+ std::string server_name;
+ std::string server_id;
+ std::string node_address;
+ std::string incoming_address;
+ std::string working_dir;
+ wsrep::gtid initial_position;
+
+ server_name= wsrep_server_name();
+ server_id= wsrep_server_id();
+ node_address= wsrep_server_node_address();
+ incoming_address= wsrep_server_incoming_address();
+ working_dir= wsrep_server_working_dir();
+ initial_position= wsrep_server_initial_position();
+
+ Wsrep_server_state::init_once(server_name,
+ incoming_address,
+ node_address,
+ working_dir,
+ initial_position,
+ wsrep_max_protocol_version);
+ }
+ catch (const wsrep::runtime_error& e)
+ {
+ WSREP_ERROR("Failed to init wsrep server %s", e.what());
+ return 1;
+ }
+ catch (const std::exception& e)
+ {
+ WSREP_ERROR("Failed to init wsrep server %s", e.what());
+ }
+ return 0;
+}
- wsrep_args.state_id = &state_id;
+void wsrep_init_globals()
+{
+ wsrep_init_sidno(Wsrep_server_state::instance().connected_gtid().id());
+ wsrep_init_schema();
+ if (WSREP_ON)
+ {
+ Wsrep_server_state::instance().initialized();
+ }
+}
- wsrep_args.logger_cb = wsrep_log_cb;
- wsrep_args.view_handler_cb = wsrep_view_handler_cb;
- wsrep_args.apply_cb = wsrep_apply_cb;
- wsrep_args.commit_cb = wsrep_commit_cb;
- wsrep_args.unordered_cb = wsrep_unordered_cb;
- wsrep_args.sst_donate_cb = wsrep_sst_donate_cb;
- wsrep_args.synced_cb = wsrep_synced_cb;
+void wsrep_deinit_server()
+{
+ wsrep_deinit_schema();
+ Wsrep_server_state::destroy();
+}
- rcode = wsrep->init(wsrep, &wsrep_args);
+int wsrep_init()
+{
+ assert(wsrep_provider);
- if (rcode)
+ wsrep_init_position();
+ wsrep_sst_auth_init();
+
+ if (strlen(wsrep_provider)== 0 ||
+ !strcmp(wsrep_provider, WSREP_NONE))
{
- DBUG_PRINT("wsrep",("wsrep::init() failed: %d", rcode));
- WSREP_ERROR("wsrep::init() failed: %d, must shutdown", rcode);
- wsrep->free(wsrep);
- free(wsrep);
- wsrep = NULL;
- } else {
- wsrep_inited= 1;
+ // enable normal operation in case no provider is specified
+ global_system_variables.wsrep_on= 0;
+ int err= Wsrep_server_state::instance().load_provider(wsrep_provider, wsrep_provider_options ? wsrep_provider_options : "");
+ if (err)
+ {
+ DBUG_PRINT("wsrep",("wsrep::init() failed: %d", err));
+ WSREP_ERROR("wsrep::init() failed: %d, must shutdown", err);
+ }
+ else
+ {
+ wsrep_init_provider_status_variables();
+ }
+ return err;
}
- return rcode;
-}
+ global_system_variables.wsrep_on= 1;
+
+ if (wsrep_gtid_mode && opt_bin_log && !opt_log_slave_updates)
+ {
+ WSREP_ERROR("Option --log-slave-updates is required if "
+ "binlog is enabled, GTID mode is on and wsrep provider "
+ "is specified");
+ return 1;
+ }
+
+ if (!wsrep_data_home_dir || strlen(wsrep_data_home_dir) == 0)
+ wsrep_data_home_dir= mysql_real_data_home;
+
+ if (Wsrep_server_state::instance().load_provider(wsrep_provider,
+ wsrep_provider_options))
+ {
+ WSREP_ERROR("Failed to load provider");
+ return 1;
+ }
+
+ if (!wsrep_provider_is_SR_capable() &&
+ global_system_variables.wsrep_trx_fragment_size > 0)
+ {
+ WSREP_ERROR("The WSREP provider (%s) does not support streaming "
+ "replication but wsrep_trx_fragment_size is set to a "
+ "value other than 0 (%llu). Cannot continue. Either set "
+ "wsrep_trx_fragment_size to 0 or use wsrep_provider that "
+ "supports streaming replication.",
+ wsrep_provider, global_system_variables.wsrep_trx_fragment_size);
+ Wsrep_server_state::instance().unload_provider();
+ return 1;
+ }
+ wsrep_inited= 1;
+
+ wsrep_init_provider_status_variables();
+ wsrep_capabilities_export(Wsrep_server_state::instance().provider().capabilities(),
+ &wsrep_provider_capabilities);
+
+ WSREP_DEBUG("SR storage init for: %s",
+ (wsrep_SR_store_type == WSREP_SR_STORE_TABLE) ? "table" : "void");
+ return 0;
+}
/* Initialize wsrep thread LOCKs and CONDs */
void wsrep_thr_init()
{
DBUG_ENTER("wsrep_thr_init");
- wsrep_config_state = new wsp::Config_state;
+ wsrep_config_state= new wsp::Config_state;
#ifdef HAVE_PSI_INTERFACE
mysql_mutex_register("sql", wsrep_mutexes, array_elements(wsrep_mutexes));
mysql_cond_register("sql", wsrep_conds, array_elements(wsrep_conds));
@@ -801,25 +754,24 @@ void wsrep_thr_init()
mysql_cond_init(key_COND_wsrep_sst, &COND_wsrep_sst, NULL);
mysql_mutex_init(key_LOCK_wsrep_sst_init, &LOCK_wsrep_sst_init, MY_MUTEX_INIT_FAST);
mysql_cond_init(key_COND_wsrep_sst_init, &COND_wsrep_sst_init, NULL);
- mysql_mutex_init(key_LOCK_wsrep_rollback, &LOCK_wsrep_rollback, MY_MUTEX_INIT_FAST);
- mysql_cond_init(key_COND_wsrep_rollback, &COND_wsrep_rollback, NULL);
mysql_mutex_init(key_LOCK_wsrep_replaying, &LOCK_wsrep_replaying, MY_MUTEX_INIT_FAST);
mysql_cond_init(key_COND_wsrep_replaying, &COND_wsrep_replaying, NULL);
mysql_mutex_init(key_LOCK_wsrep_slave_threads, &LOCK_wsrep_slave_threads, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_LOCK_wsrep_desync, &LOCK_wsrep_desync, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_LOCK_wsrep_config_state, &LOCK_wsrep_config_state, MY_MUTEX_INIT_FAST);
+ mysql_mutex_init(key_LOCK_wsrep_SR_pool,
+ &LOCK_wsrep_SR_pool, MY_MUTEX_INIT_FAST);
+ mysql_mutex_init(key_LOCK_wsrep_SR_store,
+ &LOCK_wsrep_SR_store, MY_MUTEX_INIT_FAST);
DBUG_VOID_RETURN;
}
-void wsrep_init_startup (bool first)
+void wsrep_init_startup (bool sst_first)
{
if (wsrep_init()) unireg_abort(1);
- wsrep_thr_lock_init(
- (wsrep_thd_is_brute_force_fun)wsrep_thd_is_BF,
- (wsrep_abort_thd_fun)wsrep_abort_thd,
- wsrep_debug, wsrep_convert_LOCK_to_trx,
- (wsrep_on_fun)wsrep_on);
+ wsrep_thr_lock_init(wsrep_thd_is_BF, wsrep_thd_bf_abort,
+ wsrep_debug, wsrep_convert_LOCK_to_trx, wsrep_on);
/*
Pre-initialize global_system_variables.table_plugin with a dummy engine
@@ -838,28 +790,54 @@ void wsrep_init_startup (bool first)
/* Skip replication start if no cluster address */
if (!wsrep_cluster_address || wsrep_cluster_address[0] == 0) return;
- if (first) wsrep_sst_grab(); // do it so we can wait for SST below
-
+ /*
+ Read value of wsrep_new_cluster before wsrep_start_replication(),
+ the value is reset to FALSE inside wsrep_start_replication.
+ */
if (!wsrep_start_replication()) unireg_abort(1);
wsrep_create_rollbacker();
wsrep_create_appliers(1);
- if (first && !wsrep_sst_wait()) unireg_abort(1);// wait until SST is completed
+ Wsrep_server_state& server_state= Wsrep_server_state::instance();
+ /*
+ If the SST happens before server initialization, wait until the server
+ state reaches initializing. This indicates that
+ either SST was not necessary or SST has been delivered.
+
+ With mysqldump SST (!sst_first) wait until the server reaches
+ joiner state and procedd to accepting connections.
+ */
+ if (sst_first)
+ {
+ server_state.wait_until_state(Wsrep_server_state::s_initializing);
+ }
+ else
+ {
+ server_state.wait_until_state(Wsrep_server_state::s_joiner);
+ }
}
void wsrep_deinit(bool free_options)
{
DBUG_ASSERT(wsrep_inited == 1);
- wsrep_unload(wsrep);
- wsrep= 0;
+ WSREP_DEBUG("wsrep_deinit");
+
+ Wsrep_server_state::instance().unload_provider();
provider_name[0]= '\0';
provider_version[0]= '\0';
provider_vendor[0]= '\0';
wsrep_inited= 0;
+ if (wsrep_provider_capabilities != NULL)
+ {
+ char* p= wsrep_provider_capabilities;
+ wsrep_provider_capabilities= NULL;
+ free(p);
+ }
+
if (free_options)
{
wsrep_sst_auth_free();
@@ -871,28 +849,37 @@ void wsrep_thr_deinit()
{
if (!wsrep_config_state)
return; // Never initialized
+ WSREP_DEBUG("wsrep_thr_deinit");
mysql_mutex_destroy(&LOCK_wsrep_ready);
mysql_cond_destroy(&COND_wsrep_ready);
mysql_mutex_destroy(&LOCK_wsrep_sst);
mysql_cond_destroy(&COND_wsrep_sst);
mysql_mutex_destroy(&LOCK_wsrep_sst_init);
mysql_cond_destroy(&COND_wsrep_sst_init);
- mysql_mutex_destroy(&LOCK_wsrep_rollback);
- mysql_cond_destroy(&COND_wsrep_rollback);
mysql_mutex_destroy(&LOCK_wsrep_replaying);
mysql_cond_destroy(&COND_wsrep_replaying);
mysql_mutex_destroy(&LOCK_wsrep_slave_threads);
mysql_mutex_destroy(&LOCK_wsrep_desync);
mysql_mutex_destroy(&LOCK_wsrep_config_state);
+ mysql_mutex_destroy(&LOCK_wsrep_SR_pool);
+ mysql_mutex_destroy(&LOCK_wsrep_SR_store);
+
delete wsrep_config_state;
wsrep_config_state= 0; // Safety
+
+ if (wsrep_cluster_capabilities != NULL)
+ {
+ char* p= wsrep_cluster_capabilities;
+ wsrep_cluster_capabilities= NULL;
+ free(p);
+ }
}
void wsrep_recover()
{
char uuid_str[40];
- if (!memcmp(&local_uuid, &WSREP_UUID_UNDEFINED, sizeof(wsrep_uuid_t)) &&
+ if (wsrep_uuid_compare(&local_uuid, &WSREP_UUID_UNDEFINED) == 0 &&
local_seqno == -2)
{
wsrep_uuid_print(&local_uuid, uuid_str, sizeof(uuid_str));
@@ -900,43 +887,60 @@ void wsrep_recover()
uuid_str, (long long)local_seqno);
return;
}
- wsrep_uuid_t uuid;
- wsrep_seqno_t seqno;
- wsrep_get_SE_checkpoint(uuid, seqno);
- wsrep_uuid_print(&uuid, uuid_str, sizeof(uuid_str));
- WSREP_INFO("Recovered position: %s:%lld", uuid_str, (long long)seqno);
+ wsrep::gtid gtid= wsrep_get_SE_checkpoint();
+ std::ostringstream oss;
+ oss << gtid;
+ WSREP_INFO("Recovered position: %s", oss.str().c_str());
}
void wsrep_stop_replication(THD *thd)
{
WSREP_INFO("Stop replication");
- if (!wsrep)
+ if (Wsrep_server_state::instance().state() !=
+ Wsrep_server_state::s_disconnected)
{
- WSREP_INFO("Provider was not loaded, in stop replication");
- return;
+ WSREP_DEBUG("Disconnect provider");
+ Wsrep_server_state::instance().disconnect();
+ Wsrep_server_state::instance().wait_until_state(Wsrep_server_state::s_disconnected);
}
- /* disconnect from group first to get wsrep_ready == FALSE */
- WSREP_DEBUG("Provider disconnect");
- wsrep->disconnect(wsrep);
+ /* my connection, should not terminate with wsrep_close_client_connection(),
+ make transaction to rollback
+ */
+ if (thd && !thd->wsrep_applier) trans_rollback(thd);
+ wsrep_close_client_connections(TRUE, thd);
+
+ /* wait until appliers have stopped */
+ wsrep_wait_appliers_close(thd);
+
+ node_uuid= WSREP_UUID_UNDEFINED;
+}
- wsrep_connected= FALSE;
+void wsrep_shutdown_replication()
+{
+ WSREP_INFO("Shutdown replication");
+ if (Wsrep_server_state::instance().state() != wsrep::server_state::s_disconnected)
+ {
+ WSREP_DEBUG("Disconnect provider");
+ Wsrep_server_state::instance().disconnect();
+ Wsrep_server_state::instance().wait_until_state(Wsrep_server_state::s_disconnected);
+ }
wsrep_close_client_connections(TRUE);
/* wait until appliers have stopped */
- wsrep_wait_appliers_close(thd);
+ wsrep_wait_appliers_close(NULL);
+ node_uuid= WSREP_UUID_UNDEFINED;
- return;
+ /* Undocking the thread specific data. */
+ my_pthread_setspecific_ptr(THR_THD, NULL);
}
bool wsrep_start_replication()
{
- wsrep_status_t rcode;
-
- /* wsrep provider must be loaded. */
- DBUG_ASSERT(wsrep);
+ int rcode;
+ WSREP_DEBUG("wsrep_start_replication");
/*
if provider is trivial, don't even try to connect,
@@ -945,34 +949,27 @@ bool wsrep_start_replication()
if (!WSREP_PROVIDER_EXISTS)
{
// enable normal operation in case no provider is specified
- wsrep_ready_set(TRUE);
return true;
}
if (!wsrep_cluster_address || wsrep_cluster_address[0]== 0)
{
// if provider is non-trivial, but no address is specified, wait for address
- wsrep_ready_set(FALSE);
return true;
}
- bool const bootstrap= wsrep_new_cluster;
+ bool const bootstrap(TRUE == wsrep_new_cluster);
+ wsrep_new_cluster= FALSE;
WSREP_INFO("Start replication");
- if (wsrep_new_cluster)
+ if ((rcode= Wsrep_server_state::instance().connect(
+ wsrep_cluster_name,
+ wsrep_cluster_address,
+ wsrep_sst_donor,
+ bootstrap)))
{
- WSREP_INFO("'wsrep-new-cluster' option used, bootstrapping the cluster");
- wsrep_new_cluster= false;
- }
-
- if ((rcode = wsrep->connect(wsrep,
- wsrep_cluster_name,
- wsrep_cluster_address,
- wsrep_sst_donor,
- bootstrap)))
- {
- DBUG_PRINT("wsrep",("wsrep->connect(%s) failed: %d",
+ DBUG_PRINT("wsrep",("wsrep_ptr->connect(%s) failed: %d",
wsrep_cluster_address, rcode));
WSREP_ERROR("wsrep::connect(%s) failed: %d",
wsrep_cluster_address, rcode);
@@ -980,15 +977,12 @@ bool wsrep_start_replication()
}
else
{
- wsrep_connected= TRUE;
-
- char* opts= wsrep->options_get(wsrep);
- if (opts)
+ try
{
- wsrep_provider_options_init(opts);
- free(opts);
+ std::string opts= Wsrep_server_state::instance().provider().options();
+ wsrep_provider_options_init(opts.c_str());
}
- else
+ catch (const wsrep::runtime_error&)
{
WSREP_WARN("Failed to get wsrep options");
}
@@ -999,40 +993,50 @@ bool wsrep_start_replication()
bool wsrep_must_sync_wait (THD* thd, uint mask)
{
- return (thd->variables.wsrep_sync_wait & mask) &&
+ bool ret;
+ mysql_mutex_lock(&thd->LOCK_thd_data);
+ ret= (thd->variables.wsrep_sync_wait & mask) &&
+ thd->wsrep_client_thread &&
thd->variables.wsrep_on &&
!(thd->variables.wsrep_dirty_reads &&
!is_update_query(thd->lex->sql_command)) &&
!thd->in_active_multi_stmt_transaction() &&
- thd->wsrep_conflict_state != REPLAYING &&
- thd->wsrep_sync_wait_gtid.seqno == WSREP_SEQNO_UNDEFINED;
+ thd->wsrep_trx().state() !=
+ wsrep::transaction::s_replaying &&
+ thd->wsrep_cs().sync_wait_gtid().is_undefined();
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
+ return ret;
}
bool wsrep_sync_wait (THD* thd, uint mask)
{
if (wsrep_must_sync_wait(thd, mask))
{
- WSREP_DEBUG("wsrep_sync_wait: thd->variables.wsrep_sync_wait = %u, mask = %u",
- thd->variables.wsrep_sync_wait, mask);
- // This allows autocommit SELECTs and a first SELECT after SET AUTOCOMMIT=0
- // TODO: modify to check if thd has locked any rows.
- wsrep_status_t ret= wsrep->causal_read (wsrep, &thd->wsrep_sync_wait_gtid);
-
- if (unlikely(WSREP_OK != ret))
+ WSREP_DEBUG("wsrep_sync_wait: thd->variables.wsrep_sync_wait= %u, "
+ "mask= %u, thd->variables.wsrep_on= %d",
+ thd->variables.wsrep_sync_wait, mask,
+ thd->variables.wsrep_on);
+ /*
+ This allows autocommit SELECTs and a first SELECT after SET AUTOCOMMIT=0
+ TODO: modify to check if thd has locked any rows.
+ */
+ if (thd->wsrep_cs().sync_wait(-1))
{
const char* msg;
int err;
- // Possibly relevant error codes:
- // ER_CHECKREAD, ER_ERROR_ON_READ, ER_INVALID_DEFAULT, ER_EMPTY_QUERY,
- // ER_FUNCTION_NOT_DEFINED, ER_NOT_ALLOWED_COMMAND, ER_NOT_SUPPORTED_YET,
- // ER_FEATURE_DISABLED, ER_QUERY_INTERRUPTED
+ /*
+ Possibly relevant error codes:
+ ER_CHECKREAD, ER_ERROR_ON_READ, ER_INVALID_DEFAULT, ER_EMPTY_QUERY,
+ ER_FUNCTION_NOT_DEFINED, ER_NOT_ALLOWED_COMMAND, ER_NOT_SUPPORTED_YET,
+ ER_FEATURE_DISABLED, ER_QUERY_INTERRUPTED
+ */
- switch (ret)
+ switch (thd->wsrep_cs().current_error())
{
- case WSREP_NOT_IMPLEMENTED:
+ case wsrep::e_not_supported_error:
msg= "synchronous reads by wsrep backend. "
- "Please unset wsrep_causal_reads variable.";
+ "Please unset wsrep_causal_reads variable.";
err= ER_NOT_SUPPORTED_YET;
break;
default:
@@ -1050,6 +1054,27 @@ bool wsrep_sync_wait (THD* thd, uint mask)
return false;
}
+enum wsrep::provider::status
+wsrep_sync_wait_upto (THD* thd,
+ wsrep_gtid_t* upto,
+ int timeout)
+{
+ DBUG_ASSERT(upto);
+ enum wsrep::provider::status ret;
+ if (upto)
+ {
+ wsrep::gtid upto_gtid(wsrep::id(upto->uuid.data, sizeof(upto->uuid.data)),
+ wsrep::seqno(upto->seqno));
+ ret= Wsrep_server_state::instance().wait_for_gtid(upto_gtid, timeout);
+ }
+ else
+ {
+ ret= Wsrep_server_state::instance().causal_read(timeout).second;
+ }
+ WSREP_DEBUG("wsrep_sync_wait_upto: %d", ret);
+ return ret;
+}
+
void wsrep_keys_free(wsrep_key_arr_t* key_arr)
{
for (size_t i= 0; i < key_arr->keys_len; ++i)
@@ -1061,7 +1086,6 @@ void wsrep_keys_free(wsrep_key_arr_t* key_arr)
key_arr->keys_len= 0;
}
-
/*!
* @param db Database string
* @param table Table string
@@ -1073,9 +1097,9 @@ void wsrep_keys_free(wsrep_key_arr_t* key_arr)
*/
static bool wsrep_prepare_key_for_isolation(const char* db,
- const char* table,
- wsrep_buf_t* key,
- size_t* key_len)
+ const char* table,
+ wsrep_buf_t* key,
+ size_t* key_len)
{
if (*key_len < 2) return false;
@@ -1087,11 +1111,11 @@ static bool wsrep_prepare_key_for_isolation(const char* db,
case 1:
case 2:
case 3:
+ case 4:
{
*key_len= 0;
if (db)
{
- // sql_print_information("%s.%s", db, table);
key[*key_len].ptr= db;
key[*key_len].len= strlen(db);
++(*key_len);
@@ -1105,26 +1129,23 @@ static bool wsrep_prepare_key_for_isolation(const char* db,
break;
}
default:
+ assert(0);
+ WSREP_ERROR("Unsupported protocol version: %ld", wsrep_protocol_version);
+ unireg_abort(1);
return false;
}
- return true;
-}
+ return true;
+}
static bool wsrep_prepare_key_for_isolation(const char* db,
const char* table,
wsrep_key_arr_t* ka)
{
wsrep_key_t* tmp;
-
- if (!ka->keys)
- tmp= (wsrep_key_t*)my_malloc((ka->keys_len + 1) * sizeof(wsrep_key_t),
- MYF(0));
- else
- tmp= (wsrep_key_t*)my_realloc(ka->keys,
- (ka->keys_len + 1) * sizeof(wsrep_key_t),
- MYF(0));
-
+ tmp= (wsrep_key_t*)my_realloc(ka->keys,
+ (ka->keys_len + 1) * sizeof(wsrep_key_t),
+ MYF(MY_ALLOW_ZERO_PTR));
if (!tmp)
{
WSREP_ERROR("Can't allocate memory for key_array");
@@ -1150,7 +1171,6 @@ static bool wsrep_prepare_key_for_isolation(const char* db,
return true;
}
-
static bool wsrep_prepare_keys_for_alter_add_fk(const char* child_table_db,
Alter_info* alter_info,
wsrep_key_arr_t* ka)
@@ -1177,7 +1197,6 @@ static bool wsrep_prepare_keys_for_alter_add_fk(const char* child_table_db,
return true;
}
-
static bool wsrep_prepare_keys_for_isolation(THD* thd,
const char* db,
const char* table,
@@ -1205,16 +1224,19 @@ static bool wsrep_prepare_keys_for_isolation(THD* thd,
if (!wsrep_prepare_keys_for_alter_add_fk(table_list->db.str, alter_info, ka))
goto err;
}
-
return false;
err:
- wsrep_keys_free(ka);
- return true;
+ wsrep_keys_free(ka);
+ return true;
}
+/*
+ * Prepare key list from db/table and table_list
+ *
+ * Return zero in case of success, 1 in case of failure.
+ */
-/* Prepare key list from db/table and table_list */
bool wsrep_prepare_keys_for_isolation(THD* thd,
const char* db,
const char* table,
@@ -1224,7 +1246,6 @@ bool wsrep_prepare_keys_for_isolation(THD* thd,
return wsrep_prepare_keys_for_isolation(thd, db, table, table_list, NULL, ka);
}
-
bool wsrep_prepare_key(const uchar* cache_key, size_t cache_key_len,
const uchar* row_id, size_t row_id_len,
wsrep_buf_t* key, size_t* key_len)
@@ -1236,37 +1257,110 @@ bool wsrep_prepare_key(const uchar* cache_key, size_t cache_key_len,
{
case 0:
{
- key[0].ptr = cache_key;
- key[0].len = cache_key_len;
+ key[0].ptr= cache_key;
+ key[0].len= cache_key_len;
- *key_len = 1;
+ *key_len= 1;
break;
}
case 1:
case 2:
case 3:
+ case 4:
{
- key[0].ptr = cache_key;
- key[0].len = strlen( (char*)cache_key );
+ key[0].ptr= cache_key;
+ key[0].len= strlen( (char*)cache_key );
- key[1].ptr = cache_key + strlen( (char*)cache_key ) + 1;
- key[1].len = strlen( (char*)(key[1].ptr) );
+ key[1].ptr= cache_key + strlen( (char*)cache_key ) + 1;
+ key[1].len= strlen( (char*)(key[1].ptr) );
- *key_len = 2;
+ *key_len= 2;
break;
}
default:
return false;
}
- key[*key_len].ptr = row_id;
- key[*key_len].len = row_id_len;
+ key[*key_len].ptr= row_id;
+ key[*key_len].len= row_id_len;
++(*key_len);
return true;
}
+bool wsrep_prepare_key_for_innodb(THD* thd,
+ const uchar* cache_key,
+ size_t cache_key_len,
+ const uchar* row_id,
+ size_t row_id_len,
+ wsrep_buf_t* key,
+ size_t* key_len)
+{
+
+ return wsrep_prepare_key(cache_key, cache_key_len, row_id, row_id_len, key, key_len);
+}
+
+wsrep::key wsrep_prepare_key_for_toi(const char* db, const char* table,
+ enum wsrep::key::type type)
+{
+ wsrep::key ret(type);
+ DBUG_ASSERT(db);
+ ret.append_key_part(db, strlen(db));
+ if (table) ret.append_key_part(table, strlen(table));
+ return ret;
+}
+wsrep::key_array
+wsrep_prepare_keys_for_alter_add_fk(const char* child_table_db,
+ Alter_info* alter_info)
+
+{
+ wsrep::key_array ret;
+ Key *key;
+ List_iterator<Key> key_iterator(alter_info->key_list);
+ while ((key= key_iterator++))
+ {
+ if (key->type == Key::FOREIGN_KEY)
+ {
+ Foreign_key *fk_key= (Foreign_key *)key;
+ const char *db_name= fk_key->ref_db.str;
+ const char *table_name= fk_key->ref_table.str;
+ if (!db_name)
+ {
+ db_name= child_table_db;
+ }
+ ret.push_back(wsrep_prepare_key_for_toi(db_name, table_name,
+ wsrep::key::exclusive));
+ }
+ }
+ return ret;
+}
+
+wsrep::key_array wsrep_prepare_keys_for_toi(const char* db,
+ const char* table,
+ const TABLE_LIST* table_list,
+ Alter_info* alter_info)
+{
+ wsrep::key_array ret;
+ if (db || table)
+ {
+ ret.push_back(wsrep_prepare_key_for_toi(db, table, wsrep::key::exclusive));
+ }
+ for (const TABLE_LIST* table= table_list; table; table= table->next_global)
+ {
+ ret.push_back(wsrep_prepare_key_for_toi(table->db.str, table->table_name.str,
+ wsrep::key::exclusive));
+ }
+ if (alter_info && (alter_info->flags & ALTER_ADD_FOREIGN_KEY))
+ {
+ wsrep::key_array fk(wsrep_prepare_keys_for_alter_add_fk(table_list->db.str, alter_info));
+ if (!fk.empty())
+ {
+ ret.insert(ret.end(), fk.begin(), fk.end());
+ }
+ }
+ return ret;
+}
/*
* Construct Query_log_Event from thd query and serialize it
* into buffer.
@@ -1277,7 +1371,7 @@ int wsrep_to_buf_helper(
THD* thd, const char *query, uint query_len, uchar** buf, size_t* buf_len)
{
IO_CACHE tmp_io_cache;
- Log_event_writer writer(&tmp_io_cache,0);
+ Log_event_writer writer(&tmp_io_cache, 0);
if (open_cached_file(&tmp_io_cache, mysql_tmpdir, TEMP_PREFIX,
65536, MYF(MY_WME)))
return 1;
@@ -1365,7 +1459,7 @@ create_view_query(THD *thd, uchar** buf, size_t* buf_len)
LEX *lex= thd->lex;
SELECT_LEX *select_lex= lex->first_select_lex();
TABLE_LIST *first_table= select_lex->table_list.first;
- TABLE_LIST *views = first_table;
+ TABLE_LIST *views= first_table;
LEX_USER *definer;
String buff;
const LEX_CSTRING command[3]=
@@ -1390,16 +1484,16 @@ create_view_query(THD *thd, uchar** buf, size_t* buf_len)
if (definer)
{
- views->definer.user = definer->user;
- views->definer.host = definer->host;
+ views->definer.user= definer->user;
+ views->definer.host= definer->host;
} else {
WSREP_ERROR("Failed to get DEFINER for VIEW.");
return 1;
}
- views->algorithm = lex->create_view->algorithm;
- views->view_suid = lex->create_view->suid;
- views->with_check = lex->create_view->check;
+ views->algorithm = lex->create_view->algorithm;
+ views->view_suid = lex->create_view->suid;
+ views->with_check = lex->create_view->check;
view_store_options(thd, views, &buff);
buff.append(STRING_WITH_LEN("VIEW "));
@@ -1425,12 +1519,8 @@ create_view_query(THD *thd, uchar** buf, size_t* buf_len)
buff.append(')');
}
buff.append(STRING_WITH_LEN(" AS "));
- //buff.append(views->source.str, views->source.length);
buff.append(thd->lex->create_view->select.str,
thd->lex->create_view->select.length);
- //int errcode= query_error_code(thd, TRUE);
- //if (thd->binlog_query(THD::STMT_QUERY_TYPE,
- // buff.ptr(), buff.length(), FALSE, FALSE, FALSE, errcod
return wsrep_to_buf_helper(thd, buff.ptr(), buff.length(), buf, buf_len);
}
@@ -1496,8 +1586,7 @@ static int wsrep_drop_table_query(THD* thd, uchar** buf, size_t* buf_len)
/* Forward declarations. */
-static int wsrep_create_sp(THD *thd, uchar** buf, size_t* buf_len);
-static int wsrep_create_trigger_query(THD *thd, uchar** buf, size_t* buf_len);
+int wsrep_create_trigger_query(THD *thd, uchar** buf, size_t* buf_len);
/*
Decide if statement should run in TOI.
@@ -1577,6 +1666,7 @@ static bool wsrep_can_run_in_toi(THD *thd, const char *db, const char *table,
}
}
+#if UNUSED /* 323f269d4099 (Jan Lindström 2018-07-19) */
static const char* wsrep_get_query_or_msg(const THD* thd)
{
switch(thd->lex->sql_command)
@@ -1589,58 +1679,70 @@ static const char* wsrep_get_query_or_msg(const THD* thd)
return "REVOKE";
case SQLCOM_SET_OPTION:
if (thd->lex->definer)
- return "SET PASSWORD";
+ return "SET PASSWORD";
/* fallthrough */
default:
return thd->query();
}
}
+#endif //UNUSED
-/*
- returns:
- 0: statement was replicated as TOI
- 1: TOI replication was skipped
- -1: TOI replication failed
- */
-static int wsrep_TOI_begin(THD *thd, const char *db_, const char *table_,
- const TABLE_LIST* table_list,
- Alter_info* alter_info)
+static int wsrep_create_sp(THD *thd, uchar** buf, size_t* buf_len)
{
- wsrep_status_t ret(WSREP_WARNING);
- uchar* buf(0);
- size_t buf_len(0);
- int buf_err;
- int rc= 0;
+ String log_query;
+ sp_head *sp= thd->lex->sphead;
+ sql_mode_t saved_mode= thd->variables.sql_mode;
+ String retstr(64);
+ LEX_CSTRING returns= empty_clex_str;
+ retstr.set_charset(system_charset_info);
- if (wsrep_can_run_in_toi(thd, db_, table_, table_list) == false)
+ log_query.set_charset(system_charset_info);
+
+ if (sp->m_handler->type() == TYPE_ENUM_FUNCTION)
{
- WSREP_DEBUG("No TOI for %s", WSREP_QUERY(thd));
+ sp_returns_type(thd, retstr, sp);
+ returns= retstr.lex_cstring();
+ }
+ if (sp->m_handler->
+ show_create_sp(thd, &log_query,
+ sp->m_explicit_name ? sp->m_db : null_clex_str,
+ sp->m_name, sp->m_params, returns,
+ sp->m_body, sp->chistics(),
+ thd->lex->definer[0],
+ thd->lex->create_info,
+ saved_mode))
+ {
+ WSREP_WARN("SP create string failed: schema: %s, query: %s",
+ thd->get_db(), thd->query());
return 1;
}
- WSREP_DEBUG("TO BEGIN: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd),
- thd->wsrep_exec_mode, wsrep_get_query_or_msg(thd));
+ return wsrep_to_buf_helper(thd, log_query.ptr(), log_query.length(), buf, buf_len);
+}
+static int wsrep_TOI_event_buf(THD* thd, uchar** buf, size_t* buf_len)
+{
+ int err;
switch (thd->lex->sql_command)
{
case SQLCOM_CREATE_VIEW:
- buf_err= create_view_query(thd, &buf, &buf_len);
+ err= create_view_query(thd, buf, buf_len);
break;
case SQLCOM_CREATE_PROCEDURE:
case SQLCOM_CREATE_SPFUNCTION:
- buf_err= wsrep_create_sp(thd, &buf, &buf_len);
+ err= wsrep_create_sp(thd, buf, buf_len);
break;
case SQLCOM_CREATE_TRIGGER:
- buf_err= wsrep_create_trigger_query(thd, &buf, &buf_len);
+ err= wsrep_create_trigger_query(thd, buf, buf_len);
break;
case SQLCOM_CREATE_EVENT:
- buf_err= wsrep_create_event_query(thd, &buf, &buf_len);
+ err= wsrep_create_event_query(thd, buf, buf_len);
break;
case SQLCOM_ALTER_EVENT:
- buf_err= wsrep_alter_event_query(thd, &buf, &buf_len);
+ err= wsrep_alter_event_query(thd, buf, buf_len);
break;
case SQLCOM_DROP_TABLE:
- buf_err= wsrep_drop_table_query(thd, &buf, &buf_len);
+ err= wsrep_drop_table_query(thd, buf, buf_len);
break;
case SQLCOM_CREATE_ROLE:
if (sp_process_definer(thd))
@@ -1649,169 +1751,212 @@ static int wsrep_TOI_begin(THD *thd, const char *db_, const char *table_,
}
/* fallthrough */
default:
- buf_err= wsrep_to_buf_helper(thd, thd->query(), thd->query_length(),
- &buf, &buf_len);
+ err= wsrep_to_buf_helper(thd, thd->query(), thd->query_length(), buf,
+ buf_len);
break;
}
- wsrep_key_arr_t key_arr= {0, 0};
- struct wsrep_buf buff = { buf, buf_len };
- if (!buf_err &&
- !wsrep_prepare_keys_for_isolation(thd, db_, table_,
- table_list, alter_info, &key_arr) &&
- key_arr.keys_len > 0 &&
- WSREP_OK == (ret = wsrep->to_execute_start(wsrep, thd->thread_id,
- key_arr.keys, key_arr.keys_len,
- &buff, 1,
- &thd->wsrep_trx_meta)))
- {
- thd->wsrep_exec_mode= TOTAL_ORDER;
- wsrep_to_isolation++;
- wsrep_keys_free(&key_arr);
- WSREP_DEBUG("TO BEGIN: %lld, %d",(long long)wsrep_thd_trx_seqno(thd),
- thd->wsrep_exec_mode);
- }
- else if (key_arr.keys_len > 0) {
- /* jump to error handler in mysql_execute_command() */
- WSREP_WARN("TO isolation failed for: %d, schema: %s, sql: %s. Check wsrep "
- "connection state and retry the query.",
- ret,
- thd->get_db(),
- (thd->query()) ? thd->query() : "void");
- my_message(ER_LOCK_DEADLOCK, "WSREP replication failed. Check "
- "your wsrep connection state and retry the query.", MYF(0));
- wsrep_keys_free(&key_arr);
- rc= -1;
- }
- else {
- /* non replicated DDL, affecting temporary tables only */
- WSREP_DEBUG("TO isolation skipped for: %d, sql: %s."
- "Only temporary tables affected.",
- ret, (thd->query()) ? thd->query() : "void");
- rc= 1;
+ return err;
+}
+
+static void wsrep_TOI_begin_failed(THD* thd, const wsrep_buf_t* /* const err */)
+{
+ if (wsrep_thd_trx_seqno(thd) > 0)
+ {
+ /* GTID was granted and TO acquired - need to log event and release TO */
+ if (wsrep_emulate_bin_log) wsrep_thd_binlog_trx_reset(thd);
+ if (wsrep_write_dummy_event(thd, "TOI begin failed")) { goto fail; }
+ wsrep::client_state& cs(thd->wsrep_cs());
+ int const ret= cs.leave_toi();
+ if (ret)
+ {
+ WSREP_ERROR("Leaving critical section for failed TOI failed: thd: %lld, "
+ "schema: %s, SQL: %s, rcode: %d wsrep_error: %s",
+ (long long)thd->real_id, thd->db.str,
+ thd->query(), ret, wsrep::to_c_string(cs.current_error()));
+ goto fail;
+ }
}
- if (buf) my_free(buf);
- return rc;
+ return;
+fail:
+ WSREP_ERROR("Failed to release TOI resources. Need to abort.");
+ unireg_abort(1);
}
-static void wsrep_TOI_end(THD *thd) {
- wsrep_status_t ret;
- wsrep_to_isolation--;
- WSREP_DEBUG("TO END: %lld, %d: %s", (long long)wsrep_thd_trx_seqno(thd),
- thd->wsrep_exec_mode, wsrep_get_query_or_msg(thd));
+/*
+ returns:
+ 0: statement was replicated as TOI
+ 1: TOI replication was skipped
+ -1: TOI replication failed
+ */
+static int wsrep_TOI_begin(THD *thd, const char *db, const char *table,
+ const TABLE_LIST* table_list,
+ Alter_info* alter_info)
+{
+ DBUG_ASSERT(thd->variables.wsrep_OSU_method == WSREP_OSU_TOI);
- wsrep_set_SE_checkpoint(thd->wsrep_trx_meta.gtid.uuid,
- thd->wsrep_trx_meta.gtid.seqno);
- WSREP_DEBUG("TO END: %lld, update seqno",
- (long long)wsrep_thd_trx_seqno(thd));
-
- if (WSREP_OK == (ret = wsrep->to_execute_end(wsrep, thd->thread_id))) {
- WSREP_DEBUG("TO END: %lld", (long long)wsrep_thd_trx_seqno(thd));
+ WSREP_DEBUG("TOI Begin");
+ if (wsrep_can_run_in_toi(thd, db, table, table_list) == false)
+ {
+ WSREP_DEBUG("No TOI for %s", WSREP_QUERY(thd));
+ return 1;
}
- else {
- WSREP_WARN("TO isolation end failed for: %d, schema: %s, sql: %s",
- ret,
- thd->get_db(),
- (thd->query()) ? thd->query() : "void");
+
+ uchar* buf= 0;
+ size_t buf_len(0);
+ int buf_err;
+ int rc;
+
+ buf_err= wsrep_TOI_event_buf(thd, &buf, &buf_len);
+ if (buf_err) {
+ WSREP_ERROR("Failed to create TOI event buf: %d", buf_err);
+ my_message(ER_UNKNOWN_ERROR,
+ "WSREP replication failed to prepare TOI event buffer. "
+ "Check your query.",
+ MYF(0));
+ return -1;
}
-}
+ struct wsrep_buf buff= { buf, buf_len };
-static int wsrep_RSU_begin(THD *thd, const char *db_, const char *table_)
-{
- wsrep_status_t ret(WSREP_WARNING);
- WSREP_DEBUG("RSU BEGIN: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd),
- thd->wsrep_exec_mode, thd->query() );
+ wsrep::key_array key_array=
+ wsrep_prepare_keys_for_toi(db, table, table_list, alter_info);
- ret = wsrep->desync(wsrep);
- if (ret != WSREP_OK)
+ if (thd->has_read_only_protection())
{
- WSREP_WARN("RSU desync failed %d for schema: %s, query: %s",
- ret, thd->get_db(), thd->query());
- my_error(ER_LOCK_DEADLOCK, MYF(0));
- return(ret);
+ /* non replicated DDL, affecting temporary tables only */
+ WSREP_DEBUG("TO isolation skipped, sql: %s."
+ "Only temporary tables affected.",
+ WSREP_QUERY(thd));
+ if (buf) my_free(buf);
+ return -1;
}
- mysql_mutex_lock(&LOCK_wsrep_replaying);
- wsrep_replaying++;
- mysql_mutex_unlock(&LOCK_wsrep_replaying);
+ thd_proc_info(thd, "acquiring total order isolation");
- if (wsrep_wait_committing_connections_close(5000))
+ wsrep::client_state& cs(thd->wsrep_cs());
+ int ret= cs.enter_toi(key_array,
+ wsrep::const_buffer(buff.ptr, buff.len),
+ wsrep::provider::flag::start_transaction |
+ wsrep::provider::flag::commit);
+
+ if (ret)
{
- /* no can do, bail out from DDL */
- WSREP_WARN("RSU failed due to pending transactions, schema: %s, query %s",
- thd->get_db(), thd->query());
- mysql_mutex_lock(&LOCK_wsrep_replaying);
- wsrep_replaying--;
- mysql_mutex_unlock(&LOCK_wsrep_replaying);
+ DBUG_ASSERT(cs.current_error());
+ WSREP_DEBUG("to_execute_start() failed for %llu: %s, seqno: %lld",
+ thd->thread_id, WSREP_QUERY(thd),
+ (long long)wsrep_thd_trx_seqno(thd));
- ret = wsrep->resync(wsrep);
- if (ret != WSREP_OK)
+ /* jump to error handler in mysql_execute_command() */
+ switch (cs.current_error())
{
- WSREP_WARN("resync failed %d for schema: %s, query: %s",
- ret, thd->get_db(), thd->query());
+ case wsrep::e_size_exceeded_error:
+ WSREP_WARN("TO isolation failed for: %d, schema: %s, sql: %s. "
+ "Maximum size exceeded.",
+ ret,
+ (thd->db.str ? thd->db.str : "(null)"),
+ WSREP_QUERY(thd));
+ my_error(ER_ERROR_DURING_COMMIT, MYF(0), WSREP_SIZE_EXCEEDED);
+ break;
+ default:
+ WSREP_WARN("TO isolation failed for: %d, schema: %s, sql: %s. "
+ "Check wsrep connection state and retry the query.",
+ ret,
+ (thd->db.str ? thd->db.str : "(null)"),
+ WSREP_QUERY(thd));
+ if (!thd->is_error())
+ {
+ my_error(ER_LOCK_DEADLOCK, MYF(0), "WSREP replication failed. Check "
+ "your wsrep connection state and retry the query.");
+ }
}
-
- my_error(ER_LOCK_DEADLOCK, MYF(0));
- return(1);
+ rc= -1;
}
-
- wsrep_seqno_t seqno = wsrep->pause(wsrep);
- if (seqno == WSREP_SEQNO_UNDEFINED)
- {
- WSREP_WARN("pause failed %lld for schema: %s, query: %s", (long long)seqno,
- thd->get_db(), thd->query());
- return(1);
+ else {
+ ++wsrep_to_isolation;
+ rc= 0;
}
- WSREP_DEBUG("paused at %lld", (long long)seqno);
- thd->variables.wsrep_on = 0;
- return 0;
-}
-static void wsrep_RSU_end(THD *thd)
-{
- wsrep_status_t ret(WSREP_WARNING);
- WSREP_DEBUG("RSU END: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd),
- thd->wsrep_exec_mode, thd->query() );
+ if (buf) my_free(buf);
+ if (rc) wsrep_TOI_begin_failed(thd, NULL);
- mysql_mutex_lock(&LOCK_wsrep_replaying);
- wsrep_replaying--;
- mysql_mutex_unlock(&LOCK_wsrep_replaying);
+ return rc;
+}
- ret = wsrep->resume(wsrep);
- if (ret != WSREP_OK)
+static void wsrep_TOI_end(THD *thd) {
+ int ret;
+ wsrep_to_isolation--;
+ wsrep::client_state& client_state(thd->wsrep_cs());
+ DBUG_ASSERT(wsrep_thd_is_local_toi(thd));
+ WSREP_DEBUG("TO END: %lld: %s", client_state.toi_meta().seqno().get(),
+ WSREP_QUERY(thd));
+
+ if (wsrep_thd_is_local_toi(thd))
{
- WSREP_WARN("resume failed %d for schema: %s, query: %s", ret,
- thd->get_db(), thd->query());
+ wsrep_set_SE_checkpoint(client_state.toi_meta().gtid());
+ if (thd->is_error() && !wsrep_must_ignore_error(thd))
+ {
+ wsrep_apply_error err;
+ err.store(thd);
+ client_state.leave_toi();
+ }
+ else
+ {
+ ret= client_state.leave_toi();
+ }
+
+ if (ret == 0)
+ {
+ WSREP_DEBUG("TO END: %lld", client_state.toi_meta().seqno().get());
+ }
+ else
+ {
+ WSREP_WARN("TO isolation end failed for: %d, schema: %s, sql: %s",
+ ret, (thd->db.str ? thd->db.str : "(null)"), WSREP_QUERY(thd));
+ }
}
+}
- ret = wsrep->resync(wsrep);
- if (ret != WSREP_OK)
+static int wsrep_RSU_begin(THD *thd, const char *db_, const char *table_)
+{
+ WSREP_DEBUG("RSU BEGIN: %lld, : %s", wsrep_thd_trx_seqno(thd),
+ WSREP_QUERY(thd));
+ if (thd->wsrep_cs().begin_rsu(5000))
{
- WSREP_WARN("resync failed %d for schema: %s, query: %s", ret,
- thd->get_db(), thd->query());
- return;
+ WSREP_WARN("RSU begin failed");
+ }
+ else
+ {
+ thd->variables.wsrep_on= 0;
}
+ return 0;
+}
- thd->variables.wsrep_on = 1;
+static void wsrep_RSU_end(THD *thd)
+{
+ WSREP_DEBUG("RSU END: %lld : %s", wsrep_thd_trx_seqno(thd),
+ WSREP_QUERY(thd));
+ if (thd->wsrep_cs().end_rsu())
+ {
+ WSREP_WARN("Failed to end RSU, server may need to be restarted");
+ }
+ thd->variables.wsrep_on= 1;
}
int wsrep_to_isolation_begin(THD *thd, const char *db_, const char *table_,
const TABLE_LIST* table_list,
Alter_info* alter_info)
{
- int ret= 0;
-
/*
No isolation for applier or replaying threads.
*/
- if (thd->wsrep_exec_mode == REPL_RECV)
- return 0;
+ if (!wsrep_thd_is_local(thd)) return 0;
+ int ret= 0;
mysql_mutex_lock(&thd->LOCK_thd_data);
- if (thd->wsrep_conflict_state == MUST_ABORT)
+ if (thd->wsrep_trx().state() == wsrep::transaction::s_must_abort)
{
WSREP_INFO("thread: %lld schema: %s query: %s has been aborted due to multi-master conflict",
(longlong) thd->thread_id, thd->get_db(), thd->query());
@@ -1820,20 +1965,20 @@ int wsrep_to_isolation_begin(THD *thd, const char *db_, const char *table_,
}
mysql_mutex_unlock(&thd->LOCK_thd_data);
- DBUG_ASSERT(thd->wsrep_exec_mode == LOCAL_STATE);
- DBUG_ASSERT(thd->wsrep_trx_meta.gtid.seqno == WSREP_SEQNO_UNDEFINED);
+ DBUG_ASSERT(wsrep_thd_is_local(thd));
+ DBUG_ASSERT(thd->wsrep_trx().ws_meta().seqno().is_undefined());
- if (thd->has_read_only_protection())
+ if (thd->global_read_lock.is_acquired())
{
- WSREP_DEBUG("Aborting TOI: Global Read-Lock (FTWRL) in place: %s %lld",
- thd->query(), (longlong) thd->thread_id);
+ WSREP_DEBUG("Aborting TOI: Global Read-Lock (FTWRL) in place: %s %llu",
+ WSREP_QUERY(thd), thd->thread_id);
return -1;
}
if (wsrep_debug && thd->mdl_context.has_locks())
{
- WSREP_DEBUG("thread holds MDL locks at TI begin: %s %lld",
- thd->query(), (longlong) thd->thread_id);
+ WSREP_DEBUG("thread holds MDL locks at TI begin: %s %llu",
+ WSREP_QUERY(thd), thd->thread_id);
}
/*
@@ -1845,11 +1990,11 @@ int wsrep_to_isolation_begin(THD *thd, const char *db_, const char *table_,
*/
if (wsrep_auto_increment_control)
{
- thd->variables.auto_increment_offset = 1;
- thd->variables.auto_increment_increment = 1;
+ thd->variables.auto_increment_offset= 1;
+ thd->variables.auto_increment_increment= 1;
}
- if (thd->variables.wsrep_on && thd->wsrep_exec_mode==LOCAL_STATE)
+ if (thd->variables.wsrep_on && wsrep_thd_is_local(thd))
{
switch (thd->variables.wsrep_OSU_method) {
case WSREP_OSU_TOI:
@@ -1865,48 +2010,53 @@ int wsrep_to_isolation_begin(THD *thd, const char *db_, const char *table_,
break;
}
switch (ret) {
- case 0: thd->wsrep_exec_mode= TOTAL_ORDER; break;
+ case 0: /* wsrep_TOI_begin sould set toi mode */ break;
case 1:
/* TOI replication skipped, treat as success */
- ret = 0;
+ ret= 0;
break;
case -1:
/* TOI replication failed, treat as error */
break;
}
}
+
return ret;
}
void wsrep_to_isolation_end(THD *thd)
{
- if (thd->wsrep_exec_mode == TOTAL_ORDER)
+ DBUG_ASSERT(wsrep_thd_is_local_toi(thd) ||
+ wsrep_thd_is_in_rsu(thd));
+ if (wsrep_thd_is_local_toi(thd))
{
- switch(thd->variables.wsrep_OSU_method)
- {
- case WSREP_OSU_TOI: wsrep_TOI_end(thd); break;
- case WSREP_OSU_RSU: wsrep_RSU_end(thd); break;
- default:
- WSREP_WARN("Unsupported wsrep OSU method at isolation end: %lu",
- thd->variables.wsrep_OSU_method);
- break;
- }
- wsrep_cleanup_transaction(thd);
+ DBUG_ASSERT(thd->variables.wsrep_OSU_method == WSREP_OSU_TOI);
+ wsrep_TOI_end(thd);
}
+ else if (wsrep_thd_is_in_rsu(thd))
+ {
+ DBUG_ASSERT(thd->variables.wsrep_OSU_method == WSREP_OSU_RSU);
+ wsrep_RSU_end(thd);
+ }
+ else
+ {
+ DBUG_ASSERT(0);
+ }
+ if (wsrep_emulate_bin_log) wsrep_thd_binlog_trx_reset(thd);
}
#define WSREP_MDL_LOG(severity, msg, schema, schema_len, req, gra) \
WSREP_##severity( \
"%s\n" \
"schema: %.*s\n" \
- "request: (%lld \tseqno %lld \twsrep (%d, %d, %d) cmd %d %d \t%s)\n" \
- "granted: (%lld \tseqno %lld \twsrep (%d, %d, %d) cmd %d %d \t%s)", \
+ "request: (%llu \tseqno %lld \twsrep (%s, %s, %s) cmd %d %d \t%s)\n" \
+ "granted: (%llu \tseqno %lld \twsrep (%s, %s, %s) cmd %d %d \t%s)", \
msg, schema_len, schema, \
- (longlong) req->thread_id, (long long)wsrep_thd_trx_seqno(req), \
- req->wsrep_exec_mode, req->wsrep_query_state, req->wsrep_conflict_state, \
+ req->thread_id, (long long)wsrep_thd_trx_seqno(req), \
+ wsrep_thd_client_mode_str(req), wsrep_thd_client_state_str(req), wsrep_thd_transaction_state_str(req), \
req->get_command(), req->lex->sql_command, req->query(), \
- (longlong) gra->thread_id, (long long)wsrep_thd_trx_seqno(gra), \
- gra->wsrep_exec_mode, gra->wsrep_query_state, gra->wsrep_conflict_state, \
+ gra->thread_id, (long long)wsrep_thd_trx_seqno(gra), \
+ wsrep_thd_client_mode_str(gra), wsrep_thd_client_state_str(gra), wsrep_thd_transaction_state_str(gra), \
gra->get_command(), gra->lex->sql_command, gra->query());
/**
@@ -1919,58 +2069,47 @@ void wsrep_to_isolation_end(THD *thd)
@retval FALSE Lock request cannot be granted
*/
-bool wsrep_grant_mdl_exception(MDL_context *requestor_ctx,
+void wsrep_handle_mdl_conflict(MDL_context *requestor_ctx,
MDL_ticket *ticket,
const MDL_key *key)
{
/* Fallback to the non-wsrep behaviour */
- if (!WSREP_ON) return FALSE;
+ if (!WSREP_ON) return;
THD *request_thd= requestor_ctx->get_thd();
THD *granted_thd= ticket->get_ctx()->get_thd();
- bool ret= false;
const char* schema= key->db_name();
int schema_len= key->db_name_length();
mysql_mutex_lock(&request_thd->LOCK_thd_data);
+ if (wsrep_thd_is_toi(request_thd) ||
+ wsrep_thd_is_applying(request_thd)) {
- /*
- We consider granting MDL exceptions only for appliers (BF THD) and ones
- executing under TOI mode.
-
- Rules:
- 1. If granted/owner THD is also an applier (BF THD) or one executing
- under TOI mode, then we grant the requested lock to the requester
- THD.
- @return true
-
- 2. If granted/owner THD is executing a FLUSH command or already has an
- explicit lock, then do not grant the requested lock to the requester
- THD and it has to wait.
- @return false
-
- 3. In all other cases the granted/owner THD is aborted and the requested
- lock is not granted to the requester THD, thus it has to wait.
- @return false
- */
- if (request_thd->wsrep_exec_mode == TOTAL_ORDER ||
- request_thd->wsrep_exec_mode == REPL_RECV)
- {
mysql_mutex_unlock(&request_thd->LOCK_thd_data);
WSREP_MDL_LOG(DEBUG, "MDL conflict ", schema, schema_len,
request_thd, granted_thd);
ticket->wsrep_report(wsrep_debug);
mysql_mutex_lock(&granted_thd->LOCK_thd_data);
- if (granted_thd->wsrep_exec_mode == TOTAL_ORDER ||
- granted_thd->wsrep_exec_mode == REPL_RECV)
+ if (wsrep_thd_is_toi(granted_thd) ||
+ wsrep_thd_is_applying(granted_thd))
{
- WSREP_MDL_LOG(INFO, "MDL BF-BF conflict", schema, schema_len,
- request_thd, granted_thd);
- ticket->wsrep_report(true);
- mysql_mutex_unlock(&granted_thd->LOCK_thd_data);
- ret= true;
+ if (wsrep_thd_is_SR(granted_thd) && !wsrep_thd_is_SR(request_thd))
+ {
+ WSREP_MDL_LOG(INFO, "MDL conflict, DDL vs SR",
+ schema, schema_len, request_thd, granted_thd);
+ mysql_mutex_unlock(&granted_thd->LOCK_thd_data);
+ wsrep_abort_thd((void*)request_thd, (void*)granted_thd, 1);
+ }
+ else
+ {
+ WSREP_MDL_LOG(INFO, "MDL BF-BF conflict", schema, schema_len,
+ request_thd, granted_thd);
+ ticket->wsrep_report(true);
+ mysql_mutex_unlock(&granted_thd->LOCK_thd_data);
+ unireg_abort(1);
+ }
}
else if (granted_thd->lex->sql_command == SQLCOM_FLUSH ||
granted_thd->mdl_context.has_explicit_locks())
@@ -1978,173 +2117,57 @@ bool wsrep_grant_mdl_exception(MDL_context *requestor_ctx,
WSREP_DEBUG("BF thread waiting for FLUSH");
ticket->wsrep_report(wsrep_debug);
mysql_mutex_unlock(&granted_thd->LOCK_thd_data);
- ret= false;
+ }
+ else if (request_thd->lex->sql_command == SQLCOM_DROP_TABLE)
+ {
+ WSREP_DEBUG("DROP caused BF abort, conf %s",
+ wsrep_thd_transaction_state_str(granted_thd));
+ ticket->wsrep_report(wsrep_debug);
+ mysql_mutex_unlock(&granted_thd->LOCK_thd_data);
+ wsrep_abort_thd((void*)request_thd, (void*)granted_thd, 1);
}
else
{
- /* Print some debug information. */
- if (wsrep_debug)
+ WSREP_MDL_LOG(DEBUG, "MDL conflict-> BF abort", schema, schema_len,
+ request_thd, granted_thd);
+ ticket->wsrep_report(wsrep_debug);
+ if (granted_thd->wsrep_trx().active())
{
- if (request_thd->lex->sql_command == SQLCOM_DROP_TABLE ||
- request_thd->lex->sql_command == SQLCOM_DROP_SEQUENCE)
- {
- WSREP_DEBUG("DROP caused BF abort, conf %d", granted_thd->wsrep_conflict_state);
- }
- else if (granted_thd->wsrep_query_state == QUERY_COMMITTING)
+ mysql_mutex_unlock(&granted_thd->LOCK_thd_data);
+ wsrep_abort_thd(request_thd, granted_thd, 1);
+ }
+ else
+ {
+ /*
+ Granted_thd is likely executing with wsrep_on=0. If the requesting
+ thd is BF, BF abort and wait.
+ */
+ mysql_mutex_unlock(&granted_thd->LOCK_thd_data);
+ if (wsrep_thd_is_BF(request_thd, FALSE))
{
- WSREP_DEBUG("MDL granted, but committing thd abort scheduled");
+ ha_abort_transaction(request_thd, granted_thd, TRUE);
}
else
{
- WSREP_MDL_LOG(DEBUG, "MDL conflict-> BF abort", schema, schema_len,
- request_thd, granted_thd);
+ WSREP_MDL_LOG(INFO, "MDL unknown BF-BF conflict", schema, schema_len,
+ request_thd, granted_thd);
+ ticket->wsrep_report(true);
+ unireg_abort(1);
}
- ticket->wsrep_report(true);
}
-
- mysql_mutex_unlock(&granted_thd->LOCK_thd_data);
- wsrep_abort_thd((void *) request_thd, (void *) granted_thd, 1);
- ret= false;
}
}
else
{
mysql_mutex_unlock(&request_thd->LOCK_thd_data);
}
-
- return ret;
-}
-
-
-pthread_handler_t start_wsrep_THD(void *arg)
-{
- THD *thd;
- wsrep_thd_processor_fun processor= (wsrep_thd_processor_fun)arg;
-
- if (my_thread_init() || (!(thd= new THD(next_thread_id(), true))))
- {
- goto error;
- }
-
- mysql_mutex_lock(&LOCK_thread_count);
-
- if (wsrep_gtid_mode)
- {
- /* Adjust domain_id. */
- thd->variables.gtid_domain_id= wsrep_gtid_domain_id;
- }
-
- thd->real_id=pthread_self(); // Keep purify happy
- thread_created++;
- threads.append(thd);
-
- my_net_init(&thd->net,(st_vio*) 0, thd, MYF(0));
-
- DBUG_PRINT("wsrep",(("creating thread %lld"), (long long)thd->thread_id));
- thd->prior_thr_create_utime= thd->start_utime= microsecond_interval_timer();
- (void) mysql_mutex_unlock(&LOCK_thread_count);
-
- /* from bootstrap()... */
- thd->bootstrap=1;
- thd->max_client_packet_length= thd->net.max_packet;
- thd->security_ctx->master_access= ~(ulong)0;
-
- /* from handle_one_connection... */
- pthread_detach_this_thread();
-
- mysql_thread_set_psi_id(thd->thread_id);
- thd->thr_create_utime= microsecond_interval_timer();
- if (MYSQL_CALLBACK_ELSE(thread_scheduler, init_new_connection_thread, (), 0))
- {
- close_connection(thd, ER_OUT_OF_RESOURCES);
- statistic_increment(aborted_connects,&LOCK_status);
- MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 0));
- goto error;
- }
-
-// </5.1.17>
- /*
- handle_one_connection() is normally the only way a thread would
- start and would always be on the very high end of the stack ,
- therefore, the thread stack always starts at the address of the
- first local variable of handle_one_connection, which is thd. We
- need to know the start of the stack so that we could check for
- stack overruns.
- */
- DBUG_PRINT("wsrep", ("handle_one_connection called by thread %lld\n",
- (long long)thd->thread_id));
- /* now that we've called my_thread_init(), it is safe to call DBUG_* */
-
- thd->thread_stack= (char*) &thd;
- if (thd->store_globals())
- {
- close_connection(thd, ER_OUT_OF_RESOURCES);
- statistic_increment(aborted_connects,&LOCK_status);
- MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 0));
- goto error;
- }
-
- thd->system_thread= SYSTEM_THREAD_SLAVE_SQL;
- thd->security_ctx->skip_grants();
-
- /* handle_one_connection() again... */
- //thd->version= refresh_version;
- thd->proc_info= 0;
- thd->set_command(COM_SLEEP);
- thd->init_for_queries();
-
- mysql_mutex_lock(&LOCK_thread_count);
- wsrep_running_threads++;
- mysql_cond_broadcast(&COND_thread_count);
- mysql_mutex_unlock(&LOCK_thread_count);
-
- processor(thd);
-
- close_connection(thd, 0);
-
- mysql_mutex_lock(&LOCK_thread_count);
- wsrep_running_threads--;
- WSREP_DEBUG("wsrep running threads now: %lu", wsrep_running_threads);
- mysql_cond_broadcast(&COND_thread_count);
- mysql_mutex_unlock(&LOCK_thread_count);
-
- // Note: We can't call THD destructor without crashing
- // if plugins have not been initialized. However, in most of the
- // cases this means that pre SE initialization SST failed and
- // we are going to exit anyway.
- if (plugins_are_initialized)
- {
- net_end(&thd->net);
- MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 1));
- }
- else
- {
- // TODO: lightweight cleanup to get rid of:
- // 'Error in my_thread_global_end(): 2 threads didn't exit'
- // at server shutdown
- }
-
- unlink_not_visible_thd(thd);
- delete thd;
- my_thread_end();
- return(NULL);
-
-error:
- WSREP_ERROR("Failed to create/initialize system thread");
-
- /* Abort if its the first applier/rollbacker thread. */
- if (!mysqld_server_initialized)
- unireg_abort(1);
- else
- return NULL;
}
-
/**/
static bool abort_replicated(THD *thd)
{
bool ret_code= false;
- if (thd->wsrep_query_state== QUERY_COMMITTING)
+ if (thd->wsrep_trx().state() == wsrep::transaction::s_committing)
{
WSREP_DEBUG("aborting replicated trx: %llu", (ulonglong)(thd->real_id));
@@ -2154,38 +2177,34 @@ static bool abort_replicated(THD *thd)
return ret_code;
}
-
/**/
static inline bool is_client_connection(THD *thd)
{
return (thd->wsrep_client_thread && thd->variables.wsrep_on);
}
-
static inline bool is_replaying_connection(THD *thd)
{
bool ret;
mysql_mutex_lock(&thd->LOCK_thd_data);
- ret= (thd->wsrep_conflict_state == REPLAYING) ? true : false;
+ ret= (thd->wsrep_trx().state() == wsrep::transaction::s_replaying) ? true : false;
mysql_mutex_unlock(&thd->LOCK_thd_data);
return ret;
}
-
static inline bool is_committing_connection(THD *thd)
{
bool ret;
mysql_mutex_lock(&thd->LOCK_thd_data);
- ret= (thd->wsrep_query_state == QUERY_COMMITTING) ? true : false;
+ ret= (thd->wsrep_trx().state() == wsrep::transaction::s_committing) ? true : false;
mysql_mutex_unlock(&thd->LOCK_thd_data);
return ret;
}
-
static bool have_client_connections()
{
THD *tmp;
@@ -2222,7 +2241,6 @@ static void wsrep_close_thread(THD *thd)
}
}
-
static my_bool have_committing_connections()
{
THD *tmp;
@@ -2236,6 +2254,7 @@ static my_bool have_committing_connections()
if (is_committing_connection(tmp))
{
+ mysql_mutex_unlock(&LOCK_thread_count);
return TRUE;
}
}
@@ -2243,7 +2262,6 @@ static my_bool have_committing_connections()
return FALSE;
}
-
int wsrep_wait_committing_connections_close(int wait_time)
{
int sleep_time= 100;
@@ -2261,8 +2279,7 @@ int wsrep_wait_committing_connections_close(int wait_time)
return 0;
}
-
-void wsrep_close_client_connections(my_bool wait_to_end, THD *except_caller_thd)
+void wsrep_close_client_connections(my_bool wait_to_end, THD* except_caller_thd)
{
/*
First signal all threads that it's time to die
@@ -2305,12 +2322,7 @@ void wsrep_close_client_connections(my_bool wait_to_end, THD *except_caller_thd)
/*
instead of wsrep_close_thread() we do now soft kill by THD::awake
*/
- mysql_mutex_lock(&tmp->LOCK_thd_data);
-
tmp->awake(KILL_CONNECTION);
-
- mysql_mutex_unlock(&tmp->LOCK_thd_data);
-
}
mysql_mutex_unlock(&LOCK_thread_count);
@@ -2360,7 +2372,6 @@ void wsrep_close_applier(THD *thd)
wsrep_close_thread(thd);
}
-
void wsrep_close_threads(THD *thd)
{
THD *tmp;
@@ -2386,10 +2397,12 @@ void wsrep_wait_appliers_close(THD *thd)
{
/* Wait for wsrep appliers to gracefully exit */
mysql_mutex_lock(&LOCK_thread_count);
- while (wsrep_running_threads > 1)
- // 1 is for rollbacker thread which needs to be killed explicitly.
- // This gotta be fixed in a more elegant manner if we gonna have arbitrary
- // number of non-applier wsrep threads.
+ while (wsrep_running_threads > 2)
+ /*
+ 2 is for rollbacker thread which needs to be killed explicitly.
+ This gotta be fixed in a more elegant manner if we gonna have arbitrary
+ number of non-applier wsrep threads.
+ */
{
if (thread_handling > SCHEDULER_ONE_THREAD_PER_CONNECTION)
{
@@ -2425,7 +2438,6 @@ void wsrep_wait_appliers_close(THD *thd)
*/
}
-
void wsrep_kill_mysql(THD *thd)
{
if (mysqld_server_started)
@@ -2442,267 +2454,167 @@ void wsrep_kill_mysql(THD *thd)
}
}
-
-static int wsrep_create_sp(THD *thd, uchar** buf, size_t* buf_len)
+void
+wsrep_last_committed_id(wsrep_gtid_t* gtid)
{
- String log_query;
- sp_head *sp = thd->lex->sphead;
- sql_mode_t saved_mode= thd->variables.sql_mode;
- String retstr(64);
- LEX_CSTRING returns= empty_clex_str;
- retstr.set_charset(system_charset_info);
-
- log_query.set_charset(system_charset_info);
-
- if (sp->m_handler->type() == TYPE_ENUM_FUNCTION)
- {
- sp_returns_type(thd, retstr, sp);
- returns= retstr.lex_cstring();
- }
- if (sp->m_handler->
- show_create_sp(thd, &log_query,
- sp->m_explicit_name ? sp->m_db : null_clex_str,
- sp->m_name, sp->m_params, returns,
- sp->m_body, sp->chistics(),
- thd->lex->definer[0],
- thd->lex->create_info,
- saved_mode))
- {
- WSREP_WARN("SP create string failed: schema: %s, query: %s",
- thd->get_db(), thd->query());
- return 1;
- }
-
- return wsrep_to_buf_helper(thd, log_query.ptr(), log_query.length(), buf, buf_len);
+ wsrep::gtid ret= Wsrep_server_state::instance().last_committed_gtid();
+ memcpy(gtid->uuid.data, ret.id().data(), sizeof(gtid->uuid.data));
+ gtid->seqno= ret.seqno().get();
}
-
-extern int wsrep_on(THD *thd)
+void
+wsrep_node_uuid(wsrep_uuid_t& uuid)
{
- return (int)(WSREP(thd));
+ uuid= node_uuid;
}
-
-extern "C" bool wsrep_thd_is_wsrep_on(THD *thd)
+int wsrep_must_ignore_error(THD* thd)
{
- return thd->variables.wsrep_on;
-}
+ const int error= thd->get_stmt_da()->sql_errno();
+ const uint flags= sql_command_flags[thd->lex->sql_command];
+ DBUG_ASSERT(error);
+ DBUG_ASSERT((wsrep_thd_is_toi(thd)) ||
+ (wsrep_thd_is_applying(thd) && thd->wsrep_apply_toi));
-bool wsrep_consistency_check(THD *thd)
-{
- return thd->wsrep_consistency_check == CONSISTENCY_CHECK_RUNNING;
-}
-
-
-extern "C" void wsrep_thd_set_exec_mode(THD *thd, enum wsrep_exec_mode mode)
-{
- thd->wsrep_exec_mode= mode;
-}
+ if ((wsrep_ignore_apply_errors & WSREP_IGNORE_ERRORS_ON_DDL))
+ goto ignore_error;
+ if ((flags & CF_WSREP_MAY_IGNORE_ERRORS) &&
+ (wsrep_ignore_apply_errors & WSREP_IGNORE_ERRORS_ON_RECONCILING_DDL))
+ {
+ switch (error)
+ {
+ case ER_DB_DROP_EXISTS:
+ case ER_BAD_TABLE_ERROR:
+ case ER_CANT_DROP_FIELD_OR_KEY:
+ goto ignore_error;
+ }
+ }
-extern "C" void wsrep_thd_set_query_state(
- THD *thd, enum wsrep_query_state state)
-{
- thd->wsrep_query_state= state;
-}
-
-
-void wsrep_thd_set_conflict_state(THD *thd, enum wsrep_conflict_state state)
-{
- if (WSREP(thd)) thd->wsrep_conflict_state= state;
-}
-
-
-enum wsrep_exec_mode wsrep_thd_exec_mode(THD *thd)
-{
- return thd->wsrep_exec_mode;
-}
-
-
-const char *wsrep_thd_exec_mode_str(THD *thd)
-{
- return
- (!thd) ? "void" :
- (thd->wsrep_exec_mode == LOCAL_STATE) ? "local" :
- (thd->wsrep_exec_mode == REPL_RECV) ? "applier" :
- (thd->wsrep_exec_mode == TOTAL_ORDER) ? "total order" :
- (thd->wsrep_exec_mode == LOCAL_COMMIT) ? "local commit" : "void";
-}
-
-
-enum wsrep_query_state wsrep_thd_query_state(THD *thd)
-{
- return thd->wsrep_query_state;
-}
-
-
-const char *wsrep_thd_query_state_str(THD *thd)
-{
- return
- (!thd) ? "void" :
- (thd->wsrep_query_state == QUERY_IDLE) ? "idle" :
- (thd->wsrep_query_state == QUERY_EXEC) ? "executing" :
- (thd->wsrep_query_state == QUERY_COMMITTING) ? "committing" :
- (thd->wsrep_query_state == QUERY_EXITING) ? "exiting" :
- (thd->wsrep_query_state == QUERY_ROLLINGBACK) ? "rolling back" : "void";
-}
-
-
-enum wsrep_conflict_state wsrep_thd_get_conflict_state(THD *thd)
-{
- return thd->wsrep_conflict_state;
-}
-
-
-const char *wsrep_thd_conflict_state_str(THD *thd)
-{
- return
- (!thd) ? "void" :
- (thd->wsrep_conflict_state == NO_CONFLICT) ? "no conflict" :
- (thd->wsrep_conflict_state == MUST_ABORT) ? "must abort" :
- (thd->wsrep_conflict_state == ABORTING) ? "aborting" :
- (thd->wsrep_conflict_state == MUST_REPLAY) ? "must replay" :
- (thd->wsrep_conflict_state == REPLAYING) ? "replaying" :
- (thd->wsrep_conflict_state == RETRY_AUTOCOMMIT) ? "retrying" :
- (thd->wsrep_conflict_state == CERT_FAILURE) ? "cert failure" : "void";
-}
-
-
-wsrep_ws_handle_t* wsrep_thd_ws_handle(THD *thd)
-{
- return &thd->wsrep_ws_handle;
-}
-
-
-void wsrep_thd_LOCK(THD *thd)
-{
- mysql_mutex_lock(&thd->LOCK_thd_data);
-}
-
+ return 0;
-void wsrep_thd_UNLOCK(THD *thd)
-{
- mysql_mutex_unlock(&thd->LOCK_thd_data);
+ignore_error:
+ WSREP_WARN("Ignoring error '%s' on query. "
+ "Default database: '%s'. Query: '%s', Error_code: %d",
+ thd->get_stmt_da()->message(),
+ print_slave_db_safe(thd->db.str),
+ thd->query(),
+ error);
+ return 1;
}
-
-extern "C" time_t wsrep_thd_query_start(THD *thd)
+int wsrep_ignored_error_code(Log_event* ev, int error)
{
- return thd->query_start();
-}
+ const THD* thd= ev->thd;
+ DBUG_ASSERT(error);
+ DBUG_ASSERT(wsrep_thd_is_applying(thd) &&
+ !wsrep_thd_is_local_toi(thd));
-extern "C" uint32 wsrep_thd_wsrep_rand(THD *thd)
-{
- return thd->wsrep_rand;
-}
-
-longlong wsrep_thd_trx_seqno(THD *thd)
-{
- return (thd) ? thd->wsrep_trx_meta.gtid.seqno : WSREP_SEQNO_UNDEFINED;
-}
+ if ((wsrep_ignore_apply_errors & WSREP_IGNORE_ERRORS_ON_RECONCILING_DML))
+ {
+ const int ev_type= ev->get_type_code();
+ if ((ev_type == DELETE_ROWS_EVENT || ev_type == DELETE_ROWS_EVENT_V1)
+ && error == ER_KEY_NOT_FOUND)
+ goto ignore_error;
+ }
+ return 0;
-extern "C" query_id_t wsrep_thd_query_id(THD *thd)
-{
- return thd->query_id;
+ignore_error:
+ WSREP_WARN("Ignoring error '%s' on %s event. Error_code: %d",
+ thd->get_stmt_da()->message(),
+ ev->get_type_str(),
+ error);
+ return 1;
}
-
-char *wsrep_thd_query(THD *thd)
+bool wsrep_provider_is_SR_capable()
{
- return (thd) ? thd->query() : NULL;
+ return Wsrep_server_state::has_capability(wsrep::provider::capability::streaming);
}
-extern "C" query_id_t wsrep_thd_wsrep_last_query_id(THD *thd)
+int wsrep_ordered_commit_if_no_binlog(THD* thd, bool all)
{
- return thd->wsrep_last_query_id;
+ if (((wsrep_thd_is_local(thd) &&
+ (WSREP_EMULATE_BINLOG(thd) || !thd->variables.sql_log_bin)) ||
+ (wsrep_thd_is_applying(thd) && !opt_log_slave_updates))
+ && wsrep_thd_trx_seqno(thd) > 0)
+ {
+ wsrep_apply_error unused;
+ return wsrep_ordered_commit(thd, all, unused);
+ }
+ return 0;
}
-
-extern "C" void wsrep_thd_set_wsrep_last_query_id(THD *thd, query_id_t id)
+wsrep_status_t wsrep_tc_log_commit(THD* thd)
{
- thd->wsrep_last_query_id= id;
-}
-
+ int cookie;
+ my_xid xid= thd->transaction.xid_state.xid.get_my_xid();
-extern "C" void wsrep_thd_awake(THD *thd, my_bool signal)
-{
- if (signal)
+ DBUG_ASSERT(thd->lex->sql_command == SQLCOM_LOAD);
+ if (wsrep_before_commit(thd, true))
{
- thd->awake(KILL_QUERY);
+ WSREP_DEBUG("wsrep_tc_log_commit: wsrep_before_commit failed %llu",
+ thd->thread_id);
+ return WSREP_TRX_FAIL;
}
- else
+ cookie= tc_log->log_and_order(thd, xid, 1, false, true);
+ if (wsrep_after_commit(thd, true))
{
- mysql_mutex_lock(&LOCK_wsrep_replaying);
- mysql_cond_broadcast(&COND_wsrep_replaying);
- mysql_mutex_unlock(&LOCK_wsrep_replaying);
+ WSREP_DEBUG("wsrep_tc_log_commit: wsrep_after_commit failed %llu",
+ thd->thread_id);
+ return WSREP_TRX_FAIL;
+ }
+ if (!cookie)
+ {
+ WSREP_DEBUG("log_and_order has failed %llu %d", thd->thread_id, cookie);
+ return WSREP_TRX_FAIL;
+ }
+ if (tc_log->unlog(cookie, xid))
+ {
+ WSREP_DEBUG("log_and_order has failed %llu %d", thd->thread_id, cookie);
+ return WSREP_TRX_FAIL;
}
-}
-
-
-int wsrep_thd_retry_counter(THD *thd)
-{
- return(thd->wsrep_retry_counter);
-}
-
-
-extern "C" bool wsrep_thd_ignore_table(THD *thd)
-{
- return thd->wsrep_ignore_table;
-}
-
-extern int
-wsrep_trx_order_before(THD *thd1, THD *thd2)
-{
- if (wsrep_thd_trx_seqno(thd1) < wsrep_thd_trx_seqno(thd2)) {
- WSREP_DEBUG("BF conflict, order: %lld %lld\n",
- (long long)wsrep_thd_trx_seqno(thd1),
- (long long)wsrep_thd_trx_seqno(thd2));
- return 1;
+ if (wsrep_after_statement(thd))
+ {
+ return WSREP_TRX_FAIL;
+ }
+ /* Set wsrep transaction id if not set. */
+ if (thd->wsrep_trx_id() == WSREP_UNDEFINED_TRX_ID)
+ {
+ if (thd->wsrep_next_trx_id() == WSREP_UNDEFINED_TRX_ID)
+ {
+ thd->set_wsrep_next_trx_id(thd->query_id);
}
- WSREP_DEBUG("waiting for BF, trx order: %lld %lld\n",
- (long long)wsrep_thd_trx_seqno(thd1),
- (long long)wsrep_thd_trx_seqno(thd2));
- return 0;
+ DBUG_ASSERT(thd->wsrep_next_trx_id() != WSREP_UNDEFINED_TRX_ID);
+ }
+ if (wsrep_start_transaction(thd, thd->wsrep_next_trx_id()))
+ {
+ return WSREP_TRX_FAIL;
+ }
+ DBUG_ASSERT(thd->wsrep_trx_id() != WSREP_UNDEFINED_TRX_ID);
+ return WSREP_OK;
}
-
-int wsrep_trx_is_aborting(THD *thd_ptr)
+int wsrep_thd_retry_counter(const THD *thd)
{
- if (thd_ptr) {
- if ((((THD *)thd_ptr)->wsrep_conflict_state == MUST_ABORT) ||
- (((THD *)thd_ptr)->wsrep_conflict_state == ABORTING)) {
- return 1;
- }
- }
- return 0;
+ return thd->wsrep_retry_counter;
}
-
-void wsrep_copy_query(THD *thd)
+extern bool wsrep_thd_ignore_table(THD *thd)
{
- thd->wsrep_retry_command = thd->get_command();
- thd->wsrep_retry_query_len = thd->query_length();
- if (thd->wsrep_retry_query) {
- my_free(thd->wsrep_retry_query);
- }
- thd->wsrep_retry_query = (char *)my_malloc(
- thd->wsrep_retry_query_len + 1, MYF(0));
- strncpy(thd->wsrep_retry_query, thd->query(), thd->wsrep_retry_query_len);
- thd->wsrep_retry_query[thd->wsrep_retry_query_len] = '\0';
+ return thd->wsrep_ignore_table;
}
-
bool wsrep_is_show_query(enum enum_sql_command command)
{
DBUG_ASSERT(command >= 0 && command <= SQLCOM_END);
return (sql_command_flags[command] & CF_STATUS_COMMAND) != 0;
}
-
bool wsrep_create_like_table(THD* thd, TABLE_LIST* table,
TABLE_LIST* src_table,
HA_CREATE_INFO *create_info)
@@ -2753,8 +2665,7 @@ wsrep_error_label:
#endif
}
-
-static int wsrep_create_trigger_query(THD *thd, uchar** buf, size_t* buf_len)
+int wsrep_create_trigger_query(THD *thd, uchar** buf, size_t* buf_len)
{
LEX *lex= thd->lex;
String stmt_query;
@@ -2809,88 +2720,165 @@ static int wsrep_create_trigger_query(THD *thd, uchar** buf, size_t* buf_len)
buf, buf_len);
}
-/***** callbacks for wsrep service ************/
-
-my_bool get_wsrep_debug()
+void* start_wsrep_THD(void *arg)
{
- return wsrep_debug;
-}
+ THD *thd;
-my_bool get_wsrep_load_data_splitting()
-{
- return wsrep_load_data_splitting;
-}
+ Wsrep_thd_args* thd_args= (Wsrep_thd_args*) arg;
-long get_wsrep_protocol_version()
-{
- return wsrep_protocol_version;
-}
+ if (my_thread_init() || (!(thd= new THD(next_thread_id(), true))))
+ {
+ goto error;
+ }
-my_bool get_wsrep_drupal_282555_workaround()
-{
- return wsrep_drupal_282555_workaround;
-}
+ mysql_mutex_lock(&LOCK_thread_count);
-my_bool get_wsrep_recovery()
-{
- return wsrep_recovery;
-}
+ if (wsrep_gtid_mode)
+ {
+ /* Adjust domain_id. */
+ thd->variables.gtid_domain_id= wsrep_gtid_domain_id;
+ }
-my_bool get_wsrep_log_conflicts()
-{
- return wsrep_log_conflicts;
-}
+ thd->real_id=pthread_self(); // Keep purify happy
+ thread_created++;
+ threads.append(thd);
-wsrep_t *get_wsrep()
-{
- return wsrep;
-}
+ my_net_init(&thd->net,(st_vio*) 0, thd, MYF(0));
-my_bool get_wsrep_certify_nonPK()
-{
- return wsrep_certify_nonPK;
-}
+ DBUG_PRINT("wsrep",(("creating thread %lld"), (long long)thd->thread_id));
+ thd->prior_thr_create_utime= thd->start_utime= microsecond_interval_timer();
+ (void) mysql_mutex_unlock(&LOCK_thread_count);
-void wsrep_lock_rollback()
-{
- mysql_mutex_lock(&LOCK_wsrep_rollback);
-}
+ /* from bootstrap()... */
+ thd->bootstrap=1;
+ thd->max_client_packet_length= thd->net.max_packet;
+ thd->security_ctx->master_access= ~(ulong)0;
-void wsrep_unlock_rollback()
-{
- mysql_cond_signal(&COND_wsrep_rollback);
- mysql_mutex_unlock(&LOCK_wsrep_rollback);
-}
+ /* from handle_one_connection... */
+ pthread_detach_this_thread();
-my_bool wsrep_aborting_thd_contains(THD *thd)
-{
- mysql_mutex_assert_owner(&LOCK_wsrep_rollback);
- wsrep_aborting_thd_t abortees = wsrep_aborting_thd;
- while (abortees)
+ mysql_thread_set_psi_id(thd->thread_id);
+ thd->thr_create_utime= microsecond_interval_timer();
+ if (MYSQL_CALLBACK_ELSE(thread_scheduler, init_new_connection_thread, (), 0))
{
- if (abortees->aborting_thd == thd)
- return true;
- abortees = abortees->next;
+ close_connection(thd, ER_OUT_OF_RESOURCES);
+ statistic_increment(aborted_connects,&LOCK_status);
+ MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 0));
+ goto error;
}
- return false;
+
+// </5.1.17>
+ /*
+ handle_one_connection() is normally the only way a thread would
+ start and would always be on the very high end of the stack ,
+ therefore, the thread stack always starts at the address of the
+ first local variable of handle_one_connection, which is thd. We
+ need to know the start of the stack so that we could check for
+ stack overruns.
+ */
+ DBUG_PRINT("wsrep", ("handle_one_connection called by thread %lld\n",
+ (long long)thd->thread_id));
+ /* now that we've called my_thread_init(), it is safe to call DBUG_* */
+
+ thd->thread_stack= (char*) &thd;
+ if (thd->store_globals())
+ {
+ close_connection(thd, ER_OUT_OF_RESOURCES);
+ statistic_increment(aborted_connects,&LOCK_status);
+ MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 0));
+ delete thd;
+ delete thd_args;
+ goto error;
+ }
+
+ thd->system_thread= SYSTEM_THREAD_SLAVE_SQL;
+ thd->security_ctx->skip_grants();
+
+ /* handle_one_connection() again... */
+ thd->proc_info= 0;
+ thd->set_command(COM_SLEEP);
+ thd->init_for_queries();
+ mysql_mutex_lock(&LOCK_thread_count);
+ wsrep_running_threads++;
+ mysql_cond_broadcast(&COND_thread_count);
+ mysql_mutex_unlock(&LOCK_thread_count);
+
+ WSREP_DEBUG("wsrep system thread %llu, %p starting",
+ thd->thread_id, thd);
+ thd_args->fun()(thd, thd_args->args());
+
+ WSREP_DEBUG("wsrep system thread: %llu, %p closing",
+ thd->thread_id, thd);
+
+ /* Wsrep may reset globals during thread context switches, store globals
+ before cleanup. */
+ thd->store_globals();
+
+ close_connection(thd, 0);
+
+ delete thd_args;
+
+ mysql_mutex_lock(&LOCK_thread_count);
+ wsrep_running_threads--;
+ WSREP_DEBUG("wsrep running threads now: %lu", wsrep_running_threads);
+ mysql_cond_broadcast(&COND_thread_count);
+ mysql_mutex_unlock(&LOCK_thread_count);
+ /*
+ Note: We can't call THD destructor without crashing
+ if plugins have not been initialized. However, in most of the
+ cases this means that pre SE initialization SST failed and
+ we are going to exit anyway.
+ */
+ if (plugins_are_initialized)
+ {
+ net_end(&thd->net);
+ MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 1));
+ }
+ else
+ {
+ /*
+ TODO: lightweight cleanup to get rid of:
+ 'Error in my_thread_global_end(): 2 threads didn't exit'
+ at server shutdown
+ */
+ }
+
+ unlink_not_visible_thd(thd);
+ delete thd;
+ my_thread_end();
+ return(NULL);
+
+error:
+ WSREP_ERROR("Failed to create/initialize system thread");
+
+ /* Abort if its the first applier/rollbacker thread. */
+ if (!mysqld_server_initialized)
+ unireg_abort(1);
+ else
+ return NULL;
}
-void wsrep_aborting_thd_enqueue(THD *thd)
+enum wsrep::streaming_context::fragment_unit wsrep_fragment_unit(ulong unit)
{
- mysql_mutex_assert_owner(&LOCK_wsrep_rollback);
- wsrep_aborting_thd_t aborting = (wsrep_aborting_thd_t)
- my_malloc(sizeof(struct wsrep_aborting_thd), MYF(0));
- aborting->aborting_thd = thd;
- aborting->next = wsrep_aborting_thd;
- wsrep_aborting_thd = aborting;
+ switch (unit)
+ {
+ case WSREP_FRAG_BYTES: return wsrep::streaming_context::bytes;
+ case WSREP_FRAG_ROWS: return wsrep::streaming_context::row;
+ case WSREP_FRAG_STATEMENTS: return wsrep::streaming_context::statement;
+ default:
+ DBUG_ASSERT(0);
+ return wsrep::streaming_context::bytes;
+ }
}
-bool wsrep_node_is_donor()
+/***** callbacks for wsrep service ************/
+
+my_bool get_wsrep_recovery()
{
- return (WSREP_ON) ? (wsrep_config_state->get_status() == 2) : false;
+ return wsrep_recovery;
}
-bool wsrep_node_is_synced()
+bool wsrep_consistency_check(THD *thd)
{
- return (WSREP_ON) ? (wsrep_config_state->get_status() == 4) : false;
+ return thd->wsrep_consistency_check == CONSISTENCY_CHECK_RUNNING;
}