summaryrefslogtreecommitdiff
path: root/sql/log_event.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/log_event.cc')
-rw-r--r--sql/log_event.cc523
1 files changed, 264 insertions, 259 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc
index f56837cb81a..7eb7c57ae40 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -1,15 +1,15 @@
/* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
-
+
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
-
+
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
-
+
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
@@ -24,6 +24,8 @@
#include <my_dir.h>
#endif /* MYSQL_CLIENT */
+#include <assert.h>
+
#ifdef MYSQL_CLIENT
static void pretty_print_str(FILE* file, char* str, int len)
{
@@ -118,14 +120,14 @@ Log_event::Log_event(THD* thd_arg, uint16 flags_arg):
if (thd)
{
server_id = thd->server_id;
- log_seq = thd->log_seq;
when = thd->start_time;
+ log_pos = thd->log_pos;
}
else
{
server_id = ::server_id;
- log_seq = 0;
when = time(NULL);
+ log_pos=0;
}
}
@@ -140,7 +142,7 @@ static void cleanup_load_tmpdir()
for (i=0;i<(uint)dirp->number_off_files;i++)
{
file=dirp->dir_entry+i;
- if (!bcmp(file->name,"SQL_LOAD-",9))
+ if (!memcmp(file->name,"SQL_LOAD-",9))
my_delete(file->name,MYF(MY_WME));
}
@@ -156,12 +158,12 @@ Log_event::Log_event(const char* buf, bool old_format):
server_id = uint4korr(buf + SERVER_ID_OFFSET);
if (old_format)
{
- log_seq=0;
+ log_pos=0;
flags=0;
}
else
{
- log_seq = uint4korr(buf + LOG_SEQ_OFFSET);
+ log_pos = uint4korr(buf + LOG_POS_OFFSET);
flags = uint2korr(buf + FLAGS_OFFSET);
}
#ifndef MYSQL_CLIENT
@@ -172,13 +174,13 @@ Log_event::Log_event(const char* buf, bool old_format):
#ifndef MYSQL_CLIENT
-int Log_event::exec_event(struct st_master_info* mi)
+int Log_event::exec_event(struct st_relay_log_info* rli)
{
- if (mi)
+ if (rli)
{
- thd->log_seq = 0;
- mi->inc_pos(get_event_len(), log_seq);
- flush_master_info(mi);
+ rli->inc_pos(get_event_len(),log_pos);
+ DBUG_ASSERT(rli->sql_thd != 0);
+ flush_relay_log_info(rli);
}
return 0;
}
@@ -224,7 +226,7 @@ void Load_log_event::pack_info(String* packet)
char buf[256];
String tmp(buf, sizeof(buf));
tmp.length(0);
- if (db && db_len)
+ if(db && db_len)
{
tmp.append("use ");
tmp.append(db, db_len);
@@ -234,11 +236,11 @@ void Load_log_event::pack_info(String* packet)
tmp.append("LOAD DATA INFILE '");
tmp.append(fname, fname_len);
tmp.append("' ", 2);
- if (sql_ex.opt_flags && REPLACE_FLAG )
+ if(sql_ex.opt_flags && REPLACE_FLAG )
tmp.append(" REPLACE ");
- else if (sql_ex.opt_flags && IGNORE_FLAG )
+ else if(sql_ex.opt_flags && IGNORE_FLAG )
tmp.append(" IGNORE ");
-
+
tmp.append("INTO TABLE ");
tmp.append(table_name);
if (sql_ex.field_term_len)
@@ -254,13 +256,13 @@ void Load_log_event::pack_info(String* packet)
tmp.append( " ENCLOSED BY ");
pretty_print_str(&tmp, sql_ex.enclosed, sql_ex.enclosed_len);
}
-
+
if (sql_ex.escaped_len)
{
tmp.append( " ESCAPED BY ");
pretty_print_str(&tmp, sql_ex.escaped, sql_ex.escaped_len);
}
-
+
if (sql_ex.line_term_len)
{
tmp.append(" LINES TERMINATED BY ");
@@ -272,20 +274,21 @@ void Load_log_event::pack_info(String* packet)
tmp.append(" LINES STARTING BY ");
pretty_print_str(&tmp, sql_ex.line_start, sql_ex.line_start_len);
}
-
+
if ((int)skip_lines > 0)
tmp.append( " IGNORE %ld LINES ", (long) skip_lines);
if (num_fields)
{
+ uint i;
const char* field = fields;
tmp.append(" (");
- for (uint i = 0; i < num_fields; i++)
+ for(i = 0; i < num_fields; i++)
{
- if (i)
+ if(i)
tmp.append(" ,");
tmp.append( field);
-
+
field += field_lens[i] + 1;
}
tmp.append(')');
@@ -303,7 +306,7 @@ void Rotate_log_event::pack_info(String* packet)
tmp.append(new_log_ident, ident_len);
tmp.append(";pos=");
tmp.append(llstr(pos,buf));
- if (flags & LOG_EVENT_FORCED_ROTATE_F)
+ if(flags & LOG_EVENT_FORCED_ROTATE_F)
tmp.append("; forced by master");
net_store_data(packet, tmp.ptr(), tmp.length());
}
@@ -344,7 +347,7 @@ void Log_event::init_show_field_list(List<Item>* field_list)
field_list->push_back(new Item_empty_string("Pos", 20));
field_list->push_back(new Item_empty_string("Event_type", 20));
field_list->push_back(new Item_empty_string("Server_id", 20));
- field_list->push_back(new Item_empty_string("Log_seq", 20));
+ field_list->push_back(new Item_empty_string("Orig_log_pos", 20));
field_list->push_back(new Item_empty_string("Info", 20));
}
@@ -355,14 +358,14 @@ int Log_event::net_send(THD* thd, const char* log_name, my_off_t pos)
const char* event_type;
if (p)
log_name = p + 1;
-
+
packet->length(0);
net_store_data(packet, log_name, strlen(log_name));
net_store_data(packet, (longlong) pos);
event_type = get_type_str();
net_store_data(packet, event_type, strlen(event_type));
net_store_data(packet, server_id);
- net_store_data(packet, log_seq);
+ net_store_data(packet, log_pos);
pack_info(packet);
return my_net_write(&thd->net, (char*)packet->ptr(), packet->length());
}
@@ -391,7 +394,7 @@ int Log_event::write_header(IO_CACHE* file)
long tmp=get_data_size() + LOG_EVENT_HEADER_LEN;
int4store(pos, tmp);
pos += 4;
- int4store(pos, log_seq);
+ int4store(pos, log_pos);
pos += 4;
int2store(pos, flags);
pos += 2;
@@ -413,7 +416,7 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet,
// if the read hits eof, we must report it as eof
// so the caller will know it can go into cond_wait to be woken up
// on the next update to the log
- if (!file->error) return LOG_READ_EOF;
+ if(!file->error) return LOG_READ_EOF;
return file->error > 0 ? LOG_READ_TRUNC: LOG_READ_IO;
}
data_len = uint4korr(buf + EVENT_LEN_OFFSET);
@@ -429,7 +432,7 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet,
{
if (packet->append(file, data_len))
{
- if (log_lock)
+ if(log_lock)
pthread_mutex_unlock(log_lock);
// here we should never hit eof in a non-error condtion
// eof means we are reading the event partially, which should
@@ -444,18 +447,17 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet,
#endif // MYSQL_CLIENT
#ifndef MYSQL_CLIENT
-#define UNLOCK_MUTEX if (log_lock) pthread_mutex_unlock(log_lock);
+#define UNLOCK_MUTEX if(log_lock) pthread_mutex_unlock(log_lock);
#else
#define UNLOCK_MUTEX
#endif
#ifndef MYSQL_CLIENT
-#define LOCK_MUTEX if (log_lock) pthread_mutex_lock(log_lock);
+#define LOCK_MUTEX if(log_lock) pthread_mutex_lock(log_lock);
#else
#define LOCK_MUTEX
#endif
-
// allocates memory - the caller is responsible for clean-up
#ifndef MYSQL_CLIENT
Log_event* Log_event::read_log_event(IO_CACHE* file,
@@ -512,7 +514,8 @@ err:
UNLOCK_MUTEX;
if (error)
{
- sql_print_error(error);
+ sql_print_error("Error in Log_event::read_log_event(): '%s', \
+data_len=%d,event_type=%d",error,data_len,head[EVENT_TYPE_OFFSET]);
my_free(buf, MYF(MY_ALLOW_ZERO_PTR));
}
return res;
@@ -524,9 +527,9 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len,
if (event_len < EVENT_LEN_OFFSET ||
(uint)event_len != uint4korr(buf+EVENT_LEN_OFFSET))
return NULL; // general sanity check - will fail on a partial read
-
+
Log_event* ev = NULL;
-
+
switch(buf[EVENT_TYPE_OFFSET])
{
case QUERY_EVENT:
@@ -580,9 +583,11 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len,
#ifdef MYSQL_CLIENT
void Log_event::print_header(FILE* file)
{
+ char llbuff[22];
fputc('#', file);
print_timestamp(file);
- fprintf(file, " server id %d ", server_id);
+ fprintf(file, " server id %d log_pos %s ", server_id,
+ llstr(log_pos,llbuff));
}
void Log_event::print_timestamp(FILE* file, time_t* ts)
@@ -678,7 +683,7 @@ Rotate_log_event::Rotate_log_event(const char* buf, int event_len,
// EVENT_LEN_OFFSET
int header_size = (old_format) ? OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN;
uint ident_offset;
- if (event_len < header_size)
+ if(event_len < header_size)
return;
buf += header_size;
if (old_format)
@@ -774,12 +779,12 @@ void Query_log_event::print(FILE* file, bool short_form, char* last_db)
bool same_db = 0;
- if (db && last_db)
+ if(db && last_db)
{
- if (!(same_db = !memcmp(last_db, db, db_len + 1)))
+ if(!(same_db = !memcmp(last_db, db, db_len + 1)))
memcpy(last_db, db, db_len + 1);
}
-
+
if (db && db[0] && !same_db)
fprintf(file, "use %s;\n", db);
end=int10_to_str((long) when, strmov(buff,"SET TIMESTAMP="),10);
@@ -795,7 +800,7 @@ void Query_log_event::print(FILE* file, bool short_form, char* last_db)
int Query_log_event::write_data(IO_CACHE* file)
{
if (!query) return -1;
-
+
char buf[QUERY_HEADER_LEN];
int4store(buf + Q_THREAD_ID_OFFSET, thread_id);
int4store(buf + Q_EXEC_TIME_OFFSET, exec_time);
@@ -837,7 +842,7 @@ int Intvar_log_event::write_data(IO_CACHE* file)
void Intvar_log_event::print(FILE* file, bool short_form, char* last_db)
{
char llbuff[22];
- if (!short_form)
+ if(!short_form)
{
print_header(file);
fprintf(file, "\tIntvar\n");
@@ -855,7 +860,7 @@ void Intvar_log_event::print(FILE* file, bool short_form, char* last_db)
}
fprintf(file, "%s;\n", llstr(val,llbuff));
fflush(file);
-
+
}
#endif
@@ -970,74 +975,76 @@ char* sql_ex_info::init(char* buf,char* buf_end,bool use_new_format)
#ifndef MYSQL_CLIENT
Load_log_event::Load_log_event(THD* thd, sql_exchange* ex,
const char* db_arg, const char* table_name_arg,
- List<Item>& fields_arg,
- enum enum_duplicates handle_dup):
- Log_event(thd), fields(0), field_lens(0),
- table_name(table_name_arg), db(db_arg), fname(ex->file_name),
- thread_id(thd->thread_id), num_fields(0), field_block_len(0)
-{
- time_t end_time;
- time(&end_time);
- exec_time = (ulong) (end_time - thd->start_time);
- db_len = (db) ? (uint32) strlen(db) : 0;
- table_name_len = (table_name) ? (uint32) strlen(table_name) : 0;
- fname_len = (fname) ? (uint) strlen(fname) : 0;
- sql_ex.field_term = (char*) ex->field_term->ptr();
- sql_ex.field_term_len = (uint8) ex->field_term->length();
- sql_ex.enclosed = (char*) ex->enclosed->ptr();
- sql_ex.enclosed_len = (uint8) ex->enclosed->length();
- sql_ex.line_term = (char*) ex->line_term->ptr();
- sql_ex.line_term_len = (uint8) ex->line_term->length();
- sql_ex.line_start = (char*) ex->line_start->ptr();
- sql_ex.line_start_len = (uint8) ex->line_start->length();
- sql_ex.escaped = (char*) ex->escaped->ptr();
- sql_ex.escaped_len = (uint8) ex->escaped->length();
- sql_ex.opt_flags = 0;
- sql_ex.cached_new_format = -1;
-
- if (ex->dumpfile)
- sql_ex.opt_flags |= DUMPFILE_FLAG;
- if (ex->opt_enclosed)
- sql_ex.opt_flags |= OPT_ENCLOSED_FLAG;
-
- sql_ex.empty_flags = 0;
-
- switch(handle_dup) {
- case DUP_IGNORE: sql_ex.opt_flags |= IGNORE_FLAG; break;
- case DUP_REPLACE: sql_ex.opt_flags |= REPLACE_FLAG; break;
- case DUP_ERROR: break;
- }
-
-
- if (!ex->field_term->length())
- sql_ex.empty_flags |= FIELD_TERM_EMPTY;
- if (!ex->enclosed->length())
- sql_ex.empty_flags |= ENCLOSED_EMPTY;
- if (!ex->line_term->length())
- sql_ex.empty_flags |= LINE_TERM_EMPTY;
- if (!ex->line_start->length())
- sql_ex.empty_flags |= LINE_START_EMPTY;
- if (!ex->escaped->length())
- sql_ex.empty_flags |= ESCAPED_EMPTY;
-
- skip_lines = ex->skip_lines;
-
- List_iterator<Item> li(fields_arg);
- field_lens_buf.length(0);
- fields_buf.length(0);
- Item* item;
- while((item = li++))
- {
- num_fields++;
- uchar len = (uchar) strlen(item->name);
- field_block_len += len + 1;
- fields_buf.append(item->name, len + 1);
- field_lens_buf.append((char*)&len, 1);
- }
-
- field_lens = (const uchar*)field_lens_buf.ptr();
- fields = fields_buf.ptr();
-}
+ List<Item>& fields_arg, enum enum_duplicates handle_dup):
+ Log_event(thd),thread_id(thd->thread_id),
+ num_fields(0),fields(0),field_lens(0),field_block_len(0),
+ table_name(table_name_arg),
+ db(db_arg),
+ fname(ex->file_name)
+ {
+ time_t end_time;
+ time(&end_time);
+ exec_time = (ulong) (end_time - thd->start_time);
+ db_len = (db) ? (uint32) strlen(db) : 0;
+ table_name_len = (table_name) ? (uint32) strlen(table_name) : 0;
+ fname_len = (fname) ? (uint) strlen(fname) : 0;
+ sql_ex.field_term = (char*) ex->field_term->ptr();
+ sql_ex.field_term_len = (uint8) ex->field_term->length();
+ sql_ex.enclosed = (char*) ex->enclosed->ptr();
+ sql_ex.enclosed_len = (uint8) ex->enclosed->length();
+ sql_ex.line_term = (char*) ex->line_term->ptr();
+ sql_ex.line_term_len = (uint8) ex->line_term->length();
+ sql_ex.line_start = (char*) ex->line_start->ptr();
+ sql_ex.line_start_len = (uint8) ex->line_start->length();
+ sql_ex.escaped = (char*) ex->escaped->ptr();
+ sql_ex.escaped_len = (uint8) ex->escaped->length();
+ sql_ex.opt_flags = 0;
+ sql_ex.cached_new_format = -1;
+
+ if(ex->dumpfile)
+ sql_ex.opt_flags |= DUMPFILE_FLAG;
+ if(ex->opt_enclosed)
+ sql_ex.opt_flags |= OPT_ENCLOSED_FLAG;
+
+ sql_ex.empty_flags = 0;
+
+ switch(handle_dup)
+ {
+ case DUP_IGNORE: sql_ex.opt_flags |= IGNORE_FLAG; break;
+ case DUP_REPLACE: sql_ex.opt_flags |= REPLACE_FLAG; break;
+ case DUP_ERROR: break;
+ }
+
+
+ if(!ex->field_term->length())
+ sql_ex.empty_flags |= FIELD_TERM_EMPTY;
+ if(!ex->enclosed->length())
+ sql_ex.empty_flags |= ENCLOSED_EMPTY;
+ if(!ex->line_term->length())
+ sql_ex.empty_flags |= LINE_TERM_EMPTY;
+ if(!ex->line_start->length())
+ sql_ex.empty_flags |= LINE_START_EMPTY;
+ if(!ex->escaped->length())
+ sql_ex.empty_flags |= ESCAPED_EMPTY;
+
+ skip_lines = ex->skip_lines;
+
+ List_iterator<Item> li(fields_arg);
+ field_lens_buf.length(0);
+ fields_buf.length(0);
+ Item* item;
+ while((item = li++))
+ {
+ num_fields++;
+ uchar len = (uchar) strlen(item->name);
+ field_block_len += len + 1;
+ fields_buf.append(item->name, len + 1);
+ field_lens_buf.append((char*)&len, 1);
+ }
+
+ field_lens = (const uchar*)field_lens_buf.ptr();
+ fields = fields_buf.ptr();
+ }
#endif
@@ -1045,8 +1052,9 @@ Load_log_event::Load_log_event(THD* thd, sql_exchange* ex,
// constructed event
Load_log_event::Load_log_event(const char* buf, int event_len,
bool old_format):
- Log_event(buf, old_format),fields(0),
- field_lens(0), num_fields(0), field_block_len(0)
+ Log_event(buf, old_format),num_fields(0),fields(0),
+ field_lens(0),field_block_len(0),
+ table_name(0),db(0),fname(0)
{
if (!event_len) // derived class, will call copy_log_event() itself
return;
@@ -1066,7 +1074,7 @@ int Load_log_event::copy_log_event(const char *buf, ulong event_len,
table_name_len = (uint)data_head[L_TBL_LEN_OFFSET];
db_len = (uint)data_head[L_DB_LEN_OFFSET];
num_fields = uint4korr(data_head + L_NUM_FIELDS_OFFSET);
-
+
int body_offset = get_data_body_offset();
if ((int) event_len < body_offset)
return 1;
@@ -1076,7 +1084,7 @@ int Load_log_event::copy_log_event(const char *buf, ulong event_len,
buf_end,
buf[EVENT_TYPE_OFFSET] != LOAD_EVENT)))
return 1;
-
+
data_len = event_len - body_offset;
if (num_fields > data_len) // simple sanity check against corruption
return 1;
@@ -1108,43 +1116,43 @@ void Load_log_event::print(FILE* file, bool short_form, char* last_db)
bool same_db = 0;
- if (db && last_db)
+ if(db && last_db)
{
- if (!(same_db = !memcmp(last_db, db, db_len + 1)))
+ if(!(same_db = !memcmp(last_db, db, db_len + 1)))
memcpy(last_db, db, db_len + 1);
}
-
- if (db && db[0] && !same_db)
+
+ if(db && db[0] && !same_db)
fprintf(file, "use %s;\n", db);
fprintf(file, "LOAD DATA INFILE '%-*s' ", fname_len, fname);
- if (sql_ex.opt_flags && REPLACE_FLAG )
+ if(sql_ex.opt_flags && REPLACE_FLAG )
fprintf(file," REPLACE ");
- else if (sql_ex.opt_flags && IGNORE_FLAG )
+ else if(sql_ex.opt_flags && IGNORE_FLAG )
fprintf(file," IGNORE ");
-
+
fprintf(file, "INTO TABLE %s ", table_name);
- if (sql_ex.field_term)
+ if(sql_ex.field_term)
{
fprintf(file, " FIELDS TERMINATED BY ");
pretty_print_str(file, sql_ex.field_term, sql_ex.field_term_len);
}
- if (sql_ex.enclosed)
+ if(sql_ex.enclosed)
{
- if (sql_ex.opt_flags && OPT_ENCLOSED_FLAG )
+ if(sql_ex.opt_flags && OPT_ENCLOSED_FLAG )
fprintf(file," OPTIONALLY ");
fprintf(file, " ENCLOSED BY ");
pretty_print_str(file, sql_ex.enclosed, sql_ex.enclosed_len);
}
-
+
if (sql_ex.escaped)
{
fprintf(file, " ESCAPED BY ");
pretty_print_str(file, sql_ex.escaped, sql_ex.escaped_len);
}
-
+
if (sql_ex.line_term)
{
fprintf(file," LINES TERMINATED BY ");
@@ -1156,8 +1164,8 @@ void Load_log_event::print(FILE* file, bool short_form, char* last_db)
fprintf(file," LINES STARTING BY ");
pretty_print_str(file, sql_ex.line_start, sql_ex.line_start_len);
}
-
- if ((int)skip_lines > 0)
+
+ if((int)skip_lines > 0)
fprintf(file, " IGNORE %ld LINES ", (long) skip_lines);
if (num_fields)
@@ -1165,12 +1173,12 @@ void Load_log_event::print(FILE* file, bool short_form, char* last_db)
uint i;
const char* field = fields;
fprintf( file, " (");
- for (i = 0; i < num_fields; i++)
+ for(i = 0; i < num_fields; i++)
{
- if (i)
+ if(i)
fputc(',', file);
fprintf(file, field);
-
+
field += field_lens[i] + 1;
}
fputc(')', file);
@@ -1183,46 +1191,53 @@ void Load_log_event::print(FILE* file, bool short_form, char* last_db)
#ifndef MYSQL_CLIENT
-void Log_event::set_log_seq(THD* thd, MYSQL_LOG* log)
+void Log_event::set_log_pos(MYSQL_LOG* log)
{
- log_seq = (thd && thd->log_seq) ? thd->log_seq++ : log->log_seq++;
+ if (!log_pos)
+ log_pos = my_b_tell(&log->log_file);
}
-
void Load_log_event::set_fields(List<Item> &fields)
{
uint i;
const char* field = this->fields;
- for (i = 0; i < num_fields; i++)
- {
- fields.push_back(new Item_field(db, table_name, field));
- field += field_lens[i] + 1;
- }
-
+ for(i = 0; i < num_fields; i++)
+ {
+ fields.push_back(new Item_field(db, table_name, field));
+ field += field_lens[i] + 1;
+ }
+
}
-Slave_log_event::Slave_log_event(THD* thd_arg,struct st_master_info* mi):
+Slave_log_event::Slave_log_event(THD* thd_arg,
+ struct st_relay_log_info* rli):
Log_event(thd_arg),mem_pool(0),master_host(0)
{
- if (!mi->inited)
+ if(!rli->inited)
return;
- pthread_mutex_lock(&mi->lock);
+
+ MASTER_INFO* mi = rli->mi;
+ // TODO: re-write this better without holding both
+ // locks at the same time
+ pthread_mutex_lock(&mi->data_lock);
+ pthread_mutex_lock(&rli->data_lock);
master_host_len = strlen(mi->host);
- master_log_len = strlen(mi->log_file_name);
+ master_log_len = strlen(rli->master_log_name);
// on OOM, just do not initialize the structure and print the error
- if ((mem_pool = (char*)my_malloc(get_data_size() + 1,
+ if((mem_pool = (char*)my_malloc(get_data_size() + 1,
MYF(MY_WME))))
{
master_host = mem_pool + SL_MASTER_HOST_OFFSET ;
memcpy(master_host, mi->host, master_host_len + 1);
master_log = master_host + master_host_len + 1;
- memcpy(master_log, mi->log_file_name, master_log_len + 1);
+ memcpy(master_log, rli->master_log_name, master_log_len + 1);
master_port = mi->port;
- master_pos = mi->pos;
+ master_pos = rli->master_log_pos;
}
else
sql_print_error("Out of memory while recording slave event");
- pthread_mutex_unlock(&mi->lock);
+ pthread_mutex_unlock(&rli->data_lock);
+ pthread_mutex_unlock(&mi->data_lock);
}
@@ -1239,7 +1254,7 @@ Slave_log_event::~Slave_log_event()
void Slave_log_event::print(FILE* file, bool short_form, char* last_db)
{
char llbuff[22];
- if (short_form)
+ if(short_form)
return;
print_header(file);
fputc('\n', file);
@@ -1271,7 +1286,7 @@ void Slave_log_event::init_from_mem_pool(int data_size)
master_host_len = strlen(master_host);
// safety
master_log = master_host + master_host_len + 1;
- if (master_log > mem_pool + data_size)
+ if(master_log > mem_pool + data_size)
{
master_host = 0;
return;
@@ -1283,9 +1298,9 @@ Slave_log_event::Slave_log_event(const char* buf, int event_len):
Log_event(buf,0),mem_pool(0),master_host(0)
{
event_len -= LOG_EVENT_HEADER_LEN;
- if (event_len < 0)
+ if(event_len < 0)
return;
- if (!(mem_pool = (char*)my_malloc(event_len + 1, MYF(MY_WME))))
+ if(!(mem_pool = (char*)my_malloc(event_len + 1, MYF(MY_WME))))
return;
memcpy(mem_pool, buf + LOG_EVENT_HEADER_LEN, event_len);
mem_pool[event_len] = 0;
@@ -1388,11 +1403,11 @@ Append_block_log_event::Append_block_log_event(THD* thd_arg, char* block_arg,
{
}
#endif
-
+
Append_block_log_event::Append_block_log_event(const char* buf, int len):
Log_event(buf, 0),block(0)
{
- if ((uint)len < APPEND_BLOCK_EVENT_OVERHEAD)
+ if((uint)len < APPEND_BLOCK_EVENT_OVERHEAD)
return;
file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + AB_FILE_ID_OFFSET);
block = (char*)buf + APPEND_BLOCK_EVENT_OVERHEAD;
@@ -1444,7 +1459,7 @@ Delete_file_log_event::Delete_file_log_event(THD* thd_arg):
Delete_file_log_event::Delete_file_log_event(const char* buf, int len):
Log_event(buf, 0),file_id(0)
{
- if ((uint)len < DELETE_FILE_EVENT_OVERHEAD)
+ if((uint)len < DELETE_FILE_EVENT_OVERHEAD)
return;
file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + AB_FILE_ID_OFFSET);
}
@@ -1487,11 +1502,11 @@ Execute_load_log_event::Execute_load_log_event(THD* thd_arg):
{
}
#endif
-
+
Execute_load_log_event::Execute_load_log_event(const char* buf,int len):
Log_event(buf, 0),file_id(0)
{
- if ((uint)len < EXEC_LOAD_EVENT_OVERHEAD)
+ if((uint)len < EXEC_LOAD_EVENT_OVERHEAD)
return;
file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + EL_FILE_ID_OFFSET);
}
@@ -1529,18 +1544,11 @@ void Execute_load_log_event::pack_info(String* packet)
#endif
#ifndef MYSQL_CLIENT
-
-int ignored_error_code(int err_code)
-{
- return use_slave_mask && bitmap_is_set(&slave_error_mask, err_code);
-}
-
-int Query_log_event::exec_event(struct st_master_info* mi)
+int Query_log_event::exec_event(struct st_relay_log_info* rli)
{
int expected_error,actual_error = 0;
init_sql_alloc(&thd->mem_root, 8192,0);
- thd->db= rewrite_db((char*)db);
- thd->db_length=strlen(thd->db);
+ thd->db = rewrite_db((char*)db);
if (db_ok(thd->db, replicate_do_db, replicate_ignore_db))
{
thd->query = (char*)query;
@@ -1553,18 +1561,14 @@ int Query_log_event::exec_event(struct st_master_info* mi)
thd->net.last_errno = 0;
thd->net.last_error[0] = 0;
thd->slave_proxy_id = thread_id; // for temp tables
-
- /*
- sanity check to make sure the master did not get a really bad
- error on the query
- */
- if (ignored_error_code((expected_error=error_code)) ||
- !check_expected_error(thd, expected_error))
+
+ // sanity check to make sure the master did not get a really bad
+ // error on the query
+ if (!check_expected_error(thd,rli,(expected_error = error_code)))
{
mysql_parse(thd, thd->query, q_len);
if (expected_error !=
- (actual_error = thd->net.last_errno) && expected_error &&
- !ignored_error_code(actual_error))
+ (actual_error = thd->net.last_errno) && expected_error)
{
const char* errmsg = "Slave: did not get the expected error\
running query from master - expected: '%s' (%d), got '%s' (%d)";
@@ -1574,52 +1578,49 @@ int Query_log_event::exec_event(struct st_master_info* mi)
actual_error);
thd->query_error = 1;
}
- else if (expected_error == actual_error ||
- ignored_error_code(actual_error))
+ else if (expected_error == actual_error)
{
thd->query_error = 0;
- *last_slave_error = 0;
- last_slave_errno = 0;
+ *rli->last_slave_error = 0;
+ rli->last_slave_errno = 0;
}
}
else
{
// master could be inconsistent, abort and tell DBA to check/fix it
- thd->db= thd->query= 0;
- thd->db_length=0;
+ thd->db = thd->query = 0;
thd->convert_set = 0;
close_thread_tables(thd);
free_root(&thd->mem_root,0);
return 1;
}
}
- thd->db= 0; // prevent db from being freed
- thd->db_length=0;
+ thd->db = 0; // prevent db from being freed
thd->query = 0; // just to be sure
// assume no convert for next query unless set explictly
thd->convert_set = 0;
close_thread_tables(thd);
-
+
if (thd->query_error || thd->fatal_error)
{
- slave_print_error(actual_error, "error '%s' on query '%s'",
+ slave_print_error(rli,actual_error, "error '%s' on query '%s'",
actual_error ? thd->net.last_error :
"unexpected success or fatal error", query);
free_root(&thd->mem_root,0);
return 1;
}
free_root(&thd->mem_root,0);
- return Log_event::exec_event(mi);
+ return Log_event::exec_event(rli);
}
-int Load_log_event::exec_event(NET* net, struct st_master_info* mi)
+int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli)
{
init_sql_alloc(&thd->mem_root, 8192,0);
- thd->db= rewrite_db((char*)db);
+ thd->db = rewrite_db((char*)db);
thd->query = 0;
thd->query_error = 0;
-
- if (db_ok(thd->db, replicate_do_db, replicate_ignore_db))
+
+ if(db_ok(thd->db, replicate_do_db, replicate_ignore_db))
{
thd->set_time((time_t)when);
thd->current_tablenr = 0;
@@ -1633,8 +1634,9 @@ int Load_log_event::exec_event(NET* net, struct st_master_info* mi)
tables.name = tables.real_name = (char*)table_name;
tables.lock_type = TL_WRITE;
// the table will be opened in mysql_load
- if (table_rules_on && !tables_ok(thd, &tables))
+ if(table_rules_on && !tables_ok(thd, &tables))
{
+ // TODO: this is a bug - this needs to be moved to the I/O thread
if (net)
skip_load_data_infile(net);
}
@@ -1651,7 +1653,7 @@ int Load_log_event::exec_event(NET* net, struct st_master_info* mi)
String line_term(sql_ex.line_term,sql_ex.line_term_len);
String line_start(sql_ex.line_start,sql_ex.line_start_len);
String escaped(sql_ex.escaped,sql_ex.escaped_len);
-
+
ex.opt_enclosed = (sql_ex.opt_flags & OPT_ENCLOSED_FLAG);
if (sql_ex.empty_flags & FIELD_TERM_EMPTY)
ex.field_term->length(0);
@@ -1668,14 +1670,14 @@ int Load_log_event::exec_event(NET* net, struct st_master_info* mi)
// about the packet sequence
thd->net.pkt_nr = net->pkt_nr;
}
- if (mysql_load(thd, &ex, &tables, fields, handle_dup, net != 0,
+ if(mysql_load(thd, &ex, &tables, fields, handle_dup, net != 0,
TL_WRITE))
thd->query_error = 1;
- if (thd->cuted_fields)
+ if(thd->cuted_fields)
sql_print_error("Slave: load data infile at position %s in log \
-'%s' produced %d warning(s)", llstr(mi->pos,llbuff), RPL_LOG_NAME,
+'%s' produced %d warning(s)", llstr(rli->master_log_pos,llbuff), RPL_LOG_NAME,
thd->cuted_fields );
- if (net)
+ if(net)
net->pkt_nr = thd->net.pkt_nr;
}
}
@@ -1683,65 +1685,71 @@ int Load_log_event::exec_event(NET* net, struct st_master_info* mi)
{
// we will just ask the master to send us /dev/null if we do not
// want to load the data
+ // TODO: this a bug - needs to be done in I/O thread
if (net)
skip_load_data_infile(net);
}
-
+
thd->net.vio = 0;
- thd->db= 0;// prevent db from being freed
- thd->db_length=0;
+ thd->db = 0;// prevent db from being freed
close_thread_tables(thd);
- if (thd->query_error)
+ if(thd->query_error)
{
int sql_error = thd->net.last_errno;
if (!sql_error)
sql_error = ER_UNKNOWN_ERROR;
-
- slave_print_error(sql_error, "Slave: Error '%s' running load data infile ",
+
+ slave_print_error(rli,sql_error,
+ "Slave: Error '%s' running load data infile ",
ER_SAFE(sql_error));
free_root(&thd->mem_root,0);
return 1;
}
free_root(&thd->mem_root,0);
-
- if (thd->fatal_error)
+
+ if(thd->fatal_error)
{
sql_print_error("Slave: Fatal error running LOAD DATA INFILE ");
return 1;
}
- return Log_event::exec_event(mi);
+ return Log_event::exec_event(rli);
}
-int Start_log_event::exec_event(struct st_master_info* mi)
+int Start_log_event::exec_event(struct st_relay_log_info* rli)
{
- if (!mi->old_format)
+ if (!rli->mi->old_format)
{
close_temporary_tables(thd);
cleanup_load_tmpdir();
}
- return Log_event::exec_event(mi);
+ return Log_event::exec_event(rli);
}
-int Stop_log_event::exec_event(struct st_master_info* mi)
+int Stop_log_event::exec_event(struct st_relay_log_info* rli)
{
- if (mi->pos > 4) // stop event should be ignored after rotate event
+ // do not clean up immediately after rotate event
+ if (rli->master_log_pos > 4)
{
close_temporary_tables(thd);
cleanup_load_tmpdir();
- mi->inc_pos(get_event_len(), log_seq);
- flush_master_info(mi);
}
- thd->log_seq = 0;
+ // we do not want to update master_log pos because we get a rotate event
+ // before stop, so by now master_log_name is set to the next log
+ // if we updated it, we will have incorrect master coordinates and this
+ // could give false triggers in MASTER_POS_WAIT() that we have reached
+ // the targed position when in fact we have not
+ rli->inc_pos(get_event_len(), 0);
+ flush_relay_log_info(rli);
return 0;
}
-int Rotate_log_event::exec_event(struct st_master_info* mi)
+int Rotate_log_event::exec_event(struct st_relay_log_info* rli)
{
bool rotate_binlog = 0, write_slave_event = 0;
- char* log_name = mi->log_file_name;
- pthread_mutex_lock(&mi->lock);
-
+ char* log_name = rli->master_log_name;
+ pthread_mutex_lock(&rli->data_lock);
+ // TODO: probably needs re-write
// rotate local binlog only if the name of remote has changed
if (!*log_name || !(log_name[ident_len] == 0 &&
!memcmp(log_name, new_log_ident, ident_len)))
@@ -1749,41 +1757,38 @@ int Rotate_log_event::exec_event(struct st_master_info* mi)
write_slave_event = (!(flags & LOG_EVENT_FORCED_ROTATE_F)
&& mysql_bin_log.is_open());
rotate_binlog = (*log_name && write_slave_event);
- memcpy(log_name, new_log_ident,ident_len );
+ if (ident_len >= sizeof(rli->master_log_name))
+ return 1;
+ memcpy(log_name, new_log_ident,ident_len);
log_name[ident_len] = 0;
}
- mi->pos = pos;
- mi->last_log_seq = log_seq;
-#ifndef DBUG_OFF
- if (abort_slave_event_count)
- ++events_till_abort;
-#endif
+ rli->master_log_pos = pos;
+ rli->relay_log_pos += get_event_len();
if (rotate_binlog)
{
mysql_bin_log.new_file();
- mi->last_log_seq = 0;
+ rli->master_log_pos = 4;
}
- pthread_cond_broadcast(&mi->cond);
- pthread_mutex_unlock(&mi->lock);
- flush_master_info(mi);
-
+ pthread_cond_broadcast(&rli->data_cond);
+ pthread_mutex_unlock(&rli->data_lock);
+ flush_relay_log_info(rli);
+
if (write_slave_event)
{
- Slave_log_event s(thd, mi);
+ Slave_log_event s(thd, rli);
if (s.master_host)
{
- s.set_log_seq(0, &mysql_bin_log);
+ s.set_log_pos(&mysql_bin_log);
s.server_id = ::server_id;
mysql_bin_log.write(&s);
}
}
- thd->log_seq = 0;
return 0;
}
-int Intvar_log_event::exec_event(struct st_master_info* mi)
+int Intvar_log_event::exec_event(struct st_relay_log_info* rli)
{
- switch(type)
+ switch (type)
{
case LAST_INSERT_ID_EVENT:
thd->last_insert_id_used = 1;
@@ -1793,18 +1798,18 @@ int Intvar_log_event::exec_event(struct st_master_info* mi)
thd->next_insert_id = val;
break;
}
- mi->inc_pending(get_event_len());
+ rli->inc_pending(get_event_len());
return 0;
}
-int Slave_log_event::exec_event(struct st_master_info* mi)
+int Slave_log_event::exec_event(struct st_relay_log_info* rli)
{
- if (mysql_bin_log.is_open())
+ if(mysql_bin_log.is_open())
mysql_bin_log.write(this);
- return Log_event::exec_event(mi);
+ return Log_event::exec_event(rli);
}
-int Create_file_log_event::exec_event(struct st_master_info* mi)
+int Create_file_log_event::exec_event(struct st_relay_log_info* rli)
{
char fname_buf[FN_REFLEN+10];
char *p;
@@ -1820,10 +1825,10 @@ int Create_file_log_event::exec_event(struct st_master_info* mi)
init_io_cache(&file, fd, IO_SIZE, WRITE_CACHE, (my_off_t)0, 0,
MYF(MY_WME|MY_NABP)))
{
- slave_print_error(my_errno, "Could not open file '%s'", fname_buf);
+ slave_print_error(rli,my_errno, "Could not open file '%s'", fname_buf);
goto err;
}
-
+
// a trick to avoid allocating another buffer
strmov(p, ".data");
fname = fname_buf;
@@ -1831,22 +1836,22 @@ int Create_file_log_event::exec_event(struct st_master_info* mi)
if (write_base(&file))
{
strmov(p, ".info"); // to have it right in the error message
- slave_print_error(my_errno, "Could not write to file '%s'", fname_buf);
+ slave_print_error(rli,my_errno, "Could not write to file '%s'", fname_buf);
goto err;
}
end_io_cache(&file);
my_close(fd, MYF(0));
-
+
// fname_buf now already has .data, not .info, because we did our trick
if ((fd = my_open(fname_buf, O_WRONLY|O_CREAT|O_BINARY|O_TRUNC,
MYF(MY_WME))) < 0)
{
- slave_print_error(my_errno, "Could not open file '%s'", fname_buf);
+ slave_print_error(rli,my_errno, "Could not open file '%s'", fname_buf);
goto err;
}
if (my_write(fd, (byte*) block, block_len, MYF(MY_WME+MY_NABP)))
{
- slave_print_error(my_errno, "Write to '%s' failed", fname_buf);
+ slave_print_error(rli,my_errno, "Write to '%s' failed", fname_buf);
goto err;
}
if (mysql_bin_log.is_open())
@@ -1857,10 +1862,10 @@ err:
end_io_cache(&file);
if (fd >= 0)
my_close(fd, MYF(0));
- return error ? 1 : Log_event::exec_event(mi);
+ return error ? 1 : Log_event::exec_event(rli);
}
-int Delete_file_log_event::exec_event(struct st_master_info* mi)
+int Delete_file_log_event::exec_event(struct st_relay_log_info* rli)
{
char fname[FN_REFLEN+10];
char* p;
@@ -1871,10 +1876,10 @@ int Delete_file_log_event::exec_event(struct st_master_info* mi)
(void)my_delete(fname, MYF(MY_WME));
if (mysql_bin_log.is_open())
mysql_bin_log.write(this);
- return Log_event::exec_event(mi);
+ return Log_event::exec_event(rli);
}
-int Append_block_log_event::exec_event(struct st_master_info* mi)
+int Append_block_log_event::exec_event(struct st_relay_log_info* rli)
{
char fname[FN_REFLEN+10];
char* p;
@@ -1884,12 +1889,12 @@ int Append_block_log_event::exec_event(struct st_master_info* mi)
memcpy(p, ".data", 6);
if ((fd = my_open(fname, O_WRONLY|O_APPEND|O_BINARY, MYF(MY_WME))) < 0)
{
- slave_print_error(my_errno, "Could not open file '%s'", fname);
+ slave_print_error(rli,my_errno, "Could not open file '%s'", fname);
goto err;
}
if (my_write(fd, (byte*) block, block_len, MYF(MY_WME+MY_NABP)))
{
- slave_print_error(my_errno, "Write to '%s' failed", fname);
+ slave_print_error(rli,my_errno, "Write to '%s' failed", fname);
goto err;
}
if (mysql_bin_log.is_open())
@@ -1898,10 +1903,10 @@ int Append_block_log_event::exec_event(struct st_master_info* mi)
err:
if (fd >= 0)
my_close(fd, MYF(0));
- return error ? error : Log_event::exec_event(mi);
+ return error ? error : Log_event::exec_event(rli);
}
-int Execute_load_log_event::exec_event(struct st_master_info* mi)
+int Execute_load_log_event::exec_event(struct st_relay_log_info* rli)
{
char fname[FN_REFLEN+10];
char* p;
@@ -1917,7 +1922,7 @@ int Execute_load_log_event::exec_event(struct st_master_info* mi)
init_io_cache(&file, fd, IO_SIZE, READ_CACHE, (my_off_t)0, 0,
MYF(MY_WME|MY_NABP)))
{
- slave_print_error(my_errno, "Could not open file '%s'", fname);
+ slave_print_error(rli,my_errno, "Could not open file '%s'", fname);
goto err;
}
if (!(lev = (Load_log_event*)Log_event::read_log_event(&file,
@@ -1925,7 +1930,7 @@ int Execute_load_log_event::exec_event(struct st_master_info* mi)
(bool)0))
|| lev->get_type_code() != NEW_LOAD_EVENT)
{
- slave_print_error(0, "File '%s' appears corrupted", fname);
+ slave_print_error(rli,0, "File '%s' appears corrupted", fname);
goto err;
}
// we want to disable binary logging in slave thread
@@ -1938,7 +1943,7 @@ int Execute_load_log_event::exec_event(struct st_master_info* mi)
lev->thd = thd;
if (lev->exec_event(0,0))
{
- slave_print_error(my_errno, "Failed executing load from '%s'", fname);
+ slave_print_error(rli,my_errno, "Failed executing load from '%s'", fname);
thd->options = save_options;
goto err;
}
@@ -1954,7 +1959,7 @@ err:
end_io_cache(&file);
if (fd >= 0)
my_close(fd, MYF(0));
- return error ? error : Log_event::exec_event(mi);
+ return error ? error : Log_event::exec_event(rli);
}