summaryrefslogtreecommitdiff
path: root/sql/rpl_rli.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/rpl_rli.cc')
-rw-r--r--sql/rpl_rli.cc1081
1 files changed, 883 insertions, 198 deletions
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index 52e60c237dc..9e20775a1aa 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -11,9 +11,10 @@
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
- along with this program; if not, write to the Free Software
- Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
+ along with this program; if not, write to the Free Software Foundation,
+ 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
+#include <my_global.h>
#include "sql_priv.h"
#include "unireg.h" // HAVE_*
#include "rpl_mi.h"
@@ -32,6 +33,15 @@
static int count_relay_log_space(Relay_log_info* rli);
+/**
+ Current replication state (hash of last GTID executed, per replication
+ domain).
+*/
+rpl_slave_state *rpl_global_gtid_slave_state;
+/* Object used for MASTER_GTID_WAIT(). */
+gtid_waiting rpl_global_gtid_waiting;
+
+
// Defined in slave.cc
int init_intvar_from_file(int* var, IO_CACHE* f, int default_val);
int init_strvar_from_file(char *var, int max_size, IO_CACHE *f,
@@ -42,24 +52,24 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
no_storage(FALSE), replicate_same_server_id(::replicate_same_server_id),
info_fd(-1), cur_log_fd(-1), relay_log(&sync_relaylog_period),
sync_counter(0), is_relay_log_recovery(is_slave_recovery),
- save_temporary_tables(0), cur_log_old_open_count(0),
- error_on_rli_init_info(false), group_relay_log_pos(0),
- event_relay_log_pos(0),
+ save_temporary_tables(0), mi(0),
+ inuse_relaylog_list(0), last_inuse_relaylog(0),
+ cur_log_old_open_count(0), error_on_rli_init_info(false),
+ group_relay_log_pos(0), event_relay_log_pos(0),
#if HAVE_valgrind
is_fake(FALSE),
#endif
group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0),
- last_master_timestamp(0), slave_skip_counter(0),
- abort_pos_wait(0), slave_run_id(0), sql_thd(0),
- inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE),
- until_log_pos(0), retried_trans(0),
- tables_to_lock(0), tables_to_lock_count(0),
- last_event_start_time(0), deferred_events(NULL),m_flags(0),
- row_stmt_start_timestamp(0), long_find_row_note_printed(false),
- m_annotate_event(0)
+ last_master_timestamp(0), sql_thread_caught_up(true), slave_skip_counter(0),
+ abort_pos_wait(0), slave_run_id(0), sql_driver_thd(),
+ gtid_skip_flag(GTID_SKIP_NOT), inited(0), abort_slave(0), stop_for_until(0),
+ slave_running(MYSQL_SLAVE_NOT_RUN), until_condition(UNTIL_NONE),
+ until_log_pos(0), retried_trans(0), executed_entries(0),
+ m_flags(0)
{
DBUG_ENTER("Relay_log_info::Relay_log_info");
+ relay_log.is_relay_log= TRUE;
#ifdef HAVE_PSI_INTERFACE
relay_log.set_psi_keys(key_RELAYLOG_LOCK_index,
key_RELAYLOG_update_cond,
@@ -71,20 +81,18 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
group_relay_log_name[0]= event_relay_log_name[0]=
group_master_log_name[0]= 0;
until_log_name[0]= ign_master_log_name_end[0]= 0;
+ max_relay_log_size= global_system_variables.max_relay_log_size;
bzero((char*) &info_file, sizeof(info_file));
bzero((char*) &cache_buf, sizeof(cache_buf));
- cached_charset_invalidate();
mysql_mutex_init(key_relay_log_info_run_lock, &run_lock, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_relay_log_info_data_lock,
&data_lock, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_relay_log_info_log_space_lock,
&log_space_lock, MY_MUTEX_INIT_FAST);
- mysql_mutex_init(key_relay_log_info_sleep_lock, &sleep_lock, MY_MUTEX_INIT_FAST);
mysql_cond_init(key_relay_log_info_data_cond, &data_cond, NULL);
mysql_cond_init(key_relay_log_info_start_cond, &start_cond, NULL);
mysql_cond_init(key_relay_log_info_stop_cond, &stop_cond, NULL);
mysql_cond_init(key_relay_log_info_log_space_cond, &log_space_cond, NULL);
- mysql_cond_init(key_relay_log_info_sleep_cond, &sleep_cond, NULL);
relay_log.init_pthread_objects();
DBUG_VOID_RETURN;
}
@@ -94,17 +102,15 @@ Relay_log_info::~Relay_log_info()
{
DBUG_ENTER("Relay_log_info::~Relay_log_info");
+ reset_inuse_relaylog();
mysql_mutex_destroy(&run_lock);
mysql_mutex_destroy(&data_lock);
mysql_mutex_destroy(&log_space_lock);
- mysql_mutex_destroy(&sleep_lock);
mysql_cond_destroy(&data_cond);
mysql_cond_destroy(&start_cond);
mysql_cond_destroy(&stop_cond);
mysql_cond_destroy(&log_space_cond);
- mysql_cond_destroy(&sleep_cond);
relay_log.cleanup();
- free_annotate_event();
DBUG_VOID_RETURN;
}
@@ -131,8 +137,6 @@ int init_relay_log_info(Relay_log_info* rli,
rli->abort_pos_wait=0;
rli->log_space_limit= relay_log_space_limit;
rli->log_space_total= 0;
- rli->tables_to_lock= 0;
- rli->tables_to_lock_count= 0;
char pattern[FN_REFLEN];
(void) my_realpath(pattern, slave_load_tmpdir, 0);
@@ -153,15 +157,6 @@ int init_relay_log_info(Relay_log_info* rli,
event, in flush_master_info(mi, 1, ?).
*/
- /*
- For the maximum log size, we choose max_relay_log_size if it is
- non-zero, max_binlog_size otherwise. If later the user does SET
- GLOBAL on one of these variables, fix_max_binlog_size and
- fix_max_relay_log_size will reconsider the choice (for example
- if the user changes max_relay_log_size to zero, we have to
- switch to using max_binlog_size for the relay log) and update
- rli->relay_log.max_size (and mysql_bin_log.max_size).
- */
{
/* Reports an error and returns, if the --relay-log's path
is a directory.*/
@@ -211,21 +206,42 @@ a file name for --relay-log-index option", opt_relaylog_index_name);
name_warning_sent= 1;
}
- rli->relay_log.is_relay_log= TRUE;
+ /* For multimaster, add connection name to relay log filenames */
+ Master_info* mi= rli->mi;
+ char buf_relay_logname[FN_REFLEN], buf_relaylog_index_name_buff[FN_REFLEN];
+ char *buf_relaylog_index_name= opt_relaylog_index_name;
+ mysql_mutex_t *log_lock;
+
+ create_logfile_name_with_suffix(buf_relay_logname,
+ sizeof(buf_relay_logname),
+ ln, 1, &mi->cmp_connection_name);
+ ln= buf_relay_logname;
+
+ if (opt_relaylog_index_name)
+ {
+ buf_relaylog_index_name= buf_relaylog_index_name_buff;
+ create_logfile_name_with_suffix(buf_relaylog_index_name_buff,
+ sizeof(buf_relaylog_index_name_buff),
+ opt_relaylog_index_name, 0,
+ &mi->cmp_connection_name);
+ }
/*
note, that if open() fails, we'll still have index file open
but a destructor will take care of that
*/
- if (rli->relay_log.open_index_file(opt_relaylog_index_name, ln, TRUE) ||
- rli->relay_log.open(ln, LOG_BIN, 0, SEQ_READ_APPEND, 0,
- (max_relay_log_size ? max_relay_log_size :
- max_binlog_size), 1, TRUE))
+ log_lock= rli->relay_log.get_log_lock();
+ mysql_mutex_lock(log_lock);
+ if (rli->relay_log.open_index_file(buf_relaylog_index_name, ln, TRUE) ||
+ rli->relay_log.open(ln, LOG_BIN, 0, SEQ_READ_APPEND,
+ mi->rli.max_relay_log_size, 1, TRUE))
{
+ mysql_mutex_unlock(log_lock);
mysql_mutex_unlock(&rli->data_lock);
- sql_print_error("Failed in open_log() called from init_relay_log_info()");
+ sql_print_error("Failed when trying to open logs for '%s' in init_relay_log_info(). Error: %M", ln, my_errno);
DBUG_RETURN(1);
}
+ mysql_mutex_unlock(log_lock);
}
/* if file does not exist */
@@ -242,7 +258,7 @@ a file name for --relay-log-index option", opt_relaylog_index_name);
{
sql_print_error("Failed to create a new relay log info file (\
file '%s', errno %d)", fname, my_errno);
- msg= current_thd->stmt_da->message();
+ msg= current_thd->get_stmt_da()->message();
goto err;
}
if (init_io_cache(&rli->info_file, info_fd, IO_SIZE*2, READ_CACHE, 0L,0,
@@ -250,7 +266,7 @@ file '%s', errno %d)", fname, my_errno);
{
sql_print_error("Failed to create a cache on relay log info file '%s'",
fname);
- msg= current_thd->stmt_da->message();
+ msg= current_thd->get_stmt_da()->message();
goto err;
}
@@ -299,20 +315,80 @@ Failed to open the existing relay log info file '%s' (errno %d)",
}
rli->info_fd = info_fd;
- int relay_log_pos, master_log_pos;
+ int relay_log_pos, master_log_pos, lines;
+ char *first_non_digit;
+ /*
+ In MySQL 5.6, there is a MASTER_DELAY option to CHANGE MASTER. This is
+ not yet merged into MariaDB (as of 10.0.13). However, we detect the
+ presense of the new option in relay-log.info, as a placeholder for
+ possible later merge of the feature, and to maintain file format
+ compatibility with MySQL 5.6+.
+ */
+ int dummy_sql_delay;
+
+ /*
+ Starting from MySQL 5.6.x, relay-log.info has a new format.
+ Now, its first line contains the number of lines in the file.
+ By reading this number we can determine which version our master.info
+ comes from. We can't simply count the lines in the file, since
+ versions before 5.6.x could generate files with more lines than
+ needed. If first line doesn't contain a number, or if it
+ contains a number less than LINES_IN_RELAY_LOG_INFO_WITH_DELAY,
+ then the file is treated like a file from pre-5.6.x version.
+ There is no ambiguity when reading an old master.info: before
+ 5.6.x, the first line contained the binlog's name, which is
+ either empty or has an extension (contains a '.'), so can't be
+ confused with an integer.
+
+ So we're just reading first line and trying to figure which
+ version is this.
+ */
+
+ /*
+ The first row is temporarily stored in mi->master_log_name, if
+ it is line count and not binlog name (new format) it will be
+ overwritten by the second row later.
+ */
if (init_strvar_from_file(rli->group_relay_log_name,
sizeof(rli->group_relay_log_name),
+ &rli->info_file, ""))
+ {
+ msg="Error reading slave log configuration";
+ goto err;
+ }
+
+ lines= strtoul(rli->group_relay_log_name, &first_non_digit, 10);
+
+ if (rli->group_relay_log_name[0] != '\0' &&
+ *first_non_digit == '\0' &&
+ lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY)
+ {
+ DBUG_PRINT("info", ("relay_log_info file is in new format."));
+ /* Seems to be new format => read relay log name from next line */
+ if (init_strvar_from_file(rli->group_relay_log_name,
+ sizeof(rli->group_relay_log_name),
+ &rli->info_file, ""))
+ {
+ msg="Error reading slave log configuration";
+ goto err;
+ }
+ }
+ else
+ DBUG_PRINT("info", ("relay_log_info file is in old format."));
+
+ if (init_intvar_from_file(&relay_log_pos,
+ &rli->info_file, BIN_LOG_HEADER_SIZE) ||
+ init_strvar_from_file(rli->group_master_log_name,
+ sizeof(rli->group_master_log_name),
&rli->info_file, "") ||
- init_intvar_from_file(&relay_log_pos,
- &rli->info_file, BIN_LOG_HEADER_SIZE) ||
- init_strvar_from_file(rli->group_master_log_name,
- sizeof(rli->group_master_log_name),
- &rli->info_file, "") ||
- init_intvar_from_file(&master_log_pos, &rli->info_file, 0))
+ init_intvar_from_file(&master_log_pos, &rli->info_file, 0) ||
+ (lines >= LINES_IN_RELAY_LOG_INFO_WITH_DELAY &&
+ init_intvar_from_file(&dummy_sql_delay, &rli->info_file, 0)))
{
msg="Error reading slave log configuration";
goto err;
}
+
strmake_buf(rli->event_relay_log_name,rli->group_relay_log_name);
rli->group_relay_log_pos= rli->event_relay_log_pos= relay_log_pos;
rli->group_master_log_pos= master_log_pos;
@@ -320,6 +396,7 @@ Failed to open the existing relay log info file '%s' (errno %d)",
if (rli->is_relay_log_recovery && init_recovery(rli->mi, &msg))
goto err;
+ rli->relay_log_state.load(rpl_global_gtid_slave_state);
if (init_relay_log_pos(rli,
rli->group_relay_log_name,
rli->group_relay_log_pos,
@@ -363,7 +440,7 @@ Failed to open the existing relay log info file '%s' (errno %d)",
rli->inited= 1;
rli->error_on_rli_init_info= false;
mysql_mutex_unlock(&rli->data_lock);
- DBUG_RETURN(error);
+ DBUG_RETURN(0);
err:
rli->error_on_rli_init_info= true;
@@ -444,6 +521,90 @@ void Relay_log_info::clear_until_condition()
/*
+ Read the correct format description event for starting to replicate from
+ a given position in a relay log file.
+*/
+Format_description_log_event *
+read_relay_log_description_event(IO_CACHE *cur_log, ulonglong start_pos,
+ const char **errmsg)
+{
+ Log_event *ev;
+ Format_description_log_event *fdev;
+ bool found= false;
+
+ /*
+ By default the relay log is in binlog format 3 (4.0).
+ Even if format is 4, this will work enough to read the first event
+ (Format_desc) (remember that format 4 is just lenghtened compared to format
+ 3; format 3 is a prefix of format 4).
+ */
+ fdev= new Format_description_log_event(3);
+
+ while (!found)
+ {
+ Log_event_type typ;
+
+ /*
+ Read the possible Format_description_log_event; if position
+ was 4, no need, it will be read naturally.
+ */
+ DBUG_PRINT("info",("looking for a Format_description_log_event"));
+
+ if (my_b_tell(cur_log) >= start_pos)
+ break;
+
+ if (!(ev= Log_event::read_log_event(cur_log, 0, fdev,
+ opt_slave_sql_verify_checksum)))
+ {
+ DBUG_PRINT("info",("could not read event, cur_log->error=%d",
+ cur_log->error));
+ if (cur_log->error) /* not EOF */
+ {
+ *errmsg= "I/O error reading event at position 4";
+ delete fdev;
+ return NULL;
+ }
+ break;
+ }
+ typ= ev->get_type_code();
+ if (typ == FORMAT_DESCRIPTION_EVENT)
+ {
+ DBUG_PRINT("info",("found Format_description_log_event"));
+ delete fdev;
+ fdev= (Format_description_log_event*) ev;
+ /*
+ As ev was returned by read_log_event, it has passed is_valid(), so
+ my_malloc() in ctor worked, no need to check again.
+ */
+ /*
+ Ok, we found a Format_description event. But it is not sure that this
+ describes the whole relay log; indeed, one can have this sequence
+ (starting from position 4):
+ Format_desc (of slave)
+ Rotate (of master)
+ Format_desc (of master)
+ So the Format_desc which really describes the rest of the relay log
+ is the 3rd event (it can't be further than that, because we rotate
+ the relay log when we queue a Rotate event from the master).
+ But what describes the Rotate is the first Format_desc.
+ So what we do is:
+ go on searching for Format_description events, until you exceed the
+ position (argument 'pos') or until you find another event than Rotate
+ or Format_desc.
+ */
+ }
+ else
+ {
+ DBUG_PRINT("info",("found event of another type=%d", typ));
+ found= (typ != ROTATE_EVENT);
+ delete ev;
+ }
+ }
+ return fdev;
+}
+
+
+/*
Open the given relay log
SYNOPSIS
@@ -518,6 +679,8 @@ int init_relay_log_pos(Relay_log_info* rli,const char* log,
}
rli->group_relay_log_pos = rli->event_relay_log_pos = pos;
+ rli->clear_flag(Relay_log_info::IN_STMT);
+ rli->clear_flag(Relay_log_info::IN_TRANSACTION);
/*
Test to see if the previous run was with the skip of purging
@@ -564,68 +727,13 @@ int init_relay_log_pos(Relay_log_info* rli,const char* log,
*/
if (pos > BIN_LOG_HEADER_SIZE) /* If pos<=4, we stay at 4 */
{
- Log_event* ev;
- while (look_for_description_event)
+ if (look_for_description_event)
{
- /*
- Read the possible Format_description_log_event; if position
- was 4, no need, it will be read naturally.
- */
- DBUG_PRINT("info",("looking for a Format_description_log_event"));
-
- if (my_b_tell(rli->cur_log) >= pos)
- break;
-
- /*
- Because of we have rli->data_lock and log_lock, we can safely read an
- event
- */
- if (!(ev= Log_event::read_log_event(rli->cur_log, 0,
- rli->relay_log.description_event_for_exec,
- opt_slave_sql_verify_checksum)))
- {
- DBUG_PRINT("info",("could not read event, rli->cur_log->error=%d",
- rli->cur_log->error));
- if (rli->cur_log->error) /* not EOF */
- {
- *errmsg= "I/O error reading event at position 4";
- goto err;
- }
- break;
- }
- else if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
- {
- DBUG_PRINT("info",("found Format_description_log_event"));
- delete rli->relay_log.description_event_for_exec;
- rli->relay_log.description_event_for_exec= (Format_description_log_event*) ev;
- /*
- As ev was returned by read_log_event, it has passed is_valid(), so
- my_malloc() in ctor worked, no need to check again.
- */
- /*
- Ok, we found a Format_description event. But it is not sure that this
- describes the whole relay log; indeed, one can have this sequence
- (starting from position 4):
- Format_desc (of slave)
- Rotate (of master)
- Format_desc (of master)
- So the Format_desc which really describes the rest of the relay log
- is the 3rd event (it can't be further than that, because we rotate
- the relay log when we queue a Rotate event from the master).
- But what describes the Rotate is the first Format_desc.
- So what we do is:
- go on searching for Format_description events, until you exceed the
- position (argument 'pos') or until you find another event than Rotate
- or Format_desc.
- */
- }
- else
- {
- DBUG_PRINT("info",("found event of another type=%d",
- ev->get_type_code()));
- look_for_description_event= (ev->get_type_code() == ROTATE_EVENT);
- delete ev;
- }
+ Format_description_log_event *fdev;
+ if (!(fdev= read_relay_log_description_event(rli->cur_log, pos, errmsg)))
+ goto err;
+ delete rli->relay_log.description_event_for_exec;
+ rli->relay_log.description_event_for_exec= fdev;
}
my_b_seek(rli->cur_log,(off_t)pos);
#ifndef DBUG_OFF
@@ -692,7 +800,7 @@ int Relay_log_info::wait_for_pos(THD* thd, String* log_name,
ulong init_abort_pos_wait;
int error=0;
struct timespec abstime; // for timeout checking
- const char *msg;
+ PSI_stage_info old_stage;
DBUG_ENTER("Relay_log_info::wait_for_pos");
if (!inited)
@@ -703,9 +811,9 @@ int Relay_log_info::wait_for_pos(THD* thd, String* log_name,
set_timespec(abstime,timeout);
mysql_mutex_lock(&data_lock);
- msg= thd->enter_cond(&data_cond, &data_lock,
- "Waiting for the slave SQL thread to "
- "advance position");
+ thd->ENTER_COND(&data_cond, &data_lock,
+ &stage_waiting_for_the_slave_thread_to_advance_position,
+ &old_stage);
/*
This function will abort when it notices that some CHANGE MASTER or
RESET MASTER has changed the master info.
@@ -730,7 +838,7 @@ int Relay_log_info::wait_for_pos(THD* thd, String* log_name,
ulong log_name_extension;
char log_name_tmp[FN_REFLEN]; //make a char[] from String
- strmake(log_name_tmp, log_name->ptr(), min(log_name->length(), FN_REFLEN-1));
+ strmake(log_name_tmp, log_name->ptr(), MY_MIN(log_name->length(), FN_REFLEN-1));
char *p= fn_ext(log_name_tmp);
char *p_end;
@@ -740,7 +848,7 @@ int Relay_log_info::wait_for_pos(THD* thd, String* log_name,
goto err;
}
// Convert 0-3 to 4
- log_pos= max(log_pos, BIN_LOG_HEADER_SIZE);
+ log_pos= MY_MAX(log_pos, BIN_LOG_HEADER_SIZE);
/* p points to '.' */
log_name_extension= strtoul(++p, &p_end, 10);
/*
@@ -849,7 +957,7 @@ int Relay_log_info::wait_for_pos(THD* thd, String* log_name,
}
err:
- thd->exit_cond(msg);
+ thd->EXIT_COND(&old_stage);
DBUG_PRINT("exit",("killed: %d abort: %d slave_running: %d \
improper_arguments: %d timed_out: %d",
thd->killed_errno(),
@@ -867,17 +975,65 @@ improper_arguments: %d timed_out: %d",
void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos,
- bool skip_lock)
+ rpl_group_info *rgi,
+ bool skip_lock)
{
DBUG_ENTER("Relay_log_info::inc_group_relay_log_pos");
if (!skip_lock)
mysql_mutex_lock(&data_lock);
- inc_event_relay_log_pos();
- group_relay_log_pos= event_relay_log_pos;
- strmake_buf(group_relay_log_name,event_relay_log_name);
+ rgi->inc_event_relay_log_pos();
+ DBUG_PRINT("info", ("log_pos: %lu group_master_log_pos: %lu",
+ (long) log_pos, (long) group_master_log_pos));
+ if (rgi->is_parallel_exec)
+ {
+ /* In case of parallel replication, do not update the position backwards. */
+ int cmp= strcmp(group_relay_log_name, rgi->event_relay_log_name);
+ if (cmp < 0)
+ {
+ group_relay_log_pos= rgi->future_event_relay_log_pos;
+ strmake_buf(group_relay_log_name, rgi->event_relay_log_name);
+ notify_group_relay_log_name_update();
+ } else if (cmp == 0 && group_relay_log_pos < rgi->future_event_relay_log_pos)
+ group_relay_log_pos= rgi->future_event_relay_log_pos;
+
+ /*
+ In the parallel case we need to update the master_log_name here, rather
+ than in Rotate_log_event::do_update_pos().
+ */
+ cmp= strcmp(group_master_log_name, rgi->future_event_master_log_name);
+ if (cmp <= 0)
+ {
+ if (cmp < 0)
+ {
+ strcpy(group_master_log_name, rgi->future_event_master_log_name);
+ group_master_log_pos= log_pos;
+ }
+ else if (group_master_log_pos < log_pos)
+ group_master_log_pos= log_pos;
+ }
- notify_group_relay_log_name_update();
+ /*
+ In the parallel case, we only update the Seconds_Behind_Master at the
+ end of a transaction. In the non-parallel case, the value is updated as
+ soon as an event is read from the relay log; however this would be too
+ confusing for the user, seeing the slave reported as up-to-date when
+ potentially thousands of events are still queued up for worker threads
+ waiting for execution.
+ */
+ if (rgi->last_master_timestamp &&
+ rgi->last_master_timestamp > last_master_timestamp)
+ last_master_timestamp= rgi->last_master_timestamp;
+ }
+ else
+ {
+ /* Non-parallel case. */
+ group_relay_log_pos= event_relay_log_pos;
+ strmake_buf(group_relay_log_name, event_relay_log_name);
+ notify_group_relay_log_name_update();
+ if (log_pos) // 3.23 binlogs don't have log_posx
+ group_master_log_pos= log_pos;
+ }
/*
If the slave does not support transactions and replicates a transaction,
@@ -909,12 +1065,6 @@ void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos,
the relay log is not "val".
With the end_log_pos solution, we avoid computations involving lengthes.
*/
- DBUG_PRINT("info", ("log_pos: %lu group_master_log_pos: %lu",
- (long) log_pos, (long) group_master_log_pos));
- if (log_pos) // 3.23 binlogs don't have log_posx
- {
- group_master_log_pos= log_pos;
- }
mysql_cond_broadcast(&data_cond);
if (!skip_lock)
mysql_mutex_unlock(&data_lock);
@@ -930,6 +1080,9 @@ void Relay_log_info::close_temporary_tables()
for (table=save_temporary_tables ; table ; table=next)
{
next=table->next;
+
+ /* Reset in_use as the table may have been created by another thd */
+ table->in_use=0;
/*
Don't ask for disk deletion. For now, anyway they will be deleted when
slave restarts, but it is a better intention to not delete them.
@@ -992,8 +1145,8 @@ int purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset,
"log index file:%s.", rli->relay_log.get_index_fname());
DBUG_RETURN(1);
}
- if (rli->relay_log.open(ln, LOG_BIN, 0, SEQ_READ_APPEND, 0,
- (max_relay_log_size ? max_relay_log_size :
+ if (rli->relay_log.open(ln, LOG_BIN, 0, SEQ_READ_APPEND,
+ (rli->max_relay_log_size ? rli->max_relay_log_size :
max_binlog_size), 1, TRUE))
{
sql_print_error("Unable to purge relay log files. Failed to open relay "
@@ -1025,26 +1178,36 @@ int purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset,
rli->cur_log_fd= -1;
}
- if (rli->relay_log.reset_logs(thd))
+ if (rli->relay_log.reset_logs(thd, !just_reset, NULL, 0))
{
*errmsg = "Failed during log reset";
error=1;
goto err;
}
- /* Save name of used relay log file */
- strmake_buf(rli->group_relay_log_name, rli->relay_log.get_log_fname());
- strmake_buf(rli->event_relay_log_name, rli->relay_log.get_log_fname());
- rli->group_relay_log_pos= rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE;
- if (count_relay_log_space(rli))
- {
- *errmsg= "Error counting relay log space";
- error=1;
- goto err;
- }
+ rli->relay_log_state.load(rpl_global_gtid_slave_state);
if (!just_reset)
+ {
+ /* Save name of used relay log file */
+ strmake_buf(rli->group_relay_log_name, rli->relay_log.get_log_fname());
+ strmake_buf(rli->event_relay_log_name, rli->relay_log.get_log_fname());
+ rli->group_relay_log_pos= rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE;
+ rli->log_space_total= 0;
+
+ if (count_relay_log_space(rli))
+ {
+ *errmsg= "Error counting relay log space";
+ error=1;
+ goto err;
+ }
error= init_relay_log_pos(rli, rli->group_relay_log_name,
rli->group_relay_log_pos,
0 /* do not need data lock */, errmsg, 0);
+ }
+ else
+ {
+ /* Ensure relay log names are not used */
+ rli->group_relay_log_name[0]= rli->event_relay_log_name[0]= 0;
+ }
if (!rli->inited && rli->error_on_rli_init_info)
rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
@@ -1097,16 +1260,19 @@ bool Relay_log_info::is_until_satisfied(THD *thd, Log_event *ev)
ulonglong log_pos;
DBUG_ENTER("Relay_log_info::is_until_satisfied");
- DBUG_ASSERT(until_condition != UNTIL_NONE);
+ DBUG_ASSERT(until_condition == UNTIL_MASTER_POS ||
+ until_condition == UNTIL_RELAY_POS);
if (until_condition == UNTIL_MASTER_POS)
{
- if (ev && ev->server_id == (uint32) ::server_id && !replicate_same_server_id)
+ if (ev && ev->server_id == (uint32) global_system_variables.server_id &&
+ !replicate_same_server_id)
DBUG_RETURN(FALSE);
- log_name= group_master_log_name;
- log_pos= (!ev)? group_master_log_pos :
- ((thd->variables.option_bits & OPTION_BEGIN || !ev->log_pos) ?
- group_master_log_pos : ev->log_pos - ev->data_written);
+ log_name= (mi->using_parallel() ?
+ future_event_master_log_name : group_master_log_name);
+ log_pos= ((!ev)? group_master_log_pos :
+ (get_flag(IN_TRANSACTION) || !ev->log_pos) ?
+ group_master_log_pos : ev->log_pos - ev->data_written);
}
else
{ /* until_condition == UNTIL_RELAY_POS */
@@ -1175,43 +1341,22 @@ bool Relay_log_info::is_until_satisfied(THD *thd, Log_event *ev)
}
-void Relay_log_info::cached_charset_invalidate()
-{
- DBUG_ENTER("Relay_log_info::cached_charset_invalidate");
-
- /* Full of zeroes means uninitialized. */
- bzero(cached_charset, sizeof(cached_charset));
- DBUG_VOID_RETURN;
-}
-
-
-bool Relay_log_info::cached_charset_compare(char *charset) const
-{
- DBUG_ENTER("Relay_log_info::cached_charset_compare");
-
- if (memcmp(cached_charset, charset, sizeof(cached_charset)))
- {
- memcpy(const_cast<char*>(cached_charset), charset, sizeof(cached_charset));
- DBUG_RETURN(1);
- }
- DBUG_RETURN(0);
-}
-
-
-void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
- time_t event_creation_time)
+bool Relay_log_info::stmt_done(my_off_t event_master_log_pos, THD *thd,
+ rpl_group_info *rgi)
{
-#ifndef DBUG_OFF
- extern uint debug_not_change_ts_if_art_event;
-#endif
- clear_flag(IN_STMT);
+ int error= 0;
+ DBUG_ENTER("Relay_log_info::stmt_done");
+ DBUG_ASSERT(rgi->rli == this);
/*
If in a transaction, and if the slave supports transactions, just
inc_event_relay_log_pos(). We only have to check for OPTION_BEGIN
(not OPTION_NOT_AUTOCOMMIT) as transactions are logged with
BEGIN/COMMIT, not with SET AUTOCOMMIT= .
+ We can't use rgi->rli->get_flag(IN_TRANSACTION) here as OPTION_BEGIN
+ is also used for single row transactions.
+
CAUTION: opt_using_transactions means innodb || bdb ; suppose the
master supports InnoDB and BDB, but the slave supports only BDB,
problems will arise: - suppose an InnoDB table is created on the
@@ -1229,32 +1374,416 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
middle of the "transaction". START SLAVE will resume at BEGIN
while the MyISAM table has already been updated.
*/
- if ((sql_thd->variables.option_bits & OPTION_BEGIN) && opt_using_transactions)
- inc_event_relay_log_pos();
+ if ((rgi->thd->variables.option_bits & OPTION_BEGIN) &&
+ opt_using_transactions)
+ rgi->inc_event_relay_log_pos();
else
{
- inc_group_relay_log_pos(event_master_log_pos);
- flush_relay_log_info(this);
- /*
- Note that Rotate_log_event::do_apply_event() does not call this
- function, so there is no chance that a fake rotate event resets
- last_master_timestamp. Note that we update without mutex
- (probably ok - except in some very rare cases, only consequence
- is that value may take some time to display in
- Seconds_Behind_Master - not critical).
- */
- if (!(event_creation_time == 0 &&
- IF_DBUG(debug_not_change_ts_if_art_event > 0, 1)))
- last_master_timestamp= event_creation_time;
+ inc_group_relay_log_pos(event_master_log_pos, rgi);
+ if (rpl_global_gtid_slave_state->record_and_update_gtid(thd, rgi))
+ {
+ report(WARNING_LEVEL, ER_CANNOT_UPDATE_GTID_STATE, rgi->gtid_info(),
+ "Failed to update GTID state in %s.%s, slave state may become "
+ "inconsistent: %d: %s",
+ "mysql", rpl_gtid_slave_state_table_name.str,
+ thd->get_stmt_da()->sql_errno(), thd->get_stmt_da()->message());
+ /*
+ At this point we are not in a transaction (for example after DDL),
+ so we can not roll back. Anyway, normally updates to the slave
+ state table should not fail, and if they do, at least we made the
+ DBA aware of the problem in the error log.
+ */
+ }
+ DBUG_EXECUTE_IF("inject_crash_before_flush_rli", DBUG_SUICIDE(););
+ if (mi->using_gtid == Master_info::USE_GTID_NO)
+ if (flush_relay_log_info(this))
+ error= 1;
+ DBUG_EXECUTE_IF("inject_crash_after_flush_rli", DBUG_SUICIDE(););
+ }
+ DBUG_RETURN(error);
+}
+
+
+int
+Relay_log_info::alloc_inuse_relaylog(const char *name)
+{
+ inuse_relaylog *ir;
+ uint32 gtid_count;
+ rpl_gtid *gtid_list;
+
+ if (!(ir= (inuse_relaylog *)my_malloc(sizeof(*ir), MYF(MY_WME|MY_ZEROFILL))))
+ {
+ my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*ir));
+ return 1;
+ }
+ gtid_count= relay_log_state.count();
+ if (!(gtid_list= (rpl_gtid *)my_malloc(sizeof(*gtid_list)*gtid_count,
+ MYF(MY_WME))))
+ {
+ my_free(ir);
+ my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*gtid_list)*gtid_count);
+ return 1;
+ }
+ if (relay_log_state.get_gtid_list(gtid_list, gtid_count))
+ {
+ my_free(gtid_list);
+ my_free(ir);
+ DBUG_ASSERT(0 /* Should not be possible as we allocated correct length */);
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ return 1;
+ }
+ ir->rli= this;
+ strmake_buf(ir->name, name);
+ ir->relay_log_state= gtid_list;
+ ir->relay_log_state_count= gtid_count;
+
+ if (!inuse_relaylog_list)
+ inuse_relaylog_list= ir;
+ else
+ {
+ last_inuse_relaylog->completed= true;
+ last_inuse_relaylog->next= ir;
+ }
+ last_inuse_relaylog= ir;
+ my_atomic_rwlock_init(&ir->inuse_relaylog_atomic_lock);
+
+ return 0;
+}
+
+
+void
+Relay_log_info::free_inuse_relaylog(inuse_relaylog *ir)
+{
+ my_free(ir->relay_log_state);
+ my_atomic_rwlock_destroy(&ir->inuse_relaylog_atomic_lock);
+ my_free(ir);
+}
+
+
+void
+Relay_log_info::reset_inuse_relaylog()
+{
+ inuse_relaylog *cur= inuse_relaylog_list;
+ while (cur)
+ {
+ DBUG_ASSERT(cur->queued_count == cur->dequeued_count);
+ inuse_relaylog *next= cur->next;
+ free_inuse_relaylog(cur);
+ cur= next;
+ }
+ inuse_relaylog_list= last_inuse_relaylog= NULL;
+}
+
+
+int
+Relay_log_info::update_relay_log_state(rpl_gtid *gtid_list, uint32 count)
+{
+ int res= 0;
+ while (count)
+ {
+ if (relay_log_state.update_nolock(gtid_list, false))
+ res= 1;
+ ++gtid_list;
+ --count;
}
+ return res;
}
+
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
-void Relay_log_info::cleanup_context(THD *thd, bool error)
+int
+rpl_load_gtid_slave_state(THD *thd)
{
- DBUG_ENTER("Relay_log_info::cleanup_context");
+ TABLE_LIST tlist;
+ TABLE *table;
+ bool table_opened= false;
+ bool table_scanned= false;
+ bool array_inited= false;
+ struct local_element { uint64 sub_id; rpl_gtid gtid; };
+ struct local_element tmp_entry, *entry;
+ HASH hash;
+ DYNAMIC_ARRAY array;
+ int err= 0;
+ uint32 i;
+ DBUG_ENTER("rpl_load_gtid_slave_state");
+
+ mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
+ bool loaded= rpl_global_gtid_slave_state->loaded;
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
+ if (loaded)
+ DBUG_RETURN(0);
+
+ my_hash_init(&hash, &my_charset_bin, 32,
+ offsetof(local_element, gtid) + offsetof(rpl_gtid, domain_id),
+ sizeof(uint32), NULL, my_free, HASH_UNIQUE);
+ if ((err= my_init_dynamic_array(&array, sizeof(local_element), 0, 0, MYF(0))))
+ goto end;
+ array_inited= true;
+
+ mysql_reset_thd_for_next_command(thd);
+
+ tlist.init_one_table(STRING_WITH_LEN("mysql"),
+ rpl_gtid_slave_state_table_name.str,
+ rpl_gtid_slave_state_table_name.length,
+ NULL, TL_READ);
+ if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
+ goto end;
+ table_opened= true;
+ table= tlist.table;
+
+ if ((err= gtid_check_rpl_slave_state_table(table)))
+ goto end;
+
+ bitmap_set_all(table->read_set);
+ if ((err= table->file->ha_rnd_init_with_error(1)))
+ {
+ table->file->print_error(err, MYF(0));
+ goto end;
+ }
+ table_scanned= true;
+ for (;;)
+ {
+ uint32 domain_id, server_id;
+ uint64 sub_id, seq_no;
+ uchar *rec;
+
+ if ((err= table->file->ha_rnd_next(table->record[0])))
+ {
+ if (err == HA_ERR_RECORD_DELETED)
+ continue;
+ else if (err == HA_ERR_END_OF_FILE)
+ break;
+ else
+ {
+ table->file->print_error(err, MYF(0));
+ goto end;
+ }
+ }
+ domain_id= (ulonglong)table->field[0]->val_int();
+ sub_id= (ulonglong)table->field[1]->val_int();
+ server_id= (ulonglong)table->field[2]->val_int();
+ seq_no= (ulonglong)table->field[3]->val_int();
+ DBUG_PRINT("info", ("Read slave state row: %u-%u-%lu sub_id=%lu\n",
+ (unsigned)domain_id, (unsigned)server_id,
+ (ulong)seq_no, (ulong)sub_id));
+
+ tmp_entry.sub_id= sub_id;
+ tmp_entry.gtid.domain_id= domain_id;
+ tmp_entry.gtid.server_id= server_id;
+ tmp_entry.gtid.seq_no= seq_no;
+ if ((err= insert_dynamic(&array, (uchar *)&tmp_entry)))
+ {
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ goto end;
+ }
+
+ if ((rec= my_hash_search(&hash, (const uchar *)&domain_id, 0)))
+ {
+ entry= (struct local_element *)rec;
+ if (entry->sub_id >= sub_id)
+ continue;
+ entry->sub_id= sub_id;
+ DBUG_ASSERT(entry->gtid.domain_id == domain_id);
+ entry->gtid.server_id= server_id;
+ entry->gtid.seq_no= seq_no;
+ }
+ else
+ {
+ if (!(entry= (struct local_element *)my_malloc(sizeof(*entry),
+ MYF(MY_WME))))
+ {
+ my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*entry));
+ err= 1;
+ goto end;
+ }
+ entry->sub_id= sub_id;
+ entry->gtid.domain_id= domain_id;
+ entry->gtid.server_id= server_id;
+ entry->gtid.seq_no= seq_no;
+ if ((err= my_hash_insert(&hash, (uchar *)entry)))
+ {
+ my_free(entry);
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ goto end;
+ }
+ }
+ }
- DBUG_ASSERT(sql_thd == thd);
+ mysql_mutex_lock(&rpl_global_gtid_slave_state->LOCK_slave_state);
+ if (rpl_global_gtid_slave_state->loaded)
+ {
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
+ goto end;
+ }
+
+ for (i= 0; i < array.elements; ++i)
+ {
+ get_dynamic(&array, (uchar *)&tmp_entry, i);
+ if ((err= rpl_global_gtid_slave_state->update(tmp_entry.gtid.domain_id,
+ tmp_entry.gtid.server_id,
+ tmp_entry.sub_id,
+ tmp_entry.gtid.seq_no,
+ NULL)))
+ {
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ goto end;
+ }
+ }
+
+ for (i= 0; i < hash.records; ++i)
+ {
+ entry= (struct local_element *)my_hash_element(&hash, i);
+ if (opt_bin_log &&
+ mysql_bin_log.bump_seq_no_counter_if_needed(entry->gtid.domain_id,
+ entry->gtid.seq_no))
+ {
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ goto end;
+ }
+ }
+
+ rpl_global_gtid_slave_state->loaded= true;
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state->LOCK_slave_state);
+
+ err= 0; /* Clear HA_ERR_END_OF_FILE */
+
+end:
+ if (table_scanned)
+ {
+ table->file->ha_index_or_rnd_end();
+ ha_commit_trans(thd, FALSE);
+ ha_commit_trans(thd, TRUE);
+ }
+ if (table_opened)
+ {
+ close_thread_tables(thd);
+ thd->mdl_context.release_transactional_locks();
+ }
+ if (array_inited)
+ delete_dynamic(&array);
+ my_hash_free(&hash);
+ DBUG_RETURN(err);
+}
+
+
+void
+rpl_group_info::reinit(Relay_log_info *rli)
+{
+ this->rli= rli;
+ tables_to_lock= NULL;
+ tables_to_lock_count= 0;
+ trans_retries= 0;
+ last_event_start_time= 0;
+ gtid_sub_id= 0;
+ commit_id= 0;
+ gtid_pending= false;
+ worker_error= 0;
+ row_stmt_start_timestamp= 0;
+ long_find_row_note_printed= false;
+ did_mark_start_commit= false;
+ last_master_timestamp = 0;
+ gtid_ignore_duplicate_state= GTID_DUPLICATE_NULL;
+ commit_orderer.reinit();
+}
+
+rpl_group_info::rpl_group_info(Relay_log_info *rli)
+ : thd(0), wait_commit_sub_id(0),
+ wait_commit_group_info(0), parallel_entry(0),
+ deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false)
+{
+ reinit(rli);
+ bzero(&current_gtid, sizeof(current_gtid));
+ mysql_mutex_init(key_rpl_group_info_sleep_lock, &sleep_lock,
+ MY_MUTEX_INIT_FAST);
+ mysql_cond_init(key_rpl_group_info_sleep_cond, &sleep_cond, NULL);
+}
+
+
+rpl_group_info::~rpl_group_info()
+{
+ free_annotate_event();
+ delete deferred_events;
+ mysql_mutex_destroy(&sleep_lock);
+ mysql_cond_destroy(&sleep_cond);
+}
+
+
+int
+event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev)
+{
+ uint64 sub_id= rpl_global_gtid_slave_state->next_sub_id(gev->domain_id);
+ if (!sub_id)
+ {
+ /* Out of memory caused hash insertion to fail. */
+ return 1;
+ }
+ rgi->gtid_sub_id= sub_id;
+ rgi->current_gtid.domain_id= gev->domain_id;
+ rgi->current_gtid.server_id= gev->server_id;
+ rgi->current_gtid.seq_no= gev->seq_no;
+ rgi->commit_id= gev->commit_id;
+ rgi->gtid_pending= true;
+ return 0;
+}
+
+
+void
+delete_or_keep_event_post_apply(rpl_group_info *rgi,
+ Log_event_type typ, Log_event *ev)
+{
+ /*
+ ToDo: This needs to work on rpl_group_info, not Relay_log_info, to be
+ thread-safe for parallel replication.
+ */
+
+ switch (typ) {
+ case FORMAT_DESCRIPTION_EVENT:
+ /*
+ Format_description_log_event should not be deleted because it
+ will be used to read info about the relay log's format;
+ it will be deleted when the SQL thread does not need it,
+ i.e. when this thread terminates.
+ */
+ break;
+ case ANNOTATE_ROWS_EVENT:
+ /*
+ Annotate_rows event should not be deleted because after it has
+ been applied, thd->query points to the string inside this event.
+ The thd->query will be used to generate new Annotate_rows event
+ during applying the subsequent Rows events.
+ */
+ rgi->set_annotate_event((Annotate_rows_log_event*) ev);
+ break;
+ case DELETE_ROWS_EVENT_V1:
+ case UPDATE_ROWS_EVENT_V1:
+ case WRITE_ROWS_EVENT_V1:
+ case DELETE_ROWS_EVENT:
+ case UPDATE_ROWS_EVENT:
+ case WRITE_ROWS_EVENT:
+ /*
+ After the last Rows event has been applied, the saved Annotate_rows
+ event (if any) is not needed anymore and can be deleted.
+ */
+ if (((Rows_log_event*)ev)->get_flags(Rows_log_event::STMT_END_F))
+ rgi->free_annotate_event();
+ /* fall through */
+ default:
+ DBUG_PRINT("info", ("Deleting the event after it has been executed"));
+ if (!rgi->is_deferred_event(ev))
+ delete ev;
+ break;
+ }
+}
+
+
+void rpl_group_info::cleanup_context(THD *thd, bool error)
+{
+ DBUG_ENTER("rpl_group_info::cleanup_context");
+ DBUG_PRINT("enter", ("error: %d", (int) error));
+
+ DBUG_ASSERT(this->thd == thd);
/*
1) Instances of Table_map_log_event, if ::do_apply_event() was called on them,
may have opened tables, which we cannot be sure have been closed (because
@@ -1270,13 +1799,38 @@ void Relay_log_info::cleanup_context(THD *thd, bool error)
if (error)
{
trans_rollback_stmt(thd); // if a "statement transaction"
+ /* trans_rollback() also resets OPTION_GTID_BEGIN */
trans_rollback(thd); // if a "real transaction"
+ /*
+ Now that we have rolled back the transaction, make sure we do not
+ erroneously update the GTID position.
+ */
+ gtid_pending= false;
}
m_table_map.clear_tables();
slave_close_thread_tables(thd);
if (error)
+ {
thd->mdl_context.release_transactional_locks();
- clear_flag(IN_STMT);
+
+ if (thd == rli->sql_driver_thd)
+ {
+ /*
+ Reset flags. This is needed to handle incident events and errors in
+ the relay log noticed by the sql driver thread.
+ */
+ rli->clear_flag(Relay_log_info::IN_STMT);
+ rli->clear_flag(Relay_log_info::IN_TRANSACTION);
+ }
+
+ /*
+ Ensure we always release the domain for others to process, when using
+ --gtid-ignore-duplicates.
+ */
+ if (gtid_ignore_duplicate_state != GTID_DUPLICATE_NULL)
+ rpl_global_gtid_slave_state->release_domain_owner(this);
+ }
+
/*
Cleanup for the flags that have been set at do_apply_event.
*/
@@ -1291,12 +1845,18 @@ void Relay_log_info::cleanup_context(THD *thd, bool error)
reset_row_stmt_start_timestamp();
unset_long_find_row_note_printed();
+ DBUG_EXECUTE_IF("inject_sleep_gtid_100_x_x", {
+ if (current_gtid.domain_id == 100)
+ my_sleep(50000);
+ };);
+
DBUG_VOID_RETURN;
}
-void Relay_log_info::clear_tables_to_lock()
+
+void rpl_group_info::clear_tables_to_lock()
{
- DBUG_ENTER("Relay_log_info::clear_tables_to_lock()");
+ DBUG_ENTER("rpl_group_info::clear_tables_to_lock()");
#ifndef DBUG_OFF
/**
When replicating in RBR and MyISAM Merge tables are involved
@@ -1340,12 +1900,13 @@ void Relay_log_info::clear_tables_to_lock()
DBUG_VOID_RETURN;
}
-void Relay_log_info::slave_close_thread_tables(THD *thd)
+
+void rpl_group_info::slave_close_thread_tables(THD *thd)
{
- DBUG_ENTER("Relay_log_info::slave_close_thread_tables(THD *thd)");
- thd->stmt_da->can_overwrite_status= TRUE;
+ DBUG_ENTER("rpl_group_info::slave_close_thread_tables(THD *thd)");
+ thd->get_stmt_da()->set_overwrite_status(true);
thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd);
- thd->stmt_da->can_overwrite_status= FALSE;
+ thd->get_stmt_da()->set_overwrite_status(false);
close_thread_tables(thd);
/*
@@ -1373,4 +1934,128 @@ void Relay_log_info::slave_close_thread_tables(THD *thd)
clear_tables_to_lock();
DBUG_VOID_RETURN;
}
+
+
+
+static void
+mark_start_commit_inner(rpl_parallel_entry *e, group_commit_orderer *gco,
+ rpl_group_info *rgi)
+{
+ group_commit_orderer *tmp;
+ uint64 count= ++e->count_committing_event_groups;
+ /* Signal any following GCO whose wait_count has been reached now. */
+ tmp= gco;
+ while ((tmp= tmp->next_gco))
+ {
+ uint64 wait_count= tmp->wait_count;
+ if (wait_count > count)
+ break;
+ mysql_cond_broadcast(&tmp->COND_group_commit_orderer);
+ }
+}
+
+
+void
+rpl_group_info::mark_start_commit_no_lock()
+{
+ if (did_mark_start_commit)
+ return;
+ did_mark_start_commit= true;
+ mark_start_commit_inner(parallel_entry, gco, this);
+}
+
+
+void
+rpl_group_info::mark_start_commit()
+{
+ rpl_parallel_entry *e;
+
+ if (did_mark_start_commit)
+ return;
+ did_mark_start_commit= true;
+
+ e= this->parallel_entry;
+ mysql_mutex_lock(&e->LOCK_parallel_entry);
+ mark_start_commit_inner(e, gco, this);
+ mysql_mutex_unlock(&e->LOCK_parallel_entry);
+}
+
+
+/*
+ Format the current GTID as a string suitable for printing in error messages.
+
+ The string is stored in a buffer inside rpl_group_info, so remains valid
+ until next call to gtid_info() or until destruction of rpl_group_info.
+
+ If no GTID is available, then NULL is returned.
+*/
+char *
+rpl_group_info::gtid_info()
+{
+ if (!gtid_sub_id || !current_gtid.seq_no)
+ return NULL;
+ my_snprintf(gtid_info_buf, sizeof(gtid_info_buf), "Gtid %u-%u-%llu",
+ current_gtid.domain_id, current_gtid.server_id,
+ current_gtid.seq_no);
+ return gtid_info_buf;
+}
+
+
+/*
+ Undo the effect of a prior mark_start_commit().
+
+ This is only used for retrying a transaction in parallel replication, after
+ we have encountered a deadlock or other temporary error.
+
+ When we get such a deadlock, it means that the current group of transactions
+ did not yet all start committing (else they would not have deadlocked). So
+ we will not yet have woken up anything in the next group, our rgi->gco is
+ still live, and we can simply decrement the counter (to be incremented again
+ later, when the retry succeeds and reaches the commit step).
+*/
+void
+rpl_group_info::unmark_start_commit()
+{
+ rpl_parallel_entry *e;
+
+ if (!did_mark_start_commit)
+ return;
+ did_mark_start_commit= false;
+
+ e= this->parallel_entry;
+ mysql_mutex_lock(&e->LOCK_parallel_entry);
+ --e->count_committing_event_groups;
+ mysql_mutex_unlock(&e->LOCK_parallel_entry);
+}
+
+
+rpl_sql_thread_info::rpl_sql_thread_info(Rpl_filter *filter)
+ : rpl_filter(filter)
+{
+ cached_charset_invalidate();
+}
+
+
+void rpl_sql_thread_info::cached_charset_invalidate()
+{
+ DBUG_ENTER("rpl_group_info::cached_charset_invalidate");
+
+ /* Full of zeroes means uninitialized. */
+ bzero(cached_charset, sizeof(cached_charset));
+ DBUG_VOID_RETURN;
+}
+
+
+bool rpl_sql_thread_info::cached_charset_compare(char *charset) const
+{
+ DBUG_ENTER("rpl_group_info::cached_charset_compare");
+
+ if (memcmp(cached_charset, charset, sizeof(cached_charset)))
+ {
+ memcpy(const_cast<char*>(cached_charset), charset, sizeof(cached_charset));
+ DBUG_RETURN(1);
+ }
+ DBUG_RETURN(0);
+}
+
#endif