summaryrefslogtreecommitdiff
path: root/sql/slave.cc
diff options
context:
space:
mode:
authorunknown <sasha@mysql.sashanet.com>2002-01-19 19:16:52 -0700
committerunknown <sasha@mysql.sashanet.com>2002-01-19 19:16:52 -0700
commit5df61c3cdc4499197e420a76b25b942dce0f3ccc (patch)
tree87da2fd65f79c28f4b97c4619f95b07797107d82 /sql/slave.cc
parent0831ce1c616296196eff82065da469156b4def82 (diff)
downloadmariadb-git-5df61c3cdc4499197e420a76b25b942dce0f3ccc.tar.gz
Here comes a nasty patch, although I am not ready to push it yet. I will
first pull, merge,test, and get it to work. The main change is the new replication code - now we have two slave threads SQL thread and I/O thread. I have also re-written a lot of the code to prepare for multi-master implementation. I also documented IO_CACHE quite extensively and to some extend, THD class. Makefile.am: moved tags target script into a separate file include/my_sys.h: fixes in IO_CACHE for SEQ_READ_APPEND + some documentation libmysqld/lib_sql.cc: updated replication locks, but now I see I did it wrong and it won't compile. Will fix before the push. mysql-test/r/rpl000014.result: test result update mysql-test/r/rpl000015.result: test result update mysql-test/r/rpl000016.result: test result update mysql-test/r/rpl_log.result: test result update mysql-test/t/rpl000016-slave.sh: remove relay logs mysql-test/t/rpl000017-slave.sh: remove relay logs mysql-test/t/rpl_log.test: updated test mysys/mf_iocache.c: IO_CACHE updates to make replication work mysys/mf_iocache2.c: IO_CACHE update to make replication work mysys/thr_mutex.c: cosmetic change sql/item_func.cc: new replication code sql/lex.h: new replication sql/log.cc: new replication sql/log_event.cc: new replication sql/log_event.h: new replication sql/mini_client.cc: new replication sql/mini_client.h: new replication sql/mysql_priv.h: new replication sql/mysqld.cc: new replication sql/repl_failsafe.cc: new replication sql/slave.cc: new replication sql/slave.h: new replication sql/sql_class.cc: new replication sql/sql_class.h: new replication sql/sql_lex.h: new replication sql/sql_parse.cc: new replication sql/sql_repl.cc: new replication sql/sql_repl.h: new replication sql/sql_show.cc: new replication sql/sql_yacc.yy: new replication sql/stacktrace.c: more robust stack tracing sql/structs.h: new replication code BitKeeper/etc/ignore: Added mysql-test/r/rpl000002.eval mysql-test/r/rpl000014.eval mysql-test/r/rpl000015.eval mysql-test/r/rpl000016.eval mysql-test/r/slave-running.eval mysql-test/r/slave-stopped.eval to the ignore list
Diffstat (limited to 'sql/slave.cc')
-rw-r--r--sql/slave.cc1416
1 files changed, 1105 insertions, 311 deletions
diff --git a/sql/slave.cc b/sql/slave.cc
index 700838d7cd7..e68741e7434 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -24,24 +24,23 @@
#include "repl_failsafe.h"
#include <thr_alarm.h>
#include <my_dir.h>
+#include <assert.h>
-volatile bool slave_running = 0;
+volatile bool slave_sql_running = 0, slave_io_running = 0;
char* slave_load_tmpdir = 0;
-pthread_t slave_real_id;
-MASTER_INFO glob_mi;
+MASTER_INFO main_mi;
+MASTER_INFO* active_mi;
+volatile int active_mi_in_use = 0;
HASH replicate_do_table, replicate_ignore_table;
DYNAMIC_ARRAY replicate_wild_do_table, replicate_wild_ignore_table;
bool do_table_inited = 0, ignore_table_inited = 0;
bool wild_do_table_inited = 0, wild_ignore_table_inited = 0;
bool table_rules_on = 0;
-uint32 slave_skip_counter = 0;
static TABLE* save_temporary_tables = 0;
-THD* slave_thd = 0;
// when slave thread exits, we need to remember the temporary tables so we
// can re-use them on slave start
-int last_slave_errno = 0;
-char last_slave_error[MAX_SLAVE_ERRMSG] = "";
+// TODO: move the vars below under MASTER_INFO
#ifndef DBUG_OFF
int disconnect_slave_event_count = 0, abort_slave_event_count = 0;
static int events_till_disconnect = -1;
@@ -49,15 +48,17 @@ int events_till_abort = -1;
static int stuck_count = 0;
#endif
+typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE;
void skip_load_data_infile(NET* net);
-inline bool slave_killed(THD* thd);
-static int init_slave_thread(THD* thd);
+static inline bool slave_killed(THD* thd,MASTER_INFO* mi);
+static inline bool slave_killed(THD* thd,RELAY_LOG_INFO* rli);
+static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type);
static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi);
static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi);
static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
bool reconnect);
-static int safe_sleep(THD* thd, int sec);
+static int safe_sleep(THD* thd, MASTER_INFO* mi, int sec);
static int request_table_dump(MYSQL* mysql, const char* db, const char* table);
static int create_table_from_dump(THD* thd, NET* net, const char* db,
const char* table_name);
@@ -65,6 +66,79 @@ static int check_master_version(MYSQL* mysql, MASTER_INFO* mi);
char* rewrite_db(char* db);
+void init_thread_mask(int* mask,MASTER_INFO* mi,bool inverse)
+{
+ bool set_io = mi->slave_running, set_sql = mi->rli.slave_running;
+ if (inverse)
+ {
+ /* This makes me think of the Russian idiom "I am not I, and this is
+ not my horse", which is used to deny reponsibility for
+ one's actions.
+ */
+ set_io = !set_io;
+ set_sql = !set_sql;
+ }
+ register int tmp_mask=0;
+ if (set_io)
+ tmp_mask |= SLAVE_IO;
+ if (set_sql)
+ tmp_mask |= SLAVE_SQL;
+ *mask = tmp_mask;
+}
+
+void lock_slave_threads(MASTER_INFO* mi)
+{
+ //TODO: see if we can do this without dual mutex
+ pthread_mutex_lock(&mi->run_lock);
+ pthread_mutex_lock(&mi->rli.run_lock);
+}
+
+void unlock_slave_threads(MASTER_INFO* mi)
+{
+ //TODO: see if we can do this without dual mutex
+ pthread_mutex_unlock(&mi->rli.run_lock);
+ pthread_mutex_unlock(&mi->run_lock);
+}
+
+int init_slave()
+{
+ // TODO (multi-master): replace this with list initialization
+ active_mi = &main_mi;
+
+ // TODO: the code below is a copy-paste mess - clean it up
+ /*
+ make sure slave thread gets started if server_id is set,
+ valid master.info is present, and master_host has not been specified
+ */
+ if (server_id && !master_host)
+ {
+ // TODO: re-write this to interate through the list of files
+ // for multi-master
+ char fname[FN_REFLEN+128];
+ MY_STAT stat_area;
+ fn_format(fname, master_info_file, mysql_data_home, "", 4+16+32);
+ if (my_stat(fname, &stat_area, MYF(0)) &&
+ !init_master_info(active_mi,master_info_file,relay_log_info_file))
+ master_host = active_mi->host;
+ }
+ // slave thread
+ if (master_host)
+ {
+ if (!opt_skip_slave_start && start_slave_threads(1 /* need mutex */,
+ 0 /* no wait for start*/,
+ active_mi,
+ master_info_file,
+ relay_log_info_file,
+ SLAVE_IO|SLAVE_SQL
+ ))
+ sql_print_error("Warning: Can't create threads to handle slave");
+ else if (opt_skip_slave_start)
+ if (init_master_info(active_mi, master_info_file, relay_log_info_file))
+ sql_print_error("Warning: failed to initialized master info");
+ }
+ return 0;
+}
+
static void free_table_ent(TABLE_RULE_ENT* e)
{
my_free((gptr) e, MYF(0));
@@ -77,6 +151,285 @@ static byte* get_table_key(TABLE_RULE_ENT* e, uint* len,
return (byte*)e->db;
}
+// TODO: check proper initialization of master_log_name/master_log_pos
+int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,
+ ulonglong pos, bool need_data_lock,
+ const char** errmsg)
+{
+ if (rli->log_pos_current)
+ return 0;
+ pthread_mutex_t *log_lock=rli->relay_log.get_log_lock();
+ pthread_mutex_lock(log_lock);
+ if (need_data_lock)
+ pthread_mutex_lock(&rli->data_lock);
+
+ if (rli->cur_log_fd >= 0)
+ {
+ end_io_cache(&rli->cache_buf);
+ my_close(rli->cur_log_fd, MYF(MY_WME));
+ rli->cur_log_fd = -1;
+ }
+
+ if (!log)
+ log = rli->relay_log_name; // already inited
+ if (!pos)
+ pos = rli->relay_log_pos; // already inited
+ else
+ rli->relay_log_pos = pos;
+ if (rli->relay_log.find_first_log(&rli->linfo,log))
+ {
+ *errmsg="Could not find first log during relay log initialization";
+ goto err;
+ }
+ strnmov(rli->relay_log_name,rli->linfo.log_file_name,
+ sizeof(rli->relay_log_name));
+ // to make end_io_cache(&rli->cache_buf) safe in all cases
+ if (!rli->inited)
+ bzero((char*) &rli->cache_buf, sizeof(IO_CACHE));
+ if (rli->relay_log.is_active(rli->linfo.log_file_name))
+ {
+ if (my_b_tell((rli->cur_log=rli->relay_log.get_log_file())) == 0 &&
+ check_binlog_magic(rli->cur_log,errmsg))
+ {
+ goto err;
+ }
+ rli->cur_log_init_count=rli->cur_log->init_count;
+ }
+ else
+ {
+ if (rli->inited)
+ end_io_cache(&rli->cache_buf);
+ if (rli->cur_log_fd>=0)
+ my_close(rli->cur_log_fd,MYF(MY_WME));
+ if ((rli->cur_log_fd=open_binlog(&rli->cache_buf,
+ rli->linfo.log_file_name,errmsg)) < 0)
+ {
+ goto err;
+ }
+ rli->cur_log = &rli->cache_buf;
+ }
+ if (pos > 4)
+ my_b_seek(rli->cur_log,(off_t)pos);
+ rli->log_pos_current=1;
+err:
+ pthread_cond_broadcast(&rli->data_cond);
+ if (need_data_lock)
+ pthread_mutex_unlock(&rli->data_lock);
+ pthread_mutex_unlock(log_lock);
+ return (*errmsg) ? 1 : 0;
+}
+
+// we assume we have a run lock on rli and that the both slave thread
+// are not running
+int purge_relay_logs(RELAY_LOG_INFO* rli, bool just_reset, const char** errmsg)
+{
+ if (!rli->inited)
+ return 0; /* successfully do nothing */
+ DBUG_ASSERT(rli->slave_running == 0);
+ DBUG_ASSERT(rli->mi->slave_running == 0);
+ int error=0;
+ rli->slave_skip_counter=0;
+ pthread_mutex_lock(&rli->data_lock);
+ rli->pending=0;
+ rli->master_log_name[0]=0;
+ rli->master_log_pos=0; // 0 means uninitialized
+ if (rli->relay_log.reset_logs(rli->sql_thd) ||
+ rli->relay_log.find_first_log(&rli->linfo,""))
+ {
+ *errmsg = "Failed during log reset";
+ error=1;
+ goto err;
+ }
+ strnmov(rli->relay_log_name,rli->linfo.log_file_name,
+ sizeof(rli->relay_log_name));
+ rli->relay_log_pos=4;
+ rli->log_pos_current=0;
+ if (!just_reset)
+ error = init_relay_log_pos(rli,0,0,0/*do not need data lock*/,errmsg);
+err:
+ pthread_mutex_unlock(&rli->data_lock);
+ return error;
+}
+
+int terminate_slave_threads(MASTER_INFO* mi,int thread_mask,bool skip_lock)
+{
+ if (!mi->inited)
+ return 0; /* successfully do nothing */
+ int error,force_all = (thread_mask & SLAVE_FORCE_ALL);
+ pthread_mutex_t *sql_lock = &mi->rli.run_lock, *io_lock = &mi->run_lock;
+ pthread_mutex_t *sql_cond_lock,*io_cond_lock;
+
+ sql_cond_lock=sql_lock;
+ io_cond_lock=io_lock;
+
+ if (skip_lock)
+ {
+ sql_lock = io_lock = 0;
+ }
+ if ((thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL)) && mi->slave_running)
+ {
+ mi->abort_slave=1;
+ if ((error=terminate_slave_thread(mi->io_thd,io_lock,
+ io_cond_lock,
+ &mi->stop_cond,
+ &mi->slave_running)) &&
+ !force_all)
+ return error;
+ }
+ if ((thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL)) && mi->rli.slave_running)
+ {
+ DBUG_ASSERT(mi->rli.sql_thd != 0) ;
+ mi->rli.abort_slave=1;
+ if ((error=terminate_slave_thread(mi->rli.sql_thd,sql_lock,
+ sql_cond_lock,
+ &mi->rli.stop_cond,
+ &mi->rli.slave_running)) &&
+ !force_all)
+ return error;
+ }
+ return 0;
+}
+
+int terminate_slave_thread(THD* thd, pthread_mutex_t* term_lock,
+ pthread_mutex_t *cond_lock,
+ pthread_cond_t* term_cond,
+ volatile bool* slave_running)
+{
+ if (term_lock)
+ {
+ pthread_mutex_lock(term_lock);
+ if (!*slave_running)
+ {
+ pthread_mutex_unlock(term_lock);
+ return ER_SLAVE_NOT_RUNNING;
+ }
+ }
+ DBUG_ASSERT(thd != 0);
+ KICK_SLAVE(thd);
+ while (*slave_running)
+ {
+ /* there is a small chance that slave thread might miss the first
+ alarm. To protect againts it, resend the signal until it reacts
+ */
+ struct timespec abstime;
+#ifdef HAVE_TIMESPEC_TS_SEC
+ abstime.ts_sec=time(NULL)+2;
+ abstime.ts_nsec=0;
+#elif defined(__WIN__)
+ abstime.tv_sec=time((time_t*) 0)+2;
+ abstime.tv_nsec=0;
+#else
+ struct timeval tv;
+ gettimeofday(&tv,0);
+ abstime.tv_sec=tv.tv_sec+2;
+ abstime.tv_nsec=tv.tv_usec*1000;
+#endif
+ pthread_cond_timedwait(term_cond, cond_lock, &abstime);
+ if (*slave_running)
+ KICK_SLAVE(thd);
+ }
+ if (term_lock)
+ pthread_mutex_unlock(term_lock);
+ return 0;
+}
+
+int start_slave_thread(pthread_handler h_func, pthread_mutex_t* start_lock,
+ pthread_mutex_t *cond_lock,
+ pthread_cond_t* start_cond,
+ volatile bool* slave_running,
+ MASTER_INFO* mi)
+{
+ pthread_t th;
+ DBUG_ASSERT(mi->inited);
+ if (start_lock)
+ pthread_mutex_lock(start_lock);
+ if (!server_id)
+ {
+ if (start_cond)
+ pthread_cond_broadcast(start_cond);
+ if (start_lock)
+ pthread_mutex_unlock(start_lock);
+ sql_print_error("Server id not set, will not start slave");
+ return ER_BAD_SLAVE;
+ }
+
+ if (*slave_running)
+ {
+ if (start_cond)
+ pthread_cond_broadcast(start_cond);
+ if (start_lock)
+ pthread_mutex_unlock(start_lock);
+ return ER_SLAVE_MUST_STOP;
+ }
+ if (pthread_create(&th, &connection_attrib, h_func, (void*)mi))
+ {
+ if (start_lock)
+ pthread_mutex_unlock(start_lock);
+ return ER_SLAVE_THREAD;
+ }
+ if (start_cond && cond_lock)
+ {
+ THD* thd = current_thd;
+ while (!*slave_running)
+ {
+ const char* old_msg = thd->enter_cond(start_cond,cond_lock,
+ "Waiting for slave thread to start");
+ pthread_cond_wait(start_cond,cond_lock);
+ thd->exit_cond(old_msg);
+ // TODO: in a very rare case of init_slave_thread failing, it is
+ // possible that we can get stuck here since slave_running will not
+ // be set. We need to change slave_running to int and have -1 as
+ // error code
+ if (thd->killed)
+ {
+ pthread_mutex_unlock(cond_lock);
+ return ER_SERVER_SHUTDOWN;
+ }
+ }
+ }
+ if (start_lock)
+ pthread_mutex_unlock(start_lock);
+ return 0;
+}
+/* SLAVE_FORCE_ALL is not implemented here on purpose since it does not make
+ sense to do that for starting a slave - we always care if it actually
+ started the threads that were not previously running
+*/
+int start_slave_threads(bool need_slave_mutex, bool wait_for_start,
+ MASTER_INFO* mi, const char* master_info_fname,
+ const char* slave_info_fname, int thread_mask)
+{
+ pthread_mutex_t *lock_io=0,*lock_sql=0,*lock_cond_io=0,*lock_cond_sql=0;
+ pthread_cond_t* cond_io=0,*cond_sql=0;
+ int error=0;
+
+ if (need_slave_mutex)
+ {
+ lock_io = &mi->run_lock;
+ lock_sql = &mi->rli.run_lock;
+ }
+ if (wait_for_start)
+ {
+ cond_io = &mi->start_cond;
+ cond_sql = &mi->rli.start_cond;
+ lock_cond_io = &mi->run_lock;
+ lock_cond_sql = &mi->rli.run_lock;
+ }
+ if (init_master_info(mi,master_info_fname,slave_info_fname))
+ return ER_MASTER_INFO;
+
+ if ((thread_mask & SLAVE_IO) &&
+ (error=start_slave_thread(handle_slave_io,lock_io,lock_cond_io,
+ cond_io,&mi->slave_running,
+ mi)))
+ return error;
+ if ((thread_mask & SLAVE_SQL) &&
+ (error=start_slave_thread(handle_slave_sql,lock_sql,lock_cond_sql,
+ cond_sql,
+ &mi->rli.slave_running,mi)))
+ return error;
+ return 0;
+}
void init_table_rule_hash(HASH* h, bool* h_inited)
{
@@ -98,11 +451,11 @@ static TABLE_RULE_ENT* find_wild(DYNAMIC_ARRAY *a, const char* key, int len)
uint i;
const char* key_end = key + len;
- for(i = 0; i < a->elements; i++)
+ for (i = 0; i < a->elements; i++)
{
TABLE_RULE_ENT* e ;
get_dynamic(a, (gptr)&e, i);
- if(!wild_case_compare(key, key_end, (const char*)e->db,
+ if (!wild_case_compare(key, key_end, (const char*)e->db,
(const char*)(e->db + e->key_len),'\\'))
return e;
}
@@ -126,7 +479,7 @@ int tables_ok(THD* thd, TABLE_LIST* tables)
if (hash_search(&replicate_do_table, (byte*) hash_key, len))
return 1;
}
- if (ignore_table_inited) // if there are any do's
+ if (ignore_table_inited) // if there are any ignores
{
if (hash_search(&replicate_ignore_table, (byte*) hash_key, len))
return 0;
@@ -191,44 +544,52 @@ static void free_string_array(DYNAMIC_ARRAY *a)
delete_dynamic(a);
}
+static int end_slave_on_walk(MASTER_INFO* mi, gptr /*unused*/)
+{
+ end_master_info(mi);
+ return 0;
+}
+
void end_slave()
{
- pthread_mutex_lock(&LOCK_slave);
- if (slave_running)
- {
- abort_slave = 1;
- thr_alarm_kill(slave_real_id);
-#ifdef SIGNAL_WITH_VIO_CLOSE
- slave_thd->close_active_vio();
-#endif
- while (slave_running)
- pthread_cond_wait(&COND_slave_stopped, &LOCK_slave);
- }
- pthread_mutex_unlock(&LOCK_slave);
-
- end_master_info(&glob_mi);
- if(do_table_inited)
+ // TODO: replace the line below with
+ // list_walk(&master_list, (list_walk_action)end_slave_on_walk,0);
+ // once multi-master code is ready
+ terminate_slave_threads(active_mi,SLAVE_FORCE_ALL);
+ end_master_info(active_mi);
+ if (do_table_inited)
hash_free(&replicate_do_table);
- if(ignore_table_inited)
+ if (ignore_table_inited)
hash_free(&replicate_ignore_table);
- if(wild_do_table_inited)
+ if (wild_do_table_inited)
free_string_array(&replicate_wild_do_table);
- if(wild_ignore_table_inited)
+ if (wild_ignore_table_inited)
free_string_array(&replicate_wild_ignore_table);
}
-inline bool slave_killed(THD* thd)
+static inline bool slave_killed(THD* thd, MASTER_INFO* mi)
{
- return abort_slave || abort_loop || thd->killed;
+ DBUG_ASSERT(mi->io_thd == thd);
+ DBUG_ASSERT(mi->slave_running == 1); // tracking buffer overrun
+ return mi->abort_slave || abort_loop || thd->killed;
}
-void slave_print_error(int err_code, const char* msg, ...)
+static inline bool slave_killed(THD* thd, RELAY_LOG_INFO* rli)
+{
+ DBUG_ASSERT(rli->sql_thd == thd);
+ DBUG_ASSERT(rli->slave_running == 1);// tracking buffer overrun
+ return rli->abort_slave || abort_loop || thd->killed;
+}
+
+void slave_print_error(RELAY_LOG_INFO* rli, int err_code, const char* msg, ...)
{
va_list args;
va_start(args,msg);
- my_vsnprintf(last_slave_error, sizeof(last_slave_error), msg, args);
- sql_print_error("Slave: %s, error_code=%d", last_slave_error, err_code);
- last_slave_errno = err_code;
+ my_vsnprintf(rli->last_slave_error,
+ sizeof(rli->last_slave_error), msg, args);
+ sql_print_error("Slave: %s, error_code=%d", rli->last_slave_error,
+ err_code);
+ rli->last_slave_errno = err_code;
}
void skip_load_data_infile(NET* net)
@@ -476,16 +837,16 @@ err:
return error;
}
-int fetch_nx_table(THD* thd, const char* db_name, const char* table_name,
+int fetch_master_table(THD* thd, const char* db_name, const char* table_name,
MASTER_INFO* mi, MYSQL* mysql)
{
int error = 1;
- int nx_errno = 0;
+ int fetch_errno = 0;
bool called_connected = (mysql != NULL);
if (!called_connected && !(mysql = mc_mysql_init(NULL)))
{
- sql_print_error("fetch_nx_table: Error in mysql_init()");
- nx_errno = ER_GET_ERRNO;
+ sql_print_error("fetch_master_table: Error in mysql_init()");
+ fetch_errno = ER_GET_ERRNO;
goto err;
}
@@ -495,17 +856,17 @@ int fetch_nx_table(THD* thd, const char* db_name, const char* table_name,
{
sql_print_error("Could not connect to master while fetching table\
'%-64s.%-64s'", db_name, table_name);
- nx_errno = ER_CONNECT_TO_MASTER;
+ fetch_errno = ER_CONNECT_TO_MASTER;
goto err;
}
}
- if (slave_killed(thd))
+ if (thd->killed)
goto err;
if (request_table_dump(mysql, db_name, table_name))
{
- nx_errno = ER_GET_ERRNO;
- sql_print_error("fetch_nx_table: failed on table dump request ");
+ fetch_errno = ER_GET_ERRNO;
+ sql_print_error("fetch_master_table: failed on table dump request ");
goto err;
}
@@ -513,24 +874,25 @@ int fetch_nx_table(THD* thd, const char* db_name, const char* table_name,
table_name))
{
// create_table_from_dump will have sent the error alread
- sql_print_error("fetch_nx_table: failed on create table ");
+ sql_print_error("fetch_master_table: failed on create table ");
goto err;
}
-
error = 0;
-
err:
if (mysql && !called_connected)
mc_mysql_close(mysql);
- if (nx_errno && thd->net.vio)
- send_error(&thd->net, nx_errno, "Error in fetch_nx_table");
+ if (fetch_errno && thd->net.vio)
+ send_error(&thd->net, fetch_errno, "Error in fetch_master_table");
thd->net.no_send_ok = 0; // Clear up garbage after create_table_from_dump
return error;
}
void end_master_info(MASTER_INFO* mi)
{
- if(mi->fd >= 0)
+ if (!mi->inited)
+ return;
+ end_relay_log_info(&mi->rli);
+ if (mi->fd >= 0)
{
end_io_cache(&mi->file);
(void)my_close(mi->fd, MYF(MY_WME));
@@ -539,21 +901,136 @@ void end_master_info(MASTER_INFO* mi)
mi->inited = 0;
}
-int init_master_info(MASTER_INFO* mi)
+int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname)
+{
+ if (rli->inited)
+ return 0;
+ MY_STAT stat_area;
+ char fname[FN_REFLEN+128];
+ int info_fd;
+ const char* msg = 0;
+ int error = 0;
+
+ fn_format(fname, info_fname,
+ mysql_data_home, "", 4+32);
+ pthread_mutex_lock(&rli->data_lock);
+ info_fd = rli->info_fd;
+ rli->pending = 0;
+ rli->cur_log_fd = -1;
+ rli->slave_skip_counter=0;
+ rli->log_pos_current=0;
+ // TODO: make this work with multi-master
+ if (!opt_relay_logname)
+ {
+ char tmp[FN_REFLEN];
+ /* TODO: The following should be using fn_format(); We just need to
+ first change fn_format() to cut the file name if it's too long.
+ */
+ strmake(tmp,glob_hostname,FN_REFLEN-5);
+ strmov(strcend(tmp,'.'),"-relay-bin");
+ opt_relay_logname=my_strdup(tmp,MYF(MY_WME));
+ }
+ rli->relay_log.set_index_file_name(opt_relaylog_index_name);
+ open_log(&rli->relay_log, glob_hostname, opt_relay_logname, "-relay-bin",
+ LOG_BIN, 1 /* read_append cache */,
+ 1 /* no auto events*/);
+
+ /* if file does not exist */
+ if (!my_stat(fname, &stat_area, MYF(0)))
+ {
+ // if someone removed the file from underneath our feet, just close
+ // the old descriptor and re-create the old file
+ if (info_fd >= 0)
+ my_close(info_fd, MYF(MY_WME));
+ if ((info_fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0
+ || init_io_cache(&rli->info_file, info_fd, IO_SIZE*2, READ_CACHE, 0L,0,
+ MYF(MY_WME)))
+ {
+ if(info_fd >= 0)
+ my_close(info_fd, MYF(0));
+ rli->info_fd=-1;
+ pthread_mutex_unlock(&rli->data_lock);
+ return 1;
+ }
+ if (init_relay_log_pos(rli,"",4,0/*no data mutex*/,&msg))
+ goto err;
+ rli->master_log_pos = 0; // uninitialized
+ rli->info_fd = info_fd;
+ }
+ else // file exists
+ {
+ if(info_fd >= 0)
+ reinit_io_cache(&rli->info_file, READ_CACHE, 0L,0,0);
+ else if((info_fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0
+ || init_io_cache(&rli->info_file, info_fd,
+ IO_SIZE*2, READ_CACHE, 0L,
+ 0, MYF(MY_WME)))
+ {
+ if (info_fd >= 0)
+ my_close(info_fd, MYF(0));
+ rli->info_fd=-1;
+ pthread_mutex_unlock(&rli->data_lock);
+ return 1;
+ }
+
+ rli->info_fd = info_fd;
+ if (init_strvar_from_file(rli->relay_log_name,
+ sizeof(rli->relay_log_name), &rli->info_file,
+ (char*)"") ||
+ init_intvar_from_file((int*)&rli->relay_log_pos,
+ &rli->info_file, 4) ||
+ init_strvar_from_file(rli->master_log_name,
+ sizeof(rli->master_log_name), &rli->info_file,
+ (char*)"") ||
+ init_intvar_from_file((int*)&rli->master_log_pos,
+ &rli->info_file, 0))
+ {
+ msg="Error reading slave log configuration";
+ goto err;
+ }
+ if (init_relay_log_pos(rli,0 /*log already inited*/,
+ 0 /*pos already inited*/,
+ 0 /* no data lock*/,
+ &msg))
+ goto err;
+ }
+ DBUG_ASSERT(rli->relay_log_pos >= 4);
+ DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->relay_log_pos);
+ rli->inited = 1;
+ // now change the cache from READ to WRITE - must do this
+ // before flush_relay_log_info
+ reinit_io_cache(&rli->info_file, WRITE_CACHE,0L,0,1);
+ error=test(flush_relay_log_info(rli));
+ pthread_mutex_unlock(&rli->data_lock);
+ return error;
+
+err:
+ sql_print_error(msg);
+ end_io_cache(&rli->info_file);
+ my_close(info_fd, MYF(0));
+ rli->info_fd=-1;
+ pthread_mutex_unlock(&rli->data_lock);
+ return 1;
+}
+
+int init_master_info(MASTER_INFO* mi, const char* master_info_fname,
+ const char* slave_info_fname)
{
if (mi->inited)
return 0;
+ if (init_relay_log_info(&mi->rli, slave_info_fname))
+ return 1;
+ mi->rli.mi = mi;
int fd,length,error;
MY_STAT stat_area;
char fname[FN_REFLEN+128];
const char *msg;
- fn_format(fname, master_info_file, mysql_data_home, "", 4+32);
+ fn_format(fname, master_info_fname, mysql_data_home, "", 4+32);
// we need a mutex while we are changing master info parameters to
// keep other threads from reading bogus info
- pthread_mutex_lock(&mi->lock);
- mi->pending = 0;
+ pthread_mutex_lock(&mi->data_lock);
fd = mi->fd;
// we do not want any messages if the file does not exist
@@ -569,11 +1046,13 @@ int init_master_info(MASTER_INFO* mi)
{
if(fd >= 0)
my_close(fd, MYF(0));
- pthread_mutex_unlock(&mi->lock);
+ mi->fd=-1;
+ end_relay_log_info(&mi->rli);
+ pthread_mutex_unlock(&mi->data_lock);
return 1;
}
- mi->log_file_name[0] = 0;
- mi->pos = 4; // skip magic number
+ mi->master_log_name[0] = 0;
+ mi->master_log_pos = 4; // skip magic number
mi->fd = fd;
if (master_host)
@@ -595,28 +1074,19 @@ int init_master_info(MASTER_INFO* mi)
{
if(fd >= 0)
my_close(fd, MYF(0));
- pthread_mutex_unlock(&mi->lock);
+ mi->fd=-1;
+ end_relay_log_info(&mi->rli);
+ pthread_mutex_unlock(&mi->data_lock);
return 1;
}
-
- if ((length=my_b_gets(&mi->file, mi->log_file_name,
- sizeof(mi->log_file_name))) < 1)
- {
- msg="Error reading log file name from master info file ";
- goto error;
- }
-
- mi->log_file_name[length-1]= 0; // kill \n
- /* Reuse fname buffer */
- if(!my_b_gets(&mi->file, fname, sizeof(fname)))
- {
- msg="Error reading log file position from master info file";
- goto error;
- }
- mi->pos = strtoull(fname,(char**) 0, 10);
mi->fd = fd;
- if(init_strvar_from_file(mi->host, sizeof(mi->host), &mi->file,
+ if (init_strvar_from_file(mi->master_log_name,
+ sizeof(mi->master_log_name), &mi->file,
+ (char*)"") ||
+ init_intvar_from_file((int*)&mi->master_log_pos, &mi->file, 4)
+ ||
+ init_strvar_from_file(mi->host, sizeof(mi->host), &mi->file,
master_host) ||
init_strvar_from_file(mi->user, sizeof(mi->user), &mi->file,
master_user) ||
@@ -624,12 +1094,11 @@ int init_master_info(MASTER_INFO* mi)
master_password) ||
init_intvar_from_file((int*)&mi->port, &mi->file, master_port) ||
init_intvar_from_file((int*)&mi->connect_retry, &mi->file,
- master_connect_retry) ||
- init_intvar_from_file((int*)&mi->last_log_seq, &mi->file, 0)
+ master_connect_retry)
)
{
msg="Error reading master configuration";
- goto error;
+ goto err;
}
}
@@ -638,14 +1107,17 @@ int init_master_info(MASTER_INFO* mi)
// before flush_master_info
reinit_io_cache(&mi->file, WRITE_CACHE,0L,0,1);
error=test(flush_master_info(mi));
- pthread_mutex_unlock(&mi->lock);
+ pthread_mutex_unlock(&mi->data_lock);
return error;
-error:
+err:
sql_print_error(msg);
end_io_cache(&mi->file);
+ end_relay_log_info(&mi->rli);
+ DBUG_ASSERT(fd>=0);
my_close(fd, MYF(0));
- pthread_mutex_unlock(&mi->lock);
+ mi->fd=-1;
+ pthread_mutex_unlock(&mi->data_lock);
return 1;
}
@@ -654,14 +1126,14 @@ int register_slave_on_master(MYSQL* mysql)
String packet;
char buf[4];
- if(!report_host)
+ if (!report_host)
return 0;
int4store(buf, server_id);
packet.append(buf, 4);
net_store_data(&packet, report_host);
- if(report_user)
+ if (report_user)
net_store_data(&packet, report_user);
else
packet.append((char)0);
@@ -678,7 +1150,7 @@ int register_slave_on_master(MYSQL* mysql)
int4store(buf, 0); /* tell the master will fill in master_id */
packet.append(buf, 4);
- if(mc_simple_command(mysql, COM_REGISTER_SLAVE, (char*)packet.ptr(),
+ if (mc_simple_command(mysql, COM_REGISTER_SLAVE, (char*)packet.ptr(),
packet.length(), 0))
{
sql_print_error("Error on COM_REGISTER_SLAVE: '%s'",
@@ -689,52 +1161,61 @@ int register_slave_on_master(MYSQL* mysql)
return 0;
}
-
-int show_master_info(THD* thd)
+int show_master_info(THD* thd, MASTER_INFO* mi)
{
+ // TODO: fix this for multi-master
DBUG_ENTER("show_master_info");
List<Item> field_list;
field_list.push_back(new Item_empty_string("Master_Host",
- sizeof(glob_mi.host)));
+ sizeof(mi->host)));
field_list.push_back(new Item_empty_string("Master_User",
- sizeof(glob_mi.user)));
+ sizeof(mi->user)));
field_list.push_back(new Item_empty_string("Master_Port", 6));
field_list.push_back(new Item_empty_string("Connect_retry", 6));
- field_list.push_back( new Item_empty_string("Log_File",
+ field_list.push_back(new Item_empty_string("Master_Log_File",
FN_REFLEN));
- field_list.push_back(new Item_empty_string("Pos", 12));
- field_list.push_back(new Item_empty_string("Slave_Running", 3));
+ field_list.push_back(new Item_empty_string("Read_Master_Log_Pos", 12));
+ field_list.push_back(new Item_empty_string("Relay_Log_File",
+ FN_REFLEN));
+ field_list.push_back(new Item_empty_string("Relay_Log_Pos", 12));
+ field_list.push_back(new Item_empty_string("Relay_Master_Log_File",
+ FN_REFLEN));
+ field_list.push_back(new Item_empty_string("Slave_IO_Running", 3));
+ field_list.push_back(new Item_empty_string("Slave_SQL_Running", 3));
field_list.push_back(new Item_empty_string("Replicate_do_db", 20));
field_list.push_back(new Item_empty_string("Replicate_ignore_db", 20));
field_list.push_back(new Item_empty_string("Last_errno", 4));
field_list.push_back(new Item_empty_string("Last_error", 20));
field_list.push_back(new Item_empty_string("Skip_counter", 12));
- field_list.push_back(new Item_empty_string("Last_log_seq", 12));
+ field_list.push_back(new Item_empty_string("Exec_master_log_pos", 12));
if(send_fields(thd, field_list, 1))
DBUG_RETURN(-1);
String* packet = &thd->packet;
- uint32 last_log_seq;
packet->length(0);
- pthread_mutex_lock(&glob_mi.lock);
- net_store_data(packet, glob_mi.host);
- net_store_data(packet, glob_mi.user);
- net_store_data(packet, (uint32) glob_mi.port);
- net_store_data(packet, (uint32) glob_mi.connect_retry);
- net_store_data(packet, glob_mi.log_file_name);
- net_store_data(packet, (longlong) glob_mi.pos);
- last_log_seq = glob_mi.last_log_seq;
- pthread_mutex_unlock(&glob_mi.lock);
- pthread_mutex_lock(&LOCK_slave); // QQ; This is not needed
- net_store_data(packet, slave_running ? "Yes":"No");
- pthread_mutex_unlock(&LOCK_slave); // QQ; This is not needed
+ pthread_mutex_lock(&mi->data_lock);
+ pthread_mutex_lock(&mi->rli.data_lock);
+ net_store_data(packet, mi->host);
+ net_store_data(packet, mi->user);
+ net_store_data(packet, (uint32) mi->port);
+ net_store_data(packet, (uint32) mi->connect_retry);
+ net_store_data(packet, mi->master_log_name);
+ net_store_data(packet, (longlong) mi->master_log_pos);
+ net_store_data(packet, mi->rli.relay_log_name +
+ dirname_length(mi->rli.relay_log_name));
+ net_store_data(packet, (longlong) mi->rli.relay_log_pos);
+ net_store_data(packet, mi->rli.master_log_name);
+ net_store_data(packet, mi->slave_running ? "Yes":"No");
+ net_store_data(packet, mi->rli.slave_running ? "Yes":"No");
net_store_data(packet, &replicate_do_db);
net_store_data(packet, &replicate_ignore_db);
- net_store_data(packet, (uint32)last_slave_errno);
- net_store_data(packet, last_slave_error);
- net_store_data(packet, slave_skip_counter);
- net_store_data(packet, last_log_seq);
+ net_store_data(packet, (uint32)mi->rli.last_slave_errno);
+ net_store_data(packet, mi->rli.last_slave_error);
+ net_store_data(packet, mi->rli.slave_skip_counter);
+ net_store_data(packet, (longlong)mi->rli.master_log_pos);
+ pthread_mutex_unlock(&mi->rli.data_lock);
+ pthread_mutex_unlock(&mi->data_lock);
if (my_net_write(&thd->net, (char*)thd->packet.ptr(), packet->length()))
DBUG_RETURN(-1);
@@ -747,58 +1228,64 @@ int flush_master_info(MASTER_INFO* mi)
{
IO_CACHE* file = &mi->file;
char lbuf[22];
- char lbuf1[22];
my_b_seek(file, 0L);
my_b_printf(file, "%s\n%s\n%s\n%s\n%s\n%d\n%d\n%d\n",
- mi->log_file_name, llstr(mi->pos, lbuf), mi->host, mi->user,
- mi->password, mi->port, mi->connect_retry,
- llstr(mi->last_log_seq, lbuf1));
+ mi->master_log_name, llstr(mi->master_log_pos, lbuf),
+ mi->host, mi->user,
+ mi->password, mi->port, mi->connect_retry
+ );
flush_io_cache(file);
return 0;
}
-int st_master_info::wait_for_pos(THD* thd, String* log_name, ulonglong log_pos)
+/* TODO: the code below needs to be re-written almost from scratch
+ Main issue is how to find out if we have reached a certain position
+ in the master log my knowing the offset in the relay log.
+ */
+int st_relay_log_info::wait_for_pos(THD* thd, String* log_name,
+ ulonglong log_pos)
{
if (!inited) return -1;
- bool pos_reached;
+ bool pos_reached = 0;
int event_count = 0;
- pthread_mutex_lock(&lock);
- while(!thd->killed)
+ pthread_mutex_lock(&data_lock);
+ while (!thd->killed)
{
int cmp_result;
- if (*log_file_name)
+ DBUG_ASSERT(*master_log_name || master_log_pos == 0);
+ if (*master_log_name)
{
/*
We should use dirname_length() here when we have a version of
this that doesn't modify the argument */
- char *basename = strrchr(log_file_name, FN_LIBCHAR);
+ char *basename = strrchr(master_log_name, FN_LIBCHAR);
if (basename)
++basename;
else
- basename = log_file_name;
+ basename = master_log_name;
cmp_result = strncmp(basename, log_name->ptr(),
log_name->length());
}
else
cmp_result = 0;
- pos_reached = ((!cmp_result && pos >= log_pos) || cmp_result > 0);
+ pos_reached = ((!cmp_result && master_log_pos >= log_pos) ||
+ cmp_result > 0);
if (pos_reached || thd->killed)
break;
- const char* msg = thd->enter_cond(&cond, &lock,
+ const char* msg = thd->enter_cond(&data_cond, &data_lock,
"Waiting for master update");
- pthread_cond_wait(&cond, &lock);
+ pthread_cond_wait(&data_cond, &data_lock);
thd->exit_cond(msg);
event_count++;
}
- pthread_mutex_unlock(&lock);
+ pthread_mutex_unlock(&data_lock);
return thd->killed ? -1 : event_count;
}
-
-static int init_slave_thread(THD* thd)
+static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
{
DBUG_ENTER("init_slave_thread");
thd->system_thread = thd->bootstrap = 1;
@@ -812,7 +1299,7 @@ static int init_slave_thread(THD* thd)
thd->options = (((opt_log_slave_updates) ? OPTION_BIN_LOG:0) | OPTION_AUTO_IS_NULL) ;
thd->system_thread = 1;
thd->client_capabilities = CLIENT_LOCAL_FILES;
- slave_real_id=thd->real_id=pthread_self();
+ thd->real_id=pthread_self();
pthread_mutex_lock(&LOCK_thread_count);
thd->thread_id = thread_id++;
pthread_mutex_unlock(&LOCK_thread_count);
@@ -822,7 +1309,6 @@ static int init_slave_thread(THD* thd)
my_pthread_setspecific_ptr(THR_MALLOC, &thd->mem_root) ||
my_pthread_setspecific_ptr(THR_NET, &thd->net))
{
- close_connection(&thd->net,ER_OUT_OF_RESOURCES); // is this needed?
end_thread(thd,0);
DBUG_RETURN(-1);
}
@@ -839,14 +1325,21 @@ static int init_slave_thread(THD* thd)
if (thd->max_join_size == (ulong) ~0L)
thd->options |= OPTION_BIG_SELECTS;
- thd->proc_info="Waiting for master update";
+ if (thd_type == SLAVE_THD_SQL)
+ {
+ thd->proc_info = "Waiting for the next event in slave queue";
+ }
+ else
+ {
+ thd->proc_info="Waiting for master update";
+ }
thd->version=refresh_version;
thd->set_time();
DBUG_RETURN(0);
}
-static int safe_sleep(THD* thd, int sec)
+static int safe_sleep(THD* thd, MASTER_INFO* mi, int sec)
{
thr_alarm_t alarmed;
thr_alarm_init(&alarmed);
@@ -869,21 +1362,20 @@ static int safe_sleep(THD* thd, int sec)
if (thr_alarm_in_use(&alarmed))
thr_end_alarm(&alarmed);
- if (slave_killed(thd))
+ if (slave_killed(thd,mi))
return 1;
start_time=time((time_t*) 0);
}
return 0;
}
-
static int request_dump(MYSQL* mysql, MASTER_INFO* mi)
{
char buf[FN_REFLEN + 10];
int len;
int binlog_flags = 0; // for now
- char* logname = mi->log_file_name;
- int4store(buf, mi->pos);
+ char* logname = mi->master_log_name;
+ int4store(buf, mi->master_log_pos);
int2store(buf + 4, binlog_flags);
int4store(buf + 6, server_id);
len = (uint) strlen(logname);
@@ -929,7 +1421,6 @@ command");
return 0;
}
-
static ulong read_event(MYSQL* mysql, MASTER_INFO *mi)
{
ulong len = packet_error;
@@ -944,13 +1435,13 @@ static ulong read_event(MYSQL* mysql, MASTER_INFO *mi)
return packet_error;
#endif
- while (!abort_loop && !abort_slave && len == packet_error &&
+ while (!abort_loop && !mi->abort_slave && len == packet_error &&
read_errno == EINTR )
{
len = mc_net_safe_read(mysql);
read_errno = errno;
}
- if (abort_loop || abort_slave)
+ if (abort_loop || mi->abort_slave)
return packet_error;
if (len == packet_error || (long) len < 1)
{
@@ -973,65 +1464,74 @@ server_errno=%d)",
return len - 1;
}
-int check_expected_error(THD* thd, int expected_error)
+int check_expected_error(THD* thd, RELAY_LOG_INFO* rli, int expected_error)
{
- switch(expected_error)
+ switch (expected_error)
{
case ER_NET_READ_ERROR:
case ER_NET_ERROR_ON_WRITE:
case ER_SERVER_SHUTDOWN:
case ER_NEW_ABORTING_CONNECTION:
- my_snprintf(last_slave_error, sizeof(last_slave_error),
+ my_snprintf(rli->last_slave_error, sizeof(rli->last_slave_error),
"Slave: query '%s' partially completed on the master \
and was aborted. There is a chance that your master is inconsistent at this \
point. If you are sure that your master is ok, run this query manually on the\
slave and then restart the slave with SET SQL_SLAVE_SKIP_COUNTER=1;\
SLAVE START;", thd->query);
- last_slave_errno = expected_error;
- sql_print_error("%s",last_slave_error);
+ rli->last_slave_errno = expected_error;
+ sql_print_error("%s",rli->last_slave_error);
return 1;
default:
return 0;
}
}
-static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len)
+static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli)
{
const char *error_msg;
- Log_event * ev = Log_event::read_log_event((const char*)net->read_pos + 1,
- event_len, &error_msg,
- mi->old_format);
+ DBUG_ASSERT(rli->sql_thd==thd);
+ Log_event * ev = next_event(rli);
+ DBUG_ASSERT(rli->sql_thd==thd);
+ if (slave_killed(thd,rli))
+ return 1;
if (ev)
{
int type_code = ev->get_type_code();
int exec_res;
+ pthread_mutex_lock(&rli->data_lock);
if (ev->server_id == ::server_id ||
- (slave_skip_counter && type_code != ROTATE_EVENT))
+ (rli->slave_skip_counter && type_code != ROTATE_EVENT))
{
- if(type_code == LOAD_EVENT)
- skip_load_data_infile(net);
-
- mi->inc_pos(event_len, ev->log_seq);
- flush_master_info(mi);
- if(slave_skip_counter && /* protect against common user error of
+ /*
+ TODO: I/O thread must handle skipping file delivery for
+ old load data infile events
+ */
+ /* TODO: I/O thread should not even log events with the same server id */
+ rli->inc_pos(ev->get_event_len(),
+ type_code != STOP_EVENT ? ev->log_pos : 0,
+ 1/* skip lock*/);
+ flush_relay_log_info(rli);
+ if (rli->slave_skip_counter && /* protect against common user error of
setting the counter to 1 instead of 2
while recovering from an failed
auto-increment insert */
- !(type_code == INTVAR_EVENT &&
- slave_skip_counter == 1))
- --slave_skip_counter;
+ !((type_code == INTVAR_EVENT || type_code == STOP_EVENT) &&
+ rli->slave_skip_counter == 1))
+ --rli->slave_skip_counter;
+ pthread_mutex_unlock(&rli->data_lock);
delete ev;
return 0; // avoid infinite update loops
}
+ pthread_mutex_unlock(&rli->data_lock);
thd->server_id = ev->server_id; // use the original server id for logging
thd->set_time(); // time the query
- if(!thd->log_seq)
- thd->log_seq = ev->log_seq;
if (!ev->when)
ev->when = time(NULL);
ev->thd = thd;
- exec_res = ev->exec_event(mi);
+ thd->log_pos = ev->log_pos;
+ exec_res = ev->exec_event(rli);
+ DBUG_ASSERT(rli->sql_thd==thd);
delete ev;
return exec_res;
}
@@ -1044,167 +1544,148 @@ This may also be a network problem, or just a bug in the master or slave code.\
return 1;
}
}
-
-// slave thread
-pthread_handler_decl(handle_slave,arg __attribute__((unused)))
+/* slave I/O thread */
+pthread_handler_decl(handle_slave_io,arg)
{
#ifndef DBUG_OFF
slave_begin:
#endif
THD *thd; // needs to be first for thread_stack
MYSQL *mysql = NULL ;
+ MASTER_INFO* mi = (MASTER_INFO*)arg;
char llbuff[22];
-
- pthread_mutex_lock(&LOCK_slave);
- if (!server_id)
- {
- pthread_cond_broadcast(&COND_slave_start);
- pthread_mutex_unlock(&LOCK_slave);
- sql_print_error("Server id not set, will not start slave");
- pthread_exit((void*)1);
- }
+ bool retried_once = 0;
+ ulonglong last_failed_pos = 0; // TODO: see if last_failed_pos is needed
+ DBUG_ASSERT(mi->inited);
- if(slave_running)
- {
- pthread_cond_broadcast(&COND_slave_start);
- pthread_mutex_unlock(&LOCK_slave);
- pthread_exit((void*)1); // safety just in case
- }
- slave_running = 1;
- abort_slave = 0;
+ pthread_mutex_lock(&mi->run_lock);
#ifndef DBUG_OFF
- events_till_abort = abort_slave_event_count;
+ mi->events_till_abort = abort_slave_event_count;
#endif
- pthread_cond_broadcast(&COND_slave_start);
- pthread_mutex_unlock(&LOCK_slave);
-
- // int error = 1;
- bool retried_once = 0;
- ulonglong last_failed_pos = 0;
// needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
my_thread_init();
- slave_thd = thd = new THD; // note that contructor of THD uses DBUG_ !
- DBUG_ENTER("handle_slave");
+ thd = new THD; // note that contructor of THD uses DBUG_ !
+ DBUG_ENTER("handle_slave_io");
pthread_detach_this_thread();
- if (init_slave_thread(thd) || init_master_info(&glob_mi))
+ if (init_slave_thread(thd, SLAVE_THD_IO))
{
- sql_print_error("Failed during slave thread initialization");
+ pthread_cond_broadcast(&mi->start_cond);
+ pthread_mutex_unlock(&mi->run_lock);
+ sql_print_error("Failed during slave I/O thread initialization");
goto err;
}
+ mi->io_thd = thd;
thd->thread_stack = (char*)&thd; // remember where our stack is
- thd->temporary_tables = save_temporary_tables; // restore temp tables
threads.append(thd);
- glob_mi.pending = 0; //this should always be set to 0 when the slave thread
- // is started
+ mi->slave_running = 1;
+ mi->abort_slave = 0;
+ pthread_cond_broadcast(&mi->start_cond);
+ pthread_mutex_unlock(&mi->run_lock);
DBUG_PRINT("info",("master info: log_file_name=%s, position=%s",
- glob_mi.log_file_name, llstr(glob_mi.pos,llbuff)));
-
+ mi->master_log_name, llstr(mi->master_log_pos,llbuff)));
if (!(mysql = mc_mysql_init(NULL)))
{
- sql_print_error("Slave thread: error in mc_mysql_init()");
+ sql_print_error("Slave I/O thread: error in mc_mysql_init()");
goto err;
}
thd->proc_info = "connecting to master";
#ifndef DBUG_OFF
- sql_print_error("Slave thread initialized");
+ sql_print_error("Slave I/O thread initialized");
#endif
// we can get killed during safe_connect
- if (!safe_connect(thd, mysql, &glob_mi))
- sql_print_error("Slave: connected to master '%s@%s:%d',\
- replication started in log '%s' at position %s", glob_mi.user,
- glob_mi.host, glob_mi.port,
- RPL_LOG_NAME,
- llstr(glob_mi.pos,llbuff));
+ if (!safe_connect(thd, mysql, mi))
+ sql_print_error("Slave I/O thread: connected to master '%s@%s:%d',\
+ replication started in log '%s' at position %s", mi->user,
+ mi->host, mi->port,
+ IO_RPL_LOG_NAME,
+ llstr(mi->master_log_pos,llbuff));
else
{
- sql_print_error("Slave thread killed while connecting to master");
+ sql_print_error("Slave I/O thread killed while connecting to master");
goto err;
}
connected:
thd->slave_net = &mysql->net;
- // register ourselves with the master
- // if fails, this is not fatal - we just print the error message and go
- // on with life
thd->proc_info = "Checking master version";
- if (check_master_version(mysql, &glob_mi))
+ if (check_master_version(mysql, mi))
{
goto err;
}
- if (!glob_mi.old_format)
+ if (!mi->old_format)
{
+ // register ourselves with the master
+ // if fails, this is not fatal - we just print the error message and go
+ // on with life
thd->proc_info = "Registering slave on master";
if (register_slave_on_master(mysql) || update_slave_list(mysql))
goto err;
}
- while (!slave_killed(thd))
+ while (!slave_killed(thd,mi))
{
thd->proc_info = "Requesting binlog dump";
- if(request_dump(mysql, &glob_mi))
+ if (request_dump(mysql, mi))
{
sql_print_error("Failed on request_dump()");
- if(slave_killed(thd))
+ if(slave_killed(thd,mi))
{
- sql_print_error("Slave thread killed while requesting master \
+ sql_print_error("Slave I/O thread killed while requesting master \
dump");
goto err;
}
thd->proc_info = "Waiiting to reconnect after a failed dump request";
- if(mysql->net.vio)
- vio_close(mysql->net.vio);
+ mc_end_server(mysql);
// first time retry immediately, assuming that we can recover
// right away - if first time fails, sleep between re-tries
// hopefuly the admin can fix the problem sometime
- if(retried_once)
- safe_sleep(thd, glob_mi.connect_retry);
+ if (retried_once)
+ safe_sleep(thd, mi, mi->connect_retry);
else
retried_once = 1;
- if(slave_killed(thd))
+ if (slave_killed(thd,mi))
{
- sql_print_error("Slave thread killed while retrying master \
+ sql_print_error("Slave I/O thread killed while retrying master \
dump");
goto err;
}
thd->proc_info = "Reconnecting after a failed dump request";
- last_failed_pos=glob_mi.pos;
- sql_print_error("Slave: failed dump request, reconnecting to \
-try again, log '%s' at postion %s", RPL_LOG_NAME,
- llstr(last_failed_pos,llbuff));
- if(safe_reconnect(thd, mysql, &glob_mi) || slave_killed(thd))
+ sql_print_error("Slave I/O thread: failed dump request, \
+reconnecting to try again, log '%s' at postion %s", IO_RPL_LOG_NAME,
+ llstr(mi->master_log_pos,llbuff));
+ if (safe_reconnect(thd, mysql, mi) || slave_killed(thd,mi))
{
- sql_print_error("Slave thread killed during or after reconnect");
+ sql_print_error("Slave I/O thread killed during or \
+after reconnect");
goto err;
}
goto connected;
}
-
- while(!slave_killed(thd))
+ while (!slave_killed(thd,mi))
{
thd->proc_info = "Reading master update";
- ulong event_len = read_event(mysql, &glob_mi);
- if(slave_killed(thd))
+ ulong event_len = read_event(mysql, mi);
+ if (slave_killed(thd,mi))
{
- sql_print_error("Slave thread killed while reading event");
+ sql_print_error("Slave I/O thread killed while reading event");
goto err;
}
-
if (event_len == packet_error)
{
- if(mc_mysql_errno(mysql) == ER_NET_PACKET_TOO_LARGE)
+ if (mc_mysql_errno(mysql) == ER_NET_PACKET_TOO_LARGE)
{
sql_print_error("Log entry on master is longer than \
max_allowed_packet on slave. Slave thread will be aborted. If the entry is \
@@ -1214,99 +1695,72 @@ max_allowed_packet. The current value is %ld", max_allowed_packet);
}
thd->proc_info = "Waiting to reconnect after a failed read";
- if(mysql->net.vio)
- vio_close(mysql->net.vio);
- if(retried_once) // punish repeat offender with sleep
- safe_sleep(thd, glob_mi.connect_retry);
+ mc_end_server(mysql);
+ if (retried_once) // punish repeat offender with sleep
+ safe_sleep(thd,mi,mi->connect_retry);
else
retried_once = 1;
- if(slave_killed(thd))
+ if (slave_killed(thd,mi))
{
- sql_print_error("Slave thread killed while waiting to \
+ sql_print_error("Slave I/O thread killed while waiting to \
reconnect after a failed read");
goto err;
}
thd->proc_info = "Reconnecting after a failed read";
- last_failed_pos= glob_mi.pos;
- sql_print_error("Slave: Failed reading log event, \
-reconnecting to retry, log '%s' position %s", RPL_LOG_NAME,
- llstr(last_failed_pos, llbuff));
- if(safe_reconnect(thd, mysql, &glob_mi) || slave_killed(thd))
+ sql_print_error("Slave I/O thread: Failed reading log event, \
+reconnecting to retry, log '%s' position %s", IO_RPL_LOG_NAME,
+ llstr(mi->master_log_pos, llbuff));
+ if (safe_reconnect(thd, mysql, mi) || slave_killed(thd,mi))
{
- sql_print_error("Slave thread killed during or after a \
+ sql_print_error("Slave I/O thread killed during or after a \
reconnect done to recover from failed read");
goto err;
}
-
goto connected;
} // if(event_len == packet_error)
- thd->proc_info = "Processing master log event";
- if(exec_event(thd, &mysql->net, &glob_mi, event_len))
- {
- sql_print_error("\
-Error running query, slave aborted. Fix the problem, and re-start \
-the slave thread with \"mysqladmin start-slave\". We stopped at log \
-'%s' position %s",
- RPL_LOG_NAME, llstr(glob_mi.pos, llbuff));
- goto err;
- // there was an error running the query
- // abort the slave thread, when the problem is fixed, the user
- // should restart the slave with mysqladmin start-slave
- }
+ thd->proc_info = "Queueing event from master";
+ if (queue_event(mi,(const char*)mysql->net.read_pos + 1,
+ (uint)event_len))
+ {
+ sql_print_error("Slave I/O thread could not queue event \
+from master");
+ goto err;
+ }
+ // TODO: check debugging abort code
#ifndef DBUG_OFF
- if(abort_slave_event_count && !--events_till_abort)
+ if (abort_slave_event_count && !--events_till_abort)
{
- sql_print_error("Slave: debugging abort");
+ sql_print_error("Slave I/O thread: debugging abort");
goto err;
}
#endif
-
- // successful exec with offset advance,
- // the slave repents and his sins are forgiven!
- if(glob_mi.pos > last_failed_pos)
- {
- retried_once = 0;
-#ifndef DBUG_OFF
- stuck_count = 0;
-#endif
- }
-#ifndef DBUG_OFF
- else
- {
- // show a little mercy, allow slave to read one more event
- // before cutting him off - otherwise he gets stuck
- // on Intvar events, since they do not advance the offset
- // immediately
- if (++stuck_count > 2)
- events_till_disconnect++;
- }
-#endif
- } // while(!slave_killed(thd)) - read/exec loop
- } // while(!slave_killed(thd)) - slave loop
+ } // while(!slave_killed(thd,mi)) - read/exec loop
+ } // while(!slave_killed(thd,mi)) - slave loop
// error = 0;
err:
- // print the current replication position
- sql_print_error("Slave thread exiting, replication stopped in log '%s' at \
-position %s",
- RPL_LOG_NAME, llstr(glob_mi.pos,llbuff));
+ // print the current replication position
+ sql_print_error("Slave I/O thread exiting, read up to log '%s', position %s",
+ IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff));
thd->query = thd->db = 0; // extra safety
if(mysql)
mc_mysql_close(mysql);
thd->proc_info = "Waiting for slave mutex on exit";
- pthread_mutex_lock(&LOCK_slave);
- slave_running = 0;
+ pthread_mutex_lock(&mi->run_lock);
+ mi->slave_running = 0;
+ mi->io_thd = 0;
+ // TODO: make rpl_status part of MASTER_INFO
change_rpl_status(RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE);
- abort_slave = 0;
- save_temporary_tables = thd->temporary_tables;
- thd->temporary_tables = 0; // remove tempation from destructor to close them
- pthread_cond_broadcast(&COND_slave_stopped); // tell the world we are done
- pthread_mutex_unlock(&LOCK_slave);
+ mi->abort_slave = 0; // TODO: check if this is needed
+ DBUG_ASSERT(thd->net.buff != 0);
net_end(&thd->net); // destructor will not free it, because we are weird
- slave_thd = 0;
+ pthread_mutex_lock(&LOCK_thread_count);
delete thd;
+ pthread_mutex_unlock(&LOCK_thread_count);
+ pthread_cond_broadcast(&mi->stop_cond); // tell the world we are done
+ pthread_mutex_unlock(&mi->run_lock);
my_thread_end();
#ifndef DBUG_OFF
if(abort_slave_event_count && !events_till_abort)
@@ -1316,9 +1770,185 @@ position %s",
DBUG_RETURN(0); // Can't return anything here
}
+/* slave SQL logic thread */
-/* try to connect until successful or slave killed */
+pthread_handler_decl(handle_slave_sql,arg)
+{
+#ifndef DBUG_OFF
+ slave_begin:
+#endif
+ THD *thd; /* needs to be first for thread_stack */
+ MYSQL *mysql = NULL ;
+ bool retried_once = 0;
+ ulonglong last_failed_pos = 0; // TODO: see if this can be removed
+ char llbuff[22],llbuff1[22];
+ RELAY_LOG_INFO* rli = &((MASTER_INFO*)arg)->rli;
+ const char* errmsg=0;
+ DBUG_ASSERT(rli->inited);
+ pthread_mutex_lock(&rli->run_lock);
+ DBUG_ASSERT(!rli->slave_running);
+#ifndef DBUG_OFF
+ rli->events_till_abort = abort_slave_event_count;
+#endif
+
+
+ // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
+ my_thread_init();
+ thd = new THD; // note that contructor of THD uses DBUG_ !
+ DBUG_ENTER("handle_slave_sql");
+
+ pthread_detach_this_thread();
+ if (init_slave_thread(thd, SLAVE_THD_SQL))
+ {
+ // TODO: this is currently broken - slave start and change master
+ // will be stuck if we fail here
+ pthread_cond_broadcast(&rli->start_cond);
+ pthread_mutex_unlock(&rli->run_lock);
+ sql_print_error("Failed during slave thread initialization");
+ goto err;
+ }
+ thd->thread_stack = (char*)&thd; // remember where our stack is
+ thd->temporary_tables = rli->save_temporary_tables; // restore temp tables
+ threads.append(thd);
+ rli->sql_thd = thd;
+ rli->slave_running = 1;
+ rli->abort_slave = 0;
+ pthread_cond_broadcast(&rli->start_cond);
+ pthread_mutex_unlock(&rli->run_lock);
+ rli->pending = 0; //this should always be set to 0 when the slave thread
+ // is started
+ if (init_relay_log_pos(rli,0,0,1/*need data lock*/,&errmsg))
+ {
+ sql_print_error("Error initializing relay log position: %s",
+ errmsg);
+ goto err;
+ }
+ DBUG_ASSERT(rli->relay_log_pos >= 4);
+ DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->relay_log_pos);
+
+ DBUG_PRINT("info",("master info: log_file_name=%s, position=%s",
+ rli->master_log_name, llstr(rli->master_log_pos,llbuff)));
+ DBUG_ASSERT(rli->sql_thd == thd);
+ sql_print_error("Slave SQL thread initialized, starting replication in \
+log '%s' at position %s,relay log: name='%s',pos='%s'", RPL_LOG_NAME,
+ llstr(rli->master_log_pos,llbuff),rli->relay_log_name,
+ llstr(rli->relay_log_pos,llbuff1));
+ while (!slave_killed(thd,rli))
+ {
+ thd->proc_info = "Processing master log event";
+ DBUG_ASSERT(rli->sql_thd == thd);
+ if (exec_relay_log_event(thd,rli))
+ {
+ // do not scare the user if SQL thread was simply killed or stopped
+ if (!slave_killed(thd,rli))
+ sql_print_error("\
+Error running query, slave SQL thread aborted. Fix the problem, and restart \
+the slave SQL thread with \"mysqladmin start-slave\". We stopped at log \
+'%s' position %s",
+ RPL_LOG_NAME, llstr(rli->master_log_pos, llbuff));
+ goto err;
+ }
+ } // while(!slave_killed(thd,rli)) - read/exec loop
+
+ // error = 0;
+ err:
+ // print the current replication position
+ sql_print_error("Slave SQL thread exiting, replication stopped in log \
+ '%s' at position %s",
+ RPL_LOG_NAME, llstr(rli->master_log_pos,llbuff));
+ thd->query = thd->db = 0; // extra safety
+ thd->proc_info = "Waiting for slave mutex on exit";
+ pthread_mutex_lock(&rli->run_lock);
+ DBUG_ASSERT(rli->slave_running == 1); // tracking buffer overrun
+ rli->slave_running = 0;
+ rli->save_temporary_tables = thd->temporary_tables;
+ //TODO: see if we can do this conditionally in next_event() instead
+ // to avoid unneeded position re-init
+ rli->log_pos_current=0;
+ thd->temporary_tables = 0; // remove tempation from destructor to close them
+ DBUG_ASSERT(thd->net.buff != 0);
+ net_end(&thd->net); // destructor will not free it, because we are weird
+ DBUG_ASSERT(rli->sql_thd == thd);
+ rli->sql_thd = 0;
+ pthread_mutex_lock(&LOCK_thread_count);
+ delete thd;
+ pthread_mutex_unlock(&LOCK_thread_count);
+ pthread_cond_broadcast(&rli->stop_cond);
+ // tell the world we are done
+ pthread_mutex_unlock(&rli->run_lock);
+ my_thread_end();
+#ifndef DBUG_OFF // TODO: reconsider the code below
+ if (abort_slave_event_count && !rli->events_till_abort)
+ goto slave_begin;
+#endif
+ pthread_exit(0);
+ DBUG_RETURN(0); // Can't return anything here
+}
+int queue_event(MASTER_INFO* mi,const char* buf,uint event_len)
+{
+ int error;
+ bool inc_pos = 1;
+ if (mi->old_format)
+ return 1; // TODO: deal with old format
+
+ switch (buf[EVENT_TYPE_OFFSET])
+ {
+ case ROTATE_EVENT:
+ {
+ Rotate_log_event rev(buf,event_len,0);
+ if (!rev.is_valid())
+ return 1;
+ DBUG_ASSERT(rev.ident_len<sizeof(mi->master_log_name));
+ memcpy(mi->master_log_name,rev.new_log_ident,
+ rev.ident_len);
+ mi->master_log_name[rev.ident_len] = 0;
+ mi->master_log_pos = rev.pos;
+ inc_pos = 0;
+#ifndef DBUG_OFF
+ /* if we do not do this, we will be getting the first
+ rotate event forever, so
+ we need to not disconnect after one
+ */
+ if (disconnect_slave_event_count)
+ events_till_disconnect++;
+#endif
+ break;
+ }
+ default:
+ break;
+ }
+
+ if (!(error = mi->rli.relay_log.appendv(buf,event_len,0)))
+ {
+ if (inc_pos)
+ mi->master_log_pos += event_len;
+ }
+ return error;
+}
+
+void end_relay_log_info(RELAY_LOG_INFO* rli)
+{
+ if (!rli->inited)
+ return;
+ if (rli->info_fd >= 0)
+ {
+ end_io_cache(&rli->info_file);
+ (void)my_close(rli->info_fd, MYF(MY_WME));
+ rli->info_fd = -1;
+ }
+ if (rli->cur_log_fd >= 0)
+ {
+ end_io_cache(&rli->cache_buf);
+ (void)my_close(rli->cur_log_fd, MYF(MY_WME));
+ rli->cur_log_fd = -1;
+ }
+ rli->inited = 0;
+ rli->log_pos_current=0;
+ rli->relay_log.close(1);
+}
+
+/* try to connect until successful or slave killed */
static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
{
return connect_to_master(thd, mysql, mi, 0);
@@ -1328,7 +1958,6 @@ static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
Try to connect until successful or slave killed or we have retried
master_retry_count times
*/
-
static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
bool reconnect)
{
@@ -1337,15 +1966,10 @@ static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
ulong err_count=0;
char llbuff[22];
- /*
- If we lost connection after reading a state set event
- we will be re-reading it, so pending needs to be cleared
- */
- mi->pending = 0;
#ifndef DBUG_OFF
events_till_disconnect = disconnect_slave_event_count;
#endif
- while (!(slave_was_killed = slave_killed(thd)) &&
+ while (!(slave_was_killed = slave_killed(thd,mi)) &&
(reconnect ? mc_mysql_reconnect(mysql) :
!mc_mysql_connect(mysql, mi->host, mi->user, mi->password, 0,
mi->port, 0, 0)))
@@ -1353,12 +1977,13 @@ static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
/* Don't repeat last error */
if (mc_mysql_errno(mysql) != last_errno)
{
- sql_print_error("Slave thread: error connecting to master: \
-%s, last_errno=%d, retry in %d sec",
+ sql_print_error("Slave I/O thread: error connecting to master \
+'%s@%s:%d': \
+%s, last_errno=%d, retry in %d sec",mi->user,mi->host,mi->port,
mc_mysql_error(mysql), last_errno=mc_mysql_errno(mysql),
mi->connect_retry);
}
- safe_sleep(thd, mi->connect_retry);
+ safe_sleep(thd,mi,mi->connect_retry);
/* by default we try forever. The reason is that failure will trigger
master election, so if the user did not set master_retry_count we
do not want to have electioin triggered on the first failure to
@@ -1377,10 +2002,10 @@ static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
{
if (reconnect)
sql_print_error("Slave: connected to master '%s@%s:%d',\
-replication resumed in log '%s' at position %s", glob_mi.user,
- glob_mi.host, glob_mi.port,
- RPL_LOG_NAME,
- llstr(glob_mi.pos,llbuff));
+replication resumed in log '%s' at position %s", mi->user,
+ mi->host, mi->port,
+ IO_RPL_LOG_NAME,
+ llstr(mi->master_log_pos,llbuff));
else
{
change_rpl_status(RPL_IDLE_SLAVE,RPL_ACTIVE_SLAVE);
@@ -1405,6 +2030,175 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
return connect_to_master(thd, mysql, mi, 1);
}
+int flush_relay_log_info(RELAY_LOG_INFO* rli)
+{
+ IO_CACHE* file = &rli->info_file;
+ char lbuf[22],lbuf1[22];
+
+ my_b_seek(file, 0L);
+ my_b_printf(file, "%s\n%s\n%s\n%s\n",
+ rli->relay_log_name, llstr(rli->relay_log_pos, lbuf),
+ rli->master_log_name, llstr(rli->master_log_pos, lbuf1)
+ );
+ flush_io_cache(file);
+ flush_io_cache(rli->cur_log);
+ return 0;
+}
+
+IO_CACHE* reopen_relay_log(RELAY_LOG_INFO* rli, const char** errmsg)
+{
+ DBUG_ASSERT(rli->cur_log != &rli->cache_buf);
+ IO_CACHE* cur_log = rli->cur_log=&rli->cache_buf;
+ DBUG_ASSERT(rli->cur_log_fd == -1);
+ if ((rli->cur_log_fd=open_binlog(cur_log,rli->relay_log_name,
+ errmsg))<0)
+ return 0;
+ my_b_seek(cur_log,rli->relay_log_pos);
+ return cur_log;
+}
+
+Log_event* next_event(RELAY_LOG_INFO* rli)
+{
+ Log_event* ev;
+ IO_CACHE* cur_log = rli->cur_log;
+ pthread_mutex_t *log_lock = rli->relay_log.get_log_lock();
+ const char* errmsg=0;
+ THD* thd = rli->sql_thd;
+ bool was_killed;
+ DBUG_ASSERT(thd != 0);
+
+ // For most operations we need to protect rli members with data_lock,
+ // so we will hold it for the most of the loop below
+ // However, we will release it whenever it is worth the hassle,
+ // and in the cases when we go into a pthread_cond_wait() with the
+ // non-data_lock mutex
+ pthread_mutex_lock(&rli->data_lock);
+
+ for (;!(was_killed=slave_killed(thd,rli));)
+ {
+ // we can have two kinds of log reading:
+ // hot_log - rli->cur_log points at the IO_CACHE of relay_log, which
+ // is actively being updated by the I/O thread. We need to be careful
+ // in this case and make sure that we are not looking at a stale log that
+ // has already been rotated. If it has been, we reopen the log
+ // the other case is much simpler - we just have a read only log that
+ // nobody else will be updating.
+ bool hot_log;
+ if ((hot_log = (cur_log != &rli->cache_buf)))
+ {
+ DBUG_ASSERT(rli->cur_log_fd == -1); // foreign descriptor
+ pthread_mutex_lock(log_lock);
+ // reading cur_log->init_count here is safe because the log will only
+ // be rotated when we hold relay_log.LOCK_log
+ if (cur_log->init_count != rli->cur_log_init_count)
+ {
+ if (!(cur_log=reopen_relay_log(rli,&errmsg)))
+ {
+ pthread_mutex_unlock(log_lock);
+ goto err;
+ }
+ pthread_mutex_unlock(log_lock);
+ hot_log=0;
+ }
+ }
+ DBUG_ASSERT(my_b_tell(cur_log) >= 4);
+ DBUG_ASSERT(my_b_tell(cur_log) == rli->relay_log_pos + rli->pending);
+ if ((ev=Log_event::read_log_event(cur_log,0,rli->mi->old_format)))
+ {
+ DBUG_ASSERT(thd==rli->sql_thd);
+ if (hot_log)
+ pthread_mutex_unlock(log_lock);
+ pthread_mutex_unlock(&rli->data_lock);
+ return ev;
+ }
+ DBUG_ASSERT(thd==rli->sql_thd);
+ if (!cur_log->error) /* EOF */
+ {
+ // on a hot log, EOF means that there are no more updates to
+ // process and we must block until I/O thread adds some and
+ // signals us to continue
+ if (hot_log)
+ {
+ DBUG_ASSERT(cur_log->init_count == rli->cur_log_init_count);
+ //we can, and should release data_lock while we are waiting for
+ // update. If we do not, show slave status will block
+ pthread_mutex_unlock(&rli->data_lock);
+
+ // IMPORTANT: note that wait_for_update will unlock LOCK_log, but
+ // expects the caller to lock it
+ rli->relay_log.wait_for_update(rli->sql_thd);
+
+ // re-acquire data lock since we released it earlier
+ pthread_mutex_lock(&rli->data_lock);
+ continue;
+ }
+ // if the log was not hot, we need to move to the next log in
+ // sequence. The next log could be hot or cold, we deal with both
+ // cases separately after doing some common initialization
+ else
+ {
+ end_io_cache(cur_log);
+ DBUG_ASSERT(rli->cur_log_fd >= 0);
+ my_close(rli->cur_log_fd, MYF(MY_WME));
+ rli->cur_log_fd = -1;
+ int error;
+
+ // purge_first_log will properly set up relay log coordinates in rli
+ if (rli->relay_log.purge_first_log(rli))
+ {
+ errmsg = "Error purging processed log";
+ goto err;
+ }
+
+ // next log is hot
+ if (rli->relay_log.is_active(rli->linfo.log_file_name))
+ {
+#ifdef EXTRA_DEBUG
+ sql_print_error("next log '%s' is currently active",
+ rli->linfo.log_file_name);
+#endif
+ rli->cur_log = cur_log = rli->relay_log.get_log_file();
+ rli->cur_log_init_count = cur_log->init_count;
+ DBUG_ASSERT(rli->cur_log_fd == -1);
+
+ // read pointer has to be at the start since we are the only
+ // reader
+ if (check_binlog_magic(cur_log,&errmsg))
+ goto err;
+ continue;
+ }
+ // if we get here, the log was not hot, so we will have to
+ // open it ourselves
+#ifdef EXTRA_DEBUG
+ sql_print_error("next log '%s' is not active",
+ rli->linfo.log_file_name);
+#endif
+ // open_binlog() will check the magic header
+ if ((rli->cur_log_fd=open_binlog(cur_log,rli->linfo.log_file_name,
+ &errmsg))<0)
+ goto err;
+ }
+ }
+ else // read failed with a non-EOF error
+ {
+ // TODO: come up with something better to handle this error
+ sql_print_error("Slave SQL thread: I/O error reading \
+event(errno=%d,cur_log->error=%d)",
+ my_errno,cur_log->error);
+ // no need to hog the mutex while we sleep
+ pthread_mutex_unlock(&rli->data_lock);
+ safe_sleep(rli->sql_thd,rli->mi,1);
+ pthread_mutex_lock(&rli->data_lock);
+ }
+ }
+ if (!errmsg && was_killed)
+ errmsg = "slave SQL thread was killed";
+err:
+ pthread_mutex_unlock(&rli->data_lock);
+ sql_print_error("Error reading relay log event: %s", errmsg);
+ return 0;
+}
+
#ifdef __GNUC__
template class I_List_iterator<i_string>;