summaryrefslogtreecommitdiff
path: root/sql/log_event.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/log_event.cc')
-rw-r--r--sql/log_event.cc628
1 files changed, 416 insertions, 212 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc
index ac985c266c8..5538e6c0b7f 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -20,6 +20,7 @@
#pragma implementation // gcc: Class implementation
#endif
#include "mysql_priv.h"
+#include "slave.h"
#endif /* MYSQL_CLIENT */
@@ -31,6 +32,7 @@ static void pretty_print_char(FILE* file, int c)
case '\r': fprintf(file, "\\r"); break;
case '\\': fprintf(file, "\\\\"); break;
case '\b': fprintf(file, "\\b"); break;
+ case '\t': fprintf(file, "\\t"); break;
case '\'': fprintf(file, "\\'"); break;
case 0 : fprintf(file, "\\0"); break;
default:
@@ -40,6 +42,220 @@ static void pretty_print_char(FILE* file, int c)
fputc('\'', file);
}
+#ifndef MYSQL_CLIENT
+
+static void pretty_print_char(String* packet, int c)
+{
+ packet->append('\'');
+ switch(c) {
+ case '\n': packet->append( "\\n"); break;
+ case '\r': packet->append( "\\r"); break;
+ case '\\': packet->append( "\\\\"); break;
+ case '\b': packet->append( "\\b"); break;
+ case '\t': packet->append( "\\t"); break;
+ case '\'': packet->append( "\\'"); break;
+ case 0 : packet->append( "\\0"); break;
+ default:
+ packet->append((char)c);
+ break;
+ }
+ packet->append('\'');
+}
+
+#endif
+
+const char* Log_event::get_type_str()
+{
+ switch(get_type_code())
+ {
+ case START_EVENT: return "Start";
+ case STOP_EVENT: return "Stop";
+ case QUERY_EVENT: return "Query";
+ case ROTATE_EVENT: return "Rotate";
+ case INTVAR_EVENT: return "Intvar";
+ case LOAD_EVENT: return "Load";
+ case SLAVE_EVENT: return "Slave";
+ default: /* impossible */ return "Unknown";
+ }
+}
+
+#ifndef MYSQL_CLIENT
+
+void Log_event::pack_info(String* packet)
+{
+ net_store_data(packet, "", 0);
+}
+
+void Query_log_event::pack_info(String* packet)
+{
+ String tmp;
+ if(db && db_len)
+ {
+ tmp.append("use ");
+ tmp.append(db, db_len);
+ tmp.append("; ", 2);
+ }
+
+ if(query && q_len)
+ tmp.append(query, q_len);
+ net_store_data(packet, (char*)tmp.ptr(), tmp.length());
+}
+
+void Start_log_event::pack_info(String* packet)
+{
+ String tmp;
+ char buf[22];
+
+ tmp.append("Server ver: ");
+ tmp.append(server_version);
+ tmp.append(", Binlog ver: ");
+ tmp.append(llstr(binlog_version, buf));
+ net_store_data(packet, tmp.ptr(), tmp.length());
+}
+
+void Load_log_event::pack_info(String* packet)
+{
+ String tmp;
+ if(db && db_len)
+ {
+ tmp.append("use ");
+ tmp.append(db, db_len);
+ tmp.append("; ", 2);
+ }
+
+ tmp.append("LOAD DATA INFILE '");
+ tmp.append(fname);
+ tmp.append("' ", 2);
+ if(sql_ex.opt_flags && REPLACE_FLAG )
+ tmp.append(" REPLACE ");
+ else if(sql_ex.opt_flags && IGNORE_FLAG )
+ tmp.append(" IGNORE ");
+
+ tmp.append("INTO TABLE ");
+ tmp.append(table_name);
+ if (!(sql_ex.empty_flags & FIELD_TERM_EMPTY))
+ {
+ tmp.append(" FIELDS TERMINATED BY ");
+ pretty_print_char(&tmp, sql_ex.field_term);
+ }
+
+ if (!(sql_ex.empty_flags & ENCLOSED_EMPTY))
+ {
+ if (sql_ex.opt_flags && OPT_ENCLOSED_FLAG )
+ tmp.append(" OPTIONALLY ");
+ tmp.append( " ENCLOSED BY ");
+ pretty_print_char(&tmp, sql_ex.enclosed);
+ }
+
+ if (!(sql_ex.empty_flags & ESCAPED_EMPTY))
+ {
+ tmp.append( " ESCAPED BY ");
+ pretty_print_char(&tmp, sql_ex.escaped);
+ }
+
+ if (!(sql_ex.empty_flags & LINE_TERM_EMPTY))
+ {
+ tmp.append(" LINES TERMINATED BY ");
+ pretty_print_char(&tmp, sql_ex.line_term);
+ }
+
+ if (!(sql_ex.empty_flags & LINE_START_EMPTY))
+ {
+ tmp.append(" LINES STARTING BY ");
+ pretty_print_char(&tmp, sql_ex.line_start);
+ }
+
+ if ((int)skip_lines > 0)
+ tmp.append( " IGNORE %ld LINES ", (long) skip_lines);
+
+ if (num_fields)
+ {
+ uint i;
+ const char* field = fields;
+ tmp.append(" (");
+ for(i = 0; i < num_fields; i++)
+ {
+ if(i)
+ tmp.append(" ,");
+ tmp.append( field);
+
+ field += field_lens[i] + 1;
+ }
+ tmp.append(')');
+ }
+
+ net_store_data(packet, tmp.ptr(), tmp.length());
+}
+
+void Rotate_log_event::pack_info(String* packet)
+{
+ String tmp;
+ char buf[22];
+ tmp.append(new_log_ident, ident_len);
+ tmp.append(";pos=");
+ tmp.append(llstr(pos,buf));
+ if(flags & LOG_EVENT_FORCED_ROTATE_F)
+ tmp.append("; forced by master");
+ net_store_data(packet, tmp.ptr(), tmp.length());
+}
+
+void Intvar_log_event::pack_info(String* packet)
+{
+ String tmp;
+ char buf[22];
+ tmp.append(get_var_type_name());
+ tmp.append('=');
+ tmp.append(llstr(val, buf));
+ net_store_data(packet, tmp.ptr(), tmp.length());
+}
+
+void Slave_log_event::pack_info(String* packet)
+{
+ String tmp;
+ char buf[22];
+ tmp.append("host=");
+ tmp.append(master_host);
+ tmp.append(",port=");
+ tmp.append(llstr(master_port,buf));
+ tmp.append(",log=");
+ tmp.append(master_log);
+ tmp.append(",pos=");
+ tmp.append(llstr(master_pos,buf));
+ net_store_data(packet, tmp.ptr(), tmp.length());
+}
+
+
+void Log_event::init_show_field_list(List<Item>* field_list)
+{
+ field_list->push_back(new Item_empty_string("Log_name", 20));
+ field_list->push_back(new Item_empty_string("Pos", 20));
+ field_list->push_back(new Item_empty_string("Event_type", 20));
+ field_list->push_back(new Item_empty_string("Server_id", 20));
+ field_list->push_back(new Item_empty_string("Log_seq", 20));
+ field_list->push_back(new Item_empty_string("Info", 20));
+}
+
+int Log_event::net_send(THD* thd, const char* log_name, ulong pos)
+{
+ String* packet = &thd->packet;
+ const char* p = strrchr(log_name, FN_LIBCHAR);
+ const char* event_type;
+ if (p)
+ log_name = p + 1;
+
+ packet->length(0);
+ net_store_data(packet, log_name, strlen(log_name));
+ net_store_data(packet, (longlong)pos);
+ event_type = get_type_str();
+ net_store_data(packet, event_type, strlen(event_type));
+ net_store_data(packet, server_id);
+ net_store_data(packet, log_seq);
+ pack_info(packet);
+ return my_net_write(&thd->net, (char*)packet->ptr(), packet->length());
+}
+
+#endif
+
int Query_log_event::write(IO_CACHE* file)
{
return query ? Log_event::write(file) : -1;
@@ -52,7 +268,6 @@ int Log_event::write(IO_CACHE* file)
int Log_event::write_header(IO_CACHE* file)
{
- // make sure to change this when the header gets bigger
char buf[LOG_EVENT_HEADER_LEN];
char* pos = buf;
int4store(pos, when); // timestamp
@@ -63,6 +278,10 @@ int Log_event::write_header(IO_CACHE* file)
long tmp=get_data_size() + LOG_EVENT_HEADER_LEN;
int4store(pos, tmp);
pos += 4;
+ int4store(pos, log_seq);
+ pos += 4;
+ int2store(pos, flags);
+ pos += 2;
return (my_b_write(file, (byte*) buf, (uint) (pos - buf)));
}
@@ -115,91 +334,51 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet,
Log_event* Log_event::read_log_event(IO_CACHE* file, pthread_mutex_t* log_lock)
{
- time_t timestamp;
- uint32 server_id;
-
- char buf[LOG_EVENT_HEADER_LEN-4];
+ char head[LOG_EVENT_HEADER_LEN];
if(log_lock) pthread_mutex_lock(log_lock);
- if (my_b_read(file, (byte *) buf, sizeof(buf)))
+ if (my_b_read(file, (byte *) head, sizeof(head)))
{
if (log_lock) pthread_mutex_unlock(log_lock);
- return NULL;
- }
- timestamp = uint4korr(buf);
- server_id = uint4korr(buf + 5);
-
- switch(buf[EVENT_TYPE_OFFSET])
- {
- case QUERY_EVENT:
- {
- Query_log_event* q = new Query_log_event(file, timestamp, server_id);
- if(log_lock) pthread_mutex_unlock(log_lock);
- if (!q->query)
- {
- delete q;
- q=NULL;
- }
- return q;
- }
-
- case LOAD_EVENT:
- {
- Load_log_event* l = new Load_log_event(file, timestamp, server_id);
- if(log_lock) pthread_mutex_unlock(log_lock);
- if (!l->table_name)
- {
- delete l;
- l=NULL;
- }
- return l;
+ return 0;
}
+ uint data_len = uint4korr(head + EVENT_LEN_OFFSET);
+ char* buf = 0;
+ const char* error = 0;
+ Log_event* res = 0;
- case ROTATE_EVENT:
+ if (data_len > max_allowed_packet)
{
- Rotate_log_event* r = new Rotate_log_event(file, timestamp, server_id);
- if(log_lock) pthread_mutex_unlock(log_lock);
-
- if (!r->new_log_ident)
- {
- delete r;
- r=NULL;
- }
- return r;
+ error = "Event too big";
+ goto err;
}
- case INTVAR_EVENT:
+ if (data_len < LOG_EVENT_HEADER_LEN)
{
- Intvar_log_event* e = new Intvar_log_event(file, timestamp, server_id);
- if(log_lock) pthread_mutex_unlock(log_lock);
-
- if (e->type == INVALID_INT_EVENT)
- {
- delete e;
- e=NULL;
- }
- return e;
+ error = "Event too small";
+ goto err;
}
- case START_EVENT:
- {
- Start_log_event* e = new Start_log_event(file, timestamp, server_id);
- if(log_lock) pthread_mutex_unlock(log_lock);
- return e;
- }
- case STOP_EVENT:
- {
- Stop_log_event* e = new Stop_log_event(file, timestamp, server_id);
- if(log_lock) pthread_mutex_unlock(log_lock);
- return e;
- }
- default:
- break;
+ if (!(buf = my_malloc(data_len, MYF(MY_WME))))
+ {
+ error = "Out of memory";
+ goto err;
}
- // default
+ memcpy(buf, head, LOG_EVENT_HEADER_LEN);
+ if(my_b_read(file, (byte*) buf + LOG_EVENT_HEADER_LEN,
+ data_len - LOG_EVENT_HEADER_LEN))
+ {
+ error = "read error";
+ goto err;
+ }
+ res = read_log_event(buf, data_len);
+err:
if (log_lock) pthread_mutex_unlock(log_lock);
- return NULL;
+ if(error)
+ sql_print_error(error);
+ my_free(buf, MYF(MY_ALLOW_ZERO_PTR));
+ return res;
}
Log_event* Log_event::read_log_event(const char* buf, int event_len)
@@ -245,6 +424,17 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len)
return r;
}
+ case SLAVE_EVENT:
+ {
+ Slave_log_event* s = new Slave_log_event(buf, event_len);
+ if (!s->master_host)
+ {
+ delete s;
+ return NULL;
+ }
+
+ return s;
+ }
case START_EVENT: return new Start_log_event(buf);
case STOP_EVENT: return new Stop_log_event(buf);
case INTVAR_EVENT: return new Intvar_log_event(buf);
@@ -305,6 +495,7 @@ void Stop_log_event::print(FILE* file, bool short_form, char* last_db)
void Rotate_log_event::print(FILE* file, bool short_form, char* last_db)
{
+ char buf[22];
if (short_form)
return;
@@ -313,51 +504,25 @@ void Rotate_log_event::print(FILE* file, bool short_form, char* last_db)
if (new_log_ident)
my_fwrite(file, (byte*) new_log_ident, (uint)ident_len,
MYF(MY_NABP | MY_WME));
- fprintf(file, "\n");
+ fprintf(file, "pos=%s\n", llstr(pos, buf));
fflush(file);
}
-Rotate_log_event::Rotate_log_event(IO_CACHE* file, time_t when_arg,
- uint32 server_id):
- Log_event(when_arg, 0, 0, server_id),new_log_ident(NULL),alloced(0)
-{
- char *tmp_ident;
- char buf[4];
-
- if (my_b_read(file, (byte*) buf, sizeof(buf)))
- return;
- ulong event_len;
- event_len = uint4korr(buf);
- if (event_len < ROTATE_EVENT_OVERHEAD)
- return;
-
- ident_len = (uchar)(event_len - ROTATE_EVENT_OVERHEAD);
- if (!(tmp_ident = (char*) my_malloc((uint)ident_len, MYF(MY_WME))))
- return;
- if (my_b_read( file, (byte*) tmp_ident, (uint) ident_len))
- {
- my_free((gptr) tmp_ident, MYF(0));
- return;
- }
-
- new_log_ident = tmp_ident;
- alloced = 1;
-}
-
Start_log_event::Start_log_event(const char* buf) :Log_event(buf)
{
- buf += EVENT_LEN_OFFSET + 4; // skip even length
- binlog_version = uint2korr(buf);
- memcpy(server_version, buf + 2, sizeof(server_version));
- created = uint4korr(buf + 2 + sizeof(server_version));
+ binlog_version = uint2korr(buf + LOG_EVENT_HEADER_LEN +
+ ST_BINLOG_VER_OFFSET);
+ memcpy(server_version, buf + ST_SERVER_VER_OFFSET + LOG_EVENT_HEADER_LEN,
+ ST_SERVER_VER_LEN);
+ created = uint4korr(buf + ST_CREATED_OFFSET + LOG_EVENT_HEADER_LEN);
}
int Start_log_event::write_data(IO_CACHE* file)
{
- char buff[sizeof(server_version)+2+4];
- int2store(buff,binlog_version);
- memcpy(buff+2,server_version,sizeof(server_version));
- int4store(buff+2+sizeof(server_version),created);
+ char buff[START_HEADER_LEN];
+ int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version);
+ memcpy(buff + ST_SERVER_VER_OFFSET,server_version,ST_SERVER_VER_LEN);
+ int4store(buff + ST_CREATED_OFFSET,created);
return (my_b_write(file, (byte*) buff, sizeof(buff)) ? -1 : 0);
}
@@ -369,8 +534,10 @@ Rotate_log_event::Rotate_log_event(const char* buf, int event_len):
if(event_len < ROTATE_EVENT_OVERHEAD)
return;
+ pos = uint8korr(buf + R_POS_OFFSET + LOG_EVENT_HEADER_LEN);
ident_len = (uchar)(event_len - ROTATE_EVENT_OVERHEAD);
- if (!(new_log_ident = (char*) my_memdup((byte*) buf + LOG_EVENT_HEADER_LEN,
+ if (!(new_log_ident = (char*) my_memdup((byte*) buf + R_IDENT_OFFSET
+ + LOG_EVENT_HEADER_LEN,
(uint) ident_len, MYF(MY_WME))))
return;
@@ -379,42 +546,10 @@ Rotate_log_event::Rotate_log_event(const char* buf, int event_len):
int Rotate_log_event::write_data(IO_CACHE* file)
{
- return my_b_write(file, (byte*) new_log_ident, (uint) ident_len) ? -1 :0;
-}
-
-Query_log_event::Query_log_event(IO_CACHE* file, time_t when_arg,
- uint32 server_id):
- Log_event(when_arg,0,0,server_id),data_buf(0),query(NULL),db(NULL)
-{
- char buf[QUERY_HEADER_LEN + 4];
- ulong data_len;
- if (my_b_read(file, (byte*) buf, sizeof(buf)))
- return; // query == NULL will tell the
- // caller there was a problem
- data_len = uint4korr(buf);
- if (data_len < QUERY_EVENT_OVERHEAD)
- return; // tear-drop attack protection :)
-
- data_len -= QUERY_EVENT_OVERHEAD;
- exec_time = uint4korr(buf + 8);
- db_len = (uint)buf[12];
- error_code = uint2korr(buf + 13);
-
- /* Allocate one byte extra for end \0 */
- if (!(data_buf = (char*) my_malloc(data_len+1, MYF(MY_WME))))
- return;
- if (my_b_read( file, (byte*) data_buf, data_len))
- {
- my_free((gptr) data_buf, MYF(0));
- data_buf = 0;
- return;
- }
-
- thread_id = uint4korr(buf + 4);
- db = data_buf;
- query=data_buf + db_len + 1;
- q_len = data_len - 1 - db_len;
- *((char*) query + q_len) = 0; // Safety
+ char buf[ROTATE_HEADER_LEN];
+ int8store(buf, pos + R_POS_OFFSET);
+ return my_b_write(file, (byte*)buf, ROTATE_HEADER_LEN) ||
+ my_b_write(file, (byte*)new_log_ident, (uint) ident_len);
}
Query_log_event::Query_log_event(const char* buf, int event_len):
@@ -423,19 +558,19 @@ Query_log_event::Query_log_event(const char* buf, int event_len):
if ((uint)event_len < QUERY_EVENT_OVERHEAD)
return;
ulong data_len;
- buf += EVENT_LEN_OFFSET;
data_len = event_len - QUERY_EVENT_OVERHEAD;
+
- exec_time = uint4korr(buf + 8);
- error_code = uint2korr(buf + 13);
+ exec_time = uint4korr(buf + LOG_EVENT_HEADER_LEN + Q_EXEC_TIME_OFFSET);
+ error_code = uint2korr(buf + LOG_EVENT_HEADER_LEN + Q_ERR_CODE_OFFSET);
if (!(data_buf = (char*) my_malloc(data_len + 1, MYF(MY_WME))))
return;
- memcpy(data_buf, buf + QUERY_HEADER_LEN + 4, data_len);
- thread_id = uint4korr(buf + 4);
+ memcpy(data_buf, buf + LOG_EVENT_HEADER_LEN + Q_DATA_OFFSET, data_len);
+ thread_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + Q_THREAD_ID_OFFSET);
db = data_buf;
- db_len = (uint)buf[12];
+ db_len = (uint)buf[LOG_EVENT_HEADER_LEN + Q_DB_LEN_OFFSET];
query=data_buf + db_len + 1;
q_len = data_len - 1 - db_len;
*((char*)query+q_len) = 0;
@@ -474,44 +609,38 @@ int Query_log_event::write_data(IO_CACHE* file)
if (!query) return -1;
char buf[QUERY_HEADER_LEN];
- char* pos = buf;
- int4store(pos, thread_id);
- pos += 4;
- int4store(pos, exec_time);
- pos += 4;
- *pos++ = (char)db_len;
- int2store(pos, error_code);
- pos += 2;
+ int4store(buf + Q_THREAD_ID_OFFSET, thread_id);
+ int4store(buf + Q_EXEC_TIME_OFFSET, exec_time);
+ buf[Q_DB_LEN_OFFSET] = (char)db_len;
+ int2store(buf + Q_ERR_CODE_OFFSET, error_code);
- return (my_b_write(file, (byte*) buf, (uint)(pos - buf)) ||
+ return (my_b_write(file, (byte*) buf, QUERY_HEADER_LEN) ||
my_b_write(file, (db) ? (byte*) db : (byte*)"", db_len + 1) ||
my_b_write(file, (byte*) query, q_len)) ? -1 : 0;
}
-Intvar_log_event:: Intvar_log_event(IO_CACHE* file, time_t when_arg,
- uint32 server_id)
- :Log_event(when_arg,0,0,server_id), type(INVALID_INT_EVENT)
+Intvar_log_event::Intvar_log_event(const char* buf):Log_event(buf)
{
- char buf[9+4];
- if (!my_b_read(file, (byte*) buf, sizeof(buf)))
- {
- type = buf[4];
- val = uint8korr(buf+1+4);
- }
+ buf += LOG_EVENT_HEADER_LEN;
+ type = buf[I_TYPE_OFFSET];
+ val = uint8korr(buf+I_VAL_OFFSET);
}
-Intvar_log_event::Intvar_log_event(const char* buf):Log_event(buf)
+const char* Intvar_log_event::get_var_type_name()
{
- buf += LOG_EVENT_HEADER_LEN;
- type = buf[0];
- val = uint8korr(buf+1);
+ switch(type)
+ {
+ case LAST_INSERT_ID_EVENT: return "LAST_INSERT_ID";
+ case INSERT_ID_EVENT: return "INSERT_ID";
+ default: /* impossible */ return "UNKNOWN";
+ }
}
int Intvar_log_event::write_data(IO_CACHE* file)
{
char buf[9];
- buf[0] = type;
- int8store(buf + 1, val);
+ buf[I_TYPE_OFFSET] = type;
+ int8store(buf + I_VAL_OFFSET, val);
return my_b_write(file, (byte*) buf, sizeof(buf));
}
@@ -542,12 +671,12 @@ void Intvar_log_event::print(FILE* file, bool short_form, char* last_db)
int Load_log_event::write_data(IO_CACHE* file)
{
char buf[LOAD_HEADER_LEN];
- int4store(buf, thread_id);
- int4store(buf + 4, exec_time);
- int4store(buf + 8, skip_lines);
- buf[12] = (char)table_name_len;
- buf[13] = (char)db_len;
- int4store(buf + 14, num_fields);
+ int4store(buf + L_THREAD_ID_OFFSET, thread_id);
+ int4store(buf + L_EXEC_TIME_OFFSET, exec_time);
+ int4store(buf + L_SKIP_LINES_OFFSET, skip_lines);
+ buf[L_TBL_LEN_OFFSET] = (char)table_name_len;
+ buf[L_DB_LEN_OFFSET] = (char)db_len;
+ int4store(buf + L_NUM_FIELDS_OFFSET, num_fields);
if(my_b_write(file, (byte*)buf, sizeof(buf)) ||
my_b_write(file, (byte*)&sql_ex, sizeof(sql_ex)))
@@ -566,52 +695,33 @@ int Load_log_event::write_data(IO_CACHE* file)
return 0;
}
-Load_log_event::Load_log_event(IO_CACHE* file, time_t when, uint32 server_id):
- Log_event(when,0,0,server_id),data_buf(0),num_fields(0),
- fields(0),field_lens(0),field_block_len(0),
- table_name(0),db(0),fname(0)
-{
- char buf[LOAD_HEADER_LEN + 4];
- ulong data_len;
- if (my_b_read(file, (byte*)buf, sizeof(buf)) ||
- my_b_read(file, (byte*)&sql_ex, sizeof(sql_ex)))
- return;
-
- data_len = uint4korr(buf) - LOAD_EVENT_OVERHEAD;
- if (!(data_buf = (char*)my_malloc(data_len + 1, MYF(MY_WME))))
- return;
- if (my_b_read(file, (byte*)data_buf, data_len))
- return;
- copy_log_event(buf,data_len);
-}
-
Load_log_event::Load_log_event(const char* buf, int event_len):
Log_event(buf),data_buf(0),num_fields(0),fields(0),
field_lens(0),field_block_len(0),
table_name(0),db(0),fname(0)
{
- ulong data_len;
-
+ uint data_len;
if((uint)event_len < (LOAD_EVENT_OVERHEAD + LOG_EVENT_HEADER_LEN))
return;
- buf += EVENT_LEN_OFFSET;
- memcpy(&sql_ex, buf + LOAD_HEADER_LEN + 4, sizeof(sql_ex));
- data_len = event_len;
-
+ memcpy(&sql_ex, buf + LOAD_HEADER_LEN + LOG_EVENT_HEADER_LEN,
+ sizeof(sql_ex));
+ data_len = event_len - LOAD_HEADER_LEN - LOG_EVENT_HEADER_LEN -
+ sizeof(sql_ex);
if(!(data_buf = (char*)my_malloc(data_len + 1, MYF(MY_WME))))
return;
- memcpy(data_buf, buf + 22 + sizeof(sql_ex), data_len);
+ memcpy(data_buf, buf +LOG_EVENT_HEADER_LEN + LOAD_HEADER_LEN
+ + sizeof(sql_ex), data_len);
copy_log_event(buf, data_len);
}
void Load_log_event::copy_log_event(const char *buf, ulong data_len)
{
- thread_id = uint4korr(buf+4);
- exec_time = uint4korr(buf+8);
- skip_lines = uint4korr(buf + 12);
- table_name_len = (uint)buf[16];
- db_len = (uint)buf[17];
- num_fields = uint4korr(buf + 18);
+ thread_id = uint4korr(buf + L_THREAD_ID_OFFSET + LOG_EVENT_HEADER_LEN);
+ exec_time = uint4korr(buf + L_EXEC_TIME_OFFSET + LOG_EVENT_HEADER_LEN);
+ skip_lines = uint4korr(buf + L_SKIP_LINES_OFFSET + LOG_EVENT_HEADER_LEN);
+ table_name_len = (uint)buf[L_TBL_LEN_OFFSET + LOG_EVENT_HEADER_LEN];
+ db_len = (uint)buf[L_DB_LEN_OFFSET + LOG_EVENT_HEADER_LEN];
+ num_fields = uint4korr(buf + L_NUM_FIELDS_OFFSET + LOG_EVENT_HEADER_LEN);
if (num_fields > data_len) // simple sanity check against corruption
return;
@@ -717,6 +827,12 @@ void Load_log_event::print(FILE* file, bool short_form, char* last_db)
#ifndef MYSQL_CLIENT
+void Log_event::set_log_seq(THD* thd, MYSQL_LOG* log)
+ {
+ log_seq = (thd && thd->log_seq) ? thd->log_seq++ : log->log_seq++;
+ }
+
+
void Load_log_event::set_fields(List<Item> &fields)
{
uint i;
@@ -729,4 +845,92 @@ void Load_log_event::set_fields(List<Item> &fields)
}
+Slave_log_event::Slave_log_event(THD* thd_arg,MASTER_INFO* mi):
+ Log_event(thd_arg->start_time, 0, 1, thd_arg->server_id),
+ mem_pool(0),master_host(0)
+{
+ if(!mi->inited)
+ return;
+ pthread_mutex_lock(&mi->lock);
+ master_host_len = strlen(mi->host);
+ master_log_len = strlen(mi->log_file_name);
+ // on OOM, just do not initialize the structure and print the error
+ if((mem_pool = (char*)my_malloc(get_data_size() + 1,
+ MYF(MY_WME))))
+ {
+ master_host = mem_pool + SL_MASTER_HOST_OFFSET ;
+ memcpy(master_host, mi->host, master_host_len + 1);
+ master_log = master_host + master_host_len + 1;
+ memcpy(master_log, mi->log_file_name, master_log_len + 1);
+ master_port = mi->port;
+ master_pos = mi->pos;
+ }
+ else
+ sql_print_error("Out of memory while recording slave event");
+ pthread_mutex_unlock(&mi->lock);
+}
+
+
#endif
+
+
+Slave_log_event::~Slave_log_event()
+{
+ my_free(mem_pool, MYF(MY_ALLOW_ZERO_PTR));
+}
+
+void Slave_log_event::print(FILE* file, bool short_form = 0,
+ char* last_db = 0)
+{
+ char llbuff[22];
+ if(short_form)
+ return;
+ print_header(file);
+ fputc('\n', file);
+ fprintf(file, "Slave: master_host='%s' master_port=%d \
+ master_log=%s master_pos=%s\n", master_host, master_port, master_log,
+ llstr(master_pos, llbuff));
+}
+
+int Slave_log_event::get_data_size()
+{
+ return master_host_len + master_log_len + 1 + SL_MASTER_HOST_OFFSET;
+}
+
+int Slave_log_event::write_data(IO_CACHE* file)
+{
+ int8store(mem_pool + SL_MASTER_POS_OFFSET, master_pos);
+ int2store(mem_pool + SL_MASTER_PORT_OFFSET, master_port);
+ // log and host are already there
+ return my_b_write(file, (byte*)mem_pool, get_data_size());
+}
+
+void Slave_log_event::init_from_mem_pool(int data_size)
+{
+ master_pos = uint8korr(mem_pool + SL_MASTER_POS_OFFSET);
+ master_port = uint2korr(mem_pool + SL_MASTER_PORT_OFFSET);
+ master_host = mem_pool + SL_MASTER_HOST_OFFSET;
+ master_host_len = strlen(master_host);
+ // safety
+ master_log = master_host + master_host_len + 1;
+ if(master_log > mem_pool + data_size)
+ {
+ master_host = 0;
+ return;
+ }
+
+ master_log_len = strlen(master_log);
+}
+
+Slave_log_event::Slave_log_event(const char* buf, int event_len):
+ Log_event(buf),mem_pool(0),master_host(0)
+{
+ event_len -= LOG_EVENT_HEADER_LEN;
+ if(event_len < 0)
+ return;
+ if(!(mem_pool = (char*)my_malloc(event_len + 1, MYF(MY_WME))))
+ return;
+ memcpy(mem_pool, buf + LOG_EVENT_HEADER_LEN, event_len);
+ mem_pool[event_len] = 0;
+ init_from_mem_pool(event_len);
+}