diff options
Diffstat (limited to 'sql/wsrep_sst.cc')
-rw-r--r-- | sql/wsrep_sst.cc | 595 |
1 files changed, 252 insertions, 343 deletions
diff --git a/sql/wsrep_sst.cc b/sql/wsrep_sst.cc index 0a2424fa069..4e3a7072629 100644 --- a/sql/wsrep_sst.cc +++ b/sql/wsrep_sst.cc @@ -1,4 +1,4 @@ -/* Copyright 2008-2015 Codership Oy <http://www.codership.com> +/* Copyright 2008-2017 Codership Oy <http://www.codership.com> This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -35,16 +35,16 @@ static char wsrep_defaults_file[FN_REFLEN * 2 + 10 + 30 + sizeof(WSREP_SST_OPT_CONF) + sizeof(WSREP_SST_OPT_CONF_SUFFIX) + - sizeof(WSREP_SST_OPT_CONF_EXTRA)] = {0}; + sizeof(WSREP_SST_OPT_CONF_EXTRA)]= {0}; -const char* wsrep_sst_method = WSREP_SST_DEFAULT; -const char* wsrep_sst_receive_address = WSREP_SST_ADDRESS_AUTO; -const char* wsrep_sst_donor = ""; -const char* wsrep_sst_auth = NULL; +const char* wsrep_sst_method = WSREP_SST_DEFAULT; +const char* wsrep_sst_receive_address= WSREP_SST_ADDRESS_AUTO; +const char* wsrep_sst_donor = ""; +const char* wsrep_sst_auth = NULL; // container for real auth string -static const char* sst_auth_real = NULL; -my_bool wsrep_sst_donor_rejects_queries = FALSE; +static const char* sst_auth_real = NULL; +my_bool wsrep_sst_donor_rejects_queries= FALSE; bool wsrep_sst_method_check (sys_var *self, THD* thd, set_var* var) { @@ -65,7 +65,7 @@ bool wsrep_sst_method_update (sys_var *self, THD* thd, enum_var_type type) return 0; } -static const char* data_home_dir = NULL; +static const char* data_home_dir= NULL; void wsrep_set_data_home_dir(const char *data_dir) { @@ -139,7 +139,7 @@ static bool sst_auth_real_set (const char* value) { // set sst_auth_real if (sst_auth_real) { my_free((void *) sst_auth_real); } - sst_auth_real = v; + sst_auth_real= v; // mask wsrep_sst_auth if (strlen(sst_auth_real)) @@ -180,6 +180,7 @@ bool wsrep_sst_donor_update (sys_var *self, THD* thd, enum_var_type type) return 0; } + bool wsrep_before_SE() { return (wsrep_provider != NULL @@ -188,111 +189,29 @@ bool wsrep_before_SE() && strcmp (wsrep_sst_method, WSREP_SST_MYSQLDUMP)); } -static bool sst_complete = false; -static bool sst_needed = false; - -#define WSREP_EXTEND_TIMEOUT_INTERVAL 30 -#define WSREP_TIMEDWAIT_SECONDS 10 - -void wsrep_sst_grab () -{ - WSREP_INFO("wsrep_sst_grab()"); - if (mysql_mutex_lock (&LOCK_wsrep_sst)) abort(); - sst_complete = false; - mysql_mutex_unlock (&LOCK_wsrep_sst); -} - -// Wait for end of SST -bool wsrep_sst_wait () -{ - double total_wtime = 0; - - if (mysql_mutex_lock (&LOCK_wsrep_sst)) - abort(); - - WSREP_INFO("Waiting for SST to complete."); - - while (!sst_complete) - { - struct timespec wtime; - set_timespec(wtime, WSREP_TIMEDWAIT_SECONDS); - time_t start_time = time(NULL); - mysql_cond_timedwait (&COND_wsrep_sst, &LOCK_wsrep_sst, &wtime); - time_t end_time = time(NULL); - - if (!sst_complete) - { - total_wtime += difftime(end_time, start_time); - WSREP_DEBUG("Waiting for SST to complete. current seqno: %" PRId64 " waited %f secs.", local_seqno, total_wtime); - service_manager_extend_timeout(WSREP_EXTEND_TIMEOUT_INTERVAL, - "WSREP state transfer ongoing, current seqno: %ld waited %f secs", local_seqno, total_wtime); - } - } - - if (local_seqno >= 0) - { - WSREP_INFO("SST complete, seqno: %lld", (long long) local_seqno); - } - else - { - WSREP_ERROR("SST failed: %d (%s)", - int(-local_seqno), strerror(-local_seqno)); - } - - mysql_mutex_unlock (&LOCK_wsrep_sst); - - return (local_seqno >= 0); -} - // Signal end of SST -void wsrep_sst_complete (const wsrep_uuid_t* sst_uuid, - wsrep_seqno_t sst_seqno, - bool needed) +static void wsrep_sst_complete (THD* thd, + int const rcode) { - if (mysql_mutex_lock (&LOCK_wsrep_sst)) abort(); - if (!sst_complete) - { - sst_complete = true; - sst_needed = needed; - local_uuid = *sst_uuid; - local_seqno = sst_seqno; - mysql_cond_signal (&COND_wsrep_sst); - } - else - { - /* This can happen when called from wsrep_synced_cb(). - At the moment there is no way to check there - if main thread is still waiting for signal, - so wsrep_sst_complete() is called from there - each time wsrep_ready changes from FALSE -> TRUE. - */ - WSREP_DEBUG("Nobody is waiting for SST."); - } - mysql_mutex_unlock (&LOCK_wsrep_sst); + Wsrep_client_service client_service(thd, thd->wsrep_cs()); + Wsrep_server_state::instance().sst_received(client_service, rcode); } -/* + /* If wsrep provider is loaded, inform that the new state snapshot has been received. Also update the local checkpoint. - @param wsrep [IN] wsrep handle + @param thd [IN] @param uuid [IN] Initial state UUID @param seqno [IN] Initial state sequence number @param state [IN] Always NULL, also ignored by wsrep provider (?) @param state_len [IN] Always 0, also ignored by wsrep provider (?) - @param implicit [IN] Whether invoked implicitly due to SST - (true) or explicitly because if change - in wsrep_start_position by user (false). - @return false Success - true Error - */ -bool wsrep_sst_received (wsrep_t* const wsrep, - const wsrep_uuid_t& uuid, - const wsrep_seqno_t seqno, - const void* const state, - const size_t state_len, - const bool implicit) +void wsrep_sst_received (THD* thd, + const wsrep_uuid_t& uuid, + wsrep_seqno_t const seqno, + const void* const state, + size_t const state_len) { /* To keep track of whether the local uuid:seqno should be updated. Also, note @@ -300,81 +219,40 @@ bool wsrep_sst_received (wsrep_t* const wsrep, OK from wsrep provider. By doing so, the values remain consistent across the server & wsrep provider. */ - bool do_update= false; - - // Get the locally stored uuid:seqno. - if (wsrep_get_SE_checkpoint(local_uuid, local_seqno)) - { - return true; - } - - if (memcmp(&local_uuid, &uuid, sizeof(wsrep_uuid_t)) || - local_seqno < seqno || seqno < 0) - { - do_update= true; - } - else if (local_seqno > seqno) - { - WSREP_WARN("SST position can't be set in past. Requested: %lld, Current: " - " %lld.", (long long)seqno, (long long)local_seqno); /* - If we are here because of SET command, simply return true (error) instead of - aborting. + TODO: Handle backwards compatibility. WSREP API v25 does not have + wsrep schema. */ - if (implicit) - { - WSREP_WARN("Can't continue."); - unireg_abort(1); - } - else - { - return true; + /* + Logical SST methods (mysqldump etc) don't update InnoDB sys header. + Reset the SE checkpoint before recovering view in order to avoid + sanity check failure. + */ + wsrep::gtid const sst_gtid(wsrep::id(uuid.data, sizeof(uuid.data)), + wsrep::seqno(seqno)); + + if (!wsrep_before_SE()) { + wsrep_set_SE_checkpoint(wsrep::gtid::undefined()); + wsrep_set_SE_checkpoint(sst_gtid); } - } + wsrep_verify_SE_checkpoint(uuid, seqno); -#ifdef GTID_SUPPORT - wsrep_init_sidno(uuid); -#endif /* GTID_SUPPORT */ - - if (wsrep) - { - int const rcode(seqno < 0 ? seqno : 0); - wsrep_gtid_t const state_id= {uuid, - (rcode ? WSREP_SEQNO_UNDEFINED : seqno)}; - - wsrep_status_t ret= wsrep->sst_received(wsrep, &state_id, state, - state_len, rcode); - - if (ret != WSREP_OK) - { - return true; + /* + Both wsrep_init_SR() and wsrep_recover_view() may use + wsrep thread pool. Restore original thd context before returning. + */ + if (thd) { + thd->store_globals(); + } + else { + my_pthread_setspecific_ptr(THR_THD, NULL); } - } - // Now is the good time to update the local state and checkpoint. - if (do_update) - { - if (wsrep_set_SE_checkpoint(uuid, seqno)) + if (WSREP_ON) { - return true; + int const rcode(seqno < 0 ? seqno : 0); + wsrep_sst_complete(thd,rcode); } - - local_uuid= uuid; - local_seqno= seqno; - } - - return false; -} - -// Let applier threads to continue -bool wsrep_sst_continue () -{ - if (sst_needed) - { - WSREP_INFO("Signalling provider to continue."); - return wsrep_sst_received (wsrep, local_uuid, local_seqno, NULL, 0, true); - } - return false; } struct sst_thread_arg @@ -404,11 +282,11 @@ struct sst_thread_arg static int sst_scan_uuid_seqno (const char* str, wsrep_uuid_t* uuid, wsrep_seqno_t* seqno) { - int offt = wsrep_uuid_scan (str, strlen(str), uuid); + int offt= wsrep_uuid_scan (str, strlen(str), uuid); errno= 0; /* Reset the errno */ if (offt > 0 && strlen(str) > (unsigned int)offt && ':' == str[offt]) { - *seqno = strtoll (str + offt + 1, NULL, 10); + *seqno= strtoll (str + offt + 1, NULL, 10); if (*seqno != LLONG_MAX || errno != ERANGE) { return 0; @@ -416,7 +294,7 @@ static int sst_scan_uuid_seqno (const char* str, } WSREP_ERROR("Failed to parse uuid:seqno pair: '%s'", str); - return EINVAL; + return -EINVAL; } // get rid of trailing \n @@ -426,8 +304,8 @@ static char* my_fgets (char* buf, size_t buf_len, FILE* stream) if (ret) { - size_t len = strlen(ret); - if (len > 0 && ret[len - 1] == '\n') ret[len - 1] = '\0'; + size_t len= strlen(ret); + if (len > 0 && ret[len - 1] == '\n') ret[len - 1]= '\0'; } return ret; @@ -482,9 +360,10 @@ static void* sst_joiner_thread (void* a) int err= 1; { - const char magic[] = "ready"; - const size_t magic_len = sizeof(magic) - 1; - const size_t out_len = 512; + THD* thd; + const char magic[]= "ready"; + const size_t magic_len= sizeof(magic) - 1; + const size_t out_len= 512; char out[out_len]; WSREP_INFO("Running: '%s'", arg->cmd); @@ -501,29 +380,31 @@ static void* sst_joiner_thread (void* a) WSREP_ERROR("Failed to read '%s <addr>' from: %s\n\tRead: '%s'", magic, arg->cmd, tmp); proc.wait(); - if (proc.error()) err = proc.error(); + if (proc.error()) err= proc.error(); } else { - err = 0; + err= 0; } } else { - err = proc.error(); + err= proc.error(); WSREP_ERROR("Failed to execute: %s : %d (%s)", arg->cmd, err, strerror(err)); } - // signal sst_prepare thread with ret code, - // it will go on sending SST request + /* + signal sst_prepare thread with ret code, + it will go on sending SST request + */ mysql_mutex_lock (&arg->lock); if (!err) { - arg->ret_str = strdup (out + magic_len + 1); - if (!arg->ret_str) err = ENOMEM; + arg->ret_str= strdup (out + magic_len + 1); + if (!arg->ret_str) err= ENOMEM; } - arg->err = -err; + arg->err= -err; mysql_cond_signal (&arg->cond); mysql_mutex_unlock (&arg->lock); //! @note arg is unusable after that. @@ -531,11 +412,11 @@ static void* sst_joiner_thread (void* a) * initializer thread to ensure single thread of * shutdown. */ - wsrep_uuid_t ret_uuid = WSREP_UUID_UNDEFINED; - wsrep_seqno_t ret_seqno = WSREP_SEQNO_UNDEFINED; + wsrep_uuid_t ret_uuid = WSREP_UUID_UNDEFINED; + wsrep_seqno_t ret_seqno= WSREP_SEQNO_UNDEFINED; // in case of successfull receiver start, wait for SST completion/end - char* tmp = my_fgets (out, out_len, proc.pipe()); + char* tmp= my_fgets (out, out_len, proc.pipe()); proc.wait(); err= EINVAL; @@ -544,7 +425,7 @@ static void* sst_joiner_thread (void* a) { WSREP_ERROR("Failed to read uuid:seqno and wsrep_gtid_domain_id from " "joiner script."); - if (proc.error()) err = proc.error(); + if (proc.error()) err= proc.error(); } else { @@ -552,7 +433,14 @@ static void* sst_joiner_thread (void* a) const char *pos= strchr(out, ' '); if (!pos) { - // There is no wsrep_gtid_domain_id (some older version SST script?). + + if (wsrep_gtid_mode) + { + // There is no wsrep_gtid_domain_id (some older version SST script?). + WSREP_WARN("Did not find domain ID from SST script output '%s'. " + "Domain ID must be set manually to keep binlog consistent", + out); + } err= sst_scan_uuid_seqno (out, &ret_uuid, &ret_seqno); } else { @@ -588,14 +476,59 @@ static void* sst_joiner_thread (void* a) err: + wsrep::gtid ret_gtid; + if (err) { - ret_uuid= WSREP_UUID_UNDEFINED; - ret_seqno= -err; + ret_gtid= wsrep::gtid::undefined(); + } + else + { + ret_gtid= wsrep::gtid(wsrep::id(ret_uuid.data, sizeof(ret_uuid.data)), + wsrep::seqno(ret_seqno)); } - // Tell initializer thread that SST is complete - wsrep_sst_complete (&ret_uuid, ret_seqno, true); + /* + Tell initializer thread that SST is complete + For that initialize a THD + */ + if (my_thread_init()) + { + WSREP_ERROR("my_thread_init() failed, can't signal end of SST. " + "Aborting."); + unireg_abort(1); + } + + thd= new THD(next_thread_id()); + + if (!thd) + { + WSREP_ERROR("Failed to allocate THD to restore view from local state, " + "can't signal end of SST. Aborting."); + unireg_abort(1); + } + + thd->thread_stack= (char*) &thd; + thd->security_ctx->skip_grants(); + thd->system_thread= SYSTEM_THREAD_GENERIC; + thd->real_id= pthread_self(); + + thd->store_globals(); + + /* */ + thd->variables.wsrep_on = 0; + /* No binlogging */ + thd->variables.sql_log_bin = 0; + thd->variables.option_bits &= ~OPTION_BIN_LOG; + /* No general log */ + thd->variables.option_bits |= OPTION_LOG_OFF; + /* Read committed isolation to avoid gap locking */ + thd->variables.tx_isolation= ISO_READ_COMMITTED; + + wsrep_sst_complete (thd, -err); + + delete thd; + my_thread_end(); } return NULL; @@ -694,7 +627,7 @@ static ssize_t sst_prepare_other (const char* method, " %s " WSREP_SST_OPT_PARENT " '%d'" " %s '%s'" - " %s '%s'", + " %s '%s'", method, addr_in, mysql_real_data_home, wsrep_defaults_file, (int)getpid(), binlog_opt, binlog_opt_val, @@ -734,7 +667,7 @@ static ssize_t sst_prepare_other (const char* method, pthread_t tmp; sst_thread_arg arg(cmd_str(), env()); mysql_mutex_lock (&arg.lock); - ret = pthread_create (&tmp, NULL, sst_joiner_thread, &arg); + ret= pthread_create (&tmp, NULL, sst_joiner_thread, &arg); if (ret) { WSREP_ERROR("sst_prepare_other(): pthread_create() failed: %d (%s)", @@ -746,11 +679,11 @@ static ssize_t sst_prepare_other (const char* method, *addr_out= arg.ret_str; if (!arg.err) - ret = strlen(*addr_out); + ret= strlen(*addr_out); else { assert (arg.err < 0); - ret = arg.err; + ret= arg.err; } pthread_detach (tmp); @@ -764,12 +697,12 @@ extern uint mysqld_port; static ssize_t sst_prepare_mysqldump (const char* addr_in, const char** addr_out) { - ssize_t ret = strlen (addr_in); + ssize_t ret= strlen (addr_in); if (!strrchr(addr_in, ':')) { - ssize_t s = ret + 7; - char* tmp = (char*) malloc (s); + ssize_t s= ret + 7; + char* tmp= (char*) malloc (s); if (tmp) { @@ -780,7 +713,7 @@ static ssize_t sst_prepare_mysqldump (const char* addr_in, *addr_out= tmp; return ret; } - if (ret > 0) /* buffer too short */ ret = -EMSGSIZE; + if (ret > 0) /* buffer too short */ ret= -EMSGSIZE; free (tmp); } else { @@ -797,32 +730,22 @@ static ssize_t sst_prepare_mysqldump (const char* addr_in, return ret; } -static bool SE_initialized = false; - -ssize_t wsrep_sst_prepare (void** msg) +std::string wsrep_sst_prepare() { + const ssize_t ip_max= 256; + char ip_buf[ip_max]; const char* addr_in= NULL; const char* addr_out= NULL; const char* method; if (!strcmp(wsrep_sst_method, WSREP_SST_SKIP)) { - ssize_t ret = strlen(WSREP_STATE_TRANSFER_TRIVIAL) + 1; - *msg = strdup(WSREP_STATE_TRANSFER_TRIVIAL); - if (!msg) - { - WSREP_ERROR("Could not allocate %zd bytes for state request", ret); - unireg_abort(1); - } - return ret; + return WSREP_STATE_TRANSFER_TRIVIAL; } /* Figure out SST receive address. Common for all SST methods. */ - char ip_buf[256]; - const ssize_t ip_max= sizeof(ip_buf); - // Attempt 1: wsrep_sst_receive_address if (wsrep_sst_receive_address && strcmp (wsrep_sst_receive_address, WSREP_SST_ADDRESS_AUTO)) @@ -839,7 +762,7 @@ ssize_t wsrep_sst_prepare (void** msg) { WSREP_ERROR("Could not parse wsrep_node_address : %s", wsrep_node_address); - unireg_abort(1); + throw wsrep::runtime_error("Failed to prepare for SST. Unrecoverable"); } memcpy(ip_buf, addr.get_address(), addr.get_address_len()); addr_in= ip_buf; @@ -857,7 +780,7 @@ ssize_t wsrep_sst_prepare (void** msg) { WSREP_ERROR("Failed to guess address to accept state transfer. " "wsrep_sst_receive_address must be set manually."); - unireg_abort(1); + throw wsrep::runtime_error("Could not prepare state transfer request"); } } @@ -866,12 +789,16 @@ ssize_t wsrep_sst_prepare (void** msg) if (!strcmp(method, WSREP_SST_MYSQLDUMP)) { addr_len= sst_prepare_mysqldump (addr_in, &addr_out); - if (addr_len < 0) unireg_abort(1); + if (addr_len < 0) + { + throw wsrep::runtime_error("Could not prepare mysqldimp address"); + } } else { /*! A heuristic workaround until we learn how to stop and start engines */ - if (SE_initialized) + if (Wsrep_server_state::instance().is_initialized() && + Wsrep_server_state::instance().state() == Wsrep_server_state::s_joiner) { if (!strcmp(method, WSREP_SST_XTRABACKUP) || !strcmp(method, WSREP_SST_XTRABACKUPV2)) @@ -890,8 +817,7 @@ ssize_t wsrep_sst_prepare (void** msg) "if other means of state transfer are unavailable. " "In that case you will need to restart the server.", method); - *msg = 0; - return 0; + return ""; } addr_len = sst_prepare_other (method, sst_auth_real, @@ -900,37 +826,28 @@ ssize_t wsrep_sst_prepare (void** msg) { WSREP_ERROR("Failed to prepare for '%s' SST. Unrecoverable.", method); - unireg_abort(1); + throw wsrep::runtime_error("Failed to prepare for SST. Unrecoverable"); } } - size_t const method_len(strlen(method)); - size_t const msg_len (method_len + addr_len + 2 /* + auth_len + 1*/); + std::string ret; + ret += method; + ret.push_back('\0'); + ret += addr_out; - *msg = malloc (msg_len); - if (NULL != *msg) { - char* const method_ptr(reinterpret_cast<char*>(*msg)); - strcpy (method_ptr, method); - char* const addr_ptr(method_ptr + method_len + 1); - strcpy (addr_ptr, addr_out); - - WSREP_INFO ("Prepared SST request: %s|%s", method_ptr, addr_ptr); - } - else { - WSREP_ERROR("Failed to allocate SST request of size %zu. Can't continue.", - msg_len); - unireg_abort(1); - } + const char* method_ptr(ret.data()); + const char* addr_ptr(ret.data() + strlen(method_ptr) + 1); + WSREP_INFO ("Prepared SST request: %s|%s", method_ptr, addr_ptr); if (addr_out != addr_in) /* malloc'ed */ free ((char*)addr_out); - return msg_len; + return ret; } // helper method for donors static int sst_run_shell (const char* cmd_str, char** env, int max_tries) { - int ret = 0; + int ret= 0; for (int tries=1; tries <= max_tries; tries++) { @@ -941,7 +858,7 @@ static int sst_run_shell (const char* cmd_str, char** env, int max_tries) proc.wait(); } - if ((ret = proc.error())) + if ((ret= proc.error())) { WSREP_ERROR("Try %d/%d: '%s' failed: %d (%s)", tries, max_tries, proc.cmd(), ret, strerror(ret)); @@ -959,15 +876,12 @@ static int sst_run_shell (const char* cmd_str, char** env, int max_tries) static void sst_reject_queries(my_bool close_conn) { - wsrep_ready_set (FALSE); // this will be resotred when donor becomes synced - WSREP_INFO("Rejecting client queries for the duration of SST."); - if (TRUE == close_conn) wsrep_close_client_connections(FALSE); + WSREP_INFO("Rejecting client queries for the duration of SST."); + if (TRUE == close_conn) wsrep_close_client_connections(FALSE); } static int sst_donate_mysqldump (const char* addr, - const wsrep_uuid_t* uuid, - const char* uuid_str, - wsrep_seqno_t seqno, + const wsrep::gtid& gtid, bool bypass, char** env) // carries auth info { @@ -990,23 +904,31 @@ static int sst_donate_mysqldump (const char* addr, return -ENOMEM; } + /* + we enable new client connections so that mysqldump donation can connect in, + but we reject local connections from modifyingcdata during SST, to keep + data intact + */ if (!bypass && wsrep_sst_donor_rejects_queries) sst_reject_queries(TRUE); make_wsrep_defaults_file(); + std::ostringstream uuid_oss; + uuid_oss << gtid.id(); int ret= snprintf (cmd_str(), cmd_len, "wsrep_sst_mysqldump " WSREP_SST_OPT_ADDR " '%s' " - WSREP_SST_OPT_PORT " '%d' " + WSREP_SST_OPT_PORT " '%u' " WSREP_SST_OPT_LPORT " '%u' " WSREP_SST_OPT_SOCKET " '%s' " " %s " WSREP_SST_OPT_GTID " '%s:%lld' " WSREP_SST_OPT_GTID_DOMAIN_ID " '%d'" "%s", - addr, port, mysqld_port, mysqld_unix_port, - wsrep_defaults_file, uuid_str, - (long long)seqno, wsrep_gtid_domain_id, + addr, port, mysqld_port, mysqld_unix_port, + wsrep_defaults_file, + uuid_oss.str().c_str(), gtid.seqno().get(), + wsrep_gtid_domain_id, bypass ? " " WSREP_SST_OPT_BYPASS : ""); if (ret < 0 || ret >= cmd_len) @@ -1019,16 +941,17 @@ static int sst_donate_mysqldump (const char* addr, ret= sst_run_shell (cmd_str(), env, 3); - wsrep_gtid_t const state_id = { *uuid, (ret ? WSREP_SEQNO_UNDEFINED : seqno)}; - - wsrep->sst_sent (wsrep, &state_id, ret); + wsrep::gtid sst_sent_gtid(ret == 0 ? + gtid : + wsrep::gtid(gtid.id(), + wsrep::seqno::undefined())); + Wsrep_server_state::instance().sst_sent(sst_sent_gtid, ret); return ret; } wsrep_seqno_t wsrep_locked_seqno= WSREP_SEQNO_UNDEFINED; - /* Create a file under data directory. */ @@ -1077,7 +1000,6 @@ static int sst_create_file(const char *name, const char *content) return err; } - static int run_sql_command(THD *thd, const char *query) { thd->set_query((char *)query, strlen(query)); @@ -1123,9 +1045,9 @@ static int sst_flush_tables(THD* thd) { /* Do not use non-supported parser character sets */ WSREP_WARN("Current client character set is non-supported parser character set: %s", current_charset->csname); - thd->variables.character_set_client = &my_charset_latin1; + thd->variables.character_set_client= &my_charset_latin1; WSREP_WARN("For SST temporally setting character set to : %s", - my_charset_latin1.csname); + my_charset_latin1.csname); } if (run_sql_command(thd, "FLUSH TABLES WITH READ LOCK")) @@ -1146,7 +1068,7 @@ static int sst_flush_tables(THD* thd) } } - thd->variables.character_set_client = current_charset; + thd->variables.character_set_client= current_charset; if (err) { @@ -1164,7 +1086,6 @@ static int sst_flush_tables(THD* thd) else { WSREP_INFO("Tables flushed."); - /* Tables have been flushed. Create a file with cluster state ID and wsrep_gtid_domain_id. @@ -1173,6 +1094,41 @@ static int sst_flush_tables(THD* thd) snprintf(content, sizeof(content), "%s:%lld %d\n", wsrep_cluster_state_uuid, (long long)wsrep_locked_seqno, wsrep_gtid_domain_id); err= sst_create_file(flush_success, content); + + const char base_name[]= "tables_flushed"; + ssize_t const full_len= strlen(mysql_real_data_home) + strlen(base_name)+2; + char *real_name= (char*) malloc(full_len); + sprintf(real_name, "%s/%s", mysql_real_data_home, base_name); + char *tmp_name= (char*) malloc(full_len + 4); + sprintf(tmp_name, "%s.tmp", real_name); + + FILE* file= fopen(tmp_name, "w+"); + if (0 == file) + { + err= errno; + WSREP_ERROR("Failed to open '%s': %d (%s)", tmp_name, err,strerror(err)); + } + else + { + Wsrep_server_state& server_state= Wsrep_server_state::instance(); + std::ostringstream uuid_oss; + + uuid_oss << server_state.current_view().state_id().id(); + + fprintf(file, "%s:%lld %u\n", + uuid_oss.str().c_str(), server_state.pause_seqno().get(), + wsrep_gtid_domain_id); + fsync(fileno(file)); + fclose(file); + if (rename(tmp_name, real_name) == -1) + { + err= errno; + WSREP_ERROR("Failed to rename '%s' to '%s': %d (%s)", + tmp_name, real_name, err,strerror(err)); + } + } + free(real_name); + free(tmp_name); } return err; @@ -1181,19 +1137,19 @@ static int sst_flush_tables(THD* thd) static void sst_disallow_writes (THD* thd, bool yes) { - char query_str[64] = { 0, }; - ssize_t const query_max = sizeof(query_str) - 1; + char query_str[64]= { 0, }; + ssize_t const query_max= sizeof(query_str) - 1; CHARSET_INFO *current_charset; - current_charset = thd->variables.character_set_client; + current_charset= thd->variables.character_set_client; if (!is_supported_parser_charset(current_charset)) { /* Do not use non-supported parser character sets */ WSREP_WARN("Current client character set is non-supported parser character set: %s", current_charset->csname); - thd->variables.character_set_client = &my_charset_latin1; + thd->variables.character_set_client= &my_charset_latin1; WSREP_WARN("For SST temporally setting character set to : %s", - my_charset_latin1.csname); + my_charset_latin1.csname); } snprintf (query_str, query_max, "SET GLOBAL innodb_disallow_writes=%d", @@ -1203,7 +1159,7 @@ static void sst_disallow_writes (THD* thd, bool yes) { WSREP_ERROR("Failed to disallow InnoDB writes"); } - thd->variables.character_set_client = current_charset; + thd->variables.character_set_client= current_charset; } static void* sst_donor_thread (void* a) @@ -1226,11 +1182,11 @@ static void* sst_donor_thread (void* a) // operate with wsrep_ready == OFF wsp::process proc(arg->cmd, "r", arg->env); - err= proc.error(); + err= -proc.error(); /* Inform server about SST script startup and release TO isolation */ mysql_mutex_lock (&arg->lock); - arg->err = -err; + arg->err= -err; mysql_cond_signal (&arg->cond); mysql_mutex_unlock (&arg->lock); //! @note arg is unusable after that. @@ -1289,6 +1245,7 @@ wait_signal: else { WSREP_WARN("Received unknown signal: '%s'", out); + proc.wait(); } } else @@ -1296,7 +1253,7 @@ wait_signal: WSREP_ERROR("Failed to read from: %s", proc.cmd()); proc.wait(); } - if (!err && proc.error()) err= proc.error(); + if (!err && proc.error()) err= -proc.error(); } else { @@ -1315,24 +1272,20 @@ wait_signal: thd.ptr->global_read_lock.unlock_global_read_lock(thd.ptr); } - // signal to donor that SST is over - struct wsrep_gtid const state_id = { - ret_uuid, err ? WSREP_SEQNO_UNDEFINED : ret_seqno - }; - wsrep->sst_sent (wsrep, &state_id, -err); + wsrep::gtid gtid(wsrep::id(ret_uuid.data, sizeof(ret_uuid.data)), + wsrep::seqno(err ? wsrep::seqno::undefined() : + wsrep::seqno(ret_seqno))); + Wsrep_server_state::instance().sst_sent(gtid, err); proc.wait(); return NULL; } - - -static int sst_donate_other (const char* method, - const char* addr, - const char* uuid, - wsrep_seqno_t seqno, - bool bypass, - char** env) // carries auth info +static int sst_donate_other (const char* method, + const char* addr, + const wsrep::gtid& gtid, + bool bypass, + char** env) // carries auth info { int const cmd_len= 4096; wsp::string cmd_str(cmd_len); @@ -1357,6 +1310,8 @@ static int sst_donate_other (const char* method, make_wsrep_defaults_file(); + std::ostringstream uuid_oss; + uuid_oss << gtid.id(); ret= snprintf (cmd_str(), cmd_len, "wsrep_sst_%s " WSREP_SST_OPT_ROLE " 'donor' " @@ -1371,7 +1326,7 @@ static int sst_donate_other (const char* method, method, addr, mysqld_unix_port, mysql_real_data_home, wsrep_defaults_file, binlog_opt, binlog_opt_val, - uuid, (long long) seqno, wsrep_gtid_domain_id, + uuid_oss.str().c_str(), gtid.seqno().get(), wsrep_gtid_domain_id, bypass ? " " WSREP_SST_OPT_BYPASS : ""); my_free(binlog_opt_val); @@ -1386,7 +1341,7 @@ static int sst_donate_other (const char* method, pthread_t tmp; sst_thread_arg arg(cmd_str(), env); mysql_mutex_lock (&arg.lock); - ret = pthread_create (&tmp, NULL, sst_donor_thread, &arg); + ret= pthread_create (&tmp, NULL, sst_donor_thread, &arg); if (ret) { WSREP_ERROR("sst_donate_other(): pthread_create() failed: %d (%s)", @@ -1399,23 +1354,18 @@ static int sst_donate_other (const char* method, return arg.err; } -wsrep_cb_status_t wsrep_sst_donate_cb (void* app_ctx, void* recv_ctx, - const void* msg, size_t msg_len, - const wsrep_gtid_t* current_gtid, - const char* state, size_t state_len, - bool bypass) +int wsrep_sst_donate(const std::string& msg, + const wsrep::gtid& current_gtid, + const bool bypass) { /* This will be reset when sync callback is called. * Should we set wsrep_ready to FALSE here too? */ - wsrep_config_state->set(WSREP_MEMBER_DONOR); + wsrep_config_state->set(wsrep::server_state::s_donor); - const char* method = (char*)msg; - size_t method_len = strlen (method); - const char* data = method + method_len + 1; - - char uuid_str[37]; - wsrep_uuid_print (¤t_gtid->uuid, uuid_str, sizeof(uuid_str)); + const char* method= msg.data(); + size_t method_len= strlen (method); + const char* data= method + method_len + 1; wsp::env env(NULL); if (env.error()) @@ -1443,54 +1393,13 @@ wsrep_cb_status_t wsrep_sst_donate_cb (void* app_ctx, void* recv_ctx, if (!strcmp (WSREP_SST_MYSQLDUMP, method)) { - ret = sst_donate_mysqldump(data, ¤t_gtid->uuid, uuid_str, - current_gtid->seqno, bypass, env()); + ret= sst_donate_mysqldump(data, current_gtid, bypass, env()); } else { - ret = sst_donate_other(method, data, uuid_str, - current_gtid->seqno, bypass, env()); + ret= sst_donate_other(method, data, current_gtid, bypass, env()); } - return (ret >= 0 ? WSREP_CB_SUCCESS : WSREP_CB_FAILURE); -} - -void wsrep_SE_init_grab() -{ - if (mysql_mutex_lock (&LOCK_wsrep_sst_init)) abort(); -} - -void wsrep_SE_init_wait() -{ - double total_wtime=0; - - while (SE_initialized == false) - { - struct timespec wtime; - set_timespec(wtime, WSREP_TIMEDWAIT_SECONDS); - time_t start_time = time(NULL); - mysql_cond_timedwait (&COND_wsrep_sst_init, &LOCK_wsrep_sst_init, &wtime); - time_t end_time = time(NULL); - - if (!SE_initialized) - { - total_wtime += difftime(end_time, start_time); - WSREP_DEBUG("Waiting for SST to complete. current seqno: %" PRId64 " waited %f secs.", local_seqno, total_wtime); - service_manager_extend_timeout(WSREP_EXTEND_TIMEOUT_INTERVAL, - "WSREP state transfer ongoing, current seqno: %ld waited %f secs", local_seqno, total_wtime); - } - } - - mysql_mutex_unlock (&LOCK_wsrep_sst_init); -} - -void wsrep_SE_init_done() -{ - mysql_cond_signal (&COND_wsrep_sst_init); - mysql_mutex_unlock (&LOCK_wsrep_sst_init); -} - -void wsrep_SE_initialized() -{ - SE_initialized = true; + return (ret >= 0 ? 0 : 1); } + |