summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorunknown <sasha@mysql.sashanet.com>2002-02-07 15:29:46 -0700
committerunknown <sasha@mysql.sashanet.com>2002-02-07 15:29:46 -0700
commit0d876509340f0ba5ae67b4645137c863182ab9f4 (patch)
treed9192c46a4eb4d83decc75bbeac94c01eba13040
parent552656c3ba5e3604fabc656f8b7dec805febb91b (diff)
downloadmariadb-git-0d876509340f0ba5ae67b4645137c863182ab9f4.tar.gz
fixes for slave backward compat
fixed bug in LOAD DATA FROM MASTER fixed rpl000001.result Slave now replicates 3.23 master, with the exception of LOAD DATA INFILE, which is still buggy. Will push this one after the pull/merge mysql-test/r/rpl000001.result: fixed bug in result sql/log_event.cc: fixes for slave backward compat sql/log_event.h: fixes for slave backward compat sql/mysql_priv.h: fixes for slave backward compat sql/net_serv.cc: fixes for slave backward compat sql/repl_failsafe.cc: fixed bug in LOAD DATA FROM MASTER sql/slave.cc: fixes for slave backward compat sql/slave.h: fixes for slave backward compat sql/sql_load.cc: fixes for slave backward compat
-rw-r--r--mysql-test/r/rpl000001.result2
-rw-r--r--sql/log_event.cc64
-rw-r--r--sql/log_event.h11
-rw-r--r--sql/mysql_priv.h1
-rw-r--r--sql/net_serv.cc11
-rw-r--r--sql/repl_failsafe.cc1
-rw-r--r--sql/slave.cc140
-rw-r--r--sql/slave.h2
-rw-r--r--sql/sql_load.cc7
9 files changed, 195 insertions, 44 deletions
diff --git a/mysql-test/r/rpl000001.result b/mysql-test/r/rpl000001.result
index 3dae52c6d3f..3767de9ad8d 100644
--- a/mysql-test/r/rpl000001.result
+++ b/mysql-test/r/rpl000001.result
@@ -7,7 +7,7 @@ use test;
drop table if exists t1,t3;
create table t1 (word char(20) not null);
load data infile '../../std_data/words.dat' into table t1;
-load data local infile '/home/sasha/bk/mysql-4.0/mysql-test/std_data/words.dat' into table t1;
+load data local infile '$MYSQL_TEST_DIR/std_data/words.dat' into table t1;
select * from t1;
word
Aarhus
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 648e9175e13..528110deb74 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -556,6 +556,8 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len,
ev = new Query_log_event(buf, event_len, old_format);
break;
case LOAD_EVENT:
+ ev = new Create_file_log_event(buf, event_len, old_format);
+ break;
case NEW_LOAD_EVENT:
ev = new Load_log_event(buf, event_len, old_format);
break;
@@ -566,7 +568,7 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len,
ev = new Slave_log_event(buf, event_len);
break;
case CREATE_FILE_EVENT:
- ev = new Create_file_log_event(buf, event_len);
+ ev = new Create_file_log_event(buf, event_len, old_format);
break;
case APPEND_BLOCK_EVENT:
ev = new Append_block_log_event(buf, event_len);
@@ -959,6 +961,12 @@ char* sql_ex_info::init(char* buf,char* buf_end,bool use_new_format)
if (use_new_format)
{
empty_flags=0;
+ /* the code below assumes that buf will not disappear from
+ under our feet during the lifetime of the event. This assumption
+ holds true in the slave thread if the log is in new format, but is not
+ the case when we have old format because we will be reusing net buffer
+ to read the actual file before we write out the Create_file event
+ */
if (read_str(buf, buf_end, field_term, field_term_len) ||
read_str(buf, buf_end, enclosed, enclosed_len) ||
read_str(buf, buf_end, line_term, line_term_len) ||
@@ -970,11 +978,11 @@ char* sql_ex_info::init(char* buf,char* buf_end,bool use_new_format)
else
{
field_term_len= enclosed_len= line_term_len= line_start_len= escaped_len=1;
- *field_term=*buf++;
- *enclosed= *buf++;
- *line_term= *buf++;
- *line_start=*buf++;
- *escaped= *buf++;
+ field_term = buf++;
+ enclosed= buf++;
+ line_term= buf++;
+ line_start= buf++;
+ escaped= buf++;
opt_flags = *buf++;
empty_flags=*buf++;
if (empty_flags & FIELD_TERM_EMPTY)
@@ -1095,7 +1103,9 @@ int Load_log_event::copy_log_event(const char *buf, ulong event_len,
db_len = (uint)data_head[L_DB_LEN_OFFSET];
num_fields = uint4korr(data_head + L_NUM_FIELDS_OFFSET);
- int body_offset = get_data_body_offset();
+ int body_offset = (buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ?
+ LOAD_HEADER_LEN + OLD_HEADER_LEN : get_data_body_offset();
+
if ((int) event_len < body_offset)
return 1;
//sql_ex.init() on success returns the pointer to the first byte after
@@ -1117,7 +1127,6 @@ int Load_log_event::copy_log_event(const char *buf, ulong event_len,
table_name = fields + field_block_len;
db = table_name + table_name_len + 1;
fname = db + db_len + 1;
- int type_code = get_type_code();
fname_len = strlen(fname);
// null termination is accomplished by the caller doing buf[event_len]=0
return 0;
@@ -1367,20 +1376,29 @@ int Create_file_log_event::write_base(IO_CACHE* file)
return res;
}
-Create_file_log_event::Create_file_log_event(const char* buf, int len):
- Load_log_event(buf,0,0),fake_base(0),block(0)
+Create_file_log_event::Create_file_log_event(const char* buf, int len,
+ bool old_format):
+ Load_log_event(buf,0,old_format),fake_base(0),block(0),inited_from_old(0)
{
int block_offset;
- if (copy_log_event(buf,len,0))
- return;
- file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN +
- + LOAD_HEADER_LEN + CF_FILE_ID_OFFSET);
- block_offset = LOG_EVENT_HEADER_LEN + Load_log_event::get_data_size() +
- CREATE_FILE_HEADER_LEN + 1; // 1 for \0 terminating fname
- if (len < block_offset)
+ if (copy_log_event(buf,len,old_format))
return;
- block = (char*)buf + block_offset;
- block_len = len - block_offset;
+ if (!old_format)
+ {
+ file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN +
+ + LOAD_HEADER_LEN + CF_FILE_ID_OFFSET);
+ block_offset = LOG_EVENT_HEADER_LEN + Load_log_event::get_data_size() +
+ CREATE_FILE_HEADER_LEN + 1; // 1 for \0 terminating fname
+ if (len < block_offset)
+ return;
+ block = (char*)buf + block_offset;
+ block_len = len - block_offset;
+ }
+ else
+ {
+ sql_ex.force_new_format();
+ inited_from_old = 1;
+ }
}
@@ -1568,6 +1586,7 @@ int Query_log_event::exec_event(struct st_relay_log_info* rli)
int expected_error,actual_error = 0;
init_sql_alloc(&thd->mem_root, 8192,0);
thd->db = rewrite_db((char*)db);
+ DBUG_ASSERT(q_len == strlen(query));
if (db_ok(thd->db, replicate_do_db, replicate_ignore_db))
{
thd->query = (char*)query;
@@ -1739,11 +1758,12 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli)
int Start_log_event::exec_event(struct st_relay_log_info* rli)
{
+ close_temporary_tables(thd);
+ // if we have old format, load_tmpdir is cleaned up by the I/O thread
+ // TODO: cleanup_load_tmpdir() needs to remove only the files associated
+ // with the server id that has just started
if (!rli->mi->old_format)
- {
- close_temporary_tables(thd);
cleanup_load_tmpdir();
- }
return Log_event::exec_event(rli);
}
diff --git a/sql/log_event.h b/sql/log_event.h
index 089d9589763..a29c3952d46 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -64,6 +64,8 @@ struct old_sql_ex
char empty_flags;
};
+#define NUM_LOAD_DELIM_STRS 5
+
struct sql_ex_info
{
@@ -153,8 +155,8 @@ struct sql_ex_info
#define L_THREAD_ID_OFFSET 0
#define L_EXEC_TIME_OFFSET 4
#define L_SKIP_LINES_OFFSET 8
-#define L_DB_LEN_OFFSET 12
-#define L_TBL_LEN_OFFSET 13
+#define L_TBL_LEN_OFFSET 12
+#define L_DB_LEN_OFFSET 13
#define L_NUM_FIELDS_OFFSET 14
#define L_SQL_EX_OFFSET 18
#define L_DATA_OFFSET LOAD_HEADER_LEN
@@ -570,6 +572,7 @@ public:
char* block;
uint block_len;
uint file_id;
+ bool inited_from_old;
#ifndef MYSQL_CLIENT
Create_file_log_event(THD* thd, sql_exchange* ex, const char* db_arg,
const char* table_name_arg,
@@ -578,7 +581,7 @@ public:
char* block_arg, uint block_len_arg);
#endif
- Create_file_log_event(const char* buf, int event_len);
+ Create_file_log_event(const char* buf, int event_len, bool old_format);
~Create_file_log_event()
{
}
@@ -591,7 +594,7 @@ public:
4 + 1 + block_len;}
int get_data_body_offset() { return fake_base ? LOAD_EVENT_OVERHEAD:
LOAD_EVENT_OVERHEAD + CREATE_FILE_HEADER_LEN; }
- bool is_valid() { return block != 0; }
+ bool is_valid() { return inited_from_old || block != 0; }
int write_data_header(IO_CACHE* file);
int write_data_body(IO_CACHE* file);
int write_base(IO_CACHE* file); // cut out Create_file extentions and
diff --git a/sql/mysql_priv.h b/sql/mysql_priv.h
index 81604a7ecfd..fc804425f08 100644
--- a/sql/mysql_priv.h
+++ b/sql/mysql_priv.h
@@ -48,6 +48,7 @@ char *sql_strmake(const char *str,uint len);
gptr sql_memdup(const void * ptr,unsigned size);
void sql_element_free(void *ptr);
void kill_one_thread(THD *thd, ulong id);
+int net_request_file(NET* net, const char* fname);
char* query_table_status(THD *thd,const char *db,const char *table_name);
#define x_free(A) { my_free((gptr) (A),MYF(MY_WME | MY_FAE | MY_ALLOW_ZERO_PTR)); }
diff --git a/sql/net_serv.cc b/sql/net_serv.cc
index 7a1d25e980d..811f36bd82e 100644
--- a/sql/net_serv.cc
+++ b/sql/net_serv.cc
@@ -814,3 +814,14 @@ my_net_read(NET *net)
#endif /* HAVE_COMPRESS */
return len;
}
+
+int net_request_file(NET* net, const char* fname)
+{
+ char tmp [FN_REFLEN+1],*end;
+ DBUG_ENTER("net_request_file");
+ tmp[0] = (char) 251; /* NULL_LENGTH */
+ end=strnmov(tmp+1,fname,sizeof(tmp)-2);
+ DBUG_RETURN(my_net_write(net,tmp,(uint) (end-tmp)) ||
+ net_flush(net));
+}
+
diff --git a/sql/repl_failsafe.cc b/sql/repl_failsafe.cc
index 6cc7ef7404b..257418d1682 100644
--- a/sql/repl_failsafe.cc
+++ b/sql/repl_failsafe.cc
@@ -828,6 +828,7 @@ int load_master_data(THD* thd)
active_mi->rli.master_log_pos = active_mi->master_log_pos;
strnmov(active_mi->rli.master_log_name,active_mi->master_log_name,
sizeof(active_mi->rli.master_log_name));
+ flush_relay_log_info(&active_mi->rli);
pthread_cond_broadcast(&active_mi->rli.data_cond);
pthread_mutex_unlock(&active_mi->rli.data_lock);
thd->proc_info = "starting slave";
diff --git a/sql/slave.cc b/sql/slave.cc
index 8884e5de778..86db6efa34d 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -55,6 +55,7 @@ typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE;
void skip_load_data_infile(NET* net);
static int process_io_rotate(MASTER_INFO* mi, Rotate_log_event* rev);
+static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev);
static int queue_old_event(MASTER_INFO* mi, const char* buf,
uint event_len);
static inline bool slave_killed(THD* thd,MASTER_INFO* mi);
@@ -654,7 +655,7 @@ char* rewrite_db(char* db)
int db_ok(const char* db, I_List<i_string> &do_list,
I_List<i_string> &ignore_list )
{
- if(do_list.is_empty() && ignore_list.is_empty())
+ if (do_list.is_empty() && ignore_list.is_empty())
return 1; // ok to replicate if the user puts no constraints
// if the user has specified restrictions on which databases to replicate
@@ -1058,6 +1059,8 @@ int init_master_info(MASTER_INFO* mi, const char* master_info_fname,
if (init_relay_log_info(&mi->rli, slave_info_fname))
return 1;
mi->rli.mi = mi;
+ mi->mysql=0;
+ mi->file_id=1;
mi->ignore_stop_event=0;
int fd,error;
MY_STAT stat_area;
@@ -1621,7 +1624,7 @@ slave_begin:
DBUG_PRINT("info",("master info: log_file_name=%s, position=%s",
mi->master_log_name, llstr(mi->master_log_pos,llbuff)));
- if (!(mysql = mc_mysql_init(NULL)))
+ if (!(mi->mysql = mysql = mc_mysql_init(NULL)))
{
sql_print_error("Slave I/O thread: error in mc_mysql_init()");
goto err;
@@ -1780,8 +1783,11 @@ err:
sql_print_error("Slave I/O thread exiting, read up to log '%s', position %s",
IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff));
thd->query = thd->db = 0; // extra safety
- if(mysql)
+ if (mysql)
+ {
mc_mysql_close(mysql);
+ mi->mysql=0;
+ }
thd->proc_info = "Waiting for slave mutex on exit";
pthread_mutex_lock(&mi->run_lock);
mi->slave_running = 0;
@@ -1790,7 +1796,7 @@ err:
change_rpl_status(RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE);
mi->abort_slave = 0; // TODO: check if this is needed
DBUG_ASSERT(thd->net.buff != 0);
- net_end(&thd->net); // destructor will not free it, because we are weird
+ net_end(&thd->net); // destructor will not free it, because net.vio is 0
pthread_mutex_lock(&LOCK_thread_count);
delete thd;
pthread_mutex_unlock(&LOCK_thread_count);
@@ -1926,11 +1932,97 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \
DBUG_RETURN(0); // Can't return anything here
}
+static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev)
+{
+ int error = 1;
+ ulong num_bytes;
+ bool cev_not_written;
+ THD* thd;
+ NET* net = &mi->mysql->net;
+
+ if (unlikely(!cev->is_valid()))
+ return 1;
+ /*
+ TODO: fix to honor table rules, not only db rules
+ */
+ if (!db_ok(cev->db, replicate_do_db, replicate_ignore_db))
+ {
+ skip_load_data_infile(net);
+ return 0;
+ }
+ DBUG_ASSERT(cev->inited_from_old);
+ thd = mi->io_thd;
+ thd->file_id = cev->file_id = mi->file_id++;
+ cev_not_written = 1;
+
+ if (unlikely(net_request_file(net,cev->fname)))
+ {
+ sql_print_error("Slave I/O: failed requesting download of '%s'",
+ cev->fname);
+ goto err;
+ }
+
+ /* this dummy block is so we could insantiate Append_block_log_event
+ once and then modify it slightly instead of doing it multiple times
+ in the loop
+ */
+ {
+ Append_block_log_event aev(thd,0,0);
+
+ for (;;)
+ {
+ if (unlikely((num_bytes=my_net_read(net)) == packet_error))
+ {
+ sql_print_error("Network read error downloading '%s' from master",
+ cev->fname);
+ goto err;
+ }
+ if (unlikely(!num_bytes)) /* eof */
+ {
+ send_ok(net); /* 3.23 master wants it */
+ Execute_load_log_event xev(mi->io_thd);
+ if (unlikely(mi->rli.relay_log.append(&xev)))
+ {
+ sql_print_error("Slave I/O: error writing Exec_load event to \
+relay log");
+ goto err;
+ }
+ break;
+ }
+ if (unlikely(cev_not_written))
+ {
+ cev->block = (char*)net->read_pos;
+ cev->block_len = num_bytes;
+ if (unlikely(mi->rli.relay_log.append(cev)))
+ {
+ sql_print_error("Slave I/O: error writing Create_file event to \
+relay log");
+ goto err;
+ }
+ cev_not_written=0;
+ }
+ else
+ {
+ aev.block = (char*)net->read_pos;
+ aev.block_len = num_bytes;
+ if (unlikely(mi->rli.relay_log.append(&aev)))
+ {
+ sql_print_error("Slave I/O: error writing Append_block event to \
+relay log");
+ goto err;
+ }
+ }
+ }
+ }
+ error=0;
+err:
+ return error;
+}
// We assume we already locked mi->data_lock
static int process_io_rotate(MASTER_INFO* mi, Rotate_log_event* rev)
{
- if (!rev->is_valid())
+ if (unlikely(!rev->is_valid()))
return 1;
DBUG_ASSERT(rev->ident_len<sizeof(mi->master_log_name));
memcpy(mi->master_log_name,rev->new_log_ident,
@@ -1961,6 +2053,21 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf,
const char *errmsg = 0;
bool inc_pos = 1;
bool processed_stop_event = 0;
+ char* tmp_buf = 0;
+ /* if we get Load event, we need to pass a non-reusable buffer
+ to read_log_event, so we do a trick
+ */
+ if (buf[EVENT_TYPE_OFFSET] == LOAD_EVENT)
+ {
+ if (unlikely(!(tmp_buf=(char*)my_malloc(event_len+1,MYF(MY_WME)))))
+ {
+ sql_print_error("Slave I/O: out of memory for Load event");
+ return 1;
+ }
+ memcpy(tmp_buf,buf,event_len);
+ tmp_buf[event_len]=0; // Create_file constructor wants null-term buffer
+ buf = (const char*)tmp_buf;
+ }
Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
1 /*old format*/ );
if (unlikely(!ev))
@@ -1968,6 +2075,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf,
sql_print_error("Read invalid event from master: '%s',\
master could be corrupt but a more likely cause of this is a bug",
errmsg);
+ my_free((char*)tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
return 1;
}
pthread_mutex_lock(&mi->data_lock);
@@ -1978,6 +2086,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf,
{
delete ev;
pthread_mutex_unlock(&mi->data_lock);
+ DBUG_ASSERT(!tmp_buf);
return 1;
}
mi->ignore_stop_event=1;
@@ -1986,12 +2095,16 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf,
case STOP_EVENT:
processed_stop_event=1;
break;
- case LOAD_EVENT:
- // TODO: actually process it
- mi->master_log_pos += event_len;
+ case CREATE_FILE_EVENT:
+ {
+ int error = process_io_create_file(mi,(Create_file_log_event*)ev);
delete ev;
+ mi->master_log_pos += event_len;
pthread_mutex_unlock(&mi->data_lock);
- return 0;
+ DBUG_ASSERT(tmp_buf);
+ my_free((char*)tmp_buf, MYF(0));
+ return error;
+ }
default:
mi->ignore_stop_event=0;
break;
@@ -2002,6 +2115,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf,
{
delete ev;
pthread_mutex_unlock(&mi->data_lock);
+ DBUG_ASSERT(!tmp_buf);
return 1;
}
}
@@ -2011,6 +2125,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf,
if (unlikely(processed_stop_event))
mi->ignore_stop_event=1;
pthread_mutex_unlock(&mi->data_lock);
+ DBUG_ASSERT(!tmp_buf);
return 0;
}
@@ -2173,7 +2288,7 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
int flush_relay_log_info(RELAY_LOG_INFO* rli)
{
- IO_CACHE* file = &rli->info_file;
+ register IO_CACHE* file = &rli->info_file;
char lbuf[22],lbuf1[22];
my_b_seek(file, 0L);
@@ -2251,7 +2366,10 @@ Log_event* next_event(RELAY_LOG_INFO* rli)
}
DBUG_ASSERT(my_b_tell(cur_log) >= 4);
DBUG_ASSERT(my_b_tell(cur_log) == rli->relay_log_pos + rli->pending);
- if ((ev=Log_event::read_log_event(cur_log,0,rli->mi->old_format)))
+ /* relay log is always in new format - if the master is 3.23, the
+ I/O thread will convert the format for us
+ */
+ if ((ev=Log_event::read_log_event(cur_log,0,(bool)0/*new format*/)))
{
DBUG_ASSERT(thd==rli->sql_thd);
if (hot_log)
diff --git a/sql/slave.h b/sql/slave.h
index f60f2ce2954..59263a96687 100644
--- a/sql/slave.h
+++ b/sql/slave.h
@@ -254,6 +254,8 @@ typedef struct st_master_info
pthread_mutex_t data_lock,run_lock;
pthread_cond_t data_cond,start_cond,stop_cond;
THD *io_thd;
+ MYSQL* mysql;
+ uint32 file_id; // for 3.23 load data infile
RELAY_LOG_INFO rli;
uint port;
uint connect_retry;
diff --git a/sql/sql_load.cc b/sql/sql_load.cc
index abc9fa5a121..899f2e20469 100644
--- a/sql/sql_load.cc
+++ b/sql/sql_load.cc
@@ -147,12 +147,7 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
if (read_file_from_client && (thd->client_capabilities & CLIENT_LOCAL_FILES))
{
- char tmp [FN_REFLEN+1],*end;
- DBUG_PRINT("info",("reading local file"));
- tmp[0] = (char) 251; /* NULL_LENGTH */
- end=strnmov(tmp+1,ex->file_name,sizeof(tmp)-2);
- (void) my_net_write(&thd->net,tmp,(uint) (end-tmp));
- (void) net_flush(&thd->net);
+ (void)net_request_file(&thd->net,ex->file_name);
file = -1;
}
else