summaryrefslogtreecommitdiff
path: root/sql/log.cc
diff options
context:
space:
mode:
authorSeppo Jaakola <seppo.jaakola@codership.com>2012-04-13 01:33:24 +0300
committerSeppo Jaakola <seppo.jaakola@codership.com>2012-04-13 01:33:24 +0300
commit2fc1ec43560b453b4694adbc1aac11f3f23b1761 (patch)
tree880c3875dae28574a90bf891ef901027723d21f6 /sql/log.cc
parent51c77ec5d406843bb8c8131f0687f4f75839d045 (diff)
downloadmariadb-git-2fc1ec43560b453b4694adbc1aac11f3f23b1761.tar.gz
Initial push of codership-wsrep API implementation for MariaDB.
Merge of: lp:maria/5.5, #3334: http://bazaar.launchpad.net/~maria-captains/maria/5.5/revision/3334 lp:codership-mysql/5.5, #3725: http://bazaar.launchpad.net/~codership/codership-mysql/wsrep-5.5/revision/3725
Diffstat (limited to 'sql/log.cc')
-rw-r--r--sql/log.cc240
1 files changed, 232 insertions, 8 deletions
diff --git a/sql/log.cc b/sql/log.cc
index 577297fa1a4..ee7d548d81a 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -51,6 +51,9 @@
#include "sql_plugin.h"
#include "rpl_handler.h"
+#ifdef WITH_WSREP
+#include "wsrep_mysqld.h"
+#endif /* WITH_WSREP */
#include "debug_sync.h"
/* max size of the log message */
@@ -486,6 +489,9 @@ private:
};
handlerton *binlog_hton;
+#ifdef WITH_WSREP
+extern handlerton *wsrep_hton;
+#endif
bool LOGGER::is_log_table_enabled(uint log_table_type)
{
@@ -500,6 +506,134 @@ bool LOGGER::is_log_table_enabled(uint log_table_type)
}
}
+#ifdef WITH_WSREP
+IO_CACHE * get_trans_log(THD * thd)
+{
+ binlog_cache_mngr *cache_mngr = (binlog_cache_mngr*)
+ thd_get_ha_data(thd, binlog_hton);
+ if (cache_mngr)
+ {
+ return cache_mngr->get_binlog_cache_log(true);
+ }
+ else
+ {
+ WSREP_DEBUG("binlog cache not initialized, conn :%ld", thd->thread_id);
+ return NULL;
+ }
+}
+
+
+bool wsrep_trans_cache_is_empty(THD *thd)
+{
+ bool res= TRUE;
+
+ if (thd_sql_command((const THD*) thd) != SQLCOM_SELECT)
+ res= FALSE;
+ else
+ {
+ binlog_cache_mngr *const cache_mngr=
+ (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
+ if (cache_mngr)
+ {
+ res= cache_mngr->trx_cache.empty();
+ }
+ }
+ return res;
+}
+
+void thd_binlog_flush_pending_rows_event(THD *thd, bool stmt_end)
+{
+ thd->binlog_flush_pending_rows_event(stmt_end);
+}
+void thd_binlog_trx_reset(THD * thd)
+{
+ /*
+ todo: fix autocommit select to not call the caller
+ */
+ if (thd_get_ha_data(thd, binlog_hton) != NULL)
+ {
+ binlog_cache_mngr *const cache_mngr=
+ (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
+ if (cache_mngr) cache_mngr->reset(TRUE, TRUE);
+ }
+ thd->clear_binlog_table_maps();
+}
+
+void thd_binlog_rollback_stmt(THD * thd)
+{
+ WSREP_DEBUG("thd_binlog_rollback_stmt :%ld", thd->thread_id);
+ binlog_cache_mngr *const cache_mngr=
+ (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
+ if (cache_mngr) cache_mngr->trx_cache.set_prev_position(MY_OFF_T_UNDEF);
+}
+/*
+ Write the contents of a cache to memory buffer.
+
+ This function quite the same as MYSQL_BIN_LOG::write_cache(),
+ with the exception that here we write in buffer instead of log file.
+ */
+
+int wsrep_write_cache(IO_CACHE *cache, uchar **buf, uint *buf_len)
+{
+
+ if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
+ return ER_ERROR_ON_WRITE;
+ uint length= my_b_bytes_in_cache(cache);
+ long long total_length = 0;
+ uchar *buf_ptr = NULL;
+
+ do
+ {
+ /* bail out if buffer grows too large
+ This is a temporary fix to avoid flooding replication
+ TODO: remove this check for 0.7.4 release
+ */
+ if (total_length > wsrep_max_ws_size)
+ {
+ WSREP_WARN("transaction size limit (%lld) exceeded: %lld",
+ wsrep_max_ws_size, total_length);
+ if (reinit_io_cache(cache, WRITE_CACHE, 0, 0, 0))
+ {
+ WSREP_WARN("failed to initialize io-cache");
+ }
+ if (buf_ptr) my_free(*buf);
+ *buf_len = 0;
+ return ER_ERROR_ON_WRITE;
+ }
+ if (total_length > 0)
+ {
+ *buf_len += length;
+ *buf = (uchar *)my_realloc(*buf, total_length+length, MYF(0));
+ if (!*buf)
+ {
+ WSREP_ERROR("io cache write problem: %d %d", *buf_len, length);
+ return ER_ERROR_ON_WRITE;
+ }
+ buf_ptr = *buf+total_length;
+ }
+ else
+ {
+ if (buf_ptr != NULL)
+ {
+ WSREP_ERROR("io cache alloc error: %d %d", *buf_len, length);
+ my_free(*buf);
+ }
+ if (length > 0)
+ {
+ *buf = (uchar *) my_malloc(length, MYF(0));
+ buf_ptr = *buf;
+ *buf_len = length;
+ }
+ }
+ total_length += length;
+
+ memcpy(buf_ptr, cache->read_pos, length);
+ cache->read_pos=cache->read_end;
+ } while ((cache->file >= 0) && (length= my_b_fill(cache)));
+
+ return 0;
+}
+#endif
/* Check if a given table is opened log table */
int check_if_log_table(size_t db_len, const char *db, size_t table_name_len,
@@ -1536,7 +1670,11 @@ binlog_trans_log_savepos(THD *thd, my_off_t *pos)
thd->binlog_setup_trx_data();
binlog_cache_mngr *const cache_mngr=
(binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
+#ifdef WITH_WSREP
+ DBUG_ASSERT((WSREP(thd) && wsrep_emulate_bin_log) || mysql_bin_log.is_open());
+#else
DBUG_ASSERT(mysql_bin_log.is_open());
+#endif
*pos= cache_mngr->trx_cache.get_byte_position();
DBUG_PRINT("return", ("*pos: %lu", (ulong) *pos));
DBUG_VOID_RETURN;
@@ -1584,7 +1722,16 @@ binlog_trans_log_truncate(THD *thd, my_off_t pos)
int binlog_init(void *p)
{
binlog_hton= (handlerton *)p;
+#ifdef WITH_WSREP
+ if (WSREP_ON)
+ binlog_hton->state= SHOW_OPTION_YES;
+ else
+ {
+#endif /* WITH_WSREP */
binlog_hton->state=opt_bin_log ? SHOW_OPTION_YES : SHOW_OPTION_NO;
+#ifdef WITH_WSREP
+ }
+#endif /* WITH_WSREP */
binlog_hton->db_type=DB_TYPE_BINLOG;
binlog_hton->savepoint_offset= sizeof(my_off_t);
binlog_hton->close_connection= binlog_close_connection;
@@ -1840,6 +1987,9 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all)
DBUG_ENTER("binlog_commit");
binlog_cache_mngr *const cache_mngr=
(binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
+#ifdef WITH_WSREP
+ if (!cache_mngr) DBUG_RETURN(0);
+#endif /* WITH_WSREP */
DBUG_PRINT("debug",
("all: %d, in_transaction: %s, all.modified_non_trans_table: %s, stmt.modified_non_trans_table: %s",
@@ -1896,6 +2046,9 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
int error= 0;
binlog_cache_mngr *const cache_mngr=
(binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
+#ifdef WITH_WSREP
+ if (!cache_mngr) DBUG_RETURN(0);
+#endif /* WITH_WSREP */
DBUG_PRINT("debug", ("all: %s, all.modified_non_trans_table: %s, stmt.modified_non_trans_table: %s",
YESNO(all),
@@ -1924,8 +2077,12 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
cache_mngr->reset(false, true);
DBUG_RETURN(error);
}
-
+#ifdef WITH_WSREP
+ if (!wsrep_emulate_bin_log &&
+ mysql_bin_log.check_write_error(thd))
+#else
if (mysql_bin_log.check_write_error(thd))
+#endif
{
/*
"all == true" means that a "rollback statement" triggered the error and
@@ -1955,12 +2112,12 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
if (ending_trans(thd, all) &&
((thd->variables.option_bits & OPTION_KEEP_LOG) ||
(trans_has_updated_non_trans_table(thd) &&
- thd->variables.binlog_format == BINLOG_FORMAT_STMT) ||
+ WSREP_FORMAT(thd->variables.binlog_format) == BINLOG_FORMAT_STMT) ||
(cache_mngr->trx_cache.changes_to_non_trans_temp_table() &&
- thd->variables.binlog_format == BINLOG_FORMAT_MIXED) ||
+ WSREP_FORMAT(thd->variables.binlog_format) == BINLOG_FORMAT_MIXED) ||
(trans_has_updated_non_trans_table(thd) &&
ending_single_stmt_trans(thd,all) &&
- thd->variables.binlog_format == BINLOG_FORMAT_MIXED)))
+ WSREP_FORMAT(thd->variables.binlog_format) == BINLOG_FORMAT_MIXED)))
error= binlog_rollback_flush_trx_cache(thd, all, cache_mngr);
/*
Truncate the cache if:
@@ -1974,9 +2131,9 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
else if (ending_trans(thd, all) ||
(!(thd->variables.option_bits & OPTION_KEEP_LOG) &&
(!stmt_has_updated_non_trans_table(thd) ||
- thd->variables.binlog_format != BINLOG_FORMAT_STMT) &&
+ WSREP_FORMAT(thd->variables.binlog_format) != BINLOG_FORMAT_STMT) &&
(!cache_mngr->trx_cache.changes_to_non_trans_temp_table() ||
- thd->variables.binlog_format != BINLOG_FORMAT_MIXED)))
+ WSREP_FORMAT(thd->variables.binlog_format) != BINLOG_FORMAT_MIXED)))
error= binlog_truncate_trx_cache(thd, cache_mngr, all);
}
@@ -2070,7 +2227,9 @@ static int binlog_savepoint_set(handlerton *hton, THD *thd, void *sv)
binlog_trans_log_savepos(thd, (my_off_t*) sv);
/* Write it to the binary log */
-
+#ifdef WITH_WSREP
+ if (wsrep_emulate_bin_log) DBUG_RETURN(0);
+#endif /* WITH_WSREP */
String log_query;
if (log_query.append(STRING_WITH_LEN("SAVEPOINT ")) ||
log_query.append("`") ||
@@ -2092,7 +2251,12 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv)
non-transactional table. Otherwise, truncate the binlog cache starting
from the SAVEPOINT command.
*/
+#ifdef WITH_WSREP
+ if (!wsrep_emulate_bin_log &&
+ unlikely(trans_has_updated_non_trans_table(thd) ||
+#else
if (unlikely(trans_has_updated_non_trans_table(thd) ||
+#endif
(thd->variables.option_bits & OPTION_KEEP_LOG)))
{
String log_query;
@@ -4720,6 +4884,7 @@ int THD::binlog_setup_trx_data()
DBUG_RETURN(0);
}
+
/*
Function to start a statement and optionally a transaction for the
binary log.
@@ -4839,7 +5004,12 @@ int THD::binlog_write_table_map(TABLE *table, bool is_transactional,
table->s->table_map_id));
/* Pre-conditions */
+#ifdef WITH_WSREP
+ DBUG_ASSERT(is_current_stmt_binlog_format_row() &&
+ (WSREP_EMULATE_BINLOG(this) || mysql_bin_log.is_open()));
+#else
DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open());
+#endif
DBUG_ASSERT(table->s->table_map_id != ULONG_MAX);
Table_map_log_event
@@ -4976,7 +5146,11 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd,
bool is_transactional)
{
DBUG_ENTER("MYSQL_BIN_LOG::flush_and_set_pending_rows_event(event)");
+#ifdef WITH_WSREP
+ DBUG_ASSERT(WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open());
+#else
DBUG_ASSERT(mysql_bin_log.is_open());
+#endif
DBUG_PRINT("enter", ("event: 0x%lx", (long) event));
int error= 0;
@@ -5056,7 +5230,11 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
mostly called if is_open() *was* true a few instructions before, but it
could have changed since.
*/
+#ifdef WITH_WSREP
+ if ((WSREP(thd) && wsrep_emulate_bin_log) || is_open())
+#else
if (likely(is_open()))
+#endif
{
my_off_t UNINIT_VAR(my_org_b_tell);
#ifdef HAVE_REPLICATION
@@ -5237,6 +5415,35 @@ err:
}
}
+#ifdef WITH_WSREP
+ if (WSREP(thd) && wsrep_incremental_data_collection &&
+ (wsrep_emulate_bin_log || mysql_bin_log.is_open()))
+ {
+ DBUG_ASSERT(thd->wsrep_trx_handle.trx_id != (unsigned long)-1);
+ if (!error)
+ {
+ IO_CACHE* cache= get_trans_log(thd);
+ uchar* buf= NULL;
+ uint buf_len= 0;
+
+ if (wsrep_emulate_bin_log)
+ thd->binlog_flush_pending_rows_event(false);
+ error= wsrep_write_cache(cache, &buf, &buf_len);
+ if (!error && buf_len > 0)
+ {
+ wsrep_status_t rc= wsrep->append_data(wsrep,
+ &thd->wsrep_trx_handle,
+ buf, buf_len);
+ if (rc != WSREP_OK)
+ {
+ sql_print_warning("WSREP: append_data() returned %d", rc);
+ error= 1;
+ }
+ }
+ if (buf_len) my_free(buf);
+ }
+ }
+#endif /* WITH_WSREP */
DBUG_RETURN(error);
}
@@ -5329,6 +5536,14 @@ int MYSQL_BIN_LOG::rotate(bool force_rotate, bool* check_purge)
{
int error= 0;
DBUG_ENTER("MYSQL_BIN_LOG::rotate");
+#ifdef WITH_WSREP
+ if (WSREP_ON && wsrep_to_isolation)
+ {
+ WSREP_DEBUG("avoiding binlog rotate due to TO isolation: %d",
+ wsrep_to_isolation);
+ DBUG_RETURN(0);
+ }
+#endif
//todo: fix the macro def and restore safe_mutex_assert_owner(&LOCK_log);
*check_purge= false;
@@ -5797,6 +6012,9 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd,
group_commit_entry entry;
DBUG_ENTER("MYSQL_BIN_LOG::write_transaction_to_binlog");
+#ifdef WITH_WSREP
+ if (wsrep_emulate_bin_log) DBUG_RETURN(0);
+#endif /* WITH_WSREP */
entry.thd= thd;
entry.cache_mngr= cache_mngr;
entry.error= 0;
@@ -7426,7 +7644,13 @@ TC_LOG_BINLOG::log_and_order(THD *thd, my_xid xid, bool all,
binlog_cache_mngr *cache_mngr=
(binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton);
-
+#ifdef WITH_WSREP
+ if (!cache_mngr)
+ {
+ WSREP_DEBUG("Skipping empty log_xid: %s", thd->query());
+ DBUG_RETURN(1);
+ }
+#endif /* WITH_WSREP */
cache_mngr->using_xa= TRUE;
cache_mngr->xa_xid= xid;
err= binlog_commit_flush_xid_caches(thd, cache_mngr, all, xid);