summaryrefslogtreecommitdiff
path: root/sql/rpl_mi.cc
diff options
context:
space:
mode:
authorMichael Widenius <monty@askmonty.org>2012-09-28 02:06:56 +0300
committerMichael Widenius <monty@askmonty.org>2012-09-28 02:06:56 +0300
commit1864d9596d9531427f1d63f86ada9e0cb21b51db (patch)
tree89ad647a849fea26b336faf565573a8d226ca7a9 /sql/rpl_mi.cc
parent620d14f8c3521f9ec7283b8690e0e16434739d33 (diff)
downloadmariadb-git-1864d9596d9531427f1d63f86ada9e0cb21b51db.tar.gz
Implementation of Multi-source replication (MDEV:253)
Documentation of the feature can be found at: http://kb.askmonty.org/en/multi-source-replication/ This code is based on code from Taobao, developed by Plinux BUILD/SETUP.sh: Added -Wno-invalid-offsetof to get rid of warning of offsetof() on C++ class (safe in the contex we use it) client/mysqltest.cc: Added support for error names starting with 'W' Added connection_name support to --sync_with_master cmake/maintainer.cmake: Added -Wno-invalid-offsetof to get rid of warning of offsetof() on C++ class (safe in the contex we use it) mysql-test/r/mysqltest.result: Updated results mysql-test/r/parser.result: Updated results mysql-test/suite/multi_source/my.cnf: Setup of multi-master tests mysql-test/suite/multi_source/simple.result: Simple basic test of multi-source functionality mysql-test/suite/multi_source/simple.test: Simple basic test of multi-source functionality mysql-test/suite/multi_source/syntax.result: Test of multi-source syntax mysql-test/suite/multi_source/syntax.test: Test of multi-source syntax mysql-test/suite/rpl/r/rpl_rotate_logs.result: Updated results because of new error messages mysql-test/t/parser.test: Updated test as master_pos_wait() now takes more arguments than before sql/event_scheduler.cc: No reason to initialize slave_thread (it's guaranteed to be zero here) sql/item_create.cc: Added connection_name argument to master_pos_wait() Simplified code sql/item_func.cc: Added connection_name argument to master_pos_wait() sql/item_func.h: Added connection_name argument to master_pos_wait() sql/log.cc: Added tag "Master 'connection_name'" to slave errors that has a connection name. sql/mysqld.cc: Added variable mysqld_server_initialized so that other functions can test if server is fully initialized. Free all slave data in one place (fewer ifdef's) Removed not needed call to close_active_mi() Initialize slaves() later in startup to ensure that everthing is really initialized when slaves start. Made status variable slave_running multi-source safe sql/mysqld.h: Added mysqld_server_initialized sql/rpl_mi.cc: Store connection name and cmp_connection_name (only used for show full slave status) in Master_info Added code for Master_info_index, which handles storage of multi-master information Don't write the empty "" connection_name to multi-master.info file. This is handled by the original code. sql/rpl_mi.h: Added connection_name and Master_info_index sql/rpl_rli.cc: Added connection_name to relay log files. sql/rpl_rli.h: Fixed type of slave_skip_counter as we now access it directly in sys_vars.cc, so it must be uint sql/share/errmsg-utf8.txt: Added new error messages needed for multi-source Added multi-source name to error ER_MASTER_INFO and WARN_NO_MASTER_INFO sql/slave.cc: Moved things a bit around to make it easier to handle error conditions. Create a global master_info_index and add the "" connection to it Ensure that new Master_info doesn't fail. Don't call terminate_slave_threads(active_mi..) on end_slave() as this is now done automaticly when deleting master_info_index. Delete not needed function close_active_mi(). One can achive same thing by calling end_slave(). Added support for SHOW FULL SLAVE STATUS (show status for all master connections with connection_name as first column) sql/slave.h: Added new prototypes sql/sql_base.cc: More DBUG_PRINT sql/sql_class.cc: Reset thd->connection_name and thd-->default_master_connection sql/sql_class.h: Added thd->connection_name and thd-->default_master_connection Added slave_skip_count to variables to make changing the @@sql_slave_skip_count variable thread safe sql/sql_const.h: Added MAX_CONNECTION_NAME sql/sql_lex.cc: Reset 'lex->verbose' (to simplify some sql_yacc.yy code) sql/sql_lex.h: Added connection_name sql/sql_parse.cc: Added support for connection_name to all SLAVE commands. - Instead of using active_mi, we now get the current Master_info from master_info_index. - Create new replication threads with CHANGE MASTER - Added support for show_all_master_info() sql/sql_reload.cc: Made reset/full slave use master_info_index->get_master_info() instead of active_mi. If one uses 'RESET SLAVE "connection_name" all' the connection is removed from master_info_index. sql/sql_repl.cc: sql_slave_skip_counter is moved to thd->variables to make it thread safe and fix some bugs with it Add connection name to relay log files. Added connection name to errors. Added some logging for multi-master if log_warnings > 1 stop_slave(): - Don't check if thd is set. It's guaranteed to always be set. change_master(): - Check for duplicate connection names in change_master() - Check for wrong arguments first in file (to simplify error handling) - Register new connections in master_info_index sql/sql_yacc.yy: Added optional connection_name to a all relevant master/slave commands sql/strfunc.cc: my_global.h shoud always be included first. sql/sys_vars.cc: Added variable default_master_connection Made variable sql_slave_skip_counter multi-source safe sql/sys_vars.h: Added Sys_var_session_lexstring (needed for default_master_connection) Added Sys_var_multi_source_uint (needed for sql_slave_skip_counter).
Diffstat (limited to 'sql/rpl_mi.cc')
-rw-r--r--sql/rpl_mi.cc472
1 files changed, 470 insertions, 2 deletions
diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc
index 3c5a99121fa..3dc3e38f419 100644
--- a/sql/rpl_mi.cc
+++ b/sql/rpl_mi.cc
@@ -20,6 +20,7 @@
#include "unireg.h" // REQUIRED by other includes
#include "rpl_mi.h"
#include "slave.h" // SLAVE_MAX_HEARTBEAT_PERIOD
+#include "strfunc.h"
#ifdef HAVE_REPLICATION
@@ -27,7 +28,8 @@
static void init_master_log_pos(Master_info* mi);
-Master_info::Master_info(bool is_slave_recovery)
+Master_info::Master_info(LEX_STRING *connection_name_arg,
+ bool is_slave_recovery)
:Slave_reporting_capability("I/O"),
ssl(0), ssl_verify_server_cert(1), fd(-1), io_thd(0),
rli(is_slave_recovery), port(MYSQL_PORT),
@@ -40,6 +42,21 @@ Master_info::Master_info(bool is_slave_recovery)
ssl_ca[0]= 0; ssl_capath[0]= 0; ssl_cert[0]= 0;
ssl_cipher[0]= 0; ssl_key[0]= 0;
+ /* Store connection name and lower case connection name */
+ connection_name.length= cmp_connection_name.length=
+ connection_name_arg->length;
+ if ((connection_name.str= (char*) my_malloc(connection_name_arg->length*2+2,
+ MYF(MY_WME))))
+ {
+ cmp_connection_name.str= (connection_name.str +
+ connection_name_arg->length+1);
+ strmake(connection_name.str, connection_name_arg->str,
+ connection_name.length);
+ memcpy(cmp_connection_name.str, connection_name_arg->str,
+ connection_name.length+1);
+ my_casedn_str(system_charset_info, cmp_connection_name.str);
+ }
+
my_init_dynamic_array(&ignore_server_ids, sizeof(::server_id), 16, 16);
bzero((char*) &file, sizeof(file));
mysql_mutex_init(key_master_info_run_lock, &run_lock, MY_MUTEX_INIT_FAST);
@@ -55,6 +72,7 @@ Master_info::Master_info(bool is_slave_recovery)
Master_info::~Master_info()
{
+ my_free(connection_name.str);
delete_dynamic(&ignore_server_ids);
mysql_mutex_destroy(&run_lock);
mysql_mutex_destroy(&data_lock);
@@ -407,7 +425,7 @@ file '%s')", fname);
mi->master_log_name,
(ulong) mi->master_log_pos));
- mi->rli.mi = mi;
+ mi->rli.mi= mi;
if (init_relay_log_info(&mi->rli, slave_info_fname))
goto err;
@@ -560,5 +578,455 @@ void end_master_info(Master_info* mi)
DBUG_VOID_RETURN;
}
+/* Multi-Master By P.Linux */
+uchar *get_key_master_info(Master_info *mi, size_t *length,
+ my_bool not_used __attribute__((unused)))
+{
+ /* Return lower case name */
+ *length= mi->cmp_connection_name.length;
+ return (uchar*) mi->cmp_connection_name.str;
+}
+
+void free_key_master_info(Master_info *mi)
+{
+ DBUG_ENTER("free_key_master_info");
+ terminate_slave_threads(mi,SLAVE_FORCE_ALL);
+ end_master_info(mi);
+ delete mi;
+ DBUG_VOID_RETURN;
+}
+
+/**
+ Check if connection name for master_info is valid.
+
+ It's valid if it's a valid system name, is less than
+ MAX_CONNECTION_NAME.
+
+ @return
+ 0 ok
+ 1 error
+*/
+
+bool check_master_connection_name(LEX_STRING *name)
+{
+ if (name->length >= MAX_CONNECTION_NAME)
+ return 1;
+ return 0;
+}
+
+
+/**
+ Create a log file with a signed suffix.
+
+ @param
+ res_file_name Store result here
+ length Length of res_file_name buffer
+ info_file Original file name (prefix)
+ separator Separator character
+ suffix Suffix
+
+ @note
+ If suffix is an empty string, then we don't add any suffix.
+ This is to allow one to use this function also to generate old
+ file names without a prefix.
+*/
+
+void create_signed_file_name(char *res_file_name, uint length,
+ const char *info_file,
+ char separator, LEX_STRING *suffix)
+{
+ char buff[MAX_CONNECTION_NAME+1], res[MAX_CONNECTION_NAME+1], *p;
+ p= strmake(res_file_name, info_file, length);
+ if (suffix->length != 0 && p != info_file + length)
+ {
+ uint errors;
+ size_t res_length;
+
+ *p++= separator;
+ /* Create null terminated string */
+ strmake(buff, suffix->str, suffix->length);
+ /* Convert to lower case */
+ my_casedn_str(system_charset_info, buff);
+ /* Convert to characters usable in a file name */
+ res_length= strconvert(system_charset_info, buff,
+ &my_charset_filename, res, sizeof(res), &errors);
+ strmake(p, res, min(length - (p - res_file_name), res_length));
+ }
+}
+
+
+Master_info_index::Master_info_index()
+{
+ index_file_name[0] = 0;
+ bzero((char*) &index_file, sizeof(index_file));
+}
+
+Master_info_index::~Master_info_index()
+{
+ /* This will close connection for all objects in the cache */
+ my_hash_free(&master_info_hash);
+ end_io_cache(&index_file);
+ if (index_file.file > 0)
+ my_close(index_file.file, MYF(MY_WME));
+}
+
+
+/* Load All Master_info from master.info.index File
+ * RETURN:
+ * 0 - All Success
+ * 1 - All Fail
+ * 2 - Some Success, Some Fail
+ */
+
+bool Master_info_index::init_all_master_info()
+{
+ int thread_mask;
+ int err_num= 0, succ_num= 0; // The number of success read Master_info
+ char sign[MAX_CONNECTION_NAME];
+ File index_file_nr;
+ size_t filename_length, dir_length;
+ DBUG_ENTER("init_all_master_info");
+
+ /*
+ Create the Master_info index file by prepending 'multi-' before
+ the master_info_file file name.
+ */
+ fn_format(index_file_name, master_info_file, mysql_data_home,
+ "", MY_UNPACK_FILENAME);
+ filename_length= strlen(index_file_name) + 1; /* Count 0 byte */
+ dir_length= dirname_length(index_file_name);
+ bmove_upp((uchar*) index_file_name + filename_length + 6,
+ (uchar*) index_file_name + filename_length,
+ filename_length - dir_length);
+ memcpy(index_file_name + dir_length, "multi-", 6);
+
+ if ((index_file_nr= my_open(index_file_name,
+ O_RDWR | O_CREAT | O_BINARY ,
+ MYF(MY_WME | ME_NOREFRESH))) < 0 ||
+ my_sync(index_file_nr, MYF(MY_WME)) ||
+ init_io_cache(&index_file, index_file_nr,
+ IO_SIZE, READ_CACHE,
+ my_seek(index_file_nr,0L,MY_SEEK_END,MYF(0)),
+ 0, MYF(MY_WME | MY_WAIT_IF_FULL)))
+ {
+ if (index_file_nr >= 0)
+ my_close(index_file_nr,MYF(0));
+
+ sql_print_error("Creation of Master_info index file '%s' failed",
+ index_file_name);
+ DBUG_RETURN(1);
+ }
+
+ /* Initialize Master_info Hash Table */
+ if (my_hash_init(&master_info_hash, system_charset_info,
+ MAX_REPLICATION_THREAD, 0, 0,
+ (my_hash_get_key) get_key_master_info,
+ (my_hash_free_key)free_key_master_info, HASH_UNIQUE))
+ {
+ sql_print_error("Initializing Master_info hash table failed");
+ DBUG_RETURN(1);
+ }
+
+ reinit_io_cache(&index_file, READ_CACHE, 0L,0,0);
+ while (!init_strvar_from_file(sign, sizeof(sign),
+ &index_file, NULL))
+ {
+ LEX_STRING connection_name;
+ Master_info *mi;
+ char buf_master_info_file[FN_REFLEN];
+ char buf_relay_log_info_file[FN_REFLEN];
+
+ connection_name.str= sign;
+ connection_name.length= strlen(sign);
+ if (!(mi= new Master_info(&connection_name, relay_log_recovery)) ||
+ mi->error())
+ {
+ delete mi;
+ DBUG_RETURN(1);
+ }
+
+ lock_slave_threads(mi);
+ init_thread_mask(&thread_mask,mi,0 /*not inverse*/);
+
+ create_signed_file_name(buf_master_info_file, sizeof(buf_master_info_file),
+ master_info_file, '.', &connection_name);
+ create_signed_file_name(buf_relay_log_info_file,
+ sizeof(buf_relay_log_info_file),
+ relay_log_info_file, '.', &connection_name);
+ if (global_system_variables.log_warnings > 1)
+ sql_print_information("Reading Master_info: '%s' Relay_info:'%s'",
+ buf_master_info_file, buf_relay_log_info_file);
+
+ if (init_master_info(mi, buf_master_info_file, buf_relay_log_info_file,
+ 0, thread_mask))
+ {
+ err_num++;
+ sql_print_error("Initialized Master_info from '%s' failed",
+ buf_master_info_file);
+ if (!master_info_index->get_master_info(&connection_name,
+ MYSQL_ERROR::WARN_LEVEL_NOTE))
+ {
+ /* Master_info is not in HASH; Add it */
+ if (master_info_index->add_master_info(mi, FALSE))
+ return 1;
+ succ_num++;
+ unlock_slave_threads(mi);
+ }
+ else
+ {
+ /* Master_info already in HASH */
+ sql_print_error(ER(ER_CONNECTION_ALREADY_EXISTS),
+ (int) connection_name.length, connection_name.str);
+ unlock_slave_threads(mi);
+ delete mi;
+ }
+ continue;
+ }
+ else
+ {
+ /* Initialization of Master_info succeded. Add it to HASH */
+ if (global_system_variables.log_warnings > 1)
+ sql_print_information("Initialized Master_info from '%s'",
+ buf_master_info_file);
+ if (master_info_index->get_master_info(&connection_name,
+ MYSQL_ERROR::WARN_LEVEL_NOTE))
+ {
+ /* Master_info was already registered */
+ sql_print_error(ER(ER_CONNECTION_ALREADY_EXISTS),
+ (int) connection_name.length, connection_name.str);
+ unlock_slave_threads(mi);
+ delete mi;
+ continue;
+ }
+
+ /* Master_info was not registered; add it */
+ if (master_info_index->add_master_info(mi, FALSE))
+ return 1;
+ succ_num++;
+ unlock_slave_threads(mi);
+
+ if (!opt_skip_slave_start)
+ {
+ if (start_slave_threads(1 /* need mutex */,
+ 0 /* no wait for start*/,
+ mi,
+ buf_master_info_file,
+ buf_relay_log_info_file,
+ SLAVE_IO | SLAVE_SQL))
+ {
+ sql_print_error("Failed to create slave threads for connection %.*s",
+ (int) connection_name.length,
+ connection_name.str);
+ continue;
+ }
+ if (global_system_variables.log_warnings)
+ sql_print_information("Started replication for '%.*s'",
+ (int) connection_name.length,
+ connection_name.str);
+ }
+ }
+ }
+
+ if (!err_num) // No Error on read Master_info
+ {
+ if (global_system_variables.log_warnings > 1)
+ sql_print_information("Reading of all Master_info entries succeded");
+ DBUG_RETURN(0);
+ }
+ else if (succ_num) // Have some Error and some Success
+ {
+ sql_print_warning("Reading of some Master_info entries failed");
+ DBUG_RETURN(2);
+ }
+ else // All failed
+ {
+ sql_print_error("Reading of all Master_info entries failed!");
+ DBUG_RETURN(1);
+ }
+}
+
+
+/* Write new master.info to master.info.index File */
+bool Master_info_index::write_master_name_to_index_file(LEX_STRING *name,
+ bool do_sync)
+{
+ DBUG_ASSERT(my_b_inited(&index_file) != 0);
+ DBUG_ENTER("write_master_name_to_index_file");
+
+ /* Don't write default slave to master_info.index */
+ if (name->length == 0)
+ DBUG_RETURN(0);
+
+ reinit_io_cache(&index_file, WRITE_CACHE,
+ my_b_filelength(&index_file), 0, 0);
+
+ if (my_b_write(&index_file, (uchar*) name->str, name->length) ||
+ my_b_write(&index_file, (uchar*) "\n", 1) ||
+ flush_io_cache(&index_file) ||
+ (do_sync && my_sync(index_file.file, MYF(MY_WME))))
+ {
+ sql_print_error("Write of new Master_info for '%.*s' to index file failed",
+ (int) name->length, name->str);
+ DBUG_RETURN(1);
+ }
+
+ DBUG_RETURN(0);
+}
+
+
+/**
+ Get Master_info for a connection
+
+ @param
+ connection_name Connection name
+ warning WARN_LEVEL_NOTE -> Don't print anything
+ WARN_LEVEL_WARN -> Issue warning if not exists
+ WARN_LEVEL_ERROR-> Issue error if not exists
+*/
+
+Master_info *
+Master_info_index::get_master_info(LEX_STRING *connection_name,
+ MYSQL_ERROR::enum_warning_level warning)
+{
+ Master_info *mi;
+ char buff[MAX_CONNECTION_NAME+1], *res;
+ uint buff_length;
+
+ /* Make name lower case for comparison */
+ res= strmake(buff, connection_name->str, connection_name->length);
+ my_casedn_str(system_charset_info, buff);
+ buff_length= (size_t) (res-buff);
+
+ mi= (Master_info*) my_hash_search(&master_info_hash,
+ (uchar*) buff, buff_length);
+ if (!mi && warning != MYSQL_ERROR::WARN_LEVEL_NOTE)
+ {
+ my_error(WARN_NO_MASTER_INFO,
+ MYF(warning == MYSQL_ERROR::WARN_LEVEL_WARN ? ME_JUST_WARNING :
+ 0),
+ (int) connection_name->length,
+ connection_name->str);
+ }
+ return mi;
+}
+
+
+/* Check Master_host & Master_port is duplicated or not */
+bool Master_info_index::check_duplicate_master_info(LEX_STRING *name_arg,
+ const char *host,
+ uint port)
+{
+ Master_info *mi;
+
+ /* Get full host and port name */
+ if ((mi= master_info_index->get_master_info(name_arg,
+ MYSQL_ERROR::WARN_LEVEL_NOTE)))
+ {
+ if (!host)
+ host= mi->host;
+ if (!port)
+ port= mi->port;
+ }
+ if (!host || !port)
+ return FALSE; // Not comparable yet
+
+ for (uint i= 0; i < master_info_hash.records; ++i)
+ {
+ Master_info *tmp_mi;
+ tmp_mi= (Master_info *) my_hash_element(&master_info_hash, i);
+ if (tmp_mi == mi)
+ continue; // Current connection
+ if (!strcasecmp(host, tmp_mi->host) && port == tmp_mi->port)
+ {
+ sql_print_error(ER(ER_CONNECTION_ALREADY_EXISTS),
+ (int) tmp_mi->connection_name.length,
+ tmp_mi->connection_name.str);
+ return TRUE;
+ }
+ }
+ return FALSE;
+}
+
+
+/* Add a Master_info class to Hash Table */
+bool Master_info_index::add_master_info(Master_info *mi, bool write_to_file)
+{
+ if (!my_hash_insert(&master_info_hash, (uchar*) mi))
+ {
+ if (global_system_variables.log_warnings > 1)
+ sql_print_information("Added new Master_info '%.*s' to hash table",
+ (int) mi->connection_name.length,
+ mi->connection_name.str);
+ if (write_to_file)
+ return write_master_name_to_index_file(&mi->connection_name, 1);
+ return FALSE;
+ }
+
+ /* Impossible error (EOM) ? */
+ sql_print_error("Adding new entry '%.*s' to master_info failed",
+ (int) mi->connection_name.length,
+ mi->connection_name.str);
+ return TRUE;
+}
+
+
+/**
+ Remove a Master_info class From Hash Table
+
+ TODO: Change this to use my_rename() to make the file name creation
+ atomic
+*/
+
+bool Master_info_index::remove_master_info(LEX_STRING *name)
+{
+ Master_info* mi;
+ DBUG_ENTER("remove_master_info");
+
+ if ((mi= get_master_info(name, MYSQL_ERROR::WARN_LEVEL_WARN)))
+ {
+ // Delete Master_info and rewrite others to file
+ if (!my_hash_delete(&master_info_hash, (uchar*) mi))
+ {
+ File index_file_nr;
+
+ // Close IO_CACHE and FILE handler fisrt
+ end_io_cache(&index_file);
+ my_close(index_file.file, MYF(MY_WME));
+
+ // Reopen File and truncate it
+ fn_format(index_file_name, master_info_file, mysql_data_home,
+ ".index", MY_UNPACK_FILENAME | MY_APPEND_EXT);
+
+ if ((index_file_nr= my_open(index_file_name,
+ O_RDWR | O_CREAT | O_TRUNC | O_BINARY ,
+ MYF(MY_WME))) < 0 ||
+ init_io_cache(&index_file, index_file_nr,
+ IO_SIZE, WRITE_CACHE,
+ my_seek(index_file_nr,0L,MY_SEEK_END,MYF(0)),
+ 0, MYF(MY_WME | MY_WAIT_IF_FULL)))
+ {
+ int error= my_errno;
+ if (index_file_nr >= 0)
+ my_close(index_file_nr,MYF(0));
+
+ sql_print_error("Create of Master Info Index file '%s' failed with "
+ "error: %M",
+ index_file_name, error);
+ DBUG_RETURN(TRUE);
+ }
+
+ // Rewrite Master_info.index
+ uint i;
+ for (i= 0; i< master_info_hash.records; ++i)
+ {
+ Master_info *tmp_mi;
+ tmp_mi= (Master_info *) my_hash_element(&master_info_hash, i);
+ write_master_name_to_index_file(&tmp_mi->connection_name, 0);
+ }
+ my_sync(index_file_nr, MYF(MY_WME));
+ }
+ }
+ DBUG_RETURN(FALSE);
+}
#endif /* HAVE_REPLICATION */