diff options
Diffstat (limited to 'sql/log.cc')
-rw-r--r-- | sql/log.cc | 838 |
1 files changed, 537 insertions, 301 deletions
diff --git a/sql/log.cc b/sql/log.cc index 93ed664bff9..3cfe6bbfd37 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -1,5 +1,5 @@ /* Copyright (c) 2000, 2018, Oracle and/or its affiliates. - Copyright (c) 2009, 2020, MariaDB Corporation. + Copyright (c) 2009, 2021, MariaDB Corporation. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -34,7 +34,6 @@ #include "sql_parse.h" // command_name #include "sql_time.h" // calc_time_from_sec, my_time_compare #include "tztime.h" // my_tz_OFFSET0, struct Time_zone -#include "sql_acl.h" // SUPER_ACL #include "log_event.h" // Query_log_event #include "rpl_filter.h" #include "rpl_rli.h" @@ -91,7 +90,13 @@ static bool binlog_savepoint_rollback_can_release_mdl(handlerton *hton, static int binlog_commit(handlerton *hton, THD *thd, bool all); static int binlog_rollback(handlerton *hton, THD *thd, bool all); static int binlog_prepare(handlerton *hton, THD *thd, bool all); +static int binlog_xa_recover_dummy(handlerton *hton, XID *xid_list, uint len); +static int binlog_commit_by_xid(handlerton *hton, XID *xid); +static int binlog_rollback_by_xid(handlerton *hton, XID *xid); static int binlog_start_consistent_snapshot(handlerton *hton, THD *thd); +static int binlog_flush_cache(THD *thd, binlog_cache_mngr *cache_mngr, + Log_event *end_ev, bool all, bool using_stmt, + bool using_trx); static const LEX_CSTRING write_error_msg= { STRING_WITH_LEN("error writing to the binary log") }; @@ -253,7 +258,7 @@ void make_default_log_name(char **out, const char* log_ext, bool once) else { my_free(*out); - *out= my_strdup(buff, MYF(MY_WME)); + *out= my_strdup(PSI_INSTRUMENT_ME, buff, MYF(MY_WME)); } } @@ -388,12 +393,12 @@ public: In the future, we can refactor this and change it to avoid the set_binlog_info. */ - DBUG_ASSERT(saved_max_binlog_cache_size == 0 && - param_max_binlog_cache_size != 0 && - ptr_binlog_cache_use == 0 && - param_ptr_binlog_cache_use != 0 && - ptr_binlog_cache_disk_use == 0 && - param_ptr_binlog_cache_disk_use != 0); + DBUG_ASSERT(saved_max_binlog_cache_size == 0); + DBUG_ASSERT(param_max_binlog_cache_size != 0); + DBUG_ASSERT(ptr_binlog_cache_use == 0); + DBUG_ASSERT(param_ptr_binlog_cache_use != 0); + DBUG_ASSERT(ptr_binlog_cache_disk_use == 0); + DBUG_ASSERT(param_ptr_binlog_cache_disk_use != 0); saved_max_binlog_cache_size= param_max_binlog_cache_size; ptr_binlog_cache_use= param_ptr_binlog_cache_use; @@ -509,6 +514,12 @@ void Log_event_writer::add_status(enum_logged_status status) cache_data->add_status(status); } +void Log_event_writer::set_incident() +{ + cache_data->set_incident(); +} + + class binlog_cache_mngr { public: binlog_cache_mngr(my_off_t param_max_binlog_stmt_cache_size, @@ -711,7 +722,6 @@ bool Log_to_csv_event_handler:: uint field_index; Silence_log_table_errors error_handler; Open_tables_backup open_tables_backup; - ulonglong save_thd_options; bool save_time_zone_used; DBUG_ENTER("log_general"); @@ -721,9 +731,6 @@ bool Log_to_csv_event_handler:: */ save_time_zone_used= thd->time_zone_used; - save_thd_options= thd->variables.option_bits; - thd->variables.option_bits&= ~OPTION_BIN_LOG; - table_list.init_one_table(&MYSQL_SCHEMA_NAME, &GENERAL_LOG_NAME, 0, TL_WRITE_CONCURRENT_INSERT); @@ -771,7 +778,7 @@ bool Log_to_csv_event_handler:: DBUG_ASSERT(table->field[0]->type() == MYSQL_TYPE_TIMESTAMP); - ((Field_timestamp*) table->field[0])->store_TIME( + table->field[0]->store_timestamp( hrtime_to_my_time(event_time), hrtime_sec_part(event_time)); /* do a write */ @@ -803,7 +810,6 @@ bool Log_to_csv_event_handler:: table->field[field_index]->set_default(); } - /* log table entries are not replicated */ if (table->file->ha_write_row(table->record[0])) goto err; @@ -824,7 +830,6 @@ err: if (need_close) close_log_table(thd, &open_tables_backup); - thd->variables.option_bits= save_thd_options; thd->time_zone_used= save_time_zone_used; DBUG_RETURN(result); } @@ -877,7 +882,6 @@ bool Log_to_csv_event_handler:: ulong lock_time= (ulong) MY_MIN(lock_utime/1000000, TIME_MAX_VALUE_SECONDS); ulong query_time_micro= (ulong) (query_utime % 1000000); ulong lock_time_micro= (ulong) (lock_utime % 1000000); - DBUG_ENTER("Log_to_csv_event_handler::log_slow"); thd->push_internal_handler(& error_handler); @@ -912,7 +916,7 @@ bool Log_to_csv_event_handler:: /* store the time and user values */ DBUG_ASSERT(table->field[0]->type() == MYSQL_TYPE_TIMESTAMP); - ((Field_timestamp*) table->field[0])->store_TIME( + table->field[0]->store_timestamp( hrtime_to_my_time(current_time), hrtime_sec_part(current_time)); if (table->field[1]->store(user_host, user_host_len, client_cs)) goto err; @@ -994,7 +998,6 @@ bool Log_to_csv_event_handler:: 0, TRUE)) goto err; - /* log table entries are not replicated */ if (table->file->ha_write_row(table->record[0])) goto err; @@ -1354,7 +1357,7 @@ bool LOGGER::slow_log_print(THD *thd, const char *query, size_t query_length, my_hrtime_t current_time= { hrtime_from_time(thd->start_time) + thd->start_time_sec_part + query_utime }; - if (!query) + if (!query || thd->get_command() == COM_STMT_PREPARE) { is_command= TRUE; query= command_name[thd->get_command()].str; @@ -1683,9 +1686,6 @@ binlog_trans_log_truncate(THD *thd, my_off_t pos) int binlog_init(void *p) { binlog_hton= (handlerton *)p; - binlog_hton->state= (WSREP_ON || opt_bin_log) ? SHOW_OPTION_YES - : SHOW_OPTION_NO; - binlog_hton->db_type=DB_TYPE_BINLOG; binlog_hton->savepoint_offset= sizeof(my_off_t); binlog_hton->close_connection= binlog_close_connection; binlog_hton->savepoint_set= binlog_savepoint_set; @@ -1694,9 +1694,17 @@ int binlog_init(void *p) binlog_savepoint_rollback_can_release_mdl; binlog_hton->commit= binlog_commit; binlog_hton->rollback= binlog_rollback; - binlog_hton->prepare= binlog_prepare; - binlog_hton->start_consistent_snapshot= binlog_start_consistent_snapshot; - binlog_hton->flags= HTON_NOT_USER_SELECTABLE | HTON_HIDDEN; + binlog_hton->drop_table= [](handlerton *, const char*) { return -1; }; + if (WSREP_ON || opt_bin_log) + { + binlog_hton->prepare= binlog_prepare; + binlog_hton->start_consistent_snapshot= binlog_start_consistent_snapshot; + binlog_hton->commit_by_xid= binlog_commit_by_xid; + binlog_hton->rollback_by_xid= binlog_rollback_by_xid; + // recover needs to be set to make xa{commit,rollback}_handlerton effective + binlog_hton->recover= binlog_xa_recover_dummy; + } + binlog_hton->flags= HTON_NOT_USER_SELECTABLE | HTON_HIDDEN | HTON_NO_ROLLBACK; return 0; } @@ -1725,8 +1733,8 @@ static int binlog_close_connection(handlerton *hton, THD *thd) if (len > 0) wsrep_dump_rbr_buf(thd, buf, len); } #endif /* WITH_WSREP */ - DBUG_ASSERT(cache_mngr->trx_cache.empty() && cache_mngr->stmt_cache.empty()); - thd_set_ha_data(thd, binlog_hton, NULL); + DBUG_ASSERT(cache_mngr->trx_cache.empty()); + DBUG_ASSERT(cache_mngr->stmt_cache.empty()); cache_mngr->~binlog_cache_mngr(); my_free(cache_mngr); DBUG_RETURN(0); @@ -1768,7 +1776,8 @@ binlog_flush_cache(THD *thd, binlog_cache_mngr *cache_mngr, DBUG_PRINT("enter", ("end_ev: %p", end_ev)); if ((using_stmt && !cache_mngr->stmt_cache.empty()) || - (using_trx && !cache_mngr->trx_cache.empty())) + (using_trx && !cache_mngr->trx_cache.empty()) || + thd->transaction->xid_state.is_explicit_XA()) { if (using_stmt && thd->binlog_flush_pending_rows_event(TRUE, FALSE)) DBUG_RETURN(1); @@ -1805,8 +1814,8 @@ binlog_flush_cache(THD *thd, binlog_cache_mngr *cache_mngr, } cache_mngr->reset(using_stmt, using_trx); - DBUG_ASSERT((!using_stmt || cache_mngr->stmt_cache.empty()) && - (!using_trx || cache_mngr->trx_cache.empty())); + DBUG_ASSERT(!using_stmt || cache_mngr->stmt_cache.empty()); + DBUG_ASSERT(!using_trx || cache_mngr->trx_cache.empty()); DBUG_RETURN(error); } @@ -1840,6 +1849,17 @@ binlog_commit_flush_stmt_cache(THD *thd, bool all, DBUG_RETURN(binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, FALSE)); } + +inline size_t serialize_with_xid(XID *xid, char *buf, + const char *query, size_t q_len) +{ + memcpy(buf, query, q_len); + + return + q_len + strlen(static_cast<event_xid_t*>(xid)->serialize(buf + q_len)); +} + + /** This function flushes the trx-cache upon commit. @@ -1853,11 +1873,28 @@ static inline int binlog_commit_flush_trx_cache(THD *thd, bool all, binlog_cache_mngr *cache_mngr) { DBUG_ENTER("binlog_commit_flush_trx_cache"); - Query_log_event end_evt(thd, STRING_WITH_LEN("COMMIT"), - TRUE, TRUE, TRUE, 0); + + const char query[]= "XA COMMIT "; + const size_t q_len= sizeof(query) - 1; // do not count trailing 0 + char buf[q_len + ser_buf_size]= "COMMIT"; + size_t buflen= sizeof("COMMIT") - 1; + + if (thd->lex->sql_command == SQLCOM_XA_COMMIT && + thd->lex->xa_opt != XA_ONE_PHASE) + { + DBUG_ASSERT(thd->transaction->xid_state.is_explicit_XA()); + DBUG_ASSERT(thd->transaction->xid_state.get_state_code() == + XA_PREPARED); + + buflen= serialize_with_xid(thd->transaction->xid_state.get_xid(), + buf, query, q_len); + } + Query_log_event end_evt(thd, buf, buflen, TRUE, TRUE, TRUE, 0); + DBUG_RETURN(binlog_flush_cache(thd, cache_mngr, &end_evt, all, FALSE, TRUE)); } + /** This function flushes the trx-cache upon rollback. @@ -1871,8 +1908,20 @@ static inline int binlog_rollback_flush_trx_cache(THD *thd, bool all, binlog_cache_mngr *cache_mngr) { - Query_log_event end_evt(thd, STRING_WITH_LEN("ROLLBACK"), - TRUE, TRUE, TRUE, 0); + const char query[]= "XA ROLLBACK "; + const size_t q_len= sizeof(query) - 1; // do not count trailing 0 + char buf[q_len + ser_buf_size]= "ROLLBACK"; + size_t buflen= sizeof("ROLLBACK") - 1; + + if (thd->transaction->xid_state.is_explicit_XA()) + { + /* for not prepared use plain ROLLBACK */ + if (thd->transaction->xid_state.get_state_code() == XA_PREPARED) + buflen= serialize_with_xid(thd->transaction->xid_state.get_xid(), + buf, query, q_len); + } + Query_log_event end_evt(thd, buf, buflen, TRUE, TRUE, TRUE, 0); + return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, FALSE, TRUE)); } @@ -1890,23 +1939,10 @@ static inline int binlog_commit_flush_xid_caches(THD *thd, binlog_cache_mngr *cache_mngr, bool all, my_xid xid) { - if (xid) - { - Xid_log_event end_evt(thd, xid, TRUE); - return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, TRUE)); - } - else - { - /* - Empty xid occurs in XA COMMIT ... ONE PHASE. - In this case, we do not have a MySQL xid for the transaction, and the - external XA transaction coordinator will have to handle recovery if - needed. So we end the transaction with a plain COMMIT query event. - */ - Query_log_event end_evt(thd, STRING_WITH_LEN("COMMIT"), - TRUE, TRUE, TRUE, 0); - return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, TRUE)); - } + DBUG_ASSERT(xid); // replaced former treatment of ONE-PHASE XA + + Xid_log_event end_evt(thd, xid, TRUE); + return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, TRUE)); } /** @@ -1947,7 +1983,7 @@ binlog_truncate_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr, bool all) if (cache_mngr->trx_cache.has_incident()) error= mysql_bin_log.write_incident(thd); - thd->clear_binlog_table_maps(); + thd->reset_binlog_for_next_statement(); cache_mngr->reset(false, true); } @@ -1962,17 +1998,62 @@ binlog_truncate_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr, bool all) DBUG_RETURN(error); } + +inline bool is_preparing_xa(THD *thd) +{ + return + thd->transaction->xid_state.is_explicit_XA() && + thd->lex->sql_command == SQLCOM_XA_PREPARE; +} + + static int binlog_prepare(handlerton *hton, THD *thd, bool all) { - /* - do nothing. - just pretend we can do 2pc, so that MySQL won't - switch to 1pc. - real work will be done in MYSQL_BIN_LOG::log_and_order() - */ + /* Do nothing unless the transaction is a user XA. */ + return is_preparing_xa(thd) ? binlog_commit(NULL, thd, all) : 0; +} + + +static int binlog_xa_recover_dummy(handlerton *hton __attribute__((unused)), + XID *xid_list __attribute__((unused)), + uint len __attribute__((unused))) +{ + /* Does nothing. */ return 0; } + +static int binlog_commit_by_xid(handlerton *hton, XID *xid) +{ + THD *thd= current_thd; + + (void) thd->binlog_setup_trx_data(); + + DBUG_ASSERT(thd->lex->sql_command == SQLCOM_XA_COMMIT); + + return binlog_commit(hton, thd, TRUE); +} + + +static int binlog_rollback_by_xid(handlerton *hton, XID *xid) +{ + THD *thd= current_thd; + + (void) thd->binlog_setup_trx_data(); + + DBUG_ASSERT(thd->lex->sql_command == SQLCOM_XA_ROLLBACK || + (thd->transaction->xid_state.get_state_code() == XA_ROLLBACK_ONLY)); + return binlog_rollback(hton, thd, TRUE); +} + + +inline bool is_prepared_xa(THD *thd) +{ + return thd->transaction->xid_state.is_explicit_XA() && + thd->transaction->xid_state.get_state_code() == XA_PREPARED; +} + + /* We flush the cache wrapped in a beging/rollback if: . aborting a single or multi-statement transaction and; @@ -1995,7 +2076,50 @@ static bool trans_cannot_safely_rollback(THD *thd, bool all) thd->wsrep_binlog_format() == BINLOG_FORMAT_MIXED) || (trans_has_updated_non_trans_table(thd) && ending_single_stmt_trans(thd,all) && - thd->wsrep_binlog_format() == BINLOG_FORMAT_MIXED)); + thd->wsrep_binlog_format() == BINLOG_FORMAT_MIXED) || + is_prepared_xa(thd)); +} + + +/** + Specific log flusher invoked through log_xa_prepare(). +*/ +static int binlog_commit_flush_xa_prepare(THD *thd, bool all, + binlog_cache_mngr *cache_mngr) +{ + XID *xid= thd->transaction->xid_state.get_xid(); + { + // todo assert wsrep_simulate || is_open() + + /* + Log the XA END event first. + We don't do that in trans_xa_end() as XA COMMIT ONE PHASE + is logged as simple BEGIN/COMMIT so the XA END should + not get to the log. + */ + const char query[]= "XA END "; + const size_t q_len= sizeof(query) - 1; // do not count trailing 0 + char buf[q_len + ser_buf_size]; + size_t buflen; + binlog_cache_data *cache_data; + IO_CACHE *file; + + memcpy(buf, query, q_len); + buflen= q_len + + strlen(static_cast<event_xid_t*>(xid)->serialize(buf + q_len)); + cache_data= cache_mngr->get_binlog_cache_data(true); + file= &cache_data->cache_log; + thd->lex->sql_command= SQLCOM_XA_END; + Query_log_event xa_end(thd, buf, buflen, true, false, true, 0); + if (mysql_bin_log.write_event(&xa_end, cache_data, file)) + return 1; + thd->lex->sql_command= SQLCOM_XA_PREPARE; + } + + cache_mngr->using_xa= FALSE; + XA_prepare_log_event end_evt(thd, xid, FALSE); + + return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, TRUE)); } @@ -2022,16 +2146,26 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all) if (!cache_mngr) { - DBUG_ASSERT(WSREP(thd)); + DBUG_ASSERT(WSREP(thd) || + (thd->lex->sql_command != SQLCOM_XA_PREPARE && + !(thd->lex->sql_command == SQLCOM_XA_COMMIT && + thd->lex->xa_opt == XA_ONE_PHASE))); + DBUG_RETURN(0); } + /* + This is true if we are doing an alter table that is replicated as + CREATE TABLE ... SELECT + */ + if (thd->variables.option_bits & OPTION_BIN_COMMIT_OFF) + DBUG_RETURN(0); DBUG_PRINT("debug", ("all: %d, in_transaction: %s, all.modified_non_trans_table: %s, stmt.modified_non_trans_table: %s", all, YESNO(thd->in_multi_stmt_transaction_mode()), - YESNO(thd->transaction.all.modified_non_trans_table), - YESNO(thd->transaction.stmt.modified_non_trans_table))); + YESNO(thd->transaction->all.modified_non_trans_table), + YESNO(thd->transaction->stmt.modified_non_trans_table))); thd->backup_stage(&org_stage); @@ -2041,7 +2175,8 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all) error= binlog_commit_flush_stmt_cache(thd, all, cache_mngr); } - if (cache_mngr->trx_cache.empty()) + if (cache_mngr->trx_cache.empty() && + thd->transaction->xid_state.get_state_code() != XA_PREPARED) { /* we're here because cache_log was flushed in MYSQL_BIN_LOG::log_xid() @@ -2058,8 +2193,11 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all) Otherwise, we accumulate the changes. */ if (likely(!error) && ending_trans(thd, all)) - error= binlog_commit_flush_trx_cache(thd, all, cache_mngr); - + { + error= is_preparing_xa(thd) ? + binlog_commit_flush_xa_prepare(thd, all, cache_mngr) : + binlog_commit_flush_trx_cache (thd, all, cache_mngr); + } /* This is part of the stmt rollback. */ @@ -2083,6 +2221,7 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all) static int binlog_rollback(handlerton *hton, THD *thd, bool all) { DBUG_ENTER("binlog_rollback"); + int error= 0; binlog_cache_mngr *const cache_mngr= (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); @@ -2090,13 +2229,15 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) if (!cache_mngr) { DBUG_ASSERT(WSREP(thd)); + DBUG_ASSERT(thd->lex->sql_command != SQLCOM_XA_ROLLBACK); + DBUG_RETURN(0); } DBUG_PRINT("debug", ("all: %s, all.modified_non_trans_table: %s, stmt.modified_non_trans_table: %s", YESNO(all), - YESNO(thd->transaction.all.modified_non_trans_table), - YESNO(thd->transaction.stmt.modified_non_trans_table))); + YESNO(thd->transaction->all.modified_non_trans_table), + YESNO(thd->transaction->stmt.modified_non_trans_table))); /* If an incident event is set we do not flush the content of the statement @@ -2104,20 +2245,22 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) */ if (cache_mngr->stmt_cache.has_incident()) { - error= mysql_bin_log.write_incident(thd); + error |= static_cast<int>(mysql_bin_log.write_incident(thd)); cache_mngr->reset(true, false); } else if (!cache_mngr->stmt_cache.empty()) { - error= binlog_commit_flush_stmt_cache(thd, all, cache_mngr); + error |= binlog_commit_flush_stmt_cache(thd, all, cache_mngr); } - if (cache_mngr->trx_cache.empty()) + if (cache_mngr->trx_cache.empty() && + thd->transaction->xid_state.get_state_code() != XA_PREPARED) { /* we're here because cache_log was flushed in MYSQL_BIN_LOG::log_xid() */ cache_mngr->reset(false, true); + thd->reset_binlog_for_next_statement(); DBUG_RETURN(error); } if (!wsrep_emulate_bin_log && mysql_bin_log.check_write_error(thd)) @@ -2162,6 +2305,7 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) */ if (!all) cache_mngr->trx_cache.set_prev_position(MY_OFF_T_UNDEF); + thd->reset_binlog_for_next_statement(); DBUG_RETURN(error); } @@ -2214,8 +2358,8 @@ void MYSQL_BIN_LOG::set_write_error(THD *thd, bool is_transactional) if (WSREP_EMULATE_BINLOG(thd)) { if (is_transactional) - trans_register_ha(thd, TRUE, binlog_hton); - trans_register_ha(thd, FALSE, binlog_hton); + trans_register_ha(thd, TRUE, binlog_hton, 0); + trans_register_ha(thd, FALSE, binlog_hton, 0); } #endif /* WITH_WSREP */ DBUG_VOID_RETURN; @@ -2337,14 +2481,14 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv) /* When a SAVEPOINT is executed inside a stored function/trigger we force the - pending event to be flushed with a STMT_END_F flag and clear the table maps + pending event to be flushed with a STMT_END_F flag and reset binlog as well to ensure that following DMLs will have a clean state to start with. ROLLBACK inside a stored routine has to finalize possibly existing current row-based pending event with cleaning up table maps. That ensures that following DMLs will have a clean state to start with. */ if (thd->in_sub_stmt) - thd->clear_binlog_table_maps(); + thd->reset_binlog_for_next_statement(); DBUG_RETURN(0); } @@ -2409,8 +2553,8 @@ File open_binlog(IO_CACHE *log, const char *log_file_name, const char **errmsg) *errmsg = "Could not open log file"; goto err; } - if (init_io_cache(log, file, (size_t)binlog_file_cache_size, READ_CACHE, 0, 0, - MYF(MY_WME|MY_DONT_CHECK_FILESIZE))) + if (init_io_cache_ext(log, file, (size_t)binlog_file_cache_size, READ_CACHE, + 0, 0, MYF(MY_WME|MY_DONT_CHECK_FILESIZE), key_file_binlog_cache)) { sql_print_error("Failed to create a cache on log (file '%s')", log_file_name); @@ -2588,24 +2732,14 @@ end: } -void MYSQL_LOG::init(enum_log_type log_type_arg, - enum cache_type io_cache_type_arg) -{ - DBUG_ENTER("MYSQL_LOG::init"); - log_type= log_type_arg; - io_cache_type= io_cache_type_arg; - DBUG_PRINT("info",("log_type: %d", log_type)); - DBUG_VOID_RETURN; -} - - bool MYSQL_LOG::init_and_set_log_file_name(const char *log_name, const char *new_name, ulong next_log_number, enum_log_type log_type_arg, enum cache_type io_cache_type_arg) { - init(log_type_arg, io_cache_type_arg); + log_type= log_type_arg; + io_cache_type= io_cache_type_arg; if (new_name) { @@ -2659,7 +2793,7 @@ bool MYSQL_LOG::open( write_error= 0; - if (!(name= my_strdup(log_name, MYF(MY_WME)))) + if (!(name= my_strdup(key_memory_MYSQL_LOG_name, log_name, MYF(MY_WME)))) { name= (char *)log_name; // for the error message goto err; @@ -2702,7 +2836,9 @@ bool MYSQL_LOG::open( else if ((seek_offset= mysql_file_tell(file, MYF(MY_WME)))) goto err; - if (init_io_cache(&log_file, file, IO_SIZE, io_cache_type, seek_offset, 0, + if (init_io_cache(&log_file, file, (log_type == LOG_NORMAL ? IO_SIZE : + LOG_BIN_IO_SIZE), + io_cache_type, seek_offset, 0, MYF(MY_WME | MY_NABP | ((log_type == LOG_BIN) ? MY_WAIT_IF_FULL : 0)))) goto err; @@ -3399,10 +3535,11 @@ bool MYSQL_BIN_LOG::open_index_file(const char *index_file_name_arg, O_RDWR | O_CREAT | O_BINARY | O_CLOEXEC, MYF(MY_WME))) < 0 || mysql_file_sync(index_file_nr, MYF(MY_WME)) || - init_io_cache(&index_file, index_file_nr, + init_io_cache_ext(&index_file, index_file_nr, IO_SIZE, WRITE_CACHE, mysql_file_seek(index_file_nr, 0L, MY_SEEK_END, MYF(0)), - 0, MYF(MY_WME | MY_WAIT_IF_FULL)) || + 0, MYF(MY_WME | MY_WAIT_IF_FULL), + m_key_file_log_index_cache) || DBUG_EVALUATE_IF("fault_injection_openning_index", 1, 0)) { /* @@ -3456,7 +3593,6 @@ bool MYSQL_BIN_LOG::open_index_file(const char *index_file_name_arg, */ bool MYSQL_BIN_LOG::open(const char *log_name, - enum_log_type log_type_arg, const char *new_name, ulong next_log_number, enum cache_type io_cache_type_arg, @@ -3467,7 +3603,6 @@ bool MYSQL_BIN_LOG::open(const char *log_name, File file= -1; xid_count_per_binlog *new_xid_list_entry= NULL, *b; DBUG_ENTER("MYSQL_BIN_LOG::open"); - DBUG_PRINT("enter",("log_type: %d",(int) log_type_arg)); mysql_mutex_assert_owner(&LOCK_log); @@ -3487,7 +3622,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name, /* We need to calculate new log file name for purge to delete old */ if (init_and_set_log_file_name(log_name, new_name, next_log_number, - log_type_arg, io_cache_type_arg)) + LOG_BIN, io_cache_type_arg)) { sql_print_error("MYSQL_BIN_LOG::open failed to generate new file name."); if (!is_relay_log) @@ -4317,7 +4452,7 @@ bool MYSQL_BIN_LOG::reset_logs(THD *thd, bool create_new_log, } } if (create_new_log && !open_index_file(index_file_name, 0, FALSE)) - if (unlikely((error= open(save_name, log_type, 0, next_log_number, + if (unlikely((error= open(save_name, 0, next_log_number, io_cache_type, max_size, 0, FALSE)))) goto err; my_free((void *) save_name); @@ -4454,14 +4589,16 @@ int MYSQL_BIN_LOG::purge_first_log(Relay_log_info* rli, bool included) { rli->last_inuse_relaylog= NULL; included= 1; - to_purge_if_included= my_strdup(ir->name, MYF(0)); + to_purge_if_included= my_strdup(key_memory_Relay_log_info_group_relay_log_name, + ir->name, MYF(0)); } rli->free_inuse_relaylog(ir); ir= next; } rli->inuse_relaylog_list= ir; if (ir) - to_purge_if_included= my_strdup(ir->name, MYF(0)); + to_purge_if_included= my_strdup(key_memory_Relay_log_info_group_relay_log_name, + ir->name, MYF(0)); /* Read the next log file name from the index file and pass it back to @@ -5054,55 +5191,6 @@ MYSQL_BIN_LOG::is_xidlist_idle_nolock() return true; } -#ifdef WITH_WSREP -inline bool -is_gtid_cached_internal(IO_CACHE *file) -{ - uchar data[EVENT_TYPE_OFFSET+1]; - bool result= false; - my_off_t write_pos= my_b_tell(file); - if (reinit_io_cache(file, READ_CACHE, 0, 0, 0)) - return false; - /* - In the cache we have gtid event if , below condition is true, - */ - my_b_read(file, data, sizeof(data)); - uint event_type= (uchar)data[EVENT_TYPE_OFFSET]; - if (event_type == GTID_LOG_EVENT) - result= true; - /* - Cleanup , Why because we have not read the full buffer - and this will cause next to next reinit_io_cache(called in write_cache) - to make cache empty. - */ - file->read_pos= file->read_end; - if (reinit_io_cache(file, WRITE_CACHE, write_pos, 0, 0)) - return false; - return result; -} -#endif - -#ifdef WITH_WSREP -inline bool -MYSQL_BIN_LOG::is_gtid_cached(THD *thd) -{ - binlog_cache_mngr *mngr= (binlog_cache_mngr *) thd_get_ha_data( - thd, binlog_hton); - if (!mngr) - return false; - binlog_cache_data *cache_trans= mngr->get_binlog_cache_data( - use_trans_cache(thd, true)); - binlog_cache_data *cache_stmt= mngr->get_binlog_cache_data( - use_trans_cache(thd, false)); - if (cache_trans && !cache_trans->empty() && - is_gtid_cached_internal(&cache_trans->cache_log)) - return true; - if (cache_stmt && !cache_stmt->empty() && - is_gtid_cached_internal(&cache_stmt->cache_log)) - return true; - return false; -} -#endif /** Create a new log file name. @@ -5225,34 +5313,33 @@ int MYSQL_BIN_LOG::new_file_impl() } new_name_ptr=new_name; - if (log_type == LOG_BIN) { + /* + We log the whole file name for log file as the user may decide + to change base names at some point. + */ + Rotate_log_event r(new_name + dirname_length(new_name), 0, LOG_EVENT_OFFSET, + is_relay_log ? Rotate_log_event::RELAY_LOG : 0); + /* + The current relay-log's closing Rotate event must have checksum + value computed with an algorithm of the last relay-logged FD event. + */ + if (is_relay_log) + r.checksum_alg= relay_log_checksum_alg; + DBUG_ASSERT(!is_relay_log || + relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF); + if (DBUG_EVALUATE_IF("fault_injection_new_file_rotate_event", + (error= close_on_error= TRUE), FALSE) || + (error= write_event(&r))) { - /* - We log the whole file name for log file as the user may decide - to change base names at some point. - */ - Rotate_log_event r(new_name+dirname_length(new_name), 0, LOG_EVENT_OFFSET, - is_relay_log ? Rotate_log_event::RELAY_LOG : 0); - /* - The current relay-log's closing Rotate event must have checksum - value computed with an algorithm of the last relay-logged FD event. - */ - if (is_relay_log) - r.checksum_alg= relay_log_checksum_alg; - DBUG_ASSERT(!is_relay_log || relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF); - if(DBUG_EVALUATE_IF("fault_injection_new_file_rotate_event", (error=close_on_error=TRUE), FALSE) || - (error= write_event(&r))) - { - DBUG_EXECUTE_IF("fault_injection_new_file_rotate_event", errno=2;); - close_on_error= TRUE; - my_printf_error(ER_ERROR_ON_WRITE, - ER_THD_OR_DEFAULT(current_thd, ER_CANT_OPEN_FILE), - MYF(ME_FATAL), name, errno); - goto end; - } - bytes_written += r.data_written; + DBUG_EXECUTE_IF("fault_injection_new_file_rotate_event", errno= 2;); + close_on_error= TRUE; + my_printf_error(ER_ERROR_ON_WRITE, + ER_THD_OR_DEFAULT(current_thd, ER_CANT_OPEN_FILE), + MYF(ME_FATAL), name, errno); + goto end; } + bytes_written+= r.data_written; } /* @@ -5283,7 +5370,7 @@ int MYSQL_BIN_LOG::new_file_impl() delay_close= true; } close(close_flag); - if (log_type == LOG_BIN && checksum_alg_reset != BINLOG_CHECKSUM_ALG_UNDEF) + if (checksum_alg_reset != BINLOG_CHECKSUM_ALG_UNDEF) { DBUG_ASSERT(!is_relay_log); DBUG_ASSERT(binlog_checksum_options != checksum_alg_reset); @@ -5310,8 +5397,7 @@ int MYSQL_BIN_LOG::new_file_impl() { /* reopen the binary log file. */ file_to_open= new_name_ptr; - error= open(old_name, log_type, new_name_ptr, 0, io_cache_type, - max_size, 1, FALSE); + error= open(old_name, new_name_ptr, 0, io_cache_type, max_size, 1, FALSE); } /* handle reopening errors */ @@ -5366,7 +5452,10 @@ bool MYSQL_BIN_LOG::write_event(Log_event *ev, binlog_cache_data *cache_data, { Log_event_writer writer(file, 0, &crypto); if (crypto.scheme && file == &log_file) + { writer.ctx= alloca(crypto.ctx_size); + writer.set_encrypted_writer(); + } if (cache_data) cache_data->add_status(ev->logged_status()); return writer.write(ev); @@ -5534,17 +5623,19 @@ trans_has_updated_trans_table(const THD* thd) @param thd The client thread that executed the current statement. @return - @c true if a transactional table was updated, @c false otherwise. + @c true if a transactional table with rollback was updated, + @c false otherwise. */ bool stmt_has_updated_trans_table(const THD *thd) { Ha_trx_info *ha_info; - for (ha_info= thd->transaction.stmt.ha_list; ha_info; + for (ha_info= thd->transaction->stmt.ha_list; ha_info; ha_info= ha_info->next()) { - if (ha_info->is_trx_read_write() && ha_info->ht() != binlog_hton) + if (ha_info->is_trx_read_write() && + !(ha_info->ht()->flags & HTON_NO_ROLLBACK)) return (TRUE); } return (FALSE); @@ -5620,8 +5711,8 @@ bool ending_single_stmt_trans(THD* thd, const bool all) */ bool trans_has_updated_non_trans_table(const THD* thd) { - return (thd->transaction.all.modified_non_trans_table || - thd->transaction.stmt.modified_non_trans_table); + return (thd->transaction->all.modified_non_trans_table || + thd->transaction->stmt.modified_non_trans_table); } /** @@ -5634,7 +5725,7 @@ bool trans_has_updated_non_trans_table(const THD* thd) */ bool stmt_has_updated_non_trans_table(const THD* thd) { - return (thd->transaction.stmt.modified_non_trans_table); + return (thd->transaction->stmt.modified_non_trans_table); } /* @@ -5651,7 +5742,8 @@ binlog_cache_mngr *THD::binlog_setup_trx_data() if (cache_mngr) DBUG_RETURN(cache_mngr); // Already set up - cache_mngr= (binlog_cache_mngr*) my_malloc(sizeof(binlog_cache_mngr), MYF(MY_ZEROFILL)); + cache_mngr= (binlog_cache_mngr*) my_malloc(key_memory_binlog_cache_mngr, + sizeof(binlog_cache_mngr), MYF(MY_ZEROFILL)); if (!cache_mngr || open_cached_file(&cache_mngr->stmt_cache.cache_log, mysql_tmpdir, LOG_PREFIX, (size_t)binlog_stmt_cache_size, MYF(MY_WME)) || @@ -5694,7 +5786,7 @@ binlog_cache_mngr *THD::binlog_setup_trx_data() We only update the saved position if the old one was undefined, the reason is that there are some cases (e.g., for CREATE-SELECT) where the position is saved twice (e.g., both in - select_create::prepare() and THD::binlog_write_table_map()) , but + select_create::prepare() and binlog_write_table_map()) , but we should use the first. This means that calls to this function can be used to start the statement before the first table map event, to include some extra events. @@ -5727,36 +5819,52 @@ THD::binlog_start_trans_and_stmt() { DBUG_VOID_RETURN; } - /* Write Gtid - Get domain id only when gtid mode is set - If this event is replicate through a master then , - we will forward the same gtid another nodes - We have to do this only one time in mysql transaction. - Since this function is called multiple times , We will check for - ha_info->is_started() - */ + /* If this event replicates through a master-slave then we need to + inject manually GTID so it is preserved in the cluster. We are writing + directly to WSREP buffer and not in IO cache because in case of IO cache + GTID event will be duplicated in binlog. + We have to do this only one time in mysql transaction. + Since this function is called multiple times , We will check for + ha_info->is_started(). + */ Ha_trx_info *ha_info; ha_info= this->ha_data[binlog_hton->slot].ha_info + (mstmt_mode ? 1 : 0); - if (!ha_info->is_started() && wsrep_gtid_mode - && this->variables.gtid_seq_no) + if (!ha_info->is_started() && + (this->variables.gtid_seq_no || this->variables.wsrep_gtid_seq_no) && + wsrep_on(this) && + (this->wsrep_cs().mode() == wsrep::client_state::m_local)) { - binlog_cache_mngr *const cache_mngr= - (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton); - binlog_cache_data *cache_data= cache_mngr->get_binlog_cache_data(1); - IO_CACHE *file= &cache_data->cache_log; - Log_event_writer writer(file, cache_data); - Gtid_log_event gtid_event(this, this->variables.gtid_seq_no, - this->variables.gtid_domain_id, - true, LOG_EVENT_SUPPRESS_USE_F, - true, 0); - gtid_event.server_id= this->variables.server_id; - writer.write(>id_event); + uchar *buf= 0; + size_t len= 0; + IO_CACHE tmp_io_cache; + Log_event_writer writer(&tmp_io_cache, 0); + if(!open_cached_file(&tmp_io_cache, mysql_tmpdir, TEMP_PREFIX, + 128, MYF(MY_WME))) + { + uint64 seqno= this->variables.gtid_seq_no; + uint32 domain_id= this->variables.gtid_domain_id; + uint32 server_id= this->variables.server_id; + if (!this->variables.gtid_seq_no && this->variables.wsrep_gtid_seq_no) + { + seqno= this->variables.wsrep_gtid_seq_no; + domain_id= wsrep_gtid_server.domain_id; + server_id= wsrep_gtid_server.server_id; + } + Gtid_log_event gtid_event(this, seqno, domain_id, true, + LOG_EVENT_SUPPRESS_USE_F, true, 0); + gtid_event.server_id= server_id; + writer.write(>id_event); + wsrep_write_cache_buf(&tmp_io_cache, &buf, &len); + if (len > 0) this->wsrep_cs().append_data(wsrep::const_buffer(buf, len)); + if (buf) my_free(buf); + close_cached_file(&tmp_io_cache); + } } #endif if (mstmt_mode) - trans_register_ha(this, TRUE, binlog_hton); - trans_register_ha(this, FALSE, binlog_hton); + trans_register_ha(this, TRUE, binlog_hton, 0); + trans_register_ha(this, FALSE, binlog_hton, 0); /* Mark statement transaction as read/write. We never start a binary log transaction and keep it read-only, @@ -5800,50 +5908,154 @@ binlog_start_consistent_snapshot(handlerton *hton, THD *thd) strmake_buf(cache_mngr->last_commit_pos_file, mysql_bin_log.last_commit_pos_file); cache_mngr->last_commit_pos_offset= mysql_bin_log.last_commit_pos_offset; - trans_register_ha(thd, TRUE, hton); + trans_register_ha(thd, TRUE, binlog_hton, 0); DBUG_RETURN(err); } + +/** + Prepare all tables that are updated for row logging + + Annotate events and table maps are written by binlog_write_table_maps() +*/ + +void THD::binlog_prepare_for_row_logging() +{ + DBUG_ENTER("THD::binlog_prepare_for_row_logging"); + for (TABLE *table= open_tables ; table; table= table->next) + { + if (table->query_id == query_id && table->current_lock == F_WRLCK) + table->file->prepare_for_row_logging(); + } + DBUG_VOID_RETURN; +} + +/** + Write annnotated row event (the query) if needed +*/ + +bool THD::binlog_write_annotated_row(Log_event_writer *writer) +{ + int error; + DBUG_ENTER("THD::binlog_write_annotated_row"); + + if (!(IF_WSREP(!wsrep_fragments_certified_for_stmt(this), true) && + variables.binlog_annotate_row_events && + query_length())) + DBUG_RETURN(0); + + Annotate_rows_log_event anno(this, 0, false); + if (unlikely((error= writer->write(&anno)))) + { + if (my_errno == EFBIG) + writer->set_incident(); + DBUG_RETURN(error); + } + DBUG_RETURN(0); +} + + +/** + Write table map events for all tables that are using row logging. + This includes all tables used by this statement, including tables + used in triggers. + + Also write annotate events and start transactions. + This is using the "tables_with_row_logging" list prepared by + THD::binlog_prepare_for_row_logging +*/ + +bool THD::binlog_write_table_maps() +{ + bool with_annotate; + MYSQL_LOCK *locks[2], **locks_end= locks; + DBUG_ENTER("THD::binlog_write_table_maps"); + + DBUG_ASSERT(!binlog_table_maps); + DBUG_ASSERT(is_current_stmt_binlog_format_row()); + + /* Initialize cache_mngr once per statement */ + binlog_start_trans_and_stmt(); + with_annotate= 1; // Write annotate with first map + + if ((*locks_end= extra_lock)) + locks_end++; + if ((*locks_end= lock)) + locks_end++; + + for (MYSQL_LOCK **cur_lock= locks ; cur_lock < locks_end ; cur_lock++) + { + TABLE **const end_ptr= (*cur_lock)->table + (*cur_lock)->table_count; + for (TABLE **table_ptr= (*cur_lock)->table; + table_ptr != end_ptr ; + ++table_ptr) + { + TABLE *table= *table_ptr; + bool restore= 0; + /* + We have to also write table maps for tables that have not yet been + used, like for tables in after triggers + */ + if (!table->file->row_logging && + table->query_id != query_id && table->current_lock == F_WRLCK) + { + if (table->file->prepare_for_row_logging()) + restore= 1; + } + if (table->file->row_logging) + { + if (binlog_write_table_map(table, with_annotate)) + DBUG_RETURN(1); + with_annotate= 0; + } + if (restore) + { + /* + Restore original setting so that it doesn't cause problem for the + next statement + */ + table->file->row_logging= table->file->row_logging_init= 0; + } + } + } + binlog_table_maps= 1; // Table maps written + DBUG_RETURN(0); +} + + /** This function writes a table map to the binary log. Note that in order to keep the signature uniform with related methods, we use a redundant parameter to indicate whether a transactional table was changed or not. - If with_annotate != NULL and - *with_annotate = TRUE write also Annotate_rows before the table map. - @param table a pointer to the table. - @param is_transactional @c true indicates a transactional table, - otherwise @c false a non-transactional. + @param with_annotate If true call binlog_write_annotated_row() + @return nonzero if an error pops up when writing the table map event. */ -int THD::binlog_write_table_map(TABLE *table, bool is_transactional, - my_bool *with_annotate) + +bool THD::binlog_write_table_map(TABLE *table, bool with_annotate) { int error; + bool is_transactional= table->file->row_logging_has_trans; DBUG_ENTER("THD::binlog_write_table_map"); DBUG_PRINT("enter", ("table: %p (%s: #%lu)", table, table->s->table_name.str, table->s->table_map_id)); + /* Pre-conditions */ + DBUG_ASSERT(table->s->table_map_id != ULONG_MAX); + /* Ensure that all events in a GTID group are in the same cache */ if (variables.option_bits & OPTION_GTID_BEGIN) is_transactional= 1; - /* Pre-conditions */ - DBUG_ASSERT(is_current_stmt_binlog_format_row()); - DBUG_ASSERT(WSREP_EMULATE_BINLOG_NNULL(this) || mysql_bin_log.is_open()); - DBUG_ASSERT(table->s->table_map_id != ULONG_MAX); - Table_map_log_event the_event(this, table, table->s->table_map_id, is_transactional); - if (binlog_table_maps == 0) - binlog_start_trans_and_stmt(); - binlog_cache_mngr *const cache_mngr= (binlog_cache_mngr*) thd_get_ha_data(this, binlog_hton); binlog_cache_data *cache_data= (cache_mngr-> @@ -5851,25 +6063,17 @@ int THD::binlog_write_table_map(TABLE *table, bool is_transactional, IO_CACHE *file= &cache_data->cache_log; Log_event_writer writer(file, cache_data); - if (with_annotate && *with_annotate) - { - Annotate_rows_log_event anno(table->in_use, is_transactional, false); - /* Annotate event should be written not more than once */ - *with_annotate= 0; - if (unlikely((error= writer.write(&anno)))) - { - if (my_errno == EFBIG) - cache_data->set_incident(); - DBUG_RETURN(error); - } - } + if (with_annotate) + if (binlog_write_annotated_row(&writer)) + DBUG_RETURN(1); + if (unlikely((error= writer.write(&the_event)))) DBUG_RETURN(error); - binlog_table_maps++; DBUG_RETURN(0); } + /** This function retrieves a pending row event from a cache which is specified through the parameter @c is_transactional. Respectively, when it @@ -6032,20 +6236,9 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone, DBUG_ENTER("write_gtid_event"); DBUG_PRINT("enter", ("standalone: %d", standalone)); -#ifdef WITH_WSREP - if (WSREP(thd) && - (wsrep_thd_trx_seqno(thd) > 0) && - wsrep_gtid_mode && !thd->variables.gtid_seq_no) - { - domain_id= wsrep_gtid_domain_id; - } else { -#endif /* WITH_WSREP */ + seq_no= thd->variables.gtid_seq_no; domain_id= thd->variables.gtid_domain_id; -#ifdef WITH_WSREP - } -#endif /* WITH_WSREP */ local_server_id= thd->variables.server_id; - seq_no= thd->variables.gtid_seq_no; DBUG_ASSERT(local_server_id != 0); @@ -6092,8 +6285,11 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone, DBUG_ASSERT(this == &mysql_bin_log); #ifdef WITH_WSREP - if (wsrep_gtid_mode && is_gtid_cached(thd)) - DBUG_RETURN(false); + if (wsrep_gtid_mode) + { + thd->variables.gtid_domain_id= global_system_variables.gtid_domain_id; + thd->variables.server_id= global_system_variables.server_id; + } #endif if (write_event(>id_event)) @@ -6309,12 +6505,15 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) (WSREP(thd) && !(thd->variables.option_bits & OPTION_BIN_LOG)))) DBUG_RETURN(0); - if (thd->variables.option_bits & OPTION_GTID_BEGIN) + if (thd->variables.option_bits & + (OPTION_GTID_BEGIN | OPTION_BIN_COMMIT_OFF)) { DBUG_PRINT("info", ("OPTION_GTID_BEGIN was set")); /* Wait for commit from binary log before we commit */ direct= 0; using_trans= 1; + /* Set cache_type to ensure we don't get checksums for this event */ + event_info->cache_type= Log_event::EVENT_TRANSACTIONAL_CACHE; } if (thd->binlog_evt_union.do_union) @@ -6370,7 +6569,8 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) nodes. A check has been added to stop them from getting logged into binary log files. */ - if (WSREP(thd)) option_bin_log_flag= true; + if (WSREP(thd)) + option_bin_log_flag= true; if ((!(option_bin_log_flag)) || (thd->lex->sql_command != SQLCOM_ROLLBACK_TO_SAVEPOINT && @@ -6390,7 +6590,8 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) DBUG_PRINT("info", ("direct is set")); DBUG_ASSERT(!thd->backup_commit_lock); - mdl_request.init(MDL_key::BACKUP, "", "", MDL_BACKUP_COMMIT, MDL_EXPLICIT); + MDL_REQUEST_INIT(&mdl_request, MDL_key::BACKUP, "", "", MDL_BACKUP_COMMIT, + MDL_EXPLICIT); if (thd->mdl_context.acquire_lock(&mdl_request, thd->variables.lock_wait_timeout)) DBUG_RETURN(1); @@ -7093,8 +7294,10 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache) CacheWriter writer(thd, &log_file, binlog_checksum_options, &crypto); if (crypto.scheme) + { writer.ctx= alloca(crypto.ctx_size); - + writer.set_encrypted_writer(); + } // while there is just one alg the following must hold: DBUG_ASSERT(binlog_checksum_options == BINLOG_CHECKSUM_ALG_OFF || binlog_checksum_options == BINLOG_CHECKSUM_ALG_CRC32); @@ -7429,10 +7632,10 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, entry.all= all; entry.using_stmt_cache= using_stmt_cache; entry.using_trx_cache= using_trx_cache; - entry.need_unlog= false; - ha_info= all ? thd->transaction.all.ha_list : thd->transaction.stmt.ha_list; + entry.need_unlog= is_preparing_xa(thd); + ha_info= all ? thd->transaction->all.ha_list : thd->transaction->stmt.ha_list; - for (; ha_info; ha_info= ha_info->next()) + for (; !entry.need_unlog && ha_info; ha_info= ha_info->next()) { if (ha_info->is_started() && ha_info->ht() != binlog_hton && !ha_info->ht()->commit_checkpoint_request) @@ -7790,7 +7993,7 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) wsrep_run_commit_hook(entry->thd, entry->all)) { /* Release commit order here */ - if (wsrep_ordered_commit(entry->thd, entry->all, wsrep_apply_error())) + if (wsrep_ordered_commit(entry->thd, entry->all)) result= -2; /* return -3, if this is leader */ @@ -8057,7 +8260,9 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) We already checked before that at least one cache is non-empty; if both are empty we would have skipped calling into here. */ - DBUG_ASSERT(!cache_mngr->stmt_cache.empty() || !cache_mngr->trx_cache.empty()); + DBUG_ASSERT(!cache_mngr->stmt_cache.empty() || + !cache_mngr->trx_cache.empty() || + current->thd->transaction->xid_state.is_explicit_XA()); if (unlikely((current->error= write_transaction_or_stmt(current, commit_id)))) @@ -8066,7 +8271,7 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) strmake_buf(cache_mngr->last_commit_pos_file, log_file_name); commit_offset= my_b_write_tell(&log_file); cache_mngr->last_commit_pos_offset= commit_offset; - if (cache_mngr->using_xa && cache_mngr->xa_xid) + if ((cache_mngr->using_xa && cache_mngr->xa_xid) || current->need_unlog) { /* If all storage engines support commit_checkpoint_request(), then we @@ -8306,7 +8511,8 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry, binlog_cache_mngr *mngr= entry->cache_mngr; DBUG_ENTER("MYSQL_BIN_LOG::write_transaction_or_stmt"); - if (write_gtid_event(entry->thd, false, entry->using_trx_cache, commit_id)) + if (write_gtid_event(entry->thd, is_prepared_xa(entry->thd), + entry->using_trx_cache, commit_id)) DBUG_RETURN(ER_ERROR_ON_WRITE); if (entry->using_stmt_cache && !mngr->stmt_cache.empty() && @@ -8592,9 +8798,9 @@ void MYSQL_BIN_LOG::close(uint exiting) if (log_state == LOG_OPENED) { + DBUG_ASSERT(log_type == LOG_BIN); #ifdef HAVE_REPLICATION - if (log_type == LOG_BIN && - (exiting & LOG_CLOSE_STOP_EVENT)) + if (exiting & LOG_CLOSE_STOP_EVENT) { Stop_log_event s; // the checksumming rule for relay-log case is similar to Rotate @@ -8631,8 +8837,7 @@ void MYSQL_BIN_LOG::close(uint exiting) #endif /* HAVE_REPLICATION */ /* don't pwrite in a file opened with O_APPEND - it doesn't work */ - if (log_file.type == WRITE_CACHE && log_type == LOG_BIN - && !(exiting & LOG_CLOSE_DELAYED_CLOSE)) + if (log_file.type == WRITE_CACHE && !(exiting & LOG_CLOSE_DELAYED_CLOSE)) { my_off_t org_position= mysql_file_tell(log_file.file, MYF(0)); if (!failed_to_save_state) @@ -9012,7 +9217,7 @@ void TC_LOG::run_prepare_ordered(THD *thd, bool all) { Ha_trx_info *ha_info= - all ? thd->transaction.all.ha_list : thd->transaction.stmt.ha_list; + all ? thd->transaction->all.ha_list : thd->transaction->stmt.ha_list; mysql_mutex_assert_owner(&LOCK_prepare_ordered); for (; ha_info; ha_info= ha_info->next()) @@ -9029,7 +9234,7 @@ void TC_LOG::run_commit_ordered(THD *thd, bool all) { Ha_trx_info *ha_info= - all ? thd->transaction.all.ha_list : thd->transaction.stmt.ha_list; + all ? thd->transaction->all.ha_list : thd->transaction->stmt.ha_list; mysql_mutex_assert_owner(&LOCK_commit_ordered); for (; ha_info; ha_info= ha_info->next()) @@ -9114,7 +9319,8 @@ int TC_LOG_MMAP::log_and_order(THD *thd, my_xid xid, bool all, prev= queue; queue= next; } - DBUG_ASSERT(prev == &entry && prev->thd == thd); + DBUG_ASSERT(prev == &entry); + DBUG_ASSERT(prev->thd == thd); } else { @@ -9208,7 +9414,8 @@ int TC_LOG_MMAP::open(const char *opt_name) PAGE *pg; DBUG_ASSERT(total_ha_2pc > 1); - DBUG_ASSERT(opt_name && opt_name[0]); + DBUG_ASSERT(opt_name); + DBUG_ASSERT(opt_name[0]); tc_log_page_size= my_getpagesize(); @@ -9255,7 +9462,8 @@ int TC_LOG_MMAP::open(const char *opt_name) npages=(uint)file_length/tc_log_page_size; if (npages < 3) // to guarantee non-empty pool goto err; - if (!(pages=(PAGE *)my_malloc(npages*sizeof(PAGE), MYF(MY_WME|MY_ZEROFILL)))) + if (!(pages=(PAGE *)my_malloc(key_memory_TC_LOG_MMAP_pages, + npages*sizeof(PAGE), MYF(MY_WME|MY_ZEROFILL)))) goto err; inited=3; for (pg=pages, i=0; i < npages; i++, pg++) @@ -9565,7 +9773,8 @@ int TC_LOG_MMAP::unlog(ulong cookie, my_xid xid) { uint32 size= sizeof(*pending_checkpoint) + sizeof(ulong) * (ncookies - 1); if (!(pending_checkpoint= - (pending_cookies *)my_malloc(size, MYF(MY_ZEROFILL)))) + (pending_cookies *)my_malloc(PSI_INSTRUMENT_ME, size, + MYF(MY_ZEROFILL)))) { my_error(ER_OUTOFMEMORY, MYF(0), size); mysql_mutex_unlock(&LOCK_pending_checkpoint); @@ -9625,7 +9834,8 @@ int TC_LOG_MMAP::delete_entry(ulong cookie) PAGE *p=pages+(cookie/tc_log_page_size); my_xid *x=(my_xid *)(data+cookie); - DBUG_ASSERT(x >= p->start && x < p->end); + DBUG_ASSERT(x >= p->start); + DBUG_ASSERT(x < p->end); mysql_mutex_lock(&p->lock); *x=0; @@ -9704,8 +9914,8 @@ int TC_LOG_MMAP::recover() goto err1; } - if (my_hash_init(&xids, &my_charset_bin, tc_log_page_size/3, 0, - sizeof(my_xid), 0, 0, MYF(0))) + if (my_hash_init(PSI_INSTRUMENT_ME, &xids, &my_charset_bin, + tc_log_page_size/3, 0, sizeof(my_xid), 0, 0, MYF(0))) goto err1; for ( ; p < end_p ; p++) @@ -9770,7 +9980,8 @@ int TC_LOG_BINLOG::open(const char *opt_name) int error= 1; DBUG_ASSERT(total_ha_2pc > 1); - DBUG_ASSERT(opt_name && opt_name[0]); + DBUG_ASSERT(opt_name); + DBUG_ASSERT(opt_name[0]); if (!my_b_inited(&index_file)) { @@ -9783,7 +9994,7 @@ int TC_LOG_BINLOG::open(const char *opt_name) { mysql_mutex_lock(&LOCK_log); /* generate a new binlog to mask a corrupted one */ - open(opt_name, LOG_BIN, 0, 0, WRITE_CACHE, max_binlog_size, 0, TRUE); + open(opt_name, 0, 0, WRITE_CACHE, max_binlog_size, 0, TRUE); mysql_mutex_unlock(&LOCK_log); cleanup(); return 1; @@ -10009,6 +10220,48 @@ int TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid) DBUG_RETURN(BINLOG_COOKIE_GET_ERROR_FLAG(cookie)); } +static bool write_empty_xa_prepare(THD *thd, binlog_cache_mngr *cache_mngr) +{ + return binlog_commit_flush_xa_prepare(thd, true, cache_mngr); +} + +int TC_LOG_BINLOG::unlog_xa_prepare(THD *thd, bool all) +{ + DBUG_ASSERT(is_preparing_xa(thd)); + + binlog_cache_mngr *cache_mngr= thd->binlog_setup_trx_data(); + int cookie= 0; + + if (!cache_mngr->need_unlog) + { + Ha_trx_info *ha_info; + uint rw_count= ha_count_rw_all(thd, &ha_info); + bool rc= false; + + /* + This transaction has not been binlogged as indicated by need_unlog. + Such exceptional cases include transactions with no effect to engines, + e.g REPLACE that does not change the dat but still the Engine + transaction branch claims to be rw, and few more. + In all such cases an empty XA-prepare group of events is bin-logged. + */ + if (rw_count > 0) + { + /* an empty XA-prepare event group is logged */ + rc= write_empty_xa_prepare(thd, cache_mngr); // normally gains need_unlog + trans_register_ha(thd, true, binlog_hton, 0); // do it for future commmit + } + if (rw_count == 0 || !cache_mngr->need_unlog) + return rc; + } + + cookie= BINLOG_COOKIE_MAKE(cache_mngr->binlog_id, cache_mngr->delayed_error); + cache_mngr->need_unlog= false; + + return unlog(cookie, 1); +} + + void TC_LOG_BINLOG::commit_checkpoint_notify(void *cookie) { @@ -10218,13 +10471,13 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name, #endif if (! fdle->is_valid() || - (do_xa && my_hash_init(&xids, &my_charset_bin, TC_LOG_PAGE_SIZE/3, 0, + (do_xa && my_hash_init(key_memory_binlog_recover_exec, &xids, &my_charset_bin, TC_LOG_PAGE_SIZE/3, 0, sizeof(my_xid), 0, 0, MYF(0)))) goto err1; if (do_xa) - init_alloc_root(&mem_root, "TC_LOG_BINLOG", TC_LOG_PAGE_SIZE, - TC_LOG_PAGE_SIZE, MYF(0)); + init_alloc_root(key_memory_binlog_recover_exec, &mem_root, + TC_LOG_PAGE_SIZE, TC_LOG_PAGE_SIZE, MYF(0)); fdle->flags&= ~LOG_EVENT_BINLOG_IN_USE_F; // abort on the first error @@ -10325,6 +10578,7 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name, ((last_gtid_standalone && !ev->is_part_of_group(typ)) || (!last_gtid_standalone && (typ == XID_EVENT || + typ == XA_PREPARE_LOG_EVENT || (LOG_EVENT_IS_QUERY(typ) && (((Query_log_event *)ev)->is_commit() || ((Query_log_event *)ev)->is_rollback())))))) @@ -10530,24 +10784,6 @@ MYSQL_BIN_LOG::do_binlog_recovery(const char *opt_name, bool do_xa_recovery) #ifdef INNODB_COMPATIBILITY_HOOKS -/** - Get the file name of the MySQL binlog. - @return the name of the binlog file -*/ -extern "C" -const char* mysql_bin_log_file_name(void) -{ - return mysql_bin_log.get_log_fname(); -} -/** - Get the current position of the MySQL binlog. - @return byte offset from the beginning of the binlog -*/ -extern "C" -ulonglong mysql_bin_log_file_pos(void) -{ - return (ulonglong) mysql_bin_log.get_log_file()->pos_in_file; -} /* Get the current position of the MySQL binlog for transaction currently being committed. @@ -10655,7 +10891,7 @@ static struct st_mysql_sys_var *binlog_sys_vars[]= /* Copy out the non-directory part of binlog position filename for the `binlog_snapshot_file' status variable, same way as it is done for - SHOW MASTER STATUS. + SHOW BINLOG STATUS. */ static void set_binlog_snapshot_file(const char *src) @@ -10826,7 +11062,7 @@ void wsrep_thd_binlog_trx_reset(THD * thd) cache_mngr->stmt_cache.reset(); } } - thd->clear_binlog_table_maps(); + thd->reset_binlog_for_next_statement(); DBUG_VOID_RETURN; } @@ -10879,8 +11115,8 @@ void wsrep_register_binlog_handler(THD *thd, bool trx) Set callbacks in order to be able to call commmit or rollback. */ if (trx) - trans_register_ha(thd, TRUE, binlog_hton); - trans_register_ha(thd, FALSE, binlog_hton); + trans_register_ha(thd, TRUE, binlog_hton, 0); + trans_register_ha(thd, FALSE, binlog_hton, 0); /* Set the binary log as read/write otherwise callbacks are not called. |