diff options
Diffstat (limited to 'sql/rpl_rli.cc')
-rw-r--r-- | sql/rpl_rli.cc | 162 |
1 files changed, 99 insertions, 63 deletions
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index 754b877f654..629e046ed0a 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -14,6 +14,7 @@ along with this program; if not, write to the Free Software Foundation, 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */ +#include <my_global.h> #include "sql_priv.h" #include "unireg.h" // HAVE_* #include "rpl_mi.h" @@ -518,6 +519,90 @@ void Relay_log_info::clear_until_condition() /* + Read the correct format description event for starting to replicate from + a given position in a relay log file. +*/ +Format_description_log_event * +read_relay_log_description_event(IO_CACHE *cur_log, ulonglong start_pos, + const char **errmsg) +{ + Log_event *ev; + Format_description_log_event *fdev; + bool found= false; + + /* + By default the relay log is in binlog format 3 (4.0). + Even if format is 4, this will work enough to read the first event + (Format_desc) (remember that format 4 is just lenghtened compared to format + 3; format 3 is a prefix of format 4). + */ + fdev= new Format_description_log_event(3); + + while (!found) + { + Log_event_type typ; + + /* + Read the possible Format_description_log_event; if position + was 4, no need, it will be read naturally. + */ + DBUG_PRINT("info",("looking for a Format_description_log_event")); + + if (my_b_tell(cur_log) >= start_pos) + break; + + if (!(ev= Log_event::read_log_event(cur_log, 0, fdev, + opt_slave_sql_verify_checksum))) + { + DBUG_PRINT("info",("could not read event, cur_log->error=%d", + cur_log->error)); + if (cur_log->error) /* not EOF */ + { + *errmsg= "I/O error reading event at position 4"; + delete fdev; + return NULL; + } + break; + } + typ= ev->get_type_code(); + if (typ == FORMAT_DESCRIPTION_EVENT) + { + DBUG_PRINT("info",("found Format_description_log_event")); + delete fdev; + fdev= (Format_description_log_event*) ev; + /* + As ev was returned by read_log_event, it has passed is_valid(), so + my_malloc() in ctor worked, no need to check again. + */ + /* + Ok, we found a Format_description event. But it is not sure that this + describes the whole relay log; indeed, one can have this sequence + (starting from position 4): + Format_desc (of slave) + Rotate (of master) + Format_desc (of master) + So the Format_desc which really describes the rest of the relay log + is the 3rd event (it can't be further than that, because we rotate + the relay log when we queue a Rotate event from the master). + But what describes the Rotate is the first Format_desc. + So what we do is: + go on searching for Format_description events, until you exceed the + position (argument 'pos') or until you find another event than Rotate + or Format_desc. + */ + } + else + { + DBUG_PRINT("info",("found event of another type=%d", typ)); + found= (typ != ROTATE_EVENT); + delete ev; + } + } + return fdev; +} + + +/* Open the given relay log SYNOPSIS @@ -640,68 +725,13 @@ int init_relay_log_pos(Relay_log_info* rli,const char* log, */ if (pos > BIN_LOG_HEADER_SIZE) /* If pos<=4, we stay at 4 */ { - Log_event* ev; - while (look_for_description_event) + if (look_for_description_event) { - /* - Read the possible Format_description_log_event; if position - was 4, no need, it will be read naturally. - */ - DBUG_PRINT("info",("looking for a Format_description_log_event")); - - if (my_b_tell(rli->cur_log) >= pos) - break; - - /* - Because of we have rli->data_lock and log_lock, we can safely read an - event - */ - if (!(ev= Log_event::read_log_event(rli->cur_log, 0, - rli->relay_log.description_event_for_exec, - opt_slave_sql_verify_checksum))) - { - DBUG_PRINT("info",("could not read event, rli->cur_log->error=%d", - rli->cur_log->error)); - if (rli->cur_log->error) /* not EOF */ - { - *errmsg= "I/O error reading event at position 4"; - goto err; - } - break; - } - else if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT) - { - DBUG_PRINT("info",("found Format_description_log_event")); - delete rli->relay_log.description_event_for_exec; - rli->relay_log.description_event_for_exec= (Format_description_log_event*) ev; - /* - As ev was returned by read_log_event, it has passed is_valid(), so - my_malloc() in ctor worked, no need to check again. - */ - /* - Ok, we found a Format_description event. But it is not sure that this - describes the whole relay log; indeed, one can have this sequence - (starting from position 4): - Format_desc (of slave) - Rotate (of master) - Format_desc (of master) - So the Format_desc which really describes the rest of the relay log - is the 3rd event (it can't be further than that, because we rotate - the relay log when we queue a Rotate event from the master). - But what describes the Rotate is the first Format_desc. - So what we do is: - go on searching for Format_description events, until you exceed the - position (argument 'pos') or until you find another event than Rotate - or Format_desc. - */ - } - else - { - DBUG_PRINT("info",("found event of another type=%d", - ev->get_type_code())); - look_for_description_event= (ev->get_type_code() == ROTATE_EVENT); - delete ev; - } + Format_description_log_event *fdev; + if (!(fdev= read_relay_log_description_event(rli->cur_log, pos, errmsg))) + goto err; + delete rli->relay_log.description_event_for_exec; + rli->relay_log.description_event_for_exec= fdev; } my_b_seek(rli->cur_log,(off_t)pos); #ifndef DBUG_OFF @@ -956,11 +986,11 @@ void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos, if (rgi->is_parallel_exec) { /* In case of parallel replication, do not update the position backwards. */ - int cmp= strcmp(group_relay_log_name, event_relay_log_name); + int cmp= strcmp(group_relay_log_name, rgi->event_relay_log_name); if (cmp < 0) { group_relay_log_pos= rgi->future_event_relay_log_pos; - strmake_buf(group_relay_log_name, event_relay_log_name); + strmake_buf(group_relay_log_name, rgi->event_relay_log_name); notify_group_relay_log_name_update(); } else if (cmp == 0 && group_relay_log_pos < rgi->future_event_relay_log_pos) group_relay_log_pos= rgi->future_event_relay_log_pos; @@ -1360,6 +1390,7 @@ Relay_log_info::alloc_inuse_relaylog(const char *name) my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*ir)); return 1; } + ir->rli= this; strmake_buf(ir->name, name); if (!inuse_relaylog_list) @@ -1686,6 +1717,11 @@ void rpl_group_info::cleanup_context(THD *thd, bool error) trans_rollback_stmt(thd); // if a "statement transaction" /* trans_rollback() also resets OPTION_GTID_BEGIN */ trans_rollback(thd); // if a "real transaction" + /* + Now that we have rolled back the transaction, make sure we do not + errorneously update the GTID position. + */ + gtid_pending= false; } m_table_map.clear_tables(); slave_close_thread_tables(thd); |