diff options
author | Brave Galera Crew <devel@codership.com> | 2019-01-23 15:30:00 +0400 |
---|---|---|
committer | Sergey Vojtovich <svoj@mariadb.org> | 2019-01-23 15:30:00 +0400 |
commit | 36a2a185fe18d31a644da46cfabd9757a379280c (patch) | |
tree | 00ca186ce2cfdc3ab7e4979336a384e2b51c5aa9 /sql/wsrep_applier.cc | |
parent | 382115b99297ceaa4c3067f79efb5c2515013be5 (diff) | |
download | mariadb-git-36a2a185fe18d31a644da46cfabd9757a379280c.tar.gz |
Galera4
Diffstat (limited to 'sql/wsrep_applier.cc')
-rw-r--r-- | sql/wsrep_applier.cc | 320 |
1 files changed, 72 insertions, 248 deletions
diff --git a/sql/wsrep_applier.cc b/sql/wsrep_applier.cc index 1f50ee55711..2c4dab3bd20 100644 --- a/sql/wsrep_applier.cc +++ b/sql/wsrep_applier.cc @@ -14,12 +14,17 @@ 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #include "mariadb.h" +#include "mysql/service_wsrep.h" +#include "wsrep_applier.h" + #include "wsrep_priv.h" #include "wsrep_binlog.h" // wsrep_dump_rbr_buf() #include "wsrep_xid.h" +#include "wsrep_thd.h" +#include "wsrep_trans_observer.h" +#include "slave.h" // opt_log_slave_updates #include "log_event.h" // class THD, EVENT_LEN_OFFSET, etc. -#include "wsrep_applier.h" #include "debug_sync.h" /* @@ -27,7 +32,6 @@ At the end (*buf) is shitfed to point to the following event or NULL and (*buf_len) will be changed to account just being read bytes of the 1st event. */ - static Log_event* wsrep_read_log_event( char **arg_buf, size_t *arg_buf_len, const Format_description_log_event *description_event) @@ -35,7 +39,7 @@ static Log_event* wsrep_read_log_event( DBUG_ENTER("wsrep_read_log_event"); char *head= (*arg_buf); - uint data_len = uint4korr(head + EVENT_LEN_OFFSET); + uint data_len= uint4korr(head + EVENT_LEN_OFFSET); char *buf= (*arg_buf); const char *error= 0; Log_event *res= 0; @@ -62,12 +66,13 @@ void wsrep_set_apply_format(THD* thd, Format_description_log_event* ev) { if (thd->wsrep_apply_format) { - delete (Format_description_log_event*)thd->wsrep_apply_format; + delete (Format_description_log_event*)thd->wsrep_apply_format; } thd->wsrep_apply_format= ev; } -Format_description_log_event* wsrep_get_apply_format(THD* thd) +Format_description_log_event* +wsrep_get_apply_format(THD* thd) { if (thd->wsrep_apply_format) { @@ -79,45 +84,77 @@ Format_description_log_event* wsrep_get_apply_format(THD* thd) return thd->wsrep_rgi->rli->relay_log.description_event_for_exec; } -static wsrep_cb_status_t wsrep_apply_events(THD* thd, - const void* events_buf, - size_t buf_len) +void wsrep_apply_error::store(const THD* const thd) { - char *buf= (char *)events_buf; - int rcode= 0; - int event= 1; - Log_event_type typ; + Diagnostics_area::Sql_condition_iterator it= + thd->get_stmt_da()->sql_conditions(); + const Sql_condition* cond; - DBUG_ENTER("wsrep_apply_events"); + static size_t const max_len= 2*MAX_SLAVE_ERRMSG; // 2x so that we have enough + + if (NULL == str_) + { + // this must be freeable by standard free() + str_= static_cast<char*>(malloc(max_len)); + if (NULL == str_) + { + WSREP_ERROR("Failed to allocate %zu bytes for error buffer.", max_len); + len_= 0; + return; + } + } + else + { + /* This is possible when we invoke rollback after failed applying. + * In this situation DA should not be reset yet and should contain + * all previous errors from applying and new ones from rollbacking, + * so we just overwrite is from scratch */ + } - if (thd->killed == KILL_CONNECTION && - thd->wsrep_conflict_state != REPLAYING) + char* slider= str_; + const char* const buf_end= str_ + max_len - 1; // -1: leave space for \0 + + for (cond= it++; cond && slider < buf_end; cond= it++) { - WSREP_INFO("applier has been aborted, skipping apply_rbr: %lld", - (long long) wsrep_thd_trx_seqno(thd)); - DBUG_RETURN(WSREP_CB_FAILURE); + uint const err_code= cond->get_sql_errno(); + const char* const err_str= cond->get_message_text(); + + slider+= my_snprintf(slider, buf_end - slider, " %s, Error_code: %d;", + err_str, err_code); } - mysql_mutex_lock(&thd->LOCK_thd_data); - thd->wsrep_query_state= QUERY_EXEC; - if (thd->wsrep_conflict_state!= REPLAYING) - thd->wsrep_conflict_state= NO_CONFLICT; - mysql_mutex_unlock(&thd->LOCK_thd_data); + *slider= '\0'; + len_= slider - str_ + 1; // +1: add \0 + + WSREP_DEBUG("Error buffer for thd %llu seqno %lld, %zu bytes: %s", + thd->thread_id, (long long)wsrep_thd_trx_seqno(thd), + len_, str_ ? str_ : "(null)"); +} + +int wsrep_apply_events(THD* thd, + Relay_log_info* rli, + const void* events_buf, + size_t buf_len) +{ + char *buf= (char *)events_buf; + int rcode= 0; + int event= 1; + Log_event_type typ; + DBUG_ENTER("wsrep_apply_events"); if (!buf_len) WSREP_DEBUG("empty rbr buffer to apply: %lld", (long long) wsrep_thd_trx_seqno(thd)); - while(buf_len) + while (buf_len) { int exec_res; Log_event* ev= wsrep_read_log_event(&buf, &buf_len, - wsrep_get_apply_format(thd)); - + wsrep_get_apply_format(thd)); if (!ev) { WSREP_ERROR("applier could not read binlog event, seqno: %lld, len: %zu", (long long)wsrep_thd_trx_seqno(thd), buf_len); - rcode= 1; + rcode= WSREP_ERR_BAD_EVENT; goto error; } @@ -147,9 +184,12 @@ static wsrep_cb_status_t wsrep_apply_events(THD* thd, thd->set_server_id(ev->server_id); thd->set_time(); // time the query thd->transaction.start_time.reset(thd); + //#define mariadb_10_4_0 +#ifdef mariadb_10_4_0 wsrep_xid_init(&thd->transaction.xid_state.xid, thd->wsrep_trx_meta.gtid.uuid, thd->wsrep_trx_meta.gtid.seqno); +#endif thd->lex->current_select= 0; if (!ev->when) { @@ -162,13 +202,13 @@ static wsrep_cb_status_t wsrep_apply_events(THD* thd, (thd->variables.option_bits & ~OPTION_SKIP_REPLICATION) | (ev->flags & LOG_EVENT_SKIP_REPLICATION_F ? OPTION_SKIP_REPLICATION : 0); - ev->thd = thd; - exec_res = ev->apply_event(thd->wsrep_rgi); + ev->thd= thd; + exec_res= ev->apply_event(thd->wsrep_rgi); DBUG_PRINT("info", ("exec_event result: %d", exec_res)); if (exec_res) { - WSREP_WARN("RBR event %d %s apply warning: %d, %lld", + WSREP_WARN("Event %d %s apply failed: %d, seqno %lld", event, ev->get_type_str(), exec_res, (long long) wsrep_thd_trx_seqno(thd)); rcode= exec_res; @@ -178,230 +218,14 @@ static wsrep_cb_status_t wsrep_apply_events(THD* thd, } event++; - if (thd->wsrep_conflict_state!= NO_CONFLICT && - thd->wsrep_conflict_state!= REPLAYING) - WSREP_WARN("conflict state after RBR event applying: %d, %lld", - thd->wsrep_query_state, (long long)wsrep_thd_trx_seqno(thd)); - - if (thd->wsrep_conflict_state == MUST_ABORT) { - WSREP_WARN("RBR event apply failed, rolling back: %lld", - (long long) wsrep_thd_trx_seqno(thd)); - trans_rollback(thd); - thd->locked_tables_list.unlock_locked_tables(thd); - /* Release transactional metadata locks. */ - thd->mdl_context.release_transactional_locks(); - thd->wsrep_conflict_state= NO_CONFLICT; - DBUG_RETURN(WSREP_CB_FAILURE); - } - delete_or_keep_event_post_apply(thd->wsrep_rgi, typ, ev); } - error: - mysql_mutex_lock(&thd->LOCK_thd_data); - thd->wsrep_query_state= QUERY_IDLE; - mysql_mutex_unlock(&thd->LOCK_thd_data); - - assert(thd->wsrep_exec_mode== REPL_RECV); - +error: if (thd->killed == KILL_CONNECTION) WSREP_INFO("applier aborted: %lld", (long long)wsrep_thd_trx_seqno(thd)); - if (rcode) DBUG_RETURN(WSREP_CB_FAILURE); - DBUG_RETURN(WSREP_CB_SUCCESS); -} - -wsrep_cb_status_t wsrep_apply_cb(void* const ctx, - const void* const buf, - size_t const buf_len, - uint32_t const flags, - const wsrep_trx_meta_t* meta) -{ - THD* const thd((THD*)ctx); - - assert(thd->wsrep_apply_toi == false); - - // Allow tests to block the applier thread using the DBUG facilities. - DBUG_EXECUTE_IF("sync.wsrep_apply_cb", - { - const char act[]= - "now " - "SIGNAL sync.wsrep_apply_cb_reached " - "WAIT_FOR signal.wsrep_apply_cb"; - DBUG_ASSERT(!debug_sync_set_action(thd, - STRING_WITH_LEN(act))); - };); - - thd->wsrep_trx_meta = *meta; - -#ifdef WSREP_PROC_INFO - snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, - "Applying write set %lld: %p, %zu", - (long long)wsrep_thd_trx_seqno(thd), buf, buf_len); - thd_proc_info(thd, thd->wsrep_info); -#else - thd_proc_info(thd, "Applying write set"); -#endif /* WSREP_PROC_INFO */ - - /* tune FK and UK checking policy */ - if (wsrep_slave_UK_checks == FALSE) - thd->variables.option_bits|= OPTION_RELAXED_UNIQUE_CHECKS; - else - thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS; - - if (wsrep_slave_FK_checks == FALSE) - thd->variables.option_bits|= OPTION_NO_FOREIGN_KEY_CHECKS; - else - thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS; - - /* With galera we assume that the master has done the constraint checks */ - thd->variables.option_bits|= OPTION_NO_CHECK_CONSTRAINT_CHECKS; - - if (flags & WSREP_FLAG_ISOLATION) - { - thd->wsrep_apply_toi= true; - /* - Don't run in transaction mode with TOI actions. - */ - thd->variables.option_bits&= ~OPTION_BEGIN; - thd->server_status&= ~SERVER_STATUS_IN_TRANS; - } - wsrep_cb_status_t rcode(wsrep_apply_events(thd, buf, buf_len)); - -#ifdef WSREP_PROC_INFO - snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, - "Applied write set %lld", (long long)wsrep_thd_trx_seqno(thd)); - thd_proc_info(thd, thd->wsrep_info); -#else - thd_proc_info(thd, "Applied write set"); -#endif /* WSREP_PROC_INFO */ - - if (WSREP_CB_SUCCESS != rcode) - { - wsrep_dump_rbr_buf_with_header(thd, buf, buf_len); - } - - if (thd->has_thd_temporary_tables()) - { - WSREP_DEBUG("Applier %lld has temporary tables. Closing them now..", - thd->thread_id); - thd->close_temporary_tables(); - } - - return rcode; -} - -static wsrep_cb_status_t wsrep_commit(THD* const thd) -{ -#ifdef WSREP_PROC_INFO - snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, - "Committing %lld", (long long)wsrep_thd_trx_seqno(thd)); - thd_proc_info(thd, thd->wsrep_info); -#else - thd_proc_info(thd, "Committing"); -#endif /* WSREP_PROC_INFO */ - - wsrep_cb_status_t const rcode(trans_commit(thd) ? - WSREP_CB_FAILURE : WSREP_CB_SUCCESS); - - if (WSREP_CB_SUCCESS == rcode) - { - thd->wsrep_rgi->cleanup_context(thd, false); -#ifdef GTID_SUPPORT - thd->variables.gtid_next.set_automatic(); -#endif /* GTID_SUPPORT */ - if (thd->wsrep_apply_toi) - { - wsrep_set_SE_checkpoint(thd->wsrep_trx_meta.gtid.uuid, - thd->wsrep_trx_meta.gtid.seqno); - } - } - -#ifdef WSREP_PROC_INFO - snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, - "Committed %lld", (long long) wsrep_thd_trx_seqno(thd)); - thd_proc_info(thd, thd->wsrep_info); -#else - thd_proc_info(thd, "Committed"); -#endif /* WSREP_PROC_INFO */ - - return rcode; -} - -static wsrep_cb_status_t wsrep_rollback(THD* const thd) -{ -#ifdef WSREP_PROC_INFO - snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, - "Rolling back %lld", (long long)wsrep_thd_trx_seqno(thd)); - thd_proc_info(thd, thd->wsrep_info); -#else - thd_proc_info(thd, "Rolling back"); -#endif /* WSREP_PROC_INFO */ - - wsrep_cb_status_t const rcode(trans_rollback(thd) ? - WSREP_CB_FAILURE : WSREP_CB_SUCCESS); - -#ifdef WSREP_PROC_INFO - snprintf(thd->wsrep_info, sizeof(thd->wsrep_info) - 1, - "Rolled back %lld", (long long)wsrep_thd_trx_seqno(thd)); - thd_proc_info(thd, thd->wsrep_info); -#else - thd_proc_info(thd, "Rolled back"); -#endif /* WSREP_PROC_INFO */ - - return rcode; -} - -wsrep_cb_status_t wsrep_commit_cb(void* const ctx, - uint32_t const flags, - const wsrep_trx_meta_t* meta, - wsrep_bool_t* const exit, - bool const commit) -{ - THD* const thd((THD*)ctx); - - assert(meta->gtid.seqno == wsrep_thd_trx_seqno(thd)); - - wsrep_cb_status_t rcode; - - if (commit) - rcode = wsrep_commit(thd); - else - rcode = wsrep_rollback(thd); - - /* Cleanup */ wsrep_set_apply_format(thd, NULL); - thd->mdl_context.release_transactional_locks(); - thd->reset_query(); /* Mutex protected */ - free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC)); - thd->tx_isolation= (enum_tx_isolation) thd->variables.tx_isolation; - if (wsrep_slave_count_change < 0 && commit && WSREP_CB_SUCCESS == rcode) - { - mysql_mutex_lock(&LOCK_wsrep_slave_threads); - if (wsrep_slave_count_change < 0) - { - wsrep_slave_count_change++; - *exit = true; - } - mysql_mutex_unlock(&LOCK_wsrep_slave_threads); - } - - if (thd->wsrep_applier) - { - /* From trans_begin() */ - thd->variables.option_bits|= OPTION_BEGIN; - thd->server_status|= SERVER_STATUS_IN_TRANS; - thd->wsrep_apply_toi= false; - } - - return rcode; -} - - -wsrep_cb_status_t wsrep_unordered_cb(void* const ctx, - const void* const data, - size_t const size) -{ - return WSREP_CB_SUCCESS; + DBUG_RETURN(rcode); } |