diff options
author | Michael Widenius <monty@askmonty.org> | 2013-10-15 00:17:16 +0300 |
---|---|---|
committer | Michael Widenius <monty@askmonty.org> | 2013-10-15 00:17:16 +0300 |
commit | 5748eb3ec63adee035ec5305796bdbc67a654158 (patch) | |
tree | dbe4b5bff464a97acfd4cc99f49086b1a9ec227a | |
parent | 2842f6b5dc254c82aa3dc976cd5bd3645dc82a60 (diff) | |
download | mariadb-git-5748eb3ec63adee035ec5305796bdbc67a654158.tar.gz |
Moved the remaining variables that depend on SQL execution from Relay_log_info to rpl_group_info:
-row_stmt_start_timestamp
-last_event_start_time
-long_find_row_note
-trans_retries
Added slave_executed_entries_lock to protect rli->executed_entries
Added primitives for thread safe 64 bit increment
Update rli->executed_entries when event has executed, not when event has been sent to sql execution thread
sql/log_event.cc:
row_stmt_start_timestamp and long_find_row_note_printed are now in rpl_group_info
sql/mysqld.cc:
Added slave_executed_entries_lock to protect rli->executed_entries
sql/mysqld.h:
Added slave_executed_entries_lock to protect rli->executed_entries
Added primitives for thread safe 64 bit increment
sql/rpl_parallel.cc:
Update rli->executed_entries when event has executed, not when event has been sent to sql execution thread
sql/rpl_rli.cc:
Moved row_stmt_start_timestamp, last_event_start_time and long_find_row_note from Relay_log_info to rpl_group_info
sql/rpl_rli.h:
Moved trans_retries, row_stmt_start_timestamp, last_event_start_time and long_find_row_note from Relay_log_info to rpl_group_info
sql/slave.cc:
Use rgi for trans_retries and last_event_start_time
Update rli->executed_entries when event has executed, not when event has been sent to sql execution thread
Reset trans_retries when object is created
-rw-r--r-- | sql/log_event.cc | 12 | ||||
-rw-r--r-- | sql/log_event_old.cc | 2 | ||||
-rw-r--r-- | sql/mysqld.cc | 3 | ||||
-rw-r--r-- | sql/mysqld.h | 15 | ||||
-rw-r--r-- | sql/rpl_parallel.cc | 26 | ||||
-rw-r--r-- | sql/rpl_rli.cc | 10 | ||||
-rw-r--r-- | sql/rpl_rli.h | 119 | ||||
-rw-r--r-- | sql/slave.cc | 43 |
8 files changed, 134 insertions, 96 deletions
diff --git a/sql/log_event.cc b/sql/log_event.cc index 59fc856c3f2..55166b65df4 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -9293,7 +9293,7 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi) set the initial time of this ROWS statement if it was not done before in some other ROWS event. */ - const_cast<Relay_log_info*>(rli)->set_row_stmt_start_timestamp(); + rgi->set_row_stmt_start_timestamp(); while (error == 0 && m_curr_row < m_rows_end) { @@ -11133,13 +11133,13 @@ static inline void issue_long_find_row_warning(Log_event_type type, const char *table_name, bool is_index_scan, - const Relay_log_info *rli) + rpl_group_info *rgi) { if ((global_system_variables.log_warnings > 1 && - !const_cast<Relay_log_info*>(rli)->is_long_find_row_note_printed())) + !rgi->is_long_find_row_note_printed())) { time_t now= my_time(0); - time_t stmt_ts= const_cast<Relay_log_info*>(rli)->get_row_stmt_start_timestamp(); + time_t stmt_ts= rgi->get_row_stmt_start_timestamp(); DBUG_EXECUTE_IF("inject_long_find_row_note", stmt_ts-=(LONG_FIND_ROW_THRESHOLD*2);); @@ -11148,7 +11148,7 @@ void issue_long_find_row_warning(Log_event_type type, if (delta > LONG_FIND_ROW_THRESHOLD) { - const_cast<Relay_log_info*>(rli)->set_long_find_row_note_printed(); + rgi->set_long_find_row_note_printed(); const char* evt_type= type == DELETE_ROWS_EVENT ? " DELETE" : "n UPDATE"; const char* scan_type= is_index_scan ? "scanning an index" : "scanning the table"; @@ -11477,7 +11477,7 @@ int Rows_log_event::find_row(rpl_group_info *rgi) end: if (is_table_scan || is_index_scan) issue_long_find_row_warning(get_type_code(), m_table->alias.c_ptr(), - is_index_scan, rgi->rli); + is_index_scan, rgi); table->default_column_bitmaps(); DBUG_RETURN(error); } diff --git a/sql/log_event_old.cc b/sql/log_event_old.cc index b4f28abcf2b..174219a8e72 100644 --- a/sql/log_event_old.cc +++ b/sql/log_event_old.cc @@ -1740,7 +1740,7 @@ int Old_rows_log_event::do_apply_event(rpl_group_info *rgi) problem. 
When WL#2975 is implemented, just remove the member Relay_log_info::last_event_start_time and all its occurrences. */ - const_cast<Relay_log_info*>(rli)->last_event_start_time= my_time(0); + rgi->last_event_start_time= my_time(0); } if (get_flags(STMT_END_F)) diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 9f1d9e48b1c..1e7deef8d89 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -492,6 +492,7 @@ my_atomic_rwlock_t global_query_id_lock; my_atomic_rwlock_t thread_running_lock; my_atomic_rwlock_t thread_count_lock; my_atomic_rwlock_t statistics_lock; +my_atomic_rwlock_t slave_executed_entries_lock; ulong aborted_threads, aborted_connects; ulong delayed_insert_timeout, delayed_insert_limit, delayed_queue_size; ulong delayed_insert_threads, delayed_insert_writes, delayed_rows_in_use; @@ -1939,6 +1940,7 @@ void clean_up(bool print_message) my_atomic_rwlock_destroy(&thread_running_lock); my_atomic_rwlock_destroy(&thread_count_lock); my_atomic_rwlock_destroy(&statistics_lock); + my_atomic_rwlock_destroy(&slave_executed_entries_lock); free_charsets(); mysql_mutex_lock(&LOCK_thread_count); DBUG_PRINT("quit", ("got thread count lock")); @@ -7550,6 +7552,7 @@ static int mysql_init_variables(void) my_atomic_rwlock_init(&thread_running_lock); my_atomic_rwlock_init(&thread_count_lock); my_atomic_rwlock_init(&statistics_lock); + my_atomic_rwlock_init(&slave_executed_entries_lock); strmov(server_version, MYSQL_SERVER_VERSION); threads.empty(); thread_cache.empty(); diff --git a/sql/mysqld.h b/sql/mysqld.h index 0bd3687f4fb..e45b48f0332 100644 --- a/sql/mysqld.h +++ b/sql/mysqld.h @@ -367,6 +367,7 @@ extern mysql_cond_t COND_manager; extern int32 thread_running; extern int32 thread_count; extern my_atomic_rwlock_t thread_running_lock, thread_count_lock; +extern my_atomic_rwlock_t slave_executed_entries_lock; extern char *opt_ssl_ca, *opt_ssl_capath, *opt_ssl_cert, *opt_ssl_cipher, *opt_ssl_key; @@ -507,6 +508,20 @@ inline void thread_safe_decrement32(int32 *value, 
my_atomic_rwlock_t *lock) my_atomic_rwlock_wrunlock(lock); } +inline void thread_safe_increment64(int64 *value, my_atomic_rwlock_t *lock) +{ + my_atomic_rwlock_wrlock(lock); + (void) my_atomic_add64(value, 1); + my_atomic_rwlock_wrunlock(lock); +} + +inline void thread_safe_decrement64(int64 *value, my_atomic_rwlock_t *lock) +{ + my_atomic_rwlock_wrlock(lock); + (void) my_atomic_add64(value, -1); + my_atomic_rwlock_wrunlock(lock); +} + inline void inc_thread_running() { diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index c6411b01e60..19ae2a35339 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -9,10 +9,6 @@ ToDo list: - - Review every field in Relay_log_info, and all code that accesses it. - Split out the necessary parts into rpl_group_info, to avoid conflicts - between parallel execution of events. (Such as deferred events ...) - - Error handling. If we fail in one of multiple parallel executions, we need to make a best effort to complete prior transactions and roll back following transactions, so slave binlog position will be correct. @@ -43,10 +39,11 @@ slave rolls back the transaction; parallel execution needs to be able to deal with this wrt. commit_orderer and such. - - We should fail if we connect to the master with opt_slave_parallel_threads - greater than zero and master does not support GTID. Just to avoid a bunch - of potential problems, we won't be able to do any parallel replication - in this case anyway. + - We should notice if the master doesn't support GTID, and then run in + single threaded mode against that master. This is needed to be able to + support multi-master-replication with old and new masters. + + - Retry of failed transactions is not yet implemented for the parallel case. 
*/ struct rpl_parallel_thread_pool global_rpl_thread_pool; @@ -56,7 +53,7 @@ static int rpt_handle_event(rpl_parallel_thread::queued_event *qev, struct rpl_parallel_thread *rpt) { - int err; + int err __attribute__((unused)); rpl_group_info *rgi= qev->rgi; Relay_log_info *rli= rgi->rli; THD *thd= rgi->thd; @@ -69,6 +66,9 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev, qev->ev->thd= thd; err= apply_event_and_update_pos(qev->ev, thd, rgi, rpt); thd->rgi_slave= NULL; + + thread_safe_increment64(&rli->executed_entries, + &slave_executed_entries_lock); /* ToDo: error handling. */ return err; } @@ -617,7 +617,10 @@ rpl_parallel::wait_for_done() /* do_event() is executed by the sql_driver_thd thread. - It's main purpose is to find a thread that can exectue the query. + It's main purpose is to find a thread that can execute the query. + + @retval false ok, event was accepted + @retval true error */ bool @@ -643,7 +646,10 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev) rli->abort_slave) sql_thread_stopping= true; if (sql_thread_stopping) + { + /* QQ: Need a better comment why we return false here */ return false; + } if (!(qev= (rpl_parallel_thread::queued_event *)my_malloc(sizeof(*qev), MYF(0)))) diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index ae2b7558285..53481d2efaf 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -59,8 +59,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery) abort_pos_wait(0), slave_run_id(0), sql_driver_thd(), inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE), until_log_pos(0), retried_trans(0), executed_entries(0), - last_event_start_time(0), m_flags(0), - row_stmt_start_timestamp(0), long_find_row_note_printed(false) + m_flags(0) { DBUG_ENTER("Relay_log_info::Relay_log_info"); @@ -1420,7 +1419,8 @@ rpl_group_info::rpl_group_info(Relay_log_info *rli_) : rli(rli_), thd(0), gtid_sub_id(0), wait_commit_sub_id(0), wait_commit_group_info(0), wait_start_sub_id(0), parallel_entry(0), 
deferred_events(NULL), m_annotate_event(0), tables_to_lock(0), - tables_to_lock_count(0) + tables_to_lock_count(0), trans_retries(0), last_event_start_time(0), + row_stmt_start_timestamp(0), long_find_row_note_printed(false) { bzero(&current_gtid, sizeof(current_gtid)); mysql_mutex_init(key_rpl_group_info_sleep_lock, &sleep_lock, @@ -1551,8 +1551,8 @@ void rpl_group_info::cleanup_context(THD *thd, bool error) - timestamp - flag that decides whether the slave prints or not */ - rli->reset_row_stmt_start_timestamp(); - rli->unset_long_find_row_note_printed(); + reset_row_stmt_start_timestamp(); + unset_long_find_row_note_printed(); DBUG_VOID_RETURN; } diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h index 9e96fb8e72c..68cd051be2a 100644 --- a/sql/rpl_rli.h +++ b/sql/rpl_rli.h @@ -298,14 +298,16 @@ public: char cached_charset[6]; /* - trans_retries varies between 0 to slave_transaction_retries and counts how - many times the slave has retried the present transaction; gets reset to 0 - when the transaction finally succeeds. retried_trans is a cumulative - counter: how many times the slave has retried a transaction (any) since - slave started. + retried_trans is a cumulative counter: how many times the slave + has retried a transaction (any) since slave started. + Protected by data_lock. */ - ulong trans_retries, retried_trans; - ulong executed_entries; /* For SLAVE STATUS */ + ulong retried_trans; + /* + Number of executed events for SLAVE STATUS. + Protected by slave_executed_entries_lock + */ + int64 executed_entries; /* If the end of the hot relay log is made of master's events ignored by the @@ -381,13 +383,6 @@ public: void cached_charset_invalidate(); bool cached_charset_compare(char *charset) const; - /* - Used to defer stopping the SQL thread to give it a chance - to finish up the current group of events. - The timestamp is set and reset in @c sql_slave_killed(). - */ - time_t last_event_start_time; - /** Helper function to do after statement completion. 
@@ -462,39 +457,6 @@ public: m_flags&= ~flag; } - time_t get_row_stmt_start_timestamp() - { - return row_stmt_start_timestamp; - } - - time_t set_row_stmt_start_timestamp() - { - if (row_stmt_start_timestamp == 0) - row_stmt_start_timestamp= my_time(0); - - return row_stmt_start_timestamp; - } - - void reset_row_stmt_start_timestamp() - { - row_stmt_start_timestamp= 0; - } - - void set_long_find_row_note_printed() - { - long_find_row_note_printed= true; - } - - void unset_long_find_row_note_printed() - { - long_find_row_note_printed= false; - } - - bool is_long_find_row_note_printed() - { - return long_find_row_note_printed; - } - private: /* @@ -504,13 +466,6 @@ private: relay log. */ uint32 m_flags; - - /* - Runtime state for printing a note when slave is taking - too long while processing a row event. - */ - time_t row_stmt_start_timestamp; - bool long_find_row_note_printed; }; @@ -592,6 +547,29 @@ struct rpl_group_info mysql_mutex_t sleep_lock; mysql_cond_t sleep_cond; + /* + trans_retries varies between 0 to slave_transaction_retries and counts how + many times the slave has retried the present transaction; gets reset to 0 + when the transaction finally succeeds. + */ + ulong trans_retries; + + /* + Used to defer stopping the SQL thread to give it a chance + to finish up the current group of events. + The timestamp is set and reset in @c sql_slave_killed(). + */ + time_t last_event_start_time; + +private: + /* + Runtime state for printing a note when slave is taking + too long while processing a row event. 
+ */ + time_t row_stmt_start_timestamp; + bool long_find_row_note_printed; +public: + rpl_group_info(Relay_log_info *rli_); ~rpl_group_info(); @@ -673,6 +651,39 @@ struct rpl_group_info void clear_tables_to_lock(); void cleanup_context(THD *, bool); void slave_close_thread_tables(THD *); + + time_t get_row_stmt_start_timestamp() + { + return row_stmt_start_timestamp; + } + + time_t set_row_stmt_start_timestamp() + { + if (row_stmt_start_timestamp == 0) + row_stmt_start_timestamp= my_time(0); + + return row_stmt_start_timestamp; + } + + void reset_row_stmt_start_timestamp() + { + row_stmt_start_timestamp= 0; + } + + void set_long_find_row_note_printed() + { + long_find_row_note_printed= true; + } + + void unset_long_find_row_note_printed() + { + long_find_row_note_printed= false; + } + + bool is_long_find_row_note_printed() + { + return long_find_row_note_printed; + } }; diff --git a/sql/slave.cc b/sql/slave.cc index 2504f723a78..61c63cd2862 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -1034,9 +1034,9 @@ static bool sql_slave_killed(rpl_group_info *rgi) @c last_event_start_time the timer. */ - if (rli->last_event_start_time == 0) - rli->last_event_start_time= my_time(0); - ret= difftime(my_time(0), rli->last_event_start_time) <= + if (rgi->last_event_start_time == 0) + rgi->last_event_start_time= my_time(0); + ret= difftime(my_time(0), rgi->last_event_start_time) <= SLAVE_WAIT_GROUP_DONE ? 
FALSE : TRUE; DBUG_EXECUTE_IF("stop_slave_middle_group", @@ -1070,7 +1070,7 @@ static bool sql_slave_killed(rpl_group_info *rgi) } } if (ret) - rli->last_event_start_time= 0; + rgi->last_event_start_time= 0; DBUG_RETURN(ret); } @@ -3047,10 +3047,10 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, DBUG_PRINT("exec_event",("%s(type_code: %d; server_id: %d)", ev->get_type_str(), ev->get_type_code(), ev->server_id)); - DBUG_PRINT("info", ("thd->options: %s%s; rli->last_event_start_time: %lu", + DBUG_PRINT("info", ("thd->options: %s%s; rgi->last_event_start_time: %lu", FLAGSTR(thd->variables.option_bits, OPTION_NOT_AUTOCOMMIT), FLAGSTR(thd->variables.option_bits, OPTION_BEGIN), - (ulong) rli->last_event_start_time)); + (ulong) rgi->last_event_start_time)); /* Execute the event to change the database and update the binary @@ -3385,14 +3385,16 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli, Note, if lock wait timeout (innodb_lock_wait_timeout exceeded) there is no rollback since 5.0.13 (ref: manual). We have to not only seek but also - a) init_master_info(), to seek back to hot relay log's start for later - (for when we will come back to this hot log after re-processing the - possibly existing old logs where BEGIN is: check_binlog_magic() will - then need the cache to be at position 0 (see comments at beginning of + + a) init_master_info(), to seek back to hot relay log's start + for later (for when we will come back to this hot log after + re-processing the possibly existing old logs where BEGIN is: + check_binlog_magic() will then need the cache to be at + position 0 (see comments at beginning of init_master_info()). b) init_relay_log_pos(), because the BEGIN may be an older relay log. 
*/ - if (rli->trans_retries < slave_trans_retries) + if (serial_rgi->trans_retries < slave_trans_retries) { if (init_master_info(rli->mi, 0, 0, 0, SLAVE_SQL)) sql_print_error("Failed to initialize the master info structure"); @@ -3407,15 +3409,17 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli, exec_res= 0; serial_rgi->cleanup_context(thd, 1); /* chance for concurrent connection to get more locks */ - slave_sleep(thd, min(rli->trans_retries, MAX_SLAVE_RETRY_PAUSE), + slave_sleep(thd, min(serial_rgi->trans_retries, + MAX_SLAVE_RETRY_PAUSE), sql_slave_killed, serial_rgi); + serial_rgi->trans_retries++; mysql_mutex_lock(&rli->data_lock); // because of SHOW STATUS - rli->trans_retries++; rli->retried_trans++; statistic_increment(slave_retried_transactions, LOCK_status); mysql_mutex_unlock(&rli->data_lock); DBUG_PRINT("info", ("Slave retries transaction " - "rli->trans_retries: %lu", rli->trans_retries)); + "rgi->trans_retries: %lu", + serial_rgi->trans_retries)); } } else @@ -3434,11 +3438,13 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli, event, the execution will proceed as usual; in the case of a non-transient error, the slave will stop with an error. 
*/ - rli->trans_retries= 0; // restart from fresh - DBUG_PRINT("info", ("Resetting retry counter, rli->trans_retries: %lu", - rli->trans_retries)); + serial_rgi->trans_retries= 0; // restart from fresh + DBUG_PRINT("info", ("Resetting retry counter, rgi->trans_retries: %lu", + serial_rgi->trans_retries)); } } + thread_safe_increment64(&rli->executed_entries, + &slave_executed_entries_lock); DBUG_RETURN(exec_res); } mysql_mutex_unlock(&rli->data_lock); @@ -4179,8 +4185,6 @@ pthread_handler_t handle_slave_sql(void *arg) mysql_mutex_lock(&rli->log_space_lock); rli->ignore_log_space_limit= 0; mysql_mutex_unlock(&rli->log_space_lock); - rli->trans_retries= 0; // start from "no error" - DBUG_PRINT("info", ("rli->trans_retries: %lu", rli->trans_retries)); if (init_relay_log_pos(rli, rli->group_relay_log_name, @@ -4406,7 +4410,6 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \ } goto err; } - rli->executed_entries++; } if (opt_slave_parallel_threads > 0) |