diff options
Diffstat (limited to 'sql/log_event.h')
-rw-r--r-- | sql/log_event.h | 404 |
1 files changed, 238 insertions, 166 deletions
diff --git a/sql/log_event.h b/sql/log_event.h index 1b92ff7ff83..2165a620fa3 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -42,6 +42,29 @@ */ #define ST_SERVER_VER_LEN 50 +#define DUMPFILE_FLAG 0x1 +#define OPT_ENCLOSED_FLAG 0x2 +#define REPLACE_FLAG 0x4 +#define IGNORE_FLAG 0x8 + +#define FIELD_TERM_EMPTY 0x1 +#define ENCLOSED_EMPTY 0x2 +#define LINE_TERM_EMPTY 0x4 +#define LINE_START_EMPTY 0x8 +#define ESCAPED_EMPTY 0x10 + + +struct sql_ex_info + { + char field_term; + char enclosed; + char line_term; + char line_start; + char escaped; + char opt_flags; // flags for the options + char empty_flags; // flags to indicate which of the terminating charact + } ; + /* Binary log consists of events. Each event has a fixed length header, followed by possibly variable ( depending on the type of event) length data body. The data body consists of an optional fixed length segment @@ -49,13 +72,17 @@ comments below for the format specifics */ + /* event-specific post-header sizes */ #define LOG_EVENT_HEADER_LEN 19 #define QUERY_HEADER_LEN (4 + 4 + 1 + 2) -#define LOAD_HEADER_LEN (4 + 4 + 4 + 1 +1 + 4) +#define LOAD_HEADER_LEN (4 + 4 + 4 + 1 +1 + 4+sizeof(struct sql_ex_info)) #define START_HEADER_LEN (2 + ST_SERVER_VER_LEN + 4) #define ROTATE_HEADER_LEN 8 -#define CREATE_FILE_HEADER_LEN 6 +#define CREATE_FILE_HEADER_LEN 4 +#define APPEND_BLOCK_HEADER_LEN 4 +#define EXEC_LOAD_HEADER_LEN 4 +#define DELETE_FILE_HEADER_LEN 4 /* event header offsets */ @@ -98,6 +125,7 @@ #define L_DB_LEN_OFFSET 12 #define L_TBL_LEN_OFFSET 13 #define L_NUM_FIELDS_OFFSET 14 +#define L_SQL_EX_OFFSET 18 #define L_DATA_OFFSET LOAD_HEADER_LEN /* Rotate event post-header */ @@ -105,15 +133,26 @@ #define R_POS_OFFSET 0 #define R_IDENT_OFFSET 8 -#define CF_DB_LEN_OFFSET 0 -#define CF_TBL_LEN_OFFSET 1 -#define CF_FILE_ID_OFFSET 2 +#define CF_FILE_ID_OFFSET 0 +#define CF_DATA_OFFSET CREATE_FILE_HEADER_LEN + +#define AB_FILE_ID_OFFSET 0 +#define AB_DATA_OFFSET APPEND_BLOCK_HEADER_LEN + +#define EL_FILE_ID_OFFSET 0 + +#define DF_FILE_ID_OFFSET 0 #define QUERY_EVENT_OVERHEAD (LOG_EVENT_HEADER_LEN+QUERY_HEADER_LEN) #define QUERY_DATA_OFFSET (LOG_EVENT_HEADER_LEN+QUERY_HEADER_LEN) #define ROTATE_EVENT_OVERHEAD (LOG_EVENT_HEADER_LEN+ROTATE_HEADER_LEN) -#define LOAD_EVENT_OVERHEAD (LOG_EVENT_HEADER_LEN+LOAD_HEADER_LEN+sizeof(sql_ex_info)) -#define CREATE_FILE_EVENT_OVERHEAD (LOG_EVENT_HEADER_LEN+CREATE_FILE_HEADER_LEN) +#define LOAD_EVENT_OVERHEAD (LOG_EVENT_HEADER_LEN+LOAD_HEADER_LEN) +#define CREATE_FILE_EVENT_OVERHEAD (LOG_EVENT_HEADER_LEN+\ + +LOAD_HEADER_LEN+CREATE_FILE_HEADER_LEN) +#define DELETE_FILE_EVENT_OVERHEAD (LOG_EVENT_HEADER_LEN+DELETE_FILE_HEADER_LEN) +#define EXEC_LOAD_EVENT_OVERHEAD (LOG_EVENT_HEADER_LEN+EXEC_LOAD_HEADER_LEN) +#define APPEND_BLOCK_EVENT_OVERHEAD (LOG_EVENT_HEADER_LEN+APPEND_BLOCK_HEADER_LEN) + #define BINLOG_MAGIC "\xfe\x62\x69\x6e" @@ -123,7 +162,7 @@ enum Log_event_type { START_EVENT = 1, QUERY_EVENT =2, STOP_EVENT=3, ROTATE_EVENT = 4, INTVAR_EVENT=5, LOAD_EVENT=6, SLAVE_EVENT=7, CREATE_FILE_EVENT=8, - APPEND_TO_FILE_EVENT=9, EXEC_LOAD_EVENT=10, DELETE_FILE_EVENT=11}; + APPEND_BLOCK_EVENT=9, EXEC_LOAD_EVENT=10, DELETE_FILE_EVENT=11}; enum Int_event_type { INVALID_INT_EVENT = 0, LAST_INSERT_ID_EVENT = 1, INSERT_ID_EVENT = 2 }; @@ -145,6 +184,11 @@ public: uint32 server_id; uint32 log_seq; uint16 flags; + int cached_event_len; + char* temp_buf; +#ifndef MYSQL_CLIENT + THD* thd; +#endif static void *operator new(size_t size) { @@ -158,30 +202,32 @@ public: int write(IO_CACHE* file); int write_header(IO_CACHE* file); - virtual int write_data(IO_CACHE* file __attribute__((unused))) { return 0; } + virtual int write_data(IO_CACHE* file) + { return write_data_header(file) || write_data_body(file); } + virtual int write_data_header(IO_CACHE* file __attribute__((unused))) + { return 0; } + virtual int write_data_body(IO_CACHE* file __attribute__((unused))) + { return 0; } virtual Log_event_type get_type_code() = 0; - Log_event(time_t when_arg, ulong exec_time_arg = 0, - int valid_exec_time = 0, uint32 server_id_arg = 0, - uint32 log_seq_arg = 0, uint16 flags_arg = 0): - when(when_arg), exec_time(exec_time_arg), - log_seq(log_seq_arg),flags(0) - { - server_id = server_id_arg ? server_id_arg : (::server_id); - if(valid_exec_time) - flags |= LOG_EVENT_TIME_F; - } - - Log_event(const char* buf) - { - when = uint4korr(buf); - server_id = uint4korr(buf + SERVER_ID_OFFSET); - log_seq = uint4korr(buf + LOG_SEQ_OFFSET); - flags = uint2korr(buf + FLAGS_OFFSET); - } - - virtual ~Log_event() {} - + virtual bool is_valid() = 0; + Log_event(const char* buf); +#ifndef MYSQL_CLIENT + Log_event(THD* thd_arg, uint16 flags_arg = 0); +#endif + virtual ~Log_event() { free_temp_buf();} + void register_temp_buf(char* buf) { temp_buf = buf; } + void free_temp_buf() + { + if (temp_buf) + { + my_free(temp_buf, MYF(0)); + temp_buf = 0; + } + } virtual int get_data_size() { return 0;} + virtual int get_data_body_offset() { return 0; } + int get_event_len() { return cached_event_len ? cached_event_len : + (cached_event_len = LOG_EVENT_HEADER_LEN + get_data_size()); } #ifdef MYSQL_CLIENT virtual void print(FILE* file, bool short_form = 0, char* last_db = 0) = 0; void print_timestamp(FILE* file, time_t *ts = 0); @@ -200,6 +246,11 @@ public: virtual void pack_info(String* packet); int net_send(THD* thd, const char* log_name, ulong pos); static void init_show_field_list(List<Item>* field_list); + virtual int exec_event(struct st_master_info* mi); + virtual const char* get_db() + { + return thd ? thd->db : 0; + } #endif }; @@ -219,24 +270,13 @@ public: uint16 error_code; ulong thread_id; #if !defined(MYSQL_CLIENT) - THD* thd; bool cache_stmt; - Query_log_event(THD* thd_arg, const char* query_arg, bool using_trans=0): - Log_event(thd_arg->start_time,0,1,thd_arg->server_id,thd_arg->log_seq), - data_buf(0), - query(query_arg), db(thd_arg->db), q_len(thd_arg->query_length), - error_code(thd_arg->killed ? ER_SERVER_SHUTDOWN: thd_arg->net.last_errno), - thread_id(thd_arg->thread_id), thd(thd_arg), - cache_stmt(using_trans && - (thd_arg->options & (OPTION_NOT_AUTO_COMMIT | OPTION_BEGIN))) - { - time_t end_time; - time(&end_time); - exec_time = (ulong) (end_time - thd->start_time); - db_len = (db) ? (uint32) strlen(db) : 0; - } - + + Query_log_event(THD* thd_arg, const char* query_arg, + bool using_trans=0); + const char* get_db() { return db; } void pack_info(String* packet); + int exec_event(struct st_master_info* mi); #endif Query_log_event(const char* buf, int event_len); @@ -250,6 +290,7 @@ public: Log_event_type get_type_code() { return QUERY_EVENT; } int write(IO_CACHE* file); int write_data(IO_CACHE* file); // returns 0 on success, -1 on error + bool is_valid() { return query != 0; } int get_data_size() { return q_len + db_len + 2 + @@ -279,11 +320,13 @@ public: #ifndef MYSQL_CLIENT Slave_log_event(THD* thd_arg, struct st_master_info* mi); void pack_info(String* packet); + int exec_event(struct st_master_info* mi); #endif Slave_log_event(const char* buf, int event_len); ~Slave_log_event(); int get_data_size(); + bool is_valid() { return master_host != 0; } Log_event_type get_type_code() { return SLAVE_EVENT; } #ifdef MYSQL_CLIENT void print(FILE* file, bool short_form = 0, char* last_db = 0); @@ -292,34 +335,10 @@ public: }; -#define DUMPFILE_FLAG 0x1 -#define OPT_ENCLOSED_FLAG 0x2 -#define REPLACE_FLAG 0x4 -#define IGNORE_FLAG 0x8 - -#define FIELD_TERM_EMPTY 0x1 -#define ENCLOSED_EMPTY 0x2 -#define LINE_TERM_EMPTY 0x4 -#define LINE_START_EMPTY 0x8 -#define ESCAPED_EMPTY 0x10 - - -struct sql_ex_info - { - char field_term; - char enclosed; - char line_term; - char line_start; - char escaped; - char opt_flags; // flags for the options - char empty_flags; // flags to indicate which of the terminating charact - } ; - class Load_log_event: public Log_event { protected: - char* data_buf; - void copy_log_event(const char *buf, ulong data_len); + int copy_log_event(const char *buf, ulong event_len); public: ulong thread_id; @@ -330,96 +349,39 @@ public: const char* fields; const uchar* field_lens; uint32 field_block_len; - const char* table_name; const char* db; const char* fname; + bool fname_null_term; uint32 skip_lines; sql_ex_info sql_ex; #if !defined(MYSQL_CLIENT) - THD* thd; String field_lens_buf; String fields_buf; - Load_log_event(THD* thd, sql_exchange* ex, const char* table_name_arg, - List<Item>& fields_arg, enum enum_duplicates handle_dup ): - Log_event(thd->start_time),data_buf(0),thread_id(thd->thread_id), - num_fields(0),fields(0),field_lens(0),field_block_len(0), - table_name(table_name_arg), - db(thd->db), - fname(ex->file_name), - thd(thd) - { - time_t end_time; - time(&end_time); - exec_time = (ulong) (end_time - thd->start_time); - db_len = (db) ? (uint32) strlen(db) : 0; - table_name_len = (table_name) ? (uint32) strlen(table_name) : 0; - fname_len = (fname) ? (uint) strlen(fname) : 0; - sql_ex.field_term = (*ex->field_term)[0]; - sql_ex.enclosed = (*ex->enclosed)[0]; - sql_ex.line_term = (*ex->line_term)[0]; - sql_ex.line_start = (*ex->line_start)[0]; - sql_ex.escaped = (*ex->escaped)[0]; - sql_ex.opt_flags = 0; - if(ex->dumpfile) - sql_ex.opt_flags |= DUMPFILE_FLAG; - if(ex->opt_enclosed) - sql_ex.opt_flags |= OPT_ENCLOSED_FLAG; - - sql_ex.empty_flags = 0; - - switch(handle_dup) - { - case DUP_IGNORE: sql_ex.opt_flags |= IGNORE_FLAG; break; - case DUP_REPLACE: sql_ex.opt_flags |= REPLACE_FLAG; break; - case DUP_ERROR: break; - } - - if(!ex->field_term->length()) - sql_ex.empty_flags |= FIELD_TERM_EMPTY; - if(!ex->enclosed->length()) - sql_ex.empty_flags |= ENCLOSED_EMPTY; - if(!ex->line_term->length()) - sql_ex.empty_flags |= LINE_TERM_EMPTY; - if(!ex->line_start->length()) - sql_ex.empty_flags |= LINE_START_EMPTY; - if(!ex->escaped->length()) - sql_ex.empty_flags |= ESCAPED_EMPTY; - - skip_lines = ex->skip_lines; - - List_iterator<Item> li(fields_arg); - field_lens_buf.length(0); - fields_buf.length(0); - Item* item; - while((item = li++)) - { - num_fields++; - uchar len = (uchar) strlen(item->name); - field_block_len += len + 1; - fields_buf.append(item->name, len + 1); - field_lens_buf.append((char*)&len, 1); - } - - field_lens = (const uchar*)field_lens_buf.ptr(); - fields = fields_buf.ptr(); - } + + Load_log_event(THD* thd, sql_exchange* ex, const char* db_arg, + const char* table_name_arg, + List<Item>& fields_arg, enum enum_duplicates handle_dup); void set_fields(List<Item> &fields_arg); void pack_info(String* packet); + const char* get_db() { return db; } + int exec_event(struct st_master_info* mi) + { + return exec_event(thd->slave_net,mi); + } + int exec_event(NET* net, struct st_master_info* mi); #endif Load_log_event(const char* buf, int event_len); ~Load_log_event() { - if (data_buf) - { - my_free((gptr) data_buf, MYF(0)); - } } Log_event_type get_type_code() { return LOAD_EVENT; } - int write_data(IO_CACHE* file); // returns 0 on success, -1 on error + int write_data_header(IO_CACHE* file); + int write_data_body(IO_CACHE* file); + bool is_valid() { return table_name != 0; } int get_data_size() { return table_name_len + 2 + db_len + 2 + fname_len @@ -427,9 +389,10 @@ public: + 4 // exec_time + 4 // skip_lines + 4 // field block len - + sizeof(sql_ex) + field_block_len + num_fields*sizeof(uchar) ; + + sizeof(sql_ex) + field_block_len + num_fields; ; } + int get_data_body_offset() { return LOAD_EVENT_OVERHEAD; } #ifdef MYSQL_CLIENT void print(FILE* file, bool short_form = 0, char* last_db = 0); #endif @@ -443,23 +406,25 @@ public: uint32 created; uint16 binlog_version; char server_version[ST_SERVER_VER_LEN]; - - Start_log_event() :Log_event(time(NULL)),binlog_version(BINLOG_VERSION) +#ifndef MYSQL_CLIENT + Start_log_event() :Log_event((THD*)0),binlog_version(BINLOG_VERSION) { created = (uint32) when; memcpy(server_version, ::server_version, ST_SERVER_VER_LEN); } +#endif Start_log_event(const char* buf); - ~Start_log_event() {} Log_event_type get_type_code() { return START_EVENT;} int write_data(IO_CACHE* file); + bool is_valid() { return 1; } int get_data_size() { return START_HEADER_LEN; } #ifndef MYSQL_CLIENT void pack_info(String* packet); + int exec_event(struct st_master_info* mi); #endif #ifdef MYSQL_CLIENT void print(FILE* file, bool short_form = 0, char* last_db = 0); @@ -471,17 +436,21 @@ class Intvar_log_event: public Log_event public: ulonglong val; uchar type; - Intvar_log_event(uchar type_arg, ulonglong val_arg) - :Log_event(time(NULL)),val(val_arg),type(type_arg) +#ifndef MYSQL_CLIENT + Intvar_log_event(THD* thd_arg,uchar type_arg, ulonglong val_arg) + :Log_event(thd_arg),val(val_arg),type(type_arg) {} +#endif Intvar_log_event(const char* buf); ~Intvar_log_event() {} Log_event_type get_type_code() { return INTVAR_EVENT;} const char* get_var_type_name(); int get_data_size() { return sizeof(type) + sizeof(val);} int write_data(IO_CACHE* file); + bool is_valid() { return 1; } #ifndef MYSQL_CLIENT void pack_info(String* packet); + int exec_event(struct st_master_info* mi); #endif #ifdef MYSQL_CLIENT @@ -492,15 +461,21 @@ public: class Stop_log_event: public Log_event { public: - Stop_log_event() :Log_event(time(NULL)) +#ifndef MYSQL_CLIENT + Stop_log_event() :Log_event((THD*)0) {} +#endif Stop_log_event(const char* buf):Log_event(buf) { } ~Stop_log_event() {} Log_event_type get_type_code() { return STOP_EVENT;} + bool is_valid() { return 1; } #ifdef MYSQL_CLIENT void print(FILE* file, bool short_form = 0, char* last_db = 0); +#endif +#ifndef MYSQL_CLIENT + int exec_event(struct st_master_info* mi); #endif }; @@ -511,16 +486,16 @@ public: uchar ident_len; ulonglong pos; bool alloced; - - Rotate_log_event(const char* new_log_ident_arg, uint ident_len_arg = 0, - ulonglong pos_arg = 4) : - Log_event(time(NULL)), +#ifndef MYSQL_CLIENT + Rotate_log_event(THD* thd_arg, const char* new_log_ident_arg, + uint ident_len_arg = 0,ulonglong pos_arg = 4) : + Log_event(thd_arg), new_log_ident(new_log_ident_arg), ident_len(ident_len_arg ? ident_len_arg : (uint) strlen(new_log_ident_arg)), pos(pos_arg), alloced(0) {} - +#endif Rotate_log_event(const char* buf, int event_len); ~Rotate_log_event() { @@ -529,40 +504,136 @@ public: } Log_event_type get_type_code() { return ROTATE_EVENT;} int get_data_size() { return ident_len + ROTATE_HEADER_LEN;} + bool is_valid() { return new_log_ident != 0; } int write_data(IO_CACHE* file); #ifdef MYSQL_CLIENT void print(FILE* file, bool short_form = 0, char* last_db = 0); #endif #ifndef MYSQL_CLIENT void pack_info(String* packet); + int exec_event(struct st_master_info* mi); #endif }; /* the classes below are for the new LOAD DATA INFILE logging */ -class Create_file_log_event: public Log_event +class Create_file_log_event: public Load_log_event { +protected: + // pretend we are Load event, so we can write out just + // our Load part - used on the slave when writing event out to + // SQL_LOAD-*.info file + bool fake_base; public: - char* db; - char* tbl_name; - uint db_len; - uint tbl_name_len; char* block; uint block_len; uint file_id; - -#ifndef MYSQL_CLIENT - Create_file_log_event(THD* thd, TABLE_LIST * table, char* block_arg, - uint block_len_arg); +#ifndef MYSQL_CLIENT + Create_file_log_event(THD* thd, sql_exchange* ex, const char* db_arg, + const char* table_name_arg, + List<Item>& fields_arg, enum enum_duplicates handle_dup, + char* block_arg, uint block_len_arg); #endif Create_file_log_event(const char* buf, int event_len); ~Create_file_log_event() { } - Log_event_type get_type_code() { return CREATE_FILE_EVENT;} - int get_data_size() { return tbl_name_len + block_len + - CREATE_FILE_HEADER_LEN ;} + Log_event_type get_type_code() { return fake_base ? LOAD_EVENT : + CREATE_FILE_EVENT;} + int get_data_size() { return fake_base ? Load_log_event::get_data_size() : + Load_log_event::get_data_size() + + 4 + 1 + block_len;} + int get_data_body_offset() { return fake_base ? LOAD_EVENT_OVERHEAD: + LOAD_EVENT_OVERHEAD + CREATE_FILE_HEADER_LEN; } + bool is_valid() { return block != 0; } + int write_data_header(IO_CACHE* file); + int write_data_body(IO_CACHE* file); + int write_base(IO_CACHE* file); // cut out Create_file extentions and + // write it as Load event - used on the slave + +#ifdef MYSQL_CLIENT + void print(FILE* file, bool short_form = 0, char* last_db = 0); +#endif +#ifndef MYSQL_CLIENT + void pack_info(String* packet); + int exec_event(struct st_master_info* mi); +#endif +}; + +class Append_block_log_event: public Log_event +{ +public: + char* block; + uint block_len; + uint file_id; + +#ifndef MYSQL_CLIENT + Append_block_log_event(THD* thd, char* block_arg, + uint block_len_arg); + int exec_event(struct st_master_info* mi); +#endif + + Append_block_log_event(const char* buf, int event_len); + ~Append_block_log_event() + { + } + Log_event_type get_type_code() { return APPEND_BLOCK_EVENT;} + int get_data_size() { return block_len + APPEND_BLOCK_HEADER_LEN ;} + bool is_valid() { return block != 0; } + int write_data(IO_CACHE* file); + +#ifdef MYSQL_CLIENT + void print(FILE* file, bool short_form = 0, char* last_db = 0); +#endif +#ifndef MYSQL_CLIENT + void pack_info(String* packet); +#endif +}; + +class Delete_file_log_event: public Log_event +{ +public: + uint file_id; + +#ifndef MYSQL_CLIENT + Delete_file_log_event(THD* thd); +#endif + + Delete_file_log_event(const char* buf, int event_len); + ~Delete_file_log_event() + { + } + Log_event_type get_type_code() { return DELETE_FILE_EVENT;} + int get_data_size() { return DELETE_FILE_HEADER_LEN ;} + bool is_valid() { return file_id != 0; } + int write_data(IO_CACHE* file); + +#ifdef MYSQL_CLIENT + void print(FILE* file, bool short_form = 0, char* last_db = 0); +#endif +#ifndef MYSQL_CLIENT + void pack_info(String* packet); + int exec_event(struct st_master_info* mi); +#endif +}; + +class Execute_load_log_event: public Log_event +{ +public: + uint file_id; + +#ifndef MYSQL_CLIENT + Execute_load_log_event(THD* thd); +#endif + + Execute_load_log_event(const char* buf, int event_len); + ~Execute_load_log_event() + { + } + Log_event_type get_type_code() { return EXEC_LOAD_EVENT;} + int get_data_size() { return EXEC_LOAD_HEADER_LEN ;} + bool is_valid() { return file_id != 0; } int write_data(IO_CACHE* file); #ifdef MYSQL_CLIENT @@ -570,6 +641,7 @@ public: #endif #ifndef MYSQL_CLIENT void pack_info(String* packet); + int exec_event(struct st_master_info* mi); #endif }; |