diff options
Diffstat (limited to 'sql/slave.cc')
-rw-r--r-- | sql/slave.cc | 1758 |
1 files changed, 1257 insertions, 501 deletions
diff --git a/sql/slave.cc b/sql/slave.cc index f2c29146308..e68741e7434 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -1,15 +1,15 @@ /* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB - + This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. - + This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. - + You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ @@ -24,29 +24,23 @@ #include "repl_failsafe.h" #include <thr_alarm.h> #include <my_dir.h> +#include <assert.h> -volatile bool slave_running = 0; +volatile bool slave_sql_running = 0, slave_io_running = 0; char* slave_load_tmpdir = 0; -pthread_t slave_real_id; -MASTER_INFO glob_mi; -MY_BITMAP slave_error_mask; -bool use_slave_mask = 0; +MASTER_INFO main_mi; +MASTER_INFO* active_mi; +volatile int active_mi_in_use = 0; HASH replicate_do_table, replicate_ignore_table; DYNAMIC_ARRAY replicate_wild_do_table, replicate_wild_ignore_table; bool do_table_inited = 0, ignore_table_inited = 0; bool wild_do_table_inited = 0, wild_ignore_table_inited = 0; bool table_rules_on = 0; -uint32 slave_skip_counter = 0; - -/* - When slave thread exits, we need to remember the temporary tables so we - can re-use them on slave start -*/ static TABLE* save_temporary_tables = 0; +// when slave thread exits, we need to remember the temporary tables so we +// can re-use them on slave start -THD* slave_thd = 0; -int last_slave_errno = 0; -char last_slave_error[MAX_SLAVE_ERRMSG] = ""; +// TODO: move the vars below under MASTER_INFO #ifndef DBUG_OFF int disconnect_slave_event_count = 0, abort_slave_event_count = 0; static int events_till_disconnect = -1; @@ -54,15 +48,17 @@ int events_till_abort = -1; static int stuck_count = 0; #endif +typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE; void skip_load_data_infile(NET* net); -inline bool slave_killed(THD* thd); -static int init_slave_thread(THD* thd); +static inline bool slave_killed(THD* thd,MASTER_INFO* mi); +static inline bool slave_killed(THD* thd,RELAY_LOG_INFO* rli); +static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type); static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi); static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi); static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi, bool reconnect); -static int safe_sleep(THD* thd, int sec); +static int safe_sleep(THD* thd, MASTER_INFO* mi, int sec); static int request_table_dump(MYSQL* mysql, const char* db, const char* table); static int create_table_from_dump(THD* thd, NET* net, const char* db, const char* table_name); @@ -70,6 +66,79 @@ static int check_master_version(MYSQL* mysql, MASTER_INFO* mi); char* rewrite_db(char* db); +void init_thread_mask(int* mask,MASTER_INFO* mi,bool inverse) +{ + bool set_io = mi->slave_running, set_sql = mi->rli.slave_running; + if (inverse) + { + /* This makes me think of the Russian idiom "I am not I, and this is + not my horse", which is used to deny reponsibility for + one's actions. + */ + set_io = !set_io; + set_sql = !set_sql; + } + register int tmp_mask=0; + if (set_io) + tmp_mask |= SLAVE_IO; + if (set_sql) + tmp_mask |= SLAVE_SQL; + *mask = tmp_mask; +} + +void lock_slave_threads(MASTER_INFO* mi) +{ + //TODO: see if we can do this without dual mutex + pthread_mutex_lock(&mi->run_lock); + pthread_mutex_lock(&mi->rli.run_lock); +} + +void unlock_slave_threads(MASTER_INFO* mi) +{ + //TODO: see if we can do this without dual mutex + pthread_mutex_unlock(&mi->rli.run_lock); + pthread_mutex_unlock(&mi->run_lock); +} + +int init_slave() +{ + // TODO (multi-master): replace this with list initialization + active_mi = &main_mi; + + // TODO: the code below is a copy-paste mess - clean it up + /* + make sure slave thread gets started if server_id is set, + valid master.info is present, and master_host has not been specified + */ + if (server_id && !master_host) + { + // TODO: re-write this to interate through the list of files + // for multi-master + char fname[FN_REFLEN+128]; + MY_STAT stat_area; + fn_format(fname, master_info_file, mysql_data_home, "", 4+16+32); + if (my_stat(fname, &stat_area, MYF(0)) && + !init_master_info(active_mi,master_info_file,relay_log_info_file)) + master_host = active_mi->host; + } + // slave thread + if (master_host) + { + if (!opt_skip_slave_start && start_slave_threads(1 /* need mutex */, + 0 /* no wait for start*/, + active_mi, + master_info_file, + relay_log_info_file, + SLAVE_IO|SLAVE_SQL + )) + sql_print_error("Warning: Can't create threads to handle slave"); + else if (opt_skip_slave_start) + if (init_master_info(active_mi, master_info_file, relay_log_info_file)) + sql_print_error("Warning: failed to initialized master info"); + } + return 0; +} + static void free_table_ent(TABLE_RULE_ENT* e) { my_free((gptr) e, MYF(0)); @@ -82,37 +151,285 @@ static byte* get_table_key(TABLE_RULE_ENT* e, uint* len, return (byte*)e->db; } +// TODO: check proper initialization of master_log_name/master_log_pos +int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log, + ulonglong pos, bool need_data_lock, + const char** errmsg) +{ + if (rli->log_pos_current) + return 0; + pthread_mutex_t *log_lock=rli->relay_log.get_log_lock(); + pthread_mutex_lock(log_lock); + if (need_data_lock) + pthread_mutex_lock(&rli->data_lock); + + if (rli->cur_log_fd >= 0) + { + end_io_cache(&rli->cache_buf); + my_close(rli->cur_log_fd, MYF(MY_WME)); + rli->cur_log_fd = -1; + } + + if (!log) + log = rli->relay_log_name; // already inited + if (!pos) + pos = rli->relay_log_pos; // already inited + else + rli->relay_log_pos = pos; + if (rli->relay_log.find_first_log(&rli->linfo,log)) + { + *errmsg="Could not find first log during relay log initialization"; + goto err; + } + strnmov(rli->relay_log_name,rli->linfo.log_file_name, + sizeof(rli->relay_log_name)); + // to make end_io_cache(&rli->cache_buf) safe in all cases + if (!rli->inited) + bzero((char*) &rli->cache_buf, sizeof(IO_CACHE)); + if (rli->relay_log.is_active(rli->linfo.log_file_name)) + { + if (my_b_tell((rli->cur_log=rli->relay_log.get_log_file())) == 0 && + check_binlog_magic(rli->cur_log,errmsg)) + { + goto err; + } + rli->cur_log_init_count=rli->cur_log->init_count; + } + else + { + if (rli->inited) + end_io_cache(&rli->cache_buf); + if (rli->cur_log_fd>=0) + my_close(rli->cur_log_fd,MYF(MY_WME)); + if ((rli->cur_log_fd=open_binlog(&rli->cache_buf, + rli->linfo.log_file_name,errmsg)) < 0) + { + goto err; + } + rli->cur_log = &rli->cache_buf; + } + if (pos > 4) + my_b_seek(rli->cur_log,(off_t)pos); + rli->log_pos_current=1; +err: + pthread_cond_broadcast(&rli->data_cond); + if (need_data_lock) + pthread_mutex_unlock(&rli->data_lock); + pthread_mutex_unlock(log_lock); + return (*errmsg) ? 1 : 0; +} -/* called from get_options() in mysqld.cc on start-up */ -void init_slave_skip_errors(char* arg) +// we assume we have a run lock on rli and that the both slave thread +// are not running +int purge_relay_logs(RELAY_LOG_INFO* rli, bool just_reset, const char** errmsg) { - char* p; - my_bool last_was_digit = 0; - if (bitmap_init(&slave_error_mask,MAX_SLAVE_ERROR,0)) + if (!rli->inited) + return 0; /* successfully do nothing */ + DBUG_ASSERT(rli->slave_running == 0); + DBUG_ASSERT(rli->mi->slave_running == 0); + int error=0; + rli->slave_skip_counter=0; + pthread_mutex_lock(&rli->data_lock); + rli->pending=0; + rli->master_log_name[0]=0; + rli->master_log_pos=0; // 0 means uninitialized + if (rli->relay_log.reset_logs(rli->sql_thd) || + rli->relay_log.find_first_log(&rli->linfo,"")) { - fprintf(stderr, "Badly out of memory, please check your system status\n"); - exit(1); + *errmsg = "Failed during log reset"; + error=1; + goto err; } - use_slave_mask = 1; - for (;isspace(*arg);++arg) - /* empty */; - if (!my_casecmp(arg,"all",3)) + strnmov(rli->relay_log_name,rli->linfo.log_file_name, + sizeof(rli->relay_log_name)); + rli->relay_log_pos=4; + rli->log_pos_current=0; + if (!just_reset) + error = init_relay_log_pos(rli,0,0,0/*do not need data lock*/,errmsg); +err: + pthread_mutex_unlock(&rli->data_lock); + return error; +} + +int terminate_slave_threads(MASTER_INFO* mi,int thread_mask,bool skip_lock) +{ + if (!mi->inited) + return 0; /* successfully do nothing */ + int error,force_all = (thread_mask & SLAVE_FORCE_ALL); + pthread_mutex_t *sql_lock = &mi->rli.run_lock, *io_lock = &mi->run_lock; + pthread_mutex_t *sql_cond_lock,*io_cond_lock; + + sql_cond_lock=sql_lock; + io_cond_lock=io_lock; + + if (skip_lock) { - bitmap_set_all(&slave_error_mask); - return; + sql_lock = io_lock = 0; } - for (p= arg ; *p; ) + if ((thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL)) && mi->slave_running) { - long err_code; - if (!(p= str2int(p, 10, 0, LONG_MAX, &err_code))) - break; - if (err_code < MAX_SLAVE_ERROR) - bitmap_set_bit(&slave_error_mask,(uint)err_code); - while (!isdigit(*p) && *p) - p++; + mi->abort_slave=1; + if ((error=terminate_slave_thread(mi->io_thd,io_lock, + io_cond_lock, + &mi->stop_cond, + &mi->slave_running)) && + !force_all) + return error; } + if ((thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL)) && mi->rli.slave_running) + { + DBUG_ASSERT(mi->rli.sql_thd != 0) ; + mi->rli.abort_slave=1; + if ((error=terminate_slave_thread(mi->rli.sql_thd,sql_lock, + sql_cond_lock, + &mi->rli.stop_cond, + &mi->rli.slave_running)) && + !force_all) + return error; + } + return 0; } +int terminate_slave_thread(THD* thd, pthread_mutex_t* term_lock, + pthread_mutex_t *cond_lock, + pthread_cond_t* term_cond, + volatile bool* slave_running) +{ + if (term_lock) + { + pthread_mutex_lock(term_lock); + if (!*slave_running) + { + pthread_mutex_unlock(term_lock); + return ER_SLAVE_NOT_RUNNING; + } + } + DBUG_ASSERT(thd != 0); + KICK_SLAVE(thd); + while (*slave_running) + { + /* there is a small chance that slave thread might miss the first + alarm. To protect againts it, resend the signal until it reacts + */ + struct timespec abstime; +#ifdef HAVE_TIMESPEC_TS_SEC + abstime.ts_sec=time(NULL)+2; + abstime.ts_nsec=0; +#elif defined(__WIN__) + abstime.tv_sec=time((time_t*) 0)+2; + abstime.tv_nsec=0; +#else + struct timeval tv; + gettimeofday(&tv,0); + abstime.tv_sec=tv.tv_sec+2; + abstime.tv_nsec=tv.tv_usec*1000; +#endif + pthread_cond_timedwait(term_cond, cond_lock, &abstime); + if (*slave_running) + KICK_SLAVE(thd); + } + if (term_lock) + pthread_mutex_unlock(term_lock); + return 0; +} + +int start_slave_thread(pthread_handler h_func, pthread_mutex_t* start_lock, + pthread_mutex_t *cond_lock, + pthread_cond_t* start_cond, + volatile bool* slave_running, + MASTER_INFO* mi) +{ + pthread_t th; + DBUG_ASSERT(mi->inited); + if (start_lock) + pthread_mutex_lock(start_lock); + if (!server_id) + { + if (start_cond) + pthread_cond_broadcast(start_cond); + if (start_lock) + pthread_mutex_unlock(start_lock); + sql_print_error("Server id not set, will not start slave"); + return ER_BAD_SLAVE; + } + + if (*slave_running) + { + if (start_cond) + pthread_cond_broadcast(start_cond); + if (start_lock) + pthread_mutex_unlock(start_lock); + return ER_SLAVE_MUST_STOP; + } + if (pthread_create(&th, &connection_attrib, h_func, (void*)mi)) + { + if (start_lock) + pthread_mutex_unlock(start_lock); + return ER_SLAVE_THREAD; + } + if (start_cond && cond_lock) + { + THD* thd = current_thd; + while (!*slave_running) + { + const char* old_msg = thd->enter_cond(start_cond,cond_lock, + "Waiting for slave thread to start"); + pthread_cond_wait(start_cond,cond_lock); + thd->exit_cond(old_msg); + // TODO: in a very rare case of init_slave_thread failing, it is + // possible that we can get stuck here since slave_running will not + // be set. We need to change slave_running to int and have -1 as + // error code + if (thd->killed) + { + pthread_mutex_unlock(cond_lock); + return ER_SERVER_SHUTDOWN; + } + } + } + if (start_lock) + pthread_mutex_unlock(start_lock); + return 0; +} +/* SLAVE_FORCE_ALL is not implemented here on purpose since it does not make + sense to do that for starting a slave - we always care if it actually + started the threads that were not previously running +*/ +int start_slave_threads(bool need_slave_mutex, bool wait_for_start, + MASTER_INFO* mi, const char* master_info_fname, + const char* slave_info_fname, int thread_mask) +{ + pthread_mutex_t *lock_io=0,*lock_sql=0,*lock_cond_io=0,*lock_cond_sql=0; + pthread_cond_t* cond_io=0,*cond_sql=0; + int error=0; + + if (need_slave_mutex) + { + lock_io = &mi->run_lock; + lock_sql = &mi->rli.run_lock; + } + if (wait_for_start) + { + cond_io = &mi->start_cond; + cond_sql = &mi->rli.start_cond; + lock_cond_io = &mi->run_lock; + lock_cond_sql = &mi->rli.run_lock; + } + if (init_master_info(mi,master_info_fname,slave_info_fname)) + return ER_MASTER_INFO; + + if ((thread_mask & SLAVE_IO) && + (error=start_slave_thread(handle_slave_io,lock_io,lock_cond_io, + cond_io,&mi->slave_running, + mi))) + return error; + if ((thread_mask & SLAVE_SQL) && + (error=start_slave_thread(handle_slave_sql,lock_sql,lock_cond_sql, + cond_sql, + &mi->rli.slave_running,mi))) + return error; + return 0; +} void init_table_rule_hash(HASH* h, bool* h_inited) { @@ -133,16 +450,16 @@ static TABLE_RULE_ENT* find_wild(DYNAMIC_ARRAY *a, const char* key, int len) { uint i; const char* key_end = key + len; - + for (i = 0; i < a->elements; i++) - { - TABLE_RULE_ENT* e ; - get_dynamic(a, (gptr)&e, i); - if (!wild_case_compare(key, key_end, (const char*)e->db, - (const char*)(e->db + e->key_len),'\\')) - return e; - } - + { + TABLE_RULE_ENT* e ; + get_dynamic(a, (gptr)&e, i); + if (!wild_case_compare(key, key_end, (const char*)e->db, + (const char*)(e->db + e->key_len),'\\')) + return e; + } + return 0; } @@ -150,10 +467,10 @@ int tables_ok(THD* thd, TABLE_LIST* tables) { for (; tables; tables = tables->next) { - char hash_key[2*NAME_LEN+2]; - char* p; if (!tables->updating) continue; + char hash_key[2*NAME_LEN+2]; + char* p; p = strmov(hash_key, tables->db ? tables->db : thd->db); *p++ = '.'; uint len = strmov(p, tables->real_name) - hash_key ; @@ -162,7 +479,7 @@ int tables_ok(THD* thd, TABLE_LIST* tables) if (hash_search(&replicate_do_table, (byte*) hash_key, len)) return 1; } - if (ignore_table_inited) // if there are any do's + if (ignore_table_inited) // if there are any ignores { if (hash_search(&replicate_ignore_table, (byte*) hash_key, len)) return 0; @@ -175,10 +492,9 @@ int tables_ok(THD* thd, TABLE_LIST* tables) return 0; } - /* - If no explicit rule found and there was a do list, do not replicate. - If there was no do list, go ahead - */ + // if no explicit rule found + // and there was a do list, do not replicate. If there was + // no do list, go ahead return !do_table_inited && !wild_do_table_inited; } @@ -186,14 +502,12 @@ int tables_ok(THD* thd, TABLE_LIST* tables) int add_table_rule(HASH* h, const char* table_spec) { const char* dot = strchr(table_spec, '.'); - if (!dot) - return 1; + if(!dot) return 1; // len is always > 0 because we know the there exists a '.' uint len = (uint)strlen(table_spec); TABLE_RULE_ENT* e = (TABLE_RULE_ENT*)my_malloc(sizeof(TABLE_RULE_ENT) + len, MYF(MY_WME)); - if (!e) - return 1; + if(!e) return 1; e->db = (char*)e + sizeof(TABLE_RULE_ENT); e->tbl_name = e->db + (dot - table_spec) + 1; e->key_len = len; @@ -205,12 +519,11 @@ int add_table_rule(HASH* h, const char* table_spec) int add_wild_table_rule(DYNAMIC_ARRAY* a, const char* table_spec) { const char* dot = strchr(table_spec, '.'); - if (!dot) return 1; + if(!dot) return 1; uint len = (uint)strlen(table_spec); TABLE_RULE_ENT* e = (TABLE_RULE_ENT*)my_malloc(sizeof(TABLE_RULE_ENT) + len, MYF(MY_WME)); - if (!e) - return 1; + if(!e) return 1; e->db = (char*)e + sizeof(TABLE_RULE_ENT); e->tbl_name = e->db + (dot - table_spec) + 1; e->key_len = len; @@ -222,31 +535,28 @@ int add_wild_table_rule(DYNAMIC_ARRAY* a, const char* table_spec) static void free_string_array(DYNAMIC_ARRAY *a) { uint i; - for (i = 0; i < a->elements; i++) - { - char* p; - get_dynamic(a, (gptr) &p, i); - my_free(p, MYF(MY_WME)); - } + for(i = 0; i < a->elements; i++) + { + char* p; + get_dynamic(a, (gptr) &p, i); + my_free(p, MYF(MY_WME)); + } delete_dynamic(a); } -void end_slave() +static int end_slave_on_walk(MASTER_INFO* mi, gptr /*unused*/) { - pthread_mutex_lock(&LOCK_slave); - if (slave_running) - { - abort_slave = 1; - thr_alarm_kill(slave_real_id); -#ifdef SIGNAL_WITH_VIO_CLOSE - slave_thd->close_active_vio(); -#endif - while (slave_running) - pthread_cond_wait(&COND_slave_stopped, &LOCK_slave); - } - pthread_mutex_unlock(&LOCK_slave); + end_master_info(mi); + return 0; +} - end_master_info(&glob_mi); +void end_slave() +{ + // TODO: replace the line below with + // list_walk(&master_list, (list_walk_action)end_slave_on_walk,0); + // once multi-master code is ready + terminate_slave_threads(active_mi,SLAVE_FORCE_ALL); + end_master_info(active_mi); if (do_table_inited) hash_free(&replicate_do_table); if (ignore_table_inited) @@ -257,18 +567,29 @@ void end_slave() free_string_array(&replicate_wild_ignore_table); } -inline bool slave_killed(THD* thd) +static inline bool slave_killed(THD* thd, MASTER_INFO* mi) { - return abort_slave || abort_loop || thd->killed; + DBUG_ASSERT(mi->io_thd == thd); + DBUG_ASSERT(mi->slave_running == 1); // tracking buffer overrun + return mi->abort_slave || abort_loop || thd->killed; } -void slave_print_error(int err_code, const char* msg, ...) +static inline bool slave_killed(THD* thd, RELAY_LOG_INFO* rli) +{ + DBUG_ASSERT(rli->sql_thd == thd); + DBUG_ASSERT(rli->slave_running == 1);// tracking buffer overrun + return rli->abort_slave || abort_loop || thd->killed; +} + +void slave_print_error(RELAY_LOG_INFO* rli, int err_code, const char* msg, ...) { va_list args; va_start(args,msg); - my_vsnprintf(last_slave_error, sizeof(last_slave_error), msg, args); - sql_print_error("Slave: %s, error_code=%d", last_slave_error, err_code); - last_slave_errno = err_code; + my_vsnprintf(rli->last_slave_error, + sizeof(rli->last_slave_error), msg, args); + sql_print_error("Slave: %s, error_code=%d", rli->last_slave_error, + err_code); + rli->last_slave_errno = err_code; } void skip_load_data_infile(NET* net) @@ -281,15 +602,15 @@ void skip_load_data_infile(NET* net) char* rewrite_db(char* db) { - if (replicate_rewrite_db.is_empty() || !db) return db; + if(replicate_rewrite_db.is_empty() || !db) return db; I_List_iterator<i_string_pair> it(replicate_rewrite_db); i_string_pair* tmp; - while ((tmp=it++)) - { - if (!strcmp(tmp->key, db)) - return tmp->val; - } + while((tmp=it++)) + { + if(!strcmp(tmp->key, db)) + return tmp->val; + } return db; } @@ -297,39 +618,39 @@ char* rewrite_db(char* db) int db_ok(const char* db, I_List<i_string> &do_list, I_List<i_string> &ignore_list ) { - if (do_list.is_empty() && ignore_list.is_empty()) + if(do_list.is_empty() && ignore_list.is_empty()) return 1; // ok to replicate if the user puts no constraints // if the user has specified restrictions on which databases to replicate // and db was not selected, do not replicate - if (!db) + if(!db) return 0; - if (!do_list.is_empty()) // if the do's are not empty - { - I_List_iterator<i_string> it(do_list); - i_string* tmp; - - while ((tmp=it++)) + if(!do_list.is_empty()) // if the do's are not empty { - if (!strcmp(tmp->ptr, db)) - return 1; // match + I_List_iterator<i_string> it(do_list); + i_string* tmp; + + while((tmp=it++)) + { + if(!strcmp(tmp->ptr, db)) + return 1; // match + } + return 0; } - return 0; - } else // there are some elements in the don't, otherwise we cannot get here - { - I_List_iterator<i_string> it(ignore_list); - i_string* tmp; - - while ((tmp=it++)) { - if (!strcmp(tmp->ptr, db)) - return 0; // match - } + I_List_iterator<i_string> it(ignore_list); + i_string* tmp; - return 1; - } + while((tmp=it++)) + { + if(!strcmp(tmp->ptr, db)) + return 0; // match + } + + return 1; + } } static int init_strvar_from_file(char* var, int max_size, IO_CACHE* f, @@ -346,7 +667,7 @@ static int init_strvar_from_file(char* var, int max_size, IO_CACHE* f, // if we truncated a line or stopped on last char, remove all chars // up to and including newline int c; - while (((c=my_b_get(f)) != '\n' && c != my_b_EOF)); + while( ((c=my_b_get(f)) != '\n' && c != my_b_EOF)); } return 0; } @@ -361,13 +682,13 @@ static int init_strvar_from_file(char* var, int max_size, IO_CACHE* f, static int init_intvar_from_file(int* var, IO_CACHE* f, int default_val) { char buf[32]; - + if (my_b_gets(f, buf, sizeof(buf))) { *var = atoi(buf); return 0; } - else if (default_val) + else if(default_val) { *var = default_val; return 0; @@ -381,7 +702,7 @@ static int check_master_version(MYSQL* mysql, MASTER_INFO* mi) MYSQL_ROW row; const char* version; const char* errmsg = 0; - + if (mc_mysql_query(mysql, "SELECT VERSION()", 0) || !(res = mc_mysql_store_result(mysql))) { @@ -399,7 +720,7 @@ static int check_master_version(MYSQL* mysql, MASTER_INFO* mi) errmsg = "Master reported NULL for the version"; goto err; } - + switch (*version) { case '3': @@ -434,7 +755,7 @@ static int create_table_from_dump(THD* thd, NET* net, const char* db, int error= 1; handler *file; uint save_options; - + if (packet_len == packet_error) { send_error(&thd->net, ER_MASTER_NET_READ); @@ -459,7 +780,7 @@ static int create_table_from_dump(THD* thd, NET* net, const char* db, thd->current_tablenr = 0; thd->query_error = 0; thd->net.no_send_ok = 1; - + /* we do not want to log create table statement */ save_options = thd->options; thd->options &= ~OPTION_BIN_LOG; @@ -470,7 +791,7 @@ static int create_table_from_dump(THD* thd, NET* net, const char* db, mysql_parse(thd, thd->query, packet_len); // run create table thd->db = save_db; // leave things the way the were before thd->options = save_options; - + if (thd->query_error) goto err; // mysql_parse took care of the error send @@ -485,7 +806,7 @@ static int create_table_from_dump(THD* thd, NET* net, const char* db, sql_print_error("create_table_from_dump: could not open created table"); goto err; } - + file = tables.table->file; thd->proc_info = "Reading master dump table data"; if (file->net_read_dump(net)) @@ -516,16 +837,16 @@ err: return error; } -int fetch_nx_table(THD* thd, const char* db_name, const char* table_name, +int fetch_master_table(THD* thd, const char* db_name, const char* table_name, MASTER_INFO* mi, MYSQL* mysql) { int error = 1; - int nx_errno = 0; + int fetch_errno = 0; bool called_connected = (mysql != NULL); if (!called_connected && !(mysql = mc_mysql_init(NULL))) { - sql_print_error("fetch_nx_table: Error in mysql_init()"); - nx_errno = ER_GET_ERRNO; + sql_print_error("fetch_master_table: Error in mysql_init()"); + fetch_errno = ER_GET_ERRNO; goto err; } @@ -535,17 +856,17 @@ int fetch_nx_table(THD* thd, const char* db_name, const char* table_name, { sql_print_error("Could not connect to master while fetching table\ '%-64s.%-64s'", db_name, table_name); - nx_errno = ER_CONNECT_TO_MASTER; + fetch_errno = ER_CONNECT_TO_MASTER; goto err; } } - if (slave_killed(thd)) + if (thd->killed) goto err; if (request_table_dump(mysql, db_name, table_name)) { - nx_errno = ER_GET_ERRNO; - sql_print_error("fetch_nx_table: failed on table dump request "); + fetch_errno = ER_GET_ERRNO; + sql_print_error("fetch_master_table: failed on table dump request "); goto err; } @@ -553,23 +874,24 @@ int fetch_nx_table(THD* thd, const char* db_name, const char* table_name, table_name)) { // create_table_from_dump will have sent the error alread - sql_print_error("fetch_nx_table: failed on create table "); + sql_print_error("fetch_master_table: failed on create table "); goto err; } - error = 0; - err: if (mysql && !called_connected) mc_mysql_close(mysql); - if (nx_errno && thd->net.vio) - send_error(&thd->net, nx_errno, "Error in fetch_nx_table"); + if (fetch_errno && thd->net.vio) + send_error(&thd->net, fetch_errno, "Error in fetch_master_table"); thd->net.no_send_ok = 0; // Clear up garbage after create_table_from_dump return error; } void end_master_info(MASTER_INFO* mi) { + if (!mi->inited) + return; + end_relay_log_info(&mi->rli); if (mi->fd >= 0) { end_io_cache(&mi->file); @@ -579,23 +901,138 @@ void end_master_info(MASTER_INFO* mi) mi->inited = 0; } -int init_master_info(MASTER_INFO* mi) +int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname) +{ + if (rli->inited) + return 0; + MY_STAT stat_area; + char fname[FN_REFLEN+128]; + int info_fd; + const char* msg = 0; + int error = 0; + + fn_format(fname, info_fname, + mysql_data_home, "", 4+32); + pthread_mutex_lock(&rli->data_lock); + info_fd = rli->info_fd; + rli->pending = 0; + rli->cur_log_fd = -1; + rli->slave_skip_counter=0; + rli->log_pos_current=0; + // TODO: make this work with multi-master + if (!opt_relay_logname) + { + char tmp[FN_REFLEN]; + /* TODO: The following should be using fn_format(); We just need to + first change fn_format() to cut the file name if it's too long. + */ + strmake(tmp,glob_hostname,FN_REFLEN-5); + strmov(strcend(tmp,'.'),"-relay-bin"); + opt_relay_logname=my_strdup(tmp,MYF(MY_WME)); + } + rli->relay_log.set_index_file_name(opt_relaylog_index_name); + open_log(&rli->relay_log, glob_hostname, opt_relay_logname, "-relay-bin", + LOG_BIN, 1 /* read_append cache */, + 1 /* no auto events*/); + + /* if file does not exist */ + if (!my_stat(fname, &stat_area, MYF(0))) + { + // if someone removed the file from underneath our feet, just close + // the old descriptor and re-create the old file + if (info_fd >= 0) + my_close(info_fd, MYF(MY_WME)); + if ((info_fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0 + || init_io_cache(&rli->info_file, info_fd, IO_SIZE*2, READ_CACHE, 0L,0, + MYF(MY_WME))) + { + if(info_fd >= 0) + my_close(info_fd, MYF(0)); + rli->info_fd=-1; + pthread_mutex_unlock(&rli->data_lock); + return 1; + } + if (init_relay_log_pos(rli,"",4,0/*no data mutex*/,&msg)) + goto err; + rli->master_log_pos = 0; // uninitialized + rli->info_fd = info_fd; + } + else // file exists + { + if(info_fd >= 0) + reinit_io_cache(&rli->info_file, READ_CACHE, 0L,0,0); + else if((info_fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0 + || init_io_cache(&rli->info_file, info_fd, + IO_SIZE*2, READ_CACHE, 0L, + 0, MYF(MY_WME))) + { + if (info_fd >= 0) + my_close(info_fd, MYF(0)); + rli->info_fd=-1; + pthread_mutex_unlock(&rli->data_lock); + return 1; + } + + rli->info_fd = info_fd; + if (init_strvar_from_file(rli->relay_log_name, + sizeof(rli->relay_log_name), &rli->info_file, + (char*)"") || + init_intvar_from_file((int*)&rli->relay_log_pos, + &rli->info_file, 4) || + init_strvar_from_file(rli->master_log_name, + sizeof(rli->master_log_name), &rli->info_file, + (char*)"") || + init_intvar_from_file((int*)&rli->master_log_pos, + &rli->info_file, 0)) + { + msg="Error reading slave log configuration"; + goto err; + } + if (init_relay_log_pos(rli,0 /*log already inited*/, + 0 /*pos already inited*/, + 0 /* no data lock*/, + &msg)) + goto err; + } + DBUG_ASSERT(rli->relay_log_pos >= 4); + DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->relay_log_pos); + rli->inited = 1; + // now change the cache from READ to WRITE - must do this + // before flush_relay_log_info + reinit_io_cache(&rli->info_file, WRITE_CACHE,0L,0,1); + error=test(flush_relay_log_info(rli)); + pthread_mutex_unlock(&rli->data_lock); + return error; + +err: + sql_print_error(msg); + end_io_cache(&rli->info_file); + my_close(info_fd, MYF(0)); + rli->info_fd=-1; + pthread_mutex_unlock(&rli->data_lock); + return 1; +} + +int init_master_info(MASTER_INFO* mi, const char* master_info_fname, + const char* slave_info_fname) { if (mi->inited) return 0; + if (init_relay_log_info(&mi->rli, slave_info_fname)) + return 1; + mi->rli.mi = mi; int fd,length,error; MY_STAT stat_area; char fname[FN_REFLEN+128]; const char *msg; - fn_format(fname, master_info_file, mysql_data_home, "", 4+32); + fn_format(fname, master_info_fname, mysql_data_home, "", 4+32); // we need a mutex while we are changing master info parameters to // keep other threads from reading bogus info - pthread_mutex_lock(&mi->lock); - mi->pending = 0; + pthread_mutex_lock(&mi->data_lock); fd = mi->fd; - + // we do not want any messages if the file does not exist if (!my_stat(fname, &stat_area, MYF(0))) { @@ -607,15 +1044,17 @@ int init_master_info(MASTER_INFO* mi) || init_io_cache(&mi->file, fd, IO_SIZE*2, READ_CACHE, 0L,0, MYF(MY_WME))) { - if (fd >= 0) + if(fd >= 0) my_close(fd, MYF(0)); - pthread_mutex_unlock(&mi->lock); + mi->fd=-1; + end_relay_log_info(&mi->rli); + pthread_mutex_unlock(&mi->data_lock); return 1; } - mi->log_file_name[0] = 0; - mi->pos = 4; // skip magic number + mi->master_log_name[0] = 0; + mi->master_log_pos = 4; // skip magic number mi->fd = fd; - + if (master_host) strmake(mi->host, master_host, sizeof(mi->host) - 1); if (master_user) @@ -627,36 +1066,27 @@ int init_master_info(MASTER_INFO* mi) } else // file exists { - if (fd >= 0) + if(fd >= 0) reinit_io_cache(&mi->file, READ_CACHE, 0L,0,0); - else if ((fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0 + else if((fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0 || init_io_cache(&mi->file, fd, IO_SIZE*2, READ_CACHE, 0L, 0, MYF(MY_WME))) { - if (fd >= 0) + if(fd >= 0) my_close(fd, MYF(0)); - pthread_mutex_unlock(&mi->lock); + mi->fd=-1; + end_relay_log_info(&mi->rli); + pthread_mutex_unlock(&mi->data_lock); return 1; } - if ((length=my_b_gets(&mi->file, mi->log_file_name, - sizeof(mi->log_file_name))) < 1) - { - msg="Error reading log file name from master info file "; - goto error; - } - - mi->log_file_name[length-1]= 0; // kill \n - /* Reuse fname buffer */ - if (!my_b_gets(&mi->file, fname, sizeof(fname))) - { - msg="Error reading log file position from master info file"; - goto error; - } - mi->pos = strtoull(fname,(char**) 0, 10); - mi->fd = fd; - if (init_strvar_from_file(mi->host, sizeof(mi->host), &mi->file, + if (init_strvar_from_file(mi->master_log_name, + sizeof(mi->master_log_name), &mi->file, + (char*)"") || + init_intvar_from_file((int*)&mi->master_log_pos, &mi->file, 4) + || + init_strvar_from_file(mi->host, sizeof(mi->host), &mi->file, master_host) || init_strvar_from_file(mi->user, sizeof(mi->user), &mi->file, master_user) || @@ -664,28 +1094,30 @@ int init_master_info(MASTER_INFO* mi) master_password) || init_intvar_from_file((int*)&mi->port, &mi->file, master_port) || init_intvar_from_file((int*)&mi->connect_retry, &mi->file, - master_connect_retry) || - init_intvar_from_file((int*)&mi->last_log_seq, &mi->file, 0) + master_connect_retry) ) { msg="Error reading master configuration"; - goto error; + goto err; } } - + mi->inited = 1; // now change the cache from READ to WRITE - must do this // before flush_master_info reinit_io_cache(&mi->file, WRITE_CACHE,0L,0,1); error=test(flush_master_info(mi)); - pthread_mutex_unlock(&mi->lock); + pthread_mutex_unlock(&mi->data_lock); return error; -error: +err: sql_print_error(msg); end_io_cache(&mi->file); + end_relay_log_info(&mi->rli); + DBUG_ASSERT(fd>=0); my_close(fd, MYF(0)); - pthread_mutex_unlock(&mi->lock); + mi->fd=-1; + pthread_mutex_unlock(&mi->data_lock); return 1; } @@ -696,7 +1128,7 @@ int register_slave_on_master(MYSQL* mysql) if (!report_host) return 0; - + int4store(buf, server_id); packet.append(buf, 4); @@ -705,8 +1137,8 @@ int register_slave_on_master(MYSQL* mysql) net_store_data(&packet, report_user); else packet.append((char)0); - - if (report_password) + + if(report_password) net_store_data(&packet, report_user); else packet.append((char)0); @@ -729,51 +1161,62 @@ int register_slave_on_master(MYSQL* mysql) return 0; } - -int show_master_info(THD* thd) +int show_master_info(THD* thd, MASTER_INFO* mi) { + // TODO: fix this for multi-master DBUG_ENTER("show_master_info"); List<Item> field_list; field_list.push_back(new Item_empty_string("Master_Host", - sizeof(glob_mi.host))); + sizeof(mi->host))); field_list.push_back(new Item_empty_string("Master_User", - sizeof(glob_mi.user))); + sizeof(mi->user))); field_list.push_back(new Item_empty_string("Master_Port", 6)); field_list.push_back(new Item_empty_string("Connect_retry", 6)); - field_list.push_back( new Item_empty_string("Log_File", + field_list.push_back(new Item_empty_string("Master_Log_File", + FN_REFLEN)); + field_list.push_back(new Item_empty_string("Read_Master_Log_Pos", 12)); + field_list.push_back(new Item_empty_string("Relay_Log_File", FN_REFLEN)); - field_list.push_back(new Item_empty_string("Pos", 12)); - field_list.push_back(new Item_empty_string("Slave_Running", 3)); + field_list.push_back(new Item_empty_string("Relay_Log_Pos", 12)); + field_list.push_back(new Item_empty_string("Relay_Master_Log_File", + FN_REFLEN)); + field_list.push_back(new Item_empty_string("Slave_IO_Running", 3)); + field_list.push_back(new Item_empty_string("Slave_SQL_Running", 3)); field_list.push_back(new Item_empty_string("Replicate_do_db", 20)); field_list.push_back(new Item_empty_string("Replicate_ignore_db", 20)); field_list.push_back(new Item_empty_string("Last_errno", 4)); field_list.push_back(new Item_empty_string("Last_error", 20)); field_list.push_back(new Item_empty_string("Skip_counter", 12)); - field_list.push_back(new Item_empty_string("Last_log_seq", 12)); - if (send_fields(thd, field_list, 1)) + field_list.push_back(new Item_empty_string("Exec_master_log_pos", 12)); + if(send_fields(thd, field_list, 1)) DBUG_RETURN(-1); String* packet = &thd->packet; - uint32 last_log_seq; packet->length(0); - - pthread_mutex_lock(&glob_mi.lock); - net_store_data(packet, glob_mi.host); - net_store_data(packet, glob_mi.user); - net_store_data(packet, (uint32) glob_mi.port); - net_store_data(packet, (uint32) glob_mi.connect_retry); - net_store_data(packet, glob_mi.log_file_name); - net_store_data(packet, (longlong) glob_mi.pos); - last_log_seq = glob_mi.last_log_seq; - pthread_mutex_unlock(&glob_mi.lock); - net_store_data(packet, slave_running ? "Yes":"No"); + + pthread_mutex_lock(&mi->data_lock); + pthread_mutex_lock(&mi->rli.data_lock); + net_store_data(packet, mi->host); + net_store_data(packet, mi->user); + net_store_data(packet, (uint32) mi->port); + net_store_data(packet, (uint32) mi->connect_retry); + net_store_data(packet, mi->master_log_name); + net_store_data(packet, (longlong) mi->master_log_pos); + net_store_data(packet, mi->rli.relay_log_name + + dirname_length(mi->rli.relay_log_name)); + net_store_data(packet, (longlong) mi->rli.relay_log_pos); + net_store_data(packet, mi->rli.master_log_name); + net_store_data(packet, mi->slave_running ? "Yes":"No"); + net_store_data(packet, mi->rli.slave_running ? "Yes":"No"); net_store_data(packet, &replicate_do_db); net_store_data(packet, &replicate_ignore_db); - net_store_data(packet, (uint32)last_slave_errno); - net_store_data(packet, last_slave_error); - net_store_data(packet, slave_skip_counter); - net_store_data(packet, last_log_seq); - + net_store_data(packet, (uint32)mi->rli.last_slave_errno); + net_store_data(packet, mi->rli.last_slave_error); + net_store_data(packet, mi->rli.slave_skip_counter); + net_store_data(packet, (longlong)mi->rli.master_log_pos); + pthread_mutex_unlock(&mi->rli.data_lock); + pthread_mutex_unlock(&mi->data_lock); + if (my_net_write(&thd->net, (char*)thd->packet.ptr(), packet->length())) DBUG_RETURN(-1); @@ -785,59 +1228,64 @@ int flush_master_info(MASTER_INFO* mi) { IO_CACHE* file = &mi->file; char lbuf[22]; - char lbuf1[22]; - + my_b_seek(file, 0L); my_b_printf(file, "%s\n%s\n%s\n%s\n%s\n%d\n%d\n%d\n", - mi->log_file_name, llstr(mi->pos, lbuf), mi->host, mi->user, - mi->password, mi->port, mi->connect_retry, - llstr(mi->last_log_seq, lbuf1)); + mi->master_log_name, llstr(mi->master_log_pos, lbuf), + mi->host, mi->user, + mi->password, mi->port, mi->connect_retry + ); flush_io_cache(file); return 0; } - -int st_master_info::wait_for_pos(THD* thd, String* log_name, ulonglong log_pos) +/* TODO: the code below needs to be re-written almost from scratch + Main issue is how to find out if we have reached a certain position + in the master log my knowing the offset in the relay log. + */ +int st_relay_log_info::wait_for_pos(THD* thd, String* log_name, + ulonglong log_pos) { if (!inited) return -1; - bool pos_reached; + bool pos_reached = 0; int event_count = 0; - pthread_mutex_lock(&lock); + pthread_mutex_lock(&data_lock); while (!thd->killed) { int cmp_result; - if (*log_file_name) + DBUG_ASSERT(*master_log_name || master_log_pos == 0); + if (*master_log_name) { /* We should use dirname_length() here when we have a version of this that doesn't modify the argument */ - char *basename = strrchr(log_file_name, FN_LIBCHAR); + char *basename = strrchr(master_log_name, FN_LIBCHAR); if (basename) ++basename; else - basename = log_file_name; + basename = master_log_name; cmp_result = strncmp(basename, log_name->ptr(), log_name->length()); } else cmp_result = 0; - - pos_reached = ((!cmp_result && pos >= log_pos) || cmp_result > 0); + + pos_reached = ((!cmp_result && master_log_pos >= log_pos) || + cmp_result > 0); if (pos_reached || thd->killed) break; - - const char* msg = thd->enter_cond(&cond, &lock, + + const char* msg = thd->enter_cond(&data_cond, &data_lock, "Waiting for master update"); - pthread_cond_wait(&cond, &lock); + pthread_cond_wait(&data_cond, &data_lock); thd->exit_cond(msg); event_count++; } - pthread_mutex_unlock(&lock); + pthread_mutex_unlock(&data_lock); return thd->killed ? -1 : event_count; } - -static int init_slave_thread(THD* thd) +static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type) { DBUG_ENTER("init_slave_thread"); thd->system_thread = thd->bootstrap = 1; @@ -851,7 +1299,7 @@ static int init_slave_thread(THD* thd) thd->options = (((opt_log_slave_updates) ? OPTION_BIN_LOG:0) | OPTION_AUTO_IS_NULL) ; thd->system_thread = 1; thd->client_capabilities = CLIENT_LOCAL_FILES; - slave_real_id=thd->real_id=pthread_self(); + thd->real_id=pthread_self(); pthread_mutex_lock(&LOCK_thread_count); thd->thread_id = thread_id++; pthread_mutex_unlock(&LOCK_thread_count); @@ -861,7 +1309,6 @@ static int init_slave_thread(THD* thd) my_pthread_setspecific_ptr(THR_MALLOC, &thd->mem_root) || my_pthread_setspecific_ptr(THR_NET, &thd->net)) { - close_connection(&thd->net,ER_OUT_OF_RESOURCES); // is this needed? end_thread(thd,0); DBUG_RETURN(-1); } @@ -878,14 +1325,21 @@ static int init_slave_thread(THD* thd) if (thd->max_join_size == (ulong) ~0L) thd->options |= OPTION_BIG_SELECTS; - thd->proc_info="Waiting for master update"; + if (thd_type == SLAVE_THD_SQL) + { + thd->proc_info = "Waiting for the next event in slave queue"; + } + else + { + thd->proc_info="Waiting for master update"; + } thd->version=refresh_version; thd->set_time(); DBUG_RETURN(0); } -static int safe_sleep(THD* thd, int sec) +static int safe_sleep(THD* thd, MASTER_INFO* mi, int sec) { thr_alarm_t alarmed; thr_alarm_init(&alarmed); @@ -907,22 +1361,21 @@ static int safe_sleep(THD* thd, int sec) // so it will not wake up the wife and kids :-) if (thr_alarm_in_use(&alarmed)) thr_end_alarm(&alarmed); - - if (slave_killed(thd)) + + if (slave_killed(thd,mi)) return 1; start_time=time((time_t*) 0); } return 0; } - static int request_dump(MYSQL* mysql, MASTER_INFO* mi) { char buf[FN_REFLEN + 10]; int len; int binlog_flags = 0; // for now - char* logname = mi->log_file_name; - int4store(buf, mi->pos); + char* logname = mi->master_log_name; + int4store(buf, mi->master_log_pos); int2store(buf + 4, binlog_flags); int4store(buf + 6, server_id); len = (uint) strlen(logname); @@ -940,25 +1393,24 @@ static int request_dump(MYSQL* mysql, MASTER_INFO* mi) return 0; } - static int request_table_dump(MYSQL* mysql, const char* db, const char* table) { char buf[1024]; char * p = buf; uint table_len = (uint) strlen(table); uint db_len = (uint) strlen(db); - if (table_len + db_len > sizeof(buf) - 2) - { - sql_print_error("request_table_dump: Buffer overrun"); - return 1; - } - + if(table_len + db_len > sizeof(buf) - 2) + { + sql_print_error("request_table_dump: Buffer overrun"); + return 1; + } + *p++ = db_len; memcpy(p, db, db_len); p += db_len; *p++ = table_len; memcpy(p, table, table_len); - + if (mc_simple_command(mysql, COM_TABLE_DUMP, buf, p - buf + table_len, 1)) { sql_print_error("request_table_dump: Error sending the table dump \ @@ -969,7 +1421,6 @@ command"); return 0; } - static ulong read_event(MYSQL* mysql, MASTER_INFO *mi) { ulong len = packet_error; @@ -983,14 +1434,14 @@ static ulong read_event(MYSQL* mysql, MASTER_INFO *mi) if (disconnect_slave_event_count && !(events_till_disconnect--)) return packet_error; #endif - - while (!abort_loop && !abort_slave && len == packet_error && + + while (!abort_loop && !mi->abort_slave && len == packet_error && read_errno == EINTR ) { len = mc_net_safe_read(mysql); read_errno = errno; } - if (abort_loop || abort_slave) + if (abort_loop || mi->abort_slave) return packet_error; if (len == packet_error || (long) len < 1) { @@ -1002,77 +1453,85 @@ server_errno=%d)", if (len == 1) { - sql_print_error("Slave: received 0 length packet from server, apparent \ -master shutdown: %s (%d)", + sql_print_error("Slave: received 0 length packet from server, apparent\ + master shutdown: %s (%d)", mc_mysql_error(mysql), read_errno); return packet_error; } - + DBUG_PRINT("info",( "len=%u, net->read_pos[4] = %d\n", len, mysql->net.read_pos[4])); return len - 1; } -int check_expected_error(THD* thd, int expected_error) +int check_expected_error(THD* thd, RELAY_LOG_INFO* rli, int expected_error) { - switch (expected_error) { - case ER_NET_READ_ERROR: - case ER_NET_ERROR_ON_WRITE: - case ER_SERVER_SHUTDOWN: - case ER_NEW_ABORTING_CONNECTION: - my_snprintf(last_slave_error, sizeof(last_slave_error),"\ -Slave: query '%s' partially completed on the master \ + switch (expected_error) + { + case ER_NET_READ_ERROR: + case ER_NET_ERROR_ON_WRITE: + case ER_SERVER_SHUTDOWN: + case ER_NEW_ABORTING_CONNECTION: + my_snprintf(rli->last_slave_error, sizeof(rli->last_slave_error), + "Slave: query '%s' partially completed on the master \ and was aborted. There is a chance that your master is inconsistent at this \ -point. If you are sure that your master is ok, run this query manually on the \ -slave and then restart the slave with SET SQL_SLAVE_SKIP_COUNTER=1;\ -SLAVE START;", thd->query); - last_slave_errno = expected_error; - sql_print_error("%s",last_slave_error); - return 1; - default: - return 0; - } +point. If you are sure that your master is ok, run this query manually on the\ + slave and then restart the slave with SET SQL_SLAVE_SKIP_COUNTER=1;\ + SLAVE START;", thd->query); + rli->last_slave_errno = expected_error; + sql_print_error("%s",rli->last_slave_error); + return 1; + default: + return 0; + } } - - -static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len) +static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli) { const char *error_msg; - Log_event * ev = Log_event::read_log_event((const char*)net->read_pos + 1, - event_len, &error_msg, - mi->old_format); + DBUG_ASSERT(rli->sql_thd==thd); + Log_event * ev = next_event(rli); + DBUG_ASSERT(rli->sql_thd==thd); + if (slave_killed(thd,rli)) + return 1; if (ev) { int type_code = ev->get_type_code(); int exec_res; + pthread_mutex_lock(&rli->data_lock); if (ev->server_id == ::server_id || - (slave_skip_counter && type_code != ROTATE_EVENT)) + (rli->slave_skip_counter && type_code != ROTATE_EVENT)) { - if (type_code == LOAD_EVENT) - skip_load_data_infile(net); - - mi->inc_pos(event_len, ev->log_seq); - flush_master_info(mi); - if (slave_skip_counter && /* protect against common user error of + /* + TODO: I/O thread must handle skipping file delivery for + old load data infile events + */ + /* TODO: I/O thread should not even log events with the same server id */ + rli->inc_pos(ev->get_event_len(), + type_code != STOP_EVENT ? ev->log_pos : 0, + 1/* skip lock*/); + flush_relay_log_info(rli); + if (rli->slave_skip_counter && /* protect against common user error of setting the counter to 1 instead of 2 while recovering from an failed auto-increment insert */ - !(type_code == INTVAR_EVENT && - slave_skip_counter == 1)) - --slave_skip_counter; + !((type_code == INTVAR_EVENT || type_code == STOP_EVENT) && + rli->slave_skip_counter == 1)) + --rli->slave_skip_counter; + pthread_mutex_unlock(&rli->data_lock); delete ev; return 0; // avoid infinite update loops } - + pthread_mutex_unlock(&rli->data_lock); + thd->server_id = ev->server_id; // use the original server id for logging thd->set_time(); // time the query - if (!thd->log_seq) - thd->log_seq = ev->log_seq; if (!ev->when) ev->when = time(NULL); ev->thd = thd; - exec_res = ev->exec_event(mi); + thd->log_pos = ev->log_pos; + exec_res = ev->exec_event(rli); + DBUG_ASSERT(rli->sql_thd==thd); delete ev; return exec_res; } @@ -1086,277 +1545,410 @@ This may also be a network problem, or just a bug in the master or slave code.\ } } -// slave thread - -pthread_handler_decl(handle_slave,arg __attribute__((unused))) +/* slave I/O thread */ +pthread_handler_decl(handle_slave_io,arg) { #ifndef DBUG_OFF -slave_begin: + slave_begin: #endif THD *thd; // needs to be first for thread_stack MYSQL *mysql = NULL ; + MASTER_INFO* mi = (MASTER_INFO*)arg; char llbuff[22]; bool retried_once = 0; - ulonglong last_failed_pos = 0; - - pthread_mutex_lock(&LOCK_slave); - if (!server_id) - { - pthread_cond_broadcast(&COND_slave_start); - pthread_mutex_unlock(&LOCK_slave); - sql_print_error("Server id not set, will not start slave"); - pthread_exit((void*)1); - } - - if (slave_running) - { - pthread_cond_broadcast(&COND_slave_start); - pthread_mutex_unlock(&LOCK_slave); - pthread_exit((void*)1); // safety just in case - } - slave_running = 1; - abort_slave = 0; + ulonglong last_failed_pos = 0; // TODO: see if last_failed_pos is needed + DBUG_ASSERT(mi->inited); + + pthread_mutex_lock(&mi->run_lock); #ifndef DBUG_OFF - events_till_abort = abort_slave_event_count; + mi->events_till_abort = abort_slave_event_count; #endif - pthread_cond_broadcast(&COND_slave_start); - pthread_mutex_unlock(&LOCK_slave); - + // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff my_thread_init(); - slave_thd = thd = new THD; // note that contructor of THD uses DBUG_ ! - DBUG_ENTER("handle_slave"); + thd = new THD; // note that contructor of THD uses DBUG_ ! + DBUG_ENTER("handle_slave_io"); pthread_detach_this_thread(); - if (init_slave_thread(thd) || init_master_info(&glob_mi)) - { - sql_print_error("Failed during slave thread initialization"); - goto err; - } + if (init_slave_thread(thd, SLAVE_THD_IO)) + { + pthread_cond_broadcast(&mi->start_cond); + pthread_mutex_unlock(&mi->run_lock); + sql_print_error("Failed during slave I/O thread initialization"); + goto err; + } + mi->io_thd = thd; thd->thread_stack = (char*)&thd; // remember where our stack is - thd->temporary_tables = save_temporary_tables; // restore temp tables threads.append(thd); - glob_mi.pending = 0; //this should always be set to 0 when the slave thread - // is started - + mi->slave_running = 1; + mi->abort_slave = 0; + pthread_cond_broadcast(&mi->start_cond); + pthread_mutex_unlock(&mi->run_lock); + DBUG_PRINT("info",("master info: log_file_name=%s, position=%s", - glob_mi.log_file_name, llstr(glob_mi.pos,llbuff))); - - + mi->master_log_name, llstr(mi->master_log_pos,llbuff))); + if (!(mysql = mc_mysql_init(NULL))) { - sql_print_error("Slave thread: error in mc_mysql_init()"); + sql_print_error("Slave I/O thread: error in mc_mysql_init()"); goto err; } - + thd->proc_info = "connecting to master"; #ifndef DBUG_OFF - sql_print_error("Slave thread initialized"); + sql_print_error("Slave I/O thread initialized"); #endif // we can get killed during safe_connect - if (!safe_connect(thd, mysql, &glob_mi)) - sql_print_error("Slave: connected to master '%s@%s:%d',\ - replication started in log '%s' at position %s", glob_mi.user, - glob_mi.host, glob_mi.port, - RPL_LOG_NAME, - llstr(glob_mi.pos,llbuff)); + if (!safe_connect(thd, mysql, mi)) + sql_print_error("Slave I/O thread: connected to master '%s@%s:%d',\ + replication started in log '%s' at position %s", mi->user, + mi->host, mi->port, + IO_RPL_LOG_NAME, + llstr(mi->master_log_pos,llbuff)); else { - sql_print_error("Slave thread killed while connecting to master"); + sql_print_error("Slave I/O thread killed while connecting to master"); goto err; } connected: thd->slave_net = &mysql->net; - // register ourselves with the master - // if fails, this is not fatal - we just print the error message and go - // on with life thd->proc_info = "Checking master version"; - if (check_master_version(mysql, &glob_mi)) + if (check_master_version(mysql, mi)) { goto err; } - if (!glob_mi.old_format) + if (!mi->old_format) { + // register ourselves with the master + // if fails, this is not fatal - we just print the error message and go + // on with life thd->proc_info = "Registering slave on master"; if (register_slave_on_master(mysql) || update_slave_list(mysql)) goto err; } - - while (!slave_killed(thd)) + + while (!slave_killed(thd,mi)) { - thd->proc_info = "Requesting binlog dump"; - if (request_dump(mysql, &glob_mi)) - { - sql_print_error("Failed on request_dump()"); - if (slave_killed(thd)) - { - sql_print_error("Slave thread killed while requesting master \ + thd->proc_info = "Requesting binlog dump"; + if (request_dump(mysql, mi)) + { + sql_print_error("Failed on request_dump()"); + if(slave_killed(thd,mi)) + { + sql_print_error("Slave I/O thread killed while requesting master \ dump"); - goto err; - } - - thd->proc_info = "Waiiting to reconnect after a failed dump request"; - if (mysql->net.vio) - vio_close(mysql->net.vio); - // first time retry immediately, assuming that we can recover - // right away - if first time fails, sleep between re-tries - // hopefuly the admin can fix the problem sometime - if (retried_once) - safe_sleep(thd, glob_mi.connect_retry); - else - retried_once = 1; - - if (slave_killed(thd)) - { - sql_print_error("Slave thread killed while retrying master \ + goto err; + } + + thd->proc_info = "Waiiting to reconnect after a failed dump request"; + mc_end_server(mysql); + // first time retry immediately, assuming that we can recover + // right away - if first time fails, sleep between re-tries + // hopefuly the admin can fix the problem sometime + if (retried_once) + safe_sleep(thd, mi, mi->connect_retry); + else + retried_once = 1; + + if (slave_killed(thd,mi)) + { + sql_print_error("Slave I/O thread killed while retrying master \ dump"); - goto err; - } - - thd->proc_info = "Reconnecting after a failed dump request"; - last_failed_pos=glob_mi.pos; - sql_print_error("Slave: failed dump request, reconnecting to \ -try again, log '%s' at postion %s", RPL_LOG_NAME, - llstr(last_failed_pos,llbuff)); - if (safe_reconnect(thd, mysql, &glob_mi) || slave_killed(thd)) - { - sql_print_error("Slave thread killed during or after reconnect"); - goto err; - } - - goto connected; - } - - while (!slave_killed(thd)) - { - thd->proc_info = "Reading master update"; - ulong event_len = read_event(mysql, &glob_mi); - if (slave_killed(thd)) - { - sql_print_error("Slave thread killed while reading event"); - goto err; - } - + goto err; + } + + thd->proc_info = "Reconnecting after a failed dump request"; + sql_print_error("Slave I/O thread: failed dump request, \ +reconnecting to try again, log '%s' at postion %s", IO_RPL_LOG_NAME, + llstr(mi->master_log_pos,llbuff)); + if (safe_reconnect(thd, mysql, mi) || slave_killed(thd,mi)) + { + sql_print_error("Slave I/O thread killed during or \ +after reconnect"); + goto err; + } + + goto connected; + } - if (event_len == packet_error) - { - if (mc_mysql_errno(mysql) == ER_NET_PACKET_TOO_LARGE) + while (!slave_killed(thd,mi)) { - sql_print_error("Log entry on master is longer than \ + thd->proc_info = "Reading master update"; + ulong event_len = read_event(mysql, mi); + if (slave_killed(thd,mi)) + { + sql_print_error("Slave I/O thread killed while reading event"); + goto err; + } + + if (event_len == packet_error) + { + if (mc_mysql_errno(mysql) == ER_NET_PACKET_TOO_LARGE) + { + sql_print_error("Log entry on master is longer than \ max_allowed_packet on slave. Slave thread will be aborted. If the entry is \ really supposed to be that long, restart the server with a higher value of \ max_allowed_packet. The current value is %ld", max_allowed_packet); - goto err; - } - - thd->proc_info = "Waiting to reconnect after a failed read"; - if (mysql->net.vio) - vio_close(mysql->net.vio); - if (retried_once) // punish repeat offender with sleep - safe_sleep(thd, glob_mi.connect_retry); - else - retried_once = 1; - - if (slave_killed(thd)) - { - sql_print_error("Slave thread killed while waiting to \ + goto err; + } + + thd->proc_info = "Waiting to reconnect after a failed read"; + mc_end_server(mysql); + if (retried_once) // punish repeat offender with sleep + safe_sleep(thd,mi,mi->connect_retry); + else + retried_once = 1; + + if (slave_killed(thd,mi)) + { + sql_print_error("Slave I/O thread killed while waiting to \ reconnect after a failed read"); - goto err; - } - thd->proc_info = "Reconnecting after a failed read"; - last_failed_pos= glob_mi.pos; - sql_print_error("Slave: Failed reading log event, \ -reconnecting to retry, log '%s' position %s", RPL_LOG_NAME, - llstr(last_failed_pos, llbuff)); - if (safe_reconnect(thd, mysql, &glob_mi) || slave_killed(thd)) - { - sql_print_error("Slave thread killed during or after a \ + goto err; + } + thd->proc_info = "Reconnecting after a failed read"; + sql_print_error("Slave I/O thread: Failed reading log event, \ +reconnecting to retry, log '%s' position %s", IO_RPL_LOG_NAME, + llstr(mi->master_log_pos, llbuff)); + if (safe_reconnect(thd, mysql, mi) || slave_killed(thd,mi)) + { + sql_print_error("Slave I/O thread killed during or after a \ reconnect done to recover from failed read"); - goto err; - } - - goto connected; - } // if (event_len == packet_error) - - thd->proc_info = "Processing master log event"; - if (exec_event(thd, &mysql->net, &glob_mi, event_len)) - { - sql_print_error("\ -Error running query, slave aborted. Fix the problem, and re-start \ -the slave thread with \"mysqladmin start-slave\". We stopped at log \ -'%s' position %s", - RPL_LOG_NAME, llstr(glob_mi.pos, llbuff)); - goto err; - // there was an error running the query - // abort the slave thread, when the problem is fixed, the user - // should restart the slave with mysqladmin start-slave - } + goto err; + } + goto connected; + } // if(event_len == packet_error) + + thd->proc_info = "Queueing event from master"; + if (queue_event(mi,(const char*)mysql->net.read_pos + 1, + (uint)event_len)) + { + sql_print_error("Slave I/O thread could not queue event \ +from master"); + goto err; + } + // TODO: check debugging abort code #ifndef DBUG_OFF - if (abort_slave_event_count && !--events_till_abort) - { - sql_print_error("Slave: debugging abort"); - goto err; - } + if (abort_slave_event_count && !--events_till_abort) + { + sql_print_error("Slave I/O thread: debugging abort"); + goto err; + } #endif + } // while(!slave_killed(thd,mi)) - read/exec loop + } // while(!slave_killed(thd,mi)) - slave loop - // successful exec with offset advance, - // the slave repents and his sins are forgiven! - if (glob_mi.pos > last_failed_pos) - { - retried_once = 0; + // error = 0; + err: + // print the current replication position + sql_print_error("Slave I/O thread exiting, read up to log '%s', position %s", + IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff)); + thd->query = thd->db = 0; // extra safety + if(mysql) + mc_mysql_close(mysql); + thd->proc_info = "Waiting for slave mutex on exit"; + pthread_mutex_lock(&mi->run_lock); + mi->slave_running = 0; + mi->io_thd = 0; + // TODO: make rpl_status part of MASTER_INFO + change_rpl_status(RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE); + mi->abort_slave = 0; // TODO: check if this is needed + DBUG_ASSERT(thd->net.buff != 0); + net_end(&thd->net); // destructor will not free it, because we are weird + pthread_mutex_lock(&LOCK_thread_count); + delete thd; + pthread_mutex_unlock(&LOCK_thread_count); + pthread_cond_broadcast(&mi->stop_cond); // tell the world we are done + pthread_mutex_unlock(&mi->run_lock); + my_thread_end(); #ifndef DBUG_OFF - stuck_count = 0; -#endif - } + if(abort_slave_event_count && !events_till_abort) + goto slave_begin; +#endif + pthread_exit(0); + DBUG_RETURN(0); // Can't return anything here +} + +/* slave SQL logic thread */ + +pthread_handler_decl(handle_slave_sql,arg) +{ #ifndef DBUG_OFF - else - { - // show a little mercy, allow slave to read one more event - // before cutting him off - otherwise he gets stuck - // on Intvar events, since they do not advance the offset - // immediately - if (++stuck_count > 2) - events_till_disconnect++; - } -#endif - } // while (!slave_killed(thd)) - read/exec loop - } // while (!slave_killed(thd)) - slave loop + slave_begin: +#endif + THD *thd; /* needs to be first for thread_stack */ + MYSQL *mysql = NULL ; + bool retried_once = 0; + ulonglong last_failed_pos = 0; // TODO: see if this can be removed + char llbuff[22],llbuff1[22]; + RELAY_LOG_INFO* rli = &((MASTER_INFO*)arg)->rli; + const char* errmsg=0; + DBUG_ASSERT(rli->inited); + pthread_mutex_lock(&rli->run_lock); + DBUG_ASSERT(!rli->slave_running); +#ifndef DBUG_OFF + rli->events_till_abort = abort_slave_event_count; +#endif + + + // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff + my_thread_init(); + thd = new THD; // note that contructor of THD uses DBUG_ ! + DBUG_ENTER("handle_slave_sql"); + + pthread_detach_this_thread(); + if (init_slave_thread(thd, SLAVE_THD_SQL)) + { + // TODO: this is currently broken - slave start and change master + // will be stuck if we fail here + pthread_cond_broadcast(&rli->start_cond); + pthread_mutex_unlock(&rli->run_lock); + sql_print_error("Failed during slave thread initialization"); + goto err; + } + thd->thread_stack = (char*)&thd; // remember where our stack is + thd->temporary_tables = rli->save_temporary_tables; // restore temp tables + threads.append(thd); + rli->sql_thd = thd; + rli->slave_running = 1; + rli->abort_slave = 0; + pthread_cond_broadcast(&rli->start_cond); + pthread_mutex_unlock(&rli->run_lock); + rli->pending = 0; //this should always be set to 0 when the slave thread + // is started + if (init_relay_log_pos(rli,0,0,1/*need data lock*/,&errmsg)) + { + sql_print_error("Error initializing relay log position: %s", + errmsg); + goto err; + } + DBUG_ASSERT(rli->relay_log_pos >= 4); + DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->relay_log_pos); + + DBUG_PRINT("info",("master info: log_file_name=%s, position=%s", + rli->master_log_name, llstr(rli->master_log_pos,llbuff))); + DBUG_ASSERT(rli->sql_thd == thd); + sql_print_error("Slave SQL thread initialized, starting replication in \ +log '%s' at position %s,relay log: name='%s',pos='%s'", RPL_LOG_NAME, + llstr(rli->master_log_pos,llbuff),rli->relay_log_name, + llstr(rli->relay_log_pos,llbuff1)); + while (!slave_killed(thd,rli)) + { + thd->proc_info = "Processing master log event"; + DBUG_ASSERT(rli->sql_thd == thd); + if (exec_relay_log_event(thd,rli)) + { + // do not scare the user if SQL thread was simply killed or stopped + if (!slave_killed(thd,rli)) + sql_print_error("\ +Error running query, slave SQL thread aborted. Fix the problem, and restart \ +the slave SQL thread with \"mysqladmin start-slave\". We stopped at log \ +'%s' position %s", + RPL_LOG_NAME, llstr(rli->master_log_pos, llbuff)); + goto err; + } + } // while(!slave_killed(thd,rli)) - read/exec loop // error = 0; -err: + err: // print the current replication position - sql_print_error("Slave thread exiting, replication stopped in log '%s' at \ -position %s", - RPL_LOG_NAME, llstr(glob_mi.pos,llbuff)); + sql_print_error("Slave SQL thread exiting, replication stopped in log \ + '%s' at position %s", + RPL_LOG_NAME, llstr(rli->master_log_pos,llbuff)); thd->query = thd->db = 0; // extra safety - if (mysql) - mc_mysql_close(mysql); thd->proc_info = "Waiting for slave mutex on exit"; - pthread_mutex_lock(&LOCK_slave); - slave_running = 0; - change_rpl_status(RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE); - abort_slave = 0; - save_temporary_tables = thd->temporary_tables; + pthread_mutex_lock(&rli->run_lock); + DBUG_ASSERT(rli->slave_running == 1); // tracking buffer overrun + rli->slave_running = 0; + rli->save_temporary_tables = thd->temporary_tables; + //TODO: see if we can do this conditionally in next_event() instead + // to avoid unneeded position re-init + rli->log_pos_current=0; thd->temporary_tables = 0; // remove tempation from destructor to close them - pthread_cond_broadcast(&COND_slave_stopped); // tell the world we are done - pthread_mutex_unlock(&LOCK_slave); + DBUG_ASSERT(thd->net.buff != 0); net_end(&thd->net); // destructor will not free it, because we are weird - slave_thd = 0; + DBUG_ASSERT(rli->sql_thd == thd); + rli->sql_thd = 0; + pthread_mutex_lock(&LOCK_thread_count); delete thd; + pthread_mutex_unlock(&LOCK_thread_count); + pthread_cond_broadcast(&rli->stop_cond); + // tell the world we are done + pthread_mutex_unlock(&rli->run_lock); my_thread_end(); -#ifndef DBUG_OFF - if (abort_slave_event_count && !events_till_abort) +#ifndef DBUG_OFF // TODO: reconsider the code below + if (abort_slave_event_count && !rli->events_till_abort) goto slave_begin; #endif pthread_exit(0); DBUG_RETURN(0); // Can't return anything here } +int queue_event(MASTER_INFO* mi,const char* buf,uint event_len) +{ + int error; + bool inc_pos = 1; + if (mi->old_format) + return 1; // TODO: deal with old format + + switch (buf[EVENT_TYPE_OFFSET]) + { + case ROTATE_EVENT: + { + Rotate_log_event rev(buf,event_len,0); + if (!rev.is_valid()) + return 1; + DBUG_ASSERT(rev.ident_len<sizeof(mi->master_log_name)); + memcpy(mi->master_log_name,rev.new_log_ident, + rev.ident_len); + mi->master_log_name[rev.ident_len] = 0; + mi->master_log_pos = rev.pos; + inc_pos = 0; +#ifndef DBUG_OFF + /* if we do not do this, we will be getting the first + rotate event forever, so + we need to not disconnect after one + */ + if (disconnect_slave_event_count) + events_till_disconnect++; +#endif + break; + } + default: + break; + } + + if (!(error = mi->rli.relay_log.appendv(buf,event_len,0))) + { + if (inc_pos) + mi->master_log_pos += event_len; + } + return error; +} -/* try to connect until successful or slave killed */ +void end_relay_log_info(RELAY_LOG_INFO* rli) +{ + if (!rli->inited) + return; + if (rli->info_fd >= 0) + { + end_io_cache(&rli->info_file); + (void)my_close(rli->info_fd, MYF(MY_WME)); + rli->info_fd = -1; + } + if (rli->cur_log_fd >= 0) + { + end_io_cache(&rli->cache_buf); + (void)my_close(rli->cur_log_fd, MYF(MY_WME)); + rli->cur_log_fd = -1; + } + rli->inited = 0; + rli->log_pos_current=0; + rli->relay_log.close(1); +} +/* try to connect until successful or slave killed */ static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi) { return connect_to_master(thd, mysql, mi, 0); @@ -1366,7 +1958,6 @@ static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi) Try to connect until successful or slave killed or we have retried master_retry_count times */ - static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi, bool reconnect) { @@ -1375,28 +1966,24 @@ static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi, ulong err_count=0; char llbuff[22]; - /* - If we lost connection after reading a state set event - we will be re-reading it, so pending needs to be cleared - */ - mi->pending = 0; #ifndef DBUG_OFF events_till_disconnect = disconnect_slave_event_count; #endif - while (!(slave_was_killed = slave_killed(thd)) && - (reconnect ? mc_mysql_reconnect(mysql) != 0 : + while (!(slave_was_killed = slave_killed(thd,mi)) && + (reconnect ? mc_mysql_reconnect(mysql) : !mc_mysql_connect(mysql, mi->host, mi->user, mi->password, 0, mi->port, 0, 0))) { /* Don't repeat last error */ if (mc_mysql_errno(mysql) != last_errno) { - sql_print_error("Slave thread: error connecting to master: \ -%s, last_errno=%d, retry in %d sec", + sql_print_error("Slave I/O thread: error connecting to master \ +'%s@%s:%d': \ +%s, last_errno=%d, retry in %d sec",mi->user,mi->host,mi->port, mc_mysql_error(mysql), last_errno=mc_mysql_errno(mysql), mi->connect_retry); } - safe_sleep(thd, mi->connect_retry); + safe_sleep(thd,mi,mi->connect_retry); /* by default we try forever. The reason is that failure will trigger master election, so if the user did not set master_retry_count we do not want to have electioin triggered on the first failure to @@ -1415,10 +2002,10 @@ static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi, { if (reconnect) sql_print_error("Slave: connected to master '%s@%s:%d',\ -replication resumed in log '%s' at position %s", glob_mi.user, - glob_mi.host, glob_mi.port, - RPL_LOG_NAME, - llstr(glob_mi.pos,llbuff)); +replication resumed in log '%s' at position %s", mi->user, + mi->host, mi->port, + IO_RPL_LOG_NAME, + llstr(mi->master_log_pos,llbuff)); else { change_rpl_status(RPL_IDLE_SLAVE,RPL_ACTIVE_SLAVE); @@ -1443,6 +2030,175 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi) return connect_to_master(thd, mysql, mi, 1); } +int flush_relay_log_info(RELAY_LOG_INFO* rli) +{ + IO_CACHE* file = &rli->info_file; + char lbuf[22],lbuf1[22]; + + my_b_seek(file, 0L); + my_b_printf(file, "%s\n%s\n%s\n%s\n", + rli->relay_log_name, llstr(rli->relay_log_pos, lbuf), + rli->master_log_name, llstr(rli->master_log_pos, lbuf1) + ); + flush_io_cache(file); + flush_io_cache(rli->cur_log); + return 0; +} + +IO_CACHE* reopen_relay_log(RELAY_LOG_INFO* rli, const char** errmsg) +{ + DBUG_ASSERT(rli->cur_log != &rli->cache_buf); + IO_CACHE* cur_log = rli->cur_log=&rli->cache_buf; + DBUG_ASSERT(rli->cur_log_fd == -1); + if ((rli->cur_log_fd=open_binlog(cur_log,rli->relay_log_name, + errmsg))<0) + return 0; + my_b_seek(cur_log,rli->relay_log_pos); + return cur_log; +} + +Log_event* next_event(RELAY_LOG_INFO* rli) +{ + Log_event* ev; + IO_CACHE* cur_log = rli->cur_log; + pthread_mutex_t *log_lock = rli->relay_log.get_log_lock(); + const char* errmsg=0; + THD* thd = rli->sql_thd; + bool was_killed; + DBUG_ASSERT(thd != 0); + + // For most operations we need to protect rli members with data_lock, + // so we will hold it for the most of the loop below + // However, we will release it whenever it is worth the hassle, + // and in the cases when we go into a pthread_cond_wait() with the + // non-data_lock mutex + pthread_mutex_lock(&rli->data_lock); + + for (;!(was_killed=slave_killed(thd,rli));) + { + // we can have two kinds of log reading: + // hot_log - rli->cur_log points at the IO_CACHE of relay_log, which + // is actively being updated by the I/O thread. We need to be careful + // in this case and make sure that we are not looking at a stale log that + // has already been rotated. If it has been, we reopen the log + // the other case is much simpler - we just have a read only log that + // nobody else will be updating. + bool hot_log; + if ((hot_log = (cur_log != &rli->cache_buf))) + { + DBUG_ASSERT(rli->cur_log_fd == -1); // foreign descriptor + pthread_mutex_lock(log_lock); + // reading cur_log->init_count here is safe because the log will only + // be rotated when we hold relay_log.LOCK_log + if (cur_log->init_count != rli->cur_log_init_count) + { + if (!(cur_log=reopen_relay_log(rli,&errmsg))) + { + pthread_mutex_unlock(log_lock); + goto err; + } + pthread_mutex_unlock(log_lock); + hot_log=0; + } + } + DBUG_ASSERT(my_b_tell(cur_log) >= 4); + DBUG_ASSERT(my_b_tell(cur_log) == rli->relay_log_pos + rli->pending); + if ((ev=Log_event::read_log_event(cur_log,0,rli->mi->old_format))) + { + DBUG_ASSERT(thd==rli->sql_thd); + if (hot_log) + pthread_mutex_unlock(log_lock); + pthread_mutex_unlock(&rli->data_lock); + return ev; + } + DBUG_ASSERT(thd==rli->sql_thd); + if (!cur_log->error) /* EOF */ + { + // on a hot log, EOF means that there are no more updates to + // process and we must block until I/O thread adds some and + // signals us to continue + if (hot_log) + { + DBUG_ASSERT(cur_log->init_count == rli->cur_log_init_count); + //we can, and should release data_lock while we are waiting for + // update. If we do not, show slave status will block + pthread_mutex_unlock(&rli->data_lock); + + // IMPORTANT: note that wait_for_update will unlock LOCK_log, but + // expects the caller to lock it + rli->relay_log.wait_for_update(rli->sql_thd); + + // re-acquire data lock since we released it earlier + pthread_mutex_lock(&rli->data_lock); + continue; + } + // if the log was not hot, we need to move to the next log in + // sequence. The next log could be hot or cold, we deal with both + // cases separately after doing some common initialization + else + { + end_io_cache(cur_log); + DBUG_ASSERT(rli->cur_log_fd >= 0); + my_close(rli->cur_log_fd, MYF(MY_WME)); + rli->cur_log_fd = -1; + int error; + + // purge_first_log will properly set up relay log coordinates in rli + if (rli->relay_log.purge_first_log(rli)) + { + errmsg = "Error purging processed log"; + goto err; + } + + // next log is hot + if (rli->relay_log.is_active(rli->linfo.log_file_name)) + { +#ifdef EXTRA_DEBUG + sql_print_error("next log '%s' is currently active", + rli->linfo.log_file_name); +#endif + rli->cur_log = cur_log = rli->relay_log.get_log_file(); + rli->cur_log_init_count = cur_log->init_count; + DBUG_ASSERT(rli->cur_log_fd == -1); + + // read pointer has to be at the start since we are the only + // reader + if (check_binlog_magic(cur_log,&errmsg)) + goto err; + continue; + } + // if we get here, the log was not hot, so we will have to + // open it ourselves +#ifdef EXTRA_DEBUG + sql_print_error("next log '%s' is not active", + rli->linfo.log_file_name); +#endif + // open_binlog() will check the magic header + if ((rli->cur_log_fd=open_binlog(cur_log,rli->linfo.log_file_name, + &errmsg))<0) + goto err; + } + } + else // read failed with a non-EOF error + { + // TODO: come up with something better to handle this error + sql_print_error("Slave SQL thread: I/O error reading \ +event(errno=%d,cur_log->error=%d)", + my_errno,cur_log->error); + // no need to hog the mutex while we sleep + pthread_mutex_unlock(&rli->data_lock); + safe_sleep(rli->sql_thd,rli->mi,1); + pthread_mutex_lock(&rli->data_lock); + } + } + if (!errmsg && was_killed) + errmsg = "slave SQL thread was killed"; +err: + pthread_mutex_unlock(&rli->data_lock); + sql_print_error("Error reading relay log event: %s", errmsg); + return 0; +} + #ifdef __GNUC__ template class I_List_iterator<i_string>; |