summaryrefslogtreecommitdiff
path: root/sql/log.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/log.cc')
-rw-r--r--sql/log.cc133
1 files changed, 77 insertions, 56 deletions
diff --git a/sql/log.cc b/sql/log.cc
index 48c458c2607..6726be36e74 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -119,7 +119,6 @@ static MYSQL_BIN_LOG::xid_count_per_binlog *
static bool start_binlog_background_thread();
-
static rpl_binlog_state rpl_global_gtid_binlog_state;
/**
@@ -3007,6 +3006,14 @@ void MYSQL_BIN_LOG::cleanup()
mysql_cond_destroy(&COND_binlog_background_thread);
mysql_cond_destroy(&COND_binlog_background_thread_end);
}
+
+ /*
+ Free data for global binlog state.
+ We can't do that automaticly as we need to do this before
+ safemalloc is shut down
+ */
+ if (!is_relay_log)
+ rpl_global_gtid_binlog_state.free();
DBUG_VOID_RETURN;
}
@@ -3286,7 +3293,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
there had been an entry (domain_id, server_id, 0).
*/
- Gtid_list_log_event gl_ev(&rpl_global_gtid_binlog_state);
+ Gtid_list_log_event gl_ev(&rpl_global_gtid_binlog_state, 0);
if (gl_ev.write(&log_file))
goto err;
@@ -3847,9 +3854,6 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd, bool create_new_log)
if (!is_relay_log)
{
rpl_global_gtid_binlog_state.reset();
- mysql_mutex_lock(&LOCK_gtid_counter);
- global_gtid_counter= 0;
- mysql_mutex_unlock(&LOCK_gtid_counter);
}
/* Start logging with a new file */
@@ -5373,9 +5377,11 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone,
bool is_transactional)
{
rpl_gtid gtid;
- uint64 seq_no;
+ uint32 domain_id= thd->variables.gtid_domain_id;
+ uint32 server_id= thd->variables.server_id;
+ uint64 seq_no= thd->variables.gtid_seq_no;
+ int err;
- seq_no= thd->variables.gtid_seq_no;
/*
Reset the session variable gtid_seq_no, to reduce the risk of accidentally
producing a duplicate GTID.
@@ -5383,34 +5389,36 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone,
thd->variables.gtid_seq_no= 0;
if (seq_no != 0)
{
- /*
- If we see a higher sequence number, use that one as the basis of any
- later generated sequence numbers.
- */
- bump_seq_no_counter_if_needed(seq_no);
+ /* Use the specified sequence number. */
+ gtid.domain_id= domain_id;
+ gtid.server_id= server_id;
+ gtid.seq_no= seq_no;
+ mysql_mutex_lock(&LOCK_rpl_gtid_state);
+ err= rpl_global_gtid_binlog_state.update(&gtid, opt_gtid_strict_mode);
+ mysql_mutex_unlock(&LOCK_rpl_gtid_state);
+ if (err && thd->stmt_da->sql_errno()==ER_GTID_STRICT_OUT_OF_ORDER)
+ errno= ER_GTID_STRICT_OUT_OF_ORDER;
}
else
{
- mysql_mutex_lock(&LOCK_gtid_counter);
- seq_no= ++global_gtid_counter;
- mysql_mutex_unlock(&LOCK_gtid_counter);
+ /* Allocate the next sequence number for the GTID. */
+ mysql_mutex_lock(&LOCK_rpl_gtid_state);
+ err= rpl_global_gtid_binlog_state.update_with_next_gtid(domain_id,
+ server_id, &gtid);
+ mysql_mutex_unlock(&LOCK_rpl_gtid_state);
+ seq_no= gtid.seq_no;
}
- gtid.seq_no= seq_no;
- gtid.domain_id= thd->variables.gtid_domain_id;
+ if (err)
+ return true;
- Gtid_log_event gtid_event(thd, gtid.seq_no, gtid.domain_id, standalone,
+ Gtid_log_event gtid_event(thd, seq_no, domain_id, standalone,
LOG_EVENT_SUPPRESS_USE_F, is_transactional);
- gtid.server_id= gtid_event.server_id;
/* Write the event to the binary log. */
if (gtid_event.write(&mysql_bin_log.log_file))
return true;
status_var_add(thd->status_var.binlog_bytes_written, gtid_event.data_written);
- /* Update the replication state (last GTID in each replication domain). */
- mysql_mutex_lock(&LOCK_rpl_gtid_state);
- rpl_global_gtid_binlog_state.update(&gtid);
- mysql_mutex_unlock(&LOCK_rpl_gtid_state);
return false;
}
@@ -5511,9 +5519,6 @@ end:
end_io_cache(&cache);
if (opened)
mysql_file_close(file_no, MYF(0));
- /* Pick the next unused seq_no from the loaded binlog state. */
- bump_seq_no_counter_if_needed(
- rpl_global_gtid_binlog_state.seq_no_from_state());
return err;
}
@@ -5527,6 +5532,18 @@ MYSQL_BIN_LOG::get_most_recent_gtid_list(rpl_gtid **list, uint32 *size)
bool
+MYSQL_BIN_LOG::append_state_pos(String *str)
+{
+ bool err;
+
+ mysql_mutex_lock(&rpl_global_gtid_binlog_state.LOCK_binlog_state);
+ err= rpl_global_gtid_binlog_state.append_pos(str);
+ mysql_mutex_unlock(&rpl_global_gtid_binlog_state.LOCK_binlog_state);
+ return err;
+}
+
+
+bool
MYSQL_BIN_LOG::find_in_binlog_state(uint32 domain_id, uint32 server_id,
rpl_gtid *out_gtid)
{
@@ -5543,33 +5560,44 @@ bool
MYSQL_BIN_LOG::lookup_domain_in_binlog_state(uint32 domain_id,
rpl_gtid *out_gtid)
{
- rpl_binlog_state::element *elem;
- bool res;
+ rpl_gtid *found_gtid;
+ bool res= false;
mysql_mutex_lock(&rpl_global_gtid_binlog_state.LOCK_binlog_state);
- elem= (rpl_binlog_state::element *)
- my_hash_search(&rpl_global_gtid_binlog_state.hash,
- (const uchar *)&domain_id, 0);
- if (elem)
+ if ((found_gtid= rpl_global_gtid_binlog_state.find_most_recent(domain_id)))
{
+ *out_gtid= *found_gtid;
res= true;
- *out_gtid= *elem->last_gtid;
}
- else
- res= false;
mysql_mutex_unlock(&rpl_global_gtid_binlog_state.LOCK_binlog_state);
return res;
}
-void
-MYSQL_BIN_LOG::bump_seq_no_counter_if_needed(uint64 seq_no)
+int
+MYSQL_BIN_LOG::bump_seq_no_counter_if_needed(uint32 domain_id, uint64 seq_no)
{
- mysql_mutex_lock(&LOCK_gtid_counter);
- if (global_gtid_counter < seq_no)
- global_gtid_counter= seq_no;
- mysql_mutex_unlock(&LOCK_gtid_counter);
+ int err;
+
+ mysql_mutex_lock(&rpl_global_gtid_binlog_state.LOCK_binlog_state);
+ err= rpl_global_gtid_binlog_state.bump_seq_no_if_needed(domain_id, seq_no);
+ mysql_mutex_unlock(&rpl_global_gtid_binlog_state.LOCK_binlog_state);
+ return err;
+}
+
+
+bool
+MYSQL_BIN_LOG::check_strict_gtid_sequence(uint32 domain_id, uint32 server_id,
+ uint64 seq_no)
+{
+ bool err;
+
+ mysql_mutex_lock(&rpl_global_gtid_binlog_state.LOCK_binlog_state);
+ err= rpl_global_gtid_binlog_state.check_strict_sequence(domain_id, server_id,
+ seq_no);
+ mysql_mutex_unlock(&rpl_global_gtid_binlog_state.LOCK_binlog_state);
+ return err;
}
@@ -5642,7 +5670,8 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
my_org_b_tell= my_b_tell(file);
mysql_mutex_lock(&LOCK_log);
prev_binlog_id= current_binlog_id;
- write_gtid_event(thd, true, using_trans);
+ if (write_gtid_event(thd, true, using_trans))
+ goto err;
}
else
{
@@ -6711,8 +6740,8 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
*/
DBUG_ASSERT(!cache_mngr->stmt_cache.empty() || !cache_mngr->trx_cache.empty());
- current->error= write_transaction_or_stmt(current);
-
+ if ((current->error= write_transaction_or_stmt(current)))
+ current->commit_errno= errno;
strmake(cache_mngr->last_commit_pos_file, log_file_name,
sizeof(cache_mngr->last_commit_pos_file)-1);
commit_offset= my_b_write_tell(&log_file);
@@ -8257,9 +8286,6 @@ int TC_LOG_BINLOG::open(const char *opt_name)
error= recover(&log_info, log_name, &log,
(Format_description_log_event *)ev);
state_read= true;
- /* Pick the next unused seq_no from the recovered binlog state. */
- bump_seq_no_counter_if_needed(
- rpl_global_gtid_binlog_state.seq_no_from_state());
}
else
error= read_state_from_file();
@@ -8513,7 +8539,7 @@ binlog_background_thread(void *arg __attribute__((unused)))
thd->store_globals();
/*
- Load the slave replication GTID state from the mysql.rpl_slave_state
+ Load the slave replication GTID state from the mysql.gtid_slave_pos
table.
This is mostly so that we can start our seq_no counter from the highest
@@ -8723,16 +8749,11 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name,
case GTID_LIST_EVENT:
if (first_round)
{
- uint32 i;
Gtid_list_log_event *glev= (Gtid_list_log_event *)ev;
/* Initialise the binlog state from the Gtid_list event. */
- rpl_global_gtid_binlog_state.reset();
- for (i= 0; i < glev->count; ++i)
- {
- if (rpl_global_gtid_binlog_state.update(&(glev->list[i])))
- goto err2;
- }
+ if (rpl_global_gtid_binlog_state.load(glev->list, glev->count))
+ goto err2;
}
break;
@@ -8746,7 +8767,7 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name,
gtid.domain_id= gev->domain_id;
gtid.server_id= gev->server_id;
gtid.seq_no= gev->seq_no;
- if (rpl_global_gtid_binlog_state.update(&gtid))
+ if (rpl_global_gtid_binlog_state.update(&gtid, false))
goto err2;
}
break;