summaryrefslogtreecommitdiff
path: root/sql/rpl_mi.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/rpl_mi.cc')
-rw-r--r--sql/rpl_mi.cc616
1 files changed, 614 insertions, 2 deletions
diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc
index a765f851812..2ca700a123a 100644
--- a/sql/rpl_mi.cc
+++ b/sql/rpl_mi.cc
@@ -20,6 +20,8 @@
#include "unireg.h" // REQUIRED by other includes
#include "rpl_mi.h"
#include "slave.h" // SLAVE_MAX_HEARTBEAT_PERIOD
+#include "strfunc.h"
+#include "sql_repl.h"
#ifdef HAVE_REPLICATION
@@ -27,7 +29,8 @@
static void init_master_log_pos(Master_info* mi);
-Master_info::Master_info(bool is_slave_recovery)
+Master_info::Master_info(LEX_STRING *connection_name_arg,
+ bool is_slave_recovery)
:Slave_reporting_capability("I/O"),
ssl(0), ssl_verify_server_cert(1), fd(-1), io_thd(0),
rli(is_slave_recovery), port(MYSQL_PORT),
@@ -41,6 +44,24 @@ Master_info::Master_info(bool is_slave_recovery)
ssl_cipher[0]= 0; ssl_key[0]= 0;
ssl_crl[0]= 0; ssl_crlpath[0]= 0;
+ /*
+ Store connection name and lower case connection name
+ It's safe to ignore any OMM errors as this is checked by error()
+ */
+ connection_name.length= cmp_connection_name.length=
+ connection_name_arg->length;
+ if ((connection_name.str= (char*) my_malloc(connection_name_arg->length*2+2,
+ MYF(MY_WME))))
+ {
+ cmp_connection_name.str= (connection_name.str +
+ connection_name_arg->length+1);
+ strmake(connection_name.str, connection_name_arg->str,
+ connection_name.length);
+ memcpy(cmp_connection_name.str, connection_name_arg->str,
+ connection_name.length+1);
+ my_casedn_str(system_charset_info, cmp_connection_name.str);
+ }
+
my_init_dynamic_array(&ignore_server_ids, sizeof(::server_id), 16, 16);
bzero((char*) &file, sizeof(file));
mysql_mutex_init(key_master_info_run_lock, &run_lock, MY_MUTEX_INIT_FAST);
@@ -56,6 +77,7 @@ Master_info::Master_info(bool is_slave_recovery)
Master_info::~Master_info()
{
+ my_free(connection_name.str);
delete_dynamic(&ignore_server_ids);
mysql_mutex_destroy(&run_lock);
mysql_mutex_destroy(&data_lock);
@@ -441,7 +463,7 @@ file '%s')", fname);
mi->master_log_name,
(ulong) mi->master_log_pos));
- mi->rli.mi = mi;
+ mi->rli.mi= mi;
if (init_relay_log_info(&mi->rli, slave_info_fname))
goto err;
@@ -596,5 +618,595 @@ void end_master_info(Master_info* mi)
DBUG_VOID_RETURN;
}
+/* Multi-Master By P.Linux */
+uchar *get_key_master_info(Master_info *mi, size_t *length,
+ my_bool not_used __attribute__((unused)))
+{
+ /* Return lower case name */
+ *length= mi->cmp_connection_name.length;
+ return (uchar*) mi->cmp_connection_name.str;
+}
+
+void free_key_master_info(Master_info *mi)
+{
+ DBUG_ENTER("free_key_master_info");
+ terminate_slave_threads(mi,SLAVE_FORCE_ALL);
+ end_master_info(mi);
+ delete mi;
+ DBUG_VOID_RETURN;
+}
+
+/**
+ Check if connection name for master_info is valid.
+
+ It's valid if it's a valid system name of length less than
+ MAX_CONNECTION_NAME.
+
+ @return
+ 0 ok
+ 1 error
+*/
+
+bool check_master_connection_name(LEX_STRING *name)
+{
+ if (name->length >= MAX_CONNECTION_NAME)
+ return 1;
+ return 0;
+}
+
+
+/**
+ Create a log file with a given suffix.
+
+ @param
+ res_file_name Store result here
+ length Length of res_file_name buffer
+ info_file Original file name (prefix)
+ append 1 if we should add suffix last (not before ext)
+ suffix Suffix
+
+ @note
+ The suffix is added before the extension of the file name prefixed with '-'.
+ The suffix is also converted to lower case and we transform
+ all not safe character, as we do with MySQL table names.
+
+ If suffix is an empty string, then we don't add any suffix.
+ This is to allow one to use this function also to generate old
+ file names without a prefix.
+*/
+
+void create_logfile_name_with_suffix(char *res_file_name, uint length,
+ const char *info_file, bool append,
+ LEX_STRING *suffix)
+{
+ char buff[MAX_CONNECTION_NAME+1], res[MAX_CONNECTION_NAME+1], *p;
+
+ p= strmake(res_file_name, info_file, length);
+ /* If not empty suffix and there is place left for some part of the suffix */
+ if (suffix->length != 0 && p <= res_file_name + length -1)
+ {
+ const char *info_file_end= info_file + (p - res_file_name);
+ const char *ext= append ? info_file_end : fn_ext2(info_file);
+ size_t res_length, ext_pos;
+ uint errors;
+
+ /* Create null terminated string */
+ strmake(buff, suffix->str, suffix->length);
+ /* Convert to lower case */
+ my_casedn_str(system_charset_info, buff);
+ /* Convert to characters usable in a file name */
+ res_length= strconvert(system_charset_info, buff,
+ &my_charset_filename, res, sizeof(res), &errors);
+
+ ext_pos= (size_t) (ext - info_file);
+ length-= (suffix->length - ext_pos); /* Leave place for extension */
+ p= res_file_name + ext_pos;
+ *p++= '-'; /* Add separator */
+ p= strmake(p, res, min((size_t) (length - (p - res_file_name)),
+ res_length));
+ /* Add back extension. We have checked above that there is space for it */
+ strmov(p, ext);
+ }
+}
+
+
+Master_info_index::Master_info_index()
+{
+ size_t filename_length, dir_length;
+ /*
+ Create the Master_info index file by prepending 'multi-' before
+ the master_info_file file name.
+ */
+ fn_format(index_file_name, master_info_file, mysql_data_home,
+ "", MY_UNPACK_FILENAME);
+ filename_length= strlen(index_file_name) + 1; /* Count 0 byte */
+ dir_length= dirname_length(index_file_name);
+ bmove_upp((uchar*) index_file_name + filename_length + 6,
+ (uchar*) index_file_name + filename_length,
+ filename_length - dir_length);
+ memcpy(index_file_name + dir_length, "multi-", 6);
+
+ bzero((char*) &index_file, sizeof(index_file));
+}
+
+Master_info_index::~Master_info_index()
+{
+ /* This will close connection for all objects in the cache */
+ my_hash_free(&master_info_hash);
+ end_io_cache(&index_file);
+ if (index_file.file > 0)
+ my_close(index_file.file, MYF(MY_WME));
+}
+
+
+/* Load All Master_info from master.info.index File
+ * RETURN:
+ * 0 - All Success
+ * 1 - All Fail
+ * 2 - Some Success, Some Fail
+ */
+
+bool Master_info_index::init_all_master_info()
+{
+ int thread_mask;
+ int err_num= 0, succ_num= 0; // The number of success read Master_info
+ char sign[MAX_CONNECTION_NAME];
+ File index_file_nr;
+ DBUG_ENTER("init_all_master_info");
+
+ if ((index_file_nr= my_open(index_file_name,
+ O_RDWR | O_CREAT | O_BINARY ,
+ MYF(MY_WME | ME_NOREFRESH))) < 0 ||
+ my_sync(index_file_nr, MYF(MY_WME)) ||
+ init_io_cache(&index_file, index_file_nr,
+ IO_SIZE, READ_CACHE,
+ my_seek(index_file_nr,0L,MY_SEEK_END,MYF(0)),
+ 0, MYF(MY_WME | MY_WAIT_IF_FULL)))
+ {
+ if (index_file_nr >= 0)
+ my_close(index_file_nr,MYF(0));
+
+ sql_print_error("Creation of Master_info index file '%s' failed",
+ index_file_name);
+ DBUG_RETURN(1);
+ }
+
+ /* Initialize Master_info Hash Table */
+ if (my_hash_init(&master_info_hash, system_charset_info,
+ MAX_REPLICATION_THREAD, 0, 0,
+ (my_hash_get_key) get_key_master_info,
+ (my_hash_free_key)free_key_master_info, HASH_UNIQUE))
+ {
+ sql_print_error("Initializing Master_info hash table failed");
+ DBUG_RETURN(1);
+ }
+
+ reinit_io_cache(&index_file, READ_CACHE, 0L,0,0);
+ while (!init_strvar_from_file(sign, sizeof(sign),
+ &index_file, NULL))
+ {
+ LEX_STRING connection_name;
+ Master_info *mi;
+ char buf_master_info_file[FN_REFLEN];
+ char buf_relay_log_info_file[FN_REFLEN];
+
+ connection_name.str= sign;
+ connection_name.length= strlen(sign);
+ if (!(mi= new Master_info(&connection_name, relay_log_recovery)) ||
+ mi->error())
+ {
+ delete mi;
+ DBUG_RETURN(1);
+ }
+
+ lock_slave_threads(mi);
+ init_thread_mask(&thread_mask,mi,0 /*not inverse*/);
+
+ create_logfile_name_with_suffix(buf_master_info_file, sizeof(buf_master_info_file),
+ master_info_file, 0, &connection_name);
+ create_logfile_name_with_suffix(buf_relay_log_info_file,
+ sizeof(buf_relay_log_info_file),
+ relay_log_info_file, 0, &connection_name);
+ if (global_system_variables.log_warnings > 1)
+ sql_print_information("Reading Master_info: '%s' Relay_info:'%s'",
+ buf_master_info_file, buf_relay_log_info_file);
+
+ if (init_master_info(mi, buf_master_info_file, buf_relay_log_info_file,
+ 0, thread_mask))
+ {
+ err_num++;
+ sql_print_error("Initialized Master_info from '%s' failed",
+ buf_master_info_file);
+ if (!master_info_index->get_master_info(&connection_name,
+ MYSQL_ERROR::WARN_LEVEL_NOTE))
+ {
+ /* Master_info is not in HASH; Add it */
+ if (master_info_index->add_master_info(mi, FALSE))
+ return 1;
+ succ_num++;
+ unlock_slave_threads(mi);
+ }
+ else
+ {
+ /* Master_info already in HASH */
+ sql_print_error(ER(ER_CONNECTION_ALREADY_EXISTS),
+ (int) connection_name.length, connection_name.str);
+ unlock_slave_threads(mi);
+ delete mi;
+ }
+ continue;
+ }
+ else
+ {
+ /* Initialization of Master_info succeded. Add it to HASH */
+ if (global_system_variables.log_warnings > 1)
+ sql_print_information("Initialized Master_info from '%s'",
+ buf_master_info_file);
+ if (master_info_index->get_master_info(&connection_name,
+ MYSQL_ERROR::WARN_LEVEL_NOTE))
+ {
+ /* Master_info was already registered */
+ sql_print_error(ER(ER_CONNECTION_ALREADY_EXISTS),
+ (int) connection_name.length, connection_name.str);
+ unlock_slave_threads(mi);
+ delete mi;
+ continue;
+ }
+
+ /* Master_info was not registered; add it */
+ if (master_info_index->add_master_info(mi, FALSE))
+ return 1;
+ succ_num++;
+ unlock_slave_threads(mi);
+
+ if (!opt_skip_slave_start)
+ {
+ if (start_slave_threads(1 /* need mutex */,
+ 0 /* no wait for start*/,
+ mi,
+ buf_master_info_file,
+ buf_relay_log_info_file,
+ SLAVE_IO | SLAVE_SQL))
+ {
+ sql_print_error("Failed to create slave threads for connection '%.*s'",
+ (int) connection_name.length,
+ connection_name.str);
+ continue;
+ }
+ if (global_system_variables.log_warnings)
+ sql_print_information("Started replication for '%.*s'",
+ (int) connection_name.length,
+ connection_name.str);
+ }
+ }
+ }
+
+ if (!err_num) // No Error on read Master_info
+ {
+ if (global_system_variables.log_warnings > 1)
+ sql_print_information("Reading of all Master_info entries succeded");
+ DBUG_RETURN(0);
+ }
+ else if (succ_num) // Have some Error and some Success
+ {
+ sql_print_warning("Reading of some Master_info entries failed");
+ DBUG_RETURN(2);
+ }
+ else // All failed
+ {
+ sql_print_error("Reading of all Master_info entries failed!");
+ DBUG_RETURN(1);
+ }
+}
+
+
+/* Write new master.info to master.info.index File */
+bool Master_info_index::write_master_name_to_index_file(LEX_STRING *name,
+ bool do_sync)
+{
+ DBUG_ASSERT(my_b_inited(&index_file) != 0);
+ DBUG_ENTER("write_master_name_to_index_file");
+
+ /* Don't write default slave to master_info.index */
+ if (name->length == 0)
+ DBUG_RETURN(0);
+
+ reinit_io_cache(&index_file, WRITE_CACHE,
+ my_b_filelength(&index_file), 0, 0);
+
+ if (my_b_write(&index_file, (uchar*) name->str, name->length) ||
+ my_b_write(&index_file, (uchar*) "\n", 1) ||
+ flush_io_cache(&index_file) ||
+ (do_sync && my_sync(index_file.file, MYF(MY_WME))))
+ {
+ sql_print_error("Write of new Master_info for '%.*s' to index file failed",
+ (int) name->length, name->str);
+ DBUG_RETURN(1);
+ }
+
+ DBUG_RETURN(0);
+}
+
+
+/**
+ Get Master_info for a connection
+
+ @param
+ connection_name Connection name
+ warning WARN_LEVEL_NOTE -> Don't print anything
+ WARN_LEVEL_WARN -> Issue warning if not exists
+ WARN_LEVEL_ERROR-> Issue error if not exists
+*/
+
+Master_info *
+Master_info_index::get_master_info(LEX_STRING *connection_name,
+ MYSQL_ERROR::enum_warning_level warning)
+{
+ Master_info *mi;
+ char buff[MAX_CONNECTION_NAME+1], *res;
+ uint buff_length;
+ DBUG_ENTER("get_master_info");
+ DBUG_PRINT("enter",
+ ("connection_name: '%.*s'", (int) connection_name->length,
+ connection_name->str));
+
+ /* Make name lower case for comparison */
+ res= strmake(buff, connection_name->str, connection_name->length);
+ my_casedn_str(system_charset_info, buff);
+ buff_length= (size_t) (res-buff);
+
+ mi= (Master_info*) my_hash_search(&master_info_hash,
+ (uchar*) buff, buff_length);
+ if (!mi && warning != MYSQL_ERROR::WARN_LEVEL_NOTE)
+ {
+ my_error(WARN_NO_MASTER_INFO,
+ MYF(warning == MYSQL_ERROR::WARN_LEVEL_WARN ? ME_JUST_WARNING :
+ 0),
+ (int) connection_name->length,
+ connection_name->str);
+ }
+ DBUG_RETURN(mi);
+}
+
+
+/* Check Master_host & Master_port is duplicated or not */
+bool Master_info_index::check_duplicate_master_info(LEX_STRING *name_arg,
+ const char *host,
+ uint port)
+{
+ Master_info *mi;
+ DBUG_ENTER("check_duplicate_master_info");
+
+ /* Get full host and port name */
+ if ((mi= master_info_index->get_master_info(name_arg,
+ MYSQL_ERROR::WARN_LEVEL_NOTE)))
+ {
+ if (!host)
+ host= mi->host;
+ if (!port)
+ port= mi->port;
+ }
+ if (!host || !port)
+ DBUG_RETURN(FALSE); // Not comparable yet
+
+ for (uint i= 0; i < master_info_hash.records; ++i)
+ {
+ Master_info *tmp_mi;
+ tmp_mi= (Master_info *) my_hash_element(&master_info_hash, i);
+ if (tmp_mi == mi)
+ continue; // Current connection
+ if (!strcasecmp(host, tmp_mi->host) && port == tmp_mi->port)
+ {
+ my_error(ER_CONNECTION_ALREADY_EXISTS, MYF(0),
+ (int) name_arg->length,
+ name_arg->str,
+ (int) tmp_mi->connection_name.length,
+ tmp_mi->connection_name.str);
+ DBUG_RETURN(TRUE);
+ }
+ }
+ DBUG_RETURN(FALSE);
+}
+
+
+/* Add a Master_info class to Hash Table */
+bool Master_info_index::add_master_info(Master_info *mi, bool write_to_file)
+{
+ if (!my_hash_insert(&master_info_hash, (uchar*) mi))
+ {
+ if (global_system_variables.log_warnings > 1)
+ sql_print_information("Added new Master_info '%.*s' to hash table",
+ (int) mi->connection_name.length,
+ mi->connection_name.str);
+ if (write_to_file)
+ return write_master_name_to_index_file(&mi->connection_name, 1);
+ return FALSE;
+ }
+
+ /* Impossible error (EOM) ? */
+ sql_print_error("Adding new entry '%.*s' to master_info failed",
+ (int) mi->connection_name.length,
+ mi->connection_name.str);
+ return TRUE;
+}
+
+
+/**
+ Remove a Master_info class From Hash Table
+
+ TODO: Change this to use my_rename() to make the file name creation
+ atomic
+*/
+
+bool Master_info_index::remove_master_info(LEX_STRING *name)
+{
+ Master_info* mi;
+ DBUG_ENTER("remove_master_info");
+
+ if ((mi= get_master_info(name, MYSQL_ERROR::WARN_LEVEL_WARN)))
+ {
+ // Delete Master_info and rewrite others to file
+ if (!my_hash_delete(&master_info_hash, (uchar*) mi))
+ {
+ File index_file_nr;
+
+ // Close IO_CACHE and FILE handler fisrt
+ end_io_cache(&index_file);
+ my_close(index_file.file, MYF(MY_WME));
+
+ // Reopen File and truncate it
+ if ((index_file_nr= my_open(index_file_name,
+ O_RDWR | O_CREAT | O_TRUNC | O_BINARY ,
+ MYF(MY_WME))) < 0 ||
+ init_io_cache(&index_file, index_file_nr,
+ IO_SIZE, WRITE_CACHE,
+ my_seek(index_file_nr,0L,MY_SEEK_END,MYF(0)),
+ 0, MYF(MY_WME | MY_WAIT_IF_FULL)))
+ {
+ int error= my_errno;
+ if (index_file_nr >= 0)
+ my_close(index_file_nr,MYF(0));
+
+ sql_print_error("Create of Master Info Index file '%s' failed with "
+ "error: %M",
+ index_file_name, error);
+ DBUG_RETURN(TRUE);
+ }
+
+ // Rewrite Master_info.index
+ for (uint i= 0; i< master_info_hash.records; ++i)
+ {
+ Master_info *tmp_mi;
+ tmp_mi= (Master_info *) my_hash_element(&master_info_hash, i);
+ write_master_name_to_index_file(&tmp_mi->connection_name, 0);
+ }
+ my_sync(index_file_nr, MYF(MY_WME));
+ }
+ }
+ DBUG_RETURN(FALSE);
+}
+
+
+/**
+ Master_info_index::give_error_if_slave_running()
+
+ @return
+ TRUE If some slave is running. An error is printed
+ FALSE No slave is running
+*/
+
+bool Master_info_index::give_error_if_slave_running()
+{
+ DBUG_ENTER("warn_if_slave_running");
+ mysql_mutex_assert_owner(&LOCK_active_mi);
+
+ for (uint i= 0; i< master_info_hash.records; ++i)
+ {
+ Master_info *mi;
+ mi= (Master_info *) my_hash_element(&master_info_hash, i);
+ if (mi->rli.slave_running != MYSQL_SLAVE_NOT_RUN)
+ {
+ my_error(ER_SLAVE_MUST_STOP, MYF(0), (int) mi->connection_name.length,
+ mi->connection_name.str);
+ DBUG_RETURN(TRUE);
+ }
+ }
+ DBUG_RETURN(FALSE);
+}
+
+
+/**
+ Master_info_index::start_all_slaves()
+
+ Start all slaves that was not running.
+
+ @return
+ TRUE Error
+ FALSE Everything ok.
+*/
+
+bool Master_info_index::start_all_slaves(THD *thd)
+{
+ bool result= FALSE;
+ DBUG_ENTER("warn_if_slave_running");
+ mysql_mutex_assert_owner(&LOCK_active_mi);
+
+ for (uint i= 0; i< master_info_hash.records; ++i)
+ {
+ int error;
+ Master_info *mi;
+ mi= (Master_info *) my_hash_element(&master_info_hash, i);
+
+ /*
+ Try to start all slaves that are configured (host is defined)
+ and are not already running
+ */
+ if ((mi->slave_running != MYSQL_SLAVE_RUN_CONNECT ||
+ !mi->rli.slave_running) && *mi->host)
+ {
+ if ((error= start_slave(thd, mi, 1)))
+ {
+ my_error(ER_CANT_START_STOP_SLAVE, MYF(0),
+ "START",
+ (int) mi->connection_name.length,
+ mi->connection_name.str);
+ result= 1;
+ if (error < 0) // fatal error
+ break;
+ }
+ else
+ push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
+ ER_SLAVE_STARTED, ER(ER_SLAVE_STARTED),
+ (int) mi->connection_name.length,
+ mi->connection_name.str);
+ }
+ }
+ DBUG_RETURN(result);
+}
+
+
+/**
+ Master_info_index::stop_all_slaves()
+
+ Start all slaves that was not running.
+
+ @return
+ TRUE Error
+ FALSE Everything ok.
+*/
+
+bool Master_info_index::stop_all_slaves(THD *thd)
+{
+ bool result= FALSE;
+ DBUG_ENTER("warn_if_slave_running");
+ mysql_mutex_assert_owner(&LOCK_active_mi);
+
+ for (uint i= 0; i< master_info_hash.records; ++i)
+ {
+ int error;
+ Master_info *mi;
+ mi= (Master_info *) my_hash_element(&master_info_hash, i);
+ if ((mi->slave_running != MYSQL_SLAVE_NOT_RUN ||
+ mi->rli.slave_running))
+ {
+ if ((error= stop_slave(thd, mi, 1)))
+ {
+ my_error(ER_CANT_START_STOP_SLAVE, MYF(0),
+ "STOP",
+ (int) mi->connection_name.length,
+ mi->connection_name.str);
+ result= 1;
+ if (error < 0) // Fatal error
+ break;
+ }
+ else
+ push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
+ ER_SLAVE_STOPPED, ER(ER_SLAVE_STOPPED),
+ (int) mi->connection_name.length,
+ mi->connection_name.str);
+ }
+ }
+ DBUG_RETURN(result);
+}
#endif /* HAVE_REPLICATION */