summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Widenius <monty@askmonty.org>2013-10-15 00:17:16 +0300
committerMichael Widenius <monty@askmonty.org>2013-10-15 00:17:16 +0300
commit5748eb3ec63adee035ec5305796bdbc67a654158 (patch)
treedbe4b5bff464a97acfd4cc99f49086b1a9ec227a
parent2842f6b5dc254c82aa3dc976cd5bd3645dc82a60 (diff)
downloadmariadb-git-5748eb3ec63adee035ec5305796bdbc67a654158.tar.gz
Moved the remaining variables, that depends on sql execution, from Relay_log_info to rpl_group_info:
-row_stmt_start_timestamp -last_event_start_time -long_find_row_note -trans_retries Added slave_executed_entries_lock to protect rli->executed_entries Added primitives for thread safe 64 bit increment Update rli->executed_entries when event has executed, not when event has been sent to sql execution thread sql/log_event.cc: row_stmt_start and long_find_row_note is now in rpl_group_info sql/mysqld.cc: Added slave_executed_entries_lock to protect rli->executed_entries sql/mysqld.h: Added slave_executed_entries_lock to protect rli->executed_entries Added primitives for thread safe 64 bit increment sql/rpl_parallel.cc: Update rli->executed_entries when event has executed, not when event has been sent to sql execution thread sql/rpl_rli.cc: Moved row_stmt_start_timestamp, last_event_start_time and long_find_row_note from Relay_log_info to rpl_group_info sql/rpl_rli.h: Moved trans_retries, row_stmt_start_timestamp, last_event_start_time and long_find_row_note from Relay_log_info to rpl_group_info sql/slave.cc: Use rgi for trans_retries and last_event_start_time Update rli->executed_entries when event has executed, not when event has been sent to sql execution thread Reset trans_retries when object is created
-rw-r--r--sql/log_event.cc12
-rw-r--r--sql/log_event_old.cc2
-rw-r--r--sql/mysqld.cc3
-rw-r--r--sql/mysqld.h15
-rw-r--r--sql/rpl_parallel.cc26
-rw-r--r--sql/rpl_rli.cc10
-rw-r--r--sql/rpl_rli.h119
-rw-r--r--sql/slave.cc43
8 files changed, 134 insertions, 96 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 59fc856c3f2..55166b65df4 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -9293,7 +9293,7 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
set the initial time of this ROWS statement if it was not done
before in some other ROWS event.
*/
- const_cast<Relay_log_info*>(rli)->set_row_stmt_start_timestamp();
+ rgi->set_row_stmt_start_timestamp();
while (error == 0 && m_curr_row < m_rows_end)
{
@@ -11133,13 +11133,13 @@ static inline
void issue_long_find_row_warning(Log_event_type type,
const char *table_name,
bool is_index_scan,
- const Relay_log_info *rli)
+ rpl_group_info *rgi)
{
if ((global_system_variables.log_warnings > 1 &&
- !const_cast<Relay_log_info*>(rli)->is_long_find_row_note_printed()))
+ !rgi->is_long_find_row_note_printed()))
{
time_t now= my_time(0);
- time_t stmt_ts= const_cast<Relay_log_info*>(rli)->get_row_stmt_start_timestamp();
+ time_t stmt_ts= rgi->get_row_stmt_start_timestamp();
DBUG_EXECUTE_IF("inject_long_find_row_note",
stmt_ts-=(LONG_FIND_ROW_THRESHOLD*2););
@@ -11148,7 +11148,7 @@ void issue_long_find_row_warning(Log_event_type type,
if (delta > LONG_FIND_ROW_THRESHOLD)
{
- const_cast<Relay_log_info*>(rli)->set_long_find_row_note_printed();
+ rgi->set_long_find_row_note_printed();
const char* evt_type= type == DELETE_ROWS_EVENT ? " DELETE" : "n UPDATE";
const char* scan_type= is_index_scan ? "scanning an index" : "scanning the table";
@@ -11477,7 +11477,7 @@ int Rows_log_event::find_row(rpl_group_info *rgi)
end:
if (is_table_scan || is_index_scan)
issue_long_find_row_warning(get_type_code(), m_table->alias.c_ptr(),
- is_index_scan, rgi->rli);
+ is_index_scan, rgi);
table->default_column_bitmaps();
DBUG_RETURN(error);
}
diff --git a/sql/log_event_old.cc b/sql/log_event_old.cc
index b4f28abcf2b..174219a8e72 100644
--- a/sql/log_event_old.cc
+++ b/sql/log_event_old.cc
@@ -1740,7 +1740,7 @@ int Old_rows_log_event::do_apply_event(rpl_group_info *rgi)
problem. When WL#2975 is implemented, just remove the member
Relay_log_info::last_event_start_time and all its occurrences.
*/
- const_cast<Relay_log_info*>(rli)->last_event_start_time= my_time(0);
+ rgi->last_event_start_time= my_time(0);
}
if (get_flags(STMT_END_F))
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index 9f1d9e48b1c..1e7deef8d89 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -492,6 +492,7 @@ my_atomic_rwlock_t global_query_id_lock;
my_atomic_rwlock_t thread_running_lock;
my_atomic_rwlock_t thread_count_lock;
my_atomic_rwlock_t statistics_lock;
+my_atomic_rwlock_t slave_executed_entries_lock;
ulong aborted_threads, aborted_connects;
ulong delayed_insert_timeout, delayed_insert_limit, delayed_queue_size;
ulong delayed_insert_threads, delayed_insert_writes, delayed_rows_in_use;
@@ -1939,6 +1940,7 @@ void clean_up(bool print_message)
my_atomic_rwlock_destroy(&thread_running_lock);
my_atomic_rwlock_destroy(&thread_count_lock);
my_atomic_rwlock_destroy(&statistics_lock);
+ my_atomic_rwlock_destroy(&slave_executed_entries_lock);
free_charsets();
mysql_mutex_lock(&LOCK_thread_count);
DBUG_PRINT("quit", ("got thread count lock"));
@@ -7550,6 +7552,7 @@ static int mysql_init_variables(void)
my_atomic_rwlock_init(&thread_running_lock);
my_atomic_rwlock_init(&thread_count_lock);
my_atomic_rwlock_init(&statistics_lock);
+ my_atomic_rwlock_init(slave_executed_entries_lock);
strmov(server_version, MYSQL_SERVER_VERSION);
threads.empty();
thread_cache.empty();
diff --git a/sql/mysqld.h b/sql/mysqld.h
index 0bd3687f4fb..e45b48f0332 100644
--- a/sql/mysqld.h
+++ b/sql/mysqld.h
@@ -367,6 +367,7 @@ extern mysql_cond_t COND_manager;
extern int32 thread_running;
extern int32 thread_count;
extern my_atomic_rwlock_t thread_running_lock, thread_count_lock;
+extern my_atomic_rwlock_t slave_executed_entries_lock;
extern char *opt_ssl_ca, *opt_ssl_capath, *opt_ssl_cert, *opt_ssl_cipher,
*opt_ssl_key;
@@ -507,6 +508,20 @@ inline void thread_safe_decrement32(int32 *value, my_atomic_rwlock_t *lock)
my_atomic_rwlock_wrunlock(lock);
}
+inline void thread_safe_increment64(int64 *value, my_atomic_rwlock_t *lock)
+{
+ my_atomic_rwlock_wrlock(lock);
+ (void) my_atomic_add64(value, 1);
+ my_atomic_rwlock_wrunlock(lock);
+}
+
+inline void thread_safe_decrement64(int64 *value, my_atomic_rwlock_t *lock)
+{
+ my_atomic_rwlock_wrlock(lock);
+ (void) my_atomic_add64(value, -1);
+ my_atomic_rwlock_wrunlock(lock);
+}
+
inline void
inc_thread_running()
{
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index c6411b01e60..19ae2a35339 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -9,10 +9,6 @@
ToDo list:
- - Review every field in Relay_log_info, and all code that accesses it.
- Split out the necessary parts into rpl_group_info, to avoid conflicts
- between parallel execution of events. (Such as deferred events ...)
-
- Error handling. If we fail in one of multiple parallel executions, we
need to make a best effort to complete prior transactions and roll back
following transactions, so slave binlog position will be correct.
@@ -43,10 +39,11 @@
slave rolls back the transaction; parallel execution needs to be able
to deal with this wrt. commit_orderer and such.
- - We should fail if we connect to the master with opt_slave_parallel_threads
- greater than zero and master does not support GTID. Just to avoid a bunch
- of potential problems, we won't be able to do any parallel replication
- in this case anyway.
+ - We should notice if the master doesn't support GTID, and then run in
+ single threaded mode against that master. This is needed to be able to
+ support multi-master-replication with old and new masters.
+
+ - Retry of failed transactions is not yet implemented for the parallel case.
*/
struct rpl_parallel_thread_pool global_rpl_thread_pool;
@@ -56,7 +53,7 @@ static int
rpt_handle_event(rpl_parallel_thread::queued_event *qev,
struct rpl_parallel_thread *rpt)
{
- int err;
+ int err __attribute__((unused));
rpl_group_info *rgi= qev->rgi;
Relay_log_info *rli= rgi->rli;
THD *thd= rgi->thd;
@@ -69,6 +66,9 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev,
qev->ev->thd= thd;
err= apply_event_and_update_pos(qev->ev, thd, rgi, rpt);
thd->rgi_slave= NULL;
+
+ thread_safe_increment64(&rli->executed_entries,
+ &slave_executed_entries_lock);
/* ToDo: error handling. */
return err;
}
@@ -617,7 +617,10 @@ rpl_parallel::wait_for_done()
/*
do_event() is executed by the sql_driver_thd thread.
- It's main purpose is to find a thread that can exectue the query.
+ It's main purpose is to find a thread that can execute the query.
+
+ @retval false ok, event was accepted
+ @retval true error
*/
bool
@@ -643,7 +646,10 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev)
rli->abort_slave)
sql_thread_stopping= true;
if (sql_thread_stopping)
+ {
+ /* QQ: Need a better comment why we return false here */
return false;
+ }
if (!(qev= (rpl_parallel_thread::queued_event *)my_malloc(sizeof(*qev),
MYF(0))))
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index ae2b7558285..53481d2efaf 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -59,8 +59,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
abort_pos_wait(0), slave_run_id(0), sql_driver_thd(),
inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE),
until_log_pos(0), retried_trans(0), executed_entries(0),
- last_event_start_time(0), m_flags(0),
- row_stmt_start_timestamp(0), long_find_row_note_printed(false)
+ m_flags(0)
{
DBUG_ENTER("Relay_log_info::Relay_log_info");
@@ -1420,7 +1419,8 @@ rpl_group_info::rpl_group_info(Relay_log_info *rli_)
: rli(rli_), thd(0), gtid_sub_id(0), wait_commit_sub_id(0),
wait_commit_group_info(0), wait_start_sub_id(0), parallel_entry(0),
deferred_events(NULL), m_annotate_event(0), tables_to_lock(0),
- tables_to_lock_count(0)
+ tables_to_lock_count(0), trans_retries(0), last_event_start_time(0),
+ row_stmt_start_timestamp(0), long_find_row_note_printed(false)
{
bzero(&current_gtid, sizeof(current_gtid));
mysql_mutex_init(key_rpl_group_info_sleep_lock, &sleep_lock,
@@ -1551,8 +1551,8 @@ void rpl_group_info::cleanup_context(THD *thd, bool error)
- timestamp
- flag that decides whether the slave prints or not
*/
- rli->reset_row_stmt_start_timestamp();
- rli->unset_long_find_row_note_printed();
+ reset_row_stmt_start_timestamp();
+ unset_long_find_row_note_printed();
DBUG_VOID_RETURN;
}
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index 9e96fb8e72c..68cd051be2a 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -298,14 +298,16 @@ public:
char cached_charset[6];
/*
- trans_retries varies between 0 to slave_transaction_retries and counts how
- many times the slave has retried the present transaction; gets reset to 0
- when the transaction finally succeeds. retried_trans is a cumulative
- counter: how many times the slave has retried a transaction (any) since
- slave started.
+ retried_trans is a cumulative counter: how many times the slave
+ has retried a transaction (any) since slave started.
+ Protected by data_lock.
*/
- ulong trans_retries, retried_trans;
- ulong executed_entries; /* For SLAVE STATUS */
+ ulong retried_trans;
+ /*
+ Number of executed events for SLAVE STATUS.
+ Protected by slave_executed_entries_lock
+ */
+ int64 executed_entries;
/*
If the end of the hot relay log is made of master's events ignored by the
@@ -381,13 +383,6 @@ public:
void cached_charset_invalidate();
bool cached_charset_compare(char *charset) const;
- /*
- Used to defer stopping the SQL thread to give it a chance
- to finish up the current group of events.
- The timestamp is set and reset in @c sql_slave_killed().
- */
- time_t last_event_start_time;
-
/**
Helper function to do after statement completion.
@@ -462,39 +457,6 @@ public:
m_flags&= ~flag;
}
- time_t get_row_stmt_start_timestamp()
- {
- return row_stmt_start_timestamp;
- }
-
- time_t set_row_stmt_start_timestamp()
- {
- if (row_stmt_start_timestamp == 0)
- row_stmt_start_timestamp= my_time(0);
-
- return row_stmt_start_timestamp;
- }
-
- void reset_row_stmt_start_timestamp()
- {
- row_stmt_start_timestamp= 0;
- }
-
- void set_long_find_row_note_printed()
- {
- long_find_row_note_printed= true;
- }
-
- void unset_long_find_row_note_printed()
- {
- long_find_row_note_printed= false;
- }
-
- bool is_long_find_row_note_printed()
- {
- return long_find_row_note_printed;
- }
-
private:
/*
@@ -504,13 +466,6 @@ private:
relay log.
*/
uint32 m_flags;
-
- /*
- Runtime state for printing a note when slave is taking
- too long while processing a row event.
- */
- time_t row_stmt_start_timestamp;
- bool long_find_row_note_printed;
};
@@ -592,6 +547,29 @@ struct rpl_group_info
mysql_mutex_t sleep_lock;
mysql_cond_t sleep_cond;
+ /*
+ trans_retries varies between 0 to slave_transaction_retries and counts how
+ many times the slave has retried the present transaction; gets reset to 0
+ when the transaction finally succeeds.
+ */
+ ulong trans_retries;
+
+ /*
+ Used to defer stopping the SQL thread to give it a chance
+ to finish up the current group of events.
+ The timestamp is set and reset in @c sql_slave_killed().
+ */
+ time_t last_event_start_time;
+
+private:
+ /*
+ Runtime state for printing a note when slave is taking
+ too long while processing a row event.
+ */
+ time_t row_stmt_start_timestamp;
+ bool long_find_row_note_printed;
+public:
+
rpl_group_info(Relay_log_info *rli_);
~rpl_group_info();
@@ -673,6 +651,39 @@ struct rpl_group_info
void clear_tables_to_lock();
void cleanup_context(THD *, bool);
void slave_close_thread_tables(THD *);
+
+ time_t get_row_stmt_start_timestamp()
+ {
+ return row_stmt_start_timestamp;
+ }
+
+ time_t set_row_stmt_start_timestamp()
+ {
+ if (row_stmt_start_timestamp == 0)
+ row_stmt_start_timestamp= my_time(0);
+
+ return row_stmt_start_timestamp;
+ }
+
+ void reset_row_stmt_start_timestamp()
+ {
+ row_stmt_start_timestamp= 0;
+ }
+
+ void set_long_find_row_note_printed()
+ {
+ long_find_row_note_printed= true;
+ }
+
+ void unset_long_find_row_note_printed()
+ {
+ long_find_row_note_printed= false;
+ }
+
+ bool is_long_find_row_note_printed()
+ {
+ return long_find_row_note_printed;
+ }
};
diff --git a/sql/slave.cc b/sql/slave.cc
index 2504f723a78..61c63cd2862 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -1034,9 +1034,9 @@ static bool sql_slave_killed(rpl_group_info *rgi)
@c last_event_start_time the timer.
*/
- if (rli->last_event_start_time == 0)
- rli->last_event_start_time= my_time(0);
- ret= difftime(my_time(0), rli->last_event_start_time) <=
+ if (rgi->last_event_start_time == 0)
+ rgi->last_event_start_time= my_time(0);
+ ret= difftime(my_time(0), rgi->last_event_start_time) <=
SLAVE_WAIT_GROUP_DONE ? FALSE : TRUE;
DBUG_EXECUTE_IF("stop_slave_middle_group",
@@ -1070,7 +1070,7 @@ static bool sql_slave_killed(rpl_group_info *rgi)
}
}
if (ret)
- rli->last_event_start_time= 0;
+ rgi->last_event_start_time= 0;
DBUG_RETURN(ret);
}
@@ -3047,10 +3047,10 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd,
DBUG_PRINT("exec_event",("%s(type_code: %d; server_id: %d)",
ev->get_type_str(), ev->get_type_code(),
ev->server_id));
- DBUG_PRINT("info", ("thd->options: %s%s; rli->last_event_start_time: %lu",
+ DBUG_PRINT("info", ("thd->options: %s%s; rgi->last_event_start_time: %lu",
FLAGSTR(thd->variables.option_bits, OPTION_NOT_AUTOCOMMIT),
FLAGSTR(thd->variables.option_bits, OPTION_BEGIN),
- (ulong) rli->last_event_start_time));
+ (ulong) rgi->last_event_start_time));
/*
Execute the event to change the database and update the binary
@@ -3385,14 +3385,16 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
Note, if lock wait timeout (innodb_lock_wait_timeout exceeded)
there is no rollback since 5.0.13 (ref: manual).
We have to not only seek but also
- a) init_master_info(), to seek back to hot relay log's start for later
- (for when we will come back to this hot log after re-processing the
- possibly existing old logs where BEGIN is: check_binlog_magic() will
- then need the cache to be at position 0 (see comments at beginning of
+
+ a) init_master_info(), to seek back to hot relay log's start
+ for later (for when we will come back to this hot log after
+ re-processing the possibly existing old logs where BEGIN is:
+ check_binlog_magic() will then need the cache to be at
+ position 0 (see comments at beginning of
init_master_info()).
b) init_relay_log_pos(), because the BEGIN may be an older relay log.
*/
- if (rli->trans_retries < slave_trans_retries)
+ if (serial_rgi->trans_retries < slave_trans_retries)
{
if (init_master_info(rli->mi, 0, 0, 0, SLAVE_SQL))
sql_print_error("Failed to initialize the master info structure");
@@ -3407,15 +3409,17 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
exec_res= 0;
serial_rgi->cleanup_context(thd, 1);
/* chance for concurrent connection to get more locks */
- slave_sleep(thd, min(rli->trans_retries, MAX_SLAVE_RETRY_PAUSE),
+ slave_sleep(thd, min(serial_rgi->trans_retries,
+ MAX_SLAVE_RETRY_PAUSE),
sql_slave_killed, serial_rgi);
+ serial_rgi->trans_retries++;
mysql_mutex_lock(&rli->data_lock); // because of SHOW STATUS
- rli->trans_retries++;
rli->retried_trans++;
statistic_increment(slave_retried_transactions, LOCK_status);
mysql_mutex_unlock(&rli->data_lock);
DBUG_PRINT("info", ("Slave retries transaction "
- "rli->trans_retries: %lu", rli->trans_retries));
+ "rgi->trans_retries: %lu",
+ serial_rgi->trans_retries));
}
}
else
@@ -3434,11 +3438,13 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
event, the execution will proceed as usual; in the case of a
non-transient error, the slave will stop with an error.
*/
- rli->trans_retries= 0; // restart from fresh
- DBUG_PRINT("info", ("Resetting retry counter, rli->trans_retries: %lu",
- rli->trans_retries));
+ serial_rgi->trans_retries= 0; // restart from fresh
+ DBUG_PRINT("info", ("Resetting retry counter, rgi->trans_retries: %lu",
+ serial_rgi->trans_retries));
}
}
+ thread_safe_increment64(&rli->executed_entries,
+ &slave_executed_entries_lock);
DBUG_RETURN(exec_res);
}
mysql_mutex_unlock(&rli->data_lock);
@@ -4179,8 +4185,6 @@ pthread_handler_t handle_slave_sql(void *arg)
mysql_mutex_lock(&rli->log_space_lock);
rli->ignore_log_space_limit= 0;
mysql_mutex_unlock(&rli->log_space_lock);
- rli->trans_retries= 0; // start from "no error"
- DBUG_PRINT("info", ("rli->trans_retries: %lu", rli->trans_retries));
if (init_relay_log_pos(rli,
rli->group_relay_log_name,
@@ -4406,7 +4410,6 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \
}
goto err;
}
- rli->executed_entries++;
}
if (opt_slave_parallel_threads > 0)