diff options
author | unknown <sasha@mysql.sashanet.com> | 2002-01-19 19:16:52 -0700 |
---|---|---|
committer | unknown <sasha@mysql.sashanet.com> | 2002-01-19 19:16:52 -0700 |
commit | 5df61c3cdc4499197e420a76b25b942dce0f3ccc (patch) | |
tree | 87da2fd65f79c28f4b97c4619f95b07797107d82 /sql/slave.cc | |
parent | 0831ce1c616296196eff82065da469156b4def82 (diff) | |
download | mariadb-git-5df61c3cdc4499197e420a76b25b942dce0f3ccc.tar.gz |
Here comes a nasty patch, although I am not ready to push it yet. I will
first pull, merge, test, and get it to work.
The main change is the new replication code - now we have two slave threads:
the SQL thread and the I/O thread. I have also re-written a lot of the code to
prepare for multi-master implementation.
I also documented IO_CACHE quite extensively and, to some extent, the THD class.
Makefile.am:
moved tags target script into a separate file
include/my_sys.h:
fixes in IO_CACHE for SEQ_READ_APPEND + some documentation
libmysqld/lib_sql.cc:
updated replication locks, but now I see I did it wrong and it won't compile. Will fix
before the push.
mysql-test/r/rpl000014.result:
test result update
mysql-test/r/rpl000015.result:
test result update
mysql-test/r/rpl000016.result:
test result update
mysql-test/r/rpl_log.result:
test result update
mysql-test/t/rpl000016-slave.sh:
remove relay logs
mysql-test/t/rpl000017-slave.sh:
remove relay logs
mysql-test/t/rpl_log.test:
updated test
mysys/mf_iocache.c:
IO_CACHE updates to make replication work
mysys/mf_iocache2.c:
IO_CACHE update to make replication work
mysys/thr_mutex.c:
cosmetic change
sql/item_func.cc:
new replication code
sql/lex.h:
new replication
sql/log.cc:
new replication
sql/log_event.cc:
new replication
sql/log_event.h:
new replication
sql/mini_client.cc:
new replication
sql/mini_client.h:
new replication
sql/mysql_priv.h:
new replication
sql/mysqld.cc:
new replication
sql/repl_failsafe.cc:
new replication
sql/slave.cc:
new replication
sql/slave.h:
new replication
sql/sql_class.cc:
new replication
sql/sql_class.h:
new replication
sql/sql_lex.h:
new replication
sql/sql_parse.cc:
new replication
sql/sql_repl.cc:
new replication
sql/sql_repl.h:
new replication
sql/sql_show.cc:
new replication
sql/sql_yacc.yy:
new replication
sql/stacktrace.c:
more robust stack tracing
sql/structs.h:
new replication code
BitKeeper/etc/ignore:
Added mysql-test/r/rpl000002.eval mysql-test/r/rpl000014.eval mysql-test/r/rpl000015.eval mysql-test/r/rpl000016.eval mysql-test/r/slave-running.eval mysql-test/r/slave-stopped.eval to the ignore list
Diffstat (limited to 'sql/slave.cc')
-rw-r--r-- | sql/slave.cc | 1416 |
1 files changed, 1105 insertions, 311 deletions
diff --git a/sql/slave.cc b/sql/slave.cc index 700838d7cd7..e68741e7434 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -24,24 +24,23 @@ #include "repl_failsafe.h" #include <thr_alarm.h> #include <my_dir.h> +#include <assert.h> -volatile bool slave_running = 0; +volatile bool slave_sql_running = 0, slave_io_running = 0; char* slave_load_tmpdir = 0; -pthread_t slave_real_id; -MASTER_INFO glob_mi; +MASTER_INFO main_mi; +MASTER_INFO* active_mi; +volatile int active_mi_in_use = 0; HASH replicate_do_table, replicate_ignore_table; DYNAMIC_ARRAY replicate_wild_do_table, replicate_wild_ignore_table; bool do_table_inited = 0, ignore_table_inited = 0; bool wild_do_table_inited = 0, wild_ignore_table_inited = 0; bool table_rules_on = 0; -uint32 slave_skip_counter = 0; static TABLE* save_temporary_tables = 0; -THD* slave_thd = 0; // when slave thread exits, we need to remember the temporary tables so we // can re-use them on slave start -int last_slave_errno = 0; -char last_slave_error[MAX_SLAVE_ERRMSG] = ""; +// TODO: move the vars below under MASTER_INFO #ifndef DBUG_OFF int disconnect_slave_event_count = 0, abort_slave_event_count = 0; static int events_till_disconnect = -1; @@ -49,15 +48,17 @@ int events_till_abort = -1; static int stuck_count = 0; #endif +typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE; void skip_load_data_infile(NET* net); -inline bool slave_killed(THD* thd); -static int init_slave_thread(THD* thd); +static inline bool slave_killed(THD* thd,MASTER_INFO* mi); +static inline bool slave_killed(THD* thd,RELAY_LOG_INFO* rli); +static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type); static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi); static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi); static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi, bool reconnect); -static int safe_sleep(THD* thd, int sec); +static int safe_sleep(THD* thd, MASTER_INFO* mi, int sec); static int request_table_dump(MYSQL* 
mysql, const char* db, const char* table); static int create_table_from_dump(THD* thd, NET* net, const char* db, const char* table_name); @@ -65,6 +66,79 @@ static int check_master_version(MYSQL* mysql, MASTER_INFO* mi); char* rewrite_db(char* db); +void init_thread_mask(int* mask,MASTER_INFO* mi,bool inverse) +{ + bool set_io = mi->slave_running, set_sql = mi->rli.slave_running; + if (inverse) + { + /* This makes me think of the Russian idiom "I am not I, and this is + not my horse", which is used to deny reponsibility for + one's actions. + */ + set_io = !set_io; + set_sql = !set_sql; + } + register int tmp_mask=0; + if (set_io) + tmp_mask |= SLAVE_IO; + if (set_sql) + tmp_mask |= SLAVE_SQL; + *mask = tmp_mask; +} + +void lock_slave_threads(MASTER_INFO* mi) +{ + //TODO: see if we can do this without dual mutex + pthread_mutex_lock(&mi->run_lock); + pthread_mutex_lock(&mi->rli.run_lock); +} + +void unlock_slave_threads(MASTER_INFO* mi) +{ + //TODO: see if we can do this without dual mutex + pthread_mutex_unlock(&mi->rli.run_lock); + pthread_mutex_unlock(&mi->run_lock); +} + +int init_slave() +{ + // TODO (multi-master): replace this with list initialization + active_mi = &main_mi; + + // TODO: the code below is a copy-paste mess - clean it up + /* + make sure slave thread gets started if server_id is set, + valid master.info is present, and master_host has not been specified + */ + if (server_id && !master_host) + { + // TODO: re-write this to interate through the list of files + // for multi-master + char fname[FN_REFLEN+128]; + MY_STAT stat_area; + fn_format(fname, master_info_file, mysql_data_home, "", 4+16+32); + if (my_stat(fname, &stat_area, MYF(0)) && + !init_master_info(active_mi,master_info_file,relay_log_info_file)) + master_host = active_mi->host; + } + // slave thread + if (master_host) + { + if (!opt_skip_slave_start && start_slave_threads(1 /* need mutex */, + 0 /* no wait for start*/, + active_mi, + master_info_file, + relay_log_info_file, + 
SLAVE_IO|SLAVE_SQL + )) + sql_print_error("Warning: Can't create threads to handle slave"); + else if (opt_skip_slave_start) + if (init_master_info(active_mi, master_info_file, relay_log_info_file)) + sql_print_error("Warning: failed to initialized master info"); + } + return 0; +} + static void free_table_ent(TABLE_RULE_ENT* e) { my_free((gptr) e, MYF(0)); @@ -77,6 +151,285 @@ static byte* get_table_key(TABLE_RULE_ENT* e, uint* len, return (byte*)e->db; } +// TODO: check proper initialization of master_log_name/master_log_pos +int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log, + ulonglong pos, bool need_data_lock, + const char** errmsg) +{ + if (rli->log_pos_current) + return 0; + pthread_mutex_t *log_lock=rli->relay_log.get_log_lock(); + pthread_mutex_lock(log_lock); + if (need_data_lock) + pthread_mutex_lock(&rli->data_lock); + + if (rli->cur_log_fd >= 0) + { + end_io_cache(&rli->cache_buf); + my_close(rli->cur_log_fd, MYF(MY_WME)); + rli->cur_log_fd = -1; + } + + if (!log) + log = rli->relay_log_name; // already inited + if (!pos) + pos = rli->relay_log_pos; // already inited + else + rli->relay_log_pos = pos; + if (rli->relay_log.find_first_log(&rli->linfo,log)) + { + *errmsg="Could not find first log during relay log initialization"; + goto err; + } + strnmov(rli->relay_log_name,rli->linfo.log_file_name, + sizeof(rli->relay_log_name)); + // to make end_io_cache(&rli->cache_buf) safe in all cases + if (!rli->inited) + bzero((char*) &rli->cache_buf, sizeof(IO_CACHE)); + if (rli->relay_log.is_active(rli->linfo.log_file_name)) + { + if (my_b_tell((rli->cur_log=rli->relay_log.get_log_file())) == 0 && + check_binlog_magic(rli->cur_log,errmsg)) + { + goto err; + } + rli->cur_log_init_count=rli->cur_log->init_count; + } + else + { + if (rli->inited) + end_io_cache(&rli->cache_buf); + if (rli->cur_log_fd>=0) + my_close(rli->cur_log_fd,MYF(MY_WME)); + if ((rli->cur_log_fd=open_binlog(&rli->cache_buf, + rli->linfo.log_file_name,errmsg)) < 0) + { + goto err; + } 
+ rli->cur_log = &rli->cache_buf; + } + if (pos > 4) + my_b_seek(rli->cur_log,(off_t)pos); + rli->log_pos_current=1; +err: + pthread_cond_broadcast(&rli->data_cond); + if (need_data_lock) + pthread_mutex_unlock(&rli->data_lock); + pthread_mutex_unlock(log_lock); + return (*errmsg) ? 1 : 0; +} + +// we assume we have a run lock on rli and that the both slave thread +// are not running +int purge_relay_logs(RELAY_LOG_INFO* rli, bool just_reset, const char** errmsg) +{ + if (!rli->inited) + return 0; /* successfully do nothing */ + DBUG_ASSERT(rli->slave_running == 0); + DBUG_ASSERT(rli->mi->slave_running == 0); + int error=0; + rli->slave_skip_counter=0; + pthread_mutex_lock(&rli->data_lock); + rli->pending=0; + rli->master_log_name[0]=0; + rli->master_log_pos=0; // 0 means uninitialized + if (rli->relay_log.reset_logs(rli->sql_thd) || + rli->relay_log.find_first_log(&rli->linfo,"")) + { + *errmsg = "Failed during log reset"; + error=1; + goto err; + } + strnmov(rli->relay_log_name,rli->linfo.log_file_name, + sizeof(rli->relay_log_name)); + rli->relay_log_pos=4; + rli->log_pos_current=0; + if (!just_reset) + error = init_relay_log_pos(rli,0,0,0/*do not need data lock*/,errmsg); +err: + pthread_mutex_unlock(&rli->data_lock); + return error; +} + +int terminate_slave_threads(MASTER_INFO* mi,int thread_mask,bool skip_lock) +{ + if (!mi->inited) + return 0; /* successfully do nothing */ + int error,force_all = (thread_mask & SLAVE_FORCE_ALL); + pthread_mutex_t *sql_lock = &mi->rli.run_lock, *io_lock = &mi->run_lock; + pthread_mutex_t *sql_cond_lock,*io_cond_lock; + + sql_cond_lock=sql_lock; + io_cond_lock=io_lock; + + if (skip_lock) + { + sql_lock = io_lock = 0; + } + if ((thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL)) && mi->slave_running) + { + mi->abort_slave=1; + if ((error=terminate_slave_thread(mi->io_thd,io_lock, + io_cond_lock, + &mi->stop_cond, + &mi->slave_running)) && + !force_all) + return error; + } + if ((thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL)) && 
mi->rli.slave_running) + { + DBUG_ASSERT(mi->rli.sql_thd != 0) ; + mi->rli.abort_slave=1; + if ((error=terminate_slave_thread(mi->rli.sql_thd,sql_lock, + sql_cond_lock, + &mi->rli.stop_cond, + &mi->rli.slave_running)) && + !force_all) + return error; + } + return 0; +} + +int terminate_slave_thread(THD* thd, pthread_mutex_t* term_lock, + pthread_mutex_t *cond_lock, + pthread_cond_t* term_cond, + volatile bool* slave_running) +{ + if (term_lock) + { + pthread_mutex_lock(term_lock); + if (!*slave_running) + { + pthread_mutex_unlock(term_lock); + return ER_SLAVE_NOT_RUNNING; + } + } + DBUG_ASSERT(thd != 0); + KICK_SLAVE(thd); + while (*slave_running) + { + /* there is a small chance that slave thread might miss the first + alarm. To protect againts it, resend the signal until it reacts + */ + struct timespec abstime; +#ifdef HAVE_TIMESPEC_TS_SEC + abstime.ts_sec=time(NULL)+2; + abstime.ts_nsec=0; +#elif defined(__WIN__) + abstime.tv_sec=time((time_t*) 0)+2; + abstime.tv_nsec=0; +#else + struct timeval tv; + gettimeofday(&tv,0); + abstime.tv_sec=tv.tv_sec+2; + abstime.tv_nsec=tv.tv_usec*1000; +#endif + pthread_cond_timedwait(term_cond, cond_lock, &abstime); + if (*slave_running) + KICK_SLAVE(thd); + } + if (term_lock) + pthread_mutex_unlock(term_lock); + return 0; +} + +int start_slave_thread(pthread_handler h_func, pthread_mutex_t* start_lock, + pthread_mutex_t *cond_lock, + pthread_cond_t* start_cond, + volatile bool* slave_running, + MASTER_INFO* mi) +{ + pthread_t th; + DBUG_ASSERT(mi->inited); + if (start_lock) + pthread_mutex_lock(start_lock); + if (!server_id) + { + if (start_cond) + pthread_cond_broadcast(start_cond); + if (start_lock) + pthread_mutex_unlock(start_lock); + sql_print_error("Server id not set, will not start slave"); + return ER_BAD_SLAVE; + } + + if (*slave_running) + { + if (start_cond) + pthread_cond_broadcast(start_cond); + if (start_lock) + pthread_mutex_unlock(start_lock); + return ER_SLAVE_MUST_STOP; + } + if (pthread_create(&th, 
&connection_attrib, h_func, (void*)mi)) + { + if (start_lock) + pthread_mutex_unlock(start_lock); + return ER_SLAVE_THREAD; + } + if (start_cond && cond_lock) + { + THD* thd = current_thd; + while (!*slave_running) + { + const char* old_msg = thd->enter_cond(start_cond,cond_lock, + "Waiting for slave thread to start"); + pthread_cond_wait(start_cond,cond_lock); + thd->exit_cond(old_msg); + // TODO: in a very rare case of init_slave_thread failing, it is + // possible that we can get stuck here since slave_running will not + // be set. We need to change slave_running to int and have -1 as + // error code + if (thd->killed) + { + pthread_mutex_unlock(cond_lock); + return ER_SERVER_SHUTDOWN; + } + } + } + if (start_lock) + pthread_mutex_unlock(start_lock); + return 0; +} +/* SLAVE_FORCE_ALL is not implemented here on purpose since it does not make + sense to do that for starting a slave - we always care if it actually + started the threads that were not previously running +*/ +int start_slave_threads(bool need_slave_mutex, bool wait_for_start, + MASTER_INFO* mi, const char* master_info_fname, + const char* slave_info_fname, int thread_mask) +{ + pthread_mutex_t *lock_io=0,*lock_sql=0,*lock_cond_io=0,*lock_cond_sql=0; + pthread_cond_t* cond_io=0,*cond_sql=0; + int error=0; + + if (need_slave_mutex) + { + lock_io = &mi->run_lock; + lock_sql = &mi->rli.run_lock; + } + if (wait_for_start) + { + cond_io = &mi->start_cond; + cond_sql = &mi->rli.start_cond; + lock_cond_io = &mi->run_lock; + lock_cond_sql = &mi->rli.run_lock; + } + if (init_master_info(mi,master_info_fname,slave_info_fname)) + return ER_MASTER_INFO; + + if ((thread_mask & SLAVE_IO) && + (error=start_slave_thread(handle_slave_io,lock_io,lock_cond_io, + cond_io,&mi->slave_running, + mi))) + return error; + if ((thread_mask & SLAVE_SQL) && + (error=start_slave_thread(handle_slave_sql,lock_sql,lock_cond_sql, + cond_sql, + &mi->rli.slave_running,mi))) + return error; + return 0; +} void init_table_rule_hash(HASH* 
h, bool* h_inited) { @@ -98,11 +451,11 @@ static TABLE_RULE_ENT* find_wild(DYNAMIC_ARRAY *a, const char* key, int len) uint i; const char* key_end = key + len; - for(i = 0; i < a->elements; i++) + for (i = 0; i < a->elements; i++) { TABLE_RULE_ENT* e ; get_dynamic(a, (gptr)&e, i); - if(!wild_case_compare(key, key_end, (const char*)e->db, + if (!wild_case_compare(key, key_end, (const char*)e->db, (const char*)(e->db + e->key_len),'\\')) return e; } @@ -126,7 +479,7 @@ int tables_ok(THD* thd, TABLE_LIST* tables) if (hash_search(&replicate_do_table, (byte*) hash_key, len)) return 1; } - if (ignore_table_inited) // if there are any do's + if (ignore_table_inited) // if there are any ignores { if (hash_search(&replicate_ignore_table, (byte*) hash_key, len)) return 0; @@ -191,44 +544,52 @@ static void free_string_array(DYNAMIC_ARRAY *a) delete_dynamic(a); } +static int end_slave_on_walk(MASTER_INFO* mi, gptr /*unused*/) +{ + end_master_info(mi); + return 0; +} + void end_slave() { - pthread_mutex_lock(&LOCK_slave); - if (slave_running) - { - abort_slave = 1; - thr_alarm_kill(slave_real_id); -#ifdef SIGNAL_WITH_VIO_CLOSE - slave_thd->close_active_vio(); -#endif - while (slave_running) - pthread_cond_wait(&COND_slave_stopped, &LOCK_slave); - } - pthread_mutex_unlock(&LOCK_slave); - - end_master_info(&glob_mi); - if(do_table_inited) + // TODO: replace the line below with + // list_walk(&master_list, (list_walk_action)end_slave_on_walk,0); + // once multi-master code is ready + terminate_slave_threads(active_mi,SLAVE_FORCE_ALL); + end_master_info(active_mi); + if (do_table_inited) hash_free(&replicate_do_table); - if(ignore_table_inited) + if (ignore_table_inited) hash_free(&replicate_ignore_table); - if(wild_do_table_inited) + if (wild_do_table_inited) free_string_array(&replicate_wild_do_table); - if(wild_ignore_table_inited) + if (wild_ignore_table_inited) free_string_array(&replicate_wild_ignore_table); } -inline bool slave_killed(THD* thd) +static inline bool 
slave_killed(THD* thd, MASTER_INFO* mi) { - return abort_slave || abort_loop || thd->killed; + DBUG_ASSERT(mi->io_thd == thd); + DBUG_ASSERT(mi->slave_running == 1); // tracking buffer overrun + return mi->abort_slave || abort_loop || thd->killed; } -void slave_print_error(int err_code, const char* msg, ...) +static inline bool slave_killed(THD* thd, RELAY_LOG_INFO* rli) +{ + DBUG_ASSERT(rli->sql_thd == thd); + DBUG_ASSERT(rli->slave_running == 1);// tracking buffer overrun + return rli->abort_slave || abort_loop || thd->killed; +} + +void slave_print_error(RELAY_LOG_INFO* rli, int err_code, const char* msg, ...) { va_list args; va_start(args,msg); - my_vsnprintf(last_slave_error, sizeof(last_slave_error), msg, args); - sql_print_error("Slave: %s, error_code=%d", last_slave_error, err_code); - last_slave_errno = err_code; + my_vsnprintf(rli->last_slave_error, + sizeof(rli->last_slave_error), msg, args); + sql_print_error("Slave: %s, error_code=%d", rli->last_slave_error, + err_code); + rli->last_slave_errno = err_code; } void skip_load_data_infile(NET* net) @@ -476,16 +837,16 @@ err: return error; } -int fetch_nx_table(THD* thd, const char* db_name, const char* table_name, +int fetch_master_table(THD* thd, const char* db_name, const char* table_name, MASTER_INFO* mi, MYSQL* mysql) { int error = 1; - int nx_errno = 0; + int fetch_errno = 0; bool called_connected = (mysql != NULL); if (!called_connected && !(mysql = mc_mysql_init(NULL))) { - sql_print_error("fetch_nx_table: Error in mysql_init()"); - nx_errno = ER_GET_ERRNO; + sql_print_error("fetch_master_table: Error in mysql_init()"); + fetch_errno = ER_GET_ERRNO; goto err; } @@ -495,17 +856,17 @@ int fetch_nx_table(THD* thd, const char* db_name, const char* table_name, { sql_print_error("Could not connect to master while fetching table\ '%-64s.%-64s'", db_name, table_name); - nx_errno = ER_CONNECT_TO_MASTER; + fetch_errno = ER_CONNECT_TO_MASTER; goto err; } } - if (slave_killed(thd)) + if (thd->killed) goto err; 
if (request_table_dump(mysql, db_name, table_name)) { - nx_errno = ER_GET_ERRNO; - sql_print_error("fetch_nx_table: failed on table dump request "); + fetch_errno = ER_GET_ERRNO; + sql_print_error("fetch_master_table: failed on table dump request "); goto err; } @@ -513,24 +874,25 @@ int fetch_nx_table(THD* thd, const char* db_name, const char* table_name, table_name)) { // create_table_from_dump will have sent the error alread - sql_print_error("fetch_nx_table: failed on create table "); + sql_print_error("fetch_master_table: failed on create table "); goto err; } - error = 0; - err: if (mysql && !called_connected) mc_mysql_close(mysql); - if (nx_errno && thd->net.vio) - send_error(&thd->net, nx_errno, "Error in fetch_nx_table"); + if (fetch_errno && thd->net.vio) + send_error(&thd->net, fetch_errno, "Error in fetch_master_table"); thd->net.no_send_ok = 0; // Clear up garbage after create_table_from_dump return error; } void end_master_info(MASTER_INFO* mi) { - if(mi->fd >= 0) + if (!mi->inited) + return; + end_relay_log_info(&mi->rli); + if (mi->fd >= 0) { end_io_cache(&mi->file); (void)my_close(mi->fd, MYF(MY_WME)); @@ -539,21 +901,136 @@ void end_master_info(MASTER_INFO* mi) mi->inited = 0; } -int init_master_info(MASTER_INFO* mi) +int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname) +{ + if (rli->inited) + return 0; + MY_STAT stat_area; + char fname[FN_REFLEN+128]; + int info_fd; + const char* msg = 0; + int error = 0; + + fn_format(fname, info_fname, + mysql_data_home, "", 4+32); + pthread_mutex_lock(&rli->data_lock); + info_fd = rli->info_fd; + rli->pending = 0; + rli->cur_log_fd = -1; + rli->slave_skip_counter=0; + rli->log_pos_current=0; + // TODO: make this work with multi-master + if (!opt_relay_logname) + { + char tmp[FN_REFLEN]; + /* TODO: The following should be using fn_format(); We just need to + first change fn_format() to cut the file name if it's too long. 
+ */ + strmake(tmp,glob_hostname,FN_REFLEN-5); + strmov(strcend(tmp,'.'),"-relay-bin"); + opt_relay_logname=my_strdup(tmp,MYF(MY_WME)); + } + rli->relay_log.set_index_file_name(opt_relaylog_index_name); + open_log(&rli->relay_log, glob_hostname, opt_relay_logname, "-relay-bin", + LOG_BIN, 1 /* read_append cache */, + 1 /* no auto events*/); + + /* if file does not exist */ + if (!my_stat(fname, &stat_area, MYF(0))) + { + // if someone removed the file from underneath our feet, just close + // the old descriptor and re-create the old file + if (info_fd >= 0) + my_close(info_fd, MYF(MY_WME)); + if ((info_fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0 + || init_io_cache(&rli->info_file, info_fd, IO_SIZE*2, READ_CACHE, 0L,0, + MYF(MY_WME))) + { + if(info_fd >= 0) + my_close(info_fd, MYF(0)); + rli->info_fd=-1; + pthread_mutex_unlock(&rli->data_lock); + return 1; + } + if (init_relay_log_pos(rli,"",4,0/*no data mutex*/,&msg)) + goto err; + rli->master_log_pos = 0; // uninitialized + rli->info_fd = info_fd; + } + else // file exists + { + if(info_fd >= 0) + reinit_io_cache(&rli->info_file, READ_CACHE, 0L,0,0); + else if((info_fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0 + || init_io_cache(&rli->info_file, info_fd, + IO_SIZE*2, READ_CACHE, 0L, + 0, MYF(MY_WME))) + { + if (info_fd >= 0) + my_close(info_fd, MYF(0)); + rli->info_fd=-1; + pthread_mutex_unlock(&rli->data_lock); + return 1; + } + + rli->info_fd = info_fd; + if (init_strvar_from_file(rli->relay_log_name, + sizeof(rli->relay_log_name), &rli->info_file, + (char*)"") || + init_intvar_from_file((int*)&rli->relay_log_pos, + &rli->info_file, 4) || + init_strvar_from_file(rli->master_log_name, + sizeof(rli->master_log_name), &rli->info_file, + (char*)"") || + init_intvar_from_file((int*)&rli->master_log_pos, + &rli->info_file, 0)) + { + msg="Error reading slave log configuration"; + goto err; + } + if (init_relay_log_pos(rli,0 /*log already inited*/, + 0 /*pos already inited*/, + 0 /* no data 
lock*/, + &msg)) + goto err; + } + DBUG_ASSERT(rli->relay_log_pos >= 4); + DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->relay_log_pos); + rli->inited = 1; + // now change the cache from READ to WRITE - must do this + // before flush_relay_log_info + reinit_io_cache(&rli->info_file, WRITE_CACHE,0L,0,1); + error=test(flush_relay_log_info(rli)); + pthread_mutex_unlock(&rli->data_lock); + return error; + +err: + sql_print_error(msg); + end_io_cache(&rli->info_file); + my_close(info_fd, MYF(0)); + rli->info_fd=-1; + pthread_mutex_unlock(&rli->data_lock); + return 1; +} + +int init_master_info(MASTER_INFO* mi, const char* master_info_fname, + const char* slave_info_fname) { if (mi->inited) return 0; + if (init_relay_log_info(&mi->rli, slave_info_fname)) + return 1; + mi->rli.mi = mi; int fd,length,error; MY_STAT stat_area; char fname[FN_REFLEN+128]; const char *msg; - fn_format(fname, master_info_file, mysql_data_home, "", 4+32); + fn_format(fname, master_info_fname, mysql_data_home, "", 4+32); // we need a mutex while we are changing master info parameters to // keep other threads from reading bogus info - pthread_mutex_lock(&mi->lock); - mi->pending = 0; + pthread_mutex_lock(&mi->data_lock); fd = mi->fd; // we do not want any messages if the file does not exist @@ -569,11 +1046,13 @@ int init_master_info(MASTER_INFO* mi) { if(fd >= 0) my_close(fd, MYF(0)); - pthread_mutex_unlock(&mi->lock); + mi->fd=-1; + end_relay_log_info(&mi->rli); + pthread_mutex_unlock(&mi->data_lock); return 1; } - mi->log_file_name[0] = 0; - mi->pos = 4; // skip magic number + mi->master_log_name[0] = 0; + mi->master_log_pos = 4; // skip magic number mi->fd = fd; if (master_host) @@ -595,28 +1074,19 @@ int init_master_info(MASTER_INFO* mi) { if(fd >= 0) my_close(fd, MYF(0)); - pthread_mutex_unlock(&mi->lock); + mi->fd=-1; + end_relay_log_info(&mi->rli); + pthread_mutex_unlock(&mi->data_lock); return 1; } - - if ((length=my_b_gets(&mi->file, mi->log_file_name, - sizeof(mi->log_file_name))) < 1) - 
{ - msg="Error reading log file name from master info file "; - goto error; - } - - mi->log_file_name[length-1]= 0; // kill \n - /* Reuse fname buffer */ - if(!my_b_gets(&mi->file, fname, sizeof(fname))) - { - msg="Error reading log file position from master info file"; - goto error; - } - mi->pos = strtoull(fname,(char**) 0, 10); mi->fd = fd; - if(init_strvar_from_file(mi->host, sizeof(mi->host), &mi->file, + if (init_strvar_from_file(mi->master_log_name, + sizeof(mi->master_log_name), &mi->file, + (char*)"") || + init_intvar_from_file((int*)&mi->master_log_pos, &mi->file, 4) + || + init_strvar_from_file(mi->host, sizeof(mi->host), &mi->file, master_host) || init_strvar_from_file(mi->user, sizeof(mi->user), &mi->file, master_user) || @@ -624,12 +1094,11 @@ int init_master_info(MASTER_INFO* mi) master_password) || init_intvar_from_file((int*)&mi->port, &mi->file, master_port) || init_intvar_from_file((int*)&mi->connect_retry, &mi->file, - master_connect_retry) || - init_intvar_from_file((int*)&mi->last_log_seq, &mi->file, 0) + master_connect_retry) ) { msg="Error reading master configuration"; - goto error; + goto err; } } @@ -638,14 +1107,17 @@ int init_master_info(MASTER_INFO* mi) // before flush_master_info reinit_io_cache(&mi->file, WRITE_CACHE,0L,0,1); error=test(flush_master_info(mi)); - pthread_mutex_unlock(&mi->lock); + pthread_mutex_unlock(&mi->data_lock); return error; -error: +err: sql_print_error(msg); end_io_cache(&mi->file); + end_relay_log_info(&mi->rli); + DBUG_ASSERT(fd>=0); my_close(fd, MYF(0)); - pthread_mutex_unlock(&mi->lock); + mi->fd=-1; + pthread_mutex_unlock(&mi->data_lock); return 1; } @@ -654,14 +1126,14 @@ int register_slave_on_master(MYSQL* mysql) String packet; char buf[4]; - if(!report_host) + if (!report_host) return 0; int4store(buf, server_id); packet.append(buf, 4); net_store_data(&packet, report_host); - if(report_user) + if (report_user) net_store_data(&packet, report_user); else packet.append((char)0); @@ -678,7 +1150,7 @@ int 
register_slave_on_master(MYSQL* mysql) int4store(buf, 0); /* tell the master will fill in master_id */ packet.append(buf, 4); - if(mc_simple_command(mysql, COM_REGISTER_SLAVE, (char*)packet.ptr(), + if (mc_simple_command(mysql, COM_REGISTER_SLAVE, (char*)packet.ptr(), packet.length(), 0)) { sql_print_error("Error on COM_REGISTER_SLAVE: '%s'", @@ -689,52 +1161,61 @@ int register_slave_on_master(MYSQL* mysql) return 0; } - -int show_master_info(THD* thd) +int show_master_info(THD* thd, MASTER_INFO* mi) { + // TODO: fix this for multi-master DBUG_ENTER("show_master_info"); List<Item> field_list; field_list.push_back(new Item_empty_string("Master_Host", - sizeof(glob_mi.host))); + sizeof(mi->host))); field_list.push_back(new Item_empty_string("Master_User", - sizeof(glob_mi.user))); + sizeof(mi->user))); field_list.push_back(new Item_empty_string("Master_Port", 6)); field_list.push_back(new Item_empty_string("Connect_retry", 6)); - field_list.push_back( new Item_empty_string("Log_File", + field_list.push_back(new Item_empty_string("Master_Log_File", FN_REFLEN)); - field_list.push_back(new Item_empty_string("Pos", 12)); - field_list.push_back(new Item_empty_string("Slave_Running", 3)); + field_list.push_back(new Item_empty_string("Read_Master_Log_Pos", 12)); + field_list.push_back(new Item_empty_string("Relay_Log_File", + FN_REFLEN)); + field_list.push_back(new Item_empty_string("Relay_Log_Pos", 12)); + field_list.push_back(new Item_empty_string("Relay_Master_Log_File", + FN_REFLEN)); + field_list.push_back(new Item_empty_string("Slave_IO_Running", 3)); + field_list.push_back(new Item_empty_string("Slave_SQL_Running", 3)); field_list.push_back(new Item_empty_string("Replicate_do_db", 20)); field_list.push_back(new Item_empty_string("Replicate_ignore_db", 20)); field_list.push_back(new Item_empty_string("Last_errno", 4)); field_list.push_back(new Item_empty_string("Last_error", 20)); field_list.push_back(new Item_empty_string("Skip_counter", 12)); - 
field_list.push_back(new Item_empty_string("Last_log_seq", 12)); + field_list.push_back(new Item_empty_string("Exec_master_log_pos", 12)); if(send_fields(thd, field_list, 1)) DBUG_RETURN(-1); String* packet = &thd->packet; - uint32 last_log_seq; packet->length(0); - pthread_mutex_lock(&glob_mi.lock); - net_store_data(packet, glob_mi.host); - net_store_data(packet, glob_mi.user); - net_store_data(packet, (uint32) glob_mi.port); - net_store_data(packet, (uint32) glob_mi.connect_retry); - net_store_data(packet, glob_mi.log_file_name); - net_store_data(packet, (longlong) glob_mi.pos); - last_log_seq = glob_mi.last_log_seq; - pthread_mutex_unlock(&glob_mi.lock); - pthread_mutex_lock(&LOCK_slave); // QQ; This is not needed - net_store_data(packet, slave_running ? "Yes":"No"); - pthread_mutex_unlock(&LOCK_slave); // QQ; This is not needed + pthread_mutex_lock(&mi->data_lock); + pthread_mutex_lock(&mi->rli.data_lock); + net_store_data(packet, mi->host); + net_store_data(packet, mi->user); + net_store_data(packet, (uint32) mi->port); + net_store_data(packet, (uint32) mi->connect_retry); + net_store_data(packet, mi->master_log_name); + net_store_data(packet, (longlong) mi->master_log_pos); + net_store_data(packet, mi->rli.relay_log_name + + dirname_length(mi->rli.relay_log_name)); + net_store_data(packet, (longlong) mi->rli.relay_log_pos); + net_store_data(packet, mi->rli.master_log_name); + net_store_data(packet, mi->slave_running ? "Yes":"No"); + net_store_data(packet, mi->rli.slave_running ? 
"Yes":"No"); net_store_data(packet, &replicate_do_db); net_store_data(packet, &replicate_ignore_db); - net_store_data(packet, (uint32)last_slave_errno); - net_store_data(packet, last_slave_error); - net_store_data(packet, slave_skip_counter); - net_store_data(packet, last_log_seq); + net_store_data(packet, (uint32)mi->rli.last_slave_errno); + net_store_data(packet, mi->rli.last_slave_error); + net_store_data(packet, mi->rli.slave_skip_counter); + net_store_data(packet, (longlong)mi->rli.master_log_pos); + pthread_mutex_unlock(&mi->rli.data_lock); + pthread_mutex_unlock(&mi->data_lock); if (my_net_write(&thd->net, (char*)thd->packet.ptr(), packet->length())) DBUG_RETURN(-1); @@ -747,58 +1228,64 @@ int flush_master_info(MASTER_INFO* mi) { IO_CACHE* file = &mi->file; char lbuf[22]; - char lbuf1[22]; my_b_seek(file, 0L); my_b_printf(file, "%s\n%s\n%s\n%s\n%s\n%d\n%d\n%d\n", - mi->log_file_name, llstr(mi->pos, lbuf), mi->host, mi->user, - mi->password, mi->port, mi->connect_retry, - llstr(mi->last_log_seq, lbuf1)); + mi->master_log_name, llstr(mi->master_log_pos, lbuf), + mi->host, mi->user, + mi->password, mi->port, mi->connect_retry + ); flush_io_cache(file); return 0; } -int st_master_info::wait_for_pos(THD* thd, String* log_name, ulonglong log_pos) +/* TODO: the code below needs to be re-written almost from scratch + Main issue is how to find out if we have reached a certain position + in the master log my knowing the offset in the relay log. 
+ */ +int st_relay_log_info::wait_for_pos(THD* thd, String* log_name, + ulonglong log_pos) { if (!inited) return -1; - bool pos_reached; + bool pos_reached = 0; int event_count = 0; - pthread_mutex_lock(&lock); - while(!thd->killed) + pthread_mutex_lock(&data_lock); + while (!thd->killed) { int cmp_result; - if (*log_file_name) + DBUG_ASSERT(*master_log_name || master_log_pos == 0); + if (*master_log_name) { /* We should use dirname_length() here when we have a version of this that doesn't modify the argument */ - char *basename = strrchr(log_file_name, FN_LIBCHAR); + char *basename = strrchr(master_log_name, FN_LIBCHAR); if (basename) ++basename; else - basename = log_file_name; + basename = master_log_name; cmp_result = strncmp(basename, log_name->ptr(), log_name->length()); } else cmp_result = 0; - pos_reached = ((!cmp_result && pos >= log_pos) || cmp_result > 0); + pos_reached = ((!cmp_result && master_log_pos >= log_pos) || + cmp_result > 0); if (pos_reached || thd->killed) break; - const char* msg = thd->enter_cond(&cond, &lock, + const char* msg = thd->enter_cond(&data_cond, &data_lock, "Waiting for master update"); - pthread_cond_wait(&cond, &lock); + pthread_cond_wait(&data_cond, &data_lock); thd->exit_cond(msg); event_count++; } - pthread_mutex_unlock(&lock); + pthread_mutex_unlock(&data_lock); return thd->killed ? -1 : event_count; } - -static int init_slave_thread(THD* thd) +static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type) { DBUG_ENTER("init_slave_thread"); thd->system_thread = thd->bootstrap = 1; @@ -812,7 +1299,7 @@ static int init_slave_thread(THD* thd) thd->options = (((opt_log_slave_updates) ? 
OPTION_BIN_LOG:0) | OPTION_AUTO_IS_NULL) ; thd->system_thread = 1; thd->client_capabilities = CLIENT_LOCAL_FILES; - slave_real_id=thd->real_id=pthread_self(); + thd->real_id=pthread_self(); pthread_mutex_lock(&LOCK_thread_count); thd->thread_id = thread_id++; pthread_mutex_unlock(&LOCK_thread_count); @@ -822,7 +1309,6 @@ static int init_slave_thread(THD* thd) my_pthread_setspecific_ptr(THR_MALLOC, &thd->mem_root) || my_pthread_setspecific_ptr(THR_NET, &thd->net)) { - close_connection(&thd->net,ER_OUT_OF_RESOURCES); // is this needed? end_thread(thd,0); DBUG_RETURN(-1); } @@ -839,14 +1325,21 @@ static int init_slave_thread(THD* thd) if (thd->max_join_size == (ulong) ~0L) thd->options |= OPTION_BIG_SELECTS; - thd->proc_info="Waiting for master update"; + if (thd_type == SLAVE_THD_SQL) + { + thd->proc_info = "Waiting for the next event in slave queue"; + } + else + { + thd->proc_info="Waiting for master update"; + } thd->version=refresh_version; thd->set_time(); DBUG_RETURN(0); } -static int safe_sleep(THD* thd, int sec) +static int safe_sleep(THD* thd, MASTER_INFO* mi, int sec) { thr_alarm_t alarmed; thr_alarm_init(&alarmed); @@ -869,21 +1362,20 @@ static int safe_sleep(THD* thd, int sec) if (thr_alarm_in_use(&alarmed)) thr_end_alarm(&alarmed); - if (slave_killed(thd)) + if (slave_killed(thd,mi)) return 1; start_time=time((time_t*) 0); } return 0; } - static int request_dump(MYSQL* mysql, MASTER_INFO* mi) { char buf[FN_REFLEN + 10]; int len; int binlog_flags = 0; // for now - char* logname = mi->log_file_name; - int4store(buf, mi->pos); + char* logname = mi->master_log_name; + int4store(buf, mi->master_log_pos); int2store(buf + 4, binlog_flags); int4store(buf + 6, server_id); len = (uint) strlen(logname); @@ -929,7 +1421,6 @@ command"); return 0; } - static ulong read_event(MYSQL* mysql, MASTER_INFO *mi) { ulong len = packet_error; @@ -944,13 +1435,13 @@ static ulong read_event(MYSQL* mysql, MASTER_INFO *mi) return packet_error; #endif - while (!abort_loop && 
!abort_slave && len == packet_error && + while (!abort_loop && !mi->abort_slave && len == packet_error && read_errno == EINTR ) { len = mc_net_safe_read(mysql); read_errno = errno; } - if (abort_loop || abort_slave) + if (abort_loop || mi->abort_slave) return packet_error; if (len == packet_error || (long) len < 1) { @@ -973,65 +1464,74 @@ server_errno=%d)", return len - 1; } -int check_expected_error(THD* thd, int expected_error) +int check_expected_error(THD* thd, RELAY_LOG_INFO* rli, int expected_error) { - switch(expected_error) + switch (expected_error) { case ER_NET_READ_ERROR: case ER_NET_ERROR_ON_WRITE: case ER_SERVER_SHUTDOWN: case ER_NEW_ABORTING_CONNECTION: - my_snprintf(last_slave_error, sizeof(last_slave_error), + my_snprintf(rli->last_slave_error, sizeof(rli->last_slave_error), "Slave: query '%s' partially completed on the master \ and was aborted. There is a chance that your master is inconsistent at this \ point. If you are sure that your master is ok, run this query manually on the\ slave and then restart the slave with SET SQL_SLAVE_SKIP_COUNTER=1;\ SLAVE START;", thd->query); - last_slave_errno = expected_error; - sql_print_error("%s",last_slave_error); + rli->last_slave_errno = expected_error; + sql_print_error("%s",rli->last_slave_error); return 1; default: return 0; } } -static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len) +static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli) { const char *error_msg; - Log_event * ev = Log_event::read_log_event((const char*)net->read_pos + 1, - event_len, &error_msg, - mi->old_format); + DBUG_ASSERT(rli->sql_thd==thd); + Log_event * ev = next_event(rli); + DBUG_ASSERT(rli->sql_thd==thd); + if (slave_killed(thd,rli)) + return 1; if (ev) { int type_code = ev->get_type_code(); int exec_res; + pthread_mutex_lock(&rli->data_lock); if (ev->server_id == ::server_id || - (slave_skip_counter && type_code != ROTATE_EVENT)) + (rli->slave_skip_counter && type_code != ROTATE_EVENT)) { - 
if(type_code == LOAD_EVENT) - skip_load_data_infile(net); - - mi->inc_pos(event_len, ev->log_seq); - flush_master_info(mi); - if(slave_skip_counter && /* protect against common user error of + /* + TODO: I/O thread must handle skipping file delivery for + old load data infile events + */ + /* TODO: I/O thread should not even log events with the same server id */ + rli->inc_pos(ev->get_event_len(), + type_code != STOP_EVENT ? ev->log_pos : 0, + 1/* skip lock*/); + flush_relay_log_info(rli); + if (rli->slave_skip_counter && /* protect against common user error of setting the counter to 1 instead of 2 while recovering from an failed auto-increment insert */ - !(type_code == INTVAR_EVENT && - slave_skip_counter == 1)) - --slave_skip_counter; + !((type_code == INTVAR_EVENT || type_code == STOP_EVENT) && + rli->slave_skip_counter == 1)) + --rli->slave_skip_counter; + pthread_mutex_unlock(&rli->data_lock); delete ev; return 0; // avoid infinite update loops } + pthread_mutex_unlock(&rli->data_lock); thd->server_id = ev->server_id; // use the original server id for logging thd->set_time(); // time the query - if(!thd->log_seq) - thd->log_seq = ev->log_seq; if (!ev->when) ev->when = time(NULL); ev->thd = thd; - exec_res = ev->exec_event(mi); + thd->log_pos = ev->log_pos; + exec_res = ev->exec_event(rli); + DBUG_ASSERT(rli->sql_thd==thd); delete ev; return exec_res; } @@ -1044,167 +1544,148 @@ This may also be a network problem, or just a bug in the master or slave code.\ return 1; } } - -// slave thread -pthread_handler_decl(handle_slave,arg __attribute__((unused))) +/* slave I/O thread */ +pthread_handler_decl(handle_slave_io,arg) { #ifndef DBUG_OFF slave_begin: #endif THD *thd; // needs to be first for thread_stack MYSQL *mysql = NULL ; + MASTER_INFO* mi = (MASTER_INFO*)arg; char llbuff[22]; - - pthread_mutex_lock(&LOCK_slave); - if (!server_id) - { - pthread_cond_broadcast(&COND_slave_start); - pthread_mutex_unlock(&LOCK_slave); - sql_print_error("Server id not set, will 
not start slave"); - pthread_exit((void*)1); - } + bool retried_once = 0; + ulonglong last_failed_pos = 0; // TODO: see if last_failed_pos is needed + DBUG_ASSERT(mi->inited); - if(slave_running) - { - pthread_cond_broadcast(&COND_slave_start); - pthread_mutex_unlock(&LOCK_slave); - pthread_exit((void*)1); // safety just in case - } - slave_running = 1; - abort_slave = 0; + pthread_mutex_lock(&mi->run_lock); #ifndef DBUG_OFF - events_till_abort = abort_slave_event_count; + mi->events_till_abort = abort_slave_event_count; #endif - pthread_cond_broadcast(&COND_slave_start); - pthread_mutex_unlock(&LOCK_slave); - - // int error = 1; - bool retried_once = 0; - ulonglong last_failed_pos = 0; // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff my_thread_init(); - slave_thd = thd = new THD; // note that contructor of THD uses DBUG_ ! - DBUG_ENTER("handle_slave"); + thd = new THD; // note that contructor of THD uses DBUG_ ! + DBUG_ENTER("handle_slave_io"); pthread_detach_this_thread(); - if (init_slave_thread(thd) || init_master_info(&glob_mi)) + if (init_slave_thread(thd, SLAVE_THD_IO)) { - sql_print_error("Failed during slave thread initialization"); + pthread_cond_broadcast(&mi->start_cond); + pthread_mutex_unlock(&mi->run_lock); + sql_print_error("Failed during slave I/O thread initialization"); goto err; } + mi->io_thd = thd; thd->thread_stack = (char*)&thd; // remember where our stack is - thd->temporary_tables = save_temporary_tables; // restore temp tables threads.append(thd); - glob_mi.pending = 0; //this should always be set to 0 when the slave thread - // is started + mi->slave_running = 1; + mi->abort_slave = 0; + pthread_cond_broadcast(&mi->start_cond); + pthread_mutex_unlock(&mi->run_lock); DBUG_PRINT("info",("master info: log_file_name=%s, position=%s", - glob_mi.log_file_name, llstr(glob_mi.pos,llbuff))); - + mi->master_log_name, llstr(mi->master_log_pos,llbuff))); if (!(mysql = mc_mysql_init(NULL))) { - sql_print_error("Slave 
thread: error in mc_mysql_init()"); + sql_print_error("Slave I/O thread: error in mc_mysql_init()"); goto err; } thd->proc_info = "connecting to master"; #ifndef DBUG_OFF - sql_print_error("Slave thread initialized"); + sql_print_error("Slave I/O thread initialized"); #endif // we can get killed during safe_connect - if (!safe_connect(thd, mysql, &glob_mi)) - sql_print_error("Slave: connected to master '%s@%s:%d',\ - replication started in log '%s' at position %s", glob_mi.user, - glob_mi.host, glob_mi.port, - RPL_LOG_NAME, - llstr(glob_mi.pos,llbuff)); + if (!safe_connect(thd, mysql, mi)) + sql_print_error("Slave I/O thread: connected to master '%s@%s:%d',\ + replication started in log '%s' at position %s", mi->user, + mi->host, mi->port, + IO_RPL_LOG_NAME, + llstr(mi->master_log_pos,llbuff)); else { - sql_print_error("Slave thread killed while connecting to master"); + sql_print_error("Slave I/O thread killed while connecting to master"); goto err; } connected: thd->slave_net = &mysql->net; - // register ourselves with the master - // if fails, this is not fatal - we just print the error message and go - // on with life thd->proc_info = "Checking master version"; - if (check_master_version(mysql, &glob_mi)) + if (check_master_version(mysql, mi)) { goto err; } - if (!glob_mi.old_format) + if (!mi->old_format) { + // register ourselves with the master + // if fails, this is not fatal - we just print the error message and go + // on with life thd->proc_info = "Registering slave on master"; if (register_slave_on_master(mysql) || update_slave_list(mysql)) goto err; } - while (!slave_killed(thd)) + while (!slave_killed(thd,mi)) { thd->proc_info = "Requesting binlog dump"; - if(request_dump(mysql, &glob_mi)) + if (request_dump(mysql, mi)) { sql_print_error("Failed on request_dump()"); - if(slave_killed(thd)) + if(slave_killed(thd,mi)) { - sql_print_error("Slave thread killed while requesting master \ + sql_print_error("Slave I/O thread killed while requesting master \ 
dump"); goto err; } thd->proc_info = "Waiiting to reconnect after a failed dump request"; - if(mysql->net.vio) - vio_close(mysql->net.vio); + mc_end_server(mysql); // first time retry immediately, assuming that we can recover // right away - if first time fails, sleep between re-tries // hopefuly the admin can fix the problem sometime - if(retried_once) - safe_sleep(thd, glob_mi.connect_retry); + if (retried_once) + safe_sleep(thd, mi, mi->connect_retry); else retried_once = 1; - if(slave_killed(thd)) + if (slave_killed(thd,mi)) { - sql_print_error("Slave thread killed while retrying master \ + sql_print_error("Slave I/O thread killed while retrying master \ dump"); goto err; } thd->proc_info = "Reconnecting after a failed dump request"; - last_failed_pos=glob_mi.pos; - sql_print_error("Slave: failed dump request, reconnecting to \ -try again, log '%s' at postion %s", RPL_LOG_NAME, - llstr(last_failed_pos,llbuff)); - if(safe_reconnect(thd, mysql, &glob_mi) || slave_killed(thd)) + sql_print_error("Slave I/O thread: failed dump request, \ +reconnecting to try again, log '%s' at postion %s", IO_RPL_LOG_NAME, + llstr(mi->master_log_pos,llbuff)); + if (safe_reconnect(thd, mysql, mi) || slave_killed(thd,mi)) { - sql_print_error("Slave thread killed during or after reconnect"); + sql_print_error("Slave I/O thread killed during or \ +after reconnect"); goto err; } goto connected; } - - while(!slave_killed(thd)) + while (!slave_killed(thd,mi)) { thd->proc_info = "Reading master update"; - ulong event_len = read_event(mysql, &glob_mi); - if(slave_killed(thd)) + ulong event_len = read_event(mysql, mi); + if (slave_killed(thd,mi)) { - sql_print_error("Slave thread killed while reading event"); + sql_print_error("Slave I/O thread killed while reading event"); goto err; } - if (event_len == packet_error) { - if(mc_mysql_errno(mysql) == ER_NET_PACKET_TOO_LARGE) + if (mc_mysql_errno(mysql) == ER_NET_PACKET_TOO_LARGE) { sql_print_error("Log entry on master is longer than \ 
max_allowed_packet on slave. Slave thread will be aborted. If the entry is \ @@ -1214,99 +1695,72 @@ max_allowed_packet. The current value is %ld", max_allowed_packet); } thd->proc_info = "Waiting to reconnect after a failed read"; - if(mysql->net.vio) - vio_close(mysql->net.vio); - if(retried_once) // punish repeat offender with sleep - safe_sleep(thd, glob_mi.connect_retry); + mc_end_server(mysql); + if (retried_once) // punish repeat offender with sleep + safe_sleep(thd,mi,mi->connect_retry); else retried_once = 1; - if(slave_killed(thd)) + if (slave_killed(thd,mi)) { - sql_print_error("Slave thread killed while waiting to \ + sql_print_error("Slave I/O thread killed while waiting to \ reconnect after a failed read"); goto err; } thd->proc_info = "Reconnecting after a failed read"; - last_failed_pos= glob_mi.pos; - sql_print_error("Slave: Failed reading log event, \ -reconnecting to retry, log '%s' position %s", RPL_LOG_NAME, - llstr(last_failed_pos, llbuff)); - if(safe_reconnect(thd, mysql, &glob_mi) || slave_killed(thd)) + sql_print_error("Slave I/O thread: Failed reading log event, \ +reconnecting to retry, log '%s' position %s", IO_RPL_LOG_NAME, + llstr(mi->master_log_pos, llbuff)); + if (safe_reconnect(thd, mysql, mi) || slave_killed(thd,mi)) { - sql_print_error("Slave thread killed during or after a \ + sql_print_error("Slave I/O thread killed during or after a \ reconnect done to recover from failed read"); goto err; } - goto connected; } // if(event_len == packet_error) - thd->proc_info = "Processing master log event"; - if(exec_event(thd, &mysql->net, &glob_mi, event_len)) - { - sql_print_error("\ -Error running query, slave aborted. Fix the problem, and re-start \ -the slave thread with \"mysqladmin start-slave\". 
We stopped at log \ -'%s' position %s", - RPL_LOG_NAME, llstr(glob_mi.pos, llbuff)); - goto err; - // there was an error running the query - // abort the slave thread, when the problem is fixed, the user - // should restart the slave with mysqladmin start-slave - } + thd->proc_info = "Queueing event from master"; + if (queue_event(mi,(const char*)mysql->net.read_pos + 1, + (uint)event_len)) + { + sql_print_error("Slave I/O thread could not queue event \ +from master"); + goto err; + } + // TODO: check debugging abort code #ifndef DBUG_OFF - if(abort_slave_event_count && !--events_till_abort) + if (abort_slave_event_count && !--events_till_abort) { - sql_print_error("Slave: debugging abort"); + sql_print_error("Slave I/O thread: debugging abort"); goto err; } #endif - - // successful exec with offset advance, - // the slave repents and his sins are forgiven! - if(glob_mi.pos > last_failed_pos) - { - retried_once = 0; -#ifndef DBUG_OFF - stuck_count = 0; -#endif - } -#ifndef DBUG_OFF - else - { - // show a little mercy, allow slave to read one more event - // before cutting him off - otherwise he gets stuck - // on Intvar events, since they do not advance the offset - // immediately - if (++stuck_count > 2) - events_till_disconnect++; - } -#endif - } // while(!slave_killed(thd)) - read/exec loop - } // while(!slave_killed(thd)) - slave loop + } // while(!slave_killed(thd,mi)) - read/exec loop + } // while(!slave_killed(thd,mi)) - slave loop // error = 0; err: - // print the current replication position - sql_print_error("Slave thread exiting, replication stopped in log '%s' at \ -position %s", - RPL_LOG_NAME, llstr(glob_mi.pos,llbuff)); + // print the current replication position + sql_print_error("Slave I/O thread exiting, read up to log '%s', position %s", + IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff)); thd->query = thd->db = 0; // extra safety if(mysql) mc_mysql_close(mysql); thd->proc_info = "Waiting for slave mutex on exit"; - 
pthread_mutex_lock(&LOCK_slave); - slave_running = 0; + pthread_mutex_lock(&mi->run_lock); + mi->slave_running = 0; + mi->io_thd = 0; + // TODO: make rpl_status part of MASTER_INFO change_rpl_status(RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE); - abort_slave = 0; - save_temporary_tables = thd->temporary_tables; - thd->temporary_tables = 0; // remove tempation from destructor to close them - pthread_cond_broadcast(&COND_slave_stopped); // tell the world we are done - pthread_mutex_unlock(&LOCK_slave); + mi->abort_slave = 0; // TODO: check if this is needed + DBUG_ASSERT(thd->net.buff != 0); net_end(&thd->net); // destructor will not free it, because we are weird - slave_thd = 0; + pthread_mutex_lock(&LOCK_thread_count); delete thd; + pthread_mutex_unlock(&LOCK_thread_count); + pthread_cond_broadcast(&mi->stop_cond); // tell the world we are done + pthread_mutex_unlock(&mi->run_lock); my_thread_end(); #ifndef DBUG_OFF if(abort_slave_event_count && !events_till_abort) @@ -1316,9 +1770,185 @@ position %s", DBUG_RETURN(0); // Can't return anything here } +/* slave SQL logic thread */ -/* try to connect until successful or slave killed */ +pthread_handler_decl(handle_slave_sql,arg) +{ +#ifndef DBUG_OFF + slave_begin: +#endif + THD *thd; /* needs to be first for thread_stack */ + MYSQL *mysql = NULL ; + bool retried_once = 0; + ulonglong last_failed_pos = 0; // TODO: see if this can be removed + char llbuff[22],llbuff1[22]; + RELAY_LOG_INFO* rli = &((MASTER_INFO*)arg)->rli; + const char* errmsg=0; + DBUG_ASSERT(rli->inited); + pthread_mutex_lock(&rli->run_lock); + DBUG_ASSERT(!rli->slave_running); +#ifndef DBUG_OFF + rli->events_till_abort = abort_slave_event_count; +#endif + + + // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff + my_thread_init(); + thd = new THD; // note that contructor of THD uses DBUG_ ! 
+ DBUG_ENTER("handle_slave_sql"); + + pthread_detach_this_thread(); + if (init_slave_thread(thd, SLAVE_THD_SQL)) + { + // TODO: this is currently broken - slave start and change master + // will be stuck if we fail here + pthread_cond_broadcast(&rli->start_cond); + pthread_mutex_unlock(&rli->run_lock); + sql_print_error("Failed during slave thread initialization"); + goto err; + } + thd->thread_stack = (char*)&thd; // remember where our stack is + thd->temporary_tables = rli->save_temporary_tables; // restore temp tables + threads.append(thd); + rli->sql_thd = thd; + rli->slave_running = 1; + rli->abort_slave = 0; + pthread_cond_broadcast(&rli->start_cond); + pthread_mutex_unlock(&rli->run_lock); + rli->pending = 0; //this should always be set to 0 when the slave thread + // is started + if (init_relay_log_pos(rli,0,0,1/*need data lock*/,&errmsg)) + { + sql_print_error("Error initializing relay log position: %s", + errmsg); + goto err; + } + DBUG_ASSERT(rli->relay_log_pos >= 4); + DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->relay_log_pos); + + DBUG_PRINT("info",("master info: log_file_name=%s, position=%s", + rli->master_log_name, llstr(rli->master_log_pos,llbuff))); + DBUG_ASSERT(rli->sql_thd == thd); + sql_print_error("Slave SQL thread initialized, starting replication in \ +log '%s' at position %s,relay log: name='%s',pos='%s'", RPL_LOG_NAME, + llstr(rli->master_log_pos,llbuff),rli->relay_log_name, + llstr(rli->relay_log_pos,llbuff1)); + while (!slave_killed(thd,rli)) + { + thd->proc_info = "Processing master log event"; + DBUG_ASSERT(rli->sql_thd == thd); + if (exec_relay_log_event(thd,rli)) + { + // do not scare the user if SQL thread was simply killed or stopped + if (!slave_killed(thd,rli)) + sql_print_error("\ +Error running query, slave SQL thread aborted. Fix the problem, and restart \ +the slave SQL thread with \"mysqladmin start-slave\". 
We stopped at log \ +'%s' position %s", + RPL_LOG_NAME, llstr(rli->master_log_pos, llbuff)); + goto err; + } + } // while(!slave_killed(thd,rli)) - read/exec loop + + // error = 0; + err: + // print the current replication position + sql_print_error("Slave SQL thread exiting, replication stopped in log \ + '%s' at position %s", + RPL_LOG_NAME, llstr(rli->master_log_pos,llbuff)); + thd->query = thd->db = 0; // extra safety + thd->proc_info = "Waiting for slave mutex on exit"; + pthread_mutex_lock(&rli->run_lock); + DBUG_ASSERT(rli->slave_running == 1); // tracking buffer overrun + rli->slave_running = 0; + rli->save_temporary_tables = thd->temporary_tables; + //TODO: see if we can do this conditionally in next_event() instead + // to avoid unneeded position re-init + rli->log_pos_current=0; + thd->temporary_tables = 0; // remove tempation from destructor to close them + DBUG_ASSERT(thd->net.buff != 0); + net_end(&thd->net); // destructor will not free it, because we are weird + DBUG_ASSERT(rli->sql_thd == thd); + rli->sql_thd = 0; + pthread_mutex_lock(&LOCK_thread_count); + delete thd; + pthread_mutex_unlock(&LOCK_thread_count); + pthread_cond_broadcast(&rli->stop_cond); + // tell the world we are done + pthread_mutex_unlock(&rli->run_lock); + my_thread_end(); +#ifndef DBUG_OFF // TODO: reconsider the code below + if (abort_slave_event_count && !rli->events_till_abort) + goto slave_begin; +#endif + pthread_exit(0); + DBUG_RETURN(0); // Can't return anything here +} +int queue_event(MASTER_INFO* mi,const char* buf,uint event_len) +{ + int error; + bool inc_pos = 1; + if (mi->old_format) + return 1; // TODO: deal with old format + + switch (buf[EVENT_TYPE_OFFSET]) + { + case ROTATE_EVENT: + { + Rotate_log_event rev(buf,event_len,0); + if (!rev.is_valid()) + return 1; + DBUG_ASSERT(rev.ident_len<sizeof(mi->master_log_name)); + memcpy(mi->master_log_name,rev.new_log_ident, + rev.ident_len); + mi->master_log_name[rev.ident_len] = 0; + mi->master_log_pos = rev.pos; + 
inc_pos = 0; +#ifndef DBUG_OFF + /* if we do not do this, we will be getting the first + rotate event forever, so + we need to not disconnect after one + */ + if (disconnect_slave_event_count) + events_till_disconnect++; +#endif + break; + } + default: + break; + } + + if (!(error = mi->rli.relay_log.appendv(buf,event_len,0))) + { + if (inc_pos) + mi->master_log_pos += event_len; + } + return error; +} + +void end_relay_log_info(RELAY_LOG_INFO* rli) +{ + if (!rli->inited) + return; + if (rli->info_fd >= 0) + { + end_io_cache(&rli->info_file); + (void)my_close(rli->info_fd, MYF(MY_WME)); + rli->info_fd = -1; + } + if (rli->cur_log_fd >= 0) + { + end_io_cache(&rli->cache_buf); + (void)my_close(rli->cur_log_fd, MYF(MY_WME)); + rli->cur_log_fd = -1; + } + rli->inited = 0; + rli->log_pos_current=0; + rli->relay_log.close(1); +} + +/* try to connect until successful or slave killed */ static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi) { return connect_to_master(thd, mysql, mi, 0); @@ -1328,7 +1958,6 @@ static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi) Try to connect until successful or slave killed or we have retried master_retry_count times */ - static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi, bool reconnect) { @@ -1337,15 +1966,10 @@ static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi, ulong err_count=0; char llbuff[22]; - /* - If we lost connection after reading a state set event - we will be re-reading it, so pending needs to be cleared - */ - mi->pending = 0; #ifndef DBUG_OFF events_till_disconnect = disconnect_slave_event_count; #endif - while (!(slave_was_killed = slave_killed(thd)) && + while (!(slave_was_killed = slave_killed(thd,mi)) && (reconnect ? 
mc_mysql_reconnect(mysql) : !mc_mysql_connect(mysql, mi->host, mi->user, mi->password, 0, mi->port, 0, 0))) @@ -1353,12 +1977,13 @@ static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi, /* Don't repeat last error */ if (mc_mysql_errno(mysql) != last_errno) { - sql_print_error("Slave thread: error connecting to master: \ -%s, last_errno=%d, retry in %d sec", + sql_print_error("Slave I/O thread: error connecting to master \ +'%s@%s:%d': \ +%s, last_errno=%d, retry in %d sec",mi->user,mi->host,mi->port, mc_mysql_error(mysql), last_errno=mc_mysql_errno(mysql), mi->connect_retry); } - safe_sleep(thd, mi->connect_retry); + safe_sleep(thd,mi,mi->connect_retry); /* by default we try forever. The reason is that failure will trigger master election, so if the user did not set master_retry_count we do not want to have electioin triggered on the first failure to @@ -1377,10 +2002,10 @@ static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi, { if (reconnect) sql_print_error("Slave: connected to master '%s@%s:%d',\ -replication resumed in log '%s' at position %s", glob_mi.user, - glob_mi.host, glob_mi.port, - RPL_LOG_NAME, - llstr(glob_mi.pos,llbuff)); +replication resumed in log '%s' at position %s", mi->user, + mi->host, mi->port, + IO_RPL_LOG_NAME, + llstr(mi->master_log_pos,llbuff)); else { change_rpl_status(RPL_IDLE_SLAVE,RPL_ACTIVE_SLAVE); @@ -1405,6 +2030,175 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi) return connect_to_master(thd, mysql, mi, 1); } +int flush_relay_log_info(RELAY_LOG_INFO* rli) +{ + IO_CACHE* file = &rli->info_file; + char lbuf[22],lbuf1[22]; + + my_b_seek(file, 0L); + my_b_printf(file, "%s\n%s\n%s\n%s\n", + rli->relay_log_name, llstr(rli->relay_log_pos, lbuf), + rli->master_log_name, llstr(rli->master_log_pos, lbuf1) + ); + flush_io_cache(file); + flush_io_cache(rli->cur_log); + return 0; +} + +IO_CACHE* reopen_relay_log(RELAY_LOG_INFO* rli, const char** errmsg) +{ + DBUG_ASSERT(rli->cur_log != 
&rli->cache_buf); + IO_CACHE* cur_log = rli->cur_log=&rli->cache_buf; + DBUG_ASSERT(rli->cur_log_fd == -1); + if ((rli->cur_log_fd=open_binlog(cur_log,rli->relay_log_name, + errmsg))<0) + return 0; + my_b_seek(cur_log,rli->relay_log_pos); + return cur_log; +} + +Log_event* next_event(RELAY_LOG_INFO* rli) +{ + Log_event* ev; + IO_CACHE* cur_log = rli->cur_log; + pthread_mutex_t *log_lock = rli->relay_log.get_log_lock(); + const char* errmsg=0; + THD* thd = rli->sql_thd; + bool was_killed; + DBUG_ASSERT(thd != 0); + + // For most operations we need to protect rli members with data_lock, + // so we will hold it for the most of the loop below + // However, we will release it whenever it is worth the hassle, + // and in the cases when we go into a pthread_cond_wait() with the + // non-data_lock mutex + pthread_mutex_lock(&rli->data_lock); + + for (;!(was_killed=slave_killed(thd,rli));) + { + // we can have two kinds of log reading: + // hot_log - rli->cur_log points at the IO_CACHE of relay_log, which + // is actively being updated by the I/O thread. We need to be careful + // in this case and make sure that we are not looking at a stale log that + // has already been rotated. If it has been, we reopen the log + // the other case is much simpler - we just have a read only log that + // nobody else will be updating. 
+ bool hot_log; + if ((hot_log = (cur_log != &rli->cache_buf))) + { + DBUG_ASSERT(rli->cur_log_fd == -1); // foreign descriptor + pthread_mutex_lock(log_lock); + // reading cur_log->init_count here is safe because the log will only + // be rotated when we hold relay_log.LOCK_log + if (cur_log->init_count != rli->cur_log_init_count) + { + if (!(cur_log=reopen_relay_log(rli,&errmsg))) + { + pthread_mutex_unlock(log_lock); + goto err; + } + pthread_mutex_unlock(log_lock); + hot_log=0; + } + } + DBUG_ASSERT(my_b_tell(cur_log) >= 4); + DBUG_ASSERT(my_b_tell(cur_log) == rli->relay_log_pos + rli->pending); + if ((ev=Log_event::read_log_event(cur_log,0,rli->mi->old_format))) + { + DBUG_ASSERT(thd==rli->sql_thd); + if (hot_log) + pthread_mutex_unlock(log_lock); + pthread_mutex_unlock(&rli->data_lock); + return ev; + } + DBUG_ASSERT(thd==rli->sql_thd); + if (!cur_log->error) /* EOF */ + { + // on a hot log, EOF means that there are no more updates to + // process and we must block until I/O thread adds some and + // signals us to continue + if (hot_log) + { + DBUG_ASSERT(cur_log->init_count == rli->cur_log_init_count); + //we can, and should release data_lock while we are waiting for + // update. If we do not, show slave status will block + pthread_mutex_unlock(&rli->data_lock); + + // IMPORTANT: note that wait_for_update will unlock LOCK_log, but + // expects the caller to lock it + rli->relay_log.wait_for_update(rli->sql_thd); + + // re-acquire data lock since we released it earlier + pthread_mutex_lock(&rli->data_lock); + continue; + } + // if the log was not hot, we need to move to the next log in + // sequence. 
The next log could be hot or cold, we deal with both + // cases separately after doing some common initialization + else + { + end_io_cache(cur_log); + DBUG_ASSERT(rli->cur_log_fd >= 0); + my_close(rli->cur_log_fd, MYF(MY_WME)); + rli->cur_log_fd = -1; + int error; + + // purge_first_log will properly set up relay log coordinates in rli + if (rli->relay_log.purge_first_log(rli)) + { + errmsg = "Error purging processed log"; + goto err; + } + + // next log is hot + if (rli->relay_log.is_active(rli->linfo.log_file_name)) + { +#ifdef EXTRA_DEBUG + sql_print_error("next log '%s' is currently active", + rli->linfo.log_file_name); +#endif + rli->cur_log = cur_log = rli->relay_log.get_log_file(); + rli->cur_log_init_count = cur_log->init_count; + DBUG_ASSERT(rli->cur_log_fd == -1); + + // read pointer has to be at the start since we are the only + // reader + if (check_binlog_magic(cur_log,&errmsg)) + goto err; + continue; + } + // if we get here, the log was not hot, so we will have to + // open it ourselves +#ifdef EXTRA_DEBUG + sql_print_error("next log '%s' is not active", + rli->linfo.log_file_name); +#endif + // open_binlog() will check the magic header + if ((rli->cur_log_fd=open_binlog(cur_log,rli->linfo.log_file_name, + &errmsg))<0) + goto err; + } + } + else // read failed with a non-EOF error + { + // TODO: come up with something better to handle this error + sql_print_error("Slave SQL thread: I/O error reading \ +event(errno=%d,cur_log->error=%d)", + my_errno,cur_log->error); + // no need to hog the mutex while we sleep + pthread_mutex_unlock(&rli->data_lock); + safe_sleep(rli->sql_thd,rli->mi,1); + pthread_mutex_lock(&rli->data_lock); + } + } + if (!errmsg && was_killed) + errmsg = "slave SQL thread was killed"; +err: + pthread_mutex_unlock(&rli->data_lock); + sql_print_error("Error reading relay log event: %s", errmsg); + return 0; +} + #ifdef __GNUC__ template class I_List_iterator<i_string>; |