summary refs log tree commit diff
path: root/sql/slave.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/slave.cc')
-rw-r--r-- sql/slave.cc 1758
1 files changed, 1257 insertions, 501 deletions
diff --git a/sql/slave.cc b/sql/slave.cc
index f2c29146308..e68741e7434 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -1,15 +1,15 @@
/* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
-
+
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
-
+
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
-
+
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
@@ -24,29 +24,23 @@
#include "repl_failsafe.h"
#include <thr_alarm.h>
#include <my_dir.h>
+#include <assert.h>
-volatile bool slave_running = 0;
+volatile bool slave_sql_running = 0, slave_io_running = 0;
char* slave_load_tmpdir = 0;
-pthread_t slave_real_id;
-MASTER_INFO glob_mi;
-MY_BITMAP slave_error_mask;
-bool use_slave_mask = 0;
+MASTER_INFO main_mi;
+MASTER_INFO* active_mi;
+volatile int active_mi_in_use = 0;
HASH replicate_do_table, replicate_ignore_table;
DYNAMIC_ARRAY replicate_wild_do_table, replicate_wild_ignore_table;
bool do_table_inited = 0, ignore_table_inited = 0;
bool wild_do_table_inited = 0, wild_ignore_table_inited = 0;
bool table_rules_on = 0;
-uint32 slave_skip_counter = 0;
-
-/*
- When slave thread exits, we need to remember the temporary tables so we
- can re-use them on slave start
-*/
static TABLE* save_temporary_tables = 0;
+// when slave thread exits, we need to remember the temporary tables so we
+// can re-use them on slave start
-THD* slave_thd = 0;
-int last_slave_errno = 0;
-char last_slave_error[MAX_SLAVE_ERRMSG] = "";
+// TODO: move the vars below under MASTER_INFO
#ifndef DBUG_OFF
int disconnect_slave_event_count = 0, abort_slave_event_count = 0;
static int events_till_disconnect = -1;
@@ -54,15 +48,17 @@ int events_till_abort = -1;
static int stuck_count = 0;
#endif
+typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE;
void skip_load_data_infile(NET* net);
-inline bool slave_killed(THD* thd);
-static int init_slave_thread(THD* thd);
+static inline bool slave_killed(THD* thd,MASTER_INFO* mi);
+static inline bool slave_killed(THD* thd,RELAY_LOG_INFO* rli);
+static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type);
static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi);
static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi);
static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
bool reconnect);
-static int safe_sleep(THD* thd, int sec);
+static int safe_sleep(THD* thd, MASTER_INFO* mi, int sec);
static int request_table_dump(MYSQL* mysql, const char* db, const char* table);
static int create_table_from_dump(THD* thd, NET* net, const char* db,
const char* table_name);
@@ -70,6 +66,79 @@ static int check_master_version(MYSQL* mysql, MASTER_INFO* mi);
char* rewrite_db(char* db);
+void init_thread_mask(int* mask,MASTER_INFO* mi,bool inverse)
+{
+ bool set_io = mi->slave_running, set_sql = mi->rli.slave_running;
+ if (inverse)
+ {
+ /* This makes me think of the Russian idiom "I am not I, and this is
+ not my horse", which is used to deny responsibility for
+ one's actions.
+ */
+ set_io = !set_io;
+ set_sql = !set_sql;
+ }
+ register int tmp_mask=0;
+ if (set_io)
+ tmp_mask |= SLAVE_IO;
+ if (set_sql)
+ tmp_mask |= SLAVE_SQL;
+ *mask = tmp_mask;
+}
+
+void lock_slave_threads(MASTER_INFO* mi)
+{
+ //TODO: see if we can do this without dual mutex
+ pthread_mutex_lock(&mi->run_lock);
+ pthread_mutex_lock(&mi->rli.run_lock);
+}
+
+void unlock_slave_threads(MASTER_INFO* mi)
+{
+ //TODO: see if we can do this without dual mutex
+ pthread_mutex_unlock(&mi->rli.run_lock);
+ pthread_mutex_unlock(&mi->run_lock);
+}
+
+int init_slave()
+{
+ // TODO (multi-master): replace this with list initialization
+ active_mi = &main_mi;
+
+ // TODO: the code below is a copy-paste mess - clean it up
+ /*
+ make sure slave thread gets started if server_id is set,
+ valid master.info is present, and master_host has not been specified
+ */
+ if (server_id && !master_host)
+ {
+ // TODO: re-write this to iterate through the list of files
+ // for multi-master
+ char fname[FN_REFLEN+128];
+ MY_STAT stat_area;
+ fn_format(fname, master_info_file, mysql_data_home, "", 4+16+32);
+ if (my_stat(fname, &stat_area, MYF(0)) &&
+ !init_master_info(active_mi,master_info_file,relay_log_info_file))
+ master_host = active_mi->host;
+ }
+ // slave thread
+ if (master_host)
+ {
+ if (!opt_skip_slave_start && start_slave_threads(1 /* need mutex */,
+ 0 /* no wait for start*/,
+ active_mi,
+ master_info_file,
+ relay_log_info_file,
+ SLAVE_IO|SLAVE_SQL
+ ))
+ sql_print_error("Warning: Can't create threads to handle slave");
+ else if (opt_skip_slave_start)
+ if (init_master_info(active_mi, master_info_file, relay_log_info_file))
+ sql_print_error("Warning: failed to initialized master info");
+ }
+ return 0;
+}
+
static void free_table_ent(TABLE_RULE_ENT* e)
{
my_free((gptr) e, MYF(0));
@@ -82,37 +151,285 @@ static byte* get_table_key(TABLE_RULE_ENT* e, uint* len,
return (byte*)e->db;
}
+// TODO: check proper initialization of master_log_name/master_log_pos
+int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,
+ ulonglong pos, bool need_data_lock,
+ const char** errmsg)
+{
+ if (rli->log_pos_current)
+ return 0;
+ pthread_mutex_t *log_lock=rli->relay_log.get_log_lock();
+ pthread_mutex_lock(log_lock);
+ if (need_data_lock)
+ pthread_mutex_lock(&rli->data_lock);
+
+ if (rli->cur_log_fd >= 0)
+ {
+ end_io_cache(&rli->cache_buf);
+ my_close(rli->cur_log_fd, MYF(MY_WME));
+ rli->cur_log_fd = -1;
+ }
+
+ if (!log)
+ log = rli->relay_log_name; // already inited
+ if (!pos)
+ pos = rli->relay_log_pos; // already inited
+ else
+ rli->relay_log_pos = pos;
+ if (rli->relay_log.find_first_log(&rli->linfo,log))
+ {
+ *errmsg="Could not find first log during relay log initialization";
+ goto err;
+ }
+ strnmov(rli->relay_log_name,rli->linfo.log_file_name,
+ sizeof(rli->relay_log_name));
+ // to make end_io_cache(&rli->cache_buf) safe in all cases
+ if (!rli->inited)
+ bzero((char*) &rli->cache_buf, sizeof(IO_CACHE));
+ if (rli->relay_log.is_active(rli->linfo.log_file_name))
+ {
+ if (my_b_tell((rli->cur_log=rli->relay_log.get_log_file())) == 0 &&
+ check_binlog_magic(rli->cur_log,errmsg))
+ {
+ goto err;
+ }
+ rli->cur_log_init_count=rli->cur_log->init_count;
+ }
+ else
+ {
+ if (rli->inited)
+ end_io_cache(&rli->cache_buf);
+ if (rli->cur_log_fd>=0)
+ my_close(rli->cur_log_fd,MYF(MY_WME));
+ if ((rli->cur_log_fd=open_binlog(&rli->cache_buf,
+ rli->linfo.log_file_name,errmsg)) < 0)
+ {
+ goto err;
+ }
+ rli->cur_log = &rli->cache_buf;
+ }
+ if (pos > 4)
+ my_b_seek(rli->cur_log,(off_t)pos);
+ rli->log_pos_current=1;
+err:
+ pthread_cond_broadcast(&rli->data_cond);
+ if (need_data_lock)
+ pthread_mutex_unlock(&rli->data_lock);
+ pthread_mutex_unlock(log_lock);
+ return (*errmsg) ? 1 : 0;
+}
-/* called from get_options() in mysqld.cc on start-up */
-void init_slave_skip_errors(char* arg)
+// we assume we have a run lock on rli and that both slave threads
+// are not running
+int purge_relay_logs(RELAY_LOG_INFO* rli, bool just_reset, const char** errmsg)
{
- char* p;
- my_bool last_was_digit = 0;
- if (bitmap_init(&slave_error_mask,MAX_SLAVE_ERROR,0))
+ if (!rli->inited)
+ return 0; /* successfully do nothing */
+ DBUG_ASSERT(rli->slave_running == 0);
+ DBUG_ASSERT(rli->mi->slave_running == 0);
+ int error=0;
+ rli->slave_skip_counter=0;
+ pthread_mutex_lock(&rli->data_lock);
+ rli->pending=0;
+ rli->master_log_name[0]=0;
+ rli->master_log_pos=0; // 0 means uninitialized
+ if (rli->relay_log.reset_logs(rli->sql_thd) ||
+ rli->relay_log.find_first_log(&rli->linfo,""))
{
- fprintf(stderr, "Badly out of memory, please check your system status\n");
- exit(1);
+ *errmsg = "Failed during log reset";
+ error=1;
+ goto err;
}
- use_slave_mask = 1;
- for (;isspace(*arg);++arg)
- /* empty */;
- if (!my_casecmp(arg,"all",3))
+ strnmov(rli->relay_log_name,rli->linfo.log_file_name,
+ sizeof(rli->relay_log_name));
+ rli->relay_log_pos=4;
+ rli->log_pos_current=0;
+ if (!just_reset)
+ error = init_relay_log_pos(rli,0,0,0/*do not need data lock*/,errmsg);
+err:
+ pthread_mutex_unlock(&rli->data_lock);
+ return error;
+}
+
+int terminate_slave_threads(MASTER_INFO* mi,int thread_mask,bool skip_lock)
+{
+ if (!mi->inited)
+ return 0; /* successfully do nothing */
+ int error,force_all = (thread_mask & SLAVE_FORCE_ALL);
+ pthread_mutex_t *sql_lock = &mi->rli.run_lock, *io_lock = &mi->run_lock;
+ pthread_mutex_t *sql_cond_lock,*io_cond_lock;
+
+ sql_cond_lock=sql_lock;
+ io_cond_lock=io_lock;
+
+ if (skip_lock)
{
- bitmap_set_all(&slave_error_mask);
- return;
+ sql_lock = io_lock = 0;
}
- for (p= arg ; *p; )
+ if ((thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL)) && mi->slave_running)
{
- long err_code;
- if (!(p= str2int(p, 10, 0, LONG_MAX, &err_code)))
- break;
- if (err_code < MAX_SLAVE_ERROR)
- bitmap_set_bit(&slave_error_mask,(uint)err_code);
- while (!isdigit(*p) && *p)
- p++;
+ mi->abort_slave=1;
+ if ((error=terminate_slave_thread(mi->io_thd,io_lock,
+ io_cond_lock,
+ &mi->stop_cond,
+ &mi->slave_running)) &&
+ !force_all)
+ return error;
}
+ if ((thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL)) && mi->rli.slave_running)
+ {
+ DBUG_ASSERT(mi->rli.sql_thd != 0) ;
+ mi->rli.abort_slave=1;
+ if ((error=terminate_slave_thread(mi->rli.sql_thd,sql_lock,
+ sql_cond_lock,
+ &mi->rli.stop_cond,
+ &mi->rli.slave_running)) &&
+ !force_all)
+ return error;
+ }
+ return 0;
}
+int terminate_slave_thread(THD* thd, pthread_mutex_t* term_lock,
+ pthread_mutex_t *cond_lock,
+ pthread_cond_t* term_cond,
+ volatile bool* slave_running)
+{
+ if (term_lock)
+ {
+ pthread_mutex_lock(term_lock);
+ if (!*slave_running)
+ {
+ pthread_mutex_unlock(term_lock);
+ return ER_SLAVE_NOT_RUNNING;
+ }
+ }
+ DBUG_ASSERT(thd != 0);
+ KICK_SLAVE(thd);
+ while (*slave_running)
+ {
+ /* there is a small chance that slave thread might miss the first
+ alarm. To protect against it, resend the signal until it reacts
+ */
+ struct timespec abstime;
+#ifdef HAVE_TIMESPEC_TS_SEC
+ abstime.ts_sec=time(NULL)+2;
+ abstime.ts_nsec=0;
+#elif defined(__WIN__)
+ abstime.tv_sec=time((time_t*) 0)+2;
+ abstime.tv_nsec=0;
+#else
+ struct timeval tv;
+ gettimeofday(&tv,0);
+ abstime.tv_sec=tv.tv_sec+2;
+ abstime.tv_nsec=tv.tv_usec*1000;
+#endif
+ pthread_cond_timedwait(term_cond, cond_lock, &abstime);
+ if (*slave_running)
+ KICK_SLAVE(thd);
+ }
+ if (term_lock)
+ pthread_mutex_unlock(term_lock);
+ return 0;
+}
+
+int start_slave_thread(pthread_handler h_func, pthread_mutex_t* start_lock,
+ pthread_mutex_t *cond_lock,
+ pthread_cond_t* start_cond,
+ volatile bool* slave_running,
+ MASTER_INFO* mi)
+{
+ pthread_t th;
+ DBUG_ASSERT(mi->inited);
+ if (start_lock)
+ pthread_mutex_lock(start_lock);
+ if (!server_id)
+ {
+ if (start_cond)
+ pthread_cond_broadcast(start_cond);
+ if (start_lock)
+ pthread_mutex_unlock(start_lock);
+ sql_print_error("Server id not set, will not start slave");
+ return ER_BAD_SLAVE;
+ }
+
+ if (*slave_running)
+ {
+ if (start_cond)
+ pthread_cond_broadcast(start_cond);
+ if (start_lock)
+ pthread_mutex_unlock(start_lock);
+ return ER_SLAVE_MUST_STOP;
+ }
+ if (pthread_create(&th, &connection_attrib, h_func, (void*)mi))
+ {
+ if (start_lock)
+ pthread_mutex_unlock(start_lock);
+ return ER_SLAVE_THREAD;
+ }
+ if (start_cond && cond_lock)
+ {
+ THD* thd = current_thd;
+ while (!*slave_running)
+ {
+ const char* old_msg = thd->enter_cond(start_cond,cond_lock,
+ "Waiting for slave thread to start");
+ pthread_cond_wait(start_cond,cond_lock);
+ thd->exit_cond(old_msg);
+ // TODO: in a very rare case of init_slave_thread failing, it is
+ // possible that we can get stuck here since slave_running will not
+ // be set. We need to change slave_running to int and have -1 as
+ // error code
+ if (thd->killed)
+ {
+ pthread_mutex_unlock(cond_lock);
+ return ER_SERVER_SHUTDOWN;
+ }
+ }
+ }
+ if (start_lock)
+ pthread_mutex_unlock(start_lock);
+ return 0;
+}
+/* SLAVE_FORCE_ALL is not implemented here on purpose since it does not make
+ sense to do that for starting a slave - we always care if it actually
+ started the threads that were not previously running
+*/
+int start_slave_threads(bool need_slave_mutex, bool wait_for_start,
+ MASTER_INFO* mi, const char* master_info_fname,
+ const char* slave_info_fname, int thread_mask)
+{
+ pthread_mutex_t *lock_io=0,*lock_sql=0,*lock_cond_io=0,*lock_cond_sql=0;
+ pthread_cond_t* cond_io=0,*cond_sql=0;
+ int error=0;
+
+ if (need_slave_mutex)
+ {
+ lock_io = &mi->run_lock;
+ lock_sql = &mi->rli.run_lock;
+ }
+ if (wait_for_start)
+ {
+ cond_io = &mi->start_cond;
+ cond_sql = &mi->rli.start_cond;
+ lock_cond_io = &mi->run_lock;
+ lock_cond_sql = &mi->rli.run_lock;
+ }
+ if (init_master_info(mi,master_info_fname,slave_info_fname))
+ return ER_MASTER_INFO;
+
+ if ((thread_mask & SLAVE_IO) &&
+ (error=start_slave_thread(handle_slave_io,lock_io,lock_cond_io,
+ cond_io,&mi->slave_running,
+ mi)))
+ return error;
+ if ((thread_mask & SLAVE_SQL) &&
+ (error=start_slave_thread(handle_slave_sql,lock_sql,lock_cond_sql,
+ cond_sql,
+ &mi->rli.slave_running,mi)))
+ return error;
+ return 0;
+}
void init_table_rule_hash(HASH* h, bool* h_inited)
{
@@ -133,16 +450,16 @@ static TABLE_RULE_ENT* find_wild(DYNAMIC_ARRAY *a, const char* key, int len)
{
uint i;
const char* key_end = key + len;
-
+
for (i = 0; i < a->elements; i++)
- {
- TABLE_RULE_ENT* e ;
- get_dynamic(a, (gptr)&e, i);
- if (!wild_case_compare(key, key_end, (const char*)e->db,
- (const char*)(e->db + e->key_len),'\\'))
- return e;
- }
-
+ {
+ TABLE_RULE_ENT* e ;
+ get_dynamic(a, (gptr)&e, i);
+ if (!wild_case_compare(key, key_end, (const char*)e->db,
+ (const char*)(e->db + e->key_len),'\\'))
+ return e;
+ }
+
return 0;
}
@@ -150,10 +467,10 @@ int tables_ok(THD* thd, TABLE_LIST* tables)
{
for (; tables; tables = tables->next)
{
- char hash_key[2*NAME_LEN+2];
- char* p;
if (!tables->updating)
continue;
+ char hash_key[2*NAME_LEN+2];
+ char* p;
p = strmov(hash_key, tables->db ? tables->db : thd->db);
*p++ = '.';
uint len = strmov(p, tables->real_name) - hash_key ;
@@ -162,7 +479,7 @@ int tables_ok(THD* thd, TABLE_LIST* tables)
if (hash_search(&replicate_do_table, (byte*) hash_key, len))
return 1;
}
- if (ignore_table_inited) // if there are any do's
+ if (ignore_table_inited) // if there are any ignores
{
if (hash_search(&replicate_ignore_table, (byte*) hash_key, len))
return 0;
@@ -175,10 +492,9 @@ int tables_ok(THD* thd, TABLE_LIST* tables)
return 0;
}
- /*
- If no explicit rule found and there was a do list, do not replicate.
- If there was no do list, go ahead
- */
+ // if no explicit rule found
+ // and there was a do list, do not replicate. If there was
+ // no do list, go ahead
return !do_table_inited && !wild_do_table_inited;
}
@@ -186,14 +502,12 @@ int tables_ok(THD* thd, TABLE_LIST* tables)
int add_table_rule(HASH* h, const char* table_spec)
{
const char* dot = strchr(table_spec, '.');
- if (!dot)
- return 1;
+ if(!dot) return 1;
// len is always > 0 because we know the there exists a '.'
uint len = (uint)strlen(table_spec);
TABLE_RULE_ENT* e = (TABLE_RULE_ENT*)my_malloc(sizeof(TABLE_RULE_ENT)
+ len, MYF(MY_WME));
- if (!e)
- return 1;
+ if(!e) return 1;
e->db = (char*)e + sizeof(TABLE_RULE_ENT);
e->tbl_name = e->db + (dot - table_spec) + 1;
e->key_len = len;
@@ -205,12 +519,11 @@ int add_table_rule(HASH* h, const char* table_spec)
int add_wild_table_rule(DYNAMIC_ARRAY* a, const char* table_spec)
{
const char* dot = strchr(table_spec, '.');
- if (!dot) return 1;
+ if(!dot) return 1;
uint len = (uint)strlen(table_spec);
TABLE_RULE_ENT* e = (TABLE_RULE_ENT*)my_malloc(sizeof(TABLE_RULE_ENT)
+ len, MYF(MY_WME));
- if (!e)
- return 1;
+ if(!e) return 1;
e->db = (char*)e + sizeof(TABLE_RULE_ENT);
e->tbl_name = e->db + (dot - table_spec) + 1;
e->key_len = len;
@@ -222,31 +535,28 @@ int add_wild_table_rule(DYNAMIC_ARRAY* a, const char* table_spec)
static void free_string_array(DYNAMIC_ARRAY *a)
{
uint i;
- for (i = 0; i < a->elements; i++)
- {
- char* p;
- get_dynamic(a, (gptr) &p, i);
- my_free(p, MYF(MY_WME));
- }
+ for(i = 0; i < a->elements; i++)
+ {
+ char* p;
+ get_dynamic(a, (gptr) &p, i);
+ my_free(p, MYF(MY_WME));
+ }
delete_dynamic(a);
}
-void end_slave()
+static int end_slave_on_walk(MASTER_INFO* mi, gptr /*unused*/)
{
- pthread_mutex_lock(&LOCK_slave);
- if (slave_running)
- {
- abort_slave = 1;
- thr_alarm_kill(slave_real_id);
-#ifdef SIGNAL_WITH_VIO_CLOSE
- slave_thd->close_active_vio();
-#endif
- while (slave_running)
- pthread_cond_wait(&COND_slave_stopped, &LOCK_slave);
- }
- pthread_mutex_unlock(&LOCK_slave);
+ end_master_info(mi);
+ return 0;
+}
- end_master_info(&glob_mi);
+void end_slave()
+{
+ // TODO: replace the line below with
+ // list_walk(&master_list, (list_walk_action)end_slave_on_walk,0);
+ // once multi-master code is ready
+ terminate_slave_threads(active_mi,SLAVE_FORCE_ALL);
+ end_master_info(active_mi);
if (do_table_inited)
hash_free(&replicate_do_table);
if (ignore_table_inited)
@@ -257,18 +567,29 @@ void end_slave()
free_string_array(&replicate_wild_ignore_table);
}
-inline bool slave_killed(THD* thd)
+static inline bool slave_killed(THD* thd, MASTER_INFO* mi)
{
- return abort_slave || abort_loop || thd->killed;
+ DBUG_ASSERT(mi->io_thd == thd);
+ DBUG_ASSERT(mi->slave_running == 1); // tracking buffer overrun
+ return mi->abort_slave || abort_loop || thd->killed;
}
-void slave_print_error(int err_code, const char* msg, ...)
+static inline bool slave_killed(THD* thd, RELAY_LOG_INFO* rli)
+{
+ DBUG_ASSERT(rli->sql_thd == thd);
+ DBUG_ASSERT(rli->slave_running == 1);// tracking buffer overrun
+ return rli->abort_slave || abort_loop || thd->killed;
+}
+
+void slave_print_error(RELAY_LOG_INFO* rli, int err_code, const char* msg, ...)
{
va_list args;
va_start(args,msg);
- my_vsnprintf(last_slave_error, sizeof(last_slave_error), msg, args);
- sql_print_error("Slave: %s, error_code=%d", last_slave_error, err_code);
- last_slave_errno = err_code;
+ my_vsnprintf(rli->last_slave_error,
+ sizeof(rli->last_slave_error), msg, args);
+ sql_print_error("Slave: %s, error_code=%d", rli->last_slave_error,
+ err_code);
+ rli->last_slave_errno = err_code;
}
void skip_load_data_infile(NET* net)
@@ -281,15 +602,15 @@ void skip_load_data_infile(NET* net)
char* rewrite_db(char* db)
{
- if (replicate_rewrite_db.is_empty() || !db) return db;
+ if(replicate_rewrite_db.is_empty() || !db) return db;
I_List_iterator<i_string_pair> it(replicate_rewrite_db);
i_string_pair* tmp;
- while ((tmp=it++))
- {
- if (!strcmp(tmp->key, db))
- return tmp->val;
- }
+ while((tmp=it++))
+ {
+ if(!strcmp(tmp->key, db))
+ return tmp->val;
+ }
return db;
}
@@ -297,39 +618,39 @@ char* rewrite_db(char* db)
int db_ok(const char* db, I_List<i_string> &do_list,
I_List<i_string> &ignore_list )
{
- if (do_list.is_empty() && ignore_list.is_empty())
+ if(do_list.is_empty() && ignore_list.is_empty())
return 1; // ok to replicate if the user puts no constraints
// if the user has specified restrictions on which databases to replicate
// and db was not selected, do not replicate
- if (!db)
+ if(!db)
return 0;
- if (!do_list.is_empty()) // if the do's are not empty
- {
- I_List_iterator<i_string> it(do_list);
- i_string* tmp;
-
- while ((tmp=it++))
+ if(!do_list.is_empty()) // if the do's are not empty
{
- if (!strcmp(tmp->ptr, db))
- return 1; // match
+ I_List_iterator<i_string> it(do_list);
+ i_string* tmp;
+
+ while((tmp=it++))
+ {
+ if(!strcmp(tmp->ptr, db))
+ return 1; // match
+ }
+ return 0;
}
- return 0;
- }
else // there are some elements in the don't, otherwise we cannot get here
- {
- I_List_iterator<i_string> it(ignore_list);
- i_string* tmp;
-
- while ((tmp=it++))
{
- if (!strcmp(tmp->ptr, db))
- return 0; // match
- }
+ I_List_iterator<i_string> it(ignore_list);
+ i_string* tmp;
- return 1;
- }
+ while((tmp=it++))
+ {
+ if(!strcmp(tmp->ptr, db))
+ return 0; // match
+ }
+
+ return 1;
+ }
}
static int init_strvar_from_file(char* var, int max_size, IO_CACHE* f,
@@ -346,7 +667,7 @@ static int init_strvar_from_file(char* var, int max_size, IO_CACHE* f,
// if we truncated a line or stopped on last char, remove all chars
// up to and including newline
int c;
- while (((c=my_b_get(f)) != '\n' && c != my_b_EOF));
+ while( ((c=my_b_get(f)) != '\n' && c != my_b_EOF));
}
return 0;
}
@@ -361,13 +682,13 @@ static int init_strvar_from_file(char* var, int max_size, IO_CACHE* f,
static int init_intvar_from_file(int* var, IO_CACHE* f, int default_val)
{
char buf[32];
-
+
if (my_b_gets(f, buf, sizeof(buf)))
{
*var = atoi(buf);
return 0;
}
- else if (default_val)
+ else if(default_val)
{
*var = default_val;
return 0;
@@ -381,7 +702,7 @@ static int check_master_version(MYSQL* mysql, MASTER_INFO* mi)
MYSQL_ROW row;
const char* version;
const char* errmsg = 0;
-
+
if (mc_mysql_query(mysql, "SELECT VERSION()", 0)
|| !(res = mc_mysql_store_result(mysql)))
{
@@ -399,7 +720,7 @@ static int check_master_version(MYSQL* mysql, MASTER_INFO* mi)
errmsg = "Master reported NULL for the version";
goto err;
}
-
+
switch (*version)
{
case '3':
@@ -434,7 +755,7 @@ static int create_table_from_dump(THD* thd, NET* net, const char* db,
int error= 1;
handler *file;
uint save_options;
-
+
if (packet_len == packet_error)
{
send_error(&thd->net, ER_MASTER_NET_READ);
@@ -459,7 +780,7 @@ static int create_table_from_dump(THD* thd, NET* net, const char* db,
thd->current_tablenr = 0;
thd->query_error = 0;
thd->net.no_send_ok = 1;
-
+
/* we do not want to log create table statement */
save_options = thd->options;
thd->options &= ~OPTION_BIN_LOG;
@@ -470,7 +791,7 @@ static int create_table_from_dump(THD* thd, NET* net, const char* db,
mysql_parse(thd, thd->query, packet_len); // run create table
thd->db = save_db; // leave things the way the were before
thd->options = save_options;
-
+
if (thd->query_error)
goto err; // mysql_parse took care of the error send
@@ -485,7 +806,7 @@ static int create_table_from_dump(THD* thd, NET* net, const char* db,
sql_print_error("create_table_from_dump: could not open created table");
goto err;
}
-
+
file = tables.table->file;
thd->proc_info = "Reading master dump table data";
if (file->net_read_dump(net))
@@ -516,16 +837,16 @@ err:
return error;
}
-int fetch_nx_table(THD* thd, const char* db_name, const char* table_name,
+int fetch_master_table(THD* thd, const char* db_name, const char* table_name,
MASTER_INFO* mi, MYSQL* mysql)
{
int error = 1;
- int nx_errno = 0;
+ int fetch_errno = 0;
bool called_connected = (mysql != NULL);
if (!called_connected && !(mysql = mc_mysql_init(NULL)))
{
- sql_print_error("fetch_nx_table: Error in mysql_init()");
- nx_errno = ER_GET_ERRNO;
+ sql_print_error("fetch_master_table: Error in mysql_init()");
+ fetch_errno = ER_GET_ERRNO;
goto err;
}
@@ -535,17 +856,17 @@ int fetch_nx_table(THD* thd, const char* db_name, const char* table_name,
{
sql_print_error("Could not connect to master while fetching table\
'%-64s.%-64s'", db_name, table_name);
- nx_errno = ER_CONNECT_TO_MASTER;
+ fetch_errno = ER_CONNECT_TO_MASTER;
goto err;
}
}
- if (slave_killed(thd))
+ if (thd->killed)
goto err;
if (request_table_dump(mysql, db_name, table_name))
{
- nx_errno = ER_GET_ERRNO;
- sql_print_error("fetch_nx_table: failed on table dump request ");
+ fetch_errno = ER_GET_ERRNO;
+ sql_print_error("fetch_master_table: failed on table dump request ");
goto err;
}
@@ -553,23 +874,24 @@ int fetch_nx_table(THD* thd, const char* db_name, const char* table_name,
table_name))
{
// create_table_from_dump will have sent the error alread
- sql_print_error("fetch_nx_table: failed on create table ");
+ sql_print_error("fetch_master_table: failed on create table ");
goto err;
}
-
error = 0;
-
err:
if (mysql && !called_connected)
mc_mysql_close(mysql);
- if (nx_errno && thd->net.vio)
- send_error(&thd->net, nx_errno, "Error in fetch_nx_table");
+ if (fetch_errno && thd->net.vio)
+ send_error(&thd->net, fetch_errno, "Error in fetch_master_table");
thd->net.no_send_ok = 0; // Clear up garbage after create_table_from_dump
return error;
}
void end_master_info(MASTER_INFO* mi)
{
+ if (!mi->inited)
+ return;
+ end_relay_log_info(&mi->rli);
if (mi->fd >= 0)
{
end_io_cache(&mi->file);
@@ -579,23 +901,138 @@ void end_master_info(MASTER_INFO* mi)
mi->inited = 0;
}
-int init_master_info(MASTER_INFO* mi)
+int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname)
+{
+ if (rli->inited)
+ return 0;
+ MY_STAT stat_area;
+ char fname[FN_REFLEN+128];
+ int info_fd;
+ const char* msg = 0;
+ int error = 0;
+
+ fn_format(fname, info_fname,
+ mysql_data_home, "", 4+32);
+ pthread_mutex_lock(&rli->data_lock);
+ info_fd = rli->info_fd;
+ rli->pending = 0;
+ rli->cur_log_fd = -1;
+ rli->slave_skip_counter=0;
+ rli->log_pos_current=0;
+ // TODO: make this work with multi-master
+ if (!opt_relay_logname)
+ {
+ char tmp[FN_REFLEN];
+ /* TODO: The following should be using fn_format(); We just need to
+ first change fn_format() to cut the file name if it's too long.
+ */
+ strmake(tmp,glob_hostname,FN_REFLEN-5);
+ strmov(strcend(tmp,'.'),"-relay-bin");
+ opt_relay_logname=my_strdup(tmp,MYF(MY_WME));
+ }
+ rli->relay_log.set_index_file_name(opt_relaylog_index_name);
+ open_log(&rli->relay_log, glob_hostname, opt_relay_logname, "-relay-bin",
+ LOG_BIN, 1 /* read_append cache */,
+ 1 /* no auto events*/);
+
+ /* if file does not exist */
+ if (!my_stat(fname, &stat_area, MYF(0)))
+ {
+ // if someone removed the file from underneath our feet, just close
+ // the old descriptor and re-create the old file
+ if (info_fd >= 0)
+ my_close(info_fd, MYF(MY_WME));
+ if ((info_fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0
+ || init_io_cache(&rli->info_file, info_fd, IO_SIZE*2, READ_CACHE, 0L,0,
+ MYF(MY_WME)))
+ {
+ if(info_fd >= 0)
+ my_close(info_fd, MYF(0));
+ rli->info_fd=-1;
+ pthread_mutex_unlock(&rli->data_lock);
+ return 1;
+ }
+ if (init_relay_log_pos(rli,"",4,0/*no data mutex*/,&msg))
+ goto err;
+ rli->master_log_pos = 0; // uninitialized
+ rli->info_fd = info_fd;
+ }
+ else // file exists
+ {
+ if(info_fd >= 0)
+ reinit_io_cache(&rli->info_file, READ_CACHE, 0L,0,0);
+ else if((info_fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0
+ || init_io_cache(&rli->info_file, info_fd,
+ IO_SIZE*2, READ_CACHE, 0L,
+ 0, MYF(MY_WME)))
+ {
+ if (info_fd >= 0)
+ my_close(info_fd, MYF(0));
+ rli->info_fd=-1;
+ pthread_mutex_unlock(&rli->data_lock);
+ return 1;
+ }
+
+ rli->info_fd = info_fd;
+ if (init_strvar_from_file(rli->relay_log_name,
+ sizeof(rli->relay_log_name), &rli->info_file,
+ (char*)"") ||
+ init_intvar_from_file((int*)&rli->relay_log_pos,
+ &rli->info_file, 4) ||
+ init_strvar_from_file(rli->master_log_name,
+ sizeof(rli->master_log_name), &rli->info_file,
+ (char*)"") ||
+ init_intvar_from_file((int*)&rli->master_log_pos,
+ &rli->info_file, 0))
+ {
+ msg="Error reading slave log configuration";
+ goto err;
+ }
+ if (init_relay_log_pos(rli,0 /*log already inited*/,
+ 0 /*pos already inited*/,
+ 0 /* no data lock*/,
+ &msg))
+ goto err;
+ }
+ DBUG_ASSERT(rli->relay_log_pos >= 4);
+ DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->relay_log_pos);
+ rli->inited = 1;
+ // now change the cache from READ to WRITE - must do this
+ // before flush_relay_log_info
+ reinit_io_cache(&rli->info_file, WRITE_CACHE,0L,0,1);
+ error=test(flush_relay_log_info(rli));
+ pthread_mutex_unlock(&rli->data_lock);
+ return error;
+
+err:
+ sql_print_error(msg);
+ end_io_cache(&rli->info_file);
+ my_close(info_fd, MYF(0));
+ rli->info_fd=-1;
+ pthread_mutex_unlock(&rli->data_lock);
+ return 1;
+}
+
+int init_master_info(MASTER_INFO* mi, const char* master_info_fname,
+ const char* slave_info_fname)
{
if (mi->inited)
return 0;
+ if (init_relay_log_info(&mi->rli, slave_info_fname))
+ return 1;
+ mi->rli.mi = mi;
int fd,length,error;
MY_STAT stat_area;
char fname[FN_REFLEN+128];
const char *msg;
- fn_format(fname, master_info_file, mysql_data_home, "", 4+32);
+ fn_format(fname, master_info_fname, mysql_data_home, "", 4+32);
// we need a mutex while we are changing master info parameters to
// keep other threads from reading bogus info
- pthread_mutex_lock(&mi->lock);
- mi->pending = 0;
+ pthread_mutex_lock(&mi->data_lock);
fd = mi->fd;
-
+
// we do not want any messages if the file does not exist
if (!my_stat(fname, &stat_area, MYF(0)))
{
@@ -607,15 +1044,17 @@ int init_master_info(MASTER_INFO* mi)
|| init_io_cache(&mi->file, fd, IO_SIZE*2, READ_CACHE, 0L,0,
MYF(MY_WME)))
{
- if (fd >= 0)
+ if(fd >= 0)
my_close(fd, MYF(0));
- pthread_mutex_unlock(&mi->lock);
+ mi->fd=-1;
+ end_relay_log_info(&mi->rli);
+ pthread_mutex_unlock(&mi->data_lock);
return 1;
}
- mi->log_file_name[0] = 0;
- mi->pos = 4; // skip magic number
+ mi->master_log_name[0] = 0;
+ mi->master_log_pos = 4; // skip magic number
mi->fd = fd;
-
+
if (master_host)
strmake(mi->host, master_host, sizeof(mi->host) - 1);
if (master_user)
@@ -627,36 +1066,27 @@ int init_master_info(MASTER_INFO* mi)
}
else // file exists
{
- if (fd >= 0)
+ if(fd >= 0)
reinit_io_cache(&mi->file, READ_CACHE, 0L,0,0);
- else if ((fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0
+ else if((fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0
|| init_io_cache(&mi->file, fd, IO_SIZE*2, READ_CACHE, 0L,
0, MYF(MY_WME)))
{
- if (fd >= 0)
+ if(fd >= 0)
my_close(fd, MYF(0));
- pthread_mutex_unlock(&mi->lock);
+ mi->fd=-1;
+ end_relay_log_info(&mi->rli);
+ pthread_mutex_unlock(&mi->data_lock);
return 1;
}
- if ((length=my_b_gets(&mi->file, mi->log_file_name,
- sizeof(mi->log_file_name))) < 1)
- {
- msg="Error reading log file name from master info file ";
- goto error;
- }
-
- mi->log_file_name[length-1]= 0; // kill \n
- /* Reuse fname buffer */
- if (!my_b_gets(&mi->file, fname, sizeof(fname)))
- {
- msg="Error reading log file position from master info file";
- goto error;
- }
- mi->pos = strtoull(fname,(char**) 0, 10);
-
mi->fd = fd;
- if (init_strvar_from_file(mi->host, sizeof(mi->host), &mi->file,
+ if (init_strvar_from_file(mi->master_log_name,
+ sizeof(mi->master_log_name), &mi->file,
+ (char*)"") ||
+ init_intvar_from_file((int*)&mi->master_log_pos, &mi->file, 4)
+ ||
+ init_strvar_from_file(mi->host, sizeof(mi->host), &mi->file,
master_host) ||
init_strvar_from_file(mi->user, sizeof(mi->user), &mi->file,
master_user) ||
@@ -664,28 +1094,30 @@ int init_master_info(MASTER_INFO* mi)
master_password) ||
init_intvar_from_file((int*)&mi->port, &mi->file, master_port) ||
init_intvar_from_file((int*)&mi->connect_retry, &mi->file,
- master_connect_retry) ||
- init_intvar_from_file((int*)&mi->last_log_seq, &mi->file, 0)
+ master_connect_retry)
)
{
msg="Error reading master configuration";
- goto error;
+ goto err;
}
}
-
+
mi->inited = 1;
// now change the cache from READ to WRITE - must do this
// before flush_master_info
reinit_io_cache(&mi->file, WRITE_CACHE,0L,0,1);
error=test(flush_master_info(mi));
- pthread_mutex_unlock(&mi->lock);
+ pthread_mutex_unlock(&mi->data_lock);
return error;
-error:
+err:
sql_print_error(msg);
end_io_cache(&mi->file);
+ end_relay_log_info(&mi->rli);
+ DBUG_ASSERT(fd>=0);
my_close(fd, MYF(0));
- pthread_mutex_unlock(&mi->lock);
+ mi->fd=-1;
+ pthread_mutex_unlock(&mi->data_lock);
return 1;
}
@@ -696,7 +1128,7 @@ int register_slave_on_master(MYSQL* mysql)
if (!report_host)
return 0;
-
+
int4store(buf, server_id);
packet.append(buf, 4);
@@ -705,8 +1137,8 @@ int register_slave_on_master(MYSQL* mysql)
net_store_data(&packet, report_user);
else
packet.append((char)0);
-
- if (report_password)
+
+ if(report_password)
net_store_data(&packet, report_user);
else
packet.append((char)0);
@@ -729,51 +1161,62 @@ int register_slave_on_master(MYSQL* mysql)
return 0;
}
-
-int show_master_info(THD* thd)
+int show_master_info(THD* thd, MASTER_INFO* mi)
{
+ // TODO: fix this for multi-master
DBUG_ENTER("show_master_info");
List<Item> field_list;
field_list.push_back(new Item_empty_string("Master_Host",
- sizeof(glob_mi.host)));
+ sizeof(mi->host)));
field_list.push_back(new Item_empty_string("Master_User",
- sizeof(glob_mi.user)));
+ sizeof(mi->user)));
field_list.push_back(new Item_empty_string("Master_Port", 6));
field_list.push_back(new Item_empty_string("Connect_retry", 6));
- field_list.push_back( new Item_empty_string("Log_File",
+ field_list.push_back(new Item_empty_string("Master_Log_File",
+ FN_REFLEN));
+ field_list.push_back(new Item_empty_string("Read_Master_Log_Pos", 12));
+ field_list.push_back(new Item_empty_string("Relay_Log_File",
FN_REFLEN));
- field_list.push_back(new Item_empty_string("Pos", 12));
- field_list.push_back(new Item_empty_string("Slave_Running", 3));
+ field_list.push_back(new Item_empty_string("Relay_Log_Pos", 12));
+ field_list.push_back(new Item_empty_string("Relay_Master_Log_File",
+ FN_REFLEN));
+ field_list.push_back(new Item_empty_string("Slave_IO_Running", 3));
+ field_list.push_back(new Item_empty_string("Slave_SQL_Running", 3));
field_list.push_back(new Item_empty_string("Replicate_do_db", 20));
field_list.push_back(new Item_empty_string("Replicate_ignore_db", 20));
field_list.push_back(new Item_empty_string("Last_errno", 4));
field_list.push_back(new Item_empty_string("Last_error", 20));
field_list.push_back(new Item_empty_string("Skip_counter", 12));
- field_list.push_back(new Item_empty_string("Last_log_seq", 12));
- if (send_fields(thd, field_list, 1))
+ field_list.push_back(new Item_empty_string("Exec_master_log_pos", 12));
+ if(send_fields(thd, field_list, 1))
DBUG_RETURN(-1);
String* packet = &thd->packet;
- uint32 last_log_seq;
packet->length(0);
-
- pthread_mutex_lock(&glob_mi.lock);
- net_store_data(packet, glob_mi.host);
- net_store_data(packet, glob_mi.user);
- net_store_data(packet, (uint32) glob_mi.port);
- net_store_data(packet, (uint32) glob_mi.connect_retry);
- net_store_data(packet, glob_mi.log_file_name);
- net_store_data(packet, (longlong) glob_mi.pos);
- last_log_seq = glob_mi.last_log_seq;
- pthread_mutex_unlock(&glob_mi.lock);
- net_store_data(packet, slave_running ? "Yes":"No");
+
+ pthread_mutex_lock(&mi->data_lock);
+ pthread_mutex_lock(&mi->rli.data_lock);
+ net_store_data(packet, mi->host);
+ net_store_data(packet, mi->user);
+ net_store_data(packet, (uint32) mi->port);
+ net_store_data(packet, (uint32) mi->connect_retry);
+ net_store_data(packet, mi->master_log_name);
+ net_store_data(packet, (longlong) mi->master_log_pos);
+ net_store_data(packet, mi->rli.relay_log_name +
+ dirname_length(mi->rli.relay_log_name));
+ net_store_data(packet, (longlong) mi->rli.relay_log_pos);
+ net_store_data(packet, mi->rli.master_log_name);
+ net_store_data(packet, mi->slave_running ? "Yes":"No");
+ net_store_data(packet, mi->rli.slave_running ? "Yes":"No");
net_store_data(packet, &replicate_do_db);
net_store_data(packet, &replicate_ignore_db);
- net_store_data(packet, (uint32)last_slave_errno);
- net_store_data(packet, last_slave_error);
- net_store_data(packet, slave_skip_counter);
- net_store_data(packet, last_log_seq);
-
+ net_store_data(packet, (uint32)mi->rli.last_slave_errno);
+ net_store_data(packet, mi->rli.last_slave_error);
+ net_store_data(packet, mi->rli.slave_skip_counter);
+ net_store_data(packet, (longlong)mi->rli.master_log_pos);
+ pthread_mutex_unlock(&mi->rli.data_lock);
+ pthread_mutex_unlock(&mi->data_lock);
+
if (my_net_write(&thd->net, (char*)thd->packet.ptr(), packet->length()))
DBUG_RETURN(-1);
@@ -785,59 +1228,64 @@ int flush_master_info(MASTER_INFO* mi)
{
+ /*
+ Rewrite the master info file from offset 0: master log name, master log
+ position, host, user, password, port and connect_retry, one value per
+ line, then flush the cache. The result of flush_io_cache() is not
+ checked; this function always returns 0.
+ */
IO_CACHE* file = &mi->file;
char lbuf[22];
- char lbuf1[22];
-
+
my_b_seek(file, 0L);
- my_b_printf(file, "%s\n%s\n%s\n%s\n%s\n%d\n%d\n%d\n",
- mi->log_file_name, llstr(mi->pos, lbuf), mi->host, mi->user,
- mi->password, mi->port, mi->connect_retry,
- llstr(mi->last_log_seq, lbuf1));
+ /*
+ One conversion per argument: five strings followed by two integers.
+ The trailing "%d\n" that used to print last_log_seq is gone together
+ with its argument; leaving it in would read a missing vararg.
+ */
+ my_b_printf(file, "%s\n%s\n%s\n%s\n%s\n%d\n%d\n",
+ mi->master_log_name, llstr(mi->master_log_pos, lbuf),
+ mi->host, mi->user,
+ mi->password, mi->port, mi->connect_retry
+ );
flush_io_cache(file);
return 0;
}
-
-int st_master_info::wait_for_pos(THD* thd, String* log_name, ulonglong log_pos)
+/*
+ Wait until master_log_name/master_log_pos reach (log_name, log_pos), or
+ until thd is killed.
+
+ Returns -1 if the structure is not inited or if thd was killed; otherwise
+ the number of times the wait on data_cond returned before the position
+ was reached (0 if it was already reached on entry).
+
+ The file name check is strncmp() of the basename of master_log_name
+ against log_name, limited to log_name->length() characters. An empty
+ master_log_name is treated as an equal name.
+
+ TODO: the code below needs to be re-written almost from scratch.
+ Main issue is how to find out if we have reached a certain position
+ in the master log by knowing the offset in the relay log.
+ */
+int st_relay_log_info::wait_for_pos(THD* thd, String* log_name,
+ ulonglong log_pos)
{
if (!inited) return -1;
- bool pos_reached;
+ bool pos_reached = 0;
int event_count = 0;
- pthread_mutex_lock(&lock);
+ pthread_mutex_lock(&data_lock);
while (!thd->killed)
{
int cmp_result;
- if (*log_file_name)
+ DBUG_ASSERT(*master_log_name || master_log_pos == 0);
+ if (*master_log_name)
{
/*
We should use dirname_length() here when we have a version of
this that doesn't modify the argument */
- char *basename = strrchr(log_file_name, FN_LIBCHAR);
+ char *basename = strrchr(master_log_name, FN_LIBCHAR);
if (basename)
++basename;
else
- basename = log_file_name;
+ basename = master_log_name;
cmp_result = strncmp(basename, log_name->ptr(),
log_name->length());
}
else
cmp_result = 0;
-
- pos_reached = ((!cmp_result && pos >= log_pos) || cmp_result > 0);
+
+ // reached: same file at or past log_pos, or a file name comparing greater
+ pos_reached = ((!cmp_result && master_log_pos >= log_pos) ||
+ cmp_result > 0);
if (pos_reached || thd->killed)
break;
-
- const char* msg = thd->enter_cond(&cond, &lock,
+
+ const char* msg = thd->enter_cond(&data_cond, &data_lock,
"Waiting for master update");
- pthread_cond_wait(&cond, &lock);
+ pthread_cond_wait(&data_cond, &data_lock);
thd->exit_cond(msg);
event_count++;
}
- pthread_mutex_unlock(&lock);
+ pthread_mutex_unlock(&data_lock);
return thd->killed ? -1 : event_count;
}
-
-static int init_slave_thread(THD* thd)
+static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
{
DBUG_ENTER("init_slave_thread");
thd->system_thread = thd->bootstrap = 1;
@@ -851,7 +1299,7 @@ static int init_slave_thread(THD* thd)
thd->options = (((opt_log_slave_updates) ? OPTION_BIN_LOG:0) | OPTION_AUTO_IS_NULL) ;
thd->system_thread = 1;
thd->client_capabilities = CLIENT_LOCAL_FILES;
- slave_real_id=thd->real_id=pthread_self();
+ thd->real_id=pthread_self();
pthread_mutex_lock(&LOCK_thread_count);
thd->thread_id = thread_id++;
pthread_mutex_unlock(&LOCK_thread_count);
@@ -861,7 +1309,6 @@ static int init_slave_thread(THD* thd)
my_pthread_setspecific_ptr(THR_MALLOC, &thd->mem_root) ||
my_pthread_setspecific_ptr(THR_NET, &thd->net))
{
- close_connection(&thd->net,ER_OUT_OF_RESOURCES); // is this needed?
end_thread(thd,0);
DBUG_RETURN(-1);
}
@@ -878,14 +1325,21 @@ static int init_slave_thread(THD* thd)
if (thd->max_join_size == (ulong) ~0L)
thd->options |= OPTION_BIG_SELECTS;
- thd->proc_info="Waiting for master update";
+ if (thd_type == SLAVE_THD_SQL)
+ {
+ thd->proc_info = "Waiting for the next event in slave queue";
+ }
+ else
+ {
+ thd->proc_info="Waiting for master update";
+ }
thd->version=refresh_version;
thd->set_time();
DBUG_RETURN(0);
}
-static int safe_sleep(THD* thd, int sec)
+static int safe_sleep(THD* thd, MASTER_INFO* mi, int sec)
{
thr_alarm_t alarmed;
thr_alarm_init(&alarmed);
@@ -907,22 +1361,21 @@ static int safe_sleep(THD* thd, int sec)
// so it will not wake up the wife and kids :-)
if (thr_alarm_in_use(&alarmed))
thr_end_alarm(&alarmed);
-
- if (slave_killed(thd))
+
+ if (slave_killed(thd,mi))
return 1;
start_time=time((time_t*) 0);
}
return 0;
}
-
static int request_dump(MYSQL* mysql, MASTER_INFO* mi)
{
char buf[FN_REFLEN + 10];
int len;
int binlog_flags = 0; // for now
- char* logname = mi->log_file_name;
- int4store(buf, mi->pos);
+ char* logname = mi->master_log_name;
+ int4store(buf, mi->master_log_pos);
int2store(buf + 4, binlog_flags);
int4store(buf + 6, server_id);
len = (uint) strlen(logname);
@@ -940,25 +1393,24 @@ static int request_dump(MYSQL* mysql, MASTER_INFO* mi)
return 0;
}
-
static int request_table_dump(MYSQL* mysql, const char* db, const char* table)
{
char buf[1024];
char * p = buf;
uint table_len = (uint) strlen(table);
uint db_len = (uint) strlen(db);
- if (table_len + db_len > sizeof(buf) - 2)
- {
- sql_print_error("request_table_dump: Buffer overrun");
- return 1;
- }
-
+ if(table_len + db_len > sizeof(buf) - 2)
+ {
+ sql_print_error("request_table_dump: Buffer overrun");
+ return 1;
+ }
+
*p++ = db_len;
memcpy(p, db, db_len);
p += db_len;
*p++ = table_len;
memcpy(p, table, table_len);
-
+
if (mc_simple_command(mysql, COM_TABLE_DUMP, buf, p - buf + table_len, 1))
{
sql_print_error("request_table_dump: Error sending the table dump \
@@ -969,7 +1421,6 @@ command");
return 0;
}
-
static ulong read_event(MYSQL* mysql, MASTER_INFO *mi)
{
ulong len = packet_error;
@@ -983,14 +1434,14 @@ static ulong read_event(MYSQL* mysql, MASTER_INFO *mi)
if (disconnect_slave_event_count && !(events_till_disconnect--))
return packet_error;
#endif
-
- while (!abort_loop && !abort_slave && len == packet_error &&
+
+ while (!abort_loop && !mi->abort_slave && len == packet_error &&
read_errno == EINTR )
{
len = mc_net_safe_read(mysql);
read_errno = errno;
}
- if (abort_loop || abort_slave)
+ if (abort_loop || mi->abort_slave)
return packet_error;
if (len == packet_error || (long) len < 1)
{
@@ -1002,77 +1453,85 @@ server_errno=%d)",
if (len == 1)
{
- sql_print_error("Slave: received 0 length packet from server, apparent \
-master shutdown: %s (%d)",
+ sql_print_error("Slave: received 0 length packet from server, apparent\
+ master shutdown: %s (%d)",
mc_mysql_error(mysql), read_errno);
return packet_error;
}
-
+
DBUG_PRINT("info",( "len=%u, net->read_pos[4] = %d\n",
len, mysql->net.read_pos[4]));
return len - 1;
}
-int check_expected_error(THD* thd, int expected_error)
+/*
+ Decide whether the error code recorded for a query on the master means
+ the query was only partially executed there.
+
+ For ER_NET_READ_ERROR, ER_NET_ERROR_ON_WRITE, ER_SERVER_SHUTDOWN and
+ ER_NEW_ABORTING_CONNECTION the explanatory message (including thd->query)
+ is stored in rli->last_slave_error, rli->last_slave_errno is set to
+ expected_error, the message is written to the error log and 1 is returned.
+ Any other code returns 0 and leaves rli untouched.
+*/
+int check_expected_error(THD* thd, RELAY_LOG_INFO* rli, int expected_error)
{
- switch (expected_error) {
- case ER_NET_READ_ERROR:
- case ER_NET_ERROR_ON_WRITE:
- case ER_SERVER_SHUTDOWN:
- case ER_NEW_ABORTING_CONNECTION:
- my_snprintf(last_slave_error, sizeof(last_slave_error),"\
-Slave: query '%s' partially completed on the master \
+ switch (expected_error)
+ {
+ case ER_NET_READ_ERROR:
+ case ER_NET_ERROR_ON_WRITE:
+ case ER_SERVER_SHUTDOWN:
+ case ER_NEW_ABORTING_CONNECTION:
+ my_snprintf(rli->last_slave_error, sizeof(rli->last_slave_error),
+ "Slave: query '%s' partially completed on the master \
and was aborted. There is a chance that your master is inconsistent at this \
-point. If you are sure that your master is ok, run this query manually on the \
-slave and then restart the slave with SET SQL_SLAVE_SKIP_COUNTER=1;\
-SLAVE START;", thd->query);
- last_slave_errno = expected_error;
- sql_print_error("%s",last_slave_error);
- return 1;
- default:
- return 0;
- }
+point. If you are sure that your master is ok, run this query manually on the\
+ slave and then restart the slave with SET SQL_SLAVE_SKIP_COUNTER=1;\
+ SLAVE START;", thd->query);
+ rli->last_slave_errno = expected_error;
+ sql_print_error("%s",rli->last_slave_error);
+ return 1;
+ default:
+ return 0;
+ }
}
-
-
-static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len)
+static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli)
{
const char *error_msg;
- Log_event * ev = Log_event::read_log_event((const char*)net->read_pos + 1,
- event_len, &error_msg,
- mi->old_format);
+ DBUG_ASSERT(rli->sql_thd==thd);
+ Log_event * ev = next_event(rli);
+ DBUG_ASSERT(rli->sql_thd==thd);
+ if (slave_killed(thd,rli))
+ return 1;
if (ev)
{
int type_code = ev->get_type_code();
int exec_res;
+ pthread_mutex_lock(&rli->data_lock);
if (ev->server_id == ::server_id ||
- (slave_skip_counter && type_code != ROTATE_EVENT))
+ (rli->slave_skip_counter && type_code != ROTATE_EVENT))
{
- if (type_code == LOAD_EVENT)
- skip_load_data_infile(net);
-
- mi->inc_pos(event_len, ev->log_seq);
- flush_master_info(mi);
- if (slave_skip_counter && /* protect against common user error of
+ /*
+ TODO: I/O thread must handle skipping file delivery for
+ old load data infile events
+ */
+ /* TODO: I/O thread should not even log events with the same server id */
+ rli->inc_pos(ev->get_event_len(),
+ type_code != STOP_EVENT ? ev->log_pos : 0,
+ 1/* skip lock*/);
+ flush_relay_log_info(rli);
+ if (rli->slave_skip_counter && /* protect against common user error of
setting the counter to 1 instead of 2
while recovering from an failed
auto-increment insert */
- !(type_code == INTVAR_EVENT &&
- slave_skip_counter == 1))
- --slave_skip_counter;
+ !((type_code == INTVAR_EVENT || type_code == STOP_EVENT) &&
+ rli->slave_skip_counter == 1))
+ --rli->slave_skip_counter;
+ pthread_mutex_unlock(&rli->data_lock);
delete ev;
return 0; // avoid infinite update loops
}
-
+ pthread_mutex_unlock(&rli->data_lock);
+
thd->server_id = ev->server_id; // use the original server id for logging
thd->set_time(); // time the query
- if (!thd->log_seq)
- thd->log_seq = ev->log_seq;
if (!ev->when)
ev->when = time(NULL);
ev->thd = thd;
- exec_res = ev->exec_event(mi);
+ thd->log_pos = ev->log_pos;
+ exec_res = ev->exec_event(rli);
+ DBUG_ASSERT(rli->sql_thd==thd);
delete ev;
return exec_res;
}
@@ -1086,277 +1545,410 @@ This may also be a network problem, or just a bug in the master or slave code.\
}
}
-// slave thread
-
-pthread_handler_decl(handle_slave,arg __attribute__((unused)))
+/* slave I/O thread */
+pthread_handler_decl(handle_slave_io,arg)
{
#ifndef DBUG_OFF
-slave_begin:
+ slave_begin:
#endif
THD *thd; // needs to be first for thread_stack
MYSQL *mysql = NULL ;
+ MASTER_INFO* mi = (MASTER_INFO*)arg;
char llbuff[22];
bool retried_once = 0;
- ulonglong last_failed_pos = 0;
-
- pthread_mutex_lock(&LOCK_slave);
- if (!server_id)
- {
- pthread_cond_broadcast(&COND_slave_start);
- pthread_mutex_unlock(&LOCK_slave);
- sql_print_error("Server id not set, will not start slave");
- pthread_exit((void*)1);
- }
-
- if (slave_running)
- {
- pthread_cond_broadcast(&COND_slave_start);
- pthread_mutex_unlock(&LOCK_slave);
- pthread_exit((void*)1); // safety just in case
- }
- slave_running = 1;
- abort_slave = 0;
+ ulonglong last_failed_pos = 0; // TODO: see if last_failed_pos is needed
+ DBUG_ASSERT(mi->inited);
+
+ pthread_mutex_lock(&mi->run_lock);
#ifndef DBUG_OFF
- events_till_abort = abort_slave_event_count;
+ mi->events_till_abort = abort_slave_event_count;
#endif
- pthread_cond_broadcast(&COND_slave_start);
- pthread_mutex_unlock(&LOCK_slave);
-
+
// needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
my_thread_init();
- slave_thd = thd = new THD; // note that contructor of THD uses DBUG_ !
- DBUG_ENTER("handle_slave");
+ thd = new THD; // note that contructor of THD uses DBUG_ !
+ DBUG_ENTER("handle_slave_io");
pthread_detach_this_thread();
- if (init_slave_thread(thd) || init_master_info(&glob_mi))
- {
- sql_print_error("Failed during slave thread initialization");
- goto err;
- }
+ if (init_slave_thread(thd, SLAVE_THD_IO))
+ {
+ pthread_cond_broadcast(&mi->start_cond);
+ pthread_mutex_unlock(&mi->run_lock);
+ sql_print_error("Failed during slave I/O thread initialization");
+ goto err;
+ }
+ mi->io_thd = thd;
thd->thread_stack = (char*)&thd; // remember where our stack is
- thd->temporary_tables = save_temporary_tables; // restore temp tables
threads.append(thd);
- glob_mi.pending = 0; //this should always be set to 0 when the slave thread
- // is started
-
+ mi->slave_running = 1;
+ mi->abort_slave = 0;
+ pthread_cond_broadcast(&mi->start_cond);
+ pthread_mutex_unlock(&mi->run_lock);
+
DBUG_PRINT("info",("master info: log_file_name=%s, position=%s",
- glob_mi.log_file_name, llstr(glob_mi.pos,llbuff)));
-
-
+ mi->master_log_name, llstr(mi->master_log_pos,llbuff)));
+
if (!(mysql = mc_mysql_init(NULL)))
{
- sql_print_error("Slave thread: error in mc_mysql_init()");
+ sql_print_error("Slave I/O thread: error in mc_mysql_init()");
goto err;
}
-
+
thd->proc_info = "connecting to master";
#ifndef DBUG_OFF
- sql_print_error("Slave thread initialized");
+ sql_print_error("Slave I/O thread initialized");
#endif
// we can get killed during safe_connect
- if (!safe_connect(thd, mysql, &glob_mi))
- sql_print_error("Slave: connected to master '%s@%s:%d',\
- replication started in log '%s' at position %s", glob_mi.user,
- glob_mi.host, glob_mi.port,
- RPL_LOG_NAME,
- llstr(glob_mi.pos,llbuff));
+ if (!safe_connect(thd, mysql, mi))
+ sql_print_error("Slave I/O thread: connected to master '%s@%s:%d',\
+ replication started in log '%s' at position %s", mi->user,
+ mi->host, mi->port,
+ IO_RPL_LOG_NAME,
+ llstr(mi->master_log_pos,llbuff));
else
{
- sql_print_error("Slave thread killed while connecting to master");
+ sql_print_error("Slave I/O thread killed while connecting to master");
goto err;
}
connected:
thd->slave_net = &mysql->net;
- // register ourselves with the master
- // if fails, this is not fatal - we just print the error message and go
- // on with life
thd->proc_info = "Checking master version";
- if (check_master_version(mysql, &glob_mi))
+ if (check_master_version(mysql, mi))
{
goto err;
}
- if (!glob_mi.old_format)
+ if (!mi->old_format)
{
+ // register ourselves with the master
+ // if fails, this is not fatal - we just print the error message and go
+ // on with life
thd->proc_info = "Registering slave on master";
if (register_slave_on_master(mysql) || update_slave_list(mysql))
goto err;
}
-
- while (!slave_killed(thd))
+
+ while (!slave_killed(thd,mi))
{
- thd->proc_info = "Requesting binlog dump";
- if (request_dump(mysql, &glob_mi))
- {
- sql_print_error("Failed on request_dump()");
- if (slave_killed(thd))
- {
- sql_print_error("Slave thread killed while requesting master \
+ thd->proc_info = "Requesting binlog dump";
+ if (request_dump(mysql, mi))
+ {
+ sql_print_error("Failed on request_dump()");
+ if(slave_killed(thd,mi))
+ {
+ sql_print_error("Slave I/O thread killed while requesting master \
dump");
- goto err;
- }
-
- thd->proc_info = "Waiiting to reconnect after a failed dump request";
- if (mysql->net.vio)
- vio_close(mysql->net.vio);
- // first time retry immediately, assuming that we can recover
- // right away - if first time fails, sleep between re-tries
- // hopefuly the admin can fix the problem sometime
- if (retried_once)
- safe_sleep(thd, glob_mi.connect_retry);
- else
- retried_once = 1;
-
- if (slave_killed(thd))
- {
- sql_print_error("Slave thread killed while retrying master \
+ goto err;
+ }
+
+ thd->proc_info = "Waiiting to reconnect after a failed dump request";
+ mc_end_server(mysql);
+ // first time retry immediately, assuming that we can recover
+ // right away - if first time fails, sleep between retries;
+ // hopefully the admin can fix the problem sometime
+ if (retried_once)
+ safe_sleep(thd, mi, mi->connect_retry);
+ else
+ retried_once = 1;
+
+ if (slave_killed(thd,mi))
+ {
+ sql_print_error("Slave I/O thread killed while retrying master \
dump");
- goto err;
- }
-
- thd->proc_info = "Reconnecting after a failed dump request";
- last_failed_pos=glob_mi.pos;
- sql_print_error("Slave: failed dump request, reconnecting to \
-try again, log '%s' at postion %s", RPL_LOG_NAME,
- llstr(last_failed_pos,llbuff));
- if (safe_reconnect(thd, mysql, &glob_mi) || slave_killed(thd))
- {
- sql_print_error("Slave thread killed during or after reconnect");
- goto err;
- }
-
- goto connected;
- }
-
- while (!slave_killed(thd))
- {
- thd->proc_info = "Reading master update";
- ulong event_len = read_event(mysql, &glob_mi);
- if (slave_killed(thd))
- {
- sql_print_error("Slave thread killed while reading event");
- goto err;
- }
-
+ goto err;
+ }
+
+ thd->proc_info = "Reconnecting after a failed dump request";
+ sql_print_error("Slave I/O thread: failed dump request, \
+reconnecting to try again, log '%s' at postion %s", IO_RPL_LOG_NAME,
+ llstr(mi->master_log_pos,llbuff));
+ if (safe_reconnect(thd, mysql, mi) || slave_killed(thd,mi))
+ {
+ sql_print_error("Slave I/O thread killed during or \
+after reconnect");
+ goto err;
+ }
+
+ goto connected;
+ }
- if (event_len == packet_error)
- {
- if (mc_mysql_errno(mysql) == ER_NET_PACKET_TOO_LARGE)
+ while (!slave_killed(thd,mi))
{
- sql_print_error("Log entry on master is longer than \
+ thd->proc_info = "Reading master update";
+ ulong event_len = read_event(mysql, mi);
+ if (slave_killed(thd,mi))
+ {
+ sql_print_error("Slave I/O thread killed while reading event");
+ goto err;
+ }
+
+ if (event_len == packet_error)
+ {
+ if (mc_mysql_errno(mysql) == ER_NET_PACKET_TOO_LARGE)
+ {
+ sql_print_error("Log entry on master is longer than \
max_allowed_packet on slave. Slave thread will be aborted. If the entry is \
really supposed to be that long, restart the server with a higher value of \
max_allowed_packet. The current value is %ld", max_allowed_packet);
- goto err;
- }
-
- thd->proc_info = "Waiting to reconnect after a failed read";
- if (mysql->net.vio)
- vio_close(mysql->net.vio);
- if (retried_once) // punish repeat offender with sleep
- safe_sleep(thd, glob_mi.connect_retry);
- else
- retried_once = 1;
-
- if (slave_killed(thd))
- {
- sql_print_error("Slave thread killed while waiting to \
+ goto err;
+ }
+
+ thd->proc_info = "Waiting to reconnect after a failed read";
+ mc_end_server(mysql);
+ if (retried_once) // punish repeat offender with sleep
+ safe_sleep(thd,mi,mi->connect_retry);
+ else
+ retried_once = 1;
+
+ if (slave_killed(thd,mi))
+ {
+ sql_print_error("Slave I/O thread killed while waiting to \
reconnect after a failed read");
- goto err;
- }
- thd->proc_info = "Reconnecting after a failed read";
- last_failed_pos= glob_mi.pos;
- sql_print_error("Slave: Failed reading log event, \
-reconnecting to retry, log '%s' position %s", RPL_LOG_NAME,
- llstr(last_failed_pos, llbuff));
- if (safe_reconnect(thd, mysql, &glob_mi) || slave_killed(thd))
- {
- sql_print_error("Slave thread killed during or after a \
+ goto err;
+ }
+ thd->proc_info = "Reconnecting after a failed read";
+ sql_print_error("Slave I/O thread: Failed reading log event, \
+reconnecting to retry, log '%s' position %s", IO_RPL_LOG_NAME,
+ llstr(mi->master_log_pos, llbuff));
+ if (safe_reconnect(thd, mysql, mi) || slave_killed(thd,mi))
+ {
+ sql_print_error("Slave I/O thread killed during or after a \
reconnect done to recover from failed read");
- goto err;
- }
-
- goto connected;
- } // if (event_len == packet_error)
-
- thd->proc_info = "Processing master log event";
- if (exec_event(thd, &mysql->net, &glob_mi, event_len))
- {
- sql_print_error("\
-Error running query, slave aborted. Fix the problem, and re-start \
-the slave thread with \"mysqladmin start-slave\". We stopped at log \
-'%s' position %s",
- RPL_LOG_NAME, llstr(glob_mi.pos, llbuff));
- goto err;
- // there was an error running the query
- // abort the slave thread, when the problem is fixed, the user
- // should restart the slave with mysqladmin start-slave
- }
+ goto err;
+ }
+ goto connected;
+ } // if(event_len == packet_error)
+
+ thd->proc_info = "Queueing event from master";
+ if (queue_event(mi,(const char*)mysql->net.read_pos + 1,
+ (uint)event_len))
+ {
+ sql_print_error("Slave I/O thread could not queue event \
+from master");
+ goto err;
+ }
+ // TODO: check debugging abort code
#ifndef DBUG_OFF
- if (abort_slave_event_count && !--events_till_abort)
- {
- sql_print_error("Slave: debugging abort");
- goto err;
- }
+ if (abort_slave_event_count && !--events_till_abort)
+ {
+ sql_print_error("Slave I/O thread: debugging abort");
+ goto err;
+ }
#endif
+ } // while(!slave_killed(thd,mi)) - read/exec loop
+ } // while(!slave_killed(thd,mi)) - slave loop
- // successful exec with offset advance,
- // the slave repents and his sins are forgiven!
- if (glob_mi.pos > last_failed_pos)
- {
- retried_once = 0;
+ // error = 0;
+ err:
+ // print the current replication position
+ sql_print_error("Slave I/O thread exiting, read up to log '%s', position %s",
+ IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff));
+ thd->query = thd->db = 0; // extra safety
+ if(mysql)
+ mc_mysql_close(mysql);
+ thd->proc_info = "Waiting for slave mutex on exit";
+ pthread_mutex_lock(&mi->run_lock);
+ mi->slave_running = 0;
+ mi->io_thd = 0;
+ // TODO: make rpl_status part of MASTER_INFO
+ change_rpl_status(RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE);
+ mi->abort_slave = 0; // TODO: check if this is needed
+ DBUG_ASSERT(thd->net.buff != 0);
+ net_end(&thd->net); // destructor will not free it, because we are weird
+ pthread_mutex_lock(&LOCK_thread_count);
+ delete thd;
+ pthread_mutex_unlock(&LOCK_thread_count);
+ pthread_cond_broadcast(&mi->stop_cond); // tell the world we are done
+ pthread_mutex_unlock(&mi->run_lock);
+ my_thread_end();
#ifndef DBUG_OFF
- stuck_count = 0;
-#endif
- }
+ if(abort_slave_event_count && !events_till_abort)
+ goto slave_begin;
+#endif
+ pthread_exit(0);
+ DBUG_RETURN(0); // Can't return anything here
+}
+
+/* slave SQL logic thread */
+
+pthread_handler_decl(handle_slave_sql,arg)
+{
#ifndef DBUG_OFF
- else
- {
- // show a little mercy, allow slave to read one more event
- // before cutting him off - otherwise he gets stuck
- // on Intvar events, since they do not advance the offset
- // immediately
- if (++stuck_count > 2)
- events_till_disconnect++;
- }
-#endif
- } // while (!slave_killed(thd)) - read/exec loop
- } // while (!slave_killed(thd)) - slave loop
+ slave_begin:
+#endif
+ THD *thd; /* needs to be first for thread_stack */
+ MYSQL *mysql = NULL ;
+ bool retried_once = 0;
+ ulonglong last_failed_pos = 0; // TODO: see if this can be removed
+ char llbuff[22],llbuff1[22];
+ RELAY_LOG_INFO* rli = &((MASTER_INFO*)arg)->rli;
+ const char* errmsg=0;
+ DBUG_ASSERT(rli->inited);
+ pthread_mutex_lock(&rli->run_lock);
+ DBUG_ASSERT(!rli->slave_running);
+#ifndef DBUG_OFF
+ rli->events_till_abort = abort_slave_event_count;
+#endif
+
+
+ // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
+ my_thread_init();
+ thd = new THD; // note that contructor of THD uses DBUG_ !
+ DBUG_ENTER("handle_slave_sql");
+
+ pthread_detach_this_thread();
+ if (init_slave_thread(thd, SLAVE_THD_SQL))
+ {
+ // TODO: this is currently broken - slave start and change master
+ // will be stuck if we fail here
+ pthread_cond_broadcast(&rli->start_cond);
+ pthread_mutex_unlock(&rli->run_lock);
+ sql_print_error("Failed during slave thread initialization");
+ goto err;
+ }
+ thd->thread_stack = (char*)&thd; // remember where our stack is
+ thd->temporary_tables = rli->save_temporary_tables; // restore temp tables
+ threads.append(thd);
+ rli->sql_thd = thd;
+ rli->slave_running = 1;
+ rli->abort_slave = 0;
+ pthread_cond_broadcast(&rli->start_cond);
+ pthread_mutex_unlock(&rli->run_lock);
+ rli->pending = 0; //this should always be set to 0 when the slave thread
+ // is started
+ if (init_relay_log_pos(rli,0,0,1/*need data lock*/,&errmsg))
+ {
+ sql_print_error("Error initializing relay log position: %s",
+ errmsg);
+ goto err;
+ }
+ DBUG_ASSERT(rli->relay_log_pos >= 4);
+ DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->relay_log_pos);
+
+ DBUG_PRINT("info",("master info: log_file_name=%s, position=%s",
+ rli->master_log_name, llstr(rli->master_log_pos,llbuff)));
+ DBUG_ASSERT(rli->sql_thd == thd);
+ sql_print_error("Slave SQL thread initialized, starting replication in \
+log '%s' at position %s,relay log: name='%s',pos='%s'", RPL_LOG_NAME,
+ llstr(rli->master_log_pos,llbuff),rli->relay_log_name,
+ llstr(rli->relay_log_pos,llbuff1));
+ while (!slave_killed(thd,rli))
+ {
+ thd->proc_info = "Processing master log event";
+ DBUG_ASSERT(rli->sql_thd == thd);
+ if (exec_relay_log_event(thd,rli))
+ {
+ // do not scare the user if SQL thread was simply killed or stopped
+ if (!slave_killed(thd,rli))
+ sql_print_error("\
+Error running query, slave SQL thread aborted. Fix the problem, and restart \
+the slave SQL thread with \"mysqladmin start-slave\". We stopped at log \
+'%s' position %s",
+ RPL_LOG_NAME, llstr(rli->master_log_pos, llbuff));
+ goto err;
+ }
+ } // while(!slave_killed(thd,rli)) - read/exec loop
// error = 0;
-err:
+ err:
// print the current replication position
- sql_print_error("Slave thread exiting, replication stopped in log '%s' at \
-position %s",
- RPL_LOG_NAME, llstr(glob_mi.pos,llbuff));
+ sql_print_error("Slave SQL thread exiting, replication stopped in log \
+ '%s' at position %s",
+ RPL_LOG_NAME, llstr(rli->master_log_pos,llbuff));
thd->query = thd->db = 0; // extra safety
- if (mysql)
- mc_mysql_close(mysql);
thd->proc_info = "Waiting for slave mutex on exit";
- pthread_mutex_lock(&LOCK_slave);
- slave_running = 0;
- change_rpl_status(RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE);
- abort_slave = 0;
- save_temporary_tables = thd->temporary_tables;
+ pthread_mutex_lock(&rli->run_lock);
+ DBUG_ASSERT(rli->slave_running == 1); // tracking buffer overrun
+ rli->slave_running = 0;
+ rli->save_temporary_tables = thd->temporary_tables;
+ //TODO: see if we can do this conditionally in next_event() instead
+ // to avoid unneeded position re-init
+ rli->log_pos_current=0;
thd->temporary_tables = 0; // remove tempation from destructor to close them
- pthread_cond_broadcast(&COND_slave_stopped); // tell the world we are done
- pthread_mutex_unlock(&LOCK_slave);
+ DBUG_ASSERT(thd->net.buff != 0);
net_end(&thd->net); // destructor will not free it, because we are weird
- slave_thd = 0;
+ DBUG_ASSERT(rli->sql_thd == thd);
+ rli->sql_thd = 0;
+ pthread_mutex_lock(&LOCK_thread_count);
delete thd;
+ pthread_mutex_unlock(&LOCK_thread_count);
+ pthread_cond_broadcast(&rli->stop_cond);
+ // tell the world we are done
+ pthread_mutex_unlock(&rli->run_lock);
my_thread_end();
-#ifndef DBUG_OFF
- if (abort_slave_event_count && !events_till_abort)
+#ifndef DBUG_OFF // TODO: reconsider the code below
+ if (abort_slave_event_count && !rli->events_till_abort)
goto slave_begin;
#endif
pthread_exit(0);
DBUG_RETURN(0); // Can't return anything here
}
+/*
+  Queue one event received from the master: append it to the relay log and
+  advance the master log coordinates kept in mi.
+
+  mi         master info; master_log_name / master_log_pos are updated here
+  buf        the raw event bytes
+  event_len  length of the event in bytes
+
+  Returns 1 for an old-format master (not handled yet) and for a rotate
+  event that is invalid or whose log name does not fit master_log_name;
+  otherwise returns the result of relay_log.appendv().
+*/
+int queue_event(MASTER_INFO* mi,const char* buf,uint event_len)
+{
+  int error;
+  bool inc_pos = 1;
+  if (mi->old_format)
+    return 1; // TODO: deal with old format
+
+  switch (buf[EVENT_TYPE_OFFSET])
+  {
+  case ROTATE_EVENT:
+  {
+    Rotate_log_event rev(buf,event_len,0);
+    if (!rev.is_valid())
+      return 1;
+    /*
+      The name length comes from the event itself, so it has to be
+      enforced in every build. A DBUG_ASSERT alone is not enough: it does
+      nothing in non-debug builds, and the memcpy() below would then
+      overrun master_log_name.
+    */
+    if (rev.ident_len >= sizeof(mi->master_log_name))
+      return 1;
+    memcpy(mi->master_log_name,rev.new_log_ident,
+           rev.ident_len);
+    mi->master_log_name[rev.ident_len] = 0;
+    mi->master_log_pos = rev.pos;
+    // the rotate event sets the position itself, do not add event_len
+    inc_pos = 0;
+#ifndef DBUG_OFF
+    /* if we do not do this, we will be getting the first
+       rotate event forever, so
+       we need to not disconnect after one
+    */
+    if (disconnect_slave_event_count)
+      events_till_disconnect++;
+#endif
+    break;
+  }
+  default:
+    break;
+  }
+
+  if (!(error = mi->rli.relay_log.appendv(buf,event_len,0)))
+  {
+    if (inc_pos)
+      mi->master_log_pos += event_len;
+  }
+  return error;
+}
-/* try to connect until successful or slave killed */
+/*
+ Release the files held by a relay log info structure.
+
+ Does nothing when rli->inited is not set. Otherwise:
+ - if info_fd >= 0: end_io_cache() on info_file, close info_fd, set it to -1
+ - if cur_log_fd >= 0: end_io_cache() on cache_buf, close cur_log_fd, set
+ it to -1
+ - clear inited and log_pos_current
+ - call rli->relay_log.close(1)
+
+ The return values of my_close() are deliberately discarded; MY_WME is
+ passed to it.
+*/
+void end_relay_log_info(RELAY_LOG_INFO* rli)
+{
+ if (!rli->inited)
+ return;
+ if (rli->info_fd >= 0)
+ {
+ end_io_cache(&rli->info_file);
+ (void)my_close(rli->info_fd, MYF(MY_WME));
+ rli->info_fd = -1;
+ }
+ if (rli->cur_log_fd >= 0)
+ {
+ end_io_cache(&rli->cache_buf);
+ (void)my_close(rli->cur_log_fd, MYF(MY_WME));
+ rli->cur_log_fd = -1;
+ }
+ rli->inited = 0;
+ rli->log_pos_current=0;
+ // NOTE(review): meaning of the argument 1 is not visible here - confirm
+ rli->relay_log.close(1);
+}
+/* try to connect until successful or slave killed */
static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
{
return connect_to_master(thd, mysql, mi, 0);
@@ -1366,7 +1958,6 @@ static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
Try to connect until successful or slave killed or we have retried
master_retry_count times
*/
-
static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
bool reconnect)
{
@@ -1375,28 +1966,24 @@ static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
ulong err_count=0;
char llbuff[22];
- /*
- If we lost connection after reading a state set event
- we will be re-reading it, so pending needs to be cleared
- */
- mi->pending = 0;
#ifndef DBUG_OFF
events_till_disconnect = disconnect_slave_event_count;
#endif
- while (!(slave_was_killed = slave_killed(thd)) &&
- (reconnect ? mc_mysql_reconnect(mysql) != 0 :
+ while (!(slave_was_killed = slave_killed(thd,mi)) &&
+ (reconnect ? mc_mysql_reconnect(mysql) :
!mc_mysql_connect(mysql, mi->host, mi->user, mi->password, 0,
mi->port, 0, 0)))
{
/* Don't repeat last error */
if (mc_mysql_errno(mysql) != last_errno)
{
- sql_print_error("Slave thread: error connecting to master: \
-%s, last_errno=%d, retry in %d sec",
+ sql_print_error("Slave I/O thread: error connecting to master \
+'%s@%s:%d': \
+%s, last_errno=%d, retry in %d sec",mi->user,mi->host,mi->port,
mc_mysql_error(mysql), last_errno=mc_mysql_errno(mysql),
mi->connect_retry);
}
- safe_sleep(thd, mi->connect_retry);
+ safe_sleep(thd,mi,mi->connect_retry);
/* by default we try forever. The reason is that failure will trigger
master election, so if the user did not set master_retry_count we
do not want to have election triggered on the first failure to
@@ -1415,10 +2002,10 @@ static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
{
if (reconnect)
sql_print_error("Slave: connected to master '%s@%s:%d',\
-replication resumed in log '%s' at position %s", glob_mi.user,
- glob_mi.host, glob_mi.port,
- RPL_LOG_NAME,
- llstr(glob_mi.pos,llbuff));
+replication resumed in log '%s' at position %s", mi->user,
+ mi->host, mi->port,
+ IO_RPL_LOG_NAME,
+ llstr(mi->master_log_pos,llbuff));
else
{
change_rpl_status(RPL_IDLE_SLAVE,RPL_ACTIVE_SLAVE);
@@ -1443,6 +2030,175 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
return connect_to_master(thd, mysql, mi, 1);
}
+/*
+  Rewrite the relay log info file from its beginning with the current
+  coordinates, one per line: relay log name, relay log position, master
+  log name, master log position. Then flush the info file cache and the
+  cache of the log currently being read.
+
+  Returns 0 on success, 1 if flushing the info file failed.
+*/
+int flush_relay_log_info(RELAY_LOG_INFO* rli)
+{
+  IO_CACHE* file = &rli->info_file;
+  char lbuf[22],lbuf1[22];
+  int error = 0;
+
+  my_b_seek(file, 0L);
+  my_b_printf(file, "%s\n%s\n%s\n%s\n",
+              rli->relay_log_name, llstr(rli->relay_log_pos, lbuf),
+              rli->master_log_name, llstr(rli->master_log_pos, lbuf1)
+              );
+  // report a failed write of the coordinates instead of claiming success
+  if (flush_io_cache(file))
+    error = 1;
+  flush_io_cache(rli->cur_log);
+  return error;
+}
+
+/*
+  Switch rli from a log cache it does not own to its private cache_buf:
+  open the file named by rli->relay_log_name through cache_buf and seek
+  to rli->relay_log_pos.
+
+  rli     relay log info; cur_log must not already be &cache_buf and
+          cur_log_fd must be -1 (both asserted in debug builds)
+  errmsg  passed through to open_binlog()
+
+  Returns the cache now in use, or 0 if open_binlog() returned a
+  negative descriptor.
+*/
+IO_CACHE* reopen_relay_log(RELAY_LOG_INFO* rli, const char** errmsg)
+{
+  DBUG_ASSERT(rli->cur_log != &rli->cache_buf);
+  IO_CACHE* own_cache = &rli->cache_buf;
+  rli->cur_log = own_cache;
+
+  DBUG_ASSERT(rli->cur_log_fd == -1);
+  rli->cur_log_fd = open_binlog(own_cache, rli->relay_log_name, errmsg);
+  if (rli->cur_log_fd < 0)
+    return 0;
+
+  my_b_seek(own_cache, rli->relay_log_pos);
+  return own_cache;
+}
+
+/*
+  Read the next event for the slave SQL thread from the relay log.
+
+  rli  relay log info; rli->sql_thd must be set (asserted in debug builds)
+
+  Returns the event read, or 0 if the thread was killed or an error
+  occurred; in that case the reason is written to the error log.
+
+  rli->data_lock is taken on entry and released before returning. While
+  reading a "hot" log (one not read through rli->cache_buf) the relay
+  log's log lock is also taken for every read attempt and released on
+  every path out of that attempt.
+*/
+Log_event* next_event(RELAY_LOG_INFO* rli)
+{
+  Log_event* ev;
+  IO_CACHE* cur_log = rli->cur_log;
+  pthread_mutex_t *log_lock = rli->relay_log.get_log_lock();
+  const char* errmsg=0;
+  THD* thd = rli->sql_thd;
+  bool was_killed;
+  DBUG_ASSERT(thd != 0);
+
+  // For most operations we need to protect rli members with data_lock,
+  // so we will hold it for the most of the loop below
+  // However, we will release it whenever it is worth the hassle,
+  // and in the cases when we go into a pthread_cond_wait() with the
+  // non-data_lock mutex
+  pthread_mutex_lock(&rli->data_lock);
+
+  for (;!(was_killed=slave_killed(thd,rli));)
+  {
+    // we can have two kinds of log reading:
+    // hot_log - rli->cur_log points at the IO_CACHE of relay_log, which
+    // is actively being updated by the I/O thread. We need to be careful
+    // in this case and make sure that we are not looking at a stale log that
+    // has already been rotated. If it has been, we reopen the log
+    // the other case is much simpler - we just have a read only log that
+    // nobody else will be updating.
+    bool hot_log;
+    if ((hot_log = (cur_log != &rli->cache_buf)))
+    {
+      DBUG_ASSERT(rli->cur_log_fd == -1); // foreign descriptor
+      pthread_mutex_lock(log_lock);
+      // reading cur_log->init_count here is safe because the log will only
+      // be rotated when we hold relay_log.LOCK_log
+      if (cur_log->init_count != rli->cur_log_init_count)
+      {
+        if (!(cur_log=reopen_relay_log(rli,&errmsg)))
+        {
+          pthread_mutex_unlock(log_lock);
+          goto err;
+        }
+        pthread_mutex_unlock(log_lock);
+        hot_log=0;
+      }
+    }
+    DBUG_ASSERT(my_b_tell(cur_log) >= 4);
+    DBUG_ASSERT(my_b_tell(cur_log) == rli->relay_log_pos + rli->pending);
+    if ((ev=Log_event::read_log_event(cur_log,0,rli->mi->old_format)))
+    {
+      DBUG_ASSERT(thd==rli->sql_thd);
+      if (hot_log)
+        pthread_mutex_unlock(log_lock);
+      pthread_mutex_unlock(&rli->data_lock);
+      return ev;
+    }
+    DBUG_ASSERT(thd==rli->sql_thd);
+    if (!cur_log->error) /* EOF */
+    {
+      // on a hot log, EOF means that there are no more updates to
+      // process and we must block until I/O thread adds some and
+      // signals us to continue
+      if (hot_log)
+      {
+        DBUG_ASSERT(cur_log->init_count == rli->cur_log_init_count);
+        // we can, and should release data_lock while we are waiting for
+        // update. If we do not, show slave status will block
+        pthread_mutex_unlock(&rli->data_lock);
+
+        // IMPORTANT: note that wait_for_update will unlock LOCK_log, but
+        // expects the caller to lock it
+        rli->relay_log.wait_for_update(rli->sql_thd);
+
+        // re-acquire data lock since we released it earlier
+        pthread_mutex_lock(&rli->data_lock);
+        continue;
+      }
+      // if the log was not hot, we need to move to the next log in
+      // sequence. The next log could be hot or cold, we deal with both
+      // cases separately after doing some common initialization
+      else
+      {
+        end_io_cache(cur_log);
+        DBUG_ASSERT(rli->cur_log_fd >= 0);
+        my_close(rli->cur_log_fd, MYF(MY_WME));
+        rli->cur_log_fd = -1;
+
+        // purge_first_log will properly set up relay log coordinates in rli
+        if (rli->relay_log.purge_first_log(rli))
+        {
+          errmsg = "Error purging processed log";
+          goto err;
+        }
+
+        // next log is hot
+        if (rli->relay_log.is_active(rli->linfo.log_file_name))
+        {
+#ifdef EXTRA_DEBUG
+          sql_print_error("next log '%s' is currently active",
+                          rli->linfo.log_file_name);
+#endif
+          rli->cur_log = cur_log = rli->relay_log.get_log_file();
+          rli->cur_log_init_count = cur_log->init_count;
+          DBUG_ASSERT(rli->cur_log_fd == -1);
+
+          // read pointer has to be at the start since we are the only
+          // reader
+          if (check_binlog_magic(cur_log,&errmsg))
+            goto err;
+          continue;
+        }
+        // if we get here, the log was not hot, so we will have to
+        // open it ourselves
+#ifdef EXTRA_DEBUG
+        sql_print_error("next log '%s' is not active",
+                        rli->linfo.log_file_name);
+#endif
+        // open_binlog() will check the magic header
+        if ((rli->cur_log_fd=open_binlog(cur_log,rli->linfo.log_file_name,
+                                         &errmsg))<0)
+          goto err;
+      }
+    }
+    else // read failed with a non-EOF error
+    {
+      // on a hot log we still own log_lock here; it must be released
+      // before sleeping, because the top of the loop locks it again and
+      // meanwhile nobody else could take it
+      if (hot_log)
+        pthread_mutex_unlock(log_lock);
+      // TODO: come up with something better to handle this error
+      sql_print_error("Slave SQL thread: I/O error reading \
+event(errno=%d,cur_log->error=%d)",
+                      my_errno,cur_log->error);
+      // no need to hog the mutex while we sleep
+      pthread_mutex_unlock(&rli->data_lock);
+      safe_sleep(rli->sql_thd,rli->mi,1);
+      pthread_mutex_lock(&rli->data_lock);
+    }
+  }
+  if (!errmsg && was_killed)
+    errmsg = "slave SQL thread was killed";
+err:
+  pthread_mutex_unlock(&rli->data_lock);
+  sql_print_error("Error reading relay log event: %s", errmsg);
+  return 0;
+}
+
#ifdef __GNUC__
template class I_List_iterator<i_string>;