summaryrefslogtreecommitdiff
path: root/sql/rpl_rli.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/rpl_rli.cc')
-rw-r--r--sql/rpl_rli.cc505
1 files changed, 435 insertions, 70 deletions
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index b01f74408a6..797f5681ec5 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -32,6 +32,15 @@
static int count_relay_log_space(Relay_log_info* rli);
+/**
+ Current replication state (hash of last GTID executed, per replication
+ domain).
+*/
+rpl_slave_state rpl_global_gtid_slave_state;
+/* Object used for MASTER_GTID_WAIT(). */
+gtid_waiting rpl_global_gtid_waiting;
+
+
// Defined in slave.cc
int init_intvar_from_file(int* var, IO_CACHE* f, int default_val);
int init_strvar_from_file(char *var, int max_size, IO_CACHE *f,
@@ -42,23 +51,22 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
no_storage(FALSE), replicate_same_server_id(::replicate_same_server_id),
info_fd(-1), cur_log_fd(-1), relay_log(&sync_relaylog_period),
sync_counter(0), is_relay_log_recovery(is_slave_recovery),
- save_temporary_tables(0), cur_log_old_open_count(0), group_relay_log_pos(0),
+ save_temporary_tables(0), mi(0),
+ cur_log_old_open_count(0), group_relay_log_pos(0),
event_relay_log_pos(0),
#if HAVE_valgrind
is_fake(FALSE),
#endif
group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0),
- last_master_timestamp(0), slave_skip_counter(0),
- abort_pos_wait(0), slave_run_id(0), sql_thd(0),
+ last_master_timestamp(0), sql_thread_caught_up(true), slave_skip_counter(0),
+ abort_pos_wait(0), slave_run_id(0), sql_driver_thd(),
inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE),
- until_log_pos(0), retried_trans(0),
- tables_to_lock(0), tables_to_lock_count(0),
- last_event_start_time(0), deferred_events(NULL),m_flags(0),
- row_stmt_start_timestamp(0), long_find_row_note_printed(false),
- m_annotate_event(0)
+ until_log_pos(0), retried_trans(0), executed_entries(0),
+ m_flags(0)
{
DBUG_ENTER("Relay_log_info::Relay_log_info");
+ relay_log.is_relay_log= TRUE;
#ifdef HAVE_PSI_INTERFACE
relay_log.set_psi_keys(key_RELAYLOG_LOCK_index,
key_RELAYLOG_update_cond,
@@ -70,6 +78,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
group_relay_log_name[0]= event_relay_log_name[0]=
group_master_log_name[0]= 0;
until_log_name[0]= ign_master_log_name_end[0]= 0;
+ max_relay_log_size= global_system_variables.max_relay_log_size;
bzero((char*) &info_file, sizeof(info_file));
bzero((char*) &cache_buf, sizeof(cache_buf));
cached_charset_invalidate();
@@ -78,12 +87,10 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
&data_lock, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_relay_log_info_log_space_lock,
&log_space_lock, MY_MUTEX_INIT_FAST);
- mysql_mutex_init(key_relay_log_info_sleep_lock, &sleep_lock, MY_MUTEX_INIT_FAST);
mysql_cond_init(key_relay_log_info_data_cond, &data_cond, NULL);
mysql_cond_init(key_relay_log_info_start_cond, &start_cond, NULL);
mysql_cond_init(key_relay_log_info_stop_cond, &stop_cond, NULL);
mysql_cond_init(key_relay_log_info_log_space_cond, &log_space_cond, NULL);
- mysql_cond_init(key_relay_log_info_sleep_cond, &sleep_cond, NULL);
relay_log.init_pthread_objects();
DBUG_VOID_RETURN;
}
@@ -96,14 +103,11 @@ Relay_log_info::~Relay_log_info()
mysql_mutex_destroy(&run_lock);
mysql_mutex_destroy(&data_lock);
mysql_mutex_destroy(&log_space_lock);
- mysql_mutex_destroy(&sleep_lock);
mysql_cond_destroy(&data_cond);
mysql_cond_destroy(&start_cond);
mysql_cond_destroy(&stop_cond);
mysql_cond_destroy(&log_space_cond);
- mysql_cond_destroy(&sleep_cond);
relay_log.cleanup();
- free_annotate_event();
DBUG_VOID_RETURN;
}
@@ -128,8 +132,6 @@ int init_relay_log_info(Relay_log_info* rli,
rli->abort_pos_wait=0;
rli->log_space_limit= relay_log_space_limit;
rli->log_space_total= 0;
- rli->tables_to_lock= 0;
- rli->tables_to_lock_count= 0;
char pattern[FN_REFLEN];
(void) my_realpath(pattern, slave_load_tmpdir, 0);
@@ -150,15 +152,6 @@ int init_relay_log_info(Relay_log_info* rli,
event, in flush_master_info(mi, 1, ?).
*/
- /*
- For the maximum log size, we choose max_relay_log_size if it is
- non-zero, max_binlog_size otherwise. If later the user does SET
- GLOBAL on one of these variables, fix_max_binlog_size and
- fix_max_relay_log_size will reconsider the choice (for example
- if the user changes max_relay_log_size to zero, we have to
- switch to using max_binlog_size for the relay log) and update
- rli->relay_log.max_size (and mysql_bin_log.max_size).
- */
{
/* Reports an error and returns, if the --relay-log's path
is a directory.*/
@@ -208,19 +201,35 @@ a file name for --relay-log-index option", opt_relaylog_index_name);
name_warning_sent= 1;
}
- rli->relay_log.is_relay_log= TRUE;
+ /* For multimaster, add connection name to relay log filenames */
+ Master_info* mi= rli->mi;
+ char buf_relay_logname[FN_REFLEN], buf_relaylog_index_name_buff[FN_REFLEN];
+ char *buf_relaylog_index_name= opt_relaylog_index_name;
+
+ create_logfile_name_with_suffix(buf_relay_logname,
+ sizeof(buf_relay_logname),
+ ln, 1, &mi->cmp_connection_name);
+ ln= buf_relay_logname;
+
+ if (opt_relaylog_index_name)
+ {
+ buf_relaylog_index_name= buf_relaylog_index_name_buff;
+ create_logfile_name_with_suffix(buf_relaylog_index_name_buff,
+ sizeof(buf_relaylog_index_name_buff),
+ opt_relaylog_index_name, 0,
+ &mi->cmp_connection_name);
+ }
/*
note, that if open() fails, we'll still have index file open
but a destructor will take care of that
*/
- if (rli->relay_log.open_index_file(opt_relaylog_index_name, ln, TRUE) ||
- rli->relay_log.open(ln, LOG_BIN, 0, SEQ_READ_APPEND, 0,
- (max_relay_log_size ? max_relay_log_size :
- max_binlog_size), 1, TRUE))
+ if (rli->relay_log.open_index_file(buf_relaylog_index_name, ln, TRUE) ||
+ rli->relay_log.open(ln, LOG_BIN, 0, SEQ_READ_APPEND,
+ mi->rli.max_relay_log_size, 1, TRUE))
{
mysql_mutex_unlock(&rli->data_lock);
- sql_print_error("Failed in open_log() called from init_relay_log_info()");
+ sql_print_error("Failed when trying to open logs for '%s' in init_relay_log_info(). Error: %M", ln, my_errno);
DBUG_RETURN(1);
}
}
@@ -512,6 +521,8 @@ int init_relay_log_pos(Relay_log_info* rli,const char* log,
}
rli->group_relay_log_pos = rli->event_relay_log_pos = pos;
+ rli->clear_flag(Relay_log_info::IN_STMT);
+ rli->clear_flag(Relay_log_info::IN_TRANSACTION);
/*
Test to see if the previous run was with the skip of purging
@@ -861,17 +872,54 @@ improper_arguments: %d timed_out: %d",
void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos,
- bool skip_lock)
+ rpl_group_info *rgi,
+ bool skip_lock)
{
DBUG_ENTER("Relay_log_info::inc_group_relay_log_pos");
if (!skip_lock)
mysql_mutex_lock(&data_lock);
- inc_event_relay_log_pos();
- group_relay_log_pos= event_relay_log_pos;
- strmake_buf(group_relay_log_name,event_relay_log_name);
+ rgi->inc_event_relay_log_pos();
+ DBUG_PRINT("info", ("log_pos: %lu group_master_log_pos: %lu",
+ (long) log_pos, (long) group_master_log_pos));
+ if (rgi->is_parallel_exec)
+ {
+ /* In case of parallel replication, do not update the position backwards. */
+ int cmp= strcmp(group_relay_log_name, event_relay_log_name);
+ if (cmp < 0)
+ {
+ group_relay_log_pos= rgi->future_event_relay_log_pos;
+ strmake_buf(group_relay_log_name, event_relay_log_name);
+ notify_group_relay_log_name_update();
+ } else if (cmp == 0 && group_relay_log_pos < rgi->future_event_relay_log_pos)
+ group_relay_log_pos= rgi->future_event_relay_log_pos;
- notify_group_relay_log_name_update();
+ /*
+ In the parallel case we need to update the master_log_name here, rather
+ than in Rotate_log_event::do_update_pos().
+ */
+ cmp= strcmp(group_master_log_name, rgi->future_event_master_log_name);
+ if (cmp <= 0)
+ {
+ if (cmp < 0)
+ {
+ strcpy(group_master_log_name, rgi->future_event_master_log_name);
+ notify_group_master_log_name_update();
+ group_master_log_pos= log_pos;
+ }
+ else if (group_master_log_pos < log_pos)
+ group_master_log_pos= log_pos;
+ }
+ }
+ else
+ {
+ /* Non-parallel case. */
+ group_relay_log_pos= event_relay_log_pos;
+ strmake_buf(group_relay_log_name, event_relay_log_name);
+ notify_group_relay_log_name_update();
+ if (log_pos) // 3.23 binlogs don't have log_posx
+ group_master_log_pos= log_pos;
+ }
/*
If the slave does not support transactions and replicates a transaction,
@@ -903,12 +951,6 @@ void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos,
the relay log is not "val".
With the end_log_pos solution, we avoid computations involving lengthes.
*/
- DBUG_PRINT("info", ("log_pos: %lu group_master_log_pos: %lu",
- (long) log_pos, (long) group_master_log_pos));
- if (log_pos) // 3.23 binlogs don't have log_posx
- {
- group_master_log_pos= log_pos;
- }
mysql_cond_broadcast(&data_cond);
if (!skip_lock)
mysql_mutex_unlock(&data_lock);
@@ -924,6 +966,9 @@ void Relay_log_info::close_temporary_tables()
for (table=save_temporary_tables ; table ; table=next)
{
next=table->next;
+
+ /* Reset in_use as the table may have been created by another thd */
+ table->in_use=0;
/*
Don't ask for disk deletion. For now, anyway they will be deleted when
slave restarts, but it is a better intention to not delete them.
@@ -995,26 +1040,35 @@ int purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset,
rli->cur_log_fd= -1;
}
- if (rli->relay_log.reset_logs(thd))
+ if (rli->relay_log.reset_logs(thd, !just_reset, NULL, 0))
{
*errmsg = "Failed during log reset";
error=1;
goto err;
}
- /* Save name of used relay log file */
- strmake_buf(rli->group_relay_log_name, rli->relay_log.get_log_fname());
- strmake_buf(rli->event_relay_log_name, rli->relay_log.get_log_fname());
- rli->group_relay_log_pos= rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE;
- if (count_relay_log_space(rli))
- {
- *errmsg= "Error counting relay log space";
- error=1;
- goto err;
- }
if (!just_reset)
+ {
+ /* Save name of used relay log file */
+ strmake_buf(rli->group_relay_log_name, rli->relay_log.get_log_fname());
+ strmake_buf(rli->event_relay_log_name, rli->relay_log.get_log_fname());
+ rli->group_relay_log_pos= rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE;
+ rli->log_space_total= 0;
+
+ if (count_relay_log_space(rli))
+ {
+ *errmsg= "Error counting relay log space";
+ error=1;
+ goto err;
+ }
error= init_relay_log_pos(rli, rli->group_relay_log_name,
rli->group_relay_log_pos,
0 /* do not need data lock */, errmsg, 0);
+ }
+ else
+ {
+ /* Ensure relay log names are not used */
+ rli->group_relay_log_name[0]= rli->event_relay_log_name[0]= 0;
+ }
err:
#ifndef DBUG_OFF
@@ -1065,16 +1119,18 @@ bool Relay_log_info::is_until_satisfied(THD *thd, Log_event *ev)
ulonglong log_pos;
DBUG_ENTER("Relay_log_info::is_until_satisfied");
- DBUG_ASSERT(until_condition != UNTIL_NONE);
+ DBUG_ASSERT(until_condition == UNTIL_MASTER_POS ||
+ until_condition == UNTIL_RELAY_POS);
if (until_condition == UNTIL_MASTER_POS)
{
- if (ev && ev->server_id == (uint32) ::server_id && !replicate_same_server_id)
+ if (ev && ev->server_id == (uint32) global_system_variables.server_id &&
+ !replicate_same_server_id)
DBUG_RETURN(FALSE);
log_name= group_master_log_name;
- log_pos= (!ev)? group_master_log_pos :
- ((thd->variables.option_bits & OPTION_BEGIN || !ev->log_pos) ?
- group_master_log_pos : ev->log_pos - ev->data_written);
+ log_pos= ((!ev)? group_master_log_pos :
+ (get_flag(IN_TRANSACTION) || !ev->log_pos) ?
+ group_master_log_pos : ev->log_pos - ev->data_written);
}
else
{ /* until_condition == UNTIL_RELAY_POS */
@@ -1167,19 +1223,24 @@ bool Relay_log_info::cached_charset_compare(char *charset) const
void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
- time_t event_creation_time)
+ time_t event_creation_time, THD *thd,
+ rpl_group_info *rgi)
{
#ifndef DBUG_OFF
extern uint debug_not_change_ts_if_art_event;
#endif
- clear_flag(IN_STMT);
+ DBUG_ENTER("Relay_log_info::stmt_done");
+ DBUG_ASSERT(rgi->rli == this);
/*
If in a transaction, and if the slave supports transactions, just
inc_event_relay_log_pos(). We only have to check for OPTION_BEGIN
(not OPTION_NOT_AUTOCOMMIT) as transactions are logged with
BEGIN/COMMIT, not with SET AUTOCOMMIT= .
+ We can't use rgi->rli->get_flag(IN_TRANSACTION) here as OPTION_BEGIN
+ is also used for single row transactions.
+
CAUTION: opt_using_transactions means innodb || bdb ; suppose the
master supports InnoDB and BDB, but the slave supports only BDB,
problems will arise: - suppose an InnoDB table is created on the
@@ -1197,12 +1258,30 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
middle of the "transaction". START SLAVE will resume at BEGIN
while the MyISAM table has already been updated.
*/
- if ((sql_thd->variables.option_bits & OPTION_BEGIN) && opt_using_transactions)
- inc_event_relay_log_pos();
+ if ((rgi->thd->variables.option_bits & OPTION_BEGIN) &&
+ opt_using_transactions)
+ rgi->inc_event_relay_log_pos();
else
{
- inc_group_relay_log_pos(event_master_log_pos);
- flush_relay_log_info(this);
+ inc_group_relay_log_pos(event_master_log_pos, rgi);
+ if (rpl_global_gtid_slave_state.record_and_update_gtid(thd, rgi))
+ {
+ report(WARNING_LEVEL, ER_CANNOT_UPDATE_GTID_STATE,
+ "Failed to update GTID state in %s.%s, slave state may become "
+ "inconsistent: %d: %s",
+ "mysql", rpl_gtid_slave_state_table_name.str,
+ thd->stmt_da->sql_errno(), thd->stmt_da->message());
+ /*
+ At this point we are not in a transaction (for example after DDL),
+ so we can not roll back. Anyway, normally updates to the slave
+ state table should not fail, and if they do, at least we made the
+ DBA aware of the problem in the error log.
+ */
+ }
+ DBUG_EXECUTE_IF("inject_crash_before_flush_rli", DBUG_SUICIDE(););
+ if (mi->using_gtid == Master_info::USE_GTID_NO)
+ flush_relay_log_info(this);
+ DBUG_EXECUTE_IF("inject_crash_after_flush_rli", DBUG_SUICIDE(););
/*
Note that Rotate_log_event::do_apply_event() does not call this
function, so there is no chance that a fake rotate event resets
@@ -1210,19 +1289,289 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
(probably ok - except in some very rare cases, only consequence
is that value may take some time to display in
Seconds_Behind_Master - not critical).
+
+ In parallel replication, we take care to not set last_master_timestamp
+ backwards, in case of out-of-order calls here.
*/
if (!(event_creation_time == 0 &&
- IF_DBUG(debug_not_change_ts_if_art_event > 0, 1)))
+ IF_DBUG(debug_not_change_ts_if_art_event > 0, 1)) &&
+ !(rgi->is_parallel_exec && event_creation_time <= last_master_timestamp)
+ )
last_master_timestamp= event_creation_time;
}
+ DBUG_VOID_RETURN;
}
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
-void Relay_log_info::cleanup_context(THD *thd, bool error)
+int
+rpl_load_gtid_slave_state(THD *thd)
{
- DBUG_ENTER("Relay_log_info::cleanup_context");
+ TABLE_LIST tlist;
+ TABLE *table;
+ bool table_opened= false;
+ bool table_scanned= false;
+ bool array_inited= false;
+ struct local_element { uint64 sub_id; rpl_gtid gtid; };
+ struct local_element tmp_entry, *entry;
+ HASH hash;
+ DYNAMIC_ARRAY array;
+ int err= 0;
+ uint32 i;
+ DBUG_ENTER("rpl_load_gtid_slave_state");
+
+ mysql_mutex_lock(&rpl_global_gtid_slave_state.LOCK_slave_state);
+ bool loaded= rpl_global_gtid_slave_state.loaded;
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
+ if (loaded)
+ DBUG_RETURN(0);
+
+ my_hash_init(&hash, &my_charset_bin, 32,
+ offsetof(local_element, gtid) + offsetof(rpl_gtid, domain_id),
+ sizeof(uint32), NULL, my_free, HASH_UNIQUE);
+ if ((err= my_init_dynamic_array(&array, sizeof(local_element), 0, 0, MYF(0))))
+ goto end;
+ array_inited= true;
+
+ mysql_reset_thd_for_next_command(thd);
+
+ tlist.init_one_table(STRING_WITH_LEN("mysql"),
+ rpl_gtid_slave_state_table_name.str,
+ rpl_gtid_slave_state_table_name.length,
+ NULL, TL_READ);
+ if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0)))
+ goto end;
+ table_opened= true;
+ table= tlist.table;
+
+ if ((err= gtid_check_rpl_slave_state_table(table)))
+ goto end;
+
+ bitmap_set_all(table->read_set);
+ if ((err= table->file->ha_rnd_init_with_error(1)))
+ {
+ table->file->print_error(err, MYF(0));
+ goto end;
+ }
+ table_scanned= true;
+ for (;;)
+ {
+ uint32 domain_id, server_id;
+ uint64 sub_id, seq_no;
+ uchar *rec;
+
+ if ((err= table->file->ha_rnd_next(table->record[0])))
+ {
+ if (err == HA_ERR_RECORD_DELETED)
+ continue;
+ else if (err == HA_ERR_END_OF_FILE)
+ break;
+ else
+ {
+ table->file->print_error(err, MYF(0));
+ goto end;
+ }
+ }
+ domain_id= (ulonglong)table->field[0]->val_int();
+ sub_id= (ulonglong)table->field[1]->val_int();
+ server_id= (ulonglong)table->field[2]->val_int();
+ seq_no= (ulonglong)table->field[3]->val_int();
+ DBUG_PRINT("info", ("Read slave state row: %u-%u-%lu sub_id=%lu\n",
+ (unsigned)domain_id, (unsigned)server_id,
+ (ulong)seq_no, (ulong)sub_id));
+
+ tmp_entry.sub_id= sub_id;
+ tmp_entry.gtid.domain_id= domain_id;
+ tmp_entry.gtid.server_id= server_id;
+ tmp_entry.gtid.seq_no= seq_no;
+ if ((err= insert_dynamic(&array, (uchar *)&tmp_entry)))
+ {
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ goto end;
+ }
+
+ if ((rec= my_hash_search(&hash, (const uchar *)&domain_id, 0)))
+ {
+ entry= (struct local_element *)rec;
+ if (entry->sub_id >= sub_id)
+ continue;
+ entry->sub_id= sub_id;
+ DBUG_ASSERT(entry->gtid.domain_id == domain_id);
+ entry->gtid.server_id= server_id;
+ entry->gtid.seq_no= seq_no;
+ }
+ else
+ {
+ if (!(entry= (struct local_element *)my_malloc(sizeof(*entry),
+ MYF(MY_WME))))
+ {
+ my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*entry));
+ err= 1;
+ goto end;
+ }
+ entry->sub_id= sub_id;
+ entry->gtid.domain_id= domain_id;
+ entry->gtid.server_id= server_id;
+ entry->gtid.seq_no= seq_no;
+ if ((err= my_hash_insert(&hash, (uchar *)entry)))
+ {
+ my_free(entry);
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ goto end;
+ }
+ }
+ }
+
+ mysql_mutex_lock(&rpl_global_gtid_slave_state.LOCK_slave_state);
+ if (rpl_global_gtid_slave_state.loaded)
+ {
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
+ goto end;
+ }
+
+ for (i= 0; i < array.elements; ++i)
+ {
+ get_dynamic(&array, (uchar *)&tmp_entry, i);
+ if ((err= rpl_global_gtid_slave_state.update(tmp_entry.gtid.domain_id,
+ tmp_entry.gtid.server_id,
+ tmp_entry.sub_id,
+ tmp_entry.gtid.seq_no)))
+ {
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ goto end;
+ }
+ }
+
+ for (i= 0; i < hash.records; ++i)
+ {
+ entry= (struct local_element *)my_hash_element(&hash, i);
+ if (opt_bin_log &&
+ mysql_bin_log.bump_seq_no_counter_if_needed(entry->gtid.domain_id,
+ entry->gtid.seq_no))
+ {
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
+ my_error(ER_OUT_OF_RESOURCES, MYF(0));
+ goto end;
+ }
+ }
+
+ rpl_global_gtid_slave_state.loaded= true;
+ mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
- DBUG_ASSERT(sql_thd == thd);
+ err= 0; /* Clear HA_ERR_END_OF_FILE */
+
+end:
+ if (table_scanned)
+ {
+ table->file->ha_index_or_rnd_end();
+ ha_commit_trans(thd, FALSE);
+ ha_commit_trans(thd, TRUE);
+ }
+ if (table_opened)
+ {
+ close_thread_tables(thd);
+ thd->mdl_context.release_transactional_locks();
+ }
+ if (array_inited)
+ delete_dynamic(&array);
+ my_hash_free(&hash);
+ DBUG_RETURN(err);
+}
+
+
+rpl_group_info::rpl_group_info(Relay_log_info *rli_)
+ : rli(rli_), thd(0), gtid_sub_id(0), wait_commit_sub_id(0),
+ wait_commit_group_info(0), wait_start_sub_id(0), parallel_entry(0),
+ deferred_events(NULL), m_annotate_event(0), tables_to_lock(0),
+ tables_to_lock_count(0), trans_retries(0), last_event_start_time(0),
+ is_parallel_exec(false), is_error(false),
+ row_stmt_start_timestamp(0), long_find_row_note_printed(false)
+{
+ bzero(&current_gtid, sizeof(current_gtid));
+ mysql_mutex_init(key_rpl_group_info_sleep_lock, &sleep_lock,
+ MY_MUTEX_INIT_FAST);
+ mysql_cond_init(key_rpl_group_info_sleep_cond, &sleep_cond, NULL);
+}
+
+
+rpl_group_info::~rpl_group_info()
+{
+ free_annotate_event();
+ delete deferred_events;
+ mysql_mutex_destroy(&sleep_lock);
+ mysql_cond_destroy(&sleep_cond);
+}
+
+
+int
+event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev)
+{
+ uint64 sub_id= rpl_global_gtid_slave_state.next_sub_id(gev->domain_id);
+ if (!sub_id)
+ {
+ /* Out of memory caused hash insertion to fail. */
+ return 1;
+ }
+ rgi->gtid_sub_id= sub_id;
+ rgi->current_gtid.server_id= gev->server_id;
+ rgi->current_gtid.domain_id= gev->domain_id;
+ rgi->current_gtid.seq_no= gev->seq_no;
+ return 0;
+}
+
+
+void
+delete_or_keep_event_post_apply(rpl_group_info *rgi,
+ Log_event_type typ, Log_event *ev)
+{
+ /*
+ ToDo: This needs to work on rpl_group_info, not Relay_log_info, to be
+ thread-safe for parallel replication.
+ */
+
+ switch (typ) {
+ case FORMAT_DESCRIPTION_EVENT:
+ /*
+ Format_description_log_event should not be deleted because it
+ will be used to read info about the relay log's format;
+ it will be deleted when the SQL thread does not need it,
+ i.e. when this thread terminates.
+ */
+ break;
+ case ANNOTATE_ROWS_EVENT:
+ /*
+ Annotate_rows event should not be deleted because after it has
+ been applied, thd->query points to the string inside this event.
+ The thd->query will be used to generate new Annotate_rows event
+ during applying the subsequent Rows events.
+ */
+ rgi->set_annotate_event((Annotate_rows_log_event*) ev);
+ break;
+ case DELETE_ROWS_EVENT:
+ case UPDATE_ROWS_EVENT:
+ case WRITE_ROWS_EVENT:
+ /*
+ After the last Rows event has been applied, the saved Annotate_rows
+ event (if any) is not needed anymore and can be deleted.
+ */
+ if (((Rows_log_event*)ev)->get_flags(Rows_log_event::STMT_END_F))
+ rgi->free_annotate_event();
+ /* fall through */
+ default:
+ DBUG_PRINT("info", ("Deleting the event after it has been executed"));
+ if (!rgi->is_deferred_event(ev))
+ delete ev;
+ break;
+ }
+}
+
+
+void rpl_group_info::cleanup_context(THD *thd, bool error)
+{
+ DBUG_ENTER("Relay_log_info::cleanup_context");
+ DBUG_PRINT("enter", ("error: %d", (int) error));
+
+ DBUG_ASSERT(this->thd == thd);
/*
1) Instances of Table_map_log_event, if ::do_apply_event() was called on them,
may have opened tables, which we cannot be sure have been closed (because
@@ -1243,8 +1592,20 @@ void Relay_log_info::cleanup_context(THD *thd, bool error)
m_table_map.clear_tables();
slave_close_thread_tables(thd);
if (error)
+ {
thd->mdl_context.release_transactional_locks();
- clear_flag(IN_STMT);
+
+ if (thd == rli->sql_driver_thd)
+ {
+ /*
+ Reset flags. This is needed to handle incident events and errors in
+ the relay log noticed by the sql driver thread.
+ */
+ rli->clear_flag(Relay_log_info::IN_STMT);
+ rli->clear_flag(Relay_log_info::IN_TRANSACTION);
+ }
+ }
+
/*
Cleanup for the flags that have been set at do_apply_event.
*/
@@ -1262,7 +1623,8 @@ void Relay_log_info::cleanup_context(THD *thd, bool error)
DBUG_VOID_RETURN;
}
-void Relay_log_info::clear_tables_to_lock()
+
+void rpl_group_info::clear_tables_to_lock()
{
DBUG_ENTER("Relay_log_info::clear_tables_to_lock()");
#ifndef DBUG_OFF
@@ -1308,7 +1670,8 @@ void Relay_log_info::clear_tables_to_lock()
DBUG_VOID_RETURN;
}
-void Relay_log_info::slave_close_thread_tables(THD *thd)
+
+void rpl_group_info::slave_close_thread_tables(THD *thd)
{
DBUG_ENTER("Relay_log_info::slave_close_thread_tables(THD *thd)");
thd->stmt_da->can_overwrite_status= TRUE;
@@ -1341,4 +1704,6 @@ void Relay_log_info::slave_close_thread_tables(THD *thd)
clear_tables_to_lock();
DBUG_VOID_RETURN;
}
+
+
#endif