summaryrefslogtreecommitdiff
path: root/sql/log.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/log.cc')
-rw-r--r--sql/log.cc141
1 files changed, 123 insertions, 18 deletions
diff --git a/sql/log.cc b/sql/log.cc
index 75a895e25f8..371f9c06cce 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -52,9 +52,11 @@
#include "sql_plugin.h"
#include "rpl_handler.h"
+#include "wsrep_mysqld.h"
#include "debug_sync.h"
#include "sql_show.h"
#include "my_pthread.h"
+#include "wsrep_mysqld.h"
/* max size of the log message */
#define MAX_LOG_BUFFER_SIZE 1024
@@ -63,6 +65,7 @@
#define FLAGSTR(V,F) ((V)&(F)?#F" ":"")
+handlerton *binlog_hton;
LOGGER logger;
MYSQL_BIN_LOG mysql_bin_log(&sync_binlog_period);
@@ -511,8 +514,6 @@ private:
binlog_cache_mngr(const binlog_cache_mngr& info);
};
-handlerton *binlog_hton;
-
bool LOGGER::is_log_table_enabled(uint log_table_type)
{
switch (log_table_type) {
@@ -526,7 +527,6 @@ bool LOGGER::is_log_table_enabled(uint log_table_type)
}
}
-
/**
Check if a given table is opened log table
@@ -1577,7 +1577,8 @@ binlog_trans_log_savepos(THD *thd, my_off_t *pos)
DBUG_ENTER("binlog_trans_log_savepos");
DBUG_ASSERT(pos != NULL);
binlog_cache_mngr *const cache_mngr= thd->binlog_setup_trx_data();
- DBUG_ASSERT(mysql_bin_log.is_open());
+ DBUG_ASSERT_IF_WSREP(((WSREP(thd) && wsrep_emulate_bin_log)) || mysql_bin_log.is_open());
+ DBUG_ASSERT(IF_WSREP(1, mysql_bin_log.is_open()));
*pos= cache_mngr->trx_cache.get_byte_position();
DBUG_PRINT("return", ("*pos: %lu", (ulong) *pos));
DBUG_VOID_RETURN;
@@ -1625,7 +1626,8 @@ binlog_trans_log_truncate(THD *thd, my_off_t pos)
int binlog_init(void *p)
{
binlog_hton= (handlerton *)p;
- binlog_hton->state=opt_bin_log ? SHOW_OPTION_YES : SHOW_OPTION_NO;
+ binlog_hton->state=IF_WSREP(WSREP_ON || opt_bin_log, opt_bin_log) ?
+ SHOW_OPTION_YES : SHOW_OPTION_NO;
binlog_hton->db_type=DB_TYPE_BINLOG;
binlog_hton->savepoint_offset= sizeof(my_off_t);
binlog_hton->close_connection= binlog_close_connection;
@@ -1745,6 +1747,16 @@ binlog_commit_flush_stmt_cache(THD *thd, bool all,
binlog_cache_mngr *cache_mngr)
{
DBUG_ENTER("binlog_commit_flush_stmt_cache");
+#ifdef WITH_WSREP
+ if (thd->wsrep_mysql_replicated > 0)
+ {
+ DBUG_ASSERT(WSREP_ON);
+ WSREP_DEBUG("avoiding binlog_commit_flush_trx_cache: %d",
+ thd->wsrep_mysql_replicated);
+ return 0;
+ }
+#endif
+
Query_log_event end_evt(thd, STRING_WITH_LEN("COMMIT"),
FALSE, TRUE, TRUE, 0);
DBUG_RETURN(binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, FALSE));
@@ -1900,12 +1912,12 @@ static bool trans_cannot_safely_rollback(THD *thd, bool all)
return ((thd->variables.option_bits & OPTION_KEEP_LOG) ||
(trans_has_updated_non_trans_table(thd) &&
- thd->variables.binlog_format == BINLOG_FORMAT_STMT) ||
+ WSREP_FORMAT(thd->variables.binlog_format) == BINLOG_FORMAT_STMT) ||
(cache_mngr->trx_cache.changes_to_non_trans_temp_table() &&
- thd->variables.binlog_format == BINLOG_FORMAT_MIXED) ||
+ WSREP_FORMAT(thd->variables.binlog_format) == BINLOG_FORMAT_MIXED) ||
(trans_has_updated_non_trans_table(thd) &&
ending_single_stmt_trans(thd,all) &&
- thd->variables.binlog_format == BINLOG_FORMAT_MIXED));
+ WSREP_FORMAT(thd->variables.binlog_format) == BINLOG_FORMAT_MIXED));
}
@@ -1927,6 +1939,13 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all)
DBUG_ENTER("binlog_commit");
binlog_cache_mngr *const cache_mngr=
(binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
+#ifdef WITH_WSREP
+ if (!cache_mngr)
+ {
+ DBUG_ASSERT(WSREP(thd));
+ DBUG_RETURN(0);
+ }
+#endif /* WITH_WSREP */
DBUG_PRINT("debug",
("all: %d, in_transaction: %s, all.modified_non_trans_table: %s, stmt.modified_non_trans_table: %s",
@@ -1983,6 +2002,14 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
int error= 0;
binlog_cache_mngr *const cache_mngr=
(binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
+#ifdef WITH_WSREP
+ if (!cache_mngr)
+ {
+ DBUG_ASSERT(WSREP(thd));
+ DBUG_RETURN(0);
+ }
+
+#endif /* WITH_WSREP */
DBUG_PRINT("debug", ("all: %s, all.modified_non_trans_table: %s, stmt.modified_non_trans_table: %s",
YESNO(all),
@@ -2011,8 +2038,8 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
cache_mngr->reset(false, true);
DBUG_RETURN(error);
}
-
- if (mysql_bin_log.check_write_error(thd))
+ if (IF_WSREP(!wsrep_emulate_bin_log,1) &&
+ mysql_bin_log.check_write_error(thd))
{
/*
"all == true" means that a "rollback statement" triggered the error and
@@ -2043,9 +2070,9 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
else if (ending_trans(thd, all) ||
(!(thd->variables.option_bits & OPTION_KEEP_LOG) &&
(!stmt_has_updated_non_trans_table(thd) ||
- thd->variables.binlog_format != BINLOG_FORMAT_STMT) &&
+ WSREP_FORMAT(thd->variables.binlog_format) != BINLOG_FORMAT_STMT) &&
(!cache_mngr->trx_cache.changes_to_non_trans_temp_table() ||
- thd->variables.binlog_format != BINLOG_FORMAT_MIXED)))
+ WSREP_FORMAT(thd->variables.binlog_format) != BINLOG_FORMAT_MIXED)))
error= binlog_truncate_trx_cache(thd, cache_mngr, all);
}
@@ -2150,8 +2177,13 @@ bool MYSQL_BIN_LOG::check_write_error(THD *thd)
static int binlog_savepoint_set(handlerton *hton, THD *thd, void *sv)
{
- DBUG_ENTER("binlog_savepoint_set");
int error= 1;
+ DBUG_ENTER("binlog_savepoint_set");
+
+#ifdef WITH_WSREP
+ if (wsrep_emulate_bin_log)
+ DBUG_RETURN(0);
+#endif /* WITH_WSREP */
char buf[1024];
String log_query(buf, sizeof(buf), &my_charset_bin);
@@ -2190,7 +2222,8 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv)
non-transactional table. Otherwise, truncate the binlog cache starting
from the SAVEPOINT command.
*/
- if (unlikely(trans_has_updated_non_trans_table(thd) ||
+ if (IF_WSREP(!wsrep_emulate_bin_log, 1) &&
+ unlikely(trans_has_updated_non_trans_table(thd) ||
(thd->variables.option_bits & OPTION_KEEP_LOG)))
{
char buf[1024];
@@ -2204,7 +2237,10 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv)
TRUE, FALSE, TRUE, errcode);
DBUG_RETURN(mysql_bin_log.write(&qinfo));
}
- binlog_trans_log_truncate(thd, *(my_off_t*)sv);
+
+ if (IF_WSREP(!wsrep_emulate_bin_log, 1))
+ binlog_trans_log_truncate(thd, *(my_off_t*)sv);
+
DBUG_RETURN(0);
}
@@ -5332,7 +5368,8 @@ int THD::binlog_write_table_map(TABLE *table, bool is_transactional,
is_transactional= 1;
/* Pre-conditions */
- DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open());
+ DBUG_ASSERT(is_current_stmt_binlog_format_row());
+ DBUG_ASSERT(IF_WSREP(WSREP_EMULATE_BINLOG(this), 0) || mysql_bin_log.is_open());
DBUG_ASSERT(table->s->table_map_id != ULONG_MAX);
Table_map_log_event
@@ -5465,7 +5502,7 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd,
bool is_transactional)
{
DBUG_ENTER("MYSQL_BIN_LOG::flush_and_set_pending_rows_event(event)");
- DBUG_ASSERT(mysql_bin_log.is_open());
+ DBUG_ASSERT(IF_WSREP(WSREP_EMULATE_BINLOG(thd),0) || mysql_bin_log.is_open());
DBUG_PRINT("enter", ("event: 0x%lx", (long) event));
int error= 0;
@@ -5791,7 +5828,8 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
mostly called if is_open() *was* true a few instructions before, but it
could have changed since.
*/
- if (likely(is_open()))
+ /* applier and replayer can skip writing binlog events */
+ if (IF_WSREP((WSREP_EMULATE_BINLOG(thd) && (thd->wsrep_exec_mode != REPL_RECV)) || is_open(), likely(is_open())))
{
my_off_t UNINIT_VAR(my_org_b_tell);
#ifdef HAVE_REPLICATION
@@ -6130,6 +6168,17 @@ int MYSQL_BIN_LOG::rotate(bool force_rotate, bool* check_purge)
int error= 0;
DBUG_ENTER("MYSQL_BIN_LOG::rotate");
+#ifdef WITH_WSREP
+ if (wsrep_to_isolation)
+ {
+ DBUG_ASSERT(WSREP_ON);
+ *check_purge= false;
+ WSREP_DEBUG("avoiding binlog rotate due to TO isolation: %d",
+ wsrep_to_isolation);
+ DBUG_RETURN(0);
+ }
+#endif
+
//todo: fix the macro def and restore safe_mutex_assert_owner(&LOCK_log);
*check_purge= false;
@@ -6675,6 +6724,11 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd,
Ha_trx_info *ha_info;
DBUG_ENTER("MYSQL_BIN_LOG::write_transaction_to_binlog");
+#ifdef WITH_WSREP
+ if (wsrep_emulate_bin_log)
+ DBUG_RETURN(0);
+#endif /* WITH_WSREP */
+
entry.thd= thd;
entry.cache_mngr= cache_mngr;
entry.error= 0;
@@ -6683,6 +6737,7 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd,
entry.using_trx_cache= using_trx_cache;
entry.need_unlog= false;
ha_info= all ? thd->transaction.all.ha_list : thd->transaction.stmt.ha_list;
+
for (; ha_info; ha_info= ha_info->next())
{
if (ha_info->is_started() && ha_info->ht() != binlog_hton &&
@@ -8819,7 +8874,10 @@ TC_LOG_BINLOG::log_and_order(THD *thd, my_xid xid, bool all,
binlog_cache_mngr *cache_mngr= thd->binlog_setup_trx_data();
if (!cache_mngr)
+ {
+ WSREP_DEBUG("Skipping empty log_xid: %s", thd->query());
DBUG_RETURN(0);
+ }
cache_mngr->using_xa= TRUE;
cache_mngr->xa_xid= xid;
@@ -9649,3 +9707,50 @@ maria_declare_plugin(binlog)
MariaDB_PLUGIN_MATURITY_STABLE /* maturity */
}
maria_declare_plugin_end;
+
+#ifdef WITH_WSREP
+IO_CACHE * get_trans_log(THD * thd)
+{
+ binlog_cache_mngr *cache_mngr = (binlog_cache_mngr*)
+ thd_get_ha_data(thd, binlog_hton);
+ if (cache_mngr)
+ return cache_mngr->get_binlog_cache_log(true);
+
+ WSREP_DEBUG("binlog cache not initialized, conn :%ld", thd->thread_id);
+ return NULL;
+}
+
+
+bool wsrep_trans_cache_is_empty(THD *thd)
+{
+ binlog_cache_mngr *const cache_mngr=
+ (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
+ return (!cache_mngr || cache_mngr->trx_cache.empty());
+}
+
+
+void thd_binlog_trx_reset(THD * thd)
+{
+ /*
+ todo: fix autocommit select to not call the caller
+ */
+ if (thd_get_ha_data(thd, binlog_hton) != NULL)
+ {
+ binlog_cache_mngr *const cache_mngr=
+ (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
+ if (cache_mngr)
+ cache_mngr->reset(false, true);
+ }
+ thd->clear_binlog_table_maps();
+}
+
+
+void thd_binlog_rollback_stmt(THD * thd)
+{
+ WSREP_DEBUG("thd_binlog_rollback_stmt :%ld", thd->thread_id);
+ binlog_cache_mngr *const cache_mngr=
+ (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
+ if (cache_mngr)
+ cache_mngr->trx_cache.set_prev_position(MY_OFF_T_UNDEF);
+}
+#endif /* WITH_WSREP */