diff options
Diffstat (limited to 'sql/log.cc')
-rw-r--r-- | sql/log.cc | 141 |
1 files changed, 123 insertions, 18 deletions
diff --git a/sql/log.cc b/sql/log.cc index 75a895e25f8..371f9c06cce 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -52,9 +52,11 @@ #include "sql_plugin.h" #include "rpl_handler.h" +#include "wsrep_mysqld.h" #include "debug_sync.h" #include "sql_show.h" #include "my_pthread.h" +#include "wsrep_mysqld.h" /* max size of the log message */ #define MAX_LOG_BUFFER_SIZE 1024 @@ -63,6 +65,7 @@ #define FLAGSTR(V,F) ((V)&(F)?#F" ":"") +handlerton *binlog_hton; LOGGER logger; MYSQL_BIN_LOG mysql_bin_log(&sync_binlog_period); @@ -511,8 +514,6 @@ private: binlog_cache_mngr(const binlog_cache_mngr& info); }; -handlerton *binlog_hton; - bool LOGGER::is_log_table_enabled(uint log_table_type) { switch (log_table_type) { @@ -526,7 +527,6 @@ bool LOGGER::is_log_table_enabled(uint log_table_type) } } - /** Check if a given table is opened log table @@ -1577,7 +1577,8 @@ binlog_trans_log_savepos(THD *thd, my_off_t *pos) DBUG_ENTER("binlog_trans_log_savepos"); DBUG_ASSERT(pos != NULL); binlog_cache_mngr *const cache_mngr= thd->binlog_setup_trx_data(); - DBUG_ASSERT(mysql_bin_log.is_open()); + DBUG_ASSERT_IF_WSREP(((WSREP(thd) && wsrep_emulate_bin_log)) || mysql_bin_log.is_open()); + DBUG_ASSERT(IF_WSREP(1, mysql_bin_log.is_open())); *pos= cache_mngr->trx_cache.get_byte_position(); DBUG_PRINT("return", ("*pos: %lu", (ulong) *pos)); DBUG_VOID_RETURN; @@ -1625,7 +1626,8 @@ binlog_trans_log_truncate(THD *thd, my_off_t pos) int binlog_init(void *p) { binlog_hton= (handlerton *)p; - binlog_hton->state=opt_bin_log ? SHOW_OPTION_YES : SHOW_OPTION_NO; + binlog_hton->state=IF_WSREP(WSREP_ON || opt_bin_log, opt_bin_log) ? + SHOW_OPTION_YES : SHOW_OPTION_NO; binlog_hton->db_type=DB_TYPE_BINLOG; binlog_hton->savepoint_offset= sizeof(my_off_t); binlog_hton->close_connection= binlog_close_connection; @@ -1745,6 +1747,16 @@ binlog_commit_flush_stmt_cache(THD *thd, bool all, binlog_cache_mngr *cache_mngr) { DBUG_ENTER("binlog_commit_flush_stmt_cache"); +#ifdef WITH_WSREP + if (thd->wsrep_mysql_replicated > 0) + { + DBUG_ASSERT(WSREP_ON); + WSREP_DEBUG("avoiding binlog_commit_flush_trx_cache: %d", + thd->wsrep_mysql_replicated); + return 0; + } +#endif + Query_log_event end_evt(thd, STRING_WITH_LEN("COMMIT"), FALSE, TRUE, TRUE, 0); DBUG_RETURN(binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, FALSE)); @@ -1900,12 +1912,12 @@ static bool trans_cannot_safely_rollback(THD *thd, bool all) return ((thd->variables.option_bits & OPTION_KEEP_LOG) || (trans_has_updated_non_trans_table(thd) && - thd->variables.binlog_format == BINLOG_FORMAT_STMT) || + WSREP_FORMAT(thd->variables.binlog_format) == BINLOG_FORMAT_STMT) || (cache_mngr->trx_cache.changes_to_non_trans_temp_table() && - thd->variables.binlog_format == BINLOG_FORMAT_MIXED) || + WSREP_FORMAT(thd->variables.binlog_format) == BINLOG_FORMAT_MIXED) || (trans_has_updated_non_trans_table(thd) && ending_single_stmt_trans(thd,all) && - thd->variables.binlog_format == BINLOG_FORMAT_MIXED)); + WSREP_FORMAT(thd->variables.binlog_format) == BINLOG_FORMAT_MIXED)); } @@ -1927,6 +1939,13 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all) DBUG_ENTER("binlog_commit"); binlog_cache_mngr *const cache_mngr= (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); +#ifdef WITH_WSREP + if (!cache_mngr) + { + DBUG_ASSERT(WSREP(thd)); + DBUG_RETURN(0); + } +#endif /* WITH_WSREP */ DBUG_PRINT("debug", ("all: %d, in_transaction: %s, all.modified_non_trans_table: %s, stmt.modified_non_trans_table: %s", @@ -1983,6 +2002,14 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) int error= 0; binlog_cache_mngr *const cache_mngr= (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); +#ifdef WITH_WSREP + if (!cache_mngr) + { + DBUG_ASSERT(WSREP(thd)); + DBUG_RETURN(0); + } + +#endif /* WITH_WSREP */ DBUG_PRINT("debug", ("all: %s, all.modified_non_trans_table: %s, stmt.modified_non_trans_table: %s", YESNO(all), @@ -2011,8 +2038,8 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) cache_mngr->reset(false, true); DBUG_RETURN(error); } - - if (mysql_bin_log.check_write_error(thd)) + if (IF_WSREP(!wsrep_emulate_bin_log,1) && + mysql_bin_log.check_write_error(thd)) { /* "all == true" means that a "rollback statement" triggered the error and @@ -2043,9 +2070,9 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) else if (ending_trans(thd, all) || (!(thd->variables.option_bits & OPTION_KEEP_LOG) && (!stmt_has_updated_non_trans_table(thd) || - thd->variables.binlog_format != BINLOG_FORMAT_STMT) && + WSREP_FORMAT(thd->variables.binlog_format) != BINLOG_FORMAT_STMT) && (!cache_mngr->trx_cache.changes_to_non_trans_temp_table() || - thd->variables.binlog_format != BINLOG_FORMAT_MIXED))) + WSREP_FORMAT(thd->variables.binlog_format) != BINLOG_FORMAT_MIXED))) error= binlog_truncate_trx_cache(thd, cache_mngr, all); } @@ -2150,8 +2177,13 @@ bool MYSQL_BIN_LOG::check_write_error(THD *thd) static int binlog_savepoint_set(handlerton *hton, THD *thd, void *sv) { - DBUG_ENTER("binlog_savepoint_set"); int error= 1; + DBUG_ENTER("binlog_savepoint_set"); + +#ifdef WITH_WSREP + if (wsrep_emulate_bin_log) + DBUG_RETURN(0); +#endif /* WITH_WSREP */ char buf[1024]; String log_query(buf, sizeof(buf), &my_charset_bin); @@ -2190,7 +2222,8 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv) non-transactional table. Otherwise, truncate the binlog cache starting from the SAVEPOINT command. */ - if (unlikely(trans_has_updated_non_trans_table(thd) || + if (IF_WSREP(!wsrep_emulate_bin_log, 1) && + unlikely(trans_has_updated_non_trans_table(thd) || (thd->variables.option_bits & OPTION_KEEP_LOG))) { char buf[1024]; @@ -2204,7 +2237,10 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv) TRUE, FALSE, TRUE, errcode); DBUG_RETURN(mysql_bin_log.write(&qinfo)); } - binlog_trans_log_truncate(thd, *(my_off_t*)sv); + + if (IF_WSREP(!wsrep_emulate_bin_log, 1)) + binlog_trans_log_truncate(thd, *(my_off_t*)sv); + DBUG_RETURN(0); } @@ -5332,7 +5368,8 @@ int THD::binlog_write_table_map(TABLE *table, bool is_transactional, is_transactional= 1; /* Pre-conditions */ - DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open()); + DBUG_ASSERT(is_current_stmt_binlog_format_row()); + DBUG_ASSERT(IF_WSREP(WSREP_EMULATE_BINLOG(this), 0) || mysql_bin_log.is_open()); DBUG_ASSERT(table->s->table_map_id != ULONG_MAX); Table_map_log_event @@ -5465,7 +5502,7 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd, bool is_transactional) { DBUG_ENTER("MYSQL_BIN_LOG::flush_and_set_pending_rows_event(event)"); - DBUG_ASSERT(mysql_bin_log.is_open()); + DBUG_ASSERT(IF_WSREP(WSREP_EMULATE_BINLOG(thd),0) || mysql_bin_log.is_open()); DBUG_PRINT("enter", ("event: 0x%lx", (long) event)); int error= 0; @@ -5791,7 +5828,8 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) mostly called if is_open() *was* true a few instructions before, but it could have changed since. */ - if (likely(is_open())) + /* applier and replayer can skip writing binlog events */ + if (IF_WSREP((WSREP_EMULATE_BINLOG(thd) && (thd->wsrep_exec_mode != REPL_RECV)) || is_open(), likely(is_open()))) { my_off_t UNINIT_VAR(my_org_b_tell); #ifdef HAVE_REPLICATION @@ -6130,6 +6168,17 @@ int MYSQL_BIN_LOG::rotate(bool force_rotate, bool* check_purge) int error= 0; DBUG_ENTER("MYSQL_BIN_LOG::rotate"); +#ifdef WITH_WSREP + if (wsrep_to_isolation) + { + DBUG_ASSERT(WSREP_ON); + *check_purge= false; + WSREP_DEBUG("avoiding binlog rotate due to TO isolation: %d", + wsrep_to_isolation); + DBUG_RETURN(0); + } +#endif + //todo: fix the macro def and restore safe_mutex_assert_owner(&LOCK_log); *check_purge= false; @@ -6675,6 +6724,11 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, Ha_trx_info *ha_info; DBUG_ENTER("MYSQL_BIN_LOG::write_transaction_to_binlog"); +#ifdef WITH_WSREP + if (wsrep_emulate_bin_log) + DBUG_RETURN(0); +#endif /* WITH_WSREP */ + entry.thd= thd; entry.cache_mngr= cache_mngr; entry.error= 0; @@ -6683,6 +6737,7 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, entry.using_trx_cache= using_trx_cache; entry.need_unlog= false; ha_info= all ? thd->transaction.all.ha_list : thd->transaction.stmt.ha_list; + for (; ha_info; ha_info= ha_info->next()) { if (ha_info->is_started() && ha_info->ht() != binlog_hton && @@ -8819,7 +8874,10 @@ TC_LOG_BINLOG::log_and_order(THD *thd, my_xid xid, bool all, binlog_cache_mngr *cache_mngr= thd->binlog_setup_trx_data(); if (!cache_mngr) + { + WSREP_DEBUG("Skipping empty log_xid: %s", thd->query()); DBUG_RETURN(0); + } cache_mngr->using_xa= TRUE; cache_mngr->xa_xid= xid; @@ -9649,3 +9707,50 @@ maria_declare_plugin(binlog) MariaDB_PLUGIN_MATURITY_STABLE /* maturity */ } maria_declare_plugin_end; + +#ifdef WITH_WSREP +IO_CACHE * get_trans_log(THD * thd) +{ + binlog_cache_mngr *cache_mngr = (binlog_cache_mngr*) + thd_get_ha_data(thd, binlog_hton); + if (cache_mngr) + return cache_mngr->get_binlog_cache_log(true); + + WSREP_DEBUG("binlog cache not initialized, conn :%ld", thd->thread_id); + return NULL; +} + + +bool wsrep_trans_cache_is_empty(THD *thd) +{ + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + return (!cache_mngr || cache_mngr->trx_cache.empty()); +} + + +void thd_binlog_trx_reset(THD * thd) +{ + /* + todo: fix autocommit select to not call the caller + */ + if (thd_get_ha_data(thd, binlog_hton) != NULL) + { + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + if (cache_mngr) + cache_mngr->reset(false, true); + } + thd->clear_binlog_table_maps(); +} + + +void thd_binlog_rollback_stmt(THD * thd) +{ + WSREP_DEBUG("thd_binlog_rollback_stmt :%ld", thd->thread_id); + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + if (cache_mngr) + cache_mngr->trx_cache.set_prev_position(MY_OFF_T_UNDEF); +} +#endif /* WITH_WSREP */ |