summaryrefslogtreecommitdiff
path: root/sql/sql_repl.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/sql_repl.cc')
-rw-r--r--sql/sql_repl.cc272
1 files changed, 271 insertions, 1 deletions
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 6153c4bd0f9..2fd7f29c560 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -26,6 +26,7 @@
#include <my_dir.h>
#define SLAVE_LIST_CHUNK 128
+#define SLAVE_ERRMSG_SIZE (FN_REFLEN+64)
extern const char* any_db;
extern pthread_handler_decl(handle_slave,arg);
@@ -38,6 +39,10 @@ bool opt_sporadic_binlog_dump_fail = 0;
static int binlog_dump_count = 0;
#endif
+static Slave_log_event* find_slave_event(IO_CACHE* log,
+ const char* log_file_name,
+ char* errmsg);
+
static uint32* slave_list_key(SLAVE_INFO* si, uint* len,
my_bool not_used __attribute__((unused)))
{
@@ -863,6 +868,272 @@ void reset_master()
}
+int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1,
+ const char* log_file_name2, ulonglong log_pos2)
+{
+ int res;
+ if ((res = strcmp(log_file_name1, log_file_name2)))
+ return res;
+ if (log_pos1 > log_pos2)
+ return 1;
+ else if (log_pos1 == log_pos2)
+ return 0;
+ return -1;
+}
+
+static inline int cmp_master_pos(Slave_log_event* sev, LEX_MASTER_INFO* mi)
+{
+ return cmp_master_pos(sev->master_log, sev->master_pos, mi->log_file_name,
+ mi->pos);
+}
+
+static int find_target_pos(LEX_MASTER_INFO* mi, IO_CACHE* log, char* errmsg)
+{
+ uint32 log_seq = mi->last_log_seq;
+ uint32 target_server_id = mi->server_id;
+
+ for (;;)
+ {
+ Log_event* ev;
+ if (!(ev = Log_event::read_log_event(log, 0)))
+ {
+ if (log->error > 0)
+ strmov(errmsg, "Binary log truncated in the middle of event");
+ else if(log->error < 0)
+ strmov(errmsg, "I/O error reading binary log");
+ else
+ strmov(errmsg, "Could not find target event in the binary log");
+ return 1;
+ }
+
+ if (ev->log_seq == log_seq && ev->server_id == target_server_id)
+ {
+ delete ev;
+ mi->pos = my_b_tell(log);
+ return 0;
+ }
+
+ delete ev;
+ }
+}
+
+static void copy_base_name(char* dest, char* src)
+{
+ char* p;
+ p = strrchr(src, FN_LIBCHAR);
+ if (p)
+ p++;
+ else
+ p = src;
+ strmov(dest, p);
+}
+
+int translate_master(THD* thd, LEX_MASTER_INFO* mi, char* errmsg)
+{
+ LOG_INFO linfo;
+ char search_file_name[FN_REFLEN],last_log_name[FN_REFLEN];
+ IO_CACHE log, last_log;
+ File file = -1, last_file = -1;
+ pthread_mutex_t *log_lock;
+ const char* errmsg_p;
+ Slave_log_event* sev = 0;
+ my_off_t last_pos = 0;
+ int error = 1;
+ int cmp_res;
+ LINT_INIT(cmp_res);
+
+ if (!mysql_bin_log.is_open())
+ {
+ strmov(errmsg,"Binary log is not open");
+ return 1;
+ }
+
+ if (!server_id_supplied)
+ {
+ strmov(errmsg, "Misconfigured master - server id was not set");
+ return 1;
+ }
+
+ linfo.index_file_offset = 0;
+ thd->current_linfo = &linfo;
+ search_file_name[0] = 0;
+
+ if (mysql_bin_log.find_first_log(&linfo, search_file_name))
+ {
+ strmov(errmsg,"Could not find first log");
+ return 1;
+ }
+
+ bzero((char*) &log,sizeof(log));
+ log_lock = mysql_bin_log.get_log_lock();
+ pthread_mutex_lock(log_lock);
+
+ for (;;)
+ {
+
+ if ((file=open_binlog(&log, linfo.log_file_name, &errmsg_p)) < 0)
+ {
+ pthread_mutex_unlock(log_lock);
+ strmov(errmsg, errmsg_p);
+ goto err;
+ }
+
+ if (!(sev = find_slave_event(&log, linfo.log_file_name, errmsg)))
+ {
+ pthread_mutex_unlock(log_lock);
+ goto err;
+ }
+
+ cmp_res = cmp_master_pos(sev, mi);
+ delete sev;
+
+ if(!cmp_res)
+ {
+ pthread_mutex_unlock(log_lock);
+ copy_base_name(mi->log_file_name, linfo.log_file_name);
+ mi->pos = my_b_tell(&log);
+ goto mi_inited;
+ }
+
+ if (!last_pos && cmp_res > 0)
+ {
+ pthread_mutex_unlock(log_lock);
+ strmov(errmsg, "Slave event in first log points past the \
+target position");
+ goto err;
+ }
+
+ if (last_pos && cmp_res > 0)
+ {
+ end_io_cache(&log);
+ (void) my_close(file, MYF(MY_WME));
+ if (init_io_cache(&log, (file = last_file), IO_SIZE, READ_CACHE, 0, 0,
+ MYF(MY_WME)))
+ {
+ errmsg[0] = 0;
+ goto err;
+ }
+
+ goto found_log;
+ }
+
+ strmov(last_log_name, linfo.log_file_name);
+ last_pos = my_b_tell(&log);
+
+ switch (mysql_bin_log.find_next_log(&linfo))
+ {
+ case LOG_INFO_EOF:
+ if (last_file >= 0)
+ (void)my_close(last_file, MYF(MY_WME));
+ last_file = -1;
+ goto found_log;
+ case 0:
+ break;
+ default:
+ pthread_mutex_unlock(log_lock);
+ strmov(errmsg, "Error reading log index");
+ goto err;
+ }
+
+ end_io_cache(&log);
+ if (last_file >= 0)
+ (void) my_close(last_file, MYF(MY_WME));
+ last_file = file;
+ }
+
+found_log:
+ my_b_seek(&log, last_pos);
+ if (find_target_pos(mi,&log,errmsg))
+ {
+ pthread_mutex_unlock(log_lock);
+ goto err;
+ }
+ pthread_mutex_unlock(log_lock);
+ copy_base_name(mi->log_file_name, last_log_name);
+mi_inited:
+ error = 0;
+err:
+ end_io_cache(&log);
+ pthread_mutex_lock(&LOCK_thread_count);
+ thd->current_linfo = 0;
+ pthread_mutex_unlock(&LOCK_thread_count);
+ if (file >= 0)
+ (void) my_close(file, MYF(MY_WME));
+ if (last_file >= 0 && last_file != file)
+ (void) my_close(last_file, MYF(MY_WME));
+
+ return error;
+}
+
+// caller must delete result when done
+static Slave_log_event* find_slave_event(IO_CACHE* log,
+ const char* log_file_name,
+ char* errmsg)
+{
+ Log_event* ev;
+ if (!(ev = Log_event::read_log_event(log, 0)))
+ {
+ my_vsnprintf(errmsg, SLAVE_ERRMSG_SIZE,
+ "Error reading start event in log '%s'",
+ (char*)log_file_name);
+ return 0;
+ }
+
+ delete ev;
+
+ if (!(ev = Log_event::read_log_event(log, 0)))
+ {
+ my_vsnprintf(errmsg, SLAVE_ERRMSG_SIZE,
+ "Error reading slave event in log '%s'",
+ (char*)log_file_name);
+ return 0;
+ }
+
+ if (ev->get_type_code() != SLAVE_EVENT)
+ {
+ my_vsnprintf(errmsg, SLAVE_ERRMSG_SIZE,
+ "Second event in log '%s' is not slave event",
+ (char*)log_file_name);
+ delete ev;
+ return 0;
+ }
+
+ return (Slave_log_event*)ev;
+}
+
+int show_new_master(THD* thd)
+{
+ DBUG_ENTER("show_new_master");
+ List<Item> field_list;
+ char errmsg[SLAVE_ERRMSG_SIZE];
+ LEX_MASTER_INFO* lex_mi = &thd->lex.mi;
+
+ if (translate_master(thd, lex_mi, errmsg))
+ {
+ if (errmsg[0])
+ net_printf(&thd->net, ER_SHOW_NEW_MASTER, errmsg);
+ else
+ send_error(&thd->net, 0);
+
+ DBUG_RETURN(1);
+ }
+ else
+ {
+ String* packet = &thd->packet;
+ field_list.push_back(new Item_empty_string("Log_name", 20));
+ field_list.push_back(new Item_empty_string("Log_pos", 20));
+ if (send_fields(thd, field_list, 1))
+ DBUG_RETURN(-1);
+ packet->length(0);
+ net_store_data(packet, lex_mi->log_file_name);
+ net_store_data(packet, (longlong)lex_mi->pos);
+ if (my_net_write(&thd->net, packet->ptr(), packet->length()))
+ DBUG_RETURN(-1);
+ send_eof(&thd->net);
+ DBUG_RETURN(0);
+ }
+
+}
int show_binlog_events(THD* thd)
{
@@ -913,7 +1184,6 @@ int show_binlog_events(THD* thd)
}
pthread_mutex_lock(mysql_bin_log.get_log_lock());
-
my_b_seek(&log, pos);
for (event_count = 0;