diff options
-rw-r--r-- | mysql-test/r/rpl000001.result | 2 | ||||
-rw-r--r-- | sql/log_event.cc | 64 | ||||
-rw-r--r-- | sql/log_event.h | 11 | ||||
-rw-r--r-- | sql/mysql_priv.h | 1 | ||||
-rw-r--r-- | sql/net_serv.cc | 11 | ||||
-rw-r--r-- | sql/repl_failsafe.cc | 1 | ||||
-rw-r--r-- | sql/slave.cc | 140 | ||||
-rw-r--r-- | sql/slave.h | 2 | ||||
-rw-r--r-- | sql/sql_load.cc | 7 |
9 files changed, 195 insertions, 44 deletions
diff --git a/mysql-test/r/rpl000001.result b/mysql-test/r/rpl000001.result index 3dae52c6d3f..3767de9ad8d 100644 --- a/mysql-test/r/rpl000001.result +++ b/mysql-test/r/rpl000001.result @@ -7,7 +7,7 @@ use test; drop table if exists t1,t3; create table t1 (word char(20) not null); load data infile '../../std_data/words.dat' into table t1; -load data local infile '/home/sasha/bk/mysql-4.0/mysql-test/std_data/words.dat' into table t1; +load data local infile '$MYSQL_TEST_DIR/std_data/words.dat' into table t1; select * from t1; word Aarhus diff --git a/sql/log_event.cc b/sql/log_event.cc index 648e9175e13..528110deb74 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -556,6 +556,8 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len, ev = new Query_log_event(buf, event_len, old_format); break; case LOAD_EVENT: + ev = new Create_file_log_event(buf, event_len, old_format); + break; case NEW_LOAD_EVENT: ev = new Load_log_event(buf, event_len, old_format); break; @@ -566,7 +568,7 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len, ev = new Slave_log_event(buf, event_len); break; case CREATE_FILE_EVENT: - ev = new Create_file_log_event(buf, event_len); + ev = new Create_file_log_event(buf, event_len, old_format); break; case APPEND_BLOCK_EVENT: ev = new Append_block_log_event(buf, event_len); @@ -959,6 +961,12 @@ char* sql_ex_info::init(char* buf,char* buf_end,bool use_new_format) if (use_new_format) { empty_flags=0; + /* the code below assumes that buf will not disappear from + under our feet during the lifetime of the event. This assumption + holds true in the slave thread if the log is in new format, but is not + the case when we have old format because we will be reusing net buffer + to read the actual file before we write out the Create_file event + */ if (read_str(buf, buf_end, field_term, field_term_len) || read_str(buf, buf_end, enclosed, enclosed_len) || read_str(buf, buf_end, line_term, line_term_len) || @@ -970,11 +978,11 @@ char* sql_ex_info::init(char* buf,char* buf_end,bool use_new_format) else { field_term_len= enclosed_len= line_term_len= line_start_len= escaped_len=1; - *field_term=*buf++; - *enclosed= *buf++; - *line_term= *buf++; - *line_start=*buf++; - *escaped= *buf++; + field_term = buf++; + enclosed= buf++; + line_term= buf++; + line_start= buf++; + escaped= buf++; opt_flags = *buf++; empty_flags=*buf++; if (empty_flags & FIELD_TERM_EMPTY) @@ -1095,7 +1103,9 @@ int Load_log_event::copy_log_event(const char *buf, ulong event_len, db_len = (uint)data_head[L_DB_LEN_OFFSET]; num_fields = uint4korr(data_head + L_NUM_FIELDS_OFFSET); - int body_offset = get_data_body_offset(); + int body_offset = (buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ? + LOAD_HEADER_LEN + OLD_HEADER_LEN : get_data_body_offset(); + if ((int) event_len < body_offset) return 1; //sql_ex.init() on success returns the pointer to the first byte after @@ -1117,7 +1127,6 @@ int Load_log_event::copy_log_event(const char *buf, ulong event_len, table_name = fields + field_block_len; db = table_name + table_name_len + 1; fname = db + db_len + 1; - int type_code = get_type_code(); fname_len = strlen(fname); // null termination is accomplished by the caller doing buf[event_len]=0 return 0; @@ -1367,20 +1376,29 @@ int Create_file_log_event::write_base(IO_CACHE* file) return res; } -Create_file_log_event::Create_file_log_event(const char* buf, int len): - Load_log_event(buf,0,0),fake_base(0),block(0) +Create_file_log_event::Create_file_log_event(const char* buf, int len, + bool old_format): + Load_log_event(buf,0,old_format),fake_base(0),block(0),inited_from_old(0) { int block_offset; - if (copy_log_event(buf,len,0)) - return; - file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + - + LOAD_HEADER_LEN + CF_FILE_ID_OFFSET); - block_offset = LOG_EVENT_HEADER_LEN + Load_log_event::get_data_size() + - CREATE_FILE_HEADER_LEN + 1; // 1 for \0 terminating fname - if (len < block_offset) + if (copy_log_event(buf,len,old_format)) return; - block = (char*)buf + block_offset; - block_len = len - block_offset; + if (!old_format) + { + file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + + + LOAD_HEADER_LEN + CF_FILE_ID_OFFSET); + block_offset = LOG_EVENT_HEADER_LEN + Load_log_event::get_data_size() + + CREATE_FILE_HEADER_LEN + 1; // 1 for \0 terminating fname + if (len < block_offset) + return; + block = (char*)buf + block_offset; + block_len = len - block_offset; + } + else + { + sql_ex.force_new_format(); + inited_from_old = 1; + } } @@ -1568,6 +1586,7 @@ int Query_log_event::exec_event(struct st_relay_log_info* rli) int expected_error,actual_error = 0; init_sql_alloc(&thd->mem_root, 8192,0); thd->db = rewrite_db((char*)db); + DBUG_ASSERT(q_len == strlen(query)); if (db_ok(thd->db, replicate_do_db, replicate_ignore_db)) { thd->query = (char*)query; @@ -1739,11 +1758,12 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli) int Start_log_event::exec_event(struct st_relay_log_info* rli) { + close_temporary_tables(thd); + // if we have old format, load_tmpdir is cleaned up by the I/O thread + // TODO: cleanup_load_tmpdir() needs to remove only the files associated + // with the server id that has just started if (!rli->mi->old_format) - { - close_temporary_tables(thd); cleanup_load_tmpdir(); - } return Log_event::exec_event(rli); } diff --git a/sql/log_event.h b/sql/log_event.h index 089d9589763..a29c3952d46 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -64,6 +64,8 @@ struct old_sql_ex char empty_flags; }; +#define NUM_LOAD_DELIM_STRS 5 + struct sql_ex_info { @@ -153,8 +155,8 @@ struct sql_ex_info #define L_THREAD_ID_OFFSET 0 #define L_EXEC_TIME_OFFSET 4 #define L_SKIP_LINES_OFFSET 8 -#define L_DB_LEN_OFFSET 12 -#define L_TBL_LEN_OFFSET 13 +#define L_TBL_LEN_OFFSET 12 +#define L_DB_LEN_OFFSET 13 #define L_NUM_FIELDS_OFFSET 14 #define L_SQL_EX_OFFSET 18 #define L_DATA_OFFSET LOAD_HEADER_LEN @@ -570,6 +572,7 @@ public: char* block; uint block_len; uint file_id; + bool inited_from_old; #ifndef MYSQL_CLIENT Create_file_log_event(THD* thd, sql_exchange* ex, const char* db_arg, const char* table_name_arg, @@ -578,7 +581,7 @@ public: char* block_arg, uint block_len_arg); #endif - Create_file_log_event(const char* buf, int event_len); + Create_file_log_event(const char* buf, int event_len, bool old_format); ~Create_file_log_event() { } @@ -591,7 +594,7 @@ public: 4 + 1 + block_len;} int get_data_body_offset() { return fake_base ? LOAD_EVENT_OVERHEAD: LOAD_EVENT_OVERHEAD + CREATE_FILE_HEADER_LEN; } - bool is_valid() { return block != 0; } + bool is_valid() { return inited_from_old || block != 0; } int write_data_header(IO_CACHE* file); int write_data_body(IO_CACHE* file); int write_base(IO_CACHE* file); // cut out Create_file extentions and diff --git a/sql/mysql_priv.h b/sql/mysql_priv.h index 81604a7ecfd..fc804425f08 100644 --- a/sql/mysql_priv.h +++ b/sql/mysql_priv.h @@ -48,6 +48,7 @@ char *sql_strmake(const char *str,uint len); gptr sql_memdup(const void * ptr,unsigned size); void sql_element_free(void *ptr); void kill_one_thread(THD *thd, ulong id); +int net_request_file(NET* net, const char* fname); char* query_table_status(THD *thd,const char *db,const char *table_name); #define x_free(A) { my_free((gptr) (A),MYF(MY_WME | MY_FAE | MY_ALLOW_ZERO_PTR)); } diff --git a/sql/net_serv.cc b/sql/net_serv.cc index 7a1d25e980d..811f36bd82e 100644 --- a/sql/net_serv.cc +++ b/sql/net_serv.cc @@ -814,3 +814,14 @@ my_net_read(NET *net) #endif /* HAVE_COMPRESS */ return len; } + +int net_request_file(NET* net, const char* fname) +{ + char tmp [FN_REFLEN+1],*end; + DBUG_ENTER("net_request_file"); + tmp[0] = (char) 251; /* NULL_LENGTH */ + end=strnmov(tmp+1,fname,sizeof(tmp)-2); + DBUG_RETURN(my_net_write(net,tmp,(uint) (end-tmp)) || + net_flush(net)); +} + diff --git a/sql/repl_failsafe.cc b/sql/repl_failsafe.cc index 6cc7ef7404b..257418d1682 100644 --- a/sql/repl_failsafe.cc +++ b/sql/repl_failsafe.cc @@ -828,6 +828,7 @@ int load_master_data(THD* thd) active_mi->rli.master_log_pos = active_mi->master_log_pos; strnmov(active_mi->rli.master_log_name,active_mi->master_log_name, sizeof(active_mi->rli.master_log_name)); + flush_relay_log_info(&active_mi->rli); pthread_cond_broadcast(&active_mi->rli.data_cond); pthread_mutex_unlock(&active_mi->rli.data_lock); thd->proc_info = "starting slave"; diff --git a/sql/slave.cc b/sql/slave.cc index 8884e5de778..86db6efa34d 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -55,6 +55,7 @@ typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE; void skip_load_data_infile(NET* net); static int process_io_rotate(MASTER_INFO* mi, Rotate_log_event* rev); +static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev); static int queue_old_event(MASTER_INFO* mi, const char* buf, uint event_len); static inline bool slave_killed(THD* thd,MASTER_INFO* mi); @@ -654,7 +655,7 @@ char* rewrite_db(char* db) int db_ok(const char* db, I_List<i_string> &do_list, I_List<i_string> &ignore_list ) { - if(do_list.is_empty() && ignore_list.is_empty()) + if (do_list.is_empty() && ignore_list.is_empty()) return 1; // ok to replicate if the user puts no constraints // if the user has specified restrictions on which databases to replicate @@ -1058,6 +1059,8 @@ int init_master_info(MASTER_INFO* mi, const char* master_info_fname, if (init_relay_log_info(&mi->rli, slave_info_fname)) return 1; mi->rli.mi = mi; + mi->mysql=0; + mi->file_id=1; mi->ignore_stop_event=0; int fd,error; MY_STAT stat_area; @@ -1621,7 +1624,7 @@ slave_begin: DBUG_PRINT("info",("master info: log_file_name=%s, position=%s", mi->master_log_name, llstr(mi->master_log_pos,llbuff))); - if (!(mysql = mc_mysql_init(NULL))) + if (!(mi->mysql = mysql = mc_mysql_init(NULL))) { sql_print_error("Slave I/O thread: error in mc_mysql_init()"); goto err; @@ -1780,8 +1783,11 @@ err: sql_print_error("Slave I/O thread exiting, read up to log '%s', position %s", IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff)); thd->query = thd->db = 0; // extra safety - if(mysql) + if (mysql) + { mc_mysql_close(mysql); + mi->mysql=0; + } thd->proc_info = "Waiting for slave mutex on exit"; pthread_mutex_lock(&mi->run_lock); mi->slave_running = 0; @@ -1790,7 +1796,7 @@ err: change_rpl_status(RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE); mi->abort_slave = 0; // TODO: check if this is needed DBUG_ASSERT(thd->net.buff != 0); - net_end(&thd->net); // destructor will not free it, because we are weird + net_end(&thd->net); // destructor will not free it, because net.vio is 0 pthread_mutex_lock(&LOCK_thread_count); delete thd; pthread_mutex_unlock(&LOCK_thread_count); @@ -1926,11 +1932,97 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \ DBUG_RETURN(0); // Can't return anything here } +static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev) +{ + int error = 1; + ulong num_bytes; + bool cev_not_written; + THD* thd; + NET* net = &mi->mysql->net; + + if (unlikely(!cev->is_valid())) + return 1; + /* + TODO: fix to honor table rules, not only db rules + */ + if (!db_ok(cev->db, replicate_do_db, replicate_ignore_db)) + { + skip_load_data_infile(net); + return 0; + } + DBUG_ASSERT(cev->inited_from_old); + thd = mi->io_thd; + thd->file_id = cev->file_id = mi->file_id++; + cev_not_written = 1; + + if (unlikely(net_request_file(net,cev->fname))) + { + sql_print_error("Slave I/O: failed requesting download of '%s'", + cev->fname); + goto err; + } + + /* this dummy block is so we could insantiate Append_block_log_event + once and then modify it slightly instead of doing it multiple times + in the loop + */ + { + Append_block_log_event aev(thd,0,0); + + for (;;) + { + if (unlikely((num_bytes=my_net_read(net)) == packet_error)) + { + sql_print_error("Network read error downloading '%s' from master", + cev->fname); + goto err; + } + if (unlikely(!num_bytes)) /* eof */ + { + send_ok(net); /* 3.23 master wants it */ + Execute_load_log_event xev(mi->io_thd); + if (unlikely(mi->rli.relay_log.append(&xev))) + { + sql_print_error("Slave I/O: error writing Exec_load event to \ +relay log"); + goto err; + } + break; + } + if (unlikely(cev_not_written)) + { + cev->block = (char*)net->read_pos; + cev->block_len = num_bytes; + if (unlikely(mi->rli.relay_log.append(cev))) + { + sql_print_error("Slave I/O: error writing Create_file event to \ +relay log"); + goto err; + } + cev_not_written=0; + } + else + { + aev.block = (char*)net->read_pos; + aev.block_len = num_bytes; + if (unlikely(mi->rli.relay_log.append(&aev))) + { + sql_print_error("Slave I/O: error writing Append_block event to \ +relay log"); + goto err; + } + } + } + } + error=0; +err: + return error; +} // We assume we already locked mi->data_lock static int process_io_rotate(MASTER_INFO* mi, Rotate_log_event* rev) { - if (!rev->is_valid()) + if (unlikely(!rev->is_valid())) return 1; DBUG_ASSERT(rev->ident_len<sizeof(mi->master_log_name)); memcpy(mi->master_log_name,rev->new_log_ident, @@ -1961,6 +2053,21 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf, const char *errmsg = 0; bool inc_pos = 1; bool processed_stop_event = 0; + char* tmp_buf = 0; + /* if we get Load event, we need to pass a non-reusable buffer + to read_log_event, so we do a trick + */ + if (buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) + { + if (unlikely(!(tmp_buf=(char*)my_malloc(event_len+1,MYF(MY_WME))))) + { + sql_print_error("Slave I/O: out of memory for Load event"); + return 1; + } + memcpy(tmp_buf,buf,event_len); + tmp_buf[event_len]=0; // Create_file constructor wants null-term buffer + buf = (const char*)tmp_buf; + } Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg, 1 /*old format*/ ); if (unlikely(!ev)) @@ -1968,6 +2075,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf, sql_print_error("Read invalid event from master: '%s',\ master could be corrupt but a more likely cause of this is a bug", errmsg); + my_free((char*)tmp_buf, MYF(MY_ALLOW_ZERO_PTR)); return 1; } pthread_mutex_lock(&mi->data_lock); @@ -1978,6 +2086,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf, { delete ev; pthread_mutex_unlock(&mi->data_lock); + DBUG_ASSERT(!tmp_buf); return 1; } mi->ignore_stop_event=1; @@ -1986,12 +2095,16 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf, case STOP_EVENT: processed_stop_event=1; break; - case LOAD_EVENT: - // TODO: actually process it - mi->master_log_pos += event_len; + case CREATE_FILE_EVENT: + { + int error = process_io_create_file(mi,(Create_file_log_event*)ev); delete ev; + mi->master_log_pos += event_len; pthread_mutex_unlock(&mi->data_lock); - return 0; + DBUG_ASSERT(tmp_buf); + my_free((char*)tmp_buf, MYF(0)); + return error; + } default: mi->ignore_stop_event=0; break; @@ -2002,6 +2115,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf, { delete ev; pthread_mutex_unlock(&mi->data_lock); + DBUG_ASSERT(!tmp_buf); return 1; } } @@ -2011,6 +2125,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf, if (unlikely(processed_stop_event)) mi->ignore_stop_event=1; pthread_mutex_unlock(&mi->data_lock); + DBUG_ASSERT(!tmp_buf); return 0; } @@ -2173,7 +2288,7 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi) int flush_relay_log_info(RELAY_LOG_INFO* rli) { - IO_CACHE* file = &rli->info_file; + register IO_CACHE* file = &rli->info_file; char lbuf[22],lbuf1[22]; my_b_seek(file, 0L); @@ -2251,7 +2366,10 @@ Log_event* next_event(RELAY_LOG_INFO* rli) } DBUG_ASSERT(my_b_tell(cur_log) >= 4); DBUG_ASSERT(my_b_tell(cur_log) == rli->relay_log_pos + rli->pending); - if ((ev=Log_event::read_log_event(cur_log,0,rli->mi->old_format))) + /* relay log is always in new format - if the master is 3.23, the + I/O thread will convert the format for us + */ + if ((ev=Log_event::read_log_event(cur_log,0,(bool)0/*new format*/))) { DBUG_ASSERT(thd==rli->sql_thd); if (hot_log) diff --git a/sql/slave.h b/sql/slave.h index f60f2ce2954..59263a96687 100644 --- a/sql/slave.h +++ b/sql/slave.h @@ -254,6 +254,8 @@ typedef struct st_master_info pthread_mutex_t data_lock,run_lock; pthread_cond_t data_cond,start_cond,stop_cond; THD *io_thd; + MYSQL* mysql; + uint32 file_id; // for 3.23 load data infile RELAY_LOG_INFO rli; uint port; uint connect_retry; diff --git a/sql/sql_load.cc b/sql/sql_load.cc index abc9fa5a121..899f2e20469 100644 --- a/sql/sql_load.cc +++ b/sql/sql_load.cc @@ -147,12 +147,7 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, if (read_file_from_client && (thd->client_capabilities & CLIENT_LOCAL_FILES)) { - char tmp [FN_REFLEN+1],*end; - DBUG_PRINT("info",("reading local file")); - tmp[0] = (char) 251; /* NULL_LENGTH */ - end=strnmov(tmp+1,ex->file_name,sizeof(tmp)-2); - (void) my_net_write(&thd->net,tmp,(uint) (end-tmp)); - (void) net_flush(&thd->net); + (void)net_request_file(&thd->net,ex->file_name); file = -1; } else |