diff options
author | Seppo Jaakola <seppo.jaakola@codership.com> | 2013-02-05 15:48:54 +0200 |
---|---|---|
committer | Seppo Jaakola <seppo.jaakola@codership.com> | 2013-02-05 15:48:54 +0200 |
commit | e0c6a87b997509f9eb282988cfb097e30b4c9749 (patch) | |
tree | 0475741cb7626ab2e4a0dc799fe2f53dab516669 /sql/wsrep_sst.cc | |
parent | db7495822940b5c95f10ff5b1e6273701e868436 (diff) | |
download | mariadb-git-e0c6a87b997509f9eb282988cfb097e30b4c9749.tar.gz |
re-merging wsrep files from lp:codership-mysql
Diffstat (limited to 'sql/wsrep_sst.cc')
-rw-r--r-- | sql/wsrep_sst.cc | 1001 |
1 files changed, 0 insertions, 1001 deletions
diff --git a/sql/wsrep_sst.cc b/sql/wsrep_sst.cc deleted file mode 100644 index 597d0ea087d..00000000000 --- a/sql/wsrep_sst.cc +++ /dev/null @@ -1,1001 +0,0 @@ -/* Copyright 2008-2012 Codership Oy <http://www.codership.com> - - This program is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; version 2 of the License. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ - -#include <mysqld.h> -#include <sql_class.h> -#include <set_var.h> -#include <sql_acl.h> -#include <sql_reload.h> -#include <sql_parse.h> -#include "wsrep_priv.h" -#include <cstdio> -#include <cstdlib> - -extern const char wsrep_defaults_file[]; - -#define WSREP_SST_OPT_ROLE "--role" -#define WSREP_SST_OPT_ADDR "--address" -#define WSREP_SST_OPT_AUTH "--auth" -#define WSREP_SST_OPT_DATA "--datadir" -#define WSREP_SST_OPT_CONF "--defaults-file" -#define WSREP_SST_OPT_PARENT "--parent" - -// mysqldump-specific options -#define WSREP_SST_OPT_USER "--user" -#define WSREP_SST_OPT_PSWD "--password" -#define WSREP_SST_OPT_HOST "--host" -#define WSREP_SST_OPT_PORT "--port" -#define WSREP_SST_OPT_LPORT "--local-port" - -// donor-specific -#define WSREP_SST_OPT_SOCKET "--socket" -#define WSREP_SST_OPT_GTID "--gtid" -#define WSREP_SST_OPT_BYPASS "--bypass" - -#define WSREP_SST_MYSQLDUMP "mysqldump" -#define WSREP_SST_SKIP "skip" -#define WSREP_SST_DEFAULT WSREP_SST_MYSQLDUMP -#define WSREP_SST_ADDRESS_AUTO "AUTO" -#define WSREP_SST_AUTH_MASK "********" - -const char* wsrep_sst_method = WSREP_SST_DEFAULT; -const char* wsrep_sst_receive_address = WSREP_SST_ADDRESS_AUTO; -const char* wsrep_sst_donor = ""; - char* wsrep_sst_auth = NULL; - -// container for real auth string -static const char* sst_auth_real = NULL; - -my_bool wsrep_sst_donor_rejects_queries = FALSE; - -bool wsrep_sst_method_check (sys_var *self, THD* thd, set_var* var) -{ - char buff[FN_REFLEN]; - String str(buff, sizeof(buff), system_charset_info), *res; - const char* c_str = NULL; - - if ((res = var->value->val_str(&str)) && - (c_str = res->c_ptr()) && - strlen(c_str) > 0) - return 0; - - my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), "wsrep_sst_method", c_str ? c_str : "NULL"); - return 1; -} - -bool wsrep_sst_method_update (sys_var *self, THD* thd, enum_var_type type) -{ - return 0; -} - -static bool sst_receive_address_check (const char* str) -{ - if (!strncasecmp(str, "127.0.0.1", strlen("127.0.0.1")) || - !strncasecmp(str, "localhost", strlen("localhost"))) - { - return 1; - } - - return 0; -} - -bool wsrep_sst_receive_address_check (sys_var *self, THD* thd, set_var* var) -{ - const char* c_str = var->value->str_value.c_ptr(); - - if (sst_receive_address_check (c_str)) - { - my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), "wsrep_sst_receive_address", c_str ? c_str : "NULL"); - return 1; - } - - return 0; -} - -bool wsrep_sst_receive_address_update (sys_var *self, THD* thd, - enum_var_type type) -{ - return 0; -} - -bool wsrep_sst_auth_check (sys_var *self, THD* thd, set_var* var) -{ - return 0; -} -static bool sst_auth_real_set (const char* value) -{ - const char* v = strdup (value); - - if (v) - { - if (sst_auth_real) free (const_cast<char*>(sst_auth_real)); - sst_auth_real = v; - - if (strlen(sst_auth_real)) - { - if (wsrep_sst_auth) - { - my_free ((void*)wsrep_sst_auth); - wsrep_sst_auth = my_strdup(WSREP_SST_AUTH_MASK, MYF(0)); - //strncpy (wsrep_sst_auth, WSREP_SST_AUTH_MASK, - // sizeof(wsrep_sst_auth) - 1); - } - else - wsrep_sst_auth = my_strdup (WSREP_SST_AUTH_MASK, MYF(0)); - } - return 0; - } - - return 1; -} - -bool wsrep_sst_auth_update (sys_var *self, THD* thd, enum_var_type type) -{ - return sst_auth_real_set (wsrep_sst_auth); -} - -void wsrep_sst_auth_init (const char* value) -{ - if (wsrep_sst_auth == value) wsrep_sst_auth = NULL; - if (value) sst_auth_real_set (value); -} - -bool wsrep_sst_donor_check (sys_var *self, THD* thd, set_var* var) -{ - return 0; -} - -bool wsrep_sst_donor_update (sys_var *self, THD* thd, enum_var_type type) -{ - return 0; -} - -static wsrep_uuid_t cluster_uuid = WSREP_UUID_UNDEFINED; - -bool wsrep_before_SE() -{ - return (wsrep_provider != NULL - && strcmp (wsrep_provider, WSREP_NONE) - && strcmp (wsrep_sst_method, WSREP_SST_SKIP) - && strcmp (wsrep_sst_method, WSREP_SST_MYSQLDUMP)); -} - -static bool sst_complete = false; -static bool sst_needed = false; - -void wsrep_sst_grab () -{ - WSREP_INFO("wsrep_sst_grab()"); - if (mysql_mutex_lock (&LOCK_wsrep_sst)) abort(); - sst_complete = false; - mysql_mutex_unlock (&LOCK_wsrep_sst); -} - -// Wait for end of SST -bool wsrep_sst_wait () -{ - if (mysql_mutex_lock (&LOCK_wsrep_sst)) abort(); - while (!sst_complete) - { - WSREP_INFO("Waiting for SST to complete."); - mysql_cond_wait (&COND_wsrep_sst, &LOCK_wsrep_sst); - } - - if (local_seqno >= 0) - { - WSREP_INFO("SST complete, seqno: %lld", (long long) local_seqno); - } - else - { - WSREP_ERROR("SST failed: %d (%s)", - int(-local_seqno), strerror(-local_seqno)); - } - - mysql_mutex_unlock (&LOCK_wsrep_sst); - - return (local_seqno >= 0); -} - -// Signal end of SST -void wsrep_sst_complete (const wsrep_uuid_t* sst_uuid, - wsrep_seqno_t sst_seqno, - bool needed) -{ - if (mysql_mutex_lock (&LOCK_wsrep_sst)) abort(); - if (!sst_complete) - { - sst_complete = true; - sst_needed = needed; - local_uuid = *sst_uuid; - local_seqno = sst_seqno; - mysql_cond_signal (&COND_wsrep_sst); - } - else - { - WSREP_WARN("Nobody is waiting for SST."); - } - mysql_mutex_unlock (&LOCK_wsrep_sst); -} - -// Let applier threads to continue -void wsrep_sst_continue () -{ - if (sst_needed) - { - WSREP_INFO("Signalling provider to continue."); - wsrep->sst_received (wsrep, &local_uuid, local_seqno, NULL, 0); - } -} - -struct sst_thread_arg -{ - const char* cmd; - int err; - char* ret_str; - mysql_mutex_t lock; - mysql_cond_t cond; - - sst_thread_arg (const char* c) : cmd(c), err(-1), ret_str(0) - { - mysql_mutex_init(key_LOCK_wsrep_sst_thread, &lock, MY_MUTEX_INIT_FAST); - mysql_cond_init(key_COND_wsrep_sst_thread, &cond, NULL); - } - - ~sst_thread_arg() - { - mysql_cond_destroy (&cond); - mysql_mutex_unlock (&lock); - mysql_mutex_destroy (&lock); - } -}; - -static int sst_scan_uuid_seqno (const char* str, - wsrep_uuid_t* uuid, wsrep_seqno_t* seqno) -{ - int offt = wsrep_uuid_scan (str, strlen(str), uuid); - if (offt > 0 && strlen(str) > (unsigned int)offt && ':' == str[offt]) - { - *seqno = strtoll (str + offt + 1, NULL, 10); - if (*seqno != LLONG_MAX || errno != ERANGE) - { - return 0; - } - } - - WSREP_ERROR("Failed to parse uuid:seqno pair: '%s'", str); - return EINVAL; -} - -// get rid of trailing \n -static char* my_fgets (char* buf, size_t buf_len, FILE* stream) -{ - char* ret= fgets (buf, buf_len, stream); - - if (ret) - { - size_t len = strlen(ret); - if (len > 0 && ret[len - 1] == '\n') ret[len - 1] = '\0'; - } - - return ret; -} - -static void* sst_joiner_thread (void* a) -{ - sst_thread_arg* arg= (sst_thread_arg*) a; - int err= 1; - - { - const char magic[] = "ready"; - const size_t magic_len = sizeof(magic) - 1; - const size_t out_len = 512; - char out[out_len]; - - WSREP_INFO("Running: '%s'", arg->cmd); - - wsp::process proc (arg->cmd, "r"); - - if (proc.pipe() && !proc.error()) - { - const char* tmp= my_fgets (out, out_len, proc.pipe()); - - if (!tmp || strlen(tmp) < (magic_len + 2) || - strncasecmp (tmp, magic, magic_len)) - { - WSREP_ERROR("Failed to read '%s <addr>' from: %s\n\tRead: '%s'", - magic, arg->cmd, tmp); - proc.wait(); - if (proc.error()) err = proc.error(); - } - else - { - err = 0; - } - } - else - { - err = proc.error(); - WSREP_ERROR("Failed to execute: %s : %d (%s)", - arg->cmd, err, strerror(err)); - } - - // signal sst_prepare thread with ret code, - // it will go on sending SST request - mysql_mutex_lock (&arg->lock); - if (!err) - { - arg->ret_str = strdup (out + magic_len + 1); - if (!arg->ret_str) err = ENOMEM; - } - arg->err = -err; - mysql_cond_signal (&arg->cond); - mysql_mutex_unlock (&arg->lock); //! @note arg is unusable after that. - - if (err) return NULL; /* lp:808417 - return immediately, don't signal - * initializer thread to ensure single thread of - * shutdown. */ - - wsrep_uuid_t ret_uuid = WSREP_UUID_UNDEFINED; - wsrep_seqno_t ret_seqno = WSREP_SEQNO_UNDEFINED; - - // in case of successfull receiver start, wait for SST completion/end - char* tmp = my_fgets (out, out_len, proc.pipe()); - - proc.wait(); - err= EINVAL; - - if (!tmp) - { - WSREP_ERROR("Failed to read uuid:seqno from joiner script."); - if (proc.error()) err = proc.error(); - } - else - { - err= sst_scan_uuid_seqno (out, &ret_uuid, &ret_seqno); - } - - if (err) - { - ret_uuid= WSREP_UUID_UNDEFINED; - ret_seqno= -err; - } - - // Tell initializer thread that SST is complete - wsrep_sst_complete (&ret_uuid, ret_seqno, true); - } - - return NULL; -} - -static ssize_t sst_prepare_other (const char* method, - const char* addr_in, - const char** addr_out) -{ - ssize_t cmd_len= 1024; - char cmd_str[cmd_len]; - const char* sst_dir= mysql_real_data_home; - - int ret= snprintf (cmd_str, cmd_len, - "wsrep_sst_%s " - WSREP_SST_OPT_ROLE" 'joiner' " - WSREP_SST_OPT_ADDR" '%s' " - WSREP_SST_OPT_AUTH" '%s' " - WSREP_SST_OPT_DATA" '%s' " - WSREP_SST_OPT_CONF" '%s' " - WSREP_SST_OPT_PARENT" '%d'", - method, addr_in, (sst_auth_real) ? sst_auth_real : "", - sst_dir, wsrep_defaults_file, (int)getpid()); - - if (ret < 0 || ret >= cmd_len) - { - WSREP_ERROR("sst_prepare_other(): snprintf() failed: %d", ret); - return (ret < 0 ? ret : -EMSGSIZE); - } - - pthread_t tmp; - sst_thread_arg arg(cmd_str); - mysql_mutex_lock (&arg.lock); - ret = pthread_create (&tmp, NULL, sst_joiner_thread, &arg); - if (ret) - { - WSREP_ERROR("sst_prepare_other(): pthread_create() failed: %d (%s)", - ret, strerror(ret)); - return ret; - } - mysql_cond_wait (&arg.cond, &arg.lock); - - *addr_out= arg.ret_str; - - if (!arg.err) - ret = strlen(*addr_out); - else - { - assert (arg.err < 0); - ret = arg.err; - } - - pthread_detach (tmp); - - return ret; -} - -//extern ulong my_bind_addr; -extern uint mysqld_port; - -/*! Just tells donor where to send mysqldump */ -static ssize_t sst_prepare_mysqldump (const char* addr_in, - const char** addr_out) -{ - ssize_t ret = strlen (addr_in); - - if (!strrchr(addr_in, ':')) - { - ssize_t s = ret + 7; - char* tmp = (char*) malloc (s); - - if (tmp) - { - ret= snprintf (tmp, s, "%s:%u", addr_in, mysqld_port); - - if (ret > 0 && ret < s) - { - *addr_out= tmp; - return ret; - } - if (ret > 0) /* buffer too short */ ret = -EMSGSIZE; - free (tmp); - } - else { - ret= -ENOMEM; - } - - WSREP_ERROR ("Could not prepare state transfer request: " - "adding default port failed: %zd.", ret); - } - else { - *addr_out= addr_in; - } - - return ret; -} - -static bool SE_initialized = false; - -ssize_t wsrep_sst_prepare (void** msg) -{ - const ssize_t ip_max= 256; - char ip_buf[ip_max]; - const char* addr_in= NULL; - const char* addr_out= NULL; - - if (!strcmp(wsrep_sst_method, WSREP_SST_SKIP)) - { - ssize_t ret = strlen(WSREP_STATE_TRANSFER_TRIVIAL) + 1; - *msg = strdup(WSREP_STATE_TRANSFER_TRIVIAL); - if (!msg) - { - WSREP_ERROR("Could not allocate %zd bytes for state request", ret); - unireg_abort(1); - } - return ret; - } - - // Figure out SST address. Common for all SST methods - if (wsrep_sst_receive_address && - strcmp (wsrep_sst_receive_address, WSREP_SST_ADDRESS_AUTO)) - { - addr_in= wsrep_sst_receive_address; - } - else if (wsrep_node_address && strlen(wsrep_node_address)) - { - const char* const colon= strchr (wsrep_node_address, ':'); - if (colon) - { - ptrdiff_t const len= colon - wsrep_node_address; - strncpy (ip_buf, wsrep_node_address, len); - ip_buf[len]= '\0'; - addr_in= ip_buf; - } - else - { - addr_in= wsrep_node_address; - } - } - else - { - ssize_t ret= guess_ip (ip_buf, ip_max); - - if (ret && ret < ip_max) - { - addr_in= ip_buf; - } - else - { - WSREP_ERROR("Could not prepare state transfer request: " - "failed to guess address to accept state transfer at. " - "wsrep_sst_receive_address must be set manually."); - unireg_abort(1); - } - } - - ssize_t addr_len= -ENOSYS; - if (!strcmp(wsrep_sst_method, WSREP_SST_MYSQLDUMP)) - { - addr_len= sst_prepare_mysqldump (addr_in, &addr_out); - if (addr_len < 0) unireg_abort(1); - } - else - { - /*! A heuristic workaround until we learn how to stop and start engines */ - if (SE_initialized) - { - // we already did SST at initializaiton, now engines are running - // sql_print_information() is here because the message is too long - // for WSREP_INFO. - sql_print_information ("WSREP: " - "You have configured '%s' state snapshot transfer method " - "which cannot be performed on a running server. " - "Wsrep provider won't be able to fall back to it " - "if other means of state transfer are unavailable. " - "In that case you will need to restart the server.", - wsrep_sst_method); - *msg = 0; - return 0; - } - - addr_len = sst_prepare_other (wsrep_sst_method, addr_in, &addr_out); - if (addr_len < 0) - { - WSREP_ERROR("Failed to prepare for '%s' SST. Unrecoverable.", - wsrep_sst_method); - unireg_abort(1); - } - } - - size_t const method_len(strlen(wsrep_sst_method)); - size_t const msg_len (method_len + addr_len + 2 /* + auth_len + 1*/); - - *msg = malloc (msg_len); - if (NULL != *msg) { - char* const method_ptr(reinterpret_cast<char*>(*msg)); - strcpy (method_ptr, wsrep_sst_method); - char* const addr_ptr(method_ptr + method_len + 1); - strcpy (addr_ptr, addr_out); - - WSREP_INFO ("Prepared SST request: %s|%s", method_ptr, addr_ptr); - } - else { - WSREP_ERROR("Failed to allocate SST request of size %zu. Can't continue.", - msg_len); - unireg_abort(1); - } - - if (addr_out != addr_in) /* malloc'ed */ free ((char*)addr_out); - - return msg_len; -} - -// helper method for donors -static int sst_run_shell (const char* cmd_str, int max_tries) -{ - int ret = 0; - - for (int tries=1; tries <= max_tries; tries++) - { - wsp::process proc (cmd_str, "r"); - - if (NULL != proc.pipe()) - { - proc.wait(); - } - - if ((ret = proc.error())) - { - WSREP_ERROR("Try %d/%d: '%s' failed: %d (%s)", - tries, max_tries, proc.cmd(), ret, strerror(ret)); - sleep (1); - } - else - { - WSREP_DEBUG("SST script successfully completed."); - break; - } - } - - return -ret; -} - -static void sst_reject_queries(my_bool close_conn) -{ - wsrep_ready_set (FALSE); // this will be resotred when donor becomes synced - WSREP_INFO("Rejecting client queries for the duration of SST."); - if (TRUE == close_conn) wsrep_close_client_connections(FALSE); -} - -static int sst_mysqldump_check_addr (const char* user, const char* pswd, - const char* host, const char* port) -{ - return 0; -} - -static int sst_donate_mysqldump (const char* addr, - const wsrep_uuid_t* uuid, - const char* uuid_str, - wsrep_seqno_t seqno, - bool bypass) -{ - size_t host_len; - const char* port = strchr (addr, ':'); - - if (port) - { - port += 1; - host_len = port - addr; - } - else - { - port = ""; - host_len = strlen (addr) + 1; - } - - char host[host_len]; - - strncpy (host, addr, host_len - 1); - host[host_len - 1] = '\0'; - - const char* auth = sst_auth_real; - const char* pswd = (auth) ? strchr (auth, ':') : NULL; - size_t user_len; - - if (pswd) - { - pswd += 1; - user_len = pswd - auth; - } - else - { - pswd = ""; - user_len = (auth) ? strlen (auth) + 1 : 1; - } - - char user[user_len]; - - strncpy (user, (auth) ? auth : "", user_len - 1); - user[user_len - 1] = '\0'; - - int ret = sst_mysqldump_check_addr (user, pswd, host, port); - if (!ret) - { - size_t cmd_len= 1024; - char cmd_str[cmd_len]; - - if (!bypass && wsrep_sst_donor_rejects_queries) sst_reject_queries(TRUE); - - snprintf (cmd_str, cmd_len, - "wsrep_sst_mysqldump " - WSREP_SST_OPT_USER" '%s' " - WSREP_SST_OPT_PSWD" '%s' " - WSREP_SST_OPT_HOST" '%s' " - WSREP_SST_OPT_PORT" '%s' " - WSREP_SST_OPT_LPORT" '%u' " - WSREP_SST_OPT_SOCKET" '%s' " - WSREP_SST_OPT_GTID" '%s:%lld'" - "%s", - user, pswd, host, port, mysqld_port, mysqld_unix_port, uuid_str, - (long long)seqno, bypass ? " "WSREP_SST_OPT_BYPASS : ""); - - WSREP_DEBUG("Running: '%s'", cmd_str); - - ret= sst_run_shell (cmd_str, 3); - } - - wsrep->sst_sent (wsrep, uuid, ret ? ret : seqno); - - return ret; -} - -wsrep_seqno_t wsrep_locked_seqno= WSREP_SEQNO_UNDEFINED; - -static int run_sql_command(THD *thd, const char *query) -{ - thd->set_query((char *)query, strlen(query)); - - Parser_state ps; - if (ps.init(thd, thd->query(), thd->query_length())) - { - WSREP_ERROR("SST query: %s failed", query); - return -1; - } - - mysql_parse(thd, thd->query(), thd->query_length(), &ps); - if (thd->is_error()) - { - int const err= thd->stmt_da->sql_errno(); - WSREP_WARN ("error executing '%s': %d (%s)%s", - query, err, thd->stmt_da->message(), - err == ER_UNKNOWN_SYSTEM_VARIABLE ? - ". Was mysqld built with --with-innodb-disallow-writes ?" : ""); - thd->clear_error(); - return -1; - } - return 0; -} - -static int sst_flush_tables(THD* thd) -{ - WSREP_INFO("Flushing tables for SST..."); - - int err; - int not_used; - if (run_sql_command(thd, "FLUSH TABLES WITH READ LOCK")) - { - WSREP_ERROR("Failed to flush and lock tables"); - err = -1; - } - else - { - /* make sure logs are flushed after global read lock acquired */ - err= reload_acl_and_cache(thd, REFRESH_ENGINE_LOG, - (TABLE_LIST*) 0, ¬_used); - } - - if (err) - { - WSREP_ERROR("Failed to flush tables: %d (%s)", err, strerror(err)); - } - else - { - WSREP_INFO("Tables flushed."); - const char base_name[]= "tables_flushed"; - ssize_t const full_len= strlen(mysql_real_data_home) + strlen(base_name)+2; - char real_name[full_len]; - sprintf(real_name, "%s/%s", mysql_real_data_home, base_name); - char tmp_name[full_len + 4]; - sprintf(tmp_name, "%s.tmp", real_name); - - FILE* file= fopen(tmp_name, "w+"); - if (0 == file) - { - err= errno; - WSREP_ERROR("Failed to open '%s': %d (%s)", tmp_name, err,strerror(err)); - } - else - { - fprintf(file, "%s:%lld\n", - wsrep_cluster_state_uuid, (long long)wsrep_locked_seqno); - fsync(fileno(file)); - fclose(file); - if (rename(tmp_name, real_name) == -1) - { - err= errno; - WSREP_ERROR("Failed to rename '%s' to '%s': %d (%s)", - tmp_name, real_name, err,strerror(err)); - } - } - } - - return err; -} - -static void sst_disallow_writes (THD* thd, bool yes) -{ - char query_str[64] = { 0, }; - ssize_t const query_max = sizeof(query_str) - 1; - snprintf (query_str, query_max, "SET GLOBAL innodb_disallow_writes=%d", - yes ? 1 : 0); - - if (run_sql_command(thd, query_str)) - { - WSREP_ERROR("Failed to disallow InnoDB writes"); - } -} - -static void* sst_donor_thread (void* a) -{ - sst_thread_arg* arg= (sst_thread_arg*)a; - - WSREP_INFO("Running: '%s'", arg->cmd); - - int err= 1; - bool locked= false; - - const char* out= NULL; - const size_t out_len= 128; - char out_buf[out_len]; - - wsrep_uuid_t ret_uuid= WSREP_UUID_UNDEFINED; - wsrep_seqno_t ret_seqno= WSREP_SEQNO_UNDEFINED; // seqno of complete SST - - wsp::thd thd(FALSE); // we turn off wsrep_on for this THD so that it can - // operate with wsrep_ready == OFF - wsp::process proc(arg->cmd, "r"); - - err= proc.error(); - -/* Inform server about SST script startup and release TO isolation */ - mysql_mutex_lock (&arg->lock); - arg->err = -err; - mysql_cond_signal (&arg->cond); - mysql_mutex_unlock (&arg->lock); //! @note arg is unusable after that. - - if (proc.pipe() && !err) - { -wait_signal: - out= my_fgets (out_buf, out_len, proc.pipe()); - - if (out) - { - const char magic_flush[]= "flush tables"; - const char magic_cont[]= "continue"; - const char magic_done[]= "done"; - - if (!strcasecmp (out, magic_flush)) - { - err= sst_flush_tables (thd.ptr); - if (!err) - { - sst_disallow_writes (thd.ptr, true); - locked= true; - goto wait_signal; - } - } - else if (!strcasecmp (out, magic_cont)) - { - if (locked) - { - sst_disallow_writes (thd.ptr, false); - thd.ptr->global_read_lock.unlock_global_read_lock (thd.ptr); - locked= false; - } - err= 0; - goto wait_signal; - } - else if (!strncasecmp (out, magic_done, strlen(magic_done))) - { - err= sst_scan_uuid_seqno (out + strlen(magic_done) + 1, - &ret_uuid, &ret_seqno); - } - else - { - WSREP_WARN("Received unknown signal: '%s'", out); - } - } - else - { - WSREP_ERROR("Failed to read from: %s", proc.cmd()); - } - if (err && proc.error()) err= proc.error(); - } - else - { - WSREP_ERROR("Failed to execute: %s : %d (%s)", - proc.cmd(), err, strerror(err)); - } - - if (locked) // don't forget to unlock server before return - { - sst_disallow_writes (thd.ptr, false); - thd.ptr->global_read_lock.unlock_global_read_lock (thd.ptr); - } - - // signal to donor that SST is over - wsrep->sst_sent (wsrep, &ret_uuid, err ? -err : ret_seqno); - proc.wait(); - - return NULL; -} - -static int sst_donate_other (const char* method, - const char* addr, - const char* uuid, - wsrep_seqno_t seqno, - bool bypass) -{ - ssize_t cmd_len = 4096; - char cmd_str[cmd_len]; - - int ret= snprintf (cmd_str, cmd_len, - "wsrep_sst_%s " - WSREP_SST_OPT_ROLE" 'donor' " - WSREP_SST_OPT_ADDR" '%s' " - WSREP_SST_OPT_AUTH" '%s' " - WSREP_SST_OPT_SOCKET" '%s' " - WSREP_SST_OPT_DATA" '%s' " - WSREP_SST_OPT_CONF" '%s' " - WSREP_SST_OPT_GTID" '%s:%lld'" - "%s", - method, addr, sst_auth_real, mysqld_unix_port, - mysql_real_data_home, wsrep_defaults_file, - uuid, (long long) seqno, - bypass ? " "WSREP_SST_OPT_BYPASS : ""); - - if (ret < 0 || ret >= cmd_len) - { - WSREP_ERROR("sst_donate_other(): snprintf() failed: %d", ret); - return (ret < 0 ? ret : -EMSGSIZE); - } - - if (!bypass && wsrep_sst_donor_rejects_queries) sst_reject_queries(FALSE); - - pthread_t tmp; - sst_thread_arg arg(cmd_str); - mysql_mutex_lock (&arg.lock); - ret = pthread_create (&tmp, NULL, sst_donor_thread, &arg); - if (ret) - { - WSREP_ERROR("sst_donate_other(): pthread_create() failed: %d (%s)", - ret, strerror(ret)); - return ret; - } - mysql_cond_wait (&arg.cond, &arg.lock); - - WSREP_INFO("sst_donor_thread signaled with %d", arg.err); - return arg.err; -} - -int wsrep_sst_donate_cb (void* app_ctx, void* recv_ctx, - const void* msg, size_t msg_len, - const wsrep_uuid_t* current_uuid, - wsrep_seqno_t current_seqno, - const char* state, size_t state_len, - bool bypass) -{ - /* This will be reset when sync callback is called. - * Should we set wsrep_ready to FALSE here too? */ -// wsrep_notify_status(WSREP_MEMBER_DONOR); - local_status.set(WSREP_MEMBER_DONOR); - - const char* method = (char*)msg; - size_t method_len = strlen (method); - const char* data = method + method_len + 1; - - char uuid_str[37]; - wsrep_uuid_print (current_uuid, uuid_str, sizeof(uuid_str)); - - int ret; - if (!strcmp (WSREP_SST_MYSQLDUMP, method)) - { - ret = sst_donate_mysqldump (data, current_uuid, uuid_str, current_seqno, - bypass); - } - else - { - ret = sst_donate_other (method, data, uuid_str, current_seqno, bypass); - } - - return (ret > 0 ? 0 : ret); -} - -void wsrep_SE_init_grab() -{ - if (mysql_mutex_lock (&LOCK_wsrep_sst_init)) abort(); -} - -void wsrep_SE_init_wait() -{ - mysql_cond_wait (&COND_wsrep_sst_init, &LOCK_wsrep_sst_init); - mysql_mutex_unlock (&LOCK_wsrep_sst_init); -} - -void wsrep_SE_init_done() -{ - mysql_cond_signal (&COND_wsrep_sst_init); - mysql_mutex_unlock (&LOCK_wsrep_sst_init); -} - -void wsrep_SE_initialized() -{ - SE_initialized = true; -} |