summaryrefslogtreecommitdiff
path: root/sql/slave.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/slave.cc')
-rw-r--r--sql/slave.cc129
1 files changed, 110 insertions, 19 deletions
diff --git a/sql/slave.cc b/sql/slave.cc
index 09b1d79b1de..cb52abc68b3 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -1965,6 +1965,55 @@ static int count_relay_log_space(RELAY_LOG_INFO* rli)
}
+/*
+ Builds a Rotate from the ignored events' info and writes it to relay log.
+
+ SYNOPSIS
+ write_ignored_events_info_to_relay_log()
+ thd pointer to I/O thread's thd
+ mi
+
+ DESCRIPTION
+ Slave I/O thread, going to die, must leave a durable trace of the
+ ignored events' end position for the use of the slave SQL thread, by
+ calling this function. Only that thread can call it (see assertion).
+ */
+static void write_ignored_events_info_to_relay_log(THD *thd, MASTER_INFO *mi)
+{
+ RELAY_LOG_INFO *rli= &mi->rli;
+ pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
+ DBUG_ASSERT(thd == mi->io_thd);
+ pthread_mutex_lock(log_lock);
+ if (rli->ign_master_log_name_end[0])
+ {
+ DBUG_PRINT("info",("writing a Rotate event to track down ignored events"));
+ Rotate_log_event *ev= new Rotate_log_event(thd, rli->ign_master_log_name_end,
+ 0, rli->ign_master_log_pos_end,
+ Rotate_log_event::DUP_NAME);
+ rli->ign_master_log_name_end[0]= 0;
+ /* can unlock before writing as slave SQL thd will soon see our Rotate */
+ pthread_mutex_unlock(log_lock);
+ if (likely((bool)ev))
+ {
+ ev->server_id= 0; // don't be ignored by slave SQL thread
+ if (unlikely(rli->relay_log.append(ev)))
+ sql_print_error("Slave I/O thread failed to write a Rotate event"
+ " to the relay log, "
+ "SHOW SLAVE STATUS may be inaccurate");
+ rli->relay_log.harvest_bytes_written(&rli->log_space_total);
+ flush_master_info(mi, 1);
+ delete ev;
+ }
+ else
+ sql_print_error("Slave I/O thread failed to create a Rotate event"
+ " (out of memory?), "
+ "SHOW SLAVE STATUS may be inaccurate");
+ }
+ else
+ pthread_mutex_unlock(log_lock);
+}
+
+
void init_master_info_with_options(MASTER_INFO* mi)
{
mi->master_log_name[0] = 0;
@@ -2561,7 +2610,7 @@ st_relay_log_info::st_relay_log_info()
{
group_relay_log_name[0]= event_relay_log_name[0]=
group_master_log_name[0]= 0;
- last_slave_error[0]=0; until_log_name[0]= 0;
+ last_slave_error[0]= until_log_name[0]= ign_master_log_name_end[0]= 0;
bzero((char*) &info_file, sizeof(info_file));
bzero((char*) &cache_buf, sizeof(cache_buf));
@@ -3154,12 +3203,20 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli)
wait for something for example inside of next_event().
*/
pthread_mutex_lock(&rli->data_lock);
-
+ /*
+ This tests if the position of the end of the last previous executed event
+ hits the UNTIL barrier.
+ We would prefer to test if the position of the start (or possibly) end of
+ the to-be-read event hits the UNTIL barrier, this is different if there
+ was an event ignored by the I/O thread just before (BUG#13861 to be
+ fixed).
+ */
if (rli->until_condition!=RELAY_LOG_INFO::UNTIL_NONE &&
rli->is_until_satisfied())
{
+ char buf[22];
sql_print_error("Slave SQL thread stopped because it reached its"
- " UNTIL position %ld", (long) rli->until_pos());
+ " UNTIL position %s", llstr(rli->until_pos(), buf));
/*
Setting abort_slave flag because we do not want additional message about
error in query execution to be printed.
@@ -3347,6 +3404,7 @@ pthread_handler_t handle_slave_io(void *arg)
THD *thd; // needs to be first for thread_stack
MYSQL *mysql;
MASTER_INFO *mi = (MASTER_INFO*)arg;
+ RELAY_LOG_INFO *rli= &mi->rli;
char llbuff[22];
uint retry_count;
@@ -3589,16 +3647,16 @@ reconnect done to recover from failed read");
char llbuf1[22], llbuf2[22];
DBUG_PRINT("info", ("log_space_limit=%s log_space_total=%s \
ignore_log_space_limit=%d",
- llstr(mi->rli.log_space_limit,llbuf1),
- llstr(mi->rli.log_space_total,llbuf2),
- (int) mi->rli.ignore_log_space_limit));
+ llstr(rli->log_space_limit,llbuf1),
+ llstr(rli->log_space_total,llbuf2),
+ (int) rli->ignore_log_space_limit));
}
#endif
- if (mi->rli.log_space_limit && mi->rli.log_space_limit <
- mi->rli.log_space_total &&
- !mi->rli.ignore_log_space_limit)
- if (wait_for_relay_log_space(&mi->rli))
+ if (rli->log_space_limit && rli->log_space_limit <
+ rli->log_space_total &&
+ !rli->ignore_log_space_limit)
+ if (wait_for_relay_log_space(rli))
{
sql_print_error("Slave I/O thread aborted while waiting for relay \
log space");
@@ -3629,6 +3687,7 @@ err:
mysql_close(mysql);
mi->mysql=0;
}
+ write_ignored_events_info_to_relay_log(thd, mi);
thd->proc_info = "Waiting for slave mutex on exit";
pthread_mutex_lock(&mi->run_lock);
mi->slave_running = 0;
@@ -4013,6 +4072,7 @@ static int process_io_rotate(MASTER_INFO *mi, Rotate_log_event *rev)
if (unlikely(!rev->is_valid()))
DBUG_RETURN(1);
+ /* Safe copy as 'rev' has been "sanitized" in Rotate_log_event's ctor */
memcpy(mi->master_log_name, rev->new_log_ident, rev->ident_len+1);
mi->master_log_pos= rev->pos;
DBUG_PRINT("info", ("master_log_pos: '%s' %d",
@@ -4263,6 +4323,7 @@ int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len)
int error= 0;
ulong inc_pos;
RELAY_LOG_INFO *rli= &mi->rli;
+ pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
DBUG_ENTER("queue_event");
if (mi->rli.relay_log.description_event_for_queue->binlog_version<4 &&
@@ -4271,11 +4332,6 @@ int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len)
pthread_mutex_lock(&mi->data_lock);
- /*
- TODO: figure out if other events in addition to Rotate
- require special processing.
- Guilhem 2003-06 : I don't think so.
- */
switch (buf[EVENT_TYPE_OFFSET]) {
case STOP_EVENT:
/*
@@ -4360,14 +4416,21 @@ int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len)
direct master (an unsupported, useless setup!).
*/
+ pthread_mutex_lock(log_lock);
+
if ((uint4korr(buf + SERVER_ID_OFFSET) == ::server_id) &&
!replicate_same_server_id)
{
/*
Do not write it to the relay log.
- We still want to increment, so that we won't re-read this event from the
- master if the slave IO thread is now stopped/restarted (more efficient if
- the events we are ignoring are big LOAD DATA INFILE).
+ a) We still want to increment mi->master_log_pos, so that we won't
+ re-read this event from the master if the slave IO thread is now
+ stopped/restarted (more efficient if the events we are ignoring are big
+ LOAD DATA INFILE).
+ b) We want to record that we are skipping events, for the information of
+ the slave SQL thread, otherwise that thread may let
+ rli->group_relay_log_pos stay too small if the last binlog's event is
+ ignored.
But events which were generated by this slave and which do not exist in
the master's binlog (i.e. Format_desc, Rotate & Stop) should not increment
mi->master_log_pos.
@@ -4376,6 +4439,10 @@ int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len)
buf[EVENT_TYPE_OFFSET]!=ROTATE_EVENT &&
buf[EVENT_TYPE_OFFSET]!=STOP_EVENT)
mi->master_log_pos+= inc_pos;
+ memcpy(rli->ign_master_log_name_end, mi->master_log_name, FN_REFLEN);
+ DBUG_ASSERT(rli->ign_master_log_name_end[0]);
+ rli->ign_master_log_pos_end= mi->master_log_pos;
+ rli->relay_log.signal_update(); // the slave SQL thread needs to re-check
DBUG_PRINT("info", ("master_log_pos: %d, event originating from the same server, ignored", (ulong) mi->master_log_pos));
}
else
@@ -4388,8 +4455,11 @@ int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len)
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
}
else
- error=3;
+ error= 3;
+ rli->ign_master_log_name_end[0]= 0; // last event is not ignored
}
+ pthread_mutex_unlock(log_lock);
+
err:
pthread_mutex_unlock(&mi->data_lock);
@@ -4773,6 +4843,27 @@ Log_event* next_event(RELAY_LOG_INFO* rli)
rli->last_master_timestamp= 0;
DBUG_ASSERT(rli->relay_log.get_open_count() == rli->cur_log_old_open_count);
+
+ if (rli->ign_master_log_name_end[0])
+ {
+ /* We generate and return a Rotate, to make our positions advance */
+ DBUG_PRINT("info",("seeing an ignored end segment"));
+ ev= new Rotate_log_event(thd, rli->ign_master_log_name_end,
+ 0, rli->ign_master_log_pos_end,
+ Rotate_log_event::DUP_NAME |
+ Rotate_log_event::ZERO_LEN);
+ rli->ign_master_log_name_end[0]= 0;
+ if (unlikely(!ev))
+ {
+ errmsg= "Slave SQL thread failed to create a Rotate event "
+ "(out of memory?), SHOW SLAVE STATUS may be inaccurate";
+ goto err;
+ }
+ pthread_mutex_unlock(log_lock);
+ ev->server_id= 0; // don't be ignored by slave SQL thread
+ DBUG_RETURN(ev);
+ }
+
/*
We can, and should release data_lock while we are waiting for
update. If we do not, show slave status will block