diff options
Diffstat (limited to 'sql/log_event.cc')
-rw-r--r-- | sql/log_event.cc | 628 |
1 files changed, 416 insertions, 212 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc index ac985c266c8..5538e6c0b7f 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -20,6 +20,7 @@ #pragma implementation // gcc: Class implementation #endif #include "mysql_priv.h" +#include "slave.h" #endif /* MYSQL_CLIENT */ @@ -31,6 +32,7 @@ static void pretty_print_char(FILE* file, int c) case '\r': fprintf(file, "\\r"); break; case '\\': fprintf(file, "\\\\"); break; case '\b': fprintf(file, "\\b"); break; + case '\t': fprintf(file, "\\t"); break; case '\'': fprintf(file, "\\'"); break; case 0 : fprintf(file, "\\0"); break; default: @@ -40,6 +42,220 @@ static void pretty_print_char(FILE* file, int c) fputc('\'', file); } +#ifndef MYSQL_CLIENT + +static void pretty_print_char(String* packet, int c) +{ + packet->append('\''); + switch(c) { + case '\n': packet->append( "\\n"); break; + case '\r': packet->append( "\\r"); break; + case '\\': packet->append( "\\\\"); break; + case '\b': packet->append( "\\b"); break; + case '\t': packet->append( "\\t"); break; + case '\'': packet->append( "\\'"); break; + case 0 : packet->append( "\\0"); break; + default: + packet->append((char)c); + break; + } + packet->append('\''); +} + +#endif + +const char* Log_event::get_type_str() +{ + switch(get_type_code()) + { + case START_EVENT: return "Start"; + case STOP_EVENT: return "Stop"; + case QUERY_EVENT: return "Query"; + case ROTATE_EVENT: return "Rotate"; + case INTVAR_EVENT: return "Intvar"; + case LOAD_EVENT: return "Load"; + case SLAVE_EVENT: return "Slave"; + default: /* impossible */ return "Unknown"; + } +} + +#ifndef MYSQL_CLIENT + +void Log_event::pack_info(String* packet) +{ + net_store_data(packet, "", 0); +} + +void Query_log_event::pack_info(String* packet) +{ + String tmp; + if(db && db_len) + { + tmp.append("use "); + tmp.append(db, db_len); + tmp.append("; ", 2); + } + + if(query && q_len) + tmp.append(query, q_len); + net_store_data(packet, (char*)tmp.ptr(), tmp.length()); +} + +void Start_log_event::pack_info(String* packet) +{ + String tmp; + char buf[22]; + + tmp.append("Server ver: "); + tmp.append(server_version); + tmp.append(", Binlog ver: "); + tmp.append(llstr(binlog_version, buf)); + net_store_data(packet, tmp.ptr(), tmp.length()); +} + +void Load_log_event::pack_info(String* packet) +{ + String tmp; + if(db && db_len) + { + tmp.append("use "); + tmp.append(db, db_len); + tmp.append("; ", 2); + } + + tmp.append("LOAD DATA INFILE '"); + tmp.append(fname); + tmp.append("' ", 2); + if(sql_ex.opt_flags && REPLACE_FLAG ) + tmp.append(" REPLACE "); + else if(sql_ex.opt_flags && IGNORE_FLAG ) + tmp.append(" IGNORE "); + + tmp.append("INTO TABLE "); + tmp.append(table_name); + if (!(sql_ex.empty_flags & FIELD_TERM_EMPTY)) + { + tmp.append(" FIELDS TERMINATED BY "); + pretty_print_char(&tmp, sql_ex.field_term); + } + + if (!(sql_ex.empty_flags & ENCLOSED_EMPTY)) + { + if (sql_ex.opt_flags && OPT_ENCLOSED_FLAG ) + tmp.append(" OPTIONALLY "); + tmp.append( " ENCLOSED BY "); + pretty_print_char(&tmp, sql_ex.enclosed); + } + + if (!(sql_ex.empty_flags & ESCAPED_EMPTY)) + { + tmp.append( " ESCAPED BY "); + pretty_print_char(&tmp, sql_ex.escaped); + } + + if (!(sql_ex.empty_flags & LINE_TERM_EMPTY)) + { + tmp.append(" LINES TERMINATED BY "); + pretty_print_char(&tmp, sql_ex.line_term); + } + + if (!(sql_ex.empty_flags & LINE_START_EMPTY)) + { + tmp.append(" LINES STARTING BY "); + pretty_print_char(&tmp, sql_ex.line_start); + } + + if ((int)skip_lines > 0) + tmp.append( " IGNORE %ld LINES ", (long) skip_lines); + + if (num_fields) + { + uint i; + const char* field = fields; + tmp.append(" ("); + for(i = 0; i < num_fields; i++) + { + if(i) + tmp.append(" ,"); + tmp.append( field); + + field += field_lens[i] + 1; + } + tmp.append(')'); + } + + net_store_data(packet, tmp.ptr(), tmp.length()); +} + +void Rotate_log_event::pack_info(String* packet) +{ + String tmp; + char buf[22]; + tmp.append(new_log_ident, ident_len); + tmp.append(";pos="); + tmp.append(llstr(pos,buf)); + if(flags & LOG_EVENT_FORCED_ROTATE_F) + tmp.append("; forced by master"); + net_store_data(packet, tmp.ptr(), tmp.length()); +} + +void Intvar_log_event::pack_info(String* packet) +{ + String tmp; + char buf[22]; + tmp.append(get_var_type_name()); + tmp.append('='); + tmp.append(llstr(val, buf)); + net_store_data(packet, tmp.ptr(), tmp.length()); +} + +void Slave_log_event::pack_info(String* packet) +{ + String tmp; + char buf[22]; + tmp.append("host="); + tmp.append(master_host); + tmp.append(",port="); + tmp.append(llstr(master_port,buf)); + tmp.append(",log="); + tmp.append(master_log); + tmp.append(",pos="); + tmp.append(llstr(master_pos,buf)); + net_store_data(packet, tmp.ptr(), tmp.length()); +} + + +void Log_event::init_show_field_list(List<Item>* field_list) +{ + field_list->push_back(new Item_empty_string("Log_name", 20)); + field_list->push_back(new Item_empty_string("Pos", 20)); + field_list->push_back(new Item_empty_string("Event_type", 20)); + field_list->push_back(new Item_empty_string("Server_id", 20)); + field_list->push_back(new Item_empty_string("Log_seq", 20)); + field_list->push_back(new Item_empty_string("Info", 20)); +} + +int Log_event::net_send(THD* thd, const char* log_name, ulong pos) +{ + String* packet = &thd->packet; + const char* p = strrchr(log_name, FN_LIBCHAR); + const char* event_type; + if (p) + log_name = p + 1; + + packet->length(0); + net_store_data(packet, log_name, strlen(log_name)); + net_store_data(packet, (longlong)pos); + event_type = get_type_str(); + net_store_data(packet, event_type, strlen(event_type)); + net_store_data(packet, server_id); + net_store_data(packet, log_seq); + pack_info(packet); + return my_net_write(&thd->net, (char*)packet->ptr(), packet->length()); +} + +#endif + int Query_log_event::write(IO_CACHE* file) { return query ? Log_event::write(file) : -1; @@ -52,7 +268,6 @@ int Log_event::write(IO_CACHE* file) int Log_event::write_header(IO_CACHE* file) { - // make sure to change this when the header gets bigger char buf[LOG_EVENT_HEADER_LEN]; char* pos = buf; int4store(pos, when); // timestamp @@ -63,6 +278,10 @@ int Log_event::write_header(IO_CACHE* file) long tmp=get_data_size() + LOG_EVENT_HEADER_LEN; int4store(pos, tmp); pos += 4; + int4store(pos, log_seq); + pos += 4; + int2store(pos, flags); + pos += 2; return (my_b_write(file, (byte*) buf, (uint) (pos - buf))); } @@ -115,91 +334,51 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet, Log_event* Log_event::read_log_event(IO_CACHE* file, pthread_mutex_t* log_lock) { - time_t timestamp; - uint32 server_id; - - char buf[LOG_EVENT_HEADER_LEN-4]; + char head[LOG_EVENT_HEADER_LEN]; if(log_lock) pthread_mutex_lock(log_lock); - if (my_b_read(file, (byte *) buf, sizeof(buf))) + if (my_b_read(file, (byte *) head, sizeof(head))) { if (log_lock) pthread_mutex_unlock(log_lock); - return NULL; - } - timestamp = uint4korr(buf); - server_id = uint4korr(buf + 5); - - switch(buf[EVENT_TYPE_OFFSET]) - { - case QUERY_EVENT: - { - Query_log_event* q = new Query_log_event(file, timestamp, server_id); - if(log_lock) pthread_mutex_unlock(log_lock); - if (!q->query) - { - delete q; - q=NULL; - } - return q; - } - - case LOAD_EVENT: - { - Load_log_event* l = new Load_log_event(file, timestamp, server_id); - if(log_lock) pthread_mutex_unlock(log_lock); - if (!l->table_name) - { - delete l; - l=NULL; - } - return l; + return 0; } + uint data_len = uint4korr(head + EVENT_LEN_OFFSET); + char* buf = 0; + const char* error = 0; + Log_event* res = 0; - case ROTATE_EVENT: + if (data_len > max_allowed_packet) { - Rotate_log_event* r = new Rotate_log_event(file, timestamp, server_id); - if(log_lock) pthread_mutex_unlock(log_lock); - - if (!r->new_log_ident) - { - delete r; - r=NULL; - } - return r; + error = "Event too big"; + goto err; } - case INTVAR_EVENT: + if (data_len < LOG_EVENT_HEADER_LEN) { - Intvar_log_event* e = new Intvar_log_event(file, timestamp, server_id); - if(log_lock) pthread_mutex_unlock(log_lock); - - if (e->type == INVALID_INT_EVENT) - { - delete e; - e=NULL; - } - return e; + error = "Event too small"; + goto err; } - case START_EVENT: - { - Start_log_event* e = new Start_log_event(file, timestamp, server_id); - if(log_lock) pthread_mutex_unlock(log_lock); - return e; - } - case STOP_EVENT: - { - Stop_log_event* e = new Stop_log_event(file, timestamp, server_id); - if(log_lock) pthread_mutex_unlock(log_lock); - return e; - } - default: - break; + if (!(buf = my_malloc(data_len, MYF(MY_WME)))) + { + error = "Out of memory"; + goto err; } - // default + memcpy(buf, head, LOG_EVENT_HEADER_LEN); + if(my_b_read(file, (byte*) buf + LOG_EVENT_HEADER_LEN, + data_len - LOG_EVENT_HEADER_LEN)) + { + error = "read error"; + goto err; + } + res = read_log_event(buf, data_len); +err: if (log_lock) pthread_mutex_unlock(log_lock); - return NULL; + if(error) + sql_print_error(error); + my_free(buf, MYF(MY_ALLOW_ZERO_PTR)); + return res; } Log_event* Log_event::read_log_event(const char* buf, int event_len) @@ -245,6 +424,17 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len) return r; } + case SLAVE_EVENT: + { + Slave_log_event* s = new Slave_log_event(buf, event_len); + if (!s->master_host) + { + delete s; + return NULL; + } + + return s; + } case START_EVENT: return new Start_log_event(buf); case STOP_EVENT: return new Stop_log_event(buf); case INTVAR_EVENT: return new Intvar_log_event(buf); @@ -305,6 +495,7 @@ void Stop_log_event::print(FILE* file, bool short_form, char* last_db) void Rotate_log_event::print(FILE* file, bool short_form, char* last_db) { + char buf[22]; if (short_form) return; @@ -313,51 +504,25 @@ void Rotate_log_event::print(FILE* file, bool short_form, char* last_db) if (new_log_ident) my_fwrite(file, (byte*) new_log_ident, (uint)ident_len, MYF(MY_NABP | MY_WME)); - fprintf(file, "\n"); + fprintf(file, "pos=%s\n", llstr(pos, buf)); fflush(file); } -Rotate_log_event::Rotate_log_event(IO_CACHE* file, time_t when_arg, - uint32 server_id): - Log_event(when_arg, 0, 0, server_id),new_log_ident(NULL),alloced(0) -{ - char *tmp_ident; - char buf[4]; - - if (my_b_read(file, (byte*) buf, sizeof(buf))) - return; - ulong event_len; - event_len = uint4korr(buf); - if (event_len < ROTATE_EVENT_OVERHEAD) - return; - - ident_len = (uchar)(event_len - ROTATE_EVENT_OVERHEAD); - if (!(tmp_ident = (char*) my_malloc((uint)ident_len, MYF(MY_WME)))) - return; - if (my_b_read( file, (byte*) tmp_ident, (uint) ident_len)) - { - my_free((gptr) tmp_ident, MYF(0)); - return; - } - - new_log_ident = tmp_ident; - alloced = 1; -} - Start_log_event::Start_log_event(const char* buf) :Log_event(buf) { - buf += EVENT_LEN_OFFSET + 4; // skip even length - binlog_version = uint2korr(buf); - memcpy(server_version, buf + 2, sizeof(server_version)); - created = uint4korr(buf + 2 + sizeof(server_version)); + binlog_version = uint2korr(buf + LOG_EVENT_HEADER_LEN + + ST_BINLOG_VER_OFFSET); + memcpy(server_version, buf + ST_SERVER_VER_OFFSET + LOG_EVENT_HEADER_LEN, + ST_SERVER_VER_LEN); + created = uint4korr(buf + ST_CREATED_OFFSET + LOG_EVENT_HEADER_LEN); } int Start_log_event::write_data(IO_CACHE* file) { - char buff[sizeof(server_version)+2+4]; - int2store(buff,binlog_version); - memcpy(buff+2,server_version,sizeof(server_version)); - int4store(buff+2+sizeof(server_version),created); + char buff[START_HEADER_LEN]; + int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version); + memcpy(buff + ST_SERVER_VER_OFFSET,server_version,ST_SERVER_VER_LEN); + int4store(buff + ST_CREATED_OFFSET,created); return (my_b_write(file, (byte*) buff, sizeof(buff)) ? -1 : 0); } @@ -369,8 +534,10 @@ Rotate_log_event::Rotate_log_event(const char* buf, int event_len): if(event_len < ROTATE_EVENT_OVERHEAD) return; + pos = uint8korr(buf + R_POS_OFFSET + LOG_EVENT_HEADER_LEN); ident_len = (uchar)(event_len - ROTATE_EVENT_OVERHEAD); - if (!(new_log_ident = (char*) my_memdup((byte*) buf + LOG_EVENT_HEADER_LEN, + if (!(new_log_ident = (char*) my_memdup((byte*) buf + R_IDENT_OFFSET + + LOG_EVENT_HEADER_LEN, (uint) ident_len, MYF(MY_WME)))) return; @@ -379,42 +546,10 @@ Rotate_log_event::Rotate_log_event(const char* buf, int event_len): int Rotate_log_event::write_data(IO_CACHE* file) { - return my_b_write(file, (byte*) new_log_ident, (uint) ident_len) ? -1 :0; -} - -Query_log_event::Query_log_event(IO_CACHE* file, time_t when_arg, - uint32 server_id): - Log_event(when_arg,0,0,server_id),data_buf(0),query(NULL),db(NULL) -{ - char buf[QUERY_HEADER_LEN + 4]; - ulong data_len; - if (my_b_read(file, (byte*) buf, sizeof(buf))) - return; // query == NULL will tell the - // caller there was a problem - data_len = uint4korr(buf); - if (data_len < QUERY_EVENT_OVERHEAD) - return; // tear-drop attack protection :) - - data_len -= QUERY_EVENT_OVERHEAD; - exec_time = uint4korr(buf + 8); - db_len = (uint)buf[12]; - error_code = uint2korr(buf + 13); - - /* Allocate one byte extra for end \0 */ - if (!(data_buf = (char*) my_malloc(data_len+1, MYF(MY_WME)))) - return; - if (my_b_read( file, (byte*) data_buf, data_len)) - { - my_free((gptr) data_buf, MYF(0)); - data_buf = 0; - return; - } - - thread_id = uint4korr(buf + 4); - db = data_buf; - query=data_buf + db_len + 1; - q_len = data_len - 1 - db_len; - *((char*) query + q_len) = 0; // Safety + char buf[ROTATE_HEADER_LEN]; + int8store(buf, pos + R_POS_OFFSET); + return my_b_write(file, (byte*)buf, ROTATE_HEADER_LEN) || + my_b_write(file, (byte*)new_log_ident, (uint) ident_len); } Query_log_event::Query_log_event(const char* buf, int event_len): @@ -423,19 +558,19 @@ Query_log_event::Query_log_event(const char* buf, int event_len): if ((uint)event_len < QUERY_EVENT_OVERHEAD) return; ulong data_len; - buf += EVENT_LEN_OFFSET; data_len = event_len - QUERY_EVENT_OVERHEAD; + - exec_time = uint4korr(buf + 8); - error_code = uint2korr(buf + 13); + exec_time = uint4korr(buf + LOG_EVENT_HEADER_LEN + Q_EXEC_TIME_OFFSET); + error_code = uint2korr(buf + LOG_EVENT_HEADER_LEN + Q_ERR_CODE_OFFSET); if (!(data_buf = (char*) my_malloc(data_len + 1, MYF(MY_WME)))) return; - memcpy(data_buf, buf + QUERY_HEADER_LEN + 4, data_len); - thread_id = uint4korr(buf + 4); + memcpy(data_buf, buf + LOG_EVENT_HEADER_LEN + Q_DATA_OFFSET, data_len); + thread_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + Q_THREAD_ID_OFFSET); db = data_buf; - db_len = (uint)buf[12]; + db_len = (uint)buf[LOG_EVENT_HEADER_LEN + Q_DB_LEN_OFFSET]; query=data_buf + db_len + 1; q_len = data_len - 1 - db_len; *((char*)query+q_len) = 0; @@ -474,44 +609,38 @@ int Query_log_event::write_data(IO_CACHE* file) if (!query) return -1; char buf[QUERY_HEADER_LEN]; - char* pos = buf; - int4store(pos, thread_id); - pos += 4; - int4store(pos, exec_time); - pos += 4; - *pos++ = (char)db_len; - int2store(pos, error_code); - pos += 2; + int4store(buf + Q_THREAD_ID_OFFSET, thread_id); + int4store(buf + Q_EXEC_TIME_OFFSET, exec_time); + buf[Q_DB_LEN_OFFSET] = (char)db_len; + int2store(buf + Q_ERR_CODE_OFFSET, error_code); - return (my_b_write(file, (byte*) buf, (uint)(pos - buf)) || + return (my_b_write(file, (byte*) buf, QUERY_HEADER_LEN) || my_b_write(file, (db) ? (byte*) db : (byte*)"", db_len + 1) || my_b_write(file, (byte*) query, q_len)) ? -1 : 0; } -Intvar_log_event:: Intvar_log_event(IO_CACHE* file, time_t when_arg, - uint32 server_id) - :Log_event(when_arg,0,0,server_id), type(INVALID_INT_EVENT) +Intvar_log_event::Intvar_log_event(const char* buf):Log_event(buf) { - char buf[9+4]; - if (!my_b_read(file, (byte*) buf, sizeof(buf))) - { - type = buf[4]; - val = uint8korr(buf+1+4); - } + buf += LOG_EVENT_HEADER_LEN; + type = buf[I_TYPE_OFFSET]; + val = uint8korr(buf+I_VAL_OFFSET); } -Intvar_log_event::Intvar_log_event(const char* buf):Log_event(buf) +const char* Intvar_log_event::get_var_type_name() { - buf += LOG_EVENT_HEADER_LEN; - type = buf[0]; - val = uint8korr(buf+1); + switch(type) + { + case LAST_INSERT_ID_EVENT: return "LAST_INSERT_ID"; + case INSERT_ID_EVENT: return "INSERT_ID"; + default: /* impossible */ return "UNKNOWN"; + } } int Intvar_log_event::write_data(IO_CACHE* file) { char buf[9]; - buf[0] = type; - int8store(buf + 1, val); + buf[I_TYPE_OFFSET] = type; + int8store(buf + I_VAL_OFFSET, val); return my_b_write(file, (byte*) buf, sizeof(buf)); } @@ -542,12 +671,12 @@ void Intvar_log_event::print(FILE* file, bool short_form, char* last_db) int Load_log_event::write_data(IO_CACHE* file) { char buf[LOAD_HEADER_LEN]; - int4store(buf, thread_id); - int4store(buf + 4, exec_time); - int4store(buf + 8, skip_lines); - buf[12] = (char)table_name_len; - buf[13] = (char)db_len; - int4store(buf + 14, num_fields); + int4store(buf + L_THREAD_ID_OFFSET, thread_id); + int4store(buf + L_EXEC_TIME_OFFSET, exec_time); + int4store(buf + L_SKIP_LINES_OFFSET, skip_lines); + buf[L_TBL_LEN_OFFSET] = (char)table_name_len; + buf[L_DB_LEN_OFFSET] = (char)db_len; + int4store(buf + L_NUM_FIELDS_OFFSET, num_fields); if(my_b_write(file, (byte*)buf, sizeof(buf)) || my_b_write(file, (byte*)&sql_ex, sizeof(sql_ex))) @@ -566,52 +695,33 @@ int Load_log_event::write_data(IO_CACHE* file) return 0; } -Load_log_event::Load_log_event(IO_CACHE* file, time_t when, uint32 server_id): - Log_event(when,0,0,server_id),data_buf(0),num_fields(0), - fields(0),field_lens(0),field_block_len(0), - table_name(0),db(0),fname(0) -{ - char buf[LOAD_HEADER_LEN + 4]; - ulong data_len; - if (my_b_read(file, (byte*)buf, sizeof(buf)) || - my_b_read(file, (byte*)&sql_ex, sizeof(sql_ex))) - return; - - data_len = uint4korr(buf) - LOAD_EVENT_OVERHEAD; - if (!(data_buf = (char*)my_malloc(data_len + 1, MYF(MY_WME)))) - return; - if (my_b_read(file, (byte*)data_buf, data_len)) - return; - copy_log_event(buf,data_len); -} - Load_log_event::Load_log_event(const char* buf, int event_len): Log_event(buf),data_buf(0),num_fields(0),fields(0), field_lens(0),field_block_len(0), table_name(0),db(0),fname(0) { - ulong data_len; - + uint data_len; if((uint)event_len < (LOAD_EVENT_OVERHEAD + LOG_EVENT_HEADER_LEN)) return; - buf += EVENT_LEN_OFFSET; - memcpy(&sql_ex, buf + LOAD_HEADER_LEN + 4, sizeof(sql_ex)); - data_len = event_len; - + memcpy(&sql_ex, buf + LOAD_HEADER_LEN + LOG_EVENT_HEADER_LEN, + sizeof(sql_ex)); + data_len = event_len - LOAD_HEADER_LEN - LOG_EVENT_HEADER_LEN - + sizeof(sql_ex); if(!(data_buf = (char*)my_malloc(data_len + 1, MYF(MY_WME)))) return; - memcpy(data_buf, buf + 22 + sizeof(sql_ex), data_len); + memcpy(data_buf, buf +LOG_EVENT_HEADER_LEN + LOAD_HEADER_LEN + + sizeof(sql_ex), data_len); copy_log_event(buf, data_len); } void Load_log_event::copy_log_event(const char *buf, ulong data_len) { - thread_id = uint4korr(buf+4); - exec_time = uint4korr(buf+8); - skip_lines = uint4korr(buf + 12); - table_name_len = (uint)buf[16]; - db_len = (uint)buf[17]; - num_fields = uint4korr(buf + 18); + thread_id = uint4korr(buf + L_THREAD_ID_OFFSET + LOG_EVENT_HEADER_LEN); + exec_time = uint4korr(buf + L_EXEC_TIME_OFFSET + LOG_EVENT_HEADER_LEN); + skip_lines = uint4korr(buf + L_SKIP_LINES_OFFSET + LOG_EVENT_HEADER_LEN); + table_name_len = (uint)buf[L_TBL_LEN_OFFSET + LOG_EVENT_HEADER_LEN]; + db_len = (uint)buf[L_DB_LEN_OFFSET + LOG_EVENT_HEADER_LEN]; + num_fields = uint4korr(buf + L_NUM_FIELDS_OFFSET + LOG_EVENT_HEADER_LEN); if (num_fields > data_len) // simple sanity check against corruption return; @@ -717,6 +827,12 @@ void Load_log_event::print(FILE* file, bool short_form, char* last_db) #ifndef MYSQL_CLIENT +void Log_event::set_log_seq(THD* thd, MYSQL_LOG* log) + { + log_seq = (thd && thd->log_seq) ? thd->log_seq++ : log->log_seq++; + } + + void Load_log_event::set_fields(List<Item> &fields) { uint i; @@ -729,4 +845,92 @@ void Load_log_event::set_fields(List<Item> &fields) } +Slave_log_event::Slave_log_event(THD* thd_arg,MASTER_INFO* mi): + Log_event(thd_arg->start_time, 0, 1, thd_arg->server_id), + mem_pool(0),master_host(0) +{ + if(!mi->inited) + return; + pthread_mutex_lock(&mi->lock); + master_host_len = strlen(mi->host); + master_log_len = strlen(mi->log_file_name); + // on OOM, just do not initialize the structure and print the error + if((mem_pool = (char*)my_malloc(get_data_size() + 1, + MYF(MY_WME)))) + { + master_host = mem_pool + SL_MASTER_HOST_OFFSET ; + memcpy(master_host, mi->host, master_host_len + 1); + master_log = master_host + master_host_len + 1; + memcpy(master_log, mi->log_file_name, master_log_len + 1); + master_port = mi->port; + master_pos = mi->pos; + } + else + sql_print_error("Out of memory while recording slave event"); + pthread_mutex_unlock(&mi->lock); +} + + #endif + + +Slave_log_event::~Slave_log_event() +{ + my_free(mem_pool, MYF(MY_ALLOW_ZERO_PTR)); +} + +void Slave_log_event::print(FILE* file, bool short_form = 0, + char* last_db = 0) +{ + char llbuff[22]; + if(short_form) + return; + print_header(file); + fputc('\n', file); + fprintf(file, "Slave: master_host='%s' master_port=%d \ + master_log=%s master_pos=%s\n", master_host, master_port, master_log, + llstr(master_pos, llbuff)); +} + +int Slave_log_event::get_data_size() +{ + return master_host_len + master_log_len + 1 + SL_MASTER_HOST_OFFSET; +} + +int Slave_log_event::write_data(IO_CACHE* file) +{ + int8store(mem_pool + SL_MASTER_POS_OFFSET, master_pos); + int2store(mem_pool + SL_MASTER_PORT_OFFSET, master_port); + // log and host are already there + return my_b_write(file, (byte*)mem_pool, get_data_size()); +} + +void Slave_log_event::init_from_mem_pool(int data_size) +{ + master_pos = uint8korr(mem_pool + SL_MASTER_POS_OFFSET); + master_port = uint2korr(mem_pool + SL_MASTER_PORT_OFFSET); + master_host = mem_pool + SL_MASTER_HOST_OFFSET; + master_host_len = strlen(master_host); + // safety + master_log = master_host + master_host_len + 1; + if(master_log > mem_pool + data_size) + { + master_host = 0; + return; + } + + master_log_len = strlen(master_log); +} + +Slave_log_event::Slave_log_event(const char* buf, int event_len): + Log_event(buf),mem_pool(0),master_host(0) +{ + event_len -= LOG_EVENT_HEADER_LEN; + if(event_len < 0) + return; + if(!(mem_pool = (char*)my_malloc(event_len + 1, MYF(MY_WME)))) + return; + memcpy(mem_pool, buf + LOG_EVENT_HEADER_LEN, event_len); + mem_pool[event_len] = 0; + init_from_mem_pool(event_len); +} |