summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--mysql-test/r/rpl000001.result2
-rw-r--r--sql/log_event.cc64
-rw-r--r--sql/log_event.h11
-rw-r--r--sql/mysql_priv.h1
-rw-r--r--sql/net_serv.cc11
-rw-r--r--sql/repl_failsafe.cc1
-rw-r--r--sql/slave.cc140
-rw-r--r--sql/slave.h2
-rw-r--r--sql/sql_load.cc7
9 files changed, 195 insertions, 44 deletions
diff --git a/mysql-test/r/rpl000001.result b/mysql-test/r/rpl000001.result
index 3dae52c6d3f..3767de9ad8d 100644
--- a/mysql-test/r/rpl000001.result
+++ b/mysql-test/r/rpl000001.result
@@ -7,7 +7,7 @@ use test;
drop table if exists t1,t3;
create table t1 (word char(20) not null);
load data infile '../../std_data/words.dat' into table t1;
-load data local infile '/home/sasha/bk/mysql-4.0/mysql-test/std_data/words.dat' into table t1;
+load data local infile '$MYSQL_TEST_DIR/std_data/words.dat' into table t1;
select * from t1;
word
Aarhus
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 648e9175e13..528110deb74 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -556,6 +556,8 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len,
ev = new Query_log_event(buf, event_len, old_format);
break;
case LOAD_EVENT:
+ ev = new Create_file_log_event(buf, event_len, old_format);
+ break;
case NEW_LOAD_EVENT:
ev = new Load_log_event(buf, event_len, old_format);
break;
@@ -566,7 +568,7 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len,
ev = new Slave_log_event(buf, event_len);
break;
case CREATE_FILE_EVENT:
- ev = new Create_file_log_event(buf, event_len);
+ ev = new Create_file_log_event(buf, event_len, old_format);
break;
case APPEND_BLOCK_EVENT:
ev = new Append_block_log_event(buf, event_len);
@@ -959,6 +961,12 @@ char* sql_ex_info::init(char* buf,char* buf_end,bool use_new_format)
if (use_new_format)
{
empty_flags=0;
+ /* the code below assumes that buf will not disappear from
+ under our feet during the lifetime of the event. This assumption
+ holds true in the slave thread if the log is in new format, but is not
+ the case when we have old format because we will be reusing net buffer
+ to read the actual file before we write out the Create_file event
+ */
if (read_str(buf, buf_end, field_term, field_term_len) ||
read_str(buf, buf_end, enclosed, enclosed_len) ||
read_str(buf, buf_end, line_term, line_term_len) ||
@@ -970,11 +978,11 @@ char* sql_ex_info::init(char* buf,char* buf_end,bool use_new_format)
else
{
field_term_len= enclosed_len= line_term_len= line_start_len= escaped_len=1;
- *field_term=*buf++;
- *enclosed= *buf++;
- *line_term= *buf++;
- *line_start=*buf++;
- *escaped= *buf++;
+ field_term = buf++;
+ enclosed= buf++;
+ line_term= buf++;
+ line_start= buf++;
+ escaped= buf++;
opt_flags = *buf++;
empty_flags=*buf++;
if (empty_flags & FIELD_TERM_EMPTY)
@@ -1095,7 +1103,9 @@ int Load_log_event::copy_log_event(const char *buf, ulong event_len,
db_len = (uint)data_head[L_DB_LEN_OFFSET];
num_fields = uint4korr(data_head + L_NUM_FIELDS_OFFSET);
- int body_offset = get_data_body_offset();
+ int body_offset = (buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ?
+ LOAD_HEADER_LEN + OLD_HEADER_LEN : get_data_body_offset();
+
if ((int) event_len < body_offset)
return 1;
//sql_ex.init() on success returns the pointer to the first byte after
@@ -1117,7 +1127,6 @@ int Load_log_event::copy_log_event(const char *buf, ulong event_len,
table_name = fields + field_block_len;
db = table_name + table_name_len + 1;
fname = db + db_len + 1;
- int type_code = get_type_code();
fname_len = strlen(fname);
// null termination is accomplished by the caller doing buf[event_len]=0
return 0;
@@ -1367,20 +1376,29 @@ int Create_file_log_event::write_base(IO_CACHE* file)
return res;
}
-Create_file_log_event::Create_file_log_event(const char* buf, int len):
- Load_log_event(buf,0,0),fake_base(0),block(0)
+Create_file_log_event::Create_file_log_event(const char* buf, int len,
+ bool old_format):
+ Load_log_event(buf,0,old_format),fake_base(0),block(0),inited_from_old(0)
{
int block_offset;
- if (copy_log_event(buf,len,0))
- return;
- file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN +
- + LOAD_HEADER_LEN + CF_FILE_ID_OFFSET);
- block_offset = LOG_EVENT_HEADER_LEN + Load_log_event::get_data_size() +
- CREATE_FILE_HEADER_LEN + 1; // 1 for \0 terminating fname
- if (len < block_offset)
+ if (copy_log_event(buf,len,old_format))
return;
- block = (char*)buf + block_offset;
- block_len = len - block_offset;
+ if (!old_format)
+ {
+ file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN +
+ + LOAD_HEADER_LEN + CF_FILE_ID_OFFSET);
+ block_offset = LOG_EVENT_HEADER_LEN + Load_log_event::get_data_size() +
+ CREATE_FILE_HEADER_LEN + 1; // 1 for \0 terminating fname
+ if (len < block_offset)
+ return;
+ block = (char*)buf + block_offset;
+ block_len = len - block_offset;
+ }
+ else
+ {
+ sql_ex.force_new_format();
+ inited_from_old = 1;
+ }
}
@@ -1568,6 +1586,7 @@ int Query_log_event::exec_event(struct st_relay_log_info* rli)
int expected_error,actual_error = 0;
init_sql_alloc(&thd->mem_root, 8192,0);
thd->db = rewrite_db((char*)db);
+ DBUG_ASSERT(q_len == strlen(query));
if (db_ok(thd->db, replicate_do_db, replicate_ignore_db))
{
thd->query = (char*)query;
@@ -1739,11 +1758,12 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli)
int Start_log_event::exec_event(struct st_relay_log_info* rli)
{
+ close_temporary_tables(thd);
+ // if we have old format, load_tmpdir is cleaned up by the I/O thread
+ // TODO: cleanup_load_tmpdir() needs to remove only the files associated
+ // with the server id that has just started
if (!rli->mi->old_format)
- {
- close_temporary_tables(thd);
cleanup_load_tmpdir();
- }
return Log_event::exec_event(rli);
}
diff --git a/sql/log_event.h b/sql/log_event.h
index 089d9589763..a29c3952d46 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -64,6 +64,8 @@ struct old_sql_ex
char empty_flags;
};
+#define NUM_LOAD_DELIM_STRS 5
+
struct sql_ex_info
{
@@ -153,8 +155,8 @@ struct sql_ex_info
#define L_THREAD_ID_OFFSET 0
#define L_EXEC_TIME_OFFSET 4
#define L_SKIP_LINES_OFFSET 8
-#define L_DB_LEN_OFFSET 12
-#define L_TBL_LEN_OFFSET 13
+#define L_TBL_LEN_OFFSET 12
+#define L_DB_LEN_OFFSET 13
#define L_NUM_FIELDS_OFFSET 14
#define L_SQL_EX_OFFSET 18
#define L_DATA_OFFSET LOAD_HEADER_LEN
@@ -570,6 +572,7 @@ public:
char* block;
uint block_len;
uint file_id;
+ bool inited_from_old;
#ifndef MYSQL_CLIENT
Create_file_log_event(THD* thd, sql_exchange* ex, const char* db_arg,
const char* table_name_arg,
@@ -578,7 +581,7 @@ public:
char* block_arg, uint block_len_arg);
#endif
- Create_file_log_event(const char* buf, int event_len);
+ Create_file_log_event(const char* buf, int event_len, bool old_format);
~Create_file_log_event()
{
}
@@ -591,7 +594,7 @@ public:
4 + 1 + block_len;}
int get_data_body_offset() { return fake_base ? LOAD_EVENT_OVERHEAD:
LOAD_EVENT_OVERHEAD + CREATE_FILE_HEADER_LEN; }
- bool is_valid() { return block != 0; }
+ bool is_valid() { return inited_from_old || block != 0; }
int write_data_header(IO_CACHE* file);
int write_data_body(IO_CACHE* file);
int write_base(IO_CACHE* file); // cut out Create_file extentions and
diff --git a/sql/mysql_priv.h b/sql/mysql_priv.h
index 81604a7ecfd..fc804425f08 100644
--- a/sql/mysql_priv.h
+++ b/sql/mysql_priv.h
@@ -48,6 +48,7 @@ char *sql_strmake(const char *str,uint len);
gptr sql_memdup(const void * ptr,unsigned size);
void sql_element_free(void *ptr);
void kill_one_thread(THD *thd, ulong id);
+int net_request_file(NET* net, const char* fname);
char* query_table_status(THD *thd,const char *db,const char *table_name);
#define x_free(A) { my_free((gptr) (A),MYF(MY_WME | MY_FAE | MY_ALLOW_ZERO_PTR)); }
diff --git a/sql/net_serv.cc b/sql/net_serv.cc
index 7a1d25e980d..811f36bd82e 100644
--- a/sql/net_serv.cc
+++ b/sql/net_serv.cc
@@ -814,3 +814,14 @@ my_net_read(NET *net)
#endif /* HAVE_COMPRESS */
return len;
}
+
+int net_request_file(NET* net, const char* fname)
+{
+ char tmp [FN_REFLEN+1],*end;
+ DBUG_ENTER("net_request_file");
+ tmp[0] = (char) 251; /* NULL_LENGTH */
+ end=strnmov(tmp+1,fname,sizeof(tmp)-2);
+ DBUG_RETURN(my_net_write(net,tmp,(uint) (end-tmp)) ||
+ net_flush(net));
+}
+
diff --git a/sql/repl_failsafe.cc b/sql/repl_failsafe.cc
index 6cc7ef7404b..257418d1682 100644
--- a/sql/repl_failsafe.cc
+++ b/sql/repl_failsafe.cc
@@ -828,6 +828,7 @@ int load_master_data(THD* thd)
active_mi->rli.master_log_pos = active_mi->master_log_pos;
strnmov(active_mi->rli.master_log_name,active_mi->master_log_name,
sizeof(active_mi->rli.master_log_name));
+ flush_relay_log_info(&active_mi->rli);
pthread_cond_broadcast(&active_mi->rli.data_cond);
pthread_mutex_unlock(&active_mi->rli.data_lock);
thd->proc_info = "starting slave";
diff --git a/sql/slave.cc b/sql/slave.cc
index 8884e5de778..86db6efa34d 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -55,6 +55,7 @@ typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE;
void skip_load_data_infile(NET* net);
static int process_io_rotate(MASTER_INFO* mi, Rotate_log_event* rev);
+static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev);
static int queue_old_event(MASTER_INFO* mi, const char* buf,
uint event_len);
static inline bool slave_killed(THD* thd,MASTER_INFO* mi);
@@ -654,7 +655,7 @@ char* rewrite_db(char* db)
int db_ok(const char* db, I_List<i_string> &do_list,
I_List<i_string> &ignore_list )
{
- if(do_list.is_empty() && ignore_list.is_empty())
+ if (do_list.is_empty() && ignore_list.is_empty())
return 1; // ok to replicate if the user puts no constraints
// if the user has specified restrictions on which databases to replicate
@@ -1058,6 +1059,8 @@ int init_master_info(MASTER_INFO* mi, const char* master_info_fname,
if (init_relay_log_info(&mi->rli, slave_info_fname))
return 1;
mi->rli.mi = mi;
+ mi->mysql=0;
+ mi->file_id=1;
mi->ignore_stop_event=0;
int fd,error;
MY_STAT stat_area;
@@ -1621,7 +1624,7 @@ slave_begin:
DBUG_PRINT("info",("master info: log_file_name=%s, position=%s",
mi->master_log_name, llstr(mi->master_log_pos,llbuff)));
- if (!(mysql = mc_mysql_init(NULL)))
+ if (!(mi->mysql = mysql = mc_mysql_init(NULL)))
{
sql_print_error("Slave I/O thread: error in mc_mysql_init()");
goto err;
@@ -1780,8 +1783,11 @@ err:
sql_print_error("Slave I/O thread exiting, read up to log '%s', position %s",
IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff));
thd->query = thd->db = 0; // extra safety
- if(mysql)
+ if (mysql)
+ {
mc_mysql_close(mysql);
+ mi->mysql=0;
+ }
thd->proc_info = "Waiting for slave mutex on exit";
pthread_mutex_lock(&mi->run_lock);
mi->slave_running = 0;
@@ -1790,7 +1796,7 @@ err:
change_rpl_status(RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE);
mi->abort_slave = 0; // TODO: check if this is needed
DBUG_ASSERT(thd->net.buff != 0);
- net_end(&thd->net); // destructor will not free it, because we are weird
+ net_end(&thd->net); // destructor will not free it, because net.vio is 0
pthread_mutex_lock(&LOCK_thread_count);
delete thd;
pthread_mutex_unlock(&LOCK_thread_count);
@@ -1926,11 +1932,97 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \
DBUG_RETURN(0); // Can't return anything here
}
+static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev)
+{
+ int error = 1;
+ ulong num_bytes;
+ bool cev_not_written;
+ THD* thd;
+ NET* net = &mi->mysql->net;
+
+ if (unlikely(!cev->is_valid()))
+ return 1;
+ /*
+ TODO: fix to honor table rules, not only db rules
+ */
+ if (!db_ok(cev->db, replicate_do_db, replicate_ignore_db))
+ {
+ skip_load_data_infile(net);
+ return 0;
+ }
+ DBUG_ASSERT(cev->inited_from_old);
+ thd = mi->io_thd;
+ thd->file_id = cev->file_id = mi->file_id++;
+ cev_not_written = 1;
+
+ if (unlikely(net_request_file(net,cev->fname)))
+ {
+ sql_print_error("Slave I/O: failed requesting download of '%s'",
+ cev->fname);
+ goto err;
+ }
+
+ /* this dummy block is so we could insantiate Append_block_log_event
+ once and then modify it slightly instead of doing it multiple times
+ in the loop
+ */
+ {
+ Append_block_log_event aev(thd,0,0);
+
+ for (;;)
+ {
+ if (unlikely((num_bytes=my_net_read(net)) == packet_error))
+ {
+ sql_print_error("Network read error downloading '%s' from master",
+ cev->fname);
+ goto err;
+ }
+ if (unlikely(!num_bytes)) /* eof */
+ {
+ send_ok(net); /* 3.23 master wants it */
+ Execute_load_log_event xev(mi->io_thd);
+ if (unlikely(mi->rli.relay_log.append(&xev)))
+ {
+ sql_print_error("Slave I/O: error writing Exec_load event to \
+relay log");
+ goto err;
+ }
+ break;
+ }
+ if (unlikely(cev_not_written))
+ {
+ cev->block = (char*)net->read_pos;
+ cev->block_len = num_bytes;
+ if (unlikely(mi->rli.relay_log.append(cev)))
+ {
+ sql_print_error("Slave I/O: error writing Create_file event to \
+relay log");
+ goto err;
+ }
+ cev_not_written=0;
+ }
+ else
+ {
+ aev.block = (char*)net->read_pos;
+ aev.block_len = num_bytes;
+ if (unlikely(mi->rli.relay_log.append(&aev)))
+ {
+ sql_print_error("Slave I/O: error writing Append_block event to \
+relay log");
+ goto err;
+ }
+ }
+ }
+ }
+ error=0;
+err:
+ return error;
+}
// We assume we already locked mi->data_lock
static int process_io_rotate(MASTER_INFO* mi, Rotate_log_event* rev)
{
- if (!rev->is_valid())
+ if (unlikely(!rev->is_valid()))
return 1;
DBUG_ASSERT(rev->ident_len<sizeof(mi->master_log_name));
memcpy(mi->master_log_name,rev->new_log_ident,
@@ -1961,6 +2053,21 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf,
const char *errmsg = 0;
bool inc_pos = 1;
bool processed_stop_event = 0;
+ char* tmp_buf = 0;
+ /* if we get Load event, we need to pass a non-reusable buffer
+ to read_log_event, so we do a trick
+ */
+ if (buf[EVENT_TYPE_OFFSET] == LOAD_EVENT)
+ {
+ if (unlikely(!(tmp_buf=(char*)my_malloc(event_len+1,MYF(MY_WME)))))
+ {
+ sql_print_error("Slave I/O: out of memory for Load event");
+ return 1;
+ }
+ memcpy(tmp_buf,buf,event_len);
+ tmp_buf[event_len]=0; // Create_file constructor wants null-term buffer
+ buf = (const char*)tmp_buf;
+ }
Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
1 /*old format*/ );
if (unlikely(!ev))
@@ -1968,6 +2075,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf,
sql_print_error("Read invalid event from master: '%s',\
master could be corrupt but a more likely cause of this is a bug",
errmsg);
+ my_free((char*)tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
return 1;
}
pthread_mutex_lock(&mi->data_lock);
@@ -1978,6 +2086,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf,
{
delete ev;
pthread_mutex_unlock(&mi->data_lock);
+ DBUG_ASSERT(!tmp_buf);
return 1;
}
mi->ignore_stop_event=1;
@@ -1986,12 +2095,16 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf,
case STOP_EVENT:
processed_stop_event=1;
break;
- case LOAD_EVENT:
- // TODO: actually process it
- mi->master_log_pos += event_len;
+ case CREATE_FILE_EVENT:
+ {
+ int error = process_io_create_file(mi,(Create_file_log_event*)ev);
delete ev;
+ mi->master_log_pos += event_len;
pthread_mutex_unlock(&mi->data_lock);
- return 0;
+ DBUG_ASSERT(tmp_buf);
+ my_free((char*)tmp_buf, MYF(0));
+ return error;
+ }
default:
mi->ignore_stop_event=0;
break;
@@ -2002,6 +2115,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf,
{
delete ev;
pthread_mutex_unlock(&mi->data_lock);
+ DBUG_ASSERT(!tmp_buf);
return 1;
}
}
@@ -2011,6 +2125,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf,
if (unlikely(processed_stop_event))
mi->ignore_stop_event=1;
pthread_mutex_unlock(&mi->data_lock);
+ DBUG_ASSERT(!tmp_buf);
return 0;
}
@@ -2173,7 +2288,7 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
int flush_relay_log_info(RELAY_LOG_INFO* rli)
{
- IO_CACHE* file = &rli->info_file;
+ register IO_CACHE* file = &rli->info_file;
char lbuf[22],lbuf1[22];
my_b_seek(file, 0L);
@@ -2251,7 +2366,10 @@ Log_event* next_event(RELAY_LOG_INFO* rli)
}
DBUG_ASSERT(my_b_tell(cur_log) >= 4);
DBUG_ASSERT(my_b_tell(cur_log) == rli->relay_log_pos + rli->pending);
- if ((ev=Log_event::read_log_event(cur_log,0,rli->mi->old_format)))
+ /* relay log is always in new format - if the master is 3.23, the
+ I/O thread will convert the format for us
+ */
+ if ((ev=Log_event::read_log_event(cur_log,0,(bool)0/*new format*/)))
{
DBUG_ASSERT(thd==rli->sql_thd);
if (hot_log)
diff --git a/sql/slave.h b/sql/slave.h
index f60f2ce2954..59263a96687 100644
--- a/sql/slave.h
+++ b/sql/slave.h
@@ -254,6 +254,8 @@ typedef struct st_master_info
pthread_mutex_t data_lock,run_lock;
pthread_cond_t data_cond,start_cond,stop_cond;
THD *io_thd;
+ MYSQL* mysql;
+ uint32 file_id; // for 3.23 load data infile
RELAY_LOG_INFO rli;
uint port;
uint connect_retry;
diff --git a/sql/sql_load.cc b/sql/sql_load.cc
index abc9fa5a121..899f2e20469 100644
--- a/sql/sql_load.cc
+++ b/sql/sql_load.cc
@@ -147,12 +147,7 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
if (read_file_from_client && (thd->client_capabilities & CLIENT_LOCAL_FILES))
{
- char tmp [FN_REFLEN+1],*end;
- DBUG_PRINT("info",("reading local file"));
- tmp[0] = (char) 251; /* NULL_LENGTH */
- end=strnmov(tmp+1,ex->file_name,sizeof(tmp)-2);
- (void) my_net_write(&thd->net,tmp,(uint) (end-tmp));
- (void) net_flush(&thd->net);
+ (void)net_request_file(&thd->net,ex->file_name);
file = -1;
}
else