diff options
author | Kristian Nielsen <knielsen@knielsen-hq.org> | 2016-10-16 23:48:59 +0200 |
---|---|---|
committer | Kristian Nielsen <knielsen@knielsen-hq.org> | 2016-10-16 23:48:59 +0200 |
commit | c4776d3b2abfd5d1cf1d4d093067718e66a279fc (patch) | |
tree | e823d9b586d14698ab36383a12fbfadd4d03ea72 /sql | |
parent | ed4a6f12b3db90de2168273871e7153fb458aee6 (diff) | |
parent | 50f19ca8099994e992e1b411c7c05287855a7bdd (diff) | |
download | mariadb-git-c4776d3b2abfd5d1cf1d4d093067718e66a279fc.tar.gz |
Merge "remove unnecessary global mutex in parallel replication" into 10.1.0.1
Diffstat (limited to 'sql')
-rw-r--r-- | sql/rpl_parallel.cc | 4 | ||||
-rw-r--r-- | sql/slave.cc | 130 | ||||
-rw-r--r-- | sql/slave.h | 5 |
3 files changed, 96 insertions, 43 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 761471fc1cb..c507a132374 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -47,9 +47,7 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev, if (!(ev->is_artificial_event() || ev->is_relay_log_event() || (ev->when == 0))) rgi->last_master_timestamp= ev->when + (time_t)ev->exec_time; - mysql_mutex_lock(&rli->data_lock); - /* Mutex will be released in apply_event_and_update_pos(). */ - err= apply_event_and_update_pos(ev, thd, rgi, rpt); + err= apply_event_and_update_pos_for_parallel(ev, thd, rgi); thread_safe_increment64(&rli->executed_entries); /* ToDo: error handling. */ diff --git a/sql/slave.cc b/sql/slave.cc index 4981819577f..a55d26b1aa0 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -3377,39 +3377,17 @@ has_temporary_error(THD *thd) } -/** - Applies the given event and advances the relay log position. - - In essence, this function does: - - @code - ev->apply_event(rli); - ev->update_pos(rli); - @endcode - - But it also does some maintainance, such as skipping events if - needed and reporting errors. - - If the @c skip flag is set, then it is tested whether the event - should be skipped, by looking at the slave_skip_counter and the - server id. The skip flag should be set when calling this from a - replication thread but not set when executing an explicit BINLOG - statement. - - @retval 0 OK. - - @retval 1 Error calling ev->apply_event(). +/* + First half of apply_event_and_update_pos(), see below. + Setup some THD variables for applying the event. - @retval 2 No error calling ev->apply_event(), but error calling - ev->update_pos(). + Split out so that it can run with rli->data_lock held in non-parallel + replication, but without the mutex held in the parallel case. */ -int apply_event_and_update_pos(Log_event* ev, THD* thd, - rpl_group_info *rgi, - rpl_parallel_thread *rpt) +static int +apply_event_and_update_pos_setup(Log_event* ev, THD* thd, rpl_group_info *rgi) { - int exec_res= 0; - Relay_log_info* rli= rgi->rli; - DBUG_ENTER("apply_event_and_update_pos"); + DBUG_ENTER("apply_event_and_update_pos_setup"); DBUG_PRINT("exec_event",("%s(type_code: %d; server_id: %d)", ev->get_type_str(), ev->get_type_code(), @@ -3459,13 +3437,23 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, (ev->flags & LOG_EVENT_SKIP_REPLICATION_F ? OPTION_SKIP_REPLICATION : 0); ev->thd = thd; // because up to this point, ev->thd == 0 - int reason= ev->shall_skip(rgi); - if (reason == Log_event::EVENT_SKIP_COUNT) - { - DBUG_ASSERT(rli->slave_skip_counter > 0); - rli->slave_skip_counter--; - } - mysql_mutex_unlock(&rli->data_lock); + DBUG_RETURN(ev->shall_skip(rgi)); +} + + +/* + Second half of apply_event_and_update_pos(), see below. + + Do the actual event apply (or skip), and position update. + */ +static int +apply_event_and_update_pos_apply(Log_event* ev, THD* thd, rpl_group_info *rgi, + int reason) +{ + int exec_res= 0; + Relay_log_info* rli= rgi->rli; + + DBUG_ENTER("apply_event_and_update_pos_apply"); DBUG_EXECUTE_IF("inject_slave_sql_before_apply_event", { DBUG_ASSERT(!debug_sync_set_action @@ -3554,6 +3542,72 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, /** + Applies the given event and advances the relay log position. + + In essence, this function does: + + @code + ev->apply_event(rli); + ev->update_pos(rli); + @endcode + + But it also does some maintainance, such as skipping events if + needed and reporting errors. + + If the @c skip flag is set, then it is tested whether the event + should be skipped, by looking at the slave_skip_counter and the + server id. The skip flag should be set when calling this from a + replication thread but not set when executing an explicit BINLOG + statement. + + @retval 0 OK. + + @retval 1 Error calling ev->apply_event(). + + @retval 2 No error calling ev->apply_event(), but error calling + ev->update_pos(). + + This function is only used in non-parallel replication, where it is called + with rli->data_lock held; this lock is released during this function. +*/ +int +apply_event_and_update_pos(Log_event* ev, THD* thd, rpl_group_info *rgi) +{ + Relay_log_info* rli= rgi->rli; + mysql_mutex_assert_owner(&rli->data_lock); + int reason= apply_event_and_update_pos_setup(ev, thd, rgi); + if (reason == Log_event::EVENT_SKIP_COUNT) + { + DBUG_ASSERT(rli->slave_skip_counter > 0); + rli->slave_skip_counter--; + } + mysql_mutex_unlock(&rli->data_lock); + return apply_event_and_update_pos_apply(ev, thd, rgi, reason); +} + + +/* + The version of above apply_event_and_update_pos() used in parallel + replication. Unlike the non-parallel case, this function is called without + rli->data_lock held. +*/ +int +apply_event_and_update_pos_for_parallel(Log_event* ev, THD* thd, + rpl_group_info *rgi) +{ + Relay_log_info* rli= rgi->rli; + mysql_mutex_assert_not_owner(&rli->data_lock); + int reason= apply_event_and_update_pos_setup(ev, thd, rgi); + /* + In parallel replication, sql_slave_skip_counter is handled in the SQL + driver thread, so 23 should never see EVENT_SKIP_COUNT here. + */ + DBUG_ASSERT(reason != Log_event::EVENT_SKIP_COUNT); + return apply_event_and_update_pos_apply(ev, thd, rgi, reason); +} + + +/** Keep the relay log transaction state up to date. The state reflects how things are after the given event, that has just been @@ -3803,7 +3857,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli, serial_rgi->future_event_relay_log_pos= rli->future_event_relay_log_pos; serial_rgi->event_relay_log_name= rli->event_relay_log_name; serial_rgi->event_relay_log_pos= rli->event_relay_log_pos; - exec_res= apply_event_and_update_pos(ev, thd, serial_rgi, NULL); + exec_res= apply_event_and_update_pos(ev, thd, serial_rgi); #ifdef WITH_WSREP WSREP_DEBUG("apply_event_and_update_pos() result: %d", exec_res); diff --git a/sql/slave.h b/sql/slave.h index e8a925ce560..a78ae4cff6f 100644 --- a/sql/slave.h +++ b/sql/slave.h @@ -243,8 +243,9 @@ void set_slave_thread_default_charset(THD *thd, rpl_group_info *rgi); int rotate_relay_log(Master_info* mi); int has_temporary_error(THD *thd); int apply_event_and_update_pos(Log_event* ev, THD* thd, - struct rpl_group_info *rgi, - rpl_parallel_thread *rpt); + struct rpl_group_info *rgi); +int apply_event_and_update_pos_for_parallel(Log_event* ev, THD* thd, + struct rpl_group_info *rgi); pthread_handler_t handle_slave_io(void *arg); void slave_output_error_info(rpl_group_info *rgi, THD *thd); |