summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
Diffstat (limited to 'sql')
-rw-r--r--sql/log.cc18
-rw-r--r--sql/log_event.cc657
-rw-r--r--sql/log_event.h338
-rw-r--r--sql/rpl_rli.cc38
-rw-r--r--sql/rpl_rli.h22
-rw-r--r--sql/rpl_utility.cc3
-rw-r--r--sql/rpl_utility.h2
-rw-r--r--sql/slave.cc194
-rw-r--r--sql/slave.h6
-rw-r--r--sql/sql_binlog.cc12
10 files changed, 898 insertions, 392 deletions
diff --git a/sql/log.cc b/sql/log.cc
index deb77890f35..7e60950977a 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -1548,7 +1548,13 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all)
(binlog_trx_data*) thd->ha_data[binlog_hton->slot];
DBUG_ASSERT(mysql_bin_log.is_open());
- if (all && trx_data->empty())
+ /*
+ The condition here has to be identical to the one inside
+ binlog_end_trans(), guarding the write of the transaction cache to
+ the binary log.
+ */
+ if ((all || !(thd->options & (OPTION_BEGIN | OPTION_NOT_AUTOCOMMIT))) &&
+ trx_data->empty())
{
// we're here because trans_log was flushed in MYSQL_BIN_LOG::log_xid()
trx_data->reset();
@@ -2499,7 +2505,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
/*
Set 'created' to 0, so that in next relay logs this event does not
trigger cleaning actions on the slave in
- Format_description_log_event::exec_event().
+ Format_description_log_event::apply_event_impl().
*/
description_event_for_queue->created= 0;
/* Don't set log_pos in event header */
@@ -3206,8 +3212,10 @@ void MYSQL_BIN_LOG::new_file_impl(bool need_lock)
{
tc_log_page_waits++;
pthread_mutex_lock(&LOCK_prep_xids);
- while (prepared_xids)
+ while (prepared_xids) {
+ DBUG_PRINT("info", ("prepared_xids=%lu", prepared_xids));
pthread_cond_wait(&COND_prep_xids, &LOCK_prep_xids);
+ }
pthread_mutex_unlock(&LOCK_prep_xids);
}
@@ -5061,8 +5069,10 @@ void TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid)
{
pthread_mutex_lock(&LOCK_prep_xids);
DBUG_ASSERT(prepared_xids > 0);
- if (--prepared_xids == 0)
+ if (--prepared_xids == 0) {
+ DBUG_PRINT("info", ("prepared_xids=%lu", prepared_xids));
pthread_cond_signal(&COND_prep_xids);
+ }
pthread_mutex_unlock(&LOCK_prep_xids);
rotate_and_purge(0); // as ::write() did not rotate
}
diff --git a/sql/log_event.cc b/sql/log_event.cc
index f8d3c43bfba..8e6311ce53a 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -88,9 +88,10 @@ public:
operator&()
DESCRIPTION
- Function to return a pointer to the internal, so that the object
- can be treated as a IO_CACHE and used with the my_b_* IO_CACHE
- functions
+
+ Function to return a pointer to the internal cache, so that the
+ object can be treated as a IO_CACHE and used with the my_b_*
+ IO_CACHE functions
RETURN VALUE
A pointer to the internal IO_CACHE.
@@ -531,25 +532,19 @@ Log_event::Log_event(const char* buf,
#ifndef MYSQL_CLIENT
#ifdef HAVE_REPLICATION
-/*
- Log_event::exec_event()
-*/
-
-int Log_event::exec_event(struct st_relay_log_info* rli)
+int Log_event::do_update_pos(RELAY_LOG_INFO *rli)
{
- DBUG_ENTER("Log_event::exec_event");
-
/*
- rli is null when (as far as I (Guilhem) know)
- the caller is
- Load_log_event::exec_event *and* that one is called from
- Execute_load_log_event::exec_event.
- In this case, we don't do anything here ;
- Execute_load_log_event::exec_event will call Log_event::exec_event
- again later with the proper rli.
- Strictly speaking, if we were sure that rli is null
- only in the case discussed above, 'if (rli)' is useless here.
- But as we are not 100% sure, keep it for now.
+ rli is null when (as far as I (Guilhem) know) the caller is
+ Load_log_event::do_apply_event *and* that one is called from
+ Execute_load_log_event::do_apply_event. In this case, we don't
+ do anything here ; Execute_load_log_event::do_apply_event will
+ call Log_event::do_apply_event again later with the proper rli.
+ Strictly speaking, if we were sure that rli is null only in the
+ case discussed above, 'if (rli)' is useless here. But as we are
+ not 100% sure, keep it for now.
+
+ Matz: I don't think we will need this check with this refactoring.
*/
if (rli)
{
@@ -584,18 +579,37 @@ int Log_event::exec_event(struct st_relay_log_info* rli)
{
rli->inc_group_relay_log_pos(log_pos);
flush_relay_log_info(rli);
- /*
- Note that Rotate_log_event::exec_event() does not call this
- function, so there is no chance that a fake rotate event resets
- last_master_timestamp.
- Note that we update without mutex (probably ok - except in some very
- rare cases, only consequence is that value may take some time to
- display in Seconds_Behind_Master - not critical).
+ /*
+ Note that Rotate_log_event::do_apply_event() does not call
+ this function, so there is no chance that a fake rotate event
+ resets last_master_timestamp. Note that we update without
+ mutex (probably ok - except in some very rare cases, only
+ consequence is that value may take some time to display in
+ Seconds_Behind_Master - not critical).
*/
rli->last_master_timestamp= when;
}
}
- DBUG_RETURN(0);
+
+ return 0; // Cannot fail currently
+}
+
+
+Log_event::enum_skip_reason
+Log_event::do_shall_skip(RELAY_LOG_INFO *rli)
+{
+ DBUG_PRINT("info", ("ev->server_id=%lu, ::server_id=%lu,"
+ " rli->replicate_same_server_id=%d,"
+ " rli->slave_skip_counter=%d",
+ (ulong) server_id, (ulong) ::server_id,
+ rli->replicate_same_server_id,
+ rli->slave_skip_counter));
+ if (server_id == ::server_id && !rli->replicate_same_server_id)
+ return EVENT_SKIP_IGNORE;
+ else if (rli->slave_skip_counter > 0)
+ return EVENT_SKIP_COUNT;
+ else
+ return EVENT_SKIP_NOT;
}
@@ -742,7 +756,7 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet,
ulong data_len;
int result=0;
char buf[LOG_EVENT_MINIMAL_HEADER_LEN];
- DBUG_ENTER("read_log_event");
+ DBUG_ENTER("Log_event::read_log_event");
if (log_lock)
pthread_mutex_lock(log_lock);
@@ -817,7 +831,7 @@ Log_event* Log_event::read_log_event(IO_CACHE* file,
const Format_description_log_event *description_event)
#endif
{
- DBUG_ENTER("Log_event::read_log_event(IO_CACHE *, Format_description_log_event *");
+ DBUG_ENTER("Log_event::read_log_event");
DBUG_ASSERT(description_event != 0);
char head[LOG_EVENT_MINIMAL_HEADER_LEN];
/*
@@ -1887,27 +1901,28 @@ void Query_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
/*
- Query_log_event::exec_event()
+ Query_log_event::do_apply_event()
*/
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
-int Query_log_event::exec_event(struct st_relay_log_info* rli)
+int Query_log_event::do_apply_event(RELAY_LOG_INFO const *rli)
{
- return exec_event(rli, query, q_len);
+ return do_apply_event(rli, query, q_len);
}
-int Query_log_event::exec_event(struct st_relay_log_info* rli,
- const char *query_arg, uint32 q_len_arg)
+int Query_log_event::do_apply_event(RELAY_LOG_INFO const *rli,
+ const char *query_arg, uint32 q_len_arg)
{
LEX_STRING new_db;
int expected_error,actual_error= 0;
/*
- Colleagues: please never free(thd->catalog) in MySQL. This would lead to
- bugs as here thd->catalog is a part of an alloced block, not an entire
- alloced block (see Query_log_event::exec_event()). Same for thd->db.
- Thank you.
+ Colleagues: please never free(thd->catalog) in MySQL. This would
+ lead to bugs as here thd->catalog is a part of an alloced block,
+ not an entire alloced block (see
+ Query_log_event::do_apply_event()). Same for thd->db. Thank
+ you.
*/
thd->catalog= catalog_len ? (char *) catalog : (char *)"";
new_db.length= db_len;
@@ -1926,11 +1941,11 @@ int Query_log_event::exec_event(struct st_relay_log_info* rli,
END of the current log event (COMMIT). We save it in rli so that InnoDB can
access it.
*/
- rli->future_group_master_log_pos= log_pos;
+ const_cast<RELAY_LOG_INFO*>(rli)->future_group_master_log_pos= log_pos;
DBUG_PRINT("info", ("log_pos: %lu", (ulong) log_pos));
- clear_all_errors(thd, rli);
- rli->clear_tables_to_lock();
+ clear_all_errors(thd, const_cast<RELAY_LOG_INFO*>(rli));
+ const_cast<RELAY_LOG_INFO*>(rli)->clear_tables_to_lock();
/*
Note: We do not need to execute reset_one_shot_variables() if this
@@ -1939,8 +1954,8 @@ int Query_log_event::exec_event(struct st_relay_log_info* rli,
its companion query. If the SET is ignored because of
db_ok(), the companion query will also be ignored, and if
the companion query is ignored in the db_ok() test of
- ::exec_event(), then the companion SET also have so we
- don't need to reset_one_shot_variables().
+ ::do_apply_event(), then the companion SET also have so
+ we don't need to reset_one_shot_variables().
*/
if (rpl_filter->db_ok(thd->db))
{
@@ -2056,7 +2071,7 @@ int Query_log_event::exec_event(struct st_relay_log_info* rli,
to check/fix it.
*/
if (mysql_test_parse_for_slave(thd, thd->query, thd->query_length))
- clear_all_errors(thd, rli); /* Can ignore query */
+ clear_all_errors(thd, const_cast<RELAY_LOG_INFO*>(rli)); /* Can ignore query */
else
{
slave_print_msg(ERROR_LEVEL, rli, expected_error,
@@ -2107,7 +2122,7 @@ Default database: '%s'. Query: '%s'",
ignored_error_code(actual_error))
{
DBUG_PRINT("info",("error ignored"));
- clear_all_errors(thd, rli);
+ clear_all_errors(thd, const_cast<RELAY_LOG_INFO*>(rli));
}
/*
Other cases: mostly we expected no error and get one.
@@ -2174,16 +2189,26 @@ end:
thd->first_successful_insert_id_in_prev_stmt= 0;
thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt= 0;
free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC));
+ return thd->query_error;
+}
+
+int Query_log_event::do_update_pos(RELAY_LOG_INFO *rli)
+{
/*
- If there was an error we stop. Otherwise we increment positions. Note that
- we will not increment group* positions if we are just after a SET
- ONE_SHOT, because SET ONE_SHOT should not be separated from its following
- updating query.
+ Note that we will not increment group* positions if we are just
+ after a SET ONE_SHOT, because SET ONE_SHOT should not be separated
+ from its following updating query.
*/
- return (thd->query_error ? thd->query_error :
- (thd->one_shot_set ? (rli->inc_event_relay_log_pos(),0) :
- Log_event::exec_event(rli)));
+ if (thd->one_shot_set)
+ {
+ rli->inc_event_relay_log_pos();
+ return 0;
+ }
+ else
+ return Log_event::do_update_pos(rli);
}
+
+
#endif
@@ -2312,7 +2337,7 @@ bool Start_log_event_v3::write(IO_CACHE* file)
/*
- Start_log_event_v3::exec_event()
+ Start_log_event_v3::do_apply_event()
The master started
@@ -2331,9 +2356,9 @@ bool Start_log_event_v3::write(IO_CACHE* file)
*/
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
-int Start_log_event_v3::exec_event(struct st_relay_log_info* rli)
+int Start_log_event_v3::do_apply_event(RELAY_LOG_INFO const *rli)
{
- DBUG_ENTER("Start_log_event_v3::exec_event");
+ DBUG_ENTER("Start_log_event_v3::do_apply_event");
switch (binlog_version)
{
case 3:
@@ -2375,7 +2400,7 @@ int Start_log_event_v3::exec_event(struct st_relay_log_info* rli)
/* this case is impossible */
DBUG_RETURN(1);
}
- DBUG_RETURN(Log_event::exec_event(rli));
+ DBUG_RETURN(0);
}
#endif /* defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) */
@@ -2566,24 +2591,10 @@ bool Format_description_log_event::write(IO_CACHE* file)
}
#endif
-/*
- SYNOPSIS
- Format_description_log_event::exec_event()
-
- IMPLEMENTATION
- Save the information which describes the binlog's format, to be able to
- read all coming events.
- Call Start_log_event_v3::exec_event().
-*/
-
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
-int Format_description_log_event::exec_event(struct st_relay_log_info* rli)
+int Format_description_log_event::do_apply_event(RELAY_LOG_INFO const *rli)
{
- DBUG_ENTER("Format_description_log_event::exec_event");
-
- /* save the information describing this binlog */
- delete rli->relay_log.description_event_for_exec;
- rli->relay_log.description_event_for_exec= this;
+ DBUG_ENTER("Format_description_log_event::do_apply_event");
#ifdef USING_TRANSACTIONS
/*
@@ -2605,14 +2616,36 @@ int Format_description_log_event::exec_event(struct st_relay_log_info* rli)
"or ROLLBACK in relay log). A probable cause is that "
"the master died while writing the transaction to "
"its binary log, thus rolled back too.");
- rli->cleanup_context(thd, 1);
+ const_cast<RELAY_LOG_INFO*>(rli)->cleanup_context(thd, 1);
}
#endif
/*
- If this event comes from ourselves, there is no cleaning task to perform,
- we don't call Start_log_event_v3::exec_event() (this was just to update the
- log's description event).
+ If this event comes from ourselves, there is no cleaning task to
+ perform, we don't call Start_log_event_v3::do_apply_event()
+ (this was just to update the log's description event).
*/
+ if (server_id != (uint32) ::server_id)
+ {
+ /*
+ If the event was not requested by the slave i.e. the master sent
+ it while the slave asked for a position >4, the event will make
+ rli->group_master_log_pos advance. Say that the slave asked for
+ position 1000, and the Format_desc event's end is 96. Then in
+ the beginning of replication rli->group_master_log_pos will be
+ 0, then 96, then jump to first really asked event (which is
+ >96). So this is ok.
+ */
+ DBUG_RETURN(Start_log_event_v3::do_apply_event(rli));
+ }
+ DBUG_RETURN(0);
+}
+
+int Format_description_log_event::do_update_pos(RELAY_LOG_INFO *rli)
+{
+ /* save the information describing this binlog */
+ delete rli->relay_log.description_event_for_exec;
+ rli->relay_log.description_event_for_exec= this;
+
if (server_id == (uint32) ::server_id)
{
/*
@@ -2629,19 +2662,20 @@ int Format_description_log_event::exec_event(struct st_relay_log_info* rli)
the Intvar_log_event respectively.
*/
rli->inc_event_relay_log_pos();
- DBUG_RETURN(0);
+ return 0;
+ }
+ else
+ {
+ return Log_event::do_update_pos(rli);
}
+}
- /*
- If the event was not requested by the slave i.e. the master sent it while
- the slave asked for a position >4, the event will make
- rli->group_master_log_pos advance. Say that the slave asked for position
- 1000, and the Format_desc event's end is 96. Then in the beginning of
- replication rli->group_master_log_pos will be 0, then 96, then jump to
- first really asked event (which is >96). So this is ok.
- */
- DBUG_RETURN(Start_log_event_v3::exec_event(rli));
+Log_event::enum_skip_reason
+Format_description_log_event::do_shall_skip(RELAY_LOG_INFO *rli)
+{
+ return Log_event::EVENT_SKIP_NOT;
}
+
#endif
@@ -3155,30 +3189,32 @@ void Load_log_event::set_fields(const char* affected_db,
Does the data loading job when executing a LOAD DATA on the slave
SYNOPSIS
- Load_log_event::exec_event
- net
- rli
- use_rli_only_for_errors - if set to 1, rli is provided to
- Load_log_event::exec_event only for this
- function to have RPL_LOG_NAME and
- rli->last_slave_error, both being used by
- error reports. rli's position advancing
- is skipped (done by the caller which is
- Execute_load_log_event::exec_event).
- - if set to 0, rli is provided for full use,
- i.e. for error reports and position
- advancing.
+ Load_log_event::do_apply_event
+ net
+ rli
+ use_rli_only_for_errors - if set to 1, rli is provided to
+ Load_log_event::do_apply_event
+ only for this function to have
+ RPL_LOG_NAME and
+ rli->last_slave_error, both being
+ used by error reports. rli's
+ position advancing is skipped (done
+ by the caller which is
+ Execute_load_log_event::do_apply_event).
+ - if set to 0, rli is provided for
+ full use, i.e. for error reports and
+ position advancing.
DESCRIPTION
Does the data loading job when executing a LOAD DATA on the slave
-
+
RETURN VALUE
- 0 Success
+ 0 Success
1 Failure
*/
-int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli,
- bool use_rli_only_for_errors)
+int Load_log_event::do_apply_event(NET* net, RELAY_LOG_INFO const *rli,
+ bool use_rli_only_for_errors)
{
LEX_STRING new_db;
new_db.length= db_len;
@@ -3187,9 +3223,9 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli,
DBUG_ASSERT(thd->query == 0);
thd->query_length= 0; // Should not be needed
thd->query_error= 0;
- clear_all_errors(thd, rli);
+ clear_all_errors(thd, const_cast<RELAY_LOG_INFO*>(rli));
- /* see Query_log_event::exec_event() and BUG#13360 */
+ /* see Query_log_event::do_apply_event() and BUG#13360 */
DBUG_ASSERT(!rli->m_table_map.count());
/*
Usually mysql_init_query() is called by mysql_parse(), but we need it here
@@ -3198,22 +3234,26 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli,
mysql_init_query(thd, 0, 0);
if (!use_rli_only_for_errors)
{
- /* Saved for InnoDB, see comment in Query_log_event::exec_event() */
- rli->future_group_master_log_pos= log_pos;
+ /*
+ Saved for InnoDB, see comment in
+ Query_log_event::do_apply_event()
+ */
+ const_cast<RELAY_LOG_INFO*>(rli)->future_group_master_log_pos= log_pos;
DBUG_PRINT("info", ("log_pos: %lu", (ulong) log_pos));
}
/*
- We test replicate_*_db rules. Note that we have already prepared the file
- to load, even if we are going to ignore and delete it now. So it is
- possible that we did a lot of disk writes for nothing. In other words, a
- big LOAD DATA INFILE on the master will still consume a lot of space on
- the slave (space in the relay log + space of temp files: twice the space
- of the file to load...) even if it will finally be ignored.
- TODO: fix this; this can be done by testing rules in
- Create_file_log_event::exec_event() and then discarding Append_block and
- al. Another way is do the filtering in the I/O thread (more efficient: no
- disk writes at all).
+ We test replicate_*_db rules. Note that we have already prepared
+ the file to load, even if we are going to ignore and delete it
+ now. So it is possible that we did a lot of disk writes for
+ nothing. In other words, a big LOAD DATA INFILE on the master will
+ still consume a lot of space on the slave (space in the relay log
+ + space of temp files: twice the space of the file to load...)
+ even if it will finally be ignored. TODO: fix this; this can be
+ done by testing rules in Create_file_log_event::do_apply_event()
+ and then discarding Append_block and al. Another way is do the
+ filtering in the I/O thread (more efficient: no disk writes at
+ all).
Note: We do not need to execute reset_one_shot_variables() if this
@@ -3222,8 +3262,8 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli,
its companion query. If the SET is ignored because of
db_ok(), the companion query will also be ignored, and if
the companion query is ignored in the db_ok() test of
- ::exec_event(), then the companion SET also have so we
- don't need to reset_one_shot_variables().
+ ::do_apply_event(), then the companion SET also have so
+ we don't need to reset_one_shot_variables().
*/
if (rpl_filter->db_ok(thd->db))
{
@@ -3419,7 +3459,7 @@ Fatal error running LOAD DATA INFILE on table '%s'. Default database: '%s'",
return 1;
}
- return ( use_rli_only_for_errors ? 0 : Log_event::exec_event(rli) );
+ return ( use_rli_only_for_errors ? 0 : Log_event::do_apply_event(rli) );
}
#endif
@@ -3513,6 +3553,7 @@ Rotate_log_event::Rotate_log_event(const char* buf, uint event_len,
ident_offset = post_header_len;
set_if_smaller(ident_len,FN_REFLEN-1);
new_log_ident= my_strndup(buf + ident_offset, (uint) ident_len, MYF(MY_WME));
+ DBUG_PRINT("debug", ("new_log_ident: '%s'", new_log_ident));
DBUG_VOID_RETURN;
}
@@ -3532,8 +3573,20 @@ bool Rotate_log_event::write(IO_CACHE* file)
}
#endif
+/**
+ Helper function to detect if the event is inside a group.
+ */
+#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
+static bool is_in_group(THD *const thd, RELAY_LOG_INFO *const rli)
+{
+ return (thd->options & OPTION_BEGIN) != 0 ||
+ (rli->last_event_start_time > 0);
+}
+#endif
+
+
/*
- Rotate_log_event::exec_event()
+ Rotate_log_event::do_apply_event()
Got a rotate log event from the master
@@ -3550,34 +3603,49 @@ bool Rotate_log_event::write(IO_CACHE* file)
*/
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
-int Rotate_log_event::exec_event(struct st_relay_log_info* rli)
+int Rotate_log_event::do_update_pos(RELAY_LOG_INFO *rli)
{
- DBUG_ENTER("Rotate_log_event::exec_event");
+ DBUG_ENTER("Rotate_log_event::do_update_pos");
+#ifndef DBUG_OFF
+ char buf[32];
+#endif
+
+ DBUG_PRINT("info", ("server_id=%lu; ::server_id=%lu",
+ (ulong) this->server_id, (ulong) ::server_id));
+ DBUG_PRINT("info", ("new_log_ident: %s", this->new_log_ident));
+ DBUG_PRINT("info", ("pos: %s", llstr(this->pos, buf)));
pthread_mutex_lock(&rli->data_lock);
rli->event_relay_log_pos= my_b_tell(rli->cur_log);
/*
- If we are in a transaction: the only normal case is when the I/O thread was
- copying a big transaction, then it was stopped and restarted: we have this
- in the relay log:
+ If we are in a transaction or in a group: the only normal case is
+ when the I/O thread was copying a big transaction, then it was
+ stopped and restarted: we have this in the relay log:
+
BEGIN
...
ROTATE (a fake one)
...
COMMIT or ROLLBACK
- In that case, we don't want to touch the coordinates which correspond to
- the beginning of the transaction.
- Starting from 5.0.0, there also are some rotates from the slave itself, in
- the relay log.
+
+ In that case, we don't want to touch the coordinates which
+ correspond to the beginning of the transaction. Starting from
+ 5.0.0, there also are some rotates from the slave itself, in the
+ relay log, which shall not change the group positions.
*/
- if (!(thd->options & OPTION_BEGIN))
+ if ((server_id != ::server_id || rli->replicate_same_server_id) &&
+ !is_in_group(thd, rli))
{
+ DBUG_PRINT("info", ("old group_master_log_name: '%s' "
+ "old group_master_log_pos: %lu",
+ rli->group_master_log_name,
+ (ulong) rli->group_master_log_pos));
memcpy(rli->group_master_log_name, new_log_ident, ident_len+1);
rli->notify_group_master_log_name_update();
rli->group_master_log_pos= pos;
rli->group_relay_log_pos= rli->event_relay_log_pos;
- DBUG_PRINT("info", ("group_master_log_name: '%s' "
- "group_master_log_pos: %lu",
+ DBUG_PRINT("info", ("new group_master_log_name: '%s' "
+ "new group_master_log_pos: %lu",
rli->group_master_log_name,
(ulong) rli->group_master_log_pos));
/*
@@ -3596,8 +3664,27 @@ int Rotate_log_event::exec_event(struct st_relay_log_info* rli)
pthread_mutex_unlock(&rli->data_lock);
pthread_cond_broadcast(&rli->data_cond);
flush_relay_log_info(rli);
+
DBUG_RETURN(0);
}
+
+
+Log_event::enum_skip_reason
+Rotate_log_event::do_shall_skip(RELAY_LOG_INFO *rli)
+{
+ enum_skip_reason reason= Log_event::do_shall_skip(rli);
+
+ switch (reason) {
+ case Log_event::EVENT_SKIP_NOT:
+ case Log_event::EVENT_SKIP_COUNT:
+ return Log_event::EVENT_SKIP_NOT;
+
+ case Log_event::EVENT_SKIP_IGNORE:
+ return Log_event::EVENT_SKIP_IGNORE;
+ }
+ DBUG_ASSERT(0);
+}
+
#endif
@@ -3704,11 +3791,11 @@ void Intvar_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
/*
- Intvar_log_event::exec_event()
+ Intvar_log_event::do_apply_event()
*/
#if defined(HAVE_REPLICATION)&& !defined(MYSQL_CLIENT)
-int Intvar_log_event::exec_event(struct st_relay_log_info* rli)
+int Intvar_log_event::do_apply_event(RELAY_LOG_INFO const *rli)
{
switch (type) {
case LAST_INSERT_ID_EVENT:
@@ -3719,9 +3806,33 @@ int Intvar_log_event::exec_event(struct st_relay_log_info* rli)
thd->force_one_auto_inc_interval(val);
break;
}
+ return 0;
+}
+
+int Intvar_log_event::do_update_pos(RELAY_LOG_INFO *rli)
+{
rli->inc_event_relay_log_pos();
return 0;
}
+
+
+Log_event::enum_skip_reason
+Intvar_log_event::do_shall_skip(RELAY_LOG_INFO *rli)
+{
+ /*
+ It is a common error to set the slave skip counter to 1 instead of
+ 2 when recovering from an insert which used a auto increment,
+ rand, or user var. Therefore, if the slave skip counter is 1, we
+ just say that this event should be skipped by ignoring it, meaning
+ that we do not change the value of the slave skip counter since it
+ will be decreased by the following insert event.
+ */
+ if (rli->slave_skip_counter == 1)
+ return Log_event::EVENT_SKIP_IGNORE;
+ else
+ return Log_event::do_shall_skip(rli);
+}
+
#endif
@@ -3784,13 +3895,37 @@ void Rand_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
-int Rand_log_event::exec_event(struct st_relay_log_info* rli)
+int Rand_log_event::do_apply_event(RELAY_LOG_INFO const *rli)
{
thd->rand.seed1= (ulong) seed1;
thd->rand.seed2= (ulong) seed2;
+ return 0;
+}
+
+int Rand_log_event::do_update_pos(RELAY_LOG_INFO *rli)
+{
rli->inc_event_relay_log_pos();
return 0;
}
+
+
+Log_event::enum_skip_reason
+Rand_log_event::do_shall_skip(RELAY_LOG_INFO *rli)
+{
+ /*
+ It is a common error to set the slave skip counter to 1 instead of
+ 2 when recovering from an insert which used a auto increment,
+ rand, or user var. Therefore, if the slave skip counter is 1, we
+ just say that this event should be skipped by ignoring it, meaning
+ that we do not change the value of the slave skip counter since it
+ will be decreased by the following insert event.
+ */
+ if (rli->slave_skip_counter == 1)
+ return Log_event::EVENT_SKIP_IGNORE;
+ else
+ return Log_event::do_shall_skip(rli);
+}
+
#endif /* !MYSQL_CLIENT */
@@ -3857,12 +3992,12 @@ void Xid_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
-int Xid_log_event::exec_event(struct st_relay_log_info* rli)
+int Xid_log_event::do_apply_event(RELAY_LOG_INFO const *rli)
{
/* For a slave Xid_log_event is COMMIT */
general_log_print(thd, COM_QUERY,
"COMMIT /* implicit, from Xid_log_event */");
- return end_trans(thd, COMMIT) || Log_event::exec_event(rli);
+ return end_trans(thd, COMMIT);
}
#endif /* !MYSQL_CLIENT */
@@ -4140,11 +4275,11 @@ void User_var_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
/*
- User_var_log_event::exec_event()
+ User_var_log_event::do_apply_event()
*/
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
-int User_var_log_event::exec_event(struct st_relay_log_info* rli)
+int User_var_log_event::do_apply_event(RELAY_LOG_INFO const *rli)
{
Item *it= 0;
CHARSET_INFO *charset;
@@ -4206,9 +4341,31 @@ int User_var_log_event::exec_event(struct st_relay_log_info* rli)
e.update_hash(val, val_len, type, charset, DERIVATION_IMPLICIT, 0);
free_root(thd->mem_root,0);
+ return 0;
+}
+
+int User_var_log_event::do_update_pos(RELAY_LOG_INFO *rli)
+{
rli->inc_event_relay_log_pos();
return 0;
}
+
+Log_event::enum_skip_reason
+User_var_log_event::do_shall_skip(RELAY_LOG_INFO *rli)
+{
+ /*
+ It is a common error to set the slave skip counter to 1 instead
+ of 2 when recovering from an insert which used a auto increment,
+ rand, or user var. Therefore, if the slave skip counter is 1, we
+ just say that this event should be skipped by ignoring it, meaning
+ that we do not change the value of the slave skip counter since it
+ will be decreased by the following insert event.
+ */
+ if (rli->slave_skip_counter == 1)
+ return Log_event::EVENT_SKIP_IGNORE;
+ else
+ return Log_event::do_shall_skip(rli);
+}
#endif /* !MYSQL_CLIENT */
@@ -4248,7 +4405,7 @@ void Slave_log_event::pack_info(Protocol *protocol)
#ifndef MYSQL_CLIENT
Slave_log_event::Slave_log_event(THD* thd_arg,
- struct st_relay_log_info* rli)
+ RELAY_LOG_INFO* rli)
:Log_event(thd_arg, 0, 0) , mem_pool(0), master_host(0)
{
DBUG_ENTER("Slave_log_event");
@@ -4358,11 +4515,11 @@ Slave_log_event::Slave_log_event(const char* buf, uint event_len)
#ifndef MYSQL_CLIENT
-int Slave_log_event::exec_event(struct st_relay_log_info* rli)
+int Slave_log_event::do_apply_event(RELAY_LOG_INFO const *rli)
{
if (mysql_bin_log.is_open())
mysql_bin_log.write(this);
- return Log_event::exec_event(rli);
+ return 0;
}
#endif /* !MYSQL_CLIENT */
@@ -4391,21 +4548,21 @@ void Stop_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
/*
- Stop_log_event::exec_event()
-
- The master stopped.
- We used to clean up all temporary tables but this is useless as, as the
- master has shut down properly, it has written all DROP TEMPORARY TABLE
- (prepared statements' deletion is TODO only when we binlog prep stmts).
- We used to clean up slave_load_tmpdir, but this is useless as it has been
- cleared at the end of LOAD DATA INFILE.
- So we have nothing to do here.
- The place were we must do this cleaning is in Start_log_event_v3::exec_event(),
- not here. Because if we come here, the master was sane.
+ Stop_log_event::do_apply_event()
+
+ The master stopped. We used to clean up all temporary tables but
+ this is useless as, as the master has shut down properly, it has
+ written all DROP TEMPORARY TABLE (prepared statements' deletion is
+ TODO only when we binlog prep stmts). We used to clean up
+ slave_load_tmpdir, but this is useless as it has been cleared at the
+ end of LOAD DATA INFILE. So we have nothing to do here. The place
+ were we must do this cleaning is in
+ Start_log_event_v3::do_apply_event(), not here. Because if we come
+ here, the master was sane.
*/
#ifndef MYSQL_CLIENT
-int Stop_log_event::exec_event(struct st_relay_log_info* rli)
+int Stop_log_event::do_update_pos(RELAY_LOG_INFO *rli)
{
/*
We do not want to update master_log pos because we get a rotate event
@@ -4423,6 +4580,7 @@ int Stop_log_event::exec_event(struct st_relay_log_info* rli)
}
return 0;
}
+
#endif /* !MYSQL_CLIENT */
#endif /* HAVE_REPLICATION */
@@ -4613,11 +4771,11 @@ void Create_file_log_event::pack_info(Protocol *protocol)
/*
- Create_file_log_event::exec_event()
+ Create_file_log_event::do_apply_event()
*/
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
-int Create_file_log_event::exec_event(struct st_relay_log_info* rli)
+int Create_file_log_event::do_apply_event(RELAY_LOG_INFO const *rli)
{
char proc_info[17+FN_REFLEN+10], *fname_buf;
char *ext;
@@ -4679,7 +4837,7 @@ err:
if (fd >= 0)
my_close(fd, MYF(0));
thd->proc_info= 0;
- return error ? 1 : Log_event::exec_event(rli);
+ return error == 0;
}
#endif /* defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) */
@@ -4787,15 +4945,15 @@ int Append_block_log_event::get_create_or_append() const
}
/*
- Append_block_log_event::exec_event()
+ Append_block_log_event::do_apply_event()
*/
-int Append_block_log_event::exec_event(struct st_relay_log_info* rli)
+int Append_block_log_event::do_apply_event(RELAY_LOG_INFO const *rli)
{
char proc_info[17+FN_REFLEN+10], *fname= proc_info+17;
int fd;
int error = 1;
- DBUG_ENTER("Append_block_log_event::exec_event");
+ DBUG_ENTER("Append_block_log_event::do_apply_event");
fname= strmov(proc_info, "Making temp file ");
slave_load_file_stem(fname, file_id, server_id, ".data");
@@ -4834,7 +4992,7 @@ err:
if (fd >= 0)
my_close(fd, MYF(0));
thd->proc_info= 0;
- DBUG_RETURN(error ? error : Log_event::exec_event(rli));
+ DBUG_RETURN(error);
}
#endif
@@ -4918,18 +5076,18 @@ void Delete_file_log_event::pack_info(Protocol *protocol)
#endif
/*
- Delete_file_log_event::exec_event()
+ Delete_file_log_event::do_apply_event()
*/
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
-int Delete_file_log_event::exec_event(struct st_relay_log_info* rli)
+int Delete_file_log_event::do_apply_event(RELAY_LOG_INFO const *rli)
{
char fname[FN_REFLEN+10];
char *ext= slave_load_file_stem(fname, file_id, server_id, ".data");
(void) my_delete(fname, MYF(MY_WME));
strmov(ext, ".info");
(void) my_delete(fname, MYF(MY_WME));
- return Log_event::exec_event(rli);
+ return 0;
}
#endif /* defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) */
@@ -5015,10 +5173,10 @@ void Execute_load_log_event::pack_info(Protocol *protocol)
/*
- Execute_load_log_event::exec_event()
+ Execute_load_log_event::do_apply_event()
*/
-int Execute_load_log_event::exec_event(struct st_relay_log_info* rli)
+int Execute_load_log_event::do_apply_event(RELAY_LOG_INFO const *rli)
{
char fname[FN_REFLEN+10];
char *ext;
@@ -5049,14 +5207,15 @@ int Execute_load_log_event::exec_event(struct st_relay_log_info* rli)
lev->thd = thd;
/*
- lev->exec_event should use rli only for errors
- i.e. should not advance rli's position.
- lev->exec_event is the place where the table is loaded (it calls
- mysql_load()).
+ lev->do_apply_event should use rli only for errors i.e. should
+ not advance rli's position.
+
+ lev->do_apply_event is the place where the table is loaded (it
+ calls mysql_load()).
*/
- rli->future_group_master_log_pos= log_pos;
- if (lev->exec_event(0,rli,1))
+ const_cast<RELAY_LOG_INFO*>(rli)->future_group_master_log_pos= log_pos;
+ if (lev->do_apply_event(0,rli,1))
{
/*
We want to indicate the name of the file that could not be loaded
@@ -5099,7 +5258,7 @@ err:
my_close(fd, MYF(0));
end_io_cache(&file);
}
- return error ? error : Log_event::exec_event(rli);
+ return error;
}
#endif /* defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) */
@@ -5267,7 +5426,7 @@ void Execute_load_query_log_event::pack_info(Protocol *protocol)
int
-Execute_load_query_log_event::exec_event(struct st_relay_log_info* rli)
+Execute_load_query_log_event::do_apply_event(RELAY_LOG_INFO const *rli)
{
char *p;
char *buf;
@@ -5304,7 +5463,7 @@ Execute_load_query_log_event::exec_event(struct st_relay_log_info* rli)
p= strmake(p, STRING_WITH_LEN(" INTO"));
p= strmake(p, query+fn_pos_end, q_len-fn_pos_end);
- error= Query_log_event::exec_event(rli, buf, p-buf);
+ error= Query_log_event::do_apply_event(rli, buf, p-buf);
/* Forging file name for deletion in same buffer */
*fname_end= 0;
@@ -5624,7 +5783,7 @@ int Rows_log_event::do_add_row_data(byte *const row_data,
the master does not have a default value (and isn't nullable)
*/
static int
-unpack_row(RELAY_LOG_INFO *rli,
+unpack_row(RELAY_LOG_INFO const *rli,
TABLE *table, uint const colcnt,
char const *row, MY_BITMAP const *cols,
char const **row_end, ulong *master_reclength,
@@ -5730,17 +5889,17 @@ unpack_row(RELAY_LOG_INFO *rli,
DBUG_RETURN(error);
}
-int Rows_log_event::exec_event(st_relay_log_info *rli)
+int Rows_log_event::do_apply_event(RELAY_LOG_INFO const *rli)
{
- DBUG_ENTER("Rows_log_event::exec_event(st_relay_log_info*)");
+ DBUG_ENTER("Rows_log_event::do_apply_event(st_relay_log_info*)");
int error= 0;
char const *row_start= (char const *)m_rows_buf;
/*
- If m_table_id == ~0UL, then we have a dummy event that does
- not contain any data. In that case, we just remove all tables in
- the tables_to_lock list, close the thread tables, step the relay
- log position, and return with success.
+ If m_table_id == ~0UL, then we have a dummy event that does not
+ contain any data. In that case, we just remove all tables in the
+ tables_to_lock list, close the thread tables, and return with
+ success. The relay log position will be stepped in
*/
if (m_table_id == ~0UL)
{
@@ -5750,16 +5909,16 @@ int Rows_log_event::exec_event(st_relay_log_info *rli)
*/
DBUG_ASSERT(get_flags(STMT_END_F));
- rli->clear_tables_to_lock();
+ const_cast<RELAY_LOG_INFO*>(rli)->clear_tables_to_lock();
close_thread_tables(thd);
thd->clear_error();
- rli->inc_event_relay_log_pos();
DBUG_RETURN(0);
}
/*
'thd' has been set by exec_relay_log_event(), just before calling
- exec_event(). We still check here to prevent future coding errors.
+ do_apply_event(). We still check here to prevent future coding
+ errors.
*/
DBUG_ASSERT(rli->sql_thd == thd);
@@ -5775,8 +5934,9 @@ int Rows_log_event::exec_event(st_relay_log_info *rli)
/*
lock_tables() reads the contents of thd->lex, so they must be
- initialized. Contrary to in Table_map_log_event::exec_event() we don't
- call mysql_init_query() as that may reset the binlog format.
+ initialized. Contrary to in
+ Table_map_log_event::do_apply_event() we don't call
+ mysql_init_query() as that may reset the binlog format.
*/
lex_start(thd, NULL, 0);
@@ -5805,7 +5965,7 @@ int Rows_log_event::exec_event(st_relay_log_info *rli)
"Error in %s event: when locking tables",
get_type_str());
}
- rli->clear_tables_to_lock();
+ const_cast<RELAY_LOG_INFO*>(rli)->clear_tables_to_lock();
DBUG_RETURN(error);
}
@@ -5826,7 +5986,8 @@ int Rows_log_event::exec_event(st_relay_log_info *rli)
TABLE_LIST *tables= rli->tables_to_lock;
close_tables_for_reopen(thd, &tables);
- if ((error= open_tables(thd, &tables, &rli->tables_to_lock_count, 0)))
+ uint tables_count= rli->tables_to_lock_count;
+ if ((error= open_tables(thd, &tables, &tables_count, 0)))
{
if (thd->query_error || thd->is_fatal_error)
{
@@ -5841,7 +6002,7 @@ int Rows_log_event::exec_event(st_relay_log_info *rli)
"unexpected success or fatal error"));
thd->query_error= 1;
}
- rli->clear_tables_to_lock();
+ const_cast<RELAY_LOG_INFO*>(rli)->clear_tables_to_lock();
DBUG_RETURN(error);
}
}
@@ -5885,24 +6046,24 @@ int Rows_log_event::exec_event(st_relay_log_info *rli)
*/
for (TABLE_LIST *ptr= rli->tables_to_lock ; ptr ; ptr= ptr->next_global)
{
- rli->m_table_map.set_table(ptr->table_id, ptr->table);
+ const_cast<RELAY_LOG_INFO*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table);
}
#ifdef HAVE_QUERY_CACHE
query_cache.invalidate_locked_for_write(rli->tables_to_lock);
#endif
- rli->clear_tables_to_lock();
+ const_cast<RELAY_LOG_INFO*>(rli)->clear_tables_to_lock();
}
DBUG_ASSERT(rli->tables_to_lock == NULL && rli->tables_to_lock_count == 0);
- TABLE* table= rli->m_table_map.get_table(m_table_id);
+ TABLE* table= const_cast<RELAY_LOG_INFO*>(rli)->m_table_map.get_table(m_table_id);
if (table)
{
/*
table == NULL means that this table should not be replicated
- (this was set up by Table_map_log_event::exec_event() which
- tested replicate-* rules).
+ (this was set up by Table_map_log_event::do_apply_event()
+ which tested replicate-* rules).
*/
/*
@@ -5959,7 +6120,7 @@ int Rows_log_event::exec_event(st_relay_log_info *rli)
break;
default:
- slave_print_msg(ERROR_LEVEL, rli, error,
+ slave_print_msg(ERROR_LEVEL, rli, thd->net.last_errno,
"Error in %s event: row application failed",
get_type_str());
thd->query_error= 1;
@@ -5969,7 +6130,7 @@ int Rows_log_event::exec_event(st_relay_log_info *rli)
row_start= row_end;
}
DBUG_EXECUTE_IF("STOP_SLAVE_after_first_Rows_event",
- rli->abort_slave=1;);
+ const_cast<RELAY_LOG_INFO*>(rli)->abort_slave= 1;);
error= do_after_row_operations(table, error);
if (!cache_stmt)
{
@@ -5980,11 +6141,12 @@ int Rows_log_event::exec_event(st_relay_log_info *rli)
if (error)
{ /* error has occured during the transaction */
- slave_print_msg(ERROR_LEVEL, rli, error,
+ slave_print_msg(ERROR_LEVEL, rli, thd->net.last_errno,
"Error in %s event: error during transaction execution "
"on table %s.%s",
get_type_str(), table->s->db.str,
table->s->table_name.str);
+
/*
If one day we honour --skip-slave-errors in row-based replication, and
the error should be skipped, then we would clear mappings, rollback,
@@ -5997,7 +6159,7 @@ int Rows_log_event::exec_event(st_relay_log_info *rli)
rollback at the caller along with sbr.
*/
thd->reset_current_stmt_binlog_row_based();
- rli->cleanup_context(thd, 0); /* rollback at caller in step with sbr */
+ const_cast<RELAY_LOG_INFO*>(rli)->cleanup_context(thd, error);
thd->query_error= 1;
DBUG_RETURN(error);
}
@@ -6041,8 +6203,7 @@ int Rows_log_event::exec_event(st_relay_log_info *rli)
*/
thd->reset_current_stmt_binlog_row_based();
- rli->cleanup_context(thd, 0);
- rli->transaction_end(thd);
+ const_cast<RELAY_LOG_INFO*>(rli)->cleanup_context(thd, 0);
if (error == 0)
{
@@ -6055,7 +6216,6 @@ int Rows_log_event::exec_event(st_relay_log_info *rli)
do not become visible. We still prefer to wipe them out.
*/
thd->clear_error();
- error= Log_event::exec_event(rli);
}
else
slave_print_msg(ERROR_LEVEL, rli, error,
@@ -6082,17 +6242,17 @@ int Rows_log_event::exec_event(st_relay_log_info *rli)
wait (reached end of last relay log and nothing gets appended
there), we timeout after one minute, and notify DBA about the
problem. When WL#2975 is implemented, just remove the member
- st_relay_log_info::unsafe_to_stop_at and all its occurences.
+ st_relay_log_info::last_event_start_time and all its occurences.
*/
- rli->unsafe_to_stop_at= time(0);
+ const_cast<RELAY_LOG_INFO*>(rli)->last_event_start_time= time(0);
}
DBUG_ASSERT(error == 0);
thd->clear_error();
- rli->inc_event_relay_log_pos();
-
+
DBUG_RETURN(0);
}
+
#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
#ifndef MYSQL_CLIENT
@@ -6272,15 +6432,15 @@ Table_map_log_event::Table_map_log_event(const char *buf, uint event_len,
const char *const vpart= buf + common_header_len + post_header_len;
/* Extract the length of the various parts from the buffer */
- byte const* const ptr_dblen= (byte const*)vpart + 0;
+ byte const *const ptr_dblen= (byte const*)vpart + 0;
m_dblen= *(uchar*) ptr_dblen;
/* Length of database name + counter + terminating null */
- byte const* const ptr_tbllen= ptr_dblen + m_dblen + 2;
+ byte const *const ptr_tbllen= ptr_dblen + m_dblen + 2;
m_tbllen= *(uchar*) ptr_tbllen;
/* Length of table name + counter + terminating null */
- byte const* const ptr_colcnt= ptr_tbllen + m_tbllen + 2;
+ byte const *const ptr_colcnt= ptr_tbllen + m_tbllen + 2;
uchar *ptr_after_colcnt= (uchar*) ptr_colcnt;
m_colcnt= net_field_length(&ptr_after_colcnt);
@@ -6325,9 +6485,9 @@ Table_map_log_event::~Table_map_log_event()
*/
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
-int Table_map_log_event::exec_event(st_relay_log_info *rli)
+int Table_map_log_event::do_apply_event(RELAY_LOG_INFO const *rli)
{
- DBUG_ENTER("Table_map_log_event::exec_event(st_relay_log_info*)");
+ DBUG_ENTER("Table_map_log_event::do_apply_event(st_relay_log_info*)");
DBUG_ASSERT(rli->sql_thd == thd);
@@ -6450,29 +6610,24 @@ int Table_map_log_event::exec_event(st_relay_log_info *rli)
locked by linking the table into the list of tables to lock.
*/
table_list->next_global= table_list->next_local= rli->tables_to_lock;
- rli->tables_to_lock= table_list;
- rli->tables_to_lock_count++;
+ const_cast<RELAY_LOG_INFO*>(rli)->tables_to_lock= table_list;
+ const_cast<RELAY_LOG_INFO*>(rli)->tables_to_lock_count++;
/* 'memory' is freed in clear_tables_to_lock */
}
- /*
- We explicitly do not call Log_event::exec_event() here since we do not
- want the relay log position to be flushed to disk. The flushing will be
- done by the last Rows_log_event that either ends a statement (outside a
- transaction) or a transaction.
-
- A table map event can *never* end a transaction or a statement, so we
- just step the relay log position.
- */
-
- if (likely(!error))
- rli->inc_event_relay_log_pos();
DBUG_RETURN(error);
err:
my_free((gptr) memory, MYF(MY_WME));
DBUG_RETURN(error);
}
+
+int Table_map_log_event::do_update_pos(RELAY_LOG_INFO *rli)
+{
+ rli->inc_event_relay_log_pos();
+ return 0;
+}
+
#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
#ifndef MYSQL_CLIENT
@@ -6637,7 +6792,7 @@ int Write_rows_log_event::do_after_row_operations(TABLE *table, int error)
return error;
}
-int Write_rows_log_event::do_prepare_row(THD *thd, RELAY_LOG_INFO *rli,
+int Write_rows_log_event::do_prepare_row(THD *thd, RELAY_LOG_INFO const *rli,
TABLE *table,
char const *const row_start,
char const **const row_end)
@@ -6778,6 +6933,32 @@ copy_extra_record_fields(TABLE *table,
return 0; // All OK
}
+/**
+ Check if an error is a duplicate key error.
+
+ This function is used to check if an error code is one of the
+ duplicate key error, i.e., and error code for which it is sensible
+ to do a <code>get_dup_key()</code> to retrieve the duplicate key.
+
+ @param errcode The error code to check.
+
+ @return <code>true</code> if the error code is such that
+ <code>get_dup_key()</code> will return true, <code>false</code>
+ otherwise.
+ */
+bool
+is_duplicate_key_error(int errcode)
+{
+ switch (errcode)
+ {
+ case HA_ERR_FOUND_DUPP_KEY:
+ case HA_ERR_FOUND_DUPP_UNIQUE:
+ return true;
+ }
+ return false;
+}
+
+
/*
Replace the provided record in the database.
@@ -6820,7 +7001,7 @@ replace_record(THD *thd, TABLE *table,
if ((keynum= table->file->get_dup_key(error)) < 0)
{
/* We failed to retrieve the duplicate key */
- DBUG_RETURN(HA_ERR_FOUND_DUPP_KEY);
+ DBUG_RETURN(error);
}
/*
@@ -6837,7 +7018,10 @@ replace_record(THD *thd, TABLE *table,
{
error= table->file->rnd_pos(table->record[1], table->file->dup_ref);
if (error)
+ {
+ table->file->print_error(error, MYF(0));
DBUG_RETURN(error);
+ }
}
else
{
@@ -6854,12 +7038,15 @@ replace_record(THD *thd, TABLE *table,
}
key_copy((byte*)key.get(), table->record[0], table->key_info + keynum, 0);
- error= table->file->index_read_idx(table->record[1], keynum,
+ error= table->file->index_read_idx(table->record[1], keynum,
(const byte*)key.get(),
table->key_info[keynum].key_length,
HA_READ_KEY_EXACT);
if (error)
+ {
+ table->file->print_error(error, MYF(0));
DBUG_RETURN(error);
+ }
}
/*
@@ -6892,15 +7079,21 @@ replace_record(THD *thd, TABLE *table,
{
error=table->file->ha_update_row(table->record[1],
table->record[0]);
+ if (error)
+ table->file->print_error(error, MYF(0));
DBUG_RETURN(error);
}
else
{
if ((error= table->file->ha_delete_row(table->record[1])))
+ {
+ table->file->print_error(error, MYF(0));
DBUG_RETURN(error);
+ }
/* Will retry ha_write_row() with the offending row removed. */
}
}
+
DBUG_RETURN(error);
}
@@ -7239,7 +7432,7 @@ int Delete_rows_log_event::do_after_row_operations(TABLE *table, int error)
return error;
}
-int Delete_rows_log_event::do_prepare_row(THD *thd, RELAY_LOG_INFO *rli,
+int Delete_rows_log_event::do_prepare_row(THD *thd, RELAY_LOG_INFO const *rli,
TABLE *table,
char const *const row_start,
char const **const row_end)
@@ -7374,7 +7567,7 @@ int Update_rows_log_event::do_after_row_operations(TABLE *table, int error)
return error;
}
-int Update_rows_log_event::do_prepare_row(THD *thd, RELAY_LOG_INFO *rli,
+int Update_rows_log_event::do_prepare_row(THD *thd, RELAY_LOG_INFO const *rli,
TABLE *table,
char const *const row_start,
char const **const row_end)
diff --git a/sql/log_event.h b/sql/log_event.h
index 7cbe8925d9a..7cd231a8353 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -503,6 +503,7 @@ class THD;
class Format_description_log_event;
struct st_relay_log_info;
+typedef st_relay_log_info RELAY_LOG_INFO;
#ifdef MYSQL_CLIENT
/*
@@ -591,6 +592,33 @@ typedef struct st_print_event_info
class Log_event
{
public:
+ /**
+ Enumeration of what kinds of skipping (and non-skipping) that can
+ occur when the slave executes an event.
+
+ @see shall_skip
+ @see do_shall_skip
+ */
+ enum enum_skip_reason {
+ /**
+ Don't skip event.
+ */
+ EVENT_SKIP_NOT,
+
+ /**
+ Skip event by ignoring it.
+
+ This means that the slave skip counter will not be changed.
+ */
+ EVENT_SKIP_IGNORE,
+
+ /**
+ Skip event and decrease skip counter.
+ */
+ EVENT_SKIP_COUNT
+ };
+
+
/*
The following type definition is to be used whenever data is placed
and manipulated in a common buffer. Use this typedef for buffers
@@ -672,16 +700,14 @@ public:
static void init_show_field_list(List<Item>* field_list);
#ifdef HAVE_REPLICATION
int net_send(Protocol *protocol, const char* log_name, my_off_t pos);
+
/*
pack_info() is used by SHOW BINLOG EVENTS; as print() it prepares and sends
a string to display to the user, so it resembles print().
*/
+
virtual void pack_info(Protocol *protocol);
- /*
- The SQL slave thread calls exec_event() to execute the event; this is where
- the slave's data is modified.
- */
- virtual int exec_event(struct st_relay_log_info* rli);
+
#endif /* HAVE_REPLICATION */
virtual const char* get_db()
{
@@ -754,6 +780,127 @@ public:
*description_event);
/* returns the human readable name of the event's type */
const char* get_type_str();
+
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+public:
+
+ /**
+ Apply the event to the database.
+
+ This function represents the public interface for applying an
+ event.
+
+ @see do_apply_event
+ */
+ int apply_event(RELAY_LOG_INFO const *rli) {
+ return do_apply_event(rli);
+ }
+
+
+ /**
+ Update the relay log position.
+
+ This function represents the public interface for "stepping over"
+ the event and will update the relay log information.
+
+ @see do_update_pos
+ */
+ int update_pos(RELAY_LOG_INFO *rli)
+ {
+ return do_update_pos(rli);
+ }
+
+ /**
+ Decide if the event shall be skipped, and the reason for skipping
+ it.
+
+ @see do_shall_skip
+ */
+ enum_skip_reason shall_skip(RELAY_LOG_INFO *rli)
+ {
+ return do_shall_skip(rli);
+ }
+
+protected:
+ /**
+ Primitive to apply an event to the database.
+
+ This is where the change to the database is made.
+
+ @note The primitive is protected instead of private, since there
+ is a hierarchy of actions to be performed in some cases.
+
+ @see Format_description_log_event::do_apply_event()
+
+ @param rli Pointer to relay log info structure
+
+ @retval 0 Event applied successfully
+ @retval errno Error code if event application failed
+ */
+ virtual int do_apply_event(RELAY_LOG_INFO const *rli)
+ {
+ return 0; /* Default implementation does nothing */
+ }
+
+
+ /**
+ Advance relay log coordinates.
+
+ This function is called to advance the relay log coordinates to
+ just after the event. It is essential that both the relay log
+ coordinate and the group log position is updated correctly, since
+ this function is used also for skipping events.
+
+ Normally, each implementation of do_update_pos() shall:
+
+ - Update the event position to refer to the position just after
+ the event.
+
+ - Update the group log position to refer to the position just
+ after the event <em>if the event is last in a group</em>
+
+ @param rli Pointer to relay log info structure
+
+ @retval 0 Coordinates changed successfully
+ @retval errno Error code if advancing failed (usually just
+ 1). Observe that handler errors are returned by the
+ do_apply_event() function, and not by this one.
+ */
+ virtual int do_update_pos(RELAY_LOG_INFO *rli);
+
+
+ /**
+ Decide if this event shall be skipped or not and the reason for
+ skipping it.
+
+ The default implementation decide that the event shall be skipped
+ if either:
+
+ - the server id of the event is the same as the server id of the
+ server and <code>rli->replicate_same_server_id</code> is true,
+ or
+
+ - if <code>rli->slave_skip_counter</code> is greater than zero.
+
+ @see do_apply_event
+ @see do_update_pos
+
+ @retval Log_event::EVENT_SKIP_NOT
+ The event shall not be skipped and should be applied.
+
+ @retval Log_event::EVENT_SKIP_IGNORE
+ The event shall be skipped by just ignoring it, i.e., the slave
+ skip counter shall not be changed. This happends if, for example,
+ the originating server id of the event is the same as the server
+ id of the slave.
+
+ @retval Log_event::EVENT_SKIP_COUNT
+ The event shall be skipped because the slave skip counter was
+ non-zero. The caller shall decrease the counter by one.
+ */
+ virtual enum_skip_reason do_shall_skip(RELAY_LOG_INFO *rli);
+
+#endif
};
/*
@@ -794,10 +941,10 @@ public:
uint16 error_code;
ulong thread_id;
/*
- For events created by Query_log_event::exec_event (and
- Load_log_event::exec_event()) we need the *original* thread id, to be able
- to log the event with the original (=master's) thread id (fix for
- BUG#1686).
+ For events created by Query_log_event::do_apply_event (and
+ Load_log_event::do_apply_event()) we need the *original* thread
+ id, to be able to log the event with the original (=master's)
+ thread id (fix for BUG#1686).
*/
ulong slave_proxy_id;
@@ -860,9 +1007,6 @@ public:
const char* get_db() { return db; }
#ifdef HAVE_REPLICATION
void pack_info(Protocol* protocol);
- int exec_event(struct st_relay_log_info* rli);
- int exec_event(struct st_relay_log_info* rli, const char *query_arg,
- uint32 q_len_arg);
#endif /* HAVE_REPLICATION */
#else
void print_query_header(IO_CACHE* file, PRINT_EVENT_INFO* print_event_info);
@@ -891,6 +1035,16 @@ public:
*/
virtual ulong get_post_header_size_for_derived() { return 0; }
/* Writes derived event-specific part of post header. */
+
+public: /* !!! Public in this patch to allow old usage */
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+ virtual int do_apply_event(RELAY_LOG_INFO const *rli);
+ virtual int do_update_pos(RELAY_LOG_INFO *rli);
+
+ int do_apply_event(RELAY_LOG_INFO const *rli,
+ const char *query_arg,
+ uint32 q_len_arg);
+#endif /* HAVE_REPLICATION */
};
@@ -939,9 +1093,8 @@ public:
uint16 master_port;
#ifndef MYSQL_CLIENT
- Slave_log_event(THD* thd_arg, struct st_relay_log_info* rli);
+ Slave_log_event(THD* thd_arg, RELAY_LOG_INFO* rli);
void pack_info(Protocol* protocol);
- int exec_event(struct st_relay_log_info* rli);
#else
void print(FILE* file, PRINT_EVENT_INFO* print_event_info);
#endif
@@ -954,6 +1107,11 @@ public:
#ifndef MYSQL_CLIENT
bool write(IO_CACHE* file);
#endif
+
+private:
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+ virtual int do_apply_event(RELAY_LOG_INFO const* rli);
+#endif
};
#endif /* HAVE_REPLICATION */
@@ -1023,12 +1181,6 @@ public:
const char* get_db() { return db; }
#ifdef HAVE_REPLICATION
void pack_info(Protocol* protocol);
- int exec_event(struct st_relay_log_info* rli)
- {
- return exec_event(thd->slave_net,rli,0);
- }
- int exec_event(NET* net, struct st_relay_log_info* rli,
- bool use_rli_only_for_errors);
#endif /* HAVE_REPLICATION */
#else
void print(FILE* file, PRINT_EVENT_INFO* print_event_info);
@@ -1060,6 +1212,17 @@ public:
+ LOAD_HEADER_LEN
+ sql_ex.data_size() + field_block_len + num_fields);
}
+
+public: /* !!! Public in this patch to allow old usage */
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+ virtual int do_apply_event(RELAY_LOG_INFO const* rli)
+ {
+ return do_apply_event(thd->slave_net,rli,0);
+ }
+
+ int do_apply_event(NET *net, RELAY_LOG_INFO const *rli,
+ bool use_rli_only_for_errors);
+#endif
};
extern char server_version[SERVER_VERSION_LENGTH];
@@ -1117,7 +1280,6 @@ public:
Start_log_event_v3();
#ifdef HAVE_REPLICATION
void pack_info(Protocol* protocol);
- int exec_event(struct st_relay_log_info* rli);
#endif /* HAVE_REPLICATION */
#else
Start_log_event_v3() {}
@@ -1137,6 +1299,22 @@ public:
return START_V3_HEADER_LEN; //no variable-sized part
}
virtual bool is_artificial_event() { return artificial_event; }
+
+protected:
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+ virtual int do_apply_event(RELAY_LOG_INFO const *rli);
+ virtual enum_skip_reason do_shall_skip(RELAY_LOG_INFO*)
+ {
+ /*
+ Events from ourself should be skipped, but they should not
+ decrease the slave skip counter.
+ */
+ if (this->server_id == ::server_id)
+ return Log_event::EVENT_SKIP_IGNORE;
+ else
+ return Log_event::EVENT_SKIP_NOT;
+ }
+#endif
};
@@ -1162,13 +1340,6 @@ public:
uchar server_version_split[3];
Format_description_log_event(uint8 binlog_ver, const char* server_ver=0);
-
-#ifndef MYSQL_CLIENT
-#ifdef HAVE_REPLICATION
- int exec_event(struct st_relay_log_info* rli);
-#endif /* HAVE_REPLICATION */
-#endif
-
Format_description_log_event(const char* buf, uint event_len,
const Format_description_log_event* description_event);
~Format_description_log_event() { my_free((gptr)post_header_len, MYF(0)); }
@@ -1191,7 +1362,15 @@ public:
*/
return FORMAT_DESCRIPTION_HEADER_LEN;
}
+
void calc_server_version_split();
+
+protected:
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+ virtual int do_apply_event(RELAY_LOG_INFO const *rli);
+ virtual int do_update_pos(RELAY_LOG_INFO *rli);
+ virtual enum_skip_reason do_shall_skip(RELAY_LOG_INFO *rli);
+#endif
};
@@ -1215,7 +1394,6 @@ public:
{}
#ifdef HAVE_REPLICATION
void pack_info(Protocol* protocol);
- int exec_event(struct st_relay_log_info* rli);
#endif /* HAVE_REPLICATION */
#else
void print(FILE* file, PRINT_EVENT_INFO* print_event_info);
@@ -1230,6 +1408,13 @@ public:
bool write(IO_CACHE* file);
#endif
bool is_valid() const { return 1; }
+
+private:
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+ virtual int do_apply_event(RELAY_LOG_INFO const *rli);
+ virtual int do_update_pos(RELAY_LOG_INFO *rli);
+ virtual enum_skip_reason do_shall_skip(RELAY_LOG_INFO *rli);
+#endif
};
@@ -1256,7 +1441,6 @@ class Rand_log_event: public Log_event
{}
#ifdef HAVE_REPLICATION
void pack_info(Protocol* protocol);
- int exec_event(struct st_relay_log_info* rli);
#endif /* HAVE_REPLICATION */
#else
void print(FILE* file, PRINT_EVENT_INFO* print_event_info);
@@ -1270,6 +1454,13 @@ class Rand_log_event: public Log_event
bool write(IO_CACHE* file);
#endif
bool is_valid() const { return 1; }
+
+private:
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+ virtual int do_apply_event(RELAY_LOG_INFO const *rli);
+ virtual int do_update_pos(RELAY_LOG_INFO *rli);
+ virtual enum_skip_reason do_shall_skip(RELAY_LOG_INFO *rli);
+#endif
};
/*****************************************************************************
@@ -1293,7 +1484,6 @@ class Xid_log_event: public Log_event
Xid_log_event(THD* thd_arg, my_xid x): Log_event(thd_arg,0,0), xid(x) {}
#ifdef HAVE_REPLICATION
void pack_info(Protocol* protocol);
- int exec_event(struct st_relay_log_info* rli);
#endif /* HAVE_REPLICATION */
#else
void print(FILE* file, PRINT_EVENT_INFO* print_event_info);
@@ -1307,6 +1497,11 @@ class Xid_log_event: public Log_event
bool write(IO_CACHE* file);
#endif
bool is_valid() const { return 1; }
+
+private:
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+ virtual int do_apply_event(RELAY_LOG_INFO const *rli);
+#endif
};
/*****************************************************************************
@@ -1336,7 +1531,6 @@ public:
val_len(val_len_arg), type(type_arg), charset_number(charset_number_arg)
{ is_null= !val; }
void pack_info(Protocol* protocol);
- int exec_event(struct st_relay_log_info* rli);
#else
void print(FILE* file, PRINT_EVENT_INFO* print_event_info);
#endif
@@ -1348,6 +1542,13 @@ public:
bool write(IO_CACHE* file);
#endif
bool is_valid() const { return 1; }
+
+private:
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+ virtual int do_apply_event(RELAY_LOG_INFO const *rli);
+ virtual int do_update_pos(RELAY_LOG_INFO *rli);
+ virtual enum_skip_reason do_shall_skip(RELAY_LOG_INFO *rli);
+#endif
};
@@ -1362,7 +1563,6 @@ public:
#ifndef MYSQL_CLIENT
Stop_log_event() :Log_event()
{}
- int exec_event(struct st_relay_log_info* rli);
#else
void print(FILE* file, PRINT_EVENT_INFO* print_event_info);
#endif
@@ -1373,6 +1573,22 @@ public:
~Stop_log_event() {}
Log_event_type get_type_code() { return STOP_EVENT;}
bool is_valid() const { return 1; }
+
+private:
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+ virtual int do_update_pos(RELAY_LOG_INFO *rli);
+ virtual enum_skip_reason do_shall_skip(RELAY_LOG_INFO *rli)
+ {
+ /*
+ Events from ourself should be skipped, but they should not
+ decrease the slave skip counter.
+ */
+ if (this->server_id == ::server_id)
+ return Log_event::EVENT_SKIP_IGNORE;
+ else
+ return Log_event::EVENT_SKIP_NOT;
+ }
+#endif
};
/*****************************************************************************
@@ -1399,7 +1615,6 @@ public:
ulonglong pos_arg, uint flags);
#ifdef HAVE_REPLICATION
void pack_info(Protocol* protocol);
- int exec_event(struct st_relay_log_info* rli);
#endif /* HAVE_REPLICATION */
#else
void print(FILE* file, PRINT_EVENT_INFO* print_event_info);
@@ -1418,6 +1633,12 @@ public:
#ifndef MYSQL_CLIENT
bool write(IO_CACHE* file);
#endif
+
+private:
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+ virtual int do_update_pos(RELAY_LOG_INFO *rli);
+ virtual enum_skip_reason do_shall_skip(RELAY_LOG_INFO *rli);
+#endif
};
@@ -1452,7 +1673,6 @@ public:
bool using_trans);
#ifdef HAVE_REPLICATION
void pack_info(Protocol* protocol);
- int exec_event(struct st_relay_log_info* rli);
#endif /* HAVE_REPLICATION */
#else
void print(FILE* file, PRINT_EVENT_INFO* print_event_info);
@@ -1486,6 +1706,11 @@ public:
*/
bool write_base(IO_CACHE* file);
#endif
+
+private:
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+ virtual int do_apply_event(RELAY_LOG_INFO const *rli);
+#endif
};
@@ -1518,7 +1743,6 @@ public:
Append_block_log_event(THD* thd, const char* db_arg, char* block_arg,
uint block_len_arg, bool using_trans);
#ifdef HAVE_REPLICATION
- int exec_event(struct st_relay_log_info* rli);
void pack_info(Protocol* protocol);
virtual int get_create_or_append() const;
#endif /* HAVE_REPLICATION */
@@ -1536,6 +1760,11 @@ public:
bool write(IO_CACHE* file);
const char* get_db() { return db; }
#endif
+
+private:
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+ virtual int do_apply_event(RELAY_LOG_INFO const *rli);
+#endif
};
@@ -1555,7 +1784,6 @@ public:
Delete_file_log_event(THD* thd, const char* db_arg, bool using_trans);
#ifdef HAVE_REPLICATION
void pack_info(Protocol* protocol);
- int exec_event(struct st_relay_log_info* rli);
#endif /* HAVE_REPLICATION */
#else
void print(FILE* file, PRINT_EVENT_INFO* print_event_info);
@@ -1572,6 +1800,11 @@ public:
bool write(IO_CACHE* file);
const char* get_db() { return db; }
#endif
+
+private:
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+ virtual int do_apply_event(RELAY_LOG_INFO const *rli);
+#endif
};
@@ -1591,7 +1824,6 @@ public:
Execute_load_log_event(THD* thd, const char* db_arg, bool using_trans);
#ifdef HAVE_REPLICATION
void pack_info(Protocol* protocol);
- int exec_event(struct st_relay_log_info* rli);
#endif /* HAVE_REPLICATION */
#else
void print(FILE* file, PRINT_EVENT_INFO* print_event_info);
@@ -1607,6 +1839,11 @@ public:
bool write(IO_CACHE* file);
const char* get_db() { return db; }
#endif
+
+private:
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+ virtual int do_apply_event(RELAY_LOG_INFO const *rli);
+#endif
};
@@ -1676,7 +1913,6 @@ public:
bool using_trans, bool suppress_use);
#ifdef HAVE_REPLICATION
void pack_info(Protocol* protocol);
- int exec_event(struct st_relay_log_info* rli);
#endif /* HAVE_REPLICATION */
#else
void print(FILE* file, PRINT_EVENT_INFO* print_event_info);
@@ -1695,7 +1931,12 @@ public:
#ifndef MYSQL_CLIENT
bool write_post_header_for_derived(IO_CACHE* file);
#endif
- };
+
+private:
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+ virtual int do_apply_event(RELAY_LOG_INFO const *rli);
+#endif
+};
#ifdef MYSQL_CLIENT
@@ -1793,7 +2034,6 @@ public:
#endif
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
- virtual int exec_event(struct st_relay_log_info *rli);
virtual void pack_info(Protocol *protocol);
#endif
@@ -1803,6 +2043,11 @@ public:
private:
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+ virtual int do_apply_event(RELAY_LOG_INFO const *rli);
+ virtual int do_update_pos(RELAY_LOG_INFO *rli);
+#endif
+
#ifndef MYSQL_CLIENT
TABLE *m_table;
#endif
@@ -1886,7 +2131,6 @@ public:
flag_set get_flags(flag_set flags) const { return m_flags & flags; }
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
- virtual int exec_event(struct st_relay_log_info *rli);
virtual void pack_info(Protocol *protocol);
#endif
@@ -1970,6 +2214,8 @@ protected:
private:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+ virtual int do_apply_event(RELAY_LOG_INFO const *rli);
+
/*
Primitive to prepare for a sequence of row executions.
@@ -2017,7 +2263,7 @@ private:
RETURN VALUE
Error code, if something went wrong, 0 otherwise.
*/
- virtual int do_prepare_row(THD*, RELAY_LOG_INFO*, TABLE*,
+ virtual int do_prepare_row(THD*, RELAY_LOG_INFO const*, TABLE*,
char const *row_start, char const **row_end) = 0;
/*
@@ -2088,7 +2334,7 @@ private:
virtual int do_before_row_operations(TABLE *table);
virtual int do_after_row_operations(TABLE *table, int error);
- virtual int do_prepare_row(THD*, RELAY_LOG_INFO*, TABLE*,
+ virtual int do_prepare_row(THD*, RELAY_LOG_INFO const*, TABLE*,
char const *row_start, char const **row_end);
virtual int do_exec_row(TABLE *table);
#endif
@@ -2153,7 +2399,7 @@ private:
virtual int do_before_row_operations(TABLE *table);
virtual int do_after_row_operations(TABLE *table, int error);
- virtual int do_prepare_row(THD*, RELAY_LOG_INFO*, TABLE*,
+ virtual int do_prepare_row(THD*, RELAY_LOG_INFO const*, TABLE*,
char const *row_start, char const **row_end);
virtual int do_exec_row(TABLE *table);
#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
@@ -2224,7 +2470,7 @@ private:
virtual int do_before_row_operations(TABLE *table);
virtual int do_after_row_operations(TABLE *table, int error);
- virtual int do_prepare_row(THD*, RELAY_LOG_INFO*, TABLE*,
+ virtual int do_prepare_row(THD*, RELAY_LOG_INFO const*, TABLE*,
char const *row_start, char const **row_end);
virtual int do_exec_row(TABLE *table);
#endif
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index 8a051195dba..16e13f049e3 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -29,14 +29,15 @@ int init_strvar_from_file(char *var, int max_size, IO_CACHE *f,
st_relay_log_info::st_relay_log_info()
- :no_storage(FALSE), info_fd(-1), cur_log_fd(-1), save_temporary_tables(0),
+ :no_storage(FALSE), replicate_same_server_id(::replicate_same_server_id),
+ info_fd(-1), cur_log_fd(-1), save_temporary_tables(0),
cur_log_old_open_count(0), group_master_log_pos(0), log_space_total(0),
ignore_log_space_limit(0), last_master_timestamp(0), slave_skip_counter(0),
abort_pos_wait(0), slave_run_id(0), sql_thd(0), last_slave_errno(0),
inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE),
until_log_pos(0), retried_trans(0),
tables_to_lock(0), tables_to_lock_count(0),
- unsafe_to_stop_at(0)
+ last_event_start_time(0)
{
DBUG_ENTER("st_relay_log_info::st_relay_log_info");
@@ -1001,6 +1002,22 @@ bool st_relay_log_info::is_until_satisfied()
log_pos= group_relay_log_pos;
}
+#ifndef DBUG_OFF
+ {
+ char buf[32];
+ DBUG_PRINT("info", ("group_master_log_name='%s', group_master_log_pos=%s",
+ group_master_log_name, llstr(group_master_log_pos, buf)));
+ DBUG_PRINT("info", ("group_relay_log_name='%s', group_relay_log_pos=%s",
+ group_relay_log_name, llstr(group_relay_log_pos, buf)));
+ DBUG_PRINT("info", ("(%s) log_name='%s', log_pos=%s",
+ until_condition == UNTIL_MASTER_POS ? "master" : "relay",
+ log_name, llstr(log_pos, buf)));
+ DBUG_PRINT("info", ("(%s) until_log_name='%s', until_log_pos=%s",
+ until_condition == UNTIL_MASTER_POS ? "master" : "relay",
+ until_log_name, llstr(until_log_pos, buf)));
+ }
+#endif
+
if (until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_UNKNOWN)
{
/*
@@ -1056,30 +1073,19 @@ void st_relay_log_info::cached_charset_invalidate()
}
-bool st_relay_log_info::cached_charset_compare(char *charset)
+bool st_relay_log_info::cached_charset_compare(char *charset) const
{
DBUG_ENTER("st_relay_log_info::cached_charset_compare");
if (bcmp(cached_charset, charset, sizeof(cached_charset)))
{
- memcpy(cached_charset, charset, sizeof(cached_charset));
+ memcpy(const_cast<char*>(cached_charset), charset, sizeof(cached_charset));
DBUG_RETURN(1);
}
DBUG_RETURN(0);
}
-void st_relay_log_info::transaction_end(THD* thd)
-{
- DBUG_ENTER("st_relay_log_info::transaction_end");
-
- /*
- Nothing to do here right now.
- */
-
- DBUG_VOID_RETURN;
-}
-
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
void st_relay_log_info::cleanup_context(THD *thd, bool error)
{
@@ -1106,7 +1112,7 @@ void st_relay_log_info::cleanup_context(THD *thd, bool error)
m_table_map.clear_tables();
close_thread_tables(thd);
clear_tables_to_lock();
- unsafe_to_stop_at= 0;
+ last_event_start_time= 0;
DBUG_VOID_RETURN;
}
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index 45c9fb1cf96..3f06e108f6d 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -58,6 +58,15 @@ typedef struct st_relay_log_info
*/
bool no_storage;
+ /*
+ If true, events with the same server id should be replicated. This
+ field is set on creation of a relay log info structure by copying
+ the value of ::replicate_same_server_id and can be overridden if
+ necessary. For example of when this is done, check sql_binlog.cc,
+ where the BINLOG statement can be used to execute "raw" events.
+ */
+ bool replicate_same_server_id;
+
/*** The following variables can only be read when protect by data lock ****/
/*
@@ -292,14 +301,19 @@ typedef struct st_relay_log_info
When the 6 bytes are equal to 0 is used to mean "cache is invalidated".
*/
void cached_charset_invalidate();
- bool cached_charset_compare(char *charset);
-
- void transaction_end(THD*);
+ bool cached_charset_compare(char *charset) const;
void cleanup_context(THD *, bool);
void clear_tables_to_lock();
- time_t unsafe_to_stop_at;
+ /*
+ Used by row-based replication to detect that it should not stop at
+ this event, but give it a chance to send more events. The time
+ where the last event inside a group started is stored here. If the
+ variable is zero, we are not in a group (but may be in a
+ transaction).
+ */
+ time_t last_event_start_time;
} RELAY_LOG_INFO;
diff --git a/sql/rpl_utility.cc b/sql/rpl_utility.cc
index 65a44a4947b..1d7cc808f0c 100644
--- a/sql/rpl_utility.cc
+++ b/sql/rpl_utility.cc
@@ -108,7 +108,7 @@ field_length_from_packed(enum_field_types const field_type,
*/
int
-table_def::compatible_with(RELAY_LOG_INFO *rli, TABLE *table)
+table_def::compatible_with(RELAY_LOG_INFO const *rli_arg, TABLE *table)
const
{
/*
@@ -116,6 +116,7 @@ table_def::compatible_with(RELAY_LOG_INFO *rli, TABLE *table)
*/
uint const cols_to_check= min(table->s->fields, size());
int error= 0;
+ RELAY_LOG_INFO const *rli= const_cast<RELAY_LOG_INFO*>(rli_arg);
TABLE_SHARE const *const tsh= table->s;
diff --git a/sql/rpl_utility.h b/sql/rpl_utility.h
index b1aa642619c..17879a9ecfc 100644
--- a/sql/rpl_utility.h
+++ b/sql/rpl_utility.h
@@ -117,7 +117,7 @@ public:
@retval 1 if the table definition is not compatible with @c table
@retval 0 if the table definition is compatible with @c table
*/
- int compatible_with(RELAY_LOG_INFO *rli, TABLE *table) const;
+ int compatible_with(RELAY_LOG_INFO const *rli, TABLE *table) const;
private:
my_size_t m_size; // Number of elements in the types array
diff --git a/sql/slave.cc b/sql/slave.cc
index 6f62f74647a..278edae99f4 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -519,11 +519,11 @@ static bool sql_slave_killed(THD* thd, RELAY_LOG_INFO* rli)
really one minute of idleness, we don't timeout if the slave SQL thread
is actively working.
*/
- if (!rli->unsafe_to_stop_at)
+ if (rli->last_event_start_time == 0)
DBUG_RETURN(1);
DBUG_PRINT("info", ("Slave SQL thread is in an unsafe situation, giving "
"it some grace period"));
- if (difftime(time(0), rli->unsafe_to_stop_at) > 60)
+ if (difftime(time(0), rli->last_event_start_time) > 60)
{
slave_print_msg(ERROR_LEVEL, rli, 0,
"SQL thread had to stop in an unsafe situation, in "
@@ -557,7 +557,7 @@ static bool sql_slave_killed(THD* thd, RELAY_LOG_INFO* rli)
void
*/
-void slave_print_msg(enum loglevel level, RELAY_LOG_INFO* rli,
+void slave_print_msg(enum loglevel level, RELAY_LOG_INFO const *rli,
int err_code, const char* msg, ...)
{
void (*report_function)(const char *, ...);
@@ -579,9 +579,9 @@ void slave_print_msg(enum loglevel level, RELAY_LOG_INFO* rli,
It's an error, it must be reported in Last_error and Last_errno in SHOW
SLAVE STATUS.
*/
- pbuff= rli->last_slave_error;
+ pbuff= const_cast<RELAY_LOG_INFO*>(rli)->last_slave_error;
pbuffsize= sizeof(rli->last_slave_error);
- rli->last_slave_errno = err_code;
+ const_cast<RELAY_LOG_INFO*>(rli)->last_slave_errno = err_code;
report_function= sql_print_error;
break;
case WARNING_LEVEL:
@@ -813,7 +813,7 @@ do not trust column Seconds_Behind_Master of SHOW SLAVE STATUS");
{
if ((master_row= mysql_fetch_row(master_res)) &&
(::server_id == strtoul(master_row[1], 0, 10)) &&
- !replicate_same_server_id)
+ !mi->rli.replicate_same_server_id)
errmsg= "The slave I/O thread stops because master and slave have equal \
MySQL server ids; these ids must be different for replication to work (or \
the --replicate-same-server-id option must be used on slave but this does \
@@ -1390,7 +1390,7 @@ void set_slave_thread_options(THD* thd)
DBUG_VOID_RETURN;
}
-void set_slave_thread_default_charset(THD* thd, RELAY_LOG_INFO *rli)
+void set_slave_thread_default_charset(THD* thd, RELAY_LOG_INFO const *rli)
{
DBUG_ENTER("set_slave_thread_default_charset");
@@ -1401,7 +1401,14 @@ void set_slave_thread_default_charset(THD* thd, RELAY_LOG_INFO *rli)
thd->variables.collation_server=
global_system_variables.collation_server;
thd->update_charset();
- rli->cached_charset_invalidate();
+
+ /*
+ We use a const cast here since the conceptual (and externally
+ visible) behavior of the function is to set the default charset of
+ the thread. That the cache has to be invalidated is a secondary
+ effect.
+ */
+ const_cast<RELAY_LOG_INFO*>(rli)->cached_charset_invalidate();
DBUG_VOID_RETURN;
}
@@ -1609,7 +1616,8 @@ static ulong read_event(MYSQL* mysql, MASTER_INFO *mi, bool* suppress_warnings)
}
-int check_expected_error(THD* thd, RELAY_LOG_INFO* rli, int expected_error)
+int check_expected_error(THD* thd, RELAY_LOG_INFO const *rli,
+ int expected_error)
{
DBUG_ENTER("check_expected_error");
@@ -1715,77 +1723,42 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli)
if (ev)
{
int type_code = ev->get_type_code();
- int exec_res;
+ int exec_res= 0;
/*
- Queries originating from this server must be skipped.
- Low-level events (Format_desc, Rotate, Stop) from this server
- must also be skipped. But for those we don't want to modify
- group_master_log_pos, because these events did not exist on the master.
- Format_desc is not completely skipped.
- Skip queries specified by the user in slave_skip_counter.
- We can't however skip events that has something to do with the
- log files themselves.
- Filtering on own server id is extremely important, to ignore execution of
- events created by the creation/rotation of the relay log (remember that
- now the relay log starts with its Format_desc, has a Rotate etc).
*/
- DBUG_PRINT("info",("type_code=%d, server_id=%d",type_code,ev->server_id));
+ DBUG_PRINT("info",("type_code=%d (%s), server_id=%d",
+ type_code, ev->get_type_str(), ev->server_id));
+ DBUG_PRINT("info", ("thd->options={ %s%s}",
+ FLAGSTR(thd->options, OPTION_NOT_AUTOCOMMIT),
+ FLAGSTR(thd->options, OPTION_BEGIN)));
- if ((ev->server_id == (uint32) ::server_id &&
- !replicate_same_server_id &&
- type_code != FORMAT_DESCRIPTION_EVENT) ||
- (rli->slave_skip_counter &&
- type_code != ROTATE_EVENT && type_code != STOP_EVENT &&
- type_code != START_EVENT_V3 && type_code!= FORMAT_DESCRIPTION_EVENT))
- {
- DBUG_PRINT("info", ("event skipped"));
- /*
- We only skip the event here and do not increase the group log
- position. In the event that we have to restart, this means
- that we might have to skip the event again, but that is a
- minor issue.
-
- If we were to increase the group log position when skipping an
- event, it might be that we are restarting at the wrong
- position and have events before that we should have executed,
- so not increasing the group log position is a sure bet in this
- case.
-
- In this way, we just step the group log position when we
- *know* that we are at the end of a group.
- */
- rli->inc_event_relay_log_pos();
- /*
- Protect against common user error of setting the counter to 1
- instead of 2 while recovering from an insert which used auto_increment,
- rand or user var.
- */
- if (rli->slave_skip_counter &&
- !((type_code == INTVAR_EVENT ||
- type_code == RAND_EVENT ||
- type_code == USER_VAR_EVENT) &&
- rli->slave_skip_counter == 1) &&
- /*
- The events from ourselves which have something to do with the relay
- log itself must be skipped, true, but they mustn't decrement
- rli->slave_skip_counter, because the user is supposed to not see
- these events (they are not in the master's binlog) and if we
- decremented, START SLAVE would for example decrement when it sees
- the Rotate, so the event which the user probably wanted to skip
- would not be skipped.
- */
- !(ev->server_id == (uint32) ::server_id &&
- (type_code == ROTATE_EVENT || type_code == STOP_EVENT ||
- type_code == START_EVENT_V3 || type_code == FORMAT_DESCRIPTION_EVENT)))
- --rli->slave_skip_counter;
- pthread_mutex_unlock(&rli->data_lock);
- delete ev;
- DBUG_RETURN(0); // avoid infinite update loops
- }
- pthread_mutex_unlock(&rli->data_lock);
+
+ /*
+ Execute the event to change the database and update the binary
+ log coordinates, but first we set some data that is needed for
+ the thread.
+
+ The event will be executed unless it is supposed to be skipped.
+
+ Queries originating from this server must be skipped. Low-level
+ events (Format_description_log_event, Rotate_log_event,
+ Stop_log_event) from this server must also be skipped. But for
+ those we don't want to modify 'group_master_log_pos', because
+ these events did not exist on the master.
+ Format_description_log_event is not completely skipped.
+
+ Skip queries specified by the user in 'slave_skip_counter'. We
+ can't however skip events that has something to do with the log
+ files themselves.
+
+ Filtering on own server id is extremely important, to ignore
+ execution of events created by the creation/rotation of the relay
+ log (remember that now the relay log starts with its Format_desc,
+ has a Rotate etc).
+ */
thd->server_id = ev->server_id; // use the original server id for logging
thd->set_time(); // time the query
@@ -1793,13 +1766,63 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli)
if (!ev->when)
ev->when = time(NULL);
ev->thd = thd; // because up to this point, ev->thd == 0
- DBUG_PRINT("info", ("thd->options={ %s%s}",
- FLAGSTR(thd->options, OPTION_NOT_AUTOCOMMIT),
- FLAGSTR(thd->options, OPTION_BEGIN)));
- exec_res = ev->exec_event(rli);
- DBUG_PRINT("info", ("exec_event result: %d", exec_res));
- DBUG_ASSERT(rli->sql_thd==thd);
+ int reason= ev->shall_skip(rli);
+ if (reason == Log_event::EVENT_SKIP_COUNT)
+ --rli->slave_skip_counter;
+ pthread_mutex_unlock(&rli->data_lock);
+ if (reason == Log_event::EVENT_SKIP_NOT)
+ exec_res= ev->apply_event(rli);
+#ifndef DBUG_OFF
+ else
+ {
+ /*
+ This only prints information to the debug trace.
+
+ TODO: Print an informational message to the error log?
+ */
+ static const char *const explain[] = {
+ "event was not skipped", // EVENT_SKIP_NOT,
+ "event originated from this server", // EVENT_SKIP_IGNORE,
+ "event skip counter was non-zero" // EVENT_SKIP_COUNT
+ };
+ DBUG_PRINT("info", ("%s was skipped because %s",
+ ev->get_type_str(), explain[reason]));
+ }
+#endif
+
+ DBUG_PRINT("info", ("apply_event error = %d", exec_res));
+ if (exec_res == 0)
+ {
+ int error= ev->update_pos(rli);
+ char buf[22];
+ DBUG_PRINT("info", ("update_pos error = %d", error));
+ DBUG_PRINT("info", ("group %s %s",
+ llstr(rli->group_relay_log_pos, buf),
+ rli->group_relay_log_name));
+ DBUG_PRINT("info", ("event %s %s",
+ llstr(rli->event_relay_log_pos, buf),
+ rli->event_relay_log_name));
+ /*
+ The update should not fail, so print an error message and
+ return an error code.
+
+ TODO: Replace this with a decent error message when merged
+ with BUG#24954 (which adds several new error message).
+ */
+ if (error)
+ {
+ slave_print_msg(ERROR_LEVEL, rli, ER_UNKNOWN_ERROR,
+ "It was not possible to update the positions"
+ " of the relay log information: the slave may"
+ " be in an inconsistent state."
+ " Stopped in %s position %s",
+ rli->group_relay_log_name,
+ llstr(rli->group_relay_log_pos, buf));
+ DBUG_RETURN(1);
+ }
+ }
+
/*
Format_description_log_event should not be deleted because it will be
used to read info about the relay log's format; it will be deleted when
@@ -2366,13 +2389,17 @@ Slave SQL thread aborted. Can't execute init_slave query");
THD_CHECK_SENTRY(thd);
if (exec_relay_log_event(thd,rli))
{
+ DBUG_PRINT("info", ("exec_relay_log_event() failed"));
// do not scare the user if SQL thread was simply killed or stopped
if (!sql_slave_killed(thd,rli))
{
/*
- retrieve as much info as possible from the thd and, error codes and warnings
- and print this to the error log as to allow the user to locate the error
+ retrieve as much info as possible from the thd and, error
+ codes and warnings and print this to the error log as to
+ allow the user to locate the error
*/
+ DBUG_PRINT("info", ("thd->net.last_errno=%d; rli->last_slave_errno=%d",
+ thd->net.last_errno, rli->last_slave_errno));
if (thd->net.last_errno != 0)
{
if (rli->last_slave_errno == 0)
@@ -2699,6 +2726,7 @@ static int queue_binlog_ver_1_event(MASTER_INFO *mi, const char *buf,
my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
DBUG_RETURN(1);
}
+
pthread_mutex_lock(&mi->data_lock);
ev->log_pos= mi->master_log_pos; /* 3.23 events don't contain log_pos */
switch (ev->get_type_code()) {
@@ -2962,7 +2990,7 @@ int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len)
pthread_mutex_lock(log_lock);
if ((uint4korr(buf + SERVER_ID_OFFSET) == ::server_id) &&
- !replicate_same_server_id)
+ !mi->rli.replicate_same_server_id)
{
/*
Do not write it to the relay log.
diff --git a/sql/slave.h b/sql/slave.h
index f21266bbee4..107b74c09dd 100644
--- a/sql/slave.h
+++ b/sql/slave.h
@@ -162,9 +162,9 @@ bool show_binlog_info(THD* thd);
bool rpl_master_has_bug(RELAY_LOG_INFO *rli, uint bug_id);
const char *print_slave_db_safe(const char *db);
-int check_expected_error(THD* thd, RELAY_LOG_INFO* rli, int error_code);
+int check_expected_error(THD* thd, RELAY_LOG_INFO const *rli, int error_code);
void skip_load_data_infile(NET* net);
-void slave_print_msg(enum loglevel level, RELAY_LOG_INFO* rli,
+void slave_print_msg(enum loglevel level, RELAY_LOG_INFO const *rli,
int err_code, const char* msg, ...)
ATTRIBUTE_FORMAT(printf, 4, 5);
@@ -182,7 +182,7 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,ulonglong pos,
int purge_relay_logs(RELAY_LOG_INFO* rli, THD *thd, bool just_reset,
const char** errmsg);
void set_slave_thread_options(THD* thd);
-void set_slave_thread_default_charset(THD* thd, RELAY_LOG_INFO *rli);
+void set_slave_thread_default_charset(THD *thd, RELAY_LOG_INFO const *rli);
void rotate_relay_log(MASTER_INFO* mi);
pthread_handler_t handle_slave_io(void *arg);
diff --git a/sql/sql_binlog.cc b/sql/sql_binlog.cc
index b0a54bec664..6f7bbda96de 100644
--- a/sql/sql_binlog.cc
+++ b/sql/sql_binlog.cc
@@ -163,9 +163,17 @@ void mysql_client_binlog_statement(THD* thd)
(ulong) uint4korr(bufptr+EVENT_LEN_OFFSET)));
#endif
ev->thd= thd;
- if (IF_DBUG(int err= ) ev->exec_event(thd->rli_fake))
+ /*
+ We go directly to the application phase, since we don't need
+ to check if the event shall be skipped or not.
+
+ Neither do we have to update the log positions, since that is
+ not used at all: the rli_fake instance is used only for error
+ reporting.
+ */
+ if (IF_DBUG(int err= ) ev->apply_event(thd->rli_fake))
{
- DBUG_PRINT("error", ("exec_event() returned: %d", err));
+ DBUG_PRINT("info", ("apply_event() returned: %d", err));
/*
TODO: Maybe a better error message since the BINLOG statement
now contains several events.