summaryrefslogtreecommitdiff
path: root/sql/repl_failsafe.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/repl_failsafe.cc')
-rw-r--r--sql/repl_failsafe.cc611
1 files changed, 611 insertions, 0 deletions
diff --git a/sql/repl_failsafe.cc b/sql/repl_failsafe.cc
index c836e6803ee..d846662947d 100644
--- a/sql/repl_failsafe.cc
+++ b/sql/repl_failsafe.cc
@@ -20,12 +20,21 @@
#include "repl_failsafe.h"
#include "sql_repl.h"
#include "slave.h"
+#include "sql_acl.h"
#include "mini_client.h"
+#include "log_event.h"
#include <mysql.h>
+#include <thr_alarm.h>
+
+#define SLAVE_LIST_CHUNK 128
+#define SLAVE_ERRMSG_SIZE (FN_REFLEN+64)
+
RPL_STATUS rpl_status=RPL_NULL;
pthread_mutex_t LOCK_rpl_status;
pthread_cond_t COND_rpl_status;
+HASH slave_list;
+extern const char* any_db;
const char *rpl_role_type[] = {"MASTER","SLAVE",NullS};
TYPELIB rpl_role_typelib = {array_elements(rpl_role_type)-1,"",
@@ -37,6 +46,10 @@ const char* rpl_status_type[] = {"AUTH_MASTER","ACTIVE_SLAVE","IDLE_SLAVE",
TYPELIB rpl_status_typelib= {array_elements(rpl_status_type)-1,"",
rpl_status_type};
+static Slave_log_event* find_slave_event(IO_CACHE* log,
+ const char* log_file_name,
+ char* errmsg);
+
static int init_failsafe_rpl_thread(THD* thd)
{
DBUG_ENTER("init_failsafe_rpl_thread");
@@ -89,6 +102,333 @@ void change_rpl_status(RPL_STATUS from_status, RPL_STATUS to_status)
pthread_mutex_unlock(&LOCK_rpl_status);
}
+#define get_object(p, obj) \
+{\
+ uint len = (uint)*p++; \
+ if (p + len > p_end || len >= sizeof(obj)) \
+ goto err; \
+ strmake(obj,(char*) p,len); \
+ p+= len; \
+}\
+
+/*
+  Compare the master coordinates carried by a slave event with those in a
+  LEX_MASTER_INFO by forwarding to the four-argument cmp_master_pos().
+  The result of that overload is returned unchanged.
+*/
+static inline int cmp_master_pos(Slave_log_event* sev, LEX_MASTER_INFO* mi)
+{
+  int cmp_result= cmp_master_pos(sev->master_log, sev->master_pos,
+                                 mi->log_file_name, mi->pos);
+  return cmp_result;
+}
+
+/*
+  Remove the slave_list entry keyed by thd->server_id, if there is one.
+
+  thd         connection whose server_id selects the entry; nothing is
+              looked up when the id is 0
+  only_mine   when set, the entry is removed only if it was registered by
+              this very thd
+  need_mutex  when set, LOCK_slave_list is taken around the operation;
+              otherwise the caller is expected to hold it already
+*/
+void unregister_slave(THD* thd, bool only_mine, bool need_mutex)
+{
+  if (need_mutex)
+    pthread_mutex_lock(&LOCK_slave_list);
+
+  if (thd->server_id)
+  {
+    SLAVE_INFO* registered=
+      (SLAVE_INFO*) hash_search(&slave_list, (byte*) &thd->server_id, 4);
+
+    if (registered)
+    {
+      if (!only_mine || registered->thd == thd)
+        hash_delete(&slave_list, (byte*) registered);
+    }
+  }
+
+  if (need_mutex)
+    pthread_mutex_unlock(&LOCK_slave_list);
+}
+
+/*
+  Handle a slave's registration request: parse the packet into a new
+  SLAVE_INFO and store it in slave_list, replacing any entry that already
+  uses the same server id.
+
+  Packet layout, as read below:
+    4 bytes  server id
+    host, user, password - each one length byte followed by the string
+    2 bytes  port
+    4 bytes  rpl_recovery_rank
+    4 bytes  master id (0 is replaced by the global server_id)
+
+  thd            connection of the registering slave; thd->server_id is
+                 set to the id found in the packet
+  packet         raw packet contents
+  packet_length  number of valid bytes in packet
+
+  Returns 0 on success.  Returns non-zero if access is denied, memory
+  cannot be allocated, the packet is truncated or malformed, or the hash
+  insert fails; in all of those cases the SLAVE_INFO is released here.
+*/
+int register_slave(THD* thd, uchar* packet, uint packet_length)
+{
+  SLAVE_INFO *si;
+  int res = 1;
+  uchar* p = packet, *p_end = packet + packet_length;
+
+  if (check_access(thd, FILE_ACL, any_db))
+    return 1;
+
+  if (!(si = (SLAVE_INFO*)my_malloc(sizeof(SLAVE_INFO), MYF(MY_WME))))
+    goto err;
+
+  /* The packet comes from the network: never read past its end. */
+  if (packet_length < 4)
+    goto err;
+  thd->server_id = si->server_id = uint4korr(p);
+  p += 4;
+
+  get_object(p,si->host);
+  get_object(p,si->user);
+  get_object(p,si->password);
+
+  /* port (2) + rpl_recovery_rank (4) + master_id (4) must all be present */
+  if ((uint) (p_end - p) < 10)
+    goto err;
+  si->port = uint2korr(p);
+  p += 2;
+  si->rpl_recovery_rank = uint4korr(p);
+  p += 4;
+  if (!(si->master_id = uint4korr(p)))
+    si->master_id = server_id;
+  si->thd = thd;
+
+  pthread_mutex_lock(&LOCK_slave_list);
+  /* Drop whatever is registered under this id, whoever registered it. */
+  unregister_slave(thd,0,0);
+  res = hash_insert(&slave_list, (byte*) si);
+  pthread_mutex_unlock(&LOCK_slave_list);
+
+  /* On failure the hash did not take ownership, so free the record here. */
+  if (res)
+    my_free((gptr) si, MYF(MY_WME));
+  return res;
+
+err:
+  if (si)
+    my_free((gptr) si, MYF(MY_WME));
+  return res;
+}
+
+/*
+  Key extractor handed to hash_init() for slave_list: the key of a
+  SLAVE_INFO is its server_id field, reported with a length of 4 bytes.
+*/
+static uint32* slave_list_key(SLAVE_INFO* si, uint* len,
+                              my_bool not_used __attribute__((unused)))
+{
+  uint32* key= &si->server_id;
+  *len= 4;
+  return key;
+}
+
+/*
+  Element destructor handed to hash_init() for slave_list: releases one
+  record with my_free().
+*/
+static void slave_info_free(void *s)
+{
+  gptr slave_mem= (gptr) s;
+  my_free(slave_mem, MYF(MY_WME));
+}
+
+/*
+  Set up the registry of slaves: the slave_list hash (keyed through
+  slave_list_key, elements released through slave_info_free) and the
+  LOCK_slave_list mutex.
+*/
+void init_slave_list()
+{
+  hash_get_key key_extractor= (hash_get_key) slave_list_key;
+
+  hash_init(&slave_list, SLAVE_LIST_CHUNK, 0, 0,
+            key_extractor, slave_info_free, 0);
+  pthread_mutex_init(&LOCK_slave_list, MY_MUTEX_INIT_FAST);
+}
+
+/*
+  Tear down the registry of slaves: free the slave_list hash while holding
+  LOCK_slave_list, then destroy the mutex itself.
+*/
+void end_slave_list()
+{
+  pthread_mutex_t* list_lock= &LOCK_slave_list;
+
+  pthread_mutex_lock(list_lock);
+  hash_free(&slave_list);
+  pthread_mutex_unlock(list_lock);
+
+  pthread_mutex_destroy(list_lock);
+}
+
+/*
+  Scan forward in an open binary log for the event whose log_seq and
+  server_id equal mi->last_log_seq and mi->server_id.
+
+  mi      supplies the values to look for; on success mi->pos is set to
+          the cache position just after the matching event
+  log     cache positioned where the scan should start
+  errmsg  receives a message when the scan ends without a match
+
+  Returns 0 when the event is found, 1 otherwise.
+*/
+static int find_target_pos(LEX_MASTER_INFO* mi, IO_CACHE* log, char* errmsg)
+{
+  const uint32 wanted_seq= mi->last_log_seq;
+  const uint32 wanted_server_id= mi->server_id;
+  Log_event* ev;
+
+  while ((ev= Log_event::read_log_event(log, (pthread_mutex_t*) 0, 0)))
+  {
+    bool is_target= (ev->log_seq == wanted_seq &&
+                     ev->server_id == wanted_server_id);
+    delete ev;
+    if (is_target)
+    {
+      mi->pos= my_b_tell(log);
+      return 0;
+    }
+  }
+
+  /* No event could be read: explain why, based on the cache error state. */
+  const char* reason;
+  if (log->error > 0)
+    reason= "Binary log truncated in the middle of event";
+  else if (log->error < 0)
+    reason= "I/O error reading binary log";
+  else
+    reason= "Could not find target event in the binary log";
+  strmov(errmsg, reason);
+  return 1;
+}
+
+/*
+  Translate the master coordinates held in mi into a position in this
+  server's own binary log.
+
+  The binary logs are visited in index order, starting with the one that
+  mysql_bin_log.find_first_log() returns.  For each log the slave event
+  located by find_slave_event() is compared with mi through
+  cmp_master_pos():
+    - equal: mi gets this log's name and the offset right after the event;
+    - greater: the target lies in the previous log, which is re-opened for
+      reading (an error if there is no previous log);
+    - otherwise: the log is remembered as the candidate and the next one
+      is tried; when the index is exhausted the candidate is the current
+      log.
+  Inside the candidate log find_target_pos() then looks for the exact
+  event, starting right after the candidate's slave event.
+
+  thd     thread handle; once the first log has been found,
+          thd->current_linfo points at the local LOG_INFO and is reset to
+          0 (under LOCK_thread_count) before returning
+  mi      in: coordinates to translate; out on success: log_file_name
+          (base name only) and pos
+  errmsg  caller-supplied buffer that receives a message on failure; it is
+          set to an empty string when init_io_cache() fails
+
+  Returns 0 on success and 1 on failure.  The mutex obtained from
+  mysql_bin_log.get_log_lock() is held while the logs are being read.
+*/
+int translate_master(THD* thd, LEX_MASTER_INFO* mi, char* errmsg)
+{
+ LOG_INFO linfo;
+ char search_file_name[FN_REFLEN],last_log_name[FN_REFLEN];
+ IO_CACHE log;
+ File file = -1, last_file = -1;
+ pthread_mutex_t *log_lock;
+ const char* errmsg_p;
+ Slave_log_event* sev = 0;
+ my_off_t last_pos = 0;
+ int error = 1;
+ int cmp_res;
+ LINT_INIT(cmp_res);
+ /* Both precondition checks return before any lock is taken or file opened. */
+ if (!mysql_bin_log.is_open())
+ {
+ strmov(errmsg,"Binary log is not open");
+ return 1;
+ }
+
+ if (!server_id_supplied)
+ {
+ strmov(errmsg, "Misconfigured master - server id was not set");
+ return 1;
+ }
+ /*
+   Prepare the index scan: offset 0 and an empty search name (presumably
+   this makes find_first_log() select the first entry - confirm there).
+ */
+ linfo.index_file_offset = 0;
+
+
+ search_file_name[0] = 0;
+
+ if (mysql_bin_log.find_first_log(&linfo, search_file_name))
+ {
+ strmov(errmsg,"Could not find first log");
+ return 1;
+ }
+ thd->current_linfo = &linfo;
+ /*
+   Zero the cache before first use (assumption: this keeps the
+   end_io_cache() at err: harmless if no log was ever opened - confirm),
+   then take the binary log lock for the whole search.
+ */
+ bzero((char*) &log,sizeof(log));
+ log_lock = mysql_bin_log.get_log_lock();
+ pthread_mutex_lock(log_lock);
+ /*
+   file is the descriptor of the log being examined; last_file is the
+   descriptor of the previous log, kept open so that it can be read again
+   if the current log turns out to be past the target.
+ */
+ for (;;)
+ {
+ if ((file=open_binlog(&log, linfo.log_file_name, &errmsg_p)) < 0)
+ {
+ strmov(errmsg, errmsg_p);
+ goto err;
+ }
+ /* On failure find_slave_event() has already filled in errmsg. */
+ if (!(sev = find_slave_event(&log, linfo.log_file_name, errmsg)))
+ goto err;
+ /* The event is needed only for the comparison; free it right away. */
+ cmp_res = cmp_master_pos(sev, mi);
+ delete sev;
+ /*
+   0: exact match, answer is right after this log's slave event.
+   > 0: this log is already past the target, fall back to the previous one.
+   otherwise: keep going with the next log.
+ */
+ if (!cmp_res)
+ {
+ /* Copy basename */
+ fn_format(mi->log_file_name, linfo.log_file_name, "","",1);
+ mi->pos = my_b_tell(&log);
+ goto mi_inited;
+ }
+ else if (cmp_res > 0)
+ {
+ if (!last_pos)
+ {
+ strmov(errmsg,
+ "Slave event in first log points past the target position");
+ goto err;
+ }
+ end_io_cache(&log);
+ (void) my_close(file, MYF(MY_WME));
+ if (init_io_cache(&log, (file = last_file), IO_SIZE, READ_CACHE, 0, 0,
+ MYF(MY_WME)))
+ {
+ errmsg[0] = 0;
+ goto err;
+ }
+ break;
+ }
+ /* Remember this log as the candidate: its name and the offset after its slave event. */
+ strmov(last_log_name, linfo.log_file_name);
+ last_pos = my_b_tell(&log);
+ /* At the end of the index the candidate is the log that is still open in `log`. */
+ switch (mysql_bin_log.find_next_log(&linfo)) {
+ case LOG_INFO_EOF:
+ if (last_file >= 0)
+ (void)my_close(last_file, MYF(MY_WME));
+ last_file = -1;
+ goto found_log;
+ case 0:
+ break;
+ default:
+ strmov(errmsg, "Error reading log index");
+ goto err;
+ }
+ /*
+   Moving on to the next log: release the cache, close the descriptor
+   from two logs back and keep the current one as last_file.
+ */
+ end_io_cache(&log);
+ if (last_file >= 0)
+ (void) my_close(last_file, MYF(MY_WME));
+ last_file = file;
+ }
+ /* `log` now reads the candidate log (last_log_name); scan it from last_pos. */
+found_log:
+ my_b_seek(&log, last_pos);
+ if (find_target_pos(mi,&log,errmsg))
+ goto err;
+ fn_format(mi->log_file_name, last_log_name, "","",1); /* Copy basename */
+ /* Success falls through into the cleanup shared with the error paths. */
+mi_inited:
+ error = 0;
+err:
+ pthread_mutex_unlock(log_lock);
+ end_io_cache(&log);
+ pthread_mutex_lock(&LOCK_thread_count);
+ thd->current_linfo = 0;
+ pthread_mutex_unlock(&LOCK_thread_count);
+ if (file >= 0)
+ (void) my_close(file, MYF(MY_WME));
+ if (last_file >= 0 && last_file != file)
+ (void) my_close(last_file, MYF(MY_WME));
+
+ return error;
+}
+
+/*
+  Look for a slave event among the first two events of an open binary
+  log.
+
+  log            cache positioned where reading should start
+  log_file_name  name of the log, used only in error messages
+  errmsg         buffer of at least SLAVE_ERRMSG_SIZE bytes that receives
+                 a message on failure
+
+  Returns the slave event, which the caller must delete when done, or 0
+  if an event could not be read or neither of the two events examined is
+  a slave event.
+*/
+static Slave_log_event* find_slave_event(IO_CACHE* log,
+                                         const char* log_file_name,
+                                         char* errmsg)
+{
+  for (int i= 0; i < 2; i++)
+  {
+    Log_event* ev= Log_event::read_log_event(log, (pthread_mutex_t*) 0, 0);
+    if (!ev)
+    {
+      my_snprintf(errmsg, SLAVE_ERRMSG_SIZE,
+                  "Error reading event in log '%s'",
+                  (char*) log_file_name);
+      return 0;
+    }
+    if (ev->get_type_code() == SLAVE_EVENT)
+      return (Slave_log_event*) ev;
+
+    /*
+      Not the event we want.  This is the only place a rejected event is
+      released; deleting it again after the loop would be a double free.
+    */
+    delete ev;
+  }
+
+  my_snprintf(errmsg, SLAVE_ERRMSG_SIZE,
+              "Could not find slave event in log '%s'",
+              (char*) log_file_name);
+  return 0;
+}
+
+
+/*
+  Send the client a one-row result (Log_name, Log_pos) holding the
+  coordinates produced by translate_master() for thd->lex.mi.
+
+  Returns 0 after the row and EOF have been sent, 1 if the translation
+  failed (an error has been sent to the client), and -1 if sending the
+  field list or the row failed.
+*/
+int show_new_master(THD* thd)
+{
+  DBUG_ENTER("show_new_master");
+  List<Item> field_list;
+  char errmsg[SLAVE_ERRMSG_SIZE];
+  LEX_MASTER_INFO* lex_mi = &thd->lex.mi;
+  NET* net = &thd->net;
+
+  errmsg[0]=0;                                  // Safety
+  if (translate_master(thd, lex_mi, errmsg))
+  {
+    /* An empty message means there is no text to report: send a generic error. */
+    if (!errmsg[0])
+      send_error(net, 0);
+    else
+      net_printf(net, ER_ERROR_WHEN_EXECUTING_COMMAND,
+                 "SHOW NEW MASTER", errmsg);
+    DBUG_RETURN(1);
+  }
+
+  field_list.push_back(new Item_empty_string("Log_name", 20));
+  field_list.push_back(new Item_empty_string("Log_pos", 20));
+  if (send_fields(thd, field_list, 1))
+    DBUG_RETURN(-1);
+
+  String* packet = &thd->packet;
+  packet->length(0);
+  net_store_data(packet, lex_mi->log_file_name);
+  net_store_data(packet, (longlong)lex_mi->pos);
+  if (my_net_write(net, packet->ptr(), packet->length()))
+    DBUG_RETURN(-1);
+
+  send_eof(net);
+  DBUG_RETURN(0);
+}
+
int update_slave_list(MYSQL* mysql)
{
MYSQL_RES* res=0;
@@ -216,6 +556,277 @@ err:
DBUG_RETURN(0);
}
+/*
+  Send the client one row per entry of slave_list.  The User and Password
+  columns are included only when opt_show_slave_auth_info is set.
+
+  Returns 0 on success and -1 if the field list or a row could not be
+  sent.  LOCK_slave_list is held while the rows are written.
+*/
+int show_slave_hosts(THD* thd)
+{
+  List<Item> field_list;
+  NET* net = &thd->net;
+  String* packet = &thd->packet;
+  DBUG_ENTER("show_slave_hosts");
+
+  field_list.push_back(new Item_empty_string("Server_id", 20));
+  field_list.push_back(new Item_empty_string("Host", 20));
+  if (opt_show_slave_auth_info)
+  {
+    field_list.push_back(new Item_empty_string("User",20));
+    field_list.push_back(new Item_empty_string("Password",20));
+  }
+  field_list.push_back(new Item_empty_string("Port",20));
+  field_list.push_back(new Item_empty_string("Rpl_recovery_rank", 20));
+  field_list.push_back(new Item_empty_string("Master_id", 20));
+
+  if (send_fields(thd, field_list, 1))
+    DBUG_RETURN(-1);
+
+  bool write_failed= 0;
+
+  pthread_mutex_lock(&LOCK_slave_list);
+  for (uint row_no = 0; row_no < slave_list.records; ++row_no)
+  {
+    SLAVE_INFO* si = (SLAVE_INFO*) hash_element(&slave_list, row_no);
+
+    packet->length(0);
+    net_store_data(packet, si->server_id);
+    net_store_data(packet, si->host);
+    if (opt_show_slave_auth_info)
+    {
+      net_store_data(packet, si->user);
+      net_store_data(packet, si->password);
+    }
+    net_store_data(packet, (uint32) si->port);
+    net_store_data(packet, si->rpl_recovery_rank);
+    net_store_data(packet, si->master_id);
+
+    if (my_net_write(net, (char*)packet->ptr(), packet->length()))
+    {
+      /* Stop at the first failed write; the lock is released below. */
+      write_failed= 1;
+      break;
+    }
+  }
+  pthread_mutex_unlock(&LOCK_slave_list);
+
+  if (write_failed)
+    DBUG_RETURN(-1);
+
+  send_eof(net);
+  DBUG_RETURN(0);
+}
+
+/*
+  Open a mini-client connection to the master described by mi.
+
+  Returns 0 on success.  On failure the client error is written to the
+  error log and 1 is returned.  thd is not used here.
+*/
+int connect_to_master(THD *thd, MYSQL* mysql, MASTER_INFO* mi)
+{
+  if (mc_mysql_connect(mysql, mi->host, mi->user, mi->password, 0,
+                       mi->port, 0, 0))
+    return 0;
+
+  sql_print_error("Connection to master failed: %s",
+                  mc_mysql_error(mysql));
+  return 1;
+}
+
+
+/*
+  Free a range of per-database table results plus the database list
+  result.
+
+  db_res  result of the database listing; always freed
+  cur     last slot to free (the walk goes downwards); passing a pointer
+          below start frees no slots
+  start   first slot of the array
+
+  Slots holding 0 are skipped.
+*/
+static inline void cleanup_mysql_results(MYSQL_RES* db_res,
+                                         MYSQL_RES** cur, MYSQL_RES** start)
+{
+  while (cur >= start)
+  {
+    MYSQL_RES* table_res= *cur;
+    if (table_res)
+      mc_mysql_free_result(table_res);
+    --cur;
+  }
+  mc_mysql_free_result(db_res);
+}
+
+
+/*
+  Fetch every table listed in table_res from the master into database db.
+
+  When table_rules_on is set, each table is first checked with tables_ok()
+  and skipped if that check rejects it.  The first non-zero result of
+  fetch_nx_table() stops the loop and is returned; otherwise 0 is
+  returned once all rows have been consumed.
+*/
+static inline int fetch_db_tables(THD* thd, MYSQL* mysql, const char* db,
+                                  MYSQL_RES* table_res)
+{
+  MYSQL_ROW row;
+
+  while ((row= mc_mysql_fetch_row(table_res)))
+  {
+    TABLE_LIST table;
+    const char* table_name = row[0];
+
+    if (table_rules_on)
+    {
+      /* Fill in just enough of a TABLE_LIST for the rule check. */
+      table.next = 0;
+      table.db = (char*)db;
+      table.real_name = (char*)table_name;
+      table.updating = 1;
+      if (!tables_ok(thd, &table))
+        continue;
+    }
+
+    int error= fetch_nx_table(thd, db, table_name, &glob_mi, mysql);
+    if (error)
+      return error;
+  }
+
+  return 0;
+}
+
+
+/*
+  Implement LOAD DATA FROM MASTER: stop the slave thread if it is running,
+  connect to the master, and for every database allowed by the
+  replication rules (never "mysql") drop and re-create it locally and
+  fetch its tables.  The master is kept under FLUSH TABLES WITH READ LOCK
+  while the tables are copied, and the coordinates reported by SHOW MASTER
+  STATUS are stored in glob_mi.
+
+  LOCK_slave is held for the whole operation.  If the slave was running on
+  entry, start_slave() is called again before returning.
+
+  Returns 0 on success (an OK has been sent to the client) and a positive
+  value on failure.  On every failure path a message has already been sent
+  to the client, or the helper that failed is expected to have sent one,
+  so no OK follows an error.
+*/
+int load_master_data(THD* thd)
+{
+  MYSQL mysql;
+  MYSQL_RES* master_status_res = 0;
+  bool slave_was_running = 0;
+  int error = 0;
+
+  mc_mysql_init(&mysql);
+
+  // we do not want anyone messing with the slave at all for the entire
+  // duration of the data load;
+  pthread_mutex_lock(&LOCK_slave);
+
+  // first, kill the slave
+  if ((slave_was_running = slave_running))
+  {
+    abort_slave = 1;
+    KICK_SLAVE;
+    thd->proc_info = "waiting for slave to die";
+    while (slave_running)
+      pthread_cond_wait(&COND_slave_stopped, &LOCK_slave); // wait until done
+  }
+
+
+  if (connect_to_master(thd, &mysql, &glob_mi))
+  {
+    net_printf(&thd->net, error = ER_CONNECT_TO_MASTER,
+               mc_mysql_error(&mysql));
+    goto err;
+  }
+
+  // now that we are connected, get all database and tables in each
+  {
+    MYSQL_RES *db_res, **table_res, **table_res_end, **cur_table_res;
+    uint num_dbs;
+
+    if (mc_mysql_query(&mysql, "show databases", 0) ||
+        !(db_res = mc_mysql_store_result(&mysql)))
+    {
+      net_printf(&thd->net, error = ER_QUERY_ON_MASTER,
+                 mc_mysql_error(&mysql));
+      goto err;
+    }
+
+    // in theory, the master could have no databases at all
+    // and run with skip-grant
+    if (!(num_dbs = (uint) mc_mysql_num_rows(db_res)))
+    {
+      // nothing to copy is not an error, but db_res must still be freed
+      mc_mysql_free_result(db_res);
+      goto err;
+    }
+
+    if (!(table_res = (MYSQL_RES**)thd->alloc(num_dbs * sizeof(MYSQL_RES*))))
+    {
+      net_printf(&thd->net, error = ER_OUTOFMEMORY);
+      mc_mysql_free_result(db_res);
+      goto err;
+    }
+
+    // this is a temporary solution until we have online backup
+    // capabilities - to be replaced once online backup is working
+    // we wait to issue FLUSH TABLES WITH READ LOCK for as long as we
+    // can to minimize the lock time
+    if (mc_mysql_query(&mysql, "FLUSH TABLES WITH READ LOCK", 0) ||
+        mc_mysql_query(&mysql, "SHOW MASTER STATUS",0) ||
+        !(master_status_res = mc_mysql_store_result(&mysql)))
+    {
+      net_printf(&thd->net, error = ER_QUERY_ON_MASTER,
+                 mc_mysql_error(&mysql));
+      mc_mysql_free_result(db_res);
+      goto err;
+    }
+
+    // go through every table in every database, and if the replication
+    // rules allow replicating it, get it
+
+    table_res_end = table_res + num_dbs;
+
+    for(cur_table_res = table_res; cur_table_res < table_res_end;
+        cur_table_res++)
+    {
+      // since we know how many rows we have, this can never be NULL
+      MYSQL_ROW row = mc_mysql_fetch_row(db_res);
+      char* db = row[0];
+
+      /*
+        Do not replicate databases excluded by rules
+        also skip mysql database - in most cases the user will
+        mess up and not exclude mysql database with the rules when
+        he actually means to - in this case, he is up for a surprise if
+        his priv tables get dropped and downloaded from master
+        TO DO - add special option, not enabled
+        by default, to allow inclusion of mysql database into load
+        data from master
+      */
+
+      if (!db_ok(db, replicate_do_db, replicate_ignore_db) ||
+          !strcmp(db,"mysql"))
+      {
+        *cur_table_res = 0;
+        continue;
+      }
+
+      if (mysql_rm_db(thd, db, 1,1) ||
+          mysql_create_db(thd, db, 0, 1))
+      {
+        send_error(&thd->net, 0, 0);
+        // an error went to the client: make sure no OK is sent after it
+        error = 1;
+        cleanup_mysql_results(db_res, cur_table_res - 1, table_res);
+        goto err;
+      }
+
+      if (mc_mysql_select_db(&mysql, db) ||
+          mc_mysql_query(&mysql, "show tables", 0) ||
+          !(*cur_table_res = mc_mysql_store_result(&mysql)))
+      {
+        net_printf(&thd->net, error = ER_QUERY_ON_MASTER,
+                   mc_mysql_error(&mysql));
+        cleanup_mysql_results(db_res, cur_table_res - 1, table_res);
+        goto err;
+      }
+
+      if ((error = fetch_db_tables(thd, &mysql, db, *cur_table_res)))
+      {
+        // we do not report the error - fetch_db_tables handles it
+        cleanup_mysql_results(db_res, cur_table_res, table_res);
+        goto err;
+      }
+    }
+
+    cleanup_mysql_results(db_res, cur_table_res - 1, table_res);
+
+    // adjust position in the master
+    if (master_status_res)
+    {
+      MYSQL_ROW row = mc_mysql_fetch_row(master_status_res);
+
+      /*
+        We need this check because the master may not be running with
+        log-bin, but it will still allow us to do all the steps
+        of LOAD DATA FROM MASTER - no reason to forbid it, really,
+        although it does not make much sense for the user to do it.
+        Without log-bin the result set has no row at all, so row itself
+        must be tested before its columns.
+      */
+      if (row && row[0] && row[1])
+      {
+        // strmake() appends a NUL after the copied bytes: leave room for it
+        strmake(glob_mi.log_file_name, row[0],
+                sizeof(glob_mi.log_file_name) - 1);
+        glob_mi.pos = atoi(row[1]); // atoi() is ok, since offset is <= 1GB
+        if (glob_mi.pos < 4)
+          glob_mi.pos = 4; // don't hit the magic number
+        glob_mi.pending = 0;
+        flush_master_info(&glob_mi);
+      }
+
+      mc_mysql_free_result(master_status_res);
+      master_status_res = 0;  // already released; skip the free at err:
+    }
+
+    if (mc_mysql_query(&mysql, "UNLOCK TABLES", 0))
+    {
+      net_printf(&thd->net, error = ER_QUERY_ON_MASTER,
+                 mc_mysql_error(&mysql));
+      goto err;
+    }
+  }
+
+err:
+  // error paths taken after SHOW MASTER STATUS still own its result
+  if (master_status_res)
+    mc_mysql_free_result(master_status_res);
+  pthread_mutex_unlock(&LOCK_slave);
+  if (slave_was_running)
+    start_slave(0, 0);
+  mc_mysql_close(&mysql); // safe to call since we always do mc_mysql_init()
+  if (!error)
+    send_ok(&thd->net);
+
+  return error;
+}
+