summaryrefslogtreecommitdiff
path: root/sql/rpl_rli.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/rpl_rli.cc')
-rw-r--r--sql/rpl_rli.cc178
1 files changed, 164 insertions, 14 deletions
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index a162d1d79f8..754b877f654 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -52,6 +52,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
info_fd(-1), cur_log_fd(-1), relay_log(&sync_relaylog_period),
sync_counter(0), is_relay_log_recovery(is_slave_recovery),
save_temporary_tables(0), mi(0),
+ inuse_relaylog_list(0), last_inuse_relaylog(0),
cur_log_old_open_count(0), group_relay_log_pos(0),
event_relay_log_pos(0),
#if HAVE_valgrind
@@ -98,8 +99,18 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
Relay_log_info::~Relay_log_info()
{
+ inuse_relaylog *cur;
DBUG_ENTER("Relay_log_info::~Relay_log_info");
+ cur= inuse_relaylog_list;
+ while (cur)
+ {
+ DBUG_ASSERT(cur->queued_count == cur->dequeued_count);
+ inuse_relaylog *next= cur->next;
+ my_atomic_rwlock_destroy(&cur->inuse_relaylog_atomic_lock);
+ my_free(cur);
+ cur= next;
+ }
mysql_mutex_destroy(&run_lock);
mysql_mutex_destroy(&data_lock);
mysql_mutex_destroy(&log_space_lock);
@@ -305,20 +316,80 @@ Failed to open the existing relay log info file '%s' (errno %d)",
}
rli->info_fd = info_fd;
- int relay_log_pos, master_log_pos;
+ int relay_log_pos, master_log_pos, lines;
+ char *first_non_digit;
+ /*
+ In MySQL 5.6, there is a MASTER_DELAY option to CHANGE MASTER. This is
+ not yet merged into MariaDB (as of 10.0.13). However, we detect the
+ presense of the new option in relay-log.info, as a placeholder for
+ possible later merge of the feature, and to maintain file format
+ compatibility with MySQL 5.6+.
+ */
+ int dummy_sql_delay;
+
+ /*
+ Starting from MySQL 5.6.x, relay-log.info has a new format.
+ Now, its first line contains the number of lines in the file.
+ By reading this number we can determine which version our master.info
+ comes from. We can't simply count the lines in the file, since
+ versions before 5.6.x could generate files with more lines than
+ needed. If first line doesn't contain a number, or if it
+ contains a number less than LINES_IN_RELAY_LOG_INFO_WITH_DELAY,
+ then the file is treated like a file from pre-5.6.x version.
+ There is no ambiguity when reading an old master.info: before
+ 5.6.x, the first line contained the binlog's name, which is
+ either empty or has an extension (contains a '.'), so can't be
+ confused with an integer.
+
+ So we're just reading first line and trying to figure which
+ version is this.
+ */
+
+ /*
+ The first row is temporarily stored in mi->master_log_name, if
+ it is line count and not binlog name (new format) it will be
+ overwritten by the second row later.
+ */
if (init_strvar_from_file(rli->group_relay_log_name,
sizeof(rli->group_relay_log_name),
+ &rli->info_file, ""))
+ {
+ msg="Error reading slave log configuration";
+ goto err;
+ }
+
+ lines= strtoul(rli->group_relay_log_name, &first_non_digit, 10);
+
+ if (rli->group_relay_log_name[0] != '\0' &&
+ *first_non_digit == '\0' &&
+ lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY)
+ {
+ DBUG_PRINT("info", ("relay_log_info file is in new format."));
+ /* Seems to be new format => read relay log name from next line */
+ if (init_strvar_from_file(rli->group_relay_log_name,
+ sizeof(rli->group_relay_log_name),
+ &rli->info_file, ""))
+ {
+ msg="Error reading slave log configuration";
+ goto err;
+ }
+ }
+ else
+ DBUG_PRINT("info", ("relay_log_info file is in old format."));
+
+ if (init_intvar_from_file(&relay_log_pos,
+ &rli->info_file, BIN_LOG_HEADER_SIZE) ||
+ init_strvar_from_file(rli->group_master_log_name,
+ sizeof(rli->group_master_log_name),
&rli->info_file, "") ||
- init_intvar_from_file(&relay_log_pos,
- &rli->info_file, BIN_LOG_HEADER_SIZE) ||
- init_strvar_from_file(rli->group_master_log_name,
- sizeof(rli->group_master_log_name),
- &rli->info_file, "") ||
- init_intvar_from_file(&master_log_pos, &rli->info_file, 0))
+ init_intvar_from_file(&master_log_pos, &rli->info_file, 0) ||
+ (lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY &&
+ init_intvar_from_file(&dummy_sql_delay, &rli->info_file, 0)))
{
msg="Error reading slave log configuration";
goto err;
}
+
strmake_buf(rli->event_relay_log_name,rli->group_relay_log_name);
rli->group_relay_log_pos= rli->event_relay_log_pos= relay_log_pos;
rli->group_master_log_pos= master_log_pos;
@@ -1024,7 +1095,6 @@ int purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset,
DBUG_ASSERT(rli->slave_running == 0);
DBUG_ASSERT(rli->mi->slave_running == 0);
- rli->slave_skip_counter=0;
mysql_mutex_lock(&rli->data_lock);
/*
@@ -1243,7 +1313,7 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
inc_group_relay_log_pos(event_master_log_pos, rgi);
if (rpl_global_gtid_slave_state.record_and_update_gtid(thd, rgi))
{
- report(WARNING_LEVEL, ER_CANNOT_UPDATE_GTID_STATE,
+ report(WARNING_LEVEL, ER_CANNOT_UPDATE_GTID_STATE, rgi->gtid_info(),
"Failed to update GTID state in %s.%s, slave state may become "
"inconsistent: %d: %s",
"mysql", rpl_gtid_slave_state_table_name.str,
@@ -1279,6 +1349,33 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
DBUG_VOID_RETURN;
}
+
+int
+Relay_log_info::alloc_inuse_relaylog(const char *name)
+{
+ inuse_relaylog *ir;
+
+ if (!(ir= (inuse_relaylog *)my_malloc(sizeof(*ir), MYF(MY_WME|MY_ZEROFILL))))
+ {
+ my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*ir));
+ return 1;
+ }
+ strmake_buf(ir->name, name);
+
+ if (!inuse_relaylog_list)
+ inuse_relaylog_list= ir;
+ else
+ {
+ last_inuse_relaylog->completed= true;
+ last_inuse_relaylog->next= ir;
+ }
+ last_inuse_relaylog= ir;
+ my_atomic_rwlock_init(&ir->inuse_relaylog_atomic_lock);
+
+ return 0;
+}
+
+
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
int
rpl_load_gtid_slave_state(THD *thd)
@@ -1465,6 +1562,9 @@ rpl_group_info::reinit(Relay_log_info *rli)
tables_to_lock_count= 0;
trans_retries= 0;
last_event_start_time= 0;
+ gtid_sub_id= 0;
+ commit_id= 0;
+ gtid_pending= false;
worker_error= 0;
row_stmt_start_timestamp= 0;
long_find_row_note_printed= false;
@@ -1474,7 +1574,7 @@ rpl_group_info::reinit(Relay_log_info *rli)
}
rpl_group_info::rpl_group_info(Relay_log_info *rli)
- : thd(0), gtid_sub_id(0), wait_commit_sub_id(0),
+ : thd(0), wait_commit_sub_id(0),
wait_commit_group_info(0), parallel_entry(0),
deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false)
{
@@ -1505,9 +1605,11 @@ event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev)
return 1;
}
rgi->gtid_sub_id= sub_id;
- rgi->current_gtid.server_id= gev->server_id;
rgi->current_gtid.domain_id= gev->domain_id;
+ rgi->current_gtid.server_id= gev->server_id;
rgi->current_gtid.seq_no= gev->seq_no;
+ rgi->commit_id= gev->commit_id;
+ rgi->gtid_pending= true;
return 0;
}
@@ -1563,7 +1665,7 @@ delete_or_keep_event_post_apply(rpl_group_info *rgi,
void rpl_group_info::cleanup_context(THD *thd, bool error)
{
- DBUG_ENTER("Relay_log_info::cleanup_context");
+ DBUG_ENTER("rpl_group_info::cleanup_context");
DBUG_PRINT("enter", ("error: %d", (int) error));
DBUG_ASSERT(this->thd == thd);
@@ -1629,7 +1731,7 @@ void rpl_group_info::cleanup_context(THD *thd, bool error)
void rpl_group_info::clear_tables_to_lock()
{
- DBUG_ENTER("Relay_log_info::clear_tables_to_lock()");
+ DBUG_ENTER("rpl_group_info::clear_tables_to_lock()");
#ifndef DBUG_OFF
/**
When replicating in RBR and MyISAM Merge tables are involved
@@ -1676,7 +1778,7 @@ void rpl_group_info::clear_tables_to_lock()
void rpl_group_info::slave_close_thread_tables(THD *thd)
{
- DBUG_ENTER("Relay_log_info::slave_close_thread_tables(THD *thd)");
+ DBUG_ENTER("rpl_group_info::slave_close_thread_tables(THD *thd)");
thd->get_stmt_da()->set_overwrite_status(true);
thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd);
thd->get_stmt_da()->set_overwrite_status(false);
@@ -1745,6 +1847,54 @@ rpl_group_info::mark_start_commit()
}
+/*
+ Format the current GTID as a string suitable for printing in error messages.
+
+ The string is stored in a buffer inside rpl_group_info, so remains valid
+ until next call to gtid_info() or until destruction of rpl_group_info.
+
+ If no GTID is available, then NULL is returned.
+*/
+char *
+rpl_group_info::gtid_info()
+{
+ if (!gtid_sub_id || !current_gtid.seq_no)
+ return NULL;
+ my_snprintf(gtid_info_buf, sizeof(gtid_info_buf), "Gtid %u-%u-%llu",
+ current_gtid.domain_id, current_gtid.server_id,
+ current_gtid.seq_no);
+ return gtid_info_buf;
+}
+
+
+/*
+ Undo the effect of a prior mark_start_commit().
+
+ This is only used for retrying a transaction in parallel replication, after
+ we have encountered a deadlock or other temporary error.
+
+ When we get such a deadlock, it means that the current group of transactions
+ did not yet all start committing (else they would not have deadlocked). So
+ we will not yet have woken up anything in the next group, our rgi->gco is
+ still live, and we can simply decrement the counter (to be incremented again
+ later, when the retry succeeds and reaches the commit step).
+*/
+void
+rpl_group_info::unmark_start_commit()
+{
+ rpl_parallel_entry *e;
+
+ if (!did_mark_start_commit)
+ return;
+
+ e= this->parallel_entry;
+ mysql_mutex_lock(&e->LOCK_parallel_entry);
+ --e->count_committing_event_groups;
+ mysql_mutex_unlock(&e->LOCK_parallel_entry);
+ did_mark_start_commit= false;
+}
+
+
rpl_sql_thread_info::rpl_sql_thread_info(Rpl_filter *filter)
: rpl_filter(filter)
{