diff options
Diffstat (limited to 'sql/log.cc')
-rw-r--r-- | sql/log.cc | 264 |
1 files changed, 223 insertions, 41 deletions
diff --git a/sql/log.cc b/sql/log.cc index 355118dc701..3e7f3a043c3 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -90,7 +90,13 @@ static bool binlog_savepoint_rollback_can_release_mdl(handlerton *hton, static int binlog_commit(handlerton *hton, THD *thd, bool all); static int binlog_rollback(handlerton *hton, THD *thd, bool all); static int binlog_prepare(handlerton *hton, THD *thd, bool all); +static int binlog_xa_recover_dummy(handlerton *hton, XID *xid_list, uint len); +static int binlog_commit_by_xid(handlerton *hton, XID *xid); +static int binlog_rollback_by_xid(handlerton *hton, XID *xid); static int binlog_start_consistent_snapshot(handlerton *hton, THD *thd); +static int binlog_flush_cache(THD *thd, binlog_cache_mngr *cache_mngr, + Log_event *end_ev, bool all, bool using_stmt, + bool using_trx); static const LEX_CSTRING write_error_msg= { STRING_WITH_LEN("error writing to the binary log") }; @@ -1692,6 +1698,10 @@ int binlog_init(void *p) { binlog_hton->prepare= binlog_prepare; binlog_hton->start_consistent_snapshot= binlog_start_consistent_snapshot; + binlog_hton->commit_by_xid= binlog_commit_by_xid; + binlog_hton->rollback_by_xid= binlog_rollback_by_xid; + // recover needs to be set to make xa{commit,rollback}_handlerton effective + binlog_hton->recover= binlog_xa_recover_dummy; } binlog_hton->flags= HTON_NOT_USER_SELECTABLE | HTON_HIDDEN; return 0; @@ -1765,7 +1775,8 @@ binlog_flush_cache(THD *thd, binlog_cache_mngr *cache_mngr, DBUG_PRINT("enter", ("end_ev: %p", end_ev)); if ((using_stmt && !cache_mngr->stmt_cache.empty()) || - (using_trx && !cache_mngr->trx_cache.empty())) + (using_trx && !cache_mngr->trx_cache.empty()) || + thd->transaction.xid_state.is_explicit_XA()) { if (using_stmt && thd->binlog_flush_pending_rows_event(TRUE, FALSE)) DBUG_RETURN(1); @@ -1837,6 +1848,17 @@ binlog_commit_flush_stmt_cache(THD *thd, bool all, DBUG_RETURN(binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, FALSE)); } + +inline size_t serialize_with_xid(XID *xid, char *buf, + const char *query, size_t q_len) +{ + memcpy(buf, query, q_len); + + return + q_len + strlen(static_cast<event_xid_t*>(xid)->serialize(buf + q_len)); +} + + /** This function flushes the trx-cache upon commit. @@ -1850,11 +1872,28 @@ static inline int binlog_commit_flush_trx_cache(THD *thd, bool all, binlog_cache_mngr *cache_mngr) { DBUG_ENTER("binlog_commit_flush_trx_cache"); - Query_log_event end_evt(thd, STRING_WITH_LEN("COMMIT"), - TRUE, TRUE, TRUE, 0); + + const char query[]= "XA COMMIT "; + const size_t q_len= sizeof(query) - 1; // do not count trailing 0 + char buf[q_len + ser_buf_size]= "COMMIT"; + size_t buflen= sizeof("COMMIT") - 1; + + if (thd->lex->sql_command == SQLCOM_XA_COMMIT && + thd->lex->xa_opt != XA_ONE_PHASE) + { + DBUG_ASSERT(thd->transaction.xid_state.is_explicit_XA()); + DBUG_ASSERT(thd->transaction.xid_state.get_state_code() == + XA_PREPARED); + + buflen= serialize_with_xid(thd->transaction.xid_state.get_xid(), + buf, query, q_len); + } + Query_log_event end_evt(thd, buf, buflen, TRUE, TRUE, TRUE, 0); + DBUG_RETURN(binlog_flush_cache(thd, cache_mngr, &end_evt, all, FALSE, TRUE)); } + /** This function flushes the trx-cache upon rollback. @@ -1868,8 +1907,20 @@ static inline int binlog_rollback_flush_trx_cache(THD *thd, bool all, binlog_cache_mngr *cache_mngr) { - Query_log_event end_evt(thd, STRING_WITH_LEN("ROLLBACK"), - TRUE, TRUE, TRUE, 0); + const char query[]= "XA ROLLBACK "; + const size_t q_len= sizeof(query) - 1; // do not count trailing 0 + char buf[q_len + ser_buf_size]= "ROLLBACK"; + size_t buflen= sizeof("ROLLBACK") - 1; + + if (thd->transaction.xid_state.is_explicit_XA()) + { + /* for not prepared use plain ROLLBACK */ + if (thd->transaction.xid_state.get_state_code() == XA_PREPARED) + buflen= serialize_with_xid(thd->transaction.xid_state.get_xid(), + buf, query, q_len); + } + Query_log_event end_evt(thd, buf, buflen, TRUE, TRUE, TRUE, 0); + return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, FALSE, TRUE)); } @@ -1887,23 +1938,10 @@ static inline int binlog_commit_flush_xid_caches(THD *thd, binlog_cache_mngr *cache_mngr, bool all, my_xid xid) { - if (xid) - { - Xid_log_event end_evt(thd, xid, TRUE); - return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, TRUE)); - } - else - { - /* - Empty xid occurs in XA COMMIT ... ONE PHASE. - In this case, we do not have a MySQL xid for the transaction, and the - external XA transaction coordinator will have to handle recovery if - needed. So we end the transaction with a plain COMMIT query event. - */ - Query_log_event end_evt(thd, STRING_WITH_LEN("COMMIT"), - TRUE, TRUE, TRUE, 0); - return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, TRUE)); - } + DBUG_ASSERT(xid); // replaced former treatment of ONE-PHASE XA + + Xid_log_event end_evt(thd, xid, TRUE); + return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, TRUE)); } /** @@ -1959,17 +1997,62 @@ binlog_truncate_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr, bool all) DBUG_RETURN(error); } + +inline bool is_preparing_xa(THD *thd) +{ + return + thd->transaction.xid_state.is_explicit_XA() && + thd->lex->sql_command == SQLCOM_XA_PREPARE; +} + + static int binlog_prepare(handlerton *hton, THD *thd, bool all) { - /* - do nothing. - just pretend we can do 2pc, so that MySQL won't - switch to 1pc. - real work will be done in MYSQL_BIN_LOG::log_and_order() - */ + /* Do nothing unless the transaction is a user XA. */ + return is_preparing_xa(thd) ? binlog_commit(NULL, thd, all) : 0; +} + + +static int binlog_xa_recover_dummy(handlerton *hton __attribute__((unused)), + XID *xid_list __attribute__((unused)), + uint len __attribute__((unused))) +{ + /* Does nothing. */ return 0; } + +static int binlog_commit_by_xid(handlerton *hton, XID *xid) +{ + THD *thd= current_thd; + + (void) thd->binlog_setup_trx_data(); + + DBUG_ASSERT(thd->lex->sql_command == SQLCOM_XA_COMMIT); + + return binlog_commit(hton, thd, TRUE); +} + + +static int binlog_rollback_by_xid(handlerton *hton, XID *xid) +{ + THD *thd= current_thd; + + (void) thd->binlog_setup_trx_data(); + + DBUG_ASSERT(thd->lex->sql_command == SQLCOM_XA_ROLLBACK || + (thd->transaction.xid_state.get_state_code() == XA_ROLLBACK_ONLY)); + return binlog_rollback(hton, thd, TRUE); +} + + +inline bool is_prepared_xa(THD *thd) +{ + return thd->transaction.xid_state.is_explicit_XA() && + thd->transaction.xid_state.get_state_code() == XA_PREPARED; +} + + /* We flush the cache wrapped in a beging/rollback if: . aborting a single or multi-statement transaction and; @@ -1992,7 +2075,50 @@ static bool trans_cannot_safely_rollback(THD *thd, bool all) thd->wsrep_binlog_format() == BINLOG_FORMAT_MIXED) || (trans_has_updated_non_trans_table(thd) && ending_single_stmt_trans(thd,all) && - thd->wsrep_binlog_format() == BINLOG_FORMAT_MIXED)); + thd->wsrep_binlog_format() == BINLOG_FORMAT_MIXED) || + is_prepared_xa(thd)); +} + + +/** + Specific log flusher invoked through log_xa_prepare(). +*/ +static int binlog_commit_flush_xa_prepare(THD *thd, bool all, + binlog_cache_mngr *cache_mngr) +{ + XID *xid= thd->transaction.xid_state.get_xid(); + { + // todo assert wsrep_simulate || is_open() + + /* + Log the XA END event first. + We don't do that in trans_xa_end() as XA COMMIT ONE PHASE + is logged as simple BEGIN/COMMIT so the XA END should + not get to the log. + */ + const char query[]= "XA END "; + const size_t q_len= sizeof(query) - 1; // do not count trailing 0 + char buf[q_len + ser_buf_size]; + size_t buflen; + binlog_cache_data *cache_data; + IO_CACHE *file; + + memcpy(buf, query, q_len); + buflen= q_len + + strlen(static_cast<event_xid_t*>(xid)->serialize(buf + q_len)); + cache_data= cache_mngr->get_binlog_cache_data(true); + file= &cache_data->cache_log; + thd->lex->sql_command= SQLCOM_XA_END; + Query_log_event xa_end(thd, buf, buflen, true, false, true, 0); + if (mysql_bin_log.write_event(&xa_end, cache_data, file)) + return 1; + thd->lex->sql_command= SQLCOM_XA_PREPARE; + } + + cache_mngr->using_xa= FALSE; + XA_prepare_log_event end_evt(thd, xid, FALSE); + + return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, TRUE)); } @@ -2019,7 +2145,11 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all) if (!cache_mngr) { - DBUG_ASSERT(WSREP(thd)); + DBUG_ASSERT(WSREP(thd) || + (thd->lex->sql_command != SQLCOM_XA_PREPARE && + !(thd->lex->sql_command == SQLCOM_XA_COMMIT && + thd->lex->xa_opt == XA_ONE_PHASE))); + DBUG_RETURN(0); } @@ -2038,7 +2168,8 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all) error= binlog_commit_flush_stmt_cache(thd, all, cache_mngr); } - if (cache_mngr->trx_cache.empty()) + if (cache_mngr->trx_cache.empty() && + thd->transaction.xid_state.get_state_code() != XA_PREPARED) { /* we're here because cache_log was flushed in MYSQL_BIN_LOG::log_xid() @@ -2055,8 +2186,11 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all) Otherwise, we accumulate the changes. */ if (likely(!error) && ending_trans(thd, all)) - error= binlog_commit_flush_trx_cache(thd, all, cache_mngr); - + { + error= is_preparing_xa(thd) ? + binlog_commit_flush_xa_prepare(thd, all, cache_mngr) : + binlog_commit_flush_trx_cache (thd, all, cache_mngr); + } /* This is part of the stmt rollback. */ @@ -2080,6 +2214,7 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all) static int binlog_rollback(handlerton *hton, THD *thd, bool all) { DBUG_ENTER("binlog_rollback"); + int error= 0; binlog_cache_mngr *const cache_mngr= (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); @@ -2087,6 +2222,8 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) if (!cache_mngr) { DBUG_ASSERT(WSREP(thd)); + DBUG_ASSERT(thd->lex->sql_command != SQLCOM_XA_ROLLBACK); + DBUG_RETURN(0); } @@ -2101,15 +2238,16 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) */ if (cache_mngr->stmt_cache.has_incident()) { - error= mysql_bin_log.write_incident(thd); + error |= static_cast<int>(mysql_bin_log.write_incident(thd)); cache_mngr->reset(true, false); } else if (!cache_mngr->stmt_cache.empty()) { - error= binlog_commit_flush_stmt_cache(thd, all, cache_mngr); + error |= binlog_commit_flush_stmt_cache(thd, all, cache_mngr); } - if (cache_mngr->trx_cache.empty()) + if (cache_mngr->trx_cache.empty() && + thd->transaction.xid_state.get_state_code() != XA_PREPARED) { /* we're here because cache_log was flushed in MYSQL_BIN_LOG::log_xid() @@ -7344,10 +7482,10 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, entry.all= all; entry.using_stmt_cache= using_stmt_cache; entry.using_trx_cache= using_trx_cache; - entry.need_unlog= false; + entry.need_unlog= is_preparing_xa(thd); ha_info= all ? thd->transaction.all.ha_list : thd->transaction.stmt.ha_list; - for (; ha_info; ha_info= ha_info->next()) + for (; !entry.need_unlog && ha_info; ha_info= ha_info->next()) { if (ha_info->is_started() && ha_info->ht() != binlog_hton && !ha_info->ht()->commit_checkpoint_request) @@ -7920,7 +8058,9 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) We already checked before that at least one cache is non-empty; if both are empty we would have skipped calling into here. */ - DBUG_ASSERT(!cache_mngr->stmt_cache.empty() || !cache_mngr->trx_cache.empty()); + DBUG_ASSERT(!cache_mngr->stmt_cache.empty() || + !cache_mngr->trx_cache.empty() || + current->thd->transaction.xid_state.is_explicit_XA()); if (unlikely((current->error= write_transaction_or_stmt(current, commit_id)))) @@ -7929,7 +8069,7 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) strmake_buf(cache_mngr->last_commit_pos_file, log_file_name); commit_offset= my_b_write_tell(&log_file); cache_mngr->last_commit_pos_offset= commit_offset; - if (cache_mngr->using_xa && cache_mngr->xa_xid) + if ((cache_mngr->using_xa && cache_mngr->xa_xid) || current->need_unlog) { /* If all storage engines support commit_checkpoint_request(), then we @@ -8164,7 +8304,8 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry, binlog_cache_mngr *mngr= entry->cache_mngr; DBUG_ENTER("MYSQL_BIN_LOG::write_transaction_or_stmt"); - if (write_gtid_event(entry->thd, false, entry->using_trx_cache, commit_id)) + if (write_gtid_event(entry->thd, is_prepared_xa(entry->thd), + entry->using_trx_cache, commit_id)) DBUG_RETURN(ER_ERROR_ON_WRITE); if (entry->using_stmt_cache && !mngr->stmt_cache.empty() && @@ -9868,6 +10009,46 @@ int TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid) DBUG_RETURN(BINLOG_COOKIE_GET_ERROR_FLAG(cookie)); } +static bool write_empty_xa_prepare(THD *thd, binlog_cache_mngr *cache_mngr) +{ + return binlog_commit_flush_xa_prepare(thd, true, cache_mngr); +} + +int TC_LOG_BINLOG::unlog_xa_prepare(THD *thd, bool all) +{ + DBUG_ASSERT(is_preparing_xa(thd)); + + binlog_cache_mngr *cache_mngr= thd->binlog_setup_trx_data(); + int cookie= 0; + + if (!cache_mngr->need_unlog) + { + Ha_trx_info *ha_info; + uint rw_count= ha_count_rw_all(thd, &ha_info); + bool rc= false; + + if (rw_count > 0) + { + /* an empty XA-prepare event group is logged */ +#ifndef DBUG_OFF + for (ha_info= thd->transaction.all.ha_list; rw_count > 1 && ha_info; + ha_info= ha_info->next()) + DBUG_ASSERT(ha_info->ht() != binlog_hton); +#endif + rc= write_empty_xa_prepare(thd, cache_mngr); // normally gains need_unlog + trans_register_ha(thd, true, binlog_hton, 0); // do it for future commmit + } + if (rw_count == 0 || !cache_mngr->need_unlog) + return rc; + } + + cookie= BINLOG_COOKIE_MAKE(cache_mngr->binlog_id, cache_mngr->delayed_error); + cache_mngr->need_unlog= false; + + return unlog(cookie, 1); +} + + void TC_LOG_BINLOG::commit_checkpoint_notify(void *cookie) { @@ -10184,6 +10365,7 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name, ((last_gtid_standalone && !ev->is_part_of_group(typ)) || (!last_gtid_standalone && (typ == XID_EVENT || + typ == XA_PREPARE_LOG_EVENT || (LOG_EVENT_IS_QUERY(typ) && (((Query_log_event *)ev)->is_commit() || ((Query_log_event *)ev)->is_rollback())))))) |