diff options
author | unknown <sasha@mysql.sashanet.com> | 2001-08-03 15:57:53 -0600 |
---|---|---|
committer | unknown <sasha@mysql.sashanet.com> | 2001-08-03 15:57:53 -0600 |
commit | 0dab9f40e1c52ac00e1ca291785ae7943dea577e (patch) | |
tree | 7a15577166ddda4e7b8c9c9cb80471777aeb717b | |
parent | 07ed42de3177805c3e1f5d8c7664d91ef0015d62 (diff) | |
download | mariadb-git-0dab9f40e1c52ac00e1ca291785ae7943dea577e.tar.gz |
LOAD DATA INFILE is now replicated properly, except for cleanup on
Stop event and bugs the test suite could not catch
Did some big restructuring of binlog event classes - most important
change is that now each event class has exec_event method and one does
not need to modify slave core code to add a new event. Slave code is
now much smaller and easier to read
include/my_sys.h:
pre_code and arg in IO_CACHE
mysql-test/r/rpl_log.result:
updated result for LOAD DATA INFILE fix
mysys/mf_iocache.c:
pre_close routine and arg pointer for callback magic
sql/log.cc:
changed MYSQL_LOG so that write() method is for generic
Log_event - removed redundant code
sql/log_event.cc:
added classes for file events
added exec_event() method to all classes
restructured/cleaned up event classes
sql/log_event.h:
added classes for file events
added exec_event() method to all classes
restructured/cleaned up event classes
sql/mf_iocache.cc:
pre_close/arg
sql/mysqld.cc:
added slave-load-tmpdir and old-rpl-compat options
sql/slave.cc:
changed exec_event() to use Log_event::exec_event()
some routines are now needed in log_event.cc and cannot be static/inline
general cleanup
sql/slave.h:
some routines are now extern because they are called from log_event.cc
sql/sql_class.cc:
added slave_net
sql/sql_class.h:
added slave_net to THD
MYSQL_LOG::write now handles generic Log_event
sql/sql_load.cc:
changes for new handling of LOAD DATA INFILE replication
sql/sql_repl.cc:
added log_loaded_block() callback for IO_CACHE
sql/sql_repl.h:
added structure to pass args to IO_CACHE callback from mysql_load
-rw-r--r-- | include/my_sys.h | 2 | ||||
-rw-r--r-- | mysql-test/r/rpl_log.result | 26 | ||||
-rw-r--r-- | mysys/mf_iocache.c | 7 | ||||
-rw-r--r-- | sql/log.cc | 74 | ||||
-rw-r--r-- | sql/log_event.cc | 966 | ||||
-rw-r--r-- | sql/log_event.h | 404 | ||||
-rw-r--r-- | sql/mf_iocache.cc | 6 | ||||
-rw-r--r-- | sql/mysqld.cc | 22 | ||||
-rw-r--r-- | sql/slave.cc | 350 | ||||
-rw-r--r-- | sql/slave.h | 16 | ||||
-rw-r--r-- | sql/sql_class.cc | 1 | ||||
-rw-r--r-- | sql/sql_class.h | 6 | ||||
-rw-r--r-- | sql/sql_load.cc | 73 | ||||
-rw-r--r-- | sql/sql_repl.cc | 30 | ||||
-rw-r--r-- | sql/sql_repl.h | 17 |
15 files changed, 1302 insertions, 698 deletions
diff --git a/include/my_sys.h b/include/my_sys.h index 307e286a289..303ae03c903 100644 --- a/include/my_sys.h +++ b/include/my_sys.h @@ -281,6 +281,8 @@ typedef struct st_io_cache /* Used when cacheing files */ /* callbacks when the actual read I/O happens */ IO_CACHE_CALLBACK pre_read; IO_CACHE_CALLBACK post_read; + IO_CACHE_CALLBACK pre_close; + void* arg; /* for use by pre/post_read */ char *file_name; /* if used with 'open_cached_file' */ char *dir,*prefix; File file; diff --git a/mysql-test/r/rpl_log.result b/mysql-test/r/rpl_log.result index 0152c9652e9..f79bd8eccaa 100644 --- a/mysql-test/r/rpl_log.result +++ b/mysql-test/r/rpl_log.result @@ -5,8 +5,9 @@ master-bin.001 172 Intvar 1 3 INSERT_ID=1 master-bin.001 200 Query 1 4 use test; insert into t1 values (NULL) master-bin.001 263 Query 1 5 use test; drop table t1 master-bin.001 311 Query 1 6 use test; create table t1 (word char(20) not null) -master-bin.001 386 Load 1 7 use test; LOAD DATA INFILE '../../std_data/words.dat' INTO TABLE t1 FIELDS TERMINATED BY '\\t' ESCAPED BY '\\\\' LINES TERMINATED BY '\\n' (word) -master-bin.001 468 Query 1 8 use test; drop table t1 +master-bin.001 386 Create_file 1 7 db=test;table=t1;file_id=11;block_len=81 +master-bin.001 554 Exec_load 1 8 ;file_id=11 +master-bin.001 577 Query 1 9 use test; drop table t1 Log_name Pos Event_type Server_id Log_seq Info master-bin.001 79 Query 1 2 use test; create table t1(n int not null auto_increment primary key) Log_name Pos Event_type Server_id Log_seq Info @@ -21,10 +22,11 @@ master-bin.001 172 Intvar 1 3 INSERT_ID=1 master-bin.001 200 Query 1 4 use test; insert into t1 values (NULL) master-bin.001 263 Query 1 5 use test; drop table t1 master-bin.001 311 Query 1 6 use test; create table t1 (word char(20) not null) -master-bin.001 386 Load 1 7 use test; LOAD DATA INFILE '../../std_data/words.dat' INTO TABLE t1 FIELDS TERMINATED BY '\\t' ESCAPED BY '\\\\' LINES TERMINATED BY '\\n' (word) -master-bin.001 468 Query 1 8 use test; drop table t1 -master-bin.001 516 Rotate 1 9 master-bin.002;pos=4 -master-bin.001 557 Stop 1 10 +master-bin.001 386 Create_file 1 7 db=test;table=t1;file_id=11;block_len=81 +master-bin.001 554 Exec_load 1 8 ;file_id=11 +master-bin.001 577 Query 1 9 use test; drop table t1 +master-bin.001 625 Rotate 1 10 master-bin.002;pos=4 +master-bin.001 666 Stop 1 11 Log_name Pos Event_type Server_id Log_seq Info master-bin.002 4 Start 1 1 Server ver: $VERSION, Binlog ver: 2 master-bin.002 79 Query 1 2 use test; create table t1 (n int) @@ -38,18 +40,20 @@ slave-bin.001 slave-bin.002 Log_name Pos Event_type Server_id Log_seq Info slave-bin.001 4 Start 2 1 Server ver: $VERSION, Binlog ver: 2 -slave-bin.001 79 Slave 2 2 host=127.0.0.1,port=$MASTER_MYPORT,log=master-bin.001,pos=4 +slave-bin.001 79 Slave 2 3 host=127.0.0.1,port=$MASTER_MYPORT,log=master-bin.001,pos=4 slave-bin.001 132 Query 1 2 use test; create table t1(n int not null auto_increment primary key) slave-bin.001 225 Intvar 1 3 INSERT_ID=1 slave-bin.001 253 Query 1 4 use test; insert into t1 values (NULL) slave-bin.001 316 Query 1 5 use test; drop table t1 slave-bin.001 364 Query 1 6 use test; create table t1 (word char(20) not null) -slave-bin.001 439 Query 1 8 use test; drop table t1 -slave-bin.001 487 Rotate 2 3 slave-bin.002;pos=4; forced by master -slave-bin.001 527 Stop 2 4 +slave-bin.001 439 Create_file 1 7 db=test;table=t1;file_id=11;block_len=81 +slave-bin.001 647 Exec_load 1 8 ;file_id=11 +slave-bin.001 670 Query 1 9 use test; drop table t1 +slave-bin.001 718 Rotate 1 4 slave-bin.002;pos=4; forced by master +slave-bin.001 758 Stop 2 5 Log_name Pos Event_type Server_id Log_seq Info slave-bin.002 4 Start 2 1 Server ver: $VERSION, Binlog ver: 2 -slave-bin.002 79 Slave 2 2 host=127.0.0.1,port=$MASTER_MYPORT,log=master-bin.002,pos=4 +slave-bin.002 79 Slave 2 10 host=127.0.0.1,port=$MASTER_MYPORT,log=master-bin.002,pos=4 slave-bin.002 132 Query 1 2 use test; create table t1 (n int) slave-bin.002 190 Query 1 3 use test; insert into t1 values (1) slave-bin.002 250 Query 1 4 use test; drop table t1 diff --git a/mysys/mf_iocache.c b/mysys/mf_iocache.c index 99af418f6bd..0ef496227b6 100644 --- a/mysys/mf_iocache.c +++ b/mysys/mf_iocache.c @@ -56,7 +56,8 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize, DBUG_PRINT("enter",("type: %d pos: %ld",(int) type, (ulong) seek_offset)); info->file=file; - info->pre_read = info->post_read = 0; + info->pre_close = info->pre_read = info->post_read = 0; + info->arg = 0; if (!cachesize) if (! (cachesize= my_default_record_cache_size)) DBUG_RETURN(1); /* No cache requested */ @@ -608,7 +609,10 @@ int flush_io_cache(IO_CACHE *info) int end_io_cache(IO_CACHE *info) { int error=0; + IO_CACHE_CALLBACK pre_close; DBUG_ENTER("end_io_cache"); + if((pre_close=info->pre_close)) + (*pre_close)(info); if (info->buffer) { if (info->file != -1) /* File doesn't exist */ @@ -618,3 +622,4 @@ int end_io_cache(IO_CACHE *info) } DBUG_RETURN(error); } /* end_io_cache */ + diff --git a/sql/log.cc b/sql/log.cc index 091a7b55d0c..61384f85085 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -535,8 +535,8 @@ void MYSQL_LOG::new_file(bool inside_mutex) We log the whole file name for log file as the user may decide to change base names at some point. */ - Rotate_log_event r(new_name+dirname_length(new_name)); THD* thd = current_thd; + Rotate_log_event r(thd,new_name+dirname_length(new_name)); r.set_log_seq(0, this); // this log rotation could have been initiated by a master of // the slave running with log-bin @@ -638,24 +638,8 @@ bool MYSQL_LOG::write(THD *thd,enum enum_server_command command, return 0; } -/* Write to binary log in a format to be used for replication */ -bool MYSQL_LOG::write(Slave_log_event* event_info) -{ - bool error; - if (!inited) // Can't use mutex if not init - return 0; - VOID(pthread_mutex_lock(&LOCK_log)); - if(!event_info->log_seq) - event_info->set_log_seq(current_thd, this); - error = event_info->write(&log_file); - flush_io_cache(&log_file); - VOID(pthread_mutex_unlock(&LOCK_log)); - return error; -} - - -bool MYSQL_LOG::write(Query_log_event* event_info) +bool MYSQL_LOG::write(Log_event* event_info) { /* In most cases this is only called if 'is_open()' is true */ bool error=0; @@ -667,40 +651,42 @@ bool MYSQL_LOG::write(Query_log_event* event_info) if (is_open()) { THD *thd=event_info->thd; + const char* db = event_info->get_db(); #ifdef USING_TRANSACTIONS - IO_CACHE *file = (event_info->cache_stmt ? &thd->transaction.trans_log : + IO_CACHE *file = ((event_info->cache_stmt && thd) ? + &thd->transaction.trans_log : &log_file); #else IO_CACHE *file = &log_file; #endif - if ((!(thd->options & OPTION_BIN_LOG) && + if ((thd && !(thd->options & OPTION_BIN_LOG) && (thd->master_access & PROCESS_ACL)) || - !db_ok(event_info->db, binlog_do_db, binlog_ignore_db)) + (db && !db_ok(db, binlog_do_db, binlog_ignore_db))) { VOID(pthread_mutex_unlock(&LOCK_log)); return 0; } error=1; - if (thd->last_insert_id_used) + if (thd && thd->last_insert_id_used) { - Intvar_log_event e((uchar)LAST_INSERT_ID_EVENT, thd->last_insert_id); + Intvar_log_event e(thd,(uchar)LAST_INSERT_ID_EVENT,thd->last_insert_id); e.set_log_seq(thd, this); if (thd->server_id) e.server_id = thd->server_id; if (e.write(file)) goto err; } - if (thd->insert_id_used) + if (thd && thd->insert_id_used) { - Intvar_log_event e((uchar)INSERT_ID_EVENT, thd->last_insert_id); + Intvar_log_event e(thd,(uchar)INSERT_ID_EVENT,thd->last_insert_id); e.set_log_seq(thd, this); if (thd->server_id) e.server_id = thd->server_id; if (e.write(file)) goto err; } - if (thd->convert_set) + if (thd && thd->convert_set) { char buf[1024] = "SET CHARACTER SET "; char* p = strend(buf); @@ -795,42 +781,6 @@ err: } -bool MYSQL_LOG::write(Load_log_event* event_info) -{ - bool error=0; - bool should_rotate = 0; - - if (inited) - { - VOID(pthread_mutex_lock(&LOCK_log)); - if (is_open()) - { - THD *thd=event_info->thd; - if ((thd->options & OPTION_BIN_LOG) || - !(thd->master_access & PROCESS_ACL)) - { - event_info->set_log_seq(thd, this); - if (event_info->write(&log_file) || flush_io_cache(&log_file)) - { - if (!write_error) - sql_print_error(ER(ER_ERROR_ON_WRITE), name, errno); - error=write_error=1; - } - should_rotate = (my_b_tell(&log_file) >= max_binlog_size); - VOID(pthread_cond_broadcast(&COND_binlog_update)); - } - } - - if(should_rotate) - new_file(1); // inside mutex - - VOID(pthread_mutex_unlock(&LOCK_log)); - } - - return error; -} - - /* Write update log in a format suitable for incremental backup */ bool MYSQL_LOG::write(THD *thd,const char *query, uint query_length, diff --git a/sql/log_event.cc b/sql/log_event.cc index 1a3469d57bb..279bb9fbde3 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -63,6 +63,18 @@ static void pretty_print_char(String* packet, int c) packet->append('\''); } +static inline char* slave_load_file_stem(char*buf, uint file_id, + int event_server_id) +{ + fn_format(buf,"SQL_LOAD-",slave_load_tmpdir,"",4+32); + buf = strend(buf); + buf = int10_to_str(::server_id, buf, 10); + *buf++ = '-'; + buf = int10_to_str(event_server_id, buf, 10); + *buf++ = '-'; + return int10_to_str(file_id, buf, 10); +} + #endif const char* Log_event::get_type_str() @@ -76,11 +88,59 @@ const char* Log_event::get_type_str() case INTVAR_EVENT: return "Intvar"; case LOAD_EVENT: return "Load"; case SLAVE_EVENT: return "Slave"; + case CREATE_FILE_EVENT: return "Create_file"; + case APPEND_BLOCK_EVENT: return "Append_block"; + case DELETE_FILE_EVENT: return "Delete_file"; + case EXEC_LOAD_EVENT: return "Exec_load"; default: /* impossible */ return "Unknown"; } } #ifndef MYSQL_CLIENT +Log_event::Log_event(THD* thd_arg, uint16 flags_arg): + exec_time(0), + flags(flags_arg),cached_event_len(0), + temp_buf(0),thd(thd_arg) +{ + if (thd) + { + server_id = thd->server_id; + log_seq = thd->log_seq; + when = thd->start_time; + } + else + { + server_id = ::server_id; + log_seq = 0; + when = time(NULL); + } +} +#endif + +Log_event::Log_event(const char* buf):cached_event_len(0),temp_buf(0) +{ + when = uint4korr(buf); + server_id = uint4korr(buf + SERVER_ID_OFFSET); + log_seq = uint4korr(buf + LOG_SEQ_OFFSET); + flags = uint2korr(buf + FLAGS_OFFSET); +#ifndef MYSQL_CLIENT + thd = 0; +#endif +} + + +#ifndef MYSQL_CLIENT + +int Log_event::exec_event(struct st_master_info* mi) +{ + if (mi) + { + thd->log_seq = 0; + mi->inc_pos(get_event_len(), log_seq); + flush_master_info(mi); + } + return 0; +} void Log_event::pack_info(String* packet) { @@ -131,7 +191,7 @@ void Load_log_event::pack_info(String* packet) } tmp.append("LOAD DATA INFILE '"); - tmp.append(fname); + tmp.append(fname, fname_len); tmp.append("' ", 2); if(sql_ex.opt_flags && REPLACE_FLAG ) tmp.append(" REPLACE "); @@ -385,12 +445,15 @@ Log_event* Log_event::read_log_event(IO_CACHE* file, pthread_mutex_t* log_lock) error = "read error"; goto err; } - res = read_log_event(buf, data_len); + if((res = read_log_event(buf, data_len))) + res->register_temp_buf(buf); err: if (log_lock) pthread_mutex_unlock(log_lock); if(error) + { sql_print_error(error); - my_free(buf, MYF(MY_ALLOW_ZERO_PTR)); + my_free(buf, MYF(MY_ALLOW_ZERO_PTR)); + } return res; } @@ -400,61 +463,54 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len) (uint)event_len != uint4korr(buf+EVENT_LEN_OFFSET)) return NULL; // general sanity check - will fail on a partial read + Log_event* ev = NULL; + switch(buf[EVENT_TYPE_OFFSET]) { case QUERY_EVENT: - { - Query_log_event* q = new Query_log_event(buf, event_len); - if (!q->query) - { - delete q; - return NULL; - } - - return q; - } - + ev = new Query_log_event(buf, event_len); + break; case LOAD_EVENT: - { - Load_log_event* l = new Load_log_event(buf, event_len); - if (!l->table_name) - { - delete l; - return NULL; - } - - return l; - } - + ev = new Load_log_event(buf, event_len); + break; case ROTATE_EVENT: - { - Rotate_log_event* r = new Rotate_log_event(buf, event_len); - if (!r->new_log_ident) - { - delete r; - return NULL; - } - - return r; - } + ev = new Rotate_log_event(buf, event_len); + break; case SLAVE_EVENT: - { - Slave_log_event* s = new Slave_log_event(buf, event_len); - if (!s->master_host) - { - delete s; - return NULL; - } - - return s; - } - case START_EVENT: return new Start_log_event(buf); - case STOP_EVENT: return new Stop_log_event(buf); - case INTVAR_EVENT: return new Intvar_log_event(buf); + ev = new Slave_log_event(buf, event_len); + break; + case CREATE_FILE_EVENT: + ev = new Create_file_log_event(buf, event_len); + break; + case APPEND_BLOCK_EVENT: + ev = new Append_block_log_event(buf, event_len); + break; + case DELETE_FILE_EVENT: + ev = new Delete_file_log_event(buf, event_len); + break; + case EXEC_LOAD_EVENT: + ev = new Execute_load_log_event(buf, event_len); + break; + case START_EVENT: + ev = new Start_log_event(buf); + break; + case STOP_EVENT: + ev = new Stop_log_event(buf); + break; + case INTVAR_EVENT: + ev = new Intvar_log_event(buf); + break; default: break; } - return NULL; // default value + if (!ev) return 0; + if (!ev->is_valid()) + { + delete ev; + return 0; + } + ev->cached_event_len = event_len; + return ev; } #ifdef MYSQL_CLIENT @@ -568,6 +624,23 @@ int Rotate_log_event::write_data(IO_CACHE* file) my_b_write(file, (byte*)new_log_ident, (uint) ident_len); } +#ifndef MYSQL_CLIENT +Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg, + bool using_trans): + Log_event(thd_arg), data_buf(0), query(query_arg), db(thd_arg->db), + q_len(thd_arg->query_length), + error_code(thd_arg->killed ? ER_SERVER_SHUTDOWN: thd_arg->net.last_errno), + thread_id(thd_arg->thread_id), + cache_stmt(using_trans && + (thd_arg->options & (OPTION_NOT_AUTO_COMMIT | OPTION_BEGIN))) + { + time_t end_time; + time(&end_time); + exec_time = (ulong) (end_time - thd->start_time); + db_len = (db) ? (uint32) strlen(db) : 0; + } +#endif + Query_log_event::Query_log_event(const char* buf, int event_len): Log_event(buf),data_buf(0), query(NULL), db(NULL) { @@ -690,7 +763,7 @@ void Intvar_log_event::print(FILE* file, bool short_form, char* last_db) } #endif -int Load_log_event::write_data(IO_CACHE* file) +int Load_log_event::write_data_header(IO_CACHE* file) { char buf[LOAD_HEADER_LEN]; int4store(buf + L_THREAD_ID_OFFSET, thread_id); @@ -699,45 +772,110 @@ int Load_log_event::write_data(IO_CACHE* file) buf[L_TBL_LEN_OFFSET] = (char)table_name_len; buf[L_DB_LEN_OFFSET] = (char)db_len; int4store(buf + L_NUM_FIELDS_OFFSET, num_fields); - - if(my_b_write(file, (byte*)buf, sizeof(buf)) || - my_b_write(file, (byte*)&sql_ex, sizeof(sql_ex))) - return 1; + memcpy(buf + L_SQL_EX_OFFSET, &sql_ex, sizeof(sql_ex)); + return my_b_write(file, (byte*)buf, LOAD_HEADER_LEN); +} +int Load_log_event::write_data_body(IO_CACHE* file) +{ if (num_fields && fields && field_lens) { if(my_b_write(file, (byte*)field_lens, num_fields) || my_b_write(file, (byte*)fields, field_block_len)) return 1; } - if(my_b_write(file, (byte*)table_name, table_name_len + 1) || + return my_b_write(file, (byte*)table_name, table_name_len + 1) || my_b_write(file, (byte*)db, db_len + 1) || - my_b_write(file, (byte*)fname, fname_len)) - return 1; - return 0; + my_b_write(file, (byte*)fname, fname_len); } +#ifndef MYSQL_CLIENT +Load_log_event::Load_log_event(THD* thd, sql_exchange* ex, + const char* db_arg, const char* table_name_arg, + List<Item>& fields_arg, enum enum_duplicates handle_dup): + Log_event(thd),thread_id(thd->thread_id), + num_fields(0),fields(0),field_lens(0),field_block_len(0), + table_name(table_name_arg), + db(db_arg), + fname(ex->file_name),fname_null_term(1) + { + time_t end_time; + time(&end_time); + exec_time = (ulong) (end_time - thd->start_time); + db_len = (db) ? (uint32) strlen(db) : 0; + table_name_len = (table_name) ? (uint32) strlen(table_name) : 0; + fname_len = (fname) ? (uint) strlen(fname) : 0; + sql_ex.field_term = (*ex->field_term)[0]; + sql_ex.enclosed = (*ex->enclosed)[0]; + sql_ex.line_term = (*ex->line_term)[0]; + sql_ex.line_start = (*ex->line_start)[0]; + sql_ex.escaped = (*ex->escaped)[0]; + sql_ex.opt_flags = 0; + if(ex->dumpfile) + sql_ex.opt_flags |= DUMPFILE_FLAG; + if(ex->opt_enclosed) + sql_ex.opt_flags |= OPT_ENCLOSED_FLAG; + + sql_ex.empty_flags = 0; + + switch(handle_dup) + { + case DUP_IGNORE: sql_ex.opt_flags |= IGNORE_FLAG; break; + case DUP_REPLACE: sql_ex.opt_flags |= REPLACE_FLAG; break; + case DUP_ERROR: break; + } + + if(!ex->field_term->length()) + sql_ex.empty_flags |= FIELD_TERM_EMPTY; + if(!ex->enclosed->length()) + sql_ex.empty_flags |= ENCLOSED_EMPTY; + if(!ex->line_term->length()) + sql_ex.empty_flags |= LINE_TERM_EMPTY; + if(!ex->line_start->length()) + sql_ex.empty_flags |= LINE_START_EMPTY; + if(!ex->escaped->length()) + sql_ex.empty_flags |= ESCAPED_EMPTY; + + skip_lines = ex->skip_lines; + + List_iterator<Item> li(fields_arg); + field_lens_buf.length(0); + fields_buf.length(0); + Item* item; + while((item = li++)) + { + num_fields++; + uchar len = (uchar) strlen(item->name); + field_block_len += len + 1; + fields_buf.append(item->name, len + 1); + field_lens_buf.append((char*)&len, 1); + } + + field_lens = (const uchar*)field_lens_buf.ptr(); + fields = fields_buf.ptr(); + } + +#endif + Load_log_event::Load_log_event(const char* buf, int event_len): - Log_event(buf),data_buf(0),num_fields(0),fields(0), + Log_event(buf),num_fields(0),fields(0), field_lens(0),field_block_len(0), - table_name(0),db(0),fname(0) + table_name(0),db(0),fname(0),fname_null_term(0) { - uint data_len; - if((uint)event_len < (LOAD_EVENT_OVERHEAD + LOG_EVENT_HEADER_LEN)) - return; - memcpy(&sql_ex, buf + LOAD_HEADER_LEN + LOG_EVENT_HEADER_LEN, - sizeof(sql_ex)); - data_len = event_len - LOAD_HEADER_LEN - LOG_EVENT_HEADER_LEN - - sizeof(sql_ex); - if(!(data_buf = (char*)my_malloc(data_len + 1, MYF(MY_WME)))) + if (!event_len) // derived class, will call copy_log_event() itself return; - memcpy(data_buf, buf +LOG_EVENT_HEADER_LEN + LOAD_HEADER_LEN - + sizeof(sql_ex), data_len); - copy_log_event(buf, data_len); + copy_log_event(buf, event_len); } -void Load_log_event::copy_log_event(const char *buf, ulong data_len) +int Load_log_event::copy_log_event(const char *buf, ulong event_len) { + uint data_len; + int body_offset = get_data_body_offset(); + if((int)event_len < body_offset) + return 1; + memcpy(&sql_ex, buf + L_SQL_EX_OFFSET + LOG_EVENT_HEADER_LEN, + sizeof(sql_ex)); + data_len = event_len - body_offset; thread_id = uint4korr(buf + L_THREAD_ID_OFFSET + LOG_EVENT_HEADER_LEN); exec_time = uint4korr(buf + L_EXEC_TIME_OFFSET + LOG_EVENT_HEADER_LEN); skip_lines = uint4korr(buf + L_SKIP_LINES_OFFSET + LOG_EVENT_HEADER_LEN); @@ -746,22 +884,21 @@ void Load_log_event::copy_log_event(const char *buf, ulong data_len) num_fields = uint4korr(buf + L_NUM_FIELDS_OFFSET + LOG_EVENT_HEADER_LEN); if (num_fields > data_len) // simple sanity check against corruption - return; - - field_lens = (uchar*) data_buf; + return 1; + field_lens = (uchar*)buf + body_offset; uint i; for (i = 0; i < num_fields; i++) { field_block_len += (uint)field_lens[i] + 1; } fields = (char*)field_lens + num_fields; - - *((char*)data_buf+data_len) = 0; table_name = fields + field_block_len; db = table_name + table_name_len + 1; fname = db + db_len + 1; - fname_len = data_len - 2 - db_len - table_name_len - num_fields - - field_block_len; + fname_len = (get_type_code() == LOAD_EVENT) ? + data_len - 2 - db_len - table_name_len - num_fields - field_block_len : + strlen(fname); + return 0; } #ifdef MYSQL_CLIENT @@ -786,7 +923,7 @@ void Load_log_event::print(FILE* file, bool short_form, char* last_db) if(db && db[0] && !same_db) fprintf(file, "use %s;\n", db); - fprintf(file, "LOAD DATA INFILE '%s' ", fname); + fprintf(file, "LOAD DATA INFILE '%-*s' ", fname_len, fname); if(sql_ex.opt_flags && REPLACE_FLAG ) fprintf(file," REPLACE "); @@ -870,9 +1007,8 @@ void Load_log_event::set_fields(List<Item> &fields) } -Slave_log_event::Slave_log_event(THD* thd_arg,MASTER_INFO* mi): - Log_event(thd_arg->start_time, 0, 1, thd_arg->server_id), - mem_pool(0),master_host(0) +Slave_log_event::Slave_log_event(THD* thd_arg,struct st_master_info* mi): + Log_event(thd_arg),mem_pool(0),master_host(0) { if(!mi->inited) return; @@ -947,7 +1083,6 @@ void Slave_log_event::init_from_mem_pool(int data_size) master_host = 0; return; } - master_log_len = strlen(master_log); } @@ -965,43 +1100,59 @@ Slave_log_event::Slave_log_event(const char* buf, int event_len): } #ifndef MYSQL_CLIENT -Create_file_log_event::Create_file_log_event(THD* thd, TABLE_LIST * table, - char* block_arg, - uint block_len_arg) : - Log_event(thd->start_time), db(table->db),tbl_name(table->real_name), - db_len(strlen(table->db)),tbl_name_len(strlen(table->real_name)), - block(block_arg),block_len(block_len_arg), - file_id(thd->file_id = thd->query_id) +Create_file_log_event::Create_file_log_event(THD* thd_arg, sql_exchange* ex, + const char* db_arg, const char* table_name_arg, + List<Item>& fields_arg, enum enum_duplicates handle_dup, + char* block_arg, uint block_len_arg): + Load_log_event(thd_arg,ex,db_arg,table_name_arg,fields_arg,handle_dup), + fake_base(0),block(block_arg),block_len(block_len_arg), + file_id(thd_arg->file_id = thd_arg->query_id) { - set_log_seq(thd, &mysql_bin_log); } #endif -int Create_file_log_event::write_data(IO_CACHE* file) +int Create_file_log_event::write_data_body(IO_CACHE* file) +{ + int res; + if ((res = Load_log_event::write_data_body(file)) || fake_base) + return res; + return my_b_write(file, "", 1) || my_b_write(file, block, block_len); +} + +int Create_file_log_event::write_data_header(IO_CACHE* file) { + int res; + if ((res = Load_log_event::write_data_header(file)) || fake_base) + return res; char buf[CREATE_FILE_HEADER_LEN]; - buf[CF_DB_LEN_OFFSET] = (uchar)db_len; - buf[CF_TBL_LEN_OFFSET] = (uchar)tbl_name_len; int4store(buf + CF_FILE_ID_OFFSET, file_id); - return my_b_write(file, buf, CREATE_FILE_HEADER_LEN) || - my_b_write(file, db, db_len) || - my_b_write(file, tbl_name, tbl_name_len) || - my_b_write(file, block, block_len); + return my_b_write(file, buf, CREATE_FILE_HEADER_LEN); +} + +int Create_file_log_event::write_base(IO_CACHE* file) +{ + int res; + fake_base = 1; // pretend we are Load event + res = write(file); + fake_base = 0; + return res; } Create_file_log_event::Create_file_log_event(const char* buf, int len): - Log_event(buf),db(0) + Load_log_event(buf,0),fake_base(0),block(0) { - db_len = (uint)buf[LOG_EVENT_HEADER_LEN + CF_DB_LEN_OFFSET]; - tbl_name_len = (uint)buf[CF_TBL_LEN_OFFSET + LOG_EVENT_HEADER_LEN]; - if ((uint)len < db_len + tbl_name_len + CREATE_FILE_EVENT_OVERHEAD) + int block_offset; + if (copy_log_event(buf,len)) return; - - file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + CF_FILE_ID_OFFSET); - db = (char*)buf + CREATE_FILE_EVENT_OVERHEAD; - tbl_name = db + db_len; - block = tbl_name + tbl_name_len; - block_len = len - (db_len + tbl_name_len + CREATE_FILE_EVENT_OVERHEAD); + fname_null_term = 1; + file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + + + LOAD_HEADER_LEN + CF_FILE_ID_OFFSET); + block_offset = LOG_EVENT_HEADER_LEN + Load_log_event::get_data_size() + + CREATE_FILE_HEADER_LEN + 1; // 1 for \0 terminating fname + if(len < block_offset) + return; + block = (char*)buf + block_offset; + block_len = len - block_offset; } #ifdef MYSQL_CLIENT void Create_file_log_event::print(FILE* file, bool short_form = 0, @@ -1009,11 +1160,8 @@ void Create_file_log_event::print(FILE* file, bool short_form = 0, { if (short_form) return; - print_header(file); - fputc('\n', file); - fprintf(file, "Create_file: db='%-*s' table='%-*s' file_id=%d,\ - block_len=%d\n", db_len, db, tbl_name_len, tbl_name, file_id, - block_len); + Load_log_event::print(file, 1, last_db); + fprintf(file, " file_id=%d, block_len=%d\n", file_id, block_len); } #endif @@ -1027,13 +1175,601 @@ void Create_file_log_event::pack_info(String* packet) tmp.append("db="); tmp.append(db, db_len); tmp.append(";table="); - tmp.append(tbl_name, tbl_name_len); + tmp.append(table_name, table_name_len); tmp.append(";file_id="); tmp.append(llstr(file_id,buf)); tmp.append(";block_len="); tmp.append(llstr(block_len,buf)); + net_store_data(packet, (char*)tmp.ptr(), tmp.length()); +} +#endif + +#ifndef MYSQL_CLIENT +Append_block_log_event::Append_block_log_event(THD* thd_arg, char* block_arg, + uint block_len_arg): + Log_event(thd_arg), block(block_arg),block_len(block_len_arg), + file_id(thd_arg->file_id) +{ +} +#endif + +Append_block_log_event::Append_block_log_event(const char* buf, int len): + Log_event(buf),block(0) +{ + if((uint)len < APPEND_BLOCK_EVENT_OVERHEAD) + return; + file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + AB_FILE_ID_OFFSET); + block = (char*)buf + APPEND_BLOCK_EVENT_OVERHEAD; + block_len = len - APPEND_BLOCK_EVENT_OVERHEAD; +} + +int Append_block_log_event::write_data(IO_CACHE* file) +{ + char buf[APPEND_BLOCK_HEADER_LEN]; + int4store(buf + AB_FILE_ID_OFFSET, file_id); + return my_b_write(file, buf, APPEND_BLOCK_HEADER_LEN) || + my_b_write(file, block, block_len); +} + +#ifdef MYSQL_CLIENT +void Append_block_log_event::print(FILE* file, bool short_form = 0, + char* last_db = 0) +{ + if (short_form) + return; + print_header(file); + fputc('\n', file); + fprintf(file, "#Append_block: file_id=%d, block_len=%d\n", + file_id, block_len); +} +#endif +#ifndef MYSQL_CLIENT +void Append_block_log_event::pack_info(String* packet) +{ + char buf1[256]; + String tmp(buf1, sizeof(buf1)); + tmp.length(0); + char buf[22]; + tmp.append(";file_id="); + tmp.append(llstr(file_id,buf)); + tmp.append(";block_len="); + tmp.append(llstr(block_len,buf)); + net_store_data(packet, (char*)tmp.ptr(), tmp.length()); +} +#endif + +#ifndef MYSQL_CLIENT +Delete_file_log_event::Delete_file_log_event(THD* thd_arg): + Log_event(thd_arg),file_id(thd_arg->file_id) +{ +} +#endif + +Delete_file_log_event::Delete_file_log_event(const char* buf, int len): + Log_event(buf),file_id(0) +{ + if((uint)len < DELETE_FILE_EVENT_OVERHEAD) + return; + file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + AB_FILE_ID_OFFSET); +} + +int Delete_file_log_event::write_data(IO_CACHE* file) +{ + char buf[DELETE_FILE_HEADER_LEN]; + int4store(buf + DF_FILE_ID_OFFSET, file_id); + return my_b_write(file, buf, DELETE_FILE_HEADER_LEN); +} + +#ifdef MYSQL_CLIENT +void Delete_file_log_event::print(FILE* file, bool short_form = 0, + char* last_db = 0) +{ + if (short_form) + return; + print_header(file); + fputc('\n', file); + fprintf(file, "#Delete_file: file_id=%d\n", + file_id); +} +#endif +#ifndef MYSQL_CLIENT +void Delete_file_log_event::pack_info(String* packet) +{ + char buf1[256]; + String tmp(buf1, sizeof(buf1)); + tmp.length(0); + char buf[22]; + tmp.append(";file_id="); + tmp.append(llstr(file_id,buf)); + net_store_data(packet, (char*)tmp.ptr(), tmp.length()); +} +#endif + +#ifndef MYSQL_CLIENT +Execute_load_log_event::Execute_load_log_event(THD* thd_arg): + Log_event(thd_arg),file_id(thd_arg->file_id) +{ +} +#endif + +Execute_load_log_event::Execute_load_log_event(const char* buf,int len): + Log_event(buf),file_id(0) +{ + if((uint)len < EXEC_LOAD_EVENT_OVERHEAD) + return; + file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + EL_FILE_ID_OFFSET); +} + +int Execute_load_log_event::write_data(IO_CACHE* file) +{ + char buf[EXEC_LOAD_HEADER_LEN]; + int4store(buf + EL_FILE_ID_OFFSET, file_id); + return my_b_write(file, buf, EXEC_LOAD_HEADER_LEN); +} + +#ifdef MYSQL_CLIENT +void Execute_load_log_event::print(FILE* file, bool short_form = 0, + char* last_db = 0) +{ + if (short_form) + return; + print_header(file); + fputc('\n', file); + fprintf(file, "#Exec_load: file_id=%d\n", + file_id); } #endif +#ifndef MYSQL_CLIENT +void Execute_load_log_event::pack_info(String* packet) +{ + char buf1[256]; + String tmp(buf1, sizeof(buf1)); + tmp.length(0); + char buf[22]; + tmp.append(";file_id="); + tmp.append(llstr(file_id,buf)); + net_store_data(packet, (char*)tmp.ptr(), tmp.length()); +} +#endif + +#ifndef MYSQL_CLIENT +int Query_log_event::exec_event(struct st_master_info* mi) +{ + int expected_error,actual_error = 0; + init_sql_alloc(&thd->mem_root, 8192,0); + thd->db = rewrite_db((char*)db); + if (db_ok(thd->db, replicate_do_db, replicate_ignore_db)) + { + thd->query = (char*)query; + thd->set_time((time_t)when); + thd->current_tablenr = 0; + VOID(pthread_mutex_lock(&LOCK_thread_count)); + thd->query_id = query_id++; + VOID(pthread_mutex_unlock(&LOCK_thread_count)); + thd->query_error = 0; // clear error + thd->net.last_errno = 0; + thd->net.last_error[0] = 0; + thd->slave_proxy_id = thread_id; // for temp tables + + // sanity check to make sure the master did not get a really bad + // error on the query + if (!check_expected_error(thd, (expected_error = error_code))) + { + mysql_parse(thd, thd->query, q_len); + if (expected_error != + (actual_error = thd->net.last_errno) && expected_error) + { + const char* errmsg = "Slave: did not get the expected error\ + running query from master - expected: '%s'(%d), got '%s'(%d)"; + sql_print_error(errmsg, ER_SAFE(expected_error), + expected_error, + actual_error ? thd->net.last_error:"no error", + actual_error); + thd->query_error = 1; + } + else if (expected_error == actual_error) + { + thd->query_error = 0; + *last_slave_error = 0; + last_slave_errno = 0; + } + } + else + { + // master could be inconsistent, abort and tell DBA to check/fix it + thd->db = thd->query = 0; + thd->convert_set = 0; + close_thread_tables(thd); + free_root(&thd->mem_root,0); + return 1; + } + } + thd->db = 0; // prevent db from being freed + thd->query = 0; // just to be sure + // assume no convert for next query unless set explictly + thd->convert_set = 0; + close_thread_tables(thd); + + if (thd->query_error || thd->fatal_error) + { + slave_print_error(actual_error, "error '%s' on query '%s'", + actual_error ? thd->net.last_error : + "unexpected success or fatal error", query); + free_root(&thd->mem_root,0); + return 1; + } + free_root(&thd->mem_root,0); + return Log_event::exec_event(mi); +} + +int Load_log_event::exec_event(NET* net, struct st_master_info* mi) +{ + init_sql_alloc(&thd->mem_root, 8192,0); + thd->db = rewrite_db((char*)db); + thd->query = 0; + thd->query_error = 0; + + if(db_ok(thd->db, replicate_do_db, replicate_ignore_db)) + { + thd->set_time((time_t)when); + thd->current_tablenr = 0; + VOID(pthread_mutex_lock(&LOCK_thread_count)); + thd->query_id = query_id++; + VOID(pthread_mutex_unlock(&LOCK_thread_count)); + + TABLE_LIST tables; + bzero((char*) &tables,sizeof(tables)); + tables.db = thd->db; + tables.name = tables.real_name = (char*)table_name; + tables.lock_type = TL_WRITE; + // the table will be opened in mysql_load + if(table_rules_on && !tables_ok(thd, &tables)) + { + if (net) + skip_load_data_infile(net); + } + else + { + char llbuff[22]; + enum enum_duplicates handle_dup = DUP_IGNORE; + char fname_buf[FN_REFLEN+1], *fname_p; + if (fname_null_term) + fname_p = (char*)fname; + else + { + int len = min(FN_REFLEN,fname_len); + memcpy(fname_buf,fname,len); + fname_buf[len] = 0; + fname_p = fname_buf; + } + if(sql_ex.opt_flags && REPLACE_FLAG) + handle_dup = DUP_REPLACE; + sql_exchange ex(fname_p, sql_ex.opt_flags && + DUMPFILE_FLAG ); + String field_term(&sql_ex.field_term, 1), + enclosed(&sql_ex.enclosed, 1), + line_term(&sql_ex.line_term,1), + escaped(&sql_ex.escaped, 1), + line_start(&sql_ex.line_start, 1); + + ex.field_term = &field_term; + if(sql_ex.empty_flags & FIELD_TERM_EMPTY) + ex.field_term->length(0); + + ex.enclosed = &enclosed; + if(sql_ex.empty_flags & ENCLOSED_EMPTY) + ex.enclosed->length(0); + + ex.line_term = &line_term; + if(sql_ex.empty_flags & LINE_TERM_EMPTY) + ex.line_term->length(0); + + ex.line_start = &line_start; + if(sql_ex.empty_flags & LINE_START_EMPTY) + ex.line_start->length(0); + + ex.escaped = &escaped; + if(sql_ex.empty_flags & ESCAPED_EMPTY) + ex.escaped->length(0); + + ex.opt_enclosed = (sql_ex.opt_flags & OPT_ENCLOSED_FLAG); + if(sql_ex.empty_flags & FIELD_TERM_EMPTY) + ex.field_term->length(0); + + ex.skip_lines = skip_lines; + List<Item> fields; + set_fields(fields); + thd->slave_proxy_id = thd->thread_id; + if (net) + { + // mysql_load will use thd->net to read the file + thd->net.vio = net->vio; + // make sure the client does not get confused + // about the packet sequence + thd->net.pkt_nr = net->pkt_nr; + } + if(mysql_load(thd, &ex, &tables, fields, handle_dup, net != 0, + TL_WRITE)) + thd->query_error = 1; + if(thd->cuted_fields) + sql_print_error("Slave: load data infile at position %s in log \ +'%s' produced %d warning(s)", llstr(mi->pos,llbuff), RPL_LOG_NAME, + thd->cuted_fields ); + if(net) + net->pkt_nr = thd->net.pkt_nr; + } + } + else + { + // we will just ask the master to send us /dev/null if we do not + // want to load the data + if (net) + skip_load_data_infile(net); + } + + thd->net.vio = 0; + thd->db = 0;// prevent db from being freed + close_thread_tables(thd); + if(thd->query_error) + { + int sql_error = thd->net.last_errno; + if(!sql_error) + sql_error = ER_UNKNOWN_ERROR; + + slave_print_error(sql_error, "Slave: Error '%s' running load data infile ", + ER(sql_error)); + free_root(&thd->mem_root,0); + return 1; + } + free_root(&thd->mem_root,0); + + if(thd->fatal_error) + { + sql_print_error("Slave: Fatal error running LOAD DATA INFILE "); + return 1; + } + + return Log_event::exec_event(mi); +} + +int Start_log_event::exec_event(struct st_master_info* mi) +{ + close_temporary_tables(thd); + return Log_event::exec_event(mi); +} + +int Stop_log_event::exec_event(struct st_master_info* mi) +{ + if(mi->pos > 4) // stop event should be ignored after rotate event + { + close_temporary_tables(thd); + mi->inc_pos(get_event_len(), log_seq); + flush_master_info(mi); + } + thd->log_seq = 0; + return 0; +} + +int Rotate_log_event::exec_event(struct st_master_info* mi) +{ + bool rotate_binlog = 0, write_slave_event = 0; + char* log_name = mi->log_file_name; + pthread_mutex_lock(&mi->lock); + + // rotate local binlog only if the name of remote has changed + if (!*log_name || !(log_name[ident_len] == 0 && + !memcmp(log_name, new_log_ident, ident_len))) + { + write_slave_event = (!(flags & LOG_EVENT_FORCED_ROTATE_F) + && mysql_bin_log.is_open()); + rotate_binlog = (*log_name && write_slave_event); + memcpy(log_name, new_log_ident,ident_len ); + log_name[ident_len] = 0; + } + mi->pos = pos; + mi->last_log_seq = log_seq; +#ifndef DBUG_OFF + if (abort_slave_event_count) + ++events_till_abort; +#endif + if (rotate_binlog) + { + mysql_bin_log.new_file(); + mi->last_log_seq = 0; + } + pthread_cond_broadcast(&mi->cond); + pthread_mutex_unlock(&mi->lock); + flush_master_info(mi); + + if (write_slave_event) + { + Slave_log_event s(thd, mi); + if (s.master_host) + { + s.set_log_seq(0, &mysql_bin_log); + s.server_id = ::server_id; + mysql_bin_log.write(&s); + } + } + thd->log_seq = 0; + return 0; +} + +int Intvar_log_event::exec_event(struct st_master_info* mi) +{ + switch(type) + { + case LAST_INSERT_ID_EVENT: + thd->last_insert_id_used = 1; + thd->last_insert_id = val; + break; + case INSERT_ID_EVENT: + thd->next_insert_id = val; + break; + } + mi->inc_pending(get_event_len()); + return 0; +} + +int Slave_log_event::exec_event(struct st_master_info* mi) +{ + if(mysql_bin_log.is_open()) + mysql_bin_log.write(this); + return Log_event::exec_event(mi); +} + +int Create_file_log_event::exec_event(struct st_master_info* mi) +{ + char fname_buf[FN_REFLEN+10]; + char* p,*p1; + int fd = -1; + IO_CACHE file; + int error = 1; + p = slave_load_file_stem(fname_buf, file_id, server_id); + memcpy(p, ".info", 6); + bzero((char*)&file, sizeof(file)); + if ((fd = my_open(fname_buf, O_WRONLY|O_CREAT|O_BINARY|O_TRUNC, + MYF(MY_WME))) < 0 || + init_io_cache(&file, fd, IO_SIZE, WRITE_CACHE, (my_off_t)0, 0, + MYF(MY_WME|MY_NABP))) + { + slave_print_error(my_errno, "Could not open file '%s'", fname_buf); + goto err; + } + + // a trick to avoid allocating another buffer + memcpy(p, ".data", 6); + fname = fname_buf; + fname_len = (uint)(p-fname) + 5; + if (write_base(&file)) + { + memcpy(p, ".info", 6); // to have it right in the error message + slave_print_error(my_errno, "Could not write to file '%s'", fname_buf); + goto err; + } + end_io_cache(&file); + my_close(fd, MYF(0)); + + // fname_buf now already has .data, not .info, because we did our trick + if ((fd = my_open(fname_buf, O_WRONLY|O_CREAT|O_BINARY|O_TRUNC, + MYF(MY_WME))) < 0) + { + slave_print_error(my_errno, "Could not open file '%s'", fname_buf); + goto err; + } + if (my_write(fd, block, block_len, MYF(MY_WME+MY_NABP))) + { + slave_print_error(my_errno, "Write to '%s' failed", fname_buf); + goto err; + } + if (mysql_bin_log.is_open()) + mysql_bin_log.write(this); + error=0; +err: + if (error) + end_io_cache(&file); + if (fd >= 0) + my_close(fd, MYF(0)); + return error ? 1 : Log_event::exec_event(mi); +} + +int Delete_file_log_event::exec_event(struct st_master_info* mi) +{ + char fname[FN_REFLEN+10]; + char* p; + p = slave_load_file_stem(fname, file_id, server_id); + memcpy(p, ".data", 6); + (void)my_delete(fname, MYF(MY_WME)); + memcpy(p, ".info", 6); + (void)my_delete(fname, MYF(MY_WME)); + if (mysql_bin_log.is_open()) + mysql_bin_log.write(this); + return Log_event::exec_event(mi); +} + +int Append_block_log_event::exec_event(struct st_master_info* mi) +{ + char fname[FN_REFLEN+10]; + char* p; + int fd = -1; + int error = 1; + p = slave_load_file_stem(fname, file_id, server_id); + memcpy(p, ".data", 6); + if ((fd = my_open(fname, O_WRONLY|O_APPEND|O_BINARY, MYF(MY_WME))) < 0) + { + slave_print_error(my_errno, "Could not open file '%s'", fname); + goto err; + } + if (my_write(fd, block, block_len, MYF(MY_WME+MY_NABP))) + { + slave_print_error(my_errno, "Write to '%s' failed", fname); + goto err; + } + if (mysql_bin_log.is_open()) + mysql_bin_log.write(this); + error=0; +err: + if (fd >= 0) + my_close(fd, MYF(0)); + return error ? error : Log_event::exec_event(mi); +} + +int Execute_load_log_event::exec_event(struct st_master_info* mi) +{ + char fname[FN_REFLEN+10]; + char* p; + int fd = -1; + int error = 1; + int save_options; + IO_CACHE file; + Load_log_event* lev = 0; + p = slave_load_file_stem(fname, file_id, server_id); + memcpy(p, ".info", 6); + bzero((char*)&file, sizeof(file)); + if ((fd = my_open(fname, O_RDONLY|O_BINARY, MYF(MY_WME))) < 0 || + init_io_cache(&file, fd, IO_SIZE, READ_CACHE, (my_off_t)0, 0, + MYF(MY_WME|MY_NABP))) + { + slave_print_error(my_errno, "Could not open file '%s'", fname); + goto err; + } + if (!(lev = (Load_log_event*)Log_event::read_log_event(&file,0)) + || lev->get_type_code() != LOAD_EVENT) + { + slave_print_error(0, "File '%s' appears corrupted", fname); + goto err; + } + // we want to disable binary logging in slave thread + // because we need the file events to appear in the same order + // as they do on the master relative to other events, so that we + // can preserve ascending order of log sequence numbers - needed + // to handle failover + save_options = thd->options; + thd->options &= ~OPTION_BIN_LOG; + lev->thd = thd; + if (lev->exec_event(0,0)) + { + slave_print_error(my_errno, "Failed executing load from '%s'", fname); + thd->options = save_options; + goto err; + } + thd->options = save_options; + (void)my_delete(fname, MYF(MY_WME)); + memcpy(p, ".data", 6); + (void)my_delete(fname, MYF(MY_WME)); + if (mysql_bin_log.is_open()) + mysql_bin_log.write(this); + error = 0; +err: + delete lev; + end_io_cache(&file); + if (fd >= 0) + my_close(fd, MYF(0)); + return error ? error : Log_event::exec_event(mi); +} + + +#endif diff --git a/sql/log_event.h b/sql/log_event.h index 1b92ff7ff83..2165a620fa3 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -42,6 +42,29 @@ */ #define ST_SERVER_VER_LEN 50 +#define DUMPFILE_FLAG 0x1 +#define OPT_ENCLOSED_FLAG 0x2 +#define REPLACE_FLAG 0x4 +#define IGNORE_FLAG 0x8 + +#define FIELD_TERM_EMPTY 0x1 +#define ENCLOSED_EMPTY 0x2 +#define LINE_TERM_EMPTY 0x4 +#define LINE_START_EMPTY 0x8 +#define ESCAPED_EMPTY 0x10 + + +struct sql_ex_info + { + char field_term; + char enclosed; + char line_term; + char line_start; + char escaped; + char opt_flags; // flags for the options + char empty_flags; // flags to indicate which of the terminating charact + } ; + /* Binary log consists of events. Each event has a fixed length header, followed by possibly variable ( depending on the type of event) length data body. The data body consists of an optional fixed length segment @@ -49,13 +72,17 @@ comments below for the format specifics */ + /* event-specific post-header sizes */ #define LOG_EVENT_HEADER_LEN 19 #define QUERY_HEADER_LEN (4 + 4 + 1 + 2) -#define LOAD_HEADER_LEN (4 + 4 + 4 + 1 +1 + 4) +#define LOAD_HEADER_LEN (4 + 4 + 4 + 1 +1 + 4+sizeof(struct sql_ex_info)) #define START_HEADER_LEN (2 + ST_SERVER_VER_LEN + 4) #define ROTATE_HEADER_LEN 8 -#define CREATE_FILE_HEADER_LEN 6 +#define CREATE_FILE_HEADER_LEN 4 +#define APPEND_BLOCK_HEADER_LEN 4 +#define EXEC_LOAD_HEADER_LEN 4 +#define DELETE_FILE_HEADER_LEN 4 /* event header offsets */ @@ -98,6 +125,7 @@ #define L_DB_LEN_OFFSET 12 #define L_TBL_LEN_OFFSET 13 #define L_NUM_FIELDS_OFFSET 14 +#define L_SQL_EX_OFFSET 18 #define L_DATA_OFFSET LOAD_HEADER_LEN /* Rotate event post-header */ @@ -105,15 +133,26 @@ #define R_POS_OFFSET 0 #define R_IDENT_OFFSET 8 -#define CF_DB_LEN_OFFSET 0 -#define CF_TBL_LEN_OFFSET 1 -#define CF_FILE_ID_OFFSET 2 +#define CF_FILE_ID_OFFSET 0 +#define CF_DATA_OFFSET CREATE_FILE_HEADER_LEN + +#define AB_FILE_ID_OFFSET 0 +#define AB_DATA_OFFSET APPEND_BLOCK_HEADER_LEN + +#define EL_FILE_ID_OFFSET 0 + +#define DF_FILE_ID_OFFSET 0 #define QUERY_EVENT_OVERHEAD (LOG_EVENT_HEADER_LEN+QUERY_HEADER_LEN) #define QUERY_DATA_OFFSET (LOG_EVENT_HEADER_LEN+QUERY_HEADER_LEN) #define ROTATE_EVENT_OVERHEAD (LOG_EVENT_HEADER_LEN+ROTATE_HEADER_LEN) -#define LOAD_EVENT_OVERHEAD (LOG_EVENT_HEADER_LEN+LOAD_HEADER_LEN+sizeof(sql_ex_info)) -#define CREATE_FILE_EVENT_OVERHEAD (LOG_EVENT_HEADER_LEN+CREATE_FILE_HEADER_LEN) +#define LOAD_EVENT_OVERHEAD (LOG_EVENT_HEADER_LEN+LOAD_HEADER_LEN) +#define CREATE_FILE_EVENT_OVERHEAD (LOG_EVENT_HEADER_LEN+\ + +LOAD_HEADER_LEN+CREATE_FILE_HEADER_LEN) +#define DELETE_FILE_EVENT_OVERHEAD (LOG_EVENT_HEADER_LEN+DELETE_FILE_HEADER_LEN) +#define EXEC_LOAD_EVENT_OVERHEAD (LOG_EVENT_HEADER_LEN+EXEC_LOAD_HEADER_LEN) +#define APPEND_BLOCK_EVENT_OVERHEAD (LOG_EVENT_HEADER_LEN+APPEND_BLOCK_HEADER_LEN) + #define BINLOG_MAGIC "\xfe\x62\x69\x6e" @@ -123,7 +162,7 @@ enum Log_event_type { START_EVENT = 1, QUERY_EVENT =2, STOP_EVENT=3, ROTATE_EVENT = 4, INTVAR_EVENT=5, LOAD_EVENT=6, SLAVE_EVENT=7, CREATE_FILE_EVENT=8, - APPEND_TO_FILE_EVENT=9, EXEC_LOAD_EVENT=10, DELETE_FILE_EVENT=11}; + APPEND_BLOCK_EVENT=9, EXEC_LOAD_EVENT=10, DELETE_FILE_EVENT=11}; enum Int_event_type { INVALID_INT_EVENT = 0, LAST_INSERT_ID_EVENT = 1, INSERT_ID_EVENT = 2 }; @@ -145,6 +184,11 @@ public: uint32 server_id; uint32 log_seq; uint16 flags; + int cached_event_len; + char* temp_buf; +#ifndef MYSQL_CLIENT + THD* thd; +#endif static void *operator new(size_t size) { @@ -158,30 +202,32 @@ public: int write(IO_CACHE* file); int write_header(IO_CACHE* file); - virtual int write_data(IO_CACHE* file __attribute__((unused))) { return 0; } + virtual int write_data(IO_CACHE* file) + { return write_data_header(file) || write_data_body(file); } + virtual int write_data_header(IO_CACHE* file __attribute__((unused))) + { return 0; } + virtual int write_data_body(IO_CACHE* file __attribute__((unused))) + { return 0; } virtual Log_event_type get_type_code() = 0; - Log_event(time_t when_arg, ulong exec_time_arg = 0, - int valid_exec_time = 0, uint32 server_id_arg = 0, - uint32 log_seq_arg = 0, uint16 flags_arg = 0): - when(when_arg), exec_time(exec_time_arg), - log_seq(log_seq_arg),flags(0) - { - server_id = server_id_arg ? server_id_arg : (::server_id); - if(valid_exec_time) - flags |= LOG_EVENT_TIME_F; - } - - Log_event(const char* buf) - { - when = uint4korr(buf); - server_id = uint4korr(buf + SERVER_ID_OFFSET); - log_seq = uint4korr(buf + LOG_SEQ_OFFSET); - flags = uint2korr(buf + FLAGS_OFFSET); - } - - virtual ~Log_event() {} - + virtual bool is_valid() = 0; + Log_event(const char* buf); +#ifndef MYSQL_CLIENT + Log_event(THD* thd_arg, uint16 flags_arg = 0); +#endif + virtual ~Log_event() { free_temp_buf();} + void register_temp_buf(char* buf) { temp_buf = buf; } + void free_temp_buf() + { + if (temp_buf) + { + my_free(temp_buf, MYF(0)); + temp_buf = 0; + } + } virtual int get_data_size() { return 0;} + virtual int get_data_body_offset() { return 0; } + int get_event_len() { return cached_event_len ? cached_event_len : + (cached_event_len = LOG_EVENT_HEADER_LEN + get_data_size()); } #ifdef MYSQL_CLIENT virtual void print(FILE* file, bool short_form = 0, char* last_db = 0) = 0; void print_timestamp(FILE* file, time_t *ts = 0); @@ -200,6 +246,11 @@ public: virtual void pack_info(String* packet); int net_send(THD* thd, const char* log_name, ulong pos); static void init_show_field_list(List<Item>* field_list); + virtual int exec_event(struct st_master_info* mi); + virtual const char* get_db() + { + return thd ? thd->db : 0; + } #endif }; @@ -219,24 +270,13 @@ public: uint16 error_code; ulong thread_id; #if !defined(MYSQL_CLIENT) - THD* thd; bool cache_stmt; - Query_log_event(THD* thd_arg, const char* query_arg, bool using_trans=0): - Log_event(thd_arg->start_time,0,1,thd_arg->server_id,thd_arg->log_seq), - data_buf(0), - query(query_arg), db(thd_arg->db), q_len(thd_arg->query_length), - error_code(thd_arg->killed ? ER_SERVER_SHUTDOWN: thd_arg->net.last_errno), - thread_id(thd_arg->thread_id), thd(thd_arg), - cache_stmt(using_trans && - (thd_arg->options & (OPTION_NOT_AUTO_COMMIT | OPTION_BEGIN))) - { - time_t end_time; - time(&end_time); - exec_time = (ulong) (end_time - thd->start_time); - db_len = (db) ? (uint32) strlen(db) : 0; - } - + + Query_log_event(THD* thd_arg, const char* query_arg, + bool using_trans=0); + const char* get_db() { return db; } void pack_info(String* packet); + int exec_event(struct st_master_info* mi); #endif Query_log_event(const char* buf, int event_len); @@ -250,6 +290,7 @@ public: Log_event_type get_type_code() { return QUERY_EVENT; } int write(IO_CACHE* file); int write_data(IO_CACHE* file); // returns 0 on success, -1 on error + bool is_valid() { return query != 0; } int get_data_size() { return q_len + db_len + 2 + @@ -279,11 +320,13 @@ public: #ifndef MYSQL_CLIENT Slave_log_event(THD* thd_arg, struct st_master_info* mi); void pack_info(String* packet); + int exec_event(struct st_master_info* mi); #endif Slave_log_event(const char* buf, int event_len); ~Slave_log_event(); int get_data_size(); + bool is_valid() { return master_host != 0; } Log_event_type get_type_code() { return SLAVE_EVENT; } #ifdef MYSQL_CLIENT void print(FILE* file, bool short_form = 0, char* last_db = 0); @@ -292,34 +335,10 @@ public: }; -#define DUMPFILE_FLAG 0x1 -#define OPT_ENCLOSED_FLAG 0x2 -#define REPLACE_FLAG 0x4 -#define IGNORE_FLAG 0x8 - -#define FIELD_TERM_EMPTY 0x1 -#define ENCLOSED_EMPTY 0x2 -#define LINE_TERM_EMPTY 0x4 -#define LINE_START_EMPTY 0x8 -#define ESCAPED_EMPTY 0x10 - - -struct sql_ex_info - { - char field_term; - char enclosed; - char line_term; - char line_start; - char escaped; - char opt_flags; // flags for the options - char empty_flags; // flags to indicate which of the terminating charact - } ; - class Load_log_event: public Log_event { protected: - char* data_buf; - void copy_log_event(const char *buf, ulong data_len); + int copy_log_event(const char *buf, ulong event_len); public: ulong thread_id; @@ -330,96 +349,39 @@ public: const char* fields; const uchar* field_lens; uint32 field_block_len; - const char* table_name; const char* db; const char* fname; + bool fname_null_term; uint32 skip_lines; sql_ex_info sql_ex; #if !defined(MYSQL_CLIENT) - THD* thd; String field_lens_buf; String fields_buf; - Load_log_event(THD* thd, sql_exchange* ex, const char* table_name_arg, - List<Item>& fields_arg, enum enum_duplicates handle_dup ): - Log_event(thd->start_time),data_buf(0),thread_id(thd->thread_id), - num_fields(0),fields(0),field_lens(0),field_block_len(0), - table_name(table_name_arg), - db(thd->db), - fname(ex->file_name), - thd(thd) - { - time_t end_time; - time(&end_time); - exec_time = (ulong) (end_time - thd->start_time); - db_len = (db) ? (uint32) strlen(db) : 0; - table_name_len = (table_name) ? (uint32) strlen(table_name) : 0; - fname_len = (fname) ? (uint) strlen(fname) : 0; - sql_ex.field_term = (*ex->field_term)[0]; - sql_ex.enclosed = (*ex->enclosed)[0]; - sql_ex.line_term = (*ex->line_term)[0]; - sql_ex.line_start = (*ex->line_start)[0]; - sql_ex.escaped = (*ex->escaped)[0]; - sql_ex.opt_flags = 0; - if(ex->dumpfile) - sql_ex.opt_flags |= DUMPFILE_FLAG; - if(ex->opt_enclosed) - sql_ex.opt_flags |= OPT_ENCLOSED_FLAG; - - sql_ex.empty_flags = 0; - - switch(handle_dup) - { - case DUP_IGNORE: sql_ex.opt_flags |= IGNORE_FLAG; break; - case DUP_REPLACE: sql_ex.opt_flags |= REPLACE_FLAG; break; - case DUP_ERROR: break; - } - - if(!ex->field_term->length()) - sql_ex.empty_flags |= FIELD_TERM_EMPTY; - if(!ex->enclosed->length()) - sql_ex.empty_flags |= ENCLOSED_EMPTY; - if(!ex->line_term->length()) - sql_ex.empty_flags |= LINE_TERM_EMPTY; - if(!ex->line_start->length()) - sql_ex.empty_flags |= LINE_START_EMPTY; - if(!ex->escaped->length()) - sql_ex.empty_flags |= ESCAPED_EMPTY; - - skip_lines = ex->skip_lines; - - List_iterator<Item> li(fields_arg); - field_lens_buf.length(0); - fields_buf.length(0); - Item* item; - while((item = li++)) - { - num_fields++; - uchar len = (uchar) strlen(item->name); - field_block_len += len + 1; - fields_buf.append(item->name, len + 1); - field_lens_buf.append((char*)&len, 1); - } - - field_lens = (const uchar*)field_lens_buf.ptr(); - fields = fields_buf.ptr(); - } + + Load_log_event(THD* thd, sql_exchange* ex, const char* db_arg, + const char* table_name_arg, + List<Item>& fields_arg, enum enum_duplicates handle_dup); void set_fields(List<Item> &fields_arg); void pack_info(String* packet); + const char* get_db() { return db; } + int exec_event(struct st_master_info* mi) + { + return exec_event(thd->slave_net,mi); + } + int exec_event(NET* net, struct st_master_info* mi); #endif Load_log_event(const char* buf, int event_len); ~Load_log_event() { - if (data_buf) - { - my_free((gptr) data_buf, MYF(0)); - } } Log_event_type get_type_code() { return LOAD_EVENT; } - int write_data(IO_CACHE* file); // returns 0 on success, -1 on error + int write_data_header(IO_CACHE* file); + int write_data_body(IO_CACHE* file); + bool is_valid() { return table_name != 0; } int get_data_size() { return table_name_len + 2 + db_len + 2 + fname_len @@ -427,9 +389,10 @@ public: + 4 // exec_time + 4 // skip_lines + 4 // field block len - + sizeof(sql_ex) + field_block_len + num_fields*sizeof(uchar) ; + + sizeof(sql_ex) + field_block_len + num_fields; ; } + int get_data_body_offset() { return LOAD_EVENT_OVERHEAD; } #ifdef MYSQL_CLIENT void print(FILE* file, bool short_form = 0, char* last_db = 0); #endif @@ -443,23 +406,25 @@ public: uint32 created; uint16 binlog_version; char server_version[ST_SERVER_VER_LEN]; - - Start_log_event() :Log_event(time(NULL)),binlog_version(BINLOG_VERSION) +#ifndef MYSQL_CLIENT + Start_log_event() :Log_event((THD*)0),binlog_version(BINLOG_VERSION) { created = (uint32) when; memcpy(server_version, ::server_version, ST_SERVER_VER_LEN); } +#endif Start_log_event(const char* buf); - ~Start_log_event() {} Log_event_type get_type_code() { return START_EVENT;} int write_data(IO_CACHE* file); + bool is_valid() { return 1; } int get_data_size() { return START_HEADER_LEN; } #ifndef MYSQL_CLIENT void pack_info(String* packet); + int exec_event(struct st_master_info* mi); #endif #ifdef MYSQL_CLIENT void print(FILE* file, bool short_form = 0, char* last_db = 0); @@ -471,17 +436,21 @@ class Intvar_log_event: public Log_event public: ulonglong val; uchar type; - Intvar_log_event(uchar type_arg, ulonglong val_arg) - :Log_event(time(NULL)),val(val_arg),type(type_arg) +#ifndef MYSQL_CLIENT + Intvar_log_event(THD* thd_arg,uchar type_arg, ulonglong val_arg) + :Log_event(thd_arg),val(val_arg),type(type_arg) {} +#endif Intvar_log_event(const char* buf); ~Intvar_log_event() {} Log_event_type get_type_code() { return INTVAR_EVENT;} const char* get_var_type_name(); int get_data_size() { return sizeof(type) + sizeof(val);} int write_data(IO_CACHE* file); + bool is_valid() { return 1; } #ifndef MYSQL_CLIENT void pack_info(String* packet); + int exec_event(struct st_master_info* mi); #endif #ifdef MYSQL_CLIENT @@ -492,15 +461,21 @@ public: class Stop_log_event: public Log_event { public: - Stop_log_event() :Log_event(time(NULL)) +#ifndef MYSQL_CLIENT + Stop_log_event() :Log_event((THD*)0) {} +#endif Stop_log_event(const char* buf):Log_event(buf) { } ~Stop_log_event() {} Log_event_type get_type_code() { return STOP_EVENT;} + bool is_valid() { return 1; } #ifdef MYSQL_CLIENT void print(FILE* file, bool short_form = 0, char* last_db = 0); +#endif +#ifndef MYSQL_CLIENT + int exec_event(struct st_master_info* mi); #endif }; @@ -511,16 +486,16 @@ public: uchar ident_len; ulonglong pos; bool alloced; - - Rotate_log_event(const char* new_log_ident_arg, uint ident_len_arg = 0, - ulonglong pos_arg = 4) : - Log_event(time(NULL)), +#ifndef MYSQL_CLIENT + Rotate_log_event(THD* thd_arg, const char* new_log_ident_arg, + uint ident_len_arg = 0,ulonglong pos_arg = 4) : + Log_event(thd_arg), new_log_ident(new_log_ident_arg), ident_len(ident_len_arg ? ident_len_arg : (uint) strlen(new_log_ident_arg)), pos(pos_arg), alloced(0) {} - +#endif Rotate_log_event(const char* buf, int event_len); ~Rotate_log_event() { @@ -529,40 +504,136 @@ public: } Log_event_type get_type_code() { return ROTATE_EVENT;} int get_data_size() { return ident_len + ROTATE_HEADER_LEN;} + bool is_valid() { return new_log_ident != 0; } int write_data(IO_CACHE* file); #ifdef MYSQL_CLIENT void print(FILE* file, bool short_form = 0, char* last_db = 0); #endif #ifndef MYSQL_CLIENT void pack_info(String* packet); + int exec_event(struct st_master_info* mi); #endif }; /* the classes below are for the new LOAD DATA INFILE logging */ -class Create_file_log_event: public Log_event +class Create_file_log_event: public Load_log_event { +protected: + // pretend we are Load event, so we can write out just + // our Load part - used on the slave when writing event out to + // SQL_LOAD-*.info file + bool fake_base; public: - char* db; - char* tbl_name; - uint db_len; - uint tbl_name_len; char* block; uint block_len; uint file_id; - -#ifndef MYSQL_CLIENT - Create_file_log_event(THD* thd, TABLE_LIST * table, char* block_arg, - uint block_len_arg); +#ifndef MYSQL_CLIENT + Create_file_log_event(THD* thd, sql_exchange* ex, const char* db_arg, + const char* table_name_arg, + List<Item>& fields_arg, enum enum_duplicates handle_dup, + char* block_arg, uint block_len_arg); #endif Create_file_log_event(const char* buf, int event_len); ~Create_file_log_event() { } - Log_event_type get_type_code() { return CREATE_FILE_EVENT;} - int get_data_size() { return tbl_name_len + block_len + - CREATE_FILE_HEADER_LEN ;} + Log_event_type get_type_code() { return fake_base ? LOAD_EVENT : + CREATE_FILE_EVENT;} + int get_data_size() { return fake_base ? Load_log_event::get_data_size() : + Load_log_event::get_data_size() + + 4 + 1 + block_len;} + int get_data_body_offset() { return fake_base ? LOAD_EVENT_OVERHEAD: + LOAD_EVENT_OVERHEAD + CREATE_FILE_HEADER_LEN; } + bool is_valid() { return block != 0; } + int write_data_header(IO_CACHE* file); + int write_data_body(IO_CACHE* file); + int write_base(IO_CACHE* file); // cut out Create_file extentions and + // write it as Load event - used on the slave + +#ifdef MYSQL_CLIENT + void print(FILE* file, bool short_form = 0, char* last_db = 0); +#endif +#ifndef MYSQL_CLIENT + void pack_info(String* packet); + int exec_event(struct st_master_info* mi); +#endif +}; + +class Append_block_log_event: public Log_event +{ +public: + char* block; + uint block_len; + uint file_id; + +#ifndef MYSQL_CLIENT + Append_block_log_event(THD* thd, char* block_arg, + uint block_len_arg); + int exec_event(struct st_master_info* mi); +#endif + + Append_block_log_event(const char* buf, int event_len); + ~Append_block_log_event() + { + } + Log_event_type get_type_code() { return APPEND_BLOCK_EVENT;} + int get_data_size() { return block_len + APPEND_BLOCK_HEADER_LEN ;} + bool is_valid() { return block != 0; } + int write_data(IO_CACHE* file); + +#ifdef MYSQL_CLIENT + void print(FILE* file, bool short_form = 0, char* last_db = 0); +#endif +#ifndef MYSQL_CLIENT + void pack_info(String* packet); +#endif +}; + +class Delete_file_log_event: public Log_event +{ +public: + uint file_id; + +#ifndef MYSQL_CLIENT + Delete_file_log_event(THD* thd); +#endif + + Delete_file_log_event(const char* buf, int event_len); + ~Delete_file_log_event() + { + } + Log_event_type get_type_code() { return DELETE_FILE_EVENT;} + int get_data_size() { return DELETE_FILE_HEADER_LEN ;} + bool is_valid() { return file_id != 0; } + int write_data(IO_CACHE* file); + +#ifdef MYSQL_CLIENT + void print(FILE* file, bool short_form = 0, char* last_db = 0); +#endif +#ifndef MYSQL_CLIENT + void pack_info(String* packet); + int exec_event(struct st_master_info* mi); +#endif +}; + +class Execute_load_log_event: public Log_event +{ +public: + uint file_id; + +#ifndef MYSQL_CLIENT + Execute_load_log_event(THD* thd); +#endif + + Execute_load_log_event(const char* buf, int event_len); + ~Execute_load_log_event() + { + } + Log_event_type get_type_code() { return EXEC_LOAD_EVENT;} + int get_data_size() { return EXEC_LOAD_HEADER_LEN ;} + bool is_valid() { return file_id != 0; } int write_data(IO_CACHE* file); #ifdef MYSQL_CLIENT @@ -570,6 +641,7 @@ public: #endif #ifndef MYSQL_CLIENT void pack_info(String* packet); + int exec_event(struct st_master_info* mi); #endif }; diff --git a/sql/mf_iocache.cc b/sql/mf_iocache.cc index bf2f2c37409..24af439961e 100644 --- a/sql/mf_iocache.cc +++ b/sql/mf_iocache.cc @@ -57,7 +57,8 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize, /* There is no file in net_reading */ info->file= file; - info->pre_read = info->post_read = 0; + info->pre_close = info->pre_read = info->post_read = 0; + info->arg = 0; if (!cachesize) if (! (cachesize= my_default_record_cache_size)) DBUG_RETURN(1); /* No cache requested */ @@ -681,7 +682,10 @@ int flush_io_cache(IO_CACHE *info) int end_io_cache(IO_CACHE *info) { int error=0; + IO_CACHE_CALLBACK pre_close; DBUG_ENTER("end_io_cache"); + if((pre_close=info->pre_close)) + (*pre_close)(info); if (info->buffer) { if (info->file != -1) /* File doesn't exist */ diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 06f9e88b68b..dad4a1c2427 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -223,7 +223,7 @@ static bool opt_log,opt_update_log,opt_bin_log,opt_slow_log,opt_noacl, opt_ansi_mode=0,opt_myisam_log=0, opt_large_files=sizeof(my_off_t) > 4; bool opt_sql_bin_update = 0, opt_log_slave_updates = 0, opt_safe_show_db=0, - opt_show_slave_auth_info = 0; + opt_show_slave_auth_info = 0, opt_old_rpl_compat = 0; FILE *bootstrap_file=0; int segfaulted = 0; // ensure we do not enter SIGSEGV handler twice extern MASTER_INFO glob_mi; @@ -718,6 +718,7 @@ void clean_up(bool print_message) free_defaults(defaults_argv); my_free(charsets_list, MYF(MY_ALLOW_ZERO_PTR)); my_free(mysql_tmpdir,MYF(0)); + my_free(slave_load_tmpdir,MYF(0)); x_free(opt_bin_logname); bitmap_free(&temp_pool); free_max_user_conn(); @@ -2518,7 +2519,8 @@ enum options { OPT_SKIP_STACK_TRACE, OPT_SKIP_SYMLINK, OPT_REPORT_HOST, OPT_REPORT_USER, OPT_REPORT_PASSWORD, OPT_REPORT_PORT, OPT_MAX_BINLOG_DUMP_EVENTS, OPT_SPORADIC_BINLOG_DUMP_FAIL, - OPT_SHOW_SLAVE_AUTH_INFO}; + OPT_SHOW_SLAVE_AUTH_INFO, OPT_OLD_RPL_COMPAT, + OPT_SLAVE_LOAD_TMPDIR}; static struct option long_options[] = { {"ansi", no_argument, 0, 'a'}, @@ -2611,6 +2613,7 @@ static struct option long_options[] = { OPT_SAFEMALLOC_MEM_LIMIT}, {"new", no_argument, 0, 'n'}, {"old-protocol", no_argument, 0, 'o'}, + {"old-rpl-compat", no_argument, 0, (int)OPT_OLD_RPL_COMPAT}, #ifdef ONE_THREAD {"one-thread", no_argument, 0, (int) OPT_ONE_THREAD}, #endif @@ -2659,6 +2662,7 @@ static struct option long_options[] = { {"skip-stack-trace", no_argument, 0, (int) OPT_SKIP_STACK_TRACE}, {"skip-symlink", no_argument, 0, (int) OPT_SKIP_SYMLINK}, {"skip-thread-priority", no_argument, 0, (int) OPT_SKIP_PRIOR}, + {"slave-load-tmpdir", required_argument, 0, (int) OPT_SLAVE_LOAD_TMPDIR}, {"sql-bin-update-same", no_argument, 0, (int) OPT_SQL_BIN_UPDATE_SAME}, #include "sslopt-longopts.h" #ifdef __WIN__ @@ -3311,6 +3315,12 @@ static void get_options(int argc,char **argv) safemalloc_mem_limit = atoi(optarg); #endif break; + case OPT_SLAVE_LOAD_TMPDIR: + slave_load_tmpdir = my_strdup(optarg, MYF(MY_FAE)); + break; + case OPT_OLD_RPL_COMPAT: + opt_old_rpl_compat = 1; + break; case OPT_SHOW_SLAVE_AUTH_INFO: opt_show_slave_auth_info = 1; break; @@ -4377,6 +4387,14 @@ static void fix_paths(void) mysql_tmpdir=(char*) my_realloc(mysql_tmpdir,(uint) strlen(mysql_tmpdir)+1, MYF(MY_HOLD_ON_ERROR)); } + if (!slave_load_tmpdir) + { + int copy_len; + slave_load_tmpdir = (char*) my_malloc((copy_len=strlen(mysql_tmpdir) + 1) + , MYF(MY_FAE)); + // no need to check return value, if we fail, my_malloc() never returns + memcpy(slave_load_tmpdir, mysql_tmpdir, copy_len); + } } diff --git a/sql/slave.cc b/sql/slave.cc index a00809b6994..e5da79d6871 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -24,10 +24,8 @@ #include <thr_alarm.h> #include <my_dir.h> -#define RPL_LOG_NAME (glob_mi.log_file_name[0] ? glob_mi.log_file_name :\ - "FIRST") - volatile bool slave_running = 0; +char* slave_load_tmpdir = 0; pthread_t slave_real_id; MASTER_INFO glob_mi; HASH replicate_do_table, replicate_ignore_table; @@ -41,16 +39,17 @@ THD* slave_thd = 0; // when slave thread exits, we need to remember the temporary tables so we // can re-use them on slave start -static int last_slave_errno = 0; -static char last_slave_error[1024] = ""; +int last_slave_errno = 0; +char last_slave_error[MAX_SLAVE_ERRMSG] = ""; #ifndef DBUG_OFF int disconnect_slave_event_count = 0, abort_slave_event_count = 0; -static int events_till_disconnect = -1, events_till_abort = -1; +static int events_till_disconnect = -1; +int events_till_abort = -1; static int stuck_count = 0; #endif -inline void skip_load_data_infile(NET* net); +void skip_load_data_infile(NET* net); inline bool slave_killed(THD* thd); static int init_slave_thread(THD* thd); static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi); @@ -59,8 +58,7 @@ static int safe_sleep(THD* thd, int sec); static int request_table_dump(MYSQL* mysql, const char* db, const char* table); static int create_table_from_dump(THD* thd, NET* net, const char* db, const char* table_name); -inline char* rewrite_db(char* db); -static int check_expected_error(THD* thd, int expected_error); +char* rewrite_db(char* db); static void free_table_ent(TABLE_RULE_ENT* e) { @@ -219,7 +217,16 @@ inline bool slave_killed(THD* thd) return abort_slave || abort_loop || thd->killed; } -inline void skip_load_data_infile(NET* net) +void slave_print_error(int err_code, const char* msg, ...) +{ + va_list args; + va_start(args,msg); + my_vsnprintf(last_slave_error, sizeof(last_slave_error), msg, args); + sql_print_error("Slave: %s, error_code=%d", last_slave_error, err_code); + last_slave_errno = err_code; +} + +void skip_load_data_infile(NET* net) { (void)my_net_write(net, "\xfb/dev/null", 10); (void)net_flush(net); @@ -227,7 +234,7 @@ inline void skip_load_data_infile(NET* net) send_ok(net); // the master expects it } -inline char* rewrite_db(char* db) +char* rewrite_db(char* db) { if(replicate_rewrite_db.is_empty() || !db) return db; I_List_iterator<i_string_pair> it(replicate_rewrite_db); @@ -904,7 +911,7 @@ server_errno=%d)", return len - 1; } -static int check_expected_error(THD* thd, int expected_error) +int check_expected_error(THD* thd, int expected_error) { switch(expected_error) { @@ -935,6 +942,7 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len) if (ev) { int type_code = ev->get_type_code(); + int exec_res; if (ev->server_id == ::server_id || slave_skip_counter) { if(type_code == LOAD_EVENT) @@ -952,320 +960,12 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len) thd->set_time(); // time the query if(!thd->log_seq) thd->log_seq = ev->log_seq; - if (!ev->when) ev->when = time(NULL); - - switch(type_code) { - case QUERY_EVENT: - { - Query_log_event* qev = (Query_log_event*)ev; - int q_len = qev->q_len; - int expected_error,actual_error = 0; - init_sql_alloc(&thd->mem_root, 8192,0); - thd->db = rewrite_db((char*)qev->db); - if (db_ok(thd->db, replicate_do_db, replicate_ignore_db)) - { - thd->query = (char*)qev->query; - thd->set_time((time_t)qev->when); - thd->current_tablenr = 0; - VOID(pthread_mutex_lock(&LOCK_thread_count)); - thd->query_id = query_id++; - VOID(pthread_mutex_unlock(&LOCK_thread_count)); - thd->query_error = 0; // clear error - thd->net.last_errno = 0; - thd->net.last_error[0] = 0; - thd->slave_proxy_id = qev->thread_id; // for temp tables - - // sanity check to make sure the master did not get a really bad - // error on the query - if (!check_expected_error(thd, (expected_error = qev->error_code))) - { - mysql_parse(thd, thd->query, q_len); - if (expected_error != - (actual_error = thd->net.last_errno) && expected_error) - { - const char* errmsg = "Slave: did not get the expected error\ - running query from master - expected: '%s'(%d), got '%s'(%d)"; - sql_print_error(errmsg, ER_SAFE(expected_error), - expected_error, - actual_error ? thd->net.last_error:"no error", - actual_error); - thd->query_error = 1; - } - else if (expected_error == actual_error) - { - thd->query_error = 0; - *last_slave_error = 0; - last_slave_errno = 0; - } - } - else - { - // master could be inconsistent, abort and tell DBA to check/fix it - thd->db = thd->query = 0; - thd->convert_set = 0; - close_thread_tables(thd); - free_root(&thd->mem_root,0); - delete ev; - return 1; - } - } - thd->db = 0; // prevent db from being freed - thd->query = 0; // just to be sure - // assume no convert for next query unless set explictly - thd->convert_set = 0; - close_thread_tables(thd); - - if (thd->query_error || thd->fatal_error) - { - sql_print_error("Slave: error running query '%s' ", - qev->query); - last_slave_errno = actual_error ? actual_error : -1; - my_snprintf(last_slave_error, sizeof(last_slave_error), - "error '%s' on query '%s'", - actual_error ? thd->net.last_error : - "unexpected success or fatal error", - qev->query - ); - free_root(&thd->mem_root,0); - delete ev; - return 1; - } - free_root(&thd->mem_root,0); - thd->log_seq = 0; - mi->inc_pos(event_len, ev->log_seq); - delete ev; - flush_master_info(mi); - break; - } - - case SLAVE_EVENT: - { - if(mysql_bin_log.is_open()) - { - Slave_log_event *sev = (Slave_log_event*)ev; - mysql_bin_log.write(sev); - } - - thd->log_seq = 0; - mi->inc_pos(event_len, ev->log_seq); - flush_master_info(mi); - delete ev; - break; - } - - case LOAD_EVENT: - { - Load_log_event* lev = (Load_log_event*)ev; - init_sql_alloc(&thd->mem_root, 8192,0); - thd->db = rewrite_db((char*)lev->db); - thd->query = 0; - thd->query_error = 0; - - if(db_ok(thd->db, replicate_do_db, replicate_ignore_db)) - { - thd->set_time((time_t)lev->when); - thd->current_tablenr = 0; - VOID(pthread_mutex_lock(&LOCK_thread_count)); - thd->query_id = query_id++; - VOID(pthread_mutex_unlock(&LOCK_thread_count)); - - TABLE_LIST tables; - bzero((char*) &tables,sizeof(tables)); - tables.db = thd->db; - tables.name = tables.real_name = (char*)lev->table_name; - tables.lock_type = TL_WRITE; - // the table will be opened in mysql_load - if(table_rules_on && !tables_ok(thd, &tables)) - { - skip_load_data_infile(net); - } - else - { - enum enum_duplicates handle_dup = DUP_IGNORE; - if(lev->sql_ex.opt_flags && REPLACE_FLAG) - handle_dup = DUP_REPLACE; - sql_exchange ex((char*)lev->fname, lev->sql_ex.opt_flags && - DUMPFILE_FLAG ); - String field_term(&lev->sql_ex.field_term, 1), - enclosed(&lev->sql_ex.enclosed, 1), - line_term(&lev->sql_ex.line_term,1), - escaped(&lev->sql_ex.escaped, 1), - line_start(&lev->sql_ex.line_start, 1); - - ex.field_term = &field_term; - if(lev->sql_ex.empty_flags & FIELD_TERM_EMPTY) - ex.field_term->length(0); - - ex.enclosed = &enclosed; - if(lev->sql_ex.empty_flags & ENCLOSED_EMPTY) - ex.enclosed->length(0); - - ex.line_term = &line_term; - if(lev->sql_ex.empty_flags & LINE_TERM_EMPTY) - ex.line_term->length(0); - - ex.line_start = &line_start; - if(lev->sql_ex.empty_flags & LINE_START_EMPTY) - ex.line_start->length(0); - - ex.escaped = &escaped; - if(lev->sql_ex.empty_flags & ESCAPED_EMPTY) - ex.escaped->length(0); - - ex.opt_enclosed = (lev->sql_ex.opt_flags & OPT_ENCLOSED_FLAG); - if(lev->sql_ex.empty_flags & FIELD_TERM_EMPTY) - ex.field_term->length(0); - - ex.skip_lines = lev->skip_lines; - - - List<Item> fields; - lev->set_fields(fields); - thd->slave_proxy_id = thd->thread_id; - thd->net.vio = net->vio; - // mysql_load will use thd->net to read the file - thd->net.pkt_nr = net->pkt_nr; - // make sure the client does not get confused - // about the packet sequence - if(mysql_load(thd, &ex, &tables, fields, handle_dup, 1, - TL_WRITE)) - thd->query_error = 1; - if(thd->cuted_fields) - sql_print_error("Slave: load data infile at position %s in log \ -'%s' produced %d warning(s)", llstr(glob_mi.pos,llbuff), RPL_LOG_NAME, - thd->cuted_fields ); - net->pkt_nr = thd->net.pkt_nr; - } - } - else - { - // we will just ask the master to send us /dev/null if we do not - // want to load the data :-) - skip_load_data_infile(net); - } - - thd->net.vio = 0; - thd->db = 0;// prevent db from being freed - close_thread_tables(thd); - if(thd->query_error) - { - int sql_error = thd->net.last_errno; - if(!sql_error) - sql_error = ER_UNKNOWN_ERROR; - - sql_print_error("Slave: Error '%s' running load data infile ", - ER(sql_error)); - delete ev; - free_root(&thd->mem_root,0); - return 1; - } - - thd->log_seq = 0; - free_root(&thd->mem_root,0); - - if(thd->fatal_error) - { - sql_print_error("Slave: Fatal error running query '%s' ", - thd->query); - delete ev; - return 1; - } - - mi->inc_pos(event_len, ev->log_seq); - delete ev; - flush_master_info(mi); - break; - } - - case START_EVENT: - close_temporary_tables(thd); - mi->inc_pos(event_len, ev->log_seq); - flush_master_info(mi); - delete ev; - thd->log_seq = 0; - break; - - case STOP_EVENT: - if(mi->pos > 4) // stop event should be ignored after rotate event - { - close_temporary_tables(thd); - mi->inc_pos(event_len, ev->log_seq); - flush_master_info(mi); - } - delete ev; - thd->log_seq = 0; - break; - case ROTATE_EVENT: - { - Rotate_log_event* rev = (Rotate_log_event*)ev; - int ident_len = rev->ident_len; - bool rotate_binlog = 0, write_slave_event = 0; - char* log_name = mi->log_file_name; - pthread_mutex_lock(&mi->lock); - - // rotate local binlog only if the name of remote has changed - if (!*log_name || !(log_name[ident_len] == 0 && - !memcmp(log_name, rev->new_log_ident, ident_len))) - { - write_slave_event = (!(rev->flags & LOG_EVENT_FORCED_ROTATE_F) - && mysql_bin_log.is_open()); - rotate_binlog = (*log_name && write_slave_event); - memcpy(log_name, rev->new_log_ident,ident_len ); - log_name[ident_len] = 0; - } - mi->pos = rev->pos; - mi->last_log_seq = ev->log_seq; -#ifndef DBUG_OFF - if (abort_slave_event_count) - ++events_till_abort; -#endif - if (rotate_binlog) - { - mysql_bin_log.new_file(); - mi->last_log_seq = 0; - } - pthread_cond_broadcast(&mi->cond); - pthread_mutex_unlock(&mi->lock); - flush_master_info(mi); - - if (write_slave_event) - { - Slave_log_event s(thd, mi); - if (s.master_host) - { - s.set_log_seq(0, &mysql_bin_log); - s.server_id = ::server_id; - mysql_bin_log.write(&s); - } - } - - delete ev; - thd->log_seq = 0; - break; - } - - case INTVAR_EVENT: - { - Intvar_log_event* iev = (Intvar_log_event*)ev; - switch(iev->type) - { - case LAST_INSERT_ID_EVENT: - thd->last_insert_id_used = 1; - thd->last_insert_id = iev->val; - break; - case INSERT_ID_EVENT: - thd->next_insert_id = iev->val; - break; - - } - mi->inc_pending(event_len); - delete ev; - // do not reset log_seq - break; - } - } + ev->thd = thd; + exec_res = ev->exec_event(mi); + delete ev; + return exec_res; } else { @@ -1275,7 +975,6 @@ This may also be a network problem, or just a bug in the master or slave code.\ "); return 1; } - return 0; } // slave thread @@ -1363,6 +1062,7 @@ pthread_handler_decl(handle_slave,arg __attribute__((unused))) connected: + thd->slave_net = &mysql->net; // register ourselves with the master // if fails, this is not fatal - we just print the error message and go // on with life diff --git a/sql/slave.h b/sql/slave.h index 85db0b75f53..5850b57d3b3 100644 --- a/sql/slave.h +++ b/sql/slave.h @@ -5,7 +5,7 @@ #define SLAVE_NET_TIMEOUT 3600 extern ulong slave_net_timeout; - +extern char* slave_load_tmpdir; typedef struct st_master_info { @@ -70,6 +70,11 @@ typedef struct st_table_rule_ent #define TABLE_RULE_HASH_SIZE 16 #define TABLE_RULE_ARR_SIZE 16 +#define MAX_SLAVE_ERRMSG 1024 + +#define RPL_LOG_NAME (glob_mi.log_file_name[0] ? glob_mi.log_file_name :\ + "FIRST") + int flush_master_info(MASTER_INFO* mi); int register_slave_on_master(MYSQL* mysql); @@ -97,6 +102,10 @@ int add_table_rule(HASH* h, const char* table_spec); int add_wild_table_rule(DYNAMIC_ARRAY* a, const char* table_spec); void init_table_rule_hash(HASH* h, bool* h_inited); void init_table_rule_array(DYNAMIC_ARRAY* a, bool* a_inited); +char* rewrite_db(char* db); +int check_expected_error(THD* thd, int error_code); +void skip_load_data_infile(NET* net); +void slave_print_error(int err_code, const char* msg, ...); void end_slave(); // clean up int init_master_info(MASTER_INFO* mi); @@ -109,6 +118,11 @@ extern uint32 slave_skip_counter; // we want to restart it skipping one or more events in the master log that // have caused errors, and have been manually applied by DBA already +extern int last_slave_errno; +#ifndef DBUG_OFF +extern int events_till_abort; +#endif +extern char last_slave_error[MAX_SLAVE_ERRMSG]; extern pthread_t slave_real_id; extern THD* slave_thd; extern MASTER_INFO glob_mi; diff --git a/sql/sql_class.cc b/sql/sql_class.cc index 992cd30a02c..5e0c02c5d07 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -121,6 +121,7 @@ THD::THD():user_time(0),fatal_error(0),last_insert_id_used(0), proc_info="login"; where="field list"; server_id = ::server_id; + slave_net = 0; server_status=SERVER_STATUS_AUTOCOMMIT; update_lock_default= low_priority_updates ? TL_WRITE_LOW_PRIORITY : TL_WRITE; options=thd_startup_options; diff --git a/sql/sql_class.h b/sql/sql_class.h index 959460a6f4b..b22404b5b17 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -86,9 +86,7 @@ public: bool write(THD *thd, enum enum_server_command command,const char *format,...); bool write(THD *thd, const char *query, uint query_length, time_t query_start=0); - bool write(Query_log_event* event_info); // binary log write - bool write(Load_log_event* event_info); - bool write(Slave_log_event* event_info); + bool write(Log_event* event_info); // binary log write bool write(IO_CACHE *cache); int generate_new_name(char *new_name,const char *old_name); void make_log_name(char* buf, const char* log_ident); @@ -300,6 +298,8 @@ public: ulong slave_proxy_id; // in slave thread we need to know in behalf of which // thread the query is being run to replicate temp tables properly + NET* slave_net; // network connection from slave to master + THD(); ~THD(); bool store_globals(); diff --git a/sql/sql_load.cc b/sql/sql_load.cc index ce8e34b9265..999aec9b15e 100644 --- a/sql/sql_load.cc +++ b/sql/sql_load.cc @@ -20,6 +20,7 @@ #include "mysql_priv.h" #include <my_dir.h> #include <m_ctype.h> +#include "sql_repl.h" class READ_INFO { File file; @@ -32,6 +33,7 @@ class READ_INFO { int field_term_char,line_term_char,enclosed_char,escape_char; int *stack,*stack_pos; bool found_end_of_line,start_of_line,eof; + bool need_end_io_cache; IO_CACHE cache; NET *io_net; @@ -50,6 +52,18 @@ public: char unescape(char chr); int terminator(char *ptr,uint length); bool find_start_of_fields(); + // we need to force cache close before destructor is invoked to log + // the last read block + void end_io_cache() + { + ::end_io_cache(&cache); + need_end_io_cache = 0; + } + + // either this method, or we need to make cache public + // arg must be set from mysql_load() since constructor does not see + // either the table or THD value + void set_io_cache_arg(void* arg) { cache.arg = arg; } }; static int read_fixed_length(THD *thd,COPY_INFO &info,TABLE *table, @@ -67,10 +81,11 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, File file; TABLE *table; int error; - uint save_skip_lines = ex->skip_lines; String *field_term=ex->field_term,*escaped=ex->escaped, *enclosed=ex->enclosed; bool is_fifo=0; + LOAD_FILE_INFO lf_info; + char * db = table_list->db ? table_list->db : thd->db; DBUG_ENTER("mysql_load"); if (escaped->length() > 1 || enclosed->length() > 1) @@ -79,7 +94,6 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, MYF(0)); DBUG_RETURN(-1); } - if (!(table = open_ltable(thd,table_list,lock_type))) DBUG_RETURN(-1); if (!fields.elements) @@ -161,8 +175,9 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, if (!my_stat(name,&stat_info,MYF(MY_WME))) DBUG_RETURN(-1); - // the file must be: - if (!((stat_info.st_mode & S_IROTH) == S_IROTH && // readable by others + // if we are not in slave thread, the file must be: + if (!thd->slave_thread && + !((stat_info.st_mode & S_IROTH) == S_IROTH && // readable by others #ifndef __EMX__ (stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink #endif @@ -195,13 +210,27 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, DBUG_RETURN(-1); // Can't allocate buffers } + if (!opt_old_rpl_compat && mysql_bin_log.is_open()) + { + lf_info.thd = thd; + lf_info.ex = ex; + lf_info.db = db; + lf_info.table_name = table_list->real_name; + lf_info.fields = &fields; + lf_info.handle_dup = handle_duplicates; + lf_info.wrote_create_file = 0; + lf_info.last_pos_in_file = HA_POS_ERROR; + read_info.set_io_cache_arg((void*)&lf_info); + } restore_record(table,2); thd->count_cuted_fields=1; /* calc cuted fields */ thd->cuted_fields=0L; if (ex->line_term->length() && field_term->length()) { - while (ex->skip_lines--) + // ex->skip_lines needs to be preserved for logging + uint skip_lines = ex->skip_lines; + while (skip_lines--) { if (read_info.next_line()) break; @@ -240,7 +269,14 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, table->copy_blobs=0; thd->count_cuted_fields=0; /* Don`t calc cuted fields */ if (error) + { + if (!opt_old_rpl_compat && mysql_bin_log.is_open()) + { + Delete_file_log_event d(thd); + mysql_bin_log.write(&d); + } DBUG_RETURN(-1); // Error on read + } sprintf(name,ER(ER_LOAD_INFO),info.records,info.deleted, info.records-info.copied,thd->cuted_fields); send_ok(&thd->net,info.copied+info.deleted,0L,name); @@ -250,12 +286,20 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list, if (!table->file->has_transactions()) thd->options|=OPTION_STATUS_NO_TRANS_UPDATE; - if (!read_file_from_client && mysql_bin_log.is_open()) + if (mysql_bin_log.is_open()) { - ex->skip_lines = save_skip_lines; - Load_log_event qinfo(thd, ex, table->table_name, fields, + if (opt_old_rpl_compat && !read_file_from_client) + { + Load_log_event qinfo(thd, ex, db, table->table_name, fields, handle_duplicates); - mysql_bin_log.write(&qinfo); + mysql_bin_log.write(&qinfo); + } + if (!opt_old_rpl_compat) + { + read_info.end_io_cache(); // make sure last block gets logged + Execute_load_log_event e(thd); + mysql_bin_log.write(&e); + } } DBUG_RETURN(0); } @@ -480,6 +524,13 @@ READ_INFO::READ_INFO(File file_par, uint tot_length, String &field_term, my_free((gptr) buffer,MYF(0)); /* purecov: inspected */ error=1; } + else + { + need_end_io_cache = 1; + if (!opt_old_rpl_compat && mysql_bin_log.is_open()) + cache.pre_read = cache.pre_close = + (IO_CACHE_CALLBACK)log_loaded_block; + } } } @@ -488,7 +539,8 @@ READ_INFO::~READ_INFO() { if (!error) { - end_io_cache(&cache); + if (need_end_io_cache) + ::end_io_cache(&cache); my_free((gptr) buffer,MYF(0)); error=1; } @@ -798,3 +850,4 @@ bool READ_INFO::find_start_of_fields() } return 0; } + diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 9c1fbbe4ac9..cda1a8531df 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -1578,3 +1578,33 @@ err: return error; } + +int log_loaded_block(IO_CACHE* file) +{ + LOAD_FILE_INFO* lf_info; + uint block_len ; + if (!(block_len = file->rc_end - file->buffer)) + return 0; + lf_info = (LOAD_FILE_INFO*)file->arg; + if (lf_info->last_pos_in_file != HA_POS_ERROR && + lf_info->last_pos_in_file >= file->pos_in_file) + return 0; + lf_info->last_pos_in_file = file->pos_in_file; + if (lf_info->wrote_create_file) + { + Append_block_log_event a(lf_info->thd, file->buffer,block_len); + mysql_bin_log.write(&a); + } + else + { + Create_file_log_event c(lf_info->thd,lf_info->ex,lf_info->db, + lf_info->table_name, *lf_info->fields, + lf_info->handle_dup, file->buffer, + block_len); + mysql_bin_log.write(&c); + lf_info->wrote_create_file = 1; + } + return 0; +} + + diff --git a/sql/sql_repl.h b/sql/sql_repl.h index 7d40f22d8fb..3445cd67b42 100644 --- a/sql/sql_repl.h +++ b/sql/sql_repl.h @@ -12,7 +12,7 @@ typedef struct st_slave_info uint16 port; } SLAVE_INFO; -extern bool opt_show_slave_auth_info; +extern bool opt_show_slave_auth_info, opt_old_rpl_compat; extern HASH slave_list; extern char* master_host; extern my_string opt_bin_logname, master_info_file; @@ -51,4 +51,19 @@ int show_binlogs(THD* thd); extern int init_master_info(MASTER_INFO* mi); void kill_zombie_dump_threads(uint32 slave_server_id); +typedef struct st_load_file_info +{ + THD* thd; + sql_exchange* ex; + List <Item> *fields; + enum enum_duplicates handle_dup; + char* db; + char* table_name; + bool wrote_create_file; + my_off_t last_pos_in_file; +} LOAD_FILE_INFO; + +int log_loaded_block(IO_CACHE* file); + #endif + |