diff options
Diffstat (limited to 'sql/log_event_server.cc')
-rw-r--r-- | sql/log_event_server.cc | 449 |
1 files changed, 394 insertions, 55 deletions
diff --git a/sql/log_event_server.cc b/sql/log_event_server.cc index e22ad48489c..8d74f11fdef 100644 --- a/sql/log_event_server.cc +++ b/sql/log_event_server.cc @@ -52,6 +52,7 @@ #include "compat56.h" #include "wsrep_mysqld.h" #include "sql_insert.h" +#include "sql_table.h" #include <my_bitmap.h> #include "rpl_utility.h" @@ -139,7 +140,7 @@ static const char *HA_ERR(int i) deadlocks; such errors are handled automatically by rolling back re-trying the transactions, so should not pollute the error log. */ -static bool +bool is_parallel_retry_error(rpl_group_info *rgi, int err) { if (!rgi->is_parallel_exec) @@ -1304,11 +1305,25 @@ bool Query_log_event::write() start+= 8; } + if (gtid_flags_extra) + { + *start++= Q_GTID_FLAGS3; + *start++= gtid_flags_extra; + if (gtid_flags_extra & + (Gtid_log_event::FL_COMMIT_ALTER_E1 | + Gtid_log_event::FL_ROLLBACK_ALTER_E1)) + { + int8store(start, sa_seq_no); + start+= 8; + } + } + + /* NOTE: When adding new status vars, please don't forget to update the MAX_SIZE_LOG_EVENT_STATUS in log_event.h and update the function code_name() in this file. - + Here there could be code like if (command-line-option-which-says-"log_this_variable" && inited) { @@ -1416,7 +1431,9 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg, lc_time_names_number(thd_arg->variables.lc_time_names->number), charset_database_number(0), table_map_for_update((ulonglong)thd_arg->table_map_for_update), - master_data_written(0) + master_data_written(0), + gtid_flags_extra(thd_arg->get_binlog_flags_for_alter()), + sa_seq_no(0) { /* status_vars_len is set just before writing the event */ @@ -1552,11 +1569,15 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg, use_cache= trx_cache= TRUE; break; default: - use_cache= sqlcom_can_generate_row_events(thd); + use_cache= (gtid_flags_extra) ? false : sqlcom_can_generate_row_events(thd); break; } } + if (gtid_flags_extra & (Gtid_log_event::FL_COMMIT_ALTER_E1 | + Gtid_log_event::FL_ROLLBACK_ALTER_E1)) + sa_seq_no= thd_arg->get_binlog_start_alter_seq_no(); + if (!use_cache || direct) { cache_type= Log_event::EVENT_NO_CACHE; @@ -1628,6 +1649,223 @@ bool test_if_equal_repl_errors(int expected_error, int actual_error) } +static start_alter_info *get_new_start_alter_info(THD *thd) +{ + /* + Why on global memory ?- So that process_commit/rollback_alter should not get + error when spawned threads exits too early. + */ + start_alter_info *info; + if (!(info= (start_alter_info *)my_malloc(PSI_INSTRUMENT_ME, + sizeof(start_alter_info), MYF(MY_WME)))) + { + sql_print_error("Failed to allocate memory for ddl log free list"); + return 0; + } + info->sa_seq_no= 0; + info->domain_id= 0; + info->direct_commit_alter= false; + info->state= start_alter_state::INVALID; + mysql_cond_init(0, &info->start_alter_cond, NULL); + info->error= 0; + + return info; +} + + +/* + Perform necessary actions for two-phase-logged ALTER parts, to + return + + 0 when the event's query proceeds normal parsing and execution + 1 when the event skips parsing and execution + -1 as error. +*/ +int Query_log_event::handle_split_alter_query_log_event(rpl_group_info *rgi, + bool &skip_error_check) +{ + int rc= 0; + + rgi->gtid_ev_flags_extra= gtid_flags_extra; + if (gtid_flags_extra & Gtid_log_event::FL_START_ALTER_E1) + { + //No Slave, Normal Slave, Start Alter under Worker 1 will simple binlog and exit + if(!rgi->rpt || rgi->reserved_start_alter_thread || WSREP(thd)) + { + rc= 1; + /* + We will just write the binlog and move to next event , because COMMIT + Alter will take care of actual work + */ + rgi->reserved_start_alter_thread= false; + thd->lex->sql_command= SQLCOM_ALTER_TABLE; + Write_log_with_flags wlwf(thd, Gtid_log_event::FL_START_ALTER_E1, + true /* wsrep to isolation end */); +#ifdef WITH_WSREP + if (WSREP(thd) && wsrep_thd_is_local(thd) && + // no need to supply other than db in this case + wsrep_to_isolation_begin(thd, db, NULL,NULL,NULL,NULL,NULL)) + return -1; +#endif + if (write_bin_log(thd, false, thd->query(), thd->query_length())) + return -1; + + my_ok(thd); + return rc; + } + if (!rgi->sa_info) + rgi->sa_info= get_new_start_alter_info(thd); + else + { + /* Not send Start-Alter into query execution when it's to rollback */ + mysql_mutex_lock(&rgi->rli->mi->start_alter_lock); + if (rgi->sa_info->state == start_alter_state::ROLLBACK_ALTER) + mysql_cond_broadcast(&rgi->sa_info->start_alter_cond); + mysql_mutex_unlock(&rgi->rli->mi->start_alter_lock); + } + + return rc; + } + + bool is_CA= (gtid_flags_extra & Gtid_log_event::FL_COMMIT_ALTER_E1) ? true : false; + if (is_CA) + { + DBUG_EXECUTE_IF("rpl_slave_stop_CA_before_binlog", + { + // the awake comes from STOP-SLAVE running driver (sql) thread + debug_sync_set_action(thd, + STRING_WITH_LEN("now WAIT_FOR proceed_CA_1")); + }); + } + start_alter_info *info=NULL; + Master_info *mi= NULL; + + rgi->gtid_ev_sa_seq_no= sa_seq_no; + // is set for both the direct execution and the write to binlog + thd->set_binlog_start_alter_seq_no(sa_seq_no); + mi= rgi->rli->mi; + mysql_mutex_lock(&mi->start_alter_list_lock); + { + List_iterator<start_alter_info> info_iterator(mi->start_alter_list); + while ((info= info_iterator++)) + { + if(info->sa_seq_no == rgi->gtid_ev_sa_seq_no && + info->domain_id == rgi->current_gtid.domain_id) + { + info_iterator.remove(); + break; + } + } + } + mysql_mutex_unlock(&mi->start_alter_list_lock); + + if (!info) + { + if (is_CA) + { + /* + error handeling, direct_commit_alter is turned on, so that we dont + wait for master reply in mysql_alter_table (in wait_for_master) + */ + rgi->direct_commit_alter= true; +#ifdef WITH_WSREP + if (WSREP(thd)) + thd->set_binlog_flags_for_alter(Gtid_log_event::FL_COMMIT_ALTER_E1); +#endif + goto cleanup; + } + else + { + //Just write the binlog because there is nothing to be done + goto write_binlog; + } + } + + mysql_mutex_lock(&mi->start_alter_lock); + if (info->state != start_alter_state::COMPLETED) + { + if (is_CA) + info->state= start_alter_state::COMMIT_ALTER; + else + info->state= start_alter_state::ROLLBACK_ALTER; + mysql_cond_broadcast(&info->start_alter_cond); + mysql_mutex_unlock(&mi->start_alter_lock); + /* + Wait till Start Alter worker has changed the state to ::COMPLETED + when start alter worker reaches the old code write_bin_log(), it will + change state to COMMITTED. + COMMITTED and `direct_commit_alter == true` at the same time indicates + the query needs re-execution by the CA running thread. + */ + mysql_mutex_lock(&mi->start_alter_lock); + + DBUG_ASSERT(info->state == start_alter_state::COMPLETED || + !info->direct_commit_alter); + + while(info->state != start_alter_state::COMPLETED) + mysql_cond_wait(&info->start_alter_cond, &mi->start_alter_lock); + } + else + { + // SA has completed and left being kicked out by deadlock or ftwrl + DBUG_ASSERT(info->direct_commit_alter); + } + mysql_mutex_unlock(&mi->start_alter_lock); + + if (info->direct_commit_alter) + { + rgi->direct_commit_alter= true; // execute the query as if there was no SA + if (is_CA) + goto cleanup; + } + +write_binlog: + rc= 1; + + if(!is_CA) + { + if(((info && info->error) || error_code) && + global_system_variables.log_warnings > 2) + { + sql_print_information("Query '%s' having %d error code on master " + "is rolled back%s", query, error_code, + !(info && info->error) ? "." : ";"); + if (info && info->error) + sql_print_information("its execution on slave %sproduced %d error.", + info->error == error_code ? "re":"", info->error); + } + } + { + thd->lex->sql_command= SQLCOM_ALTER_TABLE; + Write_log_with_flags wlwf(thd, is_CA ? Gtid_log_event::FL_COMMIT_ALTER_E1 : + Gtid_log_event::FL_ROLLBACK_ALTER_E1, + true); +#ifdef WITH_WSREP + if (WSREP(thd) && wsrep_thd_is_local(thd) && + wsrep_to_isolation_begin(thd, db, NULL,NULL,NULL,NULL,NULL)) + rc= -1; +#endif + if (rc != -1 && + write_bin_log(thd, false, thd->query(), thd->query_length())) + rc= -1; + } + + if (!thd->is_error()) + { + skip_error_check= true; + my_ok(thd); + } + +cleanup: + if (info) + { + mysql_cond_destroy(&info->start_alter_cond); + my_free(info); + } + return rc; +} + + /** @todo Compare the values of "affected rows" around here. Something @@ -1656,6 +1894,7 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi, Relay_log_info const *rli= rgi->rli; Rpl_filter *rpl_filter= rli->mi->rpl_filter; bool current_stmt_is_commit; + bool skip_error_check= false; DBUG_ENTER("Query_log_event::do_apply_event"); /* @@ -1666,6 +1905,7 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi, you. */ thd->catalog= catalog_len ? (char *) catalog : (char *)""; + rgi->start_alter_ev= this; size_t valid_len= Well_formed_prefix(system_charset_info, db, db_len, NAME_LEN).length(); @@ -1710,13 +1950,15 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi, */ if (is_trans_keyword() || rpl_filter->db_ok(thd->db.str)) { + bool is_rb_alter= gtid_flags_extra & Gtid_log_event::FL_ROLLBACK_ALTER_E1; + thd->set_time(when, when_sec_part); thd->set_query_and_id((char*)query_arg, q_len_arg, thd->charset(), next_query_id()); thd->variables.pseudo_thread_id= thread_id; // for temp tables DBUG_PRINT("query",("%s", thd->query())); - if (unlikely(!(expected_error= error_code)) || + if (unlikely(!(expected_error= !is_rb_alter ? error_code : 0)) || ignored_error_code(expected_error) || !unexpected_error_code(expected_error)) { @@ -1747,7 +1989,7 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi, if (charset_inited) { rpl_sql_thread_info *sql_info= thd->system_thread_info.rpl_sql_info; - if (sql_info->cached_charset_compare(charset)) + if (thd->slave_thread && sql_info->cached_charset_compare(charset)) { /* Verify that we support the charsets found in the event. */ if (!(thd->variables.character_set_client= @@ -1885,47 +2127,69 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi, thd->variables.option_bits|= OPTION_MASTER_SQL_ERROR; thd->variables.option_bits&= ~OPTION_GTID_BEGIN; } - /* Execute the query (note that we bypass dispatch_command()) */ - Parser_state parser_state; - if (!parser_state.init(thd, thd->query(), thd->query_length())) + + int sa_result= 0; + bool is_2p_alter= gtid_flags_extra & + (Gtid_log_event::FL_START_ALTER_E1 | + Gtid_log_event::FL_COMMIT_ALTER_E1 | + Gtid_log_event::FL_ROLLBACK_ALTER_E1); + if (is_2p_alter) + sa_result= handle_split_alter_query_log_event(rgi, skip_error_check); + if (sa_result == 0) { - DBUG_ASSERT(thd->m_digest == NULL); - thd->m_digest= & thd->m_digest_state; - DBUG_ASSERT(thd->m_statement_psi == NULL); - thd->m_statement_psi= MYSQL_START_STATEMENT(&thd->m_statement_state, - stmt_info_rpl.m_key, - thd->db.str, thd->db.length, - thd->charset(), NULL); - THD_STAGE_INFO(thd, stage_starting); - MYSQL_SET_STATEMENT_TEXT(thd->m_statement_psi, thd->query(), thd->query_length()); - if (thd->m_digest != NULL) - thd->m_digest->reset(thd->m_token_array, max_digest_length); - - if (thd->slave_thread) - { - /* - To be compatible with previous releases, the slave thread uses the global - log_slow_disabled_statements value, wich can be changed dynamically, so we - have to set the sql_log_slow respectively. - */ - thd->variables.sql_log_slow= !MY_TEST(global_system_variables.log_slow_disabled_statements & LOG_SLOW_DISABLE_SLAVE); - } - - mysql_parse(thd, thd->query(), thd->query_length(), &parser_state); - /* Finalize server status flags after executing a statement. */ - thd->update_server_status(); - log_slow_statement(thd); - thd->lex->restore_set_statement_var(); + /* Execute the query (note that we bypass dispatch_command()) */ + Parser_state parser_state; + if (!parser_state.init(thd, thd->query(), thd->query_length())) + { + DBUG_ASSERT(thd->m_digest == NULL); + thd->m_digest= & thd->m_digest_state; + DBUG_ASSERT(thd->m_statement_psi == NULL); + thd->m_statement_psi= MYSQL_START_STATEMENT(&thd->m_statement_state, + stmt_info_rpl.m_key, + thd->db.str, thd->db.length, + thd->charset(), NULL); + THD_STAGE_INFO(thd, stage_starting); + MYSQL_SET_STATEMENT_TEXT(thd->m_statement_psi, thd->query(), thd->query_length()); + if (thd->m_digest != NULL) + thd->m_digest->reset(thd->m_token_array, max_digest_length); + + if (thd->slave_thread) + { + /* + To be compatible with previous releases, the slave thread uses the global + log_slow_disabled_statements value, wich can be changed dynamically, so we + have to set the sql_log_slow respectively. + */ + thd->variables.sql_log_slow= !MY_TEST(global_system_variables.log_slow_disabled_statements & LOG_SLOW_DISABLE_SLAVE); + } + mysql_parse(thd, thd->query(), thd->query_length(), &parser_state); + /* Finalize server status flags after executing a statement. */ + thd->update_server_status(); + log_slow_statement(thd); + thd->lex->restore_set_statement_var(); - /* - When THD::slave_expected_error gets reset inside execution stack - that is the case of to be ignored event. In this case the expected - error must change to the reset value as well. - */ - expected_error= thd->slave_expected_error; + /* + When THD::slave_expected_error gets reset inside execution stack + that is the case of to be ignored event. In this case the expected + error must change to the reset value as well. + */ + expected_error= thd->slave_expected_error; + } + } + else if (sa_result == -1) + { + rli->report(ERROR_LEVEL, expected_error, rgi->gtid_info(), + "TODO start alter error"); + thd->is_slave_error= 1; + goto end; } - thd->variables.option_bits&= ~OPTION_MASTER_SQL_ERROR; + if (is_2p_alter && !rgi->is_parallel_exec) + { + rgi->gtid_ev_flags_extra= 0; + rgi->direct_commit_alter= 0; + rgi->gtid_ev_sa_seq_no= 0; + } } else { @@ -1988,7 +2252,8 @@ compare_errors: If we expected a non-zero error code, and we don't get the same error code, and it should be ignored or is related to a concurrency issue. */ - actual_error= thd->is_error() ? thd->get_stmt_da()->sql_errno() : 0; + actual_error= thd->is_error() ? thd->get_stmt_da()->sql_errno() : + skip_error_check? expected_error : 0; DBUG_PRINT("info",("expected_error: %d sql_errno: %d", expected_error, actual_error)); @@ -2384,6 +2649,39 @@ bool Format_description_log_event::write() } #if defined(HAVE_REPLICATION) +/* + Auxiliary function to conduct cleanup of unfinished two-phase logged ALTERs. +*/ +static void check_and_remove_stale_alter(Relay_log_info *rli) +{ + Master_info *mi= rli->mi; + start_alter_info *info=NULL; + + mysql_mutex_lock(&mi->start_alter_list_lock); + List_iterator<start_alter_info> info_iterator(mi->start_alter_list); + while ((info= info_iterator++)) + { + DBUG_ASSERT(info->state == start_alter_state::REGISTERED); + + sql_print_warning("ALTER query started at %u-%u-%llu could not " + "be completed because of unexpected master server " + "or its binlog change", info->sa_seq_no, // todo:gtid + 0, 0); + info_iterator.remove(); + mysql_mutex_lock(&mi->start_alter_lock); + info->state= start_alter_state::ROLLBACK_ALTER; + mysql_mutex_unlock(&mi->start_alter_lock); + mysql_cond_broadcast(&info->start_alter_cond); + mysql_mutex_lock(&mi->start_alter_lock); + while(info->state != start_alter_state::COMPLETED) + mysql_cond_wait(&info->start_alter_cond, &mi->start_alter_lock); + mysql_mutex_unlock(&mi->start_alter_lock); + mysql_cond_destroy(&info->start_alter_cond); + my_free(info); + } + mysql_mutex_unlock(&mi->start_alter_list_lock); +} + int Format_description_log_event::do_apply_event(rpl_group_info *rgi) { int ret= 0; @@ -2401,16 +2699,21 @@ int Format_description_log_event::do_apply_event(rpl_group_info *rgi) original place when it comes to us; we'll know this by checking log_pos ("artificial" events have log_pos == 0). */ - if (!thd->rli_fake && - !is_artificial_event() && created && thd->transaction->all.ha_list) + if (!is_artificial_event() && created && !thd->rli_fake && !thd->rgi_fake) { - /* This is not an error (XA is safe), just an information */ - rli->report(INFORMATION_LEVEL, 0, NULL, - "Rolling back unfinished transaction (no COMMIT " - "or ROLLBACK in relay log). A probable cause is that " - "the master died while writing the transaction to " - "its binary log, thus rolled back too."); - rgi->cleanup_context(thd, 1); + // check_and_remove stale Start Alter:s + if (flags & LOG_EVENT_BINLOG_IN_USE_F) + check_and_remove_stale_alter(rli); + if (thd->transaction->all.ha_list) + { + /* This is not an error (XA is safe), just an information */ + rli->report(INFORMATION_LEVEL, 0, NULL, + "Rolling back unfinished transaction (no COMMIT " + "or ROLLBACK in relay log). A probable cause is that " + "the master died while writing the transaction to " + "its binary log, thus rolled back too."); + rgi->cleanup_context(thd, 1); + } } /* @@ -3330,7 +3633,14 @@ Gtid_log_event::Gtid_log_event(THD *thd_arg, uint64 seq_no_arg, extra_engines= count > 1 ? 0 : UCHAR_MAX; } if (extra_engines > 0) - flags_extra|= FL_EXTRA_MULTI_ENGINE; + flags_extra|= FL_EXTRA_MULTI_ENGINE_E1; + } + if (thd->get_binlog_flags_for_alter()) + { + flags_extra |= thd->get_binlog_flags_for_alter(); + if (flags_extra & (FL_COMMIT_ALTER_E1 | FL_ROLLBACK_ALTER_E1)) + sa_seq_no= thd->get_binlog_start_alter_seq_no(); + flags2|= FL_DDL; } } @@ -3403,12 +3713,18 @@ Gtid_log_event::write() buf[write_len]= flags_extra; write_len++; } - if (flags_extra & FL_EXTRA_MULTI_ENGINE) + if (flags_extra & FL_EXTRA_MULTI_ENGINE_E1) { buf[write_len]= extra_engines; write_len++; } + if (flags_extra & (FL_COMMIT_ALTER_E1 | FL_ROLLBACK_ALTER_E1)) + { + int8store(buf + write_len, sa_seq_no); + write_len+= 8; + } + if (write_len < GTID_HEADER_LEN) { bzero(buf+write_len, GTID_HEADER_LEN-write_len); @@ -3472,6 +3788,20 @@ Gtid_log_event::pack_info(Protocol *protocol) p= strmov(p, " cid="); p= longlong10_to_str(commit_id, p, 10); } + if (flags_extra & FL_START_ALTER_E1) + { + p= strmov(p, " START ALTER"); + } + if (flags_extra & FL_COMMIT_ALTER_E1) + { + p= strmov(p, " COMMIT ALTER id="); + p= longlong10_to_str(sa_seq_no, p, 10); + } + if (flags_extra & FL_ROLLBACK_ALTER_E1) + { + p= strmov(p, " ROLLBACK ALTER id="); + p= longlong10_to_str(sa_seq_no, p, 10); + } protocol->store(buf, p-buf, &my_charset_bin); } @@ -3486,6 +3816,9 @@ Gtid_log_event::do_apply_event(rpl_group_info *rgi) thd->variables.gtid_domain_id= this->domain_id; thd->variables.gtid_seq_no= this->seq_no; rgi->gtid_ev_flags2= flags2; + + rgi->gtid_ev_flags_extra= flags_extra; + rgi->gtid_ev_sa_seq_no= sa_seq_no; thd->reset_for_next_command(); if (opt_gtid_strict_mode && opt_bin_log && opt_log_slave_updates) @@ -3756,6 +4089,12 @@ Gtid_list_log_event::pack_info(Protocol *protocol) uint32 i; bool first; + /* + For output consistency and ease of reading, we sort the GTID list in + ascending order + */ + qsort(list, count, sizeof(rpl_gtid), compare_glle_gtids); + buf.length(0); buf.append(STRING_WITH_LEN("[")); first= true; |