diff options
author | Michael Widenius <monty@askmonty.org> | 2012-09-28 02:06:56 +0300 |
---|---|---|
committer | Michael Widenius <monty@askmonty.org> | 2012-09-28 02:06:56 +0300 |
commit | 1864d9596d9531427f1d63f86ada9e0cb21b51db (patch) | |
tree | 89ad647a849fea26b336faf565573a8d226ca7a9 /sql/rpl_mi.cc | |
parent | 620d14f8c3521f9ec7283b8690e0e16434739d33 (diff) | |
download | mariadb-git-1864d9596d9531427f1d63f86ada9e0cb21b51db.tar.gz |
Implementation of Multi-source replication (MDEV:253)
Documentation of the feature can be found at: http://kb.askmonty.org/en/multi-source-replication/
This code is based on code from Taobao, developed by Plinux
BUILD/SETUP.sh:
Added -Wno-invalid-offsetof to get rid of warning of offsetof() on C++ class (safe in the contex we use it)
client/mysqltest.cc:
Added support for error names starting with 'W'
Added connection_name support to --sync_with_master
cmake/maintainer.cmake:
Added -Wno-invalid-offsetof to get rid of warning of offsetof() on C++ class (safe in the contex we use it)
mysql-test/r/mysqltest.result:
Updated results
mysql-test/r/parser.result:
Updated results
mysql-test/suite/multi_source/my.cnf:
Setup of multi-master tests
mysql-test/suite/multi_source/simple.result:
Simple basic test of multi-source functionality
mysql-test/suite/multi_source/simple.test:
Simple basic test of multi-source functionality
mysql-test/suite/multi_source/syntax.result:
Test of multi-source syntax
mysql-test/suite/multi_source/syntax.test:
Test of multi-source syntax
mysql-test/suite/rpl/r/rpl_rotate_logs.result:
Updated results because of new error messages
mysql-test/t/parser.test:
Updated test as master_pos_wait() now takes more arguments than before
sql/event_scheduler.cc:
No reason to initialize slave_thread (it's guaranteed to be zero here)
sql/item_create.cc:
Added connection_name argument to master_pos_wait()
Simplified code
sql/item_func.cc:
Added connection_name argument to master_pos_wait()
sql/item_func.h:
Added connection_name argument to master_pos_wait()
sql/log.cc:
Added tag "Master 'connection_name'" to slave errors that has a connection name.
sql/mysqld.cc:
Added variable mysqld_server_initialized so that other functions can test if server is fully initialized.
Free all slave data in one place (fewer ifdef's)
Removed not needed call to close_active_mi()
Initialize slaves() later in startup to ensure that everthing is really initialized when slaves start.
Made status variable slave_running multi-source safe
sql/mysqld.h:
Added mysqld_server_initialized
sql/rpl_mi.cc:
Store connection name and cmp_connection_name (only used for show full slave status) in Master_info
Added code for Master_info_index, which handles storage of multi-master information
Don't write the empty "" connection_name to multi-master.info file. This is handled by the original code.
sql/rpl_mi.h:
Added connection_name and Master_info_index
sql/rpl_rli.cc:
Added connection_name to relay log files.
sql/rpl_rli.h:
Fixed type of slave_skip_counter as we now access it directly in sys_vars.cc, so it must be uint
sql/share/errmsg-utf8.txt:
Added new error messages needed for multi-source
Added multi-source name to error ER_MASTER_INFO and WARN_NO_MASTER_INFO
sql/slave.cc:
Moved things a bit around to make it easier to handle error conditions.
Create a global master_info_index and add the "" connection to it
Ensure that new Master_info doesn't fail.
Don't call terminate_slave_threads(active_mi..) on end_slave() as this is now done automaticly when deleting master_info_index.
Delete not needed function close_active_mi(). One can achive same thing by calling end_slave().
Added support for SHOW FULL SLAVE STATUS (show status for all master connections with connection_name as first column)
sql/slave.h:
Added new prototypes
sql/sql_base.cc:
More DBUG_PRINT
sql/sql_class.cc:
Reset thd->connection_name and thd-->default_master_connection
sql/sql_class.h:
Added thd->connection_name and thd-->default_master_connection
Added slave_skip_count to variables to make changing the @@sql_slave_skip_count variable thread safe
sql/sql_const.h:
Added MAX_CONNECTION_NAME
sql/sql_lex.cc:
Reset 'lex->verbose' (to simplify some sql_yacc.yy code)
sql/sql_lex.h:
Added connection_name
sql/sql_parse.cc:
Added support for connection_name to all SLAVE commands.
- Instead of using active_mi, we now get the current Master_info from master_info_index.
- Create new replication threads with CHANGE MASTER
- Added support for show_all_master_info()
sql/sql_reload.cc:
Made reset/full slave use master_info_index->get_master_info() instead of active_mi.
If one uses 'RESET SLAVE "connection_name" all' the connection is removed from master_info_index.
sql/sql_repl.cc:
sql_slave_skip_counter is moved to thd->variables to make it thread safe and fix some bugs with it
Add connection name to relay log files.
Added connection name to errors.
Added some logging for multi-master if log_warnings > 1
stop_slave():
- Don't check if thd is set. It's guaranteed to always be set.
change_master():
- Check for duplicate connection names in change_master()
- Check for wrong arguments first in file (to simplify error handling)
- Register new connections in master_info_index
sql/sql_yacc.yy:
Added optional connection_name to a all relevant master/slave commands
sql/strfunc.cc:
my_global.h shoud always be included first.
sql/sys_vars.cc:
Added variable default_master_connection
Made variable sql_slave_skip_counter multi-source safe
sql/sys_vars.h:
Added Sys_var_session_lexstring (needed for default_master_connection)
Added Sys_var_multi_source_uint (needed for sql_slave_skip_counter).
Diffstat (limited to 'sql/rpl_mi.cc')
-rw-r--r-- | sql/rpl_mi.cc | 472 |
1 files changed, 470 insertions, 2 deletions
diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc index 3c5a99121fa..3dc3e38f419 100644 --- a/sql/rpl_mi.cc +++ b/sql/rpl_mi.cc @@ -20,6 +20,7 @@ #include "unireg.h" // REQUIRED by other includes #include "rpl_mi.h" #include "slave.h" // SLAVE_MAX_HEARTBEAT_PERIOD +#include "strfunc.h" #ifdef HAVE_REPLICATION @@ -27,7 +28,8 @@ static void init_master_log_pos(Master_info* mi); -Master_info::Master_info(bool is_slave_recovery) +Master_info::Master_info(LEX_STRING *connection_name_arg, + bool is_slave_recovery) :Slave_reporting_capability("I/O"), ssl(0), ssl_verify_server_cert(1), fd(-1), io_thd(0), rli(is_slave_recovery), port(MYSQL_PORT), @@ -40,6 +42,21 @@ Master_info::Master_info(bool is_slave_recovery) ssl_ca[0]= 0; ssl_capath[0]= 0; ssl_cert[0]= 0; ssl_cipher[0]= 0; ssl_key[0]= 0; + /* Store connection name and lower case connection name */ + connection_name.length= cmp_connection_name.length= + connection_name_arg->length; + if ((connection_name.str= (char*) my_malloc(connection_name_arg->length*2+2, + MYF(MY_WME)))) + { + cmp_connection_name.str= (connection_name.str + + connection_name_arg->length+1); + strmake(connection_name.str, connection_name_arg->str, + connection_name.length); + memcpy(cmp_connection_name.str, connection_name_arg->str, + connection_name.length+1); + my_casedn_str(system_charset_info, cmp_connection_name.str); + } + my_init_dynamic_array(&ignore_server_ids, sizeof(::server_id), 16, 16); bzero((char*) &file, sizeof(file)); mysql_mutex_init(key_master_info_run_lock, &run_lock, MY_MUTEX_INIT_FAST); @@ -55,6 +72,7 @@ Master_info::Master_info(bool is_slave_recovery) Master_info::~Master_info() { + my_free(connection_name.str); delete_dynamic(&ignore_server_ids); mysql_mutex_destroy(&run_lock); mysql_mutex_destroy(&data_lock); @@ -407,7 +425,7 @@ file '%s')", fname); mi->master_log_name, (ulong) mi->master_log_pos)); - mi->rli.mi = mi; + mi->rli.mi= mi; if (init_relay_log_info(&mi->rli, slave_info_fname)) goto err; @@ -560,5 +578,455 @@ void end_master_info(Master_info* mi) DBUG_VOID_RETURN; } +/* Multi-Master By P.Linux */ +uchar *get_key_master_info(Master_info *mi, size_t *length, + my_bool not_used __attribute__((unused))) +{ + /* Return lower case name */ + *length= mi->cmp_connection_name.length; + return (uchar*) mi->cmp_connection_name.str; +} + +void free_key_master_info(Master_info *mi) +{ + DBUG_ENTER("free_key_master_info"); + terminate_slave_threads(mi,SLAVE_FORCE_ALL); + end_master_info(mi); + delete mi; + DBUG_VOID_RETURN; +} + +/** + Check if connection name for master_info is valid. + + It's valid if it's a valid system name, is less than + MAX_CONNECTION_NAME. + + @return + 0 ok + 1 error +*/ + +bool check_master_connection_name(LEX_STRING *name) +{ + if (name->length >= MAX_CONNECTION_NAME) + return 1; + return 0; +} + + +/** + Create a log file with a signed suffix. + + @param + res_file_name Store result here + length Length of res_file_name buffer + info_file Original file name (prefix) + separator Separator character + suffix Suffix + + @note + If suffix is an empty string, then we don't add any suffix. + This is to allow one to use this function also to generate old + file names without a prefix. +*/ + +void create_signed_file_name(char *res_file_name, uint length, + const char *info_file, + char separator, LEX_STRING *suffix) +{ + char buff[MAX_CONNECTION_NAME+1], res[MAX_CONNECTION_NAME+1], *p; + p= strmake(res_file_name, info_file, length); + if (suffix->length != 0 && p != info_file + length) + { + uint errors; + size_t res_length; + + *p++= separator; + /* Create null terminated string */ + strmake(buff, suffix->str, suffix->length); + /* Convert to lower case */ + my_casedn_str(system_charset_info, buff); + /* Convert to characters usable in a file name */ + res_length= strconvert(system_charset_info, buff, + &my_charset_filename, res, sizeof(res), &errors); + strmake(p, res, min(length - (p - res_file_name), res_length)); + } +} + + +Master_info_index::Master_info_index() +{ + index_file_name[0] = 0; + bzero((char*) &index_file, sizeof(index_file)); +} + +Master_info_index::~Master_info_index() +{ + /* This will close connection for all objects in the cache */ + my_hash_free(&master_info_hash); + end_io_cache(&index_file); + if (index_file.file > 0) + my_close(index_file.file, MYF(MY_WME)); +} + + +/* Load All Master_info from master.info.index File + * RETURN: + * 0 - All Success + * 1 - All Fail + * 2 - Some Success, Some Fail + */ + +bool Master_info_index::init_all_master_info() +{ + int thread_mask; + int err_num= 0, succ_num= 0; // The number of success read Master_info + char sign[MAX_CONNECTION_NAME]; + File index_file_nr; + size_t filename_length, dir_length; + DBUG_ENTER("init_all_master_info"); + + /* + Create the Master_info index file by prepending 'multi-' before + the master_info_file file name. + */ + fn_format(index_file_name, master_info_file, mysql_data_home, + "", MY_UNPACK_FILENAME); + filename_length= strlen(index_file_name) + 1; /* Count 0 byte */ + dir_length= dirname_length(index_file_name); + bmove_upp((uchar*) index_file_name + filename_length + 6, + (uchar*) index_file_name + filename_length, + filename_length - dir_length); + memcpy(index_file_name + dir_length, "multi-", 6); + + if ((index_file_nr= my_open(index_file_name, + O_RDWR | O_CREAT | O_BINARY , + MYF(MY_WME | ME_NOREFRESH))) < 0 || + my_sync(index_file_nr, MYF(MY_WME)) || + init_io_cache(&index_file, index_file_nr, + IO_SIZE, READ_CACHE, + my_seek(index_file_nr,0L,MY_SEEK_END,MYF(0)), + 0, MYF(MY_WME | MY_WAIT_IF_FULL))) + { + if (index_file_nr >= 0) + my_close(index_file_nr,MYF(0)); + + sql_print_error("Creation of Master_info index file '%s' failed", + index_file_name); + DBUG_RETURN(1); + } + + /* Initialize Master_info Hash Table */ + if (my_hash_init(&master_info_hash, system_charset_info, + MAX_REPLICATION_THREAD, 0, 0, + (my_hash_get_key) get_key_master_info, + (my_hash_free_key)free_key_master_info, HASH_UNIQUE)) + { + sql_print_error("Initializing Master_info hash table failed"); + DBUG_RETURN(1); + } + + reinit_io_cache(&index_file, READ_CACHE, 0L,0,0); + while (!init_strvar_from_file(sign, sizeof(sign), + &index_file, NULL)) + { + LEX_STRING connection_name; + Master_info *mi; + char buf_master_info_file[FN_REFLEN]; + char buf_relay_log_info_file[FN_REFLEN]; + + connection_name.str= sign; + connection_name.length= strlen(sign); + if (!(mi= new Master_info(&connection_name, relay_log_recovery)) || + mi->error()) + { + delete mi; + DBUG_RETURN(1); + } + + lock_slave_threads(mi); + init_thread_mask(&thread_mask,mi,0 /*not inverse*/); + + create_signed_file_name(buf_master_info_file, sizeof(buf_master_info_file), + master_info_file, '.', &connection_name); + create_signed_file_name(buf_relay_log_info_file, + sizeof(buf_relay_log_info_file), + relay_log_info_file, '.', &connection_name); + if (global_system_variables.log_warnings > 1) + sql_print_information("Reading Master_info: '%s' Relay_info:'%s'", + buf_master_info_file, buf_relay_log_info_file); + + if (init_master_info(mi, buf_master_info_file, buf_relay_log_info_file, + 0, thread_mask)) + { + err_num++; + sql_print_error("Initialized Master_info from '%s' failed", + buf_master_info_file); + if (!master_info_index->get_master_info(&connection_name, + MYSQL_ERROR::WARN_LEVEL_NOTE)) + { + /* Master_info is not in HASH; Add it */ + if (master_info_index->add_master_info(mi, FALSE)) + return 1; + succ_num++; + unlock_slave_threads(mi); + } + else + { + /* Master_info already in HASH */ + sql_print_error(ER(ER_CONNECTION_ALREADY_EXISTS), + (int) connection_name.length, connection_name.str); + unlock_slave_threads(mi); + delete mi; + } + continue; + } + else + { + /* Initialization of Master_info succeded. Add it to HASH */ + if (global_system_variables.log_warnings > 1) + sql_print_information("Initialized Master_info from '%s'", + buf_master_info_file); + if (master_info_index->get_master_info(&connection_name, + MYSQL_ERROR::WARN_LEVEL_NOTE)) + { + /* Master_info was already registered */ + sql_print_error(ER(ER_CONNECTION_ALREADY_EXISTS), + (int) connection_name.length, connection_name.str); + unlock_slave_threads(mi); + delete mi; + continue; + } + + /* Master_info was not registered; add it */ + if (master_info_index->add_master_info(mi, FALSE)) + return 1; + succ_num++; + unlock_slave_threads(mi); + + if (!opt_skip_slave_start) + { + if (start_slave_threads(1 /* need mutex */, + 0 /* no wait for start*/, + mi, + buf_master_info_file, + buf_relay_log_info_file, + SLAVE_IO | SLAVE_SQL)) + { + sql_print_error("Failed to create slave threads for connection %.*s", + (int) connection_name.length, + connection_name.str); + continue; + } + if (global_system_variables.log_warnings) + sql_print_information("Started replication for '%.*s'", + (int) connection_name.length, + connection_name.str); + } + } + } + + if (!err_num) // No Error on read Master_info + { + if (global_system_variables.log_warnings > 1) + sql_print_information("Reading of all Master_info entries succeded"); + DBUG_RETURN(0); + } + else if (succ_num) // Have some Error and some Success + { + sql_print_warning("Reading of some Master_info entries failed"); + DBUG_RETURN(2); + } + else // All failed + { + sql_print_error("Reading of all Master_info entries failed!"); + DBUG_RETURN(1); + } +} + + +/* Write new master.info to master.info.index File */ +bool Master_info_index::write_master_name_to_index_file(LEX_STRING *name, + bool do_sync) +{ + DBUG_ASSERT(my_b_inited(&index_file) != 0); + DBUG_ENTER("write_master_name_to_index_file"); + + /* Don't write default slave to master_info.index */ + if (name->length == 0) + DBUG_RETURN(0); + + reinit_io_cache(&index_file, WRITE_CACHE, + my_b_filelength(&index_file), 0, 0); + + if (my_b_write(&index_file, (uchar*) name->str, name->length) || + my_b_write(&index_file, (uchar*) "\n", 1) || + flush_io_cache(&index_file) || + (do_sync && my_sync(index_file.file, MYF(MY_WME)))) + { + sql_print_error("Write of new Master_info for '%.*s' to index file failed", + (int) name->length, name->str); + DBUG_RETURN(1); + } + + DBUG_RETURN(0); +} + + +/** + Get Master_info for a connection + + @param + connection_name Connection name + warning WARN_LEVEL_NOTE -> Don't print anything + WARN_LEVEL_WARN -> Issue warning if not exists + WARN_LEVEL_ERROR-> Issue error if not exists +*/ + +Master_info * +Master_info_index::get_master_info(LEX_STRING *connection_name, + MYSQL_ERROR::enum_warning_level warning) +{ + Master_info *mi; + char buff[MAX_CONNECTION_NAME+1], *res; + uint buff_length; + + /* Make name lower case for comparison */ + res= strmake(buff, connection_name->str, connection_name->length); + my_casedn_str(system_charset_info, buff); + buff_length= (size_t) (res-buff); + + mi= (Master_info*) my_hash_search(&master_info_hash, + (uchar*) buff, buff_length); + if (!mi && warning != MYSQL_ERROR::WARN_LEVEL_NOTE) + { + my_error(WARN_NO_MASTER_INFO, + MYF(warning == MYSQL_ERROR::WARN_LEVEL_WARN ? ME_JUST_WARNING : + 0), + (int) connection_name->length, + connection_name->str); + } + return mi; +} + + +/* Check Master_host & Master_port is duplicated or not */ +bool Master_info_index::check_duplicate_master_info(LEX_STRING *name_arg, + const char *host, + uint port) +{ + Master_info *mi; + + /* Get full host and port name */ + if ((mi= master_info_index->get_master_info(name_arg, + MYSQL_ERROR::WARN_LEVEL_NOTE))) + { + if (!host) + host= mi->host; + if (!port) + port= mi->port; + } + if (!host || !port) + return FALSE; // Not comparable yet + + for (uint i= 0; i < master_info_hash.records; ++i) + { + Master_info *tmp_mi; + tmp_mi= (Master_info *) my_hash_element(&master_info_hash, i); + if (tmp_mi == mi) + continue; // Current connection + if (!strcasecmp(host, tmp_mi->host) && port == tmp_mi->port) + { + sql_print_error(ER(ER_CONNECTION_ALREADY_EXISTS), + (int) tmp_mi->connection_name.length, + tmp_mi->connection_name.str); + return TRUE; + } + } + return FALSE; +} + + +/* Add a Master_info class to Hash Table */ +bool Master_info_index::add_master_info(Master_info *mi, bool write_to_file) +{ + if (!my_hash_insert(&master_info_hash, (uchar*) mi)) + { + if (global_system_variables.log_warnings > 1) + sql_print_information("Added new Master_info '%.*s' to hash table", + (int) mi->connection_name.length, + mi->connection_name.str); + if (write_to_file) + return write_master_name_to_index_file(&mi->connection_name, 1); + return FALSE; + } + + /* Impossible error (EOM) ? */ + sql_print_error("Adding new entry '%.*s' to master_info failed", + (int) mi->connection_name.length, + mi->connection_name.str); + return TRUE; +} + + +/** + Remove a Master_info class From Hash Table + + TODO: Change this to use my_rename() to make the file name creation + atomic +*/ + +bool Master_info_index::remove_master_info(LEX_STRING *name) +{ + Master_info* mi; + DBUG_ENTER("remove_master_info"); + + if ((mi= get_master_info(name, MYSQL_ERROR::WARN_LEVEL_WARN))) + { + // Delete Master_info and rewrite others to file + if (!my_hash_delete(&master_info_hash, (uchar*) mi)) + { + File index_file_nr; + + // Close IO_CACHE and FILE handler fisrt + end_io_cache(&index_file); + my_close(index_file.file, MYF(MY_WME)); + + // Reopen File and truncate it + fn_format(index_file_name, master_info_file, mysql_data_home, + ".index", MY_UNPACK_FILENAME | MY_APPEND_EXT); + + if ((index_file_nr= my_open(index_file_name, + O_RDWR | O_CREAT | O_TRUNC | O_BINARY , + MYF(MY_WME))) < 0 || + init_io_cache(&index_file, index_file_nr, + IO_SIZE, WRITE_CACHE, + my_seek(index_file_nr,0L,MY_SEEK_END,MYF(0)), + 0, MYF(MY_WME | MY_WAIT_IF_FULL))) + { + int error= my_errno; + if (index_file_nr >= 0) + my_close(index_file_nr,MYF(0)); + + sql_print_error("Create of Master Info Index file '%s' failed with " + "error: %M", + index_file_name, error); + DBUG_RETURN(TRUE); + } + + // Rewrite Master_info.index + uint i; + for (i= 0; i< master_info_hash.records; ++i) + { + Master_info *tmp_mi; + tmp_mi= (Master_info *) my_hash_element(&master_info_hash, i); + write_master_name_to_index_file(&tmp_mi->connection_name, 0); + } + my_sync(index_file_nr, MYF(MY_WME)); + } + } + DBUG_RETURN(FALSE); +} #endif /* HAVE_REPLICATION */ |