diff options
Diffstat (limited to 'sql/rpl_mi.cc')
-rw-r--r-- | sql/rpl_mi.cc | 616 |
1 files changed, 614 insertions, 2 deletions
diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc index a765f851812..2ca700a123a 100644 --- a/sql/rpl_mi.cc +++ b/sql/rpl_mi.cc @@ -20,6 +20,8 @@ #include "unireg.h" // REQUIRED by other includes #include "rpl_mi.h" #include "slave.h" // SLAVE_MAX_HEARTBEAT_PERIOD +#include "strfunc.h" +#include "sql_repl.h" #ifdef HAVE_REPLICATION @@ -27,7 +29,8 @@ static void init_master_log_pos(Master_info* mi); -Master_info::Master_info(bool is_slave_recovery) +Master_info::Master_info(LEX_STRING *connection_name_arg, + bool is_slave_recovery) :Slave_reporting_capability("I/O"), ssl(0), ssl_verify_server_cert(1), fd(-1), io_thd(0), rli(is_slave_recovery), port(MYSQL_PORT), @@ -41,6 +44,24 @@ Master_info::Master_info(bool is_slave_recovery) ssl_cipher[0]= 0; ssl_key[0]= 0; ssl_crl[0]= 0; ssl_crlpath[0]= 0; + /* + Store connection name and lower case connection name + It's safe to ignore any OMM errors as this is checked by error() + */ + connection_name.length= cmp_connection_name.length= + connection_name_arg->length; + if ((connection_name.str= (char*) my_malloc(connection_name_arg->length*2+2, + MYF(MY_WME)))) + { + cmp_connection_name.str= (connection_name.str + + connection_name_arg->length+1); + strmake(connection_name.str, connection_name_arg->str, + connection_name.length); + memcpy(cmp_connection_name.str, connection_name_arg->str, + connection_name.length+1); + my_casedn_str(system_charset_info, cmp_connection_name.str); + } + my_init_dynamic_array(&ignore_server_ids, sizeof(::server_id), 16, 16); bzero((char*) &file, sizeof(file)); mysql_mutex_init(key_master_info_run_lock, &run_lock, MY_MUTEX_INIT_FAST); @@ -56,6 +77,7 @@ Master_info::Master_info(bool is_slave_recovery) Master_info::~Master_info() { + my_free(connection_name.str); delete_dynamic(&ignore_server_ids); mysql_mutex_destroy(&run_lock); mysql_mutex_destroy(&data_lock); @@ -441,7 +463,7 @@ file '%s')", fname); mi->master_log_name, (ulong) mi->master_log_pos)); - mi->rli.mi = mi; + mi->rli.mi= mi; if (init_relay_log_info(&mi->rli, slave_info_fname)) goto err; @@ -596,5 +618,595 @@ void end_master_info(Master_info* mi) DBUG_VOID_RETURN; } +/* Multi-Master By P.Linux */ +uchar *get_key_master_info(Master_info *mi, size_t *length, + my_bool not_used __attribute__((unused))) +{ + /* Return lower case name */ + *length= mi->cmp_connection_name.length; + return (uchar*) mi->cmp_connection_name.str; +} + +void free_key_master_info(Master_info *mi) +{ + DBUG_ENTER("free_key_master_info"); + terminate_slave_threads(mi,SLAVE_FORCE_ALL); + end_master_info(mi); + delete mi; + DBUG_VOID_RETURN; +} + +/** + Check if connection name for master_info is valid. + + It's valid if it's a valid system name of length less than + MAX_CONNECTION_NAME. + + @return + 0 ok + 1 error +*/ + +bool check_master_connection_name(LEX_STRING *name) +{ + if (name->length >= MAX_CONNECTION_NAME) + return 1; + return 0; +} + + +/** + Create a log file with a given suffix. + + @param + res_file_name Store result here + length Length of res_file_name buffer + info_file Original file name (prefix) + append 1 if we should add suffix last (not before ext) + suffix Suffix + + @note + The suffix is added before the extension of the file name prefixed with '-'. + The suffix is also converted to lower case and we transform + all not safe character, as we do with MySQL table names. + + If suffix is an empty string, then we don't add any suffix. + This is to allow one to use this function also to generate old + file names without a prefix. +*/ + +void create_logfile_name_with_suffix(char *res_file_name, uint length, + const char *info_file, bool append, + LEX_STRING *suffix) +{ + char buff[MAX_CONNECTION_NAME+1], res[MAX_CONNECTION_NAME+1], *p; + + p= strmake(res_file_name, info_file, length); + /* If not empty suffix and there is place left for some part of the suffix */ + if (suffix->length != 0 && p <= res_file_name + length -1) + { + const char *info_file_end= info_file + (p - res_file_name); + const char *ext= append ? info_file_end : fn_ext2(info_file); + size_t res_length, ext_pos; + uint errors; + + /* Create null terminated string */ + strmake(buff, suffix->str, suffix->length); + /* Convert to lower case */ + my_casedn_str(system_charset_info, buff); + /* Convert to characters usable in a file name */ + res_length= strconvert(system_charset_info, buff, + &my_charset_filename, res, sizeof(res), &errors); + + ext_pos= (size_t) (ext - info_file); + length-= (suffix->length - ext_pos); /* Leave place for extension */ + p= res_file_name + ext_pos; + *p++= '-'; /* Add separator */ + p= strmake(p, res, min((size_t) (length - (p - res_file_name)), + res_length)); + /* Add back extension. We have checked above that there is space for it */ + strmov(p, ext); + } +} + + +Master_info_index::Master_info_index() +{ + size_t filename_length, dir_length; + /* + Create the Master_info index file by prepending 'multi-' before + the master_info_file file name. + */ + fn_format(index_file_name, master_info_file, mysql_data_home, + "", MY_UNPACK_FILENAME); + filename_length= strlen(index_file_name) + 1; /* Count 0 byte */ + dir_length= dirname_length(index_file_name); + bmove_upp((uchar*) index_file_name + filename_length + 6, + (uchar*) index_file_name + filename_length, + filename_length - dir_length); + memcpy(index_file_name + dir_length, "multi-", 6); + + bzero((char*) &index_file, sizeof(index_file)); +} + +Master_info_index::~Master_info_index() +{ + /* This will close connection for all objects in the cache */ + my_hash_free(&master_info_hash); + end_io_cache(&index_file); + if (index_file.file > 0) + my_close(index_file.file, MYF(MY_WME)); +} + + +/* Load All Master_info from master.info.index File + * RETURN: + * 0 - All Success + * 1 - All Fail + * 2 - Some Success, Some Fail + */ + +bool Master_info_index::init_all_master_info() +{ + int thread_mask; + int err_num= 0, succ_num= 0; // The number of success read Master_info + char sign[MAX_CONNECTION_NAME]; + File index_file_nr; + DBUG_ENTER("init_all_master_info"); + + if ((index_file_nr= my_open(index_file_name, + O_RDWR | O_CREAT | O_BINARY , + MYF(MY_WME | ME_NOREFRESH))) < 0 || + my_sync(index_file_nr, MYF(MY_WME)) || + init_io_cache(&index_file, index_file_nr, + IO_SIZE, READ_CACHE, + my_seek(index_file_nr,0L,MY_SEEK_END,MYF(0)), + 0, MYF(MY_WME | MY_WAIT_IF_FULL))) + { + if (index_file_nr >= 0) + my_close(index_file_nr,MYF(0)); + + sql_print_error("Creation of Master_info index file '%s' failed", + index_file_name); + DBUG_RETURN(1); + } + + /* Initialize Master_info Hash Table */ + if (my_hash_init(&master_info_hash, system_charset_info, + MAX_REPLICATION_THREAD, 0, 0, + (my_hash_get_key) get_key_master_info, + (my_hash_free_key)free_key_master_info, HASH_UNIQUE)) + { + sql_print_error("Initializing Master_info hash table failed"); + DBUG_RETURN(1); + } + + reinit_io_cache(&index_file, READ_CACHE, 0L,0,0); + while (!init_strvar_from_file(sign, sizeof(sign), + &index_file, NULL)) + { + LEX_STRING connection_name; + Master_info *mi; + char buf_master_info_file[FN_REFLEN]; + char buf_relay_log_info_file[FN_REFLEN]; + + connection_name.str= sign; + connection_name.length= strlen(sign); + if (!(mi= new Master_info(&connection_name, relay_log_recovery)) || + mi->error()) + { + delete mi; + DBUG_RETURN(1); + } + + lock_slave_threads(mi); + init_thread_mask(&thread_mask,mi,0 /*not inverse*/); + + create_logfile_name_with_suffix(buf_master_info_file, sizeof(buf_master_info_file), + master_info_file, 0, &connection_name); + create_logfile_name_with_suffix(buf_relay_log_info_file, + sizeof(buf_relay_log_info_file), + relay_log_info_file, 0, &connection_name); + if (global_system_variables.log_warnings > 1) + sql_print_information("Reading Master_info: '%s' Relay_info:'%s'", + buf_master_info_file, buf_relay_log_info_file); + + if (init_master_info(mi, buf_master_info_file, buf_relay_log_info_file, + 0, thread_mask)) + { + err_num++; + sql_print_error("Initialized Master_info from '%s' failed", + buf_master_info_file); + if (!master_info_index->get_master_info(&connection_name, + MYSQL_ERROR::WARN_LEVEL_NOTE)) + { + /* Master_info is not in HASH; Add it */ + if (master_info_index->add_master_info(mi, FALSE)) + return 1; + succ_num++; + unlock_slave_threads(mi); + } + else + { + /* Master_info already in HASH */ + sql_print_error(ER(ER_CONNECTION_ALREADY_EXISTS), + (int) connection_name.length, connection_name.str); + unlock_slave_threads(mi); + delete mi; + } + continue; + } + else + { + /* Initialization of Master_info succeded. Add it to HASH */ + if (global_system_variables.log_warnings > 1) + sql_print_information("Initialized Master_info from '%s'", + buf_master_info_file); + if (master_info_index->get_master_info(&connection_name, + MYSQL_ERROR::WARN_LEVEL_NOTE)) + { + /* Master_info was already registered */ + sql_print_error(ER(ER_CONNECTION_ALREADY_EXISTS), + (int) connection_name.length, connection_name.str); + unlock_slave_threads(mi); + delete mi; + continue; + } + + /* Master_info was not registered; add it */ + if (master_info_index->add_master_info(mi, FALSE)) + return 1; + succ_num++; + unlock_slave_threads(mi); + + if (!opt_skip_slave_start) + { + if (start_slave_threads(1 /* need mutex */, + 0 /* no wait for start*/, + mi, + buf_master_info_file, + buf_relay_log_info_file, + SLAVE_IO | SLAVE_SQL)) + { + sql_print_error("Failed to create slave threads for connection '%.*s'", + (int) connection_name.length, + connection_name.str); + continue; + } + if (global_system_variables.log_warnings) + sql_print_information("Started replication for '%.*s'", + (int) connection_name.length, + connection_name.str); + } + } + } + + if (!err_num) // No Error on read Master_info + { + if (global_system_variables.log_warnings > 1) + sql_print_information("Reading of all Master_info entries succeded"); + DBUG_RETURN(0); + } + else if (succ_num) // Have some Error and some Success + { + sql_print_warning("Reading of some Master_info entries failed"); + DBUG_RETURN(2); + } + else // All failed + { + sql_print_error("Reading of all Master_info entries failed!"); + DBUG_RETURN(1); + } +} + + +/* Write new master.info to master.info.index File */ +bool Master_info_index::write_master_name_to_index_file(LEX_STRING *name, + bool do_sync) +{ + DBUG_ASSERT(my_b_inited(&index_file) != 0); + DBUG_ENTER("write_master_name_to_index_file"); + + /* Don't write default slave to master_info.index */ + if (name->length == 0) + DBUG_RETURN(0); + + reinit_io_cache(&index_file, WRITE_CACHE, + my_b_filelength(&index_file), 0, 0); + + if (my_b_write(&index_file, (uchar*) name->str, name->length) || + my_b_write(&index_file, (uchar*) "\n", 1) || + flush_io_cache(&index_file) || + (do_sync && my_sync(index_file.file, MYF(MY_WME)))) + { + sql_print_error("Write of new Master_info for '%.*s' to index file failed", + (int) name->length, name->str); + DBUG_RETURN(1); + } + + DBUG_RETURN(0); +} + + +/** + Get Master_info for a connection + + @param + connection_name Connection name + warning WARN_LEVEL_NOTE -> Don't print anything + WARN_LEVEL_WARN -> Issue warning if not exists + WARN_LEVEL_ERROR-> Issue error if not exists +*/ + +Master_info * +Master_info_index::get_master_info(LEX_STRING *connection_name, + MYSQL_ERROR::enum_warning_level warning) +{ + Master_info *mi; + char buff[MAX_CONNECTION_NAME+1], *res; + uint buff_length; + DBUG_ENTER("get_master_info"); + DBUG_PRINT("enter", + ("connection_name: '%.*s'", (int) connection_name->length, + connection_name->str)); + + /* Make name lower case for comparison */ + res= strmake(buff, connection_name->str, connection_name->length); + my_casedn_str(system_charset_info, buff); + buff_length= (size_t) (res-buff); + + mi= (Master_info*) my_hash_search(&master_info_hash, + (uchar*) buff, buff_length); + if (!mi && warning != MYSQL_ERROR::WARN_LEVEL_NOTE) + { + my_error(WARN_NO_MASTER_INFO, + MYF(warning == MYSQL_ERROR::WARN_LEVEL_WARN ? ME_JUST_WARNING : + 0), + (int) connection_name->length, + connection_name->str); + } + DBUG_RETURN(mi); +} + + +/* Check Master_host & Master_port is duplicated or not */ +bool Master_info_index::check_duplicate_master_info(LEX_STRING *name_arg, + const char *host, + uint port) +{ + Master_info *mi; + DBUG_ENTER("check_duplicate_master_info"); + + /* Get full host and port name */ + if ((mi= master_info_index->get_master_info(name_arg, + MYSQL_ERROR::WARN_LEVEL_NOTE))) + { + if (!host) + host= mi->host; + if (!port) + port= mi->port; + } + if (!host || !port) + DBUG_RETURN(FALSE); // Not comparable yet + + for (uint i= 0; i < master_info_hash.records; ++i) + { + Master_info *tmp_mi; + tmp_mi= (Master_info *) my_hash_element(&master_info_hash, i); + if (tmp_mi == mi) + continue; // Current connection + if (!strcasecmp(host, tmp_mi->host) && port == tmp_mi->port) + { + my_error(ER_CONNECTION_ALREADY_EXISTS, MYF(0), + (int) name_arg->length, + name_arg->str, + (int) tmp_mi->connection_name.length, + tmp_mi->connection_name.str); + DBUG_RETURN(TRUE); + } + } + DBUG_RETURN(FALSE); +} + + +/* Add a Master_info class to Hash Table */ +bool Master_info_index::add_master_info(Master_info *mi, bool write_to_file) +{ + if (!my_hash_insert(&master_info_hash, (uchar*) mi)) + { + if (global_system_variables.log_warnings > 1) + sql_print_information("Added new Master_info '%.*s' to hash table", + (int) mi->connection_name.length, + mi->connection_name.str); + if (write_to_file) + return write_master_name_to_index_file(&mi->connection_name, 1); + return FALSE; + } + + /* Impossible error (EOM) ? */ + sql_print_error("Adding new entry '%.*s' to master_info failed", + (int) mi->connection_name.length, + mi->connection_name.str); + return TRUE; +} + + +/** + Remove a Master_info class From Hash Table + + TODO: Change this to use my_rename() to make the file name creation + atomic +*/ + +bool Master_info_index::remove_master_info(LEX_STRING *name) +{ + Master_info* mi; + DBUG_ENTER("remove_master_info"); + + if ((mi= get_master_info(name, MYSQL_ERROR::WARN_LEVEL_WARN))) + { + // Delete Master_info and rewrite others to file + if (!my_hash_delete(&master_info_hash, (uchar*) mi)) + { + File index_file_nr; + + // Close IO_CACHE and FILE handler fisrt + end_io_cache(&index_file); + my_close(index_file.file, MYF(MY_WME)); + + // Reopen File and truncate it + if ((index_file_nr= my_open(index_file_name, + O_RDWR | O_CREAT | O_TRUNC | O_BINARY , + MYF(MY_WME))) < 0 || + init_io_cache(&index_file, index_file_nr, + IO_SIZE, WRITE_CACHE, + my_seek(index_file_nr,0L,MY_SEEK_END,MYF(0)), + 0, MYF(MY_WME | MY_WAIT_IF_FULL))) + { + int error= my_errno; + if (index_file_nr >= 0) + my_close(index_file_nr,MYF(0)); + + sql_print_error("Create of Master Info Index file '%s' failed with " + "error: %M", + index_file_name, error); + DBUG_RETURN(TRUE); + } + + // Rewrite Master_info.index + for (uint i= 0; i< master_info_hash.records; ++i) + { + Master_info *tmp_mi; + tmp_mi= (Master_info *) my_hash_element(&master_info_hash, i); + write_master_name_to_index_file(&tmp_mi->connection_name, 0); + } + my_sync(index_file_nr, MYF(MY_WME)); + } + } + DBUG_RETURN(FALSE); +} + + +/** + Master_info_index::give_error_if_slave_running() + + @return + TRUE If some slave is running. An error is printed + FALSE No slave is running +*/ + +bool Master_info_index::give_error_if_slave_running() +{ + DBUG_ENTER("warn_if_slave_running"); + mysql_mutex_assert_owner(&LOCK_active_mi); + + for (uint i= 0; i< master_info_hash.records; ++i) + { + Master_info *mi; + mi= (Master_info *) my_hash_element(&master_info_hash, i); + if (mi->rli.slave_running != MYSQL_SLAVE_NOT_RUN) + { + my_error(ER_SLAVE_MUST_STOP, MYF(0), (int) mi->connection_name.length, + mi->connection_name.str); + DBUG_RETURN(TRUE); + } + } + DBUG_RETURN(FALSE); +} + + +/** + Master_info_index::start_all_slaves() + + Start all slaves that was not running. + + @return + TRUE Error + FALSE Everything ok. +*/ + +bool Master_info_index::start_all_slaves(THD *thd) +{ + bool result= FALSE; + DBUG_ENTER("warn_if_slave_running"); + mysql_mutex_assert_owner(&LOCK_active_mi); + + for (uint i= 0; i< master_info_hash.records; ++i) + { + int error; + Master_info *mi; + mi= (Master_info *) my_hash_element(&master_info_hash, i); + + /* + Try to start all slaves that are configured (host is defined) + and are not already running + */ + if ((mi->slave_running != MYSQL_SLAVE_RUN_CONNECT || + !mi->rli.slave_running) && *mi->host) + { + if ((error= start_slave(thd, mi, 1))) + { + my_error(ER_CANT_START_STOP_SLAVE, MYF(0), + "START", + (int) mi->connection_name.length, + mi->connection_name.str); + result= 1; + if (error < 0) // fatal error + break; + } + else + push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, + ER_SLAVE_STARTED, ER(ER_SLAVE_STARTED), + (int) mi->connection_name.length, + mi->connection_name.str); + } + } + DBUG_RETURN(result); +} + + +/** + Master_info_index::stop_all_slaves() + + Start all slaves that was not running. + + @return + TRUE Error + FALSE Everything ok. +*/ + +bool Master_info_index::stop_all_slaves(THD *thd) +{ + bool result= FALSE; + DBUG_ENTER("warn_if_slave_running"); + mysql_mutex_assert_owner(&LOCK_active_mi); + + for (uint i= 0; i< master_info_hash.records; ++i) + { + int error; + Master_info *mi; + mi= (Master_info *) my_hash_element(&master_info_hash, i); + if ((mi->slave_running != MYSQL_SLAVE_NOT_RUN || + mi->rli.slave_running)) + { + if ((error= stop_slave(thd, mi, 1))) + { + my_error(ER_CANT_START_STOP_SLAVE, MYF(0), + "STOP", + (int) mi->connection_name.length, + mi->connection_name.str); + result= 1; + if (error < 0) // Fatal error + break; + } + else + push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, + ER_SLAVE_STOPPED, ER(ER_SLAVE_STOPPED), + (int) mi->connection_name.length, + mi->connection_name.str); + } + } + DBUG_RETURN(result); +} #endif /* HAVE_REPLICATION */ |