summaryrefslogtreecommitdiff
path: root/sql/log_event.cc
diff options
context:
space:
mode:
authorSergei Golubchik <sergii@pisem.net>2011-10-19 21:45:18 +0200
committerSergei Golubchik <sergii@pisem.net>2011-10-19 21:45:18 +0200
commit76f0b94bb0b2994d639353530c5b251d0f1a204b (patch)
tree9ed50628aac34f89a37637bab2fc4915b86b5eb4 /sql/log_event.cc
parent4e46d8e5bff140f2549841167dc4b65a3c0a645d (diff)
parent5dc1a2231f55bacc9aaf0e24816f3d9c2ee1f21d (diff)
downloadmariadb-git-76f0b94bb0b2994d639353530c5b251d0f1a204b.tar.gz
merge with 5.3
sql/sql_insert.cc: CREATE ... IF NOT EXISTS may do nothing, but it is still not a failure. don't forget to my_ok it. ****** CREATE ... IF NOT EXISTS may do nothing, but it is still not a failure. don't forget to my_ok it. sql/sql_table.cc: small cleanup ****** small cleanup
Diffstat (limited to 'sql/log_event.cc')
-rw-r--r--sql/log_event.cc994
1 files changed, 849 insertions, 145 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 70087ed4da3..49383778b58 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -51,6 +51,31 @@
#include <my_bitmap.h>
#include "rpl_utility.h"
+
+/**
+ BINLOG_CHECKSUM variable.
+*/
+const char *binlog_checksum_type_names[]= {
+ "NONE",
+ "CRC32",
+ NullS
+};
+
+unsigned int binlog_checksum_type_length[]= {
+ sizeof("NONE") - 1,
+ sizeof("CRC32") - 1,
+ 0
+};
+
+TYPELIB binlog_checksum_typelib=
+{
+ array_elements(binlog_checksum_type_names) - 1, "",
+ binlog_checksum_type_names,
+ binlog_checksum_type_length
+};
+
+
+
#define log_cs &my_charset_latin1
#define FLAGSTR(V,F) ((V)&(F)?#F" ":"")
@@ -64,6 +89,24 @@
*/
#define FMT_G_BUFSIZE(PREC) (3 + (PREC) + 5 + 1)
+/*
+ replication event checksum is introduced in the following "checksum-home" version.
+ The checksum-aware servers extract FD's version to decide whether the FD event
+ carries checksum info.
+
+ TODO: correct the constant when it has been determined
+ (which main tree to push and when)
+*/
+const uchar checksum_version_split_mysql[3]= {5, 6, 1};
+const ulong checksum_version_product_mysql=
+ (checksum_version_split_mysql[0] * 256 +
+ checksum_version_split_mysql[1]) * 256 +
+ checksum_version_split_mysql[2];
+const uchar checksum_version_split_mariadb[3]= {5, 3, 0};
+const ulong checksum_version_product_mariadb=
+ (checksum_version_split_mariadb[0] * 256 +
+ checksum_version_split_mariadb[1]) * 256 +
+ checksum_version_split_mariadb[2];
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
static int rows_event_stmt_cleanup(Relay_log_info const *rli, THD* thd);
@@ -587,7 +630,7 @@ append_query_string(CHARSET_INFO *csinfo,
if (to->reserve(orig_len + from->length()*2+3))
return 1;
- beg= to->c_ptr_quick() + to->length();
+ beg= (char*) to->ptr() + to->length();
ptr= beg;
if (csinfo->escape_with_backslash_is_dangerous)
ptr= str_to_hex(ptr, from->ptr(), from->length());
@@ -624,7 +667,6 @@ static void print_set_option(IO_CACHE* file, uint32 bits_changed,
}
}
#endif
-
/**************************************************************************
Log_event methods (= the parent class of all events)
**************************************************************************/
@@ -663,6 +705,7 @@ const char* Log_event::get_type_str(Log_event_type type)
case BEGIN_LOAD_QUERY_EVENT: return "Begin_load_query";
case EXECUTE_LOAD_QUERY_EVENT: return "Execute_load_query";
case INCIDENT_EVENT: return "Incident";
+ case ANNOTATE_ROWS_EVENT: return "Annotate_rows";
default: return "Unknown"; /* impossible */
}
}
@@ -680,10 +723,12 @@ const char* Log_event::get_type_str()
#ifndef MYSQL_CLIENT
Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans)
:log_pos(0), temp_buf(0), exec_time(0), flags(flags_arg),
- cache_type(Log_event::EVENT_INVALID_CACHE), thd(thd_arg)
+ crc(0), thd(thd_arg),
+ checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF)
{
server_id= thd->server_id;
- when= thd->start_time;
+ when= thd->start_time;
+ when_sec_part=thd->start_time_sec_part;
if (using_trans)
cache_type= Log_event::EVENT_TRANSACTIONAL_CACHE;
@@ -700,14 +745,16 @@ Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans)
Log_event::Log_event()
:temp_buf(0), exec_time(0), flags(0),
- cache_type(Log_event::EVENT_INVALID_CACHE), thd(0)
+ cache_type(Log_event::EVENT_INVALID_CACHE), crc(0),
+ thd(0), checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF)
{
server_id= ::server_id;
/*
We can't call my_time() here as this would cause a call before
my_init() is called
*/
- when= 0;
+ when= 0;
+ when_sec_part=0;
log_pos= 0;
}
#endif /* !MYSQL_CLIENT */
@@ -719,12 +766,14 @@ Log_event::Log_event()
Log_event::Log_event(const char* buf,
const Format_description_log_event* description_event)
- :temp_buf(0), cache_type(Log_event::EVENT_INVALID_CACHE)
+ :temp_buf(0), cache_type(Log_event::EVENT_INVALID_CACHE),
+ crc(0), checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF)
{
#ifndef MYSQL_CLIENT
thd = 0;
#endif
when = uint4korr(buf);
+ when_sec_part= 0;
server_id = uint4korr(buf + SERVER_ID_OFFSET);
data_written= uint4korr(buf + EVENT_LEN_OFFSET);
if (description_event->binlog_version==1)
@@ -746,7 +795,7 @@ Log_event::Log_event(const char* buf,
logs are in 4.0 format, until it finds a Format_desc).
*/
if (description_event->binlog_version==3 &&
- buf[EVENT_TYPE_OFFSET]<FORMAT_DESCRIPTION_EVENT && log_pos)
+ (uchar)buf[EVENT_TYPE_OFFSET]<FORMAT_DESCRIPTION_EVENT && log_pos)
{
/*
If log_pos=0, don't change it. log_pos==0 is a marker to mean
@@ -764,8 +813,8 @@ Log_event::Log_event(const char* buf,
DBUG_PRINT("info", ("log_pos: %lu", (ulong) log_pos));
flags= uint2korr(buf + FLAGS_OFFSET);
- if ((buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT) ||
- (buf[EVENT_TYPE_OFFSET] == ROTATE_EVENT))
+ if (((uchar)buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT) ||
+ ((uchar)buf[EVENT_TYPE_OFFSET] == ROTATE_EVENT))
{
/*
These events always have a header which stops here (i.e. their
@@ -813,21 +862,13 @@ int Log_event::do_update_pos(Relay_log_info *rli)
DBUG_EXECUTE_IF("let_first_flush_log_change_timestamp",
if (debug_not_change_ts_if_art_event == 1
&& is_artificial_event())
- {
- debug_not_change_ts_if_art_event= 0;
- });
-#ifndef DBUG_OFF
- rli->stmt_done(log_pos,
- is_artificial_event() &&
- debug_not_change_ts_if_art_event > 0 ? 0 : when);
-#else
- rli->stmt_done(log_pos, is_artificial_event()? 0 : when);
-#endif
+ debug_not_change_ts_if_art_event= 0; );
+ rli->stmt_done(log_pos, is_artificial_event() &&
+ IF_DBUG(debug_not_change_ts_if_art_event > 0, 1) ?
+ 0 : when);
DBUG_EXECUTE_IF("let_first_flush_log_change_timestamp",
if (debug_not_change_ts_if_art_event == 0)
- {
- debug_not_change_ts_if_art_event= 2;
- });
+ debug_not_change_ts_if_art_event= 2; );
}
return 0; // Cannot fail currently
}
@@ -904,6 +945,105 @@ void Log_event::init_show_field_list(List<Item>* field_list)
field_list->push_back(new Item_empty_string("Info", 20));
}
+/**
+ A decider of whether to trigger checksum computation or not.
+ To be invoked in Log_event::write() stack.
+ The decision is positive
+
+ S,M) if it's been marked for checksumming with @c checksum_alg
+
+ M) otherwise, if @@global.binlog_checksum is not NONE and the event is
+ directly written to the binlog file.
+ The to-be-cached event decides at @c write_cache() time.
+
+ Otherwise the decision is negative.
+
+ @note A side effect of the method is altering Log_event::checksum_alg
+ it the latter was undefined at calling.
+
+ @return true (positive) or false (negative)
+*/
+my_bool Log_event::need_checksum()
+{
+ DBUG_ENTER("Log_event::need_checksum");
+ my_bool ret;
+ /*
+ few callers of Log_event::write
+ (incl FD::write, FD constructing code on the slave side, Rotate relay log
+ and Stop event)
+ provides their checksum alg preference through Log_event::checksum_alg.
+ */
+ ret= ((checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) ?
+ (checksum_alg != BINLOG_CHECKSUM_ALG_OFF) :
+ ((binlog_checksum_options != BINLOG_CHECKSUM_ALG_OFF) &&
+ (cache_type == Log_event::EVENT_NO_CACHE)) ?
+ test(binlog_checksum_options) : FALSE);
+
+ /*
+ FD calls the methods before data_written has been calculated.
+ The following invariant claims if the current is not the first
+ call (and therefore data_written is not zero) then `ret' must be
+ TRUE. It may not be null because FD is always checksummed.
+ */
+
+ DBUG_ASSERT(get_type_code() != FORMAT_DESCRIPTION_EVENT || ret ||
+ data_written == 0);
+
+ if (checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF)
+ checksum_alg= ret ? // calculated value stored
+ (uint8) binlog_checksum_options : (uint8) BINLOG_CHECKSUM_ALG_OFF;
+
+ DBUG_ASSERT(!ret ||
+ ((checksum_alg == binlog_checksum_options ||
+ /*
+ Stop event closes the relay-log and its checksum alg
+ preference is set by the caller can be different
+ from the server's binlog_checksum_options.
+ */
+ get_type_code() == STOP_EVENT ||
+ /*
+ Rotate:s can be checksummed regardless of the server's
+ binlog_checksum_options. That applies to both
+ the local RL's Rotate and the master's Rotate
+ which IO thread instantiates via queue_binlog_ver_3_event.
+ */
+ get_type_code() == ROTATE_EVENT
+ || /* FD is always checksummed */
+ get_type_code() == FORMAT_DESCRIPTION_EVENT) &&
+ checksum_alg != BINLOG_CHECKSUM_ALG_OFF));
+
+ DBUG_ASSERT(checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF);
+
+ DBUG_ASSERT(((get_type_code() != ROTATE_EVENT &&
+ get_type_code() != STOP_EVENT) ||
+ get_type_code() != FORMAT_DESCRIPTION_EVENT) ||
+ cache_type == Log_event::EVENT_NO_CACHE);
+
+ DBUG_RETURN(ret);
+}
+
+bool Log_event::wrapper_my_b_safe_write(IO_CACHE* file, const uchar* buf, ulong size)
+{
+ if (need_checksum() && size != 0)
+ crc= my_checksum(crc, buf, size);
+
+ return my_b_safe_write(file, buf, size);
+}
+
+bool Log_event::write_footer(IO_CACHE* file)
+{
+ /*
+ footer contains the checksum-algorithm descriptor
+ followed by the checksum value
+ */
+ if (need_checksum())
+ {
+ uchar buf[BINLOG_CHECKSUM_LEN];
+ int4store(buf, crc);
+ return (my_b_safe_write(file, (uchar*) buf, sizeof(buf)));
+ }
+ return 0;
+}
/*
Log_event::write()
@@ -913,11 +1053,18 @@ bool Log_event::write_header(IO_CACHE* file, ulong event_data_length)
{
uchar header[LOG_EVENT_HEADER_LEN];
ulong now;
+ bool ret;
DBUG_ENTER("Log_event::write_header");
/* Store number of bytes that will be written by this event */
data_written= event_data_length + sizeof(header);
+ if (need_checksum())
+ {
+ crc= my_checksum(0L, NULL, 0);
+ data_written += BINLOG_CHECKSUM_LEN;
+ }
+
/*
log_pos != 0 if this is relay-log event. In this case we should not
change the position
@@ -962,7 +1109,7 @@ bool Log_event::write_header(IO_CACHE* file, ulong event_data_length)
log_pos= my_b_safe_tell(file)+data_written;
}
- now= (ulong) get_time(); // Query start time
+ now= get_time(); // Query start time
/*
Header will be of size LOG_EVENT_HEADER_LEN for all events, except for
@@ -976,9 +1123,36 @@ bool Log_event::write_header(IO_CACHE* file, ulong event_data_length)
int4store(header+ SERVER_ID_OFFSET, server_id);
int4store(header+ EVENT_LEN_OFFSET, data_written);
int4store(header+ LOG_POS_OFFSET, log_pos);
- int2store(header+ FLAGS_OFFSET, flags);
-
- DBUG_RETURN(my_b_safe_write(file, header, sizeof(header)) != 0);
+ /*
+ recording checksum of FD event computed with dropped
+ possibly active LOG_EVENT_BINLOG_IN_USE_F flag.
+ Similar step at verication: the active flag is dropped before
+ checksum computing.
+ */
+ if (header[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT ||
+ !need_checksum() || !(flags & LOG_EVENT_BINLOG_IN_USE_F))
+ {
+ int2store(header+ FLAGS_OFFSET, flags);
+ ret= wrapper_my_b_safe_write(file, header, sizeof(header)) != 0;
+ }
+ else
+ {
+ ret= (wrapper_my_b_safe_write(file, header, FLAGS_OFFSET) != 0);
+ if (!ret)
+ {
+ flags &= ~LOG_EVENT_BINLOG_IN_USE_F;
+ int2store(header + FLAGS_OFFSET, flags);
+ crc= my_checksum(crc, header + FLAGS_OFFSET, sizeof(flags));
+ flags |= LOG_EVENT_BINLOG_IN_USE_F;
+ int2store(header + FLAGS_OFFSET, flags);
+ ret= (my_b_safe_write(file, header + FLAGS_OFFSET, sizeof(flags)) != 0);
+ }
+ if (!ret)
+ ret= (wrapper_my_b_safe_write(file, header + FLAGS_OFFSET + sizeof(flags),
+ sizeof(header)
+ - (FLAGS_OFFSET + sizeof(flags))) != 0);
+ }
+ DBUG_RETURN( ret);
}
@@ -988,11 +1162,13 @@ bool Log_event::write_header(IO_CACHE* file, ulong event_data_length)
*/
int Log_event::read_log_event(IO_CACHE* file, String* packet,
- mysql_mutex_t* log_lock)
+ mysql_mutex_t* log_lock,
+ uint8 checksum_alg_arg)
{
ulong data_len;
int result=0;
char buf[LOG_EVENT_MINIMAL_HEADER_LEN];
+ uchar ev_offset= packet->length();
DBUG_ENTER("Log_event::read_log_event");
if (log_lock)
@@ -1050,6 +1226,31 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet,
(file->error >= 0 ? LOG_READ_TRUNC: LOG_READ_IO));
/* Implicit goto end; */
}
+ else
+ {
+ /* Corrupt the event for Dump thread*/
+ DBUG_EXECUTE_IF("corrupt_read_log_event2",
+ uchar *debug_event_buf_c = (uchar*) packet->ptr() + ev_offset;
+ if (debug_event_buf_c[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT)
+ {
+ int debug_cor_pos = rand() % (data_len + sizeof(buf) - BINLOG_CHECKSUM_LEN);
+ debug_event_buf_c[debug_cor_pos] =~ debug_event_buf_c[debug_cor_pos];
+ DBUG_PRINT("info", ("Corrupt the event at Log_event::read_log_event: byte on position %d", debug_cor_pos));
+ DBUG_SET("-d,corrupt_read_log_event2");
+ }
+ );
+ /*
+ CRC verification of the Dump thread
+ */
+ if (opt_master_verify_checksum &&
+ event_checksum_test((uchar*) packet->ptr() + ev_offset,
+ data_len + sizeof(buf),
+ checksum_alg_arg))
+ {
+ result= LOG_READ_CHECKSUM_FAILURE;
+ goto end;
+ }
+ }
}
end:
@@ -1075,11 +1276,13 @@ end:
Log_event* Log_event::read_log_event(IO_CACHE* file,
mysql_mutex_t* log_lock,
const Format_description_log_event
- *description_event)
+ *description_event,
+ my_bool crc_check)
#else
Log_event* Log_event::read_log_event(IO_CACHE* file,
const Format_description_log_event
- *description_event)
+ *description_event,
+ my_bool crc_check)
#endif
{
DBUG_ENTER("Log_event::read_log_event");
@@ -1143,7 +1346,7 @@ failed my_b_read"));
error = "read error";
goto err;
}
- if ((res= read_log_event(buf, data_len, &error, description_event)))
+ if ((res= read_log_event(buf, data_len, &error, description_event, crc_check)))
res->register_temp_buf(buf, TRUE);
err:
@@ -1176,9 +1379,11 @@ err:
Log_event* Log_event::read_log_event(const char* buf, uint event_len,
const char **error,
- const Format_description_log_event *description_event)
+ const Format_description_log_event *description_event,
+ my_bool crc_check)
{
Log_event* ev;
+ uint8 alg;
DBUG_ENTER("Log_event::read_log_event(char*,...)");
DBUG_ASSERT(description_event != 0);
DBUG_PRINT("info", ("binlog_version: %d", description_event->binlog_version));
@@ -1186,14 +1391,68 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len,
/* Check the integrity */
if (event_len < EVENT_LEN_OFFSET ||
- buf[EVENT_TYPE_OFFSET] >= ENUM_END_EVENT ||
+ (uchar)buf[EVENT_TYPE_OFFSET] >= ENUM_END_EVENT ||
(uint) event_len != uint4korr(buf+EVENT_LEN_OFFSET))
{
*error="Sanity check failed"; // Needed to free buffer
DBUG_RETURN(NULL); // general sanity check - will fail on a partial read
}
- uint event_type= buf[EVENT_TYPE_OFFSET];
+ uint event_type= (uchar)buf[EVENT_TYPE_OFFSET];
+ // all following START events in the current file are without checksum
+ if (event_type == START_EVENT_V3)
+ (const_cast< Format_description_log_event *>(description_event))->checksum_alg= BINLOG_CHECKSUM_ALG_OFF;
+ /*
+ CRC verification by SQL and Show-Binlog-Events master side.
+ The caller has to provide @description_event->checksum_alg to
+ be the last seen FD's (A) descriptor.
+ If event is FD the descriptor is in it.
+ Notice, FD of the binlog can be only in one instance and therefore
+ Show-Binlog-Events executing master side thread needs just to know
+ the only FD's (A) value - whereas RL can contain more.
+ In the RL case, the alg is kept in FD_e (@description_event) which is reset
+ to the newer read-out event after its execution with possibly new alg descriptor.
+ Therefore in a typical sequence of RL:
+ {FD_s^0, FD_m, E_m^1} E_m^1
+ will be verified with (A) of FD_m.
+
+ See legends definition on MYSQL_BIN_LOG::relay_log_checksum_alg docs
+ lines (log.h).
+
+ Notice, a pre-checksum FD version forces alg := BINLOG_CHECKSUM_ALG_UNDEF.
+ */
+ alg= (event_type != FORMAT_DESCRIPTION_EVENT) ?
+ description_event->checksum_alg : get_checksum_alg(buf, event_len);
+ // Emulate the corruption during reading an event
+ DBUG_EXECUTE_IF("corrupt_read_log_event_char",
+ if (event_type != FORMAT_DESCRIPTION_EVENT)
+ {
+ char *debug_event_buf_c = (char *)buf;
+ int debug_cor_pos = rand() % (event_len - BINLOG_CHECKSUM_LEN);
+ debug_event_buf_c[debug_cor_pos] =~ debug_event_buf_c[debug_cor_pos];
+ DBUG_PRINT("info", ("Corrupt the event at Log_event::read_log_event(char*,...): byte on position %d", debug_cor_pos));
+ DBUG_SET("-d,corrupt_read_log_event_char");
+ }
+ );
+ if (crc_check &&
+ event_checksum_test((uchar *) buf, event_len, alg))
+ {
+#ifdef MYSQL_CLIENT
+ *error= "Event crc check failed! Most likely there is event corruption.";
+ if (force_opt)
+ {
+ ev= new Unknown_log_event(buf, description_event);
+ DBUG_RETURN(ev);
+ }
+ else
+ DBUG_RETURN(NULL);
+#else
+ *error= ER(ER_BINLOG_READ_EVENT_CHECKSUM_FAILURE);
+ sql_print_error("%s", ER(ER_BINLOG_READ_EVENT_CHECKSUM_FAILURE));
+ DBUG_RETURN(NULL);
+#endif
+ }
+
if (event_type > description_event->number_of_event_types &&
event_type != FORMAT_DESCRIPTION_EVENT)
{
@@ -1228,6 +1487,11 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len,
event_type= new_event_type;
}
+ if (alg != BINLOG_CHECKSUM_ALG_UNDEF &&
+ (event_type == FORMAT_DESCRIPTION_EVENT ||
+ alg != BINLOG_CHECKSUM_ALG_OFF))
+ event_len= event_len - BINLOG_CHECKSUM_LEN;
+
switch(event_type) {
case QUERY_EVENT:
ev = new Query_log_event(buf, event_len, description_event, QUERY_EVENT);
@@ -1311,6 +1575,9 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len,
case INCIDENT_EVENT:
ev = new Incident_log_event(buf, event_len, description_event);
break;
+ case ANNOTATE_ROWS_EVENT:
+ ev = new Annotate_rows_log_event(buf, event_len, description_event);
+ break;
default:
DBUG_PRINT("error",("Unknown event code: %d",
(int) buf[EVENT_TYPE_OFFSET]));
@@ -1319,6 +1586,14 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len,
}
}
+ if (ev)
+ {
+ ev->checksum_alg= alg;
+ if (ev->checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
+ ev->checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
+ ev->crc= uint4korr(buf + (event_len));
+ }
+
DBUG_PRINT("read_event", ("%s(type_code: %d; event_len: %d)",
ev ? ev->get_type_str() : "<unknown>",
buf[EVENT_TYPE_OFFSET],
@@ -1373,6 +1648,18 @@ void Log_event::print_header(IO_CACHE* file,
my_b_printf(file, " server id %lu end_log_pos %s ", (ulong) server_id,
llstr(log_pos,llbuff));
+ /* print the checksum */
+
+ if (checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
+ checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
+ {
+ char checksum_buf[BINLOG_CHECKSUM_LEN * 2 + 4]; // to fit to "0x%lx "
+ size_t const bytes_written=
+ my_snprintf(checksum_buf, sizeof(checksum_buf), "0x%08lx ", (ulong) crc);
+ my_b_printf(file, "%s ", get_type(&binlog_checksum_typelib, checksum_alg));
+ my_b_printf(file, checksum_buf, bytes_written);
+ }
+
/* mysqlbinlog --hexdump */
if (print_event_info->hexdump_from)
{
@@ -1718,6 +2005,7 @@ log_event_print_value(IO_CACHE *file, const uchar *ptr,
uint64 i64= uint8korr(ptr); /* YYYYMMDDhhmmss */
d= (ulong) (i64 / 1000000);
t= (ulong) (i64 % 1000000);
+
my_b_printf(file, "%04d-%02d-%02d %02d:%02d:%02d",
(int) (d / 10000), (int) (d % 10000) / 100, (int) (d % 100),
(int) (t / 10000), (int) (t % 10000) / 100, (int) t % 100);
@@ -2000,12 +2288,10 @@ end:
delete td;
}
-#ifdef MYSQL_CLIENT
void free_table_map_log_event(Table_map_log_event *event)
{
delete event;
}
-#endif
void Log_event::print_base64(IO_CACHE* file,
PRINT_EVENT_INFO* print_event_info,
@@ -2042,6 +2328,9 @@ void Log_event::print_base64(IO_CACHE* file,
if (print_event_info->verbose)
{
Rows_log_event *ev= NULL;
+ if (checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF &&
+ checksum_alg != BINLOG_CHECKSUM_ALG_OFF)
+ size-= BINLOG_CHECKSUM_LEN; // checksum is displayed through the header
if (ptr[4] == TABLE_MAP_EVENT)
{
@@ -2085,15 +2374,11 @@ void Log_event::print_base64(IO_CACHE* file,
void Log_event::print_timestamp(IO_CACHE* file, time_t* ts)
{
struct tm *res;
+ time_t my_when= when;
DBUG_ENTER("Log_event::print_timestamp");
if (!ts)
- ts = &when;
-#ifdef MYSQL_SERVER // This is always false
- struct tm tm_tmp;
- localtime_r(ts,(res= &tm_tmp));
-#else
+ ts = &my_when;
res=localtime(ts);
-#endif
my_b_printf(file,"%02d%02d%02d %2d:%02d:%02d",
res->tm_year % 100,
@@ -2378,6 +2663,14 @@ bool Query_log_event::write(IO_CACHE* file)
start+= host.length;
}
}
+
+ if (thd && thd->query_start_sec_part_used)
+ {
+ *start++= Q_HRNOW;
+ get_time();
+ int3store(start, when_sec_part);
+ start+= 3;
+ }
/*
NOTE: When adding new status vars, please don't forget to update
the MAX_SIZE_LOG_EVENT_STATUS in log_event.h and update the function
@@ -2404,12 +2697,13 @@ bool Query_log_event::write(IO_CACHE* file)
event_length= (uint) (start-buf) + get_post_header_size_for_derived() + db_len + 1 + q_len;
return (write_header(file, event_length) ||
- my_b_safe_write(file, (uchar*) buf, QUERY_HEADER_LEN) ||
+ wrapper_my_b_safe_write(file, (uchar*) buf, QUERY_HEADER_LEN) ||
write_post_header_for_derived(file) ||
- my_b_safe_write(file, (uchar*) start_of_status,
+ wrapper_my_b_safe_write(file, (uchar*) start_of_status,
(uint) (start-start_of_status)) ||
- my_b_safe_write(file, (db) ? (uchar*) db : (uchar*)"", db_len + 1) ||
- my_b_safe_write(file, (uchar*) query, q_len)) ? 1 : 0;
+ wrapper_my_b_safe_write(file, (db) ? (uchar*) db : (uchar*)"", db_len + 1) ||
+ wrapper_my_b_safe_write(file, (uchar*) query, q_len) ||
+ write_footer(file)) ? 1 : 0;
}
/**
@@ -2469,7 +2763,7 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
error_code= errcode;
- time(&end_time);
+ end_time= my_time(0);
exec_time = (ulong) (end_time - thd_arg->start_time);
/**
@todo this means that if we have no catalog, then it is replicated
@@ -2661,6 +2955,7 @@ code_name(int code)
case Q_CHARSET_DATABASE_CODE: return "Q_CHARSET_DATABASE_CODE";
case Q_TABLE_MAP_FOR_UPDATE_CODE: return "Q_TABLE_MAP_FOR_UPDATE_CODE";
case Q_MASTER_DATA_WRITTEN_CODE: return "Q_MASTER_DATA_WRITTEN_CODE";
+ case Q_HRNOW: return "Q_HRNOW";
}
sprintf(buf, "CODE#%d", code);
return buf;
@@ -2877,6 +3172,14 @@ Query_log_event::Query_log_event(const char* buf, uint event_len,
CHECK_SPACE(pos, end, host.length);
host.str= (char *)pos;
pos+= host.length;
+ break;
+ }
+ case Q_HRNOW:
+ {
+ CHECK_SPACE(pos, end, 3);
+ when_sec_part= uint3korr(pos);
+ pos+= 3;
+ break;
}
default:
/* That's why you must write status vars in growing order of code */
@@ -2956,7 +3259,7 @@ void Query_log_event::print_query_header(IO_CACHE* file,
PRINT_EVENT_INFO* print_event_info)
{
// TODO: print the catalog ??
- char buff[40],*end; // Enough for SET TIMESTAMP
+ char buff[64], *end; // Enough for SET TIMESTAMP
bool different_db= 1;
uint32 tmp;
@@ -2983,6 +3286,11 @@ void Query_log_event::print_query_header(IO_CACHE* file,
}
end=int10_to_str((long) when, strmov(buff,"SET TIMESTAMP="),10);
+ if (when_sec_part)
+ {
+ *end++= '.';
+ end=int10_to_str(when_sec_part, end, 10);
+ }
end= strmov(end, print_event_info->delimiter);
*end++='\n';
my_b_write(file, (uchar*) buff, (uint) (end-buff));
@@ -3244,7 +3552,7 @@ int Query_log_event::do_apply_event(Relay_log_info const *rli,
*/
if (is_trans_keyword() || rpl_filter->db_ok(thd->db))
{
- thd->set_time((time_t)when);
+ thd->set_time(when, when_sec_part);
thd->set_query_and_id((char*)query_arg, q_len_arg,
thd->charset(), next_query_id());
thd->variables.pseudo_thread_id= thread_id; // for temp tables
@@ -3417,6 +3725,19 @@ START SLAVE; . Query: '%s'", expected_error, thd->query());
/* If the query was not ignored, it is printed to the general log */
if (!thd->is_error() || thd->stmt_da->sql_errno() != ER_SLAVE_IGNORED_TABLE)
general_log_write(thd, COM_QUERY, thd->query(), thd->query_length());
+ else
+ {
+ /*
+ Bug#54201: If we skip an INSERT query that uses auto_increment, then we
+ should reset any @@INSERT_ID set by an Intvar_log_event associated with
+ the query; otherwise the @@INSERT_ID will linger until the next INSERT
+ that uses auto_increment and may affect extra triggers on the slave etc.
+
+ We reset INSERT_ID unconditionally; it is probably cheaper than
+ checking if it is necessary.
+ */
+ thd->auto_inc_intervals_forced.empty();
+ }
compare_errors:
/*
@@ -3706,10 +4027,11 @@ bool Start_log_event_v3::write(IO_CACHE* file)
int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version);
memcpy(buff + ST_SERVER_VER_OFFSET,server_version,ST_SERVER_VER_LEN);
if (!dont_set_created)
- created= when= get_time();
+ created= get_time(); // this sets when and when_sec_part as a side effect
int4store(buff + ST_CREATED_OFFSET,created);
return (write_header(file, sizeof(buff)) ||
- my_b_safe_write(file, (uchar*) buff, sizeof(buff)));
+ wrapper_my_b_safe_write(file, (uchar*) buff, sizeof(buff)) ||
+ write_footer(file));
}
#endif
@@ -3812,6 +4134,7 @@ int Start_log_event_v3::do_apply_event(Relay_log_info const *rli)
old 4.0 (binlog version 2) is not supported;
it should not be used for replication with
5.0.
+ @param server_ver a string containing the server version.
*/
Format_description_log_event::
@@ -3827,9 +4150,9 @@ Format_description_log_event(uint8 binlog_ver, const char* server_ver)
common_header_len= LOG_EVENT_HEADER_LEN;
number_of_event_types= LOG_EVENT_TYPES;
/* we'll catch my_malloc() error in is_valid() */
- post_header_len=(uint8*) my_malloc(number_of_event_types*sizeof(uint8),
+ post_header_len=(uint8*) my_malloc(number_of_event_types*sizeof(uint8)
+ + BINLOG_CHECKSUM_ALG_DESC_LEN,
MYF(0));
-
/*
This long list of assignments is not beautiful, but I see no way to
make it nicer, as the right members are #defines, not array members, so
@@ -3894,6 +4217,13 @@ Format_description_log_event(uint8 binlog_ver, const char* server_ver)
post_header_len[INCIDENT_EVENT-1]= INCIDENT_HEADER_LEN;
post_header_len[HEARTBEAT_LOG_EVENT-1]= 0;
+ // Set header length of the reserved events to 0
+ memset(post_header_len + MYSQL_EVENTS_END - 1, 0,
+ (MARIA_EVENTS_BEGIN - MYSQL_EVENTS_END)*sizeof(uint8));
+
+ // Set header lengths of Maria events
+ post_header_len[ANNOTATE_ROWS_EVENT-1]= ANNOTATE_ROWS_HEADER_LEN;
+
// Sanity-check that all post header lengths are initialized.
int i;
for (i=0; i<number_of_event_types; i++)
@@ -3946,6 +4276,7 @@ Format_description_log_event(uint8 binlog_ver, const char* server_ver)
break;
}
calc_server_version_split();
+ checksum_alg= (uint8) BINLOG_CHECKSUM_ALG_UNDEF;
}
@@ -3980,14 +4311,26 @@ Format_description_log_event(const char* buf,
if ((common_header_len=buf[ST_COMMON_HEADER_LEN_OFFSET]) < OLD_HEADER_LEN)
DBUG_VOID_RETURN; /* sanity check */
number_of_event_types=
- event_len-(LOG_EVENT_MINIMAL_HEADER_LEN+ST_COMMON_HEADER_LEN_OFFSET+1);
+ event_len - (LOG_EVENT_MINIMAL_HEADER_LEN + ST_COMMON_HEADER_LEN_OFFSET + 1);
DBUG_PRINT("info", ("common_header_len=%d number_of_event_types=%d",
common_header_len, number_of_event_types));
/* If alloc fails, we'll detect it in is_valid() */
+
post_header_len= (uint8*) my_memdup((uchar*)buf+ST_COMMON_HEADER_LEN_OFFSET+1,
number_of_event_types*
- sizeof(*post_header_len), MYF(0));
+ sizeof(*post_header_len),
+ MYF(0));
calc_server_version_split();
+ if (!is_version_before_checksum(&server_version_split))
+ {
+ /* the last bytes are the checksum alg desc and value (or value's room) */
+ number_of_event_types -= BINLOG_CHECKSUM_ALG_DESC_LEN;
+ checksum_alg= post_header_len[number_of_event_types];
+ }
+ else
+ {
+ checksum_alg= (uint8) BINLOG_CHECKSUM_ALG_UNDEF;
+ }
/*
In some previous versions, the events were given other event type
@@ -4098,21 +4441,59 @@ Format_description_log_event(const char* buf,
#ifndef MYSQL_CLIENT
bool Format_description_log_event::write(IO_CACHE* file)
{
+ bool ret;
+ bool no_checksum;
/*
We don't call Start_log_event_v3::write() because this would make 2
my_b_safe_write().
*/
- uchar buff[FORMAT_DESCRIPTION_HEADER_LEN];
+ uchar buff[FORMAT_DESCRIPTION_HEADER_LEN + BINLOG_CHECKSUM_ALG_DESC_LEN];
+ size_t rec_size= sizeof(buff);
int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version);
memcpy((char*) buff + ST_SERVER_VER_OFFSET,server_version,ST_SERVER_VER_LEN);
if (!dont_set_created)
- created= when= get_time();
+ created= get_time();
int4store(buff + ST_CREATED_OFFSET,created);
buff[ST_COMMON_HEADER_LEN_OFFSET]= LOG_EVENT_HEADER_LEN;
- memcpy((char*) buff+ST_COMMON_HEADER_LEN_OFFSET+1, (uchar*) post_header_len,
+ memcpy((char*) buff+ST_COMMON_HEADER_LEN_OFFSET + 1, (uchar*) post_header_len,
LOG_EVENT_TYPES);
- return (write_header(file, sizeof(buff)) ||
- my_b_safe_write(file, buff, sizeof(buff)));
+ /*
+ if checksum is requested
+ record the checksum-algorithm descriptor next to
+ post_header_len vector which will be followed by the checksum value.
+ Master is supposed to trigger checksum computing by binlog_checksum_options,
+ slave does it via marking the event according to
+ FD_queue checksum_alg value.
+ */
+ compile_time_assert(sizeof(BINLOG_CHECKSUM_ALG_DESC_LEN == 1));
+#ifndef DBUG_OFF
+ data_written= 0; // to prepare for need_checksum assert
+#endif
+ buff[FORMAT_DESCRIPTION_HEADER_LEN]= need_checksum() ?
+ checksum_alg : (uint8) BINLOG_CHECKSUM_ALG_OFF;
+ /*
+ FD of checksum-aware server is always checksum-equipped, (V) is in,
+ regardless of @@global.binlog_checksum policy.
+ Thereby a combination of (A) == 0, (V) != 0 means
+ it's the checksum-aware server's FD event that heads checksum-free binlog
+ file.
+ Here 0 stands for checksumming OFF to evaluate (V) as 0 is that case.
+ A combination of (A) != 0, (V) != 0 denotes FD of the checksum-aware server
+ heading the checksummed binlog.
+ (A), (V) presence in FD of the checksum-aware server makes the event
+ 1 + 4 bytes bigger comparing to the former FD.
+ */
+
+ if ((no_checksum= (checksum_alg == BINLOG_CHECKSUM_ALG_OFF)))
+ {
+ checksum_alg= BINLOG_CHECKSUM_ALG_CRC32; // Forcing (V) room to fill anyway
+ }
+ ret= (write_header(file, rec_size) ||
+ wrapper_my_b_safe_write(file, buff, rec_size) ||
+ write_footer(file));
+ if (no_checksum)
+ checksum_alg= BINLOG_CHECKSUM_ALG_OFF;
+ return ret;
}
#endif
@@ -4207,6 +4588,30 @@ Format_description_log_event::do_shall_skip(Relay_log_info *rli)
#endif
+static inline void
+do_server_version_split(char* version,
+ Format_description_log_event::master_version_split *split_versions)
+{
+ char *p= version, *r;
+ ulong number;
+ for (uint i= 0; i<=2; i++)
+ {
+ number= strtoul(p, &r, 10);
+ split_versions->ver[i]= (uchar) number;
+ DBUG_ASSERT(number < 256); // fit in uchar
+ p= r;
+ DBUG_ASSERT(!((i == 0) && (*r != '.'))); // should be true in practice
+ if (*r == '.')
+ p++; // skip the dot
+ }
+ if (strstr(p, "MariaDB") != 0 || strstr(p, "-maria-") != 0)
+ split_versions->kind=
+ Format_description_log_event::master_version_split::KIND_MARIADB;
+ else
+ split_versions->kind=
+ Format_description_log_event::master_version_split::KIND_MYSQL;
+}
+
/**
Splits the event's 'server_version' string into three numeric pieces stored
@@ -4219,24 +4624,67 @@ Format_description_log_event::do_shall_skip(Relay_log_info *rli)
*/
void Format_description_log_event::calc_server_version_split()
{
- char *p= server_version, *r;
- ulong number;
- for (uint i= 0; i<=2; i++)
- {
- number= strtoul(p, &r, 10);
- server_version_split[i]= (uchar)number;
- DBUG_ASSERT(number < 256); // fit in uchar
- p= r;
- DBUG_ASSERT(!((i == 0) && (*r != '.'))); // should be true in practice
- if (*r == '.')
- p++; // skip the dot
- }
+ do_server_version_split(server_version, &server_version_split);
+
DBUG_PRINT("info",("Format_description_log_event::server_version_split:"
" '%s' %d %d %d", server_version,
- server_version_split[0],
- server_version_split[1], server_version_split[2]));
+ server_version_split.ver[0],
+ server_version_split.ver[1], server_version_split.ver[2]));
+}
+
+static inline ulong
+version_product(const Format_description_log_event::master_version_split* version_split)
+{
+ return ((version_split->ver[0] * 256 + version_split->ver[1]) * 256
+ + version_split->ver[2]);
+}
+
+/**
+ @return TRUE is the event's version is earlier than one that introduced
+ the replication event checksum. FALSE otherwise.
+*/
+bool
+Format_description_log_event::is_version_before_checksum(const master_version_split
+ *version_split)
+{
+ return version_product(version_split) <
+ (version_split->kind == master_version_split::KIND_MARIADB ?
+ checksum_version_product_mariadb : checksum_version_product_mysql);
}
+/**
+ @param buf buffer holding serialized FD event
+ @param len netto (possible checksum is stripped off) length of the event buf
+
+ @return the version-safe checksum alg descriptor where zero
+ designates no checksum, 255 - the orginator is
+ checksum-unaware (effectively no checksum) and the actuall
+ [1-254] range alg descriptor.
+*/
+uint8 get_checksum_alg(const char* buf, ulong len)
+{
+ uint8 ret;
+ char version[ST_SERVER_VER_LEN];
+ Format_description_log_event::master_version_split version_split;
+
+ DBUG_ENTER("get_checksum_alg");
+ DBUG_ASSERT(buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT);
+
+ memcpy(version, buf +
+ buf[LOG_EVENT_MINIMAL_HEADER_LEN + ST_COMMON_HEADER_LEN_OFFSET]
+ + ST_SERVER_VER_OFFSET, ST_SERVER_VER_LEN);
+ version[ST_SERVER_VER_LEN - 1]= 0;
+
+ do_server_version_split(version, &version_split);
+ ret= Format_description_log_event::is_version_before_checksum(&version_split) ?
+ (uint8) BINLOG_CHECKSUM_ALG_UNDEF :
+ * (uint8*) (buf + len - BINLOG_CHECKSUM_LEN - BINLOG_CHECKSUM_ALG_DESC_LEN);
+ DBUG_ASSERT(ret == BINLOG_CHECKSUM_ALG_OFF ||
+ ret == BINLOG_CHECKSUM_ALG_UNDEF ||
+ ret == BINLOG_CHECKSUM_ALG_CRC32);
+ DBUG_RETURN(ret);
+}
+
/**************************************************************************
Load_log_event methods
@@ -4543,8 +4991,8 @@ Load_log_event::Load_log_event(const char *buf, uint event_len,
*/
if (event_len)
copy_log_event(buf, event_len,
- ((buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ?
- LOAD_HEADER_LEN +
+ (((uchar)buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ?
+ LOAD_HEADER_LEN +
description_event->common_header_len :
LOAD_HEADER_LEN + LOG_EVENT_HEADER_LEN),
description_event);
@@ -4581,7 +5029,7 @@ int Load_log_event::copy_log_event(const char *buf, ulong event_len,
*/
if (!(field_lens= (uchar*)sql_ex.init((char*)buf + body_offset,
buf_end,
- buf[EVENT_TYPE_OFFSET] != LOAD_EVENT)))
+ (uchar)buf[EVENT_TYPE_OFFSET] != LOAD_EVENT)))
DBUG_RETURN(1);
data_len = event_len - body_offset;
@@ -4819,7 +5267,7 @@ int Load_log_event::do_apply_event(NET* net, Relay_log_info const *rli,
*/
if (rpl_filter->db_ok(thd->db))
{
- thd->set_time((time_t)when);
+ thd->set_time(when, when_sec_part);
thd->set_query_id(next_query_id());
thd->warning_info->opt_clear_warning_info(thd->query_id);
@@ -5093,6 +5541,7 @@ Rotate_log_event::Rotate_log_event(const char* new_log_ident_arg,
DBUG_PRINT("enter",("new_log_ident: %s pos: %s flags: %lu", new_log_ident_arg,
llstr(pos_arg, buff), (ulong) flags));
#endif
+ cache_type= EVENT_NO_CACHE;
if (flags & DUP_NAME)
new_log_ident= my_strndup(new_log_ident_arg, ident_len, MYF(MY_WME));
if (flags & RELAY_LOG)
@@ -5134,9 +5583,11 @@ bool Rotate_log_event::write(IO_CACHE* file)
{
char buf[ROTATE_HEADER_LEN];
int8store(buf + R_POS_OFFSET, pos);
- return (write_header(file, ROTATE_HEADER_LEN + ident_len) ||
- my_b_safe_write(file, (uchar*)buf, ROTATE_HEADER_LEN) ||
- my_b_safe_write(file, (uchar*)new_log_ident, (uint) ident_len));
+ return (write_header(file, ROTATE_HEADER_LEN + ident_len) ||
+ wrapper_my_b_safe_write(file, (uchar*) buf, ROTATE_HEADER_LEN) ||
+ wrapper_my_b_safe_write(file, (uchar*) new_log_ident,
+ (uint) ident_len) ||
+ write_footer(file));
}
#endif
@@ -5305,7 +5756,8 @@ bool Intvar_log_event::write(IO_CACHE* file)
buf[I_TYPE_OFFSET]= (uchar) type;
int8store(buf + I_VAL_OFFSET, val);
return (write_header(file, sizeof(buf)) ||
- my_b_safe_write(file, buf, sizeof(buf)));
+ wrapper_my_b_safe_write(file, buf, sizeof(buf)) ||
+ write_footer(file));
}
#endif
@@ -5433,7 +5885,8 @@ bool Rand_log_event::write(IO_CACHE* file)
int8store(buf + RAND_SEED1_OFFSET, seed1);
int8store(buf + RAND_SEED2_OFFSET, seed2);
return (write_header(file, sizeof(buf)) ||
- my_b_safe_write(file, buf, sizeof(buf)));
+ wrapper_my_b_safe_write(file, buf, sizeof(buf)) ||
+ write_footer(file));
}
#endif
@@ -5535,8 +5988,9 @@ Xid_log_event(const char* buf,
bool Xid_log_event::write(IO_CACHE* file)
{
DBUG_EXECUTE_IF("do_not_write_xid", return 0;);
- return write_header(file, sizeof(xid)) ||
- my_b_safe_write(file, (uchar*) &xid, sizeof(xid));
+ return (write_header(file, sizeof(xid)) ||
+ wrapper_my_b_safe_write(file, (uchar*) &xid, sizeof(xid)) ||
+ write_footer(file));
}
#endif
@@ -5715,8 +6169,21 @@ User_var_log_event(const char* buf,
we keep the flags set to UNDEF_F.
*/
uint bytes_read= ((val + val_len) - start);
- DBUG_ASSERT(bytes_read==data_written ||
- bytes_read==(data_written-1));
+#ifndef DBUG_OFF
+ bool old_pre_checksum_fd= description_event->is_version_before_checksum(
+ &description_event->server_version_split);
+#endif
+ DBUG_ASSERT((bytes_read == data_written -
+ (old_pre_checksum_fd ||
+ (description_event->checksum_alg ==
+ BINLOG_CHECKSUM_ALG_OFF)) ?
+ 0 : BINLOG_CHECKSUM_LEN)
+ ||
+ (bytes_read == data_written -1 -
+ (old_pre_checksum_fd ||
+ (description_event->checksum_alg ==
+ BINLOG_CHECKSUM_ALG_OFF)) ?
+ 0 : BINLOG_CHECKSUM_LEN));
if ((data_written - bytes_read) > 0)
{
flags= (uint) *(buf + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
@@ -5784,11 +6251,12 @@ bool User_var_log_event::write(IO_CACHE* file)
event_length= sizeof(buf)+ name_len + buf1_length + val_len + unsigned_len;
return (write_header(file, event_length) ||
- my_b_safe_write(file, (uchar*) buf, sizeof(buf)) ||
- my_b_safe_write(file, (uchar*) name, name_len) ||
- my_b_safe_write(file, (uchar*) buf1, buf1_length) ||
- my_b_safe_write(file, pos, val_len) ||
- my_b_safe_write(file, &flags, unsigned_len));
+ wrapper_my_b_safe_write(file, (uchar*) buf, sizeof(buf)) ||
+ wrapper_my_b_safe_write(file, (uchar*) name, name_len) ||
+ wrapper_my_b_safe_write(file, (uchar*) buf1, buf1_length) ||
+ wrapper_my_b_safe_write(file, pos, val_len) ||
+ wrapper_my_b_safe_write(file, &flags, unsigned_len) ||
+ write_footer(file));
}
#endif
@@ -6315,7 +6783,7 @@ Create_file_log_event::Create_file_log_event(const char* buf, uint len,
uint8 create_file_header_len= description_event->post_header_len[CREATE_FILE_EVENT-1];
if (!(event_buf= (char*) my_memdup(buf, len, MYF(MY_WME))) ||
copy_log_event(event_buf,len,
- ((buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ?
+ (((uchar)buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ?
load_header_len + header_len :
(fake_base ? (header_len+load_header_len) :
(header_len+load_header_len) +
@@ -6554,8 +7022,9 @@ bool Append_block_log_event::write(IO_CACHE* file)
uchar buf[APPEND_BLOCK_HEADER_LEN];
int4store(buf + AB_FILE_ID_OFFSET, file_id);
return (write_header(file, APPEND_BLOCK_HEADER_LEN + block_len) ||
- my_b_safe_write(file, buf, APPEND_BLOCK_HEADER_LEN) ||
- my_b_safe_write(file, (uchar*) block, block_len));
+ wrapper_my_b_safe_write(file, buf, APPEND_BLOCK_HEADER_LEN) ||
+ wrapper_my_b_safe_write(file, (uchar*) block, block_len) ||
+ write_footer(file));
}
#endif
@@ -6713,7 +7182,8 @@ bool Delete_file_log_event::write(IO_CACHE* file)
uchar buf[DELETE_FILE_HEADER_LEN];
int4store(buf + DF_FILE_ID_OFFSET, file_id);
return (write_header(file, sizeof(buf)) ||
- my_b_safe_write(file, buf, sizeof(buf)));
+ wrapper_my_b_safe_write(file, buf, sizeof(buf)) ||
+ write_footer(file));
}
#endif
@@ -6810,7 +7280,8 @@ bool Execute_load_log_event::write(IO_CACHE* file)
uchar buf[EXEC_LOAD_HEADER_LEN];
int4store(buf + EL_FILE_ID_OFFSET, file_id);
return (write_header(file, sizeof(buf)) ||
- my_b_safe_write(file, buf, sizeof(buf)));
+ wrapper_my_b_safe_write(file, buf, sizeof(buf)) ||
+ write_footer(file));
}
#endif
@@ -6872,16 +7343,17 @@ int Execute_load_log_event::do_apply_event(Relay_log_info const *rli)
fname);
goto err;
}
- if (!(lev = (Load_log_event*)Log_event::read_log_event(&file,
- (mysql_mutex_t*)0,
- rli->relay_log.description_event_for_exec)) ||
+ if (!(lev= (Load_log_event*)
+ Log_event::read_log_event(&file,
+ (mysql_mutex_t*)0,
+ rli->relay_log.description_event_for_exec,
+ opt_slave_sql_verify_checksum)) ||
lev->get_type_code() != NEW_LOAD_EVENT)
{
rli->report(ERROR_LEVEL, 0, "Error in Exec_load event: "
"file '%s' appears corrupted", fname);
goto err;
}
-
lev->thd = thd;
/*
lev->do_apply_event should use rli only for errors i.e. should
@@ -7044,7 +7516,7 @@ Execute_load_query_log_event::write_post_header_for_derived(IO_CACHE* file)
int4store(buf + 4, fn_pos_start);
int4store(buf + 4 + 4, fn_pos_end);
*(buf + 4 + 4 + 4)= (uchar) dup_handling;
- return my_b_safe_write(file, buf, EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN);
+ return wrapper_my_b_safe_write(file, buf, EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN);
}
#endif
@@ -7280,7 +7752,8 @@ Rows_log_event::Rows_log_event(THD *thd_arg, TABLE *tbl_arg, ulong tid,
m_width(tbl_arg ? tbl_arg->s->fields : 1),
m_rows_buf(0), m_rows_cur(0), m_rows_end(0), m_flags(0)
#ifdef HAVE_REPLICATION
- , m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL)
+ , m_curr_row(NULL), m_curr_row_end(NULL),
+ m_key(NULL), m_key_info(NULL), m_key_nr(0)
#endif
{
/*
@@ -7328,7 +7801,8 @@ Rows_log_event::Rows_log_event(const char *buf, uint event_len,
#endif
m_table_id(0), m_rows_buf(0), m_rows_cur(0), m_rows_end(0)
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
- , m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL)
+ , m_curr_row(NULL), m_curr_row_end(NULL),
+ m_key(NULL), m_key_info(NULL), m_key_nr(0)
#endif
{
DBUG_ENTER("Rows_log_event::Rows_log_event(const char*,...)");
@@ -7678,7 +8152,7 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table);
}
#ifdef HAVE_QUERY_CACHE
- query_cache.invalidate_locked_for_write(rli->tables_to_lock);
+ query_cache.invalidate_locked_for_write(thd, rli->tables_to_lock);
#endif
}
@@ -7705,7 +8179,7 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
TIMESTAMP column to a table with one.
So we call set_time(), like in SBR. Presently it changes nothing.
*/
- thd->set_time((time_t)when);
+ thd->set_time(when, when_sec_part);
/*
Now we are in a statement and will stay in a statement until we
@@ -8007,11 +8481,11 @@ bool Rows_log_event::write_data_header(IO_CACHE *file)
{
int4store(buf + 0, m_table_id);
int2store(buf + 4, m_flags);
- return (my_b_safe_write(file, buf, 6));
+ return (wrapper_my_b_safe_write(file, buf, 6));
});
int6store(buf + RW_MAPID_OFFSET, (ulonglong)m_table_id);
int2store(buf + RW_FLAGS_OFFSET, m_flags);
- return (my_b_safe_write(file, buf, ROWS_HEADER_LEN));
+ return (wrapper_my_b_safe_write(file, buf, ROWS_HEADER_LEN));
}
bool Rows_log_event::write_data_body(IO_CACHE*file)
@@ -8027,10 +8501,10 @@ bool Rows_log_event::write_data_body(IO_CACHE*file)
DBUG_ASSERT(static_cast<size_t>(sbuf_end - sbuf) <= sizeof(sbuf));
DBUG_DUMP("m_width", sbuf, (size_t) (sbuf_end - sbuf));
- res= res || my_b_safe_write(file, sbuf, (size_t) (sbuf_end - sbuf));
+ res= res || wrapper_my_b_safe_write(file, sbuf, (size_t) (sbuf_end - sbuf));
DBUG_DUMP("m_cols", (uchar*) m_cols.bitmap, no_bytes_in_map(&m_cols));
- res= res || my_b_safe_write(file, (uchar*) m_cols.bitmap,
+ res= res || wrapper_my_b_safe_write(file, (uchar*) m_cols.bitmap,
no_bytes_in_map(&m_cols));
/*
TODO[refactor write]: Remove the "down cast" here (and elsewhere).
@@ -8039,11 +8513,11 @@ bool Rows_log_event::write_data_body(IO_CACHE*file)
{
DBUG_DUMP("m_cols_ai", (uchar*) m_cols_ai.bitmap,
no_bytes_in_map(&m_cols_ai));
- res= res || my_b_safe_write(file, (uchar*) m_cols_ai.bitmap,
+ res= res || wrapper_my_b_safe_write(file, (uchar*) m_cols_ai.bitmap,
no_bytes_in_map(&m_cols_ai));
}
DBUG_DUMP("rows", m_rows_buf, data_size);
- res= res || my_b_safe_write(file, m_rows_buf, (size_t) data_size);
+ res= res || wrapper_my_b_safe_write(file, m_rows_buf, (size_t) data_size);
return res;
@@ -8088,6 +8562,144 @@ void Rows_log_event::print_helper(FILE *file,
#endif
/**************************************************************************
+ Annotate_rows_log_event member functions
+**************************************************************************/
+
+#ifndef MYSQL_CLIENT
+Annotate_rows_log_event::Annotate_rows_log_event(THD *thd,
+ uint16 cache_type_arg)
+ : Log_event(thd, 0, true),
+ m_save_thd_query_txt(0),
+ m_save_thd_query_len(0)
+{
+ m_query_txt= thd->query();
+ m_query_len= thd->query_length();
+ cache_type= cache_type_arg;
+}
+#endif
+
+Annotate_rows_log_event::Annotate_rows_log_event(const char *buf,
+ uint event_len,
+ const Format_description_log_event *desc)
+ : Log_event(buf, desc),
+ m_save_thd_query_txt(0),
+ m_save_thd_query_len(0)
+{
+ m_query_len= event_len - desc->common_header_len;
+ m_query_txt= (char*) buf + desc->common_header_len;
+}
+
+Annotate_rows_log_event::~Annotate_rows_log_event()
+{
+#ifndef MYSQL_CLIENT
+ if (m_save_thd_query_txt)
+ thd->set_query(m_save_thd_query_txt, m_save_thd_query_len);
+#endif
+}
+
+int Annotate_rows_log_event::get_data_size()
+{
+ return m_query_len;
+}
+
+Log_event_type Annotate_rows_log_event::get_type_code()
+{
+ return ANNOTATE_ROWS_EVENT;
+}
+
+bool Annotate_rows_log_event::is_valid() const
+{
+ return (m_query_txt != NULL && m_query_len != 0);
+}
+
+#ifndef MYSQL_CLIENT
+bool Annotate_rows_log_event::write_data_header(IO_CACHE *file)
+{
+ return 0;
+}
+#endif
+
+#ifndef MYSQL_CLIENT
+bool Annotate_rows_log_event::write_data_body(IO_CACHE *file)
+{
+ return wrapper_my_b_safe_write(file, (uchar*) m_query_txt, m_query_len);
+}
+#endif
+
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+void Annotate_rows_log_event::pack_info(Protocol* protocol)
+{
+ if (m_query_txt && m_query_len)
+ protocol->store(m_query_txt, m_query_len, &my_charset_bin);
+}
+#endif
+
+#ifdef MYSQL_CLIENT
+void Annotate_rows_log_event::print(FILE *file, PRINT_EVENT_INFO *pinfo)
+{
+ if (pinfo->short_form)
+ return;
+
+ print_header(&pinfo->head_cache, pinfo, TRUE);
+ my_b_printf(&pinfo->head_cache, "\tAnnotate_rows:\n");
+
+ char *pbeg; // beginning of the next line
+ char *pend; // end of the next line
+ uint cnt= 0; // characters counter
+
+ for (pbeg= m_query_txt; ; pbeg= pend)
+ {
+ // skip all \r's and \n's at the beginning of the next line
+ for (;; pbeg++)
+ {
+ if (++cnt > m_query_len)
+ return;
+
+ if (*pbeg != '\r' && *pbeg != '\n')
+ break;
+ }
+
+ // find end of the next line
+ for (pend= pbeg + 1;
+ ++cnt <= m_query_len && *pend != '\r' && *pend != '\n';
+ pend++)
+ ;
+
+ // print next line
+ my_b_write(&pinfo->head_cache, (const uchar*) "#Q> ", 4);
+ my_b_write(&pinfo->head_cache, (const uchar*) pbeg, pend - pbeg);
+ my_b_write(&pinfo->head_cache, (const uchar*) "\n", 1);
+ }
+}
+#endif
+
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+int Annotate_rows_log_event::do_apply_event(Relay_log_info const *rli)
+{
+ m_save_thd_query_txt= thd->query();
+ m_save_thd_query_len= thd->query_length();
+ thd->set_query(m_query_txt, m_query_len);
+ return 0;
+}
+#endif
+
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+int Annotate_rows_log_event::do_update_pos(Relay_log_info *rli)
+{
+ rli->inc_event_relay_log_pos();
+ return 0;
+}
+#endif
+
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+Log_event::enum_skip_reason
+Annotate_rows_log_event::do_shall_skip(Relay_log_info *rli)
+{
+ return continue_group(rli);
+}
+#endif
+
+/**************************************************************************
Table_map_log_event member functions and support functions
**************************************************************************/
@@ -8587,11 +9199,11 @@ bool Table_map_log_event::write_data_header(IO_CACHE *file)
{
int4store(buf + 0, m_table_id);
int2store(buf + 4, m_flags);
- return (my_b_safe_write(file, buf, 6));
+ return (wrapper_my_b_safe_write(file, buf, 6));
});
int6store(buf + TM_MAPID_OFFSET, (ulonglong)m_table_id);
int2store(buf + TM_FLAGS_OFFSET, m_flags);
- return (my_b_safe_write(file, buf, TABLE_MAP_HEADER_LEN));
+ return (wrapper_my_b_safe_write(file, buf, TABLE_MAP_HEADER_LEN));
}
bool Table_map_log_event::write_data_body(IO_CACHE *file)
@@ -8615,15 +9227,15 @@ bool Table_map_log_event::write_data_body(IO_CACHE *file)
uchar mbuf[sizeof(m_field_metadata_size)];
uchar *const mbuf_end= net_store_length(mbuf, m_field_metadata_size);
- return (my_b_safe_write(file, dbuf, sizeof(dbuf)) ||
- my_b_safe_write(file, (const uchar*)m_dbnam, m_dblen+1) ||
- my_b_safe_write(file, tbuf, sizeof(tbuf)) ||
- my_b_safe_write(file, (const uchar*)m_tblnam, m_tbllen+1) ||
- my_b_safe_write(file, cbuf, (size_t) (cbuf_end - cbuf)) ||
- my_b_safe_write(file, m_coltype, m_colcnt) ||
- my_b_safe_write(file, mbuf, (size_t) (mbuf_end - mbuf)) ||
- my_b_safe_write(file, m_field_metadata, m_field_metadata_size),
- my_b_safe_write(file, m_null_bits, (m_colcnt + 7) / 8));
+ return (wrapper_my_b_safe_write(file, dbuf, sizeof(dbuf)) ||
+ wrapper_my_b_safe_write(file, (const uchar*)m_dbnam, m_dblen+1) ||
+ wrapper_my_b_safe_write(file, tbuf, sizeof(tbuf)) ||
+ wrapper_my_b_safe_write(file, (const uchar*)m_tblnam, m_tbllen+1) ||
+ wrapper_my_b_safe_write(file, cbuf, (size_t) (cbuf_end - cbuf)) ||
+ wrapper_my_b_safe_write(file, m_coltype, m_colcnt) ||
+ wrapper_my_b_safe_write(file, mbuf, (size_t) (mbuf_end - mbuf)) ||
+ wrapper_my_b_safe_write(file, m_field_metadata, m_field_metadata_size),
+ wrapper_my_b_safe_write(file, m_null_bits, (m_colcnt + 7) / 8));
}
#endif
@@ -9224,6 +9836,86 @@ record_compare_exit:
return result;
}
+
+/**
+ Find the best key to use when locating the row in @c find_row().
+
+ A primary key is preferred if it exists; otherwise a unique index is
+ preferred. Else we pick the index with the smalles rec_per_key value.
+
+ If a suitable key is found, set @c m_key, @c m_key_nr and @c m_key_info
+ member fields appropriately.
+
+ @returns Error code on failure, 0 on success.
+*/
+int Rows_log_event::find_key()
+{
+ uint i, best_key_nr, last_part;
+ KEY *key, *best_key;
+ ulong best_rec_per_key, tmp;
+ DBUG_ENTER("Rows_log_event::find_key");
+ DBUG_ASSERT(m_table);
+
+ best_key_nr= MAX_KEY;
+ LINT_INIT(best_key);
+ LINT_INIT(best_rec_per_key);
+
+ /*
+ Keys are sorted so that any primary key is first, followed by unique keys,
+ followed by any other. So we will automatically pick the primary key if
+ it exists.
+ */
+ for (i= 0, key= m_table->key_info; i < m_table->s->keys; i++, key++)
+ {
+ if (!m_table->s->keys_in_use.is_set(i))
+ continue;
+ /*
+ We cannot use a unique key with NULL-able columns to uniquely identify
+ a row (but we can still select it for range scan below if nothing better
+ is available).
+ */
+ if ((key->flags & (HA_NOSAME | HA_NULL_PART_KEY)) == HA_NOSAME)
+ {
+ best_key_nr= i;
+ best_key= key;
+ break;
+ }
+ /*
+ We can only use a non-unique key if it allows range scans (ie. skip
+ FULLTEXT indexes and such).
+ */
+ last_part= key->key_parts - 1;
+ DBUG_PRINT("info", ("Index %s rec_per_key[%u]= %lu",
+ key->name, last_part, key->rec_per_key[last_part]));
+ if (!(m_table->file->index_flags(i, last_part, 1) & HA_READ_NEXT))
+ continue;
+
+ tmp= key->rec_per_key[last_part];
+ if (best_key_nr == MAX_KEY || (tmp > 0 && tmp < best_rec_per_key))
+ {
+ best_key_nr= i;
+ best_key= key;
+ best_rec_per_key= tmp;
+ }
+ }
+
+ if (best_key_nr == MAX_KEY)
+ {
+ m_key_info= NULL;
+ DBUG_RETURN(0);
+ }
+
+ // Allocate buffer for key searches
+ m_key= (uchar *) my_malloc(best_key->key_length, MYF(MY_WME));
+ if (m_key == NULL)
+ DBUG_RETURN(HA_ERR_OUT_OF_MEM);
+ m_key_info= best_key;
+ m_key_nr= best_key_nr;
+
+ DBUG_RETURN(0);;
+}
+
+
/**
Locate the current row in event's table.
@@ -9323,12 +10015,17 @@ int Rows_log_event::find_row(const Relay_log_info *rli)
*/
store_record(table,record[1]);
- if (table->s->keys > 0 && table->s->keys_in_use.is_set(0))
+ if (m_key_info)
{
- DBUG_PRINT("info",("locating record using primary key (index_read)"));
+ DBUG_PRINT("info",("locating record using key #%u [%s] (index_read)",
+ m_key_nr, m_key_info->name));
+ /* We use this to test that the correct key is used in test cases. */
+ DBUG_EXECUTE_IF("slave_crash_if_wrong_index",
+ if(0 != strcmp(m_key_info->name,"expected_key")) abort(););
- /* The 0th key is active: search the table using the index */
- if (!table->file->inited && (error= table->file->ha_index_init(0, FALSE)))
+ /* The key is active: search the table using the index */
+ if (!table->file->inited &&
+ (error= table->file->ha_index_init(m_key_nr, FALSE)))
{
DBUG_PRINT("info",("ha_index_init returns error %d",error));
table->file->print_error(error, MYF(0));
@@ -9338,14 +10035,14 @@ int Rows_log_event::find_row(const Relay_log_info *rli)
/* Fill key data for the row */
DBUG_ASSERT(m_key);
- key_copy(m_key, table->record[0], table->key_info, 0);
+ key_copy(m_key, table->record[0], m_key_info, 0);
/*
Don't print debug messages when running valgrind since they can
trigger false warnings.
*/
#ifndef HAVE_valgrind
- DBUG_DUMP("key data", m_key, table->key_info->key_length);
+ DBUG_DUMP("key data", m_key, m_key_info->key_length);
#endif
/*
@@ -9431,6 +10128,8 @@ int Rows_log_event::find_row(const Relay_log_info *rli)
record we are looking for is stored in record[1].
*/
DBUG_PRINT("info",("non-unique index, scanning it to find matching record"));
+ /* We use this to test that the correct key is used in test cases. */
+ DBUG_EXECUTE_IF("slave_crash_if_index_scan", abort(););
while (record_compare(table))
{
@@ -9469,6 +10168,8 @@ int Rows_log_event::find_row(const Relay_log_info *rli)
else
{
DBUG_PRINT("info",("locating record using table scan (rnd_next)"));
+ /* We use this to test that the correct key is used in test cases. */
+ DBUG_EXECUTE_IF("slave_crash_if_table_scan", abort(););
int restart_count= 0; // Number of times scanning has restarted from top
@@ -9588,14 +10289,7 @@ Delete_rows_log_event::do_before_row_operations(const Slave_reporting_capability
return 0;
}
- if (m_table->s->keys > 0)
- {
- // Allocate buffer for key searches
- m_key= (uchar*)my_malloc(m_table->key_info->key_length, MYF(MY_WME));
- if (!m_key)
- return HA_ERR_OUT_OF_MEM;
- }
- return 0;
+ return find_key();
}
int
@@ -9606,6 +10300,7 @@ Delete_rows_log_event::do_after_row_operations(const Slave_reporting_capability
m_table->file->ha_index_or_rnd_end();
my_free(m_key);
m_key= NULL;
+ m_key_info= NULL;
return error;
}
@@ -9708,13 +10403,9 @@ Update_rows_log_event::Update_rows_log_event(const char *buf, uint event_len,
int
Update_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const)
{
- if (m_table->s->keys > 0)
- {
- // Allocate buffer for key searches
- m_key= (uchar*)my_malloc(m_table->key_info->key_length, MYF(MY_WME));
- if (!m_key)
- return HA_ERR_OUT_OF_MEM;
- }
+ int err;
+ if ((err= find_key()))
+ return err;
m_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
@@ -9729,6 +10420,7 @@ Update_rows_log_event::do_after_row_operations(const Slave_reporting_capability
m_table->file->ha_index_or_rnd_end();
my_free(m_key); // Free for multi_malloc
m_key= NULL;
+ m_key_info= NULL;
return error;
}
@@ -9904,13 +10596,25 @@ Incident_log_event::write_data_header(IO_CACHE *file)
DBUG_PRINT("enter", ("m_incident: %d", m_incident));
uchar buf[sizeof(int16)];
int2store(buf, (int16) m_incident);
- DBUG_RETURN(my_b_safe_write(file, buf, sizeof(buf)));
+#ifndef MYSQL_CLIENT
+ DBUG_RETURN(wrapper_my_b_safe_write(file, buf, sizeof(buf)));
+#else
+ DBUG_RETURN(my_b_safe_write(file, buf, sizeof(buf)));
+#endif
}
bool
Incident_log_event::write_data_body(IO_CACHE *file)
{
+ uchar tmp[1];
DBUG_ENTER("Incident_log_event::write_data_body");
+ tmp[0]= (uchar) m_message.length;
+ crc= my_checksum(crc, (uchar*) tmp, 1);
+ if (m_message.length > 0)
+ {
+ crc= my_checksum(crc, (uchar*) m_message.str, m_message.length);
+ // todo: report a bug on write_str accepts uint but treats it as uchar
+ }
DBUG_RETURN(write_str(file, m_message.str, (uint) m_message.length));
}