diff options
Diffstat (limited to 'sql')
-rw-r--r-- | sql/handler.cc | 9 | ||||
-rw-r--r-- | sql/log.cc | 24 | ||||
-rw-r--r-- | sql/log.h | 3 | ||||
-rw-r--r-- | sql/log_event.cc | 164 | ||||
-rw-r--r-- | sql/log_event.h | 62 | ||||
-rw-r--r-- | sql/mysql_priv.h | 4 | ||||
-rw-r--r-- | sql/mysqld.cc | 15 | ||||
-rw-r--r-- | sql/rpl_rli.cc | 4 | ||||
-rw-r--r-- | sql/rpl_rli.h | 38 | ||||
-rw-r--r-- | sql/set_var.cc | 3 | ||||
-rw-r--r-- | sql/slave.cc | 63 | ||||
-rw-r--r-- | sql/slave.h | 1 | ||||
-rw-r--r-- | sql/sql_binlog.cc | 2 | ||||
-rw-r--r-- | sql/sql_class.h | 4 | ||||
-rw-r--r-- | sql/sql_insert.cc | 5 | ||||
-rw-r--r-- | sql/sql_repl.cc | 74 |
16 files changed, 409 insertions, 66 deletions
diff --git a/sql/handler.cc b/sql/handler.cc index e851d9248c2..9bfbc816d3f 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -4699,7 +4699,8 @@ static bool check_table_binlog_row_based(THD *thd, TABLE *table) /** @brief Write table maps for all (manually or automatically) locked tables - to the binary log. + to the binary log. Also, if binlog_annotate_rows_events is ON, + write Annotate_rows event before the first table map. SYNOPSIS write_locked_table_maps() @@ -4736,6 +4737,9 @@ static int write_locked_table_maps(THD *thd) locks[0]= thd->extra_lock; locks[1]= thd->lock; locks[2]= thd->locked_tables; + my_bool with_annotate= thd->variables.binlog_annotate_rows_events && + thd->query() && thd->query_length(); + for (uint i= 0 ; i < sizeof(locks)/sizeof(*locks) ; ++i ) { MYSQL_LOCK const *const lock= locks[i]; @@ -4753,7 +4757,8 @@ static int write_locked_table_maps(THD *thd) check_table_binlog_row_based(thd, table)) { int const has_trans= table->file->has_transactions(); - int const error= thd->binlog_write_table_map(table, has_trans); + int const error= thd->binlog_write_table_map(table, has_trans, + &with_annotate); /* If an error occurs, it is the responsibility of the caller to roll back the transaction. diff --git a/sql/log.cc b/sql/log.cc index e853fedc3d1..4e07bd76739 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -4175,10 +4175,12 @@ void THD::binlog_set_stmt_begin() { /* - Write a table map to the binary log. + Write a table map to the binary log. If with_annotate != NULL and + *with_annotate = TRUE write also Annotate_rows before the table map. */ -int THD::binlog_write_table_map(TABLE *table, bool is_trans) +int THD::binlog_write_table_map(TABLE *table, bool is_trans, + my_bool *with_annotate) { int error; DBUG_ENTER("THD::binlog_write_table_map"); @@ -4196,7 +4198,7 @@ int THD::binlog_write_table_map(TABLE *table, bool is_trans) if (is_trans && binlog_table_maps == 0) binlog_start_trans_and_stmt(); - if ((error= mysql_bin_log.write(&the_event))) + if ((error= mysql_bin_log.write(&the_event, with_annotate))) DBUG_RETURN(error); binlog_table_maps++; @@ -4326,10 +4328,12 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd, } /** - Write an event to the binary log. + Write an event to the binary log. If with_annotate != NULL and + *with_annotate = TRUE write also Annotate_rows before the event + (this should happen only if the event is a Table_map). */ -bool MYSQL_BIN_LOG::write(Log_event *event_info) +bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) { THD *thd= event_info->thd; bool error= 1; @@ -4448,6 +4452,16 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) of the SQL command */ + if (with_annotate && *with_annotate) + { + DBUG_ASSERT(event_info->get_type_code() == TABLE_MAP_EVENT); + Annotate_rows_log_event anno(thd); + /* Annotate event should be written not more than once */ + *with_annotate= 0; + if (anno.write(file)) + goto err; + } + /* If row-based binlogging, Insert_id, Rand and other kind of "setting context" events are not needed. diff --git a/sql/log.h b/sql/log.h index 8ee94ab5807..491be35bd04 100644 --- a/sql/log.h +++ b/sql/log.h @@ -357,7 +357,8 @@ public: void new_file(); void reset_gathered_updates(THD *thd); - bool write(Log_event* event_info); // binary log write + bool write(Log_event* event_info, + my_bool *with_annotate= 0); // binary log write bool write(THD *thd, IO_CACHE *cache, Log_event *commit_event, bool incident); bool write_incident(THD *thd, bool lock); int write_cache(THD *thd, IO_CACHE *cache, bool lock_log, diff --git a/sql/log_event.cc b/sql/log_event.cc index fb4fc5b79b3..d4f71194121 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -649,6 +649,7 @@ const char* Log_event::get_type_str(Log_event_type type) case BEGIN_LOAD_QUERY_EVENT: return "Begin_load_query"; case EXECUTE_LOAD_QUERY_EVENT: return "Execute_load_query"; case INCIDENT_EVENT: return "Incident"; + case ANNOTATE_ROWS_EVENT: return "Annotate_rows"; default: return "Unknown"; /* impossible */ } } @@ -728,7 +729,7 @@ Log_event::Log_event(const char* buf, logs are in 4.0 format, until it finds a Format_desc). */ if (description_event->binlog_version==3 && - buf[EVENT_TYPE_OFFSET]<FORMAT_DESCRIPTION_EVENT && log_pos) + (uchar)buf[EVENT_TYPE_OFFSET]<FORMAT_DESCRIPTION_EVENT && log_pos) { /* If log_pos=0, don't change it. log_pos==0 is a marker to mean @@ -746,8 +747,8 @@ Log_event::Log_event(const char* buf, DBUG_PRINT("info", ("log_pos: %lu", (ulong) log_pos)); flags= uint2korr(buf + FLAGS_OFFSET); - if ((buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT) || - (buf[EVENT_TYPE_OFFSET] == ROTATE_EVENT)) + if (((uchar)buf[EVENT_TYPE_OFFSET] == FORMAT_DESCRIPTION_EVENT) || + ((uchar)buf[EVENT_TYPE_OFFSET] == ROTATE_EVENT)) { /* These events always have a header which stops here (i.e. their @@ -1168,14 +1169,14 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len, /* Check the integrity */ if (event_len < EVENT_LEN_OFFSET || - buf[EVENT_TYPE_OFFSET] >= ENUM_END_EVENT || + (uchar)buf[EVENT_TYPE_OFFSET] >= ENUM_END_EVENT || (uint) event_len != uint4korr(buf+EVENT_LEN_OFFSET)) { *error="Sanity check failed"; // Needed to free buffer DBUG_RETURN(NULL); // general sanity check - will fail on a partial read } - uint event_type= buf[EVENT_TYPE_OFFSET]; + uint event_type= (uchar)buf[EVENT_TYPE_OFFSET]; if (event_type > description_event->number_of_event_types && event_type != FORMAT_DESCRIPTION_EVENT) { @@ -1297,6 +1298,9 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len, case INCIDENT_EVENT: ev = new Incident_log_event(buf, event_len, description_event); break; + case ANNOTATE_ROWS_EVENT: + ev = new Annotate_rows_log_event(buf, event_len, description_event); + break; default: DBUG_PRINT("error",("Unknown event code: %d", (int) buf[EVENT_TYPE_OFFSET])); @@ -3795,6 +3799,13 @@ Format_description_log_event(uint8 binlog_ver, const char* server_ver) post_header_len[DELETE_ROWS_EVENT-1]= 6;); post_header_len[INCIDENT_EVENT-1]= INCIDENT_HEADER_LEN; + // Set header length of the reserved events to 0 + memset(post_header_len + MYSQL_EVENTS_END - 1, 0, + (MARIA_EVENTS_BEGIN - MYSQL_EVENTS_END)*sizeof(uint8)); + + // Set header lengths of Maria events + post_header_len[ANNOTATE_ROWS_EVENT-1]= ANNOTATE_ROWS_HEADER_LEN; + // Sanity-check that all post header lengths are initialized. IF_DBUG({ int i; @@ -4439,8 +4450,8 @@ Load_log_event::Load_log_event(const char *buf, uint event_len, */ if (event_len) copy_log_event(buf, event_len, - ((buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ? - LOAD_HEADER_LEN + + (((uchar)buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ? + LOAD_HEADER_LEN + description_event->common_header_len : LOAD_HEADER_LEN + LOG_EVENT_HEADER_LEN), description_event); @@ -4477,7 +4488,7 @@ int Load_log_event::copy_log_event(const char *buf, ulong event_len, */ if (!(field_lens= (uchar*)sql_ex.init((char*)buf + body_offset, buf_end, - buf[EVENT_TYPE_OFFSET] != LOAD_EVENT))) + (uchar)buf[EVENT_TYPE_OFFSET] != LOAD_EVENT))) DBUG_RETURN(1); data_len = event_len - body_offset; @@ -6172,7 +6183,7 @@ Create_file_log_event::Create_file_log_event(const char* buf, uint len, uint8 create_file_header_len= description_event->post_header_len[CREATE_FILE_EVENT-1]; if (!(event_buf= (char*) my_memdup(buf, len, MYF(MY_WME))) || copy_log_event(event_buf,len, - ((buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ? + (((uchar)buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) ? load_header_len + header_len : (fake_base ? (header_len+load_header_len) : (header_len+load_header_len) + @@ -7925,6 +7936,141 @@ void Rows_log_event::print_helper(FILE *file, #endif /************************************************************************** + Annotate_rows_log_event member functions +**************************************************************************/ + +#ifndef MYSQL_CLIENT +Annotate_rows_log_event::Annotate_rows_log_event(THD *thd) + : Log_event(thd, 0, true), + m_save_thd_query_txt(0), + m_save_thd_query_len(0) +{ + m_query_txt= thd->query(); + m_query_len= thd->query_length(); +} +#endif + +Annotate_rows_log_event::Annotate_rows_log_event(const char *buf, + uint event_len, + const Format_description_log_event *desc) + : Log_event(buf, desc), + m_save_thd_query_txt(0), + m_save_thd_query_len(0) +{ + m_query_len= event_len - desc->common_header_len; + m_query_txt= (char*) buf + desc->common_header_len; +} + +Annotate_rows_log_event::~Annotate_rows_log_event() +{ +#ifndef MYSQL_CLIENT + if (m_save_thd_query_txt) + thd->set_query(m_save_thd_query_txt, m_save_thd_query_len); +#endif +} + +int Annotate_rows_log_event::get_data_size() +{ + return m_query_len; +} + +Log_event_type Annotate_rows_log_event::get_type_code() +{ + return ANNOTATE_ROWS_EVENT; +} + +bool Annotate_rows_log_event::is_valid() const +{ + return (m_query_txt != NULL && m_query_len != 0); +} + +#ifndef MYSQL_CLIENT +bool Annotate_rows_log_event::write_data_header(IO_CACHE *file) +{ + return 0; +} +#endif + +#ifndef MYSQL_CLIENT +bool Annotate_rows_log_event::write_data_body(IO_CACHE *file) +{ + return my_b_safe_write(file, (uchar*) m_query_txt, m_query_len); +} +#endif + +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) +void Annotate_rows_log_event::pack_info(Protocol* protocol) +{ + if (m_query_txt && m_query_len) + protocol->store(m_query_txt, m_query_len, &my_charset_bin); +} +#endif + +#ifdef MYSQL_CLIENT +void Annotate_rows_log_event::print(FILE *file, PRINT_EVENT_INFO *pinfo) +{ + if (pinfo->short_form) + return; + + print_header(&pinfo->head_cache, pinfo, TRUE); + my_b_printf(&pinfo->head_cache, "\tAnnotate_rows:\n"); + + char *pbeg; // beginning of the next line + char *pend; // end of the next line + uint cnt= 0; // characters counter + + for (pbeg= m_query_txt; ; pbeg= pend) + { + // skip all \r's and \n's at the beginning of the next line + for (;; pbeg++) + { + if (++cnt > m_query_len) + return; + + if (*pbeg != '\r' && *pbeg != '\n') + break; + } + + // find end of the next line + for (pend= pbeg + 1; + ++cnt <= m_query_len && *pend != '\r' && *pend != '\n'; + pend++); + + // print next line + my_b_write(&pinfo->head_cache, (const uchar*) "#Q> ", 4); + my_b_write(&pinfo->head_cache, (const uchar*) pbeg, pend - pbeg); + my_b_write(&pinfo->head_cache, (const uchar*) "\n", 1); + } +} +#endif + +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) +int Annotate_rows_log_event::do_apply_event(Relay_log_info const *rli) +{ + m_save_thd_query_txt= thd->query(); + m_save_thd_query_len= thd->query_length(); + thd->set_query(m_query_txt, m_query_len); + return 0; +} +#endif + +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) +int Annotate_rows_log_event::do_update_pos(Relay_log_info *rli) +{ + rli->inc_event_relay_log_pos(); + return 0; +} +#endif + +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) +Log_event::enum_skip_reason +Annotate_rows_log_event::do_shall_skip(Relay_log_info *rli) +{ + return continue_group(rli); +} +#endif + +/************************************************************************** Table_map_log_event member functions and support functions **************************************************************************/ diff --git a/sql/log_event.h b/sql/log_event.h index 770fb29301b..6593929eb91 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -250,6 +250,7 @@ struct sql_ex_info #define EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN (4 + 4 + 4 + 1) #define EXECUTE_LOAD_QUERY_HEADER_LEN (QUERY_HEADER_LEN + EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN) #define INCIDENT_HEADER_LEN 2 +#define ANNOTATE_ROWS_HEADER_LEN 0 /* Max number of possible extra bytes in a replication event compared to a packet (i.e. a query) sent from client to master; @@ -582,8 +583,14 @@ enum Log_event_type */ INCIDENT_EVENT= 26, + /* New MySQL/Sun events are to be added right above this comment */ + MYSQL_EVENTS_END, + + MARIA_EVENTS_BEGIN= 160, + /* New Maria event numbers start from here */ + ANNOTATE_ROWS_EVENT= 160, + /* - Add new events here - right above this comment! Existing events (except ENUM_END_EVENT) should never change their numbers */ @@ -2988,6 +2995,59 @@ public: char *str_to_hex(char *to, const char *from, uint len); /** + @class Annotate_rows_log_event + + In row-based mode, if binlog_annotate_rows_events = ON, each group of + Table_map_log_events is preceded by an Annotate_rows_log_event which + contains the query which caused the subsequent rows operations. + + The Annotate_rows_log_event has no post-header and its body contains + the corresponding query (without trailing zero). Note. The query length + is to be calculated as a difference between the whole event length and + the common header length. +*/ +class Annotate_rows_log_event: public Log_event +{ +public: +#ifndef MYSQL_CLIENT + Annotate_rows_log_event(THD*); +#endif + Annotate_rows_log_event(const char *buf, uint event_len, + const Format_description_log_event*); + ~Annotate_rows_log_event(); + + virtual int get_data_size(); + virtual Log_event_type get_type_code(); + virtual bool is_valid() const; + +#ifndef MYSQL_CLIENT + virtual bool write_data_header(IO_CACHE*); + virtual bool write_data_body(IO_CACHE*); +#endif + +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) + virtual void pack_info(Protocol*); +#endif + +#ifdef MYSQL_CLIENT + virtual void print(FILE*, PRINT_EVENT_INFO*); +#endif + +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) +private: + virtual int do_apply_event(Relay_log_info const*); + virtual int do_update_pos(Relay_log_info*); + virtual enum_skip_reason do_shall_skip(Relay_log_info*); +#endif + +private: + char *m_query_txt; + uint m_query_len; + char *m_save_thd_query_txt; + uint m_save_thd_query_len; +}; + +/** @class Table_map_log_event In row-based mode, every row operation event is preceded by a diff --git a/sql/mysql_priv.h b/sql/mysql_priv.h index 3f836a5ab1f..1b3c56ceae4 100644 --- a/sql/mysql_priv.h +++ b/sql/mysql_priv.h @@ -611,7 +611,11 @@ protected: /* BINLOG_DUMP options */ #define BINLOG_DUMP_NON_BLOCK 1 +#endif /* !MYSQL_CLIENT */ +#define BINLOG_SEND_ANNOTATE_ROWS_EVENT 2 + +#ifndef MYSQL_CLIENT /* sql_show.cc:show_log_files() */ #define SHOW_LOG_STATUS_FREE "FREE" #define SHOW_LOG_STATUS_INUSE "IN USE" diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 3d6879e5b23..bc03beaf46d 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -512,6 +512,7 @@ my_bool opt_local_infile, opt_slave_compressed_protocol; my_bool opt_safe_user_create = 0, opt_no_mix_types = 0; my_bool opt_show_slave_auth_info, opt_sql_bin_update = 0; my_bool opt_log_slave_updates= 0; +my_bool opt_replicate_annotate_rows_events= 0; bool slave_warning_issued = false; /* @@ -5870,6 +5871,8 @@ enum options_mysqld OPT_REPLICATE_IGNORE_DB, OPT_LOG_SLAVE_UPDATES, OPT_BINLOG_DO_DB, OPT_BINLOG_IGNORE_DB, OPT_BINLOG_FORMAT, + OPT_BINLOG_ANNOTATE_ROWS_EVENTS, + OPT_REPLICATE_ANNOTATE_ROWS_EVENTS, #ifndef DBUG_OFF OPT_BINLOG_SHOW_XID, #endif @@ -6096,6 +6099,18 @@ struct my_option my_long_options[] = #endif , &opt_binlog_format, &opt_binlog_format, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, + {"binlog-annotate-rows-events", OPT_BINLOG_ANNOTATE_ROWS_EVENTS, + "Tells the master to annotate RBR events with the statement that " + "caused these events.", + (uchar**) &global_system_variables.binlog_annotate_rows_events, + (uchar**) &max_system_variables.binlog_annotate_rows_events, + 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0}, + {"replicate-annotate-rows-events", OPT_REPLICATE_ANNOTATE_ROWS_EVENTS, + "Tells the slave to write annotate rows events recieved from the master " + "to its own binary log. Sensible only in pair with log-slave-updates option.", + (uchar**) &opt_replicate_annotate_rows_events, + (uchar**) &opt_replicate_annotate_rows_events, + 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0}, {"binlog-do-db", OPT_BINLOG_DO_DB, "Tells the master it should log updates for the specified database, " "and exclude all others not explicitly mentioned.", diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index c37c4735e37..a2d0b1e4904 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -38,7 +38,8 @@ Relay_log_info::Relay_log_info() inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE), until_log_pos(0), retried_trans(0), tables_to_lock(0), tables_to_lock_count(0), - last_event_start_time(0), m_flags(0) + last_event_start_time(0), m_flags(0), + m_annotate_event(0) { DBUG_ENTER("Relay_log_info::Relay_log_info"); @@ -72,6 +73,7 @@ Relay_log_info::~Relay_log_info() pthread_cond_destroy(&stop_cond); pthread_cond_destroy(&log_space_cond); relay_log.cleanup(); + free_annotate_event(); DBUG_VOID_RETURN; } diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h index 5cafcf47086..0ea3c23bfd8 100644 --- a/sql/rpl_rli.h +++ b/sql/rpl_rli.h @@ -423,8 +423,46 @@ public: (m_flags & (1UL << IN_STMT)); } + /** + Save pointer to Annotate_rows event and switch on the + binlog_annotate_rows_events for this sql thread. + To be called when sql thread recieves an Annotate_rows event. + */ + inline void set_annotate_event(Annotate_rows_log_event *event) + { + free_annotate_event(); + m_annotate_event= event; + sql_thd->variables.binlog_annotate_rows_events= 1; + } + + /** + Returns pointer to the saved Annotate_rows event or NULL if there is + no saved event. + */ + inline Annotate_rows_log_event* get_annotate_event() + { + return m_annotate_event; + } + + /** + Delete saved Annotate_rows event (if any) and switch off the + binlog_annotate_rows_events for this sql thread. + To be called when sql thread has applied the last (i.e. with + STMT_END_F flag) rbr event. + */ + inline void free_annotate_event() + { + if (m_annotate_event) + { + sql_thd->variables.binlog_annotate_rows_events= 0; + delete m_annotate_event; + m_annotate_event= 0; + } + } + private: uint32 m_flags; + Annotate_rows_log_event *m_annotate_event; }; diff --git a/sql/set_var.cc b/sql/set_var.cc index 910deafc432..894b6519127 100644 --- a/sql/set_var.cc +++ b/sql/set_var.cc @@ -182,6 +182,9 @@ static sys_var_const sys_back_log(&vars, "back_log", OPT_GLOBAL, SHOW_LONG, (uchar*) &back_log); static sys_var_const_os_str sys_basedir(&vars, "basedir", mysql_home); +static sys_var_thd_bool +sys_binlog_annotate_rows_events(&vars, "binlog_annotate_rows_events", + &SV::binlog_annotate_rows_events); static sys_var_long_ptr sys_binlog_cache_size(&vars, "binlog_cache_size", &binlog_cache_size); static sys_var_thd_binlog_format sys_binlog_format(&vars, "binlog_format", diff --git a/sql/slave.cc b/sql/slave.cc index 0e41d897dd4..e159645db38 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -1867,6 +1867,9 @@ static int request_dump(MYSQL* mysql, Master_info* mi, *suppress_warnings= FALSE; + if (opt_log_slave_updates && opt_replicate_annotate_rows_events) + binlog_flags|= BINLOG_SEND_ANNOTATE_ROWS_EVENT; + // TODO if big log files: Change next to int8store() int4store(buf, (ulong) mi->master_log_pos); int2store(buf + 4, binlog_flags); @@ -2261,17 +2264,41 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli) } exec_res= apply_event_and_update_pos(ev, thd, rli); - /* - Format_description_log_event should not be deleted because it will be - used to read info about the relay log's format; it will be deleted when - the SQL thread does not need it, i.e. when this thread terminates. - */ - if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT) - { - DBUG_PRINT("info", ("Deleting the event after it has been executed")); - delete ev; + switch (ev->get_type_code()) { + case FORMAT_DESCRIPTION_EVENT: + /* + Format_description_log_event should not be deleted because it + will be used to read info about the relay log's format; + it will be deleted when the SQL thread does not need it, + i.e. when this thread terminates. + */ + break; + case ANNOTATE_ROWS_EVENT: + /* + Annotate_rows event should not be deleted because after it has + been applied, thd->query points to the string inside this event. + The thd->query will be used to generate new Annotate_rows event + during applying the subsequent Rows events. + */ + rli->set_annotate_event((Annotate_rows_log_event*) ev); + break; + case DELETE_ROWS_EVENT: + case UPDATE_ROWS_EVENT: + case WRITE_ROWS_EVENT: + /* + After the last Rows event has been applied, the saved Annotate_rows + event (if any) is not needed anymore and can be deleted. + */ + if (((Rows_log_event*)ev)->get_flags(Rows_log_event::STMT_END_F)) + rli->free_annotate_event(); + /* fall through */ + default: + DBUG_PRINT("info", ("Deleting the event after it has been executed")); + delete ev; + break; } + /* update_log_pos failed: this should not happen, so we don't retry. @@ -2899,6 +2926,12 @@ pthread_handler_t handle_slave_sql(void *arg) thd->init_for_queries(); thd->temporary_tables = rli->save_temporary_tables; // restore temp tables set_thd_in_use_temporary_tables(rli); // (re)set sql_thd in use for saved temp tables + /* + binlog_annotate_rows_events must be TRUE only after an Annotate_rows event + has been recieved and only till the last corresponding rbr event has been + applied. In all other cases it must be FALSE. + */ + thd->variables.binlog_annotate_rows_events= 0; pthread_mutex_lock(&LOCK_thread_count); threads.append(thd); pthread_mutex_unlock(&LOCK_thread_count); @@ -3382,7 +3415,7 @@ static int queue_binlog_ver_1_event(Master_info *mi, const char *buf, If we get Load event, we need to pass a non-reusable buffer to read_log_event, so we do a trick */ - if (buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) + if ((uchar)buf[EVENT_TYPE_OFFSET] == LOAD_EVENT) { if (unlikely(!(tmp_buf=(char*)my_malloc(event_len+1,MYF(MY_WME))))) { @@ -3589,13 +3622,13 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) LINT_INIT(inc_pos); if (mi->rli.relay_log.description_event_for_queue->binlog_version<4 && - buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT /* a way to escape */) + (uchar)buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT /* a way to escape */) DBUG_RETURN(queue_old_event(mi,buf,event_len)); LINT_INIT(inc_pos); pthread_mutex_lock(&mi->data_lock); - switch (buf[EVENT_TYPE_OFFSET]) { + switch ((uchar)buf[EVENT_TYPE_OFFSET]) { case STOP_EVENT: /* We needn't write this event to the relay log. Indeed, it just indicates a @@ -3698,9 +3731,9 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) the master's binlog (i.e. Format_desc, Rotate & Stop) should not increment mi->master_log_pos. */ - if (buf[EVENT_TYPE_OFFSET]!=FORMAT_DESCRIPTION_EVENT && - buf[EVENT_TYPE_OFFSET]!=ROTATE_EVENT && - buf[EVENT_TYPE_OFFSET]!=STOP_EVENT) + if ((uchar)buf[EVENT_TYPE_OFFSET]!=FORMAT_DESCRIPTION_EVENT && + (uchar)buf[EVENT_TYPE_OFFSET]!=ROTATE_EVENT && + (uchar)buf[EVENT_TYPE_OFFSET]!=STOP_EVENT) { mi->master_log_pos+= inc_pos; memcpy(rli->ign_master_log_name_end, mi->master_log_name, FN_REFLEN); diff --git a/sql/slave.h b/sql/slave.h index 1a1cfcebd9b..63dd3afc6b8 100644 --- a/sql/slave.h +++ b/sql/slave.h @@ -106,6 +106,7 @@ extern MYSQL_PLUGIN_IMPORT char *relay_log_info_file; extern char *opt_relay_logname, *opt_relaylog_index_name; extern my_bool opt_skip_slave_start, opt_reckless_slave; extern my_bool opt_log_slave_updates; +extern my_bool opt_replicate_annotate_rows_events; extern ulonglong relay_log_space_limit; /* diff --git a/sql/sql_binlog.cc b/sql/sql_binlog.cc index da582c37ae9..9a7aac5774b 100644 --- a/sql/sql_binlog.cc +++ b/sql/sql_binlog.cc @@ -167,7 +167,7 @@ void mysql_client_binlog_statement(THD* thd) */ if (!have_fd_event) { - int type = bufptr[EVENT_TYPE_OFFSET]; + int type = (uchar)bufptr[EVENT_TYPE_OFFSET]; if (type == FORMAT_DESCRIPTION_EVENT || type == START_EVENT_V3) have_fd_event= TRUE; else diff --git a/sql/sql_class.h b/sql/sql_class.h index 2fe579713e1..ae7994c65c9 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -367,6 +367,7 @@ struct system_variables ulong ndb_index_stat_cache_entries; ulong ndb_index_stat_update_freq; ulong binlog_format; // binlog format for this thd (see enum_binlog_format) + my_bool binlog_annotate_rows_events; my_bool binlog_direct_non_trans_update; /* In slave thread we need to know in behalf of which @@ -1478,7 +1479,8 @@ public: */ void binlog_start_trans_and_stmt(); void binlog_set_stmt_begin(); - int binlog_write_table_map(TABLE *table, bool is_transactional); + int binlog_write_table_map(TABLE *table, bool is_transactional, + my_bool *with_annotate= 0); int binlog_write_row(TABLE* table, bool is_transactional, MY_BITMAP const* cols, size_t colcnt, const uchar *buf); diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc index 72ca7310ee1..48105cf9f3c 100644 --- a/sql/sql_insert.cc +++ b/sql/sql_insert.cc @@ -1966,6 +1966,11 @@ bool delayed_get_table(THD *thd, TABLE_LIST *table_list) pthread_mutex_lock(&LOCK_thread_count); thread_count++; pthread_mutex_unlock(&LOCK_thread_count); + /* + Annotating delayed inserts is not supported. + */ + di->thd.variables.binlog_annotate_rows_events= 0; + di->thd.set_db(table_list->db, (uint) strlen(table_list->db)); di->thd.set_query(my_strdup(table_list->table_name, MYF(MY_WME)), 0); if (di->thd.db == NULL || di->thd.query() == NULL) diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 95e48c531be..51361fdf6e4 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -489,7 +489,7 @@ impossible position"; DBUG_PRINT("info", ("Looked for a Format_description_log_event, found event type %d", (*packet)[EVENT_TYPE_OFFSET+1])); - if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT) + if ((uchar)(*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT) { binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] & LOG_EVENT_BINLOG_IN_USE_F); @@ -557,31 +557,36 @@ impossible position"; #endif if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT) + if ((uchar)(*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT) { binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] & LOG_EVENT_BINLOG_IN_USE_F); (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F; } - else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT) + else if ((uchar)(*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT) binlog_can_be_corrupted= FALSE; - if (my_net_write(net, (uchar*) packet->ptr(), packet->length())) + if ((uchar)(*packet)[EVENT_TYPE_OFFSET+1] != ANNOTATE_ROWS_EVENT || + (flags & BINLOG_SEND_ANNOTATE_ROWS_EVENT)) { - errmsg = "Failed on my_net_write()"; - my_errno= ER_UNKNOWN_ERROR; - goto err; - } + if (my_net_write(net, (uchar*) packet->ptr(), packet->length())) + { + errmsg = "Failed on my_net_write()"; + my_errno= ER_UNKNOWN_ERROR; + goto err; + } - DBUG_PRINT("info", ("log event code %d", - (*packet)[LOG_EVENT_OFFSET+1] )); - if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT) - { - if (send_file(thd)) - { - errmsg = "failed in send_file()"; - my_errno= ER_UNKNOWN_ERROR; - goto err; - } + DBUG_PRINT("info", ("log event code %d", + (*packet)[LOG_EVENT_OFFSET+1] )); + if ((uchar)(*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT) + { + if (send_file(thd)) + { + errmsg = "failed in send_file()"; + my_errno= ER_UNKNOWN_ERROR; + goto err; + } + } } packet->set("\0", 1, &my_charset_bin); } @@ -677,23 +682,27 @@ impossible position"; if (read_packet) { - thd_proc_info(thd, "Sending binlog event to slave"); - if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) ) - { - errmsg = "Failed on my_net_write()"; - my_errno= ER_UNKNOWN_ERROR; - goto err; - } - - if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT) - { - if (send_file(thd)) + if ((uchar)(*packet)[EVENT_TYPE_OFFSET+1] != ANNOTATE_ROWS_EVENT || + (flags & BINLOG_SEND_ANNOTATE_ROWS_EVENT)) + { + thd_proc_info(thd, "Sending binlog event to slave"); + if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) ) { - errmsg = "failed in send_file()"; + errmsg = "Failed on my_net_write()"; my_errno= ER_UNKNOWN_ERROR; goto err; } - } + + if ((uchar)(*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT) + { + if (send_file(thd)) + { + errmsg = "failed in send_file()"; + my_errno= ER_UNKNOWN_ERROR; + goto err; + } + } + } packet->set("\0", 1, &my_charset_bin); /* No need to net_flush because we will get to flush later when @@ -1774,6 +1783,11 @@ static sys_var_chain vars = { NULL, NULL }; static sys_var_const sys_log_slave_updates(&vars, "log_slave_updates", OPT_GLOBAL, SHOW_MY_BOOL, (uchar*) &opt_log_slave_updates); +static sys_var_const +sys_replicate_annotate_rows_events(&vars, + "replicate_annotate_rows_events", + OPT_GLOBAL, SHOW_MY_BOOL, + (uchar*) &opt_replicate_annotate_rows_events); static sys_var_const sys_relay_log(&vars, "relay_log", OPT_GLOBAL, SHOW_CHAR_PTR, (uchar*) &opt_relay_logname); |