summaryrefslogtreecommitdiff
path: root/sql/slave.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/slave.cc')
-rw-r--r--sql/slave.cc1081
1 files changed, 793 insertions, 288 deletions
diff --git a/sql/slave.cc b/sql/slave.cc
index 605f8289946..70803c88c3a 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -1,15 +1,15 @@
/* Copyright (C) 2000-2003 MySQL AB
-
+
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
-
+
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
-
+
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
@@ -75,7 +75,6 @@ static int create_table_from_dump(THD* thd, MYSQL *mysql, const char* db,
const char* table_name, bool overwrite);
static int get_master_version_and_clock(MYSQL* mysql, MASTER_INFO* mi);
-
/*
Find out which replications threads are running
@@ -160,7 +159,7 @@ int init_slave()
sql_print_error("Failed to allocate memory for the master info structure");
goto err;
}
-
+
if (init_master_info(active_mi,master_info_file,relay_log_info_file,
!master_host, (SLAVE_IO | SLAVE_SQL)))
{
@@ -219,6 +218,13 @@ static byte* get_table_key(TABLE_RULE_ENT* e, uint* len,
pos Position in relay log file
need_data_lock Set to 1 if this functions should do mutex locks
errmsg Store pointer to error message here
+ look_for_description_event
+ 1 if we should look for such an event. We only need
+ this when the SQL thread starts and opens an existing
+ relay log and has to execute it (possibly from an
+ offset >4); then we need to read the first event of
+ the relay log to be able to parse the events we have
+ to execute.
DESCRIPTION
- Close old open relay log files.
@@ -236,15 +242,35 @@ static byte* get_table_key(TABLE_RULE_ENT* e, uint* len,
int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,
ulonglong pos, bool need_data_lock,
- const char** errmsg)
+ const char** errmsg,
+ bool look_for_description_event)
{
DBUG_ENTER("init_relay_log_pos");
+ DBUG_PRINT("info", ("pos=%lu", pos));
*errmsg=0;
pthread_mutex_t *log_lock=rli->relay_log.get_log_lock();
if (need_data_lock)
pthread_mutex_lock(&rli->data_lock);
+
+ /*
+ Slave threads are not the only users of init_relay_log_pos(). CHANGE MASTER
+ is, too, and init_slave() too; these 2 functions allocate a description
+ event in init_relay_log_pos, which is not freed by the terminating SQL slave
+ thread as that thread is not started by these functions. So we have to free
+ the description_event here, in case, so that there is no memory leak in
+ running, say, CHANGE MASTER.
+ */
+ delete rli->relay_log.description_event_for_exec;
+ /*
+ By default the relay log is in binlog format 3 (4.0).
+ Even if format is 4, this will work enough to read the first event
+ (Format_desc) (remember that format 4 is just lenghtened compared to format
+ 3; format 3 is a prefix of format 4).
+ */
+ rli->relay_log.description_event_for_exec= new
+ Format_description_log_event(3);
pthread_mutex_lock(log_lock);
@@ -284,9 +310,8 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,
In this case, we will use the same IO_CACHE pointer to
read data as the IO thread is using to write data.
*/
- rli->cur_log= rli->relay_log.get_log_file();
- if (my_b_tell(rli->cur_log) == 0 &&
- check_binlog_magic(rli->cur_log, errmsg))
+ my_b_seek((rli->cur_log=rli->relay_log.get_log_file()), (off_t)0);
+ if (check_binlog_magic(rli->cur_log,errmsg))
goto err;
rli->cur_log_old_open_count=rli->relay_log.get_open_count();
}
@@ -300,8 +325,85 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,
goto err;
rli->cur_log = &rli->cache_buf;
}
- if (pos >= BIN_LOG_HEADER_SIZE)
+ /*
+ In all cases, check_binlog_magic() has been called so we're at offset 4 for
+ sure.
+ */
+ if (pos > BIN_LOG_HEADER_SIZE) /* If pos<=4, we stay at 4 */
+ {
+ Log_event* ev;
+ while (look_for_description_event)
+ {
+ /*
+ Read the possible Format_description_log_event; if position
+ was 4, no need, it will be read naturally.
+ */
+ DBUG_PRINT("info",("looking for a Format_description_log_event"));
+
+ if (my_b_tell(rli->cur_log) >= pos)
+ break;
+
+ /*
+ Because of we have rli->data_lock and log_lock, we can safely read an
+ event
+ */
+ if (!(ev=Log_event::read_log_event(rli->cur_log,0,
+ rli->relay_log.description_event_for_exec)))
+ {
+ DBUG_PRINT("info",("could not read event, rli->cur_log->error=%d",
+ rli->cur_log->error));
+ if (rli->cur_log->error) /* not EOF */
+ {
+ *errmsg= "I/O error reading event at position 4";
+ goto err;
+ }
+ break;
+ }
+ else if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
+ {
+ DBUG_PRINT("info",("found Format_description_log_event"));
+ delete rli->relay_log.description_event_for_exec;
+ rli->relay_log.description_event_for_exec= (Format_description_log_event*) ev;
+ /*
+ As ev was returned by read_log_event, it has passed is_valid(), so
+ my_malloc() in ctor worked, no need to check again.
+ */
+ /*
+ Ok, we found a Format_description event. But it is not sure that this
+ describes the whole relay log; indeed, one can have this sequence
+ (starting from position 4):
+ Format_desc (of slave)
+ Rotate (of master)
+ Format_desc (of master)
+ So the Format_desc which really describes the rest of the relay log
+ is the 3rd event (it can't be further than that, because we rotate
+ the relay log when we queue a Rotate event from the master).
+ But what describes the Rotate is the first Format_desc.
+ So what we do is:
+ go on searching for Format_description events, until you exceed the
+ position (argument 'pos') or until you find another event than Rotate
+ or Format_desc.
+ */
+ }
+ else
+ {
+ DBUG_PRINT("info",("found event of another type=%d",
+ ev->get_type_code()));
+ look_for_description_event= (ev->get_type_code() == ROTATE_EVENT);
+ delete ev;
+ }
+ }
my_b_seek(rli->cur_log,(off_t)pos);
+#ifndef DBUG_OFF
+ {
+ char llbuf1[22], llbuf2[22];
+ DBUG_PRINT("info", ("my_b_tell(rli->cur_log)=%s rli->event_relay_log_pos=%s",
+ llstr(my_b_tell(rli->cur_log),llbuf1),
+ llstr(rli->event_relay_log_pos,llbuf2)));
+ }
+#endif
+
+ }
err:
/*
@@ -316,13 +418,15 @@ err:
if (need_data_lock)
pthread_mutex_unlock(&rli->data_lock);
+ if (!rli->relay_log.description_event_for_exec->is_valid() && !*errmsg)
+ *errmsg= "Invalid Format_description log event; could be out of memory";
DBUG_RETURN ((*errmsg) ? 1 : 0);
}
/*
- Init functio to set up array for errors that should be skipped for slave
+ Init function to set up array for errors that should be skipped for slave
SYNOPSIS
init_slave_skip_errors()
@@ -361,16 +465,15 @@ void init_slave_skip_errors(const char* arg)
}
-void st_relay_log_info::inc_group_relay_log_pos(ulonglong val,
- ulonglong log_pos,
- bool skip_lock)
+void st_relay_log_info::inc_group_relay_log_pos(ulonglong log_pos,
+ bool skip_lock)
{
if (!skip_lock)
pthread_mutex_lock(&data_lock);
- inc_event_relay_log_pos(val);
+ inc_event_relay_log_pos();
group_relay_log_pos= event_relay_log_pos;
strmake(group_relay_log_name,event_relay_log_name,
- sizeof(group_relay_log_name)-1);
+ sizeof(group_relay_log_name)-1);
notify_group_relay_log_name_update();
@@ -384,24 +487,31 @@ void st_relay_log_info::inc_group_relay_log_pos(ulonglong val,
not advance as it should on the non-transactional slave (it advances by
big leaps, whereas it should advance by small leaps).
*/
+ /*
+ In 4.x we used the event's len to compute the positions here. This is
+ wrong if the event was 3.23/4.0 and has been converted to 5.0, because
+ then the event's len is not what is was in the master's binlog, so this
+ will make a wrong group_master_log_pos (yes it's a bug in 3.23->4.0
+ replication: Exec_master_log_pos is wrong). Only way to solve this is to
+ have the original offset of the end of the event the relay log. This is
+ what we do in 5.0: log_pos has become "end_log_pos" (because the real use
+ of log_pos in 4.0 was to compute the end_log_pos; so better to store
+ end_log_pos instead of begin_log_pos.
+ If we had not done this fix here, the problem would also have appeared
+ when the slave and master are 5.0 but with different event length (for
+ example the slave is more recent than the master and features the event
+ UID). It would give false MASTER_POS_WAIT, false Exec_master_log_pos in
+ SHOW SLAVE STATUS, and so the user would do some CHANGE MASTER using this
+ value which would lead to badly broken replication.
+ Even the relay_log_pos will be corrupted in this case, because the len is
+ the relay log is not "val".
+ With the end_log_pos solution, we avoid computations involving lengthes.
+ */
+ DBUG_PRINT("info", ("log_pos: %lu group_master_log_pos: %lu",
+ (long) log_pos, (long) group_master_log_pos));
if (log_pos) // 3.23 binlogs don't have log_posx
{
-#if MYSQL_VERSION_ID < 50000
- /*
- If the event was converted from a 3.23 format, get_event_len() has
- grown by 6 bytes (at least for most events, except LOAD DATA INFILE
- which is already a big problem for 3.23->4.0 replication); 6 bytes is
- the difference between the header's size in 4.0 (LOG_EVENT_HEADER_LEN)
- and the header's size in 3.23 (OLD_HEADER_LEN). Note that using
- mi->old_format will not help if the I/O thread has not started yet.
- Yes this is a hack but it's just to make 3.23->4.x replication work;
- 3.23->5.0 replication is working much better.
- */
- group_master_log_pos= log_pos + val -
- (mi->old_format ? (LOG_EVENT_HEADER_LEN - OLD_HEADER_LEN) : 0);
-#else
- group_master_log_pos= log_pos+ val;
-#endif /* MYSQL_VERSION_ID < 5000 */
+ group_master_log_pos= log_pos;
}
pthread_cond_broadcast(&data_cond);
if (!skip_lock)
@@ -442,9 +552,9 @@ int purge_relay_logs(RELAY_LOG_INFO* rli, THD *thd, bool just_reset,
/*
Even if rli->inited==0, we still try to empty rli->master_log_* variables.
Indeed, rli->inited==0 does not imply that they already are empty.
- It could be that slave's info initialization partly succeeded :
+ It could be that slave's info initialization partly succeeded :
for example if relay-log.info existed but *relay-bin*.*
- have been manually removed, init_relay_log_info reads the old
+ have been manually removed, init_relay_log_info reads the old
relay-log.info and fills rli->master_log_*, then init_relay_log_info
checks for the existence of the relay log, this fails and
init_relay_log_info leaves rli->inited to 0.
@@ -453,7 +563,7 @@ int purge_relay_logs(RELAY_LOG_INFO* rli, THD *thd, bool just_reset,
MASTER, the callers of purge_relay_logs, will delete bogus *.info files
or replace them with correct files), however if the user does SHOW SLAVE
STATUS before START SLAVE, he will see old, confusing rli->master_log_*.
- In other words, we reinit rli->master_log_* for SHOW SLAVE STATUS
+ In other words, we reinit rli->master_log_* for SHOW SLAVE STATUS
to display fine in any case.
*/
@@ -482,13 +592,16 @@ int purge_relay_logs(RELAY_LOG_INFO* rli, THD *thd, bool just_reset,
sizeof(rli->group_relay_log_name)-1);
strmake(rli->event_relay_log_name, rli->relay_log.get_log_fname(),
sizeof(rli->event_relay_log_name)-1);
- // Just first log with magic number and nothing else
- rli->log_space_total= BIN_LOG_HEADER_SIZE;
rli->group_relay_log_pos= rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE;
- rli->relay_log.reset_bytes_written();
+ if (count_relay_log_space(rli))
+ {
+ *errmsg= "Error counting relay log space";
+ goto err;
+ }
if (!just_reset)
- error= init_relay_log_pos(rli, rli->group_relay_log_name, rli->group_relay_log_pos,
- 0 /* do not need data lock */, errmsg);
+ error= init_relay_log_pos(rli, rli->group_relay_log_name,
+ rli->group_relay_log_pos,
+ 0 /* do not need data lock */, errmsg, 0);
err:
#ifndef DBUG_OFF
@@ -548,24 +661,26 @@ int terminate_slave_thread(THD* thd, pthread_mutex_t* term_lock,
pthread_cond_t* term_cond,
volatile uint *slave_running)
{
+ DBUG_ENTER("terminate_slave_thread");
if (term_lock)
{
pthread_mutex_lock(term_lock);
if (!*slave_running)
{
pthread_mutex_unlock(term_lock);
- return ER_SLAVE_NOT_RUNNING;
+ DBUG_RETURN(ER_SLAVE_NOT_RUNNING);
}
}
DBUG_ASSERT(thd != 0);
+ THD_CHECK_SENTRY(thd);
/*
- Is is criticate to test if the slave is running. Otherwise, we might
+ Is is critical to test if the slave is running. Otherwise, we might
be referening freed memory trying to kick it
*/
- THD_CHECK_SENTRY(thd);
while (*slave_running) // Should always be true
{
+ DBUG_PRINT("loop", ("killing slave thread"));
KICK_SLAVE(thd);
/*
There is a small chance that slave thread might miss the first
@@ -577,7 +692,7 @@ int terminate_slave_thread(THD* thd, pthread_mutex_t* term_lock,
}
if (term_lock)
pthread_mutex_unlock(term_lock);
- return 0;
+ DBUG_RETURN(0);
}
@@ -636,7 +751,7 @@ int start_slave_thread(pthread_handler h_func, pthread_mutex_t *start_lock,
thd->exit_cond(old_msg);
pthread_mutex_lock(cond_lock); // re-acquire it as exit_cond() released
if (thd->killed)
- DBUG_RETURN(ER_SERVER_SHUTDOWN);
+ DBUG_RETURN(thd->killed_errno());
}
}
if (start_lock)
@@ -737,7 +852,7 @@ static TABLE_RULE_ENT* find_wild(DYNAMIC_ARRAY *a, const char* key, int len)
SYNOPSIS
tables_ok()
- thd thread (SQL slave thread normally)
+ thd thread (SQL slave thread normally). Mustn't be null.
tables list of tables to check
NOTES
@@ -755,6 +870,11 @@ static TABLE_RULE_ENT* find_wild(DYNAMIC_ARRAY *a, const char* key, int len)
second call will make the decision (because
all_tables_not_ok() = !tables_ok(1st_list) && !tables_ok(2nd_list)).
+ Thought which arose from a question of a big customer "I want to include
+ all tables like "abc.%" except the "%.EFG"". This can't be done now. If we
+ supported Perl regexps we could do it with this pattern: /^abc\.(?!EFG)/
+ (I could not find an equivalent in the regex library MySQL uses).
+
RETURN VALUES
0 should not be logged/replicated
1 should be logged/replicated
@@ -765,7 +885,24 @@ bool tables_ok(THD* thd, TABLE_LIST* tables)
bool some_tables_updating= 0;
DBUG_ENTER("tables_ok");
- for (; tables; tables = tables->next)
+ /*
+ In routine, can't reliably pick and choose substatements, so always
+ replicate.
+ We can't reliably know if one substatement should be executed or not:
+ consider the case of this substatement: a SELECT on a non-replicated
+ constant table; if we don't execute it maybe it was going to fill a
+ variable which was going to be used by the next substatement to update
+ a replicated table? If we execute it maybe the constant non-replicated
+ table does not exist (and so we'll fail) while there was no need to
+ execute this as this SELECT does not influence replicated tables in the
+ rest of the routine? In other words: users are used to replicate-*-table
+ specifying how to handle updates to tables, these options don't say
+ anything about reads to tables; we can't guess.
+ */
+ if (thd->spcont)
+ DBUG_RETURN(1);
+
+ for (; tables; tables= tables->next_global)
{
char hash_key[2*NAME_LEN+2];
char *end;
@@ -776,7 +913,7 @@ bool tables_ok(THD* thd, TABLE_LIST* tables)
some_tables_updating= 1;
end= strmov(hash_key, tables->db ? tables->db : thd->db);
*end++= '.';
- len= (uint) (strmov(end, tables->real_name) - hash_key);
+ len= (uint) (strmov(end, tables->table_name) - hash_key);
if (do_table_inited) // if there are any do's
{
if (hash_search(&replicate_do_table, (byte*) hash_key, len))
@@ -1164,29 +1301,86 @@ static int init_intvar_from_file(int* var, IO_CACHE* f, int default_val)
return 1;
}
+/*
+ Note that we rely on the master's version (3.23, 4.0.14 etc) instead of
+ relying on the binlog's version. This is not perfect: imagine an upgrade
+ of the master without waiting that all slaves are in sync with the master;
+ then a slave could be fooled about the binlog's format. This is what happens
+ when people upgrade a 3.23 master to 4.0 without doing RESET MASTER: 4.0
+ slaves are fooled. So we do this only to distinguish between 3.23 and more
+ recent masters (it's too late to change things for 3.23).
+
+ RETURNS
+ 0 ok
+ 1 error
+*/
static int get_master_version_and_clock(MYSQL* mysql, MASTER_INFO* mi)
{
const char* errmsg= 0;
-
+
/*
- Note the following switch will bug when we have MySQL branch 30 ;)
+ Free old description_event_for_queue (that is needed if we are in
+ a reconnection).
*/
- switch (*mysql->server_version) {
- case '3':
- mi->old_format =
- (strncmp(mysql->server_version, "3.23.57", 7) < 0) /* < .57 */ ?
- BINLOG_FORMAT_323_LESS_57 :
- BINLOG_FORMAT_323_GEQ_57 ;
- break;
- case '4':
- mi->old_format = BINLOG_FORMAT_CURRENT;
- break;
- default:
- /* 5.0 is not supported */
- errmsg = "Master reported an unrecognized MySQL version. Note that 4.1 \
-slaves can't replicate a 5.0 or newer master.";
- break;
+ delete mi->rli.relay_log.description_event_for_queue;
+ mi->rli.relay_log.description_event_for_queue= 0;
+
+ if (!my_isdigit(&my_charset_bin,*mysql->server_version))
+ errmsg = "Master reported unrecognized MySQL version";
+ else
+ {
+ /*
+ Note the following switch will bug when we have MySQL branch 30 ;)
+ */
+ switch (*mysql->server_version)
+ {
+ case '0':
+ case '1':
+ case '2':
+ errmsg = "Master reported unrecognized MySQL version";
+ break;
+ case '3':
+ mi->rli.relay_log.description_event_for_queue= new
+ Format_description_log_event(1, mysql->server_version);
+ break;
+ case '4':
+ mi->rli.relay_log.description_event_for_queue= new
+ Format_description_log_event(3, mysql->server_version);
+ break;
+ default:
+ /*
+ Master is MySQL >=5.0. Give a default Format_desc event, so that we can
+ take the early steps (like tests for "is this a 3.23 master") which we
+ have to take before we receive the real master's Format_desc which will
+ override this one. Note that the Format_desc we create below is garbage
+ (it has the format of the *slave*); it's only good to help know if the
+ master is 3.23, 4.0, etc.
+ */
+ mi->rli.relay_log.description_event_for_queue= new
+ Format_description_log_event(4, mysql->server_version);
+ break;
+ }
+ }
+
+ /*
+ This does not mean that a 5.0 slave will be able to read a 6.0 master; but
+ as we don't know yet, we don't want to forbid this for now. If a 5.0 slave
+ can't read a 6.0 master, this will show up when the slave can't read some
+ events sent by the master, and there will be error messages.
+ */
+
+ if (errmsg)
+ {
+ sql_print_error(errmsg);
+ return 1;
+ }
+
+ /* as we are here, we tried to allocate the event */
+ if (!mi->rli.relay_log.description_event_for_queue)
+ {
+ sql_print_error("Slave I/O thread failed to create a default Format_description_log_event");
+ return 1;
}
/*
@@ -1244,12 +1438,20 @@ not always make sense; please check the manual before using it).";
values of these 2 are never used (new connections don't use them).
We don't test equality of global collation_database either as it's is
going to be deprecated (made read-only) in 4.1 very soon.
- We don't do it for <3.23.57 because masters <3.23.50 hang on
- SELECT @@unknown_var (BUG#7965 - see changelog of 3.23.50).
+ The test is only relevant if master < 5.0.3 (we'll test only if it's older
+ than the 5 branch; < 5.0.3 was alpha...), as >= 5.0.3 master stores
+ charset info in each binlog event.
+ We don't do it for 3.23 because masters <3.23.50 hang on
+ SELECT @@unknown_var (BUG#7965 - see changelog of 3.23.50). So finally we
+ test only if master is 4.x.
*/
- if (mi->old_format == BINLOG_FORMAT_323_LESS_57)
+
+ /* redundant with rest of code but safer against later additions */
+ if (*mysql->server_version == '3')
goto err;
- if (!mysql_real_query(mysql, "SELECT @@GLOBAL.COLLATION_SERVER", 32) &&
+
+ if ((*mysql->server_version == '4') &&
+ !mysql_real_query(mysql, "SELECT @@GLOBAL.COLLATION_SERVER", 32) &&
(master_res= mysql_store_result(mysql)))
{
if ((master_row= mysql_fetch_row(master_res)) &&
@@ -1272,8 +1474,11 @@ be equal for replication to work";
such check will broke everything for them. (And now everything will
work for them because by default both their master and slave will have
'SYSTEM' time zone).
+ This check is only necessary for 4.x masters (and < 5.0.4 masters but
+ those were alpha).
*/
- if (!mysql_real_query(mysql, "SELECT @@GLOBAL.TIME_ZONE", 25) &&
+ if ((*mysql->server_version == '4') &&
+ !mysql_real_query(mysql, "SELECT @@GLOBAL.TIME_ZONE", 25) &&
(master_res= mysql_store_result(mysql)))
{
if ((master_row= mysql_fetch_row(master_res)) &&
@@ -1324,7 +1529,7 @@ static int create_table_from_dump(THD* thd, MYSQL *mysql, const char* db,
packet_len= my_net_read(net); // read create table statement
if (packet_len == packet_error)
{
- send_error(thd, ER_MASTER_NET_READ);
+ my_message(ER_MASTER_NET_READ, ER(ER_MASTER_NET_READ), MYF(0));
DBUG_RETURN(1);
}
if (net->read_pos[0] == 255) // error from master
@@ -1333,7 +1538,7 @@ static int create_table_from_dump(THD* thd, MYSQL *mysql, const char* db,
err_msg= (char*) net->read_pos + ((mysql->server_capabilities &
CLIENT_PROTOCOL_41) ?
3+SQLSTATE_LENGTH+1 : 3);
- net_printf(thd, ER_MASTER, err_msg);
+ my_error(ER_MASTER, MYF(0), err_msg);
DBUG_RETURN(1);
}
thd->command = COM_TABLE_DUMP;
@@ -1342,7 +1547,7 @@ static int create_table_from_dump(THD* thd, MYSQL *mysql, const char* db,
if (!(query = thd->strmake((char*) net->read_pos, packet_len)))
{
sql_print_error("create_table_from_dump: out of memory");
- net_printf(thd, ER_GET_ERRNO, "Out of memory");
+ my_message(ER_GET_ERRNO, "Out of memory", MYF(0));
DBUG_RETURN(1);
}
thd->query= query;
@@ -1351,12 +1556,11 @@ static int create_table_from_dump(THD* thd, MYSQL *mysql, const char* db,
bzero((char*) &tables,sizeof(tables));
tables.db = (char*)db;
- tables.alias= tables.real_name= (char*)table_name;
+ tables.alias= tables.table_name= (char*)table_name;
/* Drop the table if 'overwrite' is true */
if (overwrite && mysql_rm_table(thd,&tables,1,0)) /* drop if exists */
{
- send_error(thd);
sql_print_error("create_table_from_dump: failed to drop the table");
goto err;
}
@@ -1369,7 +1573,7 @@ static int create_table_from_dump(THD* thd, MYSQL *mysql, const char* db,
save_db = thd->db;
save_db_length= thd->db_length;
thd->db = (char*)db;
- DBUG_ASSERT(thd->db);
+ DBUG_ASSERT(thd->db != 0);
thd->db_length= strlen(thd->db);
mysql_parse(thd, thd->query, packet_len); // run create table
thd->db = save_db; // leave things the way the were before
@@ -1383,7 +1587,6 @@ static int create_table_from_dump(THD* thd, MYSQL *mysql, const char* db,
tables.lock_type = TL_WRITE;
if (!open_ltable(thd, &tables, TL_WRITE))
{
- send_error(thd,0,0); // Send error from open_ltable
sql_print_error("create_table_from_dump: could not open created table");
goto err;
}
@@ -1393,7 +1596,7 @@ static int create_table_from_dump(THD* thd, MYSQL *mysql, const char* db,
/* Copy the data file */
if (file->net_read_dump(net))
{
- net_printf(thd, ER_MASTER_NET_READ);
+ my_message(ER_MASTER_NET_READ, ER(ER_MASTER_NET_READ), MYF(0));
sql_print_error("create_table_from_dump: failed in\
handler::net_read_dump()");
goto err;
@@ -1413,7 +1616,7 @@ static int create_table_from_dump(THD* thd, MYSQL *mysql, const char* db,
error=file->repair(thd,&check_opt) != 0;
thd->net.vio = save_vio;
if (error)
- net_printf(thd, ER_INDEX_REBUILD,tables.table->real_name);
+ my_error(ER_INDEX_REBUILD, MYF(0), tables.table->s->table_name);
err:
close_thread_tables(thd);
@@ -1436,12 +1639,11 @@ int fetch_master_table(THD *thd, const char *db_name, const char *table_name,
{
if (!(mysql = mysql_init(NULL)))
{
- send_error(thd); // EOM
DBUG_RETURN(1);
}
if (connect_to_master(thd, mysql, mi))
{
- net_printf(thd, ER_CONNECT_TO_MASTER, mysql_error(mysql));
+ my_error(ER_CONNECT_TO_MASTER, MYF(0), mysql_error(mysql));
mysql_close(mysql);
DBUG_RETURN(1);
}
@@ -1465,7 +1667,7 @@ int fetch_master_table(THD *thd, const char *db_name, const char *table_name,
if (!called_connected)
mysql_close(mysql);
if (errmsg && thd->vio_ok())
- send_error(thd, error, errmsg);
+ my_message(error, errmsg, MYF(0));
DBUG_RETURN(test(error)); // Return 1 on error
}
@@ -1489,7 +1691,8 @@ void end_master_info(MASTER_INFO* mi)
}
-int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname)
+static int init_relay_log_info(RELAY_LOG_INFO* rli,
+ const char* info_fname)
{
char fname[FN_REFLEN+128];
int info_fd;
@@ -1497,7 +1700,7 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname)
int error = 0;
DBUG_ENTER("init_relay_log_info");
- if (rli->inited) // Set if this function called
+ if (rli->inited) // Set if this function called
DBUG_RETURN(0);
fn_format(fname, info_fname, mysql_data_home, "", 4+32);
pthread_mutex_lock(&rli->data_lock);
@@ -1508,23 +1711,10 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname)
rli->log_space_limit= relay_log_space_limit;
rli->log_space_total= 0;
- // TODO: make this work with multi-master
- if (!opt_relay_logname)
- {
- char tmp[FN_REFLEN];
- /*
- TODO: The following should be using fn_format(); We just need to
- first change fn_format() to cut the file name if it's too long.
- */
- strmake(tmp,glob_hostname,FN_REFLEN-5);
- strmov(strcend(tmp,'.'),"-relay-bin");
- opt_relay_logname=my_strdup(tmp,MYF(MY_WME));
- }
-
/*
The relay log will now be opened, as a SEQ_READ_APPEND IO_CACHE.
- Note that the I/O thread flushes it to disk after writing every event, in
- flush_master_info(mi, 1).
+ Note that the I/O thread flushes it to disk after writing every
+ event, in flush_master_info(mi, 1).
*/
/*
@@ -1536,16 +1726,25 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname)
switch to using max_binlog_size for the relay log) and update
rli->relay_log.max_size (and mysql_bin_log.max_size).
*/
-
- if (open_log(&rli->relay_log, glob_hostname, opt_relay_logname,
- "-relay-bin", opt_relaylog_index_name,
- LOG_BIN, 1 /* read_append cache */,
- 1 /* no auto events */,
- max_relay_log_size ? max_relay_log_size : max_binlog_size))
{
- pthread_mutex_unlock(&rli->data_lock);
- sql_print_error("Failed in open_log() called from init_relay_log_info()");
- DBUG_RETURN(1);
+ char buf[FN_REFLEN];
+ const char *ln;
+ ln= rli->relay_log.generate_name(opt_relay_logname, "-relay-bin",
+ 1, buf);
+
+ /*
+ note, that if open() fails, we'll still have index file open
+ but a destructor will take care of that
+ */
+ if (rli->relay_log.open_index_file(opt_relaylog_index_name, ln) ||
+ rli->relay_log.open(ln, LOG_BIN, 0, SEQ_READ_APPEND, 0,
+ (max_relay_log_size ? max_relay_log_size :
+ max_binlog_size), 1))
+ {
+ pthread_mutex_unlock(&rli->data_lock);
+ sql_print_error("Failed in open_log() called from init_relay_log_info()");
+ DBUG_RETURN(1);
+ }
}
/* if file does not exist */
@@ -1575,7 +1774,7 @@ file '%s', errno %d)", fname, my_errno);
/* Init relay log with first entry in the relay index file */
if (init_relay_log_pos(rli,NullS,BIN_LOG_HEADER_SIZE,0 /* no data lock */,
- &msg))
+ &msg, 0))
{
sql_print_error("Failed to open the relay log 'FIRST' (relay_log_pos 4)");
goto err;
@@ -1640,7 +1839,7 @@ Failed to open the existing relay log info file '%s' (errno %d)",
rli->group_relay_log_name,
rli->group_relay_log_pos,
0 /* no data lock*/,
- &msg))
+ &msg, 0))
{
char llbuf[22];
sql_print_error("Failed to open the relay log '%s' (relay_log_pos %s)",
@@ -1649,8 +1848,18 @@ Failed to open the existing relay log info file '%s' (errno %d)",
goto err;
}
}
- DBUG_ASSERT(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
- DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
+
+#ifndef DBUG_OFF
+ {
+ char llbuf1[22], llbuf2[22];
+ DBUG_PRINT("info", ("my_b_tell(rli->cur_log)=%s rli->event_relay_log_pos=%s",
+ llstr(my_b_tell(rli->cur_log),llbuf1),
+ llstr(rli->event_relay_log_pos,llbuf2)));
+ DBUG_ASSERT(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
+ DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
+ }
+#endif
+
/*
Now change the cache from READ to WRITE - must do this
before flush_relay_log_info
@@ -1798,9 +2007,9 @@ void clear_until_condition(RELAY_LOG_INFO* rli)
int init_master_info(MASTER_INFO* mi, const char* master_info_fname,
- const char* slave_info_fname,
- bool abort_if_no_master_info_file,
- int thread_mask)
+ const char* slave_info_fname,
+ bool abort_if_no_master_info_file,
+ int thread_mask)
{
int fd,error;
char fname[FN_REFLEN+128];
@@ -1814,7 +2023,7 @@ int init_master_info(MASTER_INFO* mi, const char* master_info_fname,
last time. If this case pos_in_file would be set and we would
get a crash when trying to read the signature for the binary
relay log.
-
+
We only rewind the read position if we are starting the SQL
thread. The handle_slave_sql thread assumes that the read
position is at the beginning of the file, and will read the
@@ -1840,7 +2049,7 @@ int init_master_info(MASTER_INFO* mi, const char* master_info_fname,
fd = mi->fd;
/* does master.info exist ? */
-
+
if (access(fname,F_OK))
{
if (abort_if_no_master_info_file)
@@ -1876,7 +2085,7 @@ file '%s')", fname);
{
if (fd >= 0)
reinit_io_cache(&mi->file, READ_CACHE, 0L,0,0);
- else
+ else
{
if ((fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0 )
{
@@ -1896,52 +2105,52 @@ file '%s')", fname);
mi->fd = fd;
int port, connect_retry, master_log_pos, ssl= 0, lines;
char *first_non_digit;
-
+
/*
Starting from 4.1.x master.info has new format. Now its
- first line contains number of lines in file. By reading this
- number we will be always distinguish to which version our
- master.info corresponds to. We can't simply count lines in
+ first line contains number of lines in file. By reading this
+ number we will be always distinguish to which version our
+ master.info corresponds to. We can't simply count lines in
file since versions before 4.1.x could generate files with more
lines than needed.
- If first line doesn't contain a number or contain number less than
+ If first line doesn't contain a number or contain number less than
14 then such file is treated like file from pre 4.1.1 version.
- There is no ambiguity when reading an old master.info, as before
+ There is no ambiguity when reading an old master.info, as before
4.1.1, the first line contained the binlog's name, which is either
- empty or has an extension (contains a '.'), so can't be confused
+ empty or has an extension (contains a '.'), so can't be confused
with an integer.
- So we're just reading first line and trying to figure which version
+ So we're just reading first line and trying to figure which version
is this.
*/
-
- /*
- The first row is temporarily stored in mi->master_log_name,
- if it is line count and not binlog name (new format) it will be
+
+ /*
+ The first row is temporarily stored in mi->master_log_name,
+ if it is line count and not binlog name (new format) it will be
overwritten by the second row later.
*/
if (init_strvar_from_file(mi->master_log_name,
sizeof(mi->master_log_name), &mi->file,
""))
goto errwithmsg;
-
+
lines= strtoul(mi->master_log_name, &first_non_digit, 10);
- if (mi->master_log_name[0]!='\0' &&
+ if (mi->master_log_name[0]!='\0' &&
*first_non_digit=='\0' && lines >= LINES_IN_MASTER_INFO_WITH_SSL)
{ // Seems to be new format
- if (init_strvar_from_file(mi->master_log_name,
+ if (init_strvar_from_file(mi->master_log_name,
sizeof(mi->master_log_name), &mi->file, ""))
goto errwithmsg;
}
else
lines= 7;
-
+
if (init_intvar_from_file(&master_log_pos, &mi->file, 4) ||
init_strvar_from_file(mi->host, sizeof(mi->host), &mi->file,
master_host) ||
init_strvar_from_file(mi->user, sizeof(mi->user), &mi->file,
- master_user) ||
+ master_user) ||
init_strvar_from_file(mi->password, SCRAMBLED_PASSWORD_CHAR_LENGTH+1,
&mi->file, master_password) ||
init_intvar_from_file(&port, &mi->file, master_port) ||
@@ -1949,17 +2158,17 @@ file '%s')", fname);
master_connect_retry))
goto errwithmsg;
- /*
- If file has ssl part use it even if we have server without
- SSL support. But these option will be ignored later when
- slave will try connect to master, so in this case warning
+ /*
+ If file has ssl part use it even if we have server without
+ SSL support. But these option will be ignored later when
+ slave will try connect to master, so in this case warning
is printed.
*/
- if (lines >= LINES_IN_MASTER_INFO_WITH_SSL &&
+ if (lines >= LINES_IN_MASTER_INFO_WITH_SSL &&
(init_intvar_from_file(&ssl, &mi->file, master_ssl) ||
- init_strvar_from_file(mi->ssl_ca, sizeof(mi->ssl_ca),
+ init_strvar_from_file(mi->ssl_ca, sizeof(mi->ssl_ca),
&mi->file, master_ssl_ca) ||
- init_strvar_from_file(mi->ssl_capath, sizeof(mi->ssl_capath),
+ init_strvar_from_file(mi->ssl_capath, sizeof(mi->ssl_capath),
&mi->file, master_ssl_capath) ||
init_strvar_from_file(mi->ssl_cert, sizeof(mi->ssl_cert),
&mi->file, master_ssl_cert) ||
@@ -1970,11 +2179,11 @@ file '%s')", fname);
goto errwithmsg;
#ifndef HAVE_OPENSSL
if (ssl)
- sql_print_error("SSL information in the master info file "
+ sql_print_warning("SSL information in the master info file "
"('%s') are ignored because this MySQL slave was compiled "
"without SSL support.", fname);
#endif /* HAVE_OPENSSL */
-
+
/*
This has to be handled here as init_intvar_from_file can't handle
my_off_t types
@@ -1994,15 +2203,15 @@ file '%s')", fname);
mi->inited = 1;
// now change cache READ -> WRITE - must do this before flush_master_info
- reinit_io_cache(&mi->file, WRITE_CACHE,0L,0,1);
+ reinit_io_cache(&mi->file, WRITE_CACHE, 0L, 0, 1);
if ((error=test(flush_master_info(mi, 1))))
sql_print_error("Failed to flush master info file");
pthread_mutex_unlock(&mi->data_lock);
DBUG_RETURN(error);
-
+
errwithmsg:
sql_print_error("Error reading master configuration");
-
+
err:
if (fd >= 0)
{
@@ -2095,7 +2304,7 @@ void table_rule_ent_dynamic_array_to_str(String* s, DYNAMIC_ARRAY* a)
}
}
-int show_master_info(THD* thd, MASTER_INFO* mi)
+bool show_master_info(THD* thd, MASTER_INFO* mi)
{
// TODO: fix this for multi-master
List<Item> field_list;
@@ -2157,8 +2366,9 @@ int show_master_info(THD* thd, MASTER_INFO* mi)
field_list.push_back(new Item_return_int("Seconds_Behind_Master", 10,
MYSQL_TYPE_LONGLONG));
- if (protocol->send_fields(&field_list, 1))
- DBUG_RETURN(-1);
+ if (protocol->send_fields(&field_list,
+ Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
+ DBUG_RETURN(TRUE);
if (mi->host[0])
{
@@ -2271,10 +2481,10 @@ int show_master_info(THD* thd, MASTER_INFO* mi)
pthread_mutex_unlock(&mi->data_lock);
if (my_net_write(&thd->net, (char*)thd->packet.ptr(), packet->length()))
- DBUG_RETURN(-1);
+ DBUG_RETURN(TRUE);
}
send_eof(thd);
- DBUG_RETURN(0);
+ DBUG_RETURN(FALSE);
}
@@ -2344,6 +2554,7 @@ st_relay_log_info::st_relay_log_info()
bzero((char*) &info_file, sizeof(info_file));
bzero((char*) &cache_buf, sizeof(cache_buf));
+ cached_charset_invalidate();
pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST);
pthread_mutex_init(&data_lock, MY_MUTEX_INIT_FAST);
pthread_mutex_init(&log_space_lock, MY_MUTEX_INIT_FAST);
@@ -2364,6 +2575,7 @@ st_relay_log_info::~st_relay_log_info()
pthread_cond_destroy(&start_cond);
pthread_cond_destroy(&stop_cond);
pthread_cond_destroy(&log_space_cond);
+ relay_log.cleanup();
}
/*
@@ -2401,17 +2613,16 @@ int st_relay_log_info::wait_for_pos(THD* thd, String* log_name,
ulong init_abort_pos_wait;
int error=0;
struct timespec abstime; // for timeout checking
- set_timespec(abstime,timeout);
-
+ const char *msg;
DBUG_ENTER("wait_for_pos");
- DBUG_PRINT("enter",("group_master_log_name: '%s' pos: %lu timeout: %ld",
- group_master_log_name, (ulong) group_master_log_pos,
- (long) timeout));
+ DBUG_PRINT("enter",("log_name: '%s' log_pos: %lu timeout: %lu",
+ log_name->c_ptr(), (ulong) log_pos, (ulong) timeout));
+ set_timespec(abstime,timeout);
pthread_mutex_lock(&data_lock);
- const char *msg= thd->enter_cond(&data_cond, &data_lock,
- "Waiting for the slave SQL thread to "
- "advance position");
+ msg= thd->enter_cond(&data_cond, &data_lock,
+ "Waiting for the slave SQL thread to "
+ "advance position");
/*
This function will abort when it notices that some CHANGE MASTER or
RESET MASTER has changed the master info.
@@ -2468,6 +2679,12 @@ int st_relay_log_info::wait_for_pos(THD* thd, String* log_name,
bool pos_reached;
int cmp_result= 0;
+ DBUG_PRINT("info",
+ ("init_abort_pos_wait: %ld abort_pos_wait: %ld",
+ init_abort_pos_wait, abort_pos_wait));
+ DBUG_PRINT("info",("group_master_log_name: '%s' pos: %lu",
+ group_master_log_name, (ulong) group_master_log_pos));
+
/*
group_master_log_name can be "", if we are just after a fresh
replication start or after a CHANGE MASTER TO MASTER_HOST/PORT
@@ -2550,7 +2767,7 @@ err:
thd->exit_cond(msg);
DBUG_PRINT("exit",("killed: %d abort: %d slave_running: %d \
improper_arguments: %d timed_out: %d",
- (int) thd->killed,
+ thd->killed_errno(),
(int) (init_abort_pos_wait != abort_pos_wait),
(int) slave_running,
(int) (error == -2),
@@ -2563,6 +2780,24 @@ improper_arguments: %d timed_out: %d",
DBUG_RETURN( error ? error : event_count );
}
+void set_slave_thread_options(THD* thd)
+{
+ thd->options = ((opt_log_slave_updates) ? OPTION_BIN_LOG:0) |
+ OPTION_AUTO_IS_NULL;
+ thd->variables.completion_type= 0;
+}
+
+void set_slave_thread_default_charset(THD* thd, RELAY_LOG_INFO *rli)
+{
+ thd->variables.character_set_client=
+ global_system_variables.character_set_client;
+ thd->variables.collation_connection=
+ global_system_variables.collation_connection;
+ thd->variables.collation_server=
+ global_system_variables.collation_server;
+ thd->update_charset();
+ rli->cached_charset_invalidate();
+}
/*
init_slave_thread()
@@ -2573,13 +2808,19 @@ static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
DBUG_ENTER("init_slave_thread");
thd->system_thread = (thd_type == SLAVE_THD_SQL) ?
SYSTEM_THREAD_SLAVE_SQL : SYSTEM_THREAD_SLAVE_IO;
+ /*
+ The two next lines are needed for replication of SP (CREATE PROCEDURE
+ needs a valid user to store in mysql.proc).
+ */
+ thd->priv_user= (char *) "";
+ thd->priv_host[0]= '\0';
thd->host_or_ip= "";
thd->client_capabilities = 0;
my_net_init(&thd->net, 0);
thd->net.read_timeout = slave_net_timeout;
thd->master_access= ~0;
- thd->priv_user = 0;
thd->slave_thread = 1;
+ set_slave_thread_options(thd);
/*
It's nonsense to constrain the slave threads with max_join_size; if a
query succeeded on master, we HAVE to execute it. So set
@@ -2759,8 +3000,7 @@ static ulong read_event(MYSQL* mysql, MASTER_INFO *mi, bool* suppress_warnings)
*suppress_warnings= TRUE;
}
else
- sql_print_error("Error reading packet from server: %s (\
-server_errno=%d)",
+ sql_print_error("Error reading packet from server: %s ( server_errno=%d)",
mysql_error(mysql), mysql_errno(mysql));
return packet_error;
}
@@ -2768,8 +3008,8 @@ server_errno=%d)",
/* Check if eof packet */
if (len < 8 && mysql->net.read_pos[0] == 254)
{
- sql_print_error("Slave: received end packet from server, apparent\
- master shutdown: %s",
+ sql_print_information("Slave: received end packet from server, apparent "
+ "master shutdown: %s",
mysql_error(mysql));
return packet_error;
}
@@ -2841,13 +3081,14 @@ bool st_relay_log_info::is_until_satisfied()
if (until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_UNKNOWN)
{
- /*
- We have no cached comaprison results so we should compare log names
- and cache result
+ /*
+ We have no cached comparison results so we should compare log names
+ and cache result.
+ If we are after RESET SLAVE, and the SQL slave thread has not processed
+ any event yet, it could be that group_master_log_name is "". In that case,
+ just wait for more events (as there is no sensible comparison to do).
*/
- DBUG_ASSERT(*log_name || log_pos == 0);
-
if (*log_name)
{
const char *basename= log_name + dirname_length(log_name);
@@ -2883,21 +3124,39 @@ bool st_relay_log_info::is_until_satisfied()
}
+void st_relay_log_info::cached_charset_invalidate()
+{
+ /* Full of zeroes means uninitialized. */
+ bzero(cached_charset, sizeof(cached_charset));
+}
+
+
+bool st_relay_log_info::cached_charset_compare(char *charset)
+{
+ if (bcmp(cached_charset, charset, sizeof(cached_charset)))
+ {
+ memcpy(cached_charset, charset, sizeof(cached_charset));
+ return 1;
+ }
+ return 0;
+}
+
+
static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli)
{
/*
We acquire this mutex since we need it for all operations except
- event execution. But we will release it in places where we will
+ event execution. But we will release it in places where we will
wait for something for example inside of next_event().
*/
pthread_mutex_lock(&rli->data_lock);
-
- if (rli->until_condition!=RELAY_LOG_INFO::UNTIL_NONE &&
- rli->is_until_satisfied())
+
+ if (rli->until_condition!=RELAY_LOG_INFO::UNTIL_NONE &&
+ rli->is_until_satisfied())
{
sql_print_error("Slave SQL thread stopped because it reached its"
" UNTIL position %ld", (long) rli->until_pos());
- /*
+ /*
Setting abort_slave flag because we do not want additional message about
error in query execution to be printed.
*/
@@ -2905,11 +3164,11 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli)
pthread_mutex_unlock(&rli->data_lock);
return 1;
}
-
+
Log_event * ev = next_event(rli);
-
+
DBUG_ASSERT(rli->sql_thd==thd);
-
+
if (sql_slave_killed(thd,rli))
{
pthread_mutex_unlock(&rli->data_lock);
@@ -2922,35 +3181,70 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli)
int exec_res;
/*
- Skip queries originating from this server or number of
- queries specified by the user in slave_skip_counter
- We can't however skip event's that has something to do with the
+ Queries originating from this server must be skipped.
+ Low-level events (Format_desc, Rotate, Stop) from this server
+ must also be skipped. But for those we don't want to modify
+ group_master_log_pos, because these events did not exist on the master.
+ Format_desc is not completely skipped.
+ Skip queries specified by the user in slave_skip_counter.
+ We can't however skip events that has something to do with the
log files themselves.
+ Filtering on own server id is extremely important, to ignore execution of
+ events created by the creation/rotation of the relay log (remember that
+ now the relay log starts with its Format_desc, has a Rotate etc).
*/
- if ((ev->server_id == (uint32) ::server_id && !replicate_same_server_id) ||
- (rli->slave_skip_counter && type_code != ROTATE_EVENT))
+ DBUG_PRINT("info",("type_code=%d, server_id=%d",type_code,ev->server_id));
+
+ if ((ev->server_id == (uint32) ::server_id &&
+ !replicate_same_server_id &&
+ type_code != FORMAT_DESCRIPTION_EVENT) ||
+ (rli->slave_skip_counter &&
+ type_code != ROTATE_EVENT && type_code != STOP_EVENT &&
+ type_code != START_EVENT_V3 && type_code!= FORMAT_DESCRIPTION_EVENT))
{
- /* TODO: I/O thread should not even log events with the same server id */
- rli->inc_group_relay_log_pos(ev->get_event_len(),
- type_code != STOP_EVENT ? ev->log_pos : LL(0),
- 1/* skip lock*/);
- flush_relay_log_info(rli);
+ DBUG_PRINT("info", ("event skipped"));
+ if (thd->options & OPTION_BEGIN)
+ rli->inc_event_relay_log_pos();
+ else
+ {
+ rli->inc_group_relay_log_pos((type_code == ROTATE_EVENT ||
+ type_code == STOP_EVENT ||
+ type_code == FORMAT_DESCRIPTION_EVENT) ?
+ LL(0) : ev->log_pos,
+ 1/* skip lock*/);
+ flush_relay_log_info(rli);
+ }
/*
- Protect against common user error of setting the counter to 1
- instead of 2 while recovering from an failed auto-increment insert
+ Protect against common user error of setting the counter to 1
+ instead of 2 while recovering from an insert which used auto_increment,
+ rand or user var.
*/
- if (rli->slave_skip_counter &&
- !((type_code == INTVAR_EVENT || type_code == STOP_EVENT) &&
- rli->slave_skip_counter == 1))
+ if (rli->slave_skip_counter &&
+ !((type_code == INTVAR_EVENT ||
+ type_code == RAND_EVENT ||
+ type_code == USER_VAR_EVENT) &&
+ rli->slave_skip_counter == 1) &&
+ /*
+ The events from ourselves which have something to do with the relay
+ log itself must be skipped, true, but they mustn't decrement
+ rli->slave_skip_counter, because the user is supposed to not see
+ these events (they are not in the master's binlog) and if we
+ decremented, START SLAVE would for example decrement when it sees
+ the Rotate, so the event which the user probably wanted to skip
+ would not be skipped.
+ */
+ !(ev->server_id == (uint32) ::server_id &&
+ (type_code == ROTATE_EVENT || type_code == STOP_EVENT ||
+ type_code == START_EVENT_V3 || type_code == FORMAT_DESCRIPTION_EVENT)))
--rli->slave_skip_counter;
pthread_mutex_unlock(&rli->data_lock);
- delete ev;
- return 0; // avoid infinite update loops
- }
+ delete ev;
+ return 0; // avoid infinite update loops
+ }
pthread_mutex_unlock(&rli->data_lock);
-
+
thd->server_id = ev->server_id; // use the original server id for logging
thd->set_time(); // time the query
thd->lex->current_select= 0;
@@ -2959,7 +3253,16 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli)
ev->thd = thd;
exec_res = ev->exec_event(rli);
DBUG_ASSERT(rli->sql_thd==thd);
- delete ev;
+ /*
+ Format_description_log_event should not be deleted because it will be
+ used to read info about the relay log's format; it will be deleted when
+ the SQL thread does not need it, i.e. when this thread terminates.
+ */
+ if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
+ {
+ DBUG_PRINT("info", ("Deleting the event after it has been executed"));
+ delete ev;
+ }
if (slave_trans_retries)
{
if (exec_res &&
@@ -2988,7 +3291,7 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli)
else if (init_relay_log_pos(rli,
rli->group_relay_log_name,
rli->group_relay_log_pos,
- 1, &errmsg))
+ 1, &errmsg, 1))
sql_print_error("Error initializing relay log position: %s",
errmsg);
else
@@ -3039,17 +3342,17 @@ extern "C" pthread_handler_decl(handle_slave_io,arg)
{
THD *thd; // needs to be first for thread_stack
MYSQL *mysql;
- MASTER_INFO *mi = (MASTER_INFO*)arg;
+ MASTER_INFO *mi = (MASTER_INFO*)arg;
char llbuff[22];
uint retry_count;
-
+
// needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
my_thread_init();
DBUG_ENTER("handle_slave_io");
#ifndef DBUG_OFF
slave_begin:
-#endif
+#endif
DBUG_ASSERT(mi->inited);
mysql= NULL ;
retry_count= 0;
@@ -3058,10 +3361,10 @@ slave_begin:
/* Inform waiting threads that slave has started */
mi->slave_run_id++;
-#ifndef DBUG_OFF
+#ifndef DBUG_OFF
mi->events_till_abort = abort_slave_event_count;
-#endif
-
+#endif
+
thd= new THD; // note that contructor of THD uses DBUG_ !
THD_CHECK_SENTRY(thd);
@@ -3082,17 +3385,16 @@ slave_begin:
mi->abort_slave = 0;
pthread_mutex_unlock(&mi->run_lock);
pthread_cond_broadcast(&mi->start_cond);
-
+
DBUG_PRINT("master_info",("log_file_name: '%s' position: %s",
mi->master_log_name,
llstr(mi->master_log_pos,llbuff)));
-
+
if (!(mi->mysql = mysql = mysql_init(NULL)))
{
sql_print_error("Slave I/O thread: error in mysql_init()");
goto err;
}
-
thd->proc_info = "Connecting to master";
// we can get killed during safe_connect
@@ -3104,7 +3406,7 @@ slave_begin:
llstr(mi->master_log_pos,llbuff));
else
{
- sql_print_error("Slave I/O thread killed while connecting to master");
+ sql_print_information("Slave I/O thread killed while connecting to master");
goto err;
}
@@ -3116,7 +3418,8 @@ connected:
thd->proc_info = "Checking master version";
if (get_master_version_and_clock(mysql, mi))
goto err;
- if (!mi->old_format)
+
+ if (mi->rli.relay_log.description_event_for_queue->binlog_version > 1)
{
/*
Register ourselves with the master.
@@ -3127,22 +3430,22 @@ connected:
if (register_slave_on_master(mysql) || update_slave_list(mysql, mi))
goto err;
}
-
+
DBUG_PRINT("info",("Starting reading binary log from master"));
while (!io_slave_killed(thd,mi))
{
- bool suppress_warnings= 0;
+ bool suppress_warnings= 0;
thd->proc_info = "Requesting binlog dump";
if (request_dump(mysql, mi, &suppress_warnings))
{
sql_print_error("Failed on request_dump()");
if (io_slave_killed(thd,mi))
{
- sql_print_error("Slave I/O thread killed while requesting master \
+ sql_print_information("Slave I/O thread killed while requesting master \
dump");
goto err;
}
-
+
mi->slave_running= MYSQL_SLAVE_RUN_NOT_CONNECT;
thd->proc_info= "Waiting to reconnect after a failed binlog dump request";
#ifdef SIGNAL_WITH_VIO_CLOSE
@@ -3163,7 +3466,7 @@ dump");
}
if (io_slave_killed(thd,mi))
{
- sql_print_error("Slave I/O thread killed while retrying master \
+ sql_print_information("Slave I/O thread killed while retrying master \
dump");
goto err;
}
@@ -3176,7 +3479,7 @@ reconnecting to try again, log '%s' at postion %s", IO_RPL_LOG_NAME,
if (safe_reconnect(thd, mysql, mi, suppress_warnings) ||
io_slave_killed(thd,mi))
{
- sql_print_error("Slave I/O thread killed during or \
+ sql_print_information("Slave I/O thread killed during or \
after reconnect");
goto err;
}
@@ -3186,22 +3489,22 @@ after reconnect");
while (!io_slave_killed(thd,mi))
{
- bool suppress_warnings= 0;
- /*
+ bool suppress_warnings= 0;
+ /*
We say "waiting" because read_event() will wait if there's nothing to
- read. But if there's something to read, it will not wait. The important
- thing is to not confuse users by saying "reading" whereas we're in fact
- receiving nothing.
+ read. But if there's something to read, it will not wait. The
+ important thing is to not confuse users by saying "reading" whereas
+ we're in fact receiving nothing.
*/
thd->proc_info = "Waiting for master to send event";
ulong event_len = read_event(mysql, mi, &suppress_warnings);
if (io_slave_killed(thd,mi))
{
if (global_system_variables.log_warnings)
- sql_print_error("Slave I/O thread killed while reading event");
+ sql_print_information("Slave I/O thread killed while reading event");
goto err;
}
-
+
if (event_len == packet_error)
{
uint mysql_error_number= mysql_errno(mysql);
@@ -3232,30 +3535,30 @@ max_allowed_packet",
goto err; // Don't retry forever
safe_sleep(thd,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
(void*) mi);
- }
+ }
if (io_slave_killed(thd,mi))
{
if (global_system_variables.log_warnings)
- sql_print_error("Slave I/O thread killed while waiting to \
+ sql_print_information("Slave I/O thread killed while waiting to \
reconnect after a failed read");
goto err;
}
thd->proc_info = "Reconnecting after a failed master event read";
if (!suppress_warnings)
- sql_print_error("Slave I/O thread: Failed reading log event, \
+ sql_print_information("Slave I/O thread: Failed reading log event, \
reconnecting to retry, log '%s' position %s", IO_RPL_LOG_NAME,
llstr(mi->master_log_pos, llbuff));
if (safe_reconnect(thd, mysql, mi, suppress_warnings) ||
io_slave_killed(thd,mi))
{
if (global_system_variables.log_warnings)
- sql_print_error("Slave I/O thread killed during or after a \
+ sql_print_information("Slave I/O thread killed during or after a \
reconnect done to recover from failed read");
goto err;
}
goto connected;
} // if (event_len == packet_error)
-
+
retry_count=0; // ok event, reset retry counter
thd->proc_info = "Queueing master event to the relay log";
if (queue_event(mi,(const char*)mysql->net.read_pos + 1,
@@ -3311,7 +3614,7 @@ log space");
// error = 0;
err:
// print the current replication position
- sql_print_error("Slave I/O thread exiting, read up to log '%s', position %s",
+ sql_print_information("Slave I/O thread exiting, read up to log '%s', position %s",
IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff));
VOID(pthread_mutex_lock(&LOCK_thread_count));
thd->query = thd->db = 0; // extra safety
@@ -3326,6 +3629,9 @@ err:
pthread_mutex_lock(&mi->run_lock);
mi->slave_running = 0;
mi->io_thd = 0;
+ /* Forget the relay log's format */
+ delete mi->rli.relay_log.description_event_for_queue;
+ mi->rli.relay_log.description_event_for_queue= 0;
// TODO: make rpl_status part of MASTER_INFO
change_rpl_status(RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE);
mi->abort_slave = 0; // TODO: check if this is needed
@@ -3430,15 +3736,38 @@ slave_begin:
if (init_relay_log_pos(rli,
rli->group_relay_log_name,
rli->group_relay_log_pos,
- 1 /*need data lock*/, &errmsg))
+ 1 /*need data lock*/, &errmsg,
+ 1 /*look for a description_event*/))
{
sql_print_error("Error initializing relay log position: %s",
errmsg);
goto err;
}
THD_CHECK_SENTRY(thd);
- DBUG_ASSERT(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
- DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
+#ifndef DBUG_OFF
+ {
+ char llbuf1[22], llbuf2[22];
+ DBUG_PRINT("info", ("my_b_tell(rli->cur_log)=%s rli->event_relay_log_pos=%s",
+ llstr(my_b_tell(rli->cur_log),llbuf1),
+ llstr(rli->event_relay_log_pos,llbuf2)));
+ DBUG_ASSERT(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
+ /*
+ Wonder if this is correct. I (Guilhem) wonder if my_b_tell() returns the
+ correct position when it's called just after my_b_seek() (the questionable
+ stuff is those "seek is done on next read" comments in the my_b_seek()
+ source code).
+ The crude reality is that this assertion randomly fails whereas
+ replication seems to work fine. And there is no easy explanation why it
+ fails (as we my_b_seek(rli->event_relay_log_pos) at the very end of
+ init_relay_log_pos() called above). Maybe the assertion would be
+ meaningful if we held rli->data_lock between the my_b_seek() and the
+ DBUG_ASSERT().
+ */
+#ifdef SHOULD_BE_CHECKED
+ DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
+#endif
+ }
+#endif
DBUG_ASSERT(rli->sql_thd == thd);
DBUG_PRINT("master_info",("log_file_name: %s position: %s",
@@ -3482,12 +3811,18 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \
}
/* Thread stopped. Print the current replication position to the log */
- sql_print_information("Slave SQL thread exiting, replication stopped in log \
- '%s' at position %s", RPL_LOG_NAME, llstr(rli->group_master_log_pos,llbuff));
+ sql_print_information("Slave SQL thread exiting, replication stopped in log "
+ "'%s' at position %s",
+ RPL_LOG_NAME, llstr(rli->group_master_log_pos,llbuff));
err:
VOID(pthread_mutex_lock(&LOCK_thread_count));
- thd->query = thd->db = 0; // extra safety
+ /*
+ Some extra safety, which should not been needed (normally, event deletion
+ should already have done these assignments (each event which sets these
+ variables is supposed to set them to 0 before terminating)).
+ */
+ thd->query= thd->db= thd->catalog= 0;
thd->query_length= thd->db_length= 0;
VOID(pthread_mutex_unlock(&LOCK_thread_count));
thd->proc_info = "Waiting for slave mutex on exit";
@@ -3497,16 +3832,16 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \
DBUG_ASSERT(rli->slave_running == 1); // tracking buffer overrun
/* When master_pos_wait() wakes up it will check this and terminate */
rli->slave_running= 0;
- /*
- Going out of the transaction. Necessary to mark it, in case the user
- restarts replication from a non-transactional statement (with CHANGE
- MASTER).
- */
+ /* Forget the relay log's format */
+ delete rli->relay_log.description_event_for_exec;
+ rli->relay_log.description_event_for_exec= 0;
/* Wake up master_pos_wait() */
pthread_mutex_unlock(&rli->data_lock);
DBUG_PRINT("info",("Signaling possibly waiting master_pos_wait() functions"));
pthread_cond_broadcast(&rli->data_cond);
rli->ignore_log_space_limit= 0; /* don't need any lock */
+ /* we die so won't remember charset - re-update them on next thread start */
+ rli->cached_charset_invalidate();
rli->save_temporary_tables = thd->temporary_tables;
/*
@@ -3599,7 +3934,7 @@ static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev)
if (unlikely(cev_not_written))
break;
Execute_load_log_event xev(thd,0,0);
- xev.log_pos = mi->master_log_pos;
+ xev.log_pos = cev->log_pos;
if (unlikely(mi->rli.relay_log.append(&xev)))
{
sql_print_error("Slave I/O: error writing Exec_load event to \
@@ -3613,7 +3948,6 @@ relay log");
{
cev->block = (char*)net->read_pos;
cev->block_len = num_bytes;
- cev->log_pos = mi->master_log_pos;
if (unlikely(mi->rli.relay_log.append(cev)))
{
sql_print_error("Slave I/O: error writing Create_file event to \
@@ -3627,7 +3961,7 @@ relay log");
{
aev.block = (char*)net->read_pos;
aev.block_len = num_bytes;
- aev.log_pos = mi->master_log_pos;
+ aev.log_pos = cev->log_pos;
if (unlikely(mi->rli.relay_log.append(&aev)))
{
sql_print_error("Slave I/O: error writing Append_block event to \
@@ -3655,6 +3989,7 @@ err:
DESCRIPTION
Updates the master info with the place in the next binary
log where we should start reading.
+ Rotate the relay log to avoid mixed-format relay logs.
NOTES
We assume we already locked mi->data_lock
@@ -3685,21 +4020,34 @@ static int process_io_rotate(MASTER_INFO *mi, Rotate_log_event *rev)
if (disconnect_slave_event_count)
events_till_disconnect++;
#endif
+
+ /*
+ If description_event_for_queue is format <4, there is conversion in the
+ relay log to the slave's format (4). And Rotate can mean upgrade or
+ nothing. If upgrade, it's to 5.0 or newer, so we will get a Format_desc, so
+ no need to reset description_event_for_queue now. And if it's nothing (same
+ master version as before), no need (still using the slave's format).
+ */
+ if (mi->rli.relay_log.description_event_for_queue->binlog_version >= 4)
+ {
+ delete mi->rli.relay_log.description_event_for_queue;
+ /* start from format 3 (MySQL 4.0) again */
+ mi->rli.relay_log.description_event_for_queue= new
+ Format_description_log_event(3);
+ }
+ /*
+ Rotate the relay log makes binlog format detection easier (at next slave
+ start or mysqlbinlog)
+ */
+ rotate_relay_log(mi); /* will take the right mutexes */
DBUG_RETURN(0);
}
-
/*
- queue_old_event()
-
- Writes a 3.23 event to the relay log.
-
- TODO:
- Test this code before release - it has to be tested on a separate
- setup with 3.23 master
+ Reads a 3.23 event and converts it to the slave's format. This code was
+ copied from MySQL 4.0.
*/
-
-static int queue_old_event(MASTER_INFO *mi, const char *buf,
+static int queue_binlog_ver_1_event(MASTER_INFO *mi, const char *buf,
ulong event_len)
{
const char *errmsg = 0;
@@ -3707,7 +4055,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf,
bool ignore_event= 0;
char *tmp_buf = 0;
RELAY_LOG_INFO *rli= &mi->rli;
- DBUG_ENTER("queue_old_event");
+ DBUG_ENTER("queue_binlog_ver_1_event");
/*
If we get Load event, we need to pass a non-reusable buffer
@@ -3739,7 +4087,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf,
connected to the master).
*/
Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
- 1 /*old format*/ );
+ mi->rli.relay_log.description_event_for_queue);
if (unlikely(!ev))
{
sql_print_error("Read invalid event from master: '%s',\
@@ -3749,7 +4097,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf,
DBUG_RETURN(1);
}
pthread_mutex_lock(&mi->data_lock);
- ev->log_pos = mi->master_log_pos;
+ ev->log_pos= mi->master_log_pos; /* 3.23 events don't contain log_pos */
switch (ev->get_type_code()) {
case STOP_EVENT:
ignore_event= 1;
@@ -3773,14 +4121,12 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf,
*/
{
/* We come here when and only when tmp_buf != 0 */
- DBUG_ASSERT(tmp_buf);
+ DBUG_ASSERT(tmp_buf != 0);
+ inc_pos=event_len;
+ ev->log_pos+= inc_pos;
int error = process_io_create_file(mi,(Create_file_log_event*)ev);
delete ev;
- /*
- We had incremented event_len, but now when it is used to calculate the
- position in the master's log, we must use the original value.
- */
- mi->master_log_pos += --event_len;
+ mi->master_log_pos += inc_pos;
DBUG_PRINT("info", ("master_log_pos: %d", (ulong) mi->master_log_pos));
pthread_mutex_unlock(&mi->data_lock);
my_free((char*)tmp_buf, MYF(0));
@@ -3792,6 +4138,12 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf,
}
if (likely(!ignore_event))
{
+ if (ev->log_pos)
+ /*
+ Don't do it for fake Rotate events (see comment in
+ Log_event::Log_event(const char* buf...) in log_event.cc).
+ */
+ ev->log_pos+= event_len; /* make log_pos be the pos of the end of the event */
if (unlikely(rli->relay_log.append(ev)))
{
delete ev;
@@ -3807,10 +4159,98 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf,
DBUG_RETURN(0);
}
+/*
+ Reads a 4.0 event and converts it to the slave's format. This code was copied
+ from queue_binlog_ver_1_event(), with some affordable simplifications.
+*/
+static int queue_binlog_ver_3_event(MASTER_INFO *mi, const char *buf,
+ ulong event_len)
+{
+ const char *errmsg = 0;
+ ulong inc_pos;
+ char *tmp_buf = 0;
+ RELAY_LOG_INFO *rli= &mi->rli;
+ DBUG_ENTER("queue_binlog_ver_3_event");
+
+ /* read_log_event() will adjust log_pos to be end_log_pos */
+ Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
+ mi->rli.relay_log.description_event_for_queue);
+ if (unlikely(!ev))
+ {
+ sql_print_error("Read invalid event from master: '%s',\
+ master could be corrupt but a more likely cause of this is a bug",
+ errmsg);
+ my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
+ DBUG_RETURN(1);
+ }
+ pthread_mutex_lock(&mi->data_lock);
+ switch (ev->get_type_code()) {
+ case STOP_EVENT:
+ goto err;
+ case ROTATE_EVENT:
+ if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
+ {
+ delete ev;
+ pthread_mutex_unlock(&mi->data_lock);
+ DBUG_RETURN(1);
+ }
+ inc_pos= 0;
+ break;
+ default:
+ inc_pos= event_len;
+ break;
+ }
+ if (unlikely(rli->relay_log.append(ev)))
+ {
+ delete ev;
+ pthread_mutex_unlock(&mi->data_lock);
+ DBUG_RETURN(1);
+ }
+ rli->relay_log.harvest_bytes_written(&rli->log_space_total);
+ delete ev;
+ mi->master_log_pos+= inc_pos;
+err:
+ DBUG_PRINT("info", ("master_log_pos: %d", (ulong) mi->master_log_pos));
+ pthread_mutex_unlock(&mi->data_lock);
+ DBUG_RETURN(0);
+}
+
+/*
+ queue_old_event()
+
+ Writes a 3.23 or 4.0 event to the relay log, after converting it to the 5.0
+ (exactly, slave's) format. To do the conversion, we create a 5.0 event from
+ the 3.23/4.0 bytes, then write this event to the relay log.
+
+ TODO:
+ Test this code before release - it has to be tested on a separate
+ setup with 3.23 master or 4.0 master
+*/
+
+static int queue_old_event(MASTER_INFO *mi, const char *buf,
+ ulong event_len)
+{
+ switch (mi->rli.relay_log.description_event_for_queue->binlog_version)
+ {
+ case 1:
+ return queue_binlog_ver_1_event(mi,buf,event_len);
+ case 3:
+ return queue_binlog_ver_3_event(mi,buf,event_len);
+ default: /* unsupported format; eg version 2 */
+ DBUG_PRINT("info",("unsupported binlog format %d in queue_old_event()",
+ mi->rli.relay_log.description_event_for_queue->binlog_version));
+ return 1;
+ }
+}
/*
queue_event()
+ If the event is 3.23/4.0, passes it to queue_old_event() which will convert
+ it. Otherwise, writes a 5.0 (or newer) event to the relay log. Then there is
+ no format conversion, it's pure read/write of bytes.
+ So a 5.0.0 slave's relay log can contain events in the slave's format or in
+ any >=5.0.0 format.
*/
int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len)
@@ -3820,7 +4260,8 @@ int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len)
RELAY_LOG_INFO *rli= &mi->rli;
DBUG_ENTER("queue_event");
- if (mi->old_format)
+ if (mi->rli.relay_log.description_event_for_queue->binlog_version<4 &&
+ buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT /* a way to escape */)
DBUG_RETURN(queue_old_event(mi,buf,event_len));
pthread_mutex_lock(&mi->data_lock);
@@ -3837,7 +4278,7 @@ int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len)
master server shutdown. The only thing this does is cleaning. But
cleaning is already done on a per-master-thread basis (as the master
server is shutting down cleanly, it has written all DROP TEMPORARY TABLE
- and DO RELEASE_LOCK; prepared statements' deletion are TODO).
+ prepared statements' deletion are TODO only when we binlog prep stmts).
We don't even increment mi->master_log_pos, because we may be just after
a Rotate event. Btw, in a few milliseconds we are going to have a Start
@@ -3847,7 +4288,7 @@ int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len)
goto err;
case ROTATE_EVENT:
{
- Rotate_log_event rev(buf,event_len,0);
+ Rotate_log_event rev(buf,event_len,mi->rli.relay_log.description_event_for_queue);
if (unlikely(process_io_rotate(mi,&rev)))
{
error= 1;
@@ -3860,6 +4301,42 @@ int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len)
inc_pos= 0;
break;
}
+ case FORMAT_DESCRIPTION_EVENT:
+ {
+ /*
+ Create an event, and save it (when we rotate the relay log, we will have
+ to write this event again).
+ */
+ /*
+ We are the only thread which reads/writes description_event_for_queue.
+ The relay_log struct does not move (though some members of it can
+ change), so we needn't any lock (no rli->data_lock, no log lock).
+ */
+ Format_description_log_event* tmp;
+ const char* errmsg;
+ if (!(tmp= (Format_description_log_event*)
+ Log_event::read_log_event(buf, event_len, &errmsg,
+ mi->rli.relay_log.description_event_for_queue)))
+ {
+ error= 2;
+ goto err;
+ }
+ delete mi->rli.relay_log.description_event_for_queue;
+ mi->rli.relay_log.description_event_for_queue= tmp;
+ /*
+ Though this does some conversion to the slave's format, this will
+ preserve the master's binlog format version, and number of event types.
+ */
+ /*
+ If the event was not requested by the slave (the slave did not ask for
+ it), i.e. has end_log_pos=0, we do not increment mi->master_log_pos
+ */
+ inc_pos= uint4korr(buf+LOG_POS_OFFSET) ? event_len : 0;
+ DBUG_PRINT("info",("binlog format is now %d",
+ mi->rli.relay_log.description_event_for_queue->binlog_version));
+
+ }
+ break;
default:
inc_pos= event_len;
break;
@@ -3886,23 +4363,32 @@ int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len)
We still want to increment, so that we won't re-read this event from the
master if the slave IO thread is now stopped/restarted (more efficient if
the events we are ignoring are big LOAD DATA INFILE).
+ But events which were generated by this slave and which do not exist in
+ the master's binlog (i.e. Format_desc, Rotate & Stop) should not increment
+ mi->master_log_pos.
*/
- mi->master_log_pos+= inc_pos;
+ if (buf[EVENT_TYPE_OFFSET]!=FORMAT_DESCRIPTION_EVENT &&
+ buf[EVENT_TYPE_OFFSET]!=ROTATE_EVENT &&
+ buf[EVENT_TYPE_OFFSET]!=STOP_EVENT)
+ mi->master_log_pos+= inc_pos;
DBUG_PRINT("info", ("master_log_pos: %d, event originating from the same server, ignored", (ulong) mi->master_log_pos));
}
else
{
/* write the event to the relay log */
- if (likely(!(error= rli->relay_log.appendv(buf,event_len,0))))
+ if (likely(!(rli->relay_log.appendv(buf,event_len,0))))
{
mi->master_log_pos+= inc_pos;
DBUG_PRINT("info", ("master_log_pos: %d", (ulong) mi->master_log_pos));
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
}
+ else
+ error=3;
}
err:
pthread_mutex_unlock(&mi->data_lock);
+ DBUG_PRINT("info", ("error=%d", error));
DBUG_RETURN(error);
}
@@ -3927,6 +4413,7 @@ void end_relay_log_info(RELAY_LOG_INFO* rli)
}
rli->inited = 0;
rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
+ rli->relay_log.harvest_bytes_written(&rli->log_space_total);
/*
Delete the slave's temporary tables from memory.
In the future there will be other actions than this, to ensure persistance
@@ -4055,6 +4542,7 @@ replication resumed in log '%s' at position %s", mi->user,
thd->set_active_vio(mysql->net.vio);
#endif
}
+ mysql->reconnect= 1;
DBUG_PRINT("exit",("slave_was_killed: %d", slave_was_killed));
DBUG_RETURN(slave_was_killed);
}
@@ -4148,6 +4636,7 @@ static IO_CACHE *reopen_relay_log(RELAY_LOG_INFO *rli, const char **errmsg)
relay_log_pos Current log pos
pending Number of bytes already processed from the event
*/
+ rli->event_relay_log_pos= max(rli->event_relay_log_pos, BIN_LOG_HEADER_SIZE);
my_b_seek(cur_log,rli->event_relay_log_pos);
DBUG_RETURN(cur_log);
}
@@ -4206,28 +4695,40 @@ Log_event* next_event(RELAY_LOG_INFO* rli)
hot_log=0; // Using old binary log
}
}
+
#ifndef DBUG_OFF
{
+ /* This is an assertion which sometimes fails, let's try to track it */
char llbuf1[22], llbuf2[22];
+ DBUG_PRINT("info", ("my_b_tell(cur_log)=%s rli->event_relay_log_pos=%s",
+ llstr(my_b_tell(cur_log),llbuf1),
+ llstr(rli->event_relay_log_pos,llbuf2)));
DBUG_ASSERT(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE);
- /*
- The next assertion sometimes (very rarely) fails, let's try to track
- it
- */
- DBUG_PRINT("info", ("\
-Before assert, my_b_tell(cur_log)=%s rli->event_relay_log_pos=%s",
- llstr(my_b_tell(cur_log),llbuf1),
- llstr(rli->group_relay_log_pos,llbuf2)));
- DBUG_ASSERT(my_b_tell(cur_log) == rli->event_relay_log_pos);
+ DBUG_ASSERT(my_b_tell(cur_log) == rli->event_relay_log_pos);
}
#endif
/*
Relay log is always in new format - if the master is 3.23, the
- I/O thread will convert the format for us
+ I/O thread will convert the format for us.
+ A problem: the description event may be in a previous relay log. So if
+ the slave has been shutdown meanwhile, we would have to look in old relay
+ logs, which may even have been deleted. So we need to write this
+ description event at the beginning of the relay log.
+ When the relay log is created when the I/O thread starts, easy: the
+ master will send the description event and we will queue it.
+ But if the relay log is created by new_file(): then the solution is:
+ MYSQL_LOG::open() will write the buffered description event.
*/
- if ((ev=Log_event::read_log_event(cur_log,0,(bool)0 /* new format */)))
+ if ((ev=Log_event::read_log_event(cur_log,0,
+ rli->relay_log.description_event_for_exec)))
+
{
DBUG_ASSERT(thd==rli->sql_thd);
+ /*
+ read it while we have a lock, to avoid a mutex lock in
+ inc_event_relay_log_pos()
+ */
+ rli->future_event_relay_log_pos= my_b_tell(cur_log);
if (hot_log)
pthread_mutex_unlock(log_lock);
DBUG_RETURN(ev);
@@ -4384,8 +4885,8 @@ Before assert, my_b_tell(cur_log)=%s rli->event_relay_log_pos=%s",
{
#ifdef EXTRA_DEBUG
if (global_system_variables.log_warnings)
- sql_print_error("next log '%s' is currently active",
- rli->linfo.log_file_name);
+ sql_print_information("next log '%s' is currently active",
+ rli->linfo.log_file_name);
#endif
rli->cur_log= cur_log= rli->relay_log.get_log_file();
rli->cur_log_old_open_count= rli->relay_log.get_open_count();
@@ -4414,8 +4915,8 @@ Before assert, my_b_tell(cur_log)=%s rli->event_relay_log_pos=%s",
*/
#ifdef EXTRA_DEBUG
if (global_system_variables.log_warnings)
- sql_print_error("next log '%s' is not active",
- rli->linfo.log_file_name);
+ sql_print_information("next log '%s' is not active",
+ rli->linfo.log_file_name);
#endif
// open_binlog() will check the magic header
if ((rli->cur_log_fd=open_binlog(cur_log,rli->linfo.log_file_name,
@@ -4441,7 +4942,11 @@ event(errno: %d cur_log->error: %d)",
}
}
if (!errmsg && global_system_variables.log_warnings)
- errmsg = "slave SQL thread was killed";
+ {
+ sql_print_information("Error reading relay log event: %s",
+ "slave SQL thread was killed");
+ DBUG_RETURN(0);
+ }
err:
if (errmsg)
@@ -4461,8 +4966,9 @@ void rotate_relay_log(MASTER_INFO* mi)
DBUG_ENTER("rotate_relay_log");
RELAY_LOG_INFO* rli= &mi->rli;
- lock_slave_threads(mi);
- pthread_mutex_lock(&rli->data_lock);
+ /* We don't lock rli->run_lock. This would lead to deadlocks. */
+ pthread_mutex_lock(&mi->run_lock);
+
/*
We need to test inited because otherwise, new_file() will attempt to lock
LOCK_log, which may not be inited (if we're not a slave).
@@ -4491,8 +4997,7 @@ void rotate_relay_log(MASTER_INFO* mi)
*/
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
end:
- pthread_mutex_unlock(&rli->data_lock);
- unlock_slave_threads(mi);
+ pthread_mutex_unlock(&mi->run_lock);
DBUG_VOID_RETURN;
}