diff options
Diffstat (limited to 'sql/log_event.cc')
-rw-r--r-- | sql/log_event.cc | 523 |
1 files changed, 264 insertions, 259 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc index f56837cb81a..7eb7c57ae40 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -1,15 +1,15 @@ /* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB - + This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. - + This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. - + You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ @@ -24,6 +24,8 @@ #include <my_dir.h> #endif /* MYSQL_CLIENT */ +#include <assert.h> + #ifdef MYSQL_CLIENT static void pretty_print_str(FILE* file, char* str, int len) { @@ -118,14 +120,14 @@ Log_event::Log_event(THD* thd_arg, uint16 flags_arg): if (thd) { server_id = thd->server_id; - log_seq = thd->log_seq; when = thd->start_time; + log_pos = thd->log_pos; } else { server_id = ::server_id; - log_seq = 0; when = time(NULL); + log_pos=0; } } @@ -140,7 +142,7 @@ static void cleanup_load_tmpdir() for (i=0;i<(uint)dirp->number_off_files;i++) { file=dirp->dir_entry+i; - if (!bcmp(file->name,"SQL_LOAD-",9)) + if (!memcmp(file->name,"SQL_LOAD-",9)) my_delete(file->name,MYF(MY_WME)); } @@ -156,12 +158,12 @@ Log_event::Log_event(const char* buf, bool old_format): server_id = uint4korr(buf + SERVER_ID_OFFSET); if (old_format) { - log_seq=0; + log_pos=0; flags=0; } else { - log_seq = uint4korr(buf + LOG_SEQ_OFFSET); + log_pos = uint4korr(buf + LOG_POS_OFFSET); flags = uint2korr(buf + FLAGS_OFFSET); } #ifndef MYSQL_CLIENT @@ -172,13 +174,13 @@ Log_event::Log_event(const char* buf, bool old_format): #ifndef MYSQL_CLIENT -int Log_event::exec_event(struct st_master_info* mi) +int Log_event::exec_event(struct st_relay_log_info* rli) { - if (mi) + if (rli) { - thd->log_seq = 0; - mi->inc_pos(get_event_len(), log_seq); - flush_master_info(mi); + rli->inc_pos(get_event_len(),log_pos); + DBUG_ASSERT(rli->sql_thd != 0); + flush_relay_log_info(rli); } return 0; } @@ -224,7 +226,7 @@ void Load_log_event::pack_info(String* packet) char buf[256]; String tmp(buf, sizeof(buf)); tmp.length(0); - if (db && db_len) + if(db && db_len) { tmp.append("use "); tmp.append(db, db_len); @@ -234,11 +236,11 @@ void Load_log_event::pack_info(String* packet) tmp.append("LOAD DATA INFILE '"); tmp.append(fname, fname_len); tmp.append("' ", 2); - if (sql_ex.opt_flags && REPLACE_FLAG ) + if(sql_ex.opt_flags && REPLACE_FLAG ) tmp.append(" REPLACE "); - else if (sql_ex.opt_flags && IGNORE_FLAG ) + else if(sql_ex.opt_flags && IGNORE_FLAG ) tmp.append(" IGNORE "); - + tmp.append("INTO TABLE "); tmp.append(table_name); if (sql_ex.field_term_len) @@ -254,13 +256,13 @@ void Load_log_event::pack_info(String* packet) tmp.append( " ENCLOSED BY "); pretty_print_str(&tmp, sql_ex.enclosed, sql_ex.enclosed_len); } - + if (sql_ex.escaped_len) { tmp.append( " ESCAPED BY "); pretty_print_str(&tmp, sql_ex.escaped, sql_ex.escaped_len); } - + if (sql_ex.line_term_len) { tmp.append(" LINES TERMINATED BY "); @@ -272,20 +274,21 @@ void Load_log_event::pack_info(String* packet) tmp.append(" LINES STARTING BY "); pretty_print_str(&tmp, sql_ex.line_start, sql_ex.line_start_len); } - + if ((int)skip_lines > 0) tmp.append( " IGNORE %ld LINES ", (long) skip_lines); if (num_fields) { + uint i; const char* field = fields; tmp.append(" ("); - for (uint i = 0; i < num_fields; i++) + for(i = 0; i < num_fields; i++) { - if (i) + if(i) tmp.append(" ,"); tmp.append( field); - + field += field_lens[i] + 1; } tmp.append(')'); @@ -303,7 +306,7 @@ void Rotate_log_event::pack_info(String* packet) tmp.append(new_log_ident, ident_len); tmp.append(";pos="); tmp.append(llstr(pos,buf)); - if (flags & LOG_EVENT_FORCED_ROTATE_F) + if(flags & LOG_EVENT_FORCED_ROTATE_F) tmp.append("; forced by master"); net_store_data(packet, tmp.ptr(), tmp.length()); } @@ -344,7 +347,7 @@ void Log_event::init_show_field_list(List<Item>* field_list) field_list->push_back(new Item_empty_string("Pos", 20)); field_list->push_back(new Item_empty_string("Event_type", 20)); field_list->push_back(new Item_empty_string("Server_id", 20)); - field_list->push_back(new Item_empty_string("Log_seq", 20)); + field_list->push_back(new Item_empty_string("Orig_log_pos", 20)); field_list->push_back(new Item_empty_string("Info", 20)); } @@ -355,14 +358,14 @@ int Log_event::net_send(THD* thd, const char* log_name, my_off_t pos) const char* event_type; if (p) log_name = p + 1; - + packet->length(0); net_store_data(packet, log_name, strlen(log_name)); net_store_data(packet, (longlong) pos); event_type = get_type_str(); net_store_data(packet, event_type, strlen(event_type)); net_store_data(packet, server_id); - net_store_data(packet, log_seq); + net_store_data(packet, log_pos); pack_info(packet); return my_net_write(&thd->net, (char*)packet->ptr(), packet->length()); } @@ -391,7 +394,7 @@ int Log_event::write_header(IO_CACHE* file) long tmp=get_data_size() + LOG_EVENT_HEADER_LEN; int4store(pos, tmp); pos += 4; - int4store(pos, log_seq); + int4store(pos, log_pos); pos += 4; int2store(pos, flags); pos += 2; @@ -413,7 +416,7 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet, // if the read hits eof, we must report it as eof // so the caller will know it can go into cond_wait to be woken up // on the next update to the log - if (!file->error) return LOG_READ_EOF; + if(!file->error) return LOG_READ_EOF; return file->error > 0 ? LOG_READ_TRUNC: LOG_READ_IO; } data_len = uint4korr(buf + EVENT_LEN_OFFSET); @@ -429,7 +432,7 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet, { if (packet->append(file, data_len)) { - if (log_lock) + if(log_lock) pthread_mutex_unlock(log_lock); // here we should never hit eof in a non-error condtion // eof means we are reading the event partially, which should @@ -444,18 +447,17 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet, #endif // MYSQL_CLIENT #ifndef MYSQL_CLIENT -#define UNLOCK_MUTEX if (log_lock) pthread_mutex_unlock(log_lock); +#define UNLOCK_MUTEX if(log_lock) pthread_mutex_unlock(log_lock); #else #define UNLOCK_MUTEX #endif #ifndef MYSQL_CLIENT -#define LOCK_MUTEX if (log_lock) pthread_mutex_lock(log_lock); +#define LOCK_MUTEX if(log_lock) pthread_mutex_lock(log_lock); #else #define LOCK_MUTEX #endif - // allocates memory - the caller is responsible for clean-up #ifndef MYSQL_CLIENT Log_event* Log_event::read_log_event(IO_CACHE* file, @@ -512,7 +514,8 @@ err: UNLOCK_MUTEX; if (error) { - sql_print_error(error); + sql_print_error("Error in Log_event::read_log_event(): '%s', \ +data_len=%d,event_type=%d",error,data_len,head[EVENT_TYPE_OFFSET]); my_free(buf, MYF(MY_ALLOW_ZERO_PTR)); } return res; @@ -524,9 +527,9 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len, if (event_len < EVENT_LEN_OFFSET || (uint)event_len != uint4korr(buf+EVENT_LEN_OFFSET)) return NULL; // general sanity check - will fail on a partial read - + Log_event* ev = NULL; - + switch(buf[EVENT_TYPE_OFFSET]) { case QUERY_EVENT: @@ -580,9 +583,11 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len, #ifdef MYSQL_CLIENT void Log_event::print_header(FILE* file) { + char llbuff[22]; fputc('#', file); print_timestamp(file); - fprintf(file, " server id %d ", server_id); + fprintf(file, " server id %d log_pos %s ", server_id, + llstr(log_pos,llbuff)); } void Log_event::print_timestamp(FILE* file, time_t* ts) @@ -678,7 +683,7 @@ Rotate_log_event::Rotate_log_event(const char* buf, int event_len, // EVENT_LEN_OFFSET int header_size = (old_format) ? OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN; uint ident_offset; - if (event_len < header_size) + if(event_len < header_size) return; buf += header_size; if (old_format) @@ -774,12 +779,12 @@ void Query_log_event::print(FILE* file, bool short_form, char* last_db) bool same_db = 0; - if (db && last_db) + if(db && last_db) { - if (!(same_db = !memcmp(last_db, db, db_len + 1))) + if(!(same_db = !memcmp(last_db, db, db_len + 1))) memcpy(last_db, db, db_len + 1); } - + if (db && db[0] && !same_db) fprintf(file, "use %s;\n", db); end=int10_to_str((long) when, strmov(buff,"SET TIMESTAMP="),10); @@ -795,7 +800,7 @@ void Query_log_event::print(FILE* file, bool short_form, char* last_db) int Query_log_event::write_data(IO_CACHE* file) { if (!query) return -1; - + char buf[QUERY_HEADER_LEN]; int4store(buf + Q_THREAD_ID_OFFSET, thread_id); int4store(buf + Q_EXEC_TIME_OFFSET, exec_time); @@ -837,7 +842,7 @@ int Intvar_log_event::write_data(IO_CACHE* file) void Intvar_log_event::print(FILE* file, bool short_form, char* last_db) { char llbuff[22]; - if (!short_form) + if(!short_form) { print_header(file); fprintf(file, "\tIntvar\n"); @@ -855,7 +860,7 @@ void Intvar_log_event::print(FILE* file, bool short_form, char* last_db) } fprintf(file, "%s;\n", llstr(val,llbuff)); fflush(file); - + } #endif @@ -970,74 +975,76 @@ char* sql_ex_info::init(char* buf,char* buf_end,bool use_new_format) #ifndef MYSQL_CLIENT Load_log_event::Load_log_event(THD* thd, sql_exchange* ex, const char* db_arg, const char* table_name_arg, - List<Item>& fields_arg, - enum enum_duplicates handle_dup): - Log_event(thd), fields(0), field_lens(0), - table_name(table_name_arg), db(db_arg), fname(ex->file_name), - thread_id(thd->thread_id), num_fields(0), field_block_len(0) -{ - time_t end_time; - time(&end_time); - exec_time = (ulong) (end_time - thd->start_time); - db_len = (db) ? (uint32) strlen(db) : 0; - table_name_len = (table_name) ? (uint32) strlen(table_name) : 0; - fname_len = (fname) ? (uint) strlen(fname) : 0; - sql_ex.field_term = (char*) ex->field_term->ptr(); - sql_ex.field_term_len = (uint8) ex->field_term->length(); - sql_ex.enclosed = (char*) ex->enclosed->ptr(); - sql_ex.enclosed_len = (uint8) ex->enclosed->length(); - sql_ex.line_term = (char*) ex->line_term->ptr(); - sql_ex.line_term_len = (uint8) ex->line_term->length(); - sql_ex.line_start = (char*) ex->line_start->ptr(); - sql_ex.line_start_len = (uint8) ex->line_start->length(); - sql_ex.escaped = (char*) ex->escaped->ptr(); - sql_ex.escaped_len = (uint8) ex->escaped->length(); - sql_ex.opt_flags = 0; - sql_ex.cached_new_format = -1; - - if (ex->dumpfile) - sql_ex.opt_flags |= DUMPFILE_FLAG; - if (ex->opt_enclosed) - sql_ex.opt_flags |= OPT_ENCLOSED_FLAG; - - sql_ex.empty_flags = 0; - - switch(handle_dup) { - case DUP_IGNORE: sql_ex.opt_flags |= IGNORE_FLAG; break; - case DUP_REPLACE: sql_ex.opt_flags |= REPLACE_FLAG; break; - case DUP_ERROR: break; - } - - - if (!ex->field_term->length()) - sql_ex.empty_flags |= FIELD_TERM_EMPTY; - if (!ex->enclosed->length()) - sql_ex.empty_flags |= ENCLOSED_EMPTY; - if (!ex->line_term->length()) - sql_ex.empty_flags |= LINE_TERM_EMPTY; - if (!ex->line_start->length()) - sql_ex.empty_flags |= LINE_START_EMPTY; - if (!ex->escaped->length()) - sql_ex.empty_flags |= ESCAPED_EMPTY; - - skip_lines = ex->skip_lines; - - List_iterator<Item> li(fields_arg); - field_lens_buf.length(0); - fields_buf.length(0); - Item* item; - while((item = li++)) - { - num_fields++; - uchar len = (uchar) strlen(item->name); - field_block_len += len + 1; - fields_buf.append(item->name, len + 1); - field_lens_buf.append((char*)&len, 1); - } - - field_lens = (const uchar*)field_lens_buf.ptr(); - fields = fields_buf.ptr(); -} + List<Item>& fields_arg, enum enum_duplicates handle_dup): + Log_event(thd),thread_id(thd->thread_id), + num_fields(0),fields(0),field_lens(0),field_block_len(0), + table_name(table_name_arg), + db(db_arg), + fname(ex->file_name) + { + time_t end_time; + time(&end_time); + exec_time = (ulong) (end_time - thd->start_time); + db_len = (db) ? (uint32) strlen(db) : 0; + table_name_len = (table_name) ? (uint32) strlen(table_name) : 0; + fname_len = (fname) ? (uint) strlen(fname) : 0; + sql_ex.field_term = (char*) ex->field_term->ptr(); + sql_ex.field_term_len = (uint8) ex->field_term->length(); + sql_ex.enclosed = (char*) ex->enclosed->ptr(); + sql_ex.enclosed_len = (uint8) ex->enclosed->length(); + sql_ex.line_term = (char*) ex->line_term->ptr(); + sql_ex.line_term_len = (uint8) ex->line_term->length(); + sql_ex.line_start = (char*) ex->line_start->ptr(); + sql_ex.line_start_len = (uint8) ex->line_start->length(); + sql_ex.escaped = (char*) ex->escaped->ptr(); + sql_ex.escaped_len = (uint8) ex->escaped->length(); + sql_ex.opt_flags = 0; + sql_ex.cached_new_format = -1; + + if(ex->dumpfile) + sql_ex.opt_flags |= DUMPFILE_FLAG; + if(ex->opt_enclosed) + sql_ex.opt_flags |= OPT_ENCLOSED_FLAG; + + sql_ex.empty_flags = 0; + + switch(handle_dup) + { + case DUP_IGNORE: sql_ex.opt_flags |= IGNORE_FLAG; break; + case DUP_REPLACE: sql_ex.opt_flags |= REPLACE_FLAG; break; + case DUP_ERROR: break; + } + + + if(!ex->field_term->length()) + sql_ex.empty_flags |= FIELD_TERM_EMPTY; + if(!ex->enclosed->length()) + sql_ex.empty_flags |= ENCLOSED_EMPTY; + if(!ex->line_term->length()) + sql_ex.empty_flags |= LINE_TERM_EMPTY; + if(!ex->line_start->length()) + sql_ex.empty_flags |= LINE_START_EMPTY; + if(!ex->escaped->length()) + sql_ex.empty_flags |= ESCAPED_EMPTY; + + skip_lines = ex->skip_lines; + + List_iterator<Item> li(fields_arg); + field_lens_buf.length(0); + fields_buf.length(0); + Item* item; + while((item = li++)) + { + num_fields++; + uchar len = (uchar) strlen(item->name); + field_block_len += len + 1; + fields_buf.append(item->name, len + 1); + field_lens_buf.append((char*)&len, 1); + } + + field_lens = (const uchar*)field_lens_buf.ptr(); + fields = fields_buf.ptr(); + } #endif @@ -1045,8 +1052,9 @@ Load_log_event::Load_log_event(THD* thd, sql_exchange* ex, // constructed event Load_log_event::Load_log_event(const char* buf, int event_len, bool old_format): - Log_event(buf, old_format),fields(0), - field_lens(0), num_fields(0), field_block_len(0) + Log_event(buf, old_format),num_fields(0),fields(0), + field_lens(0),field_block_len(0), + table_name(0),db(0),fname(0) { if (!event_len) // derived class, will call copy_log_event() itself return; @@ -1066,7 +1074,7 @@ int Load_log_event::copy_log_event(const char *buf, ulong event_len, table_name_len = (uint)data_head[L_TBL_LEN_OFFSET]; db_len = (uint)data_head[L_DB_LEN_OFFSET]; num_fields = uint4korr(data_head + L_NUM_FIELDS_OFFSET); - + int body_offset = get_data_body_offset(); if ((int) event_len < body_offset) return 1; @@ -1076,7 +1084,7 @@ int Load_log_event::copy_log_event(const char *buf, ulong event_len, buf_end, buf[EVENT_TYPE_OFFSET] != LOAD_EVENT))) return 1; - + data_len = event_len - body_offset; if (num_fields > data_len) // simple sanity check against corruption return 1; @@ -1108,43 +1116,43 @@ void Load_log_event::print(FILE* file, bool short_form, char* last_db) bool same_db = 0; - if (db && last_db) + if(db && last_db) { - if (!(same_db = !memcmp(last_db, db, db_len + 1))) + if(!(same_db = !memcmp(last_db, db, db_len + 1))) memcpy(last_db, db, db_len + 1); } - - if (db && db[0] && !same_db) + + if(db && db[0] && !same_db) fprintf(file, "use %s;\n", db); fprintf(file, "LOAD DATA INFILE '%-*s' ", fname_len, fname); - if (sql_ex.opt_flags && REPLACE_FLAG ) + if(sql_ex.opt_flags && REPLACE_FLAG ) fprintf(file," REPLACE "); - else if (sql_ex.opt_flags && IGNORE_FLAG ) + else if(sql_ex.opt_flags && IGNORE_FLAG ) fprintf(file," IGNORE "); - + fprintf(file, "INTO TABLE %s ", table_name); - if (sql_ex.field_term) + if(sql_ex.field_term) { fprintf(file, " FIELDS TERMINATED BY "); pretty_print_str(file, sql_ex.field_term, sql_ex.field_term_len); } - if (sql_ex.enclosed) + if(sql_ex.enclosed) { - if (sql_ex.opt_flags && OPT_ENCLOSED_FLAG ) + if(sql_ex.opt_flags && OPT_ENCLOSED_FLAG ) fprintf(file," OPTIONALLY "); fprintf(file, " ENCLOSED BY "); pretty_print_str(file, sql_ex.enclosed, sql_ex.enclosed_len); } - + if (sql_ex.escaped) { fprintf(file, " ESCAPED BY "); pretty_print_str(file, sql_ex.escaped, sql_ex.escaped_len); } - + if (sql_ex.line_term) { fprintf(file," LINES TERMINATED BY "); @@ -1156,8 +1164,8 @@ void Load_log_event::print(FILE* file, bool short_form, char* last_db) fprintf(file," LINES STARTING BY "); pretty_print_str(file, sql_ex.line_start, sql_ex.line_start_len); } - - if ((int)skip_lines > 0) + + if((int)skip_lines > 0) fprintf(file, " IGNORE %ld LINES ", (long) skip_lines); if (num_fields) @@ -1165,12 +1173,12 @@ void Load_log_event::print(FILE* file, bool short_form, char* last_db) uint i; const char* field = fields; fprintf( file, " ("); - for (i = 0; i < num_fields; i++) + for(i = 0; i < num_fields; i++) { - if (i) + if(i) fputc(',', file); fprintf(file, field); - + field += field_lens[i] + 1; } fputc(')', file); @@ -1183,46 +1191,53 @@ void Load_log_event::print(FILE* file, bool short_form, char* last_db) #ifndef MYSQL_CLIENT -void Log_event::set_log_seq(THD* thd, MYSQL_LOG* log) +void Log_event::set_log_pos(MYSQL_LOG* log) { - log_seq = (thd && thd->log_seq) ? thd->log_seq++ : log->log_seq++; + if (!log_pos) + log_pos = my_b_tell(&log->log_file); } - void Load_log_event::set_fields(List<Item> &fields) { uint i; const char* field = this->fields; - for (i = 0; i < num_fields; i++) - { - fields.push_back(new Item_field(db, table_name, field)); - field += field_lens[i] + 1; - } - + for(i = 0; i < num_fields; i++) + { + fields.push_back(new Item_field(db, table_name, field)); + field += field_lens[i] + 1; + } + } -Slave_log_event::Slave_log_event(THD* thd_arg,struct st_master_info* mi): +Slave_log_event::Slave_log_event(THD* thd_arg, + struct st_relay_log_info* rli): Log_event(thd_arg),mem_pool(0),master_host(0) { - if (!mi->inited) + if(!rli->inited) return; - pthread_mutex_lock(&mi->lock); + + MASTER_INFO* mi = rli->mi; + // TODO: re-write this better without holding both + // locks at the same time + pthread_mutex_lock(&mi->data_lock); + pthread_mutex_lock(&rli->data_lock); master_host_len = strlen(mi->host); - master_log_len = strlen(mi->log_file_name); + master_log_len = strlen(rli->master_log_name); // on OOM, just do not initialize the structure and print the error - if ((mem_pool = (char*)my_malloc(get_data_size() + 1, + if((mem_pool = (char*)my_malloc(get_data_size() + 1, MYF(MY_WME)))) { master_host = mem_pool + SL_MASTER_HOST_OFFSET ; memcpy(master_host, mi->host, master_host_len + 1); master_log = master_host + master_host_len + 1; - memcpy(master_log, mi->log_file_name, master_log_len + 1); + memcpy(master_log, rli->master_log_name, master_log_len + 1); master_port = mi->port; - master_pos = mi->pos; + master_pos = rli->master_log_pos; } else sql_print_error("Out of memory while recording slave event"); - pthread_mutex_unlock(&mi->lock); + pthread_mutex_unlock(&rli->data_lock); + pthread_mutex_unlock(&mi->data_lock); } @@ -1239,7 +1254,7 @@ Slave_log_event::~Slave_log_event() void Slave_log_event::print(FILE* file, bool short_form, char* last_db) { char llbuff[22]; - if (short_form) + if(short_form) return; print_header(file); fputc('\n', file); @@ -1271,7 +1286,7 @@ void Slave_log_event::init_from_mem_pool(int data_size) master_host_len = strlen(master_host); // safety master_log = master_host + master_host_len + 1; - if (master_log > mem_pool + data_size) + if(master_log > mem_pool + data_size) { master_host = 0; return; @@ -1283,9 +1298,9 @@ Slave_log_event::Slave_log_event(const char* buf, int event_len): Log_event(buf,0),mem_pool(0),master_host(0) { event_len -= LOG_EVENT_HEADER_LEN; - if (event_len < 0) + if(event_len < 0) return; - if (!(mem_pool = (char*)my_malloc(event_len + 1, MYF(MY_WME)))) + if(!(mem_pool = (char*)my_malloc(event_len + 1, MYF(MY_WME)))) return; memcpy(mem_pool, buf + LOG_EVENT_HEADER_LEN, event_len); mem_pool[event_len] = 0; @@ -1388,11 +1403,11 @@ Append_block_log_event::Append_block_log_event(THD* thd_arg, char* block_arg, { } #endif - + Append_block_log_event::Append_block_log_event(const char* buf, int len): Log_event(buf, 0),block(0) { - if ((uint)len < APPEND_BLOCK_EVENT_OVERHEAD) + if((uint)len < APPEND_BLOCK_EVENT_OVERHEAD) return; file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + AB_FILE_ID_OFFSET); block = (char*)buf + APPEND_BLOCK_EVENT_OVERHEAD; @@ -1444,7 +1459,7 @@ Delete_file_log_event::Delete_file_log_event(THD* thd_arg): Delete_file_log_event::Delete_file_log_event(const char* buf, int len): Log_event(buf, 0),file_id(0) { - if ((uint)len < DELETE_FILE_EVENT_OVERHEAD) + if((uint)len < DELETE_FILE_EVENT_OVERHEAD) return; file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + AB_FILE_ID_OFFSET); } @@ -1487,11 +1502,11 @@ Execute_load_log_event::Execute_load_log_event(THD* thd_arg): { } #endif - + Execute_load_log_event::Execute_load_log_event(const char* buf,int len): Log_event(buf, 0),file_id(0) { - if ((uint)len < EXEC_LOAD_EVENT_OVERHEAD) + if((uint)len < EXEC_LOAD_EVENT_OVERHEAD) return; file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + EL_FILE_ID_OFFSET); } @@ -1529,18 +1544,11 @@ void Execute_load_log_event::pack_info(String* packet) #endif #ifndef MYSQL_CLIENT - -int ignored_error_code(int err_code) -{ - return use_slave_mask && bitmap_is_set(&slave_error_mask, err_code); -} - -int Query_log_event::exec_event(struct st_master_info* mi) +int Query_log_event::exec_event(struct st_relay_log_info* rli) { int expected_error,actual_error = 0; init_sql_alloc(&thd->mem_root, 8192,0); - thd->db= rewrite_db((char*)db); - thd->db_length=strlen(thd->db); + thd->db = rewrite_db((char*)db); if (db_ok(thd->db, replicate_do_db, replicate_ignore_db)) { thd->query = (char*)query; @@ -1553,18 +1561,14 @@ int Query_log_event::exec_event(struct st_master_info* mi) thd->net.last_errno = 0; thd->net.last_error[0] = 0; thd->slave_proxy_id = thread_id; // for temp tables - - /* - sanity check to make sure the master did not get a really bad - error on the query - */ - if (ignored_error_code((expected_error=error_code)) || - !check_expected_error(thd, expected_error)) + + // sanity check to make sure the master did not get a really bad + // error on the query + if (!check_expected_error(thd,rli,(expected_error = error_code))) { mysql_parse(thd, thd->query, q_len); if (expected_error != - (actual_error = thd->net.last_errno) && expected_error && - !ignored_error_code(actual_error)) + (actual_error = thd->net.last_errno) && expected_error) { const char* errmsg = "Slave: did not get the expected error\ running query from master - expected: '%s' (%d), got '%s' (%d)"; @@ -1574,52 +1578,49 @@ int Query_log_event::exec_event(struct st_master_info* mi) actual_error); thd->query_error = 1; } - else if (expected_error == actual_error || - ignored_error_code(actual_error)) + else if (expected_error == actual_error) { thd->query_error = 0; - *last_slave_error = 0; - last_slave_errno = 0; + *rli->last_slave_error = 0; + rli->last_slave_errno = 0; } } else { // master could be inconsistent, abort and tell DBA to check/fix it - thd->db= thd->query= 0; - thd->db_length=0; + thd->db = thd->query = 0; thd->convert_set = 0; close_thread_tables(thd); free_root(&thd->mem_root,0); return 1; } } - thd->db= 0; // prevent db from being freed - thd->db_length=0; + thd->db = 0; // prevent db from being freed thd->query = 0; // just to be sure // assume no convert for next query unless set explictly thd->convert_set = 0; close_thread_tables(thd); - + if (thd->query_error || thd->fatal_error) { - slave_print_error(actual_error, "error '%s' on query '%s'", + slave_print_error(rli,actual_error, "error '%s' on query '%s'", actual_error ? thd->net.last_error : "unexpected success or fatal error", query); free_root(&thd->mem_root,0); return 1; } free_root(&thd->mem_root,0); - return Log_event::exec_event(mi); + return Log_event::exec_event(rli); } -int Load_log_event::exec_event(NET* net, struct st_master_info* mi) +int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli) { init_sql_alloc(&thd->mem_root, 8192,0); - thd->db= rewrite_db((char*)db); + thd->db = rewrite_db((char*)db); thd->query = 0; thd->query_error = 0; - - if (db_ok(thd->db, replicate_do_db, replicate_ignore_db)) + + if(db_ok(thd->db, replicate_do_db, replicate_ignore_db)) { thd->set_time((time_t)when); thd->current_tablenr = 0; @@ -1633,8 +1634,9 @@ int Load_log_event::exec_event(NET* net, struct st_master_info* mi) tables.name = tables.real_name = (char*)table_name; tables.lock_type = TL_WRITE; // the table will be opened in mysql_load - if (table_rules_on && !tables_ok(thd, &tables)) + if(table_rules_on && !tables_ok(thd, &tables)) { + // TODO: this is a bug - this needs to be moved to the I/O thread if (net) skip_load_data_infile(net); } @@ -1651,7 +1653,7 @@ int Load_log_event::exec_event(NET* net, struct st_master_info* mi) String line_term(sql_ex.line_term,sql_ex.line_term_len); String line_start(sql_ex.line_start,sql_ex.line_start_len); String escaped(sql_ex.escaped,sql_ex.escaped_len); - + ex.opt_enclosed = (sql_ex.opt_flags & OPT_ENCLOSED_FLAG); if (sql_ex.empty_flags & FIELD_TERM_EMPTY) ex.field_term->length(0); @@ -1668,14 +1670,14 @@ int Load_log_event::exec_event(NET* net, struct st_master_info* mi) // about the packet sequence thd->net.pkt_nr = net->pkt_nr; } - if (mysql_load(thd, &ex, &tables, fields, handle_dup, net != 0, + if(mysql_load(thd, &ex, &tables, fields, handle_dup, net != 0, TL_WRITE)) thd->query_error = 1; - if (thd->cuted_fields) + if(thd->cuted_fields) sql_print_error("Slave: load data infile at position %s in log \ -'%s' produced %d warning(s)", llstr(mi->pos,llbuff), RPL_LOG_NAME, +'%s' produced %d warning(s)", llstr(rli->master_log_pos,llbuff), RPL_LOG_NAME, thd->cuted_fields ); - if (net) + if(net) net->pkt_nr = thd->net.pkt_nr; } } @@ -1683,65 +1685,71 @@ int Load_log_event::exec_event(NET* net, struct st_master_info* mi) { // we will just ask the master to send us /dev/null if we do not // want to load the data + // TODO: this a bug - needs to be done in I/O thread if (net) skip_load_data_infile(net); } - + thd->net.vio = 0; - thd->db= 0;// prevent db from being freed - thd->db_length=0; + thd->db = 0;// prevent db from being freed close_thread_tables(thd); - if (thd->query_error) + if(thd->query_error) { int sql_error = thd->net.last_errno; if (!sql_error) sql_error = ER_UNKNOWN_ERROR; - - slave_print_error(sql_error, "Slave: Error '%s' running load data infile ", + + slave_print_error(rli,sql_error, + "Slave: Error '%s' running load data infile ", ER_SAFE(sql_error)); free_root(&thd->mem_root,0); return 1; } free_root(&thd->mem_root,0); - - if (thd->fatal_error) + + if(thd->fatal_error) { sql_print_error("Slave: Fatal error running LOAD DATA INFILE "); return 1; } - return Log_event::exec_event(mi); + return Log_event::exec_event(rli); } -int Start_log_event::exec_event(struct st_master_info* mi) +int Start_log_event::exec_event(struct st_relay_log_info* rli) { - if (!mi->old_format) + if (!rli->mi->old_format) { close_temporary_tables(thd); cleanup_load_tmpdir(); } - return Log_event::exec_event(mi); + return Log_event::exec_event(rli); } -int Stop_log_event::exec_event(struct st_master_info* mi) +int Stop_log_event::exec_event(struct st_relay_log_info* rli) { - if (mi->pos > 4) // stop event should be ignored after rotate event + // do not clean up immediately after rotate event + if (rli->master_log_pos > 4) { close_temporary_tables(thd); cleanup_load_tmpdir(); - mi->inc_pos(get_event_len(), log_seq); - flush_master_info(mi); } - thd->log_seq = 0; + // we do not want to update master_log pos because we get a rotate event + // before stop, so by now master_log_name is set to the next log + // if we updated it, we will have incorrect master coordinates and this + // could give false triggers in MASTER_POS_WAIT() that we have reached + // the targed position when in fact we have not + rli->inc_pos(get_event_len(), 0); + flush_relay_log_info(rli); return 0; } -int Rotate_log_event::exec_event(struct st_master_info* mi) +int Rotate_log_event::exec_event(struct st_relay_log_info* rli) { bool rotate_binlog = 0, write_slave_event = 0; - char* log_name = mi->log_file_name; - pthread_mutex_lock(&mi->lock); - + char* log_name = rli->master_log_name; + pthread_mutex_lock(&rli->data_lock); + // TODO: probably needs re-write // rotate local binlog only if the name of remote has changed if (!*log_name || !(log_name[ident_len] == 0 && !memcmp(log_name, new_log_ident, ident_len))) @@ -1749,41 +1757,38 @@ int Rotate_log_event::exec_event(struct st_master_info* mi) write_slave_event = (!(flags & LOG_EVENT_FORCED_ROTATE_F) && mysql_bin_log.is_open()); rotate_binlog = (*log_name && write_slave_event); - memcpy(log_name, new_log_ident,ident_len ); + if (ident_len >= sizeof(rli->master_log_name)) + return 1; + memcpy(log_name, new_log_ident,ident_len); log_name[ident_len] = 0; } - mi->pos = pos; - mi->last_log_seq = log_seq; -#ifndef DBUG_OFF - if (abort_slave_event_count) - ++events_till_abort; -#endif + rli->master_log_pos = pos; + rli->relay_log_pos += get_event_len(); if (rotate_binlog) { mysql_bin_log.new_file(); - mi->last_log_seq = 0; + rli->master_log_pos = 4; } - pthread_cond_broadcast(&mi->cond); - pthread_mutex_unlock(&mi->lock); - flush_master_info(mi); - + pthread_cond_broadcast(&rli->data_cond); + pthread_mutex_unlock(&rli->data_lock); + flush_relay_log_info(rli); + if (write_slave_event) { - Slave_log_event s(thd, mi); + Slave_log_event s(thd, rli); if (s.master_host) { - s.set_log_seq(0, &mysql_bin_log); + s.set_log_pos(&mysql_bin_log); s.server_id = ::server_id; mysql_bin_log.write(&s); } } - thd->log_seq = 0; return 0; } -int Intvar_log_event::exec_event(struct st_master_info* mi) +int Intvar_log_event::exec_event(struct st_relay_log_info* rli) { - switch(type) + switch (type) { case LAST_INSERT_ID_EVENT: thd->last_insert_id_used = 1; @@ -1793,18 +1798,18 @@ int Intvar_log_event::exec_event(struct st_master_info* mi) thd->next_insert_id = val; break; } - mi->inc_pending(get_event_len()); + rli->inc_pending(get_event_len()); return 0; } -int Slave_log_event::exec_event(struct st_master_info* mi) +int Slave_log_event::exec_event(struct st_relay_log_info* rli) { - if (mysql_bin_log.is_open()) + if(mysql_bin_log.is_open()) mysql_bin_log.write(this); - return Log_event::exec_event(mi); + return Log_event::exec_event(rli); } -int Create_file_log_event::exec_event(struct st_master_info* mi) +int Create_file_log_event::exec_event(struct st_relay_log_info* rli) { char fname_buf[FN_REFLEN+10]; char *p; @@ -1820,10 +1825,10 @@ int Create_file_log_event::exec_event(struct st_master_info* mi) init_io_cache(&file, fd, IO_SIZE, WRITE_CACHE, (my_off_t)0, 0, MYF(MY_WME|MY_NABP))) { - slave_print_error(my_errno, "Could not open file '%s'", fname_buf); + slave_print_error(rli,my_errno, "Could not open file '%s'", fname_buf); goto err; } - + // a trick to avoid allocating another buffer strmov(p, ".data"); fname = fname_buf; @@ -1831,22 +1836,22 @@ int Create_file_log_event::exec_event(struct st_master_info* mi) if (write_base(&file)) { strmov(p, ".info"); // to have it right in the error message - slave_print_error(my_errno, "Could not write to file '%s'", fname_buf); + slave_print_error(rli,my_errno, "Could not write to file '%s'", fname_buf); goto err; } end_io_cache(&file); my_close(fd, MYF(0)); - + // fname_buf now already has .data, not .info, because we did our trick if ((fd = my_open(fname_buf, O_WRONLY|O_CREAT|O_BINARY|O_TRUNC, MYF(MY_WME))) < 0) { - slave_print_error(my_errno, "Could not open file '%s'", fname_buf); + slave_print_error(rli,my_errno, "Could not open file '%s'", fname_buf); goto err; } if (my_write(fd, (byte*) block, block_len, MYF(MY_WME+MY_NABP))) { - slave_print_error(my_errno, "Write to '%s' failed", fname_buf); + slave_print_error(rli,my_errno, "Write to '%s' failed", fname_buf); goto err; } if (mysql_bin_log.is_open()) @@ -1857,10 +1862,10 @@ err: end_io_cache(&file); if (fd >= 0) my_close(fd, MYF(0)); - return error ? 1 : Log_event::exec_event(mi); + return error ? 1 : Log_event::exec_event(rli); } -int Delete_file_log_event::exec_event(struct st_master_info* mi) +int Delete_file_log_event::exec_event(struct st_relay_log_info* rli) { char fname[FN_REFLEN+10]; char* p; @@ -1871,10 +1876,10 @@ int Delete_file_log_event::exec_event(struct st_master_info* mi) (void)my_delete(fname, MYF(MY_WME)); if (mysql_bin_log.is_open()) mysql_bin_log.write(this); - return Log_event::exec_event(mi); + return Log_event::exec_event(rli); } -int Append_block_log_event::exec_event(struct st_master_info* mi) +int Append_block_log_event::exec_event(struct st_relay_log_info* rli) { char fname[FN_REFLEN+10]; char* p; @@ -1884,12 +1889,12 @@ int Append_block_log_event::exec_event(struct st_master_info* mi) memcpy(p, ".data", 6); if ((fd = my_open(fname, O_WRONLY|O_APPEND|O_BINARY, MYF(MY_WME))) < 0) { - slave_print_error(my_errno, "Could not open file '%s'", fname); + slave_print_error(rli,my_errno, "Could not open file '%s'", fname); goto err; } if (my_write(fd, (byte*) block, block_len, MYF(MY_WME+MY_NABP))) { - slave_print_error(my_errno, "Write to '%s' failed", fname); + slave_print_error(rli,my_errno, "Write to '%s' failed", fname); goto err; } if (mysql_bin_log.is_open()) @@ -1898,10 +1903,10 @@ int Append_block_log_event::exec_event(struct st_master_info* mi) err: if (fd >= 0) my_close(fd, MYF(0)); - return error ? error : Log_event::exec_event(mi); + return error ? error : Log_event::exec_event(rli); } -int Execute_load_log_event::exec_event(struct st_master_info* mi) +int Execute_load_log_event::exec_event(struct st_relay_log_info* rli) { char fname[FN_REFLEN+10]; char* p; @@ -1917,7 +1922,7 @@ int Execute_load_log_event::exec_event(struct st_master_info* mi) init_io_cache(&file, fd, IO_SIZE, READ_CACHE, (my_off_t)0, 0, MYF(MY_WME|MY_NABP))) { - slave_print_error(my_errno, "Could not open file '%s'", fname); + slave_print_error(rli,my_errno, "Could not open file '%s'", fname); goto err; } if (!(lev = (Load_log_event*)Log_event::read_log_event(&file, @@ -1925,7 +1930,7 @@ int Execute_load_log_event::exec_event(struct st_master_info* mi) (bool)0)) || lev->get_type_code() != NEW_LOAD_EVENT) { - slave_print_error(0, "File '%s' appears corrupted", fname); + slave_print_error(rli,0, "File '%s' appears corrupted", fname); goto err; } // we want to disable binary logging in slave thread @@ -1938,7 +1943,7 @@ int Execute_load_log_event::exec_event(struct st_master_info* mi) lev->thd = thd; if (lev->exec_event(0,0)) { - slave_print_error(my_errno, "Failed executing load from '%s'", fname); + slave_print_error(rli,my_errno, "Failed executing load from '%s'", fname); thd->options = save_options; goto err; } @@ -1954,7 +1959,7 @@ err: end_io_cache(&file); if (fd >= 0) my_close(fd, MYF(0)); - return error ? error : Log_event::exec_event(mi); + return error ? error : Log_event::exec_event(rli); } |