diff options
author | Seppo Jaakola <seppo.jaakola@codership.com> | 2012-04-13 01:33:24 +0300 |
---|---|---|
committer | Seppo Jaakola <seppo.jaakola@codership.com> | 2012-04-13 01:33:24 +0300 |
commit | 2fc1ec43560b453b4694adbc1aac11f3f23b1761 (patch) | |
tree | 880c3875dae28574a90bf891ef901027723d21f6 /sql/log.cc | |
parent | 51c77ec5d406843bb8c8131f0687f4f75839d045 (diff) | |
download | mariadb-git-2fc1ec43560b453b4694adbc1aac11f3f23b1761.tar.gz |
Initial push of codership-wsrep API implementation for MariaDB.
Merge of:
lp:maria/5.5, #3334: http://bazaar.launchpad.net/~maria-captains/maria/5.5/revision/3334
lp:codership-mysql/5.5, #3725: http://bazaar.launchpad.net/~codership/codership-mysql/wsrep-5.5/revision/3725
Diffstat (limited to 'sql/log.cc')
-rw-r--r-- | sql/log.cc | 240 |
1 files changed, 232 insertions, 8 deletions
diff --git a/sql/log.cc b/sql/log.cc index 577297fa1a4..ee7d548d81a 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -51,6 +51,9 @@ #include "sql_plugin.h" #include "rpl_handler.h" +#ifdef WITH_WSREP +#include "wsrep_mysqld.h" +#endif /* WITH_WSREP */ #include "debug_sync.h" /* max size of the log message */ @@ -486,6 +489,9 @@ private: }; handlerton *binlog_hton; +#ifdef WITH_WSREP +extern handlerton *wsrep_hton; +#endif bool LOGGER::is_log_table_enabled(uint log_table_type) { @@ -500,6 +506,134 @@ bool LOGGER::is_log_table_enabled(uint log_table_type) } } +#ifdef WITH_WSREP +IO_CACHE * get_trans_log(THD * thd) +{ + binlog_cache_mngr *cache_mngr = (binlog_cache_mngr*) + thd_get_ha_data(thd, binlog_hton); + if (cache_mngr) + { + return cache_mngr->get_binlog_cache_log(true); + } + else + { + WSREP_DEBUG("binlog cache not initialized, conn :%ld", thd->thread_id); + return NULL; + } +} + + +bool wsrep_trans_cache_is_empty(THD *thd) +{ + bool res= TRUE; + + if (thd_sql_command((const THD*) thd) != SQLCOM_SELECT) + res= FALSE; + else + { + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + if (cache_mngr) + { + res= cache_mngr->trx_cache.empty(); + } + } + return res; +} + +void thd_binlog_flush_pending_rows_event(THD *thd, bool stmt_end) +{ + thd->binlog_flush_pending_rows_event(stmt_end); +} +void thd_binlog_trx_reset(THD * thd) +{ + /* + todo: fix autocommit select to not call the caller + */ + if (thd_get_ha_data(thd, binlog_hton) != NULL) + { + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + if (cache_mngr) cache_mngr->reset(TRUE, TRUE); + } + thd->clear_binlog_table_maps(); +} + +void thd_binlog_rollback_stmt(THD * thd) +{ + WSREP_DEBUG("thd_binlog_rollback_stmt :%ld", thd->thread_id); + binlog_cache_mngr *const cache_mngr= + (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); + if (cache_mngr) cache_mngr->trx_cache.set_prev_position(MY_OFF_T_UNDEF); +} +/* + Write the contents of a cache to memory buffer. + + This function quite the same as MYSQL_BIN_LOG::write_cache(), + with the exception that here we write in buffer instead of log file. + */ + +int wsrep_write_cache(IO_CACHE *cache, uchar **buf, uint *buf_len) +{ + + if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0)) + return ER_ERROR_ON_WRITE; + uint length= my_b_bytes_in_cache(cache); + long long total_length = 0; + uchar *buf_ptr = NULL; + + do + { + /* bail out if buffer grows too large + This is a temporary fix to avoid flooding replication + TODO: remove this check for 0.7.4 release + */ + if (total_length > wsrep_max_ws_size) + { + WSREP_WARN("transaction size limit (%lld) exceeded: %lld", + wsrep_max_ws_size, total_length); + if (reinit_io_cache(cache, WRITE_CACHE, 0, 0, 0)) + { + WSREP_WARN("failed to initialize io-cache"); + } + if (buf_ptr) my_free(*buf); + *buf_len = 0; + return ER_ERROR_ON_WRITE; + } + if (total_length > 0) + { + *buf_len += length; + *buf = (uchar *)my_realloc(*buf, total_length+length, MYF(0)); + if (!*buf) + { + WSREP_ERROR("io cache write problem: %d %d", *buf_len, length); + return ER_ERROR_ON_WRITE; + } + buf_ptr = *buf+total_length; + } + else + { + if (buf_ptr != NULL) + { + WSREP_ERROR("io cache alloc error: %d %d", *buf_len, length); + my_free(*buf); + } + if (length > 0) + { + *buf = (uchar *) my_malloc(length, MYF(0)); + buf_ptr = *buf; + *buf_len = length; + } + } + total_length += length; + + memcpy(buf_ptr, cache->read_pos, length); + cache->read_pos=cache->read_end; + } while ((cache->file >= 0) && (length= my_b_fill(cache))); + + return 0; +} +#endif /* Check if a given table is opened log table */ int check_if_log_table(size_t db_len, const char *db, size_t table_name_len, @@ -1536,7 +1670,11 @@ binlog_trans_log_savepos(THD *thd, my_off_t *pos) thd->binlog_setup_trx_data(); binlog_cache_mngr *const cache_mngr= (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); +#ifdef WITH_WSREP + DBUG_ASSERT((WSREP(thd) && wsrep_emulate_bin_log) || mysql_bin_log.is_open()); +#else DBUG_ASSERT(mysql_bin_log.is_open()); +#endif *pos= cache_mngr->trx_cache.get_byte_position(); DBUG_PRINT("return", ("*pos: %lu", (ulong) *pos)); DBUG_VOID_RETURN; @@ -1584,7 +1722,16 @@ binlog_trans_log_truncate(THD *thd, my_off_t pos) int binlog_init(void *p) { binlog_hton= (handlerton *)p; +#ifdef WITH_WSREP + if (WSREP_ON) + binlog_hton->state= SHOW_OPTION_YES; + else + { +#endif /* WITH_WSREP */ binlog_hton->state=opt_bin_log ? SHOW_OPTION_YES : SHOW_OPTION_NO; +#ifdef WITH_WSREP + } +#endif /* WITH_WSREP */ binlog_hton->db_type=DB_TYPE_BINLOG; binlog_hton->savepoint_offset= sizeof(my_off_t); binlog_hton->close_connection= binlog_close_connection; @@ -1840,6 +1987,9 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all) DBUG_ENTER("binlog_commit"); binlog_cache_mngr *const cache_mngr= (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); +#ifdef WITH_WSREP + if (!cache_mngr) DBUG_RETURN(0); +#endif /* WITH_WSREP */ DBUG_PRINT("debug", ("all: %d, in_transaction: %s, all.modified_non_trans_table: %s, stmt.modified_non_trans_table: %s", @@ -1896,6 +2046,9 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) int error= 0; binlog_cache_mngr *const cache_mngr= (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); +#ifdef WITH_WSREP + if (!cache_mngr) DBUG_RETURN(0); +#endif /* WITH_WSREP */ DBUG_PRINT("debug", ("all: %s, all.modified_non_trans_table: %s, stmt.modified_non_trans_table: %s", YESNO(all), @@ -1924,8 +2077,12 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) cache_mngr->reset(false, true); DBUG_RETURN(error); } - +#ifdef WITH_WSREP + if (!wsrep_emulate_bin_log && + mysql_bin_log.check_write_error(thd)) +#else if (mysql_bin_log.check_write_error(thd)) +#endif { /* "all == true" means that a "rollback statement" triggered the error and @@ -1955,12 +2112,12 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) if (ending_trans(thd, all) && ((thd->variables.option_bits & OPTION_KEEP_LOG) || (trans_has_updated_non_trans_table(thd) && - thd->variables.binlog_format == BINLOG_FORMAT_STMT) || + WSREP_FORMAT(thd->variables.binlog_format) == BINLOG_FORMAT_STMT) || (cache_mngr->trx_cache.changes_to_non_trans_temp_table() && - thd->variables.binlog_format == BINLOG_FORMAT_MIXED) || + WSREP_FORMAT(thd->variables.binlog_format) == BINLOG_FORMAT_MIXED) || (trans_has_updated_non_trans_table(thd) && ending_single_stmt_trans(thd,all) && - thd->variables.binlog_format == BINLOG_FORMAT_MIXED))) + WSREP_FORMAT(thd->variables.binlog_format) == BINLOG_FORMAT_MIXED))) error= binlog_rollback_flush_trx_cache(thd, all, cache_mngr); /* Truncate the cache if: @@ -1974,9 +2131,9 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) else if (ending_trans(thd, all) || (!(thd->variables.option_bits & OPTION_KEEP_LOG) && (!stmt_has_updated_non_trans_table(thd) || - thd->variables.binlog_format != BINLOG_FORMAT_STMT) && + WSREP_FORMAT(thd->variables.binlog_format) != BINLOG_FORMAT_STMT) && (!cache_mngr->trx_cache.changes_to_non_trans_temp_table() || - thd->variables.binlog_format != BINLOG_FORMAT_MIXED))) + WSREP_FORMAT(thd->variables.binlog_format) != BINLOG_FORMAT_MIXED))) error= binlog_truncate_trx_cache(thd, cache_mngr, all); } @@ -2070,7 +2227,9 @@ static int binlog_savepoint_set(handlerton *hton, THD *thd, void *sv) binlog_trans_log_savepos(thd, (my_off_t*) sv); /* Write it to the binary log */ - +#ifdef WITH_WSREP + if (wsrep_emulate_bin_log) DBUG_RETURN(0); +#endif /* WITH_WSREP */ String log_query; if (log_query.append(STRING_WITH_LEN("SAVEPOINT ")) || log_query.append("`") || @@ -2092,7 +2251,12 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv) non-transactional table. Otherwise, truncate the binlog cache starting from the SAVEPOINT command. */ +#ifdef WITH_WSREP + if (!wsrep_emulate_bin_log && + unlikely(trans_has_updated_non_trans_table(thd) || +#else if (unlikely(trans_has_updated_non_trans_table(thd) || +#endif (thd->variables.option_bits & OPTION_KEEP_LOG))) { String log_query; @@ -4720,6 +4884,7 @@ int THD::binlog_setup_trx_data() DBUG_RETURN(0); } + /* Function to start a statement and optionally a transaction for the binary log. @@ -4839,7 +5004,12 @@ int THD::binlog_write_table_map(TABLE *table, bool is_transactional, table->s->table_map_id)); /* Pre-conditions */ +#ifdef WITH_WSREP + DBUG_ASSERT(is_current_stmt_binlog_format_row() && + (WSREP_EMULATE_BINLOG(this) || mysql_bin_log.is_open())); +#else DBUG_ASSERT(is_current_stmt_binlog_format_row() && mysql_bin_log.is_open()); +#endif DBUG_ASSERT(table->s->table_map_id != ULONG_MAX); Table_map_log_event @@ -4976,7 +5146,11 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd, bool is_transactional) { DBUG_ENTER("MYSQL_BIN_LOG::flush_and_set_pending_rows_event(event)"); +#ifdef WITH_WSREP + DBUG_ASSERT(WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()); +#else DBUG_ASSERT(mysql_bin_log.is_open()); +#endif DBUG_PRINT("enter", ("event: 0x%lx", (long) event)); int error= 0; @@ -5056,7 +5230,11 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) mostly called if is_open() *was* true a few instructions before, but it could have changed since. */ +#ifdef WITH_WSREP + if ((WSREP(thd) && wsrep_emulate_bin_log) || is_open()) +#else if (likely(is_open())) +#endif { my_off_t UNINIT_VAR(my_org_b_tell); #ifdef HAVE_REPLICATION @@ -5237,6 +5415,35 @@ err: } } +#ifdef WITH_WSREP + if (WSREP(thd) && wsrep_incremental_data_collection && + (wsrep_emulate_bin_log || mysql_bin_log.is_open())) + { + DBUG_ASSERT(thd->wsrep_trx_handle.trx_id != (unsigned long)-1); + if (!error) + { + IO_CACHE* cache= get_trans_log(thd); + uchar* buf= NULL; + uint buf_len= 0; + + if (wsrep_emulate_bin_log) + thd->binlog_flush_pending_rows_event(false); + error= wsrep_write_cache(cache, &buf, &buf_len); + if (!error && buf_len > 0) + { + wsrep_status_t rc= wsrep->append_data(wsrep, + &thd->wsrep_trx_handle, + buf, buf_len); + if (rc != WSREP_OK) + { + sql_print_warning("WSREP: append_data() returned %d", rc); + error= 1; + } + } + if (buf_len) my_free(buf); + } + } +#endif /* WITH_WSREP */ DBUG_RETURN(error); } @@ -5329,6 +5536,14 @@ int MYSQL_BIN_LOG::rotate(bool force_rotate, bool* check_purge) { int error= 0; DBUG_ENTER("MYSQL_BIN_LOG::rotate"); +#ifdef WITH_WSREP + if (WSREP_ON && wsrep_to_isolation) + { + WSREP_DEBUG("avoiding binlog rotate due to TO isolation: %d", + wsrep_to_isolation); + DBUG_RETURN(0); + } +#endif //todo: fix the macro def and restore safe_mutex_assert_owner(&LOCK_log); *check_purge= false; @@ -5797,6 +6012,9 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, group_commit_entry entry; DBUG_ENTER("MYSQL_BIN_LOG::write_transaction_to_binlog"); +#ifdef WITH_WSREP + if (wsrep_emulate_bin_log) DBUG_RETURN(0); +#endif /* WITH_WSREP */ entry.thd= thd; entry.cache_mngr= cache_mngr; entry.error= 0; @@ -7426,7 +7644,13 @@ TC_LOG_BINLOG::log_and_order(THD *thd, my_xid xid, bool all, binlog_cache_mngr *cache_mngr= (binlog_cache_mngr*) thd_get_ha_data(thd, binlog_hton); - +#ifdef WITH_WSREP + if (!cache_mngr) + { + WSREP_DEBUG("Skipping empty log_xid: %s", thd->query()); + DBUG_RETURN(1); + } +#endif /* WITH_WSREP */ cache_mngr->using_xa= TRUE; cache_mngr->xa_xid= xid; err= binlog_commit_flush_xid_caches(thd, cache_mngr, all, xid); |