summaryrefslogtreecommitdiff
path: root/sql/log_event_server.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/log_event_server.cc')
-rw-r--r--sql/log_event_server.cc200
1 files changed, 137 insertions, 63 deletions
diff --git a/sql/log_event_server.cc b/sql/log_event_server.cc
index 3910d910da1..6c71381a1fb 100644
--- a/sql/log_event_server.cc
+++ b/sql/log_event_server.cc
@@ -153,6 +153,30 @@ is_parallel_retry_error(rpl_group_info *rgi, int err)
return has_temporary_error(rgi->thd);
}
+/**
+ Accumulate a Diagnostics_area's errors and warnings into an output buffer
+
+ @param errbuf The output buffer to write error messages
+ @param errbuf_size The size of the output buffer
+ @param da The Diagnostics_area to check for errors
+*/
+static void inline aggregate_da_errors(char *errbuf, size_t errbuf_size,
+ Diagnostics_area *da)
+{
+ const char *errbuf_end= errbuf + errbuf_size;
+ char *slider;
+ Diagnostics_area::Sql_condition_iterator it= da->sql_conditions();
+ const Sql_condition *err;
+ size_t len;
+ for (err= it++, slider= errbuf; err && slider < errbuf_end - 1;
+ slider += len, err= it++)
+ {
+ len= my_snprintf(slider, errbuf_end - slider,
+ " %s, Error_code: %d;", err->get_message_text(),
+ err->get_sql_errno());
+ }
+}
+
/**
Error reporting facility for Rows_log_event::do_apply_event
@@ -173,13 +197,8 @@ static void inline slave_rows_error_report(enum loglevel level, int ha_error,
const char *log_name, my_off_t pos)
{
const char *handler_error= (ha_error ? HA_ERR(ha_error) : NULL);
- char buff[MAX_SLAVE_ERRMSG], *slider;
- const char *buff_end= buff + sizeof(buff);
- size_t len;
- Diagnostics_area::Sql_condition_iterator it=
- thd->get_stmt_da()->sql_conditions();
+ char buff[MAX_SLAVE_ERRMSG];
Relay_log_info const *rli= rgi->rli;
- const Sql_condition *err;
buff[0]= 0;
int errcode= thd->is_error() ? thd->get_stmt_da()->sql_errno() : 0;
@@ -192,13 +211,7 @@ static void inline slave_rows_error_report(enum loglevel level, int ha_error,
if (is_parallel_retry_error(rgi, errcode))
return;
- for (err= it++, slider= buff; err && slider < buff_end - 1;
- slider += len, err= it++)
- {
- len= my_snprintf(slider, buff_end - slider,
- " %s, Error_code: %d;", err->get_message_text(),
- err->get_sql_errno());
- }
+ aggregate_da_errors(buff, sizeof(buff), thd->get_stmt_da());
if (ha_error != 0 && !thd->killed)
rli->report(level, errcode, rgi->gtid_info(),
@@ -3571,7 +3584,8 @@ bool slave_execute_deferred_events(THD *thd)
#if defined(HAVE_REPLICATION)
int Xid_apply_log_event::do_record_gtid(THD *thd, rpl_group_info *rgi,
- bool in_trans, void **out_hton)
+ bool in_trans, void **out_hton,
+ bool force_err)
{
int err= 0;
Relay_log_info const *rli= rgi->rli;
@@ -3586,14 +3600,26 @@ int Xid_apply_log_event::do_record_gtid(THD *thd, rpl_group_info *rgi,
int ec= thd->get_stmt_da()->sql_errno();
/*
Do not report an error if this is really a kill due to a deadlock.
- In this case, the transaction will be re-tried instead.
+ In this case, the transaction will be re-tried instead. Unless force_err
+ is set, as in the case of XA PREPARE, as the GTID state is updated as a
+ separate transaction, and if that fails, we should not retry but exit in
+ error immediately.
*/
- if (!is_parallel_retry_error(rgi, ec))
+ if (!is_parallel_retry_error(rgi, ec) || force_err)
+ {
+ char buff[MAX_SLAVE_ERRMSG];
+ buff[0]= 0;
+ aggregate_da_errors(buff, sizeof(buff), thd->get_stmt_da());
+
+ if (force_err)
+ thd->clear_error();
+
rli->report(ERROR_LEVEL, ER_CANNOT_UPDATE_GTID_STATE, rgi->gtid_info(),
"Error during XID COMMIT: failed to update GTID state in "
- "%s.%s: %d: %s",
+ "%s.%s: %d: %s the event's master log %s, end_log_pos %llu",
"mysql", rpl_gtid_slave_state_table_name.str, ec,
- thd->get_stmt_da()->message());
+ buff, RPL_LOG_NAME, log_pos);
+ }
thd->is_slave_error= 1;
}
@@ -3667,7 +3693,7 @@ int Xid_apply_log_event::do_apply_event(rpl_group_info *rgi)
{
DBUG_ASSERT(!thd->transaction->xid_state.is_explicit_XA());
- if ((err= do_record_gtid(thd, rgi, false, &hton)))
+ if ((err= do_record_gtid(thd, rgi, false, &hton, true)))
return err;
}
@@ -4988,7 +5014,8 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
to avoid query cache being polluted with stale entries,
*/
# ifdef WITH_WSREP
- if (!WSREP(thd) && !wsrep_thd_is_applying(thd))
+ /* Query cache is not invalidated on wsrep applier here */
+ if (!(WSREP(thd) && wsrep_thd_is_applying(thd)))
# endif /* WITH_WSREP */
query_cache.invalidate_locked_for_write(thd, rgi->tables_to_lock);
#endif /* HAVE_QUERY_CACHE */
@@ -6871,8 +6898,18 @@ Rows_log_event::write_row(rpl_group_info *rgi,
int Rows_log_event::update_sequence()
{
TABLE *table= m_table; // pointer to event's table
+ bool old_master= false;
+ int err= 0;
- if (!bitmap_is_set(table->rpl_write_set, MIN_VALUE_FIELD_NO))
+ if (!bitmap_is_set(table->rpl_write_set, MIN_VALUE_FIELD_NO) ||
+ (
+#if defined(WITH_WSREP)
+ ! WSREP(thd) &&
+#endif
+ !(table->in_use->rgi_slave->gtid_ev_flags2 & Gtid_log_event::FL_DDL) &&
+ !(old_master=
+ rpl_master_has_bug(thd->rgi_slave->rli,
+ 29621, FALSE, FALSE, FALSE, TRUE))))
{
/* This event come from a setval function executed on the master.
Update the sequence next_number and round, like we do with setval()
@@ -6885,12 +6922,27 @@ int Rows_log_event::update_sequence()
return table->s->sequence->set_value(table, nextval, round, 0) > 0;
}
-
+ if (old_master && !WSREP(thd) && thd->rgi_slave->is_parallel_exec)
+ {
+ DBUG_ASSERT(thd->rgi_slave->parallel_entry);
+ /*
+ With parallel replication enabled, we can't execute alongside any other
+ transaction in which we may depend, so we force retry to release
+ the server layer table lock for possible prior in binlog order
+ same table transactions.
+ */
+ if (thd->rgi_slave->parallel_entry->last_committed_sub_id <
+ thd->rgi_slave->wait_commit_sub_id)
+ {
+ err= ER_LOCK_DEADLOCK;
+ my_error(err, MYF(0));
+ }
+ }
/*
Update all fields in table and update the active sequence, like with
ALTER SEQUENCE
*/
- return table->file->ha_write_row(table->record[0]);
+ return err == 0 ? table->file->ha_write_row(table->record[0]) : err;
}
@@ -6906,19 +6958,21 @@ Write_rows_log_event::do_exec_row(rpl_group_info *rgi)
const char *tmp= thd->get_proc_info();
LEX_CSTRING tmp_db= thd->db;
char *message, msg[128];
- const char *table_name= m_table->s->table_name.str;
- char quote_char= get_quote_char_for_identifier(thd, STRING_WITH_LEN(table_name));
- my_snprintf(msg, sizeof(msg),"Write_rows_log_event::write_row() on table %c%s%c",
- quote_char, table_name, quote_char);
+ const LEX_CSTRING &table_name= m_table->s->table_name;
+ const char quote_char=
+ get_quote_char_for_identifier(thd, table_name.str, table_name.length);
+ my_snprintf(msg, sizeof msg,
+ "Write_rows_log_event::write_row() on table %c%.*s%c",
+ quote_char, int(table_name.length), table_name.str, quote_char);
thd->reset_db(&m_table->s->db);
message= msg;
int error;
#ifdef WSREP_PROC_INFO
my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
- "Write_rows_log_event::write_row(%lld) on table %c%s%c",
- (long long) wsrep_thd_trx_seqno(thd), quote_char, table_name,
- quote_char);
+ "Write_rows_log_event::write_row(%lld) on table %c%.*s%c",
+ (long long) wsrep_thd_trx_seqno(thd), quote_char,
+ int(table_name.length), table_name.str, quote_char);
message= thd->wsrep_info;
#endif /* WSREP_PROC_INFO */
@@ -6957,7 +7011,7 @@ uint8 Write_rows_log_event::get_trg_event_map()
Returns TRUE if different.
*/
-static bool record_compare(TABLE *table)
+static bool record_compare(TABLE *table, bool vers_from_plain= false)
{
bool result= FALSE;
/**
@@ -6990,10 +7044,19 @@ static bool record_compare(TABLE *table)
/* Compare fields */
for (Field **ptr=table->field ; *ptr ; ptr++)
{
- if (table->versioned() && (*ptr)->vers_sys_field())
- {
+ /*
+ If the table is versioned, don't compare using the version if there is a
+ primary key. If there isn't a primary key, we need the version to
+ identify the correct record if there are duplicate rows in the data set.
+ However, if the primary server is unversioned (vers_from_plain is true),
+ then we implicitly use row_end as the primary key on our side. This is
+ because the implicit row_end value will be set to the maximum value for
+ the latest row update (which is what we care about).
+ */
+ if (table->versioned() && (*ptr)->vers_sys_field() &&
+ (table->s->primary_key < MAX_KEY ||
+ (vers_from_plain && table->vers_start_field() == (*ptr))))
continue;
- }
/**
We only compare field contents that are not null.
NULL fields (i.e., their null bits) were compared
@@ -7387,7 +7450,7 @@ int Rows_log_event::find_row(rpl_group_info *rgi)
/* We use this to test that the correct key is used in test cases. */
DBUG_EXECUTE_IF("slave_crash_if_index_scan", abort(););
- while (record_compare(table))
+ while (record_compare(table, m_vers_from_plain))
{
while ((error= table->file->ha_index_next(table->record[0])))
{
@@ -7440,7 +7503,7 @@ int Rows_log_event::find_row(rpl_group_info *rgi)
goto end;
}
}
- while (record_compare(table));
+ while (record_compare(table, m_vers_from_plain));
/*
Note: above record_compare will take into accout all record fields
@@ -7528,10 +7591,12 @@ int Delete_rows_log_event::do_exec_row(rpl_group_info *rgi)
const char *tmp= thd->get_proc_info();
LEX_CSTRING tmp_db= thd->db;
char *message, msg[128];
- const char *table_name= m_table->s->table_name.str;
- char quote_char= get_quote_char_for_identifier(thd, STRING_WITH_LEN(table_name));
- my_snprintf(msg, sizeof(msg),"Delete_rows_log_event::find_row() on table %c%s%c",
- quote_char, table_name, quote_char);
+ const LEX_CSTRING &table_name= m_table->s->table_name;
+ const char quote_char=
+ get_quote_char_for_identifier(thd, table_name.str, table_name.length);
+ my_snprintf(msg, sizeof msg,
+ "Delete_rows_log_event::find_row() on table %c%.*s%c",
+ quote_char, int(table_name.length), table_name.str, quote_char);
thd->reset_db(&m_table->s->db);
message= msg;
const bool invoke_triggers= (m_table->triggers && do_invoke_trigger());
@@ -7539,26 +7604,29 @@ int Delete_rows_log_event::do_exec_row(rpl_group_info *rgi)
#ifdef WSREP_PROC_INFO
my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
- "Delete_rows_log_event::find_row(%lld) on table %c%s%c",
- (long long) wsrep_thd_trx_seqno(thd), quote_char, table_name,
+ "Delete_rows_log_event::find_row(%lld) on table %c%.*s%c",
+ (long long) wsrep_thd_trx_seqno(thd), quote_char,
+ int(table_name.length), table_name.str,
quote_char);
message= thd->wsrep_info;
#endif /* WSREP_PROC_INFO */
thd_proc_info(thd, message);
if (likely(!(error= find_row(rgi))))
- {
+ {
/*
Delete the record found, located in record[0]
*/
- my_snprintf(msg, sizeof(msg),"Delete_rows_log_event::ha_delete_row() on table %c%s%c",
- quote_char, table_name, quote_char);
+ my_snprintf(msg, sizeof msg,
+ "Delete_rows_log_event::ha_delete_row() on table %c%.*s%c",
+ quote_char, int(table_name.length), table_name.str,
+ quote_char);
message= msg;
#ifdef WSREP_PROC_INFO
snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
- "Delete_rows_log_event::ha_delete_row(%lld) on table %c%s%c",
- (long long) wsrep_thd_trx_seqno(thd), quote_char, table_name,
- quote_char);
+ "Delete_rows_log_event::ha_delete_row(%lld) on table %c%.*s%c",
+ (long long) wsrep_thd_trx_seqno(thd), quote_char,
+ int(table_name.length), table_name.str, quote_char);
message= thd->wsrep_info;
#endif
thd_proc_info(thd, message);
@@ -7690,17 +7758,20 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi)
DBUG_ASSERT(m_table != NULL);
LEX_CSTRING tmp_db= thd->db;
char *message, msg[128];
- const char *table_name= m_table->s->table_name.str;
- char quote_char= get_quote_char_for_identifier(thd, STRING_WITH_LEN(table_name));
- my_snprintf(msg, sizeof(msg),"Update_rows_log_event::find_row() on table %c%s%c",
- quote_char, table_name, quote_char);
+ const LEX_CSTRING &table_name= m_table->s->table_name;
+ const char quote_char=
+ get_quote_char_for_identifier(thd, table_name.str, table_name.length);
+ my_snprintf(msg, sizeof msg,
+ "Update_rows_log_event::find_row() on table %c%.*s%c",
+ quote_char, int(table_name.length), table_name.str, quote_char);
thd->reset_db(&m_table->s->db);
message= msg;
#ifdef WSREP_PROC_INFO
my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
- "Update_rows_log_event::find_row(%lld) on table %c%s%c",
- (long long) wsrep_thd_trx_seqno(thd), quote_char, table_name,
+ "Update_rows_log_event::find_row(%lld) on table %c%.*s%c",
+ (long long) wsrep_thd_trx_seqno(thd), quote_char,
+ int(table_name.length), table_name.str,
quote_char);
message= thd->wsrep_info;
#endif /* WSREP_PROC_INFO */
@@ -7740,14 +7811,15 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi)
store_record(m_table,record[1]);
m_curr_row= m_curr_row_end;
- my_snprintf(msg, sizeof(msg),"Update_rows_log_event::unpack_current_row() on table %c%s%c",
- quote_char, table_name, quote_char);
+ my_snprintf(msg, sizeof msg,
+ "Update_rows_log_event::unpack_current_row() on table %c%.*s%c",
+ quote_char, int(table_name.length), table_name.str, quote_char);
message= msg;
#ifdef WSREP_PROC_INFO
my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
- "Update_rows_log_event::unpack_current_row(%lld) on table %c%s%c",
- (long long) wsrep_thd_trx_seqno(thd), quote_char, table_name,
- quote_char);
+ "Update_rows_log_event::unpack_current_row(%lld) on table %c%.*s%c",
+ (long long) wsrep_thd_trx_seqno(thd), quote_char,
+ int(table_name.length), table_name.str, quote_char);
message= thd->wsrep_info;
#endif /* WSREP_PROC_INFO */
@@ -7770,13 +7842,15 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi)
DBUG_DUMP("new values", m_table->record[0], m_table->s->reclength);
#endif
- my_snprintf(msg, sizeof(msg),"Update_rows_log_event::ha_update_row() on table %c%s%c",
- quote_char, table_name, quote_char);
+ my_snprintf(msg, sizeof msg,
+ "Update_rows_log_event::ha_update_row() on table %c%.*s%c",
+ quote_char, int(table_name.length), table_name.str, quote_char);
message= msg;
#ifdef WSREP_PROC_INFO
my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1,
- "Update_rows_log_event::ha_update_row(%lld) on table %c%s%c",
- (long long) wsrep_thd_trx_seqno(thd), quote_char, table_name, quote_char);
+ "Update_rows_log_event::ha_update_row(%lld) on table %c%.*s%c",
+ (long long) wsrep_thd_trx_seqno(thd), quote_char,
+ int(table_name.length), table_name.str, quote_char);
message= thd->wsrep_info;
#endif /* WSREP_PROC_INFO */