summaryrefslogtreecommitdiff
path: root/sql/slave.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/slave.cc')
-rw-r--r--sql/slave.cc350
1 files changed, 25 insertions, 325 deletions
diff --git a/sql/slave.cc b/sql/slave.cc
index a00809b6994..e5da79d6871 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -24,10 +24,8 @@
#include <thr_alarm.h>
#include <my_dir.h>
-#define RPL_LOG_NAME (glob_mi.log_file_name[0] ? glob_mi.log_file_name :\
- "FIRST")
-
volatile bool slave_running = 0;
+char* slave_load_tmpdir = 0;
pthread_t slave_real_id;
MASTER_INFO glob_mi;
HASH replicate_do_table, replicate_ignore_table;
@@ -41,16 +39,17 @@ THD* slave_thd = 0;
// when slave thread exits, we need to remember the temporary tables so we
// can re-use them on slave start
-static int last_slave_errno = 0;
-static char last_slave_error[1024] = "";
+int last_slave_errno = 0;
+char last_slave_error[MAX_SLAVE_ERRMSG] = "";
#ifndef DBUG_OFF
int disconnect_slave_event_count = 0, abort_slave_event_count = 0;
-static int events_till_disconnect = -1, events_till_abort = -1;
+static int events_till_disconnect = -1;
+int events_till_abort = -1;
static int stuck_count = 0;
#endif
-inline void skip_load_data_infile(NET* net);
+void skip_load_data_infile(NET* net);
inline bool slave_killed(THD* thd);
static int init_slave_thread(THD* thd);
static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi);
@@ -59,8 +58,7 @@ static int safe_sleep(THD* thd, int sec);
static int request_table_dump(MYSQL* mysql, const char* db, const char* table);
static int create_table_from_dump(THD* thd, NET* net, const char* db,
const char* table_name);
-inline char* rewrite_db(char* db);
-static int check_expected_error(THD* thd, int expected_error);
+char* rewrite_db(char* db);
static void free_table_ent(TABLE_RULE_ENT* e)
{
@@ -219,7 +217,16 @@ inline bool slave_killed(THD* thd)
return abort_slave || abort_loop || thd->killed;
}
-inline void skip_load_data_infile(NET* net)
+void slave_print_error(int err_code, const char* msg, ...)
+{
+ va_list args;
+ va_start(args,msg);
+ my_vsnprintf(last_slave_error, sizeof(last_slave_error), msg, args);
+ sql_print_error("Slave: %s, error_code=%d", last_slave_error, err_code);
+ last_slave_errno = err_code;
+}
+
+void skip_load_data_infile(NET* net)
{
(void)my_net_write(net, "\xfb/dev/null", 10);
(void)net_flush(net);
@@ -227,7 +234,7 @@ inline void skip_load_data_infile(NET* net)
send_ok(net); // the master expects it
}
-inline char* rewrite_db(char* db)
+char* rewrite_db(char* db)
{
if(replicate_rewrite_db.is_empty() || !db) return db;
I_List_iterator<i_string_pair> it(replicate_rewrite_db);
@@ -904,7 +911,7 @@ server_errno=%d)",
return len - 1;
}
-static int check_expected_error(THD* thd, int expected_error)
+int check_expected_error(THD* thd, int expected_error)
{
switch(expected_error)
{
@@ -935,6 +942,7 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len)
if (ev)
{
int type_code = ev->get_type_code();
+ int exec_res;
if (ev->server_id == ::server_id || slave_skip_counter)
{
if(type_code == LOAD_EVENT)
@@ -952,320 +960,12 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len)
thd->set_time(); // time the query
if(!thd->log_seq)
thd->log_seq = ev->log_seq;
-
if (!ev->when)
ev->when = time(NULL);
-
- switch(type_code) {
- case QUERY_EVENT:
- {
- Query_log_event* qev = (Query_log_event*)ev;
- int q_len = qev->q_len;
- int expected_error,actual_error = 0;
- init_sql_alloc(&thd->mem_root, 8192,0);
- thd->db = rewrite_db((char*)qev->db);
- if (db_ok(thd->db, replicate_do_db, replicate_ignore_db))
- {
- thd->query = (char*)qev->query;
- thd->set_time((time_t)qev->when);
- thd->current_tablenr = 0;
- VOID(pthread_mutex_lock(&LOCK_thread_count));
- thd->query_id = query_id++;
- VOID(pthread_mutex_unlock(&LOCK_thread_count));
- thd->query_error = 0; // clear error
- thd->net.last_errno = 0;
- thd->net.last_error[0] = 0;
- thd->slave_proxy_id = qev->thread_id; // for temp tables
-
- // sanity check to make sure the master did not get a really bad
- // error on the query
- if (!check_expected_error(thd, (expected_error = qev->error_code)))
- {
- mysql_parse(thd, thd->query, q_len);
- if (expected_error !=
- (actual_error = thd->net.last_errno) && expected_error)
- {
- const char* errmsg = "Slave: did not get the expected error\
- running query from master - expected: '%s'(%d), got '%s'(%d)";
- sql_print_error(errmsg, ER_SAFE(expected_error),
- expected_error,
- actual_error ? thd->net.last_error:"no error",
- actual_error);
- thd->query_error = 1;
- }
- else if (expected_error == actual_error)
- {
- thd->query_error = 0;
- *last_slave_error = 0;
- last_slave_errno = 0;
- }
- }
- else
- {
- // master could be inconsistent, abort and tell DBA to check/fix it
- thd->db = thd->query = 0;
- thd->convert_set = 0;
- close_thread_tables(thd);
- free_root(&thd->mem_root,0);
- delete ev;
- return 1;
- }
- }
- thd->db = 0; // prevent db from being freed
- thd->query = 0; // just to be sure
- // assume no convert for next query unless set explictly
- thd->convert_set = 0;
- close_thread_tables(thd);
-
- if (thd->query_error || thd->fatal_error)
- {
- sql_print_error("Slave: error running query '%s' ",
- qev->query);
- last_slave_errno = actual_error ? actual_error : -1;
- my_snprintf(last_slave_error, sizeof(last_slave_error),
- "error '%s' on query '%s'",
- actual_error ? thd->net.last_error :
- "unexpected success or fatal error",
- qev->query
- );
- free_root(&thd->mem_root,0);
- delete ev;
- return 1;
- }
- free_root(&thd->mem_root,0);
- thd->log_seq = 0;
- mi->inc_pos(event_len, ev->log_seq);
- delete ev;
- flush_master_info(mi);
- break;
- }
-
- case SLAVE_EVENT:
- {
- if(mysql_bin_log.is_open())
- {
- Slave_log_event *sev = (Slave_log_event*)ev;
- mysql_bin_log.write(sev);
- }
-
- thd->log_seq = 0;
- mi->inc_pos(event_len, ev->log_seq);
- flush_master_info(mi);
- delete ev;
- break;
- }
-
- case LOAD_EVENT:
- {
- Load_log_event* lev = (Load_log_event*)ev;
- init_sql_alloc(&thd->mem_root, 8192,0);
- thd->db = rewrite_db((char*)lev->db);
- thd->query = 0;
- thd->query_error = 0;
-
- if(db_ok(thd->db, replicate_do_db, replicate_ignore_db))
- {
- thd->set_time((time_t)lev->when);
- thd->current_tablenr = 0;
- VOID(pthread_mutex_lock(&LOCK_thread_count));
- thd->query_id = query_id++;
- VOID(pthread_mutex_unlock(&LOCK_thread_count));
-
- TABLE_LIST tables;
- bzero((char*) &tables,sizeof(tables));
- tables.db = thd->db;
- tables.name = tables.real_name = (char*)lev->table_name;
- tables.lock_type = TL_WRITE;
- // the table will be opened in mysql_load
- if(table_rules_on && !tables_ok(thd, &tables))
- {
- skip_load_data_infile(net);
- }
- else
- {
- enum enum_duplicates handle_dup = DUP_IGNORE;
- if(lev->sql_ex.opt_flags && REPLACE_FLAG)
- handle_dup = DUP_REPLACE;
- sql_exchange ex((char*)lev->fname, lev->sql_ex.opt_flags &&
- DUMPFILE_FLAG );
- String field_term(&lev->sql_ex.field_term, 1),
- enclosed(&lev->sql_ex.enclosed, 1),
- line_term(&lev->sql_ex.line_term,1),
- escaped(&lev->sql_ex.escaped, 1),
- line_start(&lev->sql_ex.line_start, 1);
-
- ex.field_term = &field_term;
- if(lev->sql_ex.empty_flags & FIELD_TERM_EMPTY)
- ex.field_term->length(0);
-
- ex.enclosed = &enclosed;
- if(lev->sql_ex.empty_flags & ENCLOSED_EMPTY)
- ex.enclosed->length(0);
-
- ex.line_term = &line_term;
- if(lev->sql_ex.empty_flags & LINE_TERM_EMPTY)
- ex.line_term->length(0);
-
- ex.line_start = &line_start;
- if(lev->sql_ex.empty_flags & LINE_START_EMPTY)
- ex.line_start->length(0);
-
- ex.escaped = &escaped;
- if(lev->sql_ex.empty_flags & ESCAPED_EMPTY)
- ex.escaped->length(0);
-
- ex.opt_enclosed = (lev->sql_ex.opt_flags & OPT_ENCLOSED_FLAG);
- if(lev->sql_ex.empty_flags & FIELD_TERM_EMPTY)
- ex.field_term->length(0);
-
- ex.skip_lines = lev->skip_lines;
-
-
- List<Item> fields;
- lev->set_fields(fields);
- thd->slave_proxy_id = thd->thread_id;
- thd->net.vio = net->vio;
- // mysql_load will use thd->net to read the file
- thd->net.pkt_nr = net->pkt_nr;
- // make sure the client does not get confused
- // about the packet sequence
- if(mysql_load(thd, &ex, &tables, fields, handle_dup, 1,
- TL_WRITE))
- thd->query_error = 1;
- if(thd->cuted_fields)
- sql_print_error("Slave: load data infile at position %s in log \
-'%s' produced %d warning(s)", llstr(glob_mi.pos,llbuff), RPL_LOG_NAME,
- thd->cuted_fields );
- net->pkt_nr = thd->net.pkt_nr;
- }
- }
- else
- {
- // we will just ask the master to send us /dev/null if we do not
- // want to load the data :-)
- skip_load_data_infile(net);
- }
-
- thd->net.vio = 0;
- thd->db = 0;// prevent db from being freed
- close_thread_tables(thd);
- if(thd->query_error)
- {
- int sql_error = thd->net.last_errno;
- if(!sql_error)
- sql_error = ER_UNKNOWN_ERROR;
-
- sql_print_error("Slave: Error '%s' running load data infile ",
- ER(sql_error));
- delete ev;
- free_root(&thd->mem_root,0);
- return 1;
- }
-
- thd->log_seq = 0;
- free_root(&thd->mem_root,0);
-
- if(thd->fatal_error)
- {
- sql_print_error("Slave: Fatal error running query '%s' ",
- thd->query);
- delete ev;
- return 1;
- }
-
- mi->inc_pos(event_len, ev->log_seq);
- delete ev;
- flush_master_info(mi);
- break;
- }
-
- case START_EVENT:
- close_temporary_tables(thd);
- mi->inc_pos(event_len, ev->log_seq);
- flush_master_info(mi);
- delete ev;
- thd->log_seq = 0;
- break;
-
- case STOP_EVENT:
- if(mi->pos > 4) // stop event should be ignored after rotate event
- {
- close_temporary_tables(thd);
- mi->inc_pos(event_len, ev->log_seq);
- flush_master_info(mi);
- }
- delete ev;
- thd->log_seq = 0;
- break;
- case ROTATE_EVENT:
- {
- Rotate_log_event* rev = (Rotate_log_event*)ev;
- int ident_len = rev->ident_len;
- bool rotate_binlog = 0, write_slave_event = 0;
- char* log_name = mi->log_file_name;
- pthread_mutex_lock(&mi->lock);
-
- // rotate local binlog only if the name of remote has changed
- if (!*log_name || !(log_name[ident_len] == 0 &&
- !memcmp(log_name, rev->new_log_ident, ident_len)))
- {
- write_slave_event = (!(rev->flags & LOG_EVENT_FORCED_ROTATE_F)
- && mysql_bin_log.is_open());
- rotate_binlog = (*log_name && write_slave_event);
- memcpy(log_name, rev->new_log_ident,ident_len );
- log_name[ident_len] = 0;
- }
- mi->pos = rev->pos;
- mi->last_log_seq = ev->log_seq;
-#ifndef DBUG_OFF
- if (abort_slave_event_count)
- ++events_till_abort;
-#endif
- if (rotate_binlog)
- {
- mysql_bin_log.new_file();
- mi->last_log_seq = 0;
- }
- pthread_cond_broadcast(&mi->cond);
- pthread_mutex_unlock(&mi->lock);
- flush_master_info(mi);
-
- if (write_slave_event)
- {
- Slave_log_event s(thd, mi);
- if (s.master_host)
- {
- s.set_log_seq(0, &mysql_bin_log);
- s.server_id = ::server_id;
- mysql_bin_log.write(&s);
- }
- }
-
- delete ev;
- thd->log_seq = 0;
- break;
- }
-
- case INTVAR_EVENT:
- {
- Intvar_log_event* iev = (Intvar_log_event*)ev;
- switch(iev->type)
- {
- case LAST_INSERT_ID_EVENT:
- thd->last_insert_id_used = 1;
- thd->last_insert_id = iev->val;
- break;
- case INSERT_ID_EVENT:
- thd->next_insert_id = iev->val;
- break;
-
- }
- mi->inc_pending(event_len);
- delete ev;
- // do not reset log_seq
- break;
- }
- }
+ ev->thd = thd;
+ exec_res = ev->exec_event(mi);
+ delete ev;
+ return exec_res;
}
else
{
@@ -1275,7 +975,6 @@ This may also be a network problem, or just a bug in the master or slave code.\
");
return 1;
}
- return 0;
}
// slave thread
@@ -1363,6 +1062,7 @@ pthread_handler_decl(handle_slave,arg __attribute__((unused)))
connected:
+ thd->slave_net = &mysql->net;
// register ourselves with the master
// if fails, this is not fatal - we just print the error message and go
// on with life