diff options
Diffstat (limited to 'sql')
-rw-r--r-- | sql/log.cc | 18 | ||||
-rw-r--r-- | sql/log_event.cc | 657 | ||||
-rw-r--r-- | sql/log_event.h | 338 | ||||
-rw-r--r-- | sql/rpl_rli.cc | 38 | ||||
-rw-r--r-- | sql/rpl_rli.h | 22 | ||||
-rw-r--r-- | sql/rpl_utility.cc | 3 | ||||
-rw-r--r-- | sql/rpl_utility.h | 2 | ||||
-rw-r--r-- | sql/slave.cc | 194 | ||||
-rw-r--r-- | sql/slave.h | 6 | ||||
-rw-r--r-- | sql/sql_binlog.cc | 12 |
10 files changed, 898 insertions, 392 deletions
diff --git a/sql/log.cc b/sql/log.cc index deb77890f35..7e60950977a 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -1548,7 +1548,13 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all) (binlog_trx_data*) thd->ha_data[binlog_hton->slot]; DBUG_ASSERT(mysql_bin_log.is_open()); - if (all && trx_data->empty()) + /* + The condition here has to be identical to the one inside + binlog_end_trans(), guarding the write of the transaction cache to + the binary log. + */ + if ((all || !(thd->options & (OPTION_BEGIN | OPTION_NOT_AUTOCOMMIT))) && + trx_data->empty()) { // we're here because trans_log was flushed in MYSQL_BIN_LOG::log_xid() trx_data->reset(); @@ -2499,7 +2505,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name, /* Set 'created' to 0, so that in next relay logs this event does not trigger cleaning actions on the slave in - Format_description_log_event::exec_event(). + Format_description_log_event::apply_event_impl(). */ description_event_for_queue->created= 0; /* Don't set log_pos in event header */ @@ -3206,8 +3212,10 @@ void MYSQL_BIN_LOG::new_file_impl(bool need_lock) { tc_log_page_waits++; pthread_mutex_lock(&LOCK_prep_xids); - while (prepared_xids) + while (prepared_xids) { + DBUG_PRINT("info", ("prepared_xids=%lu", prepared_xids)); pthread_cond_wait(&COND_prep_xids, &LOCK_prep_xids); + } pthread_mutex_unlock(&LOCK_prep_xids); } @@ -5061,8 +5069,10 @@ void TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid) { pthread_mutex_lock(&LOCK_prep_xids); DBUG_ASSERT(prepared_xids > 0); - if (--prepared_xids == 0) + if (--prepared_xids == 0) { + DBUG_PRINT("info", ("prepared_xids=%lu", prepared_xids)); pthread_cond_signal(&COND_prep_xids); + } pthread_mutex_unlock(&LOCK_prep_xids); rotate_and_purge(0); // as ::write() did not rotate } diff --git a/sql/log_event.cc b/sql/log_event.cc index f8d3c43bfba..8e6311ce53a 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -88,9 +88,10 @@ public: operator&() DESCRIPTION - Function to return a pointer to the internal, so that the object - can be treated as a IO_CACHE and used with the my_b_* IO_CACHE - functions + + Function to return a pointer to the internal cache, so that the + object can be treated as a IO_CACHE and used with the my_b_* + IO_CACHE functions RETURN VALUE A pointer to the internal IO_CACHE. @@ -531,25 +532,19 @@ Log_event::Log_event(const char* buf, #ifndef MYSQL_CLIENT #ifdef HAVE_REPLICATION -/* - Log_event::exec_event() -*/ - -int Log_event::exec_event(struct st_relay_log_info* rli) +int Log_event::do_update_pos(RELAY_LOG_INFO *rli) { - DBUG_ENTER("Log_event::exec_event"); - /* - rli is null when (as far as I (Guilhem) know) - the caller is - Load_log_event::exec_event *and* that one is called from - Execute_load_log_event::exec_event. - In this case, we don't do anything here ; - Execute_load_log_event::exec_event will call Log_event::exec_event - again later with the proper rli. - Strictly speaking, if we were sure that rli is null - only in the case discussed above, 'if (rli)' is useless here. - But as we are not 100% sure, keep it for now. + rli is null when (as far as I (Guilhem) know) the caller is + Load_log_event::do_apply_event *and* that one is called from + Execute_load_log_event::do_apply_event. In this case, we don't + do anything here ; Execute_load_log_event::do_apply_event will + call Log_event::do_apply_event again later with the proper rli. + Strictly speaking, if we were sure that rli is null only in the + case discussed above, 'if (rli)' is useless here. But as we are + not 100% sure, keep it for now. + + Matz: I don't think we will need this check with this refactoring. */ if (rli) { @@ -584,18 +579,37 @@ int Log_event::exec_event(struct st_relay_log_info* rli) { rli->inc_group_relay_log_pos(log_pos); flush_relay_log_info(rli); - /* - Note that Rotate_log_event::exec_event() does not call this - function, so there is no chance that a fake rotate event resets - last_master_timestamp. - Note that we update without mutex (probably ok - except in some very - rare cases, only consequence is that value may take some time to - display in Seconds_Behind_Master - not critical). + /* + Note that Rotate_log_event::do_apply_event() does not call + this function, so there is no chance that a fake rotate event + resets last_master_timestamp. Note that we update without + mutex (probably ok - except in some very rare cases, only + consequence is that value may take some time to display in + Seconds_Behind_Master - not critical). */ rli->last_master_timestamp= when; } } - DBUG_RETURN(0); + + return 0; // Cannot fail currently +} + + +Log_event::enum_skip_reason +Log_event::do_shall_skip(RELAY_LOG_INFO *rli) +{ + DBUG_PRINT("info", ("ev->server_id=%lu, ::server_id=%lu," + " rli->replicate_same_server_id=%d," + " rli->slave_skip_counter=%d", + (ulong) server_id, (ulong) ::server_id, + rli->replicate_same_server_id, + rli->slave_skip_counter)); + if (server_id == ::server_id && !rli->replicate_same_server_id) + return EVENT_SKIP_IGNORE; + else if (rli->slave_skip_counter > 0) + return EVENT_SKIP_COUNT; + else + return EVENT_SKIP_NOT; } @@ -742,7 +756,7 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet, ulong data_len; int result=0; char buf[LOG_EVENT_MINIMAL_HEADER_LEN]; - DBUG_ENTER("read_log_event"); + DBUG_ENTER("Log_event::read_log_event"); if (log_lock) pthread_mutex_lock(log_lock); @@ -817,7 +831,7 @@ Log_event* Log_event::read_log_event(IO_CACHE* file, const Format_description_log_event *description_event) #endif { - DBUG_ENTER("Log_event::read_log_event(IO_CACHE *, Format_description_log_event *"); + DBUG_ENTER("Log_event::read_log_event"); DBUG_ASSERT(description_event != 0); char head[LOG_EVENT_MINIMAL_HEADER_LEN]; /* @@ -1887,27 +1901,28 @@ void Query_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) /* - Query_log_event::exec_event() + Query_log_event::do_apply_event() */ #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) -int Query_log_event::exec_event(struct st_relay_log_info* rli) +int Query_log_event::do_apply_event(RELAY_LOG_INFO const *rli) { - return exec_event(rli, query, q_len); + return do_apply_event(rli, query, q_len); } -int Query_log_event::exec_event(struct st_relay_log_info* rli, - const char *query_arg, uint32 q_len_arg) +int Query_log_event::do_apply_event(RELAY_LOG_INFO const *rli, + const char *query_arg, uint32 q_len_arg) { LEX_STRING new_db; int expected_error,actual_error= 0; /* - Colleagues: please never free(thd->catalog) in MySQL. This would lead to - bugs as here thd->catalog is a part of an alloced block, not an entire - alloced block (see Query_log_event::exec_event()). Same for thd->db. - Thank you. + Colleagues: please never free(thd->catalog) in MySQL. This would + lead to bugs as here thd->catalog is a part of an alloced block, + not an entire alloced block (see + Query_log_event::do_apply_event()). Same for thd->db. Thank + you. */ thd->catalog= catalog_len ? (char *) catalog : (char *)""; new_db.length= db_len; @@ -1926,11 +1941,11 @@ int Query_log_event::exec_event(struct st_relay_log_info* rli, END of the current log event (COMMIT). We save it in rli so that InnoDB can access it. */ - rli->future_group_master_log_pos= log_pos; + const_cast<RELAY_LOG_INFO*>(rli)->future_group_master_log_pos= log_pos; DBUG_PRINT("info", ("log_pos: %lu", (ulong) log_pos)); - clear_all_errors(thd, rli); - rli->clear_tables_to_lock(); + clear_all_errors(thd, const_cast<RELAY_LOG_INFO*>(rli)); + const_cast<RELAY_LOG_INFO*>(rli)->clear_tables_to_lock(); /* Note: We do not need to execute reset_one_shot_variables() if this @@ -1939,8 +1954,8 @@ int Query_log_event::exec_event(struct st_relay_log_info* rli, its companion query. If the SET is ignored because of db_ok(), the companion query will also be ignored, and if the companion query is ignored in the db_ok() test of - ::exec_event(), then the companion SET also have so we - don't need to reset_one_shot_variables(). + ::do_apply_event(), then the companion SET also have so + we don't need to reset_one_shot_variables(). */ if (rpl_filter->db_ok(thd->db)) { @@ -2056,7 +2071,7 @@ int Query_log_event::exec_event(struct st_relay_log_info* rli, to check/fix it. */ if (mysql_test_parse_for_slave(thd, thd->query, thd->query_length)) - clear_all_errors(thd, rli); /* Can ignore query */ + clear_all_errors(thd, const_cast<RELAY_LOG_INFO*>(rli)); /* Can ignore query */ else { slave_print_msg(ERROR_LEVEL, rli, expected_error, @@ -2107,7 +2122,7 @@ Default database: '%s'. Query: '%s'", ignored_error_code(actual_error)) { DBUG_PRINT("info",("error ignored")); - clear_all_errors(thd, rli); + clear_all_errors(thd, const_cast<RELAY_LOG_INFO*>(rli)); } /* Other cases: mostly we expected no error and get one. @@ -2174,16 +2189,26 @@ end: thd->first_successful_insert_id_in_prev_stmt= 0; thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt= 0; free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC)); + return thd->query_error; +} + +int Query_log_event::do_update_pos(RELAY_LOG_INFO *rli) +{ /* - If there was an error we stop. Otherwise we increment positions. Note that - we will not increment group* positions if we are just after a SET - ONE_SHOT, because SET ONE_SHOT should not be separated from its following - updating query. + Note that we will not increment group* positions if we are just + after a SET ONE_SHOT, because SET ONE_SHOT should not be separated + from its following updating query. */ - return (thd->query_error ? thd->query_error : - (thd->one_shot_set ? (rli->inc_event_relay_log_pos(),0) : - Log_event::exec_event(rli))); + if (thd->one_shot_set) + { + rli->inc_event_relay_log_pos(); + return 0; + } + else + return Log_event::do_update_pos(rli); } + + #endif @@ -2312,7 +2337,7 @@ bool Start_log_event_v3::write(IO_CACHE* file) /* - Start_log_event_v3::exec_event() + Start_log_event_v3::do_apply_event() The master started @@ -2331,9 +2356,9 @@ bool Start_log_event_v3::write(IO_CACHE* file) */ #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) -int Start_log_event_v3::exec_event(struct st_relay_log_info* rli) +int Start_log_event_v3::do_apply_event(RELAY_LOG_INFO const *rli) { - DBUG_ENTER("Start_log_event_v3::exec_event"); + DBUG_ENTER("Start_log_event_v3::do_apply_event"); switch (binlog_version) { case 3: @@ -2375,7 +2400,7 @@ int Start_log_event_v3::exec_event(struct st_relay_log_info* rli) /* this case is impossible */ DBUG_RETURN(1); } - DBUG_RETURN(Log_event::exec_event(rli)); + DBUG_RETURN(0); } #endif /* defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) */ @@ -2566,24 +2591,10 @@ bool Format_description_log_event::write(IO_CACHE* file) } #endif -/* - SYNOPSIS - Format_description_log_event::exec_event() - - IMPLEMENTATION - Save the information which describes the binlog's format, to be able to - read all coming events. - Call Start_log_event_v3::exec_event(). -*/ - #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) -int Format_description_log_event::exec_event(struct st_relay_log_info* rli) +int Format_description_log_event::do_apply_event(RELAY_LOG_INFO const *rli) { - DBUG_ENTER("Format_description_log_event::exec_event"); - - /* save the information describing this binlog */ - delete rli->relay_log.description_event_for_exec; - rli->relay_log.description_event_for_exec= this; + DBUG_ENTER("Format_description_log_event::do_apply_event"); #ifdef USING_TRANSACTIONS /* @@ -2605,14 +2616,36 @@ int Format_description_log_event::exec_event(struct st_relay_log_info* rli) "or ROLLBACK in relay log). A probable cause is that " "the master died while writing the transaction to " "its binary log, thus rolled back too."); - rli->cleanup_context(thd, 1); + const_cast<RELAY_LOG_INFO*>(rli)->cleanup_context(thd, 1); } #endif /* - If this event comes from ourselves, there is no cleaning task to perform, - we don't call Start_log_event_v3::exec_event() (this was just to update the - log's description event). + If this event comes from ourselves, there is no cleaning task to + perform, we don't call Start_log_event_v3::do_apply_event() + (this was just to update the log's description event). */ + if (server_id != (uint32) ::server_id) + { + /* + If the event was not requested by the slave i.e. the master sent + it while the slave asked for a position >4, the event will make + rli->group_master_log_pos advance. Say that the slave asked for + position 1000, and the Format_desc event's end is 96. Then in + the beginning of replication rli->group_master_log_pos will be + 0, then 96, then jump to first really asked event (which is + >96). So this is ok. + */ + DBUG_RETURN(Start_log_event_v3::do_apply_event(rli)); + } + DBUG_RETURN(0); +} + +int Format_description_log_event::do_update_pos(RELAY_LOG_INFO *rli) +{ + /* save the information describing this binlog */ + delete rli->relay_log.description_event_for_exec; + rli->relay_log.description_event_for_exec= this; + if (server_id == (uint32) ::server_id) { /* @@ -2629,19 +2662,20 @@ int Format_description_log_event::exec_event(struct st_relay_log_info* rli) the Intvar_log_event respectively. */ rli->inc_event_relay_log_pos(); - DBUG_RETURN(0); + return 0; + } + else + { + return Log_event::do_update_pos(rli); } +} - /* - If the event was not requested by the slave i.e. the master sent it while - the slave asked for a position >4, the event will make - rli->group_master_log_pos advance. Say that the slave asked for position - 1000, and the Format_desc event's end is 96. Then in the beginning of - replication rli->group_master_log_pos will be 0, then 96, then jump to - first really asked event (which is >96). So this is ok. - */ - DBUG_RETURN(Start_log_event_v3::exec_event(rli)); +Log_event::enum_skip_reason +Format_description_log_event::do_shall_skip(RELAY_LOG_INFO *rli) +{ + return Log_event::EVENT_SKIP_NOT; } + #endif @@ -3155,30 +3189,32 @@ void Load_log_event::set_fields(const char* affected_db, Does the data loading job when executing a LOAD DATA on the slave SYNOPSIS - Load_log_event::exec_event - net - rli - use_rli_only_for_errors - if set to 1, rli is provided to - Load_log_event::exec_event only for this - function to have RPL_LOG_NAME and - rli->last_slave_error, both being used by - error reports. rli's position advancing - is skipped (done by the caller which is - Execute_load_log_event::exec_event). - - if set to 0, rli is provided for full use, - i.e. for error reports and position - advancing. + Load_log_event::do_apply_event + net + rli + use_rli_only_for_errors - if set to 1, rli is provided to + Load_log_event::do_apply_event + only for this function to have + RPL_LOG_NAME and + rli->last_slave_error, both being + used by error reports. rli's + position advancing is skipped (done + by the caller which is + Execute_load_log_event::do_apply_event). + - if set to 0, rli is provided for + full use, i.e. for error reports and + position advancing. DESCRIPTION Does the data loading job when executing a LOAD DATA on the slave - + RETURN VALUE - 0 Success + 0 Success 1 Failure */ -int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli, - bool use_rli_only_for_errors) +int Load_log_event::do_apply_event(NET* net, RELAY_LOG_INFO const *rli, + bool use_rli_only_for_errors) { LEX_STRING new_db; new_db.length= db_len; @@ -3187,9 +3223,9 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli, DBUG_ASSERT(thd->query == 0); thd->query_length= 0; // Should not be needed thd->query_error= 0; - clear_all_errors(thd, rli); + clear_all_errors(thd, const_cast<RELAY_LOG_INFO*>(rli)); - /* see Query_log_event::exec_event() and BUG#13360 */ + /* see Query_log_event::do_apply_event() and BUG#13360 */ DBUG_ASSERT(!rli->m_table_map.count()); /* Usually mysql_init_query() is called by mysql_parse(), but we need it here @@ -3198,22 +3234,26 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli, mysql_init_query(thd, 0, 0); if (!use_rli_only_for_errors) { - /* Saved for InnoDB, see comment in Query_log_event::exec_event() */ - rli->future_group_master_log_pos= log_pos; + /* + Saved for InnoDB, see comment in + Query_log_event::do_apply_event() + */ + const_cast<RELAY_LOG_INFO*>(rli)->future_group_master_log_pos= log_pos; DBUG_PRINT("info", ("log_pos: %lu", (ulong) log_pos)); } /* - We test replicate_*_db rules. Note that we have already prepared the file - to load, even if we are going to ignore and delete it now. So it is - possible that we did a lot of disk writes for nothing. In other words, a - big LOAD DATA INFILE on the master will still consume a lot of space on - the slave (space in the relay log + space of temp files: twice the space - of the file to load...) even if it will finally be ignored. - TODO: fix this; this can be done by testing rules in - Create_file_log_event::exec_event() and then discarding Append_block and - al. Another way is do the filtering in the I/O thread (more efficient: no - disk writes at all). + We test replicate_*_db rules. Note that we have already prepared + the file to load, even if we are going to ignore and delete it + now. So it is possible that we did a lot of disk writes for + nothing. In other words, a big LOAD DATA INFILE on the master will + still consume a lot of space on the slave (space in the relay log + + space of temp files: twice the space of the file to load...) + even if it will finally be ignored. TODO: fix this; this can be + done by testing rules in Create_file_log_event::do_apply_event() + and then discarding Append_block and al. Another way is do the + filtering in the I/O thread (more efficient: no disk writes at + all). Note: We do not need to execute reset_one_shot_variables() if this @@ -3222,8 +3262,8 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli, its companion query. If the SET is ignored because of db_ok(), the companion query will also be ignored, and if the companion query is ignored in the db_ok() test of - ::exec_event(), then the companion SET also have so we - don't need to reset_one_shot_variables(). + ::do_apply_event(), then the companion SET also have so + we don't need to reset_one_shot_variables(). */ if (rpl_filter->db_ok(thd->db)) { @@ -3419,7 +3459,7 @@ Fatal error running LOAD DATA INFILE on table '%s'. Default database: '%s'", return 1; } - return ( use_rli_only_for_errors ? 0 : Log_event::exec_event(rli) ); + return ( use_rli_only_for_errors ? 0 : Log_event::do_apply_event(rli) ); } #endif @@ -3513,6 +3553,7 @@ Rotate_log_event::Rotate_log_event(const char* buf, uint event_len, ident_offset = post_header_len; set_if_smaller(ident_len,FN_REFLEN-1); new_log_ident= my_strndup(buf + ident_offset, (uint) ident_len, MYF(MY_WME)); + DBUG_PRINT("debug", ("new_log_ident: '%s'", new_log_ident)); DBUG_VOID_RETURN; } @@ -3532,8 +3573,20 @@ bool Rotate_log_event::write(IO_CACHE* file) } #endif +/** + Helper function to detect if the event is inside a group. + */ +#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) +static bool is_in_group(THD *const thd, RELAY_LOG_INFO *const rli) +{ + return (thd->options & OPTION_BEGIN) != 0 || + (rli->last_event_start_time > 0); +} +#endif + + /* - Rotate_log_event::exec_event() + Rotate_log_event::do_apply_event() Got a rotate log event from the master @@ -3550,34 +3603,49 @@ bool Rotate_log_event::write(IO_CACHE* file) */ #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) -int Rotate_log_event::exec_event(struct st_relay_log_info* rli) +int Rotate_log_event::do_update_pos(RELAY_LOG_INFO *rli) { - DBUG_ENTER("Rotate_log_event::exec_event"); + DBUG_ENTER("Rotate_log_event::do_update_pos"); +#ifndef DBUG_OFF + char buf[32]; +#endif + + DBUG_PRINT("info", ("server_id=%lu; ::server_id=%lu", + (ulong) this->server_id, (ulong) ::server_id)); + DBUG_PRINT("info", ("new_log_ident: %s", this->new_log_ident)); + DBUG_PRINT("info", ("pos: %s", llstr(this->pos, buf))); pthread_mutex_lock(&rli->data_lock); rli->event_relay_log_pos= my_b_tell(rli->cur_log); /* - If we are in a transaction: the only normal case is when the I/O thread was - copying a big transaction, then it was stopped and restarted: we have this - in the relay log: + If we are in a transaction or in a group: the only normal case is + when the I/O thread was copying a big transaction, then it was + stopped and restarted: we have this in the relay log: + BEGIN ... ROTATE (a fake one) ... COMMIT or ROLLBACK - In that case, we don't want to touch the coordinates which correspond to - the beginning of the transaction. - Starting from 5.0.0, there also are some rotates from the slave itself, in - the relay log. + + In that case, we don't want to touch the coordinates which + correspond to the beginning of the transaction. Starting from + 5.0.0, there also are some rotates from the slave itself, in the + relay log, which shall not change the group positions. */ - if (!(thd->options & OPTION_BEGIN)) + if ((server_id != ::server_id || rli->replicate_same_server_id) && + !is_in_group(thd, rli)) { + DBUG_PRINT("info", ("old group_master_log_name: '%s' " + "old group_master_log_pos: %lu", + rli->group_master_log_name, + (ulong) rli->group_master_log_pos)); memcpy(rli->group_master_log_name, new_log_ident, ident_len+1); rli->notify_group_master_log_name_update(); rli->group_master_log_pos= pos; rli->group_relay_log_pos= rli->event_relay_log_pos; - DBUG_PRINT("info", ("group_master_log_name: '%s' " - "group_master_log_pos: %lu", + DBUG_PRINT("info", ("new group_master_log_name: '%s' " + "new group_master_log_pos: %lu", rli->group_master_log_name, (ulong) rli->group_master_log_pos)); /* @@ -3596,8 +3664,27 @@ int Rotate_log_event::exec_event(struct st_relay_log_info* rli) pthread_mutex_unlock(&rli->data_lock); pthread_cond_broadcast(&rli->data_cond); flush_relay_log_info(rli); + DBUG_RETURN(0); } + + +Log_event::enum_skip_reason +Rotate_log_event::do_shall_skip(RELAY_LOG_INFO *rli) +{ + enum_skip_reason reason= Log_event::do_shall_skip(rli); + + switch (reason) { + case Log_event::EVENT_SKIP_NOT: + case Log_event::EVENT_SKIP_COUNT: + return Log_event::EVENT_SKIP_NOT; + + case Log_event::EVENT_SKIP_IGNORE: + return Log_event::EVENT_SKIP_IGNORE; + } + DBUG_ASSERT(0); +} + #endif @@ -3704,11 +3791,11 @@ void Intvar_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) /* - Intvar_log_event::exec_event() + Intvar_log_event::do_apply_event() */ #if defined(HAVE_REPLICATION)&& !defined(MYSQL_CLIENT) -int Intvar_log_event::exec_event(struct st_relay_log_info* rli) +int Intvar_log_event::do_apply_event(RELAY_LOG_INFO const *rli) { switch (type) { case LAST_INSERT_ID_EVENT: @@ -3719,9 +3806,33 @@ int Intvar_log_event::exec_event(struct st_relay_log_info* rli) thd->force_one_auto_inc_interval(val); break; } + return 0; +} + +int Intvar_log_event::do_update_pos(RELAY_LOG_INFO *rli) +{ rli->inc_event_relay_log_pos(); return 0; } + + +Log_event::enum_skip_reason +Intvar_log_event::do_shall_skip(RELAY_LOG_INFO *rli) +{ + /* + It is a common error to set the slave skip counter to 1 instead of + 2 when recovering from an insert which used a auto increment, + rand, or user var. Therefore, if the slave skip counter is 1, we + just say that this event should be skipped by ignoring it, meaning + that we do not change the value of the slave skip counter since it + will be decreased by the following insert event. + */ + if (rli->slave_skip_counter == 1) + return Log_event::EVENT_SKIP_IGNORE; + else + return Log_event::do_shall_skip(rli); +} + #endif @@ -3784,13 +3895,37 @@ void Rand_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) -int Rand_log_event::exec_event(struct st_relay_log_info* rli) +int Rand_log_event::do_apply_event(RELAY_LOG_INFO const *rli) { thd->rand.seed1= (ulong) seed1; thd->rand.seed2= (ulong) seed2; + return 0; +} + +int Rand_log_event::do_update_pos(RELAY_LOG_INFO *rli) +{ rli->inc_event_relay_log_pos(); return 0; } + + +Log_event::enum_skip_reason +Rand_log_event::do_shall_skip(RELAY_LOG_INFO *rli) +{ + /* + It is a common error to set the slave skip counter to 1 instead of + 2 when recovering from an insert which used a auto increment, + rand, or user var. Therefore, if the slave skip counter is 1, we + just say that this event should be skipped by ignoring it, meaning + that we do not change the value of the slave skip counter since it + will be decreased by the following insert event. + */ + if (rli->slave_skip_counter == 1) + return Log_event::EVENT_SKIP_IGNORE; + else + return Log_event::do_shall_skip(rli); +} + #endif /* !MYSQL_CLIENT */ @@ -3857,12 +3992,12 @@ void Xid_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) -int Xid_log_event::exec_event(struct st_relay_log_info* rli) +int Xid_log_event::do_apply_event(RELAY_LOG_INFO const *rli) { /* For a slave Xid_log_event is COMMIT */ general_log_print(thd, COM_QUERY, "COMMIT /* implicit, from Xid_log_event */"); - return end_trans(thd, COMMIT) || Log_event::exec_event(rli); + return end_trans(thd, COMMIT); } #endif /* !MYSQL_CLIENT */ @@ -4140,11 +4275,11 @@ void User_var_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) /* - User_var_log_event::exec_event() + User_var_log_event::do_apply_event() */ #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) -int User_var_log_event::exec_event(struct st_relay_log_info* rli) +int User_var_log_event::do_apply_event(RELAY_LOG_INFO const *rli) { Item *it= 0; CHARSET_INFO *charset; @@ -4206,9 +4341,31 @@ int User_var_log_event::exec_event(struct st_relay_log_info* rli) e.update_hash(val, val_len, type, charset, DERIVATION_IMPLICIT, 0); free_root(thd->mem_root,0); + return 0; +} + +int User_var_log_event::do_update_pos(RELAY_LOG_INFO *rli) +{ rli->inc_event_relay_log_pos(); return 0; } + +Log_event::enum_skip_reason +User_var_log_event::do_shall_skip(RELAY_LOG_INFO *rli) +{ + /* + It is a common error to set the slave skip counter to 1 instead + of 2 when recovering from an insert which used a auto increment, + rand, or user var. Therefore, if the slave skip counter is 1, we + just say that this event should be skipped by ignoring it, meaning + that we do not change the value of the slave skip counter since it + will be decreased by the following insert event. + */ + if (rli->slave_skip_counter == 1) + return Log_event::EVENT_SKIP_IGNORE; + else + return Log_event::do_shall_skip(rli); +} #endif /* !MYSQL_CLIENT */ @@ -4248,7 +4405,7 @@ void Slave_log_event::pack_info(Protocol *protocol) #ifndef MYSQL_CLIENT Slave_log_event::Slave_log_event(THD* thd_arg, - struct st_relay_log_info* rli) + RELAY_LOG_INFO* rli) :Log_event(thd_arg, 0, 0) , mem_pool(0), master_host(0) { DBUG_ENTER("Slave_log_event"); @@ -4358,11 +4515,11 @@ Slave_log_event::Slave_log_event(const char* buf, uint event_len) #ifndef MYSQL_CLIENT -int Slave_log_event::exec_event(struct st_relay_log_info* rli) +int Slave_log_event::do_apply_event(RELAY_LOG_INFO const *rli) { if (mysql_bin_log.is_open()) mysql_bin_log.write(this); - return Log_event::exec_event(rli); + return 0; } #endif /* !MYSQL_CLIENT */ @@ -4391,21 +4548,21 @@ void Stop_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) /* - Stop_log_event::exec_event() - - The master stopped. - We used to clean up all temporary tables but this is useless as, as the - master has shut down properly, it has written all DROP TEMPORARY TABLE - (prepared statements' deletion is TODO only when we binlog prep stmts). - We used to clean up slave_load_tmpdir, but this is useless as it has been - cleared at the end of LOAD DATA INFILE. - So we have nothing to do here. - The place were we must do this cleaning is in Start_log_event_v3::exec_event(), - not here. Because if we come here, the master was sane. + Stop_log_event::do_apply_event() + + The master stopped. We used to clean up all temporary tables but + this is useless as, as the master has shut down properly, it has + written all DROP TEMPORARY TABLE (prepared statements' deletion is + TODO only when we binlog prep stmts). We used to clean up + slave_load_tmpdir, but this is useless as it has been cleared at the + end of LOAD DATA INFILE. So we have nothing to do here. The place + were we must do this cleaning is in + Start_log_event_v3::do_apply_event(), not here. Because if we come + here, the master was sane. */ #ifndef MYSQL_CLIENT -int Stop_log_event::exec_event(struct st_relay_log_info* rli) +int Stop_log_event::do_update_pos(RELAY_LOG_INFO *rli) { /* We do not want to update master_log pos because we get a rotate event @@ -4423,6 +4580,7 @@ int Stop_log_event::exec_event(struct st_relay_log_info* rli) } return 0; } + #endif /* !MYSQL_CLIENT */ #endif /* HAVE_REPLICATION */ @@ -4613,11 +4771,11 @@ void Create_file_log_event::pack_info(Protocol *protocol) /* - Create_file_log_event::exec_event() + Create_file_log_event::do_apply_event() */ #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) -int Create_file_log_event::exec_event(struct st_relay_log_info* rli) +int Create_file_log_event::do_apply_event(RELAY_LOG_INFO const *rli) { char proc_info[17+FN_REFLEN+10], *fname_buf; char *ext; @@ -4679,7 +4837,7 @@ err: if (fd >= 0) my_close(fd, MYF(0)); thd->proc_info= 0; - return error ? 1 : Log_event::exec_event(rli); + return error == 0; } #endif /* defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) */ @@ -4787,15 +4945,15 @@ int Append_block_log_event::get_create_or_append() const } /* - Append_block_log_event::exec_event() + Append_block_log_event::do_apply_event() */ -int Append_block_log_event::exec_event(struct st_relay_log_info* rli) +int Append_block_log_event::do_apply_event(RELAY_LOG_INFO const *rli) { char proc_info[17+FN_REFLEN+10], *fname= proc_info+17; int fd; int error = 1; - DBUG_ENTER("Append_block_log_event::exec_event"); + DBUG_ENTER("Append_block_log_event::do_apply_event"); fname= strmov(proc_info, "Making temp file "); slave_load_file_stem(fname, file_id, server_id, ".data"); @@ -4834,7 +4992,7 @@ err: if (fd >= 0) my_close(fd, MYF(0)); thd->proc_info= 0; - DBUG_RETURN(error ? error : Log_event::exec_event(rli)); + DBUG_RETURN(error); } #endif @@ -4918,18 +5076,18 @@ void Delete_file_log_event::pack_info(Protocol *protocol) #endif /* - Delete_file_log_event::exec_event() + Delete_file_log_event::do_apply_event() */ #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) -int Delete_file_log_event::exec_event(struct st_relay_log_info* rli) +int Delete_file_log_event::do_apply_event(RELAY_LOG_INFO const *rli) { char fname[FN_REFLEN+10]; char *ext= slave_load_file_stem(fname, file_id, server_id, ".data"); (void) my_delete(fname, MYF(MY_WME)); strmov(ext, ".info"); (void) my_delete(fname, MYF(MY_WME)); - return Log_event::exec_event(rli); + return 0; } #endif /* defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) */ @@ -5015,10 +5173,10 @@ void Execute_load_log_event::pack_info(Protocol *protocol) /* - Execute_load_log_event::exec_event() + Execute_load_log_event::do_apply_event() */ -int Execute_load_log_event::exec_event(struct st_relay_log_info* rli) +int Execute_load_log_event::do_apply_event(RELAY_LOG_INFO const *rli) { char fname[FN_REFLEN+10]; char *ext; @@ -5049,14 +5207,15 @@ int Execute_load_log_event::exec_event(struct st_relay_log_info* rli) lev->thd = thd; /* - lev->exec_event should use rli only for errors - i.e. should not advance rli's position. - lev->exec_event is the place where the table is loaded (it calls - mysql_load()). + lev->do_apply_event should use rli only for errors i.e. should + not advance rli's position. + + lev->do_apply_event is the place where the table is loaded (it + calls mysql_load()). */ - rli->future_group_master_log_pos= log_pos; - if (lev->exec_event(0,rli,1)) + const_cast<RELAY_LOG_INFO*>(rli)->future_group_master_log_pos= log_pos; + if (lev->do_apply_event(0,rli,1)) { /* We want to indicate the name of the file that could not be loaded @@ -5099,7 +5258,7 @@ err: my_close(fd, MYF(0)); end_io_cache(&file); } - return error ? error : Log_event::exec_event(rli); + return error; } #endif /* defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) */ @@ -5267,7 +5426,7 @@ void Execute_load_query_log_event::pack_info(Protocol *protocol) int -Execute_load_query_log_event::exec_event(struct st_relay_log_info* rli) +Execute_load_query_log_event::do_apply_event(RELAY_LOG_INFO const *rli) { char *p; char *buf; @@ -5304,7 +5463,7 @@ Execute_load_query_log_event::exec_event(struct st_relay_log_info* rli) p= strmake(p, STRING_WITH_LEN(" INTO")); p= strmake(p, query+fn_pos_end, q_len-fn_pos_end); - error= Query_log_event::exec_event(rli, buf, p-buf); + error= Query_log_event::do_apply_event(rli, buf, p-buf); /* Forging file name for deletion in same buffer */ *fname_end= 0; @@ -5624,7 +5783,7 @@ int Rows_log_event::do_add_row_data(byte *const row_data, the master does not have a default value (and isn't nullable) */ static int -unpack_row(RELAY_LOG_INFO *rli, +unpack_row(RELAY_LOG_INFO const *rli, TABLE *table, uint const colcnt, char const *row, MY_BITMAP const *cols, char const **row_end, ulong *master_reclength, @@ -5730,17 +5889,17 @@ unpack_row(RELAY_LOG_INFO *rli, DBUG_RETURN(error); } -int Rows_log_event::exec_event(st_relay_log_info *rli) +int Rows_log_event::do_apply_event(RELAY_LOG_INFO const *rli) { - DBUG_ENTER("Rows_log_event::exec_event(st_relay_log_info*)"); + DBUG_ENTER("Rows_log_event::do_apply_event(st_relay_log_info*)"); int error= 0; char const *row_start= (char const *)m_rows_buf; /* - If m_table_id == ~0UL, then we have a dummy event that does - not contain any data. In that case, we just remove all tables in - the tables_to_lock list, close the thread tables, step the relay - log position, and return with success. + If m_table_id == ~0UL, then we have a dummy event that does not + contain any data. In that case, we just remove all tables in the + tables_to_lock list, close the thread tables, and return with + success. The relay log position will be stepped in */ if (m_table_id == ~0UL) { @@ -5750,16 +5909,16 @@ int Rows_log_event::exec_event(st_relay_log_info *rli) */ DBUG_ASSERT(get_flags(STMT_END_F)); - rli->clear_tables_to_lock(); + const_cast<RELAY_LOG_INFO*>(rli)->clear_tables_to_lock(); close_thread_tables(thd); thd->clear_error(); - rli->inc_event_relay_log_pos(); DBUG_RETURN(0); } /* 'thd' has been set by exec_relay_log_event(), just before calling - exec_event(). We still check here to prevent future coding errors. + do_apply_event(). We still check here to prevent future coding + errors. */ DBUG_ASSERT(rli->sql_thd == thd); @@ -5775,8 +5934,9 @@ int Rows_log_event::exec_event(st_relay_log_info *rli) /* lock_tables() reads the contents of thd->lex, so they must be - initialized. Contrary to in Table_map_log_event::exec_event() we don't - call mysql_init_query() as that may reset the binlog format. + initialized. Contrary to in + Table_map_log_event::do_apply_event() we don't call + mysql_init_query() as that may reset the binlog format. */ lex_start(thd, NULL, 0); @@ -5805,7 +5965,7 @@ int Rows_log_event::exec_event(st_relay_log_info *rli) "Error in %s event: when locking tables", get_type_str()); } - rli->clear_tables_to_lock(); + const_cast<RELAY_LOG_INFO*>(rli)->clear_tables_to_lock(); DBUG_RETURN(error); } @@ -5826,7 +5986,8 @@ int Rows_log_event::exec_event(st_relay_log_info *rli) TABLE_LIST *tables= rli->tables_to_lock; close_tables_for_reopen(thd, &tables); - if ((error= open_tables(thd, &tables, &rli->tables_to_lock_count, 0))) + uint tables_count= rli->tables_to_lock_count; + if ((error= open_tables(thd, &tables, &tables_count, 0))) { if (thd->query_error || thd->is_fatal_error) { @@ -5841,7 +6002,7 @@ int Rows_log_event::exec_event(st_relay_log_info *rli) "unexpected success or fatal error")); thd->query_error= 1; } - rli->clear_tables_to_lock(); + const_cast<RELAY_LOG_INFO*>(rli)->clear_tables_to_lock(); DBUG_RETURN(error); } } @@ -5885,24 +6046,24 @@ int Rows_log_event::exec_event(st_relay_log_info *rli) */ for (TABLE_LIST *ptr= rli->tables_to_lock ; ptr ; ptr= ptr->next_global) { - rli->m_table_map.set_table(ptr->table_id, ptr->table); + const_cast<RELAY_LOG_INFO*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table); } #ifdef HAVE_QUERY_CACHE query_cache.invalidate_locked_for_write(rli->tables_to_lock); #endif - rli->clear_tables_to_lock(); + const_cast<RELAY_LOG_INFO*>(rli)->clear_tables_to_lock(); } DBUG_ASSERT(rli->tables_to_lock == NULL && rli->tables_to_lock_count == 0); - TABLE* table= rli->m_table_map.get_table(m_table_id); + TABLE* table= const_cast<RELAY_LOG_INFO*>(rli)->m_table_map.get_table(m_table_id); if (table) { /* table == NULL means that this table should not be replicated - (this was set up by Table_map_log_event::exec_event() which - tested replicate-* rules). + (this was set up by Table_map_log_event::do_apply_event() + which tested replicate-* rules). */ /* @@ -5959,7 +6120,7 @@ int Rows_log_event::exec_event(st_relay_log_info *rli) break; default: - slave_print_msg(ERROR_LEVEL, rli, error, + slave_print_msg(ERROR_LEVEL, rli, thd->net.last_errno, "Error in %s event: row application failed", get_type_str()); thd->query_error= 1; @@ -5969,7 +6130,7 @@ int Rows_log_event::exec_event(st_relay_log_info *rli) row_start= row_end; } DBUG_EXECUTE_IF("STOP_SLAVE_after_first_Rows_event", - rli->abort_slave=1;); + const_cast<RELAY_LOG_INFO*>(rli)->abort_slave= 1;); error= do_after_row_operations(table, error); if (!cache_stmt) { @@ -5980,11 +6141,12 @@ int Rows_log_event::exec_event(st_relay_log_info *rli) if (error) { /* error has occured during the transaction */ - slave_print_msg(ERROR_LEVEL, rli, error, + slave_print_msg(ERROR_LEVEL, rli, thd->net.last_errno, "Error in %s event: error during transaction execution " "on table %s.%s", get_type_str(), table->s->db.str, table->s->table_name.str); + /* If one day we honour --skip-slave-errors in row-based replication, and the error should be skipped, then we would clear mappings, rollback, @@ -5997,7 +6159,7 @@ int Rows_log_event::exec_event(st_relay_log_info *rli) rollback at the caller along with sbr. */ thd->reset_current_stmt_binlog_row_based(); - rli->cleanup_context(thd, 0); /* rollback at caller in step with sbr */ + const_cast<RELAY_LOG_INFO*>(rli)->cleanup_context(thd, error); thd->query_error= 1; DBUG_RETURN(error); } @@ -6041,8 +6203,7 @@ int Rows_log_event::exec_event(st_relay_log_info *rli) */ thd->reset_current_stmt_binlog_row_based(); - rli->cleanup_context(thd, 0); - rli->transaction_end(thd); + const_cast<RELAY_LOG_INFO*>(rli)->cleanup_context(thd, 0); if (error == 0) { @@ -6055,7 +6216,6 @@ int Rows_log_event::exec_event(st_relay_log_info *rli) do not become visible. We still prefer to wipe them out. */ thd->clear_error(); - error= Log_event::exec_event(rli); } else slave_print_msg(ERROR_LEVEL, rli, error, @@ -6082,17 +6242,17 @@ int Rows_log_event::exec_event(st_relay_log_info *rli) wait (reached end of last relay log and nothing gets appended there), we timeout after one minute, and notify DBA about the problem. When WL#2975 is implemented, just remove the member - st_relay_log_info::unsafe_to_stop_at and all its occurences. + st_relay_log_info::last_event_start_time and all its occurences. */ - rli->unsafe_to_stop_at= time(0); + const_cast<RELAY_LOG_INFO*>(rli)->last_event_start_time= time(0); } DBUG_ASSERT(error == 0); thd->clear_error(); - rli->inc_event_relay_log_pos(); - + DBUG_RETURN(0); } + #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */ #ifndef MYSQL_CLIENT @@ -6272,15 +6432,15 @@ Table_map_log_event::Table_map_log_event(const char *buf, uint event_len, const char *const vpart= buf + common_header_len + post_header_len; /* Extract the length of the various parts from the buffer */ - byte const* const ptr_dblen= (byte const*)vpart + 0; + byte const *const ptr_dblen= (byte const*)vpart + 0; m_dblen= *(uchar*) ptr_dblen; /* Length of database name + counter + terminating null */ - byte const* const ptr_tbllen= ptr_dblen + m_dblen + 2; + byte const *const ptr_tbllen= ptr_dblen + m_dblen + 2; m_tbllen= *(uchar*) ptr_tbllen; /* Length of table name + counter + terminating null */ - byte const* const ptr_colcnt= ptr_tbllen + m_tbllen + 2; + byte const *const ptr_colcnt= ptr_tbllen + m_tbllen + 2; uchar *ptr_after_colcnt= (uchar*) ptr_colcnt; m_colcnt= net_field_length(&ptr_after_colcnt); @@ -6325,9 +6485,9 @@ Table_map_log_event::~Table_map_log_event() */ #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) -int Table_map_log_event::exec_event(st_relay_log_info *rli) +int Table_map_log_event::do_apply_event(RELAY_LOG_INFO const *rli) { - DBUG_ENTER("Table_map_log_event::exec_event(st_relay_log_info*)"); + DBUG_ENTER("Table_map_log_event::do_apply_event(st_relay_log_info*)"); DBUG_ASSERT(rli->sql_thd == thd); @@ -6450,29 +6610,24 @@ int Table_map_log_event::exec_event(st_relay_log_info *rli) locked by linking the table into the list of tables to lock. */ table_list->next_global= table_list->next_local= rli->tables_to_lock; - rli->tables_to_lock= table_list; - rli->tables_to_lock_count++; + const_cast<RELAY_LOG_INFO*>(rli)->tables_to_lock= table_list; + const_cast<RELAY_LOG_INFO*>(rli)->tables_to_lock_count++; /* 'memory' is freed in clear_tables_to_lock */ } - /* - We explicitly do not call Log_event::exec_event() here since we do not - want the relay log position to be flushed to disk. The flushing will be - done by the last Rows_log_event that either ends a statement (outside a - transaction) or a transaction. - - A table map event can *never* end a transaction or a statement, so we - just step the relay log position. - */ - - if (likely(!error)) - rli->inc_event_relay_log_pos(); DBUG_RETURN(error); err: my_free((gptr) memory, MYF(MY_WME)); DBUG_RETURN(error); } + +int Table_map_log_event::do_update_pos(RELAY_LOG_INFO *rli) +{ + rli->inc_event_relay_log_pos(); + return 0; +} + #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */ #ifndef MYSQL_CLIENT @@ -6637,7 +6792,7 @@ int Write_rows_log_event::do_after_row_operations(TABLE *table, int error) return error; } -int Write_rows_log_event::do_prepare_row(THD *thd, RELAY_LOG_INFO *rli, +int Write_rows_log_event::do_prepare_row(THD *thd, RELAY_LOG_INFO const *rli, TABLE *table, char const *const row_start, char const **const row_end) @@ -6778,6 +6933,32 @@ copy_extra_record_fields(TABLE *table, return 0; // All OK } +/** + Check if an error is a duplicate key error. + + This function is used to check if an error code is one of the + duplicate key error, i.e., and error code for which it is sensible + to do a <code>get_dup_key()</code> to retrieve the duplicate key. + + @param errcode The error code to check. + + @return <code>true</code> if the error code is such that + <code>get_dup_key()</code> will return true, <code>false</code> + otherwise. + */ +bool +is_duplicate_key_error(int errcode) +{ + switch (errcode) + { + case HA_ERR_FOUND_DUPP_KEY: + case HA_ERR_FOUND_DUPP_UNIQUE: + return true; + } + return false; +} + + /* Replace the provided record in the database. @@ -6820,7 +7001,7 @@ replace_record(THD *thd, TABLE *table, if ((keynum= table->file->get_dup_key(error)) < 0) { /* We failed to retrieve the duplicate key */ - DBUG_RETURN(HA_ERR_FOUND_DUPP_KEY); + DBUG_RETURN(error); } /* @@ -6837,7 +7018,10 @@ replace_record(THD *thd, TABLE *table, { error= table->file->rnd_pos(table->record[1], table->file->dup_ref); if (error) + { + table->file->print_error(error, MYF(0)); DBUG_RETURN(error); + } } else { @@ -6854,12 +7038,15 @@ replace_record(THD *thd, TABLE *table, } key_copy((byte*)key.get(), table->record[0], table->key_info + keynum, 0); - error= table->file->index_read_idx(table->record[1], keynum, + error= table->file->index_read_idx(table->record[1], keynum, (const byte*)key.get(), table->key_info[keynum].key_length, HA_READ_KEY_EXACT); if (error) + { + table->file->print_error(error, MYF(0)); DBUG_RETURN(error); + } } /* @@ -6892,15 +7079,21 @@ replace_record(THD *thd, TABLE *table, { error=table->file->ha_update_row(table->record[1], table->record[0]); + if (error) + table->file->print_error(error, MYF(0)); DBUG_RETURN(error); } else { if ((error= table->file->ha_delete_row(table->record[1]))) + { + table->file->print_error(error, MYF(0)); DBUG_RETURN(error); + } /* Will retry ha_write_row() with the offending row removed. */ } } + DBUG_RETURN(error); } @@ -7239,7 +7432,7 @@ int Delete_rows_log_event::do_after_row_operations(TABLE *table, int error) return error; } -int Delete_rows_log_event::do_prepare_row(THD *thd, RELAY_LOG_INFO *rli, +int Delete_rows_log_event::do_prepare_row(THD *thd, RELAY_LOG_INFO const *rli, TABLE *table, char const *const row_start, char const **const row_end) @@ -7374,7 +7567,7 @@ int Update_rows_log_event::do_after_row_operations(TABLE *table, int error) return error; } -int Update_rows_log_event::do_prepare_row(THD *thd, RELAY_LOG_INFO *rli, +int Update_rows_log_event::do_prepare_row(THD *thd, RELAY_LOG_INFO const *rli, TABLE *table, char const *const row_start, char const **const row_end) diff --git a/sql/log_event.h b/sql/log_event.h index 7cbe8925d9a..7cd231a8353 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -503,6 +503,7 @@ class THD; class Format_description_log_event; struct st_relay_log_info; +typedef st_relay_log_info RELAY_LOG_INFO; #ifdef MYSQL_CLIENT /* @@ -591,6 +592,33 @@ typedef struct st_print_event_info class Log_event { public: + /** + Enumeration of what kinds of skipping (and non-skipping) that can + occur when the slave executes an event. + + @see shall_skip + @see do_shall_skip + */ + enum enum_skip_reason { + /** + Don't skip event. + */ + EVENT_SKIP_NOT, + + /** + Skip event by ignoring it. + + This means that the slave skip counter will not be changed. + */ + EVENT_SKIP_IGNORE, + + /** + Skip event and decrease skip counter. + */ + EVENT_SKIP_COUNT + }; + + /* The following type definition is to be used whenever data is placed and manipulated in a common buffer. Use this typedef for buffers @@ -672,16 +700,14 @@ public: static void init_show_field_list(List<Item>* field_list); #ifdef HAVE_REPLICATION int net_send(Protocol *protocol, const char* log_name, my_off_t pos); + /* pack_info() is used by SHOW BINLOG EVENTS; as print() it prepares and sends a string to display to the user, so it resembles print(). */ + virtual void pack_info(Protocol *protocol); - /* - The SQL slave thread calls exec_event() to execute the event; this is where - the slave's data is modified. - */ - virtual int exec_event(struct st_relay_log_info* rli); + #endif /* HAVE_REPLICATION */ virtual const char* get_db() { @@ -754,6 +780,127 @@ public: *description_event); /* returns the human readable name of the event's type */ const char* get_type_str(); + +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) +public: + + /** + Apply the event to the database. + + This function represents the public interface for applying an + event. + + @see do_apply_event + */ + int apply_event(RELAY_LOG_INFO const *rli) { + return do_apply_event(rli); + } + + + /** + Update the relay log position. + + This function represents the public interface for "stepping over" + the event and will update the relay log information. + + @see do_update_pos + */ + int update_pos(RELAY_LOG_INFO *rli) + { + return do_update_pos(rli); + } + + /** + Decide if the event shall be skipped, and the reason for skipping + it. + + @see do_shall_skip + */ + enum_skip_reason shall_skip(RELAY_LOG_INFO *rli) + { + return do_shall_skip(rli); + } + +protected: + /** + Primitive to apply an event to the database. + + This is where the change to the database is made. + + @note The primitive is protected instead of private, since there + is a hierarchy of actions to be performed in some cases. + + @see Format_description_log_event::do_apply_event() + + @param rli Pointer to relay log info structure + + @retval 0 Event applied successfully + @retval errno Error code if event application failed + */ + virtual int do_apply_event(RELAY_LOG_INFO const *rli) + { + return 0; /* Default implementation does nothing */ + } + + + /** + Advance relay log coordinates. + + This function is called to advance the relay log coordinates to + just after the event. It is essential that both the relay log + coordinate and the group log position is updated correctly, since + this function is used also for skipping events. + + Normally, each implementation of do_update_pos() shall: + + - Update the event position to refer to the position just after + the event. + + - Update the group log position to refer to the position just + after the event <em>if the event is last in a group</em> + + @param rli Pointer to relay log info structure + + @retval 0 Coordinates changed successfully + @retval errno Error code if advancing failed (usually just + 1). Observe that handler errors are returned by the + do_apply_event() function, and not by this one. + */ + virtual int do_update_pos(RELAY_LOG_INFO *rli); + + + /** + Decide if this event shall be skipped or not and the reason for + skipping it. + + The default implementation decide that the event shall be skipped + if either: + + - the server id of the event is the same as the server id of the + server and <code>rli->replicate_same_server_id</code> is true, + or + + - if <code>rli->slave_skip_counter</code> is greater than zero. + + @see do_apply_event + @see do_update_pos + + @retval Log_event::EVENT_SKIP_NOT + The event shall not be skipped and should be applied. + + @retval Log_event::EVENT_SKIP_IGNORE + The event shall be skipped by just ignoring it, i.e., the slave + skip counter shall not be changed. This happends if, for example, + the originating server id of the event is the same as the server + id of the slave. + + @retval Log_event::EVENT_SKIP_COUNT + The event shall be skipped because the slave skip counter was + non-zero. The caller shall decrease the counter by one. + */ + virtual enum_skip_reason do_shall_skip(RELAY_LOG_INFO *rli); + +#endif }; /* @@ -794,10 +941,10 @@ public: uint16 error_code; ulong thread_id; /* - For events created by Query_log_event::exec_event (and - Load_log_event::exec_event()) we need the *original* thread id, to be able - to log the event with the original (=master's) thread id (fix for - BUG#1686). + For events created by Query_log_event::do_apply_event (and + Load_log_event::do_apply_event()) we need the *original* thread + id, to be able to log the event with the original (=master's) + thread id (fix for BUG#1686). */ ulong slave_proxy_id; @@ -860,9 +1007,6 @@ public: const char* get_db() { return db; } #ifdef HAVE_REPLICATION void pack_info(Protocol* protocol); - int exec_event(struct st_relay_log_info* rli); - int exec_event(struct st_relay_log_info* rli, const char *query_arg, - uint32 q_len_arg); #endif /* HAVE_REPLICATION */ #else void print_query_header(IO_CACHE* file, PRINT_EVENT_INFO* print_event_info); @@ -891,6 +1035,16 @@ public: */ virtual ulong get_post_header_size_for_derived() { return 0; } /* Writes derived event-specific part of post header. */ + +public: /* !!! Public in this patch to allow old usage */ +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) + virtual int do_apply_event(RELAY_LOG_INFO const *rli); + virtual int do_update_pos(RELAY_LOG_INFO *rli); + + int do_apply_event(RELAY_LOG_INFO const *rli, + const char *query_arg, + uint32 q_len_arg); +#endif /* HAVE_REPLICATION */ }; @@ -939,9 +1093,8 @@ public: uint16 master_port; #ifndef MYSQL_CLIENT - Slave_log_event(THD* thd_arg, struct st_relay_log_info* rli); + Slave_log_event(THD* thd_arg, RELAY_LOG_INFO* rli); void pack_info(Protocol* protocol); - int exec_event(struct st_relay_log_info* rli); #else void print(FILE* file, PRINT_EVENT_INFO* print_event_info); #endif @@ -954,6 +1107,11 @@ public: #ifndef MYSQL_CLIENT bool write(IO_CACHE* file); #endif + +private: +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) + virtual int do_apply_event(RELAY_LOG_INFO const* rli); +#endif }; #endif /* HAVE_REPLICATION */ @@ -1023,12 +1181,6 @@ public: const char* get_db() { return db; } #ifdef HAVE_REPLICATION void pack_info(Protocol* protocol); - int exec_event(struct st_relay_log_info* rli) - { - return exec_event(thd->slave_net,rli,0); - } - int exec_event(NET* net, struct st_relay_log_info* rli, - bool use_rli_only_for_errors); #endif /* HAVE_REPLICATION */ #else void print(FILE* file, PRINT_EVENT_INFO* print_event_info); @@ -1060,6 +1212,17 @@ public: + LOAD_HEADER_LEN + sql_ex.data_size() + field_block_len + num_fields); } + +public: /* !!! Public in this patch to allow old usage */ +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) + virtual int do_apply_event(RELAY_LOG_INFO const* rli) + { + return do_apply_event(thd->slave_net,rli,0); + } + + int do_apply_event(NET *net, RELAY_LOG_INFO const *rli, + bool use_rli_only_for_errors); +#endif }; extern char server_version[SERVER_VERSION_LENGTH]; @@ -1117,7 +1280,6 @@ public: Start_log_event_v3(); #ifdef HAVE_REPLICATION void pack_info(Protocol* protocol); - int exec_event(struct st_relay_log_info* rli); #endif /* HAVE_REPLICATION */ #else Start_log_event_v3() {} @@ -1137,6 +1299,22 @@ public: return START_V3_HEADER_LEN; //no variable-sized part } virtual bool is_artificial_event() { return artificial_event; } + +protected: +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) + virtual int do_apply_event(RELAY_LOG_INFO const *rli); + virtual enum_skip_reason do_shall_skip(RELAY_LOG_INFO*) + { + /* + Events from ourself should be skipped, but they should not + decrease the slave skip counter. + */ + if (this->server_id == ::server_id) + return Log_event::EVENT_SKIP_IGNORE; + else + return Log_event::EVENT_SKIP_NOT; + } +#endif }; @@ -1162,13 +1340,6 @@ public: uchar server_version_split[3]; Format_description_log_event(uint8 binlog_ver, const char* server_ver=0); - -#ifndef MYSQL_CLIENT -#ifdef HAVE_REPLICATION - int exec_event(struct st_relay_log_info* rli); -#endif /* HAVE_REPLICATION */ -#endif - Format_description_log_event(const char* buf, uint event_len, const Format_description_log_event* description_event); ~Format_description_log_event() { my_free((gptr)post_header_len, MYF(0)); } @@ -1191,7 +1362,15 @@ public: */ return FORMAT_DESCRIPTION_HEADER_LEN; } + void calc_server_version_split(); + +protected: +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) + virtual int do_apply_event(RELAY_LOG_INFO const *rli); + virtual int do_update_pos(RELAY_LOG_INFO *rli); + virtual enum_skip_reason do_shall_skip(RELAY_LOG_INFO *rli); +#endif }; @@ -1215,7 +1394,6 @@ public: {} #ifdef HAVE_REPLICATION void pack_info(Protocol* protocol); - int exec_event(struct st_relay_log_info* rli); #endif /* HAVE_REPLICATION */ #else void print(FILE* file, PRINT_EVENT_INFO* print_event_info); @@ -1230,6 +1408,13 @@ public: bool write(IO_CACHE* file); #endif bool is_valid() const { return 1; } + +private: +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) + virtual int do_apply_event(RELAY_LOG_INFO const *rli); + virtual int do_update_pos(RELAY_LOG_INFO *rli); + virtual enum_skip_reason do_shall_skip(RELAY_LOG_INFO *rli); +#endif }; @@ -1256,7 +1441,6 @@ class Rand_log_event: public Log_event {} #ifdef HAVE_REPLICATION void pack_info(Protocol* protocol); - int exec_event(struct st_relay_log_info* rli); #endif /* HAVE_REPLICATION */ #else void print(FILE* file, PRINT_EVENT_INFO* print_event_info); @@ -1270,6 +1454,13 @@ class Rand_log_event: public Log_event bool write(IO_CACHE* file); #endif bool is_valid() const { return 1; } + +private: +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) + virtual int do_apply_event(RELAY_LOG_INFO const *rli); + virtual int do_update_pos(RELAY_LOG_INFO *rli); + virtual enum_skip_reason do_shall_skip(RELAY_LOG_INFO *rli); +#endif }; /***************************************************************************** @@ -1293,7 +1484,6 @@ class Xid_log_event: public Log_event Xid_log_event(THD* thd_arg, my_xid x): Log_event(thd_arg,0,0), xid(x) {} #ifdef HAVE_REPLICATION void pack_info(Protocol* protocol); - int exec_event(struct st_relay_log_info* rli); #endif /* HAVE_REPLICATION */ #else void print(FILE* file, PRINT_EVENT_INFO* print_event_info); @@ -1307,6 +1497,11 @@ class Xid_log_event: public Log_event bool write(IO_CACHE* file); #endif bool is_valid() const { return 1; } + +private: +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) + virtual int do_apply_event(RELAY_LOG_INFO const *rli); +#endif }; /***************************************************************************** @@ -1336,7 +1531,6 @@ public: val_len(val_len_arg), type(type_arg), charset_number(charset_number_arg) { is_null= !val; } void pack_info(Protocol* protocol); - int exec_event(struct st_relay_log_info* rli); #else void print(FILE* file, PRINT_EVENT_INFO* print_event_info); #endif @@ -1348,6 +1542,13 @@ public: bool write(IO_CACHE* file); #endif bool is_valid() const { return 1; } + +private: +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) + virtual int do_apply_event(RELAY_LOG_INFO const *rli); + virtual int do_update_pos(RELAY_LOG_INFO *rli); + virtual enum_skip_reason do_shall_skip(RELAY_LOG_INFO *rli); +#endif }; @@ -1362,7 +1563,6 @@ public: #ifndef MYSQL_CLIENT Stop_log_event() :Log_event() {} - int exec_event(struct st_relay_log_info* rli); #else void print(FILE* file, PRINT_EVENT_INFO* print_event_info); #endif @@ -1373,6 +1573,22 @@ public: ~Stop_log_event() {} Log_event_type get_type_code() { return STOP_EVENT;} bool is_valid() const { return 1; } + +private: +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) + virtual int do_update_pos(RELAY_LOG_INFO *rli); + virtual enum_skip_reason do_shall_skip(RELAY_LOG_INFO *rli) + { + /* + Events from ourself should be skipped, but they should not + decrease the slave skip counter. + */ + if (this->server_id == ::server_id) + return Log_event::EVENT_SKIP_IGNORE; + else + return Log_event::EVENT_SKIP_NOT; + } +#endif }; /***************************************************************************** @@ -1399,7 +1615,6 @@ public: ulonglong pos_arg, uint flags); #ifdef HAVE_REPLICATION void pack_info(Protocol* protocol); - int exec_event(struct st_relay_log_info* rli); #endif /* HAVE_REPLICATION */ #else void print(FILE* file, PRINT_EVENT_INFO* print_event_info); @@ -1418,6 +1633,12 @@ public: #ifndef MYSQL_CLIENT bool write(IO_CACHE* file); #endif + +private: +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) + virtual int do_update_pos(RELAY_LOG_INFO *rli); + virtual enum_skip_reason do_shall_skip(RELAY_LOG_INFO *rli); +#endif }; @@ -1452,7 +1673,6 @@ public: bool using_trans); #ifdef HAVE_REPLICATION void pack_info(Protocol* protocol); - int exec_event(struct st_relay_log_info* rli); #endif /* HAVE_REPLICATION */ #else void print(FILE* file, PRINT_EVENT_INFO* print_event_info); @@ -1486,6 +1706,11 @@ public: */ bool write_base(IO_CACHE* file); #endif + +private: +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) + virtual int do_apply_event(RELAY_LOG_INFO const *rli); +#endif }; @@ -1518,7 +1743,6 @@ public: Append_block_log_event(THD* thd, const char* db_arg, char* block_arg, uint block_len_arg, bool using_trans); #ifdef HAVE_REPLICATION - int exec_event(struct st_relay_log_info* rli); void pack_info(Protocol* protocol); virtual int get_create_or_append() const; #endif /* HAVE_REPLICATION */ @@ -1536,6 +1760,11 @@ public: bool write(IO_CACHE* file); const char* get_db() { return db; } #endif + +private: +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) + virtual int do_apply_event(RELAY_LOG_INFO const *rli); +#endif }; @@ -1555,7 +1784,6 @@ public: Delete_file_log_event(THD* thd, const char* db_arg, bool using_trans); #ifdef HAVE_REPLICATION void pack_info(Protocol* protocol); - int exec_event(struct st_relay_log_info* rli); #endif /* HAVE_REPLICATION */ #else void print(FILE* file, PRINT_EVENT_INFO* print_event_info); @@ -1572,6 +1800,11 @@ public: bool write(IO_CACHE* file); const char* get_db() { return db; } #endif + +private: +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) + virtual int do_apply_event(RELAY_LOG_INFO const *rli); +#endif }; @@ -1591,7 +1824,6 @@ public: Execute_load_log_event(THD* thd, const char* db_arg, bool using_trans); #ifdef HAVE_REPLICATION void pack_info(Protocol* protocol); - int exec_event(struct st_relay_log_info* rli); #endif /* HAVE_REPLICATION */ #else void print(FILE* file, PRINT_EVENT_INFO* print_event_info); @@ -1607,6 +1839,11 @@ public: bool write(IO_CACHE* file); const char* get_db() { return db; } #endif + +private: +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) + virtual int do_apply_event(RELAY_LOG_INFO const *rli); +#endif }; @@ -1676,7 +1913,6 @@ public: bool using_trans, bool suppress_use); #ifdef HAVE_REPLICATION void pack_info(Protocol* protocol); - int exec_event(struct st_relay_log_info* rli); #endif /* HAVE_REPLICATION */ #else void print(FILE* file, PRINT_EVENT_INFO* print_event_info); @@ -1695,7 +1931,12 @@ public: #ifndef MYSQL_CLIENT bool write_post_header_for_derived(IO_CACHE* file); #endif - }; + +private: +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) + virtual int do_apply_event(RELAY_LOG_INFO const *rli); +#endif +}; #ifdef MYSQL_CLIENT @@ -1793,7 +2034,6 @@ public: #endif #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) - virtual int exec_event(struct st_relay_log_info *rli); virtual void pack_info(Protocol *protocol); #endif @@ -1803,6 +2043,11 @@ public: private: +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) + virtual int do_apply_event(RELAY_LOG_INFO const *rli); + virtual int do_update_pos(RELAY_LOG_INFO *rli); +#endif + #ifndef MYSQL_CLIENT TABLE *m_table; #endif @@ -1886,7 +2131,6 @@ public: flag_set get_flags(flag_set flags) const { return m_flags & flags; } #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) - virtual int exec_event(struct st_relay_log_info *rli); virtual void pack_info(Protocol *protocol); #endif @@ -1970,6 +2214,8 @@ protected: private: #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) + virtual int do_apply_event(RELAY_LOG_INFO const *rli); + /* Primitive to prepare for a sequence of row executions. @@ -2017,7 +2263,7 @@ private: RETURN VALUE Error code, if something went wrong, 0 otherwise. */ - virtual int do_prepare_row(THD*, RELAY_LOG_INFO*, TABLE*, + virtual int do_prepare_row(THD*, RELAY_LOG_INFO const*, TABLE*, char const *row_start, char const **row_end) = 0; /* @@ -2088,7 +2334,7 @@ private: virtual int do_before_row_operations(TABLE *table); virtual int do_after_row_operations(TABLE *table, int error); - virtual int do_prepare_row(THD*, RELAY_LOG_INFO*, TABLE*, + virtual int do_prepare_row(THD*, RELAY_LOG_INFO const*, TABLE*, char const *row_start, char const **row_end); virtual int do_exec_row(TABLE *table); #endif @@ -2153,7 +2399,7 @@ private: virtual int do_before_row_operations(TABLE *table); virtual int do_after_row_operations(TABLE *table, int error); - virtual int do_prepare_row(THD*, RELAY_LOG_INFO*, TABLE*, + virtual int do_prepare_row(THD*, RELAY_LOG_INFO const*, TABLE*, char const *row_start, char const **row_end); virtual int do_exec_row(TABLE *table); #endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */ @@ -2224,7 +2470,7 @@ private: virtual int do_before_row_operations(TABLE *table); virtual int do_after_row_operations(TABLE *table, int error); - virtual int do_prepare_row(THD*, RELAY_LOG_INFO*, TABLE*, + virtual int do_prepare_row(THD*, RELAY_LOG_INFO const*, TABLE*, char const *row_start, char const **row_end); virtual int do_exec_row(TABLE *table); #endif diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index 8a051195dba..16e13f049e3 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -29,14 +29,15 @@ int init_strvar_from_file(char *var, int max_size, IO_CACHE *f, st_relay_log_info::st_relay_log_info() - :no_storage(FALSE), info_fd(-1), cur_log_fd(-1), save_temporary_tables(0), + :no_storage(FALSE), replicate_same_server_id(::replicate_same_server_id), + info_fd(-1), cur_log_fd(-1), save_temporary_tables(0), cur_log_old_open_count(0), group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0), last_master_timestamp(0), slave_skip_counter(0), abort_pos_wait(0), slave_run_id(0), sql_thd(0), last_slave_errno(0), inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE), until_log_pos(0), retried_trans(0), tables_to_lock(0), tables_to_lock_count(0), - unsafe_to_stop_at(0) + last_event_start_time(0) { DBUG_ENTER("st_relay_log_info::st_relay_log_info"); @@ -1001,6 +1002,22 @@ bool st_relay_log_info::is_until_satisfied() log_pos= group_relay_log_pos; } +#ifndef DBUG_OFF + { + char buf[32]; + DBUG_PRINT("info", ("group_master_log_name='%s', group_master_log_pos=%s", + group_master_log_name, llstr(group_master_log_pos, buf))); + DBUG_PRINT("info", ("group_relay_log_name='%s', group_relay_log_pos=%s", + group_relay_log_name, llstr(group_relay_log_pos, buf))); + DBUG_PRINT("info", ("(%s) log_name='%s', log_pos=%s", + until_condition == UNTIL_MASTER_POS ? "master" : "relay", + log_name, llstr(log_pos, buf))); + DBUG_PRINT("info", ("(%s) until_log_name='%s', until_log_pos=%s", + until_condition == UNTIL_MASTER_POS ? "master" : "relay", + until_log_name, llstr(until_log_pos, buf))); + } +#endif + if (until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_UNKNOWN) { /* @@ -1056,30 +1073,19 @@ void st_relay_log_info::cached_charset_invalidate() } -bool st_relay_log_info::cached_charset_compare(char *charset) +bool st_relay_log_info::cached_charset_compare(char *charset) const { DBUG_ENTER("st_relay_log_info::cached_charset_compare"); if (bcmp(cached_charset, charset, sizeof(cached_charset))) { - memcpy(cached_charset, charset, sizeof(cached_charset)); + memcpy(const_cast<char*>(cached_charset), charset, sizeof(cached_charset)); DBUG_RETURN(1); } DBUG_RETURN(0); } -void st_relay_log_info::transaction_end(THD* thd) -{ - DBUG_ENTER("st_relay_log_info::transaction_end"); - - /* - Nothing to do here right now. - */ - - DBUG_VOID_RETURN; -} - #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) void st_relay_log_info::cleanup_context(THD *thd, bool error) { @@ -1106,7 +1112,7 @@ void st_relay_log_info::cleanup_context(THD *thd, bool error) m_table_map.clear_tables(); close_thread_tables(thd); clear_tables_to_lock(); - unsafe_to_stop_at= 0; + last_event_start_time= 0; DBUG_VOID_RETURN; } diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h index 45c9fb1cf96..3f06e108f6d 100644 --- a/sql/rpl_rli.h +++ b/sql/rpl_rli.h @@ -58,6 +58,15 @@ typedef struct st_relay_log_info */ bool no_storage; + /* + If true, events with the same server id should be replicated. This + field is set on creation of a relay log info structure by copying + the value of ::replicate_same_server_id and can be overridden if + necessary. For example of when this is done, check sql_binlog.cc, + where the BINLOG statement can be used to execute "raw" events. + */ + bool replicate_same_server_id; + /*** The following variables can only be read when protect by data lock ****/ /* @@ -292,14 +301,19 @@ typedef struct st_relay_log_info When the 6 bytes are equal to 0 is used to mean "cache is invalidated". */ void cached_charset_invalidate(); - bool cached_charset_compare(char *charset); - - void transaction_end(THD*); + bool cached_charset_compare(char *charset) const; void cleanup_context(THD *, bool); void clear_tables_to_lock(); - time_t unsafe_to_stop_at; + /* + Used by row-based replication to detect that it should not stop at + this event, but give it a chance to send more events. The time + where the last event inside a group started is stored here. If the + variable is zero, we are not in a group (but may be in a + transaction). + */ + time_t last_event_start_time; } RELAY_LOG_INFO; diff --git a/sql/rpl_utility.cc b/sql/rpl_utility.cc index 65a44a4947b..1d7cc808f0c 100644 --- a/sql/rpl_utility.cc +++ b/sql/rpl_utility.cc @@ -108,7 +108,7 @@ field_length_from_packed(enum_field_types const field_type, */ int -table_def::compatible_with(RELAY_LOG_INFO *rli, TABLE *table) +table_def::compatible_with(RELAY_LOG_INFO const *rli_arg, TABLE *table) const { /* @@ -116,6 +116,7 @@ table_def::compatible_with(RELAY_LOG_INFO *rli, TABLE *table) */ uint const cols_to_check= min(table->s->fields, size()); int error= 0; + RELAY_LOG_INFO const *rli= const_cast<RELAY_LOG_INFO*>(rli_arg); TABLE_SHARE const *const tsh= table->s; diff --git a/sql/rpl_utility.h b/sql/rpl_utility.h index b1aa642619c..17879a9ecfc 100644 --- a/sql/rpl_utility.h +++ b/sql/rpl_utility.h @@ -117,7 +117,7 @@ public: @retval 1 if the table definition is not compatible with @c table @retval 0 if the table definition is compatible with @c table */ - int compatible_with(RELAY_LOG_INFO *rli, TABLE *table) const; + int compatible_with(RELAY_LOG_INFO const *rli, TABLE *table) const; private: my_size_t m_size; // Number of elements in the types array diff --git a/sql/slave.cc b/sql/slave.cc index 6f62f74647a..278edae99f4 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -519,11 +519,11 @@ static bool sql_slave_killed(THD* thd, RELAY_LOG_INFO* rli) really one minute of idleness, we don't timeout if the slave SQL thread is actively working. */ - if (!rli->unsafe_to_stop_at) + if (rli->last_event_start_time == 0) DBUG_RETURN(1); DBUG_PRINT("info", ("Slave SQL thread is in an unsafe situation, giving " "it some grace period")); - if (difftime(time(0), rli->unsafe_to_stop_at) > 60) + if (difftime(time(0), rli->last_event_start_time) > 60) { slave_print_msg(ERROR_LEVEL, rli, 0, "SQL thread had to stop in an unsafe situation, in " @@ -557,7 +557,7 @@ static bool sql_slave_killed(THD* thd, RELAY_LOG_INFO* rli) void */ -void slave_print_msg(enum loglevel level, RELAY_LOG_INFO* rli, +void slave_print_msg(enum loglevel level, RELAY_LOG_INFO const *rli, int err_code, const char* msg, ...) { void (*report_function)(const char *, ...); @@ -579,9 +579,9 @@ void slave_print_msg(enum loglevel level, RELAY_LOG_INFO* rli, It's an error, it must be reported in Last_error and Last_errno in SHOW SLAVE STATUS. */ - pbuff= rli->last_slave_error; + pbuff= const_cast<RELAY_LOG_INFO*>(rli)->last_slave_error; pbuffsize= sizeof(rli->last_slave_error); - rli->last_slave_errno = err_code; + const_cast<RELAY_LOG_INFO*>(rli)->last_slave_errno = err_code; report_function= sql_print_error; break; case WARNING_LEVEL: @@ -813,7 +813,7 @@ do not trust column Seconds_Behind_Master of SHOW SLAVE STATUS"); { if ((master_row= mysql_fetch_row(master_res)) && (::server_id == strtoul(master_row[1], 0, 10)) && - !replicate_same_server_id) + !mi->rli.replicate_same_server_id) errmsg= "The slave I/O thread stops because master and slave have equal \ MySQL server ids; these ids must be different for replication to work (or \ the --replicate-same-server-id option must be used on slave but this does \ @@ -1390,7 +1390,7 @@ void set_slave_thread_options(THD* thd) DBUG_VOID_RETURN; } -void set_slave_thread_default_charset(THD* thd, RELAY_LOG_INFO *rli) +void set_slave_thread_default_charset(THD* thd, RELAY_LOG_INFO const *rli) { DBUG_ENTER("set_slave_thread_default_charset"); @@ -1401,7 +1401,14 @@ void set_slave_thread_default_charset(THD* thd, RELAY_LOG_INFO *rli) thd->variables.collation_server= global_system_variables.collation_server; thd->update_charset(); - rli->cached_charset_invalidate(); + + /* + We use a const cast here since the conceptual (and externally + visible) behavior of the function is to set the default charset of + the thread. That the cache has to be invalidated is a secondary + effect. + */ + const_cast<RELAY_LOG_INFO*>(rli)->cached_charset_invalidate(); DBUG_VOID_RETURN; } @@ -1609,7 +1616,8 @@ static ulong read_event(MYSQL* mysql, MASTER_INFO *mi, bool* suppress_warnings) } -int check_expected_error(THD* thd, RELAY_LOG_INFO* rli, int expected_error) +int check_expected_error(THD* thd, RELAY_LOG_INFO const *rli, + int expected_error) { DBUG_ENTER("check_expected_error"); @@ -1715,77 +1723,42 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli) if (ev) { int type_code = ev->get_type_code(); - int exec_res; + int exec_res= 0; /* - Queries originating from this server must be skipped. - Low-level events (Format_desc, Rotate, Stop) from this server - must also be skipped. But for those we don't want to modify - group_master_log_pos, because these events did not exist on the master. - Format_desc is not completely skipped. - Skip queries specified by the user in slave_skip_counter. - We can't however skip events that has something to do with the - log files themselves. - Filtering on own server id is extremely important, to ignore execution of - events created by the creation/rotation of the relay log (remember that - now the relay log starts with its Format_desc, has a Rotate etc). */ - DBUG_PRINT("info",("type_code=%d, server_id=%d",type_code,ev->server_id)); + DBUG_PRINT("info",("type_code=%d (%s), server_id=%d", + type_code, ev->get_type_str(), ev->server_id)); + DBUG_PRINT("info", ("thd->options={ %s%s}", + FLAGSTR(thd->options, OPTION_NOT_AUTOCOMMIT), + FLAGSTR(thd->options, OPTION_BEGIN))); - if ((ev->server_id == (uint32) ::server_id && - !replicate_same_server_id && - type_code != FORMAT_DESCRIPTION_EVENT) || - (rli->slave_skip_counter && - type_code != ROTATE_EVENT && type_code != STOP_EVENT && - type_code != START_EVENT_V3 && type_code!= FORMAT_DESCRIPTION_EVENT)) - { - DBUG_PRINT("info", ("event skipped")); - /* - We only skip the event here and do not increase the group log - position. In the event that we have to restart, this means - that we might have to skip the event again, but that is a - minor issue. - - If we were to increase the group log position when skipping an - event, it might be that we are restarting at the wrong - position and have events before that we should have executed, - so not increasing the group log position is a sure bet in this - case. - - In this way, we just step the group log position when we - *know* that we are at the end of a group. - */ - rli->inc_event_relay_log_pos(); - /* - Protect against common user error of setting the counter to 1 - instead of 2 while recovering from an insert which used auto_increment, - rand or user var. - */ - if (rli->slave_skip_counter && - !((type_code == INTVAR_EVENT || - type_code == RAND_EVENT || - type_code == USER_VAR_EVENT) && - rli->slave_skip_counter == 1) && - /* - The events from ourselves which have something to do with the relay - log itself must be skipped, true, but they mustn't decrement - rli->slave_skip_counter, because the user is supposed to not see - these events (they are not in the master's binlog) and if we - decremented, START SLAVE would for example decrement when it sees - the Rotate, so the event which the user probably wanted to skip - would not be skipped. - */ - !(ev->server_id == (uint32) ::server_id && - (type_code == ROTATE_EVENT || type_code == STOP_EVENT || - type_code == START_EVENT_V3 || type_code == FORMAT_DESCRIPTION_EVENT))) - --rli->slave_skip_counter; - pthread_mutex_unlock(&rli->data_lock); - delete ev; - DBUG_RETURN(0); // avoid infinite update loops - } - pthread_mutex_unlock(&rli->data_lock); + + /* + Execute the event to change the database and update the binary + log coordinates, but first we set some data that is needed for + the thread. + + The event will be executed unless it is supposed to be skipped. + + Queries originating from this server must be skipped. Low-level + events (Format_description_log_event, Rotate_log_event, + Stop_log_event) from this server must also be skipped. But for + those we don't want to modify 'group_master_log_pos', because + these events did not exist on the master. + Format_description_log_event is not completely skipped. + + Skip queries specified by the user in 'slave_skip_counter'. We + can't however skip events that has something to do with the log + files themselves. + + Filtering on own server id is extremely important, to ignore + execution of events created by the creation/rotation of the relay + log (remember that now the relay log starts with its Format_desc, + has a Rotate etc). + */ thd->server_id = ev->server_id; // use the original server id for logging thd->set_time(); // time the query @@ -1793,13 +1766,63 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli) if (!ev->when) ev->when = time(NULL); ev->thd = thd; // because up to this point, ev->thd == 0 - DBUG_PRINT("info", ("thd->options={ %s%s}", - FLAGSTR(thd->options, OPTION_NOT_AUTOCOMMIT), - FLAGSTR(thd->options, OPTION_BEGIN))); - exec_res = ev->exec_event(rli); - DBUG_PRINT("info", ("exec_event result: %d", exec_res)); - DBUG_ASSERT(rli->sql_thd==thd); + int reason= ev->shall_skip(rli); + if (reason == Log_event::EVENT_SKIP_COUNT) + --rli->slave_skip_counter; + pthread_mutex_unlock(&rli->data_lock); + if (reason == Log_event::EVENT_SKIP_NOT) + exec_res= ev->apply_event(rli); +#ifndef DBUG_OFF + else + { + /* + This only prints information to the debug trace. + + TODO: Print an informational message to the error log? + */ + static const char *const explain[] = { + "event was not skipped", // EVENT_SKIP_NOT, + "event originated from this server", // EVENT_SKIP_IGNORE, + "event skip counter was non-zero" // EVENT_SKIP_COUNT + }; + DBUG_PRINT("info", ("%s was skipped because %s", + ev->get_type_str(), explain[reason])); + } +#endif + + DBUG_PRINT("info", ("apply_event error = %d", exec_res)); + if (exec_res == 0) + { + int error= ev->update_pos(rli); + char buf[22]; + DBUG_PRINT("info", ("update_pos error = %d", error)); + DBUG_PRINT("info", ("group %s %s", + llstr(rli->group_relay_log_pos, buf), + rli->group_relay_log_name)); + DBUG_PRINT("info", ("event %s %s", + llstr(rli->event_relay_log_pos, buf), + rli->event_relay_log_name)); + /* + The update should not fail, so print an error message and + return an error code. + + TODO: Replace this with a decent error message when merged + with BUG#24954 (which adds several new error message). + */ + if (error) + { + slave_print_msg(ERROR_LEVEL, rli, ER_UNKNOWN_ERROR, + "It was not possible to update the positions" + " of the relay log information: the slave may" + " be in an inconsistent state." + " Stopped in %s position %s", + rli->group_relay_log_name, + llstr(rli->group_relay_log_pos, buf)); + DBUG_RETURN(1); + } + } + /* Format_description_log_event should not be deleted because it will be used to read info about the relay log's format; it will be deleted when @@ -2366,13 +2389,17 @@ Slave SQL thread aborted. Can't execute init_slave query"); THD_CHECK_SENTRY(thd); if (exec_relay_log_event(thd,rli)) { + DBUG_PRINT("info", ("exec_relay_log_event() failed")); // do not scare the user if SQL thread was simply killed or stopped if (!sql_slave_killed(thd,rli)) { /* - retrieve as much info as possible from the thd and, error codes and warnings - and print this to the error log as to allow the user to locate the error + retrieve as much info as possible from the thd and, error + codes and warnings and print this to the error log as to + allow the user to locate the error */ + DBUG_PRINT("info", ("thd->net.last_errno=%d; rli->last_slave_errno=%d", + thd->net.last_errno, rli->last_slave_errno)); if (thd->net.last_errno != 0) { if (rli->last_slave_errno == 0) @@ -2699,6 +2726,7 @@ static int queue_binlog_ver_1_event(MASTER_INFO *mi, const char *buf, my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR)); DBUG_RETURN(1); } + pthread_mutex_lock(&mi->data_lock); ev->log_pos= mi->master_log_pos; /* 3.23 events don't contain log_pos */ switch (ev->get_type_code()) { @@ -2962,7 +2990,7 @@ int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len) pthread_mutex_lock(log_lock); if ((uint4korr(buf + SERVER_ID_OFFSET) == ::server_id) && - !replicate_same_server_id) + !mi->rli.replicate_same_server_id) { /* Do not write it to the relay log. diff --git a/sql/slave.h b/sql/slave.h index f21266bbee4..107b74c09dd 100644 --- a/sql/slave.h +++ b/sql/slave.h @@ -162,9 +162,9 @@ bool show_binlog_info(THD* thd); bool rpl_master_has_bug(RELAY_LOG_INFO *rli, uint bug_id); const char *print_slave_db_safe(const char *db); -int check_expected_error(THD* thd, RELAY_LOG_INFO* rli, int error_code); +int check_expected_error(THD* thd, RELAY_LOG_INFO const *rli, int error_code); void skip_load_data_infile(NET* net); -void slave_print_msg(enum loglevel level, RELAY_LOG_INFO* rli, +void slave_print_msg(enum loglevel level, RELAY_LOG_INFO const *rli, int err_code, const char* msg, ...) ATTRIBUTE_FORMAT(printf, 4, 5); @@ -182,7 +182,7 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,ulonglong pos, int purge_relay_logs(RELAY_LOG_INFO* rli, THD *thd, bool just_reset, const char** errmsg); void set_slave_thread_options(THD* thd); -void set_slave_thread_default_charset(THD* thd, RELAY_LOG_INFO *rli); +void set_slave_thread_default_charset(THD *thd, RELAY_LOG_INFO const *rli); void rotate_relay_log(MASTER_INFO* mi); pthread_handler_t handle_slave_io(void *arg); diff --git a/sql/sql_binlog.cc b/sql/sql_binlog.cc index b0a54bec664..6f7bbda96de 100644 --- a/sql/sql_binlog.cc +++ b/sql/sql_binlog.cc @@ -163,9 +163,17 @@ void mysql_client_binlog_statement(THD* thd) (ulong) uint4korr(bufptr+EVENT_LEN_OFFSET))); #endif ev->thd= thd; - if (IF_DBUG(int err= ) ev->exec_event(thd->rli_fake)) + /* + We go directly to the application phase, since we don't need + to check if the event shall be skipped or not. + + Neither do we have to update the log positions, since that is + not used at all: the rli_fake instance is used only for error + reporting. + */ + if (IF_DBUG(int err= ) ev->apply_event(thd->rli_fake)) { - DBUG_PRINT("error", ("exec_event() returned: %d", err)); + DBUG_PRINT("info", ("apply_event() returned: %d", err)); /* TODO: Maybe a better error message since the BINLOG statement now contains several events. |