diff options
Diffstat (limited to 'sql/log_event_server.cc')
-rw-r--r-- | sql/log_event_server.cc | 200 |
1 files changed, 137 insertions, 63 deletions
diff --git a/sql/log_event_server.cc b/sql/log_event_server.cc index 3910d910da1..6c71381a1fb 100644 --- a/sql/log_event_server.cc +++ b/sql/log_event_server.cc @@ -153,6 +153,30 @@ is_parallel_retry_error(rpl_group_info *rgi, int err) return has_temporary_error(rgi->thd); } +/** + Accumulate a Diagnostics_area's errors and warnings into an output buffer + + @param errbuf The output buffer to write error messages + @param errbuf_size The size of the output buffer + @param da The Diagnostics_area to check for errors +*/ +static void inline aggregate_da_errors(char *errbuf, size_t errbuf_size, + Diagnostics_area *da) +{ + const char *errbuf_end= errbuf + errbuf_size; + char *slider; + Diagnostics_area::Sql_condition_iterator it= da->sql_conditions(); + const Sql_condition *err; + size_t len; + for (err= it++, slider= errbuf; err && slider < errbuf_end - 1; + slider += len, err= it++) + { + len= my_snprintf(slider, errbuf_end - slider, + " %s, Error_code: %d;", err->get_message_text(), + err->get_sql_errno()); + } +} + /** Error reporting facility for Rows_log_event::do_apply_event @@ -173,13 +197,8 @@ static void inline slave_rows_error_report(enum loglevel level, int ha_error, const char *log_name, my_off_t pos) { const char *handler_error= (ha_error ? HA_ERR(ha_error) : NULL); - char buff[MAX_SLAVE_ERRMSG], *slider; - const char *buff_end= buff + sizeof(buff); - size_t len; - Diagnostics_area::Sql_condition_iterator it= - thd->get_stmt_da()->sql_conditions(); + char buff[MAX_SLAVE_ERRMSG]; Relay_log_info const *rli= rgi->rli; - const Sql_condition *err; buff[0]= 0; int errcode= thd->is_error() ? thd->get_stmt_da()->sql_errno() : 0; @@ -192,13 +211,7 @@ static void inline slave_rows_error_report(enum loglevel level, int ha_error, if (is_parallel_retry_error(rgi, errcode)) return; - for (err= it++, slider= buff; err && slider < buff_end - 1; - slider += len, err= it++) - { - len= my_snprintf(slider, buff_end - slider, - " %s, Error_code: %d;", err->get_message_text(), - err->get_sql_errno()); - } + aggregate_da_errors(buff, sizeof(buff), thd->get_stmt_da()); if (ha_error != 0 && !thd->killed) rli->report(level, errcode, rgi->gtid_info(), @@ -3571,7 +3584,8 @@ bool slave_execute_deferred_events(THD *thd) #if defined(HAVE_REPLICATION) int Xid_apply_log_event::do_record_gtid(THD *thd, rpl_group_info *rgi, - bool in_trans, void **out_hton) + bool in_trans, void **out_hton, + bool force_err) { int err= 0; Relay_log_info const *rli= rgi->rli; @@ -3586,14 +3600,26 @@ int Xid_apply_log_event::do_record_gtid(THD *thd, rpl_group_info *rgi, int ec= thd->get_stmt_da()->sql_errno(); /* Do not report an error if this is really a kill due to a deadlock. - In this case, the transaction will be re-tried instead. + In this case, the transaction will be re-tried instead. Unless force_err + is set, as in the case of XA PREPARE, as the GTID state is updated as a + separate transaction, and if that fails, we should not retry but exit in + error immediately. */ - if (!is_parallel_retry_error(rgi, ec)) + if (!is_parallel_retry_error(rgi, ec) || force_err) + { + char buff[MAX_SLAVE_ERRMSG]; + buff[0]= 0; + aggregate_da_errors(buff, sizeof(buff), thd->get_stmt_da()); + + if (force_err) + thd->clear_error(); + rli->report(ERROR_LEVEL, ER_CANNOT_UPDATE_GTID_STATE, rgi->gtid_info(), "Error during XID COMMIT: failed to update GTID state in " - "%s.%s: %d: %s", + "%s.%s: %d: %s the event's master log %s, end_log_pos %llu", "mysql", rpl_gtid_slave_state_table_name.str, ec, - thd->get_stmt_da()->message()); + buff, RPL_LOG_NAME, log_pos); + } thd->is_slave_error= 1; } @@ -3667,7 +3693,7 @@ int Xid_apply_log_event::do_apply_event(rpl_group_info *rgi) { DBUG_ASSERT(!thd->transaction->xid_state.is_explicit_XA()); - if ((err= do_record_gtid(thd, rgi, false, &hton))) + if ((err= do_record_gtid(thd, rgi, false, &hton, true))) return err; } @@ -4988,7 +5014,8 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi) to avoid query cache being polluted with stale entries, */ # ifdef WITH_WSREP - if (!WSREP(thd) && !wsrep_thd_is_applying(thd)) + /* Query cache is not invalidated on wsrep applier here */ + if (!(WSREP(thd) && wsrep_thd_is_applying(thd))) # endif /* WITH_WSREP */ query_cache.invalidate_locked_for_write(thd, rgi->tables_to_lock); #endif /* HAVE_QUERY_CACHE */ @@ -6871,8 +6898,18 @@ Rows_log_event::write_row(rpl_group_info *rgi, int Rows_log_event::update_sequence() { TABLE *table= m_table; // pointer to event's table + bool old_master= false; + int err= 0; - if (!bitmap_is_set(table->rpl_write_set, MIN_VALUE_FIELD_NO)) + if (!bitmap_is_set(table->rpl_write_set, MIN_VALUE_FIELD_NO) || + ( +#if defined(WITH_WSREP) + ! WSREP(thd) && +#endif + !(table->in_use->rgi_slave->gtid_ev_flags2 & Gtid_log_event::FL_DDL) && + !(old_master= + rpl_master_has_bug(thd->rgi_slave->rli, + 29621, FALSE, FALSE, FALSE, TRUE)))) { /* This event come from a setval function executed on the master. Update the sequence next_number and round, like we do with setval() @@ -6885,12 +6922,27 @@ int Rows_log_event::update_sequence() return table->s->sequence->set_value(table, nextval, round, 0) > 0; } - + if (old_master && !WSREP(thd) && thd->rgi_slave->is_parallel_exec) + { + DBUG_ASSERT(thd->rgi_slave->parallel_entry); + /* + With parallel replication enabled, we can't execute alongside any other + transaction in which we may depend, so we force retry to release + the server layer table lock for possible prior in binlog order + same table transactions. + */ + if (thd->rgi_slave->parallel_entry->last_committed_sub_id < + thd->rgi_slave->wait_commit_sub_id) + { + err= ER_LOCK_DEADLOCK; + my_error(err, MYF(0)); + } + } /* Update all fields in table and update the active sequence, like with ALTER SEQUENCE */ - return table->file->ha_write_row(table->record[0]); + return err == 0 ? table->file->ha_write_row(table->record[0]) : err; } @@ -6906,19 +6958,21 @@ Write_rows_log_event::do_exec_row(rpl_group_info *rgi) const char *tmp= thd->get_proc_info(); LEX_CSTRING tmp_db= thd->db; char *message, msg[128]; - const char *table_name= m_table->s->table_name.str; - char quote_char= get_quote_char_for_identifier(thd, STRING_WITH_LEN(table_name)); - my_snprintf(msg, sizeof(msg),"Write_rows_log_event::write_row() on table %c%s%c", - quote_char, table_name, quote_char); + const LEX_CSTRING &table_name= m_table->s->table_name; + const char quote_char= + get_quote_char_for_identifier(thd, table_name.str, table_name.length); + my_snprintf(msg, sizeof msg, + "Write_rows_log_event::write_row() on table %c%.*s%c", + quote_char, int(table_name.length), table_name.str, quote_char); thd->reset_db(&m_table->s->db); message= msg; int error; #ifdef WSREP_PROC_INFO my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, - "Write_rows_log_event::write_row(%lld) on table %c%s%c", - (long long) wsrep_thd_trx_seqno(thd), quote_char, table_name, - quote_char); + "Write_rows_log_event::write_row(%lld) on table %c%.*s%c", + (long long) wsrep_thd_trx_seqno(thd), quote_char, + int(table_name.length), table_name.str, quote_char); message= thd->wsrep_info; #endif /* WSREP_PROC_INFO */ @@ -6957,7 +7011,7 @@ uint8 Write_rows_log_event::get_trg_event_map() Returns TRUE if different. */ -static bool record_compare(TABLE *table) +static bool record_compare(TABLE *table, bool vers_from_plain= false) { bool result= FALSE; /** @@ -6990,10 +7044,19 @@ static bool record_compare(TABLE *table) /* Compare fields */ for (Field **ptr=table->field ; *ptr ; ptr++) { - if (table->versioned() && (*ptr)->vers_sys_field()) - { + /* + If the table is versioned, don't compare using the version if there is a + primary key. If there isn't a primary key, we need the version to + identify the correct record if there are duplicate rows in the data set. + However, if the primary server is unversioned (vers_from_plain is true), + then we implicitly use row_end as the primary key on our side. This is + because the implicit row_end value will be set to the maximum value for + the latest row update (which is what we care about). + */ + if (table->versioned() && (*ptr)->vers_sys_field() && + (table->s->primary_key < MAX_KEY || + (vers_from_plain && table->vers_start_field() == (*ptr)))) continue; - } /** We only compare field contents that are not null. NULL fields (i.e., their null bits) were compared @@ -7387,7 +7450,7 @@ int Rows_log_event::find_row(rpl_group_info *rgi) /* We use this to test that the correct key is used in test cases. */ DBUG_EXECUTE_IF("slave_crash_if_index_scan", abort();); - while (record_compare(table)) + while (record_compare(table, m_vers_from_plain)) { while ((error= table->file->ha_index_next(table->record[0]))) { @@ -7440,7 +7503,7 @@ int Rows_log_event::find_row(rpl_group_info *rgi) goto end; } } - while (record_compare(table)); + while (record_compare(table, m_vers_from_plain)); /* Note: above record_compare will take into accout all record fields @@ -7528,10 +7591,12 @@ int Delete_rows_log_event::do_exec_row(rpl_group_info *rgi) const char *tmp= thd->get_proc_info(); LEX_CSTRING tmp_db= thd->db; char *message, msg[128]; - const char *table_name= m_table->s->table_name.str; - char quote_char= get_quote_char_for_identifier(thd, STRING_WITH_LEN(table_name)); - my_snprintf(msg, sizeof(msg),"Delete_rows_log_event::find_row() on table %c%s%c", - quote_char, table_name, quote_char); + const LEX_CSTRING &table_name= m_table->s->table_name; + const char quote_char= + get_quote_char_for_identifier(thd, table_name.str, table_name.length); + my_snprintf(msg, sizeof msg, + "Delete_rows_log_event::find_row() on table %c%.*s%c", + quote_char, int(table_name.length), table_name.str, quote_char); thd->reset_db(&m_table->s->db); message= msg; const bool invoke_triggers= (m_table->triggers && do_invoke_trigger()); @@ -7539,26 +7604,29 @@ int Delete_rows_log_event::do_exec_row(rpl_group_info *rgi) #ifdef WSREP_PROC_INFO my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, - "Delete_rows_log_event::find_row(%lld) on table %c%s%c", - (long long) wsrep_thd_trx_seqno(thd), quote_char, table_name, + "Delete_rows_log_event::find_row(%lld) on table %c%.*s%c", + (long long) wsrep_thd_trx_seqno(thd), quote_char, + int(table_name.length), table_name.str, quote_char); message= thd->wsrep_info; #endif /* WSREP_PROC_INFO */ thd_proc_info(thd, message); if (likely(!(error= find_row(rgi)))) - { + { /* Delete the record found, located in record[0] */ - my_snprintf(msg, sizeof(msg),"Delete_rows_log_event::ha_delete_row() on table %c%s%c", - quote_char, table_name, quote_char); + my_snprintf(msg, sizeof msg, + "Delete_rows_log_event::ha_delete_row() on table %c%.*s%c", + quote_char, int(table_name.length), table_name.str, + quote_char); message= msg; #ifdef WSREP_PROC_INFO snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, - "Delete_rows_log_event::ha_delete_row(%lld) on table %c%s%c", - (long long) wsrep_thd_trx_seqno(thd), quote_char, table_name, - quote_char); + "Delete_rows_log_event::ha_delete_row(%lld) on table %c%.*s%c", + (long long) wsrep_thd_trx_seqno(thd), quote_char, + int(table_name.length), table_name.str, quote_char); message= thd->wsrep_info; #endif thd_proc_info(thd, message); @@ -7690,17 +7758,20 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi) DBUG_ASSERT(m_table != NULL); LEX_CSTRING tmp_db= thd->db; char *message, msg[128]; - const char *table_name= m_table->s->table_name.str; - char quote_char= get_quote_char_for_identifier(thd, STRING_WITH_LEN(table_name)); - my_snprintf(msg, sizeof(msg),"Update_rows_log_event::find_row() on table %c%s%c", - quote_char, table_name, quote_char); + const LEX_CSTRING &table_name= m_table->s->table_name; + const char quote_char= + get_quote_char_for_identifier(thd, table_name.str, table_name.length); + my_snprintf(msg, sizeof msg, + "Update_rows_log_event::find_row() on table %c%.*s%c", + quote_char, int(table_name.length), table_name.str, quote_char); thd->reset_db(&m_table->s->db); message= msg; #ifdef WSREP_PROC_INFO my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, - "Update_rows_log_event::find_row(%lld) on table %c%s%c", - (long long) wsrep_thd_trx_seqno(thd), quote_char, table_name, + "Update_rows_log_event::find_row(%lld) on table %c%.*s%c", + (long long) wsrep_thd_trx_seqno(thd), quote_char, + int(table_name.length), table_name.str, quote_char); message= thd->wsrep_info; #endif /* WSREP_PROC_INFO */ @@ -7740,14 +7811,15 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi) store_record(m_table,record[1]); m_curr_row= m_curr_row_end; - my_snprintf(msg, sizeof(msg),"Update_rows_log_event::unpack_current_row() on table %c%s%c", - quote_char, table_name, quote_char); + my_snprintf(msg, sizeof msg, + "Update_rows_log_event::unpack_current_row() on table %c%.*s%c", + quote_char, int(table_name.length), table_name.str, quote_char); message= msg; #ifdef WSREP_PROC_INFO my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, - "Update_rows_log_event::unpack_current_row(%lld) on table %c%s%c", - (long long) wsrep_thd_trx_seqno(thd), quote_char, table_name, - quote_char); + "Update_rows_log_event::unpack_current_row(%lld) on table %c%.*s%c", + (long long) wsrep_thd_trx_seqno(thd), quote_char, + int(table_name.length), table_name.str, quote_char); message= thd->wsrep_info; #endif /* WSREP_PROC_INFO */ @@ -7770,13 +7842,15 @@ Update_rows_log_event::do_exec_row(rpl_group_info *rgi) DBUG_DUMP("new values", m_table->record[0], m_table->s->reclength); #endif - my_snprintf(msg, sizeof(msg),"Update_rows_log_event::ha_update_row() on table %c%s%c", - quote_char, table_name, quote_char); + my_snprintf(msg, sizeof msg, + "Update_rows_log_event::ha_update_row() on table %c%.*s%c", + quote_char, int(table_name.length), table_name.str, quote_char); message= msg; #ifdef WSREP_PROC_INFO my_snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, - "Update_rows_log_event::ha_update_row(%lld) on table %c%s%c", - (long long) wsrep_thd_trx_seqno(thd), quote_char, table_name, quote_char); + "Update_rows_log_event::ha_update_row(%lld) on table %c%.*s%c", + (long long) wsrep_thd_trx_seqno(thd), quote_char, + int(table_name.length), table_name.str, quote_char); message= thd->wsrep_info; #endif /* WSREP_PROC_INFO */ |