diff options
Diffstat (limited to 'sql/sql_repl.cc')
-rw-r--r-- | sql/sql_repl.cc | 272 |
1 files changed, 271 insertions, 1 deletions
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 6153c4bd0f9..2fd7f29c560 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -26,6 +26,7 @@ #include <my_dir.h> #define SLAVE_LIST_CHUNK 128 +#define SLAVE_ERRMSG_SIZE (FN_REFLEN+64) extern const char* any_db; extern pthread_handler_decl(handle_slave,arg); @@ -38,6 +39,10 @@ bool opt_sporadic_binlog_dump_fail = 0; static int binlog_dump_count = 0; #endif +static Slave_log_event* find_slave_event(IO_CACHE* log, + const char* log_file_name, + char* errmsg); + static uint32* slave_list_key(SLAVE_INFO* si, uint* len, my_bool not_used __attribute__((unused))) { @@ -863,6 +868,272 @@ void reset_master() } +int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1, + const char* log_file_name2, ulonglong log_pos2) +{ + int res; + if ((res = strcmp(log_file_name1, log_file_name2))) + return res; + if (log_pos1 > log_pos2) + return 1; + else if (log_pos1 == log_pos2) + return 0; + return -1; +} + +static inline int cmp_master_pos(Slave_log_event* sev, LEX_MASTER_INFO* mi) +{ + return cmp_master_pos(sev->master_log, sev->master_pos, mi->log_file_name, + mi->pos); +} + +static int find_target_pos(LEX_MASTER_INFO* mi, IO_CACHE* log, char* errmsg) +{ + uint32 log_seq = mi->last_log_seq; + uint32 target_server_id = mi->server_id; + + for (;;) + { + Log_event* ev; + if (!(ev = Log_event::read_log_event(log, 0))) + { + if (log->error > 0) + strmov(errmsg, "Binary log truncated in the middle of event"); + else if(log->error < 0) + strmov(errmsg, "I/O error reading binary log"); + else + strmov(errmsg, "Could not find target event in the binary log"); + return 1; + } + + if (ev->log_seq == log_seq && ev->server_id == target_server_id) + { + delete ev; + mi->pos = my_b_tell(log); + return 0; + } + + delete ev; + } +} + +static void copy_base_name(char* dest, char* src) +{ + char* p; + p = strrchr(src, FN_LIBCHAR); + if (p) + p++; + else + p = src; + strmov(dest, p); +} + +int translate_master(THD* thd, LEX_MASTER_INFO* mi, char* errmsg) +{ + LOG_INFO linfo; + char search_file_name[FN_REFLEN],last_log_name[FN_REFLEN]; + IO_CACHE log, last_log; + File file = -1, last_file = -1; + pthread_mutex_t *log_lock; + const char* errmsg_p; + Slave_log_event* sev = 0; + my_off_t last_pos = 0; + int error = 1; + int cmp_res; + LINT_INIT(cmp_res); + + if (!mysql_bin_log.is_open()) + { + strmov(errmsg,"Binary log is not open"); + return 1; + } + + if (!server_id_supplied) + { + strmov(errmsg, "Misconfigured master - server id was not set"); + return 1; + } + + linfo.index_file_offset = 0; + thd->current_linfo = &linfo; + search_file_name[0] = 0; + + if (mysql_bin_log.find_first_log(&linfo, search_file_name)) + { + strmov(errmsg,"Could not find first log"); + return 1; + } + + bzero((char*) &log,sizeof(log)); + log_lock = mysql_bin_log.get_log_lock(); + pthread_mutex_lock(log_lock); + + for (;;) + { + + if ((file=open_binlog(&log, linfo.log_file_name, &errmsg_p)) < 0) + { + pthread_mutex_unlock(log_lock); + strmov(errmsg, errmsg_p); + goto err; + } + + if (!(sev = find_slave_event(&log, linfo.log_file_name, errmsg))) + { + pthread_mutex_unlock(log_lock); + goto err; + } + + cmp_res = cmp_master_pos(sev, mi); + delete sev; + + if(!cmp_res) + { + pthread_mutex_unlock(log_lock); + copy_base_name(mi->log_file_name, linfo.log_file_name); + mi->pos = my_b_tell(&log); + goto mi_inited; + } + + if (!last_pos && cmp_res > 0) + { + pthread_mutex_unlock(log_lock); + strmov(errmsg, "Slave event in first log points past the \ +target position"); + goto err; + } + + if (last_pos && cmp_res > 0) + { + end_io_cache(&log); + (void) my_close(file, MYF(MY_WME)); + if (init_io_cache(&log, (file = last_file), IO_SIZE, READ_CACHE, 0, 0, + MYF(MY_WME))) + { + errmsg[0] = 0; + goto err; + } + + goto found_log; + } + + strmov(last_log_name, linfo.log_file_name); + last_pos = my_b_tell(&log); + + switch (mysql_bin_log.find_next_log(&linfo)) + { + case LOG_INFO_EOF: + if (last_file >= 0) + (void)my_close(last_file, MYF(MY_WME)); + last_file = -1; + goto found_log; + case 0: + break; + default: + pthread_mutex_unlock(log_lock); + strmov(errmsg, "Error reading log index"); + goto err; + } + + end_io_cache(&log); + if (last_file >= 0) + (void) my_close(last_file, MYF(MY_WME)); + last_file = file; + } + +found_log: + my_b_seek(&log, last_pos); + if (find_target_pos(mi,&log,errmsg)) + { + pthread_mutex_unlock(log_lock); + goto err; + } + pthread_mutex_unlock(log_lock); + copy_base_name(mi->log_file_name, last_log_name); +mi_inited: + error = 0; +err: + end_io_cache(&log); + pthread_mutex_lock(&LOCK_thread_count); + thd->current_linfo = 0; + pthread_mutex_unlock(&LOCK_thread_count); + if (file >= 0) + (void) my_close(file, MYF(MY_WME)); + if (last_file >= 0 && last_file != file) + (void) my_close(last_file, MYF(MY_WME)); + + return error; +} + +// caller must delete result when done +static Slave_log_event* find_slave_event(IO_CACHE* log, + const char* log_file_name, + char* errmsg) +{ + Log_event* ev; + if (!(ev = Log_event::read_log_event(log, 0))) + { + my_vsnprintf(errmsg, SLAVE_ERRMSG_SIZE, + "Error reading start event in log '%s'", + (char*)log_file_name); + return 0; + } + + delete ev; + + if (!(ev = Log_event::read_log_event(log, 0))) + { + my_vsnprintf(errmsg, SLAVE_ERRMSG_SIZE, + "Error reading slave event in log '%s'", + (char*)log_file_name); + return 0; + } + + if (ev->get_type_code() != SLAVE_EVENT) + { + my_vsnprintf(errmsg, SLAVE_ERRMSG_SIZE, + "Second event in log '%s' is not slave event", + (char*)log_file_name); + delete ev; + return 0; + } + + return (Slave_log_event*)ev; +} + +int show_new_master(THD* thd) +{ + DBUG_ENTER("show_new_master"); + List<Item> field_list; + char errmsg[SLAVE_ERRMSG_SIZE]; + LEX_MASTER_INFO* lex_mi = &thd->lex.mi; + + if (translate_master(thd, lex_mi, errmsg)) + { + if (errmsg[0]) + net_printf(&thd->net, ER_SHOW_NEW_MASTER, errmsg); + else + send_error(&thd->net, 0); + + DBUG_RETURN(1); + } + else + { + String* packet = &thd->packet; + field_list.push_back(new Item_empty_string("Log_name", 20)); + field_list.push_back(new Item_empty_string("Log_pos", 20)); + if (send_fields(thd, field_list, 1)) + DBUG_RETURN(-1); + packet->length(0); + net_store_data(packet, lex_mi->log_file_name); + net_store_data(packet, (longlong)lex_mi->pos); + if (my_net_write(&thd->net, packet->ptr(), packet->length())) + DBUG_RETURN(-1); + send_eof(&thd->net); + DBUG_RETURN(0); + } + +} int show_binlog_events(THD* thd) { @@ -913,7 +1184,6 @@ int show_binlog_events(THD* thd) } pthread_mutex_lock(mysql_bin_log.get_log_lock()); - my_b_seek(&log, pos); for (event_count = 0; |