summaryrefslogtreecommitdiff
path: root/sql/rpl_rli.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/rpl_rli.cc')
-rw-r--r--sql/rpl_rli.cc162
1 files changed, 99 insertions, 63 deletions
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index 754b877f654..629e046ed0a 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -14,6 +14,7 @@
along with this program; if not, write to the Free Software Foundation,
51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
+#include <my_global.h>
#include "sql_priv.h"
#include "unireg.h" // HAVE_*
#include "rpl_mi.h"
@@ -518,6 +519,90 @@ void Relay_log_info::clear_until_condition()
/*
+ Read the correct format description event for starting to replicate from
+ a given position in a relay log file.
+*/
+Format_description_log_event *
+read_relay_log_description_event(IO_CACHE *cur_log, ulonglong start_pos,
+ const char **errmsg)
+{
+ Log_event *ev;
+ Format_description_log_event *fdev;
+ bool found= false;
+
+ /*
+ By default the relay log is in binlog format 3 (4.0).
+ Even if format is 4, this will work enough to read the first event
+ (Format_desc) (remember that format 4 is just lenghtened compared to format
+ 3; format 3 is a prefix of format 4).
+ */
+ fdev= new Format_description_log_event(3);
+
+ while (!found)
+ {
+ Log_event_type typ;
+
+ /*
+ Read the possible Format_description_log_event; if position
+ was 4, no need, it will be read naturally.
+ */
+ DBUG_PRINT("info",("looking for a Format_description_log_event"));
+
+ if (my_b_tell(cur_log) >= start_pos)
+ break;
+
+ if (!(ev= Log_event::read_log_event(cur_log, 0, fdev,
+ opt_slave_sql_verify_checksum)))
+ {
+ DBUG_PRINT("info",("could not read event, cur_log->error=%d",
+ cur_log->error));
+ if (cur_log->error) /* not EOF */
+ {
+ *errmsg= "I/O error reading event at position 4";
+ delete fdev;
+ return NULL;
+ }
+ break;
+ }
+ typ= ev->get_type_code();
+ if (typ == FORMAT_DESCRIPTION_EVENT)
+ {
+ DBUG_PRINT("info",("found Format_description_log_event"));
+ delete fdev;
+ fdev= (Format_description_log_event*) ev;
+ /*
+ As ev was returned by read_log_event, it has passed is_valid(), so
+ my_malloc() in ctor worked, no need to check again.
+ */
+ /*
+ Ok, we found a Format_description event. But it is not sure that this
+ describes the whole relay log; indeed, one can have this sequence
+ (starting from position 4):
+ Format_desc (of slave)
+ Rotate (of master)
+ Format_desc (of master)
+ So the Format_desc which really describes the rest of the relay log
+ is the 3rd event (it can't be further than that, because we rotate
+ the relay log when we queue a Rotate event from the master).
+ But what describes the Rotate is the first Format_desc.
+ So what we do is:
+ go on searching for Format_description events, until you exceed the
+ position (argument 'pos') or until you find another event than Rotate
+ or Format_desc.
+ */
+ }
+ else
+ {
+ DBUG_PRINT("info",("found event of another type=%d", typ));
+ found= (typ != ROTATE_EVENT);
+ delete ev;
+ }
+ }
+ return fdev;
+}
+
+
+/*
Open the given relay log
SYNOPSIS
@@ -640,68 +725,13 @@ int init_relay_log_pos(Relay_log_info* rli,const char* log,
*/
if (pos > BIN_LOG_HEADER_SIZE) /* If pos<=4, we stay at 4 */
{
- Log_event* ev;
- while (look_for_description_event)
+ if (look_for_description_event)
{
- /*
- Read the possible Format_description_log_event; if position
- was 4, no need, it will be read naturally.
- */
- DBUG_PRINT("info",("looking for a Format_description_log_event"));
-
- if (my_b_tell(rli->cur_log) >= pos)
- break;
-
- /*
- Because of we have rli->data_lock and log_lock, we can safely read an
- event
- */
- if (!(ev= Log_event::read_log_event(rli->cur_log, 0,
- rli->relay_log.description_event_for_exec,
- opt_slave_sql_verify_checksum)))
- {
- DBUG_PRINT("info",("could not read event, rli->cur_log->error=%d",
- rli->cur_log->error));
- if (rli->cur_log->error) /* not EOF */
- {
- *errmsg= "I/O error reading event at position 4";
- goto err;
- }
- break;
- }
- else if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
- {
- DBUG_PRINT("info",("found Format_description_log_event"));
- delete rli->relay_log.description_event_for_exec;
- rli->relay_log.description_event_for_exec= (Format_description_log_event*) ev;
- /*
- As ev was returned by read_log_event, it has passed is_valid(), so
- my_malloc() in ctor worked, no need to check again.
- */
- /*
- Ok, we found a Format_description event. But it is not sure that this
- describes the whole relay log; indeed, one can have this sequence
- (starting from position 4):
- Format_desc (of slave)
- Rotate (of master)
- Format_desc (of master)
- So the Format_desc which really describes the rest of the relay log
- is the 3rd event (it can't be further than that, because we rotate
- the relay log when we queue a Rotate event from the master).
- But what describes the Rotate is the first Format_desc.
- So what we do is:
- go on searching for Format_description events, until you exceed the
- position (argument 'pos') or until you find another event than Rotate
- or Format_desc.
- */
- }
- else
- {
- DBUG_PRINT("info",("found event of another type=%d",
- ev->get_type_code()));
- look_for_description_event= (ev->get_type_code() == ROTATE_EVENT);
- delete ev;
- }
+ Format_description_log_event *fdev;
+ if (!(fdev= read_relay_log_description_event(rli->cur_log, pos, errmsg)))
+ goto err;
+ delete rli->relay_log.description_event_for_exec;
+ rli->relay_log.description_event_for_exec= fdev;
}
my_b_seek(rli->cur_log,(off_t)pos);
#ifndef DBUG_OFF
@@ -956,11 +986,11 @@ void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos,
if (rgi->is_parallel_exec)
{
/* In case of parallel replication, do not update the position backwards. */
- int cmp= strcmp(group_relay_log_name, event_relay_log_name);
+ int cmp= strcmp(group_relay_log_name, rgi->event_relay_log_name);
if (cmp < 0)
{
group_relay_log_pos= rgi->future_event_relay_log_pos;
- strmake_buf(group_relay_log_name, event_relay_log_name);
+ strmake_buf(group_relay_log_name, rgi->event_relay_log_name);
notify_group_relay_log_name_update();
} else if (cmp == 0 && group_relay_log_pos < rgi->future_event_relay_log_pos)
group_relay_log_pos= rgi->future_event_relay_log_pos;
@@ -1360,6 +1390,7 @@ Relay_log_info::alloc_inuse_relaylog(const char *name)
my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*ir));
return 1;
}
+ ir->rli= this;
strmake_buf(ir->name, name);
if (!inuse_relaylog_list)
@@ -1686,6 +1717,11 @@ void rpl_group_info::cleanup_context(THD *thd, bool error)
trans_rollback_stmt(thd); // if a "statement transaction"
/* trans_rollback() also resets OPTION_GTID_BEGIN */
trans_rollback(thd); // if a "real transaction"
+ /*
+ Now that we have rolled back the transaction, make sure we do not
+ errorneously update the GTID position.
+ */
+ gtid_pending= false;
}
m_table_map.clear_tables();
slave_close_thread_tables(thd);