diff options
Diffstat (limited to 'sql/slave.cc')
-rw-r--r-- | sql/slave.cc | 54 |
1 files changed, 33 insertions, 21 deletions
diff --git a/sql/slave.cc b/sql/slave.cc index 359f4f8af9c..ae335a24811 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -623,8 +623,7 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock) if (thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL)) { DBUG_PRINT("info",("Terminating SQL thread")); - if (opt_slave_parallel_threads > 0 && - mi->rli.abort_slave && mi->rli.stop_for_until) + if (mi->using_parallel() && mi->rli.abort_slave && mi->rli.stop_for_until) { mi->rli.stop_for_until= false; mi->rli.parallel.stop_during_until(); @@ -2732,8 +2731,7 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full, else { idle= mi->rli.sql_thread_caught_up; - if (opt_slave_parallel_threads > 0 && idle && - !mi->rli.parallel.workers_idle()) + if (mi->using_parallel() && idle && !mi->rli.parallel.workers_idle()) idle= false; } if (idle) @@ -3534,7 +3532,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli, the user might be surprised to see a claim that the slave is up to date long before those queued events are actually executed. */ - if (opt_slave_parallel_threads == 0 && + if (!rli->mi->using_parallel() && !(ev->is_artificial_event() || ev->is_relay_log_event() || (ev->when == 0))) { rli->last_master_timestamp= ev->when + (time_t) ev->exec_time; @@ -3585,9 +3583,16 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli, update_state_of_relay_log(rli, ev); - if (opt_slave_parallel_threads > 0) + if (rli->mi->using_parallel()) { int res= rli->parallel.do_event(serial_rgi, ev, event_size); + /* + In parallel replication, we need to update the relay log position + immediately so that it will be the correct position from which to + read the next event. + */ + if (res == 0) + rli->event_relay_log_pos= rli->future_event_relay_log_pos; if (res >= 0) DBUG_RETURN(res); /* @@ -4585,7 +4590,21 @@ pthread_handler_t handle_slave_sql(void *arg) serial_rgi->gtid_sub_id= 0; serial_rgi->gtid_pending= false; - rli->gtid_skip_flag = GTID_SKIP_NOT; + if (mi->using_gtid != Master_info::USE_GTID_NO && mi->using_parallel() && + rli->restart_gtid_pos.count() > 0) + { + /* + With parallel replication in GTID mode, if we have a multi-domain GTID + position, we need to start some way back in the relay log and skip any + GTID that was already applied before. Since event groups can be split + across multiple relay logs, this earlier starting point may be in the + middle of an already applied event group, so we also need to skip any + remaining part of such group. + */ + rli->gtid_skip_flag = GTID_SKIP_TRANSACTION; + } + else + rli->gtid_skip_flag = GTID_SKIP_NOT; if (init_relay_log_pos(rli, rli->group_relay_log_name, rli->group_relay_log_pos, @@ -4594,11 +4613,11 @@ pthread_handler_t handle_slave_sql(void *arg) { rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL, "Error initializing relay log position: %s", errmsg); - goto err; + goto err_before_start; } rli->reset_inuse_relaylog(); if (rli->alloc_inuse_relaylog(rli->group_relay_log_name)) - goto err; + goto err_before_start; strcpy(rli->future_event_master_log_name, rli->group_master_log_name); THD_CHECK_SENTRY(thd); @@ -4780,7 +4799,8 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME, } } - if (opt_slave_parallel_threads > 0) + err: + if (mi->using_parallel()) rli->parallel.wait_for_done(thd, rli); /* Thread stopped. Print the current replication position to the log */ @@ -4799,15 +4819,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME, tmp.c_ptr_safe()); } - err: - - /* - Once again, in case we aborted with an error and skipped the first one. - (We want the first one to be before the printout of stop position to - get the correct position printed.) - */ - if (opt_slave_parallel_threads > 0) - rli->parallel.wait_for_done(thd, rli); + err_before_start: /* Some events set some playgrounds, which won't be cleared because thread @@ -4830,7 +4842,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME, ulong domain_count; flush_relay_log_info(rli); - if (opt_slave_parallel_threads > 0) + if (mi->using_parallel()) { /* In parallel replication GTID mode, we may stop with different domains @@ -6548,7 +6560,7 @@ static Log_event* next_event(rpl_group_info *rgi, ulonglong *event_size) llstr(my_b_tell(cur_log),llbuf1), llstr(rli->event_relay_log_pos,llbuf2))); DBUG_ASSERT(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE); - DBUG_ASSERT(opt_slave_parallel_threads > 0 || + DBUG_ASSERT(rli->mi->using_parallel() || my_b_tell(cur_log) == rli->event_relay_log_pos); } #endif |