summaryrefslogtreecommitdiff
path: root/sql/wsrep_mysqld.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/wsrep_mysqld.cc')
-rw-r--r--sql/wsrep_mysqld.cc2433
1 files changed, 2433 insertions, 0 deletions
diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc
new file mode 100644
index 00000000000..c1b15bd02c2
--- /dev/null
+++ b/sql/wsrep_mysqld.cc
@@ -0,0 +1,2433 @@
+/* Copyright 2008-2014 Codership Oy <http://www.codership.com>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include <mysqld.h>
+#include <sql_class.h>
+#include <sql_parse.h>
+#include "slave.h"
+#include "rpl_mi.h"
+#include "sql_repl.h"
+#include "rpl_filter.h"
+#include "sql_callback.h"
+#include "sp_head.h"
+#include "sp.h"
+#include "wsrep_priv.h"
+#include "wsrep_thd.h"
+#include "wsrep_sst.h"
+#include "wsrep_utils.h"
+#include "wsrep_var.h"
+#include "wsrep_binlog.h"
+#include "wsrep_applier.h"
+#include <cstdio>
+#include <cstdlib>
+#include "log_event.h"
+#include <slave.h>
+
+wsrep_t *wsrep = NULL;
+my_bool wsrep_emulate_bin_log = FALSE; // activating parts of binlog interface
+#ifdef GTID_SUPPORT
+/* Sidno in global_sid_map corresponding to group uuid */
+rpl_sidno wsrep_sidno= -1;
+#endif /* GTID_SUPPORT */
+my_bool wsrep_preordered_opt= FALSE;
+
+/*
+ * Begin configuration options and their default values
+ */
+
+extern int wsrep_replaying;
+extern ulong wsrep_running_threads;
+extern ulong my_bind_addr;
+extern my_bool plugins_are_initialized;
+extern uint kill_cached_threads;
+extern mysql_cond_t COND_thread_cache;
+
+const char* wsrep_data_home_dir = NULL;
+const char* wsrep_dbug_option = "";
+
+long wsrep_slave_threads = 1; // # of slave action appliers wanted
+int wsrep_slave_count_change = 0; // # of appliers to stop or start
+my_bool wsrep_debug = 0; // enable debug level logging
+my_bool wsrep_convert_LOCK_to_trx = 1; // convert locking sessions to trx
+ulong wsrep_retry_autocommit = 5; // retry aborted autocommit trx
+my_bool wsrep_auto_increment_control = 1; // control auto increment variables
+my_bool wsrep_drupal_282555_workaround = 1; // retry autoinc insert after dupkey
+my_bool wsrep_incremental_data_collection = 0; // incremental data collection
+ulong wsrep_max_ws_size = 1073741824UL;//max ws (RBR buffer) size
+ulong wsrep_max_ws_rows = 65536; // max number of rows in ws
+int wsrep_to_isolation = 0; // # of active TO isolation threads
+my_bool wsrep_certify_nonPK = 1; // certify, even when no primary key
+long wsrep_max_protocol_version = 3; // maximum protocol version to use
+ulong wsrep_forced_binlog_format = BINLOG_FORMAT_UNSPEC;
+my_bool wsrep_recovery = 0; // recovery
+my_bool wsrep_replicate_myisam = 0; // enable myisam replication
+my_bool wsrep_log_conflicts = 0;
+ulong wsrep_mysql_replication_bundle = 0;
+my_bool wsrep_desync = 0; // desynchronize the node from the
+ // cluster
+my_bool wsrep_load_data_splitting = 1; // commit load data every 10K intervals
+my_bool wsrep_restart_slave = 0; // should mysql slave thread be
+ // restarted, if node joins back
+my_bool wsrep_restart_slave_activated = 0; // node has dropped, and slave
+ // restart will be needed
+my_bool wsrep_slave_UK_checks = 0; // slave thread does UK checks
+my_bool wsrep_slave_FK_checks = 0; // slave thread does FK checks
+/*
+ * End configuration options
+ */
+
+/*
+ * Other wsrep global variables.
+ */
+my_bool wsrep_inited = 0; // initialized ?
+
+static const wsrep_uuid_t cluster_uuid = WSREP_UUID_UNDEFINED;
+static char cluster_uuid_str[40]= { 0, };
+static const char* cluster_status_str[WSREP_VIEW_MAX] =
+{
+ "Primary",
+ "non-Primary",
+ "Disconnected"
+};
+
+static char provider_name[256]= { 0, };
+static char provider_version[256]= { 0, };
+static char provider_vendor[256]= { 0, };
+
+/*
+ * wsrep status variables
+ */
+my_bool wsrep_connected = FALSE;
+my_bool wsrep_ready = FALSE; // node can accept queries
+const char* wsrep_cluster_state_uuid = cluster_uuid_str;
+long long wsrep_cluster_conf_id = WSREP_SEQNO_UNDEFINED;
+const char* wsrep_cluster_status = cluster_status_str[WSREP_VIEW_DISCONNECTED];
+long wsrep_cluster_size = 0;
+long wsrep_local_index = -1;
+long long wsrep_local_bf_aborts = 0;
+const char* wsrep_provider_name = provider_name;
+const char* wsrep_provider_version = provider_version;
+const char* wsrep_provider_vendor = provider_vendor;
+/* End wsrep status variables */
+
+wsrep_uuid_t local_uuid = WSREP_UUID_UNDEFINED;
+wsrep_seqno_t local_seqno = WSREP_SEQNO_UNDEFINED;
+wsp::node_status local_status;
+long wsrep_protocol_version = 3;
+
+// Boolean denoting if server is in initial startup phase. This is needed
+// to make sure that main thread waiting in wsrep_sst_wait() is signaled
+// if there was no state gap on receiving first view event.
+static my_bool wsrep_startup = TRUE;
+
+
+static void wsrep_log_cb(wsrep_log_level_t level, const char *msg) {
+ switch (level) {
+ case WSREP_LOG_INFO:
+ sql_print_information("WSREP: %s", msg);
+ break;
+ case WSREP_LOG_WARN:
+ sql_print_warning("WSREP: %s", msg);
+ break;
+ case WSREP_LOG_ERROR:
+ case WSREP_LOG_FATAL:
+ sql_print_error("WSREP: %s", msg);
+ break;
+ case WSREP_LOG_DEBUG:
+ if (wsrep_debug) sql_print_information ("[Debug] WSREP: %s", msg);
+ default:
+ break;
+ }
+}
+
+static void wsrep_log_states (wsrep_log_level_t const level,
+ const wsrep_uuid_t* const group_uuid,
+ wsrep_seqno_t const group_seqno,
+ const wsrep_uuid_t* const node_uuid,
+ wsrep_seqno_t const node_seqno)
+{
+ char uuid_str[37];
+ char msg[256];
+
+ wsrep_uuid_print (group_uuid, uuid_str, sizeof(uuid_str));
+ snprintf (msg, 255, "WSREP: Group state: %s:%lld",
+ uuid_str, (long long)group_seqno);
+ wsrep_log_cb (level, msg);
+
+ wsrep_uuid_print (node_uuid, uuid_str, sizeof(uuid_str));
+ snprintf (msg, 255, "WSREP: Local state: %s:%lld",
+ uuid_str, (long long)node_seqno);
+ wsrep_log_cb (level, msg);
+}
+
+static my_bool set_SE_checkpoint(THD* unused, plugin_ref plugin, void* arg)
+{
+ XID* xid= reinterpret_cast<XID*>(arg);
+ handlerton* hton= plugin_data(plugin, handlerton *);
+ if (hton->set_checkpoint)
+ {
+ const wsrep_uuid_t* uuid(wsrep_xid_uuid(xid));
+ char uuid_str[40] = {0, };
+ wsrep_uuid_print(uuid, uuid_str, sizeof(uuid_str));
+ WSREP_DEBUG("Set WSREPXid for InnoDB: %s:%lld",
+ uuid_str, (long long)wsrep_xid_seqno(xid));
+ hton->set_checkpoint(hton, xid);
+ }
+ return FALSE;
+}
+
+void wsrep_set_SE_checkpoint(XID* xid)
+{
+ plugin_foreach(NULL, set_SE_checkpoint, MYSQL_STORAGE_ENGINE_PLUGIN, xid);
+}
+
+static my_bool get_SE_checkpoint(THD* unused, plugin_ref plugin, void* arg)
+{
+ XID* xid= reinterpret_cast<XID*>(arg);
+ handlerton* hton= plugin_data(plugin, handlerton *);
+ if (hton->get_checkpoint)
+ {
+ hton->get_checkpoint(hton, xid);
+ const wsrep_uuid_t* uuid(wsrep_xid_uuid(xid));
+ char uuid_str[40] = {0, };
+ wsrep_uuid_print(uuid, uuid_str, sizeof(uuid_str));
+ WSREP_DEBUG("Read WSREPXid from InnoDB: %s:%lld",
+ uuid_str, (long long)wsrep_xid_seqno(xid));
+ }
+ return FALSE;
+}
+
+void wsrep_get_SE_checkpoint(XID* xid)
+{
+ plugin_foreach(NULL, get_SE_checkpoint, MYSQL_STORAGE_ENGINE_PLUGIN, xid);
+}
+
+#ifdef GTID_SUPPORT
+void wsrep_init_sidno(const wsrep_uuid_t& uuid)
+{
+ /* generate new Sid map entry from inverted uuid */
+ rpl_sid sid;
+ wsrep_uuid_t ltid_uuid;
+ for (size_t i= 0; i < sizeof(ltid_uuid.data); ++i)
+ {
+ ltid_uuid.data[i] = ~local_uuid.data[i];
+ }
+ sid.copy_from(ltid_uuid.data);
+ global_sid_lock->wrlock();
+ wsrep_sidno= global_sid_map->add_sid(sid);
+ WSREP_INFO("inited wsrep sidno %d", wsrep_sidno);
+ global_sid_lock->unlock();
+}
+#endif /* GTID_SUPPORT */
+
+static wsrep_cb_status_t
+wsrep_view_handler_cb (void* app_ctx,
+ void* recv_ctx,
+ const wsrep_view_info_t* view,
+ const char* state,
+ size_t state_len,
+ void** sst_req,
+ size_t* sst_req_len)
+{
+ *sst_req = NULL;
+ *sst_req_len = 0;
+
+ wsrep_member_status_t new_status= local_status.get();
+
+ if (memcmp(&cluster_uuid, &view->state_id.uuid, sizeof(wsrep_uuid_t)))
+ {
+ memcpy((wsrep_uuid_t*)&cluster_uuid, &view->state_id.uuid,
+ sizeof(cluster_uuid));
+
+ wsrep_uuid_print (&cluster_uuid, cluster_uuid_str,
+ sizeof(cluster_uuid_str));
+ }
+
+ wsrep_cluster_conf_id= view->view;
+ wsrep_cluster_status= cluster_status_str[view->status];
+ wsrep_cluster_size= view->memb_num;
+ wsrep_local_index= view->my_idx;
+
+ WSREP_INFO("New cluster view: global state: %s:%lld, view# %lld: %s, "
+ "number of nodes: %ld, my index: %ld, protocol version %d",
+ wsrep_cluster_state_uuid, (long long)view->state_id.seqno,
+ (long long)wsrep_cluster_conf_id, wsrep_cluster_status,
+ wsrep_cluster_size, wsrep_local_index, view->proto_ver);
+
+ /* Proceed further only if view is PRIMARY */
+ if (WSREP_VIEW_PRIMARY != view->status) {
+ wsrep_ready_set(FALSE);
+ new_status= WSREP_MEMBER_UNDEFINED;
+ /* Always record local_uuid and local_seqno in non-prim since this
+ * may lead to re-initializing provider and start position is
+ * determined according to these variables */
+ // WRONG! local_uuid should be the last primary configuration uuid we were
+ // a member of. local_seqno should be updated in commit calls.
+ // local_uuid= cluster_uuid;
+ // local_seqno= view->first - 1;
+ goto out;
+ }
+
+ switch (view->proto_ver)
+ {
+ case 0:
+ case 1:
+ case 2:
+ case 3:
+ // version change
+ if (view->proto_ver != wsrep_protocol_version)
+ {
+ my_bool wsrep_ready_saved= wsrep_ready;
+ wsrep_ready_set(FALSE);
+ WSREP_INFO("closing client connections for "
+ "protocol change %ld -> %d",
+ wsrep_protocol_version, view->proto_ver);
+ wsrep_close_client_connections(TRUE);
+ wsrep_protocol_version= view->proto_ver;
+ wsrep_ready_set(wsrep_ready_saved);
+ }
+ break;
+ default:
+ WSREP_ERROR("Unsupported application protocol version: %d",
+ view->proto_ver);
+ unireg_abort(1);
+ }
+
+ if (view->state_gap)
+ {
+ WSREP_WARN("Gap in state sequence. Need state transfer.");
+
+ /* After that wsrep will call wsrep_sst_prepare. */
+ /* keep ready flag 0 until we receive the snapshot */
+ wsrep_ready_set(FALSE);
+
+ /* Close client connections to ensure that they don't interfere
+ * with SST */
+ WSREP_DEBUG("[debug]: closing client connections for PRIM");
+ wsrep_close_client_connections(TRUE);
+
+ ssize_t const req_len= wsrep_sst_prepare (sst_req);
+
+ if (req_len < 0)
+ {
+ WSREP_ERROR("SST preparation failed: %zd (%s)", -req_len,
+ strerror(-req_len));
+ new_status= WSREP_MEMBER_UNDEFINED;
+ }
+ else
+ {
+ assert(sst_req != NULL);
+ *sst_req_len= req_len;
+ new_status= WSREP_MEMBER_JOINER;
+ }
+ }
+ else
+ {
+ /*
+ * NOTE: Initialize wsrep_group_uuid here only if it wasn't initialized
+ * before - OR - it was reinitilized on startup (lp:992840)
+ */
+ if (wsrep_startup)
+ {
+ if (wsrep_before_SE())
+ {
+ wsrep_SE_init_grab();
+ // Signal mysqld init thread to continue
+ wsrep_sst_complete (&cluster_uuid, view->state_id.seqno, false);
+ // and wait for SE initialization
+ wsrep_SE_init_wait();
+ }
+ else
+ {
+ local_uuid= cluster_uuid;
+ local_seqno= view->state_id.seqno;
+ }
+ /* Init storage engine XIDs from first view */
+ XID xid;
+ wsrep_xid_init(&xid, &local_uuid, local_seqno);
+ wsrep_set_SE_checkpoint(&xid);
+ new_status= WSREP_MEMBER_JOINED;
+#ifdef GTID_SUPPORT
+ wsrep_init_sidno(local_uuid);
+#endif /* GTID_SUPPORT */
+ }
+
+ // just some sanity check
+ if (memcmp (&local_uuid, &cluster_uuid, sizeof (wsrep_uuid_t)))
+ {
+ WSREP_ERROR("Undetected state gap. Can't continue.");
+ wsrep_log_states(WSREP_LOG_FATAL, &cluster_uuid, view->state_id.seqno,
+ &local_uuid, -1);
+ unireg_abort(1);
+ }
+ }
+
+ if (wsrep_auto_increment_control)
+ {
+ global_system_variables.auto_increment_offset= view->my_idx + 1;
+ global_system_variables.auto_increment_increment= view->memb_num;
+ }
+
+ { /* capabilities may be updated on new configuration */
+ uint64_t const caps(wsrep->capabilities (wsrep));
+
+ my_bool const idc((caps & WSREP_CAP_INCREMENTAL_WRITESET) != 0);
+ if (TRUE == wsrep_incremental_data_collection && FALSE == idc)
+ {
+ WSREP_WARN("Unsupported protocol downgrade: "
+ "incremental data collection disabled. Expect abort.");
+ }
+ wsrep_incremental_data_collection = idc;
+ }
+
+out:
+ if (view->status == WSREP_VIEW_PRIMARY) wsrep_startup= FALSE;
+ local_status.set(new_status, view);
+
+ return WSREP_CB_SUCCESS;
+}
+
+void wsrep_ready_set (my_bool x)
+{
+ WSREP_DEBUG("Setting wsrep_ready to %d", x);
+ if (mysql_mutex_lock (&LOCK_wsrep_ready)) abort();
+ if (wsrep_ready != x)
+ {
+ wsrep_ready= x;
+ mysql_cond_signal (&COND_wsrep_ready);
+ }
+ mysql_mutex_unlock (&LOCK_wsrep_ready);
+}
+
+// Wait until wsrep has reached ready state
+void wsrep_ready_wait ()
+{
+ if (mysql_mutex_lock (&LOCK_wsrep_ready)) abort();
+ while (!wsrep_ready)
+ {
+ WSREP_INFO("Waiting to reach ready state");
+ mysql_cond_wait (&COND_wsrep_ready, &LOCK_wsrep_ready);
+ }
+ WSREP_INFO("ready state reached");
+ mysql_mutex_unlock (&LOCK_wsrep_ready);
+}
+
+static void wsrep_synced_cb(void* app_ctx)
+{
+ WSREP_INFO("Synchronized with group, ready for connections");
+ bool signal_main= false;
+ if (mysql_mutex_lock (&LOCK_wsrep_ready)) abort();
+ if (!wsrep_ready)
+ {
+ wsrep_ready= TRUE;
+ mysql_cond_signal (&COND_wsrep_ready);
+ signal_main= true;
+
+ }
+ local_status.set(WSREP_MEMBER_SYNCED);
+ mysql_mutex_unlock (&LOCK_wsrep_ready);
+
+ if (signal_main)
+ {
+ wsrep_SE_init_grab();
+ // Signal mysqld init thread to continue
+ wsrep_sst_complete (&local_uuid, local_seqno, false);
+ // and wait for SE initialization
+ wsrep_SE_init_wait();
+ }
+ if (wsrep_restart_slave_activated)
+ {
+ int rcode;
+ WSREP_INFO("MySQL slave restart");
+ wsrep_restart_slave_activated= FALSE;
+
+ mysql_mutex_lock(&LOCK_active_mi);
+ if ((rcode = start_slave_threads(1 /* need mutex */,
+ 0 /* no wait for start*/,
+ active_mi,
+ master_info_file,
+ relay_log_info_file,
+ SLAVE_SQL)))
+ {
+ WSREP_WARN("Failed to create slave threads: %d", rcode);
+ }
+ mysql_mutex_unlock(&LOCK_active_mi);
+
+ }
+}
+
+static void wsrep_init_position()
+{
+ /* read XIDs from storage engines */
+ XID xid;
+ memset(&xid, 0, sizeof(xid));
+ xid.formatID= -1;
+ wsrep_get_SE_checkpoint(&xid);
+
+ if (xid.formatID == -1)
+ {
+ WSREP_INFO("Read nil XID from storage engines, skipping position init");
+ return;
+ }
+ else if (!wsrep_is_wsrep_xid(&xid))
+ {
+ WSREP_WARN("Read non-wsrep XID from storage engines, skipping position init");
+ return;
+ }
+
+ const wsrep_uuid_t* uuid= wsrep_xid_uuid(&xid);
+ const wsrep_seqno_t seqno= wsrep_xid_seqno(&xid);
+
+ char uuid_str[40] = {0, };
+ wsrep_uuid_print(uuid, uuid_str, sizeof(uuid_str));
+ WSREP_INFO("Initial position: %s:%lld", uuid_str, (long long)seqno);
+
+
+ if (!memcmp(&local_uuid, &WSREP_UUID_UNDEFINED, sizeof(local_uuid)) &&
+ local_seqno == WSREP_SEQNO_UNDEFINED)
+ {
+ // Initial state
+ local_uuid= *uuid;
+ local_seqno= seqno;
+ }
+ else if (memcmp(&local_uuid, uuid, sizeof(local_uuid)) ||
+ local_seqno != seqno)
+ {
+ WSREP_WARN("Initial position was provided by configuration or SST, "
+ "avoiding override");
+ }
+}
+
+extern char* my_bind_addr_str;
+
+int wsrep_init()
+{
+ int rcode= -1;
+ DBUG_ASSERT(wsrep_inited == 0);
+
+ wsrep_ready_set(FALSE);
+ assert(wsrep_provider);
+
+ wsrep_init_position();
+
+ if ((rcode= wsrep_load(wsrep_provider, &wsrep, wsrep_log_cb)) != WSREP_OK)
+ {
+ if (strcasecmp(wsrep_provider, WSREP_NONE))
+ {
+ WSREP_ERROR("wsrep_load(%s) failed: %s (%d). Reverting to no provider.",
+ wsrep_provider, strerror(rcode), rcode);
+ strcpy((char*)wsrep_provider, WSREP_NONE); // damn it's a dirty hack
+ (void) wsrep_init();
+ return rcode;
+ }
+ else /* this is for recursive call above */
+ {
+ WSREP_ERROR("Could not revert to no provider: %s (%d). Need to abort.",
+ strerror(rcode), rcode);
+ unireg_abort(1);
+ }
+ }
+
+ if (!WSREP_PROVIDER_EXISTS)
+ {
+ // enable normal operation in case no provider is specified
+ wsrep_ready_set(TRUE);
+ wsrep_inited= 1;
+ global_system_variables.wsrep_on = 0;
+ wsrep_init_args args;
+ args.logger_cb = wsrep_log_cb;
+ args.options = (wsrep_provider_options) ?
+ wsrep_provider_options : "";
+ rcode = wsrep->init(wsrep, &args);
+ if (rcode)
+ {
+ DBUG_PRINT("wsrep",("wsrep::init() failed: %d", rcode));
+ WSREP_ERROR("wsrep::init() failed: %d, must shutdown", rcode);
+ wsrep->free(wsrep);
+ free(wsrep);
+ wsrep = NULL;
+ }
+ return rcode;
+ }
+ else
+ {
+ global_system_variables.wsrep_on = 1;
+ strncpy(provider_name,
+ wsrep->provider_name, sizeof(provider_name) - 1);
+ strncpy(provider_version,
+ wsrep->provider_version, sizeof(provider_version) - 1);
+ strncpy(provider_vendor,
+ wsrep->provider_vendor, sizeof(provider_vendor) - 1);
+ }
+
+ if (!wsrep_data_home_dir || strlen(wsrep_data_home_dir) == 0)
+ wsrep_data_home_dir = mysql_real_data_home;
+
+ char node_addr[512]= { 0, };
+ size_t const node_addr_max= sizeof(node_addr) - 1;
+ if (!wsrep_node_address || !strcmp(wsrep_node_address, ""))
+ {
+ size_t const ret= wsrep_guess_ip(node_addr, node_addr_max);
+ if (!(ret > 0 && ret < node_addr_max))
+ {
+ WSREP_WARN("Failed to guess base node address. Set it explicitly via "
+ "wsrep_node_address.");
+ node_addr[0]= '\0';
+ }
+ }
+ else
+ {
+ strncpy(node_addr, wsrep_node_address, node_addr_max);
+ }
+
+ char inc_addr[512]= { 0, };
+ size_t const inc_addr_max= sizeof (inc_addr);
+ if ((!wsrep_node_incoming_address ||
+ !strcmp (wsrep_node_incoming_address, WSREP_NODE_INCOMING_AUTO)))
+ {
+ unsigned int my_bind_ip= INADDR_ANY; // default if not set
+ if (my_bind_addr_str && strlen(my_bind_addr_str))
+ {
+ my_bind_ip= wsrep_check_ip(my_bind_addr_str);
+ }
+
+ if (INADDR_ANY != my_bind_ip)
+ {
+ if (INADDR_NONE != my_bind_ip && INADDR_LOOPBACK != my_bind_ip)
+ {
+ snprintf(inc_addr, inc_addr_max, "%s:%u",
+ my_bind_addr_str, (int)mysqld_port);
+ } // else leave inc_addr an empty string - mysqld is not listening for
+ // client connections on network interfaces.
+ }
+ else // mysqld binds to 0.0.0.0, take IP from wsrep_node_address if possible
+ {
+ size_t const node_addr_len= strlen(node_addr);
+ if (node_addr_len > 0)
+ {
+ const char* const colon= strrchr(node_addr, ':');
+ if (strchr(node_addr, ':') == colon) // 1 or 0 ':'
+ {
+ size_t const ip_len= colon ? colon - node_addr : node_addr_len;
+ if (ip_len + 7 /* :55555\0 */ < inc_addr_max)
+ {
+ memcpy (inc_addr, node_addr, ip_len);
+ snprintf(inc_addr + ip_len, inc_addr_max - ip_len, ":%u",
+ (int)mysqld_port);
+ }
+ else
+ {
+ WSREP_WARN("Guessing address for incoming client connections: "
+ "address too long.");
+ inc_addr[0]= '\0';
+ }
+ }
+ else
+ {
+ WSREP_WARN("Guessing address for incoming client connections: "
+ "too many colons :) .");
+ inc_addr[0]= '\0';
+ }
+ }
+
+ if (!strlen(inc_addr))
+ {
+ WSREP_WARN("Guessing address for incoming client connections failed. "
+ "Try setting wsrep_node_incoming_address explicitly.");
+ }
+ }
+ }
+ else if (!strchr(wsrep_node_incoming_address, ':')) // no port included
+ {
+ if ((int)inc_addr_max <=
+ snprintf(inc_addr, inc_addr_max, "%s:%u",
+ wsrep_node_incoming_address,(int)mysqld_port))
+ {
+ WSREP_WARN("Guessing address for incoming client connections: "
+ "address too long.");
+ inc_addr[0]= '\0';
+ }
+ }
+ else
+ {
+ size_t const need = strlen (wsrep_node_incoming_address);
+ if (need >= inc_addr_max) {
+ WSREP_WARN("wsrep_node_incoming_address too long: %zu", need);
+ inc_addr[0]= '\0';
+ }
+ else {
+ memcpy (inc_addr, wsrep_node_incoming_address, need);
+ }
+ }
+
+ struct wsrep_init_args wsrep_args;
+
+ struct wsrep_gtid const state_id = { local_uuid, local_seqno };
+
+ wsrep_args.data_dir = wsrep_data_home_dir;
+ wsrep_args.node_name = (wsrep_node_name) ? wsrep_node_name : "";
+ wsrep_args.node_address = node_addr;
+ wsrep_args.node_incoming = inc_addr;
+ wsrep_args.options = (wsrep_provider_options) ?
+ wsrep_provider_options : "";
+ wsrep_args.proto_ver = wsrep_max_protocol_version;
+
+ wsrep_args.state_id = &state_id;
+
+ wsrep_args.logger_cb = wsrep_log_cb;
+ wsrep_args.view_handler_cb = wsrep_view_handler_cb;
+ wsrep_args.apply_cb = wsrep_apply_cb;
+ wsrep_args.commit_cb = wsrep_commit_cb;
+ wsrep_args.unordered_cb = wsrep_unordered_cb;
+ wsrep_args.sst_donate_cb = wsrep_sst_donate_cb;
+ wsrep_args.synced_cb = wsrep_synced_cb;
+
+ rcode = wsrep->init(wsrep, &wsrep_args);
+
+ if (rcode)
+ {
+ DBUG_PRINT("wsrep",("wsrep::init() failed: %d", rcode));
+ WSREP_ERROR("wsrep::init() failed: %d, must shutdown", rcode);
+ wsrep->free(wsrep);
+ free(wsrep);
+ wsrep = NULL;
+ } else {
+ wsrep_inited= 1;
+ }
+
+ return rcode;
+}
+
+extern int wsrep_on(void *);
+
+void wsrep_init_startup (bool first)
+{
+ if (wsrep_init()) unireg_abort(1);
+
+ wsrep_thr_lock_init(wsrep_thd_is_BF, wsrep_abort_thd,
+ wsrep_debug, wsrep_convert_LOCK_to_trx, wsrep_on);
+
+ /* Skip replication start if no cluster address */
+ if (!wsrep_cluster_address || strlen(wsrep_cluster_address) == 0) return;
+
+ if (first) wsrep_sst_grab(); // do it so we can wait for SST below
+
+ if (!wsrep_start_replication()) unireg_abort(1);
+
+ wsrep_create_rollbacker();
+ wsrep_create_appliers(1);
+
+ if (first && !wsrep_sst_wait()) unireg_abort(1);// wait until SST is completed
+}
+
+
+void wsrep_deinit(bool free_options)
+{
+ DBUG_ASSERT(wsrep_inited == 1);
+ wsrep_unload(wsrep);
+ wsrep= 0;
+ provider_name[0]= '\0';
+ provider_version[0]= '\0';
+ provider_vendor[0]= '\0';
+ wsrep_inited= 0;
+
+ if (free_options)
+ {
+ wsrep_sst_auth_free();
+ }
+}
+
+void wsrep_recover()
+{
+ if (!memcmp(&local_uuid, &WSREP_UUID_UNDEFINED, sizeof(wsrep_uuid_t)) &&
+ local_seqno == -2)
+ {
+ char uuid_str[40];
+ wsrep_uuid_print(&local_uuid, uuid_str, sizeof(uuid_str));
+ WSREP_INFO("Position %s:%lld given at startup, skipping position recovery",
+ uuid_str, (long long)local_seqno);
+ return;
+ }
+ XID xid;
+ memset(&xid, 0, sizeof(xid));
+ xid.formatID= -1;
+ wsrep_get_SE_checkpoint(&xid);
+ char uuid_str[40];
+ wsrep_uuid_print(wsrep_xid_uuid(&xid), uuid_str, sizeof(uuid_str));
+ WSREP_INFO("Recovered position: %s:%lld", uuid_str,
+ (long long)wsrep_xid_seqno(&xid));
+}
+
+
+void wsrep_stop_replication(THD *thd)
+{
+ WSREP_INFO("Stop replication");
+ if (!wsrep)
+ {
+ WSREP_INFO("Provider was not loaded, in stop replication");
+ return;
+ }
+
+ /* disconnect from group first to get wsrep_ready == FALSE */
+ WSREP_DEBUG("Provider disconnect");
+ wsrep->disconnect(wsrep);
+
+ wsrep_connected= FALSE;
+
+ wsrep_close_client_connections(TRUE);
+
+ /* wait until appliers have stopped */
+ wsrep_wait_appliers_close(thd);
+
+ return;
+}
+
+/* This one is set to true when --wsrep-new-cluster is found in the command
+ * line arguments */
+static my_bool wsrep_new_cluster= FALSE;
+#define WSREP_NEW_CLUSTER "--wsrep-new-cluster"
+/* Finds and hides --wsrep-new-cluster from the arguments list
+ * by moving it to the end of the list and decrementing argument count */
+void wsrep_filter_new_cluster (int* argc, char* argv[])
+{
+ int i;
+ for (i= *argc - 1; i > 0; i--)
+ {
+ /* make a copy of the argument to convert possible underscores to hyphens.
+ * the copy need not to be longer than WSREP_NEW_CLUSTER option */
+ char arg[sizeof(WSREP_NEW_CLUSTER) + 1]= { 0, };
+ strncpy(arg, argv[i], sizeof(arg) - 1);
+ char* underscore(arg);
+ while (NULL != (underscore= strchr(underscore, '_'))) *underscore= '-';
+
+ if (!strcmp(arg, WSREP_NEW_CLUSTER))
+ {
+ wsrep_new_cluster= TRUE;
+ *argc -= 1;
+ /* preserve the order of remaining arguments AND
+ * preserve the original argument pointers - just in case */
+ char* wnc= argv[i];
+ memmove(&argv[i], &argv[i + 1], (*argc - i)*sizeof(argv[i]));
+ argv[*argc]= wnc; /* this will be invisible to the rest of the program */
+ }
+ }
+}
+
+bool wsrep_start_replication()
+{
+ wsrep_status_t rcode;
+
+ /*
+ if provider is trivial, don't even try to connect,
+ but resume local node operation
+ */
+ if (!WSREP_PROVIDER_EXISTS)
+ {
+ // enable normal operation in case no provider is specified
+ wsrep_ready_set(TRUE);
+ return true;
+ }
+
+ if (!wsrep_cluster_address || strlen(wsrep_cluster_address)== 0)
+ {
+ // if provider is non-trivial, but no address is specified, wait for address
+ wsrep_ready_set(FALSE);
+ return true;
+ }
+
+ bool const bootstrap(TRUE == wsrep_new_cluster);
+ wsrep_new_cluster= FALSE;
+
+ WSREP_INFO("Start replication");
+
+ if ((rcode = wsrep->connect(wsrep,
+ wsrep_cluster_name,
+ wsrep_cluster_address,
+ wsrep_sst_donor,
+ bootstrap)))
+ {
+ if (-ESOCKTNOSUPPORT == rcode)
+ {
+ DBUG_PRINT("wsrep",("unrecognized cluster address: '%s', rcode: %d",
+ wsrep_cluster_address, rcode));
+ WSREP_ERROR("unrecognized cluster address: '%s', rcode: %d",
+ wsrep_cluster_address, rcode);
+ }
+ else
+ {
+ DBUG_PRINT("wsrep",("wsrep->connect() failed: %d", rcode));
+ WSREP_ERROR("wsrep::connect() failed: %d", rcode);
+ }
+
+ return false;
+ }
+ else
+ {
+ wsrep_connected= TRUE;
+
+ char* opts= wsrep->options_get(wsrep);
+ if (opts)
+ {
+ wsrep_provider_options_init(opts);
+ free(opts);
+ }
+ else
+ {
+ WSREP_WARN("Failed to get wsrep options");
+ }
+ }
+
+ return true;
+}
+
+bool wsrep_sync_wait (THD* thd, uint mask)
+{
+ if ((thd->variables.wsrep_sync_wait & mask) &&
+ thd->variables.wsrep_on &&
+ !thd->in_active_multi_stmt_transaction() &&
+ thd->wsrep_conflict_state != REPLAYING)
+ {
+ WSREP_DEBUG("wsrep_sync_wait: thd->variables.wsrep_sync_wait = %u, mask = %u",
+ thd->variables.wsrep_sync_wait, mask);
+ // This allows autocommit SELECTs and a first SELECT after SET AUTOCOMMIT=0
+ // TODO: modify to check if thd has locked any rows.
+ wsrep_gtid_t gtid;
+ wsrep_status_t ret= wsrep->causal_read (wsrep, &gtid);
+
+ if (unlikely(WSREP_OK != ret))
+ {
+ const char* msg;
+ int err;
+
+ // Possibly relevant error codes:
+ // ER_CHECKREAD, ER_ERROR_ON_READ, ER_INVALID_DEFAULT, ER_EMPTY_QUERY,
+ // ER_FUNCTION_NOT_DEFINED, ER_NOT_ALLOWED_COMMAND, ER_NOT_SUPPORTED_YET,
+ // ER_FEATURE_DISABLED, ER_QUERY_INTERRUPTED
+
+ switch (ret)
+ {
+ case WSREP_NOT_IMPLEMENTED:
+ msg= "synchronous reads by wsrep backend. "
+ "Please unset wsrep_causal_reads variable.";
+ err= ER_NOT_SUPPORTED_YET;
+ break;
+ default:
+ msg= "Synchronous wait failed.";
+ err= ER_LOCK_WAIT_TIMEOUT; // NOTE: the above msg won't be displayed
+ // with ER_LOCK_WAIT_TIMEOUT
+ }
+
+ my_error(err, MYF(0), msg);
+
+ return true;
+ }
+ }
+
+ return false;
+}
+
+/*
+ * Helpers to deal with TOI key arrays
+ */
+typedef struct wsrep_key_arr
+{
+ wsrep_key_t* keys;
+ size_t keys_len;
+} wsrep_key_arr_t;
+
+
+static void wsrep_keys_free(wsrep_key_arr_t* key_arr)
+{
+ for (size_t i= 0; i < key_arr->keys_len; ++i)
+ {
+ my_free((void*)key_arr->keys[i].key_parts);
+ }
+ my_free(key_arr->keys);
+ key_arr->keys= 0;
+ key_arr->keys_len= 0;
+}
+
+
+/*!
+ * @param db Database string
+ * @param table Table string
+ * @param key Array of wsrep_key_t
+ * @param key_len In: number of elements in key array, Out: number of
+ * elements populated
+ *
+ * @return true if preparation was successful, otherwise false.
+ */
+
+static bool wsrep_prepare_key_for_isolation(const char* db,
+ const char* table,
+ wsrep_buf_t* key,
+ size_t* key_len)
+{
+ if (*key_len < 2) return false;
+
+ switch (wsrep_protocol_version)
+ {
+ case 0:
+ *key_len= 0;
+ break;
+ case 1:
+ case 2:
+ case 3:
+ {
+ *key_len= 0;
+ if (db)
+ {
+ // sql_print_information("%s.%s", db, table);
+ if (db)
+ {
+ key[*key_len].ptr= db;
+ key[*key_len].len= strlen(db);
+ ++(*key_len);
+ if (table)
+ {
+ key[*key_len].ptr= table;
+ key[*key_len].len= strlen(table);
+ ++(*key_len);
+ }
+ }
+ }
+ break;
+ }
+ default:
+ return false;
+ }
+
+ return true;
+}
+
+/* Prepare key list from db/table and table_list */
+static bool wsrep_prepare_keys_for_isolation(THD* thd,
+ const char* db,
+ const char* table,
+ const TABLE_LIST* table_list,
+ wsrep_key_arr_t* ka)
+{
+ ka->keys= 0;
+ ka->keys_len= 0;
+
+ extern TABLE* find_temporary_table(THD*, const TABLE_LIST*);
+
+ if (db || table)
+ {
+ TABLE_LIST tmp_table;
+ MDL_request mdl_request;
+
+ memset(&tmp_table, 0, sizeof(tmp_table));
+ tmp_table.table_name= (char*)table;
+ tmp_table.db= (char*)db;
+ tmp_table.mdl_request.init(MDL_key::GLOBAL, (db) ? db : "",
+ (table) ? table : "",
+ MDL_INTENTION_EXCLUSIVE, MDL_STATEMENT);
+
+ if (!table || !find_temporary_table(thd, &tmp_table))
+ {
+ if (!(ka->keys= (wsrep_key_t*)my_malloc(sizeof(wsrep_key_t), MYF(0))))
+ {
+ WSREP_ERROR("Can't allocate memory for key_array");
+ goto err;
+ }
+ ka->keys_len= 1;
+ if (!(ka->keys[0].key_parts= (wsrep_buf_t*)
+ my_malloc(sizeof(wsrep_buf_t)*2, MYF(0))))
+ {
+ WSREP_ERROR("Can't allocate memory for key_parts");
+ goto err;
+ }
+ ka->keys[0].key_parts_num= 2;
+ if (!wsrep_prepare_key_for_isolation(
+ db, table,
+ (wsrep_buf_t*)ka->keys[0].key_parts,
+ &ka->keys[0].key_parts_num))
+ {
+ WSREP_ERROR("Preparing keys for isolation failed");
+ goto err;
+ }
+ }
+ }
+
+ for (const TABLE_LIST* table= table_list; table; table= table->next_global)
+ {
+ if (!find_temporary_table(thd, table))
+ {
+ wsrep_key_t* tmp;
+ tmp= (wsrep_key_t*)my_realloc(
+ ka->keys, (ka->keys_len + 1) * sizeof(wsrep_key_t),
+ MYF(MY_ALLOW_ZERO_PTR));
+
+ if (!tmp)
+ {
+ WSREP_ERROR("Can't allocate memory for key_array");
+ goto err;
+ }
+ ka->keys= tmp;
+ if (!(ka->keys[ka->keys_len].key_parts= (wsrep_buf_t*)
+ my_malloc(sizeof(wsrep_buf_t)*2, MYF(0))))
+ {
+ WSREP_ERROR("Can't allocate memory for key_parts");
+ goto err;
+ }
+ ka->keys[ka->keys_len].key_parts_num= 2;
+ ++ka->keys_len;
+ if (!wsrep_prepare_key_for_isolation(
+ table->db, table->table_name,
+ (wsrep_buf_t*)ka->keys[ka->keys_len - 1].key_parts,
+ &ka->keys[ka->keys_len - 1].key_parts_num))
+ {
+ WSREP_ERROR("Preparing keys for isolation failed");
+ goto err;
+ }
+ }
+ }
+ return true;
+err:
+ wsrep_keys_free(ka);
+ return false;
+}
+
+
+bool wsrep_prepare_key_for_innodb(const uchar* cache_key,
+ size_t cache_key_len,
+ const uchar* row_id,
+ size_t row_id_len,
+ wsrep_buf_t* key,
+ size_t* key_len)
+{
+ if (*key_len < 3) return false;
+
+ *key_len= 0;
+ switch (wsrep_protocol_version)
+ {
+ case 0:
+ {
+ key[0].ptr = cache_key;
+ key[0].len = cache_key_len;
+
+ *key_len = 1;
+ break;
+ }
+ case 1:
+ case 2:
+ case 3:
+ {
+ key[0].ptr = cache_key;
+ key[0].len = strlen( (char*)cache_key );
+
+ key[1].ptr = cache_key + strlen( (char*)cache_key ) + 1;
+ key[1].len = strlen( (char*)(key[1].ptr) );
+
+ *key_len = 2;
+ break;
+ }
+ default:
+ return false;
+ }
+
+ key[*key_len].ptr = row_id;
+ key[*key_len].len = row_id_len;
+ ++(*key_len);
+
+ return true;
+}
+
+
+/*
+ * Construct Query_log_Event from thd query and serialize it
+ * into buffer.
+ *
+ * Return 0 in case of success, 1 in case of error.
+ */
+int wsrep_to_buf_helper(
+ THD* thd, const char *query, uint query_len, uchar** buf, size_t* buf_len)
+{
+ IO_CACHE tmp_io_cache;
+ if (open_cached_file(&tmp_io_cache, mysql_tmpdir, TEMP_PREFIX,
+ 65536, MYF(MY_WME)))
+ return 1;
+ int ret(0);
+
+#ifdef GTID_SUPPORT
+ if (thd->variables.gtid_next.type == GTID_GROUP)
+ {
+ Gtid_log_event gtid_ev(thd, FALSE, &thd->variables.gtid_next);
+ if (!gtid_ev.is_valid()) ret= 0;
+ if (!ret && gtid_ev.write(&tmp_io_cache)) ret= 1;
+ }
+#endif /* GTID_SUPPORT */
+
+ /* if there is prepare query, add event for it */
+ if (!ret && thd->wsrep_TOI_pre_query)
+ {
+ Query_log_event ev(thd, thd->wsrep_TOI_pre_query,
+ thd->wsrep_TOI_pre_query_len,
+ FALSE, FALSE, FALSE, 0);
+ if (ev.write(&tmp_io_cache)) ret= 1;
+ }
+
+ /* continue to append the actual query */
+ Query_log_event ev(thd, query, query_len, FALSE, FALSE, FALSE, 0);
+ if (!ret && ev.write(&tmp_io_cache)) ret= 1;
+ if (!ret && wsrep_write_cache_buf(&tmp_io_cache, buf, buf_len)) ret= 1;
+ close_cached_file(&tmp_io_cache);
+ return ret;
+}
+
+#include "sql_show.h"
+static int
+create_view_query(THD *thd, uchar** buf, size_t* buf_len)
+{
+ LEX *lex= thd->lex;
+ SELECT_LEX *select_lex= &lex->select_lex;
+ TABLE_LIST *first_table= select_lex->table_list.first;
+ TABLE_LIST *views = first_table;
+
+ String buff;
+ const LEX_STRING command[3]=
+ {{ C_STRING_WITH_LEN("CREATE ") },
+ { C_STRING_WITH_LEN("ALTER ") },
+ { C_STRING_WITH_LEN("CREATE OR REPLACE ") }};
+
+ buff.append(command[thd->lex->create_view_mode].str,
+ command[thd->lex->create_view_mode].length);
+
+ if (!lex->definer)
+ {
+ /*
+ DEFINER-clause is missing; we have to create default definer in
+ persistent arena to be PS/SP friendly.
+ If this is an ALTER VIEW then the current user should be set as
+ the definer.
+ */
+
+ if (!(lex->definer= create_default_definer(thd, false)))
+ {
+ WSREP_WARN("view default definer issue");
+ }
+ }
+
+ views->algorithm = lex->create_view_algorithm;
+ views->definer.user = lex->definer->user;
+ views->definer.host = lex->definer->host;
+ views->view_suid = lex->create_view_suid;
+ views->with_check = lex->create_view_check;
+
+ view_store_options(thd, views, &buff);
+ buff.append(STRING_WITH_LEN("VIEW "));
+ /* Test if user supplied a db (ie: we did not use thd->db) */
+ if (views->db && views->db[0] &&
+ (thd->db == NULL || strcmp(views->db, thd->db)))
+ {
+ append_identifier(thd, &buff, views->db,
+ views->db_length);
+ buff.append('.');
+ }
+ append_identifier(thd, &buff, views->table_name,
+ views->table_name_length);
+ if (lex->view_list.elements)
+ {
+ List_iterator_fast<LEX_STRING> names(lex->view_list);
+ LEX_STRING *name;
+ int i;
+
+ for (i= 0; (name= names++); i++)
+ {
+ buff.append(i ? ", " : "(");
+ append_identifier(thd, &buff, name->str, name->length);
+ }
+ buff.append(')');
+ }
+ buff.append(STRING_WITH_LEN(" AS "));
+ //buff.append(views->source.str, views->source.length);
+ buff.append(thd->lex->create_view_select.str,
+ thd->lex->create_view_select.length);
+ //int errcode= query_error_code(thd, TRUE);
+ //if (thd->binlog_query(THD::STMT_QUERY_TYPE,
+ // buff.ptr(), buff.length(), FALSE, FALSE, FALSE, errcod
+ return wsrep_to_buf_helper(thd, buff.ptr(), buff.length(), buf, buf_len);
+}
+
+static int wsrep_TOI_begin(THD *thd, char *db_, char *table_,
+ const TABLE_LIST* table_list)
+{
+ wsrep_status_t ret(WSREP_WARNING);
+ uchar* buf(0);
+ size_t buf_len(0);
+ int buf_err;
+
+ WSREP_DEBUG("TO BEGIN: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd),
+ thd->wsrep_exec_mode, thd->query() );
+ switch (thd->lex->sql_command)
+ {
+ case SQLCOM_CREATE_VIEW:
+ buf_err= create_view_query(thd, &buf, &buf_len);
+ break;
+ case SQLCOM_CREATE_PROCEDURE:
+ case SQLCOM_CREATE_SPFUNCTION:
+ buf_err= wsrep_create_sp(thd, &buf, &buf_len);
+ break;
+ case SQLCOM_CREATE_TRIGGER:
+ buf_err= wsrep_create_trigger_query(thd, &buf, &buf_len);
+ break;
+ case SQLCOM_CREATE_EVENT:
+ buf_err= wsrep_create_event_query(thd, &buf, &buf_len);
+ break;
+ case SQLCOM_ALTER_EVENT:
+ buf_err= wsrep_alter_event_query(thd, &buf, &buf_len);
+ break;
+ default:
+ buf_err= wsrep_to_buf_helper(thd, thd->query(), thd->query_length(), &buf,
+ &buf_len);
+ break;
+ }
+
+ wsrep_key_arr_t key_arr= {0, 0};
+ struct wsrep_buf buff = { buf, buf_len };
+ if (!buf_err &&
+ wsrep_prepare_keys_for_isolation(thd, db_, table_, table_list, &key_arr)&&
+ WSREP_OK == (ret = wsrep->to_execute_start(wsrep, thd->thread_id,
+ key_arr.keys, key_arr.keys_len,
+ &buff, 1,
+ &thd->wsrep_trx_meta)))
+ {
+ thd->wsrep_exec_mode= TOTAL_ORDER;
+ wsrep_to_isolation++;
+ my_free(buf);
+ wsrep_keys_free(&key_arr);
+ WSREP_DEBUG("TO BEGIN: %lld, %d",(long long)wsrep_thd_trx_seqno(thd),
+ thd->wsrep_exec_mode);
+ }
+ else {
+ /* jump to error handler in mysql_execute_command() */
+ WSREP_WARN("TO isolation failed for: %d, sql: %s. Check wsrep "
+ "connection state and retry the query.",
+ ret, (thd->query()) ? thd->query() : "void");
+ my_error(ER_LOCK_DEADLOCK, MYF(0), "WSREP replication failed. Check "
+ "your wsrep connection state and retry the query.");
+ if (buf) my_free(buf);
+ wsrep_keys_free(&key_arr);
+ return -1;
+ }
+ return 0;
+}
+
+static void wsrep_TOI_end(THD *thd) {
+ wsrep_status_t ret;
+ wsrep_to_isolation--;
+
+ WSREP_DEBUG("TO END: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd),
+ thd->wsrep_exec_mode, (thd->query()) ? thd->query() : "void");
+
+ XID xid;
+ wsrep_xid_init(&xid, &thd->wsrep_trx_meta.gtid.uuid,
+ thd->wsrep_trx_meta.gtid.seqno);
+ wsrep_set_SE_checkpoint(&xid);
+ WSREP_DEBUG("TO END: %lld, update seqno",
+ (long long)wsrep_thd_trx_seqno(thd));
+
+ if (WSREP_OK == (ret = wsrep->to_execute_end(wsrep, thd->thread_id))) {
+ WSREP_DEBUG("TO END: %lld", (long long)wsrep_thd_trx_seqno(thd));
+ }
+ else {
+ WSREP_WARN("TO isolation end failed for: %d, sql: %s",
+ ret, (thd->query()) ? thd->query() : "void");
+ }
+}
+
+static int wsrep_RSU_begin(THD *thd, char *db_, char *table_)
+{
+ wsrep_status_t ret(WSREP_WARNING);
+ WSREP_DEBUG("RSU BEGIN: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd),
+ thd->wsrep_exec_mode, thd->query() );
+
+ ret = wsrep->desync(wsrep);
+ if (ret != WSREP_OK)
+ {
+ WSREP_WARN("RSU desync failed %d for %s", ret, thd->query());
+ my_error(ER_LOCK_DEADLOCK, MYF(0));
+ return(ret);
+ }
+ mysql_mutex_lock(&LOCK_wsrep_replaying);
+ wsrep_replaying++;
+ mysql_mutex_unlock(&LOCK_wsrep_replaying);
+
+ if (wsrep_wait_committing_connections_close(5000))
+ {
+ /* no can do, bail out from DDL */
+ WSREP_WARN("RSU failed due to pending transactions, %s", thd->query());
+ mysql_mutex_lock(&LOCK_wsrep_replaying);
+ wsrep_replaying--;
+ mysql_mutex_unlock(&LOCK_wsrep_replaying);
+
+ ret = wsrep->resync(wsrep);
+ if (ret != WSREP_OK)
+ {
+ WSREP_WARN("resync failed %d for %s", ret, thd->query());
+ }
+ my_error(ER_LOCK_DEADLOCK, MYF(0));
+ return(1);
+ }
+
+ wsrep_seqno_t seqno = wsrep->pause(wsrep);
+ if (seqno == WSREP_SEQNO_UNDEFINED)
+ {
+ WSREP_WARN("pause failed %lld for %s", (long long)seqno, thd->query());
+ return(1);
+ }
+ WSREP_DEBUG("paused at %lld", (long long)seqno);
+ thd->variables.wsrep_on = 0;
+ return 0;
+}
+
+static void wsrep_RSU_end(THD *thd)
+{
+ wsrep_status_t ret(WSREP_WARNING);
+ WSREP_DEBUG("RSU END: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd),
+ thd->wsrep_exec_mode, thd->query() );
+
+
+ mysql_mutex_lock(&LOCK_wsrep_replaying);
+ wsrep_replaying--;
+ mysql_mutex_unlock(&LOCK_wsrep_replaying);
+
+ ret = wsrep->resume(wsrep);
+ if (ret != WSREP_OK)
+ {
+ WSREP_WARN("resume failed %d for %s", ret, thd->query());
+ }
+ ret = wsrep->resync(wsrep);
+ if (ret != WSREP_OK)
+ {
+ WSREP_WARN("resync failed %d for %s", ret, thd->query());
+ return;
+ }
+ thd->variables.wsrep_on = 1;
+}
+
+int wsrep_to_isolation_begin(THD *thd, char *db_, char *table_,
+ const TABLE_LIST* table_list)
+{
+ int ret= 0;
+
+ /*
+ No isolation for applier or replaying threads.
+ */
+ if (thd->wsrep_exec_mode == REPL_RECV)
+ return 0;
+
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+
+ if (thd->wsrep_conflict_state == MUST_ABORT)
+ {
+ WSREP_INFO("thread: %lu, %s has been aborted due to multi-master conflict",
+ thd->thread_id, thd->query());
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+ return WSREP_TRX_FAIL;
+ }
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+
+ DBUG_ASSERT(thd->wsrep_exec_mode == LOCAL_STATE);
+ DBUG_ASSERT(thd->wsrep_trx_meta.gtid.seqno == WSREP_SEQNO_UNDEFINED);
+
+ if (thd->global_read_lock.can_acquire_protection())
+ {
+ WSREP_DEBUG("Aborting TOI: Global Read-Lock (FTWRL) in place: %s %lu",
+ thd->query(), thd->thread_id);
+ return -1;
+ }
+
+ if (wsrep_debug && thd->mdl_context.has_locks())
+ {
+ WSREP_DEBUG("thread holds MDL locks at TI begin: %s %lu",
+ thd->query(), thd->thread_id);
+ }
+
+ /*
+ It makes sense to set auto_increment_* to defaults in TOI operations.
+ Must be done before wsrep_TOI_begin() since Query_log_event encapsulating
+ TOI statement and auto inc variables for wsrep replication is constructed
+ there. Variables are reset back in THD::reset_for_next_command() before
+ processing of next command.
+ */
+ if (wsrep_auto_increment_control)
+ {
+ thd->variables.auto_increment_offset = 1;
+ thd->variables.auto_increment_increment = 1;
+ }
+
+ if (thd->variables.wsrep_on && thd->wsrep_exec_mode==LOCAL_STATE)
+ {
+ switch (wsrep_OSU_method_options) {
+ case WSREP_OSU_TOI: ret = wsrep_TOI_begin(thd, db_, table_,
+ table_list); break;
+ case WSREP_OSU_RSU: ret = wsrep_RSU_begin(thd, db_, table_); break;
+ }
+ if (!ret)
+ {
+ thd->wsrep_exec_mode= TOTAL_ORDER;
+ }
+ }
+ return ret;
+}
+
+void wsrep_to_isolation_end(THD *thd)
+{
+ if (thd->wsrep_exec_mode == TOTAL_ORDER)
+ {
+ switch(wsrep_OSU_method_options)
+ {
+ case WSREP_OSU_TOI: wsrep_TOI_end(thd); break;
+ case WSREP_OSU_RSU: wsrep_RSU_end(thd); break;
+ }
+ wsrep_cleanup_transaction(thd);
+ }
+}
+
+#define WSREP_MDL_LOG(severity, msg, req, gra) \
+ WSREP_##severity( \
+ "%s\n" \
+ "request: (%lu \tseqno %lld \twsrep (%d, %d, %d) cmd %d %d \t%s)\n" \
+ "granted: (%lu \tseqno %lld \twsrep (%d, %d, %d) cmd %d %d \t%s)", \
+ msg, \
+ req->thread_id, (long long)wsrep_thd_trx_seqno(req), \
+ req->wsrep_exec_mode, req->wsrep_query_state, req->wsrep_conflict_state, \
+ req->get_command(), req->lex->sql_command, req->query(), \
+ gra->thread_id, (long long)wsrep_thd_trx_seqno(gra), \
+ gra->wsrep_exec_mode, gra->wsrep_query_state, gra->wsrep_conflict_state, \
+ gra->get_command(), gra->lex->sql_command, gra->query());
+
+bool
+wsrep_grant_mdl_exception(MDL_context *requestor_ctx,
+ MDL_ticket *ticket
+) {
+ if (!WSREP_ON) return FALSE;
+
+ THD *request_thd = requestor_ctx->get_thd();
+ THD *granted_thd = ticket->get_ctx()->get_thd();
+ bool ret = FALSE;
+
+ mysql_mutex_lock(&request_thd->LOCK_wsrep_thd);
+ if (request_thd->wsrep_exec_mode == TOTAL_ORDER ||
+ request_thd->wsrep_exec_mode == REPL_RECV)
+ {
+ mysql_mutex_unlock(&request_thd->LOCK_wsrep_thd);
+ WSREP_MDL_LOG(DEBUG, "MDL conflict ", request_thd, granted_thd);
+ ticket->wsrep_report(wsrep_debug);
+
+ mysql_mutex_lock(&granted_thd->LOCK_wsrep_thd);
+ if (granted_thd->wsrep_exec_mode == TOTAL_ORDER ||
+ granted_thd->wsrep_exec_mode == REPL_RECV)
+ {
+ WSREP_MDL_LOG(INFO, "MDL BF-BF conflict", request_thd, granted_thd);
+ ticket->wsrep_report(true);
+ mysql_mutex_unlock(&granted_thd->LOCK_wsrep_thd);
+ ret = TRUE;
+ }
+ else if (granted_thd->lex->sql_command == SQLCOM_FLUSH)
+ {
+ WSREP_DEBUG("mdl granted over FLUSH BF");
+ ticket->wsrep_report(wsrep_debug);
+ mysql_mutex_unlock(&granted_thd->LOCK_wsrep_thd);
+ ret = TRUE;
+ }
+ else if (request_thd->lex->sql_command == SQLCOM_DROP_TABLE)
+ {
+ WSREP_DEBUG("DROP caused BF abort");
+ ticket->wsrep_report(wsrep_debug);
+ mysql_mutex_unlock(&granted_thd->LOCK_wsrep_thd);
+ wsrep_abort_thd((void*)request_thd, (void*)granted_thd, 1);
+ ret = FALSE;
+ }
+ else if (granted_thd->wsrep_query_state == QUERY_COMMITTING)
+ {
+ WSREP_DEBUG("mdl granted, but commiting thd abort scheduled");
+ ticket->wsrep_report(wsrep_debug);
+ mysql_mutex_unlock(&granted_thd->LOCK_wsrep_thd);
+ wsrep_abort_thd((void*)request_thd, (void*)granted_thd, 1);
+ ret = FALSE;
+ }
+ else
+ {
+ WSREP_MDL_LOG(DEBUG, "MDL conflict-> BF abort", request_thd, granted_thd);
+ ticket->wsrep_report(wsrep_debug);
+ mysql_mutex_unlock(&granted_thd->LOCK_wsrep_thd);
+ wsrep_abort_thd((void*)request_thd, (void*)granted_thd, 1);
+ ret = FALSE;
+ }
+ }
+ else
+ {
+ mysql_mutex_unlock(&request_thd->LOCK_wsrep_thd);
+ }
+ return ret;
+}
+
+
+pthread_handler_t start_wsrep_THD(void *arg)
+{
+ THD *thd;
+ rpl_sql_thread_info sql_info(NULL);
+ wsrep_thd_processor_fun processor= (wsrep_thd_processor_fun)arg;
+
+ if (my_thread_init())
+ {
+ WSREP_ERROR("Could not initialize thread");
+ return(NULL);
+ }
+
+ if (!(thd= new THD(true)))
+ {
+ return(NULL);
+ }
+ mysql_mutex_lock(&LOCK_thread_count);
+ thd->thread_id=thread_id++;
+
+ thd->real_id=pthread_self(); // Keep purify happy
+ thread_count++;
+ thread_created++;
+ threads.append(thd);
+
+ my_net_init(&thd->net,(st_vio*) 0, MYF(0));
+
+ DBUG_PRINT("wsrep",(("creating thread %lld"), (long long)thd->thread_id));
+ thd->prior_thr_create_utime= thd->start_utime= microsecond_interval_timer();
+ (void) mysql_mutex_unlock(&LOCK_thread_count);
+
+ /* from bootstrap()... */
+ thd->bootstrap=1;
+ thd->max_client_packet_length= thd->net.max_packet;
+ thd->security_ctx->master_access= ~(ulong)0;
+ thd->system_thread_info.rpl_sql_info= &sql_info;
+
+ /* from handle_one_connection... */
+ pthread_detach_this_thread();
+
+ mysql_thread_set_psi_id(thd->thread_id);
+ thd->thr_create_utime= microsecond_interval_timer();
+ if (MYSQL_CALLBACK_ELSE(thread_scheduler, init_new_connection_thread, (), 0))
+ {
+ close_connection(thd, ER_OUT_OF_RESOURCES);
+ statistic_increment(aborted_connects,&LOCK_status);
+ MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 0));
+
+ return(NULL);
+ }
+
+// </5.1.17>
+ /*
+ handle_one_connection() is normally the only way a thread would
+ start and would always be on the very high end of the stack ,
+ therefore, the thread stack always starts at the address of the
+ first local variable of handle_one_connection, which is thd. We
+ need to know the start of the stack so that we could check for
+ stack overruns.
+ */
+ DBUG_PRINT("wsrep", ("handle_one_connection called by thread %lld\n",
+ (long long)thd->thread_id));
+ /* now that we've called my_thread_init(), it is safe to call DBUG_* */
+
+ thd->thread_stack= (char*) &thd;
+ if (thd->store_globals())
+ {
+ close_connection(thd, ER_OUT_OF_RESOURCES);
+ statistic_increment(aborted_connects,&LOCK_status);
+ MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 0));
+ delete thd;
+
+ return(NULL);
+ }
+
+ thd->system_thread= SYSTEM_THREAD_SLAVE_SQL;
+ thd->security_ctx->skip_grants();
+
+ /* handle_one_connection() again... */
+ //thd->version= refresh_version;
+ thd->proc_info= 0;
+ thd->set_command(COM_SLEEP);
+ thd->set_time();
+ thd->init_for_queries();
+
+ mysql_mutex_lock(&LOCK_thread_count);
+ wsrep_running_threads++;
+ mysql_cond_broadcast(&COND_thread_count);
+ mysql_mutex_unlock(&LOCK_thread_count);
+
+ processor(thd);
+
+ close_connection(thd, 0);
+
+ mysql_mutex_lock(&LOCK_thread_count);
+ wsrep_running_threads--;
+ WSREP_DEBUG("wsrep running threads now: %lu", wsrep_running_threads);
+ mysql_cond_broadcast(&COND_thread_count);
+ mysql_mutex_unlock(&LOCK_thread_count);
+
+ // Note: We can't call THD destructor without crashing
+ // if plugins have not been initialized. However, in most of the
+ // cases this means that pre SE initialization SST failed and
+ // we are going to exit anyway.
+ if (plugins_are_initialized)
+ {
+ net_end(&thd->net);
+ MYSQL_CALLBACK(thread_scheduler, end_thread, (thd, 1));
+ }
+ else
+ {
+ // TODO: lightweight cleanup to get rid of:
+ // 'Error in my_thread_global_end(): 2 threads didn't exit'
+ // at server shutdown
+ }
+
+ my_thread_end();
+ if (thread_handling > SCHEDULER_ONE_THREAD_PER_CONNECTION)
+ {
+ mysql_mutex_lock(&LOCK_thread_count);
+ delete thd;
+ thread_count--;
+ mysql_mutex_unlock(&LOCK_thread_count);
+ }
+ return(NULL);
+}
+
+
+/**/
+static bool abort_replicated(THD *thd)
+{
+ bool ret_code= false;
+ if (thd->wsrep_query_state== QUERY_COMMITTING)
+ {
+ if (wsrep_debug) WSREP_INFO("aborting replicated trx: %lu", thd->real_id);
+
+ (void)wsrep_abort_thd(thd, thd, TRUE);
+ ret_code= true;
+ }
+ return ret_code;
+}
+
+
+/**/
+static inline bool is_client_connection(THD *thd)
+{
+ return (thd->wsrep_client_thread && thd->variables.wsrep_on);
+}
+
+
+static inline bool is_replaying_connection(THD *thd)
+{
+ bool ret;
+
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ ret= (thd->wsrep_conflict_state == REPLAYING) ? true : false;
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+
+ return ret;
+}
+
+
+static inline bool is_committing_connection(THD *thd)
+{
+ bool ret;
+
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+ ret= (thd->wsrep_query_state == QUERY_COMMITTING) ? true : false;
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+
+ return ret;
+}
+
+
+static bool have_client_connections()
+{
+ THD *tmp;
+
+ I_List_iterator<THD> it(threads);
+ while ((tmp=it++))
+ {
+ DBUG_PRINT("quit",("Informing thread %ld that it's time to die",
+ tmp->thread_id));
+ if (is_client_connection(tmp) && tmp->killed == KILL_CONNECTION)
+ {
+ (void)abort_replicated(tmp);
+ return true;
+ }
+ }
+ return false;
+}
+
+/*
+ returns the number of wsrep appliers running.
+ However, the caller (thd parameter) is not taken in account
+ */
+static int have_wsrep_appliers(THD *thd)
+{
+ int ret= 0;
+ THD *tmp;
+
+ I_List_iterator<THD> it(threads);
+ while ((tmp=it++))
+ {
+ ret+= (tmp != thd && tmp->wsrep_applier);
+ }
+ return ret;
+}
+
+
+static void wsrep_close_thread(THD *thd)
+{
+ thd->killed= KILL_CONNECTION;
+ MYSQL_CALLBACK(thread_scheduler, post_kill_notification, (thd));
+ if (thd->mysys_var)
+ {
+ thd->mysys_var->abort=1;
+ mysql_mutex_lock(&thd->mysys_var->mutex);
+ if (thd->mysys_var->current_cond)
+ {
+ mysql_mutex_lock(thd->mysys_var->current_mutex);
+ mysql_cond_broadcast(thd->mysys_var->current_cond);
+ mysql_mutex_unlock(thd->mysys_var->current_mutex);
+ }
+ mysql_mutex_unlock(&thd->mysys_var->mutex);
+ }
+}
+
+
+static my_bool have_committing_connections()
+{
+ THD *tmp;
+ mysql_mutex_lock(&LOCK_thread_count); // For unlink from list
+
+ I_List_iterator<THD> it(threads);
+ while ((tmp=it++))
+ {
+ if (!is_client_connection(tmp))
+ continue;
+
+ if (is_committing_connection(tmp))
+ {
+ return TRUE;
+ }
+ }
+ mysql_mutex_unlock(&LOCK_thread_count);
+ return FALSE;
+}
+
+
+int wsrep_wait_committing_connections_close(int wait_time)
+{
+ int sleep_time= 100;
+
+ while (have_committing_connections() && wait_time > 0)
+ {
+ WSREP_DEBUG("wait for committing transaction to close: %d", wait_time);
+ my_sleep(sleep_time);
+ wait_time -= sleep_time;
+ }
+ if (have_committing_connections())
+ {
+ return 1;
+ }
+ return 0;
+}
+
+
+void wsrep_close_client_connections(my_bool wait_to_end)
+{
+ /*
+ First signal all threads that it's time to die
+ */
+
+ THD *tmp;
+ mysql_mutex_lock(&LOCK_thread_count); // For unlink from list
+
+ bool kill_cached_threads_saved= kill_cached_threads;
+ kill_cached_threads= true; // prevent future threads caching
+ mysql_cond_broadcast(&COND_thread_cache); // tell cached threads to die
+
+ I_List_iterator<THD> it(threads);
+ while ((tmp=it++))
+ {
+ DBUG_PRINT("quit",("Informing thread %ld that it's time to die",
+ tmp->thread_id));
+ /* We skip slave threads & scheduler on this first loop through. */
+ if (!is_client_connection(tmp))
+ continue;
+
+ if (is_replaying_connection(tmp))
+ {
+ tmp->killed= KILL_CONNECTION;
+ continue;
+ }
+
+ /* replicated transactions must be skipped */
+ if (abort_replicated(tmp))
+ continue;
+
+ WSREP_DEBUG("closing connection %ld", tmp->thread_id);
+ wsrep_close_thread(tmp);
+ }
+ mysql_mutex_unlock(&LOCK_thread_count);
+
+ if (thread_count)
+ sleep(2); // Give threads time to die
+
+ mysql_mutex_lock(&LOCK_thread_count);
+ /*
+ Force remaining threads to die by closing the connection to the client
+ */
+
+ I_List_iterator<THD> it2(threads);
+ while ((tmp=it2++))
+ {
+#ifndef __bsdi__ // Bug in BSDI kernel
+ if (is_client_connection(tmp) &&
+ !abort_replicated(tmp) &&
+ !is_replaying_connection(tmp))
+ {
+ WSREP_INFO("killing local connection: %ld",tmp->thread_id);
+ close_connection(tmp,0);
+ }
+#endif
+ }
+
+ DBUG_PRINT("quit",("Waiting for threads to die (count=%u)",thread_count));
+ if (wsrep_debug)
+ WSREP_INFO("waiting for client connections to close: %u", thread_count);
+
+ while (wait_to_end && have_client_connections())
+ {
+ mysql_cond_wait(&COND_thread_count, &LOCK_thread_count);
+ DBUG_PRINT("quit",("One thread died (count=%u)", thread_count));
+ }
+
+ kill_cached_threads= kill_cached_threads_saved;
+
+ mysql_mutex_unlock(&LOCK_thread_count);
+
+ /* All client connection threads have now been aborted */
+}
+
+
+void wsrep_close_applier(THD *thd)
+{
+ WSREP_DEBUG("closing applier %ld", thd->thread_id);
+ wsrep_close_thread(thd);
+}
+
+
+void wsrep_close_threads(THD *thd)
+{
+ THD *tmp;
+ mysql_mutex_lock(&LOCK_thread_count); // For unlink from list
+
+ I_List_iterator<THD> it(threads);
+ while ((tmp=it++))
+ {
+ DBUG_PRINT("quit",("Informing thread %ld that it's time to die",
+ tmp->thread_id));
+ /* We skip slave threads & scheduler on this first loop through. */
+ if (tmp->wsrep_applier && tmp != thd)
+ {
+ WSREP_DEBUG("closing wsrep thread %ld", tmp->thread_id);
+ wsrep_close_thread (tmp);
+ }
+ }
+
+ mysql_mutex_unlock(&LOCK_thread_count);
+}
+
+
+void wsrep_close_applier_threads(int count)
+{
+ THD *tmp;
+ mysql_mutex_lock(&LOCK_thread_count); // For unlink from list
+
+ I_List_iterator<THD> it(threads);
+ while ((tmp=it++) && count)
+ {
+ DBUG_PRINT("quit",("Informing thread %ld that it's time to die",
+ tmp->thread_id));
+ /* We skip slave threads & scheduler on this first loop through. */
+ if (tmp->wsrep_applier)
+ {
+ WSREP_DEBUG("closing wsrep applier thread %ld", tmp->thread_id);
+ tmp->wsrep_applier_closing= TRUE;
+ count--;
+ }
+ }
+
+ mysql_mutex_unlock(&LOCK_thread_count);
+}
+
+
+void wsrep_wait_appliers_close(THD *thd)
+{
+ /* Wait for wsrep appliers to gracefully exit */
+ mysql_mutex_lock(&LOCK_thread_count);
+ while (have_wsrep_appliers(thd) > 1)
+ // 1 is for rollbacker thread which needs to be killed explicitly.
+ // This gotta be fixed in a more elegant manner if we gonna have arbitrary
+ // number of non-applier wsrep threads.
+ {
+ if (thread_handling > SCHEDULER_ONE_THREAD_PER_CONNECTION)
+ {
+ mysql_mutex_unlock(&LOCK_thread_count);
+ my_sleep(100);
+ mysql_mutex_lock(&LOCK_thread_count);
+ }
+ else
+ mysql_cond_wait(&COND_thread_count,&LOCK_thread_count);
+ DBUG_PRINT("quit",("One applier died (count=%u)",thread_count));
+ }
+ mysql_mutex_unlock(&LOCK_thread_count);
+ /* Now kill remaining wsrep threads: rollbacker */
+ wsrep_close_threads (thd);
+ /* and wait for them to die */
+ mysql_mutex_lock(&LOCK_thread_count);
+ while (have_wsrep_appliers(thd) > 0)
+ {
+ if (thread_handling > SCHEDULER_ONE_THREAD_PER_CONNECTION)
+ {
+ mysql_mutex_unlock(&LOCK_thread_count);
+ my_sleep(100);
+ mysql_mutex_lock(&LOCK_thread_count);
+ }
+ else
+ mysql_cond_wait(&COND_thread_count,&LOCK_thread_count);
+ DBUG_PRINT("quit",("One thread died (count=%u)",thread_count));
+ }
+ mysql_mutex_unlock(&LOCK_thread_count);
+
+ /* All wsrep applier threads have now been aborted. However, if this thread
+ is also applier, we are still running...
+ */
+}
+
+
+void wsrep_kill_mysql(THD *thd)
+{
+ if (mysqld_server_started)
+ {
+ if (!shutdown_in_progress)
+ {
+ WSREP_INFO("starting shutdown");
+ kill_mysql();
+ }
+ }
+ else
+ {
+ unireg_abort(1);
+ }
+}
+
+
+int wsrep_create_sp(THD *thd, uchar** buf, size_t* buf_len)
+{
+ String log_query;
+ sp_head *sp = thd->lex->sphead;
+ ulong saved_mode= thd->variables.sql_mode;
+ String retstr(64);
+ retstr.set_charset(system_charset_info);
+
+ log_query.set_charset(system_charset_info);
+
+ if (sp->m_type == TYPE_ENUM_FUNCTION)
+ {
+ sp_returns_type(thd, retstr, sp);
+ }
+
+ if (!create_string(thd, &log_query,
+ sp->m_type,
+ (sp->m_explicit_name ? sp->m_db.str : NULL),
+ (sp->m_explicit_name ? sp->m_db.length : 0),
+ sp->m_name.str, sp->m_name.length,
+ sp->m_params.str, sp->m_params.length,
+ retstr.c_ptr(), retstr.length(),
+ sp->m_body.str, sp->m_body.length,
+ sp->m_chistics, &(thd->lex->definer->user),
+ &(thd->lex->definer->host),
+ saved_mode))
+ {
+ WSREP_WARN("SP create string failed: %s", thd->query());
+ return 1;
+ }
+
+ return wsrep_to_buf_helper(thd, log_query.ptr(), log_query.length(), buf, buf_len);
+}
+
+
+extern int wsrep_on(void *thd)
+{
+ return (int)(WSREP(((THD*)thd)));
+}
+
+
+extern "C" bool wsrep_thd_is_wsrep_on(THD *thd)
+{
+ return thd->variables.wsrep_on;
+}
+
+
+extern "C" bool wsrep_consistency_check(void *thd)
+{
+ return ((THD*)thd)->wsrep_consistency_check == CONSISTENCY_CHECK_RUNNING;
+}
+
+
+extern "C" void wsrep_thd_set_exec_mode(THD *thd, enum wsrep_exec_mode mode)
+{
+ thd->wsrep_exec_mode= mode;
+}
+
+
+extern "C" void wsrep_thd_set_query_state(
+ THD *thd, enum wsrep_query_state state)
+{
+ thd->wsrep_query_state= state;
+}
+
+
+extern "C" void wsrep_thd_set_conflict_state(
+ THD *thd, enum wsrep_conflict_state state)
+{
+ thd->wsrep_conflict_state= state;
+}
+
+
+extern "C" enum wsrep_exec_mode wsrep_thd_exec_mode(THD *thd)
+{
+ return thd->wsrep_exec_mode;
+}
+
+
+extern "C" const char *wsrep_thd_exec_mode_str(THD *thd)
+{
+ return
+ (!thd) ? "void" :
+ (thd->wsrep_exec_mode == LOCAL_STATE) ? "local" :
+ (thd->wsrep_exec_mode == REPL_RECV) ? "applier" :
+ (thd->wsrep_exec_mode == TOTAL_ORDER) ? "total order" :
+ (thd->wsrep_exec_mode == LOCAL_COMMIT) ? "local commit" : "void";
+}
+
+
+extern "C" enum wsrep_query_state wsrep_thd_query_state(THD *thd)
+{
+ return thd->wsrep_query_state;
+}
+
+
+extern "C" const char *wsrep_thd_query_state_str(THD *thd)
+{
+ return
+ (!thd) ? "void" :
+ (thd->wsrep_query_state == QUERY_IDLE) ? "idle" :
+ (thd->wsrep_query_state == QUERY_EXEC) ? "executing" :
+ (thd->wsrep_query_state == QUERY_COMMITTING) ? "committing" :
+ (thd->wsrep_query_state == QUERY_EXITING) ? "exiting" :
+ (thd->wsrep_query_state == QUERY_ROLLINGBACK) ? "rolling back" : "void";
+}
+
+
+extern "C" enum wsrep_conflict_state wsrep_thd_conflict_state(THD *thd)
+{
+ return thd->wsrep_conflict_state;
+}
+
+
+extern "C" const char *wsrep_thd_conflict_state_str(THD *thd)
+{
+ return
+ (!thd) ? "void" :
+ (thd->wsrep_conflict_state == NO_CONFLICT) ? "no conflict" :
+ (thd->wsrep_conflict_state == MUST_ABORT) ? "must abort" :
+ (thd->wsrep_conflict_state == ABORTING) ? "aborting" :
+ (thd->wsrep_conflict_state == MUST_REPLAY) ? "must replay" :
+ (thd->wsrep_conflict_state == REPLAYING) ? "replaying" :
+ (thd->wsrep_conflict_state == RETRY_AUTOCOMMIT) ? "retrying" :
+ (thd->wsrep_conflict_state == CERT_FAILURE) ? "cert failure" : "void";
+}
+
+
+extern "C" wsrep_ws_handle_t* wsrep_thd_ws_handle(THD *thd)
+{
+ return &thd->wsrep_ws_handle;
+}
+
+
+extern "C" void wsrep_thd_LOCK(THD *thd)
+{
+ mysql_mutex_lock(&thd->LOCK_wsrep_thd);
+}
+
+
+extern "C" void wsrep_thd_UNLOCK(THD *thd)
+{
+ mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
+}
+
+
+extern "C" time_t wsrep_thd_query_start(THD *thd)
+{
+ return thd->query_start();
+}
+
+
+extern "C" uint32 wsrep_thd_wsrep_rand(THD *thd)
+{
+ return thd->wsrep_rand;
+}
+
+
+extern "C" my_thread_id wsrep_thd_thread_id(THD *thd)
+{
+ return thd->thread_id;
+}
+
+
+extern "C" wsrep_seqno_t wsrep_thd_trx_seqno(THD *thd)
+{
+ return (thd) ? thd->wsrep_trx_meta.gtid.seqno : WSREP_SEQNO_UNDEFINED;
+}
+
+
+extern "C" query_id_t wsrep_thd_query_id(THD *thd)
+{
+ return thd->query_id;
+}
+
+
+extern "C" char *wsrep_thd_query(THD *thd)
+{
+ return (thd) ? thd->query() : NULL;
+}
+
+
+extern "C" query_id_t wsrep_thd_wsrep_last_query_id(THD *thd)
+{
+ return thd->wsrep_last_query_id;
+}
+
+
+extern "C" void wsrep_thd_set_wsrep_last_query_id(THD *thd, query_id_t id)
+{
+ thd->wsrep_last_query_id= id;
+}
+
+
+extern "C" void wsrep_thd_awake(THD *thd, my_bool signal)
+{
+ if (signal)
+ {
+ mysql_mutex_lock(&thd->LOCK_thd_data);
+ thd->awake(KILL_QUERY);
+ mysql_mutex_unlock(&thd->LOCK_thd_data);
+ }
+ else
+ {
+ mysql_mutex_lock(&LOCK_wsrep_replaying);
+ mysql_cond_broadcast(&COND_wsrep_replaying);
+ mysql_mutex_unlock(&LOCK_wsrep_replaying);
+ }
+}
+
+
+extern "C" int wsrep_thd_retry_counter(THD *thd)
+{
+ return(thd->wsrep_retry_counter);
+}
+
+
+extern int
+wsrep_trx_order_before(void *thd1, void *thd2)
+{
+ if (wsrep_thd_trx_seqno((THD*)thd1) < wsrep_thd_trx_seqno((THD*)thd2)) {
+ WSREP_DEBUG("BF conflict, order: %lld %lld\n",
+ (long long)wsrep_thd_trx_seqno((THD*)thd1),
+ (long long)wsrep_thd_trx_seqno((THD*)thd2));
+ return 1;
+ }
+ WSREP_DEBUG("waiting for BF, trx order: %lld %lld\n",
+ (long long)wsrep_thd_trx_seqno((THD*)thd1),
+ (long long)wsrep_thd_trx_seqno((THD*)thd2));
+ return 0;
+}
+
+
+extern "C" int
+wsrep_trx_is_aborting(void *thd_ptr)
+{
+ if (thd_ptr) {
+ if ((((THD *)thd_ptr)->wsrep_conflict_state == MUST_ABORT) ||
+ (((THD *)thd_ptr)->wsrep_conflict_state == ABORTING)) {
+ return 1;
+ }
+ }
+ return 0;
+}
+
+
+my_bool wsrep_read_only_option(THD *thd, TABLE_LIST *all_tables)
+{
+ int opt_readonly_saved = opt_readonly;
+ ulong flag_saved = (ulong)(thd->security_ctx->master_access & SUPER_ACL);
+
+ opt_readonly = 0;
+ thd->security_ctx->master_access &= ~SUPER_ACL;
+
+ my_bool ret = !deny_updates_if_read_only_option(thd, all_tables);
+
+ opt_readonly = opt_readonly_saved;
+ thd->security_ctx->master_access |= flag_saved;
+
+ return ret;
+}
+
+
+void wsrep_copy_query(THD *thd)
+{
+ thd->wsrep_retry_command = thd->get_command();
+ thd->wsrep_retry_query_len = thd->query_length();
+ if (thd->wsrep_retry_query) {
+ my_free(thd->wsrep_retry_query);
+ }
+ thd->wsrep_retry_query = (char *)my_malloc(
+ thd->wsrep_retry_query_len + 1, MYF(0));
+ strncpy(thd->wsrep_retry_query, thd->query(), thd->wsrep_retry_query_len);
+ thd->wsrep_retry_query[thd->wsrep_retry_query_len] = '\0';
+}
+
+
+bool wsrep_is_show_query(enum enum_sql_command command)
+{
+ DBUG_ASSERT(command >= 0 && command <= SQLCOM_END);
+ return (sql_command_flags[command] & CF_STATUS_COMMAND) != 0;
+}
+
+bool wsrep_create_like_table(THD* thd, TABLE_LIST* table,
+ TABLE_LIST* src_table,
+ HA_CREATE_INFO *create_info)
+{
+ TABLE *tmp_table;
+ bool is_tmp_table= FALSE;
+
+ for (tmp_table= thd->temporary_tables; tmp_table; tmp_table=tmp_table->next)
+ {
+ if (!strcmp(src_table->db, tmp_table->s->db.str) &&
+ !strcmp(src_table->table_name, tmp_table->s->table_name.str))
+ {
+ is_tmp_table= TRUE;
+ break;
+ }
+ }
+ if (create_info->options & HA_LEX_CREATE_TMP_TABLE)
+ {
+
+ /* CREATE TEMPORARY TABLE LIKE must be skipped from replication */
+ WSREP_DEBUG("CREATE TEMPORARY TABLE LIKE... skipped replication\n %s",
+ thd->query());
+ }
+ else if (!is_tmp_table)
+ {
+ /* this is straight CREATE TABLE LIKE... eith no tmp tables */
+ WSREP_TO_ISOLATION_BEGIN(table->db, table->table_name, NULL);
+ }
+ else
+ {
+ /* here we have CREATE TABLE LIKE <temporary table>
+ the temporary table definition will be needed in slaves to
+ enable the create to succeed
+ */
+ TABLE_LIST tbl;
+ bzero((void*) &tbl, sizeof(tbl));
+ tbl.db= src_table->db;
+ tbl.table_name= tbl.alias= src_table->table_name;
+ tbl.table= tmp_table;
+ char buf[2048];
+ String query(buf, sizeof(buf), system_charset_info);
+ query.length(0); // Have to zero it since constructor doesn't
+
+ (void) store_create_info(thd, &tbl, &query, NULL, TRUE, FALSE);
+ WSREP_DEBUG("TMP TABLE: %s", query.ptr());
+
+ thd->wsrep_TOI_pre_query= query.ptr();
+ thd->wsrep_TOI_pre_query_len= query.length();
+
+ WSREP_TO_ISOLATION_BEGIN(table->db, table->table_name, NULL);
+
+ thd->wsrep_TOI_pre_query= NULL;
+ thd->wsrep_TOI_pre_query_len= 0;
+ }
+
+ return(false);
+
+error:
+ thd->wsrep_TOI_pre_query= NULL;
+ return (true);
+}
+
+
+int wsrep_create_trigger_query(THD *thd, uchar** buf, size_t* buf_len)
+{
+ LEX *lex= thd->lex;
+ String stmt_query;
+
+ LEX_STRING definer_user;
+ LEX_STRING definer_host;
+
+ if (!lex->definer)
+ {
+ if (!thd->slave_thread)
+ {
+ if (!(lex->definer= create_default_definer(thd, false)))
+ return 1;
+ }
+ }
+
+ if (lex->definer)
+ {
+ /* SUID trigger. */
+
+ definer_user= lex->definer->user;
+ definer_host= lex->definer->host;
+ }
+ else
+ {
+ /* non-SUID trigger. */
+
+ definer_user.str= 0;
+ definer_user.length= 0;
+
+ definer_host.str= 0;
+ definer_host.length= 0;
+ }
+
+ stmt_query.append(STRING_WITH_LEN("CREATE "));
+
+ append_definer(thd, &stmt_query, &definer_user, &definer_host);
+
+ LEX_STRING stmt_definition;
+ stmt_definition.str= (char*) thd->lex->stmt_definition_begin;
+ stmt_definition.length= thd->lex->stmt_definition_end
+ - thd->lex->stmt_definition_begin;
+ trim_whitespace(thd->charset(), & stmt_definition);
+
+ stmt_query.append(stmt_definition.str, stmt_definition.length);
+
+ return wsrep_to_buf_helper(thd, stmt_query.c_ptr(), stmt_query.length(),
+ buf, buf_len);
+}