summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
Diffstat (limited to 'sql')
-rw-r--r--sql/rpl_parallel.cc6
-rw-r--r--sql/rpl_rli.cc65
-rw-r--r--sql/rpl_rli.h21
-rw-r--r--sql/slave.cc31
4 files changed, 103 insertions, 20 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 027aa5f628c..3871ddcf8ef 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -75,18 +75,18 @@ handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev)
/* Do not update position if an earlier event group caused an error abort. */
DBUG_ASSERT(qev->typ == rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE);
+ rli= qev->rgi->rli;
e= qev->entry_for_queued;
- if (e->stop_on_error_sub_id < (uint64)ULONGLONG_MAX || e->force_abort)
+ if (e->stop_on_error_sub_id < (uint64)ULONGLONG_MAX ||
+ (e->force_abort && !rli->stop_for_until))
return;
- rli= qev->rgi->rli;
mysql_mutex_lock(&rli->data_lock);
cmp= strcmp(rli->group_relay_log_name, qev->event_relay_log_name);
if (cmp < 0)
{
rli->group_relay_log_pos= qev->future_event_relay_log_pos;
strmake_buf(rli->group_relay_log_name, qev->event_relay_log_name);
- rli->notify_group_relay_log_name_update();
} else if (cmp == 0 &&
rli->group_relay_log_pos < qev->future_event_relay_log_pos)
rli->group_relay_log_pos= qev->future_event_relay_log_pos;
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index 3e1bb28f701..7215cdd4b96 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -60,6 +60,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
slave_running(MYSQL_SLAVE_NOT_RUN), until_condition(UNTIL_NONE),
until_log_pos(0), retried_trans(0), executed_entries(0),
sql_delay(0), sql_delay_end(0),
+ until_relay_log_names_defer(false),
m_flags(0)
{
DBUG_ENTER("Relay_log_info::Relay_log_info");
@@ -499,6 +500,8 @@ void Relay_log_info::clear_until_condition()
until_condition= Relay_log_info::UNTIL_NONE;
until_log_name[0]= 0;
until_log_pos= 0;
+ until_relay_log_names_defer= false;
+
DBUG_VOID_RETURN;
}
@@ -989,7 +992,6 @@ void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos,
{
group_relay_log_pos= rgi->future_event_relay_log_pos;
strmake_buf(group_relay_log_name, rgi->event_relay_log_name);
- notify_group_relay_log_name_update();
} else if (cmp == 0 && group_relay_log_pos < rgi->future_event_relay_log_pos)
group_relay_log_pos= rgi->future_event_relay_log_pos;
@@ -1279,29 +1281,78 @@ err:
autoincrement or if we have transactions).
Should be called ONLY if until_condition != UNTIL_NONE !
+
+ In the parallel execution mode and UNTIL_MASTER_POS the file name is
+ presented by future_event_master_log_name which may be ahead of
+ group_master_log_name. Log_event::log_pos does relate to it nevertheless
+ so the pair comprises a correct binlog coordinate.
+ Internal group events and events that have zero log_pos also
+ produce the zero for the local log_pos which may not lead to the
+ function falsely return true.
+ In UNTIL_RELAY_POS the original caching and notification are simplified
+ to straightforward files comparison when the current event can't be
+ a part of an event group.
+
RETURN VALUE
true - condition met or error happened (condition seems to have
bad log file name)
false - condition not met
*/
-bool Relay_log_info::is_until_satisfied(my_off_t master_beg_pos)
+bool Relay_log_info::is_until_satisfied(Log_event *ev)
{
const char *log_name;
ulonglong log_pos;
+ /* Prevents stopping within transaction; needed solely for Relay UNTIL. */
+ bool in_trans= false;
+
DBUG_ENTER("Relay_log_info::is_until_satisfied");
if (until_condition == UNTIL_MASTER_POS)
{
log_name= (mi->using_parallel() ? future_event_master_log_name
: group_master_log_name);
- log_pos= master_beg_pos;
+ log_pos= (get_flag(Relay_log_info::IN_TRANSACTION) || !ev || !ev->log_pos) ?
+ (mi->using_parallel() ? 0 : group_master_log_pos) :
+ ev->log_pos - ev->data_written;
}
else
{
DBUG_ASSERT(until_condition == UNTIL_RELAY_POS);
- log_name= group_relay_log_name;
- log_pos= group_relay_log_pos;
+ if (!mi->using_parallel())
+ {
+ log_name= group_relay_log_name;
+ log_pos= group_relay_log_pos;
+ }
+ else
+ {
+ log_name= event_relay_log_name;
+ log_pos= event_relay_log_pos;
+ in_trans= get_flag(Relay_log_info::IN_TRANSACTION);
+ /*
+ until_log_names_cmp_result is set to UNKNOWN either
+ - by a non-group event *and* only when it is in the middle of a group
+ - or by a group event when the preceding group made the above
+ non-group event to defer the resetting.
+ */
+ if ((ev && !Log_event::is_group_event(ev->get_type_code())))
+ {
+ if (in_trans)
+ {
+ until_relay_log_names_defer= true;
+ }
+ else
+ {
+ until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_UNKNOWN;
+ until_relay_log_names_defer= false;
+ }
+ }
+ else if (!in_trans && until_relay_log_names_defer)
+ {
+ until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_UNKNOWN;
+ until_relay_log_names_defer= false;
+ }
+ }
}
DBUG_PRINT("info", ("group_master_log_name='%s', group_master_log_pos=%llu",
@@ -1355,8 +1406,8 @@ bool Relay_log_info::is_until_satisfied(my_off_t master_beg_pos)
}
DBUG_RETURN(((until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_EQUAL &&
- log_pos >= until_log_pos) ||
- until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_GREATER));
+ (log_pos >= until_log_pos && !in_trans)) ||
+ until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_GREATER));
}
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index 4ec4821b67d..2bc0a80268a 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -219,7 +219,7 @@ public:
*/
char future_event_master_log_name[FN_REFLEN];
- /*
+ /*
Original log name and position of the group we're currently executing
(whose coordinates are group_relay_log_name/pos in the relay log)
in the master's binlog. These concern the *group*, because in the master's
@@ -419,7 +419,7 @@ public:
void close_temporary_tables();
/* Check if UNTIL condition is satisfied. See slave.cc for more. */
- bool is_until_satisfied(my_off_t);
+ bool is_until_satisfied(Log_event *ev);
inline ulonglong until_pos()
{
DBUG_ASSERT(until_condition == UNTIL_MASTER_POS ||
@@ -427,7 +427,13 @@ public:
return ((until_condition == UNTIL_MASTER_POS) ? group_master_log_pos :
group_relay_log_pos);
}
-
+ inline char *until_name()
+ {
+ DBUG_ASSERT(until_condition == UNTIL_MASTER_POS ||
+ until_condition == UNTIL_RELAY_POS);
+ return ((until_condition == UNTIL_MASTER_POS) ? group_master_log_name :
+ group_relay_log_name);
+ }
/**
Helper function to do after statement completion.
@@ -564,6 +570,15 @@ private:
relay_log.info had 4 lines. Now it has 5 lines.
*/
static const int LINES_IN_RELAY_LOG_INFO_WITH_DELAY= 5;
+ /*
+ Hint for when to stop event distribution by sql driver thread.
+ The flag is set ON by a non-group event when this event is in the middle
+ of a group (e.g a transaction group) so it's too early
+ to refresh the current-relay-log vs until-log cached comparison result.
+ And it is checked and to decide whether it's a right time to do so
+ when the being processed group has been fully scheduled.
+ */
+ bool until_relay_log_names_defer;
/*
Holds the state of the data in the relay log.
diff --git a/sql/slave.cc b/sql/slave.cc
index eb95afd140b..1bf83aa9652 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -3921,12 +3921,8 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
rli->until_condition == Relay_log_info::UNTIL_RELAY_POS) &&
(ev->server_id != global_system_variables.server_id ||
rli->replicate_same_server_id) &&
- rli->is_until_satisfied((rli->get_flag(Relay_log_info::IN_TRANSACTION) || !ev->log_pos)
- ? rli->group_master_log_pos
- : ev->log_pos - ev->data_written))
+ rli->is_until_satisfied(ev))
{
- sql_print_information("Slave SQL thread stopped because it reached its"
- " UNTIL position %llu", rli->until_pos());
/*
Setting abort_slave flag because we do not want additional
message about error in query execution to be printed.
@@ -5136,10 +5132,14 @@ pthread_handler_t handle_slave_sql(void *arg)
}
if ((rli->until_condition == Relay_log_info::UNTIL_MASTER_POS ||
rli->until_condition == Relay_log_info::UNTIL_RELAY_POS) &&
- rli->is_until_satisfied(rli->group_master_log_pos))
+ rli->is_until_satisfied(NULL))
{
sql_print_information("Slave SQL thread stopped because it reached its"
- " UNTIL position %llu", rli->until_pos());
+ " UNTIL position %llu in %s %s file",
+ rli->until_pos(), rli->until_name(),
+ rli->until_condition ==
+ Relay_log_info::UNTIL_MASTER_POS ?
+ "binlog" : "relaylog");
mysql_mutex_unlock(&rli->data_lock);
goto err;
}
@@ -5205,7 +5205,24 @@ pthread_handler_t handle_slave_sql(void *arg)
err:
if (mi->using_parallel())
rli->parallel.wait_for_done(thd, rli);
+ /* Gtid_list_log_event::do_apply_event has already reported the GTID until */
+ if (rli->stop_for_until && rli->until_condition != Relay_log_info::UNTIL_GTID)
+ {
+ if (global_system_variables.log_warnings > 2)
+ sql_print_information("Slave SQL thread UNTIL stop was requested at position "
+ "%llu in %s %s file",
+ rli->until_log_pos, rli->until_log_name,
+ rli->until_condition ==
+ Relay_log_info::UNTIL_MASTER_POS ?
+ "binlog" : "relaylog");
+ sql_print_information("Slave SQL thread stopped because it reached its"
+ " UNTIL position %llu in %s %s file",
+ rli->until_pos(), rli->until_name(),
+ rli->until_condition ==
+ Relay_log_info::UNTIL_MASTER_POS ?
+ "binlog" : "relaylog");
+ };
/* Thread stopped. Print the current replication position to the log */
{
StringBuffer<100> tmp;