/* Copyright 2008-2012 Codership Oy This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include #include #include #include "wsrep_priv.h" #include #include extern const char wsrep_defaults_file[]; #define WSREP_SST_OPT_ROLE "--role" #define WSREP_SST_OPT_ADDR "--address" #define WSREP_SST_OPT_AUTH "--auth" #define WSREP_SST_OPT_DATA "--datadir" #define WSREP_SST_OPT_CONF "--defaults-file" #define WSREP_SST_OPT_PARENT "--parent" // mysqldump-specific options #define WSREP_SST_OPT_USER "--user" #define WSREP_SST_OPT_PSWD "--password" #define WSREP_SST_OPT_HOST "--host" #define WSREP_SST_OPT_PORT "--port" #define WSREP_SST_OPT_LPORT "--local-port" // donor-specific #define WSREP_SST_OPT_SOCKET "--socket" #define WSREP_SST_OPT_GTID "--gtid" #define WSREP_SST_OPT_BYPASS "--bypass" #define WSREP_SST_MYSQLDUMP "mysqldump" #define WSREP_SST_SKIP "skip" #define WSREP_SST_DEFAULT WSREP_SST_MYSQLDUMP #define WSREP_SST_ADDRESS_AUTO "AUTO" #define WSREP_SST_AUTH_MASK "********" const char* wsrep_sst_method = WSREP_SST_DEFAULT; const char* wsrep_sst_receive_address = WSREP_SST_ADDRESS_AUTO; const char* wsrep_sst_donor = ""; char* wsrep_sst_auth = NULL; // container for real auth string static const char* sst_auth_real = NULL; my_bool wsrep_sst_donor_rejects_queries = FALSE; bool wsrep_sst_method_check (sys_var *self, THD* thd, set_var* var) { char buff[FN_REFLEN]; String str(buff, sizeof(buff), system_charset_info), *res; const char* c_str = NULL; if ((res = var->value->val_str(&str)) && (c_str = res->c_ptr()) && strlen(c_str) > 0) return 0; my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), "wsrep_sst_method", c_str ? c_str : "NULL"); return 1; } bool wsrep_sst_method_update (sys_var *self, THD* thd, enum_var_type type) { return 0; } static bool sst_receive_address_check (const char* str) { if (!strncasecmp(str, "127.0.0.1", strlen("127.0.0.1")) || !strncasecmp(str, "localhost", strlen("localhost"))) { return 1; } return 0; } bool wsrep_sst_receive_address_check (sys_var *self, THD* thd, set_var* var) { const char* c_str = var->value->str_value.c_ptr(); if (sst_receive_address_check (c_str)) { my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), "wsrep_sst_receive_address", c_str ? c_str : "NULL"); return 1; } return 0; } bool wsrep_sst_receive_address_update (sys_var *self, THD* thd, enum_var_type type) { return 0; } bool wsrep_sst_auth_check (sys_var *self, THD* thd, set_var* var) { return 0; } static bool sst_auth_real_set (const char* value) { const char* v = strdup (value); if (v) { if (sst_auth_real) free (const_cast(sst_auth_real)); sst_auth_real = v; if (strlen(sst_auth_real)) { if (wsrep_sst_auth) { my_free ((void*)wsrep_sst_auth); wsrep_sst_auth = my_strdup(WSREP_SST_AUTH_MASK, MYF(0)); //strncpy (wsrep_sst_auth, WSREP_SST_AUTH_MASK, // sizeof(wsrep_sst_auth) - 1); } else wsrep_sst_auth = my_strdup (WSREP_SST_AUTH_MASK, MYF(0)); } return 0; } return 1; } bool wsrep_sst_auth_update (sys_var *self, THD* thd, enum_var_type type) { return sst_auth_real_set (wsrep_sst_auth); } void wsrep_sst_auth_init (const char* value) { if (wsrep_sst_auth == value) wsrep_sst_auth = NULL; if (value) sst_auth_real_set (value); } bool wsrep_sst_donor_check (sys_var *self, THD* thd, set_var* var) { return 0; } bool wsrep_sst_donor_update (sys_var *self, THD* thd, enum_var_type type) { return 0; } static wsrep_uuid_t cluster_uuid = WSREP_UUID_UNDEFINED; bool wsrep_before_SE() { return (wsrep_provider != NULL && strcmp (wsrep_provider, WSREP_NONE) && strcmp (wsrep_sst_method, WSREP_SST_SKIP) && strcmp (wsrep_sst_method, WSREP_SST_MYSQLDUMP)); } static bool sst_complete = false; static bool sst_needed = false; void wsrep_sst_grab () { WSREP_INFO("wsrep_sst_grab()"); if (mysql_mutex_lock (&LOCK_wsrep_sst)) abort(); sst_complete = false; mysql_mutex_unlock (&LOCK_wsrep_sst); } // Wait for end of SST bool wsrep_sst_wait () { if (mysql_mutex_lock (&LOCK_wsrep_sst)) abort(); while (!sst_complete) { WSREP_INFO("Waiting for SST to complete."); mysql_cond_wait (&COND_wsrep_sst, &LOCK_wsrep_sst); } if (local_seqno >= 0) { WSREP_INFO("SST complete, seqno: %lld", (long long) local_seqno); } else { WSREP_ERROR("SST failed: %d (%s)", int(-local_seqno), strerror(-local_seqno)); } mysql_mutex_unlock (&LOCK_wsrep_sst); return (local_seqno >= 0); } // Signal end of SST void wsrep_sst_complete (const wsrep_uuid_t* sst_uuid, wsrep_seqno_t sst_seqno, bool needed) { if (mysql_mutex_lock (&LOCK_wsrep_sst)) abort(); if (!sst_complete) { sst_complete = true; sst_needed = needed; local_uuid = *sst_uuid; local_seqno = sst_seqno; mysql_cond_signal (&COND_wsrep_sst); } else { WSREP_WARN("Nobody is waiting for SST."); } mysql_mutex_unlock (&LOCK_wsrep_sst); } // Let applier threads to continue void wsrep_sst_continue () { if (sst_needed) { WSREP_INFO("Signalling provider to continue."); wsrep->sst_received (wsrep, &local_uuid, local_seqno, NULL, 0); } } struct sst_thread_arg { const char* cmd; int err; char* ret_str; mysql_mutex_t lock; mysql_cond_t cond; sst_thread_arg (const char* c) : cmd(c), err(-1), ret_str(0) { mysql_mutex_init(key_LOCK_wsrep_sst_thread, &lock, MY_MUTEX_INIT_FAST); mysql_cond_init(key_COND_wsrep_sst_thread, &cond, NULL); } ~sst_thread_arg() { mysql_cond_destroy (&cond); mysql_mutex_unlock (&lock); mysql_mutex_destroy (&lock); } }; static int sst_scan_uuid_seqno (const char* str, wsrep_uuid_t* uuid, wsrep_seqno_t* seqno) { int offt = wsrep_uuid_scan (str, strlen(str), uuid); if (offt > 0 && strlen(str) > (unsigned int)offt && ':' == str[offt]) { *seqno = strtoll (str + offt + 1, NULL, 10); if (*seqno != LLONG_MAX || errno != ERANGE) { return 0; } } WSREP_ERROR("Failed to parse uuid:seqno pair: '%s'", str); return EINVAL; } // get rid of trailing \n static char* my_fgets (char* buf, size_t buf_len, FILE* stream) { char* ret= fgets (buf, buf_len, stream); if (ret) { size_t len = strlen(ret); if (len > 0 && ret[len - 1] == '\n') ret[len - 1] = '\0'; } return ret; } static void* sst_joiner_thread (void* a) { sst_thread_arg* arg= (sst_thread_arg*) a; int err= 1; { const char magic[] = "ready"; const size_t magic_len = sizeof(magic) - 1; const size_t out_len = 512; char out[out_len]; WSREP_INFO("Running: '%s'", arg->cmd); wsp::process proc (arg->cmd, "r"); if (proc.pipe() && !proc.error()) { const char* tmp= my_fgets (out, out_len, proc.pipe()); if (!tmp || strlen(tmp) < (magic_len + 2) || strncasecmp (tmp, magic, magic_len)) { WSREP_ERROR("Failed to read '%s ' from: %s\n\tRead: '%s'", magic, arg->cmd, tmp); proc.wait(); if (proc.error()) err = proc.error(); } else { err = 0; } } else { err = proc.error(); WSREP_ERROR("Failed to execute: %s : %d (%s)", arg->cmd, err, strerror(err)); } // signal sst_prepare thread with ret code, // it will go on sending SST request mysql_mutex_lock (&arg->lock); if (!err) { arg->ret_str = strdup (out + magic_len + 1); if (!arg->ret_str) err = ENOMEM; } arg->err = -err; mysql_cond_signal (&arg->cond); mysql_mutex_unlock (&arg->lock); //! @note arg is unusable after that. if (err) return NULL; /* lp:808417 - return immediately, don't signal * initializer thread to ensure single thread of * shutdown. */ wsrep_uuid_t ret_uuid = WSREP_UUID_UNDEFINED; wsrep_seqno_t ret_seqno = WSREP_SEQNO_UNDEFINED; // in case of successfull receiver start, wait for SST completion/end char* tmp = my_fgets (out, out_len, proc.pipe()); proc.wait(); err= EINVAL; if (!tmp) { WSREP_ERROR("Failed to read uuid:seqno from joiner script."); if (proc.error()) err = proc.error(); } else { err= sst_scan_uuid_seqno (out, &ret_uuid, &ret_seqno); } if (err) { ret_uuid= WSREP_UUID_UNDEFINED; ret_seqno= -err; } // Tell initializer thread that SST is complete wsrep_sst_complete (&ret_uuid, ret_seqno, true); } return NULL; } static ssize_t sst_prepare_other (const char* method, const char* addr_in, const char** addr_out) { ssize_t cmd_len= 1024; char cmd_str[cmd_len]; const char* sst_dir= mysql_real_data_home; int ret= snprintf (cmd_str, cmd_len, "wsrep_sst_%s " WSREP_SST_OPT_ROLE" 'joiner' " WSREP_SST_OPT_ADDR" '%s' " WSREP_SST_OPT_AUTH" '%s' " WSREP_SST_OPT_DATA" '%s' " WSREP_SST_OPT_CONF" '%s' " WSREP_SST_OPT_PARENT" '%d'", method, addr_in, (sst_auth_real) ? sst_auth_real : "", sst_dir, wsrep_defaults_file, (int)getpid()); if (ret < 0 || ret >= cmd_len) { WSREP_ERROR("sst_prepare_other(): snprintf() failed: %d", ret); return (ret < 0 ? ret : -EMSGSIZE); } pthread_t tmp; sst_thread_arg arg(cmd_str); mysql_mutex_lock (&arg.lock); ret = pthread_create (&tmp, NULL, sst_joiner_thread, &arg); if (ret) { WSREP_ERROR("sst_prepare_other(): pthread_create() failed: %d (%s)", ret, strerror(ret)); return ret; } mysql_cond_wait (&arg.cond, &arg.lock); *addr_out= arg.ret_str; if (!arg.err) ret = strlen(*addr_out); else { assert (arg.err < 0); ret = arg.err; } pthread_detach (tmp); return ret; } //extern ulong my_bind_addr; extern uint mysqld_port; /*! Just tells donor where to send mysqldump */ static ssize_t sst_prepare_mysqldump (const char* addr_in, const char** addr_out) { ssize_t ret = strlen (addr_in); if (!strrchr(addr_in, ':')) { ssize_t s = ret + 7; char* tmp = (char*) malloc (s); if (tmp) { ret= snprintf (tmp, s, "%s:%u", addr_in, mysqld_port); if (ret > 0 && ret < s) { *addr_out= tmp; return ret; } if (ret > 0) /* buffer too short */ ret = -EMSGSIZE; free (tmp); } else { ret= -ENOMEM; } WSREP_ERROR ("Could not prepare state transfer request: " "adding default port failed: %zd.", ret); } else { *addr_out= addr_in; } return ret; } static bool SE_initialized = false; ssize_t wsrep_sst_prepare (void** msg) { const ssize_t ip_max= 256; char ip_buf[ip_max]; const char* addr_in= NULL; const char* addr_out= NULL; if (!strcmp(wsrep_sst_method, WSREP_SST_SKIP)) { ssize_t ret = strlen(WSREP_STATE_TRANSFER_TRIVIAL) + 1; *msg = strdup(WSREP_STATE_TRANSFER_TRIVIAL); if (!msg) { WSREP_ERROR("Could not allocate %zd bytes for state request", ret); unireg_abort(1); } return ret; } // Figure out SST address. Common for all SST methods if (wsrep_sst_receive_address && strcmp (wsrep_sst_receive_address, WSREP_SST_ADDRESS_AUTO)) { addr_in= wsrep_sst_receive_address; } else if (wsrep_node_address && strlen(wsrep_node_address)) { const char* const colon= strchr (wsrep_node_address, ':'); if (colon) { ptrdiff_t const len= colon - wsrep_node_address; strncpy (ip_buf, wsrep_node_address, len); ip_buf[len]= '\0'; addr_in= ip_buf; } else { addr_in= wsrep_node_address; } } else { ssize_t ret= guess_ip (ip_buf, ip_max); if (ret && ret < ip_max) { addr_in= ip_buf; } else { WSREP_ERROR("Could not prepare state transfer request: " "failed to guess address to accept state transfer at. " "wsrep_sst_receive_address must be set manually."); unireg_abort(1); } } ssize_t addr_len= -ENOSYS; if (!strcmp(wsrep_sst_method, WSREP_SST_MYSQLDUMP)) { addr_len= sst_prepare_mysqldump (addr_in, &addr_out); if (addr_len < 0) unireg_abort(1); } else { /*! A heuristic workaround until we learn how to stop and start engines */ if (SE_initialized) { // we already did SST at initializaiton, now engines are running // sql_print_information() is here because the message is too long // for WSREP_INFO. sql_print_information ("WSREP: " "You have configured '%s' state snapshot transfer method " "which cannot be performed on a running server. " "Wsrep provider won't be able to fall back to it " "if other means of state transfer are unavailable. " "In that case you will need to restart the server.", wsrep_sst_method); *msg = 0; return 0; } addr_len = sst_prepare_other (wsrep_sst_method, addr_in, &addr_out); if (addr_len < 0) { WSREP_ERROR("Failed to prepare for '%s' SST. Unrecoverable.", wsrep_sst_method); unireg_abort(1); } } size_t const method_len(strlen(wsrep_sst_method)); size_t const msg_len (method_len + addr_len + 2 /* + auth_len + 1*/); *msg = malloc (msg_len); if (NULL != *msg) { char* const method_ptr(reinterpret_cast(*msg)); strcpy (method_ptr, wsrep_sst_method); char* const addr_ptr(method_ptr + method_len + 1); strcpy (addr_ptr, addr_out); WSREP_INFO ("Prepared SST request: %s|%s", method_ptr, addr_ptr); } else { WSREP_ERROR("Failed to allocate SST request of size %zu. Can't continue.", msg_len); unireg_abort(1); } if (addr_out != addr_in) /* malloc'ed */ free ((char*)addr_out); return msg_len; } // helper method for donors static int sst_run_shell (const char* cmd_str, int max_tries) { int ret = 0; for (int tries=1; tries <= max_tries; tries++) { wsp::process proc (cmd_str, "r"); if (NULL != proc.pipe()) { proc.wait(); } if ((ret = proc.error())) { WSREP_ERROR("Try %d/%d: '%s' failed: %d (%s)", tries, max_tries, proc.cmd(), ret, strerror(ret)); sleep (1); } else { WSREP_DEBUG("SST script successfully completed."); break; } } return -ret; } static void sst_reject_queries(my_bool close_conn) { wsrep_ready_set (FALSE); // this will be resotred when donor becomes synced WSREP_INFO("Rejecting client queries for the duration of SST."); if (TRUE == close_conn) wsrep_close_client_connections(FALSE); } static int sst_mysqldump_check_addr (const char* user, const char* pswd, const char* host, const char* port) { return 0; } static int sst_donate_mysqldump (const char* addr, const wsrep_uuid_t* uuid, const char* uuid_str, wsrep_seqno_t seqno, bool bypass) { size_t host_len; const char* port = strchr (addr, ':'); if (port) { port += 1; host_len = port - addr; } else { port = ""; host_len = strlen (addr) + 1; } char host[host_len]; strncpy (host, addr, host_len - 1); host[host_len - 1] = '\0'; const char* auth = sst_auth_real; const char* pswd = (auth) ? strchr (auth, ':') : NULL; size_t user_len; if (pswd) { pswd += 1; user_len = pswd - auth; } else { pswd = ""; user_len = (auth) ? strlen (auth) + 1 : 1; } char user[user_len]; strncpy (user, (auth) ? auth : "", user_len - 1); user[user_len - 1] = '\0'; int ret = sst_mysqldump_check_addr (user, pswd, host, port); if (!ret) { size_t cmd_len= 1024; char cmd_str[cmd_len]; if (!bypass && wsrep_sst_donor_rejects_queries) sst_reject_queries(TRUE); snprintf (cmd_str, cmd_len, "wsrep_sst_mysqldump " WSREP_SST_OPT_USER" '%s' " WSREP_SST_OPT_PSWD" '%s' " WSREP_SST_OPT_HOST" '%s' " WSREP_SST_OPT_PORT" '%s' " WSREP_SST_OPT_LPORT" '%u' " WSREP_SST_OPT_SOCKET" '%s' " WSREP_SST_OPT_GTID" '%s:%lld'" "%s", user, pswd, host, port, mysqld_port, mysqld_unix_port, uuid_str, (long long)seqno, bypass ? " "WSREP_SST_OPT_BYPASS : ""); WSREP_DEBUG("Running: '%s'", cmd_str); ret= sst_run_shell (cmd_str, 3); } wsrep->sst_sent (wsrep, uuid, ret ? ret : seqno); return ret; } wsrep_seqno_t wsrep_locked_seqno= WSREP_SEQNO_UNDEFINED; static int run_sql_command(THD *thd, const char *query) { thd->set_query((char *)query, strlen(query)); Parser_state ps; if (ps.init(thd, thd->query(), thd->query_length())) { WSREP_ERROR("SST query: %s failed", query); return -1; } mysql_parse(thd, thd->query(), thd->query_length(), &ps); if (thd->is_error()) { int const err= thd->stmt_da->sql_errno(); WSREP_WARN ("error executing '%s': %d (%s)%s", query, err, thd->stmt_da->message(), err == ER_UNKNOWN_SYSTEM_VARIABLE ? ". Was mysqld built with --with-innodb-disallow-writes ?" : ""); thd->clear_error(); return -1; } return 0; } static int sst_flush_tables(THD* thd) { WSREP_INFO("Flushing tables for SST..."); int err; int not_used; if (run_sql_command(thd, "FLUSH TABLES WITH READ LOCK")) { WSREP_ERROR("Failed to flush and lock tables"); err = -1; } else { /* make sure logs are flushed after global read lock acquired */ err= reload_acl_and_cache(thd, REFRESH_ENGINE_LOG, (TABLE_LIST*) 0, ¬_used); } if (err) { WSREP_ERROR("Failed to flush tables: %d (%s)", err, strerror(err)); } else { WSREP_INFO("Tables flushed."); const char base_name[]= "tables_flushed"; ssize_t const full_len= strlen(mysql_real_data_home) + strlen(base_name)+2; char real_name[full_len]; sprintf(real_name, "%s/%s", mysql_real_data_home, base_name); char tmp_name[full_len + 4]; sprintf(tmp_name, "%s.tmp", real_name); FILE* file= fopen(tmp_name, "w+"); if (0 == file) { err= errno; WSREP_ERROR("Failed to open '%s': %d (%s)", tmp_name, err,strerror(err)); } else { fprintf(file, "%s:%lld\n", wsrep_cluster_state_uuid, (long long)wsrep_locked_seqno); fsync(fileno(file)); fclose(file); if (rename(tmp_name, real_name) == -1) { err= errno; WSREP_ERROR("Failed to rename '%s' to '%s': %d (%s)", tmp_name, real_name, err,strerror(err)); } } } return err; } static void sst_disallow_writes (THD* thd, bool yes) { char query_str[64] = { 0, }; ssize_t const query_max = sizeof(query_str) - 1; snprintf (query_str, query_max, "SET GLOBAL innodb_disallow_writes=%d", yes ? 1 : 0); if (run_sql_command(thd, query_str)) { WSREP_ERROR("Failed to disallow InnoDB writes"); } } static void* sst_donor_thread (void* a) { sst_thread_arg* arg= (sst_thread_arg*)a; WSREP_INFO("Running: '%s'", arg->cmd); int err= 1; bool locked= false; const char* out= NULL; const size_t out_len= 128; char out_buf[out_len]; wsrep_uuid_t ret_uuid= WSREP_UUID_UNDEFINED; wsrep_seqno_t ret_seqno= WSREP_SEQNO_UNDEFINED; // seqno of complete SST wsp::thd thd(FALSE); // we turn off wsrep_on for this THD so that it can // operate with wsrep_ready == OFF wsp::process proc(arg->cmd, "r"); err= proc.error(); /* Inform server about SST script startup and release TO isolation */ mysql_mutex_lock (&arg->lock); arg->err = -err; mysql_cond_signal (&arg->cond); mysql_mutex_unlock (&arg->lock); //! @note arg is unusable after that. if (proc.pipe() && !err) { wait_signal: out= my_fgets (out_buf, out_len, proc.pipe()); if (out) { const char magic_flush[]= "flush tables"; const char magic_cont[]= "continue"; const char magic_done[]= "done"; if (!strcasecmp (out, magic_flush)) { err= sst_flush_tables (thd.ptr); if (!err) { sst_disallow_writes (thd.ptr, true); locked= true; goto wait_signal; } } else if (!strcasecmp (out, magic_cont)) { if (locked) { sst_disallow_writes (thd.ptr, false); thd.ptr->global_read_lock.unlock_global_read_lock (thd.ptr); locked= false; } err= 0; goto wait_signal; } else if (!strncasecmp (out, magic_done, strlen(magic_done))) { err= sst_scan_uuid_seqno (out + strlen(magic_done) + 1, &ret_uuid, &ret_seqno); } else { WSREP_WARN("Received unknown signal: '%s'", out); } } else { WSREP_ERROR("Failed to read from: %s", proc.cmd()); } if (err && proc.error()) err= proc.error(); } else { WSREP_ERROR("Failed to execute: %s : %d (%s)", proc.cmd(), err, strerror(err)); } if (locked) // don't forget to unlock server before return { sst_disallow_writes (thd.ptr, false); thd.ptr->global_read_lock.unlock_global_read_lock (thd.ptr); } // signal to donor that SST is over wsrep->sst_sent (wsrep, &ret_uuid, err ? -err : ret_seqno); proc.wait(); return NULL; } static int sst_donate_other (const char* method, const char* addr, const char* uuid, wsrep_seqno_t seqno, bool bypass) { ssize_t cmd_len = 4096; char cmd_str[cmd_len]; int ret= snprintf (cmd_str, cmd_len, "wsrep_sst_%s " WSREP_SST_OPT_ROLE" 'donor' " WSREP_SST_OPT_ADDR" '%s' " WSREP_SST_OPT_AUTH" '%s' " WSREP_SST_OPT_SOCKET" '%s' " WSREP_SST_OPT_DATA" '%s' " WSREP_SST_OPT_CONF" '%s' " WSREP_SST_OPT_GTID" '%s:%lld'" "%s", method, addr, sst_auth_real, mysqld_unix_port, mysql_real_data_home, wsrep_defaults_file, uuid, (long long) seqno, bypass ? " "WSREP_SST_OPT_BYPASS : ""); if (ret < 0 || ret >= cmd_len) { WSREP_ERROR("sst_donate_other(): snprintf() failed: %d", ret); return (ret < 0 ? ret : -EMSGSIZE); } if (!bypass && wsrep_sst_donor_rejects_queries) sst_reject_queries(FALSE); pthread_t tmp; sst_thread_arg arg(cmd_str); mysql_mutex_lock (&arg.lock); ret = pthread_create (&tmp, NULL, sst_donor_thread, &arg); if (ret) { WSREP_ERROR("sst_donate_other(): pthread_create() failed: %d (%s)", ret, strerror(ret)); return -ret; } mysql_cond_wait (&arg.cond, &arg.lock); WSREP_INFO("sst_donor_thread signaled with %d", arg.err); return arg.err; } int wsrep_sst_donate_cb (void* app_ctx, void* recv_ctx, const void* msg, size_t msg_len, const wsrep_uuid_t* current_uuid, wsrep_seqno_t current_seqno, const char* state, size_t state_len, bool bypass) { /* This will be reset when sync callback is called. * Should we set wsrep_ready to FALSE here too? */ // wsrep_notify_status(WSREP_MEMBER_DONOR); local_status.set(WSREP_MEMBER_DONOR); const char* method = (char*)msg; size_t method_len = strlen (method); const char* data = method + method_len + 1; char uuid_str[37]; wsrep_uuid_print (current_uuid, uuid_str, sizeof(uuid_str)); int ret; if (!strcmp (WSREP_SST_MYSQLDUMP, method)) { ret = sst_donate_mysqldump (data, current_uuid, uuid_str, current_seqno, bypass); } else { ret = sst_donate_other (method, data, uuid_str, current_seqno, bypass); } return (ret > 0 ? 0 : ret); } void wsrep_SE_init_grab() { if (mysql_mutex_lock (&LOCK_wsrep_sst_init)) abort(); } void wsrep_SE_init_wait() { mysql_cond_wait (&COND_wsrep_sst_init, &LOCK_wsrep_sst_init); mysql_mutex_unlock (&LOCK_wsrep_sst_init); } void wsrep_SE_init_done() { mysql_cond_signal (&COND_wsrep_sst_init); mysql_mutex_unlock (&LOCK_wsrep_sst_init); } void wsrep_SE_initialized() { SE_initialized = true; }