summaryrefslogtreecommitdiff
path: root/sql/wsrep_mysqld.cc
diff options
context:
space:
mode:
authorSeppo Jaakola <seppo.jaakola@codership.com>2013-02-05 15:48:54 +0200
committerSeppo Jaakola <seppo.jaakola@codership.com>2013-02-05 15:48:54 +0200
commite0c6a87b997509f9eb282988cfb097e30b4c9749 (patch)
tree0475741cb7626ab2e4a0dc799fe2f53dab516669 /sql/wsrep_mysqld.cc
parentdb7495822940b5c95f10ff5b1e6273701e868436 (diff)
downloadmariadb-git-e0c6a87b997509f9eb282988cfb097e30b4c9749.tar.gz
re-merging wsrep files from lp:codership-mysql
Diffstat (limited to 'sql/wsrep_mysqld.cc')
-rw-r--r--sql/wsrep_mysqld.cc1311
1 files changed, 0 insertions, 1311 deletions
diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc
deleted file mode 100644
index 62397c58a6e..00000000000
--- a/sql/wsrep_mysqld.cc
+++ /dev/null
@@ -1,1311 +0,0 @@
-/* Copyright 2008 Codership Oy <http://www.codership.com>
-
- This program is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation; version 2 of the License.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with this program; if not, write to the Free Software
- Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
-
-#include <mysqld.h>
-#include <sql_class.h>
-#include <sql_parse.h>
-#include "wsrep_priv.h"
-#include <cstdio>
-#include <cstdlib>
-#include "log_event.h"
-
-extern Format_description_log_event *wsrep_format_desc;
-wsrep_t *wsrep = NULL;
-my_bool wsrep_emulate_bin_log = FALSE; // activating parts of binlog interface
-
-/*
- * Begin configuration options and their default values
- */
-
-const char* wsrep_data_home_dir = NULL;
-
-#define WSREP_NODE_INCOMING_AUTO "AUTO"
-const char* wsrep_node_incoming_address = WSREP_NODE_INCOMING_AUTO;
-const char* wsrep_dbug_option = "";
-
-long wsrep_slave_threads = 1; // # of slave action appliers wanted
-my_bool wsrep_debug = 0; // enable debug level logging
-my_bool wsrep_convert_LOCK_to_trx = 1; // convert locking sessions to trx
-ulong wsrep_retry_autocommit = 5; // retry aborted autocommit trx
-my_bool wsrep_auto_increment_control = 1; // control auto increment variables
-my_bool wsrep_drupal_282555_workaround = 1; // retry autoinc insert after dupkey
-my_bool wsrep_incremental_data_collection = 0; // incremental data collection
-long long wsrep_max_ws_size = 1073741824LL; //max ws (RBR buffer) size
-long wsrep_max_ws_rows = 65536; // max number of rows in ws
-int wsrep_to_isolation = 0; // # of active TO isolation threads
-my_bool wsrep_certify_nonPK = 1; // certify, even when no primary key
-long wsrep_max_protocol_version = 2; // maximum protocol version to use
-ulong wsrep_forced_binlog_format = BINLOG_FORMAT_UNSPEC;
-my_bool wsrep_recovery = 0; // recovery
-my_bool wsrep_replicate_myisam = 0; // enable myisam replication
-my_bool wsrep_log_conflicts = 0; //
-ulong wsrep_mysql_replication_bundle = 0;
-
-/*
- * End configuration options
- */
-
-static const wsrep_uuid_t cluster_uuid = WSREP_UUID_UNDEFINED;
-const wsrep_uuid_t* wsrep_cluster_uuid()
-{
- return &cluster_uuid;
-}
-static char cluster_uuid_str[40]= { 0, };
-static const char* cluster_status_str[WSREP_VIEW_MAX] =
-{
- "Primary",
- "non-Primary",
- "Disconnected"
-};
-
-static char provider_name[256]= { 0, };
-static char provider_version[256]= { 0, };
-static char provider_vendor[256]= { 0, };
-
-/*
- * wsrep status variables
- */
-my_bool wsrep_connected = FALSE;
-my_bool wsrep_ready = FALSE; // node can accept queries
-const char* wsrep_cluster_state_uuid = cluster_uuid_str;
-long long wsrep_cluster_conf_id = WSREP_SEQNO_UNDEFINED;
-const char* wsrep_cluster_status = cluster_status_str[WSREP_VIEW_DISCONNECTED];
-long wsrep_cluster_size = 0;
-long wsrep_local_index = -1;
-const char* wsrep_provider_name = provider_name;
-const char* wsrep_provider_version = provider_version;
-const char* wsrep_provider_vendor = provider_vendor;
-/* End wsrep status variables */
-
-
-wsrep_uuid_t local_uuid = WSREP_UUID_UNDEFINED;
-wsrep_seqno_t local_seqno = WSREP_SEQNO_UNDEFINED;
-wsp::node_status local_status;
-long wsrep_protocol_version = 2;
-
-// Boolean denoting if server is in initial startup phase. This is needed
-// to make sure that main thread waiting in wsrep_sst_wait() is signaled
-// if there was no state gap on receiving first view event.
-static my_bool wsrep_startup = TRUE;
-
-// action execute callback
-extern wsrep_status_t wsrep_apply_cb(void *ctx,
- const void* buf, size_t buf_len,
- wsrep_seqno_t global_seqno);
-
-extern wsrep_status_t wsrep_commit_cb (void *ctx,
- wsrep_seqno_t global_seqno,
- bool commit);
-
-static void wsrep_log_cb(wsrep_log_level_t level, const char *msg) {
- switch (level) {
- case WSREP_LOG_INFO:
- sql_print_information("WSREP: %s", msg);
- break;
- case WSREP_LOG_WARN:
- sql_print_warning("WSREP: %s", msg);
- break;
- case WSREP_LOG_ERROR:
- case WSREP_LOG_FATAL:
- sql_print_error("WSREP: %s", msg);
- break;
- case WSREP_LOG_DEBUG:
- if (wsrep_debug) sql_print_information ("[Debug] WSREP: %s", msg);
- default:
- break;
- }
-}
-
-static void wsrep_log_states (wsrep_log_level_t const level,
- const wsrep_uuid_t* const group_uuid,
- wsrep_seqno_t const group_seqno,
- const wsrep_uuid_t* const node_uuid,
- wsrep_seqno_t const node_seqno)
-{
- char uuid_str[37];
- char msg[256];
-
- wsrep_uuid_print (group_uuid, uuid_str, sizeof(uuid_str));
- snprintf (msg, 255, "WSREP: Group state: %s:%lld",
- uuid_str, (long long)group_seqno);
- wsrep_log_cb (level, msg);
-
- wsrep_uuid_print (node_uuid, uuid_str, sizeof(uuid_str));
- snprintf (msg, 255, "WSREP: Local state: %s:%lld",
- uuid_str, (long long)node_seqno);
- wsrep_log_cb (level, msg);
-}
-
-static my_bool set_SE_checkpoint(THD* unused, plugin_ref plugin, void* arg)
-{
- XID* xid= reinterpret_cast<XID*>(arg);
- handlerton* hton= plugin_data(plugin, handlerton *);
- if (hton->db_type == DB_TYPE_INNODB)
- {
- const wsrep_uuid_t* uuid(wsrep_xid_uuid(xid));
- char uuid_str[40] = {0, };
- wsrep_uuid_print(uuid, uuid_str, sizeof(uuid_str));
- WSREP_DEBUG("Set WSREPXid for InnoDB: %s:%lld",
- uuid_str, (long long)wsrep_xid_seqno(xid));
- hton->wsrep_set_checkpoint(hton, xid);
- }
- return FALSE;
-}
-
-void wsrep_set_SE_checkpoint(XID* xid)
-{
- plugin_foreach(NULL, set_SE_checkpoint, MYSQL_STORAGE_ENGINE_PLUGIN, xid);
-}
-
-static my_bool get_SE_checkpoint(THD* unused, plugin_ref plugin, void* arg)
-{
- XID* xid= reinterpret_cast<XID*>(arg);
- handlerton* hton= plugin_data(plugin, handlerton *);
- if (hton->db_type == DB_TYPE_INNODB)
- {
- hton->wsrep_get_checkpoint(hton, xid);
- const wsrep_uuid_t* uuid(wsrep_xid_uuid(xid));
- char uuid_str[40] = {0, };
- wsrep_uuid_print(uuid, uuid_str, sizeof(uuid_str));
- WSREP_DEBUG("Read WSREPXid from InnoDB: %s:%lld",
- uuid_str, (long long)wsrep_xid_seqno(xid));
-
- }
- return FALSE;
-}
-
-void wsrep_get_SE_checkpoint(XID* xid)
-{
- plugin_foreach(NULL, get_SE_checkpoint, MYSQL_STORAGE_ENGINE_PLUGIN, xid);
-}
-
-static void wsrep_view_handler_cb (void* app_ctx,
- void* recv_ctx,
- const wsrep_view_info_t* view,
- const char* state,
- size_t state_len,
- void** sst_req,
- ssize_t* sst_req_len)
-{
- wsrep_member_status_t new_status= local_status.get();
-
- if (memcmp(&cluster_uuid, &view->uuid, sizeof(wsrep_uuid_t)))
- {
- memcpy((wsrep_uuid_t*)&cluster_uuid, &view->uuid, sizeof(cluster_uuid));
- wsrep_uuid_print (&cluster_uuid, cluster_uuid_str,
- sizeof(cluster_uuid_str));
- }
-
- wsrep_cluster_conf_id= view->view;
- wsrep_cluster_status= cluster_status_str[view->status];
- wsrep_cluster_size= view->memb_num;
- wsrep_local_index= view->my_idx;
-
- WSREP_INFO("New cluster view: global state: %s:%lld, view# %lld: %s, "
- "number of nodes: %ld, my index: %ld, protocol version %d",
- wsrep_cluster_state_uuid, (long long)view->seqno,
- (long long)wsrep_cluster_conf_id, wsrep_cluster_status,
- wsrep_cluster_size, wsrep_local_index, view->proto_ver);
-
- /* Proceed further only if view is PRIMARY */
- if (WSREP_VIEW_PRIMARY != view->status) {
- wsrep_ready_set(FALSE);
- new_status= WSREP_MEMBER_UNDEFINED;
- /* Always record local_uuid and local_seqno in non-prim since this
- * may lead to re-initializing provider and start position is
- * determined according to these variables */
- // WRONG! local_uuid should be the last primary configuration uuid we were
- // a member of. local_seqno should be updated in commit calls.
- // local_uuid= cluster_uuid;
- // local_seqno= view->first - 1;
- goto out;
- }
-
- switch (view->proto_ver)
- {
- case 0:
- case 1:
- case 2:
- // version change
- if (view->proto_ver != wsrep_protocol_version)
- {
- my_bool wsrep_ready_saved= wsrep_ready;
- wsrep_ready_set(FALSE);
- WSREP_INFO("closing client connections for "
- "protocol change %ld -> %d",
- wsrep_protocol_version, view->proto_ver);
- wsrep_close_client_connections(TRUE);
- wsrep_protocol_version= view->proto_ver;
- wsrep_ready_set(wsrep_ready_saved);
- }
- break;
- default:
- WSREP_ERROR("Unsupported application protocol version: %d",
- view->proto_ver);
- unireg_abort(1);
- }
-
- if (view->state_gap)
- {
- WSREP_WARN("Gap in state sequence. Need state transfer.");
-
- /* After that wsrep will call wsrep_sst_prepare. */
- /* keep ready flag 0 until we receive the snapshot */
- wsrep_ready_set(FALSE);
-
- /* Close client connections to ensure that they don't interfere
- * with SST */
- WSREP_DEBUG("[debug]: closing client connections for PRIM");
- wsrep_close_client_connections(TRUE);
-
- *sst_req_len= wsrep_sst_prepare (sst_req);
-
- if (*sst_req_len < 0)
- {
- int err = *sst_req_len;
- WSREP_ERROR("SST preparation failed: %d (%s)", -err, strerror(-err));
- new_status= WSREP_MEMBER_UNDEFINED;
- }
- else
- {
- new_status= WSREP_MEMBER_JOINER;
- }
- }
- else
- {
- /*
- * NOTE: Initialize wsrep_group_uuid here only if it wasn't initialized
- * before - OR - it was reinitilized on startup (lp:992840)
- */
- if (wsrep_startup)
- {
- if (wsrep_before_SE())
- {
- wsrep_SE_init_grab();
- // Signal mysqld init thread to continue
- wsrep_sst_complete (&cluster_uuid, view->seqno, false);
- // and wait for SE initialization
- wsrep_SE_init_wait();
- }
- else
- {
- local_uuid= cluster_uuid;
- local_seqno= view->seqno;
- }
- /* Init storage engine XIDs from first view */
- XID xid;
- wsrep_xid_init(&xid, &local_uuid, local_seqno);
- wsrep_set_SE_checkpoint(&xid);
- new_status= WSREP_MEMBER_JOINED;
- }
-
- // just some sanity check
- if (memcmp (&local_uuid, &cluster_uuid, sizeof (wsrep_uuid_t)))
- {
- WSREP_ERROR("Undetected state gap. Can't continue.");
- wsrep_log_states(WSREP_LOG_FATAL, &cluster_uuid, view->seqno,
- &local_uuid, -1);
- unireg_abort(1);
- }
- }
-
- if (wsrep_auto_increment_control)
- {
- global_system_variables.auto_increment_offset= view->my_idx + 1;
- global_system_variables.auto_increment_increment= view->memb_num;
- }
-
-out:
- wsrep_startup= FALSE;
- local_status.set(new_status, view);
-}
-
-void wsrep_ready_set (my_bool x)
-{
- WSREP_DEBUG("Setting wsrep_ready to %d", x);
- if (mysql_mutex_lock (&LOCK_wsrep_ready)) abort();
- if (wsrep_ready != x)
- {
- wsrep_ready= x;
- mysql_cond_signal (&COND_wsrep_ready);
- }
- mysql_mutex_unlock (&LOCK_wsrep_ready);
-}
-
-// Wait until wsrep has reached ready state
-void wsrep_ready_wait ()
-{
- if (mysql_mutex_lock (&LOCK_wsrep_ready)) abort();
- while (!wsrep_ready)
- {
- WSREP_INFO("Waiting to reach ready state");
- mysql_cond_wait (&COND_wsrep_ready, &LOCK_wsrep_ready);
- }
- WSREP_INFO("ready state reached");
- mysql_mutex_unlock (&LOCK_wsrep_ready);
-}
-
-static void wsrep_synced_cb(void* app_ctx)
-{
- WSREP_INFO("Synchronized with group, ready for connections");
- if (mysql_mutex_lock (&LOCK_wsrep_ready)) abort();
- if (!wsrep_ready)
- {
- wsrep_ready= TRUE;
- mysql_cond_signal (&COND_wsrep_ready);
- }
- local_status.set(WSREP_MEMBER_SYNCED);
- mysql_mutex_unlock (&LOCK_wsrep_ready);
-}
-
-static void wsrep_init_position()
-{
- /* read XIDs from storage engines */
- XID xid;
- memset(&xid, 0, sizeof(xid));
- xid.formatID= -1;
- wsrep_get_SE_checkpoint(&xid);
-
- if (xid.formatID == -1)
- {
- WSREP_INFO("Read nil XID from storage engines, skipping position init");
- return;
- }
- else if (!wsrep_is_wsrep_xid(&xid))
- {
- WSREP_WARN("Read non-wsrep XID from storage engines, skipping position init");
- return;
- }
-
- const wsrep_uuid_t* uuid= wsrep_xid_uuid(&xid);
- const wsrep_seqno_t seqno= wsrep_xid_seqno(&xid);
-
- char uuid_str[40] = {0, };
- wsrep_uuid_print(uuid, uuid_str, sizeof(uuid_str));
- WSREP_INFO("Initial position: %s:%lld", uuid_str, (long long)seqno);
-
-
- if (!memcmp(&local_uuid, &WSREP_UUID_UNDEFINED, sizeof(local_uuid)) &&
- local_seqno == WSREP_SEQNO_UNDEFINED)
- {
- // Initial state
- local_uuid= *uuid;
- local_seqno= seqno;
- }
- else if (memcmp(&local_uuid, uuid, sizeof(local_uuid)) ||
- local_seqno != seqno)
- {
- WSREP_WARN("Initial position was provided by configuration or SST, "
- "avoiding override");
- }
-}
-
-
-int wsrep_init()
-{
- int rcode= -1;
-
- wsrep_ready_set(FALSE);
- assert(wsrep_provider);
- wsrep_format_desc= new Format_description_log_event(4);
- wsrep_init_position();
-
- if ((rcode= wsrep_load(wsrep_provider, &wsrep, wsrep_log_cb)) != WSREP_OK)
- {
- if (strcasecmp(wsrep_provider, WSREP_NONE))
- {
- WSREP_ERROR("wsrep_load(%s) failed: %s (%d). Reverting to no provider.",
- wsrep_provider, strerror(rcode), rcode);
- strcpy((char*)wsrep_provider, WSREP_NONE); // damn it's a dirty hack
- (void) wsrep_init();
- return rcode;
- }
- else /* this is for recursive call above */
- {
- WSREP_ERROR("Could not revert to no provider: %s (%d). Need to abort.",
- strerror(rcode), rcode);
- unireg_abort(1);
- }
- }
-
- if (strlen(wsrep_provider)== 0 ||
- !strcmp(wsrep_provider, WSREP_NONE))
- {
- // enable normal operation in case no provider is specified
- wsrep_ready_set(TRUE);
- global_system_variables.wsrep_on = 0;
- return 0;
- }
- else
- {
- global_system_variables.wsrep_on = 1;
- strncpy(provider_name,
- wsrep->provider_name, sizeof(provider_name) - 1);
- strncpy(provider_version,
- wsrep->provider_version, sizeof(provider_version) - 1);
- strncpy(provider_vendor,
- wsrep->provider_vendor, sizeof(provider_vendor) - 1);
- }
-
- if (!wsrep_data_home_dir || strlen(wsrep_data_home_dir) == 0)
- wsrep_data_home_dir = mysql_real_data_home;
-
- char node_addr[512]= { 0, };
- if (!wsrep_node_address || !strcmp(wsrep_node_address, ""))
- {
- size_t const node_addr_max= sizeof(node_addr);
- size_t const ret= guess_ip(node_addr, node_addr_max);
- if (!(ret > 0 && ret < node_addr_max))
- {
- WSREP_WARN("Failed to guess base node address. Set it explicitly via "
- "wsrep_node_address.");
- node_addr[0]= '\0';
- }
- }
- else
- {
- strncpy(node_addr, wsrep_node_address, sizeof(node_addr) - 1);
- }
-
- static char inc_addr[512]= { 0, };
-
- if ((!wsrep_node_incoming_address ||
- !strcmp (wsrep_node_incoming_address, WSREP_NODE_INCOMING_AUTO)))
- {
- size_t const node_addr_len= strlen(node_addr);
- if (node_addr_len > 0)
- {
- const char* const colon= strrchr(node_addr, ':');
- if (strchr(node_addr, ':') == colon) // 1 or 0 ':'
- {
- size_t const inc_addr_max= sizeof (inc_addr);
- size_t const ip_len= colon ? colon - node_addr : node_addr_len;
- if (ip_len + 7 /* :55555\0 */ < inc_addr_max)
- {
- memcpy (inc_addr, node_addr, ip_len);
- snprintf(inc_addr + ip_len, inc_addr_max - ip_len, ":%u",mysqld_port);
- }
- else
- {
- WSREP_WARN("Guessing address for incoming client connections: "
- "address too long.");
- inc_addr[0]= '\0';
- }
- }
- else
- {
- WSREP_WARN("Guessing address for incoming client connections: "
- "too many colons :) .");
- inc_addr[0]= '\0';
- }
- }
-
- // this is to display detected address on SHOW VARIABLES...
- wsrep_node_incoming_address = inc_addr;
-
- if (!strlen(wsrep_node_incoming_address))
- {
- WSREP_WARN("Guessing address for incoming client connections failed. "
- "Try setting wsrep_node_incoming_address explicitly.");
- }
- }
-
- struct wsrep_init_args wsrep_args;
-
- wsrep_args.data_dir = wsrep_data_home_dir;
- wsrep_args.node_name = (wsrep_node_name) ? wsrep_node_name : "";
- wsrep_args.node_address = node_addr;
- wsrep_args.node_incoming = wsrep_node_incoming_address;
- wsrep_args.options = (wsrep_provider_options) ?
- wsrep_provider_options : "";
- wsrep_args.proto_ver = wsrep_max_protocol_version;
-
- wsrep_args.state_uuid = &local_uuid;
- wsrep_args.state_seqno = local_seqno;
-
- wsrep_args.logger_cb = wsrep_log_cb;
- wsrep_args.view_handler_cb = wsrep_view_handler_cb;
- wsrep_args.apply_cb = wsrep_apply_cb;
- wsrep_args.commit_cb = wsrep_commit_cb;
- wsrep_args.sst_donate_cb = wsrep_sst_donate_cb;
- wsrep_args.synced_cb = wsrep_synced_cb;
-
- rcode = wsrep->init(wsrep, &wsrep_args);
-
- if (rcode)
- {
- DBUG_PRINT("wsrep",("wsrep::init() failed: %d", rcode));
- WSREP_ERROR("wsrep::init() failed: %d, must shutdown", rcode);
- free(wsrep);
- wsrep = NULL;
- }
-
- return rcode;
-}
-
-extern "C" int wsrep_on(void *);
-
-void wsrep_init_startup (bool first)
-{
- if (wsrep_init()) unireg_abort(1);
-
- wsrep_thr_lock_init(wsrep_thd_is_brute_force, wsrep_abort_thd,
- wsrep_debug, wsrep_convert_LOCK_to_trx, wsrep_on);
-
- /* Skip replication start if no cluster address */
- if (!wsrep_cluster_address || strlen(wsrep_cluster_address) == 0) return;
-
- if (first) wsrep_sst_grab(); // do it so we can wait for SST below
-
- if (!wsrep_start_replication()) unireg_abort(1);
-
- wsrep_create_rollbacker();
- wsrep_create_appliers(1);
-
- if (first && !wsrep_sst_wait()) unireg_abort(1);// wait until SST is completed
-}
-
-
-void wsrep_deinit()
-{
- wsrep_unload(wsrep);
- wsrep= 0;
- provider_name[0]= '\0';
- provider_version[0]= '\0';
- provider_vendor[0]= '\0';
-
- delete wsrep_format_desc;
- wsrep_format_desc= NULL;
-}
-
-void wsrep_recover()
-{
- if (!memcmp(&local_uuid, &WSREP_UUID_UNDEFINED, sizeof(wsrep_uuid_t)) &&
- local_seqno == -2)
- {
- char uuid_str[40];
- wsrep_uuid_print(&local_uuid, uuid_str, sizeof(uuid_str));
- WSREP_INFO("Position %s:%lld given at startup, skipping position recovery",
- uuid_str, (long long)local_seqno);
- return;
- }
- XID xid;
- memset(&xid, 0, sizeof(xid));
- xid.formatID= -1;
- wsrep_get_SE_checkpoint(&xid);
- char uuid_str[40];
- wsrep_uuid_print(wsrep_xid_uuid(&xid), uuid_str, sizeof(uuid_str));
- WSREP_INFO("Recovered position: %s:%lld", uuid_str,
- (long long)wsrep_xid_seqno(&xid));
-}
-
-
-void wsrep_stop_replication(THD *thd)
-{
- WSREP_INFO("Stop replication");
- if (!wsrep)
- {
- WSREP_INFO("Provider was not loaded, in stop replication");
- return;
- }
-
- /* disconnect from group first to get wsrep_ready == FALSE */
- WSREP_DEBUG("Provider disconnect");
- wsrep->disconnect(wsrep);
-
- wsrep_connected= FALSE;
-
- wsrep_close_client_connections(TRUE);
-
- /* wait until appliers have stopped */
- wsrep_wait_appliers_close(thd);
-
- return;
-}
-
-
-bool wsrep_start_replication()
-{
- wsrep_status_t rcode;
-
- /*
- if provider is trivial, don't even try to connect,
- but resume local node operation
- */
- if (strlen(wsrep_provider)== 0 ||
- !strcmp(wsrep_provider, WSREP_NONE))
- {
- // enable normal operation in case no provider is specified
- wsrep_ready_set(TRUE);
- return true;
- }
-
- if (!wsrep_cluster_address || strlen(wsrep_cluster_address)== 0)
- {
- // if provider is non-trivial, but no address is specified, wait for address
- wsrep_ready_set(FALSE);
- return true;
- }
-
- WSREP_INFO("Start replication");
-
- if ((rcode = wsrep->connect(wsrep,
- wsrep_cluster_name,
- wsrep_cluster_address,
- wsrep_sst_donor)))
- {
- if (-ESOCKTNOSUPPORT == rcode)
- {
- DBUG_PRINT("wsrep",("unrecognized cluster address: '%s', rcode: %d",
- wsrep_cluster_address, rcode));
- WSREP_ERROR("unrecognized cluster address: '%s', rcode: %d",
- wsrep_cluster_address, rcode);
- }
- else
- {
- DBUG_PRINT("wsrep",("wsrep->connect() failed: %d", rcode));
- WSREP_ERROR("wsrep::connect() failed: %d", rcode);
- }
-
- return false;
- }
- else
- {
- wsrep_connected= TRUE;
-
- uint64_t caps = wsrep->capabilities (wsrep);
-
- wsrep_incremental_data_collection =
- (caps & WSREP_CAP_WRITE_SET_INCREMENTS);
-
- char* opts= wsrep->options_get(wsrep);
- if (opts)
- {
- wsrep_provider_options_init(opts);
- free(opts);
- }
- else
- {
- WSREP_WARN("Failed to get wsrep options");
- }
- }
-
- return true;
-}
-
-bool
-wsrep_causal_wait (THD* thd)
-{
- if (thd->variables.wsrep_causal_reads && thd->variables.wsrep_on &&
- !thd->in_active_multi_stmt_transaction() &&
- thd->wsrep_conflict_state != REPLAYING)
- {
- // This allows autocommit SELECTs and a first SELECT after SET AUTOCOMMIT=0
- // TODO: modify to check if thd has locked any rows.
- wsrep_seqno_t seqno;
- wsrep_status_t ret= wsrep->causal_read (wsrep, &seqno);
-
- if (unlikely(WSREP_OK != ret))
- {
- const char* msg;
- int err;
-
- // Possibly relevant error codes:
- // ER_CHECKREAD, ER_ERROR_ON_READ, ER_INVALID_DEFAULT, ER_EMPTY_QUERY,
- // ER_FUNCTION_NOT_DEFINED, ER_NOT_ALLOWED_COMMAND, ER_NOT_SUPPORTED_YET,
- // ER_FEATURE_DISABLED, ER_QUERY_INTERRUPTED
-
- switch (ret)
- {
- case WSREP_NOT_IMPLEMENTED:
- msg= "consistent reads by wsrep backend. "
- "Please unset wsrep_causal_reads variable.";
- err= ER_NOT_SUPPORTED_YET;
- break;
- default:
- msg= "Causal wait failed.";
- err= ER_LOCK_WAIT_TIMEOUT; // NOTE: the above msg won't be displayed
- // with ER_LOCK_WAIT_TIMEOUT
- }
-
- my_error(err, MYF(0), msg);
-
- return true;
- }
- }
-
- return false;
-}
-
-/*
- * Helpers to deal with TOI key arrays
- */
-typedef struct wsrep_key_arr
-{
- wsrep_key_t* keys;
- size_t keys_len;
-} wsrep_key_arr_t;
-
-
-static void wsrep_keys_free(wsrep_key_arr_t* key_arr)
-{
- for (size_t i= 0; i < key_arr->keys_len; ++i)
- {
- my_free((wsrep_key_part_t*)key_arr->keys[i].key_parts);
- }
- my_free(key_arr->keys);
- key_arr->keys= 0;
- key_arr->keys_len= 0;
-}
-
-
-/*!
- * @param db Database string
- * @param table Table string
- * @param key Array of wsrep_key_t
- * @param key_len In: number of elements in key array, Out: number of
- * elements populated
- *
- * @return true if preparation was successful, otherwise false.
- */
-
-static bool wsrep_prepare_key_for_isolation(const char* db,
- const char* table,
- wsrep_key_part_t* key,
- size_t* key_len)
-{
- if (*key_len < 2) return false;
-
- switch (wsrep_protocol_version)
- {
- case 0:
- *key_len= 0;
- break;
- case 1:
- case 2:
- {
- *key_len= 0;
- if (db)
- {
- // sql_print_information("%s.%s", db, table);
- if (db)
- {
- key[*key_len].buf= db;
- key[*key_len].buf_len= strlen(db);
- ++(*key_len);
- if (table)
- {
- key[*key_len].buf= table;
- key[*key_len].buf_len= strlen(table);
- ++(*key_len);
- }
- }
- }
- break;
- }
- default:
- return false;
- }
-
- return true;
-}
-
-/* Prepare key list from db/table and table_list */
-static bool wsrep_prepare_keys_for_isolation(THD* thd,
- const char* db,
- const char* table,
- const TABLE_LIST* table_list,
- wsrep_key_arr_t* ka)
-{
- ka->keys= 0;
- ka->keys_len= 0;
-
- extern TABLE* find_temporary_table(THD*, const TABLE_LIST*);
-
- if (db || table)
- {
- TABLE_LIST tmp_table;
- bzero((char*) &tmp_table,sizeof(tmp_table));
- tmp_table.table_name= (char*)db;
- tmp_table.db= (char*)table;
- if (!table || !find_temporary_table(thd, &tmp_table))
- {
- if (!(ka->keys= (wsrep_key_t*)my_malloc(sizeof(wsrep_key_t), MYF(0))))
- {
- sql_print_error("Can't allocate memory for key_array");
- goto err;
- }
- ka->keys_len= 1;
- if (!(ka->keys[0].key_parts= (wsrep_key_part_t*)
- my_malloc(sizeof(wsrep_key_part_t)*2, MYF(0))))
- {
- sql_print_error("Can't allocate memory for key_parts");
- goto err;
- }
- ka->keys[0].key_parts_len= 2;
- if (!wsrep_prepare_key_for_isolation(
- db, table,
- (wsrep_key_part_t*)ka->keys[0].key_parts,
- &ka->keys[0].key_parts_len))
- {
- sql_print_error("Preparing keys for isolation failed");
- goto err;
- }
- }
- }
-
- for (const TABLE_LIST* table= table_list; table; table= table->next_global)
- {
- if (!find_temporary_table(thd, table))
- {
- wsrep_key_t* tmp;
- tmp= (wsrep_key_t*)my_realloc(
- ka->keys, (ka->keys_len + 1) * sizeof(wsrep_key_t), MYF(0));
- if (!tmp)
- {
- sql_print_error("Can't allocate memory for key_array");
- goto err;
- }
- ka->keys= tmp;
- if (!(ka->keys[ka->keys_len].key_parts= (wsrep_key_part_t*)
- my_malloc(sizeof(wsrep_key_part_t)*2, MYF(0))))
- {
- sql_print_error("Can't allocate memory for key_parts");
- goto err;
- }
- ka->keys[ka->keys_len].key_parts_len= 2;
- ++ka->keys_len;
- if (!wsrep_prepare_key_for_isolation(
- table->db, table->table_name,
- (wsrep_key_part_t*)ka->keys[ka->keys_len - 1].key_parts,
- &ka->keys[ka->keys_len - 1].key_parts_len))
- {
- sql_print_error("Preparing keys for isolation failed");
- goto err;
- }
- }
- }
- return true;
-err:
- wsrep_keys_free(ka);
- return false;
-}
-
-
-
-bool wsrep_prepare_key_for_innodb(const uchar* cache_key,
- size_t cache_key_len,
- const uchar* row_id,
- size_t row_id_len,
- wsrep_key_part_t* key,
- size_t* key_len)
-{
- if (*key_len < 3) return false;
-
- *key_len= 0;
- switch (wsrep_protocol_version)
- {
- case 0:
- {
- key[*key_len].buf = cache_key;
- key[*key_len].buf_len = cache_key_len;
- ++(*key_len);
- break;
- }
- case 1:
- case 2:
- {
- key[*key_len].buf = cache_key;
- key[*key_len].buf_len = strlen( (char*)cache_key );
- ++(*key_len);
- key[*key_len].buf = cache_key + strlen( (char*)cache_key ) + 1;
- key[*key_len].buf_len = strlen( (char*)(key[*key_len].buf) );
- ++(*key_len);
- break;
- }
- default:
- return false;
- }
-
- key[*key_len].buf = row_id;
- key[*key_len].buf_len = row_id_len;
- ++(*key_len);
-
- return true;
-}
-
-/*
- * Construct Query_log_Event from thd query and serialize it
- * into buffer.
- *
- * Return 0 in case of success, 1 in case of error.
- */
-int wsrep_to_buf_helper(
- THD* thd, const char *query, uint query_len, uchar** buf, uint* buf_len)
-{
- IO_CACHE tmp_io_cache;
- if (open_cached_file(&tmp_io_cache, mysql_tmpdir, TEMP_PREFIX,
- 65536, MYF(MY_WME)))
- return 1;
- Query_log_event ev(thd, query, query_len, FALSE, FALSE, FALSE, 0);
- int ret(0);
- if (ev.write(&tmp_io_cache)) ret= 1;
- if (!ret && wsrep_write_cache(&tmp_io_cache, buf, buf_len)) ret= 1;
- close_cached_file(&tmp_io_cache);
- return ret;
-}
-
-#include "sql_show.h"
-static int
-create_view_query(THD *thd, uchar** buf, uint* buf_len)
-{
- LEX *lex= thd->lex;
- SELECT_LEX *select_lex= &lex->select_lex;
- TABLE_LIST *first_table= select_lex->table_list.first;
- TABLE_LIST *views = first_table;
-
- String buff;
- const LEX_STRING command[3]=
- {{ C_STRING_WITH_LEN("CREATE ") },
- { C_STRING_WITH_LEN("ALTER ") },
- { C_STRING_WITH_LEN("CREATE OR REPLACE ") }};
-
- buff.append(command[thd->lex->create_view_mode].str,
- command[thd->lex->create_view_mode].length);
-
- if (!lex->definer)
- {
- /*
- DEFINER-clause is missing; we have to create default definer in
- persistent arena to be PS/SP friendly.
- If this is an ALTER VIEW then the current user should be set as
- the definer.
- */
-
- if (!(lex->definer= create_default_definer(thd)))
- {
- WSREP_WARN("view default definer issue");
- }
- }
-
- views->algorithm = lex->create_view_algorithm;
- views->definer.user = lex->definer->user;
- views->definer.host = lex->definer->host;
- views->view_suid = lex->create_view_suid;
- views->with_check = lex->create_view_check;
-
- view_store_options(thd, views, &buff);
- buff.append(STRING_WITH_LEN("VIEW "));
- /* Test if user supplied a db (ie: we did not use thd->db) */
- if (views->db && views->db[0] &&
- (thd->db == NULL || strcmp(views->db, thd->db)))
- {
- append_identifier(thd, &buff, views->db,
- views->db_length);
- buff.append('.');
- }
- append_identifier(thd, &buff, views->table_name,
- views->table_name_length);
- if (lex->view_list.elements)
- {
- List_iterator_fast<LEX_STRING> names(lex->view_list);
- LEX_STRING *name;
- int i;
-
- for (i= 0; (name= names++); i++)
- {
- buff.append(i ? ", " : "(");
- append_identifier(thd, &buff, name->str, name->length);
- }
- buff.append(')');
- }
- buff.append(STRING_WITH_LEN(" AS "));
- //buff.append(views->source.str, views->source.length);
- buff.append(thd->lex->create_view_select.str,
- thd->lex->create_view_select.length);
- //int errcode= query_error_code(thd, TRUE);
- //if (thd->binlog_query(THD::STMT_QUERY_TYPE,
- // buff.ptr(), buff.length(), FALSE, FALSE, FALSE, errcod
- return wsrep_to_buf_helper(thd, buff.ptr(), buff.length(), buf, buf_len);
-}
-
-static int wsrep_TOI_begin(THD *thd, char *db_, char *table_,
- const TABLE_LIST* table_list)
-{
- wsrep_status_t ret(WSREP_WARNING);
- uchar* buf(0);
- uint buf_len(0);
- int buf_err;
-
- WSREP_DEBUG("TO BEGIN: %lld, %d : %s", (long long)thd->wsrep_trx_seqno,
- thd->wsrep_exec_mode, thd->query() );
- switch (thd->lex->sql_command)
- {
- case SQLCOM_CREATE_VIEW:
- buf_err= create_view_query(thd, &buf, &buf_len);
- break;
- case SQLCOM_CREATE_PROCEDURE:
- case SQLCOM_CREATE_SPFUNCTION:
- buf_err= wsrep_create_sp(thd, &buf, &buf_len);
- break;
- case SQLCOM_CREATE_TRIGGER:
- buf_err= wsrep_create_trigger_query(thd, &buf, &buf_len);
- break;
- case SQLCOM_CREATE_EVENT:
- buf_err= wsrep_create_event_query(thd, &buf, &buf_len);
- break;
- default:
- buf_err= wsrep_to_buf_helper(thd, thd->query(), thd->query_length(), &buf,
- &buf_len);
- break;
- }
-
- wsrep_key_arr_t key_arr= {0, 0};
- if (!buf_err &&
- wsrep_prepare_keys_for_isolation(thd, db_, table_, table_list, &key_arr)&&
- WSREP_OK == (ret = wsrep->to_execute_start(wsrep, thd->thread_id,
- key_arr.keys, key_arr.keys_len,
- buf, buf_len,
- &thd->wsrep_trx_seqno)))
- {
- thd->wsrep_exec_mode= TOTAL_ORDER;
- wsrep_to_isolation++;
- if (buf) my_free(buf);
- wsrep_keys_free(&key_arr);
- WSREP_DEBUG("TO BEGIN: %lld, %d",(long long)thd->wsrep_trx_seqno,
- thd->wsrep_exec_mode);
- }
- else {
- /* jump to error handler in mysql_execute_command() */
- WSREP_WARN("TO isolation failed for: %d, sql: %s. Check wsrep "
- "connection state and retry the query.",
- ret, (thd->query()) ? thd->query() : "void");
- my_error(ER_LOCK_DEADLOCK, MYF(0), "WSREP replication failed. Check "
- "your wsrep connection state and retry the query.");
- if (buf) my_free(buf);
- wsrep_keys_free(&key_arr);
- return -1;
- }
- return 0;
-}
-
-static void wsrep_TOI_end(THD *thd) {
- wsrep_status_t ret;
- wsrep_to_isolation--;
- WSREP_DEBUG("TO END: %lld, %d : %s", (long long)thd->wsrep_trx_seqno,
- thd->wsrep_exec_mode, (thd->query()) ? thd->query() : "void")
- if (WSREP_OK == (ret = wsrep->to_execute_end(wsrep, thd->thread_id))) {
- WSREP_DEBUG("TO END: %lld", (long long)thd->wsrep_trx_seqno);
- }
- else {
- WSREP_WARN("TO isolation end failed for: %d, sql: %s",
- ret, (thd->query()) ? thd->query() : "void");
- }
-}
-
-static int wsrep_RSU_begin(THD *thd, char *db_, char *table_)
-{
- wsrep_status_t ret(WSREP_WARNING);
- WSREP_DEBUG("RSU BEGIN: %lld, %d : %s", (long long)thd->wsrep_trx_seqno,
- thd->wsrep_exec_mode, thd->query() );
-
- ret = wsrep->desync(wsrep);
- if (ret != WSREP_OK)
- {
- WSREP_WARN("RSU desync failed %d for %s", ret, thd->query());
- my_error(ER_LOCK_DEADLOCK, MYF(0));
- return(ret);
- }
- mysql_mutex_lock(&LOCK_wsrep_replaying);
- wsrep_replaying++;
- mysql_mutex_unlock(&LOCK_wsrep_replaying);
-
- if (wsrep_wait_committing_connections_close(5000))
- {
- /* no can do, bail out from DDL */
- WSREP_WARN("RSU failed due to pending transactions, %s", thd->query());
- mysql_mutex_lock(&LOCK_wsrep_replaying);
- wsrep_replaying--;
- mysql_mutex_unlock(&LOCK_wsrep_replaying);
-
- ret = wsrep->resync(wsrep);
- if (ret != WSREP_OK)
- {
- WSREP_WARN("resync failed %d for %s", ret, thd->query());
- }
- my_error(ER_LOCK_DEADLOCK, MYF(0));
- return(1);
- }
-
- wsrep_seqno_t seqno = wsrep->pause(wsrep);
- if (seqno == WSREP_SEQNO_UNDEFINED)
- {
- WSREP_WARN("pause failed %lld for %s", (long long)seqno, thd->query());
- return(1);
- }
- WSREP_DEBUG("paused at %lld", (long long)seqno);
- thd->variables.wsrep_on = 0;
- return 0;
-}
-
-static void wsrep_RSU_end(THD *thd)
-{
- wsrep_status_t ret(WSREP_WARNING);
- WSREP_DEBUG("RSU END: %lld, %d : %s", (long long)thd->wsrep_trx_seqno,
- thd->wsrep_exec_mode, thd->query() );
-
-
- mysql_mutex_lock(&LOCK_wsrep_replaying);
- wsrep_replaying--;
- mysql_mutex_unlock(&LOCK_wsrep_replaying);
-
- ret = wsrep->resume(wsrep);
- if (ret != WSREP_OK)
- {
- WSREP_WARN("resume failed %d for %s", ret, thd->query());
- }
- ret = wsrep->resync(wsrep);
- if (ret != WSREP_OK)
- {
- WSREP_WARN("resync failed %d for %s", ret, thd->query());
- return;
- }
- thd->variables.wsrep_on = 1;
- return;
-}
-
-int wsrep_to_isolation_begin(THD *thd, char *db_, char *table_,
- const TABLE_LIST* table_list)
-{
- int ret= 0;
- mysql_mutex_lock(&thd->LOCK_wsrep_thd);
- if (thd->wsrep_conflict_state == MUST_ABORT)
- {
- WSREP_INFO("thread: %lu, %s has been aborted due to multi-master conflict",
- thd->thread_id, thd->query());
- mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
- return WSREP_TRX_FAIL;
- }
- mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
-
- if (wsrep_debug && thd->mdl_context.has_locks())
- {
- WSREP_DEBUG("thread holds MDL locks at TI begin: %s %lu",
- thd->query(), thd->thread_id);
- }
- if (thd->variables.wsrep_on && thd->wsrep_exec_mode==LOCAL_STATE)
- {
- switch (wsrep_OSU_method_options) {
- case WSREP_OSU_TOI: ret = wsrep_TOI_begin(thd, db_, table_,
- table_list); break;
- case WSREP_OSU_RSU: ret = wsrep_RSU_begin(thd, db_, table_); break;
- }
- if (!ret)
- {
- thd->wsrep_exec_mode= TOTAL_ORDER;
- }
- }
- return ret;
-}
-
-void wsrep_to_isolation_end(THD *thd) {
- if (thd->wsrep_exec_mode==TOTAL_ORDER)
- {
- switch(wsrep_OSU_method_options)
- {
- case WSREP_OSU_TOI: return wsrep_TOI_end(thd);
- case WSREP_OSU_RSU: return wsrep_RSU_end(thd);
- }
- }
-}
-
-#define WSREP_MDL_LOG(severity, msg, req, gra) \
- WSREP_##severity( \
- "%s\n" \
- "request: (%lu \tseqno %lld \twsrep (%d, %d, %d) cmd %d %d \t%s)\n" \
- "granted: (%lu \tseqno %lld \twsrep (%d, %d, %d) cmd %d %d \t%s)", \
- msg, \
- req->thread_id, (long long)req->wsrep_trx_seqno, \
- req->wsrep_exec_mode, req->wsrep_query_state, req->wsrep_conflict_state, \
- req->command, req->lex->sql_command, req->query(), \
- gra->thread_id, (long long)gra->wsrep_trx_seqno, \
- gra->wsrep_exec_mode, gra->wsrep_query_state, gra->wsrep_conflict_state, \
- gra->command, gra->lex->sql_command, gra->query());
-
-bool
-wsrep_grant_mdl_exception(MDL_context *requestor_ctx,
- MDL_ticket *ticket
-) {
- if (!WSREP_ON) return FALSE;
-
- THD *request_thd = requestor_ctx->get_thd();
- THD *granted_thd = ticket->get_ctx()->get_thd();
- bool ret = FALSE;
-
- mysql_mutex_lock(&request_thd->LOCK_wsrep_thd);
- if (request_thd->wsrep_exec_mode == TOTAL_ORDER ||
- request_thd->wsrep_exec_mode == REPL_RECV)
- {
- mysql_mutex_unlock(&request_thd->LOCK_wsrep_thd);
- WSREP_MDL_LOG(DEBUG, "MDL conflict ", request_thd, granted_thd);
- ticket->wsrep_report(wsrep_debug);
-
- mysql_mutex_lock(&granted_thd->LOCK_wsrep_thd);
- if (granted_thd->wsrep_exec_mode == TOTAL_ORDER ||
- granted_thd->wsrep_exec_mode == REPL_RECV)
- {
- WSREP_MDL_LOG(INFO, "MDL BF-BF conflict", request_thd, granted_thd);
- ticket->wsrep_report(true);
- mysql_mutex_unlock(&granted_thd->LOCK_wsrep_thd);
- ret = TRUE;
- }
- else if (granted_thd->lex->sql_command == SQLCOM_FLUSH)
- {
- WSREP_DEBUG("mdl granted over FLUSH BF");
- ticket->wsrep_report(wsrep_debug);
- mysql_mutex_unlock(&granted_thd->LOCK_wsrep_thd);
- ret = TRUE;
- }
- else if (request_thd->lex->sql_command == SQLCOM_DROP_TABLE)
- {
- WSREP_DEBUG("DROP caused BF abort");
- ticket->wsrep_report(wsrep_debug);
- mysql_mutex_unlock(&granted_thd->LOCK_wsrep_thd);
- wsrep_abort_thd((void*)request_thd, (void*)granted_thd, 1);
- ret = FALSE;
- }
- else if (granted_thd->wsrep_query_state == QUERY_COMMITTING)
- {
- WSREP_DEBUG("mdl granted, but commiting thd abort scheduled");
- ticket->wsrep_report(wsrep_debug);
- mysql_mutex_unlock(&granted_thd->LOCK_wsrep_thd);
- wsrep_abort_thd((void*)request_thd, (void*)granted_thd, 1);
- ret = FALSE;
- }
- else
- {
- WSREP_MDL_LOG(DEBUG, "MDL conflict-> BF abort", request_thd, granted_thd);
- ticket->wsrep_report(wsrep_debug);
- mysql_mutex_unlock(&granted_thd->LOCK_wsrep_thd);
- wsrep_abort_thd((void*)request_thd, (void*)granted_thd, 1);
- ret = FALSE;
- }
- }
- else
- {
- mysql_mutex_unlock(&request_thd->LOCK_wsrep_thd);
- }
- return ret;
-}