summaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorKristian Nielsen <knielsen@knielsen-hq.org>2016-10-16 23:48:59 +0200
committerKristian Nielsen <knielsen@knielsen-hq.org>2016-10-16 23:48:59 +0200
commitc4776d3b2abfd5d1cf1d4d093067718e66a279fc (patch)
treee823d9b586d14698ab36383a12fbfadd4d03ea72 /sql
parented4a6f12b3db90de2168273871e7153fb458aee6 (diff)
parent50f19ca8099994e992e1b411c7c05287855a7bdd (diff)
downloadmariadb-git-c4776d3b2abfd5d1cf1d4d093067718e66a279fc.tar.gz
Merge "remove unnecessary global mutex in parallel replication" into 10.1.0.1
Diffstat (limited to 'sql')
-rw-r--r--sql/rpl_parallel.cc4
-rw-r--r--sql/slave.cc130
-rw-r--r--sql/slave.h5
3 files changed, 96 insertions, 43 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 761471fc1cb..c507a132374 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -47,9 +47,7 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev,
if (!(ev->is_artificial_event() || ev->is_relay_log_event() ||
(ev->when == 0)))
rgi->last_master_timestamp= ev->when + (time_t)ev->exec_time;
- mysql_mutex_lock(&rli->data_lock);
- /* Mutex will be released in apply_event_and_update_pos(). */
- err= apply_event_and_update_pos(ev, thd, rgi, rpt);
+ err= apply_event_and_update_pos_for_parallel(ev, thd, rgi);
thread_safe_increment64(&rli->executed_entries);
/* ToDo: error handling. */
diff --git a/sql/slave.cc b/sql/slave.cc
index 4981819577f..a55d26b1aa0 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -3377,39 +3377,17 @@ has_temporary_error(THD *thd)
}
-/**
- Applies the given event and advances the relay log position.
-
- In essence, this function does:
-
- @code
- ev->apply_event(rli);
- ev->update_pos(rli);
- @endcode
-
- But it also does some maintainance, such as skipping events if
- needed and reporting errors.
-
- If the @c skip flag is set, then it is tested whether the event
- should be skipped, by looking at the slave_skip_counter and the
- server id. The skip flag should be set when calling this from a
- replication thread but not set when executing an explicit BINLOG
- statement.
-
- @retval 0 OK.
-
- @retval 1 Error calling ev->apply_event().
+/*
+ First half of apply_event_and_update_pos(), see below.
+ Setup some THD variables for applying the event.
- @retval 2 No error calling ev->apply_event(), but error calling
- ev->update_pos().
+ Split out so that it can run with rli->data_lock held in non-parallel
+ replication, but without the mutex held in the parallel case.
*/
-int apply_event_and_update_pos(Log_event* ev, THD* thd,
- rpl_group_info *rgi,
- rpl_parallel_thread *rpt)
+static int
+apply_event_and_update_pos_setup(Log_event* ev, THD* thd, rpl_group_info *rgi)
{
- int exec_res= 0;
- Relay_log_info* rli= rgi->rli;
- DBUG_ENTER("apply_event_and_update_pos");
+ DBUG_ENTER("apply_event_and_update_pos_setup");
DBUG_PRINT("exec_event",("%s(type_code: %d; server_id: %d)",
ev->get_type_str(), ev->get_type_code(),
@@ -3459,13 +3437,23 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd,
(ev->flags & LOG_EVENT_SKIP_REPLICATION_F ? OPTION_SKIP_REPLICATION : 0);
ev->thd = thd; // because up to this point, ev->thd == 0
- int reason= ev->shall_skip(rgi);
- if (reason == Log_event::EVENT_SKIP_COUNT)
- {
- DBUG_ASSERT(rli->slave_skip_counter > 0);
- rli->slave_skip_counter--;
- }
- mysql_mutex_unlock(&rli->data_lock);
+ DBUG_RETURN(ev->shall_skip(rgi));
+}
+
+
+/*
+ Second half of apply_event_and_update_pos(), see below.
+
+ Do the actual event apply (or skip), and position update.
+ */
+static int
+apply_event_and_update_pos_apply(Log_event* ev, THD* thd, rpl_group_info *rgi,
+ int reason)
+{
+ int exec_res= 0;
+ Relay_log_info* rli= rgi->rli;
+
+ DBUG_ENTER("apply_event_and_update_pos_apply");
DBUG_EXECUTE_IF("inject_slave_sql_before_apply_event",
{
DBUG_ASSERT(!debug_sync_set_action
@@ -3554,6 +3542,72 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd,
/**
+ Applies the given event and advances the relay log position.
+
+ In essence, this function does:
+
+ @code
+ ev->apply_event(rli);
+ ev->update_pos(rli);
+ @endcode
+
+ But it also does some maintainance, such as skipping events if
+ needed and reporting errors.
+
+ If the @c skip flag is set, then it is tested whether the event
+ should be skipped, by looking at the slave_skip_counter and the
+ server id. The skip flag should be set when calling this from a
+ replication thread but not set when executing an explicit BINLOG
+ statement.
+
+ @retval 0 OK.
+
+ @retval 1 Error calling ev->apply_event().
+
+ @retval 2 No error calling ev->apply_event(), but error calling
+ ev->update_pos().
+
+ This function is only used in non-parallel replication, where it is called
+ with rli->data_lock held; this lock is released during this function.
+*/
+int
+apply_event_and_update_pos(Log_event* ev, THD* thd, rpl_group_info *rgi)
+{
+ Relay_log_info* rli= rgi->rli;
+ mysql_mutex_assert_owner(&rli->data_lock);
+ int reason= apply_event_and_update_pos_setup(ev, thd, rgi);
+ if (reason == Log_event::EVENT_SKIP_COUNT)
+ {
+ DBUG_ASSERT(rli->slave_skip_counter > 0);
+ rli->slave_skip_counter--;
+ }
+ mysql_mutex_unlock(&rli->data_lock);
+ return apply_event_and_update_pos_apply(ev, thd, rgi, reason);
+}
+
+
+/*
+ The version of above apply_event_and_update_pos() used in parallel
+ replication. Unlike the non-parallel case, this function is called without
+ rli->data_lock held.
+*/
+int
+apply_event_and_update_pos_for_parallel(Log_event* ev, THD* thd,
+ rpl_group_info *rgi)
+{
+ Relay_log_info* rli= rgi->rli;
+ mysql_mutex_assert_not_owner(&rli->data_lock);
+ int reason= apply_event_and_update_pos_setup(ev, thd, rgi);
+ /*
+ In parallel replication, sql_slave_skip_counter is handled in the SQL
+ driver thread, so 23 should never see EVENT_SKIP_COUNT here.
+ */
+ DBUG_ASSERT(reason != Log_event::EVENT_SKIP_COUNT);
+ return apply_event_and_update_pos_apply(ev, thd, rgi, reason);
+}
+
+
+/**
Keep the relay log transaction state up to date.
The state reflects how things are after the given event, that has just been
@@ -3803,7 +3857,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
serial_rgi->future_event_relay_log_pos= rli->future_event_relay_log_pos;
serial_rgi->event_relay_log_name= rli->event_relay_log_name;
serial_rgi->event_relay_log_pos= rli->event_relay_log_pos;
- exec_res= apply_event_and_update_pos(ev, thd, serial_rgi, NULL);
+ exec_res= apply_event_and_update_pos(ev, thd, serial_rgi);
#ifdef WITH_WSREP
WSREP_DEBUG("apply_event_and_update_pos() result: %d", exec_res);
diff --git a/sql/slave.h b/sql/slave.h
index e8a925ce560..a78ae4cff6f 100644
--- a/sql/slave.h
+++ b/sql/slave.h
@@ -243,8 +243,9 @@ void set_slave_thread_default_charset(THD *thd, rpl_group_info *rgi);
int rotate_relay_log(Master_info* mi);
int has_temporary_error(THD *thd);
int apply_event_and_update_pos(Log_event* ev, THD* thd,
- struct rpl_group_info *rgi,
- rpl_parallel_thread *rpt);
+ struct rpl_group_info *rgi);
+int apply_event_and_update_pos_for_parallel(Log_event* ev, THD* thd,
+ struct rpl_group_info *rgi);
pthread_handler_t handle_slave_io(void *arg);
void slave_output_error_info(rpl_group_info *rgi, THD *thd);