summaryrefslogtreecommitdiff
path: root/sql/log_event_server.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/log_event_server.cc')
-rw-r--r--sql/log_event_server.cc449
1 files changed, 394 insertions, 55 deletions
diff --git a/sql/log_event_server.cc b/sql/log_event_server.cc
index e22ad48489c..8d74f11fdef 100644
--- a/sql/log_event_server.cc
+++ b/sql/log_event_server.cc
@@ -52,6 +52,7 @@
#include "compat56.h"
#include "wsrep_mysqld.h"
#include "sql_insert.h"
+#include "sql_table.h"
#include <my_bitmap.h>
#include "rpl_utility.h"
@@ -139,7 +140,7 @@ static const char *HA_ERR(int i)
deadlocks; such errors are handled automatically by rolling back re-trying
the transactions, so should not pollute the error log.
*/
-static bool
+bool
is_parallel_retry_error(rpl_group_info *rgi, int err)
{
if (!rgi->is_parallel_exec)
@@ -1304,11 +1305,25 @@ bool Query_log_event::write()
start+= 8;
}
+ if (gtid_flags_extra)
+ {
+ *start++= Q_GTID_FLAGS3;
+ *start++= gtid_flags_extra;
+ if (gtid_flags_extra &
+ (Gtid_log_event::FL_COMMIT_ALTER_E1 |
+ Gtid_log_event::FL_ROLLBACK_ALTER_E1))
+ {
+ int8store(start, sa_seq_no);
+ start+= 8;
+ }
+ }
+
+
/*
NOTE: When adding new status vars, please don't forget to update
the MAX_SIZE_LOG_EVENT_STATUS in log_event.h and update the function
code_name() in this file.
-
+
Here there could be code like
if (command-line-option-which-says-"log_this_variable" && inited)
{
@@ -1416,7 +1431,9 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
lc_time_names_number(thd_arg->variables.lc_time_names->number),
charset_database_number(0),
table_map_for_update((ulonglong)thd_arg->table_map_for_update),
- master_data_written(0)
+ master_data_written(0),
+ gtid_flags_extra(thd_arg->get_binlog_flags_for_alter()),
+ sa_seq_no(0)
{
/* status_vars_len is set just before writing the event */
@@ -1552,11 +1569,15 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
use_cache= trx_cache= TRUE;
break;
default:
- use_cache= sqlcom_can_generate_row_events(thd);
+ use_cache= (gtid_flags_extra) ? false : sqlcom_can_generate_row_events(thd);
break;
}
}
+ if (gtid_flags_extra & (Gtid_log_event::FL_COMMIT_ALTER_E1 |
+ Gtid_log_event::FL_ROLLBACK_ALTER_E1))
+ sa_seq_no= thd_arg->get_binlog_start_alter_seq_no();
+
if (!use_cache || direct)
{
cache_type= Log_event::EVENT_NO_CACHE;
@@ -1628,6 +1649,223 @@ bool test_if_equal_repl_errors(int expected_error, int actual_error)
}
+static start_alter_info *get_new_start_alter_info(THD *thd)
+{
+ /*
+ Why on global memory ?- So that process_commit/rollback_alter should not get
+ error when spawned threads exits too early.
+ */
+ start_alter_info *info;
+ if (!(info= (start_alter_info *)my_malloc(PSI_INSTRUMENT_ME,
+ sizeof(start_alter_info), MYF(MY_WME))))
+ {
+ sql_print_error("Failed to allocate memory for ddl log free list");
+ return 0;
+ }
+ info->sa_seq_no= 0;
+ info->domain_id= 0;
+ info->direct_commit_alter= false;
+ info->state= start_alter_state::INVALID;
+ mysql_cond_init(0, &info->start_alter_cond, NULL);
+ info->error= 0;
+
+ return info;
+}
+
+
+/*
+ Perform necessary actions for two-phase-logged ALTER parts, to
+ return
+
+ 0 when the event's query proceeds normal parsing and execution
+ 1 when the event skips parsing and execution
+ -1 as error.
+*/
+int Query_log_event::handle_split_alter_query_log_event(rpl_group_info *rgi,
+ bool &skip_error_check)
+{
+ int rc= 0;
+
+ rgi->gtid_ev_flags_extra= gtid_flags_extra;
+ if (gtid_flags_extra & Gtid_log_event::FL_START_ALTER_E1)
+ {
+ //No Slave, Normal Slave, Start Alter under Worker 1 will simple binlog and exit
+ if(!rgi->rpt || rgi->reserved_start_alter_thread || WSREP(thd))
+ {
+ rc= 1;
+ /*
+ We will just write the binlog and move to next event , because COMMIT
+ Alter will take care of actual work
+ */
+ rgi->reserved_start_alter_thread= false;
+ thd->lex->sql_command= SQLCOM_ALTER_TABLE;
+ Write_log_with_flags wlwf(thd, Gtid_log_event::FL_START_ALTER_E1,
+ true /* wsrep to isolation end */);
+#ifdef WITH_WSREP
+ if (WSREP(thd) && wsrep_thd_is_local(thd) &&
+ // no need to supply other than db in this case
+ wsrep_to_isolation_begin(thd, db, NULL,NULL,NULL,NULL,NULL))
+ return -1;
+#endif
+ if (write_bin_log(thd, false, thd->query(), thd->query_length()))
+ return -1;
+
+ my_ok(thd);
+ return rc;
+ }
+ if (!rgi->sa_info)
+ rgi->sa_info= get_new_start_alter_info(thd);
+ else
+ {
+ /* Not send Start-Alter into query execution when it's to rollback */
+ mysql_mutex_lock(&rgi->rli->mi->start_alter_lock);
+ if (rgi->sa_info->state == start_alter_state::ROLLBACK_ALTER)
+ mysql_cond_broadcast(&rgi->sa_info->start_alter_cond);
+ mysql_mutex_unlock(&rgi->rli->mi->start_alter_lock);
+ }
+
+ return rc;
+ }
+
+ bool is_CA= (gtid_flags_extra & Gtid_log_event::FL_COMMIT_ALTER_E1) ? true : false;
+ if (is_CA)
+ {
+ DBUG_EXECUTE_IF("rpl_slave_stop_CA_before_binlog",
+ {
+ // the awake comes from STOP-SLAVE running driver (sql) thread
+ debug_sync_set_action(thd,
+ STRING_WITH_LEN("now WAIT_FOR proceed_CA_1"));
+ });
+ }
+ start_alter_info *info=NULL;
+ Master_info *mi= NULL;
+
+ rgi->gtid_ev_sa_seq_no= sa_seq_no;
+ // is set for both the direct execution and the write to binlog
+ thd->set_binlog_start_alter_seq_no(sa_seq_no);
+ mi= rgi->rli->mi;
+ mysql_mutex_lock(&mi->start_alter_list_lock);
+ {
+ List_iterator<start_alter_info> info_iterator(mi->start_alter_list);
+ while ((info= info_iterator++))
+ {
+ if(info->sa_seq_no == rgi->gtid_ev_sa_seq_no &&
+ info->domain_id == rgi->current_gtid.domain_id)
+ {
+ info_iterator.remove();
+ break;
+ }
+ }
+ }
+ mysql_mutex_unlock(&mi->start_alter_list_lock);
+
+ if (!info)
+ {
+ if (is_CA)
+ {
+ /*
+ error handeling, direct_commit_alter is turned on, so that we dont
+ wait for master reply in mysql_alter_table (in wait_for_master)
+ */
+ rgi->direct_commit_alter= true;
+#ifdef WITH_WSREP
+ if (WSREP(thd))
+ thd->set_binlog_flags_for_alter(Gtid_log_event::FL_COMMIT_ALTER_E1);
+#endif
+ goto cleanup;
+ }
+ else
+ {
+ //Just write the binlog because there is nothing to be done
+ goto write_binlog;
+ }
+ }
+
+ mysql_mutex_lock(&mi->start_alter_lock);
+ if (info->state != start_alter_state::COMPLETED)
+ {
+ if (is_CA)
+ info->state= start_alter_state::COMMIT_ALTER;
+ else
+ info->state= start_alter_state::ROLLBACK_ALTER;
+ mysql_cond_broadcast(&info->start_alter_cond);
+ mysql_mutex_unlock(&mi->start_alter_lock);
+ /*
+ Wait till Start Alter worker has changed the state to ::COMPLETED
+ when start alter worker reaches the old code write_bin_log(), it will
+ change state to COMMITTED.
+ COMMITTED and `direct_commit_alter == true` at the same time indicates
+ the query needs re-execution by the CA running thread.
+ */
+ mysql_mutex_lock(&mi->start_alter_lock);
+
+ DBUG_ASSERT(info->state == start_alter_state::COMPLETED ||
+ !info->direct_commit_alter);
+
+ while(info->state != start_alter_state::COMPLETED)
+ mysql_cond_wait(&info->start_alter_cond, &mi->start_alter_lock);
+ }
+ else
+ {
+ // SA has completed and left being kicked out by deadlock or ftwrl
+ DBUG_ASSERT(info->direct_commit_alter);
+ }
+ mysql_mutex_unlock(&mi->start_alter_lock);
+
+ if (info->direct_commit_alter)
+ {
+ rgi->direct_commit_alter= true; // execute the query as if there was no SA
+ if (is_CA)
+ goto cleanup;
+ }
+
+write_binlog:
+ rc= 1;
+
+ if(!is_CA)
+ {
+ if(((info && info->error) || error_code) &&
+ global_system_variables.log_warnings > 2)
+ {
+ sql_print_information("Query '%s' having %d error code on master "
+ "is rolled back%s", query, error_code,
+ !(info && info->error) ? "." : ";");
+ if (info && info->error)
+ sql_print_information("its execution on slave %sproduced %d error.",
+ info->error == error_code ? "re":"", info->error);
+ }
+ }
+ {
+ thd->lex->sql_command= SQLCOM_ALTER_TABLE;
+ Write_log_with_flags wlwf(thd, is_CA ? Gtid_log_event::FL_COMMIT_ALTER_E1 :
+ Gtid_log_event::FL_ROLLBACK_ALTER_E1,
+ true);
+#ifdef WITH_WSREP
+ if (WSREP(thd) && wsrep_thd_is_local(thd) &&
+ wsrep_to_isolation_begin(thd, db, NULL,NULL,NULL,NULL,NULL))
+ rc= -1;
+#endif
+ if (rc != -1 &&
+ write_bin_log(thd, false, thd->query(), thd->query_length()))
+ rc= -1;
+ }
+
+ if (!thd->is_error())
+ {
+ skip_error_check= true;
+ my_ok(thd);
+ }
+
+cleanup:
+ if (info)
+ {
+ mysql_cond_destroy(&info->start_alter_cond);
+ my_free(info);
+ }
+ return rc;
+}
+
+
/**
@todo
Compare the values of "affected rows" around here. Something
@@ -1656,6 +1894,7 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi,
Relay_log_info const *rli= rgi->rli;
Rpl_filter *rpl_filter= rli->mi->rpl_filter;
bool current_stmt_is_commit;
+ bool skip_error_check= false;
DBUG_ENTER("Query_log_event::do_apply_event");
/*
@@ -1666,6 +1905,7 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi,
you.
*/
thd->catalog= catalog_len ? (char *) catalog : (char *)"";
+ rgi->start_alter_ev= this;
size_t valid_len= Well_formed_prefix(system_charset_info,
db, db_len, NAME_LEN).length();
@@ -1710,13 +1950,15 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi,
*/
if (is_trans_keyword() || rpl_filter->db_ok(thd->db.str))
{
+ bool is_rb_alter= gtid_flags_extra & Gtid_log_event::FL_ROLLBACK_ALTER_E1;
+
thd->set_time(when, when_sec_part);
thd->set_query_and_id((char*)query_arg, q_len_arg,
thd->charset(), next_query_id());
thd->variables.pseudo_thread_id= thread_id; // for temp tables
DBUG_PRINT("query",("%s", thd->query()));
- if (unlikely(!(expected_error= error_code)) ||
+ if (unlikely(!(expected_error= !is_rb_alter ? error_code : 0)) ||
ignored_error_code(expected_error) ||
!unexpected_error_code(expected_error))
{
@@ -1747,7 +1989,7 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi,
if (charset_inited)
{
rpl_sql_thread_info *sql_info= thd->system_thread_info.rpl_sql_info;
- if (sql_info->cached_charset_compare(charset))
+ if (thd->slave_thread && sql_info->cached_charset_compare(charset))
{
/* Verify that we support the charsets found in the event. */
if (!(thd->variables.character_set_client=
@@ -1885,47 +2127,69 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi,
thd->variables.option_bits|= OPTION_MASTER_SQL_ERROR;
thd->variables.option_bits&= ~OPTION_GTID_BEGIN;
}
- /* Execute the query (note that we bypass dispatch_command()) */
- Parser_state parser_state;
- if (!parser_state.init(thd, thd->query(), thd->query_length()))
+
+ int sa_result= 0;
+ bool is_2p_alter= gtid_flags_extra &
+ (Gtid_log_event::FL_START_ALTER_E1 |
+ Gtid_log_event::FL_COMMIT_ALTER_E1 |
+ Gtid_log_event::FL_ROLLBACK_ALTER_E1);
+ if (is_2p_alter)
+ sa_result= handle_split_alter_query_log_event(rgi, skip_error_check);
+ if (sa_result == 0)
{
- DBUG_ASSERT(thd->m_digest == NULL);
- thd->m_digest= & thd->m_digest_state;
- DBUG_ASSERT(thd->m_statement_psi == NULL);
- thd->m_statement_psi= MYSQL_START_STATEMENT(&thd->m_statement_state,
- stmt_info_rpl.m_key,
- thd->db.str, thd->db.length,
- thd->charset(), NULL);
- THD_STAGE_INFO(thd, stage_starting);
- MYSQL_SET_STATEMENT_TEXT(thd->m_statement_psi, thd->query(), thd->query_length());
- if (thd->m_digest != NULL)
- thd->m_digest->reset(thd->m_token_array, max_digest_length);
-
- if (thd->slave_thread)
- {
- /*
- To be compatible with previous releases, the slave thread uses the global
- log_slow_disabled_statements value, wich can be changed dynamically, so we
- have to set the sql_log_slow respectively.
- */
- thd->variables.sql_log_slow= !MY_TEST(global_system_variables.log_slow_disabled_statements & LOG_SLOW_DISABLE_SLAVE);
- }
-
- mysql_parse(thd, thd->query(), thd->query_length(), &parser_state);
- /* Finalize server status flags after executing a statement. */
- thd->update_server_status();
- log_slow_statement(thd);
- thd->lex->restore_set_statement_var();
+ /* Execute the query (note that we bypass dispatch_command()) */
+ Parser_state parser_state;
+ if (!parser_state.init(thd, thd->query(), thd->query_length()))
+ {
+ DBUG_ASSERT(thd->m_digest == NULL);
+ thd->m_digest= & thd->m_digest_state;
+ DBUG_ASSERT(thd->m_statement_psi == NULL);
+ thd->m_statement_psi= MYSQL_START_STATEMENT(&thd->m_statement_state,
+ stmt_info_rpl.m_key,
+ thd->db.str, thd->db.length,
+ thd->charset(), NULL);
+ THD_STAGE_INFO(thd, stage_starting);
+ MYSQL_SET_STATEMENT_TEXT(thd->m_statement_psi, thd->query(), thd->query_length());
+ if (thd->m_digest != NULL)
+ thd->m_digest->reset(thd->m_token_array, max_digest_length);
+
+ if (thd->slave_thread)
+ {
+ /*
+ To be compatible with previous releases, the slave thread uses the global
+ log_slow_disabled_statements value, wich can be changed dynamically, so we
+ have to set the sql_log_slow respectively.
+ */
+ thd->variables.sql_log_slow= !MY_TEST(global_system_variables.log_slow_disabled_statements & LOG_SLOW_DISABLE_SLAVE);
+ }
+ mysql_parse(thd, thd->query(), thd->query_length(), &parser_state);
+ /* Finalize server status flags after executing a statement. */
+ thd->update_server_status();
+ log_slow_statement(thd);
+ thd->lex->restore_set_statement_var();
- /*
- When THD::slave_expected_error gets reset inside execution stack
- that is the case of to be ignored event. In this case the expected
- error must change to the reset value as well.
- */
- expected_error= thd->slave_expected_error;
+ /*
+ When THD::slave_expected_error gets reset inside execution stack
+ that is the case of to be ignored event. In this case the expected
+ error must change to the reset value as well.
+ */
+ expected_error= thd->slave_expected_error;
+ }
+ }
+ else if (sa_result == -1)
+ {
+ rli->report(ERROR_LEVEL, expected_error, rgi->gtid_info(),
+ "TODO start alter error");
+ thd->is_slave_error= 1;
+ goto end;
}
-
thd->variables.option_bits&= ~OPTION_MASTER_SQL_ERROR;
+ if (is_2p_alter && !rgi->is_parallel_exec)
+ {
+ rgi->gtid_ev_flags_extra= 0;
+ rgi->direct_commit_alter= 0;
+ rgi->gtid_ev_sa_seq_no= 0;
+ }
}
else
{
@@ -1988,7 +2252,8 @@ compare_errors:
If we expected a non-zero error code, and we don't get the same error
code, and it should be ignored or is related to a concurrency issue.
*/
- actual_error= thd->is_error() ? thd->get_stmt_da()->sql_errno() : 0;
+ actual_error= thd->is_error() ? thd->get_stmt_da()->sql_errno() :
+ skip_error_check? expected_error : 0;
DBUG_PRINT("info",("expected_error: %d sql_errno: %d",
expected_error, actual_error));
@@ -2384,6 +2649,39 @@ bool Format_description_log_event::write()
}
#if defined(HAVE_REPLICATION)
+/*
+ Auxiliary function to conduct cleanup of unfinished two-phase logged ALTERs.
+*/
+static void check_and_remove_stale_alter(Relay_log_info *rli)
+{
+ Master_info *mi= rli->mi;
+ start_alter_info *info=NULL;
+
+ mysql_mutex_lock(&mi->start_alter_list_lock);
+ List_iterator<start_alter_info> info_iterator(mi->start_alter_list);
+ while ((info= info_iterator++))
+ {
+ DBUG_ASSERT(info->state == start_alter_state::REGISTERED);
+
+ sql_print_warning("ALTER query started at %u-%u-%llu could not "
+ "be completed because of unexpected master server "
+ "or its binlog change", info->sa_seq_no, // todo:gtid
+ 0, 0);
+ info_iterator.remove();
+ mysql_mutex_lock(&mi->start_alter_lock);
+ info->state= start_alter_state::ROLLBACK_ALTER;
+ mysql_mutex_unlock(&mi->start_alter_lock);
+ mysql_cond_broadcast(&info->start_alter_cond);
+ mysql_mutex_lock(&mi->start_alter_lock);
+ while(info->state != start_alter_state::COMPLETED)
+ mysql_cond_wait(&info->start_alter_cond, &mi->start_alter_lock);
+ mysql_mutex_unlock(&mi->start_alter_lock);
+ mysql_cond_destroy(&info->start_alter_cond);
+ my_free(info);
+ }
+ mysql_mutex_unlock(&mi->start_alter_list_lock);
+}
+
int Format_description_log_event::do_apply_event(rpl_group_info *rgi)
{
int ret= 0;
@@ -2401,16 +2699,21 @@ int Format_description_log_event::do_apply_event(rpl_group_info *rgi)
original place when it comes to us; we'll know this by checking
log_pos ("artificial" events have log_pos == 0).
*/
- if (!thd->rli_fake &&
- !is_artificial_event() && created && thd->transaction->all.ha_list)
+ if (!is_artificial_event() && created && !thd->rli_fake && !thd->rgi_fake)
{
- /* This is not an error (XA is safe), just an information */
- rli->report(INFORMATION_LEVEL, 0, NULL,
- "Rolling back unfinished transaction (no COMMIT "
- "or ROLLBACK in relay log). A probable cause is that "
- "the master died while writing the transaction to "
- "its binary log, thus rolled back too.");
- rgi->cleanup_context(thd, 1);
+ // check_and_remove stale Start Alter:s
+ if (flags & LOG_EVENT_BINLOG_IN_USE_F)
+ check_and_remove_stale_alter(rli);
+ if (thd->transaction->all.ha_list)
+ {
+ /* This is not an error (XA is safe), just an information */
+ rli->report(INFORMATION_LEVEL, 0, NULL,
+ "Rolling back unfinished transaction (no COMMIT "
+ "or ROLLBACK in relay log). A probable cause is that "
+ "the master died while writing the transaction to "
+ "its binary log, thus rolled back too.");
+ rgi->cleanup_context(thd, 1);
+ }
}
/*
@@ -3330,7 +3633,14 @@ Gtid_log_event::Gtid_log_event(THD *thd_arg, uint64 seq_no_arg,
extra_engines= count > 1 ? 0 : UCHAR_MAX;
}
if (extra_engines > 0)
- flags_extra|= FL_EXTRA_MULTI_ENGINE;
+ flags_extra|= FL_EXTRA_MULTI_ENGINE_E1;
+ }
+ if (thd->get_binlog_flags_for_alter())
+ {
+ flags_extra |= thd->get_binlog_flags_for_alter();
+ if (flags_extra & (FL_COMMIT_ALTER_E1 | FL_ROLLBACK_ALTER_E1))
+ sa_seq_no= thd->get_binlog_start_alter_seq_no();
+ flags2|= FL_DDL;
}
}
@@ -3403,12 +3713,18 @@ Gtid_log_event::write()
buf[write_len]= flags_extra;
write_len++;
}
- if (flags_extra & FL_EXTRA_MULTI_ENGINE)
+ if (flags_extra & FL_EXTRA_MULTI_ENGINE_E1)
{
buf[write_len]= extra_engines;
write_len++;
}
+ if (flags_extra & (FL_COMMIT_ALTER_E1 | FL_ROLLBACK_ALTER_E1))
+ {
+ int8store(buf + write_len, sa_seq_no);
+ write_len+= 8;
+ }
+
if (write_len < GTID_HEADER_LEN)
{
bzero(buf+write_len, GTID_HEADER_LEN-write_len);
@@ -3472,6 +3788,20 @@ Gtid_log_event::pack_info(Protocol *protocol)
p= strmov(p, " cid=");
p= longlong10_to_str(commit_id, p, 10);
}
+ if (flags_extra & FL_START_ALTER_E1)
+ {
+ p= strmov(p, " START ALTER");
+ }
+ if (flags_extra & FL_COMMIT_ALTER_E1)
+ {
+ p= strmov(p, " COMMIT ALTER id=");
+ p= longlong10_to_str(sa_seq_no, p, 10);
+ }
+ if (flags_extra & FL_ROLLBACK_ALTER_E1)
+ {
+ p= strmov(p, " ROLLBACK ALTER id=");
+ p= longlong10_to_str(sa_seq_no, p, 10);
+ }
protocol->store(buf, p-buf, &my_charset_bin);
}
@@ -3486,6 +3816,9 @@ Gtid_log_event::do_apply_event(rpl_group_info *rgi)
thd->variables.gtid_domain_id= this->domain_id;
thd->variables.gtid_seq_no= this->seq_no;
rgi->gtid_ev_flags2= flags2;
+
+ rgi->gtid_ev_flags_extra= flags_extra;
+ rgi->gtid_ev_sa_seq_no= sa_seq_no;
thd->reset_for_next_command();
if (opt_gtid_strict_mode && opt_bin_log && opt_log_slave_updates)
@@ -3756,6 +4089,12 @@ Gtid_list_log_event::pack_info(Protocol *protocol)
uint32 i;
bool first;
+ /*
+ For output consistency and ease of reading, we sort the GTID list in
+ ascending order
+ */
+ qsort(list, count, sizeof(rpl_gtid), compare_glle_gtids);
+
buf.length(0);
buf.append(STRING_WITH_LEN("["));
first= true;