diff options
Diffstat (limited to 'sql/slave.cc')
-rw-r--r-- | sql/slave.cc | 350 |
1 files changed, 25 insertions, 325 deletions
diff --git a/sql/slave.cc b/sql/slave.cc index a00809b6994..e5da79d6871 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -24,10 +24,8 @@ #include <thr_alarm.h> #include <my_dir.h> -#define RPL_LOG_NAME (glob_mi.log_file_name[0] ? glob_mi.log_file_name :\ - "FIRST") - volatile bool slave_running = 0; +char* slave_load_tmpdir = 0; pthread_t slave_real_id; MASTER_INFO glob_mi; HASH replicate_do_table, replicate_ignore_table; @@ -41,16 +39,17 @@ THD* slave_thd = 0; // when slave thread exits, we need to remember the temporary tables so we // can re-use them on slave start -static int last_slave_errno = 0; -static char last_slave_error[1024] = ""; +int last_slave_errno = 0; +char last_slave_error[MAX_SLAVE_ERRMSG] = ""; #ifndef DBUG_OFF int disconnect_slave_event_count = 0, abort_slave_event_count = 0; -static int events_till_disconnect = -1, events_till_abort = -1; +static int events_till_disconnect = -1; +int events_till_abort = -1; static int stuck_count = 0; #endif -inline void skip_load_data_infile(NET* net); +void skip_load_data_infile(NET* net); inline bool slave_killed(THD* thd); static int init_slave_thread(THD* thd); static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi); @@ -59,8 +58,7 @@ static int safe_sleep(THD* thd, int sec); static int request_table_dump(MYSQL* mysql, const char* db, const char* table); static int create_table_from_dump(THD* thd, NET* net, const char* db, const char* table_name); -inline char* rewrite_db(char* db); -static int check_expected_error(THD* thd, int expected_error); +char* rewrite_db(char* db); static void free_table_ent(TABLE_RULE_ENT* e) { @@ -219,7 +217,16 @@ inline bool slave_killed(THD* thd) return abort_slave || abort_loop || thd->killed; } -inline void skip_load_data_infile(NET* net) +void slave_print_error(int err_code, const char* msg, ...) +{ + va_list args; + va_start(args,msg); + my_vsnprintf(last_slave_error, sizeof(last_slave_error), msg, args); + sql_print_error("Slave: %s, error_code=%d", last_slave_error, err_code); + last_slave_errno = err_code; +} + +void skip_load_data_infile(NET* net) { (void)my_net_write(net, "\xfb/dev/null", 10); (void)net_flush(net); @@ -227,7 +234,7 @@ inline void skip_load_data_infile(NET* net) send_ok(net); // the master expects it } -inline char* rewrite_db(char* db) +char* rewrite_db(char* db) { if(replicate_rewrite_db.is_empty() || !db) return db; I_List_iterator<i_string_pair> it(replicate_rewrite_db); @@ -904,7 +911,7 @@ server_errno=%d)", return len - 1; } -static int check_expected_error(THD* thd, int expected_error) +int check_expected_error(THD* thd, int expected_error) { switch(expected_error) { @@ -935,6 +942,7 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len) if (ev) { int type_code = ev->get_type_code(); + int exec_res; if (ev->server_id == ::server_id || slave_skip_counter) { if(type_code == LOAD_EVENT) @@ -952,320 +960,12 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len) thd->set_time(); // time the query if(!thd->log_seq) thd->log_seq = ev->log_seq; - if (!ev->when) ev->when = time(NULL); - - switch(type_code) { - case QUERY_EVENT: - { - Query_log_event* qev = (Query_log_event*)ev; - int q_len = qev->q_len; - int expected_error,actual_error = 0; - init_sql_alloc(&thd->mem_root, 8192,0); - thd->db = rewrite_db((char*)qev->db); - if (db_ok(thd->db, replicate_do_db, replicate_ignore_db)) - { - thd->query = (char*)qev->query; - thd->set_time((time_t)qev->when); - thd->current_tablenr = 0; - VOID(pthread_mutex_lock(&LOCK_thread_count)); - thd->query_id = query_id++; - VOID(pthread_mutex_unlock(&LOCK_thread_count)); - thd->query_error = 0; // clear error - thd->net.last_errno = 0; - thd->net.last_error[0] = 0; - thd->slave_proxy_id = qev->thread_id; // for temp tables - - // sanity check to make sure the master did not get a really bad - // error on the query - if (!check_expected_error(thd, (expected_error = qev->error_code))) - { - mysql_parse(thd, thd->query, q_len); - if (expected_error != - (actual_error = thd->net.last_errno) && expected_error) - { - const char* errmsg = "Slave: did not get the expected error\ - running query from master - expected: '%s'(%d), got '%s'(%d)"; - sql_print_error(errmsg, ER_SAFE(expected_error), - expected_error, - actual_error ? thd->net.last_error:"no error", - actual_error); - thd->query_error = 1; - } - else if (expected_error == actual_error) - { - thd->query_error = 0; - *last_slave_error = 0; - last_slave_errno = 0; - } - } - else - { - // master could be inconsistent, abort and tell DBA to check/fix it - thd->db = thd->query = 0; - thd->convert_set = 0; - close_thread_tables(thd); - free_root(&thd->mem_root,0); - delete ev; - return 1; - } - } - thd->db = 0; // prevent db from being freed - thd->query = 0; // just to be sure - // assume no convert for next query unless set explictly - thd->convert_set = 0; - close_thread_tables(thd); - - if (thd->query_error || thd->fatal_error) - { - sql_print_error("Slave: error running query '%s' ", - qev->query); - last_slave_errno = actual_error ? actual_error : -1; - my_snprintf(last_slave_error, sizeof(last_slave_error), - "error '%s' on query '%s'", - actual_error ? thd->net.last_error : - "unexpected success or fatal error", - qev->query - ); - free_root(&thd->mem_root,0); - delete ev; - return 1; - } - free_root(&thd->mem_root,0); - thd->log_seq = 0; - mi->inc_pos(event_len, ev->log_seq); - delete ev; - flush_master_info(mi); - break; - } - - case SLAVE_EVENT: - { - if(mysql_bin_log.is_open()) - { - Slave_log_event *sev = (Slave_log_event*)ev; - mysql_bin_log.write(sev); - } - - thd->log_seq = 0; - mi->inc_pos(event_len, ev->log_seq); - flush_master_info(mi); - delete ev; - break; - } - - case LOAD_EVENT: - { - Load_log_event* lev = (Load_log_event*)ev; - init_sql_alloc(&thd->mem_root, 8192,0); - thd->db = rewrite_db((char*)lev->db); - thd->query = 0; - thd->query_error = 0; - - if(db_ok(thd->db, replicate_do_db, replicate_ignore_db)) - { - thd->set_time((time_t)lev->when); - thd->current_tablenr = 0; - VOID(pthread_mutex_lock(&LOCK_thread_count)); - thd->query_id = query_id++; - VOID(pthread_mutex_unlock(&LOCK_thread_count)); - - TABLE_LIST tables; - bzero((char*) &tables,sizeof(tables)); - tables.db = thd->db; - tables.name = tables.real_name = (char*)lev->table_name; - tables.lock_type = TL_WRITE; - // the table will be opened in mysql_load - if(table_rules_on && !tables_ok(thd, &tables)) - { - skip_load_data_infile(net); - } - else - { - enum enum_duplicates handle_dup = DUP_IGNORE; - if(lev->sql_ex.opt_flags && REPLACE_FLAG) - handle_dup = DUP_REPLACE; - sql_exchange ex((char*)lev->fname, lev->sql_ex.opt_flags && - DUMPFILE_FLAG ); - String field_term(&lev->sql_ex.field_term, 1), - enclosed(&lev->sql_ex.enclosed, 1), - line_term(&lev->sql_ex.line_term,1), - escaped(&lev->sql_ex.escaped, 1), - line_start(&lev->sql_ex.line_start, 1); - - ex.field_term = &field_term; - if(lev->sql_ex.empty_flags & FIELD_TERM_EMPTY) - ex.field_term->length(0); - - ex.enclosed = &enclosed; - if(lev->sql_ex.empty_flags & ENCLOSED_EMPTY) - ex.enclosed->length(0); - - ex.line_term = &line_term; - if(lev->sql_ex.empty_flags & LINE_TERM_EMPTY) - ex.line_term->length(0); - - ex.line_start = &line_start; - if(lev->sql_ex.empty_flags & LINE_START_EMPTY) - ex.line_start->length(0); - - ex.escaped = &escaped; - if(lev->sql_ex.empty_flags & ESCAPED_EMPTY) - ex.escaped->length(0); - - ex.opt_enclosed = (lev->sql_ex.opt_flags & OPT_ENCLOSED_FLAG); - if(lev->sql_ex.empty_flags & FIELD_TERM_EMPTY) - ex.field_term->length(0); - - ex.skip_lines = lev->skip_lines; - - - List<Item> fields; - lev->set_fields(fields); - thd->slave_proxy_id = thd->thread_id; - thd->net.vio = net->vio; - // mysql_load will use thd->net to read the file - thd->net.pkt_nr = net->pkt_nr; - // make sure the client does not get confused - // about the packet sequence - if(mysql_load(thd, &ex, &tables, fields, handle_dup, 1, - TL_WRITE)) - thd->query_error = 1; - if(thd->cuted_fields) - sql_print_error("Slave: load data infile at position %s in log \ -'%s' produced %d warning(s)", llstr(glob_mi.pos,llbuff), RPL_LOG_NAME, - thd->cuted_fields ); - net->pkt_nr = thd->net.pkt_nr; - } - } - else - { - // we will just ask the master to send us /dev/null if we do not - // want to load the data :-) - skip_load_data_infile(net); - } - - thd->net.vio = 0; - thd->db = 0;// prevent db from being freed - close_thread_tables(thd); - if(thd->query_error) - { - int sql_error = thd->net.last_errno; - if(!sql_error) - sql_error = ER_UNKNOWN_ERROR; - - sql_print_error("Slave: Error '%s' running load data infile ", - ER(sql_error)); - delete ev; - free_root(&thd->mem_root,0); - return 1; - } - - thd->log_seq = 0; - free_root(&thd->mem_root,0); - - if(thd->fatal_error) - { - sql_print_error("Slave: Fatal error running query '%s' ", - thd->query); - delete ev; - return 1; - } - - mi->inc_pos(event_len, ev->log_seq); - delete ev; - flush_master_info(mi); - break; - } - - case START_EVENT: - close_temporary_tables(thd); - mi->inc_pos(event_len, ev->log_seq); - flush_master_info(mi); - delete ev; - thd->log_seq = 0; - break; - - case STOP_EVENT: - if(mi->pos > 4) // stop event should be ignored after rotate event - { - close_temporary_tables(thd); - mi->inc_pos(event_len, ev->log_seq); - flush_master_info(mi); - } - delete ev; - thd->log_seq = 0; - break; - case ROTATE_EVENT: - { - Rotate_log_event* rev = (Rotate_log_event*)ev; - int ident_len = rev->ident_len; - bool rotate_binlog = 0, write_slave_event = 0; - char* log_name = mi->log_file_name; - pthread_mutex_lock(&mi->lock); - - // rotate local binlog only if the name of remote has changed - if (!*log_name || !(log_name[ident_len] == 0 && - !memcmp(log_name, rev->new_log_ident, ident_len))) - { - write_slave_event = (!(rev->flags & LOG_EVENT_FORCED_ROTATE_F) - && mysql_bin_log.is_open()); - rotate_binlog = (*log_name && write_slave_event); - memcpy(log_name, rev->new_log_ident,ident_len ); - log_name[ident_len] = 0; - } - mi->pos = rev->pos; - mi->last_log_seq = ev->log_seq; -#ifndef DBUG_OFF - if (abort_slave_event_count) - ++events_till_abort; -#endif - if (rotate_binlog) - { - mysql_bin_log.new_file(); - mi->last_log_seq = 0; - } - pthread_cond_broadcast(&mi->cond); - pthread_mutex_unlock(&mi->lock); - flush_master_info(mi); - - if (write_slave_event) - { - Slave_log_event s(thd, mi); - if (s.master_host) - { - s.set_log_seq(0, &mysql_bin_log); - s.server_id = ::server_id; - mysql_bin_log.write(&s); - } - } - - delete ev; - thd->log_seq = 0; - break; - } - - case INTVAR_EVENT: - { - Intvar_log_event* iev = (Intvar_log_event*)ev; - switch(iev->type) - { - case LAST_INSERT_ID_EVENT: - thd->last_insert_id_used = 1; - thd->last_insert_id = iev->val; - break; - case INSERT_ID_EVENT: - thd->next_insert_id = iev->val; - break; - - } - mi->inc_pending(event_len); - delete ev; - // do not reset log_seq - break; - } - } + ev->thd = thd; + exec_res = ev->exec_event(mi); + delete ev; + return exec_res; } else { @@ -1275,7 +975,6 @@ This may also be a network problem, or just a bug in the master or slave code.\ "); return 1; } - return 0; } // slave thread @@ -1363,6 +1062,7 @@ pthread_handler_decl(handle_slave,arg __attribute__((unused))) connected: + thd->slave_net = &mysql->net; // register ourselves with the master // if fails, this is not fatal - we just print the error message and go // on with life |