diff options
Diffstat (limited to 'sql/log_event.cc')
-rw-r--r-- | sql/log_event.cc | 2478 |
1 files changed, 1922 insertions, 556 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc index 9701ef2ff00..d4225b39704 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -183,10 +183,12 @@ static void cleanup_load_tmpdir() write_str() */ -static bool write_str(IO_CACHE *file, char *str, byte length) +static bool write_str(IO_CACHE *file, char *str, uint length) { - return (my_b_safe_write(file, &length, 1) || - my_b_safe_write(file, (byte*) str, (int) length)); + byte tmp[1]; + tmp[0]= (byte) length; + return (my_b_safe_write(file, tmp, sizeof(tmp)) || + my_b_safe_write(file, (byte*) str, length)); } @@ -194,17 +196,18 @@ static bool write_str(IO_CACHE *file, char *str, byte length) read_str() */ -static inline int read_str(char * &buf, char *buf_end, char * &str, - uint8 &len) +static inline int read_str(char **buf, char *buf_end, char **str, + uint8 *len) { - if (buf + (uint) (uchar) *buf >= buf_end) + if (*buf + ((uint) (uchar) **buf) >= buf_end) return 1; - len = (uint8) *buf; - str= buf+1; - buf+= (uint) len+1; + *len= (uint8) **buf; + *str= (*buf)+1; + (*buf)+= (uint) *len+1; return 0; } + /* Transforms a string into "" or its expression in 0x... form. */ @@ -228,9 +231,25 @@ static char *str_to_hex(char *to, char *from, uint len) return p; // pointer to end 0 of 'to' } +/* + Prints a "session_var=value" string. Used by mysqlbinlog to print some SET + commands just before it prints a query. +*/ + +static void print_set_option(FILE* file, uint32 bits_changed, uint32 option, + uint32 flags, const char* name, bool* need_comma) +{ + if (bits_changed & option) + { + if (*need_comma) + fprintf(file,", "); + fprintf(file,"%s=%d", name, (bool)(flags & option)); + *need_comma= 1; + } +} /************************************************************************** - Log_event methods + Log_event methods (= the parent class of all events) **************************************************************************/ /* @@ -240,7 +259,7 @@ static char *str_to_hex(char *to, char *from, uint len) const char* Log_event::get_type_str() { switch(get_type_code()) { - case START_EVENT: return "Start"; + case START_EVENT_V3: return "Start_v3"; case STOP_EVENT: return "Stop"; case QUERY_EVENT: return "Query"; case ROTATE_EVENT: return "Rotate"; @@ -253,8 +272,12 @@ const char* Log_event::get_type_str() case DELETE_FILE_EVENT: return "Delete_file"; case EXEC_LOAD_EVENT: return "Exec_load"; case RAND_EVENT: return "RAND"; + case XID_EVENT: return "Xid"; case USER_VAR_EVENT: return "User var"; - default: return "Unknown"; /* impossible */ + case FORMAT_DESCRIPTION_EVENT: return "Format_desc"; + case BEGIN_LOAD_QUERY_EVENT: return "Begin_load_query"; + case EXECUTE_LOAD_QUERY_EVENT: return "Execute_load_query"; + default: return "Unknown"; /* impossible */ } } @@ -265,25 +288,23 @@ const char* Log_event::get_type_str() #ifndef MYSQL_CLIENT Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans) - :log_pos(0), temp_buf(0), exec_time(0), cached_event_len(0), - flags(flags_arg), thd(thd_arg) + :log_pos(0), temp_buf(0), exec_time(0), flags(flags_arg), thd(thd_arg) { server_id= thd->server_id; when= thd->start_time; - cache_stmt= (using_trans && - (thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))); + cache_stmt= using_trans; } /* - This minimal constructor is for when you are not even sure that there is a - valid THD. For example in the server when we are shutting down or flushing - logs after receiving a SIGHUP (then we must write a Rotate to the binlog but - we have no THD, so we need this minimal constructor). + This minimal constructor is for when you are not even sure that there + is a valid THD. For example in the server when we are shutting down or + flushing logs after receiving a SIGHUP (then we must write a Rotate to + the binlog but we have no THD, so we need this minimal constructor). */ Log_event::Log_event() - :temp_buf(0), exec_time(0), cached_event_len(0), flags(0), cache_stmt(0), + :temp_buf(0), exec_time(0), flags(0), cache_stmt(0), thd(0) { server_id= ::server_id; @@ -297,24 +318,71 @@ Log_event::Log_event() Log_event::Log_event() */ -Log_event::Log_event(const char* buf, bool old_format) - :temp_buf(0), cached_event_len(0), cache_stmt(0) +Log_event::Log_event(const char* buf, + const Format_description_log_event* description_event) + :temp_buf(0), cache_stmt(0) { +#ifndef MYSQL_CLIENT + thd = 0; +#endif when = uint4korr(buf); server_id = uint4korr(buf + SERVER_ID_OFFSET); - if (old_format) + if (description_event->binlog_version==1) { - log_pos=0; - flags=0; + log_pos= 0; + flags= 0; + return; } - else + /* 4.0 or newer */ + log_pos= uint4korr(buf + LOG_POS_OFFSET); + /* + If the log is 4.0 (so here it can only be a 4.0 relay log read by + the SQL thread or a 4.0 master binlog read by the I/O thread), + log_pos is the beginning of the event: we transform it into the end + of the event, which is more useful. + But how do you know that the log is 4.0: you know it if + description_event is version 3 *and* you are not reading a + Format_desc (remember that mysqlbinlog starts by assuming that 5.0 + logs are in 4.0 format, until it finds a Format_desc). + */ + if (description_event->binlog_version==3 && + buf[EVENT_TYPE_OFFSET]<FORMAT_DESCRIPTION_EVENT && log_pos) { - log_pos = uint4korr(buf + LOG_POS_OFFSET); - flags = uint2korr(buf + FLAGS_OFFSET); + /* + If log_pos=0, don't change it. log_pos==0 is a marker to mean + "don't change rli->group_master_log_pos" (see + inc_group_relay_log_pos()). As it is unreal log_pos, adding the + event len's is nonsense. For example, a fake Rotate event should + not have its log_pos (which is 0) changed or it will modify + Exec_master_log_pos in SHOW SLAVE STATUS, displaying a nonsense + value of (a non-zero offset which does not exist in the master's + binlog, so which will cause problems if the user uses this value + in CHANGE MASTER). + */ + log_pos+= uint4korr(buf + EVENT_LEN_OFFSET); } -#ifndef MYSQL_CLIENT - thd = 0; -#endif + DBUG_PRINT("info", ("log_pos: %lu", (ulong) log_pos)); + + flags= uint2korr(buf + FLAGS_OFFSET); + if ((buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT) || + (buf[EVENT_TYPE_OFFSET] == ROTATE_EVENT)) + { + /* + These events always have a header which stops here (i.e. their + header is FROZEN). + */ + /* + Initialization to zero of all other Log_event members as they're + not specified. Currently there are no such members; in the future + there will be an event UID (but Format_description and Rotate + don't need this UID, as they are not propagated through + --log-slave-updates (remember the UID is used to not play a query + twice when you have two masters which are slaves of a 3rd master). + Then we are done. + */ + return; + } + /* otherwise, go on with reading the header from buf (nothing now) */ } #ifndef MYSQL_CLIENT @@ -340,41 +408,42 @@ int Log_event::exec_event(struct st_relay_log_info* rli) only in the case discussed above, 'if (rli)' is useless here. But as we are not 100% sure, keep it for now. */ - if (rli) + if (rli) { /* - If in a transaction, and if the slave supports transactions, - just inc_event_relay_log_pos(). We only have to check for OPTION_BEGIN - (not OPTION_NOT_AUTOCOMMIT) as transactions are logged - with BEGIN/COMMIT, not with SET AUTOCOMMIT= . - + If in a transaction, and if the slave supports transactions, just + inc_event_relay_log_pos(). We only have to check for OPTION_BEGIN + (not OPTION_NOT_AUTOCOMMIT) as transactions are logged with + BEGIN/COMMIT, not with SET AUTOCOMMIT= . + CAUTION: opt_using_transactions means - innodb || bdb ; suppose the master supports InnoDB and BDB, + innodb || bdb ; suppose the master supports InnoDB and BDB, but the slave supports only BDB, problems - will arise: + will arise: - suppose an InnoDB table is created on the master, - then it will be MyISAM on the slave - - but as opt_using_transactions is true, the slave will believe he is - transactional with the MyISAM table. And problems will come when one - does START SLAVE; STOP SLAVE; START SLAVE; (the slave will resume at - BEGIN whereas there has not been any rollback). This is the problem of - using opt_using_transactions instead of a finer - "does the slave support _the_transactional_handler_used_on_the_master_". - - More generally, we'll have problems when a query mixes a transactional - handler and MyISAM and STOP SLAVE is issued in the middle of the - "transaction". START SLAVE will resume at BEGIN while the MyISAM table - has already been updated. + - but as opt_using_transactions is true, the slave will believe he + is transactional with the MyISAM table. And problems will come + when one does START SLAVE; STOP SLAVE; START SLAVE; (the slave + will resume at BEGIN whereas there has not been any rollback). + This is the problem of using opt_using_transactions instead of a + finer "does the slave support + _the_transactional_handler_used_on_the_master_". + + More generally, we'll have problems when a query mixes a + transactional handler and MyISAM and STOP SLAVE is issued in the + middle of the "transaction". START SLAVE will resume at BEGIN + while the MyISAM table has already been updated. */ if ((thd->options & OPTION_BEGIN) && opt_using_transactions) - rli->inc_event_relay_log_pos(get_event_len()); + rli->inc_event_relay_log_pos(); else { - rli->inc_group_relay_log_pos(get_event_len(),log_pos); + rli->inc_group_relay_log_pos(log_pos); flush_relay_log_info(rli); /* - Note that Rotate_log_event::exec_event() does not call this function, - so there is no chance that a fake rotate event resets + Note that Rotate_log_event::exec_event() does not call this + function, so there is no chance that a fake rotate event resets last_master_timestamp. Note that we update without mutex (probably ok - except in some very rare cases, only consequence is that value may take some time to @@ -435,58 +504,101 @@ void Log_event::init_show_field_list(List<Item>* field_list) field_list->push_back(new Item_empty_string("Event_type", 20)); field_list->push_back(new Item_return_int("Server_id", 10, MYSQL_TYPE_LONG)); - field_list->push_back(new Item_return_int("Orig_log_pos", 11, + field_list->push_back(new Item_return_int("End_log_pos", 11, MYSQL_TYPE_LONGLONG)); field_list->push_back(new Item_empty_string("Info", 20)); } -#endif /* !MYSQL_CLIENT */ /* Log_event::write() */ -int Log_event::write(IO_CACHE* file) +bool Log_event::write_header(IO_CACHE* file, ulong event_data_length) { - return (write_header(file) || write_data(file)) ? -1 : 0; -} + byte header[LOG_EVENT_HEADER_LEN]; + DBUG_ENTER("Log_event::write_header"); + /* Store number of bytes that will be written by this event */ + data_written= event_data_length + sizeof(header); -/* - Log_event::write_header() -*/ + /* + log_pos != 0 if this is relay-log event. In this case we should not + change the position + */ -int Log_event::write_header(IO_CACHE* file) -{ - char buf[LOG_EVENT_HEADER_LEN]; - char* pos = buf; - int4store(pos, (ulong) when); // timestamp - pos += 4; - *pos++ = get_type_code(); // event type code - int4store(pos, server_id); - pos += 4; - long tmp=get_data_size() + LOG_EVENT_HEADER_LEN; - int4store(pos, tmp); - pos += 4; - int4store(pos, log_pos); - pos += 4; - int2store(pos, flags); - pos += 2; - return (my_b_safe_write(file, (byte*) buf, (uint) (pos - buf))); + if (is_artificial_event()) + { + /* + We should not do any cleanup on slave when reading this. We + mark this by setting log_pos to 0. Start_log_event_v3() will + detect this on reading and set artificial_event=1 for the event. + */ + log_pos= 0; + } + else if (!log_pos) + { + /* + Calculate position of end of event + + Note that with a SEQ_READ_APPEND cache, my_b_tell() does not + work well. So this will give slightly wrong positions for the + Format_desc/Rotate/Stop events which the slave writes to its + relay log. For example, the initial Format_desc will have + end_log_pos=91 instead of 95. Because after writing the first 4 + bytes of the relay log, my_b_tell() still reports 0. Because + my_b_append() does not update the counter which my_b_tell() + later uses (one should probably use my_b_append_tell() to work + around this). To get right positions even when writing to the + relay log, we use the (new) my_b_safe_tell(). + + Note that this raises a question on the correctness of all these + DBUG_ASSERT(my_b_tell()=rli->event_relay_log_pos). + + If in a transaction, the log_pos which we calculate below is not + very good (because then my_b_safe_tell() returns start position + of the BEGIN, so it's like the statement was at the BEGIN's + place), but it's not a very serious problem (as the slave, when + it is in a transaction, does not take those end_log_pos into + account (as it calls inc_event_relay_log_pos()). To be fixed + later, so that it looks less strange. But not bug. + */ + + log_pos= my_b_safe_tell(file)+data_written; + } + + /* + Header will be of size LOG_EVENT_HEADER_LEN for all events, except for + FORMAT_DESCRIPTION_EVENT and ROTATE_EVENT, where it will be + LOG_EVENT_MINIMAL_HEADER_LEN (remember these 2 have a frozen header, + because we read them before knowing the format). + */ + + int4store(header, (ulong) when); // timestamp + header[EVENT_TYPE_OFFSET]= get_type_code(); + int4store(header+ SERVER_ID_OFFSET, server_id); + int4store(header+ EVENT_LEN_OFFSET, data_written); + int4store(header+ LOG_POS_OFFSET, log_pos); + int2store(header+ FLAGS_OFFSET, flags); + + DBUG_RETURN(my_b_safe_write(file, header, sizeof(header)) != 0); } /* Log_event::read_log_event() + + This needn't be format-tolerant, because we only read + LOG_EVENT_MINIMAL_HEADER_LEN (we just want to read the event's length). + */ -#ifndef MYSQL_CLIENT int Log_event::read_log_event(IO_CACHE* file, String* packet, pthread_mutex_t* log_lock) { ulong data_len; int result=0; - char buf[LOG_EVENT_HEADER_LEN]; + char buf[LOG_EVENT_MINIMAL_HEADER_LEN]; DBUG_ENTER("read_log_event"); if (log_lock) @@ -506,24 +618,25 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet, goto end; } data_len= uint4korr(buf + EVENT_LEN_OFFSET); - if (data_len < LOG_EVENT_HEADER_LEN || + if (data_len < LOG_EVENT_MINIMAL_HEADER_LEN || data_len > current_thd->variables.max_allowed_packet) { DBUG_PRINT("error",("data_len: %ld", data_len)); - result= ((data_len < LOG_EVENT_HEADER_LEN) ? LOG_READ_BOGUS : + result= ((data_len < LOG_EVENT_MINIMAL_HEADER_LEN) ? LOG_READ_BOGUS : LOG_READ_TOO_LARGE); goto end; } packet->append(buf, sizeof(buf)); - data_len-= LOG_EVENT_HEADER_LEN; + data_len-= LOG_EVENT_MINIMAL_HEADER_LEN; if (data_len) { if (packet->append(file, data_len)) { /* - Here we should never hit EOF in a non-error condition. + Here if we hit EOF it's really an error: as data_len is >=0 + there's supposed to be more bytes available. EOF means we are reading the event partially, which should - never happen. + never happen: either we read badly or the binlog is truncated. */ result= file->error >= 0 ? LOG_READ_TRUNC: LOG_READ_IO; /* Implicit goto end; */ @@ -540,42 +653,63 @@ end: #ifndef MYSQL_CLIENT #define UNLOCK_MUTEX if (log_lock) pthread_mutex_unlock(log_lock); #define LOCK_MUTEX if (log_lock) pthread_mutex_lock(log_lock); -#define max_allowed_packet current_thd->variables.max_allowed_packet #else #define UNLOCK_MUTEX #define LOCK_MUTEX -#define max_allowed_packet (*mysql_get_parameters()->p_max_allowed_packet) #endif /* Log_event::read_log_event() NOTE: - Allocates memory; The caller is responsible for clean-up + Allocates memory; The caller is responsible for clean-up. */ #ifndef MYSQL_CLIENT Log_event* Log_event::read_log_event(IO_CACHE* file, pthread_mutex_t* log_lock, - bool old_format) + const Format_description_log_event *description_event) #else -Log_event* Log_event::read_log_event(IO_CACHE* file, bool old_format) -#endif +Log_event* Log_event::read_log_event(IO_CACHE* file, + const Format_description_log_event *description_event) +#endif { - char head[LOG_EVENT_HEADER_LEN]; - uint header_size= old_format ? OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN; + DBUG_ENTER("Log_event::read_log_event(IO_CACHE *, Format_description_log_event *"); + DBUG_ASSERT(description_event != 0); + char head[LOG_EVENT_MINIMAL_HEADER_LEN]; + /* + First we only want to read at most LOG_EVENT_MINIMAL_HEADER_LEN, just to + check the event for sanity and to know its length; no need to really parse + it. We say "at most" because this could be a 3.23 master, which has header + of 13 bytes, whereas LOG_EVENT_MINIMAL_HEADER_LEN is 19 bytes (it's + "minimal" over the set {MySQL >=4.0}). + */ + uint header_size= min(description_event->common_header_len, + LOG_EVENT_MINIMAL_HEADER_LEN); LOCK_MUTEX; + DBUG_PRINT("info", ("my_b_tell=%lu", my_b_tell(file))); if (my_b_read(file, (byte *) head, header_size)) { + DBUG_PRINT("info", ("Log_event::read_log_event(IO_CACHE*,Format_desc*) \ +failed my_b_read")); UNLOCK_MUTEX; - return 0; + /* + No error here; it could be that we are at the file's end. However + if the next my_b_read() fails (below), it will be an error as we + were able to read the first bytes. + */ + DBUG_RETURN(0); } uint data_len = uint4korr(head + EVENT_LEN_OFFSET); char *buf= 0; const char *error= 0; Log_event *res= 0; +#ifndef max_allowed_packet + THD *thd=current_thd; + uint max_allowed_packet= thd ? thd->variables.max_allowed_packet : ~0; +#endif if (data_len > max_allowed_packet) { @@ -602,15 +736,16 @@ Log_event* Log_event::read_log_event(IO_CACHE* file, bool old_format) error = "read error"; goto err; } - if ((res = read_log_event(buf, data_len, &error, old_format))) + if ((res= read_log_event(buf, data_len, &error, description_event))) res->register_temp_buf(buf); err: UNLOCK_MUTEX; - if (error) + if (!res) { - sql_print_error("\ -Error in Log_event::read_log_event(): '%s', data_len: %d, event_type: %d", + DBUG_ASSERT(error != 0); + sql_print_error("Error in Log_event::read_log_event(): " + "'%s', data_len: %d, event_type: %d", error,data_len,head[EVENT_TYPE_OFFSET]); my_free(buf, MYF(MY_ALLOW_ZERO_PTR)); /* @@ -623,94 +758,120 @@ Error in Log_event::read_log_event(): '%s', data_len: %d, event_type: %d", */ file->error= -1; } - return res; + DBUG_RETURN(res); } /* Log_event::read_log_event() + Binlog format tolerance is in (buf, event_len, description_event) + constructors. */ -Log_event* Log_event::read_log_event(const char* buf, int event_len, - const char **error, bool old_format) +Log_event* Log_event::read_log_event(const char* buf, uint event_len, + const char **error, + const Format_description_log_event *description_event) { - DBUG_ENTER("Log_event::read_log_event"); - + Log_event* ev; + DBUG_ENTER("Log_event::read_log_event(char*,...)"); + DBUG_ASSERT(description_event != 0); + DBUG_PRINT("info", ("binlog_version: %d", description_event->binlog_version)); if (event_len < EVENT_LEN_OFFSET || (uint) event_len != uint4korr(buf+EVENT_LEN_OFFSET)) { *error="Sanity check failed"; // Needed to free buffer DBUG_RETURN(NULL); // general sanity check - will fail on a partial read } - - Log_event* ev = NULL; - + switch(buf[EVENT_TYPE_OFFSET]) { case QUERY_EVENT: - ev = new Query_log_event(buf, event_len, old_format); + ev = new Query_log_event(buf, event_len, description_event, QUERY_EVENT); break; case LOAD_EVENT: - ev = new Create_file_log_event(buf, event_len, old_format); + ev = new Load_log_event(buf, event_len, description_event); break; case NEW_LOAD_EVENT: - ev = new Load_log_event(buf, event_len, old_format); + ev = new Load_log_event(buf, event_len, description_event); break; case ROTATE_EVENT: - ev = new Rotate_log_event(buf, event_len, old_format); + ev = new Rotate_log_event(buf, event_len, description_event); break; #ifdef HAVE_REPLICATION - case SLAVE_EVENT: + case SLAVE_EVENT: /* can never happen (unused event) */ ev = new Slave_log_event(buf, event_len); break; #endif /* HAVE_REPLICATION */ case CREATE_FILE_EVENT: - ev = new Create_file_log_event(buf, event_len, old_format); + ev = new Create_file_log_event(buf, event_len, description_event); break; case APPEND_BLOCK_EVENT: - ev = new Append_block_log_event(buf, event_len); + ev = new Append_block_log_event(buf, event_len, description_event); break; case DELETE_FILE_EVENT: - ev = new Delete_file_log_event(buf, event_len); + ev = new Delete_file_log_event(buf, event_len, description_event); break; case EXEC_LOAD_EVENT: - ev = new Execute_load_log_event(buf, event_len); + ev = new Execute_load_log_event(buf, event_len, description_event); break; - case START_EVENT: - ev = new Start_log_event(buf, old_format); + case START_EVENT_V3: /* this is sent only by MySQL <=4.x */ + ev = new Start_log_event_v3(buf, description_event); break; -#ifdef HAVE_REPLICATION case STOP_EVENT: - ev = new Stop_log_event(buf, old_format); + ev = new Stop_log_event(buf, description_event); break; -#endif /* HAVE_REPLICATION */ case INTVAR_EVENT: - ev = new Intvar_log_event(buf, old_format); + ev = new Intvar_log_event(buf, description_event); + break; + case XID_EVENT: + ev = new Xid_log_event(buf, description_event); break; case RAND_EVENT: - ev = new Rand_log_event(buf, old_format); + ev = new Rand_log_event(buf, description_event); break; case USER_VAR_EVENT: - ev = new User_var_log_event(buf, old_format); + ev = new User_var_log_event(buf, description_event); + break; + case FORMAT_DESCRIPTION_EVENT: + ev = new Format_description_log_event(buf, event_len, description_event); + break; + case BEGIN_LOAD_QUERY_EVENT: + ev = new Begin_load_query_log_event(buf, event_len, description_event); + break; + case EXECUTE_LOAD_QUERY_EVENT: + ev = new Execute_load_query_log_event(buf, event_len, description_event); break; default: + DBUG_PRINT("error",("Unknown evernt code: %d",(int) buf[EVENT_TYPE_OFFSET])); + ev= NULL; break; } + + /* + is_valid() are small event-specific sanity tests which are + important; for example there are some my_malloc() in constructors + (e.g. Query_log_event::Query_log_event(char*...)); when these + my_malloc() fail we can't return an error out of the constructor + (because constructor is "void") ; so instead we leave the pointer we + wanted to allocate (e.g. 'query') to 0 and we test it in is_valid(). + Same for Format_description_log_event, member 'post_header_len'. + */ if (!ev || !ev->is_valid()) { + DBUG_PRINT("error",("Found invalid event in binary log")); + delete ev; #ifdef MYSQL_CLIENT - if (!force_opt) + if (!force_opt) /* then mysqlbinlog dies */ { *error= "Found invalid event in binary log"; DBUG_RETURN(0); } - ev= new Unknown_log_event(buf, old_format); + ev= new Unknown_log_event(buf, description_event); #else *error= "Found invalid event in binary log"; DBUG_RETURN(0); #endif } - ev->cached_event_len = event_len; DBUG_RETURN(ev); } @@ -725,7 +886,7 @@ void Log_event::print_header(FILE* file) char llbuff[22]; fputc('#', file); print_timestamp(file); - fprintf(file, " server id %d log_pos %s ", server_id, + fprintf(file, " server id %d end_log_pos %s ", server_id, llstr(log_pos,llbuff)); } @@ -757,19 +918,6 @@ void Log_event::print_timestamp(FILE* file, time_t* ts) #endif /* MYSQL_CLIENT */ -/* - Log_event::set_log_pos() -*/ - -#ifndef MYSQL_CLIENT -void Log_event::set_log_pos(MYSQL_LOG* log) -{ - if (!log_pos) - log_pos = my_b_tell(&log->log_file); -} -#endif /* !MYSQL_CLIENT */ - - /************************************************************************** Query_log_event methods **************************************************************************/ @@ -778,15 +926,20 @@ void Log_event::set_log_pos(MYSQL_LOG* log) /* Query_log_event::pack_info() + This (which is used only for SHOW BINLOG EVENTS) could be updated to + print SET @@session_var=. But this is not urgent, as SHOW BINLOG EVENTS is + only an information, it does not produce suitable queries to replay (for + example it does not print LOAD DATA INFILE). */ void Query_log_event::pack_info(Protocol *protocol) { + // TODO: show the catalog ?? char *buf, *pos; if (!(buf= my_malloc(9 + db_len + q_len, MYF(MY_WME)))) return; - pos= buf; - if (!(flags & LOG_EVENT_SUPPRESS_USE_F) + pos= buf; + if (!(flags & LOG_EVENT_SUPPRESS_USE_F) && db && db_len) { pos= strmov(buf, "use `"); @@ -803,34 +956,50 @@ void Query_log_event::pack_info(Protocol *protocol) } #endif +#ifndef MYSQL_CLIENT -/* - Query_log_event::write() -*/ - -int Query_log_event::write(IO_CACHE* file) +/* Utility function for the next method */ +static void write_str_with_code_and_len(char **dst, const char *src, + int len, uint code) { - return query ? Log_event::write(file) : -1; + DBUG_ASSERT(src); + *((*dst)++)= code; + *((*dst)++)= (uchar) len; + bmove(*dst, src, len); + (*dst)+= len; } /* - Query_log_event::write_data() + Query_log_event::write() + + NOTES: + In this event we have to modify the header to have the correct + EVENT_LEN_OFFSET as we don't yet know how many status variables we + will print! */ -int Query_log_event::write_data(IO_CACHE* file) +bool Query_log_event::write(IO_CACHE* file) { - char buf[QUERY_HEADER_LEN]; + uchar buf[QUERY_HEADER_LEN+ + 1+4+ // code of flags2 and flags2 + 1+8+ // code of sql_mode and sql_mode + 1+1+FN_REFLEN+ // code of catalog and catalog length and catalog + 1+4+ // code of autoinc and the 2 autoinc variables + 1+6+ // code of charset and charset + 1+1+MAX_TIME_ZONE_NAME_LENGTH // code of tz and tz length and tz name + ], *start, *start_of_status; + ulong event_length; if (!query) - return -1; - + return 1; // Something wrong with event + /* We want to store the thread id: (- as an information for the user when he reads the binlog) - if the query uses temporary table: for the slave SQL thread to know to which master connection the temp table belongs. - Now imagine we (write_data()) are called by the slave SQL thread (we are + Now imagine we (write()) are called by the slave SQL thread (we are logging a query executed by this thread; the slave runs with --log-slave-updates). Then this query will be logged with thread_id=the_thread_id_of_the_SQL_thread. Imagine that 2 temp tables of @@ -867,78 +1036,324 @@ int Query_log_event::write_data(IO_CACHE* file) buf[Q_DB_LEN_OFFSET] = (char) db_len; int2store(buf + Q_ERR_CODE_OFFSET, error_code); - return (my_b_safe_write(file, (byte*) buf, QUERY_HEADER_LEN) || - my_b_safe_write(file, (db) ? (byte*) db : (byte*)"", db_len + 1) || - my_b_safe_write(file, (byte*) query, q_len)) ? -1 : 0; + /* + You MUST always write status vars in increasing order of code. This + guarantees that a slightly older slave will be able to parse those he + knows. + */ + start_of_status= start= buf+QUERY_HEADER_LEN; + if (flags2_inited) + { + *start++= Q_FLAGS2_CODE; + int4store(start, flags2); + start+= 4; + } + if (sql_mode_inited) + { + *start++= Q_SQL_MODE_CODE; + int8store(start, (ulonglong)sql_mode); + start+= 8; + } + if (catalog_len) // i.e. this var is inited (false for 4.0 events) + { + write_str_with_code_and_len((char **)(&start), + catalog, catalog_len, Q_CATALOG_NZ_CODE); + /* + In 5.0.x where x<4 masters we used to store the end zero here. This was + a waste of one byte so we don't do it in x>=4 masters. We change code to + Q_CATALOG_NZ_CODE, because re-using the old code would make x<4 slaves + of this x>=4 master segfault (expecting a zero when there is + none). Remaining compatibility problems are: the older slave will not + find the catalog; but it is will not crash, and it's not an issue + that it does not find the catalog as catalogs were not used in these + older MySQL versions (we store it in binlog and read it from relay log + but do nothing useful with it). What is an issue is that the older slave + will stop processing the Q_* blocks (and jumps to the db/query) as soon + as it sees unknown Q_CATALOG_NZ_CODE; so it will not be able to read + Q_AUTO_INCREMENT*, Q_CHARSET and so replication will fail silently in + various ways. Documented that you should not mix alpha/beta versions if + they are not exactly the same version, with example of 5.0.3->5.0.2 and + 5.0.4->5.0.3. If replication is from older to new, the new will + recognize Q_CATALOG_CODE and have no problem. + */ + } + if (auto_increment_increment != 1) + { + *start++= Q_AUTO_INCREMENT; + int2store(start, auto_increment_increment); + int2store(start+2, auto_increment_offset); + start+= 4; + } + if (charset_inited) + { + *start++= Q_CHARSET_CODE; + memcpy(start, charset, 6); + start+= 6; + } + if (time_zone_len) + { + /* In the TZ sys table, column Name is of length 64 so this should be ok */ + DBUG_ASSERT(time_zone_len <= MAX_TIME_ZONE_NAME_LENGTH); + *start++= Q_TIME_ZONE_CODE; + *start++= time_zone_len; + memcpy(start, time_zone_str, time_zone_len); + start+= time_zone_len; + } + /* + Here there could be code like + if (command-line-option-which-says-"log_this_variable" && inited) + { + *start++= Q_THIS_VARIABLE_CODE; + int4store(start, this_variable); + start+= 4; + } + */ + + /* Store length of status variables */ + status_vars_len= (uint) (start-start_of_status); + int2store(buf + Q_STATUS_VARS_LEN_OFFSET, status_vars_len); + + /* + Calculate length of whole event + The "1" below is the \0 in the db's length + */ + event_length= (uint) (start-buf) + get_post_header_size_for_derived() + db_len + 1 + q_len; + + return (write_header(file, event_length) || + my_b_safe_write(file, (byte*) buf, QUERY_HEADER_LEN) || + write_post_header_for_derived(file) || + my_b_safe_write(file, (byte*) start_of_status, + (uint) (start-start_of_status)) || + my_b_safe_write(file, (db) ? (byte*) db : (byte*)"", db_len + 1) || + my_b_safe_write(file, (byte*) query, q_len)) ? 1 : 0; } /* Query_log_event::Query_log_event() */ - -#ifndef MYSQL_CLIENT Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg, ulong query_length, bool using_trans, bool suppress_use) - :Log_event(thd_arg, + :Log_event(thd_arg, ((thd_arg->tmp_table_used ? LOG_EVENT_THREAD_SPECIFIC_F : 0) | (suppress_use ? LOG_EVENT_SUPPRESS_USE_F : 0)), using_trans), - data_buf(0), query(query_arg), + data_buf(0), query(query_arg), catalog(thd_arg->catalog), db(thd_arg->db), q_len((uint32) query_length), - error_code(thd_arg->killed ? + error_code((thd_arg->killed != THD::NOT_KILLED) ? ((thd_arg->system_thread & SYSTEM_THREAD_DELAYED_INSERT) ? - 0 : ER_SERVER_SHUTDOWN) : thd_arg->net.last_errno), + 0 : thd->killed_errno()) : thd_arg->net.last_errno), thread_id(thd_arg->thread_id), /* save the original thread id; we already know the server id */ - slave_proxy_id(thd_arg->variables.pseudo_thread_id) + slave_proxy_id(thd_arg->variables.pseudo_thread_id), + flags2_inited(1), sql_mode_inited(1), charset_inited(1), + sql_mode(thd_arg->variables.sql_mode), + auto_increment_increment(thd_arg->variables.auto_increment_increment), + auto_increment_offset(thd_arg->variables.auto_increment_offset) { time_t end_time; time(&end_time); exec_time = (ulong) (end_time - thd->start_time); + catalog_len = (catalog) ? (uint32) strlen(catalog) : 0; + /* status_vars_len is set just before writing the event */ db_len = (db) ? (uint32) strlen(db) : 0; + /* + If we don't use flags2 for anything else than options contained in + thd->options, it would be more efficient to flags2=thd_arg->options + (OPTIONS_WRITTEN_TO_BINLOG would be used only at reading time). + But it's likely that we don't want to use 32 bits for 3 bits; in the future + we will probably want to reclaim the 29 bits. So we need the &. + */ + flags2= thd_arg->options & OPTIONS_WRITTEN_TO_BIN_LOG; + DBUG_ASSERT(thd->variables.character_set_client->number < 256*256); + DBUG_ASSERT(thd->variables.collation_connection->number < 256*256); + DBUG_ASSERT(thd->variables.collation_server->number < 256*256); + int2store(charset, thd_arg->variables.character_set_client->number); + int2store(charset+2, thd_arg->variables.collation_connection->number); + int2store(charset+4, thd_arg->variables.collation_server->number); + if (thd_arg->time_zone_used) + { + /* + Note that our event becomes dependent on the Time_zone object + representing the time zone. Fortunately such objects are never deleted + or changed during mysqld's lifetime. + */ + time_zone_len= thd_arg->variables.time_zone->get_name()->length(); + time_zone_str= thd_arg->variables.time_zone->get_name()->ptr(); + } + else + time_zone_len= 0; + DBUG_PRINT("info",("Query_log_event has flags2=%lu sql_mode=%lu",flags2,sql_mode)); } #endif /* MYSQL_CLIENT */ +/* 2 utility functions for the next method */ + +static void get_str_len_and_pointer(const char **dst, const char **src, uint *len) +{ + if ((*len= **src)) + *dst= *src + 1; // Will be copied later + (*src)+= *len+1; +} + + +static void copy_str_and_move(char **dst, const char **src, uint len) +{ + memcpy(*dst, *src, len); + *src= *dst; + (*dst)+= len; + *(*dst)++= 0; +} + + /* Query_log_event::Query_log_event() + This is used by the SQL slave thread to prepare the event before execution. */ -Query_log_event::Query_log_event(const char* buf, int event_len, - bool old_format) - :Log_event(buf, old_format),data_buf(0), query(NULL), db(NULL) +Query_log_event::Query_log_event(const char* buf, uint event_len, + const Format_description_log_event *description_event, + Log_event_type event_type) + :Log_event(buf, description_event), data_buf(0), query(NullS), + db(NullS), catalog_len(0), status_vars_len(0), + flags2_inited(0), sql_mode_inited(0), charset_inited(0), + auto_increment_increment(1), auto_increment_offset(1), + time_zone_len(0) { ulong data_len; - if (old_format) - { - if ((uint)event_len < OLD_HEADER_LEN + QUERY_HEADER_LEN) - return; - data_len = event_len - (QUERY_HEADER_LEN + OLD_HEADER_LEN); - buf += OLD_HEADER_LEN; - } - else - { - if ((uint)event_len < QUERY_EVENT_OVERHEAD) - return; - data_len = event_len - QUERY_EVENT_OVERHEAD; - buf += LOG_EVENT_HEADER_LEN; - } - + uint32 tmp; + uint8 common_header_len, post_header_len; + char *start; + const char *end; + bool catalog_nz= 1; + DBUG_ENTER("Query_log_event::Query_log_event(char*,...)"); + + common_header_len= description_event->common_header_len; + post_header_len= description_event->post_header_len[event_type-1]; + DBUG_PRINT("info",("event_len=%ld, common_header_len=%d, post_header_len=%d", + event_len, common_header_len, post_header_len)); + + /* + We test if the event's length is sensible, and if so we compute data_len. + We cannot rely on QUERY_HEADER_LEN here as it would not be format-tolerant. + We use QUERY_HEADER_MINIMAL_LEN which is the same for 3.23, 4.0 & 5.0. + */ + if (event_len < (uint)(common_header_len + post_header_len)) + DBUG_VOID_RETURN; + data_len = event_len - (common_header_len + post_header_len); + buf+= common_header_len; + + slave_proxy_id= thread_id = uint4korr(buf + Q_THREAD_ID_OFFSET); exec_time = uint4korr(buf + Q_EXEC_TIME_OFFSET); + db_len = (uint)buf[Q_DB_LEN_OFFSET]; // TODO: add a check of all *_len vars error_code = uint2korr(buf + Q_ERR_CODE_OFFSET); - if (!(data_buf = (char*) my_malloc(data_len + 1, MYF(MY_WME)))) - return; + /* + 5.0 format starts here. + Depending on the format, we may or not have affected/warnings etc + The remnent post-header to be parsed has length: + */ + tmp= post_header_len - QUERY_HEADER_MINIMAL_LEN; + if (tmp) + { + status_vars_len= uint2korr(buf + Q_STATUS_VARS_LEN_OFFSET); + data_len-= status_vars_len; + DBUG_PRINT("info", ("Query_log_event has status_vars_len: %u", + (uint) status_vars_len)); + tmp-= 2; + } + /* + We have parsed everything we know in the post header for QUERY_EVENT, + the rest of post header is either comes from older version MySQL or + dedicated to derived events (e.g. Execute_load_query...) + */ - memcpy(data_buf, buf + Q_DATA_OFFSET, data_len); - slave_proxy_id= thread_id= uint4korr(buf + Q_THREAD_ID_OFFSET); - db = data_buf; - db_len = (uint)buf[Q_DB_LEN_OFFSET]; - query=data_buf + db_len + 1; - q_len = data_len - 1 - db_len; - *((char*)query+q_len) = 0; + /* variable-part: the status vars; only in MySQL 5.0 */ + + start= (char*) (buf+post_header_len); + end= (const char*) (start+status_vars_len); + for (const uchar* pos= (const uchar*) start; pos < (const uchar*) end;) + { + switch (*pos++) { + case Q_FLAGS2_CODE: + flags2_inited= 1; + flags2= uint4korr(pos); + DBUG_PRINT("info",("In Query_log_event, read flags2: %lu", flags2)); + pos+= 4; + break; + case Q_SQL_MODE_CODE: + { +#ifndef DBUG_OFF + char buff[22]; +#endif + sql_mode_inited= 1; + sql_mode= (ulong) uint8korr(pos); // QQ: Fix when sql_mode is ulonglong + DBUG_PRINT("info",("In Query_log_event, read sql_mode: %s", + llstr(sql_mode, buff))); + pos+= 8; + break; + } + case Q_CATALOG_NZ_CODE: + get_str_len_and_pointer(&catalog, (const char **)(&pos), &catalog_len); + break; + case Q_AUTO_INCREMENT: + auto_increment_increment= uint2korr(pos); + auto_increment_offset= uint2korr(pos+2); + pos+= 4; + break; + case Q_CHARSET_CODE: + { + charset_inited= 1; + memcpy(charset, pos, 6); + pos+= 6; + break; + } + case Q_TIME_ZONE_CODE: + { + get_str_len_and_pointer(&time_zone_str, (const char **)(&pos), &time_zone_len); + break; + } + case Q_CATALOG_CODE: /* for 5.0.x where 0<=x<=3 masters */ + if ((catalog_len= *pos)) + catalog= (char*) pos+1; // Will be copied later + pos+= catalog_len+2; // leap over end 0 + catalog_nz= 0; // catalog has end 0 in event + break; + default: + /* That's why you must write status vars in growing order of code */ + DBUG_PRINT("info",("Query_log_event has unknown status vars (first has\ + code: %u), skipping the rest of them", (uint) *(pos-1))); + pos= (const uchar*) end; // Break loop + } + } + + if (!(start= data_buf = (char*) my_malloc(catalog_len + 1 + + time_zone_len + 1 + + data_len + 1, MYF(MY_WME)))) + DBUG_VOID_RETURN; + if (catalog_len) // If catalog is given + { + if (likely(catalog_nz)) // true except if event comes from 5.0.0|1|2|3. + copy_str_and_move(&start, &catalog, catalog_len); + else + { + memcpy(start, catalog, catalog_len+1); // copy end 0 + catalog= start; + start+= catalog_len+1; + } + } + if (time_zone_len) + copy_str_and_move(&start, &time_zone_str, time_zone_len); + + /* A 2nd variable part; this is common to all versions */ + memcpy((char*) start, end, data_len); // Copy db and query + start[data_len]= '\0'; // End query with \0 (For safetly) + db= start; + query= start + db_len + 1; + q_len= data_len - db_len -1; + DBUG_VOID_RETURN; } @@ -947,30 +1362,27 @@ Query_log_event::Query_log_event(const char* buf, int event_len, */ #ifdef MYSQL_CLIENT -void Query_log_event::print(FILE* file, bool short_form, char* last_db) +void Query_log_event::print_query_header(FILE* file, bool short_form, + LAST_EVENT_INFO* last_event_info) { + // TODO: print the catalog ?? char buff[40],*end; // Enough for SET TIMESTAMP + bool different_db= 1; + uint32 tmp; + if (!short_form) { print_header(file); - fprintf(file, "\tQuery\tthread_id=%lu\texec_time=%lu\terror_code=%d\n", - (ulong) thread_id, (ulong) exec_time, error_code); + fprintf(file, "\t%s\tthread_id=%lu\texec_time=%lu\terror_code=%d\n", + get_type_str(), (ulong) thread_id, (ulong) exec_time, error_code); } - bool different_db= 1; - - if (!(flags & LOG_EVENT_SUPPRESS_USE_F)) + if (!(flags & LOG_EVENT_SUPPRESS_USE_F) && db) { - if (db && last_db) - { - if (different_db= memcmp(last_db, db, db_len + 1)) - memcpy(last_db, db, db_len + 1); - } - - if (db && db[0] && different_db) - { + if (different_db= memcmp(last_event_info->db, db, db_len + 1)) + memcpy(last_event_info->db, db, db_len + 1); + if (db[0] && different_db) fprintf(file, "use %s;\n", db); - } } end=int10_to_str((long) when, strmov(buff,"SET TIMESTAMP="),10); @@ -979,8 +1391,114 @@ void Query_log_event::print(FILE* file, bool short_form, char* last_db) my_fwrite(file, (byte*) buff, (uint) (end-buff),MYF(MY_NABP | MY_WME)); if (flags & LOG_EVENT_THREAD_SPECIFIC_F) fprintf(file,"SET @@session.pseudo_thread_id=%lu;\n",(ulong)thread_id); + + /* + If flags2_inited==0, this is an event from 3.23 or 4.0; nothing to + print (remember we don't produce mixed relay logs so there cannot be + 5.0 events before that one so there is nothing to reset). + */ + if (likely(flags2_inited)) /* likely as this will mainly read 5.0 logs */ + { + /* tmp is a bitmask of bits which have changed. */ + if (likely(last_event_info->flags2_inited)) + /* All bits which have changed */ + tmp= (last_event_info->flags2) ^ flags2; + else /* that's the first Query event we read */ + { + last_event_info->flags2_inited= 1; + tmp= ~((uint32)0); /* all bits have changed */ + } + + if (unlikely(tmp)) /* some bits have changed */ + { + bool need_comma= 0; + fprintf(file, "SET "); + print_set_option(file, tmp, OPTION_NO_FOREIGN_KEY_CHECKS, ~flags2, + "@@session.foreign_key_checks", &need_comma); + print_set_option(file, tmp, OPTION_AUTO_IS_NULL, flags2, + "@@session.sql_auto_is_null", &need_comma); + print_set_option(file, tmp, OPTION_RELAXED_UNIQUE_CHECKS, ~flags2, + "@@session.unique_checks", &need_comma); + fprintf(file,";\n"); + last_event_info->flags2= flags2; + } + } + + /* + Now the session variables; + it's more efficient to pass SQL_MODE as a number instead of a + comma-separated list. + FOREIGN_KEY_CHECKS, SQL_AUTO_IS_NULL, UNIQUE_CHECKS are session-only + variables (they have no global version; they're not listed in + sql_class.h), The tests below work for pure binlogs or pure relay + logs. Won't work for mixed relay logs but we don't create mixed + relay logs (that is, there is no relay log with a format change + except within the 3 first events, which mysqlbinlog handles + gracefully). So this code should always be good. + */ + + if (likely(sql_mode_inited)) + { + if (unlikely(!last_event_info->sql_mode_inited)) /* first Query event */ + { + last_event_info->sql_mode_inited= 1; + /* force a difference to force write */ + last_event_info->sql_mode= ~sql_mode; + } + if (unlikely(last_event_info->sql_mode != sql_mode)) + { + fprintf(file,"SET @@session.sql_mode=%lu;\n",(ulong)sql_mode); + last_event_info->sql_mode= sql_mode; + } + } + if (last_event_info->auto_increment_increment != auto_increment_increment || + last_event_info->auto_increment_offset != auto_increment_offset) + { + fprintf(file,"SET @@session.auto_increment_increment=%lu, @@session.auto_increment_offset=%lu;\n", + auto_increment_increment,auto_increment_offset); + last_event_info->auto_increment_increment= auto_increment_increment; + last_event_info->auto_increment_offset= auto_increment_offset; + } + + /* TODO: print the catalog when we feature SET CATALOG */ + + if (likely(charset_inited)) + { + if (unlikely(!last_event_info->charset_inited)) /* first Query event */ + { + last_event_info->charset_inited= 1; + last_event_info->charset[0]= ~charset[0]; // force a difference to force write + } + if (unlikely(bcmp(last_event_info->charset, charset, 6))) + { + fprintf(file,"SET " + "@@session.character_set_client=%d," + "@@session.collation_connection=%d," + "@@session.collation_server=%d" + ";\n", + uint2korr(charset), + uint2korr(charset+2), + uint2korr(charset+4)); + memcpy(last_event_info->charset, charset, 6); + } + } + if (time_zone_len) + { + if (bcmp(last_event_info->time_zone_str, time_zone_str, time_zone_len+1)) + { + fprintf(file,"SET @@session.time_zone='%s';\n", time_zone_str); + memcpy(last_event_info->time_zone_str, time_zone_str, time_zone_len+1); + } + } +} + + +void Query_log_event::print(FILE* file, bool short_form, + LAST_EVENT_INFO* last_event_info) +{ + print_query_header(file, short_form, last_event_info); my_fwrite(file, (byte*) query, q_len, MYF(MY_NABP | MY_WME)); - fprintf(file, ";\n"); + fputs(";\n", file); } #endif /* MYSQL_CLIENT */ @@ -992,37 +1510,116 @@ void Query_log_event::print(FILE* file, bool short_form, char* last_db) #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) int Query_log_event::exec_event(struct st_relay_log_info* rli) { + return exec_event(rli, query, q_len); +} + + +int Query_log_event::exec_event(struct st_relay_log_info* rli, const char *query_arg, uint32 q_len_arg) +{ int expected_error,actual_error= 0; + /* + Colleagues: please never free(thd->catalog) in MySQL. This would lead to + bugs as here thd->catalog is a part of an alloced block, not an entire + alloced block (see Query_log_event::exec_event()). Same for thd->db. + Thank you. + */ + thd->catalog= catalog_len ? (char *) catalog : (char *)""; thd->db_length= db_len; thd->db= (char*) rewrite_db(db, &thd->db_length); + thd->variables.auto_increment_increment= auto_increment_increment; + thd->variables.auto_increment_offset= auto_increment_offset; /* - InnoDB internally stores the master log position it has processed so far; - position to store is of the END of the current log event. + InnoDB internally stores the master log position it has executed so far, + i.e. the position just after the COMMIT event. + When InnoDB will want to store, the positions in rli won't have + been updated yet, so group_master_log_* will point to old BEGIN + and event_master_log* will point to the beginning of current COMMIT. + But log_pos of the COMMIT Query event is what we want, i.e. the pos of the + END of the current log event (COMMIT). We save it in rli so that InnoDB can + access it. */ -#if MYSQL_VERSION_ID < 50000 - rli->future_group_master_log_pos= log_pos + get_event_len() - - (rli->mi->old_format ? (LOG_EVENT_HEADER_LEN - OLD_HEADER_LEN) : 0); -#else - /* In 5.0 we store the end_log_pos in the relay log so no problem */ rli->future_group_master_log_pos= log_pos; -#endif + DBUG_PRINT("info", ("log_pos: %lu", (ulong) log_pos)); + clear_all_errors(thd, rli); if (db_ok(thd->db, replicate_do_db, replicate_ignore_db)) { thd->set_time((time_t)when); - thd->query_length= q_len; - thd->query = (char*)query; + thd->query_length= q_len_arg; + thd->query= (char*)query_arg; VOID(pthread_mutex_lock(&LOCK_thread_count)); - thd->query_id = query_id++; + thd->query_id = next_query_id(); VOID(pthread_mutex_unlock(&LOCK_thread_count)); thd->variables.pseudo_thread_id= thread_id; // for temp tables - DBUG_PRINT("query",("%s",thd->query)); + if (ignored_error_code((expected_error= error_code)) || !check_expected_error(thd,rli,expected_error)) - mysql_parse(thd, thd->query, q_len); + { + if (flags2_inited) + /* + all bits of thd->options which are 1 in OPTIONS_WRITTEN_TO_BIN_LOG must + take their value from flags2. + */ + thd->options= flags2|(thd->options & ~(ulong)OPTIONS_WRITTEN_TO_BIN_LOG); + /* + else, we are in a 3.23/4.0 binlog; we previously received a + Rotate_log_event which reset thd->options and sql_mode etc, so nothing to do. + */ + /* + We do not replicate IGNORE_DIR_IN_CREATE. That is, if the master is a + slave which runs with SQL_MODE=IGNORE_DIR_IN_CREATE, this should not + force us to ignore the dir too. Imagine you are a ring of machines, and + one has a disk problem so that you temporarily need IGNORE_DIR_IN_CREATE + on this machine; you don't want it to propagate elsewhere (you don't want + all slaves to start ignoring the dirs). + */ + if (sql_mode_inited) + thd->variables.sql_mode= + (ulong) ((thd->variables.sql_mode & MODE_NO_DIR_IN_CREATE) | + (sql_mode & ~(ulong) MODE_NO_DIR_IN_CREATE)); + if (charset_inited) + { + if (rli->cached_charset_compare(charset)) + { + /* Verify that we support the charsets found in the event. */ + if (!(thd->variables.character_set_client= + get_charset(uint2korr(charset), MYF(MY_WME))) || + !(thd->variables.collation_connection= + get_charset(uint2korr(charset+2), MYF(MY_WME))) || + !(thd->variables.collation_server= + get_charset(uint2korr(charset+4), MYF(MY_WME)))) + { + /* + We updated the thd->variables with nonsensical values (0). Let's + set them to something safe (i.e. which avoids crash), and we'll + stop with EE_UNKNOWN_CHARSET in compare_errors (unless set to + ignore this error). + */ + set_slave_thread_default_charset(thd, rli); + goto compare_errors; + } + thd->update_charset(); // for the charset change to take effect + } + } + if (time_zone_len) + { + String tmp(time_zone_str, time_zone_len, &my_charset_bin); + if (!(thd->variables.time_zone= + my_tz_find_with_opening_tz_tables(thd, &tmp))) + { + my_error(ER_UNKNOWN_TIME_ZONE, MYF(0), tmp.c_ptr()); + thd->variables.time_zone= global_system_variables.time_zone; + goto compare_errors; + } + } + + /* Execute the query (note that we bypass dispatch_command()) */ + mysql_parse(thd, thd->query, thd->query_length); + + } else { /* @@ -1032,7 +1629,7 @@ int Query_log_event::exec_event(struct st_relay_log_info* rli) we exit gracefully; otherwise we warn about the bad error and tell DBA to check/fix it. */ - if (mysql_test_parse_for_slave(thd, thd->query, q_len)) + if (mysql_test_parse_for_slave(thd, thd->query, thd->query_length)) clear_all_errors(thd, rli); /* Can ignore query */ else { @@ -1052,19 +1649,21 @@ START SLAVE; . Query: '%s'", expected_error, thd->query); if (thd->net.last_errno != ER_SLAVE_IGNORED_TABLE) mysql_log.write(thd,COM_QUERY,"%s",thd->query); - /* +compare_errors: + + /* If we expected a non-zero error code, and we don't get the same error code, and none of them should be ignored. */ DBUG_PRINT("info",("expected_error: %d last_errno: %d", - expected_error, thd->net.last_errno)); + expected_error, thd->net.last_errno)); if ((expected_error != (actual_error= thd->net.last_errno)) && - expected_error && - !ignored_error_code(actual_error) && - !ignored_error_code(expected_error)) + expected_error && + !ignored_error_code(actual_error) && + !ignored_error_code(expected_error)) { slave_print_error(rli, 0, - "\ + "\ Query caused different errors on master and slave. \ Error on master: '%s' (%d), Error on slave: '%s' (%d). \ Default database: '%s'. Query: '%s'", @@ -1072,14 +1671,14 @@ Default database: '%s'. Query: '%s'", expected_error, actual_error ? thd->net.last_error: "no error", actual_error, - print_slave_db_safe(thd->db), query); + print_slave_db_safe(db), query_arg); thd->query_error= 1; } /* If we get the same error code as expected, or they should be ignored. */ else if (expected_error == actual_error || - ignored_error_code(actual_error)) + ignored_error_code(actual_error)) { DBUG_PRINT("info",("error ignored")); clear_all_errors(thd, rli); @@ -1093,14 +1692,46 @@ Default database: '%s'. Query: '%s'", "Error '%s' on query. Default database: '%s'. Query: '%s'", (actual_error ? thd->net.last_error : "unexpected success or fatal error"), - print_slave_db_safe(thd->db), query); + print_slave_db_safe(thd->db), query_arg); thd->query_error= 1; } + + /* + TODO: compare the values of "affected rows" around here. Something + like: + if ((uint32) affected_in_event != (uint32) affected_on_slave) + { + sql_print_error("Slave: did not get the expected number of affected \ + rows running query from master - expected %d, got %d (this numbers \ + should have matched modulo 4294967296).", 0, ...); + thd->query_error = 1; + } + We may also want an option to tell the slave to ignore "affected" + mismatch. This mismatch could be implemented with a new ER_ code, and + to ignore it you would use --slave-skip-errors... + + To do the comparison we need to know the value of "affected" which the + above mysql_parse() computed. And we need to know the value of + "affected" in the master's binlog. Both will be implemented later. The + important thing is that we now have the format ready to log the values + of "affected" in the binlog. So we can release 5.0.0 before effectively + logging "affected" and effectively comparing it. + */ } /* End of if (db_ok(... */ end: VOID(pthread_mutex_lock(&LOCK_thread_count)); - thd->db= 0; // prevent db from being freed + /* + Probably we have set thd->query, thd->db, thd->catalog to point to places + in the data_buf of this event. Now the event is going to be deleted + probably, so data_buf will be freed, so the thd->... listed above will be + pointers to freed memory. + So we must set them to 0, so that those bad pointers values are not later + used. Note that "cleanup" queries like automatic DROP TEMPORARY TABLE + don't suffer from these assignments to 0 as DROP TEMPORARY + TABLE uses the db.table syntax. + */ + thd->db= thd->catalog= 0; // prevent db from being freed thd->query= 0; // just to be sure thd->query_length= thd->db_length =0; VOID(pthread_mutex_unlock(&LOCK_thread_count)); @@ -1113,22 +1744,30 @@ end: updating query. */ return (thd->query_error ? thd->query_error : - (thd->one_shot_set ? (rli->inc_event_relay_log_pos(get_event_len()),0) : + (thd->one_shot_set ? (rli->inc_event_relay_log_pos(),0) : Log_event::exec_event(rli))); } #endif /************************************************************************** - Start_log_event methods + Start_log_event_v3 methods **************************************************************************/ +#ifndef MYSQL_CLIENT +Start_log_event_v3::Start_log_event_v3() :Log_event(), binlog_version(BINLOG_VERSION), artificial_event(0) +{ + created= when; + memcpy(server_version, ::server_version, ST_SERVER_VER_LEN); +} +#endif + /* - Start_log_event::pack_info() + Start_log_event_v3::pack_info() */ #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) -void Start_log_event::pack_info(Protocol *protocol) +void Start_log_event_v3::pack_info(Protocol *protocol) { char buf[12 + ST_SERVER_VER_LEN + 14 + 22], *pos; pos= strmov(buf, "Server ver: "); @@ -1141,57 +1780,80 @@ void Start_log_event::pack_info(Protocol *protocol) /* - Start_log_event::print() + Start_log_event_v3::print() */ #ifdef MYSQL_CLIENT -void Start_log_event::print(FILE* file, bool short_form, char* last_db) +void Start_log_event_v3::print(FILE* file, bool short_form, LAST_EVENT_INFO* last_event_info) { - if (short_form) - return; - - print_header(file); - fprintf(file, "\tStart: binlog v %d, server v %s created ", binlog_version, - server_version); - print_timestamp(file); - if (created) - fprintf(file," at startup"); - fputc('\n', file); + if (!short_form) + { + print_header(file); + fprintf(file, "\tStart: binlog v %d, server v %s created ", binlog_version, + server_version); + print_timestamp(file); + if (created) + fprintf(file," at startup"); + fputc('\n', file); + if (flags & LOG_EVENT_BINLOG_IN_USE_F) + fprintf(file, "# Warning: this binlog was not closed properly. " + "Most probably mysqld crashed writing it.\n"); + } + if (!artificial_event && created) + { +#ifdef WHEN_WE_HAVE_THE_RESET_CONNECTION_SQL_COMMAND + /* + This is for mysqlbinlog: like in replication, we want to delete the stale + tmp files left by an unclean shutdown of mysqld (temporary tables) + and rollback unfinished transaction. + Probably this can be done with RESET CONNECTION (syntax to be defined). + */ + fprintf(file,"RESET CONNECTION;\n"); +#else + fprintf(file,"ROLLBACK;\n"); +#endif + } fflush(file); } #endif /* MYSQL_CLIENT */ /* - Start_log_event::Start_log_event() + Start_log_event_v3::Start_log_event_v3() */ -Start_log_event::Start_log_event(const char* buf, - bool old_format) - :Log_event(buf, old_format) +Start_log_event_v3::Start_log_event_v3(const char* buf, + const Format_description_log_event* description_event) + :Log_event(buf, description_event) { - buf += (old_format) ? OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN; - binlog_version = uint2korr(buf+ST_BINLOG_VER_OFFSET); + buf+= description_event->common_header_len; + binlog_version= uint2korr(buf+ST_BINLOG_VER_OFFSET); memcpy(server_version, buf+ST_SERVER_VER_OFFSET, ST_SERVER_VER_LEN); - created = uint4korr(buf+ST_CREATED_OFFSET); + created= uint4korr(buf+ST_CREATED_OFFSET); + /* We use log_pos to mark if this was an artificial event or not */ + artificial_event= (log_pos == 0); } /* - Start_log_event::write_data() + Start_log_event_v3::write() */ -int Start_log_event::write_data(IO_CACHE* file) +#ifndef MYSQL_CLIENT +bool Start_log_event_v3::write(IO_CACHE* file) { - char buff[START_HEADER_LEN]; + char buff[START_V3_HEADER_LEN]; int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version); memcpy(buff + ST_SERVER_VER_OFFSET,server_version,ST_SERVER_VER_LEN); int4store(buff + ST_CREATED_OFFSET,created); - return (my_b_safe_write(file, (byte*) buff, sizeof(buff)) ? -1 : 0); + return (write_header(file, sizeof(buff)) || + my_b_safe_write(file, (byte*) buff, sizeof(buff))); } +#endif + /* - Start_log_event::exec_event() + Start_log_event_v3::exec_event() The master started @@ -1210,84 +1872,321 @@ int Start_log_event::write_data(IO_CACHE* file) */ #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) -int Start_log_event::exec_event(struct st_relay_log_info* rli) +int Start_log_event_v3::exec_event(struct st_relay_log_info* rli) { - DBUG_ENTER("Start_log_event::exec_event"); - - /* - If the I/O thread has not started, mi->old_format is BINLOG_FORMAT_CURRENT - (that's what the MASTER_INFO constructor does), so the test below is not - perfect at all. - */ - switch (rli->mi->old_format) { - case BINLOG_FORMAT_CURRENT: - /* - This is 4.x, so a Start_log_event is only at master startup, - so we are sure the master has restarted and cleared his temp tables. - */ - close_temporary_tables(thd); - cleanup_load_tmpdir(); + DBUG_ENTER("Start_log_event_v3::exec_event"); + switch (binlog_version) + { + case 3: + case 4: /* - As a transaction NEVER spans on 2 or more binlogs: - if we have an active transaction at this point, the master died while - writing the transaction to the binary log, i.e. while flushing the binlog - cache to the binlog. As the write was started, the transaction had been - committed on the master, so we lack of information to replay this - transaction on the slave; all we can do is stop with error. + This can either be 4.x (then a Start_log_event_v3 is only at master + startup so we are sure the master has restarted and cleared his temp + tables; the event always has 'created'>0) or 5.0 (then we have to test + 'created'). */ - if (thd->options & OPTION_BEGIN) + if (created) { - slave_print_error(rli, 0, "\ -Rolling back unfinished transaction (no COMMIT or ROLLBACK) from relay log. \ -A probable cause is that the master died while writing the transaction to its \ -binary log."); - return(1); + close_temporary_tables(thd); + cleanup_load_tmpdir(); } break; - /* + /* Now the older formats; in that case load_tmpdir is cleaned up by the I/O thread. */ - case BINLOG_FORMAT_323_LESS_57: - /* - Cannot distinguish a Start_log_event generated at master startup and - one generated by master FLUSH LOGS, so cannot be sure temp tables - have to be dropped. So do nothing. - */ - break; - case BINLOG_FORMAT_323_GEQ_57: + case 1: + if (strncmp(rli->relay_log.description_event_for_exec->server_version, + "3.23.57",7) >= 0 && created) + { + /* + Can distinguish, based on the value of 'created': this event was + generated at master startup. + */ + close_temporary_tables(thd); + } /* - Can distinguish, based on the value of 'created', - which was generated at master startup. + Otherwise, can't distinguish a Start_log_event generated at + master startup and one generated by master FLUSH LOGS, so cannot + be sure temp tables have to be dropped. So do nothing. */ - if (created) - close_temporary_tables(thd); break; default: /* this case is impossible */ - return 1; + DBUG_RETURN(1); } - DBUG_RETURN(Log_event::exec_event(rli)); } #endif /* defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) */ -/************************************************************************** - Load_log_event methods -**************************************************************************/ +/*************************************************************************** + Format_description_log_event methods +****************************************************************************/ /* - Load_log_event::pack_info() + Format_description_log_event 1st ctor. + + SYNOPSIS + Format_description_log_event::Format_description_log_event + binlog_version the binlog version for which we want to build + an event. Can be 1 (=MySQL 3.23), 3 (=4.0.x + x>=2 and 4.1) or 4 (MySQL 5.0). Note that the + old 4.0 (binlog version 2) is not supported; + it should not be used for replication with + 5.0. + + DESCRIPTION + Ctor. Can be used to create the event to write to the binary log (when the + server starts or when FLUSH LOGS), or to create artificial events to parse + binlogs from MySQL 3.23 or 4.x. + When in a client, only the 2nd use is possible. +*/ + +Format_description_log_event:: +Format_description_log_event(uint8 binlog_ver, const char* server_ver) + :Start_log_event_v3() +{ + created= when; + binlog_version= binlog_ver; + switch (binlog_ver) { + case 4: /* MySQL 5.0 */ + memcpy(server_version, ::server_version, ST_SERVER_VER_LEN); + common_header_len= LOG_EVENT_HEADER_LEN; + number_of_event_types= LOG_EVENT_TYPES; + /* we'll catch my_malloc() error in is_valid() */ + post_header_len=(uint8*) my_malloc(number_of_event_types*sizeof(uint8), + MYF(MY_ZEROFILL)); + /* + This long list of assignments is not beautiful, but I see no way to + make it nicer, as the right members are #defines, not array members, so + it's impossible to write a loop. + */ + if (post_header_len) + { + post_header_len[START_EVENT_V3-1]= START_V3_HEADER_LEN; + post_header_len[QUERY_EVENT-1]= QUERY_HEADER_LEN; + post_header_len[ROTATE_EVENT-1]= ROTATE_HEADER_LEN; + post_header_len[LOAD_EVENT-1]= LOAD_HEADER_LEN; + post_header_len[CREATE_FILE_EVENT-1]= CREATE_FILE_HEADER_LEN; + post_header_len[APPEND_BLOCK_EVENT-1]= APPEND_BLOCK_HEADER_LEN; + post_header_len[EXEC_LOAD_EVENT-1]= EXEC_LOAD_HEADER_LEN; + post_header_len[DELETE_FILE_EVENT-1]= DELETE_FILE_HEADER_LEN; + post_header_len[NEW_LOAD_EVENT-1]= post_header_len[LOAD_EVENT-1]; + post_header_len[FORMAT_DESCRIPTION_EVENT-1]= FORMAT_DESCRIPTION_HEADER_LEN; + post_header_len[BEGIN_LOAD_QUERY_EVENT-1]= post_header_len[APPEND_BLOCK_EVENT-1]; + post_header_len[EXECUTE_LOAD_QUERY_EVENT-1]= EXECUTE_LOAD_QUERY_HEADER_LEN; + } + break; + + case 1: /* 3.23 */ + case 3: /* 4.0.x x>=2 */ + /* + We build an artificial (i.e. not sent by the master) event, which + describes what those old master versions send. + */ + if (binlog_ver==1) + strmov(server_version, server_ver ? server_ver : "3.23"); + else + strmov(server_version, server_ver ? server_ver : "4.0"); + common_header_len= binlog_ver==1 ? OLD_HEADER_LEN : + LOG_EVENT_MINIMAL_HEADER_LEN; + /* + The first new event in binlog version 4 is Format_desc. So any event type + after that does not exist in older versions. We use the events known by + version 3, even if version 1 had only a subset of them (this is not a + problem: it uses a few bytes for nothing but unifies code; it does not + make the slave detect less corruptions). + */ + number_of_event_types= FORMAT_DESCRIPTION_EVENT - 1; + post_header_len=(uint8*) my_malloc(number_of_event_types*sizeof(uint8), + MYF(0)); + if (post_header_len) + { + post_header_len[START_EVENT_V3-1]= START_V3_HEADER_LEN; + post_header_len[QUERY_EVENT-1]= QUERY_HEADER_MINIMAL_LEN; + post_header_len[STOP_EVENT-1]= 0; + post_header_len[ROTATE_EVENT-1]= (binlog_ver==1) ? 0 : ROTATE_HEADER_LEN; + post_header_len[INTVAR_EVENT-1]= 0; + post_header_len[LOAD_EVENT-1]= LOAD_HEADER_LEN; + post_header_len[SLAVE_EVENT-1]= 0; + post_header_len[CREATE_FILE_EVENT-1]= CREATE_FILE_HEADER_LEN; + post_header_len[APPEND_BLOCK_EVENT-1]= APPEND_BLOCK_HEADER_LEN; + post_header_len[EXEC_LOAD_EVENT-1]= EXEC_LOAD_HEADER_LEN; + post_header_len[DELETE_FILE_EVENT-1]= DELETE_FILE_HEADER_LEN; + post_header_len[NEW_LOAD_EVENT-1]= post_header_len[LOAD_EVENT-1]; + post_header_len[RAND_EVENT-1]= 0; + post_header_len[USER_VAR_EVENT-1]= 0; + } + break; + default: /* Includes binlog version 2 i.e. 4.0.x x<=1 */ + post_header_len= 0; /* will make is_valid() fail */ + break; + } +} + + +/* + The problem with this constructor is that the fixed header may have a + length different from this version, but we don't know this length as we + have not read the Format_description_log_event which says it, yet. This + length is in the post-header of the event, but we don't know where the + post-header starts. + So this type of event HAS to: + - either have the header's length at the beginning (in the header, at a + fixed position which will never be changed), not in the post-header. That + would make the header be "shifted" compared to other events. + - or have a header of size LOG_EVENT_MINIMAL_HEADER_LEN (19), in all future + versions, so that we know for sure. + I (Guilhem) chose the 2nd solution. Rotate has the same constraint (because + it is sent before Format_description_log_event). +*/ + +Format_description_log_event:: +Format_description_log_event(const char* buf, + uint event_len, + const + Format_description_log_event* + description_event) + :Start_log_event_v3(buf, description_event) +{ + DBUG_ENTER("Format_description_log_event::Format_description_log_event(char*,...)"); + buf+= LOG_EVENT_MINIMAL_HEADER_LEN; + if ((common_header_len=buf[ST_COMMON_HEADER_LEN_OFFSET]) < OLD_HEADER_LEN) + DBUG_VOID_RETURN; /* sanity check */ + number_of_event_types= + event_len-(LOG_EVENT_MINIMAL_HEADER_LEN+ST_COMMON_HEADER_LEN_OFFSET+1); + DBUG_PRINT("info", ("common_header_len=%d number_of_event_types=%d", + common_header_len, number_of_event_types)); + /* If alloc fails, we'll detect it in is_valid() */ + post_header_len= (uint8*) my_memdup((byte*)buf+ST_COMMON_HEADER_LEN_OFFSET+1, + number_of_event_types* + sizeof(*post_header_len), MYF(0)); + DBUG_VOID_RETURN; +} + +#ifndef MYSQL_CLIENT +bool Format_description_log_event::write(IO_CACHE* file) +{ + /* + We don't call Start_log_event_v3::write() because this would make 2 + my_b_safe_write(). + */ + byte buff[FORMAT_DESCRIPTION_HEADER_LEN]; + int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version); + memcpy((char*) buff + ST_SERVER_VER_OFFSET,server_version,ST_SERVER_VER_LEN); + int4store(buff + ST_CREATED_OFFSET,created); + buff[ST_COMMON_HEADER_LEN_OFFSET]= LOG_EVENT_HEADER_LEN; + memcpy((char*) buff+ST_COMMON_HEADER_LEN_OFFSET+1, (byte*) post_header_len, + LOG_EVENT_TYPES); + return (write_header(file, sizeof(buff)) || + my_b_safe_write(file, buff, sizeof(buff))); +} +#endif + +/* + SYNOPSIS + Format_description_log_event::exec_event() + + IMPLEMENTATION + Save the information which describes the binlog's format, to be able to + read all coming events. + Call Start_log_event_v3::exec_event(). */ #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) -void Load_log_event::pack_info(Protocol *protocol) +int Format_description_log_event::exec_event(struct st_relay_log_info* rli) { - char *buf, *pos; - uint buf_len; + DBUG_ENTER("Format_description_log_event::exec_event"); + + /* save the information describing this binlog */ + delete rli->relay_log.description_event_for_exec; + rli->relay_log.description_event_for_exec= this; - buf_len= +#ifdef USING_TRANSACTIONS + /* + As a transaction NEVER spans on 2 or more binlogs: + if we have an active transaction at this point, the master died + while writing the transaction to the binary log, i.e. while + flushing the binlog cache to the binlog. As the write was started, + the transaction had been committed on the master, so we lack of + information to replay this transaction on the slave; all we can do + is stop with error. + Note: this event could be sent by the master to inform us of the + format of its binlog; in other words maybe it is not at its + original place when it comes to us; we'll know this by checking + log_pos ("artificial" events have log_pos == 0). + */ + if (!artificial_event && created && thd->transaction.all.nht) + { + slave_print_error(rli, 0, "Rolling back unfinished transaction (no " + "COMMIT or ROLLBACK) from relay log. A probable cause " + "is that the master died while writing the transaction " + "to its binary log."); + end_trans(thd, ROLLBACK); + } +#endif + /* + If this event comes from ourselves, there is no cleaning task to perform, + we don't call Start_log_event_v3::exec_event() (this was just to update the + log's description event). + */ + if (server_id == (uint32) ::server_id) + { + /* + Do not modify rli->group_master_log_pos, as this event did not exist on + the master. That is, just update the *relay log* coordinates; this is + done by passing log_pos=0 to inc_group_relay_log_pos, like we do in + Stop_log_event::exec_event(). + If in a transaction, don't touch group_* coordinates. + */ + if (thd->options & OPTION_BEGIN) + rli->inc_event_relay_log_pos(); + else + { + rli->inc_group_relay_log_pos(0); + flush_relay_log_info(rli); + } + DBUG_RETURN(0); + } + + /* + If the event was not requested by the slave i.e. the master sent it while + the slave asked for a position >4, the event will make + rli->group_master_log_pos advance. Say that the slave asked for position + 1000, and the Format_desc event's end is 96. Then in the beginning of + replication rli->group_master_log_pos will be 0, then 96, then jump to + first really asked event (which is >96). So this is ok. + */ + DBUG_RETURN(Start_log_event_v3::exec_event(rli)); +} +#endif + + /************************************************************************** + Load_log_event methods + General note about Load_log_event: the binlogging of LOAD DATA INFILE is + going to be changed in 5.0 (or maybe in 5.1; not decided yet). + However, the 5.0 slave could still have to read such events (from a 4.x + master), convert them (which just means maybe expand the header, when 5.0 + servers have a UID in events) (remember that whatever is after the header + will be like in 4.x, as this event's format is not modified in 5.0 as we + will use new types of events to log the new LOAD DATA INFILE features). + To be able to read/convert, we just need to not assume that the common + header is of length LOG_EVENT_HEADER_LEN (we must use the description + event). + Note that I (Guilhem) manually tested replication of a big LOAD DATA INFILE + between 3.23 and 5.0, and between 4.0 and 5.0, and it works fine (and the + positions displayed in SHOW SLAVE STATUS then are fine too). + **************************************************************************/ + +/* + Load_log_event::pack_info() +*/ + +#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) +uint Load_log_event::get_query_buffer_length() +{ + return 5 + db_len + 3 + // "use DB; " 18 + fname_len + 2 + // "LOAD DATA INFILE 'file''" 7 + // LOCAL @@ -1297,14 +2196,18 @@ void Load_log_event::pack_info(Protocol *protocol) 23 + sql_ex.enclosed_len*4 + 2 + // " OPTIONALLY ENCLOSED BY 'str'" 12 + sql_ex.escaped_len*4 + 2 + // " ESCAPED BY 'str'" 21 + sql_ex.line_term_len*4 + 2 + // " FIELDS TERMINATED BY 'str'" - 19 + sql_ex.line_start_len*4 + 2 + // " LINES STARTING BY 'str'" - 15 + 22 + // " IGNORE xxx LINES" + 19 + sql_ex.line_start_len*4 + 2 + // " LINES STARTING BY 'str'" + 15 + 22 + // " IGNORE xxx LINES" 3 + (num_fields-1)*2 + field_block_len; // " (field1, field2, ...)" +} - if (!(buf= my_malloc(buf_len, MYF(MY_WME)))) - return; - pos= buf; - if (db && db_len) + +void Load_log_event::print_query(bool need_db, char *buf, + char **end, char **fn_start, char **fn_end) +{ + char *pos= buf; + + if (need_db && db && db_len) { pos= strmov(pos, "use `"); memcpy(pos, db, db_len); @@ -1312,6 +2215,10 @@ void Load_log_event::pack_info(Protocol *protocol) } pos= strmov(pos, "LOAD DATA "); + + if (fn_start) + *fn_start= pos; + if (check_fname_outside_temp_buf()) pos= strmov(pos, "LOCAL "); pos= strmov(pos, "INFILE '"); @@ -1323,7 +2230,12 @@ void Load_log_event::pack_info(Protocol *protocol) else if (sql_ex.opt_flags & IGNORE_FLAG) pos= strmov(pos, " IGNORE "); - pos= strmov(pos ,"INTO TABLE `"); + pos= strmov(pos ,"INTO"); + + if (fn_end) + *fn_end= pos; + + pos= strmov(pos ," TABLE `"); memcpy(pos, table_name, table_name_len); pos+= table_name_len; @@ -1372,17 +2284,30 @@ void Load_log_event::pack_info(Protocol *protocol) *pos++= ')'; } - protocol->store(buf, pos-buf, &my_charset_bin); + *end= pos; +} + + +void Load_log_event::pack_info(Protocol *protocol) +{ + char *buf, *end; + + if (!(buf= my_malloc(get_query_buffer_length(), MYF(MY_WME)))) + return; + print_query(TRUE, buf, &end, 0, 0); + protocol->store(buf, end-buf, &my_charset_bin); my_free(buf, MYF(0)); } #endif /* defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) */ +#ifndef MYSQL_CLIENT + /* Load_log_event::write_data_header() */ -int Load_log_event::write_data_header(IO_CACHE* file) +bool Load_log_event::write_data_header(IO_CACHE* file) { char buf[LOAD_HEADER_LEN]; int4store(buf + L_THREAD_ID_OFFSET, slave_proxy_id); @@ -1391,7 +2316,7 @@ int Load_log_event::write_data_header(IO_CACHE* file) buf[L_TBL_LEN_OFFSET] = (char)table_name_len; buf[L_DB_LEN_OFFSET] = (char)db_len; int4store(buf + L_NUM_FIELDS_OFFSET, num_fields); - return my_b_safe_write(file, (byte*)buf, LOAD_HEADER_LEN); + return my_b_safe_write(file, (byte*)buf, LOAD_HEADER_LEN) != 0; } @@ -1399,7 +2324,7 @@ int Load_log_event::write_data_header(IO_CACHE* file) Load_log_event::write_data_body() */ -int Load_log_event::write_data_body(IO_CACHE* file) +bool Load_log_event::write_data_body(IO_CACHE* file) { if (sql_ex.write_data(file)) return 1; @@ -1419,7 +2344,6 @@ int Load_log_event::write_data_body(IO_CACHE* file) Load_log_event::Load_log_event() */ -#ifndef MYSQL_CLIENT Load_log_event::Load_log_event(THD *thd_arg, sql_exchange *ex, const char *db_arg, const char *table_name_arg, List<Item> &fields_arg, @@ -1503,6 +2427,7 @@ Load_log_event::Load_log_event(THD *thd_arg, sql_exchange *ex, } #endif /* !MYSQL_CLIENT */ + /* Load_log_event::Load_log_event() @@ -1511,15 +2436,25 @@ Load_log_event::Load_log_event(THD *thd_arg, sql_exchange *ex, constructed event. */ -Load_log_event::Load_log_event(const char *buf, int event_len, - bool old_format) - :Log_event(buf, old_format), num_fields(0), fields(0), - field_lens(0), field_block_len(0), +Load_log_event::Load_log_event(const char *buf, uint event_len, + const Format_description_log_event *description_event) + :Log_event(buf, description_event), num_fields(0), fields(0), + field_lens(0),field_block_len(0), table_name(0), db(0), fname(0), local_fname(FALSE) { DBUG_ENTER("Load_log_event"); - if (event_len) // derived class, will call copy_log_event() itself - copy_log_event(buf, event_len, old_format); + /* + I (Guilhem) manually tested replication of LOAD DATA INFILE for 3.23->5.0, + 4.0->5.0 and 5.0->5.0 and it works. + */ + if (event_len) + copy_log_event(buf, event_len, + ((buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ? + LOAD_HEADER_LEN + + description_event->common_header_len : + LOAD_HEADER_LEN + LOG_EVENT_HEADER_LEN), + description_event); + /* otherwise it's a derived class, will call copy_log_event() itself */ DBUG_VOID_RETURN; } @@ -1529,14 +2464,14 @@ Load_log_event::Load_log_event(const char *buf, int event_len, */ int Load_log_event::copy_log_event(const char *buf, ulong event_len, - bool old_format) + int body_offset, + const Format_description_log_event *description_event) { + DBUG_ENTER("Load_log_event::copy_log_event"); uint data_len; char* buf_end = (char*)buf + event_len; - uint header_len= old_format ? OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN; - const char* data_head = buf + header_len; - DBUG_ENTER("Load_log_event::copy_log_event"); - + /* this is the beginning of the post-header */ + const char* data_head = buf + description_event->common_header_len; slave_proxy_id= thread_id= uint4korr(data_head + L_THREAD_ID_OFFSET); exec_time = uint4korr(data_head + L_EXEC_TIME_OFFSET); skip_lines = uint4korr(data_head + L_SKIP_LINES_OFFSET); @@ -1544,21 +2479,17 @@ int Load_log_event::copy_log_event(const char *buf, ulong event_len, db_len = (uint)data_head[L_DB_LEN_OFFSET]; num_fields = uint4korr(data_head + L_NUM_FIELDS_OFFSET); - int body_offset = ((buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ? - LOAD_HEADER_LEN + header_len : - get_data_body_offset()); - if ((int) event_len < body_offset) DBUG_RETURN(1); /* Sql_ex.init() on success returns the pointer to the first byte after the sql_ex structure, which is the start of field lengths array. */ - if (!(field_lens=(uchar*)sql_ex.init((char*)buf + body_offset, - buf_end, - buf[EVENT_TYPE_OFFSET] != LOAD_EVENT))) + if (!(field_lens= (uchar*)sql_ex.init((char*)buf + body_offset, + buf_end, + buf[EVENT_TYPE_OFFSET] != LOAD_EVENT))) DBUG_RETURN(1); - + data_len = event_len - body_offset; if (num_fields > data_len) // simple sanity check against corruption DBUG_RETURN(1); @@ -1571,6 +2502,7 @@ int Load_log_event::copy_log_event(const char *buf, ulong event_len, fname = db + db_len + 1; fname_len = strlen(fname); // null termination is accomplished by the caller doing buf[event_len]=0 + DBUG_RETURN(0); } @@ -1580,13 +2512,13 @@ int Load_log_event::copy_log_event(const char *buf, ulong event_len, */ #ifdef MYSQL_CLIENT -void Load_log_event::print(FILE* file, bool short_form, char* last_db) +void Load_log_event::print(FILE* file, bool short_form, LAST_EVENT_INFO* last_event_info) { - print(file, short_form, last_db, 0); + print(file, short_form, last_event_info, 0); } -void Load_log_event::print(FILE* file, bool short_form, char* last_db, +void Load_log_event::print(FILE* file, bool short_form, LAST_EVENT_INFO* last_event_info, bool commented) { DBUG_ENTER("Load_log_event::print"); @@ -1598,7 +2530,7 @@ void Load_log_event::print(FILE* file, bool short_form, char* last_db, } bool different_db= 1; - if (db && last_db) + if (db) { /* If the database is different from the one of the previous statement, we @@ -1606,9 +2538,9 @@ void Load_log_event::print(FILE* file, bool short_form, char* last_db, But if commented, the "use" is going to be commented so we should not update the last_db. */ - if ((different_db= memcmp(last_db, db, db_len + 1)) && + if ((different_db= memcmp(last_event_info->db, db, db_len + 1)) && !commented) - memcpy(last_db, db, db_len + 1); + memcpy(last_event_info->db, db, db_len + 1); } if (db && db[0] && different_db) @@ -1730,7 +2662,6 @@ void Load_log_event::set_fields(const char* affected_db, int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli, bool use_rli_only_for_errors) { - char *load_data_query= 0; thd->db_length= db_len; thd->db= (char*) rewrite_db(db, &thd->db_length); DBUG_ASSERT(thd->query == 0); @@ -1744,15 +2675,12 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli, mysql_init_query(thd, 0, 0); if (!use_rli_only_for_errors) { -#if MYSQL_VERSION_ID < 50000 - rli->future_group_master_log_pos= log_pos + get_event_len() - - (rli->mi->old_format ? (LOG_EVENT_HEADER_LEN - OLD_HEADER_LEN) : 0); -#else + /* Saved for InnoDB, see comment in Query_log_event::exec_event() */ rli->future_group_master_log_pos= log_pos; -#endif + DBUG_PRINT("info", ("log_pos: %lu", (ulong) log_pos)); } - - /* + + /* We test replicate_*_db rules. Note that we have already prepared the file to load, even if we are going to ignore and delete it now. So it is possible that we did a lot of disk writes for nothing. In other words, a @@ -1768,7 +2696,7 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli, { thd->set_time((time_t)when); VOID(pthread_mutex_lock(&LOCK_thread_count)); - thd->query_id = query_id++; + thd->query_id = next_query_id(); VOID(pthread_mutex_unlock(&LOCK_thread_count)); /* Initing thd->row_count is not necessary in theory as this variable has no @@ -1776,12 +2704,12 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli, "data truncated" warning but which is absorbed and never gets to the error log); still we init it to avoid a Valgrind message. */ - mysql_reset_errors(thd); + mysql_reset_errors(thd, 0); TABLE_LIST tables; bzero((char*) &tables,sizeof(tables)); tables.db = thd->db; - tables.alias = tables.real_name = (char*)table_name; + tables.alias = tables.table_name = (char*) table_name; tables.lock_type = TL_WRITE; tables.updating= 1; @@ -1795,21 +2723,30 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli, else { char llbuff[22]; + char *end; enum enum_duplicates handle_dup; bool ignore= 0; + char *load_data_query; + /* - Make a simplified LOAD DATA INFILE query, for the information of the - user in SHOW PROCESSLIST. Note that db is known in the 'db' column. + Forge LOAD DATA INFILE query which will be used in SHOW PROCESS LIST + and written to slave's binlog if binlogging is on. */ - if ((load_data_query= (char *) my_alloca(18 + strlen(fname) + 14 + - strlen(tables.real_name) + 8))) + if (!(load_data_query= (char *)thd->alloc(get_query_buffer_length() + 1))) { - thd->query_length= (uint)(strxmov(load_data_query, - "LOAD DATA INFILE '", fname, - "' INTO TABLE `", tables.real_name, - "` <...>", NullS) - load_data_query); - thd->query= load_data_query; + /* + This will set thd->fatal_error in case of OOM. So we surely will notice + that something is wrong. + */ + goto error; } + + print_query(FALSE, load_data_query, &end, (char **)&thd->lex->fname_start, + (char **)&thd->lex->fname_end); + *end= 0; + thd->query_length= end - load_data_query; + thd->query= load_data_query; + if (sql_ex.opt_flags & REPLACE_FLAG) handle_dup= DUP_REPLACE; else if (sql_ex.opt_flags & IGNORE_FLAG) @@ -1855,6 +2792,7 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli, List<Item> field_list; set_fields(thd->db,field_list); thd->variables.pseudo_thread_id= thread_id; + List<Item> set_fields; if (net) { // mysql_load will use thd->net to read the file @@ -1864,9 +2802,13 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli, */ thd->net.pkt_nr = net->pkt_nr; } - if (mysql_load(thd, &ex, &tables, field_list, handle_dup, ignore, net != 0, - TL_WRITE)) - thd->query_error = 1; + /* + It is safe to use set_fields twice because we are not going to + update it inside mysql_load(). + */ + if (mysql_load(thd, &ex, &tables, field_list, set_fields, set_fields, + handle_dup, ignore, net != 0)) + thd->query_error= 1; if (thd->cuted_fields) { /* log_pos is the position of the LOAD event in the master log */ @@ -1892,20 +2834,19 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli, if (net) skip_load_data_infile(net); } - + +error: thd->net.vio = 0; char *save_db= thd->db; VOID(pthread_mutex_lock(&LOCK_thread_count)); - thd->db= 0; + thd->db= thd->catalog= 0; thd->query= 0; thd->query_length= thd->db_length= 0; VOID(pthread_mutex_unlock(&LOCK_thread_count)); close_thread_tables(thd); - if (load_data_query) - my_afree(load_data_query); if (thd->query_error) { - /* this err/sql_errno code is copy-paste from send_error() */ + /* this err/sql_errno code is copy-paste from net_send_error() */ const char *err; int sql_errno; if ((err=thd->net.last_error)[0]) @@ -1963,17 +2904,17 @@ void Rotate_log_event::pack_info(Protocol *protocol) */ #ifdef MYSQL_CLIENT -void Rotate_log_event::print(FILE* file, bool short_form, char* last_db) +void Rotate_log_event::print(FILE* file, bool short_form, LAST_EVENT_INFO* last_event_info) { char buf[22]; + if (short_form) return; - print_header(file); fprintf(file, "\tRotate to "); if (new_log_ident) my_fwrite(file, (byte*) new_log_ident, (uint)ident_len, - MYF(MY_NABP | MY_WME)); + MYF(MY_NABP | MY_WME)); fprintf(file, " pos: %s", llstr(pos, buf)); fputc('\n', file); fflush(file); @@ -1985,31 +2926,22 @@ void Rotate_log_event::print(FILE* file, bool short_form, char* last_db) Rotate_log_event::Rotate_log_event() */ -Rotate_log_event::Rotate_log_event(const char* buf, int event_len, - bool old_format) - :Log_event(buf, old_format),new_log_ident(NULL),alloced(0) +Rotate_log_event::Rotate_log_event(const char* buf, uint event_len, + const Format_description_log_event* description_event) + :Log_event(buf, description_event) ,new_log_ident(NULL),alloced(0) { + DBUG_ENTER("Rotate_log_event::Rotate_log_event(char*,...)"); // The caller will ensure that event_len is what we have at EVENT_LEN_OFFSET - int header_size = (old_format) ? OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN; + uint8 header_size= description_event->common_header_len; + uint8 post_header_len= description_event->post_header_len[ROTATE_EVENT-1]; uint ident_offset; - DBUG_ENTER("Rotate_log_event"); - if (event_len < header_size) DBUG_VOID_RETURN; - buf += header_size; - if (old_format) - { - ident_len = (uint)(event_len - OLD_HEADER_LEN); - pos = 4; - ident_offset = 0; - } - else - { - ident_len = (uint)(event_len - ROTATE_EVENT_OVERHEAD); - pos = uint8korr(buf + R_POS_OFFSET); - ident_offset = ROTATE_HEADER_LEN; - } + pos = post_header_len ? uint8korr(buf + R_POS_OFFSET) : 4; + ident_len = (uint)(event_len - + (header_size+post_header_len)); + ident_offset = post_header_len; set_if_smaller(ident_len,FN_REFLEN-1); if (!(new_log_ident= my_strdup_with_length((byte*) buf + ident_offset, @@ -2022,29 +2954,32 @@ Rotate_log_event::Rotate_log_event(const char* buf, int event_len, /* - Rotate_log_event::write_data() + Rotate_log_event::write() */ -int Rotate_log_event::write_data(IO_CACHE* file) +#ifndef MYSQL_CLIENT +bool Rotate_log_event::write(IO_CACHE* file) { char buf[ROTATE_HEADER_LEN]; int8store(buf + R_POS_OFFSET, pos); - return (my_b_safe_write(file, (byte*)buf, ROTATE_HEADER_LEN) || - my_b_safe_write(file, (byte*)new_log_ident, (uint) ident_len)); + return (write_header(file, ROTATE_HEADER_LEN + ident_len) || + my_b_safe_write(file, (byte*)buf, ROTATE_HEADER_LEN) || + my_b_safe_write(file, (byte*)new_log_ident, (uint) ident_len)); } - +#endif /* Rotate_log_event::exec_event() - Got a rotate log even from the master + Got a rotate log event from the master IMPLEMENTATION This is mainly used so that we can later figure out the logname and position for the master. - We can't rotate the slave as this will cause infinitive rotations + We can't rotate the slave's BINlog as this will cause infinitive rotations in a A -> B -> A setup. + The NOTES below is a wrong comment which will disappear when 4.1 is merged. RETURN VALUES 0 ok @@ -2056,7 +2991,7 @@ int Rotate_log_event::exec_event(struct st_relay_log_info* rli) DBUG_ENTER("Rotate_log_event::exec_event"); pthread_mutex_lock(&rli->data_lock); - rli->event_relay_log_pos += get_event_len(); + rli->event_relay_log_pos= my_b_tell(rli->cur_log); /* If we are in a transaction: the only normal case is when the I/O thread was copying a big transaction, then it was stopped and restarted: we have this @@ -2068,15 +3003,31 @@ int Rotate_log_event::exec_event(struct st_relay_log_info* rli) COMMIT or ROLLBACK In that case, we don't want to touch the coordinates which correspond to the beginning of the transaction. + Starting from 5.0.0, there also are some rotates from the slave itself, in + the relay log. */ if (!(thd->options & OPTION_BEGIN)) { memcpy(rli->group_master_log_name, new_log_ident, ident_len+1); rli->notify_group_master_log_name_update(); - rli->group_master_log_pos = pos; - rli->group_relay_log_pos = rli->event_relay_log_pos; - DBUG_PRINT("info", ("group_master_log_pos: %lu", + rli->group_master_log_pos= pos; + rli->group_relay_log_pos= rli->event_relay_log_pos; + DBUG_PRINT("info", ("group_master_log_name: '%s' group_master_log_pos:\ +%lu", + rli->group_master_log_name, (ulong) rli->group_master_log_pos)); + /* + Reset thd->options and sql_mode etc, because this could be the signal of + a master's downgrade from 5.0 to 4.0. + However, no need to reset description_event_for_exec: indeed, if the next + master is 5.0 (even 5.0.1) we will soon get a Format_desc; if the next + master is 4.0 then the events are in the slave's format (conversion). + */ + set_slave_thread_options(thd); + set_slave_thread_default_charset(thd, rli); + thd->variables.sql_mode= global_system_variables.sql_mode; + thd->variables.auto_increment_increment= + thd->variables.auto_increment_offset= 1; } pthread_mutex_unlock(&rli->data_lock); pthread_cond_broadcast(&rli->data_cond); @@ -2110,12 +3061,13 @@ void Intvar_log_event::pack_info(Protocol *protocol) Intvar_log_event::Intvar_log_event() */ -Intvar_log_event::Intvar_log_event(const char* buf, bool old_format) - :Log_event(buf, old_format) +Intvar_log_event::Intvar_log_event(const char* buf, + const Format_description_log_event* description_event) + :Log_event(buf, description_event) { - buf += (old_format) ? OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN; - type = buf[I_TYPE_OFFSET]; - val = uint8korr(buf+I_VAL_OFFSET); + buf+= description_event->common_header_len; + type= buf[I_TYPE_OFFSET]; + val= uint8korr(buf+I_VAL_OFFSET); } @@ -2134,16 +3086,19 @@ const char* Intvar_log_event::get_var_type_name() /* - Intvar_log_event::write_data() + Intvar_log_event::write() */ -int Intvar_log_event::write_data(IO_CACHE* file) +#ifndef MYSQL_CLIENT +bool Intvar_log_event::write(IO_CACHE* file) { - char buf[9]; - buf[I_TYPE_OFFSET] = type; + byte buf[9]; + buf[I_TYPE_OFFSET]= (byte) type; int8store(buf + I_VAL_OFFSET, val); - return my_b_safe_write(file, (byte*) buf, sizeof(buf)); + return (write_header(file, sizeof(buf)) || + my_b_safe_write(file, buf, sizeof(buf))); } +#endif /* @@ -2151,7 +3106,8 @@ int Intvar_log_event::write_data(IO_CACHE* file) */ #ifdef MYSQL_CLIENT -void Intvar_log_event::print(FILE* file, bool short_form, char* last_db) +void Intvar_log_event::print(FILE* file, bool short_form, + LAST_EVENT_INFO* last_event_info) { char llbuff[22]; const char *msg; @@ -2194,7 +3150,7 @@ int Intvar_log_event::exec_event(struct st_relay_log_info* rli) thd->next_insert_id = val; break; } - rli->inc_event_relay_log_pos(get_event_len()); + rli->inc_event_relay_log_pos(); return 0; } #endif @@ -2217,26 +3173,30 @@ void Rand_log_event::pack_info(Protocol *protocol) #endif -Rand_log_event::Rand_log_event(const char* buf, bool old_format) - :Log_event(buf, old_format) +Rand_log_event::Rand_log_event(const char* buf, + const Format_description_log_event* description_event) + :Log_event(buf, description_event) { - buf += (old_format) ? OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN; - seed1 = uint8korr(buf+RAND_SEED1_OFFSET); - seed2 = uint8korr(buf+RAND_SEED2_OFFSET); + buf+= description_event->common_header_len; + seed1= uint8korr(buf+RAND_SEED1_OFFSET); + seed2= uint8korr(buf+RAND_SEED2_OFFSET); } -int Rand_log_event::write_data(IO_CACHE* file) +#ifndef MYSQL_CLIENT +bool Rand_log_event::write(IO_CACHE* file) { - char buf[16]; + byte buf[16]; int8store(buf + RAND_SEED1_OFFSET, seed1); int8store(buf + RAND_SEED2_OFFSET, seed2); - return my_b_safe_write(file, (byte*) buf, sizeof(buf)); + return (write_header(file, sizeof(buf)) || + my_b_safe_write(file, buf, sizeof(buf))); } +#endif #ifdef MYSQL_CLIENT -void Rand_log_event::print(FILE* file, bool short_form, char* last_db) +void Rand_log_event::print(FILE* file, bool short_form, LAST_EVENT_INFO* last_event_info) { char llbuff[22],llbuff2[22]; if (!short_form) @@ -2256,13 +3216,83 @@ int Rand_log_event::exec_event(struct st_relay_log_info* rli) { thd->rand.seed1= (ulong) seed1; thd->rand.seed2= (ulong) seed2; - rli->inc_event_relay_log_pos(get_event_len()); + rli->inc_event_relay_log_pos(); return 0; } #endif /* !MYSQL_CLIENT */ /************************************************************************** + Xid_log_event methods +**************************************************************************/ + +#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) +void Xid_log_event::pack_info(Protocol *protocol) +{ + char buf[128], *pos; + pos= strmov(buf, "COMMIT /* xid="); + pos= longlong10_to_str(xid, pos, 10); + pos= strmov(pos, " */"); + protocol->store(buf, (uint) (pos-buf), &my_charset_bin); +} +#endif + +/* + NOTE it's ok not to use int8store here, + as long as xid_t::set(ulonglong) and + xid_t::get_my_xid doesn't do it either + + we don't care about actual values of xids as long as + identical numbers compare identically +*/ + +Xid_log_event:: +Xid_log_event(const char* buf, + const Format_description_log_event *description_event) + :Log_event(buf, description_event) +{ + buf+= description_event->common_header_len; + memcpy((char*) &xid, buf, sizeof(xid)); +} + + +#ifndef MYSQL_CLIENT +bool Xid_log_event::write(IO_CACHE* file) +{ + return write_header(file, sizeof(xid)) || + my_b_safe_write(file, (byte*) &xid, sizeof(xid)); +} +#endif + + +#ifdef MYSQL_CLIENT +void Xid_log_event::print(FILE* file, bool short_form, LAST_EVENT_INFO* last_event_info) +{ + if (!short_form) + { + char buf[64]; + longlong10_to_str(xid, buf, 10); + + print_header(file); + fprintf(file, "\tXid = %s\n", buf); + fflush(file); + } + fprintf(file, "COMMIT;\n"); +} +#endif /* MYSQL_CLIENT */ + + +#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) +int Xid_log_event::exec_event(struct st_relay_log_info* rli) +{ + /* For a slave Xid_log_event is COMMIT */ + mysql_log.write(thd,COM_QUERY,"COMMIT /* implicit, from Xid_log_event */"); + return end_trans(thd, COMMIT) || Log_event::exec_event(rli); +} +#endif /* !MYSQL_CLIENT */ + + +/************************************************************************** User_var_log_event methods **************************************************************************/ @@ -2293,6 +3323,16 @@ void User_var_log_event::pack_info(Protocol* protocol) buf= my_malloc(val_offset + 22, MYF(MY_WME)); event_len= longlong10_to_str(uint8korr(val), buf + val_offset,-10)-buf; break; + case DECIMAL_RESULT: + { + buf= my_malloc(val_offset + DECIMAL_MAX_STR_LENGTH, MYF(MY_WME)); + String str(buf+val_offset, DECIMAL_MAX_STR_LENGTH, &my_charset_bin); + my_decimal dec; + binary2my_decimal(E_DEC_FATAL_ERROR, val+2, &dec, val[0], val[1]); + my_decimal2string(E_DEC_FATAL_ERROR, &dec, 0, 0, 0, &str); + event_len= str.length() + val_offset; + break; + } case STRING_RESULT: /* 15 is for 'COLLATE' and other chars */ buf= my_malloc(event_len+val_len*2+1+2*MY_CS_NAME_SIZE+15, MYF(MY_WME)); @@ -2327,10 +3367,12 @@ void User_var_log_event::pack_info(Protocol* protocol) #endif /* !MYSQL_CLIENT */ -User_var_log_event::User_var_log_event(const char* buf, bool old_format) - :Log_event(buf, old_format) +User_var_log_event:: +User_var_log_event(const char* buf, + const Format_description_log_event* description_event) + :Log_event(buf, description_event) { - buf+= (old_format) ? OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN; + buf+= description_event->common_header_len; name_len= uint4korr(buf); name= (char *) buf + UV_NAME_LEN_SIZE; buf+= UV_NAME_LEN_SIZE + name_len; @@ -2354,13 +3396,15 @@ User_var_log_event::User_var_log_event(const char* buf, bool old_format) } -int User_var_log_event::write_data(IO_CACHE* file) +#ifndef MYSQL_CLIENT +bool User_var_log_event::write(IO_CACHE* file) { char buf[UV_NAME_LEN_SIZE]; char buf1[UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE + UV_CHARSET_NUMBER_SIZE + UV_VAL_LEN_SIZE]; - char buf2[8], *pos= buf2; + char buf2[max(8, DECIMAL_MAX_FIELD_SIZE + 2)], *pos= buf2; uint buf1_length; + ulong event_length; int4store(buf, name_len); @@ -2373,8 +3417,6 @@ int User_var_log_event::write_data(IO_CACHE* file) { buf1[1]= type; int4store(buf1 + 2, charset_number); - int4store(buf1 + 2 + UV_CHARSET_NUMBER_SIZE, val_len); - buf1_length= 10; switch (type) { case REAL_RESULT: @@ -2383,6 +3425,16 @@ int User_var_log_event::write_data(IO_CACHE* file) case INT_RESULT: int8store(buf2, *(longlong*) val); break; + case DECIMAL_RESULT: + { + my_decimal *dec= (my_decimal *)val; + dec->fix_buffer_pointer(); + buf2[0]= (char)(dec->intg + dec->frac); + buf2[1]= (char)dec->frac; + decimal2bin((decimal_t*)val, buf2+2, buf2[0], buf2[1]); + val_len= decimal_bin_size(buf2[0], buf2[1]) + 2; + break; + } case STRING_RESULT: pos= val; break; @@ -2391,12 +3443,20 @@ int User_var_log_event::write_data(IO_CACHE* file) DBUG_ASSERT(1); return 0; } + int4store(buf1 + 2 + UV_CHARSET_NUMBER_SIZE, val_len); + buf1_length= 10; } - return (my_b_safe_write(file, (byte*) buf, sizeof(buf)) || + + /* Length of the whole event */ + event_length= sizeof(buf)+ name_len + buf1_length + val_len; + + return (write_header(file, event_length) || + my_b_safe_write(file, (byte*) buf, sizeof(buf)) || my_b_safe_write(file, (byte*) name, name_len) || my_b_safe_write(file, (byte*) buf1, buf1_length) || my_b_safe_write(file, (byte*) pos, val_len)); } +#endif /* @@ -2404,7 +3464,7 @@ int User_var_log_event::write_data(IO_CACHE* file) */ #ifdef MYSQL_CLIENT -void User_var_log_event::print(FILE* file, bool short_form, char* last_db) +void User_var_log_event::print(FILE* file, bool short_form, LAST_EVENT_INFO* last_event_info) { if (!short_form) { @@ -2433,6 +3493,23 @@ void User_var_log_event::print(FILE* file, bool short_form, char* last_db) longlong10_to_str(uint8korr(val), int_buf, -10); fprintf(file, ":=%s;\n", int_buf); break; + case DECIMAL_RESULT: + { + char str_buf[200]; + int str_len= sizeof(str_buf) - 1; + int precision= (int)val[0]; + int scale= (int)val[1]; + decimal_digit_t dec_buf[10]; + decimal_t dec; + dec.len= 10; + dec.buf= dec_buf; + + bin2decimal(val+2, &dec, precision, scale); + decimal2string(&dec, str_buf, &str_len, 0, 0, 0); + str_buf[str_len]= 0; + fprintf(file, ":=%s;\n",str_buf); + break; + } case STRING_RESULT: { /* @@ -2509,7 +3586,7 @@ int User_var_log_event::exec_event(struct st_relay_log_info* rli) switch (type) { case REAL_RESULT: float8get(real_val, val); - it= new Item_real(real_val); + it= new Item_float(real_val); val= (char*) &real_val; // Pointer to value in native format val_len= 8; break; @@ -2519,6 +3596,14 @@ int User_var_log_event::exec_event(struct st_relay_log_info* rli) val= (char*) &int_val; // Pointer to value in native format val_len= 8; break; + case DECIMAL_RESULT: + { + Item_decimal *dec= new Item_decimal(val+2, val[0], val[1]); + it= dec; + val= (char *)dec->val_decimal(NULL); + val_len= sizeof(my_decimal); + break; + } case STRING_RESULT: it= new Item_string(val, val_len, charset); break; @@ -2542,7 +3627,7 @@ int User_var_log_event::exec_event(struct st_relay_log_info* rli) e.update_hash(val, val_len, type, charset, DERIVATION_IMPLICIT); free_root(thd->mem_root,0); - rli->inc_event_relay_log_pos(get_event_len()); + rli->inc_event_relay_log_pos(); return 0; } #endif /* !MYSQL_CLIENT */ @@ -2554,7 +3639,7 @@ int User_var_log_event::exec_event(struct st_relay_log_info* rli) #ifdef HAVE_REPLICATION #ifdef MYSQL_CLIENT -void Unknown_log_event::print(FILE* file, bool short_form, char* last_db) +void Unknown_log_event::print(FILE* file, bool short_form, LAST_EVENT_INFO* last_event_info) { if (short_form) return; @@ -2584,12 +3669,12 @@ void Slave_log_event::pack_info(Protocol *protocol) #ifndef MYSQL_CLIENT Slave_log_event::Slave_log_event(THD* thd_arg, struct st_relay_log_info* rli) - :Log_event(thd_arg, 0, 0), mem_pool(0), master_host(0) + :Log_event(thd_arg, 0, 0) , mem_pool(0), master_host(0) { DBUG_ENTER("Slave_log_event"); if (!rli->inited) // QQ When can this happen ? DBUG_VOID_RETURN; - + MASTER_INFO* mi = rli->mi; // TODO: re-write this better without holding both locks at the same time pthread_mutex_lock(&mi->data_lock); @@ -2625,7 +3710,7 @@ Slave_log_event::~Slave_log_event() #ifdef MYSQL_CLIENT -void Slave_log_event::print(FILE* file, bool short_form, char* last_db) +void Slave_log_event::print(FILE* file, bool short_form, LAST_EVENT_INFO* last_event_info) { char llbuff[22]; if (short_form) @@ -2645,13 +3730,18 @@ int Slave_log_event::get_data_size() } -int Slave_log_event::write_data(IO_CACHE* file) +#ifndef MYSQL_CLIENT +bool Slave_log_event::write(IO_CACHE* file) { + ulong event_length= get_data_size(); int8store(mem_pool + SL_MASTER_POS_OFFSET, master_pos); int2store(mem_pool + SL_MASTER_PORT_OFFSET, master_port); // log and host are already there - return my_b_safe_write(file, (byte*)mem_pool, get_data_size()); + + return (write_header(file, event_length) || + my_b_safe_write(file, (byte*) mem_pool, event_length)); } +#endif void Slave_log_event::init_from_mem_pool(int data_size) @@ -2671,12 +3761,13 @@ void Slave_log_event::init_from_mem_pool(int data_size) } -Slave_log_event::Slave_log_event(const char* buf, int event_len) - :Log_event(buf,0),mem_pool(0),master_host(0) +/* This code is not used, so has not been updated to be format-tolerant */ +Slave_log_event::Slave_log_event(const char* buf, uint event_len) + :Log_event(buf,0) /*unused event*/ ,mem_pool(0),master_host(0) { - event_len -= LOG_EVENT_HEADER_LEN; - if (event_len < 0) + if (event_len < LOG_EVENT_HEADER_LEN) return; + event_len -= LOG_EVENT_HEADER_LEN; if (!(mem_pool = (char*) my_malloc(event_len + 1, MYF(MY_WME)))) return; memcpy(mem_pool, buf + LOG_EVENT_HEADER_LEN, event_len); @@ -2704,7 +3795,7 @@ int Slave_log_event::exec_event(struct st_relay_log_info* rli) */ #ifdef MYSQL_CLIENT -void Stop_log_event::print(FILE* file, bool short_form, char* last_db) +void Stop_log_event::print(FILE* file, bool short_form, LAST_EVENT_INFO* last_event_info) { if (short_form) return; @@ -2719,14 +3810,14 @@ void Stop_log_event::print(FILE* file, bool short_form, char* last_db) /* Stop_log_event::exec_event() - The master stopped. + The master stopped. We used to clean up all temporary tables but this is useless as, as the - master has shut down properly, it has written all DROP TEMPORARY TABLE and DO - RELEASE_LOCK (prepared statements' deletion is TODO). + master has shut down properly, it has written all DROP TEMPORARY TABLE + (prepared statements' deletion is TODO only when we binlog prep stmts). We used to clean up slave_load_tmpdir, but this is useless as it has been cleared at the end of LOAD DATA INFILE. So we have nothing to do here. - The place were we must do this cleaning is in Start_log_event::exec_event(), + The place were we must do this cleaning is in Start_log_event_v3::exec_event(), not here. Because if we come here, the master was sane. */ @@ -2740,8 +3831,13 @@ int Stop_log_event::exec_event(struct st_relay_log_info* rli) could give false triggers in MASTER_POS_WAIT() that we have reached the target position when in fact we have not. */ - rli->inc_group_relay_log_pos(get_event_len(), 0); - flush_relay_log_info(rli); + if (thd->options & OPTION_BEGIN) + rli->inc_event_relay_log_pos(); + else + { + rli->inc_group_relay_log_pos(0); + flush_relay_log_info(rli); + } return 0; } #endif /* !MYSQL_CLIENT */ @@ -2772,20 +3868,19 @@ Create_file_log_event(THD* thd_arg, sql_exchange* ex, sql_ex.force_new_format(); DBUG_VOID_RETURN; } -#endif /* !MYSQL_CLIENT */ /* Create_file_log_event::write_data_body() */ -int Create_file_log_event::write_data_body(IO_CACHE* file) +bool Create_file_log_event::write_data_body(IO_CACHE* file) { - int res; - if ((res = Load_log_event::write_data_body(file)) || fake_base) + bool res; + if ((res= Load_log_event::write_data_body(file)) || fake_base) return res; return (my_b_safe_write(file, (byte*) "", 1) || - my_b_safe_write(file, (byte*) block, block_len)); + my_b_safe_write(file, (byte*) block, block_len)); } @@ -2793,14 +3888,14 @@ int Create_file_log_event::write_data_body(IO_CACHE* file) Create_file_log_event::write_data_header() */ -int Create_file_log_event::write_data_header(IO_CACHE* file) +bool Create_file_log_event::write_data_header(IO_CACHE* file) { - int res; - if ((res = Load_log_event::write_data_header(file)) || fake_base) - return res; + bool res; byte buf[CREATE_FILE_HEADER_LEN]; + if ((res= Load_log_event::write_data_header(file)) || fake_base) + return res; int4store(buf + CF_FILE_ID_OFFSET, file_id); - return my_b_safe_write(file, buf, CREATE_FILE_HEADER_LEN); + return my_b_safe_write(file, buf, CREATE_FILE_HEADER_LEN) != 0; } @@ -2808,42 +3903,58 @@ int Create_file_log_event::write_data_header(IO_CACHE* file) Create_file_log_event::write_base() */ -int Create_file_log_event::write_base(IO_CACHE* file) +bool Create_file_log_event::write_base(IO_CACHE* file) { - int res; - fake_base = 1; // pretend we are Load event - res = write(file); - fake_base = 0; + bool res; + fake_base= 1; // pretend we are Load event + res= write(file); + fake_base= 0; return res; } +#endif /* !MYSQL_CLIENT */ /* Create_file_log_event ctor */ -Create_file_log_event::Create_file_log_event(const char* buf, int len, - bool old_format) - :Load_log_event(buf,0,old_format),fake_base(0),block(0),inited_from_old(0) +Create_file_log_event::Create_file_log_event(const char* buf, uint len, + const Format_description_log_event* description_event) + :Load_log_event(buf,0,description_event),fake_base(0),block(0),inited_from_old(0) { - int block_offset; - DBUG_ENTER("Create_file_log_event"); - - /* - We must make copy of 'buf' as this event may have to live over a - rotate log entry when used in mysqlbinlog - */ + DBUG_ENTER("Create_file_log_event::Create_file_log_event(char*,...)"); + uint block_offset; + uint header_len= description_event->common_header_len; + uint8 load_header_len= description_event->post_header_len[LOAD_EVENT-1]; + uint8 create_file_header_len= description_event->post_header_len[CREATE_FILE_EVENT-1]; if (!(event_buf= my_memdup((byte*) buf, len, MYF(MY_WME))) || - (copy_log_event(event_buf, len, old_format))) + copy_log_event(event_buf,len, + ((buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ? + load_header_len + header_len : + (fake_base ? (header_len+load_header_len) : + (header_len+load_header_len) + + create_file_header_len)), + description_event)) DBUG_VOID_RETURN; - - if (!old_format) + if (description_event->binlog_version!=1) { - file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + - + LOAD_HEADER_LEN + CF_FILE_ID_OFFSET); - // + 1 for \0 terminating fname - block_offset = (LOG_EVENT_HEADER_LEN + Load_log_event::get_data_size() + - CREATE_FILE_HEADER_LEN + 1); + file_id= uint4korr(buf + + header_len + + load_header_len + CF_FILE_ID_OFFSET); + /* + Note that it's ok to use get_data_size() below, because it is computed + with values we have already read from this event (because we called + copy_log_event()); we are not using slave's format info to decode + master's format, we are really using master's format info. + Anyway, both formats should be identical (except the common_header_len) + as these Load events are not changed between 4.0 and 5.0 (as logging of + LOAD DATA INFILE does not use Load_log_event in 5.0). + + The + 1 is for \0 terminating fname + */ + block_offset= (description_event->common_header_len + + Load_log_event::get_data_size() + + create_file_header_len + 1); if (len < block_offset) return; block = (char*)buf + block_offset; @@ -2864,18 +3975,18 @@ Create_file_log_event::Create_file_log_event(const char* buf, int len, #ifdef MYSQL_CLIENT void Create_file_log_event::print(FILE* file, bool short_form, - char* last_db, bool enable_local) + LAST_EVENT_INFO* last_event_info, bool enable_local) { if (short_form) { if (enable_local && check_fname_outside_temp_buf()) - Load_log_event::print(file, 1, last_db); + Load_log_event::print(file, 1, last_event_info); return; } if (enable_local) { - Load_log_event::print(file, short_form, last_db, !check_fname_outside_temp_buf()); + Load_log_event::print(file, short_form, last_event_info, !check_fname_outside_temp_buf()); /* That one is for "file_id: etc" below: in mysqlbinlog we want the #, in SHOW BINLOG EVENTS we don't. @@ -2888,9 +3999,9 @@ void Create_file_log_event::print(FILE* file, bool short_form, void Create_file_log_event::print(FILE* file, bool short_form, - char* last_db) + LAST_EVENT_INFO* last_event_info) { - print(file,short_form,last_db,0); + print(file,short_form,last_event_info,0); } #endif /* MYSQL_CLIENT */ @@ -3011,30 +4122,38 @@ Append_block_log_event::Append_block_log_event(THD* thd_arg, const char* db_arg, Append_block_log_event ctor */ -Append_block_log_event::Append_block_log_event(const char* buf, int len) - :Log_event(buf, 0),block(0) +Append_block_log_event::Append_block_log_event(const char* buf, uint len, + const Format_description_log_event* description_event) + :Log_event(buf, description_event),block(0) { - DBUG_ENTER("Append_block_log_event"); - if ((uint)len < APPEND_BLOCK_EVENT_OVERHEAD) + DBUG_ENTER("Append_block_log_event::Append_block_log_event(char*,...)"); + uint8 common_header_len= description_event->common_header_len; + uint8 append_block_header_len= + description_event->post_header_len[APPEND_BLOCK_EVENT-1]; + uint total_header_len= common_header_len+append_block_header_len; + if (len < total_header_len) DBUG_VOID_RETURN; - file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + AB_FILE_ID_OFFSET); - block = (char*)buf + APPEND_BLOCK_EVENT_OVERHEAD; - block_len = len - APPEND_BLOCK_EVENT_OVERHEAD; + file_id= uint4korr(buf + common_header_len + AB_FILE_ID_OFFSET); + block= (char*)buf + total_header_len; + block_len= len - total_header_len; DBUG_VOID_RETURN; } /* - Append_block_log_event::write_data() + Append_block_log_event::write() */ -int Append_block_log_event::write_data(IO_CACHE* file) +#ifndef MYSQL_CLIENT +bool Append_block_log_event::write(IO_CACHE* file) { byte buf[APPEND_BLOCK_HEADER_LEN]; int4store(buf + AB_FILE_ID_OFFSET, file_id); - return (my_b_safe_write(file, buf, APPEND_BLOCK_HEADER_LEN) || + return (write_header(file, APPEND_BLOCK_HEADER_LEN + block_len) || + my_b_safe_write(file, buf, APPEND_BLOCK_HEADER_LEN) || my_b_safe_write(file, (byte*) block, block_len)); } +#endif /* @@ -3043,14 +4162,14 @@ int Append_block_log_event::write_data(IO_CACHE* file) #ifdef MYSQL_CLIENT void Append_block_log_event::print(FILE* file, bool short_form, - char* last_db) + LAST_EVENT_INFO* last_event_info) { if (short_form) return; print_header(file); fputc('\n', file); - fprintf(file, "#Append_block: file_id: %d block_len: %d\n", - file_id, block_len); + fprintf(file, "#%s: file_id: %d block_len: %d\n", + get_type_str(), file_id, block_len); } #endif /* MYSQL_CLIENT */ @@ -3069,14 +4188,21 @@ void Append_block_log_event::pack_info(Protocol *protocol) block_len)); protocol->store(buf, length, &my_charset_bin); } -#endif /* defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) */ /* + Append_block_log_event::get_create_or_append() +*/ + +int Append_block_log_event::get_create_or_append() const +{ + return 0; /* append to the file, fail if not exists */ +} + +/* Append_block_log_event::exec_event() */ -#if defined( HAVE_REPLICATION) && !defined(MYSQL_CLIENT) int Append_block_log_event::exec_event(struct st_relay_log_info* rli) { char proc_info[17+FN_REFLEN+10], *fname= proc_info+17; @@ -3088,14 +4214,32 @@ int Append_block_log_event::exec_event(struct st_relay_log_info* rli) memcpy(p, ".data", 6); strnmov(proc_info, "Making temp file ", 17); // no end 0 thd->proc_info= proc_info; - if ((fd = my_open(fname, O_WRONLY|O_APPEND|O_BINARY|O_NOFOLLOW, MYF(MY_WME))) < 0) + if (get_create_or_append()) { - slave_print_error(rli,my_errno, "Error in Append_block event: could not open file '%s'", fname); + my_delete(fname, MYF(0)); // old copy may exist already + if ((fd= my_create(fname, CREATE_MODE, + O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW, + MYF(MY_WME))) < 0) + { + slave_print_error(rli, my_errno, + "Error in %s event: could not create file '%s'", + get_type_str(), fname); + goto err; + } + } + else if ((fd = my_open(fname, O_WRONLY | O_APPEND | O_BINARY | O_NOFOLLOW, + MYF(MY_WME))) < 0) + { + slave_print_error(rli, my_errno, + "Error in %s event: could not open file '%s'", + get_type_str(), fname); goto err; } if (my_write(fd, (byte*) block, block_len, MYF(MY_WME+MY_NABP))) { - slave_print_error(rli,my_errno, "Error in Append_block event: write to '%s' failed", fname); + slave_print_error(rli, my_errno, + "Error in %s event: write to '%s' failed", + get_type_str(), fname); goto err; } error=0; @@ -3129,25 +4273,31 @@ Delete_file_log_event::Delete_file_log_event(THD *thd_arg, const char* db_arg, Delete_file_log_event ctor */ -Delete_file_log_event::Delete_file_log_event(const char* buf, int len) - :Log_event(buf, 0),file_id(0) +Delete_file_log_event::Delete_file_log_event(const char* buf, uint len, + const Format_description_log_event* description_event) + :Log_event(buf, description_event),file_id(0) { - if ((uint)len < DELETE_FILE_EVENT_OVERHEAD) + uint8 common_header_len= description_event->common_header_len; + uint8 delete_file_header_len= description_event->post_header_len[DELETE_FILE_EVENT-1]; + if (len < (uint)(common_header_len + delete_file_header_len)) return; - file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + AB_FILE_ID_OFFSET); + file_id= uint4korr(buf + common_header_len + DF_FILE_ID_OFFSET); } /* - Delete_file_log_event::write_data() + Delete_file_log_event::write() */ -int Delete_file_log_event::write_data(IO_CACHE* file) +#ifndef MYSQL_CLIENT +bool Delete_file_log_event::write(IO_CACHE* file) { byte buf[DELETE_FILE_HEADER_LEN]; int4store(buf + DF_FILE_ID_OFFSET, file_id); - return my_b_safe_write(file, buf, DELETE_FILE_HEADER_LEN); + return (write_header(file, sizeof(buf)) || + my_b_safe_write(file, buf, sizeof(buf))); } +#endif /* @@ -3156,7 +4306,7 @@ int Delete_file_log_event::write_data(IO_CACHE* file) #ifdef MYSQL_CLIENT void Delete_file_log_event::print(FILE* file, bool short_form, - char* last_db) + LAST_EVENT_INFO* last_event_info) { if (short_form) return; @@ -3219,25 +4369,31 @@ Execute_load_log_event::Execute_load_log_event(THD *thd_arg, const char* db_arg, Execute_load_log_event ctor */ -Execute_load_log_event::Execute_load_log_event(const char* buf, int len) - :Log_event(buf, 0), file_id(0) +Execute_load_log_event::Execute_load_log_event(const char* buf, uint len, + const Format_description_log_event* description_event) + :Log_event(buf, description_event), file_id(0) { - if ((uint)len < EXEC_LOAD_EVENT_OVERHEAD) + uint8 common_header_len= description_event->common_header_len; + uint8 exec_load_header_len= description_event->post_header_len[EXEC_LOAD_EVENT-1]; + if (len < (uint)(common_header_len+exec_load_header_len)) return; - file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + EL_FILE_ID_OFFSET); + file_id= uint4korr(buf + common_header_len + EL_FILE_ID_OFFSET); } /* - Execute_load_log_event::write_data() + Execute_load_log_event::write() */ -int Execute_load_log_event::write_data(IO_CACHE* file) +#ifndef MYSQL_CLIENT +bool Execute_load_log_event::write(IO_CACHE* file) { byte buf[EXEC_LOAD_HEADER_LEN]; int4store(buf + EL_FILE_ID_OFFSET, file_id); - return my_b_safe_write(file, buf, EXEC_LOAD_HEADER_LEN); + return (write_header(file, sizeof(buf)) || + my_b_safe_write(file, buf, sizeof(buf))); } +#endif /* @@ -3246,7 +4402,7 @@ int Execute_load_log_event::write_data(IO_CACHE* file) #ifdef MYSQL_CLIENT void Execute_load_log_event::print(FILE* file, bool short_form, - char* last_db) + LAST_EVENT_INFO* last_event_info) { if (short_form) return; @@ -3285,7 +4441,8 @@ int Execute_load_log_event::exec_event(struct st_relay_log_info* rli) Load_log_event* lev = 0; memcpy(p, ".info", 6); - if ((fd = my_open(fname, O_RDONLY|O_BINARY|O_NOFOLLOW, MYF(MY_WME))) < 0 || + if ((fd = my_open(fname, O_RDONLY | O_BINARY | O_NOFOLLOW, + MYF(MY_WME))) < 0 || init_io_cache(&file, fd, IO_SIZE, READ_CACHE, (my_off_t)0, 0, MYF(MY_WME|MY_NABP))) { @@ -3293,8 +4450,8 @@ int Execute_load_log_event::exec_event(struct st_relay_log_info* rli) goto err; } if (!(lev = (Load_log_event*)Log_event::read_log_event(&file, - (pthread_mutex_t*)0, - (bool)0)) || + (pthread_mutex_t*)0, + rli->relay_log.description_event_for_exec)) || lev->get_type_code() != NEW_LOAD_EVENT) { slave_print_error(rli,0, "Error in Exec_load event: file '%s' appears corrupted", fname); @@ -3309,15 +4466,7 @@ int Execute_load_log_event::exec_event(struct st_relay_log_info* rli) mysql_load()). */ -#if MYSQL_VERSION_ID < 40100 - rli->future_master_log_pos= log_pos + get_event_len() - - (rli->mi->old_format ? (LOG_EVENT_HEADER_LEN - OLD_HEADER_LEN) : 0); -#elif MYSQL_VERSION_ID < 50000 - rli->future_group_master_log_pos= log_pos + get_event_len() - - (rli->mi->old_format ? (LOG_EVENT_HEADER_LEN - OLD_HEADER_LEN) : 0); -#else - rli->future_group_master_log_pos= log_pos; -#endif + rli->future_group_master_log_pos= log_pos; if (lev->exec_event(0,rli,1)) { /* @@ -3368,6 +4517,223 @@ err: /************************************************************************** + Begin_load_query_log_event methods +**************************************************************************/ + +#ifndef MYSQL_CLIENT +Begin_load_query_log_event:: +Begin_load_query_log_event(THD* thd_arg, const char* db_arg, char* block_arg, + uint block_len_arg, bool using_trans) + :Append_block_log_event(thd_arg, db_arg, block_arg, block_len_arg, + using_trans) +{ + file_id= thd_arg->file_id= mysql_bin_log.next_file_id(); +} +#endif + + +Begin_load_query_log_event:: +Begin_load_query_log_event(const char* buf, uint len, + const Format_description_log_event* desc_event) + :Append_block_log_event(buf, len, desc_event) +{ +} + + +#if defined( HAVE_REPLICATION) && !defined(MYSQL_CLIENT) +int Begin_load_query_log_event::get_create_or_append() const +{ + return 1; /* create the file */ +} +#endif /* defined( HAVE_REPLICATION) && !defined(MYSQL_CLIENT) */ + + +/************************************************************************** + Execute_load_query_log_event methods +**************************************************************************/ + + +#ifndef MYSQL_CLIENT +Execute_load_query_log_event:: +Execute_load_query_log_event(THD* thd_arg, const char* query_arg, + ulong query_length_arg, uint fn_pos_start_arg, + uint fn_pos_end_arg, + enum_load_dup_handling dup_handling_arg, + bool using_trans, bool suppress_use): + Query_log_event(thd_arg, query_arg, query_length_arg, using_trans, + suppress_use), + file_id(thd_arg->file_id), fn_pos_start(fn_pos_start_arg), + fn_pos_end(fn_pos_end_arg), dup_handling(dup_handling_arg) +{ +} +#endif /* !MYSQL_CLIENT */ + + +Execute_load_query_log_event:: +Execute_load_query_log_event(const char* buf, uint event_len, + const Format_description_log_event* desc_event): + Query_log_event(buf, event_len, desc_event, EXECUTE_LOAD_QUERY_EVENT), + file_id(0), fn_pos_start(0), fn_pos_end(0) +{ + if (!Query_log_event::is_valid()) + return; + + buf+= desc_event->common_header_len; + + fn_pos_start= uint4korr(buf + ELQ_FN_POS_START_OFFSET); + fn_pos_end= uint4korr(buf + ELQ_FN_POS_END_OFFSET); + dup_handling= (enum_load_dup_handling)(*(buf + ELQ_DUP_HANDLING_OFFSET)); + + if (fn_pos_start > q_len || fn_pos_end > q_len || + dup_handling > LOAD_DUP_REPLACE) + return; + + file_id= uint4korr(buf + ELQ_FILE_ID_OFFSET); +} + + +ulong Execute_load_query_log_event::get_post_header_size_for_derived() +{ + return EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN; +} + + +#ifndef MYSQL_CLIENT +bool +Execute_load_query_log_event::write_post_header_for_derived(IO_CACHE* file) +{ + char buf[EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN]; + int4store(buf, file_id); + int4store(buf + 4, fn_pos_start); + int4store(buf + 4 + 4, fn_pos_end); + *(buf + 4 + 4 + 4)= (char)dup_handling; + return my_b_safe_write(file, (byte*) buf, EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN); +} +#endif + + +#ifdef MYSQL_CLIENT +void Execute_load_query_log_event::print(FILE* file, bool short_form, + LAST_EVENT_INFO* last_event_info) +{ + print(file, short_form, last_event_info, 0); +} + + +void Execute_load_query_log_event::print(FILE* file, bool short_form, + LAST_EVENT_INFO* last_event_info, + const char *local_fname) +{ + print_query_header(file, short_form, last_event_info); + + if (local_fname) + { + my_fwrite(file, (byte*) query, fn_pos_start, MYF(MY_NABP | MY_WME)); + fprintf(file, " LOCAL INFILE \'"); + fprintf(file, local_fname); + fprintf(file, "\'"); + if (dup_handling == LOAD_DUP_REPLACE) + fprintf(file, " REPLACE"); + fprintf(file, " INTO"); + my_fwrite(file, (byte*) query + fn_pos_end, q_len-fn_pos_end, + MYF(MY_NABP | MY_WME)); + fprintf(file, ";\n"); + } + else + { + my_fwrite(file, (byte*) query, q_len, MYF(MY_NABP | MY_WME)); + fprintf(file, ";\n"); + } + + if (!short_form) + fprintf(file, "# file_id: %d \n", file_id); +} +#endif + + +#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) +void Execute_load_query_log_event::pack_info(Protocol *protocol) +{ + char *buf, *pos; + if (!(buf= my_malloc(9 + db_len + q_len + 10 + 21, MYF(MY_WME)))) + return; + pos= buf; + if (db && db_len) + { + pos= strmov(buf, "use `"); + memcpy(pos, db, db_len); + pos= strmov(pos+db_len, "`; "); + } + if (query && q_len) + { + memcpy(pos, query, q_len); + pos+= q_len; + } + pos= strmov(pos, " ;file_id="); + pos= int10_to_str((long) file_id, pos, 10); + protocol->store(buf, pos-buf, &my_charset_bin); + my_free(buf, MYF(MY_ALLOW_ZERO_PTR)); +} + + +int +Execute_load_query_log_event::exec_event(struct st_relay_log_info* rli) +{ + char *p; + char *buf; + char *fname; + char *fname_end; + int error; + + /* Replace filename and LOCAL keyword in query before executing it */ + if (!(buf = my_malloc(q_len + 1 - (fn_pos_end - fn_pos_start) + + (FN_REFLEN + 10) + 10 + 8 + 5, MYF(MY_WME)))) + { + slave_print_error(rli, my_errno, "Not enough memory"); + return 1; + } + + p= buf; + memcpy(p, query, fn_pos_start); + p+= fn_pos_start; + fname= (p= strmake(p, " INFILE \'", 9)); + p= slave_load_file_stem(p, file_id, server_id); + fname_end= (p= strmake(p, ".data", 5)); + *(p++)='\''; + switch (dup_handling) + { + case LOAD_DUP_IGNORE: + p= strmake(p, " IGNORE", 7); + break; + case LOAD_DUP_REPLACE: + p= strmake(p, " REPLACE", 8); + break; + default: + /* Ordinary load data */ + break; + } + p= strmake(p, " INTO", 5); + p= strmake(p, query+fn_pos_end, q_len-fn_pos_end); + + error= Query_log_event::exec_event(rli, buf, p-buf); + + /* Forging file name for deletion in same buffer */ + *fname_end= 0; + + /* + If there was an error the slave is going to stop, leave the + file so that we can re-execute this event at START SLAVE. + */ + if (!error) + (void) my_delete(fname, MYF(MY_WME)); + + my_free(buf, MYF(MY_ALLOW_ZERO_PTR)); + return error; +} +#endif + + +/************************************************************************** sql_ex_info methods **************************************************************************/ @@ -3375,15 +4741,15 @@ err: sql_ex_info::write_data() */ -int sql_ex_info::write_data(IO_CACHE* file) +bool sql_ex_info::write_data(IO_CACHE* file) { if (new_format()) { - return (write_str(file, field_term, field_term_len) || - write_str(file, enclosed, enclosed_len) || - write_str(file, line_term, line_term_len) || - write_str(file, line_start, line_start_len) || - write_str(file, escaped, escaped_len) || + return (write_str(file, field_term, (uint) field_term_len) || + write_str(file, enclosed, (uint) enclosed_len) || + write_str(file, line_term, (uint) line_term_len) || + write_str(file, line_start, (uint) line_start_len) || + write_str(file, escaped, (uint) escaped_len) || my_b_safe_write(file,(byte*) &opt_flags,1)); } else @@ -3396,7 +4762,7 @@ int sql_ex_info::write_data(IO_CACHE* file) old_ex.escaped= *escaped; old_ex.opt_flags= opt_flags; old_ex.empty_flags=empty_flags; - return my_b_safe_write(file, (byte*) &old_ex, sizeof(old_ex)); + return my_b_safe_write(file, (byte*) &old_ex, sizeof(old_ex)) != 0; } } @@ -3418,11 +4784,11 @@ char* sql_ex_info::init(char* buf,char* buf_end,bool use_new_format) the case when we have old format because we will be reusing net buffer to read the actual file before we write out the Create_file event. */ - if (read_str(buf, buf_end, field_term, field_term_len) || - read_str(buf, buf_end, enclosed, enclosed_len) || - read_str(buf, buf_end, line_term, line_term_len) || - read_str(buf, buf_end, line_start, line_start_len) || - read_str(buf, buf_end, escaped, escaped_len)) + if (read_str(&buf, buf_end, &field_term, &field_term_len) || + read_str(&buf, buf_end, &enclosed, &enclosed_len) || + read_str(&buf, buf_end, &line_term, &line_term_len) || + read_str(&buf, buf_end, &line_start, &line_start_len) || + read_str(&buf, buf_end, &escaped, &escaped_len)) return 0; opt_flags = *buf++; } |