diff options
Diffstat (limited to 'sql/rpl_rli.cc')
-rw-r--r-- | sql/rpl_rli.cc | 505 |
1 files changed, 435 insertions, 70 deletions
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index b01f74408a6..797f5681ec5 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -32,6 +32,15 @@ static int count_relay_log_space(Relay_log_info* rli); +/** + Current replication state (hash of last GTID executed, per replication + domain). +*/ +rpl_slave_state rpl_global_gtid_slave_state; +/* Object used for MASTER_GTID_WAIT(). */ +gtid_waiting rpl_global_gtid_waiting; + + // Defined in slave.cc int init_intvar_from_file(int* var, IO_CACHE* f, int default_val); int init_strvar_from_file(char *var, int max_size, IO_CACHE *f, @@ -42,23 +51,22 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery) no_storage(FALSE), replicate_same_server_id(::replicate_same_server_id), info_fd(-1), cur_log_fd(-1), relay_log(&sync_relaylog_period), sync_counter(0), is_relay_log_recovery(is_slave_recovery), - save_temporary_tables(0), cur_log_old_open_count(0), group_relay_log_pos(0), + save_temporary_tables(0), mi(0), + cur_log_old_open_count(0), group_relay_log_pos(0), event_relay_log_pos(0), #if HAVE_valgrind is_fake(FALSE), #endif group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0), - last_master_timestamp(0), slave_skip_counter(0), - abort_pos_wait(0), slave_run_id(0), sql_thd(0), + last_master_timestamp(0), sql_thread_caught_up(true), slave_skip_counter(0), + abort_pos_wait(0), slave_run_id(0), sql_driver_thd(), inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE), - until_log_pos(0), retried_trans(0), - tables_to_lock(0), tables_to_lock_count(0), - last_event_start_time(0), deferred_events(NULL),m_flags(0), - row_stmt_start_timestamp(0), long_find_row_note_printed(false), - m_annotate_event(0) + until_log_pos(0), retried_trans(0), executed_entries(0), + m_flags(0) { DBUG_ENTER("Relay_log_info::Relay_log_info"); + relay_log.is_relay_log= TRUE; #ifdef HAVE_PSI_INTERFACE relay_log.set_psi_keys(key_RELAYLOG_LOCK_index, key_RELAYLOG_update_cond, @@ -70,6 +78,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery) group_relay_log_name[0]= event_relay_log_name[0]= group_master_log_name[0]= 0; until_log_name[0]= ign_master_log_name_end[0]= 0; + max_relay_log_size= global_system_variables.max_relay_log_size; bzero((char*) &info_file, sizeof(info_file)); bzero((char*) &cache_buf, sizeof(cache_buf)); cached_charset_invalidate(); @@ -78,12 +87,10 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery) &data_lock, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_relay_log_info_log_space_lock, &log_space_lock, MY_MUTEX_INIT_FAST); - mysql_mutex_init(key_relay_log_info_sleep_lock, &sleep_lock, MY_MUTEX_INIT_FAST); mysql_cond_init(key_relay_log_info_data_cond, &data_cond, NULL); mysql_cond_init(key_relay_log_info_start_cond, &start_cond, NULL); mysql_cond_init(key_relay_log_info_stop_cond, &stop_cond, NULL); mysql_cond_init(key_relay_log_info_log_space_cond, &log_space_cond, NULL); - mysql_cond_init(key_relay_log_info_sleep_cond, &sleep_cond, NULL); relay_log.init_pthread_objects(); DBUG_VOID_RETURN; } @@ -96,14 +103,11 @@ Relay_log_info::~Relay_log_info() mysql_mutex_destroy(&run_lock); mysql_mutex_destroy(&data_lock); mysql_mutex_destroy(&log_space_lock); - mysql_mutex_destroy(&sleep_lock); mysql_cond_destroy(&data_cond); mysql_cond_destroy(&start_cond); mysql_cond_destroy(&stop_cond); mysql_cond_destroy(&log_space_cond); - mysql_cond_destroy(&sleep_cond); relay_log.cleanup(); - free_annotate_event(); DBUG_VOID_RETURN; } @@ -128,8 +132,6 @@ int init_relay_log_info(Relay_log_info* rli, rli->abort_pos_wait=0; rli->log_space_limit= relay_log_space_limit; rli->log_space_total= 0; - rli->tables_to_lock= 0; - rli->tables_to_lock_count= 0; char pattern[FN_REFLEN]; (void) my_realpath(pattern, slave_load_tmpdir, 0); @@ -150,15 +152,6 @@ int init_relay_log_info(Relay_log_info* rli, event, in flush_master_info(mi, 1, ?). */ - /* - For the maximum log size, we choose max_relay_log_size if it is - non-zero, max_binlog_size otherwise. If later the user does SET - GLOBAL on one of these variables, fix_max_binlog_size and - fix_max_relay_log_size will reconsider the choice (for example - if the user changes max_relay_log_size to zero, we have to - switch to using max_binlog_size for the relay log) and update - rli->relay_log.max_size (and mysql_bin_log.max_size). - */ { /* Reports an error and returns, if the --relay-log's path is a directory.*/ @@ -208,19 +201,35 @@ a file name for --relay-log-index option", opt_relaylog_index_name); name_warning_sent= 1; } - rli->relay_log.is_relay_log= TRUE; + /* For multimaster, add connection name to relay log filenames */ + Master_info* mi= rli->mi; + char buf_relay_logname[FN_REFLEN], buf_relaylog_index_name_buff[FN_REFLEN]; + char *buf_relaylog_index_name= opt_relaylog_index_name; + + create_logfile_name_with_suffix(buf_relay_logname, + sizeof(buf_relay_logname), + ln, 1, &mi->cmp_connection_name); + ln= buf_relay_logname; + + if (opt_relaylog_index_name) + { + buf_relaylog_index_name= buf_relaylog_index_name_buff; + create_logfile_name_with_suffix(buf_relaylog_index_name_buff, + sizeof(buf_relaylog_index_name_buff), + opt_relaylog_index_name, 0, + &mi->cmp_connection_name); + } /* note, that if open() fails, we'll still have index file open but a destructor will take care of that */ - if (rli->relay_log.open_index_file(opt_relaylog_index_name, ln, TRUE) || - rli->relay_log.open(ln, LOG_BIN, 0, SEQ_READ_APPEND, 0, - (max_relay_log_size ? max_relay_log_size : - max_binlog_size), 1, TRUE)) + if (rli->relay_log.open_index_file(buf_relaylog_index_name, ln, TRUE) || + rli->relay_log.open(ln, LOG_BIN, 0, SEQ_READ_APPEND, + mi->rli.max_relay_log_size, 1, TRUE)) { mysql_mutex_unlock(&rli->data_lock); - sql_print_error("Failed in open_log() called from init_relay_log_info()"); + sql_print_error("Failed when trying to open logs for '%s' in init_relay_log_info(). Error: %M", ln, my_errno); DBUG_RETURN(1); } } @@ -512,6 +521,8 @@ int init_relay_log_pos(Relay_log_info* rli,const char* log, } rli->group_relay_log_pos = rli->event_relay_log_pos = pos; + rli->clear_flag(Relay_log_info::IN_STMT); + rli->clear_flag(Relay_log_info::IN_TRANSACTION); /* Test to see if the previous run was with the skip of purging @@ -861,17 +872,54 @@ improper_arguments: %d timed_out: %d", void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos, - bool skip_lock) + rpl_group_info *rgi, + bool skip_lock) { DBUG_ENTER("Relay_log_info::inc_group_relay_log_pos"); if (!skip_lock) mysql_mutex_lock(&data_lock); - inc_event_relay_log_pos(); - group_relay_log_pos= event_relay_log_pos; - strmake_buf(group_relay_log_name,event_relay_log_name); + rgi->inc_event_relay_log_pos(); + DBUG_PRINT("info", ("log_pos: %lu group_master_log_pos: %lu", + (long) log_pos, (long) group_master_log_pos)); + if (rgi->is_parallel_exec) + { + /* In case of parallel replication, do not update the position backwards. */ + int cmp= strcmp(group_relay_log_name, event_relay_log_name); + if (cmp < 0) + { + group_relay_log_pos= rgi->future_event_relay_log_pos; + strmake_buf(group_relay_log_name, event_relay_log_name); + notify_group_relay_log_name_update(); + } else if (cmp == 0 && group_relay_log_pos < rgi->future_event_relay_log_pos) + group_relay_log_pos= rgi->future_event_relay_log_pos; - notify_group_relay_log_name_update(); + /* + In the parallel case we need to update the master_log_name here, rather + than in Rotate_log_event::do_update_pos(). + */ + cmp= strcmp(group_master_log_name, rgi->future_event_master_log_name); + if (cmp <= 0) + { + if (cmp < 0) + { + strcpy(group_master_log_name, rgi->future_event_master_log_name); + notify_group_master_log_name_update(); + group_master_log_pos= log_pos; + } + else if (group_master_log_pos < log_pos) + group_master_log_pos= log_pos; + } + } + else + { + /* Non-parallel case. */ + group_relay_log_pos= event_relay_log_pos; + strmake_buf(group_relay_log_name, event_relay_log_name); + notify_group_relay_log_name_update(); + if (log_pos) // 3.23 binlogs don't have log_posx + group_master_log_pos= log_pos; + } /* If the slave does not support transactions and replicates a transaction, @@ -903,12 +951,6 @@ void Relay_log_info::inc_group_relay_log_pos(ulonglong log_pos, the relay log is not "val". With the end_log_pos solution, we avoid computations involving lengthes. */ - DBUG_PRINT("info", ("log_pos: %lu group_master_log_pos: %lu", - (long) log_pos, (long) group_master_log_pos)); - if (log_pos) // 3.23 binlogs don't have log_posx - { - group_master_log_pos= log_pos; - } mysql_cond_broadcast(&data_cond); if (!skip_lock) mysql_mutex_unlock(&data_lock); @@ -924,6 +966,9 @@ void Relay_log_info::close_temporary_tables() for (table=save_temporary_tables ; table ; table=next) { next=table->next; + + /* Reset in_use as the table may have been created by another thd */ + table->in_use=0; /* Don't ask for disk deletion. For now, anyway they will be deleted when slave restarts, but it is a better intention to not delete them. @@ -995,26 +1040,35 @@ int purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset, rli->cur_log_fd= -1; } - if (rli->relay_log.reset_logs(thd)) + if (rli->relay_log.reset_logs(thd, !just_reset, NULL, 0)) { *errmsg = "Failed during log reset"; error=1; goto err; } - /* Save name of used relay log file */ - strmake_buf(rli->group_relay_log_name, rli->relay_log.get_log_fname()); - strmake_buf(rli->event_relay_log_name, rli->relay_log.get_log_fname()); - rli->group_relay_log_pos= rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE; - if (count_relay_log_space(rli)) - { - *errmsg= "Error counting relay log space"; - error=1; - goto err; - } if (!just_reset) + { + /* Save name of used relay log file */ + strmake_buf(rli->group_relay_log_name, rli->relay_log.get_log_fname()); + strmake_buf(rli->event_relay_log_name, rli->relay_log.get_log_fname()); + rli->group_relay_log_pos= rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE; + rli->log_space_total= 0; + + if (count_relay_log_space(rli)) + { + *errmsg= "Error counting relay log space"; + error=1; + goto err; + } error= init_relay_log_pos(rli, rli->group_relay_log_name, rli->group_relay_log_pos, 0 /* do not need data lock */, errmsg, 0); + } + else + { + /* Ensure relay log names are not used */ + rli->group_relay_log_name[0]= rli->event_relay_log_name[0]= 0; + } err: #ifndef DBUG_OFF @@ -1065,16 +1119,18 @@ bool Relay_log_info::is_until_satisfied(THD *thd, Log_event *ev) ulonglong log_pos; DBUG_ENTER("Relay_log_info::is_until_satisfied"); - DBUG_ASSERT(until_condition != UNTIL_NONE); + DBUG_ASSERT(until_condition == UNTIL_MASTER_POS || + until_condition == UNTIL_RELAY_POS); if (until_condition == UNTIL_MASTER_POS) { - if (ev && ev->server_id == (uint32) ::server_id && !replicate_same_server_id) + if (ev && ev->server_id == (uint32) global_system_variables.server_id && + !replicate_same_server_id) DBUG_RETURN(FALSE); log_name= group_master_log_name; - log_pos= (!ev)? group_master_log_pos : - ((thd->variables.option_bits & OPTION_BEGIN || !ev->log_pos) ? - group_master_log_pos : ev->log_pos - ev->data_written); + log_pos= ((!ev)? group_master_log_pos : + (get_flag(IN_TRANSACTION) || !ev->log_pos) ? + group_master_log_pos : ev->log_pos - ev->data_written); } else { /* until_condition == UNTIL_RELAY_POS */ @@ -1167,19 +1223,24 @@ bool Relay_log_info::cached_charset_compare(char *charset) const void Relay_log_info::stmt_done(my_off_t event_master_log_pos, - time_t event_creation_time) + time_t event_creation_time, THD *thd, + rpl_group_info *rgi) { #ifndef DBUG_OFF extern uint debug_not_change_ts_if_art_event; #endif - clear_flag(IN_STMT); + DBUG_ENTER("Relay_log_info::stmt_done"); + DBUG_ASSERT(rgi->rli == this); /* If in a transaction, and if the slave supports transactions, just inc_event_relay_log_pos(). We only have to check for OPTION_BEGIN (not OPTION_NOT_AUTOCOMMIT) as transactions are logged with BEGIN/COMMIT, not with SET AUTOCOMMIT= . + We can't use rgi->rli->get_flag(IN_TRANSACTION) here as OPTION_BEGIN + is also used for single row transactions. + CAUTION: opt_using_transactions means innodb || bdb ; suppose the master supports InnoDB and BDB, but the slave supports only BDB, problems will arise: - suppose an InnoDB table is created on the @@ -1197,12 +1258,30 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos, middle of the "transaction". START SLAVE will resume at BEGIN while the MyISAM table has already been updated. */ - if ((sql_thd->variables.option_bits & OPTION_BEGIN) && opt_using_transactions) - inc_event_relay_log_pos(); + if ((rgi->thd->variables.option_bits & OPTION_BEGIN) && + opt_using_transactions) + rgi->inc_event_relay_log_pos(); else { - inc_group_relay_log_pos(event_master_log_pos); - flush_relay_log_info(this); + inc_group_relay_log_pos(event_master_log_pos, rgi); + if (rpl_global_gtid_slave_state.record_and_update_gtid(thd, rgi)) + { + report(WARNING_LEVEL, ER_CANNOT_UPDATE_GTID_STATE, + "Failed to update GTID state in %s.%s, slave state may become " + "inconsistent: %d: %s", + "mysql", rpl_gtid_slave_state_table_name.str, + thd->stmt_da->sql_errno(), thd->stmt_da->message()); + /* + At this point we are not in a transaction (for example after DDL), + so we can not roll back. Anyway, normally updates to the slave + state table should not fail, and if they do, at least we made the + DBA aware of the problem in the error log. + */ + } + DBUG_EXECUTE_IF("inject_crash_before_flush_rli", DBUG_SUICIDE();); + if (mi->using_gtid == Master_info::USE_GTID_NO) + flush_relay_log_info(this); + DBUG_EXECUTE_IF("inject_crash_after_flush_rli", DBUG_SUICIDE();); /* Note that Rotate_log_event::do_apply_event() does not call this function, so there is no chance that a fake rotate event resets @@ -1210,19 +1289,289 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos, (probably ok - except in some very rare cases, only consequence is that value may take some time to display in Seconds_Behind_Master - not critical). + + In parallel replication, we take care to not set last_master_timestamp + backwards, in case of out-of-order calls here. */ if (!(event_creation_time == 0 && - IF_DBUG(debug_not_change_ts_if_art_event > 0, 1))) + IF_DBUG(debug_not_change_ts_if_art_event > 0, 1)) && + !(rgi->is_parallel_exec && event_creation_time <= last_master_timestamp) + ) last_master_timestamp= event_creation_time; } + DBUG_VOID_RETURN; } #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) -void Relay_log_info::cleanup_context(THD *thd, bool error) +int +rpl_load_gtid_slave_state(THD *thd) { - DBUG_ENTER("Relay_log_info::cleanup_context"); + TABLE_LIST tlist; + TABLE *table; + bool table_opened= false; + bool table_scanned= false; + bool array_inited= false; + struct local_element { uint64 sub_id; rpl_gtid gtid; }; + struct local_element tmp_entry, *entry; + HASH hash; + DYNAMIC_ARRAY array; + int err= 0; + uint32 i; + DBUG_ENTER("rpl_load_gtid_slave_state"); + + mysql_mutex_lock(&rpl_global_gtid_slave_state.LOCK_slave_state); + bool loaded= rpl_global_gtid_slave_state.loaded; + mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state); + if (loaded) + DBUG_RETURN(0); + + my_hash_init(&hash, &my_charset_bin, 32, + offsetof(local_element, gtid) + offsetof(rpl_gtid, domain_id), + sizeof(uint32), NULL, my_free, HASH_UNIQUE); + if ((err= my_init_dynamic_array(&array, sizeof(local_element), 0, 0, MYF(0)))) + goto end; + array_inited= true; + + mysql_reset_thd_for_next_command(thd); + + tlist.init_one_table(STRING_WITH_LEN("mysql"), + rpl_gtid_slave_state_table_name.str, + rpl_gtid_slave_state_table_name.length, + NULL, TL_READ); + if ((err= open_and_lock_tables(thd, &tlist, FALSE, 0))) + goto end; + table_opened= true; + table= tlist.table; + + if ((err= gtid_check_rpl_slave_state_table(table))) + goto end; + + bitmap_set_all(table->read_set); + if ((err= table->file->ha_rnd_init_with_error(1))) + { + table->file->print_error(err, MYF(0)); + goto end; + } + table_scanned= true; + for (;;) + { + uint32 domain_id, server_id; + uint64 sub_id, seq_no; + uchar *rec; + + if ((err= table->file->ha_rnd_next(table->record[0]))) + { + if (err == HA_ERR_RECORD_DELETED) + continue; + else if (err == HA_ERR_END_OF_FILE) + break; + else + { + table->file->print_error(err, MYF(0)); + goto end; + } + } + domain_id= (ulonglong)table->field[0]->val_int(); + sub_id= (ulonglong)table->field[1]->val_int(); + server_id= (ulonglong)table->field[2]->val_int(); + seq_no= (ulonglong)table->field[3]->val_int(); + DBUG_PRINT("info", ("Read slave state row: %u-%u-%lu sub_id=%lu\n", + (unsigned)domain_id, (unsigned)server_id, + (ulong)seq_no, (ulong)sub_id)); + + tmp_entry.sub_id= sub_id; + tmp_entry.gtid.domain_id= domain_id; + tmp_entry.gtid.server_id= server_id; + tmp_entry.gtid.seq_no= seq_no; + if ((err= insert_dynamic(&array, (uchar *)&tmp_entry))) + { + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + goto end; + } + + if ((rec= my_hash_search(&hash, (const uchar *)&domain_id, 0))) + { + entry= (struct local_element *)rec; + if (entry->sub_id >= sub_id) + continue; + entry->sub_id= sub_id; + DBUG_ASSERT(entry->gtid.domain_id == domain_id); + entry->gtid.server_id= server_id; + entry->gtid.seq_no= seq_no; + } + else + { + if (!(entry= (struct local_element *)my_malloc(sizeof(*entry), + MYF(MY_WME)))) + { + my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*entry)); + err= 1; + goto end; + } + entry->sub_id= sub_id; + entry->gtid.domain_id= domain_id; + entry->gtid.server_id= server_id; + entry->gtid.seq_no= seq_no; + if ((err= my_hash_insert(&hash, (uchar *)entry))) + { + my_free(entry); + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + goto end; + } + } + } + + mysql_mutex_lock(&rpl_global_gtid_slave_state.LOCK_slave_state); + if (rpl_global_gtid_slave_state.loaded) + { + mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state); + goto end; + } + + for (i= 0; i < array.elements; ++i) + { + get_dynamic(&array, (uchar *)&tmp_entry, i); + if ((err= rpl_global_gtid_slave_state.update(tmp_entry.gtid.domain_id, + tmp_entry.gtid.server_id, + tmp_entry.sub_id, + tmp_entry.gtid.seq_no))) + { + mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state); + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + goto end; + } + } + + for (i= 0; i < hash.records; ++i) + { + entry= (struct local_element *)my_hash_element(&hash, i); + if (opt_bin_log && + mysql_bin_log.bump_seq_no_counter_if_needed(entry->gtid.domain_id, + entry->gtid.seq_no)) + { + mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state); + my_error(ER_OUT_OF_RESOURCES, MYF(0)); + goto end; + } + } + + rpl_global_gtid_slave_state.loaded= true; + mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state); - DBUG_ASSERT(sql_thd == thd); + err= 0; /* Clear HA_ERR_END_OF_FILE */ + +end: + if (table_scanned) + { + table->file->ha_index_or_rnd_end(); + ha_commit_trans(thd, FALSE); + ha_commit_trans(thd, TRUE); + } + if (table_opened) + { + close_thread_tables(thd); + thd->mdl_context.release_transactional_locks(); + } + if (array_inited) + delete_dynamic(&array); + my_hash_free(&hash); + DBUG_RETURN(err); +} + + +rpl_group_info::rpl_group_info(Relay_log_info *rli_) + : rli(rli_), thd(0), gtid_sub_id(0), wait_commit_sub_id(0), + wait_commit_group_info(0), wait_start_sub_id(0), parallel_entry(0), + deferred_events(NULL), m_annotate_event(0), tables_to_lock(0), + tables_to_lock_count(0), trans_retries(0), last_event_start_time(0), + is_parallel_exec(false), is_error(false), + row_stmt_start_timestamp(0), long_find_row_note_printed(false) +{ + bzero(¤t_gtid, sizeof(current_gtid)); + mysql_mutex_init(key_rpl_group_info_sleep_lock, &sleep_lock, + MY_MUTEX_INIT_FAST); + mysql_cond_init(key_rpl_group_info_sleep_cond, &sleep_cond, NULL); +} + + +rpl_group_info::~rpl_group_info() +{ + free_annotate_event(); + delete deferred_events; + mysql_mutex_destroy(&sleep_lock); + mysql_cond_destroy(&sleep_cond); +} + + +int +event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev) +{ + uint64 sub_id= rpl_global_gtid_slave_state.next_sub_id(gev->domain_id); + if (!sub_id) + { + /* Out of memory caused hash insertion to fail. */ + return 1; + } + rgi->gtid_sub_id= sub_id; + rgi->current_gtid.server_id= gev->server_id; + rgi->current_gtid.domain_id= gev->domain_id; + rgi->current_gtid.seq_no= gev->seq_no; + return 0; +} + + +void +delete_or_keep_event_post_apply(rpl_group_info *rgi, + Log_event_type typ, Log_event *ev) +{ + /* + ToDo: This needs to work on rpl_group_info, not Relay_log_info, to be + thread-safe for parallel replication. + */ + + switch (typ) { + case FORMAT_DESCRIPTION_EVENT: + /* + Format_description_log_event should not be deleted because it + will be used to read info about the relay log's format; + it will be deleted when the SQL thread does not need it, + i.e. when this thread terminates. + */ + break; + case ANNOTATE_ROWS_EVENT: + /* + Annotate_rows event should not be deleted because after it has + been applied, thd->query points to the string inside this event. + The thd->query will be used to generate new Annotate_rows event + during applying the subsequent Rows events. + */ + rgi->set_annotate_event((Annotate_rows_log_event*) ev); + break; + case DELETE_ROWS_EVENT: + case UPDATE_ROWS_EVENT: + case WRITE_ROWS_EVENT: + /* + After the last Rows event has been applied, the saved Annotate_rows + event (if any) is not needed anymore and can be deleted. + */ + if (((Rows_log_event*)ev)->get_flags(Rows_log_event::STMT_END_F)) + rgi->free_annotate_event(); + /* fall through */ + default: + DBUG_PRINT("info", ("Deleting the event after it has been executed")); + if (!rgi->is_deferred_event(ev)) + delete ev; + break; + } +} + + +void rpl_group_info::cleanup_context(THD *thd, bool error) +{ + DBUG_ENTER("Relay_log_info::cleanup_context"); + DBUG_PRINT("enter", ("error: %d", (int) error)); + + DBUG_ASSERT(this->thd == thd); /* 1) Instances of Table_map_log_event, if ::do_apply_event() was called on them, may have opened tables, which we cannot be sure have been closed (because @@ -1243,8 +1592,20 @@ void Relay_log_info::cleanup_context(THD *thd, bool error) m_table_map.clear_tables(); slave_close_thread_tables(thd); if (error) + { thd->mdl_context.release_transactional_locks(); - clear_flag(IN_STMT); + + if (thd == rli->sql_driver_thd) + { + /* + Reset flags. This is needed to handle incident events and errors in + the relay log noticed by the sql driver thread. + */ + rli->clear_flag(Relay_log_info::IN_STMT); + rli->clear_flag(Relay_log_info::IN_TRANSACTION); + } + } + /* Cleanup for the flags that have been set at do_apply_event. */ @@ -1262,7 +1623,8 @@ void Relay_log_info::cleanup_context(THD *thd, bool error) DBUG_VOID_RETURN; } -void Relay_log_info::clear_tables_to_lock() + +void rpl_group_info::clear_tables_to_lock() { DBUG_ENTER("Relay_log_info::clear_tables_to_lock()"); #ifndef DBUG_OFF @@ -1308,7 +1670,8 @@ void Relay_log_info::clear_tables_to_lock() DBUG_VOID_RETURN; } -void Relay_log_info::slave_close_thread_tables(THD *thd) + +void rpl_group_info::slave_close_thread_tables(THD *thd) { DBUG_ENTER("Relay_log_info::slave_close_thread_tables(THD *thd)"); thd->stmt_da->can_overwrite_status= TRUE; @@ -1341,4 +1704,6 @@ void Relay_log_info::slave_close_thread_tables(THD *thd) clear_tables_to_lock(); DBUG_VOID_RETURN; } + + #endif |