summaryrefslogtreecommitdiff
path: root/sql/slave.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/slave.cc')
-rw-r--r--sql/slave.cc54
1 files changed, 33 insertions, 21 deletions
diff --git a/sql/slave.cc b/sql/slave.cc
index 359f4f8af9c..ae335a24811 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -623,8 +623,7 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock)
if (thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL))
{
DBUG_PRINT("info",("Terminating SQL thread"));
- if (opt_slave_parallel_threads > 0 &&
- mi->rli.abort_slave && mi->rli.stop_for_until)
+ if (mi->using_parallel() && mi->rli.abort_slave && mi->rli.stop_for_until)
{
mi->rli.stop_for_until= false;
mi->rli.parallel.stop_during_until();
@@ -2732,8 +2731,7 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full,
else
{
idle= mi->rli.sql_thread_caught_up;
- if (opt_slave_parallel_threads > 0 && idle &&
- !mi->rli.parallel.workers_idle())
+ if (mi->using_parallel() && idle && !mi->rli.parallel.workers_idle())
idle= false;
}
if (idle)
@@ -3534,7 +3532,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
the user might be surprised to see a claim that the slave is up to date
long before those queued events are actually executed.
*/
- if (opt_slave_parallel_threads == 0 &&
+ if (!rli->mi->using_parallel() &&
!(ev->is_artificial_event() || ev->is_relay_log_event() || (ev->when == 0)))
{
rli->last_master_timestamp= ev->when + (time_t) ev->exec_time;
@@ -3585,9 +3583,16 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
update_state_of_relay_log(rli, ev);
- if (opt_slave_parallel_threads > 0)
+ if (rli->mi->using_parallel())
{
int res= rli->parallel.do_event(serial_rgi, ev, event_size);
+ /*
+ In parallel replication, we need to update the relay log position
+ immediately so that it will be the correct position from which to
+ read the next event.
+ */
+ if (res == 0)
+ rli->event_relay_log_pos= rli->future_event_relay_log_pos;
if (res >= 0)
DBUG_RETURN(res);
/*
@@ -4585,7 +4590,21 @@ pthread_handler_t handle_slave_sql(void *arg)
serial_rgi->gtid_sub_id= 0;
serial_rgi->gtid_pending= false;
- rli->gtid_skip_flag = GTID_SKIP_NOT;
+ if (mi->using_gtid != Master_info::USE_GTID_NO && mi->using_parallel() &&
+ rli->restart_gtid_pos.count() > 0)
+ {
+ /*
+ With parallel replication in GTID mode, if we have a multi-domain GTID
+ position, we need to start some way back in the relay log and skip any
+ GTID that was already applied before. Since event groups can be split
+ across multiple relay logs, this earlier starting point may be in the
+ middle of an already applied event group, so we also need to skip any
+ remaining part of such group.
+ */
+ rli->gtid_skip_flag = GTID_SKIP_TRANSACTION;
+ }
+ else
+ rli->gtid_skip_flag = GTID_SKIP_NOT;
if (init_relay_log_pos(rli,
rli->group_relay_log_name,
rli->group_relay_log_pos,
@@ -4594,11 +4613,11 @@ pthread_handler_t handle_slave_sql(void *arg)
{
rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL,
"Error initializing relay log position: %s", errmsg);
- goto err;
+ goto err_before_start;
}
rli->reset_inuse_relaylog();
if (rli->alloc_inuse_relaylog(rli->group_relay_log_name))
- goto err;
+ goto err_before_start;
strcpy(rli->future_event_master_log_name, rli->group_master_log_name);
THD_CHECK_SENTRY(thd);
@@ -4780,7 +4799,8 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
}
}
- if (opt_slave_parallel_threads > 0)
+ err:
+ if (mi->using_parallel())
rli->parallel.wait_for_done(thd, rli);
/* Thread stopped. Print the current replication position to the log */
@@ -4799,15 +4819,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
tmp.c_ptr_safe());
}
- err:
-
- /*
- Once again, in case we aborted with an error and skipped the first one.
- (We want the first one to be before the printout of stop position to
- get the correct position printed.)
- */
- if (opt_slave_parallel_threads > 0)
- rli->parallel.wait_for_done(thd, rli);
+ err_before_start:
/*
Some events set some playgrounds, which won't be cleared because thread
@@ -4830,7 +4842,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
ulong domain_count;
flush_relay_log_info(rli);
- if (opt_slave_parallel_threads > 0)
+ if (mi->using_parallel())
{
/*
In parallel replication GTID mode, we may stop with different domains
@@ -6548,7 +6560,7 @@ static Log_event* next_event(rpl_group_info *rgi, ulonglong *event_size)
llstr(my_b_tell(cur_log),llbuf1),
llstr(rli->event_relay_log_pos,llbuf2)));
DBUG_ASSERT(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE);
- DBUG_ASSERT(opt_slave_parallel_threads > 0 ||
+ DBUG_ASSERT(rli->mi->using_parallel() ||
my_b_tell(cur_log) == rli->event_relay_log_pos);
}
#endif