summaryrefslogtreecommitdiff
path: root/sql/log_event.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/log_event.cc')
-rw-r--r--sql/log_event.cc2478
1 files changed, 1922 insertions, 556 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 9701ef2ff00..d4225b39704 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -183,10 +183,12 @@ static void cleanup_load_tmpdir()
write_str()
*/
-static bool write_str(IO_CACHE *file, char *str, byte length)
+static bool write_str(IO_CACHE *file, char *str, uint length)
{
- return (my_b_safe_write(file, &length, 1) ||
- my_b_safe_write(file, (byte*) str, (int) length));
+ byte tmp[1];
+ tmp[0]= (byte) length;
+ return (my_b_safe_write(file, tmp, sizeof(tmp)) ||
+ my_b_safe_write(file, (byte*) str, length));
}
@@ -194,17 +196,18 @@ static bool write_str(IO_CACHE *file, char *str, byte length)
read_str()
*/
-static inline int read_str(char * &buf, char *buf_end, char * &str,
- uint8 &len)
+static inline int read_str(char **buf, char *buf_end, char **str,
+ uint8 *len)
{
- if (buf + (uint) (uchar) *buf >= buf_end)
+ if (*buf + ((uint) (uchar) **buf) >= buf_end)
return 1;
- len = (uint8) *buf;
- str= buf+1;
- buf+= (uint) len+1;
+ *len= (uint8) **buf;
+ *str= (*buf)+1;
+ (*buf)+= (uint) *len+1;
return 0;
}
+
/*
Transforms a string into "" or its expression in 0x... form.
*/
@@ -228,9 +231,25 @@ static char *str_to_hex(char *to, char *from, uint len)
return p; // pointer to end 0 of 'to'
}
+/*
+ Prints a "session_var=value" string. Used by mysqlbinlog to print some SET
+ commands just before it prints a query.
+*/
+
+static void print_set_option(FILE* file, uint32 bits_changed, uint32 option,
+ uint32 flags, const char* name, bool* need_comma)
+{
+ if (bits_changed & option)
+ {
+ if (*need_comma)
+ fprintf(file,", ");
+ fprintf(file,"%s=%d", name, (bool)(flags & option));
+ *need_comma= 1;
+ }
+}
/**************************************************************************
- Log_event methods
+ Log_event methods (= the parent class of all events)
**************************************************************************/
/*
@@ -240,7 +259,7 @@ static char *str_to_hex(char *to, char *from, uint len)
const char* Log_event::get_type_str()
{
switch(get_type_code()) {
- case START_EVENT: return "Start";
+ case START_EVENT_V3: return "Start_v3";
case STOP_EVENT: return "Stop";
case QUERY_EVENT: return "Query";
case ROTATE_EVENT: return "Rotate";
@@ -253,8 +272,12 @@ const char* Log_event::get_type_str()
case DELETE_FILE_EVENT: return "Delete_file";
case EXEC_LOAD_EVENT: return "Exec_load";
case RAND_EVENT: return "RAND";
+ case XID_EVENT: return "Xid";
case USER_VAR_EVENT: return "User var";
- default: return "Unknown"; /* impossible */
+ case FORMAT_DESCRIPTION_EVENT: return "Format_desc";
+ case BEGIN_LOAD_QUERY_EVENT: return "Begin_load_query";
+ case EXECUTE_LOAD_QUERY_EVENT: return "Execute_load_query";
+ default: return "Unknown"; /* impossible */
}
}
@@ -265,25 +288,23 @@ const char* Log_event::get_type_str()
#ifndef MYSQL_CLIENT
Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans)
- :log_pos(0), temp_buf(0), exec_time(0), cached_event_len(0),
- flags(flags_arg), thd(thd_arg)
+ :log_pos(0), temp_buf(0), exec_time(0), flags(flags_arg), thd(thd_arg)
{
server_id= thd->server_id;
when= thd->start_time;
- cache_stmt= (using_trans &&
- (thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)));
+ cache_stmt= using_trans;
}
/*
- This minimal constructor is for when you are not even sure that there is a
- valid THD. For example in the server when we are shutting down or flushing
- logs after receiving a SIGHUP (then we must write a Rotate to the binlog but
- we have no THD, so we need this minimal constructor).
+ This minimal constructor is for when you are not even sure that there
+ is a valid THD. For example in the server when we are shutting down or
+ flushing logs after receiving a SIGHUP (then we must write a Rotate to
+ the binlog but we have no THD, so we need this minimal constructor).
*/
Log_event::Log_event()
- :temp_buf(0), exec_time(0), cached_event_len(0), flags(0), cache_stmt(0),
+ :temp_buf(0), exec_time(0), flags(0), cache_stmt(0),
thd(0)
{
server_id= ::server_id;
@@ -297,24 +318,71 @@ Log_event::Log_event()
Log_event::Log_event()
*/
-Log_event::Log_event(const char* buf, bool old_format)
- :temp_buf(0), cached_event_len(0), cache_stmt(0)
+Log_event::Log_event(const char* buf,
+ const Format_description_log_event* description_event)
+ :temp_buf(0), cache_stmt(0)
{
+#ifndef MYSQL_CLIENT
+ thd = 0;
+#endif
when = uint4korr(buf);
server_id = uint4korr(buf + SERVER_ID_OFFSET);
- if (old_format)
+ if (description_event->binlog_version==1)
{
- log_pos=0;
- flags=0;
+ log_pos= 0;
+ flags= 0;
+ return;
}
- else
+ /* 4.0 or newer */
+ log_pos= uint4korr(buf + LOG_POS_OFFSET);
+ /*
+ If the log is 4.0 (so here it can only be a 4.0 relay log read by
+ the SQL thread or a 4.0 master binlog read by the I/O thread),
+ log_pos is the beginning of the event: we transform it into the end
+ of the event, which is more useful.
+ But how do you know that the log is 4.0: you know it if
+ description_event is version 3 *and* you are not reading a
+ Format_desc (remember that mysqlbinlog starts by assuming that 5.0
+ logs are in 4.0 format, until it finds a Format_desc).
+ */
+ if (description_event->binlog_version==3 &&
+ buf[EVENT_TYPE_OFFSET]<FORMAT_DESCRIPTION_EVENT && log_pos)
{
- log_pos = uint4korr(buf + LOG_POS_OFFSET);
- flags = uint2korr(buf + FLAGS_OFFSET);
+ /*
+ If log_pos=0, don't change it. log_pos==0 is a marker to mean
+ "don't change rli->group_master_log_pos" (see
+ inc_group_relay_log_pos()). As it is unreal log_pos, adding the
+ event len's is nonsense. For example, a fake Rotate event should
+ not have its log_pos (which is 0) changed or it will modify
+ Exec_master_log_pos in SHOW SLAVE STATUS, displaying a nonsense
+ value of (a non-zero offset which does not exist in the master's
+ binlog, so which will cause problems if the user uses this value
+ in CHANGE MASTER).
+ */
+ log_pos+= uint4korr(buf + EVENT_LEN_OFFSET);
}
-#ifndef MYSQL_CLIENT
- thd = 0;
-#endif
+ DBUG_PRINT("info", ("log_pos: %lu", (ulong) log_pos));
+
+ flags= uint2korr(buf + FLAGS_OFFSET);
+ if ((buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT) ||
+ (buf[EVENT_TYPE_OFFSET] == ROTATE_EVENT))
+ {
+ /*
+ These events always have a header which stops here (i.e. their
+ header is FROZEN).
+ */
+ /*
+ Initialization to zero of all other Log_event members as they're
+ not specified. Currently there are no such members; in the future
+ there will be an event UID (but Format_description and Rotate
+ don't need this UID, as they are not propagated through
+ --log-slave-updates (remember the UID is used to not play a query
+ twice when you have two masters which are slaves of a 3rd master).
+ Then we are done.
+ */
+ return;
+ }
+ /* otherwise, go on with reading the header from buf (nothing now) */
}
#ifndef MYSQL_CLIENT
@@ -340,41 +408,42 @@ int Log_event::exec_event(struct st_relay_log_info* rli)
only in the case discussed above, 'if (rli)' is useless here.
But as we are not 100% sure, keep it for now.
*/
- if (rli)
+ if (rli)
{
/*
- If in a transaction, and if the slave supports transactions,
- just inc_event_relay_log_pos(). We only have to check for OPTION_BEGIN
- (not OPTION_NOT_AUTOCOMMIT) as transactions are logged
- with BEGIN/COMMIT, not with SET AUTOCOMMIT= .
-
+ If in a transaction, and if the slave supports transactions, just
+ inc_event_relay_log_pos(). We only have to check for OPTION_BEGIN
+ (not OPTION_NOT_AUTOCOMMIT) as transactions are logged with
+ BEGIN/COMMIT, not with SET AUTOCOMMIT= .
+
CAUTION: opt_using_transactions means
- innodb || bdb ; suppose the master supports InnoDB and BDB,
+ innodb || bdb ; suppose the master supports InnoDB and BDB,
but the slave supports only BDB, problems
- will arise:
+ will arise:
- suppose an InnoDB table is created on the master,
- then it will be MyISAM on the slave
- - but as opt_using_transactions is true, the slave will believe he is
- transactional with the MyISAM table. And problems will come when one
- does START SLAVE; STOP SLAVE; START SLAVE; (the slave will resume at
- BEGIN whereas there has not been any rollback). This is the problem of
- using opt_using_transactions instead of a finer
- "does the slave support _the_transactional_handler_used_on_the_master_".
-
- More generally, we'll have problems when a query mixes a transactional
- handler and MyISAM and STOP SLAVE is issued in the middle of the
- "transaction". START SLAVE will resume at BEGIN while the MyISAM table
- has already been updated.
+ - but as opt_using_transactions is true, the slave will believe he
+ is transactional with the MyISAM table. And problems will come
+ when one does START SLAVE; STOP SLAVE; START SLAVE; (the slave
+ will resume at BEGIN whereas there has not been any rollback).
+ This is the problem of using opt_using_transactions instead of a
+ finer "does the slave support
+ _the_transactional_handler_used_on_the_master_".
+
+ More generally, we'll have problems when a query mixes a
+ transactional handler and MyISAM and STOP SLAVE is issued in the
+ middle of the "transaction". START SLAVE will resume at BEGIN
+ while the MyISAM table has already been updated.
*/
if ((thd->options & OPTION_BEGIN) && opt_using_transactions)
- rli->inc_event_relay_log_pos(get_event_len());
+ rli->inc_event_relay_log_pos();
else
{
- rli->inc_group_relay_log_pos(get_event_len(),log_pos);
+ rli->inc_group_relay_log_pos(log_pos);
flush_relay_log_info(rli);
/*
- Note that Rotate_log_event::exec_event() does not call this function,
- so there is no chance that a fake rotate event resets
+ Note that Rotate_log_event::exec_event() does not call this
+ function, so there is no chance that a fake rotate event resets
last_master_timestamp.
Note that we update without mutex (probably ok - except in some very
rare cases, only consequence is that value may take some time to
@@ -435,58 +504,101 @@ void Log_event::init_show_field_list(List<Item>* field_list)
field_list->push_back(new Item_empty_string("Event_type", 20));
field_list->push_back(new Item_return_int("Server_id", 10,
MYSQL_TYPE_LONG));
- field_list->push_back(new Item_return_int("Orig_log_pos", 11,
+ field_list->push_back(new Item_return_int("End_log_pos", 11,
MYSQL_TYPE_LONGLONG));
field_list->push_back(new Item_empty_string("Info", 20));
}
-#endif /* !MYSQL_CLIENT */
/*
Log_event::write()
*/
-int Log_event::write(IO_CACHE* file)
+bool Log_event::write_header(IO_CACHE* file, ulong event_data_length)
{
- return (write_header(file) || write_data(file)) ? -1 : 0;
-}
+ byte header[LOG_EVENT_HEADER_LEN];
+ DBUG_ENTER("Log_event::write_header");
+ /* Store number of bytes that will be written by this event */
+ data_written= event_data_length + sizeof(header);
-/*
- Log_event::write_header()
-*/
+ /*
+ log_pos != 0 if this is relay-log event. In this case we should not
+ change the position
+ */
-int Log_event::write_header(IO_CACHE* file)
-{
- char buf[LOG_EVENT_HEADER_LEN];
- char* pos = buf;
- int4store(pos, (ulong) when); // timestamp
- pos += 4;
- *pos++ = get_type_code(); // event type code
- int4store(pos, server_id);
- pos += 4;
- long tmp=get_data_size() + LOG_EVENT_HEADER_LEN;
- int4store(pos, tmp);
- pos += 4;
- int4store(pos, log_pos);
- pos += 4;
- int2store(pos, flags);
- pos += 2;
- return (my_b_safe_write(file, (byte*) buf, (uint) (pos - buf)));
+ if (is_artificial_event())
+ {
+ /*
+ We should not do any cleanup on slave when reading this. We
+ mark this by setting log_pos to 0. Start_log_event_v3() will
+ detect this on reading and set artificial_event=1 for the event.
+ */
+ log_pos= 0;
+ }
+ else if (!log_pos)
+ {
+ /*
+ Calculate position of end of event
+
+ Note that with a SEQ_READ_APPEND cache, my_b_tell() does not
+ work well. So this will give slightly wrong positions for the
+ Format_desc/Rotate/Stop events which the slave writes to its
+ relay log. For example, the initial Format_desc will have
+ end_log_pos=91 instead of 95. Because after writing the first 4
+ bytes of the relay log, my_b_tell() still reports 0. Because
+ my_b_append() does not update the counter which my_b_tell()
+ later uses (one should probably use my_b_append_tell() to work
+ around this). To get right positions even when writing to the
+ relay log, we use the (new) my_b_safe_tell().
+
+ Note that this raises a question on the correctness of all these
+ DBUG_ASSERT(my_b_tell()=rli->event_relay_log_pos).
+
+ If in a transaction, the log_pos which we calculate below is not
+ very good (because then my_b_safe_tell() returns start position
+ of the BEGIN, so it's like the statement was at the BEGIN's
+ place), but it's not a very serious problem (as the slave, when
+ it is in a transaction, does not take those end_log_pos into
+ account (as it calls inc_event_relay_log_pos()). To be fixed
+ later, so that it looks less strange. But not bug.
+ */
+
+ log_pos= my_b_safe_tell(file)+data_written;
+ }
+
+ /*
+ Header will be of size LOG_EVENT_HEADER_LEN for all events, except for
+ FORMAT_DESCRIPTION_EVENT and ROTATE_EVENT, where it will be
+ LOG_EVENT_MINIMAL_HEADER_LEN (remember these 2 have a frozen header,
+ because we read them before knowing the format).
+ */
+
+ int4store(header, (ulong) when); // timestamp
+ header[EVENT_TYPE_OFFSET]= get_type_code();
+ int4store(header+ SERVER_ID_OFFSET, server_id);
+ int4store(header+ EVENT_LEN_OFFSET, data_written);
+ int4store(header+ LOG_POS_OFFSET, log_pos);
+ int2store(header+ FLAGS_OFFSET, flags);
+
+ DBUG_RETURN(my_b_safe_write(file, header, sizeof(header)) != 0);
}
/*
Log_event::read_log_event()
+
+ This needn't be format-tolerant, because we only read
+ LOG_EVENT_MINIMAL_HEADER_LEN (we just want to read the event's length).
+
*/
-#ifndef MYSQL_CLIENT
int Log_event::read_log_event(IO_CACHE* file, String* packet,
pthread_mutex_t* log_lock)
{
ulong data_len;
int result=0;
- char buf[LOG_EVENT_HEADER_LEN];
+ char buf[LOG_EVENT_MINIMAL_HEADER_LEN];
DBUG_ENTER("read_log_event");
if (log_lock)
@@ -506,24 +618,25 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet,
goto end;
}
data_len= uint4korr(buf + EVENT_LEN_OFFSET);
- if (data_len < LOG_EVENT_HEADER_LEN ||
+ if (data_len < LOG_EVENT_MINIMAL_HEADER_LEN ||
data_len > current_thd->variables.max_allowed_packet)
{
DBUG_PRINT("error",("data_len: %ld", data_len));
- result= ((data_len < LOG_EVENT_HEADER_LEN) ? LOG_READ_BOGUS :
+ result= ((data_len < LOG_EVENT_MINIMAL_HEADER_LEN) ? LOG_READ_BOGUS :
LOG_READ_TOO_LARGE);
goto end;
}
packet->append(buf, sizeof(buf));
- data_len-= LOG_EVENT_HEADER_LEN;
+ data_len-= LOG_EVENT_MINIMAL_HEADER_LEN;
if (data_len)
{
if (packet->append(file, data_len))
{
/*
- Here we should never hit EOF in a non-error condition.
+ Here if we hit EOF it's really an error: as data_len is >=0
+ there's supposed to be more bytes available.
EOF means we are reading the event partially, which should
- never happen.
+ never happen: either we read badly or the binlog is truncated.
*/
result= file->error >= 0 ? LOG_READ_TRUNC: LOG_READ_IO;
/* Implicit goto end; */
@@ -540,42 +653,63 @@ end:
#ifndef MYSQL_CLIENT
#define UNLOCK_MUTEX if (log_lock) pthread_mutex_unlock(log_lock);
#define LOCK_MUTEX if (log_lock) pthread_mutex_lock(log_lock);
-#define max_allowed_packet current_thd->variables.max_allowed_packet
#else
#define UNLOCK_MUTEX
#define LOCK_MUTEX
-#define max_allowed_packet (*mysql_get_parameters()->p_max_allowed_packet)
#endif
/*
Log_event::read_log_event()
NOTE:
- Allocates memory; The caller is responsible for clean-up
+ Allocates memory; The caller is responsible for clean-up.
*/
#ifndef MYSQL_CLIENT
Log_event* Log_event::read_log_event(IO_CACHE* file,
pthread_mutex_t* log_lock,
- bool old_format)
+ const Format_description_log_event *description_event)
#else
-Log_event* Log_event::read_log_event(IO_CACHE* file, bool old_format)
-#endif
+Log_event* Log_event::read_log_event(IO_CACHE* file,
+ const Format_description_log_event *description_event)
+#endif
{
- char head[LOG_EVENT_HEADER_LEN];
- uint header_size= old_format ? OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN;
+ DBUG_ENTER("Log_event::read_log_event(IO_CACHE *, Format_description_log_event *");
+ DBUG_ASSERT(description_event != 0);
+ char head[LOG_EVENT_MINIMAL_HEADER_LEN];
+ /*
+ First we only want to read at most LOG_EVENT_MINIMAL_HEADER_LEN, just to
+ check the event for sanity and to know its length; no need to really parse
+ it. We say "at most" because this could be a 3.23 master, which has header
+ of 13 bytes, whereas LOG_EVENT_MINIMAL_HEADER_LEN is 19 bytes (it's
+ "minimal" over the set {MySQL >=4.0}).
+ */
+ uint header_size= min(description_event->common_header_len,
+ LOG_EVENT_MINIMAL_HEADER_LEN);
LOCK_MUTEX;
+ DBUG_PRINT("info", ("my_b_tell=%lu", my_b_tell(file)));
if (my_b_read(file, (byte *) head, header_size))
{
+ DBUG_PRINT("info", ("Log_event::read_log_event(IO_CACHE*,Format_desc*) \
+failed my_b_read"));
UNLOCK_MUTEX;
- return 0;
+ /*
+ No error here; it could be that we are at the file's end. However
+ if the next my_b_read() fails (below), it will be an error as we
+ were able to read the first bytes.
+ */
+ DBUG_RETURN(0);
}
uint data_len = uint4korr(head + EVENT_LEN_OFFSET);
char *buf= 0;
const char *error= 0;
Log_event *res= 0;
+#ifndef max_allowed_packet
+ THD *thd=current_thd;
+ uint max_allowed_packet= thd ? thd->variables.max_allowed_packet : ~0;
+#endif
if (data_len > max_allowed_packet)
{
@@ -602,15 +736,16 @@ Log_event* Log_event::read_log_event(IO_CACHE* file, bool old_format)
error = "read error";
goto err;
}
- if ((res = read_log_event(buf, data_len, &error, old_format)))
+ if ((res= read_log_event(buf, data_len, &error, description_event)))
res->register_temp_buf(buf);
err:
UNLOCK_MUTEX;
- if (error)
+ if (!res)
{
- sql_print_error("\
-Error in Log_event::read_log_event(): '%s', data_len: %d, event_type: %d",
+ DBUG_ASSERT(error != 0);
+ sql_print_error("Error in Log_event::read_log_event(): "
+ "'%s', data_len: %d, event_type: %d",
error,data_len,head[EVENT_TYPE_OFFSET]);
my_free(buf, MYF(MY_ALLOW_ZERO_PTR));
/*
@@ -623,94 +758,120 @@ Error in Log_event::read_log_event(): '%s', data_len: %d, event_type: %d",
*/
file->error= -1;
}
- return res;
+ DBUG_RETURN(res);
}
/*
Log_event::read_log_event()
+ Binlog format tolerance is in (buf, event_len, description_event)
+ constructors.
*/
-Log_event* Log_event::read_log_event(const char* buf, int event_len,
- const char **error, bool old_format)
+Log_event* Log_event::read_log_event(const char* buf, uint event_len,
+ const char **error,
+ const Format_description_log_event *description_event)
{
- DBUG_ENTER("Log_event::read_log_event");
-
+ Log_event* ev;
+ DBUG_ENTER("Log_event::read_log_event(char*,...)");
+ DBUG_ASSERT(description_event != 0);
+ DBUG_PRINT("info", ("binlog_version: %d", description_event->binlog_version));
if (event_len < EVENT_LEN_OFFSET ||
(uint) event_len != uint4korr(buf+EVENT_LEN_OFFSET))
{
*error="Sanity check failed"; // Needed to free buffer
DBUG_RETURN(NULL); // general sanity check - will fail on a partial read
}
-
- Log_event* ev = NULL;
-
+
switch(buf[EVENT_TYPE_OFFSET]) {
case QUERY_EVENT:
- ev = new Query_log_event(buf, event_len, old_format);
+ ev = new Query_log_event(buf, event_len, description_event, QUERY_EVENT);
break;
case LOAD_EVENT:
- ev = new Create_file_log_event(buf, event_len, old_format);
+ ev = new Load_log_event(buf, event_len, description_event);
break;
case NEW_LOAD_EVENT:
- ev = new Load_log_event(buf, event_len, old_format);
+ ev = new Load_log_event(buf, event_len, description_event);
break;
case ROTATE_EVENT:
- ev = new Rotate_log_event(buf, event_len, old_format);
+ ev = new Rotate_log_event(buf, event_len, description_event);
break;
#ifdef HAVE_REPLICATION
- case SLAVE_EVENT:
+ case SLAVE_EVENT: /* can never happen (unused event) */
ev = new Slave_log_event(buf, event_len);
break;
#endif /* HAVE_REPLICATION */
case CREATE_FILE_EVENT:
- ev = new Create_file_log_event(buf, event_len, old_format);
+ ev = new Create_file_log_event(buf, event_len, description_event);
break;
case APPEND_BLOCK_EVENT:
- ev = new Append_block_log_event(buf, event_len);
+ ev = new Append_block_log_event(buf, event_len, description_event);
break;
case DELETE_FILE_EVENT:
- ev = new Delete_file_log_event(buf, event_len);
+ ev = new Delete_file_log_event(buf, event_len, description_event);
break;
case EXEC_LOAD_EVENT:
- ev = new Execute_load_log_event(buf, event_len);
+ ev = new Execute_load_log_event(buf, event_len, description_event);
break;
- case START_EVENT:
- ev = new Start_log_event(buf, old_format);
+ case START_EVENT_V3: /* this is sent only by MySQL <=4.x */
+ ev = new Start_log_event_v3(buf, description_event);
break;
-#ifdef HAVE_REPLICATION
case STOP_EVENT:
- ev = new Stop_log_event(buf, old_format);
+ ev = new Stop_log_event(buf, description_event);
break;
-#endif /* HAVE_REPLICATION */
case INTVAR_EVENT:
- ev = new Intvar_log_event(buf, old_format);
+ ev = new Intvar_log_event(buf, description_event);
+ break;
+ case XID_EVENT:
+ ev = new Xid_log_event(buf, description_event);
break;
case RAND_EVENT:
- ev = new Rand_log_event(buf, old_format);
+ ev = new Rand_log_event(buf, description_event);
break;
case USER_VAR_EVENT:
- ev = new User_var_log_event(buf, old_format);
+ ev = new User_var_log_event(buf, description_event);
+ break;
+ case FORMAT_DESCRIPTION_EVENT:
+ ev = new Format_description_log_event(buf, event_len, description_event);
+ break;
+ case BEGIN_LOAD_QUERY_EVENT:
+ ev = new Begin_load_query_log_event(buf, event_len, description_event);
+ break;
+ case EXECUTE_LOAD_QUERY_EVENT:
+ ev = new Execute_load_query_log_event(buf, event_len, description_event);
break;
default:
+ DBUG_PRINT("error",("Unknown evernt code: %d",(int) buf[EVENT_TYPE_OFFSET]));
+ ev= NULL;
break;
}
+
+ /*
+ is_valid() are small event-specific sanity tests which are
+ important; for example there are some my_malloc() in constructors
+ (e.g. Query_log_event::Query_log_event(char*...)); when these
+ my_malloc() fail we can't return an error out of the constructor
+ (because constructor is "void") ; so instead we leave the pointer we
+ wanted to allocate (e.g. 'query') to 0 and we test it in is_valid().
+ Same for Format_description_log_event, member 'post_header_len'.
+ */
if (!ev || !ev->is_valid())
{
+ DBUG_PRINT("error",("Found invalid event in binary log"));
+
delete ev;
#ifdef MYSQL_CLIENT
- if (!force_opt)
+ if (!force_opt) /* then mysqlbinlog dies */
{
*error= "Found invalid event in binary log";
DBUG_RETURN(0);
}
- ev= new Unknown_log_event(buf, old_format);
+ ev= new Unknown_log_event(buf, description_event);
#else
*error= "Found invalid event in binary log";
DBUG_RETURN(0);
#endif
}
- ev->cached_event_len = event_len;
DBUG_RETURN(ev);
}
@@ -725,7 +886,7 @@ void Log_event::print_header(FILE* file)
char llbuff[22];
fputc('#', file);
print_timestamp(file);
- fprintf(file, " server id %d log_pos %s ", server_id,
+ fprintf(file, " server id %d end_log_pos %s ", server_id,
llstr(log_pos,llbuff));
}
@@ -757,19 +918,6 @@ void Log_event::print_timestamp(FILE* file, time_t* ts)
#endif /* MYSQL_CLIENT */
-/*
- Log_event::set_log_pos()
-*/
-
-#ifndef MYSQL_CLIENT
-void Log_event::set_log_pos(MYSQL_LOG* log)
-{
- if (!log_pos)
- log_pos = my_b_tell(&log->log_file);
-}
-#endif /* !MYSQL_CLIENT */
-
-
/**************************************************************************
Query_log_event methods
**************************************************************************/
@@ -778,15 +926,20 @@ void Log_event::set_log_pos(MYSQL_LOG* log)
/*
Query_log_event::pack_info()
+ This (which is used only for SHOW BINLOG EVENTS) could be updated to
+ print SET @@session_var=. But this is not urgent, as SHOW BINLOG EVENTS is
+ only an information, it does not produce suitable queries to replay (for
+ example it does not print LOAD DATA INFILE).
*/
void Query_log_event::pack_info(Protocol *protocol)
{
+ // TODO: show the catalog ??
char *buf, *pos;
if (!(buf= my_malloc(9 + db_len + q_len, MYF(MY_WME))))
return;
- pos= buf;
- if (!(flags & LOG_EVENT_SUPPRESS_USE_F)
+ pos= buf;
+ if (!(flags & LOG_EVENT_SUPPRESS_USE_F)
&& db && db_len)
{
pos= strmov(buf, "use `");
@@ -803,34 +956,50 @@ void Query_log_event::pack_info(Protocol *protocol)
}
#endif
+#ifndef MYSQL_CLIENT
-/*
- Query_log_event::write()
-*/
-
-int Query_log_event::write(IO_CACHE* file)
+/* Utility function for the next method */
+static void write_str_with_code_and_len(char **dst, const char *src,
+ int len, uint code)
{
- return query ? Log_event::write(file) : -1;
+ DBUG_ASSERT(src);
+ *((*dst)++)= code;
+ *((*dst)++)= (uchar) len;
+ bmove(*dst, src, len);
+ (*dst)+= len;
}
/*
- Query_log_event::write_data()
+ Query_log_event::write()
+
+ NOTES:
+ In this event we have to modify the header to have the correct
+ EVENT_LEN_OFFSET as we don't yet know how many status variables we
+ will print!
*/
-int Query_log_event::write_data(IO_CACHE* file)
+bool Query_log_event::write(IO_CACHE* file)
{
- char buf[QUERY_HEADER_LEN];
+ uchar buf[QUERY_HEADER_LEN+
+ 1+4+ // code of flags2 and flags2
+ 1+8+ // code of sql_mode and sql_mode
+ 1+1+FN_REFLEN+ // code of catalog and catalog length and catalog
+ 1+4+ // code of autoinc and the 2 autoinc variables
+ 1+6+ // code of charset and charset
+ 1+1+MAX_TIME_ZONE_NAME_LENGTH // code of tz and tz length and tz name
+ ], *start, *start_of_status;
+ ulong event_length;
if (!query)
- return -1;
-
+ return 1; // Something wrong with event
+
/*
We want to store the thread id:
(- as an information for the user when he reads the binlog)
- if the query uses temporary table: for the slave SQL thread to know to
which master connection the temp table belongs.
- Now imagine we (write_data()) are called by the slave SQL thread (we are
+ Now imagine we (write()) are called by the slave SQL thread (we are
logging a query executed by this thread; the slave runs with
--log-slave-updates). Then this query will be logged with
thread_id=the_thread_id_of_the_SQL_thread. Imagine that 2 temp tables of
@@ -867,78 +1036,324 @@ int Query_log_event::write_data(IO_CACHE* file)
buf[Q_DB_LEN_OFFSET] = (char) db_len;
int2store(buf + Q_ERR_CODE_OFFSET, error_code);
- return (my_b_safe_write(file, (byte*) buf, QUERY_HEADER_LEN) ||
- my_b_safe_write(file, (db) ? (byte*) db : (byte*)"", db_len + 1) ||
- my_b_safe_write(file, (byte*) query, q_len)) ? -1 : 0;
+ /*
+ You MUST always write status vars in increasing order of code. This
+ guarantees that a slightly older slave will be able to parse those he
+ knows.
+ */
+ start_of_status= start= buf+QUERY_HEADER_LEN;
+ if (flags2_inited)
+ {
+ *start++= Q_FLAGS2_CODE;
+ int4store(start, flags2);
+ start+= 4;
+ }
+ if (sql_mode_inited)
+ {
+ *start++= Q_SQL_MODE_CODE;
+ int8store(start, (ulonglong)sql_mode);
+ start+= 8;
+ }
+ if (catalog_len) // i.e. this var is inited (false for 4.0 events)
+ {
+ write_str_with_code_and_len((char **)(&start),
+ catalog, catalog_len, Q_CATALOG_NZ_CODE);
+ /*
+ In 5.0.x where x<4 masters we used to store the end zero here. This was
+ a waste of one byte so we don't do it in x>=4 masters. We change code to
+ Q_CATALOG_NZ_CODE, because re-using the old code would make x<4 slaves
+ of this x>=4 master segfault (expecting a zero when there is
+ none). Remaining compatibility problems are: the older slave will not
+ find the catalog; but it is will not crash, and it's not an issue
+ that it does not find the catalog as catalogs were not used in these
+ older MySQL versions (we store it in binlog and read it from relay log
+ but do nothing useful with it). What is an issue is that the older slave
+ will stop processing the Q_* blocks (and jumps to the db/query) as soon
+ as it sees unknown Q_CATALOG_NZ_CODE; so it will not be able to read
+ Q_AUTO_INCREMENT*, Q_CHARSET and so replication will fail silently in
+ various ways. Documented that you should not mix alpha/beta versions if
+ they are not exactly the same version, with example of 5.0.3->5.0.2 and
+ 5.0.4->5.0.3. If replication is from older to new, the new will
+ recognize Q_CATALOG_CODE and have no problem.
+ */
+ }
+ if (auto_increment_increment != 1)
+ {
+ *start++= Q_AUTO_INCREMENT;
+ int2store(start, auto_increment_increment);
+ int2store(start+2, auto_increment_offset);
+ start+= 4;
+ }
+ if (charset_inited)
+ {
+ *start++= Q_CHARSET_CODE;
+ memcpy(start, charset, 6);
+ start+= 6;
+ }
+ if (time_zone_len)
+ {
+ /* In the TZ sys table, column Name is of length 64 so this should be ok */
+ DBUG_ASSERT(time_zone_len <= MAX_TIME_ZONE_NAME_LENGTH);
+ *start++= Q_TIME_ZONE_CODE;
+ *start++= time_zone_len;
+ memcpy(start, time_zone_str, time_zone_len);
+ start+= time_zone_len;
+ }
+ /*
+ Here there could be code like
+ if (command-line-option-which-says-"log_this_variable" && inited)
+ {
+ *start++= Q_THIS_VARIABLE_CODE;
+ int4store(start, this_variable);
+ start+= 4;
+ }
+ */
+
+ /* Store length of status variables */
+ status_vars_len= (uint) (start-start_of_status);
+ int2store(buf + Q_STATUS_VARS_LEN_OFFSET, status_vars_len);
+
+ /*
+ Calculate length of whole event
+ The "1" below is the \0 in the db's length
+ */
+ event_length= (uint) (start-buf) + get_post_header_size_for_derived() + db_len + 1 + q_len;
+
+ return (write_header(file, event_length) ||
+ my_b_safe_write(file, (byte*) buf, QUERY_HEADER_LEN) ||
+ write_post_header_for_derived(file) ||
+ my_b_safe_write(file, (byte*) start_of_status,
+ (uint) (start-start_of_status)) ||
+ my_b_safe_write(file, (db) ? (byte*) db : (byte*)"", db_len + 1) ||
+ my_b_safe_write(file, (byte*) query, q_len)) ? 1 : 0;
}
/*
Query_log_event::Query_log_event()
*/
-
-#ifndef MYSQL_CLIENT
Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
ulong query_length, bool using_trans,
bool suppress_use)
- :Log_event(thd_arg,
+ :Log_event(thd_arg,
((thd_arg->tmp_table_used ? LOG_EVENT_THREAD_SPECIFIC_F : 0)
| (suppress_use ? LOG_EVENT_SUPPRESS_USE_F : 0)),
using_trans),
- data_buf(0), query(query_arg),
+ data_buf(0), query(query_arg), catalog(thd_arg->catalog),
db(thd_arg->db), q_len((uint32) query_length),
- error_code(thd_arg->killed ?
+ error_code((thd_arg->killed != THD::NOT_KILLED) ?
((thd_arg->system_thread & SYSTEM_THREAD_DELAYED_INSERT) ?
- 0 : ER_SERVER_SHUTDOWN) : thd_arg->net.last_errno),
+ 0 : thd->killed_errno()) : thd_arg->net.last_errno),
thread_id(thd_arg->thread_id),
/* save the original thread id; we already know the server id */
- slave_proxy_id(thd_arg->variables.pseudo_thread_id)
+ slave_proxy_id(thd_arg->variables.pseudo_thread_id),
+ flags2_inited(1), sql_mode_inited(1), charset_inited(1),
+ sql_mode(thd_arg->variables.sql_mode),
+ auto_increment_increment(thd_arg->variables.auto_increment_increment),
+ auto_increment_offset(thd_arg->variables.auto_increment_offset)
{
time_t end_time;
time(&end_time);
exec_time = (ulong) (end_time - thd->start_time);
+ catalog_len = (catalog) ? (uint32) strlen(catalog) : 0;
+ /* status_vars_len is set just before writing the event */
db_len = (db) ? (uint32) strlen(db) : 0;
+ /*
+ If we don't use flags2 for anything else than options contained in
+ thd->options, it would be more efficient to flags2=thd_arg->options
+ (OPTIONS_WRITTEN_TO_BINLOG would be used only at reading time).
+ But it's likely that we don't want to use 32 bits for 3 bits; in the future
+ we will probably want to reclaim the 29 bits. So we need the &.
+ */
+ flags2= thd_arg->options & OPTIONS_WRITTEN_TO_BIN_LOG;
+ DBUG_ASSERT(thd->variables.character_set_client->number < 256*256);
+ DBUG_ASSERT(thd->variables.collation_connection->number < 256*256);
+ DBUG_ASSERT(thd->variables.collation_server->number < 256*256);
+ int2store(charset, thd_arg->variables.character_set_client->number);
+ int2store(charset+2, thd_arg->variables.collation_connection->number);
+ int2store(charset+4, thd_arg->variables.collation_server->number);
+ if (thd_arg->time_zone_used)
+ {
+ /*
+ Note that our event becomes dependent on the Time_zone object
+ representing the time zone. Fortunately such objects are never deleted
+ or changed during mysqld's lifetime.
+ */
+ time_zone_len= thd_arg->variables.time_zone->get_name()->length();
+ time_zone_str= thd_arg->variables.time_zone->get_name()->ptr();
+ }
+ else
+ time_zone_len= 0;
+ DBUG_PRINT("info",("Query_log_event has flags2=%lu sql_mode=%lu",flags2,sql_mode));
}
#endif /* MYSQL_CLIENT */
+/* 2 utility functions for the next method */
+
+static void get_str_len_and_pointer(const char **dst, const char **src, uint *len)
+{
+ if ((*len= **src))
+ *dst= *src + 1; // Will be copied later
+ (*src)+= *len+1;
+}
+
+
+static void copy_str_and_move(char **dst, const char **src, uint len)
+{
+ memcpy(*dst, *src, len);
+ *src= *dst;
+ (*dst)+= len;
+ *(*dst)++= 0;
+}
+
+
/*
Query_log_event::Query_log_event()
+ This is used by the SQL slave thread to prepare the event before execution.
*/
-Query_log_event::Query_log_event(const char* buf, int event_len,
- bool old_format)
- :Log_event(buf, old_format),data_buf(0), query(NULL), db(NULL)
+Query_log_event::Query_log_event(const char* buf, uint event_len,
+ const Format_description_log_event *description_event,
+ Log_event_type event_type)
+ :Log_event(buf, description_event), data_buf(0), query(NullS),
+ db(NullS), catalog_len(0), status_vars_len(0),
+ flags2_inited(0), sql_mode_inited(0), charset_inited(0),
+ auto_increment_increment(1), auto_increment_offset(1),
+ time_zone_len(0)
{
ulong data_len;
- if (old_format)
- {
- if ((uint)event_len < OLD_HEADER_LEN + QUERY_HEADER_LEN)
- return;
- data_len = event_len - (QUERY_HEADER_LEN + OLD_HEADER_LEN);
- buf += OLD_HEADER_LEN;
- }
- else
- {
- if ((uint)event_len < QUERY_EVENT_OVERHEAD)
- return;
- data_len = event_len - QUERY_EVENT_OVERHEAD;
- buf += LOG_EVENT_HEADER_LEN;
- }
-
+ uint32 tmp;
+ uint8 common_header_len, post_header_len;
+ char *start;
+ const char *end;
+ bool catalog_nz= 1;
+ DBUG_ENTER("Query_log_event::Query_log_event(char*,...)");
+
+ common_header_len= description_event->common_header_len;
+ post_header_len= description_event->post_header_len[event_type-1];
+ DBUG_PRINT("info",("event_len=%ld, common_header_len=%d, post_header_len=%d",
+ event_len, common_header_len, post_header_len));
+
+ /*
+ We test if the event's length is sensible, and if so we compute data_len.
+ We cannot rely on QUERY_HEADER_LEN here as it would not be format-tolerant.
+ We use QUERY_HEADER_MINIMAL_LEN which is the same for 3.23, 4.0 & 5.0.
+ */
+ if (event_len < (uint)(common_header_len + post_header_len))
+ DBUG_VOID_RETURN;
+ data_len = event_len - (common_header_len + post_header_len);
+ buf+= common_header_len;
+
+ slave_proxy_id= thread_id = uint4korr(buf + Q_THREAD_ID_OFFSET);
exec_time = uint4korr(buf + Q_EXEC_TIME_OFFSET);
+ db_len = (uint)buf[Q_DB_LEN_OFFSET]; // TODO: add a check of all *_len vars
error_code = uint2korr(buf + Q_ERR_CODE_OFFSET);
- if (!(data_buf = (char*) my_malloc(data_len + 1, MYF(MY_WME))))
- return;
+ /*
+ 5.0 format starts here.
+ Depending on the format, we may or not have affected/warnings etc
+ The remnent post-header to be parsed has length:
+ */
+ tmp= post_header_len - QUERY_HEADER_MINIMAL_LEN;
+ if (tmp)
+ {
+ status_vars_len= uint2korr(buf + Q_STATUS_VARS_LEN_OFFSET);
+ data_len-= status_vars_len;
+ DBUG_PRINT("info", ("Query_log_event has status_vars_len: %u",
+ (uint) status_vars_len));
+ tmp-= 2;
+ }
+ /*
+ We have parsed everything we know in the post header for QUERY_EVENT,
+ the rest of post header is either comes from older version MySQL or
+ dedicated to derived events (e.g. Execute_load_query...)
+ */
- memcpy(data_buf, buf + Q_DATA_OFFSET, data_len);
- slave_proxy_id= thread_id= uint4korr(buf + Q_THREAD_ID_OFFSET);
- db = data_buf;
- db_len = (uint)buf[Q_DB_LEN_OFFSET];
- query=data_buf + db_len + 1;
- q_len = data_len - 1 - db_len;
- *((char*)query+q_len) = 0;
+ /* variable-part: the status vars; only in MySQL 5.0 */
+
+ start= (char*) (buf+post_header_len);
+ end= (const char*) (start+status_vars_len);
+ for (const uchar* pos= (const uchar*) start; pos < (const uchar*) end;)
+ {
+ switch (*pos++) {
+ case Q_FLAGS2_CODE:
+ flags2_inited= 1;
+ flags2= uint4korr(pos);
+ DBUG_PRINT("info",("In Query_log_event, read flags2: %lu", flags2));
+ pos+= 4;
+ break;
+ case Q_SQL_MODE_CODE:
+ {
+#ifndef DBUG_OFF
+ char buff[22];
+#endif
+ sql_mode_inited= 1;
+ sql_mode= (ulong) uint8korr(pos); // QQ: Fix when sql_mode is ulonglong
+ DBUG_PRINT("info",("In Query_log_event, read sql_mode: %s",
+ llstr(sql_mode, buff)));
+ pos+= 8;
+ break;
+ }
+ case Q_CATALOG_NZ_CODE:
+ get_str_len_and_pointer(&catalog, (const char **)(&pos), &catalog_len);
+ break;
+ case Q_AUTO_INCREMENT:
+ auto_increment_increment= uint2korr(pos);
+ auto_increment_offset= uint2korr(pos+2);
+ pos+= 4;
+ break;
+ case Q_CHARSET_CODE:
+ {
+ charset_inited= 1;
+ memcpy(charset, pos, 6);
+ pos+= 6;
+ break;
+ }
+ case Q_TIME_ZONE_CODE:
+ {
+ get_str_len_and_pointer(&time_zone_str, (const char **)(&pos), &time_zone_len);
+ break;
+ }
+ case Q_CATALOG_CODE: /* for 5.0.x where 0<=x<=3 masters */
+ if ((catalog_len= *pos))
+ catalog= (char*) pos+1; // Will be copied later
+ pos+= catalog_len+2; // leap over end 0
+ catalog_nz= 0; // catalog has end 0 in event
+ break;
+ default:
+ /* That's why you must write status vars in growing order of code */
+ DBUG_PRINT("info",("Query_log_event has unknown status vars (first has\
+ code: %u), skipping the rest of them", (uint) *(pos-1)));
+ pos= (const uchar*) end; // Break loop
+ }
+ }
+
+ if (!(start= data_buf = (char*) my_malloc(catalog_len + 1 +
+ time_zone_len + 1 +
+ data_len + 1, MYF(MY_WME))))
+ DBUG_VOID_RETURN;
+ if (catalog_len) // If catalog is given
+ {
+ if (likely(catalog_nz)) // true except if event comes from 5.0.0|1|2|3.
+ copy_str_and_move(&start, &catalog, catalog_len);
+ else
+ {
+ memcpy(start, catalog, catalog_len+1); // copy end 0
+ catalog= start;
+ start+= catalog_len+1;
+ }
+ }
+ if (time_zone_len)
+ copy_str_and_move(&start, &time_zone_str, time_zone_len);
+
+ /* A 2nd variable part; this is common to all versions */
+ memcpy((char*) start, end, data_len); // Copy db and query
+ start[data_len]= '\0'; // End query with \0 (For safetly)
+ db= start;
+ query= start + db_len + 1;
+ q_len= data_len - db_len -1;
+ DBUG_VOID_RETURN;
}
@@ -947,30 +1362,27 @@ Query_log_event::Query_log_event(const char* buf, int event_len,
*/
#ifdef MYSQL_CLIENT
-void Query_log_event::print(FILE* file, bool short_form, char* last_db)
+void Query_log_event::print_query_header(FILE* file, bool short_form,
+ LAST_EVENT_INFO* last_event_info)
{
+ // TODO: print the catalog ??
char buff[40],*end; // Enough for SET TIMESTAMP
+ bool different_db= 1;
+ uint32 tmp;
+
if (!short_form)
{
print_header(file);
- fprintf(file, "\tQuery\tthread_id=%lu\texec_time=%lu\terror_code=%d\n",
- (ulong) thread_id, (ulong) exec_time, error_code);
+ fprintf(file, "\t%s\tthread_id=%lu\texec_time=%lu\terror_code=%d\n",
+ get_type_str(), (ulong) thread_id, (ulong) exec_time, error_code);
}
- bool different_db= 1;
-
- if (!(flags & LOG_EVENT_SUPPRESS_USE_F))
+ if (!(flags & LOG_EVENT_SUPPRESS_USE_F) && db)
{
- if (db && last_db)
- {
- if (different_db= memcmp(last_db, db, db_len + 1))
- memcpy(last_db, db, db_len + 1);
- }
-
- if (db && db[0] && different_db)
- {
+ if (different_db= memcmp(last_event_info->db, db, db_len + 1))
+ memcpy(last_event_info->db, db, db_len + 1);
+ if (db[0] && different_db)
fprintf(file, "use %s;\n", db);
- }
}
end=int10_to_str((long) when, strmov(buff,"SET TIMESTAMP="),10);
@@ -979,8 +1391,114 @@ void Query_log_event::print(FILE* file, bool short_form, char* last_db)
my_fwrite(file, (byte*) buff, (uint) (end-buff),MYF(MY_NABP | MY_WME));
if (flags & LOG_EVENT_THREAD_SPECIFIC_F)
fprintf(file,"SET @@session.pseudo_thread_id=%lu;\n",(ulong)thread_id);
+
+ /*
+ If flags2_inited==0, this is an event from 3.23 or 4.0; nothing to
+ print (remember we don't produce mixed relay logs so there cannot be
+ 5.0 events before that one so there is nothing to reset).
+ */
+ if (likely(flags2_inited)) /* likely as this will mainly read 5.0 logs */
+ {
+ /* tmp is a bitmask of bits which have changed. */
+ if (likely(last_event_info->flags2_inited))
+ /* All bits which have changed */
+ tmp= (last_event_info->flags2) ^ flags2;
+ else /* that's the first Query event we read */
+ {
+ last_event_info->flags2_inited= 1;
+ tmp= ~((uint32)0); /* all bits have changed */
+ }
+
+ if (unlikely(tmp)) /* some bits have changed */
+ {
+ bool need_comma= 0;
+ fprintf(file, "SET ");
+ print_set_option(file, tmp, OPTION_NO_FOREIGN_KEY_CHECKS, ~flags2,
+ "@@session.foreign_key_checks", &need_comma);
+ print_set_option(file, tmp, OPTION_AUTO_IS_NULL, flags2,
+ "@@session.sql_auto_is_null", &need_comma);
+ print_set_option(file, tmp, OPTION_RELAXED_UNIQUE_CHECKS, ~flags2,
+ "@@session.unique_checks", &need_comma);
+ fprintf(file,";\n");
+ last_event_info->flags2= flags2;
+ }
+ }
+
+ /*
+ Now the session variables;
+ it's more efficient to pass SQL_MODE as a number instead of a
+ comma-separated list.
+ FOREIGN_KEY_CHECKS, SQL_AUTO_IS_NULL, UNIQUE_CHECKS are session-only
+ variables (they have no global version; they're not listed in
+ sql_class.h), The tests below work for pure binlogs or pure relay
+ logs. Won't work for mixed relay logs but we don't create mixed
+ relay logs (that is, there is no relay log with a format change
+ except within the 3 first events, which mysqlbinlog handles
+ gracefully). So this code should always be good.
+ */
+
+ if (likely(sql_mode_inited))
+ {
+ if (unlikely(!last_event_info->sql_mode_inited)) /* first Query event */
+ {
+ last_event_info->sql_mode_inited= 1;
+ /* force a difference to force write */
+ last_event_info->sql_mode= ~sql_mode;
+ }
+ if (unlikely(last_event_info->sql_mode != sql_mode))
+ {
+ fprintf(file,"SET @@session.sql_mode=%lu;\n",(ulong)sql_mode);
+ last_event_info->sql_mode= sql_mode;
+ }
+ }
+ if (last_event_info->auto_increment_increment != auto_increment_increment ||
+ last_event_info->auto_increment_offset != auto_increment_offset)
+ {
+ fprintf(file,"SET @@session.auto_increment_increment=%lu, @@session.auto_increment_offset=%lu;\n",
+ auto_increment_increment,auto_increment_offset);
+ last_event_info->auto_increment_increment= auto_increment_increment;
+ last_event_info->auto_increment_offset= auto_increment_offset;
+ }
+
+ /* TODO: print the catalog when we feature SET CATALOG */
+
+ if (likely(charset_inited))
+ {
+ if (unlikely(!last_event_info->charset_inited)) /* first Query event */
+ {
+ last_event_info->charset_inited= 1;
+ last_event_info->charset[0]= ~charset[0]; // force a difference to force write
+ }
+ if (unlikely(bcmp(last_event_info->charset, charset, 6)))
+ {
+ fprintf(file,"SET "
+ "@@session.character_set_client=%d,"
+ "@@session.collation_connection=%d,"
+ "@@session.collation_server=%d"
+ ";\n",
+ uint2korr(charset),
+ uint2korr(charset+2),
+ uint2korr(charset+4));
+ memcpy(last_event_info->charset, charset, 6);
+ }
+ }
+ if (time_zone_len)
+ {
+ if (bcmp(last_event_info->time_zone_str, time_zone_str, time_zone_len+1))
+ {
+ fprintf(file,"SET @@session.time_zone='%s';\n", time_zone_str);
+ memcpy(last_event_info->time_zone_str, time_zone_str, time_zone_len+1);
+ }
+ }
+}
+
+
+void Query_log_event::print(FILE* file, bool short_form,
+ LAST_EVENT_INFO* last_event_info)
+{
+ print_query_header(file, short_form, last_event_info);
my_fwrite(file, (byte*) query, q_len, MYF(MY_NABP | MY_WME));
- fprintf(file, ";\n");
+ fputs(";\n", file);
}
#endif /* MYSQL_CLIENT */
@@ -992,37 +1510,116 @@ void Query_log_event::print(FILE* file, bool short_form, char* last_db)
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
int Query_log_event::exec_event(struct st_relay_log_info* rli)
{
+ return exec_event(rli, query, q_len);
+}
+
+
+int Query_log_event::exec_event(struct st_relay_log_info* rli, const char *query_arg, uint32 q_len_arg)
+{
int expected_error,actual_error= 0;
+ /*
+ Colleagues: please never free(thd->catalog) in MySQL. This would lead to
+ bugs as here thd->catalog is a part of an alloced block, not an entire
+ alloced block (see Query_log_event::exec_event()). Same for thd->db.
+ Thank you.
+ */
+ thd->catalog= catalog_len ? (char *) catalog : (char *)"";
thd->db_length= db_len;
thd->db= (char*) rewrite_db(db, &thd->db_length);
+ thd->variables.auto_increment_increment= auto_increment_increment;
+ thd->variables.auto_increment_offset= auto_increment_offset;
/*
- InnoDB internally stores the master log position it has processed so far;
- position to store is of the END of the current log event.
+ InnoDB internally stores the master log position it has executed so far,
+ i.e. the position just after the COMMIT event.
+ When InnoDB will want to store, the positions in rli won't have
+ been updated yet, so group_master_log_* will point to old BEGIN
+ and event_master_log* will point to the beginning of current COMMIT.
+ But log_pos of the COMMIT Query event is what we want, i.e. the pos of the
+ END of the current log event (COMMIT). We save it in rli so that InnoDB can
+ access it.
*/
-#if MYSQL_VERSION_ID < 50000
- rli->future_group_master_log_pos= log_pos + get_event_len() -
- (rli->mi->old_format ? (LOG_EVENT_HEADER_LEN - OLD_HEADER_LEN) : 0);
-#else
- /* In 5.0 we store the end_log_pos in the relay log so no problem */
rli->future_group_master_log_pos= log_pos;
-#endif
+ DBUG_PRINT("info", ("log_pos: %lu", (ulong) log_pos));
+
clear_all_errors(thd, rli);
if (db_ok(thd->db, replicate_do_db, replicate_ignore_db))
{
thd->set_time((time_t)when);
- thd->query_length= q_len;
- thd->query = (char*)query;
+ thd->query_length= q_len_arg;
+ thd->query= (char*)query_arg;
VOID(pthread_mutex_lock(&LOCK_thread_count));
- thd->query_id = query_id++;
+ thd->query_id = next_query_id();
VOID(pthread_mutex_unlock(&LOCK_thread_count));
thd->variables.pseudo_thread_id= thread_id; // for temp tables
-
DBUG_PRINT("query",("%s",thd->query));
+
if (ignored_error_code((expected_error= error_code)) ||
!check_expected_error(thd,rli,expected_error))
- mysql_parse(thd, thd->query, q_len);
+ {
+ if (flags2_inited)
+ /*
+ all bits of thd->options which are 1 in OPTIONS_WRITTEN_TO_BIN_LOG must
+ take their value from flags2.
+ */
+ thd->options= flags2|(thd->options & ~(ulong)OPTIONS_WRITTEN_TO_BIN_LOG);
+ /*
+ else, we are in a 3.23/4.0 binlog; we previously received a
+ Rotate_log_event which reset thd->options and sql_mode etc, so nothing to do.
+ */
+ /*
+ We do not replicate IGNORE_DIR_IN_CREATE. That is, if the master is a
+ slave which runs with SQL_MODE=IGNORE_DIR_IN_CREATE, this should not
+ force us to ignore the dir too. Imagine you are a ring of machines, and
+ one has a disk problem so that you temporarily need IGNORE_DIR_IN_CREATE
+ on this machine; you don't want it to propagate elsewhere (you don't want
+ all slaves to start ignoring the dirs).
+ */
+ if (sql_mode_inited)
+ thd->variables.sql_mode=
+ (ulong) ((thd->variables.sql_mode & MODE_NO_DIR_IN_CREATE) |
+ (sql_mode & ~(ulong) MODE_NO_DIR_IN_CREATE));
+ if (charset_inited)
+ {
+ if (rli->cached_charset_compare(charset))
+ {
+ /* Verify that we support the charsets found in the event. */
+ if (!(thd->variables.character_set_client=
+ get_charset(uint2korr(charset), MYF(MY_WME))) ||
+ !(thd->variables.collation_connection=
+ get_charset(uint2korr(charset+2), MYF(MY_WME))) ||
+ !(thd->variables.collation_server=
+ get_charset(uint2korr(charset+4), MYF(MY_WME))))
+ {
+ /*
+ We updated the thd->variables with nonsensical values (0). Let's
+ set them to something safe (i.e. which avoids crash), and we'll
+ stop with EE_UNKNOWN_CHARSET in compare_errors (unless set to
+ ignore this error).
+ */
+ set_slave_thread_default_charset(thd, rli);
+ goto compare_errors;
+ }
+ thd->update_charset(); // for the charset change to take effect
+ }
+ }
+ if (time_zone_len)
+ {
+ String tmp(time_zone_str, time_zone_len, &my_charset_bin);
+ if (!(thd->variables.time_zone=
+ my_tz_find_with_opening_tz_tables(thd, &tmp)))
+ {
+ my_error(ER_UNKNOWN_TIME_ZONE, MYF(0), tmp.c_ptr());
+ thd->variables.time_zone= global_system_variables.time_zone;
+ goto compare_errors;
+ }
+ }
+
+ /* Execute the query (note that we bypass dispatch_command()) */
+ mysql_parse(thd, thd->query, thd->query_length);
+
+ }
else
{
/*
@@ -1032,7 +1629,7 @@ int Query_log_event::exec_event(struct st_relay_log_info* rli)
we exit gracefully; otherwise we warn about the bad error and tell DBA
to check/fix it.
*/
- if (mysql_test_parse_for_slave(thd, thd->query, q_len))
+ if (mysql_test_parse_for_slave(thd, thd->query, thd->query_length))
clear_all_errors(thd, rli); /* Can ignore query */
else
{
@@ -1052,19 +1649,21 @@ START SLAVE; . Query: '%s'", expected_error, thd->query);
if (thd->net.last_errno != ER_SLAVE_IGNORED_TABLE)
mysql_log.write(thd,COM_QUERY,"%s",thd->query);
- /*
+compare_errors:
+
+ /*
If we expected a non-zero error code, and we don't get the same error
code, and none of them should be ignored.
*/
DBUG_PRINT("info",("expected_error: %d last_errno: %d",
- expected_error, thd->net.last_errno));
+ expected_error, thd->net.last_errno));
if ((expected_error != (actual_error= thd->net.last_errno)) &&
- expected_error &&
- !ignored_error_code(actual_error) &&
- !ignored_error_code(expected_error))
+ expected_error &&
+ !ignored_error_code(actual_error) &&
+ !ignored_error_code(expected_error))
{
slave_print_error(rli, 0,
- "\
+ "\
Query caused different errors on master and slave. \
Error on master: '%s' (%d), Error on slave: '%s' (%d). \
Default database: '%s'. Query: '%s'",
@@ -1072,14 +1671,14 @@ Default database: '%s'. Query: '%s'",
expected_error,
actual_error ? thd->net.last_error: "no error",
actual_error,
- print_slave_db_safe(thd->db), query);
+ print_slave_db_safe(db), query_arg);
thd->query_error= 1;
}
/*
If we get the same error code as expected, or they should be ignored.
*/
else if (expected_error == actual_error ||
- ignored_error_code(actual_error))
+ ignored_error_code(actual_error))
{
DBUG_PRINT("info",("error ignored"));
clear_all_errors(thd, rli);
@@ -1093,14 +1692,46 @@ Default database: '%s'. Query: '%s'",
"Error '%s' on query. Default database: '%s'. Query: '%s'",
(actual_error ? thd->net.last_error :
"unexpected success or fatal error"),
- print_slave_db_safe(thd->db), query);
+ print_slave_db_safe(thd->db), query_arg);
thd->query_error= 1;
}
+
+ /*
+ TODO: compare the values of "affected rows" around here. Something
+ like:
+ if ((uint32) affected_in_event != (uint32) affected_on_slave)
+ {
+ sql_print_error("Slave: did not get the expected number of affected \
+ rows running query from master - expected %d, got %d (this numbers \
+ should have matched modulo 4294967296).", 0, ...);
+ thd->query_error = 1;
+ }
+ We may also want an option to tell the slave to ignore "affected"
+ mismatch. This mismatch could be implemented with a new ER_ code, and
+ to ignore it you would use --slave-skip-errors...
+
+ To do the comparison we need to know the value of "affected" which the
+ above mysql_parse() computed. And we need to know the value of
+ "affected" in the master's binlog. Both will be implemented later. The
+ important thing is that we now have the format ready to log the values
+ of "affected" in the binlog. So we can release 5.0.0 before effectively
+ logging "affected" and effectively comparing it.
+ */
} /* End of if (db_ok(... */
end:
VOID(pthread_mutex_lock(&LOCK_thread_count));
- thd->db= 0; // prevent db from being freed
+ /*
+ Probably we have set thd->query, thd->db, thd->catalog to point to places
+ in the data_buf of this event. Now the event is going to be deleted
+ probably, so data_buf will be freed, so the thd->... listed above will be
+ pointers to freed memory.
+ So we must set them to 0, so that those bad pointers values are not later
+ used. Note that "cleanup" queries like automatic DROP TEMPORARY TABLE
+ don't suffer from these assignments to 0 as DROP TEMPORARY
+ TABLE uses the db.table syntax.
+ */
+ thd->db= thd->catalog= 0; // prevent db from being freed
thd->query= 0; // just to be sure
thd->query_length= thd->db_length =0;
VOID(pthread_mutex_unlock(&LOCK_thread_count));
@@ -1113,22 +1744,30 @@ end:
updating query.
*/
return (thd->query_error ? thd->query_error :
- (thd->one_shot_set ? (rli->inc_event_relay_log_pos(get_event_len()),0) :
+ (thd->one_shot_set ? (rli->inc_event_relay_log_pos(),0) :
Log_event::exec_event(rli)));
}
#endif
/**************************************************************************
- Start_log_event methods
+ Start_log_event_v3 methods
**************************************************************************/
+#ifndef MYSQL_CLIENT
+Start_log_event_v3::Start_log_event_v3() :Log_event(), binlog_version(BINLOG_VERSION), artificial_event(0)
+{
+ created= when;
+ memcpy(server_version, ::server_version, ST_SERVER_VER_LEN);
+}
+#endif
+
/*
- Start_log_event::pack_info()
+ Start_log_event_v3::pack_info()
*/
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
-void Start_log_event::pack_info(Protocol *protocol)
+void Start_log_event_v3::pack_info(Protocol *protocol)
{
char buf[12 + ST_SERVER_VER_LEN + 14 + 22], *pos;
pos= strmov(buf, "Server ver: ");
@@ -1141,57 +1780,80 @@ void Start_log_event::pack_info(Protocol *protocol)
/*
- Start_log_event::print()
+ Start_log_event_v3::print()
*/
#ifdef MYSQL_CLIENT
-void Start_log_event::print(FILE* file, bool short_form, char* last_db)
+void Start_log_event_v3::print(FILE* file, bool short_form, LAST_EVENT_INFO* last_event_info)
{
- if (short_form)
- return;
-
- print_header(file);
- fprintf(file, "\tStart: binlog v %d, server v %s created ", binlog_version,
- server_version);
- print_timestamp(file);
- if (created)
- fprintf(file," at startup");
- fputc('\n', file);
+ if (!short_form)
+ {
+ print_header(file);
+ fprintf(file, "\tStart: binlog v %d, server v %s created ", binlog_version,
+ server_version);
+ print_timestamp(file);
+ if (created)
+ fprintf(file," at startup");
+ fputc('\n', file);
+ if (flags & LOG_EVENT_BINLOG_IN_USE_F)
+ fprintf(file, "# Warning: this binlog was not closed properly. "
+ "Most probably mysqld crashed writing it.\n");
+ }
+ if (!artificial_event && created)
+ {
+#ifdef WHEN_WE_HAVE_THE_RESET_CONNECTION_SQL_COMMAND
+ /*
+ This is for mysqlbinlog: like in replication, we want to delete the stale
+ tmp files left by an unclean shutdown of mysqld (temporary tables)
+ and rollback unfinished transaction.
+ Probably this can be done with RESET CONNECTION (syntax to be defined).
+ */
+ fprintf(file,"RESET CONNECTION;\n");
+#else
+ fprintf(file,"ROLLBACK;\n");
+#endif
+ }
fflush(file);
}
#endif /* MYSQL_CLIENT */
/*
- Start_log_event::Start_log_event()
+ Start_log_event_v3::Start_log_event_v3()
*/
-Start_log_event::Start_log_event(const char* buf,
- bool old_format)
- :Log_event(buf, old_format)
+Start_log_event_v3::Start_log_event_v3(const char* buf,
+ const Format_description_log_event* description_event)
+ :Log_event(buf, description_event)
{
- buf += (old_format) ? OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN;
- binlog_version = uint2korr(buf+ST_BINLOG_VER_OFFSET);
+ buf+= description_event->common_header_len;
+ binlog_version= uint2korr(buf+ST_BINLOG_VER_OFFSET);
memcpy(server_version, buf+ST_SERVER_VER_OFFSET,
ST_SERVER_VER_LEN);
- created = uint4korr(buf+ST_CREATED_OFFSET);
+ created= uint4korr(buf+ST_CREATED_OFFSET);
+ /* We use log_pos to mark if this was an artificial event or not */
+ artificial_event= (log_pos == 0);
}
/*
- Start_log_event::write_data()
+ Start_log_event_v3::write()
*/
-int Start_log_event::write_data(IO_CACHE* file)
+#ifndef MYSQL_CLIENT
+bool Start_log_event_v3::write(IO_CACHE* file)
{
- char buff[START_HEADER_LEN];
+ char buff[START_V3_HEADER_LEN];
int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version);
memcpy(buff + ST_SERVER_VER_OFFSET,server_version,ST_SERVER_VER_LEN);
int4store(buff + ST_CREATED_OFFSET,created);
- return (my_b_safe_write(file, (byte*) buff, sizeof(buff)) ? -1 : 0);
+ return (write_header(file, sizeof(buff)) ||
+ my_b_safe_write(file, (byte*) buff, sizeof(buff)));
}
+#endif
+
/*
- Start_log_event::exec_event()
+ Start_log_event_v3::exec_event()
The master started
@@ -1210,84 +1872,321 @@ int Start_log_event::write_data(IO_CACHE* file)
*/
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
-int Start_log_event::exec_event(struct st_relay_log_info* rli)
+int Start_log_event_v3::exec_event(struct st_relay_log_info* rli)
{
- DBUG_ENTER("Start_log_event::exec_event");
-
- /*
- If the I/O thread has not started, mi->old_format is BINLOG_FORMAT_CURRENT
- (that's what the MASTER_INFO constructor does), so the test below is not
- perfect at all.
- */
- switch (rli->mi->old_format) {
- case BINLOG_FORMAT_CURRENT:
- /*
- This is 4.x, so a Start_log_event is only at master startup,
- so we are sure the master has restarted and cleared his temp tables.
- */
- close_temporary_tables(thd);
- cleanup_load_tmpdir();
+ DBUG_ENTER("Start_log_event_v3::exec_event");
+ switch (binlog_version)
+ {
+ case 3:
+ case 4:
/*
- As a transaction NEVER spans on 2 or more binlogs:
- if we have an active transaction at this point, the master died while
- writing the transaction to the binary log, i.e. while flushing the binlog
- cache to the binlog. As the write was started, the transaction had been
- committed on the master, so we lack of information to replay this
- transaction on the slave; all we can do is stop with error.
+ This can either be 4.x (then a Start_log_event_v3 is only at master
+ startup so we are sure the master has restarted and cleared his temp
+ tables; the event always has 'created'>0) or 5.0 (then we have to test
+ 'created').
*/
- if (thd->options & OPTION_BEGIN)
+ if (created)
{
- slave_print_error(rli, 0, "\
-Rolling back unfinished transaction (no COMMIT or ROLLBACK) from relay log. \
-A probable cause is that the master died while writing the transaction to its \
-binary log.");
- return(1);
+ close_temporary_tables(thd);
+ cleanup_load_tmpdir();
}
break;
- /*
+ /*
Now the older formats; in that case load_tmpdir is cleaned up by the I/O
thread.
*/
- case BINLOG_FORMAT_323_LESS_57:
- /*
- Cannot distinguish a Start_log_event generated at master startup and
- one generated by master FLUSH LOGS, so cannot be sure temp tables
- have to be dropped. So do nothing.
- */
- break;
- case BINLOG_FORMAT_323_GEQ_57:
+ case 1:
+ if (strncmp(rli->relay_log.description_event_for_exec->server_version,
+ "3.23.57",7) >= 0 && created)
+ {
+ /*
+ Can distinguish, based on the value of 'created': this event was
+ generated at master startup.
+ */
+ close_temporary_tables(thd);
+ }
/*
- Can distinguish, based on the value of 'created',
- which was generated at master startup.
+ Otherwise, can't distinguish a Start_log_event generated at
+ master startup and one generated by master FLUSH LOGS, so cannot
+ be sure temp tables have to be dropped. So do nothing.
*/
- if (created)
- close_temporary_tables(thd);
break;
default:
/* this case is impossible */
- return 1;
+ DBUG_RETURN(1);
}
-
DBUG_RETURN(Log_event::exec_event(rli));
}
#endif /* defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) */
-/**************************************************************************
- Load_log_event methods
-**************************************************************************/
+/***************************************************************************
+ Format_description_log_event methods
+****************************************************************************/
/*
- Load_log_event::pack_info()
+ Format_description_log_event 1st ctor.
+
+ SYNOPSIS
+ Format_description_log_event::Format_description_log_event
+ binlog_version the binlog version for which we want to build
+ an event. Can be 1 (=MySQL 3.23), 3 (=4.0.x
+ x>=2 and 4.1) or 4 (MySQL 5.0). Note that the
+ old 4.0 (binlog version 2) is not supported;
+ it should not be used for replication with
+ 5.0.
+
+ DESCRIPTION
+ Ctor. Can be used to create the event to write to the binary log (when the
+ server starts or when FLUSH LOGS), or to create artificial events to parse
+ binlogs from MySQL 3.23 or 4.x.
+ When in a client, only the 2nd use is possible.
+*/
+
+Format_description_log_event::
+Format_description_log_event(uint8 binlog_ver, const char* server_ver)
+ :Start_log_event_v3()
+{
+ created= when;
+ binlog_version= binlog_ver;
+ switch (binlog_ver) {
+ case 4: /* MySQL 5.0 */
+ memcpy(server_version, ::server_version, ST_SERVER_VER_LEN);
+ common_header_len= LOG_EVENT_HEADER_LEN;
+ number_of_event_types= LOG_EVENT_TYPES;
+ /* we'll catch my_malloc() error in is_valid() */
+ post_header_len=(uint8*) my_malloc(number_of_event_types*sizeof(uint8),
+ MYF(MY_ZEROFILL));
+ /*
+ This long list of assignments is not beautiful, but I see no way to
+ make it nicer, as the right members are #defines, not array members, so
+ it's impossible to write a loop.
+ */
+ if (post_header_len)
+ {
+ post_header_len[START_EVENT_V3-1]= START_V3_HEADER_LEN;
+ post_header_len[QUERY_EVENT-1]= QUERY_HEADER_LEN;
+ post_header_len[ROTATE_EVENT-1]= ROTATE_HEADER_LEN;
+ post_header_len[LOAD_EVENT-1]= LOAD_HEADER_LEN;
+ post_header_len[CREATE_FILE_EVENT-1]= CREATE_FILE_HEADER_LEN;
+ post_header_len[APPEND_BLOCK_EVENT-1]= APPEND_BLOCK_HEADER_LEN;
+ post_header_len[EXEC_LOAD_EVENT-1]= EXEC_LOAD_HEADER_LEN;
+ post_header_len[DELETE_FILE_EVENT-1]= DELETE_FILE_HEADER_LEN;
+ post_header_len[NEW_LOAD_EVENT-1]= post_header_len[LOAD_EVENT-1];
+ post_header_len[FORMAT_DESCRIPTION_EVENT-1]= FORMAT_DESCRIPTION_HEADER_LEN;
+ post_header_len[BEGIN_LOAD_QUERY_EVENT-1]= post_header_len[APPEND_BLOCK_EVENT-1];
+ post_header_len[EXECUTE_LOAD_QUERY_EVENT-1]= EXECUTE_LOAD_QUERY_HEADER_LEN;
+ }
+ break;
+
+ case 1: /* 3.23 */
+ case 3: /* 4.0.x x>=2 */
+ /*
+ We build an artificial (i.e. not sent by the master) event, which
+ describes what those old master versions send.
+ */
+ if (binlog_ver==1)
+ strmov(server_version, server_ver ? server_ver : "3.23");
+ else
+ strmov(server_version, server_ver ? server_ver : "4.0");
+ common_header_len= binlog_ver==1 ? OLD_HEADER_LEN :
+ LOG_EVENT_MINIMAL_HEADER_LEN;
+ /*
+ The first new event in binlog version 4 is Format_desc. So any event type
+ after that does not exist in older versions. We use the events known by
+ version 3, even if version 1 had only a subset of them (this is not a
+ problem: it uses a few bytes for nothing but unifies code; it does not
+ make the slave detect less corruptions).
+ */
+ number_of_event_types= FORMAT_DESCRIPTION_EVENT - 1;
+ post_header_len=(uint8*) my_malloc(number_of_event_types*sizeof(uint8),
+ MYF(0));
+ if (post_header_len)
+ {
+ post_header_len[START_EVENT_V3-1]= START_V3_HEADER_LEN;
+ post_header_len[QUERY_EVENT-1]= QUERY_HEADER_MINIMAL_LEN;
+ post_header_len[STOP_EVENT-1]= 0;
+ post_header_len[ROTATE_EVENT-1]= (binlog_ver==1) ? 0 : ROTATE_HEADER_LEN;
+ post_header_len[INTVAR_EVENT-1]= 0;
+ post_header_len[LOAD_EVENT-1]= LOAD_HEADER_LEN;
+ post_header_len[SLAVE_EVENT-1]= 0;
+ post_header_len[CREATE_FILE_EVENT-1]= CREATE_FILE_HEADER_LEN;
+ post_header_len[APPEND_BLOCK_EVENT-1]= APPEND_BLOCK_HEADER_LEN;
+ post_header_len[EXEC_LOAD_EVENT-1]= EXEC_LOAD_HEADER_LEN;
+ post_header_len[DELETE_FILE_EVENT-1]= DELETE_FILE_HEADER_LEN;
+ post_header_len[NEW_LOAD_EVENT-1]= post_header_len[LOAD_EVENT-1];
+ post_header_len[RAND_EVENT-1]= 0;
+ post_header_len[USER_VAR_EVENT-1]= 0;
+ }
+ break;
+ default: /* Includes binlog version 2 i.e. 4.0.x x<=1 */
+ post_header_len= 0; /* will make is_valid() fail */
+ break;
+ }
+}
+
+
+/*
+ The problem with this constructor is that the fixed header may have a
+ length different from this version, but we don't know this length as we
+ have not read the Format_description_log_event which says it, yet. This
+ length is in the post-header of the event, but we don't know where the
+ post-header starts.
+ So this type of event HAS to:
+ - either have the header's length at the beginning (in the header, at a
+ fixed position which will never be changed), not in the post-header. That
+ would make the header be "shifted" compared to other events.
+ - or have a header of size LOG_EVENT_MINIMAL_HEADER_LEN (19), in all future
+ versions, so that we know for sure.
+ I (Guilhem) chose the 2nd solution. Rotate has the same constraint (because
+ it is sent before Format_description_log_event).
+*/
+
+Format_description_log_event::
+Format_description_log_event(const char* buf,
+ uint event_len,
+ const
+ Format_description_log_event*
+ description_event)
+ :Start_log_event_v3(buf, description_event)
+{
+ DBUG_ENTER("Format_description_log_event::Format_description_log_event(char*,...)");
+ buf+= LOG_EVENT_MINIMAL_HEADER_LEN;
+ if ((common_header_len=buf[ST_COMMON_HEADER_LEN_OFFSET]) < OLD_HEADER_LEN)
+ DBUG_VOID_RETURN; /* sanity check */
+ number_of_event_types=
+ event_len-(LOG_EVENT_MINIMAL_HEADER_LEN+ST_COMMON_HEADER_LEN_OFFSET+1);
+ DBUG_PRINT("info", ("common_header_len=%d number_of_event_types=%d",
+ common_header_len, number_of_event_types));
+ /* If alloc fails, we'll detect it in is_valid() */
+ post_header_len= (uint8*) my_memdup((byte*)buf+ST_COMMON_HEADER_LEN_OFFSET+1,
+ number_of_event_types*
+ sizeof(*post_header_len), MYF(0));
+ DBUG_VOID_RETURN;
+}
+
+#ifndef MYSQL_CLIENT
+bool Format_description_log_event::write(IO_CACHE* file)
+{
+ /*
+ We don't call Start_log_event_v3::write() because this would make 2
+ my_b_safe_write().
+ */
+ byte buff[FORMAT_DESCRIPTION_HEADER_LEN];
+ int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version);
+ memcpy((char*) buff + ST_SERVER_VER_OFFSET,server_version,ST_SERVER_VER_LEN);
+ int4store(buff + ST_CREATED_OFFSET,created);
+ buff[ST_COMMON_HEADER_LEN_OFFSET]= LOG_EVENT_HEADER_LEN;
+ memcpy((char*) buff+ST_COMMON_HEADER_LEN_OFFSET+1, (byte*) post_header_len,
+ LOG_EVENT_TYPES);
+ return (write_header(file, sizeof(buff)) ||
+ my_b_safe_write(file, buff, sizeof(buff)));
+}
+#endif
+
+/*
+ SYNOPSIS
+ Format_description_log_event::exec_event()
+
+ IMPLEMENTATION
+ Save the information which describes the binlog's format, to be able to
+ read all coming events.
+ Call Start_log_event_v3::exec_event().
*/
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
-void Load_log_event::pack_info(Protocol *protocol)
+int Format_description_log_event::exec_event(struct st_relay_log_info* rli)
{
- char *buf, *pos;
- uint buf_len;
+ DBUG_ENTER("Format_description_log_event::exec_event");
+
+ /* save the information describing this binlog */
+ delete rli->relay_log.description_event_for_exec;
+ rli->relay_log.description_event_for_exec= this;
- buf_len=
+#ifdef USING_TRANSACTIONS
+ /*
+ As a transaction NEVER spans on 2 or more binlogs:
+ if we have an active transaction at this point, the master died
+ while writing the transaction to the binary log, i.e. while
+ flushing the binlog cache to the binlog. As the write was started,
+ the transaction had been committed on the master, so we lack of
+ information to replay this transaction on the slave; all we can do
+ is stop with error.
+ Note: this event could be sent by the master to inform us of the
+ format of its binlog; in other words maybe it is not at its
+ original place when it comes to us; we'll know this by checking
+ log_pos ("artificial" events have log_pos == 0).
+ */
+ if (!artificial_event && created && thd->transaction.all.nht)
+ {
+ slave_print_error(rli, 0, "Rolling back unfinished transaction (no "
+ "COMMIT or ROLLBACK) from relay log. A probable cause "
+ "is that the master died while writing the transaction "
+ "to its binary log.");
+ end_trans(thd, ROLLBACK);
+ }
+#endif
+ /*
+ If this event comes from ourselves, there is no cleaning task to perform,
+ we don't call Start_log_event_v3::exec_event() (this was just to update the
+ log's description event).
+ */
+ if (server_id == (uint32) ::server_id)
+ {
+ /*
+ Do not modify rli->group_master_log_pos, as this event did not exist on
+ the master. That is, just update the *relay log* coordinates; this is
+ done by passing log_pos=0 to inc_group_relay_log_pos, like we do in
+ Stop_log_event::exec_event().
+ If in a transaction, don't touch group_* coordinates.
+ */
+ if (thd->options & OPTION_BEGIN)
+ rli->inc_event_relay_log_pos();
+ else
+ {
+ rli->inc_group_relay_log_pos(0);
+ flush_relay_log_info(rli);
+ }
+ DBUG_RETURN(0);
+ }
+
+ /*
+ If the event was not requested by the slave i.e. the master sent it while
+ the slave asked for a position >4, the event will make
+ rli->group_master_log_pos advance. Say that the slave asked for position
+ 1000, and the Format_desc event's end is 96. Then in the beginning of
+ replication rli->group_master_log_pos will be 0, then 96, then jump to
+ first really asked event (which is >96). So this is ok.
+ */
+ DBUG_RETURN(Start_log_event_v3::exec_event(rli));
+}
+#endif
+
+ /**************************************************************************
+ Load_log_event methods
+ General note about Load_log_event: the binlogging of LOAD DATA INFILE is
+ going to be changed in 5.0 (or maybe in 5.1; not decided yet).
+ However, the 5.0 slave could still have to read such events (from a 4.x
+ master), convert them (which just means maybe expand the header, when 5.0
+ servers have a UID in events) (remember that whatever is after the header
+ will be like in 4.x, as this event's format is not modified in 5.0 as we
+ will use new types of events to log the new LOAD DATA INFILE features).
+ To be able to read/convert, we just need to not assume that the common
+ header is of length LOG_EVENT_HEADER_LEN (we must use the description
+ event).
+ Note that I (Guilhem) manually tested replication of a big LOAD DATA INFILE
+ between 3.23 and 5.0, and between 4.0 and 5.0, and it works fine (and the
+ positions displayed in SHOW SLAVE STATUS then are fine too).
+ **************************************************************************/
+
+/*
+ Load_log_event::pack_info()
+*/
+
+#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
+uint Load_log_event::get_query_buffer_length()
+{
+ return
5 + db_len + 3 + // "use DB; "
18 + fname_len + 2 + // "LOAD DATA INFILE 'file''"
7 + // LOCAL
@@ -1297,14 +2196,18 @@ void Load_log_event::pack_info(Protocol *protocol)
23 + sql_ex.enclosed_len*4 + 2 + // " OPTIONALLY ENCLOSED BY 'str'"
12 + sql_ex.escaped_len*4 + 2 + // " ESCAPED BY 'str'"
21 + sql_ex.line_term_len*4 + 2 + // " FIELDS TERMINATED BY 'str'"
- 19 + sql_ex.line_start_len*4 + 2 + // " LINES STARTING BY 'str'"
- 15 + 22 + // " IGNORE xxx LINES"
+ 19 + sql_ex.line_start_len*4 + 2 + // " LINES STARTING BY 'str'"
+ 15 + 22 + // " IGNORE xxx LINES"
3 + (num_fields-1)*2 + field_block_len; // " (field1, field2, ...)"
+}
- if (!(buf= my_malloc(buf_len, MYF(MY_WME))))
- return;
- pos= buf;
- if (db && db_len)
+
+void Load_log_event::print_query(bool need_db, char *buf,
+ char **end, char **fn_start, char **fn_end)
+{
+ char *pos= buf;
+
+ if (need_db && db && db_len)
{
pos= strmov(pos, "use `");
memcpy(pos, db, db_len);
@@ -1312,6 +2215,10 @@ void Load_log_event::pack_info(Protocol *protocol)
}
pos= strmov(pos, "LOAD DATA ");
+
+ if (fn_start)
+ *fn_start= pos;
+
if (check_fname_outside_temp_buf())
pos= strmov(pos, "LOCAL ");
pos= strmov(pos, "INFILE '");
@@ -1323,7 +2230,12 @@ void Load_log_event::pack_info(Protocol *protocol)
else if (sql_ex.opt_flags & IGNORE_FLAG)
pos= strmov(pos, " IGNORE ");
- pos= strmov(pos ,"INTO TABLE `");
+ pos= strmov(pos ,"INTO");
+
+ if (fn_end)
+ *fn_end= pos;
+
+ pos= strmov(pos ," TABLE `");
memcpy(pos, table_name, table_name_len);
pos+= table_name_len;
@@ -1372,17 +2284,30 @@ void Load_log_event::pack_info(Protocol *protocol)
*pos++= ')';
}
- protocol->store(buf, pos-buf, &my_charset_bin);
+ *end= pos;
+}
+
+
+void Load_log_event::pack_info(Protocol *protocol)
+{
+ char *buf, *end;
+
+ if (!(buf= my_malloc(get_query_buffer_length(), MYF(MY_WME))))
+ return;
+ print_query(TRUE, buf, &end, 0, 0);
+ protocol->store(buf, end-buf, &my_charset_bin);
my_free(buf, MYF(0));
}
#endif /* defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) */
+#ifndef MYSQL_CLIENT
+
/*
Load_log_event::write_data_header()
*/
-int Load_log_event::write_data_header(IO_CACHE* file)
+bool Load_log_event::write_data_header(IO_CACHE* file)
{
char buf[LOAD_HEADER_LEN];
int4store(buf + L_THREAD_ID_OFFSET, slave_proxy_id);
@@ -1391,7 +2316,7 @@ int Load_log_event::write_data_header(IO_CACHE* file)
buf[L_TBL_LEN_OFFSET] = (char)table_name_len;
buf[L_DB_LEN_OFFSET] = (char)db_len;
int4store(buf + L_NUM_FIELDS_OFFSET, num_fields);
- return my_b_safe_write(file, (byte*)buf, LOAD_HEADER_LEN);
+ return my_b_safe_write(file, (byte*)buf, LOAD_HEADER_LEN) != 0;
}
@@ -1399,7 +2324,7 @@ int Load_log_event::write_data_header(IO_CACHE* file)
Load_log_event::write_data_body()
*/
-int Load_log_event::write_data_body(IO_CACHE* file)
+bool Load_log_event::write_data_body(IO_CACHE* file)
{
if (sql_ex.write_data(file))
return 1;
@@ -1419,7 +2344,6 @@ int Load_log_event::write_data_body(IO_CACHE* file)
Load_log_event::Load_log_event()
*/
-#ifndef MYSQL_CLIENT
Load_log_event::Load_log_event(THD *thd_arg, sql_exchange *ex,
const char *db_arg, const char *table_name_arg,
List<Item> &fields_arg,
@@ -1503,6 +2427,7 @@ Load_log_event::Load_log_event(THD *thd_arg, sql_exchange *ex,
}
#endif /* !MYSQL_CLIENT */
+
/*
Load_log_event::Load_log_event()
@@ -1511,15 +2436,25 @@ Load_log_event::Load_log_event(THD *thd_arg, sql_exchange *ex,
constructed event.
*/
-Load_log_event::Load_log_event(const char *buf, int event_len,
- bool old_format)
- :Log_event(buf, old_format), num_fields(0), fields(0),
- field_lens(0), field_block_len(0),
+Load_log_event::Load_log_event(const char *buf, uint event_len,
+ const Format_description_log_event *description_event)
+ :Log_event(buf, description_event), num_fields(0), fields(0),
+ field_lens(0),field_block_len(0),
table_name(0), db(0), fname(0), local_fname(FALSE)
{
DBUG_ENTER("Load_log_event");
- if (event_len) // derived class, will call copy_log_event() itself
- copy_log_event(buf, event_len, old_format);
+ /*
+ I (Guilhem) manually tested replication of LOAD DATA INFILE for 3.23->5.0,
+ 4.0->5.0 and 5.0->5.0 and it works.
+ */
+ if (event_len)
+ copy_log_event(buf, event_len,
+ ((buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ?
+ LOAD_HEADER_LEN +
+ description_event->common_header_len :
+ LOAD_HEADER_LEN + LOG_EVENT_HEADER_LEN),
+ description_event);
+ /* otherwise it's a derived class, will call copy_log_event() itself */
DBUG_VOID_RETURN;
}
@@ -1529,14 +2464,14 @@ Load_log_event::Load_log_event(const char *buf, int event_len,
*/
int Load_log_event::copy_log_event(const char *buf, ulong event_len,
- bool old_format)
+ int body_offset,
+ const Format_description_log_event *description_event)
{
+ DBUG_ENTER("Load_log_event::copy_log_event");
uint data_len;
char* buf_end = (char*)buf + event_len;
- uint header_len= old_format ? OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN;
- const char* data_head = buf + header_len;
- DBUG_ENTER("Load_log_event::copy_log_event");
-
+ /* this is the beginning of the post-header */
+ const char* data_head = buf + description_event->common_header_len;
slave_proxy_id= thread_id= uint4korr(data_head + L_THREAD_ID_OFFSET);
exec_time = uint4korr(data_head + L_EXEC_TIME_OFFSET);
skip_lines = uint4korr(data_head + L_SKIP_LINES_OFFSET);
@@ -1544,21 +2479,17 @@ int Load_log_event::copy_log_event(const char *buf, ulong event_len,
db_len = (uint)data_head[L_DB_LEN_OFFSET];
num_fields = uint4korr(data_head + L_NUM_FIELDS_OFFSET);
- int body_offset = ((buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ?
- LOAD_HEADER_LEN + header_len :
- get_data_body_offset());
-
if ((int) event_len < body_offset)
DBUG_RETURN(1);
/*
Sql_ex.init() on success returns the pointer to the first byte after
the sql_ex structure, which is the start of field lengths array.
*/
- if (!(field_lens=(uchar*)sql_ex.init((char*)buf + body_offset,
- buf_end,
- buf[EVENT_TYPE_OFFSET] != LOAD_EVENT)))
+ if (!(field_lens= (uchar*)sql_ex.init((char*)buf + body_offset,
+ buf_end,
+ buf[EVENT_TYPE_OFFSET] != LOAD_EVENT)))
DBUG_RETURN(1);
-
+
data_len = event_len - body_offset;
if (num_fields > data_len) // simple sanity check against corruption
DBUG_RETURN(1);
@@ -1571,6 +2502,7 @@ int Load_log_event::copy_log_event(const char *buf, ulong event_len,
fname = db + db_len + 1;
fname_len = strlen(fname);
// null termination is accomplished by the caller doing buf[event_len]=0
+
DBUG_RETURN(0);
}
@@ -1580,13 +2512,13 @@ int Load_log_event::copy_log_event(const char *buf, ulong event_len,
*/
#ifdef MYSQL_CLIENT
-void Load_log_event::print(FILE* file, bool short_form, char* last_db)
+void Load_log_event::print(FILE* file, bool short_form, LAST_EVENT_INFO* last_event_info)
{
- print(file, short_form, last_db, 0);
+ print(file, short_form, last_event_info, 0);
}
-void Load_log_event::print(FILE* file, bool short_form, char* last_db,
+void Load_log_event::print(FILE* file, bool short_form, LAST_EVENT_INFO* last_event_info,
bool commented)
{
DBUG_ENTER("Load_log_event::print");
@@ -1598,7 +2530,7 @@ void Load_log_event::print(FILE* file, bool short_form, char* last_db,
}
bool different_db= 1;
- if (db && last_db)
+ if (db)
{
/*
If the database is different from the one of the previous statement, we
@@ -1606,9 +2538,9 @@ void Load_log_event::print(FILE* file, bool short_form, char* last_db,
But if commented, the "use" is going to be commented so we should not
update the last_db.
*/
- if ((different_db= memcmp(last_db, db, db_len + 1)) &&
+ if ((different_db= memcmp(last_event_info->db, db, db_len + 1)) &&
!commented)
- memcpy(last_db, db, db_len + 1);
+ memcpy(last_event_info->db, db, db_len + 1);
}
if (db && db[0] && different_db)
@@ -1730,7 +2662,6 @@ void Load_log_event::set_fields(const char* affected_db,
int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli,
bool use_rli_only_for_errors)
{
- char *load_data_query= 0;
thd->db_length= db_len;
thd->db= (char*) rewrite_db(db, &thd->db_length);
DBUG_ASSERT(thd->query == 0);
@@ -1744,15 +2675,12 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli,
mysql_init_query(thd, 0, 0);
if (!use_rli_only_for_errors)
{
-#if MYSQL_VERSION_ID < 50000
- rli->future_group_master_log_pos= log_pos + get_event_len() -
- (rli->mi->old_format ? (LOG_EVENT_HEADER_LEN - OLD_HEADER_LEN) : 0);
-#else
+ /* Saved for InnoDB, see comment in Query_log_event::exec_event() */
rli->future_group_master_log_pos= log_pos;
-#endif
+ DBUG_PRINT("info", ("log_pos: %lu", (ulong) log_pos));
}
-
- /*
+
+ /*
We test replicate_*_db rules. Note that we have already prepared the file
to load, even if we are going to ignore and delete it now. So it is
possible that we did a lot of disk writes for nothing. In other words, a
@@ -1768,7 +2696,7 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli,
{
thd->set_time((time_t)when);
VOID(pthread_mutex_lock(&LOCK_thread_count));
- thd->query_id = query_id++;
+ thd->query_id = next_query_id();
VOID(pthread_mutex_unlock(&LOCK_thread_count));
/*
Initing thd->row_count is not necessary in theory as this variable has no
@@ -1776,12 +2704,12 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli,
"data truncated" warning but which is absorbed and never gets to the
error log); still we init it to avoid a Valgrind message.
*/
- mysql_reset_errors(thd);
+ mysql_reset_errors(thd, 0);
TABLE_LIST tables;
bzero((char*) &tables,sizeof(tables));
tables.db = thd->db;
- tables.alias = tables.real_name = (char*)table_name;
+ tables.alias = tables.table_name = (char*) table_name;
tables.lock_type = TL_WRITE;
tables.updating= 1;
@@ -1795,21 +2723,30 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli,
else
{
char llbuff[22];
+ char *end;
enum enum_duplicates handle_dup;
bool ignore= 0;
+ char *load_data_query;
+
/*
- Make a simplified LOAD DATA INFILE query, for the information of the
- user in SHOW PROCESSLIST. Note that db is known in the 'db' column.
+ Forge LOAD DATA INFILE query which will be used in SHOW PROCESS LIST
+ and written to slave's binlog if binlogging is on.
*/
- if ((load_data_query= (char *) my_alloca(18 + strlen(fname) + 14 +
- strlen(tables.real_name) + 8)))
+ if (!(load_data_query= (char *)thd->alloc(get_query_buffer_length() + 1)))
{
- thd->query_length= (uint)(strxmov(load_data_query,
- "LOAD DATA INFILE '", fname,
- "' INTO TABLE `", tables.real_name,
- "` <...>", NullS) - load_data_query);
- thd->query= load_data_query;
+ /*
+ This will set thd->fatal_error in case of OOM. So we surely will notice
+ that something is wrong.
+ */
+ goto error;
}
+
+ print_query(FALSE, load_data_query, &end, (char **)&thd->lex->fname_start,
+ (char **)&thd->lex->fname_end);
+ *end= 0;
+ thd->query_length= end - load_data_query;
+ thd->query= load_data_query;
+
if (sql_ex.opt_flags & REPLACE_FLAG)
handle_dup= DUP_REPLACE;
else if (sql_ex.opt_flags & IGNORE_FLAG)
@@ -1855,6 +2792,7 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli,
List<Item> field_list;
set_fields(thd->db,field_list);
thd->variables.pseudo_thread_id= thread_id;
+ List<Item> set_fields;
if (net)
{
// mysql_load will use thd->net to read the file
@@ -1864,9 +2802,13 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli,
*/
thd->net.pkt_nr = net->pkt_nr;
}
- if (mysql_load(thd, &ex, &tables, field_list, handle_dup, ignore, net != 0,
- TL_WRITE))
- thd->query_error = 1;
+ /*
+ It is safe to use set_fields twice because we are not going to
+ update it inside mysql_load().
+ */
+ if (mysql_load(thd, &ex, &tables, field_list, set_fields, set_fields,
+ handle_dup, ignore, net != 0))
+ thd->query_error= 1;
if (thd->cuted_fields)
{
/* log_pos is the position of the LOAD event in the master log */
@@ -1892,20 +2834,19 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli,
if (net)
skip_load_data_infile(net);
}
-
+
+error:
thd->net.vio = 0;
char *save_db= thd->db;
VOID(pthread_mutex_lock(&LOCK_thread_count));
- thd->db= 0;
+ thd->db= thd->catalog= 0;
thd->query= 0;
thd->query_length= thd->db_length= 0;
VOID(pthread_mutex_unlock(&LOCK_thread_count));
close_thread_tables(thd);
- if (load_data_query)
- my_afree(load_data_query);
if (thd->query_error)
{
- /* this err/sql_errno code is copy-paste from send_error() */
+ /* this err/sql_errno code is copy-paste from net_send_error() */
const char *err;
int sql_errno;
if ((err=thd->net.last_error)[0])
@@ -1963,17 +2904,17 @@ void Rotate_log_event::pack_info(Protocol *protocol)
*/
#ifdef MYSQL_CLIENT
-void Rotate_log_event::print(FILE* file, bool short_form, char* last_db)
+void Rotate_log_event::print(FILE* file, bool short_form, LAST_EVENT_INFO* last_event_info)
{
char buf[22];
+
if (short_form)
return;
-
print_header(file);
fprintf(file, "\tRotate to ");
if (new_log_ident)
my_fwrite(file, (byte*) new_log_ident, (uint)ident_len,
- MYF(MY_NABP | MY_WME));
+ MYF(MY_NABP | MY_WME));
fprintf(file, " pos: %s", llstr(pos, buf));
fputc('\n', file);
fflush(file);
@@ -1985,31 +2926,22 @@ void Rotate_log_event::print(FILE* file, bool short_form, char* last_db)
Rotate_log_event::Rotate_log_event()
*/
-Rotate_log_event::Rotate_log_event(const char* buf, int event_len,
- bool old_format)
- :Log_event(buf, old_format),new_log_ident(NULL),alloced(0)
+Rotate_log_event::Rotate_log_event(const char* buf, uint event_len,
+ const Format_description_log_event* description_event)
+ :Log_event(buf, description_event) ,new_log_ident(NULL),alloced(0)
{
+ DBUG_ENTER("Rotate_log_event::Rotate_log_event(char*,...)");
// The caller will ensure that event_len is what we have at EVENT_LEN_OFFSET
- int header_size = (old_format) ? OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN;
+ uint8 header_size= description_event->common_header_len;
+ uint8 post_header_len= description_event->post_header_len[ROTATE_EVENT-1];
uint ident_offset;
- DBUG_ENTER("Rotate_log_event");
-
if (event_len < header_size)
DBUG_VOID_RETURN;
-
buf += header_size;
- if (old_format)
- {
- ident_len = (uint)(event_len - OLD_HEADER_LEN);
- pos = 4;
- ident_offset = 0;
- }
- else
- {
- ident_len = (uint)(event_len - ROTATE_EVENT_OVERHEAD);
- pos = uint8korr(buf + R_POS_OFFSET);
- ident_offset = ROTATE_HEADER_LEN;
- }
+ pos = post_header_len ? uint8korr(buf + R_POS_OFFSET) : 4;
+ ident_len = (uint)(event_len -
+ (header_size+post_header_len));
+ ident_offset = post_header_len;
set_if_smaller(ident_len,FN_REFLEN-1);
if (!(new_log_ident= my_strdup_with_length((byte*) buf +
ident_offset,
@@ -2022,29 +2954,32 @@ Rotate_log_event::Rotate_log_event(const char* buf, int event_len,
/*
- Rotate_log_event::write_data()
+ Rotate_log_event::write()
*/
-int Rotate_log_event::write_data(IO_CACHE* file)
+#ifndef MYSQL_CLIENT
+bool Rotate_log_event::write(IO_CACHE* file)
{
char buf[ROTATE_HEADER_LEN];
int8store(buf + R_POS_OFFSET, pos);
- return (my_b_safe_write(file, (byte*)buf, ROTATE_HEADER_LEN) ||
- my_b_safe_write(file, (byte*)new_log_ident, (uint) ident_len));
+ return (write_header(file, ROTATE_HEADER_LEN + ident_len) ||
+ my_b_safe_write(file, (byte*)buf, ROTATE_HEADER_LEN) ||
+ my_b_safe_write(file, (byte*)new_log_ident, (uint) ident_len));
}
-
+#endif
/*
Rotate_log_event::exec_event()
- Got a rotate log even from the master
+ Got a rotate log event from the master
IMPLEMENTATION
This is mainly used so that we can later figure out the logname and
position for the master.
- We can't rotate the slave as this will cause infinitive rotations
+ We can't rotate the slave's BINlog as this will cause infinitive rotations
in a A -> B -> A setup.
+ The NOTES below is a wrong comment which will disappear when 4.1 is merged.
RETURN VALUES
0 ok
@@ -2056,7 +2991,7 @@ int Rotate_log_event::exec_event(struct st_relay_log_info* rli)
DBUG_ENTER("Rotate_log_event::exec_event");
pthread_mutex_lock(&rli->data_lock);
- rli->event_relay_log_pos += get_event_len();
+ rli->event_relay_log_pos= my_b_tell(rli->cur_log);
/*
If we are in a transaction: the only normal case is when the I/O thread was
copying a big transaction, then it was stopped and restarted: we have this
@@ -2068,15 +3003,31 @@ int Rotate_log_event::exec_event(struct st_relay_log_info* rli)
COMMIT or ROLLBACK
In that case, we don't want to touch the coordinates which correspond to
the beginning of the transaction.
+ Starting from 5.0.0, there also are some rotates from the slave itself, in
+ the relay log.
*/
if (!(thd->options & OPTION_BEGIN))
{
memcpy(rli->group_master_log_name, new_log_ident, ident_len+1);
rli->notify_group_master_log_name_update();
- rli->group_master_log_pos = pos;
- rli->group_relay_log_pos = rli->event_relay_log_pos;
- DBUG_PRINT("info", ("group_master_log_pos: %lu",
+ rli->group_master_log_pos= pos;
+ rli->group_relay_log_pos= rli->event_relay_log_pos;
+ DBUG_PRINT("info", ("group_master_log_name: '%s' group_master_log_pos:\
+%lu",
+ rli->group_master_log_name,
(ulong) rli->group_master_log_pos));
+ /*
+ Reset thd->options and sql_mode etc, because this could be the signal of
+ a master's downgrade from 5.0 to 4.0.
+ However, no need to reset description_event_for_exec: indeed, if the next
+ master is 5.0 (even 5.0.1) we will soon get a Format_desc; if the next
+ master is 4.0 then the events are in the slave's format (conversion).
+ */
+ set_slave_thread_options(thd);
+ set_slave_thread_default_charset(thd, rli);
+ thd->variables.sql_mode= global_system_variables.sql_mode;
+ thd->variables.auto_increment_increment=
+ thd->variables.auto_increment_offset= 1;
}
pthread_mutex_unlock(&rli->data_lock);
pthread_cond_broadcast(&rli->data_cond);
@@ -2110,12 +3061,13 @@ void Intvar_log_event::pack_info(Protocol *protocol)
Intvar_log_event::Intvar_log_event()
*/
-Intvar_log_event::Intvar_log_event(const char* buf, bool old_format)
- :Log_event(buf, old_format)
+Intvar_log_event::Intvar_log_event(const char* buf,
+ const Format_description_log_event* description_event)
+ :Log_event(buf, description_event)
{
- buf += (old_format) ? OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN;
- type = buf[I_TYPE_OFFSET];
- val = uint8korr(buf+I_VAL_OFFSET);
+ buf+= description_event->common_header_len;
+ type= buf[I_TYPE_OFFSET];
+ val= uint8korr(buf+I_VAL_OFFSET);
}
@@ -2134,16 +3086,19 @@ const char* Intvar_log_event::get_var_type_name()
/*
- Intvar_log_event::write_data()
+ Intvar_log_event::write()
*/
-int Intvar_log_event::write_data(IO_CACHE* file)
+#ifndef MYSQL_CLIENT
+bool Intvar_log_event::write(IO_CACHE* file)
{
- char buf[9];
- buf[I_TYPE_OFFSET] = type;
+ byte buf[9];
+ buf[I_TYPE_OFFSET]= (byte) type;
int8store(buf + I_VAL_OFFSET, val);
- return my_b_safe_write(file, (byte*) buf, sizeof(buf));
+ return (write_header(file, sizeof(buf)) ||
+ my_b_safe_write(file, buf, sizeof(buf)));
}
+#endif
/*
@@ -2151,7 +3106,8 @@ int Intvar_log_event::write_data(IO_CACHE* file)
*/
#ifdef MYSQL_CLIENT
-void Intvar_log_event::print(FILE* file, bool short_form, char* last_db)
+void Intvar_log_event::print(FILE* file, bool short_form,
+ LAST_EVENT_INFO* last_event_info)
{
char llbuff[22];
const char *msg;
@@ -2194,7 +3150,7 @@ int Intvar_log_event::exec_event(struct st_relay_log_info* rli)
thd->next_insert_id = val;
break;
}
- rli->inc_event_relay_log_pos(get_event_len());
+ rli->inc_event_relay_log_pos();
return 0;
}
#endif
@@ -2217,26 +3173,30 @@ void Rand_log_event::pack_info(Protocol *protocol)
#endif
-Rand_log_event::Rand_log_event(const char* buf, bool old_format)
- :Log_event(buf, old_format)
+Rand_log_event::Rand_log_event(const char* buf,
+ const Format_description_log_event* description_event)
+ :Log_event(buf, description_event)
{
- buf += (old_format) ? OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN;
- seed1 = uint8korr(buf+RAND_SEED1_OFFSET);
- seed2 = uint8korr(buf+RAND_SEED2_OFFSET);
+ buf+= description_event->common_header_len;
+ seed1= uint8korr(buf+RAND_SEED1_OFFSET);
+ seed2= uint8korr(buf+RAND_SEED2_OFFSET);
}
-int Rand_log_event::write_data(IO_CACHE* file)
+#ifndef MYSQL_CLIENT
+bool Rand_log_event::write(IO_CACHE* file)
{
- char buf[16];
+ byte buf[16];
int8store(buf + RAND_SEED1_OFFSET, seed1);
int8store(buf + RAND_SEED2_OFFSET, seed2);
- return my_b_safe_write(file, (byte*) buf, sizeof(buf));
+ return (write_header(file, sizeof(buf)) ||
+ my_b_safe_write(file, buf, sizeof(buf)));
}
+#endif
#ifdef MYSQL_CLIENT
-void Rand_log_event::print(FILE* file, bool short_form, char* last_db)
+void Rand_log_event::print(FILE* file, bool short_form, LAST_EVENT_INFO* last_event_info)
{
char llbuff[22],llbuff2[22];
if (!short_form)
@@ -2256,13 +3216,83 @@ int Rand_log_event::exec_event(struct st_relay_log_info* rli)
{
thd->rand.seed1= (ulong) seed1;
thd->rand.seed2= (ulong) seed2;
- rli->inc_event_relay_log_pos(get_event_len());
+ rli->inc_event_relay_log_pos();
return 0;
}
#endif /* !MYSQL_CLIENT */
/**************************************************************************
+ Xid_log_event methods
+**************************************************************************/
+
+#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
+void Xid_log_event::pack_info(Protocol *protocol)
+{
+ char buf[128], *pos;
+ pos= strmov(buf, "COMMIT /* xid=");
+ pos= longlong10_to_str(xid, pos, 10);
+ pos= strmov(pos, " */");
+ protocol->store(buf, (uint) (pos-buf), &my_charset_bin);
+}
+#endif
+
+/*
+ NOTE it's ok not to use int8store here,
+ as long as xid_t::set(ulonglong) and
+ xid_t::get_my_xid doesn't do it either
+
+ we don't care about actual values of xids as long as
+ identical numbers compare identically
+*/
+
+Xid_log_event::
+Xid_log_event(const char* buf,
+ const Format_description_log_event *description_event)
+ :Log_event(buf, description_event)
+{
+ buf+= description_event->common_header_len;
+ memcpy((char*) &xid, buf, sizeof(xid));
+}
+
+
+#ifndef MYSQL_CLIENT
+bool Xid_log_event::write(IO_CACHE* file)
+{
+ return write_header(file, sizeof(xid)) ||
+ my_b_safe_write(file, (byte*) &xid, sizeof(xid));
+}
+#endif
+
+
+#ifdef MYSQL_CLIENT
+void Xid_log_event::print(FILE* file, bool short_form, LAST_EVENT_INFO* last_event_info)
+{
+ if (!short_form)
+ {
+ char buf[64];
+ longlong10_to_str(xid, buf, 10);
+
+ print_header(file);
+ fprintf(file, "\tXid = %s\n", buf);
+ fflush(file);
+ }
+ fprintf(file, "COMMIT;\n");
+}
+#endif /* MYSQL_CLIENT */
+
+
+#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
+int Xid_log_event::exec_event(struct st_relay_log_info* rli)
+{
+ /* For a slave Xid_log_event is COMMIT */
+ mysql_log.write(thd,COM_QUERY,"COMMIT /* implicit, from Xid_log_event */");
+ return end_trans(thd, COMMIT) || Log_event::exec_event(rli);
+}
+#endif /* !MYSQL_CLIENT */
+
+
+/**************************************************************************
User_var_log_event methods
**************************************************************************/
@@ -2293,6 +3323,16 @@ void User_var_log_event::pack_info(Protocol* protocol)
buf= my_malloc(val_offset + 22, MYF(MY_WME));
event_len= longlong10_to_str(uint8korr(val), buf + val_offset,-10)-buf;
break;
+ case DECIMAL_RESULT:
+ {
+ buf= my_malloc(val_offset + DECIMAL_MAX_STR_LENGTH, MYF(MY_WME));
+ String str(buf+val_offset, DECIMAL_MAX_STR_LENGTH, &my_charset_bin);
+ my_decimal dec;
+ binary2my_decimal(E_DEC_FATAL_ERROR, val+2, &dec, val[0], val[1]);
+ my_decimal2string(E_DEC_FATAL_ERROR, &dec, 0, 0, 0, &str);
+ event_len= str.length() + val_offset;
+ break;
+ }
case STRING_RESULT:
/* 15 is for 'COLLATE' and other chars */
buf= my_malloc(event_len+val_len*2+1+2*MY_CS_NAME_SIZE+15, MYF(MY_WME));
@@ -2327,10 +3367,12 @@ void User_var_log_event::pack_info(Protocol* protocol)
#endif /* !MYSQL_CLIENT */
-User_var_log_event::User_var_log_event(const char* buf, bool old_format)
- :Log_event(buf, old_format)
+User_var_log_event::
+User_var_log_event(const char* buf,
+ const Format_description_log_event* description_event)
+ :Log_event(buf, description_event)
{
- buf+= (old_format) ? OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN;
+ buf+= description_event->common_header_len;
name_len= uint4korr(buf);
name= (char *) buf + UV_NAME_LEN_SIZE;
buf+= UV_NAME_LEN_SIZE + name_len;
@@ -2354,13 +3396,15 @@ User_var_log_event::User_var_log_event(const char* buf, bool old_format)
}
-int User_var_log_event::write_data(IO_CACHE* file)
+#ifndef MYSQL_CLIENT
+bool User_var_log_event::write(IO_CACHE* file)
{
char buf[UV_NAME_LEN_SIZE];
char buf1[UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
UV_CHARSET_NUMBER_SIZE + UV_VAL_LEN_SIZE];
- char buf2[8], *pos= buf2;
+ char buf2[max(8, DECIMAL_MAX_FIELD_SIZE + 2)], *pos= buf2;
uint buf1_length;
+ ulong event_length;
int4store(buf, name_len);
@@ -2373,8 +3417,6 @@ int User_var_log_event::write_data(IO_CACHE* file)
{
buf1[1]= type;
int4store(buf1 + 2, charset_number);
- int4store(buf1 + 2 + UV_CHARSET_NUMBER_SIZE, val_len);
- buf1_length= 10;
switch (type) {
case REAL_RESULT:
@@ -2383,6 +3425,16 @@ int User_var_log_event::write_data(IO_CACHE* file)
case INT_RESULT:
int8store(buf2, *(longlong*) val);
break;
+ case DECIMAL_RESULT:
+ {
+ my_decimal *dec= (my_decimal *)val;
+ dec->fix_buffer_pointer();
+ buf2[0]= (char)(dec->intg + dec->frac);
+ buf2[1]= (char)dec->frac;
+ decimal2bin((decimal_t*)val, buf2+2, buf2[0], buf2[1]);
+ val_len= decimal_bin_size(buf2[0], buf2[1]) + 2;
+ break;
+ }
case STRING_RESULT:
pos= val;
break;
@@ -2391,12 +3443,20 @@ int User_var_log_event::write_data(IO_CACHE* file)
DBUG_ASSERT(1);
return 0;
}
+ int4store(buf1 + 2 + UV_CHARSET_NUMBER_SIZE, val_len);
+ buf1_length= 10;
}
- return (my_b_safe_write(file, (byte*) buf, sizeof(buf)) ||
+
+ /* Length of the whole event */
+ event_length= sizeof(buf)+ name_len + buf1_length + val_len;
+
+ return (write_header(file, event_length) ||
+ my_b_safe_write(file, (byte*) buf, sizeof(buf)) ||
my_b_safe_write(file, (byte*) name, name_len) ||
my_b_safe_write(file, (byte*) buf1, buf1_length) ||
my_b_safe_write(file, (byte*) pos, val_len));
}
+#endif
/*
@@ -2404,7 +3464,7 @@ int User_var_log_event::write_data(IO_CACHE* file)
*/
#ifdef MYSQL_CLIENT
-void User_var_log_event::print(FILE* file, bool short_form, char* last_db)
+void User_var_log_event::print(FILE* file, bool short_form, LAST_EVENT_INFO* last_event_info)
{
if (!short_form)
{
@@ -2433,6 +3493,23 @@ void User_var_log_event::print(FILE* file, bool short_form, char* last_db)
longlong10_to_str(uint8korr(val), int_buf, -10);
fprintf(file, ":=%s;\n", int_buf);
break;
+ case DECIMAL_RESULT:
+ {
+ char str_buf[200];
+ int str_len= sizeof(str_buf) - 1;
+ int precision= (int)val[0];
+ int scale= (int)val[1];
+ decimal_digit_t dec_buf[10];
+ decimal_t dec;
+ dec.len= 10;
+ dec.buf= dec_buf;
+
+ bin2decimal(val+2, &dec, precision, scale);
+ decimal2string(&dec, str_buf, &str_len, 0, 0, 0);
+ str_buf[str_len]= 0;
+ fprintf(file, ":=%s;\n",str_buf);
+ break;
+ }
case STRING_RESULT:
{
/*
@@ -2509,7 +3586,7 @@ int User_var_log_event::exec_event(struct st_relay_log_info* rli)
switch (type) {
case REAL_RESULT:
float8get(real_val, val);
- it= new Item_real(real_val);
+ it= new Item_float(real_val);
val= (char*) &real_val; // Pointer to value in native format
val_len= 8;
break;
@@ -2519,6 +3596,14 @@ int User_var_log_event::exec_event(struct st_relay_log_info* rli)
val= (char*) &int_val; // Pointer to value in native format
val_len= 8;
break;
+ case DECIMAL_RESULT:
+ {
+ Item_decimal *dec= new Item_decimal(val+2, val[0], val[1]);
+ it= dec;
+ val= (char *)dec->val_decimal(NULL);
+ val_len= sizeof(my_decimal);
+ break;
+ }
case STRING_RESULT:
it= new Item_string(val, val_len, charset);
break;
@@ -2542,7 +3627,7 @@ int User_var_log_event::exec_event(struct st_relay_log_info* rli)
e.update_hash(val, val_len, type, charset, DERIVATION_IMPLICIT);
free_root(thd->mem_root,0);
- rli->inc_event_relay_log_pos(get_event_len());
+ rli->inc_event_relay_log_pos();
return 0;
}
#endif /* !MYSQL_CLIENT */
@@ -2554,7 +3639,7 @@ int User_var_log_event::exec_event(struct st_relay_log_info* rli)
#ifdef HAVE_REPLICATION
#ifdef MYSQL_CLIENT
-void Unknown_log_event::print(FILE* file, bool short_form, char* last_db)
+void Unknown_log_event::print(FILE* file, bool short_form, LAST_EVENT_INFO* last_event_info)
{
if (short_form)
return;
@@ -2584,12 +3669,12 @@ void Slave_log_event::pack_info(Protocol *protocol)
#ifndef MYSQL_CLIENT
Slave_log_event::Slave_log_event(THD* thd_arg,
struct st_relay_log_info* rli)
- :Log_event(thd_arg, 0, 0), mem_pool(0), master_host(0)
+ :Log_event(thd_arg, 0, 0) , mem_pool(0), master_host(0)
{
DBUG_ENTER("Slave_log_event");
if (!rli->inited) // QQ When can this happen ?
DBUG_VOID_RETURN;
-
+
MASTER_INFO* mi = rli->mi;
// TODO: re-write this better without holding both locks at the same time
pthread_mutex_lock(&mi->data_lock);
@@ -2625,7 +3710,7 @@ Slave_log_event::~Slave_log_event()
#ifdef MYSQL_CLIENT
-void Slave_log_event::print(FILE* file, bool short_form, char* last_db)
+void Slave_log_event::print(FILE* file, bool short_form, LAST_EVENT_INFO* last_event_info)
{
char llbuff[22];
if (short_form)
@@ -2645,13 +3730,18 @@ int Slave_log_event::get_data_size()
}
-int Slave_log_event::write_data(IO_CACHE* file)
+#ifndef MYSQL_CLIENT
+bool Slave_log_event::write(IO_CACHE* file)
{
+ ulong event_length= get_data_size();
int8store(mem_pool + SL_MASTER_POS_OFFSET, master_pos);
int2store(mem_pool + SL_MASTER_PORT_OFFSET, master_port);
// log and host are already there
- return my_b_safe_write(file, (byte*)mem_pool, get_data_size());
+
+ return (write_header(file, event_length) ||
+ my_b_safe_write(file, (byte*) mem_pool, event_length));
}
+#endif
void Slave_log_event::init_from_mem_pool(int data_size)
@@ -2671,12 +3761,13 @@ void Slave_log_event::init_from_mem_pool(int data_size)
}
-Slave_log_event::Slave_log_event(const char* buf, int event_len)
- :Log_event(buf,0),mem_pool(0),master_host(0)
+/* This code is not used, so has not been updated to be format-tolerant */
+Slave_log_event::Slave_log_event(const char* buf, uint event_len)
+ :Log_event(buf,0) /*unused event*/ ,mem_pool(0),master_host(0)
{
- event_len -= LOG_EVENT_HEADER_LEN;
- if (event_len < 0)
+ if (event_len < LOG_EVENT_HEADER_LEN)
return;
+ event_len -= LOG_EVENT_HEADER_LEN;
if (!(mem_pool = (char*) my_malloc(event_len + 1, MYF(MY_WME))))
return;
memcpy(mem_pool, buf + LOG_EVENT_HEADER_LEN, event_len);
@@ -2704,7 +3795,7 @@ int Slave_log_event::exec_event(struct st_relay_log_info* rli)
*/
#ifdef MYSQL_CLIENT
-void Stop_log_event::print(FILE* file, bool short_form, char* last_db)
+void Stop_log_event::print(FILE* file, bool short_form, LAST_EVENT_INFO* last_event_info)
{
if (short_form)
return;
@@ -2719,14 +3810,14 @@ void Stop_log_event::print(FILE* file, bool short_form, char* last_db)
/*
Stop_log_event::exec_event()
- The master stopped.
+ The master stopped.
We used to clean up all temporary tables but this is useless as, as the
- master has shut down properly, it has written all DROP TEMPORARY TABLE and DO
- RELEASE_LOCK (prepared statements' deletion is TODO).
+ master has shut down properly, it has written all DROP TEMPORARY TABLE
+ (prepared statements' deletion is TODO only when we binlog prep stmts).
We used to clean up slave_load_tmpdir, but this is useless as it has been
cleared at the end of LOAD DATA INFILE.
So we have nothing to do here.
- The place were we must do this cleaning is in Start_log_event::exec_event(),
+ The place were we must do this cleaning is in Start_log_event_v3::exec_event(),
not here. Because if we come here, the master was sane.
*/
@@ -2740,8 +3831,13 @@ int Stop_log_event::exec_event(struct st_relay_log_info* rli)
could give false triggers in MASTER_POS_WAIT() that we have reached
the target position when in fact we have not.
*/
- rli->inc_group_relay_log_pos(get_event_len(), 0);
- flush_relay_log_info(rli);
+ if (thd->options & OPTION_BEGIN)
+ rli->inc_event_relay_log_pos();
+ else
+ {
+ rli->inc_group_relay_log_pos(0);
+ flush_relay_log_info(rli);
+ }
return 0;
}
#endif /* !MYSQL_CLIENT */
@@ -2772,20 +3868,19 @@ Create_file_log_event(THD* thd_arg, sql_exchange* ex,
sql_ex.force_new_format();
DBUG_VOID_RETURN;
}
-#endif /* !MYSQL_CLIENT */
/*
Create_file_log_event::write_data_body()
*/
-int Create_file_log_event::write_data_body(IO_CACHE* file)
+bool Create_file_log_event::write_data_body(IO_CACHE* file)
{
- int res;
- if ((res = Load_log_event::write_data_body(file)) || fake_base)
+ bool res;
+ if ((res= Load_log_event::write_data_body(file)) || fake_base)
return res;
return (my_b_safe_write(file, (byte*) "", 1) ||
- my_b_safe_write(file, (byte*) block, block_len));
+ my_b_safe_write(file, (byte*) block, block_len));
}
@@ -2793,14 +3888,14 @@ int Create_file_log_event::write_data_body(IO_CACHE* file)
Create_file_log_event::write_data_header()
*/
-int Create_file_log_event::write_data_header(IO_CACHE* file)
+bool Create_file_log_event::write_data_header(IO_CACHE* file)
{
- int res;
- if ((res = Load_log_event::write_data_header(file)) || fake_base)
- return res;
+ bool res;
byte buf[CREATE_FILE_HEADER_LEN];
+ if ((res= Load_log_event::write_data_header(file)) || fake_base)
+ return res;
int4store(buf + CF_FILE_ID_OFFSET, file_id);
- return my_b_safe_write(file, buf, CREATE_FILE_HEADER_LEN);
+ return my_b_safe_write(file, buf, CREATE_FILE_HEADER_LEN) != 0;
}
@@ -2808,42 +3903,58 @@ int Create_file_log_event::write_data_header(IO_CACHE* file)
Create_file_log_event::write_base()
*/
-int Create_file_log_event::write_base(IO_CACHE* file)
+bool Create_file_log_event::write_base(IO_CACHE* file)
{
- int res;
- fake_base = 1; // pretend we are Load event
- res = write(file);
- fake_base = 0;
+ bool res;
+ fake_base= 1; // pretend we are Load event
+ res= write(file);
+ fake_base= 0;
return res;
}
+#endif /* !MYSQL_CLIENT */
/*
Create_file_log_event ctor
*/
-Create_file_log_event::Create_file_log_event(const char* buf, int len,
- bool old_format)
- :Load_log_event(buf,0,old_format),fake_base(0),block(0),inited_from_old(0)
+Create_file_log_event::Create_file_log_event(const char* buf, uint len,
+ const Format_description_log_event* description_event)
+ :Load_log_event(buf,0,description_event),fake_base(0),block(0),inited_from_old(0)
{
- int block_offset;
- DBUG_ENTER("Create_file_log_event");
-
- /*
- We must make copy of 'buf' as this event may have to live over a
- rotate log entry when used in mysqlbinlog
- */
+ DBUG_ENTER("Create_file_log_event::Create_file_log_event(char*,...)");
+ uint block_offset;
+ uint header_len= description_event->common_header_len;
+ uint8 load_header_len= description_event->post_header_len[LOAD_EVENT-1];
+ uint8 create_file_header_len= description_event->post_header_len[CREATE_FILE_EVENT-1];
if (!(event_buf= my_memdup((byte*) buf, len, MYF(MY_WME))) ||
- (copy_log_event(event_buf, len, old_format)))
+ copy_log_event(event_buf,len,
+ ((buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ?
+ load_header_len + header_len :
+ (fake_base ? (header_len+load_header_len) :
+ (header_len+load_header_len) +
+ create_file_header_len)),
+ description_event))
DBUG_VOID_RETURN;
-
- if (!old_format)
+ if (description_event->binlog_version!=1)
{
- file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN +
- + LOAD_HEADER_LEN + CF_FILE_ID_OFFSET);
- // + 1 for \0 terminating fname
- block_offset = (LOG_EVENT_HEADER_LEN + Load_log_event::get_data_size() +
- CREATE_FILE_HEADER_LEN + 1);
+ file_id= uint4korr(buf +
+ header_len +
+ load_header_len + CF_FILE_ID_OFFSET);
+ /*
+ Note that it's ok to use get_data_size() below, because it is computed
+ with values we have already read from this event (because we called
+ copy_log_event()); we are not using slave's format info to decode
+ master's format, we are really using master's format info.
+ Anyway, both formats should be identical (except the common_header_len)
+ as these Load events are not changed between 4.0 and 5.0 (as logging of
+ LOAD DATA INFILE does not use Load_log_event in 5.0).
+
+ The + 1 is for \0 terminating fname
+ */
+ block_offset= (description_event->common_header_len +
+ Load_log_event::get_data_size() +
+ create_file_header_len + 1);
if (len < block_offset)
return;
block = (char*)buf + block_offset;
@@ -2864,18 +3975,18 @@ Create_file_log_event::Create_file_log_event(const char* buf, int len,
#ifdef MYSQL_CLIENT
void Create_file_log_event::print(FILE* file, bool short_form,
- char* last_db, bool enable_local)
+ LAST_EVENT_INFO* last_event_info, bool enable_local)
{
if (short_form)
{
if (enable_local && check_fname_outside_temp_buf())
- Load_log_event::print(file, 1, last_db);
+ Load_log_event::print(file, 1, last_event_info);
return;
}
if (enable_local)
{
- Load_log_event::print(file, short_form, last_db, !check_fname_outside_temp_buf());
+ Load_log_event::print(file, short_form, last_event_info, !check_fname_outside_temp_buf());
/*
That one is for "file_id: etc" below: in mysqlbinlog we want the #, in
SHOW BINLOG EVENTS we don't.
@@ -2888,9 +3999,9 @@ void Create_file_log_event::print(FILE* file, bool short_form,
void Create_file_log_event::print(FILE* file, bool short_form,
- char* last_db)
+ LAST_EVENT_INFO* last_event_info)
{
- print(file,short_form,last_db,0);
+ print(file,short_form,last_event_info,0);
}
#endif /* MYSQL_CLIENT */
@@ -3011,30 +4122,38 @@ Append_block_log_event::Append_block_log_event(THD* thd_arg, const char* db_arg,
Append_block_log_event ctor
*/
-Append_block_log_event::Append_block_log_event(const char* buf, int len)
- :Log_event(buf, 0),block(0)
+Append_block_log_event::Append_block_log_event(const char* buf, uint len,
+ const Format_description_log_event* description_event)
+ :Log_event(buf, description_event),block(0)
{
- DBUG_ENTER("Append_block_log_event");
- if ((uint)len < APPEND_BLOCK_EVENT_OVERHEAD)
+ DBUG_ENTER("Append_block_log_event::Append_block_log_event(char*,...)");
+ uint8 common_header_len= description_event->common_header_len;
+ uint8 append_block_header_len=
+ description_event->post_header_len[APPEND_BLOCK_EVENT-1];
+ uint total_header_len= common_header_len+append_block_header_len;
+ if (len < total_header_len)
DBUG_VOID_RETURN;
- file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + AB_FILE_ID_OFFSET);
- block = (char*)buf + APPEND_BLOCK_EVENT_OVERHEAD;
- block_len = len - APPEND_BLOCK_EVENT_OVERHEAD;
+ file_id= uint4korr(buf + common_header_len + AB_FILE_ID_OFFSET);
+ block= (char*)buf + total_header_len;
+ block_len= len - total_header_len;
DBUG_VOID_RETURN;
}
/*
- Append_block_log_event::write_data()
+ Append_block_log_event::write()
*/
-int Append_block_log_event::write_data(IO_CACHE* file)
+#ifndef MYSQL_CLIENT
+bool Append_block_log_event::write(IO_CACHE* file)
{
byte buf[APPEND_BLOCK_HEADER_LEN];
int4store(buf + AB_FILE_ID_OFFSET, file_id);
- return (my_b_safe_write(file, buf, APPEND_BLOCK_HEADER_LEN) ||
+ return (write_header(file, APPEND_BLOCK_HEADER_LEN + block_len) ||
+ my_b_safe_write(file, buf, APPEND_BLOCK_HEADER_LEN) ||
my_b_safe_write(file, (byte*) block, block_len));
}
+#endif
/*
@@ -3043,14 +4162,14 @@ int Append_block_log_event::write_data(IO_CACHE* file)
#ifdef MYSQL_CLIENT
void Append_block_log_event::print(FILE* file, bool short_form,
- char* last_db)
+ LAST_EVENT_INFO* last_event_info)
{
if (short_form)
return;
print_header(file);
fputc('\n', file);
- fprintf(file, "#Append_block: file_id: %d block_len: %d\n",
- file_id, block_len);
+ fprintf(file, "#%s: file_id: %d block_len: %d\n",
+ get_type_str(), file_id, block_len);
}
#endif /* MYSQL_CLIENT */
@@ -3069,14 +4188,21 @@ void Append_block_log_event::pack_info(Protocol *protocol)
block_len));
protocol->store(buf, length, &my_charset_bin);
}
-#endif /* defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) */
/*
+ Append_block_log_event::get_create_or_append()
+*/
+
+int Append_block_log_event::get_create_or_append() const
+{
+ return 0; /* append to the file, fail if not exists */
+}
+
+/*
Append_block_log_event::exec_event()
*/
-#if defined( HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
int Append_block_log_event::exec_event(struct st_relay_log_info* rli)
{
char proc_info[17+FN_REFLEN+10], *fname= proc_info+17;
@@ -3088,14 +4214,32 @@ int Append_block_log_event::exec_event(struct st_relay_log_info* rli)
memcpy(p, ".data", 6);
strnmov(proc_info, "Making temp file ", 17); // no end 0
thd->proc_info= proc_info;
- if ((fd = my_open(fname, O_WRONLY|O_APPEND|O_BINARY|O_NOFOLLOW, MYF(MY_WME))) < 0)
+ if (get_create_or_append())
{
- slave_print_error(rli,my_errno, "Error in Append_block event: could not open file '%s'", fname);
+ my_delete(fname, MYF(0)); // old copy may exist already
+ if ((fd= my_create(fname, CREATE_MODE,
+ O_WRONLY | O_BINARY | O_EXCL | O_NOFOLLOW,
+ MYF(MY_WME))) < 0)
+ {
+ slave_print_error(rli, my_errno,
+ "Error in %s event: could not create file '%s'",
+ get_type_str(), fname);
+ goto err;
+ }
+ }
+ else if ((fd = my_open(fname, O_WRONLY | O_APPEND | O_BINARY | O_NOFOLLOW,
+ MYF(MY_WME))) < 0)
+ {
+ slave_print_error(rli, my_errno,
+ "Error in %s event: could not open file '%s'",
+ get_type_str(), fname);
goto err;
}
if (my_write(fd, (byte*) block, block_len, MYF(MY_WME+MY_NABP)))
{
- slave_print_error(rli,my_errno, "Error in Append_block event: write to '%s' failed", fname);
+ slave_print_error(rli, my_errno,
+ "Error in %s event: write to '%s' failed",
+ get_type_str(), fname);
goto err;
}
error=0;
@@ -3129,25 +4273,31 @@ Delete_file_log_event::Delete_file_log_event(THD *thd_arg, const char* db_arg,
Delete_file_log_event ctor
*/
-Delete_file_log_event::Delete_file_log_event(const char* buf, int len)
- :Log_event(buf, 0),file_id(0)
+Delete_file_log_event::Delete_file_log_event(const char* buf, uint len,
+ const Format_description_log_event* description_event)
+ :Log_event(buf, description_event),file_id(0)
{
- if ((uint)len < DELETE_FILE_EVENT_OVERHEAD)
+ uint8 common_header_len= description_event->common_header_len;
+ uint8 delete_file_header_len= description_event->post_header_len[DELETE_FILE_EVENT-1];
+ if (len < (uint)(common_header_len + delete_file_header_len))
return;
- file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + AB_FILE_ID_OFFSET);
+ file_id= uint4korr(buf + common_header_len + DF_FILE_ID_OFFSET);
}
/*
- Delete_file_log_event::write_data()
+ Delete_file_log_event::write()
*/
-int Delete_file_log_event::write_data(IO_CACHE* file)
+#ifndef MYSQL_CLIENT
+bool Delete_file_log_event::write(IO_CACHE* file)
{
byte buf[DELETE_FILE_HEADER_LEN];
int4store(buf + DF_FILE_ID_OFFSET, file_id);
- return my_b_safe_write(file, buf, DELETE_FILE_HEADER_LEN);
+ return (write_header(file, sizeof(buf)) ||
+ my_b_safe_write(file, buf, sizeof(buf)));
}
+#endif
/*
@@ -3156,7 +4306,7 @@ int Delete_file_log_event::write_data(IO_CACHE* file)
#ifdef MYSQL_CLIENT
void Delete_file_log_event::print(FILE* file, bool short_form,
- char* last_db)
+ LAST_EVENT_INFO* last_event_info)
{
if (short_form)
return;
@@ -3219,25 +4369,31 @@ Execute_load_log_event::Execute_load_log_event(THD *thd_arg, const char* db_arg,
Execute_load_log_event ctor
*/
-Execute_load_log_event::Execute_load_log_event(const char* buf, int len)
- :Log_event(buf, 0), file_id(0)
+Execute_load_log_event::Execute_load_log_event(const char* buf, uint len,
+ const Format_description_log_event* description_event)
+ :Log_event(buf, description_event), file_id(0)
{
- if ((uint)len < EXEC_LOAD_EVENT_OVERHEAD)
+ uint8 common_header_len= description_event->common_header_len;
+ uint8 exec_load_header_len= description_event->post_header_len[EXEC_LOAD_EVENT-1];
+ if (len < (uint)(common_header_len+exec_load_header_len))
return;
- file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + EL_FILE_ID_OFFSET);
+ file_id= uint4korr(buf + common_header_len + EL_FILE_ID_OFFSET);
}
/*
- Execute_load_log_event::write_data()
+ Execute_load_log_event::write()
*/
-int Execute_load_log_event::write_data(IO_CACHE* file)
+#ifndef MYSQL_CLIENT
+bool Execute_load_log_event::write(IO_CACHE* file)
{
byte buf[EXEC_LOAD_HEADER_LEN];
int4store(buf + EL_FILE_ID_OFFSET, file_id);
- return my_b_safe_write(file, buf, EXEC_LOAD_HEADER_LEN);
+ return (write_header(file, sizeof(buf)) ||
+ my_b_safe_write(file, buf, sizeof(buf)));
}
+#endif
/*
@@ -3246,7 +4402,7 @@ int Execute_load_log_event::write_data(IO_CACHE* file)
#ifdef MYSQL_CLIENT
void Execute_load_log_event::print(FILE* file, bool short_form,
- char* last_db)
+ LAST_EVENT_INFO* last_event_info)
{
if (short_form)
return;
@@ -3285,7 +4441,8 @@ int Execute_load_log_event::exec_event(struct st_relay_log_info* rli)
Load_log_event* lev = 0;
memcpy(p, ".info", 6);
- if ((fd = my_open(fname, O_RDONLY|O_BINARY|O_NOFOLLOW, MYF(MY_WME))) < 0 ||
+ if ((fd = my_open(fname, O_RDONLY | O_BINARY | O_NOFOLLOW,
+ MYF(MY_WME))) < 0 ||
init_io_cache(&file, fd, IO_SIZE, READ_CACHE, (my_off_t)0, 0,
MYF(MY_WME|MY_NABP)))
{
@@ -3293,8 +4450,8 @@ int Execute_load_log_event::exec_event(struct st_relay_log_info* rli)
goto err;
}
if (!(lev = (Load_log_event*)Log_event::read_log_event(&file,
- (pthread_mutex_t*)0,
- (bool)0)) ||
+ (pthread_mutex_t*)0,
+ rli->relay_log.description_event_for_exec)) ||
lev->get_type_code() != NEW_LOAD_EVENT)
{
slave_print_error(rli,0, "Error in Exec_load event: file '%s' appears corrupted", fname);
@@ -3309,15 +4466,7 @@ int Execute_load_log_event::exec_event(struct st_relay_log_info* rli)
mysql_load()).
*/
-#if MYSQL_VERSION_ID < 40100
- rli->future_master_log_pos= log_pos + get_event_len() -
- (rli->mi->old_format ? (LOG_EVENT_HEADER_LEN - OLD_HEADER_LEN) : 0);
-#elif MYSQL_VERSION_ID < 50000
- rli->future_group_master_log_pos= log_pos + get_event_len() -
- (rli->mi->old_format ? (LOG_EVENT_HEADER_LEN - OLD_HEADER_LEN) : 0);
-#else
- rli->future_group_master_log_pos= log_pos;
-#endif
+ rli->future_group_master_log_pos= log_pos;
if (lev->exec_event(0,rli,1))
{
/*
@@ -3368,6 +4517,223 @@ err:
/**************************************************************************
+ Begin_load_query_log_event methods
+**************************************************************************/
+
+#ifndef MYSQL_CLIENT
+Begin_load_query_log_event::
+Begin_load_query_log_event(THD* thd_arg, const char* db_arg, char* block_arg,
+ uint block_len_arg, bool using_trans)
+ :Append_block_log_event(thd_arg, db_arg, block_arg, block_len_arg,
+ using_trans)
+{
+ file_id= thd_arg->file_id= mysql_bin_log.next_file_id();
+}
+#endif
+
+
+Begin_load_query_log_event::
+Begin_load_query_log_event(const char* buf, uint len,
+ const Format_description_log_event* desc_event)
+ :Append_block_log_event(buf, len, desc_event)
+{
+}
+
+
+#if defined( HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
+int Begin_load_query_log_event::get_create_or_append() const
+{
+ return 1; /* create the file */
+}
+#endif /* defined( HAVE_REPLICATION) && !defined(MYSQL_CLIENT) */
+
+
+/**************************************************************************
+ Execute_load_query_log_event methods
+**************************************************************************/
+
+
+#ifndef MYSQL_CLIENT
+Execute_load_query_log_event::
+Execute_load_query_log_event(THD* thd_arg, const char* query_arg,
+ ulong query_length_arg, uint fn_pos_start_arg,
+ uint fn_pos_end_arg,
+ enum_load_dup_handling dup_handling_arg,
+ bool using_trans, bool suppress_use):
+ Query_log_event(thd_arg, query_arg, query_length_arg, using_trans,
+ suppress_use),
+ file_id(thd_arg->file_id), fn_pos_start(fn_pos_start_arg),
+ fn_pos_end(fn_pos_end_arg), dup_handling(dup_handling_arg)
+{
+}
+#endif /* !MYSQL_CLIENT */
+
+
+Execute_load_query_log_event::
+Execute_load_query_log_event(const char* buf, uint event_len,
+ const Format_description_log_event* desc_event):
+ Query_log_event(buf, event_len, desc_event, EXECUTE_LOAD_QUERY_EVENT),
+ file_id(0), fn_pos_start(0), fn_pos_end(0)
+{
+ if (!Query_log_event::is_valid())
+ return;
+
+ buf+= desc_event->common_header_len;
+
+ fn_pos_start= uint4korr(buf + ELQ_FN_POS_START_OFFSET);
+ fn_pos_end= uint4korr(buf + ELQ_FN_POS_END_OFFSET);
+ dup_handling= (enum_load_dup_handling)(*(buf + ELQ_DUP_HANDLING_OFFSET));
+
+ if (fn_pos_start > q_len || fn_pos_end > q_len ||
+ dup_handling > LOAD_DUP_REPLACE)
+ return;
+
+ file_id= uint4korr(buf + ELQ_FILE_ID_OFFSET);
+}
+
+
+ulong Execute_load_query_log_event::get_post_header_size_for_derived()
+{
+ return EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN;
+}
+
+
+#ifndef MYSQL_CLIENT
+bool
+Execute_load_query_log_event::write_post_header_for_derived(IO_CACHE* file)
+{
+ char buf[EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN];
+ int4store(buf, file_id);
+ int4store(buf + 4, fn_pos_start);
+ int4store(buf + 4 + 4, fn_pos_end);
+ *(buf + 4 + 4 + 4)= (char)dup_handling;
+ return my_b_safe_write(file, (byte*) buf, EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN);
+}
+#endif
+
+
+#ifdef MYSQL_CLIENT
+void Execute_load_query_log_event::print(FILE* file, bool short_form,
+ LAST_EVENT_INFO* last_event_info)
+{
+ print(file, short_form, last_event_info, 0);
+}
+
+
+void Execute_load_query_log_event::print(FILE* file, bool short_form,
+ LAST_EVENT_INFO* last_event_info,
+ const char *local_fname)
+{
+ print_query_header(file, short_form, last_event_info);
+
+ if (local_fname)
+ {
+ my_fwrite(file, (byte*) query, fn_pos_start, MYF(MY_NABP | MY_WME));
+ fprintf(file, " LOCAL INFILE \'");
+ fprintf(file, local_fname);
+ fprintf(file, "\'");
+ if (dup_handling == LOAD_DUP_REPLACE)
+ fprintf(file, " REPLACE");
+ fprintf(file, " INTO");
+ my_fwrite(file, (byte*) query + fn_pos_end, q_len-fn_pos_end,
+ MYF(MY_NABP | MY_WME));
+ fprintf(file, ";\n");
+ }
+ else
+ {
+ my_fwrite(file, (byte*) query, q_len, MYF(MY_NABP | MY_WME));
+ fprintf(file, ";\n");
+ }
+
+ if (!short_form)
+ fprintf(file, "# file_id: %d \n", file_id);
+}
+#endif
+
+
+#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
+void Execute_load_query_log_event::pack_info(Protocol *protocol)
+{
+ char *buf, *pos;
+ if (!(buf= my_malloc(9 + db_len + q_len + 10 + 21, MYF(MY_WME))))
+ return;
+ pos= buf;
+ if (db && db_len)
+ {
+ pos= strmov(buf, "use `");
+ memcpy(pos, db, db_len);
+ pos= strmov(pos+db_len, "`; ");
+ }
+ if (query && q_len)
+ {
+ memcpy(pos, query, q_len);
+ pos+= q_len;
+ }
+ pos= strmov(pos, " ;file_id=");
+ pos= int10_to_str((long) file_id, pos, 10);
+ protocol->store(buf, pos-buf, &my_charset_bin);
+ my_free(buf, MYF(MY_ALLOW_ZERO_PTR));
+}
+
+
+int
+Execute_load_query_log_event::exec_event(struct st_relay_log_info* rli)
+{
+ char *p;
+ char *buf;
+ char *fname;
+ char *fname_end;
+ int error;
+
+ /* Replace filename and LOCAL keyword in query before executing it */
+ if (!(buf = my_malloc(q_len + 1 - (fn_pos_end - fn_pos_start) +
+ (FN_REFLEN + 10) + 10 + 8 + 5, MYF(MY_WME))))
+ {
+ slave_print_error(rli, my_errno, "Not enough memory");
+ return 1;
+ }
+
+ p= buf;
+ memcpy(p, query, fn_pos_start);
+ p+= fn_pos_start;
+ fname= (p= strmake(p, " INFILE \'", 9));
+ p= slave_load_file_stem(p, file_id, server_id);
+ fname_end= (p= strmake(p, ".data", 5));
+ *(p++)='\'';
+ switch (dup_handling)
+ {
+ case LOAD_DUP_IGNORE:
+ p= strmake(p, " IGNORE", 7);
+ break;
+ case LOAD_DUP_REPLACE:
+ p= strmake(p, " REPLACE", 8);
+ break;
+ default:
+ /* Ordinary load data */
+ break;
+ }
+ p= strmake(p, " INTO", 5);
+ p= strmake(p, query+fn_pos_end, q_len-fn_pos_end);
+
+ error= Query_log_event::exec_event(rli, buf, p-buf);
+
+ /* Forging file name for deletion in same buffer */
+ *fname_end= 0;
+
+ /*
+ If there was an error the slave is going to stop, leave the
+ file so that we can re-execute this event at START SLAVE.
+ */
+ if (!error)
+ (void) my_delete(fname, MYF(MY_WME));
+
+ my_free(buf, MYF(MY_ALLOW_ZERO_PTR));
+ return error;
+}
+#endif
+
+
+/**************************************************************************
sql_ex_info methods
**************************************************************************/
@@ -3375,15 +4741,15 @@ err:
sql_ex_info::write_data()
*/
-int sql_ex_info::write_data(IO_CACHE* file)
+bool sql_ex_info::write_data(IO_CACHE* file)
{
if (new_format())
{
- return (write_str(file, field_term, field_term_len) ||
- write_str(file, enclosed, enclosed_len) ||
- write_str(file, line_term, line_term_len) ||
- write_str(file, line_start, line_start_len) ||
- write_str(file, escaped, escaped_len) ||
+ return (write_str(file, field_term, (uint) field_term_len) ||
+ write_str(file, enclosed, (uint) enclosed_len) ||
+ write_str(file, line_term, (uint) line_term_len) ||
+ write_str(file, line_start, (uint) line_start_len) ||
+ write_str(file, escaped, (uint) escaped_len) ||
my_b_safe_write(file,(byte*) &opt_flags,1));
}
else
@@ -3396,7 +4762,7 @@ int sql_ex_info::write_data(IO_CACHE* file)
old_ex.escaped= *escaped;
old_ex.opt_flags= opt_flags;
old_ex.empty_flags=empty_flags;
- return my_b_safe_write(file, (byte*) &old_ex, sizeof(old_ex));
+ return my_b_safe_write(file, (byte*) &old_ex, sizeof(old_ex)) != 0;
}
}
@@ -3418,11 +4784,11 @@ char* sql_ex_info::init(char* buf,char* buf_end,bool use_new_format)
the case when we have old format because we will be reusing net buffer
to read the actual file before we write out the Create_file event.
*/
- if (read_str(buf, buf_end, field_term, field_term_len) ||
- read_str(buf, buf_end, enclosed, enclosed_len) ||
- read_str(buf, buf_end, line_term, line_term_len) ||
- read_str(buf, buf_end, line_start, line_start_len) ||
- read_str(buf, buf_end, escaped, escaped_len))
+ if (read_str(&buf, buf_end, &field_term, &field_term_len) ||
+ read_str(&buf, buf_end, &enclosed, &enclosed_len) ||
+ read_str(&buf, buf_end, &line_term, &line_term_len) ||
+ read_str(&buf, buf_end, &line_start, &line_start_len) ||
+ read_str(&buf, buf_end, &escaped, &escaped_len))
return 0;
opt_flags = *buf++;
}