diff options
Diffstat (limited to 'sql')
-rw-r--r-- | sql/handler.cc | 58 | ||||
-rw-r--r-- | sql/handler.h | 1 | ||||
-rw-r--r-- | sql/log.cc | 264 | ||||
-rw-r--r-- | sql/log.h | 10 | ||||
-rw-r--r-- | sql/log_event.cc | 63 | ||||
-rw-r--r-- | sql/log_event.h | 213 | ||||
-rw-r--r-- | sql/log_event_client.cc | 37 | ||||
-rw-r--r-- | sql/log_event_server.cc | 292 | ||||
-rw-r--r-- | sql/rpl_parallel.cc | 38 | ||||
-rw-r--r-- | sql/rpl_parallel.h | 3 | ||||
-rw-r--r-- | sql/rpl_rli.cc | 9 | ||||
-rw-r--r-- | sql/slave.cc | 3 | ||||
-rw-r--r-- | sql/sql_repl.cc | 4 | ||||
-rw-r--r-- | sql/xa.cc | 233 | ||||
-rw-r--r-- | sql/xa.h | 14 |
15 files changed, 1051 insertions, 191 deletions
diff --git a/sql/handler.cc b/sql/handler.cc index c3fee4c78c9..d43a9a0aaa0 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -1354,11 +1354,42 @@ int ha_prepare(THD *thd) } } + + DEBUG_SYNC(thd, "at_unlog_xa_prepare"); + + if (tc_log->unlog_xa_prepare(thd, all)) + { + ha_rollback_trans(thd, all); + error=1; + } } DBUG_RETURN(error); } +/* + Like ha_check_and_coalesce_trx_read_only to return counted number of + read-write transaction participants limited to two, but works in the 'all' + context. + Also returns the last found rw ha_info through the 2nd argument. +*/ +uint ha_count_rw_all(THD *thd, Ha_trx_info **ptr_ha_info) +{ + unsigned rw_ha_count= 0; + + for (auto ha_info= thd->transaction.all.ha_list; ha_info; + ha_info= ha_info->next()) + { + if (ha_info->is_trx_read_write()) + { + *ptr_ha_info= ha_info; + if (++rw_ha_count > 1) + break; + } + } + return rw_ha_count; +} + /** Check if we can skip the two-phase commit. @@ -1630,10 +1661,6 @@ int ha_commit_trans(THD *thd, bool all) need_prepare_ordered= FALSE; need_commit_ordered= FALSE; - DBUG_ASSERT(thd->transaction.implicit_xid.get_my_xid() == - thd->transaction.implicit_xid.quick_get_my_xid()); - xid= thd->transaction.xid_state.is_explicit_XA() ? 0 : - thd->transaction.implicit_xid.quick_get_my_xid(); for (Ha_trx_info *hi= ha_info; hi; hi= hi->next()) { @@ -1658,6 +1685,18 @@ int ha_commit_trans(THD *thd, bool all) DEBUG_SYNC(thd, "ha_commit_trans_after_prepare"); DBUG_EXECUTE_IF("crash_commit_after_prepare", DBUG_SUICIDE();); + if (!is_real_trans) + { + error= commit_one_phase_2(thd, all, trans, is_real_trans); + goto done; + } + + DBUG_ASSERT(thd->transaction.implicit_xid.get_my_xid() == + thd->transaction.implicit_xid.quick_get_my_xid()); + DBUG_ASSERT(!thd->transaction.xid_state.is_explicit_XA() || + thd->lex->xa_opt == XA_ONE_PHASE); + xid= thd->transaction.implicit_xid.quick_get_my_xid(); + #ifdef WITH_WSREP if (run_wsrep_hooks && !error) { @@ -1668,14 +1707,6 @@ int ha_commit_trans(THD *thd, bool all) xid= s.get(); } } -#endif /* WITH_WSREP */ - - if (!is_real_trans) - { - error= commit_one_phase_2(thd, all, trans, is_real_trans); - goto done; - } -#ifdef WITH_WSREP if (run_wsrep_hooks && (error = wsrep_before_commit(thd, all))) goto wsrep_err; #endif /* WITH_WSREP */ @@ -1915,7 +1946,8 @@ int ha_rollback_trans(THD *thd, bool all) rollback without signalling following transactions. And in release builds, we explicitly do the signalling before rolling back. */ - DBUG_ASSERT(!(thd->rgi_slave && thd->rgi_slave->did_mark_start_commit)); + DBUG_ASSERT(!(thd->rgi_slave && thd->rgi_slave->did_mark_start_commit) || + thd->transaction.xid_state.is_explicit_XA()); if (thd->rgi_slave && thd->rgi_slave->did_mark_start_commit) thd->rgi_slave->unmark_start_commit(); } diff --git a/sql/handler.h b/sql/handler.h index 9044b2c82c2..d3b159d0d06 100644 --- a/sql/handler.h +++ b/sql/handler.h @@ -5187,4 +5187,5 @@ void print_keydup_error(TABLE *table, KEY *key, myf errflag); int del_global_index_stat(THD *thd, TABLE* table, KEY* key_info); int del_global_table_stat(THD *thd, const LEX_CSTRING *db, const LEX_CSTRING *table); +uint ha_count_rw_all(THD *thd, Ha_trx_info **ptr_ha_info); #endif /* HANDLER_INCLUDED */ diff --git a/sql/log.cc b/sql/log.cc index 355118dc701..3e7f3a043c3 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -90,7 +90,13 @@ static bool binlog_savepoint_rollback_can_release_mdl(handlerton *hton, static int binlog_commit(handlerton *hton, THD *thd, bool all); static int binlog_rollback(handlerton *hton, THD *thd, bool all); static int binlog_prepare(handlerton *hton, THD *thd, bool all); +static int binlog_xa_recover_dummy(handlerton *hton, XID *xid_list, uint len); +static int binlog_commit_by_xid(handlerton *hton, XID *xid); +static int binlog_rollback_by_xid(handlerton *hton, XID *xid); static int binlog_start_consistent_snapshot(handlerton *hton, THD *thd); +static int binlog_flush_cache(THD *thd, binlog_cache_mngr *cache_mngr, + Log_event *end_ev, bool all, bool using_stmt, + bool using_trx); static const LEX_CSTRING write_error_msg= { STRING_WITH_LEN("error writing to the binary log") }; @@ -1692,6 +1698,10 @@ int binlog_init(void *p) { binlog_hton->prepare= binlog_prepare; binlog_hton->start_consistent_snapshot= binlog_start_consistent_snapshot; + binlog_hton->commit_by_xid= binlog_commit_by_xid; + binlog_hton->rollback_by_xid= binlog_rollback_by_xid; + // recover needs to be set to make xa{commit,rollback}_handlerton effective + binlog_hton->recover= binlog_xa_recover_dummy; } binlog_hton->flags= HTON_NOT_USER_SELECTABLE | HTON_HIDDEN; return 0; @@ -1765,7 +1775,8 @@ binlog_flush_cache(THD *thd, binlog_cache_mngr *cache_mngr, DBUG_PRINT("enter", ("end_ev: %p", end_ev)); if ((using_stmt && !cache_mngr->stmt_cache.empty()) || - (using_trx && !cache_mngr->trx_cache.empty())) + (using_trx && !cache_mngr->trx_cache.empty()) || + thd->transaction.xid_state.is_explicit_XA()) { if (using_stmt && thd->binlog_flush_pending_rows_event(TRUE, FALSE)) DBUG_RETURN(1); @@ -1837,6 +1848,17 @@ binlog_commit_flush_stmt_cache(THD *thd, bool all, DBUG_RETURN(binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, FALSE)); } + +inline size_t serialize_with_xid(XID *xid, char *buf, + const char *query, size_t q_len) +{ + memcpy(buf, query, q_len); + + return + q_len + strlen(static_cast<event_xid_t*>(xid)->serialize(buf + q_len)); +} + + /** This function flushes the trx-cache upon commit. @@ -1850,11 +1872,28 @@ static inline int binlog_commit_flush_trx_cache(THD *thd, bool all, binlog_cache_mngr *cache_mngr) { DBUG_ENTER("binlog_commit_flush_trx_cache"); - Query_log_event end_evt(thd, STRING_WITH_LEN("COMMIT"), - TRUE, TRUE, TRUE, 0); + + const char query[]= "XA COMMIT "; + const size_t q_len= sizeof(query) - 1; // do not count trailing 0 + char buf[q_len + ser_buf_size]= "COMMIT"; + size_t buflen= sizeof("COMMIT") - 1; + + if (thd->lex->sql_command == SQLCOM_XA_COMMIT && + thd->lex->xa_opt != XA_ONE_PHASE) + { + DBUG_ASSERT(thd->transaction.xid_state.is_explicit_XA()); + DBUG_ASSERT(thd->transaction.xid_state.get_state_code() == + XA_PREPARED); + + buflen= serialize_with_xid(thd->transaction.xid_state.get_xid(), + buf, query, q_len); + } + Query_log_event end_evt(thd, buf, buflen, TRUE, TRUE, TRUE, 0); + DBUG_RETURN(binlog_flush_cache(thd, cache_mngr, &end_evt, all, FALSE, TRUE)); } + /** This function flushes the trx-cache upon rollback. @@ -1868,8 +1907,20 @@ static inline int binlog_rollback_flush_trx_cache(THD *thd, bool all, binlog_cache_mngr *cache_mngr) { - Query_log_event end_evt(thd, STRING_WITH_LEN("ROLLBACK"), - TRUE, TRUE, TRUE, 0); + const char query[]= "XA ROLLBACK "; + const size_t q_len= sizeof(query) - 1; // do not count trailing 0 + char buf[q_len + ser_buf_size]= "ROLLBACK"; + size_t buflen= sizeof("ROLLBACK") - 1; + + if (thd->transaction.xid_state.is_explicit_XA()) + { + /* for not prepared use plain ROLLBACK */ + if (thd->transaction.xid_state.get_state_code() == XA_PREPARED) + buflen= serialize_with_xid(thd->transaction.xid_state.get_xid(), + buf, query, q_len); + } + Query_log_event end_evt(thd, buf, buflen, TRUE, TRUE, TRUE, 0); + return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, FALSE, TRUE)); } @@ -1887,23 +1938,10 @@ static inline int binlog_commit_flush_xid_caches(THD *thd, binlog_cache_mngr *cache_mngr, bool all, my_xid xid) { - if (xid) - { - Xid_log_event end_evt(thd, xid, TRUE); - return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, TRUE)); - } - else - { - /* - Empty xid occurs in XA COMMIT ... ONE PHASE. - In this case, we do not have a MySQL xid for the transaction, and the - external XA transaction coordinator will have to handle recovery if - needed. So we end the transaction with a plain COMMIT query event. - */ - Query_log_event end_evt(thd, STRING_WITH_LEN("COMMIT"), - TRUE, TRUE, TRUE, 0); - return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, TRUE)); - } + DBUG_ASSERT(xid); // replaced former treatment of ONE-PHASE XA + + Xid_log_event end_evt(thd, xid, TRUE); + return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, TRUE)); } /** @@ -1959,17 +1997,62 @@ binlog_truncate_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr, bool all) DBUG_RETURN(error); } + +inline bool is_preparing_xa(THD *thd) +{ + return + thd->transaction.xid_state.is_explicit_XA() && + thd->lex->sql_command == SQLCOM_XA_PREPARE; +} + + static int binlog_prepare(handlerton *hton, THD *thd, bool all) { - /* - do nothing. - just pretend we can do 2pc, so that MySQL won't - switch to 1pc. - real work will be done in MYSQL_BIN_LOG::log_and_order() - */ + /* Do nothing unless the transaction is a user XA. */ + return is_preparing_xa(thd) ? binlog_commit(NULL, thd, all) : 0; +} + + +static int binlog_xa_recover_dummy(handlerton *hton __attribute__((unused)), + XID *xid_list __attribute__((unused)), + uint len __attribute__((unused))) +{ + /* Does nothing. */ return 0; } + +static int binlog_commit_by_xid(handlerton *hton, XID *xid) +{ + THD *thd= current_thd; + + (void) thd->binlog_setup_trx_data(); + + DBUG_ASSERT(thd->lex->sql_command == SQLCOM_XA_COMMIT); + + return binlog_commit(hton, thd, TRUE); +} + + +static int binlog_rollback_by_xid(handlerton *hton, XID *xid) +{ + THD *thd= current_thd; + + (void) thd->binlog_setup_trx_data(); + + DBUG_ASSERT(thd->lex->sql_command == SQLCOM_XA_ROLLBACK || + (thd->transaction.xid_state.get_state_code() == XA_ROLLBACK_ONLY)); + return binlog_rollback(hton, thd, TRUE); +} + + +inline bool is_prepared_xa(THD *thd) +{ + return thd->transaction.xid_state.is_explicit_XA() && + thd->transaction.xid_state.get_state_code() == XA_PREPARED; +} + + /* We flush the cache wrapped in a beging/rollback if: . aborting a single or multi-statement transaction and; @@ -1992,7 +2075,50 @@ static bool trans_cannot_safely_rollback(THD *thd, bool all) thd->wsrep_binlog_format() == BINLOG_FORMAT_MIXED) || (trans_has_updated_non_trans_table(thd) && ending_single_stmt_trans(thd,all) && - thd->wsrep_binlog_format() == BINLOG_FORMAT_MIXED)); + thd->wsrep_binlog_format() == BINLOG_FORMAT_MIXED) || + is_prepared_xa(thd)); +} + + +/** + Specific log flusher invoked through log_xa_prepare(). +*/ +static int binlog_commit_flush_xa_prepare(THD *thd, bool all, + binlog_cache_mngr *cache_mngr) +{ + XID *xid= thd->transaction.xid_state.get_xid(); + { + // todo assert wsrep_simulate || is_open() + + /* + Log the XA END event first. + We don't do that in trans_xa_end() as XA COMMIT ONE PHASE + is logged as simple BEGIN/COMMIT so the XA END should + not get to the log. + */ + const char query[]= "XA END "; + const size_t q_len= sizeof(query) - 1; // do not count trailing 0 + char buf[q_len + ser_buf_size]; + size_t buflen; + binlog_cache_data *cache_data; + IO_CACHE *file; + + memcpy(buf, query, q_len); + buflen= q_len + + strlen(static_cast<event_xid_t*>(xid)->serialize(buf + q_len)); + cache_data= cache_mngr->get_binlog_cache_data(true); + file= &cache_data->cache_log; + thd->lex->sql_command= SQLCOM_XA_END; + Query_log_event xa_end(thd, buf, buflen, true, false, true, 0); + if (mysql_bin_log.write_event(&xa_end, cache_data, file)) + return 1; + thd->lex->sql_command= SQLCOM_XA_PREPARE; + } + + cache_mngr->using_xa= FALSE; + XA_prepare_log_event end_evt(thd, xid, FALSE); + + return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, TRUE)); } @@ -2019,7 +2145,11 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all) if (!cache_mngr) { - DBUG_ASSERT(WSREP(thd)); + DBUG_ASSERT(WSREP(thd) || + (thd->lex->sql_command != SQLCOM_XA_PREPARE && + !(thd->lex->sql_command == SQLCOM_XA_COMMIT && + thd->lex->xa_opt == XA_ONE_PHASE))); + DBUG_RETURN(0); } @@ -2038,7 +2168,8 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all) error= binlog_commit_flush_stmt_cache(thd, all, cache_mngr); } - if (cache_mngr->trx_cache.empty()) + if (cache_mngr->trx_cache.empty() && + thd->transaction.xid_state.get_state_code() != XA_PREPARED) { /* we're here because cache_log was flushed in MYSQL_BIN_LOG::log_xid() @@ -2055,8 +2186,11 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all) Otherwise, we accumulate the changes. */ if (likely(!error) && ending_trans(thd, all)) - error= binlog_commit_flush_trx_cache(thd, all, cache_mngr); - + { + error= is_preparing_xa(thd) ? + binlog_commit_flush_xa_prepare(thd, all, cache_mngr) : + binlog_commit_flush_trx_cache (thd, all, cache_mngr); + } /* This is part of the stmt rollback. */ @@ -2080,6 +2214,7 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all) static int binlog_rollback(handlerton *hton, THD *thd, bool all) { DBUG_ENTER("binlog_rollback"); + int error= 0; binlog_cache_mngr *const cache_mngr= (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); @@ -2087,6 +2222,8 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) if (!cache_mngr) { DBUG_ASSERT(WSREP(thd)); + DBUG_ASSERT(thd->lex->sql_command != SQLCOM_XA_ROLLBACK); + DBUG_RETURN(0); } @@ -2101,15 +2238,16 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) */ if (cache_mngr->stmt_cache.has_incident()) { - error= mysql_bin_log.write_incident(thd); + error |= static_cast<int>(mysql_bin_log.write_incident(thd)); cache_mngr->reset(true, false); } else if (!cache_mngr->stmt_cache.empty()) { - error= binlog_commit_flush_stmt_cache(thd, all, cache_mngr); + error |= binlog_commit_flush_stmt_cache(thd, all, cache_mngr); } - if (cache_mngr->trx_cache.empty()) + if (cache_mngr->trx_cache.empty() && + thd->transaction.xid_state.get_state_code() != XA_PREPARED) { /* we're here because cache_log was flushed in MYSQL_BIN_LOG::log_xid() @@ -7344,10 +7482,10 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, entry.all= all; entry.using_stmt_cache= using_stmt_cache; entry.using_trx_cache= using_trx_cache; - entry.need_unlog= false; + entry.need_unlog= is_preparing_xa(thd); ha_info= all ? thd->transaction.all.ha_list : thd->transaction.stmt.ha_list; - for (; ha_info; ha_info= ha_info->next()) + for (; !entry.need_unlog && ha_info; ha_info= ha_info->next()) { if (ha_info->is_started() && ha_info->ht() != binlog_hton && !ha_info->ht()->commit_checkpoint_request) @@ -7920,7 +8058,9 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) We already checked before that at least one cache is non-empty; if both are empty we would have skipped calling into here. */ - DBUG_ASSERT(!cache_mngr->stmt_cache.empty() || !cache_mngr->trx_cache.empty()); + DBUG_ASSERT(!cache_mngr->stmt_cache.empty() || + !cache_mngr->trx_cache.empty() || + current->thd->transaction.xid_state.is_explicit_XA()); if (unlikely((current->error= write_transaction_or_stmt(current, commit_id)))) @@ -7929,7 +8069,7 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) strmake_buf(cache_mngr->last_commit_pos_file, log_file_name); commit_offset= my_b_write_tell(&log_file); cache_mngr->last_commit_pos_offset= commit_offset; - if (cache_mngr->using_xa && cache_mngr->xa_xid) + if ((cache_mngr->using_xa && cache_mngr->xa_xid) || current->need_unlog) { /* If all storage engines support commit_checkpoint_request(), then we @@ -8164,7 +8304,8 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry, binlog_cache_mngr *mngr= entry->cache_mngr; DBUG_ENTER("MYSQL_BIN_LOG::write_transaction_or_stmt"); - if (write_gtid_event(entry->thd, false, entry->using_trx_cache, commit_id)) + if (write_gtid_event(entry->thd, is_prepared_xa(entry->thd), + entry->using_trx_cache, commit_id)) DBUG_RETURN(ER_ERROR_ON_WRITE); if (entry->using_stmt_cache && !mngr->stmt_cache.empty() && @@ -9868,6 +10009,46 @@ int TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid) DBUG_RETURN(BINLOG_COOKIE_GET_ERROR_FLAG(cookie)); } +static bool write_empty_xa_prepare(THD *thd, binlog_cache_mngr *cache_mngr) +{ + return binlog_commit_flush_xa_prepare(thd, true, cache_mngr); +} + +int TC_LOG_BINLOG::unlog_xa_prepare(THD *thd, bool all) +{ + DBUG_ASSERT(is_preparing_xa(thd)); + + binlog_cache_mngr *cache_mngr= thd->binlog_setup_trx_data(); + int cookie= 0; + + if (!cache_mngr->need_unlog) + { + Ha_trx_info *ha_info; + uint rw_count= ha_count_rw_all(thd, &ha_info); + bool rc= false; + + if (rw_count > 0) + { + /* an empty XA-prepare event group is logged */ +#ifndef DBUG_OFF + for (ha_info= thd->transaction.all.ha_list; rw_count > 1 && ha_info; + ha_info= ha_info->next()) + DBUG_ASSERT(ha_info->ht() != binlog_hton); +#endif + rc= write_empty_xa_prepare(thd, cache_mngr); // normally gains need_unlog + trans_register_ha(thd, true, binlog_hton, 0); // do it for future commmit + } + if (rw_count == 0 || !cache_mngr->need_unlog) + return rc; + } + + cookie= BINLOG_COOKIE_MAKE(cache_mngr->binlog_id, cache_mngr->delayed_error); + cache_mngr->need_unlog= false; + + return unlog(cookie, 1); +} + + void TC_LOG_BINLOG::commit_checkpoint_notify(void *cookie) { @@ -10184,6 +10365,7 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name, ((last_gtid_standalone && !ev->is_part_of_group(typ)) || (!last_gtid_standalone && (typ == XID_EVENT || + typ == XA_PREPARE_LOG_EVENT || (LOG_EVENT_IS_QUERY(typ) && (((Query_log_event *)ev)->is_commit() || ((Query_log_event *)ev)->is_rollback())))))) diff --git a/sql/log.h b/sql/log.h index e4424778111..41b25af0eaa 100644 --- a/sql/log.h +++ b/sql/log.h @@ -61,6 +61,7 @@ class TC_LOG bool need_prepare_ordered, bool need_commit_ordered) = 0; virtual int unlog(ulong cookie, my_xid xid)=0; + virtual int unlog_xa_prepare(THD *thd, bool all)= 0; virtual void commit_checkpoint_notify(void *cookie)= 0; protected: @@ -115,6 +116,10 @@ public: return 1; } int unlog(ulong cookie, my_xid xid) { return 0; } + int unlog_xa_prepare(THD *thd, bool all) + { + return 0; + } void commit_checkpoint_notify(void *cookie) { DBUG_ASSERT(0); }; }; @@ -198,6 +203,10 @@ class TC_LOG_MMAP: public TC_LOG int log_and_order(THD *thd, my_xid xid, bool all, bool need_prepare_ordered, bool need_commit_ordered); int unlog(ulong cookie, my_xid xid); + int unlog_xa_prepare(THD *thd, bool all) + { + return 0; + } void commit_checkpoint_notify(void *cookie); int recover(); @@ -697,6 +706,7 @@ public: int log_and_order(THD *thd, my_xid xid, bool all, bool need_prepare_ordered, bool need_commit_ordered); int unlog(ulong cookie, my_xid xid); + int unlog_xa_prepare(THD *thd, bool all); void commit_checkpoint_notify(void *cookie); int recover(LOG_INFO *linfo, const char *last_log_name, IO_CACHE *first_log, Format_description_log_event *fdle, bool do_xa); diff --git a/sql/log_event.cc b/sql/log_event.cc index 7ee38a007d1..d9537839f6c 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -18,7 +18,7 @@ #include "mariadb.h" #include "sql_priv.h" - +#include "handler.h" #ifndef MYSQL_CLIENT #include "unireg.h" #include "log_event.h" @@ -1182,6 +1182,9 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len, case XID_EVENT: ev = new Xid_log_event(buf, fdle); break; + case XA_PREPARE_LOG_EVENT: + ev = new XA_prepare_log_event(buf, fdle); + break; case RAND_EVENT: ev = new Rand_log_event(buf, fdle); break; @@ -1233,7 +1236,6 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len, case PREVIOUS_GTIDS_LOG_EVENT: case TRANSACTION_CONTEXT_EVENT: case VIEW_CHANGE_EVENT: - case XA_PREPARE_LOG_EVENT: ev= new Ignorable_log_event(buf, fdle, get_type_str((Log_event_type) event_type)); break; @@ -2073,6 +2075,7 @@ Format_description_log_event(uint8 binlog_ver, const char* server_ver) post_header_len[USER_VAR_EVENT-1]= USER_VAR_HEADER_LEN; post_header_len[FORMAT_DESCRIPTION_EVENT-1]= FORMAT_DESCRIPTION_HEADER_LEN; post_header_len[XID_EVENT-1]= XID_HEADER_LEN; + post_header_len[XA_PREPARE_LOG_EVENT-1]= XA_PREPARE_HEADER_LEN; post_header_len[BEGIN_LOAD_QUERY_EVENT-1]= BEGIN_LOAD_QUERY_HEADER_LEN; post_header_len[EXECUTE_LOAD_QUERY_EVENT-1]= EXECUTE_LOAD_QUERY_HEADER_LEN; /* @@ -2577,7 +2580,7 @@ Gtid_log_event::Gtid_log_event(const char *buf, uint event_len, buf+= 8; domain_id= uint4korr(buf); buf+= 4; - flags2= *buf; + flags2= *(buf++); if (flags2 & FL_GROUP_COMMIT_ID) { if (event_len < (uint)header_size + GTID_HEADER_LEN + 2) @@ -2585,8 +2588,21 @@ Gtid_log_event::Gtid_log_event(const char *buf, uint event_len, seq_no= 0; // So is_valid() returns false return; } - ++buf; commit_id= uint8korr(buf); + buf+= 8; + } + if (flags2 & (FL_PREPARED_XA | FL_COMPLETED_XA)) + { + xid.formatID= uint4korr(buf); + buf+= 4; + + xid.gtrid_length= (long) buf[0]; + xid.bqual_length= (long) buf[1]; + buf+= 2; + + long data_length= xid.bqual_length + xid.gtrid_length; + memcpy(xid.data, buf, data_length); + buf+= data_length; } } @@ -2773,7 +2789,7 @@ Rand_log_event::Rand_log_event(const char* buf, Xid_log_event:: Xid_log_event(const char* buf, const Format_description_log_event *description_event) - :Log_event(buf, description_event) + :Xid_apply_log_event(buf, description_event) { /* The Post-Header is empty. The Variable Data part begins immediately. */ buf+= description_event->common_header_len + @@ -2781,6 +2797,43 @@ Xid_log_event(const char* buf, memcpy((char*) &xid, buf, sizeof(xid)); } +/************************************************************************** + XA_prepare_log_event methods +**************************************************************************/ +XA_prepare_log_event:: +XA_prepare_log_event(const char* buf, + const Format_description_log_event *description_event) + :Xid_apply_log_event(buf, description_event) +{ + buf+= description_event->common_header_len + + description_event->post_header_len[XA_PREPARE_LOG_EVENT-1]; + one_phase= * (bool *) buf; + buf+= 1; + + m_xid.formatID= uint4korr(buf); + buf+= 4; + m_xid.gtrid_length= uint4korr(buf); + buf+= 4; + // Todo: validity here and elsewhere checks to be replaced by MDEV-21839 fixes + if (m_xid.gtrid_length <= 0 || m_xid.gtrid_length > MAXGTRIDSIZE) + { + m_xid.formatID= -1; + return; + } + m_xid.bqual_length= uint4korr(buf); + buf+= 4; + if (m_xid.bqual_length < 0 || m_xid.bqual_length > MAXBQUALSIZE) + { + m_xid.formatID= -1; + return; + } + DBUG_ASSERT(m_xid.gtrid_length + m_xid.bqual_length <= XIDDATASIZE); + + memcpy(m_xid.data, buf, m_xid.gtrid_length + m_xid.bqual_length); + + xid= NULL; +} + /************************************************************************** User_var_log_event methods diff --git a/sql/log_event.h b/sql/log_event.h index 67cf27b60a0..b634c6e2c94 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -222,6 +222,7 @@ class String; #define GTID_HEADER_LEN 19 #define GTID_LIST_HEADER_LEN 4 #define START_ENCRYPTION_HEADER_LEN 0 +#define XA_PREPARE_HEADER_LEN 0 /* Max number of possible extra bytes in a replication event compared to a @@ -664,6 +665,7 @@ enum Log_event_type /* MySQL 5.7 events, ignored by MariaDB */ TRANSACTION_CONTEXT_EVENT= 36, VIEW_CHANGE_EVENT= 37, + /* not ignored */ XA_PREPARE_LOG_EVENT= 38, /* @@ -3023,6 +3025,32 @@ private: #endif }; + +class Xid_apply_log_event: public Log_event +{ +public: +#ifdef MYSQL_SERVER + Xid_apply_log_event(THD* thd_arg): + Log_event(thd_arg, 0, TRUE) {} +#endif + Xid_apply_log_event(const char* buf, + const Format_description_log_event *description_event): + Log_event(buf, description_event) {} + + ~Xid_apply_log_event() {} + bool is_valid() const { return 1; } +private: +#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) + virtual int do_commit()= 0; + virtual int do_apply_event(rpl_group_info *rgi); + int do_record_gtid(THD *thd, rpl_group_info *rgi, bool in_trans, + void **out_hton); + enum_skip_reason do_shall_skip(rpl_group_info *rgi); + virtual const char* get_query()= 0; +#endif +}; + + /** @class Xid_log_event @@ -3035,18 +3063,22 @@ private: typedef ulonglong my_xid; // this line is the same as in handler.h #endif -class Xid_log_event: public Log_event +class Xid_log_event: public Xid_apply_log_event { - public: - my_xid xid; +public: + my_xid xid; #ifdef MYSQL_SERVER Xid_log_event(THD* thd_arg, my_xid x, bool direct): - Log_event(thd_arg, 0, TRUE), xid(x) + Xid_apply_log_event(thd_arg), xid(x) { if (direct) cache_type= Log_event::EVENT_NO_CACHE; } + const char* get_query() + { + return "COMMIT /* implicit, from Xid_log_event */"; + } #ifdef HAVE_REPLICATION void pack_info(Protocol* protocol); #endif /* HAVE_REPLICATION */ @@ -3062,15 +3094,172 @@ class Xid_log_event: public Log_event #ifdef MYSQL_SERVER bool write(); #endif - bool is_valid() const { return 1; } private: #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) - virtual int do_apply_event(rpl_group_info *rgi); - enum_skip_reason do_shall_skip(rpl_group_info *rgi); + int do_commit(); #endif }; + +/** + @class XA_prepare_log_event + + Similar to Xid_log_event except that + - it is specific to XA transaction + - it carries out the prepare logics rather than the final committing + when @c one_phase member is off. The latter option is only for + compatibility with the upstream. + + From the groupping perspective the event finalizes the current + "prepare" group that is started with Gtid_log_event similarly to the + regular replicated transaction. +*/ + +/** + Function serializes XID which is characterized by by four last arguments + of the function. + Serialized XID is presented in valid hex format and is returned to + the caller in a buffer pointed by the first argument. + The buffer size provived by the caller must be not less than + 8 + 2 * XIDDATASIZE + 4 * sizeof(XID::formatID) + 1, see + {MYSQL_,}XID definitions. + + @param buf pointer to a buffer allocated for storing serialized data + @param fmt formatID value + @param gln gtrid_length value + @param bln bqual_length value + @param dat data value + + @return the value of the buffer pointer +*/ + +inline char *serialize_xid(char *buf, long fmt, long gln, long bln, + const char *dat) +{ + int i; + char *c= buf; + /* + Build a string consisting of the hex format representation of XID + as passed through fmt,gln,bln,dat argument: + X'hex11hex12...hex1m',X'hex21hex22...hex2n',11 + and store it into buf. + */ + c[0]= 'X'; + c[1]= '\''; + c+= 2; + for (i= 0; i < gln; i++) + { + c[0]=_dig_vec_lower[((uchar*) dat)[i] >> 4]; + c[1]=_dig_vec_lower[((uchar*) dat)[i] & 0x0f]; + c+= 2; + } + c[0]= '\''; + c[1]= ','; + c[2]= 'X'; + c[3]= '\''; + c+= 4; + + for (; i < gln + bln; i++) + { + c[0]=_dig_vec_lower[((uchar*) dat)[i] >> 4]; + c[1]=_dig_vec_lower[((uchar*) dat)[i] & 0x0f]; + c+= 2; + } + c[0]= '\''; + sprintf(c+1, ",%lu", fmt); + + return buf; +} + +/* + The size of the string containing serialized Xid representation + is computed as a sum of + eight as the number of formatting symbols (X'',X'',) + plus 2 x XIDDATASIZE (2 due to hex format), + plus space for decimal digits of XID::formatID, + plus one for 0x0. +*/ +static const uint ser_buf_size= + 8 + 2 * MYSQL_XIDDATASIZE + 4 * sizeof(long) + 1; + +struct event_mysql_xid_t : MYSQL_XID +{ + char buf[ser_buf_size]; + char *serialize() + { + return serialize_xid(buf, formatID, gtrid_length, bqual_length, data); + } +}; + +#ifndef MYSQL_CLIENT +struct event_xid_t : XID +{ + char buf[ser_buf_size]; + + char *serialize(char *buf_arg) + { + return serialize_xid(buf_arg, formatID, gtrid_length, bqual_length, data); + } + char *serialize() + { + return serialize(buf); + } +}; +#endif + +class XA_prepare_log_event: public Xid_apply_log_event +{ +protected: + + /* Constant contributor to subheader in write() by members of XID struct. */ + static const int xid_subheader_no_data= 12; + event_mysql_xid_t m_xid; + void *xid; + bool one_phase; + +public: +#ifdef MYSQL_SERVER + XA_prepare_log_event(THD* thd_arg, XID *xid_arg, bool one_phase_arg): + Xid_apply_log_event(thd_arg), xid(xid_arg), one_phase(one_phase_arg) + { + cache_type= Log_event::EVENT_NO_CACHE; + } +#ifdef HAVE_REPLICATION + void pack_info(Protocol* protocol); +#endif /* HAVE_REPLICATION */ +#else + bool print(FILE* file, PRINT_EVENT_INFO* print_event_info); +#endif + XA_prepare_log_event(const char* buf, + const Format_description_log_event *description_event); + ~XA_prepare_log_event() {} + Log_event_type get_type_code() { return XA_PREPARE_LOG_EVENT; } + bool is_valid() const { return m_xid.formatID != -1; } + int get_data_size() + { + return xid_subheader_no_data + m_xid.gtrid_length + m_xid.bqual_length; + } + +#ifdef MYSQL_SERVER + bool write(); +#endif + +private: +#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) + char query[sizeof("XA COMMIT ONE PHASE") + 1 + ser_buf_size]; + int do_commit(); + const char* get_query() + { + sprintf(query, + (one_phase ? "XA COMMIT %s ONE PHASE" : "XA PREPARE %s"), + m_xid.serialize()); + return query; + } +#endif +}; + + /** @class User_var_log_event @@ -3383,8 +3572,12 @@ public: uint64 seq_no; uint64 commit_id; uint32 domain_id; +#ifdef MYSQL_SERVER + event_xid_t xid; +#else + event_mysql_xid_t xid; +#endif uchar flags2; - /* Flags2. */ /* FL_STANDALONE is set when there is no terminating COMMIT event. */ @@ -3411,6 +3604,10 @@ public: static const uchar FL_WAITED= 16; /* FL_DDL is set for event group containing DDL. */ static const uchar FL_DDL= 32; + /* FL_PREPARED_XA is set for XA transaction. */ + static const uchar FL_PREPARED_XA= 64; + /* FL_"COMMITTED or ROLLED-BACK"_XA is set for XA transaction. */ + static const uchar FL_COMPLETED_XA= 128; #ifdef MYSQL_SERVER Gtid_log_event(THD *thd_arg, uint64 seq_no, uint32 domain_id, bool standalone, diff --git a/sql/log_event_client.cc b/sql/log_event_client.cc index 6ee5587943d..7cb9c90eec6 100644 --- a/sql/log_event_client.cc +++ b/sql/log_event_client.cc @@ -3892,11 +3892,44 @@ Gtid_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info) buf, print_event_info->delimiter)) goto err; } - if (!(flags2 & FL_STANDALONE)) - if (my_b_printf(&cache, is_flashback ? "COMMIT\n%s\n" : "BEGIN\n%s\n", print_event_info->delimiter)) + if ((flags2 & FL_PREPARED_XA) && !is_flashback) + { + my_b_write_string(&cache, "XA START "); + xid.serialize(); + my_b_write(&cache, (uchar*) xid.buf, strlen(xid.buf)); + if (my_b_printf(&cache, "%s\n", print_event_info->delimiter)) + goto err; + } + else if (!(flags2 & FL_STANDALONE)) + { + if (my_b_printf(&cache, is_flashback ? "COMMIT\n%s\n" : "BEGIN\n%s\n", + print_event_info->delimiter)) goto err; + } return cache.flush_data(); err: return 1; } + +bool XA_prepare_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) +{ + Write_on_release_cache cache(&print_event_info->head_cache, file, + Write_on_release_cache::FLUSH_F, this); + m_xid.serialize(); + + if (!print_event_info->short_form) + { + print_header(&cache, print_event_info, FALSE); + if (my_b_printf(&cache, "\tXID = %s\n", m_xid.buf)) + goto error; + } + + if (my_b_printf(&cache, "XA PREPARE %s\n%s\n", + m_xid.buf, print_event_info->delimiter)) + goto error; + + return cache.flush_data(); +error: + return TRUE; +} diff --git a/sql/log_event_server.cc b/sql/log_event_server.cc index ba904a254e9..f7156013069 100644 --- a/sql/log_event_server.cc +++ b/sql/log_event_server.cc @@ -1487,6 +1487,7 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg, size_t que case SQLCOM_RELEASE_SAVEPOINT: case SQLCOM_ROLLBACK_TO_SAVEPOINT: case SQLCOM_SAVEPOINT: + case SQLCOM_XA_END: use_cache= trx_cache= TRUE; break; default: @@ -3218,6 +3219,18 @@ Gtid_log_event::Gtid_log_event(THD *thd_arg, uint64 seq_no_arg, /* Preserve any DDL or WAITED flag in the slave's binlog. */ if (thd_arg->rgi_slave) flags2|= (thd_arg->rgi_slave->gtid_ev_flags2 & (FL_DDL|FL_WAITED)); + + XID_STATE &xid_state= thd->transaction.xid_state; + if (is_transactional && xid_state.is_explicit_XA() && + (thd->lex->sql_command == SQLCOM_XA_PREPARE || + xid_state.get_state_code() == XA_PREPARED)) + { + DBUG_ASSERT(thd->lex->xa_opt != XA_ONE_PHASE); + + flags2|= thd->lex->sql_command == SQLCOM_XA_PREPARE ? + FL_PREPARED_XA : FL_COMPLETED_XA; + xid.set(xid_state.get_xid()); + } } @@ -3260,7 +3273,7 @@ Gtid_log_event::peek(const char *event_start, size_t event_len, bool Gtid_log_event::write() { - uchar buf[GTID_HEADER_LEN+2]; + uchar buf[GTID_HEADER_LEN+2+sizeof(XID)]; size_t write_len; int8store(buf, seq_no); @@ -3272,8 +3285,22 @@ Gtid_log_event::write() write_len= GTID_HEADER_LEN + 2; } else + write_len= 13; + + if (flags2 & (FL_PREPARED_XA | FL_COMPLETED_XA)) { - bzero(buf+13, GTID_HEADER_LEN-13); + int4store(&buf[write_len], xid.formatID); + buf[write_len +4]= (uchar) xid.gtrid_length; + buf[write_len +4+1]= (uchar) xid.bqual_length; + write_len+= 6; + long data_length= xid.bqual_length + xid.gtrid_length; + memcpy(buf+write_len, xid.data, data_length); + write_len+= data_length; + } + + if (write_len < GTID_HEADER_LEN) + { + bzero(buf+write_len, GTID_HEADER_LEN-write_len); write_len= GTID_HEADER_LEN; } return write_header(write_len) || @@ -3316,9 +3343,14 @@ Gtid_log_event::make_compatible_event(String *packet, bool *need_dummy_event, void Gtid_log_event::pack_info(Protocol *protocol) { - char buf[6+5+10+1+10+1+20+1+4+20+1]; + char buf[6+5+10+1+10+1+20+1+4+20+1+ ser_buf_size+5 /* sprintf */]; char *p; - p = strmov(buf, (flags2 & FL_STANDALONE ? "GTID " : "BEGIN GTID ")); + p = strmov(buf, (flags2 & FL_STANDALONE ? "GTID " : + flags2 & FL_PREPARED_XA ? "XA START " : "BEGIN GTID ")); + if (flags2 & FL_PREPARED_XA) + { + p+= sprintf(p, "%s GTID ", xid.serialize()); + } p= longlong10_to_str(domain_id, p, 10); *p++= '-'; p= longlong10_to_str(server_id, p, 10); @@ -3378,16 +3410,37 @@ Gtid_log_event::do_apply_event(rpl_group_info *rgi) bits|= (ulonglong)OPTION_RPL_SKIP_PARALLEL; thd->variables.option_bits= bits; DBUG_PRINT("info", ("Set OPTION_GTID_BEGIN")); - thd->set_query_and_id(gtid_begin_string, sizeof(gtid_begin_string)-1, - &my_charset_bin, next_query_id()); - thd->lex->sql_command= SQLCOM_BEGIN; thd->is_slave_error= 0; - status_var_increment(thd->status_var.com_stat[thd->lex->sql_command]); - if (trans_begin(thd, 0)) + + char buf_xa[sizeof("XA START") + 1 + ser_buf_size]; + if (flags2 & FL_PREPARED_XA) { - DBUG_PRINT("error", ("trans_begin() failed")); - thd->is_slave_error= 1; + const char fmt[]= "XA START %s"; + + thd->lex->xid= &xid; + thd->lex->xa_opt= XA_NONE; + sprintf(buf_xa, fmt, xid.serialize()); + thd->set_query_and_id(buf_xa, static_cast<uint32>(strlen(buf_xa)), + &my_charset_bin, next_query_id()); + thd->lex->sql_command= SQLCOM_XA_START; + if (trans_xa_start(thd)) + { + DBUG_PRINT("error", ("trans_xa_start() failed")); + thd->is_slave_error= 1; + } } + else + { + thd->set_query_and_id(gtid_begin_string, sizeof(gtid_begin_string)-1, + &my_charset_bin, next_query_id()); + thd->lex->sql_command= SQLCOM_BEGIN; + if (trans_begin(thd, 0)) + { + DBUG_PRINT("error", ("trans_begin() failed")); + thd->is_slave_error= 1; + } + } + status_var_increment(thd->status_var.com_stat[thd->lex->sql_command]); thd->update_stats(); if (likely(!thd->is_slave_error)) @@ -3771,46 +3824,58 @@ bool slave_execute_deferred_events(THD *thd) /************************************************************************** - Xid_log_event methods + Xid_apply_log_event methods **************************************************************************/ #if defined(HAVE_REPLICATION) -void Xid_log_event::pack_info(Protocol *protocol) + +int Xid_apply_log_event::do_record_gtid(THD *thd, rpl_group_info *rgi, + bool in_trans, void **out_hton) { - char buf[128], *pos; - pos= strmov(buf, "COMMIT /* xid="); - pos= longlong10_to_str(xid, pos, 10); - pos= strmov(pos, " */"); - protocol->store(buf, (uint) (pos-buf), &my_charset_bin); -} -#endif + int err= 0; + Relay_log_info const *rli= rgi->rli; + rgi->gtid_pending= false; + err= rpl_global_gtid_slave_state->record_gtid(thd, &rgi->current_gtid, + rgi->gtid_sub_id, + in_trans, false, out_hton); -bool Xid_log_event::write() -{ - DBUG_EXECUTE_IF("do_not_write_xid", return 0;); - return write_header(sizeof(xid)) || - write_data((uchar*)&xid, sizeof(xid)) || - write_footer(); -} + if (unlikely(err)) + { + int ec= thd->get_stmt_da()->sql_errno(); + /* + Do not report an error if this is really a kill due to a deadlock. + In this case, the transaction will be re-tried instead. + */ + if (!is_parallel_retry_error(rgi, ec)) + rli->report(ERROR_LEVEL, ER_CANNOT_UPDATE_GTID_STATE, rgi->gtid_info(), + "Error during XID COMMIT: failed to update GTID state in " + "%s.%s: %d: %s", + "mysql", rpl_gtid_slave_state_table_name.str, ec, + thd->get_stmt_da()->message()); + thd->is_slave_error= 1; + } + return err; +} -#if defined(HAVE_REPLICATION) -int Xid_log_event::do_apply_event(rpl_group_info *rgi) +int Xid_apply_log_event::do_apply_event(rpl_group_info *rgi) { bool res; int err; - rpl_gtid gtid; uint64 sub_id= 0; - Relay_log_info const *rli= rgi->rli; void *hton= NULL; + rpl_gtid gtid; /* - XID_EVENT works like a COMMIT statement. And it also updates the - mysql.gtid_slave_pos table with the GTID of the current transaction. - + An instance of this class such as XID_EVENT works like a COMMIT + statement. It updates mysql.gtid_slave_pos with the GTID of the + current transaction. Therefore, it acts much like a normal SQL statement, so we need to do THD::reset_for_next_command() as if starting a new statement. + + XA_PREPARE_LOG_EVENT also updates the gtid table *but* the update gets + committed as separate "autocommit" transaction. */ thd->reset_for_next_command(); /* @@ -3824,41 +3889,34 @@ int Xid_log_event::do_apply_event(rpl_group_info *rgi) if (rgi->gtid_pending) { sub_id= rgi->gtid_sub_id; - rgi->gtid_pending= false; - gtid= rgi->current_gtid; - err= rpl_global_gtid_slave_state->record_gtid(thd, >id, sub_id, true, - false, &hton); - if (unlikely(err)) + + if (!thd->transaction.xid_state.is_explicit_XA()) { - int ec= thd->get_stmt_da()->sql_errno(); - /* - Do not report an error if this is really a kill due to a deadlock. - In this case, the transaction will be re-tried instead. - */ - if (!is_parallel_retry_error(rgi, ec)) - rli->report(ERROR_LEVEL, ER_CANNOT_UPDATE_GTID_STATE, rgi->gtid_info(), - "Error during XID COMMIT: failed to update GTID state in " - "%s.%s: %d: %s", - "mysql", rpl_gtid_slave_state_table_name.str, ec, - thd->get_stmt_da()->message()); - thd->is_slave_error= 1; - return err; + if ((err= do_record_gtid(thd, rgi, true /* in_trans */, &hton))) + return err; + + DBUG_EXECUTE_IF("gtid_fail_after_record_gtid", + { + my_error(ER_ERROR_DURING_COMMIT, MYF(0), + HA_ERR_WRONG_COMMAND); + thd->is_slave_error= 1; + return 1; + }); } - - DBUG_EXECUTE_IF("gtid_fail_after_record_gtid", - { my_error(ER_ERROR_DURING_COMMIT, MYF(0), HA_ERR_WRONG_COMMAND); - thd->is_slave_error= 1; - return 1; - }); } - /* For a slave Xid_log_event is COMMIT */ - general_log_print(thd, COM_QUERY, - "COMMIT /* implicit, from Xid_log_event */"); + general_log_print(thd, COM_QUERY, get_query()); thd->variables.option_bits&= ~OPTION_GTID_BEGIN; - res= trans_commit(thd); /* Automatically rolls back on error. */ - thd->mdl_context.release_transactional_locks(); + res= do_commit(); + if (!res && rgi->gtid_pending) + { + DBUG_ASSERT(!thd->transaction.xid_state.is_explicit_XA()); + + if ((err= do_record_gtid(thd, rgi, false, &hton))) + return err; + } + #ifdef WITH_WSREP if (WSREP(thd)) mysql_mutex_lock(&thd->LOCK_thd_data); if ((!res || (WSREP(thd) && thd->wsrep_trx().state() == wsrep::transaction::s_must_replay )) && sub_id) @@ -3872,15 +3930,17 @@ int Xid_log_event::do_apply_event(rpl_group_info *rgi) /* Increment the global status commit count variable */ - status_var_increment(thd->status_var.com_stat[SQLCOM_COMMIT]); + enum enum_sql_command cmd= !thd->transaction.xid_state.is_explicit_XA() ? + SQLCOM_COMMIT : SQLCOM_XA_PREPARE; + status_var_increment(thd->status_var.com_stat[cmd]); return res; } Log_event::enum_skip_reason -Xid_log_event::do_shall_skip(rpl_group_info *rgi) +Xid_apply_log_event::do_shall_skip(rpl_group_info *rgi) { - DBUG_ENTER("Xid_log_event::do_shall_skip"); + DBUG_ENTER("Xid_apply_log_event::do_shall_skip"); if (rgi->rli->slave_skip_counter > 0) { DBUG_ASSERT(!rgi->rli->get_flag(Relay_log_info::IN_TRANSACTION)); @@ -3904,9 +3964,108 @@ Xid_log_event::do_shall_skip(rpl_group_info *rgi) #endif DBUG_RETURN(Log_event::do_shall_skip(rgi)); } +#endif /* HAVE_REPLICATION */ + +/************************************************************************** + Xid_log_event methods +**************************************************************************/ + +#if defined(HAVE_REPLICATION) +void Xid_log_event::pack_info(Protocol *protocol) +{ + char buf[128], *pos; + pos= strmov(buf, "COMMIT /* xid="); + pos= longlong10_to_str(xid, pos, 10); + pos= strmov(pos, " */"); + protocol->store(buf, (uint) (pos-buf), &my_charset_bin); +} + + +int Xid_log_event::do_commit() +{ + bool res; + res= trans_commit(thd); /* Automatically rolls back on error. */ + thd->mdl_context.release_transactional_locks(); + return res; +} +#endif + + +bool Xid_log_event::write() +{ + DBUG_EXECUTE_IF("do_not_write_xid", return 0;); + return write_header(sizeof(xid)) || + write_data((uchar*)&xid, sizeof(xid)) || + write_footer(); +} + +/************************************************************************** + XA_prepare_log_event methods +**************************************************************************/ + +#if defined(HAVE_REPLICATION) +void XA_prepare_log_event::pack_info(Protocol *protocol) +{ + char query[sizeof("XA COMMIT ONE PHASE") + 1 + ser_buf_size]; + + sprintf(query, + (one_phase ? "XA COMMIT %s ONE PHASE" : "XA PREPARE %s"), + m_xid.serialize()); + + protocol->store(query, strlen(query), &my_charset_bin); +} + + +int XA_prepare_log_event::do_commit() +{ + int res; + xid_t xid; + xid.set(m_xid.formatID, + m_xid.data, m_xid.gtrid_length, + m_xid.data + m_xid.gtrid_length, m_xid.bqual_length); + + thd->lex->xid= &xid; + if (!one_phase) + { + if ((res= thd->wait_for_prior_commit())) + return res; + + thd->lex->sql_command= SQLCOM_XA_PREPARE; + res= trans_xa_prepare(thd); + } + else + { + res= trans_xa_commit(thd); + thd->mdl_context.release_transactional_locks(); + } + + return res; +} #endif // HAVE_REPLICATION +bool XA_prepare_log_event::write() +{ + uchar data[1 + 4 + 4 + 4]= {one_phase,}; + uint8 one_phase_byte= one_phase; + + int4store(data+1, static_cast<XID*>(xid)->formatID); + int4store(data+(1+4), static_cast<XID*>(xid)->gtrid_length); + int4store(data+(1+4+4), static_cast<XID*>(xid)->bqual_length); + + DBUG_ASSERT(xid_subheader_no_data == sizeof(data) - 1); + + return write_header(sizeof(one_phase_byte) + xid_subheader_no_data + + static_cast<XID*>(xid)->gtrid_length + + static_cast<XID*>(xid)->bqual_length) || + write_data(data, sizeof(data)) || + write_data((uchar*) static_cast<XID*>(xid)->data, + static_cast<XID*>(xid)->gtrid_length + + static_cast<XID*>(xid)->bqual_length) || + write_footer(); +} + + /************************************************************************** User_var_log_event methods **************************************************************************/ @@ -8306,7 +8465,6 @@ bool event_that_should_be_ignored(const char *buf) event_type == PREVIOUS_GTIDS_LOG_EVENT || event_type == TRANSACTION_CONTEXT_EVENT || event_type == VIEW_CHANGE_EVENT || - event_type == XA_PREPARE_LOG_EVENT || (uint2korr(buf + FLAGS_OFFSET) & LOG_EVENT_IGNORABLE_F)) return 1; return 0; diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index f875ab2b4a4..6d89651b067 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -672,12 +672,14 @@ convert_kill_to_deadlock_error(rpl_group_info *rgi) static int is_group_ending(Log_event *ev, Log_event_type event_type) { - if (event_type == XID_EVENT) + if (event_type == XID_EVENT || event_type == XA_PREPARE_LOG_EVENT) return 1; if (event_type == QUERY_EVENT) // COMMIT/ROLLBACK are never compressed { Query_log_event *qev = (Query_log_event *)ev; - if (qev->is_commit()) + if (qev->is_commit() || + !strncmp(qev->query, STRING_WITH_LEN("XA COMMIT")) || + !strncmp(qev->query, STRING_WITH_LEN("XA ROLLBACK"))) return 1; if (qev->is_rollback()) return 2; @@ -2088,23 +2090,34 @@ rpl_parallel_thread_pool::release_thread(rpl_parallel_thread *rpt) and the LOCK_rpl_thread must be released with THD::EXIT_COND() instead of mysql_mutex_unlock. - If the flag `reuse' is set, the last worker thread will be returned again, + When `gtid_ev' is not NULL the last worker thread will be returned again, if it is still available. Otherwise a new worker thread is allocated. + + A worker for XA transaction is determined through xid hashing which + ensure for a XA-complete to be scheduled to the same-xid XA-prepare worker. */ rpl_parallel_thread * rpl_parallel_entry::choose_thread(rpl_group_info *rgi, bool *did_enter_cond, - PSI_stage_info *old_stage, bool reuse) + PSI_stage_info *old_stage, + Gtid_log_event *gtid_ev) { uint32 idx; Relay_log_info *rli= rgi->rli; rpl_parallel_thread *thr; idx= rpl_thread_idx; - if (!reuse) + if (gtid_ev) { - ++idx; - if (idx >= rpl_thread_max) - idx= 0; + if (gtid_ev->flags2 & + (Gtid_log_event::FL_COMPLETED_XA | Gtid_log_event::FL_PREPARED_XA)) + idx= my_hash_sort(&my_charset_bin, gtid_ev->xid.key(), + gtid_ev->xid.key_length()) % rpl_thread_max; + else + { + ++idx; + if (idx >= rpl_thread_max) + idx= 0; + } rpl_thread_idx= idx; } thr= rpl_threads[idx]; @@ -2662,7 +2675,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, else { DBUG_ASSERT(rli->gtid_skip_flag == GTID_SKIP_TRANSACTION); - if (typ == XID_EVENT || + if (typ == XID_EVENT || typ == XA_PREPARE_LOG_EVENT || (typ == QUERY_EVENT && // COMMIT/ROLLBACK are never compressed (((Query_log_event *)ev)->is_commit() || ((Query_log_event *)ev)->is_rollback()))) @@ -2673,10 +2686,11 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, } } + Gtid_log_event *gtid_ev= NULL; if (typ == GTID_EVENT) { rpl_gtid gtid; - Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev); + gtid_ev= static_cast<Gtid_log_event *>(ev); uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO || rli->mi->parallel_mode <= SLAVE_PARALLEL_MINIMAL ? 0 : gtid_ev->domain_id); @@ -2715,8 +2729,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, instead re-use a thread that we queued for previously. */ cur_thread= - e->choose_thread(serial_rgi, &did_enter_cond, &old_stage, - typ != GTID_EVENT); + e->choose_thread(serial_rgi, &did_enter_cond, &old_stage, gtid_ev); if (!cur_thread) { /* This means we were killed. The error is already signalled. */ @@ -2734,7 +2747,6 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev, if (typ == GTID_EVENT) { - Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev); bool new_gco; enum_slave_parallel_mode mode= rli->mi->parallel_mode; uchar gtid_flags= gtid_ev->flags2; diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h index 4579d0da9bc..9d9fa63125c 100644 --- a/sql/rpl_parallel.h +++ b/sql/rpl_parallel.h @@ -345,7 +345,8 @@ struct rpl_parallel_entry { group_commit_orderer *current_gco; rpl_parallel_thread * choose_thread(rpl_group_info *rgi, bool *did_enter_cond, - PSI_stage_info *old_stage, bool reuse); + PSI_stage_info *old_stage, + Gtid_log_event *gtid_ev); int queue_master_restart(rpl_group_info *rgi, Format_description_log_event *fdev); }; diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index 01093d9a6da..c8914fcb5b0 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -35,7 +35,7 @@ #include "sql_table.h" static int count_relay_log_space(Relay_log_info* rli); - +bool xa_trans_force_rollback(THD *thd); /** Current replication state (hash of last GTID executed, per replication domain). @@ -2234,6 +2234,13 @@ void rpl_group_info::cleanup_context(THD *thd, bool error) if (unlikely(error)) { + /* + trans_rollback above does not rollback XA transactions + (todo/fixme consider to do so. + */ + if (thd->transaction.xid_state.is_explicit_XA()) + xa_trans_force_rollback(thd); + thd->mdl_context.release_transactional_locks(); if (thd == rli->sql_driver_thd) diff --git a/sql/slave.cc b/sql/slave.cc index 436d5e0b5c5..e9af0f75186 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -4241,7 +4241,7 @@ inline void update_state_of_relay_log(Relay_log_info *rli, Log_event *ev) rli->clear_flag(Relay_log_info::IN_TRANSACTION); } } - if (typ == XID_EVENT) + if (typ == XID_EVENT || typ == XA_PREPARE_LOG_EVENT) rli->clear_flag(Relay_log_info::IN_TRANSACTION); if (typ == GTID_EVENT && !(((Gtid_log_event*) ev)->flags2 & Gtid_log_event::FL_STANDALONE)) @@ -7111,6 +7111,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) buf[EVENT_TYPE_OFFSET])) || (!mi->last_queued_gtid_standalone && ((uchar)buf[EVENT_TYPE_OFFSET] == XID_EVENT || + (uchar)buf[EVENT_TYPE_OFFSET] == XA_PREPARE_LOG_EVENT || ((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT && /* QUERY_COMPRESSED_EVENT would never be commmit or rollback */ Query_log_event::peek_is_commit_rollback(buf, event_len, checksum_alg)))))) diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index f11b3a35a80..b8de0d411e0 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -1652,7 +1652,7 @@ is_until_reached(binlog_send_info *info, ulong *ev_offset, return false; break; case GTID_UNTIL_STOP_AFTER_TRANSACTION: - if (event_type != XID_EVENT && + if (event_type != XID_EVENT && event_type != XA_PREPARE_LOG_EVENT && (event_type != QUERY_EVENT || /* QUERY_COMPRESSED_EVENT would never be commmit or rollback */ !Query_log_event::peek_is_commit_rollback (info->packet->ptr()+*ev_offset, @@ -1887,7 +1887,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type, info->gtid_skip_group= GTID_SKIP_NOT; return NULL; case GTID_SKIP_TRANSACTION: - if (event_type == XID_EVENT || + if (event_type == XID_EVENT || event_type == XA_PREPARE_LOG_EVENT || (event_type == QUERY_EVENT && /* QUERY_COMPRESSED_EVENT would never be commmit or rollback */ Query_log_event::peek_is_commit_rollback(packet->ptr() + ev_offset, len - ev_offset, diff --git a/sql/xa.cc b/sql/xa.cc index 4d9846d2f4d..cde6350f38d 100644 --- a/sql/xa.cc +++ b/sql/xa.cc @@ -22,12 +22,11 @@ #include <pfs_transaction_provider.h> #include <mysql/psi/mysql_transaction.h> +static bool slave_applier_reset_xa_trans(THD *thd); + /*************************************************************************** Handling of XA id caching ***************************************************************************/ -enum xa_states { XA_ACTIVE= 0, XA_IDLE, XA_PREPARED, XA_ROLLBACK_ONLY }; - - struct XID_cache_insert_element { enum xa_states xa_state; @@ -159,6 +158,12 @@ static LF_HASH xid_cache; static bool xid_cache_inited; +enum xa_states XID_STATE::get_state_code() const +{ + return xid_cache_element ? xid_cache_element->xa_state : XA_NO_STATE; +} + + bool THD::fix_xid_hash_pins() { if (!xid_hash_pins) @@ -177,9 +182,8 @@ void XID_STATE::set_error(uint error) void XID_STATE::er_xaer_rmfail() const { static const char *xa_state_names[]= - { "ACTIVE", "IDLE", "PREPARED", "ROLLBACK ONLY" }; - my_error(ER_XAER_RMFAIL, MYF(0), is_explicit_XA() ? - xa_state_names[xid_cache_element->xa_state] : "NON-EXISTING"); + { "ACTIVE", "IDLE", "PREPARED", "ROLLBACK ONLY", "NON-EXISTING"}; + my_error(ER_XAER_RMFAIL, MYF(0), xa_state_names[get_state_code()]); } @@ -381,7 +385,7 @@ static bool xa_trans_rolled_back(XID_cache_element *element) @return TRUE if the rollback failed, FALSE otherwise. */ -static bool xa_trans_force_rollback(THD *thd) +bool xa_trans_force_rollback(THD *thd) { bool rc= false; @@ -390,8 +394,8 @@ static bool xa_trans_force_rollback(THD *thd) my_error(ER_XAER_RMERR, MYF(0)); rc= true; } - - thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG); + thd->variables.option_bits&= + ~(OPTION_BEGIN | OPTION_KEEP_LOG | OPTION_GTID_BEGIN); thd->transaction.all.reset(); thd->server_status&= ~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY); @@ -500,6 +504,8 @@ bool trans_xa_end(THD *thd) bool trans_xa_prepare(THD *thd) { + int res= 1; + DBUG_ENTER("trans_xa_prepare"); if (!thd->transaction.xid_state.is_explicit_XA() || @@ -507,19 +513,41 @@ bool trans_xa_prepare(THD *thd) thd->transaction.xid_state.er_xaer_rmfail(); else if (!thd->transaction.xid_state.xid_cache_element->xid.eq(thd->lex->xid)) my_error(ER_XAER_NOTA, MYF(0)); - else if (ha_prepare(thd)) - { - xid_cache_delete(thd, &thd->transaction.xid_state); - my_error(ER_XA_RBROLLBACK, MYF(0)); - } else { - thd->transaction.xid_state.xid_cache_element->xa_state= XA_PREPARED; - MYSQL_SET_TRANSACTION_XA_STATE(thd->m_transaction_psi, XA_PREPARED); + /* + Acquire metadata lock which will ensure that COMMIT is blocked + by active FLUSH TABLES WITH READ LOCK (and vice versa COMMIT in + progress blocks FTWRL). + + We allow FLUSHer to COMMIT; we assume FLUSHer knows what it does. + */ + MDL_request mdl_request; + MDL_REQUEST_INIT(&mdl_request, MDL_key::BACKUP, "", "", MDL_BACKUP_COMMIT, + MDL_STATEMENT); + if (thd->mdl_context.acquire_lock(&mdl_request, + thd->variables.lock_wait_timeout) || + ha_prepare(thd)) + { + if (!mdl_request.ticket) + ha_rollback_trans(thd, TRUE); + thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG); + thd->transaction.all.reset(); + thd->server_status&= + ~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY); + xid_cache_delete(thd, &thd->transaction.xid_state); + my_error(ER_XA_RBROLLBACK, MYF(0)); + } + else + { + thd->transaction.xid_state.xid_cache_element->xa_state= XA_PREPARED; + MYSQL_SET_TRANSACTION_XA_STATE(thd->m_transaction_psi, XA_PREPARED); + res= thd->variables.pseudo_slave_mode || thd->slave_thread ? + slave_applier_reset_xa_trans(thd) : 0; + } } - DBUG_RETURN(thd->is_error() || - thd->transaction.xid_state.xid_cache_element->xa_state != XA_PREPARED); + DBUG_RETURN(res); } @@ -534,11 +562,13 @@ bool trans_xa_prepare(THD *thd) bool trans_xa_commit(THD *thd) { - bool res= TRUE; + bool res= true; + XID_STATE &xid_state= thd->transaction.xid_state; + DBUG_ENTER("trans_xa_commit"); - if (!thd->transaction.xid_state.is_explicit_XA() || - !thd->transaction.xid_state.xid_cache_element->xid.eq(thd->lex->xid)) + if (!xid_state.is_explicit_XA() || + !xid_state.xid_cache_element->xid.eq(thd->lex->xid)) { if (thd->in_multi_stmt_transaction_mode()) { @@ -567,7 +597,44 @@ bool trans_xa_commit(THD *thd) if (auto xs= xid_cache_search(thd, thd->lex->xid)) { res= xa_trans_rolled_back(xs); + /* + Acquire metadata lock which will ensure that COMMIT is blocked + by active FLUSH TABLES WITH READ LOCK (and vice versa COMMIT in + progress blocks FTWRL). + + We allow FLUSHer to COMMIT; we assume FLUSHer knows what it does. + */ + MDL_request mdl_request; + MDL_REQUEST_INIT(&mdl_request, MDL_key::BACKUP, "", "", MDL_BACKUP_COMMIT, + MDL_STATEMENT); + if (thd->mdl_context.acquire_lock(&mdl_request, + thd->variables.lock_wait_timeout)) + { + /* + We can't rollback an XA transaction on lock failure due to + Innodb redo log and bin log update is involved in rollback. + Return error to user for a retry. + */ + DBUG_ASSERT(thd->is_error()); + + xs->acquired_to_recovered(); + DBUG_RETURN(true); + } + DBUG_ASSERT(!xid_state.xid_cache_element); + + if (thd->wait_for_prior_commit()) + { + DBUG_ASSERT(thd->is_error()); + + xs->acquired_to_recovered(); + DBUG_RETURN(true); + } + + xid_state.xid_cache_element= xs; ha_commit_or_rollback_by_xid(thd->lex->xid, !res); + xid_state.xid_cache_element= 0; + + res= res || thd->is_error(); xid_cache_delete(thd, xs); } else @@ -575,12 +642,12 @@ bool trans_xa_commit(THD *thd) DBUG_RETURN(res); } - if (xa_trans_rolled_back(thd->transaction.xid_state.xid_cache_element)) + if (xa_trans_rolled_back(xid_state.xid_cache_element)) { xa_trans_force_rollback(thd); DBUG_RETURN(thd->is_error()); } - else if (thd->transaction.xid_state.xid_cache_element->xa_state == XA_IDLE && + else if (xid_state.xid_cache_element->xa_state == XA_IDLE && thd->lex->xa_opt == XA_ONE_PHASE) { int r= ha_commit_trans(thd, TRUE); @@ -609,15 +676,19 @@ bool trans_xa_commit(THD *thd) if (thd->mdl_context.acquire_lock(&mdl_request, thd->variables.lock_wait_timeout)) { - ha_rollback_trans(thd, TRUE); + /* + We can't rollback an XA transaction on lock failure due to + Innodb redo log and bin log update is involved in rollback. + Return error to user for a retry. + */ my_error(ER_XAER_RMERR, MYF(0)); + DBUG_RETURN(true); } else { DEBUG_SYNC(thd, "trans_xa_commit_after_acquire_commit_lock"); - res= MY_TEST(ha_commit_one_phase(thd, 1)); - if (res) + if ((res= MY_TEST(ha_commit_one_phase(thd, 1)))) my_error(ER_XAER_RMERR, MYF(0)); else { @@ -633,7 +704,7 @@ bool trans_xa_commit(THD *thd) } else { - thd->transaction.xid_state.er_xaer_rmfail(); + xid_state.er_xaer_rmfail(); DBUG_RETURN(TRUE); } @@ -642,7 +713,7 @@ bool trans_xa_commit(THD *thd) thd->server_status&= ~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY); DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS")); - xid_cache_delete(thd, &thd->transaction.xid_state); + xid_cache_delete(thd, &xid_state); trans_track_end_trx(thd); /* The transaction should be marked as complete in P_S. */ @@ -662,10 +733,12 @@ bool trans_xa_commit(THD *thd) bool trans_xa_rollback(THD *thd) { + XID_STATE &xid_state= thd->transaction.xid_state; + DBUG_ENTER("trans_xa_rollback"); - if (!thd->transaction.xid_state.is_explicit_XA() || - !thd->transaction.xid_state.xid_cache_element->xid.eq(thd->lex->xid)) + if (!xid_state.is_explicit_XA() || + !xid_state.xid_cache_element->xid.eq(thd->lex->xid)) { if (thd->in_multi_stmt_transaction_mode()) { @@ -680,8 +753,35 @@ bool trans_xa_rollback(THD *thd) if (auto xs= xid_cache_search(thd, thd->lex->xid)) { + MDL_request mdl_request; + MDL_REQUEST_INIT(&mdl_request, MDL_key::BACKUP, "", "", MDL_BACKUP_COMMIT, + MDL_STATEMENT); + if (thd->mdl_context.acquire_lock(&mdl_request, + thd->variables.lock_wait_timeout)) + { + /* + We can't rollback an XA transaction on lock failure due to + Innodb redo log and bin log update is involved in rollback. + Return error to user for a retry. + */ + DBUG_ASSERT(thd->is_error()); + + xs->acquired_to_recovered(); + DBUG_RETURN(true); + } xa_trans_rolled_back(xs); + DBUG_ASSERT(!xid_state.xid_cache_element); + + if (thd->wait_for_prior_commit()) + { + DBUG_ASSERT(thd->is_error()); + xs->acquired_to_recovered(); + DBUG_RETURN(true); + } + + xid_state.xid_cache_element= xs; ha_commit_or_rollback_by_xid(thd->lex->xid, 0); + xid_state.xid_cache_element= 0; xid_cache_delete(thd, xs); } else @@ -689,11 +789,27 @@ bool trans_xa_rollback(THD *thd) DBUG_RETURN(thd->get_stmt_da()->is_error()); } - if (thd->transaction.xid_state.xid_cache_element->xa_state == XA_ACTIVE) + if (xid_state.xid_cache_element->xa_state == XA_ACTIVE) { - thd->transaction.xid_state.er_xaer_rmfail(); + xid_state.er_xaer_rmfail(); DBUG_RETURN(TRUE); } + + MDL_request mdl_request; + MDL_REQUEST_INIT(&mdl_request, MDL_key::BACKUP, "", "", MDL_BACKUP_COMMIT, + MDL_STATEMENT); + if (thd->mdl_context.acquire_lock(&mdl_request, + thd->variables.lock_wait_timeout)) + { + /* + We can't rollback an XA transaction on lock failure due to + Innodb redo log and bin log update is involved in rollback. + Return error to user for a retry. + */ + my_error(ER_XAER_RMERR, MYF(0)); + DBUG_RETURN(true); + } + DBUG_RETURN(xa_trans_force_rollback(thd)); } @@ -701,11 +817,15 @@ bool trans_xa_rollback(THD *thd) bool trans_xa_detach(THD *thd) { DBUG_ASSERT(thd->transaction.xid_state.is_explicit_XA()); -#if 1 - return xa_trans_force_rollback(thd); -#else + if (thd->transaction.xid_state.xid_cache_element->xa_state != XA_PREPARED) return xa_trans_force_rollback(thd); + else if (!thd->transaction.all.is_trx_read_write()) + { + thd->transaction.xid_state.set_error(ER_XA_RBROLLBACK); + ha_rollback_trans(thd, true); + } + thd->transaction.xid_state.xid_cache_element->acquired_to_recovered(); thd->transaction.xid_state.xid_cache_element= 0; thd->transaction.cleanup(); @@ -722,7 +842,6 @@ bool trans_xa_detach(THD *thd) thd->transaction.all.ha_list= 0; thd->transaction.all.no_2pc= 0; return false; -#endif } @@ -916,3 +1035,45 @@ bool mysql_xa_recover(THD *thd) my_eof(thd); DBUG_RETURN(0); } + + +/** + This is a specific to (pseudo-) slave applier collection of standard cleanup + actions to reset XA transaction state sim to @c ha_commit_one_phase. + THD of the slave applier is dissociated from a transaction object in engine + that continues to exist there. + + @param THD current thread + @return the value of is_error() +*/ + +static bool slave_applier_reset_xa_trans(THD *thd) +{ + thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG); + thd->server_status&= + ~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY); + DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS")); + + thd->transaction.xid_state.xid_cache_element->acquired_to_recovered(); + thd->transaction.xid_state.xid_cache_element= 0; + + for (Ha_trx_info *ha_info= thd->transaction.all.ha_list, *ha_info_next; + ha_info; ha_info= ha_info_next) + { + ha_info_next= ha_info->next(); + ha_info->reset(); + } + thd->transaction.all.ha_list= 0; + + ha_close_connection(thd); + thd->transaction.cleanup(); + thd->transaction.all.reset(); + + DBUG_ASSERT(!thd->transaction.all.ha_list); + DBUG_ASSERT(!thd->transaction.all.no_2pc); + + thd->has_waiter= false; + MYSQL_COMMIT_TRANSACTION(thd->m_transaction_psi); // TODO/Fixme: commit? + thd->m_transaction_psi= NULL; + return thd->is_error(); +} @@ -1,3 +1,5 @@ +#ifndef XA_INCLUDED +#define XA_INCLUDED /* Copyright (c) 2000, 2016, Oracle and/or its affiliates. Copyright (c) 2009, 2019, MariaDB Corporation. @@ -16,8 +18,15 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */ - class XID_cache_element; +enum xa_states +{ + XA_ACTIVE= 0, + XA_IDLE, + XA_PREPARED, + XA_ROLLBACK_ONLY, + XA_NO_STATE +}; struct XID_STATE { XID_cache_element *xid_cache_element; @@ -27,6 +36,7 @@ struct XID_STATE { void set_error(uint error); void er_xaer_rmfail() const; XID *get_xid() const; + enum xa_states get_state_code() const; }; void xid_cache_init(void); @@ -42,3 +52,5 @@ bool trans_xa_commit(THD *thd); bool trans_xa_rollback(THD *thd); bool trans_xa_detach(THD *thd); bool mysql_xa_recover(THD *thd); + +#endif /* XA_INCLUDED */ |