summaryrefslogtreecommitdiff
path: root/sql/wsrep_sst.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/wsrep_sst.cc')
-rw-r--r--sql/wsrep_sst.cc621
1 files changed, 267 insertions, 354 deletions
diff --git a/sql/wsrep_sst.cc b/sql/wsrep_sst.cc
index e648a7f4c69..afbcd4d6c52 100644
--- a/sql/wsrep_sst.cc
+++ b/sql/wsrep_sst.cc
@@ -1,4 +1,4 @@
-/* Copyright 2008-2015 Codership Oy <http://www.codership.com>
+/* Copyright 2008-2017 Codership Oy <http://www.codership.com>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -35,16 +35,16 @@
static char wsrep_defaults_file[FN_REFLEN * 2 + 10 + 30 +
sizeof(WSREP_SST_OPT_CONF) +
sizeof(WSREP_SST_OPT_CONF_SUFFIX) +
- sizeof(WSREP_SST_OPT_CONF_EXTRA)] = {0};
+ sizeof(WSREP_SST_OPT_CONF_EXTRA)]= {0};
-const char* wsrep_sst_method = WSREP_SST_DEFAULT;
-const char* wsrep_sst_receive_address = WSREP_SST_ADDRESS_AUTO;
-const char* wsrep_sst_donor = "";
-const char* wsrep_sst_auth = NULL;
+const char* wsrep_sst_method = WSREP_SST_DEFAULT;
+const char* wsrep_sst_receive_address= WSREP_SST_ADDRESS_AUTO;
+const char* wsrep_sst_donor = "";
+const char* wsrep_sst_auth = NULL;
// container for real auth string
-static const char* sst_auth_real = NULL;
-my_bool wsrep_sst_donor_rejects_queries = FALSE;
+static const char* sst_auth_real = NULL;
+my_bool wsrep_sst_donor_rejects_queries= FALSE;
bool wsrep_sst_method_check (sys_var *self, THD* thd, set_var* var)
{
@@ -60,12 +60,7 @@ bool wsrep_sst_method_check (sys_var *self, THD* thd, set_var* var)
return 0;
}
-bool wsrep_sst_method_update (sys_var *self, THD* thd, enum_var_type type)
-{
- return 0;
-}
-
-static const char* data_home_dir = NULL;
+static const char* data_home_dir;
void wsrep_set_data_home_dir(const char *data_dir)
{
@@ -139,7 +134,7 @@ static bool sst_auth_real_set (const char* value)
{
// set sst_auth_real
if (sst_auth_real) { my_free((void *) sst_auth_real); }
- sst_auth_real = v;
+ sst_auth_real= v;
// mask wsrep_sst_auth
if (strlen(sst_auth_real))
@@ -180,6 +175,7 @@ bool wsrep_sst_donor_update (sys_var *self, THD* thd, enum_var_type type)
return 0;
}
+
bool wsrep_before_SE()
{
return (wsrep_provider != NULL
@@ -188,111 +184,29 @@ bool wsrep_before_SE()
&& strcmp (wsrep_sst_method, WSREP_SST_MYSQLDUMP));
}
-static bool sst_complete = false;
-static bool sst_needed = false;
-
-#define WSREP_EXTEND_TIMEOUT_INTERVAL 30
-#define WSREP_TIMEDWAIT_SECONDS 10
-
-void wsrep_sst_grab ()
-{
- WSREP_INFO("wsrep_sst_grab()");
- if (mysql_mutex_lock (&LOCK_wsrep_sst)) abort();
- sst_complete = false;
- mysql_mutex_unlock (&LOCK_wsrep_sst);
-}
-
-// Wait for end of SST
-bool wsrep_sst_wait ()
-{
- double total_wtime = 0;
-
- if (mysql_mutex_lock (&LOCK_wsrep_sst))
- abort();
-
- WSREP_INFO("Waiting for SST to complete.");
-
- while (!sst_complete)
- {
- struct timespec wtime;
- set_timespec(wtime, WSREP_TIMEDWAIT_SECONDS);
- time_t start_time = time(NULL);
- mysql_cond_timedwait (&COND_wsrep_sst, &LOCK_wsrep_sst, &wtime);
- time_t end_time = time(NULL);
-
- if (!sst_complete)
- {
- total_wtime += difftime(end_time, start_time);
- WSREP_DEBUG("Waiting for SST to complete. current seqno: %" PRId64 " waited %f secs.", local_seqno, total_wtime);
- service_manager_extend_timeout(WSREP_EXTEND_TIMEOUT_INTERVAL,
- "WSREP state transfer ongoing, current seqno: %ld waited %f secs", local_seqno, total_wtime);
- }
- }
-
- if (local_seqno >= 0)
- {
- WSREP_INFO("SST complete, seqno: %lld", (long long) local_seqno);
- }
- else
- {
- WSREP_ERROR("SST failed: %d (%s)",
- int(-local_seqno), strerror(-local_seqno));
- }
-
- mysql_mutex_unlock (&LOCK_wsrep_sst);
-
- return (local_seqno >= 0);
-}
-
// Signal end of SST
-void wsrep_sst_complete (const wsrep_uuid_t* sst_uuid,
- wsrep_seqno_t sst_seqno,
- bool needed)
+static void wsrep_sst_complete (THD* thd,
+ int const rcode)
{
- if (mysql_mutex_lock (&LOCK_wsrep_sst)) abort();
- if (!sst_complete)
- {
- sst_complete = true;
- sst_needed = needed;
- local_uuid = *sst_uuid;
- local_seqno = sst_seqno;
- mysql_cond_signal (&COND_wsrep_sst);
- }
- else
- {
- /* This can happen when called from wsrep_synced_cb().
- At the moment there is no way to check there
- if main thread is still waiting for signal,
- so wsrep_sst_complete() is called from there
- each time wsrep_ready changes from FALSE -> TRUE.
- */
- WSREP_DEBUG("Nobody is waiting for SST.");
- }
- mysql_mutex_unlock (&LOCK_wsrep_sst);
+ Wsrep_client_service client_service(thd, thd->wsrep_cs());
+ Wsrep_server_state::instance().sst_received(client_service, rcode);
}
-/*
+ /*
If wsrep provider is loaded, inform that the new state snapshot
has been received. Also update the local checkpoint.
- @param wsrep [IN] wsrep handle
+ @param thd [IN]
@param uuid [IN] Initial state UUID
@param seqno [IN] Initial state sequence number
@param state [IN] Always NULL, also ignored by wsrep provider (?)
@param state_len [IN] Always 0, also ignored by wsrep provider (?)
- @param implicit [IN] Whether invoked implicitly due to SST
- (true) or explicitly because if change
- in wsrep_start_position by user (false).
- @return false Success
- true Error
-
*/
-bool wsrep_sst_received (wsrep_t* const wsrep,
- const wsrep_uuid_t& uuid,
- const wsrep_seqno_t seqno,
- const void* const state,
- const size_t state_len,
- const bool implicit)
+void wsrep_sst_received (THD* thd,
+ const wsrep_uuid_t& uuid,
+ wsrep_seqno_t const seqno,
+ const void* const state,
+ size_t const state_len)
{
/*
To keep track of whether the local uuid:seqno should be updated. Also, note
@@ -300,81 +214,40 @@ bool wsrep_sst_received (wsrep_t* const wsrep,
OK from wsrep provider. By doing so, the values remain consistent across
the server & wsrep provider.
*/
- bool do_update= false;
-
- // Get the locally stored uuid:seqno.
- if (wsrep_get_SE_checkpoint(local_uuid, local_seqno))
- {
- return true;
- }
-
- if (memcmp(&local_uuid, &uuid, sizeof(wsrep_uuid_t)) ||
- local_seqno < seqno || seqno < 0)
- {
- do_update= true;
- }
- else if (local_seqno > seqno)
- {
- WSREP_WARN("SST position can't be set in past. Requested: %lld, Current: "
- " %lld.", (long long)seqno, (long long)local_seqno);
/*
- If we are here because of SET command, simply return true (error) instead of
- aborting.
+ TODO: Handle backwards compatibility. WSREP API v25 does not have
+ wsrep schema.
*/
- if (implicit)
- {
- WSREP_WARN("Can't continue.");
- unireg_abort(1);
- }
- else
- {
- return true;
+ /*
+ Logical SST methods (mysqldump etc) don't update InnoDB sys header.
+ Reset the SE checkpoint before recovering view in order to avoid
+ sanity check failure.
+ */
+ wsrep::gtid const sst_gtid(wsrep::id(uuid.data, sizeof(uuid.data)),
+ wsrep::seqno(seqno));
+
+ if (!wsrep_before_SE()) {
+ wsrep_set_SE_checkpoint(wsrep::gtid::undefined());
+ wsrep_set_SE_checkpoint(sst_gtid);
}
- }
-
-#ifdef GTID_SUPPORT
- wsrep_init_sidno(uuid);
-#endif /* GTID_SUPPORT */
-
- if (wsrep)
- {
- int const rcode(seqno < 0 ? seqno : 0);
- wsrep_gtid_t const state_id= {uuid,
- (rcode ? WSREP_SEQNO_UNDEFINED : seqno)};
+ wsrep_verify_SE_checkpoint(uuid, seqno);
- wsrep_status_t ret= wsrep->sst_received(wsrep, &state_id, state,
- state_len, rcode);
-
- if (ret != WSREP_OK)
- {
- return true;
+ /*
+ Both wsrep_init_SR() and wsrep_recover_view() may use
+ wsrep thread pool. Restore original thd context before returning.
+ */
+ if (thd) {
+ thd->store_globals();
+ }
+ else {
+ my_pthread_setspecific_ptr(THR_THD, NULL);
}
- }
- // Now is the good time to update the local state and checkpoint.
- if (do_update)
- {
- if (wsrep_set_SE_checkpoint(uuid, seqno))
+ if (WSREP_ON)
{
- return true;
+ int const rcode(seqno < 0 ? seqno : 0);
+ wsrep_sst_complete(thd,rcode);
}
-
- local_uuid= uuid;
- local_seqno= seqno;
- }
-
- return false;
-}
-
-// Let applier threads to continue
-bool wsrep_sst_continue ()
-{
- if (sst_needed)
- {
- WSREP_INFO("Signalling provider to continue.");
- return wsrep_sst_received (wsrep, local_uuid, local_seqno, NULL, 0, true);
- }
- return false;
}
struct sst_thread_arg
@@ -404,11 +277,11 @@ struct sst_thread_arg
static int sst_scan_uuid_seqno (const char* str,
wsrep_uuid_t* uuid, wsrep_seqno_t* seqno)
{
- int offt = wsrep_uuid_scan (str, strlen(str), uuid);
+ int offt= wsrep_uuid_scan (str, strlen(str), uuid);
errno= 0; /* Reset the errno */
if (offt > 0 && strlen(str) > (unsigned int)offt && ':' == str[offt])
{
- *seqno = strtoll (str + offt + 1, NULL, 10);
+ *seqno= strtoll (str + offt + 1, NULL, 10);
if (*seqno != LLONG_MAX || errno != ERANGE)
{
return 0;
@@ -416,7 +289,7 @@ static int sst_scan_uuid_seqno (const char* str,
}
WSREP_ERROR("Failed to parse uuid:seqno pair: '%s'", str);
- return EINVAL;
+ return -EINVAL;
}
// get rid of trailing \n
@@ -426,8 +299,8 @@ static char* my_fgets (char* buf, size_t buf_len, FILE* stream)
if (ret)
{
- size_t len = strlen(ret);
- if (len > 0 && ret[len - 1] == '\n') ret[len - 1] = '\0';
+ size_t len= strlen(ret);
+ if (len > 0 && ret[len - 1] == '\n') ret[len - 1]= '\0';
}
return ret;
@@ -482,9 +355,10 @@ static void* sst_joiner_thread (void* a)
int err= 1;
{
- const char magic[] = "ready";
- const size_t magic_len = sizeof(magic) - 1;
- const size_t out_len = 512;
+ THD* thd;
+ const char magic[]= "ready";
+ const size_t magic_len= sizeof(magic) - 1;
+ const size_t out_len= 512;
char out[out_len];
WSREP_INFO("Running: '%s'", arg->cmd);
@@ -501,29 +375,31 @@ static void* sst_joiner_thread (void* a)
WSREP_ERROR("Failed to read '%s <addr>' from: %s\n\tRead: '%s'",
magic, arg->cmd, tmp);
proc.wait();
- if (proc.error()) err = proc.error();
+ if (proc.error()) err= proc.error();
}
else
{
- err = 0;
+ err= 0;
}
}
else
{
- err = proc.error();
+ err= proc.error();
WSREP_ERROR("Failed to execute: %s : %d (%s)",
arg->cmd, err, strerror(err));
}
- // signal sst_prepare thread with ret code,
- // it will go on sending SST request
+ /*
+ signal sst_prepare thread with ret code,
+ it will go on sending SST request
+ */
mysql_mutex_lock (&arg->lock);
if (!err)
{
- arg->ret_str = strdup (out + magic_len + 1);
- if (!arg->ret_str) err = ENOMEM;
+ arg->ret_str= strdup (out + magic_len + 1);
+ if (!arg->ret_str) err= ENOMEM;
}
- arg->err = -err;
+ arg->err= -err;
mysql_cond_signal (&arg->cond);
mysql_mutex_unlock (&arg->lock); //! @note arg is unusable after that.
@@ -531,11 +407,11 @@ static void* sst_joiner_thread (void* a)
* initializer thread to ensure single thread of
* shutdown. */
- wsrep_uuid_t ret_uuid = WSREP_UUID_UNDEFINED;
- wsrep_seqno_t ret_seqno = WSREP_SEQNO_UNDEFINED;
+ wsrep_uuid_t ret_uuid = WSREP_UUID_UNDEFINED;
+ wsrep_seqno_t ret_seqno= WSREP_SEQNO_UNDEFINED;
// in case of successfull receiver start, wait for SST completion/end
- char* tmp = my_fgets (out, out_len, proc.pipe());
+ char* tmp= my_fgets (out, out_len, proc.pipe());
proc.wait();
err= EINVAL;
@@ -544,7 +420,7 @@ static void* sst_joiner_thread (void* a)
{
WSREP_ERROR("Failed to read uuid:seqno and wsrep_gtid_domain_id from "
"joiner script.");
- if (proc.error()) err = proc.error();
+ if (proc.error()) err= proc.error();
}
else
{
@@ -552,7 +428,14 @@ static void* sst_joiner_thread (void* a)
const char *pos= strchr(out, ' ');
if (!pos) {
- // There is no wsrep_gtid_domain_id (some older version SST script?).
+
+ if (wsrep_gtid_mode)
+ {
+ // There is no wsrep_gtid_domain_id (some older version SST script?).
+ WSREP_WARN("Did not find domain ID from SST script output '%s'. "
+ "Domain ID must be set manually to keep binlog consistent",
+ out);
+ }
err= sst_scan_uuid_seqno (out, &ret_uuid, &ret_seqno);
} else {
@@ -588,14 +471,59 @@ static void* sst_joiner_thread (void* a)
err:
+ wsrep::gtid ret_gtid;
+
if (err)
{
- ret_uuid= WSREP_UUID_UNDEFINED;
- ret_seqno= -err;
+ ret_gtid= wsrep::gtid::undefined();
}
+ else
+ {
+ ret_gtid= wsrep::gtid(wsrep::id(ret_uuid.data, sizeof(ret_uuid.data)),
+ wsrep::seqno(ret_seqno));
+ }
+
+ /*
+ Tell initializer thread that SST is complete
+ For that initialize a THD
+ */
+ if (my_thread_init())
+ {
+ WSREP_ERROR("my_thread_init() failed, can't signal end of SST. "
+ "Aborting.");
+ unireg_abort(1);
+ }
+
+ thd= new THD(next_thread_id());
+
+ if (!thd)
+ {
+ WSREP_ERROR("Failed to allocate THD to restore view from local state, "
+ "can't signal end of SST. Aborting.");
+ unireg_abort(1);
+ }
+
+ thd->thread_stack= (char*) &thd;
+ thd->security_ctx->skip_grants();
+ thd->system_thread= SYSTEM_THREAD_GENERIC;
+ thd->real_id= pthread_self();
+
+ thd->store_globals();
+
+ /* */
+ thd->variables.wsrep_on = 0;
+ /* No binlogging */
+ thd->variables.sql_log_bin = 0;
+ thd->variables.option_bits &= ~OPTION_BIN_LOG;
+ /* No general log */
+ thd->variables.option_bits |= OPTION_LOG_OFF;
+ /* Read committed isolation to avoid gap locking */
+ thd->variables.tx_isolation= ISO_READ_COMMITTED;
- // Tell initializer thread that SST is complete
- wsrep_sst_complete (&ret_uuid, ret_seqno, true);
+ wsrep_sst_complete (thd, -err);
+
+ delete thd;
+ my_thread_end();
}
return NULL;
@@ -694,7 +622,7 @@ static ssize_t sst_prepare_other (const char* method,
" %s "
WSREP_SST_OPT_PARENT " '%d'"
" %s '%s'"
- " %s '%s'",
+ " %s '%s'",
method, addr_in, mysql_real_data_home,
wsrep_defaults_file,
(int)getpid(), binlog_opt, binlog_opt_val,
@@ -734,7 +662,7 @@ static ssize_t sst_prepare_other (const char* method,
pthread_t tmp;
sst_thread_arg arg(cmd_str(), env());
mysql_mutex_lock (&arg.lock);
- ret = pthread_create (&tmp, NULL, sst_joiner_thread, &arg);
+ ret= pthread_create (&tmp, NULL, sst_joiner_thread, &arg);
if (ret)
{
WSREP_ERROR("sst_prepare_other(): pthread_create() failed: %d (%s)",
@@ -746,11 +674,11 @@ static ssize_t sst_prepare_other (const char* method,
*addr_out= arg.ret_str;
if (!arg.err)
- ret = strlen(*addr_out);
+ ret= strlen(*addr_out);
else
{
assert (arg.err < 0);
- ret = arg.err;
+ ret= arg.err;
}
pthread_detach (tmp);
@@ -764,12 +692,12 @@ extern uint mysqld_port;
static ssize_t sst_prepare_mysqldump (const char* addr_in,
const char** addr_out)
{
- ssize_t ret = strlen (addr_in);
+ ssize_t ret= strlen (addr_in);
if (!strrchr(addr_in, ':'))
{
- ssize_t s = ret + 7;
- char* tmp = (char*) malloc (s);
+ ssize_t s= ret + 7;
+ char* tmp= (char*) malloc (s);
if (tmp)
{
@@ -780,7 +708,7 @@ static ssize_t sst_prepare_mysqldump (const char* addr_in,
*addr_out= tmp;
return ret;
}
- if (ret > 0) /* buffer too short */ ret = -EMSGSIZE;
+ if (ret > 0) /* buffer too short */ ret= -EMSGSIZE;
free (tmp);
}
else {
@@ -797,31 +725,22 @@ static ssize_t sst_prepare_mysqldump (const char* addr_in,
return ret;
}
-static bool SE_initialized = false;
-
-ssize_t wsrep_sst_prepare (void** msg)
+std::string wsrep_sst_prepare()
{
+ const ssize_t ip_max= 256;
+ char ip_buf[ip_max];
const char* addr_in= NULL;
const char* addr_out= NULL;
+ const char* method;
if (!strcmp(wsrep_sst_method, WSREP_SST_SKIP))
{
- ssize_t ret = strlen(WSREP_STATE_TRANSFER_TRIVIAL) + 1;
- *msg = strdup(WSREP_STATE_TRANSFER_TRIVIAL);
- if (!msg)
- {
- WSREP_ERROR("Could not allocate %zd bytes for state request", ret);
- unireg_abort(1);
- }
- return ret;
+ return WSREP_STATE_TRANSFER_TRIVIAL;
}
/*
Figure out SST receive address. Common for all SST methods.
*/
- char ip_buf[256];
- const ssize_t ip_max= sizeof(ip_buf);
-
// Attempt 1: wsrep_sst_receive_address
if (wsrep_sst_receive_address &&
strcmp (wsrep_sst_receive_address, WSREP_SST_ADDRESS_AUTO))
@@ -838,7 +757,7 @@ ssize_t wsrep_sst_prepare (void** msg)
{
WSREP_ERROR("Could not parse wsrep_node_address : %s",
wsrep_node_address);
- unireg_abort(1);
+ throw wsrep::runtime_error("Failed to prepare for SST. Unrecoverable");
}
memcpy(ip_buf, addr.get_address(), addr.get_address_len());
addr_in= ip_buf;
@@ -856,21 +775,33 @@ ssize_t wsrep_sst_prepare (void** msg)
{
WSREP_ERROR("Failed to guess address to accept state transfer. "
"wsrep_sst_receive_address must be set manually.");
- unireg_abort(1);
+ throw wsrep::runtime_error("Could not prepare state transfer request");
}
}
ssize_t addr_len= -ENOSYS;
- if (!strcmp(wsrep_sst_method, WSREP_SST_MYSQLDUMP))
+ method = wsrep_sst_method;
+ if (!strcmp(method, WSREP_SST_MYSQLDUMP))
{
addr_len= sst_prepare_mysqldump (addr_in, &addr_out);
- if (addr_len < 0) unireg_abort(1);
+ if (addr_len < 0)
+ {
+ throw wsrep::runtime_error("Could not prepare mysqldimp address");
+ }
}
else
{
/*! A heuristic workaround until we learn how to stop and start engines */
- if (SE_initialized)
+ if (Wsrep_server_state::instance().is_initialized() &&
+ Wsrep_server_state::instance().state() == Wsrep_server_state::s_joiner)
{
+ if (!strcmp(method, WSREP_SST_XTRABACKUP) ||
+ !strcmp(method, WSREP_SST_XTRABACKUPV2))
+ {
+ WSREP_WARN("The %s SST method is deprecated, so it is automatically "
+ "replaced by %s", method, WSREP_SST_MARIABACKUP);
+ method = WSREP_SST_MARIABACKUP;
+ }
// we already did SST at initializaiton, now engines are running
// sql_print_information() is here because the message is too long
// for WSREP_INFO.
@@ -880,48 +811,38 @@ ssize_t wsrep_sst_prepare (void** msg)
"Wsrep provider won't be able to fall back to it "
"if other means of state transfer are unavailable. "
"In that case you will need to restart the server.",
- wsrep_sst_method);
- *msg = 0;
- return 0;
+ method);
+ return "";
}
- addr_len = sst_prepare_other (wsrep_sst_method, sst_auth_real,
+ addr_len = sst_prepare_other (method, sst_auth_real,
addr_in, &addr_out);
if (addr_len < 0)
{
WSREP_ERROR("Failed to prepare for '%s' SST. Unrecoverable.",
- wsrep_sst_method);
- unireg_abort(1);
+ method);
+ throw wsrep::runtime_error("Failed to prepare for SST. Unrecoverable");
}
}
- size_t const method_len(strlen(wsrep_sst_method));
- size_t const msg_len (method_len + addr_len + 2 /* + auth_len + 1*/);
-
- *msg = malloc (msg_len);
- if (NULL != *msg) {
- char* const method_ptr(reinterpret_cast<char*>(*msg));
- strcpy (method_ptr, wsrep_sst_method);
- char* const addr_ptr(method_ptr + method_len + 1);
- strcpy (addr_ptr, addr_out);
+ std::string ret;
+ ret += method;
+ ret.push_back('\0');
+ ret += addr_out;
- WSREP_INFO ("Prepared SST request: %s|%s", method_ptr, addr_ptr);
- }
- else {
- WSREP_ERROR("Failed to allocate SST request of size %zu. Can't continue.",
- msg_len);
- unireg_abort(1);
- }
+ const char* method_ptr(ret.data());
+ const char* addr_ptr(ret.data() + strlen(method_ptr) + 1);
+ WSREP_INFO ("Prepared SST request: %s|%s", method_ptr, addr_ptr);
if (addr_out != addr_in) /* malloc'ed */ free ((char*)addr_out);
- return msg_len;
+ return ret;
}
// helper method for donors
static int sst_run_shell (const char* cmd_str, char** env, int max_tries)
{
- int ret = 0;
+ int ret= 0;
for (int tries=1; tries <= max_tries; tries++)
{
@@ -932,7 +853,7 @@ static int sst_run_shell (const char* cmd_str, char** env, int max_tries)
proc.wait();
}
- if ((ret = proc.error()))
+ if ((ret= proc.error()))
{
WSREP_ERROR("Try %d/%d: '%s' failed: %d (%s)",
tries, max_tries, proc.cmd(), ret, strerror(ret));
@@ -950,15 +871,12 @@ static int sst_run_shell (const char* cmd_str, char** env, int max_tries)
static void sst_reject_queries(my_bool close_conn)
{
- wsrep_ready_set (FALSE); // this will be resotred when donor becomes synced
- WSREP_INFO("Rejecting client queries for the duration of SST.");
- if (TRUE == close_conn) wsrep_close_client_connections(FALSE);
+ WSREP_INFO("Rejecting client queries for the duration of SST.");
+ if (TRUE == close_conn) wsrep_close_client_connections(FALSE);
}
static int sst_donate_mysqldump (const char* addr,
- const wsrep_uuid_t* uuid,
- const char* uuid_str,
- wsrep_seqno_t seqno,
+ const wsrep::gtid& gtid,
bool bypass,
char** env) // carries auth info
{
@@ -981,23 +899,31 @@ static int sst_donate_mysqldump (const char* addr,
return -ENOMEM;
}
+ /*
+ we enable new client connections so that mysqldump donation can connect in,
+ but we reject local connections from modifyingcdata during SST, to keep
+ data intact
+ */
if (!bypass && wsrep_sst_donor_rejects_queries) sst_reject_queries(TRUE);
make_wsrep_defaults_file();
+ std::ostringstream uuid_oss;
+ uuid_oss << gtid.id();
int ret= snprintf (cmd_str(), cmd_len,
"wsrep_sst_mysqldump "
WSREP_SST_OPT_ADDR " '%s' "
- WSREP_SST_OPT_PORT " '%d' "
+ WSREP_SST_OPT_PORT " '%u' "
WSREP_SST_OPT_LPORT " '%u' "
WSREP_SST_OPT_SOCKET " '%s' "
" %s "
WSREP_SST_OPT_GTID " '%s:%lld' "
WSREP_SST_OPT_GTID_DOMAIN_ID " '%d'"
"%s",
- addr, port, mysqld_port, mysqld_unix_port,
- wsrep_defaults_file, uuid_str,
- (long long)seqno, wsrep_gtid_domain_id,
+ addr, port, mysqld_port, mysqld_unix_port,
+ wsrep_defaults_file,
+ uuid_oss.str().c_str(), gtid.seqno().get(),
+ wsrep_gtid_domain_id,
bypass ? " " WSREP_SST_OPT_BYPASS : "");
if (ret < 0 || ret >= cmd_len)
@@ -1010,16 +936,17 @@ static int sst_donate_mysqldump (const char* addr,
ret= sst_run_shell (cmd_str(), env, 3);
- wsrep_gtid_t const state_id = { *uuid, (ret ? WSREP_SEQNO_UNDEFINED : seqno)};
-
- wsrep->sst_sent (wsrep, &state_id, ret);
+ wsrep::gtid sst_sent_gtid(ret == 0 ?
+ gtid :
+ wsrep::gtid(gtid.id(),
+ wsrep::seqno::undefined()));
+ Wsrep_server_state::instance().sst_sent(sst_sent_gtid, ret);
return ret;
}
wsrep_seqno_t wsrep_locked_seqno= WSREP_SEQNO_UNDEFINED;
-
/*
Create a file under data directory.
*/
@@ -1068,7 +995,6 @@ static int sst_create_file(const char *name, const char *content)
return err;
}
-
static int run_sql_command(THD *thd, const char *query)
{
thd->set_query((char *)query, strlen(query));
@@ -1114,9 +1040,9 @@ static int sst_flush_tables(THD* thd)
{
/* Do not use non-supported parser character sets */
WSREP_WARN("Current client character set is non-supported parser character set: %s", current_charset->csname);
- thd->variables.character_set_client = &my_charset_latin1;
+ thd->variables.character_set_client= &my_charset_latin1;
WSREP_WARN("For SST temporally setting character set to : %s",
- my_charset_latin1.csname);
+ my_charset_latin1.csname);
}
if (run_sql_command(thd, "FLUSH TABLES WITH READ LOCK"))
@@ -1137,7 +1063,7 @@ static int sst_flush_tables(THD* thd)
}
}
- thd->variables.character_set_client = current_charset;
+ thd->variables.character_set_client= current_charset;
if (err)
{
@@ -1155,7 +1081,6 @@ static int sst_flush_tables(THD* thd)
else
{
WSREP_INFO("Tables flushed.");
-
/*
Tables have been flushed. Create a file with cluster state ID and
wsrep_gtid_domain_id.
@@ -1164,6 +1089,41 @@ static int sst_flush_tables(THD* thd)
snprintf(content, sizeof(content), "%s:%lld %d\n", wsrep_cluster_state_uuid,
(long long)wsrep_locked_seqno, wsrep_gtid_domain_id);
err= sst_create_file(flush_success, content);
+
+ const char base_name[]= "tables_flushed";
+ ssize_t const full_len= strlen(mysql_real_data_home) + strlen(base_name)+2;
+ char *real_name= (char*) malloc(full_len);
+ sprintf(real_name, "%s/%s", mysql_real_data_home, base_name);
+ char *tmp_name= (char*) malloc(full_len + 4);
+ sprintf(tmp_name, "%s.tmp", real_name);
+
+ FILE* file= fopen(tmp_name, "w+");
+ if (0 == file)
+ {
+ err= errno;
+ WSREP_ERROR("Failed to open '%s': %d (%s)", tmp_name, err,strerror(err));
+ }
+ else
+ {
+ Wsrep_server_state& server_state= Wsrep_server_state::instance();
+ std::ostringstream uuid_oss;
+
+ uuid_oss << server_state.current_view().state_id().id();
+
+ fprintf(file, "%s:%lld %u\n",
+ uuid_oss.str().c_str(), server_state.pause_seqno().get(),
+ wsrep_gtid_domain_id);
+ fsync(fileno(file));
+ fclose(file);
+ if (rename(tmp_name, real_name) == -1)
+ {
+ err= errno;
+ WSREP_ERROR("Failed to rename '%s' to '%s': %d (%s)",
+ tmp_name, real_name, err,strerror(err));
+ }
+ }
+ free(real_name);
+ free(tmp_name);
}
return err;
@@ -1172,19 +1132,19 @@ static int sst_flush_tables(THD* thd)
static void sst_disallow_writes (THD* thd, bool yes)
{
- char query_str[64] = { 0, };
- ssize_t const query_max = sizeof(query_str) - 1;
+ char query_str[64]= { 0, };
+ ssize_t const query_max= sizeof(query_str) - 1;
CHARSET_INFO *current_charset;
- current_charset = thd->variables.character_set_client;
+ current_charset= thd->variables.character_set_client;
if (!is_supported_parser_charset(current_charset))
{
/* Do not use non-supported parser character sets */
WSREP_WARN("Current client character set is non-supported parser character set: %s", current_charset->csname);
- thd->variables.character_set_client = &my_charset_latin1;
+ thd->variables.character_set_client= &my_charset_latin1;
WSREP_WARN("For SST temporally setting character set to : %s",
- my_charset_latin1.csname);
+ my_charset_latin1.csname);
}
snprintf (query_str, query_max, "SET GLOBAL innodb_disallow_writes=%d",
@@ -1194,7 +1154,7 @@ static void sst_disallow_writes (THD* thd, bool yes)
{
WSREP_ERROR("Failed to disallow InnoDB writes");
}
- thd->variables.character_set_client = current_charset;
+ thd->variables.character_set_client= current_charset;
}
static void* sst_donor_thread (void* a)
@@ -1217,11 +1177,11 @@ static void* sst_donor_thread (void* a)
// operate with wsrep_ready == OFF
wsp::process proc(arg->cmd, "r", arg->env);
- err= proc.error();
+ err= -proc.error();
/* Inform server about SST script startup and release TO isolation */
mysql_mutex_lock (&arg->lock);
- arg->err = -err;
+ arg->err= -err;
mysql_cond_signal (&arg->cond);
mysql_mutex_unlock (&arg->lock); //! @note arg is unusable after that.
@@ -1266,7 +1226,7 @@ wait_signal:
mysql_mutex_unlock(mysql_bin_log.get_log_lock());
}
sst_disallow_writes (thd.ptr, false);
- thd.ptr->global_read_lock.unlock_global_read_lock (thd.ptr);
+ thd.ptr->global_read_lock.unlock_global_read_lock(thd.ptr);
locked= false;
}
err= 0;
@@ -1280,6 +1240,7 @@ wait_signal:
else
{
WSREP_WARN("Received unknown signal: '%s'", out);
+ proc.wait();
}
}
else
@@ -1287,7 +1248,7 @@ wait_signal:
WSREP_ERROR("Failed to read from: %s", proc.cmd());
proc.wait();
}
- if (!err && proc.error()) err= proc.error();
+ if (!err && proc.error()) err= -proc.error();
}
else
{
@@ -1303,27 +1264,23 @@ wait_signal:
mysql_mutex_unlock(mysql_bin_log.get_log_lock());
}
sst_disallow_writes (thd.ptr, false);
- thd.ptr->global_read_lock.unlock_global_read_lock (thd.ptr);
+ thd.ptr->global_read_lock.unlock_global_read_lock(thd.ptr);
}
- // signal to donor that SST is over
- struct wsrep_gtid const state_id = {
- ret_uuid, err ? WSREP_SEQNO_UNDEFINED : ret_seqno
- };
- wsrep->sst_sent (wsrep, &state_id, -err);
+ wsrep::gtid gtid(wsrep::id(ret_uuid.data, sizeof(ret_uuid.data)),
+ wsrep::seqno(err ? wsrep::seqno::undefined() :
+ wsrep::seqno(ret_seqno)));
+ Wsrep_server_state::instance().sst_sent(gtid, err);
proc.wait();
return NULL;
}
-
-
-static int sst_donate_other (const char* method,
- const char* addr,
- const char* uuid,
- wsrep_seqno_t seqno,
- bool bypass,
- char** env) // carries auth info
+static int sst_donate_other (const char* method,
+ const char* addr,
+ const wsrep::gtid& gtid,
+ bool bypass,
+ char** env) // carries auth info
{
int const cmd_len= 4096;
wsp::string cmd_str(cmd_len);
@@ -1348,6 +1305,8 @@ static int sst_donate_other (const char* method,
make_wsrep_defaults_file();
+ std::ostringstream uuid_oss;
+ uuid_oss << gtid.id();
ret= snprintf (cmd_str(), cmd_len,
"wsrep_sst_%s "
WSREP_SST_OPT_ROLE " 'donor' "
@@ -1362,7 +1321,7 @@ static int sst_donate_other (const char* method,
method, addr, mysqld_unix_port, mysql_real_data_home,
wsrep_defaults_file,
binlog_opt, binlog_opt_val,
- uuid, (long long) seqno, wsrep_gtid_domain_id,
+ uuid_oss.str().c_str(), gtid.seqno().get(), wsrep_gtid_domain_id,
bypass ? " " WSREP_SST_OPT_BYPASS : "");
my_free(binlog_opt_val);
@@ -1377,7 +1336,7 @@ static int sst_donate_other (const char* method,
pthread_t tmp;
sst_thread_arg arg(cmd_str(), env);
mysql_mutex_lock (&arg.lock);
- ret = pthread_create (&tmp, NULL, sst_donor_thread, &arg);
+ ret= pthread_create (&tmp, NULL, sst_donor_thread, &arg);
if (ret)
{
WSREP_ERROR("sst_donate_other(): pthread_create() failed: %d (%s)",
@@ -1390,23 +1349,18 @@ static int sst_donate_other (const char* method,
return arg.err;
}
-wsrep_cb_status_t wsrep_sst_donate_cb (void* app_ctx, void* recv_ctx,
- const void* msg, size_t msg_len,
- const wsrep_gtid_t* current_gtid,
- const char* state, size_t state_len,
- bool bypass)
+int wsrep_sst_donate(const std::string& msg,
+ const wsrep::gtid& current_gtid,
+ const bool bypass)
{
/* This will be reset when sync callback is called.
* Should we set wsrep_ready to FALSE here too? */
- wsrep_config_state->set(WSREP_MEMBER_DONOR);
-
- const char* method = (char*)msg;
- size_t method_len = strlen (method);
- const char* data = method + method_len + 1;
+ wsrep_config_state->set(wsrep::server_state::s_donor);
- char uuid_str[37];
- wsrep_uuid_print (&current_gtid->uuid, uuid_str, sizeof(uuid_str));
+ const char* method= msg.data();
+ size_t method_len= strlen (method);
+ const char* data= method + method_len + 1;
wsp::env env(NULL);
if (env.error())
@@ -1434,54 +1388,13 @@ wsrep_cb_status_t wsrep_sst_donate_cb (void* app_ctx, void* recv_ctx,
if (!strcmp (WSREP_SST_MYSQLDUMP, method))
{
- ret = sst_donate_mysqldump(data, &current_gtid->uuid, uuid_str,
- current_gtid->seqno, bypass, env());
+ ret= sst_donate_mysqldump(data, current_gtid, bypass, env());
}
else
{
- ret = sst_donate_other(method, data, uuid_str,
- current_gtid->seqno, bypass, env());
+ ret= sst_donate_other(method, data, current_gtid, bypass, env());
}
- return (ret >= 0 ? WSREP_CB_SUCCESS : WSREP_CB_FAILURE);
-}
-
-void wsrep_SE_init_grab()
-{
- if (mysql_mutex_lock (&LOCK_wsrep_sst_init)) abort();
-}
-
-void wsrep_SE_init_wait()
-{
- double total_wtime=0;
-
- while (SE_initialized == false)
- {
- struct timespec wtime;
- set_timespec(wtime, WSREP_TIMEDWAIT_SECONDS);
- time_t start_time = time(NULL);
- mysql_cond_timedwait (&COND_wsrep_sst_init, &LOCK_wsrep_sst_init, &wtime);
- time_t end_time = time(NULL);
-
- if (!SE_initialized)
- {
- total_wtime += difftime(end_time, start_time);
- WSREP_DEBUG("Waiting for SST to complete. current seqno: %" PRId64 " waited %f secs.", local_seqno, total_wtime);
- service_manager_extend_timeout(WSREP_EXTEND_TIMEOUT_INTERVAL,
- "WSREP state transfer ongoing, current seqno: %ld waited %f secs", local_seqno, total_wtime);
- }
- }
-
- mysql_mutex_unlock (&LOCK_wsrep_sst_init);
-}
-
-void wsrep_SE_init_done()
-{
- mysql_cond_signal (&COND_wsrep_sst_init);
- mysql_mutex_unlock (&LOCK_wsrep_sst_init);
-}
-
-void wsrep_SE_initialized()
-{
- SE_initialized = true;
+ return (ret >= 0 ? 0 : 1);
}
+