summaryrefslogtreecommitdiff
path: root/sql/log_event.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/log_event.cc')
-rw-r--r--sql/log_event.cc39
1 files changed, 27 insertions, 12 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 7f0933792be..4cebcb97461 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -3984,6 +3984,8 @@ bool test_if_equal_repl_errors(int expected_error, int actual_error)
case ER_AUTOINC_READ_FAILED:
return (actual_error == ER_AUTOINC_READ_FAILED ||
actual_error == HA_ERR_AUTOINC_ERANGE);
+ case ER_UNKNOWN_TABLE:
+ return actual_error == ER_IT_IS_A_VIEW;
default:
break;
}
@@ -4018,6 +4020,7 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi,
rpl_gtid gtid;
Relay_log_info const *rli= rgi->rli;
Rpl_filter *rpl_filter= rli->mi->rpl_filter;
+ bool current_stmt_is_commit;
DBUG_ENTER("Query_log_event::do_apply_event");
/*
@@ -4044,7 +4047,9 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi,
DBUG_PRINT("info", ("log_pos: %lu", (ulong) log_pos));
clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
- if (strcmp("COMMIT", query) == 0 && rgi->tables_to_lock)
+ current_stmt_is_commit= is_commit();
+
+ if (current_stmt_is_commit && rgi->tables_to_lock)
{
/*
Cleaning-up the last statement context:
@@ -4093,9 +4098,11 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi,
thd->variables.pseudo_thread_id= thread_id; // for temp tables
DBUG_PRINT("query",("%s", thd->query()));
- if (ignored_error_code((expected_error= error_code)) ||
- !unexpected_error_code(expected_error))
+ if (!(expected_error= error_code) ||
+ ignored_error_code(expected_error) ||
+ !unexpected_error_code(expected_error))
{
+ thd->slave_expected_error= expected_error;
if (flags2_inited)
/*
all bits of thd->variables.option_bits which are 1 in OPTIONS_WRITTEN_TO_BIN_LOG
@@ -4197,12 +4204,13 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi,
Record any GTID in the same transaction, so slave state is
transactionally consistent.
*/
- if (strcmp("COMMIT", query) == 0 && (sub_id= rgi->gtid_sub_id))
+ if (current_stmt_is_commit && (sub_id= rgi->gtid_sub_id))
{
/* Clear the GTID from the RLI so we don't accidentally reuse it. */
rgi->gtid_sub_id= 0;
gtid= rgi->current_gtid;
+ thd->variables.option_bits&= ~OPTION_GTID_BEGIN;
if (rpl_global_gtid_slave_state.record_gtid(thd, &gtid, sub_id, true, false))
{
rli->report(ERROR_LEVEL, ER_CANNOT_UPDATE_GTID_STATE,
@@ -4232,6 +4240,7 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi,
concurrency_error_code(expected_error)))
{
thd->variables.option_bits|= OPTION_MASTER_SQL_ERROR;
+ thd->variables.option_bits&= ~OPTION_GTID_BEGIN;
}
/* Execute the query (note that we bypass dispatch_command()) */
Parser_state parser_state;
@@ -4395,8 +4404,7 @@ Default database: '%s'. Query: '%s'",
to shutdown trying to finish incomplete events group.
*/
DBUG_EXECUTE_IF("stop_slave_middle_group",
- if (strcmp("COMMIT", query) != 0 &&
- strcmp("BEGIN", query) != 0)
+ if (!current_stmt_is_commit && is_begin() == 0)
{
if (thd->transaction.all.modified_non_trans_table)
const_cast<Relay_log_info*>(rli)->abort_slave= 1;
@@ -4457,7 +4465,7 @@ Query_log_event::do_shall_skip(rpl_group_info *rgi)
{
Relay_log_info *rli= rgi->rli;
DBUG_ENTER("Query_log_event::do_shall_skip");
- DBUG_PRINT("debug", ("query: %s; q_len: %d", query, q_len));
+ DBUG_PRINT("debug", ("query: '%s' q_len: %d", query, q_len));
DBUG_ASSERT(query && q_len > 0);
DBUG_ASSERT(thd == rgi->thd);
@@ -4473,13 +4481,13 @@ Query_log_event::do_shall_skip(rpl_group_info *rgi)
{
if (is_begin())
{
- thd->variables.option_bits|= OPTION_BEGIN;
+ thd->variables.option_bits|= OPTION_BEGIN | OPTION_GTID_BEGIN;
DBUG_RETURN(Log_event::continue_group(rgi));
}
if (is_commit() || is_rollback())
{
- thd->variables.option_bits&= ~OPTION_BEGIN;
+ thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_GTID_BEGIN);
DBUG_RETURN(Log_event::EVENT_SKIP_COUNT);
}
}
@@ -5906,6 +5914,7 @@ error:
thd->reset_query();
thd->get_stmt_da()->set_overwrite_status(true);
thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd);
+ thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_GTID_BEGIN);
thd->get_stmt_da()->set_overwrite_status(false);
close_thread_tables(thd);
/*
@@ -6408,8 +6417,7 @@ Gtid_log_event::make_compatible_event(String *packet, bool *need_dummy_event,
{
if (*need_dummy_event)
return Query_log_event::dummy_event(packet, ev_offset, checksum_alg);
- else
- return 0;
+ return 0;
}
*need_dummy_event= true;
@@ -6456,10 +6464,16 @@ Gtid_log_event::do_apply_event(rpl_group_info *rgi)
this->server_id, this->seq_no))
return 1;
}
+
+ DBUG_ASSERT((thd->variables.option_bits & OPTION_GTID_BEGIN) == 0);
if (flags2 & FL_STANDALONE)
return 0;
/* Execute this like a BEGIN query event. */
+ thd->variables.option_bits|= OPTION_BEGIN | OPTION_GTID_BEGIN;
+ DBUG_PRINT("info", ("Set OPTION_GTID_BEGIN"));
+ trans_begin(thd, 0);
+
thd->set_query_and_id(gtid_begin_string, sizeof(gtid_begin_string)-1,
&my_charset_bin, next_query_id());
Parser_state parser_state;
@@ -7252,6 +7266,7 @@ int Xid_log_event::do_apply_event(rpl_group_info *rgi)
/* For a slave Xid_log_event is COMMIT */
general_log_print(thd, COM_QUERY,
"COMMIT /* implicit, from Xid_log_event */");
+ thd->variables.option_bits&= ~OPTION_GTID_BEGIN;
res= trans_commit(thd); /* Automatically rolls back on error. */
thd->mdl_context.release_transactional_locks();
@@ -7273,7 +7288,7 @@ Xid_log_event::do_shall_skip(rpl_group_info *rgi)
if (rgi->rli->slave_skip_counter > 0)
{
DBUG_ASSERT(!rgi->rli->get_flag(Relay_log_info::IN_TRANSACTION));
- thd->variables.option_bits&= ~OPTION_BEGIN;
+ thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_GTID_BEGIN);
DBUG_RETURN(Log_event::EVENT_SKIP_COUNT);
}
DBUG_RETURN(Log_event::do_shall_skip(rgi));