diff options
Diffstat (limited to 'sql')
-rw-r--r-- | sql/wsrep_check_opts.cc | 392 | ||||
-rw-r--r-- | sql/wsrep_hton.cc | 449 | ||||
-rw-r--r-- | sql/wsrep_mysqld.cc | 1311 | ||||
-rw-r--r-- | sql/wsrep_mysqld.h | 383 | ||||
-rw-r--r-- | sql/wsrep_notify.cc | 107 | ||||
-rw-r--r-- | sql/wsrep_priv.h | 233 | ||||
-rw-r--r-- | sql/wsrep_sst.cc | 1001 | ||||
-rw-r--r-- | sql/wsrep_utils.cc | 468 | ||||
-rw-r--r-- | sql/wsrep_var.cc | 548 |
9 files changed, 0 insertions, 4892 deletions
diff --git a/sql/wsrep_check_opts.cc b/sql/wsrep_check_opts.cc deleted file mode 100644 index 5764be39093..00000000000 --- a/sql/wsrep_check_opts.cc +++ /dev/null @@ -1,392 +0,0 @@ -/* Copyright 2011 Codership Oy <http://www.codership.com> - - This program is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; version 2 of the License. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ - -//#include <mysqld.h> -#include <sql_class.h> -//#include <sql_plugin.h> -//#include <set_var.h> - -#include "wsrep_mysqld.h" - -#include <stdlib.h> -#include <string.h> -#include <errno.h> -#include <ctype.h> - -/* This file is about checking for correctness of mysql configuration options */ - -struct opt -{ - const char* const name; - const char* value; -}; - -/* A list of options to check. - * At first we assume default values and then see if they are changed on CLI or - * in my.cnf */ -static struct opt opts[] = -{ - { "wsrep_slave_threads", "1" }, // mysqld.cc - { "bind_address", "0.0.0.0" }, // mysqld.cc - { "wsrep_sst_method","mysqldump" }, // mysqld.cc - { "wsrep_sst_receive_address","AUTO"}, // mysqld.cc - { "binlog_format", "ROW" }, // mysqld.cc - { "wsrep_provider", "none" }, // mysqld.cc - { "query_cache_type", "0" }, // mysqld.cc - { "query_cache_size", "0" }, // mysqld.cc - { "locked_in_memory", "0" }, // mysqld.cc - { "wsrep_cluster_address", "0" }, // mysqld.cc - { "locks_unsafe_for_binlog", "0" }, // ha_innodb.cc - { "autoinc_lock_mode", "1" }, // ha_innodb.cc - { 0, 0 } -}; - -enum -{ - WSREP_SLAVE_THREADS, - BIND_ADDRESS, - WSREP_SST_METHOD, - WSREP_SST_RECEIVE_ADDRESS, - BINLOG_FORMAT, - WSREP_PROVIDER, - QUERY_CACHE_TYPE, - QUERY_CACHE_SIZE, - LOCKED_IN_MEMORY, - WSREP_CLUSTER_ADDRESS, - LOCKS_UNSAFE_FOR_BINLOG, - AUTOINC_LOCK_MODE -}; - - -/* A class to make a copy of argv[] vector */ -struct argv_copy -{ - int const argc_; - char** argv_; - - argv_copy (int const argc, const char* const argv[]) : - argc_ (argc), - argv_ (reinterpret_cast<char**>(calloc(argc_, sizeof(char*)))) - { - if (argv_) - { - for (int i = 0; i < argc_; ++i) - { - argv_[i] = strdup(argv[i]); - - if (!argv_[i]) - { - argv_free (); // free whatever bee allocated - return; - } - } - } - } - - ~argv_copy () { argv_free (); } - -private: - argv_copy (const argv_copy&); - argv_copy& operator= (const argv_copy&); - - void argv_free() - { - if (argv_) - { - for (int i = 0; (i < argc_) && argv_[i] ; ++i) free (argv_[i]); - free (argv_); - argv_ = 0; - } - } -}; - -/* a short corresponding to '--' byte sequence */ -static short const long_opt_prefix ('-' + ('-' << 8)); - -/* Normalizes long options to have '_' instead of '-' */ -static int -normalize_opts (argv_copy& a) -{ - if (a.argv_) - { - for (int i = 0; i < a.argc_; ++i) - { - char* ptr = a.argv_[i]; - if (long_opt_prefix == *(short*)ptr) // long option - { - ptr += 2; - const char* end = strchr(ptr, '='); - - if (!end) end = ptr + strlen(ptr); - - for (; ptr != end; ++ptr) if ('-' == *ptr) *ptr = '_'; - } - } - - return 0; - } - - return EINVAL; -} - -/* Find required options in the argument list and change their values */ -static int -find_opts (argv_copy& a, struct opt* const opts) -{ - for (int i = 0; i < a.argc_; ++i) - { - char* ptr = a.argv_[i] + 2; // we're interested only in long options - - struct opt* opt = opts; - for (; 0 != opt->name; ++opt) - { - if (!strstr(ptr, opt->name)) continue; // try next option - - /* 1. try to find value after the '=' */ - opt->value = strchr(ptr, '=') + 1; - - /* 2. if no '=', try next element in the argument vector */ - if (reinterpret_cast<void*>(1) == opt->value) - { - /* also check that the next element is not an option itself */ - if (i + 1 < a.argc_ && *(a.argv_[i + 1]) != '-') - { - ++i; - opt->value = a.argv_[i]; - } - else opt->value = ""; // no value supplied (like boolean opt) - } - - break; // option found, break inner loop - } - } - - return 0; -} - -/* Parses string for an integer. Returns 0 on success. */ -int get_long_long (const struct opt& opt, long long* const val, int const base) -{ - const char* const str = opt.value; - - if ('\0' != *str) - { - char* endptr; - - *val = strtoll (str, &endptr, base); - - if ('k' == *endptr || 'K' == *endptr) - { - *val *= 1024L; - endptr++; - } - else if ('m' == *endptr || 'M' == *endptr) - { - *val *= 1024L * 1024L; - endptr++; - } - else if ('g' == *endptr || 'G' == *endptr) - { - *val *= 1024L * 1024L * 1024L; - endptr++; - } - - if ('\0' == *endptr) return 0; // the whole string was a valid integer - } - - WSREP_ERROR ("Bad value for *%s: '%s'. Should be integer.", - opt.name, opt.value); - - return EINVAL; -} - -/* This is flimzy coz hell knows how mysql interprets boolean strings... - * and, no, I'm not going to become versed in how mysql handles options - - * I'd rather sing. - - Aha, http://dev.mysql.com/doc/refman/5.1/en/dynamic-system-variables.html: - Variables that have a type of “boolean” can be set to 0, 1, ON or OFF. (If you - set them on the command line or in an option file, use the numeric values.) - - So it is '0' for FALSE, '1' or empty string for TRUE - - */ -int get_bool (const struct opt& opt, bool* const val) -{ - const char* str = opt.value; - - while (isspace(*str)) ++str; // skip initial whitespaces - - ssize_t str_len = strlen(str); - switch (str_len) - { - case 0: - *val = true; - return 0; - case 1: - if ('0' == *str || '1' == *str) - { - *val = ('1' == *str); - return 0; - } - } - - WSREP_ERROR ("Bad value for *%s: '%s'. Should be '0', '1' or empty string.", - opt.name, opt.value); - - return EINVAL; -} - -static int -check_opts (int const argc, const char* const argv[], struct opt opts[]) -{ - /* First, make a copy of argv to be able to manipulate it */ - argv_copy a(argc, argv); - - if (!a.argv_) - { - WSREP_ERROR ("Could not copy argv vector: not enough memory."); - return ENOMEM; - } - - int err = normalize_opts (a); - if (err) - { - WSREP_ERROR ("Failed to normalize options."); - return err; - } - - err = find_opts (a, opts); - if (err) - { - WSREP_ERROR ("Failed to parse options."); - return err; - } - - /* At this point we have updated default values in our option list to - what has been specified on the command line / my.cnf */ - - long long slave_threads; - err = get_long_long (opts[WSREP_SLAVE_THREADS], &slave_threads, 10); - if (err) return err; - - int rcode = 0; - - if (slave_threads > 1) - /* Need to check AUTOINC_LOCK_MODE and LOCKS_UNSAFE_FOR_BINLOG */ - { - long long autoinc_lock_mode; - err = get_long_long (opts[AUTOINC_LOCK_MODE], &autoinc_lock_mode, 10); - if (err) return err; - - bool locks_unsafe_for_binlog; - err = get_bool (opts[LOCKS_UNSAFE_FOR_BINLOG],&locks_unsafe_for_binlog); - if (err) return err; - - if (autoinc_lock_mode != 2) - { - WSREP_ERROR ("Parallel applying (wsrep_slave_threads > 1) requires" - " innodb_autoinc_lock_mode = 2."); - rcode = EINVAL; - } - } - - long long query_cache_size, query_cache_type; - if ((err = get_long_long (opts[QUERY_CACHE_SIZE], &query_cache_size, 10))) - return err; - if ((err = get_long_long (opts[QUERY_CACHE_TYPE], &query_cache_type, 10))) - return err; - - if (0 != query_cache_size && 0 != query_cache_type) - { - WSREP_ERROR ("Query cache is not supported (size=%lld type=%lld)", - query_cache_size, query_cache_type); - rcode = EINVAL; - } - - bool locked_in_memory; - err = get_bool (opts[LOCKED_IN_MEMORY], &locked_in_memory); - if (err) { WSREP_ERROR("get_bool error: %s", strerror(err)); return err; } - if (locked_in_memory) - { - WSREP_ERROR ("Memory locking is not supported (locked_in_memory=%s)", - locked_in_memory ? "ON" : "OFF"); - rcode = EINVAL; - } - - if (!strcasecmp(opts[WSREP_SST_METHOD].value,"mysqldump")) - { - if (!strcasecmp(opts[BIND_ADDRESS].value, "127.0.0.1") || - !strcasecmp(opts[BIND_ADDRESS].value, "localhost")) - { - WSREP_ERROR ("wsrep_sst_method is set to 'mysqldump' yet " - "mysqld bind_address is set to '%s', which makes it " - "impossible to receive state transfer from another " - "node, since mysqld won't accept such connections. " - "If you wish to use mysqldump state transfer method, " - "set bind_address to allow mysql client connections " - "from other cluster members (e.g. 0.0.0.0).", - opts[BIND_ADDRESS].value); - rcode = EINVAL; - } - } - else - { - // non-mysqldump SST requires wsrep_cluster_address on startup - if (strlen(opts[WSREP_CLUSTER_ADDRESS].value) == 0) - { - WSREP_ERROR ("%s SST method requires wsrep_cluster_address to be " - "configured on startup.",opts[WSREP_SST_METHOD].value); - rcode = EINVAL; - } - } - - if (strcasecmp(opts[WSREP_SST_RECEIVE_ADDRESS].value, "AUTO")) - { - if (!strncasecmp(opts[WSREP_SST_RECEIVE_ADDRESS].value, - "127.0.0.1", strlen("127.0.0.1")) || - !strncasecmp(opts[WSREP_SST_RECEIVE_ADDRESS].value, - "localhost", strlen("localhost"))) - { - WSREP_WARN ("wsrep_sst_receive_address is set to '%s' which " - "makes it impossible for another host to reach this " - "one. Please set it to the address which this node " - "can be connected at by other cluster members.", - opts[WSREP_SST_RECEIVE_ADDRESS].value); -// rcode = EINVAL; - } - } - - if (strcasecmp(opts[WSREP_PROVIDER].value, "none")) - { - if (strcasecmp(opts[BINLOG_FORMAT].value, "ROW")) - { - WSREP_ERROR ("Only binlog_format = 'ROW' is currently supported. " - "Configured value: '%s'. Please adjust your " - "configuration.", opts[BINLOG_FORMAT].value); - - rcode = EINVAL; - } - } - - return rcode; -} - -int -wsrep_check_opts (int const argc, char* const* const argv) -{ - return check_opts (argc, argv, opts); -} - diff --git a/sql/wsrep_hton.cc b/sql/wsrep_hton.cc deleted file mode 100644 index 1fc2372a57c..00000000000 --- a/sql/wsrep_hton.cc +++ /dev/null @@ -1,449 +0,0 @@ -/* Copyright 2008 Codership Oy <http://www.codership.com> - - This program is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; version 2 of the License. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ - -#include <mysqld.h> -#include "sql_base.h" -#include "rpl_filter.h" -#include <sql_class.h> -#include "wsrep_mysqld.h" -#include "wsrep_priv.h" -#include <cstdio> -#include <cstdlib> - -extern handlerton *binlog_hton; -extern int binlog_close_connection(handlerton *hton, THD *thd); -extern ulonglong thd_to_trx_id(THD *thd); - -extern "C" int thd_binlog_format(const MYSQL_THD thd); -// todo: share interface with ha_innodb.c - -enum wsrep_trx_status wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all); - -/* - a post-commit cleanup on behalf of wsrep. Can't be a part of hton struct. - Is called by THD::transactions.cleanup() -*/ -void wsrep_cleanup_transaction(THD *thd) -{ - if (thd->thread_id == 0) return; - if (thd->wsrep_exec_mode == LOCAL_COMMIT) - { - if (thd->variables.wsrep_on && - thd->wsrep_conflict_state != MUST_REPLAY) - { - if (thd->wsrep_seqno_changed) - { - if (wsrep->post_commit(wsrep, &thd->wsrep_trx_handle)) - { - DBUG_PRINT("wsrep", ("set committed fail")); - WSREP_WARN("set committed fail: %llu %d", - (long long)thd->real_id, thd->stmt_da->status()); - } - } - //else - //WSREP_DEBUG("no trx handle for %s", thd->query()); - thd_binlog_trx_reset(thd); - thd->wsrep_seqno_changed = false; - } - thd->wsrep_exec_mode= LOCAL_STATE; - } -} - -/* - wsrep hton -*/ -handlerton *wsrep_hton; - -void wsrep_register_hton(THD* thd, bool all) -{ - THD_TRANS *trans=all ? &thd->transaction.all : &thd->transaction.stmt; - for (Ha_trx_info *i= trans->ha_list; WSREP(thd) && i; i = i->next()) - { - if (i->ht()->db_type == DB_TYPE_INNODB) - { - trans_register_ha(thd, all, wsrep_hton); - thd->ha_data[wsrep_hton->slot].ha_info[all].set_trx_read_write(); - break; - } - } -} - -/* - wsrep exploits binlog's caches even if binlogging itself is not - activated. In such case connection close needs calling - actual binlog's method. - Todo: split binlog hton from its caches to use ones by wsrep - without referring to binlog's stuff. -*/ -static int -wsrep_close_connection(handlerton* hton, THD* thd) -{ - DBUG_ENTER("wsrep_close_connection"); - if (thd_get_ha_data(thd, binlog_hton) != NULL) - binlog_hton->close_connection (binlog_hton, thd); - DBUG_RETURN(0); -} - -/* - prepare/wsrep_run_wsrep_commit can fail in two ways - - certification test or an equivalent. As a result, - the current transaction just rolls back - Error codes: - WSREP_TRX_ROLLBACK, WSREP_TRX_ERROR - - a post-certification failure makes this server unable to - commit its own WS and therefore the server must abort -*/ -static int wsrep_prepare(handlerton *hton, THD *thd, bool all) -{ -#ifndef DBUG_OFF - //wsrep_seqno_t old = thd->wsrep_trx_seqno; -#endif - DBUG_ENTER("wsrep_prepare"); - if ((all || - !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) && - (thd->variables.wsrep_on && !wsrep_trans_cache_is_empty(thd))) - { - switch (wsrep_run_wsrep_commit(thd, hton, all)) - { - case WSREP_TRX_OK: - // DBUG_ASSERT(thd->wsrep_trx_seqno > old || - // thd->wsrep_exec_mode == REPL_RECV || - // thd->wsrep_exec_mode == TOTAL_ORDER); - break; - case WSREP_TRX_ROLLBACK: - case WSREP_TRX_ERROR: - DBUG_RETURN(1); - } - } - DBUG_RETURN(0); -} - -static int wsrep_savepoint_set(handlerton *hton, THD *thd, void *sv) -{ - if (!wsrep_emulate_bin_log) return 0; - int rcode = binlog_hton->savepoint_set(binlog_hton, thd, sv); - return rcode; -} -static int wsrep_savepoint_rollback(handlerton *hton, THD *thd, void *sv) -{ - if (!wsrep_emulate_bin_log) return 0; - int rcode = binlog_hton->savepoint_rollback(binlog_hton, thd, sv); - return rcode; -} - -static int wsrep_rollback(handlerton *hton, THD *thd, bool all) -{ - DBUG_ENTER("wsrep_rollback"); - mysql_mutex_lock(&thd->LOCK_wsrep_thd); - if ((all || !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) && - (thd->variables.wsrep_on && thd->wsrep_conflict_state != MUST_REPLAY)) - { - if (wsrep->post_rollback(wsrep, &thd->wsrep_trx_handle)) - { - DBUG_PRINT("wsrep", ("setting rollback fail")); - WSREP_ERROR("settting rollback fail: thd: %llu SQL: %s", - (long long)thd->real_id, thd->query()); - } - } - - int rcode = 0; - if (!wsrep_emulate_bin_log) - { - if (all) thd_binlog_trx_reset(thd); - } - - mysql_mutex_unlock(&thd->LOCK_wsrep_thd); - DBUG_RETURN(rcode); -} - -int wsrep_commit(handlerton *hton, THD *thd, bool all) -{ - DBUG_ENTER("wsrep_commit"); - - DBUG_RETURN(0); -} - -extern Rpl_filter* binlog_filter; -extern my_bool opt_log_slave_updates; -enum wsrep_trx_status -wsrep_run_wsrep_commit( - THD *thd, handlerton *hton, bool all) -{ - int rcode = -1; - uint data_len = 0; - uchar *rbr_data = NULL; - IO_CACHE *cache; - int replay_round= 0; - - if (thd->stmt_da->is_error()) { - WSREP_ERROR("commit issue, error: %d %s", - thd->stmt_da->sql_errno(), thd->stmt_da->message()); - } - - DBUG_ENTER("wsrep_run_wsrep_commit"); - if (thd->slave_thread && !opt_log_slave_updates) { - DBUG_RETURN(WSREP_TRX_OK); - } - if (thd->wsrep_exec_mode == REPL_RECV) { - - mysql_mutex_lock(&thd->LOCK_wsrep_thd); - if (thd->wsrep_conflict_state == MUST_ABORT) { - if (wsrep_debug) - WSREP_INFO("WSREP: must abort for BF"); - DBUG_PRINT("wsrep", ("BF apply commit fail")); - thd->wsrep_conflict_state = NO_CONFLICT; - mysql_mutex_unlock(&thd->LOCK_wsrep_thd); - // - // TODO: test all calls of the rollback. - // rollback must happen automagically innobase_rollback(hton, thd, 1); - // - DBUG_RETURN(WSREP_TRX_ERROR); - } - mysql_mutex_unlock(&thd->LOCK_wsrep_thd); - } - if (thd->wsrep_exec_mode != LOCAL_STATE) { - DBUG_RETURN(WSREP_TRX_OK); - } - if (thd->wsrep_consistency_check == CONSISTENCY_CHECK_RUNNING) { - WSREP_DEBUG("commit for consistency check: %s", thd->query()); - DBUG_RETURN(WSREP_TRX_OK); - } - - DBUG_PRINT("wsrep", ("replicating commit")); - - mysql_mutex_lock(&thd->LOCK_wsrep_thd); - if (thd->wsrep_conflict_state == MUST_ABORT) { - DBUG_PRINT("wsrep", ("replicate commit fail")); - thd->wsrep_conflict_state = ABORTED; - mysql_mutex_unlock(&thd->LOCK_wsrep_thd); - if (wsrep_debug) { - WSREP_INFO("innobase_commit, abort %s", - (thd->query()) ? thd->query() : "void"); - } - DBUG_RETURN(WSREP_TRX_ROLLBACK); - } - - mysql_mutex_lock(&LOCK_wsrep_replaying); - - while (wsrep_replaying > 0 && - thd->wsrep_conflict_state == NO_CONFLICT && - thd->killed == NOT_KILLED && - !shutdown_in_progress) - { - - mysql_mutex_unlock(&LOCK_wsrep_replaying); - mysql_mutex_unlock(&thd->LOCK_wsrep_thd); - - mysql_mutex_lock(&thd->mysys_var->mutex); - thd_proc_info(thd, "wsrep waiting on replaying"); - thd->mysys_var->current_mutex= &LOCK_wsrep_replaying; - thd->mysys_var->current_cond= &COND_wsrep_replaying; - mysql_mutex_unlock(&thd->mysys_var->mutex); - - mysql_mutex_lock(&LOCK_wsrep_replaying); - // Using timedwait is a hack to avoid deadlock in case if BF victim - // misses the signal. - struct timespec wtime = {0, 1000000}; - mysql_cond_timedwait(&COND_wsrep_replaying, &LOCK_wsrep_replaying, - &wtime); - if (replay_round++ % 100000 == 0) - WSREP_DEBUG("commit waiting for replaying: replayers %d, thd: (%lu) conflict: %d (round: %d)", - wsrep_replaying, thd->thread_id, thd->wsrep_conflict_state, replay_round); - - mysql_mutex_unlock(&LOCK_wsrep_replaying); - - mysql_mutex_lock(&thd->mysys_var->mutex); - thd->mysys_var->current_mutex= 0; - thd->mysys_var->current_cond= 0; - mysql_mutex_unlock(&thd->mysys_var->mutex); - - mysql_mutex_lock(&thd->LOCK_wsrep_thd); - mysql_mutex_lock(&LOCK_wsrep_replaying); - } - mysql_mutex_unlock(&LOCK_wsrep_replaying); - - if (thd->wsrep_conflict_state == MUST_ABORT) { - DBUG_PRINT("wsrep", ("replicate commit fail")); - thd->wsrep_conflict_state = ABORTED; - mysql_mutex_unlock(&thd->LOCK_wsrep_thd); - WSREP_DEBUG("innobase_commit abort after replaying wait %s", - (thd->query()) ? thd->query() : "void"); - DBUG_RETURN(WSREP_TRX_ROLLBACK); - } - thd->wsrep_query_state = QUERY_COMMITTING; - mysql_mutex_unlock(&thd->LOCK_wsrep_thd); - - cache = get_trans_log(thd); - rcode = 0; - if (cache) { - thd->binlog_flush_pending_rows_event(true); - rcode = wsrep_write_cache(cache, &rbr_data, &data_len); - if (rcode) { - WSREP_ERROR("rbr write fail, data_len: %d, %d", data_len, rcode); - if (data_len) my_free(rbr_data); - DBUG_RETURN(WSREP_TRX_ROLLBACK); - } - } - if (data_len == 0) - { - mysql_mutex_lock(&thd->LOCK_wsrep_thd); - thd->wsrep_exec_mode = LOCAL_COMMIT; - mysql_mutex_unlock(&thd->LOCK_wsrep_thd); - if (thd->stmt_da->is_ok() && - thd->stmt_da->affected_rows() > 0 && - !binlog_filter->is_on()) - { - WSREP_DEBUG("empty rbr buffer, query: %s, " - "affected rows: %llu, " - "changed tables: %d, " - "sql_log_bin: %d, " - "wsrep status (%d %d %d)", - thd->query(), thd->stmt_da->affected_rows(), - stmt_has_updated_trans_table(thd), thd->variables.sql_log_bin, - thd->wsrep_exec_mode, thd->wsrep_query_state, - thd->wsrep_conflict_state); - } - else - { - WSREP_DEBUG("empty rbr buffer, query: %s", thd->query()); - } - DBUG_RETURN(WSREP_TRX_OK); - } - if (!rcode) { - rcode = wsrep->pre_commit( - wsrep, - (wsrep_conn_id_t)thd->thread_id, - &thd->wsrep_trx_handle, - rbr_data, - data_len, - (thd->wsrep_PA_safe) ? WSREP_FLAG_PA_SAFE : 0ULL, - &thd->wsrep_trx_seqno); - if (rcode == WSREP_TRX_MISSING) { - rcode = WSREP_OK; - } else if (rcode == WSREP_BF_ABORT) { - mysql_mutex_lock(&thd->LOCK_wsrep_thd); - thd->wsrep_conflict_state = MUST_REPLAY; - mysql_mutex_unlock(&thd->LOCK_wsrep_thd); - mysql_mutex_lock(&LOCK_wsrep_replaying); - wsrep_replaying++; - WSREP_DEBUG("replaying increased: %d, thd: %lu", - wsrep_replaying, thd->thread_id); - mysql_mutex_unlock(&LOCK_wsrep_replaying); - } - thd->wsrep_seqno_changed = true; - } else { - WSREP_ERROR("I/O error reading from thd's binlog iocache: " - "errno=%d, io cache code=%d", my_errno, cache->error); - if (data_len) my_free(rbr_data); - DBUG_ASSERT(0); // failure like this can not normally happen - DBUG_RETURN(WSREP_TRX_ERROR); - } - - if (data_len) { - my_free(rbr_data); - } - - mysql_mutex_lock(&thd->LOCK_wsrep_thd); - switch(rcode) { - case 0: - thd->wsrep_exec_mode = LOCAL_COMMIT; - /* Override XID iff it was generated by mysql */ - if (thd->transaction.xid_state.xid.get_my_xid()) - { - wsrep_xid_init(&thd->transaction.xid_state.xid, - wsrep_cluster_uuid(), - thd->wsrep_trx_seqno); - } - DBUG_PRINT("wsrep", ("replicating commit success")); - - break; - case WSREP_TRX_FAIL: - case WSREP_BF_ABORT: - WSREP_DEBUG("commit failed for reason: %d", rcode); - DBUG_PRINT("wsrep", ("replicating commit fail")); - - thd->wsrep_query_state= QUERY_EXEC; - - if (thd->wsrep_conflict_state == MUST_ABORT) { - thd->wsrep_conflict_state= ABORTED; - } - else - { - WSREP_DEBUG("conflict state: %d", thd->wsrep_conflict_state); - if (thd->wsrep_conflict_state == NO_CONFLICT) - { - thd->wsrep_conflict_state = CERT_FAILURE; - WSREP_LOG_CONFLICT(NULL, thd, FALSE); - } - } - mysql_mutex_unlock(&thd->LOCK_wsrep_thd); - - DBUG_RETURN(WSREP_TRX_ROLLBACK); - - case WSREP_CONN_FAIL: - WSREP_ERROR("connection failure"); - DBUG_RETURN(WSREP_TRX_ERROR); - default: - WSREP_ERROR("unknown connection failure"); - DBUG_RETURN(WSREP_TRX_ERROR); - } - - thd->wsrep_query_state= QUERY_EXEC; - mysql_mutex_unlock(&thd->LOCK_wsrep_thd); - - DBUG_RETURN(WSREP_TRX_OK); -} - - -static int wsrep_hton_init(void *p) -{ - wsrep_hton= (handlerton *)p; - //wsrep_hton->state=opt_bin_log ? SHOW_OPTION_YES : SHOW_OPTION_NO; - wsrep_hton->state= SHOW_OPTION_YES; - wsrep_hton->db_type=DB_TYPE_WSREP; - wsrep_hton->savepoint_offset= sizeof(my_off_t); - wsrep_hton->close_connection= wsrep_close_connection; - wsrep_hton->savepoint_set= wsrep_savepoint_set; - wsrep_hton->savepoint_rollback= wsrep_savepoint_rollback; - wsrep_hton->commit= wsrep_commit; - wsrep_hton->rollback= wsrep_rollback; - wsrep_hton->prepare= wsrep_prepare; - wsrep_hton->flags= HTON_NOT_USER_SELECTABLE | HTON_HIDDEN; // todo: fix flags - wsrep_hton->slot= 0; - return 0; -} - - -struct st_mysql_storage_engine wsrep_storage_engine= -{ MYSQL_HANDLERTON_INTERFACE_VERSION }; - - -mysql_declare_plugin(wsrep) -{ - MYSQL_STORAGE_ENGINE_PLUGIN, - &wsrep_storage_engine, - "wsrep", - "Codership Oy", - "A pseudo storage engine to represent transactions in multi-master synchornous replication", - PLUGIN_LICENSE_GPL, - wsrep_hton_init, /* Plugin Init */ - NULL, /* Plugin Deinit */ - 0x0100 /* 1.0 */, - NULL, /* status variables */ - NULL, /* system variables */ - NULL, /* config options */ - 0, /* flags */ -} -mysql_declare_plugin_end; diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc deleted file mode 100644 index 62397c58a6e..00000000000 --- a/sql/wsrep_mysqld.cc +++ /dev/null @@ -1,1311 +0,0 @@ -/* Copyright 2008 Codership Oy <http://www.codership.com> - - This program is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; version 2 of the License. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ - -#include <mysqld.h> -#include <sql_class.h> -#include <sql_parse.h> -#include "wsrep_priv.h" -#include <cstdio> -#include <cstdlib> -#include "log_event.h" - -extern Format_description_log_event *wsrep_format_desc; -wsrep_t *wsrep = NULL; -my_bool wsrep_emulate_bin_log = FALSE; // activating parts of binlog interface - -/* - * Begin configuration options and their default values - */ - -const char* wsrep_data_home_dir = NULL; - -#define WSREP_NODE_INCOMING_AUTO "AUTO" -const char* wsrep_node_incoming_address = WSREP_NODE_INCOMING_AUTO; -const char* wsrep_dbug_option = ""; - -long wsrep_slave_threads = 1; // # of slave action appliers wanted -my_bool wsrep_debug = 0; // enable debug level logging -my_bool wsrep_convert_LOCK_to_trx = 1; // convert locking sessions to trx -ulong wsrep_retry_autocommit = 5; // retry aborted autocommit trx -my_bool wsrep_auto_increment_control = 1; // control auto increment variables -my_bool wsrep_drupal_282555_workaround = 1; // retry autoinc insert after dupkey -my_bool wsrep_incremental_data_collection = 0; // incremental data collection -long long wsrep_max_ws_size = 1073741824LL; //max ws (RBR buffer) size -long wsrep_max_ws_rows = 65536; // max number of rows in ws -int wsrep_to_isolation = 0; // # of active TO isolation threads -my_bool wsrep_certify_nonPK = 1; // certify, even when no primary key -long wsrep_max_protocol_version = 2; // maximum protocol version to use -ulong wsrep_forced_binlog_format = BINLOG_FORMAT_UNSPEC; -my_bool wsrep_recovery = 0; // recovery -my_bool wsrep_replicate_myisam = 0; // enable myisam replication -my_bool wsrep_log_conflicts = 0; // -ulong wsrep_mysql_replication_bundle = 0; - -/* - * End configuration options - */ - -static const wsrep_uuid_t cluster_uuid = WSREP_UUID_UNDEFINED; -const wsrep_uuid_t* wsrep_cluster_uuid() -{ - return &cluster_uuid; -} -static char cluster_uuid_str[40]= { 0, }; -static const char* cluster_status_str[WSREP_VIEW_MAX] = -{ - "Primary", - "non-Primary", - "Disconnected" -}; - -static char provider_name[256]= { 0, }; -static char provider_version[256]= { 0, }; -static char provider_vendor[256]= { 0, }; - -/* - * wsrep status variables - */ -my_bool wsrep_connected = FALSE; -my_bool wsrep_ready = FALSE; // node can accept queries -const char* wsrep_cluster_state_uuid = cluster_uuid_str; -long long wsrep_cluster_conf_id = WSREP_SEQNO_UNDEFINED; -const char* wsrep_cluster_status = cluster_status_str[WSREP_VIEW_DISCONNECTED]; -long wsrep_cluster_size = 0; -long wsrep_local_index = -1; -const char* wsrep_provider_name = provider_name; -const char* wsrep_provider_version = provider_version; -const char* wsrep_provider_vendor = provider_vendor; -/* End wsrep status variables */ - - -wsrep_uuid_t local_uuid = WSREP_UUID_UNDEFINED; -wsrep_seqno_t local_seqno = WSREP_SEQNO_UNDEFINED; -wsp::node_status local_status; -long wsrep_protocol_version = 2; - -// Boolean denoting if server is in initial startup phase. This is needed -// to make sure that main thread waiting in wsrep_sst_wait() is signaled -// if there was no state gap on receiving first view event. -static my_bool wsrep_startup = TRUE; - -// action execute callback -extern wsrep_status_t wsrep_apply_cb(void *ctx, - const void* buf, size_t buf_len, - wsrep_seqno_t global_seqno); - -extern wsrep_status_t wsrep_commit_cb (void *ctx, - wsrep_seqno_t global_seqno, - bool commit); - -static void wsrep_log_cb(wsrep_log_level_t level, const char *msg) { - switch (level) { - case WSREP_LOG_INFO: - sql_print_information("WSREP: %s", msg); - break; - case WSREP_LOG_WARN: - sql_print_warning("WSREP: %s", msg); - break; - case WSREP_LOG_ERROR: - case WSREP_LOG_FATAL: - sql_print_error("WSREP: %s", msg); - break; - case WSREP_LOG_DEBUG: - if (wsrep_debug) sql_print_information ("[Debug] WSREP: %s", msg); - default: - break; - } -} - -static void wsrep_log_states (wsrep_log_level_t const level, - const wsrep_uuid_t* const group_uuid, - wsrep_seqno_t const group_seqno, - const wsrep_uuid_t* const node_uuid, - wsrep_seqno_t const node_seqno) -{ - char uuid_str[37]; - char msg[256]; - - wsrep_uuid_print (group_uuid, uuid_str, sizeof(uuid_str)); - snprintf (msg, 255, "WSREP: Group state: %s:%lld", - uuid_str, (long long)group_seqno); - wsrep_log_cb (level, msg); - - wsrep_uuid_print (node_uuid, uuid_str, sizeof(uuid_str)); - snprintf (msg, 255, "WSREP: Local state: %s:%lld", - uuid_str, (long long)node_seqno); - wsrep_log_cb (level, msg); -} - -static my_bool set_SE_checkpoint(THD* unused, plugin_ref plugin, void* arg) -{ - XID* xid= reinterpret_cast<XID*>(arg); - handlerton* hton= plugin_data(plugin, handlerton *); - if (hton->db_type == DB_TYPE_INNODB) - { - const wsrep_uuid_t* uuid(wsrep_xid_uuid(xid)); - char uuid_str[40] = {0, }; - wsrep_uuid_print(uuid, uuid_str, sizeof(uuid_str)); - WSREP_DEBUG("Set WSREPXid for InnoDB: %s:%lld", - uuid_str, (long long)wsrep_xid_seqno(xid)); - hton->wsrep_set_checkpoint(hton, xid); - } - return FALSE; -} - -void wsrep_set_SE_checkpoint(XID* xid) -{ - plugin_foreach(NULL, set_SE_checkpoint, MYSQL_STORAGE_ENGINE_PLUGIN, xid); -} - -static my_bool get_SE_checkpoint(THD* unused, plugin_ref plugin, void* arg) -{ - XID* xid= reinterpret_cast<XID*>(arg); - handlerton* hton= plugin_data(plugin, handlerton *); - if (hton->db_type == DB_TYPE_INNODB) - { - hton->wsrep_get_checkpoint(hton, xid); - const wsrep_uuid_t* uuid(wsrep_xid_uuid(xid)); - char uuid_str[40] = {0, }; - wsrep_uuid_print(uuid, uuid_str, sizeof(uuid_str)); - WSREP_DEBUG("Read WSREPXid from InnoDB: %s:%lld", - uuid_str, (long long)wsrep_xid_seqno(xid)); - - } - return FALSE; -} - -void wsrep_get_SE_checkpoint(XID* xid) -{ - plugin_foreach(NULL, get_SE_checkpoint, MYSQL_STORAGE_ENGINE_PLUGIN, xid); -} - -static void wsrep_view_handler_cb (void* app_ctx, - void* recv_ctx, - const wsrep_view_info_t* view, - const char* state, - size_t state_len, - void** sst_req, - ssize_t* sst_req_len) -{ - wsrep_member_status_t new_status= local_status.get(); - - if (memcmp(&cluster_uuid, &view->uuid, sizeof(wsrep_uuid_t))) - { - memcpy((wsrep_uuid_t*)&cluster_uuid, &view->uuid, sizeof(cluster_uuid)); - wsrep_uuid_print (&cluster_uuid, cluster_uuid_str, - sizeof(cluster_uuid_str)); - } - - wsrep_cluster_conf_id= view->view; - wsrep_cluster_status= cluster_status_str[view->status]; - wsrep_cluster_size= view->memb_num; - wsrep_local_index= view->my_idx; - - WSREP_INFO("New cluster view: global state: %s:%lld, view# %lld: %s, " - "number of nodes: %ld, my index: %ld, protocol version %d", - wsrep_cluster_state_uuid, (long long)view->seqno, - (long long)wsrep_cluster_conf_id, wsrep_cluster_status, - wsrep_cluster_size, wsrep_local_index, view->proto_ver); - - /* Proceed further only if view is PRIMARY */ - if (WSREP_VIEW_PRIMARY != view->status) { - wsrep_ready_set(FALSE); - new_status= WSREP_MEMBER_UNDEFINED; - /* Always record local_uuid and local_seqno in non-prim since this - * may lead to re-initializing provider and start position is - * determined according to these variables */ - // WRONG! local_uuid should be the last primary configuration uuid we were - // a member of. local_seqno should be updated in commit calls. - // local_uuid= cluster_uuid; - // local_seqno= view->first - 1; - goto out; - } - - switch (view->proto_ver) - { - case 0: - case 1: - case 2: - // version change - if (view->proto_ver != wsrep_protocol_version) - { - my_bool wsrep_ready_saved= wsrep_ready; - wsrep_ready_set(FALSE); - WSREP_INFO("closing client connections for " - "protocol change %ld -> %d", - wsrep_protocol_version, view->proto_ver); - wsrep_close_client_connections(TRUE); - wsrep_protocol_version= view->proto_ver; - wsrep_ready_set(wsrep_ready_saved); - } - break; - default: - WSREP_ERROR("Unsupported application protocol version: %d", - view->proto_ver); - unireg_abort(1); - } - - if (view->state_gap) - { - WSREP_WARN("Gap in state sequence. Need state transfer."); - - /* After that wsrep will call wsrep_sst_prepare. */ - /* keep ready flag 0 until we receive the snapshot */ - wsrep_ready_set(FALSE); - - /* Close client connections to ensure that they don't interfere - * with SST */ - WSREP_DEBUG("[debug]: closing client connections for PRIM"); - wsrep_close_client_connections(TRUE); - - *sst_req_len= wsrep_sst_prepare (sst_req); - - if (*sst_req_len < 0) - { - int err = *sst_req_len; - WSREP_ERROR("SST preparation failed: %d (%s)", -err, strerror(-err)); - new_status= WSREP_MEMBER_UNDEFINED; - } - else - { - new_status= WSREP_MEMBER_JOINER; - } - } - else - { - /* - * NOTE: Initialize wsrep_group_uuid here only if it wasn't initialized - * before - OR - it was reinitilized on startup (lp:992840) - */ - if (wsrep_startup) - { - if (wsrep_before_SE()) - { - wsrep_SE_init_grab(); - // Signal mysqld init thread to continue - wsrep_sst_complete (&cluster_uuid, view->seqno, false); - // and wait for SE initialization - wsrep_SE_init_wait(); - } - else - { - local_uuid= cluster_uuid; - local_seqno= view->seqno; - } - /* Init storage engine XIDs from first view */ - XID xid; - wsrep_xid_init(&xid, &local_uuid, local_seqno); - wsrep_set_SE_checkpoint(&xid); - new_status= WSREP_MEMBER_JOINED; - } - - // just some sanity check - if (memcmp (&local_uuid, &cluster_uuid, sizeof (wsrep_uuid_t))) - { - WSREP_ERROR("Undetected state gap. Can't continue."); - wsrep_log_states(WSREP_LOG_FATAL, &cluster_uuid, view->seqno, - &local_uuid, -1); - unireg_abort(1); - } - } - - if (wsrep_auto_increment_control) - { - global_system_variables.auto_increment_offset= view->my_idx + 1; - global_system_variables.auto_increment_increment= view->memb_num; - } - -out: - wsrep_startup= FALSE; - local_status.set(new_status, view); -} - -void wsrep_ready_set (my_bool x) -{ - WSREP_DEBUG("Setting wsrep_ready to %d", x); - if (mysql_mutex_lock (&LOCK_wsrep_ready)) abort(); - if (wsrep_ready != x) - { - wsrep_ready= x; - mysql_cond_signal (&COND_wsrep_ready); - } - mysql_mutex_unlock (&LOCK_wsrep_ready); -} - -// Wait until wsrep has reached ready state -void wsrep_ready_wait () -{ - if (mysql_mutex_lock (&LOCK_wsrep_ready)) abort(); - while (!wsrep_ready) - { - WSREP_INFO("Waiting to reach ready state"); - mysql_cond_wait (&COND_wsrep_ready, &LOCK_wsrep_ready); - } - WSREP_INFO("ready state reached"); - mysql_mutex_unlock (&LOCK_wsrep_ready); -} - -static void wsrep_synced_cb(void* app_ctx) -{ - WSREP_INFO("Synchronized with group, ready for connections"); - if (mysql_mutex_lock (&LOCK_wsrep_ready)) abort(); - if (!wsrep_ready) - { - wsrep_ready= TRUE; - mysql_cond_signal (&COND_wsrep_ready); - } - local_status.set(WSREP_MEMBER_SYNCED); - mysql_mutex_unlock (&LOCK_wsrep_ready); -} - -static void wsrep_init_position() -{ - /* read XIDs from storage engines */ - XID xid; - memset(&xid, 0, sizeof(xid)); - xid.formatID= -1; - wsrep_get_SE_checkpoint(&xid); - - if (xid.formatID == -1) - { - WSREP_INFO("Read nil XID from storage engines, skipping position init"); - return; - } - else if (!wsrep_is_wsrep_xid(&xid)) - { - WSREP_WARN("Read non-wsrep XID from storage engines, skipping position init"); - return; - } - - const wsrep_uuid_t* uuid= wsrep_xid_uuid(&xid); - const wsrep_seqno_t seqno= wsrep_xid_seqno(&xid); - - char uuid_str[40] = {0, }; - wsrep_uuid_print(uuid, uuid_str, sizeof(uuid_str)); - WSREP_INFO("Initial position: %s:%lld", uuid_str, (long long)seqno); - - - if (!memcmp(&local_uuid, &WSREP_UUID_UNDEFINED, sizeof(local_uuid)) && - local_seqno == WSREP_SEQNO_UNDEFINED) - { - // Initial state - local_uuid= *uuid; - local_seqno= seqno; - } - else if (memcmp(&local_uuid, uuid, sizeof(local_uuid)) || - local_seqno != seqno) - { - WSREP_WARN("Initial position was provided by configuration or SST, " - "avoiding override"); - } -} - - -int wsrep_init() -{ - int rcode= -1; - - wsrep_ready_set(FALSE); - assert(wsrep_provider); - wsrep_format_desc= new Format_description_log_event(4); - wsrep_init_position(); - - if ((rcode= wsrep_load(wsrep_provider, &wsrep, wsrep_log_cb)) != WSREP_OK) - { - if (strcasecmp(wsrep_provider, WSREP_NONE)) - { - WSREP_ERROR("wsrep_load(%s) failed: %s (%d). Reverting to no provider.", - wsrep_provider, strerror(rcode), rcode); - strcpy((char*)wsrep_provider, WSREP_NONE); // damn it's a dirty hack - (void) wsrep_init(); - return rcode; - } - else /* this is for recursive call above */ - { - WSREP_ERROR("Could not revert to no provider: %s (%d). Need to abort.", - strerror(rcode), rcode); - unireg_abort(1); - } - } - - if (strlen(wsrep_provider)== 0 || - !strcmp(wsrep_provider, WSREP_NONE)) - { - // enable normal operation in case no provider is specified - wsrep_ready_set(TRUE); - global_system_variables.wsrep_on = 0; - return 0; - } - else - { - global_system_variables.wsrep_on = 1; - strncpy(provider_name, - wsrep->provider_name, sizeof(provider_name) - 1); - strncpy(provider_version, - wsrep->provider_version, sizeof(provider_version) - 1); - strncpy(provider_vendor, - wsrep->provider_vendor, sizeof(provider_vendor) - 1); - } - - if (!wsrep_data_home_dir || strlen(wsrep_data_home_dir) == 0) - wsrep_data_home_dir = mysql_real_data_home; - - char node_addr[512]= { 0, }; - if (!wsrep_node_address || !strcmp(wsrep_node_address, "")) - { - size_t const node_addr_max= sizeof(node_addr); - size_t const ret= guess_ip(node_addr, node_addr_max); - if (!(ret > 0 && ret < node_addr_max)) - { - WSREP_WARN("Failed to guess base node address. Set it explicitly via " - "wsrep_node_address."); - node_addr[0]= '\0'; - } - } - else - { - strncpy(node_addr, wsrep_node_address, sizeof(node_addr) - 1); - } - - static char inc_addr[512]= { 0, }; - - if ((!wsrep_node_incoming_address || - !strcmp (wsrep_node_incoming_address, WSREP_NODE_INCOMING_AUTO))) - { - size_t const node_addr_len= strlen(node_addr); - if (node_addr_len > 0) - { - const char* const colon= strrchr(node_addr, ':'); - if (strchr(node_addr, ':') == colon) // 1 or 0 ':' - { - size_t const inc_addr_max= sizeof (inc_addr); - size_t const ip_len= colon ? colon - node_addr : node_addr_len; - if (ip_len + 7 /* :55555\0 */ < inc_addr_max) - { - memcpy (inc_addr, node_addr, ip_len); - snprintf(inc_addr + ip_len, inc_addr_max - ip_len, ":%u",mysqld_port); - } - else - { - WSREP_WARN("Guessing address for incoming client connections: " - "address too long."); - inc_addr[0]= '\0'; - } - } - else - { - WSREP_WARN("Guessing address for incoming client connections: " - "too many colons :) ."); - inc_addr[0]= '\0'; - } - } - - // this is to display detected address on SHOW VARIABLES... - wsrep_node_incoming_address = inc_addr; - - if (!strlen(wsrep_node_incoming_address)) - { - WSREP_WARN("Guessing address for incoming client connections failed. " - "Try setting wsrep_node_incoming_address explicitly."); - } - } - - struct wsrep_init_args wsrep_args; - - wsrep_args.data_dir = wsrep_data_home_dir; - wsrep_args.node_name = (wsrep_node_name) ? wsrep_node_name : ""; - wsrep_args.node_address = node_addr; - wsrep_args.node_incoming = wsrep_node_incoming_address; - wsrep_args.options = (wsrep_provider_options) ? - wsrep_provider_options : ""; - wsrep_args.proto_ver = wsrep_max_protocol_version; - - wsrep_args.state_uuid = &local_uuid; - wsrep_args.state_seqno = local_seqno; - - wsrep_args.logger_cb = wsrep_log_cb; - wsrep_args.view_handler_cb = wsrep_view_handler_cb; - wsrep_args.apply_cb = wsrep_apply_cb; - wsrep_args.commit_cb = wsrep_commit_cb; - wsrep_args.sst_donate_cb = wsrep_sst_donate_cb; - wsrep_args.synced_cb = wsrep_synced_cb; - - rcode = wsrep->init(wsrep, &wsrep_args); - - if (rcode) - { - DBUG_PRINT("wsrep",("wsrep::init() failed: %d", rcode)); - WSREP_ERROR("wsrep::init() failed: %d, must shutdown", rcode); - free(wsrep); - wsrep = NULL; - } - - return rcode; -} - -extern "C" int wsrep_on(void *); - -void wsrep_init_startup (bool first) -{ - if (wsrep_init()) unireg_abort(1); - - wsrep_thr_lock_init(wsrep_thd_is_brute_force, wsrep_abort_thd, - wsrep_debug, wsrep_convert_LOCK_to_trx, wsrep_on); - - /* Skip replication start if no cluster address */ - if (!wsrep_cluster_address || strlen(wsrep_cluster_address) == 0) return; - - if (first) wsrep_sst_grab(); // do it so we can wait for SST below - - if (!wsrep_start_replication()) unireg_abort(1); - - wsrep_create_rollbacker(); - wsrep_create_appliers(1); - - if (first && !wsrep_sst_wait()) unireg_abort(1);// wait until SST is completed -} - - -void wsrep_deinit() -{ - wsrep_unload(wsrep); - wsrep= 0; - provider_name[0]= '\0'; - provider_version[0]= '\0'; - provider_vendor[0]= '\0'; - - delete wsrep_format_desc; - wsrep_format_desc= NULL; -} - -void wsrep_recover() -{ - if (!memcmp(&local_uuid, &WSREP_UUID_UNDEFINED, sizeof(wsrep_uuid_t)) && - local_seqno == -2) - { - char uuid_str[40]; - wsrep_uuid_print(&local_uuid, uuid_str, sizeof(uuid_str)); - WSREP_INFO("Position %s:%lld given at startup, skipping position recovery", - uuid_str, (long long)local_seqno); - return; - } - XID xid; - memset(&xid, 0, sizeof(xid)); - xid.formatID= -1; - wsrep_get_SE_checkpoint(&xid); - char uuid_str[40]; - wsrep_uuid_print(wsrep_xid_uuid(&xid), uuid_str, sizeof(uuid_str)); - WSREP_INFO("Recovered position: %s:%lld", uuid_str, - (long long)wsrep_xid_seqno(&xid)); -} - - -void wsrep_stop_replication(THD *thd) -{ - WSREP_INFO("Stop replication"); - if (!wsrep) - { - WSREP_INFO("Provider was not loaded, in stop replication"); - return; - } - - /* disconnect from group first to get wsrep_ready == FALSE */ - WSREP_DEBUG("Provider disconnect"); - wsrep->disconnect(wsrep); - - wsrep_connected= FALSE; - - wsrep_close_client_connections(TRUE); - - /* wait until appliers have stopped */ - wsrep_wait_appliers_close(thd); - - return; -} - - -bool wsrep_start_replication() -{ - wsrep_status_t rcode; - - /* - if provider is trivial, don't even try to connect, - but resume local node operation - */ - if (strlen(wsrep_provider)== 0 || - !strcmp(wsrep_provider, WSREP_NONE)) - { - // enable normal operation in case no provider is specified - wsrep_ready_set(TRUE); - return true; - } - - if (!wsrep_cluster_address || strlen(wsrep_cluster_address)== 0) - { - // if provider is non-trivial, but no address is specified, wait for address - wsrep_ready_set(FALSE); - return true; - } - - WSREP_INFO("Start replication"); - - if ((rcode = wsrep->connect(wsrep, - wsrep_cluster_name, - wsrep_cluster_address, - wsrep_sst_donor))) - { - if (-ESOCKTNOSUPPORT == rcode) - { - DBUG_PRINT("wsrep",("unrecognized cluster address: '%s', rcode: %d", - wsrep_cluster_address, rcode)); - WSREP_ERROR("unrecognized cluster address: '%s', rcode: %d", - wsrep_cluster_address, rcode); - } - else - { - DBUG_PRINT("wsrep",("wsrep->connect() failed: %d", rcode)); - WSREP_ERROR("wsrep::connect() failed: %d", rcode); - } - - return false; - } - else - { - wsrep_connected= TRUE; - - uint64_t caps = wsrep->capabilities (wsrep); - - wsrep_incremental_data_collection = - (caps & WSREP_CAP_WRITE_SET_INCREMENTS); - - char* opts= wsrep->options_get(wsrep); - if (opts) - { - wsrep_provider_options_init(opts); - free(opts); - } - else - { - WSREP_WARN("Failed to get wsrep options"); - } - } - - return true; -} - -bool -wsrep_causal_wait (THD* thd) -{ - if (thd->variables.wsrep_causal_reads && thd->variables.wsrep_on && - !thd->in_active_multi_stmt_transaction() && - thd->wsrep_conflict_state != REPLAYING) - { - // This allows autocommit SELECTs and a first SELECT after SET AUTOCOMMIT=0 - // TODO: modify to check if thd has locked any rows. - wsrep_seqno_t seqno; - wsrep_status_t ret= wsrep->causal_read (wsrep, &seqno); - - if (unlikely(WSREP_OK != ret)) - { - const char* msg; - int err; - - // Possibly relevant error codes: - // ER_CHECKREAD, ER_ERROR_ON_READ, ER_INVALID_DEFAULT, ER_EMPTY_QUERY, - // ER_FUNCTION_NOT_DEFINED, ER_NOT_ALLOWED_COMMAND, ER_NOT_SUPPORTED_YET, - // ER_FEATURE_DISABLED, ER_QUERY_INTERRUPTED - - switch (ret) - { - case WSREP_NOT_IMPLEMENTED: - msg= "consistent reads by wsrep backend. " - "Please unset wsrep_causal_reads variable."; - err= ER_NOT_SUPPORTED_YET; - break; - default: - msg= "Causal wait failed."; - err= ER_LOCK_WAIT_TIMEOUT; // NOTE: the above msg won't be displayed - // with ER_LOCK_WAIT_TIMEOUT - } - - my_error(err, MYF(0), msg); - - return true; - } - } - - return false; -} - -/* - * Helpers to deal with TOI key arrays - */ -typedef struct wsrep_key_arr -{ - wsrep_key_t* keys; - size_t keys_len; -} wsrep_key_arr_t; - - -static void wsrep_keys_free(wsrep_key_arr_t* key_arr) -{ - for (size_t i= 0; i < key_arr->keys_len; ++i) - { - my_free((wsrep_key_part_t*)key_arr->keys[i].key_parts); - } - my_free(key_arr->keys); - key_arr->keys= 0; - key_arr->keys_len= 0; -} - - -/*! - * @param db Database string - * @param table Table string - * @param key Array of wsrep_key_t - * @param key_len In: number of elements in key array, Out: number of - * elements populated - * - * @return true if preparation was successful, otherwise false. - */ - -static bool wsrep_prepare_key_for_isolation(const char* db, - const char* table, - wsrep_key_part_t* key, - size_t* key_len) -{ - if (*key_len < 2) return false; - - switch (wsrep_protocol_version) - { - case 0: - *key_len= 0; - break; - case 1: - case 2: - { - *key_len= 0; - if (db) - { - // sql_print_information("%s.%s", db, table); - if (db) - { - key[*key_len].buf= db; - key[*key_len].buf_len= strlen(db); - ++(*key_len); - if (table) - { - key[*key_len].buf= table; - key[*key_len].buf_len= strlen(table); - ++(*key_len); - } - } - } - break; - } - default: - return false; - } - - return true; -} - -/* Prepare key list from db/table and table_list */ -static bool wsrep_prepare_keys_for_isolation(THD* thd, - const char* db, - const char* table, - const TABLE_LIST* table_list, - wsrep_key_arr_t* ka) -{ - ka->keys= 0; - ka->keys_len= 0; - - extern TABLE* find_temporary_table(THD*, const TABLE_LIST*); - - if (db || table) - { - TABLE_LIST tmp_table; - bzero((char*) &tmp_table,sizeof(tmp_table)); - tmp_table.table_name= (char*)db; - tmp_table.db= (char*)table; - if (!table || !find_temporary_table(thd, &tmp_table)) - { - if (!(ka->keys= (wsrep_key_t*)my_malloc(sizeof(wsrep_key_t), MYF(0)))) - { - sql_print_error("Can't allocate memory for key_array"); - goto err; - } - ka->keys_len= 1; - if (!(ka->keys[0].key_parts= (wsrep_key_part_t*) - my_malloc(sizeof(wsrep_key_part_t)*2, MYF(0)))) - { - sql_print_error("Can't allocate memory for key_parts"); - goto err; - } - ka->keys[0].key_parts_len= 2; - if (!wsrep_prepare_key_for_isolation( - db, table, - (wsrep_key_part_t*)ka->keys[0].key_parts, - &ka->keys[0].key_parts_len)) - { - sql_print_error("Preparing keys for isolation failed"); - goto err; - } - } - } - - for (const TABLE_LIST* table= table_list; table; table= table->next_global) - { - if (!find_temporary_table(thd, table)) - { - wsrep_key_t* tmp; - tmp= (wsrep_key_t*)my_realloc( - ka->keys, (ka->keys_len + 1) * sizeof(wsrep_key_t), MYF(0)); - if (!tmp) - { - sql_print_error("Can't allocate memory for key_array"); - goto err; - } - ka->keys= tmp; - if (!(ka->keys[ka->keys_len].key_parts= (wsrep_key_part_t*) - my_malloc(sizeof(wsrep_key_part_t)*2, MYF(0)))) - { - sql_print_error("Can't allocate memory for key_parts"); - goto err; - } - ka->keys[ka->keys_len].key_parts_len= 2; - ++ka->keys_len; - if (!wsrep_prepare_key_for_isolation( - table->db, table->table_name, - (wsrep_key_part_t*)ka->keys[ka->keys_len - 1].key_parts, - &ka->keys[ka->keys_len - 1].key_parts_len)) - { - sql_print_error("Preparing keys for isolation failed"); - goto err; - } - } - } - return true; -err: - wsrep_keys_free(ka); - return false; -} - - - -bool wsrep_prepare_key_for_innodb(const uchar* cache_key, - size_t cache_key_len, - const uchar* row_id, - size_t row_id_len, - wsrep_key_part_t* key, - size_t* key_len) -{ - if (*key_len < 3) return false; - - *key_len= 0; - switch (wsrep_protocol_version) - { - case 0: - { - key[*key_len].buf = cache_key; - key[*key_len].buf_len = cache_key_len; - ++(*key_len); - break; - } - case 1: - case 2: - { - key[*key_len].buf = cache_key; - key[*key_len].buf_len = strlen( (char*)cache_key ); - ++(*key_len); - key[*key_len].buf = cache_key + strlen( (char*)cache_key ) + 1; - key[*key_len].buf_len = strlen( (char*)(key[*key_len].buf) ); - ++(*key_len); - break; - } - default: - return false; - } - - key[*key_len].buf = row_id; - key[*key_len].buf_len = row_id_len; - ++(*key_len); - - return true; -} - -/* - * Construct Query_log_Event from thd query and serialize it - * into buffer. - * - * Return 0 in case of success, 1 in case of error. - */ -int wsrep_to_buf_helper( - THD* thd, const char *query, uint query_len, uchar** buf, uint* buf_len) -{ - IO_CACHE tmp_io_cache; - if (open_cached_file(&tmp_io_cache, mysql_tmpdir, TEMP_PREFIX, - 65536, MYF(MY_WME))) - return 1; - Query_log_event ev(thd, query, query_len, FALSE, FALSE, FALSE, 0); - int ret(0); - if (ev.write(&tmp_io_cache)) ret= 1; - if (!ret && wsrep_write_cache(&tmp_io_cache, buf, buf_len)) ret= 1; - close_cached_file(&tmp_io_cache); - return ret; -} - -#include "sql_show.h" -static int -create_view_query(THD *thd, uchar** buf, uint* buf_len) -{ - LEX *lex= thd->lex; - SELECT_LEX *select_lex= &lex->select_lex; - TABLE_LIST *first_table= select_lex->table_list.first; - TABLE_LIST *views = first_table; - - String buff; - const LEX_STRING command[3]= - {{ C_STRING_WITH_LEN("CREATE ") }, - { C_STRING_WITH_LEN("ALTER ") }, - { C_STRING_WITH_LEN("CREATE OR REPLACE ") }}; - - buff.append(command[thd->lex->create_view_mode].str, - command[thd->lex->create_view_mode].length); - - if (!lex->definer) - { - /* - DEFINER-clause is missing; we have to create default definer in - persistent arena to be PS/SP friendly. - If this is an ALTER VIEW then the current user should be set as - the definer. - */ - - if (!(lex->definer= create_default_definer(thd))) - { - WSREP_WARN("view default definer issue"); - } - } - - views->algorithm = lex->create_view_algorithm; - views->definer.user = lex->definer->user; - views->definer.host = lex->definer->host; - views->view_suid = lex->create_view_suid; - views->with_check = lex->create_view_check; - - view_store_options(thd, views, &buff); - buff.append(STRING_WITH_LEN("VIEW ")); - /* Test if user supplied a db (ie: we did not use thd->db) */ - if (views->db && views->db[0] && - (thd->db == NULL || strcmp(views->db, thd->db))) - { - append_identifier(thd, &buff, views->db, - views->db_length); - buff.append('.'); - } - append_identifier(thd, &buff, views->table_name, - views->table_name_length); - if (lex->view_list.elements) - { - List_iterator_fast<LEX_STRING> names(lex->view_list); - LEX_STRING *name; - int i; - - for (i= 0; (name= names++); i++) - { - buff.append(i ? ", " : "("); - append_identifier(thd, &buff, name->str, name->length); - } - buff.append(')'); - } - buff.append(STRING_WITH_LEN(" AS ")); - //buff.append(views->source.str, views->source.length); - buff.append(thd->lex->create_view_select.str, - thd->lex->create_view_select.length); - //int errcode= query_error_code(thd, TRUE); - //if (thd->binlog_query(THD::STMT_QUERY_TYPE, - // buff.ptr(), buff.length(), FALSE, FALSE, FALSE, errcod - return wsrep_to_buf_helper(thd, buff.ptr(), buff.length(), buf, buf_len); -} - -static int wsrep_TOI_begin(THD *thd, char *db_, char *table_, - const TABLE_LIST* table_list) -{ - wsrep_status_t ret(WSREP_WARNING); - uchar* buf(0); - uint buf_len(0); - int buf_err; - - WSREP_DEBUG("TO BEGIN: %lld, %d : %s", (long long)thd->wsrep_trx_seqno, - thd->wsrep_exec_mode, thd->query() ); - switch (thd->lex->sql_command) - { - case SQLCOM_CREATE_VIEW: - buf_err= create_view_query(thd, &buf, &buf_len); - break; - case SQLCOM_CREATE_PROCEDURE: - case SQLCOM_CREATE_SPFUNCTION: - buf_err= wsrep_create_sp(thd, &buf, &buf_len); - break; - case SQLCOM_CREATE_TRIGGER: - buf_err= wsrep_create_trigger_query(thd, &buf, &buf_len); - break; - case SQLCOM_CREATE_EVENT: - buf_err= wsrep_create_event_query(thd, &buf, &buf_len); - break; - default: - buf_err= wsrep_to_buf_helper(thd, thd->query(), thd->query_length(), &buf, - &buf_len); - break; - } - - wsrep_key_arr_t key_arr= {0, 0}; - if (!buf_err && - wsrep_prepare_keys_for_isolation(thd, db_, table_, table_list, &key_arr)&& - WSREP_OK == (ret = wsrep->to_execute_start(wsrep, thd->thread_id, - key_arr.keys, key_arr.keys_len, - buf, buf_len, - &thd->wsrep_trx_seqno))) - { - thd->wsrep_exec_mode= TOTAL_ORDER; - wsrep_to_isolation++; - if (buf) my_free(buf); - wsrep_keys_free(&key_arr); - WSREP_DEBUG("TO BEGIN: %lld, %d",(long long)thd->wsrep_trx_seqno, - thd->wsrep_exec_mode); - } - else { - /* jump to error handler in mysql_execute_command() */ - WSREP_WARN("TO isolation failed for: %d, sql: %s. Check wsrep " - "connection state and retry the query.", - ret, (thd->query()) ? thd->query() : "void"); - my_error(ER_LOCK_DEADLOCK, MYF(0), "WSREP replication failed. Check " - "your wsrep connection state and retry the query."); - if (buf) my_free(buf); - wsrep_keys_free(&key_arr); - return -1; - } - return 0; -} - -static void wsrep_TOI_end(THD *thd) { - wsrep_status_t ret; - wsrep_to_isolation--; - WSREP_DEBUG("TO END: %lld, %d : %s", (long long)thd->wsrep_trx_seqno, - thd->wsrep_exec_mode, (thd->query()) ? thd->query() : "void") - if (WSREP_OK == (ret = wsrep->to_execute_end(wsrep, thd->thread_id))) { - WSREP_DEBUG("TO END: %lld", (long long)thd->wsrep_trx_seqno); - } - else { - WSREP_WARN("TO isolation end failed for: %d, sql: %s", - ret, (thd->query()) ? thd->query() : "void"); - } -} - -static int wsrep_RSU_begin(THD *thd, char *db_, char *table_) -{ - wsrep_status_t ret(WSREP_WARNING); - WSREP_DEBUG("RSU BEGIN: %lld, %d : %s", (long long)thd->wsrep_trx_seqno, - thd->wsrep_exec_mode, thd->query() ); - - ret = wsrep->desync(wsrep); - if (ret != WSREP_OK) - { - WSREP_WARN("RSU desync failed %d for %s", ret, thd->query()); - my_error(ER_LOCK_DEADLOCK, MYF(0)); - return(ret); - } - mysql_mutex_lock(&LOCK_wsrep_replaying); - wsrep_replaying++; - mysql_mutex_unlock(&LOCK_wsrep_replaying); - - if (wsrep_wait_committing_connections_close(5000)) - { - /* no can do, bail out from DDL */ - WSREP_WARN("RSU failed due to pending transactions, %s", thd->query()); - mysql_mutex_lock(&LOCK_wsrep_replaying); - wsrep_replaying--; - mysql_mutex_unlock(&LOCK_wsrep_replaying); - - ret = wsrep->resync(wsrep); - if (ret != WSREP_OK) - { - WSREP_WARN("resync failed %d for %s", ret, thd->query()); - } - my_error(ER_LOCK_DEADLOCK, MYF(0)); - return(1); - } - - wsrep_seqno_t seqno = wsrep->pause(wsrep); - if (seqno == WSREP_SEQNO_UNDEFINED) - { - WSREP_WARN("pause failed %lld for %s", (long long)seqno, thd->query()); - return(1); - } - WSREP_DEBUG("paused at %lld", (long long)seqno); - thd->variables.wsrep_on = 0; - return 0; -} - -static void wsrep_RSU_end(THD *thd) -{ - wsrep_status_t ret(WSREP_WARNING); - WSREP_DEBUG("RSU END: %lld, %d : %s", (long long)thd->wsrep_trx_seqno, - thd->wsrep_exec_mode, thd->query() ); - - - mysql_mutex_lock(&LOCK_wsrep_replaying); - wsrep_replaying--; - mysql_mutex_unlock(&LOCK_wsrep_replaying); - - ret = wsrep->resume(wsrep); - if (ret != WSREP_OK) - { - WSREP_WARN("resume failed %d for %s", ret, thd->query()); - } - ret = wsrep->resync(wsrep); - if (ret != WSREP_OK) - { - WSREP_WARN("resync failed %d for %s", ret, thd->query()); - return; - } - thd->variables.wsrep_on = 1; - return; -} - -int wsrep_to_isolation_begin(THD *thd, char *db_, char *table_, - const TABLE_LIST* table_list) -{ - int ret= 0; - mysql_mutex_lock(&thd->LOCK_wsrep_thd); - if (thd->wsrep_conflict_state == MUST_ABORT) - { - WSREP_INFO("thread: %lu, %s has been aborted due to multi-master conflict", - thd->thread_id, thd->query()); - mysql_mutex_unlock(&thd->LOCK_wsrep_thd); - return WSREP_TRX_FAIL; - } - mysql_mutex_unlock(&thd->LOCK_wsrep_thd); - - if (wsrep_debug && thd->mdl_context.has_locks()) - { - WSREP_DEBUG("thread holds MDL locks at TI begin: %s %lu", - thd->query(), thd->thread_id); - } - if (thd->variables.wsrep_on && thd->wsrep_exec_mode==LOCAL_STATE) - { - switch (wsrep_OSU_method_options) { - case WSREP_OSU_TOI: ret = wsrep_TOI_begin(thd, db_, table_, - table_list); break; - case WSREP_OSU_RSU: ret = wsrep_RSU_begin(thd, db_, table_); break; - } - if (!ret) - { - thd->wsrep_exec_mode= TOTAL_ORDER; - } - } - return ret; -} - -void wsrep_to_isolation_end(THD *thd) { - if (thd->wsrep_exec_mode==TOTAL_ORDER) - { - switch(wsrep_OSU_method_options) - { - case WSREP_OSU_TOI: return wsrep_TOI_end(thd); - case WSREP_OSU_RSU: return wsrep_RSU_end(thd); - } - } -} - -#define WSREP_MDL_LOG(severity, msg, req, gra) \ - WSREP_##severity( \ - "%s\n" \ - "request: (%lu \tseqno %lld \twsrep (%d, %d, %d) cmd %d %d \t%s)\n" \ - "granted: (%lu \tseqno %lld \twsrep (%d, %d, %d) cmd %d %d \t%s)", \ - msg, \ - req->thread_id, (long long)req->wsrep_trx_seqno, \ - req->wsrep_exec_mode, req->wsrep_query_state, req->wsrep_conflict_state, \ - req->command, req->lex->sql_command, req->query(), \ - gra->thread_id, (long long)gra->wsrep_trx_seqno, \ - gra->wsrep_exec_mode, gra->wsrep_query_state, gra->wsrep_conflict_state, \ - gra->command, gra->lex->sql_command, gra->query()); - -bool -wsrep_grant_mdl_exception(MDL_context *requestor_ctx, - MDL_ticket *ticket -) { - if (!WSREP_ON) return FALSE; - - THD *request_thd = requestor_ctx->get_thd(); - THD *granted_thd = ticket->get_ctx()->get_thd(); - bool ret = FALSE; - - mysql_mutex_lock(&request_thd->LOCK_wsrep_thd); - if (request_thd->wsrep_exec_mode == TOTAL_ORDER || - request_thd->wsrep_exec_mode == REPL_RECV) - { - mysql_mutex_unlock(&request_thd->LOCK_wsrep_thd); - WSREP_MDL_LOG(DEBUG, "MDL conflict ", request_thd, granted_thd); - ticket->wsrep_report(wsrep_debug); - - mysql_mutex_lock(&granted_thd->LOCK_wsrep_thd); - if (granted_thd->wsrep_exec_mode == TOTAL_ORDER || - granted_thd->wsrep_exec_mode == REPL_RECV) - { - WSREP_MDL_LOG(INFO, "MDL BF-BF conflict", request_thd, granted_thd); - ticket->wsrep_report(true); - mysql_mutex_unlock(&granted_thd->LOCK_wsrep_thd); - ret = TRUE; - } - else if (granted_thd->lex->sql_command == SQLCOM_FLUSH) - { - WSREP_DEBUG("mdl granted over FLUSH BF"); - ticket->wsrep_report(wsrep_debug); - mysql_mutex_unlock(&granted_thd->LOCK_wsrep_thd); - ret = TRUE; - } - else if (request_thd->lex->sql_command == SQLCOM_DROP_TABLE) - { - WSREP_DEBUG("DROP caused BF abort"); - ticket->wsrep_report(wsrep_debug); - mysql_mutex_unlock(&granted_thd->LOCK_wsrep_thd); - wsrep_abort_thd((void*)request_thd, (void*)granted_thd, 1); - ret = FALSE; - } - else if (granted_thd->wsrep_query_state == QUERY_COMMITTING) - { - WSREP_DEBUG("mdl granted, but commiting thd abort scheduled"); - ticket->wsrep_report(wsrep_debug); - mysql_mutex_unlock(&granted_thd->LOCK_wsrep_thd); - wsrep_abort_thd((void*)request_thd, (void*)granted_thd, 1); - ret = FALSE; - } - else - { - WSREP_MDL_LOG(DEBUG, "MDL conflict-> BF abort", request_thd, granted_thd); - ticket->wsrep_report(wsrep_debug); - mysql_mutex_unlock(&granted_thd->LOCK_wsrep_thd); - wsrep_abort_thd((void*)request_thd, (void*)granted_thd, 1); - ret = FALSE; - } - } - else - { - mysql_mutex_unlock(&request_thd->LOCK_wsrep_thd); - } - return ret; -} diff --git a/sql/wsrep_mysqld.h b/sql/wsrep_mysqld.h deleted file mode 100644 index 6b9640cea70..00000000000 --- a/sql/wsrep_mysqld.h +++ /dev/null @@ -1,383 +0,0 @@ -/* Copyright 2008-2012 Codership Oy <http://www.codership.com> - - This program is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; version 2 of the License. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ - -#ifndef WSREP_MYSQLD_H -#define WSREP_MYSQLD_H - -#include "mysqld.h" -typedef struct st_mysql_show_var SHOW_VAR; -#include <sql_priv.h> -#include "../wsrep/wsrep_api.h" - -class set_var; -class THD; - -#ifdef WITH_WSREP -#include "../wsrep/wsrep_api.h" -//#include "wsrep_mysqld.h" - enum wsrep_exec_mode { - LOCAL_STATE, - REPL_RECV, - TOTAL_ORDER, - LOCAL_COMMIT, - }; - enum wsrep_query_state { - QUERY_IDLE, - QUERY_EXEC, - QUERY_COMMITTING, - QUERY_EXITING, - QUERY_ROLLINGBACK, - }; - enum wsrep_conflict_state { - NO_CONFLICT, - MUST_ABORT, - ABORTING, - ABORTED, - MUST_REPLAY, - REPLAYING, - RETRY_AUTOCOMMIT, - CERT_FAILURE, - }; - enum wsrep_consistency_check_mode { - NO_CONSISTENCY_CHECK, - CONSISTENCY_CHECK_DECLARED, - CONSISTENCY_CHECK_RUNNING, - }; -#endif - -// Global wsrep parameters -extern wsrep_t* wsrep; - -// MySQL wsrep options -extern const char* wsrep_provider; -extern const char* wsrep_provider_options; -extern const char* wsrep_cluster_name; -extern const char* wsrep_cluster_address; -extern const char* wsrep_node_name; -extern const char* wsrep_node_address; -extern const char* wsrep_node_incoming_address; -extern const char* wsrep_data_home_dir; -extern const char* wsrep_dbug_option; -extern long wsrep_slave_threads; -extern my_bool wsrep_debug; -extern my_bool wsrep_convert_LOCK_to_trx; -extern ulong wsrep_retry_autocommit; -extern my_bool wsrep_auto_increment_control; -extern my_bool wsrep_drupal_282555_workaround; -extern my_bool wsrep_incremental_data_collection; -extern const char* wsrep_sst_method; -extern const char* wsrep_sst_receive_address; -extern char* wsrep_sst_auth; -extern const char* wsrep_sst_donor; -extern my_bool wsrep_sst_donor_rejects_queries; -extern const char* wsrep_start_position; -extern long long wsrep_max_ws_size; -extern long wsrep_max_ws_rows; -extern const char* wsrep_notify_cmd; -extern my_bool wsrep_certify_nonPK; -extern long wsrep_max_protocol_version; -extern long wsrep_protocol_version; -extern ulong wsrep_forced_binlog_format; -extern ulong wsrep_OSU_method_options; -extern my_bool wsrep_recovery; -extern my_bool wsrep_replicate_myisam; -extern my_bool wsrep_log_conflicts; -extern ulong wsrep_mysql_replication_bundle; - -enum enum_wsrep_OSU_method { WSREP_OSU_TOI, WSREP_OSU_RSU }; - -// MySQL status variables -extern my_bool wsrep_connected; -extern my_bool wsrep_ready; -extern const char* wsrep_cluster_state_uuid; -extern long long wsrep_cluster_conf_id; -extern const char* wsrep_cluster_status; -extern long wsrep_cluster_size; -extern long wsrep_local_index; -extern const char* wsrep_provider_name; -extern const char* wsrep_provider_version; -extern const char* wsrep_provider_vendor; -extern int wsrep_show_status(THD *thd, SHOW_VAR *var, char *buff); -extern void wsrep_free_status(THD *thd); - -#define WSREP_SST_ADDRESS_AUTO "AUTO" -// MySQL variables funcs - -#define CHECK_ARGS (sys_var *self, THD* thd, set_var *var) -#define UPDATE_ARGS (sys_var *self, THD* thd, enum_var_type type) -#define DEFAULT_ARGS (THD* thd, enum_var_type var_type) -#define INIT_ARGS (const char* opt) - -extern int wsrep_init_vars(); - -extern bool wsrep_on_update UPDATE_ARGS; -extern void wsrep_causal_reads_update UPDATE_ARGS; -extern bool wsrep_start_position_check CHECK_ARGS; -extern bool wsrep_start_position_update UPDATE_ARGS; -extern void wsrep_start_position_init INIT_ARGS; - -extern bool wsrep_provider_check CHECK_ARGS; -extern bool wsrep_provider_update UPDATE_ARGS; -extern void wsrep_provider_init INIT_ARGS; - -extern bool wsrep_provider_options_check CHECK_ARGS; -extern bool wsrep_provider_options_update UPDATE_ARGS; -extern void wsrep_provider_options_init INIT_ARGS; - -extern bool wsrep_cluster_address_check CHECK_ARGS; -extern bool wsrep_cluster_address_update UPDATE_ARGS; -extern void wsrep_cluster_address_init INIT_ARGS; - -extern bool wsrep_cluster_name_check CHECK_ARGS; -extern bool wsrep_cluster_name_update UPDATE_ARGS; - -extern bool wsrep_node_name_check CHECK_ARGS; -extern bool wsrep_node_name_update UPDATE_ARGS; - -extern bool wsrep_node_address_check CHECK_ARGS; -extern bool wsrep_node_address_update UPDATE_ARGS; -extern void wsrep_node_address_init INIT_ARGS; - -extern bool wsrep_sst_method_check CHECK_ARGS; -extern bool wsrep_sst_method_update UPDATE_ARGS; -extern void wsrep_sst_method_init INIT_ARGS; - -extern bool wsrep_sst_receive_address_check CHECK_ARGS; -extern bool wsrep_sst_receive_address_update UPDATE_ARGS; - -extern bool wsrep_sst_auth_check CHECK_ARGS; -extern bool wsrep_sst_auth_update UPDATE_ARGS; -extern void wsrep_sst_auth_init INIT_ARGS; - -extern bool wsrep_sst_donor_check CHECK_ARGS; -extern bool wsrep_sst_donor_update UPDATE_ARGS; - -extern bool wsrep_slave_threads_check CHECK_ARGS; -extern bool wsrep_slave_threads_update UPDATE_ARGS; - -extern bool wsrep_before_SE(); // initialize wsrep before storage - // engines (true) or after (false) -extern int wsrep_init(); -extern void wsrep_deinit(); -extern void wsrep_recover(); - - - -extern "C" enum wsrep_exec_mode wsrep_thd_exec_mode(THD *thd); -extern "C" enum wsrep_conflict_state wsrep_thd_conflict_state(THD *thd); -extern "C" enum wsrep_query_state wsrep_thd_query_state(THD *thd); -extern "C" const char * wsrep_thd_exec_mode_str(THD *thd); -extern "C" const char * wsrep_thd_conflict_state_str(THD *thd); -extern "C" const char * wsrep_thd_query_state_str(THD *thd); -extern "C" wsrep_trx_handle_t* wsrep_thd_trx_handle(THD *thd); - -extern "C" void wsrep_thd_set_exec_mode(THD *thd, enum wsrep_exec_mode mode); -extern "C" void wsrep_thd_set_query_state( - THD *thd, enum wsrep_query_state state); -extern "C" void wsrep_thd_set_conflict_state( - THD *thd, enum wsrep_conflict_state state); - -extern "C" void wsrep_thd_set_trx_to_replay(THD *thd, uint64 trx_id); - -extern "C"void wsrep_thd_LOCK(THD *thd); -extern "C"void wsrep_thd_UNLOCK(THD *thd); -extern "C" uint32 wsrep_thd_wsrep_rand(THD *thd); -extern "C" time_t wsrep_thd_query_start(THD *thd); -extern "C" my_thread_id wsrep_thd_thread_id(THD *thd); -extern "C" int64_t wsrep_thd_trx_seqno(THD *thd); -extern "C" query_id_t wsrep_thd_query_id(THD *thd); -extern "C" char * wsrep_thd_query(THD *thd); -extern "C" query_id_t wsrep_thd_wsrep_last_query_id(THD *thd); -extern "C" void wsrep_thd_set_wsrep_last_query_id(THD *thd, query_id_t id); -extern "C" void wsrep_thd_awake(THD *thd, my_bool signal); - - - -/* wsrep initialization sequence at startup - * @param first wsrep_before_SE() value */ -extern void wsrep_init_startup(bool before); - -extern void wsrep_close_client_connections(my_bool wait_to_end); -extern int wsrep_wait_committing_connections_close(int wait_time); -extern void wsrep_close_applier(THD *thd); -extern void wsrep_wait_appliers_close(THD *thd); -extern void wsrep_close_applier_threads(int count); -extern void wsrep_create_appliers(long threads = wsrep_slave_threads); -extern void wsrep_create_rollbacker(); -extern void wsrep_kill_mysql(THD *thd); - -/* new defines */ -extern void wsrep_stop_replication(THD *thd); -extern bool wsrep_start_replication(); -extern bool wsrep_causal_wait(THD* thd); -extern int wsrep_check_opts (int argc, char* const* argv); -extern void wsrep_prepend_PATH (const char* path); - -/* Other global variables */ -extern wsrep_seqno_t wsrep_locked_seqno; - -#define WSREP_ON \ - (global_system_variables.wsrep_on) - -#define WSREP(thd) \ - (WSREP_ON && (thd && thd->variables.wsrep_on)) - -#define WSREP_CLIENT(thd) \ - (WSREP(thd) && thd->wsrep_client_thread) - -#define WSREP_EMULATE_BINLOG(thd) \ - (WSREP(thd) && wsrep_emulate_bin_log) - -// MySQL logging functions don't seem to understand long long length modifer. -// This is a workaround. It also prefixes all messages with "WSREP" -#define WSREP_LOG(fun, ...) \ - { \ - char msg[1024] = {'\0'}; \ - snprintf(msg, sizeof(msg) - 1, ## __VA_ARGS__); \ - fun("WSREP: %s", msg); \ - } - -#define WSREP_DEBUG(...) \ - if (wsrep_debug) WSREP_LOG(sql_print_information, ##__VA_ARGS__) -#define WSREP_INFO(...) WSREP_LOG(sql_print_information, ##__VA_ARGS__) -#define WSREP_WARN(...) WSREP_LOG(sql_print_warning, ##__VA_ARGS__) -#define WSREP_ERROR(...) WSREP_LOG(sql_print_error, ##__VA_ARGS__) - -#define WSREP_LOG_CONFLICT_THD(thd, role) \ - WSREP_LOG(sql_print_information, \ - "%s: \n " \ - " THD: %lu, mode: %s, state: %s, conflict: %s, seqno: %lld\n " \ - " SQL: %s", \ - role, wsrep_thd_thread_id(thd), wsrep_thd_exec_mode_str(thd), \ - wsrep_thd_query_state_str(thd), \ - wsrep_thd_conflict_state_str(thd), (long long)wsrep_thd_trx_seqno(thd), \ - wsrep_thd_query(thd) \ - ); - -#define WSREP_LOG_CONFLICT(bf_thd, victim_thd, bf_abort) \ - if (wsrep_debug || wsrep_log_conflicts) \ - { \ - WSREP_LOG(sql_print_information, "cluster conflict due to %s for threads:",\ - (bf_abort) ? "high priority abort" : "certification failure" \ - ); \ - if (bf_thd) WSREP_LOG_CONFLICT_THD(bf_thd, "Winning thread"); \ - if (victim_thd) WSREP_LOG_CONFLICT_THD(victim_thd, "Victim thread"); \ - } - -/*! Synchronizes applier thread start with init thread */ -extern void wsrep_sst_grab(); -/*! Init thread waits for SST completion */ -extern bool wsrep_sst_wait(); -/*! Signals wsrep that initialization is complete, writesets can be applied */ -extern void wsrep_sst_continue(); - -extern void wsrep_SE_init_grab(); /*! grab init critical section */ -extern void wsrep_SE_init_wait(); /*! wait for SE init to complete */ -extern void wsrep_SE_init_done(); /*! signal that SE init is complte */ -extern void wsrep_SE_initialized(); /*! mark SE initialization complete */ - -extern void wsrep_ready_wait(); - -enum wsrep_trx_status { - WSREP_TRX_OK, - WSREP_TRX_ROLLBACK, - WSREP_TRX_ERROR, - }; - -extern enum wsrep_trx_status -wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all); -class Ha_trx_info; -struct THD_TRANS; -void wsrep_register_hton(THD* thd, bool all); - -void wsrep_replication_process(THD *thd); -void wsrep_rollback_process(THD *thd); -void wsrep_brute_force_killer(THD *thd); -int wsrep_hire_brute_force_killer(THD *thd, uint64_t trx_id); -extern "C" bool wsrep_consistency_check(void *thd_ptr); -extern "C" int wsrep_thd_is_brute_force(void *thd_ptr); -extern "C" int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr, - my_bool signal); -extern "C" int wsrep_thd_in_locking_session(void *thd_ptr); -void *wsrep_prepare_bf_thd(THD *thd); -void wsrep_return_from_bf_mode(void *shadow, THD *thd); - -/* this is visible for client build so that innodb plugin gets this */ -typedef struct wsrep_aborting_thd { - struct wsrep_aborting_thd *next; - THD *aborting_thd; -} *wsrep_aborting_thd_t; - -extern mysql_mutex_t LOCK_wsrep_ready; -extern mysql_cond_t COND_wsrep_ready; -extern mysql_mutex_t LOCK_wsrep_sst; -extern mysql_cond_t COND_wsrep_sst; -extern mysql_mutex_t LOCK_wsrep_sst_init; -extern mysql_cond_t COND_wsrep_sst_init; -extern mysql_mutex_t LOCK_wsrep_rollback; -extern mysql_cond_t COND_wsrep_rollback; -extern int wsrep_replaying; -extern mysql_mutex_t LOCK_wsrep_replaying; -extern mysql_cond_t COND_wsrep_replaying; -extern wsrep_aborting_thd_t wsrep_aborting_thd; -extern MYSQL_PLUGIN_IMPORT my_bool wsrep_debug; -extern my_bool wsrep_convert_LOCK_to_trx; -extern ulong wsrep_retry_autocommit; -extern my_bool wsrep_emulate_bin_log; -extern my_bool wsrep_auto_increment_control; -extern my_bool wsrep_drupal_282555_workaround; -extern long long wsrep_max_ws_size; -extern long wsrep_max_ws_rows; -extern int wsrep_to_isolation; -extern my_bool wsrep_certify_nonPK; -extern mysql_mutex_t LOCK_wsrep_slave_threads; - -extern PSI_mutex_key key_LOCK_wsrep_ready; -extern PSI_mutex_key key_COND_wsrep_ready; -extern PSI_mutex_key key_LOCK_wsrep_sst; -extern PSI_cond_key key_COND_wsrep_sst; -extern PSI_mutex_key key_LOCK_wsrep_sst_init; -extern PSI_cond_key key_COND_wsrep_sst_init; -extern PSI_mutex_key key_LOCK_wsrep_sst_thread; -extern PSI_cond_key key_COND_wsrep_sst_thread; -extern PSI_mutex_key key_LOCK_wsrep_rollback; -extern PSI_cond_key key_COND_wsrep_rollback; -extern PSI_mutex_key key_LOCK_wsrep_replaying; -extern PSI_cond_key key_COND_wsrep_replaying; -extern PSI_mutex_key key_LOCK_wsrep_slave_threads; - -struct TABLE_LIST; -int wsrep_to_isolation_begin(THD *thd, char *db_, char *table_, - const TABLE_LIST* table_list); -void wsrep_to_isolation_end(THD *thd); - -void wsrep_prepare_bf_thd(THD *thd, struct wsrep_thd_shadow*); -void wsrep_return_from_bf_mode(THD *thd, struct wsrep_thd_shadow*); -int wsrep_to_buf_helper( - THD* thd, const char *query, uint query_len, uchar** buf, uint* buf_len); -int wsrep_create_sp(THD *thd, uchar** buf, uint* buf_len); -int wsrep_create_trigger_query(THD *thd, uchar** buf, uint* buf_len); -int wsrep_create_event_query(THD *thd, uchar** buf, uint* buf_len); - -const wsrep_uuid_t* wsrep_cluster_uuid(); -struct xid_t; -void wsrep_set_SE_checkpoint(xid_t*); - -void wsrep_xid_init(xid_t*, const wsrep_uuid_t*, wsrep_seqno_t); -const wsrep_uuid_t* wsrep_xid_uuid(const xid_t*); -wsrep_seqno_t wsrep_xid_seqno(const xid_t*); -extern "C" int wsrep_is_wsrep_xid(const void* xid); - -#endif /* WSREP_MYSQLD_H */ diff --git a/sql/wsrep_notify.cc b/sql/wsrep_notify.cc deleted file mode 100644 index ff997d01183..00000000000 --- a/sql/wsrep_notify.cc +++ /dev/null @@ -1,107 +0,0 @@ -/* Copyright 2010 Codership Oy <http://www.codership.com> - - This program is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; version 2 of the License. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ - -#include <mysqld.h> -#include "wsrep_priv.h" - -const char* wsrep_notify_cmd=""; - -static const char* _status_str(wsrep_member_status_t status) -{ - switch (status) - { - case WSREP_MEMBER_UNDEFINED: return "Undefined"; - case WSREP_MEMBER_JOINER: return "Joiner"; - case WSREP_MEMBER_DONOR: return "Donor"; - case WSREP_MEMBER_JOINED: return "Joined"; - case WSREP_MEMBER_SYNCED: return "Synced"; - default: return "Error(?)"; - } -} - -void wsrep_notify_status (wsrep_member_status_t status, - const wsrep_view_info_t* view) -{ - if (!wsrep_notify_cmd || 0 == strlen(wsrep_notify_cmd)) - { - WSREP_INFO("wsrep_notify_cmd is not defined, skipping notification."); - return; - } - - char cmd_buf[1 << 16]; // this can be long - long cmd_len = sizeof(cmd_buf) - 1; - char* cmd_ptr = cmd_buf; - long cmd_off = 0; - - cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off, "%s", - wsrep_notify_cmd); - - if (status >= WSREP_MEMBER_UNDEFINED && status < WSREP_MEMBER_ERROR) - { - cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off, " --status %s", - _status_str(status)); - } - else - { - /* here we preserve provider error codes */ - cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off, - " --status 'Error(%d)'", status); - } - - if (0 != view) - { - char uuid_str[40]; - - wsrep_uuid_print (&view->uuid, uuid_str, sizeof(uuid_str)); - cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off, - " --uuid %s", uuid_str); - - cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off, - " --primary %s", view->view >= 0 ? "yes" : "no"); - - cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off, - " --index %d", view->my_idx); - - cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off, " --members"); - - for (int i = 0; i < view->memb_num; i++) - { - wsrep_uuid_print (&view->members[i].id, uuid_str, sizeof(uuid_str)); - cmd_off += snprintf (cmd_ptr + cmd_off, cmd_len - cmd_off, - "%c%s/%s/%s", i > 0 ? ',' : ' ', - uuid_str, view->members[i].name, - view->members[i].incoming); - } - } - - if (cmd_off == cmd_len) - { - WSREP_ERROR("Notification buffer too short (%ld). Aborting notification.", - cmd_len); - return; - } - - wsp::process p(cmd_ptr, "r"); - - p.wait(); - int err = p.error(); - - if (err) - { - WSREP_ERROR("Notification command failed: %d (%s): \"%s\"", - err, strerror(err), cmd_ptr); - } -} - diff --git a/sql/wsrep_priv.h b/sql/wsrep_priv.h deleted file mode 100644 index 700639ebcb1..00000000000 --- a/sql/wsrep_priv.h +++ /dev/null @@ -1,233 +0,0 @@ -/* Copyright 2010 Codership Oy <http://www.codership.com> - - This program is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; version 2 of the License. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - */ - -//! @file declares symbols private to wsrep integration layer - -#ifndef WSREP_PRIV_H -#define WSREP_PRIV_H - -#include "wsrep_mysqld.h" -#include "../wsrep/wsrep_api.h" - -#include <log.h> -#include <pthread.h> -#include <cstdio> - -extern void wsrep_ready_set (my_bool x); - -extern ssize_t wsrep_sst_prepare (void** msg); -extern int wsrep_sst_donate_cb (void* app_ctx, - void* recv_ctx, - const void* msg, size_t msg_len, - const wsrep_uuid_t* current_uuid, - wsrep_seqno_t current_seqno, - const char* state, size_t state_len, - bool bypass); - -extern size_t guess_ip (char* buf, size_t buf_len); -extern size_t guess_address(char* buf, size_t buf_len); - -extern wsrep_uuid_t local_uuid; -extern wsrep_seqno_t local_seqno; - -/*! SST thread signals init thread about sst completion */ -extern void wsrep_sst_complete(const wsrep_uuid_t* uuid, wsrep_seqno_t, bool); - -extern void wsrep_notify_status (wsrep_member_status_t new_status, - const wsrep_view_info_t* view = 0); - -namespace wsp { -class node_status -{ -public: - node_status() : status(WSREP_MEMBER_UNDEFINED) {} - void set(wsrep_member_status_t new_status, - const wsrep_view_info_t* view = 0) - { - if (status != new_status || 0 != view) - { - wsrep_notify_status(new_status, view); - status = new_status; - } - } - wsrep_member_status_t get() const { return status; } -private: - wsrep_member_status_t status; -}; -} /* namespace wsp */ - -extern wsp::node_status local_status; - -namespace wsp { -/* A small class to run external programs. */ -class process -{ -private: - const char* const str_; - FILE* io_; - int err_; - pid_t pid_; - -public: -/*! @arg type is a pointer to a null-terminated string which must contain - either the letter 'r' for reading or the letter 'w' for writing. - */ - process (const char* cmd, const char* type); - ~process (); - - FILE* pipe () { return io_; } - int error() { return err_; } - int wait (); - const char* cmd() { return str_; } -}; -#ifdef REMOVED -class lock -{ - pthread_mutex_t* const mtx_; - -public: - - lock (pthread_mutex_t* mtx) : mtx_(mtx) - { - int err = pthread_mutex_lock (mtx_); - - if (err) - { - WSREP_ERROR("Mutex lock failed: %s", strerror(err)); - abort(); - } - } - - virtual ~lock () - { - int err = pthread_mutex_unlock (mtx_); - - if (err) - { - WSREP_ERROR("Mutex unlock failed: %s", strerror(err)); - abort(); - } - } - - inline void wait (pthread_cond_t* cond) - { - pthread_cond_wait (cond, mtx_); - } - -private: - - lock (const lock&); - lock& operator=(const lock&); - -}; - -class monitor -{ - int mutable refcnt; - pthread_mutex_t mutable mtx; - pthread_cond_t mutable cond; - -public: - - monitor() : refcnt(0) - { - pthread_mutex_init (&mtx, NULL); - pthread_cond_init (&cond, NULL); - } - - ~monitor() - { - pthread_mutex_destroy (&mtx); - pthread_cond_destroy (&cond); - } - - void enter() const - { - lock l(&mtx); - - while (refcnt) - { - l.wait(&cond); - } - refcnt++; - } - - void leave() const - { - lock l(&mtx); - - refcnt--; - if (refcnt == 0) - { - pthread_cond_signal (&cond); - } - } - -private: - - monitor (const monitor&); - monitor& operator= (const monitor&); -}; - -class critical -{ - const monitor& mon; - -public: - - critical(const monitor& m) : mon(m) { mon.enter(); } - - ~critical() { mon.leave(); } - -private: - - critical (const critical&); - critical& operator= (const critical&); -}; -#endif - -class thd -{ - class thd_init - { - public: - thd_init() { my_thread_init(); } - ~thd_init() { my_thread_end(); } - } - init; - - thd (const thd&); - thd& operator= (const thd&); - -public: - - thd(my_bool wsrep_on); - ~thd(); - THD* const ptr; -}; - -class string -{ -public: - string() : string_(0) {} - void set(char* str) { if (string_) free (string_); string_ = str; } - ~string() { set (0); } -private: - char* string_; -}; - -} // namespace wsrep -#endif /* WSREP_PRIV_H */ diff --git a/sql/wsrep_sst.cc b/sql/wsrep_sst.cc deleted file mode 100644 index 597d0ea087d..00000000000 --- a/sql/wsrep_sst.cc +++ /dev/null @@ -1,1001 +0,0 @@ -/* Copyright 2008-2012 Codership Oy <http://www.codership.com> - - This program is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; version 2 of the License. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ - -#include <mysqld.h> -#include <sql_class.h> -#include <set_var.h> -#include <sql_acl.h> -#include <sql_reload.h> -#include <sql_parse.h> -#include "wsrep_priv.h" -#include <cstdio> -#include <cstdlib> - -extern const char wsrep_defaults_file[]; - -#define WSREP_SST_OPT_ROLE "--role" -#define WSREP_SST_OPT_ADDR "--address" -#define WSREP_SST_OPT_AUTH "--auth" -#define WSREP_SST_OPT_DATA "--datadir" -#define WSREP_SST_OPT_CONF "--defaults-file" -#define WSREP_SST_OPT_PARENT "--parent" - -// mysqldump-specific options -#define WSREP_SST_OPT_USER "--user" -#define WSREP_SST_OPT_PSWD "--password" -#define WSREP_SST_OPT_HOST "--host" -#define WSREP_SST_OPT_PORT "--port" -#define WSREP_SST_OPT_LPORT "--local-port" - -// donor-specific -#define WSREP_SST_OPT_SOCKET "--socket" -#define WSREP_SST_OPT_GTID "--gtid" -#define WSREP_SST_OPT_BYPASS "--bypass" - -#define WSREP_SST_MYSQLDUMP "mysqldump" -#define WSREP_SST_SKIP "skip" -#define WSREP_SST_DEFAULT WSREP_SST_MYSQLDUMP -#define WSREP_SST_ADDRESS_AUTO "AUTO" -#define WSREP_SST_AUTH_MASK "********" - -const char* wsrep_sst_method = WSREP_SST_DEFAULT; -const char* wsrep_sst_receive_address = WSREP_SST_ADDRESS_AUTO; -const char* wsrep_sst_donor = ""; - char* wsrep_sst_auth = NULL; - -// container for real auth string -static const char* sst_auth_real = NULL; - -my_bool wsrep_sst_donor_rejects_queries = FALSE; - -bool wsrep_sst_method_check (sys_var *self, THD* thd, set_var* var) -{ - char buff[FN_REFLEN]; - String str(buff, sizeof(buff), system_charset_info), *res; - const char* c_str = NULL; - - if ((res = var->value->val_str(&str)) && - (c_str = res->c_ptr()) && - strlen(c_str) > 0) - return 0; - - my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), "wsrep_sst_method", c_str ? c_str : "NULL"); - return 1; -} - -bool wsrep_sst_method_update (sys_var *self, THD* thd, enum_var_type type) -{ - return 0; -} - -static bool sst_receive_address_check (const char* str) -{ - if (!strncasecmp(str, "127.0.0.1", strlen("127.0.0.1")) || - !strncasecmp(str, "localhost", strlen("localhost"))) - { - return 1; - } - - return 0; -} - -bool wsrep_sst_receive_address_check (sys_var *self, THD* thd, set_var* var) -{ - const char* c_str = var->value->str_value.c_ptr(); - - if (sst_receive_address_check (c_str)) - { - my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), "wsrep_sst_receive_address", c_str ? c_str : "NULL"); - return 1; - } - - return 0; -} - -bool wsrep_sst_receive_address_update (sys_var *self, THD* thd, - enum_var_type type) -{ - return 0; -} - -bool wsrep_sst_auth_check (sys_var *self, THD* thd, set_var* var) -{ - return 0; -} -static bool sst_auth_real_set (const char* value) -{ - const char* v = strdup (value); - - if (v) - { - if (sst_auth_real) free (const_cast<char*>(sst_auth_real)); - sst_auth_real = v; - - if (strlen(sst_auth_real)) - { - if (wsrep_sst_auth) - { - my_free ((void*)wsrep_sst_auth); - wsrep_sst_auth = my_strdup(WSREP_SST_AUTH_MASK, MYF(0)); - //strncpy (wsrep_sst_auth, WSREP_SST_AUTH_MASK, - // sizeof(wsrep_sst_auth) - 1); - } - else - wsrep_sst_auth = my_strdup (WSREP_SST_AUTH_MASK, MYF(0)); - } - return 0; - } - - return 1; -} - -bool wsrep_sst_auth_update (sys_var *self, THD* thd, enum_var_type type) -{ - return sst_auth_real_set (wsrep_sst_auth); -} - -void wsrep_sst_auth_init (const char* value) -{ - if (wsrep_sst_auth == value) wsrep_sst_auth = NULL; - if (value) sst_auth_real_set (value); -} - -bool wsrep_sst_donor_check (sys_var *self, THD* thd, set_var* var) -{ - return 0; -} - -bool wsrep_sst_donor_update (sys_var *self, THD* thd, enum_var_type type) -{ - return 0; -} - -static wsrep_uuid_t cluster_uuid = WSREP_UUID_UNDEFINED; - -bool wsrep_before_SE() -{ - return (wsrep_provider != NULL - && strcmp (wsrep_provider, WSREP_NONE) - && strcmp (wsrep_sst_method, WSREP_SST_SKIP) - && strcmp (wsrep_sst_method, WSREP_SST_MYSQLDUMP)); -} - -static bool sst_complete = false; -static bool sst_needed = false; - -void wsrep_sst_grab () -{ - WSREP_INFO("wsrep_sst_grab()"); - if (mysql_mutex_lock (&LOCK_wsrep_sst)) abort(); - sst_complete = false; - mysql_mutex_unlock (&LOCK_wsrep_sst); -} - -// Wait for end of SST -bool wsrep_sst_wait () -{ - if (mysql_mutex_lock (&LOCK_wsrep_sst)) abort(); - while (!sst_complete) - { - WSREP_INFO("Waiting for SST to complete."); - mysql_cond_wait (&COND_wsrep_sst, &LOCK_wsrep_sst); - } - - if (local_seqno >= 0) - { - WSREP_INFO("SST complete, seqno: %lld", (long long) local_seqno); - } - else - { - WSREP_ERROR("SST failed: %d (%s)", - int(-local_seqno), strerror(-local_seqno)); - } - - mysql_mutex_unlock (&LOCK_wsrep_sst); - - return (local_seqno >= 0); -} - -// Signal end of SST -void wsrep_sst_complete (const wsrep_uuid_t* sst_uuid, - wsrep_seqno_t sst_seqno, - bool needed) -{ - if (mysql_mutex_lock (&LOCK_wsrep_sst)) abort(); - if (!sst_complete) - { - sst_complete = true; - sst_needed = needed; - local_uuid = *sst_uuid; - local_seqno = sst_seqno; - mysql_cond_signal (&COND_wsrep_sst); - } - else - { - WSREP_WARN("Nobody is waiting for SST."); - } - mysql_mutex_unlock (&LOCK_wsrep_sst); -} - -// Let applier threads to continue -void wsrep_sst_continue () -{ - if (sst_needed) - { - WSREP_INFO("Signalling provider to continue."); - wsrep->sst_received (wsrep, &local_uuid, local_seqno, NULL, 0); - } -} - -struct sst_thread_arg -{ - const char* cmd; - int err; - char* ret_str; - mysql_mutex_t lock; - mysql_cond_t cond; - - sst_thread_arg (const char* c) : cmd(c), err(-1), ret_str(0) - { - mysql_mutex_init(key_LOCK_wsrep_sst_thread, &lock, MY_MUTEX_INIT_FAST); - mysql_cond_init(key_COND_wsrep_sst_thread, &cond, NULL); - } - - ~sst_thread_arg() - { - mysql_cond_destroy (&cond); - mysql_mutex_unlock (&lock); - mysql_mutex_destroy (&lock); - } -}; - -static int sst_scan_uuid_seqno (const char* str, - wsrep_uuid_t* uuid, wsrep_seqno_t* seqno) -{ - int offt = wsrep_uuid_scan (str, strlen(str), uuid); - if (offt > 0 && strlen(str) > (unsigned int)offt && ':' == str[offt]) - { - *seqno = strtoll (str + offt + 1, NULL, 10); - if (*seqno != LLONG_MAX || errno != ERANGE) - { - return 0; - } - } - - WSREP_ERROR("Failed to parse uuid:seqno pair: '%s'", str); - return EINVAL; -} - -// get rid of trailing \n -static char* my_fgets (char* buf, size_t buf_len, FILE* stream) -{ - char* ret= fgets (buf, buf_len, stream); - - if (ret) - { - size_t len = strlen(ret); - if (len > 0 && ret[len - 1] == '\n') ret[len - 1] = '\0'; - } - - return ret; -} - -static void* sst_joiner_thread (void* a) -{ - sst_thread_arg* arg= (sst_thread_arg*) a; - int err= 1; - - { - const char magic[] = "ready"; - const size_t magic_len = sizeof(magic) - 1; - const size_t out_len = 512; - char out[out_len]; - - WSREP_INFO("Running: '%s'", arg->cmd); - - wsp::process proc (arg->cmd, "r"); - - if (proc.pipe() && !proc.error()) - { - const char* tmp= my_fgets (out, out_len, proc.pipe()); - - if (!tmp || strlen(tmp) < (magic_len + 2) || - strncasecmp (tmp, magic, magic_len)) - { - WSREP_ERROR("Failed to read '%s <addr>' from: %s\n\tRead: '%s'", - magic, arg->cmd, tmp); - proc.wait(); - if (proc.error()) err = proc.error(); - } - else - { - err = 0; - } - } - else - { - err = proc.error(); - WSREP_ERROR("Failed to execute: %s : %d (%s)", - arg->cmd, err, strerror(err)); - } - - // signal sst_prepare thread with ret code, - // it will go on sending SST request - mysql_mutex_lock (&arg->lock); - if (!err) - { - arg->ret_str = strdup (out + magic_len + 1); - if (!arg->ret_str) err = ENOMEM; - } - arg->err = -err; - mysql_cond_signal (&arg->cond); - mysql_mutex_unlock (&arg->lock); //! @note arg is unusable after that. - - if (err) return NULL; /* lp:808417 - return immediately, don't signal - * initializer thread to ensure single thread of - * shutdown. */ - - wsrep_uuid_t ret_uuid = WSREP_UUID_UNDEFINED; - wsrep_seqno_t ret_seqno = WSREP_SEQNO_UNDEFINED; - - // in case of successfull receiver start, wait for SST completion/end - char* tmp = my_fgets (out, out_len, proc.pipe()); - - proc.wait(); - err= EINVAL; - - if (!tmp) - { - WSREP_ERROR("Failed to read uuid:seqno from joiner script."); - if (proc.error()) err = proc.error(); - } - else - { - err= sst_scan_uuid_seqno (out, &ret_uuid, &ret_seqno); - } - - if (err) - { - ret_uuid= WSREP_UUID_UNDEFINED; - ret_seqno= -err; - } - - // Tell initializer thread that SST is complete - wsrep_sst_complete (&ret_uuid, ret_seqno, true); - } - - return NULL; -} - -static ssize_t sst_prepare_other (const char* method, - const char* addr_in, - const char** addr_out) -{ - ssize_t cmd_len= 1024; - char cmd_str[cmd_len]; - const char* sst_dir= mysql_real_data_home; - - int ret= snprintf (cmd_str, cmd_len, - "wsrep_sst_%s " - WSREP_SST_OPT_ROLE" 'joiner' " - WSREP_SST_OPT_ADDR" '%s' " - WSREP_SST_OPT_AUTH" '%s' " - WSREP_SST_OPT_DATA" '%s' " - WSREP_SST_OPT_CONF" '%s' " - WSREP_SST_OPT_PARENT" '%d'", - method, addr_in, (sst_auth_real) ? sst_auth_real : "", - sst_dir, wsrep_defaults_file, (int)getpid()); - - if (ret < 0 || ret >= cmd_len) - { - WSREP_ERROR("sst_prepare_other(): snprintf() failed: %d", ret); - return (ret < 0 ? ret : -EMSGSIZE); - } - - pthread_t tmp; - sst_thread_arg arg(cmd_str); - mysql_mutex_lock (&arg.lock); - ret = pthread_create (&tmp, NULL, sst_joiner_thread, &arg); - if (ret) - { - WSREP_ERROR("sst_prepare_other(): pthread_create() failed: %d (%s)", - ret, strerror(ret)); - return ret; - } - mysql_cond_wait (&arg.cond, &arg.lock); - - *addr_out= arg.ret_str; - - if (!arg.err) - ret = strlen(*addr_out); - else - { - assert (arg.err < 0); - ret = arg.err; - } - - pthread_detach (tmp); - - return ret; -} - -//extern ulong my_bind_addr; -extern uint mysqld_port; - -/*! Just tells donor where to send mysqldump */ -static ssize_t sst_prepare_mysqldump (const char* addr_in, - const char** addr_out) -{ - ssize_t ret = strlen (addr_in); - - if (!strrchr(addr_in, ':')) - { - ssize_t s = ret + 7; - char* tmp = (char*) malloc (s); - - if (tmp) - { - ret= snprintf (tmp, s, "%s:%u", addr_in, mysqld_port); - - if (ret > 0 && ret < s) - { - *addr_out= tmp; - return ret; - } - if (ret > 0) /* buffer too short */ ret = -EMSGSIZE; - free (tmp); - } - else { - ret= -ENOMEM; - } - - WSREP_ERROR ("Could not prepare state transfer request: " - "adding default port failed: %zd.", ret); - } - else { - *addr_out= addr_in; - } - - return ret; -} - -static bool SE_initialized = false; - -ssize_t wsrep_sst_prepare (void** msg) -{ - const ssize_t ip_max= 256; - char ip_buf[ip_max]; - const char* addr_in= NULL; - const char* addr_out= NULL; - - if (!strcmp(wsrep_sst_method, WSREP_SST_SKIP)) - { - ssize_t ret = strlen(WSREP_STATE_TRANSFER_TRIVIAL) + 1; - *msg = strdup(WSREP_STATE_TRANSFER_TRIVIAL); - if (!msg) - { - WSREP_ERROR("Could not allocate %zd bytes for state request", ret); - unireg_abort(1); - } - return ret; - } - - // Figure out SST address. Common for all SST methods - if (wsrep_sst_receive_address && - strcmp (wsrep_sst_receive_address, WSREP_SST_ADDRESS_AUTO)) - { - addr_in= wsrep_sst_receive_address; - } - else if (wsrep_node_address && strlen(wsrep_node_address)) - { - const char* const colon= strchr (wsrep_node_address, ':'); - if (colon) - { - ptrdiff_t const len= colon - wsrep_node_address; - strncpy (ip_buf, wsrep_node_address, len); - ip_buf[len]= '\0'; - addr_in= ip_buf; - } - else - { - addr_in= wsrep_node_address; - } - } - else - { - ssize_t ret= guess_ip (ip_buf, ip_max); - - if (ret && ret < ip_max) - { - addr_in= ip_buf; - } - else - { - WSREP_ERROR("Could not prepare state transfer request: " - "failed to guess address to accept state transfer at. " - "wsrep_sst_receive_address must be set manually."); - unireg_abort(1); - } - } - - ssize_t addr_len= -ENOSYS; - if (!strcmp(wsrep_sst_method, WSREP_SST_MYSQLDUMP)) - { - addr_len= sst_prepare_mysqldump (addr_in, &addr_out); - if (addr_len < 0) unireg_abort(1); - } - else - { - /*! A heuristic workaround until we learn how to stop and start engines */ - if (SE_initialized) - { - // we already did SST at initializaiton, now engines are running - // sql_print_information() is here because the message is too long - // for WSREP_INFO. - sql_print_information ("WSREP: " - "You have configured '%s' state snapshot transfer method " - "which cannot be performed on a running server. " - "Wsrep provider won't be able to fall back to it " - "if other means of state transfer are unavailable. " - "In that case you will need to restart the server.", - wsrep_sst_method); - *msg = 0; - return 0; - } - - addr_len = sst_prepare_other (wsrep_sst_method, addr_in, &addr_out); - if (addr_len < 0) - { - WSREP_ERROR("Failed to prepare for '%s' SST. Unrecoverable.", - wsrep_sst_method); - unireg_abort(1); - } - } - - size_t const method_len(strlen(wsrep_sst_method)); - size_t const msg_len (method_len + addr_len + 2 /* + auth_len + 1*/); - - *msg = malloc (msg_len); - if (NULL != *msg) { - char* const method_ptr(reinterpret_cast<char*>(*msg)); - strcpy (method_ptr, wsrep_sst_method); - char* const addr_ptr(method_ptr + method_len + 1); - strcpy (addr_ptr, addr_out); - - WSREP_INFO ("Prepared SST request: %s|%s", method_ptr, addr_ptr); - } - else { - WSREP_ERROR("Failed to allocate SST request of size %zu. Can't continue.", - msg_len); - unireg_abort(1); - } - - if (addr_out != addr_in) /* malloc'ed */ free ((char*)addr_out); - - return msg_len; -} - -// helper method for donors -static int sst_run_shell (const char* cmd_str, int max_tries) -{ - int ret = 0; - - for (int tries=1; tries <= max_tries; tries++) - { - wsp::process proc (cmd_str, "r"); - - if (NULL != proc.pipe()) - { - proc.wait(); - } - - if ((ret = proc.error())) - { - WSREP_ERROR("Try %d/%d: '%s' failed: %d (%s)", - tries, max_tries, proc.cmd(), ret, strerror(ret)); - sleep (1); - } - else - { - WSREP_DEBUG("SST script successfully completed."); - break; - } - } - - return -ret; -} - -static void sst_reject_queries(my_bool close_conn) -{ - wsrep_ready_set (FALSE); // this will be resotred when donor becomes synced - WSREP_INFO("Rejecting client queries for the duration of SST."); - if (TRUE == close_conn) wsrep_close_client_connections(FALSE); -} - -static int sst_mysqldump_check_addr (const char* user, const char* pswd, - const char* host, const char* port) -{ - return 0; -} - -static int sst_donate_mysqldump (const char* addr, - const wsrep_uuid_t* uuid, - const char* uuid_str, - wsrep_seqno_t seqno, - bool bypass) -{ - size_t host_len; - const char* port = strchr (addr, ':'); - - if (port) - { - port += 1; - host_len = port - addr; - } - else - { - port = ""; - host_len = strlen (addr) + 1; - } - - char host[host_len]; - - strncpy (host, addr, host_len - 1); - host[host_len - 1] = '\0'; - - const char* auth = sst_auth_real; - const char* pswd = (auth) ? strchr (auth, ':') : NULL; - size_t user_len; - - if (pswd) - { - pswd += 1; - user_len = pswd - auth; - } - else - { - pswd = ""; - user_len = (auth) ? strlen (auth) + 1 : 1; - } - - char user[user_len]; - - strncpy (user, (auth) ? auth : "", user_len - 1); - user[user_len - 1] = '\0'; - - int ret = sst_mysqldump_check_addr (user, pswd, host, port); - if (!ret) - { - size_t cmd_len= 1024; - char cmd_str[cmd_len]; - - if (!bypass && wsrep_sst_donor_rejects_queries) sst_reject_queries(TRUE); - - snprintf (cmd_str, cmd_len, - "wsrep_sst_mysqldump " - WSREP_SST_OPT_USER" '%s' " - WSREP_SST_OPT_PSWD" '%s' " - WSREP_SST_OPT_HOST" '%s' " - WSREP_SST_OPT_PORT" '%s' " - WSREP_SST_OPT_LPORT" '%u' " - WSREP_SST_OPT_SOCKET" '%s' " - WSREP_SST_OPT_GTID" '%s:%lld'" - "%s", - user, pswd, host, port, mysqld_port, mysqld_unix_port, uuid_str, - (long long)seqno, bypass ? " "WSREP_SST_OPT_BYPASS : ""); - - WSREP_DEBUG("Running: '%s'", cmd_str); - - ret= sst_run_shell (cmd_str, 3); - } - - wsrep->sst_sent (wsrep, uuid, ret ? ret : seqno); - - return ret; -} - -wsrep_seqno_t wsrep_locked_seqno= WSREP_SEQNO_UNDEFINED; - -static int run_sql_command(THD *thd, const char *query) -{ - thd->set_query((char *)query, strlen(query)); - - Parser_state ps; - if (ps.init(thd, thd->query(), thd->query_length())) - { - WSREP_ERROR("SST query: %s failed", query); - return -1; - } - - mysql_parse(thd, thd->query(), thd->query_length(), &ps); - if (thd->is_error()) - { - int const err= thd->stmt_da->sql_errno(); - WSREP_WARN ("error executing '%s': %d (%s)%s", - query, err, thd->stmt_da->message(), - err == ER_UNKNOWN_SYSTEM_VARIABLE ? - ". Was mysqld built with --with-innodb-disallow-writes ?" : ""); - thd->clear_error(); - return -1; - } - return 0; -} - -static int sst_flush_tables(THD* thd) -{ - WSREP_INFO("Flushing tables for SST..."); - - int err; - int not_used; - if (run_sql_command(thd, "FLUSH TABLES WITH READ LOCK")) - { - WSREP_ERROR("Failed to flush and lock tables"); - err = -1; - } - else - { - /* make sure logs are flushed after global read lock acquired */ - err= reload_acl_and_cache(thd, REFRESH_ENGINE_LOG, - (TABLE_LIST*) 0, ¬_used); - } - - if (err) - { - WSREP_ERROR("Failed to flush tables: %d (%s)", err, strerror(err)); - } - else - { - WSREP_INFO("Tables flushed."); - const char base_name[]= "tables_flushed"; - ssize_t const full_len= strlen(mysql_real_data_home) + strlen(base_name)+2; - char real_name[full_len]; - sprintf(real_name, "%s/%s", mysql_real_data_home, base_name); - char tmp_name[full_len + 4]; - sprintf(tmp_name, "%s.tmp", real_name); - - FILE* file= fopen(tmp_name, "w+"); - if (0 == file) - { - err= errno; - WSREP_ERROR("Failed to open '%s': %d (%s)", tmp_name, err,strerror(err)); - } - else - { - fprintf(file, "%s:%lld\n", - wsrep_cluster_state_uuid, (long long)wsrep_locked_seqno); - fsync(fileno(file)); - fclose(file); - if (rename(tmp_name, real_name) == -1) - { - err= errno; - WSREP_ERROR("Failed to rename '%s' to '%s': %d (%s)", - tmp_name, real_name, err,strerror(err)); - } - } - } - - return err; -} - -static void sst_disallow_writes (THD* thd, bool yes) -{ - char query_str[64] = { 0, }; - ssize_t const query_max = sizeof(query_str) - 1; - snprintf (query_str, query_max, "SET GLOBAL innodb_disallow_writes=%d", - yes ? 1 : 0); - - if (run_sql_command(thd, query_str)) - { - WSREP_ERROR("Failed to disallow InnoDB writes"); - } -} - -static void* sst_donor_thread (void* a) -{ - sst_thread_arg* arg= (sst_thread_arg*)a; - - WSREP_INFO("Running: '%s'", arg->cmd); - - int err= 1; - bool locked= false; - - const char* out= NULL; - const size_t out_len= 128; - char out_buf[out_len]; - - wsrep_uuid_t ret_uuid= WSREP_UUID_UNDEFINED; - wsrep_seqno_t ret_seqno= WSREP_SEQNO_UNDEFINED; // seqno of complete SST - - wsp::thd thd(FALSE); // we turn off wsrep_on for this THD so that it can - // operate with wsrep_ready == OFF - wsp::process proc(arg->cmd, "r"); - - err= proc.error(); - -/* Inform server about SST script startup and release TO isolation */ - mysql_mutex_lock (&arg->lock); - arg->err = -err; - mysql_cond_signal (&arg->cond); - mysql_mutex_unlock (&arg->lock); //! @note arg is unusable after that. - - if (proc.pipe() && !err) - { -wait_signal: - out= my_fgets (out_buf, out_len, proc.pipe()); - - if (out) - { - const char magic_flush[]= "flush tables"; - const char magic_cont[]= "continue"; - const char magic_done[]= "done"; - - if (!strcasecmp (out, magic_flush)) - { - err= sst_flush_tables (thd.ptr); - if (!err) - { - sst_disallow_writes (thd.ptr, true); - locked= true; - goto wait_signal; - } - } - else if (!strcasecmp (out, magic_cont)) - { - if (locked) - { - sst_disallow_writes (thd.ptr, false); - thd.ptr->global_read_lock.unlock_global_read_lock (thd.ptr); - locked= false; - } - err= 0; - goto wait_signal; - } - else if (!strncasecmp (out, magic_done, strlen(magic_done))) - { - err= sst_scan_uuid_seqno (out + strlen(magic_done) + 1, - &ret_uuid, &ret_seqno); - } - else - { - WSREP_WARN("Received unknown signal: '%s'", out); - } - } - else - { - WSREP_ERROR("Failed to read from: %s", proc.cmd()); - } - if (err && proc.error()) err= proc.error(); - } - else - { - WSREP_ERROR("Failed to execute: %s : %d (%s)", - proc.cmd(), err, strerror(err)); - } - - if (locked) // don't forget to unlock server before return - { - sst_disallow_writes (thd.ptr, false); - thd.ptr->global_read_lock.unlock_global_read_lock (thd.ptr); - } - - // signal to donor that SST is over - wsrep->sst_sent (wsrep, &ret_uuid, err ? -err : ret_seqno); - proc.wait(); - - return NULL; -} - -static int sst_donate_other (const char* method, - const char* addr, - const char* uuid, - wsrep_seqno_t seqno, - bool bypass) -{ - ssize_t cmd_len = 4096; - char cmd_str[cmd_len]; - - int ret= snprintf (cmd_str, cmd_len, - "wsrep_sst_%s " - WSREP_SST_OPT_ROLE" 'donor' " - WSREP_SST_OPT_ADDR" '%s' " - WSREP_SST_OPT_AUTH" '%s' " - WSREP_SST_OPT_SOCKET" '%s' " - WSREP_SST_OPT_DATA" '%s' " - WSREP_SST_OPT_CONF" '%s' " - WSREP_SST_OPT_GTID" '%s:%lld'" - "%s", - method, addr, sst_auth_real, mysqld_unix_port, - mysql_real_data_home, wsrep_defaults_file, - uuid, (long long) seqno, - bypass ? " "WSREP_SST_OPT_BYPASS : ""); - - if (ret < 0 || ret >= cmd_len) - { - WSREP_ERROR("sst_donate_other(): snprintf() failed: %d", ret); - return (ret < 0 ? ret : -EMSGSIZE); - } - - if (!bypass && wsrep_sst_donor_rejects_queries) sst_reject_queries(FALSE); - - pthread_t tmp; - sst_thread_arg arg(cmd_str); - mysql_mutex_lock (&arg.lock); - ret = pthread_create (&tmp, NULL, sst_donor_thread, &arg); - if (ret) - { - WSREP_ERROR("sst_donate_other(): pthread_create() failed: %d (%s)", - ret, strerror(ret)); - return ret; - } - mysql_cond_wait (&arg.cond, &arg.lock); - - WSREP_INFO("sst_donor_thread signaled with %d", arg.err); - return arg.err; -} - -int wsrep_sst_donate_cb (void* app_ctx, void* recv_ctx, - const void* msg, size_t msg_len, - const wsrep_uuid_t* current_uuid, - wsrep_seqno_t current_seqno, - const char* state, size_t state_len, - bool bypass) -{ - /* This will be reset when sync callback is called. - * Should we set wsrep_ready to FALSE here too? */ -// wsrep_notify_status(WSREP_MEMBER_DONOR); - local_status.set(WSREP_MEMBER_DONOR); - - const char* method = (char*)msg; - size_t method_len = strlen (method); - const char* data = method + method_len + 1; - - char uuid_str[37]; - wsrep_uuid_print (current_uuid, uuid_str, sizeof(uuid_str)); - - int ret; - if (!strcmp (WSREP_SST_MYSQLDUMP, method)) - { - ret = sst_donate_mysqldump (data, current_uuid, uuid_str, current_seqno, - bypass); - } - else - { - ret = sst_donate_other (method, data, uuid_str, current_seqno, bypass); - } - - return (ret > 0 ? 0 : ret); -} - -void wsrep_SE_init_grab() -{ - if (mysql_mutex_lock (&LOCK_wsrep_sst_init)) abort(); -} - -void wsrep_SE_init_wait() -{ - mysql_cond_wait (&COND_wsrep_sst_init, &LOCK_wsrep_sst_init); - mysql_mutex_unlock (&LOCK_wsrep_sst_init); -} - -void wsrep_SE_init_done() -{ - mysql_cond_signal (&COND_wsrep_sst_init); - mysql_mutex_unlock (&LOCK_wsrep_sst_init); -} - -void wsrep_SE_initialized() -{ - SE_initialized = true; -} diff --git a/sql/wsrep_utils.cc b/sql/wsrep_utils.cc deleted file mode 100644 index daba0e4cab2..00000000000 --- a/sql/wsrep_utils.cc +++ /dev/null @@ -1,468 +0,0 @@ -/* Copyright 2010 Codership Oy <http://www.codership.com> - - This program is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; version 2 of the License. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - */ - -//! @file declares symbols private to wsrep integration layer - -#ifndef _GNU_SOURCE -#define _GNU_SOURCE // POSIX_SPAWN_USEVFORK flag -#endif - -#include <spawn.h> // posix_spawn() -#include <unistd.h> // pipe() -#include <errno.h> // errno -#include <string.h> // strerror() -#include <sys/wait.h> // waitpid() - -#include <sql_class.h> -#include "wsrep_priv.h" - -extern char** environ; // environment variables - -static wsp::string wsrep_PATH; - -void -wsrep_prepend_PATH (const char* path) -{ - int count = 0; - - while (environ[count]) - { - if (strncmp (environ[count], "PATH=", 5)) - { - count++; - continue; - } - - char* const old_path (environ[count]); - - if (strstr (old_path, path)) return; // path already there - - size_t const new_path_len(strlen(old_path) + strlen(":") + - strlen(path) + 1); - - char* const new_path (reinterpret_cast<char*>(malloc(new_path_len))); - - if (new_path) - { - snprintf (new_path, new_path_len, "PATH=%s:%s", path, - old_path + strlen("PATH=")); - - wsrep_PATH.set (new_path); - environ[count] = new_path; - } - else - { - WSREP_ERROR ("Failed to allocate 'PATH' environment variable " - "buffer of size %zu.", new_path_len); - } - - return; - } - - WSREP_ERROR ("Failed to find 'PATH' environment variable. " - "State snapshot transfer may not be working."); -} - -namespace wsp -{ - -#define PIPE_READ 0 -#define PIPE_WRITE 1 -#define STDIN_FD 0 -#define STDOUT_FD 1 - -#ifndef POSIX_SPAWN_USEVFORK -# define POSIX_SPAWN_USEVFORK 0 -#endif - -process::process (const char* cmd, const char* type) - : str_(cmd ? strdup(cmd) : strdup("")), io_(NULL), err_(EINVAL), pid_(0) -{ - if (0 == str_) - { - WSREP_ERROR ("Can't allocate command line of size: %zu", strlen(cmd)); - err_ = ENOMEM; - return; - } - - if (0 == strlen(str_)) - { - WSREP_ERROR ("Can't start a process: null or empty command line."); - return; - } - - if (NULL == type || (strcmp (type, "w") && strcmp(type, "r"))) - { - WSREP_ERROR ("type argument should be either \"r\" or \"w\"."); - return; - } - - int pipe_fds[2] = { -1, }; - if (::pipe(pipe_fds)) - { - err_ = errno; - WSREP_ERROR ("pipe() failed: %d (%s)", err_, strerror(err_)); - return; - } - - // which end of pipe will be returned to parent - int const parent_end (strcmp(type,"w") ? PIPE_READ : PIPE_WRITE); - int const child_end (parent_end == PIPE_READ ? PIPE_WRITE : PIPE_READ); - int const close_fd (parent_end == PIPE_READ ? STDOUT_FD : STDIN_FD); - - char* const pargv[4] = { strdup("sh"), strdup("-c"), strdup(str_), NULL }; - if (!(pargv[0] && pargv[1] && pargv[2])) - { - err_ = ENOMEM; - WSREP_ERROR ("Failed to allocate pargv[] array."); - goto cleanup_pipe; - } - - posix_spawnattr_t attr; - err_ = posix_spawnattr_init (&attr); - if (err_) - { - WSREP_ERROR ("posix_spawnattr_init() failed: %d (%s)", - err_, strerror(err_)); - goto cleanup_pipe; - } - - err_ = posix_spawnattr_setflags (&attr, POSIX_SPAWN_SETSIGDEF | - POSIX_SPAWN_USEVFORK); - if (err_) - { - WSREP_ERROR ("posix_spawnattr_setflags() failed: %d (%s)", - err_, strerror(err_)); - goto cleanup_attr; - } - - posix_spawn_file_actions_t fact; - err_ = posix_spawn_file_actions_init (&fact); - if (err_) - { - WSREP_ERROR ("posix_spawn_file_actions_init() failed: %d (%s)", - err_, strerror(err_)); - goto cleanup_attr; - } - - // close child's stdout|stdin depending on what we returning - err_ = posix_spawn_file_actions_addclose (&fact, close_fd); - if (err_) - { - WSREP_ERROR ("posix_spawn_file_actions_addclose() failed: %d (%s)", - err_, strerror(err_)); - goto cleanup_fact; - } - - // substitute our pipe descriptor in place of the closed one - err_ = posix_spawn_file_actions_adddup2 (&fact, - pipe_fds[child_end], close_fd); - if (err_) - { - WSREP_ERROR ("posix_spawn_file_actions_addup2() failed: %d (%s)", - err_, strerror(err_)); - goto cleanup_fact; - } - - err_ = posix_spawnp (&pid_, pargv[0], &fact, &attr, pargv, environ); - if (err_) - { - WSREP_ERROR ("posix_spawnp(%s) failed: %d (%s)", - pargv[2], err_, strerror(err_)); - pid_ = 0; // just to make sure it was not messed up in the call - goto cleanup_fact; - } - - io_ = fdopen (pipe_fds[parent_end], type); - - if (io_) - { - pipe_fds[parent_end] = -1; // skip close on cleanup - } - else - { - err_ = errno; - WSREP_ERROR ("fdopen() failed: %d (%s)", err_, strerror(err_)); - } - -cleanup_fact: - int err; // to preserve err_ code - err = posix_spawn_file_actions_destroy (&fact); - if (err) - { - WSREP_ERROR ("posix_spawn_file_actions_destroy() failed: %d (%s)\n", - err, strerror(err)); - } - -cleanup_attr: - err = posix_spawnattr_destroy (&attr); - if (err) - { - WSREP_ERROR ("posix_spawnattr_destroy() failed: %d (%s)", - err, strerror(err)); - } - -cleanup_pipe: - if (pipe_fds[0] >= 0) close (pipe_fds[0]); - if (pipe_fds[1] >= 0) close (pipe_fds[1]); - - free (pargv[0]); - free (pargv[1]); - free (pargv[2]); -} - -process::~process () -{ - if (io_) - { - assert (pid_); - assert (str_); - - WSREP_WARN("Closing pipe to child process: %s, PID(%ld) " - "which might still be running.", str_, (long)pid_); - - if (fclose (io_) == -1) - { - err_ = errno; - WSREP_ERROR("fclose() failed: %d (%s)", err_, strerror(err_)); - } - } - - if (str_) free (const_cast<char*>(str_)); -} - -int -process::wait () -{ - if (pid_) - { - int status; - if (-1 == waitpid(pid_, &status, 0)) - { - err_ = errno; assert (err_); - WSREP_ERROR("Waiting for process failed: %s, PID(%ld): %d (%s)", - str_, (long)pid_, err_, strerror (err_)); - } - else - { // command completed, check exit status - if (WIFEXITED (status)) { - err_ = WEXITSTATUS (status); - } - else { // command didn't complete with exit() - WSREP_ERROR("Process was aborted."); - err_ = errno ? errno : ECHILD; - } - - if (err_) { - switch (err_) /* Translate error codes to more meaningful */ - { - case 126: err_ = EACCES; break; /* Permission denied */ - case 127: err_ = ENOENT; break; /* No such file or directory */ - } - WSREP_ERROR("Process completed with error: %s: %d (%s)", - str_, err_, strerror(err_)); - } - - pid_ = 0; - if (io_) fclose (io_); - io_ = NULL; - } - } - else { - assert (NULL == io_); - WSREP_ERROR("Command did not run: %s", str_); - } - - return err_; -} - -thd::thd (my_bool won) : init(), ptr(new THD) -{ - if (ptr) - { - ptr->thread_stack= (char*) &ptr; - ptr->store_globals(); - ptr->variables.option_bits&= ~OPTION_BIN_LOG; // disable binlog - ptr->variables.wsrep_on = won; - ptr->security_ctx->master_access= ~(ulong)0; - lex_start(ptr); - } -} - -thd::~thd () -{ - if (ptr) - { - delete ptr; - my_pthread_setspecific_ptr (THR_THD, 0); - } -} - -} // namespace wsp - -extern ulong my_bind_addr; -extern uint mysqld_port; - -size_t guess_ip (char* buf, size_t buf_len) -{ - size_t ip_len = 0; - - if (htonl(INADDR_NONE) == my_bind_addr) { - WSREP_ERROR("Networking not configured, cannot receive state transfer."); - return 0; - } - - if (htonl(INADDR_ANY) != my_bind_addr) { - uint8_t* b = (uint8_t*)&my_bind_addr; - ip_len = snprintf (buf, buf_len, - "%hhu.%hhu.%hhu.%hhu", b[0],b[1],b[2],b[3]); - return ip_len; - } - - // mysqld binds to all interfaces - try IP from wsrep_node_address - if (wsrep_node_address && wsrep_node_address[0] != '\0') { - const char* const colon_ptr = strchr(wsrep_node_address, ':'); - - if (colon_ptr) - ip_len = colon_ptr - wsrep_node_address; - else - ip_len = strlen(wsrep_node_address); - - if (ip_len >= buf_len) { - WSREP_WARN("default_ip(): buffer too short: %zu <= %zd", buf_len, ip_len); - return 0; - } - - memcpy (buf, wsrep_node_address, ip_len); - buf[ip_len] = '\0'; - return ip_len; - } - - // try to find the address of the first one -#if (TARGET_OS_LINUX == 1) - const char cmd[] = "/sbin/ifconfig | " -// "grep -m1 -1 -E '^[a-z]?eth[0-9]' | tail -n 1 | " - "grep -E '^[[:space:]]+inet addr:' | grep -m1 -v 'inet addr:127' | " - "sed 's/:/ /' | awk '{ print $3 }'"; -#elif defined(__sun__) - const char cmd[] = "/sbin/ifconfig -a | " - "/usr/gnu/bin/grep -m1 -1 -E 'net[0-9]:' | tail -n 1 | awk '{ print $2 }'"; -#else - char *cmd; -#error "OS not supported" -#endif - wsp::process proc (cmd, "r"); - - if (NULL != proc.pipe()) { - char* ret; - - ret = fgets (buf, buf_len, proc.pipe()); - - if (proc.wait()) return 0; - - if (NULL == ret) { - WSREP_ERROR("Failed to read output of: '%s'", cmd); - return 0; - } - } - else { - WSREP_ERROR("Failed to execute: '%s'", cmd); - return 0; - } - - // clear possible \n at the end of ip string left by fgets() - ip_len = strlen (buf); - if (ip_len > 0 && '\n' == buf[ip_len - 1]) { - ip_len--; - buf[ip_len] = '\0'; - } - - if (INADDR_NONE == inet_addr(buf)) { - if (strlen(buf) != 0) { - WSREP_WARN("Shell command returned invalid address: '%s'", buf); - } - return 0; - } - - return ip_len; -} - -size_t guess_address(char* buf, size_t buf_len) -{ - size_t addr_len = guess_ip (buf, buf_len); - - if (addr_len && addr_len < buf_len) { - addr_len += snprintf (buf + addr_len, buf_len - addr_len, - ":%u", mysqld_port); - } - - return addr_len; -} - -/* - * WSREPXid - */ - -#define WSREP_XID_PREFIX "WSREPXid" -#define WSREP_XID_PREFIX_LEN MYSQL_XID_PREFIX_LEN -#define WSREP_XID_UUID_OFFSET 8 -#define WSREP_XID_SEQNO_OFFSET (WSREP_XID_UUID_OFFSET + sizeof(wsrep_uuid_t)) -#define WSREP_XID_GTRID_LEN (WSREP_XID_SEQNO_OFFSET + sizeof(wsrep_seqno_t)) - -void wsrep_xid_init(XID* xid, const wsrep_uuid_t* uuid, wsrep_seqno_t seqno) -{ - xid->formatID= 1; - xid->gtrid_length= WSREP_XID_GTRID_LEN; - xid->bqual_length= 0; - memset(xid->data, 0, sizeof(xid->data)); - memcpy(xid->data, WSREP_XID_PREFIX, WSREP_XID_PREFIX_LEN); - memcpy(xid->data + WSREP_XID_UUID_OFFSET, uuid, sizeof(wsrep_uuid_t)); - memcpy(xid->data + WSREP_XID_SEQNO_OFFSET, &seqno, sizeof(wsrep_seqno_t)); -} - -const wsrep_uuid_t* wsrep_xid_uuid(const XID* xid) -{ - if (wsrep_is_wsrep_xid(xid)) - return reinterpret_cast<const wsrep_uuid_t*>(xid->data - + WSREP_XID_UUID_OFFSET); - else - return &WSREP_UUID_UNDEFINED; -} - -wsrep_seqno_t wsrep_xid_seqno(const XID* xid) -{ - - if (wsrep_is_wsrep_xid(xid)) - { - wsrep_seqno_t seqno; - memcpy(&seqno, xid->data + WSREP_XID_SEQNO_OFFSET, sizeof(wsrep_seqno_t)); - return seqno; - } - else - { - return WSREP_SEQNO_UNDEFINED; - } -} - -extern "C" -int wsrep_is_wsrep_xid(const void* xid_ptr) -{ - const XID* xid= reinterpret_cast<const XID*>(xid_ptr); - return (xid->formatID == 1 && - xid->gtrid_length == WSREP_XID_GTRID_LEN && - xid->bqual_length == 0 && - !memcmp(xid->data, WSREP_XID_PREFIX, WSREP_XID_PREFIX_LEN)); -} diff --git a/sql/wsrep_var.cc b/sql/wsrep_var.cc deleted file mode 100644 index 01f91b0a5ae..00000000000 --- a/sql/wsrep_var.cc +++ /dev/null @@ -1,548 +0,0 @@ -/* Copyright 2008 Codership Oy <http://www.codership.com> - - This program is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; version 2 of the License. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ - -#include <mysqld.h> -#include <sql_class.h> -#include <sql_plugin.h> -#include <set_var.h> -#include <sql_acl.h> -#include "wsrep_priv.h" -#include <my_dir.h> -#include <cstdio> -#include <cstdlib> - -#define WSREP_START_POSITION_ZERO "00000000-0000-0000-0000-000000000000:-1" -#define WSREP_CLUSTER_NAME "my_wsrep_cluster" - -const char* wsrep_provider = 0; -const char* wsrep_provider_options = 0; -const char* wsrep_cluster_address = 0; -const char* wsrep_cluster_name = 0; -const char* wsrep_node_name = 0; -static char node_address[256] = { 0, }; -const char* wsrep_node_address = node_address; // ??? -const char* wsrep_start_position = 0; -ulong wsrep_OSU_method_options; -static int wsrep_thread_change = 0; - -int wsrep_init_vars() -{ - wsrep_provider = my_strdup(WSREP_NONE, MYF(MY_WME)); - wsrep_provider_options= my_strdup("", MYF(MY_WME)); - wsrep_cluster_address = my_strdup("", MYF(MY_WME)); - wsrep_cluster_name = my_strdup(WSREP_CLUSTER_NAME, MYF(MY_WME)); - wsrep_node_name = my_strdup("", MYF(MY_WME)); - wsrep_start_position = my_strdup(WSREP_START_POSITION_ZERO, MYF(MY_WME)); - - global_system_variables.binlog_format=BINLOG_FORMAT_ROW; - return 0; -} - -bool wsrep_on_update (sys_var *self, THD* thd, enum_var_type var_type) -{ - if (var_type == OPT_GLOBAL) { - // FIXME: this variable probably should be changed only per session - thd->variables.wsrep_on = global_system_variables.wsrep_on; - } - else { - } - -#ifdef REMOVED - if (thd->variables.wsrep_on) - thd->variables.option_bits |= (OPTION_BIN_LOG); - else - thd->variables.option_bits &= ~(OPTION_BIN_LOG); -#endif - return false; -} - -void wsrep_causal_reads_update (sys_var *self, THD* thd, enum_var_type var_type) -{ - if (var_type == OPT_GLOBAL) { - thd->variables.wsrep_causal_reads = global_system_variables.wsrep_causal_reads; - } - else { - } -} - -static int wsrep_start_position_verify (const char* start_str) -{ - size_t start_len; - wsrep_uuid_t uuid; - ssize_t uuid_len; - - start_len = strlen (start_str); - if (start_len < 34) - return 1; - - uuid_len = wsrep_uuid_scan (start_str, start_len, &uuid); - if (uuid_len < 0 || (start_len - uuid_len) < 2) - return 1; - - if (start_str[uuid_len] != ':') // separator should follow UUID - return 1; - - char* endptr; - wsrep_seqno_t const seqno __attribute__((unused)) // to avoid GCC warnings - (strtoll(&start_str[uuid_len + 1], &endptr, 10)); - - if (*endptr == '\0') return 0; // remaining string was seqno - - return 1; -} - -bool wsrep_start_position_check (sys_var *self, THD* thd, set_var* var) -{ - char buff[FN_REFLEN]; - String str(buff, sizeof(buff), system_charset_info), *res; - const char* start_str = NULL; - - if (!(res = var->value->val_str(&str))) goto err; - - start_str = res->c_ptr(); - - if (!start_str) goto err; - - if (!wsrep_start_position_verify(start_str)) return 0; - -err: - - my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), var->var->name.str, - start_str ? start_str : "NULL"); - return 1; -} - -void wsrep_set_local_position (const char* value) -{ - size_t value_len = strlen (value); - size_t uuid_len = wsrep_uuid_scan (value, value_len, &local_uuid); - - local_seqno = strtoll (value + uuid_len + 1, NULL, 10); - - XID xid; - wsrep_xid_init(&xid, &local_uuid, local_seqno); - wsrep_set_SE_checkpoint(&xid); - WSREP_INFO ("wsrep_start_position var submitted: '%s'", wsrep_start_position); -} - -bool wsrep_start_position_update (sys_var *self, THD* thd, enum_var_type type) -{ - // since this value passed wsrep_start_position_check, don't check anything - // here - wsrep_set_local_position (wsrep_start_position); - - if (wsrep) { - wsrep->sst_received (wsrep, &local_uuid, local_seqno, NULL, 0); - } - - return 0; -} - -void wsrep_start_position_init (const char* val) -{ - if (NULL == val || wsrep_start_position_verify (val)) - { - WSREP_ERROR("Bad initial value for wsrep_start_position: %s", - (val ? val : "")); - return; - } - - wsrep_set_local_position (val); -} - -static bool refresh_provider_options() -{ - WSREP_DEBUG("refresh_provider_options: %s", - (wsrep_provider_options) ? wsrep_provider_options : "null"); - char* opts= wsrep->options_get(wsrep); - if (opts) - { - if (wsrep_provider_options) my_free((void *)wsrep_provider_options); - wsrep_provider_options = (char*)my_memdup(opts, strlen(opts) + 1, - MYF(MY_WME)); - } - else - { - WSREP_ERROR("Failed to get provider options"); - return true; - } - return false; -} - -static int wsrep_provider_verify (const char* provider_str) -{ - MY_STAT f_stat; - char path[FN_REFLEN]; - - if (!provider_str || strlen(provider_str)== 0) - return 1; - - if (!strcmp(provider_str, WSREP_NONE)) - return 0; - - if (!unpack_filename(path, provider_str)) - return 1; - - /* check that provider file exists */ - bzero(&f_stat, sizeof(MY_STAT)); - if (!my_stat(path, &f_stat, MYF(0))) - { - return 1; - } - return 0; -} - -bool wsrep_provider_check (sys_var *self, THD* thd, set_var* var) -{ - char buff[FN_REFLEN]; - String str(buff, sizeof(buff), system_charset_info), *res; - const char* provider_str = NULL; - - if (!(res = var->value->val_str(&str))) goto err; - - provider_str = res->c_ptr(); - - if (!provider_str) goto err; - - if (!wsrep_provider_verify(provider_str)) return 0; - -err: - - my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), var->var->name.str, - provider_str ? provider_str : "NULL"); - return 1; -} - -bool wsrep_provider_update (sys_var *self, THD* thd, enum_var_type type) -{ - bool rcode= false; - - bool wsrep_on_saved= thd->variables.wsrep_on; - thd->variables.wsrep_on= false; - - WSREP_DEBUG("wsrep_provider_update: %s", wsrep_provider); - - wsrep_stop_replication(thd); - wsrep_deinit(); - - char* tmp= strdup(wsrep_provider); // wsrep_init() rewrites provider - //when fails - if (wsrep_init()) - { - my_error(ER_CANT_OPEN_LIBRARY, MYF(0), tmp); - rcode = true; - } - free(tmp); - - // we sure don't want to use old address with new provider - wsrep_cluster_address_init(NULL); - wsrep_provider_options_init(NULL); - - thd->variables.wsrep_on= wsrep_on_saved; - - refresh_provider_options(); - - return rcode; -} - -void wsrep_provider_init (const char* value) -{ - WSREP_DEBUG("wsrep_provider_init: %s -> %s", - (wsrep_provider) ? wsrep_provider : "null", - (value) ? value : "null"); - if (NULL == value || wsrep_provider_verify (value)) - { - WSREP_ERROR("Bad initial value for wsrep_provider: %s", - (value ? value : "")); - return; - } - - if (wsrep_provider) my_free((void *)wsrep_provider); - wsrep_provider = my_strdup(value, MYF(0)); -} - -bool wsrep_provider_options_check(sys_var *self, THD* thd, set_var* var) -{ - return 0; -} - -bool wsrep_provider_options_update(sys_var *self, THD* thd, enum_var_type type) -{ - wsrep_status_t ret= wsrep->options_set(wsrep, wsrep_provider_options); - if (ret != WSREP_OK) - { - WSREP_ERROR("Set options returned %d", ret); - return true; - } - return refresh_provider_options(); -} - -void wsrep_provider_options_init(const char* value) -{ - if (wsrep_provider_options && wsrep_provider_options != value) - my_free((void *)wsrep_provider_options); - wsrep_provider_options = (value) ? my_strdup(value, MYF(0)) : NULL; -} - -static int wsrep_cluster_address_verify (const char* cluster_address_str) -{ - /* There is no predefined address format, it depends on provider. */ - return 0; -} - -bool wsrep_cluster_address_check (sys_var *self, THD* thd, set_var* var) -{ - char buff[FN_REFLEN]; - String str(buff, sizeof(buff), system_charset_info), *res; - const char* cluster_address_str = NULL; - - if (!(res = var->value->val_str(&str))) goto err; - - cluster_address_str = res->c_ptr(); - - if (!wsrep_cluster_address_verify(cluster_address_str)) return 0; - - err: - - my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), var->var->name.str, - cluster_address_str ? cluster_address_str : "NULL"); - return 1 ; -} - -bool wsrep_cluster_address_update (sys_var *self, THD* thd, enum_var_type type) -{ - bool wsrep_on_saved= thd->variables.wsrep_on; - thd->variables.wsrep_on= false; - - wsrep_stop_replication(thd); - - if (wsrep_start_replication()) - { - wsrep_create_rollbacker(); - wsrep_create_appliers(wsrep_slave_threads); - } - - thd->variables.wsrep_on= wsrep_on_saved; - - return false; -} - -void wsrep_cluster_address_init (const char* value) -{ - WSREP_DEBUG("wsrep_cluster_address_init: %s -> %s", - (wsrep_cluster_address) ? wsrep_cluster_address : "null", - (value) ? value : "null"); - - if (wsrep_cluster_address) my_free ((void*)wsrep_cluster_address); - wsrep_cluster_address = (value) ? my_strdup(value, MYF(0)) : NULL; -} - -bool wsrep_cluster_name_check (sys_var *self, THD* thd, set_var* var) -{ - char buff[FN_REFLEN]; - String str(buff, sizeof(buff), system_charset_info), *res; - const char* cluster_name_str = NULL; - - if (!(res = var->value->val_str(&str))) goto err; - - cluster_name_str = res->c_ptr(); - - if (!cluster_name_str || strlen(cluster_name_str) == 0) goto err; - - return 0; - - err: - - my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), var->var->name.str, - cluster_name_str ? cluster_name_str : "NULL"); - return 1; -} - -bool wsrep_cluster_name_update (sys_var *self, THD* thd, enum_var_type type) -{ - return 0; -} - -bool wsrep_node_name_check (sys_var *self, THD* thd, set_var* var) -{ - char buff[FN_REFLEN]; - String str(buff, sizeof(buff), system_charset_info), *res; - const char* node_name_str = NULL; - - if (!(res = var->value->val_str(&str))) goto err; - - node_name_str = res->c_ptr(); - - if (!node_name_str || strlen(node_name_str) == 0) goto err; - - return 0; - - err: - - my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), var->var->name.str, - node_name_str ? node_name_str : "NULL"); - return 1; -} - -bool wsrep_node_name_update (sys_var *self, THD* thd, enum_var_type type) -{ - return 0; -} - -// TODO: do something more elaborate, like checking connectivity -bool wsrep_node_address_check (sys_var *self, THD* thd, set_var* var) -{ - char buff[FN_REFLEN]; - String str(buff, sizeof(buff), system_charset_info), *res; - const char* node_address_str = NULL; - - if (!(res = var->value->val_str(&str))) goto err; - - node_address_str = res->c_ptr(); - - if (!node_address_str || strlen(node_address_str) == 0) goto err; - - return 0; - - err: - - my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), var->var->name.str, - node_address_str ? node_address_str : "NULL"); - return 1; -} - -bool wsrep_node_address_update (sys_var *self, THD* thd, enum_var_type type) -{ - return 0; -} - -void wsrep_node_address_init (const char* value) -{ - if (wsrep_node_address && strcmp(wsrep_node_address, value)) - my_free ((void*)wsrep_node_address); - - wsrep_node_address = (value) ? my_strdup(value, MYF(0)) : NULL; -} - -bool wsrep_slave_threads_check (sys_var *self, THD* thd, set_var* var) -{ - mysql_mutex_lock(&LOCK_wsrep_slave_threads); - wsrep_thread_change = var->value->val_int() - wsrep_slave_threads; - mysql_mutex_unlock(&LOCK_wsrep_slave_threads); - - return 0; -} - -bool wsrep_slave_threads_update (sys_var *self, THD* thd, enum_var_type type) -{ - if (wsrep_thread_change > 0) - { - wsrep_create_appliers(wsrep_thread_change); - } - else if (wsrep_thread_change < 0) - { - wsrep_close_applier_threads(-wsrep_thread_change); - } - return false; -} -/* - * Status variables stuff below - */ -static inline void -wsrep_assign_to_mysql (SHOW_VAR* mysql, wsrep_stats_var* wsrep) -{ - mysql->name = wsrep->name; - switch (wsrep->type) { - case WSREP_VAR_INT64: - mysql->value = (char*) &wsrep->value._int64; - mysql->type = SHOW_LONGLONG; - break; - case WSREP_VAR_STRING: - mysql->value = (char*) &wsrep->value._string; - mysql->type = SHOW_CHAR_PTR; - break; - case WSREP_VAR_DOUBLE: - mysql->value = (char*) &wsrep->value._double; - mysql->type = SHOW_DOUBLE; - break; - } -} - -#if DYNAMIC -// somehow this mysql status thing works only with statically allocated arrays. -static SHOW_VAR* mysql_status_vars = NULL; -static int mysql_status_len = -1; -#else -static SHOW_VAR mysql_status_vars[512 + 1]; -static const int mysql_status_len = 512; -#endif - -static void export_wsrep_status_to_mysql(THD* thd) -{ - int wsrep_status_len, i; - - thd->wsrep_status_vars = wsrep->stats_get(wsrep); - - if (!thd->wsrep_status_vars) { - return; - } - - for (wsrep_status_len = 0; - thd->wsrep_status_vars[wsrep_status_len].name != NULL; - wsrep_status_len++); - -#if DYNAMIC - if (wsrep_status_len != mysql_status_len) { - void* tmp = realloc (mysql_status_vars, - (wsrep_status_len + 1) * sizeof(SHOW_VAR)); - if (!tmp) { - - sql_print_error ("Out of memory for wsrep status variables." - "Number of variables: %d", wsrep_status_len); - return; - } - - mysql_status_len = wsrep_status_len; - mysql_status_vars = (SHOW_VAR*)tmp; - } - /* @TODO: fix this: */ -#else - if (mysql_status_len < wsrep_status_len) wsrep_status_len= mysql_status_len; -#endif - - for (i = 0; i < wsrep_status_len; i++) - wsrep_assign_to_mysql (mysql_status_vars + i, thd->wsrep_status_vars + i); - - mysql_status_vars[wsrep_status_len].name = NullS; - mysql_status_vars[wsrep_status_len].value = NullS; - mysql_status_vars[wsrep_status_len].type = SHOW_LONG; -} - -int wsrep_show_status (THD *thd, SHOW_VAR *var, char *buff) -{ - export_wsrep_status_to_mysql(thd); - var->type= SHOW_ARRAY; - var->value= (char *) &mysql_status_vars; - return 0; -} - -void wsrep_free_status (THD* thd) -{ - if (thd->wsrep_status_vars) - { - wsrep->stats_free (wsrep, thd->wsrep_status_vars); - thd->wsrep_status_vars = 0; - } -} |