diff options
Diffstat (limited to 'sql/log_event.cc')
-rw-r--r-- | sql/log_event.cc | 491 |
1 files changed, 259 insertions, 232 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc index 0d66185c8dd..5ff2362e9db 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -31,9 +31,9 @@ inline int my_b_safe_write(IO_CACHE* file, const byte *buf, { /* Sasha: We are not writing this with the ? operator to avoid hitting - a possible compiler bug. At least gcc 2.95 cannot deal with - several layers of ternary operators that evaluated comma(,) operator - expressions inside - I do have a test case if somebody wants it + a possible compiler bug. At least gcc 2.95 cannot deal with + several layers of ternary operators that evaluated comma(,) operator + expressions inside - I do have a test case if somebody wants it */ if (file->type == SEQ_READ_APPEND) return my_b_append(file, buf,len); @@ -80,7 +80,7 @@ static void pretty_print_str(String* packet, char* str, int len) while (str < end) { char c; - switch((c=*str++)) { + switch ((c=*str++)) { case '\n': packet->append( "\\n"); break; case '\r': packet->append( "\\r"); break; case '\\': packet->append( "\\\\"); break; @@ -113,8 +113,7 @@ static inline char* slave_load_file_stem(char*buf, uint file_id, const char* Log_event::get_type_str() { - switch(get_type_code()) - { + switch(get_type_code()) { case START_EVENT: return "Start"; case STOP_EVENT: return "Stop"; case QUERY_EVENT: return "Query"; @@ -132,10 +131,9 @@ const char* Log_event::get_type_str() } #ifndef MYSQL_CLIENT -Log_event::Log_event(THD* thd_arg, uint16 flags_arg): - exec_time(0), - flags(flags_arg),cached_event_len(0), - temp_buf(0),thd(thd_arg) +Log_event::Log_event(THD* thd_arg, uint16 flags_arg) + :exec_time(0), flags(flags_arg), cached_event_len(0), + temp_buf(0), thd(thd_arg) { if (thd) { @@ -151,6 +149,7 @@ Log_event::Log_event(THD* thd_arg, uint16 flags_arg): } } + static void cleanup_load_tmpdir() { MY_DIR *dirp; @@ -163,7 +162,7 @@ static void cleanup_load_tmpdir() { file=dirp->dir_entry+i; if (is_prefix(file->name,"SQL_LOAD-")) - my_delete(file->name,MYF(0)); + my_delete(file->name, MYF(0)); } my_dirend(dirp); @@ -171,8 +170,8 @@ static void cleanup_load_tmpdir() #endif -Log_event::Log_event(const char* buf, bool old_format): - cached_event_len(0),temp_buf(0) +Log_event::Log_event(const char* buf, bool old_format) + :cached_event_len(0), temp_buf(0) { when = uint4korr(buf); server_id = uint4korr(buf + SERVER_ID_OFFSET); @@ -303,7 +302,7 @@ void Load_log_event::pack_info(String* packet) uint i; const char* field = fields; tmp.append(" ("); - for(i = 0; i < num_fields; i++) + for (i = 0; i < num_fields; i++) { if (i) tmp.append(" ,"); @@ -319,10 +318,9 @@ void Load_log_event::pack_info(String* packet) void Rotate_log_event::pack_info(String* packet) { - char buf1[256]; + char buf1[256], buf[22]; String tmp(buf1, sizeof(buf1)); tmp.length(0); - char buf[22]; tmp.append(new_log_ident, ident_len); tmp.append(";pos="); tmp.append(llstr(pos,buf)); @@ -333,10 +331,9 @@ void Rotate_log_event::pack_info(String* packet) void Intvar_log_event::pack_info(String* packet) { - char buf1[256]; + char buf1[256], buf[22]; String tmp(buf1, sizeof(buf1)); tmp.length(0); - char buf[22]; tmp.append(get_var_type_name()); tmp.append('='); tmp.append(llstr(val, buf)); @@ -345,14 +342,14 @@ void Intvar_log_event::pack_info(String* packet) void Slave_log_event::pack_info(String* packet) { - char buf1[256]; + char buf1[256], buf[22], *end; String tmp(buf1, sizeof(buf1)); tmp.length(0); - char buf[22]; tmp.append("host="); tmp.append(master_host); tmp.append(",port="); - tmp.append(llstr(master_port,buf)); + end= int10_to_str((long) master_port, buf, 10); + tmp.append(buf, (uint32) (end-buf)); tmp.append(",log="); tmp.append(master_log); tmp.append(",pos="); @@ -390,18 +387,21 @@ int Log_event::net_send(THD* thd, const char* log_name, my_off_t pos) return my_net_write(&thd->net, (char*) packet->ptr(), packet->length()); } -#endif +#endif /* MYSQL_CLIENT */ + int Query_log_event::write(IO_CACHE* file) { return query ? Log_event::write(file) : -1; } + int Log_event::write(IO_CACHE* file) { return (write_header(file) || write_data(file)) ? -1 : 0; } + int Log_event::write_header(IO_CACHE* file) { char buf[LOG_EVENT_HEADER_LEN]; @@ -427,54 +427,61 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet, pthread_mutex_t* log_lock) { ulong data_len; + int result=0; char buf[LOG_EVENT_HEADER_LEN]; + if (log_lock) pthread_mutex_lock(log_lock); if (my_b_read(file, (byte*) buf, sizeof(buf))) { - if (log_lock) pthread_mutex_unlock(log_lock); - // if the read hits eof, we must report it as eof - // so the caller will know it can go into cond_wait to be woken up - // on the next update to the log - if (!file->error) return LOG_READ_EOF; - return file->error > 0 ? LOG_READ_TRUNC: LOG_READ_IO; + /* + If the read hits eof, we must report it as eof so the caller + will know it can go into cond_wait to be woken up on the next + update to the log. + */ + if (!file->error) + result= LOG_READ_EOF; + else + result= (file->error > 0 ? LOG_READ_TRUNC: LOG_READ_IO); + goto end; } - data_len = uint4korr(buf + EVENT_LEN_OFFSET); + data_len= uint4korr(buf + EVENT_LEN_OFFSET); if (data_len < LOG_EVENT_HEADER_LEN || data_len > max_allowed_packet) { - if (log_lock) pthread_mutex_unlock(log_lock); - return (data_len < LOG_EVENT_HEADER_LEN) ? LOG_READ_BOGUS : - LOG_READ_TOO_LARGE; + + result= ((data_len < LOG_EVENT_HEADER_LEN) ? LOG_READ_BOGUS : + LOG_READ_TOO_LARGE); + goto end; } packet->append(buf, sizeof(buf)); - data_len -= LOG_EVENT_HEADER_LEN; + data_len-= LOG_EVENT_HEADER_LEN; if (data_len) { if (packet->append(file, data_len)) { - if (log_lock) - pthread_mutex_unlock(log_lock); - // here we should never hit eof in a non-error condtion - // eof means we are reading the event partially, which should - // never happen - return file->error >= 0 ? LOG_READ_TRUNC: LOG_READ_IO; + /* + Here we should never hit eof in a non-error condtion + eof means we are reading the event partially, which should + never happen. + */ + result= file->error >= 0 ? LOG_READ_TRUNC: LOG_READ_IO; + /* Implicit goto end; */ } } - if (log_lock) pthread_mutex_unlock(log_lock); - return 0; + +end: + if (log_lock) + pthread_mutex_unlock(log_lock); + return result; } #endif // MYSQL_CLIENT #ifndef MYSQL_CLIENT #define UNLOCK_MUTEX if (log_lock) pthread_mutex_unlock(log_lock); -#else -#define UNLOCK_MUTEX -#endif - -#ifndef MYSQL_CLIENT #define LOCK_MUTEX if (log_lock) pthread_mutex_lock(log_lock); #else +#define UNLOCK_MUTEX #define LOCK_MUTEX #endif @@ -488,19 +495,19 @@ Log_event* Log_event::read_log_event(IO_CACHE* file, bool old_format) #endif { char head[LOG_EVENT_HEADER_LEN]; - uint header_size = old_format ? OLD_HEADER_LEN : - LOG_EVENT_HEADER_LEN; + uint header_size= old_format ? OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN; + LOCK_MUTEX; - if (my_b_read(file, (byte *) head, header_size )) + if (my_b_read(file, (byte *) head, header_size)) { UNLOCK_MUTEX; return 0; } uint data_len = uint4korr(head + EVENT_LEN_OFFSET); - char* buf = 0; - const char* error = 0; - Log_event* res = 0; + char *buf= 0; + const char *error= 0; + Log_event *res= 0; if (data_len > max_allowed_packet) { @@ -522,14 +529,14 @@ Log_event* Log_event::read_log_event(IO_CACHE* file, bool old_format) } buf[data_len] = 0; memcpy(buf, head, header_size); - if (my_b_read(file, (byte*) buf + header_size, - data_len - header_size)) + if (my_b_read(file, (byte*) buf + header_size, data_len - header_size)) { error = "read error"; goto err; } if ((res = read_log_event(buf, data_len, &error, old_format))) res->register_temp_buf(buf); + err: UNLOCK_MUTEX; if (error) @@ -541,17 +548,20 @@ data_len=%d,event_type=%d",error,data_len,head[EVENT_TYPE_OFFSET]); return res; } + Log_event* Log_event::read_log_event(const char* buf, int event_len, const char **error, bool old_format) { if (event_len < EVENT_LEN_OFFSET || - (uint)event_len != uint4korr(buf+EVENT_LEN_OFFSET)) + (uint) event_len != uint4korr(buf+EVENT_LEN_OFFSET)) + { + *error="Sanity check failed"; // Needed to free buffer return NULL; // general sanity check - will fail on a partial read + } Log_event* ev = NULL; - switch(buf[EVENT_TYPE_OFFSET]) - { + switch(buf[EVENT_TYPE_OFFSET]) { case QUERY_EVENT: ev = new Query_log_event(buf, event_len, old_format); break; @@ -591,8 +601,7 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len, default: break; } - if (!ev) return 0; - if (!ev->is_valid()) + if (!ev || !ev->is_valid()) { *error= "Found invalid event in binary log"; delete ev; @@ -602,27 +611,24 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len, return ev; } + #ifdef MYSQL_CLIENT void Log_event::print_header(FILE* file) { char llbuff[22]; fputc('#', file); print_timestamp(file); - fprintf(file, " server id %d log_pos %s ", server_id, + fprintf(file, " server id %d log_pos %s ", server_id, llstr(log_pos,llbuff)); } void Log_event::print_timestamp(FILE* file, time_t* ts) { -#ifdef MYSQL_SERVER - struct tm tm_tmp; -#endif struct tm *res; if (!ts) - { ts = &when; - } -#ifdef MYSQL_SERVER +#ifdef MYSQL_SERVER // This is always false + struct tm tm_tmp; localtime_r(ts,(res= &tm_tmp)); #else res=localtime(ts); @@ -678,8 +684,10 @@ void Rotate_log_event::print(FILE* file, bool short_form, char* last_db) #endif /* #ifdef MYSQL_CLIENT */ + Start_log_event::Start_log_event(const char* buf, - bool old_format) :Log_event(buf, old_format) + bool old_format) + :Log_event(buf, old_format) { buf += (old_format) ? OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN; binlog_version = uint2korr(buf+ST_BINLOG_VER_OFFSET); @@ -697,9 +705,10 @@ int Start_log_event::write_data(IO_CACHE* file) return (my_b_safe_write(file, (byte*) buff, sizeof(buff)) ? -1 : 0); } + Rotate_log_event::Rotate_log_event(const char* buf, int event_len, - bool old_format): - Log_event(buf, old_format),new_log_ident(NULL),alloced(0) + bool old_format) + :Log_event(buf, old_format),new_log_ident(NULL),alloced(0) { // The caller will ensure that event_len is what we have at EVENT_LEN_OFFSET int header_size = (old_format) ? OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN; @@ -726,29 +735,31 @@ Rotate_log_event::Rotate_log_event(const char* buf, int event_len, alloced = 1; } + int Rotate_log_event::write_data(IO_CACHE* file) { char buf[ROTATE_HEADER_LEN]; int8store(buf, pos + R_POS_OFFSET); - return my_b_safe_write(file, (byte*)buf, ROTATE_HEADER_LEN) || - my_b_safe_write(file, (byte*)new_log_ident, (uint) ident_len); + return (my_b_safe_write(file, (byte*)buf, ROTATE_HEADER_LEN) || + my_b_safe_write(file, (byte*)new_log_ident, (uint) ident_len)); } + #ifndef MYSQL_CLIENT Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg, - bool using_trans): - Log_event(thd_arg), data_buf(0), query(query_arg), db(thd_arg->db), - q_len(thd_arg->query_length), - error_code(thd_arg->killed ? ER_SERVER_SHUTDOWN: thd_arg->net.last_errno), - thread_id(thd_arg->thread_id), - cache_stmt(using_trans && - (thd_arg->options & (OPTION_NOT_AUTO_COMMIT | OPTION_BEGIN))) - { - time_t end_time; - time(&end_time); - exec_time = (ulong) (end_time - thd->start_time); - db_len = (db) ? (uint32) strlen(db) : 0; - } + bool using_trans) + :Log_event(thd_arg), data_buf(0), query(query_arg), db(thd_arg->db), + q_len(thd_arg->query_length), + error_code(thd_arg->killed ? ER_SERVER_SHUTDOWN: thd_arg->net.last_errno), + thread_id(thd_arg->thread_id), + cache_stmt(using_trans && + (thd_arg->options & (OPTION_NOT_AUTO_COMMIT | OPTION_BEGIN))) +{ + time_t end_time; + time(&end_time); + exec_time = (ulong) (end_time - thd->start_time); + db_len = (db) ? (uint32) strlen(db) : 0; +} #endif Query_log_event::Query_log_event(const char* buf, int event_len, @@ -786,6 +797,7 @@ Query_log_event::Query_log_event(const char* buf, int event_len, *((char*)query+q_len) = 0; } + #ifdef MYSQL_CLIENT void Query_log_event::print(FILE* file, bool short_form, char* last_db) @@ -801,10 +813,10 @@ void Query_log_event::print(FILE* file, bool short_form, char* last_db) bool same_db = 0; if (db && last_db) - { - if (!(same_db = !memcmp(last_db, db, db_len + 1))) - memcpy(last_db, db, db_len + 1); - } + { + if (!(same_db = !memcmp(last_db, db, db_len + 1))) + memcpy(last_db, db, db_len + 1); + } if (db && db[0] && !same_db) fprintf(file, "use %s;\n", db); @@ -815,17 +827,18 @@ void Query_log_event::print(FILE* file, bool short_form, char* last_db) my_fwrite(file, (byte*) query, q_len, MYF(MY_NABP | MY_WME)); fprintf(file, ";\n"); } - #endif + int Query_log_event::write_data(IO_CACHE* file) { - if (!query) return -1; + if (!query) + return -1; char buf[QUERY_HEADER_LEN]; int4store(buf + Q_THREAD_ID_OFFSET, thread_id); int4store(buf + Q_EXEC_TIME_OFFSET, exec_time); - buf[Q_DB_LEN_OFFSET] = (char)db_len; + buf[Q_DB_LEN_OFFSET] = (char) db_len; int2store(buf + Q_ERR_CODE_OFFSET, error_code); return (my_b_safe_write(file, (byte*) buf, QUERY_HEADER_LEN) || @@ -833,8 +846,8 @@ int Query_log_event::write_data(IO_CACHE* file) my_b_safe_write(file, (byte*) query, q_len)) ? -1 : 0; } -Intvar_log_event::Intvar_log_event(const char* buf, bool old_format): - Log_event(buf, old_format) +Intvar_log_event::Intvar_log_event(const char* buf, bool old_format) + :Log_event(buf, old_format) { buf += (old_format) ? OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN; type = buf[I_TYPE_OFFSET]; @@ -843,8 +856,7 @@ Intvar_log_event::Intvar_log_event(const char* buf, bool old_format): const char* Intvar_log_event::get_var_type_name() { - switch(type) - { + switch(type) { case LAST_INSERT_ID_EVENT: return "LAST_INSERT_ID"; case INSERT_ID_EVENT: return "INSERT_ID"; default: /* impossible */ return "UNKNOWN"; @@ -863,6 +875,9 @@ int Intvar_log_event::write_data(IO_CACHE* file) void Intvar_log_event::print(FILE* file, bool short_form, char* last_db) { char llbuff[22]; + const char *msg; + LINT_INIT(msg); + if (!short_form) { print_header(file); @@ -870,21 +885,20 @@ void Intvar_log_event::print(FILE* file, bool short_form, char* last_db) } fprintf(file, "SET "); - switch(type) - { + switch (type) { case LAST_INSERT_ID_EVENT: - fprintf(file, "LAST_INSERT_ID = "); + msg="LAST_INSERT_ID"; break; case INSERT_ID_EVENT: - fprintf(file, "INSERT_ID = "); + msg="INSERT_ID"; break; } - fprintf(file, "%s;\n", llstr(val,llbuff)); + fprintf(file, "%s=%s;\n", msg, llstr(val,llbuff)); fflush(file); - } #endif + int Load_log_event::write_data_header(IO_CACHE* file) { char buf[LOAD_HEADER_LEN]; @@ -899,7 +913,8 @@ int Load_log_event::write_data_header(IO_CACHE* file) int Load_log_event::write_data_body(IO_CACHE* file) { - if (sql_ex.write_data(file)) return 1; + if (sql_ex.write_data(file)) + return 1; if (num_fields && fields && field_lens) { if (my_b_safe_write(file, (byte*)field_lens, num_fields) || @@ -912,12 +927,14 @@ int Load_log_event::write_data_body(IO_CACHE* file) } + static bool write_str(IO_CACHE *file, char *str, byte length) { return (my_b_safe_write(file, &length, 1) || my_b_safe_write(file, (byte*) str, (int) length)); } + int sql_ex_info::write_data(IO_CACHE* file) { if (new_format()) @@ -943,6 +960,7 @@ int sql_ex_info::write_data(IO_CACHE* file) } } + static inline int read_str(char * &buf, char *buf_end, char * &str, uint8 &len) { @@ -954,6 +972,7 @@ static inline int read_str(char * &buf, char *buf_end, char * &str, return 0; } + char* sql_ex_info::init(char* buf,char* buf_end,bool use_new_format) { cached_new_format = use_new_format; @@ -978,13 +997,13 @@ char* sql_ex_info::init(char* buf,char* buf_end,bool use_new_format) else { field_term_len= enclosed_len= line_term_len= line_start_len= escaped_len=1; - field_term = buf++; - enclosed= buf++; - line_term= buf++; - line_start= buf++; - escaped= buf++; - opt_flags = *buf++; - empty_flags=*buf++; + field_term = buf++; // Use first byte in string + enclosed= buf++; + line_term= buf++; + line_start= buf++; + escaped= buf++; + opt_flags = *buf++; + empty_flags= *buf++; if (empty_flags & FIELD_TERM_EMPTY) field_term_len=0; if (empty_flags & ENCLOSED_EMPTY) @@ -1006,7 +1025,7 @@ Load_log_event::Load_log_event(THD* thd, sql_exchange* ex, List<Item>& fields_arg, enum enum_duplicates handle_dup) :Log_event(thd),thread_id(thd->thread_id), num_fields(0),fields(0), field_lens(0),field_block_len(0), table_name(table_name_arg), - db(db_arg), fname(ex->file_name) + db(db_arg), fname(ex->file_name) { time_t end_time; time(&end_time); @@ -1034,14 +1053,12 @@ Load_log_event::Load_log_event(THD* thd, sql_exchange* ex, sql_ex.empty_flags = 0; - switch(handle_dup) - { + switch (handle_dup) { case DUP_IGNORE: sql_ex.opt_flags |= IGNORE_FLAG; break; case DUP_REPLACE: sql_ex.opt_flags |= REPLACE_FLAG; break; case DUP_ERROR: break; } - if (!ex->field_term->length()) sql_ex.empty_flags |= FIELD_TERM_EMPTY; if (!ex->enclosed->length()) @@ -1074,8 +1091,11 @@ Load_log_event::Load_log_event(THD* thd, sql_exchange* ex, #endif -// the caller must do buf[event_len] = 0 before he starts using the -// constructed event +/* + The caller must do buf[event_len] = 0 before he starts using the + constructed event. +*/ + Load_log_event::Load_log_event(const char* buf, int event_len, bool old_format): Log_event(buf, old_format),num_fields(0),fields(0), @@ -1093,7 +1113,7 @@ int Load_log_event::copy_log_event(const char *buf, ulong event_len, uint data_len; char* buf_end = (char*)buf + event_len; const char* data_head = buf + ((old_format) ? - OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN); + OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN); thread_id = uint4korr(data_head + L_THREAD_ID_OFFSET); exec_time = uint4korr(data_head + L_EXEC_TIME_OFFSET); skip_lines = uint4korr(data_head + L_SKIP_LINES_OFFSET); @@ -1101,13 +1121,16 @@ int Load_log_event::copy_log_event(const char *buf, ulong event_len, db_len = (uint)data_head[L_DB_LEN_OFFSET]; num_fields = uint4korr(data_head + L_NUM_FIELDS_OFFSET); - int body_offset = (buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ? - LOAD_HEADER_LEN + OLD_HEADER_LEN : get_data_body_offset(); + int body_offset = ((buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ? + LOAD_HEADER_LEN + OLD_HEADER_LEN : + get_data_body_offset()); if ((int) event_len < body_offset) return 1; - //sql_ex.init() on success returns the pointer to the first byte after - //the sql_ex structure, which is the start of field lengths array + /* + Sql_ex.init() on success returns the pointer to the first byte after + the sql_ex structure, which is the start of field lengths array. + */ if (!(field_lens=(uchar*)sql_ex.init((char*)buf + body_offset, buf_end, buf[EVENT_TYPE_OFFSET] != LOAD_EVENT))) @@ -1116,11 +1139,9 @@ int Load_log_event::copy_log_event(const char *buf, ulong event_len, data_len = event_len - body_offset; if (num_fields > data_len) // simple sanity check against corruption return 1; - uint i; - for (i = 0; i < num_fields; i++) - { + for (uint i = 0; i < num_fields; i++) field_block_len += (uint)field_lens[i] + 1; - } + fields = (char*)field_lens + num_fields; table_name = fields + field_block_len; db = table_name + table_name_len + 1; @@ -1142,7 +1163,6 @@ void Load_log_event::print(FILE* file, bool short_form, char* last_db) } bool same_db = 0; - if (db && last_db) { if (!(same_db = !memcmp(last_db, db, db_len + 1))) @@ -1199,8 +1219,8 @@ void Load_log_event::print(FILE* file, bool short_form, char* last_db) { uint i; const char* field = fields; - fprintf( file, " ("); - for(i = 0; i < num_fields; i++) + fprintf(file, " ("); + for (i = 0; i < num_fields; i++) { if (i) fputc(',', file); @@ -1219,10 +1239,11 @@ void Load_log_event::print(FILE* file, bool short_form, char* last_db) #ifndef MYSQL_CLIENT void Log_event::set_log_pos(MYSQL_LOG* log) - { - if (!log_pos) - log_pos = my_b_tell(&log->log_file); - } +{ + if (!log_pos) + log_pos = my_b_tell(&log->log_file); +} + void Load_log_event::set_fields(List<Item> &fields) { @@ -1270,8 +1291,7 @@ Slave_log_event::Slave_log_event(THD* thd_arg, DBUG_VOID_RETURN; } - -#endif +#endif /* ! MYSQL_CLIENT */ Slave_log_event::~Slave_log_event() @@ -1293,7 +1313,7 @@ master_log: '%s' master_pos: %s\n", master_host, master_port, master_log, llstr(master_pos, llbuff)); } -#endif +#endif /* MYSQL_CLIENT */ int Slave_log_event::get_data_size() { @@ -1308,6 +1328,7 @@ int Slave_log_event::write_data(IO_CACHE* file) return my_b_safe_write(file, (byte*)mem_pool, get_data_size()); } + void Slave_log_event::init_from_mem_pool(int data_size) { master_pos = uint8korr(mem_pool + SL_MASTER_POS_OFFSET); @@ -1324,13 +1345,13 @@ void Slave_log_event::init_from_mem_pool(int data_size) master_log_len = strlen(master_log); } -Slave_log_event::Slave_log_event(const char* buf, int event_len): - Log_event(buf,0),mem_pool(0),master_host(0) +Slave_log_event::Slave_log_event(const char* buf, int event_len) + :Log_event(buf,0),mem_pool(0),master_host(0) { event_len -= LOG_EVENT_HEADER_LEN; if (event_len < 0) return; - if (!(mem_pool = (char*)my_malloc(event_len + 1, MYF(MY_WME)))) + if (!(mem_pool = (char*) my_malloc(event_len + 1, MYF(MY_WME)))) return; memcpy(mem_pool, buf + LOG_EVENT_HEADER_LEN, event_len); mem_pool[event_len] = 0; @@ -1341,10 +1362,10 @@ Slave_log_event::Slave_log_event(const char* buf, int event_len): Create_file_log_event::Create_file_log_event(THD* thd_arg, sql_exchange* ex, const char* db_arg, const char* table_name_arg, List<Item>& fields_arg, enum enum_duplicates handle_dup, - char* block_arg, uint block_len_arg): - Load_log_event(thd_arg,ex,db_arg,table_name_arg,fields_arg,handle_dup), - fake_base(0),block(block_arg),block_len(block_len_arg), - file_id(thd_arg->file_id = mysql_bin_log.next_file_id()) + char* block_arg, uint block_len_arg) + :Load_log_event(thd_arg,ex,db_arg,table_name_arg,fields_arg,handle_dup), + fake_base(0),block(block_arg),block_len(block_len_arg), + file_id(thd_arg->file_id = mysql_bin_log.next_file_id()) { sql_ex.force_new_format(); } @@ -1379,8 +1400,8 @@ int Create_file_log_event::write_base(IO_CACHE* file) } Create_file_log_event::Create_file_log_event(const char* buf, int len, - bool old_format): - Load_log_event(buf,0,old_format),fake_base(0),block(0),inited_from_old(0) + bool old_format) + :Load_log_event(buf,0,old_format),fake_base(0),block(0),inited_from_old(0) { int block_offset; if (copy_log_event(buf,len,old_format)) @@ -1389,8 +1410,9 @@ Create_file_log_event::Create_file_log_event(const char* buf, int len, { file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + + LOAD_HEADER_LEN + CF_FILE_ID_OFFSET); - block_offset = LOG_EVENT_HEADER_LEN + Load_log_event::get_data_size() + - CREATE_FILE_HEADER_LEN + 1; // 1 for \0 terminating fname + // + 1 for \0 terminating fname + block_offset = (LOG_EVENT_HEADER_LEN + Load_log_event::get_data_size() + + CREATE_FILE_HEADER_LEN + 1); if (len < block_offset) return; block = (char*)buf + block_offset; @@ -1418,33 +1440,34 @@ void Create_file_log_event::print(FILE* file, bool short_form, #ifndef MYSQL_CLIENT void Create_file_log_event::pack_info(String* packet) { - char buf1[256]; + char buf1[256],buf[22], *end; String tmp(buf1, sizeof(buf1)); tmp.length(0); - char buf[22]; tmp.append("db="); tmp.append(db, db_len); tmp.append(";table="); tmp.append(table_name, table_name_len); tmp.append(";file_id="); - tmp.append(llstr(file_id,buf)); + end= int10_to_str((long) file_id, buf, 10); + tmp.append(buf, (uint32) (end-buf)); tmp.append(";block_len="); - tmp.append(llstr(block_len,buf)); - net_store_data(packet, (char*)tmp.ptr(), tmp.length()); + end= int10_to_str((long) block_len, buf, 10); + tmp.append(buf, (uint32) (end-buf)); + net_store_data(packet, (char*) tmp.ptr(), tmp.length()); } #endif #ifndef MYSQL_CLIENT Append_block_log_event::Append_block_log_event(THD* thd_arg, char* block_arg, - uint block_len_arg): - Log_event(thd_arg), block(block_arg),block_len(block_len_arg), - file_id(thd_arg->file_id) + uint block_len_arg) + :Log_event(thd_arg), block(block_arg),block_len(block_len_arg), + file_id(thd_arg->file_id) { } #endif -Append_block_log_event::Append_block_log_event(const char* buf, int len): - Log_event(buf, 0),block(0) +Append_block_log_event::Append_block_log_event(const char* buf, int len) + :Log_event(buf, 0),block(0) { if ((uint)len < APPEND_BLOCK_EVENT_OVERHEAD) return; @@ -1473,36 +1496,31 @@ void Append_block_log_event::print(FILE* file, bool short_form, file_id, block_len); } #endif + #ifndef MYSQL_CLIENT void Append_block_log_event::pack_info(String* packet) { char buf1[256]; - String tmp(buf1, sizeof(buf1)); - tmp.length(0); - char buf[22]; - tmp.append(";file_id="); - tmp.append(llstr(file_id,buf)); - tmp.append(";block_len="); - tmp.append(llstr(block_len,buf)); - net_store_data(packet, (char*)tmp.ptr(), tmp.length()); + sprintf(buf1, ";file_id=%u;block_len=%u", file_id, block_len); + net_store_data(packet, buf1); } -#endif -#ifndef MYSQL_CLIENT -Delete_file_log_event::Delete_file_log_event(THD* thd_arg): - Log_event(thd_arg),file_id(thd_arg->file_id) +Delete_file_log_event::Delete_file_log_event(THD* thd_arg) + :Log_event(thd_arg),file_id(thd_arg->file_id) { } #endif -Delete_file_log_event::Delete_file_log_event(const char* buf, int len): - Log_event(buf, 0),file_id(0) + +Delete_file_log_event::Delete_file_log_event(const char* buf, int len) + :Log_event(buf, 0),file_id(0) { if ((uint)len < DELETE_FILE_EVENT_OVERHEAD) return; file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + AB_FILE_ID_OFFSET); } + int Delete_file_log_event::write_data(IO_CACHE* file) { byte buf[DELETE_FILE_HEADER_LEN]; @@ -1518,32 +1536,29 @@ void Delete_file_log_event::print(FILE* file, bool short_form, return; print_header(file); fputc('\n', file); - fprintf(file, "#Delete_file: file_id=%d\n", - file_id); + fprintf(file, "#Delete_file: file_id=%u\n", file_id); } #endif + #ifndef MYSQL_CLIENT void Delete_file_log_event::pack_info(String* packet) { - char buf1[256]; - String tmp(buf1, sizeof(buf1)); - tmp.length(0); - char buf[22]; - tmp.append(";file_id="); - tmp.append(llstr(file_id,buf)); - net_store_data(packet, (char*)tmp.ptr(), tmp.length()); + char buf1[64]; + sprintf(buf1, ";file_id=%u", (uint) file_id); + net_store_data(packet, buf1); } #endif + #ifndef MYSQL_CLIENT -Execute_load_log_event::Execute_load_log_event(THD* thd_arg): - Log_event(thd_arg),file_id(thd_arg->file_id) +Execute_load_log_event::Execute_load_log_event(THD* thd_arg) + :Log_event(thd_arg),file_id(thd_arg->file_id) { } #endif -Execute_load_log_event::Execute_load_log_event(const char* buf,int len): - Log_event(buf, 0),file_id(0) +Execute_load_log_event::Execute_load_log_event(const char* buf,int len) + :Log_event(buf, 0),file_id(0) { if ((uint)len < EXEC_LOAD_EVENT_OVERHEAD) return; @@ -1572,13 +1587,9 @@ void Execute_load_log_event::print(FILE* file, bool short_form, #ifndef MYSQL_CLIENT void Execute_load_log_event::pack_info(String* packet) { - char buf1[256]; - String tmp(buf1, sizeof(buf1)); - tmp.length(0); - char buf[22]; - tmp.append(";file_id="); - tmp.append(llstr(file_id,buf)); - net_store_data(packet, (char*)tmp.ptr(), tmp.length()); + char buf[64]; + sprintf(buf, ";file_id=%u", (uint) file_id); + net_store_data(packet, buf); } #endif @@ -1600,16 +1611,18 @@ int Query_log_event::exec_event(struct st_relay_log_info* rli) thd->query_error = 0; // clear error thd->net.last_errno = 0; thd->net.last_error[0] = 0; - thd->slave_proxy_id = thread_id; // for temp tables + thd->slave_proxy_id = thread_id; // for temp tables - // sanity check to make sure the master did not get a really bad - // error on the query + /* + Sanity check to make sure the master did not get a really bad + error on the query. + */ if (ignored_error_code((expected_error = error_code)) || !check_expected_error(thd,rli,expected_error)) { mysql_parse(thd, thd->query, q_len); - if (expected_error != - (actual_error = thd->net.last_errno) && expected_error && + if ((expected_error != (actual_error = thd->net.last_errno)) && + expected_error && !ignored_error_code(actual_error) && !ignored_error_code(expected_error)) { @@ -1621,8 +1634,8 @@ int Query_log_event::exec_event(struct st_relay_log_info* rli) actual_error); thd->query_error = 1; } - else if (expected_error == actual_error - || ignored_error_code(actual_error)) + else if (expected_error == actual_error || + ignored_error_code(actual_error)) { thd->query_error = 0; *rli->last_slave_error = 0; @@ -1639,8 +1652,8 @@ int Query_log_event::exec_event(struct st_relay_log_info* rli) return 1; } } - thd->db = 0; // prevent db from being freed - thd->query = 0; // just to be sure + thd->db= 0; // prevent db from being freed + thd->query= 0; // just to be sure // assume no convert for next query unless set explictly thd->convert_set = 0; close_thread_tables(thd); @@ -1648,8 +1661,8 @@ int Query_log_event::exec_event(struct st_relay_log_info* rli) if (thd->query_error || thd->fatal_error) { slave_print_error(rli,actual_error, "error '%s' on query '%s'", - actual_error ? thd->net.last_error : - "unexpected success or fatal error", query); + actual_error ? thd->net.last_error : + "unexpected success or fatal error", query); free_root(&thd->mem_root,0); return 1; } @@ -1657,6 +1670,7 @@ int Query_log_event::exec_event(struct st_relay_log_info* rli) return Log_event::exec_event(rli); } + int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli) { init_sql_alloc(&thd->mem_root, 8192,0); @@ -1710,32 +1724,35 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli) { // mysql_load will use thd->net to read the file thd->net.vio = net->vio; - // make sure the client does not get confused - // about the packet sequence + /* + Make sure the client does not get confused about the packet sequence + */ thd->net.pkt_nr = net->pkt_nr; } if (mysql_load(thd, &ex, &tables, fields, handle_dup, net != 0, - TL_WRITE)) + TL_WRITE)) thd->query_error = 1; if (thd->cuted_fields) sql_print_error("Slave: load data infile at position %s in log \ '%s' produced %d warning(s)", llstr(rli->master_log_pos,llbuff), RPL_LOG_NAME, thd->cuted_fields ); if (net) - net->pkt_nr = thd->net.pkt_nr; + net->pkt_nr= thd->net.pkt_nr; } } else { - // we will just ask the master to send us /dev/null if we do not - // want to load the data - // TODO: this a bug - needs to be done in I/O thread + /* + We will just ask the master to send us /dev/null if we do not + want to load the data. + TODO: this a bug - needs to be done in I/O thread + */ if (net) skip_load_data_infile(net); } thd->net.vio = 0; - thd->db = 0;// prevent db from being freed + thd->db= 0; // prevent db from being freed close_thread_tables(thd); if (thd->query_error) { @@ -1745,7 +1762,7 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli) slave_print_error(rli,sql_error, "Slave: Error '%s' running load data infile ", - ER_SAFE(sql_error)); + ER_SAFE(sql_error)); free_root(&thd->mem_root,0); return 1; } @@ -1760,21 +1777,26 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli) return Log_event::exec_event(rli); } + int Start_log_event::exec_event(struct st_relay_log_info* rli) { close_temporary_tables(thd); - // if we have old format, load_tmpdir is cleaned up by the I/O thread - // TODO: cleanup_load_tmpdir() needs to remove only the files associated - // with the server id that has just started + /* + If we have old format, load_tmpdir is cleaned up by the I/O thread + + TODO: cleanup_load_tmpdir() needs to remove only the files associated + with the server id that has just started + */ if (!rli->mi->old_format) cleanup_load_tmpdir(); return Log_event::exec_event(rli); } + int Stop_log_event::exec_event(struct st_relay_log_info* rli) { // do not clean up immediately after rotate event - if (rli->master_log_pos > 4) + if (rli->master_log_pos > BIN_LOG_HEADER_SIZE) { close_temporary_tables(thd); cleanup_load_tmpdir(); @@ -1809,7 +1831,10 @@ int Rotate_log_event::exec_event(struct st_relay_log_info* rli) && mysql_bin_log.is_open()); rotate_binlog = (*log_name && write_slave_event); if (ident_len >= sizeof(rli->master_log_name)) + { + pthread_mutex_unlock(&rli->data_lock); DBUG_RETURN(1); + } memcpy(log_name, new_log_ident,ident_len); log_name[ident_len] = 0; } @@ -1907,7 +1932,8 @@ int Create_file_log_event::exec_event(struct st_relay_log_info* rli) } if (mysql_bin_log.is_open()) mysql_bin_log.write(this); - error=0; + error=0; // Everything is ok + err: if (error) end_io_cache(&file); @@ -1919,12 +1945,11 @@ err: int Delete_file_log_event::exec_event(struct st_relay_log_info* rli) { char fname[FN_REFLEN+10]; - char* p; - p = slave_load_file_stem(fname, file_id, server_id); + char *p= slave_load_file_stem(fname, file_id, server_id); memcpy(p, ".data", 6); - (void)my_delete(fname, MYF(MY_WME)); + (void) my_delete(fname, MYF(MY_WME)); memcpy(p, ".info", 6); - (void)my_delete(fname, MYF(MY_WME)); + (void) my_delete(fname, MYF(MY_WME)); if (mysql_bin_log.is_open()) mysql_bin_log.write(this); return Log_event::exec_event(rli); @@ -1933,10 +1958,10 @@ int Delete_file_log_event::exec_event(struct st_relay_log_info* rli) int Append_block_log_event::exec_event(struct st_relay_log_info* rli) { char fname[FN_REFLEN+10]; - char* p; - int fd = -1; + char *p= slave_load_file_stem(fname, file_id, server_id); + int fd; int error = 1; - p = slave_load_file_stem(fname, file_id, server_id); + memcpy(p, ".data", 6); if ((fd = my_open(fname, O_WRONLY|O_APPEND|O_BINARY, MYF(MY_WME))) < 0) { @@ -1951,6 +1976,7 @@ int Append_block_log_event::exec_event(struct st_relay_log_info* rli) if (mysql_bin_log.is_open()) mysql_bin_log.write(this); error=0; + err: if (fd >= 0) my_close(fd, MYF(0)); @@ -1960,15 +1986,14 @@ err: int Execute_load_log_event::exec_event(struct st_relay_log_info* rli) { char fname[FN_REFLEN+10]; - char* p; - int fd = -1; + char *p= slave_load_file_stem(fname, file_id, server_id); + int fd; int error = 1; ulong save_options; IO_CACHE file; Load_log_event* lev = 0; - p = slave_load_file_stem(fname, file_id, server_id); + memcpy(p, ".info", 6); - bzero((char*)&file, sizeof(file)); if ((fd = my_open(fname, O_RDONLY|O_BINARY, MYF(MY_WME))) < 0 || init_io_cache(&file, fd, IO_SIZE, READ_CACHE, (my_off_t)0, 0, MYF(MY_WME|MY_NABP))) @@ -1978,8 +2003,8 @@ int Execute_load_log_event::exec_event(struct st_relay_log_info* rli) } if (!(lev = (Load_log_event*)Log_event::read_log_event(&file, (pthread_mutex_t*)0, - (bool)0)) - || lev->get_type_code() != NEW_LOAD_EVENT) + (bool)0)) || + lev->get_type_code() != NEW_LOAD_EVENT) { slave_print_error(rli,0, "File '%s' appears corrupted", fname); goto err; @@ -2000,19 +2025,21 @@ int Execute_load_log_event::exec_event(struct st_relay_log_info* rli) goto err; } thd->options = save_options; - (void)my_delete(fname, MYF(MY_WME)); + (void) my_delete(fname, MYF(MY_WME)); memcpy(p, ".data", 6); - (void)my_delete(fname, MYF(MY_WME)); + (void) my_delete(fname, MYF(MY_WME)); if (mysql_bin_log.is_open()) mysql_bin_log.write(this); error = 0; + err: delete lev; - end_io_cache(&file); if (fd >= 0) + { my_close(fd, MYF(0)); + end_io_cache(&file); + } return error ? error : Log_event::exec_event(rli); } - -#endif +#endif /* !MYSQL_CLIENT */ |