summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
Diffstat (limited to 'sql')
-rw-r--r--sql/handler.cc9
-rw-r--r--sql/log.cc24
-rw-r--r--sql/log.h3
-rw-r--r--sql/log_event.cc164
-rw-r--r--sql/log_event.h62
-rw-r--r--sql/mysql_priv.h4
-rw-r--r--sql/mysqld.cc15
-rw-r--r--sql/rpl_rli.cc4
-rw-r--r--sql/rpl_rli.h38
-rw-r--r--sql/set_var.cc3
-rw-r--r--sql/slave.cc63
-rw-r--r--sql/slave.h1
-rw-r--r--sql/sql_binlog.cc2
-rw-r--r--sql/sql_class.h4
-rw-r--r--sql/sql_insert.cc5
-rw-r--r--sql/sql_repl.cc74
16 files changed, 409 insertions, 66 deletions
diff --git a/sql/handler.cc b/sql/handler.cc
index e851d9248c2..9bfbc816d3f 100644
--- a/sql/handler.cc
+++ b/sql/handler.cc
@@ -4699,7 +4699,8 @@ static bool check_table_binlog_row_based(THD *thd, TABLE *table)
/** @brief
Write table maps for all (manually or automatically) locked tables
- to the binary log.
+ to the binary log. Also, if binlog_annotate_rows_events is ON,
+ write Annotate_rows event before the first table map.
SYNOPSIS
write_locked_table_maps()
@@ -4736,6 +4737,9 @@ static int write_locked_table_maps(THD *thd)
locks[0]= thd->extra_lock;
locks[1]= thd->lock;
locks[2]= thd->locked_tables;
+ my_bool with_annotate= thd->variables.binlog_annotate_rows_events &&
+ thd->query() && thd->query_length();
+
for (uint i= 0 ; i < sizeof(locks)/sizeof(*locks) ; ++i )
{
MYSQL_LOCK const *const lock= locks[i];
@@ -4753,7 +4757,8 @@ static int write_locked_table_maps(THD *thd)
check_table_binlog_row_based(thd, table))
{
int const has_trans= table->file->has_transactions();
- int const error= thd->binlog_write_table_map(table, has_trans);
+ int const error= thd->binlog_write_table_map(table, has_trans,
+ &with_annotate);
/*
If an error occurs, it is the responsibility of the caller to
roll back the transaction.
diff --git a/sql/log.cc b/sql/log.cc
index e853fedc3d1..4e07bd76739 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -4175,10 +4175,12 @@ void THD::binlog_set_stmt_begin() {
/*
- Write a table map to the binary log.
+ Write a table map to the binary log. If with_annotate != NULL and
+ *with_annotate = TRUE write also Annotate_rows before the table map.
*/
-int THD::binlog_write_table_map(TABLE *table, bool is_trans)
+int THD::binlog_write_table_map(TABLE *table, bool is_trans,
+ my_bool *with_annotate)
{
int error;
DBUG_ENTER("THD::binlog_write_table_map");
@@ -4196,7 +4198,7 @@ int THD::binlog_write_table_map(TABLE *table, bool is_trans)
if (is_trans && binlog_table_maps == 0)
binlog_start_trans_and_stmt();
- if ((error= mysql_bin_log.write(&the_event)))
+ if ((error= mysql_bin_log.write(&the_event, with_annotate)))
DBUG_RETURN(error);
binlog_table_maps++;
@@ -4326,10 +4328,12 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd,
}
/**
- Write an event to the binary log.
+ Write an event to the binary log. If with_annotate != NULL and
+ *with_annotate = TRUE write also Annotate_rows before the event
+ (this should happen only if the event is a Table_map).
*/
-bool MYSQL_BIN_LOG::write(Log_event *event_info)
+bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
{
THD *thd= event_info->thd;
bool error= 1;
@@ -4448,6 +4452,16 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info)
of the SQL command
*/
+ if (with_annotate && *with_annotate)
+ {
+ DBUG_ASSERT(event_info->get_type_code() == TABLE_MAP_EVENT);
+ Annotate_rows_log_event anno(thd);
+ /* Annotate event should be written not more than once */
+ *with_annotate= 0;
+ if (anno.write(file))
+ goto err;
+ }
+
/*
If row-based binlogging, Insert_id, Rand and other kind of "setting
context" events are not needed.
diff --git a/sql/log.h b/sql/log.h
index 8ee94ab5807..491be35bd04 100644
--- a/sql/log.h
+++ b/sql/log.h
@@ -357,7 +357,8 @@ public:
void new_file();
void reset_gathered_updates(THD *thd);
- bool write(Log_event* event_info); // binary log write
+ bool write(Log_event* event_info,
+ my_bool *with_annotate= 0); // binary log write
bool write(THD *thd, IO_CACHE *cache, Log_event *commit_event, bool incident);
bool write_incident(THD *thd, bool lock);
int write_cache(THD *thd, IO_CACHE *cache, bool lock_log,
diff --git a/sql/log_event.cc b/sql/log_event.cc
index fb4fc5b79b3..d4f71194121 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -649,6 +649,7 @@ const char* Log_event::get_type_str(Log_event_type type)
case BEGIN_LOAD_QUERY_EVENT: return "Begin_load_query";
case EXECUTE_LOAD_QUERY_EVENT: return "Execute_load_query";
case INCIDENT_EVENT: return "Incident";
+ case ANNOTATE_ROWS_EVENT: return "Annotate_rows";
default: return "Unknown"; /* impossible */
}
}
@@ -728,7 +729,7 @@ Log_event::Log_event(const char* buf,
logs are in 4.0 format, until it finds a Format_desc).
*/
if (description_event->binlog_version==3 &&
- buf[EVENT_TYPE_OFFSET]<FORMAT_DESCRIPTION_EVENT && log_pos)
+ (uchar)buf[EVENT_TYPE_OFFSET]<FORMAT_DESCRIPTION_EVENT && log_pos)
{
/*
If log_pos=0, don't change it. log_pos==0 is a marker to mean
@@ -746,8 +747,8 @@ Log_event::Log_event(const char* buf,
DBUG_PRINT("info", ("log_pos: %lu", (ulong) log_pos));
flags= uint2korr(buf + FLAGS_OFFSET);
- if ((buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT) ||
- (buf[EVENT_TYPE_OFFSET] == ROTATE_EVENT))
+ if (((uchar)buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT) ||
+ ((uchar)buf[EVENT_TYPE_OFFSET] == ROTATE_EVENT))
{
/*
These events always have a header which stops here (i.e. their
@@ -1168,14 +1169,14 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len,
/* Check the integrity */
if (event_len < EVENT_LEN_OFFSET ||
- buf[EVENT_TYPE_OFFSET] >= ENUM_END_EVENT ||
+ (uchar)buf[EVENT_TYPE_OFFSET] >= ENUM_END_EVENT ||
(uint) event_len != uint4korr(buf+EVENT_LEN_OFFSET))
{
*error="Sanity check failed"; // Needed to free buffer
DBUG_RETURN(NULL); // general sanity check - will fail on a partial read
}
- uint event_type= buf[EVENT_TYPE_OFFSET];
+ uint event_type= (uchar)buf[EVENT_TYPE_OFFSET];
if (event_type > description_event->number_of_event_types &&
event_type != FORMAT_DESCRIPTION_EVENT)
{
@@ -1297,6 +1298,9 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len,
case INCIDENT_EVENT:
ev = new Incident_log_event(buf, event_len, description_event);
break;
+ case ANNOTATE_ROWS_EVENT:
+ ev = new Annotate_rows_log_event(buf, event_len, description_event);
+ break;
default:
DBUG_PRINT("error",("Unknown event code: %d",
(int) buf[EVENT_TYPE_OFFSET]));
@@ -3795,6 +3799,13 @@ Format_description_log_event(uint8 binlog_ver, const char* server_ver)
post_header_len[DELETE_ROWS_EVENT-1]= 6;);
post_header_len[INCIDENT_EVENT-1]= INCIDENT_HEADER_LEN;
+ // Set header length of the reserved events to 0
+ memset(post_header_len + MYSQL_EVENTS_END - 1, 0,
+ (MARIA_EVENTS_BEGIN - MYSQL_EVENTS_END)*sizeof(uint8));
+
+ // Set header lengths of Maria events
+ post_header_len[ANNOTATE_ROWS_EVENT-1]= ANNOTATE_ROWS_HEADER_LEN;
+
// Sanity-check that all post header lengths are initialized.
IF_DBUG({
int i;
@@ -4439,8 +4450,8 @@ Load_log_event::Load_log_event(const char *buf, uint event_len,
*/
if (event_len)
copy_log_event(buf, event_len,
- ((buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ?
- LOAD_HEADER_LEN +
+ (((uchar)buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ?
+ LOAD_HEADER_LEN +
description_event->common_header_len :
LOAD_HEADER_LEN + LOG_EVENT_HEADER_LEN),
description_event);
@@ -4477,7 +4488,7 @@ int Load_log_event::copy_log_event(const char *buf, ulong event_len,
*/
if (!(field_lens= (uchar*)sql_ex.init((char*)buf + body_offset,
buf_end,
- buf[EVENT_TYPE_OFFSET] != LOAD_EVENT)))
+ (uchar)buf[EVENT_TYPE_OFFSET] != LOAD_EVENT)))
DBUG_RETURN(1);
data_len = event_len - body_offset;
@@ -6172,7 +6183,7 @@ Create_file_log_event::Create_file_log_event(const char* buf, uint len,
uint8 create_file_header_len= description_event->post_header_len[CREATE_FILE_EVENT-1];
if (!(event_buf= (char*) my_memdup(buf, len, MYF(MY_WME))) ||
copy_log_event(event_buf,len,
- ((buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ?
+ (((uchar)buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ?
load_header_len + header_len :
(fake_base ? (header_len+load_header_len) :
(header_len+load_header_len) +
@@ -7925,6 +7936,141 @@ void Rows_log_event::print_helper(FILE *file,
#endif
/**************************************************************************
+ Annotate_rows_log_event member functions
+**************************************************************************/
+
+#ifndef MYSQL_CLIENT
+Annotate_rows_log_event::Annotate_rows_log_event(THD *thd)
+ : Log_event(thd, 0, true),
+ m_save_thd_query_txt(0),
+ m_save_thd_query_len(0)
+{
+ m_query_txt= thd->query();
+ m_query_len= thd->query_length();
+}
+#endif
+
+Annotate_rows_log_event::Annotate_rows_log_event(const char *buf,
+ uint event_len,
+ const Format_description_log_event *desc)
+ : Log_event(buf, desc),
+ m_save_thd_query_txt(0),
+ m_save_thd_query_len(0)
+{
+ m_query_len= event_len - desc->common_header_len;
+ m_query_txt= (char*) buf + desc->common_header_len;
+}
+
+Annotate_rows_log_event::~Annotate_rows_log_event()
+{
+#ifndef MYSQL_CLIENT
+ if (m_save_thd_query_txt)
+ thd->set_query(m_save_thd_query_txt, m_save_thd_query_len);
+#endif
+}
+
+int Annotate_rows_log_event::get_data_size()
+{
+ return m_query_len;
+}
+
+Log_event_type Annotate_rows_log_event::get_type_code()
+{
+ return ANNOTATE_ROWS_EVENT;
+}
+
+bool Annotate_rows_log_event::is_valid() const
+{
+ return (m_query_txt != NULL && m_query_len != 0);
+}
+
+#ifndef MYSQL_CLIENT
+bool Annotate_rows_log_event::write_data_header(IO_CACHE *file)
+{
+ return 0;
+}
+#endif
+
+#ifndef MYSQL_CLIENT
+bool Annotate_rows_log_event::write_data_body(IO_CACHE *file)
+{
+ return my_b_safe_write(file, (uchar*) m_query_txt, m_query_len);
+}
+#endif
+
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+void Annotate_rows_log_event::pack_info(Protocol* protocol)
+{
+ if (m_query_txt && m_query_len)
+ protocol->store(m_query_txt, m_query_len, &my_charset_bin);
+}
+#endif
+
+#ifdef MYSQL_CLIENT
+void Annotate_rows_log_event::print(FILE *file, PRINT_EVENT_INFO *pinfo)
+{
+ if (pinfo->short_form)
+ return;
+
+ print_header(&pinfo->head_cache, pinfo, TRUE);
+ my_b_printf(&pinfo->head_cache, "\tAnnotate_rows:\n");
+
+ char *pbeg; // beginning of the next line
+ char *pend; // end of the next line
+ uint cnt= 0; // characters counter
+
+ for (pbeg= m_query_txt; ; pbeg= pend)
+ {
+ // skip all \r's and \n's at the beginning of the next line
+ for (;; pbeg++)
+ {
+ if (++cnt > m_query_len)
+ return;
+
+ if (*pbeg != '\r' && *pbeg != '\n')
+ break;
+ }
+
+ // find end of the next line
+ for (pend= pbeg + 1;
+ ++cnt <= m_query_len && *pend != '\r' && *pend != '\n';
+ pend++);
+
+ // print next line
+ my_b_write(&pinfo->head_cache, (const uchar*) "#Q> ", 4);
+ my_b_write(&pinfo->head_cache, (const uchar*) pbeg, pend - pbeg);
+ my_b_write(&pinfo->head_cache, (const uchar*) "\n", 1);
+ }
+}
+#endif
+
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+int Annotate_rows_log_event::do_apply_event(Relay_log_info const *rli)
+{
+ m_save_thd_query_txt= thd->query();
+ m_save_thd_query_len= thd->query_length();
+ thd->set_query(m_query_txt, m_query_len);
+ return 0;
+}
+#endif
+
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+int Annotate_rows_log_event::do_update_pos(Relay_log_info *rli)
+{
+ rli->inc_event_relay_log_pos();
+ return 0;
+}
+#endif
+
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+Log_event::enum_skip_reason
+Annotate_rows_log_event::do_shall_skip(Relay_log_info *rli)
+{
+ return continue_group(rli);
+}
+#endif
+
+/**************************************************************************
Table_map_log_event member functions and support functions
**************************************************************************/
diff --git a/sql/log_event.h b/sql/log_event.h
index 770fb29301b..6593929eb91 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -250,6 +250,7 @@ struct sql_ex_info
#define EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN (4 + 4 + 4 + 1)
#define EXECUTE_LOAD_QUERY_HEADER_LEN (QUERY_HEADER_LEN + EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN)
#define INCIDENT_HEADER_LEN 2
+#define ANNOTATE_ROWS_HEADER_LEN 0
/*
Max number of possible extra bytes in a replication event compared to a
packet (i.e. a query) sent from client to master;
@@ -582,8 +583,14 @@ enum Log_event_type
*/
INCIDENT_EVENT= 26,
+ /* New MySQL/Sun events are to be added right above this comment */
+ MYSQL_EVENTS_END,
+
+ MARIA_EVENTS_BEGIN= 160,
+ /* New Maria event numbers start from here */
+ ANNOTATE_ROWS_EVENT= 160,
+
/*
- Add new events here - right above this comment!
Existing events (except ENUM_END_EVENT) should never change their numbers
*/
@@ -2988,6 +2995,59 @@ public:
char *str_to_hex(char *to, const char *from, uint len);
/**
+ @class Annotate_rows_log_event
+
+ In row-based mode, if binlog_annotate_rows_events = ON, each group of
+ Table_map_log_events is preceded by an Annotate_rows_log_event which
+ contains the query which caused the subsequent rows operations.
+
+ The Annotate_rows_log_event has no post-header and its body contains
+ the corresponding query (without trailing zero). Note. The query length
+ is to be calculated as a difference between the whole event length and
+ the common header length.
+*/
+class Annotate_rows_log_event: public Log_event
+{
+public:
+#ifndef MYSQL_CLIENT
+ Annotate_rows_log_event(THD*);
+#endif
+ Annotate_rows_log_event(const char *buf, uint event_len,
+ const Format_description_log_event*);
+ ~Annotate_rows_log_event();
+
+ virtual int get_data_size();
+ virtual Log_event_type get_type_code();
+ virtual bool is_valid() const;
+
+#ifndef MYSQL_CLIENT
+ virtual bool write_data_header(IO_CACHE*);
+ virtual bool write_data_body(IO_CACHE*);
+#endif
+
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+ virtual void pack_info(Protocol*);
+#endif
+
+#ifdef MYSQL_CLIENT
+ virtual void print(FILE*, PRINT_EVENT_INFO*);
+#endif
+
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+private:
+ virtual int do_apply_event(Relay_log_info const*);
+ virtual int do_update_pos(Relay_log_info*);
+ virtual enum_skip_reason do_shall_skip(Relay_log_info*);
+#endif
+
+private:
+ char *m_query_txt;
+ uint m_query_len;
+ char *m_save_thd_query_txt;
+ uint m_save_thd_query_len;
+};
+
+/**
@class Table_map_log_event
In row-based mode, every row operation event is preceded by a
diff --git a/sql/mysql_priv.h b/sql/mysql_priv.h
index 3f836a5ab1f..1b3c56ceae4 100644
--- a/sql/mysql_priv.h
+++ b/sql/mysql_priv.h
@@ -611,7 +611,11 @@ protected:
/* BINLOG_DUMP options */
#define BINLOG_DUMP_NON_BLOCK 1
+#endif /* !MYSQL_CLIENT */
+#define BINLOG_SEND_ANNOTATE_ROWS_EVENT 2
+
+#ifndef MYSQL_CLIENT
/* sql_show.cc:show_log_files() */
#define SHOW_LOG_STATUS_FREE "FREE"
#define SHOW_LOG_STATUS_INUSE "IN USE"
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index 3d6879e5b23..bc03beaf46d 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -512,6 +512,7 @@ my_bool opt_local_infile, opt_slave_compressed_protocol;
my_bool opt_safe_user_create = 0, opt_no_mix_types = 0;
my_bool opt_show_slave_auth_info, opt_sql_bin_update = 0;
my_bool opt_log_slave_updates= 0;
+my_bool opt_replicate_annotate_rows_events= 0;
bool slave_warning_issued = false;
/*
@@ -5870,6 +5871,8 @@ enum options_mysqld
OPT_REPLICATE_IGNORE_DB, OPT_LOG_SLAVE_UPDATES,
OPT_BINLOG_DO_DB, OPT_BINLOG_IGNORE_DB,
OPT_BINLOG_FORMAT,
+ OPT_BINLOG_ANNOTATE_ROWS_EVENTS,
+ OPT_REPLICATE_ANNOTATE_ROWS_EVENTS,
#ifndef DBUG_OFF
OPT_BINLOG_SHOW_XID,
#endif
@@ -6096,6 +6099,18 @@ struct my_option my_long_options[] =
#endif
, &opt_binlog_format, &opt_binlog_format,
0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
+ {"binlog-annotate-rows-events", OPT_BINLOG_ANNOTATE_ROWS_EVENTS,
+ "Tells the master to annotate RBR events with the statement that "
+ "caused these events.",
+ (uchar**) &global_system_variables.binlog_annotate_rows_events,
+ (uchar**) &max_system_variables.binlog_annotate_rows_events,
+ 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
+ {"replicate-annotate-rows-events", OPT_REPLICATE_ANNOTATE_ROWS_EVENTS,
+ "Tells the slave to write annotate rows events recieved from the master "
+ "to its own binary log. Sensible only in pair with log-slave-updates option.",
+ (uchar**) &opt_replicate_annotate_rows_events,
+ (uchar**) &opt_replicate_annotate_rows_events,
+ 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
{"binlog-do-db", OPT_BINLOG_DO_DB,
"Tells the master it should log updates for the specified database, "
"and exclude all others not explicitly mentioned.",
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index c37c4735e37..a2d0b1e4904 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -38,7 +38,8 @@ Relay_log_info::Relay_log_info()
inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE),
until_log_pos(0), retried_trans(0),
tables_to_lock(0), tables_to_lock_count(0),
- last_event_start_time(0), m_flags(0)
+ last_event_start_time(0), m_flags(0),
+ m_annotate_event(0)
{
DBUG_ENTER("Relay_log_info::Relay_log_info");
@@ -72,6 +73,7 @@ Relay_log_info::~Relay_log_info()
pthread_cond_destroy(&stop_cond);
pthread_cond_destroy(&log_space_cond);
relay_log.cleanup();
+ free_annotate_event();
DBUG_VOID_RETURN;
}
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index 5cafcf47086..0ea3c23bfd8 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -423,8 +423,46 @@ public:
(m_flags & (1UL << IN_STMT));
}
+ /**
+ Save pointer to Annotate_rows event and switch on the
+ binlog_annotate_rows_events for this sql thread.
+ To be called when sql thread recieves an Annotate_rows event.
+ */
+ inline void set_annotate_event(Annotate_rows_log_event *event)
+ {
+ free_annotate_event();
+ m_annotate_event= event;
+ sql_thd->variables.binlog_annotate_rows_events= 1;
+ }
+
+ /**
+ Returns pointer to the saved Annotate_rows event or NULL if there is
+ no saved event.
+ */
+ inline Annotate_rows_log_event* get_annotate_event()
+ {
+ return m_annotate_event;
+ }
+
+ /**
+ Delete saved Annotate_rows event (if any) and switch off the
+ binlog_annotate_rows_events for this sql thread.
+ To be called when sql thread has applied the last (i.e. with
+ STMT_END_F flag) rbr event.
+ */
+ inline void free_annotate_event()
+ {
+ if (m_annotate_event)
+ {
+ sql_thd->variables.binlog_annotate_rows_events= 0;
+ delete m_annotate_event;
+ m_annotate_event= 0;
+ }
+ }
+
private:
uint32 m_flags;
+ Annotate_rows_log_event *m_annotate_event;
};
diff --git a/sql/set_var.cc b/sql/set_var.cc
index 910deafc432..894b6519127 100644
--- a/sql/set_var.cc
+++ b/sql/set_var.cc
@@ -182,6 +182,9 @@ static sys_var_const sys_back_log(&vars, "back_log",
OPT_GLOBAL, SHOW_LONG,
(uchar*) &back_log);
static sys_var_const_os_str sys_basedir(&vars, "basedir", mysql_home);
+static sys_var_thd_bool
+sys_binlog_annotate_rows_events(&vars, "binlog_annotate_rows_events",
+ &SV::binlog_annotate_rows_events);
static sys_var_long_ptr sys_binlog_cache_size(&vars, "binlog_cache_size",
&binlog_cache_size);
static sys_var_thd_binlog_format sys_binlog_format(&vars, "binlog_format",
diff --git a/sql/slave.cc b/sql/slave.cc
index 0e41d897dd4..e159645db38 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -1867,6 +1867,9 @@ static int request_dump(MYSQL* mysql, Master_info* mi,
*suppress_warnings= FALSE;
+ if (opt_log_slave_updates && opt_replicate_annotate_rows_events)
+ binlog_flags|= BINLOG_SEND_ANNOTATE_ROWS_EVENT;
+
// TODO if big log files: Change next to int8store()
int4store(buf, (ulong) mi->master_log_pos);
int2store(buf + 4, binlog_flags);
@@ -2261,17 +2264,41 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli)
}
exec_res= apply_event_and_update_pos(ev, thd, rli);
- /*
- Format_description_log_event should not be deleted because it will be
- used to read info about the relay log's format; it will be deleted when
- the SQL thread does not need it, i.e. when this thread terminates.
- */
- if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
- {
- DBUG_PRINT("info", ("Deleting the event after it has been executed"));
- delete ev;
+ switch (ev->get_type_code()) {
+ case FORMAT_DESCRIPTION_EVENT:
+ /*
+ Format_description_log_event should not be deleted because it
+ will be used to read info about the relay log's format;
+ it will be deleted when the SQL thread does not need it,
+ i.e. when this thread terminates.
+ */
+ break;
+ case ANNOTATE_ROWS_EVENT:
+ /*
+ Annotate_rows event should not be deleted because after it has
+ been applied, thd->query points to the string inside this event.
+ The thd->query will be used to generate new Annotate_rows event
+ during applying the subsequent Rows events.
+ */
+ rli->set_annotate_event((Annotate_rows_log_event*) ev);
+ break;
+ case DELETE_ROWS_EVENT:
+ case UPDATE_ROWS_EVENT:
+ case WRITE_ROWS_EVENT:
+ /*
+ After the last Rows event has been applied, the saved Annotate_rows
+ event (if any) is not needed anymore and can be deleted.
+ */
+ if (((Rows_log_event*)ev)->get_flags(Rows_log_event::STMT_END_F))
+ rli->free_annotate_event();
+ /* fall through */
+ default:
+ DBUG_PRINT("info", ("Deleting the event after it has been executed"));
+ delete ev;
+ break;
}
+
/*
update_log_pos failed: this should not happen, so we don't
retry.
@@ -2899,6 +2926,12 @@ pthread_handler_t handle_slave_sql(void *arg)
thd->init_for_queries();
thd->temporary_tables = rli->save_temporary_tables; // restore temp tables
set_thd_in_use_temporary_tables(rli); // (re)set sql_thd in use for saved temp tables
+ /*
+ binlog_annotate_rows_events must be TRUE only after an Annotate_rows event
+ has been recieved and only till the last corresponding rbr event has been
+ applied. In all other cases it must be FALSE.
+ */
+ thd->variables.binlog_annotate_rows_events= 0;
pthread_mutex_lock(&LOCK_thread_count);
threads.append(thd);
pthread_mutex_unlock(&LOCK_thread_count);
@@ -3382,7 +3415,7 @@ static int queue_binlog_ver_1_event(Master_info *mi, const char *buf,
If we get Load event, we need to pass a non-reusable buffer
to read_log_event, so we do a trick
*/
- if (buf[EVENT_TYPE_OFFSET] == LOAD_EVENT)
+ if ((uchar)buf[EVENT_TYPE_OFFSET] == LOAD_EVENT)
{
if (unlikely(!(tmp_buf=(char*)my_malloc(event_len+1,MYF(MY_WME)))))
{
@@ -3589,13 +3622,13 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
LINT_INIT(inc_pos);
if (mi->rli.relay_log.description_event_for_queue->binlog_version<4 &&
- buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT /* a way to escape */)
+ (uchar)buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT /* a way to escape */)
DBUG_RETURN(queue_old_event(mi,buf,event_len));
LINT_INIT(inc_pos);
pthread_mutex_lock(&mi->data_lock);
- switch (buf[EVENT_TYPE_OFFSET]) {
+ switch ((uchar)buf[EVENT_TYPE_OFFSET]) {
case STOP_EVENT:
/*
We needn't write this event to the relay log. Indeed, it just indicates a
@@ -3698,9 +3731,9 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
the master's binlog (i.e. Format_desc, Rotate & Stop) should not increment
mi->master_log_pos.
*/
- if (buf[EVENT_TYPE_OFFSET]!=FORMAT_DESCRIPTION_EVENT &&
- buf[EVENT_TYPE_OFFSET]!=ROTATE_EVENT &&
- buf[EVENT_TYPE_OFFSET]!=STOP_EVENT)
+ if ((uchar)buf[EVENT_TYPE_OFFSET]!=FORMAT_DESCRIPTION_EVENT &&
+ (uchar)buf[EVENT_TYPE_OFFSET]!=ROTATE_EVENT &&
+ (uchar)buf[EVENT_TYPE_OFFSET]!=STOP_EVENT)
{
mi->master_log_pos+= inc_pos;
memcpy(rli->ign_master_log_name_end, mi->master_log_name, FN_REFLEN);
diff --git a/sql/slave.h b/sql/slave.h
index 1a1cfcebd9b..63dd3afc6b8 100644
--- a/sql/slave.h
+++ b/sql/slave.h
@@ -106,6 +106,7 @@ extern MYSQL_PLUGIN_IMPORT char *relay_log_info_file;
extern char *opt_relay_logname, *opt_relaylog_index_name;
extern my_bool opt_skip_slave_start, opt_reckless_slave;
extern my_bool opt_log_slave_updates;
+extern my_bool opt_replicate_annotate_rows_events;
extern ulonglong relay_log_space_limit;
/*
diff --git a/sql/sql_binlog.cc b/sql/sql_binlog.cc
index da582c37ae9..9a7aac5774b 100644
--- a/sql/sql_binlog.cc
+++ b/sql/sql_binlog.cc
@@ -167,7 +167,7 @@ void mysql_client_binlog_statement(THD* thd)
*/
if (!have_fd_event)
{
- int type = bufptr[EVENT_TYPE_OFFSET];
+ int type = (uchar)bufptr[EVENT_TYPE_OFFSET];
if (type == FORMAT_DESCRIPTION_EVENT || type == START_EVENT_V3)
have_fd_event= TRUE;
else
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 2fe579713e1..ae7994c65c9 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -367,6 +367,7 @@ struct system_variables
ulong ndb_index_stat_cache_entries;
ulong ndb_index_stat_update_freq;
ulong binlog_format; // binlog format for this thd (see enum_binlog_format)
+ my_bool binlog_annotate_rows_events;
my_bool binlog_direct_non_trans_update;
/*
In slave thread we need to know in behalf of which
@@ -1478,7 +1479,8 @@ public:
*/
void binlog_start_trans_and_stmt();
void binlog_set_stmt_begin();
- int binlog_write_table_map(TABLE *table, bool is_transactional);
+ int binlog_write_table_map(TABLE *table, bool is_transactional,
+ my_bool *with_annotate= 0);
int binlog_write_row(TABLE* table, bool is_transactional,
MY_BITMAP const* cols, size_t colcnt,
const uchar *buf);
diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc
index 72ca7310ee1..48105cf9f3c 100644
--- a/sql/sql_insert.cc
+++ b/sql/sql_insert.cc
@@ -1966,6 +1966,11 @@ bool delayed_get_table(THD *thd, TABLE_LIST *table_list)
pthread_mutex_lock(&LOCK_thread_count);
thread_count++;
pthread_mutex_unlock(&LOCK_thread_count);
+ /*
+ Annotating delayed inserts is not supported.
+ */
+ di->thd.variables.binlog_annotate_rows_events= 0;
+
di->thd.set_db(table_list->db, (uint) strlen(table_list->db));
di->thd.set_query(my_strdup(table_list->table_name, MYF(MY_WME)), 0);
if (di->thd.db == NULL || di->thd.query() == NULL)
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index 95e48c531be..51361fdf6e4 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -489,7 +489,7 @@ impossible position";
DBUG_PRINT("info",
("Looked for a Format_description_log_event, found event type %d",
(*packet)[EVENT_TYPE_OFFSET+1]));
- if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
+ if ((uchar)(*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
{
binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
LOG_EVENT_BINLOG_IN_USE_F);
@@ -557,31 +557,36 @@ impossible position";
#endif
if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
+ if ((uchar)(*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
{
binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
LOG_EVENT_BINLOG_IN_USE_F);
(*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
}
- else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT)
+ else if ((uchar)(*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT)
binlog_can_be_corrupted= FALSE;
- if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
+ if ((uchar)(*packet)[EVENT_TYPE_OFFSET+1] != ANNOTATE_ROWS_EVENT ||
+ (flags & BINLOG_SEND_ANNOTATE_ROWS_EVENT))
{
- errmsg = "Failed on my_net_write()";
- my_errno= ER_UNKNOWN_ERROR;
- goto err;
- }
+ if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
+ {
+ errmsg = "Failed on my_net_write()";
+ my_errno= ER_UNKNOWN_ERROR;
+ goto err;
+ }
- DBUG_PRINT("info", ("log event code %d",
- (*packet)[LOG_EVENT_OFFSET+1] ));
- if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
- {
- if (send_file(thd))
- {
- errmsg = "failed in send_file()";
- my_errno= ER_UNKNOWN_ERROR;
- goto err;
- }
+ DBUG_PRINT("info", ("log event code %d",
+ (*packet)[LOG_EVENT_OFFSET+1] ));
+ if ((uchar)(*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
+ {
+ if (send_file(thd))
+ {
+ errmsg = "failed in send_file()";
+ my_errno= ER_UNKNOWN_ERROR;
+ goto err;
+ }
+ }
}
packet->set("\0", 1, &my_charset_bin);
}
@@ -677,23 +682,27 @@ impossible position";
if (read_packet)
{
- thd_proc_info(thd, "Sending binlog event to slave");
- if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) )
- {
- errmsg = "Failed on my_net_write()";
- my_errno= ER_UNKNOWN_ERROR;
- goto err;
- }
-
- if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
- {
- if (send_file(thd))
+ if ((uchar)(*packet)[EVENT_TYPE_OFFSET+1] != ANNOTATE_ROWS_EVENT ||
+ (flags & BINLOG_SEND_ANNOTATE_ROWS_EVENT))
+ {
+ thd_proc_info(thd, "Sending binlog event to slave");
+ if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) )
{
- errmsg = "failed in send_file()";
+ errmsg = "Failed on my_net_write()";
my_errno= ER_UNKNOWN_ERROR;
goto err;
}
- }
+
+ if ((uchar)(*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
+ {
+ if (send_file(thd))
+ {
+ errmsg = "failed in send_file()";
+ my_errno= ER_UNKNOWN_ERROR;
+ goto err;
+ }
+ }
+ }
packet->set("\0", 1, &my_charset_bin);
/*
No need to net_flush because we will get to flush later when
@@ -1774,6 +1783,11 @@ static sys_var_chain vars = { NULL, NULL };
static sys_var_const sys_log_slave_updates(&vars, "log_slave_updates",
OPT_GLOBAL, SHOW_MY_BOOL,
(uchar*) &opt_log_slave_updates);
+static sys_var_const
+sys_replicate_annotate_rows_events(&vars,
+ "replicate_annotate_rows_events",
+ OPT_GLOBAL, SHOW_MY_BOOL,
+ (uchar*) &opt_replicate_annotate_rows_events);
static sys_var_const sys_relay_log(&vars, "relay_log",
OPT_GLOBAL, SHOW_CHAR_PTR,
(uchar*) &opt_relay_logname);