diff options
Diffstat (limited to 'sql/slave.cc')
-rw-r--r-- | sql/slave.cc | 1081 |
1 files changed, 793 insertions, 288 deletions
diff --git a/sql/slave.cc b/sql/slave.cc index 605f8289946..70803c88c3a 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -1,15 +1,15 @@ /* Copyright (C) 2000-2003 MySQL AB - + This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. - + This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. - + You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ @@ -75,7 +75,6 @@ static int create_table_from_dump(THD* thd, MYSQL *mysql, const char* db, const char* table_name, bool overwrite); static int get_master_version_and_clock(MYSQL* mysql, MASTER_INFO* mi); - /* Find out which replications threads are running @@ -160,7 +159,7 @@ int init_slave() sql_print_error("Failed to allocate memory for the master info structure"); goto err; } - + if (init_master_info(active_mi,master_info_file,relay_log_info_file, !master_host, (SLAVE_IO | SLAVE_SQL))) { @@ -219,6 +218,13 @@ static byte* get_table_key(TABLE_RULE_ENT* e, uint* len, pos Position in relay log file need_data_lock Set to 1 if this functions should do mutex locks errmsg Store pointer to error message here + look_for_description_event + 1 if we should look for such an event. We only need + this when the SQL thread starts and opens an existing + relay log and has to execute it (possibly from an + offset >4); then we need to read the first event of + the relay log to be able to parse the events we have + to execute. DESCRIPTION - Close old open relay log files. 
@@ -236,15 +242,35 @@ static byte* get_table_key(TABLE_RULE_ENT* e, uint* len, int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log, ulonglong pos, bool need_data_lock, - const char** errmsg) + const char** errmsg, + bool look_for_description_event) { DBUG_ENTER("init_relay_log_pos"); + DBUG_PRINT("info", ("pos=%lu", pos)); *errmsg=0; pthread_mutex_t *log_lock=rli->relay_log.get_log_lock(); if (need_data_lock) pthread_mutex_lock(&rli->data_lock); + + /* + Slave threads are not the only users of init_relay_log_pos(). CHANGE MASTER + is, too, and init_slave() too; these 2 functions allocate a description + event in init_relay_log_pos, which is not freed by the terminating SQL slave + thread as that thread is not started by these functions. So we have to free + the description_event here, in case, so that there is no memory leak in + running, say, CHANGE MASTER. + */ + delete rli->relay_log.description_event_for_exec; + /* + By default the relay log is in binlog format 3 (4.0). + Even if format is 4, this will work enough to read the first event + (Format_desc) (remember that format 4 is just lenghtened compared to format + 3; format 3 is a prefix of format 4). + */ + rli->relay_log.description_event_for_exec= new + Format_description_log_event(3); pthread_mutex_lock(log_lock); @@ -284,9 +310,8 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log, In this case, we will use the same IO_CACHE pointer to read data as the IO thread is using to write data. 
*/ - rli->cur_log= rli->relay_log.get_log_file(); - if (my_b_tell(rli->cur_log) == 0 && - check_binlog_magic(rli->cur_log, errmsg)) + my_b_seek((rli->cur_log=rli->relay_log.get_log_file()), (off_t)0); + if (check_binlog_magic(rli->cur_log,errmsg)) goto err; rli->cur_log_old_open_count=rli->relay_log.get_open_count(); } @@ -300,8 +325,85 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log, goto err; rli->cur_log = &rli->cache_buf; } - if (pos >= BIN_LOG_HEADER_SIZE) + /* + In all cases, check_binlog_magic() has been called so we're at offset 4 for + sure. + */ + if (pos > BIN_LOG_HEADER_SIZE) /* If pos<=4, we stay at 4 */ + { + Log_event* ev; + while (look_for_description_event) + { + /* + Read the possible Format_description_log_event; if position + was 4, no need, it will be read naturally. + */ + DBUG_PRINT("info",("looking for a Format_description_log_event")); + + if (my_b_tell(rli->cur_log) >= pos) + break; + + /* + Because of we have rli->data_lock and log_lock, we can safely read an + event + */ + if (!(ev=Log_event::read_log_event(rli->cur_log,0, + rli->relay_log.description_event_for_exec))) + { + DBUG_PRINT("info",("could not read event, rli->cur_log->error=%d", + rli->cur_log->error)); + if (rli->cur_log->error) /* not EOF */ + { + *errmsg= "I/O error reading event at position 4"; + goto err; + } + break; + } + else if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT) + { + DBUG_PRINT("info",("found Format_description_log_event")); + delete rli->relay_log.description_event_for_exec; + rli->relay_log.description_event_for_exec= (Format_description_log_event*) ev; + /* + As ev was returned by read_log_event, it has passed is_valid(), so + my_malloc() in ctor worked, no need to check again. + */ + /* + Ok, we found a Format_description event. 
But it is not sure that this + describes the whole relay log; indeed, one can have this sequence + (starting from position 4): + Format_desc (of slave) + Rotate (of master) + Format_desc (of master) + So the Format_desc which really describes the rest of the relay log + is the 3rd event (it can't be further than that, because we rotate + the relay log when we queue a Rotate event from the master). + But what describes the Rotate is the first Format_desc. + So what we do is: + go on searching for Format_description events, until you exceed the + position (argument 'pos') or until you find another event than Rotate + or Format_desc. + */ + } + else + { + DBUG_PRINT("info",("found event of another type=%d", + ev->get_type_code())); + look_for_description_event= (ev->get_type_code() == ROTATE_EVENT); + delete ev; + } + } my_b_seek(rli->cur_log,(off_t)pos); +#ifndef DBUG_OFF + { + char llbuf1[22], llbuf2[22]; + DBUG_PRINT("info", ("my_b_tell(rli->cur_log)=%s rli->event_relay_log_pos=%s", + llstr(my_b_tell(rli->cur_log),llbuf1), + llstr(rli->event_relay_log_pos,llbuf2))); + } +#endif + + } err: /* @@ -316,13 +418,15 @@ err: if (need_data_lock) pthread_mutex_unlock(&rli->data_lock); + if (!rli->relay_log.description_event_for_exec->is_valid() && !*errmsg) + *errmsg= "Invalid Format_description log event; could be out of memory"; DBUG_RETURN ((*errmsg) ? 
1 : 0); } /* - Init functio to set up array for errors that should be skipped for slave + Init function to set up array for errors that should be skipped for slave SYNOPSIS init_slave_skip_errors() @@ -361,16 +465,15 @@ void init_slave_skip_errors(const char* arg) } -void st_relay_log_info::inc_group_relay_log_pos(ulonglong val, - ulonglong log_pos, - bool skip_lock) +void st_relay_log_info::inc_group_relay_log_pos(ulonglong log_pos, + bool skip_lock) { if (!skip_lock) pthread_mutex_lock(&data_lock); - inc_event_relay_log_pos(val); + inc_event_relay_log_pos(); group_relay_log_pos= event_relay_log_pos; strmake(group_relay_log_name,event_relay_log_name, - sizeof(group_relay_log_name)-1); + sizeof(group_relay_log_name)-1); notify_group_relay_log_name_update(); @@ -384,24 +487,31 @@ void st_relay_log_info::inc_group_relay_log_pos(ulonglong val, not advance as it should on the non-transactional slave (it advances by big leaps, whereas it should advance by small leaps). */ + /* + In 4.x we used the event's len to compute the positions here. This is + wrong if the event was 3.23/4.0 and has been converted to 5.0, because + then the event's len is not what is was in the master's binlog, so this + will make a wrong group_master_log_pos (yes it's a bug in 3.23->4.0 + replication: Exec_master_log_pos is wrong). Only way to solve this is to + have the original offset of the end of the event the relay log. This is + what we do in 5.0: log_pos has become "end_log_pos" (because the real use + of log_pos in 4.0 was to compute the end_log_pos; so better to store + end_log_pos instead of begin_log_pos. + If we had not done this fix here, the problem would also have appeared + when the slave and master are 5.0 but with different event length (for + example the slave is more recent than the master and features the event + UID). 
It would give false MASTER_POS_WAIT, false Exec_master_log_pos in + SHOW SLAVE STATUS, and so the user would do some CHANGE MASTER using this + value which would lead to badly broken replication. + Even the relay_log_pos will be corrupted in this case, because the len is + the relay log is not "val". + With the end_log_pos solution, we avoid computations involving lengthes. + */ + DBUG_PRINT("info", ("log_pos: %lu group_master_log_pos: %lu", + (long) log_pos, (long) group_master_log_pos)); if (log_pos) // 3.23 binlogs don't have log_posx { -#if MYSQL_VERSION_ID < 50000 - /* - If the event was converted from a 3.23 format, get_event_len() has - grown by 6 bytes (at least for most events, except LOAD DATA INFILE - which is already a big problem for 3.23->4.0 replication); 6 bytes is - the difference between the header's size in 4.0 (LOG_EVENT_HEADER_LEN) - and the header's size in 3.23 (OLD_HEADER_LEN). Note that using - mi->old_format will not help if the I/O thread has not started yet. - Yes this is a hack but it's just to make 3.23->4.x replication work; - 3.23->5.0 replication is working much better. - */ - group_master_log_pos= log_pos + val - - (mi->old_format ? (LOG_EVENT_HEADER_LEN - OLD_HEADER_LEN) : 0); -#else - group_master_log_pos= log_pos+ val; -#endif /* MYSQL_VERSION_ID < 5000 */ + group_master_log_pos= log_pos; } pthread_cond_broadcast(&data_cond); if (!skip_lock) @@ -442,9 +552,9 @@ int purge_relay_logs(RELAY_LOG_INFO* rli, THD *thd, bool just_reset, /* Even if rli->inited==0, we still try to empty rli->master_log_* variables. Indeed, rli->inited==0 does not imply that they already are empty. 
- It could be that slave's info initialization partly succeeded : + It could be that slave's info initialization partly succeeded : for example if relay-log.info existed but *relay-bin*.* - have been manually removed, init_relay_log_info reads the old + have been manually removed, init_relay_log_info reads the old relay-log.info and fills rli->master_log_*, then init_relay_log_info checks for the existence of the relay log, this fails and init_relay_log_info leaves rli->inited to 0. @@ -453,7 +563,7 @@ int purge_relay_logs(RELAY_LOG_INFO* rli, THD *thd, bool just_reset, MASTER, the callers of purge_relay_logs, will delete bogus *.info files or replace them with correct files), however if the user does SHOW SLAVE STATUS before START SLAVE, he will see old, confusing rli->master_log_*. - In other words, we reinit rli->master_log_* for SHOW SLAVE STATUS + In other words, we reinit rli->master_log_* for SHOW SLAVE STATUS to display fine in any case. */ @@ -482,13 +592,16 @@ int purge_relay_logs(RELAY_LOG_INFO* rli, THD *thd, bool just_reset, sizeof(rli->group_relay_log_name)-1); strmake(rli->event_relay_log_name, rli->relay_log.get_log_fname(), sizeof(rli->event_relay_log_name)-1); - // Just first log with magic number and nothing else - rli->log_space_total= BIN_LOG_HEADER_SIZE; rli->group_relay_log_pos= rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE; - rli->relay_log.reset_bytes_written(); + if (count_relay_log_space(rli)) + { + *errmsg= "Error counting relay log space"; + goto err; + } if (!just_reset) - error= init_relay_log_pos(rli, rli->group_relay_log_name, rli->group_relay_log_pos, - 0 /* do not need data lock */, errmsg); + error= init_relay_log_pos(rli, rli->group_relay_log_name, + rli->group_relay_log_pos, + 0 /* do not need data lock */, errmsg, 0); err: #ifndef DBUG_OFF @@ -548,24 +661,26 @@ int terminate_slave_thread(THD* thd, pthread_mutex_t* term_lock, pthread_cond_t* term_cond, volatile uint *slave_running) { + DBUG_ENTER("terminate_slave_thread"); if 
(term_lock) { pthread_mutex_lock(term_lock); if (!*slave_running) { pthread_mutex_unlock(term_lock); - return ER_SLAVE_NOT_RUNNING; + DBUG_RETURN(ER_SLAVE_NOT_RUNNING); } } DBUG_ASSERT(thd != 0); + THD_CHECK_SENTRY(thd); /* - Is is criticate to test if the slave is running. Otherwise, we might + Is is critical to test if the slave is running. Otherwise, we might be referening freed memory trying to kick it */ - THD_CHECK_SENTRY(thd); while (*slave_running) // Should always be true { + DBUG_PRINT("loop", ("killing slave thread")); KICK_SLAVE(thd); /* There is a small chance that slave thread might miss the first @@ -577,7 +692,7 @@ int terminate_slave_thread(THD* thd, pthread_mutex_t* term_lock, } if (term_lock) pthread_mutex_unlock(term_lock); - return 0; + DBUG_RETURN(0); } @@ -636,7 +751,7 @@ int start_slave_thread(pthread_handler h_func, pthread_mutex_t *start_lock, thd->exit_cond(old_msg); pthread_mutex_lock(cond_lock); // re-acquire it as exit_cond() released if (thd->killed) - DBUG_RETURN(ER_SERVER_SHUTDOWN); + DBUG_RETURN(thd->killed_errno()); } } if (start_lock) @@ -737,7 +852,7 @@ static TABLE_RULE_ENT* find_wild(DYNAMIC_ARRAY *a, const char* key, int len) SYNOPSIS tables_ok() - thd thread (SQL slave thread normally) + thd thread (SQL slave thread normally). Mustn't be null. tables list of tables to check NOTES @@ -755,6 +870,11 @@ static TABLE_RULE_ENT* find_wild(DYNAMIC_ARRAY *a, const char* key, int len) second call will make the decision (because all_tables_not_ok() = !tables_ok(1st_list) && !tables_ok(2nd_list)). + Thought which arose from a question of a big customer "I want to include + all tables like "abc.%" except the "%.EFG"". This can't be done now. If we + supported Perl regexps we could do it with this pattern: /^abc\.(?!EFG)/ + (I could not find an equivalent in the regex library MySQL uses). 
+ RETURN VALUES 0 should not be logged/replicated 1 should be logged/replicated @@ -765,7 +885,24 @@ bool tables_ok(THD* thd, TABLE_LIST* tables) bool some_tables_updating= 0; DBUG_ENTER("tables_ok"); - for (; tables; tables = tables->next) + /* + In routine, can't reliably pick and choose substatements, so always + replicate. + We can't reliably know if one substatement should be executed or not: + consider the case of this substatement: a SELECT on a non-replicated + constant table; if we don't execute it maybe it was going to fill a + variable which was going to be used by the next substatement to update + a replicated table? If we execute it maybe the constant non-replicated + table does not exist (and so we'll fail) while there was no need to + execute this as this SELECT does not influence replicated tables in the + rest of the routine? In other words: users are used to replicate-*-table + specifying how to handle updates to tables, these options don't say + anything about reads to tables; we can't guess. + */ + if (thd->spcont) + DBUG_RETURN(1); + + for (; tables; tables= tables->next_global) { char hash_key[2*NAME_LEN+2]; char *end; @@ -776,7 +913,7 @@ bool tables_ok(THD* thd, TABLE_LIST* tables) some_tables_updating= 1; end= strmov(hash_key, tables->db ? tables->db : thd->db); *end++= '.'; - len= (uint) (strmov(end, tables->real_name) - hash_key); + len= (uint) (strmov(end, tables->table_name) - hash_key); if (do_table_inited) // if there are any do's { if (hash_search(&replicate_do_table, (byte*) hash_key, len)) @@ -1164,29 +1301,86 @@ static int init_intvar_from_file(int* var, IO_CACHE* f, int default_val) return 1; } +/* + Note that we rely on the master's version (3.23, 4.0.14 etc) instead of + relying on the binlog's version. This is not perfect: imagine an upgrade + of the master without waiting that all slaves are in sync with the master; + then a slave could be fooled about the binlog's format. 
This is what happens + when people upgrade a 3.23 master to 4.0 without doing RESET MASTER: 4.0 + slaves are fooled. So we do this only to distinguish between 3.23 and more + recent masters (it's too late to change things for 3.23). + + RETURNS + 0 ok + 1 error +*/ static int get_master_version_and_clock(MYSQL* mysql, MASTER_INFO* mi) { const char* errmsg= 0; - + /* - Note the following switch will bug when we have MySQL branch 30 ;) + Free old description_event_for_queue (that is needed if we are in + a reconnection). */ - switch (*mysql->server_version) { - case '3': - mi->old_format = - (strncmp(mysql->server_version, "3.23.57", 7) < 0) /* < .57 */ ? - BINLOG_FORMAT_323_LESS_57 : - BINLOG_FORMAT_323_GEQ_57 ; - break; - case '4': - mi->old_format = BINLOG_FORMAT_CURRENT; - break; - default: - /* 5.0 is not supported */ - errmsg = "Master reported an unrecognized MySQL version. Note that 4.1 \ -slaves can't replicate a 5.0 or newer master."; - break; + delete mi->rli.relay_log.description_event_for_queue; + mi->rli.relay_log.description_event_for_queue= 0; + + if (!my_isdigit(&my_charset_bin,*mysql->server_version)) + errmsg = "Master reported unrecognized MySQL version"; + else + { + /* + Note the following switch will bug when we have MySQL branch 30 ;) + */ + switch (*mysql->server_version) + { + case '0': + case '1': + case '2': + errmsg = "Master reported unrecognized MySQL version"; + break; + case '3': + mi->rli.relay_log.description_event_for_queue= new + Format_description_log_event(1, mysql->server_version); + break; + case '4': + mi->rli.relay_log.description_event_for_queue= new + Format_description_log_event(3, mysql->server_version); + break; + default: + /* + Master is MySQL >=5.0. Give a default Format_desc event, so that we can + take the early steps (like tests for "is this a 3.23 master") which we + have to take before we receive the real master's Format_desc which will + override this one. 
Note that the Format_desc we create below is garbage + (it has the format of the *slave*); it's only good to help know if the + master is 3.23, 4.0, etc. + */ + mi->rli.relay_log.description_event_for_queue= new + Format_description_log_event(4, mysql->server_version); + break; + } + } + + /* + This does not mean that a 5.0 slave will be able to read a 6.0 master; but + as we don't know yet, we don't want to forbid this for now. If a 5.0 slave + can't read a 6.0 master, this will show up when the slave can't read some + events sent by the master, and there will be error messages. + */ + + if (errmsg) + { + sql_print_error(errmsg); + return 1; + } + + /* as we are here, we tried to allocate the event */ + if (!mi->rli.relay_log.description_event_for_queue) + { + sql_print_error("Slave I/O thread failed to create a default Format_description_log_event"); + return 1; } /* @@ -1244,12 +1438,20 @@ not always make sense; please check the manual before using it)."; values of these 2 are never used (new connections don't use them). We don't test equality of global collation_database either as it's is going to be deprecated (made read-only) in 4.1 very soon. - We don't do it for <3.23.57 because masters <3.23.50 hang on - SELECT @@unknown_var (BUG#7965 - see changelog of 3.23.50). + The test is only relevant if master < 5.0.3 (we'll test only if it's older + than the 5 branch; < 5.0.3 was alpha...), as >= 5.0.3 master stores + charset info in each binlog event. + We don't do it for 3.23 because masters <3.23.50 hang on + SELECT @@unknown_var (BUG#7965 - see changelog of 3.23.50). So finally we + test only if master is 4.x. 
*/ - if (mi->old_format == BINLOG_FORMAT_323_LESS_57) + + /* redundant with rest of code but safer against later additions */ + if (*mysql->server_version == '3') goto err; - if (!mysql_real_query(mysql, "SELECT @@GLOBAL.COLLATION_SERVER", 32) && + + if ((*mysql->server_version == '4') && + !mysql_real_query(mysql, "SELECT @@GLOBAL.COLLATION_SERVER", 32) && (master_res= mysql_store_result(mysql))) { if ((master_row= mysql_fetch_row(master_res)) && @@ -1272,8 +1474,11 @@ be equal for replication to work"; such check will broke everything for them. (And now everything will work for them because by default both their master and slave will have 'SYSTEM' time zone). + This check is only necessary for 4.x masters (and < 5.0.4 masters but + those were alpha). */ - if (!mysql_real_query(mysql, "SELECT @@GLOBAL.TIME_ZONE", 25) && + if ((*mysql->server_version == '4') && + !mysql_real_query(mysql, "SELECT @@GLOBAL.TIME_ZONE", 25) && (master_res= mysql_store_result(mysql))) { if ((master_row= mysql_fetch_row(master_res)) && @@ -1324,7 +1529,7 @@ static int create_table_from_dump(THD* thd, MYSQL *mysql, const char* db, packet_len= my_net_read(net); // read create table statement if (packet_len == packet_error) { - send_error(thd, ER_MASTER_NET_READ); + my_message(ER_MASTER_NET_READ, ER(ER_MASTER_NET_READ), MYF(0)); DBUG_RETURN(1); } if (net->read_pos[0] == 255) // error from master @@ -1333,7 +1538,7 @@ static int create_table_from_dump(THD* thd, MYSQL *mysql, const char* db, err_msg= (char*) net->read_pos + ((mysql->server_capabilities & CLIENT_PROTOCOL_41) ? 
3+SQLSTATE_LENGTH+1 : 3); - net_printf(thd, ER_MASTER, err_msg); + my_error(ER_MASTER, MYF(0), err_msg); DBUG_RETURN(1); } thd->command = COM_TABLE_DUMP; @@ -1342,7 +1547,7 @@ static int create_table_from_dump(THD* thd, MYSQL *mysql, const char* db, if (!(query = thd->strmake((char*) net->read_pos, packet_len))) { sql_print_error("create_table_from_dump: out of memory"); - net_printf(thd, ER_GET_ERRNO, "Out of memory"); + my_message(ER_GET_ERRNO, "Out of memory", MYF(0)); DBUG_RETURN(1); } thd->query= query; @@ -1351,12 +1556,11 @@ static int create_table_from_dump(THD* thd, MYSQL *mysql, const char* db, bzero((char*) &tables,sizeof(tables)); tables.db = (char*)db; - tables.alias= tables.real_name= (char*)table_name; + tables.alias= tables.table_name= (char*)table_name; /* Drop the table if 'overwrite' is true */ if (overwrite && mysql_rm_table(thd,&tables,1,0)) /* drop if exists */ { - send_error(thd); sql_print_error("create_table_from_dump: failed to drop the table"); goto err; } @@ -1369,7 +1573,7 @@ static int create_table_from_dump(THD* thd, MYSQL *mysql, const char* db, save_db = thd->db; save_db_length= thd->db_length; thd->db = (char*)db; - DBUG_ASSERT(thd->db); + DBUG_ASSERT(thd->db != 0); thd->db_length= strlen(thd->db); mysql_parse(thd, thd->query, packet_len); // run create table thd->db = save_db; // leave things the way the were before @@ -1383,7 +1587,6 @@ static int create_table_from_dump(THD* thd, MYSQL *mysql, const char* db, tables.lock_type = TL_WRITE; if (!open_ltable(thd, &tables, TL_WRITE)) { - send_error(thd,0,0); // Send error from open_ltable sql_print_error("create_table_from_dump: could not open created table"); goto err; } @@ -1393,7 +1596,7 @@ static int create_table_from_dump(THD* thd, MYSQL *mysql, const char* db, /* Copy the data file */ if (file->net_read_dump(net)) { - net_printf(thd, ER_MASTER_NET_READ); + my_message(ER_MASTER_NET_READ, ER(ER_MASTER_NET_READ), MYF(0)); sql_print_error("create_table_from_dump: failed in\ 
handler::net_read_dump()"); goto err; @@ -1413,7 +1616,7 @@ static int create_table_from_dump(THD* thd, MYSQL *mysql, const char* db, error=file->repair(thd,&check_opt) != 0; thd->net.vio = save_vio; if (error) - net_printf(thd, ER_INDEX_REBUILD,tables.table->real_name); + my_error(ER_INDEX_REBUILD, MYF(0), tables.table->s->table_name); err: close_thread_tables(thd); @@ -1436,12 +1639,11 @@ int fetch_master_table(THD *thd, const char *db_name, const char *table_name, { if (!(mysql = mysql_init(NULL))) { - send_error(thd); // EOM DBUG_RETURN(1); } if (connect_to_master(thd, mysql, mi)) { - net_printf(thd, ER_CONNECT_TO_MASTER, mysql_error(mysql)); + my_error(ER_CONNECT_TO_MASTER, MYF(0), mysql_error(mysql)); mysql_close(mysql); DBUG_RETURN(1); } @@ -1465,7 +1667,7 @@ int fetch_master_table(THD *thd, const char *db_name, const char *table_name, if (!called_connected) mysql_close(mysql); if (errmsg && thd->vio_ok()) - send_error(thd, error, errmsg); + my_message(error, errmsg, MYF(0)); DBUG_RETURN(test(error)); // Return 1 on error } @@ -1489,7 +1691,8 @@ void end_master_info(MASTER_INFO* mi) } -int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname) +static int init_relay_log_info(RELAY_LOG_INFO* rli, + const char* info_fname) { char fname[FN_REFLEN+128]; int info_fd; @@ -1497,7 +1700,7 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname) int error = 0; DBUG_ENTER("init_relay_log_info"); - if (rli->inited) // Set if this function called + if (rli->inited) // Set if this function called DBUG_RETURN(0); fn_format(fname, info_fname, mysql_data_home, "", 4+32); pthread_mutex_lock(&rli->data_lock); @@ -1508,23 +1711,10 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname) rli->log_space_limit= relay_log_space_limit; rli->log_space_total= 0; - // TODO: make this work with multi-master - if (!opt_relay_logname) - { - char tmp[FN_REFLEN]; - /* - TODO: The following should be using fn_format(); We just need to - first change 
fn_format() to cut the file name if it's too long. - */ - strmake(tmp,glob_hostname,FN_REFLEN-5); - strmov(strcend(tmp,'.'),"-relay-bin"); - opt_relay_logname=my_strdup(tmp,MYF(MY_WME)); - } - /* The relay log will now be opened, as a SEQ_READ_APPEND IO_CACHE. - Note that the I/O thread flushes it to disk after writing every event, in - flush_master_info(mi, 1). + Note that the I/O thread flushes it to disk after writing every + event, in flush_master_info(mi, 1). */ /* @@ -1536,16 +1726,25 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname) switch to using max_binlog_size for the relay log) and update rli->relay_log.max_size (and mysql_bin_log.max_size). */ - - if (open_log(&rli->relay_log, glob_hostname, opt_relay_logname, - "-relay-bin", opt_relaylog_index_name, - LOG_BIN, 1 /* read_append cache */, - 1 /* no auto events */, - max_relay_log_size ? max_relay_log_size : max_binlog_size)) { - pthread_mutex_unlock(&rli->data_lock); - sql_print_error("Failed in open_log() called from init_relay_log_info()"); - DBUG_RETURN(1); + char buf[FN_REFLEN]; + const char *ln; + ln= rli->relay_log.generate_name(opt_relay_logname, "-relay-bin", + 1, buf); + + /* + note, that if open() fails, we'll still have index file open + but a destructor will take care of that + */ + if (rli->relay_log.open_index_file(opt_relaylog_index_name, ln) || + rli->relay_log.open(ln, LOG_BIN, 0, SEQ_READ_APPEND, 0, + (max_relay_log_size ? 
max_relay_log_size : + max_binlog_size), 1)) + { + pthread_mutex_unlock(&rli->data_lock); + sql_print_error("Failed in open_log() called from init_relay_log_info()"); + DBUG_RETURN(1); + } } /* if file does not exist */ @@ -1575,7 +1774,7 @@ file '%s', errno %d)", fname, my_errno); /* Init relay log with first entry in the relay index file */ if (init_relay_log_pos(rli,NullS,BIN_LOG_HEADER_SIZE,0 /* no data lock */, - &msg)) + &msg, 0)) { sql_print_error("Failed to open the relay log 'FIRST' (relay_log_pos 4)"); goto err; @@ -1640,7 +1839,7 @@ Failed to open the existing relay log info file '%s' (errno %d)", rli->group_relay_log_name, rli->group_relay_log_pos, 0 /* no data lock*/, - &msg)) + &msg, 0)) { char llbuf[22]; sql_print_error("Failed to open the relay log '%s' (relay_log_pos %s)", @@ -1649,8 +1848,18 @@ Failed to open the existing relay log info file '%s' (errno %d)", goto err; } } - DBUG_ASSERT(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE); - DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->event_relay_log_pos); + +#ifndef DBUG_OFF + { + char llbuf1[22], llbuf2[22]; + DBUG_PRINT("info", ("my_b_tell(rli->cur_log)=%s rli->event_relay_log_pos=%s", + llstr(my_b_tell(rli->cur_log),llbuf1), + llstr(rli->event_relay_log_pos,llbuf2))); + DBUG_ASSERT(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE); + DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->event_relay_log_pos); + } +#endif + /* Now change the cache from READ to WRITE - must do this before flush_relay_log_info @@ -1798,9 +2007,9 @@ void clear_until_condition(RELAY_LOG_INFO* rli) int init_master_info(MASTER_INFO* mi, const char* master_info_fname, - const char* slave_info_fname, - bool abort_if_no_master_info_file, - int thread_mask) + const char* slave_info_fname, + bool abort_if_no_master_info_file, + int thread_mask) { int fd,error; char fname[FN_REFLEN+128]; @@ -1814,7 +2023,7 @@ int init_master_info(MASTER_INFO* mi, const char* master_info_fname, last time. 
If this case pos_in_file would be set and we would get a crash when trying to read the signature for the binary relay log. - + We only rewind the read position if we are starting the SQL thread. The handle_slave_sql thread assumes that the read position is at the beginning of the file, and will read the @@ -1840,7 +2049,7 @@ int init_master_info(MASTER_INFO* mi, const char* master_info_fname, fd = mi->fd; /* does master.info exist ? */ - + if (access(fname,F_OK)) { if (abort_if_no_master_info_file) @@ -1876,7 +2085,7 @@ file '%s')", fname); { if (fd >= 0) reinit_io_cache(&mi->file, READ_CACHE, 0L,0,0); - else + else { if ((fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0 ) { @@ -1896,52 +2105,52 @@ file '%s')", fname); mi->fd = fd; int port, connect_retry, master_log_pos, ssl= 0, lines; char *first_non_digit; - + /* Starting from 4.1.x master.info has new format. Now its - first line contains number of lines in file. By reading this - number we will be always distinguish to which version our - master.info corresponds to. We can't simply count lines in + first line contains number of lines in file. By reading this + number we will be always distinguish to which version our + master.info corresponds to. We can't simply count lines in file since versions before 4.1.x could generate files with more lines than needed. - If first line doesn't contain a number or contain number less than + If first line doesn't contain a number or contain number less than 14 then such file is treated like file from pre 4.1.1 version. - There is no ambiguity when reading an old master.info, as before + There is no ambiguity when reading an old master.info, as before 4.1.1, the first line contained the binlog's name, which is either - empty or has an extension (contains a '.'), so can't be confused + empty or has an extension (contains a '.'), so can't be confused with an integer. 
- So we're just reading first line and trying to figure which version + So we're just reading first line and trying to figure which version is this. */ - - /* - The first row is temporarily stored in mi->master_log_name, - if it is line count and not binlog name (new format) it will be + + /* + The first row is temporarily stored in mi->master_log_name, + if it is line count and not binlog name (new format) it will be overwritten by the second row later. */ if (init_strvar_from_file(mi->master_log_name, sizeof(mi->master_log_name), &mi->file, "")) goto errwithmsg; - + lines= strtoul(mi->master_log_name, &first_non_digit, 10); - if (mi->master_log_name[0]!='\0' && + if (mi->master_log_name[0]!='\0' && *first_non_digit=='\0' && lines >= LINES_IN_MASTER_INFO_WITH_SSL) { // Seems to be new format - if (init_strvar_from_file(mi->master_log_name, + if (init_strvar_from_file(mi->master_log_name, sizeof(mi->master_log_name), &mi->file, "")) goto errwithmsg; } else lines= 7; - + if (init_intvar_from_file(&master_log_pos, &mi->file, 4) || init_strvar_from_file(mi->host, sizeof(mi->host), &mi->file, master_host) || init_strvar_from_file(mi->user, sizeof(mi->user), &mi->file, - master_user) || + master_user) || init_strvar_from_file(mi->password, SCRAMBLED_PASSWORD_CHAR_LENGTH+1, &mi->file, master_password) || init_intvar_from_file(&port, &mi->file, master_port) || @@ -1949,17 +2158,17 @@ file '%s')", fname); master_connect_retry)) goto errwithmsg; - /* - If file has ssl part use it even if we have server without - SSL support. But these option will be ignored later when - slave will try connect to master, so in this case warning + /* + If file has ssl part use it even if we have server without + SSL support. But these option will be ignored later when + slave will try connect to master, so in this case warning is printed. 
*/ - if (lines >= LINES_IN_MASTER_INFO_WITH_SSL && + if (lines >= LINES_IN_MASTER_INFO_WITH_SSL && (init_intvar_from_file(&ssl, &mi->file, master_ssl) || - init_strvar_from_file(mi->ssl_ca, sizeof(mi->ssl_ca), + init_strvar_from_file(mi->ssl_ca, sizeof(mi->ssl_ca), &mi->file, master_ssl_ca) || - init_strvar_from_file(mi->ssl_capath, sizeof(mi->ssl_capath), + init_strvar_from_file(mi->ssl_capath, sizeof(mi->ssl_capath), &mi->file, master_ssl_capath) || init_strvar_from_file(mi->ssl_cert, sizeof(mi->ssl_cert), &mi->file, master_ssl_cert) || @@ -1970,11 +2179,11 @@ file '%s')", fname); goto errwithmsg; #ifndef HAVE_OPENSSL if (ssl) - sql_print_error("SSL information in the master info file " + sql_print_warning("SSL information in the master info file " "('%s') are ignored because this MySQL slave was compiled " "without SSL support.", fname); #endif /* HAVE_OPENSSL */ - + /* This has to be handled here as init_intvar_from_file can't handle my_off_t types @@ -1994,15 +2203,15 @@ file '%s')", fname); mi->inited = 1; // now change cache READ -> WRITE - must do this before flush_master_info - reinit_io_cache(&mi->file, WRITE_CACHE,0L,0,1); + reinit_io_cache(&mi->file, WRITE_CACHE, 0L, 0, 1); if ((error=test(flush_master_info(mi, 1)))) sql_print_error("Failed to flush master info file"); pthread_mutex_unlock(&mi->data_lock); DBUG_RETURN(error); - + errwithmsg: sql_print_error("Error reading master configuration"); - + err: if (fd >= 0) { @@ -2095,7 +2304,7 @@ void table_rule_ent_dynamic_array_to_str(String* s, DYNAMIC_ARRAY* a) } } -int show_master_info(THD* thd, MASTER_INFO* mi) +bool show_master_info(THD* thd, MASTER_INFO* mi) { // TODO: fix this for multi-master List<Item> field_list; @@ -2157,8 +2366,9 @@ int show_master_info(THD* thd, MASTER_INFO* mi) field_list.push_back(new Item_return_int("Seconds_Behind_Master", 10, MYSQL_TYPE_LONGLONG)); - if (protocol->send_fields(&field_list, 1)) - DBUG_RETURN(-1); + if (protocol->send_fields(&field_list, + 
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) + DBUG_RETURN(TRUE); if (mi->host[0]) { @@ -2271,10 +2481,10 @@ int show_master_info(THD* thd, MASTER_INFO* mi) pthread_mutex_unlock(&mi->data_lock); if (my_net_write(&thd->net, (char*)thd->packet.ptr(), packet->length())) - DBUG_RETURN(-1); + DBUG_RETURN(TRUE); } send_eof(thd); - DBUG_RETURN(0); + DBUG_RETURN(FALSE); } @@ -2344,6 +2554,7 @@ st_relay_log_info::st_relay_log_info() bzero((char*) &info_file, sizeof(info_file)); bzero((char*) &cache_buf, sizeof(cache_buf)); + cached_charset_invalidate(); pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST); pthread_mutex_init(&data_lock, MY_MUTEX_INIT_FAST); pthread_mutex_init(&log_space_lock, MY_MUTEX_INIT_FAST); @@ -2364,6 +2575,7 @@ st_relay_log_info::~st_relay_log_info() pthread_cond_destroy(&start_cond); pthread_cond_destroy(&stop_cond); pthread_cond_destroy(&log_space_cond); + relay_log.cleanup(); } /* @@ -2401,17 +2613,16 @@ int st_relay_log_info::wait_for_pos(THD* thd, String* log_name, ulong init_abort_pos_wait; int error=0; struct timespec abstime; // for timeout checking - set_timespec(abstime,timeout); - + const char *msg; DBUG_ENTER("wait_for_pos"); - DBUG_PRINT("enter",("group_master_log_name: '%s' pos: %lu timeout: %ld", - group_master_log_name, (ulong) group_master_log_pos, - (long) timeout)); + DBUG_PRINT("enter",("log_name: '%s' log_pos: %lu timeout: %lu", + log_name->c_ptr(), (ulong) log_pos, (ulong) timeout)); + set_timespec(abstime,timeout); pthread_mutex_lock(&data_lock); - const char *msg= thd->enter_cond(&data_cond, &data_lock, - "Waiting for the slave SQL thread to " - "advance position"); + msg= thd->enter_cond(&data_cond, &data_lock, + "Waiting for the slave SQL thread to " + "advance position"); /* This function will abort when it notices that some CHANGE MASTER or RESET MASTER has changed the master info. 
@@ -2468,6 +2679,12 @@ int st_relay_log_info::wait_for_pos(THD* thd, String* log_name, bool pos_reached; int cmp_result= 0; + DBUG_PRINT("info", + ("init_abort_pos_wait: %ld abort_pos_wait: %ld", + init_abort_pos_wait, abort_pos_wait)); + DBUG_PRINT("info",("group_master_log_name: '%s' pos: %lu", + group_master_log_name, (ulong) group_master_log_pos)); + /* group_master_log_name can be "", if we are just after a fresh replication start or after a CHANGE MASTER TO MASTER_HOST/PORT @@ -2550,7 +2767,7 @@ err: thd->exit_cond(msg); DBUG_PRINT("exit",("killed: %d abort: %d slave_running: %d \ improper_arguments: %d timed_out: %d", - (int) thd->killed, + thd->killed_errno(), (int) (init_abort_pos_wait != abort_pos_wait), (int) slave_running, (int) (error == -2), @@ -2563,6 +2780,24 @@ improper_arguments: %d timed_out: %d", DBUG_RETURN( error ? error : event_count ); } +void set_slave_thread_options(THD* thd) +{ + thd->options = ((opt_log_slave_updates) ? OPTION_BIN_LOG:0) | + OPTION_AUTO_IS_NULL; + thd->variables.completion_type= 0; +} + +void set_slave_thread_default_charset(THD* thd, RELAY_LOG_INFO *rli) +{ + thd->variables.character_set_client= + global_system_variables.character_set_client; + thd->variables.collation_connection= + global_system_variables.collation_connection; + thd->variables.collation_server= + global_system_variables.collation_server; + thd->update_charset(); + rli->cached_charset_invalidate(); +} /* init_slave_thread() @@ -2573,13 +2808,19 @@ static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type) DBUG_ENTER("init_slave_thread"); thd->system_thread = (thd_type == SLAVE_THD_SQL) ? SYSTEM_THREAD_SLAVE_SQL : SYSTEM_THREAD_SLAVE_IO; + /* + The two next lines are needed for replication of SP (CREATE PROCEDURE + needs a valid user to store in mysql.proc). 
+ */ + thd->priv_user= (char *) ""; + thd->priv_host[0]= '\0'; thd->host_or_ip= ""; thd->client_capabilities = 0; my_net_init(&thd->net, 0); thd->net.read_timeout = slave_net_timeout; thd->master_access= ~0; - thd->priv_user = 0; thd->slave_thread = 1; + set_slave_thread_options(thd); /* It's nonsense to constrain the slave threads with max_join_size; if a query succeeded on master, we HAVE to execute it. So set @@ -2759,8 +3000,7 @@ static ulong read_event(MYSQL* mysql, MASTER_INFO *mi, bool* suppress_warnings) *suppress_warnings= TRUE; } else - sql_print_error("Error reading packet from server: %s (\ -server_errno=%d)", + sql_print_error("Error reading packet from server: %s ( server_errno=%d)", mysql_error(mysql), mysql_errno(mysql)); return packet_error; } @@ -2768,8 +3008,8 @@ server_errno=%d)", /* Check if eof packet */ if (len < 8 && mysql->net.read_pos[0] == 254) { - sql_print_error("Slave: received end packet from server, apparent\ - master shutdown: %s", + sql_print_information("Slave: received end packet from server, apparent " + "master shutdown: %s", mysql_error(mysql)); return packet_error; } @@ -2841,13 +3081,14 @@ bool st_relay_log_info::is_until_satisfied() if (until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_UNKNOWN) { - /* - We have no cached comaprison results so we should compare log names - and cache result + /* + We have no cached comparison results so we should compare log names + and cache result. + If we are after RESET SLAVE, and the SQL slave thread has not processed + any event yet, it could be that group_master_log_name is "". In that case, + just wait for more events (as there is no sensible comparison to do). */ - DBUG_ASSERT(*log_name || log_pos == 0); - if (*log_name) { const char *basename= log_name + dirname_length(log_name); @@ -2883,21 +3124,39 @@ bool st_relay_log_info::is_until_satisfied() } +void st_relay_log_info::cached_charset_invalidate() +{ + /* Full of zeroes means uninitialized. 
*/ + bzero(cached_charset, sizeof(cached_charset)); +} + + +bool st_relay_log_info::cached_charset_compare(char *charset) +{ + if (bcmp(cached_charset, charset, sizeof(cached_charset))) + { + memcpy(cached_charset, charset, sizeof(cached_charset)); + return 1; + } + return 0; +} + + static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli) { /* We acquire this mutex since we need it for all operations except - event execution. But we will release it in places where we will + event execution. But we will release it in places where we will wait for something for example inside of next_event(). */ pthread_mutex_lock(&rli->data_lock); - - if (rli->until_condition!=RELAY_LOG_INFO::UNTIL_NONE && - rli->is_until_satisfied()) + + if (rli->until_condition!=RELAY_LOG_INFO::UNTIL_NONE && + rli->is_until_satisfied()) { sql_print_error("Slave SQL thread stopped because it reached its" " UNTIL position %ld", (long) rli->until_pos()); - /* + /* Setting abort_slave flag because we do not want additional message about error in query execution to be printed. */ @@ -2905,11 +3164,11 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli) pthread_mutex_unlock(&rli->data_lock); return 1; } - + Log_event * ev = next_event(rli); - + DBUG_ASSERT(rli->sql_thd==thd); - + if (sql_slave_killed(thd,rli)) { pthread_mutex_unlock(&rli->data_lock); @@ -2922,35 +3181,70 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli) int exec_res; /* - Skip queries originating from this server or number of - queries specified by the user in slave_skip_counter - We can't however skip event's that has something to do with the + Queries originating from this server must be skipped. + Low-level events (Format_desc, Rotate, Stop) from this server + must also be skipped. But for those we don't want to modify + group_master_log_pos, because these events did not exist on the master. + Format_desc is not completely skipped. + Skip queries specified by the user in slave_skip_counter. 
+ We can't however skip events that has something to do with the log files themselves. + Filtering on own server id is extremely important, to ignore execution of + events created by the creation/rotation of the relay log (remember that + now the relay log starts with its Format_desc, has a Rotate etc). */ - if ((ev->server_id == (uint32) ::server_id && !replicate_same_server_id) || - (rli->slave_skip_counter && type_code != ROTATE_EVENT)) + DBUG_PRINT("info",("type_code=%d, server_id=%d",type_code,ev->server_id)); + + if ((ev->server_id == (uint32) ::server_id && + !replicate_same_server_id && + type_code != FORMAT_DESCRIPTION_EVENT) || + (rli->slave_skip_counter && + type_code != ROTATE_EVENT && type_code != STOP_EVENT && + type_code != START_EVENT_V3 && type_code!= FORMAT_DESCRIPTION_EVENT)) { - /* TODO: I/O thread should not even log events with the same server id */ - rli->inc_group_relay_log_pos(ev->get_event_len(), - type_code != STOP_EVENT ? ev->log_pos : LL(0), - 1/* skip lock*/); - flush_relay_log_info(rli); + DBUG_PRINT("info", ("event skipped")); + if (thd->options & OPTION_BEGIN) + rli->inc_event_relay_log_pos(); + else + { + rli->inc_group_relay_log_pos((type_code == ROTATE_EVENT || + type_code == STOP_EVENT || + type_code == FORMAT_DESCRIPTION_EVENT) ? + LL(0) : ev->log_pos, + 1/* skip lock*/); + flush_relay_log_info(rli); + } /* - Protect against common user error of setting the counter to 1 - instead of 2 while recovering from an failed auto-increment insert + Protect against common user error of setting the counter to 1 + instead of 2 while recovering from an insert which used auto_increment, + rand or user var. 
*/ - if (rli->slave_skip_counter && - !((type_code == INTVAR_EVENT || type_code == STOP_EVENT) && - rli->slave_skip_counter == 1)) + if (rli->slave_skip_counter && + !((type_code == INTVAR_EVENT || + type_code == RAND_EVENT || + type_code == USER_VAR_EVENT) && + rli->slave_skip_counter == 1) && + /* + The events from ourselves which have something to do with the relay + log itself must be skipped, true, but they mustn't decrement + rli->slave_skip_counter, because the user is supposed to not see + these events (they are not in the master's binlog) and if we + decremented, START SLAVE would for example decrement when it sees + the Rotate, so the event which the user probably wanted to skip + would not be skipped. + */ + !(ev->server_id == (uint32) ::server_id && + (type_code == ROTATE_EVENT || type_code == STOP_EVENT || + type_code == START_EVENT_V3 || type_code == FORMAT_DESCRIPTION_EVENT))) --rli->slave_skip_counter; pthread_mutex_unlock(&rli->data_lock); - delete ev; - return 0; // avoid infinite update loops - } + delete ev; + return 0; // avoid infinite update loops + } pthread_mutex_unlock(&rli->data_lock); - + thd->server_id = ev->server_id; // use the original server id for logging thd->set_time(); // time the query thd->lex->current_select= 0; @@ -2959,7 +3253,16 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli) ev->thd = thd; exec_res = ev->exec_event(rli); DBUG_ASSERT(rli->sql_thd==thd); - delete ev; + /* + Format_description_log_event should not be deleted because it will be + used to read info about the relay log's format; it will be deleted when + the SQL thread does not need it, i.e. when this thread terminates. 
+ */ + if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT) + { + DBUG_PRINT("info", ("Deleting the event after it has been executed")); + delete ev; + } if (slave_trans_retries) { if (exec_res && @@ -2988,7 +3291,7 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli) else if (init_relay_log_pos(rli, rli->group_relay_log_name, rli->group_relay_log_pos, - 1, &errmsg)) + 1, &errmsg, 1)) sql_print_error("Error initializing relay log position: %s", errmsg); else @@ -3039,17 +3342,17 @@ extern "C" pthread_handler_decl(handle_slave_io,arg) { THD *thd; // needs to be first for thread_stack MYSQL *mysql; - MASTER_INFO *mi = (MASTER_INFO*)arg; + MASTER_INFO *mi = (MASTER_INFO*)arg; char llbuff[22]; uint retry_count; - + // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff my_thread_init(); DBUG_ENTER("handle_slave_io"); #ifndef DBUG_OFF slave_begin: -#endif +#endif DBUG_ASSERT(mi->inited); mysql= NULL ; retry_count= 0; @@ -3058,10 +3361,10 @@ slave_begin: /* Inform waiting threads that slave has started */ mi->slave_run_id++; -#ifndef DBUG_OFF +#ifndef DBUG_OFF mi->events_till_abort = abort_slave_event_count; -#endif - +#endif + thd= new THD; // note that contructor of THD uses DBUG_ ! 
THD_CHECK_SENTRY(thd); @@ -3082,17 +3385,16 @@ slave_begin: mi->abort_slave = 0; pthread_mutex_unlock(&mi->run_lock); pthread_cond_broadcast(&mi->start_cond); - + DBUG_PRINT("master_info",("log_file_name: '%s' position: %s", mi->master_log_name, llstr(mi->master_log_pos,llbuff))); - + if (!(mi->mysql = mysql = mysql_init(NULL))) { sql_print_error("Slave I/O thread: error in mysql_init()"); goto err; } - thd->proc_info = "Connecting to master"; // we can get killed during safe_connect @@ -3104,7 +3406,7 @@ slave_begin: llstr(mi->master_log_pos,llbuff)); else { - sql_print_error("Slave I/O thread killed while connecting to master"); + sql_print_information("Slave I/O thread killed while connecting to master"); goto err; } @@ -3116,7 +3418,8 @@ connected: thd->proc_info = "Checking master version"; if (get_master_version_and_clock(mysql, mi)) goto err; - if (!mi->old_format) + + if (mi->rli.relay_log.description_event_for_queue->binlog_version > 1) { /* Register ourselves with the master. @@ -3127,22 +3430,22 @@ connected: if (register_slave_on_master(mysql) || update_slave_list(mysql, mi)) goto err; } - + DBUG_PRINT("info",("Starting reading binary log from master")); while (!io_slave_killed(thd,mi)) { - bool suppress_warnings= 0; + bool suppress_warnings= 0; thd->proc_info = "Requesting binlog dump"; if (request_dump(mysql, mi, &suppress_warnings)) { sql_print_error("Failed on request_dump()"); if (io_slave_killed(thd,mi)) { - sql_print_error("Slave I/O thread killed while requesting master \ + sql_print_information("Slave I/O thread killed while requesting master \ dump"); goto err; } - + mi->slave_running= MYSQL_SLAVE_RUN_NOT_CONNECT; thd->proc_info= "Waiting to reconnect after a failed binlog dump request"; #ifdef SIGNAL_WITH_VIO_CLOSE @@ -3163,7 +3466,7 @@ dump"); } if (io_slave_killed(thd,mi)) { - sql_print_error("Slave I/O thread killed while retrying master \ + sql_print_information("Slave I/O thread killed while retrying master \ dump"); goto err; } @@ 
-3176,7 +3479,7 @@ reconnecting to try again, log '%s' at postion %s", IO_RPL_LOG_NAME, if (safe_reconnect(thd, mysql, mi, suppress_warnings) || io_slave_killed(thd,mi)) { - sql_print_error("Slave I/O thread killed during or \ + sql_print_information("Slave I/O thread killed during or \ after reconnect"); goto err; } @@ -3186,22 +3489,22 @@ after reconnect"); while (!io_slave_killed(thd,mi)) { - bool suppress_warnings= 0; - /* + bool suppress_warnings= 0; + /* We say "waiting" because read_event() will wait if there's nothing to - read. But if there's something to read, it will not wait. The important - thing is to not confuse users by saying "reading" whereas we're in fact - receiving nothing. + read. But if there's something to read, it will not wait. The + important thing is to not confuse users by saying "reading" whereas + we're in fact receiving nothing. */ thd->proc_info = "Waiting for master to send event"; ulong event_len = read_event(mysql, mi, &suppress_warnings); if (io_slave_killed(thd,mi)) { if (global_system_variables.log_warnings) - sql_print_error("Slave I/O thread killed while reading event"); + sql_print_information("Slave I/O thread killed while reading event"); goto err; } - + if (event_len == packet_error) { uint mysql_error_number= mysql_errno(mysql); @@ -3232,30 +3535,30 @@ max_allowed_packet", goto err; // Don't retry forever safe_sleep(thd,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed, (void*) mi); - } + } if (io_slave_killed(thd,mi)) { if (global_system_variables.log_warnings) - sql_print_error("Slave I/O thread killed while waiting to \ + sql_print_information("Slave I/O thread killed while waiting to \ reconnect after a failed read"); goto err; } thd->proc_info = "Reconnecting after a failed master event read"; if (!suppress_warnings) - sql_print_error("Slave I/O thread: Failed reading log event, \ + sql_print_information("Slave I/O thread: Failed reading log event, \ reconnecting to retry, log '%s' position %s", IO_RPL_LOG_NAME, 
llstr(mi->master_log_pos, llbuff)); if (safe_reconnect(thd, mysql, mi, suppress_warnings) || io_slave_killed(thd,mi)) { if (global_system_variables.log_warnings) - sql_print_error("Slave I/O thread killed during or after a \ + sql_print_information("Slave I/O thread killed during or after a \ reconnect done to recover from failed read"); goto err; } goto connected; } // if (event_len == packet_error) - + retry_count=0; // ok event, reset retry counter thd->proc_info = "Queueing master event to the relay log"; if (queue_event(mi,(const char*)mysql->net.read_pos + 1, @@ -3311,7 +3614,7 @@ log space"); // error = 0; err: // print the current replication position - sql_print_error("Slave I/O thread exiting, read up to log '%s', position %s", + sql_print_information("Slave I/O thread exiting, read up to log '%s', position %s", IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff)); VOID(pthread_mutex_lock(&LOCK_thread_count)); thd->query = thd->db = 0; // extra safety @@ -3326,6 +3629,9 @@ err: pthread_mutex_lock(&mi->run_lock); mi->slave_running = 0; mi->io_thd = 0; + /* Forget the relay log's format */ + delete mi->rli.relay_log.description_event_for_queue; + mi->rli.relay_log.description_event_for_queue= 0; // TODO: make rpl_status part of MASTER_INFO change_rpl_status(RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE); mi->abort_slave = 0; // TODO: check if this is needed @@ -3430,15 +3736,38 @@ slave_begin: if (init_relay_log_pos(rli, rli->group_relay_log_name, rli->group_relay_log_pos, - 1 /*need data lock*/, &errmsg)) + 1 /*need data lock*/, &errmsg, + 1 /*look for a description_event*/)) { sql_print_error("Error initializing relay log position: %s", errmsg); goto err; } THD_CHECK_SENTRY(thd); - DBUG_ASSERT(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE); - DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->event_relay_log_pos); +#ifndef DBUG_OFF + { + char llbuf1[22], llbuf2[22]; + DBUG_PRINT("info", ("my_b_tell(rli->cur_log)=%s rli->event_relay_log_pos=%s", + 
llstr(my_b_tell(rli->cur_log),llbuf1), + llstr(rli->event_relay_log_pos,llbuf2))); + DBUG_ASSERT(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE); + /* + Wonder if this is correct. I (Guilhem) wonder if my_b_tell() returns the + correct position when it's called just after my_b_seek() (the questionable + stuff is those "seek is done on next read" comments in the my_b_seek() + source code). + The crude reality is that this assertion randomly fails whereas + replication seems to work fine. And there is no easy explanation why it + fails (as we my_b_seek(rli->event_relay_log_pos) at the very end of + init_relay_log_pos() called above). Maybe the assertion would be + meaningful if we held rli->data_lock between the my_b_seek() and the + DBUG_ASSERT(). + */ +#ifdef SHOULD_BE_CHECKED + DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->event_relay_log_pos); +#endif + } +#endif DBUG_ASSERT(rli->sql_thd == thd); DBUG_PRINT("master_info",("log_file_name: %s position: %s", @@ -3482,12 +3811,18 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \ } /* Thread stopped. Print the current replication position to the log */ - sql_print_information("Slave SQL thread exiting, replication stopped in log \ - '%s' at position %s", RPL_LOG_NAME, llstr(rli->group_master_log_pos,llbuff)); + sql_print_information("Slave SQL thread exiting, replication stopped in log " + "'%s' at position %s", + RPL_LOG_NAME, llstr(rli->group_master_log_pos,llbuff)); err: VOID(pthread_mutex_lock(&LOCK_thread_count)); - thd->query = thd->db = 0; // extra safety + /* + Some extra safety, which should not been needed (normally, event deletion + should already have done these assignments (each event which sets these + variables is supposed to set them to 0 before terminating)). 
+ */ + thd->query= thd->db= thd->catalog= 0; thd->query_length= thd->db_length= 0; VOID(pthread_mutex_unlock(&LOCK_thread_count)); thd->proc_info = "Waiting for slave mutex on exit"; @@ -3497,16 +3832,16 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \ DBUG_ASSERT(rli->slave_running == 1); // tracking buffer overrun /* When master_pos_wait() wakes up it will check this and terminate */ rli->slave_running= 0; - /* - Going out of the transaction. Necessary to mark it, in case the user - restarts replication from a non-transactional statement (with CHANGE - MASTER). - */ + /* Forget the relay log's format */ + delete rli->relay_log.description_event_for_exec; + rli->relay_log.description_event_for_exec= 0; /* Wake up master_pos_wait() */ pthread_mutex_unlock(&rli->data_lock); DBUG_PRINT("info",("Signaling possibly waiting master_pos_wait() functions")); pthread_cond_broadcast(&rli->data_cond); rli->ignore_log_space_limit= 0; /* don't need any lock */ + /* we die so won't remember charset - re-update them on next thread start */ + rli->cached_charset_invalidate(); rli->save_temporary_tables = thd->temporary_tables; /* @@ -3599,7 +3934,7 @@ static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev) if (unlikely(cev_not_written)) break; Execute_load_log_event xev(thd,0,0); - xev.log_pos = mi->master_log_pos; + xev.log_pos = cev->log_pos; if (unlikely(mi->rli.relay_log.append(&xev))) { sql_print_error("Slave I/O: error writing Exec_load event to \ @@ -3613,7 +3948,6 @@ relay log"); { cev->block = (char*)net->read_pos; cev->block_len = num_bytes; - cev->log_pos = mi->master_log_pos; if (unlikely(mi->rli.relay_log.append(cev))) { sql_print_error("Slave I/O: error writing Create_file event to \ @@ -3627,7 +3961,7 @@ relay log"); { aev.block = (char*)net->read_pos; aev.block_len = num_bytes; - aev.log_pos = mi->master_log_pos; + aev.log_pos = cev->log_pos; if (unlikely(mi->rli.relay_log.append(&aev))) { sql_print_error("Slave I/O: error 
writing Append_block event to \ @@ -3655,6 +3989,7 @@ err: DESCRIPTION Updates the master info with the place in the next binary log where we should start reading. + Rotate the relay log to avoid mixed-format relay logs. NOTES We assume we already locked mi->data_lock @@ -3685,21 +4020,34 @@ static int process_io_rotate(MASTER_INFO *mi, Rotate_log_event *rev) if (disconnect_slave_event_count) events_till_disconnect++; #endif + + /* + If description_event_for_queue is format <4, there is conversion in the + relay log to the slave's format (4). And Rotate can mean upgrade or + nothing. If upgrade, it's to 5.0 or newer, so we will get a Format_desc, so + no need to reset description_event_for_queue now. And if it's nothing (same + master version as before), no need (still using the slave's format). + */ + if (mi->rli.relay_log.description_event_for_queue->binlog_version >= 4) + { + delete mi->rli.relay_log.description_event_for_queue; + /* start from format 3 (MySQL 4.0) again */ + mi->rli.relay_log.description_event_for_queue= new + Format_description_log_event(3); + } + /* + Rotate the relay log makes binlog format detection easier (at next slave + start or mysqlbinlog) + */ + rotate_relay_log(mi); /* will take the right mutexes */ DBUG_RETURN(0); } - /* - queue_old_event() - - Writes a 3.23 event to the relay log. - - TODO: - Test this code before release - it has to be tested on a separate - setup with 3.23 master + Reads a 3.23 event and converts it to the slave's format. This code was + copied from MySQL 4.0. 
*/ - -static int queue_old_event(MASTER_INFO *mi, const char *buf, +static int queue_binlog_ver_1_event(MASTER_INFO *mi, const char *buf, ulong event_len) { const char *errmsg = 0; @@ -3707,7 +4055,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf, bool ignore_event= 0; char *tmp_buf = 0; RELAY_LOG_INFO *rli= &mi->rli; - DBUG_ENTER("queue_old_event"); + DBUG_ENTER("queue_binlog_ver_1_event"); /* If we get Load event, we need to pass a non-reusable buffer @@ -3739,7 +4087,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf, connected to the master). */ Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg, - 1 /*old format*/ ); + mi->rli.relay_log.description_event_for_queue); if (unlikely(!ev)) { sql_print_error("Read invalid event from master: '%s',\ @@ -3749,7 +4097,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf, DBUG_RETURN(1); } pthread_mutex_lock(&mi->data_lock); - ev->log_pos = mi->master_log_pos; + ev->log_pos= mi->master_log_pos; /* 3.23 events don't contain log_pos */ switch (ev->get_type_code()) { case STOP_EVENT: ignore_event= 1; @@ -3773,14 +4121,12 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf, */ { /* We come here when and only when tmp_buf != 0 */ - DBUG_ASSERT(tmp_buf); + DBUG_ASSERT(tmp_buf != 0); + inc_pos=event_len; + ev->log_pos+= inc_pos; int error = process_io_create_file(mi,(Create_file_log_event*)ev); delete ev; - /* - We had incremented event_len, but now when it is used to calculate the - position in the master's log, we must use the original value. 
- */ - mi->master_log_pos += --event_len; + mi->master_log_pos += inc_pos; DBUG_PRINT("info", ("master_log_pos: %d", (ulong) mi->master_log_pos)); pthread_mutex_unlock(&mi->data_lock); my_free((char*)tmp_buf, MYF(0)); @@ -3792,6 +4138,12 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf, } if (likely(!ignore_event)) { + if (ev->log_pos) + /* + Don't do it for fake Rotate events (see comment in + Log_event::Log_event(const char* buf...) in log_event.cc). + */ + ev->log_pos+= event_len; /* make log_pos be the pos of the end of the event */ if (unlikely(rli->relay_log.append(ev))) { delete ev; @@ -3807,10 +4159,98 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf, DBUG_RETURN(0); } +/* + Reads a 4.0 event and converts it to the slave's format. This code was copied + from queue_binlog_ver_1_event(), with some affordable simplifications. +*/ +static int queue_binlog_ver_3_event(MASTER_INFO *mi, const char *buf, + ulong event_len) +{ + const char *errmsg = 0; + ulong inc_pos; + char *tmp_buf = 0; + RELAY_LOG_INFO *rli= &mi->rli; + DBUG_ENTER("queue_binlog_ver_3_event"); + + /* read_log_event() will adjust log_pos to be end_log_pos */ + Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg, + mi->rli.relay_log.description_event_for_queue); + if (unlikely(!ev)) + { + sql_print_error("Read invalid event from master: '%s',\ + master could be corrupt but a more likely cause of this is a bug", + errmsg); + my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR)); + DBUG_RETURN(1); + } + pthread_mutex_lock(&mi->data_lock); + switch (ev->get_type_code()) { + case STOP_EVENT: + goto err; + case ROTATE_EVENT: + if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev))) + { + delete ev; + pthread_mutex_unlock(&mi->data_lock); + DBUG_RETURN(1); + } + inc_pos= 0; + break; + default: + inc_pos= event_len; + break; + } + if (unlikely(rli->relay_log.append(ev))) + { + delete ev; + pthread_mutex_unlock(&mi->data_lock); + DBUG_RETURN(1); + } + 
rli->relay_log.harvest_bytes_written(&rli->log_space_total); + delete ev; + mi->master_log_pos+= inc_pos; +err: + DBUG_PRINT("info", ("master_log_pos: %d", (ulong) mi->master_log_pos)); + pthread_mutex_unlock(&mi->data_lock); + DBUG_RETURN(0); +} + +/* + queue_old_event() + + Writes a 3.23 or 4.0 event to the relay log, after converting it to the 5.0 + (exactly, slave's) format. To do the conversion, we create a 5.0 event from + the 3.23/4.0 bytes, then write this event to the relay log. + + TODO: + Test this code before release - it has to be tested on a separate + setup with 3.23 master or 4.0 master +*/ + +static int queue_old_event(MASTER_INFO *mi, const char *buf, + ulong event_len) +{ + switch (mi->rli.relay_log.description_event_for_queue->binlog_version) + { + case 1: + return queue_binlog_ver_1_event(mi,buf,event_len); + case 3: + return queue_binlog_ver_3_event(mi,buf,event_len); + default: /* unsupported format; eg version 2 */ + DBUG_PRINT("info",("unsupported binlog format %d in queue_old_event()", + mi->rli.relay_log.description_event_for_queue->binlog_version)); + return 1; + } +} /* queue_event() + If the event is 3.23/4.0, passes it to queue_old_event() which will convert + it. Otherwise, writes a 5.0 (or newer) event to the relay log. Then there is + no format conversion, it's pure read/write of bytes. + So a 5.0.0 slave's relay log can contain events in the slave's format or in + any >=5.0.0 format. 
*/ int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len) @@ -3820,7 +4260,8 @@ int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len) RELAY_LOG_INFO *rli= &mi->rli; DBUG_ENTER("queue_event"); - if (mi->old_format) + if (mi->rli.relay_log.description_event_for_queue->binlog_version<4 && + buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT /* a way to escape */) DBUG_RETURN(queue_old_event(mi,buf,event_len)); pthread_mutex_lock(&mi->data_lock); @@ -3837,7 +4278,7 @@ int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len) master server shutdown. The only thing this does is cleaning. But cleaning is already done on a per-master-thread basis (as the master server is shutting down cleanly, it has written all DROP TEMPORARY TABLE - and DO RELEASE_LOCK; prepared statements' deletion are TODO). + prepared statements' deletion are TODO only when we binlog prep stmts). We don't even increment mi->master_log_pos, because we may be just after a Rotate event. Btw, in a few milliseconds we are going to have a Start @@ -3847,7 +4288,7 @@ int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len) goto err; case ROTATE_EVENT: { - Rotate_log_event rev(buf,event_len,0); + Rotate_log_event rev(buf,event_len,mi->rli.relay_log.description_event_for_queue); if (unlikely(process_io_rotate(mi,&rev))) { error= 1; @@ -3860,6 +4301,42 @@ int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len) inc_pos= 0; break; } + case FORMAT_DESCRIPTION_EVENT: + { + /* + Create an event, and save it (when we rotate the relay log, we will have + to write this event again). + */ + /* + We are the only thread which reads/writes description_event_for_queue. + The relay_log struct does not move (though some members of it can + change), so we needn't any lock (no rli->data_lock, no log lock). 
+ */ + Format_description_log_event* tmp; + const char* errmsg; + if (!(tmp= (Format_description_log_event*) + Log_event::read_log_event(buf, event_len, &errmsg, + mi->rli.relay_log.description_event_for_queue))) + { + error= 2; + goto err; + } + delete mi->rli.relay_log.description_event_for_queue; + mi->rli.relay_log.description_event_for_queue= tmp; + /* + Though this does some conversion to the slave's format, this will + preserve the master's binlog format version, and number of event types. + */ + /* + If the event was not requested by the slave (the slave did not ask for + it), i.e. has end_log_pos=0, we do not increment mi->master_log_pos + */ + inc_pos= uint4korr(buf+LOG_POS_OFFSET) ? event_len : 0; + DBUG_PRINT("info",("binlog format is now %d", + mi->rli.relay_log.description_event_for_queue->binlog_version)); + + } + break; default: inc_pos= event_len; break; @@ -3886,23 +4363,32 @@ int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len) We still want to increment, so that we won't re-read this event from the master if the slave IO thread is now stopped/restarted (more efficient if the events we are ignoring are big LOAD DATA INFILE). + But events which were generated by this slave and which do not exist in + the master's binlog (i.e. Format_desc, Rotate & Stop) should not increment + mi->master_log_pos. 
*/ - mi->master_log_pos+= inc_pos; + if (buf[EVENT_TYPE_OFFSET]!=FORMAT_DESCRIPTION_EVENT && + buf[EVENT_TYPE_OFFSET]!=ROTATE_EVENT && + buf[EVENT_TYPE_OFFSET]!=STOP_EVENT) + mi->master_log_pos+= inc_pos; DBUG_PRINT("info", ("master_log_pos: %d, event originating from the same server, ignored", (ulong) mi->master_log_pos)); } else { /* write the event to the relay log */ - if (likely(!(error= rli->relay_log.appendv(buf,event_len,0)))) + if (likely(!(rli->relay_log.appendv(buf,event_len,0)))) { mi->master_log_pos+= inc_pos; DBUG_PRINT("info", ("master_log_pos: %d", (ulong) mi->master_log_pos)); rli->relay_log.harvest_bytes_written(&rli->log_space_total); } + else + error=3; } err: pthread_mutex_unlock(&mi->data_lock); + DBUG_PRINT("info", ("error=%d", error)); DBUG_RETURN(error); } @@ -3927,6 +4413,7 @@ void end_relay_log_info(RELAY_LOG_INFO* rli) } rli->inited = 0; rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT); + rli->relay_log.harvest_bytes_written(&rli->log_space_total); /* Delete the slave's temporary tables from memory. 
In the future there will be other actions than this, to ensure persistence @@ -4055,6 +4542,7 @@ replication resumed in log '%s' at position %s", mi->user, thd->set_active_vio(mysql->net.vio); #endif } + mysql->reconnect= 1; DBUG_PRINT("exit",("slave_was_killed: %d", slave_was_killed)); DBUG_RETURN(slave_was_killed); } @@ -4148,6 +4636,7 @@ static IO_CACHE *reopen_relay_log(RELAY_LOG_INFO *rli, const char **errmsg) relay_log_pos Current log pos pending Number of bytes already processed from the event */ + rli->event_relay_log_pos= max(rli->event_relay_log_pos, BIN_LOG_HEADER_SIZE); my_b_seek(cur_log,rli->event_relay_log_pos); DBUG_RETURN(cur_log); } @@ -4206,28 +4695,40 @@ Log_event* next_event(RELAY_LOG_INFO* rli) hot_log=0; // Using old binary log } } + #ifndef DBUG_OFF { + /* This is an assertion which sometimes fails, let's try to track it */ char llbuf1[22], llbuf2[22]; + DBUG_PRINT("info", ("my_b_tell(cur_log)=%s rli->event_relay_log_pos=%s", + llstr(my_b_tell(cur_log),llbuf1), + llstr(rli->event_relay_log_pos,llbuf2))); DBUG_ASSERT(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE); - /* - The next assertion sometimes (very rarely) fails, let's try to track - it - */ - DBUG_PRINT("info", ("\ -Before assert, my_b_tell(cur_log)=%s rli->event_relay_log_pos=%s", - llstr(my_b_tell(cur_log),llbuf1), - llstr(rli->group_relay_log_pos,llbuf2))); - DBUG_ASSERT(my_b_tell(cur_log) == rli->event_relay_log_pos); + DBUG_ASSERT(my_b_tell(cur_log) == rli->event_relay_log_pos); } #endif /* Relay log is always in new format - if the master is 3.23, the - I/O thread will convert the format for us + I/O thread will convert the format for us. + A problem: the description event may be in a previous relay log. So if + the slave has been shut down meanwhile, we would have to look in old relay + logs, which may even have been deleted. So we need to write this + description event at the beginning of the relay log. 
+ When the relay log is created when the I/O thread starts, easy: the + master will send the description event and we will queue it. + But if the relay log is created by new_file(): then the solution is: + MYSQL_LOG::open() will write the buffered description event. */ - if ((ev=Log_event::read_log_event(cur_log,0,(bool)0 /* new format */))) + if ((ev=Log_event::read_log_event(cur_log,0, + rli->relay_log.description_event_for_exec))) + { DBUG_ASSERT(thd==rli->sql_thd); + /* + read it while we have a lock, to avoid a mutex lock in + inc_event_relay_log_pos() + */ + rli->future_event_relay_log_pos= my_b_tell(cur_log); if (hot_log) pthread_mutex_unlock(log_lock); DBUG_RETURN(ev); @@ -4384,8 +4885,8 @@ Before assert, my_b_tell(cur_log)=%s rli->event_relay_log_pos=%s", { #ifdef EXTRA_DEBUG if (global_system_variables.log_warnings) - sql_print_error("next log '%s' is currently active", - rli->linfo.log_file_name); + sql_print_information("next log '%s' is currently active", + rli->linfo.log_file_name); #endif rli->cur_log= cur_log= rli->relay_log.get_log_file(); rli->cur_log_old_open_count= rli->relay_log.get_open_count(); @@ -4414,8 +4915,8 @@ Before assert, my_b_tell(cur_log)=%s rli->event_relay_log_pos=%s", */ #ifdef EXTRA_DEBUG if (global_system_variables.log_warnings) - sql_print_error("next log '%s' is not active", - rli->linfo.log_file_name); + sql_print_information("next log '%s' is not active", + rli->linfo.log_file_name); #endif // open_binlog() will check the magic header if ((rli->cur_log_fd=open_binlog(cur_log,rli->linfo.log_file_name, @@ -4441,7 +4942,11 @@ event(errno: %d cur_log->error: %d)", } } if (!errmsg && global_system_variables.log_warnings) - errmsg = "slave SQL thread was killed"; + { + sql_print_information("Error reading relay log event: %s", + "slave SQL thread was killed"); + DBUG_RETURN(0); + } err: if (errmsg) @@ -4461,8 +4966,9 @@ void rotate_relay_log(MASTER_INFO* mi) DBUG_ENTER("rotate_relay_log"); RELAY_LOG_INFO* rli= &mi->rli; - 
lock_slave_threads(mi); - pthread_mutex_lock(&rli->data_lock); + /* We don't lock rli->run_lock. This would lead to deadlocks. */ + pthread_mutex_lock(&mi->run_lock); + /* We need to test inited because otherwise, new_file() will attempt to lock LOCK_log, which may not be inited (if we're not a slave). @@ -4491,8 +4997,7 @@ void rotate_relay_log(MASTER_INFO* mi) */ rli->relay_log.harvest_bytes_written(&rli->log_space_total); end: - pthread_mutex_unlock(&rli->data_lock); - unlock_slave_threads(mi); + pthread_mutex_unlock(&mi->run_lock); DBUG_VOID_RETURN; } |