From 5e2639830e1d5bad975204feeba94013551f337f Mon Sep 17 00:00:00 2001 From: unknown Date: Tue, 19 Jun 2001 15:03:48 -0600 Subject: Added event sequence number and flags to binlog Documented/cleaned up log event code Updated tests mysql-test/r/rpl000014.result: new binlog format mysql-test/r/rpl000015.result: new binlog format mysql-test/r/rpl000016.result: new binlog format mysql-test/std_data/master-bin.001: new binlog format mysql-test/t/rpl000014.test: new binlog format sql/log.cc: new binlog format sql/log_event.cc: new binlog format sql/log_event.h: new binlog format sql/sql_class.cc: new binlog format sql/sql_class.h: new binlog format sql/sql_repl.cc: new binlog format --- mysql-test/r/rpl000014.result | 6 +- mysql-test/r/rpl000015.result | 4 +- mysql-test/r/rpl000016.result | 8 +- mysql-test/std_data/master-bin.001 | Bin 113 -> 98 bytes mysql-test/t/rpl000014.test | 2 +- sql/log.cc | 27 +-- sql/log_event.cc | 357 ++++++++++--------------------------- sql/log_event.h | 125 ++++++++----- sql/sql_class.cc | 1 + sql/sql_class.h | 5 + sql/sql_repl.cc | 4 +- 11 files changed, 212 insertions(+), 327 deletions(-) diff --git a/mysql-test/r/rpl000014.result b/mysql-test/r/rpl000014.result index a47c3c91c1d..b206fa99d10 100644 --- a/mysql-test/r/rpl000014.result +++ b/mysql-test/r/rpl000014.result @@ -1,7 +1,7 @@ File Position Binlog_do_db Binlog_ignore_db -master-bin.001 73 +master-bin.001 79 Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter -127.0.0.1 root 9999 1 master-bin.001 73 Yes 0 0 +127.0.0.1 root 9999 1 master-bin.001 79 Yes 0 0 Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter 127.0.0.1 root 9999 1 master-bin.001 73 No 0 0 Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter @@ -9,7 +9,7 @@ Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Rep Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter 127.0.0.1 root 9999 1 master-bin.001 173 Yes 0 0 File Position Binlog_do_db Binlog_ignore_db -master-bin.001 73 +master-bin.001 79 n 1 2 diff --git a/mysql-test/r/rpl000015.result b/mysql-test/r/rpl000015.result index 58487af27f8..12074224fd8 100644 --- a/mysql-test/r/rpl000015.result +++ b/mysql-test/r/rpl000015.result @@ -1,5 +1,5 @@ File Position Binlog_do_db Binlog_ignore_db -master-bin.001 73 +master-bin.001 79 Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter 0 0 0 No 0 0 Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter @@ -7,7 +7,7 @@ Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Rep Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter 127.0.0.1 root 9999 60 4 No 0 0 Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter -127.0.0.1 root 9999 60 master-bin.001 73 Yes 0 0 +127.0.0.1 root 9999 60 master-bin.001 79 Yes 0 0 n 10 45 diff --git a/mysql-test/r/rpl000016.result b/mysql-test/r/rpl000016.result index abe4275a124..16f8eceaf6d 100644 --- a/mysql-test/r/rpl000016.result +++ b/mysql-test/r/rpl000016.result @@ -1,5 +1,5 @@ Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter -127.0.0.1 root 9999 60 master-bin.001 216 Yes 0 0 +127.0.0.1 root 9999 60 master-bin.001 234 Yes 0 0 s Could not break slave Tried hard @@ -10,7 +10,7 @@ master-bin.003 Log_name master-bin.003 Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter -127.0.0.1 root 9999 60 master-bin.003 184 Yes 0 0 +127.0.0.1 root 9999 60 master-bin.003 202 Yes 0 0 m 34 65 @@ -23,8 +23,8 @@ master-bin.004 master-bin.005 master-bin.006 File Position Binlog_do_db Binlog_ignore_db -master-bin.006 131 +master-bin.006 720 Master_Host Master_User Master_Port Connect_retry Log_File Pos Slave_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter -127.0.0.1 root 9999 60 master-bin.006 131 Yes 0 0 +127.0.0.1 root 9999 60 master-bin.006 720 Yes 0 0 count(*) 100 diff --git a/mysql-test/std_data/master-bin.001 b/mysql-test/std_data/master-bin.001 index fa30d8e5302..2ec2397acdd 100644 Binary files a/mysql-test/std_data/master-bin.001 and b/mysql-test/std_data/master-bin.001 differ diff --git a/mysql-test/t/rpl000014.test b/mysql-test/t/rpl000014.test index b501d63b10e..604e614b3a8 100644 --- a/mysql-test/t/rpl000014.test +++ b/mysql-test/t/rpl000014.test @@ -25,7 +25,7 @@ create table foo (n int); insert into foo values (1),(2),(3); save_master_pos; connection slave; -change master to master_log_pos=73; +change master to master_log_pos=79; sync_with_master; select * from foo; connection master; diff --git a/sql/log.cc b/sql/log.cc index 1cb6c945b7c..1faf9f971be 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -81,7 +81,7 @@ static int find_uniq_filename(char *name) MYSQL_LOG::MYSQL_LOG(): last_time(0), query_start(0),index_file(-1), name(0), log_type(LOG_CLOSED),write_error(0), - inited(0), no_rotate(0) + inited(0), log_seq(1), no_rotate(0) { /* We don't want to intialize LOCK_Log here as the thread system may @@ -232,7 +232,20 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg, goto err; Start_log_event s; bool error; + s.set_log_seq(0, this); s.write(&log_file); + // if we have a master, record current master info in a slave + // event + if (glob_mi.inited) + { + THD* thd = current_thd; + Slave_log_event s(thd, &glob_mi); + s.set_log_seq(thd, this); + + if(s.master_host) + s.write(&log_file); + } + flush_io_cache(&log_file); pthread_mutex_lock(&LOCK_index); error=(my_write(index_file, (byte*) log_file_name, strlen(log_file_name), @@ -531,16 +544,8 @@ void MYSQL_LOG::new_file() to change base names at some point. */ Rotate_log_event r(new_name+dirname_length(new_name)); + r.set_log_seq(current_thd, this); r.write(&log_file); - - // if we have a master, record current master info in a slave - // event - if(glob_mi.inited) - { - Slave_log_event s(current_thd, &glob_mi); - if(s.master_host) - s.write(&log_file); - } VOID(pthread_cond_broadcast(&COND_binlog_update)); } name=0; @@ -548,6 +553,7 @@ void MYSQL_LOG::new_file() open(old_name, log_type, new_name); my_free(old_name,MYF(0)); last_time=query_start=0; + log_seq = 1; write_error=0; VOID(pthread_mutex_unlock(&LOCK_log)); } @@ -641,6 +647,7 @@ bool MYSQL_LOG::write(Slave_log_event* event_info) if (!inited) // Can't use mutex if not init return 0; VOID(pthread_mutex_lock(&LOCK_log)); + event_info->set_log_seq(current_thd, this); error = event_info->write(&log_file); VOID(pthread_mutex_unlock(&LOCK_log)); return error; diff --git a/sql/log_event.cc b/sql/log_event.cc index 630a0380764..f438eb351d1 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -53,7 +53,6 @@ int Log_event::write(IO_CACHE* file) int Log_event::write_header(IO_CACHE* file) { - // make sure to change this when the header gets bigger char buf[LOG_EVENT_HEADER_LEN]; char* pos = buf; int4store(pos, when); // timestamp @@ -64,6 +63,10 @@ int Log_event::write_header(IO_CACHE* file) long tmp=get_data_size() + LOG_EVENT_HEADER_LEN; int4store(pos, tmp); pos += 4; + int4store(pos, log_seq); + pos += 4; + int2store(pos, flags); + pos += 2; return (my_b_write(file, (byte*) buf, (uint) (pos - buf))); } @@ -116,103 +119,51 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet, Log_event* Log_event::read_log_event(IO_CACHE* file, pthread_mutex_t* log_lock) { - time_t timestamp; - uint32 server_id; - - char buf[LOG_EVENT_HEADER_LEN-4]; + char head[LOG_EVENT_HEADER_LEN]; if(log_lock) pthread_mutex_lock(log_lock); - if (my_b_read(file, (byte *) buf, sizeof(buf))) + if (my_b_read(file, (byte *) head, sizeof(head))) { if (log_lock) pthread_mutex_unlock(log_lock); - return NULL; - } - timestamp = uint4korr(buf); - server_id = uint4korr(buf + 5); - - switch(buf[EVENT_TYPE_OFFSET]) - { - case QUERY_EVENT: - { - Query_log_event* q = new Query_log_event(file, timestamp, server_id); - if(log_lock) pthread_mutex_unlock(log_lock); - if (!q->query) - { - delete q; - q=NULL; - } - return q; - } - - case LOAD_EVENT: - { - Load_log_event* l = new Load_log_event(file, timestamp, server_id); - if(log_lock) pthread_mutex_unlock(log_lock); - if (!l->table_name) - { - delete l; - l=NULL; - } - return l; - } - - case SLAVE_EVENT: - { - Slave_log_event* l = new Slave_log_event(file, timestamp, server_id); - if(log_lock) pthread_mutex_unlock(log_lock); - if (!l->master_host) - { - delete l; - l=NULL; - } - return l; + return 0; } + uint data_len = uint4korr(head + EVENT_LEN_OFFSET); + char* buf = 0; + const char* error = 0; + Log_event* res = 0; - case ROTATE_EVENT: + if (data_len > max_allowed_packet) { - Rotate_log_event* r = new Rotate_log_event(file, timestamp, server_id); - if(log_lock) pthread_mutex_unlock(log_lock); - - if (!r->new_log_ident) - { - delete r; - r=NULL; - } - return r; + error = "Event too big"; + goto err; } - case INTVAR_EVENT: + if (data_len < LOG_EVENT_HEADER_LEN) { - Intvar_log_event* e = new Intvar_log_event(file, timestamp, server_id); - if(log_lock) pthread_mutex_unlock(log_lock); - - if (e->type == INVALID_INT_EVENT) - { - delete e; - e=NULL; - } - return e; + error = "Event too small"; + goto err; } - case START_EVENT: - { - Start_log_event* e = new Start_log_event(file, timestamp, server_id); - if(log_lock) pthread_mutex_unlock(log_lock); - return e; - } - case STOP_EVENT: - { - Stop_log_event* e = new Stop_log_event(file, timestamp, server_id); - if(log_lock) pthread_mutex_unlock(log_lock); - return e; - } - default: - break; + if (!(buf = my_malloc(data_len, MYF(MY_WME)))) + { + error = "Out of memory"; + goto err; } - // default + memcpy(buf, head, LOG_EVENT_HEADER_LEN); + if(my_b_read(file, (byte*) buf + LOG_EVENT_HEADER_LEN, + data_len - LOG_EVENT_HEADER_LEN)) + { + error = "read error"; + goto err; + } + res = read_log_event(buf, data_len); +err: if (log_lock) pthread_mutex_unlock(log_lock); - return NULL; + if(error) + sql_print_error(error); + my_free(buf, MYF(MY_ALLOW_ZERO_PTR)); + return res; } Log_event* Log_event::read_log_event(const char* buf, int event_len) @@ -234,18 +185,6 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len) return q; } - - case SLAVE_EVENT: - { - Slave_log_event* s = new Slave_log_event(buf, event_len); - if (!s->master_host) - { - delete s; - return NULL; - } - - return s; - } case LOAD_EVENT: { @@ -342,47 +281,21 @@ void Rotate_log_event::print(FILE* file, bool short_form, char* last_db) fflush(file); } -Rotate_log_event::Rotate_log_event(IO_CACHE* file, time_t when_arg, - uint32 server_id): - Log_event(when_arg, 0, 0, server_id),new_log_ident(NULL),alloced(0) -{ - char *tmp_ident; - char buf[4]; - - if (my_b_read(file, (byte*) buf, sizeof(buf))) - return; - ulong event_len; - event_len = uint4korr(buf); - if (event_len < ROTATE_EVENT_OVERHEAD) - return; - - ident_len = (uchar)(event_len - ROTATE_EVENT_OVERHEAD); - if (!(tmp_ident = (char*) my_malloc((uint)ident_len, MYF(MY_WME)))) - return; - if (my_b_read( file, (byte*) tmp_ident, (uint) ident_len)) - { - my_free((gptr) tmp_ident, MYF(0)); - return; - } - - new_log_ident = tmp_ident; - alloced = 1; -} - Start_log_event::Start_log_event(const char* buf) :Log_event(buf) { - buf += EVENT_LEN_OFFSET + 4; // skip even length - binlog_version = uint2korr(buf); - memcpy(server_version, buf + 2, sizeof(server_version)); - created = uint4korr(buf + 2 + sizeof(server_version)); + binlog_version = uint2korr(buf + LOG_EVENT_HEADER_LEN + + ST_BINLOG_VER_OFFSET); + memcpy(server_version, buf + ST_SERVER_VER_OFFSET + LOG_EVENT_HEADER_LEN, + ST_SERVER_VER_LEN); + created = uint4korr(buf + ST_CREATED_OFFSET + LOG_EVENT_HEADER_LEN); } int Start_log_event::write_data(IO_CACHE* file) { - char buff[sizeof(server_version)+2+4]; - int2store(buff,binlog_version); - memcpy(buff+2,server_version,sizeof(server_version)); - int4store(buff+2+sizeof(server_version),created); + char buff[START_HEADER_LEN]; + int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version); + memcpy(buff + ST_SERVER_VER_OFFSET,server_version,ST_SERVER_VER_LEN); + int4store(buff + ST_CREATED_OFFSET,created); return (my_b_write(file, (byte*) buff, sizeof(buff)) ? -1 : 0); } @@ -407,60 +320,25 @@ int Rotate_log_event::write_data(IO_CACHE* file) return my_b_write(file, (byte*) new_log_ident, (uint) ident_len) ? -1 :0; } -Query_log_event::Query_log_event(IO_CACHE* file, time_t when_arg, - uint32 server_id): - Log_event(when_arg,0,0,server_id),data_buf(0),query(NULL),db(NULL) -{ - char buf[QUERY_HEADER_LEN + 4]; - ulong data_len; - if (my_b_read(file, (byte*) buf, sizeof(buf))) - return; // query == NULL will tell the - // caller there was a problem - data_len = uint4korr(buf); - if (data_len < QUERY_EVENT_OVERHEAD) - return; // tear-drop attack protection :) - - data_len -= QUERY_EVENT_OVERHEAD; - exec_time = uint4korr(buf + 8); - db_len = (uint)buf[12]; - error_code = uint2korr(buf + 13); - - /* Allocate one byte extra for end \0 */ - if (!(data_buf = (char*) my_malloc(data_len+1, MYF(MY_WME)))) - return; - if (my_b_read( file, (byte*) data_buf, data_len)) - { - my_free((gptr) data_buf, MYF(0)); - data_buf = 0; - return; - } - - thread_id = uint4korr(buf + 4); - db = data_buf; - query=data_buf + db_len + 1; - q_len = data_len - 1 - db_len; - *((char*) query + q_len) = 0; // Safety -} - Query_log_event::Query_log_event(const char* buf, int event_len): Log_event(buf),data_buf(0), query(NULL), db(NULL) { if ((uint)event_len < QUERY_EVENT_OVERHEAD) return; ulong data_len; - buf += EVENT_LEN_OFFSET; data_len = event_len - QUERY_EVENT_OVERHEAD; + - exec_time = uint4korr(buf + 8); - error_code = uint2korr(buf + 13); + exec_time = uint4korr(buf + LOG_EVENT_HEADER_LEN + Q_EXEC_TIME_OFFSET); + error_code = uint2korr(buf + LOG_EVENT_HEADER_LEN + Q_ERR_CODE_OFFSET); if (!(data_buf = (char*) my_malloc(data_len + 1, MYF(MY_WME)))) return; - memcpy(data_buf, buf + QUERY_HEADER_LEN + 4, data_len); - thread_id = uint4korr(buf + 4); + memcpy(data_buf, buf + LOG_EVENT_HEADER_LEN + Q_DATA_OFFSET, data_len); + thread_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + Q_THREAD_ID_OFFSET); db = data_buf; - db_len = (uint)buf[12]; + db_len = (uint)buf[LOG_EVENT_HEADER_LEN + Q_DB_LEN_OFFSET]; query=data_buf + db_len + 1; q_len = data_len - 1 - db_len; *((char*)query+q_len) = 0; @@ -499,44 +377,28 @@ int Query_log_event::write_data(IO_CACHE* file) if (!query) return -1; char buf[QUERY_HEADER_LEN]; - char* pos = buf; - int4store(pos, thread_id); - pos += 4; - int4store(pos, exec_time); - pos += 4; - *pos++ = (char)db_len; - int2store(pos, error_code); - pos += 2; + int4store(buf + Q_THREAD_ID_OFFSET, thread_id); + int4store(buf + Q_EXEC_TIME_OFFSET, exec_time); + buf[Q_DB_LEN_OFFSET] = (char)db_len; + int2store(buf + Q_ERR_CODE_OFFSET, error_code); - return (my_b_write(file, (byte*) buf, (uint)(pos - buf)) || + return (my_b_write(file, (byte*) buf, QUERY_HEADER_LEN) || my_b_write(file, (db) ? (byte*) db : (byte*)"", db_len + 1) || my_b_write(file, (byte*) query, q_len)) ? -1 : 0; } -Intvar_log_event:: Intvar_log_event(IO_CACHE* file, time_t when_arg, - uint32 server_id) - :Log_event(when_arg,0,0,server_id), type(INVALID_INT_EVENT) -{ - char buf[9+4]; - if (!my_b_read(file, (byte*) buf, sizeof(buf))) - { - type = buf[4]; - val = uint8korr(buf+1+4); - } -} - Intvar_log_event::Intvar_log_event(const char* buf):Log_event(buf) { buf += LOG_EVENT_HEADER_LEN; - type = buf[0]; - val = uint8korr(buf+1); + type = buf[I_TYPE_OFFSET]; + val = uint8korr(buf+I_VAL_OFFSET); } int Intvar_log_event::write_data(IO_CACHE* file) { char buf[9]; - buf[0] = type; - int8store(buf + 1, val); + buf[I_TYPE_OFFSET] = type; + int8store(buf + I_VAL_OFFSET, val); return my_b_write(file, (byte*) buf, sizeof(buf)); } @@ -567,12 +429,12 @@ void Intvar_log_event::print(FILE* file, bool short_form, char* last_db) int Load_log_event::write_data(IO_CACHE* file) { char buf[LOAD_HEADER_LEN]; - int4store(buf, thread_id); - int4store(buf + 4, exec_time); - int4store(buf + 8, skip_lines); - buf[12] = (char)table_name_len; - buf[13] = (char)db_len; - int4store(buf + 14, num_fields); + int4store(buf + L_THREAD_ID_OFFSET, thread_id); + int4store(buf + L_EXEC_TIME_OFFSET, exec_time); + int4store(buf + L_SKIP_LINES_OFFSET, skip_lines); + buf[L_TBL_LEN_OFFSET] = (char)table_name_len; + buf[L_DB_LEN_OFFSET] = (char)db_len; + int4store(buf + L_NUM_FIELDS_OFFSET, num_fields); if(my_b_write(file, (byte*)buf, sizeof(buf)) || my_b_write(file, (byte*)&sql_ex, sizeof(sql_ex))) @@ -591,52 +453,33 @@ int Load_log_event::write_data(IO_CACHE* file) return 0; } -Load_log_event::Load_log_event(IO_CACHE* file, time_t when, uint32 server_id): - Log_event(when,0,0,server_id),data_buf(0),num_fields(0), - fields(0),field_lens(0),field_block_len(0), - table_name(0),db(0),fname(0) -{ - char buf[LOAD_HEADER_LEN + 4]; - ulong data_len; - if (my_b_read(file, (byte*)buf, sizeof(buf)) || - my_b_read(file, (byte*)&sql_ex, sizeof(sql_ex))) - return; - - data_len = uint4korr(buf) - LOAD_EVENT_OVERHEAD; - if (!(data_buf = (char*)my_malloc(data_len + 1, MYF(MY_WME)))) - return; - if (my_b_read(file, (byte*)data_buf, data_len)) - return; - copy_log_event(buf,data_len); -} - Load_log_event::Load_log_event(const char* buf, int event_len): Log_event(buf),data_buf(0),num_fields(0),fields(0), field_lens(0),field_block_len(0), table_name(0),db(0),fname(0) { - ulong data_len; - + uint data_len; if((uint)event_len < (LOAD_EVENT_OVERHEAD + LOG_EVENT_HEADER_LEN)) return; - buf += EVENT_LEN_OFFSET; - memcpy(&sql_ex, buf + LOAD_HEADER_LEN + 4, sizeof(sql_ex)); - data_len = event_len; - + memcpy(&sql_ex, buf + LOAD_HEADER_LEN + LOG_EVENT_HEADER_LEN, + sizeof(sql_ex)); + data_len = event_len - LOAD_HEADER_LEN - LOG_EVENT_HEADER_LEN - + sizeof(sql_ex); if(!(data_buf = (char*)my_malloc(data_len + 1, MYF(MY_WME)))) return; - memcpy(data_buf, buf + 22 + sizeof(sql_ex), data_len); + memcpy(data_buf, buf +LOG_EVENT_HEADER_LEN + LOAD_HEADER_LEN + + sizeof(sql_ex), data_len); copy_log_event(buf, data_len); } void Load_log_event::copy_log_event(const char *buf, ulong data_len) { - thread_id = uint4korr(buf+4); - exec_time = uint4korr(buf+8); - skip_lines = uint4korr(buf + 12); - table_name_len = (uint)buf[16]; - db_len = (uint)buf[17]; - num_fields = uint4korr(buf + 18); + thread_id = uint4korr(buf + L_THREAD_ID_OFFSET + LOG_EVENT_HEADER_LEN); + exec_time = uint4korr(buf + L_EXEC_TIME_OFFSET + LOG_EVENT_HEADER_LEN); + skip_lines = uint4korr(buf + L_SKIP_LINES_OFFSET + LOG_EVENT_HEADER_LEN); + table_name_len = (uint)buf[L_TBL_LEN_OFFSET + LOG_EVENT_HEADER_LEN]; + db_len = (uint)buf[L_DB_LEN_OFFSET + LOG_EVENT_HEADER_LEN]; + num_fields = uint4korr(buf + L_NUM_FIELDS_OFFSET + LOG_EVENT_HEADER_LEN); if (num_fields > data_len) // simple sanity check against corruption return; @@ -742,6 +585,12 @@ void Load_log_event::print(FILE* file, bool short_form, char* last_db) #ifndef MYSQL_CLIENT +void Log_event::set_log_seq(THD* thd, MYSQL_LOG* log) + { + log_seq = (thd && thd->log_seq) ? thd->log_seq++ : log->log_seq++; + } + + void Load_log_event::set_fields(List &fields) { uint i; @@ -804,48 +653,26 @@ void Slave_log_event::print(FILE* file, bool short_form = 0, int Slave_log_event::get_data_size() { - return master_host_len + master_log_len + 1 + sizeof(uint32) + - sizeof(ulonglong) + - sizeof(uint16); + return master_host_len + master_log_len + 1 + 4 /* data_size*/ + + 8 /* master_pos */ + + 2 /* master_port */; } int Slave_log_event::write_data(IO_CACHE* file) { int data_size = get_data_size(); int4store(mem_pool, data_size); - int8store(mem_pool + 4, master_pos); - int2store(mem_pool + 12, master_port); + int8store(mem_pool + SL_MASTER_POS_OFFSET, master_pos); + int2store(mem_pool + SL_MASTER_PORT_OFFSET, master_port); // log and host are already there return my_b_write(file, (byte*)mem_pool, data_size); } -Slave_log_event::Slave_log_event(IO_CACHE* file, time_t when, - uint32 server_id_arg): - Log_event(when,0,0,server_id),master_host(0) -{ - char buf[4]; - if(my_b_read(file, (byte*)buf, 4)) - return; - uint32 data_size; - data_size = uint4korr(buf); - if(data_size > max_allowed_packet) - return; // safety - - if(!(mem_pool = (char*)my_malloc(data_size + 1, MYF(MY_WME)))) - return; - - if(my_b_read(file, (byte*)mem_pool + 4, data_size - 4)) - return; - - mem_pool[data_size] = 0; - init_from_mem_pool(data_size); -} - void Slave_log_event::init_from_mem_pool(int data_size) { - master_pos = uint8korr(mem_pool + 4); - master_port = uint2korr(mem_pool + 12); - master_host = mem_pool + 14; + master_pos = uint8korr(mem_pool + SL_MASTER_POS_OFFSET); + master_port = uint2korr(mem_pool + SL_MASTER_PORT_OFFSET); + master_host = mem_pool + SL_MASTER_HOST_OFFSET; master_host_len = strlen(master_host); // safety master_log = master_host + master_host_len; @@ -872,5 +699,3 @@ Slave_log_event::Slave_log_event(const char* buf, int event_len): - - diff --git a/sql/log_event.h b/sql/log_event.h index 59657fa75e0..eef2be962a1 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -34,21 +34,80 @@ #define LOG_READ_TOO_LARGE -7 #define LOG_EVENT_OFFSET 4 -#define BINLOG_VERSION 1 +#define BINLOG_VERSION 2 + +/* we could have used SERVER_VERSION_LENGTH, but this introduces an + obscure dependency - if somebody decided to change SERVER_VERSION_LENGTH + this would have broke the replication protocol +*/ +#define ST_SERVER_VER_LEN 50 + +/* Binary log consists of events. Each event has a fixed length header, + followed by possibly variable ( depending on the type of event) length + data body. The data body consists of an optional fixed length segment + (post-header), and an optional variable length segment. See #defines and + comments below for the format specifics +*/ + +/* event-specific post-header sizes */ +#define LOG_EVENT_HEADER_LEN 19 +#define QUERY_HEADER_LEN (4 + 4 + 1 + 2) +#define LOAD_HEADER_LEN (4 + 4 + 4 + 1 +1 + 4) +#define START_HEADER_LEN (2 + ST_SERVER_VER_LEN + 4) + +/* event header offsets */ -#define LOG_EVENT_HEADER_LEN 13 -#define QUERY_HEADER_LEN (sizeof(uint32) + sizeof(uint32) + \ - sizeof(uchar) + sizeof(uint16)) -#define LOAD_HEADER_LEN (sizeof(uint32) + sizeof(uint32) + \ - + sizeof(uint32) + 2 + sizeof(uint32)) -#define EVENT_LEN_OFFSET 9 #define EVENT_TYPE_OFFSET 4 +#define SERVER_ID_OFFSET 5 +#define EVENT_LEN_OFFSET 9 +#define LOG_SEQ_OFFSET 13 +#define FLAGS_OFFSET 17 + +/* start event post-header */ + +#define ST_BINLOG_VER_OFFSET 0 +#define ST_SERVER_VER_OFFSET 2 +#define ST_CREATED_OFFSET (ST_SERVER_VER_OFFSET + ST_SERVER_VER_LEN) + +/* slave event post-header */ + +#define SL_MASTER_PORT_OFFSET 12 +#define SL_MASTER_POS_OFFSET 4 +#define SL_MASTER_HOST_OFFSET 14 + +/* query event post-header */ + +#define Q_THREAD_ID_OFFSET 0 +#define Q_EXEC_TIME_OFFSET 4 +#define Q_DB_LEN_OFFSET 8 +#define Q_ERR_CODE_OFFSET 9 +#define Q_DATA_OFFSET QUERY_HEADER_LEN + +/* Intvar event post-header */ + +#define I_TYPE_OFFSET 0 +#define I_VAL_OFFSET 1 + +/* Load event post-header */ + +#define L_THREAD_ID_OFFSET 0 +#define L_EXEC_TIME_OFFSET 4 +#define L_SKIP_LINES_OFFSET 8 +#define L_DB_LEN_OFFSET 12 +#define L_TBL_LEN_OFFSET 13 +#define L_NUM_FIELDS_OFFSET 14 +#define L_DATA_OFFSET LOAD_HEADER_LEN + + #define QUERY_EVENT_OVERHEAD (LOG_EVENT_HEADER_LEN+QUERY_HEADER_LEN) +#define QUERY_DATA_OFFSET (LOG_EVENT_HEADER_LEN+QUERY_HEADER_LEN) #define ROTATE_EVENT_OVERHEAD LOG_EVENT_HEADER_LEN #define LOAD_EVENT_OVERHEAD (LOG_EVENT_HEADER_LEN+LOAD_HEADER_LEN+sizeof(sql_ex_info)) #define BINLOG_MAGIC "\xfe\x62\x69\x6e" +#define LOG_EVENT_TIME_F 0x1 + enum Log_event_type { START_EVENT = 1, QUERY_EVENT =2, STOP_EVENT=3, ROTATE_EVENT = 4, INTVAR_EVENT=5, LOAD_EVENT=6, SLAVE_EVENT=7}; @@ -57,6 +116,8 @@ enum Int_event_type { INVALID_INT_EVENT = 0, LAST_INSERT_ID_EVENT = 1, INSERT_ID #ifndef MYSQL_CLIENT class String; +class MYSQL_LOG; +class THD; #endif extern uint32 server_id; @@ -68,8 +129,9 @@ class Log_event public: time_t when; ulong exec_time; - int valid_exec_time; // if false, the exec time setting is bogus uint32 server_id; + uint32 log_seq; + uint16 flags; static void *operator new(size_t size) { @@ -86,17 +148,22 @@ public: virtual int write_data(IO_CACHE* file __attribute__((unused))) { return 0; } virtual Log_event_type get_type_code() = 0; Log_event(time_t when_arg, ulong exec_time_arg = 0, - int valid_exec_time_arg = 0, uint32 server_id_arg = 0): + int valid_exec_time = 0, uint32 server_id_arg = 0, + uint32 log_seq_arg = 0, uint16 flags_arg = 0): when(when_arg), exec_time(exec_time_arg), - valid_exec_time(valid_exec_time_arg) + log_seq(log_seq_arg),flags(0) { server_id = server_id_arg ? server_id_arg : (::server_id); + if(valid_exec_time) + flags |= LOG_EVENT_TIME_F; } - Log_event(const char* buf): valid_exec_time(0) + Log_event(const char* buf) { when = uint4korr(buf); - server_id = uint4korr(buf + 5); + server_id = uint4korr(buf + SERVER_ID_OFFSET); + log_seq = uint4korr(buf + LOG_SEQ_OFFSET); + flags = uint2korr(buf + FLAGS_OFFSET); } virtual ~Log_event() {} @@ -114,6 +181,7 @@ public: #ifndef MYSQL_CLIENT static int read_log_event(IO_CACHE* file, String* packet, pthread_mutex_t* log_lock); + void set_log_seq(THD* thd, MYSQL_LOG* log); #endif }; @@ -136,7 +204,8 @@ public: THD* thd; bool cache_stmt; Query_log_event(THD* thd_arg, const char* query_arg, bool using_trans=0): - Log_event(thd_arg->start_time,0,1,thd_arg->server_id), data_buf(0), + Log_event(thd_arg->start_time,0,1,thd_arg->server_id,thd_arg->log_seq), + data_buf(0), query(query_arg), db(thd_arg->db), q_len(thd_arg->query_length), error_code(thd_arg->killed ? ER_SERVER_SHUTDOWN: thd_arg->net.last_errno), thread_id(thd_arg->thread_id), thd(thd_arg), @@ -150,7 +219,6 @@ public: } #endif - Query_log_event(IO_CACHE* file, time_t when, uint32 server_id_arg); Query_log_event(const char* buf, int event_len); ~Query_log_event() { @@ -192,7 +260,6 @@ public: #endif Slave_log_event(const char* buf, int event_len); - Slave_log_event(IO_CACHE* file, time_t when, uint32 server_id_arg); ~Slave_log_event(); int get_data_size(); Log_event_type get_type_code() { return SLAVE_EVENT; } @@ -263,7 +330,6 @@ public: time_t end_time; time(&end_time); exec_time = (ulong) (end_time - thd->start_time); - valid_exec_time = 1; db_len = (db) ? (uint32) strlen(db) : 0; table_name_len = (table_name) ? (uint32) strlen(table_name) : 0; fname_len = (fname) ? (uint) strlen(fname) : 0; @@ -319,7 +385,6 @@ public: void set_fields(List &fields_arg); #endif - Load_log_event(IO_CACHE * file, time_t when, uint32 server_id_arg); Load_log_event(const char* buf, int event_len); ~Load_log_event() { @@ -351,23 +416,12 @@ class Start_log_event: public Log_event public: uint32 created; uint16 binlog_version; - char server_version[50]; + char server_version[ST_SERVER_VER_LEN]; Start_log_event() :Log_event(time(NULL)),binlog_version(BINLOG_VERSION) { created = (uint32) when; - memcpy(server_version, ::server_version, sizeof(server_version)); - } - Start_log_event(IO_CACHE* file, time_t when_arg, uint32 server_id_arg) : - Log_event(when_arg, 0, 0, server_id_arg) - { - char buf[sizeof(server_version) + 2 + 4 + 4]; - if (my_b_read(file, (byte*) buf, sizeof(buf))) - return; - binlog_version = uint2korr(buf+4); - memcpy(server_version, buf + 6, sizeof(server_version)); - server_version[sizeof(server_version)-1]=0; - created = uint4korr(buf + 6 + sizeof(server_version)); + memcpy(server_version, ::server_version, ST_SERVER_VER_LEN); } Start_log_event(const char* buf); @@ -376,8 +430,7 @@ public: int write_data(IO_CACHE* file); int get_data_size() { - // sizeof(binlog_version) + sizeof(server_version) sizeof(created) - return 2 + sizeof(server_version) + 4; + return START_HEADER_LEN; } void print(FILE* file, bool short_form = 0, char* last_db = 0); }; @@ -390,7 +443,6 @@ public: Intvar_log_event(uchar type_arg, ulonglong val_arg) :Log_event(time(NULL)),val(val_arg),type(type_arg) {} - Intvar_log_event(IO_CACHE* file, time_t when, uint32 server_id_arg); Intvar_log_event(const char* buf); ~Intvar_log_event() {} Log_event_type get_type_code() { return INTVAR_EVENT;} @@ -406,12 +458,6 @@ class Stop_log_event: public Log_event public: Stop_log_event() :Log_event(time(NULL)) {} - Stop_log_event(IO_CACHE* file, time_t when_arg, uint32 server_id_arg): - Log_event(when_arg,0,0,server_id_arg) - { - byte skip[4]; - my_b_read(file, skip, sizeof(skip)); // skip the event length - } Stop_log_event(const char* buf):Log_event(buf) { } @@ -434,7 +480,6 @@ public: alloced(0) {} - Rotate_log_event(IO_CACHE* file, time_t when, uint32 server_id_arg) ; Rotate_log_event(const char* buf, int event_len); ~Rotate_log_event() { diff --git a/sql/sql_class.cc b/sql/sql_class.cc index eedfd21e4c3..b6cba673bea 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -96,6 +96,7 @@ THD::THD():user_time(0),fatal_error(0),last_insert_id_used(0), current_linfo = 0; slave_thread = 0; slave_proxy_id = 0; + log_seq = 0; cond_count=0; convert_set=0; mysys_var=0; diff --git a/sql/sql_class.h b/sql/sql_class.h index 3e10e8d28e4..1cc1dab8cae 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -63,11 +63,15 @@ class MYSQL_LOG { char time_buff[20],db[NAME_LEN+1]; char log_file_name[FN_REFLEN],index_file_name[FN_REFLEN]; bool write_error,inited; + uint32 log_seq; // current event sequence number + // needed this for binlog bool no_rotate; // for binlog - if log name can never change // we should not try to rotate it or write any rotation events // the user should use FLUSH MASTER instead of FLUSH LOGS for // purging + friend class Log_event; + public: MYSQL_LOG(); ~MYSQL_LOG(); @@ -243,6 +247,7 @@ public: struct st_my_thread_var *mysys_var; enum enum_server_command command; uint32 server_id; + uint32 log_seq; const char *where; time_t start_time,time_after_lock,user_time; time_t connect_time,thr_create_time; // track down slow pthread_create diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 714a9e10db7..ea777402e90 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -74,8 +74,10 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name, uint ident_len = (uint) strlen(p); ulong event_len = ident_len + sizeof(header); - int4store(header + EVENT_TYPE_OFFSET + 1, server_id); + int4store(header + SERVER_ID_OFFSET, server_id); int4store(header + EVENT_LEN_OFFSET, event_len); + int2store(header + FLAGS_OFFSET, 0); + int4store(header + LOG_SEQ_OFFSET, 0); packet->append(header, sizeof(header)); packet->append(p,ident_len); if(my_net_write(net, (char*)packet->ptr(), packet->length())) -- cgit v1.2.1