diff options
Diffstat (limited to 'sql/rpl_rli.cc')
-rw-r--r-- | sql/rpl_rli.cc | 178 |
1 files changed, 164 insertions, 14 deletions
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index a162d1d79f8..754b877f654 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -52,6 +52,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery) info_fd(-1), cur_log_fd(-1), relay_log(&sync_relaylog_period), sync_counter(0), is_relay_log_recovery(is_slave_recovery), save_temporary_tables(0), mi(0), + inuse_relaylog_list(0), last_inuse_relaylog(0), cur_log_old_open_count(0), group_relay_log_pos(0), event_relay_log_pos(0), #if HAVE_valgrind @@ -98,8 +99,18 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery) Relay_log_info::~Relay_log_info() { + inuse_relaylog *cur; DBUG_ENTER("Relay_log_info::~Relay_log_info"); + cur= inuse_relaylog_list; + while (cur) + { + DBUG_ASSERT(cur->queued_count == cur->dequeued_count); + inuse_relaylog *next= cur->next; + my_atomic_rwlock_destroy(&cur->inuse_relaylog_atomic_lock); + my_free(cur); + cur= next; + } mysql_mutex_destroy(&run_lock); mysql_mutex_destroy(&data_lock); mysql_mutex_destroy(&log_space_lock); @@ -305,20 +316,80 @@ Failed to open the existing relay log info file '%s' (errno %d)", } rli->info_fd = info_fd; - int relay_log_pos, master_log_pos; + int relay_log_pos, master_log_pos, lines; + char *first_non_digit; + /* + In MySQL 5.6, there is a MASTER_DELAY option to CHANGE MASTER. This is + not yet merged into MariaDB (as of 10.0.13). However, we detect the + presense of the new option in relay-log.info, as a placeholder for + possible later merge of the feature, and to maintain file format + compatibility with MySQL 5.6+. + */ + int dummy_sql_delay; + + /* + Starting from MySQL 5.6.x, relay-log.info has a new format. + Now, its first line contains the number of lines in the file. + By reading this number we can determine which version our master.info + comes from. We can't simply count the lines in the file, since + versions before 5.6.x could generate files with more lines than + needed. If first line doesn't contain a number, or if it + contains a number less than LINES_IN_RELAY_LOG_INFO_WITH_DELAY, + then the file is treated like a file from pre-5.6.x version. + There is no ambiguity when reading an old master.info: before + 5.6.x, the first line contained the binlog's name, which is + either empty or has an extension (contains a '.'), so can't be + confused with an integer. + + So we're just reading first line and trying to figure which + version is this. + */ + + /* + The first row is temporarily stored in mi->master_log_name, if + it is line count and not binlog name (new format) it will be + overwritten by the second row later. + */ if (init_strvar_from_file(rli->group_relay_log_name, sizeof(rli->group_relay_log_name), + &rli->info_file, "")) + { + msg="Error reading slave log configuration"; + goto err; + } + + lines= strtoul(rli->group_relay_log_name, &first_non_digit, 10); + + if (rli->group_relay_log_name[0] != '\0' && + *first_non_digit == '\0' && + lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY) + { + DBUG_PRINT("info", ("relay_log_info file is in new format.")); + /* Seems to be new format => read relay log name from next line */ + if (init_strvar_from_file(rli->group_relay_log_name, + sizeof(rli->group_relay_log_name), + &rli->info_file, "")) + { + msg="Error reading slave log configuration"; + goto err; + } + } + else + DBUG_PRINT("info", ("relay_log_info file is in old format.")); + + if (init_intvar_from_file(&relay_log_pos, + &rli->info_file, BIN_LOG_HEADER_SIZE) || + init_strvar_from_file(rli->group_master_log_name, + sizeof(rli->group_master_log_name), &rli->info_file, "") || - init_intvar_from_file(&relay_log_pos, - &rli->info_file, BIN_LOG_HEADER_SIZE) || - init_strvar_from_file(rli->group_master_log_name, - sizeof(rli->group_master_log_name), - &rli->info_file, "") || - init_intvar_from_file(&master_log_pos, &rli->info_file, 0)) + init_intvar_from_file(&master_log_pos, &rli->info_file, 0) || + (lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY && + init_intvar_from_file(&dummy_sql_delay, &rli->info_file, 0))) { msg="Error reading slave log configuration"; goto err; } + strmake_buf(rli->event_relay_log_name,rli->group_relay_log_name); rli->group_relay_log_pos= rli->event_relay_log_pos= relay_log_pos; rli->group_master_log_pos= master_log_pos; @@ -1024,7 +1095,6 @@ int purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset, DBUG_ASSERT(rli->slave_running == 0); DBUG_ASSERT(rli->mi->slave_running == 0); - rli->slave_skip_counter=0; mysql_mutex_lock(&rli->data_lock); /* @@ -1243,7 +1313,7 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos, inc_group_relay_log_pos(event_master_log_pos, rgi); if (rpl_global_gtid_slave_state.record_and_update_gtid(thd, rgi)) { - report(WARNING_LEVEL, ER_CANNOT_UPDATE_GTID_STATE, + report(WARNING_LEVEL, ER_CANNOT_UPDATE_GTID_STATE, rgi->gtid_info(), "Failed to update GTID state in %s.%s, slave state may become " "inconsistent: %d: %s", "mysql", rpl_gtid_slave_state_table_name.str, @@ -1279,6 +1349,33 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos, DBUG_VOID_RETURN; } + +int +Relay_log_info::alloc_inuse_relaylog(const char *name) +{ + inuse_relaylog *ir; + + if (!(ir= (inuse_relaylog *)my_malloc(sizeof(*ir), MYF(MY_WME|MY_ZEROFILL)))) + { + my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*ir)); + return 1; + } + strmake_buf(ir->name, name); + + if (!inuse_relaylog_list) + inuse_relaylog_list= ir; + else + { + last_inuse_relaylog->completed= true; + last_inuse_relaylog->next= ir; + } + last_inuse_relaylog= ir; + my_atomic_rwlock_init(&ir->inuse_relaylog_atomic_lock); + + return 0; +} + + #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) int rpl_load_gtid_slave_state(THD *thd) @@ -1465,6 +1562,9 @@ rpl_group_info::reinit(Relay_log_info *rli) tables_to_lock_count= 0; trans_retries= 0; last_event_start_time= 0; + gtid_sub_id= 0; + commit_id= 0; + gtid_pending= false; worker_error= 0; row_stmt_start_timestamp= 0; long_find_row_note_printed= false; @@ -1474,7 +1574,7 @@ rpl_group_info::reinit(Relay_log_info *rli) } rpl_group_info::rpl_group_info(Relay_log_info *rli) - : thd(0), gtid_sub_id(0), wait_commit_sub_id(0), + : thd(0), wait_commit_sub_id(0), wait_commit_group_info(0), parallel_entry(0), deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false) { @@ -1505,9 +1605,11 @@ event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev) return 1; } rgi->gtid_sub_id= sub_id; - rgi->current_gtid.server_id= gev->server_id; rgi->current_gtid.domain_id= gev->domain_id; + rgi->current_gtid.server_id= gev->server_id; rgi->current_gtid.seq_no= gev->seq_no; + rgi->commit_id= gev->commit_id; + rgi->gtid_pending= true; return 0; } @@ -1563,7 +1665,7 @@ delete_or_keep_event_post_apply(rpl_group_info *rgi, void rpl_group_info::cleanup_context(THD *thd, bool error) { - DBUG_ENTER("Relay_log_info::cleanup_context"); + DBUG_ENTER("rpl_group_info::cleanup_context"); DBUG_PRINT("enter", ("error: %d", (int) error)); DBUG_ASSERT(this->thd == thd); @@ -1629,7 +1731,7 @@ void rpl_group_info::cleanup_context(THD *thd, bool error) void rpl_group_info::clear_tables_to_lock() { - DBUG_ENTER("Relay_log_info::clear_tables_to_lock()"); + DBUG_ENTER("rpl_group_info::clear_tables_to_lock()"); #ifndef DBUG_OFF /** When replicating in RBR and MyISAM Merge tables are involved @@ -1676,7 +1778,7 @@ void rpl_group_info::clear_tables_to_lock() void rpl_group_info::slave_close_thread_tables(THD *thd) { - DBUG_ENTER("Relay_log_info::slave_close_thread_tables(THD *thd)"); + DBUG_ENTER("rpl_group_info::slave_close_thread_tables(THD *thd)"); thd->get_stmt_da()->set_overwrite_status(true); thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd); thd->get_stmt_da()->set_overwrite_status(false); @@ -1745,6 +1847,54 @@ rpl_group_info::mark_start_commit() } +/* + Format the current GTID as a string suitable for printing in error messages. + + The string is stored in a buffer inside rpl_group_info, so remains valid + until next call to gtid_info() or until destruction of rpl_group_info. + + If no GTID is available, then NULL is returned. +*/ +char * +rpl_group_info::gtid_info() +{ + if (!gtid_sub_id || !current_gtid.seq_no) + return NULL; + my_snprintf(gtid_info_buf, sizeof(gtid_info_buf), "Gtid %u-%u-%llu", + current_gtid.domain_id, current_gtid.server_id, + current_gtid.seq_no); + return gtid_info_buf; +} + + +/* + Undo the effect of a prior mark_start_commit(). + + This is only used for retrying a transaction in parallel replication, after + we have encountered a deadlock or other temporary error. + + When we get such a deadlock, it means that the current group of transactions + did not yet all start committing (else they would not have deadlocked). So + we will not yet have woken up anything in the next group, our rgi->gco is + still live, and we can simply decrement the counter (to be incremented again + later, when the retry succeeds and reaches the commit step). +*/ +void +rpl_group_info::unmark_start_commit() +{ + rpl_parallel_entry *e; + + if (!did_mark_start_commit) + return; + + e= this->parallel_entry; + mysql_mutex_lock(&e->LOCK_parallel_entry); + --e->count_committing_event_groups; + mysql_mutex_unlock(&e->LOCK_parallel_entry); + did_mark_start_commit= false; +} + + rpl_sql_thread_info::rpl_sql_thread_info(Rpl_filter *filter) : rpl_filter(filter) { |